chiark / gitweb /
Reorganisation and a bit of new code
[inn-innduct.git] / backends / innduct.c
1 /*
2  * Newsfeeds file entries should look like this:
3  *     host.name.of.site[/exclude,exclude,...]\
4  *             :pattern,pattern...[/distribution,distribution...]\
5  *             :Tf,Wnm
6  *             :
7  * or
8  *     sitename[/exclude,exclude,...]\
9  *             :pattern,pattern...[/distribution,distribution...]\
10  *             :Tf,Wnm
11  *             :host.name.of.site
12  *
13  * Four files full of
14  *    token messageid
15  * or might be blanked out
16  *    <spc><spc><spc><spc>....
17  *
18  *   site.name_duct.lock       lock preventing multiple ducts
19  *                                holder of this lock is "duct"
20  * F site.name                 main feed file
21  *                                opened/created, then written, by innd
22  *                                read by duct
23  *                                unlinked by duct
24  *                                tokens blanked out by duct when processed
25  * D site.name_duct            temporary feed file during flush (or crash)
26  *                                hardlink created by duct
27  *                                unlinked by duct
28  *   site.name_duct.defer      431'd articles, still being written,
29  *                                created, written, used by duct
30  *   site.name_backlog.lock    lock taken out by innxmit wrapper
31  *                                holder and its child are "xmit"
32  *   site.name_backlog_<date>.<inum>
33  *                             431'd articles, ready for innxmit
34  *                                created (link/mv) by duct
35  *                                read by xmit
36  *                                unlinked by xmit
37  *   site.name_backlog_<letters> eg
38  *   site.name_backlog_manual
39  *                             anything the sysadmin likes (eg, feed files
40  *                             from old feeds to be merged into this one)
41  *                                created (link/mv) by admin
42  *                                read by xmit
43  *                                unlinked by xmit
44
45
46    OVERALL STATES:
47
48                                                                 START
49                                                                   |
50                                                              check D, F
51                                                                   |
52                           <--------------------------------------'|
53         Nothing                            F, D both ENOENT       |
54          F: ENOENT                                                |
55          D: ENOENT                                                |
56          duct: not not reading anything                           |
57            |                                                      |
58            |`---------------------.                               |
59            |                      | duct times out waiting for F  |
60            V  innd creates F      | duct exits                    |
61            |                      V                               |
62         Noduct                    GO TO Dropped                   |
63          F: innd writing                                          |
64          D: ENOENT                                                |
65          duct: not running or not reading anything                |
66            |                                                      |
67            |                                                      |
68      ,-->--+                   <---------------------------------'|
69      |     |  duct opens F                         F exists       |
70      |     |                                       D ENOENT       |
71      |     V                                                      |
72      |  Normal                                                    |
73      |   F: innd writing, duct reading                            |
74      |   D: ENOENT                                                |
75      |     |                                                      |
76      |     |  duct decides time to flush                          |
77      |     |  duct makes hardlink                                 |
78      |     |                                                      |
79      |     V                            <------------------------'|
80      |  Hardlinked                                  F==D          |
81      |   F == D: innd writing, duct reading         both exist    |
82      ^     |                                                      |
83      |     |  duct unlinks F                                      |
84      |     V                                                      |
85      |  Moved                               <----+------------<--'|
86      |   F: ENOENT                               |  F ENOENT      |
87      |   D: innd writing, duct reading           |  D exists      |
88      |     |                                     |                |
89      |     |  duct requests flush of feed        |                |
90      |     |   (others can too, harmlessly)      |                |
91      |     V                                     |                |
92      |  Flushing                                 |                |
93      |   F: ENOENT                               |                |
94      |   D: innd flushing, duct reading          |                |
95      |     |                                     |                |
96      |     |   inndcomm flush fails              |                |
97      |     |`-------------------------->---------'                |
98      |     |                                                      |
99      |     |   inndcomm reports no such site                      |
100      |     |`---------------------------------------------------- | -.
101      |     |                                                      |  |
102      |     |  innd finishes writing D, creates F                  |  |
103      |     |  inndcomm reports flush successful                   |  |
104      |     |                                                      |  |
105      |     V                                                      |  |
106      |  Separated                                <----------------'  |
107      |   F: innd writing                            F!=D             /
108      |   D: duct reading                             both exist     /
109      |     |                                                       /
110      |     |  duct gets to the end of D                           /
111      |     |  duct opens F too                                   /
112      |     V                                                    /
113      |  Finishing                                              /
114      |   F: innd writing, duct reading                        |
115      |   D: duct finishing                                    V
116      |     |                                            Dropping
117      |     |  duct finishes processing D                 F: ENOENT
118      |     V  duct unlinks D                             D: duct reading
119      |     |                                                  |
120      `--<--'                                                  | duct finishes
121                                                               |  processing D
122                                                               | duct unlinks D
123                                                               | duct exits
124                                                               V
125                                                         Dropped
126                                                          F: ENOENT
127                                                          D: ENOENT
128                                                          duct not running
129
130    "duct reading" means innduct is reading the file but also
131    overwriting processed tokens.
132
133  *
134  *
135  */
136
137
138 /*----- general definitions, probably best not changed -----*/
139
140 #define PERIOD_SECONDS 60
141
142 #define CONNCHILD_ESTATUS_STREAM   4
143 #define CONNCHILD_ESTATUS_NOSTREAM 5
144
145
146 /*----- configuration options -----*/
147
148 static char *feedfile;
149 static int max_connections, max_queue_per_conn;
150 static int connection_setup_timeout, port, try_stream;
151 static const char *remote_host;
152
153 static double accept_proportion;
154 static double nocheck_thresh= 0.95;
155 static double nocheck_decay= 1-1/100;
156 static int nocheck, nocheck_reported;
157
158
159 /*----- doubly linked lists -----*/
160
161 #define ISNODE(T)    T *next, *back;
162 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
163
164 #define NODE(n) ((struct node*)&(n)->head)
165
166 #define LIST_ADDHEAD(l,n)                                               \
167  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
168 #define LIST_ADDTAIL(l,n)                                               \
169  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
170
171 #define LIST_REMHEAD(l)                                                   \
172  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
173 #define LIST_REMTAIL(l)                                                   \
174  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
175 #define LIST_REMOVE(l,n)                        \
176  (list_remove(NODE((n))), (void)(l).count--)
177 #define LIST_INSERT(l,n,pred) \
178  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
179
180
181 /*----- statistics -----*/
182
183 #define RESULT_COUNTS                           \
184   RC(offered)                                   \
185   RC(sent)                                      \
186   RC(unwanted)                                  \
187   RC(accepted)                                  \
188   RC(rejected)                                  \
189   RC(deferred)
190
191 typedef enum {
192 #define RC_INDEX(x) RCI_##x
193   RESULT_COUNTS
194   RCI_max
195 } ResultCountIndex;
196
197 typedef struct {
198   int articles[2 /* checked */][RCI_max];
199 } Counts;
200
201
202 /*----- transmission buffers -----*/
203
204 #define CONNIOVS 128
205
206 typedef enum {
207   xk_Malloc, xk_Const, xk_Artdata;
208 } XmitKind;
209
210 typedef struct {
211   XmitKind kind;
212   union {
213     char *malloc_tofree;
214     ARTHANDLE *sm_art;
215   } info;
216 } XmitDetails;
217
218
219 /*----- core operational data structure types -----*/
220
221 struct Article {
222   int midlen;
223   int checked, sentbody;
224   InputFile *ipf;
225   TOKEN token;
226   off_t offset;
227   int blanklen;
228   char messageid[1];
229 };
230
231 typedef struct {
232   /* This is an instance of struct oop_readable */
233   struct oop_readable readable; /* first */
234   oop_readable_call *readable_callback;
235   void *readable_callback_user;
236
237   int fd;
238   const char *path; /* ptr copy of path_<foo> or feedfile */
239   struct Filemon_Perfile *filemon;
240
241   oop_read *rd;
242   long inprogress; /* no. of articles read but not processed */
243   off_t offset;
244 } InputFile;
245
246 typedef enum {
247   sm_WAITING,
248   sm_NORMAL,
249   sm_FLUSHING,
250   sm_FLUSHFAIL,
251   sm_DROPPING,
252   sm_SEPARATED,
253   sm_FINISHING;
254 } StateMachineState;
255
256 struct Conn {
257   ISNODE(Conn);
258   int fd, max_queue, stream;
259   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
260   LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
261   struct iovec xmit[CONNIOVS];
262   XmitDetails xmitd[CONNIOVS];
263   int xmitu;
264 };
265
266
267 /*----- operational variables -----*/
268
269 static int since_connect_attempt;
270 static int nconns;
271 static LIST(Conn) idle, working, full;
272 static LIST(Article) *queue;
273
274 static char *path_ductlock, *path_duct, *path_ductdefer;
275
276 static StateMachineState sms;
277 static FILE *defer;
278 static InputFile *main_input_file, *old_input_file;
279 static int waiting_periods_sofar;
280
281
282 /*----- function predeclarations -----*/
283
284 static void conn_check_work(Conn *conn);
285
286 static int filemon_init(void);
287 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
288 static void filemon_callback(void);
289
290
291 /*========== utility functions etc. ==========*/
292
293 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
294
295
296 /*========== making new connections ==========*/
297
298 static int connecting_sockets[2]= {-1,-1};
299 static pid_t connecting_child;
300
301 static void report_child_status(const char *what, int status) {
302   if (WIFEXITED(status)) {
303     int es= WEXITSTATUS(status);
304     if (es)
305       warn("%s: child died with error exit status %d",es);
306   } else if (WIFSIGNALED(status)) {
307     int sig= WTERMSIG(status);
308     const char *sigstr= strsignal(sig);
309     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
310     if (sigstr)
311       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
312     else
313       warn("%s: child died due to unknown fatal signal %d%s",
314            what, sig, coredump);
315   } else {
316     warn("%s: child died with unknown wait status %d", status);
317   }
318 }
319
320 static void connect_attempt_discard(void) {
321   if (connecting_sockets[0]) {
322     cancel_fd(loop, connecting_sockets[0], OOP_READ);
323     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
324   }
325   perhaps_close(&connecting_sockets[0]);
326   perhaps_close(&connecting_sockets[1]);
327
328   if (connecting_child) {
329     int status;
330     r= kill(connecting_child, SIGKILL);
331     if (r) sysdie("cannot kill connect child");
332
333     pid_t got= waitpid(connecting_child, &status, WNOHANG);
334     if (got==-1) sysdie("cannot reap connect child");
335
336     if (!(WIFEXITED(status) ||
337           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
338       report_child_status("connect"
339     }
340     connecting_child= 0;
341   }
342 }
343
344 #define PREP_DECL_MSG_CMSG(msg)                 \
345   struct msghdr msg;                            \
346   memset(&msg,0,sizeof(msg));                   \
347   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
348   msg.msg_control= msg##cbuf;                   \
349   msg.msg_controllen= sizeof(msg##cbuf);
350
351 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
352   Conn *conn= 0;
353
354   conn= xcalloc(sizeof(*conn));
355
356   DECL_MSG_CMSG(msg);
357   struct cmsghdr *h= 0;
358   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
359   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
360   if (!h) {
361     int status;
362     pid_t got= waitpid(connecting_child, &status, WNOHANG);
363     if (got != -1) {
364       assert(got==connecting_child);
365       connecting_child= 0;
366       if (WIFEXITED(status) &&
367           (WEXITSTATUS(status) != 0
368            WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
369            WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) {
370         /* child already reported the problem */
371       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
372         warn("connect: connection attempt timed out");
373       } else if (!WIFEXITED(status)) {
374         report_child_status("connect", status);
375         /* that's probably the root cause then */
376       }
377     } else {
378       /* child is still running apparently, report the socket problem */
379       if (rs < 0)
380         syswarn("connect: read from child socket failed");
381       else if (e == OOP_EXCEPTIONN)
382         warn("connect: unexpected exception on child socket");
383       else
384         warn("connect: unexpected EOF on child socket");
385     }
386     goto x;
387   }
388
389 #define CHK(field, val)                                                   \
390   if (h->cmsg_##field != val) {                                           \
391     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
392     goto x;                                                               \
393   }
394   CHK(level, SOL_SOCKET);
395   CHK(type,  SCM_RIGHTS);
396   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
397 #undef CHK
398
399   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
400
401   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
402
403   pid_t got= waitpid(connecting_child, &status, 0);
404   if (got==-1) sysdie("connect: real wait for child");
405   assert(got == connecting_child);
406   connecting_child= 0;
407
408   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
409   int es= WEXITSTATUS(status);
410   switch (es) {
411   case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
412   case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
413   default:
414     die("connect: child gave unexpected exit status %d", es);
415   }
416
417   set nonblocking;
418
419   /* Phew! */
420   LIST_ADDHEAD(idle, conn);
421   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
422   connect_attempt_discard();
423   check_master_queue();
424   return 0;
425
426  x:
427   if (conn) {
428     perhaps_close(&conn->fd);
429     free(conn);
430   }
431   connect_attempt_discard();
432 }
433
434 static void connect_start() {
435   assert(!connecting_sockets[0]);
436   assert(!connecting_sockets[1]);
437   assert(!connecting_child);
438
439   notice("starting connection attempt");
440
441   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
442   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
443
444   connecting_child= fork();
445   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
446
447   if (!connecting_child) {
448     FILE *cn_from, *cn_to;
449     char buf[NNTP_STRLEN+100];
450     int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
451
452     postfork();
453
454     r= close(connecting_sockets[0]);
455     if (r) sysdie("connect: close parent socket in child");
456
457     alarm(connection_setup_timeout);
458     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
459       if (buf[0]) {
460         sanitise_inplace(buf);
461         die("connect: rejected: %s", buf);
462       } else {
463         sysdie("connect: connection attempt failed");
464       }
465     }
466     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
467       sysdie("connect: authentication failed");
468     if (try_stream) {
469       if (fputs("MODE STREAM\r\n", cn_to) ||
470           fflush(cn_to))
471         sysdie("connect: could not send MODE STREAM");
472       buf[sizeof(buf)-1]= 0;
473       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
474         if (ferror(cn_from))
475           sysdie("connect: could not read response to MODE STREAM");
476         else
477           die("connect: connection close in response to MODE STREAM");
478       }
479       int l= strlen(buf);
480       assert(l>=1);
481       if (buf[-1]!='\n') {
482         sanitise_inplace(buf);
483         die("connect: response to MODE STREAM is too long: %.100s...",
484             remote_host, buf);
485       }
486       l--;  if (l>0 && buf[1-]=='\r') l--;
487       buf[l]= 0;
488       char *ep;
489       int rcode= strtoul(buf,&ep,10);
490       if (ep != buf[3]) {
491         sanitise_inplace(buf);
492         die("connect: bad response to MODE STREAM: %.50s", buf);
493       }
494       switch (rcode) {
495       case 203:
496         exitstatus= CONNCHILD_ESTATUS_STREAM;
497         break;
498       case 480:
499       case 500:
500         break;
501       default:
502         sanitise_inplace(buf);
503         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
504         exitstatus= 2;
505         break;
506       }
507     }
508     int fd= fileno(cn_from);
509
510     PREP_DECL_MSG_CMSG(msg);
511     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
512     cmsg->cmsg_level= SOL_SOCKET;
513     cmsg->cmsg_type=  SCM_RIGHTS;
514     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
515     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
516
517     msg.msg_controllen= cmsg->cmsg_len;
518     r= sendmsg(connecting_sockets[1], &msg, 0);
519     if (r) sysdie("sendmsg failed for new connection");
520
521     _exit(exitstatus);
522   }
523
524   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
525   if (r) sysdie("connect: close child socket in parent");
526
527   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
528   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
529   return OOP_CONTINUE;
530
531  x:
532   connect_attempt_discard();
533 }
534
535
536 /*========== overall control of article flow ==========*/
537
538 static void check_master_queue(void) {
539   try reading current feed file;
540
541   if (!queue.count)
542     return;
543
544   Conn *last_assigned=0;
545   for (;;) {
546     if (working.head) {
547       conn_assign_one_article(&working, &last_assigned);
548     } else if (idle.head) {
549       conn_assign_one_article(&idle, &last_assigned);
550     } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
551                !connecting_child && !connect_delay) {
552       connect_delay= reconnect_delay_periods;
553       connect_start();
554     } else {
555       break;
556     }
557   }
558   conn_check_work(last_assigned);
559 }
560
561 static void conn_assign_one_article(LIST(Conn) *connlist,
562                                     Conn **last_assigned) {
563   Conn *conn= connlist->head;
564
565   LIST_REMOVE(*connlist, conn);
566   Article *art= LIST_REMHEAD(queue);
567   LIST_ADDTAIL(conn->queue, art);
568   LIST_ADD(*conn_determine_right_list(conn), conn);
569
570   /* This slightly odd arrangement is so that we call conn_check_work
571    * once after filling the queue for a new connection in
572    * check_master_queue, rather than for each article. */
573   if (conn != *last_assigned && *last_assigned)
574     conn_check_work(*last_assigned);
575   *last_assigned= conn;
576 }
577
578 static int conn_total_queued_articles(Conn *conn) {
579   return conn->sent.count + conn->queue.count;
580 }
581
582 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
583   int inqueue= conn_total_queued_articles(conn);
584   assert(inqueue <= max_queue);
585   if (inqueue == 0) return &idle;
586   if (inqueue == conn->max_queue) return &full;
587   return &working;
588 }
589
590 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
591   check_conn_work(u);
592   return OOP_CONTINUE;
593 }
594
595 static void conn_check_work(Conn *conn)  {
596   void *rp= 0;
597   for (;;) {
598     conn_make_some_xmits(conn);
599     if (!conn->xmitu) {
600       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
601       return;
602     }
603
604     void *rp= conn_write_some_xmits(conn);
605     if (rp==OOP_CONTINUE) {
606       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
607       return;
608     } else if (rp==OOP_HALT) {
609       return;
610     } else if (!rp) {
611       /* transmitted everything */
612     } else {
613       abort();
614     }
615   }
616 }
617
618
619 /*========== article transmission ==========*/
620
621 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
622                   XmitKind kind) { /* caller must then fill in details */
623   struct iovec *v= &conn->xmit[conn->xmitu];
624   XmitDetails *d= &conn->xmitd[conn->xmitu++];
625   v->iov_base= data;
626   v->iov_len= len;
627   d->kind= kind;
628   return d;
629 }
630
631 static void xmit_noalloc(Conn *conn, const char *data, int len) {
632   xmit_core(conn,data,len, xk_Const);
633 }
634 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
635
636 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
637   XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata);
638   d->info.sm_art= ah;
639 }
640
641 static void xmit_free(XmitDetails *d) {
642   switch (d->kind) {
643   case xk_Malloc:  free(d->info.malloc_tofree);   break;
644   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
645   case xk_Const:                                  break;
646   default: abort();
647   }
648 }
649
650 static void *conn_write_some_xmits(Conn *conn) {
651   /* return values:
652    *      0:            nothing more to write, no need to call us again
653    *      OOP_CONTINUE: more to write but fd not writeable
654    *      OOP_HALT:     disaster, have destroyed conn
655    */
656   for (;;) {
657     int count= conn->xmitu;
658     if (!count) return 0;
659
660     if (count > IOV_MAX) count= IOV_MAX;
661     ssize_t rs= writev(conn->fd, conn->xmit, count);
662     if (rs < 0) {
663       if (errno == EAGAIN) return OOP_CONTINUE;
664       connfail(conn, "write failed: %s", strerror(errno));
665       return OOP_HALT;
666     }
667     assert(rs > 0);
668
669     for (done=0; rs && done<xmitu; done++) {
670       struct iovec *vp= &conn->xmit[done];
671       XmitDetails *dp= &conn->xmitd[done];
672       if (rs > vp->iov_len) {
673         rs -= vp->iov_len;
674         xmit_free(dp);
675       } else {
676         vp->iov_base += rs;
677         vp->iov_len -= rs;
678       }
679     }
680     int newu= conn->xmitu - done;
681     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
682     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
683     conn->xmitu= newu;
684   }
685 }
686
687 static void conn_make_some_xmits(Conn *conn) {
688   for (;;) {
689     if (conn->xmitu+5 > CONNIOVS)
690       break;
691
692     Article *art= LIST_REMHEAD(queue);
693     if (!art) break;
694
695     if (art->checked || (conn->stream && nocheck)) {
696       /* actually send it */
697
698       ARTHANDLE *artdata= SMretrieve();
699
700       if (conn->stream) {
701         if (artdata) {
702           XMIT_LITERAL("TAKETHIS ");
703           xmit_noalloc(conn, art->mid, art->midlen);
704           XMIT_LITERAL("\r\n");
705           xmit_artbody(conn, artdata);
706         }
707       } else {
708         /* we got 235 from IHAVE */
709         if (artdata) {
710           xmit_artbody(conn, artdata);
711         } else {
712           XMIT_LITERAL(".\r\n");
713         }
714       }
715
716       art->sent= 1;
717       LIST_ADDTAIL(conn->sent, art);
718
719       counts[art->checked].sent++;
720
721     } else {
722       /* check it */
723
724       if (conn->stream)
725         XMIT_LITERAL("IHAVE ");
726       else
727         XMIT_LITERAL("CHECK ");
728       xmit_noalloc(art->mid, art->midlen);
729       XMIT_LITERAL("\r\n");
730
731       LIST_ADDTAIL(conn->sent, art);
732       counts[art->checked].offered++;
733     }
734   }
735 }
736
737
738 /*========== handling responses from peer ==========*/
739
740 static const oop_rd_style peer_rd_style= {
741   OOP_RD_DELIM_STRIP, '\n',
742   OOP_RD_NUL_FORBID,
743   OOP_RD_SHORTREC_FORBID
744 };
745
746 static Article *article_reply_check(Connection *conn, const char *response,
747                                     int code_indicates_streaming,
748                                     const char *sanitised_response) {
749   Article *art= LIST_REMHEAD(conn->sent);
750
751   if (!art) {
752     connfail(conn,
753              "peer gave unexpected response when no commands outstanding: %s",
754              sanitised_response);
755     return 0;
756   }
757
758   if (code_indicates_streaming) {
759     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
760     if (!conn->stream) {
761       connfail("peer gave streaming response code "
762                " to IHAVE or subsequent body: %s", sanitised_response);
763       return 0;
764     }
765     const char *got_mid= response+4;
766     int got_midlen= strcspn(got_mid, " \n\r");
767     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
768       connfail("peer gave streaming response with syntactically invalid"
769                " messageid: %s", sanitised_response);
770       return 0;
771     }
772     if (got_midlen != art->midlen ||
773         memcmp(got_mid, art->messageid, got_midlen)) {
774       connfail("peer gave streaming response code to wrong article -"
775                " probable synchronisation problem; we offered: %s;"
776                " peer said: %s",
777                art->messageid, sanitised_response);
778       return 0;
779     }
780   } else {
781     if (conn->stream) {
782       connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s",
783                sanitised_response);
784       return 0;
785     }
786   }
787
788   return art;
789 }
790
791 static void update_nocheck(int accepted) {
792   accept_proportion *= accept_decay;
793   accept_proportion += accepted;
794   nocheck= accept_proportion >= nocheck_thresh;
795   if (nocheck && !nocheck_reported) {
796     notice("entering nocheck mode for the first time");
797     nocheck_reported= 1;
798   }
799 }
800
801 static void article_done(Connection *conn, Article *art, int whichcount) {
802   *count++;
803   counts.articles[art->checked][whichcount]++;
804   if (whichcount == RC_accepted) update_nocheck(1);
805   else if (whichcount == RC_unwanted) update_nocheck(0);
806
807   InputFile *ipf= art->ipf;
808   while (art->blanklen) {
809     static const char spaces[]=
810       "                                                                "
811       "                                                                "
812       "                                                                "
813       "                                                                ";
814     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
815     int r= pwrite(ipf->fd, spaces, w, art->offset);
816     if (r==-1) {
817       if (errno==EINTR) continue;
818       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
819              art->messageid, art->blanklen, art->offset, ipf->path);
820     }
821     assert(r>=0 && r<=w);
822     art->blanklen -= w;
823     art->offset += w;
824   }
825
826   ipf->inprogress--;
827   assert(ipf->inprogress >= 0);
828
829   free(art);
830 }
831
832 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev,
833                          const char *errmsg, int errnoval,
834                          const char *data, size_t recsz, void *conn_v) {
835   Conn *conn= conn_v;
836   connfail(conn, "error receiving from peer: %s", errmsg);
837   return OOP_CONTINUE;
838 }
839
840 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
841                         const char *errmsg, int errnoval,
842                         const char *data, size_t recsz, void *conn_v) {
843   Conn *conn= conn_v;
844
845   if (ev == OOP_RD_EOF) {
846     connfail(conn, "unexpected EOF from peer");
847     return OOP_CONTINUE;
848   }
849   assert(ev == OOP_RD_OK);
850
851   char *ep;
852   unsigned long code= strtoul(data, &ep, 10);
853   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
854     char sanibuf[100];
855     const char *p= data;
856     char *q= sanibuf;
857     *q++= '`';
858     for (;;) {
859       if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
860       int c= *p++;
861       if (!c) { *q++= '\''; break; }
862       if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
863       sprintf(q,"\\x%02x",c);
864       q += 4;
865     }
866     connfail(conn, "badly formatted response from peer: %s", sanibuf);
867     return OOP_CONTINUE;
868   }
869
870   if (conn->quitting) {
871     if (code!=205) {
872       connfail(conn, "peer gave failure response to QUIT: %s", sani);
873       return OOP_CONTINUE;
874     }
875     conn close ok;
876     return;
877   }
878
879   Article *art;
880
881 #define GET_ARTICLE                                                         \
882   art= article_reply_check(conn, data, code_streaming, sani);               \
883   if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
884
885 #define ARTICLE_DEALTWITH(streaming,how)                        \
886   code_streaming= (streaming)                                   \
887   GET_ARTICLE;                                                  \
888   article_done(conn, art, RC_##how);  break;
889
890 #define PEERBADMSG(m) connfail(conn, m ": %s", sani);  return OOP_CONTINUE
891
892   int code_streaming= 0;
893
894   switch (code) {
895
896   case 400: PEERBADMSG("peer stopped accepting articles");
897   case 503: PEERBADMSG("peer timed us out");
898   default:  PEERBADMSG("peer sent unexpected message");
899
900   case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */
901   case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */
902
903   case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */
904   case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */
905
906   case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */
907   case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */
908
909   case 238: /* CHECK says send it */
910     code_streaming= 1;
911   case 335: /* IHAVE says send it */
912     GET_ARTICLE;
913     count_checkedwanted++;
914     LIST_ADDTAIL(conn->queue);
915     if (art->checked) {
916       connfail("peer gave %d response to article body: %s",code, sani);
917       return OOP_CONTINUE;
918     }
919     art->checked= 1;
920     break;
921
922   case 431: /* CHECK or TAKETHIS says try later */
923     code_streaming= 1;
924   case 436: /* IHAVE says try later */
925     GET_ARTICLE;
926     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
927         || fflush(defer))
928       sysdie("write to defer file %s",path_ductdefer);
929     article_done(conn, art, RC_deferred);
930     break;
931
932   }
933
934   check_check_work(conn);
935   return OOP_CONTINUE;
936 }
937
938  
939 /*========== monitoring of input files ==========*/
940
941 static void feedfile_eof(InputFile *ipf) {
942   assert(ipf != main_input_file); /* promised by tailing_try_read */
943   assert(ipf == old_input_file);
944   assert(sms == sm_SEPARATED);
945   sms= sm_FINISHING;
946   inputfile_tailing_stop(ipf);
947   inputfile_tailing_start(main_input_file);
948 }
949
950 static InputFile *open_input_file(const char *path) {
951   int fd= open(path, O_RDONLY);
952   if (fd<0) {
953     if (errno==ENOENT) return 0;
954     sysdie("unable to open input file %s", path);
955   }
956
957   InputFile *ipf= xmalloc(sizeof(InputFile));
958   memset(ipf,0,sizeof(*ipf));
959   
960   ipf->readable.on_readable= tailing_on_readable;
961   ipf->readable.on_cancel=   tailing_on_cancel;
962   ipf->readable.try_read=    tailing_try_read;
963
964   ipf->fd= fd;
965   ipf->path= path;
966   
967   return ipf;
968 }
969
970 static void close_input_file(InputFile *ipf) {
971   assert(!ipf->readable_callback); /* must have had ->on_cancel */
972   assert(!ipf->filemon); /* must have had inputfile_tailing_stop */
973   assert(!ipf->rd); /* must have had inputfile_tailing_stop */
974   assert(!ipf->inprogress); /* no dangling pointers pointing here */
975
976   if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
977   free(ipf);
978 }
979
980
981 /*---------- dealing with articles read in the input file ----------*/
982
983 typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
984                                    oop_rd_event ev, const char *errmsg,
985                                    int errnoval,
986                                    const char *data, size_t recsz,
987                                    void *ipf_v) {
988   InputFile *ipf= ipf_v;
989   Article *art;
990   char tokentextbuf[sizeof(TOKEN)*2+3];
991
992   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
993
994   if (data[0] && data[0]!=' ') {
995     char *space= strchr(data,' ');
996     int tokenlen= space-data;
997     int midlen= (int)recsz-tokenlen-1;
998     if (midlen < 0) goto bad_data;
999
1000     if (tokenlen != sizeof(TOKEN)*2+2) goto bad_data;
1001     memcpy(tokentextbuf, data, tokenlen);
1002     tokentextbuf[tokenlen]= 0;
1003     if (!IsToken(tokentextbuf)) goto bad_data;
1004
1005     art= xmalloc(sizeof(*art) - 1 + midlen + 1);
1006     art->offset= ipf->offset;
1007     art->blanklen= recsz;
1008     art->midlen= midlen;
1009     art->checked= art->sentbody= 0;
1010     art->ipf= ipf;  ipf->inprogress++;
1011     art->token= TextToToken(tokentextbuf);
1012     strcpy(art->messageid, space+1);
1013     LIST_ADDTAIL(queue, art);
1014   }
1015   ipf->offset += recsz + 1;
1016
1017   if (sms==sm_NORMAL && ipf->offset >= flush_threshold) {
1018     int r= link(feedfile, duct_path);
1019     if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path);
1020     /* => Hardlinked */
1021
1022     r= unlink(feedfile);
1023     if (r) sysdie("unlink old feedfile link %s", feedfile);
1024     /* => Moved */
1025
1026     spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
1027   }
1028
1029   check_master_queue();
1030 }
1031
1032
1033 /*========== tailing input file ==========*/
1034
1035 static void filemon_start(InputFile *ipf) {
1036   assert(!ipf->filemon);
1037
1038   ipf->filemon= xmalloc(sizeof(*ipf->filemon));
1039   memset(ipf->filemon, 0, sizeof(*ipf->filemon));
1040   filemon_method_startfile(ipf, ipf->filemon);
1041 }
1042
1043 static void filemon_stop(InputFile *ipf) {
1044   if (!ipf->filemon) return;
1045   filemon_method_stopfile(ipf, ipf->filemon);
1046   free(ipf->filemon);
1047   ipf->filemon= 0;
1048 }
1049
1050 static void filemon_callback(InputFile *ipf) {
1051   ipf->readable_callback(ipf->readable_callback_user);
1052 }
1053
1054 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
1055                                      void *user) {
1056   InputFile *ipf= user;
1057   return ipf->readable_callback(ipf->readable_callback_user);
1058 }
1059
1060 static void on_cancel(struct oop_readable *rable) {
1061   InputFile *ipf= (void*)rable;
1062
1063   if (ipf->filemon) filemon_stopfile(ipf);
1064   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1065   ipf->readable_callback= 0;
1066 }
1067
1068 static int tailing_on_readable(struct oop_readable *rable,
1069                                 oop_readable_call *cb, void *user) {
1070   InputFile *ipf= (void*)rable;
1071
1072   tailing_on_cancel(rable);
1073   ipf->readable_callback= cb;
1074   ipf->readable_callback_user= user;
1075   filemon_startfile(ipf);
1076
1077   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1078   return 0;
1079 }
1080
1081 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
1082                                 size_t length) {
1083   InputFile *ipf= (void*)rable;
1084   for (;;) {
1085     ssize_t r= read(ipf->fd, buffer, length);
1086     if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; }
1087     if (r==-1 && errno==EINTR) continue;
1088     return r;
1089   }
1090 }
1091
1092 /*---------- filemon implemented with inotify ----------*/
1093
1094 #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
1095 #define HAVE_FILEMON
1096
1097 #include <linux/inotify.h>
1098
1099 static int filemon_inotify_fd;
1100 static int filemon_inotify_wdmax;
1101 static InputFile **filemon_inotify_wd2ipf;
1102
1103 typedef struct Filemon_Perfile {
1104   int wd;
1105 } Filemon_Inotify_Perfile;
1106
1107 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
1108   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
1109   if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
1110
1111   if (wd >= filemon_inotify_wdmax) {
1112     int newmax= wd+2;
1113     filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
1114                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
1115     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
1116            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
1117     filemon_inotify_wdmax= newmax;
1118   }
1119
1120   assert(!filemon_inotify_wd2ipf[wd]);
1121   filemon_inotify_wd2ipf[wd]= ipf;
1122
1123   pf->wd= wd;
1124 }
1125
1126 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
1127   int wd= pf->wd;
1128   int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
1129   if (r) sysdie("inotify_rm_watch");
1130   filemon_inotify_wd2ipf[wd]= 0;
1131 }
1132
1133 static void *filemon_inotify_readable(oop_source *lp, int fd,
1134                                       oop_event e, void *u) {
1135   struct inotify_event iev;
1136   for (;;) {
1137     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
1138     if (r==-1) {
1139       if (errno==EAGAIN) break;
1140       sysdie("read from inotify master");
1141     } else if (r==sizeof(iev)) {
1142       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
1143     } else {
1144       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
1145     }
1146     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
1147     filemon_callback(ipf);
1148   }
1149   return OOP_CONTINUE;
1150 }
1151
1152 static int filemon_method_init(void) {
1153   filemon_inotify_fd= inotify_init();
1154   if (filemon_inotify_fd<0) {
1155     syswarn("could not initialise inotify: inotify_init failed");
1156     return 0;
1157   }
1158   set nonblock;
1159   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
1160
1161   return 1;
1162 }
1163
1164 #endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
1165
1166 /*---------- filemon dummy implementation ----------*/
1167
1168 #if !defined(HAVE_FILEMON)
1169
1170 typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile;
1171
1172 static int filemon_method_init(void) { return 0; }
1173 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
1174 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
1175
1176 #endif /* !HAVE_FILEMON */
1177
1178 /*---------- interface to start and stop an input file ----------*/
1179
1180 static const oop_rd_style feedfile_rdstyle= {
1181   OOP_RD_DELIM_STRIP, '\n',
1182   OOP_RD_NUL_FORBID,
1183   OOP_RD_SHORTREC_EOF,
1184 };
1185
1186 static void inputfile_tailing_start(InputFile *ipf) {
1187   assert(!ipf->fd);
1188   ipf->readable->on_readable= tailing_on_readable;
1189   ipf->readable->on_cancel=   tailing_on_cancel;
1190   ipf->readable->try_read=    tailing_try_read;
1191   ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
1192   ipf->readable->delete_kill= 0;
1193
1194   ipf->readable_callback= 0;
1195   ipf->readable_callback_user= 0;
1196
1197   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
1198   assert(ipf->fd);
1199
1200   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
1201                      feedfile_got_article,ipf, feedfile_problem,ipf);
1202   if (r) sysdie("unable start reading feedfile %s",ipf->path);
1203 }
1204
1205 static void inputfile_tailing_stop(InputFile *ipf) {
1206   assert(ipf->fd);
1207   oop_rd_delete(ipf->rd);
1208   ipf->rd= 0;
1209   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
1210 }
1211
1212
1213 /*========== interaction with innd ==========*/
1214
1215 /* See official state diagram at top of file.  We implement
1216  * this as follows:
1217  *
1218            ================
1219             WAITING
1220            [Nothing/Noduct]
1221             poll for F
1222            ================
1223                 |
1224                 |     TIMEOUT
1225                 |`--------------------------.
1226                 |                           | install defer as backlog
1227                 | OPEN F SUCCEEDS           | exit
1228      ,--------->|                           V
1229      |          V                         =========
1230      |     ========                        (ESRCH)
1231      |      NORMAL                        [Dropped]
1232      |     [Normal]                       =========
1233      |      read F
1234      |     ========
1235      |          |
1236      |          | F IS SO BIG WE SHOULD FLUSH
1237      ^          | hardlink F to D
1238      |     [Hardlinked]
1239      |          | unlink F
1240      |          | our handle onto F is now onto D
1241      |     [Moved]
1242      |          |
1243      |          |<---------------------------------------------------.
1244      |          |                                                    |
1245      |          | spawn inndcomm flush                               |
1246      |          V                                                    |
1247      |     ==========                                                |
1248      |      FLUSHING                                                 |
1249      |     [Flushing]                                                |
1250      |      read D                                                   |
1251      |     ==========                                                |
1252      |          |                                                    |
1253      |          |   INNDCOMM FLUSH FAILS                             ^
1254      |          |`----------------------->--------.                  |
1255      |          |                                 |                  |
1256      |          |   NO SUCH SITE                  V                  |
1257      ^          |`----------------.            =========             |
1258      |          |                 |            FLUSHFAIL             |
1259      |          |                 V            [Moved]               |
1260      |          |            ==========        read D                |
1261      |          |             DROPPING         =========             |
1262      |          |            [Dropping]           |                  |
1263      |          |             read D              | TIME TO RETRY    |
1264      |          |            ==========           `------------------'
1265      |          | FLUSH OK        |
1266      |          | open F          | AT EOF OF D AND ALL PROCESSED
1267      |          V                 | install defer as backlog
1268      |     ===========            | unlink D
1269      |      SEPARATED             | exit
1270      |     [Separated]            V
1271      |      read D            ==========
1272      |     ===========         (ESRCH)
1273      |          |             [Droppped]
1274      |          |             ==========
1275      |          V
1276      |          | AT EOF OF D
1277      ^          |
1278      |     ===========
1279      |      FINISHING
1280      |     [Finishing]
1281      |      read F
1282      |      write D
1283      |     ===========
1284      |          |
1285      |          | ALL D PROCESSED
1286      |          | install defer as backlog
1287      |          | start new defer
1288      ^          V unlink D
1289      |          | close D
1290      |          |
1291      `----------'
1292
1293  *
1294  */
1295
1296 static void open_defer(void) {
1297   struct stat stab;
1298
1299   assert(!defer);
1300   defer= fopen(path_ductdefer, "a+");
1301   if (!defer) sysdie("could not open defer file %s", path_ductdefer);
1302
1303   /* truncate away any half-written records */
1304
1305   r= fstat(fileno(defer), &stab);
1306   if (r) sysdie("could not stat newly opened defer file %s", path_ductdefer);
1307
1308   if (stab.st_size > LONG_MAX)
1309     die("defer file %s size is far too large", path_ductdefer);
1310
1311   if (!stab.st_size)
1312     return;
1313
1314   long orgsize= stab.st_size;
1315   long truncto= stab.st_size;
1316   for (;;) {
1317     if (!truncto) break; /* was only (if anything) one half-truncated record */
1318     if (fseek(defer, truncto-1, SEEK_SET) < 0)
1319       sysdie("seek in defer file %s while truncating partial", path_ductdefer);
1320
1321     r= getc(defer);
1322     if (r==EOF) {
1323       if (ferror(defer))
1324         sysdie("failed read from defer file %s", path_ductdefer);
1325       else
1326         die("defer file %s shrank while we were checking it!", path_ductdefer);
1327     }
1328     if (r=='\n') break;
1329     truncto--;
1330   }
1331
1332   if (stab.st_size != truncto) {
1333     warn("truncating half-record at end of defer file %s -"
1334          " shrinking by %ld bytes from %ld to %ld",
1335          path_ductdefer, orgsize - truncto, orgsize, truncto);
1336
1337     if (fflush(defer)) sysdie("could not flush defer file %s", path_ductdefer);
1338     if (ftruncate(fileno(defer), truncto))
1339       sysdie("could not truncate defer file %s", path_ductdefer);
1340
1341   } else {
1342     info("continuing existing defer file %s (%ld bytes)",
1343          path_ductdefer, orgsize);
1344   }
1345   if (fseek(defer, truncto, SEEK_SET))
1346     sysdie("could not seek to new end of defer file %s", path_ductdefer);
1347 }
1348
1349 static void statemc_init(void) {
1350   struct stat stab;
1351
1352   path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
1353   path_duct=      xasprintf("%s_duct",       feedfile);
1354   path_ductdefer= xasprintf("%s_duct.defer", feedfile);
1355
1356   if (lstat(path_ductdefer, &stab)) {
1357     if (errno!=ENOENT) sysdie("could not check defer file %s", path_defer);
1358   } else {
1359     if (!S_ISREG(stab.st_mode))
1360       die("defer file %s not a plain file (mode 0%lo)",
1361           path_defer, (unsigned long)stab.st_mode);
1362     switch (stab.st_nlink==1) {
1363     case 1: /* ok */ break;
1364     case 2:
1365       if (unlink(path_defer))
1366         sysdie("could not unlink stale defer file link %s (presumably"
1367                " hardlink to backlog file)", path_defer);
1368       break;
1369     default:
1370       die("defer file %s has unexpected link count %d",
1371           path_defer, stab.st_nlink);
1372     }
1373   }
1374   open_defer();
1375
1376   int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
1377   if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
1378
1379   struct flock fl;
1380   memset(&fl,0,sizeof(fl));
1381   fl.l_type= F_WRLCK;
1382   fl.l_whence= SEEK_SET;
1383   r= fcntl(lockfd, F_SETLK, &fl);
1384   if (r==-1) {
1385     if (errno==EACCES || errno==EAGAIN)
1386       die("another duct holds the lockfile");
1387     sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
1388   }
1389
1390   InputFile *file_d= open_input_file(path_duct);
1391
1392   if (file_d) {
1393     struct stat stab_f, stab_d;
1394
1395     r= stat(feedfile, &stab_f);
1396     if (r) {
1397       if (errno!=ENOENT) sysdie("check feed file %s", feedfile);
1398       /* D exists, F ENOENT => Moved */
1399       goto found_moved;
1400     }
1401
1402     /* F and D both exist */
1403
1404     r= fstat(file_d->fd, &stab_d);
1405     if (r) sysdie("check duct file %s", ductfile);
1406
1407     if (stab_d.st_ino == stab_f.st_ino &&
1408         stab_d.st_dev == stab_f.st_dev) {
1409       /* F==D => Hardlinked*/
1410       r= unlink(path_duct);
1411       if (r) sysdie("unlink feed file %s during startup", feedfile);
1412     found_moved:
1413       /* => Moved */
1414       startup_set_input_file(file_d);
1415       spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
1416     } else {
1417       /* F!=D => Separated */
1418       sms= sm_SEPARATED;
1419       startup_set_input_file(file_d);
1420     }
1421   } else { /*!file_d*/
1422     sms= sm_WAITING;
1423     statemc_waiting_poll();
1424   }
1425 }
1426
1427 static void statemc_poll(void) {
1428   if (sms == sm_WAITING) statemc_waiting_poll();
1429   if (sms == sm_FINISHING && !old_input_file->inprogress) statemc_finishdone();
1430 }
1431
1432 static void statemc_waiting_poll(void) {
1433   InputFile *file_f= open_input_file(feedfile);
1434   if (!file_f) {
1435     if (waiting_periods_sofar++ > waiting_timeout_periods)
1436       die("timed out waiting for innd to create feed file %s", feedfile);
1437     return;
1438   }
1439   startup_set_input_file(file_d);
1440   sms= sm_NORMAL;
1441 }
1442
1443 static void startup_set_input_file(InputFile *f) {
1444   assert(!main_input_file);
1445   main_input_file= f;
1446   inputfile_tailing_start(f);
1447 }
1448
1449 static void statmc_finishdone(void) {
1450   time_t now;
1451   struct stat stab;
1452
1453   assert(sms == sm_FINISHING);
1454
1455   r= fstat(fileno(defer), &stab);
1456   if (r) sysdie("check defer file %s", path_defer);
1457
1458   if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
1459   defer= 0;
1460
1461   now= time(0);
1462   if (now==-1) sysdie("could not get current time for backlog filename");
1463
1464   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
1465                            (unsigned long)now,
1466                            (unsigned long)stab.st_ino);
1467   if (link(path_defer, path_backlog))
1468     sysdie("could not install defer file %s as backlog file %s",
1469            path_defer, backlog);
1470   if (unlink(path_defer))
1471     sysdie("could not unlink old defer link %s to backlog file %s",
1472            path_defer, backlog);
1473   open_defer();
1474
1475   close_input_file(old_input_file);
1476   old_input_file= 0;
1477
1478   if (unlink(path_duct))
1479     sysdie("could not unlink old duct file %s", path_duct);
1480
1481   sms= sm_NORMAL;
1482 }
1483
1484 /*========== flushing the feed ==========*/
1485
1486  
1487
1488 /*========== main program ==========*/
1489
1490 static void postfork_inputfile(InputFile *ipf) {
1491   if (!ipf) return;
1492   assert(ipf->fd >= 0);
1493   close(ipf->fd);
1494   ipf->fd= -1;
1495 }
1496
1497 static void postfork_conns(Connection *conn) {
1498   while (conn) {
1499     close(conn->fd);
1500     conn= conn->next;
1501   }
1502 }
1503
1504 static void postfork_stdio(FILE *f) {
1505   /* we have no stdio streams that are buffered long-term */
1506   if (f) fclose(f);
1507 }
1508
1509 static void postfork(const char *what) {
1510   if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
1511     sysdie("%s child: failed to reset SIGPIPE");
1512
1513   postfork_inputfile(main_input_file);
1514   postfork_inputfile(old_input_file);
1515   postfork_conns(idle.head);
1516   postfork_conns(working.head);
1517   postfork_conns(full.head);
1518   postfork_stdio(defer);
1519 }
1520
1521
1522 #define EVERY(what, interval, body)                                          \
1523   static const struct timeval what##_timeout = { 5, 0 };                     \
1524   static void what##_schedule(void);                                         \
1525   static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
1526     { body }                                                                 \
1527     what##_schedule();                                                       \
1528   }                                                                          \
1529   static void what##_schedule(void) {                                        \
1530     loop->on_time(loop, what##_timeout, what##_timedout, 0);                 \
1531   }
1532
1533 EVERY(filepoll, {5,0}, { check_master_queue(); })
1534
1535 EVERY(period, {PERIOD_SECONDS,0}, {
1536   if (connect_delay) connect_delay--;
1537   statemc_poll();
1538   check_master_queue();
1539 });
1540
1541 main {
1542   ignore sigpipe;
1543   if (!filemon_init())
1544     filepoll_schedule();
1545   period_schedule();
1546 };