3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain amount old, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
222 #define VA va_list al; va_start(al,fmt)
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
326 static void postfork(void);
327 static void period(void);
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
358 static const char *inndconffile;
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382 /* in one corrupt 4096-byte block the number of newlines has
383 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
386 /*----- statistics -----*/
388 typedef enum { /* in queue in conn->sent */
389 art_Unchecked, /* not checked, not sent checking */
390 art_Wanted, /* checked, wanted sent body as requested */
391 art_Unsolicited, /* - sent body without check */
395 static const char *const artstate_names[]=
396 { "Unchecked", "Wanted", "Unsolicited", 0 };
398 #define RESULT_COUNTS(RCS,RCN) \
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x) \
409 counts[art_Unchecked] x \
410 + counts[art_Wanted] x \
411 + counts[art_Unsolicited] x, \
412 counts[art_Unchecked] x \
413 , counts[art_Wanted] x \
414 , counts[art_Unsolicited] x
417 #define RC_INDEX(x) RC_##x,
418 RESULT_COUNTS(RC_INDEX, RC_INDEX)
423 /*----- transmission buffers -----*/
439 /*----- core operational data structure types -----*/
442 /* This is also an instance of struct oop_readable */
443 struct oop_readable readable; /* first */
444 oop_readable_call *readable_callback;
445 void *readable_callback_user;
448 Filemon_Perfile *filemon;
450 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
455 long inprogress; /* includes queue.count and also articles in conns */
457 int counts[art_MaxState][RCI_max];
458 int readcount_ok, readcount_blank, readcount_err;
473 #define SMS_LIST(X) \
481 enum StateMachineState {
482 #define SMS_DEF_ENUM(s) sm_##s,
483 SMS_LIST(SMS_DEF_ENUM)
486 static const char *sms_names[]= {
487 #define SMS_DEF_NAME(s) #s ,
488 SMS_LIST(SMS_DEF_NAME)
494 int fd; /* may be 0, meaning closed (during construction/destruction) */
495 oop_read *rd; /* likewise */
496 int max_queue, stream, quitting;
497 int since_activity; /* periods */
498 ArticleList waiting; /* not yet told peer */
499 ArticleList priority; /* peer says send it now */
500 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
501 struct iovec xmit[CONNIOVS];
502 XmitDetails xmitd[CONNIOVS];
507 /*----- general operational variables -----*/
509 /* main initialises */
510 static oop_source *loop;
511 static ConnList conns;
512 static char *path_lock, *path_flushing, *path_defer;
513 static char *path_control, *path_dump;
514 static char *globpat_backlog;
515 static pid_t self_pid;
517 /* statemc_init initialises */
518 static StateMachineState sms;
519 static int until_flush;
520 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
523 /* initialisation to 0 is good */
524 static int until_connect, until_backlog_nextscan;
525 static double accept_proportion;
526 static int nocheck, nocheck_reported, in_child;
528 /* for simulation, debugging, etc. */
529 int simulate_flush= -1;
531 /*========== logging ==========*/
533 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
534 static void logcore(int sysloglevel, const char *fmt, ...) {
537 vsyslog(sysloglevel,fmt,al);
539 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
540 vfprintf(stderr,fmt,al);
546 static void logv(int sysloglevel, const char *pfx, int errnoval,
547 const char *fmt, va_list al) PRINTF(5,0);
/*
 * Common back end for the die and log wrapper functions: formats the
 * caller's message into a fixed-size stack buffer and hands it to
 * logcore, prefixed with "<sitename>" and pfx.
 *
 *   sysloglevel  syslog priority to log at (may be adjusted, see below)
 *   pfx          severity text inserted after the site name
 *   errnoval     errno value to append as ": strerror(...)", or a
 *                negative value to append nothing
 *   fmt, al      printf-style format and its arguments
 *
 * Messages longer than the buffer are truncated by vsnprintf.
 */
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549 const char *fmt, va_list al) {
550 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
551 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
552 msgbuf[sizeof(msgbuf)-1]= 0;
/* Permission errors are logged at LOG_ERR.  syslog priorities are
 * numerically smaller for more severe levels, so this test matches
 * LOG_ERR and everything less severe (warning, notice, ...).
 * NOTE(review): LOG_CRIT callers are therefore not affected by this
 * adjustment - confirm that this is the intended direction. */
554 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
555 sysloglevel= LOG_ERR; /* run by wrong user, probably */
/* The errno text and its ": " separator are emitted only when
 * errnoval is non-negative. */
557 logcore(sysloglevel, "<%s>%s: %s%s%s",
558 sitename, pfx, msgbuf,
559 errnoval>=0 ? ": " : "",
560 errnoval>=0 ? strerror(errnoval) : "");
563 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
564 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
565 static void fn(const char *fmt, ...) { \
568 logv(sysloglevel, pfx, err, fmt, al); \
572 #define logwrap(fn, pfx, sysloglevel, err) \
573 static void fn(const char *fmt, ...) PRINTF(1,2); \
574 static void fn(const char *fmt, ...) { \
576 logv(sysloglevel, pfx, err, fmt, al); \
580 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
581 diewrap(die, " critical", LOG_CRIT, -1, 16);
583 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
584 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
586 logwrap(syswarn, " warning", LOG_WARNING, errno);
587 logwrap(warn, " warning", LOG_WARNING, -1);
589 logwrap(notice, " notice", LOG_NOTICE, -1);
590 logwrap(info, " info", LOG_INFO, -1);
591 logwrap(debug, " debug", LOG_DEBUG, -1);
594 /*========== utility functions etc. ==========*/
596 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
597 static char *xvasprintf(const char *fmt, va_list al) {
599 int rc= vasprintf(&str,fmt,al);
600 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
603 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
604 static char *xasprintf(const char *fmt, ...) {
606 char *str= xvasprintf(fmt,al);
611 static int close_perhaps(int *fd) {
612 if (*fd <= 0) return 0;
617 static void xclose(int fd, const char *what, const char *what2) {
619 if (r) sysdie("close %s%s",what,what2?what2:"");
621 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
622 if (*fd <= 0) return;
623 xclose(*fd,what,what2);
/*
 * fork(2) wrapper.  On failure logs via sysfatal (which does not
 * return).  On success logs the child's pid at debug level; in the
 * child (fork returned 0) postfork() is run before returning.
 *
 *   what   short description of the child, used in log messages
 *
 * Returns the value returned by fork: the child's pid in the parent,
 * 0 in the child.
 */
static pid_t xfork(const char *what) {
  pid_t child;

  child= fork();
  if (child==-1) sysfatal("cannot fork for %s",what);
  /* %ld needs a (signed) long argument */
  debug("forked %s %ld", what, (long)child);
  if (!child) postfork();
  return child;
}
637 static void on_fd_read_except(int fd, oop_call_fd callback) {
638 loop->on_fd(loop, fd, OOP_READ, callback, 0);
639 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
641 static void cancel_fd_read_except(int fd) {
642 loop->cancel_fd(loop, fd, OOP_READ);
643 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
646 static void report_child_status(const char *what, int status) {
647 if (WIFEXITED(status)) {
648 int es= WEXITSTATUS(status);
650 warn("%s: child died with error exit status %d", what, es);
651 } else if (WIFSIGNALED(status)) {
652 int sig= WTERMSIG(status);
653 const char *sigstr= strsignal(sig);
654 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
656 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
658 warn("%s: child died due to unknown fatal signal %d%s",
659 what, sig, coredump);
661 warn("%s: child died with unknown wait status %d", what,status);
665 static int xwaitpid(pid_t *pid, const char *what) {
668 int r= kill(*pid, SIGKILL);
669 if (r) sysdie("cannot kill %s child", what);
671 pid_t got= waitpid(*pid, &status, 0);
672 if (got==-1) sysdie("cannot reap %s child", what);
673 if (got==0) die("cannot reap %s child", what);
680 static void *zxmalloc(size_t sz) {
681 void *p= xmalloc(sz);
686 static void xunlink(const char *path, const char *what) {
688 if (r) sysdie("can't unlink %s %s", path, what);
691 static time_t xtime(void) {
693 if (now==-1) sysdie("time(2) failed");
697 static void xsigaction(int signo, const struct sigaction *sa) {
698 int r= sigaction(signo,sa,0);
699 if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
702 static void xsigsetdefault(int signo) {
704 memset(&sa,0,sizeof(sa));
705 sa.sa_handler= SIG_DFL;
706 xsigaction(signo,&sa);
709 static void xgettimeofday(struct timeval *tv_r) {
710 int r= gettimeofday(tv_r,0);
711 if (r) sysdie("gettimeofday(2) failed");
714 static void xsetnonblock(int fd, int nonblocking) {
715 int errnoval= oop_fd_nonblock(fd, nonblocking);
716 if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
719 static void check_isreg(const struct stat *stab, const char *path,
721 if (!S_ISREG(stab->st_mode))
722 die("%s %s not a plain file (mode 0%lo)",
723 what, path, (unsigned long)stab->st_mode);
726 static void xfstat(int fd, struct stat *stab_r, const char *what) {
727 int r= fstat(fd, stab_r);
728 if (r) sysdie("could not fstat %s", what);
731 static void xfstat_isreg(int fd, struct stat *stab_r,
732 const char *path, const char *what) {
733 xfstat(fd, stab_r, what);
734 check_isreg(stab_r, path, what);
737 static void xlstat_isreg(const char *path, struct stat *stab,
738 int *enoent_r /* 0 means ENOENT is fatal */,
740 int r= lstat(path, stab);
742 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
743 sysdie("could not lstat %s %s", what, path);
745 if (enoent_r) *enoent_r= 0;
746 check_isreg(stab, path, what);
/*
 * Returns nonzero iff the two stat results refer to the same file,
 * ie have the same inode number on the same device.
 *
 * Both a and b must describe regular files; this is asserted.
 */
static int samefile(const struct stat *a, const struct stat *b) {
  assert(S_ISREG(a->st_mode));
  assert(S_ISREG(b->st_mode));
  return (a->st_ino == b->st_ino &&
          a->st_dev == b->st_dev);
}
/*
 * Produces a printable, quoted rendering of input for use in log
 * messages.
 *
 *   input   bytes to render
 *   len     number of bytes to consider, or negative to stop only at
 *           the first NUL byte (a NUL always ends the rendering)
 *
 * Printable ASCII other than backslash is copied; every other byte is
 * written as \xNN.  Output which would not fit is cut short and ends
 * in "'.." instead of the closing quote.
 *
 * Returns a pointer to a static buffer, which is overwritten by the
 * next call.
 */
static char *sanitise(const char *input, int len) {
  static char sanibuf[100]; /* returns pointer to this buffer! */

  const char *p= input;
  const char *endp= len>=0 ? input+len : 0;
  char *q= sanibuf;
  *q++= '`';
  for (;;) {
    /* the 8 bytes of slack cover one escape plus the "'.." marker */
    if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
    /* Read via unsigned char: where plain char is signed, a byte with
     * the top bit set would otherwise become a negative int, %02x would
     * print eight hex digits, and the escape would overrun the slack. */
    int c= (!endp || p<endp) ? (unsigned char)*p++ : 0;
    if (!c) { *q++= '\''; *q=0; break; }
    if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
    snprintf(q,5,"\\x%02x",c);
    q += 4;
  }
  return sanibuf;
}
/*
 * Returns nonzero iff errnoval is one of the two error codes
 * (EWOULDBLOCK, EAGAIN) which the callers treat as "operation would
 * block".
 */
static int isewouldblock(int errnoval) {
  return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
}
778 /*========== command and control connections ==========*/
780 static int control_master;
782 typedef struct ControlConn ControlConn;
784 void (*destroy)(ControlConn*);
790 struct sockaddr_un un;
795 static const oop_rd_style control_rd_style= {
796 OOP_RD_DELIM_STRIP, '\n',
798 OOP_RD_SHORTREC_FORBID
801 static void control_destroy(ControlConn *cc) {
805 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
806 if (ferror(cc->out) | fflush(cc->out)) {
807 info("CTRL%d write error %s", cc->fd, strerror(errno));
812 static void control_prompt(ControlConn *cc /* may destroy*/) {
813 fprintf(cc->out, "%s| ", sitename);
814 control_checkouterr(cc);
817 struct ControlCommand {
819 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
820 const char *arg, size_t argsz);
825 static const ControlCommand control_commands[];
828 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
829 const char *arg, size_t argsz)
832 fputs("commands:\n", cc->out);
833 const ControlCommand *ccmd;
834 for (ccmd=control_commands; ccmd->cmd; ccmd++)
835 fprintf(cc->out, " %s\n", ccmd->cmd);
836 fputs("NB: permissible arguments are not shown above."
837 " Not all commands listed are safe. See innduct(8).\n", cc->out);
841 int ok= trigger_flush_ok();
842 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
847 notice("terminating (CTRL%d)",cc->fd);
848 raise_default(SIGTERM);
854 /* messing with our head: */
855 CCMD(period) { period(); }
856 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
857 CCMD(setint) { *(int*)c->xdata= c->xval; }
858 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
860 static const ControlCommand control_commands[]= {
862 { "flush", ccmd_flush },
863 { "stop", ccmd_stop },
864 { "dump q", ccmd_dump, 0,0 },
865 { "dump a", ccmd_dump, 0,1 },
867 { "p", ccmd_period },
869 #define POKES(cmd,func) \
870 { cmd "flush", func, &until_flush, 1 }, \
871 { cmd "conn", func, &until_connect, 0 }, \
872 { cmd "blscan", func, &until_backlog_nextscan, 0 },
873 POKES("next ", ccmd_setint)
874 POKES("prod ", ccmd_setint_period)
876 { "pretend flush", ccmd_setintarg, &simulate_flush },
877 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
/*
 * oop_rd callback for a record read from a control connection
 * (cc_v is the ControlConn).  An empty record just produces a new
 * prompt; otherwise the record is looked up in control_commands and
 * the matching command's handler is called.
 */
881 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
882 const char *errmsg, int errnoval,
883 const char *data, size_t recsz, void *cc_v) {
884 ControlConn *cc= cc_v;
887 info("CTRL%d closed", cc->fd);
892 if (recsz == 0) goto prompt;
894 const ControlCommand *ccmd;
/* A table entry matches when its name is a prefix of the record and
 * is followed either by the end of the record or by a space. */
895 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
896 int l= strlen(ccmd->cmd);
897 if (recsz < l) continue;
898 if (recsz > l && data[l] != ' ') continue;
899 if (memcmp(data, ccmd->cmd, l)) continue;
/* argl is the length of whatever follows "<cmd> "; it is -1 when the
 * record is exactly the command name.  In that case the handler gets
 * a null arg pointer.
 * NOTE(review): argl is also passed as the handler's size_t argsz
 * parameter, so -1 arrives there as a very large value; handlers must
 * check arg rather than argsz. */
901 int argl= (int)recsz - (l+1);
902 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
906 fputs("unknown command; h for help\n", cc->out);
913 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
914 const char *errmsg, int errnoval,
915 const char *data, size_t recsz, void *cc_v) {
916 ControlConn *cc= cc_v;
918 info("CTRL%d read error %s", cc->fd, errmsg);
923 static int control_conn_startup(ControlConn *cc /* may destroy*/,
925 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
926 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
928 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
931 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
933 info("CTRL%d %s ready", cc->fd, how);
938 static void control_stdio_destroy(ControlConn *cc) {
940 oop_rd_cancel(cc->rd);
941 errno= oop_rd_delete_tidy(cc->rd);
942 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
947 static void control_stdio(void) {
948 NEW_DECL(ControlConn *,cc);
949 cc->destroy= control_stdio_destroy;
953 int r= control_conn_startup(cc,"stdio");
954 if (r) cc->destroy(cc);
957 static void control_accepted_destroy(ControlConn *cc) {
959 oop_rd_cancel(cc->rd);
960 oop_rd_delete_kill(cc->rd);
962 if (cc->out) { fclose(cc->out); cc->fd=0; }
963 close_perhaps(&cc->fd);
967 static void *control_master_readable(oop_source *lp, int master,
968 oop_event ev, void *u) {
969 NEW_DECL(ControlConn *,cc);
970 cc->destroy= control_accepted_destroy;
972 cc->salen= sizeof(cc->sa);
973 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
974 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
976 cc->out= fdopen(cc->fd, "w");
977 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
979 int r= control_conn_startup(cc, "accepted");
989 #define NOCONTROL(...) do{ \
990 syswarn("no control socket, because failed to " __VA_ARGS__); \
994 static void control_init(void) {
999 struct sockaddr_un un;
1002 memset(&sa,0,sizeof(sa));
1003 int maxlen= sizeof(sa.un.sun_path);
1005 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1007 if (errno != ENOENT)
1008 NOCONTROL("readlink control socket symlink path %s", path_control);
1010 if (reallen >= maxlen) {
1011 debug("control socket symlink path too long (r=%d)",reallen);
1012 xunlink(path_control, "old (overlong) control socket symlink");
1018 int r= lstat(realsockdir,&stab);
1020 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1022 r= mkdir(realsockdir, 0700);
1023 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1026 uid_t self= geteuid();
1027 if (!S_ISDIR(stab.st_mode) ||
1028 stab.st_uid != self ||
1029 stab.st_mode & 0007) {
1030 warn("no control socket, because real socket directory"
1031 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1032 !!S_ISDIR(stab.st_mode),
1033 (unsigned long)stab.st_uid, (unsigned long)self,
1034 (unsigned long)stab.st_mode & 0777UL);
1039 real= xasprintf("%s/s%lx.%lx", realsockdir,
1040 (unsigned long)xtime(), (unsigned long)self_pid);
1041 int reallen= strlen(real);
1043 if (reallen >= maxlen) {
1044 warn("no control socket, because tmpnam gave overly-long path"
1048 r= symlink(real, path_control);
1049 if (r) NOCONTROL("make control socket path %s a symlink to real"
1050 " socket path %s", path_control, real);
1051 memcpy(sa.un.sun_path, real, reallen);
1054 int r= unlink(sa.un.sun_path);
1055 if (r && errno!=ENOENT)
1056 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1058 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1059 if (control_master<0) NOCONTROL("create new control socket");
1061 sa.un.sun_family= AF_UNIX;
1062 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1063 r= bind(control_master, &sa.sa, sl);
1064 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1066 r= listen(control_master, 5);
1067 if (r) NOCONTROL("listen");
1069 xsetnonblock(control_master, 1);
1071 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1072 info("control socket ok, real path %s", sa.un.sun_path);
1078 xclose_perhaps(&control_master, "control master",0);
1082 /*========== management of connections ==========*/
1084 static void conn_closefd(Conn *conn, const char *msgprefix) {
1085 int r= close_perhaps(&conn->fd);
1086 if (r) info("C%d %serror closing socket: %s",
1087 conn->fd, msgprefix, strerror(errno));
1090 static void conn_dispose(Conn *conn) {
1093 oop_rd_cancel(conn->rd);
1094 oop_rd_delete_kill(conn->rd);
1098 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1099 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1101 conn_closefd(conn,"");
1103 until_connect= reconnect_delay_periods;
1106 static void *conn_exception(oop_source *lp, int fd,
1107 oop_event ev, void *conn_v) {
1110 assert(fd == conn->fd);
1111 assert(ev == OOP_EXCEPTION);
1112 int r= read(conn->fd, &ch, 1);
1113 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1114 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1115 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1116 return OOP_CONTINUE;
1119 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1120 int requeue[art_MaxState];
1121 memset(requeue,0,sizeof(requeue));
1125 while ((art= LIST_REMHEAD(conn->priority)))
1126 LIST_ADDTAIL(art->ipf->queue, art);
1128 while ((art= LIST_REMHEAD(conn->waiting)))
1129 LIST_ADDTAIL(art->ipf->queue, art);
1131 while ((art= LIST_REMHEAD(conn->sent))) {
1132 requeue[art->state]++;
1133 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1134 LIST_ADDTAIL(art->ipf->queue,art);
1139 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1142 char *m= xvasprintf(fmt,al);
1143 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1144 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1147 LIST_REMOVE(conns,conn);
1149 check_assign_articles();
1152 static void connfail(Conn *conn, const char *fmt, ...) {
1155 vconnfail(conn,fmt,al);
1159 static void check_idle_conns(void) {
1162 conn->since_activity++;
1165 if (conn->since_activity <= need_activity_periods) continue;
1167 /* We need to shut this down */
1169 connfail(conn,"timed out waiting for response to QUIT");
1170 else if (conn->sent.count)
1171 connfail(conn,"timed out waiting for responses");
1172 else if (conn->waiting.count || conn->priority.count)
1173 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1174 else if (conn->xmitu)
1175 connfail(conn,"peer has been sending responses"
1176 " before receiving our commands!");
1178 static const char quitcmd[]= "QUIT\r\n";
1179 int todo= sizeof(quitcmd)-1;
1180 const char *p= quitcmd;
1182 int r= write(conn->fd, p, todo);
1184 if (isewouldblock(errno))
1185 connfail(conn, "blocked writing QUIT to idle connection");
1187 connfail(conn, "failed to write QUIT to idle connection: %s",
1195 conn->since_activity= 0;
1196 debug("C%d is idle, quitting", conn->fd);
1205 /*---------- making new connections ----------*/
1207 static pid_t connecting_child;
1208 static int connecting_fdpass_sock;
1210 static void connect_attempt_discard(void) {
1211 if (connecting_child) {
1212 int status= xwaitpid(&connecting_child, "connect");
1213 if (!(WIFEXITED(status) ||
1214 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1215 report_child_status("connect", status);
1217 if (connecting_fdpass_sock) {
1218 cancel_fd_read_except(connecting_fdpass_sock);
1219 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1223 #define PREP_DECL_MSG_CMSG(msg) \
1225 struct iovec msgiov; \
1226 msgiov.iov_base= &msgbyte; \
1227 msgiov.iov_len= 1; \
1228 struct msghdr msg; \
1229 memset(&msg,0,sizeof(msg)); \
1230 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1231 msg.msg_iov= &msgiov; \
1232 msg.msg_iovlen= 1; \
1233 msg.msg_control= msg##cbuf; \
1234 msg.msg_controllen= sizeof(msg##cbuf);
/*
 * Event callback for the fd-passing socket shared with the connection
 * attempt child (fd is asserted to be connecting_fdpass_sock).
 *
 * Receives one message from the child.  If it carries a control
 * message, that is checked to be a single SCM_RIGHTS message holding
 * one file descriptor, which becomes conn->fd; the child is then
 * reaped and its exit status says whether the peer does streaming.
 * The new Conn is set up for reading and added to conns.  Otherwise
 * the child is reaped and the reason for the failure is logged.
 *
 * Always returns OOP_CONTINUE.
 */
1236 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1239 assert(fd == connecting_fdpass_sock);
1241 PREP_DECL_MSG_CMSG(msg);
1243 ssize_t rs= recvmsg(fd, &msg, 0);
/* nothing to read yet: go back to the event loop and wait */
1245 if (isewouldblock(errno)) return OOP_CONTINUE;
1246 syswarn("failed to read socket from connecting child");
1251 LIST_INIT(conn->waiting);
1252 LIST_INIT(conn->priority);
1253 LIST_INIT(conn->sent);
/* h stays null if recvmsg failed or delivered no control message */
1255 struct cmsghdr *h= 0;
1256 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1258 int status= xwaitpid(&connecting_child, "connect child (broken)");
/* Exit statuses other than 0 and the two CONNCHILD_ESTATUS_* values
 * need no further message here; those three would normally have come
 * with a descriptor, so they are reported as anomalies. */
1260 if (WIFEXITED(status)) {
1261 if (WEXITSTATUS(status) != 0 &&
1262 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1263 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1264 /* child already reported the problem */;
1266 if (e == OOP_EXCEPTION)
1267 warn("connect: connection child exited code %d but"
1268 " unexpected exception on fdpass socket",
1269 WEXITSTATUS(status));
1271 warn("connect: connection child exited code %d but"
1273 WEXITSTATUS(status), (int)rs);
/* the child arms alarm(connection_setup_timeout) before connecting,
 * so death by SIGALRM means the attempt took too long */
1275 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1276 warn("connect: connection attempt timed out");
1278 report_child_status("connect", status);
/* CHK dies unless the named cmsghdr field has the expected value */
1283 #define CHK(field, val) \
1284 if (h->cmsg_##field != val) { \
1285 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1286 h->cmsg_##field, val); \
1289 CHK(level, SOL_SOCKET);
1290 CHK(type, SCM_RIGHTS);
1291 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1294 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
/* extract the passed descriptor from the control message payload */
1296 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
/* plain blocking wait (no kill) for the child to exit */
1299 pid_t got= waitpid(connecting_child, &status, 0);
1300 if (got==-1) sysdie("connect: real wait for child");
1301 assert(got == connecting_child);
1302 connecting_child= 0;
1304 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
/* the child's exit status tells us whether the peer does streaming */
1305 int es= WEXITSTATUS(status);
1307 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1308 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1310 fatal("connect: child gave unexpected exit status %d", es);
/* a non-streaming connection is limited to a queue of one */
1314 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1316 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1317 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
/* NOTE(review): the call above assigns conn->rd but the check below
 * tests conn->fd, so a null reader from oop_rd_new_fd would not be
 * caught here; this looks like it was meant to be !conn->rd. */
1318 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1319 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1321 &peer_rd_err, conn);
/* NOTE(review): the control connection code copies oop_rd_read's
 * return value into errno before logging; here r is not copied, so
 * the errno text logged by sysdie may be unrelated - confirm. */
1322 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1324 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1325 LIST_ADDHEAD(conns, conn);
/* success: tidy up the connection attempt and hand out work */
1327 connect_attempt_discard();
1328 check_assign_articles();
1329 return OOP_CONTINUE;
1333 connect_attempt_discard();
1334 return OOP_CONTINUE;
/* Predicate: may a new connection attempt be started right now?
 * The conditions visible here require that the number of existing
 * connections is below max_connections and that no connecting child
 * is currently outstanding. */
1337 static int allow_connect_start(void) {
1338 return conns.count < max_connections
1339 && !connecting_child
/* Begin an asynchronous connection attempt to the peer.
 *
 * Must only be called when no attempt is in progress (asserted).
 * Creates an AF_UNIX socketpair and forks a child via xfork.
 *
 * Child: closes socks[0], arms alarm(connection_setup_timeout),
 * connects with NNTPconnect, authenticates with NNTPsendpassword and
 * sends "MODE STREAM".  exitstatus starts as CONNCHILD_ESTATUS_NOSTREAM
 * and is switched to CONNCHILD_ESTATUS_STREAM on one response path.
 * The connected fd (fileno(cn_from)) is then passed to the parent as an
 * SCM_RIGHTS control message sent on socks[1]; sendmsg must return 1.
 *
 * Parent: closes socks[1], records socks[0] as connecting_fdpass_sock,
 * makes it nonblocking and registers connchild_event for it.
 *
 * If the socketpair cannot be created, a warning is logged and the
 * function returns without starting an attempt. */
1343 static void connect_start(void) {
1344 assert(!connecting_child);
1345 assert(!connecting_fdpass_sock);
1347 info("starting connection attempt");
1350 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1351 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1353 connecting_child= xfork("connection");
1355 if (!connecting_child) {
1356 FILE *cn_from, *cn_to;
1357 char buf[NNTP_STRLEN+100];
1358 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1360 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1362 alarm(connection_setup_timeout);
1363 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1367 unsigned char c= buf[l-1];
1368 if (!isspace(c)) break;
1369 if (c=='\n' || c=='\r') stripped=1;
1373 sysfatal("connect: connection attempt failed");
1376 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1380 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1381 sysfatal("connect: authentication failed");
1383 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1385 sysfatal("connect: could not send MODE STREAM");
1386 buf[sizeof(buf)-1]= 0;
1387 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1388 if (ferror(cn_from))
1389 sysfatal("connect: could not read response to MODE STREAM");
1391 fatal("connect: connection close in response to MODE STREAM");
1396 fatal("connect: response to MODE STREAM is too long: %.100s...",
1398 l--; if (l>0 && buf[l-1]=='\r') l--;
1401 int rcode= strtoul(buf,&ep,10);
1403 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1407 exitstatus= CONNCHILD_ESTATUS_STREAM;
1413 warn("connect: unexpected response to MODE STREAM: %.50s",
1419 int fd= fileno(cn_from);
1421 PREP_DECL_MSG_CMSG(msg);
1422 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1423 cmsg->cmsg_level= SOL_SOCKET;
1424 cmsg->cmsg_type= SCM_RIGHTS;
1425 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1426 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1428 msg.msg_controllen= cmsg->cmsg_len;
1429 r= sendmsg(socks[1], &msg, 0);
1430 if (r<0) sysdie("sendmsg failed for new connection");
1431 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1436 xclose(socks[1], "connecting fdpass child's socket",0);
1437 connecting_fdpass_sock= socks[0];
1438 xsetnonblock(connecting_fdpass_sock, 1);
1439 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1442 /*---------- assigning articles to conns, and transmitting ----------*/
/* Fetch the article at the head of ipf's queue.
 * peek != 0: return the head without removing it (LIST_HEAD).
 * peek == 0: remove the head from the queue and return it (LIST_REMHEAD). */
1444 static Article *dequeue_from(int peek, InputFile *ipf) {
1446 if (peek) return LIST_HEAD(ipf->queue);
1447 else return LIST_REMHEAD(ipf->queue);
/* Fetch the next article to offer, consulting the input files in a
 * fixed order: flushing file first, then backlog, then the main feed
 * file.  The first article found is returned; peek is passed straight
 * through to dequeue_from. */
1450 static Article *dequeue(int peek) {
1452 art= dequeue_from(peek, flushing_input_file); if (art) return art;
1453 art= dequeue_from(peek, backlog_input_file); if (art) return art;
1454 art= dequeue_from(peek, main_input_file); if (art) return art;
/* Hand queued articles to connections.
 *
 * For each connection that is not quitting, the number of articles it
 * already holds (sent + priority + waiting) and its spare capacity
 * relative to its max_queue are computed.  An idle connection is
 * remembered only if no candidate has been chosen yet; a busy
 * connection with spare room is chosen immediately.  The chosen
 * connection has its since_activity reset if it was idle, receives the
 * article returned by dequeue(0) at the tail of its waiting list, and
 * conn_maybe_write is called on it.
 *
 * In the alternative branch, if allow_connect_start() permits,
 * until_connect is set to reconnect_delay_periods. */
1458 static void check_assign_articles(void) {
1464 int spare=0, inqueue=0;
1466 /* Find a connection to offer this article. We prefer a busy
1467 * connection to an idle one, provided it's not full. We take the
1468 * first (oldest) and since that's stable, it will mean we fill up
1469 * connections in order. That way if we have too many
1470 * connections, the spare ones will go away eventually.
1473 if (walk->quitting) continue;
1474 inqueue= walk->sent.count + walk->priority.count
1475 + walk->waiting.count;
1476 spare= walk->max_queue - inqueue;
1477 assert(inqueue <= max_queue_per_conn);
1479 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1480 else if (spare>0) /*working*/ { use= walk; break; }
1483 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1485 Article *art= dequeue(0);
1487 LIST_ADDTAIL(use->waiting, art);
1490 conn_maybe_write(use);
1491 } else if (allow_connect_start()) {
1492 until_connect= reconnect_delay_periods;
/* Event-loop callback with the oop fd-handler signature; u is passed
 * on as the Conn.  Simply retries writing via conn_maybe_write and
 * tells the event loop to carry on. */
1501 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1502 conn_maybe_write(u);
1503 return OOP_CONTINUE;
/* Generate pending transmissions for conn and try to write them.
 *
 * Calls conn_make_some_xmits to queue data, then conn_write_some_xmits
 * to push it out.  If that returns OOP_CONTINUE (more to write but the
 * fd is not writeable) conn_writeable is registered for OOP_WRITE on
 * the fd; OOP_HALT has its own branch; otherwise everything was
 * transmitted.  An OOP_WRITE registration on the fd is also cancelled
 * within this function. */
1506 static void conn_maybe_write(Conn *conn) {
1508 conn_make_some_xmits(conn);
1510 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1514 void *rp= conn_write_some_xmits(conn);
1515 if (rp==OOP_CONTINUE) {
1516 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1518 } else if (rp==OOP_HALT) {
1521 /* transmitted everything */
/* Check whether a queued article still exists in the article store.
 *
 * art must be on its input file's queue, not on a connection.
 * SMretrieve(..., RETR_STAT) is used as an existence probe: if it
 * succeeds the handle is freed and 0 is returned.  Otherwise the
 * article is removed from its input file's queue, counted as
 * RC_missing under art_Unchecked, and finished with article_done(art,-1). */
1528 static int article_check_expired(Article *art /* must be queued, not conn */) {
1529 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1530 if (artdata) { SMfreearticle(artdata); return 0; }
1532 LIST_REMOVE(art->ipf->queue, art);
1534 art->ipf->counts[art_Unchecked][RC_missing]++;
1535 article_done(art,-1);
/* Expiry check for the queue of one input file: takes the article at
 * the head of ipf's queue and runs article_check_expired on it. */
1539 static void inputfile_queue_check_expired(InputFile *ipf) {
1543 Article *art= LIST_HEAD(ipf->queue);
1544 int exp= article_check_expired(art);
1549 /*========== article transmission ==========*/
/* Claim the next transmit slot on conn.
 *
 * Uses entry conn->xmitu of the parallel arrays conn->xmit (iovecs)
 * and conn->xmitd (details), post-incrementing xmitu, and points the
 * iovec at data.  data is not copied.  As the signature comment says,
 * the caller must then fill in the XmitDetails. */
1551 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1552 XmitKind kind) { /* caller must then fill in details */
1553 struct iovec *v= &conn->xmit[conn->xmitu];
1554 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1555 v->iov_base= (char*)data;
/* Queue len bytes at data for transmission on conn as an xk_Const
 * entry (xmit_free does nothing for that kind). */
1561 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1562 xmit_core(conn,data,len, xk_Const);
/* Queue a string literal on the variable named `conn` in the calling
 * scope; sizeof(lit)-1 excludes the terminating nul. */
1564 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
/* Queue the contents of an article handle (ah->data, ah->len) for
 * transmission as an xk_Artdata entry.  Per the signature comment the
 * handle is consumed by this call. */
1566 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1567 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
/* Release whatever a transmit entry owns: article data entries free
 * their storage-manager handle, constant entries own nothing. */
1571 static void xmit_free(XmitDetails *d) {
1573 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1574 case xk_Const: break;
/* Try to write the queued iovecs of conn to its fd with writev.
 *
 * At most IOV_MAX entries are submitted per call.  Nothing queued
 * yields 0.  A would-block error yields OOP_CONTINUE; any other write
 * error fails the connection via connfail.  After a successful write
 * the entries are walked to account for the rs bytes written (a
 * partially written entry has its iov_base advanced), and the
 * remaining entries of both conn->xmit and conn->xmitd are moved to
 * the front of their arrays. */
1579 static void *conn_write_some_xmits(Conn *conn) {
1581 * 0: nothing more to write, no need to call us again
1582 * OOP_CONTINUE: more to write but fd not writeable
1583 * OOP_HALT: disaster, have destroyed conn
1586 int count= conn->xmitu;
1587 if (!count) return 0;
1589 if (count > IOV_MAX) count= IOV_MAX;
1590 ssize_t rs= writev(conn->fd, conn->xmit, count);
1592 if (isewouldblock(errno)) return OOP_CONTINUE;
1593 connfail(conn, "write failed: %s", strerror(errno));
1599 for (done=0; rs && done<conn->xmitu; done++) {
1600 struct iovec *vp= &conn->xmit[done];
1601 XmitDetails *dp= &conn->xmitd[done];
1602 if (rs > vp->iov_len) {
1606 vp->iov_base= (char*)vp->iov_base + rs;
1610 int newu= conn->xmitu - done;
1611 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1612 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Convert articles queued on conn into queued transmissions.
 *
 * Work is limited by the number of free iovec slots (the guard is
 * xmitu+5 > CONNIOVS).  Articles are taken from conn->priority first
 * and from conn->waiting when priority is empty.
 *
 * If the article's state is at least art_Wanted, or the connection is
 * streaming and nocheck mode is on, the article itself is sent: its
 * data is fetched with SMretrieve(RETR_ALL), a failed fetch marks it
 * missing, and the per-state RC_sent or RC_missing counter is bumped.
 * Visible output includes "TAKETHIS <mid>\r\n" followed by the body,
 * and the body followed by ".\r\n".
 *
 * Otherwise only an offer is sent: "CHECK " or "IHAVE " followed by
 * the message-id and "\r\n"; the article must be art_Unchecked, is
 * counted as RC_sent and appended to conn->sent. */
1617 static void conn_make_some_xmits(Conn *conn) {
1619 if (conn->xmitu+5 > CONNIOVS)
1622 Article *art= LIST_REMHEAD(conn->priority);
1623 if (!art) art= LIST_REMHEAD(conn->waiting);
1626 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1627 /* actually send it */
1629 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1632 art->state == art_Unchecked ? art_Unsolicited :
1633 art->state == art_Wanted ? art_Wanted :
1636 if (!artdata) art->missing= 1;
1637 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1641 XMIT_LITERAL("TAKETHIS ");
1642 xmit_noalloc(conn, art->messageid, art->midlen);
1643 XMIT_LITERAL("\r\n");
1644 xmit_artbody(conn, artdata);
1646 article_done(art, -1);
1650 /* we got 235 from IHAVE */
1652 xmit_artbody(conn, artdata);
1654 XMIT_LITERAL(".\r\n");
1658 LIST_ADDTAIL(conn->sent, art);
1664 XMIT_LITERAL("CHECK ");
1666 XMIT_LITERAL("IHAVE ");
1667 xmit_noalloc(conn, art->messageid, art->midlen);
1668 XMIT_LITERAL("\r\n");
1670 assert(art->state == art_Unchecked);
1671 art->ipf->counts[art->state][RC_sent]++;
1672 LIST_ADDTAIL(conn->sent, art);
1678 /*========== handling responses from peer ==========*/
/* oop_rd read style for responses from the peer: records are delimited
 * by '\n' with the delimiter stripped, and short records are forbidden. */
1680 static const oop_rd_style peer_rd_style= {
1681 OOP_RD_DELIM_STRIP, '\n',
1683 OOP_RD_SHORTREC_FORBID
/* oop_rd error callback for a peer connection (conn_v is the Conn):
 * fails the connection, quoting errmsg, and lets the event loop
 * continue. */
1686 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1687 const char *errmsg, int errnoval,
1688 const char *data, size_t recsz, void *conn_v) {
1690 connfail(conn, "error receiving from peer: %s", errmsg);
1691 return OOP_CONTINUE;
/* Match a peer response against the oldest outstanding article on
 * conn (the head of conn->sent) and sanity-check it.
 *
 * The connection is failed with connfail when:
 *  - there is no outstanding command;
 *  - a streaming response code arrives on a non-streaming connection;
 *  - the message-id following the 3-digit code and space is not of the
 *    form <...>, or differs from the article we offered;
 *  - a non-streaming code arrives in reply to CHECK/TAKETHIS;
 *  - the must_have_sent expectation (1:yes, -1:no, 0:dontcare) is
 *    contradicted by whether the article's state is >= art_Wanted.
 *
 * When all checks pass the article is removed from the head of
 * conn->sent (asserted to be the same article).  peer_rd_ok's
 * GET_ARTICLE macro treats a null return as "the conn has been failed".
 * sanitised_response is used only in log messages. */
1694 static Article *article_reply_check(Conn *conn, const char *response,
1695 int code_indicates_streaming,
1697 /* 1:yes, -1:no, 0:dontcare */,
1698 const char *sanitised_response) {
1699 Article *art= LIST_HEAD(conn->sent);
1703 "peer gave unexpected response when no commands outstanding: %s",
1704 sanitised_response);
1708 if (code_indicates_streaming) {
1709 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1710 if (!conn->stream) {
1711 connfail(conn, "peer gave streaming response code "
1712 " to IHAVE or subsequent body: %s", sanitised_response);
1715 const char *got_mid= response+4;
1716 int got_midlen= strcspn(got_mid, " \n\r");
1717 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1718 connfail(conn, "peer gave streaming response with syntactically invalid"
1719 " messageid: %s", sanitised_response);
1722 if (got_midlen != art->midlen ||
1723 memcmp(got_mid, art->messageid, got_midlen)) {
1724 connfail(conn, "peer gave streaming response code to wrong article -"
1725 " probable synchronisation problem; we offered: %s;"
1727 art->messageid, sanitised_response);
1732 connfail(conn, "peer gave non-streaming response code to"
1733 " CHECK/TAKETHIS: %s", sanitised_response);
1738 if (must_have_sent>0 && art->state < art_Wanted) {
1739 connfail(conn, "peer says article accepted but"
1740 " we had not sent the body: %s", sanitised_response);
1743 if (must_have_sent<0 && art->state >= art_Wanted) {
1744 connfail(conn, "peer says please sent the article but we just did: %s",
1745 sanitised_response);
1749 Article *art_again= LIST_REMHEAD(conn->sent);
1750 assert(art_again == art);
/* Update the decaying estimate of how often the peer accepts articles
 * and derive nocheck mode from it.
 *
 * accept_proportion is an exponentially weighted average: the old
 * value is scaled by nocheck_decay and the new sample (accepted, 0 or
 * 1 as passed by article_done) contributes with weight
 * (1 - nocheck_decay).  nocheck mode is on whenever the estimate is at
 * least nocheck_thresh.  The first entry into nocheck mode is logged
 * at notice level; later transitions only at debug level. */
1754 static void update_nocheck(int accepted) {
1755 accept_proportion *= nocheck_decay;
1756 accept_proportion += accepted * (1.0 - nocheck_decay);
1757 int new_nocheck= accept_proportion >= nocheck_thresh;
1758 if (new_nocheck && !nocheck_reported) {
1759 notice("entering nocheck mode for the first time");
1760 nocheck_reported= 1;
1761 } else if (new_nocheck != nocheck) {
1762 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1764 nocheck= new_nocheck;
/* Finish with an article.
 *
 * Unless the article is flagged missing, the counter
 * counts[art->state][whichcount] of its input file is incremented.
 * RC_accepted and RC_unwanted results feed update_nocheck(1) and
 * update_nocheck(0) respectively.
 *
 * The article's entry in the input file is then overwritten with
 * spaces using pwrite at art->offset, in chunks of at most
 * sizeof(spaces)-1 bytes, retrying on EINTR and dying on any other
 * error.  Finally, if the input file has no articles in progress and
 * is not the main input file, queue_check_input_done is called.
 *
 * NOTE(review): several callers pass whichcount == -1; that is only
 * safe for the counter update above if art->missing is set in those
 * cases -- confirm at the call sites. */
1767 static void article_done(Article *art, int whichcount) {
1768 if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1770 if (whichcount == RC_accepted) update_nocheck(1);
1771 else if (whichcount == RC_unwanted) update_nocheck(0);
1773 InputFile *ipf= art->ipf;
1775 while (art->blanklen) {
1776 static const char spaces[]=
1786 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1787 int r= pwrite(ipf->fd, spaces, w, art->offset);
1789 if (errno==EINTR) continue;
1790 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1791 art->messageid, art->blanklen,
1792 (unsigned long)art->offset, ipf->path);
1794 assert(r>=0 && r<=w);
1800 assert(ipf->inprogress >= 0);
1803 if (!ipf->inprogress && ipf != main_input_file)
1804 queue_check_input_done();
/* oop_rd callback for each response line received from the peer
 * (conn_v is the Conn).
 *
 * EOF fails the connection.  Otherwise the line must begin with a
 * three-digit code not starting with '0', followed by a space; anything
 * else fails the connection as badly formatted.
 *
 * While the connection is quitting, only 205 and 503 are acceptable
 * and the connection is removed from conns.  Otherwise since_activity
 * is reset and the code is dispatched:
 *   400 / unknown            fail the connection (PEERBADMSG)
 *   435, 438                 article unwanted
 *   235, 239                 article accepted (body must have been sent)
 *   437, 439                 article rejected
 *   238, 335                 peer wants the article: mark it art_Wanted
 *                            and put it on conn->priority
 *   431, 436                 try later: record token and message-id in
 *                            the defer file, then article_done(RC_deferred)
 * A further branch logs "idle connection closed by peer" and removes
 * the connection, unless the connection was busy ("peer timed us out").
 *
 * The local macros GET_ARTICLE / ARTICLE_DEALTWITH wrap
 * article_reply_check and return early if it failed the connection.
 * After a handled response more output is attempted with
 * conn_maybe_write and check_assign_articles. */
1807 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1808 const char *errmsg, int errnoval,
1809 const char *data, size_t recsz, void *conn_v) {
1812 if (ev == OOP_RD_EOF) {
1813 connfail(conn, "unexpected EOF from peer");
1814 return OOP_CONTINUE;
1816 assert(ev == OOP_RD_OK);
1818 char *sani= sanitise(data,-1);
1821 unsigned long code= strtoul(data, &ep, 10);
1822 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1823 connfail(conn, "badly formatted response from peer: %s", sani);
1824 return OOP_CONTINUE;
1828 conn->waiting.count ||
1829 conn->priority.count ||
1833 if (conn->quitting) {
1834 if (code!=205 && code!=503) {
1835 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1837 notice("C%d idle connection closed by us", conn->fd);
1839 LIST_REMOVE(conns,conn);
1842 return OOP_CONTINUE;
1845 conn->since_activity= 0;
1848 #define GET_ARTICLE(musthavesent) do{ \
1849 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1850 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
1853 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1854 code_streaming= (streaming); \
1855 GET_ARTICLE(musthavesent); \
1856 article_done(art, RC_##how); \
1860 #define PEERBADMSG(m) do { \
1861 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1864 int code_streaming= 0;
1868 case 400: PEERBADMSG("peer stopped accepting articles");
1869 default: PEERBADMSG("peer sent unexpected message");
1872 if (conn_busy) PEERBADMSG("peer timed us out");
1873 notice("C%d idle connection closed by peer", conn->fd);
1874 LIST_REMOVE(conns,conn);
1876 return OOP_CONTINUE;
1878 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1879 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1881 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1882 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1884 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1885 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1887 case 238: /* CHECK says send it */
1889 case 335: /* IHAVE says send it */
1891 assert(art->state == art_Unchecked);
1892 art->ipf->counts[art->state][RC_accepted]++;
1893 art->state= art_Wanted;
1894 LIST_ADDTAIL(conn->priority, art);
1897 case 431: /* CHECK or TAKETHIS says try later */
1899 case 436: /* IHAVE says try later */
1902 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1904 sysfatal("write to defer file %s",path_defer);
1905 article_done(art, RC_deferred);
1911 conn_maybe_write(conn);
1912 check_assign_articles();
1913 return OOP_CONTINUE;
1917 /*========== monitoring of input files ==========*/
/* Handle end-of-file on an input file other than the main one
 * (asserted; the main file is waited on rather than reaching EOF).
 *
 * Reading of ipf is stopped.  For the flushing file (only legal in
 * SEPARATED or DROPPING state) reading of the main input file, if
 * there is one, is started and statemc_check_flushing_done is run.
 * For the backlog file statemc_check_backlog_done is run.  Any other
 * file is a programming error and aborts. */
1919 static void feedfile_eof(InputFile *ipf) {
1920 assert(ipf != main_input_file); /* promised by tailing_try_read */
1921 inputfile_reading_stop(ipf);
1923 if (ipf == flushing_input_file) {
1924 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1925 if (main_input_file) inputfile_reading_start(main_input_file);
1926 statemc_check_flushing_done();
1927 } else if (ipf == backlog_input_file) {
1928 statemc_check_backlog_done();
1930 abort(); /* supposed to wait rather than get EOF on main input file */
/* Open path read-write and wrap it in a freshly allocated InputFile.
 *
 * Returns 0 if the file does not exist (ENOENT); any other open
 * failure is fatal.  The InputFile is allocated with room for the
 * path string after the structure, zero-initialised, given a copy of
 * path and an empty article queue. */
1934 static InputFile *open_input_file(const char *path) {
1935 int fd= open(path, O_RDWR);
1937 if (errno==ENOENT) return 0;
1938 sysfatal("unable to open input file %s", path);
1942 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1943 memset(ipf,0,sizeof(*ipf));
1946 strcpy(ipf->path, path);
1947 LIST_INIT(ipf->queue);
/* Close the file descriptor of an input file.  The InputFile itself is
 * NOT freed; that is the caller's job.
 *
 * Preconditions (asserted): no readable callback registered, no file
 * monitor active, no oop reader attached, and no articles still in
 * progress that would point back at this structure. */
1952 static void close_input_file(InputFile *ipf) { /* does not free */
1953 assert(!ipf->readable_callback); /* must have had ->on_cancel */
1954 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1955 assert(!ipf->rd); /* must have had inputfile_reading_stop */
1956 assert(!ipf->inprogress); /* no dangling pointers pointing here */
1957 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1961 /*---------- dealing with articles read in the input file ----------*/
/* Record and report a corrupt record in an input file.
 *
 * Logs a warning naming the file, the offset, the problem (how) and a
 * sanitised copy of the data, and increments ipf->readcount_err.  The
 * process dies once the error count exceeds max_bad_data_initial plus
 * (good records + blank records) / max_bad_data_ratio.
 *
 * Always returns OOP_CONTINUE so that callers can `return` its result
 * directly from an oop callback. */
1963 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1964 const char *data, const char *how) {
1965 warn("corrupted file: %s, offset %lu: %s: in %s",
1966 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1967 ipf->readcount_err++;
1968 if (ipf->readcount_err > max_bad_data_initial +
1969 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1970 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
1971 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1972 return OOP_CONTINUE;
/* oop_rd error callback for an input file.  Only system errors are
 * expected (asserted); the process dies reporting the file path and
 * the current read offset. */
1975 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1976 oop_rd_event ev, const char *errmsg,
1977 int errnoval, const char *data, size_t recsz,
1979 InputFile *ipf= ipf_v;
1980 assert(ev == OOP_RD_SYSTEM);
1982 sysdie("error reading input file: %s, offset %lu",
1983 ipf->path, (unsigned long)ipf->offset);
/* oop_rd callback invoked for every record (line) read from an input
 * file.
 *
 * A null data pointer means end of file and is passed to feedfile_eof.
 * Otherwise ipf->offset is advanced past the record (plus one for the
 * delimiter when the record was complete) and the record is validated:
 *  - a record without its final newline is reported but still processed;
 *  - overly long lines are reported once and their continuation
 *    fragments skipped until a complete record is seen again;
 *  - nul bytes and empty lines are bad data;
 *  - a line consisting solely of spaces is a blanked-out (already
 *    processed) entry and is only counted;
 *  - otherwise the line must be "<token> <message-id>": exactly one
 *    textual TOKEN, a space, and a message-id enclosed in <...>.
 *
 * A valid record becomes a new Article (state art_Unchecked) appended
 * to the input file's queue.  Articles from the backlog file are
 * checked for expiry straight away.  When the main feed file has grown
 * to target_max_feedfile_size in NORMAL state a flush is started.
 * Finally check_assign_articles is run.
 *
 * Always returns OOP_CONTINUE. */
1986 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1987 oop_rd_event ev, const char *errmsg,
1988 int errnoval, const char *data, size_t recsz,
1990 InputFile *ipf= ipf_v;
1992 char tokentextbuf[sizeof(TOKEN)*2+3];
1994 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1996 off_t old_offset= ipf->offset;
1997 ipf->offset += recsz + !!(ev == OOP_RD_OK);
1999 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2001 if (ev==OOP_RD_PARTREC)
2002 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2003 /* but process it anyway */
2005 if (ipf->skippinglong) {
2006 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2007 return OOP_CONTINUE;
2009 if (ev==OOP_RD_LONG) {
2010 ipf->skippinglong= 1;
2011 X_BAD_DATA("overly long line");
2014 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2015 if (!recsz) X_BAD_DATA("empty line");
2018 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2019 ipf->readcount_blank++;
2020 return OOP_CONTINUE;
2023 char *space= strchr(data,' ');
/* A record with no space at all has no token/message-id separator.
 * Reject it here: the pointer arithmetic and space[...] accesses
 * below would otherwise operate on a null pointer. */
if (!space) X_BAD_DATA("no space between token and messageid");
2024 int tokenlen= space-data;
2025 int midlen= (int)recsz-tokenlen-1;
2026 if (midlen <= 2) X_BAD_DATA("no room for messageid");
2027 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2029 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2030 memcpy(tokentextbuf, data, tokenlen);
2031 tokentextbuf[tokenlen]= 0;
2032 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2034 ipf->readcount_ok++;
2036 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2037 memset(art,0,sizeof(*art));
2038 art->state= art_Unchecked;
2039 art->midlen= midlen;
2040 art->ipf= ipf; ipf->inprogress++;
2041 art->token= TextToToken(tokentextbuf);
2042 art->offset= old_offset;
2043 art->blanklen= recsz;
2044 strcpy(art->messageid, space+1);
2045 LIST_ADDTAIL(ipf->queue, art);
2047 if (ipf==backlog_input_file)
2048 article_check_expired(art);
2050 if (sms==sm_NORMAL && ipf==main_input_file &&
2051 ipf->offset >= target_max_feedfile_size)
2052 statemc_start_flush("feed file size");
2054 check_assign_articles();
2055 return OOP_CONTINUE;
2058 /*========== tailing input file ==========*/
/* Timer callback used to deliver a "readable" notification for a
 * tailed input file: forwards to the callback stored in the InputFile,
 * passing its oop_readable and the stored user pointer, and returns
 * whatever that callback returns. */
2060 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2062 InputFile *ipf= user;
2063 return ipf->readable_callback(loop, &ipf->readable,
2064 ipf->readable_callback_user);
/* oop_readable on_cancel method for a tailed input file.  The
 * oop_readable pointer is cast back to its InputFile.  Stops file
 * monitoring if active, cancels any pending immediate
 * tailing_rable_call_time timer, and forgets the registered callback. */
2067 static void tailing_on_cancel(struct oop_readable *rable) {
2068 InputFile *ipf= (void*)rable;
2070 if (ipf->filemon) filemon_stop(ipf);
2071 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2072 ipf->readable_callback= 0;
/* Arrange for the input file's readable callback to be invoked from
 * the event loop as soon as possible (timer at OOP_TIME_NOW). */
2075 static void tailing_queue_readable(InputFile *ipf) {
2076 /* lifetime of ipf here is OK because destruction will cause
2077 * on_cancel which will cancel this callback */
2078 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
/* oop_readable on_readable method for a tailed input file.
 * Any previous registration is cancelled first; then the new callback
 * and its user pointer are stored and an immediate readable
 * notification is queued. */
2081 static int tailing_on_readable(struct oop_readable *rable,
2082 oop_readable_call *cb, void *user) {
2083 InputFile *ipf= (void*)rable;
2085 tailing_on_cancel(rable);
2086 ipf->readable_callback= cb;
2087 ipf->readable_callback_user= user;
2090 tailing_queue_readable(ipf);
/* oop_readable try_read method for a tailed input file.
 * Reads from the file's fd with read(), retrying on EINTR.  The
 * handling that follows depends on which input file this is (main,
 * flushing -- only legal in SEPARATED/DROPPING state -- or backlog).
 * On one path another readable notification is queued. */
2094 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2096 InputFile *ipf= (void*)rable;
2098 ssize_t r= read(ipf->fd, buffer, length);
2100 if (errno==EINTR) continue;
2104 if (ipf==main_input_file) {
2107 } else if (ipf==flushing_input_file) {
2109 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2110 } else if (ipf==backlog_input_file) {
2116 tailing_queue_readable(ipf);
2121 /*---------- filemon implemented with inotify ----------*/
/* The inotify implementation is selected when <sys/inotify.h> is
 * available and no other filemon method has been chosen yet; defining
 * HAVE_FILEMON prevents the dummy implementation from being compiled. */
2123 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2124 #define HAVE_FILEMON
2126 #include <sys/inotify.h>
/* Module state: the inotify descriptor, and a table indexed by watch
 * descriptor giving the InputFile being watched (size
 * filemon_inotify_wdmax, unused slots are null). */
2128 static int filemon_inotify_fd;
2129 static int filemon_inotify_wdmax;
2130 static InputFile **filemon_inotify_wd2ipf;
2132 struct Filemon_Perfile {
/* Start watching ipf->path for modification (IN_MODIFY) with inotify.
 * Failure to add the watch is fatal.  The wd -> InputFile table is
 * grown with xrealloc when the new watch descriptor does not fit, the
 * added slots being zero-filled; the slot for this wd must be free
 * (asserted) and is set to ipf. */
2136 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2137 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2138 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2140 if (wd >= filemon_inotify_wdmax) {
2142 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2143 sizeof(*filemon_inotify_wd2ipf) * newmax);
2144 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2145 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2146 filemon_inotify_wdmax= newmax;
2149 assert(!filemon_inotify_wd2ipf[wd]);
2150 filemon_inotify_wd2ipf[wd]= ipf;
2152 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2153 ipf, wd, filemon_inotify_wdmax);
/* Stop watching an input file: removes its inotify watch (dying if
 * that fails) and clears its slot in the wd -> InputFile table. */
2158 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2160 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2161 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2162 if (r) sysdie("inotify_rm_watch");
2163 filemon_inotify_wd2ipf[wd]= 0;
/* Event-loop callback for readability of the inotify descriptor.
 * Reads one struct inotify_event at a time: a would-block error ends
 * the loop, other read errors and short reads are fatal.  The event's
 * watch descriptor is range-checked (asserted), mapped to its
 * InputFile through the wd table and reported via filemon_callback
 * (which tolerates a null InputFile). */
2166 static void *filemon_inotify_readable(oop_source *lp, int fd,
2167 oop_event e, void *u) {
2168 struct inotify_event iev;
2170 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2172 if (isewouldblock(errno)) break;
2173 sysdie("read from inotify master");
2174 } else if (r==sizeof(iev)) {
2175 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2177 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2179 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2180 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2181 filemon_callback(ipf);
2183 return OOP_CONTINUE;
/* Initialise the inotify file monitor: creates the inotify descriptor
 * (a failure is only warned about), makes it nonblocking and registers
 * filemon_inotify_readable for read events on it. */
2186 static int filemon_method_init(void) {
2187 filemon_inotify_fd= inotify_init();
2188 if (filemon_inotify_fd<0) {
2189 syswarn("filemon/inotify: inotify_init failed");
2192 xsetnonblock(filemon_inotify_fd, 1);
2193 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2195 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
/* Write the state of the inotify file monitor to f for debugging:
 * the method name, the inotify descriptor, the size of the watch
 * table, and every entry of the wd -> InputFile table. */
2199 static void filemon_method_dump_info(FILE *f) {
2201 fprintf(f,"inotify");
2202 DUMPV("%d",,filemon_inotify_fd);
2203 DUMPV("%d",,filemon_inotify_wdmax);
2204 for (i=0; i<filemon_inotify_wdmax; i++)
/* %p requires a void* argument; the stray trailing comma that used
 * to end the argument list was a syntax error. */
2205 fprintf(f," wd2ipf[%d]=%p\n", i, (void*)filemon_inotify_wd2ipf[i]);
2208 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2210 /*---------- filemon dummy implementation ----------*/
2212 #if !defined(HAVE_FILEMON)
/* Fallback used when no real file-monitoring method was compiled in.
 * The per-file structure is a placeholder, initialisation only logs a
 * warning, starting and stopping a file do nothing, and the debug
 * dump just prints "dummy". */
2214 struct Filemon_Perfile { int dummy; };
2216 static int filemon_method_init(void) {
2217 warn("filemon/dummy: no filemon method compiled in");
2220 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2221 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2222 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2224 #endif /* !HAVE_FILEMON */
2226 /*---------- filemon generic interface ----------*/
/* Begin monitoring an input file for changes.  The file must not
 * already be monitored (asserted); the method-specific startfile hook
 * is called with the file's ipf->filemon state. */
2228 static void filemon_start(InputFile *ipf) {
2229 assert(!ipf->filemon);
2232 filemon_method_startfile(ipf, ipf->filemon);
/* Stop monitoring an input file.  Safe to call when the file is not
 * being monitored, in which case nothing happens. */
2235 static void filemon_stop(InputFile *ipf) {
2236 if (!ipf->filemon) return;
2237 filemon_method_stopfile(ipf, ipf->filemon);
/* Called by a filemon method when a watched file may have changed.
 * Invokes the file's readable callback if one is registered; a null
 * ipf or a missing callback is silently ignored. */
2242 static void filemon_callback(InputFile *ipf) {
2243 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2244 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2247 /*---------- interface to start and stop an input file ----------*/
/* oop_rd read style for feed files: records are delimited by '\n'
 * with the delimiter stripped; the short-record policy is
 * OOP_RD_SHORTREC_LONG. */
2249 static const oop_rd_style feedfile_rdstyle= {
2250 OOP_RD_DELIM_STRIP, '\n',
2252 OOP_RD_SHORTREC_LONG,
/* Start reading records from an input file through liboop's oop_rd.
 *
 * Fills in the file's oop_readable method table with the tailing_*
 * implementations (no delete hooks are provided), clears any stored
 * callback, creates the reader with oop_rd_new and starts it with
 * feedfile_rdstyle and a maximum record length of MAX_LINE_FEEDFILE.
 * Records go to feedfile_got_article and errors to feedfile_read_err,
 * both with ipf as their user pointer.  Failure to start is fatal. */
2255 static void inputfile_reading_start(InputFile *ipf) {
2257 ipf->readable.on_readable= tailing_on_readable;
2258 ipf->readable.on_cancel= tailing_on_cancel;
2259 ipf->readable.try_read= tailing_try_read;
2260 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2261 ipf->readable.delete_kill= 0;
2263 ipf->readable_callback= 0;
2264 ipf->readable_callback_user= 0;
2266 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2269 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2270 feedfile_got_article,ipf, feedfile_read_err, ipf);
2271 if (r) sysdie("unable start reading feedfile %s",ipf->path);
/* Stop reading an input file: cancels and deletes its oop_rd reader.
 * Afterwards the file must no longer be monitored (asserted). */
2274 static void inputfile_reading_stop(InputFile *ipf) {
2276 oop_rd_cancel(ipf->rd);
2277 oop_rd_delete(ipf->rd);
2279 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2283 /*========== interaction with innd - state machine ==========*/
2285 /* See official state diagram at top of file. We implement
2296 |`---------------------------------------------------.
2298 |`---------------- - - - |
2299 D ENOENT | D EXISTS see OVERALL STATES diagram |
2300 | for full startup logic |
2303 | ============ try to |
2309 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2310 ^ | hardlink F to D |
2313 | | our handle onto F is now onto D |
2316 | |<-------------------<---------------------<---------+
2318 | | spawn inndcomm flush |
2320 | ================== |
2321 | FLUSHING[-ABSENT] |
2323 | main D tail/none |
2324 | ================== |
2326 | | INNDCOMM FLUSH FAILS ^
2327 | |`----------------------->----------. |
2329 | | NO SUCH SITE V |
2330 ^ |`--------------->----. ==================== |
2331 | | \ FLUSHFAILED[-ABSENT] |
2333 | | FLUSH OK \ main D tail/none |
2334 | | open F \ ==================== |
2336 | | \ | TIME TO RETRY |
2337 | |`------->----. ,---<---'\ `----------------'
2338 | | D NONE | | D NONE `----.
2340 | ============= V V ============
2341 | SEPARATED-1 | | DROPPING-1
2342 | flsh->rd!=0 | | flsh->rd!=0
2343 | [Separated] | | [Dropping]
2344 | main F idle | | main none
2345 | flsh D tail | | flsh D tail
2346 | ============= | | ============
2348 ^ | EOF ON D | | defer | EOF ON D
2350 | =============== | | ===============
2351 | SEPARATED-2 | | DROPPING-2
2352 | flsh->rd==0 | V flsh->rd==0
2353 | [Finishing] | | [Dropping]
2354 | main F tail | `. main none
2355 | flsh D closed | `. flsh D closed
2356 | =============== V `. ===============
2358 | | ALL D PROCESSED `. | ALL D PROCESSED
2359 | V install defer as backlog `. | install defer
2360 ^ | close D `. | close D
2361 | | unlink D `. | unlink D
2364 `----------' ==============
/* Startup helper: begin reading f as an input file.  There must be no
 * main input file yet (asserted).
 * NOTE(review): statemc_init reads main_input_file right after calling
 * this, so f is presumably installed as the main input file here --
 * confirm. */
2384 static void startup_set_input_file(InputFile *f) {
2385 assert(!main_input_file);
2387 inputfile_reading_start(f);
/* Acquire the duct lock (path_lock) at startup.
 *
 * The lockfile is opened (created if necessary) and locked with
 * fcntl(F_SETLK).  If the lock is held elsewhere the process exits
 * quietly when quiet_multiple is set, and otherwise fails with
 * "another duct holds the lockfile".  After locking, the open file and
 * the file currently named path_lock are compared with samefile; a
 * descriptor that turns out to be stale is closed.
 *
 * Once the lock is held the file is truncated and rewritten with our
 * pid, site name, feed file and remote host.  Any failure is fatal. */
2390 static void statemc_lock(void) {
2392 struct stat stab, stabf;
2395 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2396 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2399 memset(&fl,0,sizeof(fl));
2401 fl.l_whence= SEEK_SET;
2402 int r= fcntl(lockfd, F_SETLK, &fl);
2404 if (errno==EACCES || isewouldblock(errno)) {
2405 if (quiet_multiple) exit(0);
2406 fatal("another duct holds the lockfile");
2408 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2411 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2413 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2415 if (!lock_noent && samefile(&stab, &stabf))
2418 xclose(lockfd, "stale lockfile ", path_lock);
2421 FILE *lockfile= fdopen(lockfd, "w");
2422 if (!lockfile) sysdie("fdopen lockfile");
2424 int r= ftruncate(lockfd, 0);
2425 if (r) sysdie("truncate lockfile to write new info");
/* fprintf signals failure with any negative value, so test < 0 rather
 * than == EOF; the pid is cast to long to match the %ld conversion. */
2427 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2428 (long)self_pid,
2429 sitename, feedfile, remote_host) < 0 ||
2431 sysfatal("write info to lockfile %s", path_lock);
2433 debug("startup: locked");
/* Work out the initial state of the state machine from the files
 * found on disk at startup.
 *
 * First a backlog file is searched for and the defer file is examined:
 * if it exists, its hard-link count decides whether it is reopened
 * (open_defer), removed as a stale link, or treated as a fatal
 * inconsistency.
 * NOTE(review): the switch is on the boolean expression
 * (st_nlink==1), so only the values 0 and 1 can be selected -- check
 * that the case labels match that.
 *
 * Then the feed file F (feedfile) and flushing file D (path_flushing)
 * are compared:
 *  - F and D are the same file (Hardlinked): F is unlinked;
 *  - F does not exist (Moved): D, if present, becomes the input file
 *    and an inndcomm flush is spawned;
 *  - F and D both exist and differ (Separated): D becomes the flushing
 *    input file, F is opened as the main input file, state SEPARATED;
 *  - F exists and D does not (Normal): F becomes the main input file,
 *    state NORMAL with the spontaneous flush timer armed. */
2436 static void statemc_init(void) {
2437 struct stat stabdefer;
2439 search_backlog_file();
2442 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2444 debug("startup: ductdefer ENOENT");
2446 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2447 switch (stabdefer.st_nlink==1) {
2449 open_defer(); /* so that we will later close it and rename it */
2452 xunlink(path_defer, "stale defer file link"
2453 " (presumably hardlink to backlog file)");
/* st_nlink has type nlink_t, which need not be int: cast it and use
 * %ld, as the debug message above already does. */
2456 die("defer file %s has unexpected link count %ld",
2457 path_defer, (long)stabdefer.st_nlink);
2461 struct stat stab_f, stab_d;
2464 InputFile *file_d= open_input_file(path_flushing);
2465 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2467 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2469 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2470 debug("startup: F==D => Hardlinked");
2471 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2476 debug("startup: F ENOENT => Moved");
2477 if (file_d) startup_set_input_file(file_d);
2478 spawn_inndcomm_flush("feedfile missing at startup");
2479 /* => Flushing, sms:=FLUSHING */
2482 debug("startup: F!=D => Separated");
2483 startup_set_input_file(file_d);
2484 flushing_input_file= main_input_file;
2485 main_input_file= open_input_file(feedfile);
2486 if (!main_input_file) die("feedfile vanished during startup");
2487 SMS(SEPARATED, 0, "found both old and current feed files");
2489 debug("startup: F exists, D ENOENT => Normal");
2490 InputFile *file_f= open_input_file(feedfile);
2491 if (!file_f) die("feed file vanished during startup");
2492 startup_set_input_file(file_f);
2493 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Begin a flush from NORMAL state (asserted).
 *
 * The feed file is hard-linked to the flushing path and its original
 * name is then unlinked; failure to create the link is fatal.
 * Finally an inndcomm flush is spawned, with why as the reason. */
2498 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2499 assert(sms == sm_NORMAL);
2501 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2503 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2504 (unsigned long)target_max_feedfile_size,
2507 int r= link(feedfile, path_flushing);
2508 if (r) sysfatal("link feedfile %s to flushing file %s",
2509 feedfile, path_flushing);
2512 xunlink(feedfile, "old feedfile link");
2515 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Try to move towards FLUSHING from the current state.  Per the
 * signature comment the result is 1 if a flush was triggered and 0
 * otherwise.  The visible cases start a "periodic" flush via
 * statemc_start_flush, and, in FLUSHFAILED state, respawn the inndcomm
 * flush as a "retry". */
2518 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2521 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2523 case sm_FLUSHFAILED:
2524 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
/* Periodic state-machine tick driving the until_flush countdown.
 * Nothing happens while until_flush is zero on entry.  Once the
 * counter (which must never be negative) has reached zero,
 * trigger_flush_ok is attempted. */
2531 static void statemc_period_poll(void) {
2532 if (!until_flush) return;
2534 assert(until_flush>=0);
2536 if (until_flush) return;
2537 int ok= trigger_flush_ok();
/* Has this input file been completely dealt with?  It has not while
 * any of its articles are still in progress, or while its reader is
 * still attached (i.e. EOF has not been seen yet). */
2541 static int inputfile_is_done(InputFile *ipf) {
2543 if (ipf->inprogress) return 0; /* new article in the meantime */
2544 if (ipf->rd) return 0; /* not had EOF */
/* Log a one-line summary of what was done with an input file.
 *
 * ipf may be null, in which case nothing is logged.  completed selects
 * the wording ("completed" vs "processed"); when it is zero the number
 * of articles still in progress is included.  what and spec are
 * inserted verbatim to identify the file.
 *
 * The message reports the read counters (ok, blank, error), the number
 * of articles offered and accepted, each split into the checked and
 * nocheck paths, followed by the per-result counters expanded through
 * the RESULT_COUNTS macro. */
2548 static void notice_processed(InputFile *ipf, int completed,
2549 const char *what, const char *spec) {
2550 if (!ipf) return; /* allows preterminate to be lazy */
2552 #define RCI_NOTHING(x) /* nothing */
2553 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2554 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2556 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2558 char *inprog= completed
2559 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2560 : xasprintf(" inprogress=%ld", ipf->inprogress);
2562 info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2563 " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2564 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2566 completed?"completed":"processed", what, spec,
2567 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2568 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2569 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2570 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2571 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2572 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
/* If the current backlog file has been fully processed, report it,
 * close and remove it, and look for the next backlog file.
 *
 * Nothing happens unless inputfile_is_done() holds for the backlog
 * file.  For the log message the file is identified by the part of
 * its leaf name after the first '_' (the whole leaf if there is
 * none), with a leading "backlog" stripped.  A file that has already
 * vanished (ENOENT on unlink) only produces a warning; any other
 * unlink failure is fatal. */
2580 static void statemc_check_backlog_done(void) {
2581 InputFile *ipf= backlog_input_file;
2582 if (!inputfile_is_done(ipf)) return;
2584 const char *slash= strrchr(ipf->path, '/');
2585 const char *leaf= slash ? slash+1 : ipf->path;
/* Search from leaf, not slash: slash is null when the path contains no
 * '/', and passing a null pointer to strchr is undefined behaviour.
 * When slash is non-null the result is the same, since '/' != '_'. */
2586 const char *under= strchr(leaf, '_');
2587 const char *rest= under ? under+1 : leaf;
2588 if (!strncmp(rest,"backlog",7)) rest += 7;
2589 notice_processed(ipf,1,"backlog ",rest);
2591 close_input_file(ipf);
2592 if (unlink(ipf->path)) {
2593 if (errno != ENOENT)
2594 sysdie("could not unlink processed backlog file %s", ipf->path);
2595 warn("backlog file %s vanished while we were reading it"
2596 " so we couldn't remove it (but it's done now, anyway)",
2600 backlog_input_file= 0;
2601 search_backlog_file();
/* Retires the flushing input file once inputfile_is_done says it has
 * been fully processed: logs its statistics, removes path_flushing,
 * closes and frees the InputFile, and then moves the state machine on
 * (SEPARATED => NORMAL, or DROPPING => DROPPED). */
2605 static void statemc_check_flushing_done(void) {
2606 InputFile *ipf= flushing_input_file;
2607 if (!inputfile_is_done(ipf)) return;
/* a flushing file can only be finishing in one of these two states */
2609 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2611 notice_processed(ipf,1,"feedfile","");
2615 xunlink(path_flushing, "old flushing file");
2617 close_input_file(flushing_input_file);
2618 free(flushing_input_file);
2619 flushing_input_file= 0;
2621 if (sms==sm_SEPARATED) {
2622 notice("flush complete");
2623 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2624 } else if (sms==sm_DROPPING) {
2625 SMS(DROPPED, 0, "old flush complete");
/* once dropped, only backlog remains, so go and look for some now */
2626 search_backlog_file();
2627 notice("feed dropped, but will continue until backlog is finished");
2631 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2633 assert(!inputfile_is_done(main_input_file));
2634 statemc_check_flushing_done();
2635 statemc_check_backlog_done();
2636 return OOP_CONTINUE;
2639 static void queue_check_input_done(void) {
2640 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2643 static void statemc_setstate(StateMachineState newsms, int periods,
2644 const char *forlog, const char *why) {
2646 until_flush= periods;
2648 const char *xtra= "";
2651 case sm_FLUSHFAILED:
2652 if (!main_input_file) xtra= "-ABSENT";
2656 xtra= flushing_input_file->rd ? "-1" : "-2";
2662 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2664 info("state %s%s %s",forlog,xtra,why);
2668 /*---------- defer and backlog files ----------*/
2670 static void open_defer(void) {
2675 defer= fopen(path_defer, "a+");
2676 if (!defer) sysfatal("could not open defer file %s", path_defer);
2678 /* truncate away any half-written records */
2680 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2682 if (stab.st_size > LONG_MAX)
2683 die("defer file %s size is far too large", path_defer);
2688 long orgsize= stab.st_size;
2689 long truncto= stab.st_size;
2691 if (!truncto) break; /* was only (if anything) one half-truncated record */
2692 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2693 sysdie("seek in defer file %s while truncating partial", path_defer);
2698 sysdie("failed read from defer file %s", path_defer);
2700 die("defer file %s shrank while we were checking it!", path_defer);
2706 if (stab.st_size != truncto) {
2707 warn("truncating half-record at end of defer file %s -"
2708 " shrinking by %ld bytes from %ld to %ld",
2709 path_defer, orgsize - truncto, orgsize, truncto);
2712 sysfatal("could not flush defer file %s", path_defer);
2713 if (ftruncate(fileno(defer), truncto))
2714 sysdie("could not truncate defer file %s", path_defer);
2717 info("continuing existing defer file %s (%ld bytes)",
2718 path_defer, orgsize);
2720 if (fseek(defer, truncto, SEEK_SET))
2721 sysdie("could not seek to new end of defer file %s", path_defer);
/* Finishes off the current defer file and installs it as a backlog
 * file: the file is fstat'd (its inode number goes into the new name),
 * the stdio stream is closed, the file is hardlinked to a fresh
 * backlog name and the defer name is then removed.  Finally the
 * countdown to the next backlog scan is capped so that the new file
 * will be noticed. */
2724 static void close_defer(void) {
2729 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2731 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2734 time_t now= xtime();
/* NOTE(review): this produces <feedfile>_backlog_<n>.<inum>, whereas the
 * file-naming notes at the top of the file say _backlog.<date>.<inum>
 * - confirm which is intended. */
2736 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2738 (unsigned long)stab.st_ino);
/* link then unlink, so the data always has at least one name */
2739 if (link(path_defer, backlog))
2740 sysfatal("could not install defer file %s as backlog file %s",
2741 path_defer, backlog);
2742 if (unlink(path_defer))
2743 sysdie("could not unlink old defer link %s to backlog file %s",
2744 path_defer, backlog);
/* a negative countdown, or one longer than backlog_retry_minperiods + 1,
 * is replaced by backlog_retry_minperiods + 1 */
2748 if (until_backlog_nextscan < 0 ||
2749 until_backlog_nextscan > backlog_retry_minperiods + 1)
2750 until_backlog_nextscan= backlog_retry_minperiods + 1;
2753 static void poll_backlog_file(void) {
2754 if (until_backlog_nextscan < 0) return;
2755 if (until_backlog_nextscan-- > 0) return;
2756 search_backlog_file();
2759 static void search_backlog_file(void) {
2760 /* returns non-0 iff there are any backlog files */
2765 const char *oldest_path=0;
2766 time_t oldest_mtime=0, now;
2768 if (backlog_input_file) return;
2772 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2776 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2778 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2780 for (i=0; i<gl.gl_pathc; i++) {
2781 const char *path= gl.gl_pathv[i];
2783 if (strchr(path,'#') || strchr(path,'~')) {
2784 debug("backlog file search skipping %s", path);
2787 r= stat(path, &stab);
2789 syswarn("failed to stat backlog file %s", path);
2792 if (!S_ISREG(stab.st_mode)) {
2793 warn("backlog file %s is not a plain file (or link to one)", path);
2796 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2798 oldest_mtime= stab.st_mtime;
2801 case GLOB_NOMATCH: /* fall through */
2804 sysdie("glob expansion of backlog pattern %s gave unexpected"
2805 " nonzero (error?) return value %d", globpat_backlog, r);
2809 debug("backlog scan: none");
2811 if (sms==sm_DROPPED) {
2813 notice("feed dropped and our work is complete");
2815 int r= unlink(path_control);
2816 if (r && errno!=ENOENT)
2817 syswarn("failed to remove control symlink for old feed");
2819 xunlink(path_lock, "lockfile for old feed");
2822 until_backlog_nextscan= backlog_spontrescan_periods;
2827 double age= difftime(now, oldest_mtime);
2828 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2830 if (age_deficiency <= 0) {
2831 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2832 age, age_deficiency, oldest_path);
2834 backlog_input_file= open_input_file(oldest_path);
2835 if (!backlog_input_file) {
2836 warn("backlog file %s vanished as we opened it", oldest_path);
2840 inputfile_reading_start(backlog_input_file);
2841 until_backlog_nextscan= -1;
2845 until_backlog_nextscan= age_deficiency / period_seconds;
2847 if (backlog_spontrescan_periods >= 0 &&
2848 until_backlog_nextscan > backlog_spontrescan_periods)
2849 until_backlog_nextscan= backlog_spontrescan_periods;
2851 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2852 age, age_deficiency, until_backlog_nextscan, oldest_path);
2859 /*---------- shutdown and signal handling ----------*/
2861 static void preterminate(void) {
2862 if (in_child) return;
2863 notice_processed(main_input_file,0,"feedfile","");
2864 notice_processed(flushing_input_file,0,"flushing","");
2865 if (backlog_input_file)
2866 notice_processed(backlog_input_file,0, "backlog file ",
2867 backlog_input_file->path);
2870 static int signal_self_pipe[2];
2871 static sig_atomic_t terminate_sig_flag;
2873 static void raise_default(int signo) {
2874 xsigsetdefault(signo);
2879 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2880 assert(fd=signal_self_pipe[0]);
2882 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2883 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2884 if (r==0) die("eof on signal self pipe");
2885 if (terminate_sig_flag) {
2887 notice("terminating (%s)", strsignal(terminate_sig_flag));
2888 raise_default(terminate_sig_flag);
2890 return OOP_CONTINUE;
2893 static void sigarrived_handler(int signum) {
2898 if (!terminate_sig_flag) terminate_sig_flag= signum;
2903 write(signal_self_pipe[1],&x,1);
2906 static void init_signals(void) {
2907 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2908 sysdie("could not ignore SIGPIPE");
2910 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2912 xsetnonblock(signal_self_pipe[0],1);
2913 xsetnonblock(signal_self_pipe[1],1);
2915 struct sigaction sa;
2916 memset(&sa,0,sizeof(sa));
2917 sa.sa_handler= sigarrived_handler;
2918 sa.sa_flags= SA_RESTART;
2919 xsigaction(SIGTERM,&sa);
2920 xsigaction(SIGINT,&sa);
2922 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2925 /*========== flushing the feed ==========*/
2927 static pid_t inndcomm_child;
2928 static int inndcomm_sentinel_fd;
2930 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2931 assert(inndcomm_child);
2932 assert(fd == inndcomm_sentinel_fd);
2933 int status= xwaitpid(&inndcomm_child, "inndcomm");
2936 cancel_fd_read_except(fd);
2937 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2938 inndcomm_sentinel_fd= 0;
2940 assert(!flushing_input_file);
2942 if (WIFEXITED(status)) {
2943 switch (WEXITSTATUS(status)) {
2945 case INNDCOMMCHILD_ESTATUS_FAIL:
2948 case INNDCOMMCHILD_ESTATUS_NONESUCH:
2949 notice("feed has been dropped by innd, finishing up");
2950 flushing_input_file= main_input_file;
2951 tailing_queue_readable(flushing_input_file);
2952 /* we probably previously returned EAGAIN from our fake read method
2953 * when in fact we were at EOF, so signal another readable event
2954 * so we actually see the EOF */
2958 if (flushing_input_file) {
2959 SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2962 SMS(DROPPED, 0, "feed dropped by innd");
2963 search_backlog_file();
2965 return OOP_CONTINUE;
2969 flushing_input_file= main_input_file;
2970 tailing_queue_readable(flushing_input_file);
2972 main_input_file= open_input_file(feedfile);
2973 if (!main_input_file)
2974 die("flush succeeded but feedfile %s does not exist!", feedfile);
2976 if (flushing_input_file) {
2977 SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2980 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2982 return OOP_CONTINUE;
2985 goto unexpected_exitstatus;
2988 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2989 warn("flush timed out trying to talk to innd");
2992 unexpected_exitstatus:
2993 report_child_status("inndcomm child", status);
2997 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2998 return OOP_CONTINUE;
3001 static void inndcommfail(const char *what) {
3002 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3003 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3006 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3009 notice("flushing %s",why);
3011 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3012 assert(!inndcomm_child);
3013 assert(!inndcomm_sentinel_fd);
3015 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3017 inndcomm_child= xfork("inndcomm child");
3019 if (!inndcomm_child) {
3020 const char *flushargv[2]= { sitename, 0 };
3024 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3025 /* parent spots the autoclose of pipefds[1] when we die or exit */
3027 if (simulate_flush>=0) {
3028 warn("SIMULATING flush child status %d", simulate_flush);
3029 if (simulate_flush>128) raise(simulate_flush-128);
3030 else exit(simulate_flush);
3033 alarm(inndcomm_flush_timeout);
3034 r= ICCopen(); if (r) inndcommfail("connect");
3035 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
3036 if (!r) exit(0); /* yay! */
3038 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3039 syswarn("innd ctlinnd flush failed: innd said %s", reply);
3040 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3045 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3046 inndcomm_sentinel_fd= pipefds[0];
3047 assert(inndcomm_sentinel_fd);
3048 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3050 SMS(FLUSHING, 0, why);
3053 /*========== main program ==========*/
3055 static void postfork_inputfile(InputFile *ipf) {
3057 xclose(ipf->fd, "(in child) input file ", ipf->path);
/* In a freshly forked child, closes a stdio stream inherited from the
 * parent.  Does nothing if f is null.  what and what2 are concatenated
 * to describe the stream in the failure message; what2 may be null,
 * in which case it contributes nothing. */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return;
  /* a null pointer must not reach %s, so substitute the empty string */
  if (fclose(f)) sysdie("(in child) close %s%s", what, what2 ? what2 : "");
}
3066 static void postfork(void) {
3069 xsigsetdefault(SIGTERM);
3070 xsigsetdefault(SIGINT);
3071 xsigsetdefault(SIGPIPE);
3072 if (terminate_sig_flag) raise(terminate_sig_flag);
3074 postfork_inputfile(main_input_file);
3075 postfork_inputfile(flushing_input_file);
3079 conn_closefd(conn,"(in child) ");
3081 postfork_stdio(defer, "defer file ", path_defer);
3084 typedef struct Every Every;
3086 struct timeval interval;
3091 static void every_schedule(Every *e, struct timeval base);
3093 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3096 if (!e->fixed_rate) xgettimeofday(&base);
3097 every_schedule(e, base);
3098 return OOP_CONTINUE;
3101 static void every_schedule(Every *e, struct timeval base) {
3102 struct timeval when;
3103 timeradd(&base, &e->interval, &when);
3104 loop->on_time(loop, when, every_happens, e);
3107 static void every(int interval, int fixed_rate, void (*f)(void)) {
3108 NEW_DECL(Every *,e);
3109 e->interval.tv_sec= interval;
3110 e->interval.tv_usec= 0;
3111 e->fixed_rate= fixed_rate;
3114 xgettimeofday(&now);
3115 every_schedule(e, now);
3118 static void filepoll(void) {
3119 filemon_callback(main_input_file);
3120 filemon_callback(flushing_input_file);
3123 static char *debug_report_ipf(InputFile *ipf) {
3124 if (!ipf) return xasprintf("none");
3126 const char *slash= strrchr(ipf->path,'/');
3127 const char *path= slash ? slash+1 : ipf->path;
3129 return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s",
3131 ipf->queue.count, ipf->inprogress, (long)ipf->offset,
3133 ipf->rd ? "" : ",!rd",
3134 ipf->skippinglong ? "*skiplong" : "");
3137 static void period(void) {
3138 char *dipf_main= debug_report_ipf(main_input_file);
3139 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3140 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3143 " sms=%s[%d] conns=%d until_connect=%d"
3144 " input_files main:%s flushing:%s backlog:%s"
3145 " children connecting=%ld inndcomm=%ld"
3147 sms_names[sms], until_flush, conns.count, until_connect,
3148 dipf_main, dipf_flushing, dipf_backlog,
3149 (long)connecting_child, (long)inndcomm_child
3153 free(dipf_flushing);
3156 if (until_connect) until_connect--;
3158 inputfile_queue_check_expired(backlog_input_file);
3159 poll_backlog_file();
3160 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3161 statemc_period_poll();
3162 check_assign_articles();
3167 /*========== dumping state ==========*/
3169 static void dump_article_list(FILE *f, const ControlCommand *c,
3170 const ArticleList *al) {
3171 fprintf(f, " count=%d\n", al->count);
3172 if (!c->xval) return;
3174 int i; Article *art;
3175 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3176 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3177 DUMPV("%p", art->,ipf);
3178 DUMPV("%d", art->,missing);
3179 DUMPV("%lu", (unsigned long)art->,offset);
3180 DUMPV("%d", art->,blanklen);
3181 DUMPV("%d", art->,midlen);
3182 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3186 static void dump_input_file(FILE *f, const ControlCommand *c,
3187 InputFile *ipf, const char *wh) {
3188 char *dipf= debug_report_ipf(ipf);
3189 fprintf(f,"input %s %s", wh, dipf);
3193 DUMPV("%d", ipf->,readcount_ok);
3194 DUMPV("%d", ipf->,readcount_blank);
3195 DUMPV("%d", ipf->,readcount_err);
3199 ArtState state; const char *const *statename;
3200 for (state=0, statename=artstate_names; *statename; state++,statename++) {
3201 #define RC_DUMP_FMT(x) " " #x "=%d"
3202 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3203 fprintf(f,"input %s counts %-11s"
3204 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3206 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3208 fprintf(f,"input %s queue", wh);
3209 dump_article_list(f,c,&ipf->queue);
3215 fprintf(cc->out, "dumping state to %s\n", path_dump);
3216 FILE *f= fopen(path_dump, "w");
3217 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3219 fprintf(f,"general");
3220 DUMPV("%s", sms_names,[sms]);
3221 DUMPV("%d", ,until_flush);
3222 DUMPV("%ld", (long),self_pid);
3223 DUMPV("%p", , defer);
3224 DUMPV("%d", , until_connect);
3225 DUMPV("%d", , until_backlog_nextscan);
3226 DUMPV("%d", , simulate_flush);
3227 fprintf(f,"\nnocheck");
3228 DUMPV("%#.10f", , accept_proportion);
3229 DUMPV("%d", , nocheck);
3230 DUMPV("%d", , nocheck_reported);
3233 fprintf(f,"special");
3234 DUMPV("%ld", (long),connecting_child);
3235 DUMPV("%d", , connecting_fdpass_sock);
3236 DUMPV("%d", , control_master);
3239 fprintf(f,"filemon ");
3240 filemon_method_dump_info(f);
3242 dump_input_file(f,c, main_input_file, "main" );
3243 dump_input_file(f,c, flushing_input_file, "flushing");
3244 dump_input_file(f,c, backlog_input_file, "backlog" );
3246 fprintf(f,"conns count=%d\n", conns.count);
3251 fprintf(f,"C%d",conn->fd);
3252 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3253 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3254 DUMPV("%d",conn->,since_activity);
3257 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3258 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3259 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
3261 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3262 for (i=0; i<conn->xmitu; i++) {
3263 const struct iovec *iv= &conn->xmit[i];
3264 const XmitDetails *xd= &conn->xmitd[i];
3267 case xk_Const: dinfo= xasprintf("Const"); break;
3268 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
3272 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3273 sanitise(iv->iov_base, iv->iov_len));
3279 DUMPV("%s", , path_lock);
3280 DUMPV("%s", , path_flushing);
3281 DUMPV("%s", , path_defer);
3282 DUMPV("%s", , path_control);
3283 DUMPV("%s", , path_dump);
3284 DUMPV("%s", , globpat_backlog);
3287 if (!!ferror(f) + !!fclose(f)) {
3288 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3293 /*========== option parsing ==========*/
3295 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3296 static void vbadusage(const char *fmt, va_list al) {
3297 char *m= xvasprintf(fmt,al);
3298 fprintf(stderr, "bad usage: %s\n"
3299 "say --help for help, or read the manpage\n",
3302 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3306 /*---------- generic option parser ----------*/
3308 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3309 static void badusage(const char *fmt, ...) {
3316 of_seconds= 001000u,
3317 of_boolean= 002000u,
3320 typedef struct Option Option;
3321 typedef void OptionParser(const Option*, const char *val);
3325 const char *lng, *formarg;
3331 static void parse_options(const Option *options, char ***argvp) {
3332 /* on return *argvp is first non-option arg; argc is not updated */
3335 const char *arg= *++(*argvp);
3337 if (*arg != '-') break;
3338 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3340 while ((a= *++arg)) {
3344 char *equals= strchr(arg,'=');
3345 int len= equals ? (equals - arg) : strlen(arg);
3346 for (o=options; o->shrt || o->lng; o++)
3347 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3349 badusage("unknown long option --%s",arg);
3352 if (equals) badusage("option --%s does not take a value",o->lng);
3354 } else if (equals) {
3358 if (!arg) badusage("option --%s needs a value for %s",
3359 o->lng, o->formarg);
3362 break; /* eaten the whole argument now */
3364 for (o=options; o->shrt || o->lng; o++)
3367 badusage("unknown short option -%c",a);
3374 if (!arg) badusage("option -%c needs a value for %s",
3375 o->shrt, o->formarg);
3378 break; /* eaten the whole argument now */
3384 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
3386 static void print_options(const Option *options, FILE *f) {
3388 for (o=options; o->shrt || o->lng; o++) {
3389 char shrt[2] = { o->shrt, 0 };
3390 char *optspec= xasprintf("%s%s%s%s%s",
3391 o->shrt ? "-" : "", shrt,
3392 o->shrt && o->lng ? "|" : "",
3393 DELIMPERHAPS("--", o->lng));
3394 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3399 /*---------- specific option types ----------*/
3401 static void op_integer(const Option *o, const char *val) {
3404 unsigned long ul= strtoul(val,&ep,10);
3405 if (*ep || ep==val || errno || ul>INT_MAX)
3406 badusage("bad integer value for %s",o->lng);
3407 int *store= o->store;
3411 static void op_double(const Option *o, const char *val) {
3412 int *store= o->store;
3415 *store= strtod(val, &ep);
3416 if (*ep || ep==val || errno)
3417 badusage("bad floating point value for %s",o->lng);
3420 static void op_string(const Option *o, const char *val) {
3421 const char **store= o->store;
3425 static void op_seconds(const Option *o, const char *val) {
3426 int *store= o->store;
3430 double v= strtod(val,&ep);
3431 if (ep==val) badusage("bad time/duration value for %s",o->lng);
3433 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3434 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3435 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3436 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3437 else if (!strcmp(ep,"das")) unit= 10;
3438 else if (!strcmp(ep,"hs")) unit= 100;
3439 else if (!strcmp(ep,"ks")) unit= 1000;
3440 else if (!strcmp(ep,"Ms")) unit= 1000000;
3441 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3445 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3449 static void op_setint(const Option *o, const char *val) {
3450 int *store= o->store;
3454 /*---------- specific options ----------*/
3456 static void help(const Option *o, const char *val);
3458 static const Option innduct_options[]= {
3459 {'f',"feedfile", "F", &feedfile, op_string },
3460 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3461 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3462 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3463 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3464 {'C',"inndconf", "F", &inndconffile, op_string },
3465 {'P',"port", "PORT", &port, op_integer },
3466 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3467 {0,"help", 0, 0, help },
3469 {0,"max-connections", "N", &max_connections, op_integer },
3470 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3471 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3472 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3474 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3475 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3476 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3478 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3479 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3481 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3482 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3483 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3484 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3485 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3486 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3488 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3489 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
3494 static void printusage(FILE *f) {
3495 fputs("usage: innduct [options] site [fqdn]\n"
3496 "available options are:\n", f);
3497 print_options(innduct_options, f);
3500 static void help(const Option *o, const char *val) {
3502 if (ferror(stdout) || fflush(stdout)) {
3503 perror("innduct: writing help");
3509 static void convert_to_periods_rndup(int *store) {
3510 *store += period_seconds-1;
3511 *store /= period_seconds;
3514 int main(int argc, char **argv) {
3520 parse_options(innduct_options, &argv);
3525 if (!sitename) badusage("need site name argument");
3526 remote_host= *argv++;
3527 if (*argv) badusage("too many non-option arguments");
3531 int r= innconf_read(inndconffile);
3532 if (!r) badusage("could not read inn.conf (more info on stderr)");
3534 if (!remote_host) remote_host= sitename;
3536 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3537 badusage("nocheck threshold percentage must be between 0..100");
3538 nocheck_thresh *= 0.01;
3540 if (nocheck_decay < 0.1)
3541 badusage("nocheck decay articles must be at least 0.1");
3542 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3544 convert_to_periods_rndup(&reconnect_delay_periods);
3545 convert_to_periods_rndup(&flushfail_retry_periods);
3546 convert_to_periods_rndup(&backlog_retry_minperiods);
3547 convert_to_periods_rndup(&backlog_spontrescan_periods);
3548 convert_to_periods_rndup(&spontaneous_flush_periods);
3549 convert_to_periods_rndup(&need_activity_periods);
3551 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3552 badusage("bad input data ratio must be between 0..100");
3553 max_bad_data_ratio *= 0.01;
3556 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3557 } else if (!feedfile[0]) {
3558 badusage("feed filename must be nonempty");
3559 } else if (feedfile[strlen(feedfile)-1]=='/') {
3560 feedfile= xasprintf("%s%s",feedfile,sitename);
3563 const char *feedfile_forbidden= "?*[~#";
3565 while ((c= *feedfile_forbidden++))
3566 if (strchr(feedfile, c))
3567 badusage("feed filename may not contain metacharacter %c",c);
3571 path_lock= xasprintf("%s_lock", feedfile);
3572 path_flushing= xasprintf("%s_flushing", feedfile);
3573 path_defer= xasprintf("%s_defer", feedfile);
3574 path_control= xasprintf("%s_control", feedfile);
3575 path_dump= xasprintf("%s_dump", feedfile);
3576 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3578 oop_source_sys *sysloop= oop_sys_new();
3579 if (!sysloop) sysdie("could not create liboop event loop");
3580 loop= (oop_source*)sysloop;
3584 if (become_daemon) {
3586 for (i=3; i<255; i++)
3587 /* do this now before we open syslog, etc. */
3589 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3591 int null= open("/dev/null",O_RDWR);
3592 if (null<0) sysfatal("failed to open /dev/null");
3596 xclose(null, "/dev/null original fd",0);
3598 pid_t child1= xfork("daemonise first fork");
3599 if (child1) _exit(0);
3601 pid_t sid= setsid();
3602 if (sid != child1) sysfatal("setsid failed");
3604 pid_t child2= xfork("daemonise second fork");
3605 if (child2) _exit(0);
3609 if (self_pid==-1) sysdie("getpid");
3624 notice("filemon: suppressed by command line option, polling");
3626 filemon_ok= filemon_method_init();
3628 warn("filemon: no file monitoring available, polling");
3631 every(filepoll_seconds,0,filepoll);
3633 every(period_seconds,1,period);
3639 void *run= oop_sys_run(sysloop);
3640 assert(run == OOP_ERROR);
3641 sysdie("event loop failed");