chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / xmit.c
1 /*---------- assigning articles to conns, and transmitting ----------*/
2
3 static Article *dequeue_from(int peek, InputFile *ipf) {
4   if (!ipf) return 0;
5   if (peek) return LIST_HEAD(ipf->queue);
6
7   Article *art= LIST_REMHEAD(ipf->queue);
8   if (!art) return 0;
9   check_reading_pause_resume(ipf);
10   return art;
11 }
12
13 static Article *dequeue(int peek) {
14   Article *art;
15   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
16   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
17   art= dequeue_from(peek, main_input_file);      if (art) return art;
18   return 0;
19 }
20
21 static void check_assign_articles(void) {
22   for (;;) {
23     if (!dequeue(1))
24       break;
25
26     Conn *walk, *use=0;
27     int spare=0, inqueue=0;
28
29     /* Find a connection to offer this article.  We prefer a busy
30      * connection to an idle one, provided it's not full.  We take the
31      * first (oldest) and since that's stable, it will mean we fill up
32      * connections in order.  That way if we have too many
33      * connections, the spare ones will go away eventually.
34      */
35     FOR_CONN(walk) {
36       if (walk->quitting) continue;
37       inqueue= walk->sent.count + walk->priority.count
38              + walk->waiting.count;
39       spare= walk->max_queue - inqueue;
40       assert(inqueue <= max_queue_per_conn);
41       assert(spare >= 0);
42       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
43       else if (spare>0) /*working*/ { use= walk; break; }
44     }
45     if (use) {
46       if (!inqueue) use->since_activity= 0; /* reset idle counter */
47       while (spare>0) {
48         Article *art= dequeue(0);
49         if (!art) break;
50         LIST_ADDTAIL(use->waiting, art);
51         lowvol_perperiod[lowvol_circptr]++;
52         spare--;
53       }
54       conn_maybe_write(use);
55     } else if (allow_connect_start()) {
56       connect_start();
57       break;
58     } else {
59       break;
60     }
61   }
62 }
63
64 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
65   conn_maybe_write(u);
66   return OOP_CONTINUE;
67 }
68
69 static void conn_maybe_write(Conn *conn)  {
70   for (;;) {
71     conn_make_some_xmits(conn);
72     if (!conn->xmitu) {
73       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
74       conn->oopwriting= 0;
75       return;
76     }
77
78     void *rp= conn_write_some_xmits(conn);
79     if (rp==OOP_CONTINUE) {
80       if (!conn->oopwriting) {
81         loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
82         conn->oopwriting= 1;
83       }
84       return;
85     } else if (rp==OOP_HALT) {
86       return;
87     } else if (!rp) {
88       /* transmitted everything */
89     } else {
90       abort();
91     }
92   }
93 }
94
95 /*---------- expiry, flow control and deferral ----------*/
96
97 /*
98  * flow control notes
99  * to ensure articles go away eventually
100  * separate queue for each input file
101  *   queue expiry
102  *     every period, check head of backlog queue for expiry with SMretrieve
103  *       if too old: discard, and check next article
104  *     also check every backlog article as we read it
105  *   flush expiry
106  *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
107  *     one-off: eat queued articles from flushing and write them to defer
108  *     one-off: connfail all connections which have any articles from flushing
109  *     newly read articles from flushing go straight to defer
110  *     this should take care of it and get us out of this state
111  * to avoid filling up ram needlessly
112  *   input control
113  *     limit number of queued articles for each ipf
114  *     pause/resume inputfile tailing
115  */
116
117 static void check_reading_pause_resume(InputFile *ipf) {
118   if (ipf->queue.count >= max_queue_per_ipf)
119     inputfile_reading_pause(ipf);
120   else
121     inputfile_reading_resume(ipf);
122 }
123
124 static void article_defer(Article *art /* not on a queue */, int whichcount) {
125   open_defer();
126   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
127       || fflush(defer))
128     sysdie("write to defer file %s",path_defer);
129   article_done(art, whichcount);
130 }
131
132 static int article_check_expired(Article *art /* must be queued, not conn */) {
133   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
134   if (artdata) { SMfreearticle(artdata); return 0; }
135
136   LIST_REMOVE(art->ipf->queue, art);
137   art->missing= 1;
138   art->ipf->count_nooffer_missing++;
139   article_done(art,-1);
140   return 1;
141 }
142
143 void inputfile_queue_check_expired(InputFile *ipf) {
144   if (!ipf) return;
145
146   for (;;) {
147     Article *art= LIST_HEAD(ipf->queue);
148     int expd= article_check_expired(art);
149     if (!expd) break;
150   }
151   check_reading_pause_resume(ipf);
152 }
153
154 static void article_autodefer(InputFile *ipf, Article *art) {
155   ipf->autodefer++;
156   article_defer(art,-1);
157 }
158
159 static int has_article_in(const ArticleList *al, InputFile *ipf) {
160   Article *art;
161   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
162     if (art->ipf == ipf) return 1;
163   return 0;
164 }
165
166 static void autodefer_input_file_articles(InputFile *ipf) {
167   Article *art;
168   while ((art= LIST_REMHEAD(ipf->queue)))
169     article_autodefer(ipf, art);
170 }
171
172 static void autodefer_input_file(InputFile *ipf) {
173   static const char *const abandon= "stuck";
174   ipf->autodefer= 0;
175
176   autodefer_input_file_articles(ipf);
177
178   if (ipf->inprogress) {
179     Conn *walk;
180     FOR_CONN(walk) {
181       if (has_article_in(&walk->waiting,  ipf) ||
182           has_article_in(&walk->priority, ipf) ||
183           has_article_in(&walk->sent,     ipf))
184         walk->quitting= abandon;
185     }
186     while (ipf->inprogress) {
187       FOR_CONN(walk)
188         if (walk->quitting == abandon) goto found;
189       abort(); /* where are they ?? */
190
191     found:
192       connfail(walk, "connection is stuck or crawling,"
193                " and we need to finish flush");
194       autodefer_input_file_articles(ipf);
195     }
196   }
197
198   check_reading_pause_resume(ipf);
199 }
200
201 /*========== article transmission ==========*/
202
203 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
204                   XmitKind kind) { /* caller must then fill in details */
205   struct iovec *v= &conn->xmit[conn->xmitu];
206   XmitDetails *d= &conn->xmitd[conn->xmitu++];
207   v->iov_base= (char*)data;
208   v->iov_len= len;
209   d->kind= kind;
210   return d;
211 }
212
213 static void xmit_noalloc(Conn *conn, const char *data, int len) {
214   xmit_core(conn,data,len, xk_Const);
215 }
216 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
217
218 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
219   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
220   d->info.sm_art= ah;
221 }
222
223 static void xmit_free(XmitDetails *d) {
224   switch (d->kind) {
225   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
226   case xk_Const:                                  break;
227   default: abort();
228   }
229 }
230
231 static void *conn_write_some_xmits(Conn *conn) {
232   /* return values:
233    *      0:            nothing more to write, no need to call us again
234    *      OOP_CONTINUE: more to write but fd not writeable
235    *      OOP_HALT:     disaster, have destroyed conn
236    */
237   for (;;) {
238     int count= conn->xmitu;
239     if (!count) return 0;
240
241     if (count > IOV_MAX) count= IOV_MAX;
242     ssize_t rs= writev(conn->fd, conn->xmit, count);
243     if (rs < 0) {
244       if (isewouldblock(errno)) return OOP_CONTINUE;
245       connfail(conn, "write failed: %s", strerror(errno));
246       return OOP_HALT;
247     }
248     assert(rs > 0);
249
250     int done;
251     for (done=0; rs; ) {
252       assert(done<conn->xmitu);
253       struct iovec *vp= &conn->xmit[done];
254       XmitDetails *dp= &conn->xmitd[done];
255       assert(vp->iov_len <= SSIZE_MAX);
256       if ((size_t)rs >= vp->iov_len) {
257         rs -= vp->iov_len;
258         xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
259         done++;
260       } else {
261         vp->iov_base= (char*)vp->iov_base + rs;
262         vp->iov_len -= rs;
263         break; /* rs -= rs */
264       }
265     }
266     int newu= conn->xmitu - done;
267     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
268     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
269     conn->xmitu= newu;
270   }
271 }
272
273 static void conn_make_some_xmits(Conn *conn) {
274   for (;;) {
275     if (conn->xmitu+5 > CONNIOVS)
276       break;
277
278     Article *art= LIST_REMHEAD(conn->priority);
279     if (!art) art= LIST_REMHEAD(conn->waiting);
280     if (!art) break;
281
282     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
283       /* actually send it */
284
285       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
286
287       art->state=
288         art->state == art_Unchecked ? art_Unsolicited :
289         art->state == art_Wanted    ? art_Wanted      :
290         (abort(),-1);
291
292       if (!artdata) art->missing= 1;
293       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
294
295       if (conn->stream) {
296         if (artdata) {
297           XMIT_LITERAL("TAKETHIS ");
298           xmit_noalloc(conn, art->messageid, art->midlen);
299           XMIT_LITERAL("\r\n");
300           xmit_artbody(conn, artdata);
301         } else {
302           article_done(art, -1);
303           continue;
304         }
305       } else {
306         /* we got 235 from IHAVE */
307         if (artdata) {
308           xmit_artbody(conn, artdata);
309         } else {
310           XMIT_LITERAL(".\r\n");
311         }
312       }
313
314       LIST_ADDTAIL(conn->sent, art);
315
316     } else {
317       /* check it */
318
319       if (conn->stream)
320         XMIT_LITERAL("CHECK ");
321       else
322         XMIT_LITERAL("IHAVE ");
323       xmit_noalloc(conn, art->messageid, art->midlen);
324       XMIT_LITERAL("\r\n");
325
326       assert(art->state == art_Unchecked);
327       art->ipf->counts[art->state][RC_sent]++;
328       LIST_ADDTAIL(conn->sent, art);
329     }
330   }
331 }
332