chiark / gitweb /
put flow control notes somewhere sensible
[innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*
7  * Newsfeeds file entries should look like this:
8  *     host.name.of.site[/exclude,exclude,...]\
9  *             :pattern,pattern...[/distribution,distribution...]\
10  *             :Tf,Wnm
11  *             :
12  * or
13  *     sitename[/exclude,exclude,...]\
14  *             :pattern,pattern...[/distribution,distribution...]\
15  *             :Tf,Wnm
16  *             :host.name.of.site
17  *
18  * Four files full of
19  *    token messageid
20  * or might be blanked out
21  *    <spc><spc><spc><spc>....
22  *
23  * F site.name                 main feed file
24  *                                opened/created, then written, by innd
25  *                                read by duct
26  *                                unlinked by duct
27  *                                tokens blanked out by duct when processed
28  *   site.name_lock            lock preventing multiple ducts
29  *                                to hold lock must open,F_SETLK[W]
30  *                                  and then stat to check that locked file
31  *                                  still has name site.name_lock
32  *                                holder of this lock is "duct"
33  *                                (only) lockholder may remove the lockfile
34  * D site.name_flushing        temporary feed file during flush (or crash)
35  *                                hardlink created by duct
36  *                                unlinked by duct
37  *   site.name_defer           431'd articles, still being written,
38  *                                created, written, used by duct
39  *
40  *   site.name_backlog.<date>.<inum>
41  *                             431'd articles, ready for innxmit or duct
42  *                                created (link/mv) by duct
43  *   site.name_backlog<anything-else>  (where <anything-else> does not
44  *                                      contain '#' or '~') eg
45  *   site.name_backlog.manual
46  *                             anything the sysadmin likes (eg, feed files
47  *                             from old feeds to be merged into this one)
48  *                                created (link/mv) by admin
49  *                                may be symlinks (in which case links
50  *                                may be written through, but only links
51  *                                will be removed.
52  *
53  *                             It is safe to remove backlog files manually,
54  *                             if it's desired to throw away the backlog.
55  *
56  * Backlog files are also processed by innduct.  We find the oldest
57  * backlog file which is at least a certain amount old, and feed it
58  * back into our processing.  When every article in it has been read
59  * and processed, we unlink it and look for another backlog file.
60  *
61  * If we don't have a backlog file that we're reading, we close the
62  * defer file that we're writing and make it into a backlog file at
63  * the first convenient opportunity.
64  * -8<-
65
66
67    OVERALL STATES:
68
69                                                                 START
70                                                                   |
71      ,-->--.                                                 check F, D
72      |     |                                                      |
73      |     |                                                      |
74      |     |  <----------------<---------------------------------'|
75      |     |                                       F exists       |
76      |     |                                       D ENOENT       |
77      |     |  duct opens F                                        |
78      |     V                                                      |
79      |  Normal                                                    |
80      |   F: innd writing, duct reading                            |
81      |   D: ENOENT                                                |
82      |     |                                                      |
83      |     |  duct decides time to flush                          |
84      |     |  duct makes hardlink                                 |
85      |     |                                                      |
86      |     V                            <------------------------'|
87      |  Hardlinked                                  F==D          |
88      |   F == D: innd writing, duct reading         both exist    |
89      ^     |                                                      |
90      |     |  duct unlinks F                                      |
91      |     |                        <-----------<-------------<--'|
92      |     |                           open D         F ENOENT    |
93      |     |                           if exists                  |
94      |     |                                                      |
95      |     V                        <---------------------.       |
96      |  Moved                                             |       |
97      |   F: ENOENT                                        |       |
98      |   D: innd writing, duct reading; or ENOENT         |       |
99      |     |                                              |       |
100      |     |  duct requests flush of feed                 |       |
101      |     |   (others can too, harmlessly)               |       |
102      |     V                                              |       |
103      |  Flushing                                          |       |
104      |   F: ENOENT                                        |       |
105      |   D: innd flushing, duct; or ENOENT                |       |
106      |     |                                              |       |
107      |     |   inndcomm flush fails                       |       |
108      |     |`-------------------------->------------------'       |
109      |     |                                                      |
110      |     |   inndcomm reports no such site                      |
111      |     |`---------------------------------------------------- | -.
112      |     |                                                      |  |
113      |     |  innd finishes writing D, creates F                  |  |
114      |     |  inndcomm reports flush successful                   |  |
115      |     |                                                      |  |
116      |     V                                                      |  |
117      |  Separated                                <----------------'  |
118      |   F: innd writing                            F!=D             /
119      |   D: duct reading; or ENOENT                  both exist     /
120      |     |                                                       /
121      |     |  duct gets to the end of D                           /
122      |     |  duct opens F too                                   /
123      |     V                                                    /
124      |  Finishing                                              /
125      |   F: innd writing, duct reading                        |
126      |   D: duct finishing                                    V
127      |     |                                            Dropping
128      |     |  duct finishes processing D                 F: ENOENT
129      |     V  duct unlinks D                             D: duct reading
130      |     |                                                  |
131      `--<--'                                                  | duct finishes
132                                                               |  processing D
133                                                               | duct unlinks D
134                                                               | duct exits
135                                                               V
136                                                         Dropped
137                                                          F: ENOENT
138                                                          D: ENOENT
139                                                          duct not running
140
141    "duct reading" means innduct is reading the file but also
142    overwriting processed tokens.
143
144  * ->8- -^L-
145  *
146  * rune for printing diagrams:
147
148 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
149
150  *
151  */
152
153 /*============================== PROGRAM ==============================*/
154
155 #define _GNU_SOURCE 1
156
157 #include "config.h"
158 #include "storage.h"
159 #include "nntp.h"
160 #include "libinn.h"
161 #include "inndcomm.h"
162
163 #include "inn/list.h"
164 #include "inn/innconf.h"
165
166 #include <sys/uio.h>
167 #include <sys/types.h>
168 #include <sys/wait.h>
169 #include <sys/stat.h>
170 #include <sys/socket.h>
171 #include <sys/un.h>
172 #include <unistd.h>
173 #include <string.h>
174 #include <signal.h>
175 #include <stdio.h>
176 #include <errno.h>
177 #include <syslog.h>
178 #include <fcntl.h>
179 #include <stdarg.h>
180 #include <assert.h>
181 #include <stdlib.h>
182 #include <stddef.h>
183 #include <glob.h>
184 #include <time.h>
185 #include <math.h>
186 #include <ctype.h>
187
188 #include <oop.h>
189 #include <oop-read.h>
190
191 /*----- general definitions, probably best not changed -----*/
192
193 #define CONNCHILD_ESTATUS_STREAM   24
194 #define CONNCHILD_ESTATUS_NOSTREAM 25
195
196 #define INNDCOMMCHILD_ESTATUS_FAIL     26
197 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
198
199 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
200 #define MAX_CONTROL_COMMAND 1000
201
202 #define VA                va_list al;  va_start(al,fmt)
203 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
204 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
205 #define NORET             __attribute__((__noreturn__))
206
207 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
208 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
209
210 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
211
212 #define FOR_CONN(conn) \
213   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
214
215 /*----- doubly linked lists -----*/
216
217 #define ISNODE(T)   struct node list_node
218 #define DEFLIST(T)                              \
219    typedef struct {                             \
220      union { struct list li; T *for_type; } u;  \
221      int count;                                 \
222    } T##List
223
224 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
225
226 #define LIST_CHECKCANHAVENODE(l,n) \
227   ((void)((n) == ((l).u.for_type))) /* just for the type check */
228
229 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
230  ( LIST_CHECKCANHAVENODE(l,n),                  \
231    list_addsomehow(&(l).u.li, NODE((n))),       \
232    (void)(l).count++                            \
233    )
234
235 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
236  ( (typeof((l).u.for_type))                     \
237    ( (l).count                                  \
238      ? ( (l).count--,                           \
239          list_remsomehow(&(l).u.li) )           \
240      : 0                                        \
241      )                                          \
242    )
243
244
245 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
246 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
247 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
248 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
249
250 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
251 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
252 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
253 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
254
255 #define LIST_REMOVE(l,n)                        \
256  ( LIST_CHECKCANHAVENODE(l,n),                  \
257    list_remove(NODE((n))),                      \
258    (void)(l).count--                            \
259    )
260
261 #define LIST_INSERT(l,n,pred)                                   \
262  ( LIST_CHECKCANHAVENODE(l,n),                                  \
263    LIST_CHECKCANHAVENODE(l,pred),                               \
264    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
265    (void)(l).count++                                            \
266    )
267
268 /*----- type predeclarations -----*/
269
270 typedef struct Conn Conn;
271 typedef struct Article Article;
272 typedef struct InputFile InputFile;
273 typedef struct XmitDetails XmitDetails;
274 typedef struct Filemon_Perfile Filemon_Perfile;
275 typedef enum StateMachineState StateMachineState;
276 typedef struct ControlCommand ControlCommand;
277
278 DEFLIST(Conn);
279 DEFLIST(Article);
280
281 /*----- function predeclarations -----*/
282
283 static void conn_maybe_write(Conn *conn);
284 static void conn_make_some_xmits(Conn *conn);
285 static void *conn_write_some_xmits(Conn *conn);
286
287 static void xmit_free(XmitDetails *d);
288
289 #define SMS(newstate, periods, why) \
290    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
291 static void statemc_setstate(StateMachineState newsms, int periods,
292                              const char *forlog, const char *why);
293
294 static void statemc_start_flush(const char *why); /* Normal => Flushing */
295 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
296 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
297
298 static void article_done(Article *art, int whichcount);
299
300 static void check_assign_articles(void);
301 static void queue_check_input_done(void);
302 static void check_reading_pause_resume(InputFile *ipf);
303
304 static void statemc_check_flushing_done(void);
305 static void statemc_check_backlog_done(void);
306
307 static void postfork(void);
308 static void period(void);
309
310 static void open_defer(void);
311 static void close_defer(void);
312 static void search_backlog_file(void);
313 static void preterminate(void);
314 static void raise_default(int signo) NORET;
315 static char *debug_report_ipf(InputFile *ipf);
316
317 static void inputfile_reading_start(InputFile *ipf);
318 static void inputfile_reading_stop(InputFile *ipf);
319 static void inputfile_reading_pause(InputFile *ipf);
320 static void inputfile_reading_resume(InputFile *ipf);
321   /* pause and resume are idempotent, and no-op if not done _reading_start */
322
323 static void filemon_start(InputFile *ipf);
324 static void filemon_stop(InputFile *ipf);
325 static void filemon_callback(InputFile *ipf);
326
327 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
328 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
329
330 static const oop_rd_style peer_rd_style;
331 static oop_rd_call peer_rd_err, peer_rd_ok;
332
333 /*----- configuration options -----*/
334 /* when changing defaults, remember to update the manpage */
335
336 static const char *sitename, *remote_host;
337 static const char *feedfile, *realsockdir="/tmp/innduct.control";
338 static int quiet_multiple=0;
339 static int become_daemon=1, try_filemon=1;
340 static int try_stream=1;
341 static int port=119;
342 static const char *inndconffile;
343
344 static int max_connections=10;
345 static int max_queue_per_conn=200;
346 static int target_max_feedfile_size=100000;
347 static int period_seconds=60;
348 static int filepoll_seconds=5;
349 static int max_queue_per_ipf=-1;
350
351 static int connection_setup_timeout=200;
352 static int inndcomm_flush_timeout=100;
353
354 static double nocheck_thresh= 95.0; /* converted from percentage by main */
355 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
356
357 /* all these are initialised to seconds, and converted to periods in main */
358 static int reconnect_delay_periods=1000;
359 static int flushfail_retry_periods=1000;
360 static int backlog_retry_minperiods=50;
361 static int backlog_spontrescan_periods=300;
362 static int spontaneous_flush_periods=100000;
363 static int max_separated_periods=2000;
364 static int need_activity_periods=1000;
365
366 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
367 static int max_bad_data_initial= 30;
368   /* in one corrupt 4096-byte block the number of newlines has
369    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
370
371
372 /*----- statistics -----*/
373
374 typedef enum {      /* in queue                 in conn->sent             */
375   art_Unchecked,    /*   not checked, not sent    checking                */
376   art_Wanted,       /*   checked, wanted          sent body as requested  */
377   art_Unsolicited,  /*   -                        sent body without check */
378   art_MaxState,
379 } ArtState;
380
381 static const char *const artstate_names[]=
382   { "Unchecked", "Wanted", "Unsolicited", 0 };
383
384 #define RESULT_COUNTS(RCS,RCN)                  \
385   RCS(sent)                                     \
386   RCS(accepted)                                 \
387   RCN(unwanted)                                 \
388   RCN(rejected)                                 \
389   RCN(deferred)                                 \
390   RCN(missing)                                  \
391   RCN(connretry)
392
393 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
394 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
395        counts[art_Unchecked] x                  \
396        + counts[art_Wanted] x                   \
397        + counts[art_Unsolicited] x,             \
398        counts[art_Unchecked] x                  \
399        , counts[art_Wanted] x                   \
400        , counts[art_Unsolicited] x
401
402 typedef enum {
403 #define RC_INDEX(x) RC_##x,
404   RESULT_COUNTS(RC_INDEX, RC_INDEX)
405   RCI_max
406 } ResultCountIndex;
407
408
409 /*----- transmission buffers -----*/
410
411 #define CONNIOVS 128
412
413 typedef enum {
414   xk_Const, xk_Artdata
415 } XmitKind;
416
417 struct XmitDetails {
418   XmitKind kind;
419   union {
420     ARTHANDLE *sm_art;
421   } info;
422 };
423
424
425 /*----- core operational data structure types -----*/
426
427 struct InputFile {
428   /* This is also an instance of struct oop_readable */
429   struct oop_readable readable; /* first */
430   oop_readable_call *readable_callback;
431   void *readable_callback_user;
432
433   int fd;
434   Filemon_Perfile *filemon;
435
436   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
437   off_t offset;
438   int skippinglong, paused;
439
440   ArticleList queue;
441   long inprogress; /* includes queue.count and also articles in conns */
442   long autodefer; /* -1 means not doing autodefer */
443
444   int counts[art_MaxState][RCI_max];
445   int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
446   char path[];
447 };
448
449 struct Article {
450   ISNODE(Article);
451   ArtState state;
452   int midlen, missing;
453   InputFile *ipf;
454   TOKEN token;
455   off_t offset;
456   int blanklen;
457   char messageid[1];
458 };
459
460 #define SMS_LIST(X)                             \
461   X(NORMAL)                                     \
462   X(FLUSHING)                                   \
463   X(FLUSHFAILED)                                \
464   X(SEPARATED)                                  \
465   X(DROPPING)                                   \
466   X(DROPPED)
467
468 enum StateMachineState {
469 #define SMS_DEF_ENUM(s) sm_##s,
470   SMS_LIST(SMS_DEF_ENUM)
471 };
472
473 static const char *sms_names[]= {
474 #define SMS_DEF_NAME(s) #s ,
475   SMS_LIST(SMS_DEF_NAME)
476   0
477 };
478
479 struct Conn {
480   ISNODE(Conn);
481   int fd; /* may be 0, meaning closed (during construction/destruction) */
482   oop_read *rd; /* likewise */
483   int max_queue, stream, quitting;
484   int since_activity; /* periods */
485   ArticleList waiting; /* not yet told peer */
486   ArticleList priority; /* peer says send it now */
487   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
488   struct iovec xmit[CONNIOVS];
489   XmitDetails xmitd[CONNIOVS];
490   int xmitu;
491 };
492
493
494 /*----- general operational variables -----*/
495
496 /* main initialises */
497 static oop_source *loop;
498 static ConnList conns;
499 static char *path_lock, *path_flushing, *path_defer;
500 static char *path_control, *path_dump;
501 static char *globpat_backlog;
502 static pid_t self_pid;
503
504 /* statemc_init initialises */
505 static StateMachineState sms;
506 static int until_flush;
507 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
508 static FILE *defer;
509
510 /* initialisation to 0 is good */
511 static int until_connect, until_backlog_nextscan;
512 static double accept_proportion;
513 static int nocheck, nocheck_reported, in_child;
514
515 /* for simulation, debugging, etc. */
516 int simulate_flush= -1;
517
518 /*========== logging ==========*/
519
520 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
521 static void logcore(int sysloglevel, const char *fmt, ...) {
522   VA;
523   if (become_daemon) {
524     vsyslog(sysloglevel,fmt,al);
525   } else {
526     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
527     vfprintf(stderr,fmt,al);
528     putc('\n',stderr);
529   }
530   va_end(al);
531 }
532
533 static void logv(int sysloglevel, const char *pfx, int errnoval,
534                  const char *fmt, va_list al) PRINTF(5,0);
535 static void logv(int sysloglevel, const char *pfx, int errnoval,
536                  const char *fmt, va_list al) {
537   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
538   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
539   msgbuf[sizeof(msgbuf)-1]= 0;
540
541   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
542     sysloglevel= LOG_ERR; /* run by wrong user, probably */
543
544   logcore(sysloglevel, "<%s>%s: %s%s%s",
545          sitename, pfx, msgbuf,
546          errnoval>=0 ? ": " : "",
547          errnoval>=0 ? strerror(errnoval) : "");
548 }
549
550 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
551   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
552   static void fn(const char *fmt, ...) {                        \
553     preterminate();                                             \
554     VA;                                                         \
555     logv(sysloglevel, pfx, err, fmt, al);                       \
556     exit(estatus);                                              \
557   }
558
559 #define logwrap(fn, pfx, sysloglevel, err)              \
560   static void fn(const char *fmt, ...) PRINTF(1,2);     \
561   static void fn(const char *fmt, ...) {                \
562     VA;                                                 \
563     logv(sysloglevel, pfx, err, fmt, al);               \
564     va_end(al);                                         \
565   }
566
567 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
568 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
569
570 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
571 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
572
573 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
574 logwrap(warn,     " warning",  LOG_WARNING, -1);
575
576 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
577 logwrap(info,     " info",     LOG_INFO,    -1);
578 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
579
580
581 /*========== utility functions etc. ==========*/
582
583 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
584 static char *xvasprintf(const char *fmt, va_list al) {
585   char *str;
586   int rc= vasprintf(&str,fmt,al);
587   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
588   return str;
589 }
590 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
591 static char *xasprintf(const char *fmt, ...) {
592   VA;
593   char *str= xvasprintf(fmt,al);
594   va_end(al);
595   return str;
596 }
597
598 static int close_perhaps(int *fd) {
599   if (*fd <= 0) return 0;
600   int r= close(*fd);
601   *fd=0;
602   return r;
603 }
604 static void xclose(int fd, const char *what, const char *what2) {
605   int r= close(fd);
606   if (r) sysdie("close %s%s",what,what2?what2:"");
607 }
608 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
609   if (*fd <= 0) return;
610   xclose(*fd,what,what2);
611   *fd=0;
612 }
613
614 static pid_t xfork(const char *what) {
615   pid_t child;
616
617   child= fork();
618   if (child==-1) sysfatal("cannot fork for %s",what);
619   debug("forked %s %ld", what, (unsigned long)child);
620   if (!child) postfork();
621   return child;
622 }
623
624 static void on_fd_read_except(int fd, oop_call_fd callback) {
625   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
626   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
627 }
628 static void cancel_fd_read_except(int fd) {
629   loop->cancel_fd(loop, fd, OOP_READ);
630   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
631 }
632
633 static void report_child_status(const char *what, int status) {
634   if (WIFEXITED(status)) {
635     int es= WEXITSTATUS(status);
636     if (es)
637       warn("%s: child died with error exit status %d", what, es);
638   } else if (WIFSIGNALED(status)) {
639     int sig= WTERMSIG(status);
640     const char *sigstr= strsignal(sig);
641     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
642     if (sigstr)
643       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
644     else
645       warn("%s: child died due to unknown fatal signal %d%s",
646            what, sig, coredump);
647   } else {
648     warn("%s: child died with unknown wait status %d", what,status);
649   }
650 }
651
652 static int xwaitpid(pid_t *pid, const char *what) {
653   int status;
654
655   int r= kill(*pid, SIGKILL);
656   if (r) sysdie("cannot kill %s child", what);
657
658   pid_t got= waitpid(*pid, &status, 0);
659   if (got==-1) sysdie("cannot reap %s child", what);
660   if (got==0) die("cannot reap %s child", what);
661
662   *pid= 0;
663
664   return status;
665 }
666
667 static void *zxmalloc(size_t sz) {
668   void *p= xmalloc(sz);
669   memset(p,0,sz);
670   return p;
671 }
672
673 static void xunlink(const char *path, const char *what) {
674   int r= unlink(path);
675   if (r) sysdie("can't unlink %s %s", path, what);
676 }
677
678 static time_t xtime(void) {
679   time_t now= time(0);
680   if (now==-1) sysdie("time(2) failed");
681   return now;
682 }
683
684 static void xsigaction(int signo, const struct sigaction *sa) {
685   int r= sigaction(signo,sa,0);
686   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
687 }
688
689 static void xsigsetdefault(int signo) {
690   struct sigaction sa;
691   memset(&sa,0,sizeof(sa));
692   sa.sa_handler= SIG_DFL;
693   xsigaction(signo,&sa);
694 }
695
696 static void xgettimeofday(struct timeval *tv_r) {
697   int r= gettimeofday(tv_r,0);
698   if (r) sysdie("gettimeofday(2) failed");
699 }
700
701 static void xsetnonblock(int fd, int nonblocking) {
702   int errnoval= oop_fd_nonblock(fd, nonblocking);
703   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
704 }
705
706 static void check_isreg(const struct stat *stab, const char *path,
707                         const char *what) {
708   if (!S_ISREG(stab->st_mode))
709     die("%s %s not a plain file (mode 0%lo)",
710         what, path, (unsigned long)stab->st_mode);
711 }
712
713 static void xfstat(int fd, struct stat *stab_r, const char *what) {
714   int r= fstat(fd, stab_r);
715   if (r) sysdie("could not fstat %s", what);
716 }
717
718 static void xfstat_isreg(int fd, struct stat *stab_r,
719                          const char *path, const char *what) {
720   xfstat(fd, stab_r, what);
721   check_isreg(stab_r, path, what);
722 }
723
724 static void xlstat_isreg(const char *path, struct stat *stab,
725                          int *enoent_r /* 0 means ENOENT is fatal */,
726                          const char *what) {
727   int r= lstat(path, stab);
728   if (r) {
729     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
730     sysdie("could not lstat %s %s", what, path);
731   }
732   if (enoent_r) *enoent_r= 0;
733   check_isreg(stab, path, what);
734 }
735
736 static int samefile(const struct stat *a, const struct stat *b) {
737   assert(S_ISREG(a->st_mode));
738   assert(S_ISREG(b->st_mode));
739   return (a->st_ino == b->st_ino &&
740           a->st_dev == b->st_dev);
741 }
742
743 static char *sanitise(const char *input, int len) {
744   static char sanibuf[100]; /* returns pointer to this buffer! */
745
746   const char *p= input;
747   const char *endp= len>=0 ? input+len : 0;
748   char *q= sanibuf;
749   *q++= '`';
750   for (;;) {
751     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
752     int c= (!endp || p<endp) ? *p++ : 0;
753     if (!c) { *q++= '\''; *q=0; break; }
754     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
755     sprintf(q,"\\x%02x",c);
756     q += 4;
757   }
758   return sanibuf;
759 }
760
761 static int isewouldblock(int errnoval) {
762   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
763 }
764
765 /*========== command and control connections ==========*/
766
767 static int control_master;
768
769 typedef struct ControlConn ControlConn;
770 struct ControlConn {
771   void (*destroy)(ControlConn*);
772   int fd;
773   oop_read *rd;
774   FILE *out;
775   union {
776     struct sockaddr sa;
777     struct sockaddr_un un;
778   } sa;
779   socklen_t salen;
780 };
781
782 static const oop_rd_style control_rd_style= {
783   OOP_RD_DELIM_STRIP, '\n',
784   OOP_RD_NUL_FORBID,
785   OOP_RD_SHORTREC_FORBID
786 };
787
788 static void control_destroy(ControlConn *cc) {
789   cc->destroy(cc);
790 }
791
792 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
793   if (ferror(cc->out) | fflush(cc->out)) {
794     info("CTRL%d write error %s", cc->fd, strerror(errno));
795     control_destroy(cc);
796   }
797 }
798
799 static void control_prompt(ControlConn *cc /* may destroy*/) {
800   fprintf(cc->out, "%s| ", sitename);
801   control_checkouterr(cc);
802 }
803
804 struct ControlCommand {
805   const char *cmd;
806   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
807             const char *arg, size_t argsz);
808   void *xdata;
809   int xval;
810 };
811
812 static const ControlCommand control_commands[];
813
814 #define CCMD(wh)                                                        \
815   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
816                         const char *arg, size_t argsz)
817
818 CCMD(help) {
819   fputs("commands:\n", cc->out);
820   const ControlCommand *ccmd;
821   for (ccmd=control_commands; ccmd->cmd; ccmd++)
822     fprintf(cc->out, " %s\n", ccmd->cmd);
823   fputs("NB: permissible arguments are not shown above."
824         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
825 }
826
827 CCMD(flush) {
828   int ok= trigger_flush_ok();
829   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
830 }
831
832 CCMD(stop) {
833   preterminate();
834   notice("terminating (CTRL%d)",cc->fd);
835   raise_default(SIGTERM);
836   abort();
837 }
838
839 CCMD(dump);
840
841 /* messing with our head: */
842 CCMD(period) { period(); }
843 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
844 CCMD(setint) { *(int*)c->xdata= c->xval; }
845 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
846
847 static const ControlCommand control_commands[]= {
848   { "h",             ccmd_help      },
849   { "flush",         ccmd_flush     },
850   { "stop",          ccmd_stop      },
851   { "dump q",        ccmd_dump, 0,0 },
852   { "dump a",        ccmd_dump, 0,1 },
853
854   { "p",             ccmd_period    },
855
856 #define POKES(cmd,func)                                                 \
857   { cmd "flush",     func,           &until_flush,             1 },     \
858   { cmd "conn",      func,           &until_connect,           0 },     \
859   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
860 POKES("next ", ccmd_setint)
861 POKES("prod ", ccmd_setint_period)
862
863   { "pretend flush", ccmd_setintarg, &simulate_flush             },
864   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
865   { 0 }
866 };
867
868 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
869                            const char *errmsg, int errnoval,
870                            const char *data, size_t recsz, void *cc_v) {
871   ControlConn *cc= cc_v;
872
873   if (!data) {
874     info("CTRL%d closed", cc->fd);
875     cc->destroy(cc);
876     return OOP_CONTINUE;
877   }
878
879   if (recsz == 0) goto prompt;
880
881   const ControlCommand *ccmd;
882   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
883     int l= strlen(ccmd->cmd);
884     if (recsz < l) continue;
885     if (recsz > l && data[l] != ' ') continue;
886     if (memcmp(data, ccmd->cmd, l)) continue;
887
888     int argl= (int)recsz - (l+1); 
889     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
890     goto prompt;
891   }
892
893   fputs("unknown command; h for help\n", cc->out);
894
895  prompt:
896   control_prompt(cc);
897   return OOP_CONTINUE;
898 }
899
900 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
901                             const char *errmsg, int errnoval,
902                             const char *data, size_t recsz, void *cc_v) {
903   ControlConn *cc= cc_v;
904   
905   info("CTRL%d read error %s", cc->fd, errmsg);
906   cc->destroy(cc);
907   return OOP_CONTINUE;
908 }
909
910 static int control_conn_startup(ControlConn *cc /* may destroy*/,
911                                 const char *how) {
912   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
913   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
914
915   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
916                       control_rd_ok, cc,
917                       control_rd_err, cc);
918   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
919
920   info("CTRL%d %s ready", cc->fd, how);
921   control_prompt(cc);
922   return 0;
923 }
924
925 static void control_stdio_destroy(ControlConn *cc) {
926   if (cc->rd) {
927     oop_rd_cancel(cc->rd);
928     errno= oop_rd_delete_tidy(cc->rd);
929     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
930   }
931   free(cc);
932 }
933
934 static void control_stdio(void) {
935   NEW_DECL(ControlConn *,cc);
936   cc->destroy= control_stdio_destroy;
937
938   cc->fd= 0;
939   cc->out= stdout;
940   int r= control_conn_startup(cc,"stdio");
941   if (r) cc->destroy(cc);
942 }
943
944 static void control_accepted_destroy(ControlConn *cc) {
945   if (cc->rd) {
946     oop_rd_cancel(cc->rd);
947     oop_rd_delete_kill(cc->rd);
948   }
949   if (cc->out) { fclose(cc->out); cc->fd=0; }
950   close_perhaps(&cc->fd);
951   free(cc);
952 }
953
954 static void *control_master_readable(oop_source *lp, int master,
955                                      oop_event ev, void *u) {
956   NEW_DECL(ControlConn *,cc);
957   cc->destroy= control_accepted_destroy;
958
959   cc->salen= sizeof(cc->sa);
960   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
961   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
962
963   cc->out= fdopen(cc->fd, "w");
964   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
965
966   int r= control_conn_startup(cc, "accepted");
967   if (r) goto x;
968
969   return OOP_CONTINUE;
970
971  x:
972   cc->destroy(cc);
973   return OOP_CONTINUE;
974 }
975
976 #define NOCONTROL(...) do{                                              \
977     syswarn("no control socket, because failed to " __VA_ARGS__);       \
978     goto nocontrol;                                                     \
979   }while(0)
980
981 static void control_init(void) {
982   char *real=0;
983   
984   union {
985     struct sockaddr sa;
986     struct sockaddr_un un;
987   } sa;
988
989   memset(&sa,0,sizeof(sa));
990   int maxlen= sizeof(sa.un.sun_path);
991
992   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
993   if (reallen<0) {
994     if (errno != ENOENT)
995       NOCONTROL("readlink control socket symlink path %s", path_control);
996   }
997   if (reallen >= maxlen) {
998     debug("control socket symlink path too long (r=%d)",reallen);
999     xunlink(path_control, "old (overlong) control socket symlink");
1000     reallen= -1;
1001   }
1002   
1003   if (reallen<0) {
1004     struct stat stab;
1005     int r= lstat(realsockdir,&stab);
1006     if (r) {
1007       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1008
1009       r= mkdir(realsockdir, 0700);
1010       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1011
1012     } else {
1013       uid_t self= geteuid();
1014       if (!S_ISDIR(stab.st_mode) ||
1015           stab.st_uid != self ||
1016           stab.st_mode & 0007) {
1017         warn("no control socket, because real socket directory"
1018              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1019              !!S_ISDIR(stab.st_mode),
1020              (unsigned long)stab.st_uid, (unsigned long)self,
1021              (unsigned long)stab.st_mode & 0777UL);
1022         goto nocontrol;
1023       }
1024     }
1025
1026     real= xasprintf("%s/s%lx.%lx", realsockdir,
1027                     (unsigned long)xtime(), (unsigned long)self_pid);
1028     int reallen= strlen(real);
1029
1030     if (reallen >= maxlen) {
1031       warn("no control socket, because tmpnam gave overly-long path"
1032            " %s", real);
1033       goto nocontrol;
1034     }
1035     r= symlink(real, path_control);
1036     if (r) NOCONTROL("make control socket path %s a symlink to real"
1037                      " socket path %s", path_control, real);
1038     memcpy(sa.un.sun_path, real, reallen);
1039   }
1040
1041   int r= unlink(sa.un.sun_path);
1042   if (r && errno!=ENOENT)
1043     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1044
1045   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1046   if (control_master<0) NOCONTROL("create new control socket");
1047
1048   sa.un.sun_family= AF_UNIX;
1049   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1050   r= bind(control_master, &sa.sa, sl);
1051   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1052
1053   r= listen(control_master, 5);
1054   if (r) NOCONTROL("listen");
1055
1056   xsetnonblock(control_master, 1);
1057
1058   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1059   info("control socket ok, real path %s", sa.un.sun_path);
1060
1061   return;
1062
1063  nocontrol:
1064   free(real);
1065   xclose_perhaps(&control_master, "control master",0);
1066   return;
1067 }
1068
1069 /*========== management of connections ==========*/
1070
1071 static void conn_closefd(Conn *conn, const char *msgprefix) {
1072   int r= close_perhaps(&conn->fd);
1073   if (r) info("C%d %serror closing socket: %s",
1074               conn->fd, msgprefix, strerror(errno));
1075 }
1076
1077 static void conn_dispose(Conn *conn) {
1078   if (!conn) return;
1079   if (conn->rd) {
1080     oop_rd_cancel(conn->rd);
1081     oop_rd_delete_kill(conn->rd);
1082     conn->rd= 0;
1083   }
1084   if (conn->fd) {
1085     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1086     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1087   }
1088   conn_closefd(conn,"");
1089   free(conn);
1090   until_connect= reconnect_delay_periods;
1091 }
1092
1093 static void *conn_exception(oop_source *lp, int fd,
1094                             oop_event ev, void *conn_v) {
1095   Conn *conn= conn_v;
1096   unsigned char ch;
1097   assert(fd == conn->fd);
1098   assert(ev == OOP_EXCEPTION);
1099   int r= read(conn->fd, &ch, 1);
1100   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1101   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1102                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1103   return OOP_CONTINUE;
1104 }  
1105
1106 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1107   int requeue[art_MaxState];
1108   memset(requeue,0,sizeof(requeue));
1109
1110   Article *art;
1111   
1112   while ((art= LIST_REMHEAD(conn->priority)))
1113     LIST_ADDTAIL(art->ipf->queue, art);
1114
1115   while ((art= LIST_REMHEAD(conn->waiting)))
1116     LIST_ADDTAIL(art->ipf->queue, art);
1117
1118   while ((art= LIST_REMHEAD(conn->sent))) {
1119     requeue[art->state]++;
1120     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1121     LIST_ADDTAIL(art->ipf->queue,art);
1122     check_reading_pause_resume(art->ipf);
1123   }
1124
1125   int i;
1126   XmitDetails *d;
1127   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1128     xmit_free(d);
1129
1130   char *m= xvasprintf(fmt,al);
1131   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1132        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1133   free(m);
1134
1135   LIST_REMOVE(conns,conn);
1136   conn_dispose(conn);
1137   check_assign_articles();
1138 }
1139
1140 static void connfail(Conn *conn, const char *fmt, ...) {
1141   va_list al;
1142   va_start(al,fmt);
1143   vconnfail(conn,fmt,al);
1144   va_end(al);
1145 }
1146
1147 static void check_idle_conns(void) {
1148   Conn *conn;
1149   FOR_CONN(conn)
1150     conn->since_activity++;
1151  search_again:
1152   FOR_CONN(conn) {
1153     if (conn->since_activity <= need_activity_periods) continue;
1154
1155     /* We need to shut this down */
1156     if (conn->quitting)
1157       connfail(conn,"timed out waiting for response to QUIT");
1158     else if (conn->sent.count)
1159       connfail(conn,"timed out waiting for responses");
1160     else if (conn->waiting.count || conn->priority.count)
1161       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1162     else if (conn->xmitu)
1163       connfail(conn,"peer has been sending responses"
1164                " before receiving our commands!");
1165     else {
1166       static const char quitcmd[]= "QUIT\r\n";
1167       int todo= sizeof(quitcmd)-1;
1168       const char *p= quitcmd;
1169       for (;;) {
1170         int r= write(conn->fd, p, todo);
1171         if (r<0) {
1172           if (isewouldblock(errno))
1173             connfail(conn, "blocked writing QUIT to idle connection");
1174           else
1175             connfail(conn, "failed to write QUIT to idle connection: %s",
1176                      strerror(errno));
1177           break;
1178         }
1179         assert(r<=todo);
1180         todo -= r;
1181         if (!todo) {
1182           conn->quitting= 1;
1183           conn->since_activity= 0;
1184           debug("C%d is idle, quitting", conn->fd);
1185           break;
1186         }
1187       }
1188     }
1189     goto search_again;
1190   }
1191 }  
1192
1193 /*---------- making new connections ----------*/
1194
1195 static pid_t connecting_child;
1196 static int connecting_fdpass_sock;
1197
1198 static void connect_attempt_discard(void) {
1199   if (connecting_child) {
1200     int status= xwaitpid(&connecting_child, "connect");
1201     if (!(WIFEXITED(status) ||
1202           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1203       report_child_status("connect", status);
1204   }
1205   if (connecting_fdpass_sock) {
1206     cancel_fd_read_except(connecting_fdpass_sock);
1207     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1208   }
1209 }
1210
1211 #define PREP_DECL_MSG_CMSG(msg)                 \
1212   char msgbyte= 0;                              \
1213   struct iovec msgiov;                          \
1214   msgiov.iov_base= &msgbyte;                    \
1215   msgiov.iov_len= 1;                            \
1216   struct msghdr msg;                            \
1217   memset(&msg,0,sizeof(msg));                   \
1218   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1219   msg.msg_iov= &msgiov;                         \
1220   msg.msg_iovlen= 1;                            \
1221   msg.msg_control= msg##cbuf;                   \
1222   msg.msg_controllen= sizeof(msg##cbuf);
1223
1224 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1225   Conn *conn= 0;
1226
1227   assert(fd == connecting_fdpass_sock);
1228
1229   PREP_DECL_MSG_CMSG(msg);
1230   
1231   ssize_t rs= recvmsg(fd, &msg, 0);
1232   if (rs<0) {
1233     if (isewouldblock(errno)) return OOP_CONTINUE;
1234     syswarn("failed to read socket from connecting child");
1235     goto x;
1236   }
1237
1238   NEW(conn);
1239   LIST_INIT(conn->waiting);
1240   LIST_INIT(conn->priority);
1241   LIST_INIT(conn->sent);
1242
1243   struct cmsghdr *h= 0;
1244   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1245   if (!h) {
1246     int status= xwaitpid(&connecting_child, "connect child (broken)");
1247
1248     if (WIFEXITED(status)) {
1249       if (WEXITSTATUS(status) != 0 &&
1250           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1251           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1252         /* child already reported the problem */;
1253       else {
1254         if (e == OOP_EXCEPTION)
1255           warn("connect: connection child exited code %d but"
1256                " unexpected exception on fdpass socket",
1257                WEXITSTATUS(status));
1258         else
1259           warn("connect: connection child exited code %d but"
1260                " no cmsg (rs=%d)",
1261                WEXITSTATUS(status), (int)rs);
1262       }
1263     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1264       warn("connect: connection attempt timed out");
1265     } else {
1266       report_child_status("connect", status);
1267     }
1268     goto x;
1269   }
1270
1271 #define CHK(field, val)                                                  \
1272   if (h->cmsg_##field != val) {                                          \
1273     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1274         h->cmsg_##field, val);                                           \
1275     goto x;                                                              \
1276   }
1277   CHK(level, SOL_SOCKET);
1278   CHK(type,  SCM_RIGHTS);
1279   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1280 #undef CHK
1281
1282   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1283
1284   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1285
1286   int status;
1287   pid_t got= waitpid(connecting_child, &status, 0);
1288   if (got==-1) sysdie("connect: real wait for child");
1289   assert(got == connecting_child);
1290   connecting_child= 0;
1291
1292   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1293   int es= WEXITSTATUS(status);
1294   switch (es) {
1295   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1296   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1297   default:
1298     fatal("connect: child gave unexpected exit status %d", es);
1299   }
1300
1301   /* Phew! */
1302   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1303
1304   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1305   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1306   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1307   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1308                      &peer_rd_ok, conn,
1309                      &peer_rd_err, conn);
1310   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1311
1312   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1313   LIST_ADDHEAD(conns, conn);
1314
1315   connect_attempt_discard();
1316   check_assign_articles();
1317   return OOP_CONTINUE;
1318
1319  x:
1320   conn_dispose(conn);
1321   connect_attempt_discard();
1322   return OOP_CONTINUE;
1323 }
1324
1325 static int allow_connect_start(void) {
1326   return conns.count < max_connections
1327     && !connecting_child
1328     && !until_connect;
1329 }
1330
1331 static void connect_start(void) {
1332   assert(!connecting_child);
1333   assert(!connecting_fdpass_sock);
1334
1335   info("starting connection attempt");
1336
1337   int socks[2];
1338   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1339   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1340
1341   connecting_child= xfork("connection");
1342
1343   if (!connecting_child) {
1344     FILE *cn_from, *cn_to;
1345     char buf[NNTP_STRLEN+100];
1346     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1347
1348     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1349
1350     alarm(connection_setup_timeout);
1351     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1352       int l= strlen(buf);
1353       int stripped=0;
1354       while (l>0) {
1355         unsigned char c= buf[l-1];
1356         if (!isspace(c)) break;
1357         if (c=='\n' || c=='\r') stripped=1;
1358         --l;
1359       }
1360       if (!buf[0]) {
1361         sysfatal("connect: connection attempt failed");
1362       } else {
1363         buf[l]= 0;
1364         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1365               sanitise(buf,-1));
1366       }
1367     }
1368     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1369       sysfatal("connect: authentication failed");
1370     if (try_stream) {
1371       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1372           fflush(cn_to))
1373         sysfatal("connect: could not send MODE STREAM");
1374       buf[sizeof(buf)-1]= 0;
1375       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1376         if (ferror(cn_from))
1377           sysfatal("connect: could not read response to MODE STREAM");
1378         else
1379           fatal("connect: connection close in response to MODE STREAM");
1380       }
1381       int l= strlen(buf);
1382       assert(l>=1);
1383       if (buf[l-1]!='\n')
1384         fatal("connect: response to MODE STREAM is too long: %.100s...",
1385               sanitise(buf,-1));
1386       l--;  if (l>0 && buf[l-1]=='\r') l--;
1387       buf[l]= 0;
1388       char *ep;
1389       int rcode= strtoul(buf,&ep,10);
1390       if (ep != &buf[3])
1391         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1392
1393       switch (rcode) {
1394       case 203:
1395         exitstatus= CONNCHILD_ESTATUS_STREAM;
1396         break;
1397       case 480:
1398       case 500:
1399         break;
1400       default:
1401         warn("connect: unexpected response to MODE STREAM: %.50s",
1402              sanitise(buf,-1));
1403         exitstatus= 2;
1404         break;
1405       }
1406     }
1407     int fd= fileno(cn_from);
1408
1409     PREP_DECL_MSG_CMSG(msg);
1410     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1411     cmsg->cmsg_level= SOL_SOCKET;
1412     cmsg->cmsg_type=  SCM_RIGHTS;
1413     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1414     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1415
1416     msg.msg_controllen= cmsg->cmsg_len;
1417     r= sendmsg(socks[1], &msg, 0);
1418     if (r<0) sysdie("sendmsg failed for new connection");
1419     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1420
1421     _exit(exitstatus);
1422   }
1423
1424   xclose(socks[1], "connecting fdpass child's socket",0);
1425   connecting_fdpass_sock= socks[0];
1426   xsetnonblock(connecting_fdpass_sock, 1);
1427   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1428 }
1429
1430 /*---------- assigning articles to conns, and transmitting ----------*/
1431
1432 static Article *dequeue_from(int peek, InputFile *ipf) {
1433   if (!ipf) return 0;
1434   if (peek) return LIST_HEAD(ipf->queue);
1435
1436   Article *art= LIST_REMHEAD(ipf->queue);
1437   if (!art) return 0;
1438   check_reading_pause_resume(ipf);
1439   return art;
1440 }
1441
1442 static Article *dequeue(int peek) {
1443   Article *art;
1444   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1445   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1446   art= dequeue_from(peek, main_input_file);      if (art) return art;
1447   return 0;
1448 }
1449
1450 static void check_assign_articles(void) {
1451   for (;;) {
1452     if (!dequeue(1))
1453       break;
1454
1455     Conn *walk, *use=0;
1456     int spare=0, inqueue=0;
1457
1458     /* Find a connection to offer this article.  We prefer a busy
1459      * connection to an idle one, provided it's not full.  We take the
1460      * first (oldest) and since that's stable, it will mean we fill up
1461      * connections in order.  That way if we have too many
1462      * connections, the spare ones will go away eventually.
1463      */
1464     FOR_CONN(walk) {
1465       if (walk->quitting) continue;
1466       inqueue= walk->sent.count + walk->priority.count
1467              + walk->waiting.count;
1468       spare= walk->max_queue - inqueue;
1469       assert(inqueue <= max_queue_per_conn);
1470       assert(spare >= 0);
1471       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1472       else if (spare>0) /*working*/ { use= walk; break; }
1473     }
1474     if (use) {
1475       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1476       while (spare>0) {
1477         Article *art= dequeue(0);
1478         if (!art) break;
1479         LIST_ADDTAIL(use->waiting, art);
1480         spare--;
1481       }
1482       conn_maybe_write(use);
1483     } else if (allow_connect_start()) {
1484       until_connect= reconnect_delay_periods;
1485       connect_start();
1486       break;
1487     } else {
1488       break;
1489     }
1490   }
1491 }
1492
1493 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1494   conn_maybe_write(u);
1495   return OOP_CONTINUE;
1496 }
1497
1498 static void conn_maybe_write(Conn *conn)  {
1499   for (;;) {
1500     conn_make_some_xmits(conn);
1501     if (!conn->xmitu) {
1502       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1503       return;
1504     }
1505
1506     void *rp= conn_write_some_xmits(conn);
1507     if (rp==OOP_CONTINUE) {
1508       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1509       return;
1510     } else if (rp==OOP_HALT) {
1511       return;
1512     } else if (!rp) {
1513       /* transmitted everything */
1514     } else {
1515       abort();
1516     }
1517   }
1518 }
1519
1520 /*---------- expiry, flow control and deferral ----------*/
1521
1522 /*
1523  * flow control notes
1524  * to ensure articles go away eventually
1525  * separate queue for each input file
1526  *   queue expiry
1527  *     every period, check head of backlog queue for expiry with SMretrieve
1528  *       if too old: discard, and check next article
1529  *     also check every backlog article as we read it
1530  *   flush expiry
1531  *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
1532  *     one-off: eat queued articles from flushing and write them to defer
1533  *     one-off: connfail all connections which have any articles from flushing
1534  *     newly read articles from flushing go straight to defer
1535  *     this should take care of it and get us out of this state
1536  * to avoid filling up ram needlessly
1537  *   input control
1538  *     limit number of queued articles for each ipf
1539  *     pause/resume inputfile tailing
1540  */
1541
1542 static void check_reading_pause_resume(InputFile *ipf) {
1543   if (ipf->queue.count >= max_queue_per_ipf)
1544     inputfile_reading_pause(ipf);
1545   else
1546     inputfile_reading_resume(ipf);
1547 }
1548
1549 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1550   open_defer();
1551   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1552       || fflush(defer))
1553     sysfatal("write to defer file %s",path_defer);
1554   article_done(art, whichcount);
1555 }
1556
1557 static int article_check_expired(Article *art /* must be queued, not conn */) {
1558   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1559   if (artdata) { SMfreearticle(artdata); return 0; }
1560
1561   LIST_REMOVE(art->ipf->queue, art);
1562   art->missing= 1;
1563   art->ipf->count_nooffer_missing++;
1564   article_done(art,-1);
1565   return 1;
1566 }
1567
1568 static void inputfile_queue_check_expired(InputFile *ipf) {
1569   if (!ipf) return;
1570
1571   for (;;) {
1572     Article *art= LIST_HEAD(ipf->queue);
1573     int exp= article_check_expired(art);
1574     if (!exp) break;
1575   }
1576   check_reading_pause_resume(ipf);
1577 }
1578
1579 static void article_autodefer(InputFile *ipf, Article *art) {
1580   ipf->autodefer++;
1581   article_defer(art,-1);
1582 }
1583
1584 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1585   Article *art;
1586   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1587     if (art->ipf == ipf) return 1;
1588   return 0;
1589 }
1590
1591 static void autodefer_input_file_articles(InputFile *ipf) {
1592   Article *art;
1593   while ((art= LIST_REMHEAD(ipf->queue)))
1594     article_autodefer(ipf, art);
1595 }
1596
1597 static void autodefer_input_file(InputFile *ipf) {
1598   ipf->autodefer= 0;
1599
1600   autodefer_input_file_articles(ipf);
1601
1602   if (ipf->inprogress) {
1603     Conn *walk;
1604     FOR_CONN(walk) {
1605       if (has_article_in(&walk->waiting,  ipf) ||
1606           has_article_in(&walk->priority, ipf) ||
1607           has_article_in(&walk->sent,     ipf))
1608         walk->quitting= -1;
1609     }
1610     while (ipf->inprogress) {
1611       FOR_CONN(walk)
1612         if (walk->quitting < 0) goto found;
1613       abort(); /* where are they ?? */
1614
1615     found:
1616       connfail(walk, "connection is stuck or crawling,"
1617                " and we need to finish flush");
1618       autodefer_input_file_articles(ipf);
1619     }
1620   }
1621
1622   check_reading_pause_resume(ipf);
1623 }
1624
1625 /*========== article transmission ==========*/
1626
1627 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1628                   XmitKind kind) { /* caller must then fill in details */
1629   struct iovec *v= &conn->xmit[conn->xmitu];
1630   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1631   v->iov_base= (char*)data;
1632   v->iov_len= len;
1633   d->kind= kind;
1634   return d;
1635 }
1636
1637 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1638   xmit_core(conn,data,len, xk_Const);
1639 }
1640 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1641
1642 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1643   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1644   d->info.sm_art= ah;
1645 }
1646
1647 static void xmit_free(XmitDetails *d) {
1648   switch (d->kind) {
1649   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1650   case xk_Const:                                  break;
1651   default: abort();
1652   }
1653 }
1654
1655 static void *conn_write_some_xmits(Conn *conn) {
1656   /* return values:
1657    *      0:            nothing more to write, no need to call us again
1658    *      OOP_CONTINUE: more to write but fd not writeable
1659    *      OOP_HALT:     disaster, have destroyed conn
1660    */
1661   for (;;) {
1662     int count= conn->xmitu;
1663     if (!count) return 0;
1664
1665     if (count > IOV_MAX) count= IOV_MAX;
1666     ssize_t rs= writev(conn->fd, conn->xmit, count);
1667     if (rs < 0) {
1668       if (isewouldblock(errno)) return OOP_CONTINUE;
1669       connfail(conn, "write failed: %s", strerror(errno));
1670       return OOP_HALT;
1671     }
1672     assert(rs > 0);
1673
1674     int done;
1675     for (done=0; rs && done<conn->xmitu; done++) {
1676       struct iovec *vp= &conn->xmit[done];
1677       XmitDetails *dp= &conn->xmitd[done];
1678       if (rs > vp->iov_len) {
1679         rs -= vp->iov_len;
1680         xmit_free(dp);
1681       } else {
1682         vp->iov_base= (char*)vp->iov_base + rs;
1683         vp->iov_len -= rs;
1684       }
1685     }
1686     int newu= conn->xmitu - done;
1687     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1688     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1689     conn->xmitu= newu;
1690   }
1691 }
1692
1693 static void conn_make_some_xmits(Conn *conn) {
1694   for (;;) {
1695     if (conn->xmitu+5 > CONNIOVS)
1696       break;
1697
1698     Article *art= LIST_REMHEAD(conn->priority);
1699     if (!art) art= LIST_REMHEAD(conn->waiting);
1700     if (!art) break;
1701
1702     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1703       /* actually send it */
1704
1705       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1706
1707       art->state=
1708         art->state == art_Unchecked ? art_Unsolicited :
1709         art->state == art_Wanted    ? art_Wanted      :
1710         (abort(),-1);
1711
1712       if (!artdata) art->missing= 1;
1713       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1714
1715       if (conn->stream) {
1716         if (artdata) {
1717           XMIT_LITERAL("TAKETHIS ");
1718           xmit_noalloc(conn, art->messageid, art->midlen);
1719           XMIT_LITERAL("\r\n");
1720           xmit_artbody(conn, artdata);
1721         } else {
1722           article_done(art, -1);
1723           continue;
1724         }
1725       } else {
1726         /* we got 235 from IHAVE */
1727         if (artdata) {
1728           xmit_artbody(conn, artdata);
1729         } else {
1730           XMIT_LITERAL(".\r\n");
1731         }
1732       }
1733
1734       LIST_ADDTAIL(conn->sent, art);
1735
1736     } else {
1737       /* check it */
1738
1739       if (conn->stream)
1740         XMIT_LITERAL("CHECK ");
1741       else
1742         XMIT_LITERAL("IHAVE ");
1743       xmit_noalloc(conn, art->messageid, art->midlen);
1744       XMIT_LITERAL("\r\n");
1745
1746       assert(art->state == art_Unchecked);
1747       art->ipf->counts[art->state][RC_sent]++;
1748       LIST_ADDTAIL(conn->sent, art);
1749     }
1750   }
1751 }
1752
1753 /*========== handling responses from peer ==========*/
1754
1755 static const oop_rd_style peer_rd_style= {
1756   OOP_RD_DELIM_STRIP, '\n',
1757   OOP_RD_NUL_FORBID,
1758   OOP_RD_SHORTREC_FORBID
1759 };
1760
1761 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1762                          const char *errmsg, int errnoval,
1763                          const char *data, size_t recsz, void *conn_v) {
1764   Conn *conn= conn_v;
1765   connfail(conn, "error receiving from peer: %s", errmsg);
1766   return OOP_CONTINUE;
1767 }
1768
1769 static Article *article_reply_check(Conn *conn, const char *response,
1770                                     int code_indicates_streaming,
1771                                     int must_have_sent
1772                                         /* 1:yes, -1:no, 0:dontcare */,
1773                                     const char *sanitised_response) {
1774   Article *art= LIST_HEAD(conn->sent);
1775
1776   if (!art) {
1777     connfail(conn,
1778              "peer gave unexpected response when no commands outstanding: %s",
1779              sanitised_response);
1780     return 0;
1781   }
1782
1783   if (code_indicates_streaming) {
1784     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1785     if (!conn->stream) {
1786       connfail(conn, "peer gave streaming response code "
1787                " to IHAVE or subsequent body: %s", sanitised_response);
1788       return 0;
1789     }
1790     const char *got_mid= response+4;
1791     int got_midlen= strcspn(got_mid, " \n\r");
1792     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1793       connfail(conn, "peer gave streaming response with syntactically invalid"
1794                " messageid: %s", sanitised_response);
1795       return 0;
1796     }
1797     if (got_midlen != art->midlen ||
1798         memcmp(got_mid, art->messageid, got_midlen)) {
1799       connfail(conn, "peer gave streaming response code to wrong article -"
1800                " probable synchronisation problem; we offered: %s;"
1801                " peer said: %s",
1802                art->messageid, sanitised_response);
1803       return 0;
1804     }
1805   } else {
1806     if (conn->stream) {
1807       connfail(conn, "peer gave non-streaming response code to"
1808                " CHECK/TAKETHIS: %s", sanitised_response);
1809       return 0;
1810     }
1811   }
1812
1813   if (must_have_sent>0 && art->state < art_Wanted) {
1814     connfail(conn, "peer says article accepted but"
1815              " we had not sent the body: %s", sanitised_response);
1816     return 0;
1817   }
1818   if (must_have_sent<0 && art->state >= art_Wanted) {
1819     connfail(conn, "peer says please sent the article but we just did: %s",
1820              sanitised_response);
1821     return 0;
1822   }
1823
1824   Article *art_again= LIST_REMHEAD(conn->sent);
1825   assert(art_again == art);
1826   return art;
1827 }
1828
1829 static void update_nocheck(int accepted) {
1830   accept_proportion *= nocheck_decay;
1831   accept_proportion += accepted * (1.0 - nocheck_decay);
1832   int new_nocheck= accept_proportion >= nocheck_thresh;
1833   if (new_nocheck && !nocheck_reported) {
1834     notice("entering nocheck mode for the first time");
1835     nocheck_reported= 1;
1836   } else if (new_nocheck != nocheck) {
1837     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1838   }
1839   nocheck= new_nocheck;
1840 }
1841
1842 static void article_done(Article *art, int whichcount) {
1843   if (whichcount>=0 && !art->missing)
1844     art->ipf->counts[art->state][whichcount]++;
1845
1846   if (whichcount == RC_accepted) update_nocheck(1);
1847   else if (whichcount == RC_unwanted) update_nocheck(0);
1848
1849   InputFile *ipf= art->ipf;
1850
1851   while (art->blanklen) {
1852     static const char spaces[]=
1853       "                                                                "
1854       "                                                                "
1855       "                                                                "
1856       "                                                                "
1857       "                                                                "
1858       "                                                                "
1859       "                                                                "
1860       "                                                                "
1861       "                                                                ";
1862     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1863     int r= pwrite(ipf->fd, spaces, w, art->offset);
1864     if (r==-1) {
1865       if (errno==EINTR) continue;
1866       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1867              art->messageid, art->blanklen,
1868              (unsigned long)art->offset, ipf->path);
1869     }
1870     assert(r>=0 && r<=w);
1871     art->blanklen -= w;
1872     art->offset += w;
1873   }
1874
1875   ipf->inprogress--;
1876   assert(ipf->inprogress >= 0);
1877   free(art);
1878
1879   if (!ipf->inprogress && ipf != main_input_file)
1880     queue_check_input_done();
1881 }
1882
1883 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1884                         const char *errmsg, int errnoval,
1885                         const char *data, size_t recsz, void *conn_v) {
1886   Conn *conn= conn_v;
1887
1888   if (ev == OOP_RD_EOF) {
1889     connfail(conn, "unexpected EOF from peer");
1890     return OOP_CONTINUE;
1891   }
1892   assert(ev == OOP_RD_OK);
1893
1894   char *sani= sanitise(data,-1);
1895
1896   char *ep;
1897   unsigned long code= strtoul(data, &ep, 10);
1898   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1899     connfail(conn, "badly formatted response from peer: %s", sani);
1900     return OOP_CONTINUE;
1901   }
1902
1903   int conn_busy=
1904     conn->waiting.count ||
1905     conn->priority.count ||
1906     conn->sent.count ||
1907     conn->xmitu;
1908
1909   if (conn->quitting) {
1910     if (code!=205 && code!=503) {
1911       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1912     } else {
1913       notice("C%d idle connection closed by us", conn->fd);
1914       assert(!conn_busy);
1915       LIST_REMOVE(conns,conn);
1916       conn_dispose(conn);
1917     }
1918     return OOP_CONTINUE;
1919   }
1920
1921   conn->since_activity= 0;
1922   Article *art;
1923
1924 #define GET_ARTICLE(musthavesent) do{                                         \
1925     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1926     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1927   }while(0) 
1928
1929 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1930     code_streaming= (streaming);                                \
1931     GET_ARTICLE(musthavesent);                                  \
1932     article_done(art, RC_##how);                                \
1933     goto dealtwith;                                             \
1934   }while(0)
1935
1936 #define PEERBADMSG(m) do {                                      \
1937     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1938   }while(0)
1939
1940   int code_streaming= 0;
1941
1942   switch (code) {
1943
1944   default:  PEERBADMSG("peer sent unexpected message");
1945
1946   case 400:
1947     if (conn_busy)
1948       PEERBADMSG("peer timed us out or stopped accepting articles");
1949
1950     notice("C%d idle connection closed by peer", conn->fd);
1951     LIST_REMOVE(conns,conn);
1952     conn_dispose(conn);
1953     return OOP_CONTINUE;
1954
1955   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1956   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1957
1958   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1959   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1960
1961   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1962   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1963
1964   case 238: /* CHECK says send it */
1965     code_streaming= 1;
1966   case 335: /* IHAVE says send it */
1967     GET_ARTICLE(-1);
1968     assert(art->state == art_Unchecked);
1969     art->ipf->counts[art->state][RC_accepted]++;
1970     art->state= art_Wanted;
1971     LIST_ADDTAIL(conn->priority, art);
1972     break;
1973
1974   case 431: /* CHECK or TAKETHIS says try later */
1975     code_streaming= 1;
1976   case 436: /* IHAVE says try later */
1977     GET_ARTICLE(0);
1978     article_defer(art, RC_deferred);
1979     break;
1980
1981   }
1982 dealtwith:
1983
1984   conn_maybe_write(conn);
1985   check_assign_articles();
1986   return OOP_CONTINUE;
1987 }
1988
1989
1990 /*========== monitoring of input files ==========*/
1991
1992 static void feedfile_eof(InputFile *ipf) {
1993   assert(ipf != main_input_file); /* promised by tailing_try_read */
1994   inputfile_reading_stop(ipf);
1995
1996   if (ipf == flushing_input_file) {
1997     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1998     if (main_input_file) inputfile_reading_start(main_input_file);
1999     statemc_check_flushing_done();
2000   } else if (ipf == backlog_input_file) {
2001     statemc_check_backlog_done();
2002   } else {
2003     abort(); /* supposed to wait rather than get EOF on main input file */
2004   }
2005 }
2006
2007 static InputFile *open_input_file(const char *path) {
2008   int fd= open(path, O_RDWR);
2009   if (fd<0) {
2010     if (errno==ENOENT) return 0;
2011     sysfatal("unable to open input file %s", path);
2012   }
2013   assert(fd>0);
2014
2015   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
2016   memset(ipf,0,sizeof(*ipf));
2017
2018   ipf->fd= fd;
2019   ipf->autodefer= -1;
2020   LIST_INIT(ipf->queue);
2021   strcpy(ipf->path, path);
2022
2023   return ipf;
2024 }
2025
2026 static void close_input_file(InputFile *ipf) { /* does not free */
2027   assert(!ipf->readable_callback); /* must have had ->on_cancel */
2028   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2029   assert(!ipf->rd); /* must have had inputfile_reading_stop */
2030   assert(!ipf->inprogress); /* no dangling pointers pointing here */
2031   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2032 }
2033
2034
2035 /*---------- dealing with articles read in the input file ----------*/
2036
2037 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2038                                    const char *data, const char *how) {
2039   warn("corrupted file: %s, offset %lu: %s: in %s",
2040        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2041   ipf->readcount_err++;
2042   if (ipf->readcount_err > max_bad_data_initial +
2043       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2044     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
2045         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2046   return OOP_CONTINUE;
2047 }
2048
2049 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2050                                oop_rd_event ev, const char *errmsg,
2051                                int errnoval, const char *data, size_t recsz,
2052                                void *ipf_v) {
2053   InputFile *ipf= ipf_v;
2054   assert(ev == OOP_RD_SYSTEM);
2055   errno= errnoval;
2056   sysdie("error reading input file: %s, offset %lu",
2057          ipf->path, (unsigned long)ipf->offset);
2058 }
2059
2060 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2061                                   oop_rd_event ev, const char *errmsg,
2062                                   int errnoval, const char *data, size_t recsz,
2063                                   void *ipf_v) {
2064   InputFile *ipf= ipf_v;
2065   Article *art;
2066   char tokentextbuf[sizeof(TOKEN)*2+3];
2067
2068   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
2069
2070   off_t old_offset= ipf->offset;
2071   ipf->offset += recsz + !!(ev == OOP_RD_OK);
2072
2073 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2074
2075   if (ev==OOP_RD_PARTREC)
2076     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2077     /* but process it anyway */
2078
2079   if (ipf->skippinglong) {
2080     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2081     return OOP_CONTINUE;
2082   }
2083   if (ev==OOP_RD_LONG) {
2084     ipf->skippinglong= 1;
2085     X_BAD_DATA("overly long line");
2086   }
2087
2088   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2089   if (!recsz) X_BAD_DATA("empty line");
2090
2091   if (data[0]==' ') {
2092     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2093     ipf->readcount_blank++;
2094     return OOP_CONTINUE;
2095   }
2096
2097   char *space= strchr(data,' ');
2098   int tokenlen= space-data;
2099   int midlen= (int)recsz-tokenlen-1;
2100   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2101   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2102
2103   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2104   memcpy(tokentextbuf, data, tokenlen);
2105   tokentextbuf[tokenlen]= 0;
2106   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2107
2108   ipf->readcount_ok++;
2109
2110   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2111   memset(art,0,sizeof(*art));
2112   art->state= art_Unchecked;
2113   art->midlen= midlen;
2114   art->ipf= ipf;  ipf->inprogress++;
2115   art->token= TextToToken(tokentextbuf);
2116   art->offset= old_offset;
2117   art->blanklen= recsz;
2118   strcpy(art->messageid, space+1);
2119   LIST_ADDTAIL(ipf->queue, art);
2120
2121   if (ipf->autodefer >= 0)
2122     article_autodefer(ipf, art);
2123   else if (ipf==backlog_input_file)
2124     article_check_expired(art);
2125
2126   if (sms==sm_NORMAL && ipf==main_input_file &&
2127       ipf->offset >= target_max_feedfile_size)
2128     statemc_start_flush("feed file size");
2129
2130   check_assign_articles(); /* may destroy conn but that's OK */
2131   check_reading_pause_resume(ipf);
2132   return OOP_CONTINUE;
2133 }
2134
2135 /*========== tailing input file ==========*/
2136
2137 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2138                                      void *user) {
2139   InputFile *ipf= user;
2140   return ipf->readable_callback(loop, &ipf->readable,
2141                                 ipf->readable_callback_user);
2142 }
2143
2144 static void tailing_on_cancel(struct oop_readable *rable) {
2145   InputFile *ipf= (void*)rable;
2146
2147   if (ipf->filemon) filemon_stop(ipf);
2148   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2149   ipf->readable_callback= 0;
2150 }
2151
2152 static void tailing_queue_readable(InputFile *ipf) {
2153   /* lifetime of ipf here is OK because destruction will cause
2154    * on_cancel which will cancel this callback */
2155   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2156 }
2157
2158 static int tailing_on_readable(struct oop_readable *rable,
2159                                 oop_readable_call *cb, void *user) {
2160   InputFile *ipf= (void*)rable;
2161
2162   tailing_on_cancel(rable);
2163   ipf->readable_callback= cb;
2164   ipf->readable_callback_user= user;
2165   filemon_start(ipf);
2166
2167   tailing_queue_readable(ipf);
2168   return 0;
2169 }
2170
2171 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2172                                 size_t length) {
2173   InputFile *ipf= (void*)rable;
2174   for (;;) {
2175     ssize_t r= read(ipf->fd, buffer, length);
2176     if (r==-1) {
2177       if (errno==EINTR) continue;
2178       return r;
2179     }
2180     if (!r) {
2181       if (ipf==main_input_file) {
2182         errno=EAGAIN;
2183         return -1;
2184       } else if (ipf==flushing_input_file) {
2185         assert(ipf->rd);
2186         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2187       } else if (ipf==backlog_input_file) {
2188         assert(ipf->rd);
2189       } else {
2190         abort();
2191       }
2192     }
2193     tailing_queue_readable(ipf);
2194     return r;
2195   }
2196 }
2197
2198 /*---------- filemon implemented with inotify ----------*/
2199
2200 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2201 #define HAVE_FILEMON
2202
2203 #include <sys/inotify.h>
2204
2205 static int filemon_inotify_fd;
2206 static int filemon_inotify_wdmax;
2207 static InputFile **filemon_inotify_wd2ipf;
2208
2209 struct Filemon_Perfile {
2210   int wd;
2211 };
2212
2213 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2214   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2215   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2216
2217   if (wd >= filemon_inotify_wdmax) {
2218     int newmax= wd+2;
2219     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2220                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2221     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2222            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2223     filemon_inotify_wdmax= newmax;
2224   }
2225
2226   assert(!filemon_inotify_wd2ipf[wd]);
2227   filemon_inotify_wd2ipf[wd]= ipf;
2228
2229   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2230         ipf, wd, filemon_inotify_wdmax);
2231
2232   pf->wd= wd;
2233 }
2234
2235 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2236   int wd= pf->wd;
2237   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2238   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2239   if (r) sysdie("inotify_rm_watch");
2240   filemon_inotify_wd2ipf[wd]= 0;
2241 }
2242
2243 static void *filemon_inotify_readable(oop_source *lp, int fd,
2244                                       oop_event e, void *u) {
2245   struct inotify_event iev;
2246   for (;;) {
2247     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2248     if (r==-1) {
2249       if (isewouldblock(errno)) break;
2250       sysdie("read from inotify master");
2251     } else if (r==sizeof(iev)) {
2252       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2253     } else {
2254       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2255     }
2256     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2257     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2258     filemon_callback(ipf);
2259   }
2260   return OOP_CONTINUE;
2261 }
2262
2263 static int filemon_method_init(void) {
2264   filemon_inotify_fd= inotify_init();
2265   if (filemon_inotify_fd<0) {
2266     syswarn("filemon/inotify: inotify_init failed");
2267     return 0;
2268   }
2269   xsetnonblock(filemon_inotify_fd, 1);
2270   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2271
2272   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2273   return 1;
2274 }
2275
2276 static void filemon_method_dump_info(FILE *f) {
2277   int i;
2278   fprintf(f,"inotify");
2279   DUMPV("%d",,filemon_inotify_fd);
2280   DUMPV("%d",,filemon_inotify_wdmax);
2281   for (i=0; i<filemon_inotify_wdmax; i++)
2282     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i]);
2283 }
2284
2285 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2286
2287 /*---------- filemon dummy implementation ----------*/
2288
2289 #if !defined(HAVE_FILEMON)
2290
2291 struct Filemon_Perfile { int dummy; };
2292
2293 static int filemon_method_init(void) {
2294   warn("filemon/dummy: no filemon method compiled in");
2295   return 0;
2296 }
2297 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2298 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2299 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2300
2301 #endif /* !HAVE_FILEMON */
2302
2303 /*---------- filemon generic interface ----------*/
2304
2305 static void filemon_start(InputFile *ipf) {
2306   assert(!ipf->filemon);
2307
2308   NEW(ipf->filemon);
2309   filemon_method_startfile(ipf, ipf->filemon);
2310 }
2311
2312 static void filemon_stop(InputFile *ipf) {
2313   if (!ipf->filemon) return;
2314   filemon_method_stopfile(ipf, ipf->filemon);
2315   free(ipf->filemon);
2316   ipf->filemon= 0;
2317 }
2318
2319 static void filemon_callback(InputFile *ipf) {
2320   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2321     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2322 }
2323
2324 /*---------- interface to start and stop an input file ----------*/
2325
2326 static const oop_rd_style feedfile_rdstyle= {
2327   OOP_RD_DELIM_STRIP, '\n',
2328   OOP_RD_NUL_PERMIT,
2329   OOP_RD_SHORTREC_LONG,
2330 };
2331
2332 static void inputfile_reading_resume(InputFile *ipf) {
2333   if (!ipf->rd) return;
2334   if (!ipf->paused) return;
2335
2336   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2337                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2338   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2339
2340   ipf->paused= 0;
2341 }
2342
2343 static void inputfile_reading_pause(InputFile *ipf) {
2344   if (!ipf->rd) return;
2345   if (ipf->paused) return;
2346   oop_rd_cancel(ipf->rd);
2347   ipf->paused= 1;
2348 }
2349
2350 static void inputfile_reading_start(InputFile *ipf) {
2351   assert(!ipf->rd);
2352   ipf->readable.on_readable= tailing_on_readable;
2353   ipf->readable.on_cancel=   tailing_on_cancel;
2354   ipf->readable.try_read=    tailing_try_read;
2355   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2356   ipf->readable.delete_kill= 0;
2357
2358   ipf->readable_callback= 0;
2359   ipf->readable_callback_user= 0;
2360
2361   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2362   assert(ipf->rd);
2363
2364   ipf->paused= 1;
2365   inputfile_reading_resume(ipf);
2366 }
2367
2368 static void inputfile_reading_stop(InputFile *ipf) {
2369   assert(ipf->rd);
2370   inputfile_reading_pause(ipf);
2371   oop_rd_delete(ipf->rd);
2372   ipf->rd= 0;
2373   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2374 }
2375
2376
2377 /*========== interaction with innd - state machine ==========*/
2378
2379 /* See official state diagram at top of file.  We implement
2380  * this as follows:
2381  * -8<-
2382
2383             .=======.
2384             ||START||
2385             `======='
2386                 |
2387                 | open F
2388                 |
2389                 |    F ENOENT
2390                 |`---------------------------------------------------.
2391       F OPEN OK |                                                    |
2392                 |`---------------- - - -                             |
2393        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2394                 |                  for full startup logic            |
2395      ,--------->|                                                    |
2396      |          V                                                    |
2397      |     ============                                       try to |
2398      |      NORMAL                                            open D |
2399      |     [Normal]                                                  |
2400      |      main F tail                                              |
2401      |     ============                                              V
2402      |          |                                                    |
2403      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2404      ^          | hardlink F to D                                    |
2405      |     [Hardlinked]                                              |
2406      |          | unlink F                                           |
2407      |          | our handle onto F is now onto D                    |
2408      |     [Moved]                                                   |
2409      |          |                                                    |
2410      |          |<-------------------<---------------------<---------+
2411      |          |                                                    |
2412      |          | spawn inndcomm flush                               |
2413      |          V                                                    |
2414      |     ==================                                        |
2415      |      FLUSHING[-ABSENT]                                        |
2416      |     [Flushing]                                                |
2417      |     main D tail/none                                          |
2418      |     ==================                                        |
2419      |          |                                                    |
2420      |          |   INNDCOMM FLUSH FAILS                             ^
2421      |          |`----------------------->----------.                |
2422      |          |                                   |                |
2423      |          |   NO SUCH SITE                    V                |
2424      ^          |`--------------->----.         ==================== |
2425      |          |                      \        FLUSHFAILED[-ABSENT] |
2426      |          |                       \         [Moved]            |
2427      |          | FLUSH OK               \       main D tail/none    |
2428      |          | open F                  \     ==================== |
2429      |          |                          \        |                |
2430      |          |                           \       | TIME TO RETRY  |
2431      |          |`------->----.     ,---<---'\      `----------------'
2432      |          |    D NONE   |     | D NONE  `----.
2433      |          V             |     |              V
2434      |     =============      V     V             ============
2435      |      SEPARATED-1       |     |              DROPPING-1
2436      |      flsh->rd!=0       |     |              flsh->rd!=0
2437      |     [Separated]        |     |             [Dropping]
2438      |      main F idle       |     |              main none
2439      |      flsh D tail       |     |              flsh D tail
2440      |     =============      |     |             ============
2441      |          |             |     | install       |
2442      ^          | EOF ON D    |     |  defer        | EOF ON D
2443      |          V             |     |               V
2444      |     ===============    |     |             ===============
2445      |      SEPARATED-2       |     |              DROPPING-2
2446      |      flsh->rd==0       |     V              flsh->rd==0
2447      |     [Finishing]        |     |             [Dropping]
2448      |      main F tail       |     `.             main none
2449      |      flsh D closed     |       `.           flsh D closed
2450      |     ===============    V         `.        ===============
2451      |          |                         `.          |
2452      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2453      |          V install defer as backlog    `.      | install defer
2454      ^          | close D                       `.    | close D
2455      |          | unlink D                        `.  | unlink D
2456      |          |                                  |  |
2457      |          |                                  V  V
2458      `----------'                               ==============
2459                                                  DROPPED
2460                                                 [Dropped]
2461                                                  main none
2462                                                  flsh none
2463                                                  some backlog
2464                                                 ==============
2465                                                       |
2466                                                       | ALL BACKLOG DONE
2467                                                       |
2468                                                       | unlink lock
2469                                                       | exit
2470                                                       V
2471                                                   ==========
2472                                                    (ESRCH)
2473                                                   [Droppped]
2474                                                   ==========
2475  * ->8-
2476  */
2477
2478 static void startup_set_input_file(InputFile *f) {
2479   assert(!main_input_file);
2480   main_input_file= f;
2481   inputfile_reading_start(f);
2482 }
2483
2484 static void statemc_lock(void) {
2485   int lockfd;
2486   struct stat stab, stabf;
2487   
2488   for (;;) {
2489     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2490     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2491
2492     struct flock fl;
2493     memset(&fl,0,sizeof(fl));
2494     fl.l_type= F_WRLCK;
2495     fl.l_whence= SEEK_SET;
2496     int r= fcntl(lockfd, F_SETLK, &fl);
2497     if (r==-1) {
2498       if (errno==EACCES || isewouldblock(errno)) {
2499         if (quiet_multiple) exit(0);
2500         fatal("another duct holds the lockfile");
2501       }
2502       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2503     }
2504
2505     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2506     int lock_noent;
2507     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2508
2509     if (!lock_noent && samefile(&stab, &stabf))
2510       break;
2511
2512     xclose(lockfd, "stale lockfile ", path_lock);
2513   }
2514
2515   FILE *lockfile= fdopen(lockfd, "w");
2516   if (!lockfile) sysdie("fdopen lockfile");
2517
2518   int r= ftruncate(lockfd, 0);
2519   if (r) sysdie("truncate lockfile to write new info");
2520
2521   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2522               (unsigned long)self_pid,
2523               sitename, feedfile, remote_host) == EOF ||
2524       fflush(lockfile))
2525     sysfatal("write info to lockfile %s", path_lock);
2526
2527   debug("startup: locked");
2528 }
2529
2530 static void statemc_init(void) {
2531   struct stat stabdefer;
2532
2533   search_backlog_file();
2534
2535   int defer_noent;
2536   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2537   if (defer_noent) {
2538     debug("startup: ductdefer ENOENT");
2539   } else {
2540     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2541     switch (stabdefer.st_nlink==1) {
2542     case 1:
2543       open_defer(); /* so that we will later close it and rename it */
2544       break;
2545     case 2:
2546       xunlink(path_defer, "stale defer file link"
2547               " (presumably hardlink to backlog file)");
2548       break;
2549     default:
2550       die("defer file %s has unexpected link count %d",
2551           path_defer, stabdefer.st_nlink);
2552     }
2553   }
2554
2555   struct stat stab_f, stab_d;
2556   int noent_f;
2557
2558   InputFile *file_d= open_input_file(path_flushing);
2559   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2560
2561   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2562
2563   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2564     debug("startup: F==D => Hardlinked");
2565     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2566     noent_f= 1;
2567   }
2568
2569   if (noent_f) {
2570     debug("startup: F ENOENT => Moved");
2571     if (file_d) startup_set_input_file(file_d);
2572     spawn_inndcomm_flush("feedfile missing at startup");
2573     /* => Flushing, sms:=FLUSHING */
2574   } else {
2575     if (file_d) {
2576       debug("startup: F!=D => Separated");
2577       startup_set_input_file(file_d);
2578       flushing_input_file= main_input_file;
2579       main_input_file= open_input_file(feedfile);
2580       if (!main_input_file) die("feedfile vanished during startup");
2581       SMS(SEPARATED, max_separated_periods,
2582           "found both old and current feed files");
2583     } else {
2584       debug("startup: F exists, D ENOENT => Normal");
2585       InputFile *file_f= open_input_file(feedfile);
2586       if (!file_f) die("feed file vanished during startup");
2587       startup_set_input_file(file_f);
2588       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2589     }
2590   }
2591 }
2592
2593 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2594   assert(sms == sm_NORMAL);
2595
2596   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2597         why,
2598         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2599         (unsigned long)target_max_feedfile_size,
2600         until_flush);
2601
2602   int r= link(feedfile, path_flushing);
2603   if (r) sysfatal("link feedfile %s to flushing file %s",
2604                   feedfile, path_flushing);
2605   /* => Hardlinked */
2606
2607   xunlink(feedfile, "old feedfile link");
2608   /* => Moved */
2609
2610   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2611 }
2612
2613 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2614   switch (sms) {
2615
2616   case sm_NORMAL:
2617     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2618     return 1;
2619
2620   case sm_FLUSHFAILED:
2621     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2622     return 1;
2623
2624   case sm_SEPARATED:
2625   case sm_DROPPING:
2626     warn("took too long to complete old feedfile after flush, autodeferring");
2627     assert(flushing_input_file);
2628     autodefer_input_file(flushing_input_file);
2629     return 1;
2630
2631   default:
2632     return 0;
2633   }
2634 }
2635
2636 static void statemc_period_poll(void) {
2637   if (!until_flush) return;
2638   until_flush--;
2639   assert(until_flush>=0);
2640
2641   if (until_flush) return;
2642   int ok= trigger_flush_ok();
2643   assert(ok);
2644 }
2645
2646 static int inputfile_is_done(InputFile *ipf) {
2647   if (!ipf) return 0;
2648   if (ipf->inprogress) return 0; /* new article in the meantime */
2649   if (ipf->rd) return 0; /* not had EOF */
2650   return 1;
2651 }
2652
2653 static void notice_processed(InputFile *ipf, int completed,
2654                              const char *what, const char *spec) {
2655   if (!ipf) return; /* allows preterminate to be lazy */
2656
2657 #define RCI_NOTHING(x) /* nothing */
2658 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2659 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2660
2661 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2662
2663   char *inprog= completed
2664     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2665     : xasprintf(" inprogress=%ld", ipf->inprogress);
2666   char *autodefer= ipf->autodefer >= 0
2667     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2668     : xasprintf("%s","");
2669
2670   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2671        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2672        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2673        ,
2674        completed?"completed":"processed", what, spec,
2675        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2676        inprog, autodefer, ipf->count_nooffer_missing,
2677        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2678        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2679        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2680        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2681        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2682        );
2683
2684   free(inprog);
2685   free(autodefer);
2686
2687 #undef CNT
2688 }
2689
2690 static void statemc_check_backlog_done(void) {
2691   InputFile *ipf= backlog_input_file;
2692   if (!inputfile_is_done(ipf)) return;
2693
2694   const char *slash= strrchr(ipf->path, '/');
2695   const char *leaf= slash ? slash+1 : ipf->path;
2696   const char *under= strchr(slash, '_');
2697   const char *rest= under ? under+1 : leaf;
2698   if (!strncmp(rest,"backlog",7)) rest += 7;
2699   notice_processed(ipf,1,"backlog ",rest);
2700
2701   close_input_file(ipf);
2702   if (unlink(ipf->path)) {
2703     if (errno != ENOENT)
2704       sysdie("could not unlink processed backlog file %s", ipf->path);
2705     warn("backlog file %s vanished while we were reading it"
2706          " so we couldn't remove it (but it's done now, anyway)",
2707          ipf->path);
2708   }
2709   free(ipf);
2710   backlog_input_file= 0;
2711   search_backlog_file();
2712   return;
2713 }
2714
2715 static void statemc_check_flushing_done(void) {
2716   InputFile *ipf= flushing_input_file;
2717   if (!inputfile_is_done(ipf)) return;
2718
2719   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2720
2721   notice_processed(ipf,1,"feedfile","");
2722
2723   close_defer();
2724
2725   xunlink(path_flushing, "old flushing file");
2726
2727   close_input_file(flushing_input_file);
2728   free(flushing_input_file);
2729   flushing_input_file= 0;
2730
2731   if (sms==sm_SEPARATED) {
2732     notice("flush complete");
2733     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2734   } else if (sms==sm_DROPPING) {
2735     SMS(DROPPED, max_separated_periods, "old flush complete");
2736     search_backlog_file();
2737     notice("feed dropped, but will continue until backlog is finished");
2738   }
2739 }
2740
2741 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2742                                       void *u) {
2743   assert(!inputfile_is_done(main_input_file));
2744   statemc_check_flushing_done();
2745   statemc_check_backlog_done();
2746   return OOP_CONTINUE;
2747 }
2748
2749 static void queue_check_input_done(void) {
2750   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2751 }
2752
2753 static void statemc_setstate(StateMachineState newsms, int periods,
2754                              const char *forlog, const char *why) {
2755   sms= newsms;
2756   until_flush= periods;
2757
2758   const char *xtra= "";
2759   switch (sms) {
2760   case sm_FLUSHING:
2761   case sm_FLUSHFAILED:
2762     if (!main_input_file) xtra= "-ABSENT";
2763     break;
2764   case sm_SEPARATED:
2765   case sm_DROPPING:
2766     xtra= flushing_input_file->rd ? "-1" : "-2";
2767     break;
2768   default:;
2769   }
2770
2771   if (periods) {
2772     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2773   } else {
2774     info("state %s%s %s",forlog,xtra,why);
2775   }
2776 }
2777
2778 /*---------- defer and backlog files ----------*/
2779
2780 static void open_defer(void) {
2781   struct stat stab;
2782
2783   if (defer) return;
2784
2785   defer= fopen(path_defer, "a+");
2786   if (!defer) sysfatal("could not open defer file %s", path_defer);
2787
2788   /* truncate away any half-written records */
2789
2790   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2791
2792   if (stab.st_size > LONG_MAX)
2793     die("defer file %s size is far too large", path_defer);
2794
2795   if (!stab.st_size)
2796     return;
2797
2798   long orgsize= stab.st_size;
2799   long truncto= stab.st_size;
2800   for (;;) {
2801     if (!truncto) break; /* was only (if anything) one half-truncated record */
2802     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2803       sysdie("seek in defer file %s while truncating partial", path_defer);
2804
2805     int r= getc(defer);
2806     if (r==EOF) {
2807       if (ferror(defer))
2808         sysdie("failed read from defer file %s", path_defer);
2809       else
2810         die("defer file %s shrank while we were checking it!", path_defer);
2811     }
2812     if (r=='\n') break;
2813     truncto--;
2814   }
2815
2816   if (stab.st_size != truncto) {
2817     warn("truncating half-record at end of defer file %s -"
2818          " shrinking by %ld bytes from %ld to %ld",
2819          path_defer, orgsize - truncto, orgsize, truncto);
2820
2821     if (fflush(defer))
2822       sysfatal("could not flush defer file %s", path_defer);
2823     if (ftruncate(fileno(defer), truncto))
2824       sysdie("could not truncate defer file %s", path_defer);
2825
2826   } else {
2827     info("continuing existing defer file %s (%ld bytes)",
2828          path_defer, orgsize);
2829   }
2830   if (fseek(defer, truncto, SEEK_SET))
2831     sysdie("could not seek to new end of defer file %s", path_defer);
2832 }
2833
2834 static void close_defer(void) {
2835   if (!defer)
2836     return;
2837
2838   struct stat stab;
2839   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2840
2841   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2842   defer= 0;
2843
2844   time_t now= xtime();
2845
2846   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2847                            (unsigned long)now,
2848                            (unsigned long)stab.st_ino);
2849   if (link(path_defer, backlog))
2850     sysfatal("could not install defer file %s as backlog file %s",
2851            path_defer, backlog);
2852   if (unlink(path_defer))
2853     sysdie("could not unlink old defer link %s to backlog file %s",
2854            path_defer, backlog);
2855
2856   free(backlog);
2857
2858   if (until_backlog_nextscan < 0 ||
2859       until_backlog_nextscan > backlog_retry_minperiods + 1)
2860     until_backlog_nextscan= backlog_retry_minperiods + 1;
2861 }
2862
2863 static void poll_backlog_file(void) {
2864   if (until_backlog_nextscan < 0) return;
2865   if (until_backlog_nextscan-- > 0) return;
2866   search_backlog_file();
2867 }
2868
2869 static void search_backlog_file(void) {
2870   /* returns non-0 iff there are any backlog files */
2871
2872   glob_t gl;
2873   int r, i;
2874   struct stat stab;
2875   const char *oldest_path=0;
2876   time_t oldest_mtime=0, now;
2877
2878   if (backlog_input_file) return;
2879
2880  try_again:
2881
2882   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2883
2884   switch (r) {
2885   case GLOB_ABORTED:
2886     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2887   case GLOB_NOSPACE:
2888     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2889   case 0:
2890     for (i=0; i<gl.gl_pathc; i++) {
2891       const char *path= gl.gl_pathv[i];
2892
2893       if (strchr(path,'#') || strchr(path,'~')) {
2894         debug("backlog file search skipping %s", path);
2895         continue;
2896       }
2897       r= stat(path, &stab);
2898       if (r) {
2899         syswarn("failed to stat backlog file %s", path);
2900         continue;
2901       }
2902       if (!S_ISREG(stab.st_mode)) {
2903         warn("backlog file %s is not a plain file (or link to one)", path);
2904         continue;
2905       }
2906       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2907         oldest_path= path;
2908         oldest_mtime= stab.st_mtime;
2909       }
2910     }
2911   case GLOB_NOMATCH: /* fall through */
2912     break;
2913   default:
2914     sysdie("glob expansion of backlog pattern %s gave unexpected"
2915            " nonzero (error?) return value %d", globpat_backlog, r);
2916   }
2917
2918   if (!oldest_path) {
2919     debug("backlog scan: none");
2920
2921     if (sms==sm_DROPPED) {
2922       preterminate();
2923       notice("feed dropped and our work is complete");
2924
2925       int r= unlink(path_control);
2926       if (r && errno!=ENOENT)
2927         syswarn("failed to remove control symlink for old feed");
2928
2929       xunlink(path_lock,    "lockfile for old feed");
2930       exit(4);
2931     }
2932     until_backlog_nextscan= backlog_spontrescan_periods;
2933     goto xfree;
2934   }
2935
2936   now= xtime();
2937   double age= difftime(now, oldest_mtime);
2938   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2939
2940   if (age_deficiency <= 0) {
2941     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2942           age, age_deficiency, oldest_path);
2943
2944     backlog_input_file= open_input_file(oldest_path);
2945     if (!backlog_input_file) {
2946       warn("backlog file %s vanished as we opened it", oldest_path);
2947       globfree(&gl);
2948       goto try_again;
2949     }
2950     inputfile_reading_start(backlog_input_file);
2951     until_backlog_nextscan= -1;
2952     goto xfree;
2953   }
2954
2955   until_backlog_nextscan= age_deficiency / period_seconds;
2956
2957   if (backlog_spontrescan_periods >= 0 &&
2958       until_backlog_nextscan > backlog_spontrescan_periods)
2959     until_backlog_nextscan= backlog_spontrescan_periods;
2960
2961   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2962         age, age_deficiency, until_backlog_nextscan, oldest_path);
2963
2964  xfree:
2965   globfree(&gl);
2966   return;
2967 }
2968
2969 /*---------- shutdown and signal handling ----------*/
2970
2971 static void preterminate(void) {
2972   if (in_child) return;
2973   notice_processed(main_input_file,0,"feedfile","");
2974   notice_processed(flushing_input_file,0,"flushing","");
2975   if (backlog_input_file)
2976     notice_processed(backlog_input_file,0, "backlog file ",
2977                      backlog_input_file->path);
2978 }
2979
2980 static int signal_self_pipe[2];
2981 static sig_atomic_t terminate_sig_flag;
2982
2983 static void raise_default(int signo) {
2984   xsigsetdefault(signo);
2985   raise(signo);
2986   abort();
2987 }
2988
2989 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2990   assert(fd=signal_self_pipe[0]);
2991   char buf[PIPE_BUF];
2992   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2993   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2994   if (r==0) die("eof on signal self pipe");
2995   if (terminate_sig_flag) {
2996     preterminate();
2997     notice("terminating (%s)", strsignal(terminate_sig_flag));
2998     raise_default(terminate_sig_flag);
2999   }
3000   return OOP_CONTINUE;
3001 }
3002
3003 static void sigarrived_handler(int signum) {
3004   static char x;
3005   switch (signum) {
3006   case SIGTERM:
3007   case SIGINT:
3008     if (!terminate_sig_flag) terminate_sig_flag= signum;
3009     break;
3010   default:
3011     abort();
3012   }
3013   write(signal_self_pipe[1],&x,1);
3014 }
3015
3016 static void init_signals(void) {
3017   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
3018     sysdie("could not ignore SIGPIPE");
3019
3020   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
3021
3022   xsetnonblock(signal_self_pipe[0],1);
3023   xsetnonblock(signal_self_pipe[1],1);
3024
3025   struct sigaction sa;
3026   memset(&sa,0,sizeof(sa));
3027   sa.sa_handler= sigarrived_handler;
3028   sa.sa_flags= SA_RESTART;
3029   xsigaction(SIGTERM,&sa);
3030   xsigaction(SIGINT,&sa);
3031
3032   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
3033 }
3034
3035 /*========== flushing the feed ==========*/
3036
3037 static pid_t inndcomm_child;
3038 static int inndcomm_sentinel_fd;
3039
3040 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
3041   assert(inndcomm_child);
3042   assert(fd == inndcomm_sentinel_fd);
3043   int status= xwaitpid(&inndcomm_child, "inndcomm");
3044   inndcomm_child= 0;
3045   
3046   cancel_fd_read_except(fd);
3047   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3048   inndcomm_sentinel_fd= 0;
3049
3050   assert(!flushing_input_file);
3051
3052   if (WIFEXITED(status)) {
3053     switch (WEXITSTATUS(status)) {
3054
3055     case INNDCOMMCHILD_ESTATUS_FAIL:
3056       goto failed;
3057
3058     case INNDCOMMCHILD_ESTATUS_NONESUCH:
3059       notice("feed has been dropped by innd, finishing up");
3060       flushing_input_file= main_input_file;
3061       tailing_queue_readable(flushing_input_file);
3062         /* we probably previously returned EAGAIN from our fake read method
3063          * when in fact we were at EOF, so signal another readable event
3064          * so we actually see the EOF */
3065
3066       main_input_file= 0;
3067
3068       if (flushing_input_file) {
3069         SMS(DROPPING, max_separated_periods,
3070             "feed dropped by innd, but must finish last flush");
3071       } else {
3072         close_defer();
3073         SMS(DROPPED, 0, "feed dropped by innd");
3074         search_backlog_file();
3075       }
3076       return OOP_CONTINUE;
3077
3078     case 0:
3079       /* as above */
3080       flushing_input_file= main_input_file;
3081       tailing_queue_readable(flushing_input_file);
3082
3083       main_input_file= open_input_file(feedfile);
3084       if (!main_input_file)
3085         die("flush succeeded but feedfile %s does not exist!", feedfile);
3086
3087       if (flushing_input_file) {
3088         SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3089       } else {
3090         close_defer();
3091         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3092       }
3093       return OOP_CONTINUE;
3094
3095     default:
3096       goto unexpected_exitstatus;
3097
3098     }
3099   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3100     warn("flush timed out trying to talk to innd");
3101     goto failed;
3102   } else {
3103   unexpected_exitstatus:
3104     report_child_status("inndcomm child", status);
3105   }
3106
3107  failed:
3108   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3109   return OOP_CONTINUE;
3110 }
3111
3112 static void inndcommfail(const char *what) {
3113   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3114   exit(INNDCOMMCHILD_ESTATUS_FAIL);
3115 }
3116
3117 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3118   int pipefds[2];
3119
3120   notice("flushing %s",why);
3121
3122   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3123   assert(!inndcomm_child);
3124   assert(!inndcomm_sentinel_fd);
3125
3126   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3127
3128   inndcomm_child= xfork("inndcomm child");
3129
3130   if (!inndcomm_child) {
3131     const char *flushargv[2]= { sitename, 0 };
3132     char *reply;
3133     int r;
3134
3135     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3136     /* parent spots the autoclose of pipefds[1] when we die or exit */
3137
3138     if (simulate_flush>=0) {
3139       warn("SIMULATING flush child status %d", simulate_flush);
3140       if (simulate_flush>128) raise(simulate_flush-128);
3141       else exit(simulate_flush);
3142     }
3143
3144     alarm(inndcomm_flush_timeout);
3145     r= ICCopen();                         if (r)   inndcommfail("connect");
3146     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3147     if (!r) exit(0); /* yay! */
3148
3149     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3150     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3151     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3152   }
3153
3154   simulate_flush= -1;
3155
3156   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3157   inndcomm_sentinel_fd= pipefds[0];
3158   assert(inndcomm_sentinel_fd);
3159   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3160
3161   SMS(FLUSHING, 0, why);
3162 }
3163
3164 /*========== main program ==========*/
3165
3166 static void postfork_inputfile(InputFile *ipf) {
3167   if (!ipf) return;
3168   xclose(ipf->fd, "(in child) input file ", ipf->path);
3169 }
3170
3171 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3172   /* we have no stdio streams that are buffered long-term */
3173   if (!f) return;
3174   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3175 }
3176
3177 static void postfork(void) {
3178   in_child= 1;
3179
3180   xsigsetdefault(SIGTERM);
3181   xsigsetdefault(SIGINT);
3182   xsigsetdefault(SIGPIPE);
3183   if (terminate_sig_flag) raise(terminate_sig_flag);
3184
3185   postfork_inputfile(main_input_file);
3186   postfork_inputfile(flushing_input_file);
3187
3188   Conn *conn;
3189   FOR_CONN(conn)
3190     conn_closefd(conn,"(in child) ");
3191
3192   postfork_stdio(defer, "defer file ", path_defer);
3193 }
3194
3195 typedef struct Every Every;
3196 struct Every {
3197   struct timeval interval;
3198   int fixed_rate;
3199   void (*f)(void);
3200 };
3201
3202 static void every_schedule(Every *e, struct timeval base);
3203
3204 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3205   Every *e= e_v;
3206   e->f();
3207   if (!e->fixed_rate) xgettimeofday(&base);
3208   every_schedule(e, base);
3209   return OOP_CONTINUE;
3210 }
3211
3212 static void every_schedule(Every *e, struct timeval base) {
3213   struct timeval when;
3214   timeradd(&base, &e->interval, &when);
3215   loop->on_time(loop, when, every_happens, e);
3216 }
3217
3218 static void every(int interval, int fixed_rate, void (*f)(void)) {
3219   NEW_DECL(Every *,e);
3220   e->interval.tv_sec= interval;
3221   e->interval.tv_usec= 0;
3222   e->fixed_rate= fixed_rate;
3223   e->f= f;
3224   struct timeval now;
3225   xgettimeofday(&now);
3226   every_schedule(e, now);
3227 }
3228
3229 static void filepoll(void) {
3230   filemon_callback(main_input_file);
3231   filemon_callback(flushing_input_file);
3232 }
3233
3234 static char *debug_report_ipf(InputFile *ipf) {
3235   if (!ipf) return xasprintf("none");
3236
3237   const char *slash= strrchr(ipf->path,'/');
3238   const char *path= slash ? slash+1 : ipf->path;
3239
3240   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
3241                    ipf, path,
3242                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
3243                    (long)ipf->offset, ipf->fd,
3244                    ipf->rd ? "" : ",!rd",
3245                    ipf->skippinglong ? "*skiplong" : "",
3246                    ipf->rd && ipf->paused ? "*paused" : "");
3247 }
3248
3249 static void period(void) {
3250   char *dipf_main=     debug_report_ipf(main_input_file);
3251   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3252   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3253
3254   debug("PERIOD"
3255         " sms=%s[%d] conns=%d until_connect=%d"
3256         " input_files main:%s flushing:%s backlog:%s[%d]"
3257         " children connecting=%ld inndcomm=%ld"
3258         ,
3259         sms_names[sms], until_flush, conns.count, until_connect,
3260         dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
3261         (long)connecting_child, (long)inndcomm_child
3262         );
3263
3264   free(dipf_main);
3265   free(dipf_flushing);
3266   free(dipf_backlog);
3267
3268   if (until_connect) until_connect--;
3269
3270   inputfile_queue_check_expired(backlog_input_file);
3271   poll_backlog_file();
3272   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3273   statemc_period_poll();
3274   check_assign_articles();
3275   check_idle_conns();
3276 }
3277
3278
3279 /*========== dumping state ==========*/
3280
3281 static void dump_article_list(FILE *f, const ControlCommand *c,
3282                               const ArticleList *al) {
3283   fprintf(f, " count=%d\n", al->count);
3284   if (!c->xval) return;
3285   
3286   int i; Article *art;
3287   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3288     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3289     DUMPV("%p", art->,ipf);
3290     DUMPV("%d", art->,missing);
3291     DUMPV("%lu", (unsigned long)art->,offset);
3292     DUMPV("%d", art->,blanklen);
3293     DUMPV("%d", art->,midlen);
3294     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3295   }
3296 }
3297   
3298 static void dump_input_file(FILE *f, const ControlCommand *c,
3299                             InputFile *ipf, const char *wh) {
3300   char *dipf= debug_report_ipf(ipf);
3301   fprintf(f,"input %s %s", wh, dipf);
3302   free(dipf);
3303   
3304   if (ipf) {
3305     DUMPV("%d", ipf->,readcount_ok);
3306     DUMPV("%d", ipf->,readcount_blank);
3307     DUMPV("%d", ipf->,readcount_err);
3308   }
3309   fprintf(f,"\n");
3310   if (ipf) {
3311     ArtState state; const char *const *statename; 
3312     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3313 #define RC_DUMP_FMT(x) " " #x "=%d"
3314 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3315       fprintf(f,"input %s counts %-11s"
3316               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3317               wh, *statename
3318               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3319     }
3320     fprintf(f,"input %s queue", wh);
3321     dump_article_list(f,c,&ipf->queue);
3322   }
3323 }
3324
3325 CCMD(dump) {
3326   int i;
3327   fprintf(cc->out, "dumping state to %s\n", path_dump);
3328   FILE *f= fopen(path_dump, "w");
3329   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3330
3331   fprintf(f,"general");
3332   DUMPV("%s", sms_names,[sms]);
3333   DUMPV("%d", ,until_flush);
3334   DUMPV("%ld", (long),self_pid);
3335   DUMPV("%p", , defer);
3336   DUMPV("%d", , until_connect);
3337   DUMPV("%d", , until_backlog_nextscan);
3338   DUMPV("%d", , simulate_flush);
3339   fprintf(f,"\nnocheck");
3340   DUMPV("%#.10f", , accept_proportion);
3341   DUMPV("%d", , nocheck);
3342   DUMPV("%d", , nocheck_reported);
3343   fprintf(f,"\n");
3344
3345   fprintf(f,"special");
3346   DUMPV("%ld", (long),connecting_child);
3347   DUMPV("%d", , connecting_fdpass_sock);
3348   DUMPV("%d", , control_master);
3349   fprintf(f,"\n");
3350
3351   fprintf(f,"filemon ");
3352   filemon_method_dump_info(f);
3353
3354   dump_input_file(f,c, main_input_file,     "main"    );
3355   dump_input_file(f,c, flushing_input_file, "flushing");
3356   dump_input_file(f,c, backlog_input_file,  "backlog" );
3357
3358   fprintf(f,"conns count=%d\n", conns.count);
3359
3360   Conn *conn;
3361   FOR_CONN(conn) {
3362
3363     fprintf(f,"C%d",conn->fd);
3364     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3365     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3366     DUMPV("%d",conn->,since_activity);
3367     fprintf(f,"\n");
3368
3369     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3370     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3371     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3372
3373     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3374     for (i=0; i<conn->xmitu; i++) {
3375       const struct iovec *iv= &conn->xmit[i];
3376       const XmitDetails *xd= &conn->xmitd[i];
3377       char *dinfo;
3378       switch (xd->kind) {
3379       case xk_Const:    dinfo= xasprintf("Const");                 break;
3380       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3381       default:
3382         abort();
3383       }
3384       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3385               sanitise(iv->iov_base, iv->iov_len));
3386       free(dinfo);
3387     }
3388   }
3389
3390   fprintf(f,"paths");
3391   DUMPV("%s", , path_lock);
3392   DUMPV("%s", , path_flushing);
3393   DUMPV("%s", , path_defer);
3394   DUMPV("%s", , path_control);
3395   DUMPV("%s", , path_dump);
3396   DUMPV("%s", , globpat_backlog);
3397   fprintf(f,"\n");
3398
3399   if (!!ferror(f) + !!fclose(f)) {
3400     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3401     return;
3402   }
3403 }
3404
3405 /*========== option parsing ==========*/
3406
3407 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3408 static void vbadusage(const char *fmt, va_list al) {
3409   char *m= xvasprintf(fmt,al);
3410   fprintf(stderr, "bad usage: %s\n"
3411           "say --help for help, or read the manpage\n",
3412           m);
3413   if (become_daemon)
3414     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3415   exit(8);
3416 }
3417
3418 /*---------- generic option parser ----------*/
3419
3420 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3421 static void badusage(const char *fmt, ...) {
3422   va_list al;
3423   va_start(al,fmt);
3424   vbadusage(fmt,al);
3425 }
3426
3427 enum OptFlags {
3428   of_seconds= 001000u,
3429   of_boolean= 002000u,
3430 };
3431
3432 typedef struct Option Option;
3433 typedef void OptionParser(const Option*, const char *val);
3434
3435 struct Option {
3436   int shrt;
3437   const char *lng, *formarg;
3438   void *store;
3439   OptionParser *fn;
3440   int intval;
3441 };
3442
3443 static void parse_options(const Option *options, char ***argvp) {
3444   /* on return *argvp is first non-option arg; argc is not updated */
3445
3446   for (;;) {
3447     const char *arg= *++(*argvp);
3448     if (!arg) break;
3449     if (*arg != '-') break;
3450     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3451     int a;
3452     while ((a= *++arg)) {
3453       const Option *o;
3454       if (a=='-') {
3455         arg++;
3456         char *equals= strchr(arg,'=');
3457         int len= equals ? (equals - arg) : strlen(arg);
3458         for (o=options; o->shrt || o->lng; o++)
3459           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3460             goto found_long;
3461         badusage("unknown long option --%s",arg);
3462       found_long:
3463         if (!o->formarg) {
3464           if (equals) badusage("option --%s does not take a value",o->lng);
3465           arg= 0;
3466         } else if (equals) {
3467           arg= equals+1;
3468         } else {
3469           arg= *++(*argvp);
3470           if (!arg) badusage("option --%s needs a value for %s",
3471                              o->lng, o->formarg);
3472         }
3473         o->fn(o, arg);
3474         break; /* eaten the whole argument now */
3475       }
3476       for (o=options; o->shrt || o->lng; o++)
3477         if (a == o->shrt)
3478           goto found_short;
3479       badusage("unknown short option -%c",a);
3480     found_short:
3481       if (!o->formarg) {
3482         o->fn(o,0);
3483       } else {
3484         if (!*++arg) {
3485           arg= *++(*argvp);
3486           if (!arg) badusage("option -%c needs a value for %s",
3487                              o->shrt, o->formarg);
3488         }
3489         o->fn(o,arg);
3490         break; /* eaten the whole argument now */
3491       }
3492     }
3493   }
3494 }
3495
3496 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3497
3498 static void print_options(const Option *options, FILE *f) {
3499   const Option *o;
3500   for (o=options; o->shrt || o->lng; o++) {
3501     char shrt[2] = { o->shrt, 0 };
3502     char *optspec= xasprintf("%s%s%s%s%s",
3503                              o->shrt ? "-" : "", shrt,
3504                              o->shrt && o->lng ? "|" : "",
3505                              DELIMPERHAPS("--", o->lng));
3506     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3507     free(optspec);
3508   }
3509 }
3510
3511 /*---------- specific option types ----------*/
3512
3513 static void op_integer(const Option *o, const char *val) {
3514   char *ep;
3515   errno= 0;
3516   unsigned long ul= strtoul(val,&ep,10);
3517   if (*ep || ep==val || errno || ul>INT_MAX)
3518     badusage("bad integer value for %s",o->lng);
3519   int *store= o->store;
3520   *store= ul;
3521 }
3522
3523 static void op_double(const Option *o, const char *val) {
3524   int *store= o->store;
3525   char *ep;
3526   errno= 0;
3527   *store= strtod(val, &ep);
3528   if (*ep || ep==val || errno)
3529     badusage("bad floating point value for %s",o->lng);
3530 }
3531
3532 static void op_string(const Option *o, const char *val) {
3533   const char **store= o->store;
3534   *store= val;
3535 }
3536
3537 static void op_seconds(const Option *o, const char *val) {
3538   int *store= o->store;
3539   char *ep;
3540   int unit;
3541
3542   double v= strtod(val,&ep);
3543   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3544
3545   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3546   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3547   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3548   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3549   else if (!strcmp(ep,"das")) unit= 10;
3550   else if (!strcmp(ep,"hs"))  unit= 100;
3551   else if (!strcmp(ep,"ks"))  unit= 1000;
3552   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3553   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3554
3555   v *= unit;
3556   v= ceil(v);
3557   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3558   *store= v;
3559 }
3560
3561 static void op_setint(const Option *o, const char *val) {
3562   int *store= o->store;
3563   *store= o->intval;
3564 }
3565
3566 /*---------- specific options ----------*/
3567
3568 static void help(const Option *o, const char *val);
3569
3570 static const Option innduct_options[]= {
3571 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3572 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3573 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3574 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3575 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3576 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3577 {'P',"port",             "PORT",  &port,                     op_integer     },
3578 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3579 {0,"help",               0,       0,                         help           },
3580
3581 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3582 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3583 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
3584 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3585 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3586
3587 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3588 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3589 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3590
3591 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3592 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3593
3594 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3595 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3596 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3597 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3598 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3599 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
3600 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3601
3602 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3603 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3604
3605 {0,0}
3606 };
3607
3608 static void printusage(FILE *f) {
3609   fputs("usage: innduct [options] site [fqdn]\n"
3610         "available options are:\n", f);
3611   print_options(innduct_options, f);
3612 }
3613
3614 static void help(const Option *o, const char *val) {
3615   printusage(stdout);
3616   if (ferror(stdout) || fflush(stdout)) {
3617     perror("innduct: writing help");
3618     exit(12);
3619   }
3620   exit(0);
3621 }
3622
3623 static void convert_to_periods_rndup(int *store) {
3624   *store += period_seconds-1;
3625   *store /= period_seconds;
3626 }
3627
3628 int main(int argc, char **argv) {
3629   if (!argv[1]) {
3630     printusage(stderr);
3631     exit(8);
3632   }
3633
3634   parse_options(innduct_options, &argv);
3635
3636   /* arguments */
3637
3638   sitename= *argv++;
3639   if (!sitename) badusage("need site name argument");
3640   remote_host= *argv++;
3641   if (*argv) badusage("too many non-option arguments");
3642
3643   /* defaults */
3644
3645   int r= innconf_read(inndconffile);
3646   if (!r) badusage("could not read inn.conf (more info on stderr)");
3647
3648   if (!remote_host) remote_host= sitename;
3649
3650   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3651     badusage("nocheck threshold percentage must be between 0..100");
3652   nocheck_thresh *= 0.01;
3653
3654   if (nocheck_decay < 0.1)
3655     badusage("nocheck decay articles must be at least 0.1");
3656   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3657
3658   convert_to_periods_rndup(&reconnect_delay_periods);
3659   convert_to_periods_rndup(&flushfail_retry_periods);
3660   convert_to_periods_rndup(&backlog_retry_minperiods);
3661   convert_to_periods_rndup(&backlog_spontrescan_periods);
3662   convert_to_periods_rndup(&spontaneous_flush_periods);
3663   convert_to_periods_rndup(&max_separated_periods);
3664   convert_to_periods_rndup(&need_activity_periods);
3665
3666   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3667     badusage("bad input data ratio must be between 0..100");
3668   max_bad_data_ratio *= 0.01;
3669
3670   if (!feedfile) {
3671     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3672   } else if (!feedfile[0]) {
3673     badusage("feed filename must be nonempty");
3674   } else if (feedfile[strlen(feedfile)-1]=='/') {
3675     feedfile= xasprintf("%s%s",feedfile,sitename);
3676   }
3677
3678   if (max_queue_per_ipf<0)
3679     max_queue_per_ipf= max_queue_per_conn * 2;
3680
3681   const char *feedfile_forbidden= "?*[~#";
3682   int c;
3683   while ((c= *feedfile_forbidden++))
3684     if (strchr(feedfile, c))
3685       badusage("feed filename may not contain metacharacter %c",c);
3686
3687   /* set things up */
3688
3689   path_lock=        xasprintf("%s_lock",      feedfile);
3690   path_flushing=    xasprintf("%s_flushing",  feedfile);
3691   path_defer=       xasprintf("%s_defer",     feedfile);
3692   path_control=     xasprintf("%s_control",   feedfile);
3693   path_dump=        xasprintf("%s_dump",      feedfile);
3694   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3695
3696   oop_source_sys *sysloop= oop_sys_new();
3697   if (!sysloop) sysdie("could not create liboop event loop");
3698   loop= (oop_source*)sysloop;
3699
3700   LIST_INIT(conns);
3701
3702   if (become_daemon) {
3703     int i;
3704     for (i=3; i<255; i++)
3705       /* do this now before we open syslog, etc. */
3706       close(i);
3707     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3708
3709     int null= open("/dev/null",O_RDWR);
3710     if (null<0) sysfatal("failed to open /dev/null");
3711     dup2(null,0);
3712     dup2(null,1);
3713     dup2(null,2);
3714     xclose(null, "/dev/null original fd",0);
3715
3716     pid_t child1= xfork("daemonise first fork");
3717     if (child1) _exit(0);
3718
3719     pid_t sid= setsid();
3720     if (sid != child1) sysfatal("setsid failed");
3721
3722     pid_t child2= xfork("daemonise second fork");
3723     if (child2) _exit(0);
3724   }
3725
3726   self_pid= getpid();
3727   if (self_pid==-1) sysdie("getpid");
3728
3729   statemc_lock();
3730
3731   init_signals();
3732
3733   notice("starting");
3734
3735   if (!become_daemon)
3736     control_stdio();
3737
3738   control_init();
3739
3740   int filemon_ok= 0;
3741   if (!try_filemon) {
3742     notice("filemon: suppressed by command line option, polling");
3743   } else {
3744     filemon_ok= filemon_method_init();
3745     if (!filemon_ok)
3746       warn("filemon: no file monitoring available, polling");
3747   }
3748   if (!filemon_ok)
3749     every(filepoll_seconds,0,filepoll);
3750
3751   every(period_seconds,1,period);
3752
3753   statemc_init();
3754
3755   /* let's go */
3756
3757   void *run= oop_sys_run(sysloop);
3758   assert(run == OOP_ERROR);
3759   sysdie("event loop failed");
3760 }