static void filemon_start(InputFile *ipf);
static void filemon_stop(InputFile *ipf);
-static void filemon_callback(InputFile *ipf);
+static void tailing_make_readable(InputFile *ipf);
static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
/* when changing defaults, remember to update the manpage */
static const char *sitename, *remote_host;
-static const char *feedfile, *path_cli;
+static const char *feedfile, *path_cli, *path_cli_dir;
static int quiet_multiple=0;
static int become_daemon=1, try_filemon=1;
static int try_stream=1;
oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
off_t offset;
- int skippinglong, paused;
+ int skippinglong, paused, fake_readable;
ArticleList queue;
long inprogress; /* includes queue.count and also articles in conns */
memset(&sa,0,sizeof(sa));
int maxlen= sizeof(sa.un.sun_path);
+ if (!path_cli) {
+ info("control command line disabled");
+ return;
+ }
+
int pathlen= strlen(path_cli);
- if (pathlen > maxlen)
- NOCLI("cli socket path %s too long (%d>%d)", path_cli, pathlen, maxlen);
+ if (pathlen > maxlen) {
+ warn("no cli listener, because cli socket path %s too long (%d>%d)",
+ path_cli, pathlen, maxlen);
+ return;
+ }
+
+ if (path_cli_dir) {
+ int r= mkdir(path_cli_dir, 0700);
+ if (r && errno!=EEXIST)
+ NOCLI("create cli socket directory %s", path_cli_dir);
+ }
int r= unlink(path_cli);
if (r && errno!=ENOENT)
cli_master= socket(PF_UNIX, SOCK_STREAM, 0);
if (cli_master<0) NOCLI("create new cli master socket");
- int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path);
+ int sl= pathlen + offsetof(struct sockaddr_un, sun_path);
sa.un.sun_family= AF_UNIX;
memcpy(sa.un.sun_path, path_cli, pathlen);
xmit_free(d);
char *m= xvasprintf(fmt,al);
- warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s",
+ warn("C%d connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s",
conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
void *user) {
+ /* lifetime of ipf here is OK because destruction will cause
+ * on_cancel which will cancel this callback */
InputFile *ipf= user;
+
+ if (!ipf->fake_readable) return OOP_CONTINUE;
+
+ /* we just keep calling readable until our caller (oop_rd)
+ * has called try_read, and try_read has found EOF so given EAGAIN */
+ loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+
return ipf->readable_callback(loop, &ipf->readable,
ipf->readable_callback_user);
}
ipf->readable_callback= 0;
}
-static void tailing_queue_readable(InputFile *ipf) {
- /* lifetime of ipf here is OK because destruction will cause
- * on_cancel which will cancel this callback */
+static void tailing_make_readable(InputFile *ipf) {
+ if (!ipf || !ipf->readable_callback) /* so callers can be naive */
+ return;
+ ipf->fake_readable= 1;
loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
}
ipf->readable_callback= cb;
ipf->readable_callback_user= user;
filemon_start(ipf);
-
- tailing_queue_readable(ipf);
+ tailing_make_readable(ipf);
return 0;
}
ssize_t r= read(ipf->fd, buffer, length);
if (r==-1) {
if (errno==EINTR) continue;
+ ipf->fake_readable= 0;
return r;
}
if (!r) {
if (ipf==main_input_file) {
errno=EAGAIN;
+ ipf->fake_readable= 0;
return -1;
} else if (ipf==flushing_input_file) {
assert(ipf->rd);
abort();
}
}
- tailing_queue_readable(ipf);
return r;
}
}
}
InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
/*debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/
- filemon_callback(ipf);
+ tailing_make_readable(ipf);
}
return OOP_CONTINUE;
}
ipf->filemon= 0;
}
-static void filemon_callback(InputFile *ipf) {
- if (ipf && ipf->readable_callback) /* so filepoll() can be naive */
- ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user);
-}
-
/*---------- interface to start and stop an input file ----------*/
static const oop_rd_style feedfile_rdstyle= {
case INNDCOMMCHILD_ESTATUS_NONESUCH:
notice("feed has been dropped by innd, finishing up");
flushing_input_file= main_input_file;
- tailing_queue_readable(flushing_input_file);
+ tailing_make_readable(flushing_input_file);
/* we probably previously returned EAGAIN from our fake read method
* when in fact we were at EOF, so signal another readable event
* so we actually see the EOF */
case 0:
/* as above */
flushing_input_file= main_input_file;
- tailing_queue_readable(flushing_input_file);
+ tailing_make_readable(flushing_input_file);
main_input_file= open_input_file(feedfile);
if (!main_input_file)
}
static void filepoll(void) {
- filemon_callback(main_input_file);
- filemon_callback(flushing_input_file);
+ tailing_make_readable(main_input_file);
+ tailing_make_readable(flushing_input_file);
}
static char *debug_report_ipf(InputFile *ipf) {
{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
{'C',"inndconf", "F", &inndconffile, op_string },
{'P',"port", "PORT", &port, op_integer },
-{0,"cli", 0, &path_cli, op_string },
+{0,"cli", "DIR/|PATH", &path_cli, op_string },
{0,"help", 0, 0, help },
{0,"max-connections", "N", &max_connections, op_integer },
*store /= period_seconds;
}
-static void assemble_path(const char **path_io, const char *suffix,
- const char *what) {
- const char *const specified= *path_io;
- if (!specified[0]) badusage("%s, if specified, must be nonempty", what);
- if (specified[strlen(specified)-1]=='/')
- *path_io= xasprintf("%s%s%s", specified, sitename, suffix);
+static int path_ends_slash(const char *specified) {
+ int l= strlen(specified);
+ assert(l);
+ return specified[l-1] == '/';
}
int main(int argc, char **argv) {
badusage("bad input data ratio must be between 0..100");
max_bad_data_ratio *= 0.01;
- if (!feedfile) feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
- else assemble_path(&feedfile, "", "feed filename");
-
- if (!path_cli) path_cli= xasprintf("%s_cli", feedfile);
- else assemble_path(&path_cli, "%s_cli", "cli socket path");
+ if (!feedfile) {
+ feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
+ } else if (!feedfile[0]) {
+ badusage("feed filename, if specified, must be nonempty");
+ } else if (path_ends_slash(feedfile)) {
+ feedfile= xasprintf("%s%s", feedfile, sitename);
+ }
+
+ if (!path_cli) {
+ path_cli_dir= xasprintf("%s/innduct", innconf->pathrun);
+ } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
+ path_cli= 0; /* ok, don't then */
+ } else if (path_ends_slash(path_cli)) {
+ path_cli_dir= xasprintf("%.*s", strlen(path_cli)-1, path_cli);
+ }
+ if (path_cli_dir)
+ path_cli= xasprintf("%s/%s", path_cli_dir, sitename);
if (max_queue_per_ipf<0)
max_queue_per_ipf= max_queue_per_conn * 2;