3 * tailing reliable realtime streaming feeder for inn
4 * xmit.c - transmitting checks and articles, flow control, expiry
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 const char *const artstate_names[]=
30 { "Unchecked", "Wanted", "Unsolicited", 0 };
32 /*---------- assigning articles to conns, and transmitting ----------*/
34 static Article *dequeue_from(int peek, InputFile *ipf) {
36 if (peek) return LIST_HEAD(ipf->queue);
38 Article *art= LIST_REMHEAD(ipf->queue);
40 check_reading_pause_resume(ipf);
44 static Article *dequeue(int peek) {
46 art= dequeue_from(peek, flushing_input_file); if (art) return art;
47 art= dequeue_from(peek, backlog_input_file); if (art) return art;
48 art= dequeue_from(peek, main_input_file); if (art) return art;
52 static void conn_inqueue_spare(const Conn *conn,
53 int *inqueue_r, int *spare_r) {
54 int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
55 int spare= conn->max_queue - inqueue;
56 if (inqueue_r) *inqueue_r= inqueue;
57 if (spare_r) *spare_r= spare;
60 void check_assign_articles(void) {
67 /* Find a connection to offer this article. We prefer a busy
68 * connection to an idle one, provided it's not full. We take the
69 * first (oldest) and since that's stable, it will mean we fill up
70 * connections in order. That way if we have too many
71 * connections, the spare ones will go away eventually.
74 if (walk->quitting) continue;
76 conn_inqueue_spare(walk, &inqueue, &spare);
77 assert(inqueue <= max_queue_per_conn);
79 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
80 else if (spare>0) /*working*/ { use= walk; break; }
84 conn_inqueue_spare(use, &inqueue, &spare);
85 if (!inqueue) use->since_activity= 0; /* reset idle counter */
87 Article *art= dequeue(0);
89 LIST_ADDTAIL(use->waiting, art);
90 lowvol_perperiod[lowvol_circptr]++;
93 conn_maybe_write(use);
94 } else if (allow_connect_start()) {
103 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
108 void conn_maybe_write(Conn *conn) {
110 conn_make_some_xmits(conn);
112 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
117 void *rp= conn_write_some_xmits(conn);
118 if (rp==OOP_CONTINUE) {
119 if (!conn->oopwriting) {
120 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
124 } else if (rp==OOP_HALT) {
127 /* transmitted everything */
134 /*---------- expiry, flow control and deferral ----------*/
138 * to ensure articles go away eventually
139 * separate queue for each input file
141 * every period, check head of backlog queue for expiry with SMretrieve
142 * if too old: discard, and check next article
143 * also check every backlog article as we read it
145 * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
146 * one-off: eat queued articles from flushing and write them to defer
147 * one-off: connfail all connections which have any articles from flushing
148 * newly read articles from flushing go straight to defer
149 * this should take care of it and get us out of this state
150 * to avoid filling up ram needlessly
152 * limit number of queued articles for each ipf
153 * pause/resume inputfile tailing
156 void check_reading_pause_resume(InputFile *ipf) {
157 if (ipf->queue.count >= max_queue_per_ipf)
158 inputfile_reading_pause(ipf);
160 inputfile_reading_resume(ipf);
163 void article_defer(Article *art /* not on a queue */, int whichcount) {
165 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
167 sysdie("write to defer file %s",path_defer);
168 article_done(art, whichcount);
171 int article_check_expired(Article *art /* must be queued, not conn */) {
172 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
173 if (artdata) { SMfreearticle(artdata); return 0; }
175 LIST_REMOVE(art->ipf->queue, art);
177 art->ipf->counts.events[nooffer_missing]++;
178 article_done(art,-1);
182 void inputfile_queue_check_expired(InputFile *ipf) {
186 Article *art= LIST_HEAD(ipf->queue);
188 int expd= article_check_expired(art);
191 check_reading_pause_resume(ipf);
194 void article_autodefer(InputFile *ipf, Article *art) {
196 article_defer(art,-1);
199 static int has_article_in(const ArticleList *al, InputFile *ipf) {
201 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
202 if (art->ipf == ipf) return 1;
206 static void autodefer_input_file_articles(InputFile *ipf) {
208 while ((art= LIST_REMHEAD(ipf->queue)))
209 article_autodefer(ipf, art);
212 void autodefer_input_file(InputFile *ipf) {
213 static const char *const abandon= "stuck";
216 autodefer_input_file_articles(ipf);
218 if (ipf->inprogress) {
221 if (has_article_in(&walk->waiting, ipf) ||
222 has_article_in(&walk->priority, ipf) ||
223 has_article_in(&walk->sent, ipf))
224 walk->quitting= abandon;
226 while (ipf->inprogress) {
228 if (walk->quitting == abandon) goto found;
229 abort(); /* where are they ?? */
232 connfail(walk, "connection is stuck or crawling,"
233 " and we need to finish flush");
234 autodefer_input_file_articles(ipf);
238 check_reading_pause_resume(ipf);
241 /*========== article transmission ==========*/
243 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
244 XmitKind kind) { /* caller must then fill in details */
245 struct iovec *v= &conn->xmit[conn->xmitu];
246 XmitDetails *d= &conn->xmitd[conn->xmitu++];
247 v->iov_base= (char*)data;
253 static void xmit_noalloc(Conn *conn, const char *data, int len) {
254 xmit_core(conn,data,len, xk_Const);
256 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
258 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
259 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
263 void xmit_free(XmitDetails *d) {
265 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
266 case xk_Const: break;
271 void *conn_write_some_xmits(Conn *conn) {
273 * 0: nothing more to write, no need to call us again
274 * OOP_CONTINUE: more to write but fd not writeable
275 * OOP_HALT: disaster, have destroyed conn
278 int count= conn->xmitu;
279 if (!count) return 0;
281 if (count > IOV_MAX) count= IOV_MAX;
282 ssize_t rs= writev(conn->fd, conn->xmit, count);
284 if (isewouldblock(errno)) return OOP_CONTINUE;
285 connfail(conn, "write failed: %s", strerror(errno));
292 assert(done<conn->xmitu);
293 struct iovec *vp= &conn->xmit[done];
294 XmitDetails *dp= &conn->xmitd[done];
295 assert(vp->iov_len <= SSIZE_MAX);
296 if ((size_t)rs >= vp->iov_len) {
298 xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
301 vp->iov_base= (char*)vp->iov_base + rs;
303 break; /* rs -= rs */
306 int newu= conn->xmitu - done;
307 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
308 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
313 void conn_make_some_xmits(Conn *conn) {
315 if (conn->xmitu+5 > CONNIOVS)
318 Article *art= LIST_REMHEAD(conn->priority);
319 if (!art) art= LIST_REMHEAD(conn->waiting);
322 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
323 /* actually send it */
325 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
328 art->state == art_Unchecked ? art_Unsolicited :
329 art->state == art_Wanted ? art_Wanted :
332 if (!artdata) art->missing= 1;
333 art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
337 XMIT_LITERAL("TAKETHIS ");
338 xmit_noalloc(conn, art->messageid, art->midlen);
339 XMIT_LITERAL("\r\n");
340 xmit_artbody(conn, artdata);
342 article_done(art, -1);
346 /* we got 235 from IHAVE */
348 xmit_artbody(conn, artdata);
350 XMIT_LITERAL(".\r\n");
354 LIST_ADDTAIL(conn->sent, art);
360 XMIT_LITERAL("CHECK ");
362 XMIT_LITERAL("IHAVE ");
363 xmit_noalloc(conn, art->messageid, art->midlen);
364 XMIT_LITERAL("\r\n");
366 assert(art->state == art_Unchecked);
367 art->ipf->counts.results[art->state][RC_sent]++;
368 LIST_ADDTAIL(conn->sent, art);