From dd9af5cb6877b3c6c6b33d99800ec35745272729 Mon Sep 17 00:00:00 2001 Message-Id: From: Mark Wooding Date: Sun, 18 May 2008 17:51:36 +0100 Subject: [PATCH] Add a new 'wait' flag to the rescan command. This allows the caller to request that the rescan command blocks until the rescan is complete. Organization: Straylight/Edgeware From: Richard Kettlewell The reason is that if you run the tests on a Linux tmpfs they would with high probability hang, due to the rescan completing before the rescan_monitor had started up. The flags is available in the Python interface but not the C interface or the command-line client. This could easily be fixed if there is demand. There's also a 'fresh' flag, to demand that the rescan start after the receipt of the command (i.e. to guarantee your new tracks make it in) but I disabled that due to the inconvenience of testing it. However the code is still there if anyone feels like writing tests. --- doc/disorder_protocol.5.in | 14 ++++- lib/trackdb.c | 43 ++++++++++++++- lib/trackdb.h | 7 ++- python/disorder.py.in | 4 +- server/disorderd.c | 2 +- server/server.c | 110 ++++++++++++++++++++++++++++++++++--- server/state.c | 2 +- tests/dtest.py | 10 +--- 8 files changed, 170 insertions(+), 22 deletions(-) diff --git a/doc/disorder_protocol.5.in b/doc/disorder_protocol.5.in index 5a2bf83..0847050 100644 --- a/doc/disorder_protocol.5.in +++ b/doc/disorder_protocol.5.in @@ -255,9 +255,21 @@ Requires one of the \fBremove mine\fR, \fBremove random\fR or \fBremove any\fR rights depending on how the track came to be added to the queue. .TP -.B rescan +.B rescan \fR[\fBwait\fR] \fR[\fBfresh\fR] Rescan all roots for new or obsolete tracks. Requires the \fBrescan\fR right. +.IP +If the \fBwait\fR flag is present then the response is delayed until the rescan +completes. +Otherwise the response arrives immediately. +This is primarily intended for testing. +.IP +If the \fBfresh\fR flag is present a rescan is already underway then a second +rescan will be started when it completes. +The default behavior is to piggyback on the existing rescan. +.IP +NB that \fBfresh\fR is currently disabled in the server source, so using this +flag will just provoke an error. .TP .B resolve \fITRACK\fR Resolve a track name, i.e. if this is an alias then return the real track name. diff --git a/lib/trackdb.c b/lib/trackdb.c index 54016ba..2080e63 100644 --- a/lib/trackdb.c +++ b/lib/trackdb.c @@ -2097,6 +2097,28 @@ int trackdb_scan(const char *root, /* trackdb_rescan ************************************************************/ +/** @brief Node in the list of rescan-complete callbacks */ +struct rescanned_node { + struct rescanned_node *next; + void (*rescanned)(void *ru); + void *ru; +}; + +/** @brief List of rescan-complete callbacks */ +static struct rescanned_node *rescanned_list; + +/** @brief Add a rescan completion callback */ +void trackdb_add_rescanned(void (*rescanned)(void *ru), + void *ru) { + if(rescanned) { + struct rescanned_node *n = xmalloc(sizeof *n); + n->next = rescanned_list; + n->rescanned = rescanned; + n->ru = ru; + rescanned_list = n; + } +} + /* called when the rescanner terminates */ static int reap_rescan(ev_source attribute((unused)) *ev, pid_t pid, @@ -2111,23 +2133,37 @@ static int reap_rescan(ev_source attribute((unused)) *ev, /* Our cache of file lookups is out of date now */ cache_clean(&cache_files_type); eventlog("rescanned", (char *)0); + /* Call rescanned callbacks */ + while(rescanned_list) { + void (*rescanned)(void *u) = rescanned_list->rescanned; + void *ru = rescanned_list->ru; + + rescanned_list = rescanned_list->next; + rescanned(ru); + } return 0; } /** @brief Initiate a rescan * @param ev Event loop or 0 to block * @param recheck 1 to recheck lengths, 0 to suppress check + * @param rescanned Called on completion (if not NULL) + * @param u Passed to @p rescanned */ -void trackdb_rescan(ev_source *ev, int recheck) { +void trackdb_rescan(ev_source *ev, int recheck, + void (*rescanned)(void *ru), + void *ru) { int w; if(rescan_pid != -1) { + trackdb_add_rescanned(rescanned, ru); error(0, "rescan already underway"); return; } rescan_pid = subprogram(ev, -1, RESCAN, recheck ? "--check" : "--no-check", (char *)0); + trackdb_add_rescanned(rescanned, ru); if(ev) { ev_child(ev, rescan_pid, 0, reap_rescan, 0); D(("started rescanner")); @@ -2147,6 +2183,11 @@ int trackdb_rescan_cancel(void) { return 1; } +/** @brief Return true if a rescan is underway */ +int trackdb_rescan_underway(void) { + return rescan_pid != -1; +} + /* global prefs **************************************************************/ void trackdb_set_global(const char *name, diff --git a/lib/trackdb.h b/lib/trackdb.h index f372f4c..f97ee5f 100644 --- a/lib/trackdb.h +++ b/lib/trackdb.h @@ -136,7 +136,9 @@ char **trackdb_search(char **wordlist, int nwordlist, int *ntracks); /* return a list of tracks containing all of the words given. If you * ask for only stopwords you get no tracks. */ -void trackdb_rescan(struct ev_source *ev, int recheck); +void trackdb_rescan(struct ev_source *ev, int recheck, + void (*rescanned)(void *ru), + void *ru); /* Start a rescan, if one is not running already */ int trackdb_rescan_cancel(void); @@ -177,6 +179,9 @@ typedef void random_callback(struct ev_source *ev, const char *track); int trackdb_request_random(struct ev_source *ev, random_callback *callback); +void trackdb_add_rescanned(void (*rescanned)(void *ru), + void *ru); +int trackdb_rescan_underway(void); #endif /* TRACKDB_H */ diff --git a/python/disorder.py.in b/python/disorder.py.in index 23f840c..ef33106 100644 --- a/python/disorder.py.in +++ b/python/disorder.py.in @@ -479,12 +479,12 @@ class client: """ self._simple("reconfigure") - def rescan(self): + def rescan(self, *flags): """Rescan one or more collections. Only trusted users can perform this operation. """ - self._simple("rescan") + self._simple("rescan", *flags) def version(self): """Return the server's version number.""" diff --git a/server/disorderd.c b/server/disorderd.c index 3a5e093..9f444b6 100644 --- a/server/disorderd.c +++ b/server/disorderd.c @@ -160,7 +160,7 @@ static void create_periodic(ev_source *ev_, } static void periodic_rescan(ev_source *ev_) { - trackdb_rescan(ev_, 1/*check*/); + trackdb_rescan(ev_, 1/*check*/, 0, 0); } static void periodic_database_gc(ev_source attribute((unused)) *ev_) { diff --git a/server/server.c b/server/server.c index fd0fcd8..60cb869 100644 --- a/server/server.c +++ b/server/server.c @@ -124,6 +124,8 @@ struct conn { rights_type rights; /** @brief Next connection */ struct conn *next; + /** @brief True if pending rescan had 'wait' set */ + int rescan_wait; }; /** @brief Linked list of connections */ @@ -355,13 +357,107 @@ static int c_reconfigure(struct conn *c, return 1; /* completed */ } +static void finished_rescan(void *ru) { + struct conn *const c = ru; + + sink_writes(ev_writer_sink(c->w), "250 rescan completed\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); +} + +static void start_fresh_rescan(void *ru) { + struct conn *const c = ru; + + if(trackdb_rescan_underway()) { + /* Some other waiter beat us to it. However in this case we're happy to + * piggyback; the requirement is that a new rescan be started, not that it + * was _our_ rescan. */ + if(c->rescan_wait) { + /* We block until the rescan completes */ + trackdb_add_rescanned(finished_rescan, c); + } else { + /* We report that the new rescan has started */ + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); + } + } else { + /* We are the first connection to get a callback so we must start a + * rescan. */ + if(c->rescan_wait) { + /* We want to block until the new rescan completes */ + trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c); + } else { + /* We can report back immediately */ + trackdb_rescan(c->ev, 1/*check*/, 0, 0); + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); + } + } +} + static int c_rescan(struct conn *c, - char attribute((unused)) **vec, - int attribute((unused)) nvec) { - info("S%x rescan by %s", c->tag, c->who); - trackdb_rescan(c->ev, 1/*check*/); - sink_writes(ev_writer_sink(c->w), "250 initiated rescan\n"); - return 1; /* completed */ + char **vec, + int nvec) { + int wait = 0, fresh = 0, n; + + /* Parse flags */ + for(n = 0; n < nvec; ++n) { + if(!strcmp(vec[n], "wait")) + wait = 1; /* wait for rescan to complete */ +#if 0 + /* Currently disabled because untested (and hard to test). */ + else if(!strcmp(vec[n], "fresh")) + fresh = 1; /* don't piggyback underway rescan */ +#endif + else { + sink_writes(ev_writer_sink(c->w), "550 unknown flag\n"); + return 1; /* completed */ + } + } + /* Report what was requested */ + info("S%x rescan by %s (%s %s)", c->tag, c->who, + wait ? "wait" : "", + fresh ? "fresh" : ""); + if(trackdb_rescan_underway()) { + if(fresh) { + /* We want a fresh rescan but there is already one underway. Arrange a + * callback when it completes and then set off a new one. */ + c->rescan_wait = wait; + trackdb_add_rescanned(start_fresh_rescan, c); + if(wait) + return 0; + else { + sink_writes(ev_writer_sink(c->w), "250 rescan queued\n"); + return 1; + } + } else { + /* There's a rescan underway, and it's acceptable to piggyback on it */ + if(wait) { + /* We want to block until completion. */ + trackdb_add_rescanned(finished_rescan, c); + return 0; + } else { + /* We don't want to block. So we just report that things are in + * hand. */ + sink_writes(ev_writer_sink(c->w), "250 rescan already underway\n"); + return 1; + } + } + } else { + /* No rescan is underway. fresh is therefore irrelevant. */ + if(wait) { + /* We want to block until completion */ + trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c); + return 0; + } else { + /* We don't want to block. */ + trackdb_rescan(c->ev, 1/*check*/, 0, 0); + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + return 1; /* completed */ + } + } } static int c_version(struct conn *c, @@ -1465,7 +1561,7 @@ static const struct command { { "register", 3, 3, c_register, RIGHT_REGISTER|RIGHT__LOCAL }, { "reminder", 1, 1, c_reminder, RIGHT__LOCAL }, { "remove", 1, 1, c_remove, RIGHT_REMOVE__MASK }, - { "rescan", 0, 0, c_rescan, RIGHT_RESCAN }, + { "rescan", 0, INT_MAX, c_rescan, RIGHT_RESCAN }, { "resolve", 1, 1, c_resolve, RIGHT_READ }, { "resume", 0, 0, c_resume, RIGHT_PAUSE }, { "revoke", 0, 0, c_revoke, RIGHT_READ }, diff --git a/server/state.c b/server/state.c index 3adcf2b..f88067b 100644 --- a/server/state.c +++ b/server/state.c @@ -154,7 +154,7 @@ int reconfigure(ev_source *ev, int reload) { /* We only allow for upgrade at startup */ trackdb_open(TRACKDB_CAN_UPGRADE); if(need_another_rescan) - trackdb_rescan(ev, 1/*check*/); + trackdb_rescan(ev, 1/*check*/, 0, 0); if(!ret) { queue_read(); recent_read(); diff --git a/tests/dtest.py b/tests/dtest.py index 9439bce..5eac2ea 100644 --- a/tests/dtest.py +++ b/tests/dtest.py @@ -271,16 +271,10 @@ def create_user(username="fred", password="fredpass"): "--user", "root", "edituser", username, "rights", "all"]) def rescan(c=None): - class rescan_monitor(disorder.monitor): - def rescanned(self): - return False + print " initiating rescan" if c is None: c = disorder.client() - m = rescan_monitor() - print " initiating rescan" - c.rescan() - print " waiting for rescan to complete" - m.run() + c.rescan('wait') print " rescan completed" def stop_daemon(): -- [mdw]