chiark / gitweb /
Inputfile machinery. Now do state machine
[innduct.git] / backends / innduct.c
1 /*
2  * Four files full of
3  *    token article
4  *
5  *   site.name_duct.lock       lock preventing multiple ducts
6  *                                holder of this lock is "duct"
7  * F site.name                 main feed file
8  *                                opened/created, then written, by innd
9  *                                read by duct
10  *                                unlinked by duct
11  *                                tokens blanked out by duct when processed
12  * D site.name_duct            temporary feed file during flush (or crash)
13  *                                hardlink created by duct
14  *                                unlinked by duct
15  *   site.name_duct.defer      431'd articles, still being written,
16  *                                created, written, used by duct
17  *   site.name_backlog.lock    lock taken out by innxmit wrapper
18  *                                holder and its child are "xmit"
19  *   site.name_backlog_<inum>  431'd articles, ready for innxmit
20  *                                created (link/mv) by duct
21  *                                read by xmit
22  *                                unlinked by xmit
23  *   site.name_backlog_<letters> eg
24  *   site.name_backlog_manual
25  *                             anything the sysadmin likes (eg, feed files
26  *                             from old feeds to be merged into this one)
27  *                                created (link/mv) by admin
28  *                                read by xmit
29  *                                unlinked by xmit
30
31
32    OVERALL STATES:
33
34                                                                 START
35                                                                   |
36                                                              check D, F
37                                                                   |
38                           <--------------------------------------'|
39         Nothing                            F, D both ENOENT       |
40          F: ENOENT                                                |
41          D: ENOENT                                                |
42          duct: not reading anything                               |
43            |                                                      |
44            |`---------------------.                               |
45            |                      | duct times out waiting for F  |
46            V  innd creates F      | duct exits                    |
47            |                      V                               |
48         Noduct                    GO TO Dropped                   |
49          F: innd writing                                          |
50          D: ENOENT                                                |
51          duct: not running or not reading anything                |
52            |                                                      |
53            |                                                      |
54      ,-->--+                   <---------------------------------'|
55      |     |  duct opens F                         F exists       |
56      |     |                                       D ENOENT       |
57      |     V                                                      |
58      |  Normal                                                    |
59      |   F: innd writing, duct reading                            |
60      |   D: ENOENT                                                |
61      |     |                                                      |
62      |     |  duct decides time to flush                          |
63      |     |  duct makes hardlink                                 |
64      |     |                                                      |
65      |     V                            <------------------------'|
66      |  Hardlinked                                  F==D          |
67      |   F == D: innd writing, duct reading         both exist    |
68      ^     |                                                      |
69      |     |  duct unlinks F                                      |
70      |     V                                                      |
71      |  Moved                               <----+------------<--'|
72      |   F: ENOENT                               |  F ENOENT      |
73      |   D: innd writing, duct reading           |  D exists      |
74      |     |                                     |                |
75      |     |  duct requests flush of feed        |                |
76      |     |   (others can too, harmlessly)      |                |
77      |     V                                     |                |
78      |  Flushing                                 |                |
79      |   F: ENOENT                               |                |
80      |   D: innd flushing, duct reading          |                |
81      |     |                                     |                |
82      |     |   inndcomm flush fails              |                |
83      |     |`-------------------------->---------'                |
84      |     |                                                      |
85      |     |   inndcomm reports no such site                      |
86      |     |`---------------------------------------------------- | -.
87      |     |                                                      |  |
88      |     |  innd finishes writing D, creates F                  |  |
89      |     |  inndcomm reports flush successful                   |  |
90      |     |                                                      |  |
91      |     V                                                      |  |
92      |  Separated                                <----------------'  |
93      |   F: innd writing                            F!=D             /
94      |   D: duct reading                             both exist     /
95      |     |                                                       /
96      |     |  duct gets to the end of D                           /
97      |     |  duct opens F too                                   /
98      |     V                                                    /
99      |  Finishing                                              /
100      |   F: innd writing, duct reading                        |
101      |   D: duct finishing                                    V
102      |     |                                            Dropping
103      |     |  duct finishes processing D                 F: ENOENT
104      |     V  duct unlinks D                             D: duct reading
105      |     |                                                  |
106      `--<--'                                                  | duct finishes
107                                                               |  processing D
108                                                               | duct unlinks D
109                                                               | duct exits
110                                                               V
111                                                         Dropped
112                                                          F: ENOENT
113                                                          D: ENOENT
114                                                          duct not running
115
116    "duct reading" means innduct is reading the file but also
117    overwriting processed tokens.
118
119  *
120  *
121  */
122
123 #define PERIOD_SECONDS 60
124
125 static char *feedfile;
126 static int max_connections, max_queue_per_conn;
127 static int connection_setup_timeout, port, try_stream;
128 static const char *remote_host;
129
/* Intrusive list helpers.
 *
 * ISNODE(T) declares the link members inside a node struct and LIST(T)
 * declares a list header that also carries an element count.  The
 * LIST_* wrappers call the list_* primitives (not defined in this part
 * of the file) and keep the header's `count' member in step.
 */
