chiark / gitweb /
df2cdb9d95b5057258693dd2e533a420a15533d2
[inn-innduct.git] / backends / innduct.c
1 /*
2  * debugging rune:
3  *  build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
4  */
5
6 /*--
7 flow control notes
8 to ensure articles go away eventually
9 separate queue for each input file
10   queue expiry
11     every period, check head of backlog queue for expiry with SMretrieve
12       if too old: discard, and check next article
13     also check every backlog article as we read it
14   flush expiry
15     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
16     one-off: eat queued articles from flushing and write them to defer
17     one-off: connfail all connections which have any articles from flushing
18     newly read articles from flushing go straight to defer
19     this should take care of it and get us out of this state
20 to avoid filling up ram needlessly
21   input control
22     limit number of queued articles for each ipf
23     pause/resume inputfile tailing
24 --*/
25
26 /*
27  * Newsfeeds file entries should look like this:
28  *     host.name.of.site[/exclude,exclude,...]\
29  *             :pattern,pattern...[/distribution,distribution...]\
30  *             :Tf,Wnm
31  *             :
32  * or
33  *     sitename[/exclude,exclude,...]\
34  *             :pattern,pattern...[/distribution,distribution...]\
35  *             :Tf,Wnm
36  *             :host.name.of.site
37  *
38  * Four files full of
39  *    token messageid
40  * or might be blanked out
41  *    <spc><spc><spc><spc>....
42  *
43  * F site.name                 main feed file
44  *                                opened/created, then written, by innd
45  *                                read by duct
46  *                                unlinked by duct
47  *                                tokens blanked out by duct when processed
48  *   site.name_lock            lock preventing multiple ducts
49  *                                to hold lock must open,F_SETLK[W]
50  *                                  and then stat to check that locked file
51  *                                  still has name site.name_lock
52  *                                holder of this lock is "duct"
53  *                                (only) lockholder may remove the lockfile
54  * D site.name_flushing        temporary feed file during flush (or crash)
55  *                                hardlink created by duct
56  *                                unlinked by duct
57  *   site.name_defer           431'd articles, still being written,
58  *                                created, written, used by duct
59  *
60  *   site.name_backlog.<date>.<inum>
61  *                             431'd articles, ready for innxmit or duct
62  *                                created (link/mv) by duct
63  *   site.name_backlog<anything-else>  (where <anything-else> does not
64  *                                      contain '#' or '~') eg
65  *   site.name_backlog.manual
66  *                             anything the sysadmin likes (eg, feed files
67  *                             from old feeds to be merged into this one)
68  *                                created (link/mv) by admin
69  *                                may be symlinks (in which case links
70  *                                may be written through, but only links
71  *                                will be removed.
72  *
73  *                             It is safe to remove backlog files manually,
74  *                             if it's desired to throw away the backlog.
75  *
76  * Backlog files are also processed by innduct.  We find the oldest
77  * backlog file which is at least a certain amount old, and feed it
78  * back into our processing.  When every article in it has been read
79  * and processed, we unlink it and look for another backlog file.
80  *
81  * If we don't have a backlog file that we're reading, we close the
82  * defer file that we're writing and make it into a backlog file at
83  * the first convenient opportunity.
84  * -8<-
85
86
87    OVERALL STATES:
88
89                                                                 START
90                                                                   |
91      ,-->--.                                                 check F, D
92      |     |                                                      |
93      |     |                                                      |
94      |     |  <----------------<---------------------------------'|
95      |     |                                       F exists       |
96      |     |                                       D ENOENT       |
97      |     |  duct opens F                                        |
98      |     V                                                      |
99      |  Normal                                                    |
100      |   F: innd writing, duct reading                            |
101      |   D: ENOENT                                                |
102      |     |                                                      |
103      |     |  duct decides time to flush                          |
104      |     |  duct makes hardlink                                 |
105      |     |                                                      |
106      |     V                            <------------------------'|
107      |  Hardlinked                                  F==D          |
108      |   F == D: innd writing, duct reading         both exist    |
109      ^     |                                                      |
110      |     |  duct unlinks F                                      |
111      |     |                        <-----------<-------------<--'|
112      |     |                           open D         F ENOENT    |
113      |     |                           if exists                  |
114      |     |                                                      |
115      |     V                        <---------------------.       |
116      |  Moved                                             |       |
117      |   F: ENOENT                                        |       |
118      |   D: innd writing, duct reading; or ENOENT         |       |
119      |     |                                              |       |
120      |     |  duct requests flush of feed                 |       |
121      |     |   (others can too, harmlessly)               |       |
122      |     V                                              |       |
123      |  Flushing                                          |       |
124      |   F: ENOENT                                        |       |
125      |   D: innd flushing, duct; or ENOENT                |       |
126      |     |                                              |       |
127      |     |   inndcomm flush fails                       |       |
128      |     |`-------------------------->------------------'       |
129      |     |                                                      |
130      |     |   inndcomm reports no such site                      |
131      |     |`---------------------------------------------------- | -.
132      |     |                                                      |  |
133      |     |  innd finishes writing D, creates F                  |  |
134      |     |  inndcomm reports flush successful                   |  |
135      |     |                                                      |  |
136      |     V                                                      |  |
137      |  Separated                                <----------------'  |
138      |   F: innd writing                            F!=D             /
139      |   D: duct reading; or ENOENT                  both exist     /
140      |     |                                                       /
141      |     |  duct gets to the end of D                           /
142      |     |  duct opens F too                                   /
143      |     V                                                    /
144      |  Finishing                                              /
145      |   F: innd writing, duct reading                        |
146      |   D: duct finishing                                    V
147      |     |                                            Dropping
148      |     |  duct finishes processing D                 F: ENOENT
149      |     V  duct unlinks D                             D: duct reading
150      |     |                                                  |
151      `--<--'                                                  | duct finishes
152                                                               |  processing D
153                                                               | duct unlinks D
154                                                               | duct exits
155                                                               V
156                                                         Dropped
157                                                          F: ENOENT
158                                                          D: ENOENT
159                                                          duct not running
160
161    "duct reading" means innduct is reading the file but also
162    overwriting processed tokens.
163
164  * ->8- -^L-
165  *
166  * rune for printing diagrams:
167
168 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
169
170  *
171  */
172
173 /*============================== PROGRAM ==============================*/
174
175 #define _GNU_SOURCE 1
176
177 #include "config.h"
178 #include "storage.h"
179 #include "nntp.h"
180 #include "libinn.h"
181 #include "inndcomm.h"
182
183 #include "inn/list.h"
184 #include "inn/innconf.h"
185
186 #include <sys/uio.h>
187 #include <sys/types.h>
188 #include <sys/wait.h>
189 #include <sys/stat.h>
190 #include <sys/socket.h>
191 #include <sys/un.h>
192 #include <unistd.h>
193 #include <string.h>
194 #include <signal.h>
195 #include <stdio.h>
196 #include <errno.h>
197 #include <syslog.h>
198 #include <fcntl.h>
199 #include <stdarg.h>
200 #include <assert.h>
201 #include <stdlib.h>
202 #include <stddef.h>
203 #include <glob.h>
204 #include <time.h>
205 #include <math.h>
206 #include <ctype.h>
207
208 #include <oop.h>
209 #include <oop-read.h>
210
211 /*----- general definitions, probably best not changed -----*/
212
213 #define CONNCHILD_ESTATUS_STREAM   24
214 #define CONNCHILD_ESTATUS_NOSTREAM 25
215
216 #define INNDCOMMCHILD_ESTATUS_FAIL     26
217 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
218
219 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
220 #define MAX_CONTROL_COMMAND 1000
221
222 #define VA                va_list al;  va_start(al,fmt)
223 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
224 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
225 #define NORET             __attribute__((__noreturn__))
226
227 #define NEW(ptr)              ((ptr)= zxmalloc(sizeof(*(ptr))))
228 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
229
230 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
231
232 #define FOR_CONN(conn) \
233   for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
234
235 /*----- doubly linked lists -----*/
236
237 #define ISNODE(T)   struct node list_node
238 #define DEFLIST(T)                              \
239    typedef struct {                             \
240      union { struct list li; T *for_type; } u;  \
241      int count;                                 \
242    } T##List
243
244 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
245
246 #define LIST_CHECKCANHAVENODE(l,n) \
247   ((void)((n) == ((l).u.for_type))) /* just for the type check */
248
249 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
250  ( LIST_CHECKCANHAVENODE(l,n),                  \
251    list_addsomehow(&(l).u.li, NODE((n))),       \
252    (void)(l).count++                            \
253    )
254
255 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
256  ( (typeof((l).u.for_type))                     \
257    ( (l).count                                  \
258      ? ( (l).count--,                           \
259          list_remsomehow(&(l).u.li) )           \
260      : 0                                        \
261      )                                          \
262    )
263
264
265 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
266 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
267 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
268 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
269
270 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
271 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
272 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
273 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
274
275 #define LIST_REMOVE(l,n)                        \
276  ( LIST_CHECKCANHAVENODE(l,n),                  \
277    list_remove(NODE((n))),                      \
278    (void)(l).count--                            \
279    )
280
281 #define LIST_INSERT(l,n,pred)                                   \
282  ( LIST_CHECKCANHAVENODE(l,n),                                  \
283    LIST_CHECKCANHAVENODE(l,pred),                               \
284    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
285    (void)(l).count++                                            \
286    )
287
288 /*----- type predeclarations -----*/
289
290 typedef struct Conn Conn;
291 typedef struct Article Article;
292 typedef struct InputFile InputFile;
293 typedef struct XmitDetails XmitDetails;
294 typedef struct Filemon_Perfile Filemon_Perfile;
295 typedef enum StateMachineState StateMachineState;
296 typedef struct ControlCommand ControlCommand;
297
298 DEFLIST(Conn);
299 DEFLIST(Article);
300
301 /*----- function predeclarations -----*/
302
303 static void conn_maybe_write(Conn *conn);
304 static void conn_make_some_xmits(Conn *conn);
305 static void *conn_write_some_xmits(Conn *conn);
306
307 static void xmit_free(XmitDetails *d);
308
309 #define SMS(newstate, periods, why) \
310    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
311 static void statemc_setstate(StateMachineState newsms, int periods,
312                              const char *forlog, const char *why);
313
314 static void statemc_start_flush(const char *why); /* Normal => Flushing */
315 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
316 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
317
318 static void article_done(Article *art, int whichcount);
319
320 static void check_assign_articles(void);
321 static void queue_check_input_done(void);
322 static void check_reading_pause_resume(InputFile *ipf);
323
324 static void statemc_check_flushing_done(void);
325 static void statemc_check_backlog_done(void);
326
327 static void postfork(void);
328 static void period(void);
329
330 static void open_defer(void);
331 static void close_defer(void);
332 static void search_backlog_file(void);
333 static void preterminate(void);
334 static void raise_default(int signo) NORET;
335 static char *debug_report_ipf(InputFile *ipf);
336
337 static void inputfile_reading_start(InputFile *ipf);
338 static void inputfile_reading_stop(InputFile *ipf);
339 static void inputfile_reading_pause(InputFile *ipf);
340 static void inputfile_reading_resume(InputFile *ipf);
341   /* pause and resume are idempotent, and no-op if not done _reading_start */
342
343 static void filemon_start(InputFile *ipf);
344 static void filemon_stop(InputFile *ipf);
345 static void filemon_callback(InputFile *ipf);
346
347 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
348 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
349
350 static const oop_rd_style peer_rd_style;
351 static oop_rd_call peer_rd_err, peer_rd_ok;
352
353 /*----- configuration options -----*/
354 /* when changing defaults, remember to update the manpage */
355
356 static const char *sitename, *remote_host;
357 static const char *feedfile, *realsockdir="/tmp/innduct.control";
358 static int quiet_multiple=0;
359 static int become_daemon=1, try_filemon=1;
360 static int try_stream=1;
361 static int port=119;
362 static const char *inndconffile;
363
364 static int max_connections=10;
365 static int max_queue_per_conn=200;
366 static int target_max_feedfile_size=100000;
367 static int period_seconds=60;
368 static int filepoll_seconds=5;
369 static int max_queue_per_ipf=-1;
370
371 static int connection_setup_timeout=200;
372 static int inndcomm_flush_timeout=100;
373
374 static double nocheck_thresh= 95.0; /* converted from percentage by main */
375 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
376
377 /* all these are initialised to seconds, and converted to periods in main */
378 static int reconnect_delay_periods=1000;
379 static int flushfail_retry_periods=1000;
380 static int backlog_retry_minperiods=50;
381 static int backlog_spontrescan_periods=300;
382 static int spontaneous_flush_periods=100000;
383 static int max_separated_periods=2000;
384 static int need_activity_periods=1000;
385
386 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
387 static int max_bad_data_initial= 30;
388   /* in one corrupt 4096-byte block the number of newlines has
389    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
390
391
392 /*----- statistics -----*/
393
394 typedef enum {      /* in queue                 in conn->sent             */
395   art_Unchecked,    /*   not checked, not sent    checking                */
396   art_Wanted,       /*   checked, wanted          sent body as requested  */
397   art_Unsolicited,  /*   -                        sent body without check */
398   art_MaxState,
399 } ArtState;
400
401 static const char *const artstate_names[]=
402   { "Unchecked", "Wanted", "Unsolicited", 0 };
403
404 #define RESULT_COUNTS(RCS,RCN)                  \
405   RCS(sent)                                     \
406   RCS(accepted)                                 \
407   RCN(unwanted)                                 \
408   RCN(rejected)                                 \
409   RCN(deferred)                                 \
410   RCN(missing)                                  \
411   RCN(connretry)
412
413 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
414 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
415        counts[art_Unchecked] x                  \
416        + counts[art_Wanted] x                   \
417        + counts[art_Unsolicited] x,             \
418        counts[art_Unchecked] x                  \
419        , counts[art_Wanted] x                   \
420        , counts[art_Unsolicited] x
421
422 typedef enum {
423 #define RC_INDEX(x) RC_##x,
424   RESULT_COUNTS(RC_INDEX, RC_INDEX)
425   RCI_max
426 } ResultCountIndex;
427
428
429 /*----- transmission buffers -----*/
430
431 #define CONNIOVS 128
432
433 typedef enum {
434   xk_Const, xk_Artdata
435 } XmitKind;
436
437 struct XmitDetails {
438   XmitKind kind;
439   union {
440     ARTHANDLE *sm_art;
441   } info;
442 };
443
444
445 /*----- core operational data structure types -----*/
446
447 struct InputFile {
448   /* This is also an instance of struct oop_readable */
449   struct oop_readable readable; /* first */
450   oop_readable_call *readable_callback;
451   void *readable_callback_user;
452
453   int fd;
454   Filemon_Perfile *filemon;
455
456   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
457   off_t offset;
458   int skippinglong, paused;
459
460   ArticleList queue;
461   long inprogress; /* includes queue.count and also articles in conns */
462   long autodefer; /* -1 means not doing autodefer */
463
464   int counts[art_MaxState][RCI_max];
465   int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
466   char path[];
467 };
468
469 struct Article {
470   ISNODE(Article);
471   ArtState state;
472   int midlen, missing;
473   InputFile *ipf;
474   TOKEN token;
475   off_t offset;
476   int blanklen;
477   char messageid[1];
478 };
479
480 #define SMS_LIST(X)                             \
481   X(NORMAL)                                     \
482   X(FLUSHING)                                   \
483   X(FLUSHFAILED)                                \
484   X(SEPARATED)                                  \
485   X(DROPPING)                                   \
486   X(DROPPED)
487
488 enum StateMachineState {
489 #define SMS_DEF_ENUM(s) sm_##s,
490   SMS_LIST(SMS_DEF_ENUM)
491 };
492
493 static const char *sms_names[]= {
494 #define SMS_DEF_NAME(s) #s ,
495   SMS_LIST(SMS_DEF_NAME)
496   0
497 };
498
499 struct Conn {
500   ISNODE(Conn);
501   int fd; /* may be 0, meaning closed (during construction/destruction) */
502   oop_read *rd; /* likewise */
503   int max_queue, stream, quitting;
504   int since_activity; /* periods */
505   ArticleList waiting; /* not yet told peer */
506   ArticleList priority; /* peer says send it now */
507   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
508   struct iovec xmit[CONNIOVS];
509   XmitDetails xmitd[CONNIOVS];
510   int xmitu;
511 };
512
513
514 /*----- general operational variables -----*/
515
516 /* main initialises */
517 static oop_source *loop;
518 static ConnList conns;
519 static char *path_lock, *path_flushing, *path_defer;
520 static char *path_control, *path_dump;
521 static char *globpat_backlog;
522 static pid_t self_pid;
523
524 /* statemc_init initialises */
525 static StateMachineState sms;
526 static int until_flush;
527 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
528 static FILE *defer;
529
530 /* initialisation to 0 is good */
531 static int until_connect, until_backlog_nextscan;
532 static double accept_proportion;
533 static int nocheck, nocheck_reported, in_child;
534
535 /* for simulation, debugging, etc. */
536 int simulate_flush= -1;
537
538 /*========== logging ==========*/
539
540 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
541 static void logcore(int sysloglevel, const char *fmt, ...) {
542   VA;
543   if (become_daemon) {
544     vsyslog(sysloglevel,fmt,al);
545   } else {
546     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
547     vfprintf(stderr,fmt,al);
548     putc('\n',stderr);
549   }
550   va_end(al);
551 }
552
553 static void logv(int sysloglevel, const char *pfx, int errnoval,
554                  const char *fmt, va_list al) PRINTF(5,0);
555 static void logv(int sysloglevel, const char *pfx, int errnoval,
556                  const char *fmt, va_list al) {
557   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
558   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
559   msgbuf[sizeof(msgbuf)-1]= 0;
560
561   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
562     sysloglevel= LOG_ERR; /* run by wrong user, probably */
563
564   logcore(sysloglevel, "<%s>%s: %s%s%s",
565          sitename, pfx, msgbuf,
566          errnoval>=0 ? ": " : "",
567          errnoval>=0 ? strerror(errnoval) : "");
568 }
569
570 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
571   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
572   static void fn(const char *fmt, ...) {                        \
573     preterminate();                                             \
574     VA;                                                         \
575     logv(sysloglevel, pfx, err, fmt, al);                       \
576     exit(estatus);                                              \
577   }
578
579 #define logwrap(fn, pfx, sysloglevel, err)              \
580   static void fn(const char *fmt, ...) PRINTF(1,2);     \
581   static void fn(const char *fmt, ...) {                \
582     VA;                                                 \
583     logv(sysloglevel, pfx, err, fmt, al);               \
584     va_end(al);                                         \
585   }
586
587 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
588 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
589
590 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
591 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
592
593 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
594 logwrap(warn,     " warning",  LOG_WARNING, -1);
595
596 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
597 logwrap(info,     " info",     LOG_INFO,    -1);
598 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
599
600
601 /*========== utility functions etc. ==========*/
602
603 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
604 static char *xvasprintf(const char *fmt, va_list al) {
605   char *str;
606   int rc= vasprintf(&str,fmt,al);
607   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
608   return str;
609 }
610 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
611 static char *xasprintf(const char *fmt, ...) {
612   VA;
613   char *str= xvasprintf(fmt,al);
614   va_end(al);
615   return str;
616 }
617
618 static int close_perhaps(int *fd) {
619   if (*fd <= 0) return 0;
620   int r= close(*fd);
621   *fd=0;
622   return r;
623 }
624 static void xclose(int fd, const char *what, const char *what2) {
625   int r= close(fd);
626   if (r) sysdie("close %s%s",what,what2?what2:"");
627 }
628 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
629   if (*fd <= 0) return;
630   xclose(*fd,what,what2);
631   *fd=0;
632 }
633
634 static pid_t xfork(const char *what) {
635   pid_t child;
636
637   child= fork();
638   if (child==-1) sysfatal("cannot fork for %s",what);
639   debug("forked %s %ld", what, (unsigned long)child);
640   if (!child) postfork();
641   return child;
642 }
643
644 static void on_fd_read_except(int fd, oop_call_fd callback) {
645   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
646   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
647 }
648 static void cancel_fd_read_except(int fd) {
649   loop->cancel_fd(loop, fd, OOP_READ);
650   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
651 }
652
653 static void report_child_status(const char *what, int status) {
654   if (WIFEXITED(status)) {
655     int es= WEXITSTATUS(status);
656     if (es)
657       warn("%s: child died with error exit status %d", what, es);
658   } else if (WIFSIGNALED(status)) {
659     int sig= WTERMSIG(status);
660     const char *sigstr= strsignal(sig);
661     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
662     if (sigstr)
663       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
664     else
665       warn("%s: child died due to unknown fatal signal %d%s",
666            what, sig, coredump);
667   } else {
668     warn("%s: child died with unknown wait status %d", what,status);
669   }
670 }
671
672 static int xwaitpid(pid_t *pid, const char *what) {
673   int status;
674
675   int r= kill(*pid, SIGKILL);
676   if (r) sysdie("cannot kill %s child", what);
677
678   pid_t got= waitpid(*pid, &status, 0);
679   if (got==-1) sysdie("cannot reap %s child", what);
680   if (got==0) die("cannot reap %s child", what);
681
682   *pid= 0;
683
684   return status;
685 }
686
687 static void *zxmalloc(size_t sz) {
688   void *p= xmalloc(sz);
689   memset(p,0,sz);
690   return p;
691 }
692
693 static void xunlink(const char *path, const char *what) {
694   int r= unlink(path);
695   if (r) sysdie("can't unlink %s %s", path, what);
696 }
697
698 static time_t xtime(void) {
699   time_t now= time(0);
700   if (now==-1) sysdie("time(2) failed");
701   return now;
702 }
703
704 static void xsigaction(int signo, const struct sigaction *sa) {
705   int r= sigaction(signo,sa,0);
706   if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
707 }
708
709 static void xsigsetdefault(int signo) {
710   struct sigaction sa;
711   memset(&sa,0,sizeof(sa));
712   sa.sa_handler= SIG_DFL;
713   xsigaction(signo,&sa);
714 }
715
716 static void xgettimeofday(struct timeval *tv_r) {
717   int r= gettimeofday(tv_r,0);
718   if (r) sysdie("gettimeofday(2) failed");
719 }
720
721 static void xsetnonblock(int fd, int nonblocking) {
722   int errnoval= oop_fd_nonblock(fd, nonblocking);
723   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
724 }
725
726 static void check_isreg(const struct stat *stab, const char *path,
727                         const char *what) {
728   if (!S_ISREG(stab->st_mode))
729     die("%s %s not a plain file (mode 0%lo)",
730         what, path, (unsigned long)stab->st_mode);
731 }
732
733 static void xfstat(int fd, struct stat *stab_r, const char *what) {
734   int r= fstat(fd, stab_r);
735   if (r) sysdie("could not fstat %s", what);
736 }
737
738 static void xfstat_isreg(int fd, struct stat *stab_r,
739                          const char *path, const char *what) {
740   xfstat(fd, stab_r, what);
741   check_isreg(stab_r, path, what);
742 }
743
744 static void xlstat_isreg(const char *path, struct stat *stab,
745                          int *enoent_r /* 0 means ENOENT is fatal */,
746                          const char *what) {
747   int r= lstat(path, stab);
748   if (r) {
749     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
750     sysdie("could not lstat %s %s", what, path);
751   }
752   if (enoent_r) *enoent_r= 0;
753   check_isreg(stab, path, what);
754 }
755
756 static int samefile(const struct stat *a, const struct stat *b) {
757   assert(S_ISREG(a->st_mode));
758   assert(S_ISREG(b->st_mode));
759   return (a->st_ino == b->st_ino &&
760           a->st_dev == b->st_dev);
761 }
762
763 static char *sanitise(const char *input, int len) {
764   static char sanibuf[100]; /* returns pointer to this buffer! */
765
766   const char *p= input;
767   const char *endp= len>=0 ? input+len : 0;
768   char *q= sanibuf;
769   *q++= '`';
770   for (;;) {
771     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
772     int c= (!endp || p<endp) ? *p++ : 0;
773     if (!c) { *q++= '\''; *q=0; break; }
774     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
775     sprintf(q,"\\x%02x",c);
776     q += 4;
777   }
778   return sanibuf;
779 }
780
781 static int isewouldblock(int errnoval) {
782   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
783 }
784
785 /*========== command and control connections ==========*/
786
787 static int control_master;
788
789 typedef struct ControlConn ControlConn;
790 struct ControlConn {
791   void (*destroy)(ControlConn*);
792   int fd;
793   oop_read *rd;
794   FILE *out;
795   union {
796     struct sockaddr sa;
797     struct sockaddr_un un;
798   } sa;
799   socklen_t salen;
800 };
801
802 static const oop_rd_style control_rd_style= {
803   OOP_RD_DELIM_STRIP, '\n',
804   OOP_RD_NUL_FORBID,
805   OOP_RD_SHORTREC_FORBID
806 };
807
808 static void control_destroy(ControlConn *cc) {
809   cc->destroy(cc);
810 }
811
812 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
813   if (ferror(cc->out) | fflush(cc->out)) {
814     info("CTRL%d write error %s", cc->fd, strerror(errno));
815     control_destroy(cc);
816   }
817 }
818
819 static void control_prompt(ControlConn *cc /* may destroy*/) {
820   fprintf(cc->out, "%s| ", sitename);
821   control_checkouterr(cc);
822 }
823
824 struct ControlCommand {
825   const char *cmd;
826   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
827             const char *arg, size_t argsz);
828   void *xdata;
829   int xval;
830 };
831
832 static const ControlCommand control_commands[];
833
834 #define CCMD(wh)                                                        \
835   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
836                         const char *arg, size_t argsz)
837
838 CCMD(help) {
839   fputs("commands:\n", cc->out);
840   const ControlCommand *ccmd;
841   for (ccmd=control_commands; ccmd->cmd; ccmd++)
842     fprintf(cc->out, " %s\n", ccmd->cmd);
843   fputs("NB: permissible arguments are not shown above."
844         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
845 }
846
847 CCMD(flush) {
848   int ok= trigger_flush_ok();
849   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
850 }
851
852 CCMD(stop) {
853   preterminate();
854   notice("terminating (CTRL%d)",cc->fd);
855   raise_default(SIGTERM);
856   abort();
857 }
858
859 CCMD(dump);
860
861 /* messing with our head: */
862 CCMD(period) { period(); }
863 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
864 CCMD(setint) { *(int*)c->xdata= c->xval; }
865 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
866
867 static const ControlCommand control_commands[]= {
868   { "h",             ccmd_help      },
869   { "flush",         ccmd_flush     },
870   { "stop",          ccmd_stop      },
871   { "dump q",        ccmd_dump, 0,0 },
872   { "dump a",        ccmd_dump, 0,1 },
873
874   { "p",             ccmd_period    },
875
876 #define POKES(cmd,func)                                                 \
877   { cmd "flush",     func,           &until_flush,             1 },     \
878   { cmd "conn",      func,           &until_connect,           0 },     \
879   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
880 POKES("next ", ccmd_setint)
881 POKES("prod ", ccmd_setint_period)
882
883   { "pretend flush", ccmd_setintarg, &simulate_flush             },
884   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
885   { 0 }
886 };
887
888 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
889                            const char *errmsg, int errnoval,
890                            const char *data, size_t recsz, void *cc_v) {
891   ControlConn *cc= cc_v;
892
893   if (!data) {
894     info("CTRL%d closed", cc->fd);
895     cc->destroy(cc);
896     return OOP_CONTINUE;
897   }
898
899   if (recsz == 0) goto prompt;
900
901   const ControlCommand *ccmd;
902   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
903     int l= strlen(ccmd->cmd);
904     if (recsz < l) continue;
905     if (recsz > l && data[l] != ' ') continue;
906     if (memcmp(data, ccmd->cmd, l)) continue;
907
908     int argl= (int)recsz - (l+1); 
909     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
910     goto prompt;
911   }
912
913   fputs("unknown command; h for help\n", cc->out);
914
915  prompt:
916   control_prompt(cc);
917   return OOP_CONTINUE;
918 }
919
920 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
921                             const char *errmsg, int errnoval,
922                             const char *data, size_t recsz, void *cc_v) {
923   ControlConn *cc= cc_v;
924   
925   info("CTRL%d read error %s", cc->fd, errmsg);
926   cc->destroy(cc);
927   return OOP_CONTINUE;
928 }
929
930 static int control_conn_startup(ControlConn *cc /* may destroy*/,
931                                 const char *how) {
932   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
933   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
934
935   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
936                       control_rd_ok, cc,
937                       control_rd_err, cc);
938   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
939
940   info("CTRL%d %s ready", cc->fd, how);
941   control_prompt(cc);
942   return 0;
943 }
944
945 static void control_stdio_destroy(ControlConn *cc) {
946   if (cc->rd) {
947     oop_rd_cancel(cc->rd);
948     errno= oop_rd_delete_tidy(cc->rd);
949     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
950   }
951   free(cc);
952 }
953
954 static void control_stdio(void) {
955   NEW_DECL(ControlConn *,cc);
956   cc->destroy= control_stdio_destroy;
957
958   cc->fd= 0;
959   cc->out= stdout;
960   int r= control_conn_startup(cc,"stdio");
961   if (r) cc->destroy(cc);
962 }
963
964 static void control_accepted_destroy(ControlConn *cc) {
965   if (cc->rd) {
966     oop_rd_cancel(cc->rd);
967     oop_rd_delete_kill(cc->rd);
968   }
969   if (cc->out) { fclose(cc->out); cc->fd=0; }
970   close_perhaps(&cc->fd);
971   free(cc);
972 }
973
974 static void *control_master_readable(oop_source *lp, int master,
975                                      oop_event ev, void *u) {
976   NEW_DECL(ControlConn *,cc);
977   cc->destroy= control_accepted_destroy;
978
979   cc->salen= sizeof(cc->sa);
980   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
981   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
982
983   cc->out= fdopen(cc->fd, "w");
984   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
985
986   int r= control_conn_startup(cc, "accepted");
987   if (r) goto x;
988
989   return OOP_CONTINUE;
990
991  x:
992   cc->destroy(cc);
993   return OOP_CONTINUE;
994 }
995
996 #define NOCONTROL(...) do{                                              \
997     syswarn("no control socket, because failed to " __VA_ARGS__);       \
998     goto nocontrol;                                                     \
999   }while(0)
1000
1001 static void control_init(void) {
1002   char *real=0;
1003   
1004   union {
1005     struct sockaddr sa;
1006     struct sockaddr_un un;
1007   } sa;
1008
1009   memset(&sa,0,sizeof(sa));
1010   int maxlen= sizeof(sa.un.sun_path);
1011
1012   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
1013   if (reallen<0) {
1014     if (errno != ENOENT)
1015       NOCONTROL("readlink control socket symlink path %s", path_control);
1016   }
1017   if (reallen >= maxlen) {
1018     debug("control socket symlink path too long (r=%d)",reallen);
1019     xunlink(path_control, "old (overlong) control socket symlink");
1020     reallen= -1;
1021   }
1022   
1023   if (reallen<0) {
1024     struct stat stab;
1025     int r= lstat(realsockdir,&stab);
1026     if (r) {
1027       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
1028
1029       r= mkdir(realsockdir, 0700);
1030       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
1031
1032     } else {
1033       uid_t self= geteuid();
1034       if (!S_ISDIR(stab.st_mode) ||
1035           stab.st_uid != self ||
1036           stab.st_mode & 0007) {
1037         warn("no control socket, because real socket directory"
1038              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
1039              !!S_ISDIR(stab.st_mode),
1040              (unsigned long)stab.st_uid, (unsigned long)self,
1041              (unsigned long)stab.st_mode & 0777UL);
1042         goto nocontrol;
1043       }
1044     }
1045
1046     real= xasprintf("%s/s%lx.%lx", realsockdir,
1047                     (unsigned long)xtime(), (unsigned long)self_pid);
1048     int reallen= strlen(real);
1049
1050     if (reallen >= maxlen) {
1051       warn("no control socket, because tmpnam gave overly-long path"
1052            " %s", real);
1053       goto nocontrol;
1054     }
1055     r= symlink(real, path_control);
1056     if (r) NOCONTROL("make control socket path %s a symlink to real"
1057                      " socket path %s", path_control, real);
1058     memcpy(sa.un.sun_path, real, reallen);
1059   }
1060
1061   int r= unlink(sa.un.sun_path);
1062   if (r && errno!=ENOENT)
1063     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1064
1065   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1066   if (control_master<0) NOCONTROL("create new control socket");
1067
1068   sa.un.sun_family= AF_UNIX;
1069   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1070   r= bind(control_master, &sa.sa, sl);
1071   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1072
1073   r= listen(control_master, 5);
1074   if (r) NOCONTROL("listen");
1075
1076   xsetnonblock(control_master, 1);
1077
1078   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1079   info("control socket ok, real path %s", sa.un.sun_path);
1080
1081   return;
1082
1083  nocontrol:
1084   free(real);
1085   xclose_perhaps(&control_master, "control master",0);
1086   return;
1087 }
1088
1089 /*========== management of connections ==========*/
1090
1091 static void conn_closefd(Conn *conn, const char *msgprefix) {
1092   int r= close_perhaps(&conn->fd);
1093   if (r) info("C%d %serror closing socket: %s",
1094               conn->fd, msgprefix, strerror(errno));
1095 }
1096
1097 static void conn_dispose(Conn *conn) {
1098   if (!conn) return;
1099   if (conn->rd) {
1100     oop_rd_cancel(conn->rd);
1101     oop_rd_delete_kill(conn->rd);
1102     conn->rd= 0;
1103   }
1104   if (conn->fd) {
1105     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1106     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1107   }
1108   conn_closefd(conn,"");
1109   free(conn);
1110   until_connect= reconnect_delay_periods;
1111 }
1112
1113 static void *conn_exception(oop_source *lp, int fd,
1114                             oop_event ev, void *conn_v) {
1115   Conn *conn= conn_v;
1116   unsigned char ch;
1117   assert(fd == conn->fd);
1118   assert(ev == OOP_EXCEPTION);
1119   int r= read(conn->fd, &ch, 1);
1120   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1121   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1122                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1123   return OOP_CONTINUE;
1124 }  
1125
1126 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1127   int requeue[art_MaxState];
1128   memset(requeue,0,sizeof(requeue));
1129
1130   Article *art;
1131   
1132   while ((art= LIST_REMHEAD(conn->priority)))
1133     LIST_ADDTAIL(art->ipf->queue, art);
1134
1135   while ((art= LIST_REMHEAD(conn->waiting)))
1136     LIST_ADDTAIL(art->ipf->queue, art);
1137
1138   while ((art= LIST_REMHEAD(conn->sent))) {
1139     requeue[art->state]++;
1140     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1141     LIST_ADDTAIL(art->ipf->queue,art);
1142     check_reading_pause_resume(art->ipf);
1143   }
1144
1145   int i;
1146   XmitDetails *d;
1147   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1148     xmit_free(d);
1149
1150   char *m= xvasprintf(fmt,al);
1151   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1152        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1153   free(m);
1154
1155   LIST_REMOVE(conns,conn);
1156   conn_dispose(conn);
1157   check_assign_articles();
1158 }
1159
1160 static void connfail(Conn *conn, const char *fmt, ...) {
1161   va_list al;
1162   va_start(al,fmt);
1163   vconnfail(conn,fmt,al);
1164   va_end(al);
1165 }
1166
1167 static void check_idle_conns(void) {
1168   Conn *conn;
1169   FOR_CONN(conn)
1170     conn->since_activity++;
1171  search_again:
1172   FOR_CONN(conn) {
1173     if (conn->since_activity <= need_activity_periods) continue;
1174
1175     /* We need to shut this down */
1176     if (conn->quitting)
1177       connfail(conn,"timed out waiting for response to QUIT");
1178     else if (conn->sent.count)
1179       connfail(conn,"timed out waiting for responses");
1180     else if (conn->waiting.count || conn->priority.count)
1181       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1182     else if (conn->xmitu)
1183       connfail(conn,"peer has been sending responses"
1184                " before receiving our commands!");
1185     else {
1186       static const char quitcmd[]= "QUIT\r\n";
1187       int todo= sizeof(quitcmd)-1;
1188       const char *p= quitcmd;
1189       for (;;) {
1190         int r= write(conn->fd, p, todo);
1191         if (r<0) {
1192           if (isewouldblock(errno))
1193             connfail(conn, "blocked writing QUIT to idle connection");
1194           else
1195             connfail(conn, "failed to write QUIT to idle connection: %s",
1196                      strerror(errno));
1197           break;
1198         }
1199         assert(r<=todo);
1200         todo -= r;
1201         if (!todo) {
1202           conn->quitting= 1;
1203           conn->since_activity= 0;
1204           debug("C%d is idle, quitting", conn->fd);
1205           break;
1206         }
1207       }
1208     }
1209     goto search_again;
1210   }
1211 }  
1212
1213 /*---------- making new connections ----------*/
1214
1215 static pid_t connecting_child;
1216 static int connecting_fdpass_sock;
1217
1218 static void connect_attempt_discard(void) {
1219   if (connecting_child) {
1220     int status= xwaitpid(&connecting_child, "connect");
1221     if (!(WIFEXITED(status) ||
1222           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1223       report_child_status("connect", status);
1224   }
1225   if (connecting_fdpass_sock) {
1226     cancel_fd_read_except(connecting_fdpass_sock);
1227     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1228   }
1229 }
1230
1231 #define PREP_DECL_MSG_CMSG(msg)                 \
1232   char msgbyte= 0;                              \
1233   struct iovec msgiov;                          \
1234   msgiov.iov_base= &msgbyte;                    \
1235   msgiov.iov_len= 1;                            \
1236   struct msghdr msg;                            \
1237   memset(&msg,0,sizeof(msg));                   \
1238   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1239   msg.msg_iov= &msgiov;                         \
1240   msg.msg_iovlen= 1;                            \
1241   msg.msg_control= msg##cbuf;                   \
1242   msg.msg_controllen= sizeof(msg##cbuf);
1243
1244 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1245   Conn *conn= 0;
1246
1247   assert(fd == connecting_fdpass_sock);
1248
1249   PREP_DECL_MSG_CMSG(msg);
1250   
1251   ssize_t rs= recvmsg(fd, &msg, 0);
1252   if (rs<0) {
1253     if (isewouldblock(errno)) return OOP_CONTINUE;
1254     syswarn("failed to read socket from connecting child");
1255     goto x;
1256   }
1257
1258   NEW(conn);
1259   LIST_INIT(conn->waiting);
1260   LIST_INIT(conn->priority);
1261   LIST_INIT(conn->sent);
1262
1263   struct cmsghdr *h= 0;
1264   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1265   if (!h) {
1266     int status= xwaitpid(&connecting_child, "connect child (broken)");
1267
1268     if (WIFEXITED(status)) {
1269       if (WEXITSTATUS(status) != 0 &&
1270           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1271           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1272         /* child already reported the problem */;
1273       else {
1274         if (e == OOP_EXCEPTION)
1275           warn("connect: connection child exited code %d but"
1276                " unexpected exception on fdpass socket",
1277                WEXITSTATUS(status));
1278         else
1279           warn("connect: connection child exited code %d but"
1280                " no cmsg (rs=%d)",
1281                WEXITSTATUS(status), (int)rs);
1282       }
1283     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1284       warn("connect: connection attempt timed out");
1285     } else {
1286       report_child_status("connect", status);
1287     }
1288     goto x;
1289   }
1290
1291 #define CHK(field, val)                                                  \
1292   if (h->cmsg_##field != val) {                                          \
1293     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1294         h->cmsg_##field, val);                                           \
1295     goto x;                                                              \
1296   }
1297   CHK(level, SOL_SOCKET);
1298   CHK(type,  SCM_RIGHTS);
1299   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1300 #undef CHK
1301
1302   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1303
1304   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1305
1306   int status;
1307   pid_t got= waitpid(connecting_child, &status, 0);
1308   if (got==-1) sysdie("connect: real wait for child");
1309   assert(got == connecting_child);
1310   connecting_child= 0;
1311
1312   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1313   int es= WEXITSTATUS(status);
1314   switch (es) {
1315   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1316   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1317   default:
1318     fatal("connect: child gave unexpected exit status %d", es);
1319   }
1320
1321   /* Phew! */
1322   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1323
1324   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1325   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1326   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1327   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1328                      &peer_rd_ok, conn,
1329                      &peer_rd_err, conn);
1330   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1331
1332   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1333   LIST_ADDHEAD(conns, conn);
1334
1335   connect_attempt_discard();
1336   check_assign_articles();
1337   return OOP_CONTINUE;
1338
1339  x:
1340   conn_dispose(conn);
1341   connect_attempt_discard();
1342   return OOP_CONTINUE;
1343 }
1344
1345 static int allow_connect_start(void) {
1346   return conns.count < max_connections
1347     && !connecting_child
1348     && !until_connect;
1349 }
1350
1351 static void connect_start(void) {
1352   assert(!connecting_child);
1353   assert(!connecting_fdpass_sock);
1354
1355   info("starting connection attempt");
1356
1357   int socks[2];
1358   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1359   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1360
1361   connecting_child= xfork("connection");
1362
1363   if (!connecting_child) {
1364     FILE *cn_from, *cn_to;
1365     char buf[NNTP_STRLEN+100];
1366     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1367
1368     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1369
1370     alarm(connection_setup_timeout);
1371     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1372       int l= strlen(buf);
1373       int stripped=0;
1374       while (l>0) {
1375         unsigned char c= buf[l-1];
1376         if (!isspace(c)) break;
1377         if (c=='\n' || c=='\r') stripped=1;
1378         --l;
1379       }
1380       if (!buf[0]) {
1381         sysfatal("connect: connection attempt failed");
1382       } else {
1383         buf[l]= 0;
1384         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1385               sanitise(buf,-1));
1386       }
1387     }
1388     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1389       sysfatal("connect: authentication failed");
1390     if (try_stream) {
1391       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1392           fflush(cn_to))
1393         sysfatal("connect: could not send MODE STREAM");
1394       buf[sizeof(buf)-1]= 0;
1395       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1396         if (ferror(cn_from))
1397           sysfatal("connect: could not read response to MODE STREAM");
1398         else
1399           fatal("connect: connection close in response to MODE STREAM");
1400       }
1401       int l= strlen(buf);
1402       assert(l>=1);
1403       if (buf[l-1]!='\n')
1404         fatal("connect: response to MODE STREAM is too long: %.100s...",
1405               sanitise(buf,-1));
1406       l--;  if (l>0 && buf[l-1]=='\r') l--;
1407       buf[l]= 0;
1408       char *ep;
1409       int rcode= strtoul(buf,&ep,10);
1410       if (ep != &buf[3])
1411         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1412
1413       switch (rcode) {
1414       case 203:
1415         exitstatus= CONNCHILD_ESTATUS_STREAM;
1416         break;
1417       case 480:
1418       case 500:
1419         break;
1420       default:
1421         warn("connect: unexpected response to MODE STREAM: %.50s",
1422              sanitise(buf,-1));
1423         exitstatus= 2;
1424         break;
1425       }
1426     }
1427     int fd= fileno(cn_from);
1428
1429     PREP_DECL_MSG_CMSG(msg);
1430     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1431     cmsg->cmsg_level= SOL_SOCKET;
1432     cmsg->cmsg_type=  SCM_RIGHTS;
1433     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1434     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1435
1436     msg.msg_controllen= cmsg->cmsg_len;
1437     r= sendmsg(socks[1], &msg, 0);
1438     if (r<0) sysdie("sendmsg failed for new connection");
1439     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1440
1441     _exit(exitstatus);
1442   }
1443
1444   xclose(socks[1], "connecting fdpass child's socket",0);
1445   connecting_fdpass_sock= socks[0];
1446   xsetnonblock(connecting_fdpass_sock, 1);
1447   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1448 }
1449
1450 /*---------- assigning articles to conns, and transmitting ----------*/
1451
1452 static Article *dequeue_from(int peek, InputFile *ipf) {
1453   if (!ipf) return 0;
1454   if (peek) return LIST_HEAD(ipf->queue);
1455
1456   Article *art= LIST_REMHEAD(ipf->queue);
1457   if (!art) return 0;
1458   check_reading_pause_resume(ipf);
1459   return art;
1460 }
1461
1462 static Article *dequeue(int peek) {
1463   Article *art;
1464   art= dequeue_from(peek, flushing_input_file);  if (art) return art;
1465   art= dequeue_from(peek, backlog_input_file);   if (art) return art;
1466   art= dequeue_from(peek, main_input_file);      if (art) return art;
1467   return 0;
1468 }
1469
1470 static void check_assign_articles(void) {
1471   for (;;) {
1472     if (!dequeue(1))
1473       break;
1474
1475     Conn *walk, *use=0;
1476     int spare=0, inqueue=0;
1477
1478     /* Find a connection to offer this article.  We prefer a busy
1479      * connection to an idle one, provided it's not full.  We take the
1480      * first (oldest) and since that's stable, it will mean we fill up
1481      * connections in order.  That way if we have too many
1482      * connections, the spare ones will go away eventually.
1483      */
1484     FOR_CONN(walk) {
1485       if (walk->quitting) continue;
1486       inqueue= walk->sent.count + walk->priority.count
1487              + walk->waiting.count;
1488       spare= walk->max_queue - inqueue;
1489       assert(inqueue <= max_queue_per_conn);
1490       assert(spare >= 0);
1491       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1492       else if (spare>0) /*working*/ { use= walk; break; }
1493     }
1494     if (use) {
1495       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1496       while (spare>0) {
1497         Article *art= dequeue(0);
1498         if (!art) break;
1499         LIST_ADDTAIL(use->waiting, art);
1500         spare--;
1501       }
1502       conn_maybe_write(use);
1503     } else if (allow_connect_start()) {
1504       until_connect= reconnect_delay_periods;
1505       connect_start();
1506       break;
1507     } else {
1508       break;
1509     }
1510   }
1511 }
1512
1513 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1514   conn_maybe_write(u);
1515   return OOP_CONTINUE;
1516 }
1517
1518 static void conn_maybe_write(Conn *conn)  {
1519   for (;;) {
1520     conn_make_some_xmits(conn);
1521     if (!conn->xmitu) {
1522       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1523       return;
1524     }
1525
1526     void *rp= conn_write_some_xmits(conn);
1527     if (rp==OOP_CONTINUE) {
1528       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1529       return;
1530     } else if (rp==OOP_HALT) {
1531       return;
1532     } else if (!rp) {
1533       /* transmitted everything */
1534     } else {
1535       abort();
1536     }
1537   }
1538 }
1539
1540 /*---------- expiry, flow control and deferral ----------*/
1541
1542 static void check_reading_pause_resume(InputFile *ipf) {
1543   if (ipf->queue.count >= max_queue_per_ipf)
1544     inputfile_reading_pause(ipf);
1545   else
1546     inputfile_reading_resume(ipf);
1547 }
1548
1549 static void article_defer(Article *art /* not on a queue */, int whichcount) {
1550   open_defer();
1551   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1552       || fflush(defer))
1553     sysfatal("write to defer file %s",path_defer);
1554   article_done(art, whichcount);
1555 }
1556
1557 static int article_check_expired(Article *art /* must be queued, not conn */) {
1558   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
1559   if (artdata) { SMfreearticle(artdata); return 0; }
1560
1561   LIST_REMOVE(art->ipf->queue, art);
1562   art->missing= 1;
1563   art->ipf->count_nooffer_missing++;
1564   article_done(art,-1);
1565   return 1;
1566 }
1567
1568 static void inputfile_queue_check_expired(InputFile *ipf) {
1569   if (!ipf) return;
1570
1571   for (;;) {
1572     Article *art= LIST_HEAD(ipf->queue);
1573     int exp= article_check_expired(art);
1574     if (!exp) break;
1575   }
1576   check_reading_pause_resume(ipf);
1577 }
1578
1579 static void article_autodefer(InputFile *ipf, Article *art) {
1580   ipf->autodefer++;
1581   article_defer(art,-1);
1582 }
1583
1584 static int has_article_in(const ArticleList *al, InputFile *ipf) {
1585   Article *art;
1586   for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
1587     if (art->ipf == ipf) return 1;
1588   return 0;
1589 }
1590
1591 static void autodefer_input_file_articles(InputFile *ipf) {
1592   Article *art;
1593   while ((art= LIST_REMHEAD(ipf->queue)))
1594     article_autodefer(ipf, art);
1595 }
1596
1597 static void autodefer_input_file(InputFile *ipf) {
1598   ipf->autodefer= 0;
1599
1600   autodefer_input_file_articles(ipf);
1601
1602   if (ipf->inprogress) {
1603     Conn *walk;
1604     FOR_CONN(walk) {
1605       if (has_article_in(&walk->waiting,  ipf) ||
1606           has_article_in(&walk->priority, ipf) ||
1607           has_article_in(&walk->sent,     ipf))
1608         walk->quitting= -1;
1609     }
1610     while (ipf->inprogress) {
1611       FOR_CONN(walk)
1612         if (walk->quitting < 0) goto found;
1613       abort(); /* where are they ?? */
1614
1615     found:
1616       connfail(walk, "connection is stuck or crawling,"
1617                " and we need to finish flush");
1618       autodefer_input_file_articles(ipf);
1619     }
1620   }
1621
1622   check_reading_pause_resume(ipf);
1623 }
1624
1625 /*========== article transmission ==========*/
1626
1627 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1628                   XmitKind kind) { /* caller must then fill in details */
1629   struct iovec *v= &conn->xmit[conn->xmitu];
1630   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1631   v->iov_base= (char*)data;
1632   v->iov_len= len;
1633   d->kind= kind;
1634   return d;
1635 }
1636
1637 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1638   xmit_core(conn,data,len, xk_Const);
1639 }
1640 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1641
1642 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1643   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1644   d->info.sm_art= ah;
1645 }
1646
1647 static void xmit_free(XmitDetails *d) {
1648   switch (d->kind) {
1649   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1650   case xk_Const:                                  break;
1651   default: abort();
1652   }
1653 }
1654
1655 static void *conn_write_some_xmits(Conn *conn) {
1656   /* return values:
1657    *      0:            nothing more to write, no need to call us again
1658    *      OOP_CONTINUE: more to write but fd not writeable
1659    *      OOP_HALT:     disaster, have destroyed conn
1660    */
1661   for (;;) {
1662     int count= conn->xmitu;
1663     if (!count) return 0;
1664
1665     if (count > IOV_MAX) count= IOV_MAX;
1666     ssize_t rs= writev(conn->fd, conn->xmit, count);
1667     if (rs < 0) {
1668       if (isewouldblock(errno)) return OOP_CONTINUE;
1669       connfail(conn, "write failed: %s", strerror(errno));
1670       return OOP_HALT;
1671     }
1672     assert(rs > 0);
1673
1674     int done;
1675     for (done=0; rs && done<conn->xmitu; done++) {
1676       struct iovec *vp= &conn->xmit[done];
1677       XmitDetails *dp= &conn->xmitd[done];
1678       if (rs > vp->iov_len) {
1679         rs -= vp->iov_len;
1680         xmit_free(dp);
1681       } else {
1682         vp->iov_base= (char*)vp->iov_base + rs;
1683         vp->iov_len -= rs;
1684       }
1685     }
1686     int newu= conn->xmitu - done;
1687     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1688     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1689     conn->xmitu= newu;
1690   }
1691 }
1692
1693 static void conn_make_some_xmits(Conn *conn) {
1694   for (;;) {
1695     if (conn->xmitu+5 > CONNIOVS)
1696       break;
1697
1698     Article *art= LIST_REMHEAD(conn->priority);
1699     if (!art) art= LIST_REMHEAD(conn->waiting);
1700     if (!art) break;
1701
1702     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1703       /* actually send it */
1704
1705       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1706
1707       art->state=
1708         art->state == art_Unchecked ? art_Unsolicited :
1709         art->state == art_Wanted    ? art_Wanted      :
1710         (abort(),-1);
1711
1712       if (!artdata) art->missing= 1;
1713       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1714
1715       if (conn->stream) {
1716         if (artdata) {
1717           XMIT_LITERAL("TAKETHIS ");
1718           xmit_noalloc(conn, art->messageid, art->midlen);
1719           XMIT_LITERAL("\r\n");
1720           xmit_artbody(conn, artdata);
1721         } else {
1722           article_done(art, -1);
1723           continue;
1724         }
1725       } else {
1726         /* we got 235 from IHAVE */
1727         if (artdata) {
1728           xmit_artbody(conn, artdata);
1729         } else {
1730           XMIT_LITERAL(".\r\n");
1731         }
1732       }
1733
1734       LIST_ADDTAIL(conn->sent, art);
1735
1736     } else {
1737       /* check it */
1738
1739       if (conn->stream)
1740         XMIT_LITERAL("CHECK ");
1741       else
1742         XMIT_LITERAL("IHAVE ");
1743       xmit_noalloc(conn, art->messageid, art->midlen);
1744       XMIT_LITERAL("\r\n");
1745
1746       assert(art->state == art_Unchecked);
1747       art->ipf->counts[art->state][RC_sent]++;
1748       LIST_ADDTAIL(conn->sent, art);
1749     }
1750   }
1751 }
1752
1753 /*========== handling responses from peer ==========*/
1754
1755 static const oop_rd_style peer_rd_style= {
1756   OOP_RD_DELIM_STRIP, '\n',
1757   OOP_RD_NUL_FORBID,
1758   OOP_RD_SHORTREC_FORBID
1759 };
1760
1761 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1762                          const char *errmsg, int errnoval,
1763                          const char *data, size_t recsz, void *conn_v) {
1764   Conn *conn= conn_v;
1765   connfail(conn, "error receiving from peer: %s", errmsg);
1766   return OOP_CONTINUE;
1767 }
1768
1769 static Article *article_reply_check(Conn *conn, const char *response,
1770                                     int code_indicates_streaming,
1771                                     int must_have_sent
1772                                         /* 1:yes, -1:no, 0:dontcare */,
1773                                     const char *sanitised_response) {
1774   Article *art= LIST_HEAD(conn->sent);
1775
1776   if (!art) {
1777     connfail(conn,
1778              "peer gave unexpected response when no commands outstanding: %s",
1779              sanitised_response);
1780     return 0;
1781   }
1782
1783   if (code_indicates_streaming) {
1784     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1785     if (!conn->stream) {
1786       connfail(conn, "peer gave streaming response code "
1787                " to IHAVE or subsequent body: %s", sanitised_response);
1788       return 0;
1789     }
1790     const char *got_mid= response+4;
1791     int got_midlen= strcspn(got_mid, " \n\r");
1792     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1793       connfail(conn, "peer gave streaming response with syntactically invalid"
1794                " messageid: %s", sanitised_response);
1795       return 0;
1796     }
1797     if (got_midlen != art->midlen ||
1798         memcmp(got_mid, art->messageid, got_midlen)) {
1799       connfail(conn, "peer gave streaming response code to wrong article -"
1800                " probable synchronisation problem; we offered: %s;"
1801                " peer said: %s",
1802                art->messageid, sanitised_response);
1803       return 0;
1804     }
1805   } else {
1806     if (conn->stream) {
1807       connfail(conn, "peer gave non-streaming response code to"
1808                " CHECK/TAKETHIS: %s", sanitised_response);
1809       return 0;
1810     }
1811   }
1812
1813   if (must_have_sent>0 && art->state < art_Wanted) {
1814     connfail(conn, "peer says article accepted but"
1815              " we had not sent the body: %s", sanitised_response);
1816     return 0;
1817   }
1818   if (must_have_sent<0 && art->state >= art_Wanted) {
1819     connfail(conn, "peer says please sent the article but we just did: %s",
1820              sanitised_response);
1821     return 0;
1822   }
1823
1824   Article *art_again= LIST_REMHEAD(conn->sent);
1825   assert(art_again == art);
1826   return art;
1827 }
1828
1829 static void update_nocheck(int accepted) {
1830   accept_proportion *= nocheck_decay;
1831   accept_proportion += accepted * (1.0 - nocheck_decay);
1832   int new_nocheck= accept_proportion >= nocheck_thresh;
1833   if (new_nocheck && !nocheck_reported) {
1834     notice("entering nocheck mode for the first time");
1835     nocheck_reported= 1;
1836   } else if (new_nocheck != nocheck) {
1837     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1838   }
1839   nocheck= new_nocheck;
1840 }
1841
1842 static void article_done(Article *art, int whichcount) {
1843   if (whichcount>=0 && !art->missing)
1844     art->ipf->counts[art->state][whichcount]++;
1845
1846   if (whichcount == RC_accepted) update_nocheck(1);
1847   else if (whichcount == RC_unwanted) update_nocheck(0);
1848
1849   InputFile *ipf= art->ipf;
1850
1851   while (art->blanklen) {
1852     static const char spaces[]=
1853       "                                                                "
1854       "                                                                "
1855       "                                                                "
1856       "                                                                "
1857       "                                                                "
1858       "                                                                "
1859       "                                                                "
1860       "                                                                "
1861       "                                                                ";
1862     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1863     int r= pwrite(ipf->fd, spaces, w, art->offset);
1864     if (r==-1) {
1865       if (errno==EINTR) continue;
1866       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1867              art->messageid, art->blanklen,
1868              (unsigned long)art->offset, ipf->path);
1869     }
1870     assert(r>=0 && r<=w);
1871     art->blanklen -= w;
1872     art->offset += w;
1873   }
1874
1875   ipf->inprogress--;
1876   assert(ipf->inprogress >= 0);
1877   free(art);
1878
1879   if (!ipf->inprogress && ipf != main_input_file)
1880     queue_check_input_done();
1881 }
1882
1883 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1884                         const char *errmsg, int errnoval,
1885                         const char *data, size_t recsz, void *conn_v) {
1886   Conn *conn= conn_v;
1887
1888   if (ev == OOP_RD_EOF) {
1889     connfail(conn, "unexpected EOF from peer");
1890     return OOP_CONTINUE;
1891   }
1892   assert(ev == OOP_RD_OK);
1893
1894   char *sani= sanitise(data,-1);
1895
1896   char *ep;
1897   unsigned long code= strtoul(data, &ep, 10);
1898   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1899     connfail(conn, "badly formatted response from peer: %s", sani);
1900     return OOP_CONTINUE;
1901   }
1902
1903   int conn_busy=
1904     conn->waiting.count ||
1905     conn->priority.count ||
1906     conn->sent.count ||
1907     conn->xmitu;
1908
1909   if (conn->quitting) {
1910     if (code!=205 && code!=503) {
1911       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1912     } else {
1913       notice("C%d idle connection closed by us", conn->fd);
1914       assert(!conn_busy);
1915       LIST_REMOVE(conns,conn);
1916       conn_dispose(conn);
1917     }
1918     return OOP_CONTINUE;
1919   }
1920
1921   conn->since_activity= 0;
1922   Article *art;
1923
1924 #define GET_ARTICLE(musthavesent) do{                                         \
1925     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1926     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1927   }while(0) 
1928
1929 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1930     code_streaming= (streaming);                                \
1931     GET_ARTICLE(musthavesent);                                  \
1932     article_done(art, RC_##how);                                \
1933     goto dealtwith;                                             \
1934   }while(0)
1935
1936 #define PEERBADMSG(m) do {                                      \
1937     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1938   }while(0)
1939
1940   int code_streaming= 0;
1941
1942   switch (code) {
1943
1944   case 400: PEERBADMSG("peer stopped accepting articles");
1945   default:  PEERBADMSG("peer sent unexpected message");
1946
1947   case 503:
1948     if (conn_busy) PEERBADMSG("peer timed us out");
1949     notice("C%d idle connection closed by peer", conn->fd);
1950     LIST_REMOVE(conns,conn);
1951     conn_dispose(conn);
1952     return OOP_CONTINUE;
1953
1954   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1955   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1956
1957   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1958   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1959
1960   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1961   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1962
1963   case 238: /* CHECK says send it */
1964     code_streaming= 1;
1965   case 335: /* IHAVE says send it */
1966     GET_ARTICLE(-1);
1967     assert(art->state == art_Unchecked);
1968     art->ipf->counts[art->state][RC_accepted]++;
1969     art->state= art_Wanted;
1970     LIST_ADDTAIL(conn->priority, art);
1971     break;
1972
1973   case 431: /* CHECK or TAKETHIS says try later */
1974     code_streaming= 1;
1975   case 436: /* IHAVE says try later */
1976     GET_ARTICLE(0);
1977     article_defer(art, RC_deferred);
1978     break;
1979
1980   }
1981 dealtwith:
1982
1983   conn_maybe_write(conn);
1984   check_assign_articles();
1985   return OOP_CONTINUE;
1986 }
1987
1988
1989 /*========== monitoring of input files ==========*/
1990
1991 static void feedfile_eof(InputFile *ipf) {
1992   assert(ipf != main_input_file); /* promised by tailing_try_read */
1993   inputfile_reading_stop(ipf);
1994
1995   if (ipf == flushing_input_file) {
1996     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1997     if (main_input_file) inputfile_reading_start(main_input_file);
1998     statemc_check_flushing_done();
1999   } else if (ipf == backlog_input_file) {
2000     statemc_check_backlog_done();
2001   } else {
2002     abort(); /* supposed to wait rather than get EOF on main input file */
2003   }
2004 }
2005
2006 static InputFile *open_input_file(const char *path) {
2007   int fd= open(path, O_RDWR);
2008   if (fd<0) {
2009     if (errno==ENOENT) return 0;
2010     sysfatal("unable to open input file %s", path);
2011   }
2012   assert(fd>0);
2013
2014   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
2015   memset(ipf,0,sizeof(*ipf));
2016
2017   ipf->fd= fd;
2018   ipf->autodefer= -1;
2019   LIST_INIT(ipf->queue);
2020   strcpy(ipf->path, path);
2021
2022   return ipf;
2023 }
2024
2025 static void close_input_file(InputFile *ipf) { /* does not free */
2026   assert(!ipf->readable_callback); /* must have had ->on_cancel */
2027   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
2028   assert(!ipf->rd); /* must have had inputfile_reading_stop */
2029   assert(!ipf->inprogress); /* no dangling pointers pointing here */
2030   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
2031 }
2032
2033
2034 /*---------- dealing with articles read in the input file ----------*/
2035
2036 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
2037                                    const char *data, const char *how) {
2038   warn("corrupted file: %s, offset %lu: %s: in %s",
2039        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
2040   ipf->readcount_err++;
2041   if (ipf->readcount_err > max_bad_data_initial +
2042       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
2043     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
2044         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
2045   return OOP_CONTINUE;
2046 }
2047
2048 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
2049                                oop_rd_event ev, const char *errmsg,
2050                                int errnoval, const char *data, size_t recsz,
2051                                void *ipf_v) {
2052   InputFile *ipf= ipf_v;
2053   assert(ev == OOP_RD_SYSTEM);
2054   errno= errnoval;
2055   sysdie("error reading input file: %s, offset %lu",
2056          ipf->path, (unsigned long)ipf->offset);
2057 }
2058
2059 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
2060                                   oop_rd_event ev, const char *errmsg,
2061                                   int errnoval, const char *data, size_t recsz,
2062                                   void *ipf_v) {
2063   InputFile *ipf= ipf_v;
2064   Article *art;
2065   char tokentextbuf[sizeof(TOKEN)*2+3];
2066
2067   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
2068
2069   off_t old_offset= ipf->offset;
2070   ipf->offset += recsz + !!(ev == OOP_RD_OK);
2071
2072 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
2073
2074   if (ev==OOP_RD_PARTREC)
2075     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
2076     /* but process it anyway */
2077
2078   if (ipf->skippinglong) {
2079     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
2080     return OOP_CONTINUE;
2081   }
2082   if (ev==OOP_RD_LONG) {
2083     ipf->skippinglong= 1;
2084     X_BAD_DATA("overly long line");
2085   }
2086
2087   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
2088   if (!recsz) X_BAD_DATA("empty line");
2089
2090   if (data[0]==' ') {
2091     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
2092     ipf->readcount_blank++;
2093     return OOP_CONTINUE;
2094   }
2095
2096   char *space= strchr(data,' ');
2097   int tokenlen= space-data;
2098   int midlen= (int)recsz-tokenlen-1;
2099   if (midlen <= 2) X_BAD_DATA("no room for messageid");
2100   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
2101
2102   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
2103   memcpy(tokentextbuf, data, tokenlen);
2104   tokentextbuf[tokenlen]= 0;
2105   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
2106
2107   ipf->readcount_ok++;
2108
2109   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
2110   memset(art,0,sizeof(*art));
2111   art->state= art_Unchecked;
2112   art->midlen= midlen;
2113   art->ipf= ipf;  ipf->inprogress++;
2114   art->token= TextToToken(tokentextbuf);
2115   art->offset= old_offset;
2116   art->blanklen= recsz;
2117   strcpy(art->messageid, space+1);
2118   LIST_ADDTAIL(ipf->queue, art);
2119
2120   if (ipf->autodefer >= 0)
2121     article_autodefer(ipf, art);
2122   else if (ipf==backlog_input_file)
2123     article_check_expired(art);
2124
2125   if (sms==sm_NORMAL && ipf==main_input_file &&
2126       ipf->offset >= target_max_feedfile_size)
2127     statemc_start_flush("feed file size");
2128
2129   check_assign_articles(); /* may destroy conn but that's OK */
2130   check_reading_pause_resume(ipf);
2131   return OOP_CONTINUE;
2132 }
2133
2134 /*========== tailing input file ==========*/
2135
2136 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
2137                                      void *user) {
2138   InputFile *ipf= user;
2139   return ipf->readable_callback(loop, &ipf->readable,
2140                                 ipf->readable_callback_user);
2141 }
2142
2143 static void tailing_on_cancel(struct oop_readable *rable) {
2144   InputFile *ipf= (void*)rable;
2145
2146   if (ipf->filemon) filemon_stop(ipf);
2147   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2148   ipf->readable_callback= 0;
2149 }
2150
2151 static void tailing_queue_readable(InputFile *ipf) {
2152   /* lifetime of ipf here is OK because destruction will cause
2153    * on_cancel which will cancel this callback */
2154   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
2155 }
2156
2157 static int tailing_on_readable(struct oop_readable *rable,
2158                                 oop_readable_call *cb, void *user) {
2159   InputFile *ipf= (void*)rable;
2160
2161   tailing_on_cancel(rable);
2162   ipf->readable_callback= cb;
2163   ipf->readable_callback_user= user;
2164   filemon_start(ipf);
2165
2166   tailing_queue_readable(ipf);
2167   return 0;
2168 }
2169
2170 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2171                                 size_t length) {
2172   InputFile *ipf= (void*)rable;
2173   for (;;) {
2174     ssize_t r= read(ipf->fd, buffer, length);
2175     if (r==-1) {
2176       if (errno==EINTR) continue;
2177       return r;
2178     }
2179     if (!r) {
2180       if (ipf==main_input_file) {
2181         errno=EAGAIN;
2182         return -1;
2183       } else if (ipf==flushing_input_file) {
2184         assert(ipf->rd);
2185         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2186       } else if (ipf==backlog_input_file) {
2187         assert(ipf->rd);
2188       } else {
2189         abort();
2190       }
2191     }
2192     tailing_queue_readable(ipf);
2193     return r;
2194   }
2195 }
2196
2197 /*---------- filemon implemented with inotify ----------*/
2198
2199 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2200 #define HAVE_FILEMON
2201
2202 #include <sys/inotify.h>
2203
2204 static int filemon_inotify_fd;
2205 static int filemon_inotify_wdmax;
2206 static InputFile **filemon_inotify_wd2ipf;
2207
2208 struct Filemon_Perfile {
2209   int wd;
2210 };
2211
2212 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2213   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2214   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2215
2216   if (wd >= filemon_inotify_wdmax) {
2217     int newmax= wd+2;
2218     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2219                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2220     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2221            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2222     filemon_inotify_wdmax= newmax;
2223   }
2224
2225   assert(!filemon_inotify_wd2ipf[wd]);
2226   filemon_inotify_wd2ipf[wd]= ipf;
2227
2228   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2229         ipf, wd, filemon_inotify_wdmax);
2230
2231   pf->wd= wd;
2232 }
2233
2234 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2235   int wd= pf->wd;
2236   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2237   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2238   if (r) sysdie("inotify_rm_watch");
2239   filemon_inotify_wd2ipf[wd]= 0;
2240 }
2241
2242 static void *filemon_inotify_readable(oop_source *lp, int fd,
2243                                       oop_event e, void *u) {
2244   struct inotify_event iev;
2245   for (;;) {
2246     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2247     if (r==-1) {
2248       if (isewouldblock(errno)) break;
2249       sysdie("read from inotify master");
2250     } else if (r==sizeof(iev)) {
2251       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2252     } else {
2253       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2254     }
2255     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2256     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2257     filemon_callback(ipf);
2258   }
2259   return OOP_CONTINUE;
2260 }
2261
2262 static int filemon_method_init(void) {
2263   filemon_inotify_fd= inotify_init();
2264   if (filemon_inotify_fd<0) {
2265     syswarn("filemon/inotify: inotify_init failed");
2266     return 0;
2267   }
2268   xsetnonblock(filemon_inotify_fd, 1);
2269   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2270
2271   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2272   return 1;
2273 }
2274
2275 static void filemon_method_dump_info(FILE *f) {
2276   int i;
2277   fprintf(f,"inotify");
2278   DUMPV("%d",,filemon_inotify_fd);
2279   DUMPV("%d",,filemon_inotify_wdmax);
2280   for (i=0; i<filemon_inotify_wdmax; i++)
2281     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2282 }
2283
2284 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2285
2286 /*---------- filemon dummy implementation ----------*/
2287
2288 #if !defined(HAVE_FILEMON)
2289
2290 struct Filemon_Perfile { int dummy; };
2291
2292 static int filemon_method_init(void) {
2293   warn("filemon/dummy: no filemon method compiled in");
2294   return 0;
2295 }
2296 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2297 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2298 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2299
2300 #endif /* !HAVE_FILEMON */
2301
2302 /*---------- filemon generic interface ----------*/
2303
2304 static void filemon_start(InputFile *ipf) {
2305   assert(!ipf->filemon);
2306
2307   NEW(ipf->filemon);
2308   filemon_method_startfile(ipf, ipf->filemon);
2309 }
2310
2311 static void filemon_stop(InputFile *ipf) {
2312   if (!ipf->filemon) return;
2313   filemon_method_stopfile(ipf, ipf->filemon);
2314   free(ipf->filemon);
2315   ipf->filemon= 0;
2316 }
2317
2318 static void filemon_callback(InputFile *ipf) {
2319   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2320     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2321 }
2322
2323 /*---------- interface to start and stop an input file ----------*/
2324
2325 static const oop_rd_style feedfile_rdstyle= {
2326   OOP_RD_DELIM_STRIP, '\n',
2327   OOP_RD_NUL_PERMIT,
2328   OOP_RD_SHORTREC_LONG,
2329 };
2330
2331 static void inputfile_reading_resume(InputFile *ipf) {
2332   if (!ipf->rd) return;
2333   if (!ipf->paused) return;
2334
2335   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2336                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2337   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2338
2339   ipf->paused= 0;
2340 }
2341
2342 static void inputfile_reading_pause(InputFile *ipf) {
2343   if (!ipf->rd) return;
2344   if (ipf->paused) return;
2345   oop_rd_cancel(ipf->rd);
2346   ipf->paused= 1;
2347 }
2348
2349 static void inputfile_reading_start(InputFile *ipf) {
2350   assert(!ipf->rd);
2351   ipf->readable.on_readable= tailing_on_readable;
2352   ipf->readable.on_cancel=   tailing_on_cancel;
2353   ipf->readable.try_read=    tailing_try_read;
2354   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2355   ipf->readable.delete_kill= 0;
2356
2357   ipf->readable_callback= 0;
2358   ipf->readable_callback_user= 0;
2359
2360   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2361   assert(ipf->rd);
2362
2363   ipf->paused= 1;
2364   inputfile_reading_resume(ipf);
2365 }
2366
2367 static void inputfile_reading_stop(InputFile *ipf) {
2368   assert(ipf->rd);
2369   inputfile_reading_pause(ipf);
2370   oop_rd_delete(ipf->rd);
2371   ipf->rd= 0;
2372   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2373 }
2374
2375
2376 /*========== interaction with innd - state machine ==========*/
2377
2378 /* See official state diagram at top of file.  We implement
2379  * this as follows:
2380  * -8<-
2381
2382             .=======.
2383             ||START||
2384             `======='
2385                 |
2386                 | open F
2387                 |
2388                 |    F ENOENT
2389                 |`---------------------------------------------------.
2390       F OPEN OK |                                                    |
2391                 |`---------------- - - -                             |
2392        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2393                 |                  for full startup logic            |
2394      ,--------->|                                                    |
2395      |          V                                                    |
2396      |     ============                                       try to |
2397      |      NORMAL                                            open D |
2398      |     [Normal]                                                  |
2399      |      main F tail                                              |
2400      |     ============                                              V
2401      |          |                                                    |
2402      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2403      ^          | hardlink F to D                                    |
2404      |     [Hardlinked]                                              |
2405      |          | unlink F                                           |
2406      |          | our handle onto F is now onto D                    |
2407      |     [Moved]                                                   |
2408      |          |                                                    |
2409      |          |<-------------------<---------------------<---------+
2410      |          |                                                    |
2411      |          | spawn inndcomm flush                               |
2412      |          V                                                    |
2413      |     ==================                                        |
2414      |      FLUSHING[-ABSENT]                                        |
2415      |     [Flushing]                                                |
2416      |     main D tail/none                                          |
2417      |     ==================                                        |
2418      |          |                                                    |
2419      |          |   INNDCOMM FLUSH FAILS                             ^
2420      |          |`----------------------->----------.                |
2421      |          |                                   |                |
2422      |          |   NO SUCH SITE                    V                |
2423      ^          |`--------------->----.         ==================== |
2424      |          |                      \        FLUSHFAILED[-ABSENT] |
2425      |          |                       \         [Moved]            |
2426      |          | FLUSH OK               \       main D tail/none    |
2427      |          | open F                  \     ==================== |
2428      |          |                          \        |                |
2429      |          |                           \       | TIME TO RETRY  |
2430      |          |`------->----.     ,---<---'\      `----------------'
2431      |          |    D NONE   |     | D NONE  `----.
2432      |          V             |     |              V
2433      |     =============      V     V             ============
2434      |      SEPARATED-1       |     |              DROPPING-1
2435      |      flsh->rd!=0       |     |              flsh->rd!=0
2436      |     [Separated]        |     |             [Dropping]
2437      |      main F idle       |     |              main none
2438      |      flsh D tail       |     |              flsh D tail
2439      |     =============      |     |             ============
2440      |          |             |     | install       |
2441      ^          | EOF ON D    |     |  defer        | EOF ON D
2442      |          V             |     |               V
2443      |     ===============    |     |             ===============
2444      |      SEPARATED-2       |     |              DROPPING-2
2445      |      flsh->rd==0       |     V              flsh->rd==0
2446      |     [Finishing]        |     |             [Dropping]
2447      |      main F tail       |     `.             main none
2448      |      flsh D closed     |       `.           flsh D closed
2449      |     ===============    V         `.        ===============
2450      |          |                         `.          |
2451      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2452      |          V install defer as backlog    `.      | install defer
2453      ^          | close D                       `.    | close D
2454      |          | unlink D                        `.  | unlink D
2455      |          |                                  |  |
2456      |          |                                  V  V
2457      `----------'                               ==============
2458                                                  DROPPED
2459                                                 [Dropped]
2460                                                  main none
2461                                                  flsh none
2462                                                  some backlog
2463                                                 ==============
2464                                                       |
2465                                                       | ALL BACKLOG DONE
2466                                                       |
2467                                                       | unlink lock
2468                                                       | exit
2469                                                       V
2470                                                   ==========
2471                                                    (ESRCH)
2472                                                   [Droppped]
2473                                                   ==========
2474  * ->8-
2475  */
2476
2477 static void startup_set_input_file(InputFile *f) {
2478   assert(!main_input_file);
2479   main_input_file= f;
2480   inputfile_reading_start(f);
2481 }
2482
2483 static void statemc_lock(void) {
2484   int lockfd;
2485   struct stat stab, stabf;
2486   
2487   for (;;) {
2488     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2489     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2490
2491     struct flock fl;
2492     memset(&fl,0,sizeof(fl));
2493     fl.l_type= F_WRLCK;
2494     fl.l_whence= SEEK_SET;
2495     int r= fcntl(lockfd, F_SETLK, &fl);
2496     if (r==-1) {
2497       if (errno==EACCES || isewouldblock(errno)) {
2498         if (quiet_multiple) exit(0);
2499         fatal("another duct holds the lockfile");
2500       }
2501       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2502     }
2503
2504     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2505     int lock_noent;
2506     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2507
2508     if (!lock_noent && samefile(&stab, &stabf))
2509       break;
2510
2511     xclose(lockfd, "stale lockfile ", path_lock);
2512   }
2513
2514   FILE *lockfile= fdopen(lockfd, "w");
2515   if (!lockfile) sysdie("fdopen lockfile");
2516
2517   int r= ftruncate(lockfd, 0);
2518   if (r) sysdie("truncate lockfile to write new info");
2519
2520   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2521               (unsigned long)self_pid,
2522               sitename, feedfile, remote_host) == EOF ||
2523       fflush(lockfile))
2524     sysfatal("write info to lockfile %s", path_lock);
2525
2526   debug("startup: locked");
2527 }
2528
2529 static void statemc_init(void) {
2530   struct stat stabdefer;
2531
2532   search_backlog_file();
2533
2534   int defer_noent;
2535   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2536   if (defer_noent) {
2537     debug("startup: ductdefer ENOENT");
2538   } else {
2539     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2540     switch (stabdefer.st_nlink==1) {
2541     case 1:
2542       open_defer(); /* so that we will later close it and rename it */
2543       break;
2544     case 2:
2545       xunlink(path_defer, "stale defer file link"
2546               " (presumably hardlink to backlog file)");
2547       break;
2548     default:
2549       die("defer file %s has unexpected link count %d",
2550           path_defer, stabdefer.st_nlink);
2551     }
2552   }
2553
2554   struct stat stab_f, stab_d;
2555   int noent_f;
2556
2557   InputFile *file_d= open_input_file(path_flushing);
2558   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2559
2560   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2561
2562   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2563     debug("startup: F==D => Hardlinked");
2564     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2565     noent_f= 1;
2566   }
2567
2568   if (noent_f) {
2569     debug("startup: F ENOENT => Moved");
2570     if (file_d) startup_set_input_file(file_d);
2571     spawn_inndcomm_flush("feedfile missing at startup");
2572     /* => Flushing, sms:=FLUSHING */
2573   } else {
2574     if (file_d) {
2575       debug("startup: F!=D => Separated");
2576       startup_set_input_file(file_d);
2577       flushing_input_file= main_input_file;
2578       main_input_file= open_input_file(feedfile);
2579       if (!main_input_file) die("feedfile vanished during startup");
2580       SMS(SEPARATED, max_separated_periods,
2581           "found both old and current feed files");
2582     } else {
2583       debug("startup: F exists, D ENOENT => Normal");
2584       InputFile *file_f= open_input_file(feedfile);
2585       if (!file_f) die("feed file vanished during startup");
2586       startup_set_input_file(file_f);
2587       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2588     }
2589   }
2590 }
2591
2592 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2593   assert(sms == sm_NORMAL);
2594
2595   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2596         why,
2597         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2598         (unsigned long)target_max_feedfile_size,
2599         until_flush);
2600
2601   int r= link(feedfile, path_flushing);
2602   if (r) sysfatal("link feedfile %s to flushing file %s",
2603                   feedfile, path_flushing);
2604   /* => Hardlinked */
2605
2606   xunlink(feedfile, "old feedfile link");
2607   /* => Moved */
2608
2609   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2610 }
2611
2612 static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
2613   switch (sms) {
2614
2615   case sm_NORMAL:
2616     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2617     return 1;
2618
2619   case sm_FLUSHFAILED:
2620     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2621     return 1;
2622
2623   case sm_SEPARATED:
2624   case sm_DROPPING:
2625     warn("took too long to complete old feedfile after flush, autodeferring");
2626     assert(flushing_input_file);
2627     autodefer_input_file(flushing_input_file);
2628     return 1;
2629
2630   default:
2631     return 0;
2632   }
2633 }
2634
2635 static void statemc_period_poll(void) {
2636   if (!until_flush) return;
2637   until_flush--;
2638   assert(until_flush>=0);
2639
2640   if (until_flush) return;
2641   int ok= trigger_flush_ok();
2642   assert(ok);
2643 }
2644
2645 static int inputfile_is_done(InputFile *ipf) {
2646   if (!ipf) return 0;
2647   if (ipf->inprogress) return 0; /* new article in the meantime */
2648   if (ipf->rd) return 0; /* not had EOF */
2649   return 1;
2650 }
2651
2652 static void notice_processed(InputFile *ipf, int completed,
2653                              const char *what, const char *spec) {
2654   if (!ipf) return; /* allows preterminate to be lazy */
2655
2656 #define RCI_NOTHING(x) /* nothing */
2657 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2658 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2659
2660 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2661
2662   char *inprog= completed
2663     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2664     : xasprintf(" inprogress=%ld", ipf->inprogress);
2665   char *autodefer= ipf->autodefer >= 0
2666     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
2667     : xasprintf("%s","");
2668
2669   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
2670        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2671        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2672        ,
2673        completed?"completed":"processed", what, spec,
2674        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
2675        inprog, autodefer, ipf->count_nooffer_missing,
2676        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2677        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2678        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2679        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2680        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2681        );
2682
2683   free(inprog);
2684   free(autodefer);
2685
2686 #undef CNT
2687 }
2688
2689 static void statemc_check_backlog_done(void) {
2690   InputFile *ipf= backlog_input_file;
2691   if (!inputfile_is_done(ipf)) return;
2692
2693   const char *slash= strrchr(ipf->path, '/');
2694   const char *leaf= slash ? slash+1 : ipf->path;
2695   const char *under= strchr(slash, '_');
2696   const char *rest= under ? under+1 : leaf;
2697   if (!strncmp(rest,"backlog",7)) rest += 7;
2698   notice_processed(ipf,1,"backlog ",rest);
2699
2700   close_input_file(ipf);
2701   if (unlink(ipf->path)) {
2702     if (errno != ENOENT)
2703       sysdie("could not unlink processed backlog file %s", ipf->path);
2704     warn("backlog file %s vanished while we were reading it"
2705          " so we couldn't remove it (but it's done now, anyway)",
2706          ipf->path);
2707   }
2708   free(ipf);
2709   backlog_input_file= 0;
2710   search_backlog_file();
2711   return;
2712 }
2713
2714 static void statemc_check_flushing_done(void) {
2715   InputFile *ipf= flushing_input_file;
2716   if (!inputfile_is_done(ipf)) return;
2717
2718   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2719
2720   notice_processed(ipf,1,"feedfile","");
2721
2722   close_defer();
2723
2724   xunlink(path_flushing, "old flushing file");
2725
2726   close_input_file(flushing_input_file);
2727   free(flushing_input_file);
2728   flushing_input_file= 0;
2729
2730   if (sms==sm_SEPARATED) {
2731     notice("flush complete");
2732     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2733   } else if (sms==sm_DROPPING) {
2734     SMS(DROPPED, max_separated_periods, "old flush complete");
2735     search_backlog_file();
2736     notice("feed dropped, but will continue until backlog is finished");
2737   }
2738 }
2739
2740 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2741                                       void *u) {
2742   assert(!inputfile_is_done(main_input_file));
2743   statemc_check_flushing_done();
2744   statemc_check_backlog_done();
2745   return OOP_CONTINUE;
2746 }
2747
2748 static void queue_check_input_done(void) {
2749   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2750 }
2751
2752 static void statemc_setstate(StateMachineState newsms, int periods,
2753                              const char *forlog, const char *why) {
2754   sms= newsms;
2755   until_flush= periods;
2756
2757   const char *xtra= "";
2758   switch (sms) {
2759   case sm_FLUSHING:
2760   case sm_FLUSHFAILED:
2761     if (!main_input_file) xtra= "-ABSENT";
2762     break;
2763   case sm_SEPARATED:
2764   case sm_DROPPING:
2765     xtra= flushing_input_file->rd ? "-1" : "-2";
2766     break;
2767   default:;
2768   }
2769
2770   if (periods) {
2771     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2772   } else {
2773     info("state %s%s %s",forlog,xtra,why);
2774   }
2775 }
2776
2777 /*---------- defer and backlog files ----------*/
2778
2779 static void open_defer(void) {
2780   struct stat stab;
2781
2782   if (defer) return;
2783
2784   defer= fopen(path_defer, "a+");
2785   if (!defer) sysfatal("could not open defer file %s", path_defer);
2786
2787   /* truncate away any half-written records */
2788
2789   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2790
2791   if (stab.st_size > LONG_MAX)
2792     die("defer file %s size is far too large", path_defer);
2793
2794   if (!stab.st_size)
2795     return;
2796
2797   long orgsize= stab.st_size;
2798   long truncto= stab.st_size;
2799   for (;;) {
2800     if (!truncto) break; /* was only (if anything) one half-truncated record */
2801     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2802       sysdie("seek in defer file %s while truncating partial", path_defer);
2803
2804     int r= getc(defer);
2805     if (r==EOF) {
2806       if (ferror(defer))
2807         sysdie("failed read from defer file %s", path_defer);
2808       else
2809         die("defer file %s shrank while we were checking it!", path_defer);
2810     }
2811     if (r=='\n') break;
2812     truncto--;
2813   }
2814
2815   if (stab.st_size != truncto) {
2816     warn("truncating half-record at end of defer file %s -"
2817          " shrinking by %ld bytes from %ld to %ld",
2818          path_defer, orgsize - truncto, orgsize, truncto);
2819
2820     if (fflush(defer))
2821       sysfatal("could not flush defer file %s", path_defer);
2822     if (ftruncate(fileno(defer), truncto))
2823       sysdie("could not truncate defer file %s", path_defer);
2824
2825   } else {
2826     info("continuing existing defer file %s (%ld bytes)",
2827          path_defer, orgsize);
2828   }
2829   if (fseek(defer, truncto, SEEK_SET))
2830     sysdie("could not seek to new end of defer file %s", path_defer);
2831 }
2832
2833 static void close_defer(void) {
2834   if (!defer)
2835     return;
2836
2837   struct stat stab;
2838   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2839
2840   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2841   defer= 0;
2842
2843   time_t now= xtime();
2844
2845   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2846                            (unsigned long)now,
2847                            (unsigned long)stab.st_ino);
2848   if (link(path_defer, backlog))
2849     sysfatal("could not install defer file %s as backlog file %s",
2850            path_defer, backlog);
2851   if (unlink(path_defer))
2852     sysdie("could not unlink old defer link %s to backlog file %s",
2853            path_defer, backlog);
2854
2855   free(backlog);
2856
2857   if (until_backlog_nextscan < 0 ||
2858       until_backlog_nextscan > backlog_retry_minperiods + 1)
2859     until_backlog_nextscan= backlog_retry_minperiods + 1;
2860 }
2861
2862 static void poll_backlog_file(void) {
2863   if (until_backlog_nextscan < 0) return;
2864   if (until_backlog_nextscan-- > 0) return;
2865   search_backlog_file();
2866 }
2867
2868 static void search_backlog_file(void) {
2869   /* returns non-0 iff there are any backlog files */
2870
2871   glob_t gl;
2872   int r, i;
2873   struct stat stab;
2874   const char *oldest_path=0;
2875   time_t oldest_mtime=0, now;
2876
2877   if (backlog_input_file) return;
2878
2879  try_again:
2880
2881   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2882
2883   switch (r) {
2884   case GLOB_ABORTED:
2885     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2886   case GLOB_NOSPACE:
2887     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2888   case 0:
2889     for (i=0; i<gl.gl_pathc; i++) {
2890       const char *path= gl.gl_pathv[i];
2891
2892       if (strchr(path,'#') || strchr(path,'~')) {
2893         debug("backlog file search skipping %s", path);
2894         continue;
2895       }
2896       r= stat(path, &stab);
2897       if (r) {
2898         syswarn("failed to stat backlog file %s", path);
2899         continue;
2900       }
2901       if (!S_ISREG(stab.st_mode)) {
2902         warn("backlog file %s is not a plain file (or link to one)", path);
2903         continue;
2904       }
2905       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2906         oldest_path= path;
2907         oldest_mtime= stab.st_mtime;
2908       }
2909     }
2910   case GLOB_NOMATCH: /* fall through */
2911     break;
2912   default:
2913     sysdie("glob expansion of backlog pattern %s gave unexpected"
2914            " nonzero (error?) return value %d", globpat_backlog, r);
2915   }
2916
2917   if (!oldest_path) {
2918     debug("backlog scan: none");
2919
2920     if (sms==sm_DROPPED) {
2921       preterminate();
2922       notice("feed dropped and our work is complete");
2923
2924       int r= unlink(path_control);
2925       if (r && errno!=ENOENT)
2926         syswarn("failed to remove control symlink for old feed");
2927
2928       xunlink(path_lock,    "lockfile for old feed");
2929       exit(4);
2930     }
2931     until_backlog_nextscan= backlog_spontrescan_periods;
2932     goto xfree;
2933   }
2934
2935   now= xtime();
2936   double age= difftime(now, oldest_mtime);
2937   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2938
2939   if (age_deficiency <= 0) {
2940     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2941           age, age_deficiency, oldest_path);
2942
2943     backlog_input_file= open_input_file(oldest_path);
2944     if (!backlog_input_file) {
2945       warn("backlog file %s vanished as we opened it", oldest_path);
2946       globfree(&gl);
2947       goto try_again;
2948     }
2949     inputfile_reading_start(backlog_input_file);
2950     until_backlog_nextscan= -1;
2951     goto xfree;
2952   }
2953
2954   until_backlog_nextscan= age_deficiency / period_seconds;
2955
2956   if (backlog_spontrescan_periods >= 0 &&
2957       until_backlog_nextscan > backlog_spontrescan_periods)
2958     until_backlog_nextscan= backlog_spontrescan_periods;
2959
2960   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2961         age, age_deficiency, until_backlog_nextscan, oldest_path);
2962
2963  xfree:
2964   globfree(&gl);
2965   return;
2966 }
2967
2968 /*---------- shutdown and signal handling ----------*/
2969
2970 static void preterminate(void) {
2971   if (in_child) return;
2972   notice_processed(main_input_file,0,"feedfile","");
2973   notice_processed(flushing_input_file,0,"flushing","");
2974   if (backlog_input_file)
2975     notice_processed(backlog_input_file,0, "backlog file ",
2976                      backlog_input_file->path);
2977 }
2978
2979 static int signal_self_pipe[2];
2980 static sig_atomic_t terminate_sig_flag;
2981
2982 static void raise_default(int signo) {
2983   xsigsetdefault(signo);
2984   raise(signo);
2985   abort();
2986 }
2987
2988 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2989   assert(fd=signal_self_pipe[0]);
2990   char buf[PIPE_BUF];
2991   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2992   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2993   if (r==0) die("eof on signal self pipe");
2994   if (terminate_sig_flag) {
2995     preterminate();
2996     notice("terminating (%s)", strsignal(terminate_sig_flag));
2997     raise_default(terminate_sig_flag);
2998   }
2999   return OOP_CONTINUE;
3000 }
3001
3002 static void sigarrived_handler(int signum) {
3003   static char x;
3004   switch (signum) {
3005   case SIGTERM:
3006   case SIGINT:
3007     if (!terminate_sig_flag) terminate_sig_flag= signum;
3008     break;
3009   default:
3010     abort();
3011   }
3012   write(signal_self_pipe[1],&x,1);
3013 }
3014
3015 static void init_signals(void) {
3016   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
3017     sysdie("could not ignore SIGPIPE");
3018
3019   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
3020
3021   xsetnonblock(signal_self_pipe[0],1);
3022   xsetnonblock(signal_self_pipe[1],1);
3023
3024   struct sigaction sa;
3025   memset(&sa,0,sizeof(sa));
3026   sa.sa_handler= sigarrived_handler;
3027   sa.sa_flags= SA_RESTART;
3028   xsigaction(SIGTERM,&sa);
3029   xsigaction(SIGINT,&sa);
3030
3031   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
3032 }
3033
3034 /*========== flushing the feed ==========*/
3035
3036 static pid_t inndcomm_child;
3037 static int inndcomm_sentinel_fd;
3038
3039 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
3040   assert(inndcomm_child);
3041   assert(fd == inndcomm_sentinel_fd);
3042   int status= xwaitpid(&inndcomm_child, "inndcomm");
3043   inndcomm_child= 0;
3044   
3045   cancel_fd_read_except(fd);
3046   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
3047   inndcomm_sentinel_fd= 0;
3048
3049   assert(!flushing_input_file);
3050
3051   if (WIFEXITED(status)) {
3052     switch (WEXITSTATUS(status)) {
3053
3054     case INNDCOMMCHILD_ESTATUS_FAIL:
3055       goto failed;
3056
3057     case INNDCOMMCHILD_ESTATUS_NONESUCH:
3058       notice("feed has been dropped by innd, finishing up");
3059       flushing_input_file= main_input_file;
3060       tailing_queue_readable(flushing_input_file);
3061         /* we probably previously returned EAGAIN from our fake read method
3062          * when in fact we were at EOF, so signal another readable event
3063          * so we actually see the EOF */
3064
3065       main_input_file= 0;
3066
3067       if (flushing_input_file) {
3068         SMS(DROPPING, max_separated_periods,
3069             "feed dropped by innd, but must finish last flush");
3070       } else {
3071         close_defer();
3072         SMS(DROPPED, 0, "feed dropped by innd");
3073         search_backlog_file();
3074       }
3075       return OOP_CONTINUE;
3076
3077     case 0:
3078       /* as above */
3079       flushing_input_file= main_input_file;
3080       tailing_queue_readable(flushing_input_file);
3081
3082       main_input_file= open_input_file(feedfile);
3083       if (!main_input_file)
3084         die("flush succeeded but feedfile %s does not exist!", feedfile);
3085
3086       if (flushing_input_file) {
3087         SMS(SEPARATED, max_separated_periods, "recovery flush complete");
3088       } else {
3089         close_defer();
3090         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
3091       }
3092       return OOP_CONTINUE;
3093
3094     default:
3095       goto unexpected_exitstatus;
3096
3097     }
3098   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
3099     warn("flush timed out trying to talk to innd");
3100     goto failed;
3101   } else {
3102   unexpected_exitstatus:
3103     report_child_status("inndcomm child", status);
3104   }
3105
3106  failed:
3107   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
3108   return OOP_CONTINUE;
3109 }
3110
3111 static void inndcommfail(const char *what) {
3112   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
3113   exit(INNDCOMMCHILD_ESTATUS_FAIL);
3114 }
3115
3116 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
3117   int pipefds[2];
3118
3119   notice("flushing %s",why);
3120
3121   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
3122   assert(!inndcomm_child);
3123   assert(!inndcomm_sentinel_fd);
3124
3125   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
3126
3127   inndcomm_child= xfork("inndcomm child");
3128
3129   if (!inndcomm_child) {
3130     const char *flushargv[2]= { sitename, 0 };
3131     char *reply;
3132     int r;
3133
3134     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
3135     /* parent spots the autoclose of pipefds[1] when we die or exit */
3136
3137     if (simulate_flush>=0) {
3138       warn("SIMULATING flush child status %d", simulate_flush);
3139       if (simulate_flush>128) raise(simulate_flush-128);
3140       else exit(simulate_flush);
3141     }
3142
3143     alarm(inndcomm_flush_timeout);
3144     r= ICCopen();                         if (r)   inndcommfail("connect");
3145     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
3146     if (!r) exit(0); /* yay! */
3147
3148     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
3149     syswarn("innd ctlinnd flush failed: innd said %s", reply);
3150     exit(INNDCOMMCHILD_ESTATUS_FAIL);
3151   }
3152
3153   simulate_flush= -1;
3154
3155   xclose(pipefds[1], "inndcomm sentinel child's end",0);
3156   inndcomm_sentinel_fd= pipefds[0];
3157   assert(inndcomm_sentinel_fd);
3158   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
3159
3160   SMS(FLUSHING, 0, why);
3161 }
3162
3163 /*========== main program ==========*/
3164
3165 static void postfork_inputfile(InputFile *ipf) {
3166   if (!ipf) return;
3167   xclose(ipf->fd, "(in child) input file ", ipf->path);
3168 }
3169
3170 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
3171   /* we have no stdio streams that are buffered long-term */
3172   if (!f) return;
3173   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
3174 }
3175
3176 static void postfork(void) {
3177   in_child= 1;
3178
3179   xsigsetdefault(SIGTERM);
3180   xsigsetdefault(SIGINT);
3181   xsigsetdefault(SIGPIPE);
3182   if (terminate_sig_flag) raise(terminate_sig_flag);
3183
3184   postfork_inputfile(main_input_file);
3185   postfork_inputfile(flushing_input_file);
3186
3187   Conn *conn;
3188   FOR_CONN(conn)
3189     conn_closefd(conn,"(in child) ");
3190
3191   postfork_stdio(defer, "defer file ", path_defer);
3192 }
3193
3194 typedef struct Every Every;
3195 struct Every {
3196   struct timeval interval;
3197   int fixed_rate;
3198   void (*f)(void);
3199 };
3200
3201 static void every_schedule(Every *e, struct timeval base);
3202
3203 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3204   Every *e= e_v;
3205   e->f();
3206   if (!e->fixed_rate) xgettimeofday(&base);
3207   every_schedule(e, base);
3208   return OOP_CONTINUE;
3209 }
3210
3211 static void every_schedule(Every *e, struct timeval base) {
3212   struct timeval when;
3213   timeradd(&base, &e->interval, &when);
3214   loop->on_time(loop, when, every_happens, e);
3215 }
3216
3217 static void every(int interval, int fixed_rate, void (*f)(void)) {
3218   NEW_DECL(Every *,e);
3219   e->interval.tv_sec= interval;
3220   e->interval.tv_usec= 0;
3221   e->fixed_rate= fixed_rate;
3222   e->f= f;
3223   struct timeval now;
3224   xgettimeofday(&now);
3225   every_schedule(e, now);
3226 }
3227
3228 static void filepoll(void) {
3229   filemon_callback(main_input_file);
3230   filemon_callback(flushing_input_file);
3231 }
3232
3233 static char *debug_report_ipf(InputFile *ipf) {
3234   if (!ipf) return xasprintf("none");
3235
3236   const char *slash= strrchr(ipf->path,'/');
3237   const char *path= slash ? slash+1 : ipf->path;
3238
3239   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
3240                    ipf, path,
3241                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
3242                    (long)ipf->offset, ipf->fd,
3243                    ipf->rd ? "" : ",!rd",
3244                    ipf->skippinglong ? "*skiplong" : "",
3245                    ipf->rd && ipf->paused ? "*paused" : "");
3246 }
3247
3248 static void period(void) {
3249   char *dipf_main=     debug_report_ipf(main_input_file);
3250   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3251   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3252
3253   debug("PERIOD"
3254         " sms=%s[%d] conns=%d until_connect=%d"
3255         " input_files main:%s flushing:%s backlog:%s[%d]"
3256         " children connecting=%ld inndcomm=%ld"
3257         ,
3258         sms_names[sms], until_flush, conns.count, until_connect,
3259         dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
3260         (long)connecting_child, (long)inndcomm_child
3261         );
3262
3263   free(dipf_main);
3264   free(dipf_flushing);
3265   free(dipf_backlog);
3266
3267   if (until_connect) until_connect--;
3268
3269   inputfile_queue_check_expired(backlog_input_file);
3270   poll_backlog_file();
3271   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3272   statemc_period_poll();
3273   check_assign_articles();
3274   check_idle_conns();
3275 }
3276
3277
3278 /*========== dumping state ==========*/
3279
3280 static void dump_article_list(FILE *f, const ControlCommand *c,
3281                               const ArticleList *al) {
3282   fprintf(f, " count=%d\n", al->count);
3283   if (!c->xval) return;
3284   
3285   int i; Article *art;
3286   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3287     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3288     DUMPV("%p", art->,ipf);
3289     DUMPV("%d", art->,missing);
3290     DUMPV("%lu", (unsigned long)art->,offset);
3291     DUMPV("%d", art->,blanklen);
3292     DUMPV("%d", art->,midlen);
3293     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3294   }
3295 }
3296   
3297 static void dump_input_file(FILE *f, const ControlCommand *c,
3298                             InputFile *ipf, const char *wh) {
3299   char *dipf= debug_report_ipf(ipf);
3300   fprintf(f,"input %s %s", wh, dipf);
3301   free(dipf);
3302   
3303   if (ipf) {
3304     DUMPV("%d", ipf->,readcount_ok);
3305     DUMPV("%d", ipf->,readcount_blank);
3306     DUMPV("%d", ipf->,readcount_err);
3307   }
3308   fprintf(f,"\n");
3309   if (ipf) {
3310     ArtState state; const char *const *statename; 
3311     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3312 #define RC_DUMP_FMT(x) " " #x "=%d"
3313 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3314       fprintf(f,"input %s counts %-11s"
3315               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3316               wh, *statename
3317               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3318     }
3319     fprintf(f,"input %s queue", wh);
3320     dump_article_list(f,c,&ipf->queue);
3321   }
3322 }
3323
3324 CCMD(dump) {
3325   int i;
3326   fprintf(cc->out, "dumping state to %s\n", path_dump);
3327   FILE *f= fopen(path_dump, "w");
3328   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3329
3330   fprintf(f,"general");
3331   DUMPV("%s", sms_names,[sms]);
3332   DUMPV("%d", ,until_flush);
3333   DUMPV("%ld", (long),self_pid);
3334   DUMPV("%p", , defer);
3335   DUMPV("%d", , until_connect);
3336   DUMPV("%d", , until_backlog_nextscan);
3337   DUMPV("%d", , simulate_flush);
3338   fprintf(f,"\nnocheck");
3339   DUMPV("%#.10f", , accept_proportion);
3340   DUMPV("%d", , nocheck);
3341   DUMPV("%d", , nocheck_reported);
3342   fprintf(f,"\n");
3343
3344   fprintf(f,"special");
3345   DUMPV("%ld", (long),connecting_child);
3346   DUMPV("%d", , connecting_fdpass_sock);
3347   DUMPV("%d", , control_master);
3348   fprintf(f,"\n");
3349
3350   fprintf(f,"filemon ");
3351   filemon_method_dump_info(f);
3352
3353   dump_input_file(f,c, main_input_file,     "main"    );
3354   dump_input_file(f,c, flushing_input_file, "flushing");
3355   dump_input_file(f,c, backlog_input_file,  "backlog" );
3356
3357   fprintf(f,"conns count=%d\n", conns.count);
3358
3359   Conn *conn;
3360   FOR_CONN(conn) {
3361
3362     fprintf(f,"C%d",conn->fd);
3363     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3364     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3365     DUMPV("%d",conn->,since_activity);
3366     fprintf(f,"\n");
3367
3368     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3369     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3370     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3371
3372     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3373     for (i=0; i<conn->xmitu; i++) {
3374       const struct iovec *iv= &conn->xmit[i];
3375       const XmitDetails *xd= &conn->xmitd[i];
3376       char *dinfo;
3377       switch (xd->kind) {
3378       case xk_Const:    dinfo= xasprintf("Const");                 break;
3379       case xk_Artdata:  dinfo= xasprintf("A%p", xd->info.sm_art);  break;
3380       default:
3381         abort();
3382       }
3383       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3384               sanitise(iv->iov_base, iv->iov_len));
3385       free(dinfo);
3386     }
3387   }
3388
3389   fprintf(f,"paths");
3390   DUMPV("%s", , path_lock);
3391   DUMPV("%s", , path_flushing);
3392   DUMPV("%s", , path_defer);
3393   DUMPV("%s", , path_control);
3394   DUMPV("%s", , path_dump);
3395   DUMPV("%s", , globpat_backlog);
3396   fprintf(f,"\n");
3397
3398   if (!!ferror(f) + !!fclose(f)) {
3399     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3400     return;
3401   }
3402 }
3403
3404 /*========== option parsing ==========*/
3405
3406 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3407 static void vbadusage(const char *fmt, va_list al) {
3408   char *m= xvasprintf(fmt,al);
3409   fprintf(stderr, "bad usage: %s\n"
3410           "say --help for help, or read the manpage\n",
3411           m);
3412   if (become_daemon)
3413     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3414   exit(8);
3415 }
3416
3417 /*---------- generic option parser ----------*/
3418
3419 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3420 static void badusage(const char *fmt, ...) {
3421   va_list al;
3422   va_start(al,fmt);
3423   vbadusage(fmt,al);
3424 }
3425
3426 enum OptFlags {
3427   of_seconds= 001000u,
3428   of_boolean= 002000u,
3429 };
3430
3431 typedef struct Option Option;
3432 typedef void OptionParser(const Option*, const char *val);
3433
3434 struct Option {
3435   int shrt;
3436   const char *lng, *formarg;
3437   void *store;
3438   OptionParser *fn;
3439   int intval;
3440 };
3441
3442 static void parse_options(const Option *options, char ***argvp) {
3443   /* on return *argvp is first non-option arg; argc is not updated */
3444
3445   for (;;) {
3446     const char *arg= *++(*argvp);
3447     if (!arg) break;
3448     if (*arg != '-') break;
3449     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3450     int a;
3451     while ((a= *++arg)) {
3452       const Option *o;
3453       if (a=='-') {
3454         arg++;
3455         char *equals= strchr(arg,'=');
3456         int len= equals ? (equals - arg) : strlen(arg);
3457         for (o=options; o->shrt || o->lng; o++)
3458           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3459             goto found_long;
3460         badusage("unknown long option --%s",arg);
3461       found_long:
3462         if (!o->formarg) {
3463           if (equals) badusage("option --%s does not take a value",o->lng);
3464           arg= 0;
3465         } else if (equals) {
3466           arg= equals+1;
3467         } else {
3468           arg= *++(*argvp);
3469           if (!arg) badusage("option --%s needs a value for %s",
3470                              o->lng, o->formarg);
3471         }
3472         o->fn(o, arg);
3473         break; /* eaten the whole argument now */
3474       }
3475       for (o=options; o->shrt || o->lng; o++)
3476         if (a == o->shrt)
3477           goto found_short;
3478       badusage("unknown short option -%c",a);
3479     found_short:
3480       if (!o->formarg) {
3481         o->fn(o,0);
3482       } else {
3483         if (!*++arg) {
3484           arg= *++(*argvp);
3485           if (!arg) badusage("option -%c needs a value for %s",
3486                              o->shrt, o->formarg);
3487         }
3488         o->fn(o,arg);
3489         break; /* eaten the whole argument now */
3490       }
3491     }
3492   }
3493 }
3494
3495 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3496
3497 static void print_options(const Option *options, FILE *f) {
3498   const Option *o;
3499   for (o=options; o->shrt || o->lng; o++) {
3500     char shrt[2] = { o->shrt, 0 };
3501     char *optspec= xasprintf("%s%s%s%s%s",
3502                              o->shrt ? "-" : "", shrt,
3503                              o->shrt && o->lng ? "|" : "",
3504                              DELIMPERHAPS("--", o->lng));
3505     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3506     free(optspec);
3507   }
3508 }
3509
3510 /*---------- specific option types ----------*/
3511
3512 static void op_integer(const Option *o, const char *val) {
3513   char *ep;
3514   errno= 0;
3515   unsigned long ul= strtoul(val,&ep,10);
3516   if (*ep || ep==val || errno || ul>INT_MAX)
3517     badusage("bad integer value for %s",o->lng);
3518   int *store= o->store;
3519   *store= ul;
3520 }
3521
3522 static void op_double(const Option *o, const char *val) {
3523   int *store= o->store;
3524   char *ep;
3525   errno= 0;
3526   *store= strtod(val, &ep);
3527   if (*ep || ep==val || errno)
3528     badusage("bad floating point value for %s",o->lng);
3529 }
3530
3531 static void op_string(const Option *o, const char *val) {
3532   const char **store= o->store;
3533   *store= val;
3534 }
3535
3536 static void op_seconds(const Option *o, const char *val) {
3537   int *store= o->store;
3538   char *ep;
3539   int unit;
3540
3541   double v= strtod(val,&ep);
3542   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3543
3544   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3545   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3546   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3547   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3548   else if (!strcmp(ep,"das")) unit= 10;
3549   else if (!strcmp(ep,"hs"))  unit= 100;
3550   else if (!strcmp(ep,"ks"))  unit= 1000;
3551   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3552   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3553
3554   v *= unit;
3555   v= ceil(v);
3556   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3557   *store= v;
3558 }
3559
3560 static void op_setint(const Option *o, const char *val) {
3561   int *store= o->store;
3562   *store= o->intval;
3563 }
3564
3565 /*---------- specific options ----------*/
3566
3567 static void help(const Option *o, const char *val);
3568
3569 static const Option innduct_options[]= {
3570 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3571 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3572 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3573 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3574 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3575 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3576 {'P',"port",             "PORT",  &port,                     op_integer     },
3577 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3578 {0,"help",               0,       0,                         help           },
3579
3580 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3581 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3582 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
3583 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3584 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3585
3586 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3587 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3588 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3589
3590 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3591 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3592
3593 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3594 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3595 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3596 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3597 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3598 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
3599 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3600
3601 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3602 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3603
3604 {0,0}
3605 };
3606
3607 static void printusage(FILE *f) {
3608   fputs("usage: innduct [options] site [fqdn]\n"
3609         "available options are:\n", f);
3610   print_options(innduct_options, f);
3611 }
3612
3613 static void help(const Option *o, const char *val) {
3614   printusage(stdout);
3615   if (ferror(stdout) || fflush(stdout)) {
3616     perror("innduct: writing help");
3617     exit(12);
3618   }
3619   exit(0);
3620 }
3621
3622 static void convert_to_periods_rndup(int *store) {
3623   *store += period_seconds-1;
3624   *store /= period_seconds;
3625 }
3626
3627 int main(int argc, char **argv) {
3628   if (!argv[1]) {
3629     printusage(stderr);
3630     exit(8);
3631   }
3632
3633   parse_options(innduct_options, &argv);
3634
3635   /* arguments */
3636
3637   sitename= *argv++;
3638   if (!sitename) badusage("need site name argument");
3639   remote_host= *argv++;
3640   if (*argv) badusage("too many non-option arguments");
3641
3642   /* defaults */
3643
3644   int r= innconf_read(inndconffile);
3645   if (!r) badusage("could not read inn.conf (more info on stderr)");
3646
3647   if (!remote_host) remote_host= sitename;
3648
3649   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3650     badusage("nocheck threshold percentage must be between 0..100");
3651   nocheck_thresh *= 0.01;
3652
3653   if (nocheck_decay < 0.1)
3654     badusage("nocheck decay articles must be at least 0.1");
3655   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3656
3657   convert_to_periods_rndup(&reconnect_delay_periods);
3658   convert_to_periods_rndup(&flushfail_retry_periods);
3659   convert_to_periods_rndup(&backlog_retry_minperiods);
3660   convert_to_periods_rndup(&backlog_spontrescan_periods);
3661   convert_to_periods_rndup(&spontaneous_flush_periods);
3662   convert_to_periods_rndup(&max_separated_periods);
3663   convert_to_periods_rndup(&need_activity_periods);
3664
3665   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3666     badusage("bad input data ratio must be between 0..100");
3667   max_bad_data_ratio *= 0.01;
3668
3669   if (!feedfile) {
3670     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3671   } else if (!feedfile[0]) {
3672     badusage("feed filename must be nonempty");
3673   } else if (feedfile[strlen(feedfile)-1]=='/') {
3674     feedfile= xasprintf("%s%s",feedfile,sitename);
3675   }
3676
3677   if (max_queue_per_ipf<0)
3678     max_queue_per_ipf= max_queue_per_conn * 2;
3679
3680   const char *feedfile_forbidden= "?*[~#";
3681   int c;
3682   while ((c= *feedfile_forbidden++))
3683     if (strchr(feedfile, c))
3684       badusage("feed filename may not contain metacharacter %c",c);
3685
3686   /* set things up */
3687
3688   path_lock=        xasprintf("%s_lock",      feedfile);
3689   path_flushing=    xasprintf("%s_flushing",  feedfile);
3690   path_defer=       xasprintf("%s_defer",     feedfile);
3691   path_control=     xasprintf("%s_control",   feedfile);
3692   path_dump=        xasprintf("%s_dump",      feedfile);
3693   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3694
3695   oop_source_sys *sysloop= oop_sys_new();
3696   if (!sysloop) sysdie("could not create liboop event loop");
3697   loop= (oop_source*)sysloop;
3698
3699   LIST_INIT(conns);
3700
3701   if (become_daemon) {
3702     int i;
3703     for (i=3; i<255; i++)
3704       /* do this now before we open syslog, etc. */
3705       close(i);
3706     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3707
3708     int null= open("/dev/null",O_RDWR);
3709     if (null<0) sysfatal("failed to open /dev/null");
3710     dup2(null,0);
3711     dup2(null,1);
3712     dup2(null,2);
3713     xclose(null, "/dev/null original fd",0);
3714
3715     pid_t child1= xfork("daemonise first fork");
3716     if (child1) _exit(0);
3717
3718     pid_t sid= setsid();
3719     if (sid != child1) sysfatal("setsid failed");
3720
3721     pid_t child2= xfork("daemonise second fork");
3722     if (child2) _exit(0);
3723   }
3724
3725   self_pid= getpid();
3726   if (self_pid==-1) sysdie("getpid");
3727
3728   statemc_lock();
3729
3730   init_signals();
3731
3732   notice("starting");
3733
3734   if (!become_daemon)
3735     control_stdio();
3736
3737   control_init();
3738
3739   int filemon_ok= 0;
3740   if (!try_filemon) {
3741     notice("filemon: suppressed by command line option, polling");
3742   } else {
3743     filemon_ok= filemon_method_init();
3744     if (!filemon_ok)
3745       warn("filemon: no file monitoring available, polling");
3746   }
3747   if (!filemon_ok)
3748     every(filepoll_seconds,0,filepoll);
3749
3750   every(period_seconds,1,period);
3751
3752   statemc_init();
3753
3754   /* let's go */
3755
3756   void *run= oop_sys_run(sysloop);
3757   assert(run == OOP_ERROR);
3758   sysdie("event loop failed");
3759 }