chiark / gitweb /
Update to work with jessie's automake (tested with automake 1.14)
[innduct.git] / xmit.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  xmit.c - transmitting checks and articles, flow control, expiry
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 const char *const artstate_names[]=
30   { "Unchecked", "Wanted", "Unsolicited", 0 };
31
32 /*---------- assigning articles to conns, and transmitting ----------*/
33
34 static Article *dequeue_from(int peek, InputFile *ipf) {
35   if (!ipf) return 0;
36   if (peek) return LIST_HEAD(ipf->queue);
37
38   Article *art= LIST_REMHEAD(ipf->queue);
39   if (!art) return 0;
40   check_reading_pause_resume(ipf);
41   return art;
42 }
43
44 static Article *dequeue(int peek) {
45   Article *art;
46   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
47   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
48   art= dequeue_from(peek, main_input_file);      if (art) return art;
49   return 0;
50 }
51
52 static void conn_inqueue_spare(const Conn *conn,
53                                int *inqueue_r, int *spare_r) {
54   int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
55   int spare= conn->max_queue - inqueue;
56   if (inqueue_r) *inqueue_r= inqueue;
57   if (spare_r) *spare_r= spare;
58 }
59
60 void check_assign_articles(void) {
61   for (;;) {
62     if (!dequeue(1))
63       break;
64
65     Conn *walk, *use=0;
66
67     /* Find a connection to offer this article.  We prefer a busy
68      * connection to an idle one, provided it's not full.  We take the
69      * first (oldest) and since that's stable, it will mean we fill up
70      * connections in order.  That way if we have too many
71      * connections, the spare ones will go away eventually.
72      */
73     FOR_CONN(walk) {
74       if (walk->quitting) continue;
75       int inqueue, spare;
76       conn_inqueue_spare(walk, &inqueue, &spare);
77       assert(inqueue <= max_queue_per_conn);
78       assert(spare >= 0);
79       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
80       else if (spare>0) /*working*/ { use= walk; break; }
81     }
82     if (use) {
83       int inqueue, spare;
84       conn_inqueue_spare(use, &inqueue, &spare);
85       if (!inqueue) use->since_activity= 0; /* reset idle counter */
86       while (spare>0) {
87         Article *art= dequeue(0);
88         if (!art) break;
89         LIST_ADDTAIL(use->waiting, art);
90         lowvol_perperiod[lowvol_circptr]++;
91         spare--;
92       }
93       conn_maybe_write(use);
94     } else if (allow_connect_start()) {
95       connect_start();
96       break;
97     } else {
98       break;
99     }
100   }
101 }
102
103 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
104   conn_maybe_write(u);
105   return OOP_CONTINUE;
106 }
107
108 void conn_maybe_write(Conn *conn) {
109   for (;;) {
110     conn_make_some_xmits(conn);
111     if (!conn->xmitu) {
112       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
113       conn->oopwriting= 0;
114       return;
115     }
116
117     void *rp= conn_write_some_xmits(conn);
118     if (rp==OOP_CONTINUE) {
119       if (!conn->oopwriting) {
120         loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
121         conn->oopwriting= 1;
122       }
123       return;
124     } else if (rp==OOP_HALT) {
125       return;
126     } else if (!rp) {
127       /* transmitted everything */
128     } else {
129       abort();
130     }
131   }
132 }
133
134 /*---------- expiry, flow control and deferral ----------*/
135
136 /*
137  * flow control notes
138  * to ensure articles go away eventually
139  * separate queue for each input file
140  *   queue expiry
141  *     every period, check head of backlog queue for expiry with SMretrieve
142  *       if too old: discard, and check next article
143  *     also check every backlog article as we read it
144  *   flush expiry
145  *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
146  *     one-off: eat queued articles from flushing and write them to defer
147  *     one-off: connfail all connections which have any articles from flushing
148  *     newly read articles from flushing go straight to defer
149  *     this should take care of it and get us out of this state
150  * to avoid filling up ram needlessly
151  *   input control
152  *     limit number of queued articles for each ipf
153  *     pause/resume inputfile tailing
154  */
155
156 void check_reading_pause_resume(InputFile *ipf) {
157   if (ipf->queue.count >= max_queue_per_ipf)
158     inputfile_reading_pause(ipf);
159   else
160     inputfile_reading_resume(ipf);
161 }
162
163 void article_defer(Article *art /* not on a queue */, int whichcount) {
164   open_defer();
165   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
166       || fflush(defer))
167     sysdie("write to defer file %s",path_defer);
168   article_done(art, whichcount);
169 }
170
171 int article_check_expired(Article *art /* must be queued, not conn */) {
172   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
173   if (artdata) { SMfreearticle(artdata); return 0; }
174
175   LIST_REMOVE(art->ipf->queue, art);
176   art->missing= 1;
177   art->ipf->counts.events[nooffer_missing]++;
178   article_done(art,-1);
179   return 1;
180 }
181
182 void inputfile_queue_check_expired(InputFile *ipf) {
183   if (!ipf) return;
184
185   for (;;) {
186     Article *art= LIST_HEAD(ipf->queue);
187     if (!art) break;
188     int expd= article_check_expired(art);
189     if (!expd) break;
190   }
191   check_reading_pause_resume(ipf);
192 }
193
194 void article_autodefer(InputFile *ipf, Article *art) {
195   ipf->autodefer++;
196   article_defer(art,-1);
197 }
198
199 static int has_article_in(const ArticleList *al, InputFile *ipf) {
200   Article *art;
201   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
202     if (art->ipf == ipf) return 1;
203   return 0;
204 }
205
206 static void autodefer_input_file_articles(InputFile *ipf) {
207   Article *art;
208   while ((art= LIST_REMHEAD(ipf->queue)))
209     article_autodefer(ipf, art);
210 }
211
212 void autodefer_input_file(InputFile *ipf) {
213   static const char *const abandon= "stuck";
214   ipf->autodefer= 0;
215
216   autodefer_input_file_articles(ipf);
217
218   if (ipf->inprogress) {
219     Conn *walk;
220     FOR_CONN(walk) {
221       if (has_article_in(&walk->waiting,  ipf) ||
222           has_article_in(&walk->priority, ipf) ||
223           has_article_in(&walk->sent,     ipf))
224         walk->quitting= abandon;
225     }
226     while (ipf->inprogress) {
227       FOR_CONN(walk)
228         if (walk->quitting == abandon) goto found;
229       abort(); /* where are they ?? */
230
231     found:
232       connfail(walk, "connection is stuck or crawling,"
233                " and we need to finish flush");
234       autodefer_input_file_articles(ipf);
235     }
236   }
237
238   check_reading_pause_resume(ipf);
239 }
240
241 /*========== article transmission ==========*/
242
243 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
244                   XmitKind kind) { /* caller must then fill in details */
245   struct iovec *v= &conn->xmit[conn->xmitu];
246   XmitDetails *d= &conn->xmitd[conn->xmitu++];
247   v->iov_base= (char*)data;
248   v->iov_len= len;
249   d->kind= kind;
250   return d;
251 }
252
253 static void xmit_noalloc(Conn *conn, const char *data, int len) {
254   xmit_core(conn,data,len, xk_Const);
255 }
256 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
257
258 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
259   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
260   d->info.sm_art= ah;
261 }
262
263 void xmit_free(XmitDetails *d) {
264   switch (d->kind) {
265   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
266   case xk_Const:                                  break;
267   default: abort();
268   }
269 }
270
271 void *conn_write_some_xmits(Conn *conn) {
272   /* return values:
273    *      0:            nothing more to write, no need to call us again
274    *      OOP_CONTINUE: more to write but fd not writeable
275    *      OOP_HALT:     disaster, have destroyed conn
276    */
277   for (;;) {
278     int count= conn->xmitu;
279     if (!count) return 0;
280
281     if (count > IOV_MAX) count= IOV_MAX;
282     ssize_t rs= writev(conn->fd, conn->xmit, count);
283     if (rs < 0) {
284       if (isewouldblock(errno)) return OOP_CONTINUE;
285       connfail(conn, "write failed: %s", strerror(errno));
286       return OOP_HALT;
287     }
288     assert(rs > 0);
289
290     int done;
291     for (done=0; rs; ) {
292       assert(done<conn->xmitu);
293       struct iovec *vp= &conn->xmit[done];
294       XmitDetails *dp= &conn->xmitd[done];
295       assert(vp->iov_len <= SSIZE_MAX);
296       if ((size_t)rs >= vp->iov_len) {
297         rs -= vp->iov_len;
298         xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
299         done++;
300       } else {
301         vp->iov_base= (char*)vp->iov_base + rs;
302         vp->iov_len -= rs;
303         break; /* rs -= rs */
304       }
305     }
306     int newu= conn->xmitu - done;
307     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
308     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
309     conn->xmitu= newu;
310   }
311 }
312
313 void conn_make_some_xmits(Conn *conn) {
314   for (;;) {
315     if (conn->xmitu+5 > CONNIOVS)
316       break;
317
318     Article *art= LIST_REMHEAD(conn->priority);
319     if (!art) art= LIST_REMHEAD(conn->waiting);
320     if (!art) break;
321
322     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
323       /* actually send it */
324
325       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
326
327       art->state=
328         art->state == art_Unchecked ? art_Unsolicited :
329         art->state == art_Wanted    ? art_Wanted      :
330         (abort(),-1);
331
332       if (!artdata) art->missing= 1;
333       art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
334
335       if (conn->stream) {
336         if (artdata) {
337           XMIT_LITERAL("TAKETHIS ");
338           xmit_noalloc(conn, art->messageid, art->midlen);
339           XMIT_LITERAL("\r\n");
340           xmit_artbody(conn, artdata);
341         } else {
342           article_done(art, -1);
343           continue;
344         }
345       } else {
346         /* we got 235 from IHAVE */
347         if (artdata) {
348           xmit_artbody(conn, artdata);
349         } else {
350           XMIT_LITERAL(".\r\n");
351         }
352       }
353
354       LIST_ADDTAIL(conn->sent, art);
355
356     } else {
357       /* check it */
358
359       if (conn->stream)
360         XMIT_LITERAL("CHECK ");
361       else
362         XMIT_LITERAL("IHAVE ");
363       xmit_noalloc(conn, art->messageid, art->midlen);
364       XMIT_LITERAL("\r\n");
365
366       assert(art->state == art_Unchecked);
367       art->ipf->counts.results[art->state][RC_sent]++;
368       LIST_ADDTAIL(conn->sent, art);
369     }
370   }
371 }
372