3 * tailing reliable realtime streaming feeder for inn
4 * xmit.c - transmitting checks and articles, flow control, expiry
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 const char *const artstate_names[]=
30 { "Unchecked", "Wanted", "Unsolicited", 0 };
32 /*---------- assigning articles to conns, and transmitting ----------*/
34 static Article *dequeue_from(int peek, InputFile *ipf) {
36 if (peek) return LIST_HEAD(ipf->queue);
38 Article *art= LIST_REMHEAD(ipf->queue);
40 check_reading_pause_resume(ipf);
44 static Article *dequeue(int peek) {
46 art= dequeue_from(peek, flushing_input_file); if (art) return art;
47 art= dequeue_from(peek, backlog_input_file); if (art) return art;
48 art= dequeue_from(peek, main_input_file); if (art) return art;
52 static void conn_inqueue_spare(const Conn *conn,
53 int *inqueue_r, int *spare_r) {
54 int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
55 int spare= conn->max_queue - inqueue;
56 if (inqueue_r) *inqueue_r= inqueue;
57 if (spare_r) *spare_r= spare;
60 void check_assign_articles(void) {
67 /* Find a connection to offer this article. We prefer a busy
68 * connection to an idle one, provided it's not full. We take the
69 * first (oldest) and since that's stable, it will mean we fill up
70 * connections in order. That way if we have too many
71 * connections, the spare ones will go away eventually.
74 if (walk->quitting) continue;
76 conn_inqueue_spare(walk, &inqueue, &spare);
77 assert(inqueue <= max_queue_per_conn);
79 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
80 else if (spare>0) /*working*/ { use= walk; break; }
84 conn_inqueue_spare(use, &inqueue, &spare);
85 if (!inqueue) use->since_activity= 0; /* reset idle counter */
87 Article *art= dequeue(0);
89 LIST_ADDTAIL(use->waiting, art);
90 lowvol_perperiod[lowvol_circptr]++;
93 conn_maybe_write(use);
94 } else if (allow_connect_start()) {
103 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
108 void conn_maybe_write(Conn *conn) {
110 conn_make_some_xmits(conn);
112 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
117 void *rp= conn_write_some_xmits(conn);
118 if (rp==OOP_CONTINUE) {
119 if (!conn->oopwriting) {
120 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
124 } else if (rp==OOP_HALT) {
127 /* transmitted everything */
134 /*---------- expiry, flow control and deferral ----------*/
138 * to ensure articles go away eventually
139 * separate queue for each input file
141 * every period, check head of backlog queue for expiry with SMretrieve
142 * if too old: discard, and check next article
143 * also check every backlog article as we read it
145 * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
146 * one-off: eat queued articles from flushing and write them to defer
147 * one-off: connfail all connections which have any articles from flushing
148 * newly read articles from flushing go straight to defer
149 * this should take care of it and get us out of this state
150 * to avoid filling up ram needlessly
152 * limit number of queued articles for each ipf
153 * pause/resume inputfile tailing
156 void check_reading_pause_resume(InputFile *ipf) {
157 if (ipf->queue.count >= max_queue_per_ipf)
158 inputfile_reading_pause(ipf);
160 inputfile_reading_resume(ipf);
163 void article_defer(Article *art /* not on a queue */, int whichcount) {
165 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
167 sysdie("write to defer file %s",path_defer);
168 article_done(art, whichcount);
171 int article_check_expired(Article *art /* must be queued, not conn */) {
172 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
173 if (artdata) { SMfreearticle(artdata); return 0; }
175 LIST_REMOVE(art->ipf->queue, art);
177 art->ipf->counts.events[nooffer_missing]++;
178 article_done(art,-1);
182 void inputfile_queue_check_expired(InputFile *ipf) {
186 Article *art= LIST_HEAD(ipf->queue);
187 int expd= article_check_expired(art);
190 check_reading_pause_resume(ipf);
193 void article_autodefer(InputFile *ipf, Article *art) {
195 article_defer(art,-1);
198 static int has_article_in(const ArticleList *al, InputFile *ipf) {
200 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
201 if (art->ipf == ipf) return 1;
205 static void autodefer_input_file_articles(InputFile *ipf) {
207 while ((art= LIST_REMHEAD(ipf->queue)))
208 article_autodefer(ipf, art);
211 void autodefer_input_file(InputFile *ipf) {
212 static const char *const abandon= "stuck";
215 autodefer_input_file_articles(ipf);
217 if (ipf->inprogress) {
220 if (has_article_in(&walk->waiting, ipf) ||
221 has_article_in(&walk->priority, ipf) ||
222 has_article_in(&walk->sent, ipf))
223 walk->quitting= abandon;
225 while (ipf->inprogress) {
227 if (walk->quitting == abandon) goto found;
228 abort(); /* where are they ?? */
231 connfail(walk, "connection is stuck or crawling,"
232 " and we need to finish flush");
233 autodefer_input_file_articles(ipf);
237 check_reading_pause_resume(ipf);
240 /*========== article transmission ==========*/
242 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
243 XmitKind kind) { /* caller must then fill in details */
244 struct iovec *v= &conn->xmit[conn->xmitu];
245 XmitDetails *d= &conn->xmitd[conn->xmitu++];
246 v->iov_base= (char*)data;
252 static void xmit_noalloc(Conn *conn, const char *data, int len) {
253 xmit_core(conn,data,len, xk_Const);
255 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
257 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
258 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
262 void xmit_free(XmitDetails *d) {
264 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
265 case xk_Const: break;
270 void *conn_write_some_xmits(Conn *conn) {
272 * 0: nothing more to write, no need to call us again
273 * OOP_CONTINUE: more to write but fd not writeable
274 * OOP_HALT: disaster, have destroyed conn
277 int count= conn->xmitu;
278 if (!count) return 0;
280 if (count > IOV_MAX) count= IOV_MAX;
281 ssize_t rs= writev(conn->fd, conn->xmit, count);
283 if (isewouldblock(errno)) return OOP_CONTINUE;
284 connfail(conn, "write failed: %s", strerror(errno));
291 assert(done<conn->xmitu);
292 struct iovec *vp= &conn->xmit[done];
293 XmitDetails *dp= &conn->xmitd[done];
294 assert(vp->iov_len <= SSIZE_MAX);
295 if ((size_t)rs >= vp->iov_len) {
297 xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
300 vp->iov_base= (char*)vp->iov_base + rs;
302 break; /* rs -= rs */
305 int newu= conn->xmitu - done;
306 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
307 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
312 void conn_make_some_xmits(Conn *conn) {
314 if (conn->xmitu+5 > CONNIOVS)
317 Article *art= LIST_REMHEAD(conn->priority);
318 if (!art) art= LIST_REMHEAD(conn->waiting);
321 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
322 /* actually send it */
324 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
327 art->state == art_Unchecked ? art_Unsolicited :
328 art->state == art_Wanted ? art_Wanted :
331 if (!artdata) art->missing= 1;
332 art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
336 XMIT_LITERAL("TAKETHIS ");
337 xmit_noalloc(conn, art->messageid, art->midlen);
338 XMIT_LITERAL("\r\n");
339 xmit_artbody(conn, artdata);
341 article_done(art, -1);
345 /* we got 235 from IHAVE */
347 xmit_artbody(conn, artdata);
349 XMIT_LITERAL(".\r\n");
353 LIST_ADDTAIL(conn->sent, art);
359 XMIT_LITERAL("CHECK ");
361 XMIT_LITERAL("IHAVE ");
362 xmit_noalloc(conn, art->messageid, art->midlen);
363 XMIT_LITERAL("\r\n");
365 assert(art->state == art_Unchecked);
366 art->ipf->counts.results[art->state][RC_sent]++;
367 LIST_ADDTAIL(conn->sent, art);