3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain amount old, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
222 #define VA va_list al; va_start(al,fmt)
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322 static void check_reading_pause_resume(InputFile *ipf);
324 static void statemc_check_flushing_done(void);
325 static void statemc_check_backlog_done(void);
327 static void postfork(void);
328 static void period(void);
330 static void open_defer(void);
331 static void close_defer(void);
332 static void search_backlog_file(void);
333 static void preterminate(void);
334 static void raise_default(int signo) NORET;
335 static char *debug_report_ipf(InputFile *ipf);
337 static void inputfile_reading_start(InputFile *ipf);
338 static void inputfile_reading_stop(InputFile *ipf);
339 static void inputfile_reading_pause(InputFile *ipf);
340 static void inputfile_reading_resume(InputFile *ipf);
341 /* pause and resume are idempotent, and no-op if not done _reading_start */
343 static void filemon_start(InputFile *ipf);
344 static void filemon_stop(InputFile *ipf);
345 static void filemon_callback(InputFile *ipf);
347 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
348 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
350 static const oop_rd_style peer_rd_style;
351 static oop_rd_call peer_rd_err, peer_rd_ok;
353 /*----- configuration options -----*/
354 /* when changing defaults, remember to update the manpage */
356 static const char *sitename, *remote_host;
357 static const char *feedfile, *realsockdir="/tmp/innduct.control";
358 static int quiet_multiple=0;
359 static int become_daemon=1, try_filemon=1;
360 static int try_stream=1;
362 static const char *inndconffile;
364 static int max_connections=10;
365 static int max_queue_per_conn=200;
366 static int target_max_feedfile_size=100000;
367 static int period_seconds=60;
368 static int filepoll_seconds=5;
369 static int max_queue_per_ipf=-1;
371 static int connection_setup_timeout=200;
372 static int inndcomm_flush_timeout=100;
374 static double nocheck_thresh= 95.0; /* converted from percentage by main */
375 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
377 /* all these are initialised to seconds, and converted to periods in main */
378 static int reconnect_delay_periods=1000;
379 static int flushfail_retry_periods=1000;
380 static int backlog_retry_minperiods=50;
381 static int backlog_spontrescan_periods=300;
382 static int spontaneous_flush_periods=100000;
383 static int max_separated_periods=2000;
384 static int need_activity_periods=1000;
386 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
387 static int max_bad_data_initial= 30;
388 /* in one corrupt 4096-byte block the number of newlines has
389 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
392 /*----- statistics -----*/
394 typedef enum { /* in queue in conn->sent */
395 art_Unchecked, /* not checked, not sent checking */
396 art_Wanted, /* checked, wanted sent body as requested */
397 art_Unsolicited, /* - sent body without check */
401 static const char *const artstate_names[]=
402 { "Unchecked", "Wanted", "Unsolicited", 0 };
404 #define RESULT_COUNTS(RCS,RCN) \
413 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
414 #define RCI_TRIPLE_VALS_BASE(counts,x) \
415 counts[art_Unchecked] x \
416 + counts[art_Wanted] x \
417 + counts[art_Unsolicited] x, \
418 counts[art_Unchecked] x \
419 , counts[art_Wanted] x \
420 , counts[art_Unsolicited] x
423 #define RC_INDEX(x) RC_##x,
424 RESULT_COUNTS(RC_INDEX, RC_INDEX)
429 /*----- transmission buffers -----*/
445 /*----- core operational data structure types -----*/
448 /* This is also an instance of struct oop_readable */
449 struct oop_readable readable; /* first */
450 oop_readable_call *readable_callback;
451 void *readable_callback_user;
454 Filemon_Perfile *filemon;
456 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
458 int skippinglong, paused;
461 long inprogress; /* includes queue.count and also articles in conns */
462 long autodefer; /* -1 means not doing autodefer */
464 int counts[art_MaxState][RCI_max];
465 int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
480 #define SMS_LIST(X) \
488 enum StateMachineState {
489 #define SMS_DEF_ENUM(s) sm_##s,
490 SMS_LIST(SMS_DEF_ENUM)
493 static const char *sms_names[]= {
494 #define SMS_DEF_NAME(s) #s ,
495 SMS_LIST(SMS_DEF_NAME)
501 int fd; /* may be 0, meaning closed (during construction/destruction) */
502 oop_read *rd; /* likewise */
503 int max_queue, stream, quitting;
504 int since_activity; /* periods */
505 ArticleList waiting; /* not yet told peer */
506 ArticleList priority; /* peer says send it now */
507 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
508 struct iovec xmit[CONNIOVS];
509 XmitDetails xmitd[CONNIOVS];
514 /*----- general operational variables -----*/
516 /* main initialises */
517 static oop_source *loop;
518 static ConnList conns;
519 static char *path_lock, *path_flushing, *path_defer;
520 static char *path_control, *path_dump;
521 static char *globpat_backlog;
522 static pid_t self_pid;
524 /* statemc_init initialises */
525 static StateMachineState sms;
526 static int until_flush;
527 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
530 /* initialisation to 0 is good */
531 static int until_connect, until_backlog_nextscan;
532 static double accept_proportion;
533 static int nocheck, nocheck_reported, in_child;
535 /* for simulation, debugging, etc. */
536 int simulate_flush= -1;
538 /*========== logging ==========*/
540 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
541 static void logcore(int sysloglevel, const char *fmt, ...) {
544 vsyslog(sysloglevel,fmt,al);
546 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
547 vfprintf(stderr,fmt,al);
553 static void logv(int sysloglevel, const char *pfx, int errnoval,
554 const char *fmt, va_list al) PRINTF(5,0);
555 static void logv(int sysloglevel, const char *pfx, int errnoval,
556 const char *fmt, va_list al) {
557 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
558 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
559 msgbuf[sizeof(msgbuf)-1]= 0;
561 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
562 sysloglevel= LOG_ERR; /* run by wrong user, probably */
564 logcore(sysloglevel, "<%s>%s: %s%s%s",
565 sitename, pfx, msgbuf,
566 errnoval>=0 ? ": " : "",
567 errnoval>=0 ? strerror(errnoval) : "");
570 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
571 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
572 static void fn(const char *fmt, ...) { \
575 logv(sysloglevel, pfx, err, fmt, al); \
579 #define logwrap(fn, pfx, sysloglevel, err) \
580 static void fn(const char *fmt, ...) PRINTF(1,2); \
581 static void fn(const char *fmt, ...) { \
583 logv(sysloglevel, pfx, err, fmt, al); \
587 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
588 diewrap(die, " critical", LOG_CRIT, -1, 16);
590 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
591 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
593 logwrap(syswarn, " warning", LOG_WARNING, errno);
594 logwrap(warn, " warning", LOG_WARNING, -1);
596 logwrap(notice, " notice", LOG_NOTICE, -1);
597 logwrap(info, " info", LOG_INFO, -1);
598 logwrap(debug, " debug", LOG_DEBUG, -1);
601 /*========== utility functions etc. ==========*/
603 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
604 static char *xvasprintf(const char *fmt, va_list al) {
606 int rc= vasprintf(&str,fmt,al);
607 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
610 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
611 static char *xasprintf(const char *fmt, ...) {
613 char *str= xvasprintf(fmt,al);
618 static int close_perhaps(int *fd) {
619 if (*fd <= 0) return 0;
624 static void xclose(int fd, const char *what, const char *what2) {
626 if (r) sysdie("close %s%s",what,what2?what2:"");
628 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
629 if (*fd <= 0) return;
630 xclose(*fd,what,what2);
634 static pid_t xfork(const char *what) {
638 if (child==-1) sysfatal("cannot fork for %s",what);
639 debug("forked %s %ld", what, (unsigned long)child);
640 if (!child) postfork();
644 static void on_fd_read_except(int fd, oop_call_fd callback) {
645 loop->on_fd(loop, fd, OOP_READ, callback, 0);
646 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
648 static void cancel_fd_read_except(int fd) {
649 loop->cancel_fd(loop, fd, OOP_READ);
650 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
653 static void report_child_status(const char *what, int status) {
654 if (WIFEXITED(status)) {
655 int es= WEXITSTATUS(status);
657 warn("%s: child died with error exit status %d", what, es);
658 } else if (WIFSIGNALED(status)) {
659 int sig= WTERMSIG(status);
660 const char *sigstr= strsignal(sig);
661 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
663 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
665 warn("%s: child died due to unknown fatal signal %d%s",
666 what, sig, coredump);
668 warn("%s: child died with unknown wait status %d", what,status);
672 static int xwaitpid(pid_t *pid, const char *what) {
675 int r= kill(*pid, SIGKILL);
676 if (r) sysdie("cannot kill %s child", what);
678 pid_t got= waitpid(*pid, &status, 0);
679 if (got==-1) sysdie("cannot reap %s child", what);
680 if (got==0) die("cannot reap %s child", what);
687 static void *zxmalloc(size_t sz) {
688 void *p= xmalloc(sz);
693 static void xunlink(const char *path, const char *what) {
695 if (r) sysdie("can't unlink %s %s", path, what);
698 static time_t xtime(void) {
700 if (now==-1) sysdie("time(2) failed");
/*
 * sigaction(2) wrapper which installs *sa for signo without fetching
 * the old action.  Any failure is reported through sysdie, naming
 * the signal via strsignal.
 */
static void xsigaction(int signo, const struct sigaction *sa) {
  if (sigaction(signo,sa,0) == 0)
    return;
  sysdie("sigaction failed for \"%s\"", strsignal(signo));
}
709 static void xsigsetdefault(int signo) {
711 memset(&sa,0,sizeof(sa));
712 sa.sa_handler= SIG_DFL;
713 xsigaction(signo,&sa);
/*
 * gettimeofday(2) wrapper: stores the current time in *tv_r (no
 * timezone is requested) and calls sysdie if the call fails.
 */
static void xgettimeofday(struct timeval *tv_r) {
  if (gettimeofday(tv_r,0) != 0)
    sysdie("gettimeofday(2) failed");
}
/*
 * Switches fd's nonblocking mode via oop_fd_nonblock.  A nonzero
 * result is treated as an errno value: it is copied into errno and
 * reported with sysdie.
 */
static void xsetnonblock(int fd, int nonblocking) {
  int failure= oop_fd_nonblock(fd, nonblocking);
  if (!failure)
    return;

  errno= failure;
  sysdie("setnonblocking");
}
726 static void check_isreg(const struct stat *stab, const char *path,
728 if (!S_ISREG(stab->st_mode))
729 die("%s %s not a plain file (mode 0%lo)",
730 what, path, (unsigned long)stab->st_mode);
/*
 * fstat(2) wrapper: fills *stab_r for fd, or calls sysdie with a
 * message mentioning `what' (a description of the file, used only
 * in that message).
 */
static void xfstat(int fd, struct stat *stab_r, const char *what) {
  if (fstat(fd, stab_r) == 0)
    return;
  sysdie("could not fstat %s", what);
}
/*
 * Stats the open file fd into *stab_r (via xfstat) and then passes
 * the result to check_isreg together with path and what, which are
 * used for diagnostics.
 */
static void xfstat_isreg(int fd, struct stat *stab_r,
			 const char *path, const char *what)
{
  xfstat(fd, stab_r, what);        /* step 1: obtain the metadata */
  check_isreg(stab_r, path, what); /* step 2: validate the file type */
}
744 static void xlstat_isreg(const char *path, struct stat *stab,
745 int *enoent_r /* 0 means ENOENT is fatal */,
747 int r= lstat(path, stab);
749 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
750 sysdie("could not lstat %s %s", what, path);
752 if (enoent_r) *enoent_r= 0;
753 check_isreg(stab, path, what);
/*
 * Reports whether two stat results describe the same file, i.e.
 * have equal device and inode numbers.  Both must be for regular
 * files (asserted).  Returns 1 if they match, 0 otherwise.
 */
static int samefile(const struct stat *a, const struct stat *b) {
  assert(S_ISREG(a->st_mode));
  assert(S_ISREG(b->st_mode));

  if (a->st_dev != b->st_dev)
    return 0;
  return a->st_ino == b->st_ino;
}
763 static char *sanitise(const char *input, int len) {
764 static char sanibuf[100]; /* returns pointer to this buffer! */
766 const char *p= input;
767 const char *endp= len>=0 ? input+len : 0;
771 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
772 int c= (!endp || p<endp) ? *p++ : 0;
773 if (!c) { *q++= '\''; *q=0; break; }
774 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
775 sprintf(q,"\\x%02x",c);
/*
 * Returns 1 if errnoval is EAGAIN or EWOULDBLOCK, and 0 for any
 * other value.
 */
static int isewouldblock(int errnoval) {
  if (errnoval == EAGAIN) return 1;
  if (errnoval == EWOULDBLOCK) return 1;
  return 0;
}
785 /*========== command and control connections ==========*/
787 static int control_master;
789 typedef struct ControlConn ControlConn;
791 void (*destroy)(ControlConn*);
797 struct sockaddr_un un;
802 static const oop_rd_style control_rd_style= {
803 OOP_RD_DELIM_STRIP, '\n',
805 OOP_RD_SHORTREC_FORBID
808 static void control_destroy(ControlConn *cc) {
812 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
813 if (ferror(cc->out) | fflush(cc->out)) {
814 info("CTRL%d write error %s", cc->fd, strerror(errno));
819 static void control_prompt(ControlConn *cc /* may destroy*/) {
820 fprintf(cc->out, "%s| ", sitename);
821 control_checkouterr(cc);
824 struct ControlCommand {
826 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
827 const char *arg, size_t argsz);
832 static const ControlCommand control_commands[];
835 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
836 const char *arg, size_t argsz)
839 fputs("commands:\n", cc->out);
840 const ControlCommand *ccmd;
841 for (ccmd=control_commands; ccmd->cmd; ccmd++)
842 fprintf(cc->out, " %s\n", ccmd->cmd);
843 fputs("NB: permissible arguments are not shown above."
844 " Not all commands listed are safe. See innduct(8).\n", cc->out);
848 int ok= trigger_flush_ok();
849 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
854 notice("terminating (CTRL%d)",cc->fd);
855 raise_default(SIGTERM);
861 /* messing with our head: */
862 CCMD(period) { period(); }
863 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
864 CCMD(setint) { *(int*)c->xdata= c->xval; }
865 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
867 static const ControlCommand control_commands[]= {
869 { "flush", ccmd_flush },
870 { "stop", ccmd_stop },
871 { "dump q", ccmd_dump, 0,0 },
872 { "dump a", ccmd_dump, 0,1 },
874 { "p", ccmd_period },
876 #define POKES(cmd,func) \
877 { cmd "flush", func, &until_flush, 1 }, \
878 { cmd "conn", func, &until_connect, 0 }, \
879 { cmd "blscan", func, &until_backlog_nextscan, 0 },
880 POKES("next ", ccmd_setint)
881 POKES("prod ", ccmd_setint_period)
883 { "pretend flush", ccmd_setintarg, &simulate_flush },
884 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
888 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
889 const char *errmsg, int errnoval,
890 const char *data, size_t recsz, void *cc_v) {
891 ControlConn *cc= cc_v;
894 info("CTRL%d closed", cc->fd);
899 if (recsz == 0) goto prompt;
901 const ControlCommand *ccmd;
902 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
903 int l= strlen(ccmd->cmd);
904 if (recsz < l) continue;
905 if (recsz > l && data[l] != ' ') continue;
906 if (memcmp(data, ccmd->cmd, l)) continue;
908 int argl= (int)recsz - (l+1);
909 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
913 fputs("unknown command; h for help\n", cc->out);
920 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
921 const char *errmsg, int errnoval,
922 const char *data, size_t recsz, void *cc_v) {
923 ControlConn *cc= cc_v;
925 info("CTRL%d read error %s", cc->fd, errmsg);
930 static int control_conn_startup(ControlConn *cc /* may destroy*/,
932 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
933 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
935 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
938 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
940 info("CTRL%d %s ready", cc->fd, how);
945 static void control_stdio_destroy(ControlConn *cc) {
947 oop_rd_cancel(cc->rd);
948 errno= oop_rd_delete_tidy(cc->rd);
949 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
954 static void control_stdio(void) {
955 NEW_DECL(ControlConn *,cc);
956 cc->destroy= control_stdio_destroy;
960 int r= control_conn_startup(cc,"stdio");
961 if (r) cc->destroy(cc);
964 static void control_accepted_destroy(ControlConn *cc) {
966 oop_rd_cancel(cc->rd);
967 oop_rd_delete_kill(cc->rd);
969 if (cc->out) { fclose(cc->out); cc->fd=0; }
970 close_perhaps(&cc->fd);
974 static void *control_master_readable(oop_source *lp, int master,
975 oop_event ev, void *u) {
976 NEW_DECL(ControlConn *,cc);
977 cc->destroy= control_accepted_destroy;
979 cc->salen= sizeof(cc->sa);
980 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
981 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
983 cc->out= fdopen(cc->fd, "w");
984 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
986 int r= control_conn_startup(cc, "accepted");
996 #define NOCONTROL(...) do{ \
997 syswarn("no control socket, because failed to " __VA_ARGS__); \
1001 static void control_init(void) {
1006 struct sockaddr_un un;
1009 memset(&sa,0,sizeof(sa));
1010 int maxlen= sizeof(sa.un.sun_path);
1012 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1014 if (errno != ENOENT)
1015 NOCONTROL("readlink control socket symlink path %s", path_control);
1017 if (reallen >= maxlen) {
1018 debug("control socket symlink path too long (r=%d)",reallen);
1019 xunlink(path_control, "old (overlong) control socket symlink");
1025 int r= lstat(realsockdir,&stab);
1027 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1029 r= mkdir(realsockdir, 0700);
1030 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1033 uid_t self= geteuid();
1034 if (!S_ISDIR(stab.st_mode) ||
1035 stab.st_uid != self ||
1036 stab.st_mode & 0007) {
1037 warn("no control socket, because real socket directory"
1038 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1039 !!S_ISDIR(stab.st_mode),
1040 (unsigned long)stab.st_uid, (unsigned long)self,
1041 (unsigned long)stab.st_mode & 0777UL);
1046 real= xasprintf("%s/s%lx.%lx", realsockdir,
1047 (unsigned long)xtime(), (unsigned long)self_pid);
1048 int reallen= strlen(real);
1050 if (reallen >= maxlen) {
1051 warn("no control socket, because tmpnam gave overly-long path"
1055 r= symlink(real, path_control);
1056 if (r) NOCONTROL("make control socket path %s a symlink to real"
1057 " socket path %s", path_control, real);
1058 memcpy(sa.un.sun_path, real, reallen);
1061 int r= unlink(sa.un.sun_path);
1062 if (r && errno!=ENOENT)
1063 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1065 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1066 if (control_master<0) NOCONTROL("create new control socket");
1068 sa.un.sun_family= AF_UNIX;
1069 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1070 r= bind(control_master, &sa.sa, sl);
1071 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1073 r= listen(control_master, 5);
1074 if (r) NOCONTROL("listen");
1076 xsetnonblock(control_master, 1);
1078 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1079 info("control socket ok, real path %s", sa.un.sun_path);
1085 xclose_perhaps(&control_master, "control master",0);
1089 /*========== management of connections ==========*/
1091 static void conn_closefd(Conn *conn, const char *msgprefix) {
1092 int r= close_perhaps(&conn->fd);
1093 if (r) info("C%d %serror closing socket: %s",
1094 conn->fd, msgprefix, strerror(errno));
1097 static void conn_dispose(Conn *conn) {
1100 oop_rd_cancel(conn->rd);
1101 oop_rd_delete_kill(conn->rd);
1105 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1106 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1108 conn_closefd(conn,"");
1110 until_connect= reconnect_delay_periods;
1113 static void *conn_exception(oop_source *lp, int fd,
1114 oop_event ev, void *conn_v) {
1117 assert(fd == conn->fd);
1118 assert(ev == OOP_EXCEPTION);
1119 int r= read(conn->fd, &ch, 1);
1120 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1121 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1122 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1123 return OOP_CONTINUE;
1126 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1127 int requeue[art_MaxState];
1128 memset(requeue,0,sizeof(requeue));
1132 while ((art= LIST_REMHEAD(conn->priority)))
1133 LIST_ADDTAIL(art->ipf->queue, art);
1135 while ((art= LIST_REMHEAD(conn->waiting)))
1136 LIST_ADDTAIL(art->ipf->queue, art);
1138 while ((art= LIST_REMHEAD(conn->sent))) {
1139 requeue[art->state]++;
1140 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1141 LIST_ADDTAIL(art->ipf->queue,art);
1142 check_reading_pause_resume(art->ipf);
1147 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1150 char *m= xvasprintf(fmt,al);
1151 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1152 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1155 LIST_REMOVE(conns,conn);
1157 check_assign_articles();
1160 static void connfail(Conn *conn, const char *fmt, ...) {
1163 vconnfail(conn,fmt,al);
1167 static void check_idle_conns(void) {
1170 conn->since_activity++;
1173 if (conn->since_activity <= need_activity_periods) continue;
1175 /* We need to shut this down */
1177 connfail(conn,"timed out waiting for response to QUIT");
1178 else if (conn->sent.count)
1179 connfail(conn,"timed out waiting for responses");
1180 else if (conn->waiting.count || conn->priority.count)
1181 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1182 else if (conn->xmitu)
1183 connfail(conn,"peer has been sending responses"
1184 " before receiving our commands!");
1186 static const char quitcmd[]= "QUIT\r\n";
1187 int todo= sizeof(quitcmd)-1;
1188 const char *p= quitcmd;
1190 int r= write(conn->fd, p, todo);
1192 if (isewouldblock(errno))
1193 connfail(conn, "blocked writing QUIT to idle connection");
1195 connfail(conn, "failed to write QUIT to idle connection: %s",
1203 conn->since_activity= 0;
1204 debug("C%d is idle, quitting", conn->fd);
1213 /*---------- making new connections ----------*/
1215 static pid_t connecting_child;
1216 static int connecting_fdpass_sock;
1218 static void connect_attempt_discard(void) {
1219 if (connecting_child) {
1220 int status= xwaitpid(&connecting_child, "connect");
1221 if (!(WIFEXITED(status) ||
1222 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1223 report_child_status("connect", status);
1225 if (connecting_fdpass_sock) {
1226 cancel_fd_read_except(connecting_fdpass_sock);
1227 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1231 #define PREP_DECL_MSG_CMSG(msg) \
1233 struct iovec msgiov; \
1234 msgiov.iov_base= &msgbyte; \
1235 msgiov.iov_len= 1; \
1236 struct msghdr msg; \
1237 memset(&msg,0,sizeof(msg)); \
1238 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1239 msg.msg_iov= &msgiov; \
1240 msg.msg_iovlen= 1; \
1241 msg.msg_control= msg##cbuf; \
1242 msg.msg_controllen= sizeof(msg##cbuf);
/* Event callback for connecting_fdpass_sock (see the assert below).
 * It reads the message sent by the connection child with recvmsg.
 * When a control message is present it must be exactly one
 * SOL_SOCKET/SCM_RIGHTS cmsg of the size of conn->fd; that descriptor
 * is copied into conn->fd.  The child is then reaped and its exit
 * status selects streaming or non-streaming operation.  Finally the fd
 * is registered with the event loop, conn is added to conns, and
 * check_assign_articles is called.  The visible exit paths call
 * connect_attempt_discard and return OOP_CONTINUE. */
1244 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1247 assert(fd == connecting_fdpass_sock);
1249 PREP_DECL_MSG_CMSG(msg);
1251 ssize_t rs= recvmsg(fd, &msg, 0);
1253 if (isewouldblock(errno)) return OOP_CONTINUE;
1254 syswarn("failed to read socket from connecting child");
1259 LIST_INIT(conn->waiting);
1260 LIST_INIT(conn->priority);
1261 LIST_INIT(conn->sent);
1263 struct cmsghdr *h= 0;
1264 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1266 int status= xwaitpid(&connecting_child, "connect child (broken)");
1268 if (WIFEXITED(status)) {
1269 if (WEXITSTATUS(status) != 0 &&
1270 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1271 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1272 /* child already reported the problem */;
1274 if (e == OOP_EXCEPTION)
1275 warn("connect: connection child exited code %d but"
1276 " unexpected exception on fdpass socket",
1277 WEXITSTATUS(status));
1279 warn("connect: connection child exited code %d but"
1281 WEXITSTATUS(status), (int)rs);
1283 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1284 warn("connect: connection attempt timed out");
1286 report_child_status("connect", status);
1291 #define CHK(field, val) \
1292 if (h->cmsg_##field != val) { \
1293 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1294 h->cmsg_##field, val); \
1297 CHK(level, SOL_SOCKET);
1298 CHK(type, SCM_RIGHTS);
1299 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1302 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1304 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
/* Blocking wait (flags 0): the child's exit status is needed below to
 * decide between streaming and non-streaming. */
1307 pid_t got= waitpid(connecting_child, &status, 0);
1308 if (got==-1) sysdie("connect: real wait for child");
1309 assert(got == connecting_child);
1310 connecting_child= 0;
1312 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1313 int es= WEXITSTATUS(status);
1315 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1316 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1318 fatal("connect: child gave unexpected exit status %d", es);
/* A non-streaming connection is limited to one queued article. */
1322 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1324 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1325 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
/* NOTE(review): the result of oop_rd_new_fd was stored in conn->rd on
 * the previous line, but this test looks at conn->fd.  A failed
 * oop_rd_new_fd would therefore go unnoticed; presumably the condition
 * should be !conn->rd -- confirm. */
1326 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1327 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1329 &peer_rd_err, conn);
1330 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1332 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1333 LIST_ADDHEAD(conns, conn);
1335 connect_attempt_discard();
1336 check_assign_articles();
1337 return OOP_CONTINUE;
1341 connect_attempt_discard();
1342 return OOP_CONTINUE;
1345 static int allow_connect_start(void) {
1346 return conns.count < max_connections
1347 && !connecting_child
1351 static void connect_start(void) {
1352 assert(!connecting_child);
1353 assert(!connecting_fdpass_sock);
1355 info("starting connection attempt");
1358 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1359 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1361 connecting_child= xfork("connection");
1363 if (!connecting_child) {
1364 FILE *cn_from, *cn_to;
1365 char buf[NNTP_STRLEN+100];
1366 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1368 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1370 alarm(connection_setup_timeout);
1371 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1375 unsigned char c= buf[l-1];
1376 if (!isspace(c)) break;
1377 if (c=='\n' || c=='\r') stripped=1;
1381 sysfatal("connect: connection attempt failed");
1384 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1388 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1389 sysfatal("connect: authentication failed");
1391 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1393 sysfatal("connect: could not send MODE STREAM");
1394 buf[sizeof(buf)-1]= 0;
1395 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1396 if (ferror(cn_from))
1397 sysfatal("connect: could not read response to MODE STREAM");
1399 fatal("connect: connection close in response to MODE STREAM");
1404 fatal("connect: response to MODE STREAM is too long: %.100s...",
1406 l--; if (l>0 && buf[l-1]=='\r') l--;
1409 int rcode= strtoul(buf,&ep,10);
1411 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1415 exitstatus= CONNCHILD_ESTATUS_STREAM;
1421 warn("connect: unexpected response to MODE STREAM: %.50s",
1427 int fd= fileno(cn_from);
1429 PREP_DECL_MSG_CMSG(msg);
1430 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1431 cmsg->cmsg_level= SOL_SOCKET;
1432 cmsg->cmsg_type= SCM_RIGHTS;
1433 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1434 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1436 msg.msg_controllen= cmsg->cmsg_len;
1437 r= sendmsg(socks[1], &msg, 0);
1438 if (r<0) sysdie("sendmsg failed for new connection");
1439 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1444 xclose(socks[1], "connecting fdpass child's socket",0);
1445 connecting_fdpass_sock= socks[0];
1446 xsetnonblock(connecting_fdpass_sock, 1);
1447 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1450 /*---------- assigning articles to conns, and transmitting ----------*/
1452 static Article *dequeue_from(int peek, InputFile *ipf) {
1454 if (peek) return LIST_HEAD(ipf->queue);
1456 Article *art= LIST_REMHEAD(ipf->queue);
1458 check_reading_pause_resume(ipf);
1462 static Article *dequeue(int peek) {
1464 art= dequeue_from(peek, flushing_input_file); if (art) return art;
1465 art= dequeue_from(peek, backlog_input_file); if (art) return art;
1466 art= dequeue_from(peek, main_input_file); if (art) return art;
1470 static void check_assign_articles(void) {
1476 int spare=0, inqueue=0;
1478 /* Find a connection to offer this article. We prefer a busy
1479 * connection to an idle one, provided it's not full. We take the
1480 * first (oldest) and since that's stable, it will mean we fill up
1481 * connections in order. That way if we have too many
1482 * connections, the spare ones will go away eventually.
1485 if (walk->quitting) continue;
1486 inqueue= walk->sent.count + walk->priority.count
1487 + walk->waiting.count;
1488 spare= walk->max_queue - inqueue;
1489 assert(inqueue <= max_queue_per_conn);
1491 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1492 else if (spare>0) /*working*/ { use= walk; break; }
1495 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1497 Article *art= dequeue(0);
1499 LIST_ADDTAIL(use->waiting, art);
1502 conn_maybe_write(use);
1503 } else if (allow_connect_start()) {
1504 until_connect= reconnect_delay_periods;
1513 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1514 conn_maybe_write(u);
1515 return OOP_CONTINUE;
1518 static void conn_maybe_write(Conn *conn) {
1520 conn_make_some_xmits(conn);
1522 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1526 void *rp= conn_write_some_xmits(conn);
1527 if (rp==OOP_CONTINUE) {
1528 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1530 } else if (rp==OOP_HALT) {
1533 /* transmitted everything */
1540 /*---------- expiry, flow control and deferral ----------*/
1542 static void check_reading_pause_resume(InputFile *ipf) {
1543 if (ipf->queue.count >= max_queue_per_ipf)
1544 inputfile_reading_pause(ipf);
1546 inputfile_reading_resume(ipf);
1549 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1551 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1553 sysfatal("write to defer file %s",path_defer);
1554 article_done(art, whichcount);
1557 static int article_check_expired(Article *art /* must be queued, not conn */) {
1558 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1559 if (artdata) { SMfreearticle(artdata); return 0; }
1561 LIST_REMOVE(art->ipf->queue, art);
1563 art->ipf->count_nooffer_missing++;
1564 article_done(art,-1);
1568 static void inputfile_queue_check_expired(InputFile *ipf) {
1572 Article *art= LIST_HEAD(ipf->queue);
1573 int exp= article_check_expired(art);
1576 check_reading_pause_resume(ipf);
1579 static void article_autodefer(InputFile *ipf, Article *art) {
1581 article_defer(art,-1);
1584 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1586 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1587 if (art->ipf == ipf) return 1;
1591 static void autodefer_input_file_articles(InputFile *ipf) {
1593 while ((art= LIST_REMHEAD(ipf->queue)))
1594 article_autodefer(ipf, art);
1597 static void autodefer_input_file(InputFile *ipf) {
1600 autodefer_input_file_articles(ipf);
1602 if (ipf->inprogress) {
1605 if (has_article_in(&walk->waiting, ipf) ||
1606 has_article_in(&walk->priority, ipf) ||
1607 has_article_in(&walk->sent, ipf))
1610 while (ipf->inprogress) {
1612 if (walk->quitting < 0) goto found;
1613 abort(); /* where are they ?? */
1616 connfail(walk, "connection is stuck or crawling,"
1617 " and we need to finish flush");
1618 autodefer_input_file_articles(ipf);
1622 check_reading_pause_resume(ipf);
1625 /*========== article transmission ==========*/
1627 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1628 XmitKind kind) { /* caller must then fill in details */
1629 struct iovec *v= &conn->xmit[conn->xmitu];
1630 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1631 v->iov_base= (char*)data;
1637 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1638 xmit_core(conn,data,len, xk_Const);
/* Queues a string literal, minus its terminating nul, for transmission.
 * Relies on a variable called conn being in scope at the point of use. */
#define XMIT_LITERAL(lit) \
  (xmit_noalloc(conn, (lit), sizeof(lit) - 1))
1642 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1643 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1647 static void xmit_free(XmitDetails *d) {
1649 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1650 case xk_Const: break;
1655 static void *conn_write_some_xmits(Conn *conn) {
1657 * 0: nothing more to write, no need to call us again
1658 * OOP_CONTINUE: more to write but fd not writeable
1659 * OOP_HALT: disaster, have destroyed conn
1662 int count= conn->xmitu;
1663 if (!count) return 0;
1665 if (count > IOV_MAX) count= IOV_MAX;
1666 ssize_t rs= writev(conn->fd, conn->xmit, count);
1668 if (isewouldblock(errno)) return OOP_CONTINUE;
1669 connfail(conn, "write failed: %s", strerror(errno));
1675 for (done=0; rs && done<conn->xmitu; done++) {
1676 struct iovec *vp= &conn->xmit[done];
1677 XmitDetails *dp= &conn->xmitd[done];
1678 if (rs > vp->iov_len) {
1682 vp->iov_base= (char*)vp->iov_base + rs;
1686 int newu= conn->xmitu - done;
1687 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1688 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1693 static void conn_make_some_xmits(Conn *conn) {
1695 if (conn->xmitu+5 > CONNIOVS)
1698 Article *art= LIST_REMHEAD(conn->priority);
1699 if (!art) art= LIST_REMHEAD(conn->waiting);
1702 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1703 /* actually send it */
1705 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1708 art->state == art_Unchecked ? art_Unsolicited :
1709 art->state == art_Wanted ? art_Wanted :
1712 if (!artdata) art->missing= 1;
1713 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1717 XMIT_LITERAL("TAKETHIS ");
1718 xmit_noalloc(conn, art->messageid, art->midlen);
1719 XMIT_LITERAL("\r\n");
1720 xmit_artbody(conn, artdata);
1722 article_done(art, -1);
1726 /* we got 235 from IHAVE */
1728 xmit_artbody(conn, artdata);
1730 XMIT_LITERAL(".\r\n");
1734 LIST_ADDTAIL(conn->sent, art);
1740 XMIT_LITERAL("CHECK ");
1742 XMIT_LITERAL("IHAVE ");
1743 xmit_noalloc(conn, art->messageid, art->midlen);
1744 XMIT_LITERAL("\r\n");
1746 assert(art->state == art_Unchecked);
1747 art->ipf->counts[art->state][RC_sent]++;
1748 LIST_ADDTAIL(conn->sent, art);
1753 /*========== handling responses from peer ==========*/
1755 static const oop_rd_style peer_rd_style= {
1756 OOP_RD_DELIM_STRIP, '\n',
1758 OOP_RD_SHORTREC_FORBID
1761 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1762 const char *errmsg, int errnoval,
1763 const char *data, size_t recsz, void *conn_v) {
1765 connfail(conn, "error receiving from peer: %s", errmsg);
1766 return OOP_CONTINUE;
1769 static Article *article_reply_check(Conn *conn, const char *response,
1770 int code_indicates_streaming,
1772 /* 1:yes, -1:no, 0:dontcare */,
1773 const char *sanitised_response) {
1774 Article *art= LIST_HEAD(conn->sent);
1778 "peer gave unexpected response when no commands outstanding: %s",
1779 sanitised_response);
1783 if (code_indicates_streaming) {
1784 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1785 if (!conn->stream) {
1786 connfail(conn, "peer gave streaming response code "
1787 " to IHAVE or subsequent body: %s", sanitised_response);
1790 const char *got_mid= response+4;
1791 int got_midlen= strcspn(got_mid, " \n\r");
1792 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1793 connfail(conn, "peer gave streaming response with syntactically invalid"
1794 " messageid: %s", sanitised_response);
1797 if (got_midlen != art->midlen ||
1798 memcmp(got_mid, art->messageid, got_midlen)) {
1799 connfail(conn, "peer gave streaming response code to wrong article -"
1800 " probable synchronisation problem; we offered: %s;"
1802 art->messageid, sanitised_response);
1807 connfail(conn, "peer gave non-streaming response code to"
1808 " CHECK/TAKETHIS: %s", sanitised_response);
1813 if (must_have_sent>0 && art->state < art_Wanted) {
1814 connfail(conn, "peer says article accepted but"
1815 " we had not sent the body: %s", sanitised_response);
1818 if (must_have_sent<0 && art->state >= art_Wanted) {
1819 connfail(conn, "peer says please sent the article but we just did: %s",
1820 sanitised_response);
1824 Article *art_again= LIST_REMHEAD(conn->sent);
1825 assert(art_again == art);
1829 static void update_nocheck(int accepted) {
1830 accept_proportion *= nocheck_decay;
1831 accept_proportion += accepted * (1.0 - nocheck_decay);
1832 int new_nocheck= accept_proportion >= nocheck_thresh;
1833 if (new_nocheck && !nocheck_reported) {
1834 notice("entering nocheck mode for the first time");
1835 nocheck_reported= 1;
1836 } else if (new_nocheck != nocheck) {
1837 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1839 nocheck= new_nocheck;
1842 static void article_done(Article *art, int whichcount) {
1843 if (whichcount>=0 && !art->missing)
1844 art->ipf->counts[art->state][whichcount]++;
1846 if (whichcount == RC_accepted) update_nocheck(1);
1847 else if (whichcount == RC_unwanted) update_nocheck(0);
1849 InputFile *ipf= art->ipf;
1851 while (art->blanklen) {
1852 static const char spaces[]=
1862 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1863 int r= pwrite(ipf->fd, spaces, w, art->offset);
1865 if (errno==EINTR) continue;
1866 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1867 art->messageid, art->blanklen,
1868 (unsigned long)art->offset, ipf->path);
1870 assert(r>=0 && r<=w);
1876 assert(ipf->inprogress >= 0);
1879 if (!ipf->inprogress && ipf != main_input_file)
1880 queue_check_input_done();
1883 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1884 const char *errmsg, int errnoval,
1885 const char *data, size_t recsz, void *conn_v) {
1888 if (ev == OOP_RD_EOF) {
1889 connfail(conn, "unexpected EOF from peer");
1890 return OOP_CONTINUE;
1892 assert(ev == OOP_RD_OK);
1894 char *sani= sanitise(data,-1);
1897 unsigned long code= strtoul(data, &ep, 10);
1898 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1899 connfail(conn, "badly formatted response from peer: %s", sani);
1900 return OOP_CONTINUE;
1904 conn->waiting.count ||
1905 conn->priority.count ||
1909 if (conn->quitting) {
1910 if (code!=205 && code!=503) {
1911 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1913 notice("C%d idle connection closed by us", conn->fd);
1915 LIST_REMOVE(conns,conn);
1918 return OOP_CONTINUE;
1921 conn->since_activity= 0;
1924 #define GET_ARTICLE(musthavesent) do{ \
1925 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1926 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
1929 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1930 code_streaming= (streaming); \
1931 GET_ARTICLE(musthavesent); \
1932 article_done(art, RC_##how); \
1936 #define PEERBADMSG(m) do { \
1937 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1940 int code_streaming= 0;
1944 default: PEERBADMSG("peer sent unexpected message");
1948 PEERBADMSG("peer timed us out or stopped accepting articles");
1950 notice("C%d idle connection closed by peer", conn->fd);
1951 LIST_REMOVE(conns,conn);
1953 return OOP_CONTINUE;
1955 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1956 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1958 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1959 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1961 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1962 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1964 case 238: /* CHECK says send it */
1966 case 335: /* IHAVE says send it */
1968 assert(art->state == art_Unchecked);
1969 art->ipf->counts[art->state][RC_accepted]++;
1970 art->state= art_Wanted;
1971 LIST_ADDTAIL(conn->priority, art);
1974 case 431: /* CHECK or TAKETHIS says try later */
1976 case 436: /* IHAVE says try later */
1978 article_defer(art, RC_deferred);
1984 conn_maybe_write(conn);
1985 check_assign_articles();
1986 return OOP_CONTINUE;
1990 /*========== monitoring of input files ==========*/
1992 static void feedfile_eof(InputFile *ipf) {
1993 assert(ipf != main_input_file); /* promised by tailing_try_read */
1994 inputfile_reading_stop(ipf);
1996 if (ipf == flushing_input_file) {
1997 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1998 if (main_input_file) inputfile_reading_start(main_input_file);
1999 statemc_check_flushing_done();
2000 } else if (ipf == backlog_input_file) {
2001 statemc_check_backlog_done();
2003 abort(); /* supposed to wait rather than get EOF on main input file */
2007 static InputFile *open_input_file(const char *path) {
2008 int fd= open(path, O_RDWR);
2010 if (errno==ENOENT) return 0;
2011 sysfatal("unable to open input file %s", path);
2015 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
2016 memset(ipf,0,sizeof(*ipf));
2020 LIST_INIT(ipf->queue);
2021 strcpy(ipf->path, path);
2026 static void close_input_file(InputFile *ipf) { /* does not free */
2027 assert(!ipf->readable_callback); /* must have had ->on_cancel */
2028 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2029 assert(!ipf->rd); /* must have had inputfile_reading_stop */
2030 assert(!ipf->inprogress); /* no dangling pointers pointing here */
2031 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2035 /*---------- dealing with articles read in the input file ----------*/
2037 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2038 const char *data, const char *how) {
2039 warn("corrupted file: %s, offset %lu: %s: in %s",
2040 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2041 ipf->readcount_err++;
2042 if (ipf->readcount_err > max_bad_data_initial +
2043 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2044 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
2045 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2046 return OOP_CONTINUE;
2049 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2050 oop_rd_event ev, const char *errmsg,
2051 int errnoval, const char *data, size_t recsz,
2053 InputFile *ipf= ipf_v;
2054 assert(ev == OOP_RD_SYSTEM);
2056 sysdie("error reading input file: %s, offset %lu",
2057 ipf->path, (unsigned long)ipf->offset);
/* Record callback for an input file (passed to oop_rd_read by
 * inputfile_reading_resume).  A null data pointer means end of file
 * and is handed to feedfile_eof.  Otherwise the record is checked:
 * over-long lines, nul bytes, empty lines and partially blanked lines
 * are reported through feedfile_got_bad_data, and fully blanked lines
 * are only counted.  A well-formed "token <messageid>" line becomes a
 * newly allocated Article appended to ipf->queue.  The article may
 * then be autodeferred or, for the backlog file, checked for expiry.
 * A flush may also be started once the main feed file has reached
 * target_max_feedfile_size. */
2060 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2061 oop_rd_event ev, const char *errmsg,
2062 int errnoval, const char *data, size_t recsz,
2064 InputFile *ipf= ipf_v;
2066 char tokentextbuf[sizeof(TOKEN)*2+3];
2068 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
/* Remember where this record started; it is used for error reports
 * and stored in the Article as art->offset.  The extra 1 added for
 * OOP_RD_OK presumably accounts for the stripped newline -- confirm. */
2070 off_t old_offset= ipf->offset;
2071 ipf->offset += recsz + !!(ev == OOP_RD_OK);
2073 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2075 if (ev==OOP_RD_PARTREC)
2076 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2077 /* but process it anyway */
2079 if (ipf->skippinglong) {
2080 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2081 return OOP_CONTINUE;
2083 if (ev==OOP_RD_LONG) {
2084 ipf->skippinglong= 1;
2085 X_BAD_DATA("overly long line");
2088 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2089 if (!recsz) X_BAD_DATA("empty line");
2092 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2093 ipf->readcount_blank++;
2094 return OOP_CONTINUE;
/* NOTE(review): strchr returns a null pointer when the line contains
 * no space at all.  No check for that is visible before the pointer
 * subtraction below, which would then be undefined and make
 * tokenlen/midlen meaningless.  Looks like a "no space" case should be
 * rejected via X_BAD_DATA first -- confirm. */
2097 char *space= strchr(data,' ');
2098 int tokenlen= space-data;
2099 int midlen= (int)recsz-tokenlen-1;
2100 if (midlen <= 2) X_BAD_DATA("no room for messageid");
2101 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2103 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2104 memcpy(tokentextbuf, data, tokenlen);
2105 tokentextbuf[tokenlen]= 0;
2106 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2108 ipf->readcount_ok++;
/* The allocation is sized for the Article plus the message-id text and
 * its terminating nul, which strcpy writes into art->messageid below. */
2110 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2111 memset(art,0,sizeof(*art));
2112 art->state= art_Unchecked;
2113 art->midlen= midlen;
2114 art->ipf= ipf; ipf->inprogress++;
2115 art->token= TextToToken(tokentextbuf);
2116 art->offset= old_offset;
2117 art->blanklen= recsz;
2118 strcpy(art->messageid, space+1);
2119 LIST_ADDTAIL(ipf->queue, art);
2121 if (ipf->autodefer >= 0)
2122 article_autodefer(ipf, art);
2123 else if (ipf==backlog_input_file)
2124 article_check_expired(art);
2126 if (sms==sm_NORMAL && ipf==main_input_file &&
2127 ipf->offset >= target_max_feedfile_size)
2128 statemc_start_flush("feed file size");
2130 check_assign_articles(); /* may destroy conn but that's OK */
2131 check_reading_pause_resume(ipf);
2132 return OOP_CONTINUE;
2135 /*========== tailing input file ==========*/
2137 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2139 InputFile *ipf= user;
2140 return ipf->readable_callback(loop, &ipf->readable,
2141 ipf->readable_callback_user);
2144 static void tailing_on_cancel(struct oop_readable *rable) {
2145 InputFile *ipf= (void*)rable;
2147 if (ipf->filemon) filemon_stop(ipf);
2148 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2149 ipf->readable_callback= 0;
2152 static void tailing_queue_readable(InputFile *ipf) {
2153 /* lifetime of ipf here is OK because destruction will cause
2154 * on_cancel which will cancel this callback */
2155 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2158 static int tailing_on_readable(struct oop_readable *rable,
2159 oop_readable_call *cb, void *user) {
2160 InputFile *ipf= (void*)rable;
2162 tailing_on_cancel(rable);
2163 ipf->readable_callback= cb;
2164 ipf->readable_callback_user= user;
2167 tailing_queue_readable(ipf);
2171 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2173 InputFile *ipf= (void*)rable;
2175 ssize_t r= read(ipf->fd, buffer, length);
2177 if (errno==EINTR) continue;
2181 if (ipf==main_input_file) {
2184 } else if (ipf==flushing_input_file) {
2186 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2187 } else if (ipf==backlog_input_file) {
2193 tailing_queue_readable(ipf);
2198 /*---------- filemon implemented with inotify ----------*/
2200 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2201 #define HAVE_FILEMON
2203 #include <sys/inotify.h>
2205 static int filemon_inotify_fd;
2206 static int filemon_inotify_wdmax;
2207 static InputFile **filemon_inotify_wd2ipf;
2209 struct Filemon_Perfile {
2213 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2214 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2215 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2217 if (wd >= filemon_inotify_wdmax) {
2219 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2220 sizeof(*filemon_inotify_wd2ipf) * newmax);
2221 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2222 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2223 filemon_inotify_wdmax= newmax;
2226 assert(!filemon_inotify_wd2ipf[wd]);
2227 filemon_inotify_wd2ipf[wd]= ipf;
2229 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2230 ipf, wd, filemon_inotify_wdmax);
2235 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2237 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2238 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2239 if (r) sysdie("inotify_rm_watch");
2240 filemon_inotify_wd2ipf[wd]= 0;
2243 static void *filemon_inotify_readable(oop_source *lp, int fd,
2244 oop_event e, void *u) {
2245 struct inotify_event iev;
2247 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2249 if (isewouldblock(errno)) break;
2250 sysdie("read from inotify master");
2251 } else if (r==sizeof(iev)) {
2252 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2254 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2256 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2257 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2258 filemon_callback(ipf);
2260 return OOP_CONTINUE;
2263 static int filemon_method_init(void) {
2264 filemon_inotify_fd= inotify_init();
2265 if (filemon_inotify_fd<0) {
2266 syswarn("filemon/inotify: inotify_init failed");
2269 xsetnonblock(filemon_inotify_fd, 1);
2270 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2272 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2276 static void filemon_method_dump_info(FILE *f) {
2278 fprintf(f,"inotify");
2279 DUMPV("%d",,filemon_inotify_fd);
2280 DUMPV("%d",,filemon_inotify_wdmax);
2281 for (i=0; i<filemon_inotify_wdmax; i++)
2282 fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i]);
2285 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2287 /*---------- filemon dummy implementation ----------*/
2289 #if !defined(HAVE_FILEMON)
2291 struct Filemon_Perfile { int dummy; };
2293 static int filemon_method_init(void) {
2294 warn("filemon/dummy: no filemon method compiled in");
2297 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2298 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
/* Dummy filemon method: the only information to dump is the method's
 * name, written as one line to f. */
static void filemon_method_dump_info(FILE *f) {
  fputs("dummy\n", f);
}
2301 #endif /* !HAVE_FILEMON */
2303 /*---------- filemon generic interface ----------*/
2305 static void filemon_start(InputFile *ipf) {
2306 assert(!ipf->filemon);
2309 filemon_method_startfile(ipf, ipf->filemon);
2312 static void filemon_stop(InputFile *ipf) {
2313 if (!ipf->filemon) return;
2314 filemon_method_stopfile(ipf, ipf->filemon);
2319 static void filemon_callback(InputFile *ipf) {
2320 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2321 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2324 /*---------- interface to start and stop an input file ----------*/
2326 static const oop_rd_style feedfile_rdstyle= {
2327 OOP_RD_DELIM_STRIP, '\n',
2329 OOP_RD_SHORTREC_LONG,
2332 static void inputfile_reading_resume(InputFile *ipf) {
2333 if (!ipf->rd) return;
2334 if (!ipf->paused) return;
2336 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2337 feedfile_got_article,ipf, feedfile_read_err, ipf);
2338 if (r) sysdie("unable start reading feedfile %s",ipf->path);
2343 static void inputfile_reading_pause(InputFile *ipf) {
2344 if (!ipf->rd) return;
2345 if (ipf->paused) return;
2346 oop_rd_cancel(ipf->rd);
2350 static void inputfile_reading_start(InputFile *ipf) {
2352 ipf->readable.on_readable= tailing_on_readable;
2353 ipf->readable.on_cancel= tailing_on_cancel;
2354 ipf->readable.try_read= tailing_try_read;
2355 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2356 ipf->readable.delete_kill= 0;
2358 ipf->readable_callback= 0;
2359 ipf->readable_callback_user= 0;
2361 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2365 inputfile_reading_resume(ipf);
2368 static void inputfile_reading_stop(InputFile *ipf) {
2370 inputfile_reading_pause(ipf);
2371 oop_rd_delete(ipf->rd);
2373 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2377 /*========== interaction with innd - state machine ==========*/
2379 /* See official state diagram at top of file. We implement
2390 |`---------------------------------------------------.
2392 |`---------------- - - - |
2393 D ENOENT | D EXISTS see OVERALL STATES diagram |
2394 | for full startup logic |
2397 | ============ try to |
2403 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2404 ^ | hardlink F to D |
2407 | | our handle onto F is now onto D |
2410 | |<-------------------<---------------------<---------+
2412 | | spawn inndcomm flush |
2414 | ================== |
2415 | FLUSHING[-ABSENT] |
2417 | main D tail/none |
2418 | ================== |
2420 | | INNDCOMM FLUSH FAILS ^
2421 | |`----------------------->----------. |
2423 | | NO SUCH SITE V |
2424 ^ |`--------------->----. ==================== |
2425 | | \ FLUSHFAILED[-ABSENT] |
2427 | | FLUSH OK \ main D tail/none |
2428 | | open F \ ==================== |
2430 | | \ | TIME TO RETRY |
2431 | |`------->----. ,---<---'\ `----------------'
2432 | | D NONE | | D NONE `----.
2434 | ============= V V ============
2435 | SEPARATED-1 | | DROPPING-1
2436 | flsh->rd!=0 | | flsh->rd!=0
2437 | [Separated] | | [Dropping]
2438 | main F idle | | main none
2439 | flsh D tail | | flsh D tail
2440 | ============= | | ============
2442 ^ | EOF ON D | | defer | EOF ON D
2444 | =============== | | ===============
2445 | SEPARATED-2 | | DROPPING-2
2446 | flsh->rd==0 | V flsh->rd==0
2447 | [Finishing] | | [Dropping]
2448 | main F tail | `. main none
2449 | flsh D closed | `. flsh D closed
2450 | =============== V `. ===============
2452 | | ALL D PROCESSED `. | ALL D PROCESSED
2453 | V install defer as backlog `. | install defer
2454 ^ | close D `. | close D
2455 | | unlink D `. | unlink D
2458 `----------' ==============
2478 static void startup_set_input_file(InputFile *f) {
2479 assert(!main_input_file);
2481 inputfile_reading_start(f);
/* Acquires the duct lock.  The lockfile path_lock is opened (created
 * if necessary) and locked with a non-blocking fcntl F_SETLK.  If the
 * lock is already held, the process exits 0 when quiet_multiple is
 * set and otherwise fails with "another duct holds the lockfile".
 * After locking, the open descriptor is compared with whatever
 * path_lock names now (samefile); a mismatching descriptor is closed
 * as a stale lockfile.  Finally the lockfile is truncated and
 * rewritten with our pid, site name, feed file and remote host. */
2484 static void statemc_lock(void) {
2486 struct stat stab, stabf;
2489 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2490 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2493 memset(&fl,0,sizeof(fl));
2495 fl.l_whence= SEEK_SET;
2496 int r= fcntl(lockfd, F_SETLK, &fl);
2498 if (errno==EACCES || isewouldblock(errno)) {
2499 if (quiet_multiple) exit(0);
2500 fatal("another duct holds the lockfile");
2502 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
/* Stat both the descriptor we hold and the current path: they refer to
 * the same file only if the lockfile has not been replaced meanwhile. */
2505 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2507 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2509 if (!lock_noent && samefile(&stab, &stabf))
2512 xclose(lockfd, "stale lockfile ", path_lock);
2515 FILE *lockfile= fdopen(lockfd, "w");
2516 if (!lockfile) sysdie("fdopen lockfile");
2518 int r= ftruncate(lockfd, 0);
2519 if (r) sysdie("truncate lockfile to write new info");
/* NOTE(review): two things to confirm in the call below.
 *  - "%ld" is paired with an (unsigned long) cast; the conversion and
 *    the cast should agree (either "%lu" or a (long) cast).
 *  - fprintf reports failure with a negative return value, which is
 *    not guaranteed to equal EOF; "< 0" looks like the safer test. */
2521 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2522 (unsigned long)self_pid,
2523 sitename, feedfile, remote_host) == EOF ||
2525 sysfatal("write info to lockfile %s", path_lock);
2527 debug("startup: locked");
/* Works out the initial state of the state machine from what is on
 * disk.  It first searches for a backlog file and examines the defer
 * file (path_defer) and its link count.  Then it looks at the flushing
 * file (path_flushing, "D") and the feed file (feedfile, "F").
 * According to the debug messages below, the outcome is one of
 * Hardlinked, Moved, Separated or Normal, and the matching input
 * files are opened and set up. */
2530 static void statemc_init(void) {
2531 struct stat stabdefer;
2533 search_backlog_file();
2536 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2538 debug("startup: ductdefer ENOENT");
2540 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
/* NOTE(review): the switch operand is the comparison st_nlink==1,
 * which can only evaluate to 0 or 1.  If the cases below are meant to
 * distinguish link counts (1: reopen the defer file, 2: remove the
 * stale hardlink, anything else: die, as the messages suggest), the
 * operand presumably should be stabdefer.st_nlink itself -- confirm. */
2541 switch (stabdefer.st_nlink==1) {
2543 open_defer(); /* so that we will later close it and rename it */
2546 xunlink(path_defer, "stale defer file link"
2547 " (presumably hardlink to backlog file)");
/* NOTE(review): st_nlink is passed uncast for "%d" here, whereas the
 * debug call above casts it to long for "%ld"; nlink_t need not be
 * int, so a cast matching the conversion looks advisable -- confirm. */
2550 die("defer file %s has unexpected link count %d",
2551 path_defer, stabdefer.st_nlink);
2555 struct stat stab_f, stab_d;
2558 InputFile *file_d= open_input_file(path_flushing);
2559 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2561 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
/* F and D are the same file: the hardlink was made but F not yet
 * removed, so remove F now. */
2563 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2564 debug("startup: F==D => Hardlinked");
2565 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2570 debug("startup: F ENOENT => Moved");
2571 if (file_d) startup_set_input_file(file_d);
2572 spawn_inndcomm_flush("feedfile missing at startup");
2573 /* => Flushing, sms:=FLUSHING */
2576 debug("startup: F!=D => Separated");
2577 startup_set_input_file(file_d);
2578 flushing_input_file= main_input_file;
2579 main_input_file= open_input_file(feedfile);
2580 if (!main_input_file) die("feedfile vanished during startup");
2581 SMS(SEPARATED, max_separated_periods,
2582 "found both old and current feed files");
2584 debug("startup: F exists, D ENOENT => Normal");
2585 InputFile *file_f= open_input_file(feedfile);
2586 if (!file_f) die("feed file vanished during startup");
2587 startup_set_input_file(file_f);
2588 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2593 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2594 assert(sms == sm_NORMAL);
2596 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2598 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2599 (unsigned long)target_max_feedfile_size,
2602 int r= link(feedfile, path_flushing);
2603 if (r) sysfatal("link feedfile %s to flushing file %s",
2604 feedfile, path_flushing);
2607 xunlink(feedfile, "old feedfile link");
2610 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2613 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2617 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2620 case sm_FLUSHFAILED:
2621 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2626 warn("took too long to complete old feedfile after flush, autodeferring");
2627 assert(flushing_input_file);
2628 autodefer_input_file(flushing_input_file);
2636 static void statemc_period_poll(void) {
2637 if (!until_flush) return;
2639 assert(until_flush>=0);
2641 if (until_flush) return;
2642 int ok= trigger_flush_ok();
2646 static int inputfile_is_done(InputFile *ipf) {
2648 if (ipf->inprogress) return 0; /* new article in the meantime */
2649 if (ipf->rd) return 0; /* not had EOF */
2653 static void notice_processed(InputFile *ipf, int completed,
2654 const char *what, const char *spec) {
2655 if (!ipf) return; /* allows preterminate to be lazy */
2657 #define RCI_NOTHING(x) /* nothing */
2658 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2659 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2661 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2663 char *inprog= completed
2664 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2665 : xasprintf(" inprogress=%ld", ipf->inprogress);
2666 char *autodefer= ipf->autodefer >= 0
2667 ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2668 : xasprintf("%s","");
2670 info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2671 " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2672 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2674 completed?"completed":"processed", what, spec,
2675 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2676 inprog, autodefer, ipf->count_nooffer_missing,
2677 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2678 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2679 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2680 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2681 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
2690 static void statemc_check_backlog_done(void) {
2691 InputFile *ipf= backlog_input_file;
2692 if (!inputfile_is_done(ipf)) return;
2694 const char *slash= strrchr(ipf->path, '/');
2695 const char *leaf= slash ? slash+1 : ipf->path;
2696 const char *under= strchr(slash, '_');
2697 const char *rest= under ? under+1 : leaf;
2698 if (!strncmp(rest,"backlog",7)) rest += 7;
2699 notice_processed(ipf,1,"backlog ",rest);
2701 close_input_file(ipf);
2702 if (unlink(ipf->path)) {
2703 if (errno != ENOENT)
2704 sysdie("could not unlink processed backlog file %s", ipf->path);
2705 warn("backlog file %s vanished while we were reading it"
2706 " so we couldn't remove it (but it's done now, anyway)",
2710 backlog_input_file= 0;
2711 search_backlog_file();
/* If the old ("flushing") input file has been completely processed,
 * log its statistics, unlink and close it, and advance the state
 * machine: SEPARATED becomes NORMAL, DROPPING becomes DROPPED.
 * Does nothing if the file is not yet done.
 * NOTE(review): this copy of the function appears to have lost some
 * short lines (at least the closing braces) - confirm against the
 * upstream source before relying on it. */
2715 static void statemc_check_flushing_done(void) {
2716 InputFile *ipf= flushing_input_file;
2717 if (!inputfile_is_done(ipf)) return;
/* a finished flushing file is only expected in these two states */
2719 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2721 notice_processed(ipf,1,"feedfile","");
2725 xunlink(path_flushing, "old flushing file");
/* ipf and flushing_input_file are the same object here */
2727 close_input_file(flushing_input_file);
2728 free(flushing_input_file);
2729 flushing_input_file= 0;
2731 if (sms==sm_SEPARATED) {
2732 notice("flush complete");
2733 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2734 } else if (sms==sm_DROPPING) {
2735 SMS(DROPPED, max_separated_periods, "old flush complete");
/* rescan for backlog files now that we are in DROPPED */
2736 search_backlog_file();
2737 notice("feed dropped, but will continue until backlog is finished");
2741 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2743 assert(!inputfile_is_done(main_input_file));
2744 statemc_check_flushing_done();
2745 statemc_check_backlog_done();
2746 return OOP_CONTINUE;
2749 static void queue_check_input_done(void) {
2750 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2753 static void statemc_setstate(StateMachineState newsms, int periods,
2754 const char *forlog, const char *why) {
2756 until_flush= periods;
2758 const char *xtra= "";
2761 case sm_FLUSHFAILED:
2762 if (!main_input_file) xtra= "-ABSENT";
2766 xtra= flushing_input_file->rd ? "-1" : "-2";
2772 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2774 info("state %s%s %s",forlog,xtra,why);
2778 /*---------- defer and backlog files ----------*/
2780 static void open_defer(void) {
2785 defer= fopen(path_defer, "a+");
2786 if (!defer) sysfatal("could not open defer file %s", path_defer);
2788 /* truncate away any half-written records */
2790 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2792 if (stab.st_size > LONG_MAX)
2793 die("defer file %s size is far too large", path_defer);
2798 long orgsize= stab.st_size;
2799 long truncto= stab.st_size;
2801 if (!truncto) break; /* was only (if anything) one half-truncated record */
2802 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2803 sysdie("seek in defer file %s while truncating partial", path_defer);
2808 sysdie("failed read from defer file %s", path_defer);
2810 die("defer file %s shrank while we were checking it!", path_defer);
2816 if (stab.st_size != truncto) {
2817 warn("truncating half-record at end of defer file %s -"
2818 " shrinking by %ld bytes from %ld to %ld",
2819 path_defer, orgsize - truncto, orgsize, truncto);
2822 sysfatal("could not flush defer file %s", path_defer);
2823 if (ftruncate(fileno(defer), truncto))
2824 sysdie("could not truncate defer file %s", path_defer);
2827 info("continuing existing defer file %s (%ld bytes)",
2828 path_defer, orgsize);
2830 if (fseek(defer, truncto, SEEK_SET))
2831 sysdie("could not seek to new end of defer file %s", path_defer);
2834 static void close_defer(void) {
2839 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2841 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2844 time_t now= xtime();
2846 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2848 (unsigned long)stab.st_ino);
2849 if (link(path_defer, backlog))
2850 sysfatal("could not install defer file %s as backlog file %s",
2851 path_defer, backlog);
2852 if (unlink(path_defer))
2853 sysdie("could not unlink old defer link %s to backlog file %s",
2854 path_defer, backlog);
2858 if (until_backlog_nextscan < 0 ||
2859 until_backlog_nextscan > backlog_retry_minperiods + 1)
2860 until_backlog_nextscan= backlog_retry_minperiods + 1;
2863 static void poll_backlog_file(void) {
2864 if (until_backlog_nextscan < 0) return;
2865 if (until_backlog_nextscan-- > 0) return;
2866 search_backlog_file();
/* Looks for backlog files matching globpat_backlog and picks the one
 * with the oldest mtime.  If that file is old enough it is opened as
 * backlog_input_file and reading starts; otherwise the next scan is
 * scheduled via until_backlog_nextscan.  Does nothing if a backlog
 * file is already being read.
 * NOTE(review): this copy appears to have lost a number of short lines
 * (declarations, switch/case labels, closing braces) - confirm against
 * the upstream source. */
2869 static void search_backlog_file(void) {
2870 /* nothing is returned (the function is void); the outcome is left in backlog_input_file and until_backlog_nextscan */
2875 const char *oldest_path=0;
2876 time_t oldest_mtime=0, now;
2878 if (backlog_input_file) return;
2882 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2886 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2888 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2890 for (i=0; i<gl.gl_pathc; i++) {
2891 const char *path= gl.gl_pathv[i];
/* names containing '#' or '~' are never treated as backlog files */
2893 if (strchr(path,'#') || strchr(path,'~')) {
2894 debug("backlog file search skipping %s", path);
2897 r= stat(path, &stab);
2899 syswarn("failed to stat backlog file %s", path);
2902 if (!S_ISREG(stab.st_mode)) {
2903 warn("backlog file %s is not a plain file (or link to one)", path);
/* remember the candidate with the smallest mtime */
2906 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2908 oldest_mtime= stab.st_mtime;
2911 case GLOB_NOMATCH: /* fall through */
2914 sysdie("glob expansion of backlog pattern %s gave unexpected"
2915 " nonzero (error?) return value %d", globpat_backlog, r);
2919 debug("backlog scan: none");
/* no backlog left and the feed was dropped: remove the control symlink and the lockfile */
2921 if (sms==sm_DROPPED) {
2923 notice("feed dropped and our work is complete");
2925 int r= unlink(path_control);
2926 if (r && errno!=ENOENT)
2927 syswarn("failed to remove control symlink for old feed");
2929 xunlink(path_lock, "lockfile for old feed");
2932 until_backlog_nextscan= backlog_spontrescan_periods;
/* age_deficiency is how many more seconds the oldest file must age before we may read it */
2937 double age= difftime(now, oldest_mtime);
2938 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2940 if (age_deficiency <= 0) {
2941 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2942 age, age_deficiency, oldest_path);
2944 backlog_input_file= open_input_file(oldest_path);
2945 if (!backlog_input_file) {
2946 warn("backlog file %s vanished as we opened it", oldest_path);
2950 inputfile_reading_start(backlog_input_file);
/* -1 disables the periodic rescan (see poll_backlog_file) while this file is read */
2951 until_backlog_nextscan= -1;
/* too young: wait out the deficiency, capped at the spontaneous rescan interval if one is set */
2955 until_backlog_nextscan= age_deficiency / period_seconds;
2957 if (backlog_spontrescan_periods >= 0 &&
2958 until_backlog_nextscan > backlog_spontrescan_periods)
2959 until_backlog_nextscan= backlog_spontrescan_periods;
2961 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2962 age, age_deficiency, until_backlog_nextscan, oldest_path);
2969 /*---------- shutdown and signal handling ----------*/
2971 static void preterminate(void) {
2972 if (in_child) return;
2973 notice_processed(main_input_file,0,"feedfile","");
2974 notice_processed(flushing_input_file,0,"flushing","");
2975 if (backlog_input_file)
2976 notice_processed(backlog_input_file,0, "backlog file ",
2977 backlog_input_file->path);
2980 static int signal_self_pipe[2];
2981 static sig_atomic_t terminate_sig_flag;
2983 static void raise_default(int signo) {
2984 xsigsetdefault(signo);
2989 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2990 assert(fd=signal_self_pipe[0]);
2992 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2993 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2994 if (r==0) die("eof on signal self pipe");
2995 if (terminate_sig_flag) {
2997 notice("terminating (%s)", strsignal(terminate_sig_flag));
2998 raise_default(terminate_sig_flag);
3000 return OOP_CONTINUE;
3003 static void sigarrived_handler(int signum) {
3008 if (!terminate_sig_flag) terminate_sig_flag= signum;
3013 write(signal_self_pipe[1],&x,1);
3016 static void init_signals(void) {
3017 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
3018 sysdie("could not ignore SIGPIPE");
3020 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
3022 xsetnonblock(signal_self_pipe[0],1);
3023 xsetnonblock(signal_self_pipe[1],1);
3025 struct sigaction sa;
3026 memset(&sa,0,sizeof(sa));
3027 sa.sa_handler= sigarrived_handler;
3028 sa.sa_flags= SA_RESTART;
3029 xsigaction(SIGTERM,&sa);
3030 xsigaction(SIGINT,&sa);
3032 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
3035 /*========== flushing the feed ==========*/
3037 static pid_t inndcomm_child;
3038 static int inndcomm_sentinel_fd;
3040 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
3041 assert(inndcomm_child);
3042 assert(fd == inndcomm_sentinel_fd);
3043 int status= xwaitpid(&inndcomm_child, "inndcomm");
3046 cancel_fd_read_except(fd);
3047 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3048 inndcomm_sentinel_fd= 0;
3050 assert(!flushing_input_file);
3052 if (WIFEXITED(status)) {
3053 switch (WEXITSTATUS(status)) {
3055 case INNDCOMMCHILD_ESTATUS_FAIL:
3058 case INNDCOMMCHILD_ESTATUS_NONESUCH:
3059 notice("feed has been dropped by innd, finishing up");
3060 flushing_input_file= main_input_file;
3061 tailing_queue_readable(flushing_input_file);
3062 /* we probably previously returned EAGAIN from our fake read method
3063 * when in fact we were at EOF, so signal another readable event
3064 * so we actually see the EOF */
3068 if (flushing_input_file) {
3069 SMS(DROPPING, max_separated_periods,
3070 "feed dropped by innd, but must finish last flush");
3073 SMS(DROPPED, 0, "feed dropped by innd");
3074 search_backlog_file();
3076 return OOP_CONTINUE;
3080 flushing_input_file= main_input_file;
3081 tailing_queue_readable(flushing_input_file);
3083 main_input_file= open_input_file(feedfile);
3084 if (!main_input_file)
3085 die("flush succeeded but feedfile %s does not exist!", feedfile);
3087 if (flushing_input_file) {
3088 SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3091 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3093 return OOP_CONTINUE;
3096 goto unexpected_exitstatus;
3099 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3100 warn("flush timed out trying to talk to innd");
3103 unexpected_exitstatus:
3104 report_child_status("inndcomm child", status);
3108 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3109 return OOP_CONTINUE;
3112 static void inndcommfail(const char *what) {
3113 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3114 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3117 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3120 notice("flushing %s",why);
3122 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3123 assert(!inndcomm_child);
3124 assert(!inndcomm_sentinel_fd);
3126 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3128 inndcomm_child= xfork("inndcomm child");
3130 if (!inndcomm_child) {
3131 const char *flushargv[2]= { sitename, 0 };
3135 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3136 /* parent spots the autoclose of pipefds[1] when we die or exit */
3138 if (simulate_flush>=0) {
3139 warn("SIMULATING flush child status %d", simulate_flush);
3140 if (simulate_flush>128) raise(simulate_flush-128);
3141 else exit(simulate_flush);
3144 alarm(inndcomm_flush_timeout);
3145 r= ICCopen(); if (r) inndcommfail("connect");
3146 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
3147 if (!r) exit(0); /* yay! */
3149 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3150 syswarn("innd ctlinnd flush failed: innd said %s", reply);
3151 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3156 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3157 inndcomm_sentinel_fd= pipefds[0];
3158 assert(inndcomm_sentinel_fd);
3159 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3161 SMS(FLUSHING, 0, why);
3164 /*========== main program ==========*/
3166 static void postfork_inputfile(InputFile *ipf) {
3168 xclose(ipf->fd, "(in child) input file ", ipf->path);
/* In a forked child: closes the child's copy of a stdio stream.
 *   f      stream to close; a null pointer (stream not open) is ignored
 *   what   description used in the error message
 *   what2  optional second half of the description, may be null
 * Dies via sysdie if fclose fails. */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return;
  /* substitute "" rather than 0 for a missing what2: passing a null
   * pointer for a %s conversion is undefined behaviour */
  if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:"");
}
3177 static void postfork(void) {
3180 xsigsetdefault(SIGTERM);
3181 xsigsetdefault(SIGINT);
3182 xsigsetdefault(SIGPIPE);
3183 if (terminate_sig_flag) raise(terminate_sig_flag);
3185 postfork_inputfile(main_input_file);
3186 postfork_inputfile(flushing_input_file);
3190 conn_closefd(conn,"(in child) ");
3192 postfork_stdio(defer, "defer file ", path_defer);
3195 typedef struct Every Every;
3197 struct timeval interval;
3202 static void every_schedule(Every *e, struct timeval base);
3204 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3207 if (!e->fixed_rate) xgettimeofday(&base);
3208 every_schedule(e, base);
3209 return OOP_CONTINUE;
3212 static void every_schedule(Every *e, struct timeval base) {
3213 struct timeval when;
3214 timeradd(&base, &e->interval, &when);
3215 loop->on_time(loop, when, every_happens, e);
3218 static void every(int interval, int fixed_rate, void (*f)(void)) {
3219 NEW_DECL(Every *,e);
3220 e->interval.tv_sec= interval;
3221 e->interval.tv_usec= 0;
3222 e->fixed_rate= fixed_rate;
3225 xgettimeofday(&now);
3226 every_schedule(e, now);
3229 static void filepoll(void) {
3230 filemon_callback(main_input_file);
3231 filemon_callback(flushing_input_file);
3234 static char *debug_report_ipf(InputFile *ipf) {
3235 if (!ipf) return xasprintf("none");
3237 const char *slash= strrchr(ipf->path,'/');
3238 const char *path= slash ? slash+1 : ipf->path;
3240 return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
3242 ipf->queue.count, ipf->inprogress, ipf->autodefer,
3243 (long)ipf->offset, ipf->fd,
3244 ipf->rd ? "" : ",!rd",
3245 ipf->skippinglong ? "*skiplong" : "",
3246 ipf->rd && ipf->paused ? "*paused" : "");
3249 static void period(void) {
3250 char *dipf_main= debug_report_ipf(main_input_file);
3251 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3252 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3255 " sms=%s[%d] conns=%d until_connect=%d"
3256 " input_files main:%s flushing:%s backlog:%s[%d]"
3257 " children connecting=%ld inndcomm=%ld"
3259 sms_names[sms], until_flush, conns.count, until_connect,
3260 dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
3261 (long)connecting_child, (long)inndcomm_child
3265 free(dipf_flushing);
3268 if (until_connect) until_connect--;
3270 inputfile_queue_check_expired(backlog_input_file);
3271 poll_backlog_file();
3272 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3273 statemc_period_poll();
3274 check_assign_articles();
3279 /*========== dumping state ==========*/
3281 static void dump_article_list(FILE *f, const ControlCommand *c,
3282 const ArticleList *al) {
3283 fprintf(f, " count=%d\n", al->count);
3284 if (!c->xval) return;
3286 int i; Article *art;
3287 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3288 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3289 DUMPV("%p", art->,ipf);
3290 DUMPV("%d", art->,missing);
3291 DUMPV("%lu", (unsigned long)art->,offset);
3292 DUMPV("%d", art->,blanklen);
3293 DUMPV("%d", art->,midlen);
3294 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3298 static void dump_input_file(FILE *f, const ControlCommand *c,
3299 InputFile *ipf, const char *wh) {
3300 char *dipf= debug_report_ipf(ipf);
3301 fprintf(f,"input %s %s", wh, dipf);
3305 DUMPV("%d", ipf->,readcount_ok);
3306 DUMPV("%d", ipf->,readcount_blank);
3307 DUMPV("%d", ipf->,readcount_err);
3311 ArtState state; const char *const *statename;
3312 for (state=0, statename=artstate_names; *statename; state++,statename++) {
3313 #define RC_DUMP_FMT(x) " " #x "=%d"
3314 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3315 fprintf(f,"input %s counts %-11s"
3316 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3318 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3320 fprintf(f,"input %s queue", wh);
3321 dump_article_list(f,c,&ipf->queue);
3327 fprintf(cc->out, "dumping state to %s\n", path_dump);
3328 FILE *f= fopen(path_dump, "w");
3329 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3331 fprintf(f,"general");
3332 DUMPV("%s", sms_names,[sms]);
3333 DUMPV("%d", ,until_flush);
3334 DUMPV("%ld", (long),self_pid);
3335 DUMPV("%p", , defer);
3336 DUMPV("%d", , until_connect);
3337 DUMPV("%d", , until_backlog_nextscan);
3338 DUMPV("%d", , simulate_flush);
3339 fprintf(f,"\nnocheck");
3340 DUMPV("%#.10f", , accept_proportion);
3341 DUMPV("%d", , nocheck);
3342 DUMPV("%d", , nocheck_reported);
3345 fprintf(f,"special");
3346 DUMPV("%ld", (long),connecting_child);
3347 DUMPV("%d", , connecting_fdpass_sock);
3348 DUMPV("%d", , control_master);
3351 fprintf(f,"filemon ");
3352 filemon_method_dump_info(f);
3354 dump_input_file(f,c, main_input_file, "main" );
3355 dump_input_file(f,c, flushing_input_file, "flushing");
3356 dump_input_file(f,c, backlog_input_file, "backlog" );
3358 fprintf(f,"conns count=%d\n", conns.count);
3363 fprintf(f,"C%d",conn->fd);
3364 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3365 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3366 DUMPV("%d",conn->,since_activity);
3369 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3370 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3371 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
3373 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3374 for (i=0; i<conn->xmitu; i++) {
3375 const struct iovec *iv= &conn->xmit[i];
3376 const XmitDetails *xd= &conn->xmitd[i];
3379 case xk_Const: dinfo= xasprintf("Const"); break;
3380 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
3384 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3385 sanitise(iv->iov_base, iv->iov_len));
3391 DUMPV("%s", , path_lock);
3392 DUMPV("%s", , path_flushing);
3393 DUMPV("%s", , path_defer);
3394 DUMPV("%s", , path_control);
3395 DUMPV("%s", , path_dump);
3396 DUMPV("%s", , globpat_backlog);
3399 if (!!ferror(f) + !!fclose(f)) {
3400 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3405 /*========== option parsing ==========*/
3407 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3408 static void vbadusage(const char *fmt, va_list al) {
3409 char *m= xvasprintf(fmt,al);
3410 fprintf(stderr, "bad usage: %s\n"
3411 "say --help for help, or read the manpage\n",
3414 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3418 /*---------- generic option parser ----------*/
3420 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3421 static void badusage(const char *fmt, ...) {
3428 of_seconds= 001000u,
3429 of_boolean= 002000u,
3432 typedef struct Option Option;
3433 typedef void OptionParser(const Option*, const char *val);
3437 const char *lng, *formarg;
3443 static void parse_options(const Option *options, char ***argvp) {
3444 /* on return *argvp is first non-option arg; argc is not updated */
3447 const char *arg= *++(*argvp);
3449 if (*arg != '-') break;
3450 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3452 while ((a= *++arg)) {
3456 char *equals= strchr(arg,'=');
3457 int len= equals ? (equals - arg) : strlen(arg);
3458 for (o=options; o->shrt || o->lng; o++)
3459 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3461 badusage("unknown long option --%s",arg);
3464 if (equals) badusage("option --%s does not take a value",o->lng);
3466 } else if (equals) {
3470 if (!arg) badusage("option --%s needs a value for %s",
3471 o->lng, o->formarg);
3474 break; /* eaten the whole argument now */
3476 for (o=options; o->shrt || o->lng; o++)
3479 badusage("unknown short option -%c",a);
3486 if (!arg) badusage("option -%c needs a value for %s",
3487 o->shrt, o->formarg);
3490 break; /* eaten the whole argument now */
3496 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
3498 static void print_options(const Option *options, FILE *f) {
3500 for (o=options; o->shrt || o->lng; o++) {
3501 char shrt[2] = { o->shrt, 0 };
3502 char *optspec= xasprintf("%s%s%s%s%s",
3503 o->shrt ? "-" : "", shrt,
3504 o->shrt && o->lng ? "|" : "",
3505 DELIMPERHAPS("--", o->lng));
3506 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3511 /*---------- specific option types ----------*/
3513 static void op_integer(const Option *o, const char *val) {
3516 unsigned long ul= strtoul(val,&ep,10);
3517 if (*ep || ep==val || errno || ul>INT_MAX)
3518 badusage("bad integer value for %s",o->lng);
3519 int *store= o->store;
3523 static void op_double(const Option *o, const char *val) {
3524 int *store= o->store;
3527 *store= strtod(val, &ep);
3528 if (*ep || ep==val || errno)
3529 badusage("bad floating point value for %s",o->lng);
3532 static void op_string(const Option *o, const char *val) {
3533 const char **store= o->store;
3537 static void op_seconds(const Option *o, const char *val) {
3538 int *store= o->store;
3542 double v= strtod(val,&ep);
3543 if (ep==val) badusage("bad time/duration value for %s",o->lng);
3545 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3546 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3547 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3548 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3549 else if (!strcmp(ep,"das")) unit= 10;
3550 else if (!strcmp(ep,"hs")) unit= 100;
3551 else if (!strcmp(ep,"ks")) unit= 1000;
3552 else if (!strcmp(ep,"Ms")) unit= 1000000;
3553 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3557 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3561 static void op_setint(const Option *o, const char *val) {
3562 int *store= o->store;
3566 /*---------- specific options ----------*/
3568 static void help(const Option *o, const char *val);
3570 static const Option innduct_options[]= {
3571 {'f',"feedfile", "F", &feedfile, op_string },
3572 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3573 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3574 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3575 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3576 {'C',"inndconf", "F", &inndconffile, op_string },
3577 {'P',"port", "PORT", &port, op_integer },
3578 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3579 {0,"help", 0, 0, help },
3581 {0,"max-connections", "N", &max_connections, op_integer },
3582 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3583 {0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer },
3584 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3585 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3587 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3588 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3589 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3591 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3592 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3594 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3595 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3596 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3597 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3598 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3599 {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
3600 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3602 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3603 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
3608 static void printusage(FILE *f) {
3609 fputs("usage: innduct [options] site [fqdn]\n"
3610 "available options are:\n", f);
3611 print_options(innduct_options, f);
3614 static void help(const Option *o, const char *val) {
3616 if (ferror(stdout) || fflush(stdout)) {
3617 perror("innduct: writing help");
3623 static void convert_to_periods_rndup(int *store) {
3624 *store += period_seconds-1;
3625 *store /= period_seconds;
/* Program entry point: parses options and arguments, validates and
 * normalises the configuration, derives the per-feed file names,
 * creates the event loop, optionally daemonises, sets up periodic
 * callbacks and runs the event loop.
 * NOTE(review): this copy appears to have lost a number of short lines
 * (declarations, some statements, closing braces) - confirm against
 * the upstream source. */
3628 int main(int argc, char **argv) {
3634 parse_options(innduct_options, &argv);
3639 if (!sitename) badusage("need site name argument");
/* NOTE(review): if the optional fqdn is absent, remote_host is the
 * terminating NULL of argv and the *argv below reads one element past
 * it - looks like it should only be checked when remote_host is
 * non-null; confirm. */
3640 remote_host= *argv++;
3641 if (*argv) badusage("too many non-option arguments");
3645 int r= innconf_read(inndconffile);
3646 if (!r) badusage("could not read inn.conf (more info on stderr)");
3648 if (!remote_host) remote_host= sitename;
/* percentages given on the command line are converted to fractions */
3650 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3651 badusage("nocheck threshold percentage must be between 0..100");
3652 nocheck_thresh *= 0.01;
3654 if (nocheck_decay < 0.1)
3655 badusage("nocheck decay articles must be at least 0.1");
3656 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
/* these options were parsed as seconds; convert each to a whole number of periods, rounding up */
3658 convert_to_periods_rndup(&reconnect_delay_periods);
3659 convert_to_periods_rndup(&flushfail_retry_periods);
3660 convert_to_periods_rndup(&backlog_retry_minperiods);
3661 convert_to_periods_rndup(&backlog_spontrescan_periods);
3662 convert_to_periods_rndup(&spontaneous_flush_periods);
3663 convert_to_periods_rndup(&max_separated_periods);
3664 convert_to_periods_rndup(&need_activity_periods);
3666 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3667 badusage("bad input data ratio must be between 0..100");
3668 max_bad_data_ratio *= 0.01;
/* feedfile: built from inn.conf's pathoutgoing and the site name, or, if the given name ends in '/', the site name is appended to it */
3671 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3672 } else if (!feedfile[0]) {
3673 badusage("feed filename must be nonempty");
3674 } else if (feedfile[strlen(feedfile)-1]=='/') {
3675 feedfile= xasprintf("%s%s",feedfile,sitename);
3678 if (max_queue_per_ipf<0)
3679 max_queue_per_ipf= max_queue_per_conn * 2;
/* reject characters that would interfere with the glob pattern and the '#'/'~' backlog-name filter */
3681 const char *feedfile_forbidden= "?*[~#";
3683 while ((c= *feedfile_forbidden++))
3684 if (strchr(feedfile, c))
3685 badusage("feed filename may not contain metacharacter %c",c);
/* all per-feed paths are the feedfile name plus a suffix */
3689 path_lock= xasprintf("%s_lock", feedfile);
3690 path_flushing= xasprintf("%s_flushing", feedfile);
3691 path_defer= xasprintf("%s_defer", feedfile);
3692 path_control= xasprintf("%s_control", feedfile);
3693 path_dump= xasprintf("%s_dump", feedfile);
3694 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3696 oop_source_sys *sysloop= oop_sys_new();
3697 if (!sysloop) sysdie("could not create liboop event loop");
3698 loop= (oop_source*)sysloop;
3702 if (become_daemon) {
3704 for (i=3; i<255; i++)
3705 /* do this now before we open syslog, etc. */
3707 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3709 int null= open("/dev/null",O_RDWR);
3710 if (null<0) sysfatal("failed to open /dev/null");
3714 xclose(null, "/dev/null original fd",0);
3716 pid_t child1= xfork("daemonise first fork");
3717 if (child1) _exit(0);
/* NOTE(review): only the child (child1==0) gets here, and setsid()
 * returns the new session ID on success or (pid_t)-1 on failure, so
 * `sid != child1' is true on every path and daemon mode would always
 * die here - looks like this should test sid == -1; confirm. */
3719 pid_t sid= setsid();
3720 if (sid != child1) sysfatal("setsid failed");
3722 pid_t child2= xfork("daemonise second fork");
3723 if (child2) _exit(0);
3727 if (self_pid==-1) sysdie("getpid");
3742 notice("filemon: suppressed by command line option, polling");
3744 filemon_ok= filemon_method_init();
3746 warn("filemon: no file monitoring available, polling");
/* periodic callbacks: filepoll every filepoll_seconds (not fixed-rate), period every period_seconds (fixed-rate) */
3749 every(filepoll_seconds,0,filepoll);
3751 every(period_seconds,1,period);
/* the event loop is not expected to return except on error */
3757 void *run= oop_sys_run(sysloop);
3758 assert(run == OOP_ERROR);
3759 sysdie("event loop failed");