chiark / gitweb /
comment for dealing with wedge problems
[innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Conn *conn, Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322
323 static void statemc_check_flushing_done(void);
324 static void statemc_check_backlog_done(void);
325
326 static void postfork(void);
327 static void period(void);
328
329 static void open_defer(void);
330 static void close_defer(void);
331 static void search_backlog_file(void);
332 static void preterminate(void);
333 static void raise_default(int signo) NORET;
334 static char *debug_report_ipf(InputFile *ipf);
335
336 static void inputfile_reading_start(InputFile *ipf);
337 static void inputfile_reading_stop(InputFile *ipf);
338
339 static void filemon_start(InputFile *ipf);
340 static void filemon_stop(InputFile *ipf);
341 static void filemon_callback(InputFile *ipf);
342
343 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
344 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
345
346 static const oop_rd_style peer_rd_style;
347 static oop_rd_call peer_rd_err, peer_rd_ok;
348
349 /*----- configuration options -----*/
350 /* when changing defaults, remember to update the manpage */
351
352 static const char *sitename, *remote_host;
353 static const char *feedfile, *realsockdir="/tmp/innduct.control";
354 static int quiet_multiple=0;
355 static int become_daemon=1, try_filemon=1;
356 static int try_stream=1;
357 static int port=119;
358 static const char *inndconffile;
359
360 static int max_connections=10;
361 static int max_queue_per_conn=200;
362 static int target_max_feedfile_size=100000;
363 static int period_seconds=60;
364 static int filepoll_seconds=5;
365
366 static int connection_setup_timeout=200;
367 static int inndcomm_flush_timeout=100;
368
369 static double nocheck_thresh= 95.0; /* converted from percentage by main */
370 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
371
372 /* all these are initialised to seconds, and converted to periods in main */
373 static int reconnect_delay_periods=1000;
374 static int flushfail_retry_periods=1000;
375 static int backlog_retry_minperiods=50;
376 static int backlog_spontrescan_periods=300;
377 static int spontaneous_flush_periods=100000;
378 static int need_activity_periods=1000;
379
380 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
381 static int max_bad_data_initial= 30;
382   /* in one corrupt 4096-byte block the number of newlines has
383    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
384
385
386 /*----- statistics -----*/
387
388 typedef enum {      /* in queue                 in conn->sent             */
389   art_Unchecked,    /*   not checked, not sent    checking                */
390   art_Wanted,       /*   checked, wanted          sent body as requested  */
391   art_Unsolicited,  /*   -                        sent body without check */
392   art_MaxState,
393 } ArtState;
394
395 static const char *const artstate_names[]=
396   { "Unchecked", "Wanted", "Unsolicited", 0 };
397
398 #define RESULT_COUNTS(RCS,RCN)                  \
399   RCS(sent)                                     \
400   RCS(accepted)                                 \
401   RCN(unwanted)                                 \
402   RCN(rejected)                                 \
403   RCN(deferred)                                 \
404   RCN(missing)                                  \
405   RCN(connretry)
406
407 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
408 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
409        counts[art_Unchecked] x                  \
410        + counts[art_Wanted] x                   \
411        + counts[art_Unsolicited] x,             \
412        counts[art_Unchecked] x                  \
413        , counts[art_Wanted] x                   \
414        , counts[art_Unsolicited] x
415
416 typedef enum {
417 #define RC_INDEX(x) RC_##x,
418   RESULT_COUNTS(RC_INDEX, RC_INDEX)
419   RCI_max
420 } ResultCountIndex;
421
422
423 /*----- transmission buffers -----*/
424
425 #define CONNIOVS 128
426
427 typedef enum {
428   xk_Const, xk_Artdata
429 } XmitKind;
430
431 struct XmitDetails {
432   XmitKind kind;
433   union {
434     ARTHANDLE *sm_art;
435   } info;
436 };
437
438
439 /*----- core operational data structure types -----*/
440
441 struct InputFile {
442   /* This is also an instance of struct oop_readable */
443   struct oop_readable readable; /* first */
444   oop_readable_call *readable_callback;
445   void *readable_callback_user;
446
447   int fd;
448   Filemon_Perfile *filemon;
449
450   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
451   long inprogress; /* no. of articles read but not processed */
452   off_t offset;
453   int skippinglong;
454
455   int counts[art_MaxState][RCI_max];
456   int readcount_ok, readcount_blank, readcount_err;
457   char path[];
458 };
459
460 struct Article {
461   ISNODE(Article);
462   ArtState state;
463   int midlen, missing;
464   InputFile *ipf;
465   TOKEN token;
466   off_t offset;
467   int blanklen;
468   char messageid[1];
469 };
470
471 #define SMS_LIST(X)                             \
472   X(NORMAL)                                     \
473   X(FLUSHING)                                   \
474   X(FLUSHFAILED)                                \
475   X(SEPARATED)                                  \
476   X(DROPPING)                                   \
477   X(DROPPED)
478
479 enum StateMachineState {
480 #define SMS_DEF_ENUM(s) sm_##s,
481   SMS_LIST(SMS_DEF_ENUM)
482 };
483
484 static const char *sms_names[]= {
485 #define SMS_DEF_NAME(s) #s ,
486   SMS_LIST(SMS_DEF_NAME)
487   0
488 };
489
490 struct Conn {
491   ISNODE(Conn);
492   int fd; /* may be 0, meaning closed (during construction/destruction) */
493   oop_read *rd; /* likewise */
494   int max_queue, stream, quitting;
495   int since_activity; /* periods */
496   ArticleList waiting; /* not yet told peer */
497   ArticleList priority; /* peer says send it now */
498   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
499   struct iovec xmit[CONNIOVS];
500   XmitDetails xmitd[CONNIOVS];
501   int xmitu;
502 };
503
504
505 /*----- general operational variables -----*/
506
507 /* main initialises */
508 static oop_source *loop;
509 static ConnList conns;
510 static ArticleList queue;
511 static char *path_lock, *path_flushing, *path_defer;
512 static char *path_control, *path_dump;
513 static char *globpat_backlog;
514 static pid_t self_pid;
515
516 /* statemc_init initialises */
517 static StateMachineState sms;
518 static int until_flush;
519 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
520 static FILE *defer;
521
522 /* initialisation to 0 is good */
523 static int until_connect, until_backlog_nextscan;
524 static double accept_proportion;
525 static int nocheck, nocheck_reported, in_child;
526
527 /* for simulation, debugging, etc. */
528 int simulate_flush= -1;
529
530 /*========== logging ==========*/
531
532 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
533 static void logcore(int sysloglevel, const char *fmt, ...) {
534   VA;
535   if (become_daemon) {
536     vsyslog(sysloglevel,fmt,al);
537   } else {
538     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
539     vfprintf(stderr,fmt,al);
540     putc('\n',stderr);
541   }
542   va_end(al);
543 }
544
545 static void logv(int sysloglevel, const char *pfx, int errnoval,
546                  const char *fmt, va_list al) PRINTF(5,0);
547 static void logv(int sysloglevel, const char *pfx, int errnoval,
548                  const char *fmt, va_list al) {
549   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
550   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
551   msgbuf[sizeof(msgbuf)-1]= 0;
552
553   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
554     sysloglevel= LOG_ERR; /* run by wrong user, probably */
555
556   logcore(sysloglevel, "<%s>%s: %s%s%s",
557          sitename, pfx, msgbuf,
558          errnoval>=0 ? ": " : "",
559          errnoval>=0 ? strerror(errnoval) : "");
560 }
561
562 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
563   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
564   static void fn(const char *fmt, ...) {                        \
565     preterminate();                                             \
566     VA;                                                         \
567     logv(sysloglevel, pfx, err, fmt, al);                       \
568     exit(estatus);                                              \
569   }
570
571 #define logwrap(fn, pfx, sysloglevel, err)              \
572   static void fn(const char *fmt, ...) PRINTF(1,2);     \
573   static void fn(const char *fmt, ...) {                \
574     VA;                                                 \
575     logv(sysloglevel, pfx, err, fmt, al);               \
576     va_end(al);                                         \
577   }
578
579 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
580 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
581
582 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
583 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
584
585 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
586 logwrap(warn,     " warning",  LOG_WARNING, -1);
587
588 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
589 logwrap(info,     " info",     LOG_INFO,    -1);
590 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
591
592
593 /*========== utility functions etc. ==========*/
594
595 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
596 static char *xvasprintf(const char *fmt, va_list al) {
597   char *str;
598   int rc= vasprintf(&str,fmt,al);
599   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
600   return str;
601 }
602 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
603 static char *xasprintf(const char *fmt, ...) {
604   VA;
605   char *str= xvasprintf(fmt,al);
606   va_end(al);
607   return str;
608 }
609
610 static int close_perhaps(int *fd) {
611   if (*fd <= 0) return 0;
612   int r= close(*fd);
613   *fd=0;
614   return r;
615 }
616 static void xclose(int fd, const char *what, const char *what2) {
617   int r= close(fd);
618   if (r) sysdie("close %s%s",what,what2?what2:"");
619 }
620 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
621   if (*fd <= 0) return;
622   xclose(*fd,what,what2);
623   *fd=0;
624 }
625
626 static pid_t xfork(const char *what) {
627   pid_t child;
628
629   child= fork();
630   if (child==-1) sysfatal("cannot fork for %s",what);
631   debug("forked %s %ld", what, (unsigned long)child);
632   if (!child) postfork();
633   return child;
634 }
635
636 static void on_fd_read_except(int fd, oop_call_fd callback) {
637   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
638   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
639 }
640 static void cancel_fd_read_except(int fd) {
641   loop->cancel_fd(loop, fd, OOP_READ);
642   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
643 }
644
645 static void report_child_status(const char *what, int status) {
646   if (WIFEXITED(status)) {
647     int es= WEXITSTATUS(status);
648     if (es)
649       warn("%s: child died with error exit status %d", what, es);
650   } else if (WIFSIGNALED(status)) {
651     int sig= WTERMSIG(status);
652     const char *sigstr= strsignal(sig);
653     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
654     if (sigstr)
655       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
656     else
657       warn("%s: child died due to unknown fatal signal %d%s",
658            what, sig, coredump);
659   } else {
660     warn("%s: child died with unknown wait status %d", what,status);
661   }
662 }
663
664 static int xwaitpid(pid_t *pid, const char *what) {
665   int status;
666
667   int r= kill(*pid, SIGKILL);
668   if (r) sysdie("cannot kill %s child", what);
669
670   pid_t got= waitpid(*pid, &status, 0);
671   if (got==-1) sysdie("cannot reap %s child", what);
672   if (got==0) die("cannot reap %s child", what);
673
674   *pid= 0;
675
676   return status;
677 }
678
679 static void *zxmalloc(size_t sz) {
680   void *p= xmalloc(sz);
681   memset(p,0,sz);
682   return p;
683 }
684
685 static void xunlink(const char *path, const char *what) {
686   int r= unlink(path);
687   if (r) sysdie("can't unlink %s %s", path, what);
688 }
689
690 static time_t xtime(void) {
691   time_t now= time(0);
692   if (now==-1) sysdie("time(2) failed");
693   return now;
694 }
695
696 static void xsigaction(int signo, const struct sigaction *sa) {
697   int r= sigaction(signo,sa,0);
698   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
699 }
700
701 static void xsigsetdefault(int signo) {
702   struct sigaction sa;
703   memset(&sa,0,sizeof(sa));
704   sa.sa_handler= SIG_DFL;
705   xsigaction(signo,&sa);
706 }
707
708 static void xgettimeofday(struct timeval *tv_r) {
709   int r= gettimeofday(tv_r,0);
710   if (r) sysdie("gettimeofday(2) failed");
711 }
712
713 static void xsetnonblock(int fd, int nonblocking) {
714   int errnoval= oop_fd_nonblock(fd, nonblocking);
715   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
716 }
717
718 static void check_isreg(const struct stat *stab, const char *path,
719                         const char *what) {
720   if (!S_ISREG(stab->st_mode))
721     die("%s %s not a plain file (mode 0%lo)",
722         what, path, (unsigned long)stab->st_mode);
723 }
724
725 static void xfstat(int fd, struct stat *stab_r, const char *what) {
726   int r= fstat(fd, stab_r);
727   if (r) sysdie("could not fstat %s", what);
728 }
729
730 static void xfstat_isreg(int fd, struct stat *stab_r,
731                          const char *path, const char *what) {
732   xfstat(fd, stab_r, what);
733   check_isreg(stab_r, path, what);
734 }
735
736 static void xlstat_isreg(const char *path, struct stat *stab,
737                          int *enoent_r /* 0 means ENOENT is fatal */,
738                          const char *what) {
739   int r= lstat(path, stab);
740   if (r) {
741     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
742     sysdie("could not lstat %s %s", what, path);
743   }
744   if (enoent_r) *enoent_r= 0;
745   check_isreg(stab, path, what);
746 }
747
748 static int samefile(const struct stat *a, const struct stat *b) {
749   assert(S_ISREG(a->st_mode));
750   assert(S_ISREG(b->st_mode));
751   return (a->st_ino == b->st_ino &&
752           a->st_dev == b->st_dev);
753 }
754
755 static char *sanitise(const char *input, int len) {
756   static char sanibuf[100]; /* returns pointer to this buffer! */
757
758   const char *p= input;
759   const char *endp= len>=0 ? input+len : 0;
760   char *q= sanibuf;
761   *q++= '`';
762   for (;;) {
763     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
764     int c= (!endp || p<endp) ? *p++ : 0;
765     if (!c) { *q++= '\''; *q=0; break; }
766     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
767     sprintf(q,"\\x%02x",c);
768     q += 4;
769   }
770   return sanibuf;
771 }
772
773 static int isewouldblock(int errnoval) {
774   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
775 }
776
777 /*========== command and control connections ==========*/
778
779 static int control_master;
780
781 typedef struct ControlConn ControlConn;
782 struct ControlConn {
783   void (*destroy)(ControlConn*);
784   int fd;
785   oop_read *rd;
786   FILE *out;
787   union {
788     struct sockaddr sa;
789     struct sockaddr_un un;
790   } sa;
791   socklen_t salen;
792 };
793
794 static const oop_rd_style control_rd_style= {
795   OOP_RD_DELIM_STRIP, '\n',
796   OOP_RD_NUL_FORBID,
797   OOP_RD_SHORTREC_FORBID
798 };
799
800 static void control_destroy(ControlConn *cc) {
801   cc->destroy(cc);
802 }
803
804 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
805   if (ferror(cc->out) | fflush(cc->out)) {
806     info("CTRL%d write error %s", cc->fd, strerror(errno));
807     control_destroy(cc);
808   }
809 }
810
811 static void control_prompt(ControlConn *cc /* may destroy*/) {
812   fprintf(cc->out, "%s| ", sitename);
813   control_checkouterr(cc);
814 }
815
816 struct ControlCommand {
817   const char *cmd;
818   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
819             const char *arg, size_t argsz);
820   void *xdata;
821   int xval;
822 };
823
824 static const ControlCommand control_commands[];
825
826 #define CCMD(wh)                                                        \
827   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
828                         const char *arg, size_t argsz)
829
830 CCMD(help) {
831   fputs("commands:\n", cc->out);
832   const ControlCommand *ccmd;
833   for (ccmd=control_commands; ccmd->cmd; ccmd++)
834     fprintf(cc->out, " %s\n", ccmd->cmd);
835   fputs("NB: permissible arguments are not shown above."
836         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
837 }
838
839 CCMD(flush) {
840   int ok= trigger_flush_ok();
841   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
842 }
843
844 CCMD(stop) {
845   preterminate();
846   notice("terminating (CTRL%d)",cc->fd);
847   raise_default(SIGTERM);
848   abort();
849 }
850
851 CCMD(dump);
852
853 /* messing with our head: */
854 CCMD(period) { period(); }
855 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
856 CCMD(setint) { *(int*)c->xdata= c->xval; }
857 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
858
859 static const ControlCommand control_commands[]= {
860   { "h",             ccmd_help      },
861   { "flush",         ccmd_flush     },
862   { "stop",          ccmd_stop      },
863   { "dump q",        ccmd_dump, 0,0 },
864   { "dump a",        ccmd_dump, 0,1 },
865
866   { "p",             ccmd_period    },
867
868 #define POKES(cmd,func)                                                 \
869   { cmd "flush",     func,           &until_flush,             1 },     \
870   { cmd "conn",      func,           &until_connect,           0 },     \
871   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
872 POKES("next ", ccmd_setint)
873 POKES("prod ", ccmd_setint_period)
874
875   { "pretend flush", ccmd_setintarg, &simulate_flush             },
876   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
877   { 0 }
878 };
879
880 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
881                            const char *errmsg, int errnoval,
882                            const char *data, size_t recsz, void *cc_v) {
883   ControlConn *cc= cc_v;
884
885   if (!data) {
886     info("CTRL%d closed", cc->fd);
887     cc->destroy(cc);
888     return OOP_CONTINUE;
889   }
890
891   if (recsz == 0) goto prompt;
892
893   const ControlCommand *ccmd;
894   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
895     int l= strlen(ccmd->cmd);
896     if (recsz < l) continue;
897     if (recsz > l && data[l] != ' ') continue;
898     if (memcmp(data, ccmd->cmd, l)) continue;
899
900     int argl= (int)recsz - (l+1); 
901     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
902     goto prompt;
903   }
904
905   fputs("unknown command; h for help\n", cc->out);
906
907  prompt:
908   control_prompt(cc);
909   return OOP_CONTINUE;
910 }
911
912 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
913                             const char *errmsg, int errnoval,
914                             const char *data, size_t recsz, void *cc_v) {
915   ControlConn *cc= cc_v;
916   
917   info("CTRL%d read error %s", cc->fd, errmsg);
918   cc->destroy(cc);
919   return OOP_CONTINUE;
920 }
921
922 static int control_conn_startup(ControlConn *cc /* may destroy*/,
923                                 const char *how) {
924   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
925   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
926
927   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
928                       control_rd_ok, cc,
929                       control_rd_err, cc);
930   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
931
932   info("CTRL%d %s ready", cc->fd, how);
933   control_prompt(cc);
934   return 0;
935 }
936
937 static void control_stdio_destroy(ControlConn *cc) {
938   if (cc->rd) {
939     oop_rd_cancel(cc->rd);
940     errno= oop_rd_delete_tidy(cc->rd);
941     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
942   }
943   free(cc);
944 }
945
946 static void control_stdio(void) {
947   NEW_DECL(ControlConn *,cc);
948   cc->destroy= control_stdio_destroy;
949
950   cc->fd= 0;
951   cc->out= stdout;
952   int r= control_conn_startup(cc,"stdio");
953   if (r) cc->destroy(cc);
954 }
955
956 static void control_accepted_destroy(ControlConn *cc) {
957   if (cc->rd) {
958     oop_rd_cancel(cc->rd);
959     oop_rd_delete_kill(cc->rd);
960   }
961   if (cc->out) { fclose(cc->out); cc->fd=0; }
962   close_perhaps(&cc->fd);
963   free(cc);
964 }
965
966 static void *control_master_readable(oop_source *lp, int master,
967                                      oop_event ev, void *u) {
968   NEW_DECL(ControlConn *,cc);
969   cc->destroy= control_accepted_destroy;
970
971   cc->salen= sizeof(cc->sa);
972   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
973   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
974
975   cc->out= fdopen(cc->fd, "w");
976   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
977
978   int r= control_conn_startup(cc, "accepted");
979   if (r) goto x;
980
981   return OOP_CONTINUE;
982
983  x:
984   cc->destroy(cc);
985   return OOP_CONTINUE;
986 }
987
988 #define NOCONTROL(...) do{                                              \
989     syswarn("no control socket, because failed to " __VA_ARGS__);       \
990     goto nocontrol;                                                     \
991   }while(0)
992
993 static void control_init(void) {
994   char *real=0;
995   
996   union {
997     struct sockaddr sa;
998     struct sockaddr_un un;
999   } sa;
1000
1001   memset(&sa,0,sizeof(sa));
1002   int maxlen= sizeof(sa.un.sun_path);
1003
1004   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1005   if (reallen<0) {
1006     if (errno != ENOENT)
1007       NOCONTROL("readlink control socket symlink path %s", path_control);
1008   }
1009   if (reallen >= maxlen) {
1010     debug("control socket symlink path too long (r=%d)",reallen);
1011     xunlink(path_control, "old (overlong) control socket symlink");
1012     reallen= -1;
1013   }
1014   
1015   if (reallen<0) {
1016     struct stat stab;
1017     int r= lstat(realsockdir,&stab);
1018     if (r) {
1019       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1020
1021       r= mkdir(realsockdir, 0700);
1022       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1023
1024     } else {
1025       uid_t self= geteuid();
1026       if (!S_ISDIR(stab.st_mode) ||
1027           stab.st_uid != self ||
1028           stab.st_mode & 0007) {
1029         warn("no control socket, because real socket directory"
1030              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1031              !!S_ISDIR(stab.st_mode),
1032              (unsigned long)stab.st_uid, (unsigned long)self,
1033              (unsigned long)stab.st_mode & 0777UL);
1034         goto nocontrol;
1035       }
1036     }
1037
1038     real= xasprintf("%s/s%lx.%lx", realsockdir,
1039                     (unsigned long)xtime(), (unsigned long)self_pid);
1040     int reallen= strlen(real);
1041
1042     if (reallen >= maxlen) {
1043       warn("no control socket, because tmpnam gave overly-long path"
1044            " %s", real);
1045       goto nocontrol;
1046     }
1047     r= symlink(real, path_control);
1048     if (r) NOCONTROL("make control socket path %s a symlink to real"
1049                      " socket path %s", path_control, real);
1050     memcpy(sa.un.sun_path, real, reallen);
1051   }
1052
1053   int r= unlink(sa.un.sun_path);
1054   if (r && errno!=ENOENT)
1055     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1056
1057   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1058   if (control_master<0) NOCONTROL("create new control socket");
1059
1060   sa.un.sun_family= AF_UNIX;
1061   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1062   r= bind(control_master, &sa.sa, sl);
1063   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1064
1065   r= listen(control_master, 5);
1066   if (r) NOCONTROL("listen");
1067
1068   xsetnonblock(control_master, 1);
1069
1070   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1071   info("control socket ok, real path %s", sa.un.sun_path);
1072
1073   return;
1074
1075  nocontrol:
1076   free(real);
1077   xclose_perhaps(&control_master, "control master",0);
1078   return;
1079 }
1080
1081 /*========== management of connections ==========*/
1082
1083 static void conn_closefd(Conn *conn, const char *msgprefix) {
1084   int r= close_perhaps(&conn->fd);
1085   if (r) info("C%d %serror closing socket: %s",
1086               conn->fd, msgprefix, strerror(errno));
1087 }
1088
1089 static void conn_dispose(Conn *conn) {
1090   if (!conn) return;
1091   if (conn->rd) {
1092     oop_rd_cancel(conn->rd);
1093     oop_rd_delete_kill(conn->rd);
1094     conn->rd= 0;
1095   }
1096   if (conn->fd) {
1097     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1098     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1099   }
1100   conn_closefd(conn,"");
1101   free(conn);
1102   until_connect= reconnect_delay_periods;
1103 }
1104
1105 static void *conn_exception(oop_source *lp, int fd,
1106                             oop_event ev, void *conn_v) {
1107   Conn *conn= conn_v;
1108   unsigned char ch;
1109   assert(fd == conn->fd);
1110   assert(ev == OOP_EXCEPTION);
1111   int r= read(conn->fd, &ch, 1);
1112   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1113   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1114                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1115   return OOP_CONTINUE;
1116 }  
1117
1118 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1119   int requeue[art_MaxState];
1120   memset(requeue,0,sizeof(requeue));
1121
1122   Article *art;
1123   while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
1124   while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art);
1125   while ((art= LIST_REMHEAD(conn->sent))) {
1126     requeue[art->state]++;
1127     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1128     LIST_ADDTAIL(queue,art);
1129   }
1130
1131   int i;
1132   XmitDetails *d;
1133   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1134     xmit_free(d);
1135
1136   char *m= xvasprintf(fmt,al);
1137   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1138        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1139   free(m);
1140
1141   LIST_REMOVE(conns,conn);
1142   conn_dispose(conn);
1143   check_assign_articles();
1144 }
1145
1146 static void connfail(Conn *conn, const char *fmt, ...) {
1147   va_list al;
1148   va_start(al,fmt);
1149   vconnfail(conn,fmt,al);
1150   va_end(al);
1151 }
1152
1153 static void check_idle_conns(void) {
1154   Conn *conn;
1155   FOR_CONN(conn)
1156     conn->since_activity++;
1157  search_again:
1158   FOR_CONN(conn) {
1159     if (conn->since_activity <= need_activity_periods) continue;
1160
1161     /* We need to shut this down */
1162     if (conn->quitting)
1163       connfail(conn,"timed out waiting for response to QUIT");
1164     else if (conn->sent.count)
1165       connfail(conn,"timed out waiting for responses");
1166     else if (conn->waiting.count || conn->priority.count)
1167       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1168     else if (conn->xmitu)
1169       connfail(conn,"peer has been sending responses"
1170                " before receiving our commands!");
1171     else {
1172       static const char quitcmd[]= "QUIT\r\n";
1173       int todo= sizeof(quitcmd)-1;
1174       const char *p= quitcmd;
1175       for (;;) {
1176         int r= write(conn->fd, p, todo);
1177         if (r<0) {
1178           if (isewouldblock(errno))
1179             connfail(conn, "blocked writing QUIT to idle connection");
1180           else
1181             connfail(conn, "failed to write QUIT to idle connection: %s",
1182                      strerror(errno));
1183           break;
1184         }
1185         assert(r<=todo);
1186         todo -= r;
1187         if (!todo) {
1188           conn->quitting= 1;
1189           conn->since_activity= 0;
1190           debug("C%d is idle, quitting", conn->fd);
1191           break;
1192         }
1193       }
1194     }
1195     goto search_again;
1196   }
1197 }  
1198
1199 /*---------- making new connections ----------*/
1200
1201 static pid_t connecting_child;
1202 static int connecting_fdpass_sock;
1203
1204 static void connect_attempt_discard(void) {
1205   if (connecting_child) {
1206     int status= xwaitpid(&connecting_child, "connect");
1207     if (!(WIFEXITED(status) ||
1208           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1209       report_child_status("connect", status);
1210   }
1211   if (connecting_fdpass_sock) {
1212     cancel_fd_read_except(connecting_fdpass_sock);
1213     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1214   }
1215 }
1216
1217 #define PREP_DECL_MSG_CMSG(msg)                 \
1218   char msgbyte= 0;                              \
1219   struct iovec msgiov;                          \
1220   msgiov.iov_base= &msgbyte;                    \
1221   msgiov.iov_len= 1;                            \
1222   struct msghdr msg;                            \
1223   memset(&msg,0,sizeof(msg));                   \
1224   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1225   msg.msg_iov= &msgiov;                         \
1226   msg.msg_iovlen= 1;                            \
1227   msg.msg_control= msg##cbuf;                   \
1228   msg.msg_controllen= sizeof(msg##cbuf);
1229
1230 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1231   Conn *conn= 0;
1232
1233   assert(fd == connecting_fdpass_sock);
1234
1235   PREP_DECL_MSG_CMSG(msg);
1236   
1237   ssize_t rs= recvmsg(fd, &msg, 0);
1238   if (rs<0) {
1239     if (isewouldblock(errno)) return OOP_CONTINUE;
1240     syswarn("failed to read socket from connecting child");
1241     goto x;
1242   }
1243
1244   NEW(conn);
1245   LIST_INIT(conn->waiting);
1246   LIST_INIT(conn->priority);
1247   LIST_INIT(conn->sent);
1248
1249   struct cmsghdr *h= 0;
1250   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1251   if (!h) {
1252     int status= xwaitpid(&connecting_child, "connect child (broken)");
1253
1254     if (WIFEXITED(status)) {
1255       if (WEXITSTATUS(status) != 0 &&
1256           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1257           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1258         /* child already reported the problem */;
1259       else {
1260         if (e == OOP_EXCEPTION)
1261           warn("connect: connection child exited code %d but"
1262                " unexpected exception on fdpass socket",
1263                WEXITSTATUS(status));
1264         else
1265           warn("connect: connection child exited code %d but"
1266                " no cmsg (rs=%d)",
1267                WEXITSTATUS(status), (int)rs);
1268       }
1269     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1270       warn("connect: connection attempt timed out");
1271     } else {
1272       report_child_status("connect", status);
1273     }
1274     goto x;
1275   }
1276
1277 #define CHK(field, val)                                                  \
1278   if (h->cmsg_##field != val) {                                          \
1279     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1280         h->cmsg_##field, val);                                           \
1281     goto x;                                                              \
1282   }
1283   CHK(level, SOL_SOCKET);
1284   CHK(type,  SCM_RIGHTS);
1285   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1286 #undef CHK
1287
1288   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1289
1290   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1291
1292   int status;
1293   pid_t got= waitpid(connecting_child, &status, 0);
1294   if (got==-1) sysdie("connect: real wait for child");
1295   assert(got == connecting_child);
1296   connecting_child= 0;
1297
1298   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1299   int es= WEXITSTATUS(status);
1300   switch (es) {
1301   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1302   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1303   default:
1304     fatal("connect: child gave unexpected exit status %d", es);
1305   }
1306
1307   /* Phew! */
1308   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1309
1310   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1311   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1312   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1313   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1314                      &peer_rd_ok, conn,
1315                      &peer_rd_err, conn);
1316   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1317
1318   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1319   LIST_ADDHEAD(conns, conn);
1320
1321   connect_attempt_discard();
1322   check_assign_articles();
1323   return OOP_CONTINUE;
1324
1325  x:
1326   conn_dispose(conn);
1327   connect_attempt_discard();
1328   return OOP_CONTINUE;
1329 }
1330
1331 static int allow_connect_start(void) {
1332   return conns.count < max_connections
1333     && !connecting_child
1334     && !until_connect;
1335 }
1336
1337 static void connect_start(void) {
1338   assert(!connecting_child);
1339   assert(!connecting_fdpass_sock);
1340
1341   info("starting connection attempt");
1342
1343   int socks[2];
1344   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1345   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1346
1347   connecting_child= xfork("connection");
1348
1349   if (!connecting_child) {
1350     FILE *cn_from, *cn_to;
1351     char buf[NNTP_STRLEN+100];
1352     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1353
1354     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1355
1356     alarm(connection_setup_timeout);
1357     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1358       int l= strlen(buf);
1359       int stripped=0;
1360       while (l>0) {
1361         unsigned char c= buf[l-1];
1362         if (!isspace(c)) break;
1363         if (c=='\n' || c=='\r') stripped=1;
1364         --l;
1365       }
1366       if (!buf[0]) {
1367         sysfatal("connect: connection attempt failed");
1368       } else {
1369         buf[l]= 0;
1370         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1371               sanitise(buf,-1));
1372       }
1373     }
1374     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1375       sysfatal("connect: authentication failed");
1376     if (try_stream) {
1377       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1378           fflush(cn_to))
1379         sysfatal("connect: could not send MODE STREAM");
1380       buf[sizeof(buf)-1]= 0;
1381       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1382         if (ferror(cn_from))
1383           sysfatal("connect: could not read response to MODE STREAM");
1384         else
1385           fatal("connect: connection close in response to MODE STREAM");
1386       }
1387       int l= strlen(buf);
1388       assert(l>=1);
1389       if (buf[l-1]!='\n')
1390         fatal("connect: response to MODE STREAM is too long: %.100s...",
1391               sanitise(buf,-1));
1392       l--;  if (l>0 && buf[l-1]=='\r') l--;
1393       buf[l]= 0;
1394       char *ep;
1395       int rcode= strtoul(buf,&ep,10);
1396       if (ep != &buf[3])
1397         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1398
1399       switch (rcode) {
1400       case 203:
1401         exitstatus= CONNCHILD_ESTATUS_STREAM;
1402         break;
1403       case 480:
1404       case 500:
1405         break;
1406       default:
1407         warn("connect: unexpected response to MODE STREAM: %.50s",
1408              sanitise(buf,-1));
1409         exitstatus= 2;
1410         break;
1411       }
1412     }
1413     int fd= fileno(cn_from);
1414
1415     PREP_DECL_MSG_CMSG(msg);
1416     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1417     cmsg->cmsg_level= SOL_SOCKET;
1418     cmsg->cmsg_type=  SCM_RIGHTS;
1419     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1420     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1421
1422     msg.msg_controllen= cmsg->cmsg_len;
1423     r= sendmsg(socks[1], &msg, 0);
1424     if (r<0) sysdie("sendmsg failed for new connection");
1425     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1426
1427     _exit(exitstatus);
1428   }
1429
1430   xclose(socks[1], "connecting fdpass child's socket",0);
1431   connecting_fdpass_sock= socks[0];
1432   xsetnonblock(connecting_fdpass_sock, 1);
1433   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1434 }
1435
1436 /*---------- assigning articles to conns, and transmitting ----------*/
1437
1438 static void check_assign_articles(void) {
1439   for (;;) {
1440     if (!queue.count)
1441       break;
1442
1443     Conn *walk, *use=0;
1444     int spare=0, inqueue=0;
1445
1446     /* Find a connection to offer this article.  We prefer a busy
1447      * connection to an idle one, provided it's not full.  We take the
1448      * first (oldest) and since that's stable, it will mean we fill up
1449      * connections in order.  That way if we have too many
1450      * connections, the spare ones will go away eventually.
1451      */
1452     FOR_CONN(walk) {
1453       if (walk->quitting) continue;
1454       inqueue= walk->sent.count + walk->priority.count
1455              + walk->waiting.count;
1456       spare= walk->max_queue - inqueue;
1457       assert(inqueue <= max_queue_per_conn);
1458       assert(spare >= 0);
1459       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1460       else if (spare>0) /*working*/ { use= walk; break; }
1461     }
1462     if (use) {
1463       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1464       while (spare>0) {
1465         Article *art= LIST_REMHEAD(queue);
1466         if (!art) break;
1467         LIST_ADDTAIL(use->waiting, art);
1468         spare--;
1469       }
1470       conn_maybe_write(use);
1471     } else if (allow_connect_start()) {
1472       until_connect= reconnect_delay_periods;
1473       connect_start();
1474       break;
1475     } else {
1476       break;
1477     }
1478   }
1479 }
1480
1481 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1482   conn_maybe_write(u);
1483   return OOP_CONTINUE;
1484 }
1485
1486 static void conn_maybe_write(Conn *conn)  {
1487   for (;;) {
1488     conn_make_some_xmits(conn);
1489     if (!conn->xmitu) {
1490       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1491       return;
1492     }
1493
1494     void *rp= conn_write_some_xmits(conn);
1495     if (rp==OOP_CONTINUE) {
1496       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1497       return;
1498     } else if (rp==OOP_HALT) {
1499       return;
1500     } else if (!rp) {
1501       /* transmitted everything */
1502     } else {
1503       abort();
1504     }
1505   }
1506 }
1507
1508 /*========== article transmission ==========*/
1509
1510 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1511                   XmitKind kind) { /* caller must then fill in details */
1512   struct iovec *v= &conn->xmit[conn->xmitu];
1513   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1514   v->iov_base= (char*)data;
1515   v->iov_len= len;
1516   d->kind= kind;
1517   return d;
1518 }
1519
1520 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1521   xmit_core(conn,data,len, xk_Const);
1522 }
1523 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1524
1525 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1526   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1527   d->info.sm_art= ah;
1528 }
1529
1530 static void xmit_free(XmitDetails *d) {
1531   switch (d->kind) {
1532   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1533   case xk_Const:                                  break;
1534   default: abort();
1535   }
1536 }
1537
1538 static void *conn_write_some_xmits(Conn *conn) {
1539   /* return values:
1540    *      0:            nothing more to write, no need to call us again
1541    *      OOP_CONTINUE: more to write but fd not writeable
1542    *      OOP_HALT:     disaster, have destroyed conn
1543    */
1544   for (;;) {
1545     int count= conn->xmitu;
1546     if (!count) return 0;
1547
1548     if (count > IOV_MAX) count= IOV_MAX;
1549     ssize_t rs= writev(conn->fd, conn->xmit, count);
1550     if (rs < 0) {
1551       if (isewouldblock(errno)) return OOP_CONTINUE;
1552       connfail(conn, "write failed: %s", strerror(errno));
1553       return OOP_HALT;
1554     }
1555     assert(rs > 0);
1556
1557     int done;
1558     for (done=0; rs && done<conn->xmitu; done++) {
1559       struct iovec *vp= &conn->xmit[done];
1560       XmitDetails *dp= &conn->xmitd[done];
1561       if (rs > vp->iov_len) {
1562         rs -= vp->iov_len;
1563         xmit_free(dp);
1564       } else {
1565         vp->iov_base= (char*)vp->iov_base + rs;
1566         vp->iov_len -= rs;
1567       }
1568     }
1569     int newu= conn->xmitu - done;
1570     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1571     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1572     conn->xmitu= newu;
1573   }
1574 }
1575
1576 static void conn_make_some_xmits(Conn *conn) {
1577   for (;;) {
1578     if (conn->xmitu+5 > CONNIOVS)
1579       break;
1580
1581     Article *art= LIST_REMHEAD(conn->priority);
1582     if (!art) art= LIST_REMHEAD(conn->waiting);
1583     if (!art) break;
1584
1585     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1586       /* actually send it */
1587
1588       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1589
1590       art->state=
1591         art->state == art_Unchecked ? art_Unsolicited :
1592         art->state == art_Wanted    ? art_Wanted      :
1593         (abort(),-1);
1594
1595       if (!artdata) art->missing= 1;
1596       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1597
1598       if (conn->stream) {
1599         if (artdata) {
1600           XMIT_LITERAL("TAKETHIS ");
1601           xmit_noalloc(conn, art->messageid, art->midlen);
1602           XMIT_LITERAL("\r\n");
1603           xmit_artbody(conn, artdata);
1604         } else {
1605           article_done(conn, art, -1);
1606           continue;
1607         }
1608       } else {
1609         /* we got 235 from IHAVE */
1610         if (artdata) {
1611           xmit_artbody(conn, artdata);
1612         } else {
1613           XMIT_LITERAL(".\r\n");
1614         }
1615       }
1616
1617       LIST_ADDTAIL(conn->sent, art);
1618
1619     } else {
1620       /* check it */
1621
1622       if (conn->stream)
1623         XMIT_LITERAL("CHECK ");
1624       else
1625         XMIT_LITERAL("IHAVE ");
1626       xmit_noalloc(conn, art->messageid, art->midlen);
1627       XMIT_LITERAL("\r\n");
1628
1629       assert(art->state == art_Unchecked);
1630       art->ipf->counts[art->state][RC_sent]++;
1631       LIST_ADDTAIL(conn->sent, art);
1632     }
1633   }
1634 }
1635
1636
1637 /*========== handling responses from peer ==========*/
1638
1639 static const oop_rd_style peer_rd_style= {
1640   OOP_RD_DELIM_STRIP, '\n',
1641   OOP_RD_NUL_FORBID,
1642   OOP_RD_SHORTREC_FORBID
1643 };
1644
1645 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1646                          const char *errmsg, int errnoval,
1647                          const char *data, size_t recsz, void *conn_v) {
1648   Conn *conn= conn_v;
1649   connfail(conn, "error receiving from peer: %s", errmsg);
1650   return OOP_CONTINUE;
1651 }
1652
1653 static Article *article_reply_check(Conn *conn, const char *response,
1654                                     int code_indicates_streaming,
1655                                     int must_have_sent
1656                                         /* 1:yes, -1:no, 0:dontcare */,
1657                                     const char *sanitised_response) {
1658   Article *art= LIST_HEAD(conn->sent);
1659
1660   if (!art) {
1661     connfail(conn,
1662              "peer gave unexpected response when no commands outstanding: %s",
1663              sanitised_response);
1664     return 0;
1665   }
1666
1667   if (code_indicates_streaming) {
1668     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1669     if (!conn->stream) {
1670       connfail(conn, "peer gave streaming response code "
1671                " to IHAVE or subsequent body: %s", sanitised_response);
1672       return 0;
1673     }
1674     const char *got_mid= response+4;
1675     int got_midlen= strcspn(got_mid, " \n\r");
1676     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1677       connfail(conn, "peer gave streaming response with syntactically invalid"
1678                " messageid: %s", sanitised_response);
1679       return 0;
1680     }
1681     if (got_midlen != art->midlen ||
1682         memcmp(got_mid, art->messageid, got_midlen)) {
1683       connfail(conn, "peer gave streaming response code to wrong article -"
1684                " probable synchronisation problem; we offered: %s;"
1685                " peer said: %s",
1686                art->messageid, sanitised_response);
1687       return 0;
1688     }
1689   } else {
1690     if (conn->stream) {
1691       connfail(conn, "peer gave non-streaming response code to"
1692                " CHECK/TAKETHIS: %s", sanitised_response);
1693       return 0;
1694     }
1695   }
1696
1697   if (must_have_sent>0 && art->state < art_Wanted) {
1698     connfail(conn, "peer says article accepted but"
1699              " we had not sent the body: %s", sanitised_response);
1700     return 0;
1701   }
1702   if (must_have_sent<0 && art->state >= art_Wanted) {
1703     connfail(conn, "peer says please sent the article but we just did: %s",
1704              sanitised_response);
1705     return 0;
1706   }
1707
1708   Article *art_again= LIST_REMHEAD(conn->sent);
1709   assert(art_again == art);
1710   return art;
1711 }
1712
1713 static void update_nocheck(int accepted) {
1714   accept_proportion *= nocheck_decay;
1715   accept_proportion += accepted * (1.0 - nocheck_decay);
1716   int new_nocheck= accept_proportion >= nocheck_thresh;
1717   if (new_nocheck && !nocheck_reported) {
1718     notice("entering nocheck mode for the first time");
1719     nocheck_reported= 1;
1720   } else if (new_nocheck != nocheck) {
1721     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1722   }
1723   nocheck= new_nocheck;
1724 }
1725
1726 static void article_done(Conn *conn, Article *art, int whichcount) {
1727   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1728
1729   if (whichcount == RC_accepted) update_nocheck(1);
1730   else if (whichcount == RC_unwanted) update_nocheck(0);
1731
1732   InputFile *ipf= art->ipf;
1733
1734   while (art->blanklen) {
1735     static const char spaces[]=
1736       "                                                                "
1737       "                                                                "
1738       "                                                                "
1739       "                                                                "
1740       "                                                                "
1741       "                                                                "
1742       "                                                                "
1743       "                                                                "
1744       "                                                                ";
1745     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1746     int r= pwrite(ipf->fd, spaces, w, art->offset);
1747     if (r==-1) {
1748       if (errno==EINTR) continue;
1749       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1750              art->messageid, art->blanklen,
1751              (unsigned long)art->offset, ipf->path);
1752     }
1753     assert(r>=0 && r<=w);
1754     art->blanklen -= w;
1755     art->offset += w;
1756   }
1757
1758   ipf->inprogress--;
1759   assert(ipf->inprogress >= 0);
1760   free(art);
1761
1762   if (!ipf->inprogress && ipf != main_input_file)
1763     queue_check_input_done();
1764 }
1765
1766 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1767                         const char *errmsg, int errnoval,
1768                         const char *data, size_t recsz, void *conn_v) {
1769   Conn *conn= conn_v;
1770
1771   if (ev == OOP_RD_EOF) {
1772     connfail(conn, "unexpected EOF from peer");
1773     return OOP_CONTINUE;
1774   }
1775   assert(ev == OOP_RD_OK);
1776
1777   char *sani= sanitise(data,-1);
1778
1779   char *ep;
1780   unsigned long code= strtoul(data, &ep, 10);
1781   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1782     connfail(conn, "badly formatted response from peer: %s", sani);
1783     return OOP_CONTINUE;
1784   }
1785
1786   int conn_busy=
1787     conn->waiting.count ||
1788     conn->priority.count ||
1789     conn->sent.count ||
1790     conn->xmitu;
1791
1792   if (conn->quitting) {
1793     if (code!=205 && code!=503) {
1794       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1795     } else {
1796       notice("C%d idle connection closed by us", conn->fd);
1797       assert(!conn_busy);
1798       LIST_REMOVE(conns,conn);
1799       conn_dispose(conn);
1800     }
1801     return OOP_CONTINUE;
1802   }
1803
1804   conn->since_activity= 0;
1805   Article *art;
1806
1807 #define GET_ARTICLE(musthavesent) do{                                         \
1808     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1809     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1810   }while(0) 
1811
1812 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1813     code_streaming= (streaming);                                \
1814     GET_ARTICLE(musthavesent);                                  \
1815     article_done(conn, art, RC_##how);                          \
1816     goto dealtwith;                                             \
1817   }while(0)
1818
1819 #define PEERBADMSG(m) do {                                      \
1820     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1821   }while(0)
1822
1823   int code_streaming= 0;
1824
1825   switch (code) {
1826
1827   case 400: PEERBADMSG("peer stopped accepting articles");
1828   default:  PEERBADMSG("peer sent unexpected message");
1829
1830   case 503:
1831     if (conn_busy) PEERBADMSG("peer timed us out");
1832     notice("C%d idle connection closed by peer", conn->fd);
1833     LIST_REMOVE(conns,conn);
1834     conn_dispose(conn);
1835     return OOP_CONTINUE;
1836
1837   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1838   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1839
1840   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1841   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1842
1843   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1844   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1845
1846   case 238: /* CHECK says send it */
1847     code_streaming= 1;
1848   case 335: /* IHAVE says send it */
1849     GET_ARTICLE(-1);
1850     assert(art->state == art_Unchecked);
1851     art->ipf->counts[art->state][RC_accepted]++;
1852     art->state= art_Wanted;
1853     LIST_ADDTAIL(conn->priority, art);
1854     break;
1855
1856   case 431: /* CHECK or TAKETHIS says try later */
1857     code_streaming= 1;
1858   case 436: /* IHAVE says try later */
1859     GET_ARTICLE(0);
1860     open_defer();
1861     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1862         || fflush(defer))
1863       sysfatal("write to defer file %s",path_defer);
1864     article_done(conn, art, RC_deferred);
1865     break;
1866
1867   }
1868 dealtwith:
1869
1870   conn_maybe_write(conn);
1871   check_assign_articles();
1872   return OOP_CONTINUE;
1873 }
1874
1875
1876 /*========== monitoring of input files ==========*/
1877
1878 static void feedfile_eof(InputFile *ipf) {
1879   assert(ipf != main_input_file); /* promised by tailing_try_read */
1880   inputfile_reading_stop(ipf);
1881
1882   if (ipf == flushing_input_file) {
1883     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1884     if (main_input_file) inputfile_reading_start(main_input_file);
1885     statemc_check_flushing_done();
1886   } else if (ipf == backlog_input_file) {
1887     statemc_check_backlog_done();
1888   } else {
1889     abort(); /* supposed to wait rather than get EOF on main input file */
1890   }
1891 }
1892
1893 static InputFile *open_input_file(const char *path) {
1894   int fd= open(path, O_RDWR);
1895   if (fd<0) {
1896     if (errno==ENOENT) return 0;
1897     sysfatal("unable to open input file %s", path);
1898   }
1899   assert(fd>0);
1900
1901   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1902   memset(ipf,0,sizeof(*ipf));
1903
1904   ipf->fd= fd;
1905   strcpy(ipf->path, path);
1906
1907   return ipf;
1908 }
1909
1910 static void close_input_file(InputFile *ipf) { /* does not free */
1911   assert(!ipf->readable_callback); /* must have had ->on_cancel */
1912   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1913   assert(!ipf->rd); /* must have had inputfile_reading_stop */
1914   assert(!ipf->inprogress); /* no dangling pointers pointing here */
1915   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1916 }
1917
1918
1919 /*---------- dealing with articles read in the input file ----------*/
1920
1921 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1922                                    const char *data, const char *how) {
1923   warn("corrupted file: %s, offset %lu: %s: in %s",
1924        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1925   ipf->readcount_err++;
1926   if (ipf->readcount_err > max_bad_data_initial +
1927       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1928     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
1929         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1930   return OOP_CONTINUE;
1931 }
1932
1933 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1934                                oop_rd_event ev, const char *errmsg,
1935                                int errnoval, const char *data, size_t recsz,
1936                                void *ipf_v) {
1937   InputFile *ipf= ipf_v;
1938   assert(ev == OOP_RD_SYSTEM);
1939   errno= errnoval;
1940   sysdie("error reading input file: %s, offset %lu",
1941          ipf->path, (unsigned long)ipf->offset);
1942 }
1943
1944 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1945                                   oop_rd_event ev, const char *errmsg,
1946                                   int errnoval, const char *data, size_t recsz,
1947                                   void *ipf_v) {
1948   InputFile *ipf= ipf_v;
1949   Article *art;
1950   char tokentextbuf[sizeof(TOKEN)*2+3];
1951
1952   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1953
1954   off_t old_offset= ipf->offset;
1955   ipf->offset += recsz + !!(ev == OOP_RD_OK);
1956
1957 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
1958
1959   if (ev==OOP_RD_PARTREC)
1960     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
1961     /* but process it anyway */
1962
1963   if (ipf->skippinglong) {
1964     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
1965     return OOP_CONTINUE;
1966   }
1967   if (ev==OOP_RD_LONG) {
1968     ipf->skippinglong= 1;
1969     X_BAD_DATA("overly long line");
1970   }
1971
1972   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
1973   if (!recsz) X_BAD_DATA("empty line");
1974
1975   if (data[0]==' ') {
1976     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
1977     ipf->readcount_blank++;
1978     return OOP_CONTINUE;
1979   }
1980
1981   char *space= strchr(data,' ');
1982   int tokenlen= space-data;
1983   int midlen= (int)recsz-tokenlen-1;
1984   if (midlen <= 2) X_BAD_DATA("no room for messageid");
1985   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
1986
1987   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
1988   memcpy(tokentextbuf, data, tokenlen);
1989   tokentextbuf[tokenlen]= 0;
1990   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
1991
1992   ipf->readcount_ok++;
1993
1994   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
1995   memset(art,0,sizeof(*art));
1996   art->state= art_Unchecked;
1997   art->midlen= midlen;
1998   art->ipf= ipf;  ipf->inprogress++;
1999   art->token= TextToToken(tokentextbuf);
2000   art->offset= old_offset;
2001   art->blanklen= recsz;
2002   strcpy(art->messageid, space+1);
2003   LIST_ADDTAIL(queue, art);
2004
2005   if (sms==sm_NORMAL && ipf==main_input_file &&
2006       ipf->offset >= target_max_feedfile_size)
2007     statemc_start_flush("feed file size");
2008
2009   check_assign_articles();
2010   return OOP_CONTINUE;
2011 }
2012
2013 /*========== tailing input file ==========*/
2014
2015 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2016                                      void *user) {
2017   InputFile *ipf= user;
2018   return ipf->readable_callback(loop, &ipf->readable,
2019                                 ipf->readable_callback_user);
2020 }
2021
2022 static void tailing_on_cancel(struct oop_readable *rable) {
2023   InputFile *ipf= (void*)rable;
2024
2025   if (ipf->filemon) filemon_stop(ipf);
2026   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2027   ipf->readable_callback= 0;
2028 }
2029
2030 static void tailing_queue_readable(InputFile *ipf) {
2031   /* lifetime of ipf here is OK because destruction will cause
2032    * on_cancel which will cancel this callback */
2033   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2034 }
2035
2036 static int tailing_on_readable(struct oop_readable *rable,
2037                                 oop_readable_call *cb, void *user) {
2038   InputFile *ipf= (void*)rable;
2039
2040   tailing_on_cancel(rable);
2041   ipf->readable_callback= cb;
2042   ipf->readable_callback_user= user;
2043   filemon_start(ipf);
2044
2045   tailing_queue_readable(ipf);
2046   return 0;
2047 }
2048
2049 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2050                                 size_t length) {
2051   InputFile *ipf= (void*)rable;
2052   for (;;) {
2053     ssize_t r= read(ipf->fd, buffer, length);
2054     if (r==-1) {
2055       if (errno==EINTR) continue;
2056       return r;
2057     }
2058     if (!r) {
2059       if (ipf==main_input_file) {
2060         errno=EAGAIN;
2061         return -1;
2062       } else if (ipf==flushing_input_file) {
2063         assert(ipf->rd);
2064         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2065       } else if (ipf==backlog_input_file) {
2066         assert(ipf->rd);
2067       } else {
2068         abort();
2069       }
2070     }
2071     tailing_queue_readable(ipf);
2072     return r;
2073   }
2074 }
2075
2076 /*---------- filemon implemented with inotify ----------*/
2077
2078 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2079 #define HAVE_FILEMON
2080
2081 #include <sys/inotify.h>
2082
2083 static int filemon_inotify_fd;
2084 static int filemon_inotify_wdmax;
2085 static InputFile **filemon_inotify_wd2ipf;
2086
2087 struct Filemon_Perfile {
2088   int wd;
2089 };
2090
2091 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2092   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2093   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2094
2095   if (wd >= filemon_inotify_wdmax) {
2096     int newmax= wd+2;
2097     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2098                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2099     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2100            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2101     filemon_inotify_wdmax= newmax;
2102   }
2103
2104   assert(!filemon_inotify_wd2ipf[wd]);
2105   filemon_inotify_wd2ipf[wd]= ipf;
2106
2107   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2108         ipf, wd, filemon_inotify_wdmax);
2109
2110   pf->wd= wd;
2111 }
2112
2113 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2114   int wd= pf->wd;
2115   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2116   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2117   if (r) sysdie("inotify_rm_watch");
2118   filemon_inotify_wd2ipf[wd]= 0;
2119 }
2120
2121 static void *filemon_inotify_readable(oop_source *lp, int fd,
2122                                       oop_event e, void *u) {
2123   struct inotify_event iev;
2124   for (;;) {
2125     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2126     if (r==-1) {
2127       if (isewouldblock(errno)) break;
2128       sysdie("read from inotify master");
2129     } else if (r==sizeof(iev)) {
2130       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2131     } else {
2132       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2133     }
2134     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2135     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2136     filemon_callback(ipf);
2137   }
2138   return OOP_CONTINUE;
2139 }
2140
2141 static int filemon_method_init(void) {
2142   filemon_inotify_fd= inotify_init();
2143   if (filemon_inotify_fd<0) {
2144     syswarn("filemon/inotify: inotify_init failed");
2145     return 0;
2146   }
2147   xsetnonblock(filemon_inotify_fd, 1);
2148   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2149
2150   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2151   return 1;
2152 }
2153
2154 static void filemon_method_dump_info(FILE *f) {
2155   int i;
2156   fprintf(f,"inotify");
2157   DUMPV("%d",,filemon_inotify_fd);
2158   DUMPV("%d",,filemon_inotify_wdmax);
2159   for (i=0; i<filemon_inotify_wdmax; i++)
2160     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2161 }
2162
2163 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2164
2165 /*---------- filemon dummy implementation ----------*/
2166
2167 #if !defined(HAVE_FILEMON)
2168
2169 struct Filemon_Perfile { int dummy; };
2170
2171 static int filemon_method_init(void) {
2172   warn("filemon/dummy: no filemon method compiled in");
2173   return 0;
2174 }
2175 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2176 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2177 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2178
2179 #endif /* !HAVE_FILEMON */
2180
2181 /*---------- filemon generic interface ----------*/
2182
2183 static void filemon_start(InputFile *ipf) {
2184   assert(!ipf->filemon);
2185
2186   NEW(ipf->filemon);
2187   filemon_method_startfile(ipf, ipf->filemon);
2188 }
2189
2190 static void filemon_stop(InputFile *ipf) {
2191   if (!ipf->filemon) return;
2192   filemon_method_stopfile(ipf, ipf->filemon);
2193   free(ipf->filemon);
2194   ipf->filemon= 0;
2195 }
2196
2197 static void filemon_callback(InputFile *ipf) {
2198   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2199     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2200 }
2201
2202 /*---------- interface to start and stop an input file ----------*/
2203
2204 static const oop_rd_style feedfile_rdstyle= {
2205   OOP_RD_DELIM_STRIP, '\n',
2206   OOP_RD_NUL_PERMIT,
2207   OOP_RD_SHORTREC_LONG,
2208 };
2209
2210 static void inputfile_reading_start(InputFile *ipf) {
2211   assert(!ipf->rd);
2212   ipf->readable.on_readable= tailing_on_readable;
2213   ipf->readable.on_cancel=   tailing_on_cancel;
2214   ipf->readable.try_read=    tailing_try_read;
2215   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2216   ipf->readable.delete_kill= 0;
2217
2218   ipf->readable_callback= 0;
2219   ipf->readable_callback_user= 0;
2220
2221   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2222   assert(ipf->rd);
2223
2224   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2225                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2226   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2227 }
2228
2229 static void inputfile_reading_stop(InputFile *ipf) {
2230   assert(ipf->rd);
2231   oop_rd_cancel(ipf->rd);
2232   oop_rd_delete(ipf->rd);
2233   ipf->rd= 0;
2234   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2235 }
2236
2237
2238 /*========== interaction with innd - state machine ==========*/
2239
2240 /* See official state diagram at top of file.  We implement
2241  * this as follows:
2242  * -8<-
2243
2244             .=======.
2245             ||START||
2246             `======='
2247                 |
2248                 | open F
2249                 |
2250                 |    F ENOENT
2251                 |`---------------------------------------------------.
2252       F OPEN OK |                                                    |
2253                 |`---------------- - - -                             |
2254        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2255                 |                  for full startup logic            |
2256      ,--------->|                                                    |
2257      |          V                                                    |
2258      |     ============                                       try to |
2259      |      NORMAL                                            open D |
2260      |     [Normal]                                                  |
2261      |      main F tail                                              |
2262      |     ============                                              V
2263      |          |                                                    |
2264      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2265      ^          | hardlink F to D                                    |
2266      |     [Hardlinked]                                              |
2267      |          | unlink F                                           |
2268      |          | our handle onto F is now onto D                    |
2269      |     [Moved]                                                   |
2270      |          |                                                    |
2271      |          |<-------------------<---------------------<---------+
2272      |          |                                                    |
2273      |          | spawn inndcomm flush                               |
2274      |          V                                                    |
2275      |     ==================                                        |
2276      |      FLUSHING[-ABSENT]                                        |
2277      |     [Flushing]                                                |
2278      |     main D tail/none                                          |
2279      |     ==================                                        |
2280      |          |                                                    |
2281      |          |   INNDCOMM FLUSH FAILS                             ^
2282      |          |`----------------------->----------.                |
2283      |          |                                   |                |
2284      |          |   NO SUCH SITE                    V                |
2285      ^          |`--------------->----.         ==================== |
2286      |          |                      \        FLUSHFAILED[-ABSENT] |
2287      |          |                       \         [Moved]            |
2288      |          | FLUSH OK               \       main D tail/none    |
2289      |          | open F                  \     ==================== |
2290      |          |                          \        |                |
2291      |          |                           \       | TIME TO RETRY  |
2292      |          |`------->----.     ,---<---'\      `----------------'
2293      |          |    D NONE   |     | D NONE  `----.
2294      |          V             |     |              V
2295      |     =============      V     V             ============
2296      |      SEPARATED-1       |     |              DROPPING-1
2297      |      flsh->rd!=0       |     |              flsh->rd!=0
2298      |     [Separated]        |     |             [Dropping]
2299      |      main F idle       |     |              main none
2300      |      flsh D tail       |     |              flsh D tail
2301      |     =============      |     |             ============
2302      |          |             |     | install       |
2303      ^          | EOF ON D    |     |  defer        | EOF ON D
2304      |          V             |     |               V
2305      |     ===============    |     |             ===============
2306      |      SEPARATED-2       |     |              DROPPING-2
2307      |      flsh->rd==0       |     V              flsh->rd==0
2308      |     [Finishing]        |     |             [Dropping]
2309      |      main F tail       |     `.             main none
2310      |      flsh D closed     |       `.           flsh D closed
2311      |     ===============    V         `.        ===============
2312      |          |                         `.          |
2313      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2314      |          V install defer as backlog    `.      | install defer
2315      ^          | close D                       `.    | close D
2316      |          | unlink D                        `.  | unlink D
2317      |          |                                  |  |
2318      |          |                                  V  V
2319      `----------'                               ==============
2320                                                  DROPPED
2321                                                 [Dropped]
2322                                                  main none
2323                                                  flsh none
2324                                                  some backlog
2325                                                 ==============
2326                                                       |
2327                                                       | ALL BACKLOG DONE
2328                                                       |
2329                                                       | unlink lock
2330                                                       | exit
2331                                                       V
2332                                                   ==========
2333                                                    (ESRCH)
2334                                                   [Droppped]
2335                                                   ==========
2336  * ->8-
2337  */
2338
2339 static void startup_set_input_file(InputFile *f) {
2340   assert(!main_input_file);
2341   main_input_file= f;
2342   inputfile_reading_start(f);
2343 }
2344
2345 static void statemc_lock(void) {
2346   int lockfd;
2347   struct stat stab, stabf;
2348   
2349   for (;;) {
2350     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2351     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2352
2353     struct flock fl;
2354     memset(&fl,0,sizeof(fl));
2355     fl.l_type= F_WRLCK;
2356     fl.l_whence= SEEK_SET;
2357     int r= fcntl(lockfd, F_SETLK, &fl);
2358     if (r==-1) {
2359       if (errno==EACCES || isewouldblock(errno)) {
2360         if (quiet_multiple) exit(0);
2361         fatal("another duct holds the lockfile");
2362       }
2363       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2364     }
2365
2366     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2367     int lock_noent;
2368     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2369
2370     if (!lock_noent && samefile(&stab, &stabf))
2371       break;
2372
2373     xclose(lockfd, "stale lockfile ", path_lock);
2374   }
2375
2376   FILE *lockfile= fdopen(lockfd, "w");
2377   if (!lockfile) sysdie("fdopen lockfile");
2378
2379   int r= ftruncate(lockfd, 0);
2380   if (r) sysdie("truncate lockfile to write new info");
2381
2382   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2383               (unsigned long)self_pid,
2384               sitename, feedfile, remote_host) == EOF ||
2385       fflush(lockfile))
2386     sysfatal("write info to lockfile %s", path_lock);
2387
2388   debug("startup: locked");
2389 }
2390
2391 static void statemc_init(void) {
2392   struct stat stabdefer;
2393
2394   search_backlog_file();
2395
2396   int defer_noent;
2397   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2398   if (defer_noent) {
2399     debug("startup: ductdefer ENOENT");
2400   } else {
2401     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2402     switch (stabdefer.st_nlink==1) {
2403     case 1:
2404       open_defer(); /* so that we will later close it and rename it */
2405       break;
2406     case 2:
2407       xunlink(path_defer, "stale defer file link"
2408               " (presumably hardlink to backlog file)");
2409       break;
2410     default:
2411       die("defer file %s has unexpected link count %d",
2412           path_defer, stabdefer.st_nlink);
2413     }
2414   }
2415
2416   struct stat stab_f, stab_d;
2417   int noent_f;
2418
2419   InputFile *file_d= open_input_file(path_flushing);
2420   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2421
2422   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2423
2424   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2425     debug("startup: F==D => Hardlinked");
2426     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2427     noent_f= 1;
2428   }
2429
2430   if (noent_f) {
2431     debug("startup: F ENOENT => Moved");
2432     if (file_d) startup_set_input_file(file_d);
2433     spawn_inndcomm_flush("feedfile missing at startup");
2434     /* => Flushing, sms:=FLUSHING */
2435   } else {
2436     if (file_d) {
2437       debug("startup: F!=D => Separated");
2438       startup_set_input_file(file_d);
2439       flushing_input_file= main_input_file;
2440       main_input_file= open_input_file(feedfile);
2441       if (!main_input_file) die("feedfile vanished during startup");
2442       SMS(SEPARATED, 0, "found both old and current feed files");
2443     } else {
2444       debug("startup: F exists, D ENOENT => Normal");
2445       InputFile *file_f= open_input_file(feedfile);
2446       if (!file_f) die("feed file vanished during startup");
2447       startup_set_input_file(file_f);
2448       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2449     }
2450   }
2451 }
2452
2453 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2454   assert(sms == sm_NORMAL);
2455
2456   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2457         why,
2458         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2459         (unsigned long)target_max_feedfile_size,
2460         until_flush);
2461
2462   int r= link(feedfile, path_flushing);
2463   if (r) sysfatal("link feedfile %s to flushing file %s",
2464                   feedfile, path_flushing);
2465   /* => Hardlinked */
2466
2467   xunlink(feedfile, "old feedfile link");
2468   /* => Moved */
2469
2470   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2471 }
2472
2473 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2474   switch (sms) {
2475   case sm_NORMAL:
2476     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2477     return 1;
2478   case sm_FLUSHFAILED:
2479     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2480     return 1;
2481   default:
2482     return 0;
2483   }
2484 }
2485
2486 static void statemc_period_poll(void) {
2487   if (!until_flush) return;
2488   until_flush--;
2489   assert(until_flush>=0);
2490
2491   if (until_flush) return;
2492   int ok= trigger_flush_ok();
2493   assert(ok);
2494 }
2495
2496 static int inputfile_is_done(InputFile *ipf) {
2497   if (!ipf) return 0;
2498   if (ipf->inprogress) return 0; /* new article in the meantime */
2499   if (ipf->rd) return 0; /* not had EOF */
2500   return 1;
2501 }
2502
2503 static void notice_processed(InputFile *ipf, int completed,
2504                              const char *what, const char *spec) {
2505   if (!ipf) return; /* allows preterminate to be lazy */
2506
2507 #define RCI_NOTHING(x) /* nothing */
2508 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2509 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2510
2511 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2512
2513   char *inprog= completed
2514     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2515     : xasprintf(" inprogress=%ld", ipf->inprogress);
2516
2517   info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2518        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2519        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2520        ,
2521        completed?"completed":"processed", what, spec,
2522        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2523        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2524        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2525        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2526        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2527        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2528        );
2529
2530   free(inprog);
2531
2532 #undef CNT
2533 }
2534
2535 static void statemc_check_backlog_done(void) {
2536   InputFile *ipf= backlog_input_file;
2537   if (!inputfile_is_done(ipf)) return;
2538
2539   const char *slash= strrchr(ipf->path, '/');
2540   const char *leaf= slash ? slash+1 : ipf->path;
2541   const char *under= strchr(slash, '_');
2542   const char *rest= under ? under+1 : leaf;
2543   if (!strncmp(rest,"backlog",7)) rest += 7;
2544   notice_processed(ipf,1,"backlog ",rest);
2545
2546   close_input_file(ipf);
2547   if (unlink(ipf->path)) {
2548     if (errno != ENOENT)
2549       sysdie("could not unlink processed backlog file %s", ipf->path);
2550     warn("backlog file %s vanished while we were reading it"
2551          " so we couldn't remove it (but it's done now, anyway)",
2552          ipf->path);
2553   }
2554   free(ipf);
2555   backlog_input_file= 0;
2556   search_backlog_file();
2557   return;
2558 }
2559
2560 static void statemc_check_flushing_done(void) {
2561   InputFile *ipf= flushing_input_file;
2562   if (!inputfile_is_done(ipf)) return;
2563
2564   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2565
2566   notice_processed(ipf,1,"feedfile","");
2567
2568   close_defer();
2569
2570   xunlink(path_flushing, "old flushing file");
2571
2572   close_input_file(flushing_input_file);
2573   free(flushing_input_file);
2574   flushing_input_file= 0;
2575
2576   if (sms==sm_SEPARATED) {
2577     notice("flush complete");
2578     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2579   } else if (sms==sm_DROPPING) {
2580     SMS(DROPPED, 0, "old flush complete");
2581     search_backlog_file();
2582     notice("feed dropped, but will continue until backlog is finished");
2583   }
2584 }
2585
2586 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2587                                       void *u) {
2588   assert(!inputfile_is_done(main_input_file));
2589   statemc_check_flushing_done();
2590   statemc_check_backlog_done();
2591   return OOP_CONTINUE;
2592 }
2593
2594 static void queue_check_input_done(void) {
2595   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2596 }
2597
2598 static void statemc_setstate(StateMachineState newsms, int periods,
2599                              const char *forlog, const char *why) {
2600   sms= newsms;
2601   until_flush= periods;
2602
2603   const char *xtra= "";
2604   switch (sms) {
2605   case sm_FLUSHING:
2606   case sm_FLUSHFAILED:
2607     if (!main_input_file) xtra= "-ABSENT";
2608     break;
2609   case sm_SEPARATED:
2610   case sm_DROPPING:
2611     xtra= flushing_input_file->rd ? "-1" : "-2";
2612     break;
2613   default:;
2614   }
2615
2616   if (periods) {
2617     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2618   } else {
2619     info("state %s%s %s",forlog,xtra,why);
2620   }
2621 }
2622
2623 /*---------- defer and backlog files ----------*/
2624
2625 static void open_defer(void) {
2626   struct stat stab;
2627
2628   if (defer) return;
2629
2630   defer= fopen(path_defer, "a+");
2631   if (!defer) sysfatal("could not open defer file %s", path_defer);
2632
2633   /* truncate away any half-written records */
2634
2635   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2636
2637   if (stab.st_size > LONG_MAX)
2638     die("defer file %s size is far too large", path_defer);
2639
2640   if (!stab.st_size)
2641     return;
2642
2643   long orgsize= stab.st_size;
2644   long truncto= stab.st_size;
2645   for (;;) {
2646     if (!truncto) break; /* was only (if anything) one half-truncated record */
2647     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2648       sysdie("seek in defer file %s while truncating partial", path_defer);
2649
2650     int r= getc(defer);
2651     if (r==EOF) {
2652       if (ferror(defer))
2653         sysdie("failed read from defer file %s", path_defer);
2654       else
2655         die("defer file %s shrank while we were checking it!", path_defer);
2656     }
2657     if (r=='\n') break;
2658     truncto--;
2659   }
2660
2661   if (stab.st_size != truncto) {
2662     warn("truncating half-record at end of defer file %s -"
2663          " shrinking by %ld bytes from %ld to %ld",
2664          path_defer, orgsize - truncto, orgsize, truncto);
2665
2666     if (fflush(defer))
2667       sysfatal("could not flush defer file %s", path_defer);
2668     if (ftruncate(fileno(defer), truncto))
2669       sysdie("could not truncate defer file %s", path_defer);
2670
2671   } else {
2672     info("continuing existing defer file %s (%ld bytes)",
2673          path_defer, orgsize);
2674   }
2675   if (fseek(defer, truncto, SEEK_SET))
2676     sysdie("could not seek to new end of defer file %s", path_defer);
2677 }
2678
2679 static void close_defer(void) {
2680   if (!defer)
2681     return;
2682
2683   struct stat stab;
2684   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2685
2686   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2687   defer= 0;
2688
2689   time_t now= xtime();
2690
2691   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2692                            (unsigned long)now,
2693                            (unsigned long)stab.st_ino);
2694   if (link(path_defer, backlog))
2695     sysfatal("could not install defer file %s as backlog file %s",
2696            path_defer, backlog);
2697   if (unlink(path_defer))
2698     sysdie("could not unlink old defer link %s to backlog file %s",
2699            path_defer, backlog);
2700
2701   free(backlog);
2702
2703   if (until_backlog_nextscan < 0 ||
2704       until_backlog_nextscan > backlog_retry_minperiods + 1)
2705     until_backlog_nextscan= backlog_retry_minperiods + 1;
2706 }
2707
2708 static void poll_backlog_file(void) {
2709   if (until_backlog_nextscan < 0) return;
2710   if (until_backlog_nextscan-- > 0) return;
2711   search_backlog_file();
2712 }
2713
2714 static void search_backlog_file(void) {
2715   /* returns non-0 iff there are any backlog files */
2716
2717   glob_t gl;
2718   int r, i;
2719   struct stat stab;
2720   const char *oldest_path=0;
2721   time_t oldest_mtime=0, now;
2722
2723   if (backlog_input_file) return;
2724
2725  try_again:
2726
2727   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2728
2729   switch (r) {
2730   case GLOB_ABORTED:
2731     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2732   case GLOB_NOSPACE:
2733     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2734   case 0:
2735     for (i=0; i<gl.gl_pathc; i++) {
2736       const char *path= gl.gl_pathv[i];
2737
2738       if (strchr(path,'#') || strchr(path,'~')) {
2739         debug("backlog file search skipping %s", path);
2740         continue;
2741       }
2742       r= stat(path, &stab);
2743       if (r) {
2744         syswarn("failed to stat backlog file %s", path);
2745         continue;
2746       }
2747       if (!S_ISREG(stab.st_mode)) {
2748         warn("backlog file %s is not a plain file (or link to one)", path);
2749         continue;
2750       }
2751       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2752         oldest_path= path;
2753         oldest_mtime= stab.st_mtime;
2754       }
2755     }
2756   case GLOB_NOMATCH: /* fall through */
2757     break;
2758   default:
2759     sysdie("glob expansion of backlog pattern %s gave unexpected"
2760            " nonzero (error?) return value %d", globpat_backlog, r);
2761   }
2762
2763   if (!oldest_path) {
2764     debug("backlog scan: none");
2765
2766     if (sms==sm_DROPPED) {
2767       preterminate();
2768       notice("feed dropped and our work is complete");
2769
2770       int r= unlink(path_control);
2771       if (r && errno!=ENOENT)
2772         syswarn("failed to remove control symlink for old feed");
2773
2774       xunlink(path_lock,    "lockfile for old feed");
2775       exit(4);
2776     }
2777     until_backlog_nextscan= backlog_spontrescan_periods;
2778     goto xfree;
2779   }
2780
2781   now= xtime();
2782   double age= difftime(now, oldest_mtime);
2783   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2784
2785   if (age_deficiency <= 0) {
2786     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2787           age, age_deficiency, oldest_path);
2788
2789     backlog_input_file= open_input_file(oldest_path);
2790     if (!backlog_input_file) {
2791       warn("backlog file %s vanished as we opened it", oldest_path);
2792       globfree(&gl);
2793       goto try_again;
2794     }
2795     inputfile_reading_start(backlog_input_file);
2796     until_backlog_nextscan= -1;
2797     goto xfree;
2798   }
2799
2800   until_backlog_nextscan= age_deficiency / period_seconds;
2801
2802   if (backlog_spontrescan_periods >= 0 &&
2803       until_backlog_nextscan > backlog_spontrescan_periods)
2804     until_backlog_nextscan= backlog_spontrescan_periods;
2805
2806   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2807         age, age_deficiency, until_backlog_nextscan, oldest_path);
2808
2809  xfree:
2810   globfree(&gl);
2811   return;
2812 }
2813
2814 /*---------- shutdown and signal handling ----------*/
2815
2816 static void preterminate(void) {
2817   if (in_child) return;
2818   notice_processed(main_input_file,0,"feedfile","");
2819   notice_processed(flushing_input_file,0,"flushing","");
2820   if (backlog_input_file)
2821     notice_processed(backlog_input_file,0, "backlog file ",
2822                      backlog_input_file->path);
2823 }
2824
2825 static int signal_self_pipe[2];
2826 static sig_atomic_t terminate_sig_flag;
2827
2828 static void raise_default(int signo) {
2829   xsigsetdefault(signo);
2830   raise(signo);
2831   abort();
2832 }
2833
2834 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2835   assert(fd=signal_self_pipe[0]);
2836   char buf[PIPE_BUF];
2837   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2838   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2839   if (r==0) die("eof on signal self pipe");
2840   if (terminate_sig_flag) {
2841     preterminate();
2842     notice("terminating (%s)", strsignal(terminate_sig_flag));
2843     raise_default(terminate_sig_flag);
2844   }
2845   return OOP_CONTINUE;
2846 }
2847
2848 static void sigarrived_handler(int signum) {
2849   static char x;
2850   switch (signum) {
2851   case SIGTERM:
2852   case SIGINT:
2853     if (!terminate_sig_flag) terminate_sig_flag= signum;
2854     break;
2855   default:
2856     abort();
2857   }
2858   write(signal_self_pipe[1],&x,1);
2859 }
2860
2861 static void init_signals(void) {
2862   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2863     sysdie("could not ignore SIGPIPE");
2864
2865   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2866
2867   xsetnonblock(signal_self_pipe[0],1);
2868   xsetnonblock(signal_self_pipe[1],1);
2869
2870   struct sigaction sa;
2871   memset(&sa,0,sizeof(sa));
2872   sa.sa_handler= sigarrived_handler;
2873   sa.sa_flags= SA_RESTART;
2874   xsigaction(SIGTERM,&sa);
2875   xsigaction(SIGINT,&sa);
2876
2877   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2878 }
2879
2880 /*========== flushing the feed ==========*/
2881
2882 static pid_t inndcomm_child;
2883 static int inndcomm_sentinel_fd;
2884
2885 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2886   assert(inndcomm_child);
2887   assert(fd == inndcomm_sentinel_fd);
2888   int status= xwaitpid(&inndcomm_child, "inndcomm");
2889   inndcomm_child= 0;
2890   
2891   cancel_fd_read_except(fd);
2892   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2893   inndcomm_sentinel_fd= 0;
2894
2895   assert(!flushing_input_file);
2896
2897   if (WIFEXITED(status)) {
2898     switch (WEXITSTATUS(status)) {
2899
2900     case INNDCOMMCHILD_ESTATUS_FAIL:
2901       goto failed;
2902
2903     case INNDCOMMCHILD_ESTATUS_NONESUCH:
2904       notice("feed has been dropped by innd, finishing up");
2905       flushing_input_file= main_input_file;
2906       tailing_queue_readable(flushing_input_file);
2907         /* we probably previously returned EAGAIN from our fake read method
2908          * when in fact we were at EOF, so signal another readable event
2909          * so we actually see the EOF */
2910
2911       main_input_file= 0;
2912
2913       if (flushing_input_file) {
2914         SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2915       } else {
2916         close_defer();
2917         SMS(DROPPED, 0, "feed dropped by innd");
2918         search_backlog_file();
2919       }
2920       return OOP_CONTINUE;
2921
2922     case 0:
2923       /* as above */
2924       flushing_input_file= main_input_file;
2925       tailing_queue_readable(flushing_input_file);
2926
2927       main_input_file= open_input_file(feedfile);
2928       if (!main_input_file)
2929         die("flush succeeded but feedfile %s does not exist!", feedfile);
2930
2931       if (flushing_input_file) {
2932         SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2933       } else {
2934         close_defer();
2935         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2936       }
2937       return OOP_CONTINUE;
2938
2939     default:
2940       goto unexpected_exitstatus;
2941
2942     }
2943   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2944     warn("flush timed out trying to talk to innd");
2945     goto failed;
2946   } else {
2947   unexpected_exitstatus:
2948     report_child_status("inndcomm child", status);
2949   }
2950
2951  failed:
2952   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2953   return OOP_CONTINUE;
2954 }
2955
2956 static void inndcommfail(const char *what) {
2957   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
2958   exit(INNDCOMMCHILD_ESTATUS_FAIL);
2959 }
2960
2961 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
2962   int pipefds[2];
2963
2964   notice("flushing %s",why);
2965
2966   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
2967   assert(!inndcomm_child);
2968   assert(!inndcomm_sentinel_fd);
2969
2970   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
2971
2972   inndcomm_child= xfork("inndcomm child");
2973
2974   if (!inndcomm_child) {
2975     const char *flushargv[2]= { sitename, 0 };
2976     char *reply;
2977     int r;
2978
2979     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
2980     /* parent spots the autoclose of pipefds[1] when we die or exit */
2981
2982     if (simulate_flush>=0) {
2983       warn("SIMULATING flush child status %d", simulate_flush);
2984       if (simulate_flush>128) raise(simulate_flush-128);
2985       else exit(simulate_flush);
2986     }
2987
2988     alarm(inndcomm_flush_timeout);
2989     r= ICCopen();                         if (r)   inndcommfail("connect");
2990     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
2991     if (!r) exit(0); /* yay! */
2992
2993     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
2994     syswarn("innd ctlinnd flush failed: innd said %s", reply);
2995     exit(INNDCOMMCHILD_ESTATUS_FAIL);
2996   }
2997
2998   simulate_flush= -1;
2999
3000   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3001   inndcomm_sentinel_fd= pipefds[0];
3002   assert(inndcomm_sentinel_fd);
3003   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3004
3005   SMS(FLUSHING, 0, why);
3006 }
3007
3008 /*========== main program ==========*/
3009
3010 static void postfork_inputfile(InputFile *ipf) {
3011   if (!ipf) return;
3012   xclose(ipf->fd, "(in child) input file ", ipf->path);
3013 }
3014
3015 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3016   /* we have no stdio streams that are buffered long-term */
3017   if (!f) return;
3018   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3019 }
3020
3021 static void postfork(void) {
3022   in_child= 1;
3023
3024   xsigsetdefault(SIGTERM);
3025   xsigsetdefault(SIGINT);
3026   xsigsetdefault(SIGPIPE);
3027   if (terminate_sig_flag) raise(terminate_sig_flag);
3028
3029   postfork_inputfile(main_input_file);
3030   postfork_inputfile(flushing_input_file);
3031
3032   Conn *conn;
3033   FOR_CONN(conn)
3034     conn_closefd(conn,"(in child) ");
3035
3036   postfork_stdio(defer, "defer file ", path_defer);
3037 }
3038
3039 typedef struct Every Every;
3040 struct Every {
3041   struct timeval interval;
3042   int fixed_rate;
3043   void (*f)(void);
3044 };
3045
3046 static void every_schedule(Every *e, struct timeval base);
3047
3048 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3049   Every *e= e_v;
3050   e->f();
3051   if (!e->fixed_rate) xgettimeofday(&base);
3052   every_schedule(e, base);
3053   return OOP_CONTINUE;
3054 }
3055
3056 static void every_schedule(Every *e, struct timeval base) {
3057   struct timeval when;
3058   timeradd(&base, &e->interval, &when);
3059   loop->on_time(loop, when, every_happens, e);
3060 }
3061
3062 static void every(int interval, int fixed_rate, void (*f)(void)) {
3063   NEW_DECL(Every *,e);
3064   e->interval.tv_sec= interval;
3065   e->interval.tv_usec= 0;
3066   e->fixed_rate= fixed_rate;
3067   e->f= f;
3068   struct timeval now;
3069   xgettimeofday(&now);
3070   every_schedule(e, now);
3071 }
3072
3073 static void filepoll(void) {
3074   filemon_callback(main_input_file);
3075   filemon_callback(flushing_input_file);
3076 }
3077
3078 static char *debug_report_ipf(InputFile *ipf) {
3079   if (!ipf) return xasprintf("none");
3080
3081   const char *slash= strrchr(ipf->path,'/');
3082   const char *path= slash ? slash+1 : ipf->path;
3083
3084   return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s",
3085                    ipf, path,
3086                    ipf->inprogress, (long)ipf->offset,
3087                    ipf->fd,
3088                    ipf->rd ? "" : ",!rd",
3089                    ipf->skippinglong ? "*skiplong" : "");
3090 }
3091
3092 static void period(void) {
3093   char *dipf_main=     debug_report_ipf(main_input_file);
3094   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3095   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3096
3097   debug("PERIOD"
3098         " sms=%s[%d] conns=%d queue=%d until_connect=%d"
3099         " input_files main:%s flushing:%s backlog:%s"
3100         " children connecting=%ld inndcomm=%ld"
3101         ,
3102         sms_names[sms], until_flush,
3103           conns.count, queue.count, until_connect,
3104         dipf_main, dipf_flushing, dipf_backlog,
3105         (long)connecting_child, (long)inndcomm_child
3106         );
3107
3108   free(dipf_main);
3109   free(dipf_flushing);
3110   free(dipf_backlog);
3111
3112   if (until_connect) until_connect--;
3113
3114   poll_backlog_file();
3115   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3116   statemc_period_poll();
3117   check_assign_articles();
3118   check_idle_conns();
3119 }
3120
3121
3122 /*========== dumping state ==========*/
3123
3124 static void dump_article_list(FILE *f, const ControlCommand *c,
3125                               const ArticleList *al) {
3126   fprintf(f, " count=%d\n", al->count);
3127   if (!c->xval) return;
3128   
3129   int i; Article *art;
3130   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3131     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3132     DUMPV("%p", art->,ipf);
3133     DUMPV("%d", art->,missing);
3134     DUMPV("%lu", (unsigned long)art->,offset);
3135     DUMPV("%d", art->,blanklen);
3136     DUMPV("%d", art->,midlen);
3137     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3138   }
3139 }
3140   
3141 static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) {
3142   char *dipf= debug_report_ipf(ipf);
3143   fprintf(f,"input %s %s", wh, dipf);
3144   free(dipf);
3145   
3146   if (ipf) {
3147     DUMPV("%d", ipf->,readcount_ok);
3148     DUMPV("%d", ipf->,readcount_blank);
3149     DUMPV("%d", ipf->,readcount_err);
3150   }
3151   fprintf(f,"\n");
3152   if (ipf) {
3153     ArtState state; const char *const *statename; 
3154     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3155 #define RC_DUMP_FMT(x) " " #x "=%d"
3156 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3157       fprintf(f,"input %s counts %-11s"
3158               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3159               wh, *statename
3160               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3161     }
3162   }
3163 }
3164
3165 CCMD(dump) {
3166   int i;
3167   fprintf(cc->out, "dumping state to %s\n", path_dump);
3168   FILE *f= fopen(path_dump, "w");
3169   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3170
3171   fprintf(f,"general");
3172   DUMPV("%s", sms_names,[sms]);
3173   DUMPV("%d", ,until_flush);
3174   DUMPV("%ld", (long),self_pid);
3175   DUMPV("%p", , defer);
3176   DUMPV("%d", , until_connect);
3177   DUMPV("%d", , until_backlog_nextscan);
3178   DUMPV("%d", , simulate_flush);
3179   fprintf(f,"\nnocheck");
3180   DUMPV("%#.10f", , accept_proportion);
3181   DUMPV("%d", , nocheck);
3182   DUMPV("%d", , nocheck_reported);
3183   fprintf(f,"\n");
3184
3185   fprintf(f,"special");
3186   DUMPV("%ld", (long),connecting_child);
3187   DUMPV("%d", , connecting_fdpass_sock);
3188   DUMPV("%d", , control_master);
3189   fprintf(f,"\n");
3190
3191   fprintf(f,"filemon ");
3192   filemon_method_dump_info(f);
3193
3194   dump_input_file(f, main_input_file,     "main"    );
3195   dump_input_file(f, flushing_input_file, "flushing");
3196   dump_input_file(f, backlog_input_file,  "backlog" );
3197
3198   fprintf(f,"conns count=%d\n", conns.count);
3199
3200   Conn *conn;
3201   FOR_CONN(conn) {
3202
3203     fprintf(f,"C%d",conn->fd);
3204     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3205     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3206     DUMPV("%d",conn->,since_activity);
3207     fprintf(f,"\n");
3208
3209     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3210     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3211     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3212
3213     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3214     for (i=0; i<conn->xmitu; i++) {
3215       const struct iovec *iv= &conn->xmit[i];
3216       const XmitDetails *xd= &conn->xmitd[i];
3217       char *dinfo;
3218       long diff;
3219       switch (xd->kind) {
3220       case xk_Const:    dinfo= xasprintf("Const");                 break;
3221       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3222       default:
3223         abort();
3224       }
3225       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3226               sanitise(iv->iov_base, iv->iov_len));
3227       free(dinfo);
3228     }
3229   }
3230
3231   fprintf(f,"queue"); dump_article_list(f,c,&queue);
3232
3233   fprintf(f,"paths");
3234   DUMPV("%s", , path_lock);
3235   DUMPV("%s", , path_flushing);
3236   DUMPV("%s", , path_defer);
3237   DUMPV("%s", , path_control);
3238   DUMPV("%s", , path_dump);
3239   DUMPV("%s", , globpat_backlog);
3240   fprintf(f,"\n");
3241
3242   if (!!ferror(f) + !!fclose(f)) {
3243     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3244     return;
3245   }
3246 }
3247
3248 /*========== option parsing ==========*/
3249
3250 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3251 static void vbadusage(const char *fmt, va_list al) {
3252   char *m= xvasprintf(fmt,al);
3253   fprintf(stderr, "bad usage: %s\n"
3254           "say --help for help, or read the manpage\n",
3255           m);
3256   if (become_daemon)
3257     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3258   exit(8);
3259 }
3260
3261 /*---------- generic option parser ----------*/
3262
3263 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3264 static void badusage(const char *fmt, ...) {
3265   va_list al;
3266   va_start(al,fmt);
3267   vbadusage(fmt,al);
3268 }
3269
3270 enum OptFlags {
3271   of_seconds= 001000u,
3272   of_boolean= 002000u,
3273 };
3274
3275 typedef struct Option Option;
3276 typedef void OptionParser(const Option*, const char *val);
3277
3278 struct Option {
3279   int shrt;
3280   const char *lng, *formarg;
3281   void *store;
3282   OptionParser *fn;
3283   int intval;
3284 };
3285
3286 static void parse_options(const Option *options, char ***argvp) {
3287   /* on return *argvp is first non-option arg; argc is not updated */
3288
3289   for (;;) {
3290     const char *arg= *++(*argvp);
3291     if (!arg) break;
3292     if (*arg != '-') break;
3293     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3294     int a;
3295     while ((a= *++arg)) {
3296       const Option *o;
3297       if (a=='-') {
3298         arg++;
3299         char *equals= strchr(arg,'=');
3300         int len= equals ? (equals - arg) : strlen(arg);
3301         for (o=options; o->shrt || o->lng; o++)
3302           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3303             goto found_long;
3304         badusage("unknown long option --%s",arg);
3305       found_long:
3306         if (!o->formarg) {
3307           if (equals) badusage("option --%s does not take a value",o->lng);
3308           arg= 0;
3309         } else if (equals) {
3310           arg= equals+1;
3311         } else {
3312           arg= *++(*argvp);
3313           if (!arg) badusage("option --%s needs a value for %s",
3314                              o->lng, o->formarg);
3315         }
3316         o->fn(o, arg);
3317         break; /* eaten the whole argument now */
3318       }
3319       for (o=options; o->shrt || o->lng; o++)
3320         if (a == o->shrt)
3321           goto found_short;
3322       badusage("unknown short option -%c",a);
3323     found_short:
3324       if (!o->formarg) {
3325         o->fn(o,0);
3326       } else {
3327         if (!*++arg) {
3328           arg= *++(*argvp);
3329           if (!arg) badusage("option -%c needs a value for %s",
3330                              o->shrt, o->formarg);
3331         }
3332         o->fn(o,arg);
3333         break; /* eaten the whole argument now */
3334       }
3335     }
3336   }
3337 }
3338
3339 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3340
3341 static void print_options(const Option *options, FILE *f) {
3342   const Option *o;
3343   for (o=options; o->shrt || o->lng; o++) {
3344     char shrt[2] = { o->shrt, 0 };
3345     char *optspec= xasprintf("%s%s%s%s%s",
3346                              o->shrt ? "-" : "", shrt,
3347                              o->shrt && o->lng ? "|" : "",
3348                              DELIMPERHAPS("--", o->lng));
3349     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3350     free(optspec);
3351   }
3352 }
3353
3354 /*---------- specific option types ----------*/
3355
3356 static void op_integer(const Option *o, const char *val) {
3357   char *ep;
3358   errno= 0;
3359   unsigned long ul= strtoul(val,&ep,10);
3360   if (*ep || ep==val || errno || ul>INT_MAX)
3361     badusage("bad integer value for %s",o->lng);
3362   int *store= o->store;
3363   *store= ul;
3364 }
3365
3366 static void op_double(const Option *o, const char *val) {
3367   int *store= o->store;
3368   char *ep;
3369   errno= 0;
3370   *store= strtod(val, &ep);
3371   if (*ep || ep==val || errno)
3372     badusage("bad floating point value for %s",o->lng);
3373 }
3374
3375 static void op_string(const Option *o, const char *val) {
3376   const char **store= o->store;
3377   *store= val;
3378 }
3379
3380 static void op_seconds(const Option *o, const char *val) {
3381   int *store= o->store;
3382   char *ep;
3383   int unit;
3384
3385   double v= strtod(val,&ep);
3386   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3387
3388   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3389   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3390   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3391   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3392   else if (!strcmp(ep,"das")) unit= 10;
3393   else if (!strcmp(ep,"hs"))  unit= 100;
3394   else if (!strcmp(ep,"ks"))  unit= 1000;
3395   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3396   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3397
3398   v *= unit;
3399   v= ceil(v);
3400   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3401   *store= v;
3402 }
3403
3404 static void op_setint(const Option *o, const char *val) {
3405   int *store= o->store;
3406   *store= o->intval;
3407 }
3408
3409 /*---------- specific options ----------*/
3410
3411 static void help(const Option *o, const char *val);
3412
3413 static const Option innduct_options[]= {
3414 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3415 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3416 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3417 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3418 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3419 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3420 {'P',"port",             "PORT",  &port,                     op_integer     },
3421 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3422 {0,"help",               0,       0,                         help           },
3423
3424 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3425 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3426 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3427 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3428
3429 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3430 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3431 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3432
3433 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3434 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3435
3436 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3437 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3438 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3439 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3440 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3441 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3442
3443 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3444 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3445
3446 {0,0}
3447 };
3448
3449 static void printusage(FILE *f) {
3450   fputs("usage: innduct [options] site [fqdn]\n"
3451         "available options are:\n", f);
3452   print_options(innduct_options, f);
3453 }
3454
3455 static void help(const Option *o, const char *val) {
3456   printusage(stdout);
3457   if (ferror(stdout) || fflush(stdout)) {
3458     perror("innduct: writing help");
3459     exit(12);
3460   }
3461   exit(0);
3462 }
3463
3464 static void convert_to_periods_rndup(int *store) {
3465   *store += period_seconds-1;
3466   *store /= period_seconds;
3467 }
3468
3469 int main(int argc, char **argv) {
3470   if (!argv[1]) {
3471     printusage(stderr);
3472     exit(8);
3473   }
3474
3475   parse_options(innduct_options, &argv);
3476
3477   /* arguments */
3478
3479   sitename= *argv++;
3480   if (!sitename) badusage("need site name argument");
3481   remote_host= *argv++;
3482   if (*argv) badusage("too many non-option arguments");
3483
3484   /* defaults */
3485
3486   int r= innconf_read(inndconffile);
3487   if (!r) badusage("could not read inn.conf (more info on stderr)");
3488
3489   if (!remote_host) remote_host= sitename;
3490
3491   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3492     badusage("nocheck threshold percentage must be between 0..100");
3493   nocheck_thresh *= 0.01;
3494
3495   if (nocheck_decay < 0.1)
3496     badusage("nocheck decay articles must be at least 0.1");
3497   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3498
3499   convert_to_periods_rndup(&reconnect_delay_periods);
3500   convert_to_periods_rndup(&flushfail_retry_periods);
3501   convert_to_periods_rndup(&backlog_retry_minperiods);
3502   convert_to_periods_rndup(&backlog_spontrescan_periods);
3503   convert_to_periods_rndup(&spontaneous_flush_periods);
3504   convert_to_periods_rndup(&need_activity_periods);
3505
3506   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3507     badusage("bad input data ratio must be between 0..100");
3508   max_bad_data_ratio *= 0.01;
3509
3510   if (!feedfile) {
3511     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3512   } else if (!feedfile[0]) {
3513     badusage("feed filename must be nonempty");
3514   } else if (feedfile[strlen(feedfile)-1]=='/') {
3515     feedfile= xasprintf("%s%s",feedfile,sitename);
3516   }
3517
3518   const char *feedfile_forbidden= "?*[~#";
3519   int c;
3520   while ((c= *feedfile_forbidden++))
3521     if (strchr(feedfile, c))
3522       badusage("feed filename may not contain metacharacter %c",c);
3523
3524   /* set things up */
3525
3526   path_lock=        xasprintf("%s_lock",      feedfile);
3527   path_flushing=    xasprintf("%s_flushing",  feedfile);
3528   path_defer=       xasprintf("%s_defer",     feedfile);
3529   path_control=     xasprintf("%s_control",   feedfile);
3530   path_dump=        xasprintf("%s_dump",      feedfile);
3531   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3532
3533   oop_source_sys *sysloop= oop_sys_new();
3534   if (!sysloop) sysdie("could not create liboop event loop");
3535   loop= (oop_source*)sysloop;
3536
3537   LIST_INIT(conns);
3538   LIST_INIT(queue);
3539
3540   if (become_daemon) {
3541     int i;
3542     for (i=3; i<255; i++)
3543       /* do this now before we open syslog, etc. */
3544       close(i);
3545     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3546
3547     int null= open("/dev/null",O_RDWR);
3548     if (null<0) sysfatal("failed to open /dev/null");
3549     dup2(null,0);
3550     dup2(null,1);
3551     dup2(null,2);
3552     xclose(null, "/dev/null original fd",0);
3553
3554     pid_t child1= xfork("daemonise first fork");
3555     if (child1) _exit(0);
3556
3557     pid_t sid= setsid();
3558     if (sid != child1) sysfatal("setsid failed");
3559
3560     pid_t child2= xfork("daemonise second fork");
3561     if (child2) _exit(0);
3562   }
3563
3564   self_pid= getpid();
3565   if (self_pid==-1) sysdie("getpid");
3566
3567   statemc_lock();
3568
3569   init_signals();
3570
3571   notice("starting");
3572
3573   if (!become_daemon)
3574     control_stdio();
3575
3576   control_init();
3577
3578   int filemon_ok= 0;
3579   if (!try_filemon) {
3580     notice("filemon: suppressed by command line option, polling");
3581   } else {
3582     filemon_ok= filemon_method_init();
3583     if (!filemon_ok)
3584       warn("filemon: no file monitoring available, polling");
3585   }
3586   if (!filemon_ok)
3587     every(filepoll_seconds,0,filepoll);
3588
3589   every(period_seconds,1,period);
3590
3591   statemc_init();
3592
3593   /* let's go */
3594
3595   void *run= oop_sys_run(sysloop);
3596   assert(run == OOP_ERROR);
3597   sysdie("event loop failed");
3598 }