/*
* todo
- * - reset signals TERM and INT (and HUP) in children
- *
+ * - rename sm_period_counter as it's just about flushes
* - manpage: document control master stuff
- * - admin-initiated flush
*
* debugging rune:
* build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
#define VA va_list al; va_start(al,fmt)
#define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
+#define NORET __attribute__((__noreturn__))
#define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
#define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
static void statemc_start_flush(const char *why); /* Normal => Flushing */
static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
+static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
static void article_done(Conn *conn, Article *art, int whichcount);
static void close_defer(void);
static void search_backlog_file(void);
static void preterminate(void);
-static void raise_default(int signo);
+static void raise_default(int signo) NORET;
static char *debug_report_ipf(InputFile *ipf);
static void inputfile_reading_start(InputFile *ipf);
#define CONNIOVS 128
typedef enum {
- xk_Malloc, xk_Const, xk_Artdata
+ xk_Const, xk_Artdata
} XmitKind;
struct XmitDetails {
XmitKind kind;
union {
- char *malloc_tofree;
ARTHANDLE *sm_art;
} info;
};
return now;
}
-static void xsigaction(int s, const struct sigaction *sa) {
- int r= sigaction(s,sa,0);
- if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
+static void xsigaction(int signo, const struct sigaction *sa) {
+ int r= sigaction(signo,sa,0);
+ if (r) sysdie("sigaction failed for \"%s\"", strsignal(signo));
+}
+
+static void xsigsetdefault(int signo) {
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler= SIG_DFL;
+ xsigaction(signo,&sa);
}
static void xgettimeofday(struct timeval *tv_r) {
fprintf(cc->out, " %s\n", ccmd->cmd);
}
-CCMD(period) { period(); }
-CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
-CCMD(setint) { *(int*)c->xdata= c->xval; }
-CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
-CCMD(dump);
+CCMD(flush) {
+ int ok= trigger_flush_ok();
+ if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
+}
CCMD(stop) {
preterminate();
abort();
}
+CCMD(dump);
+
+/* messing with our head: */
+CCMD(period) { period(); }
+CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
+CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
+
static const ControlCommand control_commands[]= {
{ "h", ccmd_help },
- { "p", ccmd_period },
+ { "flush", ccmd_flush },
{ "stop", ccmd_stop },
{ "dump q", ccmd_dump, 0,0 },
{ "dump a", ccmd_dump, 0,1 },
+ { "p", ccmd_period },
+
#define POKES(cmd,func) \
{ cmd "sm", func, &sm_period_counter, 1 }, \
{ cmd "conn", func, &until_connect, 0 }, \
static void xmit_free(XmitDetails *d) {
switch (d->kind) {
- case xk_Malloc: free(d->info.malloc_tofree); break;
case xk_Artdata: SMfreearticle(d->info.sm_art); break;
case xk_Const: break;
default: abort();
ipf->readcount_ok++;
art= xmalloc(sizeof(*art) - 1 + midlen + 1);
- memset(art,0,sizeof(art));
+ memset(art,0,sizeof(*art));
art->state= art_Unchecked;
art->midlen= midlen;
art->ipf= ipf; ipf->inprogress++;
| flsh->rd!=0 | | flsh->rd!=0
| [Separated] | | [Dropping]
| main F idle | | main none
- | old D tail | | old D tail
+ | flsh D tail | | flsh D tail
| ============= | | ============
| | | | install |
^ | EOF ON D | | defer | EOF ON D
| flsh->rd==0 | V flsh->rd==0
| [Finishing] | | [Dropping]
| main F tail | `. main none
- | old D closed | `. old D closed
+ | flsh D closed | `. flsh D closed
| =============== V `. ===============
| | `. |
| | ALL D PROCESSED `. | ALL D PROCESSED
DROPPED
[Dropped]
main none
- old none
+ flsh none
some backlog
==============
|
if (file_d) {
debug("startup: F!=D => Separated");
startup_set_input_file(file_d);
+ flushing_input_file= main_input_file;
+ main_input_file= open_input_file(feedfile);
+ if (!main_input_file) die("feedfile vanished during startup");
SMS(SEPARATED, 0, "found both old and current feed files");
} else {
debug("startup: F exists, D ENOENT => Normal");
spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
}
-static void statemc_period_poll(void) {
- if (!sm_period_counter) return;
- sm_period_counter--;
- assert(sm_period_counter>=0);
-
- if (sm_period_counter) return;
+static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
switch (sms) {
case sm_NORMAL:
statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
- break;
+ return 1;
case sm_FLUSHFAILED:
spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
- break;
+ return 1;
default:
- abort();
+ return 0;
}
}
+static void statemc_period_poll(void) {
+ if (!sm_period_counter) return;
+ sm_period_counter--;
+ assert(sm_period_counter>=0);
+
+ if (sm_period_counter) return;
+ int ok= trigger_flush_ok();
+ assert(ok);
+}
+
static int inputfile_is_done(InputFile *ipf) {
if (!ipf) return 0;
if (ipf->inprogress) return 0; /* new article in the meantime */
static void preterminate(void) {
if (in_child) return;
notice_processed(main_input_file,0,"feedfile","");
- notice_processed(flushing_input_file,0,"flushing file","");
+ notice_processed(flushing_input_file,0,"flushing","");
if (backlog_input_file)
notice_processed(backlog_input_file,0, "backlog file ",
backlog_input_file->path);
static sig_atomic_t terminate_sig_flag;
static void raise_default(int signo) {
- struct sigaction sa;
- memset(&sa,0,sizeof(sa));
- sa.sa_handler= SIG_DFL;
- xsigaction(signo,&sa);
+ xsigsetdefault(signo);
raise(signo);
+ abort();
}
static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
preterminate();
notice("terminating (%s)", strsignal(terminate_sig_flag));
raise_default(terminate_sig_flag);
- abort();
}
return OOP_CONTINUE;
}
static void sigarrived_handler(int signum) {
static char x;
switch (signum) {
- case SIGINT: case SIGTERM:
+ case SIGTERM:
+ case SIGINT:
if (!terminate_sig_flag) terminate_sig_flag= signum;
break;
default:
static void postfork(void) {
in_child= 1;
- if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
- sysdie("(in child) failed to reset SIGPIPE");
+ xsigsetdefault(SIGTERM);
+ xsigsetdefault(SIGINT);
+ xsigsetdefault(SIGPIPE);
+ if (terminate_sig_flag) raise(terminate_sig_flag);
postfork_inputfile(main_input_file);
postfork_inputfile(flushing_input_file);
char *dinfo;
long diff;
switch (xd->kind) {
- case xk_Malloc:
- diff= xd->info.malloc_tofree - (char*)iv->iov_base;
- dinfo= xasprintf("M%5ld", diff);
- break;
- case xk_Const:
- dinfo= xasprintf("Const");
- break;
- case xk_Artdata:
- dinfo= xasprintf("A%p", xd->info.sm_art);
- break;
+ case xk_Const: dinfo= xasprintf("Const"); break;
+ case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
default:
abort();
}