+/*
+ * TODO
+ * - close idle connections
+ * - -k kill mode ?
+ */
+
/*
* Newsfeeds file entries should look like this:
* host.name.of.site[/exclude,exclude,...]\
/*----- function predeclarations -----*/
-static void conn_check_work(Conn *conn);
+static void conn_maybe_write(Conn *conn);
static void conn_make_some_xmits(Conn *conn);
static void *conn_write_some_xmits(Conn *conn);
static void statemc_setstate(StateMachineState newsms, int periods,
const char *forlog, const char *why);
static void check_master_queue(void);
+static void queue_check_input_done(void);
static void postfork(const char *what);
static void postfork_inputfile(InputFile *ipf);
+static void open_defer(void);
+
/*----- configuration options -----*/
static char *sitename, *feedfile;
, counts[art_Unsolicited] x
typedef enum {
-#define RC_INDEX(x) RCI_##x,
+#define RC_INDEX(x) RC_##x,
RESULT_COUNTS(RC_INDEX, RC_INDEX)
RCI_max
} ResultCountIndex;
struct Conn {
ISNODE(Conn);
- int fd, max_queue, stream;
- ArticleList queue; /* not yet told peer, or CHECK said send it */
+ int fd, max_queue, stream, quitting;
+ ArticleList waiting; /* not yet told peer */
+ ArticleList priority; /* peer says send it now */
ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
struct iovec xmit[CONNIOVS];
XmitDetails xmitd[CONNIOVS];
/*========== making new connections ==========*/
+static void conn_dispose(Conn *conn) {
+ if (!conn) return;
+ perhaps_close(&conn->fd);
+ free(conn);
+ until_connect= reconnect_delay_periods;
+}
+
static int connecting_sockets[2]= {-1,-1};
static pid_t connecting_child;
return 0;
x:
- if (conn) {
- perhaps_close(&conn->fd);
- free(conn);
- }
+ conn_dispose(conn);
connect_attempt_discard();
}
+static int allow_connect_start(void) {
+ return conns.count < max_connections
+ && !connecting_child
+ && !until_connect;
+}
+
static void connect_start(void) {
assert(!connecting_sockets[0]);
assert(!connecting_sockets[1]);
Conn *walk, *use=0;
int spare;
+
+ /* Find a connection to offer this article. We prefer a busy
+ * connection to an idle one, provided it's not full. We take the
+ * first (oldest) and since that's stable, it will mean we fill up
+ * connections in order. That way if we have too many
+ * connections, the spare ones will go away eventually.
+ */
for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
- int inqueue= walk->sent.count + walk->queue.count;
+ int inqueue= walk->sent.count + walk->priority.count
+ + walk->waiting.count;
spare= walk->max_queue - inqueue;
assert(inqueue <= max_queue_per_conn);
assert(spare >= 0);
if (use) {
while (spare>0) {
Article *art= LIST_REMHEAD(queue);
- LIST_ADDTAIL(use->queue, art);
+ LIST_ADDTAIL(use->waiting, art);
spare--;
}
- conn_check_work(use);
- } else if (conns.count < max_connections &&
- !connecting_child && !until_connect) {
+ conn_maybe_write(use);
+ } else if (allow_connect_start()) {
until_connect= reconnect_delay_periods;
connect_start();
break;
}
static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
- conn_check_work(u);
+ conn_maybe_write(u);
return OOP_CONTINUE;
}
-static void conn_check_work(Conn *conn) {
+static void conn_maybe_write(Conn *conn) {
void *rp= 0;
for (;;) {
conn_make_some_xmits(conn);
int requeue[art_MaxState];
Article *art;
- while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art);
+ while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
+ while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art);
while ((art= LIST_REMHEAD(conn->sent))) {
requeue[art->state]++;
if (art->state==art_Unsolicited) art->state= art_Unchecked;
conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
- close(conn->fd);
LIST_REMOVE(conns,conn);
- free(conn);
-
- until_connect= reconnect_delay_periods;
+ conn_dispose(conn);
check_master_queue();
}
if (conn->xmitu+5 > CONNIOVS)
break;
- Article *art= LIST_REMHEAD(queue);
+ Article *art= LIST_REMHEAD(conn->priority);
+ if (!art) art= LIST_REMHEAD(conn->waiting);
if (!art) break;
if (art->state >= art_Wanted || (conn->stream && nocheck)) {
art->state == art_Unchecked ? art_Unsolicited :
art->state == art_Wanted ? art_Wanted :
(abort(),-1);
- art->ipf->counts[art->state][RCI_sent]++;
+ art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
} else {
XMIT_LITERAL("\r\n");
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state][RCI_sent]++;
+ art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
}
}
}
static void update_nocheck(int accepted) {
- accept_proportion *= accept_decay;
- accept_proportion += accepted;
+ accept_proportion *= nocheck_decay;
+ accept_proportion += accepted * (1.0 - nocheck_decay);
int new_nocheck= accept_proportion >= nocheck_thresh;
if (new_nocheck && !nocheck_reported) {
notice("entering nocheck mode for the first time");
static void article_done(Conn *conn, Article *art, int whichcount) {
art->ipf->counts[art->state][whichcount]++;
- if (whichcount == RCI_accepted) update_nocheck(1);
- else if (whichcount == RCI_unwanted) update_nocheck(0);
+ if (whichcount == RC_accepted) update_nocheck(1);
+ else if (whichcount == RC_unwanted) update_nocheck(0);
InputFile *ipf= art->ipf;
while (art->blanklen) {
}
assert(ev == OOP_RD_OK);
- char *sani= sanitise(data, recsz);
+ char *sani= sanitise(data);
char *ep;
unsigned long code= strtoul(data, &ep, 10);
}
if (conn->quitting) {
- if (code!=205) {
- connfail(conn, "peer gave failure response to QUIT: %s", sani);
- return OOP_CONTINUE;
+ if (code!=205 && code!=503) {
+ connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
+ } else {
+ notice("#%d idle connection closed\n");
+ assert(!conn->waiting.count);
+ assert(!conn->priority.count);
+ assert(!conn->sent.count);
+ assert(!conn->xmitu);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
}
- conn close ok;
- return;
+ return OOP_CONTINUE;
}
Article *art;
if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
- code_streaming= (streaming) \
+ code_streaming= (streaming); \
GET_ARTICLE(musthavesent); \
article_done(conn, art, RC_##how); break;
case 335: /* IHAVE says send it */
GET_ARTICLE(-1);
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state].accepted++;
+ art->ipf->counts[art->state][RC_accepted]++;
art->state= art_Wanted;
- LIST_ADDTAIL(conn->queue);
+ LIST_ADDTAIL(conn->priority, art);
break;
case 431: /* CHECK or TAKETHIS says try later */
}
- check_check_work(conn);
+ conn_maybe_write(conn);
+ check_master_queue();
return OOP_CONTINUE;
}