#define ISNODE(T)    T *next, *back;
#define LIST(T)      struct { T *head, *tail, *tailpred; int count; }

/* NOTE(review): this takes the address of a member called `head', but
 * ISNODE declares `next' and `back' - confirm which member is meant. */
#define NODE(n) ((struct node*)&(n)->head)

#define LIST_ADDHEAD(l,n)                                               \
 (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
#define LIST_ADDTAIL(l,n)                                               \
 (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)

/* Both REM macros evaluate to 0 when the list's count is zero. */
#define LIST_REMHEAD(l)                                                   \
 ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
#define LIST_REMTAIL(l)                                                   \
 ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
#define LIST_REMOVE(l,n)                        \
 (list_remove(NODE((n))), (void)(l).count--)
#define LIST_INSERT(l,n,pred) \
 (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
148
/* One article to be offered to the peer. */
struct Article {
  char *mid;               /* message-id text; sent using midlen bytes */
  int midlen;
  int checked, sentbody;   /* `checked' is set once the peer asked for it */
  /* Placeholder, not yet real C: */
  fd and offset for blanking token or mid;
};
155
156 #define CONNIOVS 128
157
158 #define CN "<%d> "
159
160 typedef struct Conn Conn;
161
/* What kind of storage backs one pending transmit iovec; selects the
 * valid member of XmitDetails.info.  (The enumerator list must not
 * end in a semicolon.) */
typedef enum {
  Malloc, Const, Artdata
} XmitKind;
165
/* Bookkeeping for one entry of Conn.xmit: `kind' says which member
 * of `info' (if any) is meaningful. */
typedef struct {
  XmitKind kind;
  union {
    char *malloc_tofree;   /* presumably used when kind==Malloc - confirm */
    ARTHANDLE *sm_art;     /* presumably used when kind==Artdata - confirm */
  } info;
} XmitDetails;
173
/* One connection to the peer.
 * NOTE(review): conn_make_some_xmits reads conn->nocheck and
 * peer_rd_ok reads conn->quitting; neither member is declared here. */
struct Conn {
  ISNODE(Conn);
  int fd, max_queue, stream;
  LIST(Article) queue; /* not yet told peer, or CHECK said send it */
  LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
  struct iovec xmit[CONNIOVS];   /* pending output, first xmitu entries used */
  XmitDetails xmitd[CONNIOVS];   /* parallel to xmit[] */
  int xmitu;
};
183
/* Forward declarations for the file monitoring code.
 * NOTE(review): filemon_callback is later defined taking an
 * `InputFile *ipf' argument, which does not match the (void)
 * prototype here; filemon_init and filemon_setfile have no definition
 * in the visible part of the file. */
static int filemon_init(void);
static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
static void filemon_callback(void);
187
188
189 #define CHILD_ESTATUS_STREAM   4
190 #define CHILD_ESTATUS_NOSTREAM 5
191
192 static int since_connect_attempt;
193 static int nconns;
194 static LIST(Conn) idle, working, full;
195
196 static LIST(Article) *queue;
197
/* Closes *fd if it is nonzero and then records it as closed by
 * storing 0 through the pointer (0 is this file's "no fd" marker). */
static void perhaps_close(int *fd) { if (*fd) { close(*fd); *fd= 0; } }
199
200 /*========== making new connections ==========*/
201
202 static int connecting_sockets[2]= {-1,-1};
203 static pid_t connecting_child;
204
/* Logs a warning describing how a child process ended.
 *   what    short name of the child, used as the message prefix
 *   status  wait status as filled in by waitpid
 * A clean exit with status 0 produces no message. */
static void report_child_status(const char *what, int status) {
  if (WIFEXITED(status)) {
    int es= WEXITSTATUS(status);
    if (es)
      warn("%s: child died with error exit status %d", what, es);
  } else if (WIFSIGNALED(status)) {
    int sig= WTERMSIG(status);
    const char *sigstr= strsignal(sig);
    const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
    if (sigstr)
      warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
    else
      warn("%s: child died due to unknown fatal signal %d%s",
           what, sig, coredump);
  } else {
    warn("%s: child died with unknown wait status %d", what, status);
  }
}
223
224 static void connect_attempt_discard(void) {
225   if (connecting_sockets[0]) {
226     cancel_fd(loop, connecting_sockets[0], OOP_READ);
227     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
228   }
229   perhaps_close(&connecting_sockets[0]);
230   perhaps_close(&connecting_sockets[1]);
231
232   if (connecting_child) {
233     int status;
234     r= kill(connecting_child, SIGKILL);
235     if (r) sysdie("cannot kill connect child");
236
237     pid_t got= waitpid(connecting_child, &status, WNOHANG);
238     if (got==-1) sysdie("cannot reap connect child");
239
240     if (!(WIFEXITED(status) ||
241           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
242       report_child_status("connect"
243     }
244     connecting_child= 0;
245   }
246 }
247
/* Declares a zeroed `struct msghdr msg' plus a control buffer
 * msg##cbuf with room for one cmsg carrying an fd, and points msg's
 * control fields at that buffer.  It expands to declarations and
 * statements, and relies on a variable named `fd' being in scope for
 * sizeof(fd).
 * NOTE(review): no msg_iov is set up, so no ordinary data accompanies
 * the control message - confirm that the fd is reliably delivered
 * over a SOCK_STREAM socketpair without at least one data byte. */
#define PREP_DECL_MSG_CMSG(msg)                 \
  struct msghdr msg;                            \
  memset(&msg,0,sizeof(msg));                   \
  char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
  msg.msg_control= msg##cbuf;                   \
  msg.msg_controllen= sizeof(msg##cbuf);
254
255 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
256   Conn *conn= 0;
257
258   conn= xcalloc(sizeof(*conn));
259
260   DECL_MSG_CMSG(msg);
261   struct cmsghdr *h= 0;
262   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
263   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
264   if (!h) {
265     int status;
266     pid_t got= waitpid(connecting_child, &status, WNOHANG);
267     if (got != -1) {
268       assert(got==connecting_child);
269       connecting_child= 0;
270       if (WIFEXITED(status) &&
271           (WEXITSTATUS(status) != 0
272            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
273            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
274         /* child already reported the problem */
275       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
276         warn("connect: connection attempt timed out");
277       } else if (!WIFEXITED(status)) {
278         report_child_status("connect", status);
279         /* that's probably the root cause then */
280       }
281     } else {
282       /* child is still running apparently, report the socket problem */
283       if (rs < 0)
284         syswarn("connect: read from child socket failed");
285       else if (e == OOP_EXCEPTIONN)
286         warn("connect: unexpected exception on child socket");
287       else
288         warn("connect: unexpected EOF on child socket");
289     }
290     goto x;
291   }
292
293 #define CHK(field, val)                                                   \
294   if (h->cmsg_##field != val) {                                           \
295     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
296     goto x;                                                               \
297   }
298   CHK(level, SOL_SOCKET);
299   CHK(type,  SCM_RIGHTS);
300   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
301 #undef CHK
302
303   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
304
305   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
306
307   pid_t got= waitpid(connecting_child, &status, 0);
308   if (got==-1) sysdie("connect: real wait for child");
309   assert(got == connecting_child);
310   connecting_child= 0;
311
312   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
313   int es= WEXITSTATUS(status);
314   switch (es) {
315   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
316   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
317   default:
318     die("connect: child gave unexpected exit status %d", es);
319   }
320
321   set nonblocking;
322
323   /* Phew! */
324   LIST_ADDHEAD(idle, conn);
325   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
326   connect_attempt_discard();
327   check_master_queue();
328   return 0;
329
330  x:
331   if (conn) {
332     perhaps_close(&conn->fd);
333     free(conn);
334   }
335   connect_attempt_discard();
336 }
337
338 static void connect_start() {
339   assert(!connecting_sockets[0]);
340   assert(!connecting_sockets[1]);
341   assert(!connecting_child);
342
343   notice("starting connection attempt");
344
345   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
346   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
347
348   connecting_child= fork();
349   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
350
351   if (!connecting_child) {
352     FILE *cn_from, *cn_to;
353     char buf[NNTP_STRLEN+100];
354     int exitstatus= CHILD_ESTATUS_NOSTREAM;
355
356     put sigpipe back;
357     close unwanted fds;
358
359     r= close(connecting_sockets[0]);
360     if (r) sysdie("connect: close parent socket in child");
361
362     alarm(connection_setup_timeout);
363     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
364       if (buf[0]) {
365         sanitise_inplace(buf);
366         die("connect: rejected: %s", buf);
367       } else {
368         sysdie("connect: connection attempt failed");
369       }
370     }
371     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
372       sysdie("connect: authentication failed");
373     if (try_stream) {
374       if (fputs("MODE STREAM\r\n", cn_to) ||
375           fflush(cn_to))
376         sysdie("connect: could not send MODE STREAM");
377       buf[sizeof(buf)-1]= 0;
378       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
379         if (ferror(cn_from))
380           sysdie("connect: could not read response to MODE STREAM");
381         else
382           die("connect: connection close in response to MODE STREAM");
383       }
384       int l= strlen(buf);
385       assert(l>=1);
386       if (buf[-1]!='\n') {
387         sanitise_inplace(buf);
388         die("connect: response to MODE STREAM is too long: %.100s...",
389             remote_host, buf);
390       }
391       l--;  if (l>0 && buf[1-]=='\r') l--;
392       buf[l]= 0;
393       char *ep;
394       int rcode= strtoul(buf,&ep,10);
395       if (ep != buf[3]) {
396         sanitise_inplace(buf);
397         die("connect: bad response to MODE STREAM: %.50s", buf);
398       }
399       switch (rcode) {
400       case 203:
401         exitstatus= CHILD_ESTATUS_STREAM;
402         break;
403       case 480:
404       case 500:
405         break;
406       default:
407         sanitise_inplace(buf);
408         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
409         exitstatus= 2;
410         break;
411       }
412     }
413     int fd= fileno(cn_from);
414
415     PREP_DECL_MSG_CMSG(msg);
416     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
417     cmsg->cmsg_level= SOL_SOCKET;
418     cmsg->cmsg_type=  SCM_RIGHTS;
419     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
420     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
421
422     msg.msg_controllen= cmsg->cmsg_len;
423     r= sendmsg(connecting_sockets[1], &msg, 0);
424     if (r) sysdie("sendmsg failed for new connection");
425
426     _exit(exitstatus);
427   }
428
429   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
430   if (r) sysdie("connect: close child socket in parent");
431
432   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
433   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
434   return OOP_CONTINUE;
435
436  x:
437   connect_attempt_discard();
438 }
439
440 /*========== overall control of article flow ==========*/
441
442 static void conn_check_work(Conn *conn);
443
444 static void check_master_queue(void) {
445   try reading current feed file;
446
447   if (!queue.count)
448     return;
449
450   Conn *last_assigned=0;
451   for (;;) {
452     if (working.head) {
453       conn_assign_one_article(&working, &last_assigned);
454     } else if (idle.head) {
455       conn_assign_one_article(&idle, &last_assigned);
456     } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
457                !connecting_child && !connect_delay) {
458       connect_delay= reconnect_delay_periods;
459       connect_start();
460     } else {
461       break;
462     }
463   }
464   conn_check_work(last_assigned);
465 }
466
/* Moves one article from the head of the master queue to the tail of
 * the queue of the first connection on *connlist, then re-files that
 * connection on whichever list conn_determine_right_list selects.
 *
 * *last_assigned tracks the connection most recently given work; when
 * it changes, the previous one gets its conn_check_work call.
 *
 * NOTE(review): LIST_ADD is not among the list macros defined in this
 * file (only LIST_ADDHEAD/LIST_ADDTAIL) - confirm which end is meant.
 */
static void conn_assign_one_article(LIST(Conn) *connlist,
                                    Conn **last_assigned) {
  Conn *conn= connlist->head;

  LIST_REMOVE(*connlist, conn);
  Article *art= LIST_REMHEAD(queue);
  LIST_ADDTAIL(conn->queue, art);
  LIST_ADD(*conn_determine_right_list(conn), conn);

  /* This slightly odd arrangement is so that we call conn_check_work
   * once after filling the queue for a new connection in
   * check_master_queue, rather than for each article. */
  if (conn != *last_assigned && *last_assigned)
    conn_check_work(*last_assigned);
  *last_assigned= conn;
}
483
484 static int conn_total_queued_articles(Conn *conn) {
485   return conn->sent.count + conn->queue.count;
486 }
487
488 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
489   int inqueue= conn_total_queued_articles(conn);
490   assert(inqueue <= max_queue);
491   if (inqueue == 0) return &idle;
492   if (inqueue == conn->max_queue) return &full;
493   return &working;
494 }
495
496 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
497   check_conn_work(u);
498   return OOP_CONTINUE;
499 }
500
501 static void conn_check_work(Conn *conn)  {
502   void *rp= 0;
503   for (;;) {
504     conn_make_some_xmits(conn);
505     if (!conn->xmitu) {
506       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
507       return;
508     }
509
510     void *rp= conn_write_some_xmits(conn);
511     if (rp==OOP_CONTINUE) {
512       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
513       return;
514     } else if (rp==OOP_HALT) {
515       return;
516     } else if (!rp) {
517       /* transmitted everything */
518     } else {
519       abort();
520     }
521   }
522 }
523
524 /*========== article transmission ==========*/
525
526 static void *conn_write_some_xmits(Conn *conn) {
527   /* return values:
528    *      0:            nothing more to write, no need to call us again
529    *      OOP_CONTINUE: more to write but fd not writeable
530    *      OOP_HALT:     disaster, have destroyed conn
531    */
532   for (;;) {
533     int count= conn->xmitu;
534     if (!count) return 0;
535
536     if (count > IOV_MAX) count= IOV_MAX;
537     ssize_t rs= writev(conn->fd, conn->xmit, count);
538     if (rs < 0) {
539       if (errno == EAGAIN) return OOP_CONTINUE;
540       syswarn(CN "write failed", conn->fd);
541       conn_failed(conn);
542       return OOP_HALT;
543     }
544     assert(rs > 0);
545
546     for (done=0; rs && done<xmitu; done++) {
547       struct iovec *vp= &conn->xmit[done];
548       XmitDetails *dp= &conn->xmitd[done];
549       if (rs > vp->iov_len) {
550         rs -= vp->iov_len;
551         xmit_free(dp);
552       } else {
553         vp->iov_base += rs;
554         vp->iov_len -= rs;
555       }
556     }
557     int newu= conn->xmitu - done;
558     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
559     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
560     conn->xmitu= newu;
561   }
562 }
563
564 static void conn_make_some_xmits(Conn *conn) {
565   for (;;) {
566     if (conn->xmitu+5 > CONNIOVS)
567       break;
568
569     Article *art= LIST_REMHEAD(queue);
570     if (!art) break;
571
572     if (art->checked || conn->nocheck) {
573       /* actually send it */
574
575       ARTHANDLE *artdata= SMretrieve(somehow);
576
577       if (conn->stream) {
578         if (artdata) {
579           XMIT_LITERAL("TAKETHIS ");
580           xmit_noalloc(art->mid, art->midlen);
581           XMIT_LITERAL("\r\n");
582           xmit_artbody(artdata);
583         }
584       } else {
585         /* we got 235 from IHAVE */
586         if (artdata) {
587           xmit_artbody(artdata);
588         } else {
589           XMIT_LITERAL(".\r\n");
590         }
591       }
592       art->sent= 1;
593       LIST_ADDTAIL(conn->sent, art);
594
595     } else {
596       /* check it */
597
598       if (conn->stream)
599         XMIT_LITERAL("IHAVE ");
600       else
601         XMIT_LITERAL("CHECK ");
602       xmit_noalloc(art->mid, art->midlen);
603       XMIT_LITERAL("\r\n");
604
605       LIST_ADDTAIL(conn->sent, art);
606     }
607   }
608 }
609
610 /*========== responses from peer ==========*/
611
/* Record style for reading peer responses: records are delimited by
 * '\n' with the delimiter stripped; the remaining two settings
 * forbid NULs and short records (going by the constant names). */
