char *mid;
int midlen;
int checked, sentbody;
+ fd and offset for blanking token or mid;
};
#define CONNIOVS 128
ISNODE(Conn);
int fd, max_queue, stream;
LIST(Article) queue; /* not yet told peer, or CHECK said send it */
- LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
+ LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
struct iovec xmit[CONNIOVS];
XmitDetails xmitd[CONNIOVS];
int xmitu;
static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
check_conn_work(u);
+ return OOP_CONTINUE;
}
static void conn_check_work(Conn *conn) {
}
}
}
-
-
- if (conn->queue.head) {
- if (conn->queue.checked || conn->nocheck) {
-
-
- && conn->xmitu+3 <= CONNIOVS) {
- if (
- XMIT("
- if (conn->xmitu < CONNIOVS
-
- if (!queue
-
+/*========== responses from peer ==========*/
+static const oop_rd_style peer_rd_style= {
+ OOP_RD_DELIM_STRIP, '\n',
+ OOP_RD_NUL_FORBID,
+ OOP_RD_SHORTREC_FORBID
+};
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
+ Conn *conn= conn_v;
-
- int circ_used= circ_write - circ_read;
- if (circ_used < 0) circ_used += CONNBUFSZ;
- writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
-
- if (conn->circ_read == conn->circ_write)
- return OOP_CONTINUE;
-
- struct iovec iov[2];
- int niov= 1;
- iov[0].iov_base= conn->circ_buf + conn->circ_read;
- if (conn->circ_read > conn->circ_write) { /* wrapped */
- iov[0].iov_len= CONNBUFSZ - conn->circ_read;
- iov[1].iov_base= conn->circ_buf;
- iov[1].iov_len= conn->circ_write;
- if (niov[1].iov_len) niov= 2;
- } else {
- iov[0].iov_len= conn->circ_write - conn->circ_read;
- }
- ssize_t rs= writev(conn->fd, &iov, niov);
- if (rs < 0) {
-
+ if (ev == OOP_RD_EOF) {
+ warn("unexpected EOF from peer");
+ conn_failed(conn);
+ return;
+ }
+ assert(ev == OOP_RD_OK);
+
+ char *ep;
+ unsigned long code= strtoul(data, &ep, 10);
+ if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+ char sanibuf[100];
+ const char *p= data;
+ char *q= sanibuf;
+ *q++= '`';
+ for (;;) {
+ if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+ int c= *p++;
+ if (!c) { *q++= '\''; break; }
+ if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+ sprintf(q,"\\x%02x",c);
+ q += 4;
}
- assert(rs > 0);
+ warn("badly formatted response from peer: %s", sanibuf);
+ conn_failed(conn);
+ return;
+ }
- conn->circ_read += rs;
- if (conn->circ_read > CONNBUFSZ)
- conn->circ_read -= CONNBUFSZ;
+ if (conn->quitting) {
+ if (code!=205) {
+ warn("peer gave failure response to QUIT: %s", sani);
+ conn_failed(conn);
+ return;
+ }
+ conn close ok;
+ return;
}
-}
+ switch (code) {
+ case 438: /* CHECK says they have it */
+ case 435: /* IHAVE says they have it */
+ ARTICLE_DEALTWITH(1,unwanted);
+ break;
+
+ case 238: /* CHECK says send it */
+ case 335: /* IHAVE says send it */
+ count_checkedwanted++;
+ Article *art= LIST_REMHEAD(conn->sent);
+ art->checked= 1;
+ LIST_ADDTAIL(conn->queue);
+ break;
+
+ case 235: /* IHAVE says thanks */
+ case 239: /* TAKETHIS says thanks */
+ ARTICLE_DEALTWITH(1,accepted);
+ break;
+
+ case 439: /* TAKETHIS says rejected */
+ case 437: /* IHAVE says rejected */
+ ARTICLE_DEALTWITH(1,rejected);
+ break;
+
+ case 431: /* CHECK or TAKETHIS says try later */
+ case 436: /* IHAVE says try later */
+ ARTICLE_DEALTWITH(0,deferred);
+ break;
+
+ case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+ case 503: warn("peer timed us out: %s", sani); goto failed;
+ default: warn("peer sent unexpected message: %s", sani);
+ failed:
+ conn_failed(conn);
+ return OOP_CONTINUE;;
+ }
+ return OOP_CONTINUE;
+}
+
main {
ignore sigpipe;
};