chiark / gitweb /
Add a new 'wait' flag to the rescan command. This allows the caller
authorRichard Kettlewell <rjk@greenend.org.uk>
Sun, 18 May 2008 16:51:36 +0000 (17:51 +0100)
committerRichard Kettlewell <rjk@greenend.org.uk>
Sun, 18 May 2008 16:51:36 +0000 (17:51 +0100)
to request that the rescan command blocks until the rescan is
complete.

The reason is that if you run the tests on a Linux tmpfs they would
with high probability hang, due to the rescan completing before the
rescan_monitor had started up.

The flags is available in the Python interface but not the C interface
or the command-line client.  This could easily be fixed if there is
demand.

There's also a 'fresh' flag, to demand that the rescan start after the
receipt of the command (i.e. to guarantee your new tracks make it in)
but I disabled that due to the inconvenience of testing it.  However
the code is still there if anyone feels like writing tests.

doc/disorder_protocol.5.in
lib/trackdb.c
lib/trackdb.h
python/disorder.py.in
server/disorderd.c
server/server.c
server/state.c
tests/dtest.py

index 5a2bf839a25e5601ad66b4e84824b48979ee3f4b..08470508c9be7c5e9c33ed7d12dad4d9fcf0fe74 100644 (file)
@@ -255,9 +255,21 @@ Requires one of the \fBremove mine\fR, \fBremove random\fR or
 \fBremove any\fR rights depending on how the
 track came to be added to the queue.
 .TP
 \fBremove any\fR rights depending on how the
 track came to be added to the queue.
 .TP
-.B rescan
+.B rescan \fR[\fBwait\fR] \fR[\fBfresh\fR]
 Rescan all roots for new or obsolete tracks.
 Requires the \fBrescan\fR right.
 Rescan all roots for new or obsolete tracks.
 Requires the \fBrescan\fR right.
+.IP
+If the \fBwait\fR flag is present then the response is delayed until the rescan
+completes.
+Otherwise the response arrives immediately.
+This is primarily intended for testing.
+.IP
+If the \fBfresh\fR flag is present a rescan is already underway then a second
+rescan will be started when it completes.
+The default behavior is to piggyback on the existing rescan.
+.IP
+NB that \fBfresh\fR is currently disabled in the server source, so using this
+flag will just provoke an error.
 .TP
 .B resolve \fITRACK\fR
 Resolve a track name, i.e. if this is an alias then return the real track name.
 .TP
 .B resolve \fITRACK\fR
 Resolve a track name, i.e. if this is an alias then return the real track name.
