* created (link/mv) by admin
* read by xmit
* unlinked by xmit
+
+
+ OVERALL STATES:
+
+ START
+ |
+ check D, F
+ |
+ <--------------------------------------'|
+ Nothing F, D both ENOENT |
+ F: ENOENT |
+ D: ENOENT |
+ duct: not not reading anything |
+ | |
+ |`---------------------. |
+ | | duct times out waiting for F |
+ V innd creates F | duct exits |
+ | V |
+ Noduct GO TO Dropped |
+ F: innd writing |
+ D: ENOENT |
+ duct: not running or not reading anything |
+ | |
+ | |
+ ,-->--+ <---------------------------------'|
+ | | duct opens F F exists |
+ | | D ENOENT |
+ | V |
+ | Normal |
+ | F: innd writing, duct reading |
+ | D: ENOENT |
+ | | |
+ | | duct decides time to flush |
+ | | duct makes hardlink |
+ | | |
+ | V <------------------------'|
+ | Hardlinked F==D |
+ | F == D: innd writing, duct reading both exist |
+ ^ | |
+ | | duct unlinks F |
+ | V |
+ | Moved <----+------------<--'|
+ | F: ENOENT | F ENOENT |
+ | D: innd writing, duct reading | D exists |
+ | | | |
+ | | duct requests flush of feed | |
+ | | (others can too, harmlessly) | |
+ | V | |
+ | Flushing | |
+ | F: ENOENT | |
+ | D: innd flushing, duct reading | |
+ | | | |
+ | | inndcomm flush fails | |
+ | |`-------------------------->---------' |
+ | | |
+ | | inndcomm reports no such site |
+ | |`---------------------------------------------------- | -.
+ | | | |
+ | | innd finishes writing D, creates F | |
+ | | inndcomm reports flush successful | |
+ | | duct opens F too | |
+ | V | |
+ | Flushed <----------------' |
+ | F: innd writing, duct reading F!=D /
+ | D: duct reading both exist /
+ | | /
+ | | duct gets to the end of D /
+ | V duct finishes processing D /
+ | | duct unlinks D /
+ | | |
+ `--<--' V
+ Dropping
+ F: ENOENT
+ D: duct reading
+ |
+ | duct finishes
+ | processing D
+ | duct unlinks D
+ | duct exits
+ V
+ Dropped
+ F: ENOENT
+ D: ENOENT
+ duct not running
+
+ "duct reading" means innduct is reading the file but also
+ overwriting processed tokens.
+
*
*
- * OVERALL STATES:
- *
- * START
- * ,-->--. |
- * | | check D
- * | | / |
- * | | ENOENT/ |exists
- * | V <----------+--<----------------------' |
- * | None | |
- * | F: unopened | |
- * | D: ENOENT | |
- * | | | |
- * | repeatedly | |
- * | open F ------->-' |
- * | | ENOENT |
- * | |OK |
- * | | |
- * | V |
- * | Normal check F
- * | F: innd writing, duct reading / editing /|\
- * | D: ENOENT / | \
- * | | / | \
- * | | duct decides time to flush same / | |
- * | | duct makes hardlink as D / | |
- * | | / | |
- * | V <---------' | |
- * | Hardlinked | |
- * | F == D: innd writing, duct reading / editing | |
- * ^ | | |
- * | | duct unlinks F / |
- * | V ENOENT / |
- * | Moved <------------' |
- * | F: ENOENT |
- * | D: innd writing, duct reading / editing |
- * | | |
- * | | duct requests flush of feed |
- * | | (others can too, harmlessly) |
- * | V |
- * | Separated <-----------------'
- * | F: innd writing different to D
- * | D: duct reading / editing
- * | |
- * | V duct completes processing of D
- * | | duct unlinks D
- * | |
- * `--<--'
- *
*/
#define PERIOD_SECONDS 60
#include <linux/inotify.h>
static int filemon_inotify_fd;
-static int filemon_inotify_wd= -1;
+static int filemon_inotify_wdmax;
+static InputFile **filemon_inotify_wd2ipf;
+
+typedef struct Filemon_Perfile {
+ int wd;
+} Filemon_Inotify_Perfile;
+
+static void filemon_startfile(InputFile *ipf) {
+ int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
+ if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
+
+ if (wd >= filemon_inotify_wdmax) {
+ int newmax= wd+2;
+ filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+ sizeof(*filemon_inotify_wd2ipf) * newmax);
+ memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
+ sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
+ filemon_inotify_wdmax= newmax;
+ }
+
+ assert(!filemon_inotify_wd2ipf[wd]);
+ filemon_inotify_wd2ipf[wd]= ipf;
+
+ assert(!ipf->filemon);
+ ipf->filemon= xmalloc(sizeof(*ipf->filemon));
+ ipf->filemon->wd= wd;
+}
+
+static void filemon_stopfile(InputFile *ipf) {
+ int wd= ipf->filemon->wd;
+ int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+ if (r) sysdie("inotify_rm_watch");
+ filemon_inotify_wd2ipf[wd]= 0;
+ free(ipf->filemon);
+ ipf->filemon= 0;
+}
static void *filemon_inotify_readable(oop_source *lp, int fd,
oop_event e, void *u) {
if (errno==EAGAIN) break;
sysdie("read from inotify master");
} else if (r==sizeof(iev)) {
- assert(wd == filemon_inotify_wd);
+ assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
} else {
die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
}
+ filemon_callback(filemon_inotify_wd2ipf[iev.wd]);
}
- /* Technically speaking the select might fire early in which case
- * we'll read no events and then call filemon_callback
- * unnecessarily. We don't care about that.
- */
- filemon_callback();
return OOP_CONTINUE;
}
return 1;
}
-static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) {
- if (filemon_inotify_wd >= 0) {
- int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
- if (r) sysdie("inotify_rm_watch");
- }
- filemon_inotify_wd= inotify_add_watch(filemon_inotify_fd, path, IN_MODIFY);
- if (filemon_inotify_wd < 0) sysdie("inotify_add_watch");
-}
-
#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
/*---------- filemon dummy implementation ----------*/
#if !defined(HAVE_FILEMON)
static int filemon_init(void) { return 0; }
-static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path) { }
+static void filemon_startfile(InputFile *ipf) { }
+static void filemon_stopfile(InputFile *ipf) { }
#endif
/*========== interaction with innd ==========*/
-/* see state diagram at top of file */
+/* See official state diagram at top of file. We implement
+ * this as follows:
+ *
+ ================
+ WAITING
+ [Nothing/Noduct]
+ poll for F
+ ================
+ |
+ | TIMEOUT
+ |`--------------------------.
+ | | install defer as backlog
+ ,--------->| | exit
+ | | OPEN F SUCCEEDS V
+ | V =========
+ | ======== (ESRCH)
+ | NORMAL [Dropped]
+ | [Normal] =========
+ | read F
+ | ========
+ | |
+ | | F IS SO BIG WE SHOULD FLUSH
+ ^ | hardlink F to D
+ | [Hardlinked]
+ | | unlink F
+ | | our handle onto F is now onto D
+ | [Moved]
+ | |
+ | |<---------------------------------------------------.
+ | | |
+ | | spawn inndcomm flush |
+ | V |
+ | ========== |
+ | FLUSHING |
+ | [Flushing] |
+ | read D |
+ | ========== |
+ | | |
+ | | INNDCOMM FLUSH FAILS ^
+ | |`----------------------->--------. |
+ | | | |
+ | | NO SUCH SITE V |
+ ^ |`----------------. ========= |
+ | | | FLUSHFAIL |
+ | | V [Moved] |
+ | | ========== read D |
+ | | DROPPING ========= |
+ | | [Dropping] | |
+ | | read D | TIME TO RETRY |
+ | | ========== `------------------'
+ | | FLUSH OK |
+ | | open F | AT EOF OF D AND ALL PROCESSED
+ | V | install defer as backlog
+ | ========== | unlink D
+ | FLUSHED | exit
+ | [Flushed] V
+ | read D, F ==========
+ | ========== (ESRCH)
+ | | [Droppped]
+ | | ==========
+ | |
+ | | AT EOF OF D AND ALL D PROCESSED
+ ^ V unlink D
+ | | close D
+ | | install defer as backlog
+ | | start new defer
+ | |
+ `----------'
+
+ *
+ * duct state
+ * WAITING
+ * NORMAL
+ * FLUSHING
+ * FLUSHED
+ * FLUSHFAIL
+ * DROPPING
+ */
static char *path_ductlock, *path_duct, *path_ductdefer;
-static int tailing_fd= -1, flushing_fd= -1;
+
+typedef struct {
+ /* This is an instance of struct oop_readable */
+ struct oop_readable readable;
+ oop_readable_call *readable_callback;
+
+ int fd;
+ const char *path; /* ptr copy of path_<foo> or feedfile */
+ struct Filemon_Perfile *filemon;
+
+ oop_read *rd;
+ long inprogress; /* no. of articles read but not processed */
+} InputFile;
static void statemc_init(void) {
path_ductlock= xasprintf("%s_duct.lock", feedfile);
return;
}
-
+
/*========== main program ==========*/