X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;ds=sidebyside;f=backends%2Finnduct.c;h=a2a4ec60a27b46948fb88f74c65c441418ec159b;hb=89b4d7115aba99970fcd4076f39ad4bc1944594a;hp=65417c91aaac29eae95b4b3a135e138c00ebb9b3;hpb=3cfc1f3827207ca05205357a5cacd710eed6c861;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 65417c9..a2a4ec6 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -2,25 +2,53 @@ * Four files full of * token article * - * site.name_ductlock lock taken out by innduct - * F site.name written by innd - * D site.name_duct moved aside by innduct, for flush - * site.name_deferwork 431'd articles, still being written - * site.name_defergo_ 431'd articles, ready for innxmit - * site.name_deferlock lock taken out by innxmit wrapper - * + * site.name_duct.lock lock preventing multiple ducts + * holder of this lock is "duct" + * F site.name main feed file + * opened/created, then written, by innd + * read by duct + * unlinked by duct + * tokens blanked out by duct when processed + * D site.name_duct temporary feed file during flush (or crash) + * hardlink created by duct + * unlinked by duct + * site.name_duct.defer 431'd articles, still being written, + * created, written, used by duct + * site.name_backlog.lock lock taken out by innxmit wrapper + * holder and its child are "xmit" + * site.name_backlog_ 431'd articles, ready for innxmit + * created (link/mv) by duct + * read by xmit + * unlinked by xmit + * site.name_backlog_ eg + * site.name_backlog_manual + * anything the sysadmin likes (eg, feed files + * from old feeds to be merged into this one) + * created (link/mv) by admin + * read by xmit + * unlinked by xmit * * * OVERALL STATES: * * START * ,-->--. | - * | | stat D + * | | check D * | | / | * | | ENOENT/ |exists - * | V <----------' | - * | Normal stat F - * | F: innd writing, duct reading /|\ + * | V <----------+--<----------------------' | + * | None | | + * | F: unopened | | + * | D: ENOENT | | + * | | | | + * | repeatedly | | + * | open F ------->-' | + * | | ENOENT | + * | |OK | + * | | | + * | V | + * | Normal check F + * | F: innd writing, duct reading / editing /|\ * | D: ENOENT / | \ * | | / | \ * | | duct decides time to flush same / | | @@ -28,20 +56,20 @@ * | | / | | * | V <---------' | | * | Hardlinked | | - * | F == D: innd writing, duct reading | | + * | F == D: innd writing, duct reading / editing | | * ^ | | | * | | duct unlinks F / | * | V ENOENT / | * | Moved <------------' | * | F: ENOENT | - * | D: innd writing, duct reading | + * | D: innd writing, duct reading / editing | * | | | - * | | duct flushes feed | + * | | duct requests flush of feed | * | | (others can too, harmlessly) | * | V | * | Separated <-----------------' * | F: innd writing different to D - * | D: duct reading + * | D: duct reading / editing * | | * | V duct completes processing of D * | | duct unlinks D @@ -50,6 +78,9 @@ * */ +#define PERIOD_SECONDS 60 + +static char *feedfile; static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; static const char *remote_host; @@ -108,6 +139,10 @@ struct Conn { int xmitu; }; +static int filemon_init(void); +static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); +static void filemon_callback(void); + #define CHILD_ESTATUS_STREAM 4 #define CHILD_ESTATUS_NOSTREAM 5 @@ -120,7 +155,6 @@ static LIST(Article) *queue; static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } - /*========== making new connections ==========*/ static int connecting_sockets[2]= {-1,-1}; @@ -180,7 +214,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { Conn *conn= 0; conn= xcalloc(sizeof(*conn)); - + DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); @@ -362,10 +396,12 @@ static void connect_start() { } /*========== overall control of article flow ==========*/ - + static void conn_check_work(Conn *conn); static void check_master_queue(void) { + try reading current feed file; + if (!queue.count) return; @@ -384,8 +420,8 @@ static void check_master_queue(void) { } } conn_check_work(last_assigned); -} - +} + static void conn_assign_one_article(LIST(Conn) *connlist, Conn **last_assigned) { Conn *conn= connlist->head; @@ -407,7 +443,7 @@ static int conn_total_queued_articles(Conn *conn) { return conn->sent.count + conn->queue.count; } -static LIST(Conn) *conn_determine_right_list(Conn *conn) { +static LIST(Conn) *conn_determine_right_list(Conn *conn) { int inqueue= conn_total_queued_articles(conn); assert(inqueue <= max_queue); if (inqueue == 0) return &idle; @@ -419,7 +455,7 @@ static void *conn_writeable(oop_source *l, int fd, int ev, void *u) { check_conn_work(u); return OOP_CONTINUE; } - + static void conn_check_work(Conn *conn) { void *rp= 0; for (;;) { @@ -442,7 +478,7 @@ static void conn_check_work(Conn *conn) { } } } - + /*========== article transmission ==========*/ static void *conn_write_some_xmits(Conn *conn) { @@ -454,7 +490,7 @@ static void *conn_write_some_xmits(Conn *conn) { for (;;) { int count= conn->xmitu; if (!count) return 0; - + if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { @@ -495,7 +531,7 @@ static void conn_make_some_xmits(Conn *conn) { /* actually send it */ ARTHANDLE *artdata= SMretrieve(somehow); - + if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); @@ -516,7 +552,7 @@ static void conn_make_some_xmits(Conn *conn) { } else { /* check it */ - + if (conn->stream) XMIT_LITERAL("IHAVE "); else @@ -617,8 +653,148 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } return OOP_CONTINUE; -} - +} + +/*========== monitoring of input file ==========*/ + +/*---------- tailing input file ----------*/ + + + +/*---------- filemon implemented with inotify ----------*/ + +#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON) +#define HAVE_FILEMON + +#include + +static int filemon_inotify_fd; +static int filemon_inotify_wd= -1; + +static void *filemon_inotify_readable(oop_source *lp, int fd, + oop_event e, void *u) { + struct inotify_event iev; + for (;;) { + int r= read(filemon_inotify_fd, &iev, sizeof(iev)); + if (r==-1) { + if (errno==EAGAIN) break; + sysdie("read from inotify master"); + } else if (r==sizeof(iev)) { + assert(wd == filemon_inotify_wd); + } else { + die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); + } + } + /* Technically speaking the select might fire early in which case + * we'll read no events and then call filemon_callback + * unnecessarily. We don't care about that. + */ + filemon_callback(); + return OOP_CONTINUE; +} + +static int filemon_init(void) { + filemon_inotify_fd= inotify_init(); + if (filemon_inotify_fd<0) { + syswarn("could not initialise inotify: inotify_init failed"); + return 0; + } + set nonblock; + loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + + return 1; +} + +static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { + if (filemon_inotify_wd >= 0) { + int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); + if (r) sysdie("inotify_rm_watch"); + } + filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY); + if (filemon_inotify_wd < 0) sysdie("inotify_add_watch"); +} + +#endif /* HAVE_INOTIFY && !HAVE_FILEMON *// + +/*---------- filemon dummy implementation ----------*/ + +#if !defined(HAVE_FILEMON) + +static int filemon_init(void) { return 0; } +static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { } + +#endif + +/*========== interaction with innd ==========*/ + +/* see state diagram at top of file */ + +static char *path_ductlock, *path_duct, *path_ductdefer; +static int tailing_fd= -1, flushing_fd= -1; + +static void statemc_init(void) { + path_ductlock= xasprintf("%s_duct.lock", feedfile); + path_duct= xasprintf("%s_duct", feedfile); + path_ductdefer= xasprintf("%s_duct.defer", feedfile); + + int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); + if (lockfd<0) sysdie("open lockfile %s", path_ductlock); + + struct flock fl; + memset(&fl,0,sizeof(fl)); + fl.l_type= F_WRLCK; + fl.l_whence= SEEK_SET; + r= fcntl(lockfd, F_SETLK, &fl); + if (r==-1) { + if (errno==EACCES || errno==EAGAIN) + die("another duct holds the lockfile"); + sysdie("fcntl F_SETLK lockfile %s", path_ductlock); + } +} + +static void statemc_poll(void) { + if (tailing_fd>=0) return; + + int d_fd= open(path_duct, O_RDWR); + if (d_fd<0) + if (errno!=ENOENT) sysdie("open duct file %s", path_duct); + + int f_fd= open(feedfile, O_RDWR); + if (f_fd<0) + if (errno!=ENOENT) sysdie("open feed file %s", feedfile); + + if (d_fd<0) { + if (f_fd>=0) + start_tailing(f_fd); + return; + } + + + +/*========== main program ==========*/ + +#define EVERY(what, interval, body) \ + static const struct timeval what##_timeout = { 5, 0 }; \ + static void what##_schedule(void); \ + static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ + { body } \ + what##_schedule(); \ + } \ + static void what##_schedule(void) { \ + loop->on_time(loop, what##_timeout, what##_timedout, 0); \ + } + +EVERY(filepoll, {5,0}, { check_master_queue(); }) + +EVERY(period, {PERIOD_SECONDS,0}, { + if (connect_delay) connect_delay--; + statemc_poll(); + check_master_queue(); +}); + main { ignore sigpipe; + if (!filemon_init()) + filepoll_schedule(); + period_schedule(); };