chiark / gitweb /
a2a4ec60a27b46948fb88f74c65c441418ec159b
[innduct.git] / backends / innduct.c
1 /*
2  * Four files full of
3  *    token article
4  *
5  *   site.name_duct.lock       lock preventing multiple ducts
6  *                                holder of this lock is "duct"
7  * F site.name                 main feed file
8  *                                opened/created, then written, by innd
9  *                                read by duct
10  *                                unlinked by duct
11  *                                tokens blanked out by duct when processed
12  * D site.name_duct            temporary feed file during flush (or crash)
13  *                                hardlink created by duct
14  *                                unlinked by duct
15  *   site.name_duct.defer      431'd articles, still being written,
16  *                                created, written, used by duct
17  *   site.name_backlog.lock    lock taken out by innxmit wrapper
18  *                                holder and its child are "xmit"
19  *   site.name_backlog_<inum>  431'd articles, ready for innxmit
20  *                                created (link/mv) by duct
21  *                                read by xmit
22  *                                unlinked by xmit
23  *   site.name_backlog_<letters> eg
24  *   site.name_backlog_manual
25  *                             anything the sysadmin likes (eg, feed files
26  *                             from old feeds to be merged into this one)
27  *                                created (link/mv) by admin
28  *                                read by xmit
29  *                                unlinked by xmit
30  *
31  *
32  * OVERALL STATES:
33  *
34  *                                                   START
35  *   ,-->--.                                           |
36  *   |     |                                        check D
37  *   |     |                                         /   |
38  *   |     |                                  ENOENT/    |exists
39  *   |     V  <----------+--<----------------------'     |
40  *   |  None             |                               |
41  *   |   F: unopened     |                               |
42  *   |   D: ENOENT       |                               |
43  *   |     |             |                               |
44  *   |  repeatedly       |                               |
45  *   |   open F ------->-'                               |
46  *   |     |     ENOENT                                  |
47  *   |     |OK                                           |
48  *   |     |                                             |
49  *   |     V                                             |
50  *   |  Normal                                        check F
51  *   |   F: innd writing, duct reading / editing        /|\
52  *   |   D: ENOENT                                     / | \
53  *   |     |                                          /  |  \
54  *   |     |  duct decides time to flush      same   /   |   |
55  *   |     |  duct makes hardlink             as D  /    |   |
56  *   |     |                                       /     |   |
57  *   |     V                            <---------'      |   |
58  *   |  Hardlinked                                       |   |
59  *   |   F == D: innd writing, duct reading / editing    |   |
60  *   ^     |                                             |   |
61  *   |     |  duct unlinks F                            /    |
62  *   |     V                                  ENOENT   /     |
63  *   |  Moved                            <------------'      |
64  *   |   F: ENOENT                                           |
65  *   |   D: innd writing, duct reading / editing             |
66  *   |     |                                                 |
67  *   |     |  duct requests flush of feed                    |
68  *   |     |   (others can too, harmlessly)                  |
69  *   |     V                                                 |
70  *   |  Separated                          <-----------------'
71  *   |   F: innd writing                        different to D
72  *   |   D: duct reading / editing
73  *   |     |
74  *   |     V  duct completes processing of D
75  *   |     |  duct unlinks D
76  *   |     |
77  *   `--<--'
78  *
79  */
80
81 #define PERIOD_SECONDS 60
82
83 static char *feedfile;
84 static int max_connections, max_queue_per_conn;
85 static int connection_setup_timeout, port, try_stream;
86 static const char *remote_host;
87
88 #define ISNODE(T)    T *next, *back;
89 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
90
91 #define NODE(n) ((struct node*)&(n)->head)
92
93 #define LIST_ADDHEAD(l,n)                                               \
94  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
95 #define LIST_ADDTAIL(l,n)                                               \
96  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
97
98 #define LIST_REMHEAD(l)                                                   \
99  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
100 #define LIST_REMTAIL(l)                                                   \
101  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
102 #define LIST_REMOVE(l,n)                        \
103  (list_remove(NODE((n))), (void)(l).count--)
104 #define LIST_INSERT(l,n,pred) \
105  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
106
107 struct Article {
108   char *mid;
109   int midlen;
110   int checked, sentbody;
111   fd and offset for blanking token or mid;
112 };
113
114 #define CONNIOVS 128
115
116 #define CN "<%d> "
117
118 typedef struct Conn Conn;
119
120 typedef enum {
121   Malloc, Const, Artdata;
122 } XmitKind;
123
124 typedef struct {
125   XmitKind kind;
126   union {
127     char *malloc_tofree;
128     ARTHANDLE *sm_art;
129   } info;
130 } XmitDetails;
131
132 struct Conn {
133   ISNODE(Conn);
134   int fd, max_queue, stream;
135   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
136   LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
137   struct iovec xmit[CONNIOVS];
138   XmitDetails xmitd[CONNIOVS];
139   int xmitu;
140 };
141
142 static int filemon_init(void);
143 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
144 static void filemon_callback(void);
145
146
147 #define CHILD_ESTATUS_STREAM   4
148 #define CHILD_ESTATUS_NOSTREAM 5
149
150 static int since_connect_attempt;
151 static int nconns;
152 static LIST(Conn) idle, working, full;
153
154 static LIST(Article) *queue;
155
156 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
157
158 /*========== making new connections ==========*/
159
160 static int connecting_sockets[2]= {-1,-1};
161 static pid_t connecting_child;
162
163 static void report_child_status(const char *what, int status) {
164   if (WIFEXITED(status)) {
165     int es= WEXITSTATUS(status);
166     if (es)
167       warn("%s: child died with error exit status %d",es);
168   } else if (WIFSIGNALED(status)) {
169     int sig= WTERMSIG(status);
170     const char *sigstr= strsignal(sig);
171     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
172     if (sigstr)
173       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
174     else
175       warn("%s: child died due to unknown fatal signal %d%s",
176            what, sig, coredump);
177   } else {
178     warn("%s: child died with unknown wait status %d", status);
179   }
180 }
181
182 static void connect_attempt_discard(void) {
183   if (connecting_sockets[0]) {
184     cancel_fd(loop, connecting_sockets[0], OOP_READ);
185     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
186   }
187   perhaps_close(&connecting_sockets[0]);
188   perhaps_close(&connecting_sockets[1]);
189
190   if (connecting_child) {
191     int status;
192     r= kill(connecting_child, SIGKILL);
193     if (r) sysdie("cannot kill connect child");
194
195     pid_t got= waitpid(connecting_child, &status, WNOHANG);
196     if (got==-1) sysdie("cannot reap connect child");
197
198     if (!(WIFEXITED(status) ||
199           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
200       report_child_status("connect"
201     }
202     connecting_child= 0;
203   }
204 }
205
206 #define PREP_DECL_MSG_CMSG(msg)                 \
207   struct msghdr msg;                            \
208   memset(&msg,0,sizeof(msg));                   \
209   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
210   msg.msg_control= msg##cbuf;                   \
211   msg.msg_controllen= sizeof(msg##cbuf);
212
213 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
214   Conn *conn= 0;
215
216   conn= xcalloc(sizeof(*conn));
217
218   DECL_MSG_CMSG(msg);
219   struct cmsghdr *h= 0;
220   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
221   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
222   if (!h) {
223     int status;
224     pid_t got= waitpid(connecting_child, &status, WNOHANG);
225     if (got != -1) {
226       assert(got==connecting_child);
227       connecting_child= 0;
228       if (WIFEXITED(status) &&
229           (WEXITSTATUS(status) != 0
230            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
231            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
232         /* child already reported the problem */
233       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
234         warn("connect: connection attempt timed out");
235       } else if (!WIFEXITED(status)) {
236         report_child_status("connect", status);
237         /* that's probably the root cause then */
238       }
239     } else {
240       /* child is still running apparently, report the socket problem */
241       if (rs < 0)
242         syswarn("connect: read from child socket failed");
243       else if (e == OOP_EXCEPTIONN)
244         warn("connect: unexpected exception on child socket");
245       else
246         warn("connect: unexpected EOF on child socket");
247     }
248     goto x;
249   }
250
251 #define CHK(field, val)                                                   \
252   if (h->cmsg_##field != val) {                                           \
253     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
254     goto x;                                                               \
255   }
256   CHK(level, SOL_SOCKET);
257   CHK(type,  SCM_RIGHTS);
258   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
259 #undef CHK
260
261   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
262
263   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
264
265   pid_t got= waitpid(connecting_child, &status, 0);
266   if (got==-1) sysdie("connect: real wait for child");
267   assert(got == connecting_child);
268   connecting_child= 0;
269
270   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
271   int es= WEXITSTATUS(status);
272   switch (es) {
273   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
274   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
275   default:
276     die("connect: child gave unexpected exit status %d", es);
277   }
278
279   set nonblocking;
280
281   /* Phew! */
282   LIST_ADDHEAD(idle, conn);
283   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
284   connect_attempt_discard();
285   check_master_queue();
286   return 0;
287
288  x:
289   if (conn) {
290     perhaps_close(&conn->fd);
291     free(conn);
292   }
293   connect_attempt_discard();
294 }
295
296 static void connect_start() {
297   assert(!connecting_sockets[0]);
298   assert(!connecting_sockets[1]);
299   assert(!connecting_child);
300
301   notice("starting connection attempt");
302
303   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
304   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
305
306   connecting_child= fork();
307   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
308
309   if (!connecting_child) {
310     FILE *cn_from, *cn_to;
311     char buf[NNTP_STRLEN+100];
312     int exitstatus= CHILD_ESTATUS_NOSTREAM;
313
314     put sigpipe back;
315     close unwanted fds;
316
317     r= close(connecting_sockets[0]);
318     if (r) sysdie("connect: close parent socket in child");
319
320     alarm(connection_setup_timeout);
321     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
322       if (buf[0]) {
323         sanitise_inplace(buf);
324         die("connect: rejected: %s", buf);
325       } else {
326         sysdie("connect: connection attempt failed");
327       }
328     }
329     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
330       sysdie("connect: authentication failed");
331     if (try_stream) {
332       if (fputs("MODE STREAM\r\n", cn_to) ||
333           fflush(cn_to))
334         sysdie("connect: could not send MODE STREAM");
335       buf[sizeof(buf)-1]= 0;
336       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
337         if (ferror(cn_from))
338           sysdie("connect: could not read response to MODE STREAM");
339         else
340           die("connect: connection close in response to MODE STREAM");
341       }
342       int l= strlen(buf);
343       assert(l>=1);
344       if (buf[-1]!='\n') {
345         sanitise_inplace(buf);
346         die("connect: response to MODE STREAM is too long: %.100s...",
347             remote_host, buf);
348       }
349       l--;  if (l>0 && buf[1-]=='\r') l--;
350       buf[l]= 0;
351       char *ep;
352       int rcode= strtoul(buf,&ep,10);
353       if (ep != buf[3]) {
354         sanitise_inplace(buf);
355         die("connect: bad response to MODE STREAM: %.50s", buf);
356       }
357       switch (rcode) {
358       case 203:
359         exitstatus= CHILD_ESTATUS_STREAM;
360         break;
361       case 480:
362       case 500:
363         break;
364       default:
365         sanitise_inplace(buf);
366         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
367         exitstatus= 2;
368         break;
369       }
370     }
371     int fd= fileno(cn_from);
372
373     PREP_DECL_MSG_CMSG(msg);
374     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
375     cmsg->cmsg_level= SOL_SOCKET;
376     cmsg->cmsg_type=  SCM_RIGHTS;
377     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
378     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
379
380     msg.msg_controllen= cmsg->cmsg_len;
381     r= sendmsg(connecting_sockets[1], &msg, 0);
382     if (r) sysdie("sendmsg failed for new connection");
383
384     _exit(exitstatus);
385   }
386
387   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
388   if (r) sysdie("connect: close child socket in parent");
389
390   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
391   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
392   return OOP_CONTINUE;
393
394  x:
395   connect_attempt_discard();
396 }
397
398 /*========== overall control of article flow ==========*/
399
400 static void conn_check_work(Conn *conn);
401
402 static void check_master_queue(void) {
403   try reading current feed file;
404
405   if (!queue.count)
406     return;
407
408   Conn *last_assigned=0;
409   for (;;) {
410     if (working.head) {
411       conn_assign_one_article(&working, &last_assigned);
412     } else if (idle.head) {
413       conn_assign_one_article(&idle, &last_assigned);
414     } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
415                !connecting_child && !connect_delay) {
416       connect_delay= reconnect_delay_periods;
417       connect_start();
418     } else {
419       break;
420     }
421   }
422   conn_check_work(last_assigned);
423 }
424
425 static void conn_assign_one_article(LIST(Conn) *connlist,
426                                     Conn **last_assigned) {
427   Conn *conn= connlist->head;
428
429   LIST_REMOVE(*connlist, conn);
430   Article *art= LIST_REMHEAD(queue);
431   LIST_ADDTAIL(conn->queue, art);
432   LIST_ADD(*conn_determine_right_list(conn), conn);
433
434   /* This slightly odd arrangement is so that we call conn_check_work
435    * once after filling the queue for a new connection in
436    * check_master_queue, rather than for each article. */
437   if (conn != *last_assigned && *last_assigned)
438     conn_check_work(*last_assigned);
439   *last_assigned= conn;
440 }
441
442 static int conn_total_queued_articles(Conn *conn) {
443   return conn->sent.count + conn->queue.count;
444 }
445
446 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
447   int inqueue= conn_total_queued_articles(conn);
448   assert(inqueue <= max_queue);
449   if (inqueue == 0) return &idle;
450   if (inqueue == conn->max_queue) return &full;
451   return &working;
452 }
453
454 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
455   check_conn_work(u);
456   return OOP_CONTINUE;
457 }
458
459 static void conn_check_work(Conn *conn)  {
460   void *rp= 0;
461   for (;;) {
462     conn_make_some_xmits(conn);
463     if (!conn->xmitu) {
464       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
465       return;
466     }
467
468     void *rp= conn_write_some_xmits(conn);
469     if (rp==OOP_CONTINUE) {
470       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
471       return;
472     } else if (rp==OOP_HALT) {
473       return;
474     } else if (!rp) {
475       /* transmitted everything */
476     } else {
477       abort();
478     }
479   }
480 }
481
482 /*========== article transmission ==========*/
483
484 static void *conn_write_some_xmits(Conn *conn) {
485   /* return values:
486    *      0:            nothing more to write, no need to call us again
487    *      OOP_CONTINUE: more to write but fd not writeable
488    *      OOP_HALT:     disaster, have destroyed conn
489    */
490   for (;;) {
491     int count= conn->xmitu;
492     if (!count) return 0;
493
494     if (count > IOV_MAX) count= IOV_MAX;
495     ssize_t rs= writev(conn->fd, conn->xmit, count);
496     if (rs < 0) {
497       if (errno == EAGAIN) return OOP_CONTINUE;
498       syswarn(CN "write failed", conn->fd);
499       conn_failed(conn);
500       return OOP_HALT;
501     }
502     assert(rs > 0);
503
504     for (done=0; rs && done<xmitu; done++) {
505       struct iovec *vp= &conn->xmit[done];
506       XmitDetails *dp= &conn->xmitd[done];
507       if (rs > vp->iov_len) {
508         rs -= vp->iov_len;
509         xmit_free(dp);
510       } else {
511         vp->iov_base += rs;
512         vp->iov_len -= rs;
513       }
514     }
515     int newu= conn->xmitu - done;
516     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
517     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
518     conn->xmitu= newu;
519   }
520 }
521
522 static void conn_make_some_xmits(Conn *conn) {
523   for (;;) {
524     if (conn->xmitu+5 > CONNIOVS)
525       break;
526
527     Article *art= LIST_REMHEAD(queue);
528     if (!art) break;
529
530     if (art->checked || conn->nocheck) {
531       /* actually send it */
532
533       ARTHANDLE *artdata= SMretrieve(somehow);
534
535       if (conn->stream) {
536         if (artdata) {
537           XMIT_LITERAL("TAKETHIS ");
538           xmit_noalloc(art->mid, art->midlen);
539           XMIT_LITERAL("\r\n");
540           xmit_artbody(artdata);
541         }
542       } else {
543         /* we got 235 from IHAVE */
544         if (artdata) {
545           xmit_artbody(artdata);
546         } else {
547           XMIT_LITERAL(".\r\n");
548         }
549       }
550       art->sent= 1;
551       LIST_ADDTAIL(conn->sent, art);
552
553     } else {
554       /* check it */
555
556       if (conn->stream)
557         XMIT_LITERAL("IHAVE ");
558       else
559         XMIT_LITERAL("CHECK ");
560       xmit_noalloc(art->mid, art->midlen);
561       XMIT_LITERAL("\r\n");
562
563       LIST_ADDTAIL(conn->sent, art);
564     }
565   }
566 }
567
568 /*========== responses from peer ==========*/
569
570 static const oop_rd_style peer_rd_style= {
571   OOP_RD_DELIM_STRIP, '\n',
572   OOP_RD_NUL_FORBID,
573   OOP_RD_SHORTREC_FORBID
574 };
575
576 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
577                         const char *errmsg, int errnoval,
578                         const char *data, size_t recsz, void *conn_v) {
579   Conn *conn= conn_v;
580
581   if (ev == OOP_RD_EOF) {
582     warn("unexpected EOF from peer");
583     conn_failed(conn);
584     return;
585   }
586   assert(ev == OOP_RD_OK);
587
588   char *ep;
589   unsigned long code= strtoul(data, &ep, 10);
590   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
591     char sanibuf[100];
592     const char *p= data;
593     char *q= sanibuf;
594     *q++= '`';
595     for (;;) {
596       if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
597       int c= *p++;
598       if (!c) { *q++= '\''; break; }
599       if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
600       sprintf(q,"\\x%02x",c);
601       q += 4;
602     }
603     warn("badly formatted response from peer: %s", sanibuf);
604     conn_failed(conn);
605     return;
606   }
607
608   if (conn->quitting) {
609     if (code!=205) {
610       warn("peer gave failure response to QUIT: %s", sani);
611       conn_failed(conn);
612       return;
613     }
614     conn close ok;
615     return;
616   }
617
618   switch (code) {
619   case 438: /* CHECK says they have it */
620   case 435: /* IHAVE says they have it */
621     ARTICLE_DEALTWITH(1,unwanted);
622     break;
623
624   case 238: /* CHECK says send it */
625   case 335: /* IHAVE says send it */
626     count_checkedwanted++;
627     Article *art= LIST_REMHEAD(conn->sent);
628     art->checked= 1;
629     LIST_ADDTAIL(conn->queue);
630     break;
631
632   case 235: /* IHAVE says thanks */
633   case 239: /* TAKETHIS says thanks */
634     ARTICLE_DEALTWITH(1,accepted);
635     break;
636
637   case 439: /* TAKETHIS says rejected */
638   case 437: /* IHAVE says rejected */
639     ARTICLE_DEALTWITH(1,rejected);
640     break;
641
642   case 431: /* CHECK or TAKETHIS says try later */
643   case 436: /* IHAVE says try later */
644     ARTICLE_DEALTWITH(0,deferred);
645     break;
646
647   case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
648   case 503: warn("peer timed us out: %s", sani);                   goto failed;
649   default:  warn("peer sent unexpected message: %s", sani);
650   failed:
651     conn_failed(conn);
652     return OOP_CONTINUE;;
653   }
654
655   return OOP_CONTINUE;
656 }
657
658 /*========== monitoring of input file ==========*/
659
660 /*---------- tailing input file ----------*/
661
662
663
664 /*---------- filemon implemented with inotify ----------*/
665
666 #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
667 #define HAVE_FILEMON
668
669 #include <linux/inotify.h>
670
671 static int filemon_inotify_fd;
672 static int filemon_inotify_wd= -1;
673
674 static void *filemon_inotify_readable(oop_source *lp, int fd,
675                                       oop_event e, void *u) {
676   struct inotify_event iev;
677   for (;;) {
678     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
679     if (r==-1) {
680       if (errno==EAGAIN) break;
681       sysdie("read from inotify master");
682     } else if (r==sizeof(iev)) {
683       assert(wd == filemon_inotify_wd);
684     } else {
685       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
686     }
687   }
688   /* Technically speaking the select might fire early in which case
689    * we'll read no events and then call filemon_callback
690    * unnecessarily.  We don't care about that.
691    */
692   filemon_callback();
693   return OOP_CONTINUE;
694 }
695
696 static int filemon_init(void) {
697   filemon_inotify_fd= inotify_init();
698   if (filemon_inotify_fd<0) {
699     syswarn("could not initialise inotify: inotify_init failed");
700     return 0;
701   }
702   set nonblock;
703   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
704
705   return 1;
706 }
707
708 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) {
709   if (filemon_inotify_wd >= 0) {
710     int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
711     if (r) sysdie("inotify_rm_watch");
712   }
713   filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY);
714   if (filemon_inotify_wd < 0) sysdie("inotify_add_watch");
715 }
716
717 #endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
718
719 /*---------- filemon dummy implementation ----------*/
720
721 #if !defined(HAVE_FILEMON)
722
723 static int filemon_init(void) { return 0; }
724 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { }
725
726 #endif
727
728 /*========== interaction with innd ==========*/
729
730 /* see state diagram at top of file */
731
732 static char *path_ductlock, *path_duct, *path_ductdefer;
733 static int tailing_fd= -1, flushing_fd= -1;
734
735 static void statemc_init(void) {
736   path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
737   path_duct=      xasprintf("%s_duct",       feedfile);
738   path_ductdefer= xasprintf("%s_duct.defer", feedfile);
739
740   int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
741   if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
742
743   struct flock fl;
744   memset(&fl,0,sizeof(fl));
745   fl.l_type= F_WRLCK;
746   fl.l_whence= SEEK_SET;
747   r= fcntl(lockfd, F_SETLK, &fl);
748   if (r==-1) {
749     if (errno==EACCES || errno==EAGAIN)
750       die("another duct holds the lockfile");
751     sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
752   }
753 }
754
755 static void statemc_poll(void) {
756   if (tailing_fd>=0) return;
757
758   int d_fd= open(path_duct, O_RDWR);
759   if (d_fd<0)
760     if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
761
762   int f_fd= open(feedfile, O_RDWR);
763   if (f_fd<0)
764     if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
765
766   if (d_fd<0) {
767     if (f_fd>=0)
768       start_tailing(f_fd);
769     return;
770   }
771
772   
773
774 /*========== main program ==========*/
775
776 #define EVERY(what, interval, body)                                          \
777   static const struct timeval what##_timeout = { 5, 0 };                     \
778   static void what##_schedule(void);                                         \
779   static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
780     { body }                                                                 \
781     what##_schedule();                                                       \
782   }                                                                          \
783   static void what##_schedule(void) {                                        \
784     loop->on_time(loop, what##_timeout, what##_timedout, 0);                 \
785   }
786
787 EVERY(filepoll, {5,0}, { check_master_queue(); })
788
789 EVERY(period, {PERIOD_SECONDS,0}, {
790   if (connect_delay) connect_delay--;
791   statemc_poll();
792   check_master_queue();
793 });
794
795 main {
796   ignore sigpipe;
797   if (!filemon_init())
798     filepoll_schedule();
799   period_schedule();
800 };