From: Ian Jackson Date: Tue, 23 Mar 2010 17:27:24 +0000 (+0000) Subject: Much implementation. Before rearrange things a bit. X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=82461c8e9ade04d1bcb3786a98e57011b9324f0e Much implementation. Before rearrange things a bit. --- diff --git a/backends/innduct.c b/backends/innduct.c index 51b4233..ba4c6bd 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,19 @@ /* + * Newsfeeds file entries should look like this: + * host.name.of.site[/exclude,exclude,...]\ + * :pattern,pattern...[/distribution,distribution...]\ + * :Tf,Wnm + * : + * or + * sitename[/exclude,exclude,...]\ + * :pattern,pattern...[/distribution,distribution...]\ + * :Tf,Wnm + * :host.name.of.site + * * Four files full of - * token article + * token messageid + * or might be blanked out + * .... * * site.name_duct.lock lock preventing multiple ducts * holder of this lock is "duct" @@ -16,7 +29,8 @@ * created, written, used by duct * site.name_backlog.lock lock taken out by innxmit wrapper * holder and its child are "xmit" - * site.name_backlog_ 431'd articles, ready for innxmit + * site.name_backlog_. + * 431'd articles, ready for innxmit * created (link/mv) by duct * read by xmit * unlinked by xmit @@ -127,6 +141,11 @@ static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; static const char *remote_host; +static double accept_proportion; +static double nocheck_thresh= 0.95; +static double nocheck_decay= 1-1/100; +static int nocheck, nocheck_reported; + #define ISNODE(T) T *next, *back; #define LIST(T) struct { T *head, *tail, *tailpred; int count; } @@ -146,11 +165,33 @@ static const char *remote_host; #define LIST_INSERT(l,n,pred) \ (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++) + +#define RESULT_COUNTS \ + RC(offered) \ + RC(sent) \ + RC(unwanted) \ + RC(accepted) \ + RC(rejected) \ + RC(deferred) + +typedef enum { +#define RC_INDEX(x) RCI_##x + RESULT_COUNTS + RCI_max +} ResultCountIndex; + +typedef struct { + int articles[2 /* checked */][RCI_max]; +} Counts; + struct Article { - char *mid; int midlen; int checked, sentbody; - fd and offset for blanking token or mid; + InputFile *ipf; + TOKEN token; + off_t offset; + int blanklen; + char messageid[1]; }; #define CONNIOVS 128 @@ -160,7 +201,7 @@ struct Article { typedef struct Conn Conn; typedef enum { - Malloc, Const, Artdata; + xk_Malloc, xk_Const, xk_Artdata; } XmitKind; typedef struct { @@ -181,6 +222,8 @@ struct Conn { int xmitu; }; +static FILE *defer; + static int filemon_init(void); static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); static void filemon_callback(void); @@ -523,6 +566,36 @@ static void conn_check_work(Conn *conn) { /*========== article transmission ==========*/ + +static XmitDetails *xmit_core(Conn *conn, const char *data, int len, + XmitKind kind) { /* caller must then fill in details */ + struct iovec *v= &conn->xmit[conn->xmitu]; + XmitDetails *d= &conn->xmitd[conn->xmitu++]; + v->iov_base= data; + v->iov_len= len; + d->kind= kind; + return d; +} + +static void xmit_noalloc(Conn *conn, const char *data, int len) { + xmit_core(conn,data,len, xk_Const); +} +#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) + +static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { + XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata); + d->info.sm_art= ah; +} + +static void xmit_free(XmitDetails *d) { + switch (d->kind) { + case xk_Malloc: free(d->info.malloc_tofree); break; + case xk_Artdata: SMfreearticle(d->info.sm_art); break; + case xk_Const: break; + default: abort(); + } +} + static void *conn_write_some_xmits(Conn *conn) { /* return values: * 0: nothing more to write, no need to call us again @@ -569,29 +642,32 @@ static void conn_make_some_xmits(Conn *conn) { Article *art= LIST_REMHEAD(queue); if (!art) break; - if (art->checked || conn->nocheck) { + if (art->checked || (conn->stream && nocheck)) { /* actually send it */ - ARTHANDLE *artdata= SMretrieve(somehow); + ARTHANDLE *artdata= SMretrieve(); if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(art->mid, art->midlen); + xmit_noalloc(conn, art->mid, art->midlen); XMIT_LITERAL("\r\n"); - xmit_artbody(artdata); + xmit_artbody(conn, artdata); } } else { /* we got 235 from IHAVE */ if (artdata) { - xmit_artbody(artdata); + xmit_artbody(conn, artdata); } else { XMIT_LITERAL(".\r\n"); } } + art->sent= 1; LIST_ADDTAIL(conn->sent, art); + counts[art->checked].sent++; + } else { /* check it */ @@ -603,6 +679,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); LIST_ADDTAIL(conn->sent, art); + counts[art->checked].offered++; } } } @@ -615,6 +692,94 @@ static const oop_rd_style peer_rd_style= { OOP_RD_SHORTREC_FORBID }; +static Article *article_reply_check(Connection *conn, const char *response, + int code_indicates_streaming, + const char *sanitised_response) { + Article *art= LIST_REMHEAD(conn->sent); + + if (!art) { + warn("peer gave unexpected response when no commands outstanding: %s", + sanitised_response); + goto failed; + } + + if (code_indicates_streaming) { + assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ + if (!conn->stream) { + warn("peer gave streaming response code " + " to IHAVE or subsequent body: %s", sanitised_response); + goto failed; + } + const char *got_mid= response+4; + int got_midlen= strcspn(got_mid, " \n\r"); + if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { + warn("peer gave streaming response with syntactically invalid" + " messageid: %s", sanitised_response); + goto failed; + } + if (got_midlen != art->midlen || + memcmp(got_mid, art->messageid, got_midlen)) { + peer("peer gave streaming response code to wrong article -" + " probable synchronisation problem; we offered: %s; peer said: %s", + art->messageid, sanitised_response); + goto failed; + } + } else { + if (conn->stream) { + warn("peer gave non-streaming response code to CHECK/TAKETHIS: %s", + sanitised_response); + goto failed; + } + } + + return art; + + failed: + conn_failed(conn); + return 0; +} + +static void update_nocheck(int accepted) { + accept_proportion *= accept_decay; + accept_proportion += accepted; + nocheck= accept_proportion >= nocheck_thresh; + if (nocheck && !nocheck_reported) { + notice("entering nocheck mode for the first time"); + nocheck_reported= 1; + } +} + +static void article_done(Connection *conn, Article *art, int whichcount) { + *count++; + counts.articles[art->checked][whichcount]++; + if (whichcount == RC_accepted) update_nocheck(1); + else if (whichcount == RC_unwanted) update_nocheck(0); + + InputFile *ipf= art->ipf; + while (art->blanklen) { + static const char spaces[]= + " " + " " + " " + " "; + int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1; + int r= pwrite(ipf->fd, spaces, w, art->offset); + if (r==-1) { + if (errno==EINTR) continue; + sysdie("failed to blank entry for %s (length %d at offset %lu) in %s", + art->messageid, art->blanklen, art->offset, ipf->path); + } + assert(r>=0 && r<=w); + art->blanklen -= w; + art->offset += w; + } + + ipf->inprogress--; + assert(ipf->inprogress >= 0); + + free(art); +} + static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { @@ -657,48 +822,193 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return; } + Article *art; + +#define GET_ARTICLE \ + art= article_reply_check(conn, data, code_streaming, sani); \ + if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ + +#define ARTICLE_DEALTWITH(streaming,how) \ + code_streaming= (streaming) \ + GET_ARTICLE; \ + article_done(conn, art, RC_##how); break; + + int code_streaming= 0; + switch (code) { - case 438: /* CHECK says they have it */ - case 435: /* IHAVE says they have it */ - ARTICLE_DEALTWITH(1,unwanted); - break; + + case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */ + case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */ + + case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */ + case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */ + + case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */ + case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */ case 238: /* CHECK says send it */ + code_streaming= 1; case 335: /* IHAVE says send it */ + GET_ARTICLE; count_checkedwanted++; - Article *art= LIST_REMHEAD(conn->sent); - art->checked= 1; LIST_ADDTAIL(conn->queue); - break; - - case 235: /* IHAVE says thanks */ - case 239: /* TAKETHIS says thanks */ - ARTICLE_DEALTWITH(1,accepted); - break; - - case 439: /* TAKETHIS says rejected */ - case 437: /* IHAVE says rejected */ - ARTICLE_DEALTWITH(1,rejected); + if (art->checked) { + warn("peer gave %d response to article body",code); + goto failed; + } + art->checked= 1; break; case 431: /* CHECK or TAKETHIS says try later */ + code_streaming= 1; case 436: /* IHAVE says try later */ - ARTICLE_DEALTWITH(0,deferred); + GET_ARTICLE; + if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 + || fflush(defer)) + sysdie("write to defer file %s",path_ductdefer); + article_done(conn, art, RC_deferred); break; case 400: warn("peer has stopped accepting articles: %s", sani); goto failed; case 503: warn("peer timed us out: %s", sani); goto failed; - default: warn("peer sent unexpected message: %s", sani); + default: warn("peer sent unexpected message: %s", sani); goto failed; + failed: conn_failed(conn); return OOP_CONTINUE;; } + check_check_work(conn); return OOP_CONTINUE; } /*========== monitoring of input file ==========*/ +static void feedfile_eof(InputFile *ipf) { + assert(ipf != main_input_file); /* promised by tailing_try_read */ + assert(ipf == old_input_file); + assert(sms == sm_SEPARATED); + sms= sm_FINISHING; + inputfile_tailing_stop(ipf); + inputfile_tailing_start(main_input_file); +} + +static void statmc_finishdone(void) { + time_t now; + struct stat stab; + + assert(sms == sm_FINISHING); + + r= fstat(fileno(defer), &stab); + if (r) sysdie("check defer file %s", path_defer); + + if (fclose(defer)) sysdie("could not close defer file %s", path_defer); + defer= 0; + + now= time(0); + if (now==-1) sysdie("could not get current time for backlog filename"); + + char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, + (unsigned long)now, + (unsigned long)stab.st_ino); + if (link(path_defer, path_backlog)) + sysdie("could not install defer file %s as backlog file %s", + path_defer, backlog); + if (unlink(path_defer)) + sysdie("could not unlink old defer link %s to backlog file %s", + path_defer, backlog); + open_defer(); + + close_input_file(old_input_file); + old_input_file= 0; + + if (unlink(path_duct)) + sysdie("could not unlink old duct file %s", path_duct); + + sms= sm_NORMAL; +} + +static InputFile *open_input_file(const char *path) { + int fd= open(path, O_RDONLY); + if (fd<0) { + if (errno==ENOENT) return 0; + sysdie("unable to open input file %s", path); + } + + InputFile *ipf= xmalloc(sizeof(InputFile)); + memset(ipf,0,sizeof(*ipf)); + + ipf->readable.on_readable= tailing_on_readable; + ipf->readable.on_cancel= tailing_on_cancel; + ipf->readable.try_read= tailing_try_read; + + ipf->fd= fd; + ipf->path= path; + + return ipf; +} + +static void close_input_file(InputFile *ipf) { + assert(!ipf->readable_callback); /* must have had ->on_cancel */ + assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ + assert(!ipf->rd); /* must have had inputfile_tailing_stop */ + assert(!ipf->inprogress); /* no dangling pointers pointing here */ + + if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + free(ipf); +} + +/*---------- dealing with articles read in the input file ----------*/ + +typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, + const char *data, size_t recsz, + void *ipf_v) { + InputFile *ipf= ipf_v; + Article *art; + char tokentextbuf[sizeof(TOKEN)*2+3]; + + if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; } + + if (data[0] && data[0]!=' ') { + char *space= strchr(data,' '); + int tokenlen= space-data; + int midlen= (int)recsz-tokenlen-1; + if (midlen < 0) goto bad_data; + + if (tokenlen != sizeof(TOKEN)*2+2) goto bad_data; + memcpy(tokentextbuf, data, tokenlen); + tokentextbuf[tokenlen]= 0; + if (!IsToken(tokentextbuf)) goto bad_data; + + art= xmalloc(sizeof(*art) - 1 + midlen + 1); + art->offset= ipf->offset; + art->blanklen= recsz; + art->midlen= midlen; + art->checked= art->sentbody= 0; + art->ipf= ipf; ipf->inprogress++; + art->token= TextToToken(tokentextbuf); + strcpy(art->messageid, space+1); + LIST_ADDTAIL(queue, art); + } + ipf->offset += recsz + 1; + + if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + int r= link(feedfile, duct_path); + if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path); + /* => Hardlinked */ + + r= unlink(feedfile); + if (r) sysdie("unlink old feedfile link %s", feedfile); + /* => Moved */ + + spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */ + } + + check_master_queue(); +} + /*---------- tailing input file ----------*/ static void filemon_start(InputFile *ipf) { @@ -708,7 +1018,7 @@ static void filemon_start(InputFile *ipf) { memset(ipf->filemon, 0, sizeof(*ipf->filemon)); filemon_method_startfile(ipf, ipf->filemon); } - + static void filemon_stop(InputFile *ipf) { if (!ipf->filemon) return; filemon_method_stopfile(ipf, ipf->filemon); @@ -719,7 +1029,7 @@ static void filemon_stop(InputFile *ipf) { static void filemon_callback(InputFile *ipf) { ipf->readable_callback(ipf->readable_callback_user); } - + static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, void *user) { InputFile *ipf= user; @@ -731,6 +1041,7 @@ static void on_cancel(struct oop_readable *rable) { if (ipf->filemon) filemon_stopfile(ipf); loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + ipf->readable_callback= 0; } static int tailing_on_readable(struct oop_readable *rable, @@ -756,7 +1067,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, return r; } } - + /*---------- filemon implemented with inotify ----------*/ #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON) @@ -850,7 +1161,7 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_NUL_FORBID, OOP_RD_SHORTREC_EOF, }; - + static void inputfile_tailing_start(InputFile *ipf) { assert(!ipf->fd); ipf->readable->on_readable= tailing_on_readable; @@ -891,8 +1202,8 @@ static void inputfile_tailing_stop(InputFile *ipf) { | TIMEOUT |`--------------------------. | | install defer as backlog - ,--------->| | exit - | | OPEN F SUCCEEDS V + | OPEN F SUCCEEDS | exit + ,--------->| V | V ========= | ======== (ESRCH) | NORMAL [Dropped] @@ -974,13 +1285,103 @@ typedef struct { oop_read *rd; long inprogress; /* no. of articles read but not processed */ + off_t offset; } InputFile; +typedef enum { + sm_WAITING, + sm_NORMAL, + sm_FLUSHING, + sm_FLUSHFAIL, + sm_DROPPING, + sm_SEPARATED, + sm_FINISHING; +} StateMachineState; + +static InputFile *main_input_file, *old_input_file; +static StateMachineState sms; +static int waiting_periods_sofar; + +static void open_defer(void) { + struct stat stab; + + assert(!defer); + defer= fopen(path_ductdefer, "a+"); + if (!defer) sysdie("could not open defer file %s", path_ductdefer); + + /* truncate away any half-written records */ + + r= fstat(fileno(defer), &stab); + if (r) sysdie("could not stat newly opened defer file %s", path_ductdefer); + + if (stab.st_size > LONG_MAX) + die("defer file %s size is far too large", path_ductdefer); + + if (!stab.st_size) + return; + + long orgsize= stab.st_size; + long truncto= stab.st_size; + for (;;) { + if (!truncto) break; /* was only (if anything) one half-truncated record */ + if (fseek(defer, truncto-1, SEEK_SET) < 0) + sysdie("seek in defer file %s while truncating partial", path_ductdefer); + + r= getc(defer); + if (r==EOF) { + if (ferror(defer)) + sysdie("failed read from defer file %s", path_ductdefer); + else + die("defer file %s shrank while we were checking it!", path_ductdefer); + } + if (r=='\n') break; + truncto--; + } + + if (stab.st_size != truncto) { + warn("truncating half-record at end of defer file %s -" + " shrinking by %ld bytes from %ld to %ld", + path_ductdefer, orgsize - truncto, orgsize, truncto); + + if (fflush(defer)) sysdie("could not flush defer file %s", path_ductdefer); + if (ftruncate(fileno(defer), truncto)) + sysdie("could not truncate defer file %s", path_ductdefer); + + } else { + info("continuing existing defer file %s (%ld bytes)", + path_ductdefer, orgsize); + } + if (fseek(defer, truncto, SEEK_SET)) + sysdie("could not seek to new end of defer file %s", path_ductdefer); +} + static void statemc_init(void) { + struct stat stab; + path_ductlock= xasprintf("%s_duct.lock", feedfile); path_duct= xasprintf("%s_duct", feedfile); path_ductdefer= xasprintf("%s_duct.defer", feedfile); + if (lstat(path_ductdefer, &stab)) { + if (errno!=ENOENT) sysdie("could not check defer file %s", path_defer); + } else { + if (!S_ISREG(stab.st_mode)) + die("defer file %s not a plain file (mode 0%lo)", + path_defer, (unsigned long)stab.st_mode); + switch (stab.st_nlink==1) { + case 1: /* ok */ break; + case 2: + if (unlink(path_defer)) + sysdie("could not unlink stale defer file link %s (presumably" + " hardlink to backlog file)", path_defer); + break; + default: + die("defer file %s has unexpected link count %d", + path_defer, stab.st_nlink); + } + } + open_defer(); + int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); if (lockfd<0) sysdie("open lockfile %s", path_ductlock); @@ -994,26 +1395,69 @@ static void statemc_init(void) { die("another duct holds the lockfile"); sysdie("fcntl F_SETLK lockfile %s", path_ductlock); } -} -static void statemc_poll(void) { - if (tailing_fd>=0) return; + InputFile *file_d= open_input_file(path_duct); + + if (file_d) { + struct stat stab_f, stab_d; + + r= stat(feedfile, &stab_f); + if (r) { + if (errno!=ENOENT) sysdie("check feed file %s", feedfile); + /* D exists, F ENOENT => Moved */ + goto found_moved; + } - int d_fd= open(path_duct, O_RDWR); - if (d_fd<0) - if (errno!=ENOENT) sysdie("open duct file %s", path_duct); + /* F and D both exist */ - int f_fd= open(feedfile, O_RDWR); - if (f_fd<0) - if (errno!=ENOENT) sysdie("open feed file %s", feedfile); + r= fstat(file_d->fd, &stab_d); + if (r) sysdie("check duct file %s", ductfile); - if (d_fd<0) { - if (f_fd>=0) - start_tailing(f_fd); + if (stab_d.st_ino == stab_f.st_ino && + stab_d.st_dev == stab_f.st_dev) { + /* F==D => Hardlinked*/ + r= unlink(path_duct); + if (r) sysdie("unlink feed file %s during startup", feedfile); + found_moved: + /* => Moved */ + startup_set_input_file(file_d); + spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */ + } else { + /* F!=D => Separated */ + sms= sm_SEPARATED; + startup_set_input_file(file_d); + } + } else { /*!file_d*/ + sms= sm_WAITING; + statemc_waiting_poll(); + } +} + +static void statemc_poll(void) { + if (sms == sm_WAITING) statemc_waiting_poll(); + if (sms == sm_FINISHING && !old_input_file->inprogress) statemc_finishdone(); +} + +static void statemc_waiting_poll(void) { + InputFile *file_f= open_input_file(feedfile); + if (!file_f) { + if (waiting_periods_sofar++ > waiting_timeout_periods) + die("timed out waiting for innd to create feed file %s", feedfile); return; } + startup_set_input_file(file_d); + sms= sm_NORMAL; +} + +static void startup_set_input_file(InputFile *f) { + assert(!main_input_file); + main_input_file= f; + inputfile_tailing_start(f); +} +/*========== flushing the feed ==========*/ + /*========== main program ==========*/