X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=51b423303fb4023f3008554e64db0a6965d2afb1;hb=81f2b44ebdc2ec86a9b5910fb2af3d9e9d5effa9;hp=610504e8d84c0dc858d424d3ca890f753b7a3cc8;hpb=ece47a280c15bca137b918d8fa9695e7c38c25df;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 610504e..51b4233 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -2,54 +2,127 @@ * Four files full of * token article * - * site.name_ductlock lock taken out by innduct - * F site.name written by innd - * D site.name_duct moved aside by innduct, for flush - * site.name_deferwork 431'd articles, still being written - * site.name_defergo_ 431'd articles, ready for innxmit - * site.name_deferlock lock taken out by innxmit wrapper - * - * - * - * OVERALL STATES: + * site.name_duct.lock lock preventing multiple ducts + * holder of this lock is "duct" + * F site.name main feed file + * opened/created, then written, by innd + * read by duct + * unlinked by duct + * tokens blanked out by duct when processed + * D site.name_duct temporary feed file during flush (or crash) + * hardlink created by duct + * unlinked by duct + * site.name_duct.defer 431'd articles, still being written, + * created, written, used by duct + * site.name_backlog.lock lock taken out by innxmit wrapper + * holder and its child are "xmit" + * site.name_backlog_ 431'd articles, ready for innxmit + * created (link/mv) by duct + * read by xmit + * unlinked by xmit + * site.name_backlog_ eg + * site.name_backlog_manual + * anything the sysadmin likes (eg, feed files + * from old feeds to be merged into this one) + * created (link/mv) by admin + * read by xmit + * unlinked by xmit + + + OVERALL STATES: + + START + | + check D, F + | + <--------------------------------------'| + Nothing F, D both ENOENT | + F: ENOENT | + D: ENOENT | + duct: not not reading anything | + | | + |`---------------------. | + | | duct times out waiting for F | + V innd creates F | duct exits | + | V | + Noduct GO TO Dropped | + F: innd writing | + D: ENOENT | + duct: not running or not reading anything | + | | + | | + ,-->--+ <---------------------------------'| + | | duct opens F F exists | + | | D ENOENT | + | V | + | Normal | + | F: innd writing, duct reading | + | D: ENOENT | + | | | + | | duct decides time to flush | + | | duct makes hardlink | + | | | + | V <------------------------'| + | Hardlinked F==D | + | F == D: innd writing, duct reading both exist | + ^ | | + | | duct unlinks F | + | V | + | Moved <----+------------<--'| + | F: ENOENT | F ENOENT | + | D: innd writing, duct reading | D exists | + | | | | + | | duct requests flush of feed | | + | | (others can too, harmlessly) | | + | V | | + | Flushing | | + | F: ENOENT | | + | D: innd flushing, duct reading | | + | | | | + | | inndcomm flush fails | | + | |`-------------------------->---------' | + | | | + | | inndcomm reports no such site | + | |`---------------------------------------------------- | -. + | | | | + | | innd finishes writing D, creates F | | + | | inndcomm reports flush successful | | + | | | | + | V | | + | Separated <----------------' | + | F: innd writing F!=D / + | D: duct reading both exist / + | | / + | | duct gets to the end of D / + | | duct opens F too / + | V / + | Finishing / + | F: innd writing, duct reading | + | D: duct finishing V + | | Dropping + | | duct finishes processing D F: ENOENT + | V duct unlinks D D: duct reading + | | | + `--<--' | duct finishes + | processing D + | duct unlinks D + | duct exits + V + Dropped + F: ENOENT + D: ENOENT + duct not running + + "duct reading" means innduct is reading the file but also + overwriting processed tokens. + * - * START - * ,-->--. | - * | | stat D - * | | / | - * | | ENOENT/ |exists - * | V <----------' | - * | Normal stat F - * | F: innd writing, duct reading /|\ - * | D: ENOENT / | \ - * | | / | \ - * | | duct decides time to flush same / | | - * | | duct makes hardlink as D / | | - * | | / | | - * | V <---------' | | - * | Hardlinked | | - * | F == D: innd writing, duct reading | | - * ^ | | | - * | | duct unlinks F / | - * | V ENOENT / | - * | Moved <------------' | - * | F: ENOENT | - * | D: innd writing, duct reading | - * | | | - * | | duct flushes feed | - * | | (others can too, harmlessly) | - * | V | - * | Separated <-----------------' - * | F: innd writing different to D - * | D: duct reading - * | | - * | V duct completes processing of D - * | | duct unlinks D - * | | - * `--<--' * */ +#define PERIOD_SECONDS 60 + +static char *feedfile; static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; static const char *remote_host; @@ -76,23 +149,42 @@ static const char *remote_host; struct Article { char *mid; int midlen; - int nocheck; /* also used when CHECK says yes please */ + int checked, sentbody; + fd and offset for blanking token or mid; }; -#define CONNBUFSZ 16384 +#define CONNIOVS 128 #define CN "<%d> " typedef struct Conn Conn; + +typedef enum { + Malloc, Const, Artdata; +} XmitKind; + +typedef struct { + XmitKind kind; + union { + char *malloc_tofree; + ARTHANDLE *sm_art; + } info; +} XmitDetails; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; - LIST(Article) queue; - Article *tosend; /* points into queue */ - char circ_buf[CONNBUFSZ]; - unsigned circ_read, circ_write; + LIST(Article) queue; /* not yet told peer, or CHECK said send it */ + LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */ + struct iovec xmit[CONNIOVS]; + XmitDetails xmitd[CONNIOVS]; + int xmitu; }; +static int filemon_init(void); +static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); +static void filemon_callback(void); + #define CHILD_ESTATUS_STREAM 4 #define CHILD_ESTATUS_NOSTREAM 5 @@ -105,7 +197,6 @@ static LIST(Article) *queue; static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } - /*========== making new connections ==========*/ static int connecting_sockets[2]= {-1,-1}; @@ -165,7 +256,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { Conn *conn= 0; conn= xcalloc(sizeof(*conn)); - + DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); @@ -233,7 +324,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_ADDHEAD(idle, conn); notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); - process_queue(); + check_master_queue(); return 0; x: @@ -347,65 +438,607 @@ static void connect_start() { } /*========== overall control of article flow ==========*/ - -static void process_queue() { + +static void conn_check_work(Conn *conn); + +static void check_master_queue(void) { + try reading current feed file; + if (!queue.count) return; - if (working.head) { - transmit(working.head); - } else if (idle.head) { - transmit(idle.head); - } else if (nconns < maxconns && queue.count >= max_queue_per_conn && - !connecting_child && !connect_delay) { - connect_delay= reconnect_delay_periods; - connect_start(); + Conn *last_assigned=0; + for (;;) { + if (working.head) { + conn_assign_one_article(&working, &last_assigned); + } else if (idle.head) { + conn_assign_one_article(&idle, &last_assigned); + } else if (nconns < maxconns && queue.count >= max_queue_per_conn && + !connecting_child && !connect_delay) { + connect_delay= reconnect_delay_periods; + connect_start(); + } else { + break; + } } -} - -/*========== article transmission ==========*/ + conn_check_work(last_assigned); +} + +static void conn_assign_one_article(LIST(Conn) *connlist, + Conn **last_assigned) { + Conn *conn= connlist->head; + + LIST_REMOVE(*connlist, conn); + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(conn->queue, art); + LIST_ADD(*conn_determine_right_list(conn), conn); + + /* This slightly odd arrangement is so that we call conn_check_work + * once after filling the queue for a new connection in + * check_master_queue, rather than for each article. */ + if (conn != *last_assigned && *last_assigned) + conn_check_work(*last_assigned); + *last_assigned= conn; +} + +static int conn_total_queued_articles(Conn *conn) { + return conn->sent.count + conn->queue.count; +} + +static LIST(Conn) *conn_determine_right_list(Conn *conn) { + int inqueue= conn_total_queued_articles(conn); + assert(inqueue <= max_queue); + if (inqueue == 0) return &idle; + if (inqueue == conn->max_queue) return &full; + return &working; +} + +static void *conn_writeable(oop_source *l, int fd, int ev, void *u) { + check_conn_work(u); + return OOP_CONTINUE; +} -static void *conn_writeable() { +static void conn_check_work(Conn *conn) { + void *rp= 0; for (;;) { - int circ_used= circ_write - circ_read; - if (circ_used < 0) circ_used += CONNBUFSZ; - writeable_moredata(conn, CONNBUFSZ-1 - circ_used); - - if (conn->circ_read == conn->circ_write) - return OOP_CONTINUE; - - struct iovec iov[2]; - int niov= 1; - iov[0].iov_base= conn->circ_buf + conn->circ_read; - if (conn->circ_read > conn->circ_write) { /* wrapped */ - iov[0].iov_len= CONNBUFSZ - conn->circ_read; - iov[1].iov_base= conn->circ_buf; - iov[1].iov_len= conn->circ_write; - if (niov[1].iov_len) niov= 2; + conn_make_some_xmits(conn); + if (!conn->xmitu) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + return; + } + + void *rp= conn_write_some_xmits(conn); + if (rp==OOP_CONTINUE) { + loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); + return; + } else if (rp==OOP_HALT) { + return; + } else if (!rp) { + /* transmitted everything */ } else { - iov[0].iov_len= conn->circ_write - conn->circ_read; + abort(); } - ssize_t rs= writev(conn->fd, &iov, niov); + } +} + +/*========== article transmission ==========*/ + +static void *conn_write_some_xmits(Conn *conn) { + /* return values: + * 0: nothing more to write, no need to call us again + * OOP_CONTINUE: more to write but fd not writeable + * OOP_HALT: disaster, have destroyed conn + */ + for (;;) { + int count= conn->xmitu; + if (!count) return 0; + + if (count > IOV_MAX) count= IOV_MAX; + ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { if (errno == EAGAIN) return OOP_CONTINUE; syswarn(CN "write failed", conn->fd); conn_failed(conn); - return OOP_CONTINUE; + return OOP_HALT; } assert(rs > 0); - conn->circ_read += rs; - if (conn->circ_read > CONNBUFSZ) - conn->circ_read -= CONNBUFSZ; + for (done=0; rs && donexmit[done]; + XmitDetails *dp= &conn->xmitd[done]; + if (rs > vp->iov_len) { + rs -= vp->iov_len; + xmit_free(dp); + } else { + vp->iov_base += rs; + vp->iov_len -= rs; + } + } + int newu= conn->xmitu - done; + memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit)); + memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd)); + conn->xmitu= newu; + } +} + +static void conn_make_some_xmits(Conn *conn) { + for (;;) { + if (conn->xmitu+5 > CONNIOVS) + break; + + Article *art= LIST_REMHEAD(queue); + if (!art) break; + + if (art->checked || conn->nocheck) { + /* actually send it */ + + ARTHANDLE *artdata= SMretrieve(somehow); + + if (conn->stream) { + if (artdata) { + XMIT_LITERAL("TAKETHIS "); + xmit_noalloc(art->mid, art->midlen); + XMIT_LITERAL("\r\n"); + xmit_artbody(artdata); + } + } else { + /* we got 235 from IHAVE */ + if (artdata) { + xmit_artbody(artdata); + } else { + XMIT_LITERAL(".\r\n"); + } + } + art->sent= 1; + LIST_ADDTAIL(conn->sent, art); + + } else { + /* check it */ + + if (conn->stream) + XMIT_LITERAL("IHAVE "); + else + XMIT_LITERAL("CHECK "); + xmit_noalloc(art->mid, art->midlen); + XMIT_LITERAL("\r\n"); + + LIST_ADDTAIL(conn->sent, art); + } + } +} + +/*========== responses from peer ==========*/ + +static const oop_rd_style peer_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { + Conn *conn= conn_v; + + if (ev == OOP_RD_EOF) { + warn("unexpected EOF from peer"); + conn_failed(conn); + return; + } + assert(ev == OOP_RD_OK); + + char *ep; + unsigned long code= strtoul(data, &ep, 10); + if (ep != data+3 || *ep != ' ' || data[0]=='0') { + char sanibuf[100]; + const char *p= data; + char *q= sanibuf; + *q++= '`'; + for (;;) { + if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; } + int c= *p++; + if (!c) { *q++= '\''; break; } + if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; } + sprintf(q,"\\x%02x",c); + q += 4; + } + warn("badly formatted response from peer: %s", sanibuf); + conn_failed(conn); + return; + } + + if (conn->quitting) { + if (code!=205) { + warn("peer gave failure response to QUIT: %s", sani); + conn_failed(conn); + return; + } + conn close ok; + return; + } + + switch (code) { + case 438: /* CHECK says they have it */ + case 435: /* IHAVE says they have it */ + ARTICLE_DEALTWITH(1,unwanted); + break; + + case 238: /* CHECK says send it */ + case 335: /* IHAVE says send it */ + count_checkedwanted++; + Article *art= LIST_REMHEAD(conn->sent); + art->checked= 1; + LIST_ADDTAIL(conn->queue); + break; + + case 235: /* IHAVE says thanks */ + case 239: /* TAKETHIS says thanks */ + ARTICLE_DEALTWITH(1,accepted); + break; + + case 439: /* TAKETHIS says rejected */ + case 437: /* IHAVE says rejected */ + ARTICLE_DEALTWITH(1,rejected); + break; + + case 431: /* CHECK or TAKETHIS says try later */ + case 436: /* IHAVE says try later */ + ARTICLE_DEALTWITH(0,deferred); + break; + + case 400: warn("peer has stopped accepting articles: %s", sani); goto failed; + case 503: warn("peer timed us out: %s", sani); goto failed; + default: warn("peer sent unexpected message: %s", sani); + failed: + conn_failed(conn); + return OOP_CONTINUE;; + } + + return OOP_CONTINUE; +} + +/*========== monitoring of input file ==========*/ + +/*---------- tailing input file ----------*/ + +static void filemon_start(InputFile *ipf) { + assert(!ipf->filemon); + + ipf->filemon= xmalloc(sizeof(*ipf->filemon)); + memset(ipf->filemon, 0, sizeof(*ipf->filemon)); + filemon_method_startfile(ipf, ipf->filemon); +} + +static void filemon_stop(InputFile *ipf) { + if (!ipf->filemon) return; + filemon_method_stopfile(ipf, ipf->filemon); + free(ipf->filemon); + ipf->filemon= 0; +} + +static void filemon_callback(InputFile *ipf) { + ipf->readable_callback(ipf->readable_callback_user); +} + +static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, + void *user) { + InputFile *ipf= user; + return ipf->readable_callback(ipf->readable_callback_user); +} + +static void on_cancel(struct oop_readable *rable) { + InputFile *ipf= (void*)rable; + + if (ipf->filemon) filemon_stopfile(ipf); + loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); +} + +static int tailing_on_readable(struct oop_readable *rable, + oop_readable_call *cb, void *user) { + InputFile *ipf= (void*)rable; + + tailing_on_cancel(rable); + ipf->readable_callback= cb; + ipf->readable_callback_user= user; + filemon_startfile(ipf); + + loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + return 0; +} + +static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, + size_t length) { + InputFile *ipf= (void*)rable; + for (;;) { + ssize_t r= read(ipf->fd, buffer, length); + if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; } + if (r==-1 && errno==EINTR) continue; + return r; + } +} + +/*---------- filemon implemented with inotify ----------*/ + +#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON) +#define HAVE_FILEMON + +#include + +static int filemon_inotify_fd; +static int filemon_inotify_wdmax; +static InputFile **filemon_inotify_wd2ipf; + +typedef struct Filemon_Perfile { + int wd; +} Filemon_Inotify_Perfile; + +static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { + int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); + if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); + + if (wd >= filemon_inotify_wdmax) { + int newmax= wd+2; + filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf, + sizeof(*filemon_inotify_wd2ipf) * newmax); + memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, + sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); + filemon_inotify_wdmax= newmax; + } + + assert(!filemon_inotify_wd2ipf[wd]); + filemon_inotify_wd2ipf[wd]= ipf; + + pf->wd= wd; +} + +static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { + int wd= pf->wd; + int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); + if (r) sysdie("inotify_rm_watch"); + filemon_inotify_wd2ipf[wd]= 0; +} + +static void *filemon_inotify_readable(oop_source *lp, int fd, + oop_event e, void *u) { + struct inotify_event iev; + for (;;) { + int r= read(filemon_inotify_fd, &iev, sizeof(iev)); + if (r==-1) { + if (errno==EAGAIN) break; + sysdie("read from inotify master"); + } else if (r==sizeof(iev)) { + assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); + } else { + die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); + } + InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + filemon_callback(ipf); } + return OOP_CONTINUE; } +static int filemon_method_init(void) { + filemon_inotify_fd= inotify_init(); + if (filemon_inotify_fd<0) { + syswarn("could not initialise inotify: inotify_init failed"); + return 0; + } + set nonblock; + loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + return 1; +} + +#endif /* HAVE_INOTIFY && !HAVE_FILEMON *// + +/*---------- filemon dummy implementation ----------*/ + +#if !defined(HAVE_FILEMON) + +typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile; + +static int filemon_method_init(void) { return 0; } +static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { } +static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { } + +#endif /* !HAVE_FILEMON */ + +/*---------- interface to start and stop an input file ----------*/ + +static const oop_rd_style feedfile_rdstyle= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_EOF, +}; -static void transmit(Conn *conn) { - assert(conn->queue.count < max_queue); - +static void inputfile_tailing_start(InputFile *ipf) { + assert(!ipf->fd); + ipf->readable->on_readable= tailing_on_readable; + ipf->readable->on_cancel= tailing_on_cancel; + ipf->readable->try_read= tailing_try_read; + ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ + ipf->readable->delete_kill= 0; + + ipf->readable_callback= 0; + ipf->readable_callback_user= 0; + + ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); + assert(ipf->fd); + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_problem,ipf); + if (r) sysdie("unable start reading feedfile %s",ipf->path); +} + +static void inputfile_tailing_stop(InputFile *ipf) { + assert(ipf->fd); + oop_rd_delete(ipf->rd); + ipf->rd= 0; + assert(!ipf->filemon); /* we shouldn't be monitoring it now */ +} + +/*========== interaction with innd ==========*/ + +/* See official state diagram at top of file. We implement + * this as follows: + * + ================ + WAITING + [Nothing/Noduct] + poll for F + ================ + | + | TIMEOUT + |`--------------------------. + | | install defer as backlog + ,--------->| | exit + | | OPEN F SUCCEEDS V + | V ========= + | ======== (ESRCH) + | NORMAL [Dropped] + | [Normal] ========= + | read F + | ======== + | | + | | F IS SO BIG WE SHOULD FLUSH + ^ | hardlink F to D + | [Hardlinked] + | | unlink F + | | our handle onto F is now onto D + | [Moved] + | | + | |<---------------------------------------------------. + | | | + | | spawn inndcomm flush | + | V | + | ========== | + | FLUSHING | + | [Flushing] | + | read D | + | ========== | + | | | + | | INNDCOMM FLUSH FAILS ^ + | |`----------------------->--------. | + | | | | + | | NO SUCH SITE V | + ^ |`----------------. ========= | + | | | FLUSHFAIL | + | | V [Moved] | + | | ========== read D | + | | DROPPING ========= | + | | [Dropping] | | + | | read D | TIME TO RETRY | + | | ========== `------------------' + | | FLUSH OK | + | | open F | AT EOF OF D AND ALL PROCESSED + | V | install defer as backlog + | =========== | unlink D + | SEPARATED | exit + | [Separated] V + | read D ========== + | =========== (ESRCH) + | | [Droppped] + | | ========== + | V + | | AT EOF OF D + ^ | + | =========== + | FINISHING + | [Finishing] + | read F + | write D + | =========== + | | + | | ALL D PROCESSED + | | install defer as backlog + | | start new defer + ^ V unlink D + | | close D + | | + `----------' + + * + */ + +static char *path_ductlock, *path_duct, *path_ductdefer; + +typedef struct { + /* This is an instance of struct oop_readable */ + struct oop_readable readable; /* first */ + oop_readable_call *readable_callback; + void *readable_callback_user; + + int fd; + const char *path; /* ptr copy of path_ or feedfile */ + struct Filemon_Perfile *filemon; + + oop_read *rd; + long inprogress; /* no. of articles read but not processed */ +} InputFile; + +static void statemc_init(void) { + path_ductlock= xasprintf("%s_duct.lock", feedfile); + path_duct= xasprintf("%s_duct", feedfile); + path_ductdefer= xasprintf("%s_duct.defer", feedfile); + + int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); + if (lockfd<0) sysdie("open lockfile %s", path_ductlock); + + struct flock fl; + memset(&fl,0,sizeof(fl)); + fl.l_type= F_WRLCK; + fl.l_whence= SEEK_SET; + r= fcntl(lockfd, F_SETLK, &fl); + if (r==-1) { + if (errno==EACCES || errno==EAGAIN) + die("another duct holds the lockfile"); + sysdie("fcntl F_SETLK lockfile %s", path_ductlock); + } +} + +static void statemc_poll(void) { + if (tailing_fd>=0) return; + + int d_fd= open(path_duct, O_RDWR); + if (d_fd<0) + if (errno!=ENOENT) sysdie("open duct file %s", path_duct); + + int f_fd= open(feedfile, O_RDWR); + if (f_fd<0) + if (errno!=ENOENT) sysdie("open feed file %s", feedfile); + + if (d_fd<0) { + if (f_fd>=0) + start_tailing(f_fd); + return; + } + + + +/*========== main program ==========*/ + +#define EVERY(what, interval, body) \ + static const struct timeval what##_timeout = { 5, 0 }; \ + static void what##_schedule(void); \ + static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ + { body } \ + what##_schedule(); \ + } \ + static void what##_schedule(void) { \ + loop->on_time(loop, what##_timeout, what##_timedout, 0); \ + } + +EVERY(filepoll, {5,0}, { check_master_queue(); }) + +EVERY(period, {PERIOD_SECONDS,0}, { + if (connect_delay) connect_delay--; + statemc_poll(); + check_master_queue(); +}); main { ignore sigpipe; + if (!filemon_init()) + filepoll_schedule(); + period_schedule(); };