chiark / gitweb /
ba4c6bd1330fea3b3797ed3d741862acde3005e2
[inn-innduct.git] / backends / innduct.c
1 /*
2  * Newsfeeds file entries should look like this:
3  *     host.name.of.site[/exclude,exclude,...]\
4  *             :pattern,pattern...[/distribution,distribution...]\
5  *             :Tf,Wnm
6  *             :
7  * or
8  *     sitename[/exclude,exclude,...]\
9  *             :pattern,pattern...[/distribution,distribution...]\
10  *             :Tf,Wnm
11  *             :host.name.of.site
12  *
13  * Four files full of
14  *    token messageid
15  * or might be blanked out
16  *    <spc><spc><spc><spc>....
17  *
18  *   site.name_duct.lock       lock preventing multiple ducts
19  *                                holder of this lock is "duct"
20  * F site.name                 main feed file
21  *                                opened/created, then written, by innd
22  *                                read by duct
23  *                                unlinked by duct
24  *                                tokens blanked out by duct when processed
25  * D site.name_duct            temporary feed file during flush (or crash)
26  *                                hardlink created by duct
27  *                                unlinked by duct
28  *   site.name_duct.defer      431'd articles, still being written,
29  *                                created, written, used by duct
30  *   site.name_backlog.lock    lock taken out by innxmit wrapper
31  *                                holder and its child are "xmit"
32  *   site.name_backlog_<date>.<inum>
33  *                             431'd articles, ready for innxmit
34  *                                created (link/mv) by duct
35  *                                read by xmit
36  *                                unlinked by xmit
37  *   site.name_backlog_<letters> eg
38  *   site.name_backlog_manual
39  *                             anything the sysadmin likes (eg, feed files
40  *                             from old feeds to be merged into this one)
41  *                                created (link/mv) by admin
42  *                                read by xmit
43  *                                unlinked by xmit
44
45
46    OVERALL STATES:
47
48                                                                 START
49                                                                   |
50                                                              check D, F
51                                                                   |
52                           <--------------------------------------'|
53         Nothing                            F, D both ENOENT       |
54          F: ENOENT                                                |
55          D: ENOENT                                                |
56          duct: not not reading anything                           |
57            |                                                      |
58            |`---------------------.                               |
59            |                      | duct times out waiting for F  |
60            V  innd creates F      | duct exits                    |
61            |                      V                               |
62         Noduct                    GO TO Dropped                   |
63          F: innd writing                                          |
64          D: ENOENT                                                |
65          duct: not running or not reading anything                |
66            |                                                      |
67            |                                                      |
68      ,-->--+                   <---------------------------------'|
69      |     |  duct opens F                         F exists       |
70      |     |                                       D ENOENT       |
71      |     V                                                      |
72      |  Normal                                                    |
73      |   F: innd writing, duct reading                            |
74      |   D: ENOENT                                                |
75      |     |                                                      |
76      |     |  duct decides time to flush                          |
77      |     |  duct makes hardlink                                 |
78      |     |                                                      |
79      |     V                            <------------------------'|
80      |  Hardlinked                                  F==D          |
81      |   F == D: innd writing, duct reading         both exist    |
82      ^     |                                                      |
83      |     |  duct unlinks F                                      |
84      |     V                                                      |
85      |  Moved                               <----+------------<--'|
86      |   F: ENOENT                               |  F ENOENT      |
87      |   D: innd writing, duct reading           |  D exists      |
88      |     |                                     |                |
89      |     |  duct requests flush of feed        |                |
90      |     |   (others can too, harmlessly)      |                |
91      |     V                                     |                |
92      |  Flushing                                 |                |
93      |   F: ENOENT                               |                |
94      |   D: innd flushing, duct reading          |                |
95      |     |                                     |                |
96      |     |   inndcomm flush fails              |                |
97      |     |`-------------------------->---------'                |
98      |     |                                                      |
99      |     |   inndcomm reports no such site                      |
100      |     |`---------------------------------------------------- | -.
101      |     |                                                      |  |
102      |     |  innd finishes writing D, creates F                  |  |
103      |     |  inndcomm reports flush successful                   |  |
104      |     |                                                      |  |
105      |     V                                                      |  |
106      |  Separated                                <----------------'  |
107      |   F: innd writing                            F!=D             /
108      |   D: duct reading                             both exist     /
109      |     |                                                       /
110      |     |  duct gets to the end of D                           /
111      |     |  duct opens F too                                   /
112      |     V                                                    /
113      |  Finishing                                              /
114      |   F: innd writing, duct reading                        |
115      |   D: duct finishing                                    V
116      |     |                                            Dropping
117      |     |  duct finishes processing D                 F: ENOENT
118      |     V  duct unlinks D                             D: duct reading
119      |     |                                                  |
120      `--<--'                                                  | duct finishes
121                                                               |  processing D
122                                                               | duct unlinks D
123                                                               | duct exits
124                                                               V
125                                                         Dropped
126                                                          F: ENOENT
127                                                          D: ENOENT
128                                                          duct not running
129
130    "duct reading" means innduct is reading the file but also
131    overwriting processed tokens.
132
133  *
134  *
135  */
136
137 #define PERIOD_SECONDS 60
138
139 static char *feedfile;
140 static int max_connections, max_queue_per_conn;
141 static int connection_setup_timeout, port, try_stream;
142 static const char *remote_host;
143
144 static double accept_proportion;
145 static double nocheck_thresh= 0.95;
146 static double nocheck_decay= 1-1/100;
147 static int nocheck, nocheck_reported;
148
149 #define ISNODE(T)    T *next, *back;
150 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
151
152 #define NODE(n) ((struct node*)&(n)->head)
153
154 #define LIST_ADDHEAD(l,n)                                               \
155  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
156 #define LIST_ADDTAIL(l,n)                                               \
157  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
158
159 #define LIST_REMHEAD(l)                                                   \
160  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
161 #define LIST_REMTAIL(l)                                                   \
162  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
163 #define LIST_REMOVE(l,n)                        \
164  (list_remove(NODE((n))), (void)(l).count--)
165 #define LIST_INSERT(l,n,pred) \
166  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
167
168
169 #define RESULT_COUNTS                           \
170   RC(offered)                                   \
171   RC(sent)                                      \
172   RC(unwanted)                                  \
173   RC(accepted)                                  \
174   RC(rejected)                                  \
175   RC(deferred)
176
177 typedef enum {
178 #define RC_INDEX(x) RCI_##x
179   RESULT_COUNTS
180   RCI_max
181 } ResultCountIndex;
182
183 typedef struct {
184   int articles[2 /* checked */][RCI_max];
185 } Counts;
186
187 struct Article {
188   int midlen;
189   int checked, sentbody;
190   InputFile *ipf;
191   TOKEN token;
192   off_t offset;
193   int blanklen;
194   char messageid[1];
195 };
196
197 #define CONNIOVS 128
198
199 #define CN "<%d> "
200
201 typedef struct Conn Conn;
202
203 typedef enum {
204   xk_Malloc, xk_Const, xk_Artdata;
205 } XmitKind;
206
207 typedef struct {
208   XmitKind kind;
209   union {
210     char *malloc_tofree;
211     ARTHANDLE *sm_art;
212   } info;
213 } XmitDetails;
214
215 struct Conn {
216   ISNODE(Conn);
217   int fd, max_queue, stream;
218   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
219   LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
220   struct iovec xmit[CONNIOVS];
221   XmitDetails xmitd[CONNIOVS];
222   int xmitu;
223 };
224
225 static FILE *defer;
226
227 static int filemon_init(void);
228 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
229 static void filemon_callback(void);
230
231
232 #define CHILD_ESTATUS_STREAM   4
233 #define CHILD_ESTATUS_NOSTREAM 5
234
235 static int since_connect_attempt;
236 static int nconns;
237 static LIST(Conn) idle, working, full;
238
239 static LIST(Article) *queue;
240
241 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
242
243 /*========== making new connections ==========*/
244
245 static int connecting_sockets[2]= {-1,-1};
246 static pid_t connecting_child;
247
248 static void report_child_status(const char *what, int status) {
249   if (WIFEXITED(status)) {
250     int es= WEXITSTATUS(status);
251     if (es)
252       warn("%s: child died with error exit status %d",es);
253   } else if (WIFSIGNALED(status)) {
254     int sig= WTERMSIG(status);
255     const char *sigstr= strsignal(sig);
256     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
257     if (sigstr)
258       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
259     else
260       warn("%s: child died due to unknown fatal signal %d%s",
261            what, sig, coredump);
262   } else {
263     warn("%s: child died with unknown wait status %d", status);
264   }
265 }
266
267 static void connect_attempt_discard(void) {
268   if (connecting_sockets[0]) {
269     cancel_fd(loop, connecting_sockets[0], OOP_READ);
270     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
271   }
272   perhaps_close(&connecting_sockets[0]);
273   perhaps_close(&connecting_sockets[1]);
274
275   if (connecting_child) {
276     int status;
277     r= kill(connecting_child, SIGKILL);
278     if (r) sysdie("cannot kill connect child");
279
280     pid_t got= waitpid(connecting_child, &status, WNOHANG);
281     if (got==-1) sysdie("cannot reap connect child");
282
283     if (!(WIFEXITED(status) ||
284           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
285       report_child_status("connect"
286     }
287     connecting_child= 0;
288   }
289 }
290
291 #define PREP_DECL_MSG_CMSG(msg)                 \
292   struct msghdr msg;                            \
293   memset(&msg,0,sizeof(msg));                   \
294   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
295   msg.msg_control= msg##cbuf;                   \
296   msg.msg_controllen= sizeof(msg##cbuf);
297
298 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
299   Conn *conn= 0;
300
301   conn= xcalloc(sizeof(*conn));
302
303   DECL_MSG_CMSG(msg);
304   struct cmsghdr *h= 0;
305   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
306   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
307   if (!h) {
308     int status;
309     pid_t got= waitpid(connecting_child, &status, WNOHANG);
310     if (got != -1) {
311       assert(got==connecting_child);
312       connecting_child= 0;
313       if (WIFEXITED(status) &&
314           (WEXITSTATUS(status) != 0
315            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
316            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
317         /* child already reported the problem */
318       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
319         warn("connect: connection attempt timed out");
320       } else if (!WIFEXITED(status)) {
321         report_child_status("connect", status);
322         /* that's probably the root cause then */
323       }
324     } else {
325       /* child is still running apparently, report the socket problem */
326       if (rs < 0)
327         syswarn("connect: read from child socket failed");
328       else if (e == OOP_EXCEPTIONN)
329         warn("connect: unexpected exception on child socket");
330       else
331         warn("connect: unexpected EOF on child socket");
332     }
333     goto x;
334   }
335
336 #define CHK(field, val)                                                   \
337   if (h->cmsg_##field != val) {                                           \
338     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
339     goto x;                                                               \
340   }
341   CHK(level, SOL_SOCKET);
342   CHK(type,  SCM_RIGHTS);
343   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
344 #undef CHK
345
346   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
347
348   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
349
350   pid_t got= waitpid(connecting_child, &status, 0);
351   if (got==-1) sysdie("connect: real wait for child");
352   assert(got == connecting_child);
353   connecting_child= 0;
354
355   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
356   int es= WEXITSTATUS(status);
357   switch (es) {
358   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
359   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
360   default:
361     die("connect: child gave unexpected exit status %d", es);
362   }
363
364   set nonblocking;
365
366   /* Phew! */
367   LIST_ADDHEAD(idle, conn);
368   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
369   connect_attempt_discard();
370   check_master_queue();
371   return 0;
372
373  x:
374   if (conn) {
375     perhaps_close(&conn->fd);
376     free(conn);
377   }
378   connect_attempt_discard();
379 }
380
381 static void connect_start() {
382   assert(!connecting_sockets[0]);
383   assert(!connecting_sockets[1]);
384   assert(!connecting_child);
385
386   notice("starting connection attempt");
387
388   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
389   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
390
391   connecting_child= fork();
392   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
393
394   if (!connecting_child) {
395     FILE *cn_from, *cn_to;
396     char buf[NNTP_STRLEN+100];
397     int exitstatus= CHILD_ESTATUS_NOSTREAM;
398
399     put sigpipe back;
400     close unwanted fds;
401
402     r= close(connecting_sockets[0]);
403     if (r) sysdie("connect: close parent socket in child");
404
405     alarm(connection_setup_timeout);
406     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
407       if (buf[0]) {
408         sanitise_inplace(buf);
409         die("connect: rejected: %s", buf);
410       } else {
411         sysdie("connect: connection attempt failed");
412       }
413     }
414     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
415       sysdie("connect: authentication failed");
416     if (try_stream) {
417       if (fputs("MODE STREAM\r\n", cn_to) ||
418           fflush(cn_to))
419         sysdie("connect: could not send MODE STREAM");
420       buf[sizeof(buf)-1]= 0;
421       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
422         if (ferror(cn_from))
423           sysdie("connect: could not read response to MODE STREAM");
424         else
425           die("connect: connection close in response to MODE STREAM");
426       }
427       int l= strlen(buf);
428       assert(l>=1);
429       if (buf[-1]!='\n') {
430         sanitise_inplace(buf);
431         die("connect: response to MODE STREAM is too long: %.100s...",
432             remote_host, buf);
433       }
434       l--;  if (l>0 && buf[1-]=='\r') l--;
435       buf[l]= 0;
436       char *ep;
437       int rcode= strtoul(buf,&ep,10);
438       if (ep != buf[3]) {
439         sanitise_inplace(buf);
440         die("connect: bad response to MODE STREAM: %.50s", buf);
441       }
442       switch (rcode) {
443       case 203:
444         exitstatus= CHILD_ESTATUS_STREAM;
445         break;
446       case 480:
447       case 500:
448         break;
449       default:
450         sanitise_inplace(buf);
451         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
452         exitstatus= 2;
453         break;
454       }
455     }
456     int fd= fileno(cn_from);
457
458     PREP_DECL_MSG_CMSG(msg);
459     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
460     cmsg->cmsg_level= SOL_SOCKET;
461     cmsg->cmsg_type=  SCM_RIGHTS;
462     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
463     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
464
465     msg.msg_controllen= cmsg->cmsg_len;
466     r= sendmsg(connecting_sockets[1], &msg, 0);
467     if (r) sysdie("sendmsg failed for new connection");
468
469     _exit(exitstatus);
470   }
471
472   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
473   if (r) sysdie("connect: close child socket in parent");
474
475   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
476   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
477   return OOP_CONTINUE;
478
479  x:
480   connect_attempt_discard();
481 }
482
483 /*========== overall control of article flow ==========*/
484
485 static void conn_check_work(Conn *conn);
486
487 static void check_master_queue(void) {
488   try reading current feed file;
489
490   if (!queue.count)
491     return;
492
493   Conn *last_assigned=0;
494   for (;;) {
495     if (working.head) {
496       conn_assign_one_article(&working, &last_assigned);
497     } else if (idle.head) {
498       conn_assign_one_article(&idle, &last_assigned);
499     } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
500                !connecting_child && !connect_delay) {
501       connect_delay= reconnect_delay_periods;
502       connect_start();
503     } else {
504       break;
505     }
506   }
507   conn_check_work(last_assigned);
508 }
509
510 static void conn_assign_one_article(LIST(Conn) *connlist,
511                                     Conn **last_assigned) {
512   Conn *conn= connlist->head;
513
514   LIST_REMOVE(*connlist, conn);
515   Article *art= LIST_REMHEAD(queue);
516   LIST_ADDTAIL(conn->queue, art);
517   LIST_ADD(*conn_determine_right_list(conn), conn);
518
519   /* This slightly odd arrangement is so that we call conn_check_work
520    * once after filling the queue for a new connection in
521    * check_master_queue, rather than for each article. */
522   if (conn != *last_assigned && *last_assigned)
523     conn_check_work(*last_assigned);
524   *last_assigned= conn;
525 }
526
527 static int conn_total_queued_articles(Conn *conn) {
528   return conn->sent.count + conn->queue.count;
529 }
530
531 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
532   int inqueue= conn_total_queued_articles(conn);
533   assert(inqueue <= max_queue);
534   if (inqueue == 0) return &idle;
535   if (inqueue == conn->max_queue) return &full;
536   return &working;
537 }
538
539 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
540   check_conn_work(u);
541   return OOP_CONTINUE;
542 }
543
544 static void conn_check_work(Conn *conn)  {
545   void *rp= 0;
546   for (;;) {
547     conn_make_some_xmits(conn);
548     if (!conn->xmitu) {
549       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
550       return;
551     }
552
553     void *rp= conn_write_some_xmits(conn);
554     if (rp==OOP_CONTINUE) {
555       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
556       return;
557     } else if (rp==OOP_HALT) {
558       return;
559     } else if (!rp) {
560       /* transmitted everything */
561     } else {
562       abort();
563     }
564   }
565 }
566
567 /*========== article transmission ==========*/
568
569
570 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
571                   XmitKind kind) { /* caller must then fill in details */
572   struct iovec *v= &conn->xmit[conn->xmitu];
573   XmitDetails *d= &conn->xmitd[conn->xmitu++];
574   v->iov_base= data;
575   v->iov_len= len;
576   d->kind= kind;
577   return d;
578 }
579
580 static void xmit_noalloc(Conn *conn, const char *data, int len) {
581   xmit_core(conn,data,len, xk_Const);
582 }
583 #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
584
585 static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
586   XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata);
587   d->info.sm_art= ah;
588 }
589
590 static void xmit_free(XmitDetails *d) {
591   switch (d->kind) {
592   case xk_Malloc:  free(d->info.malloc_tofree);   break;
593   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
594   case xk_Const:                                  break;
595   default: abort();
596   }
597 }
598
599 static void *conn_write_some_xmits(Conn *conn) {
600   /* return values:
601    *      0:            nothing more to write, no need to call us again
602    *      OOP_CONTINUE: more to write but fd not writeable
603    *      OOP_HALT:     disaster, have destroyed conn
604    */
605   for (;;) {
606     int count= conn->xmitu;
607     if (!count) return 0;
608
609     if (count > IOV_MAX) count= IOV_MAX;
610     ssize_t rs= writev(conn->fd, conn->xmit, count);
611     if (rs < 0) {
612       if (errno == EAGAIN) return OOP_CONTINUE;
613       syswarn(CN "write failed", conn->fd);
614       conn_failed(conn);
615       return OOP_HALT;
616     }
617     assert(rs > 0);
618
619     for (done=0; rs && done<xmitu; done++) {
620       struct iovec *vp= &conn->xmit[done];
621       XmitDetails *dp= &conn->xmitd[done];
622       if (rs > vp->iov_len) {
623         rs -= vp->iov_len;
624         xmit_free(dp);
625       } else {
626         vp->iov_base += rs;
627         vp->iov_len -= rs;
628       }
629     }
630     int newu= conn->xmitu - done;
631     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
632     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
633     conn->xmitu= newu;
634   }
635 }
636
637 static void conn_make_some_xmits(Conn *conn) {
638   for (;;) {
639     if (conn->xmitu+5 > CONNIOVS)
640       break;
641
642     Article *art= LIST_REMHEAD(queue);
643     if (!art) break;
644
645     if (art->checked || (conn->stream && nocheck)) {
646       /* actually send it */
647
648       ARTHANDLE *artdata= SMretrieve();
649
650       if (conn->stream) {
651         if (artdata) {
652           XMIT_LITERAL("TAKETHIS ");
653           xmit_noalloc(conn, art->mid, art->midlen);
654           XMIT_LITERAL("\r\n");
655           xmit_artbody(conn, artdata);
656         }
657       } else {
658         /* we got 235 from IHAVE */
659         if (artdata) {
660           xmit_artbody(conn, artdata);
661         } else {
662           XMIT_LITERAL(".\r\n");
663         }
664       }
665
666       art->sent= 1;
667       LIST_ADDTAIL(conn->sent, art);
668
669       counts[art->checked].sent++;
670
671     } else {
672       /* check it */
673
674       if (conn->stream)
675         XMIT_LITERAL("IHAVE ");
676       else
677         XMIT_LITERAL("CHECK ");
678       xmit_noalloc(art->mid, art->midlen);
679       XMIT_LITERAL("\r\n");
680
681       LIST_ADDTAIL(conn->sent, art);
682       counts[art->checked].offered++;
683     }
684   }
685 }
686
687 /*========== responses from peer ==========*/
688
689 static const oop_rd_style peer_rd_style= {
690   OOP_RD_DELIM_STRIP, '\n',
691   OOP_RD_NUL_FORBID,
692   OOP_RD_SHORTREC_FORBID
693 };
694
695 static Article *article_reply_check(Connection *conn, const char *response,
696                                     int code_indicates_streaming,
697                                     const char *sanitised_response) {
698   Article *art= LIST_REMHEAD(conn->sent);
699
700   if (!art) {
701     warn("peer gave unexpected response when no commands outstanding: %s",
702          sanitised_response);
703     goto failed;
704   }
705
706   if (code_indicates_streaming) {
707     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
708     if (!conn->stream) {
709       warn("peer gave streaming response code "
710            " to IHAVE or subsequent body: %s", sanitised_response);
711       goto failed;
712     }
713     const char *got_mid= response+4;
714     int got_midlen= strcspn(got_mid, " \n\r");
715     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
716       warn("peer gave streaming response with syntactically invalid"
717            " messageid: %s", sanitised_response);
718       goto failed;
719     }
720     if (got_midlen != art->midlen ||
721         memcmp(got_mid, art->messageid, got_midlen)) {
722       peer("peer gave streaming response code to wrong article -"
723            " probable synchronisation problem; we offered: %s; peer said: %s",
724            art->messageid, sanitised_response);
725       goto failed;
726     }
727   } else {
728     if (conn->stream) {
729       warn("peer gave non-streaming response code to CHECK/TAKETHIS: %s",
730            sanitised_response);
731       goto failed;
732     }
733   }
734
735   return art;
736
737  failed:
738   conn_failed(conn);
739   return 0;
740 }
741
742 static void update_nocheck(int accepted) {
743   accept_proportion *= accept_decay;
744   accept_proportion += accepted;
745   nocheck= accept_proportion >= nocheck_thresh;
746   if (nocheck && !nocheck_reported) {
747     notice("entering nocheck mode for the first time");
748     nocheck_reported= 1;
749   }
750 }
751
752 static void article_done(Connection *conn, Article *art, int whichcount) {
753   *count++;
754   counts.articles[art->checked][whichcount]++;
755   if (whichcount == RC_accepted) update_nocheck(1);
756   else if (whichcount == RC_unwanted) update_nocheck(0);
757
758   InputFile *ipf= art->ipf;
759   while (art->blanklen) {
760     static const char spaces[]=
761       "                                                                "
762       "                                                                "
763       "                                                                "
764       "                                                                ";
765     int w= art->blanklen;  if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
766     int r= pwrite(ipf->fd, spaces, w, art->offset);
767     if (r==-1) {
768       if (errno==EINTR) continue;
769       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
770              art->messageid, art->blanklen, art->offset, ipf->path);
771     }
772     assert(r>=0 && r<=w);
773     art->blanklen -= w;
774     art->offset += w;
775   }
776
777   ipf->inprogress--;
778   assert(ipf->inprogress >= 0);
779
780   free(art);
781 }
782
783 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
784                         const char *errmsg, int errnoval,
785                         const char *data, size_t recsz, void *conn_v) {
786   Conn *conn= conn_v;
787
788   if (ev == OOP_RD_EOF) {
789     warn("unexpected EOF from peer");
790     conn_failed(conn);
791     return;
792   }
793   assert(ev == OOP_RD_OK);
794
795   char *ep;
796   unsigned long code= strtoul(data, &ep, 10);
797   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
798     char sanibuf[100];
799     const char *p= data;
800     char *q= sanibuf;
801     *q++= '`';
802     for (;;) {
803       if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
804       int c= *p++;
805       if (!c) { *q++= '\''; break; }
806       if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
807       sprintf(q,"\\x%02x",c);
808       q += 4;
809     }
810     warn("badly formatted response from peer: %s", sanibuf);
811     conn_failed(conn);
812     return;
813   }
814
815   if (conn->quitting) {
816     if (code!=205) {
817       warn("peer gave failure response to QUIT: %s", sani);
818       conn_failed(conn);
819       return;
820     }
821     conn close ok;
822     return;
823   }
824
825   Article *art;
826
827 #define GET_ARTICLE                                                         \
828   art= article_reply_check(conn, data, code_streaming, sani);               \
829   if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
830
831 #define ARTICLE_DEALTWITH(streaming,how)                        \
832   code_streaming= (streaming)                                   \
833   GET_ARTICLE;                                                  \
834   article_done(conn, art, RC_##how);  break;
835
836   int code_streaming= 0;
837
838   switch (code) {
839
840   case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */
841   case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */
842
843   case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */
844   case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */
845
846   case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */
847   case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */
848
849   case 238: /* CHECK says send it */
850     code_streaming= 1;
851   case 335: /* IHAVE says send it */
852     GET_ARTICLE;
853     count_checkedwanted++;
854     LIST_ADDTAIL(conn->queue);
855     if (art->checked) {
856       warn("peer gave %d response to article body",code);
857       goto failed;
858     }
859     art->checked= 1;
860     break;
861
862   case 431: /* CHECK or TAKETHIS says try later */
863     code_streaming= 1;
864   case 436: /* IHAVE says try later */
865     GET_ARTICLE;
866     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
867         || fflush(defer))
868       sysdie("write to defer file %s",path_ductdefer);
869     article_done(conn, art, RC_deferred);
870     break;
871
872   case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
873   case 503: warn("peer timed us out: %s", sani);                   goto failed;
874   default:  warn("peer sent unexpected message: %s", sani);        goto failed;
875
876   failed:
877     conn_failed(conn);
878     return OOP_CONTINUE;;
879   }
880
881   check_check_work(conn);
882   return OOP_CONTINUE;
883 }
884
885 /*========== monitoring of input file ==========*/
886
887 static void feedfile_eof(InputFile *ipf) {
888   assert(ipf != main_input_file); /* promised by tailing_try_read */
889   assert(ipf == old_input_file);
890   assert(sms == sm_SEPARATED);
891   sms= sm_FINISHING;
892   inputfile_tailing_stop(ipf);
893   inputfile_tailing_start(main_input_file);
894 }
895
896 static void statmc_finishdone(void) {
897   time_t now;
898   struct stat stab;
899
900   assert(sms == sm_FINISHING);
901
902   r= fstat(fileno(defer), &stab);
903   if (r) sysdie("check defer file %s", path_defer);
904
905   if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
906   defer= 0;
907
908   now= time(0);
909   if (now==-1) sysdie("could not get current time for backlog filename");
910
911   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
912                            (unsigned long)now,
913                            (unsigned long)stab.st_ino);
914   if (link(path_defer, path_backlog))
915     sysdie("could not install defer file %s as backlog file %s",
916            path_defer, backlog);
917   if (unlink(path_defer))
918     sysdie("could not unlink old defer link %s to backlog file %s",
919            path_defer, backlog);
920   open_defer();
921
922   close_input_file(old_input_file);
923   old_input_file= 0;
924
925   if (unlink(path_duct))
926     sysdie("could not unlink old duct file %s", path_duct);
927
928   sms= sm_NORMAL;
929 }
930
931 static InputFile *open_input_file(const char *path) {
932   int fd= open(path, O_RDONLY);
933   if (fd<0) {
934     if (errno==ENOENT) return 0;
935     sysdie("unable to open input file %s", path);
936   }
937
938   InputFile *ipf= xmalloc(sizeof(InputFile));
939   memset(ipf,0,sizeof(*ipf));
940   
941   ipf->readable.on_readable= tailing_on_readable;
942   ipf->readable.on_cancel=   tailing_on_cancel;
943   ipf->readable.try_read=    tailing_try_read;
944
945   ipf->fd= fd;
946   ipf->path= path;
947   
948   return ipf;
949 }
950
951 static void close_input_file(InputFile *ipf) {
952   assert(!ipf->readable_callback); /* must have had ->on_cancel */
953   assert(!ipf->filemon); /* must have had inputfile_tailing_stop */
954   assert(!ipf->rd); /* must have had inputfile_tailing_stop */
955   assert(!ipf->inprogress); /* no dangling pointers pointing here */
956
957   if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
958   free(ipf);
959 }
960
961 /*---------- dealing with articles read in the input file ----------*/
962
963 typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
964                                    oop_rd_event ev, const char *errmsg,
965                                    int errnoval,
966                                    const char *data, size_t recsz,
967                                    void *ipf_v) {
968   InputFile *ipf= ipf_v;
969   Article *art;
970   char tokentextbuf[sizeof(TOKEN)*2+3];
971
972   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
973
974   if (data[0] && data[0]!=' ') {
975     char *space= strchr(data,' ');
976     int tokenlen= space-data;
977     int midlen= (int)recsz-tokenlen-1;
978     if (midlen < 0) goto bad_data;
979
980     if (tokenlen != sizeof(TOKEN)*2+2) goto bad_data;
981     memcpy(tokentextbuf, data, tokenlen);
982     tokentextbuf[tokenlen]= 0;
983     if (!IsToken(tokentextbuf)) goto bad_data;
984
985     art= xmalloc(sizeof(*art) - 1 + midlen + 1);
986     art->offset= ipf->offset;
987     art->blanklen= recsz;
988     art->midlen= midlen;
989     art->checked= art->sentbody= 0;
990     art->ipf= ipf;  ipf->inprogress++;
991     art->token= TextToToken(tokentextbuf);
992     strcpy(art->messageid, space+1);
993     LIST_ADDTAIL(queue, art);
994   }
995   ipf->offset += recsz + 1;
996
997   if (sms==sm_NORMAL && ipf->offset >= flush_threshold) {
998     int r= link(feedfile, duct_path);
999     if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path);
1000     /* => Hardlinked */
1001
1002     r= unlink(feedfile);
1003     if (r) sysdie("unlink old feedfile link %s", feedfile);
1004     /* => Moved */
1005
1006     spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
1007   }
1008
1009   check_master_queue();
1010 }
1011
1012 /*---------- tailing input file ----------*/
1013
1014 static void filemon_start(InputFile *ipf) {
1015   assert(!ipf->filemon);
1016
1017   ipf->filemon= xmalloc(sizeof(*ipf->filemon));
1018   memset(ipf->filemon, 0, sizeof(*ipf->filemon));
1019   filemon_method_startfile(ipf, ipf->filemon);
1020 }
1021
1022 static void filemon_stop(InputFile *ipf) {
1023   if (!ipf->filemon) return;
1024   filemon_method_stopfile(ipf, ipf->filemon);
1025   free(ipf->filemon);
1026   ipf->filemon= 0;
1027 }
1028
1029 static void filemon_callback(InputFile *ipf) {
1030   ipf->readable_callback(ipf->readable_callback_user);
1031 }
1032
1033 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
1034                                      void *user) {
1035   InputFile *ipf= user;
1036   return ipf->readable_callback(ipf->readable_callback_user);
1037 }
1038
1039 static void on_cancel(struct oop_readable *rable) {
1040   InputFile *ipf= (void*)rable;
1041
1042   if (ipf->filemon) filemon_stopfile(ipf);
1043   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1044   ipf->readable_callback= 0;
1045 }
1046
1047 static int tailing_on_readable(struct oop_readable *rable,
1048                                 oop_readable_call *cb, void *user) {
1049   InputFile *ipf= (void*)rable;
1050
1051   tailing_on_cancel(rable);
1052   ipf->readable_callback= cb;
1053   ipf->readable_callback_user= user;
1054   filemon_startfile(ipf);
1055
1056   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
1057   return 0;
1058 }
1059
1060 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
1061                                 size_t length) {
1062   InputFile *ipf= (void*)rable;
1063   for (;;) {
1064     ssize_t r= read(ipf->fd, buffer, length);
1065     if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; }
1066     if (r==-1 && errno==EINTR) continue;
1067     return r;
1068   }
1069 }
1070
1071 /*---------- filemon implemented with inotify ----------*/
1072
1073 #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
1074 #define HAVE_FILEMON
1075
1076 #include <linux/inotify.h>
1077
1078 static int filemon_inotify_fd;
1079 static int filemon_inotify_wdmax;
1080 static InputFile **filemon_inotify_wd2ipf;
1081
1082 typedef struct Filemon_Perfile {
1083   int wd;
1084 } Filemon_Inotify_Perfile;
1085
1086 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
1087   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
1088   if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
1089
1090   if (wd >= filemon_inotify_wdmax) {
1091     int newmax= wd+2;
1092     filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
1093                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
1094     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
1095            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
1096     filemon_inotify_wdmax= newmax;
1097   }
1098
1099   assert(!filemon_inotify_wd2ipf[wd]);
1100   filemon_inotify_wd2ipf[wd]= ipf;
1101
1102   pf->wd= wd;
1103 }
1104
1105 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
1106   int wd= pf->wd;
1107   int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
1108   if (r) sysdie("inotify_rm_watch");
1109   filemon_inotify_wd2ipf[wd]= 0;
1110 }
1111
1112 static void *filemon_inotify_readable(oop_source *lp, int fd,
1113                                       oop_event e, void *u) {
1114   struct inotify_event iev;
1115   for (;;) {
1116     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
1117     if (r==-1) {
1118       if (errno==EAGAIN) break;
1119       sysdie("read from inotify master");
1120     } else if (r==sizeof(iev)) {
1121       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
1122     } else {
1123       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
1124     }
1125     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
1126     filemon_callback(ipf);
1127   }
1128   return OOP_CONTINUE;
1129 }
1130
1131 static int filemon_method_init(void) {
1132   filemon_inotify_fd= inotify_init();
1133   if (filemon_inotify_fd<0) {
1134     syswarn("could not initialise inotify: inotify_init failed");
1135     return 0;
1136   }
1137   set nonblock;
1138   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
1139
1140   return 1;
1141 }
1142
1143 #endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
1144
1145 /*---------- filemon dummy implementation ----------*/
1146
1147 #if !defined(HAVE_FILEMON)
1148
1149 typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile;
1150
1151 static int filemon_method_init(void) { return 0; }
1152 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
1153 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
1154
1155 #endif /* !HAVE_FILEMON */
1156
1157 /*---------- interface to start and stop an input file ----------*/
1158
1159 static const oop_rd_style feedfile_rdstyle= {
1160   OOP_RD_DELIM_STRIP, '\n',
1161   OOP_RD_NUL_FORBID,
1162   OOP_RD_SHORTREC_EOF,
1163 };
1164
1165 static void inputfile_tailing_start(InputFile *ipf) {
1166   assert(!ipf->fd);
1167   ipf->readable->on_readable= tailing_on_readable;
1168   ipf->readable->on_cancel=   tailing_on_cancel;
1169   ipf->readable->try_read=    tailing_try_read;
1170   ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
1171   ipf->readable->delete_kill= 0;
1172
1173   ipf->readable_callback= 0;
1174   ipf->readable_callback_user= 0;
1175
1176   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
1177   assert(ipf->fd);
1178
1179   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
1180                      feedfile_got_article,ipf, feedfile_problem,ipf);
1181   if (r) sysdie("unable start reading feedfile %s",ipf->path);
1182 }
1183
1184 static void inputfile_tailing_stop(InputFile *ipf) {
1185   assert(ipf->fd);
1186   oop_rd_delete(ipf->rd);
1187   ipf->rd= 0;
1188   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
1189 }
1190
1191 /*========== interaction with innd ==========*/
1192
1193 /* See official state diagram at top of file.  We implement
1194  * this as follows:
1195  *
1196            ================
1197             WAITING
1198            [Nothing/Noduct]
1199             poll for F
1200            ================
1201                 |
1202                 |     TIMEOUT
1203                 |`--------------------------.
1204                 |                           | install defer as backlog
1205                 | OPEN F SUCCEEDS           | exit
1206      ,--------->|                           V
1207      |          V                         =========
1208      |     ========                        (ESRCH)
1209      |      NORMAL                        [Dropped]
1210      |     [Normal]                       =========
1211      |      read F
1212      |     ========
1213      |          |
1214      |          | F IS SO BIG WE SHOULD FLUSH
1215      ^          | hardlink F to D
1216      |     [Hardlinked]
1217      |          | unlink F
1218      |          | our handle onto F is now onto D
1219      |     [Moved]
1220      |          |
1221      |          |<---------------------------------------------------.
1222      |          |                                                    |
1223      |          | spawn inndcomm flush                               |
1224      |          V                                                    |
1225      |     ==========                                                |
1226      |      FLUSHING                                                 |
1227      |     [Flushing]                                                |
1228      |      read D                                                   |
1229      |     ==========                                                |
1230      |          |                                                    |
1231      |          |   INNDCOMM FLUSH FAILS                             ^
1232      |          |`----------------------->--------.                  |
1233      |          |                                 |                  |
1234      |          |   NO SUCH SITE                  V                  |
1235      ^          |`----------------.            =========             |
1236      |          |                 |            FLUSHFAIL             |
1237      |          |                 V            [Moved]               |
1238      |          |            ==========        read D                |
1239      |          |             DROPPING         =========             |
1240      |          |            [Dropping]           |                  |
1241      |          |             read D              | TIME TO RETRY    |
1242      |          |            ==========           `------------------'
1243      |          | FLUSH OK        |
1244      |          | open F          | AT EOF OF D AND ALL PROCESSED
1245      |          V                 | install defer as backlog
1246      |     ===========            | unlink D
1247      |      SEPARATED             | exit
1248      |     [Separated]            V
1249      |      read D            ==========
1250      |     ===========         (ESRCH)
1251      |          |             [Droppped]
1252      |          |             ==========
1253      |          V
1254      |          | AT EOF OF D
1255      ^          |
1256      |     ===========
1257      |      FINISHING
1258      |     [Finishing]
1259      |      read F
1260      |      write D
1261      |     ===========
1262      |          |
1263      |          | ALL D PROCESSED
1264      |          | install defer as backlog
1265      |          | start new defer
1266      ^          V unlink D
1267      |          | close D
1268      |          |
1269      `----------'
1270
1271  *
1272  */
1273
1274 static char *path_ductlock, *path_duct, *path_ductdefer;
1275
1276 typedef struct {
1277   /* This is an instance of struct oop_readable */
1278   struct oop_readable readable; /* first */
1279   oop_readable_call *readable_callback;
1280   void *readable_callback_user;
1281
1282   int fd;
1283   const char *path; /* ptr copy of path_<foo> or feedfile */
1284   struct Filemon_Perfile *filemon;
1285
1286   oop_read *rd;
1287   long inprogress; /* no. of articles read but not processed */
1288   off_t offset;
1289 } InputFile;
1290
1291 typedef enum {
1292   sm_WAITING,
1293   sm_NORMAL,
1294   sm_FLUSHING,
1295   sm_FLUSHFAIL,
1296   sm_DROPPING,
1297   sm_SEPARATED,
1298   sm_FINISHING;
1299 } StateMachineState;
1300
1301 static InputFile *main_input_file, *old_input_file;
1302 static StateMachineState sms;
1303 static int waiting_periods_sofar;
1304
1305 static void open_defer(void) {
1306   struct stat stab;
1307
1308   assert(!defer);
1309   defer= fopen(path_ductdefer, "a+");
1310   if (!defer) sysdie("could not open defer file %s", path_ductdefer);
1311
1312   /* truncate away any half-written records */
1313
1314   r= fstat(fileno(defer), &stab);
1315   if (r) sysdie("could not stat newly opened defer file %s", path_ductdefer);
1316
1317   if (stab.st_size > LONG_MAX)
1318     die("defer file %s size is far too large", path_ductdefer);
1319
1320   if (!stab.st_size)
1321     return;
1322
1323   long orgsize= stab.st_size;
1324   long truncto= stab.st_size;
1325   for (;;) {
1326     if (!truncto) break; /* was only (if anything) one half-truncated record */
1327     if (fseek(defer, truncto-1, SEEK_SET) < 0)
1328       sysdie("seek in defer file %s while truncating partial", path_ductdefer);
1329
1330     r= getc(defer);
1331     if (r==EOF) {
1332       if (ferror(defer))
1333         sysdie("failed read from defer file %s", path_ductdefer);
1334       else
1335         die("defer file %s shrank while we were checking it!", path_ductdefer);
1336     }
1337     if (r=='\n') break;
1338     truncto--;
1339   }
1340
1341   if (stab.st_size != truncto) {
1342     warn("truncating half-record at end of defer file %s -"
1343          " shrinking by %ld bytes from %ld to %ld",
1344          path_ductdefer, orgsize - truncto, orgsize, truncto);
1345
1346     if (fflush(defer)) sysdie("could not flush defer file %s", path_ductdefer);
1347     if (ftruncate(fileno(defer), truncto))
1348       sysdie("could not truncate defer file %s", path_ductdefer);
1349
1350   } else {
1351     info("continuing existing defer file %s (%ld bytes)",
1352          path_ductdefer, orgsize);
1353   }
1354   if (fseek(defer, truncto, SEEK_SET))
1355     sysdie("could not seek to new end of defer file %s", path_ductdefer);
1356 }
1357
1358 static void statemc_init(void) {
1359   struct stat stab;
1360
1361   path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
1362   path_duct=      xasprintf("%s_duct",       feedfile);
1363   path_ductdefer= xasprintf("%s_duct.defer", feedfile);
1364
1365   if (lstat(path_ductdefer, &stab)) {
1366     if (errno!=ENOENT) sysdie("could not check defer file %s", path_defer);
1367   } else {
1368     if (!S_ISREG(stab.st_mode))
1369       die("defer file %s not a plain file (mode 0%lo)",
1370           path_defer, (unsigned long)stab.st_mode);
1371     switch (stab.st_nlink==1) {
1372     case 1: /* ok */ break;
1373     case 2:
1374       if (unlink(path_defer))
1375         sysdie("could not unlink stale defer file link %s (presumably"
1376                " hardlink to backlog file)", path_defer);
1377       break;
1378     default:
1379       die("defer file %s has unexpected link count %d",
1380           path_defer, stab.st_nlink);
1381     }
1382   }
1383   open_defer();
1384
1385   int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
1386   if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
1387
1388   struct flock fl;
1389   memset(&fl,0,sizeof(fl));
1390   fl.l_type= F_WRLCK;
1391   fl.l_whence= SEEK_SET;
1392   r= fcntl(lockfd, F_SETLK, &fl);
1393   if (r==-1) {
1394     if (errno==EACCES || errno==EAGAIN)
1395       die("another duct holds the lockfile");
1396     sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
1397   }
1398
1399   InputFile *file_d= open_input_file(path_duct);
1400
1401   if (file_d) {
1402     struct stat stab_f, stab_d;
1403
1404     r= stat(feedfile, &stab_f);
1405     if (r) {
1406       if (errno!=ENOENT) sysdie("check feed file %s", feedfile);
1407       /* D exists, F ENOENT => Moved */
1408       goto found_moved;
1409     }
1410
1411     /* F and D both exist */
1412
1413     r= fstat(file_d->fd, &stab_d);
1414     if (r) sysdie("check duct file %s", ductfile);
1415
1416     if (stab_d.st_ino == stab_f.st_ino &&
1417         stab_d.st_dev == stab_f.st_dev) {
1418       /* F==D => Hardlinked*/
1419       r= unlink(path_duct);
1420       if (r) sysdie("unlink feed file %s during startup", feedfile);
1421     found_moved:
1422       /* => Moved */
1423       startup_set_input_file(file_d);
1424       spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
1425     } else {
1426       /* F!=D => Separated */
1427       sms= sm_SEPARATED;
1428       startup_set_input_file(file_d);
1429     }
1430   } else { /*!file_d*/
1431     sms= sm_WAITING;
1432     statemc_waiting_poll();
1433   }
1434 }
1435
1436 static void statemc_poll(void) {
1437   if (sms == sm_WAITING) statemc_waiting_poll();
1438   if (sms == sm_FINISHING && !old_input_file->inprogress) statemc_finishdone();
1439 }
1440
1441 static void statemc_waiting_poll(void) {
1442   InputFile *file_f= open_input_file(feedfile);
1443   if (!file_f) {
1444     if (waiting_periods_sofar++ > waiting_timeout_periods)
1445       die("timed out waiting for innd to create feed file %s", feedfile);
1446     return;
1447   }
1448   startup_set_input_file(file_d);
1449   sms= sm_NORMAL;
1450 }
1451
1452 static void startup_set_input_file(InputFile *f) {
1453   assert(!main_input_file);
1454   main_input_file= f;
1455   inputfile_tailing_start(f);
1456 }
1457
1458 /*========== flushing the feed ==========*/
1459
1460  
1461
1462 /*========== main program ==========*/
1463
1464 #define EVERY(what, interval, body)                                          \
1465   static const struct timeval what##_timeout = { 5, 0 };                     \
1466   static void what##_schedule(void);                                         \
1467   static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
1468     { body }                                                                 \
1469     what##_schedule();                                                       \
1470   }                                                                          \
1471   static void what##_schedule(void) {                                        \
1472     loop->on_time(loop, what##_timeout, what##_timedout, 0);                 \
1473   }
1474
1475 EVERY(filepoll, {5,0}, { check_master_queue(); })
1476
1477 EVERY(period, {PERIOD_SECONDS,0}, {
1478   if (connect_delay) connect_delay--;
1479   statemc_poll();
1480   check_master_queue();
1481 });
1482
1483 main {
1484   ignore sigpipe;
1485   if (!filemon_init())
1486     filepoll_schedule();
1487   period_schedule();
1488 };