From: Ian Jackson Date: Tue, 27 Apr 2010 15:13:36 +0000 (+0100) Subject: move some code about; rename check_master_queue to check_assign_articles X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=a93cc4b6928fd81d305f53c981fffb281020d766;hp=7b2769c6878f854cb3601fceb3b14383852f9cdf move some code about; rename check_master_queue to check_assign_articles --- diff --git a/backends/innduct.c b/backends/innduct.c index 92e8006..844583e 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -282,7 +282,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static void check_master_queue(void); +static void check_assign_articles(void); static void queue_check_input_done(void); static void statemc_check_flushing_done(void); @@ -675,7 +675,7 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } -/*========== making new connections ==========*/ +/*========== management of connections ==========*/ static void conn_dispose(Conn *conn) { if (!conn) return; @@ -688,6 +688,101 @@ static void conn_dispose(Conn *conn) { until_connect= reconnect_delay_periods; } +static void *conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + + Article *art; + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue,art); + } + + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + char *m= xvasprintf(fmt,al); + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + LIST_REMOVE(conns,conn); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} + +/*---------- making new connections ----------*/ + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -799,7 +894,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_ADDHEAD(conns, conn); connect_attempt_discard(); - check_master_queue(); + check_assign_articles(); return 0; x: @@ -807,18 +902,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); } -static void conn_exception(oop_source *lp, int fd, - oop_event ev, void *conn_v) { - Conn *conn= conn_v; - char ch; - assert(fd == conn->fd); - assert(ev == OOP_EXCEPTION); - r= read(conn->fd, &ch, 1); - if (r<0) connfail(conn,"read failed: %s",strerror(errno)); - connfail(conn,"exceptional condition on socket (peer sent urgent" - " data? read(,,1)=%d)",r); -} - static int allow_connect_start(void) { return conns.count < max_connections && !connecting_child @@ -915,55 +998,9 @@ static void connect_start(void) { connect_attempt_discard(); } -static void check_idle_conns(void) { - Conn *conn; - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - conn->since_activity++; - search_again: - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { - if (conn->since_activity <= need_activity_periods) continue; - - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } - goto search_again; - } -} - -/*========== overall control of article flow ==========*/ +/*---------- assigning articles to conns, and transmitting ----------*/ -static void check_master_queue(void) { +static void check_assign_articles(void) { for (;;) { if (!queue.count) break; @@ -1033,40 +1070,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - - Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - LIST_REMOVE(conns,conn); - conn_dispose(conn); - check_master_queue(); -} - -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1404,7 +1407,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } conn_maybe_write(conn); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1550,7 +1553,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -2484,7 +2487,7 @@ EVERY(period, -1,0, { poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); - check_master_queue(); + check_assign_articles(); check_idle_conns(); });