chiark / gitweb /
047d99e6f9f3563903ac17e2310fa1470f93066e
[inn-innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
325
326 static void postfork(void);
327 static void period(void);
328
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
335
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
338
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
342
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
345
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
348
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
351
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
357 static int port=119;
358 static const char *inndconffile;
359
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
365
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
368
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
371
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int max_separated_periods=2000;
379 static int need_activity_periods=1000;
380
381 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
382 static int max_bad_data_initial= 30;
383   /* in one corrupt 4096-byte block the number of newlines has
384    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
385
386
387 /*----- statistics -----*/
388
389 typedef enum {      /* in queue                 in conn->sent             */
390   art_Unchecked,    /*   not checked, not sent    checking                */
391   art_Wanted,       /*   checked, wanted          sent body as requested  */
392   art_Unsolicited,  /*   -                        sent body without check */
393   art_MaxState,
394 } ArtState;
395
396 static const char *const artstate_names[]=
397   { "Unchecked", "Wanted", "Unsolicited", 0 };
398
399 #define RESULT_COUNTS(RCS,RCN)                  \
400   RCS(sent)                                     \
401   RCS(accepted)                                 \
402   RCN(unwanted)                                 \
403   RCN(rejected)                                 \
404   RCN(deferred)                                 \
405   RCN(missing)                                  \
406   RCN(connretry)
407
408 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
409 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
410        counts[art_Unchecked] x                  \
411        + counts[art_Wanted] x                   \
412        + counts[art_Unsolicited] x,             \
413        counts[art_Unchecked] x                  \
414        , counts[art_Wanted] x                   \
415        , counts[art_Unsolicited] x
416
417 typedef enum {
418 #define RC_INDEX(x) RC_##x,
419   RESULT_COUNTS(RC_INDEX, RC_INDEX)
420   RCI_max
421 } ResultCountIndex;
422
423
424 /*----- transmission buffers -----*/
425
426 #define CONNIOVS 128
427
428 typedef enum {
429   xk_Const, xk_Artdata
430 } XmitKind;
431
432 struct XmitDetails {
433   XmitKind kind;
434   union {
435     ARTHANDLE *sm_art;
436   } info;
437 };
438
439
440 /*----- core operational data structure types -----*/
441
442 struct InputFile {
443   /* This is also an instance of struct oop_readable */
444   struct oop_readable readable; /* first */
445   oop_readable_call *readable_callback;
446   void *readable_callback_user;
447
448   int fd;
449   Filemon_Perfile *filemon;
450
451   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
452   off_t offset;
453   int skippinglong;
454
455   ArticleList queue;
456   long inprogress; /* includes queue.count and also articles in conns */
457   long autodefer; /* -1 means not doing autodefer */
458
459   int counts[art_MaxState][RCI_max];
460   int readcount_ok, readcount_blank, readcount_err;
461   char path[];
462 };
463
464 struct Article {
465   ISNODE(Article);
466   ArtState state;
467   int midlen, missing;
468   InputFile *ipf;
469   TOKEN token;
470   off_t offset;
471   int blanklen;
472   char messageid[1];
473 };
474
475 #define SMS_LIST(X)                             \
476   X(NORMAL)                                     \
477   X(FLUSHING)                                   \
478   X(FLUSHFAILED)                                \
479   X(SEPARATED)                                  \
480   X(DROPPING)                                   \
481   X(DROPPED)
482
483 enum StateMachineState {
484 #define SMS_DEF_ENUM(s) sm_##s,
485   SMS_LIST(SMS_DEF_ENUM)
486 };
487
488 static const char *sms_names[]= {
489 #define SMS_DEF_NAME(s) #s ,
490   SMS_LIST(SMS_DEF_NAME)
491   0
492 };
493
494 struct Conn {
495   ISNODE(Conn);
496   int fd; /* may be 0, meaning closed (during construction/destruction) */
497   oop_read *rd; /* likewise */
498   int max_queue, stream, quitting;
499   int since_activity; /* periods */
500   ArticleList waiting; /* not yet told peer */
501   ArticleList priority; /* peer says send it now */
502   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
503   struct iovec xmit[CONNIOVS];
504   XmitDetails xmitd[CONNIOVS];
505   int xmitu;
506 };
507
508
509 /*----- general operational variables -----*/
510
511 /* main initialises */
512 static oop_source *loop;
513 static ConnList conns;
514 static char *path_lock, *path_flushing, *path_defer;
515 static char *path_control, *path_dump;
516 static char *globpat_backlog;
517 static pid_t self_pid;
518
519 /* statemc_init initialises */
520 static StateMachineState sms;
521 static int until_flush;
522 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
523 static FILE *defer;
524
525 /* initialisation to 0 is good */
526 static int until_connect, until_backlog_nextscan;
527 static double accept_proportion;
528 static int nocheck, nocheck_reported, in_child;
529
530 /* for simulation, debugging, etc. */
531 int simulate_flush= -1;
532
533 /*========== logging ==========*/
534
535 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
536 static void logcore(int sysloglevel, const char *fmt, ...) {
537   VA;
538   if (become_daemon) {
539     vsyslog(sysloglevel,fmt,al);
540   } else {
541     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
542     vfprintf(stderr,fmt,al);
543     putc('\n',stderr);
544   }
545   va_end(al);
546 }
547
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549                  const char *fmt, va_list al) PRINTF(5,0);
550 static void logv(int sysloglevel, const char *pfx, int errnoval,
551                  const char *fmt, va_list al) {
552   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
553   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
554   msgbuf[sizeof(msgbuf)-1]= 0;
555
556   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
557     sysloglevel= LOG_ERR; /* run by wrong user, probably */
558
559   logcore(sysloglevel, "<%s>%s: %s%s%s",
560          sitename, pfx, msgbuf,
561          errnoval>=0 ? ": " : "",
562          errnoval>=0 ? strerror(errnoval) : "");
563 }
564
565 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
566   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
567   static void fn(const char *fmt, ...) {                        \
568     preterminate();                                             \
569     VA;                                                         \
570     logv(sysloglevel, pfx, err, fmt, al);                       \
571     exit(estatus);                                              \
572   }
573
574 #define logwrap(fn, pfx, sysloglevel, err)              \
575   static void fn(const char *fmt, ...) PRINTF(1,2);     \
576   static void fn(const char *fmt, ...) {                \
577     VA;                                                 \
578     logv(sysloglevel, pfx, err, fmt, al);               \
579     va_end(al);                                         \
580   }
581
582 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
583 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
584
585 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
586 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
587
588 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
589 logwrap(warn,     " warning",  LOG_WARNING, -1);
590
591 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
592 logwrap(info,     " info",     LOG_INFO,    -1);
593 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
594
595
596 /*========== utility functions etc. ==========*/
597
598 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
599 static char *xvasprintf(const char *fmt, va_list al) {
600   char *str;
601   int rc= vasprintf(&str,fmt,al);
602   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
603   return str;
604 }
605 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
606 static char *xasprintf(const char *fmt, ...) {
607   VA;
608   char *str= xvasprintf(fmt,al);
609   va_end(al);
610   return str;
611 }
612
613 static int close_perhaps(int *fd) {
614   if (*fd <= 0) return 0;
615   int r= close(*fd);
616   *fd=0;
617   return r;
618 }
619 static void xclose(int fd, const char *what, const char *what2) {
620   int r= close(fd);
621   if (r) sysdie("close %s%s",what,what2?what2:"");
622 }
623 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
624   if (*fd <= 0) return;
625   xclose(*fd,what,what2);
626   *fd=0;
627 }
628
629 static pid_t xfork(const char *what) {
630   pid_t child;
631
632   child= fork();
633   if (child==-1) sysfatal("cannot fork for %s",what);
634   debug("forked %s %ld", what, (unsigned long)child);
635   if (!child) postfork();
636   return child;
637 }
638
639 static void on_fd_read_except(int fd, oop_call_fd callback) {
640   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
641   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
642 }
643 static void cancel_fd_read_except(int fd) {
644   loop->cancel_fd(loop, fd, OOP_READ);
645   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
646 }
647
648 static void report_child_status(const char *what, int status) {
649   if (WIFEXITED(status)) {
650     int es= WEXITSTATUS(status);
651     if (es)
652       warn("%s: child died with error exit status %d", what, es);
653   } else if (WIFSIGNALED(status)) {
654     int sig= WTERMSIG(status);
655     const char *sigstr= strsignal(sig);
656     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
657     if (sigstr)
658       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
659     else
660       warn("%s: child died due to unknown fatal signal %d%s",
661            what, sig, coredump);
662   } else {
663     warn("%s: child died with unknown wait status %d", what,status);
664   }
665 }
666
667 static int xwaitpid(pid_t *pid, const char *what) {
668   int status;
669
670   int r= kill(*pid, SIGKILL);
671   if (r) sysdie("cannot kill %s child", what);
672
673   pid_t got= waitpid(*pid, &status, 0);
674   if (got==-1) sysdie("cannot reap %s child", what);
675   if (got==0) die("cannot reap %s child", what);
676
677   *pid= 0;
678
679   return status;
680 }
681
682 static void *zxmalloc(size_t sz) {
683   void *p= xmalloc(sz);
684   memset(p,0,sz);
685   return p;
686 }
687
688 static void xunlink(const char *path, const char *what) {
689   int r= unlink(path);
690   if (r) sysdie("can't unlink %s %s", path, what);
691 }
692
693 static time_t xtime(void) {
694   time_t now= time(0);
695   if (now==-1) sysdie("time(2) failed");
696   return now;
697 }
698
699 static void xsigaction(int signo, const struct sigaction *sa) {
700   int r= sigaction(signo,sa,0);
701   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
702 }
703
704 static void xsigsetdefault(int signo) {
705   struct sigaction sa;
706   memset(&sa,0,sizeof(sa));
707   sa.sa_handler= SIG_DFL;
708   xsigaction(signo,&sa);
709 }
710
711 static void xgettimeofday(struct timeval *tv_r) {
712   int r= gettimeofday(tv_r,0);
713   if (r) sysdie("gettimeofday(2) failed");
714 }
715
716 static void xsetnonblock(int fd, int nonblocking) {
717   int errnoval= oop_fd_nonblock(fd, nonblocking);
718   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
719 }
720
721 static void check_isreg(const struct stat *stab, const char *path,
722                         const char *what) {
723   if (!S_ISREG(stab->st_mode))
724     die("%s %s not a plain file (mode 0%lo)",
725         what, path, (unsigned long)stab->st_mode);
726 }
727
728 static void xfstat(int fd, struct stat *stab_r, const char *what) {
729   int r= fstat(fd, stab_r);
730   if (r) sysdie("could not fstat %s", what);
731 }
732
733 static void xfstat_isreg(int fd, struct stat *stab_r,
734                          const char *path, const char *what) {
735   xfstat(fd, stab_r, what);
736   check_isreg(stab_r, path, what);
737 }
738
739 static void xlstat_isreg(const char *path, struct stat *stab,
740                          int *enoent_r /* 0 means ENOENT is fatal */,
741                          const char *what) {
742   int r= lstat(path, stab);
743   if (r) {
744     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
745     sysdie("could not lstat %s %s", what, path);
746   }
747   if (enoent_r) *enoent_r= 0;
748   check_isreg(stab, path, what);
749 }
750
751 static int samefile(const struct stat *a, const struct stat *b) {
752   assert(S_ISREG(a->st_mode));
753   assert(S_ISREG(b->st_mode));
754   return (a->st_ino == b->st_ino &&
755           a->st_dev == b->st_dev);
756 }
757
758 static char *sanitise(const char *input, int len) {
759   static char sanibuf[100]; /* returns pointer to this buffer! */
760
761   const char *p= input;
762   const char *endp= len>=0 ? input+len : 0;
763   char *q= sanibuf;
764   *q++= '`';
765   for (;;) {
766     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
767     int c= (!endp || p<endp) ? *p++ : 0;
768     if (!c) { *q++= '\''; *q=0; break; }
769     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
770     sprintf(q,"\\x%02x",c);
771     q += 4;
772   }
773   return sanibuf;
774 }
775
776 static int isewouldblock(int errnoval) {
777   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
778 }
779
780 /*========== command and control connections ==========*/
781
782 static int control_master;
783
784 typedef struct ControlConn ControlConn;
785 struct ControlConn {
786   void (*destroy)(ControlConn*);
787   int fd;
788   oop_read *rd;
789   FILE *out;
790   union {
791     struct sockaddr sa;
792     struct sockaddr_un un;
793   } sa;
794   socklen_t salen;
795 };
796
797 static const oop_rd_style control_rd_style= {
798   OOP_RD_DELIM_STRIP, '\n',
799   OOP_RD_NUL_FORBID,
800   OOP_RD_SHORTREC_FORBID
801 };
802
803 static void control_destroy(ControlConn *cc) {
804   cc->destroy(cc);
805 }
806
807 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
808   if (ferror(cc->out) | fflush(cc->out)) {
809     info("CTRL%d write error %s", cc->fd, strerror(errno));
810     control_destroy(cc);
811   }
812 }
813
814 static void control_prompt(ControlConn *cc /* may destroy*/) {
815   fprintf(cc->out, "%s| ", sitename);
816   control_checkouterr(cc);
817 }
818
819 struct ControlCommand {
820   const char *cmd;
821   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
822             const char *arg, size_t argsz);
823   void *xdata;
824   int xval;
825 };
826
827 static const ControlCommand control_commands[];
828
829 #define CCMD(wh)                                                        \
830   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
831                         const char *arg, size_t argsz)
832
833 CCMD(help) {
834   fputs("commands:\n", cc->out);
835   const ControlCommand *ccmd;
836   for (ccmd=control_commands; ccmd->cmd; ccmd++)
837     fprintf(cc->out, " %s\n", ccmd->cmd);
838   fputs("NB: permissible arguments are not shown above."
839         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
840 }
841
842 CCMD(flush) {
843   int ok= trigger_flush_ok();
844   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
845 }
846
847 CCMD(stop) {
848   preterminate();
849   notice("terminating (CTRL%d)",cc->fd);
850   raise_default(SIGTERM);
851   abort();
852 }
853
854 CCMD(dump);
855
856 /* messing with our head: */
857 CCMD(period) { period(); }
858 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
859 CCMD(setint) { *(int*)c->xdata= c->xval; }
860 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
861
862 static const ControlCommand control_commands[]= {
863   { "h",             ccmd_help      },
864   { "flush",         ccmd_flush     },
865   { "stop",          ccmd_stop      },
866   { "dump q",        ccmd_dump, 0,0 },
867   { "dump a",        ccmd_dump, 0,1 },
868
869   { "p",             ccmd_period    },
870
871 #define POKES(cmd,func)                                                 \
872   { cmd "flush",     func,           &until_flush,             1 },     \
873   { cmd "conn",      func,           &until_connect,           0 },     \
874   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
875 POKES("next ", ccmd_setint)
876 POKES("prod ", ccmd_setint_period)
877
878   { "pretend flush", ccmd_setintarg, &simulate_flush             },
879   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
880   { 0 }
881 };
882
883 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
884                            const char *errmsg, int errnoval,
885                            const char *data, size_t recsz, void *cc_v) {
886   ControlConn *cc= cc_v;
887
888   if (!data) {
889     info("CTRL%d closed", cc->fd);
890     cc->destroy(cc);
891     return OOP_CONTINUE;
892   }
893
894   if (recsz == 0) goto prompt;
895
896   const ControlCommand *ccmd;
897   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
898     int l= strlen(ccmd->cmd);
899     if (recsz < l) continue;
900     if (recsz > l && data[l] != ' ') continue;
901     if (memcmp(data, ccmd->cmd, l)) continue;
902
903     int argl= (int)recsz - (l+1); 
904     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
905     goto prompt;
906   }
907
908   fputs("unknown command; h for help\n", cc->out);
909
910  prompt:
911   control_prompt(cc);
912   return OOP_CONTINUE;
913 }
914
915 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
916                             const char *errmsg, int errnoval,
917                             const char *data, size_t recsz, void *cc_v) {
918   ControlConn *cc= cc_v;
919   
920   info("CTRL%d read error %s", cc->fd, errmsg);
921   cc->destroy(cc);
922   return OOP_CONTINUE;
923 }
924
925 static int control_conn_startup(ControlConn *cc /* may destroy*/,
926                                 const char *how) {
927   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
928   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
929
930   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
931                       control_rd_ok, cc,
932                       control_rd_err, cc);
933   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
934
935   info("CTRL%d %s ready", cc->fd, how);
936   control_prompt(cc);
937   return 0;
938 }
939
940 static void control_stdio_destroy(ControlConn *cc) {
941   if (cc->rd) {
942     oop_rd_cancel(cc->rd);
943     errno= oop_rd_delete_tidy(cc->rd);
944     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
945   }
946   free(cc);
947 }
948
949 static void control_stdio(void) {
950   NEW_DECL(ControlConn *,cc);
951   cc->destroy= control_stdio_destroy;
952
953   cc->fd= 0;
954   cc->out= stdout;
955   int r= control_conn_startup(cc,"stdio");
956   if (r) cc->destroy(cc);
957 }
958
959 static void control_accepted_destroy(ControlConn *cc) {
960   if (cc->rd) {
961     oop_rd_cancel(cc->rd);
962     oop_rd_delete_kill(cc->rd);
963   }
964   if (cc->out) { fclose(cc->out); cc->fd=0; }
965   close_perhaps(&cc->fd);
966   free(cc);
967 }
968
969 static void *control_master_readable(oop_source *lp, int master,
970                                      oop_event ev, void *u) {
971   NEW_DECL(ControlConn *,cc);
972   cc->destroy= control_accepted_destroy;
973
974   cc->salen= sizeof(cc->sa);
975   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
976   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
977
978   cc->out= fdopen(cc->fd, "w");
979   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
980
981   int r= control_conn_startup(cc, "accepted");
982   if (r) goto x;
983
984   return OOP_CONTINUE;
985
986  x:
987   cc->destroy(cc);
988   return OOP_CONTINUE;
989 }
990
991 #define NOCONTROL(...) do{                                              \
992     syswarn("no control socket, because failed to " __VA_ARGS__);       \
993     goto nocontrol;                                                     \
994   }while(0)
995
996 static void control_init(void) {
997   char *real=0;
998   
999   union {
1000     struct sockaddr sa;
1001     struct sockaddr_un un;
1002   } sa;
1003
1004   memset(&sa,0,sizeof(sa));
1005   int maxlen= sizeof(sa.un.sun_path);
1006
1007   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1008   if (reallen<0) {
1009     if (errno != ENOENT)
1010       NOCONTROL("readlink control socket symlink path %s", path_control);
1011   }
1012   if (reallen >= maxlen) {
1013     debug("control socket symlink path too long (r=%d)",reallen);
1014     xunlink(path_control, "old (overlong) control socket symlink");
1015     reallen= -1;
1016   }
1017   
1018   if (reallen<0) {
1019     struct stat stab;
1020     int r= lstat(realsockdir,&stab);
1021     if (r) {
1022       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1023
1024       r= mkdir(realsockdir, 0700);
1025       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1026
1027     } else {
1028       uid_t self= geteuid();
1029       if (!S_ISDIR(stab.st_mode) ||
1030           stab.st_uid != self ||
1031           stab.st_mode & 0007) {
1032         warn("no control socket, because real socket directory"
1033              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1034              !!S_ISDIR(stab.st_mode),
1035              (unsigned long)stab.st_uid, (unsigned long)self,
1036              (unsigned long)stab.st_mode & 0777UL);
1037         goto nocontrol;
1038       }
1039     }
1040
1041     real= xasprintf("%s/s%lx.%lx", realsockdir,
1042                     (unsigned long)xtime(), (unsigned long)self_pid);
1043     int reallen= strlen(real);
1044
1045     if (reallen >= maxlen) {
1046       warn("no control socket, because tmpnam gave overly-long path"
1047            " %s", real);
1048       goto nocontrol;
1049     }
1050     r= symlink(real, path_control);
1051     if (r) NOCONTROL("make control socket path %s a symlink to real"
1052                      " socket path %s", path_control, real);
1053     memcpy(sa.un.sun_path, real, reallen);
1054   }
1055
1056   int r= unlink(sa.un.sun_path);
1057   if (r && errno!=ENOENT)
1058     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1059
1060   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1061   if (control_master<0) NOCONTROL("create new control socket");
1062
1063   sa.un.sun_family= AF_UNIX;
1064   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1065   r= bind(control_master, &sa.sa, sl);
1066   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1067
1068   r= listen(control_master, 5);
1069   if (r) NOCONTROL("listen");
1070
1071   xsetnonblock(control_master, 1);
1072
1073   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1074   info("control socket ok, real path %s", sa.un.sun_path);
1075
1076   return;
1077
1078  nocontrol:
1079   free(real);
1080   xclose_perhaps(&control_master, "control master",0);
1081   return;
1082 }
1083
1084 /*========== management of connections ==========*/
1085
1086 static void conn_closefd(Conn *conn, const char *msgprefix) {
1087   int r= close_perhaps(&conn->fd);
1088   if (r) info("C%d %serror closing socket: %s",
1089               conn->fd, msgprefix, strerror(errno));
1090 }
1091
1092 static void conn_dispose(Conn *conn) {
1093   if (!conn) return;
1094   if (conn->rd) {
1095     oop_rd_cancel(conn->rd);
1096     oop_rd_delete_kill(conn->rd);
1097     conn->rd= 0;
1098   }
1099   if (conn->fd) {
1100     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1101     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1102   }
1103   conn_closefd(conn,"");
1104   free(conn);
1105   until_connect= reconnect_delay_periods;
1106 }
1107
1108 static void *conn_exception(oop_source *lp, int fd,
1109                             oop_event ev, void *conn_v) {
1110   Conn *conn= conn_v;
1111   unsigned char ch;
1112   assert(fd == conn->fd);
1113   assert(ev == OOP_EXCEPTION);
1114   int r= read(conn->fd, &ch, 1);
1115   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1116   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1117                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1118   return OOP_CONTINUE;
1119 }  
1120
1121 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1122   int requeue[art_MaxState];
1123   memset(requeue,0,sizeof(requeue));
1124
1125   Article *art;
1126   
1127   while ((art= LIST_REMHEAD(conn->priority)))
1128     LIST_ADDTAIL(art->ipf->queue, art);
1129
1130   while ((art= LIST_REMHEAD(conn->waiting)))
1131     LIST_ADDTAIL(art->ipf->queue, art);
1132
1133   while ((art= LIST_REMHEAD(conn->sent))) {
1134     requeue[art->state]++;
1135     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1136     LIST_ADDTAIL(art->ipf->queue,art);
1137   }
1138
1139   int i;
1140   XmitDetails *d;
1141   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1142     xmit_free(d);
1143
1144   char *m= xvasprintf(fmt,al);
1145   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1146        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1147   free(m);
1148
1149   LIST_REMOVE(conns,conn);
1150   conn_dispose(conn);
1151   check_assign_articles();
1152 }
1153
1154 static void connfail(Conn *conn, const char *fmt, ...) {
1155   va_list al;
1156   va_start(al,fmt);
1157   vconnfail(conn,fmt,al);
1158   va_end(al);
1159 }
1160
1161 static void check_idle_conns(void) {
1162   Conn *conn;
1163   FOR_CONN(conn)
1164     conn->since_activity++;
1165  search_again:
1166   FOR_CONN(conn) {
1167     if (conn->since_activity <= need_activity_periods) continue;
1168
1169     /* We need to shut this down */
1170     if (conn->quitting)
1171       connfail(conn,"timed out waiting for response to QUIT");
1172     else if (conn->sent.count)
1173       connfail(conn,"timed out waiting for responses");
1174     else if (conn->waiting.count || conn->priority.count)
1175       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1176     else if (conn->xmitu)
1177       connfail(conn,"peer has been sending responses"
1178                " before receiving our commands!");
1179     else {
1180       static const char quitcmd[]= "QUIT\r\n";
1181       int todo= sizeof(quitcmd)-1;
1182       const char *p= quitcmd;
1183       for (;;) {
1184         int r= write(conn->fd, p, todo);
1185         if (r<0) {
1186           if (isewouldblock(errno))
1187             connfail(conn, "blocked writing QUIT to idle connection");
1188           else
1189             connfail(conn, "failed to write QUIT to idle connection: %s",
1190                      strerror(errno));
1191           break;
1192         }
1193         assert(r<=todo);
1194         todo -= r;
1195         if (!todo) {
1196           conn->quitting= 1;
1197           conn->since_activity= 0;
1198           debug("C%d is idle, quitting", conn->fd);
1199           break;
1200         }
1201       }
1202     }
1203     goto search_again;
1204   }
1205 }  
1206
1207 /*---------- making new connections ----------*/
1208
1209 static pid_t connecting_child;
1210 static int connecting_fdpass_sock;
1211
1212 static void connect_attempt_discard(void) {
1213   if (connecting_child) {
1214     int status= xwaitpid(&connecting_child, "connect");
1215     if (!(WIFEXITED(status) ||
1216           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1217       report_child_status("connect", status);
1218   }
1219   if (connecting_fdpass_sock) {
1220     cancel_fd_read_except(connecting_fdpass_sock);
1221     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1222   }
1223 }
1224
1225 #define PREP_DECL_MSG_CMSG(msg)                 \
1226   char msgbyte= 0;                              \
1227   struct iovec msgiov;                          \
1228   msgiov.iov_base= &msgbyte;                    \
1229   msgiov.iov_len= 1;                            \
1230   struct msghdr msg;                            \
1231   memset(&msg,0,sizeof(msg));                   \
1232   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1233   msg.msg_iov= &msgiov;                         \
1234   msg.msg_iovlen= 1;                            \
1235   msg.msg_control= msg##cbuf;                   \
1236   msg.msg_controllen= sizeof(msg##cbuf);
1237
1238 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1239   Conn *conn= 0;
1240
1241   assert(fd == connecting_fdpass_sock);
1242
1243   PREP_DECL_MSG_CMSG(msg);
1244   
1245   ssize_t rs= recvmsg(fd, &msg, 0);
1246   if (rs<0) {
1247     if (isewouldblock(errno)) return OOP_CONTINUE;
1248     syswarn("failed to read socket from connecting child");
1249     goto x;
1250   }
1251
1252   NEW(conn);
1253   LIST_INIT(conn->waiting);
1254   LIST_INIT(conn->priority);
1255   LIST_INIT(conn->sent);
1256
1257   struct cmsghdr *h= 0;
1258   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1259   if (!h) {
1260     int status= xwaitpid(&connecting_child, "connect child (broken)");
1261
1262     if (WIFEXITED(status)) {
1263       if (WEXITSTATUS(status) != 0 &&
1264           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1265           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1266         /* child already reported the problem */;
1267       else {
1268         if (e == OOP_EXCEPTION)
1269           warn("connect: connection child exited code %d but"
1270                " unexpected exception on fdpass socket",
1271                WEXITSTATUS(status));
1272         else
1273           warn("connect: connection child exited code %d but"
1274                " no cmsg (rs=%d)",
1275                WEXITSTATUS(status), (int)rs);
1276       }
1277     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1278       warn("connect: connection attempt timed out");
1279     } else {
1280       report_child_status("connect", status);
1281     }
1282     goto x;
1283   }
1284
1285 #define CHK(field, val)                                                  \
1286   if (h->cmsg_##field != val) {                                          \
1287     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1288         h->cmsg_##field, val);                                           \
1289     goto x;                                                              \
1290   }
1291   CHK(level, SOL_SOCKET);
1292   CHK(type,  SCM_RIGHTS);
1293   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1294 #undef CHK
1295
1296   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1297
1298   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1299
1300   int status;
1301   pid_t got= waitpid(connecting_child, &status, 0);
1302   if (got==-1) sysdie("connect: real wait for child");
1303   assert(got == connecting_child);
1304   connecting_child= 0;
1305
1306   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1307   int es= WEXITSTATUS(status);
1308   switch (es) {
1309   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1310   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1311   default:
1312     fatal("connect: child gave unexpected exit status %d", es);
1313   }
1314
1315   /* Phew! */
1316   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1317
1318   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1319   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1320   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1321   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1322                      &peer_rd_ok, conn,
1323                      &peer_rd_err, conn);
1324   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1325
1326   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1327   LIST_ADDHEAD(conns, conn);
1328
1329   connect_attempt_discard();
1330   check_assign_articles();
1331   return OOP_CONTINUE;
1332
1333  x:
1334   conn_dispose(conn);
1335   connect_attempt_discard();
1336   return OOP_CONTINUE;
1337 }
1338
1339 static int allow_connect_start(void) {
1340   return conns.count < max_connections
1341     && !connecting_child
1342     && !until_connect;
1343 }
1344
1345 static void connect_start(void) {
1346   assert(!connecting_child);
1347   assert(!connecting_fdpass_sock);
1348
1349   info("starting connection attempt");
1350
1351   int socks[2];
1352   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1353   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1354
1355   connecting_child= xfork("connection");
1356
1357   if (!connecting_child) {
1358     FILE *cn_from, *cn_to;
1359     char buf[NNTP_STRLEN+100];
1360     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1361
1362     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1363
1364     alarm(connection_setup_timeout);
1365     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1366       int l= strlen(buf);
1367       int stripped=0;
1368       while (l>0) {
1369         unsigned char c= buf[l-1];
1370         if (!isspace(c)) break;
1371         if (c=='\n' || c=='\r') stripped=1;
1372         --l;
1373       }
1374       if (!buf[0]) {
1375         sysfatal("connect: connection attempt failed");
1376       } else {
1377         buf[l]= 0;
1378         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1379               sanitise(buf,-1));
1380       }
1381     }
1382     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1383       sysfatal("connect: authentication failed");
1384     if (try_stream) {
1385       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1386           fflush(cn_to))
1387         sysfatal("connect: could not send MODE STREAM");
1388       buf[sizeof(buf)-1]= 0;
1389       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1390         if (ferror(cn_from))
1391           sysfatal("connect: could not read response to MODE STREAM");
1392         else
1393           fatal("connect: connection close in response to MODE STREAM");
1394       }
1395       int l= strlen(buf);
1396       assert(l>=1);
1397       if (buf[l-1]!='\n')
1398         fatal("connect: response to MODE STREAM is too long: %.100s...",
1399               sanitise(buf,-1));
1400       l--;  if (l>0 && buf[l-1]=='\r') l--;
1401       buf[l]= 0;
1402       char *ep;
1403       int rcode= strtoul(buf,&ep,10);
1404       if (ep != &buf[3])
1405         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1406
1407       switch (rcode) {
1408       case 203:
1409         exitstatus= CONNCHILD_ESTATUS_STREAM;
1410         break;
1411       case 480:
1412       case 500:
1413         break;
1414       default:
1415         warn("connect: unexpected response to MODE STREAM: %.50s",
1416              sanitise(buf,-1));
1417         exitstatus= 2;
1418         break;
1419       }
1420     }
1421     int fd= fileno(cn_from);
1422
1423     PREP_DECL_MSG_CMSG(msg);
1424     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1425     cmsg->cmsg_level= SOL_SOCKET;
1426     cmsg->cmsg_type=  SCM_RIGHTS;
1427     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1428     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1429
1430     msg.msg_controllen= cmsg->cmsg_len;
1431     r= sendmsg(socks[1], &msg, 0);
1432     if (r<0) sysdie("sendmsg failed for new connection");
1433     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1434
1435     _exit(exitstatus);
1436   }
1437
1438   xclose(socks[1], "connecting fdpass child's socket",0);
1439   connecting_fdpass_sock= socks[0];
1440   xsetnonblock(connecting_fdpass_sock, 1);
1441   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1442 }
1443
1444 /*---------- assigning articles to conns, and transmitting ----------*/
1445
1446 static Article *dequeue_from(int peek, InputFile *ipf) {
1447   if (!ipf) return 0;
1448   if (peek) return LIST_HEAD(ipf->queue);
1449   else return LIST_REMHEAD(ipf->queue);
1450 }
1451
1452 static Article *dequeue(int peek) {
1453   Article *art;
1454   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1455   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1456   art= dequeue_from(peek, main_input_file);      if (art) return art;
1457   return 0;
1458 }
1459
1460 static void check_assign_articles(void) {
1461   for (;;) {
1462     if (!dequeue(1))
1463       break;
1464
1465     Conn *walk, *use=0;
1466     int spare=0, inqueue=0;
1467
1468     /* Find a connection to offer this article.  We prefer a busy
1469      * connection to an idle one, provided it's not full.  We take the
1470      * first (oldest) and since that's stable, it will mean we fill up
1471      * connections in order.  That way if we have too many
1472      * connections, the spare ones will go away eventually.
1473      */
1474     FOR_CONN(walk) {
1475       if (walk->quitting) continue;
1476       inqueue= walk->sent.count + walk->priority.count
1477              + walk->waiting.count;
1478       spare= walk->max_queue - inqueue;
1479       assert(inqueue <= max_queue_per_conn);
1480       assert(spare >= 0);
1481       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1482       else if (spare>0) /*working*/ { use= walk; break; }
1483     }
1484     if (use) {
1485       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1486       while (spare>0) {
1487         Article *art= dequeue(0);
1488         if (!art) break;
1489         LIST_ADDTAIL(use->waiting, art);
1490         spare--;
1491       }
1492       conn_maybe_write(use);
1493     } else if (allow_connect_start()) {
1494       until_connect= reconnect_delay_periods;
1495       connect_start();
1496       break;
1497     } else {
1498       break;
1499     }
1500   }
1501 }
1502
1503 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1504   conn_maybe_write(u);
1505   return OOP_CONTINUE;
1506 }
1507
1508 static void conn_maybe_write(Conn *conn)  {
1509   for (;;) {
1510     conn_make_some_xmits(conn);
1511     if (!conn->xmitu) {
1512       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1513       return;
1514     }
1515
1516     void *rp= conn_write_some_xmits(conn);
1517     if (rp==OOP_CONTINUE) {
1518       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1519       return;
1520     } else if (rp==OOP_HALT) {
1521       return;
1522     } else if (!rp) {
1523       /* transmitted everything */
1524     } else {
1525       abort();
1526     }
1527   }
1528 }
1529
1530 /*---------- expiry and deferral ----------*/
1531
1532 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1533   open_defer();
1534   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1535       || fflush(defer))
1536     sysfatal("write to defer file %s",path_defer);
1537   article_done(art, whichcount);
1538 }
1539
1540 static int article_check_expired(Article *art /* must be queued, not conn */) {
1541   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1542   if (artdata) { SMfreearticle(artdata); return 0; }
1543
1544   LIST_REMOVE(art->ipf->queue, art);
1545   art->missing= 1;
1546   art->ipf->counts[art_Unchecked][RC_missing]++;
1547   article_done(art,-1);
1548   return 1;
1549 }
1550
1551 static void inputfile_queue_check_expired(InputFile *ipf) {
1552   if (!ipf) return;
1553
1554   for (;;) {
1555     Article *art= LIST_HEAD(ipf->queue);
1556     int exp= article_check_expired(art);
1557     if (!exp) break;
1558   }
1559 }
1560
1561 static void article_autodefer(InputFile *ipf, Article *art) {
1562   ipf->autodefer++;
1563   article_defer(art,-1);
1564 }
1565
1566 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1567   Article *art;
1568   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1569     if (art->ipf == ipf) return 1;
1570   return 0;
1571 }
1572
1573 static void autodefer_input_file(InputFile *ipf) {
1574   ipf->autodefer= 0;
1575
1576   Article *art;
1577   while ((art= LIST_REMHEAD(ipf->queue)))
1578     article_autodefer(ipf, art);
1579
1580   if (ipf->inprogress) {
1581     Conn *walk;
1582     FOR_CONN(walk) {
1583       if (has_article_in(&walk->waiting,  ipf) ||
1584           has_article_in(&walk->priority, ipf) ||
1585           has_article_in(&walk->sent,     ipf))
1586         walk->quitting= -1;
1587     }
1588     while (ipf->inprogress) {
1589       FOR_CONN(walk)
1590         if (walk->quitting < 0) goto found;
1591       abort(); /* where are they ?? */
1592
1593     found:
1594       connfail(walk, "connection is stuck or crawling,"
1595                " and we need to finish flush");
1596     }
1597   }
1598 }
1599
1600 /*========== article transmission ==========*/
1601
1602 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1603                   XmitKind kind) { /* caller must then fill in details */
1604   struct iovec *v= &conn->xmit[conn->xmitu];
1605   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1606   v->iov_base= (char*)data;
1607   v->iov_len= len;
1608   d->kind= kind;
1609   return d;
1610 }
1611
1612 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1613   xmit_core(conn,data,len, xk_Const);
1614 }
1615 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1616
1617 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1618   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1619   d->info.sm_art= ah;
1620 }
1621
1622 static void xmit_free(XmitDetails *d) {
1623   switch (d->kind) {
1624   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1625   case xk_Const:                                  break;
1626   default: abort();
1627   }
1628 }
1629
1630 static void *conn_write_some_xmits(Conn *conn) {
1631   /* return values:
1632    *      0:            nothing more to write, no need to call us again
1633    *      OOP_CONTINUE: more to write but fd not writeable
1634    *      OOP_HALT:     disaster, have destroyed conn
1635    */
1636   for (;;) {
1637     int count= conn->xmitu;
1638     if (!count) return 0;
1639
1640     if (count > IOV_MAX) count= IOV_MAX;
1641     ssize_t rs= writev(conn->fd, conn->xmit, count);
1642     if (rs < 0) {
1643       if (isewouldblock(errno)) return OOP_CONTINUE;
1644       connfail(conn, "write failed: %s", strerror(errno));
1645       return OOP_HALT;
1646     }
1647     assert(rs > 0);
1648
1649     int done;
1650     for (done=0; rs && done<conn->xmitu; done++) {
1651       struct iovec *vp= &conn->xmit[done];
1652       XmitDetails *dp= &conn->xmitd[done];
1653       if (rs > vp->iov_len) {
1654         rs -= vp->iov_len;
1655         xmit_free(dp);
1656       } else {
1657         vp->iov_base= (char*)vp->iov_base + rs;
1658         vp->iov_len -= rs;
1659       }
1660     }
1661     int newu= conn->xmitu - done;
1662     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1663     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1664     conn->xmitu= newu;
1665   }
1666 }
1667
1668 static void conn_make_some_xmits(Conn *conn) {
1669   for (;;) {
1670     if (conn->xmitu+5 > CONNIOVS)
1671       break;
1672
1673     Article *art= LIST_REMHEAD(conn->priority);
1674     if (!art) art= LIST_REMHEAD(conn->waiting);
1675     if (!art) break;
1676
1677     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1678       /* actually send it */
1679
1680       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1681
1682       art->state=
1683         art->state == art_Unchecked ? art_Unsolicited :
1684         art->state == art_Wanted    ? art_Wanted      :
1685         (abort(),-1);
1686
1687       if (!artdata) art->missing= 1;
1688       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1689
1690       if (conn->stream) {
1691         if (artdata) {
1692           XMIT_LITERAL("TAKETHIS ");
1693           xmit_noalloc(conn, art->messageid, art->midlen);
1694           XMIT_LITERAL("\r\n");
1695           xmit_artbody(conn, artdata);
1696         } else {
1697           article_done(art, -1);
1698           continue;
1699         }
1700       } else {
1701         /* we got 235 from IHAVE */
1702         if (artdata) {
1703           xmit_artbody(conn, artdata);
1704         } else {
1705           XMIT_LITERAL(".\r\n");
1706         }
1707       }
1708
1709       LIST_ADDTAIL(conn->sent, art);
1710
1711     } else {
1712       /* check it */
1713
1714       if (conn->stream)
1715         XMIT_LITERAL("CHECK ");
1716       else
1717         XMIT_LITERAL("IHAVE ");
1718       xmit_noalloc(conn, art->messageid, art->midlen);
1719       XMIT_LITERAL("\r\n");
1720
1721       assert(art->state == art_Unchecked);
1722       art->ipf->counts[art->state][RC_sent]++;
1723       LIST_ADDTAIL(conn->sent, art);
1724     }
1725   }
1726 }
1727
1728 /*========== handling responses from peer ==========*/
1729
1730 static const oop_rd_style peer_rd_style= {
1731   OOP_RD_DELIM_STRIP, '\n',
1732   OOP_RD_NUL_FORBID,
1733   OOP_RD_SHORTREC_FORBID
1734 };
1735
1736 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1737                          const char *errmsg, int errnoval,
1738                          const char *data, size_t recsz, void *conn_v) {
1739   Conn *conn= conn_v;
1740   connfail(conn, "error receiving from peer: %s", errmsg);
1741   return OOP_CONTINUE;
1742 }
1743
1744 static Article *article_reply_check(Conn *conn, const char *response,
1745                                     int code_indicates_streaming,
1746                                     int must_have_sent
1747                                         /* 1:yes, -1:no, 0:dontcare */,
1748                                     const char *sanitised_response) {
1749   Article *art= LIST_HEAD(conn->sent);
1750
1751   if (!art) {
1752     connfail(conn,
1753              "peer gave unexpected response when no commands outstanding: %s",
1754              sanitised_response);
1755     return 0;
1756   }
1757
1758   if (code_indicates_streaming) {
1759     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1760     if (!conn->stream) {
1761       connfail(conn, "peer gave streaming response code "
1762                " to IHAVE or subsequent body: %s", sanitised_response);
1763       return 0;
1764     }
1765     const char *got_mid= response+4;
1766     int got_midlen= strcspn(got_mid, " \n\r");
1767     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1768       connfail(conn, "peer gave streaming response with syntactically invalid"
1769                " messageid: %s", sanitised_response);
1770       return 0;
1771     }
1772     if (got_midlen != art->midlen ||
1773         memcmp(got_mid, art->messageid, got_midlen)) {
1774       connfail(conn, "peer gave streaming response code to wrong article -"
1775                " probable synchronisation problem; we offered: %s;"
1776                " peer said: %s",
1777                art->messageid, sanitised_response);
1778       return 0;
1779     }
1780   } else {
1781     if (conn->stream) {
1782       connfail(conn, "peer gave non-streaming response code to"
1783                " CHECK/TAKETHIS: %s", sanitised_response);
1784       return 0;
1785     }
1786   }
1787
1788   if (must_have_sent>0 && art->state < art_Wanted) {
1789     connfail(conn, "peer says article accepted but"
1790              " we had not sent the body: %s", sanitised_response);
1791     return 0;
1792   }
1793   if (must_have_sent<0 && art->state >= art_Wanted) {
1794     connfail(conn, "peer says please sent the article but we just did: %s",
1795              sanitised_response);
1796     return 0;
1797   }
1798
1799   Article *art_again= LIST_REMHEAD(conn->sent);
1800   assert(art_again == art);
1801   return art;
1802 }
1803
1804 static void update_nocheck(int accepted) {
1805   accept_proportion *= nocheck_decay;
1806   accept_proportion += accepted * (1.0 - nocheck_decay);
1807   int new_nocheck= accept_proportion >= nocheck_thresh;
1808   if (new_nocheck && !nocheck_reported) {
1809     notice("entering nocheck mode for the first time");
1810     nocheck_reported= 1;
1811   } else if (new_nocheck != nocheck) {
1812     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1813   }
1814   nocheck= new_nocheck;
1815 }
1816
1817 static void article_done(Article *art, int whichcount) {
1818   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1819
1820   if (whichcount == RC_accepted) update_nocheck(1);
1821   else if (whichcount == RC_unwanted) update_nocheck(0);
1822
1823   InputFile *ipf= art->ipf;
1824
1825   while (art->blanklen) {
1826     static const char spaces[]=
1827       "                                                                "
1828       "                                                                "
1829       "                                                                "
1830       "                                                                "
1831       "                                                                "
1832       "                                                                "
1833       "                                                                "
1834       "                                                                "
1835       "                                                                ";
1836     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1837     int r= pwrite(ipf->fd, spaces, w, art->offset);
1838     if (r==-1) {
1839       if (errno==EINTR) continue;
1840       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1841              art->messageid, art->blanklen,
1842              (unsigned long)art->offset, ipf->path);
1843     }
1844     assert(r>=0 && r<=w);
1845     art->blanklen -= w;
1846     art->offset += w;
1847   }
1848
1849   ipf->inprogress--;
1850   assert(ipf->inprogress >= 0);
1851   free(art);
1852
1853   if (!ipf->inprogress && ipf != main_input_file)
1854     queue_check_input_done();
1855 }
1856
1857 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1858                         const char *errmsg, int errnoval,
1859                         const char *data, size_t recsz, void *conn_v) {
1860   Conn *conn= conn_v;
1861
1862   if (ev == OOP_RD_EOF) {
1863     connfail(conn, "unexpected EOF from peer");
1864     return OOP_CONTINUE;
1865   }
1866   assert(ev == OOP_RD_OK);
1867
1868   char *sani= sanitise(data,-1);
1869
1870   char *ep;
1871   unsigned long code= strtoul(data, &ep, 10);
1872   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1873     connfail(conn, "badly formatted response from peer: %s", sani);
1874     return OOP_CONTINUE;
1875   }
1876
1877   int conn_busy=
1878     conn->waiting.count ||
1879     conn->priority.count ||
1880     conn->sent.count ||
1881     conn->xmitu;
1882
1883   if (conn->quitting) {
1884     if (code!=205 && code!=503) {
1885       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1886     } else {
1887       notice("C%d idle connection closed by us", conn->fd);
1888       assert(!conn_busy);
1889       LIST_REMOVE(conns,conn);
1890       conn_dispose(conn);
1891     }
1892     return OOP_CONTINUE;
1893   }
1894
1895   conn->since_activity= 0;
1896   Article *art;
1897
1898 #define GET_ARTICLE(musthavesent) do{                                         \
1899     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1900     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1901   }while(0) 
1902
1903 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1904     code_streaming= (streaming);                                \
1905     GET_ARTICLE(musthavesent);                                  \
1906     article_done(art, RC_##how);                                \
1907     goto dealtwith;                                             \
1908   }while(0)
1909
1910 #define PEERBADMSG(m) do {                                      \
1911     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1912   }while(0)
1913
1914   int code_streaming= 0;
1915
1916   switch (code) {
1917
1918   case 400: PEERBADMSG("peer stopped accepting articles");
1919   default:  PEERBADMSG("peer sent unexpected message");
1920
1921   case 503:
1922     if (conn_busy) PEERBADMSG("peer timed us out");
1923     notice("C%d idle connection closed by peer", conn->fd);
1924     LIST_REMOVE(conns,conn);
1925     conn_dispose(conn);
1926     return OOP_CONTINUE;
1927
1928   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1929   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1930
1931   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1932   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1933
1934   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1935   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1936
1937   case 238: /* CHECK says send it */
1938     code_streaming= 1;
1939   case 335: /* IHAVE says send it */
1940     GET_ARTICLE(-1);
1941     assert(art->state == art_Unchecked);
1942     art->ipf->counts[art->state][RC_accepted]++;
1943     art->state= art_Wanted;
1944     LIST_ADDTAIL(conn->priority, art);
1945     break;
1946
1947   case 431: /* CHECK or TAKETHIS says try later */
1948     code_streaming= 1;
1949   case 436: /* IHAVE says try later */
1950     GET_ARTICLE(0);
1951     article_defer(art, RC_deferred);
1952     break;
1953
1954   }
1955 dealtwith:
1956
1957   conn_maybe_write(conn);
1958   check_assign_articles();
1959   return OOP_CONTINUE;
1960 }
1961
1962
1963 /*========== monitoring of input files ==========*/
1964
1965 static void feedfile_eof(InputFile *ipf) {
1966   assert(ipf != main_input_file); /* promised by tailing_try_read */
1967   inputfile_reading_stop(ipf);
1968
1969   if (ipf == flushing_input_file) {
1970     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1971     if (main_input_file) inputfile_reading_start(main_input_file);
1972     statemc_check_flushing_done();
1973   } else if (ipf == backlog_input_file) {
1974     statemc_check_backlog_done();
1975   } else {
1976     abort(); /* supposed to wait rather than get EOF on main input file */
1977   }
1978 }
1979
1980 static InputFile *open_input_file(const char *path) {
1981   int fd= open(path, O_RDWR);
1982   if (fd<0) {
1983     if (errno==ENOENT) return 0;
1984     sysfatal("unable to open input file %s", path);
1985   }
1986   assert(fd>0);
1987
1988   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1989   memset(ipf,0,sizeof(*ipf));
1990
1991   ipf->fd= fd;
1992   ipf->autodefer= -1;
1993   LIST_INIT(ipf->queue);
1994   strcpy(ipf->path, path);
1995
1996   return ipf;
1997 }
1998
1999 static void close_input_file(InputFile *ipf) { /* does not free */
2000   assert(!ipf->readable_callback); /* must have had ->on_cancel */
2001   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2002   assert(!ipf->rd); /* must have had inputfile_reading_stop */
2003   assert(!ipf->inprogress); /* no dangling pointers pointing here */
2004   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2005 }
2006
2007
2008 /*---------- dealing with articles read in the input file ----------*/
2009
2010 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2011                                    const char *data, const char *how) {
2012   warn("corrupted file: %s, offset %lu: %s: in %s",
2013        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2014   ipf->readcount_err++;
2015   if (ipf->readcount_err > max_bad_data_initial +
2016       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2017     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
2018         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2019   return OOP_CONTINUE;
2020 }
2021
2022 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2023                                oop_rd_event ev, const char *errmsg,
2024                                int errnoval, const char *data, size_t recsz,
2025                                void *ipf_v) {
2026   InputFile *ipf= ipf_v;
2027   assert(ev == OOP_RD_SYSTEM);
2028   errno= errnoval;
2029   sysdie("error reading input file: %s, offset %lu",
2030          ipf->path, (unsigned long)ipf->offset);
2031 }
2032
2033 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2034                                   oop_rd_event ev, const char *errmsg,
2035                                   int errnoval, const char *data, size_t recsz,
2036                                   void *ipf_v) {
2037   InputFile *ipf= ipf_v;
2038   Article *art;
2039   char tokentextbuf[sizeof(TOKEN)*2+3];
2040
2041   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
2042
2043   off_t old_offset= ipf->offset;
2044   ipf->offset += recsz + !!(ev == OOP_RD_OK);
2045
2046 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2047
2048   if (ev==OOP_RD_PARTREC)
2049     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2050     /* but process it anyway */
2051
2052   if (ipf->skippinglong) {
2053     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2054     return OOP_CONTINUE;
2055   }
2056   if (ev==OOP_RD_LONG) {
2057     ipf->skippinglong= 1;
2058     X_BAD_DATA("overly long line");
2059   }
2060
2061   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2062   if (!recsz) X_BAD_DATA("empty line");
2063
2064   if (data[0]==' ') {
2065     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2066     ipf->readcount_blank++;
2067     return OOP_CONTINUE;
2068   }
2069
2070   char *space= strchr(data,' ');
2071   int tokenlen= space-data;
2072   int midlen= (int)recsz-tokenlen-1;
2073   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2074   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2075
2076   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2077   memcpy(tokentextbuf, data, tokenlen);
2078   tokentextbuf[tokenlen]= 0;
2079   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2080
2081   ipf->readcount_ok++;
2082
2083   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2084   memset(art,0,sizeof(*art));
2085   art->state= art_Unchecked;
2086   art->midlen= midlen;
2087   art->ipf= ipf;  ipf->inprogress++;
2088   art->token= TextToToken(tokentextbuf);
2089   art->offset= old_offset;
2090   art->blanklen= recsz;
2091   strcpy(art->messageid, space+1);
2092   LIST_ADDTAIL(ipf->queue, art);
2093
2094   if (ipf->autodefer >= 0)
2095     article_autodefer(ipf, art);
2096   else if (ipf==backlog_input_file)
2097     article_check_expired(art);
2098
2099   if (sms==sm_NORMAL && ipf==main_input_file &&
2100       ipf->offset >= target_max_feedfile_size)
2101     statemc_start_flush("feed file size");
2102
2103   check_assign_articles();
2104   return OOP_CONTINUE;
2105 }
2106
2107 /*========== tailing input file ==========*/
2108
2109 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2110                                      void *user) {
2111   InputFile *ipf= user;
2112   return ipf->readable_callback(loop, &ipf->readable,
2113                                 ipf->readable_callback_user);
2114 }
2115
2116 static void tailing_on_cancel(struct oop_readable *rable) {
2117   InputFile *ipf= (void*)rable;
2118
2119   if (ipf->filemon) filemon_stop(ipf);
2120   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2121   ipf->readable_callback= 0;
2122 }
2123
2124 static void tailing_queue_readable(InputFile *ipf) {
2125   /* lifetime of ipf here is OK because destruction will cause
2126    * on_cancel which will cancel this callback */
2127   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2128 }
2129
2130 static int tailing_on_readable(struct oop_readable *rable,
2131                                 oop_readable_call *cb, void *user) {
2132   InputFile *ipf= (void*)rable;
2133
2134   tailing_on_cancel(rable);
2135   ipf->readable_callback= cb;
2136   ipf->readable_callback_user= user;
2137   filemon_start(ipf);
2138
2139   tailing_queue_readable(ipf);
2140   return 0;
2141 }
2142
2143 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2144                                 size_t length) {
2145   InputFile *ipf= (void*)rable;
2146   for (;;) {
2147     ssize_t r= read(ipf->fd, buffer, length);
2148     if (r==-1) {
2149       if (errno==EINTR) continue;
2150       return r;
2151     }
2152     if (!r) {
2153       if (ipf==main_input_file) {
2154         errno=EAGAIN;
2155         return -1;
2156       } else if (ipf==flushing_input_file) {
2157         assert(ipf->rd);
2158         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2159       } else if (ipf==backlog_input_file) {
2160         assert(ipf->rd);
2161       } else {
2162         abort();
2163       }
2164     }
2165     tailing_queue_readable(ipf);
2166     return r;
2167   }
2168 }
2169
2170 /*---------- filemon implemented with inotify ----------*/
2171
2172 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2173 #define HAVE_FILEMON
2174
2175 #include <sys/inotify.h>
2176
2177 static int filemon_inotify_fd;
2178 static int filemon_inotify_wdmax;
2179 static InputFile **filemon_inotify_wd2ipf;
2180
2181 struct Filemon_Perfile {
2182   int wd;
2183 };
2184
2185 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2186   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2187   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2188
2189   if (wd >= filemon_inotify_wdmax) {
2190     int newmax= wd+2;
2191     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2192                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2193     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2194            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2195     filemon_inotify_wdmax= newmax;
2196   }
2197
2198   assert(!filemon_inotify_wd2ipf[wd]);
2199   filemon_inotify_wd2ipf[wd]= ipf;
2200
2201   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2202         ipf, wd, filemon_inotify_wdmax);
2203
2204   pf->wd= wd;
2205 }
2206
2207 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2208   int wd= pf->wd;
2209   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2210   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2211   if (r) sysdie("inotify_rm_watch");
2212   filemon_inotify_wd2ipf[wd]= 0;
2213 }
2214
2215 static void *filemon_inotify_readable(oop_source *lp, int fd,
2216                                       oop_event e, void *u) {
2217   struct inotify_event iev;
2218   for (;;) {
2219     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2220     if (r==-1) {
2221       if (isewouldblock(errno)) break;
2222       sysdie("read from inotify master");
2223     } else if (r==sizeof(iev)) {
2224       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2225     } else {
2226       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2227     }
2228     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2229     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2230     filemon_callback(ipf);
2231   }
2232   return OOP_CONTINUE;
2233 }
2234
2235 static int filemon_method_init(void) {
2236   filemon_inotify_fd= inotify_init();
2237   if (filemon_inotify_fd<0) {
2238     syswarn("filemon/inotify: inotify_init failed");
2239     return 0;
2240   }
2241   xsetnonblock(filemon_inotify_fd, 1);
2242   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2243
2244   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2245   return 1;
2246 }
2247
2248 static void filemon_method_dump_info(FILE *f) {
2249   int i;
2250   fprintf(f,"inotify");
2251   DUMPV("%d",,filemon_inotify_fd);
2252   DUMPV("%d",,filemon_inotify_wdmax);
2253   for (i=0; i<filemon_inotify_wdmax; i++)
2254     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2255 }
2256
2257 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2258
2259 /*---------- filemon dummy implementation ----------*/
2260
2261 #if !defined(HAVE_FILEMON)
2262
2263 struct Filemon_Perfile { int dummy; };
2264
2265 static int filemon_method_init(void) {
2266   warn("filemon/dummy: no filemon method compiled in");
2267   return 0;
2268 }
2269 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2270 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2271 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2272
2273 #endif /* !HAVE_FILEMON */
2274
2275 /*---------- filemon generic interface ----------*/
2276
2277 static void filemon_start(InputFile *ipf) {
2278   assert(!ipf->filemon);
2279
2280   NEW(ipf->filemon);
2281   filemon_method_startfile(ipf, ipf->filemon);
2282 }
2283
2284 static void filemon_stop(InputFile *ipf) {
2285   if (!ipf->filemon) return;
2286   filemon_method_stopfile(ipf, ipf->filemon);
2287   free(ipf->filemon);
2288   ipf->filemon= 0;
2289 }
2290
2291 static void filemon_callback(InputFile *ipf) {
2292   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2293     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2294 }
2295
2296 /*---------- interface to start and stop an input file ----------*/
2297
2298 static const oop_rd_style feedfile_rdstyle= {
2299   OOP_RD_DELIM_STRIP, '\n',
2300   OOP_RD_NUL_PERMIT,
2301   OOP_RD_SHORTREC_LONG,
2302 };
2303
2304 static void inputfile_reading_start(InputFile *ipf) {
2305   assert(!ipf->rd);
2306   ipf->readable.on_readable= tailing_on_readable;
2307   ipf->readable.on_cancel=   tailing_on_cancel;
2308   ipf->readable.try_read=    tailing_try_read;
2309   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2310   ipf->readable.delete_kill= 0;
2311
2312   ipf->readable_callback= 0;
2313   ipf->readable_callback_user= 0;
2314
2315   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2316   assert(ipf->rd);
2317
2318   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2319                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2320   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2321 }
2322
2323 static void inputfile_reading_stop(InputFile *ipf) {
2324   assert(ipf->rd);
2325   oop_rd_cancel(ipf->rd);
2326   oop_rd_delete(ipf->rd);
2327   ipf->rd= 0;
2328   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2329 }
2330
2331
2332 /*========== interaction with innd - state machine ==========*/
2333
2334 /* See official state diagram at top of file.  We implement
2335  * this as follows:
2336  * -8<-
2337
2338             .=======.
2339             ||START||
2340             `======='
2341                 |
2342                 | open F
2343                 |
2344                 |    F ENOENT
2345                 |`---------------------------------------------------.
2346       F OPEN OK |                                                    |
2347                 |`---------------- - - -                             |
2348        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2349                 |                  for full startup logic            |
2350      ,--------->|                                                    |
2351      |          V                                                    |
2352      |     ============                                       try to |
2353      |      NORMAL                                            open D |
2354      |     [Normal]                                                  |
2355      |      main F tail                                              |
2356      |     ============                                              V
2357      |          |                                                    |
2358      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2359      ^          | hardlink F to D                                    |
2360      |     [Hardlinked]                                              |
2361      |          | unlink F                                           |
2362      |          | our handle onto F is now onto D                    |
2363      |     [Moved]                                                   |
2364      |          |                                                    |
2365      |          |<-------------------<---------------------<---------+
2366      |          |                                                    |
2367      |          | spawn inndcomm flush                               |
2368      |          V                                                    |
2369      |     ==================                                        |
2370      |      FLUSHING[-ABSENT]                                        |
2371      |     [Flushing]                                                |
2372      |     main D tail/none                                          |
2373      |     ==================                                        |
2374      |          |                                                    |
2375      |          |   INNDCOMM FLUSH FAILS                             ^
2376      |          |`----------------------->----------.                |
2377      |          |                                   |                |
2378      |          |   NO SUCH SITE                    V                |
2379      ^          |`--------------->----.         ==================== |
2380      |          |                      \        FLUSHFAILED[-ABSENT] |
2381      |          |                       \         [Moved]            |
2382      |          | FLUSH OK               \       main D tail/none    |
2383      |          | open F                  \     ==================== |
2384      |          |                          \        |                |
2385      |          |                           \       | TIME TO RETRY  |
2386      |          |`------->----.     ,---<---'\      `----------------'
2387      |          |    D NONE   |     | D NONE  `----.
2388      |          V             |     |              V
2389      |     =============      V     V             ============
2390      |      SEPARATED-1       |     |              DROPPING-1
2391      |      flsh->rd!=0       |     |              flsh->rd!=0
2392      |     [Separated]        |     |             [Dropping]
2393      |      main F idle       |     |              main none
2394      |      flsh D tail       |     |              flsh D tail
2395      |     =============      |     |             ============
2396      |          |             |     | install       |
2397      ^          | EOF ON D    |     |  defer        | EOF ON D
2398      |          V             |     |               V
2399      |     ===============    |     |             ===============
2400      |      SEPARATED-2       |     |              DROPPING-2
2401      |      flsh->rd==0       |     V              flsh->rd==0
2402      |     [Finishing]        |     |             [Dropping]
2403      |      main F tail       |     `.             main none
2404      |      flsh D closed     |       `.           flsh D closed
2405      |     ===============    V         `.        ===============
2406      |          |                         `.          |
2407      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2408      |          V install defer as backlog    `.      | install defer
2409      ^          | close D                       `.    | close D
2410      |          | unlink D                        `.  | unlink D
2411      |          |                                  |  |
2412      |          |                                  V  V
2413      `----------'                               ==============
2414                                                  DROPPED
2415                                                 [Dropped]
2416                                                  main none
2417                                                  flsh none
2418                                                  some backlog
2419                                                 ==============
2420                                                       |
2421                                                       | ALL BACKLOG DONE
2422                                                       |
2423                                                       | unlink lock
2424                                                       | exit
2425                                                       V
2426                                                   ==========
2427                                                    (ESRCH)
2428                                                   [Droppped]
2429                                                   ==========
2430  * ->8-
2431  */
2432
2433 static void startup_set_input_file(InputFile *f) {
2434   assert(!main_input_file);
2435   main_input_file= f;
2436   inputfile_reading_start(f);
2437 }
2438
2439 static void statemc_lock(void) {
2440   int lockfd;
2441   struct stat stab, stabf;
2442   
2443   for (;;) {
2444     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2445     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2446
2447     struct flock fl;
2448     memset(&fl,0,sizeof(fl));
2449     fl.l_type= F_WRLCK;
2450     fl.l_whence= SEEK_SET;
2451     int r= fcntl(lockfd, F_SETLK, &fl);
2452     if (r==-1) {
2453       if (errno==EACCES || isewouldblock(errno)) {
2454         if (quiet_multiple) exit(0);
2455         fatal("another duct holds the lockfile");
2456       }
2457       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2458     }
2459
2460     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2461     int lock_noent;
2462     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2463
2464     if (!lock_noent && samefile(&stab, &stabf))
2465       break;
2466
2467     xclose(lockfd, "stale lockfile ", path_lock);
2468   }
2469
2470   FILE *lockfile= fdopen(lockfd, "w");
2471   if (!lockfile) sysdie("fdopen lockfile");
2472
2473   int r= ftruncate(lockfd, 0);
2474   if (r) sysdie("truncate lockfile to write new info");
2475
2476   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2477               (unsigned long)self_pid,
2478               sitename, feedfile, remote_host) == EOF ||
2479       fflush(lockfile))
2480     sysfatal("write info to lockfile %s", path_lock);
2481
2482   debug("startup: locked");
2483 }
2484
2485 static void statemc_init(void) {
2486   struct stat stabdefer;
2487
2488   search_backlog_file();
2489
2490   int defer_noent;
2491   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2492   if (defer_noent) {
2493     debug("startup: ductdefer ENOENT");
2494   } else {
2495     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2496     switch (stabdefer.st_nlink==1) {
2497     case 1:
2498       open_defer(); /* so that we will later close it and rename it */
2499       break;
2500     case 2:
2501       xunlink(path_defer, "stale defer file link"
2502               " (presumably hardlink to backlog file)");
2503       break;
2504     default:
2505       die("defer file %s has unexpected link count %d",
2506           path_defer, stabdefer.st_nlink);
2507     }
2508   }
2509
2510   struct stat stab_f, stab_d;
2511   int noent_f;
2512
2513   InputFile *file_d= open_input_file(path_flushing);
2514   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2515
2516   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2517
2518   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2519     debug("startup: F==D => Hardlinked");
2520     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2521     noent_f= 1;
2522   }
2523
2524   if (noent_f) {
2525     debug("startup: F ENOENT => Moved");
2526     if (file_d) startup_set_input_file(file_d);
2527     spawn_inndcomm_flush("feedfile missing at startup");
2528     /* => Flushing, sms:=FLUSHING */
2529   } else {
2530     if (file_d) {
2531       debug("startup: F!=D => Separated");
2532       startup_set_input_file(file_d);
2533       flushing_input_file= main_input_file;
2534       main_input_file= open_input_file(feedfile);
2535       if (!main_input_file) die("feedfile vanished during startup");
2536       SMS(SEPARATED, max_separated_periods,
2537           "found both old and current feed files");
2538     } else {
2539       debug("startup: F exists, D ENOENT => Normal");
2540       InputFile *file_f= open_input_file(feedfile);
2541       if (!file_f) die("feed file vanished during startup");
2542       startup_set_input_file(file_f);
2543       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2544     }
2545   }
2546 }
2547
2548 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2549   assert(sms == sm_NORMAL);
2550
2551   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2552         why,
2553         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2554         (unsigned long)target_max_feedfile_size,
2555         until_flush);
2556
2557   int r= link(feedfile, path_flushing);
2558   if (r) sysfatal("link feedfile %s to flushing file %s",
2559                   feedfile, path_flushing);
2560   /* => Hardlinked */
2561
2562   xunlink(feedfile, "old feedfile link");
2563   /* => Moved */
2564
2565   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2566 }
2567
2568 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2569   switch (sms) {
2570
2571   case sm_NORMAL:
2572     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2573     return 1;
2574
2575   case sm_FLUSHFAILED:
2576     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2577     return 1;
2578
2579   case sm_SEPARATED:
2580   case sm_DROPPING:
2581     warn("took too long to complete old feedfile after flush, autodeferring");
2582     assert(flushing_input_file);
2583     autodefer_input_file(flushing_input_file);
2584     return 1;
2585
2586   default:
2587     return 0;
2588   }
2589 }
2590
2591 static void statemc_period_poll(void) {
2592   if (!until_flush) return;
2593   until_flush--;
2594   assert(until_flush>=0);
2595
2596   if (until_flush) return;
2597   int ok= trigger_flush_ok();
2598   assert(ok);
2599 }
2600
2601 static int inputfile_is_done(InputFile *ipf) {
2602   if (!ipf) return 0;
2603   if (ipf->inprogress) return 0; /* new article in the meantime */
2604   if (ipf->rd) return 0; /* not had EOF */
2605   return 1;
2606 }
2607
2608 static void notice_processed(InputFile *ipf, int completed,
2609                              const char *what, const char *spec) {
2610   if (!ipf) return; /* allows preterminate to be lazy */
2611
2612 #define RCI_NOTHING(x) /* nothing */
2613 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2614 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2615
2616 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2617
2618   char *inprog= completed
2619     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2620     : xasprintf(" inprogress=%ld", ipf->inprogress);
2621   char *autodefer= ipf->autodefer >= 0
2622     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2623     : xasprintf("%s","");
2624
2625   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2626        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2627        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2628        ,
2629        completed?"completed":"processed", what, spec,
2630        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2631        inprog, autodefer,
2632        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2633        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2634        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2635        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2636        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2637        );
2638
2639   free(inprog);
2640   free(autodefer);
2641
2642 #undef CNT
2643 }
2644
2645 static void statemc_check_backlog_done(void) {
2646   InputFile *ipf= backlog_input_file;
2647   if (!inputfile_is_done(ipf)) return;
2648
2649   const char *slash= strrchr(ipf->path, '/');
2650   const char *leaf= slash ? slash+1 : ipf->path;
2651   const char *under= strchr(slash, '_');
2652   const char *rest= under ? under+1 : leaf;
2653   if (!strncmp(rest,"backlog",7)) rest += 7;
2654   notice_processed(ipf,1,"backlog ",rest);
2655
2656   close_input_file(ipf);
2657   if (unlink(ipf->path)) {
2658     if (errno != ENOENT)
2659       sysdie("could not unlink processed backlog file %s", ipf->path);
2660     warn("backlog file %s vanished while we were reading it"
2661          " so we couldn't remove it (but it's done now, anyway)",
2662          ipf->path);
2663   }
2664   free(ipf);
2665   backlog_input_file= 0;
2666   search_backlog_file();
2667   return;
2668 }
2669
2670 static void statemc_check_flushing_done(void) {
2671   InputFile *ipf= flushing_input_file;
2672   if (!inputfile_is_done(ipf)) return;
2673
2674   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2675
2676   notice_processed(ipf,1,"feedfile","");
2677
2678   close_defer();
2679
2680   xunlink(path_flushing, "old flushing file");
2681
2682   close_input_file(flushing_input_file);
2683   free(flushing_input_file);
2684   flushing_input_file= 0;
2685
2686   if (sms==sm_SEPARATED) {
2687     notice("flush complete");
2688     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2689   } else if (sms==sm_DROPPING) {
2690     SMS(DROPPED, max_separated_periods, "old flush complete");
2691     search_backlog_file();
2692     notice("feed dropped, but will continue until backlog is finished");
2693   }
2694 }
2695
2696 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2697                                       void *u) {
2698   assert(!inputfile_is_done(main_input_file));
2699   statemc_check_flushing_done();
2700   statemc_check_backlog_done();
2701   return OOP_CONTINUE;
2702 }
2703
2704 static void queue_check_input_done(void) {
2705   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2706 }
2707
2708 static void statemc_setstate(StateMachineState newsms, int periods,
2709                              const char *forlog, const char *why) {
2710   sms= newsms;
2711   until_flush= periods;
2712
2713   const char *xtra= "";
2714   switch (sms) {
2715   case sm_FLUSHING:
2716   case sm_FLUSHFAILED:
2717     if (!main_input_file) xtra= "-ABSENT";
2718     break;
2719   case sm_SEPARATED:
2720   case sm_DROPPING:
2721     xtra= flushing_input_file->rd ? "-1" : "-2";
2722     break;
2723   default:;
2724   }
2725
2726   if (periods) {
2727     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2728   } else {
2729     info("state %s%s %s",forlog,xtra,why);
2730   }
2731 }
2732
2733 /*---------- defer and backlog files ----------*/
2734
2735 static void open_defer(void) {
2736   struct stat stab;
2737
2738   if (defer) return;
2739
2740   defer= fopen(path_defer, "a+");
2741   if (!defer) sysfatal("could not open defer file %s", path_defer);
2742
2743   /* truncate away any half-written records */
2744
2745   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2746
2747   if (stab.st_size > LONG_MAX)
2748     die("defer file %s size is far too large", path_defer);
2749
2750   if (!stab.st_size)
2751     return;
2752
2753   long orgsize= stab.st_size;
2754   long truncto= stab.st_size;
2755   for (;;) {
2756     if (!truncto) break; /* was only (if anything) one half-truncated record */
2757     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2758       sysdie("seek in defer file %s while truncating partial", path_defer);
2759
2760     int r= getc(defer);
2761     if (r==EOF) {
2762       if (ferror(defer))
2763         sysdie("failed read from defer file %s", path_defer);
2764       else
2765         die("defer file %s shrank while we were checking it!", path_defer);
2766     }
2767     if (r=='\n') break;
2768     truncto--;
2769   }
2770
2771   if (stab.st_size != truncto) {
2772     warn("truncating half-record at end of defer file %s -"
2773          " shrinking by %ld bytes from %ld to %ld",
2774          path_defer, orgsize - truncto, orgsize, truncto);
2775
2776     if (fflush(defer))
2777       sysfatal("could not flush defer file %s", path_defer);
2778     if (ftruncate(fileno(defer), truncto))
2779       sysdie("could not truncate defer file %s", path_defer);
2780
2781   } else {
2782     info("continuing existing defer file %s (%ld bytes)",
2783          path_defer, orgsize);
2784   }
2785   if (fseek(defer, truncto, SEEK_SET))
2786     sysdie("could not seek to new end of defer file %s", path_defer);
2787 }
2788
2789 static void close_defer(void) {
2790   if (!defer)
2791     return;
2792
2793   struct stat stab;
2794   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2795
2796   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2797   defer= 0;
2798
2799   time_t now= xtime();
2800
2801   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2802                            (unsigned long)now,
2803                            (unsigned long)stab.st_ino);
2804   if (link(path_defer, backlog))
2805     sysfatal("could not install defer file %s as backlog file %s",
2806            path_defer, backlog);
2807   if (unlink(path_defer))
2808     sysdie("could not unlink old defer link %s to backlog file %s",
2809            path_defer, backlog);
2810
2811   free(backlog);
2812
2813   if (until_backlog_nextscan < 0 ||
2814       until_backlog_nextscan > backlog_retry_minperiods + 1)
2815     until_backlog_nextscan= backlog_retry_minperiods + 1;
2816 }
2817
2818 static void poll_backlog_file(void) {
2819   if (until_backlog_nextscan < 0) return;
2820   if (until_backlog_nextscan-- > 0) return;
2821   search_backlog_file();
2822 }
2823
2824 static void search_backlog_file(void) {
2825   /* returns non-0 iff there are any backlog files */
2826
2827   glob_t gl;
2828   int r, i;
2829   struct stat stab;
2830   const char *oldest_path=0;
2831   time_t oldest_mtime=0, now;
2832
2833   if (backlog_input_file) return;
2834
2835  try_again:
2836
2837   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2838
2839   switch (r) {
2840   case GLOB_ABORTED:
2841     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2842   case GLOB_NOSPACE:
2843     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2844   case 0:
2845     for (i=0; i<gl.gl_pathc; i++) {
2846       const char *path= gl.gl_pathv[i];
2847
2848       if (strchr(path,'#') || strchr(path,'~')) {
2849         debug("backlog file search skipping %s", path);
2850         continue;
2851       }
2852       r= stat(path, &stab);
2853       if (r) {
2854         syswarn("failed to stat backlog file %s", path);
2855         continue;
2856       }
2857       if (!S_ISREG(stab.st_mode)) {
2858         warn("backlog file %s is not a plain file (or link to one)", path);
2859         continue;
2860       }
2861       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2862         oldest_path= path;
2863         oldest_mtime= stab.st_mtime;
2864       }
2865     }
2866   case GLOB_NOMATCH: /* fall through */
2867     break;
2868   default:
2869     sysdie("glob expansion of backlog pattern %s gave unexpected"
2870            " nonzero (error?) return value %d", globpat_backlog, r);
2871   }
2872
2873   if (!oldest_path) {
2874     debug("backlog scan: none");
2875
2876     if (sms==sm_DROPPED) {
2877       preterminate();
2878       notice("feed dropped and our work is complete");
2879
2880       int r= unlink(path_control);
2881       if (r && errno!=ENOENT)
2882         syswarn("failed to remove control symlink for old feed");
2883
2884       xunlink(path_lock,    "lockfile for old feed");
2885       exit(4);
2886     }
2887     until_backlog_nextscan= backlog_spontrescan_periods;
2888     goto xfree;
2889   }
2890
2891   now= xtime();
2892   double age= difftime(now, oldest_mtime);
2893   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2894
2895   if (age_deficiency <= 0) {
2896     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2897           age, age_deficiency, oldest_path);
2898
2899     backlog_input_file= open_input_file(oldest_path);
2900     if (!backlog_input_file) {
2901       warn("backlog file %s vanished as we opened it", oldest_path);
2902       globfree(&gl);
2903       goto try_again;
2904     }
2905     inputfile_reading_start(backlog_input_file);
2906     until_backlog_nextscan= -1;
2907     goto xfree;
2908   }
2909
2910   until_backlog_nextscan= age_deficiency / period_seconds;
2911
2912   if (backlog_spontrescan_periods >= 0 &&
2913       until_backlog_nextscan > backlog_spontrescan_periods)
2914     until_backlog_nextscan= backlog_spontrescan_periods;
2915
2916   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2917         age, age_deficiency, until_backlog_nextscan, oldest_path);
2918
2919  xfree:
2920   globfree(&gl);
2921   return;
2922 }
2923
2924 /*---------- shutdown and signal handling ----------*/
2925
2926 static void preterminate(void) {
2927   if (in_child) return;
2928   notice_processed(main_input_file,0,"feedfile","");
2929   notice_processed(flushing_input_file,0,"flushing","");
2930   if (backlog_input_file)
2931     notice_processed(backlog_input_file,0, "backlog file ",
2932                      backlog_input_file->path);
2933 }
2934
2935 static int signal_self_pipe[2];
2936 static sig_atomic_t terminate_sig_flag;
2937
2938 static void raise_default(int signo) {
2939   xsigsetdefault(signo);
2940   raise(signo);
2941   abort();
2942 }
2943
2944 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2945   assert(fd=signal_self_pipe[0]);
2946   char buf[PIPE_BUF];
2947   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2948   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2949   if (r==0) die("eof on signal self pipe");
2950   if (terminate_sig_flag) {
2951     preterminate();
2952     notice("terminating (%s)", strsignal(terminate_sig_flag));
2953     raise_default(terminate_sig_flag);
2954   }
2955   return OOP_CONTINUE;
2956 }
2957
2958 static void sigarrived_handler(int signum) {
2959   static char x;
2960   switch (signum) {
2961   case SIGTERM:
2962   case SIGINT:
2963     if (!terminate_sig_flag) terminate_sig_flag= signum;
2964     break;
2965   default:
2966     abort();
2967   }
2968   write(signal_self_pipe[1],&x,1);
2969 }
2970
2971 static void init_signals(void) {
2972   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2973     sysdie("could not ignore SIGPIPE");
2974
2975   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2976
2977   xsetnonblock(signal_self_pipe[0],1);
2978   xsetnonblock(signal_self_pipe[1],1);
2979
2980   struct sigaction sa;
2981   memset(&sa,0,sizeof(sa));
2982   sa.sa_handler= sigarrived_handler;
2983   sa.sa_flags= SA_RESTART;
2984   xsigaction(SIGTERM,&sa);
2985   xsigaction(SIGINT,&sa);
2986
2987   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2988 }
2989
2990 /*========== flushing the feed ==========*/
2991
2992 static pid_t inndcomm_child;
2993 static int inndcomm_sentinel_fd;
2994
2995 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2996   assert(inndcomm_child);
2997   assert(fd == inndcomm_sentinel_fd);
2998   int status= xwaitpid(&inndcomm_child, "inndcomm");
2999   inndcomm_child= 0;
3000   
3001   cancel_fd_read_except(fd);
3002   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3003   inndcomm_sentinel_fd= 0;
3004
3005   assert(!flushing_input_file);
3006
3007   if (WIFEXITED(status)) {
3008     switch (WEXITSTATUS(status)) {
3009
3010     case INNDCOMMCHILD_ESTATUS_FAIL:
3011       goto failed;
3012
3013     case INNDCOMMCHILD_ESTATUS_NONESUCH:
3014       notice("feed has been dropped by innd, finishing up");
3015       flushing_input_file= main_input_file;
3016       tailing_queue_readable(flushing_input_file);
3017         /* we probably previously returned EAGAIN from our fake read method
3018          * when in fact we were at EOF, so signal another readable event
3019          * so we actually see the EOF */
3020
3021       main_input_file= 0;
3022
3023       if (flushing_input_file) {
3024         SMS(DROPPING, max_separated_periods,
3025             "feed dropped by innd, but must finish last flush");
3026       } else {
3027         close_defer();
3028         SMS(DROPPED, 0, "feed dropped by innd");
3029         search_backlog_file();
3030       }
3031       return OOP_CONTINUE;
3032
3033     case 0:
3034       /* as above */
3035       flushing_input_file= main_input_file;
3036       tailing_queue_readable(flushing_input_file);
3037
3038       main_input_file= open_input_file(feedfile);
3039       if (!main_input_file)
3040         die("flush succeeded but feedfile %s does not exist!", feedfile);
3041
3042       if (flushing_input_file) {
3043         SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3044       } else {
3045         close_defer();
3046         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3047       }
3048       return OOP_CONTINUE;
3049
3050     default:
3051       goto unexpected_exitstatus;
3052
3053     }
3054   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3055     warn("flush timed out trying to talk to innd");
3056     goto failed;
3057   } else {
3058   unexpected_exitstatus:
3059     report_child_status("inndcomm child", status);
3060   }
3061
3062  failed:
3063   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3064   return OOP_CONTINUE;
3065 }
3066
3067 static void inndcommfail(const char *what) {
3068   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3069   exit(INNDCOMMCHILD_ESTATUS_FAIL);
3070 }
3071
3072 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3073   int pipefds[2];
3074
3075   notice("flushing %s",why);
3076
3077   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3078   assert(!inndcomm_child);
3079   assert(!inndcomm_sentinel_fd);
3080
3081   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3082
3083   inndcomm_child= xfork("inndcomm child");
3084
3085   if (!inndcomm_child) {
3086     const char *flushargv[2]= { sitename, 0 };
3087     char *reply;
3088     int r;
3089
3090     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3091     /* parent spots the autoclose of pipefds[1] when we die or exit */
3092
3093     if (simulate_flush>=0) {
3094       warn("SIMULATING flush child status %d", simulate_flush);
3095       if (simulate_flush>128) raise(simulate_flush-128);
3096       else exit(simulate_flush);
3097     }
3098
3099     alarm(inndcomm_flush_timeout);
3100     r= ICCopen();                         if (r)   inndcommfail("connect");
3101     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3102     if (!r) exit(0); /* yay! */
3103
3104     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3105     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3106     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3107   }
3108
3109   simulate_flush= -1;
3110
3111   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3112   inndcomm_sentinel_fd= pipefds[0];
3113   assert(inndcomm_sentinel_fd);
3114   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3115
3116   SMS(FLUSHING, 0, why);
3117 }
3118
3119 /*========== main program ==========*/
3120
3121 static void postfork_inputfile(InputFile *ipf) {
3122   if (!ipf) return;
3123   xclose(ipf->fd, "(in child) input file ", ipf->path);
3124 }
3125
3126 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3127   /* we have no stdio streams that are buffered long-term */
3128   if (!f) return;
3129   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3130 }
3131
3132 static void postfork(void) {
3133   in_child= 1;
3134
3135   xsigsetdefault(SIGTERM);
3136   xsigsetdefault(SIGINT);
3137   xsigsetdefault(SIGPIPE);
3138   if (terminate_sig_flag) raise(terminate_sig_flag);
3139
3140   postfork_inputfile(main_input_file);
3141   postfork_inputfile(flushing_input_file);
3142
3143   Conn *conn;
3144   FOR_CONN(conn)
3145     conn_closefd(conn,"(in child) ");
3146
3147   postfork_stdio(defer, "defer file ", path_defer);
3148 }
3149
3150 typedef struct Every Every;
3151 struct Every {
3152   struct timeval interval;
3153   int fixed_rate;
3154   void (*f)(void);
3155 };
3156
3157 static void every_schedule(Every *e, struct timeval base);
3158
3159 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3160   Every *e= e_v;
3161   e->f();
3162   if (!e->fixed_rate) xgettimeofday(&base);
3163   every_schedule(e, base);
3164   return OOP_CONTINUE;
3165 }
3166
3167 static void every_schedule(Every *e, struct timeval base) {
3168   struct timeval when;
3169   timeradd(&base, &e->interval, &when);
3170   loop->on_time(loop, when, every_happens, e);
3171 }
3172
3173 static void every(int interval, int fixed_rate, void (*f)(void)) {
3174   NEW_DECL(Every *,e);
3175   e->interval.tv_sec= interval;
3176   e->interval.tv_usec= 0;
3177   e->fixed_rate= fixed_rate;
3178   e->f= f;
3179   struct timeval now;
3180   xgettimeofday(&now);
3181   every_schedule(e, now);
3182 }
3183
3184 static void filepoll(void) {
3185   filemon_callback(main_input_file);
3186   filemon_callback(flushing_input_file);
3187 }
3188
3189 static char *debug_report_ipf(InputFile *ipf) {
3190   if (!ipf) return xasprintf("none");
3191
3192   const char *slash= strrchr(ipf->path,'/');
3193   const char *path= slash ? slash+1 : ipf->path;
3194
3195   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s",
3196                    ipf, path,
3197                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
3198                    (long)ipf->offset, ipf->fd,
3199                    ipf->rd ? "" : ",!rd",
3200                    ipf->skippinglong ? "*skiplong" : "");
3201 }
3202
3203 static void period(void) {
3204   char *dipf_main=     debug_report_ipf(main_input_file);
3205   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3206   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3207
3208   debug("PERIOD"
3209         " sms=%s[%d] conns=%d until_connect=%d"
3210         " input_files main:%s flushing:%s backlog:%s"
3211         " children connecting=%ld inndcomm=%ld"
3212         ,
3213         sms_names[sms], until_flush, conns.count, until_connect,
3214         dipf_main, dipf_flushing, dipf_backlog,
3215         (long)connecting_child, (long)inndcomm_child
3216         );
3217
3218   free(dipf_main);
3219   free(dipf_flushing);
3220   free(dipf_backlog);
3221
3222   if (until_connect) until_connect--;
3223
3224   inputfile_queue_check_expired(backlog_input_file);
3225   poll_backlog_file();
3226   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3227   statemc_period_poll();
3228   check_assign_articles();
3229   check_idle_conns();
3230 }
3231
3232
3233 /*========== dumping state ==========*/
3234
3235 static void dump_article_list(FILE *f, const ControlCommand *c,
3236                               const ArticleList *al) {
3237   fprintf(f, " count=%d\n", al->count);
3238   if (!c->xval) return;
3239   
3240   int i; Article *art;
3241   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3242     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3243     DUMPV("%p", art->,ipf);
3244     DUMPV("%d", art->,missing);
3245     DUMPV("%lu", (unsigned long)art->,offset);
3246     DUMPV("%d", art->,blanklen);
3247     DUMPV("%d", art->,midlen);
3248     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3249   }
3250 }
3251   
3252 static void dump_input_file(FILE *f, const ControlCommand *c,
3253                             InputFile *ipf, const char *wh) {
3254   char *dipf= debug_report_ipf(ipf);
3255   fprintf(f,"input %s %s", wh, dipf);
3256   free(dipf);
3257   
3258   if (ipf) {
3259     DUMPV("%d", ipf->,readcount_ok);
3260     DUMPV("%d", ipf->,readcount_blank);
3261     DUMPV("%d", ipf->,readcount_err);
3262   }
3263   fprintf(f,"\n");
3264   if (ipf) {
3265     ArtState state; const char *const *statename; 
3266     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3267 #define RC_DUMP_FMT(x) " " #x "=%d"
3268 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3269       fprintf(f,"input %s counts %-11s"
3270               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3271               wh, *statename
3272               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3273     }
3274     fprintf(f,"input %s queue", wh);
3275     dump_article_list(f,c,&ipf->queue);
3276   }
3277 }
3278
3279 CCMD(dump) {
3280   int i;
3281   fprintf(cc->out, "dumping state to %s\n", path_dump);
3282   FILE *f= fopen(path_dump, "w");
3283   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3284
3285   fprintf(f,"general");
3286   DUMPV("%s", sms_names,[sms]);
3287   DUMPV("%d", ,until_flush);
3288   DUMPV("%ld", (long),self_pid);
3289   DUMPV("%p", , defer);
3290   DUMPV("%d", , until_connect);
3291   DUMPV("%d", , until_backlog_nextscan);
3292   DUMPV("%d", , simulate_flush);
3293   fprintf(f,"\nnocheck");
3294   DUMPV("%#.10f", , accept_proportion);
3295   DUMPV("%d", , nocheck);
3296   DUMPV("%d", , nocheck_reported);
3297   fprintf(f,"\n");
3298
3299   fprintf(f,"special");
3300   DUMPV("%ld", (long),connecting_child);
3301   DUMPV("%d", , connecting_fdpass_sock);
3302   DUMPV("%d", , control_master);
3303   fprintf(f,"\n");
3304
3305   fprintf(f,"filemon ");
3306   filemon_method_dump_info(f);
3307
3308   dump_input_file(f,c, main_input_file,     "main"    );
3309   dump_input_file(f,c, flushing_input_file, "flushing");
3310   dump_input_file(f,c, backlog_input_file,  "backlog" );
3311
3312   fprintf(f,"conns count=%d\n", conns.count);
3313
3314   Conn *conn;
3315   FOR_CONN(conn) {
3316
3317     fprintf(f,"C%d",conn->fd);
3318     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3319     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3320     DUMPV("%d",conn->,since_activity);
3321     fprintf(f,"\n");
3322
3323     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3324     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3325     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3326
3327     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3328     for (i=0; i<conn->xmitu; i++) {
3329       const struct iovec *iv= &conn->xmit[i];
3330       const XmitDetails *xd= &conn->xmitd[i];
3331       char *dinfo;
3332       switch (xd->kind) {
3333       case xk_Const:    dinfo= xasprintf("Const");                 break;
3334       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3335       default:
3336         abort();
3337       }
3338       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3339               sanitise(iv->iov_base, iv->iov_len));
3340       free(dinfo);
3341     }
3342   }
3343
3344   fprintf(f,"paths");
3345   DUMPV("%s", , path_lock);
3346   DUMPV("%s", , path_flushing);
3347   DUMPV("%s", , path_defer);
3348   DUMPV("%s", , path_control);
3349   DUMPV("%s", , path_dump);
3350   DUMPV("%s", , globpat_backlog);
3351   fprintf(f,"\n");
3352
3353   if (!!ferror(f) + !!fclose(f)) {
3354     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3355     return;
3356   }
3357 }
3358
3359 /*========== option parsing ==========*/
3360
3361 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3362 static void vbadusage(const char *fmt, va_list al) {
3363   char *m= xvasprintf(fmt,al);
3364   fprintf(stderr, "bad usage: %s\n"
3365           "say --help for help, or read the manpage\n",
3366           m);
3367   if (become_daemon)
3368     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3369   exit(8);
3370 }
3371
3372 /*---------- generic option parser ----------*/
3373
3374 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3375 static void badusage(const char *fmt, ...) {
3376   va_list al;
3377   va_start(al,fmt);
3378   vbadusage(fmt,al);
3379 }
3380
3381 enum OptFlags {
3382   of_seconds= 001000u,
3383   of_boolean= 002000u,
3384 };
3385
3386 typedef struct Option Option;
3387 typedef void OptionParser(const Option*, const char *val);
3388
3389 struct Option {
3390   int shrt;
3391   const char *lng, *formarg;
3392   void *store;
3393   OptionParser *fn;
3394   int intval;
3395 };
3396
3397 static void parse_options(const Option *options, char ***argvp) {
3398   /* on return *argvp is first non-option arg; argc is not updated */
3399
3400   for (;;) {
3401     const char *arg= *++(*argvp);
3402     if (!arg) break;
3403     if (*arg != '-') break;
3404     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3405     int a;
3406     while ((a= *++arg)) {
3407       const Option *o;
3408       if (a=='-') {
3409         arg++;
3410         char *equals= strchr(arg,'=');
3411         int len= equals ? (equals - arg) : strlen(arg);
3412         for (o=options; o->shrt || o->lng; o++)
3413           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3414             goto found_long;
3415         badusage("unknown long option --%s",arg);
3416       found_long:
3417         if (!o->formarg) {
3418           if (equals) badusage("option --%s does not take a value",o->lng);
3419           arg= 0;
3420         } else if (equals) {
3421           arg= equals+1;
3422         } else {
3423           arg= *++(*argvp);
3424           if (!arg) badusage("option --%s needs a value for %s",
3425                              o->lng, o->formarg);
3426         }
3427         o->fn(o, arg);
3428         break; /* eaten the whole argument now */
3429       }
3430       for (o=options; o->shrt || o->lng; o++)
3431         if (a == o->shrt)
3432           goto found_short;
3433       badusage("unknown short option -%c",a);
3434     found_short:
3435       if (!o->formarg) {
3436         o->fn(o,0);
3437       } else {
3438         if (!*++arg) {
3439           arg= *++(*argvp);
3440           if (!arg) badusage("option -%c needs a value for %s",
3441                              o->shrt, o->formarg);
3442         }
3443         o->fn(o,arg);
3444         break; /* eaten the whole argument now */
3445       }
3446     }
3447   }
3448 }
3449
3450 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3451
3452 static void print_options(const Option *options, FILE *f) {
3453   const Option *o;
3454   for (o=options; o->shrt || o->lng; o++) {
3455     char shrt[2] = { o->shrt, 0 };
3456     char *optspec= xasprintf("%s%s%s%s%s",
3457                              o->shrt ? "-" : "", shrt,
3458                              o->shrt && o->lng ? "|" : "",
3459                              DELIMPERHAPS("--", o->lng));
3460     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3461     free(optspec);
3462   }
3463 }
3464
3465 /*---------- specific option types ----------*/
3466
3467 static void op_integer(const Option *o, const char *val) {
3468   char *ep;
3469   errno= 0;
3470   unsigned long ul= strtoul(val,&ep,10);
3471   if (*ep || ep==val || errno || ul>INT_MAX)
3472     badusage("bad integer value for %s",o->lng);
3473   int *store= o->store;
3474   *store= ul;
3475 }
3476
3477 static void op_double(const Option *o, const char *val) {
3478   int *store= o->store;
3479   char *ep;
3480   errno= 0;
3481   *store= strtod(val, &ep);
3482   if (*ep || ep==val || errno)
3483     badusage("bad floating point value for %s",o->lng);
3484 }
3485
3486 static void op_string(const Option *o, const char *val) {
3487   const char **store= o->store;
3488   *store= val;
3489 }
3490
3491 static void op_seconds(const Option *o, const char *val) {
3492   int *store= o->store;
3493   char *ep;
3494   int unit;
3495
3496   double v= strtod(val,&ep);
3497   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3498
3499   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3500   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3501   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3502   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3503   else if (!strcmp(ep,"das")) unit= 10;
3504   else if (!strcmp(ep,"hs"))  unit= 100;
3505   else if (!strcmp(ep,"ks"))  unit= 1000;
3506   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3507   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3508
3509   v *= unit;
3510   v= ceil(v);
3511   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3512   *store= v;
3513 }
3514
3515 static void op_setint(const Option *o, const char *val) {
3516   int *store= o->store;
3517   *store= o->intval;
3518 }
3519
3520 /*---------- specific options ----------*/
3521
3522 static void help(const Option *o, const char *val);
3523
3524 static const Option innduct_options[]= {
3525 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3526 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3527 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3528 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3529 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3530 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3531 {'P',"port",             "PORT",  &port,                     op_integer     },
3532 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3533 {0,"help",               0,       0,                         help           },
3534
3535 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3536 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3537 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3538 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3539
3540 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3541 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3542 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3543
3544 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3545 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3546
3547 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3548 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3549 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3550 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3551 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3552 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
3553 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3554
3555 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3556 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3557
3558 {0,0}
3559 };
3560
3561 static void printusage(FILE *f) {
3562   fputs("usage: innduct [options] site [fqdn]\n"
3563         "available options are:\n", f);
3564   print_options(innduct_options, f);
3565 }
3566
3567 static void help(const Option *o, const char *val) {
3568   printusage(stdout);
3569   if (ferror(stdout) || fflush(stdout)) {
3570     perror("innduct: writing help");
3571     exit(12);
3572   }
3573   exit(0);
3574 }
3575
3576 static void convert_to_periods_rndup(int *store) {
3577   *store += period_seconds-1;
3578   *store /= period_seconds;
3579 }
3580
3581 int main(int argc, char **argv) {
3582   if (!argv[1]) {
3583     printusage(stderr);
3584     exit(8);
3585   }
3586
3587   parse_options(innduct_options, &argv);
3588
3589   /* arguments */
3590
3591   sitename= *argv++;
3592   if (!sitename) badusage("need site name argument");
3593   remote_host= *argv++;
3594   if (*argv) badusage("too many non-option arguments");
3595
3596   /* defaults */
3597
3598   int r= innconf_read(inndconffile);
3599   if (!r) badusage("could not read inn.conf (more info on stderr)");
3600
3601   if (!remote_host) remote_host= sitename;
3602
3603   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3604     badusage("nocheck threshold percentage must be between 0..100");
3605   nocheck_thresh *= 0.01;
3606
3607   if (nocheck_decay < 0.1)
3608     badusage("nocheck decay articles must be at least 0.1");
3609   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3610
3611   convert_to_periods_rndup(&reconnect_delay_periods);
3612   convert_to_periods_rndup(&flushfail_retry_periods);
3613   convert_to_periods_rndup(&backlog_retry_minperiods);
3614   convert_to_periods_rndup(&backlog_spontrescan_periods);
3615   convert_to_periods_rndup(&spontaneous_flush_periods);
3616   convert_to_periods_rndup(&max_separated_periods);
3617   convert_to_periods_rndup(&need_activity_periods);
3618
3619   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3620     badusage("bad input data ratio must be between 0..100");
3621   max_bad_data_ratio *= 0.01;
3622
3623   if (!feedfile) {
3624     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3625   } else if (!feedfile[0]) {
3626     badusage("feed filename must be nonempty");
3627   } else if (feedfile[strlen(feedfile)-1]=='/') {
3628     feedfile= xasprintf("%s%s",feedfile,sitename);
3629   }
3630
3631   const char *feedfile_forbidden= "?*[~#";
3632   int c;
3633   while ((c= *feedfile_forbidden++))
3634     if (strchr(feedfile, c))
3635       badusage("feed filename may not contain metacharacter %c",c);
3636
3637   /* set things up */
3638
3639   path_lock=        xasprintf("%s_lock",      feedfile);
3640   path_flushing=    xasprintf("%s_flushing",  feedfile);
3641   path_defer=       xasprintf("%s_defer",     feedfile);
3642   path_control=     xasprintf("%s_control",   feedfile);
3643   path_dump=        xasprintf("%s_dump",      feedfile);
3644   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3645
3646   oop_source_sys *sysloop= oop_sys_new();
3647   if (!sysloop) sysdie("could not create liboop event loop");
3648   loop= (oop_source*)sysloop;
3649
3650   LIST_INIT(conns);
3651
3652   if (become_daemon) {
3653     int i;
3654     for (i=3; i<255; i++)
3655       /* do this now before we open syslog, etc. */
3656       close(i);
3657     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3658
3659     int null= open("/dev/null",O_RDWR);
3660     if (null<0) sysfatal("failed to open /dev/null");
3661     dup2(null,0);
3662     dup2(null,1);
3663     dup2(null,2);
3664     xclose(null, "/dev/null original fd",0);
3665
3666     pid_t child1= xfork("daemonise first fork");
3667     if (child1) _exit(0);
3668
3669     pid_t sid= setsid();
3670     if (sid != child1) sysfatal("setsid failed");
3671
3672     pid_t child2= xfork("daemonise second fork");
3673     if (child2) _exit(0);
3674   }
3675
3676   self_pid= getpid();
3677   if (self_pid==-1) sysdie("getpid");
3678
3679   statemc_lock();
3680
3681   init_signals();
3682
3683   notice("starting");
3684
3685   if (!become_daemon)
3686     control_stdio();
3687
3688   control_init();
3689
3690   int filemon_ok= 0;
3691   if (!try_filemon) {
3692     notice("filemon: suppressed by command line option, polling");
3693   } else {
3694     filemon_ok= filemon_method_init();
3695     if (!filemon_ok)
3696       warn("filemon: no file monitoring available, polling");
3697   }
3698   if (!filemon_ok)
3699     every(filepoll_seconds,0,filepoll);
3700
3701   every(period_seconds,1,period);
3702
3703   statemc_init();
3704
3705   /* let's go */
3706
3707   void *run= oop_sys_run(sysloop);
3708   assert(run == OOP_ERROR);
3709   sysdie("event loop failed");
3710 }