* Four files full of
* token article
*
- * site.name_ductlock lock taken out by innduct
- * F site.name written by innd
- * D site.name_duct moved aside by innduct, for flush
- * site.name_deferwork 431'd articles, still being written
- * site.name_defergo_<inum> 431'd articles, ready for innxmit
- * site.name_deferlock lock taken out by innxmit wrapper
- *
+ * site.name_duct.lock lock preventing multiple ducts
+ * holder of this lock is "duct"
+ * F site.name main feed file
+ * opened/created, then written, by innd
+ * read by duct
+ * unlinked by duct
+ * tokens blanked out by duct when processed
+ * D site.name_duct temporary feed file during flush (or crash)
+ * hardlink created by duct
+ * unlinked by duct
+ * site.name_duct.defer 431'd articles, still being written,
+ * created, written, used by duct
+ * site.name_backlog.lock lock taken out by innxmit wrapper
+ * holder and its child are "xmit"
+ * site.name_backlog_<inum> 431'd articles, ready for innxmit
+ * created (link/mv) by duct
+ * read by xmit
+ * unlinked by xmit
+ * site.name_backlog_<letters> eg
+ * site.name_backlog_manual
+ * anything the sysadmin likes (eg, feed files
+ * from old feeds to be merged into this one)
+ * created (link/mv) by admin
+ * read by xmit
+ * unlinked by xmit
*
*
* OVERALL STATES:
*
* START
* ,-->--. |
- * | | stat D
+ * | | check D
* | | / |
* | | ENOENT/ |exists
- * | V <----------' |
- * | Normal stat F
- * | F: innd writing, duct reading /|\
+ * | V <----------+--<----------------------' |
+ * | None | |
+ * | F: unopened | |
+ * | D: ENOENT | |
+ * | | | |
+ * | repeatedly | |
+ * | open F ------->-' |
+ * | | ENOENT |
+ * | |OK |
+ * | | |
+ * | V |
+ * | Normal check F
+ * | F: innd writing, duct reading / editing /|\
* | D: ENOENT / | \
* | | / | \
* | | duct decides time to flush same / | |
* | | / | |
* | V <---------' | |
* | Hardlinked | |
- * | F == D: innd writing, duct reading | |
+ * | F == D: innd writing, duct reading / editing | |
* ^ | | |
* | | duct unlinks F / |
* | V ENOENT / |
* | Moved <------------' |
* | F: ENOENT |
- * | D: innd writing, duct reading |
+ * | D: innd writing, duct reading / editing |
* | | |
- * | | duct flushes feed |
+ * | | duct requests flush of feed |
* | | (others can too, harmlessly) |
* | V |
* | Separated <-----------------'
* | F: innd writing different to D
- * | D: duct reading
+ * | D: duct reading / editing
* | |
* | V duct completes processing of D
* | | duct unlinks D
*
*/
+#define PERIOD_SECONDS 60
+
+static char *feedfile;
static int max_connections, max_queue_per_conn;
static int connection_setup_timeout, port, try_stream;
static const char *remote_host;
int xmitu;
};
+static int filemon_init(void);
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
+static void filemon_callback(void);
+
#define CHILD_ESTATUS_STREAM 4
#define CHILD_ESTATUS_NOSTREAM 5
static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
-
/*========== making new connections ==========*/
static int connecting_sockets[2]= {-1,-1};
Conn *conn= 0;
conn= xcalloc(sizeof(*conn));
-
+
DECL_MSG_CMSG(msg);
struct cmsghdr *h= 0;
ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
}
/*========== overall control of article flow ==========*/
-
+
static void conn_check_work(Conn *conn);
static void check_master_queue(void) {
+ try reading current feed file;
+
if (!queue.count)
return;
}
}
conn_check_work(last_assigned);
-}
-
+}
+
static void conn_assign_one_article(LIST(Conn) *connlist,
Conn **last_assigned) {
Conn *conn= connlist->head;
return conn->sent.count + conn->queue.count;
}
-static LIST(Conn) *conn_determine_right_list(Conn *conn) {
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
int inqueue= conn_total_queued_articles(conn);
assert(inqueue <= max_queue);
if (inqueue == 0) return &idle;
check_conn_work(u);
return OOP_CONTINUE;
}
-
+
static void conn_check_work(Conn *conn) {
void *rp= 0;
for (;;) {
}
}
}
-
+
/*========== article transmission ==========*/
static void *conn_write_some_xmits(Conn *conn) {
for (;;) {
int count= conn->xmitu;
if (!count) return 0;
-
+
if (count > IOV_MAX) count= IOV_MAX;
ssize_t rs= writev(conn->fd, conn->xmit, count);
if (rs < 0) {
/* actually send it */
ARTHANDLE *artdata= SMretrieve(somehow);
-
+
if (conn->stream) {
if (artdata) {
XMIT_LITERAL("TAKETHIS ");
} else {
/* check it */
-
+
if (conn->stream)
XMIT_LITERAL("IHAVE ");
else
}
return OOP_CONTINUE;
-}
-
+}
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+
+
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wd= -1;
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+ oop_event e, void *u) {
+ struct inotify_event iev;
+ for (;;) {
+ int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+ if (r==-1) {
+ if (errno==EAGAIN) break;
+ sysdie("read from inotify master");
+ } else if (r==sizeof(iev)) {
+ assert(wd == filemon_inotify_wd);
+ } else {
+ die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+ }
+ }
+ /* Technically speaking the select might fire early in which case
+ * we'll read no events and then call filemon_callback
+ * unnecessarily. We don't care about that.
+ */
+ filemon_callback();
+ return OOP_CONTINUE;
+}
+
+static int filemon_init(void) {
+ filemon_inotify_fd= inotify_init();
+ if (filemon_inotify_fd<0) {
+ syswarn("could not initialise inotify: inotify_init failed");
+ return 0;
+ }
+ set nonblock;
+ loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+
+ return 1;
+}
+
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) {
+ if (filemon_inotify_wd >= 0) {
+ int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ if (r) sysdie("inotify_rm_watch");
+ }
+ filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY);
+ if (filemon_inotify_wd < 0) sysdie("inotify_add_watch");
+}
+
+#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
+
+/*---------- filemon dummy implementation ----------*/
+
+#if !defined(HAVE_FILEMON)
+
+static int filemon_init(void) { return 0; }
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { }
+
+#endif
+
+/*========== interaction with innd ==========*/
+
+/* see state diagram at top of file */
+
+static char *path_ductlock, *path_duct, *path_ductdefer;
+static int tailing_fd= -1, flushing_fd= -1;
+
+static void statemc_init(void) {
+ path_ductlock= xasprintf("%s_duct.lock", feedfile);
+ path_duct= xasprintf("%s_duct", feedfile);
+ path_ductdefer= xasprintf("%s_duct.defer", feedfile);
+
+ int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
+ if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
+
+ struct flock fl;
+ memset(&fl,0,sizeof(fl));
+ fl.l_type= F_WRLCK;
+ fl.l_whence= SEEK_SET;
+ r= fcntl(lockfd, F_SETLK, &fl);
+ if (r==-1) {
+ if (errno==EACCES || errno==EAGAIN)
+ die("another duct holds the lockfile");
+ sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
+ }
+}
+
+static void statemc_poll(void) {
+ if (tailing_fd>=0) return;
+
+ int d_fd= open(path_duct, O_RDWR);
+ if (d_fd<0)
+ if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
+
+ int f_fd= open(feedfile, O_RDWR);
+ if (f_fd<0)
+ if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
+
+ if (d_fd<0) {
+ if (f_fd>=0)
+ start_tailing(f_fd);
+ return;
+ }
+
+
+
+/*========== main program ==========*/
+
+#define EVERY(what, interval, body) \
+ static const struct timeval what##_timeout = { 5, 0 }; \
+ static void what##_schedule(void); \
+ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+ { body } \
+ what##_schedule(); \
+ } \
+ static void what##_schedule(void) { \
+ loop->on_time(loop, what##_timeout, what##_timedout, 0); \
+ }
+
+EVERY(filepoll, {5,0}, { check_master_queue(); })
+
+EVERY(period, {PERIOD_SECONDS,0}, {
+ if (connect_delay) connect_delay--;
+ statemc_poll();
+ check_master_queue();
+});
+
main {
ignore sigpipe;
+ if (!filemon_init())
+ filepoll_schedule();
+ period_schedule();
};