chiark / gitweb /
Check for articles expiring, both in queue heads and reading backlog
[inn-innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
325
326 static void postfork(void);
327 static void period(void);
328
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
335
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
338
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
342
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
345
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
348
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
351
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
357 static int port=119;
358 static const char *inndconffile;
359
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
365
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
368
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
371
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
379
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382   /* in one corrupt 4096-byte block the number of newlines has
383    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
384
385
386 /*----- statistics -----*/
387
388 typedef enum {      /* in queue                 in conn->sent             */
389   art_Unchecked,    /*   not checked, not sent    checking                */
390   art_Wanted,       /*   checked, wanted          sent body as requested  */
391   art_Unsolicited,  /*   -                        sent body without check */
392   art_MaxState,
393 } ArtState;
394
395 static const char *const artstate_names[]=
396   { "Unchecked", "Wanted", "Unsolicited", 0 };
397
398 #define RESULT_COUNTS(RCS,RCN)                  \
399   RCS(sent)                                     \
400   RCS(accepted)                                 \
401   RCN(unwanted)                                 \
402   RCN(rejected)                                 \
403   RCN(deferred)                                 \
404   RCN(missing)                                  \
405   RCN(connretry)
406
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
409        counts[art_Unchecked] x                  \
410        + counts[art_Wanted] x                   \
411        + counts[art_Unsolicited] x,             \
412        counts[art_Unchecked] x                  \
413        , counts[art_Wanted] x                   \
414        , counts[art_Unsolicited] x
415
416 typedef enum {
417 #define RC_INDEX(x) RC_##x,
418   RESULT_COUNTS(RC_INDEX, RC_INDEX)
419   RCI_max
420 } ResultCountIndex;
421
422
423 /*----- transmission buffers -----*/
424
425 #define CONNIOVS 128
426
427 typedef enum {
428   xk_Const, xk_Artdata
429 } XmitKind;
430
431 struct XmitDetails {
432   XmitKind kind;
433   union {
434     ARTHANDLE *sm_art;
435   } info;
436 };
437
438
439 /*----- core operational data structure types -----*/
440
441 struct InputFile {
442   /* This is also an instance of struct oop_readable */
443   struct oop_readable readable; /* first */
444   oop_readable_call *readable_callback;
445   void *readable_callback_user;
446
447   int fd;
448   Filemon_Perfile *filemon;
449
450   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
451   off_t offset;
452   int skippinglong;
453
454   ArticleList queue;
455   long inprogress; /* includes queue.count and also articles in conns */
456
457   int counts[art_MaxState][RCI_max];
458   int readcount_ok, readcount_blank, readcount_err;
459   char path[];
460 };
461
462 struct Article {
463   ISNODE(Article);
464   ArtState state;
465   int midlen, missing;
466   InputFile *ipf;
467   TOKEN token;
468   off_t offset;
469   int blanklen;
470   char messageid[1];
471 };
472
473 #define SMS_LIST(X)                             \
474   X(NORMAL)                                     \
475   X(FLUSHING)                                   \
476   X(FLUSHFAILED)                                \
477   X(SEPARATED)                                  \
478   X(DROPPING)                                   \
479   X(DROPPED)
480
481 enum StateMachineState {
482 #define SMS_DEF_ENUM(s) sm_##s,
483   SMS_LIST(SMS_DEF_ENUM)
484 };
485
486 static const char *sms_names[]= {
487 #define SMS_DEF_NAME(s) #s ,
488   SMS_LIST(SMS_DEF_NAME)
489   0
490 };
491
492 struct Conn {
493   ISNODE(Conn);
494   int fd; /* may be 0, meaning closed (during construction/destruction) */
495   oop_read *rd; /* likewise */
496   int max_queue, stream, quitting;
497   int since_activity; /* periods */
498   ArticleList waiting; /* not yet told peer */
499   ArticleList priority; /* peer says send it now */
500   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
501   struct iovec xmit[CONNIOVS];
502   XmitDetails xmitd[CONNIOVS];
503   int xmitu;
504 };
505
506
507 /*----- general operational variables -----*/
508
509 /* main initialises */
510 static oop_source *loop;
511 static ConnList conns;
512 static char *path_lock, *path_flushing, *path_defer;
513 static char *path_control, *path_dump;
514 static char *globpat_backlog;
515 static pid_t self_pid;
516
517 /* statemc_init initialises */
518 static StateMachineState sms;
519 static int until_flush;
520 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
521 static FILE *defer;
522
523 /* initialisation to 0 is good */
524 static int until_connect, until_backlog_nextscan;
525 static double accept_proportion;
526 static int nocheck, nocheck_reported, in_child;
527
528 /* for simulation, debugging, etc. */
529 int simulate_flush= -1;
530
531 /*========== logging ==========*/
532
533 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
534 static void logcore(int sysloglevel, const char *fmt, ...) {
535   VA;
536   if (become_daemon) {
537     vsyslog(sysloglevel,fmt,al);
538   } else {
539     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
540     vfprintf(stderr,fmt,al);
541     putc('\n',stderr);
542   }
543   va_end(al);
544 }
545
546 static void logv(int sysloglevel, const char *pfx, int errnoval,
547                  const char *fmt, va_list al) PRINTF(5,0);
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549                  const char *fmt, va_list al) {
550   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
551   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
552   msgbuf[sizeof(msgbuf)-1]= 0;
553
554   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
555     sysloglevel= LOG_ERR; /* run by wrong user, probably */
556
557   logcore(sysloglevel, "<%s>%s: %s%s%s",
558          sitename, pfx, msgbuf,
559          errnoval>=0 ? ": " : "",
560          errnoval>=0 ? strerror(errnoval) : "");
561 }
562
563 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
564   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
565   static void fn(const char *fmt, ...) {                        \
566     preterminate();                                             \
567     VA;                                                         \
568     logv(sysloglevel, pfx, err, fmt, al);                       \
569     exit(estatus);                                              \
570   }
571
572 #define logwrap(fn, pfx, sysloglevel, err)              \
573   static void fn(const char *fmt, ...) PRINTF(1,2);     \
574   static void fn(const char *fmt, ...) {                \
575     VA;                                                 \
576     logv(sysloglevel, pfx, err, fmt, al);               \
577     va_end(al);                                         \
578   }
579
580 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
581 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
582
583 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
584 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
585
586 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
587 logwrap(warn,     " warning",  LOG_WARNING, -1);
588
589 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
590 logwrap(info,     " info",     LOG_INFO,    -1);
591 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
592
593
594 /*========== utility functions etc. ==========*/
595
596 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
597 static char *xvasprintf(const char *fmt, va_list al) {
598   char *str;
599   int rc= vasprintf(&str,fmt,al);
600   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
601   return str;
602 }
603 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
604 static char *xasprintf(const char *fmt, ...) {
605   VA;
606   char *str= xvasprintf(fmt,al);
607   va_end(al);
608   return str;
609 }
610
611 static int close_perhaps(int *fd) {
612   if (*fd <= 0) return 0;
613   int r= close(*fd);
614   *fd=0;
615   return r;
616 }
617 static void xclose(int fd, const char *what, const char *what2) {
618   int r= close(fd);
619   if (r) sysdie("close %s%s",what,what2?what2:"");
620 }
621 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
622   if (*fd <= 0) return;
623   xclose(*fd,what,what2);
624   *fd=0;
625 }
626
627 static pid_t xfork(const char *what) {
628   pid_t child;
629
630   child= fork();
631   if (child==-1) sysfatal("cannot fork for %s",what);
632   debug("forked %s %ld", what, (unsigned long)child);
633   if (!child) postfork();
634   return child;
635 }
636
637 static void on_fd_read_except(int fd, oop_call_fd callback) {
638   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
639   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
640 }
641 static void cancel_fd_read_except(int fd) {
642   loop->cancel_fd(loop, fd, OOP_READ);
643   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
644 }
645
646 static void report_child_status(const char *what, int status) {
647   if (WIFEXITED(status)) {
648     int es= WEXITSTATUS(status);
649     if (es)
650       warn("%s: child died with error exit status %d", what, es);
651   } else if (WIFSIGNALED(status)) {
652     int sig= WTERMSIG(status);
653     const char *sigstr= strsignal(sig);
654     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
655     if (sigstr)
656       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
657     else
658       warn("%s: child died due to unknown fatal signal %d%s",
659            what, sig, coredump);
660   } else {
661     warn("%s: child died with unknown wait status %d", what,status);
662   }
663 }
664
665 static int xwaitpid(pid_t *pid, const char *what) {
666   int status;
667
668   int r= kill(*pid, SIGKILL);
669   if (r) sysdie("cannot kill %s child", what);
670
671   pid_t got= waitpid(*pid, &status, 0);
672   if (got==-1) sysdie("cannot reap %s child", what);
673   if (got==0) die("cannot reap %s child", what);
674
675   *pid= 0;
676
677   return status;
678 }
679
680 static void *zxmalloc(size_t sz) {
681   void *p= xmalloc(sz);
682   memset(p,0,sz);
683   return p;
684 }
685
686 static void xunlink(const char *path, const char *what) {
687   int r= unlink(path);
688   if (r) sysdie("can't unlink %s %s", path, what);
689 }
690
691 static time_t xtime(void) {
692   time_t now= time(0);
693   if (now==-1) sysdie("time(2) failed");
694   return now;
695 }
696
697 static void xsigaction(int signo, const struct sigaction *sa) {
698   int r= sigaction(signo,sa,0);
699   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
700 }
701
702 static void xsigsetdefault(int signo) {
703   struct sigaction sa;
704   memset(&sa,0,sizeof(sa));
705   sa.sa_handler= SIG_DFL;
706   xsigaction(signo,&sa);
707 }
708
709 static void xgettimeofday(struct timeval *tv_r) {
710   int r= gettimeofday(tv_r,0);
711   if (r) sysdie("gettimeofday(2) failed");
712 }
713
714 static void xsetnonblock(int fd, int nonblocking) {
715   int errnoval= oop_fd_nonblock(fd, nonblocking);
716   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
717 }
718
719 static void check_isreg(const struct stat *stab, const char *path,
720                         const char *what) {
721   if (!S_ISREG(stab->st_mode))
722     die("%s %s not a plain file (mode 0%lo)",
723         what, path, (unsigned long)stab->st_mode);
724 }
725
726 static void xfstat(int fd, struct stat *stab_r, const char *what) {
727   int r= fstat(fd, stab_r);
728   if (r) sysdie("could not fstat %s", what);
729 }
730
731 static void xfstat_isreg(int fd, struct stat *stab_r,
732                          const char *path, const char *what) {
733   xfstat(fd, stab_r, what);
734   check_isreg(stab_r, path, what);
735 }
736
737 static void xlstat_isreg(const char *path, struct stat *stab,
738                          int *enoent_r /* 0 means ENOENT is fatal */,
739                          const char *what) {
740   int r= lstat(path, stab);
741   if (r) {
742     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
743     sysdie("could not lstat %s %s", what, path);
744   }
745   if (enoent_r) *enoent_r= 0;
746   check_isreg(stab, path, what);
747 }
748
749 static int samefile(const struct stat *a, const struct stat *b) {
750   assert(S_ISREG(a->st_mode));
751   assert(S_ISREG(b->st_mode));
752   return (a->st_ino == b->st_ino &&
753           a->st_dev == b->st_dev);
754 }
755
756 static char *sanitise(const char *input, int len) {
757   static char sanibuf[100]; /* returns pointer to this buffer! */
758
759   const char *p= input;
760   const char *endp= len>=0 ? input+len : 0;
761   char *q= sanibuf;
762   *q++= '`';
763   for (;;) {
764     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
765     int c= (!endp || p<endp) ? *p++ : 0;
766     if (!c) { *q++= '\''; *q=0; break; }
767     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
768     sprintf(q,"\\x%02x",c);
769     q += 4;
770   }
771   return sanibuf;
772 }
773
774 static int isewouldblock(int errnoval) {
775   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
776 }
777
778 /*========== command and control connections ==========*/
779
780 static int control_master;
781
782 typedef struct ControlConn ControlConn;
783 struct ControlConn {
784   void (*destroy)(ControlConn*);
785   int fd;
786   oop_read *rd;
787   FILE *out;
788   union {
789     struct sockaddr sa;
790     struct sockaddr_un un;
791   } sa;
792   socklen_t salen;
793 };
794
795 static const oop_rd_style control_rd_style= {
796   OOP_RD_DELIM_STRIP, '\n',
797   OOP_RD_NUL_FORBID,
798   OOP_RD_SHORTREC_FORBID
799 };
800
801 static void control_destroy(ControlConn *cc) {
802   cc->destroy(cc);
803 }
804
805 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
806   if (ferror(cc->out) | fflush(cc->out)) {
807     info("CTRL%d write error %s", cc->fd, strerror(errno));
808     control_destroy(cc);
809   }
810 }
811
812 static void control_prompt(ControlConn *cc /* may destroy*/) {
813   fprintf(cc->out, "%s| ", sitename);
814   control_checkouterr(cc);
815 }
816
817 struct ControlCommand {
818   const char *cmd;
819   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
820             const char *arg, size_t argsz);
821   void *xdata;
822   int xval;
823 };
824
825 static const ControlCommand control_commands[];
826
827 #define CCMD(wh)                                                        \
828   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
829                         const char *arg, size_t argsz)
830
831 CCMD(help) {
832   fputs("commands:\n", cc->out);
833   const ControlCommand *ccmd;
834   for (ccmd=control_commands; ccmd->cmd; ccmd++)
835     fprintf(cc->out, " %s\n", ccmd->cmd);
836   fputs("NB: permissible arguments are not shown above."
837         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
838 }
839
840 CCMD(flush) {
841   int ok= trigger_flush_ok();
842   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
843 }
844
845 CCMD(stop) {
846   preterminate();
847   notice("terminating (CTRL%d)",cc->fd);
848   raise_default(SIGTERM);
849   abort();
850 }
851
852 CCMD(dump);
853
854 /* messing with our head: */
855 CCMD(period) { period(); }
856 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
857 CCMD(setint) { *(int*)c->xdata= c->xval; }
858 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
859
860 static const ControlCommand control_commands[]= {
861   { "h",             ccmd_help      },
862   { "flush",         ccmd_flush     },
863   { "stop",          ccmd_stop      },
864   { "dump q",        ccmd_dump, 0,0 },
865   { "dump a",        ccmd_dump, 0,1 },
866
867   { "p",             ccmd_period    },
868
869 #define POKES(cmd,func)                                                 \
870   { cmd "flush",     func,           &until_flush,             1 },     \
871   { cmd "conn",      func,           &until_connect,           0 },     \
872   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
873 POKES("next ", ccmd_setint)
874 POKES("prod ", ccmd_setint_period)
875
876   { "pretend flush", ccmd_setintarg, &simulate_flush             },
877   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
878   { 0 }
879 };
880
881 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
882                            const char *errmsg, int errnoval,
883                            const char *data, size_t recsz, void *cc_v) {
884   ControlConn *cc= cc_v;
885
886   if (!data) {
887     info("CTRL%d closed", cc->fd);
888     cc->destroy(cc);
889     return OOP_CONTINUE;
890   }
891
892   if (recsz == 0) goto prompt;
893
894   const ControlCommand *ccmd;
895   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
896     int l= strlen(ccmd->cmd);
897     if (recsz < l) continue;
898     if (recsz > l && data[l] != ' ') continue;
899     if (memcmp(data, ccmd->cmd, l)) continue;
900
901     int argl= (int)recsz - (l+1); 
902     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
903     goto prompt;
904   }
905
906   fputs("unknown command; h for help\n", cc->out);
907
908  prompt:
909   control_prompt(cc);
910   return OOP_CONTINUE;
911 }
912
913 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
914                             const char *errmsg, int errnoval,
915                             const char *data, size_t recsz, void *cc_v) {
916   ControlConn *cc= cc_v;
917   
918   info("CTRL%d read error %s", cc->fd, errmsg);
919   cc->destroy(cc);
920   return OOP_CONTINUE;
921 }
922
923 static int control_conn_startup(ControlConn *cc /* may destroy*/,
924                                 const char *how) {
925   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
926   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
927
928   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
929                       control_rd_ok, cc,
930                       control_rd_err, cc);
931   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
932
933   info("CTRL%d %s ready", cc->fd, how);
934   control_prompt(cc);
935   return 0;
936 }
937
938 static void control_stdio_destroy(ControlConn *cc) {
939   if (cc->rd) {
940     oop_rd_cancel(cc->rd);
941     errno= oop_rd_delete_tidy(cc->rd);
942     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
943   }
944   free(cc);
945 }
946
947 static void control_stdio(void) {
948   NEW_DECL(ControlConn *,cc);
949   cc->destroy= control_stdio_destroy;
950
951   cc->fd= 0;
952   cc->out= stdout;
953   int r= control_conn_startup(cc,"stdio");
954   if (r) cc->destroy(cc);
955 }
956
957 static void control_accepted_destroy(ControlConn *cc) {
958   if (cc->rd) {
959     oop_rd_cancel(cc->rd);
960     oop_rd_delete_kill(cc->rd);
961   }
962   if (cc->out) { fclose(cc->out); cc->fd=0; }
963   close_perhaps(&cc->fd);
964   free(cc);
965 }
966
967 static void *control_master_readable(oop_source *lp, int master,
968                                      oop_event ev, void *u) {
969   NEW_DECL(ControlConn *,cc);
970   cc->destroy= control_accepted_destroy;
971
972   cc->salen= sizeof(cc->sa);
973   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
974   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
975
976   cc->out= fdopen(cc->fd, "w");
977   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
978
979   int r= control_conn_startup(cc, "accepted");
980   if (r) goto x;
981
982   return OOP_CONTINUE;
983
984  x:
985   cc->destroy(cc);
986   return OOP_CONTINUE;
987 }
988
989 #define NOCONTROL(...) do{                                              \
990     syswarn("no control socket, because failed to " __VA_ARGS__);       \
991     goto nocontrol;                                                     \
992   }while(0)
993
994 static void control_init(void) {
995   char *real=0;
996   
997   union {
998     struct sockaddr sa;
999     struct sockaddr_un un;
1000   } sa;
1001
1002   memset(&sa,0,sizeof(sa));
1003   int maxlen= sizeof(sa.un.sun_path);
1004
1005   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1006   if (reallen<0) {
1007     if (errno != ENOENT)
1008       NOCONTROL("readlink control socket symlink path %s", path_control);
1009   }
1010   if (reallen >= maxlen) {
1011     debug("control socket symlink path too long (r=%d)",reallen);
1012     xunlink(path_control, "old (overlong) control socket symlink");
1013     reallen= -1;
1014   }
1015   
1016   if (reallen<0) {
1017     struct stat stab;
1018     int r= lstat(realsockdir,&stab);
1019     if (r) {
1020       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1021
1022       r= mkdir(realsockdir, 0700);
1023       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1024
1025     } else {
1026       uid_t self= geteuid();
1027       if (!S_ISDIR(stab.st_mode) ||
1028           stab.st_uid != self ||
1029           stab.st_mode & 0007) {
1030         warn("no control socket, because real socket directory"
1031              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1032              !!S_ISDIR(stab.st_mode),
1033              (unsigned long)stab.st_uid, (unsigned long)self,
1034              (unsigned long)stab.st_mode & 0777UL);
1035         goto nocontrol;
1036       }
1037     }
1038
1039     real= xasprintf("%s/s%lx.%lx", realsockdir,
1040                     (unsigned long)xtime(), (unsigned long)self_pid);
1041     int reallen= strlen(real);
1042
1043     if (reallen >= maxlen) {
1044       warn("no control socket, because tmpnam gave overly-long path"
1045            " %s", real);
1046       goto nocontrol;
1047     }
1048     r= symlink(real, path_control);
1049     if (r) NOCONTROL("make control socket path %s a symlink to real"
1050                      " socket path %s", path_control, real);
1051     memcpy(sa.un.sun_path, real, reallen);
1052   }
1053
1054   int r= unlink(sa.un.sun_path);
1055   if (r && errno!=ENOENT)
1056     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1057
1058   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1059   if (control_master<0) NOCONTROL("create new control socket");
1060
1061   sa.un.sun_family= AF_UNIX;
1062   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1063   r= bind(control_master, &sa.sa, sl);
1064   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1065
1066   r= listen(control_master, 5);
1067   if (r) NOCONTROL("listen");
1068
1069   xsetnonblock(control_master, 1);
1070
1071   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1072   info("control socket ok, real path %s", sa.un.sun_path);
1073
1074   return;
1075
1076  nocontrol:
1077   free(real);
1078   xclose_perhaps(&control_master, "control master",0);
1079   return;
1080 }
1081
1082 /*========== management of connections ==========*/
1083
1084 static void conn_closefd(Conn *conn, const char *msgprefix) {
1085   int r= close_perhaps(&conn->fd);
1086   if (r) info("C%d %serror closing socket: %s",
1087               conn->fd, msgprefix, strerror(errno));
1088 }
1089
1090 static void conn_dispose(Conn *conn) {
1091   if (!conn) return;
1092   if (conn->rd) {
1093     oop_rd_cancel(conn->rd);
1094     oop_rd_delete_kill(conn->rd);
1095     conn->rd= 0;
1096   }
1097   if (conn->fd) {
1098     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1099     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1100   }
1101   conn_closefd(conn,"");
1102   free(conn);
1103   until_connect= reconnect_delay_periods;
1104 }
1105
1106 static void *conn_exception(oop_source *lp, int fd,
1107                             oop_event ev, void *conn_v) {
1108   Conn *conn= conn_v;
1109   unsigned char ch;
1110   assert(fd == conn->fd);
1111   assert(ev == OOP_EXCEPTION);
1112   int r= read(conn->fd, &ch, 1);
1113   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1114   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1115                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1116   return OOP_CONTINUE;
1117 }  
1118
1119 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1120   int requeue[art_MaxState];
1121   memset(requeue,0,sizeof(requeue));
1122
1123   Article *art;
1124   
1125   while ((art= LIST_REMHEAD(conn->priority)))
1126     LIST_ADDTAIL(art->ipf->queue, art);
1127
1128   while ((art= LIST_REMHEAD(conn->waiting)))
1129     LIST_ADDTAIL(art->ipf->queue, art);
1130
1131   while ((art= LIST_REMHEAD(conn->sent))) {
1132     requeue[art->state]++;
1133     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1134     LIST_ADDTAIL(art->ipf->queue,art);
1135   }
1136
1137   int i;
1138   XmitDetails *d;
1139   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1140     xmit_free(d);
1141
1142   char *m= xvasprintf(fmt,al);
1143   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1144        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1145   free(m);
1146
1147   LIST_REMOVE(conns,conn);
1148   conn_dispose(conn);
1149   check_assign_articles();
1150 }
1151
1152 static void connfail(Conn *conn, const char *fmt, ...) {
1153   va_list al;
1154   va_start(al,fmt);
1155   vconnfail(conn,fmt,al);
1156   va_end(al);
1157 }
1158
1159 static void check_idle_conns(void) {
1160   Conn *conn;
1161   FOR_CONN(conn)
1162     conn->since_activity++;
1163  search_again:
1164   FOR_CONN(conn) {
1165     if (conn->since_activity <= need_activity_periods) continue;
1166
1167     /* We need to shut this down */
1168     if (conn->quitting)
1169       connfail(conn,"timed out waiting for response to QUIT");
1170     else if (conn->sent.count)
1171       connfail(conn,"timed out waiting for responses");
1172     else if (conn->waiting.count || conn->priority.count)
1173       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1174     else if (conn->xmitu)
1175       connfail(conn,"peer has been sending responses"
1176                " before receiving our commands!");
1177     else {
1178       static const char quitcmd[]= "QUIT\r\n";
1179       int todo= sizeof(quitcmd)-1;
1180       const char *p= quitcmd;
1181       for (;;) {
1182         int r= write(conn->fd, p, todo);
1183         if (r<0) {
1184           if (isewouldblock(errno))
1185             connfail(conn, "blocked writing QUIT to idle connection");
1186           else
1187             connfail(conn, "failed to write QUIT to idle connection: %s",
1188                      strerror(errno));
1189           break;
1190         }
1191         assert(r<=todo);
1192         todo -= r;
1193         if (!todo) {
1194           conn->quitting= 1;
1195           conn->since_activity= 0;
1196           debug("C%d is idle, quitting", conn->fd);
1197           break;
1198         }
1199       }
1200     }
1201     goto search_again;
1202   }
1203 }  
1204
1205 /*---------- making new connections ----------*/
1206
1207 static pid_t connecting_child;
1208 static int connecting_fdpass_sock;
1209
1210 static void connect_attempt_discard(void) {
1211   if (connecting_child) {
1212     int status= xwaitpid(&connecting_child, "connect");
1213     if (!(WIFEXITED(status) ||
1214           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1215       report_child_status("connect", status);
1216   }
1217   if (connecting_fdpass_sock) {
1218     cancel_fd_read_except(connecting_fdpass_sock);
1219     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1220   }
1221 }
1222
1223 #define PREP_DECL_MSG_CMSG(msg)                 \
1224   char msgbyte= 0;                              \
1225   struct iovec msgiov;                          \
1226   msgiov.iov_base= &msgbyte;                    \
1227   msgiov.iov_len= 1;                            \
1228   struct msghdr msg;                            \
1229   memset(&msg,0,sizeof(msg));                   \
1230   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1231   msg.msg_iov= &msgiov;                         \
1232   msg.msg_iovlen= 1;                            \
1233   msg.msg_control= msg##cbuf;                   \
1234   msg.msg_controllen= sizeof(msg##cbuf);
1235
1236 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1237   Conn *conn= 0;
1238
1239   assert(fd == connecting_fdpass_sock);
1240
1241   PREP_DECL_MSG_CMSG(msg);
1242   
1243   ssize_t rs= recvmsg(fd, &msg, 0);
1244   if (rs<0) {
1245     if (isewouldblock(errno)) return OOP_CONTINUE;
1246     syswarn("failed to read socket from connecting child");
1247     goto x;
1248   }
1249
1250   NEW(conn);
1251   LIST_INIT(conn->waiting);
1252   LIST_INIT(conn->priority);
1253   LIST_INIT(conn->sent);
1254
1255   struct cmsghdr *h= 0;
1256   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1257   if (!h) {
1258     int status= xwaitpid(&connecting_child, "connect child (broken)");
1259
1260     if (WIFEXITED(status)) {
1261       if (WEXITSTATUS(status) != 0 &&
1262           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1263           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1264         /* child already reported the problem */;
1265       else {
1266         if (e == OOP_EXCEPTION)
1267           warn("connect: connection child exited code %d but"
1268                " unexpected exception on fdpass socket",
1269                WEXITSTATUS(status));
1270         else
1271           warn("connect: connection child exited code %d but"
1272                " no cmsg (rs=%d)",
1273                WEXITSTATUS(status), (int)rs);
1274       }
1275     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1276       warn("connect: connection attempt timed out");
1277     } else {
1278       report_child_status("connect", status);
1279     }
1280     goto x;
1281   }
1282
1283 #define CHK(field, val)                                                  \
1284   if (h->cmsg_##field != val) {                                          \
1285     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1286         h->cmsg_##field, val);                                           \
1287     goto x;                                                              \
1288   }
1289   CHK(level, SOL_SOCKET);
1290   CHK(type,  SCM_RIGHTS);
1291   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1292 #undef CHK
1293
1294   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1295
1296   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1297
1298   int status;
1299   pid_t got= waitpid(connecting_child, &status, 0);
1300   if (got==-1) sysdie("connect: real wait for child");
1301   assert(got == connecting_child);
1302   connecting_child= 0;
1303
1304   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1305   int es= WEXITSTATUS(status);
1306   switch (es) {
1307   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1308   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1309   default:
1310     fatal("connect: child gave unexpected exit status %d", es);
1311   }
1312
1313   /* Phew! */
1314   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1315
1316   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1317   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1318   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1319   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1320                      &peer_rd_ok, conn,
1321                      &peer_rd_err, conn);
1322   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1323
1324   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1325   LIST_ADDHEAD(conns, conn);
1326
1327   connect_attempt_discard();
1328   check_assign_articles();
1329   return OOP_CONTINUE;
1330
1331  x:
1332   conn_dispose(conn);
1333   connect_attempt_discard();
1334   return OOP_CONTINUE;
1335 }
1336
1337 static int allow_connect_start(void) {
1338   return conns.count < max_connections
1339     && !connecting_child
1340     && !until_connect;
1341 }
1342
1343 static void connect_start(void) {
1344   assert(!connecting_child);
1345   assert(!connecting_fdpass_sock);
1346
1347   info("starting connection attempt");
1348
1349   int socks[2];
1350   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1351   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1352
1353   connecting_child= xfork("connection");
1354
1355   if (!connecting_child) {
1356     FILE *cn_from, *cn_to;
1357     char buf[NNTP_STRLEN+100];
1358     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1359
1360     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1361
1362     alarm(connection_setup_timeout);
1363     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1364       int l= strlen(buf);
1365       int stripped=0;
1366       while (l>0) {
1367         unsigned char c= buf[l-1];
1368         if (!isspace(c)) break;
1369         if (c=='\n' || c=='\r') stripped=1;
1370         --l;
1371       }
1372       if (!buf[0]) {
1373         sysfatal("connect: connection attempt failed");
1374       } else {
1375         buf[l]= 0;
1376         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1377               sanitise(buf,-1));
1378       }
1379     }
1380     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1381       sysfatal("connect: authentication failed");
1382     if (try_stream) {
1383       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1384           fflush(cn_to))
1385         sysfatal("connect: could not send MODE STREAM");
1386       buf[sizeof(buf)-1]= 0;
1387       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1388         if (ferror(cn_from))
1389           sysfatal("connect: could not read response to MODE STREAM");
1390         else
1391           fatal("connect: connection close in response to MODE STREAM");
1392       }
1393       int l= strlen(buf);
1394       assert(l>=1);
1395       if (buf[l-1]!='\n')
1396         fatal("connect: response to MODE STREAM is too long: %.100s...",
1397               sanitise(buf,-1));
1398       l--;  if (l>0 && buf[l-1]=='\r') l--;
1399       buf[l]= 0;
1400       char *ep;
1401       int rcode= strtoul(buf,&ep,10);
1402       if (ep != &buf[3])
1403         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1404
1405       switch (rcode) {
1406       case 203:
1407         exitstatus= CONNCHILD_ESTATUS_STREAM;
1408         break;
1409       case 480:
1410       case 500:
1411         break;
1412       default:
1413         warn("connect: unexpected response to MODE STREAM: %.50s",
1414              sanitise(buf,-1));
1415         exitstatus= 2;
1416         break;
1417       }
1418     }
1419     int fd= fileno(cn_from);
1420
1421     PREP_DECL_MSG_CMSG(msg);
1422     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1423     cmsg->cmsg_level= SOL_SOCKET;
1424     cmsg->cmsg_type=  SCM_RIGHTS;
1425     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1426     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1427
1428     msg.msg_controllen= cmsg->cmsg_len;
1429     r= sendmsg(socks[1], &msg, 0);
1430     if (r<0) sysdie("sendmsg failed for new connection");
1431     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1432
1433     _exit(exitstatus);
1434   }
1435
1436   xclose(socks[1], "connecting fdpass child's socket",0);
1437   connecting_fdpass_sock= socks[0];
1438   xsetnonblock(connecting_fdpass_sock, 1);
1439   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1440 }
1441
1442 /*---------- assigning articles to conns, and transmitting ----------*/
1443
1444 static Article *dequeue_from(int peek, InputFile *ipf) {
1445   if (!ipf) return 0;
1446   if (peek) return LIST_HEAD(ipf->queue);
1447   else return LIST_REMHEAD(ipf->queue);
1448 }
1449
1450 static Article *dequeue(int peek) {
1451   Article *art;
1452   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1453   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1454   art= dequeue_from(peek, main_input_file);      if (art) return art;
1455   return 0;
1456 }
1457
1458 static void check_assign_articles(void) {
1459   for (;;) {
1460     if (!dequeue(1))
1461       break;
1462
1463     Conn *walk, *use=0;
1464     int spare=0, inqueue=0;
1465
1466     /* Find a connection to offer this article.  We prefer a busy
1467      * connection to an idle one, provided it's not full.  We take the
1468      * first (oldest) and since that's stable, it will mean we fill up
1469      * connections in order.  That way if we have too many
1470      * connections, the spare ones will go away eventually.
1471      */
1472     FOR_CONN(walk) {
1473       if (walk->quitting) continue;
1474       inqueue= walk->sent.count + walk->priority.count
1475              + walk->waiting.count;
1476       spare= walk->max_queue - inqueue;
1477       assert(inqueue <= max_queue_per_conn);
1478       assert(spare >= 0);
1479       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1480       else if (spare>0) /*working*/ { use= walk; break; }
1481     }
1482     if (use) {
1483       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1484       while (spare>0) {
1485         Article *art= dequeue(0);
1486         if (!art) break;
1487         LIST_ADDTAIL(use->waiting, art);
1488         spare--;
1489       }
1490       conn_maybe_write(use);
1491     } else if (allow_connect_start()) {
1492       until_connect= reconnect_delay_periods;
1493       connect_start();
1494       break;
1495     } else {
1496       break;
1497     }
1498   }
1499 }
1500
1501 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1502   conn_maybe_write(u);
1503   return OOP_CONTINUE;
1504 }
1505
1506 static void conn_maybe_write(Conn *conn)  {
1507   for (;;) {
1508     conn_make_some_xmits(conn);
1509     if (!conn->xmitu) {
1510       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1511       return;
1512     }
1513
1514     void *rp= conn_write_some_xmits(conn);
1515     if (rp==OOP_CONTINUE) {
1516       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1517       return;
1518     } else if (rp==OOP_HALT) {
1519       return;
1520     } else if (!rp) {
1521       /* transmitted everything */
1522     } else {
1523       abort();
1524     }
1525   }
1526 }
1527
1528 static int article_check_expired(Article *art /* must be queued, not conn */) {
1529   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1530   if (artdata) { SMfreearticle(artdata); return 0; }
1531
1532   LIST_REMOVE(art->ipf->queue, art);
1533   art->missing= 1;
1534   art->ipf->counts[art_Unchecked][RC_missing]++;
1535   article_done(art,-1);
1536   return 1;
1537 }
1538
1539 static void inputfile_queue_check_expired(InputFile *ipf) {
1540   if (!ipf) return;
1541
1542   for (;;) {
1543     Article *art= LIST_HEAD(ipf->queue);
1544     int exp= article_check_expired(art);
1545     if (!exp) break;
1546   }
1547 }
1548
1549 /*========== article transmission ==========*/
1550
1551 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1552                   XmitKind kind) { /* caller must then fill in details */
1553   struct iovec *v= &conn->xmit[conn->xmitu];
1554   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1555   v->iov_base= (char*)data;
1556   v->iov_len= len;
1557   d->kind= kind;
1558   return d;
1559 }
1560
1561 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1562   xmit_core(conn,data,len, xk_Const);
1563 }
1564 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1565
1566 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1567   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1568   d->info.sm_art= ah;
1569 }
1570
1571 static void xmit_free(XmitDetails *d) {
1572   switch (d->kind) {
1573   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1574   case xk_Const:                                  break;
1575   default: abort();
1576   }
1577 }
1578
1579 static void *conn_write_some_xmits(Conn *conn) {
1580   /* return values:
1581    *      0:            nothing more to write, no need to call us again
1582    *      OOP_CONTINUE: more to write but fd not writeable
1583    *      OOP_HALT:     disaster, have destroyed conn
1584    */
1585   for (;;) {
1586     int count= conn->xmitu;
1587     if (!count) return 0;
1588
1589     if (count > IOV_MAX) count= IOV_MAX;
1590     ssize_t rs= writev(conn->fd, conn->xmit, count);
1591     if (rs < 0) {
1592       if (isewouldblock(errno)) return OOP_CONTINUE;
1593       connfail(conn, "write failed: %s", strerror(errno));
1594       return OOP_HALT;
1595     }
1596     assert(rs > 0);
1597
1598     int done;
1599     for (done=0; rs && done<conn->xmitu; done++) {
1600       struct iovec *vp= &conn->xmit[done];
1601       XmitDetails *dp= &conn->xmitd[done];
1602       if (rs > vp->iov_len) {
1603         rs -= vp->iov_len;
1604         xmit_free(dp);
1605       } else {
1606         vp->iov_base= (char*)vp->iov_base + rs;
1607         vp->iov_len -= rs;
1608       }
1609     }
1610     int newu= conn->xmitu - done;
1611     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1612     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1613     conn->xmitu= newu;
1614   }
1615 }
1616
1617 static void conn_make_some_xmits(Conn *conn) {
1618   for (;;) {
1619     if (conn->xmitu+5 > CONNIOVS)
1620       break;
1621
1622     Article *art= LIST_REMHEAD(conn->priority);
1623     if (!art) art= LIST_REMHEAD(conn->waiting);
1624     if (!art) break;
1625
1626     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1627       /* actually send it */
1628
1629       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1630
1631       art->state=
1632         art->state == art_Unchecked ? art_Unsolicited :
1633         art->state == art_Wanted    ? art_Wanted      :
1634         (abort(),-1);
1635
1636       if (!artdata) art->missing= 1;
1637       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1638
1639       if (conn->stream) {
1640         if (artdata) {
1641           XMIT_LITERAL("TAKETHIS ");
1642           xmit_noalloc(conn, art->messageid, art->midlen);
1643           XMIT_LITERAL("\r\n");
1644           xmit_artbody(conn, artdata);
1645         } else {
1646           article_done(art, -1);
1647           continue;
1648         }
1649       } else {
1650         /* we got 235 from IHAVE */
1651         if (artdata) {
1652           xmit_artbody(conn, artdata);
1653         } else {
1654           XMIT_LITERAL(".\r\n");
1655         }
1656       }
1657
1658       LIST_ADDTAIL(conn->sent, art);
1659
1660     } else {
1661       /* check it */
1662
1663       if (conn->stream)
1664         XMIT_LITERAL("CHECK ");
1665       else
1666         XMIT_LITERAL("IHAVE ");
1667       xmit_noalloc(conn, art->messageid, art->midlen);
1668       XMIT_LITERAL("\r\n");
1669
1670       assert(art->state == art_Unchecked);
1671       art->ipf->counts[art->state][RC_sent]++;
1672       LIST_ADDTAIL(conn->sent, art);
1673     }
1674   }
1675 }
1676
1677
1678 /*========== handling responses from peer ==========*/
1679
1680 static const oop_rd_style peer_rd_style= {
1681   OOP_RD_DELIM_STRIP, '\n',
1682   OOP_RD_NUL_FORBID,
1683   OOP_RD_SHORTREC_FORBID
1684 };
1685
1686 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1687                          const char *errmsg, int errnoval,
1688                          const char *data, size_t recsz, void *conn_v) {
1689   Conn *conn= conn_v;
1690   connfail(conn, "error receiving from peer: %s", errmsg);
1691   return OOP_CONTINUE;
1692 }
1693
1694 static Article *article_reply_check(Conn *conn, const char *response,
1695                                     int code_indicates_streaming,
1696                                     int must_have_sent
1697                                         /* 1:yes, -1:no, 0:dontcare */,
1698                                     const char *sanitised_response) {
1699   Article *art= LIST_HEAD(conn->sent);
1700
1701   if (!art) {
1702     connfail(conn,
1703              "peer gave unexpected response when no commands outstanding: %s",
1704              sanitised_response);
1705     return 0;
1706   }
1707
1708   if (code_indicates_streaming) {
1709     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1710     if (!conn->stream) {
1711       connfail(conn, "peer gave streaming response code "
1712                " to IHAVE or subsequent body: %s", sanitised_response);
1713       return 0;
1714     }
1715     const char *got_mid= response+4;
1716     int got_midlen= strcspn(got_mid, " \n\r");
1717     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1718       connfail(conn, "peer gave streaming response with syntactically invalid"
1719                " messageid: %s", sanitised_response);
1720       return 0;
1721     }
1722     if (got_midlen != art->midlen ||
1723         memcmp(got_mid, art->messageid, got_midlen)) {
1724       connfail(conn, "peer gave streaming response code to wrong article -"
1725                " probable synchronisation problem; we offered: %s;"
1726                " peer said: %s",
1727                art->messageid, sanitised_response);
1728       return 0;
1729     }
1730   } else {
1731     if (conn->stream) {
1732       connfail(conn, "peer gave non-streaming response code to"
1733                " CHECK/TAKETHIS: %s", sanitised_response);
1734       return 0;
1735     }
1736   }
1737
1738   if (must_have_sent>0 && art->state < art_Wanted) {
1739     connfail(conn, "peer says article accepted but"
1740              " we had not sent the body: %s", sanitised_response);
1741     return 0;
1742   }
1743   if (must_have_sent<0 && art->state >= art_Wanted) {
1744     connfail(conn, "peer says please sent the article but we just did: %s",
1745              sanitised_response);
1746     return 0;
1747   }
1748
1749   Article *art_again= LIST_REMHEAD(conn->sent);
1750   assert(art_again == art);
1751   return art;
1752 }
1753
1754 static void update_nocheck(int accepted) {
1755   accept_proportion *= nocheck_decay;
1756   accept_proportion += accepted * (1.0 - nocheck_decay);
1757   int new_nocheck= accept_proportion >= nocheck_thresh;
1758   if (new_nocheck && !nocheck_reported) {
1759     notice("entering nocheck mode for the first time");
1760     nocheck_reported= 1;
1761   } else if (new_nocheck != nocheck) {
1762     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1763   }
1764   nocheck= new_nocheck;
1765 }
1766
1767 static void article_done(Article *art, int whichcount) {
1768   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1769
1770   if (whichcount == RC_accepted) update_nocheck(1);
1771   else if (whichcount == RC_unwanted) update_nocheck(0);
1772
1773   InputFile *ipf= art->ipf;
1774
1775   while (art->blanklen) {
1776     static const char spaces[]=
1777       "                                                                "
1778       "                                                                "
1779       "                                                                "
1780       "                                                                "
1781       "                                                                "
1782       "                                                                "
1783       "                                                                "
1784       "                                                                "
1785       "                                                                ";
1786     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1787     int r= pwrite(ipf->fd, spaces, w, art->offset);
1788     if (r==-1) {
1789       if (errno==EINTR) continue;
1790       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1791              art->messageid, art->blanklen,
1792              (unsigned long)art->offset, ipf->path);
1793     }
1794     assert(r>=0 && r<=w);
1795     art->blanklen -= w;
1796     art->offset += w;
1797   }
1798
1799   ipf->inprogress--;
1800   assert(ipf->inprogress >= 0);
1801   free(art);
1802
1803   if (!ipf->inprogress && ipf != main_input_file)
1804     queue_check_input_done();
1805 }
1806
1807 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1808                         const char *errmsg, int errnoval,
1809                         const char *data, size_t recsz, void *conn_v) {
1810   Conn *conn= conn_v;
1811
1812   if (ev == OOP_RD_EOF) {
1813     connfail(conn, "unexpected EOF from peer");
1814     return OOP_CONTINUE;
1815   }
1816   assert(ev == OOP_RD_OK);
1817
1818   char *sani= sanitise(data,-1);
1819
1820   char *ep;
1821   unsigned long code= strtoul(data, &ep, 10);
1822   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1823     connfail(conn, "badly formatted response from peer: %s", sani);
1824     return OOP_CONTINUE;
1825   }
1826
1827   int conn_busy=
1828     conn->waiting.count ||
1829     conn->priority.count ||
1830     conn->sent.count ||
1831     conn->xmitu;
1832
1833   if (conn->quitting) {
1834     if (code!=205 && code!=503) {
1835       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1836     } else {
1837       notice("C%d idle connection closed by us", conn->fd);
1838       assert(!conn_busy);
1839       LIST_REMOVE(conns,conn);
1840       conn_dispose(conn);
1841     }
1842     return OOP_CONTINUE;
1843   }
1844
1845   conn->since_activity= 0;
1846   Article *art;
1847
1848 #define GET_ARTICLE(musthavesent) do{                                         \
1849     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1850     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1851   }while(0) 
1852
1853 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1854     code_streaming= (streaming);                                \
1855     GET_ARTICLE(musthavesent);                                  \
1856     article_done(art, RC_##how);                                \
1857     goto dealtwith;                                             \
1858   }while(0)
1859
1860 #define PEERBADMSG(m) do {                                      \
1861     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1862   }while(0)
1863
1864   int code_streaming= 0;
1865
1866   switch (code) {
1867
1868   case 400: PEERBADMSG("peer stopped accepting articles");
1869   default:  PEERBADMSG("peer sent unexpected message");
1870
1871   case 503:
1872     if (conn_busy) PEERBADMSG("peer timed us out");
1873     notice("C%d idle connection closed by peer", conn->fd);
1874     LIST_REMOVE(conns,conn);
1875     conn_dispose(conn);
1876     return OOP_CONTINUE;
1877
1878   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1879   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1880
1881   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1882   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1883
1884   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1885   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1886
1887   case 238: /* CHECK says send it */
1888     code_streaming= 1;
1889   case 335: /* IHAVE says send it */
1890     GET_ARTICLE(-1);
1891     assert(art->state == art_Unchecked);
1892     art->ipf->counts[art->state][RC_accepted]++;
1893     art->state= art_Wanted;
1894     LIST_ADDTAIL(conn->priority, art);
1895     break;
1896
1897   case 431: /* CHECK or TAKETHIS says try later */
1898     code_streaming= 1;
1899   case 436: /* IHAVE says try later */
1900     GET_ARTICLE(0);
1901     open_defer();
1902     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1903         || fflush(defer))
1904       sysfatal("write to defer file %s",path_defer);
1905     article_done(art, RC_deferred);
1906     break;
1907
1908   }
1909 dealtwith:
1910
1911   conn_maybe_write(conn);
1912   check_assign_articles();
1913   return OOP_CONTINUE;
1914 }
1915
1916
1917 /*========== monitoring of input files ==========*/
1918
1919 static void feedfile_eof(InputFile *ipf) {
1920   assert(ipf != main_input_file); /* promised by tailing_try_read */
1921   inputfile_reading_stop(ipf);
1922
1923   if (ipf == flushing_input_file) {
1924     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1925     if (main_input_file) inputfile_reading_start(main_input_file);
1926     statemc_check_flushing_done();
1927   } else if (ipf == backlog_input_file) {
1928     statemc_check_backlog_done();
1929   } else {
1930     abort(); /* supposed to wait rather than get EOF on main input file */
1931   }
1932 }
1933
1934 static InputFile *open_input_file(const char *path) {
1935   int fd= open(path, O_RDWR);
1936   if (fd<0) {
1937     if (errno==ENOENT) return 0;
1938     sysfatal("unable to open input file %s", path);
1939   }
1940   assert(fd>0);
1941
1942   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1943   memset(ipf,0,sizeof(*ipf));
1944
1945   ipf->fd= fd;
1946   strcpy(ipf->path, path);
1947   LIST_INIT(ipf->queue);
1948
1949   return ipf;
1950 }
1951
1952 static void close_input_file(InputFile *ipf) { /* does not free */
1953   assert(!ipf->readable_callback); /* must have had ->on_cancel */
1954   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1955   assert(!ipf->rd); /* must have had inputfile_reading_stop */
1956   assert(!ipf->inprogress); /* no dangling pointers pointing here */
1957   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1958 }
1959
1960
1961 /*---------- dealing with articles read in the input file ----------*/
1962
1963 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1964                                    const char *data, const char *how) {
1965   warn("corrupted file: %s, offset %lu: %s: in %s",
1966        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1967   ipf->readcount_err++;
1968   if (ipf->readcount_err > max_bad_data_initial +
1969       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1970     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
1971         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1972   return OOP_CONTINUE;
1973 }
1974
1975 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1976                                oop_rd_event ev, const char *errmsg,
1977                                int errnoval, const char *data, size_t recsz,
1978                                void *ipf_v) {
1979   InputFile *ipf= ipf_v;
1980   assert(ev == OOP_RD_SYSTEM);
1981   errno= errnoval;
1982   sysdie("error reading input file: %s, offset %lu",
1983          ipf->path, (unsigned long)ipf->offset);
1984 }
1985
1986 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1987                                   oop_rd_event ev, const char *errmsg,
1988                                   int errnoval, const char *data, size_t recsz,
1989                                   void *ipf_v) {
1990   InputFile *ipf= ipf_v;
1991   Article *art;
1992   char tokentextbuf[sizeof(TOKEN)*2+3];
1993
1994   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1995
1996   off_t old_offset= ipf->offset;
1997   ipf->offset += recsz + !!(ev == OOP_RD_OK);
1998
1999 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2000
2001   if (ev==OOP_RD_PARTREC)
2002     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2003     /* but process it anyway */
2004
2005   if (ipf->skippinglong) {
2006     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2007     return OOP_CONTINUE;
2008   }
2009   if (ev==OOP_RD_LONG) {
2010     ipf->skippinglong= 1;
2011     X_BAD_DATA("overly long line");
2012   }
2013
2014   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2015   if (!recsz) X_BAD_DATA("empty line");
2016
2017   if (data[0]==' ') {
2018     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2019     ipf->readcount_blank++;
2020     return OOP_CONTINUE;
2021   }
2022
2023   char *space= strchr(data,' ');
2024   int tokenlen= space-data;
2025   int midlen= (int)recsz-tokenlen-1;
2026   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2027   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2028
2029   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2030   memcpy(tokentextbuf, data, tokenlen);
2031   tokentextbuf[tokenlen]= 0;
2032   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2033
2034   ipf->readcount_ok++;
2035
2036   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2037   memset(art,0,sizeof(*art));
2038   art->state= art_Unchecked;
2039   art->midlen= midlen;
2040   art->ipf= ipf;  ipf->inprogress++;
2041   art->token= TextToToken(tokentextbuf);
2042   art->offset= old_offset;
2043   art->blanklen= recsz;
2044   strcpy(art->messageid, space+1);
2045   LIST_ADDTAIL(ipf->queue, art);
2046
2047   if (ipf==backlog_input_file)
2048     article_check_expired(art);
2049
2050   if (sms==sm_NORMAL && ipf==main_input_file &&
2051       ipf->offset >= target_max_feedfile_size)
2052     statemc_start_flush("feed file size");
2053
2054   check_assign_articles();
2055   return OOP_CONTINUE;
2056 }
2057
2058 /*========== tailing input file ==========*/
2059
2060 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2061                                      void *user) {
2062   InputFile *ipf= user;
2063   return ipf->readable_callback(loop, &ipf->readable,
2064                                 ipf->readable_callback_user);
2065 }
2066
2067 static void tailing_on_cancel(struct oop_readable *rable) {
2068   InputFile *ipf= (void*)rable;
2069
2070   if (ipf->filemon) filemon_stop(ipf);
2071   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2072   ipf->readable_callback= 0;
2073 }
2074
2075 static void tailing_queue_readable(InputFile *ipf) {
2076   /* lifetime of ipf here is OK because destruction will cause
2077    * on_cancel which will cancel this callback */
2078   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2079 }
2080
2081 static int tailing_on_readable(struct oop_readable *rable,
2082                                 oop_readable_call *cb, void *user) {
2083   InputFile *ipf= (void*)rable;
2084
2085   tailing_on_cancel(rable);
2086   ipf->readable_callback= cb;
2087   ipf->readable_callback_user= user;
2088   filemon_start(ipf);
2089
2090   tailing_queue_readable(ipf);
2091   return 0;
2092 }
2093
2094 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2095                                 size_t length) {
2096   InputFile *ipf= (void*)rable;
2097   for (;;) {
2098     ssize_t r= read(ipf->fd, buffer, length);
2099     if (r==-1) {
2100       if (errno==EINTR) continue;
2101       return r;
2102     }
2103     if (!r) {
2104       if (ipf==main_input_file) {
2105         errno=EAGAIN;
2106         return -1;
2107       } else if (ipf==flushing_input_file) {
2108         assert(ipf->rd);
2109         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2110       } else if (ipf==backlog_input_file) {
2111         assert(ipf->rd);
2112       } else {
2113         abort();
2114       }
2115     }
2116     tailing_queue_readable(ipf);
2117     return r;
2118   }
2119 }
2120
2121 /*---------- filemon implemented with inotify ----------*/
2122
2123 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2124 #define HAVE_FILEMON
2125
2126 #include <sys/inotify.h>
2127
2128 static int filemon_inotify_fd;
2129 static int filemon_inotify_wdmax;
2130 static InputFile **filemon_inotify_wd2ipf;
2131
2132 struct Filemon_Perfile {
2133   int wd;
2134 };
2135
2136 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2137   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2138   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2139
2140   if (wd >= filemon_inotify_wdmax) {
2141     int newmax= wd+2;
2142     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2143                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2144     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2145            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2146     filemon_inotify_wdmax= newmax;
2147   }
2148
2149   assert(!filemon_inotify_wd2ipf[wd]);
2150   filemon_inotify_wd2ipf[wd]= ipf;
2151
2152   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2153         ipf, wd, filemon_inotify_wdmax);
2154
2155   pf->wd= wd;
2156 }
2157
2158 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2159   int wd= pf->wd;
2160   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2161   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2162   if (r) sysdie("inotify_rm_watch");
2163   filemon_inotify_wd2ipf[wd]= 0;
2164 }
2165
2166 static void *filemon_inotify_readable(oop_source *lp, int fd,
2167                                       oop_event e, void *u) {
2168   struct inotify_event iev;
2169   for (;;) {
2170     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2171     if (r==-1) {
2172       if (isewouldblock(errno)) break;
2173       sysdie("read from inotify master");
2174     } else if (r==sizeof(iev)) {
2175       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2176     } else {
2177       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2178     }
2179     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2180     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2181     filemon_callback(ipf);
2182   }
2183   return OOP_CONTINUE;
2184 }
2185
2186 static int filemon_method_init(void) {
2187   filemon_inotify_fd= inotify_init();
2188   if (filemon_inotify_fd<0) {
2189     syswarn("filemon/inotify: inotify_init failed");
2190     return 0;
2191   }
2192   xsetnonblock(filemon_inotify_fd, 1);
2193   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2194
2195   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2196   return 1;
2197 }
2198
2199 static void filemon_method_dump_info(FILE *f) {
2200   int i;
2201   fprintf(f,"inotify");
2202   DUMPV("%d",,filemon_inotify_fd);
2203   DUMPV("%d",,filemon_inotify_wdmax);
2204   for (i=0; i<filemon_inotify_wdmax; i++)
2205     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2206 }
2207
2208 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2209
2210 /*---------- filemon dummy implementation ----------*/
2211
2212 #if !defined(HAVE_FILEMON)
2213
2214 struct Filemon_Perfile { int dummy; };
2215
2216 static int filemon_method_init(void) {
2217   warn("filemon/dummy: no filemon method compiled in");
2218   return 0;
2219 }
2220 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2221 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2222 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2223
2224 #endif /* !HAVE_FILEMON */
2225
2226 /*---------- filemon generic interface ----------*/
2227
2228 static void filemon_start(InputFile *ipf) {
2229   assert(!ipf->filemon);
2230
2231   NEW(ipf->filemon);
2232   filemon_method_startfile(ipf, ipf->filemon);
2233 }
2234
2235 static void filemon_stop(InputFile *ipf) {
2236   if (!ipf->filemon) return;
2237   filemon_method_stopfile(ipf, ipf->filemon);
2238   free(ipf->filemon);
2239   ipf->filemon= 0;
2240 }
2241
2242 static void filemon_callback(InputFile *ipf) {
2243   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2244     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2245 }
2246
2247 /*---------- interface to start and stop an input file ----------*/
2248
2249 static const oop_rd_style feedfile_rdstyle= {
2250   OOP_RD_DELIM_STRIP, '\n',
2251   OOP_RD_NUL_PERMIT,
2252   OOP_RD_SHORTREC_LONG,
2253 };
2254
2255 static void inputfile_reading_start(InputFile *ipf) {
2256   assert(!ipf->rd);
2257   ipf->readable.on_readable= tailing_on_readable;
2258   ipf->readable.on_cancel=   tailing_on_cancel;
2259   ipf->readable.try_read=    tailing_try_read;
2260   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2261   ipf->readable.delete_kill= 0;
2262
2263   ipf->readable_callback= 0;
2264   ipf->readable_callback_user= 0;
2265
2266   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2267   assert(ipf->rd);
2268
2269   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2270                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2271   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2272 }
2273
2274 static void inputfile_reading_stop(InputFile *ipf) {
2275   assert(ipf->rd);
2276   oop_rd_cancel(ipf->rd);
2277   oop_rd_delete(ipf->rd);
2278   ipf->rd= 0;
2279   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2280 }
2281
2282
2283 /*========== interaction with innd - state machine ==========*/
2284
2285 /* See official state diagram at top of file.  We implement
2286  * this as follows:
2287  * -8<-
2288
2289             .=======.
2290             ||START||
2291             `======='
2292                 |
2293                 | open F
2294                 |
2295                 |    F ENOENT
2296                 |`---------------------------------------------------.
2297       F OPEN OK |                                                    |
2298                 |`---------------- - - -                             |
2299        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2300                 |                  for full startup logic            |
2301      ,--------->|                                                    |
2302      |          V                                                    |
2303      |     ============                                       try to |
2304      |      NORMAL                                            open D |
2305      |     [Normal]                                                  |
2306      |      main F tail                                              |
2307      |     ============                                              V
2308      |          |                                                    |
2309      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2310      ^          | hardlink F to D                                    |
2311      |     [Hardlinked]                                              |
2312      |          | unlink F                                           |
2313      |          | our handle onto F is now onto D                    |
2314      |     [Moved]                                                   |
2315      |          |                                                    |
2316      |          |<-------------------<---------------------<---------+
2317      |          |                                                    |
2318      |          | spawn inndcomm flush                               |
2319      |          V                                                    |
2320      |     ==================                                        |
2321      |      FLUSHING[-ABSENT]                                        |
2322      |     [Flushing]                                                |
2323      |     main D tail/none                                          |
2324      |     ==================                                        |
2325      |          |                                                    |
2326      |          |   INNDCOMM FLUSH FAILS                             ^
2327      |          |`----------------------->----------.                |
2328      |          |                                   |                |
2329      |          |   NO SUCH SITE                    V                |
2330      ^          |`--------------->----.         ==================== |
2331      |          |                      \        FLUSHFAILED[-ABSENT] |
2332      |          |                       \         [Moved]            |
2333      |          | FLUSH OK               \       main D tail/none    |
2334      |          | open F                  \     ==================== |
2335      |          |                          \        |                |
2336      |          |                           \       | TIME TO RETRY  |
2337      |          |`------->----.     ,---<---'\      `----------------'
2338      |          |    D NONE   |     | D NONE  `----.
2339      |          V             |     |              V
2340      |     =============      V     V             ============
2341      |      SEPARATED-1       |     |              DROPPING-1
2342      |      flsh->rd!=0       |     |              flsh->rd!=0
2343      |     [Separated]        |     |             [Dropping]
2344      |      main F idle       |     |              main none
2345      |      flsh D tail       |     |              flsh D tail
2346      |     =============      |     |             ============
2347      |          |             |     | install       |
2348      ^          | EOF ON D    |     |  defer        | EOF ON D
2349      |          V             |     |               V
2350      |     ===============    |     |             ===============
2351      |      SEPARATED-2       |     |              DROPPING-2
2352      |      flsh->rd==0       |     V              flsh->rd==0
2353      |     [Finishing]        |     |             [Dropping]
2354      |      main F tail       |     `.             main none
2355      |      flsh D closed     |       `.           flsh D closed
2356      |     ===============    V         `.        ===============
2357      |          |                         `.          |
2358      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2359      |          V install defer as backlog    `.      | install defer
2360      ^          | close D                       `.    | close D
2361      |          | unlink D                        `.  | unlink D
2362      |          |                                  |  |
2363      |          |                                  V  V
2364      `----------'                               ==============
2365                                                  DROPPED
2366                                                 [Dropped]
2367                                                  main none
2368                                                  flsh none
2369                                                  some backlog
2370                                                 ==============
2371                                                       |
2372                                                       | ALL BACKLOG DONE
2373                                                       |
2374                                                       | unlink lock
2375                                                       | exit
2376                                                       V
2377                                                   ==========
2378                                                    (ESRCH)
2379                                                   [Droppped]
2380                                                   ==========
2381  * ->8-
2382  */
2383
2384 static void startup_set_input_file(InputFile *f) {
2385   assert(!main_input_file);
2386   main_input_file= f;
2387   inputfile_reading_start(f);
2388 }
2389
2390 static void statemc_lock(void) {
2391   int lockfd;
2392   struct stat stab, stabf;
2393   
2394   for (;;) {
2395     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2396     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2397
2398     struct flock fl;
2399     memset(&fl,0,sizeof(fl));
2400     fl.l_type= F_WRLCK;
2401     fl.l_whence= SEEK_SET;
2402     int r= fcntl(lockfd, F_SETLK, &fl);
2403     if (r==-1) {
2404       if (errno==EACCES || isewouldblock(errno)) {
2405         if (quiet_multiple) exit(0);
2406         fatal("another duct holds the lockfile");
2407       }
2408       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2409     }
2410
2411     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2412     int lock_noent;
2413     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2414
2415     if (!lock_noent && samefile(&stab, &stabf))
2416       break;
2417
2418     xclose(lockfd, "stale lockfile ", path_lock);
2419   }
2420
2421   FILE *lockfile= fdopen(lockfd, "w");
2422   if (!lockfile) sysdie("fdopen lockfile");
2423
2424   int r= ftruncate(lockfd, 0);
2425   if (r) sysdie("truncate lockfile to write new info");
2426
2427   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2428               (unsigned long)self_pid,
2429               sitename, feedfile, remote_host) == EOF ||
2430       fflush(lockfile))
2431     sysfatal("write info to lockfile %s", path_lock);
2432
2433   debug("startup: locked");
2434 }
2435
2436 static void statemc_init(void) {
2437   struct stat stabdefer;
2438
2439   search_backlog_file();
2440
2441   int defer_noent;
2442   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2443   if (defer_noent) {
2444     debug("startup: ductdefer ENOENT");
2445   } else {
2446     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2447     switch (stabdefer.st_nlink==1) {
2448     case 1:
2449       open_defer(); /* so that we will later close it and rename it */
2450       break;
2451     case 2:
2452       xunlink(path_defer, "stale defer file link"
2453               " (presumably hardlink to backlog file)");
2454       break;
2455     default:
2456       die("defer file %s has unexpected link count %d",
2457           path_defer, stabdefer.st_nlink);
2458     }
2459   }
2460
2461   struct stat stab_f, stab_d;
2462   int noent_f;
2463
2464   InputFile *file_d= open_input_file(path_flushing);
2465   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2466
2467   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2468
2469   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2470     debug("startup: F==D => Hardlinked");
2471     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2472     noent_f= 1;
2473   }
2474
2475   if (noent_f) {
2476     debug("startup: F ENOENT => Moved");
2477     if (file_d) startup_set_input_file(file_d);
2478     spawn_inndcomm_flush("feedfile missing at startup");
2479     /* => Flushing, sms:=FLUSHING */
2480   } else {
2481     if (file_d) {
2482       debug("startup: F!=D => Separated");
2483       startup_set_input_file(file_d);
2484       flushing_input_file= main_input_file;
2485       main_input_file= open_input_file(feedfile);
2486       if (!main_input_file) die("feedfile vanished during startup");
2487       SMS(SEPARATED, 0, "found both old and current feed files");
2488     } else {
2489       debug("startup: F exists, D ENOENT => Normal");
2490       InputFile *file_f= open_input_file(feedfile);
2491       if (!file_f) die("feed file vanished during startup");
2492       startup_set_input_file(file_f);
2493       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2494     }
2495   }
2496 }
2497
2498 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2499   assert(sms == sm_NORMAL);
2500
2501   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2502         why,
2503         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2504         (unsigned long)target_max_feedfile_size,
2505         until_flush);
2506
2507   int r= link(feedfile, path_flushing);
2508   if (r) sysfatal("link feedfile %s to flushing file %s",
2509                   feedfile, path_flushing);
2510   /* => Hardlinked */
2511
2512   xunlink(feedfile, "old feedfile link");
2513   /* => Moved */
2514
2515   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2516 }
2517
2518 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2519   switch (sms) {
2520   case sm_NORMAL:
2521     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2522     return 1;
2523   case sm_FLUSHFAILED:
2524     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2525     return 1;
2526   default:
2527     return 0;
2528   }
2529 }
2530
2531 static void statemc_period_poll(void) {
2532   if (!until_flush) return;
2533   until_flush--;
2534   assert(until_flush>=0);
2535
2536   if (until_flush) return;
2537   int ok= trigger_flush_ok();
2538   assert(ok);
2539 }
2540
2541 static int inputfile_is_done(InputFile *ipf) {
2542   if (!ipf) return 0;
2543   if (ipf->inprogress) return 0; /* new article in the meantime */
2544   if (ipf->rd) return 0; /* not had EOF */
2545   return 1;
2546 }
2547
2548 static void notice_processed(InputFile *ipf, int completed,
2549                              const char *what, const char *spec) {
2550   if (!ipf) return; /* allows preterminate to be lazy */
2551
2552 #define RCI_NOTHING(x) /* nothing */
2553 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2554 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2555
2556 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2557
2558   char *inprog= completed
2559     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2560     : xasprintf(" inprogress=%ld", ipf->inprogress);
2561
2562   info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2563        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2564        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2565        ,
2566        completed?"completed":"processed", what, spec,
2567        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2568        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2569        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2570        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2571        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2572        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2573        );
2574
2575   free(inprog);
2576
2577 #undef CNT
2578 }
2579
2580 static void statemc_check_backlog_done(void) {
2581   InputFile *ipf= backlog_input_file;
2582   if (!inputfile_is_done(ipf)) return;
2583
2584   const char *slash= strrchr(ipf->path, '/');
2585   const char *leaf= slash ? slash+1 : ipf->path;
2586   const char *under= strchr(slash, '_');
2587   const char *rest= under ? under+1 : leaf;
2588   if (!strncmp(rest,"backlog",7)) rest += 7;
2589   notice_processed(ipf,1,"backlog ",rest);
2590
2591   close_input_file(ipf);
2592   if (unlink(ipf->path)) {
2593     if (errno != ENOENT)
2594       sysdie("could not unlink processed backlog file %s", ipf->path);
2595     warn("backlog file %s vanished while we were reading it"
2596          " so we couldn't remove it (but it's done now, anyway)",
2597          ipf->path);
2598   }
2599   free(ipf);
2600   backlog_input_file= 0;
2601   search_backlog_file();
2602   return;
2603 }
2604
2605 static void statemc_check_flushing_done(void) {
2606   InputFile *ipf= flushing_input_file;
2607   if (!inputfile_is_done(ipf)) return;
2608
2609   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2610
2611   notice_processed(ipf,1,"feedfile","");
2612
2613   close_defer();
2614
2615   xunlink(path_flushing, "old flushing file");
2616
2617   close_input_file(flushing_input_file);
2618   free(flushing_input_file);
2619   flushing_input_file= 0;
2620
2621   if (sms==sm_SEPARATED) {
2622     notice("flush complete");
2623     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2624   } else if (sms==sm_DROPPING) {
2625     SMS(DROPPED, 0, "old flush complete");
2626     search_backlog_file();
2627     notice("feed dropped, but will continue until backlog is finished");
2628   }
2629 }
2630
2631 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2632                                       void *u) {
2633   assert(!inputfile_is_done(main_input_file));
2634   statemc_check_flushing_done();
2635   statemc_check_backlog_done();
2636   return OOP_CONTINUE;
2637 }
2638
2639 static void queue_check_input_done(void) {
2640   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2641 }
2642
2643 static void statemc_setstate(StateMachineState newsms, int periods,
2644                              const char *forlog, const char *why) {
2645   sms= newsms;
2646   until_flush= periods;
2647
2648   const char *xtra= "";
2649   switch (sms) {
2650   case sm_FLUSHING:
2651   case sm_FLUSHFAILED:
2652     if (!main_input_file) xtra= "-ABSENT";
2653     break;
2654   case sm_SEPARATED:
2655   case sm_DROPPING:
2656     xtra= flushing_input_file->rd ? "-1" : "-2";
2657     break;
2658   default:;
2659   }
2660
2661   if (periods) {
2662     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2663   } else {
2664     info("state %s%s %s",forlog,xtra,why);
2665   }
2666 }
2667
2668 /*---------- defer and backlog files ----------*/
2669
2670 static void open_defer(void) {
2671   struct stat stab;
2672
2673   if (defer) return;
2674
2675   defer= fopen(path_defer, "a+");
2676   if (!defer) sysfatal("could not open defer file %s", path_defer);
2677
2678   /* truncate away any half-written records */
2679
2680   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2681
2682   if (stab.st_size > LONG_MAX)
2683     die("defer file %s size is far too large", path_defer);
2684
2685   if (!stab.st_size)
2686     return;
2687
2688   long orgsize= stab.st_size;
2689   long truncto= stab.st_size;
2690   for (;;) {
2691     if (!truncto) break; /* was only (if anything) one half-truncated record */
2692     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2693       sysdie("seek in defer file %s while truncating partial", path_defer);
2694
2695     int r= getc(defer);
2696     if (r==EOF) {
2697       if (ferror(defer))
2698         sysdie("failed read from defer file %s", path_defer);
2699       else
2700         die("defer file %s shrank while we were checking it!", path_defer);
2701     }
2702     if (r=='\n') break;
2703     truncto--;
2704   }
2705
2706   if (stab.st_size != truncto) {
2707     warn("truncating half-record at end of defer file %s -"
2708          " shrinking by %ld bytes from %ld to %ld",
2709          path_defer, orgsize - truncto, orgsize, truncto);
2710
2711     if (fflush(defer))
2712       sysfatal("could not flush defer file %s", path_defer);
2713     if (ftruncate(fileno(defer), truncto))
2714       sysdie("could not truncate defer file %s", path_defer);
2715
2716   } else {
2717     info("continuing existing defer file %s (%ld bytes)",
2718          path_defer, orgsize);
2719   }
2720   if (fseek(defer, truncto, SEEK_SET))
2721     sysdie("could not seek to new end of defer file %s", path_defer);
2722 }
2723
2724 static void close_defer(void) {
2725   if (!defer)
2726     return;
2727
2728   struct stat stab;
2729   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2730
2731   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2732   defer= 0;
2733
2734   time_t now= xtime();
2735
2736   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2737                            (unsigned long)now,
2738                            (unsigned long)stab.st_ino);
2739   if (link(path_defer, backlog))
2740     sysfatal("could not install defer file %s as backlog file %s",
2741            path_defer, backlog);
2742   if (unlink(path_defer))
2743     sysdie("could not unlink old defer link %s to backlog file %s",
2744            path_defer, backlog);
2745
2746   free(backlog);
2747
2748   if (until_backlog_nextscan < 0 ||
2749       until_backlog_nextscan > backlog_retry_minperiods + 1)
2750     until_backlog_nextscan= backlog_retry_minperiods + 1;
2751 }
2752
2753 static void poll_backlog_file(void) {
2754   if (until_backlog_nextscan < 0) return;
2755   if (until_backlog_nextscan-- > 0) return;
2756   search_backlog_file();
2757 }
2758
2759 static void search_backlog_file(void) {
2760   /* returns non-0 iff there are any backlog files */
2761
2762   glob_t gl;
2763   int r, i;
2764   struct stat stab;
2765   const char *oldest_path=0;
2766   time_t oldest_mtime=0, now;
2767
2768   if (backlog_input_file) return;
2769
2770  try_again:
2771
2772   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2773
2774   switch (r) {
2775   case GLOB_ABORTED:
2776     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2777   case GLOB_NOSPACE:
2778     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2779   case 0:
2780     for (i=0; i<gl.gl_pathc; i++) {
2781       const char *path= gl.gl_pathv[i];
2782
2783       if (strchr(path,'#') || strchr(path,'~')) {
2784         debug("backlog file search skipping %s", path);
2785         continue;
2786       }
2787       r= stat(path, &stab);
2788       if (r) {
2789         syswarn("failed to stat backlog file %s", path);
2790         continue;
2791       }
2792       if (!S_ISREG(stab.st_mode)) {
2793         warn("backlog file %s is not a plain file (or link to one)", path);
2794         continue;
2795       }
2796       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2797         oldest_path= path;
2798         oldest_mtime= stab.st_mtime;
2799       }
2800     }
2801   case GLOB_NOMATCH: /* fall through */
2802     break;
2803   default:
2804     sysdie("glob expansion of backlog pattern %s gave unexpected"
2805            " nonzero (error?) return value %d", globpat_backlog, r);
2806   }
2807
2808   if (!oldest_path) {
2809     debug("backlog scan: none");
2810
2811     if (sms==sm_DROPPED) {
2812       preterminate();
2813       notice("feed dropped and our work is complete");
2814
2815       int r= unlink(path_control);
2816       if (r && errno!=ENOENT)
2817         syswarn("failed to remove control symlink for old feed");
2818
2819       xunlink(path_lock,    "lockfile for old feed");
2820       exit(4);
2821     }
2822     until_backlog_nextscan= backlog_spontrescan_periods;
2823     goto xfree;
2824   }
2825
2826   now= xtime();
2827   double age= difftime(now, oldest_mtime);
2828   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2829
2830   if (age_deficiency <= 0) {
2831     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2832           age, age_deficiency, oldest_path);
2833
2834     backlog_input_file= open_input_file(oldest_path);
2835     if (!backlog_input_file) {
2836       warn("backlog file %s vanished as we opened it", oldest_path);
2837       globfree(&gl);
2838       goto try_again;
2839     }
2840     inputfile_reading_start(backlog_input_file);
2841     until_backlog_nextscan= -1;
2842     goto xfree;
2843   }
2844
2845   until_backlog_nextscan= age_deficiency / period_seconds;
2846
2847   if (backlog_spontrescan_periods >= 0 &&
2848       until_backlog_nextscan > backlog_spontrescan_periods)
2849     until_backlog_nextscan= backlog_spontrescan_periods;
2850
2851   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2852         age, age_deficiency, until_backlog_nextscan, oldest_path);
2853
2854  xfree:
2855   globfree(&gl);
2856   return;
2857 }
2858
2859 /*---------- shutdown and signal handling ----------*/
2860
2861 static void preterminate(void) {
2862   if (in_child) return;
2863   notice_processed(main_input_file,0,"feedfile","");
2864   notice_processed(flushing_input_file,0,"flushing","");
2865   if (backlog_input_file)
2866     notice_processed(backlog_input_file,0, "backlog file ",
2867                      backlog_input_file->path);
2868 }
2869
2870 static int signal_self_pipe[2];
2871 static sig_atomic_t terminate_sig_flag;
2872
2873 static void raise_default(int signo) {
2874   xsigsetdefault(signo);
2875   raise(signo);
2876   abort();
2877 }
2878
2879 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2880   assert(fd=signal_self_pipe[0]);
2881   char buf[PIPE_BUF];
2882   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2883   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2884   if (r==0) die("eof on signal self pipe");
2885   if (terminate_sig_flag) {
2886     preterminate();
2887     notice("terminating (%s)", strsignal(terminate_sig_flag));
2888     raise_default(terminate_sig_flag);
2889   }
2890   return OOP_CONTINUE;
2891 }
2892
2893 static void sigarrived_handler(int signum) {
2894   static char x;
2895   switch (signum) {
2896   case SIGTERM:
2897   case SIGINT:
2898     if (!terminate_sig_flag) terminate_sig_flag= signum;
2899     break;
2900   default:
2901     abort();
2902   }
2903   write(signal_self_pipe[1],&x,1);
2904 }
2905
2906 static void init_signals(void) {
2907   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2908     sysdie("could not ignore SIGPIPE");
2909
2910   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2911
2912   xsetnonblock(signal_self_pipe[0],1);
2913   xsetnonblock(signal_self_pipe[1],1);
2914
2915   struct sigaction sa;
2916   memset(&sa,0,sizeof(sa));
2917   sa.sa_handler= sigarrived_handler;
2918   sa.sa_flags= SA_RESTART;
2919   xsigaction(SIGTERM,&sa);
2920   xsigaction(SIGINT,&sa);
2921
2922   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2923 }
2924
2925 /*========== flushing the feed ==========*/
2926
2927 static pid_t inndcomm_child;
2928 static int inndcomm_sentinel_fd;
2929
2930 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2931   assert(inndcomm_child);
2932   assert(fd == inndcomm_sentinel_fd);
2933   int status= xwaitpid(&inndcomm_child, "inndcomm");
2934   inndcomm_child= 0;
2935   
2936   cancel_fd_read_except(fd);
2937   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2938   inndcomm_sentinel_fd= 0;
2939
2940   assert(!flushing_input_file);
2941
2942   if (WIFEXITED(status)) {
2943     switch (WEXITSTATUS(status)) {
2944
2945     case INNDCOMMCHILD_ESTATUS_FAIL:
2946       goto failed;
2947
2948     case INNDCOMMCHILD_ESTATUS_NONESUCH:
2949       notice("feed has been dropped by innd, finishing up");
2950       flushing_input_file= main_input_file;
2951       tailing_queue_readable(flushing_input_file);
2952         /* we probably previously returned EAGAIN from our fake read method
2953          * when in fact we were at EOF, so signal another readable event
2954          * so we actually see the EOF */
2955
2956       main_input_file= 0;
2957
2958       if (flushing_input_file) {
2959         SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2960       } else {
2961         close_defer();
2962         SMS(DROPPED, 0, "feed dropped by innd");
2963         search_backlog_file();
2964       }
2965       return OOP_CONTINUE;
2966
2967     case 0:
2968       /* as above */
2969       flushing_input_file= main_input_file;
2970       tailing_queue_readable(flushing_input_file);
2971
2972       main_input_file= open_input_file(feedfile);
2973       if (!main_input_file)
2974         die("flush succeeded but feedfile %s does not exist!", feedfile);
2975
2976       if (flushing_input_file) {
2977         SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2978       } else {
2979         close_defer();
2980         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2981       }
2982       return OOP_CONTINUE;
2983
2984     default:
2985       goto unexpected_exitstatus;
2986
2987     }
2988   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2989     warn("flush timed out trying to talk to innd");
2990     goto failed;
2991   } else {
2992   unexpected_exitstatus:
2993     report_child_status("inndcomm child", status);
2994   }
2995
2996  failed:
2997   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2998   return OOP_CONTINUE;
2999 }
3000
3001 static void inndcommfail(const char *what) {
3002   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3003   exit(INNDCOMMCHILD_ESTATUS_FAIL);
3004 }
3005
3006 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3007   int pipefds[2];
3008
3009   notice("flushing %s",why);
3010
3011   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3012   assert(!inndcomm_child);
3013   assert(!inndcomm_sentinel_fd);
3014
3015   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3016
3017   inndcomm_child= xfork("inndcomm child");
3018
3019   if (!inndcomm_child) {
3020     const char *flushargv[2]= { sitename, 0 };
3021     char *reply;
3022     int r;
3023
3024     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3025     /* parent spots the autoclose of pipefds[1] when we die or exit */
3026
3027     if (simulate_flush>=0) {
3028       warn("SIMULATING flush child status %d", simulate_flush);
3029       if (simulate_flush>128) raise(simulate_flush-128);
3030       else exit(simulate_flush);
3031     }
3032
3033     alarm(inndcomm_flush_timeout);
3034     r= ICCopen();                         if (r)   inndcommfail("connect");
3035     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3036     if (!r) exit(0); /* yay! */
3037
3038     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3039     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3040     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3041   }
3042
3043   simulate_flush= -1;
3044
3045   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3046   inndcomm_sentinel_fd= pipefds[0];
3047   assert(inndcomm_sentinel_fd);
3048   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3049
3050   SMS(FLUSHING, 0, why);
3051 }
3052
3053 /*========== main program ==========*/
3054
3055 static void postfork_inputfile(InputFile *ipf) {
3056   if (!ipf) return;
3057   xclose(ipf->fd, "(in child) input file ", ipf->path);
3058 }
3059
3060 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3061   /* we have no stdio streams that are buffered long-term */
3062   if (!f) return;
3063   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3064 }
3065
3066 static void postfork(void) {
3067   in_child= 1;
3068
3069   xsigsetdefault(SIGTERM);
3070   xsigsetdefault(SIGINT);
3071   xsigsetdefault(SIGPIPE);
3072   if (terminate_sig_flag) raise(terminate_sig_flag);
3073
3074   postfork_inputfile(main_input_file);
3075   postfork_inputfile(flushing_input_file);
3076
3077   Conn *conn;
3078   FOR_CONN(conn)
3079     conn_closefd(conn,"(in child) ");
3080
3081   postfork_stdio(defer, "defer file ", path_defer);
3082 }
3083
3084 typedef struct Every Every;
3085 struct Every {
3086   struct timeval interval;
3087   int fixed_rate;
3088   void (*f)(void);
3089 };
3090
3091 static void every_schedule(Every *e, struct timeval base);
3092
3093 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3094   Every *e= e_v;
3095   e->f();
3096   if (!e->fixed_rate) xgettimeofday(&base);
3097   every_schedule(e, base);
3098   return OOP_CONTINUE;
3099 }
3100
3101 static void every_schedule(Every *e, struct timeval base) {
3102   struct timeval when;
3103   timeradd(&base, &e->interval, &when);
3104   loop->on_time(loop, when, every_happens, e);
3105 }
3106
3107 static void every(int interval, int fixed_rate, void (*f)(void)) {
3108   NEW_DECL(Every *,e);
3109   e->interval.tv_sec= interval;
3110   e->interval.tv_usec= 0;
3111   e->fixed_rate= fixed_rate;
3112   e->f= f;
3113   struct timeval now;
3114   xgettimeofday(&now);
3115   every_schedule(e, now);
3116 }
3117
3118 static void filepoll(void) {
3119   filemon_callback(main_input_file);
3120   filemon_callback(flushing_input_file);
3121 }
3122
3123 static char *debug_report_ipf(InputFile *ipf) {
3124   if (!ipf) return xasprintf("none");
3125
3126   const char *slash= strrchr(ipf->path,'/');
3127   const char *path= slash ? slash+1 : ipf->path;
3128
3129   return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s",
3130                    ipf, path,
3131                    ipf->queue.count, ipf->inprogress, (long)ipf->offset,
3132                    ipf->fd,
3133                    ipf->rd ? "" : ",!rd",
3134                    ipf->skippinglong ? "*skiplong" : "");
3135 }
3136
3137 static void period(void) {
3138   char *dipf_main=     debug_report_ipf(main_input_file);
3139   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3140   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3141
3142   debug("PERIOD"
3143         " sms=%s[%d] conns=%d until_connect=%d"
3144         " input_files main:%s flushing:%s backlog:%s"
3145         " children connecting=%ld inndcomm=%ld"
3146         ,
3147         sms_names[sms], until_flush, conns.count, until_connect,
3148         dipf_main, dipf_flushing, dipf_backlog,
3149         (long)connecting_child, (long)inndcomm_child
3150         );
3151
3152   free(dipf_main);
3153   free(dipf_flushing);
3154   free(dipf_backlog);
3155
3156   if (until_connect) until_connect--;
3157
3158   inputfile_queue_check_expired(backlog_input_file);
3159   poll_backlog_file();
3160   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3161   statemc_period_poll();
3162   check_assign_articles();
3163   check_idle_conns();
3164 }
3165
3166
3167 /*========== dumping state ==========*/
3168
3169 static void dump_article_list(FILE *f, const ControlCommand *c,
3170                               const ArticleList *al) {
3171   fprintf(f, " count=%d\n", al->count);
3172   if (!c->xval) return;
3173   
3174   int i; Article *art;
3175   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3176     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3177     DUMPV("%p", art->,ipf);
3178     DUMPV("%d", art->,missing);
3179     DUMPV("%lu", (unsigned long)art->,offset);
3180     DUMPV("%d", art->,blanklen);
3181     DUMPV("%d", art->,midlen);
3182     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3183   }
3184 }
3185   
3186 static void dump_input_file(FILE *f, const ControlCommand *c,
3187                             InputFile *ipf, const char *wh) {
3188   char *dipf= debug_report_ipf(ipf);
3189   fprintf(f,"input %s %s", wh, dipf);
3190   free(dipf);
3191   
3192   if (ipf) {
3193     DUMPV("%d", ipf->,readcount_ok);
3194     DUMPV("%d", ipf->,readcount_blank);
3195     DUMPV("%d", ipf->,readcount_err);
3196   }
3197   fprintf(f,"\n");
3198   if (ipf) {
3199     ArtState state; const char *const *statename; 
3200     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3201 #define RC_DUMP_FMT(x) " " #x "=%d"
3202 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3203       fprintf(f,"input %s counts %-11s"
3204               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3205               wh, *statename
3206               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3207     }
3208     fprintf(f,"input %s queue", wh);
3209     dump_article_list(f,c,&ipf->queue);
3210   }
3211 }
3212
3213 CCMD(dump) {
3214   int i;
3215   fprintf(cc->out, "dumping state to %s\n", path_dump);
3216   FILE *f= fopen(path_dump, "w");
3217   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3218
3219   fprintf(f,"general");
3220   DUMPV("%s", sms_names,[sms]);
3221   DUMPV("%d", ,until_flush);
3222   DUMPV("%ld", (long),self_pid);
3223   DUMPV("%p", , defer);
3224   DUMPV("%d", , until_connect);
3225   DUMPV("%d", , until_backlog_nextscan);
3226   DUMPV("%d", , simulate_flush);
3227   fprintf(f,"\nnocheck");
3228   DUMPV("%#.10f", , accept_proportion);
3229   DUMPV("%d", , nocheck);
3230   DUMPV("%d", , nocheck_reported);
3231   fprintf(f,"\n");
3232
3233   fprintf(f,"special");
3234   DUMPV("%ld", (long),connecting_child);
3235   DUMPV("%d", , connecting_fdpass_sock);
3236   DUMPV("%d", , control_master);
3237   fprintf(f,"\n");
3238
3239   fprintf(f,"filemon ");
3240   filemon_method_dump_info(f);
3241
3242   dump_input_file(f,c, main_input_file,     "main"    );
3243   dump_input_file(f,c, flushing_input_file, "flushing");
3244   dump_input_file(f,c, backlog_input_file,  "backlog" );
3245
3246   fprintf(f,"conns count=%d\n", conns.count);
3247
3248   Conn *conn;
3249   FOR_CONN(conn) {
3250
3251     fprintf(f,"C%d",conn->fd);
3252     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3253     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3254     DUMPV("%d",conn->,since_activity);
3255     fprintf(f,"\n");
3256
3257     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3258     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3259     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3260
3261     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3262     for (i=0; i<conn->xmitu; i++) {
3263       const struct iovec *iv= &conn->xmit[i];
3264       const XmitDetails *xd= &conn->xmitd[i];
3265       char *dinfo;
3266       switch (xd->kind) {
3267       case xk_Const:    dinfo= xasprintf("Const");                 break;
3268       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3269       default:
3270         abort();
3271       }
3272       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3273               sanitise(iv->iov_base, iv->iov_len));
3274       free(dinfo);
3275     }
3276   }
3277
3278   fprintf(f,"paths");
3279   DUMPV("%s", , path_lock);
3280   DUMPV("%s", , path_flushing);
3281   DUMPV("%s", , path_defer);
3282   DUMPV("%s", , path_control);
3283   DUMPV("%s", , path_dump);
3284   DUMPV("%s", , globpat_backlog);
3285   fprintf(f,"\n");
3286
3287   if (!!ferror(f) + !!fclose(f)) {
3288     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3289     return;
3290   }
3291 }
3292
3293 /*========== option parsing ==========*/
3294
3295 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3296 static void vbadusage(const char *fmt, va_list al) {
3297   char *m= xvasprintf(fmt,al);
3298   fprintf(stderr, "bad usage: %s\n"
3299           "say --help for help, or read the manpage\n",
3300           m);
3301   if (become_daemon)
3302     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3303   exit(8);
3304 }
3305
3306 /*---------- generic option parser ----------*/
3307
3308 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3309 static void badusage(const char *fmt, ...) {
3310   va_list al;
3311   va_start(al,fmt);
3312   vbadusage(fmt,al);
3313 }
3314
3315 enum OptFlags {
3316   of_seconds= 001000u,
3317   of_boolean= 002000u,
3318 };
3319
3320 typedef struct Option Option;
3321 typedef void OptionParser(const Option*, const char *val);
3322
3323 struct Option {
3324   int shrt;
3325   const char *lng, *formarg;
3326   void *store;
3327   OptionParser *fn;
3328   int intval;
3329 };
3330
3331 static void parse_options(const Option *options, char ***argvp) {
3332   /* on return *argvp is first non-option arg; argc is not updated */
3333
3334   for (;;) {
3335     const char *arg= *++(*argvp);
3336     if (!arg) break;
3337     if (*arg != '-') break;
3338     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3339     int a;
3340     while ((a= *++arg)) {
3341       const Option *o;
3342       if (a=='-') {
3343         arg++;
3344         char *equals= strchr(arg,'=');
3345         int len= equals ? (equals - arg) : strlen(arg);
3346         for (o=options; o->shrt || o->lng; o++)
3347           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3348             goto found_long;
3349         badusage("unknown long option --%s",arg);
3350       found_long:
3351         if (!o->formarg) {
3352           if (equals) badusage("option --%s does not take a value",o->lng);
3353           arg= 0;
3354         } else if (equals) {
3355           arg= equals+1;
3356         } else {
3357           arg= *++(*argvp);
3358           if (!arg) badusage("option --%s needs a value for %s",
3359                              o->lng, o->formarg);
3360         }
3361         o->fn(o, arg);
3362         break; /* eaten the whole argument now */
3363       }
3364       for (o=options; o->shrt || o->lng; o++)
3365         if (a == o->shrt)
3366           goto found_short;
3367       badusage("unknown short option -%c",a);
3368     found_short:
3369       if (!o->formarg) {
3370         o->fn(o,0);
3371       } else {
3372         if (!*++arg) {
3373           arg= *++(*argvp);
3374           if (!arg) badusage("option -%c needs a value for %s",
3375                              o->shrt, o->formarg);
3376         }
3377         o->fn(o,arg);
3378         break; /* eaten the whole argument now */
3379       }
3380     }
3381   }
3382 }
3383
3384 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3385
3386 static void print_options(const Option *options, FILE *f) {
3387   const Option *o;
3388   for (o=options; o->shrt || o->lng; o++) {
3389     char shrt[2] = { o->shrt, 0 };
3390     char *optspec= xasprintf("%s%s%s%s%s",
3391                              o->shrt ? "-" : "", shrt,
3392                              o->shrt && o->lng ? "|" : "",
3393                              DELIMPERHAPS("--", o->lng));
3394     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3395     free(optspec);
3396   }
3397 }
3398
3399 /*---------- specific option types ----------*/
3400
3401 static void op_integer(const Option *o, const char *val) {
3402   char *ep;
3403   errno= 0;
3404   unsigned long ul= strtoul(val,&ep,10);
3405   if (*ep || ep==val || errno || ul>INT_MAX)
3406     badusage("bad integer value for %s",o->lng);
3407   int *store= o->store;
3408   *store= ul;
3409 }
3410
3411 static void op_double(const Option *o, const char *val) {
3412   int *store= o->store;
3413   char *ep;
3414   errno= 0;
3415   *store= strtod(val, &ep);
3416   if (*ep || ep==val || errno)
3417     badusage("bad floating point value for %s",o->lng);
3418 }
3419
3420 static void op_string(const Option *o, const char *val) {
3421   const char **store= o->store;
3422   *store= val;
3423 }
3424
3425 static void op_seconds(const Option *o, const char *val) {
3426   int *store= o->store;
3427   char *ep;
3428   int unit;
3429
3430   double v= strtod(val,&ep);
3431   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3432
3433   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3434   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3435   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3436   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3437   else if (!strcmp(ep,"das")) unit= 10;
3438   else if (!strcmp(ep,"hs"))  unit= 100;
3439   else if (!strcmp(ep,"ks"))  unit= 1000;
3440   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3441   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3442
3443   v *= unit;
3444   v= ceil(v);
3445   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3446   *store= v;
3447 }
3448
3449 static void op_setint(const Option *o, const char *val) {
3450   int *store= o->store;
3451   *store= o->intval;
3452 }
3453
3454 /*---------- specific options ----------*/
3455
3456 static void help(const Option *o, const char *val);
3457
3458 static const Option innduct_options[]= {
3459 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3460 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3461 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3462 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3463 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3464 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3465 {'P',"port",             "PORT",  &port,                     op_integer     },
3466 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3467 {0,"help",               0,       0,                         help           },
3468
3469 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3470 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3471 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3472 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3473
3474 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3475 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3476 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3477
3478 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3479 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3480
3481 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3482 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3483 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3484 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3485 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3486 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3487
3488 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3489 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3490
3491 {0,0}
3492 };
3493
3494 static void printusage(FILE *f) {
3495   fputs("usage: innduct [options] site [fqdn]\n"
3496         "available options are:\n", f);
3497   print_options(innduct_options, f);
3498 }
3499
3500 static void help(const Option *o, const char *val) {
3501   printusage(stdout);
3502   if (ferror(stdout) || fflush(stdout)) {
3503     perror("innduct: writing help");
3504     exit(12);
3505   }
3506   exit(0);
3507 }
3508
3509 static void convert_to_periods_rndup(int *store) {
3510   *store += period_seconds-1;
3511   *store /= period_seconds;
3512 }
3513
3514 int main(int argc, char **argv) {
3515   if (!argv[1]) {
3516     printusage(stderr);
3517     exit(8);
3518   }
3519
3520   parse_options(innduct_options, &argv);
3521
3522   /* arguments */
3523
3524   sitename= *argv++;
3525   if (!sitename) badusage("need site name argument");
3526   remote_host= *argv++;
3527   if (*argv) badusage("too many non-option arguments");
3528
3529   /* defaults */
3530
3531   int r= innconf_read(inndconffile);
3532   if (!r) badusage("could not read inn.conf (more info on stderr)");
3533
3534   if (!remote_host) remote_host= sitename;
3535
3536   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3537     badusage("nocheck threshold percentage must be between 0..100");
3538   nocheck_thresh *= 0.01;
3539
3540   if (nocheck_decay < 0.1)
3541     badusage("nocheck decay articles must be at least 0.1");
3542   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3543
3544   convert_to_periods_rndup(&reconnect_delay_periods);
3545   convert_to_periods_rndup(&flushfail_retry_periods);
3546   convert_to_periods_rndup(&backlog_retry_minperiods);
3547   convert_to_periods_rndup(&backlog_spontrescan_periods);
3548   convert_to_periods_rndup(&spontaneous_flush_periods);
3549   convert_to_periods_rndup(&need_activity_periods);
3550
3551   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3552     badusage("bad input data ratio must be between 0..100");
3553   max_bad_data_ratio *= 0.01;
3554
3555   if (!feedfile) {
3556     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3557   } else if (!feedfile[0]) {
3558     badusage("feed filename must be nonempty");
3559   } else if (feedfile[strlen(feedfile)-1]=='/') {
3560     feedfile= xasprintf("%s%s",feedfile,sitename);
3561   }
3562
3563   const char *feedfile_forbidden= "?*[~#";
3564   int c;
3565   while ((c= *feedfile_forbidden++))
3566     if (strchr(feedfile, c))
3567       badusage("feed filename may not contain metacharacter %c",c);
3568
3569   /* set things up */
3570
3571   path_lock=        xasprintf("%s_lock",      feedfile);
3572   path_flushing=    xasprintf("%s_flushing",  feedfile);
3573   path_defer=       xasprintf("%s_defer",     feedfile);
3574   path_control=     xasprintf("%s_control",   feedfile);
3575   path_dump=        xasprintf("%s_dump",      feedfile);
3576   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3577
3578   oop_source_sys *sysloop= oop_sys_new();
3579   if (!sysloop) sysdie("could not create liboop event loop");
3580   loop= (oop_source*)sysloop;
3581
3582   LIST_INIT(conns);
3583
3584   if (become_daemon) {
3585     int i;
3586     for (i=3; i<255; i++)
3587       /* do this now before we open syslog, etc. */
3588       close(i);
3589     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3590
3591     int null= open("/dev/null",O_RDWR);
3592     if (null<0) sysfatal("failed to open /dev/null");
3593     dup2(null,0);
3594     dup2(null,1);
3595     dup2(null,2);
3596     xclose(null, "/dev/null original fd",0);
3597
3598     pid_t child1= xfork("daemonise first fork");
3599     if (child1) _exit(0);
3600
3601     pid_t sid= setsid();
3602     if (sid != child1) sysfatal("setsid failed");
3603
3604     pid_t child2= xfork("daemonise second fork");
3605     if (child2) _exit(0);
3606   }
3607
3608   self_pid= getpid();
3609   if (self_pid==-1) sysdie("getpid");
3610
3611   statemc_lock();
3612
3613   init_signals();
3614
3615   notice("starting");
3616
3617   if (!become_daemon)
3618     control_stdio();
3619
3620   control_init();
3621
3622   int filemon_ok= 0;
3623   if (!try_filemon) {
3624     notice("filemon: suppressed by command line option, polling");
3625   } else {
3626     filemon_ok= filemon_method_init();
3627     if (!filemon_ok)
3628       warn("filemon: no file monitoring available, polling");
3629   }
3630   if (!filemon_ok)
3631     every(filepoll_seconds,0,filepoll);
3632
3633   every(period_seconds,1,period);
3634
3635   statemc_init();
3636
3637   /* let's go */
3638
3639   void *run= oop_sys_run(sysloop);
3640   assert(run == OOP_ERROR);
3641   sysdie("event loop failed");
3642 }