chiark / gitweb /
new aggressive close "recentact" aka "lowvol"; do not count idle connection disconnec...
authorIan Jackson <ian@liberator.(none)>
Wed, 5 May 2010 20:33:20 +0000 (21:33 +0100)
committerIan Jackson <ian@liberator.(none)>
Wed, 5 May 2010 20:33:20 +0000 (21:33 +0100)
backends/innduct.c
doc/man/innduct.8

index 561ff0022ff30e3d43130aa31e4bcc5ce4f56d6a..772fc89c0079e99be2146d40a53fd7e65bcf344c 100644 (file)
@@ -391,6 +391,8 @@ static int backlog_spontrescan_periods=300;
 static int spontaneous_flush_periods=100000;
 static int max_separated_periods=2000;
 static int need_activity_periods=1000;
+static int recentact_thresh=3;
+static int recentact_periods=1000;
 
 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
 static int max_bad_data_initial= 30;
@@ -528,6 +530,9 @@ static ConnList conns;
 static char *path_lock, *path_flushing, *path_defer, *path_dump;
 static char *globpat_backlog;
 static pid_t self_pid;
+static int *recentact_perperiod;
+static int recentact_circptr;
+static int recentact_total;
 
 /* statemc_init initialises */
 static StateMachineState sms;
@@ -1064,12 +1069,24 @@ static void cli_init(void) {
 
 /*========== management of connections ==========*/
 
+static void reconnect_blocking_event(void) {
+  until_connect= reconnect_delay_periods;
+}
+
 static void conn_closefd(Conn *conn, const char *msgprefix) {
   int r= close_perhaps(&conn->fd);
   if (r) info("C%d %serror closing socket: %s",
              conn->fd, msgprefix, strerror(errno));
 }
 
+static int conn_busy(Conn *conn) {
+  return
+    conn->waiting.count ||
+    conn->priority.count ||
+    conn->sent.count ||
+    conn->xmitu;
+}
+
 static void conn_dispose(Conn *conn) {
   if (!conn) return;
   if (conn->rd) {
@@ -1083,7 +1100,6 @@ static void conn_dispose(Conn *conn) {
   }
   conn_closefd(conn,"");
   free(conn);
-  until_connect= reconnect_delay_periods;
 }
 
 static void *conn_exception(oop_source *lp, int fd,
@@ -1130,6 +1146,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) {
        conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
   free(m);
 
+  reconnect_blocking_event();
   conn_dispose(conn);
   check_assign_articles();
 }
@@ -1141,10 +1158,47 @@ static void connfail(Conn *conn, const char *fmt, ...) {
   va_end(al);
 }
 
+static void conn_idle_close(Conn *conn, const char *why) {
+  static const char quitcmd[]= "QUIT\r\n";
+  int todo= sizeof(quitcmd)-1;
+  const char *p= quitcmd;
+  for (;;) {
+    int r= write(conn->fd, p, todo);
+    if (r<0) {
+      if (isewouldblock(errno))
+       connfail(conn, "blocked writing QUIT to idle connection");
+      else
+       connfail(conn, "failed to write QUIT to idle connection: %s",
+                strerror(errno));
+      break;
+    }
+    assert(r<=todo);
+    todo -= r;
+    if (!todo) {
+      conn->quitting= 1;
+      conn->since_activity= 0;
+      debug("C%d is idle (%s), quitting", conn->fd, why);
+      break;
+    }
+  }
+}
+
+/*
+ * For our last connection, we also shut it down if we have had
+ * less than K in the last L
+ */
 static void check_idle_conns(void) {
   Conn *conn;
+
+  int veryrecentact= recentact_perperiod[recentact_circptr];
+  recentact_circptr++;
+  recentact_circptr %= recentact_periods;
+  recentact_total -= recentact_perperiod[recentact_circptr];
+  recentact_perperiod[recentact_circptr]= 0;
+
   FOR_CONN(conn)
     conn->since_activity++;
+
  search_again:
   FOR_CONN(conn) {
     if (conn->since_activity <= need_activity_periods) continue;
@@ -1159,32 +1213,18 @@ static void check_idle_conns(void) {
     else if (conn->xmitu)
       connfail(conn,"peer has been sending responses"
               " before receiving our commands!");
-    else {
-      static const char quitcmd[]= "QUIT\r\n";
-      int todo= sizeof(quitcmd)-1;
-      const char *p= quitcmd;
-      for (;;) {
-       int r= write(conn->fd, p, todo);
-       if (r<0) {
-         if (isewouldblock(errno))
-           connfail(conn, "blocked writing QUIT to idle connection");
-         else
-           connfail(conn, "failed to write QUIT to idle connection: %s",
-                    strerror(errno));
-         break;
-       }
-       assert(r<=todo);
-       todo -= r;
-       if (!todo) {
-         conn->quitting= 1;
-         conn->since_activity= 0;
-         debug("C%d is idle, quitting", conn->fd);
-         break;
-       }
-      }
-    }
+    else
+      conn_idle_close(conn, "no activity");
+    
     goto search_again;
   }
+
+  conn= LIST_HEAD(conns);
+  if (!veryrecentact &&
+      conns.count==1 &&
+      recentact_total+veryrecentact < recentact_thresh &&
+      !conn_busy(conn))
+    conn_idle_close(conn, "low volume");
 }  
 
 /*---------- making new connections ----------*/
@@ -1193,6 +1233,7 @@ static pid_t connecting_child;
 static int connecting_fdpass_sock;
 
 static void connect_attempt_discard(void) {
+  reconnect_blocking_event();
   if (connecting_child) {
     int status= xwaitpid(&connecting_child, "connect");
     if (!(WIFEXITED(status) ||
@@ -1331,6 +1372,7 @@ static void connect_start(void) {
   assert(!connecting_fdpass_sock);
 
   info("starting connection attempt");
+  reconnect_blocking_event();
 
   int socks[2];
   int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
@@ -1475,11 +1517,11 @@ static void check_assign_articles(void) {
        Article *art= dequeue(0);
        if (!art) break;
        LIST_ADDTAIL(use->waiting, art);
+       recentact_perperiod[recentact_circptr]++;
        spare--;
       }
       conn_maybe_write(use);
     } else if (allow_connect_start()) {
-      until_connect= reconnect_delay_periods;
       connect_start();
       break;
     } else {
@@ -1901,20 +1943,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     return OOP_CONTINUE;
   }
 
-  int conn_busy=
-    conn->waiting.count ||
-    conn->priority.count ||
-    conn->sent.count ||
-    conn->xmitu;
+  int busy= conn_busy(conn);
 
   if (conn->quitting) {
-    if (code!=205 && code!=503) {
+    if (code!=205 && code!=400) {
       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
     } else {
       LIST_REMOVE(conns,conn);
       notice("C%d (now %d) idle connection closed by us",
             conn->fd, conns.count);
-      assert(!conn_busy);
+      assert(!busy);
       conn_dispose(conn);
     }
     return OOP_CONTINUE;
@@ -1946,7 +1984,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
   default:  PEERBADMSG("peer sent unexpected message");
 
   case 400:
-    if (conn_busy)
+    if (busy)
       PEERBADMSG("peer timed us out or stopped accepting articles");
 
     LIST_REMOVE(conns,conn);
@@ -3612,6 +3650,8 @@ static const Option innduct_options[]= {
 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
+{0,"low-volume-thresh",      "PERIOD", &recentact_thresh,         op_integer },
+{0,"low-volume-window",      "PERIOD", &recentact_periods,        op_seconds },
 
 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
@@ -3685,6 +3725,7 @@ int main(int argc, char **argv) {
   convert_to_periods_rndup(&spontaneous_flush_periods);
   convert_to_periods_rndup(&max_separated_periods);
   convert_to_periods_rndup(&need_activity_periods);
+  convert_to_periods_rndup(&recentact_periods);
 
   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
     badusage("bad input data ratio must be between 0..100");
@@ -3719,6 +3760,13 @@ int main(int argc, char **argv) {
     if (strchr(feedfile, c))
       badusage("feed filename may not contain metacharacter %c",c);
 
+  int i;
+  recentact_perperiod= xcalloc(sizeof(*recentact_perperiod),recentact_periods);
+  for (i=0; i<recentact_periods; i++) {
+    recentact_perperiod[i]= recentact_thresh;
+    recentact_total += recentact_thresh;
+  }
+
   /* set things up */
 
   path_lock=        xasprintf("%s_lock",      feedfile);
index 22799f84064eedcc6c215c0cb20962271d4fd06d..759abd7f2b4dcdde2551c698f1c018fd7f2199b1 100644 (file)
@@ -354,6 +354,22 @@ unresponsive or the connection has become broken.
 The default is
 .BR 1000s .
 .TP
+.BI \-\-low-volume-thresh= "WIN-THRESH " \-\-low-volume-window= "PERIOD "
+If innduct has only one connection to the peer, and has processed
+fewer than
+.I WIN-THRESH
+articles in the last
+.I PERIOD
+and also no articles in the last
+.IR PERIOD-INTERVAL
+it will close the connection quickly.  That is, innduct switches to a
+mode where it opens a connection for each article (or, perhaps, each
+handful of articles arriving together).
+The default is to close if fewer than
+.BR 3
+articles in the last
+.BR 1000s .
+.TP
 .BI \-\-max-bad-input-data-ratio= PERCENT
 We tolerate up to this proportion of badly-formatted lines in the
 feedfile and other input files.  Every badly-formatted line is logged,