chiark / gitweb /
dump control command
[innduct.git] / backends / innduct.c
1 /*
2  * todo
3  *  - rename defraise to raise_default
4  *  - xmalloc + memset -> xcalloc
5  *  - macro for conn iteration
6  *  - skipping_long offset calculation is wrong
7  *  - reset signals TERM and INT (and HUP) in children
8  *
9  *  - manpage: document control master stuff
10  *  - admin-initiated flush
11  *
12  * debugging rune:
13  *  build-lfs/backends/innduct --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
14  */
15
16 /*
17  * Newsfeeds file entries should look like this:
18  *     host.name.of.site[/exclude,exclude,...]\
19  *             :pattern,pattern...[/distribution,distribution...]\
20  *             :Tf,Wnm
21  *             :
22  * or
23  *     sitename[/exclude,exclude,...]\
24  *             :pattern,pattern...[/distribution,distribution...]\
25  *             :Tf,Wnm
26  *             :host.name.of.site
27  *
28  * Four files full of
29  *    token messageid
30  * or might be blanked out
31  *    <spc><spc><spc><spc>....
32  *
33  * F site.name                 main feed file
34  *                                opened/created, then written, by innd
35  *                                read by duct
36  *                                unlinked by duct
37  *                                tokens blanked out by duct when processed
38  *   site.name_lock            lock preventing multiple ducts
39  *                                to hold lock must open,F_SETLK[W]
40  *                                  and then stat to check that locked file
41  *                                  still has name site.name_lock
42  *                                holder of this lock is "duct"
43  *                                (only) lockholder may remove the lockfile
44  * D site.name_flushing        temporary feed file during flush (or crash)
45  *                                hardlink created by duct
46  *                                unlinked by duct
47  *   site.name_defer           431'd articles, still being written,
48  *                                created, written, used by duct
49  *
50  *   site.name_backlog.<date>.<inum>
51  *                             431'd articles, ready for innxmit or duct
52  *                                created (link/mv) by duct
53  *   site.name_backlog<anything-else>  (where <anything-else> does not
54  *                                      contain '#' or '~') eg
55  *   site.name_backlog.manual
56  *                             anything the sysadmin likes (eg, feed files
57  *                             from old feeds to be merged into this one)
58  *                                created (link/mv) by admin
59  *                                may be symlinks (in which case links
60  *                                may be written through, but only links
61  *                                will be removed.
62  *
63  *                             It is safe to remove backlog files manually,
64  *                             if it's desired to throw away the backlog.
65  *
66  * Backlog files are also processed by innduct.  We find the oldest
67  * backlog file which is at least a certain amount old, and feed it
68  * back into our processing.  When every article in it has been read
69  * and processed, we unlink it and look for another backlog file.
70  *
71  * If we don't have a backlog file that we're reading, we close the
72  * defer file that we're writing and make it into a backlog file at
73  * the first convenient opportunity.
74  * -8<-
75
76
77    OVERALL STATES:
78
79                                                                 START
80                                                                   |
81      ,-->--.                                                 check F, D
82      |     |                                                      |
83      |     |                                                      |
84      |     |  <----------------<---------------------------------'|
85      |     |                                       F exists       |
86      |     |                                       D ENOENT       |
87      |     |  duct opens F                                        |
88      |     V                                                      |
89      |  Normal                                                    |
90      |   F: innd writing, duct reading                            |
91      |   D: ENOENT                                                |
92      |     |                                                      |
93      |     |  duct decides time to flush                          |
94      |     |  duct makes hardlink                                 |
95      |     |                                                      |
96      |     V                            <------------------------'|
97      |  Hardlinked                                  F==D          |
98      |   F == D: innd writing, duct reading         both exist    |
99      ^     |                                                      |
100      |     |  duct unlinks F                                      |
101      |     |                        <-----------<-------------<--'|
102      |     |                           open D         F ENOENT    |
103      |     |                           if exists                  |
104      |     |                                                      |
105      |     V                        <---------------------.       |
106      |  Moved                                             |       |
107      |   F: ENOENT                                        |       |
108      |   D: innd writing, duct reading; or ENOENT         |       |
109      |     |                                              |       |
110      |     |  duct requests flush of feed                 |       |
111      |     |   (others can too, harmlessly)               |       |
112      |     V                                              |       |
113      |  Flushing                                          |       |
114      |   F: ENOENT                                        |       |
115      |   D: innd flushing, duct; or ENOENT                |       |
116      |     |                                              |       |
117      |     |   inndcomm flush fails                       |       |
118      |     |`-------------------------->------------------'       |
119      |     |                                                      |
120      |     |   inndcomm reports no such site                      |
121      |     |`---------------------------------------------------- | -.
122      |     |                                                      |  |
123      |     |  innd finishes writing D, creates F                  |  |
124      |     |  inndcomm reports flush successful                   |  |
125      |     |                                                      |  |
126      |     V                                                      |  |
127      |  Separated                                <----------------'  |
128      |   F: innd writing                            F!=D             /
129      |   D: duct reading; or ENOENT                  both exist     /
130      |     |                                                       /
131      |     |  duct gets to the end of D                           /
132      |     |  duct opens F too                                   /
133      |     V                                                    /
134      |  Finishing                                              /
135      |   F: innd writing, duct reading                        |
136      |   D: duct finishing                                    V
137      |     |                                            Dropping
138      |     |  duct finishes processing D                 F: ENOENT
139      |     V  duct unlinks D                             D: duct reading
140      |     |                                                  |
141      `--<--'                                                  | duct finishes
142                                                               |  processing D
143                                                               | duct unlinks D
144                                                               | duct exits
145                                                               V
146                                                         Dropped
147                                                          F: ENOENT
148                                                          D: ENOENT
149                                                          duct not running
150
151    "duct reading" means innduct is reading the file but also
152    overwriting processed tokens.
153
154  * ->8- -^L-
155  *
156  * rune for printing diagrams:
157
158 perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops
159
160  *
161  */
162
163 /*============================== PROGRAM ==============================*/
164
165 #define _GNU_SOURCE 1
166
167 #include "config.h"
168 #include "storage.h"
169 #include "nntp.h"
170 #include "libinn.h"
171 #include "inndcomm.h"
172
173 #include "inn/list.h"
174 #include "inn/innconf.h"
175
176 #include <sys/uio.h>
177 #include <sys/types.h>
178 #include <sys/wait.h>
179 #include <sys/stat.h>
180 #include <sys/socket.h>
181 #include <sys/un.h>
182 #include <unistd.h>
183 #include <string.h>
184 #include <signal.h>
185 #include <stdio.h>
186 #include <errno.h>
187 #include <syslog.h>
188 #include <fcntl.h>
189 #include <stdarg.h>
190 #include <assert.h>
191 #include <stdlib.h>
192 #include <stddef.h>
193 #include <glob.h>
194 #include <time.h>
195 #include <math.h>
196 #include <ctype.h>
197
198 #include <oop.h>
199 #include <oop-read.h>
200
201 /*----- general definitions, probably best not changed -----*/
202
203 #define CONNCHILD_ESTATUS_STREAM   24
204 #define CONNCHILD_ESTATUS_NOSTREAM 25
205
206 #define INNDCOMMCHILD_ESTATUS_FAIL     26
207 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
208
209 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
210 #define MAX_CONTROL_COMMAND 1000
211
212 #define VA                va_list al;  va_start(al,fmt)
213 #define PRINTF(f,a)       __attribute__((__format__(printf,f,a)))
214 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
215
216 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
217
218 /*----- doubly linked lists -----*/
219
220 #define ISNODE(T)   struct node list_node
221 #define DEFLIST(T)                              \
222    typedef struct {                             \
223      union { struct list li; T *for_type; } u;  \
224      int count;                                 \
225    } T##List
226
227 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
228
229 #define LIST_CHECKCANHAVENODE(l,n) \
230   ((void)((n) == ((l).u.for_type))) /* just for the type check */
231
232 #define LIST_ADDSOMEHOW(l,n,list_addsomehow)    \
233  ( LIST_CHECKCANHAVENODE(l,n),                  \
234    list_addsomehow(&(l).u.li, NODE((n))),       \
235    (void)(l).count++                            \
236    )
237
238 #define LIST_REMSOMEHOW(l,list_remsomehow)      \
239  ( (typeof((l).u.for_type))                     \
240    ( (l).count                                  \
241      ? ( (l).count--,                           \
242          list_remsomehow(&(l).u.li) )           \
243      : 0                                        \
244      )                                          \
245    )
246
247
248 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
249 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
250 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
251 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
252
253 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
254 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
255 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
256 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
257
258 #define LIST_REMOVE(l,n)                        \
259  ( LIST_CHECKCANHAVENODE(l,n),                  \
260    list_remove(NODE((n))),                      \
261    (void)(l).count--                            \
262    )
263
264 #define LIST_INSERT(l,n,pred)                                   \
265  ( LIST_CHECKCANHAVENODE(l,n),                                  \
266    LIST_CHECKCANHAVENODE(l,pred),                               \
267    list_insert((struct list*)&(l), NODE((n)), NODE((pred))),    \
268    (void)(l).count++                                            \
269    )
270
271 /*----- type predeclarations -----*/
272
273 typedef struct Conn Conn;
274 typedef struct Article Article;
275 typedef struct InputFile InputFile;
276 typedef struct XmitDetails XmitDetails;
277 typedef struct Filemon_Perfile Filemon_Perfile;
278 typedef enum StateMachineState StateMachineState;
279 typedef struct ControlCommand ControlCommand;
280
281 DEFLIST(Conn);
282 DEFLIST(Article);
283
284 /*----- function predeclarations -----*/
285
286 static void conn_maybe_write(Conn *conn);
287 static void conn_make_some_xmits(Conn *conn);
288 static void *conn_write_some_xmits(Conn *conn);
289
290 static void xmit_free(XmitDetails *d);
291
292 #define SMS(newstate, periods, why) \
293    (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
294 static void statemc_setstate(StateMachineState newsms, int periods,
295                              const char *forlog, const char *why);
296
297 static void statemc_start_flush(const char *why); /* Normal => Flushing */
298 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
299
300 static void article_done(Conn *conn, Article *art, int whichcount);
301
302 static void check_assign_articles(void);
303 static void queue_check_input_done(void);
304
305 static void statemc_check_flushing_done(void);
306 static void statemc_check_backlog_done(void);
307
308 static void postfork(void);
309 static void period(void);
310
311 static void open_defer(void);
312 static void close_defer(void);
313 static void search_backlog_file(void);
314 static void preterminate(void);
315 static void defraise(int signo);
316 static char *debug_report_ipf(InputFile *ipf);
317
318 static void inputfile_reading_start(InputFile *ipf);
319 static void inputfile_reading_stop(InputFile *ipf);
320
321 static void filemon_start(InputFile *ipf);
322 static void filemon_stop(InputFile *ipf);
323 static void filemon_callback(InputFile *ipf);
324
325 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
326 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
327
328 static const oop_rd_style peer_rd_style;
329 static oop_rd_call peer_rd_err, peer_rd_ok;
330
331 /*----- configuration options -----*/
332 /* when changing defaults, remember to update the manpage */
333
334 static const char *sitename, *remote_host;
335 static const char *feedfile, *realsockdir="/tmp/innduct.control";
336 static int quiet_multiple=0;
337 static int become_daemon=1, try_filemon=1;
338 static int try_stream=1;
339 static int port=119;
340 static const char *inndconffile;
341
342 static int max_connections=10;
343 static int max_queue_per_conn=200;
344 static int target_max_feedfile_size=100000;
345 static int period_seconds=60;
346 static int filepoll_seconds=5;
347
348 static int connection_setup_timeout=200;
349 static int inndcomm_flush_timeout=100;
350
351 static double nocheck_thresh= 95.0; /* converted from percentage by main */
352 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
353
354 /* all these are initialised to seconds, and converted to periods in main */
355 static int reconnect_delay_periods=1000;
356 static int flushfail_retry_periods=1000;
357 static int backlog_retry_minperiods=50;
358 static int backlog_spontrescan_periods=300;
359 static int spontaneous_flush_periods=100000;
360 static int need_activity_periods=1000;
361
362 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
363 static int max_bad_data_initial= 30;
364   /* in one corrupt 4096-byte block the number of newlines has
365    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
366
367
368 /*----- statistics -----*/
369
370 typedef enum {      /* in queue                 in conn->sent             */
371   art_Unchecked,    /*   not checked, not sent    checking                */
372   art_Wanted,       /*   checked, wanted          sent body as requested  */
373   art_Unsolicited,  /*   -                        sent body without check */
374   art_MaxState,
375 } ArtState;
376
377 static const char *const artstate_names[]=
378   { "Unchecked", "Wanted", "Unsolicited", 0 };
379
380 #define RESULT_COUNTS(RCS,RCN)                  \
381   RCS(sent)                                     \
382   RCS(accepted)                                 \
383   RCN(unwanted)                                 \
384   RCN(rejected)                                 \
385   RCN(deferred)                                 \
386   RCN(missing)                                  \
387   RCN(connretry)
388
389 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
390 #define RCI_TRIPLE_VALS_BASE(counts,x)          \
391        counts[art_Unchecked] x                  \
392        + counts[art_Wanted] x                   \
393        + counts[art_Unsolicited] x,             \
394        counts[art_Unchecked] x                  \
395        , counts[art_Wanted] x                   \
396        , counts[art_Unsolicited] x
397
398 typedef enum {
399 #define RC_INDEX(x) RC_##x,
400   RESULT_COUNTS(RC_INDEX, RC_INDEX)
401   RCI_max
402 } ResultCountIndex;
403
404
405 /*----- transmission buffers -----*/
406
407 #define CONNIOVS 128
408
409 typedef enum {
410   xk_Malloc, xk_Const, xk_Artdata
411 } XmitKind;
412
413 struct XmitDetails {
414   XmitKind kind;
415   union {
416     char *malloc_tofree;
417     ARTHANDLE *sm_art;
418   } info;
419 };
420
421
422 /*----- core operational data structure types -----*/
423
424 struct InputFile {
425   /* This is also an instance of struct oop_readable */
426   struct oop_readable readable; /* first */
427   oop_readable_call *readable_callback;
428   void *readable_callback_user;
429
430   int fd;
431   Filemon_Perfile *filemon;
432
433   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
434   long inprogress; /* no. of articles read but not processed */
435   off_t offset;
436   int skippinglong;
437
438   int counts[art_MaxState][RCI_max];
439   int readcount_ok, readcount_blank, readcount_err;
440   char path[];
441 };
442
443 struct Article {
444   ISNODE(Article);
445   ArtState state;
446   int midlen, missing;
447   InputFile *ipf;
448   TOKEN token;
449   off_t offset;
450   int blanklen;
451   char messageid[1];
452 };
453
454 #define SMS_LIST(X)                             \
455   X(NORMAL)                                     \
456   X(FLUSHING)                                   \
457   X(FLUSHFAILED)                                \
458   X(SEPARATED)                                  \
459   X(DROPPING)                                   \
460   X(DROPPED)
461
462 enum StateMachineState {
463 #define SMS_DEF_ENUM(s) sm_##s,
464   SMS_LIST(SMS_DEF_ENUM)
465 };
466
467 static const char *sms_names[]= {
468 #define SMS_DEF_NAME(s) #s ,
469   SMS_LIST(SMS_DEF_NAME)
470   0
471 };
472
473 struct Conn {
474   ISNODE(Conn);
475   int fd; /* may be 0, meaning closed (during construction/destruction) */
476   oop_read *rd; /* likewise */
477   int max_queue, stream, quitting;
478   int since_activity; /* periods */
479   ArticleList waiting; /* not yet told peer */
480   ArticleList priority; /* peer says send it now */
481   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
482   struct iovec xmit[CONNIOVS];
483   XmitDetails xmitd[CONNIOVS];
484   int xmitu;
485 };
486
487
488 /*----- general operational variables -----*/
489
490 /* main initialises */
491 static oop_source *loop;
492 static ConnList conns;
493 static ArticleList queue;
494 static char *path_lock, *path_flushing, *path_defer;
495 static char *path_control, *path_dump;
496 static char *globpat_backlog;
497 static pid_t self_pid;
498
499 /* statemc_init initialises */
500 static StateMachineState sms;
501 static int sm_period_counter;
502 static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
503 static FILE *defer;
504
505 /* initialisation to 0 is good */
506 static int until_connect, until_backlog_nextscan;
507 static double accept_proportion;
508 static int nocheck, nocheck_reported, in_child;
509
510 /* for simulation, debugging, etc. */
511 int simulate_flush= -1;
512
513 /*========== logging ==========*/
514
515 static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
516 static void logcore(int sysloglevel, const char *fmt, ...) {
517   VA;
518   if (become_daemon) {
519     vsyslog(sysloglevel,fmt,al);
520   } else {
521     if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
522     vfprintf(stderr,fmt,al);
523     putc('\n',stderr);
524   }
525   va_end(al);
526 }
527
528 static void logv(int sysloglevel, const char *pfx, int errnoval,
529                  const char *fmt, va_list al) PRINTF(5,0);
530 static void logv(int sysloglevel, const char *pfx, int errnoval,
531                  const char *fmt, va_list al) {
532   char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
533   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
534   msgbuf[sizeof(msgbuf)-1]= 0;
535
536   if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
537     sysloglevel= LOG_ERR; /* run by wrong user, probably */
538
539   logcore(sysloglevel, "<%s>%s: %s%s%s",
540          sitename, pfx, msgbuf,
541          errnoval>=0 ? ": " : "",
542          errnoval>=0 ? strerror(errnoval) : "");
543 }
544
545 #define diewrap(fn, pfx, sysloglevel, err, estatus)             \
546   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);       \
547   static void fn(const char *fmt, ...) {                        \
548     preterminate();                                             \
549     VA;                                                         \
550     logv(sysloglevel, pfx, err, fmt, al);                       \
551     exit(estatus);                                              \
552   }
553
554 #define logwrap(fn, pfx, sysloglevel, err)              \
555   static void fn(const char *fmt, ...) PRINTF(1,2);     \
556   static void fn(const char *fmt, ...) {                \
557     VA;                                                 \
558     logv(sysloglevel, pfx, err, fmt, al);               \
559     va_end(al);                                         \
560   }
561
562 diewrap(sysdie,   " critical", LOG_CRIT,    errno, 16);
563 diewrap(die,      " critical", LOG_CRIT,    -1,    16);
564
565 diewrap(sysfatal, " fatal",    LOG_ERR,     errno, 12);
566 diewrap(fatal,    " fatal",    LOG_ERR,     -1,    12);
567
568 logwrap(syswarn,  " warning",  LOG_WARNING, errno);
569 logwrap(warn,     " warning",  LOG_WARNING, -1);
570
571 logwrap(notice,   " notice",   LOG_NOTICE,  -1);
572 logwrap(info,     " info",     LOG_INFO,    -1);
573 logwrap(debug,    " debug",    LOG_DEBUG,   -1);
574
575
576 /*========== utility functions etc. ==========*/
577
578 static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
579 static char *xvasprintf(const char *fmt, va_list al) {
580   char *str;
581   int rc= vasprintf(&str,fmt,al);
582   if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
583   return str;
584 }
585 static char *xasprintf(const char *fmt, ...) PRINTF(1,2);
586 static char *xasprintf(const char *fmt, ...) {
587   VA;
588   char *str= xvasprintf(fmt,al);
589   va_end(al);
590   return str;
591 }
592
593 static int close_perhaps(int *fd) {
594   if (*fd <= 0) return 0;
595   int r= close(*fd);
596   *fd=0;
597   return r;
598 }
599 static void xclose(int fd, const char *what, const char *what2) {
600   int r= close(fd);
601   if (r) sysdie("close %s%s",what,what2?what2:"");
602 }
603 static void xclose_perhaps(int *fd, const char *what, const char *what2) {
604   if (*fd <= 0) return;
605   xclose(*fd,what,what2);
606   *fd=0;
607 }
608
609 static pid_t xfork(const char *what) {
610   pid_t child;
611
612   child= fork();
613   if (child==-1) sysfatal("cannot fork for %s",what);
614   debug("forked %s %ld", what, (unsigned long)child);
615   if (!child) postfork();
616   return child;
617 }
618
619 static void on_fd_read_except(int fd, oop_call_fd callback) {
620   loop->on_fd(loop, fd, OOP_READ,      callback, 0);
621   loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0);
622 }
623 static void cancel_fd_read_except(int fd) {
624   loop->cancel_fd(loop, fd, OOP_READ);
625   loop->cancel_fd(loop, fd, OOP_EXCEPTION);
626 }
627
628 static void report_child_status(const char *what, int status) {
629   if (WIFEXITED(status)) {
630     int es= WEXITSTATUS(status);
631     if (es)
632       warn("%s: child died with error exit status %d", what, es);
633   } else if (WIFSIGNALED(status)) {
634     int sig= WTERMSIG(status);
635     const char *sigstr= strsignal(sig);
636     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
637     if (sigstr)
638       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
639     else
640       warn("%s: child died due to unknown fatal signal %d%s",
641            what, sig, coredump);
642   } else {
643     warn("%s: child died with unknown wait status %d", what,status);
644   }
645 }
646
647 static int xwaitpid(pid_t *pid, const char *what) {
648   int status;
649
650   int r= kill(*pid, SIGKILL);
651   if (r) sysdie("cannot kill %s child", what);
652
653   pid_t got= waitpid(*pid, &status, 0);
654   if (got==-1) sysdie("cannot reap %s child", what);
655   if (got==0) die("cannot reap %s child", what);
656
657   *pid= 0;
658
659   return status;
660 }
661
662 static void xunlink(const char *path, const char *what) {
663   int r= unlink(path);
664   if (r) sysdie("can't unlink %s %s", path, what);
665 }
666
667 static time_t xtime(void) {
668   time_t now= time(0);
669   if (now==-1) sysdie("time(2) failed");
670   return now;
671 }
672
673 static void xsigaction(int s, const struct sigaction *sa) {
674   int r= sigaction(s,sa,0);
675   if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
676 }
677
678 static void xgettimeofday(struct timeval *tv_r) {
679   int r= gettimeofday(tv_r,0);
680   if (r) sysdie("gettimeofday(2) failed");
681 }
682
683 static void xsetnonblock(int fd, int nonblocking) {
684   int errnoval= oop_fd_nonblock(fd, nonblocking);
685   if (errnoval) { errno= errnoval; sysdie("setnonblocking"); }
686 }
687
688 static void check_isreg(const struct stat *stab, const char *path,
689                         const char *what) {
690   if (!S_ISREG(stab->st_mode))
691     die("%s %s not a plain file (mode 0%lo)",
692         what, path, (unsigned long)stab->st_mode);
693 }
694
695 static void xfstat(int fd, struct stat *stab_r, const char *what) {
696   int r= fstat(fd, stab_r);
697   if (r) sysdie("could not fstat %s", what);
698 }
699
700 static void xfstat_isreg(int fd, struct stat *stab_r,
701                          const char *path, const char *what) {
702   xfstat(fd, stab_r, what);
703   check_isreg(stab_r, path, what);
704 }
705
706 static void xlstat_isreg(const char *path, struct stat *stab,
707                          int *enoent_r /* 0 means ENOENT is fatal */,
708                          const char *what) {
709   int r= lstat(path, stab);
710   if (r) {
711     if (errno==ENOENT && enoent_r) { *enoent_r=1; return; }
712     sysdie("could not lstat %s %s", what, path);
713   }
714   if (enoent_r) *enoent_r= 0;
715   check_isreg(stab, path, what);
716 }
717
718 static int samefile(const struct stat *a, const struct stat *b) {
719   assert(S_ISREG(a->st_mode));
720   assert(S_ISREG(b->st_mode));
721   return (a->st_ino == b->st_ino &&
722           a->st_dev == b->st_dev);
723 }
724
725 static char *sanitise(const char *input, int len) {
726   static char sanibuf[100]; /* returns pointer to this buffer! */
727
728   const char *p= input;
729   const char *endp= len>=0 ? input+len : 0;
730   char *q= sanibuf;
731   *q++= '`';
732   for (;;) {
733     if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; }
734     int c= (!endp || p<endp) ? *p++ : 0;
735     if (!c) { *q++= '\''; *q=0; break; }
736     if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
737     sprintf(q,"\\x%02x",c);
738     q += 4;
739   }
740   return sanibuf;
741 }
742
743 static int isewouldblock(int errnoval) {
744   return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
745 }
746
747 /*========== command and control connections ==========*/
748
749 static int control_master;
750
751 typedef struct ControlConn ControlConn;
752 struct ControlConn {
753   void (*destroy)(ControlConn*);
754   int fd;
755   oop_read *rd;
756   FILE *out;
757   union {
758     struct sockaddr sa;
759     struct sockaddr_un un;
760   } sa;
761   socklen_t salen;
762 };
763
764 static const oop_rd_style control_rd_style= {
765   OOP_RD_DELIM_STRIP, '\n',
766   OOP_RD_NUL_FORBID,
767   OOP_RD_SHORTREC_FORBID
768 };
769
770 static void control_destroy(ControlConn *cc) {
771   cc->destroy(cc);
772 }
773
774 static void control_checkouterr(ControlConn *cc /* may destroy*/) {
775   if (ferror(cc->out) | fflush(cc->out)) {
776     info("CTRL%d write error %s", cc->fd, strerror(errno));
777     control_destroy(cc);
778   }
779 }
780
781 static void control_prompt(ControlConn *cc /* may destroy*/) {
782   fprintf(cc->out, "%s| ", sitename);
783   control_checkouterr(cc);
784 }
785
786 struct ControlCommand {
787   const char *cmd;
788   void (*f)(ControlConn *cc, const ControlCommand *ccmd,
789             const char *arg, size_t argsz);
790   void *xdata;
791   int xval;
792 };
793
794 static const ControlCommand control_commands[];
795
796 #define CCMD(wh)                                                        \
797   static void ccmd_##wh(ControlConn *cc, const ControlCommand *c,       \
798                         const char *arg, size_t argsz)
799
800 CCMD(help) {
801   fputs("commands:\n", cc->out);
802   const ControlCommand *ccmd;
803   for (ccmd=control_commands; ccmd->cmd; ccmd++)
804     fprintf(cc->out, " %s\n", ccmd->cmd);
805 }
806
807 CCMD(period) { period(); }
808 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
809 CCMD(setint) { *(int*)c->xdata= c->xval; }
810 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
811 CCMD(dump);
812
813 CCMD(stop) {
814   preterminate();
815   notice("terminating (CTRL%d)",cc->fd);
816   defraise(SIGTERM);
817   abort();
818 }
819
820 static const ControlCommand control_commands[]= {
821   { "h",             ccmd_help      },
822   { "p",             ccmd_period    },
823   { "stop",          ccmd_stop      },
824   { "dump q",        ccmd_dump, 0,0 },
825   { "dump a",        ccmd_dump, 0,1 },
826
827 #define POKES(cmd,func)                                                 \
828   { cmd "sm",        func,           &sm_period_counter,       1 },     \
829   { cmd "conn",      func,           &until_connect,           0 },     \
830   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
831 POKES("prod ", ccmd_setint_period)
832 POKES("next ", ccmd_setint)
833
834   { "pretend flush", ccmd_setintarg, &simulate_flush             },
835   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
836   { 0 }
837 };
838
839 static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
840                            const char *errmsg, int errnoval,
841                            const char *data, size_t recsz, void *cc_v) {
842   ControlConn *cc= cc_v;
843
844   if (!data) {
845     info("CTRL%d closed", cc->fd);
846     cc->destroy(cc);
847     return OOP_CONTINUE;
848   }
849
850   if (recsz == 0) goto prompt;
851
852   const ControlCommand *ccmd;
853   for (ccmd=control_commands; ccmd->cmd; ccmd++) {
854     int l= strlen(ccmd->cmd);
855     if (recsz < l) continue;
856     if (recsz > l && data[l] != ' ') continue;
857     if (memcmp(data, ccmd->cmd, l)) continue;
858
859     int argl= (int)recsz - (l+1); 
860     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
861     goto prompt;
862   }
863
864   fputs("unknown command; h for help\n", cc->out);
865
866  prompt:
867   control_prompt(cc);
868   return OOP_CONTINUE;
869 }
870
871 static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
872                             const char *errmsg, int errnoval,
873                             const char *data, size_t recsz, void *cc_v) {
874   ControlConn *cc= cc_v;
875   
876   info("CTRL%d read error %s", cc->fd, errmsg);
877   cc->destroy(cc);
878   return OOP_CONTINUE;
879 }
880
881 static int control_conn_startup(ControlConn *cc /* may destroy*/,
882                                 const char *how) {
883   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
884   if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; }
885
886   int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND,
887                       control_rd_ok, cc,
888                       control_rd_err, cc);
889   if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; }
890
891   info("CTRL%d %s ready", cc->fd, how);
892   control_prompt(cc);
893   return 0;
894 }
895
896 static void control_stdio_destroy(ControlConn *cc) {
897   if (cc->rd) {
898     oop_rd_cancel(cc->rd);
899     errno= oop_rd_delete_tidy(cc->rd);
900     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
901   }
902   free(cc);
903 }
904
905 static void control_stdio(void) {
906   ControlConn *cc= xmalloc(sizeof(*cc));
907   memset(cc,0,sizeof(*cc));
908   cc->destroy= control_stdio_destroy;
909
910   cc->fd= 0;
911   cc->out= stdout;
912   int r= control_conn_startup(cc,"stdio");
913   if (r) cc->destroy(cc);
914 }
915
916 static void control_accepted_destroy(ControlConn *cc) {
917   if (cc->rd) {
918     oop_rd_cancel(cc->rd);
919     oop_rd_delete_kill(cc->rd);
920   }
921   if (cc->out) { fclose(cc->out); cc->fd=0; }
922   close_perhaps(&cc->fd);
923   free(cc);
924 }
925
926 static void *control_master_readable(oop_source *lp, int master,
927                                      oop_event ev, void *u) {
928   ControlConn *cc= xmalloc(sizeof(*cc));
929   memset(cc,0,sizeof(*cc));
930   cc->destroy= control_accepted_destroy;
931
932   cc->salen= sizeof(cc->sa);
933   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
934   if (cc->fd<0) { syswarn("error accepting control connection"); goto x; }
935
936   cc->out= fdopen(cc->fd, "w");
937   if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; }
938
939   int r= control_conn_startup(cc, "accepted");
940   if (r) goto x;
941
942   return OOP_CONTINUE;
943
944  x:
945   cc->destroy(cc);
946   return OOP_CONTINUE;
947 }
948
949 #define NOCONTROL(...) do{                                              \
950     syswarn("no control socket, because failed to " __VA_ARGS__);       \
951     goto nocontrol;                                                     \
952   }while(0)
953
954 static void control_init(void) {
955   char *real=0;
956   
957   union {
958     struct sockaddr sa;
959     struct sockaddr_un un;
960   } sa;
961
962   memset(&sa,0,sizeof(sa));
963   int maxlen= sizeof(sa.un.sun_path);
964
965   int reallen= readlink(path_control, sa.un.sun_path, maxlen);
966   if (reallen<0) {
967     if (errno != ENOENT)
968       NOCONTROL("readlink control socket symlink path %s", path_control);
969   }
970   if (reallen >= maxlen) {
971     debug("control socket symlink path too long (r=%d)",reallen);
972     xunlink(path_control, "old (overlong) control socket symlink");
973     reallen= -1;
974   }
975   
976   if (reallen<0) {
977     struct stat stab;
978     int r= lstat(realsockdir,&stab);
979     if (r) {
980       if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir);
981
982       r= mkdir(realsockdir, 0700);
983       if (r) NOCONTROL("mkdir real socket dir %s", realsockdir);
984
985     } else {
986       uid_t self= geteuid();
987       if (!S_ISDIR(stab.st_mode) ||
988           stab.st_uid != self ||
989           stab.st_mode & 0007) {
990         warn("no control socket, because real socket directory"
991              " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
992              !!S_ISDIR(stab.st_mode),
993              (unsigned long)stab.st_uid, (unsigned long)self,
994              (unsigned long)stab.st_mode & 0777UL);
995         goto nocontrol;
996       }
997     }
998
999     real= xasprintf("%s/s%lx.%lx", realsockdir,
1000                     (unsigned long)xtime(), (unsigned long)self_pid);
1001     int reallen= strlen(real);
1002
1003     if (reallen >= maxlen) {
1004       warn("no control socket, because tmpnam gave overly-long path"
1005            " %s", real);
1006       goto nocontrol;
1007     }
1008     r= symlink(real, path_control);
1009     if (r) NOCONTROL("make control socket path %s a symlink to real"
1010                      " socket path %s", path_control, real);
1011     memcpy(sa.un.sun_path, real, reallen);
1012   }
1013
1014   int r= unlink(sa.un.sun_path);
1015   if (r && errno!=ENOENT)
1016     NOCONTROL("remove old real socket %s", sa.un.sun_path);
1017
1018   control_master= socket(PF_UNIX, SOCK_STREAM, 0);
1019   if (control_master<0) NOCONTROL("create new control socket");
1020
1021   sa.un.sun_family= AF_UNIX;
1022   int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
1023   r= bind(control_master, &sa.sa, sl);
1024   if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path);
1025
1026   r= listen(control_master, 5);
1027   if (r) NOCONTROL("listen");
1028
1029   xsetnonblock(control_master, 1);
1030
1031   loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0);
1032   info("control socket ok, real path %s", sa.un.sun_path);
1033
1034   return;
1035
1036  nocontrol:
1037   free(real);
1038   xclose_perhaps(&control_master, "control master",0);
1039   return;
1040 }
1041
1042 /*========== management of connections ==========*/
1043
1044 static void conn_closefd(Conn *conn, const char *msgprefix) {
1045   int r= close_perhaps(&conn->fd);
1046   if (r) info("C%d %serror closing socket: %s",
1047               conn->fd, msgprefix, strerror(errno));
1048 }
1049
1050 static void conn_dispose(Conn *conn) {
1051   if (!conn) return;
1052   if (conn->rd) {
1053     oop_rd_cancel(conn->rd);
1054     oop_rd_delete_kill(conn->rd);
1055     conn->rd= 0;
1056   }
1057   if (conn->fd) {
1058     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1059     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
1060   }
1061   conn_closefd(conn,"");
1062   free(conn);
1063   until_connect= reconnect_delay_periods;
1064 }
1065
1066 static void *conn_exception(oop_source *lp, int fd,
1067                             oop_event ev, void *conn_v) {
1068   Conn *conn= conn_v;
1069   unsigned char ch;
1070   assert(fd == conn->fd);
1071   assert(ev == OOP_EXCEPTION);
1072   int r= read(conn->fd, &ch, 1);
1073   if (r<0) connfail(conn,"read failed: %s",strerror(errno));
1074   else connfail(conn,"exceptional condition on socket (peer sent urgent"
1075                 " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
1076   return OOP_CONTINUE;
1077 }  
1078
1079 static void vconnfail(Conn *conn, const char *fmt, va_list al) {
1080   int requeue[art_MaxState];
1081   memset(requeue,0,sizeof(requeue));
1082
1083   Article *art;
1084   while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
1085   while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art);
1086   while ((art= LIST_REMHEAD(conn->sent))) {
1087     requeue[art->state]++;
1088     if (art->state==art_Unsolicited) art->state= art_Unchecked;
1089     LIST_ADDTAIL(queue,art);
1090   }
1091
1092   int i;
1093   XmitDetails *d;
1094   for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
1095     xmit_free(d);
1096
1097   char *m= xvasprintf(fmt,al);
1098   warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
1099        conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
1100   free(m);
1101
1102   LIST_REMOVE(conns,conn);
1103   conn_dispose(conn);
1104   check_assign_articles();
1105 }
1106
1107 static void connfail(Conn *conn, const char *fmt, ...) {
1108   va_list al;
1109   va_start(al,fmt);
1110   vconnfail(conn,fmt,al);
1111   va_end(al);
1112 }
1113
1114 static void check_idle_conns(void) {
1115   Conn *conn;
1116   for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
1117     conn->since_activity++;
1118  search_again:
1119   for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) {
1120     if (conn->since_activity <= need_activity_periods) continue;
1121
1122     /* We need to shut this down */
1123     if (conn->quitting)
1124       connfail(conn,"timed out waiting for response to QUIT");
1125     else if (conn->sent.count)
1126       connfail(conn,"timed out waiting for responses");
1127     else if (conn->waiting.count || conn->priority.count)
1128       connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
1129     else if (conn->xmitu)
1130       connfail(conn,"peer has been sending responses"
1131                " before receiving our commands!");
1132     else {
1133       static const char quitcmd[]= "QUIT\r\n";
1134       int todo= sizeof(quitcmd)-1;
1135       const char *p= quitcmd;
1136       for (;;) {
1137         int r= write(conn->fd, p, todo);
1138         if (r<0) {
1139           if (isewouldblock(errno))
1140             connfail(conn, "blocked writing QUIT to idle connection");
1141           else
1142             connfail(conn, "failed to write QUIT to idle connection: %s",
1143                      strerror(errno));
1144           break;
1145         }
1146         assert(r<=todo);
1147         todo -= r;
1148         if (!todo) {
1149           conn->quitting= 1;
1150           conn->since_activity= 0;
1151           debug("C%d is idle, quitting", conn->fd);
1152           break;
1153         }
1154       }
1155     }
1156     goto search_again;
1157   }
1158 }  
1159
1160 /*---------- making new connections ----------*/
1161
1162 static pid_t connecting_child;
1163 static int connecting_fdpass_sock;
1164
1165 static void connect_attempt_discard(void) {
1166   if (connecting_child) {
1167     int status= xwaitpid(&connecting_child, "connect");
1168     if (!(WIFEXITED(status) ||
1169           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
1170       report_child_status("connect", status);
1171   }
1172   if (connecting_fdpass_sock) {
1173     cancel_fd_read_except(connecting_fdpass_sock);
1174     xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
1175   }
1176 }
1177
1178 #define PREP_DECL_MSG_CMSG(msg)                 \
1179   char msgbyte= 0;                              \
1180   struct iovec msgiov;                          \
1181   msgiov.iov_base= &msgbyte;                    \
1182   msgiov.iov_len= 1;                            \
1183   struct msghdr msg;                            \
1184   memset(&msg,0,sizeof(msg));                   \
1185   char msg##cbuf[CMSG_SPACE(sizeof(int))];      \
1186   msg.msg_iov= &msgiov;                         \
1187   msg.msg_iovlen= 1;                            \
1188   msg.msg_control= msg##cbuf;                   \
1189   msg.msg_controllen= sizeof(msg##cbuf);
1190
1191 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
1192   Conn *conn= 0;
1193
1194   assert(fd == connecting_fdpass_sock);
1195
1196   PREP_DECL_MSG_CMSG(msg);
1197   
1198   ssize_t rs= recvmsg(fd, &msg, 0);
1199   if (rs<0) {
1200     if (isewouldblock(errno)) return OOP_CONTINUE;
1201     syswarn("failed to read socket from connecting child");
1202     goto x;
1203   }
1204
1205   conn= xmalloc(sizeof(*conn));
1206   memset(conn,0,sizeof(*conn));
1207   LIST_INIT(conn->waiting);
1208   LIST_INIT(conn->priority);
1209   LIST_INIT(conn->sent);
1210
1211   struct cmsghdr *h= 0;
1212   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
1213   if (!h) {
1214     int status= xwaitpid(&connecting_child, "connect child (broken)");
1215
1216     if (WIFEXITED(status)) {
1217       if (WEXITSTATUS(status) != 0 &&
1218           WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
1219           WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
1220         /* child already reported the problem */;
1221       else {
1222         if (e == OOP_EXCEPTION)
1223           warn("connect: connection child exited code %d but"
1224                " unexpected exception on fdpass socket",
1225                WEXITSTATUS(status));
1226         else
1227           warn("connect: connection child exited code %d but"
1228                " no cmsg (rs=%d)",
1229                WEXITSTATUS(status), (int)rs);
1230       }
1231     } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
1232       warn("connect: connection attempt timed out");
1233     } else {
1234       report_child_status("connect", status);
1235     }
1236     goto x;
1237   }
1238
1239 #define CHK(field, val)                                                  \
1240   if (h->cmsg_##field != val) {                                          \
1241     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
1242         h->cmsg_##field, val);                                           \
1243     goto x;                                                              \
1244   }
1245   CHK(level, SOL_SOCKET);
1246   CHK(type,  SCM_RIGHTS);
1247   CHK(len,   CMSG_LEN(sizeof(conn->fd)));
1248 #undef CHK
1249
1250   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
1251
1252   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
1253
1254   int status;
1255   pid_t got= waitpid(connecting_child, &status, 0);
1256   if (got==-1) sysdie("connect: real wait for child");
1257   assert(got == connecting_child);
1258   connecting_child= 0;
1259
1260   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
1261   int es= WEXITSTATUS(status);
1262   switch (es) {
1263   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
1264   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
1265   default:
1266     fatal("connect: child gave unexpected exit status %d", es);
1267   }
1268
1269   /* Phew! */
1270   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
1271
1272   loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
1273   conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
1274   if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
1275   int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
1276                      &peer_rd_ok, conn,
1277                      &peer_rd_err, conn);
1278   if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
1279
1280   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
1281   LIST_ADDHEAD(conns, conn);
1282
1283   connect_attempt_discard();
1284   check_assign_articles();
1285   return OOP_CONTINUE;
1286
1287  x:
1288   conn_dispose(conn);
1289   connect_attempt_discard();
1290   return OOP_CONTINUE;
1291 }
1292
1293 static int allow_connect_start(void) {
1294   return conns.count < max_connections
1295     && !connecting_child
1296     && !until_connect;
1297 }
1298
1299 static void connect_start(void) {
1300   assert(!connecting_child);
1301   assert(!connecting_fdpass_sock);
1302
1303   info("starting connection attempt");
1304
1305   int socks[2];
1306   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
1307   if (r) { syswarn("connect: cannot create socketpair for child"); return; }
1308
1309   connecting_child= xfork("connection");
1310
1311   if (!connecting_child) {
1312     FILE *cn_from, *cn_to;
1313     char buf[NNTP_STRLEN+100];
1314     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
1315
1316     xclose(socks[0], "(in child) parent's connection fdpass socket",0);
1317
1318     alarm(connection_setup_timeout);
1319     if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
1320       int l= strlen(buf);
1321       int stripped=0;
1322       while (l>0) {
1323         unsigned char c= buf[l-1];
1324         if (!isspace(c)) break;
1325         if (c=='\n' || c=='\r') stripped=1;
1326         --l;
1327       }
1328       if (!buf[0]) {
1329         sysfatal("connect: connection attempt failed");
1330       } else {
1331         buf[l]= 0;
1332         fatal("connect: %s: %s", stripped ? "rejected" : "failed",
1333               sanitise(buf,-1));
1334       }
1335     }
1336     if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
1337       sysfatal("connect: authentication failed");
1338     if (try_stream) {
1339       if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
1340           fflush(cn_to))
1341         sysfatal("connect: could not send MODE STREAM");
1342       buf[sizeof(buf)-1]= 0;
1343       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
1344         if (ferror(cn_from))
1345           sysfatal("connect: could not read response to MODE STREAM");
1346         else
1347           fatal("connect: connection close in response to MODE STREAM");
1348       }
1349       int l= strlen(buf);
1350       assert(l>=1);
1351       if (buf[l-1]!='\n')
1352         fatal("connect: response to MODE STREAM is too long: %.100s...",
1353               sanitise(buf,-1));
1354       l--;  if (l>0 && buf[l-1]=='\r') l--;
1355       buf[l]= 0;
1356       char *ep;
1357       int rcode= strtoul(buf,&ep,10);
1358       if (ep != &buf[3])
1359         fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
1360
1361       switch (rcode) {
1362       case 203:
1363         exitstatus= CONNCHILD_ESTATUS_STREAM;
1364         break;
1365       case 480:
1366       case 500:
1367         break;
1368       default:
1369         warn("connect: unexpected response to MODE STREAM: %.50s",
1370              sanitise(buf,-1));
1371         exitstatus= 2;
1372         break;
1373       }
1374     }
1375     int fd= fileno(cn_from);
1376
1377     PREP_DECL_MSG_CMSG(msg);
1378     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
1379     cmsg->cmsg_level= SOL_SOCKET;
1380     cmsg->cmsg_type=  SCM_RIGHTS;
1381     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
1382     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
1383
1384     msg.msg_controllen= cmsg->cmsg_len;
1385     r= sendmsg(socks[1], &msg, 0);
1386     if (r<0) sysdie("sendmsg failed for new connection");
1387     if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
1388
1389     _exit(exitstatus);
1390   }
1391
1392   xclose(socks[1], "connecting fdpass child's socket",0);
1393   connecting_fdpass_sock= socks[0];
1394   xsetnonblock(connecting_fdpass_sock, 1);
1395   on_fd_read_except(connecting_fdpass_sock, connchild_event);
1396 }
1397
1398 /*---------- assigning articles to conns, and transmitting ----------*/
1399
1400 static void check_assign_articles(void) {
1401   for (;;) {
1402     if (!queue.count)
1403       break;
1404
1405     Conn *walk, *use=0;
1406     int spare=0, inqueue=0;
1407
1408     /* Find a connection to offer this article.  We prefer a busy
1409      * connection to an idle one, provided it's not full.  We take the
1410      * first (oldest) and since that's stable, it will mean we fill up
1411      * connections in order.  That way if we have too many
1412      * connections, the spare ones will go away eventually.
1413      */
1414     for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
1415       if (walk->quitting) continue;
1416       inqueue= walk->sent.count + walk->priority.count
1417              + walk->waiting.count;
1418       spare= walk->max_queue - inqueue;
1419       assert(inqueue <= max_queue_per_conn);
1420       assert(spare >= 0);
1421       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
1422       else if (spare>0) /*working*/ { use= walk; break; }
1423     }
1424     if (use) {
1425       if (!inqueue) use->since_activity= 0; /* reset idle counter */
1426       while (spare>0) {
1427         Article *art= LIST_REMHEAD(queue);
1428         if (!art) break;
1429         LIST_ADDTAIL(use->waiting, art);
1430         spare--;
1431       }
1432       conn_maybe_write(use);
1433     } else if (allow_connect_start()) {
1434       until_connect= reconnect_delay_periods;
1435       connect_start();
1436       break;
1437     } else {
1438       break;
1439     }
1440   }
1441 }
1442
1443 static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
1444   conn_maybe_write(u);
1445   return OOP_CONTINUE;
1446 }
1447
1448 static void conn_maybe_write(Conn *conn)  {
1449   for (;;) {
1450     conn_make_some_xmits(conn);
1451     if (!conn->xmitu) {
1452       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
1453       return;
1454     }
1455
1456     void *rp= conn_write_some_xmits(conn);
1457     if (rp==OOP_CONTINUE) {
1458       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
1459       return;
1460     } else if (rp==OOP_HALT) {
1461       return;
1462     } else if (!rp) {
1463       /* transmitted everything */
1464     } else {
1465       abort();
1466     }
1467   }
1468 }
1469
1470 /*========== article transmission ==========*/
1471
1472 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
1473                   XmitKind kind) { /* caller must then fill in details */
1474   struct iovec *v= &conn->xmit[conn->xmitu];
1475   XmitDetails *d= &conn->xmitd[conn->xmitu++];
1476   v->iov_base= (char*)data;
1477   v->iov_len= len;
1478   d->kind= kind;
1479   return d;
1480 }
1481
1482 static void xmit_noalloc(Conn *conn, const char *data, int len) {
1483   xmit_core(conn,data,len, xk_Const);
1484 }
1485 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
1486
1487 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
1488   XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
1489   d->info.sm_art= ah;
1490 }
1491
1492 static void xmit_free(XmitDetails *d) {
1493   switch (d->kind) {
1494   case xk_Malloc:  free(d->info.malloc_tofree);   break;
1495   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
1496   case xk_Const:                                  break;
1497   default: abort();
1498   }
1499 }
1500
1501 static void *conn_write_some_xmits(Conn *conn) {
1502   /* return values:
1503    *      0:            nothing more to write, no need to call us again
1504    *      OOP_CONTINUE: more to write but fd not writeable
1505    *      OOP_HALT:     disaster, have destroyed conn
1506    */
1507   for (;;) {
1508     int count= conn->xmitu;
1509     if (!count) return 0;
1510
1511     if (count > IOV_MAX) count= IOV_MAX;
1512     ssize_t rs= writev(conn->fd, conn->xmit, count);
1513     if (rs < 0) {
1514       if (isewouldblock(errno)) return OOP_CONTINUE;
1515       connfail(conn, "write failed: %s", strerror(errno));
1516       return OOP_HALT;
1517     }
1518     assert(rs > 0);
1519
1520     int done;
1521     for (done=0; rs && done<conn->xmitu; done++) {
1522       struct iovec *vp= &conn->xmit[done];
1523       XmitDetails *dp= &conn->xmitd[done];
1524       if (rs > vp->iov_len) {
1525         rs -= vp->iov_len;
1526         xmit_free(dp);
1527       } else {
1528         vp->iov_base= (char*)vp->iov_base + rs;
1529         vp->iov_len -= rs;
1530       }
1531     }
1532     int newu= conn->xmitu - done;
1533     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
1534     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
1535     conn->xmitu= newu;
1536   }
1537 }
1538
1539 static void conn_make_some_xmits(Conn *conn) {
1540   for (;;) {
1541     if (conn->xmitu+5 > CONNIOVS)
1542       break;
1543
1544     Article *art= LIST_REMHEAD(conn->priority);
1545     if (!art) art= LIST_REMHEAD(conn->waiting);
1546     if (!art) break;
1547
1548     if (art->state >= art_Wanted || (conn->stream && nocheck)) {
1549       /* actually send it */
1550
1551       ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
1552
1553       art->state=
1554         art->state == art_Unchecked ? art_Unsolicited :
1555         art->state == art_Wanted    ? art_Wanted      :
1556         (abort(),-1);
1557
1558       if (!artdata) art->missing= 1;
1559       art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
1560
1561       if (conn->stream) {
1562         if (artdata) {
1563           XMIT_LITERAL("TAKETHIS ");
1564           xmit_noalloc(conn, art->messageid, art->midlen);
1565           XMIT_LITERAL("\r\n");
1566           xmit_artbody(conn, artdata);
1567         } else {
1568           article_done(conn, art, -1);
1569           continue;
1570         }
1571       } else {
1572         /* we got 235 from IHAVE */
1573         if (artdata) {
1574           xmit_artbody(conn, artdata);
1575         } else {
1576           XMIT_LITERAL(".\r\n");
1577         }
1578       }
1579
1580       LIST_ADDTAIL(conn->sent, art);
1581
1582     } else {
1583       /* check it */
1584
1585       if (conn->stream)
1586         XMIT_LITERAL("CHECK ");
1587       else
1588         XMIT_LITERAL("IHAVE ");
1589       xmit_noalloc(conn, art->messageid, art->midlen);
1590       XMIT_LITERAL("\r\n");
1591
1592       assert(art->state == art_Unchecked);
1593       art->ipf->counts[art->state][RC_sent]++;
1594       LIST_ADDTAIL(conn->sent, art);
1595     }
1596   }
1597 }
1598
1599
1600 /*========== handling responses from peer ==========*/
1601
1602 static const oop_rd_style peer_rd_style= {
1603   OOP_RD_DELIM_STRIP, '\n',
1604   OOP_RD_NUL_FORBID,
1605   OOP_RD_SHORTREC_FORBID
1606 };
1607
1608 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
1609                          const char *errmsg, int errnoval,
1610                          const char *data, size_t recsz, void *conn_v) {
1611   Conn *conn= conn_v;
1612   connfail(conn, "error receiving from peer: %s", errmsg);
1613   return OOP_CONTINUE;
1614 }
1615
1616 static Article *article_reply_check(Conn *conn, const char *response,
1617                                     int code_indicates_streaming,
1618                                     int must_have_sent
1619                                         /* 1:yes, -1:no, 0:dontcare */,
1620                                     const char *sanitised_response) {
1621   Article *art= LIST_HEAD(conn->sent);
1622
1623   if (!art) {
1624     connfail(conn,
1625              "peer gave unexpected response when no commands outstanding: %s",
1626              sanitised_response);
1627     return 0;
1628   }
1629
1630   if (code_indicates_streaming) {
1631     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
1632     if (!conn->stream) {
1633       connfail(conn, "peer gave streaming response code "
1634                " to IHAVE or subsequent body: %s", sanitised_response);
1635       return 0;
1636     }
1637     const char *got_mid= response+4;
1638     int got_midlen= strcspn(got_mid, " \n\r");
1639     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
1640       connfail(conn, "peer gave streaming response with syntactically invalid"
1641                " messageid: %s", sanitised_response);
1642       return 0;
1643     }
1644     if (got_midlen != art->midlen ||
1645         memcmp(got_mid, art->messageid, got_midlen)) {
1646       connfail(conn, "peer gave streaming response code to wrong article -"
1647                " probable synchronisation problem; we offered: %s;"
1648                " peer said: %s",
1649                art->messageid, sanitised_response);
1650       return 0;
1651     }
1652   } else {
1653     if (conn->stream) {
1654       connfail(conn, "peer gave non-streaming response code to"
1655                " CHECK/TAKETHIS: %s", sanitised_response);
1656       return 0;
1657     }
1658   }
1659
1660   if (must_have_sent>0 && art->state < art_Wanted) {
1661     connfail(conn, "peer says article accepted but"
1662              " we had not sent the body: %s", sanitised_response);
1663     return 0;
1664   }
1665   if (must_have_sent<0 && art->state >= art_Wanted) {
1666     connfail(conn, "peer says please sent the article but we just did: %s",
1667              sanitised_response);
1668     return 0;
1669   }
1670
1671   Article *art_again= LIST_REMHEAD(conn->sent);
1672   assert(art_again == art);
1673   return art;
1674 }
1675
1676 static void update_nocheck(int accepted) {
1677   accept_proportion *= nocheck_decay;
1678   accept_proportion += accepted * (1.0 - nocheck_decay);
1679   int new_nocheck= accept_proportion >= nocheck_thresh;
1680   if (new_nocheck && !nocheck_reported) {
1681     notice("entering nocheck mode for the first time");
1682     nocheck_reported= 1;
1683   } else if (new_nocheck != nocheck) {
1684     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
1685   }
1686   nocheck= new_nocheck;
1687 }
1688
1689 static void article_done(Conn *conn, Article *art, int whichcount) {
1690   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
1691
1692   if (whichcount == RC_accepted) update_nocheck(1);
1693   else if (whichcount == RC_unwanted) update_nocheck(0);
1694
1695   InputFile *ipf= art->ipf;
1696
1697   while (art->blanklen) {
1698     static const char spaces[]=
1699       "                                                                "
1700       "                                                                "
1701       "                                                                "
1702       "                                                                "
1703       "                                                                "
1704       "                                                                "
1705       "                                                                "
1706       "                                                                "
1707       "                                                                ";
1708     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
1709     int r= pwrite(ipf->fd, spaces, w, art->offset);
1710     if (r==-1) {
1711       if (errno==EINTR) continue;
1712       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
1713              art->messageid, art->blanklen,
1714              (unsigned long)art->offset, ipf->path);
1715     }
1716     assert(r>=0 && r<=w);
1717     art->blanklen -= w;
1718     art->offset += w;
1719   }
1720
1721   ipf->inprogress--;
1722   assert(ipf->inprogress >= 0);
1723   free(art);
1724
1725   if (!ipf->inprogress && ipf != main_input_file)
1726     queue_check_input_done();
1727 }
1728
1729 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
1730                         const char *errmsg, int errnoval,
1731                         const char *data, size_t recsz, void *conn_v) {
1732   Conn *conn= conn_v;
1733
1734   if (ev == OOP_RD_EOF) {
1735     connfail(conn, "unexpected EOF from peer");
1736     return OOP_CONTINUE;
1737   }
1738   assert(ev == OOP_RD_OK);
1739
1740   char *sani= sanitise(data,-1);
1741
1742   char *ep;
1743   unsigned long code= strtoul(data, &ep, 10);
1744   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
1745     connfail(conn, "badly formatted response from peer: %s", sani);
1746     return OOP_CONTINUE;
1747   }
1748
1749   int conn_busy=
1750     conn->waiting.count ||
1751     conn->priority.count ||
1752     conn->sent.count ||
1753     conn->xmitu;
1754
1755   if (conn->quitting) {
1756     if (code!=205 && code!=503) {
1757       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
1758     } else {
1759       notice("C%d idle connection closed by us", conn->fd);
1760       assert(!conn_busy);
1761       LIST_REMOVE(conns,conn);
1762       conn_dispose(conn);
1763     }
1764     return OOP_CONTINUE;
1765   }
1766
1767   conn->since_activity= 0;
1768   Article *art;
1769
1770 #define GET_ARTICLE(musthavesent) do{                                         \
1771     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
1772     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
1773   }while(0) 
1774
1775 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
1776     code_streaming= (streaming);                                \
1777     GET_ARTICLE(musthavesent);                                  \
1778     article_done(conn, art, RC_##how);                          \
1779     goto dealtwith;                                             \
1780   }while(0)
1781
1782 #define PEERBADMSG(m) do {                                      \
1783     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
1784   }while(0)
1785
1786   int code_streaming= 0;
1787
1788   switch (code) {
1789
1790   case 400: PEERBADMSG("peer stopped accepting articles");
1791   default:  PEERBADMSG("peer sent unexpected message");
1792
1793   case 503:
1794     if (conn_busy) PEERBADMSG("peer timed us out");
1795     notice("C%d idle connection closed by peer", conn->fd);
1796     LIST_REMOVE(conns,conn);
1797     conn_dispose(conn);
1798     return OOP_CONTINUE;
1799
1800   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
1801   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
1802
1803   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
1804   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
1805
1806   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
1807   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
1808
1809   case 238: /* CHECK says send it */
1810     code_streaming= 1;
1811   case 335: /* IHAVE says send it */
1812     GET_ARTICLE(-1);
1813     assert(art->state == art_Unchecked);
1814     art->ipf->counts[art->state][RC_accepted]++;
1815     art->state= art_Wanted;
1816     LIST_ADDTAIL(conn->priority, art);
1817     break;
1818
1819   case 431: /* CHECK or TAKETHIS says try later */
1820     code_streaming= 1;
1821   case 436: /* IHAVE says try later */
1822     GET_ARTICLE(0);
1823     open_defer();
1824     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
1825         || fflush(defer))
1826       sysfatal("write to defer file %s",path_defer);
1827     article_done(conn, art, RC_deferred);
1828     break;
1829
1830   }
1831 dealtwith:
1832
1833   conn_maybe_write(conn);
1834   check_assign_articles();
1835   return OOP_CONTINUE;
1836 }
1837
1838
1839 /*========== monitoring of input files ==========*/
1840
1841 static void feedfile_eof(InputFile *ipf) {
1842   assert(ipf != main_input_file); /* promised by tailing_try_read */
1843   inputfile_reading_stop(ipf);
1844
1845   if (ipf == flushing_input_file) {
1846     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
1847     if (main_input_file) inputfile_reading_start(main_input_file);
1848     statemc_check_flushing_done();
1849   } else if (ipf == backlog_input_file) {
1850     statemc_check_backlog_done();
1851   } else {
1852     abort(); /* supposed to wait rather than get EOF on main input file */
1853   }
1854 }
1855
1856 static InputFile *open_input_file(const char *path) {
1857   int fd= open(path, O_RDWR);
1858   if (fd<0) {
1859     if (errno==ENOENT) return 0;
1860     sysfatal("unable to open input file %s", path);
1861   }
1862   assert(fd>0);
1863
1864   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
1865   memset(ipf,0,sizeof(*ipf));
1866
1867   ipf->fd= fd;
1868   strcpy(ipf->path, path);
1869
1870   return ipf;
1871 }
1872
1873 static void close_input_file(InputFile *ipf) { /* does not free */
1874   assert(!ipf->readable_callback); /* must have had ->on_cancel */
1875   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
1876   assert(!ipf->rd); /* must have had inputfile_reading_stop */
1877   assert(!ipf->inprogress); /* no dangling pointers pointing here */
1878   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
1879 }
1880
1881
1882 /*---------- dealing with articles read in the input file ----------*/
1883
1884 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
1885                                    const char *data, const char *how) {
1886   warn("corrupted file: %s, offset %lu: %s: in %s",
1887        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
1888   ipf->readcount_err++;
1889   if (ipf->readcount_err > max_bad_data_initial +
1890       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
1891     die("too much garbage in input file!  (%d errs, %d ok, %d blank)",
1892         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
1893   return OOP_CONTINUE;
1894 }
1895
1896 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
1897                                oop_rd_event ev, const char *errmsg,
1898                                int errnoval, const char *data, size_t recsz,
1899                                void *ipf_v) {
1900   InputFile *ipf= ipf_v;
1901   assert(ev == OOP_RD_SYSTEM);
1902   errno= errnoval;
1903   sysdie("error reading input file: %s, offset %lu",
1904          ipf->path, (unsigned long)ipf->offset);
1905 }
1906
1907 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
1908                                   oop_rd_event ev, const char *errmsg,
1909                                   int errnoval, const char *data, size_t recsz,
1910                                   void *ipf_v) {
1911   InputFile *ipf= ipf_v;
1912   Article *art;
1913   char tokentextbuf[sizeof(TOKEN)*2+3];
1914
1915   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
1916
1917   off_t old_offset= ipf->offset;
1918   ipf->offset += recsz + 1;
1919
1920 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
1921
1922   if (ev==OOP_RD_PARTREC)
1923     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
1924     /* but process it anyway */
1925
1926   if (ipf->skippinglong) {
1927     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
1928     return OOP_CONTINUE;
1929   }
1930   if (ev==OOP_RD_LONG) {
1931     ipf->skippinglong= 1;
1932     X_BAD_DATA("overly long line");
1933   }
1934
1935   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
1936   if (!recsz) X_BAD_DATA("empty line");
1937
1938   if (data[0]==' ') {
1939     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
1940     ipf->readcount_blank++;
1941     return OOP_CONTINUE;
1942   }
1943
1944   char *space= strchr(data,' ');
1945   int tokenlen= space-data;
1946   int midlen= (int)recsz-tokenlen-1;
1947   if (midlen <= 2) X_BAD_DATA("no room for messageid");
1948   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
1949
1950   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
1951   memcpy(tokentextbuf, data, tokenlen);
1952   tokentextbuf[tokenlen]= 0;
1953   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
1954
1955   ipf->readcount_ok++;
1956
1957   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
1958   memset(art,0,sizeof(art));
1959   art->state= art_Unchecked;
1960   art->midlen= midlen;
1961   art->ipf= ipf;  ipf->inprogress++;
1962   art->token= TextToToken(tokentextbuf);
1963   art->offset= old_offset;
1964   art->blanklen= recsz;
1965   strcpy(art->messageid, space+1);
1966   LIST_ADDTAIL(queue, art);
1967
1968   if (sms==sm_NORMAL && ipf==main_input_file &&
1969       ipf->offset >= target_max_feedfile_size)
1970     statemc_start_flush("feed file size");
1971
1972   check_assign_articles();
1973   return OOP_CONTINUE;
1974 }
1975
1976 /*========== tailing input file ==========*/
1977
1978 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
1979                                      void *user) {
1980   InputFile *ipf= user;
1981   return ipf->readable_callback(loop, &ipf->readable,
1982                                 ipf->readable_callback_user);
1983 }
1984
1985 static void tailing_on_cancel(struct oop_readable *rable) {
1986   InputFile *ipf= (void*)rable;
1987
1988   if (ipf->filemon) filemon_stop(ipf);
1989   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1990   ipf->readable_callback= 0;
1991 }
1992
1993 static void tailing_queue_readable(InputFile *ipf) {
1994   /* lifetime of ipf here is OK because destruction will cause
1995    * on_cancel which will cancel this callback */
1996   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1997 }
1998
1999 static int tailing_on_readable(struct oop_readable *rable,
2000                                 oop_readable_call *cb, void *user) {
2001   InputFile *ipf= (void*)rable;
2002
2003   tailing_on_cancel(rable);
2004   ipf->readable_callback= cb;
2005   ipf->readable_callback_user= user;
2006   filemon_start(ipf);
2007
2008   tailing_queue_readable(ipf);
2009   return 0;
2010 }
2011
2012 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
2013                                 size_t length) {
2014   InputFile *ipf= (void*)rable;
2015   for (;;) {
2016     ssize_t r= read(ipf->fd, buffer, length);
2017     if (r==-1) {
2018       if (errno==EINTR) continue;
2019       return r;
2020     }
2021     if (!r) {
2022       if (ipf==main_input_file) {
2023         errno=EAGAIN;
2024         return -1;
2025       } else if (ipf==flushing_input_file) {
2026         assert(ipf->rd);
2027         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2028       } else if (ipf==backlog_input_file) {
2029         assert(ipf->rd);
2030       } else {
2031         abort();
2032       }
2033     }
2034     tailing_queue_readable(ipf);
2035     return r;
2036   }
2037 }
2038
2039 /*---------- filemon implemented with inotify ----------*/
2040
2041 #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
2042 #define HAVE_FILEMON
2043
2044 #include <sys/inotify.h>
2045
2046 static int filemon_inotify_fd;
2047 static int filemon_inotify_wdmax;
2048 static InputFile **filemon_inotify_wd2ipf;
2049
2050 struct Filemon_Perfile {
2051   int wd;
2052 };
2053
2054 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
2055   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
2056   if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path);
2057
2058   if (wd >= filemon_inotify_wdmax) {
2059     int newmax= wd+2;
2060     filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
2061                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
2062     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
2063            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
2064     filemon_inotify_wdmax= newmax;
2065   }
2066
2067   assert(!filemon_inotify_wd2ipf[wd]);
2068   filemon_inotify_wd2ipf[wd]= ipf;
2069
2070   debug("filemon inotify startfile %p wd=%d wdmax=%d",
2071         ipf, wd, filemon_inotify_wdmax);
2072
2073   pf->wd= wd;
2074 }
2075
2076 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
2077   int wd= pf->wd;
2078   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
2079   int r= inotify_rm_watch(filemon_inotify_fd, wd);
2080   if (r) sysdie("inotify_rm_watch");
2081   filemon_inotify_wd2ipf[wd]= 0;
2082 }
2083
2084 static void *filemon_inotify_readable(oop_source *lp, int fd,
2085                                       oop_event e, void *u) {
2086   struct inotify_event iev;
2087   for (;;) {
2088     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
2089     if (r==-1) {
2090       if (isewouldblock(errno)) break;
2091       sysdie("read from inotify master");
2092     } else if (r==sizeof(iev)) {
2093       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
2094     } else {
2095       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
2096     }
2097     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
2098     debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
2099     filemon_callback(ipf);
2100   }
2101   return OOP_CONTINUE;
2102 }
2103
2104 static int filemon_method_init(void) {
2105   filemon_inotify_fd= inotify_init();
2106   if (filemon_inotify_fd<0) {
2107     syswarn("filemon/inotify: inotify_init failed");
2108     return 0;
2109   }
2110   xsetnonblock(filemon_inotify_fd, 1);
2111   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
2112
2113   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
2114   return 1;
2115 }
2116
2117 static void filemon_method_dump_info(FILE *f) {
2118   int i;
2119   fprintf(f,"inotify");
2120   DUMPV("%d",,filemon_inotify_fd);
2121   DUMPV("%d",,filemon_inotify_wdmax);
2122   for (i=0; i<filemon_inotify_wdmax; i++)
2123     fprintf(f," wd2ipf[%d]=%p\n", i, filemon_inotify_wd2ipf[i],);
2124 }
2125
2126 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
2127
2128 /*---------- filemon dummy implementation ----------*/
2129
2130 #if !defined(HAVE_FILEMON)
2131
2132 struct Filemon_Perfile { int dummy; };
2133
2134 static int filemon_method_init(void) {
2135   warn("filemon/dummy: no filemon method compiled in");
2136   return 0;
2137 }
2138 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
2139 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
2140 static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
2141
2142 #endif /* !HAVE_FILEMON */
2143
2144 /*---------- filemon generic interface ----------*/
2145
2146 static void filemon_start(InputFile *ipf) {
2147   assert(!ipf->filemon);
2148
2149   ipf->filemon= xmalloc(sizeof(*ipf->filemon));
2150   memset(ipf->filemon, 0, sizeof(*ipf->filemon));
2151   filemon_method_startfile(ipf, ipf->filemon);
2152 }
2153
2154 static void filemon_stop(InputFile *ipf) {
2155   if (!ipf->filemon) return;
2156   filemon_method_stopfile(ipf, ipf->filemon);
2157   free(ipf->filemon);
2158   ipf->filemon= 0;
2159 }
2160
2161 static void filemon_callback(InputFile *ipf) {
2162   if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
2163     ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
2164 }
2165
2166 /*---------- interface to start and stop an input file ----------*/
2167
2168 static const oop_rd_style feedfile_rdstyle= {
2169   OOP_RD_DELIM_STRIP, '\n',
2170   OOP_RD_NUL_PERMIT,
2171   OOP_RD_SHORTREC_LONG,
2172 };
2173
2174 static void inputfile_reading_start(InputFile *ipf) {
2175   assert(!ipf->rd);
2176   ipf->readable.on_readable= tailing_on_readable;
2177   ipf->readable.on_cancel=   tailing_on_cancel;
2178   ipf->readable.try_read=    tailing_try_read;
2179   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
2180   ipf->readable.delete_kill= 0;
2181
2182   ipf->readable_callback= 0;
2183   ipf->readable_callback_user= 0;
2184
2185   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
2186   assert(ipf->rd);
2187
2188   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
2189                      feedfile_got_article,ipf, feedfile_read_err, ipf);
2190   if (r) sysdie("unable start reading feedfile %s",ipf->path);
2191 }
2192
2193 static void inputfile_reading_stop(InputFile *ipf) {
2194   assert(ipf->rd);
2195   oop_rd_cancel(ipf->rd);
2196   oop_rd_delete(ipf->rd);
2197   ipf->rd= 0;
2198   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
2199 }
2200
2201
2202 /*========== interaction with innd - state machine ==========*/
2203
2204 /* See official state diagram at top of file.  We implement
2205  * this as follows:
2206  * -8<-
2207
2208             .=======.
2209             ||START||
2210             `======='
2211                 |
2212                 | open F
2213                 |
2214                 |    F ENOENT
2215                 |`---------------------------------------------------.
2216       F OPEN OK |                                                    |
2217                 |`---------------- - - -                             |
2218        D ENOENT |       D EXISTS   see OVERALL STATES diagram        |
2219                 |                  for full startup logic            |
2220      ,--------->|                                                    |
2221      |          V                                                    |
2222      |     ============                                       try to |
2223      |      NORMAL                                            open D |
2224      |     [Normal]                                                  |
2225      |      main F tail                                              |
2226      |     ============                                              V
2227      |          |                                                    |
2228      |          | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT            |
2229      ^          | hardlink F to D                                    |
2230      |     [Hardlinked]                                              |
2231      |          | unlink F                                           |
2232      |          | our handle onto F is now onto D                    |
2233      |     [Moved]                                                   |
2234      |          |                                                    |
2235      |          |<-------------------<---------------------<---------+
2236      |          |                                                    |
2237      |          | spawn inndcomm flush                               |
2238      |          V                                                    |
2239      |     ==================                                        |
2240      |      FLUSHING[-ABSENT]                                        |
2241      |     [Flushing]                                                |
2242      |     main D tail/none                                          |
2243      |     ==================                                        |
2244      |          |                                                    |
2245      |          |   INNDCOMM FLUSH FAILS                             ^
2246      |          |`----------------------->----------.                |
2247      |          |                                   |                |
2248      |          |   NO SUCH SITE                    V                |
2249      ^          |`--------------->----.         ==================== |
2250      |          |                      \        FLUSHFAILED[-ABSENT] |
2251      |          |                       \         [Moved]            |
2252      |          | FLUSH OK               \       main D tail/none    |
2253      |          | open F                  \     ==================== |
2254      |          |                          \        |                |
2255      |          |                           \       | TIME TO RETRY  |
2256      |          |`------->----.     ,---<---'\      `----------------'
2257      |          |    D NONE   |     | D NONE  `----.
2258      |          V             |     |              V
2259      |     =============      V     V             ============
2260      |      SEPARATED-1       |     |              DROPPING-1
2261      |      flsh->rd!=0       |     |              flsh->rd!=0
2262      |     [Separated]        |     |             [Dropping]
2263      |      main F idle       |     |              main none
2264      |      old D tail        |     |              old D tail
2265      |     =============      |     |             ============
2266      |          |             |     | install       |
2267      ^          | EOF ON D    |     |  defer        | EOF ON D
2268      |          V             |     |               V
2269      |     ===============    |     |             ===============
2270      |      SEPARATED-2       |     |              DROPPING-2
2271      |      flsh->rd==0       |     V              flsh->rd==0
2272      |     [Finishing]        |     |             [Dropping]
2273      |      main F tail       |     `.             main none
2274      |      old D closed      |       `.           old D closed
2275      |     ===============    V         `.        ===============
2276      |          |                         `.          |
2277      |          | ALL D PROCESSED           `.        | ALL D PROCESSED
2278      |          V install defer as backlog    `.      | install defer
2279      ^          | close D                       `.    | close D
2280      |          | unlink D                        `.  | unlink D
2281      |          |                                  |  |
2282      |          |                                  V  V
2283      `----------'                               ==============
2284                                                  DROPPED
2285                                                 [Dropped]
2286                                                  main none
2287                                                  old none
2288                                                  some backlog
2289                                                 ==============
2290                                                       |
2291                                                       | ALL BACKLOG DONE
2292                                                       |
2293                                                       | unlink lock
2294                                                       | exit
2295                                                       V
2296                                                   ==========
2297                                                    (ESRCH)
2298                                                   [Droppped]
2299                                                   ==========
2300  * ->8-
2301  */
2302
2303 static void startup_set_input_file(InputFile *f) {
2304   assert(!main_input_file);
2305   main_input_file= f;
2306   inputfile_reading_start(f);
2307 }
2308
2309 static void statemc_lock(void) {
2310   int lockfd;
2311   struct stat stab, stabf;
2312   
2313   for (;;) {
2314     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
2315     if (lockfd<0) sysfatal("open lockfile %s", path_lock);
2316
2317     struct flock fl;
2318     memset(&fl,0,sizeof(fl));
2319     fl.l_type= F_WRLCK;
2320     fl.l_whence= SEEK_SET;
2321     int r= fcntl(lockfd, F_SETLK, &fl);
2322     if (r==-1) {
2323       if (errno==EACCES || isewouldblock(errno)) {
2324         if (quiet_multiple) exit(0);
2325         fatal("another duct holds the lockfile");
2326       }
2327       sysfatal("fcntl F_SETLK lockfile %s", path_lock);
2328     }
2329
2330     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
2331     int lock_noent;
2332     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
2333
2334     if (!lock_noent && samefile(&stab, &stabf))
2335       break;
2336
2337     xclose(lockfd, "stale lockfile ", path_lock);
2338   }
2339
2340   FILE *lockfile= fdopen(lockfd, "w");
2341   if (!lockfile) sysdie("fdopen lockfile");
2342
2343   int r= ftruncate(lockfd, 0);
2344   if (r) sysdie("truncate lockfile to write new info");
2345
2346   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
2347               (unsigned long)self_pid,
2348               sitename, feedfile, remote_host) == EOF ||
2349       fflush(lockfile))
2350     sysfatal("write info to lockfile %s", path_lock);
2351
2352   debug("startup: locked");
2353 }
2354
2355 static void statemc_init(void) {
2356   struct stat stabdefer;
2357
2358   search_backlog_file();
2359
2360   int defer_noent;
2361   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
2362   if (defer_noent) {
2363     debug("startup: ductdefer ENOENT");
2364   } else {
2365     debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
2366     switch (stabdefer.st_nlink==1) {
2367     case 1:
2368       open_defer(); /* so that we will later close it and rename it */
2369       break;
2370     case 2:
2371       xunlink(path_defer, "stale defer file link"
2372               " (presumably hardlink to backlog file)");
2373       break;
2374     default:
2375       die("defer file %s has unexpected link count %d",
2376           path_defer, stabdefer.st_nlink);
2377     }
2378   }
2379
2380   struct stat stab_f, stab_d;
2381   int noent_f;
2382
2383   InputFile *file_d= open_input_file(path_flushing);
2384   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
2385
2386   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
2387
2388   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
2389     debug("startup: F==D => Hardlinked");
2390     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
2391     noent_f= 1;
2392   }
2393
2394   if (noent_f) {
2395     debug("startup: F ENOENT => Moved");
2396     if (file_d) startup_set_input_file(file_d);
2397     spawn_inndcomm_flush("feedfile missing at startup");
2398     /* => Flushing, sms:=FLUSHING */
2399   } else {
2400     if (file_d) {
2401       debug("startup: F!=D => Separated");
2402       startup_set_input_file(file_d);
2403       SMS(SEPARATED, 0, "found both old and current feed files");
2404     } else {
2405       debug("startup: F exists, D ENOENT => Normal");
2406       InputFile *file_f= open_input_file(feedfile);
2407       if (!file_f) die("feed file vanished during startup");
2408       startup_set_input_file(file_f);
2409       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
2410     }
2411   }
2412 }
2413
2414 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
2415   assert(sms == sm_NORMAL);
2416
2417   debug("starting flush (%s) (%lu >?= %lu) (%d)",
2418         why,
2419         (unsigned long)(main_input_file ? main_input_file->offset : 0),
2420         (unsigned long)target_max_feedfile_size,
2421         sm_period_counter);
2422
2423   int r= link(feedfile, path_flushing);
2424   if (r) sysfatal("link feedfile %s to flushing file %s",
2425                   feedfile, path_flushing);
2426   /* => Hardlinked */
2427
2428   xunlink(feedfile, "old feedfile link");
2429   /* => Moved */
2430
2431   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
2432 }
2433
2434 static void statemc_period_poll(void) {
2435   if (!sm_period_counter) return;
2436   sm_period_counter--;
2437   assert(sm_period_counter>=0);
2438
2439   if (sm_period_counter) return;
2440   switch (sms) {
2441   case sm_NORMAL:
2442     statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
2443     break;
2444   case sm_FLUSHFAILED:
2445     spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
2446     break;
2447   default:
2448     abort();
2449   }
2450 }
2451
2452 static int inputfile_is_done(InputFile *ipf) {
2453   if (!ipf) return 0;
2454   if (ipf->inprogress) return 0; /* new article in the meantime */
2455   if (ipf->rd) return 0; /* not had EOF */
2456   return 1;
2457 }
2458
2459 static void notice_processed(InputFile *ipf, int completed,
2460                              const char *what, const char *spec) {
2461   if (!ipf) return; /* allows preterminate to be lazy */
2462
2463 #define RCI_NOTHING(x) /* nothing */
2464 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
2465 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
2466
2467 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
2468
2469   char *inprog= completed
2470     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
2471     : xasprintf(" inprogress=%ld", ipf->inprogress);
2472
2473   info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
2474        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
2475        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
2476        ,
2477        completed?"completed":"processed", what, spec,
2478        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
2479        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
2480        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
2481        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
2482        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
2483        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
2484        );
2485
2486   free(inprog);
2487
2488 #undef CNT
2489 }
2490
2491 static void statemc_check_backlog_done(void) {
2492   InputFile *ipf= backlog_input_file;
2493   if (!inputfile_is_done(ipf)) return;
2494
2495   const char *slash= strrchr(ipf->path, '/');
2496   const char *leaf= slash ? slash+1 : ipf->path;
2497   const char *under= strchr(slash, '_');
2498   const char *rest= under ? under+1 : leaf;
2499   if (!strncmp(rest,"backlog",7)) rest += 7;
2500   notice_processed(ipf,1,"backlog ",rest);
2501
2502   close_input_file(ipf);
2503   if (unlink(ipf->path)) {
2504     if (errno != ENOENT)
2505       sysdie("could not unlink processed backlog file %s", ipf->path);
2506     warn("backlog file %s vanished while we were reading it"
2507          " so we couldn't remove it (but it's done now, anyway)",
2508          ipf->path);
2509   }
2510   free(ipf);
2511   backlog_input_file= 0;
2512   search_backlog_file();
2513   return;
2514 }
2515
2516 static void statemc_check_flushing_done(void) {
2517   InputFile *ipf= flushing_input_file;
2518   if (!inputfile_is_done(ipf)) return;
2519
2520   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
2521
2522   notice_processed(ipf,1,"feedfile","");
2523
2524   close_defer();
2525
2526   xunlink(path_flushing, "old flushing file");
2527
2528   close_input_file(flushing_input_file);
2529   free(flushing_input_file);
2530   flushing_input_file= 0;
2531
2532   if (sms==sm_SEPARATED) {
2533     notice("flush complete");
2534     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2535   } else if (sms==sm_DROPPING) {
2536     SMS(DROPPED, 0, "old flush complete");
2537     search_backlog_file();
2538     notice("feed dropped, but will continue until backlog is finished");
2539   }
2540 }
2541
2542 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
2543                                       void *u) {
2544   assert(!inputfile_is_done(main_input_file));
2545   statemc_check_flushing_done();
2546   statemc_check_backlog_done();
2547   return OOP_CONTINUE;
2548 }
2549
2550 static void queue_check_input_done(void) {
2551   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
2552 }
2553
2554 static void statemc_setstate(StateMachineState newsms, int periods,
2555                              const char *forlog, const char *why) {
2556   sms= newsms;
2557   sm_period_counter= periods;
2558
2559   const char *xtra= "";
2560   switch (sms) {
2561   case sm_FLUSHING:
2562   case sm_FLUSHFAILED:
2563     if (!main_input_file) xtra= "-ABSENT";
2564     break;
2565   case sm_SEPARATED:
2566   case sm_DROPPING:
2567     xtra= flushing_input_file->rd ? "-1" : "-2";
2568     break;
2569   default:;
2570   }
2571
2572   if (periods) {
2573     info("state %s%s[%d] %s",forlog,xtra,periods,why);
2574   } else {
2575     info("state %s%s %s",forlog,xtra,why);
2576   }
2577 }
2578
2579 /*---------- defer and backlog files ----------*/
2580
2581 static void open_defer(void) {
2582   struct stat stab;
2583
2584   if (defer) return;
2585
2586   defer= fopen(path_defer, "a+");
2587   if (!defer) sysfatal("could not open defer file %s", path_defer);
2588
2589   /* truncate away any half-written records */
2590
2591   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
2592
2593   if (stab.st_size > LONG_MAX)
2594     die("defer file %s size is far too large", path_defer);
2595
2596   if (!stab.st_size)
2597     return;
2598
2599   long orgsize= stab.st_size;
2600   long truncto= stab.st_size;
2601   for (;;) {
2602     if (!truncto) break; /* was only (if anything) one half-truncated record */
2603     if (fseek(defer, truncto-1, SEEK_SET) < 0)
2604       sysdie("seek in defer file %s while truncating partial", path_defer);
2605
2606     int r= getc(defer);
2607     if (r==EOF) {
2608       if (ferror(defer))
2609         sysdie("failed read from defer file %s", path_defer);
2610       else
2611         die("defer file %s shrank while we were checking it!", path_defer);
2612     }
2613     if (r=='\n') break;
2614     truncto--;
2615   }
2616
2617   if (stab.st_size != truncto) {
2618     warn("truncating half-record at end of defer file %s -"
2619          " shrinking by %ld bytes from %ld to %ld",
2620          path_defer, orgsize - truncto, orgsize, truncto);
2621
2622     if (fflush(defer))
2623       sysfatal("could not flush defer file %s", path_defer);
2624     if (ftruncate(fileno(defer), truncto))
2625       sysdie("could not truncate defer file %s", path_defer);
2626
2627   } else {
2628     info("continuing existing defer file %s (%ld bytes)",
2629          path_defer, orgsize);
2630   }
2631   if (fseek(defer, truncto, SEEK_SET))
2632     sysdie("could not seek to new end of defer file %s", path_defer);
2633 }
2634
2635 static void close_defer(void) {
2636   if (!defer)
2637     return;
2638
2639   struct stat stab;
2640   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
2641
2642   if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
2643   defer= 0;
2644
2645   time_t now= xtime();
2646
2647   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
2648                            (unsigned long)now,
2649                            (unsigned long)stab.st_ino);
2650   if (link(path_defer, backlog))
2651     sysfatal("could not install defer file %s as backlog file %s",
2652            path_defer, backlog);
2653   if (unlink(path_defer))
2654     sysdie("could not unlink old defer link %s to backlog file %s",
2655            path_defer, backlog);
2656
2657   free(backlog);
2658
2659   if (until_backlog_nextscan < 0 ||
2660       until_backlog_nextscan > backlog_retry_minperiods + 1)
2661     until_backlog_nextscan= backlog_retry_minperiods + 1;
2662 }
2663
2664 static void poll_backlog_file(void) {
2665   if (until_backlog_nextscan < 0) return;
2666   if (until_backlog_nextscan-- > 0) return;
2667   search_backlog_file();
2668 }
2669
2670 static void search_backlog_file(void) {
2671   /* returns non-0 iff there are any backlog files */
2672
2673   glob_t gl;
2674   int r, i;
2675   struct stat stab;
2676   const char *oldest_path=0;
2677   time_t oldest_mtime=0, now;
2678
2679   if (backlog_input_file) return;
2680
2681  try_again:
2682
2683   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
2684
2685   switch (r) {
2686   case GLOB_ABORTED:
2687     sysfatal("failed to expand backlog pattern %s", globpat_backlog);
2688   case GLOB_NOSPACE:
2689     fatal("out of memory expanding backlog pattern %s", globpat_backlog);
2690   case 0:
2691     for (i=0; i<gl.gl_pathc; i++) {
2692       const char *path= gl.gl_pathv[i];
2693
2694       if (strchr(path,'#') || strchr(path,'~')) {
2695         debug("backlog file search skipping %s", path);
2696         continue;
2697       }
2698       r= stat(path, &stab);
2699       if (r) {
2700         syswarn("failed to stat backlog file %s", path);
2701         continue;
2702       }
2703       if (!S_ISREG(stab.st_mode)) {
2704         warn("backlog file %s is not a plain file (or link to one)", path);
2705         continue;
2706       }
2707       if (!oldest_path || stab.st_mtime < oldest_mtime) {
2708         oldest_path= path;
2709         oldest_mtime= stab.st_mtime;
2710       }
2711     }
2712   case GLOB_NOMATCH: /* fall through */
2713     break;
2714   default:
2715     sysdie("glob expansion of backlog pattern %s gave unexpected"
2716            " nonzero (error?) return value %d", globpat_backlog, r);
2717   }
2718
2719   if (!oldest_path) {
2720     debug("backlog scan: none");
2721
2722     if (sms==sm_DROPPED) {
2723       preterminate();
2724       notice("feed dropped and our work is complete");
2725
2726       int r= unlink(path_control);
2727       if (r && errno!=ENOENT)
2728         syswarn("failed to remove control symlink for old feed");
2729
2730       xunlink(path_lock,    "lockfile for old feed");
2731       exit(4);
2732     }
2733     until_backlog_nextscan= backlog_spontrescan_periods;
2734     goto xfree;
2735   }
2736
2737   now= xtime();
2738   double age= difftime(now, oldest_mtime);
2739   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
2740
2741   if (age_deficiency <= 0) {
2742     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
2743           age, age_deficiency, oldest_path);
2744
2745     backlog_input_file= open_input_file(oldest_path);
2746     if (!backlog_input_file) {
2747       warn("backlog file %s vanished as we opened it", oldest_path);
2748       globfree(&gl);
2749       goto try_again;
2750     }
2751     inputfile_reading_start(backlog_input_file);
2752     until_backlog_nextscan= -1;
2753     goto xfree;
2754   }
2755
2756   until_backlog_nextscan= age_deficiency / period_seconds;
2757
2758   if (backlog_spontrescan_periods >= 0 &&
2759       until_backlog_nextscan > backlog_spontrescan_periods)
2760     until_backlog_nextscan= backlog_spontrescan_periods;
2761
2762   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
2763         age, age_deficiency, until_backlog_nextscan, oldest_path);
2764
2765  xfree:
2766   globfree(&gl);
2767   return;
2768 }
2769
2770 /*---------- shutdown and signal handling ----------*/
2771
2772 static void preterminate(void) {
2773   if (in_child) return;
2774   notice_processed(main_input_file,0,"feedfile","");
2775   notice_processed(flushing_input_file,0,"flushing file","");
2776   if (backlog_input_file)
2777     notice_processed(backlog_input_file,0, "backlog file ",
2778                      backlog_input_file->path);
2779 }
2780
2781 static int signal_self_pipe[2];
2782 static sig_atomic_t terminate_sig_flag;
2783
2784 static void defraise(int signo) {
2785   struct sigaction sa;
2786   memset(&sa,0,sizeof(sa));
2787   sa.sa_handler= SIG_DFL;
2788   xsigaction(signo,&sa);
2789   raise(signo);
2790 }
2791
2792 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
2793   assert(fd=signal_self_pipe[0]);
2794   char buf[PIPE_BUF];
2795   int r= read(signal_self_pipe[0], buf, sizeof(buf));
2796   if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
2797   if (r==0) die("eof on signal self pipe");
2798   if (terminate_sig_flag) {
2799     preterminate();
2800     notice("terminating (%s)", strsignal(terminate_sig_flag));
2801     defraise(terminate_sig_flag);
2802     abort();
2803   }
2804   return OOP_CONTINUE;
2805 }
2806
2807 static void sigarrived_handler(int signum) {
2808   static char x;
2809   switch (signum) {
2810   case SIGINT: case SIGTERM:
2811     if (!terminate_sig_flag) terminate_sig_flag= signum;
2812     break;
2813   default:
2814     abort();
2815   }
2816   write(signal_self_pipe[1],&x,1);
2817 }
2818
2819 static void init_signals(void) {
2820   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
2821     sysdie("could not ignore SIGPIPE");
2822
2823   if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
2824
2825   xsetnonblock(signal_self_pipe[0],1);
2826   xsetnonblock(signal_self_pipe[1],1);
2827
2828   struct sigaction sa;
2829   memset(&sa,0,sizeof(sa));
2830   sa.sa_handler= sigarrived_handler;
2831   sa.sa_flags= SA_RESTART;
2832   xsigaction(SIGTERM,&sa);
2833   xsigaction(SIGINT,&sa);
2834
2835   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
2836 }
2837
2838 /*========== flushing the feed ==========*/
2839
2840 static pid_t inndcomm_child;
2841 static int inndcomm_sentinel_fd;
2842
2843 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
2844   assert(inndcomm_child);
2845   assert(fd == inndcomm_sentinel_fd);
2846   int status= xwaitpid(&inndcomm_child, "inndcomm");
2847   inndcomm_child= 0;
2848   
2849   cancel_fd_read_except(fd);
2850   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
2851   inndcomm_sentinel_fd= 0;
2852
2853   assert(!flushing_input_file);
2854
2855   if (WIFEXITED(status)) {
2856     switch (WEXITSTATUS(status)) {
2857
2858     case INNDCOMMCHILD_ESTATUS_FAIL:
2859       goto failed;
2860
2861     case INNDCOMMCHILD_ESTATUS_NONESUCH:
2862       notice("feed has been dropped by innd, finishing up");
2863       flushing_input_file= main_input_file;
2864       tailing_queue_readable(flushing_input_file);
2865         /* we probably previously returned EAGAIN from our fake read method
2866          * when in fact we were at EOF, so signal another readable event
2867          * so we actually see the EOF */
2868
2869       main_input_file= 0;
2870
2871       if (flushing_input_file) {
2872         SMS(DROPPING, 0, "feed dropped by innd, but must finish last flush");
2873       } else {
2874         close_defer();
2875         SMS(DROPPED, 0, "feed dropped by innd");
2876         search_backlog_file();
2877       }
2878       return OOP_CONTINUE;
2879
2880     case 0:
2881       /* as above */
2882       flushing_input_file= main_input_file;
2883       tailing_queue_readable(flushing_input_file);
2884
2885       main_input_file= open_input_file(feedfile);
2886       if (!main_input_file)
2887         die("flush succeeded but feedfile %s does not exist!", feedfile);
2888
2889       if (flushing_input_file) {
2890         SMS(SEPARATED, spontaneous_flush_periods, "recovery flush complete");
2891       } else {
2892         close_defer();
2893         SMS(NORMAL, spontaneous_flush_periods, "flush complete");
2894       }
2895       return OOP_CONTINUE;
2896
2897     default:
2898       goto unexpected_exitstatus;
2899
2900     }
2901   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
2902     warn("flush timed out trying to talk to innd");
2903     goto failed;
2904   } else {
2905   unexpected_exitstatus:
2906     report_child_status("inndcomm child", status);
2907   }
2908
2909  failed:
2910   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
2911   return OOP_CONTINUE;
2912 }
2913
2914 static void inndcommfail(const char *what) {
2915   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
2916   exit(INNDCOMMCHILD_ESTATUS_FAIL);
2917 }
2918
2919 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
2920   int pipefds[2];
2921
2922   notice("flushing %s",why);
2923
2924   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
2925   assert(!inndcomm_child);
2926   assert(!inndcomm_sentinel_fd);
2927
2928   if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
2929
2930   inndcomm_child= xfork("inndcomm child");
2931
2932   if (!inndcomm_child) {
2933     const char *flushargv[2]= { sitename, 0 };
2934     char *reply;
2935     int r;
2936
2937     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
2938     /* parent spots the autoclose of pipefds[1] when we die or exit */
2939
2940     if (simulate_flush>=0) {
2941       warn("SIMULATING flush child status %d", simulate_flush);
2942       if (simulate_flush>128) raise(simulate_flush-128);
2943       else exit(simulate_flush);
2944     }
2945
2946     alarm(inndcomm_flush_timeout);
2947     r= ICCopen();                         if (r)   inndcommfail("connect");
2948     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
2949     if (!r) exit(0); /* yay! */
2950
2951     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
2952     syswarn("innd ctlinnd flush failed: innd said %s", reply);
2953     exit(INNDCOMMCHILD_ESTATUS_FAIL);
2954   }
2955
2956   simulate_flush= -1;
2957
2958   xclose(pipefds[1], "inndcomm sentinel child's end",0);
2959   inndcomm_sentinel_fd= pipefds[0];
2960   assert(inndcomm_sentinel_fd);
2961   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
2962
2963   SMS(FLUSHING, 0, why);
2964 }
2965
2966 /*========== main program ==========*/
2967
2968 static void postfork_inputfile(InputFile *ipf) {
2969   if (!ipf) return;
2970   xclose(ipf->fd, "(in child) input file ", ipf->path);
2971 }
2972
2973 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
2974   /* we have no stdio streams that are buffered long-term */
2975   if (!f) return;
2976   if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
2977 }
2978
2979 static void postfork(void) {
2980   in_child= 1;
2981
2982   if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
2983     sysdie("(in child) failed to reset SIGPIPE");
2984
2985   postfork_inputfile(main_input_file);
2986   postfork_inputfile(flushing_input_file);
2987
2988   Conn *conn;
2989   for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
2990     conn_closefd(conn,"(in child) ");
2991
2992   postfork_stdio(defer, "defer file ", path_defer);
2993 }
2994
2995 typedef struct Every Every;
2996 struct Every {
2997   struct timeval interval;
2998   int fixed_rate;
2999   void (*f)(void);
3000 };
3001
3002 static void every_schedule(Every *e, struct timeval base);
3003
3004 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
3005   Every *e= e_v;
3006   e->f();
3007   if (!e->fixed_rate) xgettimeofday(&base);
3008   every_schedule(e, base);
3009   return OOP_CONTINUE;
3010 }
3011
3012 static void every_schedule(Every *e, struct timeval base) {
3013   struct timeval when;
3014   timeradd(&base, &e->interval, &when);
3015   loop->on_time(loop, when, every_happens, e);
3016 }
3017
3018 static void every(int interval, int fixed_rate, void (*f)(void)) {
3019   Every *e= xmalloc(sizeof(*e));
3020   e->interval.tv_sec= interval;
3021   e->interval.tv_usec= 0;
3022   e->fixed_rate= fixed_rate;
3023   e->f= f;
3024   struct timeval now;
3025   xgettimeofday(&now);
3026   every_schedule(e, now);
3027 }
3028
3029 static void filepoll(void) {
3030   filemon_callback(main_input_file);
3031   filemon_callback(flushing_input_file);
3032 }
3033
3034 static char *debug_report_ipf(InputFile *ipf) {
3035   if (!ipf) return xasprintf("none");
3036
3037   const char *slash= strrchr(ipf->path,'/');
3038   const char *path= slash ? slash+1 : ipf->path;
3039
3040   return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s",
3041                    ipf, path,
3042                    ipf->inprogress, (long)ipf->offset,
3043                    ipf->fd,
3044                    ipf->rd ? "" : ",!rd",
3045                    ipf->skippinglong ? "*skiplong" : "");
3046 }
3047
3048 static void period(void) {
3049   char *dipf_main=     debug_report_ipf(main_input_file);
3050   char *dipf_flushing= debug_report_ipf(flushing_input_file);
3051   char *dipf_backlog=  debug_report_ipf(backlog_input_file);
3052
3053   debug("PERIOD"
3054         " sms=%s[%d] conns=%d queue=%d until_connect=%d"
3055         " input_files main:%s flushing:%s backlog:%s"
3056         " children connecting=%ld inndcomm=%ld"
3057         ,
3058         sms_names[sms], sm_period_counter,
3059           conns.count, queue.count, until_connect,
3060         dipf_main, dipf_flushing, dipf_backlog,
3061         (long)connecting_child, (long)inndcomm_child
3062         );
3063
3064   free(dipf_main);
3065   free(dipf_flushing);
3066   free(dipf_backlog);
3067
3068   if (until_connect) until_connect--;
3069
3070   poll_backlog_file();
3071   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
3072   statemc_period_poll();
3073   check_assign_articles();
3074   check_idle_conns();
3075 }
3076
3077
3078 /*========== dumping state ==========*/
3079
3080 static void dump_article_list(FILE *f, const ControlCommand *c,
3081                               const ArticleList *al) {
3082   fprintf(f, " count=%d\n", al->count);
3083   if (!c->xval) return;
3084   
3085   int i; Article *art;
3086   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
3087     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
3088     DUMPV("%p", art->,ipf);
3089     DUMPV("%d", art->,missing);
3090     DUMPV("%lu", (unsigned long)art->,offset);
3091     DUMPV("%d", art->,blanklen);
3092     DUMPV("%d", art->,midlen);
3093     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
3094   }
3095 }
3096   
3097 static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) {
3098   char *dipf= debug_report_ipf(ipf);
3099   fprintf(f,"input %s %s", wh, dipf);
3100   free(dipf);
3101   
3102   if (ipf) {
3103     DUMPV("%d", ipf->,readcount_ok);
3104     DUMPV("%d", ipf->,readcount_blank);
3105     DUMPV("%d", ipf->,readcount_err);
3106   }
3107   fprintf(f,"\n");
3108   if (ipf) {
3109     ArtState state; const char *const *statename; 
3110     for (state=0, statename=artstate_names; *statename; state++,statename++) {
3111 #define RC_DUMP_FMT(x) " " #x "=%d"
3112 #define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
3113       fprintf(f,"input %s counts %-11s"
3114               RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
3115               wh, *statename
3116               RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
3117     }
3118   }
3119 }
3120
3121 CCMD(dump) {
3122   int i;
3123   fprintf(cc->out, "dumping state to %s\n", path_dump);
3124   FILE *f= fopen(path_dump, "w");
3125   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
3126
3127   fprintf(f,"general");
3128   DUMPV("%s", sms_names,[sms]);
3129   DUMPV("%d", ,sm_period_counter);
3130   DUMPV("%ld", (long),self_pid);
3131   DUMPV("%p", , defer);
3132   DUMPV("%d", , until_connect);
3133   DUMPV("%d", , until_backlog_nextscan);
3134   DUMPV("%d", , simulate_flush);
3135   fprintf(f,"\nnocheck");
3136   DUMPV("%#.10f", , accept_proportion);
3137   DUMPV("%d", , nocheck);
3138   DUMPV("%d", , nocheck_reported);
3139   fprintf(f,"\n");
3140
3141   fprintf(f,"special");
3142   DUMPV("%ld", (long),connecting_child);
3143   DUMPV("%d", , connecting_fdpass_sock);
3144   DUMPV("%d", , control_master);
3145   fprintf(f,"\n");
3146
3147   fprintf(f,"filemon ");
3148   filemon_method_dump_info(f);
3149
3150   dump_input_file(f, main_input_file,     "main"    );
3151   dump_input_file(f, flushing_input_file, "flushing");
3152   dump_input_file(f, backlog_input_file,  "backlog" );
3153
3154   fprintf(f,"conns count=%d\n", conns.count);
3155
3156   Conn *conn;
3157   for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) {
3158
3159     fprintf(f,"C%d",conn->fd);
3160     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
3161     DUMPV("%d",conn->,stream);         DUMPV("%d",conn->,quitting);
3162     DUMPV("%d",conn->,since_activity);
3163     fprintf(f,"\n");
3164
3165     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
3166     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
3167     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
3168
3169     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
3170     for (i=0; i<conn->xmitu; i++) {
3171       const struct iovec *iv= &conn->xmit[i];
3172       const XmitDetails *xd= &conn->xmitd[i];
3173       char *dinfo;
3174       long diff;
3175       switch (xd->kind) {
3176       case xk_Malloc:
3177         diff= xd->info.malloc_tofree - (char*)iv->iov_base;
3178         dinfo= xasprintf("M%5ld", diff);
3179         break;
3180       case xk_Const:
3181         dinfo= xasprintf("Const");
3182         break;
3183       case xk_Artdata:
3184         dinfo= xasprintf("A%p", xd->info.sm_art);
3185         break;
3186       default:
3187         abort();
3188       }
3189       fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
3190               sanitise(iv->iov_base, iv->iov_len));
3191       free(dinfo);
3192     }
3193   }
3194
3195   fprintf(f,"queue"); dump_article_list(f,c,&queue);
3196
3197   fprintf(f,"paths");
3198   DUMPV("%s", , path_lock);
3199   DUMPV("%s", , path_flushing);
3200   DUMPV("%s", , path_defer);
3201   DUMPV("%s", , path_control);
3202   DUMPV("%s", , path_dump);
3203   DUMPV("%s", , globpat_backlog);
3204   fprintf(f,"\n");
3205
3206   if (!!ferror(f) + !!fclose(f)) {
3207     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
3208     return;
3209   }
3210 }
3211
3212 /*========== option parsing ==========*/
3213
3214 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
3215 static void vbadusage(const char *fmt, va_list al) {
3216   char *m= xvasprintf(fmt,al);
3217   fprintf(stderr, "bad usage: %s\n"
3218           "say --help for help, or read the manpage\n",
3219           m);
3220   if (become_daemon)
3221     syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
3222   exit(8);
3223 }
3224
3225 /*---------- generic option parser ----------*/
3226
3227 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
3228 static void badusage(const char *fmt, ...) {
3229   va_list al;
3230   va_start(al,fmt);
3231   vbadusage(fmt,al);
3232 }
3233
3234 enum OptFlags {
3235   of_seconds= 001000u,
3236   of_boolean= 002000u,
3237 };
3238
3239 typedef struct Option Option;
3240 typedef void OptionParser(const Option*, const char *val);
3241
3242 struct Option {
3243   int shrt;
3244   const char *lng, *formarg;
3245   void *store;
3246   OptionParser *fn;
3247   int intval;
3248 };
3249
3250 static void parse_options(const Option *options, char ***argvp) {
3251   /* on return *argvp is first non-option arg; argc is not updated */
3252
3253   for (;;) {
3254     const char *arg= *++(*argvp);
3255     if (!arg) break;
3256     if (*arg != '-') break;
3257     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
3258     int a;
3259     while ((a= *++arg)) {
3260       const Option *o;
3261       if (a=='-') {
3262         arg++;
3263         char *equals= strchr(arg,'=');
3264         int len= equals ? (equals - arg) : strlen(arg);
3265         for (o=options; o->shrt || o->lng; o++)
3266           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
3267             goto found_long;
3268         badusage("unknown long option --%s",arg);
3269       found_long:
3270         if (!o->formarg) {
3271           if (equals) badusage("option --%s does not take a value",o->lng);
3272           arg= 0;
3273         } else if (equals) {
3274           arg= equals+1;
3275         } else {
3276           arg= *++(*argvp);
3277           if (!arg) badusage("option --%s needs a value for %s",
3278                              o->lng, o->formarg);
3279         }
3280         o->fn(o, arg);
3281         break; /* eaten the whole argument now */
3282       }
3283       for (o=options; o->shrt || o->lng; o++)
3284         if (a == o->shrt)
3285           goto found_short;
3286       badusage("unknown short option -%c",a);
3287     found_short:
3288       if (!o->formarg) {
3289         o->fn(o,0);
3290       } else {
3291         if (!*++arg) {
3292           arg= *++(*argvp);
3293           if (!arg) badusage("option -%c needs a value for %s",
3294                              o->shrt, o->formarg);
3295         }
3296         o->fn(o,arg);
3297         break; /* eaten the whole argument now */
3298       }
3299     }
3300   }
3301 }
3302
3303 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
3304
3305 static void print_options(const Option *options, FILE *f) {
3306   const Option *o;
3307   for (o=options; o->shrt || o->lng; o++) {
3308     char shrt[2] = { o->shrt, 0 };
3309     char *optspec= xasprintf("%s%s%s%s%s",
3310                              o->shrt ? "-" : "", shrt,
3311                              o->shrt && o->lng ? "|" : "",
3312                              DELIMPERHAPS("--", o->lng));
3313     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
3314     free(optspec);
3315   }
3316 }
3317
3318 /*---------- specific option types ----------*/
3319
3320 static void op_integer(const Option *o, const char *val) {
3321   char *ep;
3322   errno= 0;
3323   unsigned long ul= strtoul(val,&ep,10);
3324   if (*ep || ep==val || errno || ul>INT_MAX)
3325     badusage("bad integer value for %s",o->lng);
3326   int *store= o->store;
3327   *store= ul;
3328 }
3329
3330 static void op_double(const Option *o, const char *val) {
3331   int *store= o->store;
3332   char *ep;
3333   errno= 0;
3334   *store= strtod(val, &ep);
3335   if (*ep || ep==val || errno)
3336     badusage("bad floating point value for %s",o->lng);
3337 }
3338
3339 static void op_string(const Option *o, const char *val) {
3340   const char **store= o->store;
3341   *store= val;
3342 }
3343
3344 static void op_seconds(const Option *o, const char *val) {
3345   int *store= o->store;
3346   char *ep;
3347   int unit;
3348
3349   double v= strtod(val,&ep);
3350   if (ep==val) badusage("bad time/duration value for %s",o->lng);
3351
3352   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
3353   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
3354   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
3355   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
3356   else if (!strcmp(ep,"das")) unit= 10;
3357   else if (!strcmp(ep,"hs"))  unit= 100;
3358   else if (!strcmp(ep,"ks"))  unit= 1000;
3359   else if (!strcmp(ep,"Ms"))  unit= 1000000;
3360   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
3361
3362   v *= unit;
3363   v= ceil(v);
3364   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
3365   *store= v;
3366 }
3367
3368 static void op_setint(const Option *o, const char *val) {
3369   int *store= o->store;
3370   *store= o->intval;
3371 }
3372
3373 /*---------- specific options ----------*/
3374
3375 static void help(const Option *o, const char *val);
3376
3377 static const Option innduct_options[]= {
3378 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
3379 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
3380 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
3381 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
3382 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
3383 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
3384 {'P',"port",             "PORT",  &port,                     op_integer     },
3385 {0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
3386 {0,"help",               0,       0,                         help           },
3387
3388 {0,"max-connections",    "N",     &max_connections,          op_integer     },
3389 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
3390 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
3391 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
3392
3393 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
3394 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
3395 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
3396
3397 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
3398 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
3399
3400 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
3401 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
3402 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
3403 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
3404 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
3405 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
3406
3407 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
3408 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
3409
3410 {0,0}
3411 };
3412
3413 static void printusage(FILE *f) {
3414   fputs("usage: innduct [options] site [fqdn]\n"
3415         "available options are:\n", f);
3416   print_options(innduct_options, f);
3417 }
3418
3419 static void help(const Option *o, const char *val) {
3420   printusage(stdout);
3421   if (ferror(stdout) || fflush(stdout)) {
3422     perror("innduct: writing help");
3423     exit(12);
3424   }
3425   exit(0);
3426 }
3427
3428 static void convert_to_periods_rndup(int *store) {
3429   *store += period_seconds-1;
3430   *store /= period_seconds;
3431 }
3432
3433 int main(int argc, char **argv) {
3434   if (!argv[1]) {
3435     printusage(stderr);
3436     exit(8);
3437   }
3438
3439   parse_options(innduct_options, &argv);
3440
3441   /* arguments */
3442
3443   sitename= *argv++;
3444   if (!sitename) badusage("need site name argument");
3445   remote_host= *argv++;
3446   if (*argv) badusage("too many non-option arguments");
3447
3448   /* defaults */
3449
3450   int r= innconf_read(inndconffile);
3451   if (!r) badusage("could not read inn.conf (more info on stderr)");
3452
3453   if (!remote_host) remote_host= sitename;
3454
3455   if (nocheck_thresh < 0 || nocheck_thresh > 100)
3456     badusage("nocheck threshold percentage must be between 0..100");
3457   nocheck_thresh *= 0.01;
3458
3459   if (nocheck_decay < 0.1)
3460     badusage("nocheck decay articles must be at least 0.1");
3461   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
3462
3463   convert_to_periods_rndup(&reconnect_delay_periods);
3464   convert_to_periods_rndup(&flushfail_retry_periods);
3465   convert_to_periods_rndup(&backlog_retry_minperiods);
3466   convert_to_periods_rndup(&backlog_spontrescan_periods);
3467   convert_to_periods_rndup(&spontaneous_flush_periods);
3468   convert_to_periods_rndup(&need_activity_periods);
3469
3470   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
3471     badusage("bad input data ratio must be between 0..100");
3472   max_bad_data_ratio *= 0.01;
3473
3474   if (!feedfile) {
3475     feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
3476   } else if (!feedfile[0]) {
3477     badusage("feed filename must be nonempty");
3478   } else if (feedfile[strlen(feedfile)-1]=='/') {
3479     feedfile= xasprintf("%s%s",feedfile,sitename);
3480   }
3481
3482   const char *feedfile_forbidden= "?*[~#";
3483   int c;
3484   while ((c= *feedfile_forbidden++))
3485     if (strchr(feedfile, c))
3486       badusage("feed filename may not contain metacharacter %c",c);
3487
3488   /* set things up */
3489
3490   path_lock=        xasprintf("%s_lock",      feedfile);
3491   path_flushing=    xasprintf("%s_flushing",  feedfile);
3492   path_defer=       xasprintf("%s_defer",     feedfile);
3493   path_control=     xasprintf("%s_control",   feedfile);
3494   path_dump=        xasprintf("%s_dump",      feedfile);
3495   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
3496
3497   oop_source_sys *sysloop= oop_sys_new();
3498   if (!sysloop) sysdie("could not create liboop event loop");
3499   loop= (oop_source*)sysloop;
3500
3501   LIST_INIT(conns);
3502   LIST_INIT(queue);
3503
3504   if (become_daemon) {
3505     int i;
3506     for (i=3; i<255; i++)
3507       /* do this now before we open syslog, etc. */
3508       close(i);
3509     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
3510
3511     int null= open("/dev/null",O_RDWR);
3512     if (null<0) sysfatal("failed to open /dev/null");
3513     dup2(null,0);
3514     dup2(null,1);
3515     dup2(null,2);
3516     xclose(null, "/dev/null original fd",0);
3517
3518     pid_t child1= xfork("daemonise first fork");
3519     if (child1) _exit(0);
3520
3521     pid_t sid= setsid();
3522     if (sid != child1) sysfatal("setsid failed");
3523
3524     pid_t child2= xfork("daemonise second fork");
3525     if (child2) _exit(0);
3526   }
3527
3528   self_pid= getpid();
3529   if (self_pid==-1) sysdie("getpid");
3530
3531   statemc_lock();
3532
3533   init_signals();
3534
3535   notice("starting");
3536
3537   if (!become_daemon)
3538     control_stdio();
3539
3540   control_init();
3541
3542   int filemon_ok= 0;
3543   if (!try_filemon) {
3544     notice("filemon: suppressed by command line option, polling");
3545   } else {
3546     filemon_ok= filemon_method_init();
3547     if (!filemon_ok)
3548       warn("filemon: no file monitoring available, polling");
3549   }
3550   if (!filemon_ok)
3551     every(filepoll_seconds,0,filepoll);
3552
3553   every(period_seconds,1,period);
3554
3555   statemc_init();
3556
3557   /* let's go */
3558
3559   void *run= oop_sys_run(sysloop);
3560   assert(run == OOP_ERROR);
3561   sysdie("event loop failed");
3562 }