chiark / gitweb /
queue for each input file
[inn-innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Conn *conn, Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
325
326 static void postfork(void);
327 static void period(void);
328
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
335
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
338
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
342
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
345
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
348
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
351
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
357 static int port=119;
358 static const char *inndconffile;
359
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
365
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
368
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
371
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
379
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382   /* in one corrupt 4096-byte block the number of newlines has
383    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
384
385
386 /*----- statistics -----*/
387
388 typedef enum {      /* in queue                 in conn->sent             */
389   art_Unchecked,    /*   not checked, not sent    checking                */
390   art_Wanted,       /*   checked, wanted          sent body as requested  */
391   art_Unsolicited,  /*   -                        sent body without check */
392   art_MaxState,
393 } ArtState;
394
395 static const char *const artstate_names[]=
396   { "Unchecked", "Wanted", "Unsolicited", 0 };
397
398 #define RESULT_COUNTS(RCS,RCN)                  \
399   RCS(sent)                                     \
400   RCS(accepted)                                 \
401   RCN(unwanted)                                 \
402   RCN(rejected)                                 \
403   RCN(deferred)                                 \
404   RCN(missing)                                  \
405   RCN(connretry)
406
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
409        counts[art_Unchecked] x                  \
410        + counts[art_Wanted] x                   \
411        + counts[art_Unsolicited] x,             \
412        counts[art_Unchecked] x                  \
413        , counts[art_Wanted] x                   \
414        , counts[art_Unsolicited] x
415
416 typedef enum {
417 #define RC_INDEX(x) RC_##x,
418   RESULT_COUNTS(RC_INDEX, RC_INDEX)
419   RCI_max
420 } ResultCountIndex;
421
422
423 /*----- transmission buffers -----*/
424
425 #define CONNIOVS 128
426
427 typedef enum {
428   xk_Const, xk_Artdata
429 } XmitKind;
430
431 struct XmitDetails {
432   XmitKind kind;
433   union {
434     ARTHANDLE *sm_art;
435   } info;
436 };
437
438
439 /*----- core operational data structure types -----*/
440
441 struct InputFile {
442   /* This is also an instance of struct oop_readable */
443   struct oop_readable readable; /* first */
444   oop_readable_call *readable_callback;
445   void *readable_callback_user;
446
447   int fd;
448   Filemon_Perfile *filemon;
449
450   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
451   off_t offset;
452   int skippinglong;
453
454   ArticleList queue;
455   long inprogress; /* includes queue.count and also articles in conns */
456
457   int counts[art_MaxState][RCI_max];
458   int readcount_ok, readcount_blank, readcount_err;
459   char path[];
460 };
461
462 struct Article {
463   ISNODE(Article);
464   ArtState state;
465   int midlen, missing;
466   InputFile *ipf;
467   TOKEN token;
468   off_t offset;
469   int blanklen;
470   char messageid[1];
471 };
472
473 #define SMS_LIST(X)                             \
474   X(NORMAL)                                     \
475   X(FLUSHING)                                   \
476   X(FLUSHFAILED)                                \
477   X(SEPARATED)                                  \
478   X(DROPPING)                                   \
479   X(DROPPED)
480
481 enum StateMachineState {
482 #define SMS_DEF_ENUM(s) sm_##s,
483   SMS_LIST(SMS_DEF_ENUM)
484 };
485
486 static const char *sms_names[]= {
487 #define SMS_DEF_NAME(s) #s ,
488   SMS_LIST(SMS_DEF_NAME)
489   0
490 };
491
492 struct Conn {
493   ISNODE(Conn);
494   int fd; /* may be 0, meaning closed (during construction/destruction) */
495   oop_read *rd; /* likewise */
496   int max_queue, stream, quitting;
497   int since_activity; /* periods */
498   ArticleList waiting; /* not yet told peer */
499   ArticleList priority; /* peer says send it now */
500   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
501   struct iovec xmit[CONNIOVS];
502   XmitDetails xmitd[CONNIOVS];
503   int xmitu;
504 };
505
506
507 /*----- general operational variables -----*/
508
509 /* main initialises */
510 static oop_source *loop;
511 static ConnList conns;
512 static char *path_lock, *path_flushing, *path_defer;
513 static char *path_control, *path_dump;
514 static char *globpat_backlog;
515 static pid_t self_pid;
516
517 /* statemc_init initialises */
518 static StateMachineState sms;
519 static int until_flush;
520 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
521 static FILE *defer;
522
523 /* initialisation to 0 is good */
524 static int until_connect, until_backlog_nextscan;
525 static double accept_proportion;
526 static int nocheck, nocheck_reported, in_child;
527
528 /* for simulation, debugging, etc. */
529 int simulate_flush= -1;
530
531 /*========== logging ==========*/
532
533 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
534 static void logcore(int sysloglevel, const char *fmt, ...) {
535   VA;
536   if (become_daemon) {
537     vsyslog(sysloglevel,fmt,al);
538   } else {
539     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
540     vfprintf(stderr,fmt,al);
541     putc('\n',stderr);
542   }
543   va_end(al);
544 }
545
546 static void logv(int sysloglevel, const char *pfx, int errnoval,
547                  const char *fmt, va_list al) PRINTF(5,0);
548 static void logv(int sysloglevel, const char *pfx, int errnoval,
549                  const char *fmt, va_list al) {
550   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
551   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
552   msgbuf[sizeof(msgbuf)-1]= 0;
553
554   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
555     sysloglevel= LOG_ERR; /* run by wrong user, probably */
556
557   logcore(sysloglevel, "<%s>%s: %s%s%s",
558          sitename, pfx, msgbuf,
559          errnoval>=0 ? ": " : "",
560          errnoval>=0 ? strerror(errnoval) : "");
561 }
562
563 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
564   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
565   static void fn(const char *fmt, ...) {                        \
566     preterminate();                                             \
567     VA;                                                         \
568     logv(sysloglevel, pfx, err, fmt, al);                       \
569     exit(estatus);                                              \
570   }
571
572 #define logwrap(fn, pfx, sysloglevel, err)              \
573   static void fn(const char *fmt, ...) PRINTF(1,2);     \
574   static void fn(const char *fmt, ...) {                \
575     VA;                                                 \
576     logv(sysloglevel, pfx, err, fmt, al);               \
577     va_end(al);                                         \
578   }
579
580 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
581 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
582
583 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
584 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
585
586 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
587 logwrap(warn,     " warning",  LOG_WARNING, -1);
588
589 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
590 logwrap(info,     " info",     LOG_INFO,    -1);
591 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
592
593
594 /*========== utility functions etc. ==========*/
595
596 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
597 static char *xvasprintf(const char *fmt, va_list al) {
598   char *str;
599   int rc= vasprintf(&str,fmt,al);
600   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
601   return str;
602 }
603 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
604 static char *xasprintf(const char *fmt, ...) {
605   VA;
606   char *str= xvasprintf(fmt,al);
607   va_end(al);
608   return str;
609 }
610
611 static int close_perhaps(int *fd) {
612   if (*fd <= 0) return 0;
613   int r= close(*fd);
614   *fd=0;
615   return r;
616 }
617 static void xclose(int fd, const char *what, const char *what2) {
618   int r= close(fd);
619   if (r) sysdie("close %s%s",what,what2?what2:"");
620 }
621 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
622   if (*fd <= 0) return;
623   xclose(*fd,what,what2);
624   *fd=0;
625 }
626
627 static pid_t xfork(const char *what) {
628   pid_t child;
629
630   child= fork();
631   if (child==-1) sysfatal("cannot fork for %s",what);
632   debug("forked %s %ld", what, (unsigned long)child);
633   if (!child) postfork();
634   return child;
635 }
636
637 static void on_fd_read_except(int fd, oop_call_fd callback) {
638   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
639   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
640 }
641 static void cancel_fd_read_except(int fd) {
642   loop->cancel_fd(loop, fd, OOP_READ);
643   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
644 }
645
646 static void report_child_status(const char *what, int status) {
647   if (WIFEXITED(status)) {
648     int es= WEXITSTATUS(status);
649     if (es)
650       warn("%s: child died with error exit status %d", what, es);
651   } else if (WIFSIGNALED(status)) {
652     int sig= WTERMSIG(status);
653     const char *sigstr= strsignal(sig);
654     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
655     if (sigstr)
656       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
657     else
658       warn("%s: child died due to unknown fatal signal %d%s",
659            what, sig, coredump);
660   } else {
661     warn("%s: child died with unknown wait status %d", what,status);
662   }
663 }
664
665 static int xwaitpid(pid_t *pid, const char *what) {
666   int status;
667
668   int r= kill(*pid, SIGKILL);
669   if (r) sysdie("cannot kill %s child", what);
670
671   pid_t got= waitpid(*pid, &status, 0);
672   if (got==-1) sysdie("cannot reap %s child", what);
673   if (got==0) die("cannot reap %s child", what);
674
675   *pid= 0;
676
677   return status;
678 }
679
680 static void *zxmalloc(size_t sz) {
681   void *p= xmalloc(sz);
682   memset(p,0,sz);
683   return p;
684 }
685
686 static void xunlink(const char *path, const char *what) {
687   int r= unlink(path);
688   if (r) sysdie("can't unlink %s %s", path, what);
689 }
690
691 static time_t xtime(void) {
692   time_t now= time(0);
693   if (now==-1) sysdie("time(2) failed");
694   return now;
695 }
696
697 static void xsigaction(int signo, const struct sigaction *sa) {
698   int r= sigaction(signo,sa,0);
699   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
700 }
701
702 static void xsigsetdefault(int signo) {
703   struct sigaction sa;
704   memset(&sa,0,sizeof(sa));
705   sa.sa_handler= SIG_DFL;
706   xsigaction(signo,&sa);
707 }
708
709 static void xgettimeofday(struct timeval *tv_r) {
710   int r= gettimeofday(tv_r,0);
711   if (r) sysdie("gettimeofday(2) failed");
712 }
713
714 static void xsetnonblock(int fd, int nonblocking) {
715   int errnoval= oop_fd_nonblock(fd, nonblocking);
716   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
717 }
718
719 static void check_isreg(const struct stat *stab, const char *path,
720                         const char *what) {
721   if (!S_ISREG(stab->st_mode))
722     die("%s %s not a plain file (mode 0%lo)",
723         what, path, (unsigned long)stab->st_mode);
724 }
725
726 static void xfstat(int fd, struct stat *stab_r, const char *what) {
727   int r= fstat(fd, stab_r);
728   if (r) sysdie("could not fstat %s", what);
729 }
730
731 static void xfstat_isreg(int fd, struct stat *stab_r,
732                          const char *path, const char *what) {
733   xfstat(fd, stab_r, what);
734   check_isreg(stab_r, path, what);
735 }
736
737 static void xlstat_isreg(const char *path, struct stat *stab,
738                          int *enoent_r /* 0 means ENOENT is fatal */,
739                          const char *what) {
740   int r= lstat(path, stab);
741   if (r) {
742     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
743     sysdie("could not lstat %s %s", what, path);
744   }
745   if (enoent_r) *enoent_r= 0;
746   check_isreg(stab, path, what);
747 }
748
749 static int samefile(const struct stat *a, const struct stat *b) {
750   assert(S_ISREG(a->st_mode));
751   assert(S_ISREG(b->st_mode));
752   return (a->st_ino == b->st_ino &&
753           a->st_dev == b->st_dev);
754 }
755
756 static char *sanitise(const char *input, int len) {
757   static char sanibuf[100]; /* returns pointer to this buffer! */
758
759   const char *p= input;
760   const char *endp= len>=0 ? input+len : 0;
761   char *q= sanibuf;
762   *q++= '`';
763   for (;;) {
764     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
765     int c= (!endp || p<endp) ? *p++ : 0;
766     if (!c) { *q++= '\''; *q=0; break; }
767     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
768     sprintf(q,"\\x%02x",c);
769     q += 4;
770   }
771   return sanibuf;
772 }
773
774 static int isewouldblock(int errnoval) {
775   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
776 }
777
778 /*========== command and control connections ==========*/
779
780 static int control_master;
781
782 typedef struct ControlConn ControlConn;
783 struct ControlConn {
784   void (*destroy)(ControlConn*);
785   int fd;
786   oop_read *rd;
787   FILE *out;
788   union {
789     struct sockaddr sa;
790     struct sockaddr_un un;
791   } sa;
792   socklen_t salen;
793 };
794
795 static const oop_rd_style control_rd_style= {
796   OOP_RD_DELIM_STRIP, '\n',
797   OOP_RD_NUL_FORBID,
798   OOP_RD_SHORTREC_FORBID
799 };
800
801 static void control_destroy(ControlConn *cc) {
802   cc->destroy(cc);
803 }
804
805 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
806   if (ferror(cc->out) | fflush(cc->out)) {
807     info("CTRL%d write error %s", cc->fd, strerror(errno));
808     control_destroy(cc);
809   }
810 }
811
812 static void control_prompt(ControlConn *cc /* may destroy*/) {
813   fprintf(cc->out, "%s| ", sitename);
814   control_checkouterr(cc);
815 }
816
817 struct ControlCommand {
818   const char *cmd;
819   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
820             const char *arg, size_t argsz);
821   void *xdata;
822   int xval;
823 };
824
825 static const ControlCommand control_commands[];
826
827 #define CCMD(wh)                                                        \
828   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
829                         const char *arg, size_t argsz)
830
831 CCMD(help) {
832   fputs("commands:\n", cc->out);
833   const ControlCommand *ccmd;
834   for (ccmd=control_commands; ccmd->cmd; ccmd++)
835     fprintf(cc->out, " %s\n", ccmd->cmd);
836   fputs("NB: permissible arguments are not shown above."
837         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
838 }
839
840 CCMD(flush) {
841   int ok= trigger_flush_ok();
842   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
843 }
844
845 CCMD(stop) {
846   preterminate();
847   notice("terminating (CTRL%d)",cc->fd);
848   raise_default(SIGTERM);
849   abort();
850 }
851
852 CCMD(dump);
853
854 /* messing with our head: */
855 CCMD(period) { period(); }
856 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
857 CCMD(setint) { *(int*)c->xdata= c->xval; }
858 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
859
860 static const ControlCommand control_commands[]= {
861   { "h",             ccmd_help      },
862   { "flush",         ccmd_flush     },
863   { "stop",          ccmd_stop      },
864   { "dump q",        ccmd_dump, 0,0 },
865   { "dump a",        ccmd_dump, 0,1 },
866
867   { "p",             ccmd_period    },
868
869 #define POKES(cmd,func)                                                 \
870   { cmd "flush",     func,           &until_flush,             1 },     \
871   { cmd "conn",      func,           &until_connect,           0 },     \
872   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
873 POKES("next ", ccmd_setint)
874 POKES("prod ", ccmd_setint_period)
875
876   { "pretend flush", ccmd_setintarg, &simulate_flush             },
877   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
878   { 0 }
879 };
880
881 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
882                            const char *errmsg, int errnoval,
883                            const char *data, size_t recsz, void *cc_v) {
884   ControlConn *cc= cc_v;
885
886   if (!data) {
887     info("CTRL%d closed", cc->fd);
888     cc->destroy(cc);
889     return OOP_CONTINUE;
890   }
891
892   if (recsz == 0) goto prompt;
893
894   const ControlCommand *ccmd;
895   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
896     int l= strlen(ccmd->cmd);
897     if (recsz < l) continue;
898     if (recsz > l && data[l] != ' ') continue;
899     if (memcmp(data, ccmd->cmd, l)) continue;
900
901     int argl= (int)recsz - (l+1); 
902     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
903     goto prompt;
904   }
905
906   fputs("unknown command; h for help\n", cc->out);
907
908  prompt:
909   control_prompt(cc);
910   return OOP_CONTINUE;
911 }
912
913 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
914                             const char *errmsg, int errnoval,
915                             const char *data, size_t recsz, void *cc_v) {
916   ControlConn *cc= cc_v;
917   
918   info("CTRL%d read error %s", cc->fd, errmsg);
919   cc->destroy(cc);
920   return OOP_CONTINUE;
921 }
922
923 static int control_conn_startup(ControlConn *cc /* may destroy*/,
924                                 const char *how) {
925   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
926   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
927
928   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
929                       control_rd_ok, cc,
930                       control_rd_err, cc);
931   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
932
933   info("CTRL%d %s ready", cc->fd, how);
934   control_prompt(cc);
935   return 0;
936 }
937
938 static void control_stdio_destroy(ControlConn *cc) {
939   if (cc->rd) {
940     oop_rd_cancel(cc->rd);
941     errno= oop_rd_delete_tidy(cc->rd);
942     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
943   }
944   free(cc);
945 }
946
947 static void control_stdio(void) {
948   NEW_DECL(ControlConn *,cc);
949   cc->destroy= control_stdio_destroy;
950
951   cc->fd= 0;
952   cc->out= stdout;
953   int r= control_conn_startup(cc,"stdio");
954   if (r) cc->destroy(cc);
955 }
956
957 static void control_accepted_destroy(ControlConn *cc) {
958   if (cc->rd) {
959     oop_rd_cancel(cc->rd);
960     oop_rd_delete_kill(cc->rd);
961   }
962   if (cc->out) { fclose(cc->out); cc->fd=0; }
963   close_perhaps(&cc->fd);
964   free(cc);
965 }
966
967 static void *control_master_readable(oop_source *lp, int master,
968                                      oop_event ev, void *u) {
969   NEW_DECL(ControlConn *,cc);
970   cc->destroy= control_accepted_destroy;
971
972   cc->salen= sizeof(cc->sa);
973   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
974   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
975
976   cc->out= fdopen(cc->fd, "w");
977   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
978
979   int r= control_conn_startup(cc, "accepted");
980   if (r) goto x;
981
982   return OOP_CONTINUE;
983
984  x:
985   cc->destroy(cc);
986   return OOP_CONTINUE;
987 }
988
989 #define NOCONTROL(...) do{                                              \
990     syswarn("no control socket, because failed to " __VA_ARGS__);       \
991     goto nocontrol;                                                     \
992   }while(0)
993
994 static void control_init(void) {
995   char *real=0;
996   
997   union {
998     struct sockaddr sa;
999     struct sockaddr_un un;
1000   } sa;
1001
1002   memset(&sa,0,sizeof(sa));
1003   int maxlen= sizeof(sa.un.sun_path);
1004
1005   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1006   if (reallen<0) {
1007     if (errno != ENOENT)
1008       NOCONTROL("readlink control socket symlink path %s", path_control);
1009   }
1010   if (reallen >= maxlen) {
1011     debug("control socket symlink path too long (r=%d)",reallen);
1012     xunlink(path_control, "old (overlong) control socket symlink");
1013     reallen= -1;
1014   }
1015   
1016   if (reallen<0) {
1017     struct stat stab;
1018     int r= lstat(realsockdir,&stab);
1019     if (r) {
1020       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1021
1022       r= mkdir(realsockdir, 0700);
1023       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1024
1025     } else {
1026       uid_t self= geteuid();
1027       if (!S_ISDIR(stab.st_mode) ||
1028           stab.st_uid != self ||
1029           stab.st_mode & 0007) {
1030         warn("no control socket, because real socket directory"
1031              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1032              !!S_ISDIR(stab.st_mode),
1033              (unsigned long)stab.st_uid, (unsigned long)self,
1034              (unsigned long)stab.st_mode & 0777UL);
1035         goto nocontrol;
1036       }
1037     }
1038
1039     real= xasprintf("%s/s%lx.%lx", realsockdir,
1040                     (unsigned long)xtime(), (unsigned long)self_pid);
1041     int reallen= strlen(real);
1042
1043     if (reallen >= maxlen) {
1044       warn("no control socket, because tmpnam gave overly-long path"
1045            " %s", real);
1046       goto nocontrol;
1047     }
1048     r= symlink(real, path_control);
1049     if (r) NOCONTROL("make control socket path %s a symlink to real"
1050                      " socket path %s", path_control, real);
1051     memcpy(sa.un.sun_path, real, reallen);
1052   }
1053
1054   int r= unlink(sa.un.sun_path);
1055   if (r && errno!=ENOENT)
1056     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1057
1058   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1059   if (control_master<0) NOCONTROL("create new control socket");
1060
1061   sa.un.sun_family= AF_UNIX;
1062   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1063   r= bind(control_master, &sa.sa, sl);
1064   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1065
1066   r= listen(control_master, 5);
1067   if (r) NOCONTROL("listen");
1068
1069   xsetnonblock(control_master, 1);
1070
1071   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1072   info("control socket ok, real path %s", sa.un.sun_path);
1073
1074   return;
1075
1076  nocontrol:
1077   free(real);
1078   xclose_perhaps(&control_master, "control master",0);
1079   return;
1080 }
1081
1082 /*========== management of connections ==========*/
1083
1084 static void conn_closefd(Conn *conn, const char *msgprefix) {
1085   int r= close_perhaps(&conn->fd);
1086   if (r) info("C%d %serror closing socket: %s",
1087               conn->fd, msgprefix, strerror(errno));
1088 }
1089
1090 static void conn_dispose(Conn *conn) {
1091   if (!conn) return;
1092   if (conn->rd) {
1093     oop_rd_cancel(conn->rd);
1094     oop_rd_delete_kill(conn->rd);
1095     conn->rd= 0;
1096   }
1097   if (conn->fd) {
1098     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1099     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1100   }
1101   conn_closefd(conn,"");
1102   free(conn);
1103   until_connect= reconnect_delay_periods;
1104 }
1105
1106 static void *conn_exception(oop_source *lp, int fd,
1107                             oop_event ev, void *conn_v) {
1108   Conn *conn= conn_v;
1109   unsigned char ch;
1110   assert(fd == conn->fd);
1111   assert(ev == OOP_EXCEPTION);
1112   int r= read(conn->fd, &ch, 1);
1113   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1114   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1115                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1116   return OOP_CONTINUE;
1117 }  
1118
1119 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1120   int requeue[art_MaxState];
1121   memset(requeue,0,sizeof(requeue));
1122
1123   Article *art;
1124   
1125   while ((art= LIST_REMHEAD(conn->priority)))
1126     LIST_ADDTAIL(art->ipf->queue, art);
1127
1128   while ((art= LIST_REMHEAD(conn->waiting)))
1129     LIST_ADDTAIL(art->ipf->queue, art);
1130
1131   while ((art= LIST_REMHEAD(conn->sent))) {
1132     requeue[art->state]++;
1133     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1134     LIST_ADDTAIL(art->ipf->queue,art);
1135   }
1136
1137   int i;
1138   XmitDetails *d;
1139   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1140     xmit_free(d);
1141
1142   char *m= xvasprintf(fmt,al);
1143   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1144        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1145   free(m);
1146
1147   LIST_REMOVE(conns,conn);
1148   conn_dispose(conn);
1149   check_assign_articles();
1150 }
1151
1152 static void connfail(Conn *conn, const char *fmt, ...) {
1153   va_list al;
1154   va_start(al,fmt);
1155   vconnfail(conn,fmt,al);
1156   va_end(al);
1157 }
1158
1159 static void check_idle_conns(void) {
1160   Conn *conn;
1161   FOR_CONN(conn)
1162     conn->since_activity++;
1163  search_again:
1164   FOR_CONN(conn) {
1165     if (conn->since_activity <= need_activity_periods) continue;
1166
1167     /* We need to shut this down */
1168     if (conn->quitting)
1169       connfail(conn,"timed out waiting for response to QUIT");
1170     else if (conn->sent.count)
1171       connfail(conn,"timed out waiting for responses");
1172     else if (conn->waiting.count || conn->priority.count)
1173       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1174     else if (conn->xmitu)
1175       connfail(conn,"peer has been sending responses"
1176                " before receiving our commands!");
1177     else {
1178       static const char quitcmd[]= "QUIT\r\n";
1179       int todo= sizeof(quitcmd)-1;
1180       const char *p= quitcmd;
1181       for (;;) {
1182         int r= write(conn->fd, p, todo);
1183         if (r<0) {
1184           if (isewouldblock(errno))
1185             connfail(conn, "blocked writing QUIT to idle connection");
1186           else
1187             connfail(conn, "failed to write QUIT to idle connection: %s",
1188                      strerror(errno));
1189           break;
1190         }
1191         assert(r<=todo);
1192         todo -= r;
1193         if (!todo) {
1194           conn->quitting= 1;
1195           conn->since_activity= 0;
1196           debug("C%d is idle, quitting", conn->fd);
1197           break;
1198         }
1199       }
1200     }
1201     goto search_again;
1202   }
1203 }  
1204
1205 /*---------- making new connections ----------*/
1206
1207 static pid_t connecting_child;
1208 static int connecting_fdpass_sock;
1209
1210 static void connect_attempt_discard(void) {
1211   if (connecting_child) {
1212     int status= xwaitpid(&connecting_child, "connect");
1213     if (!(WIFEXITED(status) ||
1214           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1215       report_child_status("connect", status);
1216   }
1217   if (connecting_fdpass_sock) {
1218     cancel_fd_read_except(connecting_fdpass_sock);
1219     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1220   }
1221 }
1222
1223 #define PREP_DECL_MSG_CMSG(msg)                 \
1224   char msgbyte= 0;                              \
1225   struct iovec msgiov;                          \
1226   msgiov.iov_base= &msgbyte;                    \
1227   msgiov.iov_len= 1;                            \
1228   struct msghdr msg;                            \
1229   memset(&msg,0,sizeof(msg));                   \
1230   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1231   msg.msg_iov= &msgiov;                         \
1232   msg.msg_iovlen= 1;                            \
1233   msg.msg_control= msg##cbuf;                   \
1234   msg.msg_controllen= sizeof(msg##cbuf);
1235
1236 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1237   Conn *conn= 0;
1238
1239   assert(fd == connecting_fdpass_sock);
1240
1241   PREP_DECL_MSG_CMSG(msg);
1242   
1243   ssize_t rs= recvmsg(fd, &msg, 0);
1244   if (rs<0) {
1245     if (isewouldblock(errno)) return OOP_CONTINUE;
1246     syswarn("failed to read socket from connecting child");
1247     goto x;
1248   }
1249
1250   NEW(conn);
1251   LIST_INIT(conn->waiting);
1252   LIST_INIT(conn->priority);
1253   LIST_INIT(conn->sent);
1254
1255   struct cmsghdr *h= 0;
1256   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1257   if (!h) {
1258     int status= xwaitpid(&connecting_child, "connect child (broken)");
1259
1260     if (WIFEXITED(status)) {
1261       if (WEXITSTATUS(status) != 0 &&
1262           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1263           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1264         /* child already reported the problem */;
1265       else {
1266         if (e == OOP_EXCEPTION)
1267           warn("connect: connection child exited code %d but"
1268                " unexpected exception on fdpass socket",
1269                WEXITSTATUS(status));
1270         else
1271           warn("connect: connection child exited code %d but"
1272                " no cmsg (rs=%d)",
1273                WEXITSTATUS(status), (int)rs);
1274       }
1275     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1276       warn("connect: connection attempt timed out");
1277     } else {
1278       report_child_status("connect", status);
1279     }
1280     goto x;
1281   }
1282
1283 #define CHK(field, val)                                                  \
1284   if (h->cmsg_##field != val) {                                          \
1285     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1286         h->cmsg_##field, val);                                           \
1287     goto x;                                                              \
1288   }
1289   CHK(level, SOL_SOCKET);
1290   CHK(type,  SCM_RIGHTS);
1291   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1292 #undef CHK
1293
1294   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1295
1296   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1297
1298   int status;
1299   pid_t got= waitpid(connecting_child, &status, 0);
1300   if (got==-1) sysdie("connect: real wait for child");
1301   assert(got == connecting_child);
1302   connecting_child= 0;
1303
1304   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1305   int es= WEXITSTATUS(status);
1306   switch (es) {
1307   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1308   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1309   default:
1310     fatal("connect: child gave unexpected exit status %d", es);
1311   }
1312
1313   /* Phew! */
1314   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1315
1316   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1317   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1318   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1319   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1320                      &peer_rd_ok, conn,
1321                      &peer_rd_err, conn);
1322   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1323
1324   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1325   LIST_ADDHEAD(conns, conn);
1326
1327   connect_attempt_discard();
1328   check_assign_articles();
1329   return OOP_CONTINUE;
1330
1331  x:
1332   conn_dispose(conn);
1333   connect_attempt_discard();
1334   return OOP_CONTINUE;
1335 }
1336
1337 static int allow_connect_start(void) {
1338   return conns.count < max_connections
1339     && !connecting_child
1340     && !until_connect;
1341 }
1342
1343 static void connect_start(void) {
1344   assert(!connecting_child);
1345   assert(!connecting_fdpass_sock);
1346
1347   info("starting connection attempt");
1348
1349   int socks[2];
1350   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1351   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1352
1353   connecting_child= xfork("connection");
1354
1355   if (!connecting_child) {
1356     FILE *cn_from, *cn_to;
1357     char buf[NNTP_STRLEN+100];
1358     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1359
1360     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1361
1362     alarm(connection_setup_timeout);
1363     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1364       int l= strlen(buf);
1365       int stripped=0;
1366       while (l>0) {
1367         unsigned char c= buf[l-1];
1368         if (!isspace(c)) break;
1369         if (c=='\n' || c=='\r') stripped=1;
1370         --l;
1371       }
1372       if (!buf[0]) {
1373         sysfatal("connect: connection attempt failed");
1374       } else {
1375         buf[l]= 0;
1376         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1377               sanitise(buf,-1));
1378       }
1379     }
1380     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1381       sysfatal("connect: authentication failed");
1382     if (try_stream) {
1383       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1384           fflush(cn_to))
1385         sysfatal("connect: could not send MODE STREAM");
1386       buf[sizeof(buf)-1]= 0;
1387       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1388         if (ferror(cn_from))
1389           sysfatal("connect: could not read response to MODE STREAM");
1390         else
1391           fatal("connect: connection close in response to MODE STREAM");
1392       }
1393       int l= strlen(buf);
1394       assert(l>=1);
1395       if (buf[l-1]!='\n')
1396         fatal("connect: response to MODE STREAM is too long: %.100s...",
1397               sanitise(buf,-1));
1398       l--;  if (l>0 && buf[l-1]=='\r') l--;
1399       buf[l]= 0;
1400       char *ep;
1401       int rcode= strtoul(buf,&ep,10);
1402       if (ep != &buf[3])
1403         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1404
1405       switch (rcode) {
1406       case 203:
1407         exitstatus= CONNCHILD_ESTATUS_STREAM;
1408         break;
1409       case 480:
1410       case 500:
1411         break;
1412       default:
1413         warn("connect: unexpected response to MODE STREAM: %.50s",
1414              sanitise(buf,-1));
1415         exitstatus= 2;
1416         break;
1417       }
1418     }
1419     int fd= fileno(cn_from);
1420
1421     PREP_DECL_MSG_CMSG(msg);
1422     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1423     cmsg->cmsg_level= SOL_SOCKET;
1424     cmsg->cmsg_type=  SCM_RIGHTS;
1425     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1426     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1427
1428     msg.msg_controllen= cmsg->cmsg_len;
1429     r= sendmsg(socks[1], &msg, 0);
1430     if (r<0) sysdie("sendmsg failed for new connection");
1431     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1432
1433     _exit(exitstatus);
1434   }
1435
1436   xclose(socks[1], "connecting fdpass child's socket",0);
1437   connecting_fdpass_sock= socks[0];
1438   xsetnonblock(connecting_fdpass_sock, 1);
1439   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1440 }
1441
1442 /*---------- assigning articles to conns, and transmitting ----------*/
1443
1444 static Article *dequeue_from(int peek, InputFile *ipf) {
1445   if (!ipf) return 0;
1446   if (peek) return LIST_HEAD(ipf->queue);
1447   else return LIST_REMHEAD(ipf->queue);
1448 }
1449
1450 static Article *dequeue(int peek) {
1451   Article *art;
1452   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1453   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1454   art= dequeue_from(peek, main_input_file);      if (art) return art;
1455   return 0;
1456 }
1457
1458 static void check_assign_articles(void) {
1459   for (;;) {
1460     if (!dequeue(1))
1461       break;
1462
1463     Conn *walk, *use=0;
1464     int spare=0, inqueue=0;
1465
1466     /* Find a connection to offer this article.  We prefer a busy
1467      * connection to an idle one, provided it's not full.  We take the
1468      * first (oldest) and since that's stable, it will mean we fill up
1469      * connections in order.  That way if we have too many
1470      * connections, the spare ones will go away eventually.
1471      */
1472     FOR_CONN(walk) {
1473       if (walk->quitting) continue;
1474       inqueue= walk->sent.count + walk->priority.count
1475              + walk->waiting.count;
1476       spare= walk->max_queue - inqueue;
1477       assert(inqueue <= max_queue_per_conn);
1478       assert(spare >= 0);
1479       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1480       else if (spare>0) /*working*/ { use= walk; break; }
1481     }
1482     if (use) {
1483       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1484       while (spare>0) {
1485         Article *art= dequeue(0);
1486         if (!art) break;
1487         LIST_ADDTAIL(use->waiting, art);
1488         spare--;
1489       }
1490       conn_maybe_write(use);
1491     } else if (allow_connect_start()) {
1492       until_connect= reconnect_delay_periods;
1493       connect_start();
1494       break;
1495     } else {
1496       break;
1497     }
1498   }
1499 }
1500
1501 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1502   conn_maybe_write(u);
1503   return OOP_CONTINUE;
1504 }
1505
1506 static void conn_maybe_write(Conn *conn)  {
1507   for (;;) {
1508     conn_make_some_xmits(conn);
1509     if (!conn->xmitu) {
1510       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1511       return;
1512     }
1513
1514     void *rp= conn_write_some_xmits(conn);
1515     if (rp==OOP_CONTINUE) {
1516       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1517       return;
1518     } else if (rp==OOP_HALT) {
1519       return;
1520     } else if (!rp) {
1521       /* transmitted everything */
1522     } else {
1523       abort();
1524     }
1525   }
1526 }
1527
1528 /*========== article transmission ==========*/
1529
1530 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1531                   XmitKind kind) { /* caller must then fill in details */
1532   struct iovec *v= &conn->xmit[conn->xmitu];
1533   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1534   v->iov_base= (char*)data;
1535   v->iov_len= len;
1536   d->kind= kind;
1537   return d;
1538 }
1539
1540 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1541   xmit_core(conn,data,len, xk_Const);
1542 }
1543 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1544
1545 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1546   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1547   d->info.sm_art= ah;
1548 }
1549
1550 static void xmit_free(XmitDetails *d) {
1551   switch (d->kind) {
1552   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1553   case xk_Const:                                  break;
1554   default: abort();
1555   }
1556 }
1557
1558 static void *conn_write_some_xmits(Conn *conn) {
1559   /* return values:
1560    *      0:            nothing more to write, no need to call us again
1561    *      OOP_CONTINUE: more to write but fd not writeable
1562    *      OOP_HALT:     disaster, have destroyed conn
1563    */
1564   for (;;) {
1565     int count= conn->xmitu;
1566     if (!count) return 0;
1567
1568     if (count > IOV_MAX) count= IOV_MAX;
1569     ssize_t rs= writev(conn->fd, conn->xmit, count);
1570     if (rs < 0) {
1571       if (isewouldblock(errno)) return OOP_CONTINUE;
1572       connfail(conn, "write failed: %s", strerror(errno));
1573       return OOP_HALT;
1574     }
1575     assert(rs > 0);
1576
1577     int done;
1578     for (done=0; rs && done<conn->xmitu; done++) {
1579       struct iovec *vp= &conn->xmit[done];
1580       XmitDetails *dp= &conn->xmitd[done];
1581       if (rs > vp->iov_len) {
1582         rs -= vp->iov_len;
1583         xmit_free(dp);
1584       } else {
1585         vp->iov_base= (char*)vp->iov_base + rs;
1586         vp->iov_len -= rs;
1587       }
1588     }
1589     int newu= conn->xmitu - done;
1590     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1591     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1592     conn->xmitu= newu;
1593   }
1594 }
1595
1596 static void conn_make_some_xmits(Conn *conn) {
1597   for (;;) {
1598     if (conn->xmitu+5 > CONNIOVS)
1599       break;
1600
1601     Article *art= LIST_REMHEAD(conn->priority);
1602     if (!art) art= LIST_REMHEAD(conn->waiting);
1603     if (!art) break;
1604
1605     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1606       /* actually send it */
1607
1608       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1609
1610       art->state=
1611         art->state == art_Unchecked ? art_Unsolicited :
1612         art->state == art_Wanted    ? art_Wanted      :
1613         (abort(),-1);
1614
1615       if (!artdata) art->missing= 1;
1616       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1617
1618       if (conn->stream) {
1619         if (artdata) {
1620           XMIT_LITERAL("TAKETHIS ");
1621           xmit_noalloc(conn, art->messageid, art->midlen);
1622           XMIT_LITERAL("\r\n");
1623           xmit_artbody(conn, artdata);
1624         } else {
1625           article_done(conn, art, -1);
1626           continue;
1627         }
1628       } else {
1629         /* we got 235 from IHAVE */
1630         if (artdata) {
1631           xmit_artbody(conn, artdata);
1632         } else {
1633           XMIT_LITERAL(".\r\n");
1634         }
1635       }
1636
1637       LIST_ADDTAIL(conn->sent, art);
1638
1639     } else {
1640       /* check it */
1641
1642       if (conn->stream)
1643         XMIT_LITERAL("CHECK ");
1644       else
1645         XMIT_LITERAL("IHAVE ");
1646       xmit_noalloc(conn, art->messageid, art->midlen);
1647       XMIT_LITERAL("\r\n");
1648
1649       assert(art->state == art_Unchecked);
1650       art->ipf->counts[art->state][RC_sent]++;
1651       LIST_ADDTAIL(conn->sent, art);
1652     }
1653   }
1654 }
1655
1656
1657 /*========== handling responses from peer ==========*/
1658
1659 static const oop_rd_style peer_rd_style= {
1660   OOP_RD_DELIM_STRIP, '\n',
1661   OOP_RD_NUL_FORBID,
1662   OOP_RD_SHORTREC_FORBID
1663 };
1664
1665 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1666                          const char *errmsg, int errnoval,
1667                          const char *data, size_t recsz, void *conn_v) {
1668   Conn *conn= conn_v;
1669   connfail(conn, "error receiving from peer: %s", errmsg);
1670   return OOP_CONTINUE;
1671 }
1672
1673 static Article *article_reply_check(Conn *conn, const char *response,
1674                                     int code_indicates_streaming,
1675                                     int must_have_sent
1676                                         /* 1:yes, -1:no, 0:dontcare */,
1677                                     const char *sanitised_response) {
1678   Article *art= LIST_HEAD(conn->sent);
1679
1680   if (!art) {
1681     connfail(conn,
1682              "peer gave unexpected response when no commands outstanding: %s",
1683              sanitised_response);
1684     return 0;
1685   }
1686
1687   if (code_indicates_streaming) {
1688     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1689     if (!conn->stream) {
1690       connfail(conn, "peer gave streaming response code "
1691                " to IHAVE or subsequent body: %s", sanitised_response);
1692       return 0;
1693     }
1694     const char *got_mid= response+4;
1695     int got_midlen= strcspn(got_mid, " \n\r");
1696     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1697       connfail(conn, "peer gave streaming response with syntactically invalid"
1698                " messageid: %s", sanitised_response);
1699       return 0;
1700     }
1701     if (got_midlen != art->midlen ||
1702         memcmp(got_mid, art->messageid, got_midlen)) {
1703       connfail(conn, "peer gave streaming response code to wrong article -"
1704                " probable synchronisation problem; we offered: %s;"
1705                " peer said: %s",
1706                art->messageid, sanitised_response);
1707       return 0;
1708     }
1709   } else {
1710     if (conn->stream) {
1711       connfail(conn, "peer gave non-streaming response code to"
1712                " CHECK/TAKETHIS: %s", sanitised_response);
1713       return 0;
1714     }
1715   }
1716
1717   if (must_have_sent>0 && art->state < art_Wanted) {
1718     connfail(conn, "peer says article accepted but"
1719              " we had not sent the body: %s", sanitised_response);
1720     return 0;
1721   }
1722   if (must_have_sent<0 && art->state >= art_Wanted) {
1723     connfail(conn, "peer says please sent the article but we just did: %s",
1724              sanitised_response);
1725     return 0;
1726   }
1727
1728   Article *art_again= LIST_REMHEAD(conn->sent);
1729   assert(art_again == art);
1730   return art;
1731 }
1732
1733 static void update_nocheck(int accepted) {
1734   accept_proportion *= nocheck_decay;
1735   accept_proportion += accepted * (1.0 - nocheck_decay);
1736   int new_nocheck= accept_proportion >= nocheck_thresh;
1737   if (new_nocheck && !nocheck_reported) {
1738     notice("entering nocheck mode for the first time");
1739     nocheck_reported= 1;
1740   } else if (new_nocheck != nocheck) {
1741     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1742   }
1743   nocheck= new_nocheck;
1744 }
1745
1746 static void article_done(Conn *conn, Article *art, int whichcount) {
1747   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1748
1749   if (whichcount == RC_accepted) update_nocheck(1);
1750   else if (whichcount == RC_unwanted) update_nocheck(0);
1751
1752   InputFile *ipf= art->ipf;
1753
1754   while (art->blanklen) {
1755     static const char spaces[]=
1756       "                                                                "
1757       "                                                                "
1758       "                                                                "
1759       "                                                                "
1760       "                                                                "
1761       "                                                                "
1762       "                                                                "
1763       "                                                                "
1764       "                                                                ";
1765     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1766     int r= pwrite(ipf->fd, spaces, w, art->offset);
1767     if (r==-1) {
1768       if (errno==EINTR) continue;
1769       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1770              art->messageid, art->blanklen,
1771              (unsigned long)art->offset, ipf->path);
1772     }
1773     assert(r>=0 && r<=w);
1774     art->blanklen -= w;
1775     art->offset += w;
1776   }
1777
1778   ipf->inprogress--;
1779   assert(ipf->inprogress >= 0);
1780   free(art);
1781
1782   if (!ipf->inprogress && ipf != main_input_file)
1783     queue_check_input_done();
1784 }
1785
1786 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1787                         const char *errmsg, int errnoval,
1788                         const char *data, size_t recsz, void *conn_v) {
1789   Conn *conn= conn_v;
1790
1791   if (ev == OOP_RD_EOF) {
1792     connfail(conn, "unexpected EOF from peer");
1793     return OOP_CONTINUE;
1794   }
1795   assert(ev == OOP_RD_OK);
1796
1797   char *sani= sanitise(data,-1);
1798
1799   char *ep;
1800   unsigned long code= strtoul(data, &ep, 10);
1801   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1802     connfail(conn, "badly formatted response from peer: %s", sani);
1803     return OOP_CONTINUE;
1804   }
1805
1806   int conn_busy=
1807     conn->waiting.count ||
1808     conn->priority.count ||
1809     conn->sent.count ||
1810     conn->xmitu;
1811
1812   if (conn->quitting) {
1813     if (code!=205 && code!=503) {
1814       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1815     } else {
1816       notice("C%d idle connection closed by us", conn->fd);
1817       assert(!conn_busy);
1818       LIST_REMOVE(conns,conn);
1819       conn_dispose(conn);
1820     }
1821     return OOP_CONTINUE;
1822   }
1823
1824   conn->since_activity= 0;
1825   Article *art;
1826
1827 #define GET_ARTICLE(musthavesent) do{                                         \
1828     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1829     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1830   }while(0) 
1831
1832 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1833     code_streaming= (streaming);                                \
1834     GET_ARTICLE(musthavesent);                                  \
1835     article_done(conn, art, RC_##how);                          \
1836     goto dealtwith;                                             \
1837   }while(0)
1838
1839 #define PEERBADMSG(m) do {                                      \
1840     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1841   }while(0)
1842
1843   int code_streaming= 0;
1844
1845   switch (code) {
1846
1847   case 400: PEERBADMSG("peer stopped accepting articles");
1848   default:  PEERBADMSG("peer sent unexpected message");
1849
1850   case 503:
1851     if (conn_busy) PEERBADMSG("peer timed us out");
1852     notice("C%d idle connection closed by peer", conn->fd);
1853     LIST_REMOVE(conns,conn);
1854     conn_dispose(conn);
1855     return OOP_CONTINUE;
1856
1857   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1858   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1859
1860   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1861   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1862
1863   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1864   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1865
1866   case 238: /* CHECK says send it */
1867     code_streaming= 1;
1868   case 335: /* IHAVE says send it */
1869     GET_ARTICLE(-1);
1870     assert(art->state == art_Unchecked);
1871     art->ipf->counts[art->state][RC_accepted]++;
1872     art->state= art_Wanted;
1873     LIST_ADDTAIL(conn->priority, art);
1874     break;
1875
1876   case 431: /* CHECK or TAKETHIS says try later */
1877     code_streaming= 1;
1878   case 436: /* IHAVE says try later */
1879     GET_ARTICLE(0);
1880     open_defer();
1881     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1882         || fflush(defer))
1883       sysfatal("write to defer file %s",path_defer);
1884     article_done(conn, art, RC_deferred);
1885     break;
1886
1887   }
1888 dealtwith:
1889
1890   conn_maybe_write(conn);
1891   check_assign_articles();
1892   return OOP_CONTINUE;
1893 }
1894
1895
1896 /*========== monitoring of input files ==========*/
1897
1898 static void feedfile_eof(InputFile *ipf) {
1899   assert(ipf != main_input_file); /* promised by tailing_try_read */
1900   inputfile_reading_stop(ipf);
1901
1902   if (ipf == flushing_input_file) {
1903     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1904     if (main_input_file) inputfile_reading_start(main_input_file);
1905     statemc_check_flushing_done();
1906   } else if (ipf == backlog_input_file) {
1907     statemc_check_backlog_done();
1908   } else {
1909     abort(); /* supposed to wait rather than get EOF on main input file */
1910   }
1911 }
1912
1913 static InputFile *open_input_file(const char *path) {
1914   int fd= open(path, O_RDWR);
1915   if (fd<0) {
1916     if (errno==ENOENT) return 0;
1917     sysfatal("unable to open input file %s", path);
1918   }
1919   assert(fd>0);
1920
1921   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1922   memset(ipf,0,sizeof(*ipf));
1923
1924   ipf->fd= fd;
1925   strcpy(ipf->path, path);
1926   LIST_INIT(ipf->queue);
1927
1928   return ipf;
1929 }
1930
1931 static void close_input_file(InputFile *ipf) { /* does not free */
1932   assert(!ipf->readable_callback); /* must have had ->on_cancel */
1933   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1934   assert(!ipf->rd); /* must have had inputfile_reading_stop */
1935   assert(!ipf->inprogress); /* no dangling pointers pointing here */
1936   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1937 }
1938
1939
1940 /*---------- dealing with articles read in the input file ----------*/
1941
1942 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1943                                    const char *data, const char *how) {
1944   warn("corrupted file: %s, offset %lu: %s: in %s",
1945        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1946   ipf->readcount_err++;
1947   if (ipf->readcount_err > max_bad_data_initial +
1948       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1949     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
1950         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1951   return OOP_CONTINUE;
1952 }
1953
1954 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1955                                oop_rd_event ev, const char *errmsg,
1956                                int errnoval, const char *data, size_t recsz,
1957                                void *ipf_v) {
1958   InputFile *ipf= ipf_v;
1959   assert(ev == OOP_RD_SYSTEM);
1960   errno= errnoval;
1961   sysdie("error reading input file: %s, offset %lu",
1962          ipf->path, (unsigned long)ipf->offset);
1963 }
1964
1965 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1966                                   oop_rd_event ev, const char *errmsg,
1967                                   int errnoval, const char *data, size_t recsz,
1968                                   void *ipf_v) {
1969   InputFile *ipf= ipf_v;
1970   Article *art;
1971   char tokentextbuf[sizeof(TOKEN)*2+3];
1972
1973   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1974
1975   off_t old_offset= ipf->offset;
1976   ipf->offset += recsz + !!(ev == OOP_RD_OK);
1977
1978 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
1979
1980   if (ev==OOP_RD_PARTREC)
1981     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
1982     /* but process it anyway */
1983
1984   if (ipf->skippinglong) {
1985     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
1986     return OOP_CONTINUE;
1987   }
1988   if (ev==OOP_RD_LONG) {
1989     ipf->skippinglong= 1;
1990     X_BAD_DATA("overly long line");
1991   }
1992
1993   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
1994   if (!recsz) X_BAD_DATA("empty line");
1995
1996   if (data[0]==' ') {
1997     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
1998     ipf->readcount_blank++;
1999     return OOP_CONTINUE;
2000   }
2001
2002   char *space= strchr(data,' ');
2003   int tokenlen= space-data;
2004   int midlen= (int)recsz-tokenlen-1;
2005   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2006   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2007
2008   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2009   memcpy(tokentextbuf, data, tokenlen);
2010   tokentextbuf[tokenlen]= 0;
2011   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2012
2013   ipf->readcount_ok++;
2014
2015   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2016   memset(art,0,sizeof(*art));
2017   art->state= art_Unchecked;
2018   art->midlen= midlen;
2019   art->ipf= ipf;  ipf->inprogress++;
2020   art->token= TextToToken(tokentextbuf);
2021   art->offset= old_offset;
2022   art->blanklen= recsz;
2023   strcpy(art->messageid, space+1);
2024   LIST_ADDTAIL(ipf->queue, art);
2025
2026   if (sms==sm_NORMAL && ipf==main_input_file &&
2027       ipf->offset >= target_max_feedfile_size)
2028     statemc_start_flush("feed file size");
2029
2030   check_assign_articles();
2031   return OOP_CONTINUE;
2032 }
2033
2034 /*========== tailing input file ==========*/
2035
2036 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2037                                      void *user) {
2038   InputFile *ipf= user;
2039   return ipf->readable_callback(loop, &ipf->readable,
2040                                 ipf->readable_callback_user);
2041 }
2042
2043 static void tailing_on_cancel(struct oop_readable *rable) {
2044   InputFile *ipf= (void*)rable;
2045
2046   if (ipf->filemon) filemon_stop(ipf);
2047   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2048   ipf->readable_callback= 0;
2049 }
2050
2051 static void tailing_queue_readable(InputFile *ipf) {
2052   /* lifetime of ipf here is OK because destruction will cause
2053    * on_cancel which will cancel this callback */
2054   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2055 }
2056
2057 static int tailing_on_readable(struct oop_readable *rable,
2058                                 oop_readable_call *cb, void *user) {
2059   InputFile *ipf= (void*)rable;
2060
2061   tailing_on_cancel(rable);
2062   ipf->readable_callback= cb;
2063   ipf->readable_callback_user= user;
2064   filemon_start(ipf);
2065
2066   tailing_queue_readable(ipf);
2067   return 0;
2068 }
2069
2070 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2071                                 size_t length) {
2072   InputFile *ipf= (void*)rable;
2073   for (;;) {
2074     ssize_t r= read(ipf->fd, buffer, length);
2075     if (r==-1) {
2076       if (errno==EINTR) continue;
2077       return r;
2078     }
2079     if (!r) {
2080       if (ipf==main_input_file) {
2081         errno=EAGAIN;
2082         return -1;
2083       } else if (ipf==flushing_input_file) {
2084         assert(ipf->rd);
2085         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2086       } else if (ipf==backlog_input_file) {
2087         assert(ipf->rd);
2088       } else {
2089         abort();
2090       }
2091     }
2092     tailing_queue_readable(ipf);
2093     return r;
2094   }
2095 }
2096
2097 /*---------- filemon implemented with inotify ----------*/
2098
2099 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2100 #define HAVE_FILEMON
2101
2102 #include <sys/inotify.h>
2103
2104 static int filemon_inotify_fd;
2105 static int filemon_inotify_wdmax;
2106 static InputFile **filemon_inotify_wd2ipf;
2107
2108 struct Filemon_Perfile {
2109   int wd;
2110 };
2111
2112 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2113   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2114   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2115
2116   if (wd >= filemon_inotify_wdmax) {
2117     int newmax= wd+2;
2118     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2119                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2120     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2121            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2122     filemon_inotify_wdmax= newmax;
2123   }
2124
2125   assert(!filemon_inotify_wd2ipf[wd]);
2126   filemon_inotify_wd2ipf[wd]= ipf;
2127
2128   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2129         ipf, wd, filemon_inotify_wdmax);
2130
2131   pf->wd= wd;
2132 }
2133
2134 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2135   int wd= pf->wd;
2136   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2137   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2138   if (r) sysdie("inotify_rm_watch");
2139   filemon_inotify_wd2ipf[wd]= 0;
2140 }
2141
2142 static void *filemon_inotify_readable(oop_source *lp, int fd,
2143                                       oop_event e, void *u) {
2144   struct inotify_event iev;
2145   for (;;) {
2146     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2147     if (r==-1) {
2148       if (isewouldblock(errno)) break;
2149       sysdie("read from inotify master");
2150     } else if (r==sizeof(iev)) {
2151       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2152     } else {
2153       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2154     }
2155     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2156     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2157     filemon_callback(ipf);
2158   }
2159   return OOP_CONTINUE;
2160 }
2161
2162 static int filemon_method_init(void) {
2163   filemon_inotify_fd= inotify_init();
2164   if (filemon_inotify_fd<0) {
2165     syswarn("filemon/inotify: inotify_init failed");
2166     return 0;
2167   }
2168   xsetnonblock(filemon_inotify_fd, 1);
2169   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2170
2171   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2172   return 1;
2173 }
2174
2175 static void filemon_method_dump_info(FILE *f) {
2176   int i;
2177   fprintf(f,"inotify");
2178   DUMPV("%d",,filemon_inotify_fd);
2179   DUMPV("%d",,filemon_inotify_wdmax);
2180   for (i=0; i<filemon_inotify_wdmax; i++)
2181     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2182 }
2183
2184 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2185
2186 /*---------- filemon dummy implementation ----------*/
2187
2188 #if !defined(HAVE_FILEMON)
2189
2190 struct Filemon_Perfile { int dummy; };
2191
2192 static int filemon_method_init(void) {
2193   warn("filemon/dummy: no filemon method compiled in");
2194   return 0;
2195 }
2196 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2197 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2198 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2199
2200 #endif /* !HAVE_FILEMON */
2201
2202 /*---------- filemon generic interface ----------*/
2203
2204 static void filemon_start(InputFile *ipf) {
2205   assert(!ipf->filemon);
2206
2207   NEW(ipf->filemon);
2208   filemon_method_startfile(ipf, ipf->filemon);
2209 }
2210
2211 static void filemon_stop(InputFile *ipf) {
2212   if (!ipf->filemon) return;
2213   filemon_method_stopfile(ipf, ipf->filemon);
2214   free(ipf->filemon);
2215   ipf->filemon= 0;
2216 }
2217
2218 static void filemon_callback(InputFile *ipf) {
2219   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2220     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2221 }
2222
2223 /*---------- interface to start and stop an input file ----------*/
2224
2225 static const oop_rd_style feedfile_rdstyle= {
2226   OOP_RD_DELIM_STRIP, '\n',
2227   OOP_RD_NUL_PERMIT,
2228   OOP_RD_SHORTREC_LONG,
2229 };
2230
2231 static void inputfile_reading_start(InputFile *ipf) {
2232   assert(!ipf->rd);
2233   ipf->readable.on_readable= tailing_on_readable;
2234   ipf->readable.on_cancel=   tailing_on_cancel;
2235   ipf->readable.try_read=    tailing_try_read;
2236   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2237   ipf->readable.delete_kill= 0;
2238
2239   ipf->readable_callback= 0;
2240   ipf->readable_callback_user= 0;
2241
2242   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2243   assert(ipf->rd);
2244
2245   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2246                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2247   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2248 }
2249
2250 static void inputfile_reading_stop(InputFile *ipf) {
2251   assert(ipf->rd);
2252   oop_rd_cancel(ipf->rd);
2253   oop_rd_delete(ipf->rd);
2254   ipf->rd= 0;
2255   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2256 }
2257
2258
2259 /*========== interaction with innd - state machine ==========*/
2260
2261 /* See official state diagram at top of file.  We implement
2262  * this as follows:
2263  * -8<-
2264
2265             .=======.
2266             ||START||
2267             `======='
2268                 |
2269                 | open F
2270                 |
2271                 |    F ENOENT
2272                 |`---------------------------------------------------.
2273       F OPEN OK |                                                    |
2274                 |`---------------- - - -                             |
2275        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2276                 |                  for full startup logic            |
2277      ,--------->|                                                    |
2278      |          V                                                    |
2279      |     ============                                       try to |
2280      |      NORMAL                                            open D |
2281      |     [Normal]                                                  |
2282      |      main F tail                                              |
2283      |     ============                                              V
2284      |          |                                                    |
2285      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2286      ^          | hardlink F to D                                    |
2287      |     [Hardlinked]                                              |
2288      |          | unlink F                                           |
2289      |          | our handle onto F is now onto D                    |
2290      |     [Moved]                                                   |
2291      |          |                                                    |
2292      |          |<-------------------<---------------------<---------+
2293      |          |                                                    |
2294      |          | spawn inndcomm flush                               |
2295      |          V                                                    |
2296      |     ==================                                        |
2297      |      FLUSHING[-ABSENT]                                        |
2298      |     [Flushing]                                                |
2299      |     main D tail/none                                          |
2300      |     ==================                                        |
2301      |          |                                                    |
2302      |          |   INNDCOMM FLUSH FAILS                             ^
2303      |          |`----------------------->----------.                |
2304      |          |                                   |                |
2305      |          |   NO SUCH SITE                    V                |
2306      ^          |`--------------->----.         ==================== |
2307      |          |                      \        FLUSHFAILED[-ABSENT] |
2308      |          |                       \         [Moved]            |
2309      |          | FLUSH OK               \       main D tail/none    |
2310      |          | open F                  \     ==================== |
2311      |          |                          \        |                |
2312      |          |                           \       | TIME TO RETRY  |
2313      |          |`------->----.     ,---<---'\      `----------------'
2314      |          |    D NONE   |     | D NONE  `----.
2315      |          V             |     |              V
2316      |     =============      V     V             ============
2317      |      SEPARATED-1       |     |              DROPPING-1
2318      |      flsh->rd!=0       |     |              flsh->rd!=0
2319      |     [Separated]        |     |             [Dropping]
2320      |      main F idle       |     |              main none
2321      |      flsh D tail       |     |              flsh D tail
2322      |     =============      |     |             ============
2323      |          |             |     | install       |
2324      ^          | EOF ON D    |     |  defer        | EOF ON D
2325      |          V             |     |               V
2326      |     ===============    |     |             ===============
2327      |      SEPARATED-2       |     |              DROPPING-2
2328      |      flsh->rd==0       |     V              flsh->rd==0
2329      |     [Finishing]        |     |             [Dropping]
2330      |      main F tail       |     `.             main none
2331      |      flsh D closed     |       `.           flsh D closed
2332      |     ===============    V         `.        ===============
2333      |          |                         `.          |
2334      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2335      |          V install defer as backlog    `.      | install defer
2336      ^          | close D                       `.    | close D
2337      |          | unlink D                        `.  | unlink D
2338      |          |                                  |  |
2339      |          |                                  V  V
2340      `----------'                               ==============
2341                                                  DROPPED
2342                                                 [Dropped]
2343                                                  main none
2344                                                  flsh none
2345                                                  some backlog
2346                                                 ==============
2347                                                       |
2348                                                       | ALL BACKLOG DONE
2349                                                       |
2350                                                       | unlink lock
2351                                                       | exit
2352                                                       V
2353                                                   ==========
2354                                                    (ESRCH)
2355                                                   [Droppped]
2356                                                   ==========
2357  * ->8-
2358  */
2359
2360 static void startup_set_input_file(InputFile *f) {
2361   assert(!main_input_file);
2362   main_input_file= f;
2363   inputfile_reading_start(f);
2364 }
2365
2366 static void statemc_lock(void) {
2367   int lockfd;
2368   struct stat stab, stabf;
2369   
2370   for (;;) {
2371     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2372     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2373
2374     struct flock fl;
2375     memset(&fl,0,sizeof(fl));
2376     fl.l_type= F_WRLCK;
2377     fl.l_whence= SEEK_SET;
2378     int r= fcntl(lockfd, F_SETLK, &fl);
2379     if (r==-1) {
2380       if (errno==EACCES || isewouldblock(errno)) {
2381         if (quiet_multiple) exit(0);
2382         fatal("another duct holds the lockfile");
2383       }
2384       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2385     }
2386
2387     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2388     int lock_noent;
2389     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2390
2391     if (!lock_noent && samefile(&stab, &stabf))
2392       break;
2393
2394     xclose(lockfd, "stale lockfile ", path_lock);
2395   }
2396
2397   FILE *lockfile= fdopen(lockfd, "w");
2398   if (!lockfile) sysdie("fdopen lockfile");
2399
2400   int r= ftruncate(lockfd, 0);
2401   if (r) sysdie("truncate lockfile to write new info");
2402
2403   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2404               (unsigned long)self_pid,
2405               sitename, feedfile, remote_host) == EOF ||
2406       fflush(lockfile))
2407     sysfatal("write info to lockfile %s", path_lock);
2408
2409   debug("startup: locked");
2410 }
2411
2412 static void statemc_init(void) {
2413   struct stat stabdefer;
2414
2415   search_backlog_file();
2416
2417   int defer_noent;
2418   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2419   if (defer_noent) {
2420     debug("startup: ductdefer ENOENT");
2421   } else {
2422     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2423     switch (stabdefer.st_nlink==1) {
2424     case 1:
2425       open_defer(); /* so that we will later close it and rename it */
2426       break;
2427     case 2:
2428       xunlink(path_defer, "stale defer file link"
2429               " (presumably hardlink to backlog file)");
2430       break;
2431     default:
2432       die("defer file %s has unexpected link count %d",
2433           path_defer, stabdefer.st_nlink);
2434     }
2435   }
2436
2437   struct stat stab_f, stab_d;
2438   int noent_f;
2439
2440   InputFile *file_d= open_input_file(path_flushing);
2441   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2442
2443   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2444
2445   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2446     debug("startup: F==D => Hardlinked");
2447     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2448     noent_f= 1;
2449   }
2450
2451   if (noent_f) {
2452     debug("startup: F ENOENT => Moved");
2453     if (file_d) startup_set_input_file(file_d);
2454     spawn_inndcomm_flush("feedfile missing at startup");
2455     /* => Flushing, sms:=FLUSHING */
2456   } else {
2457     if (file_d) {
2458       debug("startup: F!=D => Separated");
2459       startup_set_input_file(file_d);
2460       flushing_input_file= main_input_file;
2461       main_input_file= open_input_file(feedfile);
2462       if (!main_input_file) die("feedfile vanished during startup");
2463       SMS(SEPARATED, 0, "found both old and current feed files");
2464     } else {
2465       debug("startup: F exists, D ENOENT => Normal");
2466       InputFile *file_f= open_input_file(feedfile);
2467       if (!file_f) die("feed file vanished during startup");
2468       startup_set_input_file(file_f);
2469       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2470     }
2471   }
2472 }
2473
2474 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2475   assert(sms == sm_NORMAL);
2476
2477   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2478         why,
2479         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2480         (unsigned long)target_max_feedfile_size,
2481         until_flush);
2482
2483   int r= link(feedfile, path_flushing);
2484   if (r) sysfatal("link feedfile %s to flushing file %s",
2485                   feedfile, path_flushing);
2486   /* => Hardlinked */
2487
2488   xunlink(feedfile, "old feedfile link");
2489   /* => Moved */
2490
2491   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2492 }
2493
2494 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2495   switch (sms) {
2496   case sm_NORMAL:
2497     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2498     return 1;
2499   case sm_FLUSHFAILED:
2500     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2501     return 1;
2502   default:
2503     return 0;
2504   }
2505 }
2506
2507 static void statemc_period_poll(void) {
2508   if (!until_flush) return;
2509   until_flush--;
2510   assert(until_flush>=0);
2511
2512   if (until_flush) return;
2513   int ok= trigger_flush_ok();
2514   assert(ok);
2515 }
2516
2517 static int inputfile_is_done(InputFile *ipf) {
2518   if (!ipf) return 0;
2519   if (ipf->inprogress) return 0; /* new article in the meantime */
2520   if (ipf->rd) return 0; /* not had EOF */
2521   return 1;
2522 }
2523
2524 static void notice_processed(InputFile *ipf, int completed,
2525                              const char *what, const char *spec) {
2526   if (!ipf) return; /* allows preterminate to be lazy */
2527
2528 #define RCI_NOTHING(x) /* nothing */
2529 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2530 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2531
2532 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2533
2534   char *inprog= completed
2535     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2536     : xasprintf(" inprogress=%ld", ipf->inprogress);
2537
2538   info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2539        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2540        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2541        ,
2542        completed?"completed":"processed", what, spec,
2543        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2544        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2545        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2546        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2547        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2548        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2549        );
2550
2551   free(inprog);
2552
2553 #undef CNT
2554 }
2555
2556 static void statemc_check_backlog_done(void) {
2557   InputFile *ipf= backlog_input_file;
2558   if (!inputfile_is_done(ipf)) return;
2559
2560   const char *slash= strrchr(ipf->path, '/');
2561   const char *leaf= slash ? slash+1 : ipf->path;
2562   const char *under= strchr(slash, '_');
2563   const char *rest= under ? under+1 : leaf;
2564   if (!strncmp(rest,"backlog",7)) rest += 7;
2565   notice_processed(ipf,1,"backlog ",rest);
2566
2567   close_input_file(ipf);
2568   if (unlink(ipf->path)) {
2569     if (errno != ENOENT)
2570       sysdie("could not unlink processed backlog file %s", ipf->path);
2571     warn("backlog file %s vanished while we were reading it"
2572          " so we couldn't remove it (but it's done now, anyway)",
2573          ipf->path);
2574   }
2575   free(ipf);
2576   backlog_input_file= 0;
2577   search_backlog_file();
2578   return;
2579 }
2580
2581 static void statemc_check_flushing_done(void) {
2582   InputFile *ipf= flushing_input_file;
2583   if (!inputfile_is_done(ipf)) return;
2584
2585   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2586
2587   notice_processed(ipf,1,"feedfile","");
2588
2589   close_defer();
2590
2591   xunlink(path_flushing, "old flushing file");
2592
2593   close_input_file(flushing_input_file);
2594   free(flushing_input_file);
2595   flushing_input_file= 0;
2596
2597   if (sms==sm_SEPARATED) {
2598     notice("flush complete");
2599     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2600   } else if (sms==sm_DROPPING) {
2601     SMS(DROPPED, 0, "old flush complete");
2602     search_backlog_file();
2603     notice("feed dropped, but will continue until backlog is finished");
2604   }
2605 }
2606
2607 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2608                                       void *u) {
2609   assert(!inputfile_is_done(main_input_file));
2610   statemc_check_flushing_done();
2611   statemc_check_backlog_done();
2612   return OOP_CONTINUE;
2613 }
2614
2615 static void queue_check_input_done(void) {
2616   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2617 }
2618
2619 static void statemc_setstate(StateMachineState newsms, int periods,
2620                              const char *forlog, const char *why) {
2621   sms= newsms;
2622   until_flush= periods;
2623
2624   const char *xtra= "";
2625   switch (sms) {
2626   case sm_FLUSHING:
2627   case sm_FLUSHFAILED:
2628     if (!main_input_file) xtra= "-ABSENT";
2629     break;
2630   case sm_SEPARATED:
2631   case sm_DROPPING:
2632     xtra= flushing_input_file->rd ? "-1" : "-2";
2633     break;
2634   default:;
2635   }
2636
2637   if (periods) {
2638     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2639   } else {
2640     info("state %s%s %s",forlog,xtra,why);
2641   }
2642 }
2643
2644 /*---------- defer and backlog files ----------*/
2645
2646 static void open_defer(void) {
2647   struct stat stab;
2648
2649   if (defer) return;
2650
2651   defer= fopen(path_defer, "a+");
2652   if (!defer) sysfatal("could not open defer file %s", path_defer);
2653
2654   /* truncate away any half-written records */
2655
2656   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2657
2658   if (stab.st_size > LONG_MAX)
2659     die("defer file %s size is far too large", path_defer);
2660
2661   if (!stab.st_size)
2662     return;
2663
2664   long orgsize= stab.st_size;
2665   long truncto= stab.st_size;
2666   for (;;) {
2667     if (!truncto) break; /* was only (if anything) one half-truncated record */
2668     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2669       sysdie("seek in defer file %s while truncating partial", path_defer);
2670
2671     int r= getc(defer);
2672     if (r==EOF) {
2673       if (ferror(defer))
2674         sysdie("failed read from defer file %s", path_defer);
2675       else
2676         die("defer file %s shrank while we were checking it!", path_defer);
2677     }
2678     if (r=='\n') break;
2679     truncto--;
2680   }
2681
2682   if (stab.st_size != truncto) {
2683     warn("truncating half-record at end of defer file %s -"
2684          " shrinking by %ld bytes from %ld to %ld",
2685          path_defer, orgsize - truncto, orgsize, truncto);
2686
2687     if (fflush(defer))
2688       sysfatal("could not flush defer file %s", path_defer);
2689     if (ftruncate(fileno(defer), truncto))
2690       sysdie("could not truncate defer file %s", path_defer);
2691
2692   } else {
2693     info("continuing existing defer file %s (%ld bytes)",
2694          path_defer, orgsize);
2695   }
2696   if (fseek(defer, truncto, SEEK_SET))
2697     sysdie("could not seek to new end of defer file %s", path_defer);
2698 }
2699
2700 static void close_defer(void) {
2701   if (!defer)
2702     return;
2703
2704   struct stat stab;
2705   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2706
2707   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2708   defer= 0;
2709
2710   time_t now= xtime();
2711
2712   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2713                            (unsigned long)now,
2714                            (unsigned long)stab.st_ino);
2715   if (link(path_defer, backlog))
2716     sysfatal("could not install defer file %s as backlog file %s",
2717            path_defer, backlog);
2718   if (unlink(path_defer))
2719     sysdie("could not unlink old defer link %s to backlog file %s",
2720            path_defer, backlog);
2721
2722   free(backlog);
2723
2724   if (until_backlog_nextscan < 0 ||
2725       until_backlog_nextscan > backlog_retry_minperiods + 1)
2726     until_backlog_nextscan= backlog_retry_minperiods + 1;
2727 }
2728
2729 static void poll_backlog_file(void) {
2730   if (until_backlog_nextscan < 0) return;
2731   if (until_backlog_nextscan-- > 0) return;
2732   search_backlog_file();
2733 }
2734
2735 static void search_backlog_file(void) {
2736   /* returns non-0 iff there are any backlog files */
2737
2738   glob_t gl;
2739   int r, i;
2740   struct stat stab;
2741   const char *oldest_path=0;
2742   time_t oldest_mtime=0, now;
2743
2744   if (backlog_input_file) return;
2745
2746  try_again:
2747
2748   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2749
2750   switch (r) {
2751   case GLOB_ABORTED:
2752     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2753   case GLOB_NOSPACE:
2754     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2755   case 0:
2756     for (i=0; i<gl.gl_pathc; i++) {
2757       const char *path= gl.gl_pathv[i];
2758
2759       if (strchr(path,'#') || strchr(path,'~')) {
2760         debug("backlog file search skipping %s", path);
2761         continue;
2762       }
2763       r= stat(path, &stab);
2764       if (r) {
2765         syswarn("failed to stat backlog file %s", path);
2766         continue;
2767       }
2768       if (!S_ISREG(stab.st_mode)) {
2769         warn("backlog file %s is not a plain file (or link to one)", path);
2770         continue;
2771       }
2772       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2773         oldest_path= path;
2774         oldest_mtime= stab.st_mtime;
2775       }
2776     }
2777   case GLOB_NOMATCH: /* fall through */
2778     break;
2779   default:
2780     sysdie("glob expansion of backlog pattern %s gave unexpected"
2781            " nonzero (error?) return value %d", globpat_backlog, r);
2782   }
2783
2784   if (!oldest_path) {
2785     debug("backlog scan: none");
2786
2787     if (sms==sm_DROPPED) {
2788       preterminate();
2789       notice("feed dropped and our work is complete");
2790
2791       int r= unlink(path_control);
2792       if (r && errno!=ENOENT)
2793         syswarn("failed to remove control symlink for old feed");
2794
2795       xunlink(path_lock,    "lockfile for old feed");
2796       exit(4);
2797     }
2798     until_backlog_nextscan= backlog_spontrescan_periods;
2799     goto xfree;
2800   }
2801
2802   now= xtime();
2803   double age= difftime(now, oldest_mtime);
2804   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2805
2806   if (age_deficiency <= 0) {
2807     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2808           age, age_deficiency, oldest_path);
2809
2810     backlog_input_file= open_input_file(oldest_path);
2811     if (!backlog_input_file) {
2812       warn("backlog file %s vanished as we opened it", oldest_path);
2813       globfree(&gl);
2814       goto try_again;
2815     }
2816     inputfile_reading_start(backlog_input_file);
2817     until_backlog_nextscan= -1;
2818     goto xfree;
2819   }
2820
2821   until_backlog_nextscan= age_deficiency / period_seconds;
2822
2823   if (backlog_spontrescan_periods >= 0 &&
2824       until_backlog_nextscan > backlog_spontrescan_periods)
2825     until_backlog_nextscan= backlog_spontrescan_periods;
2826
2827   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2828         age, age_deficiency, until_backlog_nextscan, oldest_path);
2829
2830  xfree:
2831   globfree(&gl);
2832   return;
2833 }
2834
2835 /*---------- shutdown and signal handling ----------*/
2836
2837 static void preterminate(void) {
2838   if (in_child) return;
2839   notice_processed(main_input_file,0,"feedfile","");
2840   notice_processed(flushing_input_file,0,"flushing","");
2841   if (backlog_input_file)
2842     notice_processed(backlog_input_file,0, "backlog file ",
2843                      backlog_input_file->path);
2844 }
2845
2846 static int signal_self_pipe[2];
2847 static sig_atomic_t terminate_sig_flag;
2848
2849 static void raise_default(int signo) {
2850   xsigsetdefault(signo);
2851   raise(signo);
2852   abort();
2853 }
2854
2855 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2856   assert(fd=signal_self_pipe[0]);
2857   char buf[PIPE_BUF];
2858   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2859   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2860   if (r==0) die("eof on signal self pipe");
2861   if (terminate_sig_flag) {
2862     preterminate();
2863     notice("terminating (%s)", strsignal(terminate_sig_flag));
2864     raise_default(terminate_sig_flag);
2865   }
2866   return OOP_CONTINUE;
2867 }
2868
2869 static void sigarrived_handler(int signum) {
2870   static char x;
2871   switch (signum) {
2872   case SIGTERM:
2873   case SIGINT:
2874     if (!terminate_sig_flag) terminate_sig_flag= signum;
2875     break;
2876   default:
2877     abort();
2878   }
2879   write(signal_self_pipe[1],&x,1);
2880 }
2881
2882 static void init_signals(void) {
2883   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2884     sysdie("could not ignore SIGPIPE");
2885
2886   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2887
2888   xsetnonblock(signal_self_pipe[0],1);
2889   xsetnonblock(signal_self_pipe[1],1);
2890
2891   struct sigaction sa;
2892   memset(&sa,0,sizeof(sa));
2893   sa.sa_handler= sigarrived_handler;
2894   sa.sa_flags= SA_RESTART;
2895   xsigaction(SIGTERM,&sa);
2896   xsigaction(SIGINT,&sa);
2897
2898   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2899 }
2900
2901 /*========== flushing the feed ==========*/
2902
2903 static pid_t inndcomm_child;
2904 static int inndcomm_sentinel_fd;
2905
2906 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2907   assert(inndcomm_child);
2908   assert(fd == inndcomm_sentinel_fd);
2909   int status= xwaitpid(&inndcomm_child, "inndcomm");
2910   inndcomm_child= 0;
2911   
2912   cancel_fd_read_except(fd);
2913   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2914   inndcomm_sentinel_fd= 0;
2915
2916   assert(!flushing_input_file);
2917
2918   if (WIFEXITED(status)) {
2919     switch (WEXITSTATUS(status)) {
2920
2921     case INNDCOMMCHILD_ESTATUS_FAIL:
2922       goto failed;
2923
2924     case INNDCOMMCHILD_ESTATUS_NONESUCH:
2925       notice("feed has been dropped by innd, finishing up");
2926       flushing_input_file= main_input_file;
2927       tailing_queue_readable(flushing_input_file);
2928         /* we probably previously returned EAGAIN from our fake read method
2929          * when in fact we were at EOF, so signal another readable event
2930          * so we actually see the EOF */
2931
2932       main_input_file= 0;
2933
2934       if (flushing_input_file) {
2935         SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2936       } else {
2937         close_defer();
2938         SMS(DROPPED, 0, "feed dropped by innd");
2939         search_backlog_file();
2940       }
2941       return OOP_CONTINUE;
2942
2943     case 0:
2944       /* as above */
2945       flushing_input_file= main_input_file;
2946       tailing_queue_readable(flushing_input_file);
2947
2948       main_input_file= open_input_file(feedfile);
2949       if (!main_input_file)
2950         die("flush succeeded but feedfile %s does not exist!", feedfile);
2951
2952       if (flushing_input_file) {
2953         SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2954       } else {
2955         close_defer();
2956         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2957       }
2958       return OOP_CONTINUE;
2959
2960     default:
2961       goto unexpected_exitstatus;
2962
2963     }
2964   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2965     warn("flush timed out trying to talk to innd");
2966     goto failed;
2967   } else {
2968   unexpected_exitstatus:
2969     report_child_status("inndcomm child", status);
2970   }
2971
2972  failed:
2973   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2974   return OOP_CONTINUE;
2975 }
2976
2977 static void inndcommfail(const char *what) {
2978   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
2979   exit(INNDCOMMCHILD_ESTATUS_FAIL);
2980 }
2981
2982 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
2983   int pipefds[2];
2984
2985   notice("flushing %s",why);
2986
2987   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
2988   assert(!inndcomm_child);
2989   assert(!inndcomm_sentinel_fd);
2990
2991   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
2992
2993   inndcomm_child= xfork("inndcomm child");
2994
2995   if (!inndcomm_child) {
2996     const char *flushargv[2]= { sitename, 0 };
2997     char *reply;
2998     int r;
2999
3000     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3001     /* parent spots the autoclose of pipefds[1] when we die or exit */
3002
3003     if (simulate_flush>=0) {
3004       warn("SIMULATING flush child status %d", simulate_flush);
3005       if (simulate_flush>128) raise(simulate_flush-128);
3006       else exit(simulate_flush);
3007     }
3008
3009     alarm(inndcomm_flush_timeout);
3010     r= ICCopen();                         if (r)   inndcommfail("connect");
3011     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3012     if (!r) exit(0); /* yay! */
3013
3014     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3015     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3016     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3017   }
3018
3019   simulate_flush= -1;
3020
3021   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3022   inndcomm_sentinel_fd= pipefds[0];
3023   assert(inndcomm_sentinel_fd);
3024   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3025
3026   SMS(FLUSHING, 0, why);
3027 }
3028
3029 /*========== main program ==========*/
3030
3031 static void postfork_inputfile(InputFile *ipf) {
3032   if (!ipf) return;
3033   xclose(ipf->fd, "(in child) input file ", ipf->path);
3034 }
3035
3036 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3037   /* we have no stdio streams that are buffered long-term */
3038   if (!f) return;
3039   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3040 }
3041
3042 static void postfork(void) {
3043   in_child= 1;
3044
3045   xsigsetdefault(SIGTERM);
3046   xsigsetdefault(SIGINT);
3047   xsigsetdefault(SIGPIPE);
3048   if (terminate_sig_flag) raise(terminate_sig_flag);
3049
3050   postfork_inputfile(main_input_file);
3051   postfork_inputfile(flushing_input_file);
3052
3053   Conn *conn;
3054   FOR_CONN(conn)
3055     conn_closefd(conn,"(in child) ");
3056
3057   postfork_stdio(defer, "defer file ", path_defer);
3058 }
3059
3060 typedef struct Every Every;
3061 struct Every {
3062   struct timeval interval;
3063   int fixed_rate;
3064   void (*f)(void);
3065 };
3066
3067 static void every_schedule(Every *e, struct timeval base);
3068
3069 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3070   Every *e= e_v;
3071   e->f();
3072   if (!e->fixed_rate) xgettimeofday(&base);
3073   every_schedule(e, base);
3074   return OOP_CONTINUE;
3075 }
3076
3077 static void every_schedule(Every *e, struct timeval base) {
3078   struct timeval when;
3079   timeradd(&base, &e->interval, &when);
3080   loop->on_time(loop, when, every_happens, e);
3081 }
3082
3083 static void every(int interval, int fixed_rate, void (*f)(void)) {
3084   NEW_DECL(Every *,e);
3085   e->interval.tv_sec= interval;
3086   e->interval.tv_usec= 0;
3087   e->fixed_rate= fixed_rate;
3088   e->f= f;
3089   struct timeval now;
3090   xgettimeofday(&now);
3091   every_schedule(e, now);
3092 }
3093
3094 static void filepoll(void) {
3095   filemon_callback(main_input_file);
3096   filemon_callback(flushing_input_file);
3097 }
3098
3099 static char *debug_report_ipf(InputFile *ipf) {
3100   if (!ipf) return xasprintf("none");
3101
3102   const char *slash= strrchr(ipf->path,'/');
3103   const char *path= slash ? slash+1 : ipf->path;
3104
3105   return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s",
3106                    ipf, path,
3107                    ipf->queue.count, ipf->inprogress, (long)ipf->offset,
3108                    ipf->fd,
3109                    ipf->rd ? "" : ",!rd",
3110                    ipf->skippinglong ? "*skiplong" : "");
3111 }
3112
3113 static void period(void) {
3114   char *dipf_main=     debug_report_ipf(main_input_file);
3115   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3116   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3117
3118   debug("PERIOD"
3119         " sms=%s[%d] conns=%d until_connect=%d"
3120         " input_files main:%s flushing:%s backlog:%s"
3121         " children connecting=%ld inndcomm=%ld"
3122         ,
3123         sms_names[sms], until_flush, conns.count, until_connect,
3124         dipf_main, dipf_flushing, dipf_backlog,
3125         (long)connecting_child, (long)inndcomm_child
3126         );
3127
3128   free(dipf_main);
3129   free(dipf_flushing);
3130   free(dipf_backlog);
3131
3132   if (until_connect) until_connect--;
3133
3134   poll_backlog_file();
3135   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3136   statemc_period_poll();
3137   check_assign_articles();
3138   check_idle_conns();
3139 }
3140
3141
3142 /*========== dumping state ==========*/
3143
3144 static void dump_article_list(FILE *f, const ControlCommand *c,
3145                               const ArticleList *al) {
3146   fprintf(f, " count=%d\n", al->count);
3147   if (!c->xval) return;
3148   
3149   int i; Article *art;
3150   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3151     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3152     DUMPV("%p", art->,ipf);
3153     DUMPV("%d", art->,missing);
3154     DUMPV("%lu", (unsigned long)art->,offset);
3155     DUMPV("%d", art->,blanklen);
3156     DUMPV("%d", art->,midlen);
3157     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3158   }
3159 }
3160   
3161 static void dump_input_file(FILE *f, const ControlCommand *c,
3162                             InputFile *ipf, const char *wh) {
3163   char *dipf= debug_report_ipf(ipf);
3164   fprintf(f,"input %s %s", wh, dipf);
3165   free(dipf);
3166   
3167   if (ipf) {
3168     DUMPV("%d", ipf->,readcount_ok);
3169     DUMPV("%d", ipf->,readcount_blank);
3170     DUMPV("%d", ipf->,readcount_err);
3171   }
3172   fprintf(f,"\n");
3173   if (ipf) {
3174     ArtState state; const char *const *statename; 
3175     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3176 #define RC_DUMP_FMT(x) " " #x "=%d"
3177 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3178       fprintf(f,"input %s counts %-11s"
3179               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3180               wh, *statename
3181               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3182     }
3183     fprintf(f,"input %s queue", wh);
3184     dump_article_list(f,c,&ipf->queue);
3185   }
3186 }
3187
3188 CCMD(dump) {
3189   int i;
3190   fprintf(cc->out, "dumping state to %s\n", path_dump);
3191   FILE *f= fopen(path_dump, "w");
3192   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3193
3194   fprintf(f,"general");
3195   DUMPV("%s", sms_names,[sms]);
3196   DUMPV("%d", ,until_flush);
3197   DUMPV("%ld", (long),self_pid);
3198   DUMPV("%p", , defer);
3199   DUMPV("%d", , until_connect);
3200   DUMPV("%d", , until_backlog_nextscan);
3201   DUMPV("%d", , simulate_flush);
3202   fprintf(f,"\nnocheck");
3203   DUMPV("%#.10f", , accept_proportion);
3204   DUMPV("%d", , nocheck);
3205   DUMPV("%d", , nocheck_reported);
3206   fprintf(f,"\n");
3207
3208   fprintf(f,"special");
3209   DUMPV("%ld", (long),connecting_child);
3210   DUMPV("%d", , connecting_fdpass_sock);
3211   DUMPV("%d", , control_master);
3212   fprintf(f,"\n");
3213
3214   fprintf(f,"filemon ");
3215   filemon_method_dump_info(f);
3216
3217   dump_input_file(f,c, main_input_file,     "main"    );
3218   dump_input_file(f,c, flushing_input_file, "flushing");
3219   dump_input_file(f,c, backlog_input_file,  "backlog" );
3220
3221   fprintf(f,"conns count=%d\n", conns.count);
3222
3223   Conn *conn;
3224   FOR_CONN(conn) {
3225
3226     fprintf(f,"C%d",conn->fd);
3227     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3228     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3229     DUMPV("%d",conn->,since_activity);
3230     fprintf(f,"\n");
3231
3232     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3233     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3234     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3235
3236     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3237     for (i=0; i<conn->xmitu; i++) {
3238       const struct iovec *iv= &conn->xmit[i];
3239       const XmitDetails *xd= &conn->xmitd[i];
3240       char *dinfo;
3241       switch (xd->kind) {
3242       case xk_Const:    dinfo= xasprintf("Const");                 break;
3243       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3244       default:
3245         abort();
3246       }
3247       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3248               sanitise(iv->iov_base, iv->iov_len));
3249       free(dinfo);
3250     }
3251   }
3252
3253   fprintf(f,"paths");
3254   DUMPV("%s", , path_lock);
3255   DUMPV("%s", , path_flushing);
3256   DUMPV("%s", , path_defer);
3257   DUMPV("%s", , path_control);
3258   DUMPV("%s", , path_dump);
3259   DUMPV("%s", , globpat_backlog);
3260   fprintf(f,"\n");
3261
3262   if (!!ferror(f) + !!fclose(f)) {
3263     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3264     return;
3265   }
3266 }
3267
3268 /*========== option parsing ==========*/
3269
3270 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3271 static void vbadusage(const char *fmt, va_list al) {
3272   char *m= xvasprintf(fmt,al);
3273   fprintf(stderr, "bad usage: %s\n"
3274           "say --help for help, or read the manpage\n",
3275           m);
3276   if (become_daemon)
3277     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3278   exit(8);
3279 }
3280
3281 /*---------- generic option parser ----------*/
3282
3283 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3284 static void badusage(const char *fmt, ...) {
3285   va_list al;
3286   va_start(al,fmt);
3287   vbadusage(fmt,al);
3288 }
3289
3290 enum OptFlags {
3291   of_seconds= 001000u,
3292   of_boolean= 002000u,
3293 };
3294
3295 typedef struct Option Option;
3296 typedef void OptionParser(const Option*, const char *val);
3297
3298 struct Option {
3299   int shrt;
3300   const char *lng, *formarg;
3301   void *store;
3302   OptionParser *fn;
3303   int intval;
3304 };
3305
3306 static void parse_options(const Option *options, char ***argvp) {
3307   /* on return *argvp is first non-option arg; argc is not updated */
3308
3309   for (;;) {
3310     const char *arg= *++(*argvp);
3311     if (!arg) break;
3312     if (*arg != '-') break;
3313     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3314     int a;
3315     while ((a= *++arg)) {
3316       const Option *o;
3317       if (a=='-') {
3318         arg++;
3319         char *equals= strchr(arg,'=');
3320         int len= equals ? (equals - arg) : strlen(arg);
3321         for (o=options; o->shrt || o->lng; o++)
3322           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3323             goto found_long;
3324         badusage("unknown long option --%s",arg);
3325       found_long:
3326         if (!o->formarg) {
3327           if (equals) badusage("option --%s does not take a value",o->lng);
3328           arg= 0;
3329         } else if (equals) {
3330           arg= equals+1;
3331         } else {
3332           arg= *++(*argvp);
3333           if (!arg) badusage("option --%s needs a value for %s",
3334                              o->lng, o->formarg);
3335         }
3336         o->fn(o, arg);
3337         break; /* eaten the whole argument now */
3338       }
3339       for (o=options; o->shrt || o->lng; o++)
3340         if (a == o->shrt)
3341           goto found_short;
3342       badusage("unknown short option -%c",a);
3343     found_short:
3344       if (!o->formarg) {
3345         o->fn(o,0);
3346       } else {
3347         if (!*++arg) {
3348           arg= *++(*argvp);
3349           if (!arg) badusage("option -%c needs a value for %s",
3350                              o->shrt, o->formarg);
3351         }
3352         o->fn(o,arg);
3353         break; /* eaten the whole argument now */
3354       }
3355     }
3356   }
3357 }
3358
3359 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3360
3361 static void print_options(const Option *options, FILE *f) {
3362   const Option *o;
3363   for (o=options; o->shrt || o->lng; o++) {
3364     char shrt[2] = { o->shrt, 0 };
3365     char *optspec= xasprintf("%s%s%s%s%s",
3366                              o->shrt ? "-" : "", shrt,
3367                              o->shrt && o->lng ? "|" : "",
3368                              DELIMPERHAPS("--", o->lng));
3369     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3370     free(optspec);
3371   }
3372 }
3373
3374 /*---------- specific option types ----------*/
3375
3376 static void op_integer(const Option *o, const char *val) {
3377   char *ep;
3378   errno= 0;
3379   unsigned long ul= strtoul(val,&ep,10);
3380   if (*ep || ep==val || errno || ul>INT_MAX)
3381     badusage("bad integer value for %s",o->lng);
3382   int *store= o->store;
3383   *store= ul;
3384 }
3385
3386 static void op_double(const Option *o, const char *val) {
3387   int *store= o->store;
3388   char *ep;
3389   errno= 0;
3390   *store= strtod(val, &ep);
3391   if (*ep || ep==val || errno)
3392     badusage("bad floating point value for %s",o->lng);
3393 }
3394
3395 static void op_string(const Option *o, const char *val) {
3396   const char **store= o->store;
3397   *store= val;
3398 }
3399
3400 static void op_seconds(const Option *o, const char *val) {
3401   int *store= o->store;
3402   char *ep;
3403   int unit;
3404
3405   double v= strtod(val,&ep);
3406   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3407
3408   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3409   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3410   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3411   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3412   else if (!strcmp(ep,"das")) unit= 10;
3413   else if (!strcmp(ep,"hs"))  unit= 100;
3414   else if (!strcmp(ep,"ks"))  unit= 1000;
3415   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3416   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3417
3418   v *= unit;
3419   v= ceil(v);
3420   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3421   *store= v;
3422 }
3423
3424 static void op_setint(const Option *o, const char *val) {
3425   int *store= o->store;
3426   *store= o->intval;
3427 }
3428
3429 /*---------- specific options ----------*/
3430
3431 static void help(const Option *o, const char *val);
3432
3433 static const Option innduct_options[]= {
3434 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3435 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3436 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3437 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3438 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3439 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3440 {'P',"port",             "PORT",  &port,                     op_integer     },
3441 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3442 {0,"help",               0,       0,                         help           },
3443
3444 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3445 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3446 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3447 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3448
3449 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3450 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3451 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3452
3453 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3454 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3455
3456 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3457 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3458 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3459 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3460 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3461 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3462
3463 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3464 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3465
3466 {0,0}
3467 };
3468
3469 static void printusage(FILE *f) {
3470   fputs("usage: innduct [options] site [fqdn]\n"
3471         "available options are:\n", f);
3472   print_options(innduct_options, f);
3473 }
3474
3475 static void help(const Option *o, const char *val) {
3476   printusage(stdout);
3477   if (ferror(stdout) || fflush(stdout)) {
3478     perror("innduct: writing help");
3479     exit(12);
3480   }
3481   exit(0);
3482 }
3483
3484 static void convert_to_periods_rndup(int *store) {
3485   *store += period_seconds-1;
3486   *store /= period_seconds;
3487 }
3488
3489 int main(int argc, char **argv) {
3490   if (!argv[1]) {
3491     printusage(stderr);
3492     exit(8);
3493   }
3494
3495   parse_options(innduct_options, &argv);
3496
3497   /* arguments */
3498
3499   sitename= *argv++;
3500   if (!sitename) badusage("need site name argument");
3501   remote_host= *argv++;
3502   if (*argv) badusage("too many non-option arguments");
3503
3504   /* defaults */
3505
3506   int r= innconf_read(inndconffile);
3507   if (!r) badusage("could not read inn.conf (more info on stderr)");
3508
3509   if (!remote_host) remote_host= sitename;
3510
3511   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3512     badusage("nocheck threshold percentage must be between 0..100");
3513   nocheck_thresh *= 0.01;
3514
3515   if (nocheck_decay < 0.1)
3516     badusage("nocheck decay articles must be at least 0.1");
3517   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3518
3519   convert_to_periods_rndup(&reconnect_delay_periods);
3520   convert_to_periods_rndup(&flushfail_retry_periods);
3521   convert_to_periods_rndup(&backlog_retry_minperiods);
3522   convert_to_periods_rndup(&backlog_spontrescan_periods);
3523   convert_to_periods_rndup(&spontaneous_flush_periods);
3524   convert_to_periods_rndup(&need_activity_periods);
3525
3526   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3527     badusage("bad input data ratio must be between 0..100");
3528   max_bad_data_ratio *= 0.01;
3529
3530   if (!feedfile) {
3531     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3532   } else if (!feedfile[0]) {
3533     badusage("feed filename must be nonempty");
3534   } else if (feedfile[strlen(feedfile)-1]=='/') {
3535     feedfile= xasprintf("%s%s",feedfile,sitename);
3536   }
3537
3538   const char *feedfile_forbidden= "?*[~#";
3539   int c;
3540   while ((c= *feedfile_forbidden++))
3541     if (strchr(feedfile, c))
3542       badusage("feed filename may not contain metacharacter %c",c);
3543
3544   /* set things up */
3545
3546   path_lock=        xasprintf("%s_lock",      feedfile);
3547   path_flushing=    xasprintf("%s_flushing",  feedfile);
3548   path_defer=       xasprintf("%s_defer",     feedfile);
3549   path_control=     xasprintf("%s_control",   feedfile);
3550   path_dump=        xasprintf("%s_dump",      feedfile);
3551   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3552
3553   oop_source_sys *sysloop= oop_sys_new();
3554   if (!sysloop) sysdie("could not create liboop event loop");
3555   loop= (oop_source*)sysloop;
3556
3557   LIST_INIT(conns);
3558
3559   if (become_daemon) {
3560     int i;
3561     for (i=3; i<255; i++)
3562       /* do this now before we open syslog, etc. */
3563       close(i);
3564     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3565
3566     int null= open("/dev/null",O_RDWR);
3567     if (null<0) sysfatal("failed to open /dev/null");
3568     dup2(null,0);
3569     dup2(null,1);
3570     dup2(null,2);
3571     xclose(null, "/dev/null original fd",0);
3572
3573     pid_t child1= xfork("daemonise first fork");
3574     if (child1) _exit(0);
3575
3576     pid_t sid= setsid();
3577     if (sid != child1) sysfatal("setsid failed");
3578
3579     pid_t child2= xfork("daemonise second fork");
3580     if (child2) _exit(0);
3581   }
3582
3583   self_pid= getpid();
3584   if (self_pid==-1) sysdie("getpid");
3585
3586   statemc_lock();
3587
3588   init_signals();
3589
3590   notice("starting");
3591
3592   if (!become_daemon)
3593     control_stdio();
3594
3595   control_init();
3596
3597   int filemon_ok= 0;
3598   if (!try_filemon) {
3599     notice("filemon: suppressed by command line option, polling");
3600   } else {
3601     filemon_ok= filemon_method_init();
3602     if (!filemon_ok)
3603       warn("filemon: no file monitoring available, polling");
3604   }
3605   if (!filemon_ok)
3606     every(filepoll_seconds,0,filepoll);
3607
3608   every(period_seconds,1,period);
3609
3610   statemc_init();
3611
3612   /* let's go */
3613
3614   void *run= oop_sys_run(sysloop);
3615   assert(run == OOP_ERROR);
3616   sysdie("event loop failed");
3617 }