typedef struct Conn Conn;
typedef struct Article Article;
typedef struct InputFile InputFile;
+typedef struct XmitDetails XmitDetails;
typedef enum StateMachineState StateMachineState;
DEFLIST(Conn);
static void conn_make_some_xmits(Conn *conn);
static void *conn_write_some_xmits(Conn *conn);
+static void xmit_free(XmitDetails *d);
+
static int filemon_init(void);
static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
static void filemon_callback(void);
xk_Malloc, xk_Const, xk_Artdata
} XmitKind;
-typedef struct {
+struct XmitDetails {
XmitKind kind;
union {
char *malloc_tofree;
ARTHANDLE *sm_art;
} info;
-} XmitDetails;
+};
/*----- core operational data structure types -----*/
__attribute__((__format__(printf,5,0)));
static void logv(int sysloglevel, const char *pfx, int errnoval,
int exitstatus, const char *fmt, va_list al) {
- char msgbuf[256];
+ char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */
vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
msgbuf[sizeof(msgbuf)-1]= 0;
/*========== utility functions etc. ==========*/
+static char *xvasprintf(const char *fmt, va_list al)
+ __attribute__((__format__(printf,1,0)));
+static char *xvasprintf(const char *fmt, va_list al) {
+ char *str;
+ int rc= vasprintf(&str,fmt,al);
+ if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt);
+ return str;
+}
+static char *xasprintf(const char *fmt, ...)
+ __attribute__((__format__(printf,1,2)));
+static char *xasprintf(const char *fmt, ...) {
+ va_list al;
+ va_start(al,fmt);
+ char *str= xvasprintf(fmt,al);
+ va_end(al);
+ return str;
+}
+
static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
static pid_t xfork(const char *what) {
fatal("connect: child gave unexpected exit status %d", es);
}
- setnonblock(conn->fd, 1);
-
/* Phew! */
+ setnonblock(conn->fd, 1);
+ conn->max_queue= conn->stream ? max_queue_per_conn : 1;
LIST_ADDHEAD(idle, conn);
notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
connect_attempt_discard();
/*========== overall control of article flow ==========*/
+static int conn_owned_articles(Conn *conn) {
+ return conn->sent.count + conn->queue.count;
+}
+
static void check_master_queue(void) {
if (!queue.count)
return;
conn_assign_one_article(&working, &last_assigned);
} else if (idle.head) {
conn_assign_one_article(&idle, &last_assigned);
- } else if (nconns < max_connections &&
- conn_total_queued_articles(conn) >= max_queue_per_conn &&
+ } else if (full.count < max_connections &&
!connecting_child && !until_connect) {
until_connect= reconnect_delay_periods;
connect_start();
*last_assigned= conn;
}
-static int conn_total_queued_articles(Conn *conn) {
- return conn->sent.count + conn->queue.count;
-}
-
static ConnList *conn_determine_right_list(Conn *conn) {
- int inqueue= conn_total_queued_articles(conn);
+ int inqueue= conn_owned_articles(conn);
assert(inqueue <= max_queue_per_conn);
if (inqueue == 0) return &idle;
if (inqueue == conn->max_queue) return &full;
Article *art;
while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art);
while ((art= LIST_REMHEAD(conn->sent))) {
- counts[art->state]++;
+ requeue[art->state]++;
if (art->state==art_Unsolicited) art->state= art_Unchecked;
- LIST_ADDTAIL(queue);
+ LIST_ADDTAIL(queue,art);
}
int i;
- XmitDetails *xd;
- for (i=0, dp=&conn->xmitd; i<conn->xmitu; i++, dp++)
- xmit_free(dp);
+ XmitDetails *d;
+ for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
+ xmit_free(d);
char *m= xvasprintf(fmt,al);
warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
free(m);
close(conn->fd);
+ fixme remove conn from the appropriate list;
free(conn);
until_connect= reconnect_delay_periods;