static const oop_rd_style peer_rd_style= {
  OOP_RD_DELIM_STRIP, '\n',
  OOP_RD_NUL_FORBID,
  OOP_RD_SHORTREC_FORBID
};
617
618 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
619                         const char *errmsg, int errnoval,
620                         const char *data, size_t recsz, void *conn_v) {
621   Conn *conn= conn_v;
622
623   if (ev == OOP_RD_EOF) {
624     warn("unexpected EOF from peer");
625     conn_failed(conn);
626     return;
627   }
628   assert(ev == OOP_RD_OK);
629
630   char *ep;
631   unsigned long code= strtoul(data, &ep, 10);
632   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
633     char sanibuf[100];
634     const char *p= data;
635     char *q= sanibuf;
636     *q++= '`';
637     for (;;) {
638       if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
639       int c= *p++;
640       if (!c) { *q++= '\''; break; }
641       if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
642       sprintf(q,"\\x%02x",c);
643       q += 4;
644     }
645     warn("badly formatted response from peer: %s", sanibuf);
646     conn_failed(conn);
647     return;
648   }
649
650   if (conn->quitting) {
651     if (code!=205) {
652       warn("peer gave failure response to QUIT: %s", sani);
653       conn_failed(conn);
654       return;
655     }
656     conn close ok;
657     return;
658   }
659
660   switch (code) {
661   case 438: /* CHECK says they have it */
662   case 435: /* IHAVE says they have it */
663     ARTICLE_DEALTWITH(1,unwanted);
664     break;
665
666   case 238: /* CHECK says send it */
667   case 335: /* IHAVE says send it */
668     count_checkedwanted++;
669     Article *art= LIST_REMHEAD(conn->sent);
670     art->checked= 1;
671     LIST_ADDTAIL(conn->queue);
672     break;
673
674   case 235: /* IHAVE says thanks */
675   case 239: /* TAKETHIS says thanks */
676     ARTICLE_DEALTWITH(1,accepted);
677     break;
678
679   case 439: /* TAKETHIS says rejected */
680   case 437: /* IHAVE says rejected */
681     ARTICLE_DEALTWITH(1,rejected);
682     break;
683
684   case 431: /* CHECK or TAKETHIS says try later */
685   case 436: /* IHAVE says try later */
686     ARTICLE_DEALTWITH(0,deferred);
687     break;
688
689   case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
690   case 503: warn("peer timed us out: %s", sani);                   goto failed;
691   default:  warn("peer sent unexpected message: %s", sani);
692   failed:
693     conn_failed(conn);
694     return OOP_CONTINUE;;
695   }
696
697   return OOP_CONTINUE;
698 }
699
700 /*========== monitoring of input file ==========*/
701
702 /*---------- tailing input file ----------*/
703
704 static void filemon_start(InputFile *ipf) {
705   assert(!ipf->filemon);
706
707   ipf->filemon= xmalloc(sizeof(*ipf->filemon));
708   memset(ipf->filemon, 0, sizeof(*ipf->filemon));
709   filemon_method_startfile(ipf, ipf->filemon);
710 }
711  
712 static void filemon_stop(InputFile *ipf) {
713   if (!ipf->filemon) return;
714   filemon_method_stopfile(ipf, ipf->filemon);
715   free(ipf->filemon);
716   ipf->filemon= 0;
717 }
718
719 static void filemon_callback(InputFile *ipf) {
720   ipf->readable_callback(ipf->readable_callback_user);
721 }
722  
723 static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
724                                      void *user) {
725   InputFile *ipf= user;
726   return ipf->readable_callback(ipf->readable_callback_user);
727 }
728
729 static void on_cancel(struct oop_readable *rable) {
730   InputFile *ipf= (void*)rable;
731
732   if (ipf->filemon) filemon_stopfile(ipf);
733   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
734 }
735
736 static int tailing_on_readable(struct oop_readable *rable,
737                                 oop_readable_call *cb, void *user) {
738   InputFile *ipf= (void*)rable;
739
740   tailing_on_cancel(rable);
741   ipf->readable_callback= cb;
742   ipf->readable_callback_user= user;
743   filemon_startfile(ipf);
744
745   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
746   return 0;
747 }
748
749 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
750                                 size_t length) {
751   InputFile *ipf= (void*)rable;
752   for (;;) {
753     ssize_t r= read(ipf->fd, buffer, length);
754     if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; }
755     if (r==-1 && errno==EINTR) continue;
756     return r;
757   }
758 }
759  
760 /*---------- filemon implemented with inotify ----------*/
761
762 #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
763 #define HAVE_FILEMON
764
765 #include <linux/inotify.h>
766
767 static int filemon_inotify_fd;
768 static int filemon_inotify_wdmax;
769 static InputFile **filemon_inotify_wd2ipf;
770
771 typedef struct Filemon_Perfile {
772   int wd;
773 } Filemon_Inotify_Perfile;
774
775 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
776   int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
777   if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
778
779   if (wd >= filemon_inotify_wdmax) {
780     int newmax= wd+2;
781     filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
782                                  sizeof(*filemon_inotify_wd2ipf) * newmax);
783     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
784            sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
785     filemon_inotify_wdmax= newmax;
786   }
787
788   assert(!filemon_inotify_wd2ipf[wd]);
789   filemon_inotify_wd2ipf[wd]= ipf;
790
791   pf->wd= wd;
792 }
793
794 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
795   int wd= pf->wd;
796   int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
797   if (r) sysdie("inotify_rm_watch");
798   filemon_inotify_wd2ipf[wd]= 0;
799 }
800
801 static void *filemon_inotify_readable(oop_source *lp, int fd,
802                                       oop_event e, void *u) {
803   struct inotify_event iev;
804   for (;;) {
805     int r= read(filemon_inotify_fd, &iev, sizeof(iev));
806     if (r==-1) {
807       if (errno==EAGAIN) break;
808       sysdie("read from inotify master");
809     } else if (r==sizeof(iev)) {
810       assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
811     } else {
812       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
813     }
814     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
815     filemon_callback(ipf);
816   }
817   return OOP_CONTINUE;
818 }
819
820 static int filemon_method_init(void) {
821   filemon_inotify_fd= inotify_init();
822   if (filemon_inotify_fd<0) {
823     syswarn("could not initialise inotify: inotify_init failed");
824     return 0;
825   }
826   set nonblock;
827   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
828
829   return 1;
830 }
831
832 #endif /* HAVE_INOTIFY && !HAVE_FILEMON */
833
834 /*---------- filemon dummy implementation ----------*/
835
836 #if !defined(HAVE_FILEMON)
837
838 typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile;
839
840 static int filemon_method_init(void) { return 0; }
841 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
842 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
843
844 #endif /* !HAVE_FILEMON */
845
846 /*---------- interface to start and stop an input file ----------*/
847
/* Record style for reading feed files: records are delimited by
 * '\n' with the delimiter stripped; the remaining two settings
 * forbid NULs and treat a short record as EOF (going by the constant
 * names). */
