+ for (done=0; rs && done<xmitu; done++) {
+ struct iovec *vp= &conn->xmit[done];
+ XmitDetails *dp= &conn->xmitd[done];
+ if (rs > vp->iov_len) {
+ rs -= vp->iov_len;
+ xmit_free(dp);
+ } else {
+ vp->iov_base += rs;
+ vp->iov_len -= rs;
+ }
+ }
+ int newu= conn->xmitu - done;
+ memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
+ memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+ conn->xmitu= newu;
+ }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+ for (;;) {
+ if (conn->xmitu+5 > CONNIOVS)
+ break;
+
+ Article *art= LIST_REMHEAD(queue);
+ if (!art) break;
+
+ if (art->checked || conn->nocheck) {
+ /* actually send it */
+
+ ARTHANDLE *artdata= SMretrieve(somehow);
+
+ if (conn->stream) {
+ if (artdata) {
+ XMIT_LITERAL("TAKETHIS ");
+ xmit_noalloc(art->mid, art->midlen);
+ XMIT_LITERAL("\r\n");
+ xmit_artbody(artdata);
+ }
+ } else {
+ /* we got 235 from IHAVE */
+ if (artdata) {
+ xmit_artbody(artdata);
+ } else {
+ XMIT_LITERAL(".\r\n");
+ }
+ }
+ art->sent= 1;
+ LIST_ADDTAIL(conn->sent, art);
+
+ } else {
+ /* check it */
+
+ if (conn->stream)
+ XMIT_LITERAL("IHAVE ");
+ else
+ XMIT_LITERAL("CHECK ");
+ xmit_noalloc(art->mid, art->midlen);
+ XMIT_LITERAL("\r\n");
+
+ LIST_ADDTAIL(conn->sent, art);
+ }
+ }
+}
+
+/*========== responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+ OOP_RD_DELIM_STRIP, '\n',
+ OOP_RD_NUL_FORBID,
+ OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
+ Conn *conn= conn_v;
+
+ if (ev == OOP_RD_EOF) {
+ warn("unexpected EOF from peer");
+ conn_failed(conn);
+ return;
+ }
+ assert(ev == OOP_RD_OK);
+
+ char *ep;
+ unsigned long code= strtoul(data, &ep, 10);
+ if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+ char sanibuf[100];
+ const char *p= data;
+ char *q= sanibuf;
+ *q++= '`';
+ for (;;) {
+ if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+ int c= *p++;
+ if (!c) { *q++= '\''; break; }
+ if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+ sprintf(q,"\\x%02x",c);
+ q += 4;
+ }
+ warn("badly formatted response from peer: %s", sanibuf);
+ conn_failed(conn);
+ return;
+ }
+
+ if (conn->quitting) {
+ if (code!=205) {
+ warn("peer gave failure response to QUIT: %s", sani);
+ conn_failed(conn);
+ return;
+ }
+ conn close ok;
+ return;
+ }
+
+ switch (code) {
+ case 438: /* CHECK says they have it */
+ case 435: /* IHAVE says they have it */
+ ARTICLE_DEALTWITH(1,unwanted);
+ break;
+
+ case 238: /* CHECK says send it */
+ case 335: /* IHAVE says send it */
+ count_checkedwanted++;
+ Article *art= LIST_REMHEAD(conn->sent);
+ art->checked= 1;
+ LIST_ADDTAIL(conn->queue);
+ break;
+
+ case 235: /* IHAVE says thanks */
+ case 239: /* TAKETHIS says thanks */
+ ARTICLE_DEALTWITH(1,accepted);
+ break;
+
+ case 439: /* TAKETHIS says rejected */
+ case 437: /* IHAVE says rejected */
+ ARTICLE_DEALTWITH(1,rejected);
+ break;
+
+ case 431: /* CHECK or TAKETHIS says try later */
+ case 436: /* IHAVE says try later */
+ ARTICLE_DEALTWITH(0,deferred);
+ break;
+
+ case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+ case 503: warn("peer timed us out: %s", sani); goto failed;
+ default: warn("peer sent unexpected message: %s", sani);
+ failed:
+ conn_failed(conn);
+ return OOP_CONTINUE;;
+ }
+
+ return OOP_CONTINUE;
+}
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+static void filemon_start(InputFile *ipf) {
+ assert(!ipf->filemon);
+
+ ipf->filemon= xmalloc(sizeof(*ipf->filemon));
+ memset(ipf->filemon, 0, sizeof(*ipf->filemon));
+ filemon_method_startfile(ipf, ipf->filemon);
+}
+
+static void filemon_stop(InputFile *ipf) {
+ if (!ipf->filemon) return;
+ filemon_method_stopfile(ipf, ipf->filemon);
+ free(ipf->filemon);
+ ipf->filemon= 0;
+}
+
+static void filemon_callback(InputFile *ipf) {
+ ipf->readable_callback(ipf->readable_callback_user);
+}
+
+static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
+ void *user) {
+ InputFile *ipf= user;
+ return ipf->readable_callback(ipf->readable_callback_user);
+}
+
+static void on_cancel(struct oop_readable *rable) {
+ InputFile *ipf= (void*)rable;
+
+ if (ipf->filemon) filemon_stopfile(ipf);
+ loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+}
+
+static int tailing_on_readable(struct oop_readable *rable,
+ oop_readable_call *cb, void *user) {
+ InputFile *ipf= (void*)rable;
+
+ tailing_on_cancel(rable);
+ ipf->readable_callback= cb;
+ ipf->readable_callback_user= user;
+ filemon_startfile(ipf);
+
+ loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+ return 0;
+}
+
+static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
+ size_t length) {
+ InputFile *ipf= (void*)rable;
+ for (;;) {
+ ssize_t r= read(ipf->fd, buffer, length);
+ if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; }
+ if (r==-1 && errno==EINTR) continue;
+ return r;
+ }
+}
+
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wdmax;
+static InputFile **filemon_inotify_wd2ipf;
+
+typedef struct Filemon_Perfile {
+ int wd;
+} Filemon_Inotify_Perfile;
+
+static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
+ int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
+ if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
+
+ if (wd >= filemon_inotify_wdmax) {
+ int newmax= wd+2;
+ filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+ sizeof(*filemon_inotify_wd2ipf) * newmax);
+ memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
+ sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
+ filemon_inotify_wdmax= newmax;
+ }
+
+ assert(!filemon_inotify_wd2ipf[wd]);
+ filemon_inotify_wd2ipf[wd]= ipf;
+
+ pf->wd= wd;
+}
+
+static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
+ int wd= pf->wd;
+ int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ if (r) sysdie("inotify_rm_watch");
+ filemon_inotify_wd2ipf[wd]= 0;
+}
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+ oop_event e, void *u) {
+ struct inotify_event iev;
+ for (;;) {
+ int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+ if (r==-1) {
+ if (errno==EAGAIN) break;
+ sysdie("read from inotify master");
+ } else if (r==sizeof(iev)) {
+ assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
+ } else {
+ die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+ }
+ InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
+ filemon_callback(ipf);