chiark / gitweb /
503 is not timeout
[inn-innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322 static void check_reading_pause_resume(InputFile *ipf);
323
324 static void statemc_check_flushing_done(void);
325 static void statemc_check_backlog_done(void);
326
327 static void postfork(void);
328 static void period(void);
329
330 static void open_defer(void);
331 static void close_defer(void);
332 static void search_backlog_file(void);
333 static void preterminate(void);
334 static void raise_default(int signo) NORET;
335 static char *debug_report_ipf(InputFile *ipf);
336
337 static void inputfile_reading_start(InputFile *ipf);
338 static void inputfile_reading_stop(InputFile *ipf);
339 static void inputfile_reading_pause(InputFile *ipf);
340 static void inputfile_reading_resume(InputFile *ipf);
341   /* pause and resume are idempotent, and no-op if not done _reading_start */
342
343 static void filemon_start(InputFile *ipf);
344 static void filemon_stop(InputFile *ipf);
345 static void filemon_callback(InputFile *ipf);
346
347 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
348 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
349
350 static const oop_rd_style peer_rd_style;
351 static oop_rd_call peer_rd_err, peer_rd_ok;
352
353 /*----- configuration options -----*/
354 /* when changing defaults, remember to update the manpage */
355
356 static const char *sitename, *remote_host;
357 static const char *feedfile, *realsockdir="/tmp/innduct.control";
358 static int quiet_multiple=0;
359 static int become_daemon=1, try_filemon=1;
360 static int try_stream=1;
361 static int port=119;
362 static const char *inndconffile;
363
364 static int max_connections=10;
365 static int max_queue_per_conn=200;
366 static int target_max_feedfile_size=100000;
367 static int period_seconds=60;
368 static int filepoll_seconds=5;
369 static int max_queue_per_ipf=-1;
370
371 static int connection_setup_timeout=200;
372 static int inndcomm_flush_timeout=100;
373
374 static double nocheck_thresh= 95.0; /* converted from percentage by main */
375 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
376
377 /* all these are initialised to seconds, and converted to periods in main */
378 static int reconnect_delay_periods=1000;
379 static int flushfail_retry_periods=1000;
380 static int backlog_retry_minperiods=50;
381 static int backlog_spontrescan_periods=300;
382 static int spontaneous_flush_periods=100000;
383 static int max_separated_periods=2000;
384 static int need_activity_periods=1000;
385
386 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
387 static int max_bad_data_initial= 30;
388   /* in one corrupt 4096-byte block the number of newlines has
389    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
390
391
392 /*----- statistics -----*/
393
394 typedef enum {      /* in queue                 in conn->sent             */
395   art_Unchecked,    /*   not checked, not sent    checking                */
396   art_Wanted,       /*   checked, wanted          sent body as requested  */
397   art_Unsolicited,  /*   -                        sent body without check */
398   art_MaxState,
399 } ArtState;
400
401 static const char *const artstate_names[]=
402   { "Unchecked", "Wanted", "Unsolicited", 0 };
403
404 #define RESULT_COUNTS(RCS,RCN)                  \
405   RCS(sent)                                     \
406   RCS(accepted)                                 \
407   RCN(unwanted)                                 \
408   RCN(rejected)                                 \
409   RCN(deferred)                                 \
410   RCN(missing)                                  \
411   RCN(connretry)
412
413 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
414 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
415        counts[art_Unchecked] x                  \
416        + counts[art_Wanted] x                   \
417        + counts[art_Unsolicited] x,             \
418        counts[art_Unchecked] x                  \
419        , counts[art_Wanted] x                   \
420        , counts[art_Unsolicited] x
421
422 typedef enum {
423 #define RC_INDEX(x) RC_##x,
424   RESULT_COUNTS(RC_INDEX, RC_INDEX)
425   RCI_max
426 } ResultCountIndex;
427
428
429 /*----- transmission buffers -----*/
430
431 #define CONNIOVS 128
432
433 typedef enum {
434   xk_Const, xk_Artdata
435 } XmitKind;
436
437 struct XmitDetails {
438   XmitKind kind;
439   union {
440     ARTHANDLE *sm_art;
441   } info;
442 };
443
444
445 /*----- core operational data structure types -----*/
446
447 struct InputFile {
448   /* This is also an instance of struct oop_readable */
449   struct oop_readable readable; /* first */
450   oop_readable_call *readable_callback;
451   void *readable_callback_user;
452
453   int fd;
454   Filemon_Perfile *filemon;
455
456   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
457   off_t offset;
458   int skippinglong, paused;
459
460   ArticleList queue;
461   long inprogress; /* includes queue.count and also articles in conns */
462   long autodefer; /* -1 means not doing autodefer */
463
464   int counts[art_MaxState][RCI_max];
465   int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
466   char path[];
467 };
468
469 struct Article {
470   ISNODE(Article);
471   ArtState state;
472   int midlen, missing;
473   InputFile *ipf;
474   TOKEN token;
475   off_t offset;
476   int blanklen;
477   char messageid[1];
478 };
479
480 #define SMS_LIST(X)                             \
481   X(NORMAL)                                     \
482   X(FLUSHING)                                   \
483   X(FLUSHFAILED)                                \
484   X(SEPARATED)                                  \
485   X(DROPPING)                                   \
486   X(DROPPED)
487
488 enum StateMachineState {
489 #define SMS_DEF_ENUM(s) sm_##s,
490   SMS_LIST(SMS_DEF_ENUM)
491 };
492
493 static const char *sms_names[]= {
494 #define SMS_DEF_NAME(s) #s ,
495   SMS_LIST(SMS_DEF_NAME)
496   0
497 };
498
499 struct Conn {
500   ISNODE(Conn);
501   int fd; /* may be 0, meaning closed (during construction/destruction) */
502   oop_read *rd; /* likewise */
503   int max_queue, stream, quitting;
504   int since_activity; /* periods */
505   ArticleList waiting; /* not yet told peer */
506   ArticleList priority; /* peer says send it now */
507   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
508   struct iovec xmit[CONNIOVS];
509   XmitDetails xmitd[CONNIOVS];
510   int xmitu;
511 };
512
513
514 /*----- general operational variables -----*/
515
516 /* main initialises */
517 static oop_source *loop;
518 static ConnList conns;
519 static char *path_lock, *path_flushing, *path_defer;
520 static char *path_control, *path_dump;
521 static char *globpat_backlog;
522 static pid_t self_pid;
523
524 /* statemc_init initialises */
525 static StateMachineState sms;
526 static int until_flush;
527 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
528 static FILE *defer;
529
530 /* initialisation to 0 is good */
531 static int until_connect, until_backlog_nextscan;
532 static double accept_proportion;
533 static int nocheck, nocheck_reported, in_child;
534
535 /* for simulation, debugging, etc. */
536 int simulate_flush= -1;
537
538 /*========== logging ==========*/
539
540 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
541 static void logcore(int sysloglevel, const char *fmt, ...) {
542   VA;
543   if (become_daemon) {
544     vsyslog(sysloglevel,fmt,al);
545   } else {
546     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
547     vfprintf(stderr,fmt,al);
548     putc('\n',stderr);
549   }
550   va_end(al);
551 }
552
553 static void logv(int sysloglevel, const char *pfx, int errnoval,
554                  const char *fmt, va_list al) PRINTF(5,0);
555 static void logv(int sysloglevel, const char *pfx, int errnoval,
556                  const char *fmt, va_list al) {
557   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
558   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
559   msgbuf[sizeof(msgbuf)-1]= 0;
560
561   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
562     sysloglevel= LOG_ERR; /* run by wrong user, probably */
563
564   logcore(sysloglevel, "<%s>%s: %s%s%s",
565          sitename, pfx, msgbuf,
566          errnoval>=0 ? ": " : "",
567          errnoval>=0 ? strerror(errnoval) : "");
568 }
569
570 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
571   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
572   static void fn(const char *fmt, ...) {                        \
573     preterminate();                                             \
574     VA;                                                         \
575     logv(sysloglevel, pfx, err, fmt, al);                       \
576     exit(estatus);                                              \
577   }
578
579 #define logwrap(fn, pfx, sysloglevel, err)              \
580   static void fn(const char *fmt, ...) PRINTF(1,2);     \
581   static void fn(const char *fmt, ...) {                \
582     VA;                                                 \
583     logv(sysloglevel, pfx, err, fmt, al);               \
584     va_end(al);                                         \
585   }
586
587 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
588 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
589
590 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
591 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
592
593 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
594 logwrap(warn,     " warning",  LOG_WARNING, -1);
595
596 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
597 logwrap(info,     " info",     LOG_INFO,    -1);
598 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
599
600
601 /*========== utility functions etc. ==========*/
602
603 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
604 static char *xvasprintf(const char *fmt, va_list al) {
605   char *str;
606   int rc= vasprintf(&str,fmt,al);
607   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
608   return str;
609 }
610 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
611 static char *xasprintf(const char *fmt, ...) {
612   VA;
613   char *str= xvasprintf(fmt,al);
614   va_end(al);
615   return str;
616 }
617
618 static int close_perhaps(int *fd) {
619   if (*fd <= 0) return 0;
620   int r= close(*fd);
621   *fd=0;
622   return r;
623 }
624 static void xclose(int fd, const char *what, const char *what2) {
625   int r= close(fd);
626   if (r) sysdie("close %s%s",what,what2?what2:"");
627 }
628 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
629   if (*fd <= 0) return;
630   xclose(*fd,what,what2);
631   *fd=0;
632 }
633
634 static pid_t xfork(const char *what) {
635   pid_t child;
636
637   child= fork();
638   if (child==-1) sysfatal("cannot fork for %s",what);
639   debug("forked %s %ld", what, (unsigned long)child);
640   if (!child) postfork();
641   return child;
642 }
643
644 static void on_fd_read_except(int fd, oop_call_fd callback) {
645   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
646   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
647 }
648 static void cancel_fd_read_except(int fd) {
649   loop->cancel_fd(loop, fd, OOP_READ);
650   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
651 }
652
653 static void report_child_status(const char *what, int status) {
654   if (WIFEXITED(status)) {
655     int es= WEXITSTATUS(status);
656     if (es)
657       warn("%s: child died with error exit status %d", what, es);
658   } else if (WIFSIGNALED(status)) {
659     int sig= WTERMSIG(status);
660     const char *sigstr= strsignal(sig);
661     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
662     if (sigstr)
663       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
664     else
665       warn("%s: child died due to unknown fatal signal %d%s",
666            what, sig, coredump);
667   } else {
668     warn("%s: child died with unknown wait status %d", what,status);
669   }
670 }
671
672 static int xwaitpid(pid_t *pid, const char *what) {
673   int status;
674
675   int r= kill(*pid, SIGKILL);
676   if (r) sysdie("cannot kill %s child", what);
677
678   pid_t got= waitpid(*pid, &status, 0);
679   if (got==-1) sysdie("cannot reap %s child", what);
680   if (got==0) die("cannot reap %s child", what);
681
682   *pid= 0;
683
684   return status;
685 }
686
687 static void *zxmalloc(size_t sz) {
688   void *p= xmalloc(sz);
689   memset(p,0,sz);
690   return p;
691 }
692
693 static void xunlink(const char *path, const char *what) {
694   int r= unlink(path);
695   if (r) sysdie("can't unlink %s %s", path, what);
696 }
697
698 static time_t xtime(void) {
699   time_t now= time(0);
700   if (now==-1) sysdie("time(2) failed");
701   return now;
702 }
703
704 static void xsigaction(int signo, const struct sigaction *sa) {
705   int r= sigaction(signo,sa,0);
706   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
707 }
708
709 static void xsigsetdefault(int signo) {
710   struct sigaction sa;
711   memset(&sa,0,sizeof(sa));
712   sa.sa_handler= SIG_DFL;
713   xsigaction(signo,&sa);
714 }
715
716 static void xgettimeofday(struct timeval *tv_r) {
717   int r= gettimeofday(tv_r,0);
718   if (r) sysdie("gettimeofday(2) failed");
719 }
720
721 static void xsetnonblock(int fd, int nonblocking) {
722   int errnoval= oop_fd_nonblock(fd, nonblocking);
723   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
724 }
725
726 static void check_isreg(const struct stat *stab, const char *path,
727                         const char *what) {
728   if (!S_ISREG(stab->st_mode))
729     die("%s %s not a plain file (mode 0%lo)",
730         what, path, (unsigned long)stab->st_mode);
731 }
732
733 static void xfstat(int fd, struct stat *stab_r, const char *what) {
734   int r= fstat(fd, stab_r);
735   if (r) sysdie("could not fstat %s", what);
736 }
737
738 static void xfstat_isreg(int fd, struct stat *stab_r,
739                          const char *path, const char *what) {
740   xfstat(fd, stab_r, what);
741   check_isreg(stab_r, path, what);
742 }
743
744 static void xlstat_isreg(const char *path, struct stat *stab,
745                          int *enoent_r /* 0 means ENOENT is fatal */,
746                          const char *what) {
747   int r= lstat(path, stab);
748   if (r) {
749     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
750     sysdie("could not lstat %s %s", what, path);
751   }
752   if (enoent_r) *enoent_r= 0;
753   check_isreg(stab, path, what);
754 }
755
756 static int samefile(const struct stat *a, const struct stat *b) {
757   assert(S_ISREG(a->st_mode));
758   assert(S_ISREG(b->st_mode));
759   return (a->st_ino == b->st_ino &&
760           a->st_dev == b->st_dev);
761 }
762
763 static char *sanitise(const char *input, int len) {
764   static char sanibuf[100]; /* returns pointer to this buffer! */
765
766   const char *p= input;
767   const char *endp= len>=0 ? input+len : 0;
768   char *q= sanibuf;
769   *q++= '`';
770   for (;;) {
771     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
772     int c= (!endp || p<endp) ? *p++ : 0;
773     if (!c) { *q++= '\''; *q=0; break; }
774     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
775     sprintf(q,"\\x%02x",c);
776     q += 4;
777   }
778   return sanibuf;
779 }
780
781 static int isewouldblock(int errnoval) {
782   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
783 }
784
785 /*========== command and control connections ==========*/
786
787 static int control_master;
788
789 typedef struct ControlConn ControlConn;
790 struct ControlConn {
791   void (*destroy)(ControlConn*);
792   int fd;
793   oop_read *rd;
794   FILE *out;
795   union {
796     struct sockaddr sa;
797     struct sockaddr_un un;
798   } sa;
799   socklen_t salen;
800 };
801
802 static const oop_rd_style control_rd_style= {
803   OOP_RD_DELIM_STRIP, '\n',
804   OOP_RD_NUL_FORBID,
805   OOP_RD_SHORTREC_FORBID
806 };
807
808 static void control_destroy(ControlConn *cc) {
809   cc->destroy(cc);
810 }
811
812 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
813   if (ferror(cc->out) | fflush(cc->out)) {
814     info("CTRL%d write error %s", cc->fd, strerror(errno));
815     control_destroy(cc);
816   }
817 }
818
819 static void control_prompt(ControlConn *cc /* may destroy*/) {
820   fprintf(cc->out, "%s| ", sitename);
821   control_checkouterr(cc);
822 }
823
824 struct ControlCommand {
825   const char *cmd;
826   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
827             const char *arg, size_t argsz);
828   void *xdata;
829   int xval;
830 };
831
832 static const ControlCommand control_commands[];
833
834 #define CCMD(wh)                                                        \
835   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
836                         const char *arg, size_t argsz)
837
838 CCMD(help) {
839   fputs("commands:\n", cc->out);
840   const ControlCommand *ccmd;
841   for (ccmd=control_commands; ccmd->cmd; ccmd++)
842     fprintf(cc->out, " %s\n", ccmd->cmd);
843   fputs("NB: permissible arguments are not shown above."
844         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
845 }
846
847 CCMD(flush) {
848   int ok= trigger_flush_ok();
849   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
850 }
851
852 CCMD(stop) {
853   preterminate();
854   notice("terminating (CTRL%d)",cc->fd);
855   raise_default(SIGTERM);
856   abort();
857 }
858
859 CCMD(dump);
860
861 /* messing with our head: */
862 CCMD(period) { period(); }
863 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
864 CCMD(setint) { *(int*)c->xdata= c->xval; }
865 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
866
867 static const ControlCommand control_commands[]= {
868   { "h",             ccmd_help      },
869   { "flush",         ccmd_flush     },
870   { "stop",          ccmd_stop      },
871   { "dump q",        ccmd_dump, 0,0 },
872   { "dump a",        ccmd_dump, 0,1 },
873
874   { "p",             ccmd_period    },
875
876 #define POKES(cmd,func)                                                 \
877   { cmd "flush",     func,           &until_flush,             1 },     \
878   { cmd "conn",      func,           &until_connect,           0 },     \
879   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
880 POKES("next ", ccmd_setint)
881 POKES("prod ", ccmd_setint_period)
882
883   { "pretend flush", ccmd_setintarg, &simulate_flush             },
884   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
885   { 0 }
886 };
887
888 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
889                            const char *errmsg, int errnoval,
890                            const char *data, size_t recsz, void *cc_v) {
891   ControlConn *cc= cc_v;
892
893   if (!data) {
894     info("CTRL%d closed", cc->fd);
895     cc->destroy(cc);
896     return OOP_CONTINUE;
897   }
898
899   if (recsz == 0) goto prompt;
900
901   const ControlCommand *ccmd;
902   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
903     int l= strlen(ccmd->cmd);
904     if (recsz < l) continue;
905     if (recsz > l && data[l] != ' ') continue;
906     if (memcmp(data, ccmd->cmd, l)) continue;
907
908     int argl= (int)recsz - (l+1); 
909     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
910     goto prompt;
911   }
912
913   fputs("unknown command; h for help\n", cc->out);
914
915  prompt:
916   control_prompt(cc);
917   return OOP_CONTINUE;
918 }
919
920 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
921                             const char *errmsg, int errnoval,
922                             const char *data, size_t recsz, void *cc_v) {
923   ControlConn *cc= cc_v;
924   
925   info("CTRL%d read error %s", cc->fd, errmsg);
926   cc->destroy(cc);
927   return OOP_CONTINUE;
928 }
929
930 static int control_conn_startup(ControlConn *cc /* may destroy*/,
931                                 const char *how) {
932   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
933   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
934
935   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
936                       control_rd_ok, cc,
937                       control_rd_err, cc);
938   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
939
940   info("CTRL%d %s ready", cc->fd, how);
941   control_prompt(cc);
942   return 0;
943 }
944
945 static void control_stdio_destroy(ControlConn *cc) {
946   if (cc->rd) {
947     oop_rd_cancel(cc->rd);
948     errno= oop_rd_delete_tidy(cc->rd);
949     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
950   }
951   free(cc);
952 }
953
954 static void control_stdio(void) {
955   NEW_DECL(ControlConn *,cc);
956   cc->destroy= control_stdio_destroy;
957
958   cc->fd= 0;
959   cc->out= stdout;
960   int r= control_conn_startup(cc,"stdio");
961   if (r) cc->destroy(cc);
962 }
963
964 static void control_accepted_destroy(ControlConn *cc) {
965   if (cc->rd) {
966     oop_rd_cancel(cc->rd);
967     oop_rd_delete_kill(cc->rd);
968   }
969   if (cc->out) { fclose(cc->out); cc->fd=0; }
970   close_perhaps(&cc->fd);
971   free(cc);
972 }
973
974 static void *control_master_readable(oop_source *lp, int master,
975                                      oop_event ev, void *u) {
976   NEW_DECL(ControlConn *,cc);
977   cc->destroy= control_accepted_destroy;
978
979   cc->salen= sizeof(cc->sa);
980   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
981   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
982
983   cc->out= fdopen(cc->fd, "w");
984   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
985
986   int r= control_conn_startup(cc, "accepted");
987   if (r) goto x;
988
989   return OOP_CONTINUE;
990
991  x:
992   cc->destroy(cc);
993   return OOP_CONTINUE;
994 }
995
996 #define NOCONTROL(...) do{                                              \
997     syswarn("no control socket, because failed to " __VA_ARGS__);       \
998     goto nocontrol;                                                     \
999   }while(0)
1000
1001 static void control_init(void) {
1002   char *real=0;
1003   
1004   union {
1005     struct sockaddr sa;
1006     struct sockaddr_un un;
1007   } sa;
1008
1009   memset(&sa,0,sizeof(sa));
1010   int maxlen= sizeof(sa.un.sun_path);
1011
1012   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1013   if (reallen<0) {
1014     if (errno != ENOENT)
1015       NOCONTROL("readlink control socket symlink path %s", path_control);
1016   }
1017   if (reallen >= maxlen) {
1018     debug("control socket symlink path too long (r=%d)",reallen);
1019     xunlink(path_control, "old (overlong) control socket symlink");
1020     reallen= -1;
1021   }
1022   
1023   if (reallen<0) {
1024     struct stat stab;
1025     int r= lstat(realsockdir,&stab);
1026     if (r) {
1027       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1028
1029       r= mkdir(realsockdir, 0700);
1030       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1031
1032     } else {
1033       uid_t self= geteuid();
1034       if (!S_ISDIR(stab.st_mode) ||
1035           stab.st_uid != self ||
1036           stab.st_mode & 0007) {
1037         warn("no control socket, because real socket directory"
1038              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1039              !!S_ISDIR(stab.st_mode),
1040              (unsigned long)stab.st_uid, (unsigned long)self,
1041              (unsigned long)stab.st_mode & 0777UL);
1042         goto nocontrol;
1043       }
1044     }
1045
1046     real= xasprintf("%s/s%lx.%lx", realsockdir,
1047                     (unsigned long)xtime(), (unsigned long)self_pid);
1048     int reallen= strlen(real);
1049
1050     if (reallen >= maxlen) {
1051       warn("no control socket, because tmpnam gave overly-long path"
1052            " %s", real);
1053       goto nocontrol;
1054     }
1055     r= symlink(real, path_control);
1056     if (r) NOCONTROL("make control socket path %s a symlink to real"
1057                      " socket path %s", path_control, real);
1058     memcpy(sa.un.sun_path, real, reallen);
1059   }
1060
1061   int r= unlink(sa.un.sun_path);
1062   if (r && errno!=ENOENT)
1063     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1064
1065   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1066   if (control_master<0) NOCONTROL("create new control socket");
1067
1068   sa.un.sun_family= AF_UNIX;
1069   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1070   r= bind(control_master, &sa.sa, sl);
1071   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1072
1073   r= listen(control_master, 5);
1074   if (r) NOCONTROL("listen");
1075
1076   xsetnonblock(control_master, 1);
1077
1078   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1079   info("control socket ok, real path %s", sa.un.sun_path);
1080
1081   return;
1082
1083  nocontrol:
1084   free(real);
1085   xclose_perhaps(&control_master, "control master",0);
1086   return;
1087 }
1088
1089 /*========== management of connections ==========*/
1090
1091 static void conn_closefd(Conn *conn, const char *msgprefix) {
1092   int r= close_perhaps(&conn->fd);
1093   if (r) info("C%d %serror closing socket: %s",
1094               conn->fd, msgprefix, strerror(errno));
1095 }
1096
1097 static void conn_dispose(Conn *conn) {
1098   if (!conn) return;
1099   if (conn->rd) {
1100     oop_rd_cancel(conn->rd);
1101     oop_rd_delete_kill(conn->rd);
1102     conn->rd= 0;
1103   }
1104   if (conn->fd) {
1105     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1106     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1107   }
1108   conn_closefd(conn,"");
1109   free(conn);
1110   until_connect= reconnect_delay_periods;
1111 }
1112
1113 static void *conn_exception(oop_source *lp, int fd,
1114                             oop_event ev, void *conn_v) {
1115   Conn *conn= conn_v;
1116   unsigned char ch;
1117   assert(fd == conn->fd);
1118   assert(ev == OOP_EXCEPTION);
1119   int r= read(conn->fd, &ch, 1);
1120   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1121   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1122                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1123   return OOP_CONTINUE;
1124 }  
1125
1126 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1127   int requeue[art_MaxState];
1128   memset(requeue,0,sizeof(requeue));
1129
1130   Article *art;
1131   
1132   while ((art= LIST_REMHEAD(conn->priority)))
1133     LIST_ADDTAIL(art->ipf->queue, art);
1134
1135   while ((art= LIST_REMHEAD(conn->waiting)))
1136     LIST_ADDTAIL(art->ipf->queue, art);
1137
1138   while ((art= LIST_REMHEAD(conn->sent))) {
1139     requeue[art->state]++;
1140     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1141     LIST_ADDTAIL(art->ipf->queue,art);
1142     check_reading_pause_resume(art->ipf);
1143   }
1144
1145   int i;
1146   XmitDetails *d;
1147   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1148     xmit_free(d);
1149
1150   char *m= xvasprintf(fmt,al);
1151   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1152        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1153   free(m);
1154
1155   LIST_REMOVE(conns,conn);
1156   conn_dispose(conn);
1157   check_assign_articles();
1158 }
1159
1160 static void connfail(Conn *conn, const char *fmt, ...) {
1161   va_list al;
1162   va_start(al,fmt);
1163   vconnfail(conn,fmt,al);
1164   va_end(al);
1165 }
1166
1167 static void check_idle_conns(void) {
1168   Conn *conn;
1169   FOR_CONN(conn)
1170     conn->since_activity++;
1171  search_again:
1172   FOR_CONN(conn) {
1173     if (conn->since_activity <= need_activity_periods) continue;
1174
1175     /* We need to shut this down */
1176     if (conn->quitting)
1177       connfail(conn,"timed out waiting for response to QUIT");
1178     else if (conn->sent.count)
1179       connfail(conn,"timed out waiting for responses");
1180     else if (conn->waiting.count || conn->priority.count)
1181       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1182     else if (conn->xmitu)
1183       connfail(conn,"peer has been sending responses"
1184                " before receiving our commands!");
1185     else {
1186       static const char quitcmd[]= "QUIT\r\n";
1187       int todo= sizeof(quitcmd)-1;
1188       const char *p= quitcmd;
1189       for (;;) {
1190         int r= write(conn->fd, p, todo);
1191         if (r<0) {
1192           if (isewouldblock(errno))
1193             connfail(conn, "blocked writing QUIT to idle connection");
1194           else
1195             connfail(conn, "failed to write QUIT to idle connection: %s",
1196                      strerror(errno));
1197           break;
1198         }
1199         assert(r<=todo);
1200         todo -= r;
1201         if (!todo) {
1202           conn->quitting= 1;
1203           conn->since_activity= 0;
1204           debug("C%d is idle, quitting", conn->fd);
1205           break;
1206         }
1207       }
1208     }
1209     goto search_again;
1210   }
1211 }  
1212
1213 /*---------- making new connections ----------*/
1214
1215 static pid_t connecting_child;
1216 static int connecting_fdpass_sock;
1217
1218 static void connect_attempt_discard(void) {
1219   if (connecting_child) {
1220     int status= xwaitpid(&connecting_child, "connect");
1221     if (!(WIFEXITED(status) ||
1222           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1223       report_child_status("connect", status);
1224   }
1225   if (connecting_fdpass_sock) {
1226     cancel_fd_read_except(connecting_fdpass_sock);
1227     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1228   }
1229 }
1230
1231 #define PREP_DECL_MSG_CMSG(msg)                 \
1232   char msgbyte= 0;                              \
1233   struct iovec msgiov;                          \
1234   msgiov.iov_base= &msgbyte;                    \
1235   msgiov.iov_len= 1;                            \
1236   struct msghdr msg;                            \
1237   memset(&msg,0,sizeof(msg));                   \
1238   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1239   msg.msg_iov= &msgiov;                         \
1240   msg.msg_iovlen= 1;                            \
1241   msg.msg_control= msg##cbuf;                   \
1242   msg.msg_controllen= sizeof(msg##cbuf);
1243
1244 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1245   Conn *conn= 0;
1246
1247   assert(fd == connecting_fdpass_sock);
1248
1249   PREP_DECL_MSG_CMSG(msg);
1250   
1251   ssize_t rs= recvmsg(fd, &msg, 0);
1252   if (rs<0) {
1253     if (isewouldblock(errno)) return OOP_CONTINUE;
1254     syswarn("failed to read socket from connecting child");
1255     goto x;
1256   }
1257
1258   NEW(conn);
1259   LIST_INIT(conn->waiting);
1260   LIST_INIT(conn->priority);
1261   LIST_INIT(conn->sent);
1262
1263   struct cmsghdr *h= 0;
1264   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1265   if (!h) {
1266     int status= xwaitpid(&connecting_child, "connect child (broken)");
1267
1268     if (WIFEXITED(status)) {
1269       if (WEXITSTATUS(status) != 0 &&
1270           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1271           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1272         /* child already reported the problem */;
1273       else {
1274         if (e == OOP_EXCEPTION)
1275           warn("connect: connection child exited code %d but"
1276                " unexpected exception on fdpass socket",
1277                WEXITSTATUS(status));
1278         else
1279           warn("connect: connection child exited code %d but"
1280                " no cmsg (rs=%d)",
1281                WEXITSTATUS(status), (int)rs);
1282       }
1283     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1284       warn("connect: connection attempt timed out");
1285     } else {
1286       report_child_status("connect", status);
1287     }
1288     goto x;
1289   }
1290
1291 #define CHK(field, val)                                                  \
1292   if (h->cmsg_##field != val) {                                          \
1293     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1294         h->cmsg_##field, val);                                           \
1295     goto x;                                                              \
1296   }
1297   CHK(level, SOL_SOCKET);
1298   CHK(type,  SCM_RIGHTS);
1299   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1300 #undef CHK
1301
1302   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1303
1304   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1305
1306   int status;
1307   pid_t got= waitpid(connecting_child, &status, 0);
1308   if (got==-1) sysdie("connect: real wait for child");
1309   assert(got == connecting_child);
1310   connecting_child= 0;
1311
1312   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1313   int es= WEXITSTATUS(status);
1314   switch (es) {
1315   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1316   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1317   default:
1318     fatal("connect: child gave unexpected exit status %d", es);
1319   }
1320
1321   /* Phew! */
1322   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1323
1324   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1325   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1326   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1327   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1328                      &peer_rd_ok, conn,
1329                      &peer_rd_err, conn);
1330   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1331
1332   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1333   LIST_ADDHEAD(conns, conn);
1334
1335   connect_attempt_discard();
1336   check_assign_articles();
1337   return OOP_CONTINUE;
1338
1339  x:
1340   conn_dispose(conn);
1341   connect_attempt_discard();
1342   return OOP_CONTINUE;
1343 }
1344
1345 static int allow_connect_start(void) {
1346   return conns.count < max_connections
1347     && !connecting_child
1348     && !until_connect;
1349 }
1350
1351 static void connect_start(void) {
1352   assert(!connecting_child);
1353   assert(!connecting_fdpass_sock);
1354
1355   info("starting connection attempt");
1356
1357   int socks[2];
1358   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1359   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1360
1361   connecting_child= xfork("connection");
1362
1363   if (!connecting_child) {
1364     FILE *cn_from, *cn_to;
1365     char buf[NNTP_STRLEN+100];
1366     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1367
1368     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1369
1370     alarm(connection_setup_timeout);
1371     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1372       int l= strlen(buf);
1373       int stripped=0;
1374       while (l>0) {
1375         unsigned char c= buf[l-1];
1376         if (!isspace(c)) break;
1377         if (c=='\n' || c=='\r') stripped=1;
1378         --l;
1379       }
1380       if (!buf[0]) {
1381         sysfatal("connect: connection attempt failed");
1382       } else {
1383         buf[l]= 0;
1384         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1385               sanitise(buf,-1));
1386       }
1387     }
1388     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1389       sysfatal("connect: authentication failed");
1390     if (try_stream) {
1391       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1392           fflush(cn_to))
1393         sysfatal("connect: could not send MODE STREAM");
1394       buf[sizeof(buf)-1]= 0;
1395       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1396         if (ferror(cn_from))
1397           sysfatal("connect: could not read response to MODE STREAM");
1398         else
1399           fatal("connect: connection close in response to MODE STREAM");
1400       }
1401       int l= strlen(buf);
1402       assert(l>=1);
1403       if (buf[l-1]!='\n')
1404         fatal("connect: response to MODE STREAM is too long: %.100s...",
1405               sanitise(buf,-1));
1406       l--;  if (l>0 && buf[l-1]=='\r') l--;
1407       buf[l]= 0;
1408       char *ep;
1409       int rcode= strtoul(buf,&ep,10);
1410       if (ep != &buf[3])
1411         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1412
1413       switch (rcode) {
1414       case 203:
1415         exitstatus= CONNCHILD_ESTATUS_STREAM;
1416         break;
1417       case 480:
1418       case 500:
1419         break;
1420       default:
1421         warn("connect: unexpected response to MODE STREAM: %.50s",
1422              sanitise(buf,-1));
1423         exitstatus= 2;
1424         break;
1425       }
1426     }
1427     int fd= fileno(cn_from);
1428
1429     PREP_DECL_MSG_CMSG(msg);
1430     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1431     cmsg->cmsg_level= SOL_SOCKET;
1432     cmsg->cmsg_type=  SCM_RIGHTS;
1433     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1434     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1435
1436     msg.msg_controllen= cmsg->cmsg_len;
1437     r= sendmsg(socks[1], &msg, 0);
1438     if (r<0) sysdie("sendmsg failed for new connection");
1439     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1440
1441     _exit(exitstatus);
1442   }
1443
1444   xclose(socks[1], "connecting fdpass child's socket",0);
1445   connecting_fdpass_sock= socks[0];
1446   xsetnonblock(connecting_fdpass_sock, 1);
1447   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1448 }
1449
1450 /*---------- assigning articles to conns, and transmitting ----------*/
1451
1452 static Article *dequeue_from(int peek, InputFile *ipf) {
1453   if (!ipf) return 0;
1454   if (peek) return LIST_HEAD(ipf->queue);
1455
1456   Article *art= LIST_REMHEAD(ipf->queue);
1457   if (!art) return 0;
1458   check_reading_pause_resume(ipf);
1459   return art;
1460 }
1461
1462 static Article *dequeue(int peek) {
1463   Article *art;
1464   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1465   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1466   art= dequeue_from(peek, main_input_file);      if (art) return art;
1467   return 0;
1468 }
1469
1470 static void check_assign_articles(void) {
1471   for (;;) {
1472     if (!dequeue(1))
1473       break;
1474
1475     Conn *walk, *use=0;
1476     int spare=0, inqueue=0;
1477
1478     /* Find a connection to offer this article.  We prefer a busy
1479      * connection to an idle one, provided it's not full.  We take the
1480      * first (oldest) and since that's stable, it will mean we fill up
1481      * connections in order.  That way if we have too many
1482      * connections, the spare ones will go away eventually.
1483      */
1484     FOR_CONN(walk) {
1485       if (walk->quitting) continue;
1486       inqueue= walk->sent.count + walk->priority.count
1487              + walk->waiting.count;
1488       spare= walk->max_queue - inqueue;
1489       assert(inqueue <= max_queue_per_conn);
1490       assert(spare >= 0);
1491       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1492       else if (spare>0) /*working*/ { use= walk; break; }
1493     }
1494     if (use) {
1495       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1496       while (spare>0) {
1497         Article *art= dequeue(0);
1498         if (!art) break;
1499         LIST_ADDTAIL(use->waiting, art);
1500         spare--;
1501       }
1502       conn_maybe_write(use);
1503     } else if (allow_connect_start()) {
1504       until_connect= reconnect_delay_periods;
1505       connect_start();
1506       break;
1507     } else {
1508       break;
1509     }
1510   }
1511 }
1512
1513 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1514   conn_maybe_write(u);
1515   return OOP_CONTINUE;
1516 }
1517
1518 static void conn_maybe_write(Conn *conn)  {
1519   for (;;) {
1520     conn_make_some_xmits(conn);
1521     if (!conn->xmitu) {
1522       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1523       return;
1524     }
1525
1526     void *rp= conn_write_some_xmits(conn);
1527     if (rp==OOP_CONTINUE) {
1528       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1529       return;
1530     } else if (rp==OOP_HALT) {
1531       return;
1532     } else if (!rp) {
1533       /* transmitted everything */
1534     } else {
1535       abort();
1536     }
1537   }
1538 }
1539
1540 /*---------- expiry, flow control and deferral ----------*/
1541
1542 static void check_reading_pause_resume(InputFile *ipf) {
1543   if (ipf->queue.count >= max_queue_per_ipf)
1544     inputfile_reading_pause(ipf);
1545   else
1546     inputfile_reading_resume(ipf);
1547 }
1548
1549 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1550   open_defer();
1551   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1552       || fflush(defer))
1553     sysfatal("write to defer file %s",path_defer);
1554   article_done(art, whichcount);
1555 }
1556
1557 static int article_check_expired(Article *art /* must be queued, not conn */) {
1558   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1559   if (artdata) { SMfreearticle(artdata); return 0; }
1560
1561   LIST_REMOVE(art->ipf->queue, art);
1562   art->missing= 1;
1563   art->ipf->count_nooffer_missing++;
1564   article_done(art,-1);
1565   return 1;
1566 }
1567
1568 static void inputfile_queue_check_expired(InputFile *ipf) {
1569   if (!ipf) return;
1570
1571   for (;;) {
1572     Article *art= LIST_HEAD(ipf->queue);
1573     int exp= article_check_expired(art);
1574     if (!exp) break;
1575   }
1576   check_reading_pause_resume(ipf);
1577 }
1578
1579 static void article_autodefer(InputFile *ipf, Article *art) {
1580   ipf->autodefer++;
1581   article_defer(art,-1);
1582 }
1583
1584 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1585   Article *art;
1586   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1587     if (art->ipf == ipf) return 1;
1588   return 0;
1589 }
1590
1591 static void autodefer_input_file_articles(InputFile *ipf) {
1592   Article *art;
1593   while ((art= LIST_REMHEAD(ipf->queue)))
1594     article_autodefer(ipf, art);
1595 }
1596
1597 static void autodefer_input_file(InputFile *ipf) {
1598   ipf->autodefer= 0;
1599
1600   autodefer_input_file_articles(ipf);
1601
1602   if (ipf->inprogress) {
1603     Conn *walk;
1604     FOR_CONN(walk) {
1605       if (has_article_in(&walk->waiting,  ipf) ||
1606           has_article_in(&walk->priority, ipf) ||
1607           has_article_in(&walk->sent,     ipf))
1608         walk->quitting= -1;
1609     }
1610     while (ipf->inprogress) {
1611       FOR_CONN(walk)
1612         if (walk->quitting < 0) goto found;
1613       abort(); /* where are they ?? */
1614
1615     found:
1616       connfail(walk, "connection is stuck or crawling,"
1617                " and we need to finish flush");
1618       autodefer_input_file_articles(ipf);
1619     }
1620   }
1621
1622   check_reading_pause_resume(ipf);
1623 }
1624
1625 /*========== article transmission ==========*/
1626
1627 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1628                   XmitKind kind) { /* caller must then fill in details */
1629   struct iovec *v= &conn->xmit[conn->xmitu];
1630   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1631   v->iov_base= (char*)data;
1632   v->iov_len= len;
1633   d->kind= kind;
1634   return d;
1635 }
1636
1637 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1638   xmit_core(conn,data,len, xk_Const);
1639 }
1640 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1641
1642 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1643   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1644   d->info.sm_art= ah;
1645 }
1646
1647 static void xmit_free(XmitDetails *d) {
1648   switch (d->kind) {
1649   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1650   case xk_Const:                                  break;
1651   default: abort();
1652   }
1653 }
1654
1655 static void *conn_write_some_xmits(Conn *conn) {
1656   /* return values:
1657    *      0:            nothing more to write, no need to call us again
1658    *      OOP_CONTINUE: more to write but fd not writeable
1659    *      OOP_HALT:     disaster, have destroyed conn
1660    */
1661   for (;;) {
1662     int count= conn->xmitu;
1663     if (!count) return 0;
1664
1665     if (count > IOV_MAX) count= IOV_MAX;
1666     ssize_t rs= writev(conn->fd, conn->xmit, count);
1667     if (rs < 0) {
1668       if (isewouldblock(errno)) return OOP_CONTINUE;
1669       connfail(conn, "write failed: %s", strerror(errno));
1670       return OOP_HALT;
1671     }
1672     assert(rs > 0);
1673
1674     int done;
1675     for (done=0; rs && done<conn->xmitu; done++) {
1676       struct iovec *vp= &conn->xmit[done];
1677       XmitDetails *dp= &conn->xmitd[done];
1678       if (rs > vp->iov_len) {
1679         rs -= vp->iov_len;
1680         xmit_free(dp);
1681       } else {
1682         vp->iov_base= (char*)vp->iov_base + rs;
1683         vp->iov_len -= rs;
1684       }
1685     }
1686     int newu= conn->xmitu - done;
1687     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1688     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1689     conn->xmitu= newu;
1690   }
1691 }
1692
1693 static void conn_make_some_xmits(Conn *conn) {
1694   for (;;) {
1695     if (conn->xmitu+5 > CONNIOVS)
1696       break;
1697
1698     Article *art= LIST_REMHEAD(conn->priority);
1699     if (!art) art= LIST_REMHEAD(conn->waiting);
1700     if (!art) break;
1701
1702     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1703       /* actually send it */
1704
1705       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1706
1707       art->state=
1708         art->state == art_Unchecked ? art_Unsolicited :
1709         art->state == art_Wanted    ? art_Wanted      :
1710         (abort(),-1);
1711
1712       if (!artdata) art->missing= 1;
1713       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1714
1715       if (conn->stream) {
1716         if (artdata) {
1717           XMIT_LITERAL("TAKETHIS ");
1718           xmit_noalloc(conn, art->messageid, art->midlen);
1719           XMIT_LITERAL("\r\n");
1720           xmit_artbody(conn, artdata);
1721         } else {
1722           article_done(art, -1);
1723           continue;
1724         }
1725       } else {
1726         /* we got 235 from IHAVE */
1727         if (artdata) {
1728           xmit_artbody(conn, artdata);
1729         } else {
1730           XMIT_LITERAL(".\r\n");
1731         }
1732       }
1733
1734       LIST_ADDTAIL(conn->sent, art);
1735
1736     } else {
1737       /* check it */
1738
1739       if (conn->stream)
1740         XMIT_LITERAL("CHECK ");
1741       else
1742         XMIT_LITERAL("IHAVE ");
1743       xmit_noalloc(conn, art->messageid, art->midlen);
1744       XMIT_LITERAL("\r\n");
1745
1746       assert(art->state == art_Unchecked);
1747       art->ipf->counts[art->state][RC_sent]++;
1748       LIST_ADDTAIL(conn->sent, art);
1749     }
1750   }
1751 }
1752
1753 /*========== handling responses from peer ==========*/
1754
1755 static const oop_rd_style peer_rd_style= {
1756   OOP_RD_DELIM_STRIP, '\n',
1757   OOP_RD_NUL_FORBID,
1758   OOP_RD_SHORTREC_FORBID
1759 };
1760
1761 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1762                          const char *errmsg, int errnoval,
1763                          const char *data, size_t recsz, void *conn_v) {
1764   Conn *conn= conn_v;
1765   connfail(conn, "error receiving from peer: %s", errmsg);
1766   return OOP_CONTINUE;
1767 }
1768
1769 static Article *article_reply_check(Conn *conn, const char *response,
1770                                     int code_indicates_streaming,
1771                                     int must_have_sent
1772                                         /* 1:yes, -1:no, 0:dontcare */,
1773                                     const char *sanitised_response) {
1774   Article *art= LIST_HEAD(conn->sent);
1775
1776   if (!art) {
1777     connfail(conn,
1778              "peer gave unexpected response when no commands outstanding: %s",
1779              sanitised_response);
1780     return 0;
1781   }
1782
1783   if (code_indicates_streaming) {
1784     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1785     if (!conn->stream) {
1786       connfail(conn, "peer gave streaming response code "
1787                " to IHAVE or subsequent body: %s", sanitised_response);
1788       return 0;
1789     }
1790     const char *got_mid= response+4;
1791     int got_midlen= strcspn(got_mid, " \n\r");
1792     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1793       connfail(conn, "peer gave streaming response with syntactically invalid"
1794                " messageid: %s", sanitised_response);
1795       return 0;
1796     }
1797     if (got_midlen != art->midlen ||
1798         memcmp(got_mid, art->messageid, got_midlen)) {
1799       connfail(conn, "peer gave streaming response code to wrong article -"
1800                " probable synchronisation problem; we offered: %s;"
1801                " peer said: %s",
1802                art->messageid, sanitised_response);
1803       return 0;
1804     }
1805   } else {
1806     if (conn->stream) {
1807       connfail(conn, "peer gave non-streaming response code to"
1808                " CHECK/TAKETHIS: %s", sanitised_response);
1809       return 0;
1810     }
1811   }
1812
1813   if (must_have_sent>0 && art->state < art_Wanted) {
1814     connfail(conn, "peer says article accepted but"
1815              " we had not sent the body: %s", sanitised_response);
1816     return 0;
1817   }
1818   if (must_have_sent<0 && art->state >= art_Wanted) {
1819     connfail(conn, "peer says please sent the article but we just did: %s",
1820              sanitised_response);
1821     return 0;
1822   }
1823
1824   Article *art_again= LIST_REMHEAD(conn->sent);
1825   assert(art_again == art);
1826   return art;
1827 }
1828
1829 static void update_nocheck(int accepted) {
1830   accept_proportion *= nocheck_decay;
1831   accept_proportion += accepted * (1.0 - nocheck_decay);
1832   int new_nocheck= accept_proportion >= nocheck_thresh;
1833   if (new_nocheck && !nocheck_reported) {
1834     notice("entering nocheck mode for the first time");
1835     nocheck_reported= 1;
1836   } else if (new_nocheck != nocheck) {
1837     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1838   }
1839   nocheck= new_nocheck;
1840 }
1841
1842 static void article_done(Article *art, int whichcount) {
1843   if (whichcount>=0 && !art->missing)
1844     art->ipf->counts[art->state][whichcount]++;
1845
1846   if (whichcount == RC_accepted) update_nocheck(1);
1847   else if (whichcount == RC_unwanted) update_nocheck(0);
1848
1849   InputFile *ipf= art->ipf;
1850
1851   while (art->blanklen) {
1852     static const char spaces[]=
1853       "                                                                "
1854       "                                                                "
1855       "                                                                "
1856       "                                                                "
1857       "                                                                "
1858       "                                                                "
1859       "                                                                "
1860       "                                                                "
1861       "                                                                ";
1862     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1863     int r= pwrite(ipf->fd, spaces, w, art->offset);
1864     if (r==-1) {
1865       if (errno==EINTR) continue;
1866       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1867              art->messageid, art->blanklen,
1868              (unsigned long)art->offset, ipf->path);
1869     }
1870     assert(r>=0 && r<=w);
1871     art->blanklen -= w;
1872     art->offset += w;
1873   }
1874
1875   ipf->inprogress--;
1876   assert(ipf->inprogress >= 0);
1877   free(art);
1878
1879   if (!ipf->inprogress && ipf != main_input_file)
1880     queue_check_input_done();
1881 }
1882
1883 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1884                         const char *errmsg, int errnoval,
1885                         const char *data, size_t recsz, void *conn_v) {
1886   Conn *conn= conn_v;
1887
1888   if (ev == OOP_RD_EOF) {
1889     connfail(conn, "unexpected EOF from peer");
1890     return OOP_CONTINUE;
1891   }
1892   assert(ev == OOP_RD_OK);
1893
1894   char *sani= sanitise(data,-1);
1895
1896   char *ep;
1897   unsigned long code= strtoul(data, &ep, 10);
1898   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1899     connfail(conn, "badly formatted response from peer: %s", sani);
1900     return OOP_CONTINUE;
1901   }
1902
1903   int conn_busy=
1904     conn->waiting.count ||
1905     conn->priority.count ||
1906     conn->sent.count ||
1907     conn->xmitu;
1908
1909   if (conn->quitting) {
1910     if (code!=205 && code!=503) {
1911       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1912     } else {
1913       notice("C%d idle connection closed by us", conn->fd);
1914       assert(!conn_busy);
1915       LIST_REMOVE(conns,conn);
1916       conn_dispose(conn);
1917     }
1918     return OOP_CONTINUE;
1919   }
1920
1921   conn->since_activity= 0;
1922   Article *art;
1923
1924 #define GET_ARTICLE(musthavesent) do{                                         \
1925     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1926     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1927   }while(0) 
1928
1929 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1930     code_streaming= (streaming);                                \
1931     GET_ARTICLE(musthavesent);                                  \
1932     article_done(art, RC_##how);                                \
1933     goto dealtwith;                                             \
1934   }while(0)
1935
1936 #define PEERBADMSG(m) do {                                      \
1937     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1938   }while(0)
1939
1940   int code_streaming= 0;
1941
1942   switch (code) {
1943
1944   default:  PEERBADMSG("peer sent unexpected message");
1945
1946   case 400:
1947     if (conn_busy)
1948       PEERBADMSG("peer timed us out or stopped accepting articles");
1949
1950     notice("C%d idle connection closed by peer", conn->fd);
1951     LIST_REMOVE(conns,conn);
1952     conn_dispose(conn);
1953     return OOP_CONTINUE;
1954
1955   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1956   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1957
1958   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1959   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1960
1961   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1962   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1963
1964   case 238: /* CHECK says send it */
1965     code_streaming= 1;
1966   case 335: /* IHAVE says send it */
1967     GET_ARTICLE(-1);
1968     assert(art->state == art_Unchecked);
1969     art->ipf->counts[art->state][RC_accepted]++;
1970     art->state= art_Wanted;
1971     LIST_ADDTAIL(conn->priority, art);
1972     break;
1973
1974   case 431: /* CHECK or TAKETHIS says try later */
1975     code_streaming= 1;
1976   case 436: /* IHAVE says try later */
1977     GET_ARTICLE(0);
1978     article_defer(art, RC_deferred);
1979     break;
1980
1981   }
1982 dealtwith:
1983
1984   conn_maybe_write(conn);
1985   check_assign_articles();
1986   return OOP_CONTINUE;
1987 }
1988
1989
1990 /*========== monitoring of input files ==========*/
1991
1992 static void feedfile_eof(InputFile *ipf) {
1993   assert(ipf != main_input_file); /* promised by tailing_try_read */
1994   inputfile_reading_stop(ipf);
1995
1996   if (ipf == flushing_input_file) {
1997     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1998     if (main_input_file) inputfile_reading_start(main_input_file);
1999     statemc_check_flushing_done();
2000   } else if (ipf == backlog_input_file) {
2001     statemc_check_backlog_done();
2002   } else {
2003     abort(); /* supposed to wait rather than get EOF on main input file */
2004   }
2005 }
2006
2007 static InputFile *open_input_file(const char *path) {
2008   int fd= open(path, O_RDWR);
2009   if (fd<0) {
2010     if (errno==ENOENT) return 0;
2011     sysfatal("unable to open input file %s", path);
2012   }
2013   assert(fd>0);
2014
2015   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
2016   memset(ipf,0,sizeof(*ipf));
2017
2018   ipf->fd= fd;
2019   ipf->autodefer= -1;
2020   LIST_INIT(ipf->queue);
2021   strcpy(ipf->path, path);
2022
2023   return ipf;
2024 }
2025
2026 static void close_input_file(InputFile *ipf) { /* does not free */
2027   assert(!ipf->readable_callback); /* must have had ->on_cancel */
2028   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2029   assert(!ipf->rd); /* must have had inputfile_reading_stop */
2030   assert(!ipf->inprogress); /* no dangling pointers pointing here */
2031   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2032 }
2033
2034
2035 /*---------- dealing with articles read in the input file ----------*/
2036
2037 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2038                                    const char *data, const char *how) {
2039   warn("corrupted file: %s, offset %lu: %s: in %s",
2040        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2041   ipf->readcount_err++;
2042   if (ipf->readcount_err > max_bad_data_initial +
2043       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2044     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
2045         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2046   return OOP_CONTINUE;
2047 }
2048
2049 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2050                                oop_rd_event ev, const char *errmsg,
2051                                int errnoval, const char *data, size_t recsz,
2052                                void *ipf_v) {
2053   InputFile *ipf= ipf_v;
2054   assert(ev == OOP_RD_SYSTEM);
2055   errno= errnoval;
2056   sysdie("error reading input file: %s, offset %lu",
2057          ipf->path, (unsigned long)ipf->offset);
2058 }
2059
2060 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2061                                   oop_rd_event ev, const char *errmsg,
2062                                   int errnoval, const char *data, size_t recsz,
2063                                   void *ipf_v) {
2064   InputFile *ipf= ipf_v;
2065   Article *art;
2066   char tokentextbuf[sizeof(TOKEN)*2+3];
2067
2068   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
2069
2070   off_t old_offset= ipf->offset;
2071   ipf->offset += recsz + !!(ev == OOP_RD_OK);
2072
2073 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2074
2075   if (ev==OOP_RD_PARTREC)
2076     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2077     /* but process it anyway */
2078
2079   if (ipf->skippinglong) {
2080     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2081     return OOP_CONTINUE;
2082   }
2083   if (ev==OOP_RD_LONG) {
2084     ipf->skippinglong= 1;
2085     X_BAD_DATA("overly long line");
2086   }
2087
2088   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2089   if (!recsz) X_BAD_DATA("empty line");
2090
2091   if (data[0]==' ') {
2092     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2093     ipf->readcount_blank++;
2094     return OOP_CONTINUE;
2095   }
2096
2097   char *space= strchr(data,' ');
2098   int tokenlen= space-data;
2099   int midlen= (int)recsz-tokenlen-1;
2100   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2101   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2102
2103   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2104   memcpy(tokentextbuf, data, tokenlen);
2105   tokentextbuf[tokenlen]= 0;
2106   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2107
2108   ipf->readcount_ok++;
2109
2110   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2111   memset(art,0,sizeof(*art));
2112   art->state= art_Unchecked;
2113   art->midlen= midlen;
2114   art->ipf= ipf;  ipf->inprogress++;
2115   art->token= TextToToken(tokentextbuf);
2116   art->offset= old_offset;
2117   art->blanklen= recsz;
2118   strcpy(art->messageid, space+1);
2119   LIST_ADDTAIL(ipf->queue, art);
2120
2121   if (ipf->autodefer >= 0)
2122     article_autodefer(ipf, art);
2123   else if (ipf==backlog_input_file)
2124     article_check_expired(art);
2125
2126   if (sms==sm_NORMAL && ipf==main_input_file &&
2127       ipf->offset >= target_max_feedfile_size)
2128     statemc_start_flush("feed file size");
2129
2130   check_assign_articles(); /* may destroy conn but that's OK */
2131   check_reading_pause_resume(ipf);
2132   return OOP_CONTINUE;
2133 }
2134
2135 /*========== tailing input file ==========*/
2136
2137 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2138                                      void *user) {
2139   InputFile *ipf= user;
2140   return ipf->readable_callback(loop, &ipf->readable,
2141                                 ipf->readable_callback_user);
2142 }
2143
2144 static void tailing_on_cancel(struct oop_readable *rable) {
2145   InputFile *ipf= (void*)rable;
2146
2147   if (ipf->filemon) filemon_stop(ipf);
2148   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2149   ipf->readable_callback= 0;
2150 }
2151
2152 static void tailing_queue_readable(InputFile *ipf) {
2153   /* lifetime of ipf here is OK because destruction will cause
2154    * on_cancel which will cancel this callback */
2155   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2156 }
2157
2158 static int tailing_on_readable(struct oop_readable *rable,
2159                                 oop_readable_call *cb, void *user) {
2160   InputFile *ipf= (void*)rable;
2161
2162   tailing_on_cancel(rable);
2163   ipf->readable_callback= cb;
2164   ipf->readable_callback_user= user;
2165   filemon_start(ipf);
2166
2167   tailing_queue_readable(ipf);
2168   return 0;
2169 }
2170
2171 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2172                                 size_t length) {
2173   InputFile *ipf= (void*)rable;
2174   for (;;) {
2175     ssize_t r= read(ipf->fd, buffer, length);
2176     if (r==-1) {
2177       if (errno==EINTR) continue;
2178       return r;
2179     }
2180     if (!r) {
2181       if (ipf==main_input_file) {
2182         errno=EAGAIN;
2183         return -1;
2184       } else if (ipf==flushing_input_file) {
2185         assert(ipf->rd);
2186         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2187       } else if (ipf==backlog_input_file) {
2188         assert(ipf->rd);
2189       } else {
2190         abort();
2191       }
2192     }
2193     tailing_queue_readable(ipf);
2194     return r;
2195   }
2196 }
2197
2198 /*---------- filemon implemented with inotify ----------*/
2199
2200 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2201 #define HAVE_FILEMON
2202
2203 #include <sys/inotify.h>
2204
2205 static int filemon_inotify_fd;
2206 static int filemon_inotify_wdmax;
2207 static InputFile **filemon_inotify_wd2ipf;
2208
2209 struct Filemon_Perfile {
2210   int wd;
2211 };
2212
2213 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2214   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2215   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2216
2217   if (wd >= filemon_inotify_wdmax) {
2218     int newmax= wd+2;
2219     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2220                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2221     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2222            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2223     filemon_inotify_wdmax= newmax;
2224   }
2225
2226   assert(!filemon_inotify_wd2ipf[wd]);
2227   filemon_inotify_wd2ipf[wd]= ipf;
2228
2229   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2230         ipf, wd, filemon_inotify_wdmax);
2231
2232   pf->wd= wd;
2233 }
2234
2235 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2236   int wd= pf->wd;
2237   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2238   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2239   if (r) sysdie("inotify_rm_watch");
2240   filemon_inotify_wd2ipf[wd]= 0;
2241 }
2242
2243 static void *filemon_inotify_readable(oop_source *lp, int fd,
2244                                       oop_event e, void *u) {
2245   struct inotify_event iev;
2246   for (;;) {
2247     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2248     if (r==-1) {
2249       if (isewouldblock(errno)) break;
2250       sysdie("read from inotify master");
2251     } else if (r==sizeof(iev)) {
2252       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2253     } else {
2254       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2255     }
2256     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2257     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2258     filemon_callback(ipf);
2259   }
2260   return OOP_CONTINUE;
2261 }
2262
2263 static int filemon_method_init(void) {
2264   filemon_inotify_fd= inotify_init();
2265   if (filemon_inotify_fd<0) {
2266     syswarn("filemon/inotify: inotify_init failed");
2267     return 0;
2268   }
2269   xsetnonblock(filemon_inotify_fd, 1);
2270   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2271
2272   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2273   return 1;
2274 }
2275
2276 static void filemon_method_dump_info(FILE *f) {
2277   int i;
2278   fprintf(f,"inotify");
2279   DUMPV("%d",,filemon_inotify_fd);
2280   DUMPV("%d",,filemon_inotify_wdmax);
2281   for (i=0; i<filemon_inotify_wdmax; i++)
2282     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2283 }
2284
2285 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2286
2287 /*---------- filemon dummy implementation ----------*/
2288
2289 #if !defined(HAVE_FILEMON)
2290
2291 struct Filemon_Perfile { int dummy; };
2292
2293 static int filemon_method_init(void) {
2294   warn("filemon/dummy: no filemon method compiled in");
2295   return 0;
2296 }
2297 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2298 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2299 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2300
2301 #endif /* !HAVE_FILEMON */
2302
2303 /*---------- filemon generic interface ----------*/
2304
2305 static void filemon_start(InputFile *ipf) {
2306   assert(!ipf->filemon);
2307
2308   NEW(ipf->filemon);
2309   filemon_method_startfile(ipf, ipf->filemon);
2310 }
2311
2312 static void filemon_stop(InputFile *ipf) {
2313   if (!ipf->filemon) return;
2314   filemon_method_stopfile(ipf, ipf->filemon);
2315   free(ipf->filemon);
2316   ipf->filemon= 0;
2317 }
2318
2319 static void filemon_callback(InputFile *ipf) {
2320   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2321     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2322 }
2323
2324 /*---------- interface to start and stop an input file ----------*/
2325
2326 static const oop_rd_style feedfile_rdstyle= {
2327   OOP_RD_DELIM_STRIP, '\n',
2328   OOP_RD_NUL_PERMIT,
2329   OOP_RD_SHORTREC_LONG,
2330 };
2331
2332 static void inputfile_reading_resume(InputFile *ipf) {
2333   if (!ipf->rd) return;
2334   if (!ipf->paused) return;
2335
2336   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2337                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2338   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2339
2340   ipf->paused= 0;
2341 }
2342
2343 static void inputfile_reading_pause(InputFile *ipf) {
2344   if (!ipf->rd) return;
2345   if (ipf->paused) return;
2346   oop_rd_cancel(ipf->rd);
2347   ipf->paused= 1;
2348 }
2349
2350 static void inputfile_reading_start(InputFile *ipf) {
2351   assert(!ipf->rd);
2352   ipf->readable.on_readable= tailing_on_readable;
2353   ipf->readable.on_cancel=   tailing_on_cancel;
2354   ipf->readable.try_read=    tailing_try_read;
2355   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2356   ipf->readable.delete_kill= 0;
2357
2358   ipf->readable_callback= 0;
2359   ipf->readable_callback_user= 0;
2360
2361   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2362   assert(ipf->rd);
2363
2364   ipf->paused= 1;
2365   inputfile_reading_resume(ipf);
2366 }
2367
2368 static void inputfile_reading_stop(InputFile *ipf) {
2369   assert(ipf->rd);
2370   inputfile_reading_pause(ipf);
2371   oop_rd_delete(ipf->rd);
2372   ipf->rd= 0;
2373   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2374 }
2375
2376
2377 /*========== interaction with innd - state machine ==========*/
2378
2379 /* See official state diagram at top of file.  We implement
2380  * this as follows:
2381  * -8<-
2382
2383             .=======.
2384             ||START||
2385             `======='
2386                 |
2387                 | open F
2388                 |
2389                 |    F ENOENT
2390                 |`---------------------------------------------------.
2391       F OPEN OK |                                                    |
2392                 |`---------------- - - -                             |
2393        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2394                 |                  for full startup logic            |
2395      ,--------->|                                                    |
2396      |          V                                                    |
2397      |     ============                                       try to |
2398      |      NORMAL                                            open D |
2399      |     [Normal]                                                  |
2400      |      main F tail                                              |
2401      |     ============                                              V
2402      |          |                                                    |
2403      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2404      ^          | hardlink F to D                                    |
2405      |     [Hardlinked]                                              |
2406      |          | unlink F                                           |
2407      |          | our handle onto F is now onto D                    |
2408      |     [Moved]                                                   |
2409      |          |                                                    |
2410      |          |<-------------------<---------------------<---------+
2411      |          |                                                    |
2412      |          | spawn inndcomm flush                               |
2413      |          V                                                    |
2414      |     ==================                                        |
2415      |      FLUSHING[-ABSENT]                                        |
2416      |     [Flushing]                                                |
2417      |     main D tail/none                                          |
2418      |     ==================                                        |
2419      |          |                                                    |
2420      |          |   INNDCOMM FLUSH FAILS                             ^
2421      |          |`----------------------->----------.                |
2422      |          |                                   |                |
2423      |          |   NO SUCH SITE                    V                |
2424      ^          |`--------------->----.         ==================== |
2425      |          |                      \        FLUSHFAILED[-ABSENT] |
2426      |          |                       \         [Moved]            |
2427      |          | FLUSH OK               \       main D tail/none    |
2428      |          | open F                  \     ==================== |
2429      |          |                          \        |                |
2430      |          |                           \       | TIME TO RETRY  |
2431      |          |`------->----.     ,---<---'\      `----------------'
2432      |          |    D NONE   |     | D NONE  `----.
2433      |          V             |     |              V
2434      |     =============      V     V             ============
2435      |      SEPARATED-1       |     |              DROPPING-1
2436      |      flsh->rd!=0       |     |              flsh->rd!=0
2437      |     [Separated]        |     |             [Dropping]
2438      |      main F idle       |     |              main none
2439      |      flsh D tail       |     |              flsh D tail
2440      |     =============      |     |             ============
2441      |          |             |     | install       |
2442      ^          | EOF ON D    |     |  defer        | EOF ON D
2443      |          V             |     |               V
2444      |     ===============    |     |             ===============
2445      |      SEPARATED-2       |     |              DROPPING-2
2446      |      flsh->rd==0       |     V              flsh->rd==0
2447      |     [Finishing]        |     |             [Dropping]
2448      |      main F tail       |     `.             main none
2449      |      flsh D closed     |       `.           flsh D closed
2450      |     ===============    V         `.        ===============
2451      |          |                         `.          |
2452      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2453      |          V install defer as backlog    `.      | install defer
2454      ^          | close D                       `.    | close D
2455      |          | unlink D                        `.  | unlink D
2456      |          |                                  |  |
2457      |          |                                  V  V
2458      `----------'                               ==============
2459                                                  DROPPED
2460                                                 [Dropped]
2461                                                  main none
2462                                                  flsh none
2463                                                  some backlog
2464                                                 ==============
2465                                                       |
2466                                                       | ALL BACKLOG DONE
2467                                                       |
2468                                                       | unlink lock
2469                                                       | exit
2470                                                       V
2471                                                   ==========
2472                                                    (ESRCH)
2473                                                   [Droppped]
2474                                                   ==========
2475  * ->8-
2476  */
2477
2478 static void startup_set_input_file(InputFile *f) {
2479   assert(!main_input_file);
2480   main_input_file= f;
2481   inputfile_reading_start(f);
2482 }
2483
2484 static void statemc_lock(void) {
2485   int lockfd;
2486   struct stat stab, stabf;
2487   
2488   for (;;) {
2489     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2490     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2491
2492     struct flock fl;
2493     memset(&fl,0,sizeof(fl));
2494     fl.l_type= F_WRLCK;
2495     fl.l_whence= SEEK_SET;
2496     int r= fcntl(lockfd, F_SETLK, &fl);
2497     if (r==-1) {
2498       if (errno==EACCES || isewouldblock(errno)) {
2499         if (quiet_multiple) exit(0);
2500         fatal("another duct holds the lockfile");
2501       }
2502       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2503     }
2504
2505     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2506     int lock_noent;
2507     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2508
2509     if (!lock_noent && samefile(&stab, &stabf))
2510       break;
2511
2512     xclose(lockfd, "stale lockfile ", path_lock);
2513   }
2514
2515   FILE *lockfile= fdopen(lockfd, "w");
2516   if (!lockfile) sysdie("fdopen lockfile");
2517
2518   int r= ftruncate(lockfd, 0);
2519   if (r) sysdie("truncate lockfile to write new info");
2520
2521   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2522               (unsigned long)self_pid,
2523               sitename, feedfile, remote_host) == EOF ||
2524       fflush(lockfile))
2525     sysfatal("write info to lockfile %s", path_lock);
2526
2527   debug("startup: locked");
2528 }
2529
2530 static void statemc_init(void) {
2531   struct stat stabdefer;
2532
2533   search_backlog_file();
2534
2535   int defer_noent;
2536   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2537   if (defer_noent) {
2538     debug("startup: ductdefer ENOENT");
2539   } else {
2540     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2541     switch (stabdefer.st_nlink==1) {
2542     case 1:
2543       open_defer(); /* so that we will later close it and rename it */
2544       break;
2545     case 2:
2546       xunlink(path_defer, "stale defer file link"
2547               " (presumably hardlink to backlog file)");
2548       break;
2549     default:
2550       die("defer file %s has unexpected link count %d",
2551           path_defer, stabdefer.st_nlink);
2552     }
2553   }
2554
2555   struct stat stab_f, stab_d;
2556   int noent_f;
2557
2558   InputFile *file_d= open_input_file(path_flushing);
2559   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2560
2561   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2562
2563   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2564     debug("startup: F==D => Hardlinked");
2565     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2566     noent_f= 1;
2567   }
2568
2569   if (noent_f) {
2570     debug("startup: F ENOENT => Moved");
2571     if (file_d) startup_set_input_file(file_d);
2572     spawn_inndcomm_flush("feedfile missing at startup");
2573     /* => Flushing, sms:=FLUSHING */
2574   } else {
2575     if (file_d) {
2576       debug("startup: F!=D => Separated");
2577       startup_set_input_file(file_d);
2578       flushing_input_file= main_input_file;
2579       main_input_file= open_input_file(feedfile);
2580       if (!main_input_file) die("feedfile vanished during startup");
2581       SMS(SEPARATED, max_separated_periods,
2582           "found both old and current feed files");
2583     } else {
2584       debug("startup: F exists, D ENOENT => Normal");
2585       InputFile *file_f= open_input_file(feedfile);
2586       if (!file_f) die("feed file vanished during startup");
2587       startup_set_input_file(file_f);
2588       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2589     }
2590   }
2591 }
2592
2593 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2594   assert(sms == sm_NORMAL);
2595
2596   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2597         why,
2598         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2599         (unsigned long)target_max_feedfile_size,
2600         until_flush);
2601
2602   int r= link(feedfile, path_flushing);
2603   if (r) sysfatal("link feedfile %s to flushing file %s",
2604                   feedfile, path_flushing);
2605   /* => Hardlinked */
2606
2607   xunlink(feedfile, "old feedfile link");
2608   /* => Moved */
2609
2610   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2611 }
2612
2613 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2614   switch (sms) {
2615
2616   case sm_NORMAL:
2617     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2618     return 1;
2619
2620   case sm_FLUSHFAILED:
2621     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2622     return 1;
2623
2624   case sm_SEPARATED:
2625   case sm_DROPPING:
2626     warn("took too long to complete old feedfile after flush, autodeferring");
2627     assert(flushing_input_file);
2628     autodefer_input_file(flushing_input_file);
2629     return 1;
2630
2631   default:
2632     return 0;
2633   }
2634 }
2635
2636 static void statemc_period_poll(void) {
2637   if (!until_flush) return;
2638   until_flush--;
2639   assert(until_flush>=0);
2640
2641   if (until_flush) return;
2642   int ok= trigger_flush_ok();
2643   assert(ok);
2644 }
2645
2646 static int inputfile_is_done(InputFile *ipf) {
2647   if (!ipf) return 0;
2648   if (ipf->inprogress) return 0; /* new article in the meantime */
2649   if (ipf->rd) return 0; /* not had EOF */
2650   return 1;
2651 }
2652
2653 static void notice_processed(InputFile *ipf, int completed,
2654                              const char *what, const char *spec) {
2655   if (!ipf) return; /* allows preterminate to be lazy */
2656
2657 #define RCI_NOTHING(x) /* nothing */
2658 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2659 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2660
2661 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2662
2663   char *inprog= completed
2664     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2665     : xasprintf(" inprogress=%ld", ipf->inprogress);
2666   char *autodefer= ipf->autodefer >= 0
2667     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2668     : xasprintf("%s","");
2669
2670   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2671        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2672        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2673        ,
2674        completed?"completed":"processed", what, spec,
2675        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2676        inprog, autodefer, ipf->count_nooffer_missing,
2677        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2678        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2679        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2680        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2681        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2682        );
2683
2684   free(inprog);
2685   free(autodefer);
2686
2687 #undef CNT
2688 }
2689
2690 static void statemc_check_backlog_done(void) {
2691   InputFile *ipf= backlog_input_file;
2692   if (!inputfile_is_done(ipf)) return;
2693
2694   const char *slash= strrchr(ipf->path, '/');
2695   const char *leaf= slash ? slash+1 : ipf->path;
2696   const char *under= strchr(slash, '_');
2697   const char *rest= under ? under+1 : leaf;
2698   if (!strncmp(rest,"backlog",7)) rest += 7;
2699   notice_processed(ipf,1,"backlog ",rest);
2700
2701   close_input_file(ipf);
2702   if (unlink(ipf->path)) {
2703     if (errno != ENOENT)
2704       sysdie("could not unlink processed backlog file %s", ipf->path);
2705     warn("backlog file %s vanished while we were reading it"
2706          " so we couldn't remove it (but it's done now, anyway)",
2707          ipf->path);
2708   }
2709   free(ipf);
2710   backlog_input_file= 0;
2711   search_backlog_file();
2712   return;
2713 }
2714
2715 static void statemc_check_flushing_done(void) {
2716   InputFile *ipf= flushing_input_file;
2717   if (!inputfile_is_done(ipf)) return;
2718
2719   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2720
2721   notice_processed(ipf,1,"feedfile","");
2722
2723   close_defer();
2724
2725   xunlink(path_flushing, "old flushing file");
2726
2727   close_input_file(flushing_input_file);
2728   free(flushing_input_file);
2729   flushing_input_file= 0;
2730
2731   if (sms==sm_SEPARATED) {
2732     notice("flush complete");
2733     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2734   } else if (sms==sm_DROPPING) {
2735     SMS(DROPPED, max_separated_periods, "old flush complete");
2736     search_backlog_file();
2737     notice("feed dropped, but will continue until backlog is finished");
2738   }
2739 }
2740
2741 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2742                                       void *u) {
2743   assert(!inputfile_is_done(main_input_file));
2744   statemc_check_flushing_done();
2745   statemc_check_backlog_done();
2746   return OOP_CONTINUE;
2747 }
2748
2749 static void queue_check_input_done(void) {
2750   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2751 }
2752
2753 static void statemc_setstate(StateMachineState newsms, int periods,
2754                              const char *forlog, const char *why) {
2755   sms= newsms;
2756   until_flush= periods;
2757
2758   const char *xtra= "";
2759   switch (sms) {
2760   case sm_FLUSHING:
2761   case sm_FLUSHFAILED:
2762     if (!main_input_file) xtra= "-ABSENT";
2763     break;
2764   case sm_SEPARATED:
2765   case sm_DROPPING:
2766     xtra= flushing_input_file->rd ? "-1" : "-2";
2767     break;
2768   default:;
2769   }
2770
2771   if (periods) {
2772     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2773   } else {
2774     info("state %s%s %s",forlog,xtra,why);
2775   }
2776 }
2777
2778 /*---------- defer and backlog files ----------*/
2779
2780 static void open_defer(void) {
2781   struct stat stab;
2782
2783   if (defer) return;
2784
2785   defer= fopen(path_defer, "a+");
2786   if (!defer) sysfatal("could not open defer file %s", path_defer);
2787
2788   /* truncate away any half-written records */
2789
2790   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2791
2792   if (stab.st_size > LONG_MAX)
2793     die("defer file %s size is far too large", path_defer);
2794
2795   if (!stab.st_size)
2796     return;
2797
2798   long orgsize= stab.st_size;
2799   long truncto= stab.st_size;
2800   for (;;) {
2801     if (!truncto) break; /* was only (if anything) one half-truncated record */
2802     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2803       sysdie("seek in defer file %s while truncating partial", path_defer);
2804
2805     int r= getc(defer);
2806     if (r==EOF) {
2807       if (ferror(defer))
2808         sysdie("failed read from defer file %s", path_defer);
2809       else
2810         die("defer file %s shrank while we were checking it!", path_defer);
2811     }
2812     if (r=='\n') break;
2813     truncto--;
2814   }
2815
2816   if (stab.st_size != truncto) {
2817     warn("truncating half-record at end of defer file %s -"
2818          " shrinking by %ld bytes from %ld to %ld",
2819          path_defer, orgsize - truncto, orgsize, truncto);
2820
2821     if (fflush(defer))
2822       sysfatal("could not flush defer file %s", path_defer);
2823     if (ftruncate(fileno(defer), truncto))
2824       sysdie("could not truncate defer file %s", path_defer);
2825
2826   } else {
2827     info("continuing existing defer file %s (%ld bytes)",
2828          path_defer, orgsize);
2829   }
2830   if (fseek(defer, truncto, SEEK_SET))
2831     sysdie("could not seek to new end of defer file %s", path_defer);
2832 }
2833
2834 static void close_defer(void) {
2835   if (!defer)
2836     return;
2837
2838   struct stat stab;
2839   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2840
2841   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2842   defer= 0;
2843
2844   time_t now= xtime();
2845
2846   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2847                            (unsigned long)now,
2848                            (unsigned long)stab.st_ino);
2849   if (link(path_defer, backlog))
2850     sysfatal("could not install defer file %s as backlog file %s",
2851            path_defer, backlog);
2852   if (unlink(path_defer))
2853     sysdie("could not unlink old defer link %s to backlog file %s",
2854            path_defer, backlog);
2855
2856   free(backlog);
2857
2858   if (until_backlog_nextscan < 0 ||
2859       until_backlog_nextscan > backlog_retry_minperiods + 1)
2860     until_backlog_nextscan= backlog_retry_minperiods + 1;
2861 }
2862
2863 static void poll_backlog_file(void) {
2864   if (until_backlog_nextscan < 0) return;
2865   if (until_backlog_nextscan-- > 0) return;
2866   search_backlog_file();
2867 }
2868
2869 static void search_backlog_file(void) {
2870   /* returns non-0 iff there are any backlog files */
2871
2872   glob_t gl;
2873   int r, i;
2874   struct stat stab;
2875   const char *oldest_path=0;
2876   time_t oldest_mtime=0, now;
2877
2878   if (backlog_input_file) return;
2879
2880  try_again:
2881
2882   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2883
2884   switch (r) {
2885   case GLOB_ABORTED:
2886     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2887   case GLOB_NOSPACE:
2888     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2889   case 0:
2890     for (i=0; i<gl.gl_pathc; i++) {
2891       const char *path= gl.gl_pathv[i];
2892
2893       if (strchr(path,'#') || strchr(path,'~')) {
2894         debug("backlog file search skipping %s", path);
2895         continue;
2896       }
2897       r= stat(path, &stab);
2898       if (r) {
2899         syswarn("failed to stat backlog file %s", path);
2900         continue;
2901       }
2902       if (!S_ISREG(stab.st_mode)) {
2903         warn("backlog file %s is not a plain file (or link to one)", path);
2904         continue;
2905       }
2906       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2907         oldest_path= path;
2908         oldest_mtime= stab.st_mtime;
2909       }
2910     }
2911   case GLOB_NOMATCH: /* fall through */
2912     break;
2913   default:
2914     sysdie("glob expansion of backlog pattern %s gave unexpected"
2915            " nonzero (error?) return value %d", globpat_backlog, r);
2916   }
2917
2918   if (!oldest_path) {
2919     debug("backlog scan: none");
2920
2921     if (sms==sm_DROPPED) {
2922       preterminate();
2923       notice("feed dropped and our work is complete");
2924
2925       int r= unlink(path_control);
2926       if (r && errno!=ENOENT)
2927         syswarn("failed to remove control symlink for old feed");
2928
2929       xunlink(path_lock,    "lockfile for old feed");
2930       exit(4);
2931     }
2932     until_backlog_nextscan= backlog_spontrescan_periods;
2933     goto xfree;
2934   }
2935
2936   now= xtime();
2937   double age= difftime(now, oldest_mtime);
2938   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2939
2940   if (age_deficiency <= 0) {
2941     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2942           age, age_deficiency, oldest_path);
2943
2944     backlog_input_file= open_input_file(oldest_path);
2945     if (!backlog_input_file) {
2946       warn("backlog file %s vanished as we opened it", oldest_path);
2947       globfree(&gl);
2948       goto try_again;
2949     }
2950     inputfile_reading_start(backlog_input_file);
2951     until_backlog_nextscan= -1;
2952     goto xfree;
2953   }
2954
2955   until_backlog_nextscan= age_deficiency / period_seconds;
2956
2957   if (backlog_spontrescan_periods >= 0 &&
2958       until_backlog_nextscan > backlog_spontrescan_periods)
2959     until_backlog_nextscan= backlog_spontrescan_periods;
2960
2961   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2962         age, age_deficiency, until_backlog_nextscan, oldest_path);
2963
2964  xfree:
2965   globfree(&gl);
2966   return;
2967 }
2968
2969 /*---------- shutdown and signal handling ----------*/
2970
2971 static void preterminate(void) {
2972   if (in_child) return;
2973   notice_processed(main_input_file,0,"feedfile","");
2974   notice_processed(flushing_input_file,0,"flushing","");
2975   if (backlog_input_file)
2976     notice_processed(backlog_input_file,0, "backlog file ",
2977                      backlog_input_file->path);
2978 }
2979
2980 static int signal_self_pipe[2];
2981 static sig_atomic_t terminate_sig_flag;
2982
2983 static void raise_default(int signo) {
2984   xsigsetdefault(signo);
2985   raise(signo);
2986   abort();
2987 }
2988
2989 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2990   assert(fd=signal_self_pipe[0]);
2991   char buf[PIPE_BUF];
2992   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2993   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2994   if (r==0) die("eof on signal self pipe");
2995   if (terminate_sig_flag) {
2996     preterminate();
2997     notice("terminating (%s)", strsignal(terminate_sig_flag));
2998     raise_default(terminate_sig_flag);
2999   }
3000   return OOP_CONTINUE;
3001 }
3002
3003 static void sigarrived_handler(int signum) {
3004   static char x;
3005   switch (signum) {
3006   case SIGTERM:
3007   case SIGINT:
3008     if (!terminate_sig_flag) terminate_sig_flag= signum;
3009     break;
3010   default:
3011     abort();
3012   }
3013   write(signal_self_pipe[1],&x,1);
3014 }
3015
3016 static void init_signals(void) {
3017   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
3018     sysdie("could not ignore SIGPIPE");
3019
3020   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
3021
3022   xsetnonblock(signal_self_pipe[0],1);
3023   xsetnonblock(signal_self_pipe[1],1);
3024
3025   struct sigaction sa;
3026   memset(&sa,0,sizeof(sa));
3027   sa.sa_handler= sigarrived_handler;
3028   sa.sa_flags= SA_RESTART;
3029   xsigaction(SIGTERM,&sa);
3030   xsigaction(SIGINT,&sa);
3031
3032   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
3033 }
3034
3035 /*========== flushing the feed ==========*/
3036
3037 static pid_t inndcomm_child;
3038 static int inndcomm_sentinel_fd;
3039
3040 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
3041   assert(inndcomm_child);
3042   assert(fd == inndcomm_sentinel_fd);
3043   int status= xwaitpid(&inndcomm_child, "inndcomm");
3044   inndcomm_child= 0;
3045   
3046   cancel_fd_read_except(fd);
3047   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3048   inndcomm_sentinel_fd= 0;
3049
3050   assert(!flushing_input_file);
3051
3052   if (WIFEXITED(status)) {
3053     switch (WEXITSTATUS(status)) {
3054
3055     case INNDCOMMCHILD_ESTATUS_FAIL:
3056       goto failed;
3057
3058     case INNDCOMMCHILD_ESTATUS_NONESUCH:
3059       notice("feed has been dropped by innd, finishing up");
3060       flushing_input_file= main_input_file;
3061       tailing_queue_readable(flushing_input_file);
3062         /* we probably previously returned EAGAIN from our fake read method
3063          * when in fact we were at EOF, so signal another readable event
3064          * so we actually see the EOF */
3065
3066       main_input_file= 0;
3067
3068       if (flushing_input_file) {
3069         SMS(DROPPING, max_separated_periods,
3070             "feed dropped by innd, but must finish last flush");
3071       } else {
3072         close_defer();
3073         SMS(DROPPED, 0, "feed dropped by innd");
3074         search_backlog_file();
3075       }
3076       return OOP_CONTINUE;
3077
3078     case 0:
3079       /* as above */
3080       flushing_input_file= main_input_file;
3081       tailing_queue_readable(flushing_input_file);
3082
3083       main_input_file= open_input_file(feedfile);
3084       if (!main_input_file)
3085         die("flush succeeded but feedfile %s does not exist!", feedfile);
3086
3087       if (flushing_input_file) {
3088         SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3089       } else {
3090         close_defer();
3091         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3092       }
3093       return OOP_CONTINUE;
3094
3095     default:
3096       goto unexpected_exitstatus;
3097
3098     }
3099   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3100     warn("flush timed out trying to talk to innd");
3101     goto failed;
3102   } else {
3103   unexpected_exitstatus:
3104     report_child_status("inndcomm child", status);
3105   }
3106
3107  failed:
3108   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3109   return OOP_CONTINUE;
3110 }
3111
3112 static void inndcommfail(const char *what) {
3113   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3114   exit(INNDCOMMCHILD_ESTATUS_FAIL);
3115 }
3116
3117 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3118   int pipefds[2];
3119
3120   notice("flushing %s",why);
3121
3122   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3123   assert(!inndcomm_child);
3124   assert(!inndcomm_sentinel_fd);
3125
3126   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3127
3128   inndcomm_child= xfork("inndcomm child");
3129
3130   if (!inndcomm_child) {
3131     const char *flushargv[2]= { sitename, 0 };
3132     char *reply;
3133     int r;
3134
3135     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3136     /* parent spots the autoclose of pipefds[1] when we die or exit */
3137
3138     if (simulate_flush>=0) {
3139       warn("SIMULATING flush child status %d", simulate_flush);
3140       if (simulate_flush>128) raise(simulate_flush-128);
3141       else exit(simulate_flush);
3142     }
3143
3144     alarm(inndcomm_flush_timeout);
3145     r= ICCopen();                         if (r)   inndcommfail("connect");
3146     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3147     if (!r) exit(0); /* yay! */
3148
3149     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3150     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3151     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3152   }
3153
3154   simulate_flush= -1;
3155
3156   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3157   inndcomm_sentinel_fd= pipefds[0];
3158   assert(inndcomm_sentinel_fd);
3159   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3160
3161   SMS(FLUSHING, 0, why);
3162 }
3163
3164 /*========== main program ==========*/
3165
3166 static void postfork_inputfile(InputFile *ipf) {
3167   if (!ipf) return;
3168   xclose(ipf->fd, "(in child) input file ", ipf->path);
3169 }
3170
3171 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3172   /* we have no stdio streams that are buffered long-term */
3173   if (!f) return;
3174   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3175 }
3176
3177 static void postfork(void) {
3178   in_child= 1;
3179
3180   xsigsetdefault(SIGTERM);
3181   xsigsetdefault(SIGINT);
3182   xsigsetdefault(SIGPIPE);
3183   if (terminate_sig_flag) raise(terminate_sig_flag);
3184
3185   postfork_inputfile(main_input_file);
3186   postfork_inputfile(flushing_input_file);
3187
3188   Conn *conn;
3189   FOR_CONN(conn)
3190     conn_closefd(conn,"(in child) ");
3191
3192   postfork_stdio(defer, "defer file ", path_defer);
3193 }
3194
3195 typedef struct Every Every;
3196 struct Every {
3197   struct timeval interval;
3198   int fixed_rate;
3199   void (*f)(void);
3200 };
3201
3202 static void every_schedule(Every *e, struct timeval base);
3203
3204 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3205   Every *e= e_v;
3206   e->f();
3207   if (!e->fixed_rate) xgettimeofday(&base);
3208   every_schedule(e, base);
3209   return OOP_CONTINUE;
3210 }
3211
3212 static void every_schedule(Every *e, struct timeval base) {
3213   struct timeval when;
3214   timeradd(&base, &e->interval, &when);
3215   loop->on_time(loop, when, every_happens, e);
3216 }
3217
3218 static void every(int interval, int fixed_rate, void (*f)(void)) {
3219   NEW_DECL(Every *,e);
3220   e->interval.tv_sec= interval;
3221   e->interval.tv_usec= 0;
3222   e->fixed_rate= fixed_rate;
3223   e->f= f;
3224   struct timeval now;
3225   xgettimeofday(&now);
3226   every_schedule(e, now);
3227 }
3228
3229 static void filepoll(void) {
3230   filemon_callback(main_input_file);
3231   filemon_callback(flushing_input_file);
3232 }
3233
3234 static char *debug_report_ipf(InputFile *ipf) {
3235   if (!ipf) return xasprintf("none");
3236
3237   const char *slash= strrchr(ipf->path,'/');
3238   const char *path= slash ? slash+1 : ipf->path;
3239
3240   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
3241                    ipf, path,
3242                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
3243                    (long)ipf->offset, ipf->fd,
3244                    ipf->rd ? "" : ",!rd",
3245                    ipf->skippinglong ? "*skiplong" : "",
3246                    ipf->rd && ipf->paused ? "*paused" : "");
3247 }
3248
3249 static void period(void) {
3250   char *dipf_main=     debug_report_ipf(main_input_file);
3251   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3252   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3253
3254   debug("PERIOD"
3255         " sms=%s[%d] conns=%d until_connect=%d"
3256         " input_files main:%s flushing:%s backlog:%s[%d]"
3257         " children connecting=%ld inndcomm=%ld"
3258         ,
3259         sms_names[sms], until_flush, conns.count, until_connect,
3260         dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
3261         (long)connecting_child, (long)inndcomm_child
3262         );
3263
3264   free(dipf_main);
3265   free(dipf_flushing);
3266   free(dipf_backlog);
3267
3268   if (until_connect) until_connect--;
3269
3270   inputfile_queue_check_expired(backlog_input_file);
3271   poll_backlog_file();
3272   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3273   statemc_period_poll();
3274   check_assign_articles();
3275   check_idle_conns();
3276 }
3277
3278
3279 /*========== dumping state ==========*/
3280
3281 static void dump_article_list(FILE *f, const ControlCommand *c,
3282                               const ArticleList *al) {
3283   fprintf(f, " count=%d\n", al->count);
3284   if (!c->xval) return;
3285   
3286   int i; Article *art;
3287   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3288     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3289     DUMPV("%p", art->,ipf);
3290     DUMPV("%d", art->,missing);
3291     DUMPV("%lu", (unsigned long)art->,offset);
3292     DUMPV("%d", art->,blanklen);
3293     DUMPV("%d", art->,midlen);
3294     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3295   }
3296 }
3297   
3298 static void dump_input_file(FILE *f, const ControlCommand *c,
3299                             InputFile *ipf, const char *wh) {
3300   char *dipf= debug_report_ipf(ipf);
3301   fprintf(f,"input %s %s", wh, dipf);
3302   free(dipf);
3303   
3304   if (ipf) {
3305     DUMPV("%d", ipf->,readcount_ok);
3306     DUMPV("%d", ipf->,readcount_blank);
3307     DUMPV("%d", ipf->,readcount_err);
3308   }
3309   fprintf(f,"\n");
3310   if (ipf) {
3311     ArtState state; const char *const *statename; 
3312     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3313 #define RC_DUMP_FMT(x) " " #x "=%d"
3314 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3315       fprintf(f,"input %s counts %-11s"
3316               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3317               wh, *statename
3318               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3319     }
3320     fprintf(f,"input %s queue", wh);
3321     dump_article_list(f,c,&ipf->queue);
3322   }
3323 }
3324
3325 CCMD(dump) {
3326   int i;
3327   fprintf(cc->out, "dumping state to %s\n", path_dump);
3328   FILE *f= fopen(path_dump, "w");
3329   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3330
3331   fprintf(f,"general");
3332   DUMPV("%s", sms_names,[sms]);
3333   DUMPV("%d", ,until_flush);
3334   DUMPV("%ld", (long),self_pid);
3335   DUMPV("%p", , defer);
3336   DUMPV("%d", , until_connect);
3337   DUMPV("%d", , until_backlog_nextscan);
3338   DUMPV("%d", , simulate_flush);
3339   fprintf(f,"\nnocheck");
3340   DUMPV("%#.10f", , accept_proportion);
3341   DUMPV("%d", , nocheck);
3342   DUMPV("%d", , nocheck_reported);
3343   fprintf(f,"\n");
3344
3345   fprintf(f,"special");
3346   DUMPV("%ld", (long),connecting_child);
3347   DUMPV("%d", , connecting_fdpass_sock);
3348   DUMPV("%d", , control_master);
3349   fprintf(f,"\n");
3350
3351   fprintf(f,"filemon ");
3352   filemon_method_dump_info(f);
3353
3354   dump_input_file(f,c, main_input_file,     "main"    );
3355   dump_input_file(f,c, flushing_input_file, "flushing");
3356   dump_input_file(f,c, backlog_input_file,  "backlog" );
3357
3358   fprintf(f,"conns count=%d\n", conns.count);
3359
3360   Conn *conn;
3361   FOR_CONN(conn) {
3362
3363     fprintf(f,"C%d",conn->fd);
3364     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3365     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3366     DUMPV("%d",conn->,since_activity);
3367     fprintf(f,"\n");
3368
3369     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3370     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3371     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3372
3373     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3374     for (i=0; i<conn->xmitu; i++) {
3375       const struct iovec *iv= &conn->xmit[i];
3376       const XmitDetails *xd= &conn->xmitd[i];
3377       char *dinfo;
3378       switch (xd->kind) {
3379       case xk_Const:    dinfo= xasprintf("Const");                 break;
3380       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3381       default:
3382         abort();
3383       }
3384       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3385               sanitise(iv->iov_base, iv->iov_len));
3386       free(dinfo);
3387     }
3388   }
3389
3390   fprintf(f,"paths");
3391   DUMPV("%s", , path_lock);
3392   DUMPV("%s", , path_flushing);
3393   DUMPV("%s", , path_defer);
3394   DUMPV("%s", , path_control);
3395   DUMPV("%s", , path_dump);
3396   DUMPV("%s", , globpat_backlog);
3397   fprintf(f,"\n");
3398
3399   if (!!ferror(f) + !!fclose(f)) {
3400     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3401     return;
3402   }
3403 }
3404
3405 /*========== option parsing ==========*/
3406
3407 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3408 static void vbadusage(const char *fmt, va_list al) {
3409   char *m= xvasprintf(fmt,al);
3410   fprintf(stderr, "bad usage: %s\n"
3411           "say --help for help, or read the manpage\n",
3412           m);
3413   if (become_daemon)
3414     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3415   exit(8);
3416 }
3417
3418 /*---------- generic option parser ----------*/
3419
3420 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3421 static void badusage(const char *fmt, ...) {
3422   va_list al;
3423   va_start(al,fmt);
3424   vbadusage(fmt,al);
3425 }
3426
3427 enum OptFlags {
3428   of_seconds= 001000u,
3429   of_boolean= 002000u,
3430 };
3431
3432 typedef struct Option Option;
3433 typedef void OptionParser(const Option*, const char *val);
3434
3435 struct Option {
3436   int shrt;
3437   const char *lng, *formarg;
3438   void *store;
3439   OptionParser *fn;
3440   int intval;
3441 };
3442
3443 static void parse_options(const Option *options, char ***argvp) {
3444   /* on return *argvp is first non-option arg; argc is not updated */
3445
3446   for (;;) {
3447     const char *arg= *++(*argvp);
3448     if (!arg) break;
3449     if (*arg != '-') break;
3450     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3451     int a;
3452     while ((a= *++arg)) {
3453       const Option *o;
3454       if (a=='-') {
3455         arg++;
3456         char *equals= strchr(arg,'=');
3457         int len= equals ? (equals - arg) : strlen(arg);
3458         for (o=options; o->shrt || o->lng; o++)
3459           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3460             goto found_long;
3461         badusage("unknown long option --%s",arg);
3462       found_long:
3463         if (!o->formarg) {
3464           if (equals) badusage("option --%s does not take a value",o->lng);
3465           arg= 0;
3466         } else if (equals) {
3467           arg= equals+1;
3468         } else {
3469           arg= *++(*argvp);
3470           if (!arg) badusage("option --%s needs a value for %s",
3471                              o->lng, o->formarg);
3472         }
3473         o->fn(o, arg);
3474         break; /* eaten the whole argument now */
3475       }
3476       for (o=options; o->shrt || o->lng; o++)
3477         if (a == o->shrt)
3478           goto found_short;
3479       badusage("unknown short option -%c",a);
3480     found_short:
3481       if (!o->formarg) {
3482         o->fn(o,0);
3483       } else {
3484         if (!*++arg) {
3485           arg= *++(*argvp);
3486           if (!arg) badusage("option -%c needs a value for %s",
3487                              o->shrt, o->formarg);
3488         }
3489         o->fn(o,arg);
3490         break; /* eaten the whole argument now */
3491       }
3492     }
3493   }
3494 }
3495
3496 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3497
3498 static void print_options(const Option *options, FILE *f) {
3499   const Option *o;
3500   for (o=options; o->shrt || o->lng; o++) {
3501     char shrt[2] = { o->shrt, 0 };
3502     char *optspec= xasprintf("%s%s%s%s%s",
3503                              o->shrt ? "-" : "", shrt,
3504                              o->shrt && o->lng ? "|" : "",
3505                              DELIMPERHAPS("--", o->lng));
3506     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3507     free(optspec);
3508   }
3509 }
3510
3511 /*---------- specific option types ----------*/
3512
3513 static void op_integer(const Option *o, const char *val) {
3514   char *ep;
3515   errno= 0;
3516   unsigned long ul= strtoul(val,&ep,10);
3517   if (*ep || ep==val || errno || ul>INT_MAX)
3518     badusage("bad integer value for %s",o->lng);
3519   int *store= o->store;
3520   *store= ul;
3521 }
3522
3523 static void op_double(const Option *o, const char *val) {
3524   int *store= o->store;
3525   char *ep;
3526   errno= 0;
3527   *store= strtod(val, &ep);
3528   if (*ep || ep==val || errno)
3529     badusage("bad floating point value for %s",o->lng);
3530 }
3531
3532 static void op_string(const Option *o, const char *val) {
3533   const char **store= o->store;
3534   *store= val;
3535 }
3536
3537 static void op_seconds(const Option *o, const char *val) {
3538   int *store= o->store;
3539   char *ep;
3540   int unit;
3541
3542   double v= strtod(val,&ep);
3543   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3544
3545   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3546   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3547   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3548   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3549   else if (!strcmp(ep,"das")) unit= 10;
3550   else if (!strcmp(ep,"hs"))  unit= 100;
3551   else if (!strcmp(ep,"ks"))  unit= 1000;
3552   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3553   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3554
3555   v *= unit;
3556   v= ceil(v);
3557   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3558   *store= v;
3559 }
3560
3561 static void op_setint(const Option *o, const char *val) {
3562   int *store= o->store;
3563   *store= o->intval;
3564 }
3565
3566 /*---------- specific options ----------*/
3567
3568 static void help(const Option *o, const char *val);
3569
3570 static const Option innduct_options[]= {
3571 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3572 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3573 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3574 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3575 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3576 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3577 {'P',"port",             "PORT",  &port,                     op_integer     },
3578 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3579 {0,"help",               0,       0,                         help           },
3580
3581 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3582 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3583 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
3584 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3585 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3586
3587 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3588 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3589 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3590
3591 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3592 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3593
3594 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3595 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3596 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3597 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3598 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3599 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
3600 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3601
3602 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3603 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3604
3605 {0,0}
3606 };
3607
3608 static void printusage(FILE *f) {
3609   fputs("usage: innduct [options] site [fqdn]\n"
3610         "available options are:\n", f);
3611   print_options(innduct_options, f);
3612 }
3613
3614 static void help(const Option *o, const char *val) {
3615   printusage(stdout);
3616   if (ferror(stdout) || fflush(stdout)) {
3617     perror("innduct: writing help");
3618     exit(12);
3619   }
3620   exit(0);
3621 }
3622
3623 static void convert_to_periods_rndup(int *store) {
3624   *store += period_seconds-1;
3625   *store /= period_seconds;
3626 }
3627
3628 int main(int argc, char **argv) {
3629   if (!argv[1]) {
3630     printusage(stderr);
3631     exit(8);
3632   }
3633
3634   parse_options(innduct_options, &argv);
3635
3636   /* arguments */
3637
3638   sitename= *argv++;
3639   if (!sitename) badusage("need site name argument");
3640   remote_host= *argv++;
3641   if (*argv) badusage("too many non-option arguments");
3642
3643   /* defaults */
3644
3645   int r= innconf_read(inndconffile);
3646   if (!r) badusage("could not read inn.conf (more info on stderr)");
3647
3648   if (!remote_host) remote_host= sitename;
3649
3650   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3651     badusage("nocheck threshold percentage must be between 0..100");
3652   nocheck_thresh *= 0.01;
3653
3654   if (nocheck_decay < 0.1)
3655     badusage("nocheck decay articles must be at least 0.1");
3656   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3657
3658   convert_to_periods_rndup(&reconnect_delay_periods);
3659   convert_to_periods_rndup(&flushfail_retry_periods);
3660   convert_to_periods_rndup(&backlog_retry_minperiods);
3661   convert_to_periods_rndup(&backlog_spontrescan_periods);
3662   convert_to_periods_rndup(&spontaneous_flush_periods);
3663   convert_to_periods_rndup(&max_separated_periods);
3664   convert_to_periods_rndup(&need_activity_periods);
3665
3666   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3667     badusage("bad input data ratio must be between 0..100");
3668   max_bad_data_ratio *= 0.01;
3669
3670   if (!feedfile) {
3671     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3672   } else if (!feedfile[0]) {
3673     badusage("feed filename must be nonempty");
3674   } else if (feedfile[strlen(feedfile)-1]=='/') {
3675     feedfile= xasprintf("%s%s",feedfile,sitename);
3676   }
3677
3678   if (max_queue_per_ipf<0)
3679     max_queue_per_ipf= max_queue_per_conn * 2;
3680
3681   const char *feedfile_forbidden= "?*[~#";
3682   int c;
3683   while ((c= *feedfile_forbidden++))
3684     if (strchr(feedfile, c))
3685       badusage("feed filename may not contain metacharacter %c",c);
3686
3687   /* set things up */
3688
3689   path_lock=        xasprintf("%s_lock",      feedfile);
3690   path_flushing=    xasprintf("%s_flushing",  feedfile);
3691   path_defer=       xasprintf("%s_defer",     feedfile);
3692   path_control=     xasprintf("%s_control",   feedfile);
3693   path_dump=        xasprintf("%s_dump",      feedfile);
3694   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3695
3696   oop_source_sys *sysloop= oop_sys_new();
3697   if (!sysloop) sysdie("could not create liboop event loop");
3698   loop= (oop_source*)sysloop;
3699
3700   LIST_INIT(conns);
3701
3702   if (become_daemon) {
3703     int i;
3704     for (i=3; i<255; i++)
3705       /* do this now before we open syslog, etc. */
3706       close(i);
3707     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3708
3709     int null= open("/dev/null",O_RDWR);
3710     if (null<0) sysfatal("failed to open /dev/null");
3711     dup2(null,0);
3712     dup2(null,1);
3713     dup2(null,2);
3714     xclose(null, "/dev/null original fd",0);
3715
3716     pid_t child1= xfork("daemonise first fork");
3717     if (child1) _exit(0);
3718
3719     pid_t sid= setsid();
3720     if (sid != child1) sysfatal("setsid failed");
3721
3722     pid_t child2= xfork("daemonise second fork");
3723     if (child2) _exit(0);
3724   }
3725
3726   self_pid= getpid();
3727   if (self_pid==-1) sysdie("getpid");
3728
3729   statemc_lock();
3730
3731   init_signals();
3732
3733   notice("starting");
3734
3735   if (!become_daemon)
3736     control_stdio();
3737
3738   control_init();
3739
3740   int filemon_ok= 0;
3741   if (!try_filemon) {
3742     notice("filemon: suppressed by command line option, polling");
3743   } else {
3744     filemon_ok= filemon_method_init();
3745     if (!filemon_ok)
3746       warn("filemon: no file monitoring available, polling");
3747   }
3748   if (!filemon_ok)
3749     every(filepoll_seconds,0,filepoll);
3750
3751   every(period_seconds,1,period);
3752
3753   statemc_init();
3754
3755   /* let's go */
3756
3757   void *run= oop_sys_run(sysloop);
3758   assert(run == OOP_ERROR);
3759   sysdie("event loop failed");
3760 }