index 54016bae0c8bcaf2fff04784ce00b3c3068abaef..2080e63eeadebe4ed747fcd4929717b1152ed3bb 100644 (file)
@@ -2097,6 +2097,28 @@ int trackdb_scan(const char *root,
 
 /* trackdb_rescan ************************************************************/
 
 
 /* trackdb_rescan ************************************************************/
 
+/** @brief Node in the list of rescan-complete callbacks */
+struct rescanned_node {
+  struct rescanned_node *next;
+  void (*rescanned)(void *ru);
+  void *ru;
+};
+
+/** @brief List of rescan-complete callbacks */
+static struct rescanned_node *rescanned_list;
+
+/** @brief Add a rescan completion callback */
+void trackdb_add_rescanned(void (*rescanned)(void *ru),
+                           void *ru) {
+  if(rescanned) {
+    struct rescanned_node *n = xmalloc(sizeof *n);
+    n->next = rescanned_list;
+    n->rescanned = rescanned;
+    n->ru = ru;
+    rescanned_list = n;
+  }
+}
+
 /* called when the rescanner terminates */
 static int reap_rescan(ev_source attribute((unused)) *ev,
                        pid_t pid,
 /* called when the rescanner terminates */
 static int reap_rescan(ev_source attribute((unused)) *ev,
                        pid_t pid,
@@ -2111,23 +2133,37 @@ static int reap_rescan(ev_source attribute((unused)) *ev,
   /* Our cache of file lookups is out of date now */
   cache_clean(&cache_files_type);
   eventlog("rescanned", (char *)0);
   /* Our cache of file lookups is out of date now */
   cache_clean(&cache_files_type);
   eventlog("rescanned", (char *)0);
+  /* Call rescanned callbacks */
+  while(rescanned_list) {
+    void (*rescanned)(void *u) = rescanned_list->rescanned;
+    void *ru = rescanned_list->ru;
+
+    rescanned_list = rescanned_list->next;
+    rescanned(ru);
+  }
   return 0;
 }
 
 /** @brief Initiate a rescan
  * @param ev Event loop or 0 to block
  * @param recheck 1 to recheck lengths, 0 to suppress check
   return 0;
 }
 
 /** @brief Initiate a rescan
  * @param ev Event loop or 0 to block
  * @param recheck 1 to recheck lengths, 0 to suppress check
+ * @param rescanned Called on completion (if not NULL)
+ * @param u Passed to @p rescanned
  */
  */
-void trackdb_rescan(ev_source *ev, int recheck) {
+void trackdb_rescan(ev_source *ev, int recheck,
+                    void (*rescanned)(void *ru),
+                    void *ru) {
   int w;
 
   if(rescan_pid != -1) {
   int w;
 
   if(rescan_pid != -1) {
+    trackdb_add_rescanned(rescanned, ru);
     error(0, "rescan already underway");
     return;
   }
   rescan_pid = subprogram(ev, -1, RESCAN,
                           recheck ? "--check" : "--no-check",
                           (char *)0);
     error(0, "rescan already underway");
     return;
   }
   rescan_pid = subprogram(ev, -1, RESCAN,
                           recheck ? "--check" : "--no-check",
                           (char *)0);
+  trackdb_add_rescanned(rescanned, ru);
   if(ev) {
     ev_child(ev, rescan_pid, 0, reap_rescan, 0);
     D(("started rescanner"));
   if(ev) {
     ev_child(ev, rescan_pid, 0, reap_rescan, 0);
     D(("started rescanner"));
@@ -2147,6 +2183,11 @@ int trackdb_rescan_cancel(void) {
   return 1;
 }
 
   return 1;
 }
 
+/** @brief Return true if a rescan is underway */
+int trackdb_rescan_underway(void) {
+  return rescan_pid != -1;
+}
+
 /* global prefs **************************************************************/
 
 void trackdb_set_global(const char *name,
 /* global prefs **************************************************************/
 
 void trackdb_set_global(const char *name,
index f372f4cf0586e9fbe57e6fa3138a157d10d5eacc..f97ee5f338319d724ae13205afc6bd69b616e0d4 100644 (file)
@@ -136,7 +136,9 @@ char **trackdb_search(char **wordlist, int nwordlist, int *ntracks);
 /* return a list of tracks containing all of the words given.  If you
  * ask for only stopwords you get no tracks. */
 
 /* return a list of tracks containing all of the words given.  If you
  * ask for only stopwords you get no tracks. */
 
-void trackdb_rescan(struct ev_source *ev, int recheck);
+void trackdb_rescan(struct ev_source *ev, int recheck,
+                    void (*rescanned)(void *ru),
+                    void *ru);
 /* Start a rescan, if one is not running already */
 
 int trackdb_rescan_cancel(void);
 /* Start a rescan, if one is not running already */
 
 int trackdb_rescan_cancel(void);
@@ -177,6 +179,9 @@ typedef void random_callback(struct ev_source *ev,
                              const char *track);
 int trackdb_request_random(struct ev_source *ev,
                            random_callback *callback);
                              const char *track);
 int trackdb_request_random(struct ev_source *ev,
                            random_callback *callback);
+void trackdb_add_rescanned(void (*rescanned)(void *ru),
+                           void *ru);
+int trackdb_rescan_underway(void);
 
 #endif /* TRACKDB_H */
 
 
 #endif /* TRACKDB_H */
 
index 23f840cccd1749de5746627c3e34566a2676d103..ef33106fa8a1f8f007f1405e6889ab29c872619a 100644 (file)
@@ -479,12 +479,12 @@ class client:
     """
     self._simple("reconfigure")
 
     """
     self._simple("reconfigure")
 
-  def rescan(self):
+  def rescan(self, *flags):
     """Rescan one or more collections.
 
     Only trusted users can perform this operation.
     """
     """Rescan one or more collections.
 
     Only trusted users can perform this operation.
     """
-    self._simple("rescan")
+    self._simple("rescan", *flags)
 
   def version(self):
     """Return the server's version number."""
 
   def version(self):
     """Return the server's version number."""
index 3a5e09339ce775c814d12731772f36b1e611d006..9f444b683dd84d246cd6342f6804d5ea713c9d85 100644 (file)
@@ -160,7 +160,7 @@ static void create_periodic(ev_source *ev_,
 }
 
 static void periodic_rescan(ev_source *ev_) {
 }
 
 static void periodic_rescan(ev_source *ev_) {
-  trackdb_rescan(ev_, 1/*check*/);
+  trackdb_rescan(ev_, 1/*check*/, 0, 0);
 }
 
 static void periodic_database_gc(ev_source attribute((unused)) *ev_) {
 }
 
 static void periodic_database_gc(ev_source attribute((unused)) *ev_) {
index fd0fcd884ed29977d025beaeca07bc3ccff2a707..60cb869a8972bf85fc55a717e90358b0eb63be17 100644 (file)
@@ -124,6 +124,8 @@ struct conn {
   rights_type rights;
   /** @brief Next connection */
   struct conn *next;
   rights_type rights;
   /** @brief Next connection */
   struct conn *next;
+  /** @brief True if pending rescan had 'wait' set */
+  int rescan_wait;
 };
 
 /** @brief Linked list of connections */
 };
 
 /** @brief Linked list of connections */
@@ -355,13 +357,107 @@ static int c_reconfigure(struct conn *c,
   return 1;                            /* completed */
 }
 
   return 1;                            /* completed */
 }
 
+static void finished_rescan(void *ru) {
+  struct conn *const c = ru;
+
+  sink_writes(ev_writer_sink(c->w), "250 rescan completed\n");
+  /* Turn this connection back on */
+  ev_reader_enable(c->r);
+}
+
+static void start_fresh_rescan(void *ru) {
+  struct conn *const c = ru;
+
+  if(trackdb_rescan_underway()) {
+    /* Some other waiter beat us to it.  However in this case we're happy to
+     * piggyback; the requirement is that a new rescan be started, not that it
+     * was _our_ rescan. */
+    if(c->rescan_wait) {
+      /* We block until the rescan completes */
+      trackdb_add_rescanned(finished_rescan, c);
+    } else {
+      /* We report that the new rescan has started */
+      sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n");
+      /* Turn this connection back on */
+      ev_reader_enable(c->r);
+    }
+  } else {
+    /* We are the first connection to get a callback so we must start a
+     * rescan. */
+    if(c->rescan_wait) {
+      /* We want to block until the new rescan completes */
+      trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c);
+    } else {
+      /* We can report back immediately */
+      trackdb_rescan(c->ev, 1/*check*/, 0, 0);
+      sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n");
+      /* Turn this connection back on */
+      ev_reader_enable(c->r);
+    }
+  }
+}
+
 static int c_rescan(struct conn *c,
 static int c_rescan(struct conn *c,
-                   char attribute((unused)) **vec,
-                   int attribute((unused)) nvec) {
-  info("S%x rescan by %s", c->tag, c->who);
-  trackdb_rescan(c->ev, 1/*check*/);
-  sink_writes(ev_writer_sink(c->w), "250 initiated rescan\n");
-  return 1;                            /* completed */
+                   char **vec,
+                   int nvec) {
+  int wait = 0, fresh = 0, n;
+
+  /* Parse flags */
+  for(n = 0; n < nvec; ++n) {
+    if(!strcmp(vec[n], "wait"))
+      wait = 1;                                /* wait for rescan to complete */
+#if 0
+    /* Currently disabled because untested (and hard to test). */
+    else if(!strcmp(vec[n], "fresh"))
+      fresh = 1;                       /* don't piggyback underway rescan */
+#endif
+    else {
+      sink_writes(ev_writer_sink(c->w), "550 unknown flag\n");
+      return 1;                                /* completed */
+    }
+  }
+  /* Report what was requested */
+  info("S%x rescan by %s (%s %s)", c->tag, c->who,
+       wait ? "wait" : "",
+       fresh ? "fresh" : "");
+  if(trackdb_rescan_underway()) {
+    if(fresh) {
+      /* We want a fresh rescan but there is already one underway.  Arrange a
+       * callback when it completes and then set off a new one. */
+      c->rescan_wait = wait;
+      trackdb_add_rescanned(start_fresh_rescan, c);
+      if(wait)
+       return 0;
+      else {
+       sink_writes(ev_writer_sink(c->w), "250 rescan queued\n");
+       return 1;
+      }
+    } else {
+      /* There's a rescan underway, and it's acceptable to piggyback on it */
+      if(wait) {
+       /* We want to block until completion. */
+       trackdb_add_rescanned(finished_rescan, c);
+       return 0;
+      } else {
+       /* We don't want to block.  So we just report that things are in
+        * hand. */
+       sink_writes(ev_writer_sink(c->w), "250 rescan already underway\n");
+       return 1;
+      }
+    }
+  } else {
+    /* No rescan is underway.  fresh is therefore irrelevant. */
+    if(wait) {
+      /* We want to block until completion */
+      trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c);
+      return 0;
+    } else {
+      /* We don't want to block. */
+      trackdb_rescan(c->ev, 1/*check*/, 0, 0);
+      sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n");
+      return 1;                                /* completed */
+    }
+  }
 }
 
 static int c_version(struct conn *c,
 }
 
 static int c_version(struct conn *c,
@@ -1465,7 +1561,7 @@ static const struct command {
   { "register",       3, 3,       c_register,       RIGHT_REGISTER|RIGHT__LOCAL },
   { "reminder",       1, 1,       c_reminder,       RIGHT__LOCAL },
   { "remove",         1, 1,       c_remove,         RIGHT_REMOVE__MASK },
   { "register",       3, 3,       c_register,       RIGHT_REGISTER|RIGHT__LOCAL },
   { "reminder",       1, 1,       c_reminder,       RIGHT__LOCAL },
   { "remove",         1, 1,       c_remove,         RIGHT_REMOVE__MASK },
-  { "rescan",         0, 0,       c_rescan,         RIGHT_RESCAN },
+  { "rescan",         0, INT_MAX, c_rescan,         RIGHT_RESCAN },
   { "resolve",        1, 1,       c_resolve,        RIGHT_READ },
   { "resume",         0, 0,       c_resume,         RIGHT_PAUSE },
   { "revoke",         0, 0,       c_revoke,         RIGHT_READ },
   { "resolve",        1, 1,       c_resolve,        RIGHT_READ },
   { "resume",         0, 0,       c_resume,         RIGHT_PAUSE },
   { "revoke",         0, 0,       c_revoke,         RIGHT_READ },
index 3adcf2b88fcf51ef1d651d5d0c00e0e7e8e5979e..f88067b80eb8dd7b094fc62793d22bf28d64f6e2 100644 (file)
@@ -154,7 +154,7 @@ int reconfigure(ev_source *ev, int reload) {
     /* We only allow for upgrade at startup */
     trackdb_open(TRACKDB_CAN_UPGRADE);
   if(need_another_rescan)
     /* We only allow for upgrade at startup */
     trackdb_open(TRACKDB_CAN_UPGRADE);
   if(need_another_rescan)
-    trackdb_rescan(ev, 1/*check*/);
+    trackdb_rescan(ev, 1/*check*/, 0, 0);
   if(!ret) {
     queue_read();
     recent_read();
   if(!ret) {
     queue_read();
     recent_read();
index 9439bcead790db6c5242339aedd0dfe240b0ccdf..5eac2ea6ba519f652cb17d95ba803e008bf6cf3a 100644 (file)
@@ -271,16 +271,10 @@ def create_user(username="fred", password="fredpass"):
              "--user", "root", "edituser", username, "rights", "all"])
 
 def rescan(c=None):
              "--user", "root", "edituser", username, "rights", "all"])
 
 def rescan(c=None):
-    class rescan_monitor(disorder.monitor):
-        def rescanned(self):
-            return False
+    print " initiating rescan"
     if c is None:
         c = disorder.client()
     if c is None:
         c = disorder.client()
-    m = rescan_monitor()
-    print " initiating rescan"
-    c.rescan()
-    print " waiting for rescan to complete"
-    m.run()
+    c.rescan('wait')
     print " rescan completed"
 
 def stop_daemon():
     print " rescan completed"
 
 def stop_daemon():