chiark / gitweb /
rename innduct.c to duct.c - fix up comments
[inn-innduct.git] / xmit.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  xmit.c - transmitting checks and articles, flow control, expiry
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 const char *const artstate_names[]=
30   { "Unchecked", "Wanted", "Unsolicited", 0 };
31
32 /*---------- assigning articles to conns, and transmitting ----------*/
33
34 static Article *dequeue_from(int peek, InputFile *ipf) {
35   if (!ipf) return 0;
36   if (peek) return LIST_HEAD(ipf->queue);
37
38   Article *art= LIST_REMHEAD(ipf->queue);
39   if (!art) return 0;
40   check_reading_pause_resume(ipf);
41   return art;
42 }
43
44 static Article *dequeue(int peek) {
45   Article *art;
46   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
47   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
48   art= dequeue_from(peek, main_input_file);      if (art) return art;
49   return 0;
50 }
51
52 void check_assign_articles(void) {
53   for (;;) {
54     if (!dequeue(1))
55       break;
56
57     Conn *walk, *use=0;
58     int spare=0, inqueue=0;
59
60     /* Find a connection to offer this article.  We prefer a busy
61      * connection to an idle one, provided it's not full.  We take the
62      * first (oldest) and since that's stable, it will mean we fill up
63      * connections in order.  That way if we have too many
64      * connections, the spare ones will go away eventually.
65      */
66     FOR_CONN(walk) {
67       if (walk->quitting) continue;
68       inqueue= walk->sent.count + walk->priority.count
69              + walk->waiting.count;
70       spare= walk->max_queue - inqueue;
71       assert(inqueue <= max_queue_per_conn);
72       assert(spare >= 0);
73       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
74       else if (spare>0) /*working*/ { use= walk; break; }
75     }
76     if (use) {
77       if (!inqueue) use->since_activity= 0; /* reset idle counter */
78       while (spare>0) {
79         Article *art= dequeue(0);
80         if (!art) break;
81         LIST_ADDTAIL(use->waiting, art);
82         lowvol_perperiod[lowvol_circptr]++;
83         spare--;
84       }
85       conn_maybe_write(use);
86     } else if (allow_connect_start()) {
87       connect_start();
88       break;
89     } else {
90       break;
91     }
92   }
93 }
94
95 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
96   conn_maybe_write(u);
97   return OOP_CONTINUE;
98 }
99
100 void conn_maybe_write(Conn *conn) {
101   for (;;) {
102     conn_make_some_xmits(conn);
103     if (!conn->xmitu) {
104       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
105       conn->oopwriting= 0;
106       return;
107     }
108
109     void *rp= conn_write_some_xmits(conn);
110     if (rp==OOP_CONTINUE) {
111       if (!conn->oopwriting) {
112         loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
113         conn->oopwriting= 1;
114       }
115       return;
116     } else if (rp==OOP_HALT) {
117       return;
118     } else if (!rp) {
119       /* transmitted everything */
120     } else {
121       abort();
122     }
123   }
124 }
125
126 /*---------- expiry, flow control and deferral ----------*/
127
128 /*
129  * flow control notes
130  * to ensure articles go away eventually
131  * separate queue for each input file
132  *   queue expiry
133  *     every period, check head of backlog queue for expiry with SMretrieve
134  *       if too old: discard, and check next article
135  *     also check every backlog article as we read it
136  *   flush expiry
137  *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
138  *     one-off: eat queued articles from flushing and write them to defer
139  *     one-off: connfail all connections which have any articles from flushing
140  *     newly read articles from flushing go straight to defer
141  *     this should take care of it and get us out of this state
142  * to avoid filling up ram needlessly
143  *   input control
144  *     limit number of queued articles for each ipf
145  *     pause/resume inputfile tailing
146  */
147
148 void check_reading_pause_resume(InputFile *ipf) {
149   if (ipf->queue.count >= max_queue_per_ipf)
150     inputfile_reading_pause(ipf);
151   else
152     inputfile_reading_resume(ipf);
153 }
154
155 void article_defer(Article *art /* not on a queue */, int whichcount) {
156   open_defer();
157   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
158       || fflush(defer))
159     sysdie("write to defer file %s",path_defer);
160   article_done(art, whichcount);
161 }
162
163 int article_check_expired(Article *art /* must be queued, not conn */) {
164   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
165   if (artdata) { SMfreearticle(artdata); return 0; }
166
167   LIST_REMOVE(art->ipf->queue, art);
168   art->missing= 1;
169   art->ipf->count_nooffer_missing++;
170   article_done(art,-1);
171   return 1;
172 }
173
174 void inputfile_queue_check_expired(InputFile *ipf) {
175   if (!ipf) return;
176
177   for (;;) {
178     Article *art= LIST_HEAD(ipf->queue);
179     int expd= article_check_expired(art);
180     if (!expd) break;
181   }
182   check_reading_pause_resume(ipf);
183 }
184
185 void article_autodefer(InputFile *ipf, Article *art) {
186   ipf->autodefer++;
187   article_defer(art,-1);
188 }
189
190 static int has_article_in(const ArticleList *al, InputFile *ipf) {
191   Article *art;
192   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
193     if (art->ipf == ipf) return 1;
194   return 0;
195 }
196
197 static void autodefer_input_file_articles(InputFile *ipf) {
198   Article *art;
199   while ((art= LIST_REMHEAD(ipf->queue)))
200     article_autodefer(ipf, art);
201 }
202
203 void autodefer_input_file(InputFile *ipf) {
204   static const char *const abandon= "stuck";
205   ipf->autodefer= 0;
206
207   autodefer_input_file_articles(ipf);
208
209   if (ipf->inprogress) {
210     Conn *walk;
211     FOR_CONN(walk) {
212       if (has_article_in(&walk->waiting,  ipf) ||
213           has_article_in(&walk->priority, ipf) ||
214           has_article_in(&walk->sent,     ipf))
215         walk->quitting= abandon;
216     }
217     while (ipf->inprogress) {
218       FOR_CONN(walk)
219         if (walk->quitting == abandon) goto found;
220       abort(); /* where are they ?? */
221
222     found:
223       connfail(walk, "connection is stuck or crawling,"
224                " and we need to finish flush");
225       autodefer_input_file_articles(ipf);
226     }
227   }
228
229   check_reading_pause_resume(ipf);
230 }
231
232 /*========== article transmission ==========*/
233
234 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
235                   XmitKind kind) { /* caller must then fill in details */
236   struct iovec *v= &conn->xmit[conn->xmitu];
237   XmitDetails *d= &conn->xmitd[conn->xmitu++];
238   v->iov_base= (char*)data;
239   v->iov_len= len;
240   d->kind= kind;
241   return d;
242 }
243
244 static void xmit_noalloc(Conn *conn, const char *data, int len) {
245   xmit_core(conn,data,len, xk_Const);
246 }
247 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
248
249 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
250   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
251   d->info.sm_art= ah;
252 }
253
254 void xmit_free(XmitDetails *d) {
255   switch (d->kind) {
256   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
257   case xk_Const:                                  break;
258   default: abort();
259   }
260 }
261
262 void *conn_write_some_xmits(Conn *conn) {
263   /* return values:
264    *      0:            nothing more to write, no need to call us again
265    *      OOP_CONTINUE: more to write but fd not writeable
266    *      OOP_HALT:     disaster, have destroyed conn
267    */
268   for (;;) {
269     int count= conn->xmitu;
270     if (!count) return 0;
271
272     if (count > IOV_MAX) count= IOV_MAX;
273     ssize_t rs= writev(conn->fd, conn->xmit, count);
274     if (rs < 0) {
275       if (isewouldblock(errno)) return OOP_CONTINUE;
276       connfail(conn, "write failed: %s", strerror(errno));
277       return OOP_HALT;
278     }
279     assert(rs > 0);
280
281     int done;
282     for (done=0; rs; ) {
283       assert(done<conn->xmitu);
284       struct iovec *vp= &conn->xmit[done];
285       XmitDetails *dp= &conn->xmitd[done];
286       assert(vp->iov_len <= SSIZE_MAX);
287       if ((size_t)rs >= vp->iov_len) {
288         rs -= vp->iov_len;
289         xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
290         done++;
291       } else {
292         vp->iov_base= (char*)vp->iov_base + rs;
293         vp->iov_len -= rs;
294         break; /* rs -= rs */
295       }
296     }
297     int newu= conn->xmitu - done;
298     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
299     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
300     conn->xmitu= newu;
301   }
302 }
303
304 void conn_make_some_xmits(Conn *conn) {
305   for (;;) {
306     if (conn->xmitu+5 > CONNIOVS)
307       break;
308
309     Article *art= LIST_REMHEAD(conn->priority);
310     if (!art) art= LIST_REMHEAD(conn->waiting);
311     if (!art) break;
312
313     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
314       /* actually send it */
315
316       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
317
318       art->state=
319         art->state == art_Unchecked ? art_Unsolicited :
320         art->state == art_Wanted    ? art_Wanted      :
321         (abort(),-1);
322
323       if (!artdata) art->missing= 1;
324       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
325
326       if (conn->stream) {
327         if (artdata) {
328           XMIT_LITERAL("TAKETHIS ");
329           xmit_noalloc(conn, art->messageid, art->midlen);
330           XMIT_LITERAL("\r\n");
331           xmit_artbody(conn, artdata);
332         } else {
333           article_done(art, -1);
334           continue;
335         }
336       } else {
337         /* we got 235 from IHAVE */
338         if (artdata) {
339           xmit_artbody(conn, artdata);
340         } else {
341           XMIT_LITERAL(".\r\n");
342         }
343       }
344
345       LIST_ADDTAIL(conn->sent, art);
346
347     } else {
348       /* check it */
349
350       if (conn->stream)
351         XMIT_LITERAL("CHECK ");
352       else
353         XMIT_LITERAL("IHAVE ");
354       xmit_noalloc(conn, art->messageid, art->midlen);
355       XMIT_LITERAL("\r\n");
356
357       assert(art->state == art_Unchecked);
358       art->ipf->counts[art->state][RC_sent]++;
359       LIST_ADDTAIL(conn->sent, art);
360     }
361   }
362 }
363