+/*========== responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+ OOP_RD_DELIM_STRIP, '\n',
+ OOP_RD_NUL_FORBID,
+ OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
+ Conn *conn= conn_v;
+
+ if (ev == OOP_RD_EOF) {
+ warn("unexpected EOF from peer");
+ conn_failed(conn);
+ return;
+ }
+ assert(ev == OOP_RD_OK);
+
+ char *ep;
+ unsigned long code= strtoul(data, &ep, 10);
+ if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+ char sanibuf[100];
+ const char *p= data;
+ char *q= sanibuf;
+ *q++= '`';
+ for (;;) {
+ if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+ int c= *p++;
+ if (!c) { *q++= '\''; break; }
+ if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+ sprintf(q,"\\x%02x",c);
+ q += 4;
+ }
+ warn("badly formatted response from peer: %s", sanibuf);
+ conn_failed(conn);
+ return;
+ }
+
+ if (conn->quitting) {
+ if (code!=205) {
+ warn("peer gave failure response to QUIT: %s", sani);
+ conn_failed(conn);
+ return;
+ }
+ conn close ok;
+ return;
+ }
+
+ switch (code) {
+ case 438: /* CHECK says they have it */
+ case 435: /* IHAVE says they have it */
+ ARTICLE_DEALTWITH(1,unwanted);
+ break;
+
+ case 238: /* CHECK says send it */
+ case 335: /* IHAVE says send it */
+ count_checkedwanted++;
+ Article *art= LIST_REMHEAD(conn->sent);
+ art->checked= 1;
+ LIST_ADDTAIL(conn->queue);
+ break;
+
+ case 235: /* IHAVE says thanks */
+ case 239: /* TAKETHIS says thanks */
+ ARTICLE_DEALTWITH(1,accepted);
+ break;
+
+ case 439: /* TAKETHIS says rejected */
+ case 437: /* IHAVE says rejected */
+ ARTICLE_DEALTWITH(1,rejected);
+ break;
+
+ case 431: /* CHECK or TAKETHIS says try later */
+ case 436: /* IHAVE says try later */
+ ARTICLE_DEALTWITH(0,deferred);
+ break;
+
+ case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+ case 503: warn("peer timed us out: %s", sani); goto failed;
+ default: warn("peer sent unexpected message: %s", sani);
+ failed:
+ conn_failed(conn);
+ return OOP_CONTINUE;;
+ }
+
+ return OOP_CONTINUE;
+}
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+
+
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wdmax;
+static InputFile **filemon_inotify_wd2ipf;
+
+typedef struct Filemon_Perfile {
+ int wd;
+} Filemon_Inotify_Perfile;
+
+static void filemon_startfile(InputFile *ipf) {
+ int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
+ if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
+
+ if (wd >= filemon_inotify_wdmax) {
+ int newmax= wd+2;
+ filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+ sizeof(*filemon_inotify_wd2ipf) * newmax);
+ memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
+ sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
+ filemon_inotify_wdmax= newmax;
+ }
+
+ assert(!filemon_inotify_wd2ipf[wd]);
+ filemon_inotify_wd2ipf[wd]= ipf;
+
+ assert(!ipf->filemon);
+ ipf->filemon= xmalloc(sizeof(*ipf->filemon));
+ ipf->filemon->wd= wd;
+}
+
+static void filemon_stopfile(InputFile *ipf) {
+ int wd= ipf->filemon->wd;
+ int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ if (r) sysdie("inotify_rm_watch");
+ filemon_inotify_wd2ipf[wd]= 0;
+ free(ipf->filemon);
+ ipf->filemon= 0;
+}
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+ oop_event e, void *u) {
+ struct inotify_event iev;
+ for (;;) {
+ int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+ if (r==-1) {
+ if (errno==EAGAIN) break;
+ sysdie("read from inotify master");
+ } else if (r==sizeof(iev)) {
+ assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
+ } else {
+ die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+ }
+ filemon_callback(filemon_inotify_wd2ipf[iev.wd]);
+ }
+ return OOP_CONTINUE;
+}
+
+static int filemon_init(void) {
+ filemon_inotify_fd= inotify_init();
+ if (filemon_inotify_fd<0) {
+ syswarn("could not initialise inotify: inotify_init failed");
+ return 0;
+ }
+ set nonblock;
+ loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+
+ return 1;
+}
+
+#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
+
+/*---------- filemon dummy implementation ----------*/
+
+#if !defined(HAVE_FILEMON)
+
+static int filemon_init(void) { return 0; }
+static void filemon_startfile(InputFile *ipf) { }
+static void filemon_stopfile(InputFile *ipf) { }
+
+#endif
+
+/*========== interaction with innd ==========*/
+
+/* See official state diagram at top of file. We implement
+ * this as follows:
+ *
+ ================
+ WAITING
+ [Nothing/Noduct]
+ poll for F
+ ================
+ |
+ | TIMEOUT
+ |`--------------------------.
+ | | install defer as backlog
+ ,--------->| | exit
+ | | OPEN F SUCCEEDS V
+ | V =========
+ | ======== (ESRCH)
+ | NORMAL [Dropped]
+ | [Normal] =========
+ | read F
+ | ========
+ | |
+ | | F IS SO BIG WE SHOULD FLUSH
+ ^ | hardlink F to D
+ | [Hardlinked]
+ | | unlink F
+ | | our handle onto F is now onto D
+ | [Moved]
+ | |
+ | |<---------------------------------------------------.
+ | | |
+ | | spawn inndcomm flush |
+ | V |
+ | ========== |
+ | FLUSHING |
+ | [Flushing] |
+ | read D |
+ | ========== |
+ | | |
+ | | INNDCOMM FLUSH FAILS ^
+ | |`----------------------->--------. |
+ | | | |
+ | | NO SUCH SITE V |
+ ^ |`----------------. ========= |
+ | | | FLUSHFAIL |
+ | | V [Moved] |
+ | | ========== read D |
+ | | DROPPING ========= |
+ | | [Dropping] | |
+ | | read D | TIME TO RETRY |
+ | | ========== `------------------'
+ | | FLUSH OK |
+ | | open F | AT EOF OF D AND ALL PROCESSED
+ | V | install defer as backlog
+ | ========== | unlink D
+ | FLUSHED | exit
+ | [Flushed] V
+ | read D, F ==========
+ | ========== (ESRCH)
+ | | [Droppped]
+ | | ==========
+ | |
+ | | AT EOF OF D AND ALL D PROCESSED
+ ^ V unlink D
+ | | close D
+ | | install defer as backlog
+ | | start new defer
+ | |
+ `----------'
+
+ *
+ * duct state
+ * WAITING
+ * NORMAL
+ * FLUSHING
+ * FLUSHED
+ * FLUSHFAIL
+ * DROPPING
+ */
+
+static char *path_ductlock, *path_duct, *path_ductdefer;
+
+typedef struct {
+ /* This is an instance of struct oop_readable */
+ struct oop_readable readable;
+ oop_readable_call *readable_callback;
+
+ int fd;
+ const char *path; /* ptr copy of path_<foo> or feedfile */
+ struct Filemon_Perfile *filemon;
+
+ oop_read *rd;
+ long inprogress; /* no. of articles read but not processed */
+} InputFile;
+
+static void statemc_init(void) {
+ path_ductlock= xasprintf("%s_duct.lock", feedfile);
+ path_duct= xasprintf("%s_duct", feedfile);
+ path_ductdefer= xasprintf("%s_duct.defer", feedfile);
+
+ int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
+ if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
+
+ struct flock fl;
+ memset(&fl,0,sizeof(fl));
+ fl.l_type= F_WRLCK;
+ fl.l_whence= SEEK_SET;
+ r= fcntl(lockfd, F_SETLK, &fl);
+ if (r==-1) {
+ if (errno==EACCES || errno==EAGAIN)
+ die("another duct holds the lockfile");
+ sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
+ }
+}
+
+static void statemc_poll(void) {
+ if (tailing_fd>=0) return;
+
+ int d_fd= open(path_duct, O_RDWR);
+ if (d_fd<0)
+ if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
+
+ int f_fd= open(feedfile, O_RDWR);
+ if (f_fd<0)
+ if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
+
+ if (d_fd<0) {
+ if (f_fd>=0)
+ start_tailing(f_fd);
+ return;
+ }
+
+
+
+/*========== main program ==========*/
+
+#define EVERY(what, interval, body) \
+ static const struct timeval what##_timeout = { 5, 0 }; \
+ static void what##_schedule(void); \
+ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+ { body } \
+ what##_schedule(); \
+ } \
+ static void what##_schedule(void) { \
+ loop->on_time(loop, what##_timeout, what##_timedout, 0); \
+ }
+
+EVERY(filepoll, {5,0}, { check_master_queue(); })