/*
+ * warning if no inotify
+ * inotify not working ?
+ * some per-conn info thing for control
+
* todo
+
* - actually do something with readable on control master
* - option for realsockdir
* - option for filepoll
#include <glob.h>
#include <time.h>
#include <math.h>
+#include <ctype.h>
#include <oop.h>
#include <oop-read.h>
#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
-#define LIST_INIT(l) (list_new(&(l).u.li))
+#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
#define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
static void statemc_start_flush(const char *why); /* Normal => Flushing */
static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
+static void article_done(Conn *conn, Article *art, int whichcount);
+
static void check_assign_articles(void);
static void queue_check_input_done(void);
art_Unchecked, /* not checked, not sent checking */
art_Wanted, /* checked, wanted sent body as requested */
art_Unsolicited, /* - sent body without check */
- art_MaxState
+ art_MaxState,
} ArtState;
#define RESULT_COUNTS(RCS,RCN) \
RCN(unwanted) \
RCN(rejected) \
RCN(deferred) \
+ RCN(missing) \
RCN(connretry)
-#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)"
+#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
#define RCI_TRIPLE_VALS_BASE(counts,x) \
counts[art_Unchecked] x \
+ counts[art_Wanted] x \
struct Article {
ISNODE(Article);
ArtState state;
- int midlen;
+ int midlen, missing;
InputFile *ipf;
TOKEN token;
off_t offset;
static void vconnfail(Conn *conn, const char *fmt, va_list al) {
int requeue[art_MaxState];
+ memset(requeue,0,sizeof(requeue));
Article *art;
while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
}
#define PREP_DECL_MSG_CMSG(msg) \
+ char msgbyte= 0; \
+ struct iovec msgiov; \
+ msgiov.iov_base= &msgbyte; \
+ msgiov.iov_len= 1; \
struct msghdr msg; \
memset(&msg,0,sizeof(msg)); \
- char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \
+ char msg##cbuf[CMSG_SPACE(sizeof(int))]; \
+ msg.msg_iov= &msgiov; \
+ msg.msg_iovlen= 1; \
msg.msg_control= msg##cbuf; \
msg.msg_controllen= sizeof(msg##cbuf);
Conn *conn= 0;
assert(fd == connecting_fdpass_sock);
+
PREP_DECL_MSG_CMSG(msg);
+
ssize_t rs= recvmsg(fd, &msg, 0);
if (rs<0) {
if (isewouldblock(errno)) return OOP_CONTINUE;
conn= xmalloc(sizeof(*conn));
memset(conn,0,sizeof(*conn));
+ LIST_INIT(conn->waiting);
+ LIST_INIT(conn->priority);
+ LIST_INIT(conn->sent);
struct cmsghdr *h= 0;
if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
}
/* Phew! */
- LIST_INIT(conn->waiting);
- LIST_INIT(conn->priority);
- LIST_INIT(conn->sent);
conn->max_queue= conn->stream ? max_queue_per_conn : 1;
loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
connect_attempt_discard();
check_assign_articles();
- return 0;
+ return OOP_CONTINUE;
x:
conn_dispose(conn);
assert(!connecting_child);
assert(!connecting_fdpass_sock);
- notice("starting connection attempt");
+ info("starting connection attempt");
int socks[2];
int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
alarm(connection_setup_timeout);
if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
- if (buf[0]) fatal("connect: failed: %s", sanitise(buf));
- else sysfatal("connect: connection attempt failed");
+ int l= strlen(buf);
+ int stripped=0;
+ while (l>0) {
+ unsigned char c= buf[l-1];
+ if (!isspace(c)) break;
+ if (c=='\n' || c=='\r') stripped=1;
+ --l;
+ }
+ if (!buf[0]) {
+ sysfatal("connect: connection attempt failed");
+ } else {
+ buf[l]= 0;
+ fatal("connect: %s: %s", stripped ? "rejected" : "failed",
+ sanitise(buf));
+ }
}
if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
sysfatal("connect: authentication failed");
if (try_stream) {
- if (fputs("MODE STREAM\r\n", cn_to) ||
+ if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
fflush(cn_to))
sysfatal("connect: could not send MODE STREAM");
buf[sizeof(buf)-1]= 0;
}
int l= strlen(buf);
assert(l>=1);
- if (buf[-1]!='\n')
+ if (buf[l-1]!='\n')
fatal("connect: response to MODE STREAM is too long: %.100s...",
sanitise(buf));
l--; if (l>0 && buf[l-1]=='\r') l--;
msg.msg_controllen= cmsg->cmsg_len;
r= sendmsg(socks[1], &msg, 0);
- if (r) sysdie("sendmsg failed for new connection");
+ if (r<0) sysdie("sendmsg failed for new connection");
+ if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
_exit(exitstatus);
}
if (!inqueue) use->since_activity= 0; /* reset idle counter */
while (spare>0) {
Article *art= LIST_REMHEAD(queue);
+ if (!art) break;
LIST_ADDTAIL(use->waiting, art);
spare--;
}
ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
+ art->state=
+ art->state == art_Unchecked ? art_Unsolicited :
+ art->state == art_Wanted ? art_Wanted :
+ (abort(),-1);
+
+ if (!artdata) art->missing= 1;
+ art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+
if (conn->stream) {
if (artdata) {
XMIT_LITERAL("TAKETHIS ");
xmit_noalloc(conn, art->messageid, art->midlen);
XMIT_LITERAL("\r\n");
xmit_artbody(conn, artdata);
+ } else {
+ article_done(conn, art, -1);
+ continue;
}
} else {
/* we got 235 from IHAVE */
}
}
- art->state=
- art->state == art_Unchecked ? art_Unsolicited :
- art->state == art_Wanted ? art_Wanted :
- (abort(),-1);
- art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
} else {
/* check it */
if (conn->stream)
- XMIT_LITERAL("IHAVE ");
- else
XMIT_LITERAL("CHECK ");
+ else
+ XMIT_LITERAL("IHAVE ");
xmit_noalloc(conn, art->messageid, art->midlen);
XMIT_LITERAL("\r\n");
}
static void article_done(Conn *conn, Article *art, int whichcount) {
- art->ipf->counts[art->state][whichcount]++;
+ if (!art->missing) art->ipf->counts[art->state][whichcount]++;
+
if (whichcount == RC_accepted) update_nocheck(1);
else if (whichcount == RC_unwanted) update_nocheck(0);
while (art->blanklen) {
static const char spaces[]=
+ " "
+ " "
+ " "
+ " "
+ " "
" "
" "
" "
return OOP_CONTINUE;
}
+ int conn_busy=
+ conn->waiting.count ||
+ conn->priority.count ||
+ conn->sent.count ||
+ conn->xmitu;
+
if (conn->quitting) {
if (code!=205 && code!=503) {
connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
} else {
- notice("C%d idle connection closed", conn->fd);
- assert(!conn->waiting.count);
- assert(!conn->priority.count);
- assert(!conn->sent.count);
- assert(!conn->xmitu);
+ notice("C%d idle connection closed by us", conn->fd);
+ assert(!conn_busy);
LIST_REMOVE(conns,conn);
conn_dispose(conn);
}
conn->since_activity= 0;
Article *art;
-#define GET_ARTICLE(musthavesent) \
- art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \
- if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
+#define GET_ARTICLE(musthavesent) do{ \
+ art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+ if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
+ }while(0)
-#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
- code_streaming= (streaming); \
- GET_ARTICLE(musthavesent); \
- article_done(conn, art, RC_##how); break;
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
+ code_streaming= (streaming); \
+ GET_ARTICLE(musthavesent); \
+ article_done(conn, art, RC_##how); \
+ goto dealtwith; \
+ }while(0)
-#define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE
+#define PEERBADMSG(m) do { \
+ connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
+ }while(0)
int code_streaming= 0;
switch (code) {
case 400: PEERBADMSG("peer stopped accepting articles");
- case 503: PEERBADMSG("peer timed us out");
default: PEERBADMSG("peer sent unexpected message");
+ case 503:
+ if (conn_busy) PEERBADMSG("peer timed us out");
+ notice("C%d idle connection closed by peer", conn->fd);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
+ return OOP_CONTINUE;
+
case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
break;
}
+dealtwith:
conn_maybe_write(conn);
check_assign_articles();
}
static InputFile *open_input_file(const char *path) {
- int fd= open(path, O_RDONLY);
+ int fd= open(path, O_RDWR);
if (fd<0) {
if (errno==ENOENT) return 0;
sysfatal("unable to open input file %s", path);
art->midlen= midlen;
art->ipf= ipf; ipf->inprogress++;
art->token= TextToToken(tokentextbuf);
- art->offset= ipf->offset;
+ art->offset= old_offset;
art->blanklen= recsz;
strcpy(art->messageid, space+1);
LIST_ADDTAIL(queue, art);
/*---------- filemon implemented with inotify ----------*/
-#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
#define HAVE_FILEMON
-#include <linux/inotify.h>
+#include <sys/inotify.h>
static int filemon_inotify_fd;
static int filemon_inotify_wdmax;
if (wd >= filemon_inotify_wdmax) {
int newmax= wd+2;
- filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+ filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
sizeof(*filemon_inotify_wd2ipf) * newmax);
memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
int wd= pf->wd;
debug("filemon inotify stopfile %p wd=%d", ipf, wd);
- int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ int r= inotify_rm_watch(filemon_inotify_fd, wd);
if (r) sysdie("inotify_rm_watch");
filemon_inotify_wd2ipf[wd]= 0;
}
die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
}
InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
- debug("filemon inotify readable read %p wd=%p", iev.wd, ipf);
+ debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
filemon_callback(ipf);
}
return OOP_CONTINUE;
static int filemon_method_init(void) {
filemon_inotify_fd= inotify_init();
if (filemon_inotify_fd<0) {
- syswarn("could not initialise inotify: inotify_init failed");
+ syswarn("filemon/inotify: inotify_init failed");
return 0;
}
- set nonblock;
- loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+ xsetnonblock(filemon_inotify_fd, 1);
+ loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
return 1;
struct Filemon_Perfile { int dummy; };
-static int filemon_method_init(void) { return 0; }
+static int filemon_method_init(void) {
+ warn("filemon/dummy: no filemon method compiled in");
+ return 0;
+}
static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
- info("processed %s%s read=%d(+%dbl,+%derr)"
- " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+ info("processed %s%s read=%d (+bl=%d,+err=%d)"
+ " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
,
what, spec,
const char *under= strchr(slash, '_');
const char *rest= under ? under+1 : leaf;
if (!strncmp(rest,"backlog",7)) rest += 7;
- notice_processed(ipf,"backlog:",rest);
+ notice_processed(ipf,"backlog ",rest);
close_input_file(ipf);
if (unlink(ipf->path)) {
assert(sms==sm_SEPARATED || sms==sm_DROPPING);
- notice_processed(ipf,"feedfile",0);
+ notice_processed(ipf,"feedfile","");
close_defer();
if (sms==sm_SEPARATED) {
notice("flush complete");
- SMS(NORMAL, 0, "flush complete");
+ SMS(NORMAL, spontaneous_flush_periods, "flush complete");
} else if (sms==sm_DROPPING) {
SMS(DROPPED, 0, "old flush complete");
search_backlog_file();
return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s",
ipf, path,
ipf->inprogress, (long)ipf->offset,
- ipf->fd, ipf->rd ? "+" : "");
+ ipf->fd, ipf->rd ? "" : ",!rd");
}
static void period(void) {
debug("PERIOD"
" sms=%s[%d] conns=%d queue=%d until_connect=%d"
- " input_files main:%s old:%s flushing:%s"
+ " input_files main:%s flushing:%s backlog:%s"
" children connecting=%ld inndcomm=%ld"
,
sms_names[sms], sm_period_counter,
{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
{0,"no-daemon", 0, &become_daemon, op_setint, 0 },
{0,"no-streaming", 0, &try_stream, op_setint, 0 },
-{0,"inndconf", "F", &inndconffile, op_string },
+{'C',"inndconf", "F", &inndconffile, op_string },
{'P',"port", "PORT", &port, op_integer },
{0,"help", 0, 0, help },
/* defaults */
+ int r= innconf_read(inndconffile);
+ if (!r) badusage("could not read inn.conf (more info on stderr)");
+
if (!remote_host) remote_host= sitename;
if (nocheck_thresh < 0 || nocheck_thresh > 100)
max_bad_data_ratio *= 0.01;
if (!feedfile) {
- innconf_read(inndconffile);
feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
} else if (!feedfile[0]) {
badusage("feed filename must be nonempty");
control_init();
if (!filemon_method_init()) {
- warn("no file monitoring available, polling");
+ warn("filemon: no file monitoring available, polling");
every(5,0,filepoll);
}
/* let's go */
- void *r= oop_sys_run(sysloop);
- assert(r == OOP_ERROR);
+ void *run= oop_sys_run(sysloop);
+ assert(run == OOP_ERROR);
sysdie("event loop failed");
}