static void filemon_start(InputFile *ipf);
static void filemon_stop(InputFile *ipf);
-static void filemon_callback(InputFile *ipf);
+static void tailing_make_readable(InputFile *ipf);
static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
/* when changing defaults, remember to update the manpage */
static const char *sitename, *remote_host;
-static const char *feedfile, *path_cli, *path_cli_dir;
+static const char *feedfile, *path_run, *path_cli, *path_cli_dir;
static int quiet_multiple=0;
static int become_daemon=1, try_filemon=1;
static int try_stream=1;
/* all these are initialised to seconds, and converted to periods in main */
static int reconnect_delay_periods=1000;
static int flushfail_retry_periods=1000;
-static int backlog_retry_minperiods=50;
+static int backlog_retry_minperiods=100;
static int backlog_spontrescan_periods=300;
static int spontaneous_flush_periods=100000;
static int max_separated_periods=2000;
oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
off_t offset;
- int skippinglong, paused;
+ int skippinglong, paused, fake_readable;
ArticleList queue;
long inprogress; /* includes queue.count and also articles in conns */
for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
xmit_free(d);
+ LIST_REMOVE(conns,conn);
+
char *m= xvasprintf(fmt,al);
- warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
- conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
+ warn("C%d[%d] connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s",
+ conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
- LIST_REMOVE(conns,conn);
conn_dispose(conn);
check_assign_articles();
}
&peer_rd_err, conn);
if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
- notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
LIST_ADDHEAD(conns, conn);
+ notice("C%d[%d] connected %s",
+ conns.count, conn->fd, conn->stream ? "streaming" : "plain");
connect_attempt_discard();
check_assign_articles();
if (code!=205 && code!=503) {
connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
} else {
- notice("C%d idle connection closed by us", conn->fd);
- assert(!conn_busy);
LIST_REMOVE(conns,conn);
+ notice("C%d[%d] idle connection closed by us", conns.count, conn->fd);
+ assert(!conn_busy);
conn_dispose(conn);
}
return OOP_CONTINUE;
if (conn_busy)
PEERBADMSG("peer timed us out or stopped accepting articles");
- notice("C%d idle connection closed by peer", conn->fd);
LIST_REMOVE(conns,conn);
+ notice("C%d[%d] idle connection closed by peer", conns.count, conn->fd);
conn_dispose(conn);
return OOP_CONTINUE;
static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
void *user) {
+ /* lifetime of ipf here is OK because destruction will cause
+ * on_cancel which will cancel this callback */
InputFile *ipf= user;
+
+ if (!ipf->fake_readable) return OOP_CONTINUE;
+
+ /* we just keep calling readable until our caller (oop_rd)
+ * has called try_read, and try_read has found EOF so given EAGAIN */
+ loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+
return ipf->readable_callback(loop, &ipf->readable,
ipf->readable_callback_user);
}
ipf->readable_callback= 0;
}
-static void tailing_queue_readable(InputFile *ipf) {
- /* lifetime of ipf here is OK because destruction will cause
- * on_cancel which will cancel this callback */
+static void tailing_make_readable(InputFile *ipf) {
+ if (!ipf || !ipf->readable_callback) /* so callers can be naive */
+ return;
+ ipf->fake_readable= 1;
loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
}
ipf->readable_callback= cb;
ipf->readable_callback_user= user;
filemon_start(ipf);
-
- tailing_queue_readable(ipf);
+ tailing_make_readable(ipf);
return 0;
}
ssize_t r= read(ipf->fd, buffer, length);
if (r==-1) {
if (errno==EINTR) continue;
+ ipf->fake_readable= 0;
return r;
}
if (!r) {
if (ipf==main_input_file) {
errno=EAGAIN;
+ ipf->fake_readable= 0;
return -1;
} else if (ipf==flushing_input_file) {
assert(ipf->rd);
abort();
}
}
- tailing_queue_readable(ipf);
return r;
}
}
}
InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
/*debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/
- filemon_callback(ipf);
+ tailing_make_readable(ipf);
}
return OOP_CONTINUE;
}
ipf->filemon= 0;
}
-static void filemon_callback(InputFile *ipf) {
- if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
- ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
-}
-
/*---------- interface to start and stop an input file ----------*/
static const oop_rd_style feedfile_rdstyle= {
case INNDCOMMCHILD_ESTATUS_NONESUCH:
notice("feed has been dropped by innd, finishing up");
flushing_input_file= main_input_file;
- tailing_queue_readable(flushing_input_file);
+ tailing_make_readable(flushing_input_file);
/* we probably previously returned EAGAIN from our fake read method
* when in fact we were at EOF, so signal another readable event
* so we actually see the EOF */
case 0:
/* as above */
flushing_input_file= main_input_file;
- tailing_queue_readable(flushing_input_file);
+ tailing_make_readable(flushing_input_file);
main_input_file= open_input_file(feedfile);
if (!main_input_file)
- die("flush succeeded but feedfile %s does not exist!", feedfile);
+ die("flush succeeded but feedfile %s does not exist!"
+ " (this probably means feedfile does not correspond"
+ " to site %s in newsfeeds)", feedfile, sitename);
if (flushing_input_file) {
- SMS(SEPARATED, max_separated_periods, "recovery flush complete");
+ SMS(SEPARATED, max_separated_periods, "flush complete");
} else {
close_defer();
- SMS(NORMAL, spontaneous_flush_periods, "flush complete");
+ SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
}
return OOP_CONTINUE;
}
static void filepoll(void) {
- filemon_callback(main_input_file);
- filemon_callback(flushing_input_file);
+ tailing_make_readable(main_input_file);
+ tailing_make_readable(flushing_input_file);
}
static char *debug_report_ipf(InputFile *ipf) {
{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
{'C',"inndconf", "F", &inndconffile, op_string },
{'P',"port", "PORT", &port, op_integer },
+{0,"chdir", "DIR", &path_run, op_string },
{0,"cli", "DIR/|PATH", &path_cli, op_string },
{0,"help", 0, 0, help },
sitename= *argv++;
if (!sitename) badusage("need site name argument");
- remote_host= *argv++;
+
+ if (*argv) remote_host= *argv++;
+ else remote_host= sitename;
+
if (*argv) badusage("too many non-option arguments");
/* defaults */
if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
badusage("bad input data ratio must be between 0..100");
max_bad_data_ratio *= 0.01;
-
- if (!feedfile) {
- feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
- } else if (!feedfile[0]) {
- badusage("feed filename, if specified, must be nonempty");
- } else if (path_ends_slash(feedfile)) {
+
+ if (!path_run)
+ path_run= innconf->pathrun;
+
+ if (!feedfile) feedfile= sitename;
+ if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
+ if (path_ends_slash(feedfile))
feedfile= xasprintf("%s%s", feedfile, sitename);
- }
+ if (feedfile[0] != '/')
+ feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile);
if (!path_cli) {
- path_cli_dir= xasprintf("%s/innduct", innconf->pathrun);
+ path_cli_dir= "innduct";
} else if (!path_cli[0] || !strcmp(path_cli,"none")) {
path_cli= 0; /* ok, don't then */
} else if (path_ends_slash(path_cli)) {
if (child1) _exit(0);
pid_t sid= setsid();
- if (sid != child1) sysfatal("setsid failed");
+ if (sid == -1) sysfatal("setsid failed");
pid_t child2= xfork("daemonise second fork");
if (child2) _exit(0);
self_pid= getpid();
if (self_pid==-1) sysdie("getpid");
+ r= chdir(path_run);
+ if (r) sysdie("could not chdir to pathrun %s", path_run);
+
statemc_lock();
init_signals();