From: Ian Jackson Date: Sun, 14 Feb 2010 20:44:00 +0000 (+0000) Subject: WIP before rethink reading-two-files-at-once X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=e9a4be9b440d9bb16c5904a673641ad531cc05a0 WIP before rethink reading-two-files-at-once --- diff --git a/backends/innduct.c b/backends/innduct.c index a2a4ec6..cf8fe6f 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -27,55 +27,96 @@ * created (link/mv) by admin * read by xmit * unlinked by xmit + + + OVERALL STATES: + + START + | + check D, F + | + <--------------------------------------'| + Nothing F, D both ENOENT | + F: ENOENT | + D: ENOENT | + duct: not reading anything | + | | + |`---------------------. | + | | duct times out waiting for F | + V innd creates F | duct exits | + | V | + Noduct GO TO Dropped | + F: innd writing | + D: ENOENT | + duct: not running or not reading anything | + | | + | | + ,-->--+ <---------------------------------'| + | | duct opens F F exists | + | | D ENOENT | + | V | + | Normal | + | F: innd writing, duct reading | + | D: ENOENT | + | | | + | | duct decides time to flush | + | | duct makes hardlink | + | | | + | V <------------------------'| + | Hardlinked F==D | + | F == D: innd writing, duct reading both exist | + ^ | | + | | duct unlinks F | + | V | + | Moved <----+------------<--'| + | F: ENOENT | F ENOENT | + | D: innd writing, duct reading | D exists | + | | | | + | | duct requests flush of feed | | + | | (others can too, harmlessly) | | + | V | | + | Flushing | | + | F: ENOENT | | + | D: innd flushing, duct reading | | + | | | | + | | inndcomm flush fails | | + | |`-------------------------->---------' | + | | | + | | inndcomm reports no such site | + | |`---------------------------------------------------- | -. 
+ | | | | + | | innd finishes writing D, creates F | | + | | inndcomm reports flush successful | | + | | duct opens F too | | + | V | | + | Flushed <----------------' | + | F: innd writing, duct reading F!=D / + | D: duct reading both exist / + | | / + | | duct gets to the end of D / + | V duct finishes processing D / + | | duct unlinks D / + | | | + `--<--' V + Dropping + F: ENOENT + D: duct reading + | + | duct finishes + | processing D + | duct unlinks D + | duct exits + V + Dropped + F: ENOENT + D: ENOENT + duct not running + + "duct reading" means innduct is reading the file but also + overwriting processed tokens. + * * - * OVERALL STATES: - * - * START - * ,-->--. | - * | | check D - * | | / | - * | | ENOENT/ |exists - * | V <----------+--<----------------------' | - * | None | | - * | F: unopened | | - * | D: ENOENT | | - * | | | | - * | repeatedly | | - * | open F ------->-' | - * | | ENOENT | - * | |OK | - * | | | - * | V | - * | Normal check F - * | F: innd writing, duct reading / editing /|\ - * | D: ENOENT / | \ - * | | / | \ - * | | duct decides time to flush same / | | - * | | duct makes hardlink as D / | | - * | | / | | - * | V <---------' | | - * | Hardlinked | | - * | F == D: innd writing, duct reading / editing | | - * ^ | | | - * | | duct unlinks F / | - * | V ENOENT / | - * | Moved <------------' | - * | F: ENOENT | - * | D: innd writing, duct reading / editing | - * | | | - * | | duct requests flush of feed | - * | | (others can too, harmlessly) | - * | V | - * | Separated <-----------------' - * | F: innd writing different to D - * | D: duct reading / editing - * | | - * | V duct completes processing of D - * | | duct unlinks D - * | | - * `--<--' - * */ #define PERIOD_SECONDS 60 @@ -669,7 +710,42 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, #include static int filemon_inotify_fd; -static int filemon_inotify_wd= -1; +static int filemon_inotify_wdmax; +static InputFile **filemon_inotify_wd2ipf; + +typedef struct 
Filemon_Perfile { + int wd; +} Filemon_Inotify_Perfile; + +static void filemon_startfile(InputFile *ipf) { /* Add an IN_MODIFY inotify watch on ipf->path, record wd -> ipf in filemon_inotify_wd2ipf, and allocate ipf->filemon to hold the wd. Dies via sysdie if the watch cannot be added. */ + int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); + if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); + + if (wd >= filemon_inotify_wdmax) { /* table too small for this wd: grow it and zero the new slots */ + int newmax= wd+2; + filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf, /* result must be stored back into the table pointer, otherwise the memset and the index below use the old block */ + sizeof(*filemon_inotify_wd2ipf) * newmax); + memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, + sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); + filemon_inotify_wdmax= newmax; + } + + assert(!filemon_inotify_wd2ipf[wd]); + filemon_inotify_wd2ipf[wd]= ipf; + + assert(!ipf->filemon); + ipf->filemon= xmalloc(sizeof(*ipf->filemon)); + ipf->filemon->wd= wd; +} + +static void filemon_stopfile(InputFile *ipf) { /* Remove the inotify watch recorded in ipf->filemon, clear its wd -> ipf table entry, and free ipf->filemon. Dies via sysdie if the watch cannot be removed. */ + int wd= ipf->filemon->wd; + int r= inotify_rm_watch(filemon_inotify_fd, wd); /* remove this file's own watch descriptor */ + if (r) sysdie("inotify_rm_watch"); + filemon_inotify_wd2ipf[wd]= 0; + free(ipf->filemon); + ipf->filemon= 0; +} static void *filemon_inotify_readable(oop_source *lp, int fd, oop_event e, void *u) { @@ -680,16 +756,12 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, if (errno==EAGAIN) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { - assert(wd == filemon_inotify_wd); + assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); } else { die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } + filemon_callback(filemon_inotify_wd2ipf[iev.wd]); } - /* Technically speaking the select might fire early in which case - * we'll read no events and then call filemon_callback - * unnecessarily. We don't care about that. 
- */ - filemon_callback(); return OOP_CONTINUE; } @@ -705,15 +777,6 @@ static int filemon_init(void) { return 1; } -static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { - if (filemon_inotify_wd >= 0) { - int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); - if (r) sysdie("inotify_rm_watch"); - } - filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY); - if (filemon_inotify_wd < 0) sysdie("inotify_add_watch"); -} - #endif /* HAVE_INOTIFY && !HAVE_FILEMON *// /*---------- filemon dummy implementation ----------*/ @@ -721,16 +784,106 @@ static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { #if !defined(HAVE_FILEMON) static int filemon_init(void) { return 0; } -static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { } +static void filemon_startfile(InputFile *ipf) { } +static void filemon_stopfile(InputFile *ipf) { } #endif /*========== interaction with innd ==========*/ -/* see state diagram at top of file */ +/* See official state diagram at top of file. We implement + * this as follows: + * + ================ + WAITING + [Nothing/Noduct] + poll for F + ================ + | + | TIMEOUT + |`--------------------------. + | | install defer as backlog + ,--------->| | exit + | | OPEN F SUCCEEDS V + | V ========= + | ======== (ESRCH) + | NORMAL [Dropped] + | [Normal] ========= + | read F + | ======== + | | + | | F IS SO BIG WE SHOULD FLUSH + ^ | hardlink F to D + | [Hardlinked] + | | unlink F + | | our handle onto F is now onto D + | [Moved] + | | + | |<---------------------------------------------------. + | | | + | | spawn inndcomm flush | + | V | + | ========== | + | FLUSHING | + | [Flushing] | + | read D | + | ========== | + | | | + | | INNDCOMM FLUSH FAILS ^ + | |`----------------------->--------. | + | | | | + | | NO SUCH SITE V | + ^ |`----------------. 
========= | + | | | FLUSHFAIL | + | | V [Moved] | + | | ========== read D | + | | DROPPING ========= | + | | [Dropping] | | + | | read D | TIME TO RETRY | + | | ========== `------------------' + | | FLUSH OK | + | | open F | AT EOF OF D AND ALL PROCESSED + | V | install defer as backlog + | ========== | unlink D + | FLUSHED | exit + | [Flushed] V + | read D, F ========== + | ========== (ESRCH) + | | [Dropped] + | | ========== + | | + | | AT EOF OF D AND ALL D PROCESSED + ^ V unlink D + | | close D + | | install defer as backlog + | | start new defer + | | + `----------' + + * + * duct state + * WAITING + * NORMAL + * FLUSHING + * FLUSHED + * FLUSHFAIL + * DROPPING + */ static char *path_ductlock, *path_duct, *path_ductdefer; -static int tailing_fd= -1, flushing_fd= -1; + +typedef struct { + /* This is an instance of struct oop_readable */ + struct oop_readable readable; + oop_readable_call *readable_callback; + + int fd; + const char *path; /* ptr copy of path_ or feedfile */ + struct Filemon_Perfile *filemon; + + oop_read *rd; + long inprogress; /* no. of articles read but not processed */ +} InputFile; static void statemc_init(void) { path_ductlock= xasprintf("%s_duct.lock", feedfile); @@ -769,7 +922,7 @@ static void statemc_poll(void) { return; } - + /*========== main program ==========*/