3 * tailing reliable realtime streaming feeder for inn
4 * xmit.c - transmitting checks and articles, flow control, expiry
6 * Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7 * and contributors; see LICENCE.txt.
8 * SPDX-License-Identifier: GPL-3.0-or-later
/* Printable names for article states; the list is terminated by a null (0)
 * entry.
 * NOTE(review): presumably indexed by the article state enum (art_Unchecked,
 * art_Wanted, art_Unsolicited) -- confirm the order matches the enum. */
13 const char *const artstate_names[]=
14 { "Unchecked", "Wanted", "Unsolicited", 0 };
16 /*---------- assigning articles to conns, and transmitting ----------*/
18 static Article *dequeue_from(int peek, InputFile *ipf) {
20 if (peek) return LIST_HEAD(ipf->queue);
22 Article *art= LIST_REMHEAD(ipf->queue);
24 check_reading_pause_resume(ipf);
28 static Article *dequeue(int peek) {
30 art= dequeue_from(peek, flushing_input_file); if (art) return art;
31 art= dequeue_from(peek, backlog_input_file); if (art) return art;
32 art= dequeue_from(peek, main_input_file); if (art) return art;
36 static void conn_inqueue_spare(const Conn *conn,
37 int *inqueue_r, int *spare_r) {
38 int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
39 int spare= conn->max_queue - inqueue;
40 if (inqueue_r) *inqueue_r= inqueue;
41 if (spare_r) *spare_r= spare;
44 void check_assign_articles(void) {
51 /* Find a connection to offer this article. We prefer a busy
52 * connection to an idle one, provided it's not full. We take the
53 * first (oldest) and since that's stable, it will mean we fill up
54 * connections in order. That way if we have too many
55 * connections, the spare ones will go away eventually.
58 if (walk->quitting) continue;
60 conn_inqueue_spare(walk, &inqueue, &spare);
61 assert(inqueue <= max_queue_per_conn);
63 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
64 else if (spare>0) /*working*/ { use= walk; break; }
68 conn_inqueue_spare(use, &inqueue, &spare);
69 if (!inqueue) use->since_activity= 0; /* reset idle counter */
71 Article *art= dequeue(0);
73 LIST_ADDTAIL(use->waiting, art);
74 lowvol_perperiod[lowvol_circptr]++;
77 conn_maybe_write(use);
78 } else if (allow_connect_start()) {
87 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
92 void conn_maybe_write(Conn *conn) {
94 conn_make_some_xmits(conn);
96 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
101 void *rp= conn_write_some_xmits(conn);
102 if (rp==OOP_CONTINUE) {
103 if (!conn->oopwriting) {
104 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
108 } else if (rp==OOP_HALT) {
111 /* transmitted everything */
118 /*---------- expiry, flow control and deferral ----------*/
122 * to ensure articles go away eventually
123 * separate queue for each input file
125 * every period, check head of backlog queue for expiry with SMretrieve
126 * if too old: discard, and check next article
127 * also check every backlog article as we read it
129 * after too long in SEPARATED/DROPPING i.e. Separated/Finishing/Dropping
130 * one-off: eat queued articles from flushing and write them to defer
131 * one-off: connfail all connections which have any articles from flushing
132 * newly read articles from flushing go straight to defer
133 * this should take care of it and get us out of this state
134 * to avoid filling up RAM needlessly
136 * limit number of queued articles for each ipf
137 * pause/resume inputfile tailing
140 void check_reading_pause_resume(InputFile *ipf) {
141 if (ipf->queue.count >= max_queue_per_ipf)
142 inputfile_reading_pause(ipf);
144 inputfile_reading_resume(ipf);
147 void article_defer(Article *art /* not on a queue */, int whichcount) {
149 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
151 sysdie("write to defer file %s",path_defer);
152 article_done(art, whichcount);
155 int article_check_expired(Article *art /* must be queued, not conn */) {
156 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
157 if (artdata) { SMfreearticle(artdata); return 0; }
159 LIST_REMOVE(art->ipf->queue, art);
161 art->ipf->counts.events[nooffer_missing]++;
162 article_done(art,-1);
166 void inputfile_queue_check_expired(InputFile *ipf) {
170 Article *art= LIST_HEAD(ipf->queue);
172 int expd= article_check_expired(art);
175 check_reading_pause_resume(ipf);
178 void article_autodefer(InputFile *ipf, Article *art) {
180 article_defer(art,-1);
183 static int has_article_in(const ArticleList *al, InputFile *ipf) {
185 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
186 if (art->ipf == ipf) return 1;
190 static void autodefer_input_file_articles(InputFile *ipf) {
192 while ((art= LIST_REMHEAD(ipf->queue)))
193 article_autodefer(ipf, art);
196 void autodefer_input_file(InputFile *ipf) {
197 static const char *const abandon= "stuck";
200 autodefer_input_file_articles(ipf);
202 if (ipf->inprogress) {
205 if (has_article_in(&walk->waiting, ipf) ||
206 has_article_in(&walk->priority, ipf) ||
207 has_article_in(&walk->sent, ipf))
208 walk->quitting= abandon;
210 while (ipf->inprogress) {
212 if (walk->quitting == abandon) goto found;
213 abort(); /* where are they ?? */
216 connfail(walk, "connection is stuck or crawling,"
217 " and we need to finish flush");
218 autodefer_input_file_articles(ipf);
222 check_reading_pause_resume(ipf);
225 /*========== article transmission ==========*/
227 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
228 XmitKind kind) { /* caller must then fill in details */
229 struct iovec *v= &conn->xmit[conn->xmitu];
230 XmitDetails *d= &conn->xmitd[conn->xmitu++];
231 v->iov_base= (char*)data;
237 static void xmit_noalloc(Conn *conn, const char *data, int len) {
238 xmit_core(conn,data,len, xk_Const);
/* Hand the string literal `lit` to xmit_noalloc() for transmission.
 * The length passed is sizeof(lit)-1, so the terminating NUL is not sent;
 * `lit` must therefore be a literal (or array), not a pointer.
 * The macro uses a variable named `conn` from the expansion site rather
 * than taking the connection as a parameter. */
240 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
242 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
243 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
247 void xmit_free(XmitDetails *d) {
249 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
250 case xk_Const: break;
255 void *conn_write_some_xmits(Conn *conn) {
257 * 0: nothing more to write, no need to call us again
258 * OOP_CONTINUE: more to write but fd not writeable
259 * OOP_HALT: disaster, have destroyed conn
262 int count= conn->xmitu;
263 if (!count) return 0;
265 if (count > IOV_MAX) count= IOV_MAX;
266 ssize_t rs= writev(conn->fd, conn->xmit, count);
268 if (isewouldblock(errno)) return OOP_CONTINUE;
269 connfail(conn, "write failed: %s", strerror(errno));
276 assert(done<conn->xmitu);
277 struct iovec *vp= &conn->xmit[done];
278 XmitDetails *dp= &conn->xmitd[done];
279 assert(vp->iov_len <= SSIZE_MAX);
280 if ((size_t)rs >= vp->iov_len) {
282 xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
285 vp->iov_base= (char*)vp->iov_base + rs;
287 break; /* rs -= rs */
290 int newu= conn->xmitu - done;
291 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
292 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
297 void conn_make_some_xmits(Conn *conn) {
299 if (conn->xmitu+5 > CONNIOVS)
302 Article *art= LIST_REMHEAD(conn->priority);
303 if (!art) art= LIST_REMHEAD(conn->waiting);
306 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
307 /* actually send it */
309 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
312 art->state == art_Unchecked ? art_Unsolicited :
313 art->state == art_Wanted ? art_Wanted :
316 if (!artdata) art->missing= 1;
317 art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
321 XMIT_LITERAL("TAKETHIS ");
322 xmit_noalloc(conn, art->messageid, art->midlen);
323 XMIT_LITERAL("\r\n");
324 xmit_artbody(conn, artdata);
326 article_done(art, -1);
330 /* we got 235 from IHAVE */
332 xmit_artbody(conn, artdata);
334 XMIT_LITERAL(".\r\n");
338 LIST_ADDTAIL(conn->sent, art);
344 XMIT_LITERAL("CHECK ");
346 XMIT_LITERAL("IHAVE ");
347 xmit_noalloc(conn, art->messageid, art->midlen);
348 XMIT_LITERAL("\r\n");
350 assert(art->state == art_Unchecked);
351 art->ipf->counts.results[art->state][RC_sent]++;
352 LIST_ADDTAIL(conn->sent, art);