chiark / gitweb /
WIP input file handling
authorIan Jackson <ian@liberator.(none)>
Sat, 13 Feb 2010 16:32:05 +0000 (16:32 +0000)
committerIan Jackson <ian@liberator.(none)>
Sat, 13 Feb 2010 16:32:05 +0000 (16:32 +0000)
backends/innduct.c

index 65417c91aaac29eae95b4b3a135e138c00ebb9b3..a2a4ec60a27b46948fb88f74c65c441418ec159b 100644 (file)
@@ -2,25 +2,53 @@
  * Four files full of
  *    token article
  *
- *   site.name_ductlock        lock taken out by innduct
- * F site.name                 written by innd
- * D site.name_duct            moved aside by innduct, for flush
- *   site.name_deferwork       431'd articles, still being written
- *   site.name_defergo_<inum>  431'd articles, ready for innxmit
- *   site.name_deferlock       lock taken out by innxmit wrapper
- *
+ *   site.name_duct.lock       lock preventing multiple ducts
+ *                                holder of this lock is "duct"
+ * F site.name                 main feed file
+ *                                opened/created, then written, by innd
+ *                                read by duct
+ *                                unlinked by duct
+ *                                tokens blanked out by duct when processed
+ * D site.name_duct            temporary feed file during flush (or crash)
+ *                                hardlink created by duct
+ *                                unlinked by duct
+ *   site.name_duct.defer      431'd articles, still being written,
+ *                                created, written, used by duct
+ *   site.name_backlog.lock    lock taken out by innxmit wrapper
+ *                                holder and its child are "xmit"
+ *   site.name_backlog_<inum>  431'd articles, ready for innxmit
+ *                                created (link/mv) by duct
+ *                                read by xmit
+ *                                unlinked by xmit
+ *   site.name_backlog_<letters> eg
+ *   site.name_backlog_manual
+ *                             anything the sysadmin likes (eg, feed files
+ *                             from old feeds to be merged into this one)
+ *                                created (link/mv) by admin
+ *                                read by xmit
+ *                                unlinked by xmit
  *
  *
  * OVERALL STATES:
  *
  *                                                  START
  *   ,-->--.                                           |
- *   |     |                                         stat D
+ *   |     |                                        check D
  *   |     |                                         /   |
  *   |     |                                  ENOENT/    |exists
- *   |     V                            <----------'     |
- *   |  Normal                                         stat F
- *   |   F: innd writing, duct reading                  /|\
+ *   |     V  <----------+--<----------------------'     |
+ *   |  None             |                              |
+ *   |   F: unopened     |                              |
+ *   |   D: ENOENT              |                               |
+ *   |     |             |                              |
+ *   |  repeatedly       |                              |
+ *   |   open F ------->-'                              |
+ *   |     |     ENOENT                                 |
+ *   |     |OK                                          |
+ *   |     |                                                    |
+ *   |     V                                            |
+ *   |  Normal                                        check F
+ *   |   F: innd writing, duct reading / editing        /|\
  *   |   D: ENOENT                                     / | \
  *   |     |                                          /  |  \
  *   |     |  duct decides time to flush      same   /   |   |
  *   |     |                                       /     |   |
  *   |     V                            <---------'      |   |
  *   |  Hardlinked                                       |   |
- *   |   F == D: innd writing, duct reading              |   |
+ *   |   F == D: innd writing, duct reading / editing    |   |
  *   ^     |                                             |   |
  *   |     |  duct unlinks F                            /    |
  *   |     V                                  ENOENT   /     |
  *   |  Moved                            <------------'      |
  *   |   F: ENOENT                                           |
- *   |   D: innd writing, duct reading                       |
+ *   |   D: innd writing, duct reading / editing             |
  *   |     |                                                 |
- *   |     |  duct flushes feed                              |
+ *   |     |  duct requests flush of feed                    |
  *   |     |   (others can too, harmlessly)                  |
  *   |     V                                                 |
  *   |  Separated                          <-----------------'
  *   |   F: innd writing                        different to D
- *   |   D: duct reading
+ *   |   D: duct reading / editing
  *   |     |
  *   |     V  duct completes processing of D
  *   |     |  duct unlinks D
@@ -50,6 +78,9 @@
  *
  */
 
+#define PERIOD_SECONDS 60
+
+static char *feedfile;
 static int max_connections, max_queue_per_conn;
 static int connection_setup_timeout, port, try_stream;
 static const char *remote_host;
@@ -108,6 +139,10 @@ struct Conn {
   int xmitu;
 };
 
+static int filemon_init(void);
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
+static void filemon_callback(void);
+
 
 #define CHILD_ESTATUS_STREAM   4
 #define CHILD_ESTATUS_NOSTREAM 5
@@ -120,7 +155,6 @@ static LIST(Article) *queue;
 
 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
 
-
 /*========== making new connections ==========*/
 
 static int connecting_sockets[2]= {-1,-1};
@@ -180,7 +214,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   Conn *conn= 0;
 
   conn= xcalloc(sizeof(*conn));
-  
+
   DECL_MSG_CMSG(msg);
   struct cmsghdr *h= 0;
   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
@@ -362,10 +396,12 @@ static void connect_start() {
 }
 
 /*========== overall control of article flow ==========*/
+
 static void conn_check_work(Conn *conn);
 
 static void check_master_queue(void) {
+  try reading current feed file;
+
   if (!queue.count)
     return;
 
@@ -384,8 +420,8 @@ static void check_master_queue(void) {
     }
   }
   conn_check_work(last_assigned);
