3 * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
8 to ensure articles go away eventually
9 separate queue for each input file
11 every period, check head of backlog queue for expiry with SMretrieve
12 if too old: discard, and check next article
13 also check every backlog article as we read it
15 after too long in SEPARATED/DROPPING, i.e. Separated/Finishing/Dropping
16 one-off: eat queued articles from flushing and write them to defer
17 one-off: connfail all connections which have any articles from flushing
18 newly read articles from flushing go straight to defer
19 this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
22 limit number of queued articles for each ipf
23 pause/resume inputfile tailing
27 * Newsfeeds file entries should look like this:
28 * host.name.of.site[/exclude,exclude,...]\
29 * :pattern,pattern...[/distribution,distribution...]\
33 * sitename[/exclude,exclude,...]\
34 * :pattern,pattern...[/distribution,distribution...]\
40 * or might be blanked out
41 * <spc><spc><spc><spc>....
43 * F site.name main feed file
44 * opened/created, then written, by innd
47 * tokens blanked out by duct when processed
48 * site.name_lock lock preventing multiple ducts
49 * to hold lock must open,F_SETLK[W]
50 * and then stat to check that locked file
51 * still has name site.name_lock
52 * holder of this lock is "duct"
53 * (only) lockholder may remove the lockfile
54 * D site.name_flushing temporary feed file during flush (or crash)
55 * hardlink created by duct
57 * site.name_defer 431'd articles, still being written,
58 * created, written, used by duct
60 * site.name_backlog.<date>.<inum>
61 * 431'd articles, ready for innxmit or duct
62 * created (link/mv) by duct
63 * site.name_backlog<anything-else> (where <anything-else> does not
64 * contain '#' or '~') eg
65 * site.name_backlog.manual
66 * anything the sysadmin likes (eg, feed files
67 * from old feeds to be merged into this one)
68 * created (link/mv) by admin
69 * may be symlinks (in which case links
70 * may be written through, but only links
73 * It is safe to remove backlog files manually,
74 * if it's desired to throw away the backlog.
76 * Backlog files are also processed by innduct. We find the oldest
77 * backlog file which is at least a certain age, and feed it
78 * back into our processing. When every article in it has been read
79 * and processed, we unlink it and look for another backlog file.
81 * If we don't have a backlog file that we're reading, we close the
82 * defer file that we're writing and make it into a backlog file at
83 * the first convenient opportunity.
94 | | <----------------<---------------------------------'|
100 | F: innd writing, duct reading |
103 | | duct decides time to flush |
104 | | duct makes hardlink |
106 | V <------------------------'|
108 | F == D: innd writing, duct reading both exist |
111 | | <-----------<-------------<--'|
112 | | open D F ENOENT |
115 | V <---------------------. |
118 | D: innd writing, duct reading; or ENOENT | |
120 | | duct requests flush of feed | |
121 | | (others can too, harmlessly) | |
125 | D: innd flushing, duct; or ENOENT | |
127 | | inndcomm flush fails | |
128 | |`-------------------------->------------------' |
130 | | inndcomm reports no such site |
131 | |`---------------------------------------------------- | -.
133 | | innd finishes writing D, creates F | |
134 | | inndcomm reports flush successful | |
137 | Separated <----------------' |
138 | F: innd writing F!=D /
139 | D: duct reading; or ENOENT both exist /
141 | | duct gets to the end of D /
142 | | duct opens F too /
145 | F: innd writing, duct reading |
146 | D: duct finishing V
148 | | duct finishes processing D F: ENOENT
149 | V duct unlinks D D: duct reading
151 `--<--' | duct finishes
161 "duct reading" means innduct is reading the file but also
162 overwriting processed tokens.
166 * rune for printing diagrams:
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
173 /*============================== PROGRAM ==============================*/
175 #define _GNU_SOURCE 1
181 #include "inndcomm.h"
183 #include "inn/list.h"
184 #include "inn/innconf.h"
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
209 #include <oop-read.h>
211 /*----- general definitions, probably best not changed -----*/
213 #define CONNCHILD_ESTATUS_STREAM 24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
216 #define INNDCOMMCHILD_ESTATUS_FAIL 26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
222 #define VA va_list al; va_start(al,fmt)
223 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET __attribute__((__noreturn__))
227 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
232 #define FOR_CONN(conn) \
233 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
235 /*----- doubly linked lists -----*/
237 #define ISNODE(T) struct node list_node
240 union { struct list li; T *for_type; } u; \
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
246 #define LIST_CHECKCANHAVENODE(l,n) \
247 ((void)((n) == ((l).u.for_type))) /* just for the type check */
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
250 ( LIST_CHECKCANHAVENODE(l,n), \
251 list_addsomehow(&(l).u.li, NODE((n))), \
255 #define LIST_REMSOMEHOW(l,list_remsomehow) \
256 ( (typeof((l).u.for_type)) \
259 list_remsomehow(&(l).u.li) ) \
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
275 #define LIST_REMOVE(l,n) \
276 ( LIST_CHECKCANHAVENODE(l,n), \
277 list_remove(NODE((n))), \
281 #define LIST_INSERT(l,n,pred) \
282 ( LIST_CHECKCANHAVENODE(l,n), \
283 LIST_CHECKCANHAVENODE(l,pred), \
284 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
288 /*----- type predeclarations -----*/
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
301 /*----- function predeclarations -----*/
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
307 static void xmit_free(XmitDetails *d);
309 #define SMS(newstate, periods, why) \
310 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312 const char *forlog, const char *why);
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
318 static void article_done(Article *art, int whichcount);
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
326 static void postfork(void);
327 static void period(void);
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
358 static const char *inndconffile;
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int max_separated_periods=2000;
379 static int need_activity_periods=1000;
381 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
382 static int max_bad_data_initial= 30;
383 /* in one corrupt 4096-byte block the number of newlines has
384 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
387 /*----- statistics -----*/
389 typedef enum { /* in queue in conn->sent */
390 art_Unchecked, /* not checked, not sent checking */
391 art_Wanted, /* checked, wanted sent body as requested */
392 art_Unsolicited, /* - sent body without check */
396 static const char *const artstate_names[]=
397 { "Unchecked", "Wanted", "Unsolicited", 0 };
399 #define RESULT_COUNTS(RCS,RCN) \
408 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
409 #define RCI_TRIPLE_VALS_BASE(counts,x) \
410 counts[art_Unchecked] x \
411 + counts[art_Wanted] x \
412 + counts[art_Unsolicited] x, \
413 counts[art_Unchecked] x \
414 , counts[art_Wanted] x \
415 , counts[art_Unsolicited] x
418 #define RC_INDEX(x) RC_##x,
419 RESULT_COUNTS(RC_INDEX, RC_INDEX)
424 /*----- transmission buffers -----*/
440 /*----- core operational data structure types -----*/
443 /* This is also an instance of struct oop_readable */
444 struct oop_readable readable; /* first */
445 oop_readable_call *readable_callback;
446 void *readable_callback_user;
449 Filemon_Perfile *filemon;
451 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
456 long inprogress; /* includes queue.count and also articles in conns */
457 long autodefer; /* -1 means not doing autodefer */
459 int counts[art_MaxState][RCI_max];
460 int readcount_ok, readcount_blank, readcount_err;
475 #define SMS_LIST(X) \
483 enum StateMachineState {
484 #define SMS_DEF_ENUM(s) sm_##s,
485 SMS_LIST(SMS_DEF_ENUM)
488 static const char *sms_names[]= {
489 #define SMS_DEF_NAME(s) #s ,
490 SMS_LIST(SMS_DEF_NAME)
496 int fd; /* may be 0, meaning closed (during construction/destruction) */
497 oop_read *rd; /* likewise */
498 int max_queue, stream, quitting;
499 int since_activity; /* periods */
500 ArticleList waiting; /* not yet told peer */
501 ArticleList priority; /* peer says send it now */
502 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
503 struct iovec xmit[CONNIOVS];
504 XmitDetails xmitd[CONNIOVS];
509 /*----- general operational variables -----*/
511 /* main initialises */
512 static oop_source *loop;
513 static ConnList conns;
514 static char *path_lock, *path_flushing, *path_defer;
515 static char *path_control, *path_dump;
516 static char *globpat_backlog;
517 static pid_t self_pid;
519 /* statemc_init initialises */
520 static StateMachineState sms;
521 static int until_flush;
522 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
525 /* initialisation to 0 is good */
526 static int until_connect, until_backlog_nextscan;
527 static double accept_proportion;
528 static int nocheck, nocheck_reported, in_child;
530 /* for simulation, debugging, etc. */
531 int simulate_flush= -1;
533 /*========== logging ==========*/
535 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
536 static void logcore(int sysloglevel, const char *fmt, ...) {
539 vsyslog(sysloglevel,fmt,al);
541 if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
542 vfprintf(stderr,fmt,al);
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549 const char *fmt, va_list al) PRINTF(5,0);
550 static void logv(int sysloglevel, const char *pfx, int errnoval,
551 const char *fmt, va_list al) {
552 char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
553 vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
554 msgbuf[sizeof(msgbuf)-1]= 0;
556 if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
557 sysloglevel= LOG_ERR; /* run by wrong user, probably */
559 logcore(sysloglevel, "<%s>%s: %s%s%s",
560 sitename, pfx, msgbuf,
561 errnoval>=0 ? ": " : "",
562 errnoval>=0 ? strerror(errnoval) : "");
565 #define diewrap(fn, pfx, sysloglevel, err, estatus) \
566 static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
567 static void fn(const char *fmt, ...) { \
570 logv(sysloglevel, pfx, err, fmt, al); \
574 #define logwrap(fn, pfx, sysloglevel, err) \
575 static void fn(const char *fmt, ...) PRINTF(1,2); \
576 static void fn(const char *fmt, ...) { \
578 logv(sysloglevel, pfx, err, fmt, al); \
582 diewrap(sysdie, " critical", LOG_CRIT, errno, 16);
583 diewrap(die, " critical", LOG_CRIT, -1, 16);
585 diewrap(sysfatal, " fatal", LOG_ERR, errno, 12);
586 diewrap(fatal, " fatal", LOG_ERR, -1, 12);
588 logwrap(syswarn, " warning", LOG_WARNING, errno);
589 logwrap(warn, " warning", LOG_WARNING, -1);
591 logwrap(notice, " notice", LOG_NOTICE, -1);
592 logwrap(info, " info", LOG_INFO, -1);
593 logwrap(debug, " debug", LOG_DEBUG, -1);
596 /*========== utility functions etc. ==========*/
598 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
599 static char *xvasprintf(const char *fmt, va_list al) {
601 int rc= vasprintf(&str,fmt,al);
602 if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
605 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
606 static char *xasprintf(const char *fmt, ...) {
608 char *str= xvasprintf(fmt,al);
613 static int close_perhaps(int *fd) {
614 if (*fd <= 0) return 0;
619 static void xclose(int fd, const char *what, const char *what2) {
621 if (r) sysdie("close %s%s",what,what2?what2:"");
623 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
624 if (*fd <= 0) return;
625 xclose(*fd,what,what2);
/* fork wrapper.  From the lines visible in this listing: a result of -1 is
 * fatal (sysfatal), the fork is logged at debug level, and when the result
 * is 0 (i.e. in the child) postfork() is run.
 * NOTE(review): the declaration/assignment of `child' and the return
 * statement are not visible here. */
629 static pid_t xfork(const char *what) {
633 if (child==-1) sysfatal("cannot fork for %s",what);
/* NOTE(review): %ld is paired with an (unsigned long) cast; %lu (or a
 * (long) cast) would match -- confirm and align. */
634 debug("forked %s %ld", what, (unsigned long)child);
635 if (!child) postfork();
639 static void on_fd_read_except(int fd, oop_call_fd callback) {
640 loop->on_fd(loop, fd, OOP_READ, callback, 0);
641 loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
643 static void cancel_fd_read_except(int fd) {
644 loop->cancel_fd(loop, fd, OOP_READ);
645 loop->cancel_fd(loop, fd, OOP_EXCEPTION);
648 static void report_child_status(const char *what, int status) {
649 if (WIFEXITED(status)) {
650 int es= WEXITSTATUS(status);
652 warn("%s: child died with error exit status %d", what, es);
653 } else if (WIFSIGNALED(status)) {
654 int sig= WTERMSIG(status);
655 const char *sigstr= strsignal(sig);
656 const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
658 warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
660 warn("%s: child died due to unknown fatal signal %d%s",
661 what, sig, coredump);
663 warn("%s: child died with unknown wait status %d", what,status);
667 static int xwaitpid(pid_t *pid, const char *what) {
670 int r= kill(*pid, SIGKILL);
671 if (r) sysdie("cannot kill %s child", what);
673 pid_t got= waitpid(*pid, &status, 0);
674 if (got==-1) sysdie("cannot reap %s child", what);
675 if (got==0) die("cannot reap %s child", what);
682 static void *zxmalloc(size_t sz) {
683 void *p= xmalloc(sz);
688 static void xunlink(const char *path, const char *what) {
690 if (r) sysdie("can't unlink %s %s", path, what);
693 static time_t xtime(void) {
695 if (now==-1) sysdie("time(2) failed");
/* sigaction(2) wrapper: installs *sa as the disposition for signo without
 * retrieving the previous one.  Any failure is reported through sysdie. */
static void xsigaction(int signo, const struct sigaction *sa) {
  int r= sigaction(signo,sa,0);
  if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
}
704 static void xsigsetdefault(int signo) {
706 memset(&sa,0,sizeof(sa));
707 sa.sa_handler= SIG_DFL;
708 xsigaction(signo,&sa);
/* gettimeofday(2) wrapper: stores the current time in *tv_r (no timezone
 * is requested).  Any failure is reported through sysdie. */
static void xgettimeofday(struct timeval *tv_r) {
  int r= gettimeofday(tv_r,0);
  if (r) sysdie("gettimeofday(2) failed");
}
/* Sets or clears non-blocking mode on fd via oop_fd_nonblock.  That call
 * hands back an errno value rather than setting errno, so the value is
 * copied into errno before sysdie reports it. */
static void xsetnonblock(int fd, int nonblocking) {
  int errnoval= oop_fd_nonblock(fd, nonblocking);
  if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
}
721 static void check_isreg(const struct stat *stab, const char *path,
723 if (!S_ISREG(stab->st_mode))
724 die("%s %s not a plain file (mode 0%lo)",
725 what, path, (unsigned long)stab->st_mode);
/* fstat(2) wrapper: fills *stab_r for fd.  `what' describes the file in
 * the message passed to sysdie if the call fails. */
static void xfstat(int fd, struct stat *stab_r, const char *what) {
  int r= fstat(fd, stab_r);
  if (r) sysdie("could not fstat %s", what);
}
/* Like xfstat, and additionally passes the result to check_isreg, which
 * insists that the file is a plain file.  `path' and `what' are used only
 * in diagnostics. */
static void xfstat_isreg(int fd, struct stat *stab_r,
			 const char *path, const char *what) {
  xfstat(fd, stab_r, what);
  check_isreg(stab_r, path, what);
}
739 static void xlstat_isreg(const char *path, struct stat *stab,
740 int *enoent_r /* 0 means ENOENT is fatal */,
742 int r= lstat(path, stab);
744 if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
745 sysdie("could not lstat %s %s", what, path);
747 if (enoent_r) *enoent_r= 0;
748 check_isreg(stab, path, what);
/* Returns nonzero iff the two stat results describe the same file, i.e.
 * both the inode number and the device match.  Both arguments must
 * describe regular files; this is asserted. */
static int samefile(const struct stat *a, const struct stat *b) {
  assert(S_ISREG(a->st_mode));
  assert(S_ISREG(b->st_mode));
  return (a->st_ino == b->st_ino &&
	  a->st_dev == b->st_dev);
}
758 static char *sanitise(const char *input, int len) {
759 static char sanibuf[100]; /* returns pointer to this buffer! */
761 const char *p= input;
762 const char *endp= len>=0 ? input+len : 0;
766 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
767 int c= (!endp || p<endp) ? *p++ : 0;
768 if (!c) { *q++= '\''; *q=0; break; }
769 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
770 sprintf(q,"\\x%02x",c);
/* Returns nonzero iff errnoval is one of the two "operation would block"
 * errno values (EWOULDBLOCK or EAGAIN). */
static int isewouldblock(int errnoval) {
  return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
}
780 /*========== command and control connections ==========*/
782 static int control_master;
784 typedef struct ControlConn ControlConn;
786 void (*destroy)(ControlConn*);
792 struct sockaddr_un un;
797 static const oop_rd_style control_rd_style= {
798 OOP_RD_DELIM_STRIP, '\n',
800 OOP_RD_SHORTREC_FORBID
803 static void control_destroy(ControlConn *cc) {
807 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
808 if (ferror(cc->out) | fflush(cc->out)) {
809 info("CTRL%d write error %s", cc->fd, strerror(errno));
814 static void control_prompt(ControlConn *cc /* may destroy*/) {
815 fprintf(cc->out, "%s| ", sitename);
816 control_checkouterr(cc);
819 struct ControlCommand {
821 void (*f)(ControlConn *cc, const ControlCommand *ccmd,
822 const char *arg, size_t argsz);
827 static const ControlCommand control_commands[];
830 static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \
831 const char *arg, size_t argsz)
834 fputs("commands:\n", cc->out);
835 const ControlCommand *ccmd;
836 for (ccmd=control_commands; ccmd->cmd; ccmd++)
837 fprintf(cc->out, " %s\n", ccmd->cmd);
838 fputs("NB: permissible arguments are not shown above."
839 " Not all commands listed are safe. See innduct(8).\n", cc->out);
843 int ok= trigger_flush_ok();
844 if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
849 notice("terminating (CTRL%d)",cc->fd);
850 raise_default(SIGTERM);
856 /* messing with our head: */
857 CCMD(period) { period(); }
858 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
859 CCMD(setint) { *(int*)c->xdata= c->xval; }
860 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
862 static const ControlCommand control_commands[]= {
864 { "flush", ccmd_flush },
865 { "stop", ccmd_stop },
866 { "dump q", ccmd_dump, 0,0 },
867 { "dump a", ccmd_dump, 0,1 },
869 { "p", ccmd_period },
871 #define POKES(cmd,func) \
872 { cmd "flush", func, &until_flush, 1 }, \
873 { cmd "conn", func, &until_connect, 0 }, \
874 { cmd "blscan", func, &until_backlog_nextscan, 0 },
875 POKES("next ", ccmd_setint)
876 POKES("prod ", ccmd_setint_period)
878 { "pretend flush", ccmd_setintarg, &simulate_flush },
879 { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
/* oop_rd callback for a record read from a control connection (cc_v is
 * the ControlConn).  From the lines visible in this listing: an empty
 * record goes straight to the prompt; otherwise the record is matched
 * against each entry of control_commands, and an entry matches when the
 * record starts with its command text and is either exactly that long or
 * has a space right after it.  The matching handler is called with the
 * text after that space as its argument.  A record matching no entry
 * gets the "unknown command" message.
 * NOTE(review): several lines (event handling, labels, returns) are not
 * visible here, so this description is partial. */
883 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
884 const char *errmsg, int errnoval,
885 const char *data, size_t recsz, void *cc_v) {
886 ControlConn *cc= cc_v;
889 info("CTRL%d closed", cc->fd);
894 if (recsz == 0) goto prompt;
896 const ControlCommand *ccmd;
897 for (ccmd=control_commands; ccmd->cmd; ccmd++) {
898 int l= strlen(ccmd->cmd);
899 if (recsz < l) continue;
900 if (recsz > l && data[l] != ' ') continue;
901 if (memcmp(data, ccmd->cmd, l)) continue;
/* argl is -1 when the record is exactly the command with no argument. */
903 int argl= (int)recsz - (l+1);
/* NOTE(review): in the no-argument case the handler receives arg==0 and
 * argl (-1) converted to the size_t argsz parameter, i.e. a huge value;
 * handlers must check arg before using it -- confirm all of them do. */
904 ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
908 fputs("unknown command; h for help\n", cc->out);
915 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
916 const char *errmsg, int errnoval,
917 const char *data, size_t recsz, void *cc_v) {
918 ControlConn *cc= cc_v;
920 info("CTRL%d read error %s", cc->fd, errmsg);
925 static int control_conn_startup(ControlConn *cc /* may destroy*/,
927 cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
928 if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
930 int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
933 if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
935 info("CTRL%d %s ready", cc->fd, how);
940 static void control_stdio_destroy(ControlConn *cc) {
942 oop_rd_cancel(cc->rd);
943 errno= oop_rd_delete_tidy(cc->rd);
944 if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
949 static void control_stdio(void) {
950 NEW_DECL(ControlConn *,cc);
951 cc->destroy= control_stdio_destroy;
955 int r= control_conn_startup(cc,"stdio");
956 if (r) cc->destroy(cc);
959 static void control_accepted_destroy(ControlConn *cc) {
961 oop_rd_cancel(cc->rd);
962 oop_rd_delete_kill(cc->rd);
964 if (cc->out) { fclose(cc->out); cc->fd=0; }
965 close_perhaps(&cc->fd);
969 static void *control_master_readable(oop_source *lp, int master,
970 oop_event ev, void *u) {
971 NEW_DECL(ControlConn *,cc);
972 cc->destroy= control_accepted_destroy;
974 cc->salen= sizeof(cc->sa);
975 cc->fd= accept(master, &cc->sa.sa, &cc->salen);
976 if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
978 cc->out= fdopen(cc->fd, "w");
979 if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
981 int r= control_conn_startup(cc, "accepted");
991 #define NOCONTROL(...) do{ \
992 syswarn("no control socket, because failed to " __VA_ARGS__); \
996 static void control_init(void) {
1001 struct sockaddr_un un;
1004 memset(&sa,0,sizeof(sa));
1005 int maxlen= sizeof(sa.un.sun_path);
1007 int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1009 if (errno != ENOENT)
1010 NOCONTROL("readlink control socket symlink path %s", path_control);
1012 if (reallen >= maxlen) {
1013 debug("control socket symlink path too long (r=%d)",reallen);
1014 xunlink(path_control, "old (overlong) control socket symlink");
1020 int r= lstat(realsockdir,&stab);
1022 if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1024 r= mkdir(realsockdir, 0700);
1025 if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1028 uid_t self= geteuid();
1029 if (!S_ISDIR(stab.st_mode) ||
1030 stab.st_uid != self ||
1031 stab.st_mode & 0007) {
1032 warn("no control socket, because real socket directory"
1033 " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1034 !!S_ISDIR(stab.st_mode),
1035 (unsigned long)stab.st_uid, (unsigned long)self,
1036 (unsigned long)stab.st_mode & 0777UL);
1041 real= xasprintf("%s/s%lx.%lx", realsockdir,
1042 (unsigned long)xtime(), (unsigned long)self_pid);
1043 int reallen= strlen(real);
1045 if (reallen >= maxlen) {
1046 warn("no control socket, because tmpnam gave overly-long path"
1050 r= symlink(real, path_control);
1051 if (r) NOCONTROL("make control socket path %s a symlink to real"
1052 " socket path %s", path_control, real);
1053 memcpy(sa.un.sun_path, real, reallen);
1056 int r= unlink(sa.un.sun_path);
1057 if (r && errno!=ENOENT)
1058 NOCONTROL("remove old real socket %s", sa.un.sun_path);
1060 control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1061 if (control_master<0) NOCONTROL("create new control socket");
1063 sa.un.sun_family= AF_UNIX;
1064 int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1065 r= bind(control_master, &sa.sa, sl);
1066 if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1068 r= listen(control_master, 5);
1069 if (r) NOCONTROL("listen");
1071 xsetnonblock(control_master, 1);
1073 loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1074 info("control socket ok, real path %s", sa.un.sun_path);
1080 xclose_perhaps(&control_master, "control master",0);
1084 /*========== management of connections ==========*/
1086 static void conn_closefd(Conn *conn, const char *msgprefix) {
1087 int r= close_perhaps(&conn->fd);
1088 if (r) info("C%d %serror closing socket: %s",
1089 conn->fd, msgprefix, strerror(errno));
1092 static void conn_dispose(Conn *conn) {
1095 oop_rd_cancel(conn->rd);
1096 oop_rd_delete_kill(conn->rd);
1100 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1101 loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1103 conn_closefd(conn,"");
1105 until_connect= reconnect_delay_periods;
1108 static void *conn_exception(oop_source *lp, int fd,
1109 oop_event ev, void *conn_v) {
1112 assert(fd == conn->fd);
1113 assert(ev == OOP_EXCEPTION);
1114 int r= read(conn->fd, &ch, 1);
1115 if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1116 else connfail(conn,"exceptional condition on socket (peer sent urgent"
1117 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1118 return OOP_CONTINUE;
1121 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1122 int requeue[art_MaxState];
1123 memset(requeue,0,sizeof(requeue));
1127 while ((art= LIST_REMHEAD(conn->priority)))
1128 LIST_ADDTAIL(art->ipf->queue, art);
1130 while ((art= LIST_REMHEAD(conn->waiting)))
1131 LIST_ADDTAIL(art->ipf->queue, art);
1133 while ((art= LIST_REMHEAD(conn->sent))) {
1134 requeue[art->state]++;
1135 if (art->state==art_Unsolicited) art->state= art_Unchecked;
1136 LIST_ADDTAIL(art->ipf->queue,art);
1141 for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1144 char *m= xvasprintf(fmt,al);
1145 warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1146 conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1149 LIST_REMOVE(conns,conn);
1151 check_assign_articles();
1154 static void connfail(Conn *conn, const char *fmt, ...) {
1157 vconnfail(conn,fmt,al);
1161 static void check_idle_conns(void) {
1164 conn->since_activity++;
1167 if (conn->since_activity <= need_activity_periods) continue;
1169 /* We need to shut this down */
1171 connfail(conn,"timed out waiting for response to QUIT");
1172 else if (conn->sent.count)
1173 connfail(conn,"timed out waiting for responses");
1174 else if (conn->waiting.count || conn->priority.count)
1175 connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1176 else if (conn->xmitu)
1177 connfail(conn,"peer has been sending responses"
1178 " before receiving our commands!");
1180 static const char quitcmd[]= "QUIT\r\n";
1181 int todo= sizeof(quitcmd)-1;
1182 const char *p= quitcmd;
1184 int r= write(conn->fd, p, todo);
1186 if (isewouldblock(errno))
1187 connfail(conn, "blocked writing QUIT to idle connection");
1189 connfail(conn, "failed to write QUIT to idle connection: %s",
1197 conn->since_activity= 0;
1198 debug("C%d is idle, quitting", conn->fd);
1207 /*---------- making new connections ----------*/
1209 static pid_t connecting_child;
1210 static int connecting_fdpass_sock;
1212 static void connect_attempt_discard(void) {
1213 if (connecting_child) {
1214 int status= xwaitpid(&connecting_child, "connect");
1215 if (!(WIFEXITED(status) ||
1216 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1217 report_child_status("connect", status);
1219 if (connecting_fdpass_sock) {
1220 cancel_fd_read_except(connecting_fdpass_sock);
1221 xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1225 #define PREP_DECL_MSG_CMSG(msg) \
1227 struct iovec msgiov; \
1228 msgiov.iov_base= &msgbyte; \
1229 msgiov.iov_len= 1; \
1230 struct msghdr msg; \
1231 memset(&msg,0,sizeof(msg)); \
1232 char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
1233 msg.msg_iov= &msgiov; \
1234 msg.msg_iovlen= 1; \
1235 msg.msg_control= msg##cbuf; \
1236 msg.msg_controllen= sizeof(msg##cbuf);
/* Event-loop callback for the fd-passing socket shared with the connecting
 * child (asserted to be connecting_fdpass_sock).  From the lines visible
 * in this listing it:
 *  - recvmsg()s one message; if errno says "would block" it just returns;
 *  - when there is no usable control message, reaps the child with
 *    xwaitpid and logs why the attempt failed;
 *  - otherwise checks that the single control message is SOL_SOCKET /
 *    SCM_RIGHTS of the right length, copies the passed fd into conn->fd,
 *    waits for the child and uses its exit status to select streaming or
 *    plain mode;
 *  - registers the connection with the event loop, starts reading from
 *    the peer, adds the Conn to conns and calls check_assign_articles().
 * Every return visible here returns OOP_CONTINUE.
 * NOTE(review): a number of lines (declarations, allocation of conn,
 * branch structure, labels) are not visible in this listing, so the
 * summary above is partial. */
1238 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1241 assert(fd == connecting_fdpass_sock);
1243 PREP_DECL_MSG_CMSG(msg);
1245 ssize_t rs= recvmsg(fd, &msg, 0);
1247 if (isewouldblock(errno)) return OOP_CONTINUE;
1248 syswarn("failed to read socket from connecting child");
1253 LIST_INIT(conn->waiting);
1254 LIST_INIT(conn->priority);
1255 LIST_INIT(conn->sent);
1257 struct cmsghdr *h= 0;
1258 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1260 int status= xwaitpid(&connecting_child, "connect child (broken)");
1262 if (WIFEXITED(status)) {
1263 if (WEXITSTATUS(status) != 0 &&
1264 WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1265 WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1266 /* child already reported the problem */;
1268 if (e == OOP_EXCEPTION)
1269 warn("connect: connection child exited code %d but"
1270 " unexpected exception on fdpass socket",
1271 WEXITSTATUS(status));
1273 warn("connect: connection child exited code %d but"
1275 WEXITSTATUS(status), (int)rs);
1277 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1278 warn("connect: connection attempt timed out");
1280 report_child_status("connect", status);
1285 #define CHK(field, val) \
1286 if (h->cmsg_##field != val) { \
1287 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1288 h->cmsg_##field, val); \
1291 CHK(level, SOL_SOCKET);
1292 CHK(type, SCM_RIGHTS);
1293 CHK(len, CMSG_LEN(sizeof(conn->fd)));
1296 if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1298 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1301 pid_t got= waitpid(connecting_child, &status, 0);
1302 if (got==-1) sysdie("connect: real wait for child");
1303 assert(got == connecting_child);
1304 connecting_child= 0;
1306 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1307 int es= WEXITSTATUS(status);
1309 case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break;
1310 case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
1312 fatal("connect: child gave unexpected exit status %d", es);
/* A non-streaming connection may have only one article in flight. */
1316 conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1318 loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1319 conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
/* NOTE(review): this tests conn->fd, but the call just above assigns
 * conn->rd and the message speaks of oop_rd_new_fd failing; it looks as
 * if the test should be !conn->rd -- confirm and fix. */
1320 if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1321 int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1323 &peer_rd_err, conn);
/* NOTE(review): control_conn_startup copies oop_rd_read's result into
 * errno before its sys* log call; here r is not stored in errno before
 * sysdie, so the reported error text may be unrelated -- confirm. */
1324 if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1326 notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1327 LIST_ADDHEAD(conns, conn);
1329 connect_attempt_discard();
1330 check_assign_articles();
1331 return OOP_CONTINUE;
1335 connect_attempt_discard();
1336 return OOP_CONTINUE;
/* Tells the caller whether a new connection attempt may be started.
 * The conditions visible here are that fewer than max_connections
 * connections exist and that no connecting child is outstanding. */
1339 static int allow_connect_start(void) {
1340 return conns.count < max_connections
1341 && !connecting_child
/* Begins an asynchronous connection attempt.
 *
 * Requires that no attempt is in progress (no connecting child and no
 * fdpass socket).  A socketpair is created and a child is forked with
 * xfork.  If the socketpair cannot be created this only warns and
 * returns.
 *
 * The child arms alarm(connection_setup_timeout), connects with
 * NNTPconnect, authenticates with NNTPsendpassword, sends MODE STREAM
 * and examines the reply; its exit status starts out as
 * CONNCHILD_ESTATUS_NOSTREAM and becomes CONNCHILD_ESTATUS_STREAM on
 * one of the reply branches.  It then passes the connected fd to the
 * parent as SCM_RIGHTS ancillary data over socks[1].
 *
 * The parent keeps socks[0] as connecting_fdpass_sock, makes it
 * nonblocking and registers connchild_event for it. */
1345 static void connect_start(void) {
1346 assert(!connecting_child);
1347 assert(!connecting_fdpass_sock);
1349 info("starting connection attempt");
1352 int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1353 if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1355 connecting_child= xfork("connection");
/* xfork returned 0: we are the child */
1357 if (!connecting_child) {
1358 FILE *cn_from, *cn_to;
1359 char buf[NNTP_STRLEN+100];
1360 int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1362 xclose(socks[0], "(in child) parent's connection fdpass socket",0);
/* the alarm bounds how long connection setup may take */
1364 alarm(connection_setup_timeout);
1365 if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1369 unsigned char c= buf[l-1];
1370 if (!isspace(c)) break;
1371 if (c=='\n' || c=='\r') stripped=1;
1375 sysfatal("connect: connection attempt failed");
1378 fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1382 if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1383 sysfatal("connect: authentication failed");
1385 if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1387 sysfatal("connect: could not send MODE STREAM");
1388 buf[sizeof(buf)-1]= 0;
1389 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1390 if (ferror(cn_from))
1391 sysfatal("connect: could not read response to MODE STREAM");
1393 fatal("connect: connection close in response to MODE STREAM");
1398 fatal("connect: response to MODE STREAM is too long: %.100s...",
1400 l--; if (l>0 && buf[l-1]=='\r') l--;
1403 int rcode= strtoul(buf,&ep,10);
1405 fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1409 exitstatus= CONNCHILD_ESTATUS_STREAM;
1415 warn("connect: unexpected response to MODE STREAM: %.50s",
/* pass the connected fd to the parent as SCM_RIGHTS ancillary data */
1421 int fd= fileno(cn_from);
1423 PREP_DECL_MSG_CMSG(msg);
1424 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1425 cmsg->cmsg_level= SOL_SOCKET;
1426 cmsg->cmsg_type= SCM_RIGHTS;
1427 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
1428 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1430 msg.msg_controllen= cmsg->cmsg_len;
1431 r= sendmsg(socks[1], &msg, 0);
1432 if (r<0) sysdie("sendmsg failed for new connection");
1433 if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
/* the process that keeps socks[0]: drop the child's end and wait for
 * events on ours */
1438 xclose(socks[1], "connecting fdpass child's socket",0);
1439 connecting_fdpass_sock= socks[0];
1440 xsetnonblock(connecting_fdpass_sock, 1);
1441 on_fd_read_except(connecting_fdpass_sock, connchild_event);
1444 /*---------- assigning articles to conns, and transmitting ----------*/
/* Returns the article at the head of ipf->queue.  With peek nonzero
 * the article stays on the queue (LIST_HEAD); otherwise it is removed
 * (LIST_REMHEAD). */
1446 static Article *dequeue_from(int peek, InputFile *ipf) {
1448 if (peek) return LIST_HEAD(ipf->queue);
1449 else return LIST_REMHEAD(ipf->queue);
/* Fetches (or, with peek nonzero, merely looks at) the next article to
 * send.  Input files are consulted in priority order: flushing file
 * first, then backlog, then the main feed file; the first article
 * found is returned. */
1452 static Article *dequeue(int peek) {
1454 art= dequeue_from(peek, flushing_input_file); if (art) return art;
1455 art= dequeue_from(peek, backlog_input_file); if (art) return art;
1456 art= dequeue_from(peek, main_input_file); if (art) return art;
/* Distributes queued articles to connections.
 *
 * Connections that are quitting are skipped.  For the others the
 * number of articles in flight (sent + priority + waiting) and the
 * spare capacity (max_queue minus that) are computed.  An idle
 * connection is remembered only if nothing has been chosen yet; a
 * working connection with spare capacity is chosen immediately.
 * Dequeued articles go on the tail of the chosen connection's waiting
 * list and conn_maybe_write is called for it.  When no connection was
 * chosen and allow_connect_start() agrees, until_connect is reset to
 * reconnect_delay_periods. */
1460 static void check_assign_articles(void) {
1466 int spare=0, inqueue=0;
1468 /* Find a connection to offer this article. We prefer a busy
1469 * connection to an idle one, provided it's not full. We take the
1470 * first (oldest) and since that's stable, it will mean we fill up
1471 * connections in order. That way if we have too many
1472 * connections, the spare ones will go away eventually.
1475 if (walk->quitting) continue;
1476 inqueue= walk->sent.count + walk->priority.count
1477 + walk->waiting.count;
1478 spare= walk->max_queue - inqueue;
1479 assert(inqueue <= max_queue_per_conn);
1481 if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1482 else if (spare>0) /*working*/ { use= walk; break; }
/* NOTE(review): inqueue (and spare) hold the values of the connection
 * examined last, which need not be `use` when an idle connection was
 * remembered earlier and the walk carried on - confirm this is
 * intended. */
1485 if (!inqueue) use->since_activity= 0; /* reset idle counter */
1487 Article *art= dequeue(0);
1489 LIST_ADDTAIL(use->waiting, art);
1492 conn_maybe_write(use);
1493 } else if (allow_connect_start()) {
1494 until_connect= reconnect_delay_periods;
/* Event-loop fd callback registered for OOP_WRITE on a connection.
 * u is the Conn; it is handed to conn_maybe_write.  Always returns
 * OOP_CONTINUE. */
1503 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1504 conn_maybe_write(u);
1505 return OOP_CONTINUE;
/* Queues up more transmissions for conn (conn_make_some_xmits) and
 * tries to write them (conn_write_some_xmits).  If that reports
 * OOP_CONTINUE, conn_writeable is registered for OOP_WRITE on the
 * connection's fd so writing resumes when the fd is writeable;
 * OOP_HALT and the everything-transmitted case are handled
 * separately. */
1508 static void conn_maybe_write(Conn *conn) {
1510 conn_make_some_xmits(conn);
1512 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1516 void *rp= conn_write_some_xmits(conn);
1517 if (rp==OOP_CONTINUE) {
1518 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1520 } else if (rp==OOP_HALT) {
1523 /* transmitted everything */
1530 /*---------- expiry and deferral ----------*/
/* Records art in the defer file as a "<token> <messageid>" line and
 * then finishes the article with article_done(art, whichcount).
 * Failure to write the defer file is fatal.  The article must not be
 * on any queue. */
1532 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1534 if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1536 sysfatal("write to defer file %s",path_defer);
1537 article_done(art, whichcount);
/* Checks whether a queued article still exists in the article store.
 * SMretrieve with RETR_STAT is used as the existence test: if it
 * succeeds the handle is freed and 0 is returned.  Otherwise the
 * article is taken off its input file's queue, counted as RC_missing
 * under art_Unchecked, and finished with article_done(art,-1). */
1540 static int article_check_expired(Article *art /* must be queued, not conn */) {
1541 ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1542 if (artdata) { SMfreearticle(artdata); return 0; }
1544 LIST_REMOVE(art->ipf->queue, art);
1546 art->ipf->counts[art_Unchecked][RC_missing]++;
1547 article_done(art,-1);
/* Runs the expiry check (article_check_expired) on the article at the
 * head of ipf's queue. */
1551 static void inputfile_queue_check_expired(InputFile *ipf) {
1555 Article *art= LIST_HEAD(ipf->queue);
1556 int exp= article_check_expired(art);
/* Defers art without counting it against any result counter: it calls
 * article_defer with whichcount -1. */
1561 static void article_autodefer(InputFile *ipf, Article *art) {
1563 article_defer(art,-1);
/* Returns 1 if some article on the list *al belongs to input file
 * ipf. */
1566 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1568 for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1569 if (art->ipf == ipf) return 1;
/* Gets rid of everything outstanding for ipf by deferring it.
 *
 * Every article still on ipf's queue is autodeferred.  If articles
 * from ipf are still in progress afterwards, connections are examined
 * for articles from ipf on their waiting, priority or sent lists, and
 * connfail is used with a "stuck or crawling" message.  If articles
 * are in progress but cannot be located the program aborts. */
1573 static void autodefer_input_file(InputFile *ipf) {
1577 while ((art= LIST_REMHEAD(ipf->queue)))
1578 article_autodefer(ipf, art);
1580 if (ipf->inprogress) {
1583 if (has_article_in(&walk->waiting, ipf) ||
1584 has_article_in(&walk->priority, ipf) ||
1585 has_article_in(&walk->sent, ipf))
1588 while (ipf->inprogress) {
1590 if (walk->quitting < 0) goto found;
1591 abort(); /* where are they ?? */
1594 connfail(walk, "connection is stuck or crawling,"
1595 " and we need to finish flush");
1600 /*========== article transmission ==========*/
/* Claims the next transmit slot of conn: takes the iovec and the
 * XmitDetails at index conn->xmitu (incrementing xmitu) and points the
 * iovec at data.  No bounds check is made here.  The caller is
 * expected to fill in the returned details. */
1602 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1603 XmitKind kind) { /* caller must then fill in details */
1604 struct iovec *v= &conn->xmit[conn->xmitu];
1605 XmitDetails *d= &conn->xmitd[conn->xmitu++];
1606 v->iov_base= (char*)data;
/* Queues len bytes at data for transmission on conn as an xk_Const
 * item (one that xmit_free does not release). */
1612 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1613 xmit_core(conn,data,len, xk_Const);
/* Queues a string literal for transmission on the variable `conn` in
 * the caller's scope; the length excludes the terminating nul. */
1615 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
/* Queues the article data held by ah (ah->data, ah->len) for
 * transmission as an xk_Artdata item.  Ownership of ah passes to the
 * transmit queue ("consumed"). */
1617 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1618 XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
/* Releases whatever a transmit item owns: xk_Artdata items free their
 * storage-manager article handle, xk_Const items own nothing. */
1622 static void xmit_free(XmitDetails *d) {
1624 case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1625 case xk_Const: break;
/* Writes as much of conn's pending transmit queue as the fd accepts.
 *
 * At most IOV_MAX iovecs are passed to a single writev.  A failure
 * that isewouldblock recognises yields OOP_CONTINUE; any other write
 * failure fails the connection via connfail.  After a successful
 * write the iovecs are walked to account for the bytes written, and
 * the remaining entries of xmit and xmitd are moved down to the start
 * of their arrays. */
1630 static void *conn_write_some_xmits(Conn *conn) {
1632 * 0: nothing more to write, no need to call us again
1633 * OOP_CONTINUE: more to write but fd not writeable
1634 * OOP_HALT: disaster, have destroyed conn
1637 int count= conn->xmitu;
1638 if (!count) return 0;
1640 if (count > IOV_MAX) count= IOV_MAX;
1641 ssize_t rs= writev(conn->fd, conn->xmit, count);
1643 if (isewouldblock(errno)) return OOP_CONTINUE;
1644 connfail(conn, "write failed: %s", strerror(errno));
1650 for (done=0; rs && done<conn->xmitu; done++) {
1651 struct iovec *vp= &conn->xmit[done];
1652 XmitDetails *dp= &conn->xmitd[done];
1653 if (rs > vp->iov_len) {
1657 vp->iov_base= (char*)vp->iov_base + rs;
1661 int newu= conn->xmitu - done;
1662 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
1663 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Turns articles assigned to conn into queued NNTP commands.
 *
 * Room for five more transmit items is checked first.  Articles are
 * taken from the priority list first and from the waiting list
 * otherwise.  An article that is already wanted, or any article when
 * the connection is streaming and nocheck mode is on, has its body
 * retrieved with SMretrieve(RETR_ALL) and is actually sent (TAKETHIS
 * plus body, or the body following IHAVE's 335); an article that can
 * no longer be retrieved is marked missing and counted as RC_missing.
 * Other articles are merely offered with CHECK or IHAVE.  Articles
 * awaiting a reply are appended to conn->sent. */
1668 static void conn_make_some_xmits(Conn *conn) {
1670 if (conn->xmitu+5 > CONNIOVS)
1673 Article *art= LIST_REMHEAD(conn->priority);
1674 if (!art) art= LIST_REMHEAD(conn->waiting);
1677 if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1678 /* actually send it */
1680 ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1683 art->state == art_Unchecked ? art_Unsolicited :
1684 art->state == art_Wanted ? art_Wanted :
1687 if (!artdata) art->missing= 1;
1688 art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1692 XMIT_LITERAL("TAKETHIS ");
1693 xmit_noalloc(conn, art->messageid, art->midlen);
1694 XMIT_LITERAL("\r\n");
1695 xmit_artbody(conn, artdata);
1697 article_done(art, -1);
1701 /* we got 235 from IHAVE */
1703 xmit_artbody(conn, artdata);
1705 XMIT_LITERAL(".\r\n");
1709 LIST_ADDTAIL(conn->sent, art);
1715 XMIT_LITERAL("CHECK ");
1717 XMIT_LITERAL("IHAVE ");
1718 xmit_noalloc(conn, art->messageid, art->midlen);
1719 XMIT_LITERAL("\r\n");
1721 assert(art->state == art_Unchecked);
1722 art->ipf->counts[art->state][RC_sent]++;
1723 LIST_ADDTAIL(conn->sent, art);
1728 /*========== handling responses from peer ==========*/
/* Record style used when reading responses from the peer: records are
 * delimited by '\n', the delimiter is stripped, and short (partial)
 * records are forbidden. */
1730 static const oop_rd_style peer_rd_style= {
1731 OOP_RD_DELIM_STRIP, '\n',
1733 OOP_RD_SHORTREC_FORBID
/* Read-error callback for a peer connection: fails the connection,
 * quoting errmsg, and lets the event loop continue. */
1736 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1737 const char *errmsg, int errnoval,
1738 const char *data, size_t recsz, void *conn_v) {
1740 connfail(conn, "error receiving from peer: %s", errmsg);
1741 return OOP_CONTINUE;
/* Validates a per-article response against the oldest outstanding
 * article on conn (the head of conn->sent) and, if all is well,
 * removes that article from conn->sent.
 *
 *   response                  raw response line; at least 4 non-nul
 *                             bytes when code_indicates_streaming
 *   code_indicates_streaming  nonzero if the code belongs to the
 *                             streaming (CHECK/TAKETHIS) command set
 *   must_have_sent            1: the body must already have been sent,
 *                             -1: it must not have been, 0: don't care
 *   sanitised_response        printable form, used in messages only
 *
 * Every inconsistency fails the connection with connfail: a response
 * with nothing outstanding, a streaming code on a non-streaming
 * connection (or the reverse), a missing or malformed message-id, a
 * message-id different from the one we offered, or an article state
 * that contradicts must_have_sent. */
1744 static Article *article_reply_check(Conn *conn, const char *response,
1745 int code_indicates_streaming,
1747 /* 1:yes, -1:no, 0:dontcare */,
1748 const char *sanitised_response) {
1749 Article *art= LIST_HEAD(conn->sent);
1753 "peer gave unexpected response when no commands outstanding: %s",
1754 sanitised_response);
1758 if (code_indicates_streaming) {
1759 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1760 if (!conn->stream) {
1761 connfail(conn, "peer gave streaming response code "
1762 " to IHAVE or subsequent body: %s", sanitised_response);
/* the message-id follows the 3-digit code and one separator */
1765 const char *got_mid= response+4;
1766 int got_midlen= strcspn(got_mid, " \n\r");
1767 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1768 connfail(conn, "peer gave streaming response with syntactically invalid"
1769 " messageid: %s", sanitised_response);
1772 if (got_midlen != art->midlen ||
1773 memcmp(got_mid, art->messageid, got_midlen)) {
1774 connfail(conn, "peer gave streaming response code to wrong article -"
1775 " probable synchronisation problem; we offered: %s;"
1777 art->messageid, sanitised_response);
1782 connfail(conn, "peer gave non-streaming response code to"
1783 " CHECK/TAKETHIS: %s", sanitised_response);
1788 if (must_have_sent>0 && art->state < art_Wanted) {
1789 connfail(conn, "peer says article accepted but"
1790 " we had not sent the body: %s", sanitised_response);
1793 if (must_have_sent<0 && art->state >= art_Wanted) {
1794 connfail(conn, "peer says please sent the article but we just did: %s",
1795 sanitised_response);
1799 Article *art_again= LIST_REMHEAD(conn->sent);
1800 assert(art_again == art);
/* Updates the running estimate of the proportion of offered articles
 * that the peer accepts, and from it the nocheck mode flag.
 *
 * accept_proportion is an exponentially decaying average: the old
 * value is scaled by nocheck_decay and the new sample (accepted, 0 or
 * 1) contributes with weight 1-nocheck_decay.  nocheck is on whenever
 * the average is at least nocheck_thresh.  The first entry into
 * nocheck mode is logged as a notice; later changes only at debug
 * level. */
1804 static void update_nocheck(int accepted) {
1805 accept_proportion *= nocheck_decay;
1806 accept_proportion += accepted * (1.0 - nocheck_decay);
1807 int new_nocheck= accept_proportion >= nocheck_thresh;
1808 if (new_nocheck && !nocheck_reported) {
1809 notice("entering nocheck mode for the first time");
1810 nocheck_reported= 1;
1811 } else if (new_nocheck != nocheck) {
1812 debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1814 nocheck= new_nocheck;
/* Finishes with an article: accounts for the outcome, feeds the
 * nocheck estimator, and blanks the article's entry in its input file
 * so that it is not processed again.
 *
 *   art         the article; must not be on any queue
 *   whichcount  the RC_* result counter to credit, or a negative value
 *               when the caller has already done the accounting (or
 *               none is wanted)
 *
 * A negative whichcount must not be used as an array index, so the
 * counter update is skipped for it; it is also skipped for articles
 * marked missing.  RC_accepted and RC_unwanted outcomes update the
 * nocheck estimate.  The entry is overwritten with spaces by pwrite,
 * in chunks of at most sizeof(spaces)-1 bytes, retrying on EINTR; any
 * other failure is fatal.  When the input file has nothing left in
 * progress and is not the main input file, queue_check_input_done()
 * is called. */
1817 static void article_done(Article *art, int whichcount) {
1818 if (whichcount >= 0 && !art->missing) art->ipf->counts[art->state][whichcount]++;
1820 if (whichcount == RC_accepted) update_nocheck(1);
1821 else if (whichcount == RC_unwanted) update_nocheck(0);
1823 InputFile *ipf= art->ipf;
1825 while (art->blanklen) {
1826 static const char spaces[]=
1836 int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1837 int r= pwrite(ipf->fd, spaces, w, art->offset);
1839 if (errno==EINTR) continue;
1840 sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1841 art->messageid, art->blanklen,
1842 (unsigned long)art->offset, ipf->path);
1844 assert(r>=0 && r<=w);
1850 assert(ipf->inprogress >= 0);
1853 if (!ipf->inprogress && ipf != main_input_file)
1854 queue_check_input_done();
/* Callback for each complete response line read from a peer.
 *
 * EOF from the peer fails the connection.  Otherwise the line must
 * begin with a three-digit code not starting with '0' and followed by
 * a space, or the connection is failed as badly formatted.  While the
 * connection is quitting, only 205 and 503 are acceptable.  Otherwise
 * the idle counter is reset and the code is dispatched: per-article
 * codes are checked with article_reply_check (via GET_ARTICLE /
 * ARTICLE_DEALTWITH) and then counted as unwanted, accepted or
 * rejected; 238 and 335 move the article to the connection's priority
 * list in state art_Wanted; 431 and 436 defer it; 400 and unknown
 * codes fail the connection.  After an article has been dealt with,
 * more output is attempted on this connection and articles are
 * reassigned.  Always returns OOP_CONTINUE. */
1857 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1858 const char *errmsg, int errnoval,
1859 const char *data, size_t recsz, void *conn_v) {
1862 if (ev == OOP_RD_EOF) {
1863 connfail(conn, "unexpected EOF from peer");
1864 return OOP_CONTINUE;
1866 assert(ev == OOP_RD_OK);
1868 char *sani= sanitise(data,-1);
1871 unsigned long code= strtoul(data, &ep, 10);
1872 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1873 connfail(conn, "badly formatted response from peer: %s", sani);
1874 return OOP_CONTINUE;
1878 conn->waiting.count ||
1879 conn->priority.count ||
1883 if (conn->quitting) {
1884 if (code!=205 && code!=503) {
1885 connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1887 notice("C%d idle connection closed by us", conn->fd);
1889 LIST_REMOVE(conns,conn);
1892 return OOP_CONTINUE;
1895 conn->since_activity= 0;
1898 #define GET_ARTICLE(musthavesent) do{ \
1899 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1900 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
1903 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
1904 code_streaming= (streaming); \
1905 GET_ARTICLE(musthavesent); \
1906 article_done(art, RC_##how); \
1910 #define PEERBADMSG(m) do { \
1911 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
1914 int code_streaming= 0;
1918 case 400: PEERBADMSG("peer stopped accepting articles");
1919 default: PEERBADMSG("peer sent unexpected message");
1922 if (conn_busy) PEERBADMSG("peer timed us out");
1923 notice("C%d idle connection closed by peer", conn->fd);
1924 LIST_REMOVE(conns,conn);
1926 return OOP_CONTINUE;
1928 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1929 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1931 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1932 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1934 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1935 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1937 case 238: /* CHECK says send it */
1939 case 335: /* IHAVE says send it */
1941 assert(art->state == art_Unchecked);
1942 art->ipf->counts[art->state][RC_accepted]++;
1943 art->state= art_Wanted;
1944 LIST_ADDTAIL(conn->priority, art);
1947 case 431: /* CHECK or TAKETHIS says try later */
1949 case 436: /* IHAVE says try later */
1951 article_defer(art, RC_deferred);
1957 conn_maybe_write(conn);
1958 check_assign_articles();
1959 return OOP_CONTINUE;
1963 /*========== monitoring of input files ==========*/
/* Handles end-of-file on an input file other than the main one.
 * Reading from ipf is stopped.  For the flushing file (only legal in
 * states SEPARATED and DROPPING) reading of the main input file, if
 * there is one, is started, and the state machine is asked whether
 * flushing is complete.  For the backlog file the state machine is
 * asked whether the backlog is complete.  Any other file is a
 * programming error. */
1965 static void feedfile_eof(InputFile *ipf) {
1966 assert(ipf != main_input_file); /* promised by tailing_try_read */
1967 inputfile_reading_stop(ipf);
1969 if (ipf == flushing_input_file) {
1970 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1971 if (main_input_file) inputfile_reading_start(main_input_file);
1972 statemc_check_flushing_done();
1973 } else if (ipf == backlog_input_file) {
1974 statemc_check_backlog_done();
1976 abort(); /* supposed to wait rather than get EOF on main input file */
/* Opens path read-write as an input file and returns a newly
 * allocated, zero-initialised InputFile for it, with an empty article
 * queue and a copy of path stored in the trailing path member (the
 * allocation includes room for it).  Returns 0 if the file does not
 * exist; any other open failure is fatal. */
1980 static InputFile *open_input_file(const char *path) {
1981 int fd= open(path, O_RDWR);
1983 if (errno==ENOENT) return 0;
1984 sysfatal("unable to open input file %s", path);
1988 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1989 memset(ipf,0,sizeof(*ipf));
1993 LIST_INIT(ipf->queue);
1994 strcpy(ipf->path, path);
/* Closes the file descriptor of ipf; the InputFile itself is not
 * freed.  The caller must already have stopped reading (no readable
 * callback, no file monitor, no reader) and no article may still
 * refer to the file. */
1999 static void close_input_file(InputFile *ipf) { /* does not free */
2000 assert(!ipf->readable_callback); /* must have had ->on_cancel */
2001 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2002 assert(!ipf->rd); /* must have had inputfile_reading_stop */
2003 assert(!ipf->inprogress); /* no dangling pointers pointing here */
2004 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2008 /*---------- dealing with articles read in the input file ----------*/
/* Reports a corrupt record in an input file and keeps count.
 *
 *   offset  file offset of the bad record
 *   data    the record (sanitised before being logged)
 *   how     short description of what is wrong
 *
 * The program dies once the number of bad records exceeds
 * max_bad_data_initial plus one per max_bad_data_ratio good or blank
 * records.  Returns OOP_CONTINUE so that callers can return its
 * result directly from a read callback. */
2010 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2011 const char *data, const char *how) {
2012 warn("corrupted file: %s, offset %lu: %s: in %s",
2013 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2014 ipf->readcount_err++;
2015 if (ipf->readcount_err > max_bad_data_initial +
2016 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2017 die("too much garbage in input file! (%d errs, %d ok, %d blank)",
2018 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2019 return OOP_CONTINUE;
/* Read-error callback for an input file.  Only system errors are
 * expected here; the error is fatal and is reported with the file's
 * path and current offset. */
2022 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2023 oop_rd_event ev, const char *errmsg,
2024 int errnoval, const char *data, size_t recsz,
2026 InputFile *ipf= ipf_v;
2027 assert(ev == OOP_RD_SYSTEM);
2029 sysdie("error reading input file: %s, offset %lu",
2030 ipf->path, (unsigned long)ipf->offset);
/* Callback for each record (line) read from an input file.
 *
 * A null data pointer means end of file and is passed to
 * feedfile_eof.  Otherwise ipf->offset is advanced past the record
 * (plus one for the newline when the record was complete) and the
 * record is validated: a missing final newline is reported but the
 * record is still processed; the remainder of an overly long line is
 * skipped; nul bytes and empty lines are errors; a line made entirely
 * of spaces is a blanked entry and is merely counted.  A valid record
 * is "<token> <messageid>": it must contain a space, the token must
 * have the length and syntax of a storage token, and the message-id
 * must be enclosed in angle brackets.
 *
 * For a valid record an Article is allocated (with room for the
 * message-id) and put on the tail of ipf's queue.  It is autodeferred
 * at once if the file is being autodeferred, or checked for expiry if
 * it comes from the backlog file.  If the main feed file has reached
 * target_max_feedfile_size in state NORMAL a flush is started.
 * Finally articles are assigned to connections.  Always returns
 * OOP_CONTINUE (directly or through feedfile_got_bad_data). */
2033 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2034 oop_rd_event ev, const char *errmsg,
2035 int errnoval, const char *data, size_t recsz,
2037 InputFile *ipf= ipf_v;
2039 char tokentextbuf[sizeof(TOKEN)*2+3];
2041 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
2043 off_t old_offset= ipf->offset;
2044 ipf->offset += recsz + !!(ev == OOP_RD_OK);
2046 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2048 if (ev==OOP_RD_PARTREC)
2049 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2050 /* but process it anyway */
2052 if (ipf->skippinglong) {
2053 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2054 return OOP_CONTINUE;
2056 if (ev==OOP_RD_LONG) {
2057 ipf->skippinglong= 1;
2058 X_BAD_DATA("overly long line");
2061 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2062 if (!recsz) X_BAD_DATA("empty line");
2065 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2066 ipf->readcount_blank++;
2067 return OOP_CONTINUE;
2070 char *space= strchr(data,' ');
/* A record without any space has no message-id; reject it before doing
 * pointer arithmetic on (or dereferencing) a null pointer. */
if (!space) X_BAD_DATA("no space between token and messageid");
2071 int tokenlen= space-data;
2072 int midlen= (int)recsz-tokenlen-1;
2073 if (midlen <= 2) X_BAD_DATA("no room for messageid");
2074 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2076 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2077 memcpy(tokentextbuf, data, tokenlen);
2078 tokentextbuf[tokenlen]= 0;
2079 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2081 ipf->readcount_ok++;
2083 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2084 memset(art,0,sizeof(*art));
2085 art->state= art_Unchecked;
2086 art->midlen= midlen;
2087 art->ipf= ipf; ipf->inprogress++;
2088 art->token= TextToToken(tokentextbuf);
2089 art->offset= old_offset;
2090 art->blanklen= recsz;
2091 strcpy(art->messageid, space+1);
2092 LIST_ADDTAIL(ipf->queue, art);
2094 if (ipf->autodefer >= 0)
2095 article_autodefer(ipf, art);
2096 else if (ipf==backlog_input_file)
2097 article_check_expired(art);
2099 if (sms==sm_NORMAL && ipf==main_input_file &&
2100 ipf->offset >= target_max_feedfile_size)
2101 statemc_start_flush("feed file size");
2103 check_assign_articles();
2104 return OOP_CONTINUE;
2107 /*========== tailing input file ==========*/
/* Timer callback that delivers a "readable" notification for a tailed
 * input file: it invokes the reader's registered callback with its
 * user pointer and returns that callback's result. */
2109 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2111 InputFile *ipf= user;
2112 return ipf->readable_callback(loop, &ipf->readable,
2113 ipf->readable_callback_user);
/* on_cancel method of the tailing oop_readable: stops file monitoring
 * if active, cancels any pending immediate readable notification, and
 * forgets the reader's callback.  rable is the InputFile itself (the
 * pointer is cast directly). */
2116 static void tailing_on_cancel(struct oop_readable *rable) {
2117 InputFile *ipf= (void*)rable;
2119 if (ipf->filemon) filemon_stop(ipf);
2120 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2121 ipf->readable_callback= 0;
/* Arranges for the reader of ipf to be told, from the event loop and
 * as soon as possible, that the file is readable. */
2124 static void tailing_queue_readable(InputFile *ipf) {
2125 /* lifetime of ipf here is OK because destruction will cause
2126 * on_cancel which will cancel this callback */
2127 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
/* on_readable method of the tailing oop_readable: replaces any
 * previous registration (by cancelling first), records the reader's
 * callback and user pointer, and queues an immediate readable
 * notification. */
2130 static int tailing_on_readable(struct oop_readable *rable,
2131 oop_readable_call *cb, void *user) {
2132 InputFile *ipf= (void*)rable;
2134 tailing_on_cancel(rable);
2135 ipf->readable_callback= cb;
2136 ipf->readable_callback_user= user;
2139 tailing_queue_readable(ipf);
/* try_read method of the tailing oop_readable: reads from the input
 * file's descriptor, retrying when interrupted (EINTR).  How the
 * result is treated depends on which input file this is (main,
 * flushing - legal only in states SEPARATED and DROPPING - or
 * backlog); a further readable notification is queued on one of the
 * paths. */
2143 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2145 InputFile *ipf= (void*)rable;
2147 ssize_t r= read(ipf->fd, buffer, length);
2149 if (errno==EINTR) continue;
2153 if (ipf==main_input_file) {
2156 } else if (ipf==flushing_input_file) {
2158 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2159 } else if (ipf==backlog_input_file) {
2165 tailing_queue_readable(ipf);
2170 /*---------- filemon implemented with inotify ----------*/
2172 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2173 #define HAVE_FILEMON
2175 #include <sys/inotify.h>
/* State of the inotify-based file monitor: the inotify descriptor, the
 * current size of the watch-descriptor table, and the table mapping a
 * watch descriptor to the InputFile being watched (null when
 * unused). */
2177 static int filemon_inotify_fd;
2178 static int filemon_inotify_wdmax;
2179 static InputFile **filemon_inotify_wd2ipf;
/* Per-file state of the inotify method (presumably the file's watch
 * descriptor - confirm against the members). */
2181 struct Filemon_Perfile {
/* inotify method: starts watching ipf->path for modification.  Failure
 * to add the watch is fatal.  The wd-to-InputFile table is grown (new
 * entries zeroed) when the watch descriptor does not fit, and the
 * descriptor's slot, which must be free, is pointed at ipf. */
2185 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2186 int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2187 if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2189 if (wd >= filemon_inotify_wdmax) {
2191 filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2192 sizeof(*filemon_inotify_wd2ipf) * newmax);
2193 memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2194 sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2195 filemon_inotify_wdmax= newmax;
2198 assert(!filemon_inotify_wd2ipf[wd]);
2199 filemon_inotify_wd2ipf[wd]= ipf;
2201 debug("filemon inotify startfile %p wd=%d wdmax=%d",
2202 ipf, wd, filemon_inotify_wdmax);
/* inotify method: stops watching a file.  The watch is removed from
 * the inotify descriptor (failure is fatal) and its slot in the
 * wd-to-InputFile table is cleared. */
2207 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2209 debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2210 int r= inotify_rm_watch(filemon_inotify_fd, wd);
2211 if (r) sysdie("inotify_rm_watch");
2212 filemon_inotify_wd2ipf[wd]= 0;
/* Event-loop callback for the inotify descriptor.  Events are read one
 * fixed-size struct inotify_event at a time.  A would-block error ends
 * the processing, any other read error is fatal, and a read of an
 * unexpected size is fatal too.  For each event the watch descriptor
 * is range-checked, mapped to its InputFile and passed to
 * filemon_callback.  Returns OOP_CONTINUE. */
2215 static void *filemon_inotify_readable(oop_source *lp, int fd,
2216 oop_event e, void *u) {
2217 struct inotify_event iev;
2219 int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2221 if (isewouldblock(errno)) break;
2222 sysdie("read from inotify master");
2223 } else if (r==sizeof(iev)) {
2224 assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2226 die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2228 InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2229 debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2230 filemon_callback(ipf);
2232 return OOP_CONTINUE;
/* inotify method: one-time initialisation.  Creates the inotify
 * descriptor (a failure only produces a warning here), makes it
 * nonblocking, and registers filemon_inotify_readable for it with the
 * event loop. */
2235 static int filemon_method_init(void) {
2236 filemon_inotify_fd= inotify_init();
2237 if (filemon_inotify_fd<0) {
2238 syswarn("filemon/inotify: inotify_init failed");
2241 xsetnonblock(filemon_inotify_fd, 1);
2242 loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2244 debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
/* inotify method: writes a description of the monitor's state to f:
 * the method name, the inotify descriptor, the table size, and one
 * line per watch-descriptor slot giving the InputFile it maps to. */
2248 static void filemon_method_dump_info(FILE *f) {
2250 fprintf(f,"inotify");
2251 DUMPV("%d",,filemon_inotify_fd);
2252 DUMPV("%d",,filemon_inotify_wdmax);
2253 for (i=0; i<filemon_inotify_wdmax; i++)
/* %p requires a void* argument */
2254 fprintf(f," wd2ipf[%d]=%p\n", i, (void*)filemon_inotify_wd2ipf[i]);
2257 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2259 /*---------- filemon dummy implementation ----------*/
2261 #if !defined(HAVE_FILEMON)
/* Fallback file monitor used when no real method was compiled in.
 * Its per-file structure carries no information, initialisation only
 * warns, starting and stopping a file do nothing, and the info dump
 * just names the method. */
2263 struct Filemon_Perfile { int dummy; };
2265 static int filemon_method_init(void) {
2266 warn("filemon/dummy: no filemon method compiled in");
2269 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2270 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2271 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2273 #endif /* !HAVE_FILEMON */
2275 /*---------- filemon generic interface ----------*/
/* Starts monitoring ipf with the compiled-in method.  The file must
 * not already be monitored. */
2277 static void filemon_start(InputFile *ipf) {
2278 assert(!ipf->filemon);
2281 filemon_method_startfile(ipf, ipf->filemon);
/* Stops monitoring ipf; does nothing if it is not being monitored. */
2284 static void filemon_stop(InputFile *ipf) {
2285 if (!ipf->filemon) return;
2286 filemon_method_stopfile(ipf, ipf->filemon);
/* Called by a monitoring method when a watched file may have changed.
 * The reader's callback is invoked if ipf is non-null and a callback
 * is registered; otherwise the call is a no-op. */
2291 static void filemon_callback(InputFile *ipf) {
2292 if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2293 ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2296 /*---------- interface to start and stop an input file ----------*/
/* Record style used when reading input (feed) files: records are
 * delimited by '\n', the delimiter is stripped, and short records are
 * handled according to OOP_RD_SHORTREC_LONG. */
2298 static const oop_rd_style feedfile_rdstyle= {
2299 OOP_RD_DELIM_STRIP, '\n',
2301 OOP_RD_SHORTREC_LONG,
/* Starts reading records from ipf.  The file's embedded oop_readable
 * is wired to the tailing_* methods (the delete methods are unused),
 * any previous callback registration is cleared, a reader is created
 * on it, and record reading is started with feedfile_rdstyle, a
 * maximum record length of MAX_LINE_FEEDFILE, feedfile_got_article as
 * the data callback and feedfile_read_err as the error callback.
 * Failure to start reading is fatal. */
2304 static void inputfile_reading_start(InputFile *ipf) {
2306 ipf->readable.on_readable= tailing_on_readable;
2307 ipf->readable.on_cancel= tailing_on_cancel;
2308 ipf->readable.try_read= tailing_try_read;
2309 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2310 ipf->readable.delete_kill= 0;
2312 ipf->readable_callback= 0;
2313 ipf->readable_callback_user= 0;
2315 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2318 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2319 feedfile_got_article,ipf, feedfile_read_err, ipf);
2320 if (r) sysdie("unable start reading feedfile %s",ipf->path);
/* Stops reading from ipf: the reader is cancelled and deleted.  By
 * then file monitoring must no longer be active. */
2323 static void inputfile_reading_stop(InputFile *ipf) {
2325 oop_rd_cancel(ipf->rd);
2326 oop_rd_delete(ipf->rd);
2328 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2332 /*========== interaction with innd - state machine ==========*/
2334 /* See official state diagram at top of file. We implement
2345 |`---------------------------------------------------.
2347 |`---------------- - - - |
2348 D ENOENT | D EXISTS see OVERALL STATES diagram |
2349 | for full startup logic |
2352 | ============ try to |
2358 | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT |
2359 ^ | hardlink F to D |
2362 | | our handle onto F is now onto D |
2365 | |<-------------------<---------------------<---------+
2367 | | spawn inndcomm flush |
2369 | ================== |
2370 | FLUSHING[-ABSENT] |
2372 | main D tail/none |
2373 | ================== |
2375 | | INNDCOMM FLUSH FAILS ^
2376 | |`----------------------->----------. |
2378 | | NO SUCH SITE V |
2379 ^ |`--------------->----. ==================== |
2380 | | \ FLUSHFAILED[-ABSENT] |
2382 | | FLUSH OK \ main D tail/none |
2383 | | open F \ ==================== |
2385 | | \ | TIME TO RETRY |
2386 | |`------->----. ,---<---'\ `----------------'
2387 | | D NONE | | D NONE `----.
2389 | ============= V V ============
2390 | SEPARATED-1 | | DROPPING-1
2391 | flsh->rd!=0 | | flsh->rd!=0
2392 | [Separated] | | [Dropping]
2393 | main F idle | | main none
2394 | flsh D tail | | flsh D tail
2395 | ============= | | ============
2397 ^ | EOF ON D | | defer | EOF ON D
2399 | =============== | | ===============
2400 | SEPARATED-2 | | DROPPING-2
2401 | flsh->rd==0 | V flsh->rd==0
2402 | [Finishing] | | [Dropping]
2403 | main F tail | `. main none
2404 | flsh D closed | `. flsh D closed
2405 | =============== V `. ===============
2407 | | ALL D PROCESSED `. | ALL D PROCESSED
2408 | V install defer as backlog `. | install defer
2409 ^ | close D `. | close D
2410 | | unlink D `. | unlink D
2413 `----------' ==============
/* Startup helper: starts reading from f.  There must be no main input
 * file yet when this is called. */
2433 static void startup_set_input_file(InputFile *f) {
2434 assert(!main_input_file);
2436 inputfile_reading_start(f);
/* Acquires the duct lock.
 *
 * The lockfile is opened (created if necessary) and locked with
 * fcntl F_SETLK.  If another process holds the lock this is fatal,
 * or a silent exit(0) when quiet_multiple is set.  After locking, the
 * open file is compared with whatever path_lock names now; only if
 * they are the same file is the lock good, otherwise the descriptor is
 * a stale lockfile and is closed.  Finally the lockfile is truncated
 * and our pid, site, feedfile and peer name are written into it. */
2439 static void statemc_lock(void) {
2441 struct stat stab, stabf;
2444 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2445 if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2448 memset(&fl,0,sizeof(fl));
2450 fl.l_whence= SEEK_SET;
2451 int r= fcntl(lockfd, F_SETLK, &fl);
2453 if (errno==EACCES || isewouldblock(errno)) {
2454 if (quiet_multiple) exit(0);
2455 fatal("another duct holds the lockfile");
2457 sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2460 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2462 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2464 if (!lock_noent && samefile(&stab, &stabf))
2467 xclose(lockfd, "stale lockfile ", path_lock);
2470 FILE *lockfile= fdopen(lockfd, "w");
2471 if (!lockfile) sysdie("fdopen lockfile");
2473 int r= ftruncate(lockfd, 0);
2474 if (r) sysdie("truncate lockfile to write new info");
/* %ld takes a long; fprintf signals failure with any negative value */
2476 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2477 (long)self_pid,
2478 sitename, feedfile, remote_host) < 0 ||
2480 sysfatal("write info to lockfile %s", path_lock);
2482 debug("startup: locked");
/* Works out the initial state of the state machine from the files
 * found on disk.
 *
 * First a backlog file is searched for, and any leftover defer file is
 * dealt with according to its link count: with one link it is opened
 * so that it will later be closed and renamed; with two links it is
 * taken to be a stale hardlink to a backlog file and is unlinked; any
 * other count is fatal.
 *
 * Then the feed file F and the flushing file D are examined:
 *   F and D are the same file  F is unlinked (as if the flush had
 *                              moved it)
 *   F missing                  D, if present, becomes the input file
 *                              and an innd flush is spawned
 *   F and D differ             D becomes the flushing input file, F
 *                              the main one; state SEPARATED
 *   only F exists              F becomes the main input file; state
 *                              NORMAL */
2485 static void statemc_init(void) {
2486 struct stat stabdefer;
2488 search_backlog_file();
2491 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2493 debug("startup: ductdefer ENOENT");
2495 debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
/* dispatch on the link count itself, not on a comparison result, so
 * that every branch below can be selected */
2496 switch (stabdefer.st_nlink) {
2498 open_defer(); /* so that we will later close it and rename it */
2501 xunlink(path_defer, "stale defer file link"
2502 " (presumably hardlink to backlog file)");
2505 die("defer file %s has unexpected link count %ld",
2506 path_defer, (long)stabdefer.st_nlink);
2510 struct stat stab_f, stab_d;
2513 InputFile *file_d= open_input_file(path_flushing);
2514 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2516 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2518 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2519 debug("startup: F==D => Hardlinked");
2520 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2525 debug("startup: F ENOENT => Moved");
2526 if (file_d) startup_set_input_file(file_d);
2527 spawn_inndcomm_flush("feedfile missing at startup");
2528 /* => Flushing, sms:=FLUSHING */
2531 debug("startup: F!=D => Separated");
2532 startup_set_input_file(file_d);
2533 flushing_input_file= main_input_file;
2534 main_input_file= open_input_file(feedfile);
2535 if (!main_input_file) die("feedfile vanished during startup");
2536 SMS(SEPARATED, max_separated_periods,
2537 "found both old and current feed files");
2539 debug("startup: F exists, D ENOENT => Normal");
2540 InputFile *file_f= open_input_file(feedfile);
2541 if (!file_f) die("feed file vanished during startup");
2542 startup_set_input_file(file_f);
2543 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Begins a flush from state NORMAL.  The feed file is hardlinked to
 * the flushing-file name (failure is fatal), its old name is unlinked,
 * and an innd flush is spawned.  why is a short reason used in the
 * debug message and passed on to spawn_inndcomm_flush. */
2548 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2549 assert(sms == sm_NORMAL);
2551 debug("starting flush (%s) (%lu >?= %lu) (%d)",
2553 (unsigned long)(main_input_file ? main_input_file->offset : 0),
2554 (unsigned long)target_max_feedfile_size,
2557 int r= link(feedfile, path_flushing);
2558 if (r) sysfatal("link feedfile %s to flushing file %s",
2559 feedfile, path_flushing);
2562 xunlink(feedfile, "old feedfile link");
2565 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Acts on expiry of the flush timer according to the current state.
 * A periodic flush is started where that is the applicable action; in
 * state FLUSHFAILED the innd flush is retried; and where the old feed
 * file has taken too long to complete after a flush, a warning is
 * logged and the flushing input file is autodeferred.  Per the
 * original annotation the result is 1 when a flush was triggered and
 * 0 otherwise. */
2568 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2572 statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2575 case sm_FLUSHFAILED:
2576 spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2581 warn("took too long to complete old feedfile after flush, autodeferring");
2582 assert(flushing_input_file);
2583 autodefer_input_file(flushing_input_file);
/* Periodic hook driving the flush timer.  Nothing happens while
 * until_flush is zero on entry (timer not running) or still nonzero at
 * the second check (presumably after being decremented in between -
 * confirm).  Otherwise trigger_flush_ok() is called. */
2591 static void statemc_period_poll(void) {
2592 if (!until_flush) return;
2594 assert(until_flush>=0);
2596 if (until_flush) return;
2597 int ok= trigger_flush_ok();
/* Tells whether ipf has been completely dealt with.  It is not done
 * while any of its articles is still in progress or while a reader is
 * still attached (no EOF seen yet). */
2601 static int inputfile_is_done(InputFile *ipf) {
2603 if (ipf->inprogress) return 0; /* new article in the meantime */
2604 if (ipf->rd) return 0; /* not had EOF */
/* Logs a one-line summary of what has happened to an input file.
 *
 *   ipf        the file; a null pointer makes this a no-op
 *   completed  nonzero once the file is finished ("completed"),
 *              zero for an interim report ("processed"), which also
 *              shows the number of articles in progress
 *   what,spec  text identifying the file in the message
 *
 * The line gives the read counts (ok, blank, errors), the number of
 * articles autodeferred if autodeferral is active, the articles
 * offered and accepted (each split into checked and unchecked), and
 * the per-result counters generated by RESULT_COUNTS. */
2608 static void notice_processed(InputFile *ipf, int completed,
2609 const char *what, const char *spec) {
2610 if (!ipf) return; /* allows preterminate to be lazy */
2612 #define RCI_NOTHING(x) /* nothing */
2613 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2614 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2616 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2618 char *inprog= completed
2619 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2620 : xasprintf(" inprogress=%ld", ipf->inprogress);
2621 char *autodefer= ipf->autodefer >= 0
2622 ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2623 : xasprintf("%s","");
2625 info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2626 " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2627 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2629 completed?"completed":"processed", what, spec,
2630 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2632 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2633 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2634 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2635 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2636 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
2645 static void statemc_check_backlog_done(void) {
2646 InputFile *ipf= backlog_input_file;
2647 if (!inputfile_is_done(ipf)) return;
2649 const char *slash= strrchr(ipf->path, '/');
2650 const char *leaf= slash ? slash+1 : ipf->path;
2651 const char *under= strchr(slash, '_');
2652 const char *rest= under ? under+1 : leaf;
2653 if (!strncmp(rest,"backlog",7)) rest += 7;
2654 notice_processed(ipf,1,"backlog ",rest);
2656 close_input_file(ipf);
2657 if (unlink(ipf->path)) {
2658 if (errno != ENOENT)
2659 sysdie("could not unlink processed backlog file %s", ipf->path);
2660 warn("backlog file %s vanished while we were reading it"
2661 " so we couldn't remove it (but it's done now, anyway)",
2665 backlog_input_file= 0;
2666 search_backlog_file();
/* Checks whether the flushing input file has been fully processed.  If so
 * (only legal in sm_SEPARATED or sm_DROPPING, asserted): logs its
 * statistics, removes path_flushing, closes and frees the InputFile, and
 * advances the state machine: SEPARATED => NORMAL, DROPPING => DROPPED
 * (followed by a backlog search). */
2670 static void statemc_check_flushing_done(void) {
2671 InputFile *ipf= flushing_input_file;
2672 if (!inputfile_is_done(ipf)) return;
2674 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2676 notice_processed(ipf,1,"feedfile","");
2680 xunlink(path_flushing, "old flushing file");
2682 close_input_file(flushing_input_file);
2683 free(flushing_input_file);
2684 flushing_input_file= 0;
2686 if (sms==sm_SEPARATED) {
2687 notice("flush complete");
2688 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2689 } else if (sms==sm_DROPPING) {
2690 SMS(DROPPED, max_separated_periods, "old flush complete");
2691 search_backlog_file();
2692 notice("feed dropped, but will continue until backlog is finished");
/* Timer callback (registered by queue_check_input_done) which checks
 * whether the flushing and backlog input files are finished.  The main
 * input file must never be "done" (asserted).  Always returns
 * OOP_CONTINUE. */
2696 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2698 assert(!inputfile_is_done(main_input_file));
2699 statemc_check_flushing_done();
2700 statemc_check_backlog_done();
2701 return OOP_CONTINUE;
2704 static void queue_check_input_done(void) {
2705 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
/* Records a state-machine transition and logs it.
 *   newsms   the state being entered
 *   periods  new value for the until_flush countdown
 *   forlog   state name for the log message
 *   why      reason text for the log message
 * A state-dependent suffix (xtra) is appended to the state name in the
 * log: "-ABSENT" when there is no main input file, or "-1"/"-2" according
 * to whether the flushing input file still has a reader. */
2708 static void statemc_setstate(StateMachineState newsms, int periods,
2709 const char *forlog, const char *why) {
2711 until_flush= periods;
2713 const char *xtra= "";
2716 case sm_FLUSHFAILED:
2717 if (!main_input_file) xtra= "-ABSENT";
2721 xtra= flushing_input_file->rd ? "-1" : "-2";
/* two log formats: with and without the period count */
2727 info("state %s%s[%d] %s",forlog,xtra,periods,why);
2729 info("state %s%s %s",forlog,xtra,why);
2733 /*---------- defer and backlog files ----------*/
/* Opens the defer file (path_defer) for appending and reading, creating it
 * if need be, and trims any incomplete record from its end before
 * positioning the stream at the (possibly new) end of file.  All I/O
 * failures are fatal. */
2735 static void open_defer(void) {
2740 defer= fopen(path_defer, "a+");
2741 if (!defer) sysfatal("could not open defer file %s", path_defer);
2743 /* truncate away any half-written records */
2745 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
/* sizes are handled as long below, so refuse anything that will not fit */
2747 if (stab.st_size > LONG_MAX)
2748 die("defer file %s size is far too large", path_defer);
/* orgsize: size as found; truncto: candidate length to keep, which is
 * examined by seeking to the byte just before it */
2753 long orgsize= stab.st_size;
2754 long truncto= stab.st_size;
2756 if (!truncto) break; /* was only (if anything) one half-truncated record */
2757 if (fseek(defer, truncto-1, SEEK_SET) < 0)
2758 sysdie("seek in defer file %s while truncating partial", path_defer);
2763 sysdie("failed read from defer file %s", path_defer);
2765 die("defer file %s shrank while we were checking it!", path_defer);
/* something has to be cut off the end */
2771 if (stab.st_size != truncto) {
2772 warn("truncating half-record at end of defer file %s -"
2773 " shrinking by %ld bytes from %ld to %ld",
2774 path_defer, orgsize - truncto, orgsize, truncto);
2777 sysfatal("could not flush defer file %s", path_defer);
2778 if (ftruncate(fileno(defer), truncto))
2779 sysdie("could not truncate defer file %s", path_defer);
2782 info("continuing existing defer file %s (%ld bytes)",
2783 path_defer, orgsize);
2785 if (fseek(defer, truncto, SEEK_SET))
2786 sysdie("could not seek to new end of defer file %s", path_defer);
/* Closes the defer file and installs it as a backlog file: the file is
 * hard-linked under a name built from the feedfile name and the file's
 * inode number, and then the defer name is removed.  Afterwards the next
 * backlog scan is brought forward to at most backlog_retry_minperiods + 1
 * periods away.
 * NOTE(review): the name format here is "<feedfile>_backlog_<..>.<..>",
 * whereas the file's header comment documents
 * "site.name_backlog.<date>.<inum>" -- confirm which is intended. */
2789 static void close_defer(void) {
2794 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2796 if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2799 time_t now= xtime();
2801 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2803 (unsigned long)stab.st_ino);
/* link then unlink, rather than rename */
2804 if (link(path_defer, backlog))
2805 sysfatal("could not install defer file %s as backlog file %s",
2806 path_defer, backlog);
2807 if (unlink(path_defer))
2808 sysdie("could not unlink old defer link %s to backlog file %s",
2809 path_defer, backlog);
/* a negative until_backlog_nextscan is overridden here as well */
2813 if (until_backlog_nextscan < 0 ||
2814 until_backlog_nextscan > backlog_retry_minperiods + 1)
2815 until_backlog_nextscan= backlog_retry_minperiods + 1;
2818 static void poll_backlog_file(void) {
2819 if (until_backlog_nextscan < 0) return;
2820 if (until_backlog_nextscan-- > 0) return;
2821 search_backlog_file();
/* Looks for a backlog file to process, unless one is already open.
 * Expands globpat_backlog, ignores names containing '#' or '~' and
 * anything that is not a regular file, and picks the candidate with the
 * oldest mtime.  If that file is at least
 * backlog_retry_minperiods * period_seconds old it is opened as
 * backlog_input_file and reading starts; otherwise the next scan is
 * scheduled.  When nothing is found and sms is sm_DROPPED, the control
 * symlink and lockfile are removed.
 * NOTE(review): the comment inside the body claims a nonzero return value,
 * but the function is declared void. */
2824 static void search_backlog_file(void) {
2825 /* returns non-0 iff there are any backlog files */
2830 const char *oldest_path=0;
2831 time_t oldest_mtime=0, now;
2833 if (backlog_input_file) return;
2837 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2841 sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2843 fatal("out of memory expanding backlog pattern %s", globpat_backlog);
/* examine every match, remembering the one with the oldest mtime */
2845 for (i=0; i<gl.gl_pathc; i++) {
2846 const char *path= gl.gl_pathv[i];
2848 if (strchr(path,'#') || strchr(path,'~')) {
2849 debug("backlog file search skipping %s", path);
2852 r= stat(path, &stab);
2854 syswarn("failed to stat backlog file %s", path);
2857 if (!S_ISREG(stab.st_mode)) {
2858 warn("backlog file %s is not a plain file (or link to one)", path);
2861 if (!oldest_path || stab.st_mtime < oldest_mtime) {
2863 oldest_mtime= stab.st_mtime;
2866 case GLOB_NOMATCH: /* fall through */
2869 sysdie("glob expansion of backlog pattern %s gave unexpected"
2870 " nonzero (error?) return value %d", globpat_backlog, r);
2874 debug("backlog scan: none");
/* no backlog left and the feed has been dropped: clean up after ourselves */
2876 if (sms==sm_DROPPED) {
2878 notice("feed dropped and our work is complete");
2880 int r= unlink(path_control);
2881 if (r && errno!=ENOENT)
2882 syswarn("failed to remove control symlink for old feed");
2884 xunlink(path_lock, "lockfile for old feed");
2887 until_backlog_nextscan= backlog_spontrescan_periods;
/* age_deficiency: how many more seconds the oldest file must age before
 * it may be processed; <= 0 means it is old enough now */
2892 double age= difftime(now, oldest_mtime);
2893 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2895 if (age_deficiency <= 0) {
2896 debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2897 age, age_deficiency, oldest_path);
2899 backlog_input_file= open_input_file(oldest_path);
2900 if (!backlog_input_file) {
2901 warn("backlog file %s vanished as we opened it", oldest_path);
2905 inputfile_reading_start(backlog_input_file);
2906 until_backlog_nextscan= -1;
/* too young: rescan when it should be old enough, capped by the
 * spontaneous rescan interval if that is non-negative */
2910 until_backlog_nextscan= age_deficiency / period_seconds;
2912 if (backlog_spontrescan_periods >= 0 &&
2913 until_backlog_nextscan > backlog_spontrescan_periods)
2914 until_backlog_nextscan= backlog_spontrescan_periods;
2916 debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2917 age, age_deficiency, until_backlog_nextscan, oldest_path);
2924 /*---------- shutdown and signal handling ----------*/
/* Logs "processed" (not "completed") statistics for each input file that
 * is currently open.  Does nothing in a child process (in_child).  Null
 * main/flushing files are tolerated by notice_processed itself; the
 * backlog file is checked here because its path is dereferenced. */
2926 static void preterminate(void) {
2927 if (in_child) return;
2928 notice_processed(main_input_file,0,"feedfile","");
2929 notice_processed(flushing_input_file,0,"flushing","");
2930 if (backlog_input_file)
2931 notice_processed(backlog_input_file,0, "backlog file ",
2932 backlog_input_file->path);
2935 static int signal_self_pipe[2];
2936 static sig_atomic_t terminate_sig_flag;
/* Resets the disposition of signal signo to its default, via
 * xsigsetdefault; judging by the name the signal is then presumably
 * re-raised -- TODO confirm. */
2938 static void raise_default(int signo) {
2939 xsigsetdefault(signo);
2944 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2945 assert(fd=signal_self_pipe[0]);
2947 int r= read(signal_self_pipe[0], buf, sizeof(buf));
2948 if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2949 if (r==0) die("eof on signal self pipe");
2950 if (terminate_sig_flag) {
2952 notice("terminating (%s)", strsignal(terminate_sig_flag));
2953 raise_default(terminate_sig_flag);
2955 return OOP_CONTINUE;
/* Signal handler installed by init_signals.  Records the first signal
 * received in terminate_sig_flag (later signals do not overwrite it) and
 * writes one byte to the self-pipe so that the event loop notices. */
2958 static void sigarrived_handler(int signum) {
2963 if (!terminate_sig_flag) terminate_sig_flag= signum;
/* the result of write is not checked; the pipe is nonblocking */
2968 write(signal_self_pipe[1],&x,1);
/* Sets up signal handling: ignores SIGPIPE, creates the nonblocking
 * self-pipe, installs sigarrived_handler for SIGTERM and SIGINT (with
 * SA_RESTART), and registers sigarrived_event on the pipe's read end. */
2971 static void init_signals(void) {
2972 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2973 sysdie("could not ignore SIGPIPE");
2975 if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
/* both ends nonblocking: neither the handler's write nor the event
 * callback's read may stall */
2977 xsetnonblock(signal_self_pipe[0],1);
2978 xsetnonblock(signal_self_pipe[1],1);
2980 struct sigaction sa;
2981 memset(&sa,0,sizeof(sa));
2982 sa.sa_handler= sigarrived_handler;
2983 sa.sa_flags= SA_RESTART;
2984 xsigaction(SIGTERM,&sa);
2985 xsigaction(SIGINT,&sa);
2987 on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2990 /*========== flushing the feed ==========*/
2992 static pid_t inndcomm_child;
2993 static int inndcomm_sentinel_fd;
/* Event-loop callback run when the inndcomm child's sentinel pipe becomes
 * readable, i.e. the flush child has gone away.  Reaps the child, closes
 * the sentinel, and moves the state machine according to the child's exit
 * status:
 *   INNDCOMMCHILD_ESTATUS_NONESUCH  innd has dropped the feed: the main
 *        input file becomes the flushing file; => DROPPING, or DROPPED
 *        (followed by a backlog search) if there is nothing to finish
 *   success  the old main file becomes the flushing file and the feedfile
 *        is reopened; => SEPARATED, or NORMAL if there is no flushing file
 *   anything else, including death by SIGALRM (timeout)
 *        => FLUSHFAILED, to be retried after flushfail_retry_periods
 * Always returns OOP_CONTINUE. */
2995 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2996 assert(inndcomm_child);
2997 assert(fd == inndcomm_sentinel_fd);
2998 int status= xwaitpid(&inndcomm_child, "inndcomm");
3001 cancel_fd_read_except(fd);
3002 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3003 inndcomm_sentinel_fd= 0;
/* no flushing file may exist while a flush is in progress */
3005 assert(!flushing_input_file);
3007 if (WIFEXITED(status)) {
3008 switch (WEXITSTATUS(status)) {
3010 case INNDCOMMCHILD_ESTATUS_FAIL:
3013 case INNDCOMMCHILD_ESTATUS_NONESUCH:
3014 notice("feed has been dropped by innd, finishing up");
3015 flushing_input_file= main_input_file;
3016 tailing_queue_readable(flushing_input_file);
3017 /* we probably previously returned EAGAIN from our fake read method
3018 * when in fact we were at EOF, so signal another readable event
3019 * so we actually see the EOF */
3023 if (flushing_input_file) {
3024 SMS(DROPPING, max_separated_periods,
3025 "feed dropped by innd, but must finish last flush");
3028 SMS(DROPPED, 0, "feed dropped by innd");
3029 search_backlog_file();
3031 return OOP_CONTINUE;
/* successful flush: rotate main => flushing and reopen the feedfile,
 * which must now exist */
3035 flushing_input_file= main_input_file;
3036 tailing_queue_readable(flushing_input_file);
3038 main_input_file= open_input_file(feedfile);
3039 if (!main_input_file)
3040 die("flush succeeded but feedfile %s does not exist!", feedfile);
3042 if (flushing_input_file) {
3043 SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3046 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3048 return OOP_CONTINUE;
3051 goto unexpected_exitstatus;
/* the child sets alarm(inndcomm_flush_timeout), so SIGALRM means timeout */
3054 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3055 warn("flush timed out trying to talk to innd");
3058 unexpected_exitstatus:
3059 report_child_status("inndcomm child", status);
3063 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3064 return OOP_CONTINUE;
3067 static void inndcommfail(const char *what) {
3068 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3069 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Forks a child which asks innd to flush this site, and enters state
 * FLUSHING.  May only be called in sm_NORMAL or sm_FLUSHFAILED, with no
 * inndcomm child or sentinel outstanding (all asserted).
 *
 * The child connects with ICCopen and sends an 'f' command for sitename
 * under an alarm() timeout; it exits 0 on success,
 * INNDCOMMCHILD_ESTATUS_NONESUCH if innd answers "1 No such site", and
 * INNDCOMMCHILD_ESTATUS_FAIL otherwise.  The parent keeps the read end of
 * a pipe as a sentinel and registers inndcomm_event on it.
 *   why  reason text, used for logging and as the state-change reason */
3072 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3075 notice("flushing %s",why);
3077 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3078 assert(!inndcomm_child);
3079 assert(!inndcomm_sentinel_fd);
3081 if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3083 inndcomm_child= xfork("inndcomm child");
/* ---- child ---- */
3085 if (!inndcomm_child) {
3086 const char *flushargv[2]= { sitename, 0 };
3090 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3091 /* parent spots the autoclose of pipefds[1] when we die or exit */
/* testing aid: fake the child's fate; values above 128 mean "die from
 * signal (value-128)" */
3093 if (simulate_flush>=0) {
3094 warn("SIMULATING flush child status %d", simulate_flush);
3095 if (simulate_flush>128) raise(simulate_flush-128);
3096 else exit(simulate_flush);
3099 alarm(inndcomm_flush_timeout);
3100 r= ICCopen(); if (r) inndcommfail("connect");
3101 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
3102 if (!r) exit(0); /* yay! */
3104 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3105 syswarn("innd ctlinnd flush failed: innd said %s", reply);
3106 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* ---- parent ---- */
3111 xclose(pipefds[1], "inndcomm sentinel child's end",0);
3112 inndcomm_sentinel_fd= pipefds[0];
/* 0 is used to mean "no sentinel", so the real fd must be nonzero */
3113 assert(inndcomm_sentinel_fd);
3114 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3116 SMS(FLUSHING, 0, why);
3119 /*========== main program ==========*/
/* In a freshly forked child: closes the file descriptor of an input file
 * inherited from the parent. */
3121 static void postfork_inputfile(InputFile *ipf) {
3123 xclose(ipf->fd, "(in child) input file ", ipf->path);
/* In a freshly forked child: closes a stdio stream inherited from the
 * parent.  A null stream is ignored (postfork passes `defer', which need
 * not be open).
 *   what, what2  description of the stream for the error message; what2
 *                may be null */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return;
  /* A null what2 must become "", not 0: passing an integer 0 for a %s
   * conversion is undefined behaviour. */
  if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:"");
}
/* Housekeeping to run in a child immediately after fork: restores default
 * handling of SIGTERM, SIGINT and SIGPIPE, re-raises any termination
 * signal that had already been noted, and closes the parent's input
 * files, connection fds and defer stream. */
3132 static void postfork(void) {
3135 xsigsetdefault(SIGTERM);
3136 xsigsetdefault(SIGINT);
3137 xsigsetdefault(SIGPIPE);
/* a signal recorded before the handlers were reset still applies to us */
3138 if (terminate_sig_flag) raise(terminate_sig_flag);
3140 postfork_inputfile(main_input_file);
3141 postfork_inputfile(flushing_input_file);
3145 conn_closefd(conn,"(in child) ");
3147 postfork_stdio(defer, "defer file ", path_defer);
3150 typedef struct Every Every;
3152 struct timeval interval;
3157 static void every_schedule(Every *e, struct timeval base);
/* Timer callback for an Every: reschedules it and returns OOP_CONTINUE.
 * For a fixed-rate Every the next firing is measured from the scheduled
 * time `base'; otherwise `base' is first replaced by the current time. */
3159 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3162 if (!e->fixed_rate) xgettimeofday(&base);
3163 every_schedule(e, base);
3164 return OOP_CONTINUE;
3167 static void every_schedule(Every *e, struct timeval base) {
3168 struct timeval when;
3169 timeradd(&base, &e->interval, &when);
3170 loop->on_time(loop, when, every_happens, e);
/* Creates an Every and schedules its first firing one interval from now.
 *   interval    seconds between firings
 *   fixed_rate  nonzero: measure each interval from the previous scheduled
 *               time; zero: measure it from the time the callback ran
 *   f           function associated with the timer */
3173 static void every(int interval, int fixed_rate, void (*f)(void)) {
3174 NEW_DECL(Every *,e);
3175 e->interval.tv_sec= interval;
3176 e->interval.tv_usec= 0;
3177 e->fixed_rate= fixed_rate;
3180 xgettimeofday(&now);
3181 every_schedule(e, now);
3184 static void filepoll(void) {
3185 filemon_callback(main_input_file);
3186 filemon_callback(flushing_input_file);
/* Returns a string, allocated by xasprintf, that describes an input file
 * for debugging: its leafname, queue length, in-progress and autodefer
 * counts, offset, fd, and flags for "no reader" and "skipping long line".
 * A null ipf yields "none". */
3189 static char *debug_report_ipf(InputFile *ipf) {
3190 if (!ipf) return xasprintf("none");
/* report only the leafname */
3192 const char *slash= strrchr(ipf->path,'/');
3193 const char *path= slash ? slash+1 : ipf->path;
3195 return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s",
3197 ipf->queue.count, ipf->inprogress, ipf->autodefer,
3198 (long)ipf->offset, ipf->fd,
3199 ipf->rd ? "" : ",!rd",
3200 ipf->skippinglong ? "*skiplong" : "");
/* Once-per-period housekeeping: reports overall status, counts down
 * until_connect, expires queued backlog articles, polls for backlog files,
 * closes the defer file when no backlog file is being read, polls the
 * state machine, and hands out articles to connections. */
3203 static void period(void) {
/* descriptions are heap-allocated and must be freed after use */
3204 char *dipf_main= debug_report_ipf(main_input_file);
3205 char *dipf_flushing= debug_report_ipf(flushing_input_file);
3206 char *dipf_backlog= debug_report_ipf(backlog_input_file);
3209 " sms=%s[%d] conns=%d until_connect=%d"
3210 " input_files main:%s flushing:%s backlog:%s"
3211 " children connecting=%ld inndcomm=%ld"
3213 sms_names[sms], until_flush, conns.count, until_connect,
3214 dipf_main, dipf_flushing, dipf_backlog,
3215 (long)connecting_child, (long)inndcomm_child
3219 free(dipf_flushing);
3222 if (until_connect) until_connect--;
3224 inputfile_queue_check_expired(backlog_input_file);
3225 poll_backlog_file();
3226 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3227 statemc_period_poll();
3228 check_assign_articles();
3233 /*========== dumping state ==========*/
/* Writes a description of an article list to f: always its count, and, if
 * the control command's xval is nonzero, one line per article giving its
 * index, state, owning input file, offset, lengths, token and message-id. */
3235 static void dump_article_list(FILE *f, const ControlCommand *c,
3236 const ArticleList *al) {
3237 fprintf(f, " count=%d\n", al->count);
/* per-article detail only on request */
3238 if (!c->xval) return;
3240 int i; Article *art;
3241 for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3242 fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3243 DUMPV("%p", art->,ipf);
3244 DUMPV("%d", art->,missing);
3245 DUMPV("%lu", (unsigned long)art->,offset);
3246 DUMPV("%d", art->,blanklen);
3247 DUMPV("%d", art->,midlen);
3248 fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
/* Writes the state of one input file to f: the debug_report_ipf summary,
 * the read counters, one line of result counts per article state, and the
 * file's queue (via dump_article_list).
 *   wh  label for the file, e.g. "main", used as a line prefix */
3252 static void dump_input_file(FILE *f, const ControlCommand *c,
3253 InputFile *ipf, const char *wh) {
3254 char *dipf= debug_report_ipf(ipf);
3255 fprintf(f,"input %s %s", wh, dipf);
3259 DUMPV("%d", ipf->,readcount_ok);
3260 DUMPV("%d", ipf->,readcount_blank);
3261 DUMPV("%d", ipf->,readcount_err);
/* artstate_names is walked until its null terminator, in step with state */
3265 ArtState state; const char *const *statename;
3266 for (state=0, statename=artstate_names; *statename; state++,statename++) {
3267 #define RC_DUMP_FMT(x) " " #x "=%d"
3268 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3269 fprintf(f,"input %s counts %-11s"
3270 RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3272 RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3274 fprintf(f,"input %s queue", wh);
3275 dump_article_list(f,c,&ipf->queue);
3281 fprintf(cc->out, "dumping state to %s\n", path_dump);
3282 FILE *f= fopen(path_dump, "w");
3283 if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3285 fprintf(f,"general");
3286 DUMPV("%s", sms_names,[sms]);
3287 DUMPV("%d", ,until_flush);
3288 DUMPV("%ld", (long),self_pid);
3289 DUMPV("%p", , defer);
3290 DUMPV("%d", , until_connect);
3291 DUMPV("%d", , until_backlog_nextscan);
3292 DUMPV("%d", , simulate_flush);
3293 fprintf(f,"\nnocheck");
3294 DUMPV("%#.10f", , accept_proportion);
3295 DUMPV("%d", , nocheck);
3296 DUMPV("%d", , nocheck_reported);
3299 fprintf(f,"special");
3300 DUMPV("%ld", (long),connecting_child);
3301 DUMPV("%d", , connecting_fdpass_sock);
3302 DUMPV("%d", , control_master);
3305 fprintf(f,"filemon ");
3306 filemon_method_dump_info(f);
3308 dump_input_file(f,c, main_input_file, "main" );
3309 dump_input_file(f,c, flushing_input_file, "flushing");
3310 dump_input_file(f,c, backlog_input_file, "backlog" );
3312 fprintf(f,"conns count=%d\n", conns.count);
3317 fprintf(f,"C%d",conn->fd);
3318 DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
3319 DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
3320 DUMPV("%d",conn->,since_activity);
3323 fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3324 fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3325 fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
3327 fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3328 for (i=0; i<conn->xmitu; i++) {
3329 const struct iovec *iv= &conn->xmit[i];
3330 const XmitDetails *xd= &conn->xmitd[i];
3333 case xk_Const: dinfo= xasprintf("Const"); break;
3334 case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
3338 fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3339 sanitise(iv->iov_base, iv->iov_len));
3345 DUMPV("%s", , path_lock);
3346 DUMPV("%s", , path_flushing);
3347 DUMPV("%s", , path_defer);
3348 DUMPV("%s", , path_control);
3349 DUMPV("%s", , path_dump);
3350 DUMPV("%s", , globpat_backlog);
3353 if (!!ferror(f) + !!fclose(f)) {
3354 fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3359 /*========== option parsing ==========*/
/* Reports a command-line usage error: formats the message, prints it to
 * stderr with a hint about --help, and logs it to syslog at LOG_CRIT.
 * Declared NORET_PRINTF, i.e. it is not expected to return. */
3361 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3362 static void vbadusage(const char *fmt, va_list al) {
3363 char *m= xvasprintf(fmt,al);
3364 fprintf(stderr, "bad usage: %s\n"
3365 "say --help for help, or read the manpage\n",
3368 syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3372 /*---------- generic option parser ----------*/
/* printf-style variant of the usage-error reporter (presumably a varargs
 * wrapper around vbadusage -- TODO confirm).  Declared NORET_PRINTF. */
3374 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3375 static void badusage(const char *fmt, ...) {
3382 of_seconds= 001000u,
3383 of_boolean= 002000u,
3386 typedef struct Option Option;
3387 typedef void OptionParser(const Option*, const char *val);
3391 const char *lng, *formarg;
/* Generic command-line option parser.  Walks the argument vector via
 * *argvp, handling "--long", "--long=value" and bundled short options
 * against the `options' table (terminated by an entry with neither a short
 * nor a long name), and stops at the first non-option argument or after
 * "--".  Unknown options and missing or unexpected values are reported
 * through badusage.
 * NOTE(review): the long-option lookup calls strlen(o->lng) for every
 * table entry, while the loop condition admits entries that have only a
 * short name (o->lng null) -- confirm no such entries exist. */
3397 static void parse_options(const Option *options, char ***argvp) {
3398 /* on return *argvp is first non-option arg; argc is not updated */
3401 const char *arg= *++(*argvp);
3403 if (*arg != '-') break;
3404 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3406 while ((a= *++arg)) {
/* long option: the name runs up to '=' or the end of the argument */
3410 char *equals= strchr(arg,'=');
3411 int len= equals ? (equals - arg) : strlen(arg);
3412 for (o=options; o->shrt || o->lng; o++)
3413 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3415 badusage("unknown long option --%s",arg);
3418 if (equals) badusage("option --%s does not take a value",o->lng);
3420 } else if (equals) {
3424 if (!arg) badusage("option --%s needs a value for %s",
3425 o->lng, o->formarg);
3428 break; /* eaten the whole argument now */
/* short option: look up the single character a */
3430 for (o=options; o->shrt || o->lng; o++)
3433 badusage("unknown short option -%c",a);
3440 if (!arg) badusage("option -%c needs a value for %s",
3441 o->shrt, o->formarg);
3444 break; /* eaten the whole argument now */
3450 #define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
/* Prints one line per entry of the option table to f, in the form
 * " -x|--long-name FORMARG"; the short name, the long name and the formal
 * argument are each omitted when absent. */
3452 static void print_options(const Option *options, FILE *f) {
3454 for (o=options; o->shrt || o->lng; o++) {
/* one-character string for the short option; empty if there is none */
3455 char shrt[2] = { o->shrt, 0 };
3456 char *optspec= xasprintf("%s%s%s%s%s",
3457 o->shrt ? "-" : "", shrt,
3458 o->shrt && o->lng ? "|" : "",
3459 DELIMPERHAPS("--", o->lng));
3460 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3465 /*---------- specific option types ----------*/
/* Option handler: parses val as an unsigned decimal integer no greater
 * than INT_MAX, for the int that o->store points to.  Trailing garbage, an
 * empty value, a conversion error or an out-of-range value is a usage
 * error. */
3467 static void op_integer(const Option *o, const char *val) {
3470 unsigned long ul= strtoul(val,&ep,10);
3471 if (*ep || ep==val || errno || ul>INT_MAX)
3472 badusage("bad integer value for %s",o->lng);
3473 int *store= o->store;
3477 static void op_double(const Option *o, const char *val) {
3478 int *store= o->store;
3481 *store= strtod(val, &ep);
3482 if (*ep || ep==val || errno)
3483 badusage("bad floating point value for %s",o->lng);
/* Option handler for string-valued options; o->store is treated as a
 * pointer to a `const char *' variable. */
3486 static void op_string(const Option *o, const char *val) {
3487 const char **store= o->store;
/* Option handler for time/duration values, targeting the int that o->store
 * points to.  The value is a number with an optional unit suffix:
 *   (none), s, sec  seconds        das  10 s
 *   m, min          60 s           hs   100 s
 *   h, hour         3600 s         ks   1000 s
 *   d, day          86400 s        Ms   1000000 s
 * A missing number, an unknown unit or a result above INT_MAX is a usage
 * error. */
3491 static void op_seconds(const Option *o, const char *val) {
3492 int *store= o->store;
3496 double v= strtod(val,&ep);
3497 if (ep==val) badusage("bad time/duration value for %s",o->lng);
/* ep now points at the unit suffix, if any */
3499 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3500 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
3501 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
3502 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
3503 else if (!strcmp(ep,"das")) unit= 10;
3504 else if (!strcmp(ep,"hs")) unit= 100;
3505 else if (!strcmp(ep,"ks")) unit= 1000;
3506 else if (!strcmp(ep,"Ms")) unit= 1000000;
3507 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3511 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
/* Option handler for valueless flag options; o->store is treated as a
 * pointer to an int (presumably set to the constant given in the option
 * table entry -- TODO confirm). */
3515 static void op_setint(const Option *o, const char *val) {
3516 int *store= o->store;
3520 /*---------- specific options ----------*/
/* Forward declaration: the option table refers to help. */
3522 static void help(const Option *o, const char *val);
/* Table of innduct's command-line options.  Each entry gives: short option
 * character (0 if none), long option name, formal argument name for the
 * usage message (0 if none), address of the variable to set, handler
 * function, and for op_setint entries the value to store.
 * Options with formal argument "PERIOD" are parsed by op_seconds as
 * seconds; main later converts them to a number of periods.
 * NOTE(review): "ctrl-sock-dir" uses op_string but has no formal argument
 * name, unlike the other op_string entries -- confirm it can be given a
 * value. */
3524 static const Option innduct_options[]= {
3525 {'f',"feedfile", "F", &feedfile, op_string },
3526 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
3527 {0,"no-daemon", 0, &become_daemon, op_setint, 0 },
3528 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
3529 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
3530 {'C',"inndconf", "F", &inndconffile, op_string },
3531 {'P',"port", "PORT", &port, op_integer },
3532 {0,"ctrl-sock-dir", 0, &realsockdir, op_string },
3533 {0,"help", 0, 0, help },
3535 {0,"max-connections", "N", &max_connections, op_integer },
3536 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
3537 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
3538 {0,"period-interval", "TIME", &period_seconds, op_seconds },
3540 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
3541 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
3542 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
3544 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
3545 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
3547 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
3548 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
3549 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3550 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3551 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
3552 {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
3553 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
3555 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
3556 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
3561 static void printusage(FILE *f) {
3562 fputs("usage: innduct [options] site [fqdn]\n"
3563 "available options are:\n", f);
3564 print_options(innduct_options, f);
/* Handler for --help.  After the help text has been produced, checks
 * stdout for write errors (including those surfaced by flushing) and
 * reports any with perror. */
3567 static void help(const Option *o, const char *val) {
3569 if (ferror(stdout) || fflush(stdout)) {
3570 perror("innduct: writing help");
3576 static void convert_to_periods_rndup(int *store) {
3577 *store += period_seconds-1;
3578 *store /= period_seconds;
/* Program entry point.  Parses the options and the positional arguments
 * (site name, optional fqdn), reads inn.conf, validates and normalises the
 * tunables, derives the per-feed pathnames from the feedfile name, creates
 * the event loop, optionally daemonises, sets up file monitoring and the
 * periodic timers, and runs the event loop, which is not expected to
 * return other than with an error. */
3581 int main(int argc, char **argv) {
3587 parse_options(innduct_options, &argv);
3592 if (!sitename) badusage("need site name argument");
/* NOTE(review): if no fqdn was given, *argv here is the terminating null
 * of the argument vector; after the ++ the following test reads one
 * element beyond that terminator -- confirm and guard if so. */
3593 remote_host= *argv++;
3594 if (*argv) badusage("too many non-option arguments");
3598 int r= innconf_read(inndconffile);
3599 if (!r) badusage("could not read inn.conf (more info on stderr)");
3601 if (!remote_host) remote_host= sitename;
/* percentages from the command line become fractions */
3603 if (nocheck_thresh < 0 || nocheck_thresh > 100)
3604 badusage("nocheck threshold percentage must be between 0..100");
3605 nocheck_thresh *= 0.01;
3607 if (nocheck_decay < 0.1)
3608 badusage("nocheck decay articles must be at least 0.1");
/* turn the response time in articles into a per-article decay factor */
3609 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
/* these were parsed as seconds; convert them to whole periods, rounding up */
3611 convert_to_periods_rndup(&reconnect_delay_periods);
3612 convert_to_periods_rndup(&flushfail_retry_periods);
3613 convert_to_periods_rndup(&backlog_retry_minperiods);
3614 convert_to_periods_rndup(&backlog_spontrescan_periods);
3615 convert_to_periods_rndup(&spontaneous_flush_periods);
3616 convert_to_periods_rndup(&max_separated_periods);
3617 convert_to_periods_rndup(&need_activity_periods);
3619 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3620 badusage("bad input data ratio must be between 0..100");
3621 max_bad_data_ratio *= 0.01;
/* feedfile: defaults to <pathoutgoing>/<sitename>; a value ending in '/'
 * names a directory to which the sitename is appended */
3624 feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3625 } else if (!feedfile[0]) {
3626 badusage("feed filename must be nonempty");
3627 } else if (feedfile[strlen(feedfile)-1]=='/') {
3628 feedfile= xasprintf("%s%s",feedfile,sitename);
/* the feedfile name is used to build a glob pattern and the backlog scan
 * skips names containing '#' or '~', so these characters are refused */
3631 const char *feedfile_forbidden= "?*[~#";
3633 while ((c= *feedfile_forbidden++))
3634 if (strchr(feedfile, c))
3635 badusage("feed filename may not contain metacharacter %c",c);
/* all other pathnames are derived from the feedfile name */
3639 path_lock= xasprintf("%s_lock", feedfile);
3640 path_flushing= xasprintf("%s_flushing", feedfile);
3641 path_defer= xasprintf("%s_defer", feedfile);
3642 path_control= xasprintf("%s_control", feedfile);
3643 path_dump= xasprintf("%s_dump", feedfile);
3644 globpat_backlog= xasprintf("%s_backlog*", feedfile);
3646 oop_source_sys *sysloop= oop_sys_new();
3647 if (!sysloop) sysdie("could not create liboop event loop");
3648 loop= (oop_source*)sysloop;
3652 if (become_daemon) {
3654 for (i=3; i<255; i++)
3655 /* do this now before we open syslog, etc. */
3657 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3659 int null= open("/dev/null",O_RDWR);
3660 if (null<0) sysfatal("failed to open /dev/null");
3664 xclose(null, "/dev/null original fd",0);
3666 pid_t child1= xfork("daemonise first fork");
3667 if (child1) _exit(0);
/* NOTE(review): only the child gets here, so child1 is 0, while setsid()
 * returns the new session id or -1 and never 0.  As written this test
 * looks as though it would always report failure; the failure check is
 * presumably meant to be sid == -1 -- confirm. */
3669 pid_t sid= setsid();
3670 if (sid != child1) sysfatal("setsid failed");
3672 pid_t child2= xfork("daemonise second fork");
3673 if (child2) _exit(0);
3677 if (self_pid==-1) sysdie("getpid");
3692 notice("filemon: suppressed by command line option, polling");
3694 filemon_ok= filemon_method_init();
3696 warn("filemon: no file monitoring available, polling");
/* polling timer: not fixed-rate */
3699 every(filepoll_seconds,0,filepoll);
/* period timer: fixed-rate */
3701 every(period_seconds,1,period);
3707 void *run= oop_sys_run(sysloop);
3708 assert(run == OOP_ERROR);
3709 sysdie("event loop failed");