#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
-#define LIST_HEAD(l) ((typeof((l)->hd))(list_head((struct list*)&(l))))
+#define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l))))
#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
static void filemon_callback(void);
-static ConnList *conn_determine_right_list(Conn *conn);
-static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned);
static void statemc_setstate(StateMachineState newsms, int periods,
const char *forlog, const char *why);
static void check_master_queue(void);
/*========== overall control of article flow ==========*/
-static int conn_owned_articles(Conn *conn) {
- return conn->sent.count + conn->queue.count;
-}
-
static void check_master_queue(void) {
- if (!queue.count)
- return;
-
- Conn *last_assigned=0;
for (;;) {
- if (working.head) {
- conn_assign_one_article(&working, &last_assigned);
- } else if (idle.head) {
- conn_assign_one_article(&idle, &last_assigned);
- } else if (full.count < max_connections &&
- !connecting_child && !until_connect) {
+ if (!queue.count)
+ break;
+
+ Conn *walk, *use=0;
+ int spare;
+ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
+ int inqueue= walk->sent.count + walk->queue.count;
+ spare= walk->max_queue - inqueue;
+ assert(inqueue <= max_queue_per_conn);
+ assert(spare >= 0);
+ if (inqueue==0) /*idle*/ { if (!use) use= walk; }
+ else if (spare>0) /*working*/ { use= walk; break; }
+ }
+ if (use) {
+ while (spare>0) {
+ Article *art= LIST_REMHEAD(queue);
+ LIST_ADDTAIL(use->queue, art);
+ spare--;
+ }
+ conn_check_work(use);
+ } else if (conns.count < max_connections &&
+ !connecting_child && !until_connect) {
until_connect= reconnect_delay_periods;
connect_start();
+ break;
} else {
break;
}
}
- conn_check_work(last_assigned);
-}
-
-static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned) {
- Conn *conn= connlist->head;
-
- LIST_REMOVE(*connlist, conn);
- Article *art= LIST_REMHEAD(queue);
- LIST_ADDTAIL(conn->queue, art);
- LIST_ADDTAIL(*conn_determine_right_list(conn), conn);
-
- /* This slightly odd arrangement is so that we call conn_check_work
- * once after filling the queue for a new connection in
- * check_master_queue, rather than for each article. */
- if (conn != *last_assigned && *last_assigned)
- conn_check_work(*last_assigned);
- *last_assigned= conn;
-}
-
-static ConnList *conn_determine_right_list(Conn *conn) {
- int inqueue= conn_owned_articles(conn);
- assert(inqueue <= max_queue_per_conn);
- if (inqueue == 0) return &idle;
- if (inqueue == conn->max_queue) return &full;
- return &working;
}
static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
ipf->fd= -1;
}
-static void postfork_conns(Connection *conn) {
- while (conn) {
- close(conn->fd);
- conn= conn->next;
- }
-}
-
static void postfork_stdio(FILE *f) {
/* we have no stdio streams that are buffered long-term */
if (f) fclose(f);
postfork_inputfile(main_input_file);
postfork_inputfile(flushing_input_file);
- postfork_conns(idle.head);
- postfork_conns(working.head);
- postfork_conns(full.head);
+
+ Conn *conn;
+ for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
+ close(conn->fd);
+
postfork_stdio(defer);
}
EVERY(period, {PERIOD_SECONDS,0}, {
debug("PERIOD"
- " sms=%s[%d] queue=%d until_connect=%d"
+ " sms=%s[%d] conns=%d queue=%d until_connect=%d"
" input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing)
- " conns idle=%d working=%d full=%d"
" children connecting=%ld inndcomm_child"
,
- sms_names[sms], sm_period_counter, queue.count, until_connect,
+ sms_names[sms], sm_period_counter,
+ queue.count, conns.count, until_connect,
DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing),
- idle.count, working.count, full.count,
(long)connecting_child, (long)inndcomm_child
);