chiark / gitweb /
wip make it compile; before eliminate multiple conn lists
authorIan Jackson <ian@liberator.(none)>
Sat, 24 Apr 2010 23:14:48 +0000 (00:14 +0100)
committerIan Jackson <ian@liberator.(none)>
Sat, 24 Apr 2010 23:14:48 +0000 (00:14 +0100)
backends/innduct.c
doc/man/innduct.8

index 0ccaa1c9d360d36c9e0b3c25169ca4903a4fe20e..a2b3742b47ef924ff6675780ea744926cd1a101a 100644 (file)
@@ -234,6 +234,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.
 typedef struct Conn Conn;
 typedef struct Article Article;
 typedef struct InputFile InputFile;
 typedef struct Conn Conn;
 typedef struct Article Article;
 typedef struct InputFile InputFile;
+typedef struct XmitDetails XmitDetails;
 typedef enum StateMachineState StateMachineState;
 
 DEFLIST(Conn);
 typedef enum StateMachineState StateMachineState;
 
 DEFLIST(Conn);
@@ -245,6 +246,8 @@ static void conn_check_work(Conn *conn);
 static void conn_make_some_xmits(Conn *conn);
 static void *conn_write_some_xmits(Conn *conn);
 
 static void conn_make_some_xmits(Conn *conn);
 static void *conn_write_some_xmits(Conn *conn);
 
+static void xmit_free(XmitDetails *d);
+
 static int filemon_init(void);
 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
 static void filemon_callback(void);
 static int filemon_init(void);
 static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
 static void filemon_callback(void);
@@ -321,13 +324,13 @@ typedef enum {
   xk_Malloc, xk_Const, xk_Artdata
 } XmitKind;
 
   xk_Malloc, xk_Const, xk_Artdata
 } XmitKind;
 
-typedef struct {
+struct XmitDetails {
   XmitKind kind;
   union {
     char *malloc_tofree;
     ARTHANDLE *sm_art;
   } info;
   XmitKind kind;
   union {
     char *malloc_tofree;
     ARTHANDLE *sm_art;
   } info;
-} XmitDetails;
+};
 
 
 /*----- core operational data structure types -----*/
 
 
 /*----- core operational data structure types -----*/
