/*
- * bugs
- *
-
- [740] <sit> info: processed feedfile(null) read=4(+0bl,+6err) offered=5(ch5,nc0) accepted=0(ch0+nc0) unwanted=0(0id+0bd+0nc) rejected=0(0id+0bd+0nc) deferred=0(0id+0bd+0nc) missing=2(0id+2bd+0nc) connretry=0(0id+0bd+0nc)
-
- also unwanted should be nonzero I think
-
- [740] <sit> warning: corrupted file: /home/ian/things/Innfeed/inn2-2.4.5/fee, offset 349: line partially blanked: in ` @050000002D130000006A0000000000000000@ <mi'..
-
-
-
- * some per-conn info thing for control
-
* todo
- * - actually do something with readable on control master
- * - option for realsockdir
- * - option for filepoll
- * - option for no inotify
+ * - inotify not working ?
+ * - some per-conn info thing for control
* - manpage: document control master stuff
- * - manpage: innconf is used for communicating with innd
- * - debug this:
+ *
+ * debugging rune:
* build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom
*/
static const char *sitename, *remote_host;
static const char *feedfile, *realsockdir="/tmp/innduct.control";
static int quiet_multiple=0;
-static int become_daemon=1;
+static int become_daemon=1, try_filemon=1;
static int try_stream=1;
static int port=119;
static const char *inndconffile;
static int max_queue_per_conn=200;
static int target_max_feedfile_size=100000;
static int period_seconds=60;
+static int filepoll_seconds=5;
static int connection_setup_timeout=200;
static int inndcomm_flush_timeout=100;
CCMD(period) { period(); }
CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
static const ControlCommand control_commands[]= {
{ "h", ccmd_help },
{ "p", ccmd_period },
{ "pretend flush", ccmd_setintarg, &simulate_flush },
- { "poke sm", ccmd_setint, &sm_period_counter, 1 },
- { "poke conn", ccmd_setint, &until_connect, 0 },
- { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 },
+
+#define POKES(cmd,func) \
+ { cmd "sm", func, &sm_period_counter, 1 }, \
+ { cmd "conn", func, &until_connect, 0 }, \
+ { cmd "blscan", func, &until_backlog_nextscan, 0 },
+POKES("prod ", ccmd_setint_period)
+POKES("next ", ccmd_setint)
+
{ "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
{ 0 }
};
uid_t self= geteuid();
if (!S_ISDIR(stab.st_mode) ||
stab.st_uid != self ||
- stab.st_mode & 0077) {
+ stab.st_mode & 0007) {
warn("no control socket, because real socket directory"
" is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
!!S_ISDIR(stab.st_mode),
(abort(),-1);
if (!artdata) art->missing= 1;
-fprintf(stderr,"INC %d %d?%d:%d\n",(int)art->state,
- !!artdata, (int)RC_sent, (int)RC_missing);
art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
if (conn->stream) {
XMIT_LITERAL("\r\n");
assert(art->state == art_Unchecked);
-fprintf(stderr,"INC %d %d\n",(int)art->state,(int)RC_sent);
art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
}
}
static void article_done(Conn *conn, Article *art, int whichcount) {
-fprintf(stderr,"INC %d %d\n",(int)art->state,whichcount);
if (!art->missing) art->ipf->counts[art->state][whichcount]++;
if (whichcount == RC_accepted) update_nocheck(1);
return OOP_CONTINUE;
}
+ int conn_busy=
+ conn->waiting.count ||
+ conn->priority.count ||
+ conn->sent.count ||
+ conn->xmitu;
+
if (conn->quitting) {
if (code!=205 && code!=503) {
connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
} else {
- notice("C%d idle connection closed", conn->fd);
- assert(!conn->waiting.count);
- assert(!conn->priority.count);
- assert(!conn->sent.count);
- assert(!conn->xmitu);
+ notice("C%d idle connection closed by us", conn->fd);
+ assert(!conn_busy);
LIST_REMOVE(conns,conn);
conn_dispose(conn);
}
conn->since_activity= 0;
Article *art;
-#define GET_ARTICLE(musthavesent) \
- art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
- if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
+#define GET_ARTICLE(musthavesent) do{ \
+ art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+ if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
+ }while(0)
-#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
- code_streaming= (streaming); \
- GET_ARTICLE(musthavesent); \
- article_done(conn, art, RC_##how); break;
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
+ code_streaming= (streaming); \
+ GET_ARTICLE(musthavesent); \
+ article_done(conn, art, RC_##how); \
+ goto dealtwith; \
+ }while(0)
-#define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE
+#define PEERBADMSG(m) do { \
+ connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
+ }while(0)
int code_streaming= 0;
switch (code) {
case 400: PEERBADMSG("peer stopped accepting articles");
- case 503: PEERBADMSG("peer timed us out");
default: PEERBADMSG("peer sent unexpected message");
+ case 503:
+ if (conn_busy) PEERBADMSG("peer timed us out");
+ notice("C%d idle connection closed by peer", conn->fd);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
+ return OOP_CONTINUE;
+
case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
case 335: /* IHAVE says send it */
GET_ARTICLE(-1);
assert(art->state == art_Unchecked);
-fprintf(stderr,"INC %d %d\n",(int)art->state,(int)RC_accepted);
art->ipf->counts[art->state][RC_accepted]++;
art->state= art_Wanted;
LIST_ADDTAIL(conn->priority, art);
break;
}
+dealtwith:
conn_maybe_write(conn);
check_assign_articles();
art->midlen= midlen;
art->ipf= ipf; ipf->inprogress++;
art->token= TextToToken(tokentextbuf);
- art->offset= ipf->offset;
+ art->offset= old_offset;
art->blanklen= recsz;
strcpy(art->messageid, space+1);
LIST_ADDTAIL(queue, art);
/*---------- filemon implemented with inotify ----------*/
-#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
#define HAVE_FILEMON
-#include <linux/inotify.h>
+#include <sys/inotify.h>
static int filemon_inotify_fd;
static int filemon_inotify_wdmax;
if (wd >= filemon_inotify_wdmax) {
int newmax= wd+2;
- filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+ filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
sizeof(*filemon_inotify_wd2ipf) * newmax);
memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
int wd= pf->wd;
debug("filemon inotify stopfile %p wd=%d", ipf, wd);
- int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ int r= inotify_rm_watch(filemon_inotify_fd, wd);
if (r) sysdie("inotify_rm_watch");
filemon_inotify_wd2ipf[wd]= 0;
}
die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
}
InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
- debug("filemon inotify readable read %p wd=%p", iev.wd, ipf);
+ debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
filemon_callback(ipf);
}
return OOP_CONTINUE;
static int filemon_method_init(void) {
filemon_inotify_fd= inotify_init();
if (filemon_inotify_fd<0) {
- syswarn("could not initialise inotify: inotify_init failed");
+ syswarn("filemon/inotify: inotify_init failed");
return 0;
}
- set nonblock;
- loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+ xsetnonblock(filemon_inotify_fd, 1);
+ loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
return 1;
struct Filemon_Perfile { int dummy; };
-static int filemon_method_init(void) { return 0; }
+static int filemon_method_init(void) {
+ warn("filemon/dummy: no filemon method compiled in");
+ return 0;
+}
static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
{0,"no-daemon", 0, &become_daemon, op_setint, 0 },
{0,"no-streaming", 0, &try_stream, op_setint, 0 },
+{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
{'C',"inndconf", "F", &inndconffile, op_string },
{'P',"port", "PORT", &port, op_integer },
+{0,"ctrl-sock-dir", 0, &realsockdir, op_string },
{0,"help", 0, 0, help },
{0,"max-connections", "N", &max_connections, op_integer },
{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
{0,"period-interval", "TIME", &period_seconds, op_seconds },
-{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
-{0,"stuck-flush-timeout","TIME", &inndcomm_flush_timeout, op_seconds },
+{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
+{0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
+{0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
control_init();
- if (!filemon_method_init()) {
- warn("no file monitoring available, polling");
- every(5,0,filepoll);
+ int filemon_ok= 0;
+ if (!try_filemon) {
+ notice("filemon: suppressed by command line option, polling");
+ } else {
+ filemon_ok= filemon_method_init();
+ if (!filemon_ok)
+ warn("filemon: no file monitoring available, polling");
}
+ if (!filemon_ok)
+ every(filepoll_seconds,0,filepoll);
every(period_seconds,1,period);