static const oop_rd_style feedfile_rdstyle= {
  OOP_RD_DELIM_STRIP, '\n',
  OOP_RD_NUL_FORBID,
  OOP_RD_SHORTREC_EOF,
};
853  
854 static void inputfile_tailing_start(InputFile *ipf) {
855   assert(!ipf->fd);
856   ipf->readable->on_readable= tailing_on_readable;
857   ipf->readable->on_cancel=   tailing_on_cancel;
858   ipf->readable->try_read=    tailing_try_read;
859   ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
860   ipf->readable->delete_kill= 0;
861
862   ipf->readable_callback= 0;
863   ipf->readable_callback_user= 0;
864
865   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
866   assert(ipf->fd);
867
868   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
869                      feedfile_got_article,ipf, feedfile_problem,ipf);
870   if (r) sysdie("unable start reading feedfile %s",ipf->path);
871 }
872
873 static void inputfile_tailing_stop(InputFile *ipf) { /* stop reading ipf by deleting its oop_read reader */
874   assert(ipf->fd); /* NOTE(review): 0 is a valid descriptor and oop_rd_delete below needs ipf->rd, so asserting ipf->rd may be what is intended - confirm */
875   oop_rd_delete(ipf->rd);
876   ipf->rd= 0; /* no reader attached any more */
877   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
878 }
879
880 /*========== interaction with innd ==========*/
881
882 /* See official state diagram at top of file.  We implement
883  * this as follows:
884  *
885            ================
886             WAITING
887            [Nothing/Noduct]
888             poll for F
889            ================
890                 |
891                 |     TIMEOUT
892                 |`--------------------------.
893                 |                           | install defer as backlog
894      ,--------->|                           | exit
895      |          | OPEN F SUCCEEDS           V
896      |          V                         =========
897      |     ========                        (ESRCH)
898      |      NORMAL                        [Dropped]
899      |     [Normal]                       =========
900      |      read F
901      |     ========
902      |          |
903      |          | F IS SO BIG WE SHOULD FLUSH
904      ^          | hardlink F to D
905      |     [Hardlinked]
906      |          | unlink F
907      |          | our handle onto F is now onto D
908      |     [Moved]
909      |          |
910      |          |<---------------------------------------------------.
911      |          |                                                    |
912      |          | spawn inndcomm flush                               |
913      |          V                                                    |
914      |     ==========                                                |
915      |      FLUSHING                                                 |
916      |     [Flushing]                                                |
917      |      read D                                                   |
918      |     ==========                                                |
919      |          |                                                    |
920      |          |   INNDCOMM FLUSH FAILS                             ^
921      |          |`----------------------->--------.                  |
922      |          |                                 |                  |
923      |          |   NO SUCH SITE                  V                  |
924      ^          |`----------------.            =========             |
925      |          |                 |            FLUSHFAIL             |
926      |          |                 V            [Moved]               |
927      |          |            ==========        read D                |
928      |          |             DROPPING         =========             |
929      |          |            [Dropping]           |                  |
930      |          |             read D              | TIME TO RETRY    |
931      |          |            ==========           `------------------'
932      |          | FLUSH OK        |
933      |          | open F          | AT EOF OF D AND ALL PROCESSED
934      |          V                 | install defer as backlog
935      |     ===========            | unlink D
936      |      SEPARATED             | exit
937      |     [Separated]            V
938      |      read D            ==========
939      |     ===========         (ESRCH)
940      |          |             [Dropped]
941      |          |             ==========
942      |          V
943      |          | AT EOF OF D
944      ^          |
945      |     ===========
946      |      FINISHING
947      |     [Finishing]
948      |      read F
949      |      write D
950      |     ===========
951      |          |
952      |          | ALL D PROCESSED
953      |          | install defer as backlog
954      |          | start new defer
955      ^          V unlink D
956      |          | close D
957      |          |
958      `----------'
959
960  *
961  */
962
963 static char *path_ductlock, *path_duct, *path_ductdefer; /* "<feedfile>_duct.lock", "<feedfile>_duct", "<feedfile>_duct.defer"; filled in by statemc_init */
964
965 typedef struct { /* state for one input file being read through liboop */
966   /* This is an instance of struct oop_readable */
967   struct oop_readable readable; /* first */
968   oop_readable_call *readable_callback; /* callback slot, cleared to 0 by inputfile_tailing_start */
969   void *readable_callback_user; /* user pointer paired with readable_callback */
970
971   int fd; /* file descriptor of the input file */
972   const char *path; /* ptr copy of path_<foo> or feedfile */
973   struct Filemon_Perfile *filemon; /* per-file monitoring state; expected to be null by the time tailing is stopped */
974
975   oop_read *rd; /* reader created by oop_rd_new; reset to 0 when tailing is stopped */
976   long inprogress; /* no. of articles read but not processed */
977 } InputFile;
978
979 static void statemc_init(void) { /* build the duct path names from feedfile and take the duct lock; dies if the lock cannot be obtained */
980   path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
981   path_duct=      xasprintf("%s_duct",       feedfile);
982   path_ductdefer= xasprintf("%s_duct.defer", feedfile);
983
984   int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); /* lockfd is not closed here; presumably so the lock stays held for the life of the process - confirm */
985   if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
986
987   struct flock fl;
988   memset(&fl,0,sizeof(fl)); /* leaves l_start=0 and l_len=0, i.e. the lock covers the whole file */
989   fl.l_type= F_WRLCK;
990   fl.l_whence= SEEK_SET;
991   int r= fcntl(lockfd, F_SETLK, &fl); /* F_SETLK does not block: it fails at once if the lock is held elsewhere */
992   if (r==-1) {
993     if (errno==EACCES || errno==EAGAIN) /* the errno values POSIX allows for "locked by another process" */
994       die("another duct holds the lockfile");
995     sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
996   }
997 }
998
999 static void statemc_poll(void) { /* look for the duct file D and feed file F and start tailing; NOTE: unfinished in this revision - the D-exists case and the closing brace are missing */
1000   if (tailing_fd>=0) return; /* presumably: a file is already being tailed, so there is nothing to do */
1001
1002   int d_fd= open(path_duct, O_RDWR);
1003   if (d_fd<0)
1004     if (errno!=ENOENT) sysdie("open duct file %s", path_duct); /* a missing D is tolerated; any other open error is fatal */
1005
1006   int f_fd= open(feedfile, O_RDWR);
1007   if (f_fd<0)
1008     if (errno!=ENOENT) sysdie("open feed file %s", feedfile); /* likewise a missing F is tolerated */
1009
1010   if (d_fd<0) { /* D does not exist */
1011     if (f_fd>=0) /* F was opened: hand its descriptor to start_tailing */
1012       start_tailing(f_fd);
1013     return;
1014   }
1015
1016
1017
1018 /*========== main program ==========*/
1019
1020 #define EVERY(what, interval, body)                                          \
1021   static const struct timeval what##_timeout = { 5, 0 };                     \
1022   static void what##_schedule(void);                                         \
1023   static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
1024     { body }                                                                 \
1025     what##_schedule();                                                       \
1026   }                                                                          \
1027   static void what##_schedule(void) {                                        \
1028     loop->on_time(loop, what##_timeout, what##_timedout, 0);                 \
1029   }
1030
1031 EVERY(filepoll, {5,0}, { check_master_queue(); })
1032
1033 EVERY(period, {PERIOD_SECONDS,0}, {
1034   if (connect_delay) connect_delay--;
1035   statemc_poll();
1036   check_master_queue();
1037 });
1038
1039 main { /* NOTE: placeholder pseudo-code in this revision, not yet valid C */
1040   ignore sigpipe; /* pseudo-code: SIGPIPE is to be ignored */
1041   if (!filemon_init()) /* no file monitoring available ... */
1042     filepoll_schedule(); /* ... so fall back to the timed filepoll check */
1043   period_schedule(); /* always start the periodic timer */
1044 };