@@ -430,7 +433,7 @@ static void logv(int sysloglevel, const char *pfx, int errnoval,
      __attribute__((__format__(printf,5,0)));
 static void logv(int sysloglevel, const char *pfx, int errnoval,
                 int exitstatus, const char *fmt, va_list al) {
      __attribute__((__format__(printf,5,0)));
 static void logv(int sysloglevel, const char *pfx, int errnoval,
                 int exitstatus, const char *fmt, va_list al) {
-  char msgbuf[256];
+  char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
   msgbuf[sizeof(msgbuf)-1]= 0;
 
   vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
   msgbuf[sizeof(msgbuf)-1]= 0;
 
@@ -468,6 +471,24 @@ logwrap(debug,    " debug",    LOG_DEBUG,   -1,    0);
 
 /*========== utility functions etc. ==========*/
 
 
 /*========== utility functions etc. ==========*/
 
+static char *xvasprintf(const char *fmt, va_list al)
+     __attribute__((__format__(printf,1,0)));
+static char *xvasprintf(const char *fmt, va_list al) {
+  char *str;
+  int rc= vasprintf(&str,fmt,al);
+  if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
+  return str;
+}
+static char *xasprintf(const char *fmt, ...)
+     __attribute__((__format__(printf,1,2)));
+static char *xasprintf(const char *fmt, ...) {
+  va_list al;
+  va_start(al,fmt);
+  char *str= xvasprintf(fmt,al);
+  va_end(al);
+  return str;
+}
+
 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
 
 static pid_t xfork(const char *what) {
 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
 
 static pid_t xfork(const char *what) {
@@ -686,9 +707,9 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
     fatal("connect: child gave unexpected exit status %d", es);
   }
 
     fatal("connect: child gave unexpected exit status %d", es);
   }
 
-  setnonblock(conn->fd, 1);
-
   /* Phew! */
   /* Phew! */
+  setnonblock(conn->fd, 1);
+  conn->max_queue= conn->stream ? max_queue_per_conn : 1;
   LIST_ADDHEAD(idle, conn);
   notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   connect_attempt_discard();
   LIST_ADDHEAD(idle, conn);
   notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   connect_attempt_discard();
@@ -796,6 +817,10 @@ static void connect_start(void) {
 
 /*========== overall control of article flow ==========*/
 
 
 /*========== overall control of article flow ==========*/
 
+static int conn_owned_articles(Conn *conn) {
+  return conn->sent.count + conn->queue.count;
+}
+
 static void check_master_queue(void) {
   if (!queue.count)
     return;
 static void check_master_queue(void) {
   if (!queue.count)
     return;
@@ -806,8 +831,7 @@ static void check_master_queue(void) {
       conn_assign_one_article(&working, &last_assigned);
     } else if (idle.head) {
       conn_assign_one_article(&idle, &last_assigned);
       conn_assign_one_article(&working, &last_assigned);
     } else if (idle.head) {
       conn_assign_one_article(&idle, &last_assigned);
-    } else if (nconns < max_connections &&
-              conn_total_queued_articles(conn) >= max_queue_per_conn &&
+    } else if (full.count < max_connections &&
               !connecting_child && !until_connect) {
       until_connect= reconnect_delay_periods;
       connect_start();
               !connecting_child && !until_connect) {
       until_connect= reconnect_delay_periods;
       connect_start();
@@ -834,12 +858,8 @@ static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned) {
   *last_assigned= conn;
 }
 
   *last_assigned= conn;
 }
 
-static int conn_total_queued_articles(Conn *conn) {
-  return conn->sent.count + conn->queue.count;
-}
-
 static ConnList *conn_determine_right_list(Conn *conn) {
 static ConnList *conn_determine_right_list(Conn *conn) {
-  int inqueue= conn_total_queued_articles(conn);
+  int inqueue= conn_owned_articles(conn);
   assert(inqueue <= max_queue_per_conn);
   if (inqueue == 0) return &idle;
   if (inqueue == conn->max_queue) return &full;
   assert(inqueue <= max_queue_per_conn);
   if (inqueue == 0) return &idle;
   if (inqueue == conn->max_queue) return &full;
@@ -883,15 +903,15 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) {
   Article *art;
   while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art);
   while ((art= LIST_REMHEAD(conn->sent))) {
   Article *art;
   while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art);
   while ((art= LIST_REMHEAD(conn->sent))) {
-    counts[art->state]++;
+    requeue[art->state]++;
     if (art->state==art_Unsolicited) art->state= art_Unchecked;
     if (art->state==art_Unsolicited) art->state= art_Unchecked;
-    LIST_ADDTAIL(queue);
+    LIST_ADDTAIL(queue,art);
   }
 
   int i;
   }
 
   int i;
-  XmitDetails *xd;
-  for (i=0, dp=&conn->xmitd; i<conn->xmitu; i++, dp++)
-    xmit_free(dp);
+  XmitDetails *d;
+  for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
+    xmit_free(d);
 
   char *m= xvasprintf(fmt,al);
   warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
 
   char *m= xvasprintf(fmt,al);
   warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
@@ -899,6 +919,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) {
   free(m);
 
   close(conn->fd);
   free(m);
 
   close(conn->fd);
+  fixme remove conn from the appropriate list;
   free(conn);
 
   until_connect= reconnect_delay_periods;
   free(conn);
 
   until_connect= reconnect_delay_periods;
index c791d9f6ef87ce63c7d1f1a74a2d289f1604fd0b..18bdbf9bad460ddea1a61669bb19f50b667e2d09 100644 (file)
@@ -60,7 +60,8 @@ connections.
 Restricts the maximum number of outstanding articles queued on any
 particular connection
 .IR max .
 Restricts the maximum number of outstanding articles queued on any
 particular connection
 .IR max .
-The default is 200.
+The default is 200.  (Non-streaming connections can only handle one
+article at a time.)
 .TP
 .BI \-\-max-queue-per-conn= max
 Restricts the maximum number of outstanding articles queued on any
 .TP
 .BI \-\-max-queue-per-conn= max
 Restricts the maximum number of outstanding articles queued on any