3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain amount old, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
222 #define VA va_list al; va_start(al,fmt)
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Conn *conn, Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
326 static void postfork(void);
327 static void period(void);
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
358 static const char *inndconffile;
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382 /* in one corrupt 4096-byte block the number of newlines has
383 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
386 /*----- statistics -----*/
388 typedef enum { /* in queue in conn->sent */
389 art_Unchecked, /* not checked, not sent checking */
390 art_Wanted, /* checked, wanted sent body as requested */
391 art_Unsolicited, /* - sent body without check */
395 static const char *const artstate_names[]=
396 { "Unchecked", "Wanted", "Unsolicited", 0 };
398 #define RESULT_COUNTS(RCS,RCN) \
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x) \
409 counts[art_Unchecked] x \
410 + counts[art_Wanted] x \
411 + counts[art_Unsolicited] x, \
412 counts[art_Unchecked] x \
413 , counts[art_Wanted] x \
414 , counts[art_Unsolicited] x
417 #define RC_INDEX(x) RC_##x,
418 RESULT_COUNTS(RC_INDEX, RC_INDEX)
423 /*----- transmission buffers -----*/
439 /*----- core operational data structure types -----*/
442 /* This is also an instance of struct oop_readable */
443 struct oop_readable readable; /* first */
444 oop_readable_call *readable_callback;
445 void *readable_callback_user;
448 Filemon_Perfile *filemon;
450 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
451 long inprogress; /* no. of articles read but not processed */
455 int counts[art_MaxState][RCI_max];
456 int readcount_ok, readcount_blank, readcount_err;
471 #define SMS_LIST(X) \
479 enum StateMachineState {
480 #define SMS_DEF_ENUM(s) sm_##s,
481 SMS_LIST(SMS_DEF_ENUM)
484 static const char *sms_names[]= {
485 #define SMS_DEF_NAME(s) #s ,
486 SMS_LIST(SMS_DEF_NAME)
492 int fd; /* may be 0, meaning closed (during construction/destruction) */
493 oop_read *rd; /* likewise */
494 int max_queue, stream, quitting;
495 int since_activity; /* periods */
496 ArticleList waiting; /* not yet told peer */
497 ArticleList priority; /* peer says send it now */
498 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
499 struct iovec xmit[CONNIOVS];
500 XmitDetails xmitd[CONNIOVS];
505 /*----- general operational variables -----*/
507 /* main initialises */
508 static oop_source *loop;
509 static ConnList conns;
510 static ArticleList queue;
511 static char *path_lock, *path_flushing, *path_defer;
512 static char *path_control, *path_dump;
513 static char *globpat_backlog;
514 static pid_t self_pid;
516 /* statemc_init initialises */
517 static StateMachineState sms;
518 static int until_flush;
519 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
522 /* initialisation to 0 is good */
523 static int until_connect, until_backlog_nextscan;
524 static double accept_proportion;
525 static int nocheck, nocheck_reported, in_child;
527 /* for simulation, debugging, etc. */
528 int simulate_flush= -1;
530 /*========== logging ==========*/
532 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
533 static void logcore(int sysloglevel, const char *fmt, ...) {
536 vsyslog(sysloglevel,fmt,al);
538 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
539 vfprintf(stderr,fmt,al);
/* Formats one log message and passes it to logcore.
 *   sysloglevel  syslog priority to log at
 *   pfx          text placed directly after the "<sitename>" tag
 *   errnoval     if >= 0, ": " and strerror(errnoval) are appended;
 *                if negative, nothing is appended
 *   fmt, al      printf-style format and its argument list
 * The formatted text is built in a fixed 256-byte buffer, so longer
 * messages are truncated. */
545 static void logv(int sysloglevel, const char *pfx, int errnoval,
546 const char *fmt, va_list al) PRINTF(5,0);
547 static void logv(int sysloglevel, const char *pfx, int errnoval,
548 const char *fmt, va_list al) {
549 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
550 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
551 msgbuf[sizeof(msgbuf)-1]= 0;
/* Permission errors are logged at exactly LOG_ERR whenever the requested
 * level compares >= LOG_ERR.
 * NOTE(review): syslog priorities are numerically smaller when more
 * severe, so this test raises warnings/notices to LOG_ERR and leaves
 * LOG_CRIT untouched - confirm that is the intent. */
553 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
554 sysloglevel= LOG_ERR; /* run by wrong user, probably */
556 logcore(sysloglevel, "<%s>%s: %s%s%s",
557 sitename, pfx, msgbuf,
558 errnoval>=0 ? ": " : "",
559 errnoval>=0 ? strerror(errnoval) : "");
562 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
563 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
564 static void fn(const char *fmt, ...) { \
567 logv(sysloglevel, pfx, err, fmt, al); \
571 #define logwrap(fn, pfx, sysloglevel, err) \
572 static void fn(const char *fmt, ...) PRINTF(1,2); \
573 static void fn(const char *fmt, ...) { \
575 logv(sysloglevel, pfx, err, fmt, al); \
579 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
580 diewrap(die, " critical", LOG_CRIT, -1, 16);
582 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
583 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
585 logwrap(syswarn, " warning", LOG_WARNING, errno);
586 logwrap(warn, " warning", LOG_WARNING, -1);
588 logwrap(notice, " notice", LOG_NOTICE, -1);
589 logwrap(info, " info", LOG_INFO, -1);
590 logwrap(debug, " debug", LOG_DEBUG, -1);
593 /*========== utility functions etc. ==========*/
595 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
596 static char *xvasprintf(const char *fmt, va_list al) {
598 int rc= vasprintf(&str,fmt,al);
599 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
602 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
603 static char *xasprintf(const char *fmt, ...) {
605 char *str= xvasprintf(fmt,al);
610 static int close_perhaps(int *fd) {
611 if (*fd <= 0) return 0;
616 static void xclose(int fd, const char *what, const char *what2) {
618 if (r) sysdie("close %s%s",what,what2?what2:"");
620 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
621 if (*fd <= 0) return;
622 xclose(*fd,what,what2);
626 static pid_t xfork(const char *what) {
630 if (child==-1) sysfatal("cannot fork for %s",what);
/* child is cast to unsigned long, so print it with %lu (as logcore does
 * for self_pid) rather than the mismatched %ld */
631 debug("forked %s %lu", what, (unsigned long)child);
632 if (!child) postfork();
/* Registers callback on the global event loop for both OOP_READ and
 * OOP_EXCEPTION events on fd.  The callback's user-data pointer is 0. */
636 static void on_fd_read_except(int fd, oop_call_fd callback) {
637 loop->on_fd(loop, fd, OOP_READ, callback, 0);
638 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
/* Cancels the OOP_READ and OOP_EXCEPTION registrations for fd on the
 * global event loop (the counterpart of on_fd_read_except). */
640 static void cancel_fd_read_except(int fd) {
641 loop->cancel_fd(loop, fd, OOP_READ);
642 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
645 static void report_child_status(const char *what, int status) {
646 if (WIFEXITED(status)) {
647 int es= WEXITSTATUS(status);
649 warn("%s: child died with error exit status %d", what, es);
650 } else if (WIFSIGNALED(status)) {
651 int sig= WTERMSIG(status);
652 const char *sigstr= strsignal(sig);
653 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
655 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
657 warn("%s: child died due to unknown fatal signal %d%s",
658 what, sig, coredump);
660 warn("%s: child died with unknown wait status %d", what,status);
664 static int xwaitpid(pid_t *pid, const char *what) {
667 int r= kill(*pid, SIGKILL);
668 if (r) sysdie("cannot kill %s child", what);
670 pid_t got= waitpid(*pid, &status, 0);
671 if (got==-1) sysdie("cannot reap %s child", what);
672 if (got==0) die("cannot reap %s child", what);
679 static void *zxmalloc(size_t sz) {
680 void *p= xmalloc(sz);
685 static void xunlink(const char *path, const char *what) {
687 if (r) sysdie("can't unlink %s %s", path, what);
690 static time_t xtime(void) {
692 if (now==-1) sysdie("time(2) failed");
/* Installs *sa as the action for signal signo.  The previous action is
 * not retrieved.  On failure calls sysdie (critical, does not return),
 * naming the signal via strsignal. */
696 static void xsigaction(int signo, const struct sigaction *sa) {
697 int r= sigaction(signo,sa,0);
698 if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
701 static void xsigsetdefault(int signo) {
703 memset(&sa,0,sizeof(sa));
704 sa.sa_handler= SIG_DFL;
705 xsigaction(signo,&sa);
/* Stores the current time in *tv_r using gettimeofday with no timezone
 * argument.  On failure calls sysdie (critical, does not return). */
708 static void xgettimeofday(struct timeval *tv_r) {
709 int r= gettimeofday(tv_r,0);
710 if (r) sysdie("gettimeofday(2) failed");
/* Sets or clears nonblocking mode on fd via oop_fd_nonblock.  A nonzero
 * result is treated as an errno value: it is copied into errno so that
 * sysdie (critical, does not return) reports it. */
713 static void xsetnonblock(int fd, int nonblocking) {
714 int errnoval= oop_fd_nonblock(fd, nonblocking);
715 if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
718 static void check_isreg(const struct stat *stab, const char *path,
720 if (!S_ISREG(stab->st_mode))
721 die("%s %s not a plain file (mode 0%lo)",
722 what, path, (unsigned long)stab->st_mode);
/* fstats fd into *stab_r.  On failure calls sysdie (critical, does not
 * return); what describes the file in the error message. */
725 static void xfstat(int fd, struct stat *stab_r, const char *what) {
726 int r= fstat(fd, stab_r);
727 if (r) sysdie("could not fstat %s", what);
/* fstats fd into *stab_r (via xfstat) and then hands the result to
 * check_isreg, which dies if the file is not a plain file.  path and
 * what are only used in the diagnostics. */
730 static void xfstat_isreg(int fd, struct stat *stab_r,
731 const char *path, const char *what) {
732 xfstat(fd, stab_r, what);
733 check_isreg(stab_r, path, what);
736 static void xlstat_isreg(const char *path, struct stat *stab,
737 int *enoent_r /* 0 means ENOENT is fatal */,
739 int r= lstat(path, stab);
741 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
742 sysdie("could not lstat %s %s", what, path);
744 if (enoent_r) *enoent_r= 0;
745 check_isreg(stab, path, what);
/* Returns nonzero iff the two stat results have the same inode number
 * and the same device.  Both must describe regular files; this is
 * enforced with assert. */
748 static int samefile(const struct stat *a, const struct stat *b) {
749 assert(S_ISREG(a->st_mode));
750 assert(S_ISREG(b->st_mode));
751 return (a->st_ino == b->st_ino &&
752 a->st_dev == b->st_dev);
755 static char *sanitise(const char *input, int len) {
756 static char sanibuf[100]; /* returns pointer to this buffer! */
758 const char *p= input;
759 const char *endp= len>=0 ? input+len : 0;
763 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
764 int c= (!endp || p<endp) ? *p++ : 0;
765 if (!c) { *q++= '\''; *q=0; break; }
766 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
767 sprintf(q,"\\x%02x",c);
/* Returns nonzero iff errnoval is EWOULDBLOCK or EAGAIN. */
773 static int isewouldblock(int errnoval) {
774 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
777 /*========== command and control connections ==========*/
779 static int control_master;
781 typedef struct ControlConn ControlConn;
783 void (*destroy)(ControlConn*);
789 struct sockaddr_un un;
794 static const oop_rd_style control_rd_style= {
795 OOP_RD_DELIM_STRIP, '\n',
797 OOP_RD_SHORTREC_FORBID
800 static void control_destroy(ControlConn *cc) {
804 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
805 if (ferror(cc->out) | fflush(cc->out)) {
806 info("CTRL%d write error %s", cc->fd, strerror(errno));
/* Writes the prompt "<sitename>| " to the control connection's output
 * stream and then calls control_checkouterr on it.  As the parameter
 * annotation says, cc may have been destroyed by the time this returns,
 * so callers must not use cc afterwards. */
811 static void control_prompt(ControlConn *cc /* may destroy*/) {
812 fprintf(cc->out, "%s| ", sitename);
813 control_checkouterr(cc);
816 struct ControlCommand {
818 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
819 const char *arg, size_t argsz);
824 static const ControlCommand control_commands[];
827 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
828 const char *arg, size_t argsz)
831 fputs("commands:\n", cc->out);
832 const ControlCommand *ccmd;
833 for (ccmd=control_commands; ccmd->cmd; ccmd++)
834 fprintf(cc->out, " %s\n", ccmd->cmd);
835 fputs("NB: permissible arguments are not shown above."
836 " Not all commands listed are safe. See innduct(8).\n", cc->out);
840 int ok= trigger_flush_ok();
841 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
846 notice("terminating (CTRL%d)",cc->fd);
847 raise_default(SIGTERM);
853 /* messing with our head: */
/* One-line control command handlers.  Inside each body cc is the control
 * connection, c the matching control_commands[] entry, and arg the text
 * after the command name.  control_rd_ok passes arg as a null pointer when
 * the command was typed with no argument. */
/* period: run period() immediately */
854 CCMD(period) { period(); }
/* setintarg: store the integer parsed from arg into the int that c->xdata
 * points at.  A missing argument is reported to the client instead of
 * handing a null pointer to atoi. */
855 CCMD(setintarg) { if (arg) *(int*)c->xdata= atoi(arg); else fputs("missing integer argument\n", cc->out); }
/* setint: store the table entry's fixed value c->xval into *c->xdata */
856 CCMD(setint) { *(int*)c->xdata= c->xval; }
/* setint_period: as setint, then run period() immediately */
857 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
859 static const ControlCommand control_commands[]= {
861 { "flush", ccmd_flush },
862 { "stop", ccmd_stop },
863 { "dump q", ccmd_dump, 0,0 },
864 { "dump a", ccmd_dump, 0,1 },
866 { "p", ccmd_period },
868 #define POKES(cmd,func) \
869 { cmd "flush", func, &until_flush, 1 }, \
870 { cmd "conn", func, &until_connect, 0 }, \
871 { cmd "blscan", func, &until_backlog_nextscan, 0 },
872 POKES("next ", ccmd_setint)
873 POKES("prod ", ccmd_setint_period)
875 { "pretend flush", ccmd_setintarg, &simulate_flush },
876 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
880 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
881 const char *errmsg, int errnoval,
882 const char *data, size_t recsz, void *cc_v) {
883 ControlConn *cc= cc_v;
886 info("CTRL%d closed", cc->fd);
891 if (recsz == 0) goto prompt;
893 const ControlCommand *ccmd;
894 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
895 int l= strlen(ccmd->cmd);
896 if (recsz < l) continue;
897 if (recsz > l && data[l] != ' ') continue;
898 if (memcmp(data, ccmd->cmd, l)) continue;
900 int argl= (int)recsz - (l+1);
901 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
905 fputs("unknown command; h for help\n", cc->out);
912 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
913 const char *errmsg, int errnoval,
914 const char *data, size_t recsz, void *cc_v) {
915 ControlConn *cc= cc_v;
917 info("CTRL%d read error %s", cc->fd, errmsg);
922 static int control_conn_startup(ControlConn *cc /* may destroy*/,
924 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
925 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
927 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
930 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
932 info("CTRL%d %s ready", cc->fd, how);
937 static void control_stdio_destroy(ControlConn *cc) {
939 oop_rd_cancel(cc->rd);
940 errno= oop_rd_delete_tidy(cc->rd);
941 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
946 static void control_stdio(void) {
947 NEW_DECL(ControlConn *,cc);
948 cc->destroy= control_stdio_destroy;
952 int r= control_conn_startup(cc,"stdio");
953 if (r) cc->destroy(cc);
956 static void control_accepted_destroy(ControlConn *cc) {
958 oop_rd_cancel(cc->rd);
959 oop_rd_delete_kill(cc->rd);
961 if (cc->out) { fclose(cc->out); cc->fd=0; }
962 close_perhaps(&cc->fd);
966 static void *control_master_readable(oop_source *lp, int master,
967 oop_event ev, void *u) {
968 NEW_DECL(ControlConn *,cc);
969 cc->destroy= control_accepted_destroy;
971 cc->salen= sizeof(cc->sa);
972 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
973 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
975 cc->out= fdopen(cc->fd, "w");
976 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
978 int r= control_conn_startup(cc, "accepted");
988 #define NOCONTROL(...) do{ \
989 syswarn("no control socket, because failed to " __VA_ARGS__); \
993 static void control_init(void) {
998 struct sockaddr_un un;
1001 memset(&sa,0,sizeof(sa));
1002 int maxlen= sizeof(sa.un.sun_path);
1004 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1006 if (errno != ENOENT)
1007 NOCONTROL("readlink control socket symlink path %s", path_control);
1009 if (reallen >= maxlen) {
1010 debug("control socket symlink path too long (r=%d)",reallen);
1011 xunlink(path_control, "old (overlong) control socket symlink");
1017 int r= lstat(realsockdir,&stab);
1019 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1021 r= mkdir(realsockdir, 0700);
1022 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1025 uid_t self= geteuid();
1026 if (!S_ISDIR(stab.st_mode) ||
1027 stab.st_uid != self ||
1028 stab.st_mode & 0007) {
1029 warn("no control socket, because real socket directory"
1030 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1031 !!S_ISDIR(stab.st_mode),
1032 (unsigned long)stab.st_uid, (unsigned long)self,
1033 (unsigned long)stab.st_mode & 0777UL);
1038 real= xasprintf("%s/s%lx.%lx", realsockdir,
1039 (unsigned long)xtime(), (unsigned long)self_pid);
1040 int reallen= strlen(real);
1042 if (reallen >= maxlen) {
1043 warn("no control socket, because tmpnam gave overly-long path"
1047 r= symlink(real, path_control);
1048 if (r) NOCONTROL("make control socket path %s a symlink to real"
1049 " socket path %s", path_control, real);
1050 memcpy(sa.un.sun_path, real, reallen);
1053 int r= unlink(sa.un.sun_path);
1054 if (r && errno!=ENOENT)
1055 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1057 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1058 if (control_master<0) NOCONTROL("create new control socket");
1060 sa.un.sun_family= AF_UNIX;
1061 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1062 r= bind(control_master, &sa.sa, sl);
1063 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1065 r= listen(control_master, 5);
1066 if (r) NOCONTROL("listen");
1068 xsetnonblock(control_master, 1);
1070 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1071 info("control socket ok, real path %s", sa.un.sun_path);
1077 xclose_perhaps(&control_master, "control master",0);
1081 /*========== management of connections ==========*/
/* Closes the connection's socket through close_perhaps.  If that reports
 * a failure, logs it at info level; msgprefix is inserted before
 * "error closing socket" in the message.  The failure is not fatal. */
1083 static void conn_closefd(Conn *conn, const char *msgprefix) {
1084 int r= close_perhaps(&conn->fd);
1085 if (r) info("C%d %serror closing socket: %s",
1086 conn->fd, msgprefix, strerror(errno));
1089 static void conn_dispose(Conn *conn) {
1092 oop_rd_cancel(conn->rd);
1093 oop_rd_delete_kill(conn->rd);
1097 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1098 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1100 conn_closefd(conn,"");
1102 until_connect= reconnect_delay_periods;
1105 static void *conn_exception(oop_source *lp, int fd,
1106 oop_event ev, void *conn_v) {
1109 assert(fd == conn->fd);
1110 assert(ev == OOP_EXCEPTION);
1111 int r= read(conn->fd, &ch, 1);
1112 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1113 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1114 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1115 return OOP_CONTINUE;
1118 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1119 int requeue[art_MaxState];
1120 memset(requeue,0,sizeof(requeue));
1123 while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
1124 while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art);
1125 while ((art= LIST_REMHEAD(conn->sent))) {
1126 requeue[art->state]++;
1127 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1128 LIST_ADDTAIL(queue,art);
1133 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1136 char *m= xvasprintf(fmt,al);
1137 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1138 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1141 LIST_REMOVE(conns,conn);
1143 check_assign_articles();
1146 static void connfail(Conn *conn, const char *fmt, ...) {
1149 vconnfail(conn,fmt,al);
1153 static void check_idle_conns(void) {
1156 conn->since_activity++;
1159 if (conn->since_activity <= need_activity_periods) continue;
1161 /* We need to shut this down */
1163 connfail(conn,"timed out waiting for response to QUIT");
1164 else if (conn->sent.count)
1165 connfail(conn,"timed out waiting for responses");
1166 else if (conn->waiting.count || conn->priority.count)
1167 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1168 else if (conn->xmitu)
1169 connfail(conn,"peer has been sending responses"
1170 " before receiving our commands!");
1172 static const char quitcmd[]= "QUIT\r\n";
1173 int todo= sizeof(quitcmd)-1;
1174 const char *p= quitcmd;
1176 int r= write(conn->fd, p, todo);
1178 if (isewouldblock(errno))
1179 connfail(conn, "blocked writing QUIT to idle connection");
1181 connfail(conn, "failed to write QUIT to idle connection: %s",
1189 conn->since_activity= 0;
1190 debug("C%d is idle, quitting", conn->fd);
1199 /*---------- making new connections ----------*/
1201 static pid_t connecting_child;
1202 static int connecting_fdpass_sock;
1204 static void connect_attempt_discard(void) {
1205 if (connecting_child) {
1206 int status= xwaitpid(&connecting_child, "connect");
1207 if (!(WIFEXITED(status) ||
1208 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1209 report_child_status("connect", status);
1211 if (connecting_fdpass_sock) {
1212 cancel_fd_read_except(connecting_fdpass_sock);
1213 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1217 #define PREP_DECL_MSG_CMSG(msg) \
1219 struct iovec msgiov; \
1220 msgiov.iov_base= &msgbyte; \
1221 msgiov.iov_len= 1; \
1222 struct msghdr msg; \
1223 memset(&msg,0,sizeof(msg)); \
1224 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1225 msg.msg_iov= &msgiov; \
1226 msg.msg_iovlen= 1; \
1227 msg.msg_control= msg##cbuf; \
1228 msg.msg_controllen= sizeof(msg##cbuf);
1230 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1233 assert(fd == connecting_fdpass_sock);
1235 PREP_DECL_MSG_CMSG(msg);
1237 ssize_t rs= recvmsg(fd, &msg, 0);
1239 if (isewouldblock(errno)) return OOP_CONTINUE;
1240 syswarn("failed to read socket from connecting child");
1245 LIST_INIT(conn->waiting);
1246 LIST_INIT(conn->priority);
1247 LIST_INIT(conn->sent);
1249 struct cmsghdr *h= 0;
1250 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1252 int status= xwaitpid(&connecting_child, "connect child (broken)");
1254 if (WIFEXITED(status)) {
1255 if (WEXITSTATUS(status) != 0 &&
1256 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1257 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1258 /* child already reported the problem */;
1260 if (e == OOP_EXCEPTION)
1261 warn("connect: connection child exited code %d but"
1262 " unexpected exception on fdpass socket",
1263 WEXITSTATUS(status));
1265 warn("connect: connection child exited code %d but"
1267 WEXITSTATUS(status), (int)rs);
1269 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1270 warn("connect: connection attempt timed out");
1272 report_child_status("connect", status);
1277 #define CHK(field, val) \
1278 if (h->cmsg_##field != val) { \
1279 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1280 h->cmsg_##field, val); \
1283 CHK(level, SOL_SOCKET);
1284 CHK(type, SCM_RIGHTS);
1285 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1288 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1290 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1293 pid_t got= waitpid(connecting_child, &status, 0);
1294 if (got==-1) sysdie("connect: real wait for child");
1295 assert(got == connecting_child);
1296 connecting_child= 0;
1298 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1299 int es= WEXITSTATUS(status);
1301 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1302 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1304 fatal("connect: child gave unexpected exit status %d", es);
1308 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1310 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1311 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1312 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1313 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1315 &peer_rd_err, conn);
1316 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1318 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1319 LIST_ADDHEAD(conns, conn);
1321 connect_attempt_discard();
1322 check_assign_articles();
1323 return OOP_CONTINUE;
1327 connect_attempt_discard();
1328 return OOP_CONTINUE;
/* Returns nonzero if a new connection attempt may be started now.
 * Conditions visible here: fewer than max_connections connections are
 * established, and no connecting child is currently running. */
1331 static int allow_connect_start(void) {
1332 return conns.count < max_connections
1333 && !connecting_child
/* Start an asynchronous connection attempt to the remote host.
 *
 * Must only be called when no attempt is in progress (asserted).
 * A socketpair is created and a child is forked.  The child connects
 * with NNTPconnect, authenticates with NNTPsendpassword, sends
 * MODE STREAM and inspects the reply, then passes the connected fd
 * to the parent over socks[1] as SCM_RIGHTS ancillary data.
 * exitstatus records whether streaming was accepted
 * (CONNCHILD_ESTATUS_STREAM) or not (CONNCHILD_ESTATUS_NOSTREAM).
 *
 * The parent closes its copy of socks[1], keeps socks[0] as
 * connecting_fdpass_sock (set nonblocking) and registers
 * connchild_event for read/exception events on it.
 *
 * If the socketpair cannot be created this only warns and returns.
 */
1337 static void connect_start(void) {
1338 assert(!connecting_child);
1339 assert(!connecting_fdpass_sock);
1341 info("starting connection attempt");
1344 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1345 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1347 connecting_child= xfork("connection");
/* xfork's result is zero in the child: everything in this branch
 * runs in the child process only. */
1349 if (!connecting_child) {
1350 FILE *cn_from, *cn_to;
1351 char buf[NNTP_STRLEN+100];
1352 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1354 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
/* Bound the whole setup; connchild_event reports SIGALRM death of
 * the child as a connection timeout. */
1356 alarm(connection_setup_timeout);
1357 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
/* Strip trailing whitespace from buf; `stripped' notes whether a
 * CR or LF was among it, which selects "rejected" vs "failed" below. */
1361 unsigned char c= buf[l-1];
1362 if (!isspace(c)) break;
1363 if (c=='\n' || c=='\r') stripped=1;
1367 sysfatal("connect: connection attempt failed");
1370 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1374 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1375 sysfatal("connect: authentication failed");
/* Negotiate streaming mode and read the one-line reply. */
1377 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1379 sysfatal("connect: could not send MODE STREAM");
1380 buf[sizeof(buf)-1]= 0;
1381 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1382 if (ferror(cn_from))
1383 sysfatal("connect: could not read response to MODE STREAM");
1385 fatal("connect: connection close in response to MODE STREAM");
1390 fatal("connect: response to MODE STREAM is too long: %.100s...",
/* Drop the line terminator (and a preceding CR, if any). */
1392 l--; if (l>0 && buf[l-1]=='\r') l--;
1395 int rcode= strtoul(buf,&ep,10);
1397 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1401 exitstatus= CONNCHILD_ESTATUS_STREAM;
1407 warn("connect: unexpected response to MODE STREAM: %.50s",
/* Hand the connected fd to the parent as an SCM_RIGHTS control
 * message; exactly one byte of ordinary data is expected to be sent
 * along with it (checked via r!=1 below). */
1413 int fd= fileno(cn_from);
1415 PREP_DECL_MSG_CMSG(msg);
1416 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1417 cmsg->cmsg_level= SOL_SOCKET;
1418 cmsg->cmsg_type= SCM_RIGHTS;
1419 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1420 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1422 msg.msg_controllen= cmsg->cmsg_len;
1423 r= sendmsg(socks[1], &msg, 0);
1424 if (r<0) sysdie("sendmsg failed for new connection");
1425 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
/* Parent side: keep only our end of the socketpair and wait for the
 * child's message via the event loop. */
1430 xclose(socks[1], "connecting fdpass child's socket",0);
1431 connecting_fdpass_sock= socks[0];
1432 xsetnonblock(connecting_fdpass_sock, 1);
1433 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1436 /*---------- assigning articles to conns, and transmitting ----------*/
/* Hand queued articles to connections.
 *
 * For each candidate connection, the number of articles it already
 * holds (sent + priority + waiting) is compared with its max_queue.
 * Quitting connections are skipped.  A connection that is busy but has
 * spare capacity is chosen immediately; otherwise the first idle one
 * seen is remembered.  The chosen connection gets the head of the
 * global queue appended to its waiting list and is then given a chance
 * to write.  If the other branch is taken and allow_connect_start()
 * permits, until_connect is set to reconnect_delay_periods.
 */
1438 static void check_assign_articles(void) {
1444 int spare=0, inqueue=0;
1446 /* Find a connection to offer this article. We prefer a busy
1447 * connection to an idle one, provided it's not full. We take the
1448 * first (oldest) and since that's stable, it will mean we fill up
1449 * connections in order. That way if we have too many
1450 * connections, the spare ones will go away eventually.
1453 if (walk->quitting) continue;
1454 inqueue= walk->sent.count + walk->priority.count
1455 + walk->waiting.count;
1456 spare= walk->max_queue - inqueue;
1457 assert(inqueue <= max_queue_per_conn);
1459 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1460 else if (spare>0) /*working*/ { use= walk; break; }
1463 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1465 Article *art= LIST_REMHEAD(queue);
1467 LIST_ADDTAIL(use->waiting, art);
1470 conn_maybe_write(use);
1471 } else if (allow_connect_start()) {
1472 until_connect= reconnect_delay_periods;
1481 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1482 conn_maybe_write(u);
1483 return OOP_CONTINUE;
/* Generate more transmissions for conn and try to write them.
 *
 * Calls conn_make_some_xmits to build output, then
 * conn_write_some_xmits.  If the latter returns OOP_CONTINUE (more to
 * write but the fd would block), conn_writeable is registered for
 * OOP_WRITE on the fd so we are called again.  OOP_HALT means the
 * write path has already dealt with a failure of the connection.
 * Any other result means everything was transmitted.
 */
1486 static void conn_maybe_write(Conn *conn) {
1488 conn_make_some_xmits(conn);
1490 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1494 void *rp= conn_write_some_xmits(conn);
1495 if (rp==OOP_CONTINUE) {
1496 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1498 } else if (rp==OOP_HALT) {
1501 /* transmitted everything */
1508 /*========== article transmission ==========*/
/* Claim the next free slot in conn's transmit arrays (conn->xmit[] and
 * the parallel conn->xmitd[]), advancing conn->xmitu, and point the
 * iovec at `data'.  The const is cast away only because iov_base is
 * not const-qualified.  Callers are expected to have checked that
 * there is room (see the CONNIOVS test in conn_make_some_xmits). */
1510 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1511 XmitKind kind) { /* caller must then fill in details */
1512 struct iovec *v= &conn->xmit[conn->xmitu];
1513 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1514 v->iov_base= (char*)data;
/* Queue `len' bytes at `data' for transmission, tagged xk_Const;
 * xmit_free does nothing for entries of that kind. */
1520 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1521 xmit_core(conn,data,len, xk_Const);
/* Queue a string literal, excluding its terminating nul.  Expects a
 * variable named `conn' to be in scope at the point of use. */
1523 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
/* Queue the retrieved article's data (ah->data, ah->len) for
 * transmission, tagged xk_Artdata.  The handle is consumed: entries
 * of this kind are released with SMfreearticle by xmit_free. */
1525 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1526 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
/* Release whatever resource backs a transmitted entry: article data
 * is freed via SMfreearticle, constant data needs nothing. */
1530 static void xmit_free(XmitDetails *d) {
1532 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1533 case xk_Const: break;
1538 static void *conn_write_some_xmits(Conn *conn) {
1540 * 0: nothing more to write, no need to call us again
1541 * OOP_CONTINUE: more to write but fd not writeable
1542 * OOP_HALT: disaster, have destroyed conn
1545 int count= conn->xmitu;
1546 if (!count) return 0;
1548 if (count > IOV_MAX) count= IOV_MAX;
1549 ssize_t rs= writev(conn->fd, conn->xmit, count);
1551 if (isewouldblock(errno)) return OOP_CONTINUE;
1552 connfail(conn, "write failed: %s", strerror(errno));
1558 for (done=0; rs && done<conn->xmitu; done++) {
1559 struct iovec *vp= &conn->xmit[done];
1560 XmitDetails *dp= &conn->xmitd[done];
1561 if (rs > vp->iov_len) {
1565 vp->iov_base= (char*)vp->iov_base + rs;
1569 int newu= conn->xmitu - done;
1570 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1571 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Turn articles queued on conn into protocol output.
 *
 * Articles are taken from conn->priority first, then conn->waiting.
 * An article that the peer has asked for (state >= art_Wanted), or
 * any article when streaming with nocheck in force, is actually sent:
 * its data is fetched with SMretrieve and queued (TAKETHIS plus
 * message-id plus body when streaming; just the body after an IHAVE
 * was accepted), and the article moves to conn->sent.  If SMretrieve
 * returns nothing, art->missing is set and the RC_missing counter is
 * bumped instead of RC_sent.  Otherwise the article is only offered,
 * with CHECK or IHAVE followed by its message-id, and moves to
 * conn->sent awaiting the peer's verdict.
 *
 * The xmitu+5 > CONNIOVS test guards against running out of iovec
 * slots before generating more output.
 */
1576 static void conn_make_some_xmits(Conn *conn) {
1578 if (conn->xmitu+5 > CONNIOVS)
1581 Article *art= LIST_REMHEAD(conn->priority);
1582 if (!art) art= LIST_REMHEAD(conn->waiting);
1585 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1586 /* actually send it */
1588 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1591 art->state == art_Unchecked ? art_Unsolicited :
1592 art->state == art_Wanted ? art_Wanted :
1595 if (!artdata) art->missing= 1;
1596 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1600 XMIT_LITERAL("TAKETHIS ");
1601 xmit_noalloc(conn, art->messageid, art->midlen);
1602 XMIT_LITERAL("\r\n");
1603 xmit_artbody(conn, artdata);
1605 article_done(conn, art, -1);
1609 /* we got 235 from IHAVE */
1611 xmit_artbody(conn, artdata);
1613 XMIT_LITERAL(".\r\n");
1617 LIST_ADDTAIL(conn->sent, art);
/* Not sending the body yet: just offer the article by message-id. */
1623 XMIT_LITERAL("CHECK ");
1625 XMIT_LITERAL("IHAVE ");
1626 xmit_noalloc(conn, art->messageid, art->midlen);
1627 XMIT_LITERAL("\r\n");
1629 assert(art->state == art_Unchecked);
1630 art->ipf->counts[art->state][RC_sent]++;
1631 LIST_ADDTAIL(conn->sent, art);
1637 /*========== handling responses from peer ==========*/
/* Record style used when reading replies from the peer (passed to
 * oop_rd_read for each connection): records are delimited by '\n',
 * the delimiter is stripped, and short records are forbidden. */
1639 static const oop_rd_style peer_rd_style= {
1640 OOP_RD_DELIM_STRIP, '\n',
1642 OOP_RD_SHORTREC_FORBID
/* Error callback for reads from the peer: fail the connection,
 * quoting the reader's error message, and let the event loop carry
 * on.  conn_v is the Conn that was registered with oop_rd_read. */
1645 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1646 const char *errmsg, int errnoval,
1647 const char *data, size_t recsz, void *conn_v) {
1649 connfail(conn, "error receiving from peer: %s", errmsg);
1650 return OOP_CONTINUE;
/* Validate a per-article reply from the peer against the article at
 * the head of conn->sent, and on success remove that article from the
 * list and hand it back.
 *
 *   response                 the raw reply line
 *   code_indicates_streaming nonzero if the reply code belongs to the
 *                            streaming commands (CHECK/TAKETHIS); such
 *                            replies carry a message-id after the code
 *   must_have_sent           1: body must already have been sent,
 *                            -1: must not have been, 0: don't care
 *   sanitised_response       printable form of the reply, for messages
 *
 * Checks made: a command is outstanding; the streaming-ness of the
 * code matches conn->stream; for streaming replies the message-id is
 * syntactically valid and equals the one we offered; and the
 * article's state is consistent with must_have_sent.  Every failure
 * is reported through connfail (callers treat a null result as
 * "connection already failed").
 */
1653 static Article *article_reply_check(Conn *conn, const char *response,
1654 int code_indicates_streaming,
1656 /* 1:yes, -1:no, 0:dontcare */,
1657 const char *sanitised_response) {
1658 Article *art= LIST_HEAD(conn->sent);
1662 "peer gave unexpected response when no commands outstanding: %s",
1663 sanitised_response);
1667 if (code_indicates_streaming) {
1668 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1669 if (!conn->stream) {
1670 connfail(conn, "peer gave streaming response code "
1671 " to IHAVE or subsequent body: %s", sanitised_response);
/* The message-id follows the 3-digit code and one space. */
1674 const char *got_mid= response+4;
1675 int got_midlen= strcspn(got_mid, " \n\r");
1676 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1677 connfail(conn, "peer gave streaming response with syntactically invalid"
1678 " messageid: %s", sanitised_response);
1681 if (got_midlen != art->midlen ||
1682 memcmp(got_mid, art->messageid, got_midlen)) {
1683 connfail(conn, "peer gave streaming response code to wrong article -"
1684 " probable synchronisation problem; we offered: %s;"
1686 art->messageid, sanitised_response);
1691 connfail(conn, "peer gave non-streaming response code to"
1692 " CHECK/TAKETHIS: %s", sanitised_response);
1697 if (must_have_sent>0 && art->state < art_Wanted) {
1698 connfail(conn, "peer says article accepted but"
1699 " we had not sent the body: %s", sanitised_response);
1702 if (must_have_sent<0 && art->state >= art_Wanted) {
1703 connfail(conn, "peer says please sent the article but we just did: %s",
1704 sanitised_response);
/* All checks passed: take the article off the sent list. */
1708 Article *art_again= LIST_REMHEAD(conn->sent);
1709 assert(art_again == art);
1713 static void update_nocheck(int accepted) {
1714 accept_proportion *= nocheck_decay;
1715 accept_proportion += accepted * (1.0 - nocheck_decay);
1716 int new_nocheck= accept_proportion >= nocheck_thresh;
1717 if (new_nocheck && !nocheck_reported) {
1718 notice("entering nocheck mode for the first time");
1719 nocheck_reported= 1;
1720 } else if (new_nocheck != nocheck) {
1721 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1723 nocheck= new_nocheck;
/* Finish with an article after its final outcome is known.
 *
 * whichcount selects the result counter to bump in the input file's
 * counts[state][...] table; this is skipped when art->missing is set
 * (conn_make_some_xmits passes -1 as whichcount for an article it
 * marked missing).  Accepted and unwanted outcomes also feed
 * update_nocheck.
 *
 * The article's record in its input file is then overwritten with
 * spaces using pwrite, in chunks of at most sizeof(spaces)-1 bytes,
 * retrying on EINTR; any other write failure is fatal.
 *
 * Finally, if the input file has nothing left in progress and is not
 * the main input file, a check for completed input files is queued.
 */
1726 static void article_done(Conn *conn, Article *art, int whichcount) {
1727 if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1729 if (whichcount == RC_accepted) update_nocheck(1);
1730 else if (whichcount == RC_unwanted) update_nocheck(0);
1732 InputFile *ipf= art->ipf;
1734 while (art->blanklen) {
1735 static const char spaces[]=
1745 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1746 int r= pwrite(ipf->fd, spaces, w, art->offset);
1748 if (errno==EINTR) continue;
1749 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1750 art->messageid, art->blanklen,
1751 (unsigned long)art->offset, ipf->path);
1753 assert(r>=0 && r<=w);
1759 assert(ipf->inprogress >= 0);
1762 if (!ipf->inprogress && ipf != main_input_file)
1763 queue_check_input_done();
/* Callback for each complete reply line read from the peer.
 *
 * EOF fails the connection.  Otherwise the line must start with a
 * 3-digit code (no leading zero) followed by a space.  If we are
 * quitting, only 205 or 503 is acceptable and the connection is then
 * removed from conns.  For everything else the code is dispatched:
 *
 *   435/438  peer already has it            -> counted unwanted
 *   235/239  peer accepted the article      -> counted accepted
 *   437/439  peer rejected the article      -> counted rejected
 *   238/335  peer wants the body            -> state art_Wanted, put on
 *                                              conn->priority
 *   431/436  try later                      -> written to the defer
 *                                              file, counted deferred
 *   400 and unknown codes                   -> connection failed
 *
 * One branch (whose case label is not among the visible lines) treats
 * the reply as a timeout by the peer: a failure if the connection is
 * busy, otherwise the idle connection is dropped from conns.
 *
 * After a per-article reply, more output is generated for this
 * connection and queued articles are reassigned.
 */
1766 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1767 const char *errmsg, int errnoval,
1768 const char *data, size_t recsz, void *conn_v) {
1771 if (ev == OOP_RD_EOF) {
1772 connfail(conn, "unexpected EOF from peer");
1773 return OOP_CONTINUE;
1775 assert(ev == OOP_RD_OK);
1777 char *sani= sanitise(data,-1);
1780 unsigned long code= strtoul(data, &ep, 10);
1781 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1782 connfail(conn, "badly formatted response from peer: %s", sani);
1783 return OOP_CONTINUE;
1787 conn->waiting.count ||
1788 conn->priority.count ||
1792 if (conn->quitting) {
1793 if (code!=205 && code!=503) {
1794 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1796 notice("C%d idle connection closed by us", conn->fd);
1798 LIST_REMOVE(conns,conn);
1801 return OOP_CONTINUE;
1804 conn->since_activity= 0;
/* GET_ARTICLE: fetch and validate the article this reply refers to;
 * returns from peer_rd_ok if the check failed the connection. */
1807 #define GET_ARTICLE(musthavesent) do{ \
1808 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1809 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
/* ARTICLE_DEALTWITH: final verdict on an article; count it as `how'. */
1812 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1813 code_streaming= (streaming); \
1814 GET_ARTICLE(musthavesent); \
1815 article_done(conn, art, RC_##how); \
/* PEERBADMSG: fail the connection with message m and return. */
1819 #define PEERBADMSG(m) do { \
1820 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1823 int code_streaming= 0;
1827 case 400: PEERBADMSG("peer stopped accepting articles");
1828 default: PEERBADMSG("peer sent unexpected message");
1831 if (conn_busy) PEERBADMSG("peer timed us out");
1832 notice("C%d idle connection closed by peer", conn->fd);
1833 LIST_REMOVE(conns,conn);
1835 return OOP_CONTINUE;
1837 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1838 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1840 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1841 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1843 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1844 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1846 case 238: /* CHECK says send it */
1848 case 335: /* IHAVE says send it */
1850 assert(art->state == art_Unchecked);
1851 art->ipf->counts[art->state][RC_accepted]++;
1852 art->state= art_Wanted;
1853 LIST_ADDTAIL(conn->priority, art);
1856 case 431: /* CHECK or TAKETHIS says try later */
1858 case 436: /* IHAVE says try later */
1861 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1863 sysfatal("write to defer file %s",path_defer);
1864 article_done(conn, art, RC_deferred);
1870 conn_maybe_write(conn);
1871 check_assign_articles();
1872 return OOP_CONTINUE;
1876 /*========== monitoring of input files ==========*/
/* Handle end-of-file on an input file other than the main one.
 * Reading from the file is stopped.  For the flushing file (only
 * legal in states SEPARATED or DROPPING) reading of the main input
 * file, if there is one, is started and completion of the flush is
 * checked; for the backlog file, completion of the backlog is
 * checked.  Any other file here is a bug. */
1878 static void feedfile_eof(InputFile *ipf) {
1879 assert(ipf != main_input_file); /* promised by tailing_try_read */
1880 inputfile_reading_stop(ipf);
1882 if (ipf == flushing_input_file) {
1883 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1884 if (main_input_file) inputfile_reading_start(main_input_file);
1885 statemc_check_flushing_done();
1886 } else if (ipf == backlog_input_file) {
1887 statemc_check_backlog_done();
1889 abort(); /* supposed to wait rather than get EOF on main input file */
/* Open `path' read-write as an input file and allocate a zeroed
 * InputFile for it, with a copy of the path stored in the trailing
 * ipf->path storage (hence the extra strlen(path)+1 bytes).
 * Returns 0 if the file does not exist; any other open failure is
 * fatal. */
1893 static InputFile *open_input_file(const char *path) {
1894 int fd= open(path, O_RDWR);
1896 if (errno==ENOENT) return 0;
1897 sysfatal("unable to open input file %s", path);
1901 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1902 memset(ipf,0,sizeof(*ipf));
1905 strcpy(ipf->path, path);
/* Close an input file's fd.  The InputFile itself is not freed; that
 * is the caller's job.  Preconditions (asserted): reading has been
 * stopped and cancelled, and no articles from this file are still in
 * progress. */
1910 static void close_input_file(InputFile *ipf) { /* does not free */
1911 assert(!ipf->readable_callback); /* must have had ->on_cancel */
1912 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1913 assert(!ipf->rd); /* must have had inputfile_reading_stop */
1914 assert(!ipf->inprogress); /* no dangling pointers pointing here */
1915 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1919 /*---------- dealing with articles read in the input file ----------*/
/* Report a corrupt record in an input file and count it.
 * `offset' is where the record started, `data' the record text and
 * `how' a short description of the problem.  Dies if the error count
 * exceeds max_bad_data_initial plus the number of good-or-blank
 * records divided by max_bad_data_ratio.  Returns OOP_CONTINUE so it
 * can be used directly as a reader callback's return value. */
1921 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1922 const char *data, const char *how) {
1923 warn("corrupted file: %s, offset %lu: %s: in %s",
1924 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1925 ipf->readcount_err++;
1926 if (ipf->readcount_err > max_bad_data_initial +
1927 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1928 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
1929 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1930 return OOP_CONTINUE;
/* Error callback for reading an input file.  Only system errors are
 * expected here (asserted); they are fatal and reported with the
 * file's path and current offset. */
1933 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1934 oop_rd_event ev, const char *errmsg,
1935 int errnoval, const char *data, size_t recsz,
1937 InputFile *ipf= ipf_v;
1938 assert(ev == OOP_RD_SYSTEM);
1940 sysdie("error reading input file: %s, offset %lu",
1941 ipf->path, (unsigned long)ipf->offset);
1944 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1945 oop_rd_event ev, const char *errmsg,
1946 int errnoval, const char *data, size_t recsz,
1948 InputFile *ipf= ipf_v;
1950 char tokentextbuf[sizeof(TOKEN)*2+3];
1952 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1954 off_t old_offset= ipf->offset;
1955 ipf->offset += recsz + !!(ev == OOP_RD_OK);
1957 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
1959 if (ev==OOP_RD_PARTREC)
1960 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
1961 /* but process it anyway */
1963 if (ipf->skippinglong) {
1964 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
1965 return OOP_CONTINUE;
1967 if (ev==OOP_RD_LONG) {
1968 ipf->skippinglong= 1;
1969 X_BAD_DATA("overly long line");
1972 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
1973 if (!recsz) X_BAD_DATA("empty line");
1976 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
1977 ipf->readcount_blank++;
1978 return OOP_CONTINUE;
1981 char *space= strchr(data,' ');
1982 int tokenlen= space-data;
1983 int midlen= (int)recsz-tokenlen-1;
1984 if (midlen <= 2) X_BAD_DATA("no room for messageid");
1985 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
1987 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
1988 memcpy(tokentextbuf, data, tokenlen);
1989 tokentextbuf[tokenlen]= 0;
1990 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
1992 ipf->readcount_ok++;
1994 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
1995 memset(art,0,sizeof(*art));
1996 art->state= art_Unchecked;
1997 art->midlen= midlen;
1998 art->ipf= ipf; ipf->inprogress++;
1999 art->token= TextToToken(tokentextbuf);
2000 art->offset= old_offset;
2001 art->blanklen= recsz;
2002 strcpy(art->messageid, space+1);
2003 LIST_ADDTAIL(queue, art);
2005 if (sms==sm_NORMAL && ipf==main_input_file &&
2006 ipf->offset >= target_max_feedfile_size)
2007 statemc_start_flush("feed file size");
2009 check_assign_articles();
2010 return OOP_CONTINUE;
2013 /*========== tailing input file ==========*/
2015 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2017 InputFile *ipf= user;
2018 return ipf->readable_callback(loop, &ipf->readable,
2019 ipf->readable_callback_user);
/* oop_readable on_cancel method for input files: stop file
 * monitoring if it is active, cancel any pending deferred readable
 * notification, and forget the registered callback.  The cast relies
 * on `rable' being the InputFile itself, i.e. on the readable being
 * its first member (TODO confirm against the struct definition). */
2022 static void tailing_on_cancel(struct oop_readable *rable) {
2023 InputFile *ipf= (void*)rable;
2025 if (ipf->filemon) filemon_stop(ipf);
2026 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2027 ipf->readable_callback= 0;
/* Arrange for the input file's readable callback to be invoked from
 * the event loop as soon as possible, via tailing_rable_call_time. */
2030 static void tailing_queue_readable(InputFile *ipf) {
2031 /* lifetime of ipf here is OK because destruction will cause
2032 * on_cancel which will cancel this callback */
2033 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
/* oop_readable on_readable method for input files: replace any
 * previous registration with callback `cb' and argument `user', and
 * queue an immediate readable notification so that data already
 * present in the file gets read. */
2036 static int tailing_on_readable(struct oop_readable *rable,
2037 oop_readable_call *cb, void *user) {
2038 InputFile *ipf= (void*)rable;
2040 tailing_on_cancel(rable);
2041 ipf->readable_callback= cb;
2042 ipf->readable_callback_user= user;
2045 tailing_queue_readable(ipf);
/* oop_readable try_read method for input files: read from the file's
 * fd, retrying on EINTR.  What a short or empty read means depends on
 * which input file this is (main, flushing or backlog); the flushing
 * file may only be read in states SEPARATED or DROPPING.  Where more
 * data may follow, another readable notification is queued. */
2049 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2051 InputFile *ipf= (void*)rable;
2053 ssize_t r= read(ipf->fd, buffer, length);
2055 if (errno==EINTR) continue;
2059 if (ipf==main_input_file) {
2062 } else if (ipf==flushing_input_file) {
2064 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2065 } else if (ipf==backlog_input_file) {
2071 tailing_queue_readable(ipf);
2076 /*---------- filemon implemented with inotify ----------*/
2078 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2079 #define HAVE_FILEMON
2081 #include <sys/inotify.h>
/* inotify state: the inotify fd (set by filemon_method_init), and a
 * table mapping watch descriptors to the InputFile being watched.
 * filemon_inotify_wdmax is the number of entries allocated in that
 * table; unused entries are null. */
2083 static int filemon_inotify_fd;
2084 static int filemon_inotify_wdmax;
2085 static InputFile **filemon_inotify_wd2ipf;
/* Per-file monitoring state for the inotify method. */
2087 struct Filemon_Perfile {
/* Start watching an input file for modification (IN_MODIFY) with
 * inotify.  The watch descriptor returned by the kernel indexes the
 * wd2ipf table, which is grown (and the new part zeroed) if the
 * descriptor is beyond its current size.  Failure to add the watch
 * is fatal. */
2091 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2092 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2093 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2095 if (wd >= filemon_inotify_wdmax) {
2097 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2098 sizeof(*filemon_inotify_wd2ipf) * newmax);
2099 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2100 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2101 filemon_inotify_wdmax= newmax;
/* Each watch descriptor must map to at most one input file. */
2104 assert(!filemon_inotify_wd2ipf[wd]);
2105 filemon_inotify_wd2ipf[wd]= ipf;
2107 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2108 ipf, wd, filemon_inotify_wdmax);
/* Stop watching an input file: remove its inotify watch (failure is
 * fatal) and clear its slot in the wd2ipf table. */
2113 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2115 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2116 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2117 if (r) sysdie("inotify_rm_watch");
2118 filemon_inotify_wd2ipf[wd]= 0;
/* Event-loop callback: the inotify fd is readable.  Events are read
 * one fixed-size struct inotify_event at a time until the read would
 * block; each is mapped through the wd2ipf table to its input file,
 * which is passed to filemon_callback.  A read of any other size, or
 * any other read error, is fatal. */
2121 static void *filemon_inotify_readable(oop_source *lp, int fd,
2122 oop_event e, void *u) {
2123 struct inotify_event iev;
2125 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2127 if (isewouldblock(errno)) break;
2128 sysdie("read from inotify master");
2129 } else if (r==sizeof(iev)) {
2130 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2132 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
/* The slot may be null if the watch has since been stopped;
 * filemon_callback tolerates a null ipf. */
2134 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2135 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2136 filemon_callback(ipf);
2138 return OOP_CONTINUE;
/* Initialise the inotify file monitoring method: create the inotify
 * fd, make it nonblocking and register filemon_inotify_readable for
 * read events on it.  If inotify_init fails this only warns. */
2141 static int filemon_method_init(void) {
2142 filemon_inotify_fd= inotify_init();
2143 if (filemon_inotify_fd<0) {
2144 syswarn("filemon/inotify: inotify_init failed");
2147 xsetnonblock(filemon_inotify_fd, 1);
2148 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2150 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2154 static void filemon_method_dump_info(FILE *f) {
2156 fprintf(f,"inotify");
2157 DUMPV("%d",,filemon_inotify_fd);
2158 DUMPV("%d",,filemon_inotify_wdmax);
2159 for (i=0; i<filemon_inotify_wdmax; i++)
2160 fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2163 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2165 /*---------- filemon dummy implementation ----------*/
2167 #if !defined(HAVE_FILEMON)
/* Fallback used when no real file monitoring method was compiled in:
 * initialisation just warns, and starting or stopping monitoring of a
 * file does nothing. */
2169 struct Filemon_Perfile { int dummy; };
2171 static int filemon_method_init(void) {
2172 warn("filemon/dummy: no filemon method compiled in");
2175 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2176 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2177 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2179 #endif /* !HAVE_FILEMON */
2181 /*---------- filemon generic interface ----------*/
/* Begin monitoring an input file using the compiled-in method.  The
 * file must not already be monitored (asserted). */
2183 static void filemon_start(InputFile *ipf) {
2184 assert(!ipf->filemon);
2187 filemon_method_startfile(ipf, ipf->filemon);
/* Stop monitoring an input file.  Does nothing if the file is not
 * currently monitored. */
2190 static void filemon_stop(InputFile *ipf) {
2191 if (!ipf->filemon) return;
2192 filemon_method_stopfile(ipf, ipf->filemon);
/* Called by a monitoring method when a file may have new data:
 * invoke the file's readable callback, if it has one.  Both a null
 * ipf and an unregistered callback are tolerated. */
2197 static void filemon_callback(InputFile *ipf) {
2198 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2199 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2202 /*---------- interface to start and stop an input file ----------*/
/* Record style used when reading input files (passed to oop_rd_read
 * in inputfile_reading_start): records are delimited by '\n' with the
 * delimiter stripped, and short records use OOP_RD_SHORTREC_LONG. */
2204 static const oop_rd_style feedfile_rdstyle= {
2205 OOP_RD_DELIM_STRIP, '\n',
2207 OOP_RD_SHORTREC_LONG,
/* Start reading records from an input file.  The file's embedded
 * oop_readable is filled in with the tailing_* methods, a reader is
 * created on it, and oop_rd_read is asked to deliver records of at
 * most MAX_LINE_FEEDFILE bytes to feedfile_got_article (errors go to
 * feedfile_read_err).  Failure to start reading is fatal. */
2210 static void inputfile_reading_start(InputFile *ipf) {
2212 ipf->readable.on_readable= tailing_on_readable;
2213 ipf->readable.on_cancel= tailing_on_cancel;
2214 ipf->readable.try_read= tailing_try_read;
2215 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2216 ipf->readable.delete_kill= 0;
2218 ipf->readable_callback= 0;
2219 ipf->readable_callback_user= 0;
2221 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2224 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2225 feedfile_got_article,ipf, feedfile_read_err, ipf);
2226 if (r) sysdie("unable start reading feedfile %s",ipf->path);
/* Stop reading from an input file: cancel and delete its reader.
 * By the time the reader is gone, file monitoring must no longer be
 * active (asserted). */
2229 static void inputfile_reading_stop(InputFile *ipf) {
2231 oop_rd_cancel(ipf->rd);
2232 oop_rd_delete(ipf->rd);
2234 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2238 /*========== interaction with innd - state machine ==========*/
2240 /* See official state diagram at top of file. We implement
2251 |`---------------------------------------------------.
2253 |`---------------- - - - |
2254 D ENOENT | D EXISTS see OVERALL STATES diagram |
2255 | for full startup logic |
2258 | ============ try to |
2264 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2265 ^ | hardlink F to D |
2268 | | our handle onto F is now onto D |
2271 | |<-------------------<---------------------<---------+
2273 | | spawn inndcomm flush |
2275 | ================== |
2276 | FLUSHING[-ABSENT] |
2278 | main D tail/none |
2279 | ================== |
2281 | | INNDCOMM FLUSH FAILS ^
2282 | |`----------------------->----------. |
2284 | | NO SUCH SITE V |
2285 ^ |`--------------->----. ==================== |
2286 | | \ FLUSHFAILED[-ABSENT] |
2288 | | FLUSH OK \ main D tail/none |
2289 | | open F \ ==================== |
2291 | | \ | TIME TO RETRY |
2292 | |`------->----. ,---<---'\ `----------------'
2293 | | D NONE | | D NONE `----.
2295 | ============= V V ============
2296 | SEPARATED-1 | | DROPPING-1
2297 | flsh->rd!=0 | | flsh->rd!=0
2298 | [Separated] | | [Dropping]
2299 | main F idle | | main none
2300 | flsh D tail | | flsh D tail
2301 | ============= | | ============
2303 ^ | EOF ON D | | defer | EOF ON D
2305 | =============== | | ===============
2306 | SEPARATED-2 | | DROPPING-2
2307 | flsh->rd==0 | V flsh->rd==0
2308 | [Finishing] | | [Dropping]
2309 | main F tail | `. main none
2310 | flsh D closed | `. flsh D closed
2311 | =============== V `. ===============
2313 | | ALL D PROCESSED `. | ALL D PROCESSED
2314 | V install defer as backlog `. | install defer
2315 ^ | close D `. | close D
2316 | | unlink D `. | unlink D
2319 `----------' ==============
/* Startup helper: start reading from f.  There must be no main input
 * file yet (asserted). */
2339 static void startup_set_input_file(InputFile *f) {
2340 assert(!main_input_file);
2342 inputfile_reading_start(f);
/* Acquire the lock that prevents multiple ducts for one site.
 *
 * The lockfile is opened (created if necessary) and locked with
 * fcntl F_SETLK.  If someone else holds the lock we exit quietly when
 * quiet_multiple is set, and otherwise fail.  After locking, the open
 * file is compared with whatever is now at path_lock: if the name no
 * longer exists or refers to a different file, our lock is on a stale
 * file, so it is closed (presumably to try again).
 *
 * Once the lock is held, the lockfile is truncated and our pid, site
 * name, feed file and remote host are written to it.
 */
2345 static void statemc_lock(void) {
2347 struct stat stab, stabf;
2350 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2351 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2354 memset(&fl,0,sizeof(fl));
2356 fl.l_whence= SEEK_SET;
2357 int r= fcntl(lockfd, F_SETLK, &fl);
2359 if (errno==EACCES || isewouldblock(errno)) {
2360 if (quiet_multiple) exit(0);
2361 fatal("another duct holds the lockfile");
2363 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
/* Check that the file we locked is still the one named path_lock. */
2366 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2368 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2370 if (!lock_noent && samefile(&stab, &stabf))
2373 xclose(lockfd, "stale lockfile ", path_lock);
/* Record who holds the lock. */
2376 FILE *lockfile= fdopen(lockfd, "w");
2377 if (!lockfile) sysdie("fdopen lockfile");
2379 int r= ftruncate(lockfd, 0);
2380 if (r) sysdie("truncate lockfile to write new info");
2382 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2383 (unsigned long)self_pid,
2384 sitename, feedfile, remote_host) == EOF ||
2386 sysfatal("write info to lockfile %s", path_lock);
2388 debug("startup: locked");
2391 static void statemc_init(void) {
2392 struct stat stabdefer;
2394 search_backlog_file();
2397 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2399 debug("startup: ductdefer ENOENT");
2401 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2402 switch (stabdefer.st_nlink==1) {
2404 open_defer(); /* so that we will later close it and rename it */
2407 xunlink(path_defer, "stale defer file link"
2408 " (presumably hardlink to backlog file)");
2411 die("defer file %s has unexpected link count %d",
2412 path_defer, stabdefer.st_nlink);
2416 struct stat stab_f, stab_d;
2419 InputFile *file_d= open_input_file(path_flushing);
2420 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2422 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2424 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2425 debug("startup: F==D => Hardlinked");
2426 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2431 debug("startup: F ENOENT => Moved");
2432 if (file_d) startup_set_input_file(file_d);
2433 spawn_inndcomm_flush("feedfile missing at startup");
2434 /* => Flushing, sms:=FLUSHING */
2437 debug("startup: F!=D => Separated");
2438 startup_set_input_file(file_d);
2439 flushing_input_file= main_input_file;
2440 main_input_file= open_input_file(feedfile);
2441 if (!main_input_file) die("feedfile vanished during startup");
2442 SMS(SEPARATED, 0, "found both old and current feed files");
2444 debug("startup: F exists, D ENOENT => Normal");
2445 InputFile *file_f= open_input_file(feedfile);
2446 if (!file_f) die("feed file vanished during startup");
2447 startup_set_input_file(file_f);
2448 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Begin a flush from the NORMAL state (asserted): hardlink the feed
 * file to the flushing file name, remove the feed file's own name,
 * and ask innd to flush.  `why' is a short reason used for logging.
 * Failure to create the link is fatal. */
2453 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2454 assert(sms == sm_NORMAL);
2456 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2458 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2459 (unsigned long)target_max_feedfile_size,
2462 int r= link(feedfile, path_flushing);
2463 if (r) sysfatal("link feedfile %s to flushing file %s",
2464 feedfile, path_flushing);
2467 xunlink(feedfile, "old feedfile link");
2470 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Try to get a flush going, depending on the current state: either
 * start a periodic flush, or (in FLUSHFAILED) retry the flush request
 * to innd.  Per the signature comment, returns 1 if a flush was
 * started and 0 otherwise. */
2473 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2476 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2478 case sm_FLUSHFAILED:
2479 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
/* Periodic tick for the state machine's flush timer.  Does nothing
 * while until_flush is zero; otherwise, once the countdown reaches
 * zero, tries to trigger a flush. */
2486 static void statemc_period_poll(void) {
2487 if (!until_flush) return;
2489 assert(until_flush>=0);
2491 if (until_flush) return;
2492 int ok= trigger_flush_ok();
/* Tells whether an input file has been completely dealt with.  It is
 * not done while any of its articles are still in progress, or while
 * its reader still exists (EOF not yet seen). */
2496 static int inputfile_is_done(InputFile *ipf) {
2498 if (ipf->inprogress) return 0; /* new article in the meantime */
2499 if (ipf->rd) return 0; /* not had EOF */
/* Log a one-line summary of what has happened to an input file:
 * records read (plus blank and erroneous ones), articles offered and
 * accepted (split into checked "ch" and unchecked "nc"), followed by
 * the per-result counters generated through RESULT_COUNTS.
 *
 *   completed  nonzero: log as "completed"; zero: log as "processed"
 *              and include the number of articles still in progress
 *   what,spec  two strings identifying the file in the message
 *
 * A null ipf is silently ignored.
 */
2503 static void notice_processed(InputFile *ipf, int completed,
2504 const char *what, const char *spec) {
2505 if (!ipf) return; /* allows preterminate to be lazy */
2507 #define RCI_NOTHING(x) /* nothing */
2508 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2509 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
/* CNT: one counter, by article state and result. */
2511 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2513 char *inprog= completed
2514 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2515 : xasprintf(" inprogress=%ld", ipf->inprogress);
2517 info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2518 " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2519 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2521 completed?"completed":"processed", what, spec,
2522 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2523 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2524 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2525 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2526 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2527 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
2535 static void statemc_check_backlog_done(void) {
2536 InputFile *ipf= backlog_input_file;
2537 if (!inputfile_is_done(ipf)) return;
2539 const char *slash= strrchr(ipf->path, '/');
2540 const char *leaf= slash ? slash+1 : ipf->path;
2541 const char *under= strchr(slash, '_');
2542 const char *rest= under ? under+1 : leaf;
2543 if (!strncmp(rest,"backlog",7)) rest += 7;
2544 notice_processed(ipf,1,"backlog ",rest);
2546 close_input_file(ipf);
2547 if (unlink(ipf->path)) {
2548 if (errno != ENOENT)
2549 sysdie("could not unlink processed backlog file %s", ipf->path);
2550 warn("backlog file %s vanished while we were reading it"
2551 " so we couldn't remove it (but it's done now, anyway)",
2555 backlog_input_file= 0;
2556 search_backlog_file();
/* If the flushing file has been completely processed, finish the
 * flush: log a summary, delete the flushing file, close and free its
 * InputFile, and move the state machine on - from SEPARATED to
 * NORMAL, or from DROPPING to DROPPED (in which case backlog files
 * are searched for).  Does nothing while the file is not yet done. */
2560 static void statemc_check_flushing_done(void) {
2561 InputFile *ipf= flushing_input_file;
2562 if (!inputfile_is_done(ipf)) return;
2564 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2566 notice_processed(ipf,1,"feedfile","");
2570 xunlink(path_flushing, "old flushing file");
2572 close_input_file(flushing_input_file);
2573 free(flushing_input_file);
2574 flushing_input_file= 0;
2576 if (sms==sm_SEPARATED) {
2577 notice("flush complete");
2578 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2579 } else if (sms==sm_DROPPING) {
2580 SMS(DROPPED, 0, "old flush complete");
2581 search_backlog_file();
2582 notice("feed dropped, but will continue until backlog is finished");
/* Event-loop timer callback (scheduled by queue_check_input_done).
 * Asserts that the main input file is not "done", then checks whether
 * the flushing and backlog input files have completed.  Always returns
 * OOP_CONTINUE. */
2586 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2588 assert(!inputfile_is_done(main_input_file));
2589 statemc_check_flushing_done();
2590 statemc_check_backlog_done();
2591 return OOP_CONTINUE;
2594 static void queue_check_input_done(void) {
2595 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
/* Record a state-machine transition and log it.
 *   newsms  - the state being entered
 *   periods - new value for the until_flush countdown; also shown in
 *             the log line in the "[%d]" form
 *   forlog  - state name as it should appear in the log
 *   why     - human-readable reason for the transition
 * NOTE(review): the lines that store newsms and select between the two
 * info() forms are not visible in this excerpt. */
2598 static void statemc_setstate(StateMachineState newsms, int periods,
2599 const char *forlog, const char *why) {
2601 until_flush= periods;
/* xtra is an optional suffix appended to the state name in the log */
2603 const char *xtra= "";
2606 case sm_FLUSHFAILED:
2607 if (!main_input_file) xtra= "-ABSENT";
/* "-1" while the flushing file still has a reader, "-2" once it does not */
2611 xtra= flushing_input_file->rd ? "-1" : "-2";
2617 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2619 info("state %s%s %s",forlog,xtra,why);
2623 /*---------- defer and backlog files ----------*/
/* Open (creating if necessary) the defer file at path_defer in append
 * mode, storing the stream in the global `defer'.  If the file already
 * has content, any incomplete record at its end is truncated away and
 * the stream is positioned at the new end.  Every failure is fatal. */
2625 static void open_defer(void) {
2630 defer= fopen(path_defer, "a+");
2631 if (!defer) sysfatal("could not open defer file %s", path_defer);
2633 /* truncate away any half-written records */
2635 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
/* sizes are handled as long below, so refuse anything that won't fit */
2637 if (stab.st_size > LONG_MAX)
2638 die("defer file %s size is far too large", path_defer);
/* orgsize is kept for the log messages; truncto is the candidate new
 * length, which the scan below reduces */
2643 long orgsize= stab.st_size;
2644 long truncto= stab.st_size;
2646 if (!truncto) break; /* was only (if anything) one half-truncated record */
/* examine the byte just before the candidate end of file */
2647 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2648 sysdie("seek in defer file %s while truncating partial", path_defer);
2653 sysdie("failed read from defer file %s", path_defer);
2655 die("defer file %s shrank while we were checking it!", path_defer);
/* only touch the file if the scan actually found a partial record */
2661 if (stab.st_size != truncto) {
2662 warn("truncating half-record at end of defer file %s -"
2663 " shrinking by %ld bytes from %ld to %ld",
2664 path_defer, orgsize - truncto, orgsize, truncto);
2667 sysfatal("could not flush defer file %s", path_defer);
2668 if (ftruncate(fileno(defer), truncto))
2669 sysdie("could not truncate defer file %s", path_defer);
2672 info("continuing existing defer file %s (%ld bytes)",
2673 path_defer, orgsize);
/* leave the stream positioned at the (possibly new) end of the file */
2675 if (fseek(defer, truncto, SEEK_SET))
2676 sysdie("could not seek to new end of defer file %s", path_defer);
/* Close the defer file and publish it as a backlog file: the file is
 * hard-linked to a new "<feedfile>_backlog_..." name and the old defer
 * name is then unlinked.  Afterwards makes sure a backlog scan will
 * happen within backlog_retry_minperiods+1 periods.
 * NOTE(review): the file's header comment describes backlog names as
 * "site.name_backlog.<date>.<inum>", whereas the format used here puts
 * '_' rather than '.' after "backlog" - confirm which is intended (both
 * match globpat_backlog). */
2679 static void close_defer(void) {
/* stat before closing: the inode number becomes part of the new name */
2684 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2686 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2689 time_t now= xtime();
2691 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2693 (unsigned long)stab.st_ino);
/* link-then-unlink, so the data always has at least one name */
2694 if (link(path_defer, backlog))
2695 sysfatal("could not install defer file %s as backlog file %s",
2696 path_defer, backlog);
2697 if (unlink(path_defer))
2698 sysdie("could not unlink old defer link %s to backlog file %s",
2699 path_defer, backlog);
/* a negative until_backlog_nextscan means no scan is scheduled; either
 * way, pull the next scan in to at most backlog_retry_minperiods+1 */
2703 if (until_backlog_nextscan < 0 ||
2704 until_backlog_nextscan > backlog_retry_minperiods + 1)
2705 until_backlog_nextscan= backlog_retry_minperiods + 1;
2708 static void poll_backlog_file(void) {
2709 if (until_backlog_nextscan < 0) return;
2710 if (until_backlog_nextscan-- > 0) return;
2711 search_backlog_file();
/* Scan for backlog files matching globpat_backlog and, if the oldest
 * one (by mtime) is old enough, open it as backlog_input_file and start
 * reading it.  Otherwise schedules the next scan via
 * until_backlog_nextscan.  Does nothing if a backlog file is already
 * being read.  When no files are found in state DROPPED, the feed's
 * control symlink and lockfile are removed. */
2714 static void search_backlog_file(void) {
2715 /* declared void: nothing is returned; results are left in the globals */
2720 const char *oldest_path=0;
2721 time_t oldest_mtime=0, now;
2723 if (backlog_input_file) return;
2727 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2731 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2733 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2735 for (i=0; i<gl.gl_pathc; i++) {
2736 const char *path= gl.gl_pathv[i];
/* names containing '#' or '~' are never treated as backlog files */
2738 if (strchr(path,'#') || strchr(path,'~')) {
2739 debug("backlog file search skipping %s", path);
2742 r= stat(path, &stab);
2744 syswarn("failed to stat backlog file %s", path);
2747 if (!S_ISREG(stab.st_mode)) {
2748 warn("backlog file %s is not a plain file (or link to one)", path);
/* remember the candidate with the smallest mtime */
2751 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2753 oldest_mtime= stab.st_mtime;
2756 case GLOB_NOMATCH: /* fall through */
2759 sysdie("glob expansion of backlog pattern %s gave unexpected"
2760 " nonzero (error?) return value %d", globpat_backlog, r);
2764 debug("backlog scan: none");
2766 if (sms==sm_DROPPED) {
2768 notice("feed dropped and our work is complete");
/* a missing control symlink is not worth a warning */
2770 int r= unlink(path_control);
2771 if (r && errno!=ENOENT)
2772 syswarn("failed to remove control symlink for old feed");
2774 xunlink(path_lock, "lockfile for old feed");
2777 until_backlog_nextscan= backlog_spontrescan_periods;
/* age_deficiency: how many more seconds the oldest file must age before
 * it reaches backlog_retry_minperiods periods old */
2782 double age= difftime(now, oldest_mtime);
2783 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2785 if (age_deficiency <= 0) {
2786 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2787 age, age_deficiency, oldest_path);
2789 backlog_input_file= open_input_file(oldest_path);
2790 if (!backlog_input_file) {
2791 warn("backlog file %s vanished as we opened it", oldest_path);
2795 inputfile_reading_start(backlog_input_file);
/* negative: no scan is scheduled while a backlog file is being read */
2796 until_backlog_nextscan= -1;
/* too young: come back when it should be old enough ... */
2800 until_backlog_nextscan= age_deficiency / period_seconds;
/* ... but no later than the spontaneous rescan interval, if one is set */
2802 if (backlog_spontrescan_periods >= 0 &&
2803 until_backlog_nextscan > backlog_spontrescan_periods)
2804 until_backlog_nextscan= backlog_spontrescan_periods;
2806 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2807 age, age_deficiency, until_backlog_nextscan, oldest_path);
2814 /*---------- shutdown and signal handling ----------*/
2816 static void preterminate(void) {
2817 if (in_child) return;
2818 notice_processed(main_input_file,0,"feedfile","");
2819 notice_processed(flushing_input_file,0,"flushing","");
2820 if (backlog_input_file)
2821 notice_processed(backlog_input_file,0, "backlog file ",
2822 backlog_input_file->path);
/* Self-pipe that turns asynchronous signals into event-loop events:
 * [0] is read by sigarrived_event, [1] is written by sigarrived_handler. */
static int signal_self_pipe[2];
/* Number of the first terminating signal received, or 0 if none yet.
 * It is written from the signal handler and read from normal code, so
 * it has to be volatile as well as sig_atomic_t. */
static volatile sig_atomic_t terminate_sig_flag;
/* Restore the default disposition for signo.
 * NOTE(review): the rest of the body is not visible in this excerpt;
 * judging by the name it then re-raises signo - confirm. */
2828 static void raise_default(int signo) {
2829 xsigsetdefault(signo);
2834 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2835 assert(fd=signal_self_pipe[0]);
2837 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2838 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2839 if (r==0) die("eof on signal self pipe");
2840 if (terminate_sig_flag) {
2842 notice("terminating (%s)", strsignal(terminate_sig_flag));
2843 raise_default(terminate_sig_flag);
2845 return OOP_CONTINUE;
/* Signal handler installed by init_signals.  Records the first signal
 * number seen in terminate_sig_flag and writes one byte to the self-pipe
 * so that sigarrived_event runs from the event loop. */
2848 static void sigarrived_handler(int signum) {
/* only the first signal is remembered; later ones still poke the pipe */
2853 if (!terminate_sig_flag) terminate_sig_flag= signum;
/* NOTE(review): the result of write() is not checked here - presumably
 * deliberate, since the write end is non-blocking and a full pipe already
 * guarantees a wakeup; confirm */
2858 write(signal_self_pipe[1],&x,1);
2861 static void init_signals(void) {
2862 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2863 sysdie("could not ignore SIGPIPE");
2865 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2867 xsetnonblock(signal_self_pipe[0],1);
2868 xsetnonblock(signal_self_pipe[1],1);
2870 struct sigaction sa;
2871 memset(&sa,0,sizeof(sa));
2872 sa.sa_handler= sigarrived_handler;
2873 sa.sa_flags= SA_RESTART;
2874 xsigaction(SIGTERM,&sa);
2875 xsigaction(SIGINT,&sa);
2877 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2880 /*========== flushing the feed ==========*/
/* pid of the child process currently talking to innd; asserted to be
 * zero before a new one is spawned */
2882 static pid_t inndcomm_child;
/* read end of the pipe whose closure signals that the child has gone;
 * 0 means "none" (spawn_inndcomm_flush asserts the real fd is nonzero) */
2883 static int inndcomm_sentinel_fd;
/* Event-loop callback: the sentinel pipe from the inndcomm child has
 * become readable, i.e. the child has finished.  Reaps the child, tears
 * down the sentinel fd, and moves the state machine on according to the
 * child's exit status:
 *   - "no such site": the feed has been dropped (DROPPING or DROPPED);
 *   - success: the old feed file becomes the flushing file and a fresh
 *     feed file is opened (SEPARATED or NORMAL);
 *   - anything else, including a SIGALRM timeout: FLUSHFAILED, to be
 *     retried after flushfail_retry_periods.
 * Always returns OOP_CONTINUE. */
2885 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2886 assert(inndcomm_child);
2887 assert(fd == inndcomm_sentinel_fd);
2888 int status= xwaitpid(&inndcomm_child, "inndcomm");
/* the sentinel has done its job; stop watching and close it */
2891 cancel_fd_read_except(fd);
2892 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2893 inndcomm_sentinel_fd= 0;
2895 assert(!flushing_input_file);
2897 if (WIFEXITED(status)) {
2898 switch (WEXITSTATUS(status)) {
2900 case INNDCOMMCHILD_ESTATUS_FAIL:
2903 case INNDCOMMCHILD_ESTATUS_NONESUCH:
2904 notice("feed has been dropped by innd, finishing up");
/* what was the main file is now the last thing left to flush */
2905 flushing_input_file= main_input_file;
2906 tailing_queue_readable(flushing_input_file);
2907 /* we probably previously returned EAGAIN from our fake read method
2908 * when in fact we were at EOF, so signal another readable event
2909 * so we actually see the EOF */
2913 if (flushing_input_file) {
2914 SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2917 SMS(DROPPED, 0, "feed dropped by innd");
2918 search_backlog_file();
2920 return OOP_CONTINUE;
/* successful flush: the old file becomes the flushing file ... */
2924 flushing_input_file= main_input_file;
2925 tailing_queue_readable(flushing_input_file);
/* ... and the feed file at the original name becomes the new main file */
2927 main_input_file= open_input_file(feedfile);
2928 if (!main_input_file)
2929 die("flush succeeded but feedfile %s does not exist!", feedfile);
2931 if (flushing_input_file) {
2932 SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2935 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2937 return OOP_CONTINUE;
2940 goto unexpected_exitstatus;
/* the child arms alarm() before talking to innd, so SIGALRM is a timeout */
2943 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2944 warn("flush timed out trying to talk to innd");
2947 unexpected_exitstatus:
2948 report_child_status("inndcomm child", status);
2952 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2953 return OOP_CONTINUE;
2956 static void inndcommfail(const char *what) {
2957 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
2958 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Fork a child which asks innd to flush this site's feed, and enter
 * state FLUSHING.  The parent keeps the read end of a sentinel pipe and
 * learns of the child's end (in inndcomm_event) when the write end is
 * closed by the child's exit.  `why' is used for logging only.  May only
 * be called in state NORMAL or FLUSHFAILED with no child outstanding. */
2961 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
2964 notice("flushing %s",why);
2966 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
2967 assert(!inndcomm_child);
2968 assert(!inndcomm_sentinel_fd);
2970 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
2972 inndcomm_child= xfork("inndcomm child");
/* ---- child ---- */
2974 if (!inndcomm_child) {
2975 const char *flushargv[2]= { sitename, 0 };
2979 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
2980 /* parent spots the autoclose of pipefds[1] when we die or exit */
/* testing aid: fake the child's fate.  Values above 128 mean "die of
 * signal (value-128)".
 * NOTE(review): if raise() returns, e.g. because the signal is ignored,
 * control falls through to the real flush below - confirm intended */
2982 if (simulate_flush>=0) {
2983 warn("SIMULATING flush child status %d", simulate_flush);
2984 if (simulate_flush>128) raise(simulate_flush-128);
2985 else exit(simulate_flush);
/* bound the whole conversation; the parent treats SIGALRM as a timeout */
2988 alarm(inndcomm_flush_timeout);
2989 r= ICCopen(); if (r) inndcommfail("connect");
2990 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
2991 if (!r) exit(0); /* yay! */
2993 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
/* NOTE(review): syswarn is used although this failure is reported by innd
 * in `reply' - if syswarn appends strerror(errno), that part of the
 * message is meaningless here; confirm */
2994 syswarn("innd ctlinnd flush failed: innd said %s", reply);
2995 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* ---- parent ---- */
3000 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3001 inndcomm_sentinel_fd= pipefds[0];
/* 0 doubles as "no sentinel", so the real fd must not be 0 */
3002 assert(inndcomm_sentinel_fd);
3003 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3005 SMS(FLUSHING, 0, why);
3008 /*========== main program ==========*/
/* In a freshly forked child: close the child's copy of an input file's
 * file descriptor.
 * NOTE(review): postfork passes flushing_input_file, which can be null;
 * a guard is presumably on a line not visible in this excerpt. */
3010 static void postfork_inputfile(InputFile *ipf) {
3012 xclose(ipf->fd, "(in child) input file ", ipf->path);
/* In a freshly forked child: close the child's copy of a stdio stream.
 *   f     - stream to close; null means there is nothing to do
 *   what  - description of the stream, for the error message
 *   what2 - optional second part of the description (e.g. a path);
 *           may be null
 * Failure to close is fatal. */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return;
  /* a null what2 must become "", not 0: passing a null pointer for %s
   * is undefined behaviour */
  if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:"");
}
/* Housekeeping to run in a child immediately after fork: restore default
 * signal dispositions, act on any termination signal that was already
 * pending in the parent, and close the child's copies of the input
 * files, connection fds and the defer file. */
3021 static void postfork(void) {
3024 xsigsetdefault(SIGTERM);
3025 xsigsetdefault(SIGINT);
3026 xsigsetdefault(SIGPIPE);
/* the parent had already been asked to terminate: the child should die
 * the same way now that the default dispositions are back */
3027 if (terminate_sig_flag) raise(terminate_sig_flag);
3029 postfork_inputfile(main_input_file);
3030 postfork_inputfile(flushing_input_file);
3034 conn_closefd(conn,"(in child) ");
3036 postfork_stdio(defer, "defer file ", path_defer);
/* A recurring timer registered with every().  Only `interval' is visible
 * in this excerpt; every() and every_happens() also use a `fixed_rate'
 * member. */
3039 typedef struct Every Every;
/* time between successive firings */
3041 struct timeval interval;
/* forward declaration: every_happens reschedules itself through this */
3046 static void every_schedule(Every *e, struct timeval base);
/* Event-loop timer callback for an Every: reschedules the timer one
 * interval later.  For a fixed-rate timer the next firing is measured
 * from `base' as passed in by the event loop; otherwise from the current
 * time.  Returns OOP_CONTINUE.
 * NOTE(review): the lines that recover e from e_v and invoke the
 * registered function are not visible in this excerpt. */
3048 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3051 if (!e->fixed_rate) xgettimeofday(&base);
3052 every_schedule(e, base);
3053 return OOP_CONTINUE;
3056 static void every_schedule(Every *e, struct timeval base) {
3057 struct timeval when;
3058 timeradd(&base, &e->interval, &when);
3059 loop->on_time(loop, when, every_happens, e);
/* Arrange for f to be called repeatedly from the event loop.
 *   interval   - seconds between calls
 *   fixed_rate - nonzero: each firing is scheduled relative to the
 *                previous firing's base time; zero: relative to the time
 *                at which the previous firing was handled
 *   f          - function to call
 * The first firing is one interval from now.  The Every record is
 * allocated here and no release is visible in this excerpt. */
3062 static void every(int interval, int fixed_rate, void (*f)(void)) {
3063 NEW_DECL(Every *,e);
3064 e->interval.tv_sec= interval;
3065 e->interval.tv_usec= 0;
3066 e->fixed_rate= fixed_rate;
3069 xgettimeofday(&now);
3070 every_schedule(e, now);
3073 static void filepoll(void) {
3074 filemon_callback(main_input_file);
3075 filemon_callback(flushing_input_file);
/* Build a one-line description of an input file for debug and dump
 * output.  Returns a string obtained from xasprintf; callers in this
 * file free() it.  A null ipf yields "none". */
3078 static char *debug_report_ipf(InputFile *ipf) {
3079 if (!ipf) return xasprintf("none");
/* report just the leafname, not the whole path */
3081 const char *slash= strrchr(ipf->path,'/');
3082 const char *path= slash ? slash+1 : ipf->path;
3084 return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s",
3086 ipf->inprogress, (long)ipf->offset,
/* optional markers: ",!rd" when there is no reader, "*skiplong" while
 * skippinglong is set */
3088 ipf->rd ? "" : ",!rd",
3089 ipf->skippinglong ? "*skiplong" : "");
/* Once-per-period housekeeping, registered with every() from main: emits
 * a debug summary of the state, counts down until_connect, polls for
 * backlog work, rotates the defer file when no backlog file is being
 * read, and then polls the state machine and assigns articles. */
3092 static void period(void) {
3093 char *dipf_main= debug_report_ipf(main_input_file);
3094 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3095 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3098 " sms=%s[%d] conns=%d queue=%d until_connect=%d"
3099 " input_files main:%s flushing:%s backlog:%s"
3100 " children connecting=%ld inndcomm=%ld"
3102 sms_names[sms], until_flush,
3103 conns.count, queue.count, until_connect,
3104 dipf_main, dipf_flushing, dipf_backlog,
3105 (long)connecting_child, (long)inndcomm_child
/* the description strings were allocated by debug_report_ipf */
3109 free(dipf_flushing);
/* until_connect counts down to zero and stays there */
3112 if (until_connect) until_connect--;
3114 poll_backlog_file();
3115 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3116 statemc_period_poll();
3117 check_assign_articles();
3122 /*========== dumping state ==========*/
/* Write a description of an article list to f as part of a state dump:
 * always the count, and - only when the control command's xval is set -
 * one line per article with its state, fields, token and message-id.
 * The caller is expected to have already written the line's leading
 * label, since the output here starts with " count=". */
3124 static void dump_article_list(FILE *f, const ControlCommand *c,
3125 const ArticleList *al) {
3126 fprintf(f, " count=%d\n", al->count);
/* the per-article detail is optional */
3127 if (!c->xval) return;
3129 int i; Article *art;
3130 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3131 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3132 DUMPV("%p", art->,ipf);
3133 DUMPV("%d", art->,missing);
3134 DUMPV("%lu", (unsigned long)art->,offset);
3135 DUMPV("%d", art->,blanklen);
3136 DUMPV("%d", art->,midlen);
3137 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
/* Write a description of one input file to f as part of a state dump:
 * its debug_report_ipf summary, its read counters, and one line of
 * result counts per article state.  `wh' labels which input file this is
 * ("main", "flushing", "backlog").
 * NOTE(review): callers pass input files that may be null; the handling
 * of that case is on lines not visible in this excerpt. */
3141 static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) {
3142 char *dipf= debug_report_ipf(ipf);
3143 fprintf(f,"input %s %s", wh, dipf);
3147 DUMPV("%d", ipf->,readcount_ok);
3148 DUMPV("%d", ipf->,readcount_blank);
3149 DUMPV("%d", ipf->,readcount_err);
/* artstate_names is walked up to its null terminator, in step with the
 * state index */
3153 ArtState state; const char *const *statename;
3154 for (state=0, statename=artstate_names; *statename; state++,statename++) {
/* helper macros expanded once per result-count kind by RESULT_COUNTS:
 * one builds the format string, the other the matching argument list */
3155 #define RC_DUMP_FMT(x) " " #x "=%d"
3156 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3157 fprintf(f,"input %s counts %-11s"
3158 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3160 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
/* Body of the state-dump control command (its header line is not visible
 * in this excerpt).  Writes a textual snapshot of the program's state to
 * path_dump: general and nocheck variables, special fds and children,
 * file monitoring, the three input files, every connection with its
 * article lists and pending transmit data, the global queue, and the
 * configured paths.  Progress and errors go to the control connection
 * (cc->out). */
3167 fprintf(cc->out, "dumping state to %s\n", path_dump);
3168 FILE *f= fopen(path_dump, "w");
3169 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3171 fprintf(f,"general");
3172 DUMPV("%s", sms_names,[sms]);
3173 DUMPV("%d", ,until_flush);
3174 DUMPV("%ld", (long),self_pid);
3175 DUMPV("%p", , defer);
3176 DUMPV("%d", , until_connect);
3177 DUMPV("%d", , until_backlog_nextscan);
3178 DUMPV("%d", , simulate_flush);
3179 fprintf(f,"\nnocheck");
3180 DUMPV("%#.10f", , accept_proportion);
3181 DUMPV("%d", , nocheck);
3182 DUMPV("%d", , nocheck_reported);
3185 fprintf(f,"special");
3186 DUMPV("%ld", (long),connecting_child);
3187 DUMPV("%d", , connecting_fdpass_sock);
3188 DUMPV("%d", , control_master);
3191 fprintf(f,"filemon ");
3192 filemon_method_dump_info(f);
3194 dump_input_file(f, main_input_file, "main" );
3195 dump_input_file(f, flushing_input_file, "flushing");
3196 dump_input_file(f, backlog_input_file, "backlog" );
3198 fprintf(f,"conns count=%d\n", conns.count);
/* per-connection section; every line is prefixed "C<fd>" */
3203 fprintf(f,"C%d",conn->fd);
3204 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3205 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3206 DUMPV("%d",conn->,since_activity);
3209 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3210 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3211 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
/* pending transmit buffers: xmit[] holds the iovecs, xmitd[] the
 * matching bookkeeping */
3213 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3214 for (i=0; i<conn->xmitu; i++) {
3215 const struct iovec *iv= &conn->xmit[i];
3216 const XmitDetails *xd= &conn->xmitd[i];
3220 case xk_Const: dinfo= xasprintf("Const"); break;
3221 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
/* NOTE(review): iv->iov_len is a size_t but is printed with %d - a
 * format/argument mismatch; should be cast or use a matching conversion */
3225 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3226 sanitise(iv->iov_base, iv->iov_len));
3231 fprintf(f,"queue"); dump_article_list(f,c,&queue);
3234 DUMPV("%s", , path_lock);
3235 DUMPV("%s", , path_flushing);
3236 DUMPV("%s", , path_defer);
3237 DUMPV("%s", , path_control);
3238 DUMPV("%s", , path_dump);
3239 DUMPV("%s", , globpat_backlog);
/* `+' rather than `||' so that fclose is always called, even when
 * ferror already reports a failure */
3242 if (!!ferror(f) + !!fclose(f)) {
3243 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3248 /*========== option parsing ==========*/
/* Report a command-line usage error, va_list form: formats the message,
 * prints it with a hint to stderr and logs it to syslog at LOG_CRIT.
 * Declared NORET_PRINTF, so it is not expected to return; the
 * terminating call is on a line not visible in this excerpt. */
3250 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3251 static void vbadusage(const char *fmt, va_list al) {
3252 char *m= xvasprintf(fmt,al);
3253 fprintf(stderr, "bad usage: %s\n"
3254 "say --help for help, or read the manpage\n",
3257 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3261 /*---------- generic option parser ----------*/
/* Report a command-line usage error, printf-style.  Declared
 * NORET_PRINTF, so it is not expected to return.  The body is not
 * visible in this excerpt; presumably it forwards to vbadusage - confirm. */
3263 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3264 static void badusage(const char *fmt, ...) {
/* Option flag bits (the enclosing enum's opening line is not visible in
 * this excerpt). */
3271 of_seconds= 001000u,
3272 of_boolean= 002000u,
/* Descriptor for one command-line option, and the type of the handler
 * function that processes its value. */
3275 typedef struct Option Option;
3276 typedef void OptionParser(const Option*, const char *val);
/* lng: long option name without the leading "--"; formarg: placeholder
 * name for the option's value as shown in help output, or null */
3280 const char *lng, *formarg;
/* Generic command-line option parser.  Walks the arguments after
 * (*argvp)[0], matching "--long[=value]" and "-x" options against the
 * `options' table (terminated by an entry with neither short nor long
 * name), and reports unknown options and missing or unexpected values
 * via badusage.  Parsing stops at the first argument that does not start
 * with '-', or after "--". */
3286 static void parse_options(const Option *options, char ***argvp) {
3287 /* on return *argvp is first non-option arg; argc is not updated */
3290 const char *arg= *++(*argvp);
3292 if (*arg != '-') break;
/* "--" ends option processing and is itself consumed */
3293 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3295 while ((a= *++arg)) {
/* long option: compare the text before any '=' with each table entry.
 * NOTE(review): strlen(o->lng) assumes every table entry has a long name */
3299 char *equals= strchr(arg,'=');
3300 int len= equals ? (equals - arg) : strlen(arg);
3301 for (o=options; o->shrt || o->lng; o++)
3302 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3304 badusage("unknown long option --%s",arg);
3307 if (equals) badusage("option --%s does not take a value",o->lng);
3309 } else if (equals) {
3313 if (!arg) badusage("option --%s needs a value for %s",
3314 o->lng, o->formarg);
3317 break; /* eaten the whole argument now */
/* short option: look up the single character a */
3319 for (o=options; o->shrt || o->lng; o++)
3322 badusage("unknown short option -%c",a);
3329 if (!arg) badusage("option -%c needs a value for %s",
3330 o->shrt, o->formarg);
3333 break; /* eaten the whole argument now */
/* Expands to TWO printf arguments for a "%s%s" pair: delim followed by
 * str when str is non-null, otherwise two empty strings.  Evaluates str
 * more than once, and deliberately has no enclosing parentheses because
 * it produces a comma-separated argument pair. */
3339 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
/* Print one help line per entry of `options' to f, in the form
 * " -x|--long FORMARG", omitting whichever parts the entry lacks. */
3341 static void print_options(const Option *options, FILE *f) {
3343 for (o=options; o->shrt || o->lng; o++) {
/* one-character string for the short name; empty when shrt is 0 */
3344 char shrt[2] = { o->shrt, 0 };
3345 char *optspec= xasprintf("%s%s%s%s%s",
3346 o->shrt ? "-" : "", shrt,
3347 o->shrt && o->lng ? "|" : "",
3348 DELIMPERHAPS("--", o->lng));
3349 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3354 /*---------- specific option types ----------*/
/* Option handler for non-negative decimal integers destined for an int.
 * Rejects (via badusage) empty input, trailing junk, conversion errors
 * and values above INT_MAX.
 * NOTE(review): the errno test relies on errno having been cleared
 * before strtoul; that and the final store through `store' are on lines
 * not visible in this excerpt. */
3356 static void op_integer(const Option *o, const char *val) {
3359 unsigned long ul= strtoul(val,&ep,10);
3360 if (*ep || ep==val || errno || ul>INT_MAX)
3361 badusage("bad integer value for %s",o->lng);
3362 int *store= o->store;
3366 static void op_double(const Option *o, const char *val) {
3367 int *store= o->store;
3370 *store= strtod(val, &ep);
3371 if (*ep || ep==val || errno)
3372 badusage("bad floating point value for %s",o->lng);
/* Option handler for string values; the target is a `const char *'
 * variable.  The assignment itself is on a line not visible in this
 * excerpt. */
3375 static void op_string(const Option *o, const char *val) {
3376 const char **store= o->store;
/* Option handler for times and durations destined for an int number of
 * seconds.  Accepts a number with an optional unit suffix: none, "s" or
 * "sec" (x1); "m"/"min" (x60); "h"/"hour" (x3600); "d"/"day" (x86400);
 * and the SI-style "das" (x10), "hs" (x100), "ks" (x1000), "Ms"
 * (x1000000).  Anything else, or a result above INT_MAX, is a usage
 * error.
 * NOTE(review): the scaling of v by unit and the final store are on
 * lines not visible in this excerpt. */
3380 static void op_seconds(const Option *o, const char *val) {
3381 int *store= o->store;
3385 double v= strtod(val,&ep);
3386 if (ep==val) badusage("bad time/duration value for %s",o->lng);
/* ep now points at the unit suffix, if any */
3388 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3389 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3390 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3391 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3392 else if (!strcmp(ep,"das")) unit= 10;
3393 else if (!strcmp(ep,"hs")) unit= 100;
3394 else if (!strcmp(ep,"ks")) unit= 1000;
3395 else if (!strcmp(ep,"Ms")) unit= 1000000;
3396 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3400 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
/* Option handler for valueless flags: sets an int variable.  In the
 * option table these entries carry a trailing constant (0 or 1), which
 * is presumably the value stored - the assignment is on a line not
 * visible in this excerpt; confirm. */
3404 static void op_setint(const Option *o, const char *val) {
3405 int *store= o->store;
3409 /*---------- specific options ----------*/
3411 static void help(const Option *o, const char *val);
3413 static const Option innduct_options[]= {
3414 {'f',"feedfile", "F", &feedfile, op_string },
3415 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3416 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3417 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3418 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3419 {'C',"inndconf", "F", &inndconffile, op_string },
3420 {'P',"port", "PORT", &port, op_integer },
3421 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3422 {0,"help", 0, 0, help },
3424 {0,"max-connections", "N", &max_connections, op_integer },
3425 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3426 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3427 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3429 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3430 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3431 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3433 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3434 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3436 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3437 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3438 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3439 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3440 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3441 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3443 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3444 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
3449 static void printusage(FILE *f) {
3450 fputs("usage: innduct [options] site [fqdn]\n"
3451 "available options are:\n", f);
3452 print_options(innduct_options, f);
/* Handler for --help.  Checks that writing the help text to stdout
 * succeeded and complains via perror if it did not.
 * NOTE(review): the lines that print the usage text and terminate the
 * program are not visible in this excerpt. */
3455 static void help(const Option *o, const char *val) {
3457 if (ferror(stdout) || fflush(stdout)) {
3458 perror("innduct: writing help");
3464 static void convert_to_periods_rndup(int *store) {
3465 *store += period_seconds-1;
3466 *store /= period_seconds;
/* Program entry point.  Parses options and the site/fqdn arguments,
 * reads inn.conf, validates and converts the tunables, derives all the
 * per-feed path names from the feed file name, creates the event loop,
 * optionally daemonises, sets up file monitoring and the periodic
 * timers, and finally runs the event loop.  The loop is not expected to
 * return normally. */
3469 int main(int argc, char **argv) {
3475 parse_options(innduct_options, &argv);
3480 if (!sitename) badusage("need site name argument");
3481 remote_host= *argv++;
3482 if (*argv) badusage("too many non-option arguments");
3486 int r= innconf_read(inndconffile);
3487 if (!r) badusage("could not read inn.conf (more info on stderr)");
/* the fqdn argument is optional and defaults to the site name */
3489 if (!remote_host) remote_host= sitename;
/* percentages given on the command line become fractions */
3491 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3492 badusage("nocheck threshold percentage must be between 0..100");
3493 nocheck_thresh *= 0.01;
/* the value given in articles is turned into a per-article factor
 * 0.5^(1/n) */
3495 if (nocheck_decay < 0.1)
3496 badusage("nocheck decay articles must be at least 0.1");
3497 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
/* these were parsed as seconds; from here on they count periods */
3499 convert_to_periods_rndup(&reconnect_delay_periods);
3500 convert_to_periods_rndup(&flushfail_retry_periods);
3501 convert_to_periods_rndup(&backlog_retry_minperiods);
3502 convert_to_periods_rndup(&backlog_spontrescan_periods);
3503 convert_to_periods_rndup(&spontaneous_flush_periods);
3504 convert_to_periods_rndup(&need_activity_periods);
3506 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3507 badusage("bad input data ratio must be between 0..100");
3508 max_bad_data_ratio *= 0.01;
/* feed file name: a default under pathoutgoing, or the site name
 * appended to a given directory (a name ending in '/') */
3511 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3512 } else if (!feedfile[0]) {
3513 badusage("feed filename must be nonempty");
3514 } else if (feedfile[strlen(feedfile)-1]=='/') {
3515 feedfile= xasprintf("%s%s",feedfile,sitename);
/* the name is embedded in a glob pattern below, and '#'/'~' mark files
 * that the backlog search skips, so none of these may appear in it */
3518 const char *feedfile_forbidden= "?*[~#";
3520 while ((c= *feedfile_forbidden++))
3521 if (strchr(feedfile, c))
3522 badusage("feed filename may not contain metacharacter %c",c);
3526 path_lock= xasprintf("%s_lock", feedfile);
3527 path_flushing= xasprintf("%s_flushing", feedfile);
3528 path_defer= xasprintf("%s_defer", feedfile);
3529 path_control= xasprintf("%s_control", feedfile);
3530 path_dump= xasprintf("%s_dump", feedfile);
3531 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3533 oop_source_sys *sysloop= oop_sys_new();
3534 if (!sysloop) sysdie("could not create liboop event loop");
3535 loop= (oop_source*)sysloop;
3540 if (become_daemon) {
3542 for (i=3; i<255; i++)
3543 /* do this now before we open syslog, etc. */
3545 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3547 int null= open("/dev/null",O_RDWR);
3548 if (null<0) sysfatal("failed to open /dev/null");
3552 xclose(null, "/dev/null original fd",0);
3554 pid_t child1= xfork("daemonise first fork");
3555 if (child1) _exit(0);
/* NOTE(review): only the child gets here, where child1 is 0.  setsid()
 * returns the new session ID (the caller's pid, never 0) on success and
 * -1 on failure, so `sid != child1' is always true and this reports
 * failure even when setsid succeeded.  The test should almost certainly
 * be `sid == -1'. */
3557 pid_t sid= setsid();
3558 if (sid != child1) sysfatal("setsid failed");
3560 pid_t child2= xfork("daemonise second fork");
3561 if (child2) _exit(0);
3565 if (self_pid==-1) sysdie("getpid");
3580 notice("filemon: suppressed by command line option, polling");
3582 filemon_ok= filemon_method_init();
3584 warn("filemon: no file monitoring available, polling");
/* polling fallback for the feed files (variable-rate timer) */
3587 every(filepoll_seconds,0,filepoll);
/* the main housekeeping tick (fixed-rate timer) */
3589 every(period_seconds,1,period);
3595 void *run= oop_sys_run(sysloop);
3596 assert(run == OOP_ERROR);
3597 sysdie("event loop failed");