/*
* todo
- * - rename defraise to raise_default
- * - xmalloc + memset -> xcalloc
- * - macro for conn iteration
- * - skipping_long offset calculation is wrong
- * - reset signals TERM and INT (and HUP) in children
- *
+ * - abolish xk_Malloc
+ * - rename sm_period_counter as it's just about flushes
* - manpage: document control master stuff
- * - admin-initiated flush
*
* debugging rune:
- * build-lfs/backends/innduct --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
+ * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
*/
/*
#define VA va_list al; va_start(al,fmt)
#define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
+#define NORET __attribute__((__noreturn__))
+
+#define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
+#define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
#define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
+#define FOR_CONN(conn) \
+ for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
+
/*----- doubly linked lists -----*/
#define ISNODE(T) struct node list_node
static void statemc_start_flush(const char *why); /* Normal => Flushing */
static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
+static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
static void article_done(Conn *conn, Article *art, int whichcount);
static void close_defer(void);
static void search_backlog_file(void);
static void preterminate(void);
-static void defraise(int signo);
+static void raise_default(int signo) NORET;
static char *debug_report_ipf(InputFile *ipf);
static void inputfile_reading_start(InputFile *ipf);
return status;
}
+static void *zxmalloc(size_t sz) {
+ void *p= xmalloc(sz);
+ memset(p,0,sz);
+ return p;
+}
+
static void xunlink(const char *path, const char *what) {
int r= unlink(path);
if (r) sysdie("can't unlink %s %s", path, what);
return now;
}
-static void xsigaction(int s, const struct sigaction *sa) {
- int r= sigaction(s,sa,0);
- if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
+static void xsigaction(int signo, const struct sigaction *sa) {
+ int r= sigaction(signo,sa,0);
+ if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
+}
+
+static void xsigsetdefault(int signo) {
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler= SIG_DFL;
+ xsigaction(signo,&sa);
}
static void xgettimeofday(struct timeval *tv_r) {
fprintf(cc->out, " %s\n", ccmd->cmd);
}
-CCMD(period) { period(); }
-CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
-CCMD(setint) { *(int*)c->xdata= c->xval; }
-CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
-CCMD(dump);
+CCMD(flush) {
+ int ok= trigger_flush_ok();
+ if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
+}
CCMD(stop) {
preterminate();
notice("terminating (CTRL%d)",cc->fd);
- defraise(SIGTERM);
+ raise_default(SIGTERM);
abort();
}
+CCMD(dump);
+
+/* messing with our head: */
+CCMD(period) { period(); }
+CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
+CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
+
static const ControlCommand control_commands[]= {
{ "h", ccmd_help },
- { "p", ccmd_period },
+ { "flush", ccmd_flush },
{ "stop", ccmd_stop },
{ "dump q", ccmd_dump, 0,0 },
{ "dump a", ccmd_dump, 0,1 },
+ { "p", ccmd_period },
+
#define POKES(cmd,func) \
{ cmd "sm", func, &sm_period_counter, 1 }, \
{ cmd "conn", func, &until_connect, 0 }, \
}
static void control_stdio(void) {
- ControlConn *cc= xmalloc(sizeof(*cc));
- memset(cc,0,sizeof(*cc));
+ NEW_DECL(ControlConn *,cc);
cc->destroy= control_stdio_destroy;
cc->fd= 0;
static void *control_master_readable(oop_source *lp, int master,
oop_event ev, void *u) {
- ControlConn *cc= xmalloc(sizeof(*cc));
- memset(cc,0,sizeof(*cc));
+ NEW_DECL(ControlConn *,cc);
cc->destroy= control_accepted_destroy;
cc->salen= sizeof(cc->sa);
static void check_idle_conns(void) {
Conn *conn;
- for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
+ FOR_CONN(conn)
conn->since_activity++;
search_again:
- for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) {
+ FOR_CONN(conn) {
if (conn->since_activity <= need_activity_periods) continue;
/* We need to shut this down */
goto x;
}
- conn= xmalloc(sizeof(*conn));
- memset(conn,0,sizeof(*conn));
+ NEW(conn);
LIST_INIT(conn->waiting);
LIST_INIT(conn->priority);
LIST_INIT(conn->sent);
* connections in order. That way if we have too many
* connections, the spare ones will go away eventually.
*/
- for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) {
+ FOR_CONN(walk) {
if (walk->quitting) continue;
inqueue= walk->sent.count + walk->priority.count
+ walk->waiting.count;
if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
off_t old_offset= ipf->offset;
- ipf->offset += recsz + 1;
+ ipf->offset += recsz + !!(ev == OOP_RD_OK);
#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
ipf->readcount_ok++;
art= xmalloc(sizeof(*art) - 1 + midlen + 1);
- memset(art,0,sizeof(art));
+ memset(art,0,sizeof(*art));
art->state= art_Unchecked;
art->midlen= midlen;
art->ipf= ipf; ipf->inprogress++;
static void filemon_start(InputFile *ipf) {
assert(!ipf->filemon);
- ipf->filemon= xmalloc(sizeof(*ipf->filemon));
- memset(ipf->filemon, 0, sizeof(*ipf->filemon));
+ NEW(ipf->filemon);
filemon_method_startfile(ipf, ipf->filemon);
}
| flsh->rd!=0 | | flsh->rd!=0
| [Separated] | | [Dropping]
| main F idle | | main none
- | old D tail | | old D tail
+ | flsh D tail | | flsh D tail
| ============= | | ============
| | | | install |
^ | EOF ON D | | defer | EOF ON D
| flsh->rd==0 | V flsh->rd==0
| [Finishing] | | [Dropping]
| main F tail | `. main none
- | old D closed | `. old D closed
+ | flsh D closed | `. flsh D closed
| =============== V `. ===============
| | `. |
| | ALL D PROCESSED `. | ALL D PROCESSED
DROPPED
[Dropped]
main none
- old none
+ flsh none
some backlog
==============
|
if (file_d) {
debug("startup: F!=D => Separated");
startup_set_input_file(file_d);
+ flushing_input_file= main_input_file;
+ main_input_file= open_input_file(feedfile);
+ if (!main_input_file) die("feedfile vanished during startup");
SMS(SEPARATED, 0, "found both old and current feed files");
} else {
debug("startup: F exists, D ENOENT => Normal");
spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
}
-static void statemc_period_poll(void) {
- if (!sm_period_counter) return;
- sm_period_counter--;
- assert(sm_period_counter>=0);
-
- if (sm_period_counter) return;
+static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
switch (sms) {
case sm_NORMAL:
statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
- break;
+ return 1;
case sm_FLUSHFAILED:
spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
- break;
+ return 1;
default:
- abort();
+ return 0;
}
}
+static void statemc_period_poll(void) {
+ if (!sm_period_counter) return;
+ sm_period_counter--;
+ assert(sm_period_counter>=0);
+
+ if (sm_period_counter) return;
+ int ok= trigger_flush_ok();
+ assert(ok);
+}
+
static int inputfile_is_done(InputFile *ipf) {
if (!ipf) return 0;
if (ipf->inprogress) return 0; /* new article in the meantime */
static void preterminate(void) {
if (in_child) return;
notice_processed(main_input_file,0,"feedfile","");
- notice_processed(flushing_input_file,0,"flushing file","");
+ notice_processed(flushing_input_file,0,"flushing","");
if (backlog_input_file)
notice_processed(backlog_input_file,0, "backlog file ",
backlog_input_file->path);
static int signal_self_pipe[2];
static sig_atomic_t terminate_sig_flag;
-static void defraise(int signo) {
- struct sigaction sa;
- memset(&sa,0,sizeof(sa));
- sa.sa_handler= SIG_DFL;
- xsigaction(signo,&sa);
+static void raise_default(int signo) {
+ xsigsetdefault(signo);
raise(signo);
+ abort();
}
static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
if (terminate_sig_flag) {
preterminate();
notice("terminating (%s)", strsignal(terminate_sig_flag));
- defraise(terminate_sig_flag);
- abort();
+ raise_default(terminate_sig_flag);
}
return OOP_CONTINUE;
}
static void sigarrived_handler(int signum) {
static char x;
switch (signum) {
- case SIGINT: case SIGTERM:
+ case SIGTERM:
+ case SIGINT:
if (!terminate_sig_flag) terminate_sig_flag= signum;
break;
default:
static void postfork(void) {
in_child= 1;
- if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
- sysdie("(in child) failed to reset SIGPIPE");
+ xsigsetdefault(SIGTERM);
+ xsigsetdefault(SIGINT);
+ xsigsetdefault(SIGPIPE);
+ if (terminate_sig_flag) raise(terminate_sig_flag);
postfork_inputfile(main_input_file);
postfork_inputfile(flushing_input_file);
Conn *conn;
- for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn))
+ FOR_CONN(conn)
conn_closefd(conn,"(in child) ");
postfork_stdio(defer, "defer file ", path_defer);
}
static void every(int interval, int fixed_rate, void (*f)(void)) {
- Every *e= xmalloc(sizeof(*e));
+ NEW_DECL(Every *,e);
e->interval.tv_sec= interval;
e->interval.tv_usec= 0;
e->fixed_rate= fixed_rate;
fprintf(f,"conns count=%d\n", conns.count);
Conn *conn;
- for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) {
+ FOR_CONN(conn) {
fprintf(f,"C%d",conn->fd);
DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);