/*
- * todo
- * - abolish xk_Malloc
- * - rename sm_period_counter as it's just about flushes
- * - manpage: document control master stuff
- *
* debugging rune:
* build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
*/
+/*--
+flow control notes
+to ensure articles go away eventually
+separate queue for each input file
+ queue expiry
+ every period, check head of backlog queue for expiry with SMretrieve
+ if too old: discard, and check next article
+ also check every backlog article as we read it
+ flush expiry
+ after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
+ one-off: eat queued articles from flushing and write them to defer
+ one-off: connfail all connections which have any articles from flushing
+ newly read articles from flushing go straight to defer
+ this should take care of it and get us out of this state
+to avoid filling up ram needlessly
+ input control
+ limit number of queued articles for each ipf
+ pause/resume inputfile tailing
+--*/
+
/*
* Newsfeeds file entries should look like this:
* host.name.of.site[/exclude,exclude,...]\
#define CONNIOVS 128
typedef enum {
- xk_Malloc, xk_Const, xk_Artdata
+ xk_Const, xk_Artdata
} XmitKind;
struct XmitDetails {
XmitKind kind;
union {
- char *malloc_tofree;
ARTHANDLE *sm_art;
} info;
};
/* statemc_init initialises */
static StateMachineState sms;
-static int sm_period_counter;
+static int until_flush;
static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
static FILE *defer;
const ControlCommand *ccmd;
for (ccmd=control_commands; ccmd->cmd; ccmd++)
fprintf(cc->out, " %s\n", ccmd->cmd);
+ fputs("NB: permissible arguments are not shown above."
+ " Not all commands listed are safe. See innduct(8).\n", cc->out);
}
CCMD(flush) {
{ "p", ccmd_period },
#define POKES(cmd,func) \
- { cmd "sm", func, &sm_period_counter, 1 }, \
+ { cmd "flush", func, &until_flush, 1 }, \
{ cmd "conn", func, &until_connect, 0 }, \
{ cmd "blscan", func, &until_backlog_nextscan, 0 },
-POKES("prod ", ccmd_setint_period)
POKES("next ", ccmd_setint)
+POKES("prod ", ccmd_setint_period)
{ "pretend flush", ccmd_setintarg, &simulate_flush },
{ "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
static void xmit_free(XmitDetails *d) {
switch (d->kind) {
- case xk_Malloc: free(d->info.malloc_tofree); break;
case xk_Artdata: SMfreearticle(d->info.sm_art); break;
case xk_Const: break;
default: abort();
why,
(unsigned long)(main_input_file ? main_input_file->offset : 0),
(unsigned long)target_max_feedfile_size,
- sm_period_counter);
+ until_flush);
int r= link(feedfile, path_flushing);
if (r) sysfatal("link feedfile %s to flushing file %s",
}
static void statemc_period_poll(void) {
- if (!sm_period_counter) return;
- sm_period_counter--;
- assert(sm_period_counter>=0);
+ if (!until_flush) return;
+ until_flush--;
+ assert(until_flush>=0);
- if (sm_period_counter) return;
+ if (until_flush) return;
int ok= trigger_flush_ok();
assert(ok);
}
static void statemc_setstate(StateMachineState newsms, int periods,
const char *forlog, const char *why) {
sms= newsms;
- sm_period_counter= periods;
+ until_flush= periods;
const char *xtra= "";
switch (sms) {
" input_files main:%s flushing:%s backlog:%s"
" children connecting=%ld inndcomm=%ld"
,
- sms_names[sms], sm_period_counter,
+ sms_names[sms], until_flush,
conns.count, queue.count, until_connect,
dipf_main, dipf_flushing, dipf_backlog,
(long)connecting_child, (long)inndcomm_child
fprintf(f,"general");
DUMPV("%s", sms_names,[sms]);
- DUMPV("%d", ,sm_period_counter);
+ DUMPV("%d", ,until_flush);
DUMPV("%ld", (long),self_pid);
DUMPV("%p", , defer);
DUMPV("%d", , until_connect);
char *dinfo;
long diff;
switch (xd->kind) {
- case xk_Malloc:
- diff= xd->info.malloc_tofree - (char*)iv->iov_base;
- dinfo= xasprintf("M%5ld", diff);
- break;
- case xk_Const:
- dinfo= xasprintf("Const");
- break;
- case xk_Artdata:
- dinfo= xasprintf("A%p", xd->info.sm_art);
- break;
+ case xk_Const: dinfo= xasprintf("Const"); break;
+ case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
default:
abort();
}