chiark / gitweb /
wip split into multiple files and make compile
[inn-innduct.git] / recv.c
diff --git a/recv.c b/recv.c
new file mode 100644 (file)
index 0000000..870e0e5
--- /dev/null
+++ b/recv.c
@@ -0,0 +1,237 @@
+/*========== handling responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_FORBID,
+  OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
+                        const char *errmsg, int errnoval,
+                        const char *data, size_t recsz, void *conn_v) {
+  Conn *conn= conn_v;
+  connfail(conn, "error receiving from peer: %s", errmsg);
+  return OOP_CONTINUE;
+}
+
+static Article *article_reply_check(Conn *conn, const char *response,
+                                   int code_indicates_streaming,
+                                   int must_have_sent
+                                       /* 1:yes, -1:no, 0:dontcare */,
+                                   const char *sanitised_response) {
+  Article *art= LIST_HEAD(conn->sent);
+
+  if (!art) {
+    connfail(conn,
+            "peer gave unexpected response when no commands outstanding: %s",
+            sanitised_response);
+    return 0;
+  }
+
+  if (code_indicates_streaming) {
+    assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
+    if (!conn->stream) {
+      connfail(conn, "peer gave streaming response code "
+              " to IHAVE or subsequent body: %s", sanitised_response);
+      return 0;
+    }
+    const char *got_mid= response+4;
+    int got_midlen= strcspn(got_mid, " \n\r");
+    if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
+      connfail(conn, "peer gave streaming response with syntactically invalid"
+              " messageid: %s", sanitised_response);
+      return 0;
+    }
+    if (got_midlen != art->midlen ||
+       memcmp(got_mid, art->messageid, got_midlen)) {
+      connfail(conn, "peer gave streaming response code to wrong article -"
+              " probable synchronisation problem; we offered: %s;"
+              " peer said: %s",
+              art->messageid, sanitised_response);
+      return 0;
+    }
+  } else {
+    if (conn->stream) {
+      connfail(conn, "peer gave non-streaming response code to"
+              " CHECK/TAKETHIS: %s", sanitised_response);
+      return 0;
+    }
+  }
+
+  if (must_have_sent>0 && art->state < art_Wanted) {
+    connfail(conn, "peer says article accepted but"
+            " we had not sent the body: %s", sanitised_response);
+    return 0;
+  }
+  if (must_have_sent<0 && art->state >= art_Wanted) {
+    connfail(conn, "peer says please sent the article but we just did: %s",
+            sanitised_response);
+    return 0;
+  }
+
+  Article *art_again= LIST_REMHEAD(conn->sent);
+  assert(art_again == art);
+  return art;
+}
+
+static void update_nocheck(int accepted) {
+  accept_proportion *= nocheck_decay;
+  accept_proportion += accepted * (1.0 - nocheck_decay);
+  int new_nocheck= accept_proportion >= nocheck_thresh;
+  if (new_nocheck && !nocheck_reported) {
+    notice("entering nocheck mode for the first time");
+    nocheck_reported= 1;
+  } else if (new_nocheck != nocheck) {
+    dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
+  }
+  nocheck= new_nocheck;
+}
+
+static void article_done(Article *art, int whichcount) {
+  if (whichcount>=0 && !art->missing)
+    art->ipf->counts[art->state][whichcount]++;
+
+  if (whichcount == RC_accepted) update_nocheck(1);
+  else if (whichcount == RC_unwanted) update_nocheck(0);
+
+  InputFile *ipf= art->ipf;
+
+  while (art->blanklen) {
+    static const char spaces[]=
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                ";
+    int nspaces= sizeof(spaces)-1;
+    int w= art->blanklen;  if (w > nspaces) w= nspaces;
+    int r= pwrite(ipf->fd, spaces, w, art->offset);
+    if (r==-1) {
+      if (errno==EINTR) continue;
+      syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
+              art->messageid, art->blanklen,
+              (unsigned long)art->offset, ipf->path);
+    }
+    assert(r>=0 && r<=w);
+    art->blanklen -= w;
+    art->offset += w;
+  }
+
+  ipf->inprogress--;
+  assert(ipf->inprogress >= 0);
+  free(art);
+
+  if (!ipf->inprogress && ipf != main_input_file)
+    queue_check_input_done();
+}
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
+                       const char *errmsg, int errnoval,
+                       const char *data, size_t recsz, void *conn_v) {
+  Conn *conn= conn_v;
+
+  if (ev == OOP_RD_EOF) {
+    connfail(conn, "unexpected EOF from peer");
+    return OOP_CONTINUE;
+  }
+  assert(ev == OOP_RD_OK);
+
+  char *sani= sanitise(data,-1);
+
+  char *ep;
+  unsigned long code= strtoul(data, &ep, 10);
+  if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+    connfail(conn, "badly formatted response from peer: %s", sani);
+    return OOP_CONTINUE;
+  }
+
+  int busy= conn_busy(conn);
+
+  if (conn->quitting) {
+    if (code!=205 && code!=400) {
+      connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
+              conn->quitting, sani);
+    } else {
+      LIST_REMOVE(conns,conn);
+      notice("C%d (now %d) idle connection closed (%s)",
+            conn->fd, conns.count, conn->quitting);
+      assert(!busy);
+      conn_dispose(conn);
+    }
+    return OOP_CONTINUE;
+  }
+
+  conn->since_activity= 0;
+  Article *art;
+
+#define GET_ARTICLE(musthavesent) do{                                        \
+    art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+    if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
+  }while(0) 
+
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{      \
+    code_streaming= (streaming);                               \
+    GET_ARTICLE(musthavesent);                                 \
+    article_done(art, RC_##how);                               \
+    goto dealtwith;                                            \
+  }while(0)
+
+#define PEERBADMSG(m) do {                                     \
+    connfail(conn, m ": %s", sani);  return OOP_CONTINUE;      \
+  }while(0)
+
+  int code_streaming= 0;
+
+  switch (code) {
+
+  default:  PEERBADMSG("peer sent unexpected message");
+
+  case 400:
+    if (busy)
+      PEERBADMSG("peer timed us out or stopped accepting articles");
+
+    LIST_REMOVE(conns,conn);
+    notice("C%d (now %d) idle connection closed by peer",
+          conns.count, conn->fd);
+    conn_dispose(conn);
+    return OOP_CONTINUE;
+
+  case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
+  case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
+
+  case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
+  case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
+
+  case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
+  case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
+
+  case 238: /* CHECK says send it */
+    code_streaming= 1;
+  case 335: /* IHAVE says send it */
+    GET_ARTICLE(-1);
+    assert(art->state == art_Unchecked);
+    art->ipf->counts[art->state][RC_accepted]++;
+    art->state= art_Wanted;
+    LIST_ADDTAIL(conn->priority, art);
+    break;
+
+  case 431: /* CHECK or TAKETHIS says try later */
+    code_streaming= 1;
+  case 436: /* IHAVE says try later */
+    GET_ARTICLE(0);
+    article_defer(art, RC_deferred);
+    break;
+
+  }
+dealtwith:
+
+  conn_maybe_write(conn);
+  check_assign_articles();
+  return OOP_CONTINUE;
+}
+
+