X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=innduct.git;a=blobdiff_plain;f=xmit.c;h=c4b1854717333b05249bf009e33387fe9b8d13de;hp=dd909e41128a20d6e6df89b0ba4a1a1fba8b67d6;hb=a34c62479ae1f91aac7b30d3d5f1a5106a6635f5;hpb=f4aee95c41a0d6231d115386b8fbb23f6b8e349a diff --git a/xmit.c b/xmit.c index dd909e4..c4b1854 100644 --- a/xmit.c +++ b/xmit.c @@ -1,3 +1,34 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * xmit.c - transmitting checks and articles, flow control, expiry + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + +const char *const artstate_names[]= + { "Unchecked", "Wanted", "Unsolicited", 0 }; + /*---------- assigning articles to conns, and transmitting ----------*/ static Article *dequeue_from(int peek, InputFile *ipf) { @@ -18,13 +49,20 @@ static Article *dequeue(int peek) { return 0; } -static void check_assign_articles(void) { +static void conn_inqueue_spare(const Conn *conn, + int *inqueue_r, int *spare_r) { + int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count; + int spare= conn->max_queue - inqueue; + if (inqueue_r) *inqueue_r= inqueue; + if (spare_r) *spare_r= spare; +} + +void check_assign_articles(void) { for (;;) { if (!dequeue(1)) break; Conn *walk, *use=0; - int spare=0, inqueue=0; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -34,15 +72,16 @@ static void check_assign_articles(void) { */ FOR_CONN(walk) { if (walk->quitting) continue; - inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; - spare= walk->max_queue - inqueue; + int inqueue, spare; + conn_inqueue_spare(walk, &inqueue, &spare); assert(inqueue <= max_queue_per_conn); assert(spare >= 0); if (inqueue==0) /*idle*/ { if (!use) use= walk; } else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + int inqueue, spare; + conn_inqueue_spare(use, &inqueue, &spare); if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= dequeue(0); @@ -66,7 +105,7 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { return OOP_CONTINUE; } -static void conn_maybe_write(Conn *conn) { +void conn_maybe_write(Conn *conn) { for (;;) { conn_make_some_xmits(conn); if (!conn->xmitu) { @@ -114,14 +153,14 @@ static void conn_maybe_write(Conn *conn) { * pause/resume inputfile tailing */ -static void check_reading_pause_resume(InputFile *ipf) { +void check_reading_pause_resume(InputFile *ipf) { if (ipf->queue.count >= max_queue_per_ipf) inputfile_reading_pause(ipf); else inputfile_reading_resume(ipf); } -static void article_defer(Article *art /* not on a queue */, int whichcount) { +void article_defer(Article *art /* not on a queue */, int whichcount) { open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) @@ -129,13 +168,13 @@ static void article_defer(Article *art /* not on a queue */, int whichcount) { article_done(art, whichcount); } -static int article_check_expired(Article *art /* must be queued, not conn */) { +int article_check_expired(Article *art /* must be queued, not conn */) { ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); if (artdata) { SMfreearticle(artdata); return 0; } LIST_REMOVE(art->ipf->queue, art); art->missing= 1; - art->ipf->count_nooffer_missing++; + art->ipf->counts.events[nooffer_missing]++; article_done(art,-1); return 1; } @@ -145,13 +184,14 @@ void inputfile_queue_check_expired(InputFile *ipf) { for (;;) { Article *art= LIST_HEAD(ipf->queue); + if (!art) break; int expd= article_check_expired(art); if (!expd) break; } check_reading_pause_resume(ipf); } -static void article_autodefer(InputFile *ipf, Article *art) { +void article_autodefer(InputFile *ipf, Article *art) { ipf->autodefer++; article_defer(art,-1); } @@ -169,7 +209,7 @@ static void autodefer_input_file_articles(InputFile *ipf) { article_autodefer(ipf, art); } -static void autodefer_input_file(InputFile *ipf) { +void autodefer_input_file(InputFile *ipf) { static const char *const abandon= "stuck"; ipf->autodefer= 0; @@ -220,7 +260,7 @@ static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { d->info.sm_art= ah; } -static void xmit_free(XmitDetails *d) { +void xmit_free(XmitDetails *d) { switch (d->kind) { case xk_Artdata: SMfreearticle(d->info.sm_art); break; case xk_Const: break; @@ -228,7 +268,7 @@ static void xmit_free(XmitDetails *d) { } } -static void *conn_write_some_xmits(Conn *conn) { +void *conn_write_some_xmits(Conn *conn) { /* return values: * 0: nothing more to write, no need to call us again * OOP_CONTINUE: more to write but fd not writeable @@ -270,7 +310,7 @@ static void *conn_write_some_xmits(Conn *conn) { } } -static void conn_make_some_xmits(Conn *conn) { +void conn_make_some_xmits(Conn *conn) { for (;;) { if (conn->xmitu+5 > CONNIOVS) break; @@ -290,7 +330,7 @@ static void conn_make_some_xmits(Conn *conn) { (abort(),-1); if (!artdata) art->missing= 1; - art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++; + art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++; if (conn->stream) { if (artdata) { @@ -324,7 +364,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); assert(art->state == art_Unchecked); - art->ipf->counts[art->state][RC_sent]++; + art->ipf->counts.results[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } }