X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=c0404fc011b9a0b0715055664c635170ee6812d7;hp=b829841613d0aa010d7ce43bdd0e0934479a5fe6;hb=8637044e5d6c13c4ddf8dc83440a59b060b25a3d;hpb=df2fee3940b22c3b173e077bbd0c83007b23e794 diff --git a/backends/innduct.c b/backends/innduct.c index b829841..c0404fc 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,28 +1,28 @@ /* - * debugging rune: - * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost + * innduct + * tailing reliable realtime streaming feeder for inn + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) */ -/*-- -flow control notes -to ensure articles go away eventually -separate queue for each input file - queue expiry - every period, check head of backlog queue for expiry with SMretrieve - if too old: discard, and check next article - also check every backlog article as we read it - flush expiry - after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping - one-off: eat queued articles from flushing and write them to defer - one-off: connfail all connections which have any articles from flushing - newly read articles from flushing go straight to defer - this should take care of it and get us out of this state -to avoid filling up ram needlessly - input control - limit number of queued articles for each ipf - pause/resume inputfile tailing ---*/ - /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -350,11 +350,12 @@ static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); static const oop_rd_style peer_rd_style; static oop_rd_call peer_rd_err, peer_rd_ok; + /*----- configuration options -----*/ /* when changing defaults, remember to update the manpage */ static const char *sitename, *remote_host; -static const char *feedfile, *realsockdir="/tmp/innduct.control"; +static const char *feedfile, *path_control; static int quiet_multiple=0; static int become_daemon=1, try_filemon=1; static int try_stream=1; @@ -516,8 +517,7 @@ struct Conn { /* main initialises */ static oop_source *loop; static ConnList conns; -static char *path_lock, *path_flushing, *path_defer; -static char *path_control, *path_dump; +static char *path_lock, *path_flushing, *path_defer, *path_dump; static char *globpat_backlog; static pid_t self_pid; @@ -999,8 +999,6 @@ static void *control_master_readable(oop_source *lp, int master, }while(0) static void control_init(void) { - char *real=0; - union { struct sockaddr sa; struct sockaddr_un un; @@ -1009,66 +1007,24 @@ static void control_init(void) { memset(&sa,0,sizeof(sa)); int maxlen= sizeof(sa.un.sun_path); - int reallen= readlink(path_control, sa.un.sun_path, maxlen); - if (reallen<0) { - if (errno != ENOENT) - NOCONTROL("readlink control socket symlink path %s", path_control); - } - if (reallen >= maxlen) { - debug("control socket symlink path too long (r=%d)",reallen); - xunlink(path_control, "old (overlong) control socket symlink"); - reallen= -1; - } - - if (reallen<0) { - struct stat stab; - int r= lstat(realsockdir,&stab); - if (r) { - if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir); - - r= mkdir(realsockdir, 0700); - if (r) NOCONTROL("mkdir real socket dir %s", realsockdir); - - } else { - uid_t self= geteuid(); - if (!S_ISDIR(stab.st_mode) || - stab.st_uid != self || - stab.st_mode & 0007) { - warn("no control socket, because real socket directory" - " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)", - !!S_ISDIR(stab.st_mode), - (unsigned long)stab.st_uid, (unsigned long)self, - (unsigned long)stab.st_mode & 0777UL); - goto nocontrol; - } - } - - real= xasprintf("%s/s%lx.%lx", realsockdir, - (unsigned long)xtime(), (unsigned long)self_pid); - int reallen= strlen(real); + int pathlen= strlen(path_control); + if (pathlen > maxlen) + NOCONTROL("control socket path %s too long (%d>%d)", + path_control, pathlen, maxlen); - if (reallen >= maxlen) { - warn("no control socket, because tmpnam gave overly-long path" - " %s", real); - goto nocontrol; - } - r= symlink(real, path_control); - if (r) NOCONTROL("make control socket path %s a symlink to real" - " socket path %s", path_control, real); - memcpy(sa.un.sun_path, real, reallen); - } - - int r= unlink(sa.un.sun_path); + int r= unlink(path_control); if (r && errno!=ENOENT) - NOCONTROL("remove old real socket %s", sa.un.sun_path); + NOCONTROL("remove old control socket %s", path_control); control_master= socket(PF_UNIX, SOCK_STREAM, 0); if (control_master<0) NOCONTROL("create new control socket"); - sa.un.sun_family= AF_UNIX; int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path); + sa.un.sun_family= AF_UNIX; + memcpy(sa.un.sun_path, path_control, pathlen); + r= bind(control_master, &sa.sa, sl); - if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path); + if (r) NOCONTROL("bind to control socket path %s", sa.un.sun_path); r= listen(control_master, 5); if (r) NOCONTROL("listen"); @@ -1076,12 +1032,11 @@ static void control_init(void) { xsetnonblock(control_master, 1); loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); - info("control socket ok, real path %s", sa.un.sun_path); + info("control socket ok, listening on %s", path_control); return; nocontrol: - free(real); xclose_perhaps(&control_master, "control master",0); return; } @@ -1539,6 +1494,26 @@ static void conn_maybe_write(Conn *conn) { /*---------- expiry, flow control and deferral ----------*/ +/* + * flow control notes + * to ensure articles go away eventually + * separate queue for each input file + * queue expiry + * every period, check head of backlog queue for expiry with SMretrieve + * if too old: discard, and check next article + * also check every backlog article as we read it + * flush expiry + * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping + * one-off: eat queued articles from flushing and write them to defer + * one-off: connfail all connections which have any articles from flushing + * newly read articles from flushing go straight to defer + * this should take care of it and get us out of this state + * to avoid filling up ram needlessly + * input control + * limit number of queued articles for each ipf + * pause/resume inputfile tailing + */ + static void check_reading_pause_resume(InputFile *ipf) { if (ipf->queue.count >= max_queue_per_ipf) inputfile_reading_pause(ipf); @@ -3305,6 +3280,7 @@ static void dump_input_file(FILE *f, const ControlCommand *c, DUMPV("%d", ipf->,readcount_ok); DUMPV("%d", ipf->,readcount_blank); DUMPV("%d", ipf->,readcount_err); + DUMPV("%d", ipf->,count_nooffer_missing); } fprintf(f,"\n"); if (ipf) { @@ -3388,10 +3364,11 @@ CCMD(dump) { } fprintf(f,"paths"); + DUMPV("%s", , feedfile); + DUMPV("%s", , path_control); DUMPV("%s", , path_lock); DUMPV("%s", , path_flushing); DUMPV("%s", , path_defer); - DUMPV("%s", , path_control); DUMPV("%s", , path_dump); DUMPV("%s", , globpat_backlog); fprintf(f,"\n"); @@ -3575,7 +3552,7 @@ static const Option innduct_options[]= { {0,"no-filemon", 0, &try_filemon, op_setint, 0 }, {'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, -{0,"ctrl-sock-dir", 0, &realsockdir, op_string }, +{0,"cli", 0, &path_control, op_string }, {0,"help", 0, 0, help }, {0,"max-connections", "N", &max_connections, op_integer }, @@ -3625,6 +3602,14 @@ static void convert_to_periods_rndup(int *store) { *store /= period_seconds; } +static void assemble_path(const char **path_io, const char *suffix, + const char *what) { + const char *const specified= *path_io; + if (!specified[0]) badusage("%s, if specified, must be nonempty", what); + if (specified[strlen(specified)-1]=='/') + *path_io= xasprintf("%s%s%s", specified, sitename, suffix); +} + int main(int argc, char **argv) { if (!argv[1]) { printusage(stderr); @@ -3666,14 +3651,12 @@ int main(int argc, char **argv) { if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) badusage("bad input data ratio must be between 0..100"); max_bad_data_ratio *= 0.01; + + if (!feedfile) feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); + else assemble_path(&feedfile, "", "feed filename"); - if (!feedfile) { - feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); - } else if (!feedfile[0]) { - badusage("feed filename must be nonempty"); - } else if (feedfile[strlen(feedfile)-1]=='/') { - feedfile= xasprintf("%s%s",feedfile,sitename); - } + if (path_control) path_control= xasprintf("%s_cli", feedfile); + else assemble_path(&path_control, "%s_cli", "control socket path"); if (max_queue_per_ipf<0) max_queue_per_ipf= max_queue_per_conn * 2; @@ -3689,7 +3672,6 @@ int main(int argc, char **argv) { path_lock= xasprintf("%s_lock", feedfile); path_flushing= xasprintf("%s_flushing", feedfile); path_defer= xasprintf("%s_defer", feedfile); - path_control= xasprintf("%s_control", feedfile); path_dump= xasprintf("%s_dump", feedfile); globpat_backlog= xasprintf("%s_backlog*", feedfile); @@ -3732,6 +3714,10 @@ int main(int argc, char **argv) { notice("starting"); + int val= 1; + r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed"); +// r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed"); + if (!become_daemon) control_stdio();