3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain amount old, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
222 #define VA va_list al; va_start(al,fmt)
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Conn *conn, Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
326 static void postfork(void);
327 static void period(void);
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
358 static const char *inndconffile;
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382 /* in one corrupt 4096-byte block the number of newlines has
383 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
386 /*----- statistics -----*/
388 typedef enum { /* in queue in conn->sent */
389 art_Unchecked, /* not checked, not sent checking */
390 art_Wanted, /* checked, wanted sent body as requested */
391 art_Unsolicited, /* - sent body without check */
395 static const char *const artstate_names[]=
396 { "Unchecked", "Wanted", "Unsolicited", 0 };
398 #define RESULT_COUNTS(RCS,RCN) \
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x) \
409 counts[art_Unchecked] x \
410 + counts[art_Wanted] x \
411 + counts[art_Unsolicited] x, \
412 counts[art_Unchecked] x \
413 , counts[art_Wanted] x \
414 , counts[art_Unsolicited] x
417 #define RC_INDEX(x) RC_##x,
418 RESULT_COUNTS(RC_INDEX, RC_INDEX)
423 /*----- transmission buffers -----*/
439 /*----- core operational data structure types -----*/
442 /* This is also an instance of struct oop_readable */
443 struct oop_readable readable; /* first */
444 oop_readable_call *readable_callback;
445 void *readable_callback_user;
448 Filemon_Perfile *filemon;
450 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
455 long inprogress; /* includes queue.count and also articles in conns */
457 int counts[art_MaxState][RCI_max];
458 int readcount_ok, readcount_blank, readcount_err;
473 #define SMS_LIST(X) \
481 enum StateMachineState {
482 #define SMS_DEF_ENUM(s) sm_##s,
483 SMS_LIST(SMS_DEF_ENUM)
486 static const char *sms_names[]= {
487 #define SMS_DEF_NAME(s) #s ,
488 SMS_LIST(SMS_DEF_NAME)
494 int fd; /* may be 0, meaning closed (during construction/destruction) */
495 oop_read *rd; /* likewise */
496 int max_queue, stream, quitting;
497 int since_activity; /* periods */
498 ArticleList waiting; /* not yet told peer */
499 ArticleList priority; /* peer says send it now */
500 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
501 struct iovec xmit[CONNIOVS];
502 XmitDetails xmitd[CONNIOVS];
507 /*----- general operational variables -----*/
509 /* main initialises */
510 static oop_source *loop;
511 static ConnList conns;
512 static char *path_lock, *path_flushing, *path_defer;
513 static char *path_control, *path_dump;
514 static char *globpat_backlog;
515 static pid_t self_pid;
517 /* statemc_init initialises */
518 static StateMachineState sms;
519 static int until_flush;
520 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
523 /* initialisation to 0 is good */
524 static int until_connect, until_backlog_nextscan;
525 static double accept_proportion;
526 static int nocheck, nocheck_reported, in_child;
528 /* for simulation, debugging, etc. */
529 int simulate_flush= -1;
531 /*========== logging ==========*/
533 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
534 static void logcore(int sysloglevel, const char *fmt, ...) {
537 vsyslog(sysloglevel,fmt,al);
539 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
540 vfprintf(stderr,fmt,al);
546 static void logv(int sysloglevel, const char *pfx, int errnoval,
547 const char *fmt, va_list al) PRINTF(5,0);
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549 const char *fmt, va_list al) {
550 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
551 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
552 msgbuf[sizeof(msgbuf)-1]= 0;
554 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
555 sysloglevel= LOG_ERR; /* run by wrong user, probably */
557 logcore(sysloglevel, "<%s>%s: %s%s%s",
558 sitename, pfx, msgbuf,
559 errnoval>=0 ? ": " : "",
560 errnoval>=0 ? strerror(errnoval) : "");
563 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
564 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
565 static void fn(const char *fmt, ...) { \
568 logv(sysloglevel, pfx, err, fmt, al); \
572 #define logwrap(fn, pfx, sysloglevel, err) \
573 static void fn(const char *fmt, ...) PRINTF(1,2); \
574 static void fn(const char *fmt, ...) { \
576 logv(sysloglevel, pfx, err, fmt, al); \
580 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
581 diewrap(die, " critical", LOG_CRIT, -1, 16);
583 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
584 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
586 logwrap(syswarn, " warning", LOG_WARNING, errno);
587 logwrap(warn, " warning", LOG_WARNING, -1);
589 logwrap(notice, " notice", LOG_NOTICE, -1);
590 logwrap(info, " info", LOG_INFO, -1);
591 logwrap(debug, " debug", LOG_DEBUG, -1);
594 /*========== utility functions etc. ==========*/
596 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
597 static char *xvasprintf(const char *fmt, va_list al) {
599 int rc= vasprintf(&str,fmt,al);
600 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
603 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
604 static char *xasprintf(const char *fmt, ...) {
606 char *str= xvasprintf(fmt,al);
611 static int close_perhaps(int *fd) {
612 if (*fd <= 0) return 0;
617 static void xclose(int fd, const char *what, const char *what2) {
619 if (r) sysdie("close %s%s",what,what2?what2:"");
621 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
622 if (*fd <= 0) return;
623 xclose(*fd,what,what2);
627 static pid_t xfork(const char *what) {
631 if (child==-1) sysfatal("cannot fork for %s",what);
632 debug("forked %s %ld", what, (unsigned long)child);
633 if (!child) postfork();
637 static void on_fd_read_except(int fd, oop_call_fd callback) {
638 loop->on_fd(loop, fd, OOP_READ, callback, 0);
639 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
641 static void cancel_fd_read_except(int fd) {
642 loop->cancel_fd(loop, fd, OOP_READ);
643 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
646 static void report_child_status(const char *what, int status) {
647 if (WIFEXITED(status)) {
648 int es= WEXITSTATUS(status);
650 warn("%s: child died with error exit status %d", what, es);
651 } else if (WIFSIGNALED(status)) {
652 int sig= WTERMSIG(status);
653 const char *sigstr= strsignal(sig);
654 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
656 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
658 warn("%s: child died due to unknown fatal signal %d%s",
659 what, sig, coredump);
661 warn("%s: child died with unknown wait status %d", what,status);
665 static int xwaitpid(pid_t *pid, const char *what) {
668 int r= kill(*pid, SIGKILL);
669 if (r) sysdie("cannot kill %s child", what);
671 pid_t got= waitpid(*pid, &status, 0);
672 if (got==-1) sysdie("cannot reap %s child", what);
673 if (got==0) die("cannot reap %s child", what);
680 static void *zxmalloc(size_t sz) {
681 void *p= xmalloc(sz);
686 static void xunlink(const char *path, const char *what) {
688 if (r) sysdie("can't unlink %s %s", path, what);
691 static time_t xtime(void) {
693 if (now==-1) sysdie("time(2) failed");
697 static void xsigaction(int signo, const struct sigaction *sa) {
698 int r= sigaction(signo,sa,0);
699 if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
702 static void xsigsetdefault(int signo) {
704 memset(&sa,0,sizeof(sa));
705 sa.sa_handler= SIG_DFL;
706 xsigaction(signo,&sa);
709 static void xgettimeofday(struct timeval *tv_r) {
710 int r= gettimeofday(tv_r,0);
711 if (r) sysdie("gettimeofday(2) failed");
714 static void xsetnonblock(int fd, int nonblocking) {
715 int errnoval= oop_fd_nonblock(fd, nonblocking);
716 if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
719 static void check_isreg(const struct stat *stab, const char *path,
721 if (!S_ISREG(stab->st_mode))
722 die("%s %s not a plain file (mode 0%lo)",
723 what, path, (unsigned long)stab->st_mode);
726 static void xfstat(int fd, struct stat *stab_r, const char *what) {
727 int r= fstat(fd, stab_r);
728 if (r) sysdie("could not fstat %s", what);
731 static void xfstat_isreg(int fd, struct stat *stab_r,
732 const char *path, const char *what) {
733 xfstat(fd, stab_r, what);
734 check_isreg(stab_r, path, what);
737 static void xlstat_isreg(const char *path, struct stat *stab,
738 int *enoent_r /* 0 means ENOENT is fatal */,
740 int r= lstat(path, stab);
742 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
743 sysdie("could not lstat %s %s", what, path);
745 if (enoent_r) *enoent_r= 0;
746 check_isreg(stab, path, what);
749 static int samefile(const struct stat *a, const struct stat *b) {
750 assert(S_ISREG(a->st_mode));
751 assert(S_ISREG(b->st_mode));
752 return (a->st_ino == b->st_ino &&
753 a->st_dev == b->st_dev);
756 static char *sanitise(const char *input, int len) {
757 static char sanibuf[100]; /* returns pointer to this buffer! */
759 const char *p= input;
760 const char *endp= len>=0 ? input+len : 0;
764 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
765 int c= (!endp || p<endp) ? *p++ : 0;
766 if (!c) { *q++= '\''; *q=0; break; }
767 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
768 sprintf(q,"\\x%02x",c);
774 static int isewouldblock(int errnoval) {
775 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
778 /*========== command and control connections ==========*/
780 static int control_master;
782 typedef struct ControlConn ControlConn;
784 void (*destroy)(ControlConn*);
790 struct sockaddr_un un;
795 static const oop_rd_style control_rd_style= {
796 OOP_RD_DELIM_STRIP, '\n',
798 OOP_RD_SHORTREC_FORBID
801 static void control_destroy(ControlConn *cc) {
805 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
806 if (ferror(cc->out) | fflush(cc->out)) {
807 info("CTRL%d write error %s", cc->fd, strerror(errno));
812 static void control_prompt(ControlConn *cc /* may destroy*/) {
813 fprintf(cc->out, "%s| ", sitename);
814 control_checkouterr(cc);
817 struct ControlCommand {
819 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
820 const char *arg, size_t argsz);
825 static const ControlCommand control_commands[];
828 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
829 const char *arg, size_t argsz)
832 fputs("commands:\n", cc->out);
833 const ControlCommand *ccmd;
834 for (ccmd=control_commands; ccmd->cmd; ccmd++)
835 fprintf(cc->out, " %s\n", ccmd->cmd);
836 fputs("NB: permissible arguments are not shown above."
837 " Not all commands listed are safe. See innduct(8).\n", cc->out);
841 int ok= trigger_flush_ok();
842 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
847 notice("terminating (CTRL%d)",cc->fd);
848 raise_default(SIGTERM);
854 /* messing with our head: */
855 CCMD(period) { period(); }
856 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
857 CCMD(setint) { *(int*)c->xdata= c->xval; }
858 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
860 static const ControlCommand control_commands[]= {
862 { "flush", ccmd_flush },
863 { "stop", ccmd_stop },
864 { "dump q", ccmd_dump, 0,0 },
865 { "dump a", ccmd_dump, 0,1 },
867 { "p", ccmd_period },
869 #define POKES(cmd,func) \
870 { cmd "flush", func, &until_flush, 1 }, \
871 { cmd "conn", func, &until_connect, 0 }, \
872 { cmd "blscan", func, &until_backlog_nextscan, 0 },
873 POKES("next ", ccmd_setint)
874 POKES("prod ", ccmd_setint_period)
876 { "pretend flush", ccmd_setintarg, &simulate_flush },
877 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
881 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
882 const char *errmsg, int errnoval,
883 const char *data, size_t recsz, void *cc_v) {
884 ControlConn *cc= cc_v;
887 info("CTRL%d closed", cc->fd);
892 if (recsz == 0) goto prompt;
894 const ControlCommand *ccmd;
895 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
896 int l= strlen(ccmd->cmd);
897 if (recsz < l) continue;
898 if (recsz > l && data[l] != ' ') continue;
899 if (memcmp(data, ccmd->cmd, l)) continue;
901 int argl= (int)recsz - (l+1);
902 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
906 fputs("unknown command; h for help\n", cc->out);
913 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
914 const char *errmsg, int errnoval,
915 const char *data, size_t recsz, void *cc_v) {
916 ControlConn *cc= cc_v;
918 info("CTRL%d read error %s", cc->fd, errmsg);
923 static int control_conn_startup(ControlConn *cc /* may destroy*/,
925 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
926 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
928 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
931 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
933 info("CTRL%d %s ready", cc->fd, how);
938 static void control_stdio_destroy(ControlConn *cc) {
940 oop_rd_cancel(cc->rd);
941 errno= oop_rd_delete_tidy(cc->rd);
942 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
947 static void control_stdio(void) {
948 NEW_DECL(ControlConn *,cc);
949 cc->destroy= control_stdio_destroy;
953 int r= control_conn_startup(cc,"stdio");
954 if (r) cc->destroy(cc);
957 static void control_accepted_destroy(ControlConn *cc) {
959 oop_rd_cancel(cc->rd);
960 oop_rd_delete_kill(cc->rd);
962 if (cc->out) { fclose(cc->out); cc->fd=0; }
963 close_perhaps(&cc->fd);
967 static void *control_master_readable(oop_source *lp, int master,
968 oop_event ev, void *u) {
969 NEW_DECL(ControlConn *,cc);
970 cc->destroy= control_accepted_destroy;
972 cc->salen= sizeof(cc->sa);
973 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
974 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
976 cc->out= fdopen(cc->fd, "w");
977 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
979 int r= control_conn_startup(cc, "accepted");
989 #define NOCONTROL(...) do{ \
990 syswarn("no control socket, because failed to " __VA_ARGS__); \
994 static void control_init(void) {
999 struct sockaddr_un un;
1002 memset(&sa,0,sizeof(sa));
1003 int maxlen= sizeof(sa.un.sun_path);
1005 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1007 if (errno != ENOENT)
1008 NOCONTROL("readlink control socket symlink path %s", path_control);
1010 if (reallen >= maxlen) {
1011 debug("control socket symlink path too long (r=%d)",reallen);
1012 xunlink(path_control, "old (overlong) control socket symlink");
1018 int r= lstat(realsockdir,&stab);
1020 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1022 r= mkdir(realsockdir, 0700);
1023 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1026 uid_t self= geteuid();
1027 if (!S_ISDIR(stab.st_mode) ||
1028 stab.st_uid != self ||
1029 stab.st_mode & 0007) {
1030 warn("no control socket, because real socket directory"
1031 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1032 !!S_ISDIR(stab.st_mode),
1033 (unsigned long)stab.st_uid, (unsigned long)self,
1034 (unsigned long)stab.st_mode & 0777UL);
1039 real= xasprintf("%s/s%lx.%lx", realsockdir,
1040 (unsigned long)xtime(), (unsigned long)self_pid);
1041 int reallen= strlen(real);
1043 if (reallen >= maxlen) {
1044 warn("no control socket, because tmpnam gave overly-long path"
1048 r= symlink(real, path_control);
1049 if (r) NOCONTROL("make control socket path %s a symlink to real"
1050 " socket path %s", path_control, real);
1051 memcpy(sa.un.sun_path, real, reallen);
1054 int r= unlink(sa.un.sun_path);
1055 if (r && errno!=ENOENT)
1056 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1058 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1059 if (control_master<0) NOCONTROL("create new control socket");
1061 sa.un.sun_family= AF_UNIX;
1062 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1063 r= bind(control_master, &sa.sa, sl);
1064 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1066 r= listen(control_master, 5);
1067 if (r) NOCONTROL("listen");
1069 xsetnonblock(control_master, 1);
1071 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1072 info("control socket ok, real path %s", sa.un.sun_path);
1078 xclose_perhaps(&control_master, "control master",0);
1082 /*========== management of connections ==========*/
1084 static void conn_closefd(Conn *conn, const char *msgprefix) {
1085 int r= close_perhaps(&conn->fd);
1086 if (r) info("C%d %serror closing socket: %s",
1087 conn->fd, msgprefix, strerror(errno));
1090 static void conn_dispose(Conn *conn) {
1093 oop_rd_cancel(conn->rd);
1094 oop_rd_delete_kill(conn->rd);
1098 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1099 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1101 conn_closefd(conn,"");
1103 until_connect= reconnect_delay_periods;
1106 static void *conn_exception(oop_source *lp, int fd,
1107 oop_event ev, void *conn_v) {
1110 assert(fd == conn->fd);
1111 assert(ev == OOP_EXCEPTION);
1112 int r= read(conn->fd, &ch, 1);
1113 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1114 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1115 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1116 return OOP_CONTINUE;
1119 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1120 int requeue[art_MaxState];
1121 memset(requeue,0,sizeof(requeue));
1125 while ((art= LIST_REMHEAD(conn->priority)))
1126 LIST_ADDTAIL(art->ipf->queue, art);
1128 while ((art= LIST_REMHEAD(conn->waiting)))
1129 LIST_ADDTAIL(art->ipf->queue, art);
1131 while ((art= LIST_REMHEAD(conn->sent))) {
1132 requeue[art->state]++;
1133 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1134 LIST_ADDTAIL(art->ipf->queue,art);
1139 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1142 char *m= xvasprintf(fmt,al);
1143 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1144 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1147 LIST_REMOVE(conns,conn);
1149 check_assign_articles();
1152 static void connfail(Conn *conn, const char *fmt, ...) {
1155 vconnfail(conn,fmt,al);
1159 static void check_idle_conns(void) {
1162 conn->since_activity++;
1165 if (conn->since_activity <= need_activity_periods) continue;
1167 /* We need to shut this down */
1169 connfail(conn,"timed out waiting for response to QUIT");
1170 else if (conn->sent.count)
1171 connfail(conn,"timed out waiting for responses");
1172 else if (conn->waiting.count || conn->priority.count)
1173 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1174 else if (conn->xmitu)
1175 connfail(conn,"peer has been sending responses"
1176 " before receiving our commands!");
1178 static const char quitcmd[]= "QUIT\r\n";
1179 int todo= sizeof(quitcmd)-1;
1180 const char *p= quitcmd;
1182 int r= write(conn->fd, p, todo);
1184 if (isewouldblock(errno))
1185 connfail(conn, "blocked writing QUIT to idle connection");
1187 connfail(conn, "failed to write QUIT to idle connection: %s",
1195 conn->since_activity= 0;
1196 debug("C%d is idle, quitting", conn->fd);
1205 /*---------- making new connections ----------*/
1207 static pid_t connecting_child;
1208 static int connecting_fdpass_sock;
1210 static void connect_attempt_discard(void) {
1211 if (connecting_child) {
1212 int status= xwaitpid(&connecting_child, "connect");
1213 if (!(WIFEXITED(status) ||
1214 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1215 report_child_status("connect", status);
1217 if (connecting_fdpass_sock) {
1218 cancel_fd_read_except(connecting_fdpass_sock);
1219 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1223 #define PREP_DECL_MSG_CMSG(msg) \
1225 struct iovec msgiov; \
1226 msgiov.iov_base= &msgbyte; \
1227 msgiov.iov_len= 1; \
1228 struct msghdr msg; \
1229 memset(&msg,0,sizeof(msg)); \
1230 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1231 msg.msg_iov= &msgiov; \
1232 msg.msg_iovlen= 1; \
1233 msg.msg_control= msg##cbuf; \
1234 msg.msg_controllen= sizeof(msg##cbuf);
1236 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1239 assert(fd == connecting_fdpass_sock);
1241 PREP_DECL_MSG_CMSG(msg);
1243 ssize_t rs= recvmsg(fd, &msg, 0);
1245 if (isewouldblock(errno)) return OOP_CONTINUE;
1246 syswarn("failed to read socket from connecting child");
1251 LIST_INIT(conn->waiting);
1252 LIST_INIT(conn->priority);
1253 LIST_INIT(conn->sent);
1255 struct cmsghdr *h= 0;
1256 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1258 int status= xwaitpid(&connecting_child, "connect child (broken)");
1260 if (WIFEXITED(status)) {
1261 if (WEXITSTATUS(status) != 0 &&
1262 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1263 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1264 /* child already reported the problem */;
1266 if (e == OOP_EXCEPTION)
1267 warn("connect: connection child exited code %d but"
1268 " unexpected exception on fdpass socket",
1269 WEXITSTATUS(status));
1271 warn("connect: connection child exited code %d but"
1273 WEXITSTATUS(status), (int)rs);
1275 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1276 warn("connect: connection attempt timed out");
1278 report_child_status("connect", status);
1283 #define CHK(field, val) \
1284 if (h->cmsg_##field != val) { \
1285 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1286 h->cmsg_##field, val); \
1289 CHK(level, SOL_SOCKET);
1290 CHK(type, SCM_RIGHTS);
1291 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1294 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1296 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1299 pid_t got= waitpid(connecting_child, &status, 0);
1300 if (got==-1) sysdie("connect: real wait for child");
1301 assert(got == connecting_child);
1302 connecting_child= 0;
1304 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1305 int es= WEXITSTATUS(status);
1307 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1308 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1310 fatal("connect: child gave unexpected exit status %d", es);
1314 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1316 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1317 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1318 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1319 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1321 &peer_rd_err, conn);
1322 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1324 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1325 LIST_ADDHEAD(conns, conn);
1327 connect_attempt_discard();
1328 check_assign_articles();
1329 return OOP_CONTINUE;
1333 connect_attempt_discard();
1334 return OOP_CONTINUE;
1337 static int allow_connect_start(void) {
1338 return conns.count < max_connections
1339 && !connecting_child
1343 static void connect_start(void) {
1344 assert(!connecting_child);
1345 assert(!connecting_fdpass_sock);
1347 info("starting connection attempt");
1350 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1351 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1353 connecting_child= xfork("connection");
1355 if (!connecting_child) {
1356 FILE *cn_from, *cn_to;
1357 char buf[NNTP_STRLEN+100];
1358 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1360 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1362 alarm(connection_setup_timeout);
1363 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1367 unsigned char c= buf[l-1];
1368 if (!isspace(c)) break;
1369 if (c=='\n' || c=='\r') stripped=1;
1373 sysfatal("connect: connection attempt failed");
1376 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1380 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1381 sysfatal("connect: authentication failed");
1383 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1385 sysfatal("connect: could not send MODE STREAM");
1386 buf[sizeof(buf)-1]= 0;
1387 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1388 if (ferror(cn_from))
1389 sysfatal("connect: could not read response to MODE STREAM");
1391 fatal("connect: connection close in response to MODE STREAM");
1396 fatal("connect: response to MODE STREAM is too long: %.100s...",
1398 l--; if (l>0 && buf[l-1]=='\r') l--;
1401 int rcode= strtoul(buf,&ep,10);
1403 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1407 exitstatus= CONNCHILD_ESTATUS_STREAM;
1413 warn("connect: unexpected response to MODE STREAM: %.50s",
1419 int fd= fileno(cn_from);
1421 PREP_DECL_MSG_CMSG(msg);
1422 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1423 cmsg->cmsg_level= SOL_SOCKET;
1424 cmsg->cmsg_type= SCM_RIGHTS;
1425 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1426 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1428 msg.msg_controllen= cmsg->cmsg_len;
1429 r= sendmsg(socks[1], &msg, 0);
1430 if (r<0) sysdie("sendmsg failed for new connection");
1431 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1436 xclose(socks[1], "connecting fdpass child's socket",0);
1437 connecting_fdpass_sock= socks[0];
1438 xsetnonblock(connecting_fdpass_sock, 1);
1439 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1442 /*---------- assigning articles to conns, and transmitting ----------*/
1444 static Article *dequeue_from(int peek, InputFile *ipf) {
1446 if (peek) return LIST_HEAD(ipf->queue);
1447 else return LIST_REMHEAD(ipf->queue);
1450 static Article *dequeue(int peek) {
1452 art= dequeue_from(peek, flushing_input_file); if (art) return art;
1453 art= dequeue_from(peek, backlog_input_file); if (art) return art;
1454 art= dequeue_from(peek, main_input_file); if (art) return art;
1458 static void check_assign_articles(void) {
1464 int spare=0, inqueue=0;
1466 /* Find a connection to offer this article. We prefer a busy
1467 * connection to an idle one, provided it's not full. We take the
1468 * first (oldest) and since that's stable, it will mean we fill up
1469 * connections in order. That way if we have too many
1470 * connections, the spare ones will go away eventually.
1473 if (walk->quitting) continue;
1474 inqueue= walk->sent.count + walk->priority.count
1475 + walk->waiting.count;
1476 spare= walk->max_queue - inqueue;
1477 assert(inqueue <= max_queue_per_conn);
1479 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1480 else if (spare>0) /*working*/ { use= walk; break; }
1483 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1485 Article *art= dequeue(0);
1487 LIST_ADDTAIL(use->waiting, art);
1490 conn_maybe_write(use);
1491 } else if (allow_connect_start()) {
1492 until_connect= reconnect_delay_periods;
1501 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1502 conn_maybe_write(u);
1503 return OOP_CONTINUE;
1506 static void conn_maybe_write(Conn *conn) {
1508 conn_make_some_xmits(conn);
1510 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1514 void *rp= conn_write_some_xmits(conn);
1515 if (rp==OOP_CONTINUE) {
1516 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1518 } else if (rp==OOP_HALT) {
1521 /* transmitted everything */
1528 /*========== article transmission ==========*/
1530 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1531 XmitKind kind) { /* caller must then fill in details */
1532 struct iovec *v= &conn->xmit[conn->xmitu];
1533 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1534 v->iov_base= (char*)data;
1540 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1541 xmit_core(conn,data,len, xk_Const);
1543 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1545 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1546 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1550 static void xmit_free(XmitDetails *d) {
1552 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1553 case xk_Const: break;
1558 static void *conn_write_some_xmits(Conn *conn) {
1560 * 0: nothing more to write, no need to call us again
1561 * OOP_CONTINUE: more to write but fd not writeable
1562 * OOP_HALT: disaster, have destroyed conn
1565 int count= conn->xmitu;
1566 if (!count) return 0;
1568 if (count > IOV_MAX) count= IOV_MAX;
1569 ssize_t rs= writev(conn->fd, conn->xmit, count);
1571 if (isewouldblock(errno)) return OOP_CONTINUE;
1572 connfail(conn, "write failed: %s", strerror(errno));
1578 for (done=0; rs && done<conn->xmitu; done++) {
1579 struct iovec *vp= &conn->xmit[done];
1580 XmitDetails *dp= &conn->xmitd[done];
1581 if (rs > vp->iov_len) {
1585 vp->iov_base= (char*)vp->iov_base + rs;
1589 int newu= conn->xmitu - done;
1590 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1591 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1596 static void conn_make_some_xmits(Conn *conn) {
1598 if (conn->xmitu+5 > CONNIOVS)
1601 Article *art= LIST_REMHEAD(conn->priority);
1602 if (!art) art= LIST_REMHEAD(conn->waiting);
1605 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1606 /* actually send it */
1608 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1611 art->state == art_Unchecked ? art_Unsolicited :
1612 art->state == art_Wanted ? art_Wanted :
1615 if (!artdata) art->missing= 1;
1616 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1620 XMIT_LITERAL("TAKETHIS ");
1621 xmit_noalloc(conn, art->messageid, art->midlen);
1622 XMIT_LITERAL("\r\n");
1623 xmit_artbody(conn, artdata);
1625 article_done(conn, art, -1);
1629 /* we got 235 from IHAVE */
1631 xmit_artbody(conn, artdata);
1633 XMIT_LITERAL(".\r\n");
1637 LIST_ADDTAIL(conn->sent, art);
1643 XMIT_LITERAL("CHECK ");
1645 XMIT_LITERAL("IHAVE ");
1646 xmit_noalloc(conn, art->messageid, art->midlen);
1647 XMIT_LITERAL("\r\n");
1649 assert(art->state == art_Unchecked);
1650 art->ipf->counts[art->state][RC_sent]++;
1651 LIST_ADDTAIL(conn->sent, art);
1657 /*========== handling responses from peer ==========*/
1659 static const oop_rd_style peer_rd_style= {
1660 OOP_RD_DELIM_STRIP, '\n',
1662 OOP_RD_SHORTREC_FORBID
1665 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1666 const char *errmsg, int errnoval,
1667 const char *data, size_t recsz, void *conn_v) {
1669 connfail(conn, "error receiving from peer: %s", errmsg);
1670 return OOP_CONTINUE;
1673 static Article *article_reply_check(Conn *conn, const char *response,
1674 int code_indicates_streaming,
1676 /* 1:yes, -1:no, 0:dontcare */,
1677 const char *sanitised_response) {
1678 Article *art= LIST_HEAD(conn->sent);
1682 "peer gave unexpected response when no commands outstanding: %s",
1683 sanitised_response);
1687 if (code_indicates_streaming) {
1688 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1689 if (!conn->stream) {
1690 connfail(conn, "peer gave streaming response code "
1691 " to IHAVE or subsequent body: %s", sanitised_response);
1694 const char *got_mid= response+4;
1695 int got_midlen= strcspn(got_mid, " \n\r");
1696 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1697 connfail(conn, "peer gave streaming response with syntactically invalid"
1698 " messageid: %s", sanitised_response);
1701 if (got_midlen != art->midlen ||
1702 memcmp(got_mid, art->messageid, got_midlen)) {
1703 connfail(conn, "peer gave streaming response code to wrong article -"
1704 " probable synchronisation problem; we offered: %s;"
1706 art->messageid, sanitised_response);
1711 connfail(conn, "peer gave non-streaming response code to"
1712 " CHECK/TAKETHIS: %s", sanitised_response);
1717 if (must_have_sent>0 && art->state < art_Wanted) {
1718 connfail(conn, "peer says article accepted but"
1719 " we had not sent the body: %s", sanitised_response);
1722 if (must_have_sent<0 && art->state >= art_Wanted) {
1723 connfail(conn, "peer says please sent the article but we just did: %s",
1724 sanitised_response);
1728 Article *art_again= LIST_REMHEAD(conn->sent);
1729 assert(art_again == art);
1733 static void update_nocheck(int accepted) {
1734 accept_proportion *= nocheck_decay;
1735 accept_proportion += accepted * (1.0 - nocheck_decay);
1736 int new_nocheck= accept_proportion >= nocheck_thresh;
1737 if (new_nocheck && !nocheck_reported) {
1738 notice("entering nocheck mode for the first time");
1739 nocheck_reported= 1;
1740 } else if (new_nocheck != nocheck) {
1741 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1743 nocheck= new_nocheck;
1746 static void article_done(Conn *conn, Article *art, int whichcount) {
1747 if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1749 if (whichcount == RC_accepted) update_nocheck(1);
1750 else if (whichcount == RC_unwanted) update_nocheck(0);
1752 InputFile *ipf= art->ipf;
1754 while (art->blanklen) {
1755 static const char spaces[]=
1765 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1766 int r= pwrite(ipf->fd, spaces, w, art->offset);
1768 if (errno==EINTR) continue;
1769 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1770 art->messageid, art->blanklen,
1771 (unsigned long)art->offset, ipf->path);
1773 assert(r>=0 && r<=w);
1779 assert(ipf->inprogress >= 0);
1782 if (!ipf->inprogress && ipf != main_input_file)
1783 queue_check_input_done();
1786 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1787 const char *errmsg, int errnoval,
1788 const char *data, size_t recsz, void *conn_v) {
1791 if (ev == OOP_RD_EOF) {
1792 connfail(conn, "unexpected EOF from peer");
1793 return OOP_CONTINUE;
1795 assert(ev == OOP_RD_OK);
1797 char *sani= sanitise(data,-1);
1800 unsigned long code= strtoul(data, &ep, 10);
1801 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1802 connfail(conn, "badly formatted response from peer: %s", sani);
1803 return OOP_CONTINUE;
1807 conn->waiting.count ||
1808 conn->priority.count ||
1812 if (conn->quitting) {
1813 if (code!=205 && code!=503) {
1814 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1816 notice("C%d idle connection closed by us", conn->fd);
1818 LIST_REMOVE(conns,conn);
1821 return OOP_CONTINUE;
1824 conn->since_activity= 0;
1827 #define GET_ARTICLE(musthavesent) do{ \
1828 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1829 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
1832 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1833 code_streaming= (streaming); \
1834 GET_ARTICLE(musthavesent); \
1835 article_done(conn, art, RC_##how); \
1839 #define PEERBADMSG(m) do { \
1840 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1843 int code_streaming= 0;
1847 case 400: PEERBADMSG("peer stopped accepting articles");
1848 default: PEERBADMSG("peer sent unexpected message");
1851 if (conn_busy) PEERBADMSG("peer timed us out");
1852 notice("C%d idle connection closed by peer", conn->fd);
1853 LIST_REMOVE(conns,conn);
1855 return OOP_CONTINUE;
1857 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1858 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1860 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1861 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1863 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1864 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1866 case 238: /* CHECK says send it */
1868 case 335: /* IHAVE says send it */
1870 assert(art->state == art_Unchecked);
1871 art->ipf->counts[art->state][RC_accepted]++;
1872 art->state= art_Wanted;
1873 LIST_ADDTAIL(conn->priority, art);
1876 case 431: /* CHECK or TAKETHIS says try later */
1878 case 436: /* IHAVE says try later */
1881 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1883 sysfatal("write to defer file %s",path_defer);
1884 article_done(conn, art, RC_deferred);
1890 conn_maybe_write(conn);
1891 check_assign_articles();
1892 return OOP_CONTINUE;
1896 /*========== monitoring of input files ==========*/
1898 static void feedfile_eof(InputFile *ipf) {
1899 assert(ipf != main_input_file); /* promised by tailing_try_read */
1900 inputfile_reading_stop(ipf);
1902 if (ipf == flushing_input_file) {
1903 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1904 if (main_input_file) inputfile_reading_start(main_input_file);
1905 statemc_check_flushing_done();
1906 } else if (ipf == backlog_input_file) {
1907 statemc_check_backlog_done();
1909 abort(); /* supposed to wait rather than get EOF on main input file */
1913 static InputFile *open_input_file(const char *path) {
1914 int fd= open(path, O_RDWR);
1916 if (errno==ENOENT) return 0;
1917 sysfatal("unable to open input file %s", path);
1921 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1922 memset(ipf,0,sizeof(*ipf));
1925 strcpy(ipf->path, path);
1926 LIST_INIT(ipf->queue);
1931 static void close_input_file(InputFile *ipf) { /* does not free */
1932 assert(!ipf->readable_callback); /* must have had ->on_cancel */
1933 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1934 assert(!ipf->rd); /* must have had inputfile_reading_stop */
1935 assert(!ipf->inprogress); /* no dangling pointers pointing here */
1936 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1940 /*---------- dealing with articles read in the input file ----------*/
1942 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1943 const char *data, const char *how) {
1944 warn("corrupted file: %s, offset %lu: %s: in %s",
1945 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1946 ipf->readcount_err++;
1947 if (ipf->readcount_err > max_bad_data_initial +
1948 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1949 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
1950 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1951 return OOP_CONTINUE;
1954 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1955 oop_rd_event ev, const char *errmsg,
1956 int errnoval, const char *data, size_t recsz,
1958 InputFile *ipf= ipf_v;
1959 assert(ev == OOP_RD_SYSTEM);
1961 sysdie("error reading input file: %s, offset %lu",
1962 ipf->path, (unsigned long)ipf->offset);
1965 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1966 oop_rd_event ev, const char *errmsg,
1967 int errnoval, const char *data, size_t recsz,
1969 InputFile *ipf= ipf_v;
1971 char tokentextbuf[sizeof(TOKEN)*2+3];
1973 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1975 off_t old_offset= ipf->offset;
1976 ipf->offset += recsz + !!(ev == OOP_RD_OK);
1978 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
1980 if (ev==OOP_RD_PARTREC)
1981 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
1982 /* but process it anyway */
1984 if (ipf->skippinglong) {
1985 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
1986 return OOP_CONTINUE;
1988 if (ev==OOP_RD_LONG) {
1989 ipf->skippinglong= 1;
1990 X_BAD_DATA("overly long line");
1993 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
1994 if (!recsz) X_BAD_DATA("empty line");
1997 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
1998 ipf->readcount_blank++;
1999 return OOP_CONTINUE;
2002 char *space= strchr(data,' ');
2003 int tokenlen= space-data;
2004 int midlen= (int)recsz-tokenlen-1;
2005 if (midlen <= 2) X_BAD_DATA("no room for messageid");
2006 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2008 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2009 memcpy(tokentextbuf, data, tokenlen);
2010 tokentextbuf[tokenlen]= 0;
2011 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2013 ipf->readcount_ok++;
2015 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2016 memset(art,0,sizeof(*art));
2017 art->state= art_Unchecked;
2018 art->midlen= midlen;
2019 art->ipf= ipf; ipf->inprogress++;
2020 art->token= TextToToken(tokentextbuf);
2021 art->offset= old_offset;
2022 art->blanklen= recsz;
2023 strcpy(art->messageid, space+1);
2024 LIST_ADDTAIL(ipf->queue, art);
2026 if (sms==sm_NORMAL && ipf==main_input_file &&
2027 ipf->offset >= target_max_feedfile_size)
2028 statemc_start_flush("feed file size");
2030 check_assign_articles();
2031 return OOP_CONTINUE;
2034 /*========== tailing input file ==========*/
2036 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2038 InputFile *ipf= user;
2039 return ipf->readable_callback(loop, &ipf->readable,
2040 ipf->readable_callback_user);
2043 static void tailing_on_cancel(struct oop_readable *rable) {
2044 InputFile *ipf= (void*)rable;
2046 if (ipf->filemon) filemon_stop(ipf);
2047 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2048 ipf->readable_callback= 0;
2051 static void tailing_queue_readable(InputFile *ipf) {
2052 /* lifetime of ipf here is OK because destruction will cause
2053 * on_cancel which will cancel this callback */
2054 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2057 static int tailing_on_readable(struct oop_readable *rable,
2058 oop_readable_call *cb, void *user) {
2059 InputFile *ipf= (void*)rable;
2061 tailing_on_cancel(rable);
2062 ipf->readable_callback= cb;
2063 ipf->readable_callback_user= user;
2066 tailing_queue_readable(ipf);
2070 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2072 InputFile *ipf= (void*)rable;
2074 ssize_t r= read(ipf->fd, buffer, length);
2076 if (errno==EINTR) continue;
2080 if (ipf==main_input_file) {
2083 } else if (ipf==flushing_input_file) {
2085 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2086 } else if (ipf==backlog_input_file) {
2092 tailing_queue_readable(ipf);
2097 /*---------- filemon implemented with inotify ----------*/
2099 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2100 #define HAVE_FILEMON
2102 #include <sys/inotify.h>
2104 static int filemon_inotify_fd;
2105 static int filemon_inotify_wdmax;
2106 static InputFile **filemon_inotify_wd2ipf;
2108 struct Filemon_Perfile {
2112 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2113 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2114 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2116 if (wd >= filemon_inotify_wdmax) {
2118 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2119 sizeof(*filemon_inotify_wd2ipf) * newmax);
2120 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2121 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2122 filemon_inotify_wdmax= newmax;
2125 assert(!filemon_inotify_wd2ipf[wd]);
2126 filemon_inotify_wd2ipf[wd]= ipf;
2128 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2129 ipf, wd, filemon_inotify_wdmax);
2134 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2136 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2137 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2138 if (r) sysdie("inotify_rm_watch");
2139 filemon_inotify_wd2ipf[wd]= 0;
2142 static void *filemon_inotify_readable(oop_source *lp, int fd,
2143 oop_event e, void *u) {
2144 struct inotify_event iev;
2146 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2148 if (isewouldblock(errno)) break;
2149 sysdie("read from inotify master");
2150 } else if (r==sizeof(iev)) {
2151 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2153 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2155 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2156 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2157 filemon_callback(ipf);
2159 return OOP_CONTINUE;
2162 static int filemon_method_init(void) {
2163 filemon_inotify_fd= inotify_init();
2164 if (filemon_inotify_fd<0) {
2165 syswarn("filemon/inotify: inotify_init failed");
2168 xsetnonblock(filemon_inotify_fd, 1);
2169 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2171 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2175 static void filemon_method_dump_info(FILE *f) {
2177 fprintf(f,"inotify");
2178 DUMPV("%d",,filemon_inotify_fd);
2179 DUMPV("%d",,filemon_inotify_wdmax);
2180 for (i=0; i<filemon_inotify_wdmax; i++)
2181 fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2184 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2186 /*---------- filemon dummy implementation ----------*/
2188 #if !defined(HAVE_FILEMON)
2190 struct Filemon_Perfile { int dummy; };
2192 static int filemon_method_init(void) {
2193 warn("filemon/dummy: no filemon method compiled in");
2196 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2197 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2198 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2200 #endif /* !HAVE_FILEMON */
2202 /*---------- filemon generic interface ----------*/
2204 static void filemon_start(InputFile *ipf) {
2205 assert(!ipf->filemon);
2208 filemon_method_startfile(ipf, ipf->filemon);
2211 static void filemon_stop(InputFile *ipf) {
2212 if (!ipf->filemon) return;
2213 filemon_method_stopfile(ipf, ipf->filemon);
2218 static void filemon_callback(InputFile *ipf) {
2219 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2220 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2223 /*---------- interface to start and stop an input file ----------*/
2225 static const oop_rd_style feedfile_rdstyle= {
2226 OOP_RD_DELIM_STRIP, '\n',
2228 OOP_RD_SHORTREC_LONG,
2231 static void inputfile_reading_start(InputFile *ipf) {
2233 ipf->readable.on_readable= tailing_on_readable;
2234 ipf->readable.on_cancel= tailing_on_cancel;
2235 ipf->readable.try_read= tailing_try_read;
2236 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2237 ipf->readable.delete_kill= 0;
2239 ipf->readable_callback= 0;
2240 ipf->readable_callback_user= 0;
2242 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2245 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2246 feedfile_got_article,ipf, feedfile_read_err, ipf);
2247 if (r) sysdie("unable start reading feedfile %s",ipf->path);
2250 static void inputfile_reading_stop(InputFile *ipf) {
2252 oop_rd_cancel(ipf->rd);
2253 oop_rd_delete(ipf->rd);
2255 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2259 /*========== interaction with innd - state machine ==========*/
2261 /* See official state diagram at top of file. We implement
2272 |`---------------------------------------------------.
2274 |`---------------- - - - |
2275 D ENOENT | D EXISTS see OVERALL STATES diagram |
2276 | for full startup logic |
2279 | ============ try to |
2285 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2286 ^ | hardlink F to D |
2289 | | our handle onto F is now onto D |
2292 | |<-------------------<---------------------<---------+
2294 | | spawn inndcomm flush |
2296 | ================== |
2297 | FLUSHING[-ABSENT] |
2299 | main D tail/none |
2300 | ================== |
2302 | | INNDCOMM FLUSH FAILS ^
2303 | |`----------------------->----------. |
2305 | | NO SUCH SITE V |
2306 ^ |`--------------->----. ==================== |
2307 | | \ FLUSHFAILED[-ABSENT] |
2309 | | FLUSH OK \ main D tail/none |
2310 | | open F \ ==================== |
2312 | | \ | TIME TO RETRY |
2313 | |`------->----. ,---<---'\ `----------------'
2314 | | D NONE | | D NONE `----.
2316 | ============= V V ============
2317 | SEPARATED-1 | | DROPPING-1
2318 | flsh->rd!=0 | | flsh->rd!=0
2319 | [Separated] | | [Dropping]
2320 | main F idle | | main none
2321 | flsh D tail | | flsh D tail
2322 | ============= | | ============
2324 ^ | EOF ON D | | defer | EOF ON D
2326 | =============== | | ===============
2327 | SEPARATED-2 | | DROPPING-2
2328 | flsh->rd==0 | V flsh->rd==0
2329 | [Finishing] | | [Dropping]
2330 | main F tail | `. main none
2331 | flsh D closed | `. flsh D closed
2332 | =============== V `. ===============
2334 | | ALL D PROCESSED `. | ALL D PROCESSED
2335 | V install defer as backlog `. | install defer
2336 ^ | close D `. | close D
2337 | | unlink D `. | unlink D
2340 `----------' ==============
2360 static void startup_set_input_file(InputFile *f) {
2361 assert(!main_input_file);
2363 inputfile_reading_start(f);
2366 static void statemc_lock(void) {
2368 struct stat stab, stabf;
2371 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2372 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2375 memset(&fl,0,sizeof(fl));
2377 fl.l_whence= SEEK_SET;
2378 int r= fcntl(lockfd, F_SETLK, &fl);
2380 if (errno==EACCES || isewouldblock(errno)) {
2381 if (quiet_multiple) exit(0);
2382 fatal("another duct holds the lockfile");
2384 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2387 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2389 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2391 if (!lock_noent && samefile(&stab, &stabf))
2394 xclose(lockfd, "stale lockfile ", path_lock);
2397 FILE *lockfile= fdopen(lockfd, "w");
2398 if (!lockfile) sysdie("fdopen lockfile");
2400 int r= ftruncate(lockfd, 0);
2401 if (r) sysdie("truncate lockfile to write new info");
2403 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2404 (unsigned long)self_pid,
2405 sitename, feedfile, remote_host) == EOF ||
2407 sysfatal("write info to lockfile %s", path_lock);
2409 debug("startup: locked");
2412 static void statemc_init(void) {
2413 struct stat stabdefer;
2415 search_backlog_file();
2418 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2420 debug("startup: ductdefer ENOENT");
2422 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2423 switch (stabdefer.st_nlink==1) {
2425 open_defer(); /* so that we will later close it and rename it */
2428 xunlink(path_defer, "stale defer file link"
2429 " (presumably hardlink to backlog file)");
2432 die("defer file %s has unexpected link count %d",
2433 path_defer, stabdefer.st_nlink);
2437 struct stat stab_f, stab_d;
2440 InputFile *file_d= open_input_file(path_flushing);
2441 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2443 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2445 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2446 debug("startup: F==D => Hardlinked");
2447 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2452 debug("startup: F ENOENT => Moved");
2453 if (file_d) startup_set_input_file(file_d);
2454 spawn_inndcomm_flush("feedfile missing at startup");
2455 /* => Flushing, sms:=FLUSHING */
2458 debug("startup: F!=D => Separated");
2459 startup_set_input_file(file_d);
2460 flushing_input_file= main_input_file;
2461 main_input_file= open_input_file(feedfile);
2462 if (!main_input_file) die("feedfile vanished during startup");
2463 SMS(SEPARATED, 0, "found both old and current feed files");
2465 debug("startup: F exists, D ENOENT => Normal");
2466 InputFile *file_f= open_input_file(feedfile);
2467 if (!file_f) die("feed file vanished during startup");
2468 startup_set_input_file(file_f);
2469 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2474 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2475 assert(sms == sm_NORMAL);
2477 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2479 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2480 (unsigned long)target_max_feedfile_size,
2483 int r= link(feedfile, path_flushing);
2484 if (r) sysfatal("link feedfile %s to flushing file %s",
2485 feedfile, path_flushing);
2488 xunlink(feedfile, "old feedfile link");
2491 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2494 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2497 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2499 case sm_FLUSHFAILED:
2500 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2507 static void statemc_period_poll(void) {
2508 if (!until_flush) return;
2510 assert(until_flush>=0);
2512 if (until_flush) return;
2513 int ok= trigger_flush_ok();
2517 static int inputfile_is_done(InputFile *ipf) {
2519 if (ipf->inprogress) return 0; /* new article in the meantime */
2520 if (ipf->rd) return 0; /* not had EOF */
2524 static void notice_processed(InputFile *ipf, int completed,
2525 const char *what, const char *spec) {
2526 if (!ipf) return; /* allows preterminate to be lazy */
2528 #define RCI_NOTHING(x) /* nothing */
2529 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2530 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2532 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2534 char *inprog= completed
2535 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2536 : xasprintf(" inprogress=%ld", ipf->inprogress);
2538 info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2539 " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2540 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2542 completed?"completed":"processed", what, spec,
2543 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2544 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2545 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2546 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2547 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2548 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
2556 static void statemc_check_backlog_done(void) {
2557 InputFile *ipf= backlog_input_file;
2558 if (!inputfile_is_done(ipf)) return;
2560 const char *slash= strrchr(ipf->path, '/');
2561 const char *leaf= slash ? slash+1 : ipf->path;
2562 const char *under= strchr(slash, '_');
2563 const char *rest= under ? under+1 : leaf;
2564 if (!strncmp(rest,"backlog",7)) rest += 7;
2565 notice_processed(ipf,1,"backlog ",rest);
2567 close_input_file(ipf);
2568 if (unlink(ipf->path)) {
2569 if (errno != ENOENT)
2570 sysdie("could not unlink processed backlog file %s", ipf->path);
2571 warn("backlog file %s vanished while we were reading it"
2572 " so we couldn't remove it (but it's done now, anyway)",
2576 backlog_input_file= 0;
2577 search_backlog_file();
2581 static void statemc_check_flushing_done(void) {
2582 InputFile *ipf= flushing_input_file;
2583 if (!inputfile_is_done(ipf)) return;
2585 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2587 notice_processed(ipf,1,"feedfile","");
2591 xunlink(path_flushing, "old flushing file");
2593 close_input_file(flushing_input_file);
2594 free(flushing_input_file);
2595 flushing_input_file= 0;
2597 if (sms==sm_SEPARATED) {
2598 notice("flush complete");
2599 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2600 } else if (sms==sm_DROPPING) {
2601 SMS(DROPPED, 0, "old flush complete");
2602 search_backlog_file();
2603 notice("feed dropped, but will continue until backlog is finished");
2607 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2609 assert(!inputfile_is_done(main_input_file));
2610 statemc_check_flushing_done();
2611 statemc_check_backlog_done();
2612 return OOP_CONTINUE;
2615 static void queue_check_input_done(void) {
2616 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2619 static void statemc_setstate(StateMachineState newsms, int periods,
2620 const char *forlog, const char *why) {
2622 until_flush= periods;
2624 const char *xtra= "";
2627 case sm_FLUSHFAILED:
2628 if (!main_input_file) xtra= "-ABSENT";
2632 xtra= flushing_input_file->rd ? "-1" : "-2";
2638 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2640 info("state %s%s %s",forlog,xtra,why);
2644 /*---------- defer and backlog files ----------*/
2646 static void open_defer(void) {
2651 defer= fopen(path_defer, "a+");
2652 if (!defer) sysfatal("could not open defer file %s", path_defer);
2654 /* truncate away any half-written records */
2656 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2658 if (stab.st_size > LONG_MAX)
2659 die("defer file %s size is far too large", path_defer);
2664 long orgsize= stab.st_size;
2665 long truncto= stab.st_size;
2667 if (!truncto) break; /* was only (if anything) one half-truncated record */
2668 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2669 sysdie("seek in defer file %s while truncating partial", path_defer);
2674 sysdie("failed read from defer file %s", path_defer);
2676 die("defer file %s shrank while we were checking it!", path_defer);
2682 if (stab.st_size != truncto) {
2683 warn("truncating half-record at end of defer file %s -"
2684 " shrinking by %ld bytes from %ld to %ld",
2685 path_defer, orgsize - truncto, orgsize, truncto);
2688 sysfatal("could not flush defer file %s", path_defer);
2689 if (ftruncate(fileno(defer), truncto))
2690 sysdie("could not truncate defer file %s", path_defer);
2693 info("continuing existing defer file %s (%ld bytes)",
2694 path_defer, orgsize);
2696 if (fseek(defer, truncto, SEEK_SET))
2697 sysdie("could not seek to new end of defer file %s", path_defer);
2700 static void close_defer(void) {
2705 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2707 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2710 time_t now= xtime();
2712 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2714 (unsigned long)stab.st_ino);
2715 if (link(path_defer, backlog))
2716 sysfatal("could not install defer file %s as backlog file %s",
2717 path_defer, backlog);
2718 if (unlink(path_defer))
2719 sysdie("could not unlink old defer link %s to backlog file %s",
2720 path_defer, backlog);
2724 if (until_backlog_nextscan < 0 ||
2725 until_backlog_nextscan > backlog_retry_minperiods + 1)
2726 until_backlog_nextscan= backlog_retry_minperiods + 1;
2729 static void poll_backlog_file(void) {
2730 if (until_backlog_nextscan < 0) return;
2731 if (until_backlog_nextscan-- > 0) return;
2732 search_backlog_file();
2735 static void search_backlog_file(void) {
2736 /* returns non-0 iff there are any backlog files */
2741 const char *oldest_path=0;
2742 time_t oldest_mtime=0, now;
2744 if (backlog_input_file) return;
2748 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2752 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2754 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2756 for (i=0; i<gl.gl_pathc; i++) {
2757 const char *path= gl.gl_pathv[i];
2759 if (strchr(path,'#') || strchr(path,'~')) {
2760 debug("backlog file search skipping %s", path);
2763 r= stat(path, &stab);
2765 syswarn("failed to stat backlog file %s", path);
2768 if (!S_ISREG(stab.st_mode)) {
2769 warn("backlog file %s is not a plain file (or link to one)", path);
2772 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2774 oldest_mtime= stab.st_mtime;
2777 case GLOB_NOMATCH: /* fall through */
2780 sysdie("glob expansion of backlog pattern %s gave unexpected"
2781 " nonzero (error?) return value %d", globpat_backlog, r);
2785 debug("backlog scan: none");
2787 if (sms==sm_DROPPED) {
2789 notice("feed dropped and our work is complete");
2791 int r= unlink(path_control);
2792 if (r && errno!=ENOENT)
2793 syswarn("failed to remove control symlink for old feed");
2795 xunlink(path_lock, "lockfile for old feed");
2798 until_backlog_nextscan= backlog_spontrescan_periods;
2803 double age= difftime(now, oldest_mtime);
2804 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2806 if (age_deficiency <= 0) {
2807 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2808 age, age_deficiency, oldest_path);
2810 backlog_input_file= open_input_file(oldest_path);
2811 if (!backlog_input_file) {
2812 warn("backlog file %s vanished as we opened it", oldest_path);
2816 inputfile_reading_start(backlog_input_file);
2817 until_backlog_nextscan= -1;
2821 until_backlog_nextscan= age_deficiency / period_seconds;
2823 if (backlog_spontrescan_periods >= 0 &&
2824 until_backlog_nextscan > backlog_spontrescan_periods)
2825 until_backlog_nextscan= backlog_spontrescan_periods;
2827 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2828 age, age_deficiency, until_backlog_nextscan, oldest_path);
2835 /*---------- shutdown and signal handling ----------*/
2837 static void preterminate(void) {
2838 if (in_child) return;
2839 notice_processed(main_input_file,0,"feedfile","");
2840 notice_processed(flushing_input_file,0,"flushing","");
2841 if (backlog_input_file)
2842 notice_processed(backlog_input_file,0, "backlog file ",
2843 backlog_input_file->path);
2846 static int signal_self_pipe[2];
2847 static sig_atomic_t terminate_sig_flag;
2849 static void raise_default(int signo) {
2850 xsigsetdefault(signo);
2855 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2856 assert(fd=signal_self_pipe[0]);
2858 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2859 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2860 if (r==0) die("eof on signal self pipe");
2861 if (terminate_sig_flag) {
2863 notice("terminating (%s)", strsignal(terminate_sig_flag));
2864 raise_default(terminate_sig_flag);
2866 return OOP_CONTINUE;
2869 static void sigarrived_handler(int signum) {
2874 if (!terminate_sig_flag) terminate_sig_flag= signum;
2879 write(signal_self_pipe[1],&x,1);
2882 static void init_signals(void) {
2883 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2884 sysdie("could not ignore SIGPIPE");
2886 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2888 xsetnonblock(signal_self_pipe[0],1);
2889 xsetnonblock(signal_self_pipe[1],1);
2891 struct sigaction sa;
2892 memset(&sa,0,sizeof(sa));
2893 sa.sa_handler= sigarrived_handler;
2894 sa.sa_flags= SA_RESTART;
2895 xsigaction(SIGTERM,&sa);
2896 xsigaction(SIGINT,&sa);
2898 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2901 /*========== flushing the feed ==========*/
2903 static pid_t inndcomm_child;
2904 static int inndcomm_sentinel_fd;
2906 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2907 assert(inndcomm_child);
2908 assert(fd == inndcomm_sentinel_fd);
2909 int status= xwaitpid(&inndcomm_child, "inndcomm");
2912 cancel_fd_read_except(fd);
2913 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2914 inndcomm_sentinel_fd= 0;
2916 assert(!flushing_input_file);
2918 if (WIFEXITED(status)) {
2919 switch (WEXITSTATUS(status)) {
2921 case INNDCOMMCHILD_ESTATUS_FAIL:
2924 case INNDCOMMCHILD_ESTATUS_NONESUCH:
2925 notice("feed has been dropped by innd, finishing up");
2926 flushing_input_file= main_input_file;
2927 tailing_queue_readable(flushing_input_file);
2928 /* we probably previously returned EAGAIN from our fake read method
2929 * when in fact we were at EOF, so signal another readable event
2930 * so we actually see the EOF */
2934 if (flushing_input_file) {
2935 SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2938 SMS(DROPPED, 0, "feed dropped by innd");
2939 search_backlog_file();
2941 return OOP_CONTINUE;
2945 flushing_input_file= main_input_file;
2946 tailing_queue_readable(flushing_input_file);
2948 main_input_file= open_input_file(feedfile);
2949 if (!main_input_file)
2950 die("flush succeeded but feedfile %s does not exist!", feedfile);
2952 if (flushing_input_file) {
2953 SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2956 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2958 return OOP_CONTINUE;
2961 goto unexpected_exitstatus;
2964 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2965 warn("flush timed out trying to talk to innd");
2968 unexpected_exitstatus:
2969 report_child_status("inndcomm child", status);
2973 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2974 return OOP_CONTINUE;
2977 static void inndcommfail(const char *what) {
2978 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
2979 exit(INNDCOMMCHILD_ESTATUS_FAIL);
2982 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
2985 notice("flushing %s",why);
2987 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
2988 assert(!inndcomm_child);
2989 assert(!inndcomm_sentinel_fd);
2991 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
2993 inndcomm_child= xfork("inndcomm child");
2995 if (!inndcomm_child) {
2996 const char *flushargv[2]= { sitename, 0 };
3000 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3001 /* parent spots the autoclose of pipefds[1] when we die or exit */
3003 if (simulate_flush>=0) {
3004 warn("SIMULATING flush child status %d", simulate_flush);
3005 if (simulate_flush>128) raise(simulate_flush-128);
3006 else exit(simulate_flush);
3009 alarm(inndcomm_flush_timeout);
3010 r= ICCopen(); if (r) inndcommfail("connect");
3011 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
3012 if (!r) exit(0); /* yay! */
3014 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3015 syswarn("innd ctlinnd flush failed: innd said %s", reply);
3016 exit(INNDCOMMCHILD_ESTATUS_FAIL);
3021 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3022 inndcomm_sentinel_fd= pipefds[0];
3023 assert(inndcomm_sentinel_fd);
3024 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3026 SMS(FLUSHING, 0, why);
3029 /*========== main program ==========*/
3031 static void postfork_inputfile(InputFile *ipf) {
3033 xclose(ipf->fd, "(in child) input file ", ipf->path);
3036 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3037 /* we have no stdio streams that are buffered long-term */
3039 if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3042 static void postfork(void) {
3045 xsigsetdefault(SIGTERM);
3046 xsigsetdefault(SIGINT);
3047 xsigsetdefault(SIGPIPE);
3048 if (terminate_sig_flag) raise(terminate_sig_flag);
3050 postfork_inputfile(main_input_file);
3051 postfork_inputfile(flushing_input_file);
3055 conn_closefd(conn,"(in child) ");
3057 postfork_stdio(defer, "defer file ", path_defer);
3060 typedef struct Every Every;
3062 struct timeval interval;
3067 static void every_schedule(Every *e, struct timeval base);
3069 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3072 if (!e->fixed_rate) xgettimeofday(&base);
3073 every_schedule(e, base);
3074 return OOP_CONTINUE;
3077 static void every_schedule(Every *e, struct timeval base) {
3078 struct timeval when;
3079 timeradd(&base, &e->interval, &when);
3080 loop->on_time(loop, when, every_happens, e);
3083 static void every(int interval, int fixed_rate, void (*f)(void)) {
3084 NEW_DECL(Every *,e);
3085 e->interval.tv_sec= interval;
3086 e->interval.tv_usec= 0;
3087 e->fixed_rate= fixed_rate;
3090 xgettimeofday(&now);
3091 every_schedule(e, now);
3094 static void filepoll(void) {
3095 filemon_callback(main_input_file);
3096 filemon_callback(flushing_input_file);
3099 static char *debug_report_ipf(InputFile *ipf) {
3100 if (!ipf) return xasprintf("none");
3102 const char *slash= strrchr(ipf->path,'/');
3103 const char *path= slash ? slash+1 : ipf->path;
3105 return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s",
3107 ipf->queue.count, ipf->inprogress, (long)ipf->offset,
3109 ipf->rd ? "" : ",!rd",
3110 ipf->skippinglong ? "*skiplong" : "");
3113 static void period(void) {
3114 char *dipf_main= debug_report_ipf(main_input_file);
3115 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3116 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3119 " sms=%s[%d] conns=%d until_connect=%d"
3120 " input_files main:%s flushing:%s backlog:%s"
3121 " children connecting=%ld inndcomm=%ld"
3123 sms_names[sms], until_flush, conns.count, until_connect,
3124 dipf_main, dipf_flushing, dipf_backlog,
3125 (long)connecting_child, (long)inndcomm_child
3129 free(dipf_flushing);
3132 if (until_connect) until_connect--;
3134 poll_backlog_file();
3135 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3136 statemc_period_poll();
3137 check_assign_articles();
3142 /*========== dumping state ==========*/
3144 static void dump_article_list(FILE *f, const ControlCommand *c,
3145 const ArticleList *al) {
3146 fprintf(f, " count=%d\n", al->count);
3147 if (!c->xval) return;
3149 int i; Article *art;
3150 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3151 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3152 DUMPV("%p", art->,ipf);
3153 DUMPV("%d", art->,missing);
3154 DUMPV("%lu", (unsigned long)art->,offset);
3155 DUMPV("%d", art->,blanklen);
3156 DUMPV("%d", art->,midlen);
3157 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3161 static void dump_input_file(FILE *f, const ControlCommand *c,
3162 InputFile *ipf, const char *wh) {
3163 char *dipf= debug_report_ipf(ipf);
3164 fprintf(f,"input %s %s", wh, dipf);
3168 DUMPV("%d", ipf->,readcount_ok);
3169 DUMPV("%d", ipf->,readcount_blank);
3170 DUMPV("%d", ipf->,readcount_err);
3174 ArtState state; const char *const *statename;
3175 for (state=0, statename=artstate_names; *statename; state++,statename++) {
3176 #define RC_DUMP_FMT(x) " " #x "=%d"
3177 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3178 fprintf(f,"input %s counts %-11s"
3179 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3181 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3183 fprintf(f,"input %s queue", wh);
3184 dump_article_list(f,c,&ipf->queue);
3190 fprintf(cc->out, "dumping state to %s\n", path_dump);
3191 FILE *f= fopen(path_dump, "w");
3192 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3194 fprintf(f,"general");
3195 DUMPV("%s", sms_names,[sms]);
3196 DUMPV("%d", ,until_flush);
3197 DUMPV("%ld", (long),self_pid);
3198 DUMPV("%p", , defer);
3199 DUMPV("%d", , until_connect);
3200 DUMPV("%d", , until_backlog_nextscan);
3201 DUMPV("%d", , simulate_flush);
3202 fprintf(f,"\nnocheck");
3203 DUMPV("%#.10f", , accept_proportion);
3204 DUMPV("%d", , nocheck);
3205 DUMPV("%d", , nocheck_reported);
3208 fprintf(f,"special");
3209 DUMPV("%ld", (long),connecting_child);
3210 DUMPV("%d", , connecting_fdpass_sock);
3211 DUMPV("%d", , control_master);
3214 fprintf(f,"filemon ");
3215 filemon_method_dump_info(f);
3217 dump_input_file(f,c, main_input_file, "main" );
3218 dump_input_file(f,c, flushing_input_file, "flushing");
3219 dump_input_file(f,c, backlog_input_file, "backlog" );
3221 fprintf(f,"conns count=%d\n", conns.count);
3226 fprintf(f,"C%d",conn->fd);
3227 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3228 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3229 DUMPV("%d",conn->,since_activity);
3232 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3233 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3234 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
3236 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3237 for (i=0; i<conn->xmitu; i++) {
3238 const struct iovec *iv= &conn->xmit[i];
3239 const XmitDetails *xd= &conn->xmitd[i];
3242 case xk_Const: dinfo= xasprintf("Const"); break;
3243 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
3247 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3248 sanitise(iv->iov_base, iv->iov_len));
3254 DUMPV("%s", , path_lock);
3255 DUMPV("%s", , path_flushing);
3256 DUMPV("%s", , path_defer);
3257 DUMPV("%s", , path_control);
3258 DUMPV("%s", , path_dump);
3259 DUMPV("%s", , globpat_backlog);
3262 if (!!ferror(f) + !!fclose(f)) {
3263 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3268 /*========== option parsing ==========*/
3270 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3271 static void vbadusage(const char *fmt, va_list al) {
3272 char *m= xvasprintf(fmt,al);
3273 fprintf(stderr, "bad usage: %s\n"
3274 "say --help for help, or read the manpage\n",
3277 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3281 /*---------- generic option parser ----------*/
3283 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3284 static void badusage(const char *fmt, ...) {
3291 of_seconds= 001000u,
3292 of_boolean= 002000u,
3295 typedef struct Option Option;
3296 typedef void OptionParser(const Option*, const char *val);
3300 const char *lng, *formarg;
3306 static void parse_options(const Option *options, char ***argvp) {
3307 /* on return *argvp is first non-option arg; argc is not updated */
3310 const char *arg= *++(*argvp);
3312 if (*arg != '-') break;
3313 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3315 while ((a= *++arg)) {
3319 char *equals= strchr(arg,'=');
3320 int len= equals ? (equals - arg) : strlen(arg);
3321 for (o=options; o->shrt || o->lng; o++)
3322 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3324 badusage("unknown long option --%s",arg);
3327 if (equals) badusage("option --%s does not take a value",o->lng);
3329 } else if (equals) {
3333 if (!arg) badusage("option --%s needs a value for %s",
3334 o->lng, o->formarg);
3337 break; /* eaten the whole argument now */
3339 for (o=options; o->shrt || o->lng; o++)
3342 badusage("unknown short option -%c",a);
3349 if (!arg) badusage("option -%c needs a value for %s",
3350 o->shrt, o->formarg);
3353 break; /* eaten the whole argument now */
3359 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
3361 static void print_options(const Option *options, FILE *f) {
3363 for (o=options; o->shrt || o->lng; o++) {
3364 char shrt[2] = { o->shrt, 0 };
3365 char *optspec= xasprintf("%s%s%s%s%s",
3366 o->shrt ? "-" : "", shrt,
3367 o->shrt && o->lng ? "|" : "",
3368 DELIMPERHAPS("--", o->lng));
3369 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3374 /*---------- specific option types ----------*/
3376 static void op_integer(const Option *o, const char *val) {
3379 unsigned long ul= strtoul(val,&ep,10);
3380 if (*ep || ep==val || errno || ul>INT_MAX)
3381 badusage("bad integer value for %s",o->lng);
3382 int *store= o->store;
3386 static void op_double(const Option *o, const char *val) {
3387 int *store= o->store;
3390 *store= strtod(val, &ep);
3391 if (*ep || ep==val || errno)
3392 badusage("bad floating point value for %s",o->lng);
3395 static void op_string(const Option *o, const char *val) {
3396 const char **store= o->store;
3400 static void op_seconds(const Option *o, const char *val) {
3401 int *store= o->store;
3405 double v= strtod(val,&ep);
3406 if (ep==val) badusage("bad time/duration value for %s",o->lng);
3408 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3409 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3410 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3411 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3412 else if (!strcmp(ep,"das")) unit= 10;
3413 else if (!strcmp(ep,"hs")) unit= 100;
3414 else if (!strcmp(ep,"ks")) unit= 1000;
3415 else if (!strcmp(ep,"Ms")) unit= 1000000;
3416 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3420 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3424 static void op_setint(const Option *o, const char *val) {
3425 int *store= o->store;
3429 /*---------- specific options ----------*/
3431 static void help(const Option *o, const char *val);
3433 static const Option innduct_options[]= {
3434 {'f',"feedfile", "F", &feedfile, op_string },
3435 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3436 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3437 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3438 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3439 {'C',"inndconf", "F", &inndconffile, op_string },
3440 {'P',"port", "PORT", &port, op_integer },
3441 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3442 {0,"help", 0, 0, help },
3444 {0,"max-connections", "N", &max_connections, op_integer },
3445 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3446 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3447 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3449 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3450 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3451 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3453 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3454 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3456 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3457 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3458 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3459 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3460 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3461 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3463 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3464 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
3469 static void printusage(FILE *f) {
3470 fputs("usage: innduct [options] site [fqdn]\n"
3471 "available options are:\n", f);
3472 print_options(innduct_options, f);
3475 static void help(const Option *o, const char *val) {
3477 if (ferror(stdout) || fflush(stdout)) {
3478 perror("innduct: writing help");
3484 static void convert_to_periods_rndup(int *store) {
3485 *store += period_seconds-1;
3486 *store /= period_seconds;
3489 int main(int argc, char **argv) {
3495 parse_options(innduct_options, &argv);
3500 if (!sitename) badusage("need site name argument");
3501 remote_host= *argv++;
3502 if (*argv) badusage("too many non-option arguments");
3506 int r= innconf_read(inndconffile);
3507 if (!r) badusage("could not read inn.conf (more info on stderr)");
3509 if (!remote_host) remote_host= sitename;
3511 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3512 badusage("nocheck threshold percentage must be between 0..100");
3513 nocheck_thresh *= 0.01;
3515 if (nocheck_decay < 0.1)
3516 badusage("nocheck decay articles must be at least 0.1");
3517 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3519 convert_to_periods_rndup(&reconnect_delay_periods);
3520 convert_to_periods_rndup(&flushfail_retry_periods);
3521 convert_to_periods_rndup(&backlog_retry_minperiods);
3522 convert_to_periods_rndup(&backlog_spontrescan_periods);
3523 convert_to_periods_rndup(&spontaneous_flush_periods);
3524 convert_to_periods_rndup(&need_activity_periods);
3526 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3527 badusage("bad input data ratio must be between 0..100");
3528 max_bad_data_ratio *= 0.01;
3531 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3532 } else if (!feedfile[0]) {
3533 badusage("feed filename must be nonempty");
3534 } else if (feedfile[strlen(feedfile)-1]=='/') {
3535 feedfile= xasprintf("%s%s",feedfile,sitename);
3538 const char *feedfile_forbidden= "?*[~#";
3540 while ((c= *feedfile_forbidden++))
3541 if (strchr(feedfile, c))
3542 badusage("feed filename may not contain metacharacter %c",c);
3546 path_lock= xasprintf("%s_lock", feedfile);
3547 path_flushing= xasprintf("%s_flushing", feedfile);
3548 path_defer= xasprintf("%s_defer", feedfile);
3549 path_control= xasprintf("%s_control", feedfile);
3550 path_dump= xasprintf("%s_dump", feedfile);
3551 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3553 oop_source_sys *sysloop= oop_sys_new();
3554 if (!sysloop) sysdie("could not create liboop event loop");
3555 loop= (oop_source*)sysloop;
3559 if (become_daemon) {
3561 for (i=3; i<255; i++)
3562 /* do this now before we open syslog, etc. */
3564 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3566 int null= open("/dev/null",O_RDWR);
3567 if (null<0) sysfatal("failed to open /dev/null");
3571 xclose(null, "/dev/null original fd",0);
3573 pid_t child1= xfork("daemonise first fork");
3574 if (child1) _exit(0);
3576 pid_t sid= setsid();
3577 if (sid != child1) sysfatal("setsid failed");
3579 pid_t child2= xfork("daemonise second fork");
3580 if (child2) _exit(0);
3584 if (self_pid==-1) sysdie("getpid");
3599 notice("filemon: suppressed by command line option, polling");
3601 filemon_ok= filemon_method_init();
3603 warn("filemon: no file monitoring available, polling");
3606 every(filepoll_seconds,0,filepoll);
3608 every(period_seconds,1,period);
3614 void *run= oop_sys_run(sysloop);
3615 assert(run == OOP_ERROR);
3616 sysdie("event loop failed");