also we have to start a new backlog file every (some interval)
+ TODO for backlog file inputs
+ - all code to search for and open these files
+ - write proper algorithm comment
+
char messageid[1];
};
-typedef struct {
+typedef struct InputFile {
/* This is an instance of struct oop_readable */
struct oop_readable readable; /* first */
oop_readable_call *readable_callback;
oop_read *rd;
long inprogress; /* no. of articles read but not processed */
off_t offset;
+
+ Counts counts;
} InputFile;
typedef enum {
sm_NORMAL,
sm_FLUSHING,
sm_FLUSHFAIL,
- sm_SEPARATED1,
- sm_SEPARATED2, /* must follow SEPARATED2 - see feedfile_eof */
- sm_DROPPING1,
- sm_DROPPING2, /* must follow DROPPING1 - see feedfile_eof */
+ sm_SEPARATED,
+ sm_DROPPING,
} StateMachineState;
struct Conn {
static StateMachineState sms;
static FILE *defer;
-static InputFile *main_input_file, *old_input_file;
+static InputFile *main_input_file, *old_input_file, *backlog_input_file;
static int sm_period_counter;
art->sent= 1;
LIST_ADDTAIL(conn->sent, art);
- counts[art->checked].sent++;
+ art->ipf->counts[art->checked].sent++;
} else {
/* check it */
XMIT_LITERAL("\r\n");
LIST_ADDTAIL(conn->sent, art);
- counts[art->checked].offered++;
+ art->ipf->counts[art->checked].offered++;
}
}
}
static void article_done(Connection *conn, Article *art, int whichcount) {
*count++;
- counts.articles[art->checked][whichcount]++;
+ art->ipf->counts.articles[art->checked][whichcount]++;
if (whichcount == RC_accepted) update_nocheck(1);
else if (whichcount == RC_unwanted) update_nocheck(0);
assert(ipf->inprogress >= 0);
if (!ipf->inprogress)
- loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0);
+ loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, ipf);
free(art);
}
static void feedfile_eof(InputFile *ipf) {
assert(ipf != main_input_file); /* promised by tailing_try_read */
+
+ inputfile_tailing_stop(ipf);
+
+ if (ipf == backlog_input_file) {
+ assert(ipf->fd >= 0);
+ if (close(ipf->fd)) sysdie("could not close backlog file %s", ipf->path);
+ ipf->fd= -1;
+ return;
+ }
+
assert(ipf == old_input_file);
- if (sms==sm_SEPARATED1) SMS(SEPARATED2, 0, "eof on old feed file");
- else if (sms==sm_DROPPING1) SMS(DROPPING2, 0, "eof on dead feed file");
- else abort();
-
inputfile_tailing_stop(ipf);
+ assert(ipf->fd >= 0);
+ if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+ ipf->fd= -1;
+
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
if (main_input_file)
inputfile_tailing_start(main_input_file);
}
assert(!ipf->rd); /* must have had inputfile_tailing_stop */
assert(!ipf->inprogress); /* no dangling pointers pointing here */
- if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+ if (ipf->fd >= 0)
+ if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+
+ fixme maybe free ipf->path;
+
free(ipf);
}
return r;
}
if (!r) {
- if (ipf==main_input_file) { errno=EAGAIN; return -1; }
- assert(sms==sm_SEPARATED1 || sms==sm_DROPPING1);
+ if (ipf==main_input_file) {
+ errno=EAGAIN;
+ return -1;
+ } else if (ipf==old_input_file) {
+ assert(ipf->fd>=0);
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+ } else if (ipf==backlog_input_file) {
+ assert(ipf->fd>=0);
+ } else {
+ abort();
+ }
}
return r;
}
| | open F \
| V V
| ============= ============
- | SEPARATED1 DROPPING1
+ | SEPARATED/ DROPPING/
+ | old->fd>=0 old->fd>=0
| [Separated] [Dropping]
| main F idle main none
| old D tail old D tail
| | |
^ | EOF ON D | EOF ON D
| V V
- | ============= ============
- | SEPARATED2 DROPPING2
+ | =============== ===============
+ | SEPARATED/ DROPPING/
+ | old->fd==-1 old->fd==-1
| [Finishing] [Dropping]
| main F tail main none
- | old D idle old D idle
- | ============= ============
+ | old D closed old D closed
+ | =============== ===============
| | |
| | ALL D PROCESSED | ALL D PROCESSED
| V install defer as backlog V install defer as backlog
spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
} else {
/* F!=D => Separated */
- SMS(SEPARATED1, 0, "found both old and current feed files");
+ SMS(SEPARATED, 0, "found both old and current feed files");
startup_set_input_file(file_d);
}
} else { /*!file_d*/
inputfile_tailing_start(f);
}
-static void *statemc_check_oldinput_done(oop_source *lp,
- struct timeval now, void *u) {
+static void *statemc_check_input_done(oop_source *lp,
+ struct timeval now, void *ipf_v) {
+ InputFile *ipf= ipf_v;
struct stat stab;
- int done= (sms==sm_SEPARATED2 || sms==sm_DROPPING2)
- && old_input_file->inprogress;
- if (!done) return;
+ if (ipf->inprogress) return; /* new article in the meantime */
+ if (ipf->fd >= 0); return; /* not had EOF */
+
+ if (ipf == backlog_input_file) {
+ notice_processed(ipf,"backlog file",ipf->path);
+ if (unlink(ipf->path))
+ sysdie("could not unlink done backlog file %s", ipf->path);
+ close_input_file(ipf);
+ fixme trigger search for new backlog file;
+ }
+
+ assert(ipf == old_input_file);
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+ notice_processed(ipf,"feed file",0);
r= fstat(fileno(defer), &stab);
if (r) sysdie("check defer file %s", path_defer);
if (unlink(path_duct))
sysdie("could not unlink old duct file %s", path_duct);
- if (sms==sm_DROPPING2) {
+ if (sms==sm_DROPPING) {
notice("feed dropped and our work is complete"
" (but check for backlog files)");
exit(0);
warn("feed has been dropped by innd, finishing up");
old_input_file= main_input_file;
main_input_file= 0;
- SMS(DROPPING1, 0, "dropped by innd");
+ SMS(DROPPING, 0, "dropped by innd");
return OOP_CONTINUE;
case 0:
main_input_file= open_input_file(feedfile);
if (!main_input_file)
die("flush succeeded but feedfile %s does not exist!", feedfile);
- SMS(SEPARATED1, 0, "feed file missing");
+ SMS(SEPARATED, 0, "feed file missing");
return OOP_CONTINUE;
default: