chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / xmit.c
diff --git a/xmit.c b/xmit.c
new file mode 100644 (file)
index 0000000..dd909e4
--- /dev/null
+++ b/xmit.c
@@ -0,0 +1,332 @@
+/*---------- assigning articles to conns, and transmitting ----------*/
+
+static Article *dequeue_from(int peek, InputFile *ipf) {
+  if (!ipf) return 0;
+  if (peek) return LIST_HEAD(ipf->queue);
+
+  Article *art= LIST_REMHEAD(ipf->queue);
+  if (!art) return 0;
+  check_reading_pause_resume(ipf);
+  return art;
+}
+
+static Article *dequeue(int peek) {
+  Article *art;
+  art= dequeue_from(peek, flushing_input_file);  if (art) return art;
+  art= dequeue_from(peek, backlog_input_file);   if (art) return art;
+  art= dequeue_from(peek, main_input_file);      if (art) return art;
+  return 0;
+}
+
+static void check_assign_articles(void) {
+  for (;;) {
+    if (!dequeue(1))
+      break;
+
+    Conn *walk, *use=0;
+    int spare=0, inqueue=0;
+
+    /* Find a connection to offer this article.  We prefer a busy
+     * connection to an idle one, provided it's not full.  We take the
+     * first (oldest) and since that's stable, it will mean we fill up
+     * connections in order.  That way if we have too many
+     * connections, the spare ones will go away eventually.
+     */
+    FOR_CONN(walk) {
+      if (walk->quitting) continue;
+      inqueue= walk->sent.count + walk->priority.count
+            + walk->waiting.count;
+      spare= walk->max_queue - inqueue;
+      assert(inqueue <= max_queue_per_conn);
+      assert(spare >= 0);
+      if (inqueue==0) /*idle*/ { if (!use) use= walk; }
+      else if (spare>0) /*working*/ { use= walk; break; }
+    }
+    if (use) {
+      if (!inqueue) use->since_activity= 0; /* reset idle counter */
+      while (spare>0) {
+       Article *art= dequeue(0);
+       if (!art) break;
+       LIST_ADDTAIL(use->waiting, art);
+       lowvol_perperiod[lowvol_circptr]++;
+       spare--;
+      }
+      conn_maybe_write(use);
+    } else if (allow_connect_start()) {
+      connect_start();
+      break;
+    } else {
+      break;
+    }
+  }
+}
+
+static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
+  conn_maybe_write(u);
+  return OOP_CONTINUE;
+}
+
+static void conn_maybe_write(Conn *conn)  {
+  for (;;) {
+    conn_make_some_xmits(conn);
+    if (!conn->xmitu) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      conn->oopwriting= 0;
+      return;
+    }
+
+    void *rp= conn_write_some_xmits(conn);
+    if (rp==OOP_CONTINUE) {
+      if (!conn->oopwriting) {
+       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
+       conn->oopwriting= 1;
+      }
+      return;
+    } else if (rp==OOP_HALT) {
+      return;
+    } else if (!rp) {
+      /* transmitted everything */
+    } else {
+      abort();
+    }
+  }
+}
+
+/*---------- expiry, flow control and deferral ----------*/
+
+/*
+ * flow control notes
+ * to ensure articles go away eventually
+ * separate queue for each input file
+ *   queue expiry
+ *     every period, check head of backlog queue for expiry with SMretrieve
+ *       if too old: discard, and check next article
+ *     also check every backlog article as we read it
+ *   flush expiry
+ *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
+ *     one-off: eat queued articles from flushing and write them to defer
+ *     one-off: connfail all connections which have any articles from flushing
+ *     newly read articles from flushing go straight to defer
+ *     this should take care of it and get us out of this state
+ * to avoid filling up ram needlessly
+ *   input control
+ *     limit number of queued articles for each ipf
+ *     pause/resume inputfile tailing
+ */
+
+static void check_reading_pause_resume(InputFile *ipf) {
+  if (ipf->queue.count >= max_queue_per_ipf)
+    inputfile_reading_pause(ipf);
+  else
+    inputfile_reading_resume(ipf);
+}
+
+static void article_defer(Article *art /* not on a queue */, int whichcount) {
+  open_defer();
+  if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
+      || fflush(defer))
+    sysdie("write to defer file %s",path_defer);
+  article_done(art, whichcount);
+}
+
+static int article_check_expired(Article *art /* must be queued, not conn */) {
+  ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
+  if (artdata) { SMfreearticle(artdata); return 0; }
+
+  LIST_REMOVE(art->ipf->queue, art);
+  art->missing= 1;
+  art->ipf->count_nooffer_missing++;
+  article_done(art,-1);
+  return 1;
+}
+
+void inputfile_queue_check_expired(InputFile *ipf) {
+  if (!ipf) return;
+
+  for (;;) {
+    Article *art= LIST_HEAD(ipf->queue);
+    int expd= article_check_expired(art);
+    if (!expd) break;
+  }
+  check_reading_pause_resume(ipf);
+}
+
+static void article_autodefer(InputFile *ipf, Article *art) {
+  ipf->autodefer++;
+  article_defer(art,-1);
+}
+
+static int has_article_in(const ArticleList *al, InputFile *ipf) {
+  Article *art;
+  for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
+    if (art->ipf == ipf) return 1;
+  return 0;
+}
+
+static void autodefer_input_file_articles(InputFile *ipf) {
+  Article *art;
+  while ((art= LIST_REMHEAD(ipf->queue)))
+    article_autodefer(ipf, art);
+}
+
+static void autodefer_input_file(InputFile *ipf) {
+  static const char *const abandon= "stuck";
+  ipf->autodefer= 0;
+
+  autodefer_input_file_articles(ipf);
+
+  if (ipf->inprogress) {
+    Conn *walk;
+    FOR_CONN(walk) {
+      if (has_article_in(&walk->waiting,  ipf) ||
+         has_article_in(&walk->priority, ipf) ||
+         has_article_in(&walk->sent,     ipf))
+       walk->quitting= abandon;
+    }
+    while (ipf->inprogress) {
+      FOR_CONN(walk)
+       if (walk->quitting == abandon) goto found;
+      abort(); /* where are they ?? */
+
+    found:
+      connfail(walk, "connection is stuck or crawling,"
+              " and we need to finish flush");
+      autodefer_input_file_articles(ipf);
+    }
+  }
+
+  check_reading_pause_resume(ipf);
+}
+
+/*========== article transmission ==========*/
+
+static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
+                 XmitKind kind) { /* caller must then fill in details */
+  struct iovec *v= &conn->xmit[conn->xmitu];
+  XmitDetails *d= &conn->xmitd[conn->xmitu++];
+  v->iov_base= (char*)data;
+  v->iov_len= len;
+  d->kind= kind;
+  return d;
+}
+
+static void xmit_noalloc(Conn *conn, const char *data, int len) {
+  xmit_core(conn,data,len, xk_Const);
+}
+#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
+
+static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
+  XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
+  d->info.sm_art= ah;
+}
+
+static void xmit_free(XmitDetails *d) {
+  switch (d->kind) {
+  case xk_Artdata: SMfreearticle(d->info.sm_art); break;
+  case xk_Const:                                  break;
+  default: abort();
+  }
+}
+
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
+    int count= conn->xmitu;
+    if (!count) return 0;
+
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
+    if (rs < 0) {
+      if (isewouldblock(errno)) return OOP_CONTINUE;
+      connfail(conn, "write failed: %s", strerror(errno));
+      return OOP_HALT;
+    }
+    assert(rs > 0);
+
+    int done;
+    for (done=0; rs; ) {
+      assert(done<conn->xmitu);
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      assert(vp->iov_len <= SSIZE_MAX);
+      if ((size_t)rs >= vp->iov_len) {
+       rs -= vp->iov_len;
+       xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
+       done++;
+      } else {
+       vp->iov_base= (char*)vp->iov_base + rs;
+       vp->iov_len -= rs;
+       break; /* rs -= rs */
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+  for (;;) {
+    if (conn->xmitu+5 > CONNIOVS)
+      break;
+
+    Article *art= LIST_REMHEAD(conn->priority);
+    if (!art) art= LIST_REMHEAD(conn->waiting);
+    if (!art) break;
+
+    if (art->state >= art_Wanted || (conn->stream && nocheck)) {
+      /* actually send it */
+
+      ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
+
+      art->state=
+       art->state == art_Unchecked ? art_Unsolicited :
+       art->state == art_Wanted    ? art_Wanted      :
+       (abort(),-1);
+
+      if (!artdata) art->missing= 1;
+      art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+
+      if (conn->stream) {
+       if (artdata) {
+         XMIT_LITERAL("TAKETHIS ");
+         xmit_noalloc(conn, art->messageid, art->midlen);
+         XMIT_LITERAL("\r\n");
+         xmit_artbody(conn, artdata);
+       } else {
+         article_done(art, -1);
+         continue;
+       }
+      } else {
+       /* we got 235 from IHAVE */
+       if (artdata) {
+         xmit_artbody(conn, artdata);
+       } else {
+         XMIT_LITERAL(".\r\n");
+       }
+      }
+
+      LIST_ADDTAIL(conn->sent, art);
+
+    } else {
+      /* check it */
+
+      if (conn->stream)
+       XMIT_LITERAL("CHECK ");
+      else
+       XMIT_LITERAL("IHAVE ");
+      xmit_noalloc(conn, art->messageid, art->midlen);
+      XMIT_LITERAL("\r\n");
+
+      assert(art->state == art_Unchecked);
+      art->ipf->counts[art->state][RC_sent]++;
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
+}
+