chiark / gitweb /
changelog: Finalise 2.2
[innduct.git] / xmit.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  xmit.c - transmitting checks and articles, flow control, expiry
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13 const char *const artstate_names[]=
14   { "Unchecked", "Wanted", "Unsolicited", 0 };
15
16 /*---------- assigning articles to conns, and transmitting ----------*/
17
18 static Article *dequeue_from(int peek, InputFile *ipf) {
19   if (!ipf) return 0;
20   if (peek) return LIST_HEAD(ipf->queue);
21
22   Article *art= LIST_REMHEAD(ipf->queue);
23   if (!art) return 0;
24   check_reading_pause_resume(ipf);
25   return art;
26 }
27
28 static Article *dequeue(int peek) {
29   Article *art;
30   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
31   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
32   art= dequeue_from(peek, main_input_file);      if (art) return art;
33   return 0;
34 }
35
36 static void conn_inqueue_spare(const Conn *conn,
37                                int *inqueue_r, int *spare_r) {
38   int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
39   int spare= conn->max_queue - inqueue;
40   if (inqueue_r) *inqueue_r= inqueue;
41   if (spare_r) *spare_r= spare;
42 }
43
44 void check_assign_articles(void) {
45   for (;;) {
46     if (!dequeue(1))
47       break;
48
49     Conn *walk, *use=0;
50
51     /* Find a connection to offer this article.  We prefer a busy
52      * connection to an idle one, provided it's not full.  We take the
53      * first (oldest) and since that's stable, it will mean we fill up
54      * connections in order.  That way if we have too many
55      * connections, the spare ones will go away eventually.
56      */
57     FOR_CONN(walk) {
58       if (walk->quitting) continue;
59       int inqueue, spare;
60       conn_inqueue_spare(walk, &inqueue, &spare);
61       assert(inqueue <= max_queue_per_conn);
62       assert(spare >= 0);
63       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
64       else if (spare>0) /*working*/ { use= walk; break; }
65     }
66     if (use) {
67       int inqueue, spare;
68       conn_inqueue_spare(use, &inqueue, &spare);
69       if (!inqueue) use->since_activity= 0; /* reset idle counter */
70       while (spare>0) {
71         Article *art= dequeue(0);
72         if (!art) break;
73         LIST_ADDTAIL(use->waiting, art);
74         lowvol_perperiod[lowvol_circptr]++;
75         spare--;
76       }
77       conn_maybe_write(use);
78     } else if (allow_connect_start()) {
79       connect_start();
80       break;
81     } else {
82       break;
83     }
84   }
85 }
86
87 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
88   conn_maybe_write(u);
89   return OOP_CONTINUE;
90 }
91
92 void conn_maybe_write(Conn *conn) {
93   for (;;) {
94     conn_make_some_xmits(conn);
95     if (!conn->xmitu) {
96       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
97       conn->oopwriting= 0;
98       return;
99     }
100
101     void *rp= conn_write_some_xmits(conn);
102     if (rp==OOP_CONTINUE) {
103       if (!conn->oopwriting) {
104         loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
105         conn->oopwriting= 1;
106       }
107       return;
108     } else if (rp==OOP_HALT) {
109       return;
110     } else if (!rp) {
111       /* transmitted everything */
112     } else {
113       abort();
114     }
115   }
116 }
117
118 /*---------- expiry, flow control and deferral ----------*/
119
120 /*
121  * flow control notes
122  * to ensure articles go away eventually
123  * separate queue for each input file
124  *   queue expiry
125  *     every period, check head of backlog queue for expiry with SMretrieve
126  *       if too old: discard, and check next article
127  *     also check every backlog article as we read it
128  *   flush expiry
129  *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
130  *     one-off: eat queued articles from flushing and write them to defer
131  *     one-off: connfail all connections which have any articles from flushing
132  *     newly read articles from flushing go straight to defer
133  *     this should take care of it and get us out of this state
134  * to avoid filling up ram needlessly
135  *   input control
136  *     limit number of queued articles for each ipf
137  *     pause/resume inputfile tailing
138  */
139
140 void check_reading_pause_resume(InputFile *ipf) {
141   if (ipf->queue.count >= max_queue_per_ipf)
142     inputfile_reading_pause(ipf);
143   else
144     inputfile_reading_resume(ipf);
145 }
146
147 void article_defer(Article *art /* not on a queue */, int whichcount) {
148   open_defer();
149   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
150       || fflush(defer))
151     sysdie("write to defer file %s",path_defer);
152   article_done(art, whichcount);
153 }
154
155 int article_check_expired(Article *art /* must be queued, not conn */) {
156   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
157   if (artdata) { SMfreearticle(artdata); return 0; }
158
159   LIST_REMOVE(art->ipf->queue, art);
160   art->missing= 1;
161   art->ipf->counts.events[nooffer_missing]++;
162   article_done(art,-1);
163   return 1;
164 }
165
166 void inputfile_queue_check_expired(InputFile *ipf) {
167   if (!ipf) return;
168
169   for (;;) {
170     Article *art= LIST_HEAD(ipf->queue);
171     if (!art) break;
172     int expd= article_check_expired(art);
173     if (!expd) break;
174   }
175   check_reading_pause_resume(ipf);
176 }
177
178 void article_autodefer(InputFile *ipf, Article *art) {
179   ipf->autodefer++;
180   article_defer(art,-1);
181 }
182
183 static int has_article_in(const ArticleList *al, InputFile *ipf) {
184   Article *art;
185   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
186     if (art->ipf == ipf) return 1;
187   return 0;
188 }
189
190 static void autodefer_input_file_articles(InputFile *ipf) {
191   Article *art;
192   while ((art= LIST_REMHEAD(ipf->queue)))
193     article_autodefer(ipf, art);
194 }
195
196 void autodefer_input_file(InputFile *ipf) {
197   static const char *const abandon= "stuck";
198   ipf->autodefer= 0;
199
200   autodefer_input_file_articles(ipf);
201
202   if (ipf->inprogress) {
203     Conn *walk;
204     FOR_CONN(walk) {
205       if (has_article_in(&walk->waiting,  ipf) ||
206           has_article_in(&walk->priority, ipf) ||
207           has_article_in(&walk->sent,     ipf))
208         walk->quitting= abandon;
209     }
210     while (ipf->inprogress) {
211       FOR_CONN(walk)
212         if (walk->quitting == abandon) goto found;
213       abort(); /* where are they ?? */
214
215     found:
216       connfail(walk, "connection is stuck or crawling,"
217                " and we need to finish flush");
218       autodefer_input_file_articles(ipf);
219     }
220   }
221
222   check_reading_pause_resume(ipf);
223 }
224
225 /*========== article transmission ==========*/
226
227 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
228                   XmitKind kind) { /* caller must then fill in details */
229   struct iovec *v= &conn->xmit[conn->xmitu];
230   XmitDetails *d= &conn->xmitd[conn->xmitu++];
231   v->iov_base= (char*)data;
232   v->iov_len= len;
233   d->kind= kind;
234   return d;
235 }
236
237 static void xmit_noalloc(Conn *conn, const char *data, int len) {
238   xmit_core(conn,data,len, xk_Const);
239 }
240 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
241
242 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
243   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
244   d->info.sm_art= ah;
245 }
246
247 void xmit_free(XmitDetails *d) {
248   switch (d->kind) {
249   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
250   case xk_Const:                                  break;
251   default: abort();
252   }
253 }
254
255 void *conn_write_some_xmits(Conn *conn) {
256   /* return values:
257    *      0:            nothing more to write, no need to call us again
258    *      OOP_CONTINUE: more to write but fd not writeable
259    *      OOP_HALT:     disaster, have destroyed conn
260    */
261   for (;;) {
262     int count= conn->xmitu;
263     if (!count) return 0;
264
265     if (count > IOV_MAX) count= IOV_MAX;
266     ssize_t rs= writev(conn->fd, conn->xmit, count);
267     if (rs < 0) {
268       if (isewouldblock(errno)) return OOP_CONTINUE;
269       connfail(conn, "write failed: %s", strerror(errno));
270       return OOP_HALT;
271     }
272     assert(rs > 0);
273
274     int done;
275     for (done=0; rs; ) {
276       assert(done<conn->xmitu);
277       struct iovec *vp= &conn->xmit[done];
278       XmitDetails *dp= &conn->xmitd[done];
279       assert(vp->iov_len <= SSIZE_MAX);
280       if ((size_t)rs >= vp->iov_len) {
281         rs -= vp->iov_len;
282         xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
283         done++;
284       } else {
285         vp->iov_base= (char*)vp->iov_base + rs;
286         vp->iov_len -= rs;
287         break; /* rs -= rs */
288       }
289     }
290     int newu= conn->xmitu - done;
291     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
292     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
293     conn->xmitu= newu;
294   }
295 }
296
297 void conn_make_some_xmits(Conn *conn) {
298   for (;;) {
299     if (conn->xmitu+5 > CONNIOVS)
300       break;
301
302     Article *art= LIST_REMHEAD(conn->priority);
303     if (!art) art= LIST_REMHEAD(conn->waiting);
304     if (!art) break;
305
306     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
307       /* actually send it */
308
309       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
310
311       art->state=
312         art->state == art_Unchecked ? art_Unsolicited :
313         art->state == art_Wanted    ? art_Wanted      :
314         (abort(),-1);
315
316       if (!artdata) art->missing= 1;
317       art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
318
319       if (conn->stream) {
320         if (artdata) {
321           XMIT_LITERAL("TAKETHIS ");
322           xmit_noalloc(conn, art->messageid, art->midlen);
323           XMIT_LITERAL("\r\n");
324           xmit_artbody(conn, artdata);
325         } else {
326           article_done(art, -1);
327           continue;
328         }
329       } else {
330         /* we got 235 from IHAVE */
331         if (artdata) {
332           xmit_artbody(conn, artdata);
333         } else {
334           XMIT_LITERAL(".\r\n");
335         }
336       }
337
338       LIST_ADDTAIL(conn->sent, art);
339
340     } else {
341       /* check it */
342
343       if (conn->stream)
344         XMIT_LITERAL("CHECK ");
345       else
346         XMIT_LITERAL("IHAVE ");
347       xmit_noalloc(conn, art->messageid, art->midlen);
348       XMIT_LITERAL("\r\n");
349
350       assert(art->state == art_Unchecked);
351       art->ipf->counts.results[art->state][RC_sent]++;
352       LIST_ADDTAIL(conn->sent, art);
353     }
354   }
355 }
356