/*
* TODO
- * - close idle connections
- * - cope better with garbage in feed file
- * - cope better with NULs in feed file
+ * - make period size a tuneable
* - check all structs initialised
* - check all fd watches properly undone
* - check all init functions called
/*============================== PROGRAM ==============================*/
-#define _GNU_SOURCE
+#define _GNU_SOURCE 1
#include "config.h"
#include "storage.h"
#define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
+#define VA va_list al; va_start(al,fmt)
+#define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
+#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
+
/*----- doubly linked lists -----*/
#define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */
static void filemon_stop(InputFile *ipf);
static void filemon_callback(InputFile *ipf);
+static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
+static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
+
/*----- configuration options -----*/
static const char *sitename, *feedfile, *pathoutgoing;
static int inndcomm_flush_timeout=100;
static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods;
static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods;
-static int spontaneous_flush_periods;
+static int spontaneous_flush_periods, need_activity_periods;
static const char *inndconffile;
static double nocheck_thresh_pct= 95.0;
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream, quitting;
+ int since_activity; /* periods */
ArticleList waiting; /* not yet told peer */
ArticleList priority; /* peer says send it now */
ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
/*========== logging ==========*/
-#define VA va_list al; va_start(al,fmt)
-#define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
-#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
-
static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3);
static void logcore(int sysloglevel, const char *fmt, ...) {
VA;
return sanibuf;
}
+static int isewouldblock(int errnoval) {
+ return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
+}
+
/*========== making new connections ==========*/
static void conn_dispose(Conn *conn) {
connect_attempt_discard();
}
+static void check_idle_conns(void) {
+ Conn *conn;
+ for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
+ conn->since_activity++;
+ search_again:
+ for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) {
+ if (conn->since_activity <= need_activity_periods) continue;
+
+ /* We need to shut this down */
+ if (conn->quitting)
+ connfail(conn,"timed out waiting for response to QUIT");
+ else if (conn->sent.count)
+ connfail(conn,"timed out waiting for responses");
+ else if (conn->waiting.count || conn->priority.count)
+ connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
+ else if (conn->xmitu)
+ connfail(conn,"peer has been sending responses"
+ " before receiving our commands!");
+ else {
+ static const char quitcmd[]= "QUIT\r\n";
+ int todo= sizeof(quitcmd)-1;
+ const char *p= quitcmd;
+ for (;;) {
+ int r= write(conn->fd, p, todo);
+ if (r<0) {
+ if (isewouldblock(errno))
+ connfail(conn, "blocked writing QUIT to idle connection");
+ else
+ connfail(conn, "failed to write QUIT to idle connection: %s",
+ strerror(errno));
+ break;
+ }
+ assert(r<=todo);
+ todo -= r;
+ if (!todo) {
+ conn->quitting= 1;
+ conn->since_activity= 0;
+ debug("C%d is idle, quitting", conn->fd);
+ break;
+ }
+ }
+ }
+ goto search_again;
+ }
+}
/*========== overall control of article flow ==========*/
break;
Conn *walk, *use=0;
- int spare;
+ int spare, inqueue;
/* Find a connection to offer this article. We prefer a busy
* connection to an idle one, provided it's not full. We take the
* connections, the spare ones will go away eventually.
*/
for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
- int inqueue= walk->sent.count + walk->priority.count
- + walk->waiting.count;
+ if (walk->quitting) continue;
+ inqueue= walk->sent.count + walk->priority.count
+ + walk->waiting.count;
spare= walk->max_queue - inqueue;
assert(inqueue <= max_queue_per_conn);
assert(spare >= 0);
else if (spare>0) /*working*/ { use= walk; break; }
}
if (use) {
+ if (!inqueue) use->since_activity= 0; /* reset idle counter */
while (spare>0) {
Article *art= LIST_REMHEAD(queue);
LIST_ADDTAIL(use->waiting, art);
}
}
-static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
static void vconnfail(Conn *conn, const char *fmt, va_list al) {
int requeue[art_MaxState];
check_master_queue();
}
-static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
static void connfail(Conn *conn, const char *fmt, ...) {
va_list al;
va_start(al,fmt);
if (count > IOV_MAX) count= IOV_MAX;
ssize_t rs= writev(conn->fd, conn->xmit, count);
if (rs < 0) {
- if (errno == EAGAIN) return OOP_CONTINUE;
+ if (isewouldblock(errno)) return OOP_CONTINUE;
connfail(conn, "write failed: %s", strerror(errno));
return OOP_HALT;
}
if (code!=205 && code!=503) {
connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
} else {
- notice("C%d idle connection closed\n");
+ notice("C%d idle connection closed", conn->fd);
assert(!conn->waiting.count);
assert(!conn->priority.count);
assert(!conn->sent.count);
return OOP_CONTINUE;
}
+ conn->since_activity= 0;
Article *art;
#define GET_ARTICLE(musthavesent) \
for (;;) {
int r= read(filemon_inotify_fd, &iev, sizeof(iev));
if (r==-1) {
- if (errno==EAGAIN) break;
+ if (isewouldblock(errno)) break;
sysdie("read from inotify master");
} else if (r==sizeof(iev)) {
assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
fl.l_whence= SEEK_SET;
int r= fcntl(lockfd, F_SETLK, &fl);
if (r==-1) {
- if (errno==EACCES || errno==EAGAIN) {
+ if (errno==EACCES || isewouldblock(errno)) {
if (quiet_multiple) exit(0);
fatal("another duct holds the lockfile");
}
if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
statemc_period_poll();
check_master_queue();
+ check_idle_conns();
});