3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain amount old, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
/* Exit statuses of the connection child: connchild_event maps them to
 * conn->stream = 1 and conn->stream = 0 respectively. */
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
/* Exit statuses for the inndcomm child (presumably "flush failed" and
 * "no such site"; their use is not visible in this excerpt). */
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
/* Size limits: one feed file line; one control command line (the latter
 * is the maximum record size handed to oop_rd_read for control conns). */
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
/* VA: declares va_list `al' and starts it on the enclosing function's
 * parameter, which must be named `fmt'. */
222 #define VA va_list al; va_start(al,fmt)
/* Shorthands for GCC function attributes (printf format checking and
 * noreturn). */
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
/* NEW assigns to an existing pointer, NEW_DECL declares and initialises
 * one; both obtain sizeof(*ptr) bytes from zxmalloc. */
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
/* DUMPV prints " <text of v>=<value>" to the FILE *f in scope at the use
 * site; pfx is written directly in front of v to form the expression. */
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
/* FOR_CONN: loop header visiting every entry of the global list conns. */
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead) /* add n at head of l */
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail) /* add n at tail of l */
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) /* remove via list_remhead, cast to l's element type */
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) /* remove via list_remtail, cast to l's element type */
/* LIST_INIT zeroes the typed list's count and initialises the underlying
 * struct list.  LIST_HEAD / LIST_NEXT / LIST_BACK wrap list_head,
 * list_succ and list_pred, casting the result to the element pointer type
 * (via typeof) so callers need no casts. */
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322 static void check_reading_pause_resume(InputFile *ipf);
324 static void statemc_check_flushing_done(void);
325 static void statemc_check_backlog_done(void);
327 static void postfork(void);
328 static void period(void);
330 static void open_defer(void);
331 static void close_defer(void);
332 static void search_backlog_file(void);
333 static void preterminate(void);
334 static void raise_default(int signo) NORET;
335 static char *debug_report_ipf(InputFile *ipf);
337 static void inputfile_reading_start(InputFile *ipf);
338 static void inputfile_reading_stop(InputFile *ipf);
339 static void inputfile_reading_pause(InputFile *ipf);
340 static void inputfile_reading_resume(InputFile *ipf);
341 /* pause and resume are idempotent, and no-op if not done _reading_start */
343 static void filemon_start(InputFile *ipf);
344 static void filemon_stop(InputFile *ipf);
345 static void filemon_callback(InputFile *ipf);
347 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
348 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
350 static const oop_rd_style peer_rd_style;
351 static oop_rd_call peer_rd_err, peer_rd_ok;
353 /*----- configuration options -----*/
354 /* when changing defaults, remember to update the manpage */
356 static const char *sitename, *remote_host;
357 static const char *feedfile, *realsockdir="/tmp/innduct.control";
358 static int quiet_multiple=0;
359 static int become_daemon=1, try_filemon=1;
360 static int try_stream=1;
362 static const char *inndconffile;
364 static int max_connections=10;
365 static int max_queue_per_conn=200;
366 static int target_max_feedfile_size=100000;
367 static int period_seconds=60;
368 static int filepoll_seconds=5;
369 static int max_queue_per_ipf=-1;
371 static int connection_setup_timeout=200;
372 static int inndcomm_flush_timeout=100;
374 static double nocheck_thresh= 95.0; /* converted from percentage by main */
375 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
377 /* all these are initialised to seconds, and converted to periods in main */
378 static int reconnect_delay_periods=1000;
379 static int flushfail_retry_periods=1000;
380 static int backlog_retry_minperiods=50;
381 static int backlog_spontrescan_periods=300;
382 static int spontaneous_flush_periods=100000;
383 static int max_separated_periods=2000;
384 static int need_activity_periods=1000;
386 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
387 static int max_bad_data_initial= 30;
388 /* in one corrupt 4096-byte block the number of newlines has
389 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
392 /*----- statistics -----*/
394 typedef enum { /* in queue in conn->sent */
395 art_Unchecked, /* not checked, not sent checking */
396 art_Wanted, /* checked, wanted sent body as requested */
397 art_Unsolicited, /* - sent body without check */
401 static const char *const artstate_names[]=
402 { "Unchecked", "Wanted", "Unsolicited", 0 };
404 #define RESULT_COUNTS(RCS,RCN) \
413 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
414 #define RCI_TRIPLE_VALS_BASE(counts,x) \
415 counts[art_Unchecked] x \
416 + counts[art_Wanted] x \
417 + counts[art_Unsolicited] x, \
418 counts[art_Unchecked] x \
419 , counts[art_Wanted] x \
420 , counts[art_Unsolicited] x
423 #define RC_INDEX(x) RC_##x,
424 RESULT_COUNTS(RC_INDEX, RC_INDEX)
429 /*----- transmission buffers -----*/
445 /*----- core operational data structure types -----*/
448 /* This is also an instance of struct oop_readable */
449 struct oop_readable readable; /* first */
450 oop_readable_call *readable_callback;
451 void *readable_callback_user;
454 Filemon_Perfile *filemon;
456 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
458 int skippinglong, paused;
461 long inprogress; /* includes queue.count and also articles in conns */
462 long autodefer; /* -1 means not doing autodefer */
464 int counts[art_MaxState][RCI_max];
465 int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
480 #define SMS_LIST(X) \
488 enum StateMachineState {
489 #define SMS_DEF_ENUM(s) sm_##s,
490 SMS_LIST(SMS_DEF_ENUM)
493 static const char *sms_names[]= {
494 #define SMS_DEF_NAME(s) #s ,
495 SMS_LIST(SMS_DEF_NAME)
501 int fd; /* may be 0, meaning closed (during construction/destruction) */
502 oop_read *rd; /* likewise */
503 int max_queue, stream, quitting;
504 int since_activity; /* periods */
505 ArticleList waiting; /* not yet told peer */
506 ArticleList priority; /* peer says send it now */
507 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
508 struct iovec xmit[CONNIOVS];
509 XmitDetails xmitd[CONNIOVS];
514 /*----- general operational variables -----*/
516 /* main initialises */
517 static oop_source *loop;
518 static ConnList conns;
519 static char *path_lock, *path_flushing, *path_defer;
520 static char *path_control, *path_dump;
521 static char *globpat_backlog;
522 static pid_t self_pid;
524 /* statemc_init initialises */
525 static StateMachineState sms;
526 static int until_flush;
527 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
530 /* initialisation to 0 is good */
531 static int until_connect, until_backlog_nextscan;
532 static double accept_proportion;
533 static int nocheck, nocheck_reported, in_child;
535 /* for simulation, debugging, etc. */
536 int simulate_flush= -1;
538 /*========== logging ==========*/
/* Lowest-level log sink.  The visible lines pass the message to syslog
 * with vsyslog, and write it to stderr, preceded by "[pid] " once
 * self_pid is nonzero.  The lines that choose between the two outputs
 * are not visible in this excerpt. */
540 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
541 static void logcore(int sysloglevel, const char *fmt, ...) {
544 vsyslog(sysloglevel,fmt,al);
546 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
547 vfprintf(stderr,fmt,al);
/* Formats one log message and hands it to logcore.
 *   sysloglevel  syslog priority to log at
 *   pfx          severity text placed after "<sitename>"
 *   errnoval     if >= 0, ": " and strerror(errnoval) are appended;
 *                callers in this file pass -1 for "no errno"
 *   fmt, al      printf-style message */
553 static void logv(int sysloglevel, const char *pfx, int errnoval,
554 const char *fmt, va_list al) PRINTF(5,0);
555 static void logv(int sysloglevel, const char *pfx, int errnoval,
556 const char *fmt, va_list al) {
/* Fixed buffer: messages longer than 255 bytes are truncated. */
557 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
558 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
559 msgbuf[sizeof(msgbuf)-1]= 0;
/* For permission errors the level is forced to LOG_ERR whenever the
 * requested level compares >= LOG_ERR.
 * NOTE(review): with the usual syslog.h numbering more severe levels are
 * numerically smaller, so this test matches LOG_ERR and everything less
 * severe, not LOG_CRIT - confirm that is the intent. */
561 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
562 sysloglevel= LOG_ERR; /* run by wrong user, probably */
564 logcore(sysloglevel, "<%s>%s: %s%s%s",
565 sitename, pfx, msgbuf,
566 errnoval>=0 ? ": " : "",
567 errnoval>=0 ? strerror(errnoval) : "");
/* Generators for the logging entry points.  Both macros define a static
 * printf-style function `fn' which forwards to logv with the given prefix,
 * syslog level and errno value (`errno' for the sys* variants, -1 for the
 * others).  diewrap additionally marks the function noreturn and takes an
 * `estatus' (presumably the process exit status; the lines using it are
 * not visible in this excerpt).
 * Instantiated below: sysdie/die (LOG_CRIT, estatus 16), sysfatal/fatal
 * (LOG_ERR, estatus 12), syswarn/warn, notice, info, debug. */
570 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
571 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
572 static void fn(const char *fmt, ...) { \
575 logv(sysloglevel, pfx, err, fmt, al); \
579 #define logwrap(fn, pfx, sysloglevel, err) \
580 static void fn(const char *fmt, ...) PRINTF(1,2); \
581 static void fn(const char *fmt, ...) { \
583 logv(sysloglevel, pfx, err, fmt, al); \
587 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
588 diewrap(die, " critical", LOG_CRIT, -1, 16);
590 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
591 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
593 logwrap(syswarn, " warning", LOG_WARNING, errno);
594 logwrap(warn, " warning", LOG_WARNING, -1);
596 logwrap(notice, " notice", LOG_NOTICE, -1);
597 logwrap(info, " info", LOG_INFO, -1);
598 logwrap(debug, " debug", LOG_DEBUG, -1);
601 /*========== utility functions etc. ==========*/
/* vasprintf wrapper which does not return on failure: a negative result
 * is reported through sysdie.  The return statement is not visible here
 * (presumably the allocated string, which the caller then owns). */
603 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
604 static char *xvasprintf(const char *fmt, va_list al) {
606 int rc= vasprintf(&str,fmt,al);
607 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
/* Variadic front end to xvasprintf: formats fmt and its arguments into a
 * string obtained from xvasprintf. */
610 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
611 static char *xasprintf(const char *fmt, ...) {
613 char *str= xvasprintf(fmt,al);
/* Closes *fd if it is open.  A value <= 0 is treated as "not open" and
 * yields 0 immediately; the remainder (presumably close and resetting
 * *fd) is not visible in this excerpt. */
618 static int close_perhaps(int *fd) {
619 if (*fd <= 0) return 0;
/* Close which must succeed: a nonzero result r (its assignment is not
 * visible here, presumably r= close(fd)) is fatal via sysdie.  what and
 * the optional what2 (may be 0) are concatenated to name the descriptor
 * in the message. */
624 static void xclose(int fd, const char *what, const char *what2) {
626 if (r) sysdie("close %s%s",what,what2?what2:"");
/* Like xclose, but does nothing when *fd <= 0 ("not open").  What happens
 * to *fd afterwards is not visible in this excerpt. */
628 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
629 if (*fd <= 0) return;
630 xclose(*fd,what,what2);
/* fork wrapper.  Failure (-1) is fatal via sysfatal; `what' names the
 * purpose in log messages.  In the child (child == 0) postfork() is run.
 * The fork call and the return are on lines not visible here. */
634 static pid_t xfork(const char *what) {
638 if (child==-1) sysfatal("cannot fork for %s",what);
/* NOTE(review): "%ld" is paired with an (unsigned long) argument; the
 * conversion and the cast disagree in signedness. */
639 debug("forked %s %ld", what, (unsigned long)child);
640 if (!child) postfork();
/* Registers `callback' with the event loop for both OOP_READ and
 * OOP_EXCEPTION on fd, with a null user pointer. */
644 static void on_fd_read_except(int fd, oop_call_fd callback) {
645 loop->on_fd(loop, fd, OOP_READ, callback, 0);
646 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
/* Undoes on_fd_read_except: cancels the OOP_READ and OOP_EXCEPTION
 * registrations for fd. */
648 static void cancel_fd_read_except(int fd) {
649 loop->cancel_fd(loop, fd, OOP_READ);
650 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
/* Logs a warning describing a child's wait status; `what' identifies the
 * child.  Handles normal exit, death by signal (with core dump note) and
 * any other status.  Some of the conditions selecting between the
 * messages are on lines not visible in this excerpt. */
653 static void report_child_status(const char *what, int status) {
654 if (WIFEXITED(status)) {
655 int es= WEXITSTATUS(status);
657 warn("%s: child died with error exit status %d", what, es);
658 } else if (WIFSIGNALED(status)) {
659 int sig= WTERMSIG(status);
660 const char *sigstr= strsignal(sig);
661 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
/* Two variants: by signal name, or by number when the signal is unknown
 * (the test choosing between them is not visible here). */
663 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
665 warn("%s: child died due to unknown fatal signal %d%s",
666 what, sig, coredump);
668 warn("%s: child died with unknown wait status %d", what,status);
/* Kills (SIGKILL) and reaps the child *pid; every failure is fatal.
 * `what' names the child in messages.  Callers use the int result as a
 * wait status.  Any condition guarding the kill, and what is done to
 * *pid afterwards, are on lines not visible in this excerpt. */
672 static int xwaitpid(pid_t *pid, const char *what) {
675 int r= kill(*pid, SIGKILL);
676 if (r) sysdie("cannot kill %s child", what);
/* Blocking wait (no WNOHANG), so a 0 return is unexpected. */
678 pid_t got= waitpid(*pid, &status, 0);
679 if (got==-1) sysdie("cannot reap %s child", what);
680 if (got==0) die("cannot reap %s child", what);
/* Allocates sz bytes with xmalloc.  Presumably also zero-fills the block
 * (suggested by the name; the rest of the body is not visible here -
 * TODO confirm). */
687 static void *zxmalloc(size_t sz) {
688 void *p= xmalloc(sz);
/* Unlink which must succeed: a nonzero result (the call itself is on a
 * line not visible here) is fatal via sysdie.  `what' describes the file
 * in the message. */
693 static void xunlink(const char *path, const char *what) {
695 if (r) sysdie("can't unlink %s %s", path, what);
/* Current time as time_t; a result of -1 is fatal via sysdie. */
698 static time_t xtime(void) {
700 if (now==-1) sysdie("time(2) failed");
/* sigaction wrapper (old action not retrieved); failure is fatal via
 * sysdie, naming the signal with strsignal. */
704 static void xsigaction(int signo, const struct sigaction *sa) {
705 int r= sigaction(signo,sa,0);
706 if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
/* Restores the default disposition (SIG_DFL) for signo, using an
 * otherwise all-zero struct sigaction. */
709 static void xsigsetdefault(int signo) {
711 memset(&sa,0,sizeof(sa));
712 sa.sa_handler= SIG_DFL;
713 xsigaction(signo,&sa);
/* gettimeofday into *tv_r (no timezone); failure is fatal via sysdie. */
716 static void xgettimeofday(struct timeval *tv_r) {
717 int r= gettimeofday(tv_r,0);
718 if (r) sysdie("gettimeofday(2) failed");
/* Sets or clears non-blocking mode on fd through oop_fd_nonblock.  That
 * call's nonzero result is treated as an errno value: it is copied into
 * errno so that sysdie reports it. */
721 static void xsetnonblock(int fd, int nonblocking) {
722 int errnoval= oop_fd_nonblock(fd, nonblocking);
723 if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
/* Dies unless *stab describes a regular file.  path and what are used
 * only for the message, which also shows the mode in octal. */
726 static void check_isreg(const struct stat *stab, const char *path,
728 if (!S_ISREG(stab->st_mode))
729 die("%s %s not a plain file (mode 0%lo)",
730 what, path, (unsigned long)stab->st_mode);
/* fstat into *stab_r; failure is fatal via sysdie (`what' names the
 * file in the message). */
733 static void xfstat(int fd, struct stat *stab_r, const char *what) {
734 int r= fstat(fd, stab_r);
735 if (r) sysdie("could not fstat %s", what);
/* xfstat followed by check_isreg: fatal unless fd is a regular file. */
738 static void xfstat_isreg(int fd, struct stat *stab_r,
739 const char *path, const char *what) {
740 xfstat(fd, stab_r, what);
741 check_isreg(stab_r, path, what);
/* lstat `path' into *stab and insist on a regular file.
 * If enoent_r is non-null, a missing file is not an error: *enoent_r is
 * set to 1 and the function returns (with *stab not filled in); on
 * success *enoent_r is set to 0.  If enoent_r is null, ENOENT is fatal
 * like any other lstat failure. */
744 static void xlstat_isreg(const char *path, struct stat *stab,
745 int *enoent_r /* 0 means ENOENT is fatal */,
747 int r= lstat(path, stab);
/* The next two lines are presumably inside an `if (r)' whose head is not
 * visible in this excerpt. */
749 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
750 sysdie("could not lstat %s %s", what, path);
752 if (enoent_r) *enoent_r= 0;
753 check_isreg(stab, path, what);
/* True iff the two stat results refer to the same file, i.e. have equal
 * inode and device numbers.  Both must describe regular files (asserted). */
756 static int samefile(const struct stat *a, const struct stat *b) {
757 assert(S_ISREG(a->st_mode));
758 assert(S_ISREG(b->st_mode));
759 return (a->st_ino == b->st_ino &&
760 a->st_dev == b->st_dev);
/* Produces a printable, quoted rendering of `input' for log messages.
 * len >= 0 gives the number of bytes to consider; len < 0 means input is
 * NUL-terminated.  A NUL byte ends the text in either case.
 * The result lives in a single static buffer: it is overwritten by the
 * next call, so it is not reentrant and must be used before calling
 * sanitise again. */
763 static char *sanitise(const char *input, int len) {
764 static char sanibuf[100]; /* returns pointer to this buffer! */
766 const char *p= input;
767 const char *endp= len>=0 ? input+len : 0;
/* Out of room (8 bytes of headroom kept): finish with "'.." to show that
 * the text was cut short. */
771 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
772 int c= (!endp || p<endp) ? *p++ : 0;
773 if (!c) { *q++= '\''; *q=0; break; }
/* Printable ASCII other than backslash is copied; anything else becomes
 * a \xNN escape. */
774 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
/* NOTE(review): c is read through a plain `char *'.  Where char is signed,
 * bytes >= 0x80 are negative here and "%02x" would print eight hex
 * digits, more than the headroom reserved above - confirm. */
775 sprintf(q,"\\x%02x",c);
/* True if errnoval means "operation would block" (EWOULDBLOCK or EAGAIN). */
781 static int isewouldblock(int errnoval) {
782 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
785 /*========== command and control connections ==========*/
787 static int control_master;
789 typedef struct ControlConn ControlConn;
791 void (*destroy)(ControlConn*);
797 struct sockaddr_un un;
802 static const oop_rd_style control_rd_style= {
803 OOP_RD_DELIM_STRIP, '\n',
805 OOP_RD_SHORTREC_FORBID
808 static void control_destroy(ControlConn *cc) {
/* Flushes the control connection's output stream and checks it for
 * errors.  The non-short-circuit `|' makes sure fflush runs even when
 * ferror is already set.  On error it is logged; per the parameter
 * comment the connection may then be destroyed (not visible here). */
812 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
813 if (ferror(cc->out) | fflush(cc->out)) {
814 info("CTRL%d write error %s", cc->fd, strerror(errno));
/* Writes the prompt "<sitename>| " to the control connection and then
 * flushes/checks the stream with control_checkouterr. */
819 static void control_prompt(ControlConn *cc /* may destroy*/) {
820 fprintf(cc->out, "%s| ", sitename);
821 control_checkouterr(cc);
824 struct ControlCommand {
826 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
827 const char *arg, size_t argsz);
832 static const ControlCommand control_commands[];
835 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
836 const char *arg, size_t argsz)
839 fputs("commands:\n", cc->out);
840 const ControlCommand *ccmd;
841 for (ccmd=control_commands; ccmd->cmd; ccmd++)
842 fprintf(cc->out, " %s\n", ccmd->cmd);
843 fputs("NB: permissible arguments are not shown above."
844 " Not all commands listed are safe. See innduct(8).\n", cc->out);
848 int ok= trigger_flush_ok();
849 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
854 notice("terminating (CTRL%d)",cc->fd);
855 raise_default(SIGTERM);
861 /* messing with our head: */
862 CCMD(period) { period(); } /* run one period() immediately */
863 CCMD(setintarg) {
  /* Control command handler: store the integer given as the command's
   * argument into the int variable that c->xdata points to.
   * control_rd_ok passes arg as a null pointer when the command was typed
   * without an argument; atoi must not be called on that, so report the
   * problem on the control connection instead. */
  if (!arg) {
    fputs("argument required\n", cc->out);
    return;
  }
  *(int*)c->xdata= atoi(arg);
}
/* Store the table entry's fixed value c->xval into the int that c->xdata
 * points to; the _period variant then also runs period() at once. */
864 CCMD(setint) { *(int*)c->xdata= c->xval; }
865 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
/* Table of control commands: command text, handler, and for some entries
 * a pointer to an int variable (xdata) and a value (xval) for the setint
 * handlers.  control_rd_ok scans this table until an entry with a null
 * command; the terminator is not visible in this excerpt. */
867 static const ControlCommand control_commands[]= {
869 { "flush", ccmd_flush },
870 { "stop", ccmd_stop },
871 { "dump q", ccmd_dump, 0,0 },
872 { "dump a", ccmd_dump, 0,1 },
874 { "p", ccmd_period },
/* POKES expands to three entries, "<cmd>flush", "<cmd>conn" and
 * "<cmd>blscan", which set until_flush to 1, until_connect to 0 and
 * until_backlog_nextscan to 0.  "next ..." only sets the variable;
 * "prod ..." also runs period() straight away. */
876 #define POKES(cmd,func) \
877 { cmd "flush", func, &until_flush, 1 }, \
878 { cmd "conn", func, &until_connect, 0 }, \
879 { cmd "blscan", func, &until_backlog_nextscan, 0 },
880 POKES("next ", ccmd_setint)
881 POKES("prod ", ccmd_setint_period)
/* "pretend flush <n>" sets simulate_flush from its argument;
 * "wedge blscan" sets until_backlog_nextscan to -1. */
883 { "pretend flush", ccmd_setintarg, &simulate_flush },
884 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
/* oop_rd callback for a control connection (cc_v is the ControlConn).
 * An empty line just re-prompts.  Otherwise the line is matched against
 * control_commands and the matching handler is called; unmatched input
 * gets "unknown command; h for help".  The connection-closed handling
 * around the "closed" message is only partly visible in this excerpt. */
888 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
889 const char *errmsg, int errnoval,
890 const char *data, size_t recsz, void *cc_v) {
891 ControlConn *cc= cc_v;
894 info("CTRL%d closed", cc->fd);
899 if (recsz == 0) goto prompt;
901 const ControlCommand *ccmd;
902 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
903 int l= strlen(ccmd->cmd);
/* A command matches when the line starts with its text and that text is
 * followed either by the end of the line or by a space. */
904 if (recsz < l) continue;
905 if (recsz > l && data[l] != ' ') continue;
906 if (memcmp(data, ccmd->cmd, l)) continue;
/* argl is the length of the text after "<cmd> "; it is -1 when the line
 * is exactly the command, in which case the handler receives a null arg
 * (and argl converted to size_t as argsz). */
908 int argl= (int)recsz - (l+1);
909 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
913 fputs("unknown command; h for help\n", cc->out);
/* oop_rd error callback for a control connection: logs the read error
 * message.  What is then done with the connection is not visible in this
 * excerpt. */
920 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
921 const char *errmsg, int errnoval,
922 const char *data, size_t recsz, void *cc_v) {
923 ControlConn *cc= cc_v;
925 info("CTRL%d read error %s", cc->fd, errmsg);
/* Starts line-oriented reading on a new control connection; `how' is
 * used only in the "ready" log message.  Returns -1 (after a warning) if
 * the reader cannot be created or started; the success return is not
 * visible here.  Callers destroy cc on a nonzero result. */
930 static int control_conn_startup(ControlConn *cc /* may destroy*/,
932 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
933 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
/* Records are limited to MAX_CONTROL_COMMAND bytes, using
 * control_rd_style. */
935 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
/* oop_rd_read's result is treated as an errno value: copy it into errno
 * so that syswarn reports it. */
938 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
940 info("CTRL%d %s ready", cc->fd, how);
/* Destructor for the stdio control connection: cancels the reader and
 * deletes it with oop_rd_delete_tidy, whose result is treated as an
 * errno value and only warned about. */
945 static void control_stdio_destroy(ControlConn *cc) {
947 oop_rd_cancel(cc->rd);
948 errno= oop_rd_delete_tidy(cc->rd);
949 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
/* Creates the control connection that uses the process's stdio (its fd
 * and output stream are set on lines not visible here) and starts it;
 * if startup fails the connection is destroyed again. */
954 static void control_stdio(void) {
955 NEW_DECL(ControlConn *,cc);
956 cc->destroy= control_stdio_destroy;
960 int r= control_conn_startup(cc,"stdio");
961 if (r) cc->destroy(cc);
/* Destructor for an accepted (socket) control connection: cancels and
 * kills the reader, then closes the connection. */
964 static void control_accepted_destroy(ControlConn *cc) {
966 oop_rd_cancel(cc->rd);
967 oop_rd_delete_kill(cc->rd);
/* fclose also closes the underlying descriptor, so fd is zeroed to stop
 * close_perhaps closing it a second time; close_perhaps covers the case
 * where no stream was ever opened. */
969 if (cc->out) { fclose(cc->out); cc->fd=0; }
970 close_perhaps(&cc->fd);
/* Event-loop callback: the listening control socket is readable.
 * Accepts one connection, wraps it in a write-mode stdio stream and
 * starts reading commands from it.  Failures are warned about and jump
 * to the cleanup label x (not visible in this excerpt). */
974 static void *control_master_readable(oop_source *lp, int master,
975 oop_event ev, void *u) {
976 NEW_DECL(ControlConn *,cc);
977 cc->destroy= control_accepted_destroy;
979 cc->salen= sizeof(cc->sa);
980 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
981 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
983 cc->out= fdopen(cc->fd, "w");
984 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
986 int r= control_conn_startup(cc, "accepted");
/* NOCONTROL(fmt, args...): used by control_init when a step fails.  Logs
 * "no control socket, because failed to <fmt>" with syswarn (so errno is
 * included); the rest of the macro (presumably a jump to control_init's
 * cleanup) is not visible in this excerpt. */
996 #define NOCONTROL(...) do{ \
997 syswarn("no control socket, because failed to " __VA_ARGS__); \
1001 static void control_init(void) {
/* Sets up the listening control socket.  path_control is a symlink to
 * the real AF_UNIX socket, which lives in realsockdir.  Every failure is
 * reported (mostly through NOCONTROL) and leaves the program running
 * without a control socket.  Several branch heads and the cleanup label
 * are on lines not visible in this excerpt. */
1006 struct sockaddr_un un;
1009 memset(&sa,0,sizeof(sa));
1010 int maxlen= sizeof(sa.un.sun_path);
/* Step 1: reuse the real path recorded in an existing symlink, if any. */
1012 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1014 if (errno != ENOENT)
1015 NOCONTROL("readlink control socket symlink path %s", path_control);
/* readlink filling the whole buffer means the target may have been
 * truncated: discard that symlink. */
1017 if (reallen >= maxlen) {
1018 debug("control socket symlink path too long (r=%d)",reallen);
1019 xunlink(path_control, "old (overlong) control socket symlink");
/* Step 2 (when there is no usable symlink, as far as visible): make sure
 * the real socket directory exists, creating it mode 0700 if absent. */
1025 int r= lstat(realsockdir,&stab);
1027 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1029 r= mkdir(realsockdir, 0700);
1030 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
/* The directory must be a directory, owned by our effective uid, with no
 * permissions at all for "other". */
1033 uid_t self= geteuid();
1034 if (!S_ISDIR(stab.st_mode) ||
1035 stab.st_uid != self ||
1036 stab.st_mode & 0007) {
1037 warn("no control socket, because real socket directory"
1038 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1039 !!S_ISDIR(stab.st_mode),
1040 (unsigned long)stab.st_uid, (unsigned long)self,
1041 (unsigned long)stab.st_mode & 0777UL);
/* Real socket name: "s<time in hex>.<pid in hex>" inside realsockdir. */
1046 real= xasprintf("%s/s%lx.%lx", realsockdir,
1047 (unsigned long)xtime(), (unsigned long)self_pid);
1048 int reallen= strlen(real);
/* NOTE(review): the message below mentions tmpnam, but the path was
 * built by xasprintf just above. */
1050 if (reallen >= maxlen) {
1051 warn("no control socket, because tmpnam gave overly-long path"
1055 r= symlink(real, path_control);
1056 if (r) NOCONTROL("make control socket path %s a symlink to real"
1057 " socket path %s", path_control, real);
/* sa was zeroed above, so the copied path stays NUL-terminated
 * (reallen < maxlen was checked). */
1058 memcpy(sa.un.sun_path, real, reallen);
/* Step 3: remove any stale socket, then create, bind and listen. */
1061 int r= unlink(sa.un.sun_path);
1062 if (r && errno!=ENOENT)
1063 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1065 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1066 if (control_master<0) NOCONTROL("create new control socket");
1068 sa.un.sun_family= AF_UNIX;
1069 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1070 r= bind(control_master, &sa.sa, sl);
1071 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1073 r= listen(control_master, 5);
1074 if (r) NOCONTROL("listen");
/* Step 4: non-blocking, and accept connections from the event loop. */
1076 xsetnonblock(control_master, 1);
1078 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1079 info("control socket ok, real path %s", sa.un.sun_path);
/* Presumably on the failure path: close the master socket if it was
 * opened. */
1085 xclose_perhaps(&control_master, "control master",0);
1089 /*========== management of connections ==========*/
/* Closes the connection's socket if it is open (close_perhaps); a close
 * error is only logged at info level, with msgprefix inserted before the
 * word "error". */
1091 static void conn_closefd(Conn *conn, const char *msgprefix) {
1092 int r= close_perhaps(&conn->fd);
1093 if (r) info("C%d %serror closing socket: %s",
1094 conn->fd, msgprefix, strerror(errno));
/* Releases a connection's resources: stops and deletes its reader,
 * cancels the fd's write and exception registrations, closes the socket,
 * and sets until_connect to reconnect_delay_periods.  Guards around these
 * steps and the freeing of conn itself are on lines not visible here. */
1097 static void conn_dispose(Conn *conn) {
1100 oop_rd_cancel(conn->rd);
1101 oop_rd_delete_kill(conn->rd);
1105 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1106 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1108 conn_closefd(conn,"");
1110 until_connect= reconnect_delay_periods;
/* Event-loop callback for an exceptional condition on a peer socket.
 * Reads one byte to find out what happened and then fails the connection
 * either way, with a message describing the read result. */
1113 static void *conn_exception(oop_source *lp, int fd,
1114 oop_event ev, void *conn_v) {
1117 assert(fd == conn->fd);
1118 assert(ev == OOP_EXCEPTION);
1119 int r= read(conn->fd, &ch, 1);
1120 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1121 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1122 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1123 return OOP_CONTINUE;
/* Fails a connection: every article it holds goes back onto the queue of
 * the input file it came from, a warning with the formatted reason
 * (fmt, al) is logged, the connection is removed from conns, and article
 * assignment is re-run.  Disposal of conn is on lines not visible here. */
1126 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
/* Per-state counts of requeued articles from conn->sent, for the log. */
1127 int requeue[art_MaxState];
1128 memset(requeue,0,sizeof(requeue));
/* Articles not yet offered go straight back, uncounted. */
1132 while ((art= LIST_REMHEAD(conn->priority)))
1133 LIST_ADDTAIL(art->ipf->queue, art);
1135 while ((art= LIST_REMHEAD(conn->waiting)))
1136 LIST_ADDTAIL(art->ipf->queue, art);
/* Articles already sent are counted under their current state; an
 * Unsolicited one reverts to Unchecked before being requeued. */
1138 while ((art= LIST_REMHEAD(conn->sent))) {
1139 requeue[art->state]++;
1140 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1141 LIST_ADDTAIL(art->ipf->queue,art);
1142 check_reading_pause_resume(art->ipf);
/* Walk the conn->xmitu pending transmit entries (loop body not visible;
 * presumably frees each one). */
1147 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1150 char *m= xvasprintf(fmt,al);
1151 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1152 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1155 LIST_REMOVE(conns,conn);
1157 check_assign_articles();
/* Variadic front end to vconnfail: fail conn with a printf-style reason. */
1160 static void connfail(Conn *conn, const char *fmt, ...) {
1163 vconnfail(conn,fmt,al);
/* Periodic idle check.  Each connection's since_activity counter is
 * incremented; once it exceeds need_activity_periods the connection is
 * either failed (the reason depends on what it still has outstanding) or,
 * if nothing is outstanding, sent QUIT.  The loop head and some branch
 * heads are on lines not visible in this excerpt. */
1167 static void check_idle_conns(void) {
1170 conn->since_activity++;
1173 if (conn->since_activity <= need_activity_periods) continue;
1175 /* We need to shut this down */
1177 connfail(conn,"timed out waiting for response to QUIT");
1178 else if (conn->sent.count)
1179 connfail(conn,"timed out waiting for responses");
1180 else if (conn->waiting.count || conn->priority.count)
1181 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1182 else if (conn->xmitu)
1183 connfail(conn,"peer has been sending responses"
1184 " before receiving our commands!");
/* Truly idle: write QUIT directly with write(2).  A write that would
 * block, or any other write failure, fails the connection; otherwise the
 * idle counter restarts. */
1186 static const char quitcmd[]= "QUIT\r\n";
1187 int todo= sizeof(quitcmd)-1;
1188 const char *p= quitcmd;
1190 int r= write(conn->fd, p, todo);
1192 if (isewouldblock(errno))
1193 connfail(conn, "blocked writing QUIT to idle connection");
1195 connfail(conn, "failed to write QUIT to idle connection: %s",
1203 conn->since_activity= 0;
1204 debug("C%d is idle, quitting", conn->fd);
1213 /*---------- making new connections ----------*/
1215 static pid_t connecting_child;
1216 static int connecting_fdpass_sock;
/* Abandons any connection attempt in progress: reaps the connecting
 * child through xwaitpid, and stops watching and closes the fd-passing
 * socket.  Safe to call when neither exists. */
1218 static void connect_attempt_discard(void) {
1219 if (connecting_child) {
1220 int status= xwaitpid(&connecting_child, "connect");
/* A normal exit, or death from SIGKILL (which xwaitpid sends), is not
 * worth reporting. */
1221 if (!(WIFEXITED(status) ||
1222 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1223 report_child_status("connect", status);
1225 if (connecting_fdpass_sock) {
1226 cancel_fd_read_except(connecting_fdpass_sock);
1227 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
/* Declares and prepares a struct msghdr named `msg' for passing one file
 * descriptor: a single one-byte iovec (pointing at msgbyte, declared on a
 * line not visible here) and a control buffer `<msg>cbuf' with room for
 * one int-sized control message. */
1231 #define PREP_DECL_MSG_CMSG(msg) \
1233 struct iovec msgiov; \
1234 msgiov.iov_base= &msgbyte; \
1235 msgiov.iov_len= 1; \
1236 struct msghdr msg; \
1237 memset(&msg,0,sizeof(msg)); \
1238 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1239 msg.msg_iov= &msgiov; \
1240 msg.msg_iovlen= 1; \
1241 msg.msg_control= msg##cbuf; \
1242 msg.msg_controllen= sizeof(msg##cbuf);
/* Event-loop callback on the fd-passing socket shared with the connecting
 * child.  On success the child sends the connected socket as an
 * SCM_RIGHTS control message and reports through its exit status whether
 * the peer supports streaming; a Conn is then set up and added to conns.
 * If no descriptor arrives, the child's fate is diagnosed and the attempt
 * discarded.  Several branch heads and labels are on lines not visible in
 * this excerpt. */
1244 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1247 assert(fd == connecting_fdpass_sock);
1249 PREP_DECL_MSG_CMSG(msg);
1251 ssize_t rs= recvmsg(fd, &msg, 0);
/* Nothing to read yet: wait for the next event. */
1253 if (isewouldblock(errno)) return OOP_CONTINUE;
1254 syswarn("failed to read socket from connecting child");
1259 LIST_INIT(conn->waiting);
1260 LIST_INIT(conn->priority);
1261 LIST_INIT(conn->sent);
1263 struct cmsghdr *h= 0;
1264 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
/* No control message: kill/reap the child and explain what went wrong. */
1266 int status= xwaitpid(&connecting_child, "connect child (broken)");
1268 if (WIFEXITED(status)) {
1269 if (WEXITSTATUS(status) != 0 &&
1270 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1271 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1272 /* child already reported the problem */;
1274 if (e == OOP_EXCEPTION)
1275 warn("connect: connection child exited code %d but"
1276 " unexpected exception on fdpass socket",
1277 WEXITSTATUS(status));
1279 warn("connect: connection child exited code %d but"
1281 WEXITSTATUS(status), (int)rs);
/* The child arms alarm(connection_setup_timeout), so SIGALRM means the
 * attempt timed out. */
1283 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1284 warn("connect: connection attempt timed out");
1286 report_child_status("connect", status);
/* Validate the control message: it must be exactly one SOL_SOCKET /
 * SCM_RIGHTS message carrying a single int. */
1291 #define CHK(field, val) \
1292 if (h->cmsg_##field != val) { \
1293 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1294 h->cmsg_##field, val); \
1297 CHK(level, SOL_SOCKET);
1298 CHK(type, SCM_RIGHTS);
1299 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1302 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1304 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
/* Reap the child normally (no kill) and decode its exit status. */
1307 pid_t got= waitpid(connecting_child, &status, 0);
1308 if (got==-1) sysdie("connect: real wait for child");
1309 assert(got == connecting_child);
1310 connecting_child= 0;
1312 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1313 int es= WEXITSTATUS(status);
1315 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1316 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1318 fatal("connect: child gave unexpected exit status %d", es);
/* Without streaming only one article is in flight at a time. */
1322 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1324 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1325 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
/* NOTE(review): this tests conn->fd, although the call just above assigned
 * conn->rd; a failed oop_rd_new_fd (rd == 0) would not be caught here.
 * Compare control_conn_startup, which tests cc->rd. */
1326 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1327 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1329 &peer_rd_err, conn);
/* NOTE(review): control_conn_startup treats oop_rd_read's result as an
 * errno value and copies it into errno before logging; here sysdie is
 * called without doing so - confirm. */
1330 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1332 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1333 LIST_ADDHEAD(conns, conn);
/* Success: tidy up the attempt state and give the new connection work. */
1335 connect_attempt_discard();
1336 check_assign_articles();
1337 return OOP_CONTINUE;
/* Failure exit (label not visible): just drop the attempt. */
1341 connect_attempt_discard();
1342 return OOP_CONTINUE;
/* Tells whether a new connection attempt may be started now.  The visible
 * conditions are: fewer than max_connections live connections, and no
 * connection child currently running.  Any further conditions are on lines
 * outside the visible text. */
1345 static int allow_connect_start(void) {
1346 return conns.count < max_connections
1347 && !connecting_child
/* Begins a connection attempt to the peer.
 *
 * Creates a UNIX-domain socketpair and forks.  The child closes the
 * parent's end, arms an alarm of connection_setup_timeout, connects with
 * NNTPconnect, authenticates with NNTPsendpassword, sends MODE STREAM and
 * parses the reply, then passes the connected fd back to the parent over
 * the socketpair as an SCM_RIGHTS control message.  exitstatus starts as
 * CONNCHILD_ESTATUS_NOSTREAM and is changed to CONNCHILD_ESTATUS_STREAM on
 * one of the MODE STREAM reply paths.
 *
 * The parent closes the child's end, makes its own end non-blocking and
 * registers connchild_event for read/exception events on it.
 *
 * Must only be called when no connection attempt is in progress (asserted).
 */
1351 static void connect_start(void) {
1352 assert(!connecting_child);
1353 assert(!connecting_fdpass_sock);
1355 info("starting connection attempt");
1358 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1359 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1361 connecting_child= xfork("connection");
/* ---- everything inside this branch runs in the forked child ---- */
1363 if (!connecting_child) {
1364 FILE *cn_from, *cn_to;
1365 char buf[NNTP_STRLEN+100];
1366 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1368 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
/* The alarm bounds the whole setup; the parent recognises death by SIGALRM
 * as a timed-out attempt. */
1370 alarm(connection_setup_timeout);
1371 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
/* Trailing whitespace in the server's message is examined; seeing a CR or LF
 * sets `stripped', which later selects "rejected" over "failed". */
1375 unsigned char c= buf[l-1];
1376 if (!isspace(c)) break;
1377 if (c=='\n' || c=='\r') stripped=1;
1381 sysfatal("connect: connection attempt failed");
1384 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1388 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1389 sysfatal("connect: authentication failed");
1391 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1393 sysfatal("connect: could not send MODE STREAM");
1394 buf[sizeof(buf)-1]= 0;
1395 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1396 if (ferror(cn_from))
1397 sysfatal("connect: could not read response to MODE STREAM");
1399 fatal("connect: connection close in response to MODE STREAM");
1404 fatal("connect: response to MODE STREAM is too long: %.100s...",
/* Drop the line terminator (LF, and a preceding CR if present). */
1406 l--; if (l>0 && buf[l-1]=='\r') l--;
1409 int rcode= strtoul(buf,&ep,10);
1411 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1415 exitstatus= CONNCHILD_ESTATUS_STREAM;
1421 warn("connect: unexpected response to MODE STREAM: %.50s",
/* Hand the connected socket to the parent as ancillary data. */
1427 int fd= fileno(cn_from);
1429 PREP_DECL_MSG_CMSG(msg);
1430 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1431 cmsg->cmsg_level= SOL_SOCKET;
1432 cmsg->cmsg_type= SCM_RIGHTS;
1433 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1434 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1436 msg.msg_controllen= cmsg->cmsg_len;
1437 r= sendmsg(socks[1], &msg, 0);
1438 if (r<0) sysdie("sendmsg failed for new connection");
1439 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
/* ---- parent side: keep socks[0] and wait for the child's message ---- */
1444 xclose(socks[1], "connecting fdpass child's socket",0);
1445 connecting_fdpass_sock= socks[0];
1446 xsetnonblock(connecting_fdpass_sock, 1);
1447 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1450 /*---------- assigning articles to conns, and transmitting ----------*/
/* Takes the article at the head of ipf's queue.  With `peek' set the head
 * is returned without being removed; otherwise it is removed and the input
 * file's pause/resume state is re-evaluated because the queue changed. */
1452 static Article *dequeue_from(int peek, InputFile *ipf) {
1454 if (peek) return LIST_HEAD(ipf->queue);
1456 Article *art= LIST_REMHEAD(ipf->queue);
1458 check_reading_pause_resume(ipf);
/* Fetches (or, with `peek', just looks at) the next article to offer,
 * trying the flushing, backlog and main input files in that order and
 * returning the first article found. */
1462 static Article *dequeue(int peek) {
1464 art= dequeue_from(peek, flushing_input_file); if (art) return art;
1465 art= dequeue_from(peek, backlog_input_file); if (art) return art;
1466 art= dequeue_from(peek, main_input_file); if (art) return art;
/* Hands queued articles to connections.
 *
 * For each connection that is not quitting it computes how many articles
 * the connection is holding (sent + priority + waiting) and the spare
 * capacity relative to the connection's max_queue.  An idle connection is
 * remembered as a fallback; a busy connection with spare room is chosen
 * immediately.  The chosen connection has its idle counter reset when it
 * was idle, receives the next dequeued article on its waiting list, and is
 * asked to write.  On the alternative branch, when allow_connect_start()
 * permits, until_connect is reloaded from reconnect_delay_periods. */
1470 static void check_assign_articles(void) {
1476 int spare=0, inqueue=0;
1478 /* Find a connection to offer this article. We prefer a busy
1479 * connection to an idle one, provided it's not full. We take the
1480 * first (oldest) and since that's stable, it will mean we fill up
1481 * connections in order. That way if we have too many
1482 * connections, the spare ones will go away eventually.
1485 if (walk->quitting) continue;
1486 inqueue= walk->sent.count + walk->priority.count
1487 + walk->waiting.count;
1488 spare= walk->max_queue - inqueue;
1489 assert(inqueue <= max_queue_per_conn);
1491 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1492 else if (spare>0) /*working*/ { use= walk; break; }
1495 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1497 Article *art= dequeue(0);
1499 LIST_ADDTAIL(use->waiting, art);
1502 conn_maybe_write(use);
1503 } else if (allow_connect_start()) {
1504 until_connect= reconnect_delay_periods;
/* Event-loop callback for a connection fd becoming writeable: `u' is the
 * Conn, which is simply given another chance to write. */
1513 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1514 conn_maybe_write(u);
1515 return OOP_CONTINUE;
/* Builds pending transmissions for `conn' and tries to write them.  The
 * OOP_WRITE registration is cancelled before each attempt and re-installed
 * (with conn_writeable) only when conn_write_some_xmits reports
 * OOP_CONTINUE, i.e. more to write but the fd is not writeable.  OOP_HALT
 * from the writer is handled as a separate case. */
1518 static void conn_maybe_write(Conn *conn) {
1520 conn_make_some_xmits(conn);
1522 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1526 void *rp= conn_write_some_xmits(conn);
1527 if (rp==OOP_CONTINUE) {
1528 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1530 } else if (rp==OOP_HALT) {
1533 /* transmitted everything */
1540 /*---------- expiry, flow control and deferral ----------*/
/* Flow control for one input file: reading is paused once the file's
 * article queue has reached max_queue_per_ipf entries; the resume call
 * covers the other case. */
1542 static void check_reading_pause_resume(InputFile *ipf) {
1543 if (ipf->queue.count >= max_queue_per_ipf)
1544 inputfile_reading_pause(ipf);
1546 inputfile_reading_resume(ipf);
/* Records an article in the defer file as a "<token> <messageid>" line and
 * then finishes with it via article_done, passing `whichcount' through.
 * The article must not be on any queue.  A failed write to the defer file
 * is fatal. */
1549 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1551 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1553 sysfatal("write to defer file %s",path_defer);
1554 article_done(art, whichcount);
/* Checks whether a queued article still exists in the article store.
 * SMretrieve is asked for a stat-only handle; if one comes back it is
 * freed and 0 is returned.  Otherwise the article is removed from its
 * input file's queue, counted in count_nooffer_missing, and finished with
 * article_done(art,-1). */
1557 static int article_check_expired(Article *art /* must be queued, not conn */) {
1558 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1559 if (artdata) { SMfreearticle(artdata); return 0; }
1561 LIST_REMOVE(art->ipf->queue, art);
1563 art->ipf->count_nooffer_missing++;
1564 article_done(art,-1);
/* Runs the expiry check on the article at the head of ipf's queue and then
 * re-evaluates the file's pause/resume state.  Any looping over further
 * articles is on lines outside the visible text. */
1568 static void inputfile_queue_check_expired(InputFile *ipf) {
1572 Article *art= LIST_HEAD(ipf->queue);
1573 int exp= article_check_expired(art);
1576 check_reading_pause_resume(ipf);
/* Defers `art' without attributing it to any result counter (whichcount
 * is -1). */
1579 static void article_autodefer(InputFile *ipf, Article *art) {
1581 article_defer(art,-1);
/* Scans the article list `al' and returns 1 as soon as it finds an article
 * that came from input file `ipf'. */
1584 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1586 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1587 if (art->ipf == ipf) return 1;
/* Empties ipf's queue, auto-deferring every article removed from it. */
1591 static void autodefer_input_file_articles(InputFile *ipf) {
1593 while ((art= LIST_REMHEAD(ipf->queue)))
1594 article_autodefer(ipf, art);
/* Switches an input file over to auto-deferral.
 *
 * Everything queued on the file is deferred at once.  If articles from the
 * file are still in progress, connections are examined for articles from
 * this file on their waiting, priority or sent lists; such a connection is
 * failed with connfail and the file's queue is drained again.  Not finding
 * the in-progress articles anywhere is treated as an internal error
 * (abort).  Finally the pause/resume state is re-evaluated. */
1597 static void autodefer_input_file(InputFile *ipf) {
1600 autodefer_input_file_articles(ipf);
1602 if (ipf->inprogress) {
1605 if (has_article_in(&walk->waiting, ipf) ||
1606 has_article_in(&walk->priority, ipf) ||
1607 has_article_in(&walk->sent, ipf))
1610 while (ipf->inprogress) {
1612 if (walk->quitting < 0) goto found;
1613 abort(); /* where are they ?? */
1616 connfail(walk, "connection is stuck or crawling,"
1617 " and we need to finish flush");
1618 autodefer_input_file_articles(ipf);
1622 check_reading_pause_resume(ipf);
1625 /*========== article transmission ==========*/
/* Claims the next free transmit slot of `conn': the iovec and the matching
 * XmitDetails at index xmitu, which is then incremented.  The iovec is
 * pointed at `data'.  The caller is responsible for completing the details
 * record. */
1627 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1628 XmitKind kind) { /* caller must then fill in details */
1629 struct iovec *v= &conn->xmit[conn->xmitu];
1630 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1631 v->iov_base= (char*)data;
/* Queues `len' bytes at `data' for transmission as an xk_Const item, the
 * kind for which xmit_free does nothing. */
1637 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1638 xmit_core(conn,data,len, xk_Const);
/* Queues a string literal (without its terminating NUL); expects a variable
 * named `conn' to be in scope at the point of use. */
1640 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
/* Queues the data of a retrieved article (ah->data, ah->len) for
 * transmission as an xk_Artdata item.  The handle is consumed. */
1642 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1643 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
/* Releases whatever a transmit item owns: article data is handed back with
 * SMfreearticle, constant data needs nothing. */
1647 static void xmit_free(XmitDetails *d) {
1649 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1650 case xk_Const: break;
/* Writes as much of the connection's queued iovecs as the fd will take.
 *
 * At most IOV_MAX entries are passed to a single writev.  A would-block
 * error yields OOP_CONTINUE; any other write error fails the connection.
 * After a successful write the iovec array is walked to account for the
 * bytes written, adjusting the base of a partly written entry, and the
 * remaining entries of both parallel arrays (xmit and xmitd) are moved
 * down to the front. */
1655 static void *conn_write_some_xmits(Conn *conn) {
1657 * 0: nothing more to write, no need to call us again
1658 * OOP_CONTINUE: more to write but fd not writeable
1659 * OOP_HALT: disaster, have destroyed conn
1662 int count= conn->xmitu;
1663 if (!count) return 0;
1665 if (count > IOV_MAX) count= IOV_MAX;
1666 ssize_t rs= writev(conn->fd, conn->xmit, count);
1668 if (isewouldblock(errno)) return OOP_CONTINUE;
1669 connfail(conn, "write failed: %s", strerror(errno));
/* Consume `rs' written bytes from the front of the iovec array. */
1675 for (done=0; rs && done<conn->xmitu; done++) {
1676 struct iovec *vp= &conn->xmit[done];
1677 XmitDetails *dp= &conn->xmitd[done];
1678 if (rs > vp->iov_len) {
1682 vp->iov_base= (char*)vp->iov_base + rs;
/* Drop the `done' finished entries from both arrays. */
1686 int newu= conn->xmitu - done;
1687 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1688 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Turns articles queued on `conn' into protocol output.
 *
 * Articles are taken from the priority list first, then from the waiting
 * list, subject to a check that at least five more iovec slots fit within
 * CONNIOVS.  An article that the peer already asked for (state at least
 * art_Wanted), or any article on a streaming connection while nocheck mode
 * is on, has its full text retrieved and is sent: with a "TAKETHIS
 * <messageid>" line in front in one branch, or as the body following a 235
 * reply to IHAVE in the other.  If retrieval fails the article is marked
 * missing.  Other articles are merely offered with CHECK or IHAVE.
 * Transmitted articles are appended to the connection's sent list. */
1693 static void conn_make_some_xmits(Conn *conn) {
1695 if (conn->xmitu+5 > CONNIOVS)
1698 Article *art= LIST_REMHEAD(conn->priority);
1699 if (!art) art= LIST_REMHEAD(conn->waiting);
1702 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1703 /* actually send it */
1705 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1708 art->state == art_Unchecked ? art_Unsolicited :
1709 art->state == art_Wanted ? art_Wanted :
1712 if (!artdata) art->missing= 1;
1713 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1717 XMIT_LITERAL("TAKETHIS ");
1718 xmit_noalloc(conn, art->messageid, art->midlen);
1719 XMIT_LITERAL("\r\n");
1720 xmit_artbody(conn, artdata);
1722 article_done(art, -1);
1726 /* we got 235 from IHAVE */
1728 xmit_artbody(conn, artdata);
1730 XMIT_LITERAL(".\r\n");
1734 LIST_ADDTAIL(conn->sent, art);
/* Offer only: the command word is followed by the message-id and CRLF. */
1740 XMIT_LITERAL("CHECK ");
1742 XMIT_LITERAL("IHAVE ");
1743 xmit_noalloc(conn, art->messageid, art->midlen);
1744 XMIT_LITERAL("\r\n");
1746 assert(art->state == art_Unchecked);
1747 art->ipf->counts[art->state][RC_sent]++;
1748 LIST_ADDTAIL(conn->sent, art);
1753 /*========== handling responses from peer ==========*/
/* Read style used for responses from the peer: records end at '\n', the
 * delimiter is stripped, and short records are forbidden. */
1755 static const oop_rd_style peer_rd_style= {
1756 OOP_RD_DELIM_STRIP, '\n',
1758 OOP_RD_SHORTREC_FORBID
/* Reader error callback for a peer connection: the connection is failed
 * with the reader's error message and the event loop carries on. */
1761 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1762 const char *errmsg, int errnoval,
1763 const char *data, size_t recsz, void *conn_v) {
1765 connfail(conn, "error receiving from peer: %s", errmsg);
1766 return OOP_CONTINUE;
/* Matches a peer response against the oldest outstanding article on
 * `conn' (the head of the sent list) and sanity-checks it.
 *
 * Failure cases, each of which fails the connection with a message built
 * from sanitised_response:
 *  - a response arrives when nothing is outstanding;
 *  - a streaming-style response code on a non-streaming connection;
 *  - a streaming response whose message-id is syntactically invalid or
 *    differs from the one that was offered;
 *  - a non-streaming response code where CHECK/TAKETHIS was used;
 *  - must_have_sent>0 but the body had not been sent, or must_have_sent<0
 *    but it had.
 * On success the article is removed from the head of the sent list.
 * peer_rd_ok treats a null return as "connection already failed". */
1769 static Article *article_reply_check(Conn *conn, const char *response,
1770 int code_indicates_streaming,
1772 /* 1:yes, -1:no, 0:dontcare */,
1773 const char *sanitised_response) {
1774 Article *art= LIST_HEAD(conn->sent);
1778 "peer gave unexpected response when no commands outstanding: %s",
1779 sanitised_response);
1783 if (code_indicates_streaming) {
1784 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1785 if (!conn->stream) {
1786 connfail(conn, "peer gave streaming response code "
1787 " to IHAVE or subsequent body: %s", sanitised_response);
/* The message-id starts right after the 3-digit code and the space. */
1790 const char *got_mid= response+4;
1791 int got_midlen= strcspn(got_mid, " \n\r");
1792 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1793 connfail(conn, "peer gave streaming response with syntactically invalid"
1794 " messageid: %s", sanitised_response);
1797 if (got_midlen != art->midlen ||
1798 memcmp(got_mid, art->messageid, got_midlen)) {
1799 connfail(conn, "peer gave streaming response code to wrong article -"
1800 " probable synchronisation problem; we offered: %s;"
1802 art->messageid, sanitised_response);
1807 connfail(conn, "peer gave non-streaming response code to"
1808 " CHECK/TAKETHIS: %s", sanitised_response);
1813 if (must_have_sent>0 && art->state < art_Wanted) {
1814 connfail(conn, "peer says article accepted but"
1815 " we had not sent the body: %s", sanitised_response);
1818 if (must_have_sent<0 && art->state >= art_Wanted) {
1819 connfail(conn, "peer says please sent the article but we just did: %s",
1820 sanitised_response);
1824 Article *art_again= LIST_REMHEAD(conn->sent);
1825 assert(art_again == art);
/* Updates the decaying average of accepted articles and the nocheck flag.
 * accept_proportion is scaled by nocheck_decay and then moved towards
 * `accepted' (1 or 0 at the visible call sites) by the complementary
 * weight.  nocheck becomes true when the average reaches nocheck_thresh.
 * The first entry into nocheck mode is logged at notice level; later
 * changes are logged at debug level. */
1829 static void update_nocheck(int accepted) {
1830 accept_proportion *= nocheck_decay;
1831 accept_proportion += accepted * (1.0 - nocheck_decay);
1832 int new_nocheck= accept_proportion >= nocheck_thresh;
1833 if (new_nocheck && !nocheck_reported) {
1834 notice("entering nocheck mode for the first time");
1835 nocheck_reported= 1;
1836 } else if (new_nocheck != nocheck) {
1837 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1839 nocheck= new_nocheck;
/* Finishes processing of one article.
 *
 * If `whichcount' is non-negative and the article was not missing, the
 * matching per-state result counter of its input file is incremented.
 * Accepted and unwanted outcomes feed update_nocheck.  The article's entry
 * in the input file is then overwritten with spaces using pwrite at the
 * article's recorded offset, retrying on EINTR and dying on any other
 * error.  When the input file (other than the main one) has no articles
 * left in progress, queue_check_input_done is called. */
1842 static void article_done(Article *art, int whichcount) {
1843 if (whichcount>=0 && !art->missing)
1844 art->ipf->counts[art->state][whichcount]++;
1846 if (whichcount == RC_accepted) update_nocheck(1);
1847 else if (whichcount == RC_unwanted) update_nocheck(0);
1849 InputFile *ipf= art->ipf;
/* Blank the entry in pieces no longer than the spaces buffer. */
1851 while (art->blanklen) {
1852 static const char spaces[]=
1862 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1863 int r= pwrite(ipf->fd, spaces, w, art->offset);
1865 if (errno==EINTR) continue;
1866 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1867 art->messageid, art->blanklen,
1868 (unsigned long)art->offset, ipf->path);
1870 assert(r>=0 && r<=w);
1876 assert(ipf->inprogress >= 0);
1879 if (!ipf->inprogress && ipf != main_input_file)
1880 queue_check_input_done();
/* Reader callback for a complete response line from the peer.
 *
 * EOF fails the connection.  Otherwise the line must start with exactly
 * three decimal digits, not beginning with '0', followed by a space; any
 * other shape fails the connection as badly formatted.
 *
 * While the connection is quitting, only 205 and 503 are acceptable
 * replies; the connection is then logged as closed by us and removed from
 * conns.
 *
 * For a live connection the idle counter is reset and the code is
 * dispatched:
 *   400            peer stopped accepting articles (connection failed)
 *   435 / 438      article unwanted
 *   235 / 239      article accepted
 *   437 / 439      article rejected
 *   238 / 335      peer wants the article: state becomes art_Wanted and the
 *                  article goes on the priority list
 *   431 / 436      try later: article is deferred
 *   anything else  unexpected message (connection failed)
 * One further case fails the connection with "peer timed us out" if it is
 * busy, and otherwise logs it as an idle connection closed by the peer and
 * removes it.
 * After a handled article reply the connection gets a chance to write and
 * articles are reassigned. */
1883 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1884 const char *errmsg, int errnoval,
1885 const char *data, size_t recsz, void *conn_v) {
1888 if (ev == OOP_RD_EOF) {
1889 connfail(conn, "unexpected EOF from peer");
1890 return OOP_CONTINUE;
1892 assert(ev == OOP_RD_OK);
1894 char *sani= sanitise(data,-1);
1897 unsigned long code= strtoul(data, &ep, 10);
1898 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1899 connfail(conn, "badly formatted response from peer: %s", sani);
1900 return OOP_CONTINUE;
1904 conn->waiting.count ||
1905 conn->priority.count ||
1909 if (conn->quitting) {
1910 if (code!=205 && code!=503) {
1911 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1913 notice("C%d idle connection closed by us", conn->fd);
1915 LIST_REMOVE(conns,conn);
1918 return OOP_CONTINUE;
1921 conn->since_activity= 0;
1924 #define GET_ARTICLE(musthavesent) do{ \
1925 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1926 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
1929 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1930 code_streaming= (streaming); \
1931 GET_ARTICLE(musthavesent); \
1932 article_done(art, RC_##how); \
1936 #define PEERBADMSG(m) do { \
1937 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1940 int code_streaming= 0;
1944 case 400: PEERBADMSG("peer stopped accepting articles");
1945 default: PEERBADMSG("peer sent unexpected message");
1948 if (conn_busy) PEERBADMSG("peer timed us out");
1949 notice("C%d idle connection closed by peer", conn->fd);
1950 LIST_REMOVE(conns,conn);
1952 return OOP_CONTINUE;
1954 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1955 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1957 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1958 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1960 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1961 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1963 case 238: /* CHECK says send it */
1965 case 335: /* IHAVE says send it */
1967 assert(art->state == art_Unchecked);
1968 art->ipf->counts[art->state][RC_accepted]++;
1969 art->state= art_Wanted;
1970 LIST_ADDTAIL(conn->priority, art);
1973 case 431: /* CHECK or TAKETHIS says try later */
1975 case 436: /* IHAVE says try later */
1977 article_defer(art, RC_deferred);
1983 conn_maybe_write(conn);
1984 check_assign_articles();
1985 return OOP_CONTINUE;
1989 /*========== monitoring of input files ==========*/
/* Handles end-of-file on an input file that is not the main feed file.
 * Reading of the file is stopped.  For the flushing file (only legal in
 * the SEPARATED or DROPPING states) reading of the main file, if any, is
 * started and the flushing-done check runs; for the backlog file the
 * backlog-done check runs.  Any other file is an internal error. */
1991 static void feedfile_eof(InputFile *ipf) {
1992 assert(ipf != main_input_file); /* promised by tailing_try_read */
1993 inputfile_reading_stop(ipf);
1995 if (ipf == flushing_input_file) {
1996 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1997 if (main_input_file) inputfile_reading_start(main_input_file);
1998 statemc_check_flushing_done();
1999 } else if (ipf == backlog_input_file) {
2000 statemc_check_backlog_done();
2002 abort(); /* supposed to wait rather than get EOF on main input file */
/* Opens `path' read-write as an input file.  Returns 0 if the file does
 * not exist (ENOENT); any other open failure is fatal.  On success a
 * zeroed InputFile is allocated with room for a copy of the path after the
 * structure, its article queue is initialised, and the path is copied in. */
2006 static InputFile *open_input_file(const char *path) {
2007 int fd= open(path, O_RDWR);
2009 if (errno==ENOENT) return 0;
2010 sysfatal("unable to open input file %s", path);
/* The allocation has room for the path string after the struct. */
2014 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
2015 memset(ipf,0,sizeof(*ipf));
2019 LIST_INIT(ipf->queue);
2020 strcpy(ipf->path, path);
/* Closes the file descriptor of an input file without freeing the
 * structure.  The file must already be fully quiesced: no readable
 * callback, no file monitor, no reader, and no articles in progress. */
2025 static void close_input_file(InputFile *ipf) { /* does not free */
2026 assert(!ipf->readable_callback); /* must have had ->on_cancel */
2027 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2028 assert(!ipf->rd); /* must have had inputfile_reading_stop */
2029 assert(!ipf->inprogress); /* no dangling pointers pointing here */
2030 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2034 /*---------- dealing with articles read in the input file ----------*/
/* Reports a corrupted record in an input file and counts it.  The warning
 * names the file, the record's offset, the kind of problem (`how') and a
 * sanitised copy of the data.  The program dies once the error count
 * exceeds max_bad_data_initial plus one per max_bad_data_ratio good or
 * blank records read.  Returns OOP_CONTINUE so callers can return its
 * result directly. */
2036 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2037 const char *data, const char *how) {
2038 warn("corrupted file: %s, offset %lu: %s: in %s",
2039 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2040 ipf->readcount_err++;
2041 if (ipf->readcount_err > max_bad_data_initial +
2042 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2043 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
2044 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2045 return OOP_CONTINUE;
/* Reader error callback for an input file.  Only system errors are
 * expected (asserted); they are fatal and the message names the file and
 * the current read offset. */
2048 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2049 oop_rd_event ev, const char *errmsg,
2050 int errnoval, const char *data, size_t recsz,
2052 InputFile *ipf= ipf_v;
2053 assert(ev == OOP_RD_SYSTEM);
2055 sysdie("error reading input file: %s, offset %lu",
2056 ipf->path, (unsigned long)ipf->offset);
/* Reader callback for one record (line) of an input file.
 *
 * A null `data' means end of file and is passed to feedfile_eof.  The
 * file offset is advanced by the record size, plus one for the delimiter
 * when the record was complete (OOP_RD_OK).
 *
 * Records are rejected through feedfile_got_bad_data when they are overly
 * long, contain a NUL byte, are empty, are only partly blanked, have no
 * room for or an invalid message-id, or carry a token of the wrong length
 * or syntax.  A record without a final newline is reported but still
 * processed.  Fully blanked records are counted and skipped.
 *
 * A valid "<token> <messageid>" record becomes a new Article appended to
 * the file's queue; it is auto-deferred if the file is in autodefer mode,
 * or expiry-checked if it came from the backlog file.  In the NORMAL state
 * a main feed file that has reached target_max_feedfile_size triggers a
 * flush.  Finally articles are assigned to connections and the file's
 * pause/resume state is re-evaluated.
 *
 * Always returns OOP_CONTINUE on the visible return paths.
 */
2059 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2060 oop_rd_event ev, const char *errmsg,
2061 int errnoval, const char *data, size_t recsz,
2063 InputFile *ipf= ipf_v;
2065 char tokentextbuf[sizeof(TOKEN)*2+3];
2067 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
/* Remember where this record started: the article's entry is blanked at
 * that offset later. */
2069 off_t old_offset= ipf->offset;
2070 ipf->offset += recsz + !!(ev == OOP_RD_OK);
2072 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2074 if (ev==OOP_RD_PARTREC)
2075 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2076 /* but process it anyway */
/* After an overly long line, later fragments are skipped until a record
 * arrives complete. */
2078 if (ipf->skippinglong) {
2079 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2080 return OOP_CONTINUE;
2082 if (ev==OOP_RD_LONG) {
2083 ipf->skippinglong= 1;
2084 X_BAD_DATA("overly long line");
2087 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2088 if (!recsz) X_BAD_DATA("empty line");
2091 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2092 ipf->readcount_blank++;
2093 return OOP_CONTINUE;
2096 char *space= strchr(data,' ');
/* A record with no space at all has no message-id part.  Treat the whole
 * record as the token so that midlen comes out negative and the "no room
 * for messageid" check below rejects it, instead of doing arithmetic on
 * and then dereferencing a null pointer. */
2097 int tokenlen= space ? space-data : (int)recsz;
2098 int midlen= (int)recsz-tokenlen-1;
2099 if (midlen <= 2) X_BAD_DATA("no room for messageid");
2100 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2102 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2103 memcpy(tokentextbuf, data, tokenlen);
2104 tokentextbuf[tokenlen]= 0;
2105 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2107 ipf->readcount_ok++;
/* The allocation is sized to hold the message-id and its terminating NUL
 * at the end of the struct. */
2109 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2110 memset(art,0,sizeof(*art));
2111 art->state= art_Unchecked;
2112 art->midlen= midlen;
2113 art->ipf= ipf; ipf->inprogress++;
2114 art->token= TextToToken(tokentextbuf);
2115 art->offset= old_offset;
2116 art->blanklen= recsz;
2117 strcpy(art->messageid, space+1);
2118 LIST_ADDTAIL(ipf->queue, art);
2120 if (ipf->autodefer >= 0)
2121 article_autodefer(ipf, art);
2122 else if (ipf==backlog_input_file)
2123 article_check_expired(art);
2125 if (sms==sm_NORMAL && ipf==main_input_file &&
2126 ipf->offset >= target_max_feedfile_size)
2127 statemc_start_flush("feed file size");
2129 check_assign_articles(); /* may destroy conn but that's OK */
2130 check_reading_pause_resume(ipf);
2131 return OOP_CONTINUE;
2134 /*========== tailing input file ==========*/
/* Timer callback that delivers a queued "readable" notification: it calls
 * the input file's stored readable callback with the stored user pointer
 * and returns whatever that callback returns. */
2136 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2138 InputFile *ipf= user;
2139 return ipf->readable_callback(loop, &ipf->readable,
2140 ipf->readable_callback_user);
/* on_cancel method of the input file's oop_readable.  Stops the file
 * monitor if one is active, cancels any pending immediate readable
 * notification, and forgets the readable callback.  The oop_readable is
 * cast directly back to the InputFile. */
2143 static void tailing_on_cancel(struct oop_readable *rable) {
2144 InputFile *ipf= (void*)rable;
2146 if (ipf->filemon) filemon_stop(ipf);
2147 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2148 ipf->readable_callback= 0;
/* Schedules an immediate (OOP_TIME_NOW) readable notification for the
 * input file via tailing_rable_call_time. */
2151 static void tailing_queue_readable(InputFile *ipf) {
2152 /* lifetime of ipf here is OK because destruction will cause
2153 * on_cancel which will cancel this callback */
2154 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
/* on_readable method of the input file's oop_readable.  Any previous
 * registration is cancelled first, then the new callback and its user
 * pointer are stored and an immediate readable notification is queued. */
2157 static int tailing_on_readable(struct oop_readable *rable,
2158 oop_readable_call *cb, void *user) {
2159 InputFile *ipf= (void*)rable;
2161 tailing_on_cancel(rable);
2162 ipf->readable_callback= cb;
2163 ipf->readable_callback_user= user;
2166 tailing_queue_readable(ipf);
/* try_read method of the input file's oop_readable.  Reads from the file
 * descriptor, retrying on EINTR.  Part of the handling depends on which
 * input file this is (main, flushing - only legal in the SEPARATED or
 * DROPPING states - or backlog); the details of each case are on lines
 * outside the visible text.  A further readable notification is queued on
 * one of the paths. */
2170 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2172 InputFile *ipf= (void*)rable;
2174 ssize_t r= read(ipf->fd, buffer, length);
2176 if (errno==EINTR) continue;
2180 if (ipf==main_input_file) {
2183 } else if (ipf==flushing_input_file) {
2185 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2186 } else if (ipf==backlog_input_file) {
2192 tailing_queue_readable(ipf);
2197 /*---------- filemon implemented with inotify ----------*/
/* inotify-based file monitor.  Selected when <sys/inotify.h> is available
 * and no other filemon method has been chosen; defining HAVE_FILEMON keeps
 * the dummy implementation from also being compiled. */
2199 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2200 #define HAVE_FILEMON
2202 #include <sys/inotify.h>
/* The single inotify descriptor shared by all watches. */
2204 static int filemon_inotify_fd;
/* Number of slots in filemon_inotify_wd2ipf. */
2205 static int filemon_inotify_wdmax;
/* Table mapping an inotify watch descriptor to the InputFile it watches;
 * unused slots are null. */
2206 static InputFile **filemon_inotify_wd2ipf;
2208 struct Filemon_Perfile {
/* Starts watching an input file for modification (IN_MODIFY).  Failure to
 * add the watch is fatal.  If the new watch descriptor does not fit in the
 * wd-to-InputFile table, the table is grown and the new part zeroed.  The
 * slot for the descriptor must be free (asserted) and is then pointed at
 * `ipf'. */
2212 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2213 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2214 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2216 if (wd >= filemon_inotify_wdmax) {
2218 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2219 sizeof(*filemon_inotify_wd2ipf) * newmax);
2220 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2221 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2222 filemon_inotify_wdmax= newmax;
2225 assert(!filemon_inotify_wd2ipf[wd]);
2226 filemon_inotify_wd2ipf[wd]= ipf;
2228 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2229 ipf, wd, filemon_inotify_wdmax);
/* Stops watching an input file: the inotify watch is removed (failure is
 * fatal) and its slot in the wd-to-InputFile table is cleared. */
2234 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2236 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2237 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2238 if (r) sysdie("inotify_rm_watch");
2239 filemon_inotify_wd2ipf[wd]= 0;
/* Event-loop callback for the inotify descriptor becoming readable.
 * Events are read one struct at a time.  A would-block error ends the
 * loop; any other read error, or a read of the wrong size, is fatal.  The
 * event's watch descriptor must lie inside the table (asserted); the
 * InputFile registered for it, which may be null, is passed to
 * filemon_callback. */
2242 static void *filemon_inotify_readable(oop_source *lp, int fd,
2243 oop_event e, void *u) {
2244 struct inotify_event iev;
2246 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2248 if (isewouldblock(errno)) break;
2249 sysdie("read from inotify master");
2250 } else if (r==sizeof(iev)) {
2251 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2253 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2255 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2256 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2257 filemon_callback(ipf);
2259 return OOP_CONTINUE;
/* Initialises the inotify file monitor.  If inotify_init fails a warning
 * is logged (the value returned in that case is on a line outside the
 * visible text).  Otherwise the descriptor is made non-blocking and
 * filemon_inotify_readable is registered for read events on it. */
2262 static int filemon_method_init(void) {
2263 filemon_inotify_fd= inotify_init();
2264 if (filemon_inotify_fd<0) {
2265 syswarn("filemon/inotify: inotify_init failed");
2268 xsetnonblock(filemon_inotify_fd, 1);
2269 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2271 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
/* Writes the inotify file monitor's state to `f': the method name, the
 * inotify descriptor, the table size, and one line per slot of the
 * wd-to-InputFile table. */
2275 static void filemon_method_dump_info(FILE *f) {
2277 fprintf(f,"inotify");
2278 DUMPV("%d",,filemon_inotify_fd);
2279 DUMPV("%d",,filemon_inotify_wdmax);
2280 for (i=0; i<filemon_inotify_wdmax; i++)
/* No trailing comma after the last argument (it is a syntax error in a
 * call), and %p needs a void* argument. */
2281 fprintf(f," wd2ipf[%d]=%p\n", i, (void*)filemon_inotify_wd2ipf[i]);
2284 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2286 /*---------- filemon dummy implementation ----------*/
/* Fallback file monitor used when no real method was compiled in: init
 * only logs a warning, start/stop do nothing, and the info dump prints
 * "dummy". */
2288 #if !defined(HAVE_FILEMON)
2290 struct Filemon_Perfile { int dummy; };
2292 static int filemon_method_init(void) {
2293 warn("filemon/dummy: no filemon method compiled in");
/* No-op start and stop hooks, and the one-word state dump. */
2296 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2297 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2298 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2300 #endif /* !HAVE_FILEMON */
2302 /*---------- filemon generic interface ----------*/
/* Starts monitoring an input file with the compiled-in method.  The file
 * must not already be monitored (asserted). */
2304 static void filemon_start(InputFile *ipf) {
2305 assert(!ipf->filemon);
2308 filemon_method_startfile(ipf, ipf->filemon);
/* Stops monitoring an input file; does nothing if it is not monitored. */
2311 static void filemon_stop(InputFile *ipf) {
2312 if (!ipf->filemon) return;
2313 filemon_method_stopfile(ipf, ipf->filemon);
/* Called by a monitor method when a watched file may have changed.  A null
 * `ipf' or a file with no readable callback registered is ignored;
 * otherwise the file's readable callback is invoked. */
2318 static void filemon_callback(InputFile *ipf) {
2319 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2320 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2323 /*---------- interface to start and stop an input file ----------*/
/* Read style for feed files: records end at '\n' with the delimiter
 * stripped, and the short-record policy is OOP_RD_SHORTREC_LONG. */
2325 static const oop_rd_style feedfile_rdstyle= {
2326 OOP_RD_DELIM_STRIP, '\n',
2328 OOP_RD_SHORTREC_LONG,
/* Resumes reading of a paused input file.  Does nothing if the file has no
 * reader or is not paused.  Otherwise a read is started on the reader with
 * feedfile_got_article as the data callback and feedfile_read_err as the
 * error callback; failure to start it is fatal. */
2331 static void inputfile_reading_resume(InputFile *ipf) {
2332 if (!ipf->rd) return;
2333 if (!ipf->paused) return;
2335 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2336 feedfile_got_article,ipf, feedfile_read_err, ipf);
2337 if (r) sysdie("unable start reading feedfile %s",ipf->path);
/* Pauses reading of an input file by cancelling the outstanding read.
 * Does nothing if the file has no reader or is already paused. */
2342 static void inputfile_reading_pause(InputFile *ipf) {
2343 if (!ipf->rd) return;
2344 if (ipf->paused) return;
2345 oop_rd_cancel(ipf->rd);
/* Sets an input file up for reading.  The file's oop_readable is filled
 * with the tailing_* methods, the stored callback is cleared, a reader is
 * created on top of the readable, and reading is resumed. */
2349 static void inputfile_reading_start(InputFile *ipf) {
2351 ipf->readable.on_readable= tailing_on_readable;
2352 ipf->readable.on_cancel= tailing_on_cancel;
2353 ipf->readable.try_read= tailing_try_read;
2354 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2355 ipf->readable.delete_kill= 0;
2357 ipf->readable_callback= 0;
2358 ipf->readable_callback_user= 0;
2360 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2364 inputfile_reading_resume(ipf);
/* Stops reading an input file for good: reading is paused and the reader
 * deleted.  By then the file monitor must no longer be active (asserted). */
2367 static void inputfile_reading_stop(InputFile *ipf) {
2369 inputfile_reading_pause(ipf);
2370 oop_rd_delete(ipf->rd);
2372 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2376 /*========== interaction with innd - state machine ==========*/
2378 /* See official state diagram at top of file. We implement
2389 |`---------------------------------------------------.
2391 |`---------------- - - - |
2392 D ENOENT | D EXISTS see OVERALL STATES diagram |
2393 | for full startup logic |
2396 | ============ try to |
2402 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2403 ^ | hardlink F to D |
2406 | | our handle onto F is now onto D |
2409 | |<-------------------<---------------------<---------+
2411 | | spawn inndcomm flush |
2413 | ================== |
2414 | FLUSHING[-ABSENT] |
2416 | main D tail/none |
2417 | ================== |
2419 | | INNDCOMM FLUSH FAILS ^
2420 | |`----------------------->----------. |
2422 | | NO SUCH SITE V |
2423 ^ |`--------------->----. ==================== |
2424 | | \ FLUSHFAILED[-ABSENT] |
2426 | | FLUSH OK \ main D tail/none |
2427 | | open F \ ==================== |
2429 | | \ | TIME TO RETRY |
2430 | |`------->----. ,---<---'\ `----------------'
2431 | | D NONE | | D NONE `----.
2433 | ============= V V ============
2434 | SEPARATED-1 | | DROPPING-1
2435 | flsh->rd!=0 | | flsh->rd!=0
2436 | [Separated] | | [Dropping]
2437 | main F idle | | main none
2438 | flsh D tail | | flsh D tail
2439 | ============= | | ============
2441 ^ | EOF ON D | | defer | EOF ON D
2443 | =============== | | ===============
2444 | SEPARATED-2 | | DROPPING-2
2445 | flsh->rd==0 | V flsh->rd==0
2446 | [Finishing] | | [Dropping]
2447 | main F tail | `. main none
2448 | flsh D closed | `. flsh D closed
2449 | =============== V `. ===============
2451 | | ALL D PROCESSED `. | ALL D PROCESSED
2452 | V install defer as backlog `. | install defer
2453 ^ | close D `. | close D
2454 | | unlink D `. | unlink D
2457 `----------' ==============
/* Startup helper: starts reading `f'.  No main input file may have been
 * set yet (asserted). */
2477 static void startup_set_input_file(InputFile *f) {
2478 assert(!main_input_file);
2480 inputfile_reading_start(f);
/* Acquires the duct lock for this site.
 *
 * The lockfile is opened (created if need be) and locked with
 * fcntl(F_SETLK).  If another process holds the lock the program exits
 * quietly when quiet_multiple is set, and fatally otherwise.  After
 * locking, the open file is compared with whatever path_lock names now; a
 * mismatch means the lockfile is stale and it is closed.  Once the lock is
 * held the file is truncated and a description of this process (pid, site,
 * feedfile, remote host) is written into it.
 */
2483 static void statemc_lock(void) {
2485 struct stat stab, stabf;
2488 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2489 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2492 memset(&fl,0,sizeof(fl));
2494 fl.l_whence= SEEK_SET;
2495 int r= fcntl(lockfd, F_SETLK, &fl);
2497 if (errno==EACCES || isewouldblock(errno)) {
2498 if (quiet_multiple) exit(0);
2499 fatal("another duct holds the lockfile");
2501 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
/* Make sure the file that was locked is still the one the path names. */
2504 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2506 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2508 if (!lock_noent && samefile(&stab, &stabf))
2511 xclose(lockfd, "stale lockfile ", path_lock);
2514 FILE *lockfile= fdopen(lockfd, "w");
2515 if (!lockfile) sysdie("fdopen lockfile");
2517 int r= ftruncate(lockfd, 0);
2518 if (r) sysdie("truncate lockfile to write new info");
/* fprintf reports failure with a negative value, not necessarily EOF, so
 * test for <0 as article_defer does; the pid is cast to match %ld. */
2520 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2521 (long)self_pid,
2522 sitename, feedfile, remote_host) < 0 ||
2524 sysfatal("write info to lockfile %s", path_lock);
2526 debug("startup: locked");
/* Works out the initial state of the flush state machine at startup.
 *
 * First the backlog is searched and the defer file examined.  An existing
 * defer file is handled according to its hard-link count: it is either
 * opened for continued use, or removed as a stale extra link; any other
 * count is fatal.
 *
 * Then the flushing file (D) and the feed file (F) are examined:
 *  - F and D are the same file: F is unlinked (=> Moved);
 *  - F is missing: D, if present, becomes the input file and a flush of
 *    innd is spawned (=> Flushing);
 *  - both exist and differ: D becomes the flushing input file, F the main
 *    input file, and the state is SEPARATED;
 *  - F exists and D does not: F becomes the main input file and the state
 *    is NORMAL.
 */
2529 static void statemc_init(void) {
2530 struct stat stabdefer;
2532 search_backlog_file();
2535 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2537 debug("startup: ductdefer ENOENT");
2539 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
/* Switch on the link count itself.  Switching on (st_nlink==1) can only
 * produce 0 or 1, so a count of 2 could never select its own arm.
 * NOTE(review): the case labels are outside the visible text - confirm
 * they are the link counts 1 and 2 plus a default. */
2540 switch (stabdefer.st_nlink) {
2542 open_defer(); /* so that we will later close it and rename it */
2545 xunlink(path_defer, "stale defer file link"
2546 " (presumably hardlink to backlog file)");
/* st_nlink is not an int: cast and use %ld as the debug call above does. */
2549 die("defer file %s has unexpected link count %ld",
2550 path_defer, (long)stabdefer.st_nlink);
2554 struct stat stab_f, stab_d;
2557 InputFile *file_d= open_input_file(path_flushing);
2558 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2560 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2562 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2563 debug("startup: F==D => Hardlinked");
2564 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2569 debug("startup: F ENOENT => Moved");
2570 if (file_d) startup_set_input_file(file_d);
2571 spawn_inndcomm_flush("feedfile missing at startup");
2572 /* => Flushing, sms:=FLUSHING */
2575 debug("startup: F!=D => Separated");
2576 startup_set_input_file(file_d);
2577 flushing_input_file= main_input_file;
2578 main_input_file= open_input_file(feedfile);
2579 if (!main_input_file) die("feedfile vanished during startup");
2580 SMS(SEPARATED, max_separated_periods,
2581 "found both old and current feed files");
2583 debug("startup: F exists, D ENOENT => Normal");
2584 InputFile *file_f= open_input_file(feedfile);
2585 if (!file_f) die("feed file vanished during startup");
2586 startup_set_input_file(file_f);
2587 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Begins a flush from the NORMAL state (asserted).  The feed file is
 * hard-linked to the flushing file name (failure is fatal), the old feed
 * file name is unlinked, and a flush of innd is spawned, with `why' as the
 * reason passed on and logged. */
2592 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2593 assert(sms == sm_NORMAL);
2595 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2597 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2598 (unsigned long)target_max_feedfile_size,
2601 int r= link(feedfile, path_flushing);
2602 if (r) sysfatal("link feedfile %s to flushing file %s",
2603 feedfile, path_flushing);
2606 xunlink(feedfile, "old feedfile link");
2609 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Try to move towards a flush, depending on the current state.  Per the
 * signature comment it returns 1 after reaching Flushing/FLUSHING, else 0.
 * Visible actions: start a periodic flush; retry the flush after
 * FLUSHFAILED; autodefer the flushing input file (which must exist) when
 * it has taken too long to complete.
 * (The switch/case skeleton is not visible in this excerpt.) */
2612 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2616 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2619 case sm_FLUSHFAILED:
2620 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2625 warn("took too long to complete old feedfile after flush, autodeferring");
2626 assert(flushing_input_file);
2627 autodefer_input_file(flushing_input_file);
/* Per-period state machine tick driven by the until_flush countdown.
 * Does nothing while until_flush is zero on entry; once the countdown
 * reaches zero it calls trigger_flush_ok().
 * (The decrement itself is on a line not visible in this excerpt.) */
2635 static void statemc_period_poll(void) {
2636 if (!until_flush) return;
2638 assert(until_flush>=0);
2640 if (until_flush) return;
2641 int ok= trigger_flush_ok();
/* Tests whether an input file has been completely dealt with.  Returns 0
 * while articles from it are still in progress or while it still has a
 * reader (EOF not yet seen).
 * (Other lines of this definition are not visible in this excerpt.) */
2645 static int inputfile_is_done(InputFile *ipf) {
2647 if (ipf->inprogress) return 0; /* new article in the meantime */
2648 if (ipf->rd) return 0; /* not had EOF */
/* Log one info() line of statistics for an input file.
 *   ipf        file to report on; a null pointer makes this a no-op
 *   completed  nonzero logs "completed" and omits the inprogress count;
 *              zero logs "processed" and includes it
 *   what,spec  two strings printed back to back to identify the file
 * The autodeferred count is printed only when ipf->autodefer is >= 0.
 * inprog and autodefer are allocated with xasprintf.
 * (Some lines of this definition are not visible in this excerpt.) */
2652 static void notice_processed(InputFile *ipf, int completed,
2653 const char *what, const char *spec) {
2654 if (!ipf) return; /* allows preterminate to be lazy */
/* helper macros used with RESULT_COUNTS to build the format and arguments */
2656 #define RCI_NOTHING(x) /* nothing */
2657 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2658 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
/* CNT(art,rc): counter for article state `art' and result code `rc' */
2660 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2662 char *inprog= completed
2663 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2664 : xasprintf(" inprogress=%ld", ipf->inprogress);
2665 char *autodefer= ipf->autodefer >= 0
2666 ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2667 : xasprintf("%s","");
2669 info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2670 " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2671 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2673 completed?"completed":"processed", what, spec,
2674 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2675 inprog, autodefer, ipf->count_nooffer_missing,
2676 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2677 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2678 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2679 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2680 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
2689 static void statemc_check_backlog_done(void) {
2690 InputFile *ipf= backlog_input_file;
2691 if (!inputfile_is_done(ipf)) return;
2693 const char *slash= strrchr(ipf->path, '/');
2694 const char *leaf= slash ? slash+1 : ipf->path;
2695 const char *under= strchr(slash, '_');
2696 const char *rest= under ? under+1 : leaf;
2697 if (!strncmp(rest,"backlog",7)) rest += 7;
2698 notice_processed(ipf,1,"backlog ",rest);
2700 close_input_file(ipf);
2701 if (unlink(ipf->path)) {
2702 if (errno != ENOENT)
2703 sysdie("could not unlink processed backlog file %s", ipf->path);
2704 warn("backlog file %s vanished while we were reading it"
2705 " so we couldn't remove it (but it's done now, anyway)",
2709 backlog_input_file= 0;
2710 search_backlog_file();
/* If the flushing input file has been fully processed: log its statistics,
 * unlink the flushing path, close and free the InputFile, and advance the
 * state machine.  Only valid in SEPARATED or DROPPING (asserted).
 *   SEPARATED -> NORMAL
 *   DROPPING  -> DROPPED, followed by a backlog search
 * (Some lines of this definition are not visible in this excerpt.) */
2714 static void statemc_check_flushing_done(void) {
2715 InputFile *ipf= flushing_input_file;
2716 if (!inputfile_is_done(ipf)) return;
2718 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2720 notice_processed(ipf,1,"feedfile","");
2724 xunlink(path_flushing, "old flushing file");
2726 close_input_file(flushing_input_file);
2727 free(flushing_input_file);
2728 flushing_input_file= 0;
2730 if (sms==sm_SEPARATED) {
2731 notice("flush complete");
2732 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2733 } else if (sms==sm_DROPPING) {
2734 SMS(DROPPED, max_separated_periods, "old flush complete");
2735 search_backlog_file();
2736 notice("feed dropped, but will continue until backlog is finished");
/* Timer callback (registered by queue_check_input_done): checks whether
 * the flushing file and then the backlog file have been completed.
 * Asserts that the main input file is not reported as done.
 * Always returns OOP_CONTINUE.
 * (The rest of the parameter list is on a line not visible here.) */
2740 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2742 assert(!inputfile_is_done(main_input_file));
2743 statemc_check_flushing_done();
2744 statemc_check_backlog_done();
2745 return OOP_CONTINUE;
/* Ask the event loop to call statemc_check_input_done at OOP_TIME_NOW,
 * rather than calling it directly from here. */
2748 static void queue_check_input_done(void) {
2749 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
/* Enter a new state machine state and log the transition.
 *   newsms   the state being entered
 *   periods  stored into until_flush; also shown in one of the two log
 *            formats
 *   forlog   state name for the log line
 *   why      reason, appended to the log line
 * A suffix (xtra) is added to the logged state name in some cases, e.g.
 * "-ABSENT" when there is no main input file, or "-1"/"-2" according to
 * whether the flushing input file still has a reader.
 * (The assignment to sms and the switch skeleton are not visible here.) */
2752 static void statemc_setstate(StateMachineState newsms, int periods,
2753 const char *forlog, const char *why) {
2755 until_flush= periods;
2757 const char *xtra= "";
2760 case sm_FLUSHFAILED:
2761 if (!main_input_file) xtra= "-ABSENT";
2765 xtra= flushing_input_file->rd ? "-1" : "-2";
2771 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2773 info("state %s%s %s",forlog,xtra,why);
2777 /*---------- defer and backlog files ----------*/
/* Open the defer file (path_defer) in "a+" mode into the global `defer'.
 * If the file already has content, any half-written record at the end is
 * cut off: the code seeks backwards from the end, checks what it reads,
 * and if the resulting size (truncto) differs from the size on disk it
 * warns and ftruncate()s the file; otherwise it logs that the existing
 * file is being continued.  Finally it seeks to truncto.
 * Open, flush, seek, read and truncate failures are all fatal, as is a
 * file larger than LONG_MAX or one that shrinks while being checked.
 * (The scanning loop and several declarations are not visible here.) */
2779 static void open_defer(void) {
2784 defer= fopen(path_defer, "a+");
2785 if (!defer) sysfatal("could not open defer file %s", path_defer);
2787 /* truncate away any half-written records */
2789 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2791 if (stab.st_size > LONG_MAX)
2792 die("defer file %s size is far too large", path_defer);
2797 long orgsize= stab.st_size;
2798 long truncto= stab.st_size;
2800 if (!truncto) break; /* was only (if anything) one half-truncated record */
2801 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2802 sysdie("seek in defer file %s while truncating partial", path_defer);
2807 sysdie("failed read from defer file %s", path_defer);
2809 die("defer file %s shrank while we were checking it!", path_defer);
2815 if (stab.st_size != truncto) {
2816 warn("truncating half-record at end of defer file %s -"
2817 " shrinking by %ld bytes from %ld to %ld",
2818 path_defer, orgsize - truncto, orgsize, truncto);
2821 sysfatal("could not flush defer file %s", path_defer);
2822 if (ftruncate(fileno(defer), truncto))
2823 sysdie("could not truncate defer file %s", path_defer);
2826 info("continuing existing defer file %s (%ld bytes)",
2827 path_defer, orgsize);
2829 if (fseek(defer, truncto, SEEK_SET))
2830 sysdie("could not seek to new end of defer file %s", path_defer);
/* Close the defer file and install it as a backlog file.
 * The backlog name is "<feedfile>_backlog_<n>.<inode>"; the inode comes
 * from an fstat taken before closing, and the first number is presumably
 * derived from the current time (`now') -- that argument line is not
 * visible here.  The file is installed with link() followed by unlink()
 * of the defer name; failure of either is fatal.
 * Afterwards until_backlog_nextscan is set to backlog_retry_minperiods+1
 * if it was negative or larger than that.
 * (Some lines of this definition are not visible in this excerpt.) */
2833 static void close_defer(void) {
2838 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2840 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2843 time_t now= xtime();
2845 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2847 (unsigned long)stab.st_ino);
2848 if (link(path_defer, backlog))
2849 sysfatal("could not install defer file %s as backlog file %s",
2850 path_defer, backlog);
2851 if (unlink(path_defer))
2852 sysdie("could not unlink old defer link %s to backlog file %s",
2853 path_defer, backlog);
2857 if (until_backlog_nextscan < 0 ||
2858 until_backlog_nextscan > backlog_retry_minperiods + 1)
2859 until_backlog_nextscan= backlog_retry_minperiods + 1;
2862 static void poll_backlog_file(void) {
2863 if (until_backlog_nextscan < 0) return;
2864 if (until_backlog_nextscan-- > 0) return;
2865 search_backlog_file();
/* Look for the oldest backlog file and, if it is old enough, start
 * reading it as backlog_input_file.
 *
 * Does nothing if a backlog file is already open.  Otherwise expands
 * globpat_backlog, ignores names containing '#' or '~', names that cannot
 * be stat()ed and names that are not regular files, and picks the
 * candidate with the smallest mtime.
 *
 * No candidate: in state DROPPED the feed's work is complete, and the
 * control symlink and lockfile are removed; otherwise the next scan is
 * scheduled after backlog_spontrescan_periods.
 *
 * Candidate found: if its age is at least
 * backlog_retry_minperiods*period_seconds it is opened and reading
 * starts; otherwise the next scan is scheduled for when it will be old
 * enough, capped at backlog_spontrescan_periods.
 * (Some lines of this definition are not visible in this excerpt.) */
2868 static void search_backlog_file(void) {
2869 /* void: the outcome is reported through backlog_input_file and until_backlog_nextscan */
2874 const char *oldest_path=0;
2875 time_t oldest_mtime=0, now;
2877 if (backlog_input_file) return;
2881 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2885 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2887 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2889 for (i=0; i<gl.gl_pathc; i++) {
2890 const char *path= gl.gl_pathv[i];
2892 if (strchr(path,'#') || strchr(path,'~')) {
2893 debug("backlog file search skipping %s", path);
2896 r= stat(path, &stab);
2898 syswarn("failed to stat backlog file %s", path);
2901 if (!S_ISREG(stab.st_mode)) {
2902 warn("backlog file %s is not a plain file (or link to one)", path);
2905 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2907 oldest_mtime= stab.st_mtime;
2910 case GLOB_NOMATCH: /* fall through */
2913 sysdie("glob expansion of backlog pattern %s gave unexpected"
2914 " nonzero (error?) return value %d", globpat_backlog, r);
2918 debug("backlog scan: none");
2920 if (sms==sm_DROPPED) {
2922 notice("feed dropped and our work is complete");
2924 int r= unlink(path_control);
2925 if (r && errno!=ENOENT)
2926 syswarn("failed to remove control symlink for old feed");
2928 xunlink(path_lock, "lockfile for old feed");
2931 until_backlog_nextscan= backlog_spontrescan_periods;
2936 double age= difftime(now, oldest_mtime);
2937 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2939 if (age_deficiency <= 0) {
2940 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2941 age, age_deficiency, oldest_path);
2943 backlog_input_file= open_input_file(oldest_path);
2944 if (!backlog_input_file) {
2945 warn("backlog file %s vanished as we opened it", oldest_path);
2949 inputfile_reading_start(backlog_input_file);
2950 until_backlog_nextscan= -1;
2954 until_backlog_nextscan= age_deficiency / period_seconds;
2956 if (backlog_spontrescan_periods >= 0 &&
2957 until_backlog_nextscan > backlog_spontrescan_periods)
2958 until_backlog_nextscan= backlog_spontrescan_periods;
2960 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2961 age, age_deficiency, until_backlog_nextscan, oldest_path);
2968 /*---------- shutdown and signal handling ----------*/
/* Log "processed" (not "completed") statistics for each input file that
 * is currently open.  Does nothing in a child process (in_child set).
 * notice_processed ignores null pointers, so only the backlog file, whose
 * path is dereferenced here, needs an explicit check. */
2970 static void preterminate(void) {
2971 if (in_child) return;
2972 notice_processed(main_input_file,0,"feedfile","");
2973 notice_processed(flushing_input_file,0,"flushing","");
2974 if (backlog_input_file)
2975 notice_processed(backlog_input_file,0, "backlog file ",
2976 backlog_input_file->path);
/* Self-pipe that turns asynchronous signals into event loop activity:
 * the signal handler writes a byte to [1], the event loop reads [0]. */
static int signal_self_pipe[2];

/* Number of the first terminating signal received, or 0 if none so far.
 * Written by the signal handler and read by ordinary code, so it must be
 * volatile as well as sig_atomic_t. */
static volatile sig_atomic_t terminate_sig_flag;
/* Restore the default disposition for signo via xsigsetdefault.
 * (Following lines are not visible here; presumably the signal is then
 * re-raised so the process dies from it -- confirm.) */
2982 static void raise_default(int signo) {
2983 xsigsetdefault(signo);
2988 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2989 assert(fd=signal_self_pipe[0]);
2991 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2992 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2993 if (r==0) die("eof on signal self pipe");
2994 if (terminate_sig_flag) {
2996 notice("terminating (%s)", strsignal(terminate_sig_flag));
2997 raise_default(terminate_sig_flag);
2999 return OOP_CONTINUE;
3002 static void sigarrived_handler(int signum) {
3007 if (!terminate_sig_flag) terminate_sig_flag= signum;
3012 write(signal_self_pipe[1],&x,1);
/* Set up signal handling: ignore SIGPIPE, create the self-pipe and make
 * both ends nonblocking, install sigarrived_handler (with SA_RESTART) for
 * SIGTERM and SIGINT, and register sigarrived_event for the pipe's read
 * end.  Failures are fatal. */
3015 static void init_signals(void) {
3016 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
3017 sysdie("could not ignore SIGPIPE");
3019 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
3021 xsetnonblock(signal_self_pipe[0],1);
3022 xsetnonblock(signal_self_pipe[1],1);
3024 struct sigaction sa;
3025 memset(&sa,0,sizeof(sa));
3026 sa.sa_handler= sigarrived_handler;
3027 sa.sa_flags= SA_RESTART;
3028 xsigaction(SIGTERM,&sa);
3029 xsigaction(SIGINT,&sa);
3031 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
3034 /*========== flushing the feed ==========*/
/* pid of the running flush child, or 0 when there is none */
3036 static pid_t inndcomm_child;
/* read end of the child's sentinel pipe; 0 is used to mean "none" */
3037 static int inndcomm_sentinel_fd;
/* Event loop callback for the flush child's sentinel pipe: reaps the
 * child, closes and forgets the sentinel fd, then acts on the child's
 * exit status.  Always returns OOP_CONTINUE.
 *
 * Visible outcomes:
 *  - INNDCOMMCHILD_ESTATUS_NONESUCH: innd has dropped the feed.  The main
 *    input file becomes the flushing file; state becomes DROPPING if such
 *    a file exists, else DROPPED followed by a backlog search.
 *  - success path: the old main file becomes the flushing file and the
 *    feed file is reopened as main (its absence is fatal); state becomes
 *    SEPARATED if there is a flushing file, else NORMAL.
 *  - killed by SIGALRM: the flush timed out.
 *  - anything else is reported via report_child_status.
 *  Failure paths end in FLUSHFAILED with a retry after
 *  flushfail_retry_periods.
 * (Several case labels and braces are not visible in this excerpt.) */
3039 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
3040 assert(inndcomm_child);
3041 assert(fd == inndcomm_sentinel_fd);
3042 int status= xwaitpid(&inndcomm_child, "inndcomm");
3045 cancel_fd_read_except(fd);
3046 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3047 inndcomm_sentinel_fd= 0;
3049 assert(!flushing_input_file);
3051 if (WIFEXITED(status)) {
3052 switch (WEXITSTATUS(status)) {
3054 case INNDCOMMCHILD_ESTATUS_FAIL:
3057 case INNDCOMMCHILD_ESTATUS_NONESUCH:
3058 notice("feed has been dropped by innd, finishing up");
3059 flushing_input_file= main_input_file;
3060 tailing_queue_readable(flushing_input_file);
3061 /* we probably previously returned EAGAIN from our fake read method
3062 * when in fact we were at EOF, so signal another readable event
3063 * so we actually see the EOF */
3067 if (flushing_input_file) {
3068 SMS(DROPPING, max_separated_periods,
3069 "feed dropped by innd, but must finish last flush");
3072 SMS(DROPPED, 0, "feed dropped by innd");
3073 search_backlog_file();
3075 return OOP_CONTINUE;
3079 flushing_input_file= main_input_file;
3080 tailing_queue_readable(flushing_input_file);
3082 main_input_file= open_input_file(feedfile);
3083 if (!main_input_file)
3084 die("flush succeeded but feedfile %s does not exist!", feedfile);
3086 if (flushing_input_file) {
3087 SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3090 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3092 return OOP_CONTINUE;
3095 goto unexpected_exitstatus;
3098 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3099 warn("flush timed out trying to talk to innd");
3102 unexpected_exitstatus:
3103 report_child_status("inndcomm child", status);
3107 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3108 return OOP_CONTINUE;
/* Used in the flush child: warn that step `what' of talking to innd
 * failed (including ICCfailure), then exit with
 * INNDCOMMCHILD_ESTATUS_FAIL.  Does not return. */
3111 static void inndcommfail(const char *what) {
3112 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3113 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Fork a child that asks innd to flush this site's feed, and enter the
 * FLUSHING state.  Only valid in NORMAL or FLUSHFAILED with no flush
 * child already running (asserted).
 *
 * Child: closes the parent's end of the sentinel pipe.  If simulate_flush
 * is >= 0 it fakes the result (values above 128 raise signal
 * simulate_flush-128, others are used as the exit status).  Otherwise it
 * arms an alarm of inndcomm_flush_timeout, connects to innd and sends
 * the 'f' command with the site name as argument.  It exits 0 on success,
 * INNDCOMMCHILD_ESTATUS_NONESUCH if innd replies "1 No such site", and
 * INNDCOMMCHILD_ESTATUS_FAIL otherwise.
 *
 * Parent: closes the child's end and watches the read end with
 * inndcomm_event; the child's exit closes the pipe and so wakes the
 * parent.
 * (Some lines of this definition are not visible in this excerpt.) */
3116 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3119 notice("flushing %s",why);
3121 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3122 assert(!inndcomm_child);
3123 assert(!inndcomm_sentinel_fd);
3125 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3127 inndcomm_child= xfork("inndcomm child");
3129 if (!inndcomm_child) {
3130 const char *flushargv[2]= { sitename, 0 };
3134 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3135 /* parent spots the autoclose of pipefds[1] when we die or exit */
3137 if (simulate_flush>=0) {
3138 warn("SIMULATING flush child status %d", simulate_flush);
3139 if (simulate_flush>128) raise(simulate_flush-128);
3140 else exit(simulate_flush);
3143 alarm(inndcomm_flush_timeout);
3144 r= ICCopen(); if (r) inndcommfail("connect");
3145 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
3146 if (!r) exit(0); /* yay! */
3148 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3149 syswarn("innd ctlinnd flush failed: innd said %s", reply);
3150 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3155 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3156 inndcomm_sentinel_fd= pipefds[0];
3157 assert(inndcomm_sentinel_fd);
3158 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3160 SMS(FLUSHING, 0, why);
3163 /*========== main program ==========*/
/* In a freshly forked child: close the descriptor of one input file.
 * (A line between the signature and the close is not visible here;
 * callers pass pointers that may be null, so it presumably guards that
 * case -- confirm.) */
3165 static void postfork_inputfile(InputFile *ipf) {
3167 xclose(ipf->fd, "(in child) input file ", ipf->path);
/* In a freshly forked child: close one stdio stream inherited from the
 * parent.
 *   f      stream to close; a null pointer is ignored
 *   what   description used in the error message
 *   what2  optional second part of the description; may be null
 * Failure to close is fatal. */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return;
  /* substitute "" for a missing what2: a null pointer must not be
   * passed to %s */
  if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:"");
}
/* Run in a child just after fork: restore default handling of SIGTERM,
 * SIGINT and SIGPIPE, re-raise any terminating signal that had already
 * been recorded, and close resources inherited from the parent (input
 * files, connection descriptors, the defer stream).
 * (Some lines of this definition are not visible in this excerpt.) */
3176 static void postfork(void) {
3179 xsigsetdefault(SIGTERM);
3180 xsigsetdefault(SIGINT);
3181 xsigsetdefault(SIGPIPE);
3182 if (terminate_sig_flag) raise(terminate_sig_flag);
3184 postfork_inputfile(main_input_file);
3185 postfork_inputfile(flushing_input_file);
3189 conn_closefd(conn,"(in child) ");
3191 postfork_stdio(defer, "defer file ", path_defer);
/* Descriptor for a function that is called repeatedly by the event loop.
 * (Only the interval member is visible here; other code in this file
 * also uses a fixed_rate member.) */
3194 typedef struct Every Every;
3196 struct timeval interval;
3201 static void every_schedule(Every *e, struct timeval base);
/* Timer callback for an Every: reschedules the next occurrence.  Unless
 * fixed_rate is set, the next interval is measured from the current time
 * instead of from the scheduled time `base'.
 * (The call to the user's function is on a line not visible here.) */
3203 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3206 if (!e->fixed_rate) xgettimeofday(&base);
3207 every_schedule(e, base);
3208 return OOP_CONTINUE;
/* Register every_happens to run for `e' at time base + e->interval. */
3211 static void every_schedule(Every *e, struct timeval base) {
3212 struct timeval when;
3213 timeradd(&base, &e->interval, &when);
3214 loop->on_time(loop, when, every_happens, e);
/* Arrange for f to be called every `interval' seconds, first one
 * interval from now.  fixed_rate selects how later occurrences are timed
 * (see every_happens).
 * (The line that stores f is not visible in this excerpt.) */
3217 static void every(int interval, int fixed_rate, void (*f)(void)) {
3218 NEW_DECL(Every *,e);
3219 e->interval.tv_sec= interval;
3220 e->interval.tv_usec= 0;
3221 e->fixed_rate= fixed_rate;
3224 xgettimeofday(&now);
3225 every_schedule(e, now);
/* Polling fallback: invoke the file monitoring callback for the main and
 * flushing input files. */
3228 static void filepoll(void) {
3229 filemon_callback(main_input_file);
3230 filemon_callback(flushing_input_file);
/* Build a one-line description of an input file for debug output.
 * Returns a string allocated by xasprintf: "none" for a null ipf,
 * otherwise the leaf of the path plus queue length, in-progress count,
 * autodefer count, offset, fd and status markers.
 * (One argument line of the xasprintf call is not visible here.) */
3233 static char *debug_report_ipf(InputFile *ipf) {
3234 if (!ipf) return xasprintf("none");
3236 const char *slash= strrchr(ipf->path,'/');
3237 const char *path= slash ? slash+1 : ipf->path;
3239 return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
3241 ipf->queue.count, ipf->inprogress, ipf->autodefer,
3242 (long)ipf->offset, ipf->fd,
3243 ipf->rd ? "" : ",!rd",
3244 ipf->skippinglong ? "*skiplong" : "",
3245 ipf->rd && ipf->paused ? "*paused" : "");
/* Once-per-period housekeeping: log a status summary, count down
 * until_connect, expire queued backlog articles, poll for backlog files,
 * close the defer file when no backlog file is being read, tick the state
 * machine, and reassign articles.
 * (Some lines of this definition are not visible in this excerpt.) */
3248 static void period(void) {
3249 char *dipf_main= debug_report_ipf(main_input_file);
3250 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3251 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3254 " sms=%s[%d] conns=%d until_connect=%d"
3255 " input_files main:%s flushing:%s backlog:%s[%d]"
3256 " children connecting=%ld inndcomm=%ld"
3258 sms_names[sms], until_flush, conns.count, until_connect,
3259 dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
3260 (long)connecting_child, (long)inndcomm_child
3264 free(dipf_flushing);
3267 if (until_connect) until_connect--;
3269 inputfile_queue_check_expired(backlog_input_file);
3270 poll_backlog_file();
3271 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3272 statemc_period_poll();
3273 check_assign_articles();
3278 /*========== dumping state ==========*/
/* Write an article list to the dump stream f.  Always prints the count;
 * the individual articles are printed only when c->xval is nonzero. */
3280 static void dump_article_list(FILE *f, const ControlCommand *c,
3281 const ArticleList *al) {
3282 fprintf(f, " count=%d\n", al->count);
3283 if (!c->xval) return;
3285 int i; Article *art;
3286 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3287 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3288 DUMPV("%p", art->,ipf);
3289 DUMPV("%d", art->,missing);
3290 DUMPV("%lu", (unsigned long)art->,offset);
3291 DUMPV("%d", art->,blanklen);
3292 DUMPV("%d", art->,midlen);
3293 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
/* Write the state of one input file to the dump stream f, labelled `wh':
 * the debug_report_ipf summary, the read counters, one line of result
 * counts per article state, and the file's article queue.
 * (Some lines of this definition are not visible in this excerpt.) */
3297 static void dump_input_file(FILE *f, const ControlCommand *c,
3298 InputFile *ipf, const char *wh) {
3299 char *dipf= debug_report_ipf(ipf);
3300 fprintf(f,"input %s %s", wh, dipf);
3304 DUMPV("%d", ipf->,readcount_ok);
3305 DUMPV("%d", ipf->,readcount_blank);
3306 DUMPV("%d", ipf->,readcount_err);
3310 ArtState state; const char *const *statename;
3311 for (state=0, statename=artstate_names; *statename; state++,statename++) {
3312 #define RC_DUMP_FMT(x) " " #x "=%d"
3313 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3314 fprintf(f,"input %s counts %-11s"
3315 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3317 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3319 fprintf(f,"input %s queue", wh);
3320 dump_article_list(f,c,&ipf->queue);
/* Body of the control command that dumps state to path_dump (the
 * definition's header line is not visible in this excerpt).  Sections
 * written: general, nocheck, special, filemon, the three input files,
 * each connection with its article lists and transmit queue, and the
 * configured paths.  Open and write failures are reported on cc->out.
 * NOTE(review): in the xmit line iv->iov_len is printed with "l=%d";
 * iov_len in struct iovec is a size_t -- confirm the format. */
3326 fprintf(cc->out, "dumping state to %s\n", path_dump);
3327 FILE *f= fopen(path_dump, "w");
3328 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3330 fprintf(f,"general");
3331 DUMPV("%s", sms_names,[sms]);
3332 DUMPV("%d", ,until_flush);
3333 DUMPV("%ld", (long),self_pid);
3334 DUMPV("%p", , defer);
3335 DUMPV("%d", , until_connect);
3336 DUMPV("%d", , until_backlog_nextscan);
3337 DUMPV("%d", , simulate_flush);
3338 fprintf(f,"\nnocheck");
3339 DUMPV("%#.10f", , accept_proportion);
3340 DUMPV("%d", , nocheck);
3341 DUMPV("%d", , nocheck_reported);
3344 fprintf(f,"special");
3345 DUMPV("%ld", (long),connecting_child);
3346 DUMPV("%d", , connecting_fdpass_sock);
3347 DUMPV("%d", , control_master);
3350 fprintf(f,"filemon ");
3351 filemon_method_dump_info(f);
3353 dump_input_file(f,c, main_input_file, "main" );
3354 dump_input_file(f,c, flushing_input_file, "flushing");
3355 dump_input_file(f,c, backlog_input_file, "backlog" );
3357 fprintf(f,"conns count=%d\n", conns.count);
3362 fprintf(f,"C%d",conn->fd);
3363 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3364 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3365 DUMPV("%d",conn->,since_activity);
3368 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3369 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3370 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
3372 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3373 for (i=0; i<conn->xmitu; i++) {
3374 const struct iovec *iv= &conn->xmit[i];
3375 const XmitDetails *xd= &conn->xmitd[i];
3378 case xk_Const: dinfo= xasprintf("Const"); break;
3379 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
3383 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3384 sanitise(iv->iov_base, iv->iov_len));
3390 DUMPV("%s", , path_lock);
3391 DUMPV("%s", , path_flushing);
3392 DUMPV("%s", , path_defer);
3393 DUMPV("%s", , path_control);
3394 DUMPV("%s", , path_dump);
3395 DUMPV("%s", , globpat_backlog);
3398 if (!!ferror(f) + !!fclose(f)) {
3399 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3404 /*========== option parsing ==========*/
/* Report a command line usage error, formatted from fmt/al, on stderr
 * and to syslog at LOG_CRIT.  Declared NORET_PRINTF, i.e. it is not
 * expected to return.
 * (The exit call is on a line not visible in this excerpt.) */
3406 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3407 static void vbadusage(const char *fmt, va_list al) {
3408 char *m= xvasprintf(fmt,al);
3409 fprintf(stderr, "bad usage: %s\n"
3410 "say --help for help, or read the manpage\n",
3413 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3417 /*---------- generic option parser ----------*/
/* printf-style usage error reporter; declared NORET_PRINTF.
 * (The body is not visible here; presumably it forwards to vbadusage --
 * confirm.) */
3419 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3420 static void badusage(const char *fmt, ...) {
/* Option flag bits, written in octal.  (The enclosing enum declaration is
 * not visible in this excerpt.) */
3427 of_seconds= 001000u,
3428 of_boolean= 002000u,
/* One command line option table entry.  An OptionParser is called with
 * the matching entry and the option's value; the parser passes a null
 * value when the option takes none.
 *   lng      long option name, without the leading "--"
 *   formarg  name of the value shown in help and error messages
 * (The other members are not visible in this excerpt.) */
3431 typedef struct Option Option;
3432 typedef void OptionParser(const Option*, const char *val);
3436 const char *lng, *formarg;
/* Parse leading options from an argv-style vector using table `options'.
 * Handles "--" as end of options, long options as "--name", "--name=value"
 * or "--name value", and short options with the value in the same or the
 * next argument.  Unknown options, missing values and unwanted values
 * are reported through badusage.
 * NOTE(review): the long option lookup calls strlen(o->lng) on every
 * table entry, so an entry with only a short name (lng null) would be
 * dereferenced -- all entries visible in innduct_options have a long name.
 * (Some lines of this definition are not visible in this excerpt.) */
3442 static void parse_options(const Option *options, char ***argvp) {
3443 /* on return *argvp is first non-option arg; argc is not updated */
3446 const char *arg= *++(*argvp);
3448 if (*arg != '-') break;
3449 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3451 while ((a= *++arg)) {
3455 char *equals= strchr(arg,'=');
3456 int len= equals ? (equals - arg) : strlen(arg);
3457 for (o=options; o->shrt || o->lng; o++)
3458 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3460 badusage("unknown long option --%s",arg);
3463 if (equals) badusage("option --%s does not take a value",o->lng);
3465 } else if (equals) {
3469 if (!arg) badusage("option --%s needs a value for %s",
3470 o->lng, o->formarg);
3473 break; /* eaten the whole argument now */
3475 for (o=options; o->shrt || o->lng; o++)
3478 badusage("unknown short option -%c",a);
3485 if (!arg) badusage("option -%c needs a value for %s",
3486 o->shrt, o->formarg);
3489 break; /* eaten the whole argument now */
/* Expands to TWO printf arguments: delim and str when str is non-null,
 * otherwise two empty strings.  str is evaluated more than once. */
3495 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
/* Print one help line per option table entry to f, in the form
 * "-x|--long FORMARG"; parts the entry does not have are omitted.
 * (Some lines of this definition are not visible in this excerpt.) */
3497 static void print_options(const Option *options, FILE *f) {
3499 for (o=options; o->shrt || o->lng; o++) {
3500 char shrt[2] = { o->shrt, 0 };
3501 char *optspec= xasprintf("%s%s%s%s%s",
3502 o->shrt ? "-" : "", shrt,
3503 o->shrt && o->lng ? "|" : "",
3504 DELIMPERHAPS("--", o->lng));
3505 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3510 /*---------- specific option types ----------*/
/* OptionParser for integer options stored in an int.  The value must be
 * a complete base-10 number no greater than INT_MAX; anything else is a
 * usage error.
 * (The errno reset and the final store are on lines not visible here.) */
3512 static void op_integer(const Option *o, const char *val) {
3515 unsigned long ul= strtoul(val,&ep,10);
3516 if (*ep || ep==val || errno || ul>INT_MAX)
3517 badusage("bad integer value for %s",o->lng);
3518 int *store= o->store;
3522 static void op_double(const Option *o, const char *val) {
3523 int *store= o->store;
3526 *store= strtod(val, &ep);
3527 if (*ep || ep==val || errno)
3528 badusage("bad floating point value for %s",o->lng);
/* OptionParser for string options; o->store is treated as a pointer to a
 * const char* variable.
 * (The store itself is on a line not visible in this excerpt.) */
3531 static void op_string(const Option *o, const char *val) {
3532 const char **store= o->store;
/* OptionParser for durations stored in an int.  The value is a number
 * with an optional unit suffix:
 *   (none), s, sec  seconds        m, min   minutes
 *   h, hour         hours          d, day   days
 *   das  x10    hs  x100    ks  x1000    Ms  x1000000
 * A missing number, an unknown unit, or a result above INT_MAX is a
 * usage error.
 * (The scaling and the final store are on lines not visible here.) */
3536 static void op_seconds(const Option *o, const char *val) {
3537 int *store= o->store;
3541 double v= strtod(val,&ep);
3542 if (ep==val) badusage("bad time/duration value for %s",o->lng);
3544 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3545 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3546 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3547 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3548 else if (!strcmp(ep,"das")) unit= 10;
3549 else if (!strcmp(ep,"hs")) unit= 100;
3550 else if (!strcmp(ep,"ks")) unit= 1000;
3551 else if (!strcmp(ep,"Ms")) unit= 1000000;
3552 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3556 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
/* OptionParser for flag options that take no value; o->store is treated
 * as a pointer to int.
 * (The store is on a line not visible here; presumably it writes the
 * table entry's integer constant -- confirm.) */
3560 static void op_setint(const Option *o, const char *val) {
3561 int *store= o->store;
3565 /*---------- specific options ----------*/
3567 static void help(const Option *o, const char *val);
3569 static const Option innduct_options[]= {
3570 {'f',"feedfile", "F", &feedfile, op_string },
3571 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3572 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3573 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3574 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3575 {'C',"inndconf", "F", &inndconffile, op_string },
3576 {'P',"port", "PORT", &port, op_integer },
3577 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3578 {0,"help", 0, 0, help },
3580 {0,"max-connections", "N", &max_connections, op_integer },
3581 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3582 {0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer },
3583 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3584 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3586 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3587 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3588 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3590 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3591 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3593 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3594 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3595 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3596 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3597 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3598 {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
3599 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3601 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3602 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
/* Print the usage summary followed by the option list to f. */
3607 static void printusage(FILE *f) {
3608 fputs("usage: innduct [options] site [fqdn]\n"
3609 "available options are:\n", f);
3610 print_options(innduct_options, f);
/* OptionParser for --help.  Checks stdout for write errors and reports
 * them with perror.
 * (The lines that print the usage and exit are not visible here.) */
3613 static void help(const Option *o, const char *val) {
3615 if (ferror(stdout) || fflush(stdout)) {
3616 perror("innduct: writing help");
3622 static void convert_to_periods_rndup(int *store) {
3623 *store += period_seconds-1;
3624 *store /= period_seconds;
/* Program entry point.  Visible steps: parse options and positional
 * arguments (site name, optional remote host); read inn.conf; validate
 * and rescale the percentage and decay options; convert second-valued
 * options into periods; derive the feed file name and the per-feed paths
 * from it; create the liboop event loop; optionally daemonise; set up
 * file monitoring or polling; register the periodic callbacks; run the
 * event loop, which is not expected to return other than with an error.
 * (Many lines of this definition are not visible in this excerpt.)
 * NOTE(review): in the daemonise section the parent has already exited on
 * `if (child1) _exit(0)', so child1 is 0 when `sid != child1' is tested;
 * setsid() returns the new session id or -1 -- confirm which comparison
 * is intended. */
3627 int main(int argc, char **argv) {
3633 parse_options(innduct_options, &argv);
3638 if (!sitename) badusage("need site name argument");
3639 remote_host= *argv++;
3640 if (*argv) badusage("too many non-option arguments");
3644 int r= innconf_read(inndconffile);
3645 if (!r) badusage("could not read inn.conf (more info on stderr)");
3647 if (!remote_host) remote_host= sitename;
3649 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3650 badusage("nocheck threshold percentage must be between 0..100");
3651 nocheck_thresh *= 0.01;
3653 if (nocheck_decay < 0.1)
3654 badusage("nocheck decay articles must be at least 0.1");
3655 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3657 convert_to_periods_rndup(&reconnect_delay_periods);
3658 convert_to_periods_rndup(&flushfail_retry_periods);
3659 convert_to_periods_rndup(&backlog_retry_minperiods);
3660 convert_to_periods_rndup(&backlog_spontrescan_periods);
3661 convert_to_periods_rndup(&spontaneous_flush_periods);
3662 convert_to_periods_rndup(&max_separated_periods);
3663 convert_to_periods_rndup(&need_activity_periods);
3665 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3666 badusage("bad input data ratio must be between 0..100");
3667 max_bad_data_ratio *= 0.01;
3670 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3671 } else if (!feedfile[0]) {
3672 badusage("feed filename must be nonempty");
3673 } else if (feedfile[strlen(feedfile)-1]=='/') {
3674 feedfile= xasprintf("%s%s",feedfile,sitename);
3677 if (max_queue_per_ipf<0)
3678 max_queue_per_ipf= max_queue_per_conn * 2;
3680 const char *feedfile_forbidden= "?*[~#";
3682 while ((c= *feedfile_forbidden++))
3683 if (strchr(feedfile, c))
3684 badusage("feed filename may not contain metacharacter %c",c);
3688 path_lock= xasprintf("%s_lock", feedfile);
3689 path_flushing= xasprintf("%s_flushing", feedfile);
3690 path_defer= xasprintf("%s_defer", feedfile);
3691 path_control= xasprintf("%s_control", feedfile);
3692 path_dump= xasprintf("%s_dump", feedfile);
3693 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3695 oop_source_sys *sysloop= oop_sys_new();
3696 if (!sysloop) sysdie("could not create liboop event loop");
3697 loop= (oop_source*)sysloop;
3701 if (become_daemon) {
3703 for (i=3; i<255; i++)
3704 /* do this now before we open syslog, etc. */
3706 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3708 int null= open("/dev/null",O_RDWR);
3709 if (null<0) sysfatal("failed to open /dev/null");
3713 xclose(null, "/dev/null original fd",0);
3715 pid_t child1= xfork("daemonise first fork");
3716 if (child1) _exit(0);
3718 pid_t sid= setsid();
3719 if (sid != child1) sysfatal("setsid failed");
3721 pid_t child2= xfork("daemonise second fork");
3722 if (child2) _exit(0);
3726 if (self_pid==-1) sysdie("getpid");
3741 notice("filemon: suppressed by command line option, polling");
3743 filemon_ok= filemon_method_init();
3745 warn("filemon: no file monitoring available, polling");
3748 every(filepoll_seconds,0,filepoll);
3750 every(period_seconds,1,period);
3756 void *run= oop_sys_run(sysloop);
3757 assert(run == OOP_ERROR);
3758 sysdie("event loop failed");