From 81f2b44ebdc2ec86a9b5910fb2af3d9e9d5effa9 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 3 Mar 2010 18:17:55 +0000 Subject: [PATCH] Inputfile machinery. Now do state machine --- backends/innduct.c | 183 +++++++++++++++++++++++++++++++++------------ 1 file changed, 137 insertions(+), 46 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index cf8fe6f..51b4233 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -87,23 +87,24 @@ | | | | | | innd finishes writing D, creates F | | | | inndcomm reports flush successful | | - | | duct opens F too | | + | | | | | V | | - | Flushed <----------------' | - | F: innd writing, duct reading F!=D / - | D: duct reading both exist / - | | / - | | duct gets to the end of D / - | V duct finishes processing D / - | | duct unlinks D / - | | | - `--<--' V - Dropping - F: ENOENT - D: duct reading - | - | duct finishes - | processing D + | Separated <----------------' | + | F: innd writing F!=D / + | D: duct reading both exist / + | | / + | | duct gets to the end of D / + | | duct opens F too / + | V / + | Finishing / + | F: innd writing, duct reading | + | D: duct finishing V + | | Dropping + | | duct finishes processing D F: ENOENT + | V duct unlinks D D: duct reading + | | | + `--<--' | duct finishes + | processing D | duct unlinks D | duct exits V @@ -700,8 +701,62 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, /*---------- tailing input file ----------*/ +static void filemon_start(InputFile *ipf) { + assert(!ipf->filemon); + + ipf->filemon= xmalloc(sizeof(*ipf->filemon)); + memset(ipf->filemon, 0, sizeof(*ipf->filemon)); + filemon_method_startfile(ipf, ipf->filemon); +} + +static void filemon_stop(InputFile *ipf) { + if (!ipf->filemon) return; + filemon_method_stopfile(ipf, ipf->filemon); + free(ipf->filemon); + ipf->filemon= 0; +} + +static void filemon_callback(InputFile *ipf) { + ipf->readable_callback(ipf->readable_callback_user); +} + +static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, + void *user) { + InputFile *ipf= user; + return ipf->readable_callback(ipf->readable_callback_user); +} +static void on_cancel(struct oop_readable *rable) { + InputFile *ipf= (void*)rable; + if (ipf->filemon) filemon_stopfile(ipf); + loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); +} + +static int tailing_on_readable(struct oop_readable *rable, + oop_readable_call *cb, void *user) { + InputFile *ipf= (void*)rable; + + tailing_on_cancel(rable); + ipf->readable_callback= cb; + ipf->readable_callback_user= user; + filemon_startfile(ipf); + + loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + return 0; +} + +static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, + size_t length) { + InputFile *ipf= (void*)rable; + for (;;) { + ssize_t r= read(ipf->fd, buffer, length); + if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; } + if (r==-1 && errno==EINTR) continue; + return r; + } +} + /*---------- filemon implemented with inotify ----------*/ #if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON) @@ -717,7 +772,7 @@ typedef struct Filemon_Perfile { int wd; } Filemon_Inotify_Perfile; -static void filemon_startfile(InputFile *ipf) { +static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); @@ -733,18 +788,14 @@ static void filemon_startfile(InputFile *ipf) { assert(!filemon_inotify_wd2ipf[wd]); filemon_inotify_wd2ipf[wd]= ipf; - assert(!ipf->filemon); - ipf->filemon= xmalloc(sizeof(*ipf->filemon)); - ipf->filemon->wd= wd; + pf->wd= wd; } -static void filemon_stopfile(InputFile *ipf) { - int wd= ipf->filemon->wd; +static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { + int wd= pf->wd; int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; - free(ipf->filemon); - ipf->filemon= 0; } static void *filemon_inotify_readable(oop_source *lp, int fd, @@ -760,12 +811,13 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, } else { die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } - filemon_callback(filemon_inotify_wd2ipf[iev.wd]); + InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + filemon_callback(ipf); } return OOP_CONTINUE; } -static int filemon_init(void) { +static int filemon_method_init(void) { filemon_inotify_fd= inotify_init(); if (filemon_inotify_fd<0) { syswarn("could not initialise inotify: inotify_init failed"); @@ -783,11 +835,47 @@ static int filemon_init(void) { #if !defined(HAVE_FILEMON) -static int filemon_init(void) { return 0; } -static void filemon_startfile(InputFile *ipf) { } -static void filemon_stopfile(InputFile *ipf) { } +typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile; -#endif +static int filemon_method_init(void) { return 0; } +static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { } +static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { } + +#endif /* !HAVE_FILEMON */ + +/*---------- interface to start and stop an input file ----------*/ + +static const oop_rd_style feedfile_rdstyle= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_EOF, +}; + +static void inputfile_tailing_start(InputFile *ipf) { + assert(!ipf->fd); + ipf->readable->on_readable= tailing_on_readable; + ipf->readable->on_cancel= tailing_on_cancel; + ipf->readable->try_read= tailing_try_read; + ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ + ipf->readable->delete_kill= 0; + + ipf->readable_callback= 0; + ipf->readable_callback_user= 0; + + ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); + assert(ipf->fd); + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_problem,ipf); + if (r) sysdie("unable start reading feedfile %s",ipf->path); +} + +static void inputfile_tailing_stop(InputFile *ipf) { + assert(ipf->fd); + oop_rd_delete(ipf->rd); + ipf->rd= 0; + assert(!ipf->filemon); /* we shouldn't be monitoring it now */ +} /*========== interaction with innd ==========*/ @@ -844,38 +932,41 @@ static void filemon_stopfile(InputFile *ipf) { } | | FLUSH OK | | | open F | AT EOF OF D AND ALL PROCESSED | V | install defer as backlog - | ========== | unlink D - | FLUSHED | exit - | [Flushed] V - | read D, F ========== - | ========== (ESRCH) + | =========== | unlink D + | SEPARATED | exit + | [Separated] V + | read D ========== + | =========== (ESRCH) | | [Droppped] | | ========== + | V + | | AT EOF OF D + ^ | + | =========== + | FINISHING + | [Finishing] + | read F + | write D + | =========== | | - | | AT EOF OF D AND ALL D PROCESSED - ^ V unlink D - | | close D + | | ALL D PROCESSED | | install defer as backlog | | start new defer + ^ V unlink D + | | close D | | `----------' * - * duct state - * WAITING - * NORMAL - * FLUSHING - * FLUSHED - * FLUSHFAIL - * DROPPING */ static char *path_ductlock, *path_duct, *path_ductdefer; typedef struct { /* This is an instance of struct oop_readable */ - struct oop_readable readable; + struct oop_readable readable; /* first */ oop_readable_call *readable_callback; + void *readable_callback_user; int fd; const char *path; /* ptr copy of path_ or feedfile */ -- 2.30.2