-} 
+}
+
 static void conn_assign_one_article(LIST(Conn) *connlist,
                                    Conn **last_assigned) {
   Conn *conn= connlist->head;
@@ -407,7 +443,7 @@ static int conn_total_queued_articles(Conn *conn) {
   return conn->sent.count + conn->queue.count;
 }
 
-static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
   int inqueue= conn_total_queued_articles(conn);
   assert(inqueue <= max_queue);
   if (inqueue == 0) return &idle;
@@ -419,7 +455,7 @@ static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
   check_conn_work(u);
   return OOP_CONTINUE;
 }
-  
+
 static void conn_check_work(Conn *conn)  {
   void *rp= 0;
   for (;;) {
@@ -442,7 +478,7 @@ static void conn_check_work(Conn *conn)  {
     }
   }
 }
+
 /*========== article transmission ==========*/
 
 static void *conn_write_some_xmits(Conn *conn) {
@@ -454,7 +490,7 @@ static void *conn_write_some_xmits(Conn *conn) {
   for (;;) {
     int count= conn->xmitu;
     if (!count) return 0;
-  
+
     if (count > IOV_MAX) count= IOV_MAX;
     ssize_t rs= writev(conn->fd, conn->xmit, count);
     if (rs < 0) {
@@ -495,7 +531,7 @@ static void conn_make_some_xmits(Conn *conn) {
       /* actually send it */
 
       ARTHANDLE *artdata= SMretrieve(somehow);
-      
+
       if (conn->stream) {
        if (artdata) {
          XMIT_LITERAL("TAKETHIS ");
@@ -516,7 +552,7 @@ static void conn_make_some_xmits(Conn *conn) {
 
     } else {
       /* check it */
-      
+
       if (conn->stream)
        XMIT_LITERAL("IHAVE ");
       else
@@ -617,8 +653,148 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
   }
 
   return OOP_CONTINUE;
-}  
+}
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+
+
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wd= -1;
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+                                     oop_event e, void *u) {
+  struct inotify_event iev;
+  for (;;) {
+    int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+    if (r==-1) {
+      if (errno==EAGAIN) break;
+      sysdie("read from inotify master");
+    } else if (r==sizeof(iev)) {
+      assert(wd == filemon_inotify_wd);
+    } else {
+      die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+    }
+  }
+  /* Technically speaking the select might fire early in which case
+   * we'll read no events and then call filemon_callback
+   * unnecessarily.  We don't care about that.
+   */
+  filemon_callback();
+  return OOP_CONTINUE;
+}
+
+static int filemon_init(void) {
+  filemon_inotify_fd= inotify_init();
+  if (filemon_inotify_fd<0) {
+    syswarn("could not initialise inotify: inotify_init failed");
+    return 0;
+  }
+  set nonblock;
+  loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+
+  return 1;
+}
+
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) {
+  if (filemon_inotify_wd >= 0) {
+    int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+    if (r) sysdie("inotify_rm_watch");
+  }
+  filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY);
+  if (filemon_inotify_wd < 0) sysdie("inotify_add_watch");
+}
+
+#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
+
+/*---------- filemon dummy implementation ----------*/
+
+#if !defined(HAVE_FILEMON)
+
+static int filemon_init(void) { return 0; }
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { }
+
+#endif
+
+/*========== interaction with innd ==========*/
+
+/* see state diagram at top of file */
+
+static char *path_ductlock, *path_duct, *path_ductdefer;
+static int tailing_fd= -1, flushing_fd= -1;
+
+static void statemc_init(void) {
+  path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
+  path_duct=      xasprintf("%s_duct",       feedfile);
+  path_ductdefer= xasprintf("%s_duct.defer", feedfile);
+
+  int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
+  if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
+
+  struct flock fl;
+  memset(&fl,0,sizeof(fl));
+  fl.l_type= F_WRLCK;
+  fl.l_whence= SEEK_SET;
+  r= fcntl(lockfd, F_SETLK, &fl);
+  if (r==-1) {
+    if (errno==EACCES || errno==EAGAIN)
+      die("another duct holds the lockfile");
+    sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
+  }
+}
+
+static void statemc_poll(void) {
+  if (tailing_fd>=0) return;
+
+  int d_fd= open(path_duct, O_RDWR);
+  if (d_fd<0)
+    if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
+
+  int f_fd= open(feedfile, O_RDWR);
+  if (f_fd<0)
+    if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
+
+  if (d_fd<0) {
+    if (f_fd>=0)
+      start_tailing(f_fd);
+    return;
+  }
+
+  
+
+/*========== main program ==========*/
+
+#define EVERY(what, interval, body)                                         \
+  static const struct timeval what##_timeout = { 5, 0 };                    \
+  static void what##_schedule(void);                                        \
+  static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+    { body }                                                                \
+    what##_schedule();                                                      \
+  }                                                                         \
+  static void what##_schedule(void) {                                       \
+    loop->on_time(loop, what##_timeout, what##_timedout, 0);                \
+  }
+
+EVERY(filepoll, {5,0}, { check_master_queue(); })
+
+EVERY(period, {PERIOD_SECONDS,0}, {
+  if (connect_delay) connect_delay--;
+  statemc_poll();
+  check_master_queue();
+});
+
 main {
   ignore sigpipe;
+  if (!filemon_init())
+    filepoll_schedule();
+  period_schedule();
 };