/*
* TODO
* - close idle connections
+ * - cope better with garbage in feed file
+ * - cope better with NULs in feed file
* - -k kill mode ?
*/
static void check_master_queue(void);
static void queue_check_input_done(void);
+static void statemc_start_flush(const char *why); /* Normal => Flushing */
+static void statemc_check_flushing_done(void);
+static void statemc_check_backlog_done(void);
+
static void postfork(const char *what);
static void postfork_inputfile(InputFile *ipf);
+static void open_defer(void);
+
+static void inputfile_tailing_start(InputFile *ipf);
+static void inputfile_tailing_stop(InputFile *ipf);
+
/*----- configuration options -----*/
static char *sitename, *feedfile;
static int quiet_multiple=0, become_daemon=1;
static int max_connections=10, max_queue_per_conn=200;
+static int target_max_feedfile_size=100000;
static int connection_setup_timeout=200, port=119, try_stream=1;
static int inndcomm_flush_timeout=100;
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream, quitting;
- ArticleList queue; /* not yet told peer, or CHECK said send it */
+ ArticleList waiting; /* not yet told peer */
+ ArticleList priority; /* peer says send it now */
ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
struct iovec xmit[CONNIOVS];
XmitDetails xmitd[CONNIOVS];
connect_attempt_discard();
}
+static int allow_connect_start(void) {
+ return conns.count < max_connections
+ && !connecting_child
+ && !until_connect;
+}
+
static void connect_start(void) {
assert(!connecting_sockets[0]);
assert(!connecting_sockets[1]);
Conn *walk, *use=0;
int spare;
+
+ /* Find a connection to offer this article. We prefer a busy
+ * connection to an idle one, provided it's not full. We take the
+ * first (oldest) and since that's stable, it will mean we fill up
+ * connections in order. That way if we have too many
+ * connections, the spare ones will go away eventually.
+ */
for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
- int inqueue= walk->sent.count + walk->queue.count;
+ int inqueue= walk->sent.count + walk->priority.count
+ + walk->waiting.count;
spare= walk->max_queue - inqueue;
assert(inqueue <= max_queue_per_conn);
assert(spare >= 0);
if (use) {
while (spare>0) {
Article *art= LIST_REMHEAD(queue);
- LIST_ADDTAIL(use->queue, art);
+ LIST_ADDTAIL(use->waiting, art);
spare--;
}
conn_maybe_write(use);
- } else if (conns.count < max_connections &&
- !connecting_child && !until_connect) {
+ } else if (allow_connect_start()) {
until_connect= reconnect_delay_periods;
connect_start();
break;
int requeue[art_MaxState];
Article *art;
- while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art);
+ while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art);
+ while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art);
while ((art= LIST_REMHEAD(conn->sent))) {
requeue[art->state]++;
if (art->state==art_Unsolicited) art->state= art_Unchecked;
if (conn->xmitu+5 > CONNIOVS)
break;
- Article *art= LIST_REMHEAD(queue);
+ Article *art= LIST_REMHEAD(conn->priority);
+ if (!art) art= LIST_REMHEAD(conn->waiting);
if (!art) break;
if (art->state >= art_Wanted || (conn->stream && nocheck)) {
XMIT_LITERAL("\r\n");
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state][RCI_sent]++;
+ art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
}
}
connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
} else {
notice("#%d idle connection closed\n");
- assert(!conn->queue.count);
+ assert(!conn->waiting.count);
+ assert(!conn->priority.count);
assert(!conn->sent.count);
assert(!conn->xmitu);
LIST_REMOVE(conns,conn);
assert(art->state == art_Unchecked);
art->ipf->counts[art->state][RC_accepted]++;
art->state= art_Wanted;
- LIST_ADDTAIL(conn->queue, art);
+ LIST_ADDTAIL(conn->priority, art);
break;
case 431: /* CHECK or TAKETHIS says try later */
}
- conn_maybe_write(sendon);
+ conn_maybe_write(conn);
check_master_queue();
return OOP_CONTINUE;
}
InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
memset(ipf,0,sizeof(*ipf));
- ipf->readable.on_readable= tailing_on_readable;
- ipf->readable.on_cancel= tailing_on_cancel;
- ipf->readable.try_read= tailing_try_read;
-
ipf->fd= fd;
strcpy(ipf->path, path);
/*---------- dealing with articles read in the input file ----------*/
-typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
- oop_rd_event ev, const char *errmsg,
- int errnoval,
- const char *data, size_t recsz,
- void *ipf_v) {
+void *feedfile_got_article(oop_source *lp, oop_read *rd,
+ oop_rd_event ev, const char *errmsg,
+ int errnoval, const char *data, size_t recsz,
+ void *ipf_v) {
InputFile *ipf= ipf_v;
Article *art;
char tokentextbuf[sizeof(TOKEN)*2+3];
ipf->offset += recsz + 1;
if (sms==sm_NORMAL && ipf==main_input_file &&
- ipf->offset >= flush_threshold)
+ ipf->offset >= target_max_feedfile_size)
statemc_start_flush("feed file size");
check_master_queue();
}
-static void statemc_start_flush(const char *why) { /* Normal => Flushing */
- assert(sms == sm_NORMAL);
-
- debug("starting flush (%s) (%lu >= %lu) (%d)",
- why,
- (unsigned long)ipf->offset, (unsigned long)flush_threshold,
- sm_period_counter);
-
- int r= link(feedfile, duct_path);
- if (r) sysdie("link feedfile %s to flushing file %s", feedfile,
- path_duct);
- /* => Hardlinked */
-
- xunlink(feedfile, "old feedfile link");
- /* => Moved */
-
- spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
-}
-
/*========== tailing input file ==========*/
static void filemon_start(InputFile *ipf) {
}
}
+static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+ assert(sms == sm_NORMAL);
+
+ debug("starting flush (%s) (%lu >= %lu) (%d)",
+ why,
+ (unsigned long)ipf->offset, (unsigned long)flush_threshold,
+ sm_period_counter);
+
+ int r= link(feedfile, duct_path);
+ if (r) sysdie("link feedfile %s to flushing file %s", feedfile,
+ path_duct);
+ /* => Hardlinked */
+
+ xunlink(feedfile, "old feedfile link");
+ /* => Moved */
+
+ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
+}
+
static void statemc_period_poll(void) {
if (!sm_period_counter) return;
sm_period_counter--;