chiark / gitweb /
Consistency check for finished tracks.
[disorder] / server / speaker.c
index 123f7a2..a8524b4 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * This file is part of DisOrder
- * Copyright (C) 2005-2009 Richard Kettlewell
+ * Copyright (C) 2005-2010 Richard Kettlewell
  * Portions (C) 2007 Mark Wooding
  *
  * This program is free software: you can redistribute it and/or modify
  *
  * Audio is supplied from this buffer to the uaudio play callback.  Playback is
  * enabled when a track is to be played and disabled when the its last bytes
- * have been return by the callback; pause and resume is implemneted the
+ * have been returned by the callback; pause and resume is implemented the
  * obvious way.  If the callback finds itself required to play when there is no
  * playing track it returns dead air.
  *
+ * To implement gapless playback, the server is notified that a track has
+ * finished slightly early.  @ref SM_PLAY is therefore allowed to arrive while
+ * the previous track is still playing provided an early @ref SM_FINISHED has
+ * been sent for it.
+ *
  * @b Encodings.  The encodings supported depend entirely on the uaudio backend
  * chosen.  See @ref uaudio.h, etc.
  *
@@ -70,7 +75,6 @@
 #include <syslog.h>
 #include <unistd.h>
 #include <errno.h>
-#include <ao/ao.h>
 #include <sys/select.h>
 #include <sys/wait.h>
 #include <time.h>
@@ -80,6 +84,7 @@
 #include <sys/stat.h>
 #include <pthread.h>
 #include <sys/resource.h>
+#include <gcrypt.h>
 
 #include "configuration.h"
 #include "syscalls.h"
 /** @brief Maximum number of FDs to poll for */
 #define NFDS 1024
 
+/** @brief Number of bytes before end of track to send SM_FINISHED
+ *
+ * Generally set to 1 second.
+ */
+static size_t early_finish;
+
 /** @brief Track structure
  *
  * Known tracks are kept in a linked list.  Usually there will be at most two
@@ -105,7 +116,7 @@ struct track {
   struct track *next;
 
   /** @brief Input file descriptor */
-  int fd;                               /* input FD */
+  int fd;
 
   /** @brief Track ID */
   char id[24];
@@ -132,6 +143,14 @@ struct track {
    * out life not playable.
    */
   int playable;
+
+  /** @brief Set when finished
+   *
+   * This is set when we've notified the server that the track is finished.
+   * Once this has happened (typically very late in the track's lifetime) the
+   * track cannot be paused or cancelled.
+   */
+  int finished;
   
   /** @brief Input buffer
    *
@@ -143,26 +162,47 @@ struct track {
 /** @brief Lock protecting data structures
  *
  * This lock protects values shared between the main thread and the callback.
- * It is needed e.g. if changing @ref playing or if modifying buffer pointers.
- * It is not needed to add a new track, to read values only modified in the
- * same thread, etc.
+ *
+ * It is held 'all' the time by the main thread, the exceptions being when
+ * called activate/deactivate callbacks and when calling (potentially) slow
+ * system calls (in particular poll(), where in fact the main thread will spend
+ * most of its time blocked).
+ *
+ * The callback holds it when it's running.
  */
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
-/** @brief Linked list of all prepared tracks */
+/** @brief Linked list of all prepared tracks
+ *
+ * This includes @ref playing and @ref pending_playing.
+ */
 static struct track *tracks;
 
 /** @brief Playing track, or NULL
  *
- * This means the DESIRED playing track.  It does not reflect any other state
- * (e.g. activation of uaudio backend).
+ * This means the track the speaker process intends to play.  It does not
+ * reflect any other state (e.g. activation of uaudio backend).
+ *
+ * This track remains on @ref track.
  */
 static struct track *playing;
 
+/** @brief Pending playing track, or NULL
+ *
+ * This means the track the server wants the speaker to play.
+ *
+ * This track remains on @p track.
+ */
+static struct track *pending_playing;
+
 /** @brief Array of file descriptors for poll() */
 static struct pollfd fds[NFDS];
 
-/** @brief Next free slot in @ref fds */
+/** @brief Next free slot in @ref fds
+ *
+ * This is used when filling in the @ref fds array each iteration through the
+ * event loop.
+ */
 static int fdno;
 
 /** @brief Listen socket */
@@ -213,7 +253,7 @@ static void help(void) {
 
 /** @brief Find track @p id, maybe creating it if not found
  * @param id Track ID to find
- * @param create If nonzero, create track structure of @p id not found
+ * @param create If nonzero, create track structure of @p id if not found
  * @return Pointer to track structure or NULL
  */
 static struct track *findtrack(const char *id, int create) {
@@ -264,6 +304,8 @@ static void destroy(struct track *t) {
  * This is effectively the read callback on @c t->fd.  It is called from the
  * main loop whenever the track's file descriptor is readable, assuming the
  * buffer has not reached the maximum allowed occupancy.
+ *
+ * Errors count as EOF.
  */
 static int speaker_fill(struct track *t) {
   size_t where, left;
@@ -273,7 +315,6 @@ static int speaker_fill(struct track *t) {
      t->id, t->eof, t->used));
   if(t->eof)
     return -1;
-  pthread_mutex_lock(&lock);
   if(t->used < sizeof t->buffer) {
     /* there is room left in the buffer */
     where = (t->start + t->used) % sizeof t->buffer;
@@ -287,12 +328,16 @@ static int speaker_fill(struct track *t) {
       n = read(t->fd, t->buffer + where, left);
     } while(n < 0 && errno == EINTR);
     pthread_mutex_lock(&lock);
-    if(n < 0) {
-      if(errno != EAGAIN)
-        fatal(errno, "error reading sample stream");
+    if(n < 0 && errno == EAGAIN) {
+      /* EAGAIN means more later */
       rc = 0;
-    } else if(n == 0) {
-      D(("fill %s: eof detected", t->id));
+    } else if(n <= 0) {
+      /* n=0 means EOF.  n<0 means some error occurred.  We log the error but
+       * otherwise treat it as identical to EOF. */
+      if(n < 0)
+        disorder_error(errno, "error reading sample stream for %s", t->id);
+      else
+        D(("fill %s: eof detected", t->id));
       t->eof = 1;
       /* A track always becomes playable at EOF; we're not going to see any
        * more data. */
@@ -307,8 +352,8 @@ static int speaker_fill(struct track *t) {
         t->playable = 1;
       rc = 0;
     }
-  }
-  pthread_mutex_unlock(&lock);
+  } else
+    rc = 0;
   return rc;
 }
 
@@ -316,10 +361,17 @@ static int speaker_fill(struct track *t) {
  *
  * We want to play audio if there is a current track; and it is not paused; and
  * it is playable according to the rules for @ref track::playable.
+ *
+ * We don't allow tracks to be paused if we've already told the server we've
+ * finished them; that would cause such tracks to survive much longer than the
+ * few samples they're supposed to, with report() remaining silent for the
+ * duration.  The effect is that if you hit pause towards the end of a track,
+ * what should happen is that it finished but the next one is paused right at
+ * its start.
  */
 static int playable(void) {
   return playing
-         && !paused
+         && (!paused || playing->finished)
          && playing->playable;
 }
 
@@ -328,15 +380,17 @@ static void report(void) {
   struct speaker_message sm;
 
   if(playing) {
+    /* Had better not send a report for a track that the server thinks has
+     * finished, that would be confusing. */
+    if(playing->finished)
+      return;
     memset(&sm, 0, sizeof sm);
     sm.type = paused ? SM_PAUSED : SM_PLAYING;
     strcpy(sm.id, playing->id);
-    pthread_mutex_lock(&lock);
     sm.data = playing->played / (uaudio_rate * uaudio_channels);
-    pthread_mutex_unlock(&lock);
     speaker_send(1, &sm);
+    xtime(&last_report);
   }
-  time(&last_report);
 }
 
 /** @brief Add a file descriptor to the set to poll() for
@@ -390,9 +444,12 @@ static size_t speaker_callback(void *buffer,
       /* Wrap around to start of buffer */
       if(playing->start == sizeof playing->buffer)
         playing->start = 0;
-      /* See if we've reached the end of the track */
-      if(playing->used == 0 && playing->eof)
-        write(sigpipe[1], "", 1);
+      /* See if we've reached the end of the track; if so make sure the event
+       * loop wakes up. */
+      if(playing->used == 0 && playing->eof) {
+        int ignored = write(sigpipe[1], "", 1);
+        (void) ignored;
+      }
       provided_samples = bytes / uaudio_sample_size;
       playing->played += provided_samples;
     }
@@ -402,6 +459,11 @@ static size_t speaker_callback(void *buffer,
   if(!provided_samples) {
     memset(buffer, 0, max_bytes);
     provided_samples = max_samples;
+    if(playing)
+      disorder_info("%zu samples silence, playing->used=%zu",
+                    provided_samples, playing->used);
+    else
+      disorder_info("%zu samples silence, playing=NULL", provided_samples);
   }
   pthread_mutex_unlock(&lock);
   return provided_samples;
@@ -413,14 +475,15 @@ static void mainloop(void) {
   struct speaker_message sm;
   int n, fd, stdin_slot, timeout, listen_slot, sigpipe_slot;
 
+  pthread_mutex_lock(&lock);
   /* Keep going while our parent process is alive */
   while(getppid() != 1) {
     int force_report = 0;
 
     fdno = 0;
-    /* By default we will wait up to a second before thinking about current
-     * state. */
-    timeout = 1000;
+    /* By default we will wait up to half a second before thinking about
+     * current state. */
+    timeout = 500;
     /* Always ready for commands from the main server. */
     stdin_slot = addfd(0, POLLIN);
     /* Also always ready for inbound connections */
@@ -434,6 +497,8 @@ static void mainloop(void) {
       playing->slot = addfd(playing->fd, POLLIN);
     else if(playing)
       playing->slot = -1;
+    /* Allow the poll() to be interrupted at the end of a track */
+    sigpipe_slot = addfd(sigpipe[0], POLLIN);
     /* If any other tracks don't have a full buffer, try to read sample data
      * from them.  We do this last of all, so that if we run out of slots,
      * nothing important can't be monitored. */
@@ -446,12 +511,13 @@ static void mainloop(void) {
         } else
           t->slot = -1;
       }
-    sigpipe_slot = addfd(sigpipe[1], POLLIN);
     /* Wait for something interesting to happen */
+    pthread_mutex_unlock(&lock);
     n = poll(fds, fdno, timeout);
+    pthread_mutex_lock(&lock);
     if(n < 0) {
       if(errno == EINTR) continue;
-      fatal(errno, "error calling poll");
+      disorder_fatal(errno, "error calling poll");
     }
     /* Perhaps a connection has arrived */
     if(fds[listen_slot].revents & POLLIN) {
@@ -461,32 +527,41 @@ static void mainloop(void) {
       char id[24];
 
       if((fd = accept(listenfd, (struct sockaddr *)&addr, &addrlen)) >= 0) {
+        /* We do blocking reads for the header.  In theory this means that the
+         * connecting process could wedge the speaker indefinitely.  In
+         * practice that would mean that the main server was broken anyway.
+         * Still, this is ugly, and a rewrite would be nice. */
         blocking(fd);
         if(read(fd, &l, sizeof l) < 4) {
-          error(errno, "reading length from inbound connection");
+          disorder_error(errno, "reading length from inbound connection");
           xclose(fd);
         } else if(l >= sizeof id) {
-          error(0, "id length too long");
+          disorder_error(0, "id length too long");
           xclose(fd);
         } else if(read(fd, id, l) < (ssize_t)l) {
-          error(errno, "reading id from inbound connection");
+          disorder_error(errno, "reading id from inbound connection");
           xclose(fd);
         } else {
           id[l] = 0;
           D(("id %s fd %d", id, fd));
           t = findtrack(id, 1/*create*/);
           if (write(fd, "", 1) < 0)             /* write an ack */
-                       error(errno, "writing ack to inbound connection");
+            disorder_error(errno, "writing ack to inbound connection for %s",
+                           id);
           if(t->fd != -1) {
-            error(0, "%s: already got a connection", id);
+            disorder_error(0, "%s: already got a connection", id);
             xclose(fd);
           } else {
             nonblock(fd);
             t->fd = fd;               /* yay */
           }
+          /* Notify the server that the connection arrived */
+          sm.type = SM_ARRIVED;
+          strcpy(sm.id, id);
+          speaker_send(1, &sm);
         }
       } else
-        error(errno, "accept");
+        disorder_error(errno, "accept");
     }
     /* Perhaps we have a command to process */
     if(fds[stdin_slot].revents & POLLIN) {
@@ -494,18 +569,41 @@ static void mainloop(void) {
        * this won't be the case, so we don't bother looping around to pick them
        * all up. */ 
       n = speaker_recv(0, &sm);
-      /* TODO */
       if(n > 0)
+        /* As a rule we don't send success replies to most commands - we just
+         * force the regular status update to be sent immediately rather than
+         * on schedule. */
        switch(sm.type) {
        case SM_PLAY:
-          if(playing)
-            fatal(0, "got SM_PLAY but already playing something");
+          /* SM_PLAY is only allowed if the server reasonably believes that
+           * nothing is playing */
+          if(playing) {
+            /* If finished isn't set then the server can't believe that this
+             * track has finished */
+            if(!playing->finished)
+              disorder_fatal(0, "got SM_PLAY but already playing something");
+            /* If pending_playing is set then the server must believe that that
+             * is playing */
+            if(pending_playing)
+              disorder_fatal(0, "got SM_PLAY but have a pending playing track");
+          }
          t = findtrack(sm.id, 1);
           D(("SM_PLAY %s fd %d", t->id, t->fd));
           if(t->fd == -1)
-            error(0, "cannot play track because no connection arrived");
-          playing = t;
-          force_report = 1;
+            disorder_error(0,
+                           "cannot play track because no connection arrived");
+          /* TODO as things stand we often report this error message but then
+           * appear to proceed successfully.  Understanding why requires a look
+           * at play.c: we call prepare() which makes the connection in a child
+           * process, and then sends the SM_PLAY in the parent process.  The
+           * latter may well be faster.  As it happens this is harmless; we'll
+           * just sit around sending silence until the decoder connects and
+           * starts sending some sample data.  But is is annoying and ought to
+           * be fixed. */
+          pending_playing = t;
+          /* If nothing is currently playing then we'll switch to the pending
+           * track below so there's no point distinguishing the situations
+           * here. */
          break;
        case SM_PAUSE:
           D(("SM_PAUSE"));
@@ -521,40 +619,43 @@ static void mainloop(void) {
           D(("SM_CANCEL %s", sm.id));
          t = removetrack(sm.id);
          if(t) {
-            pthread_mutex_lock(&lock);
-           if(t == playing) {
-              /* scratching the playing track */
+           if(t == playing || t == pending_playing) {
+              /* Scratching the track that the server believes is playing,
+               * which might either be the actual playing track or a pending
+               * playing track */
               sm.type = SM_FINISHED;
-             playing = 0;
+              if(t == playing)
+                playing = 0;
+              else
+                pending_playing = 0;
             } else {
               /* Could be scratching the playing track before it's quite got
                * going, or could be just removing a track from the queue.  We
                * log more because there's been a bug here recently than because
                * it's particularly interesting; the log message will be removed
                * if no further problems show up. */
-              info("SM_CANCEL for nonplaying track %s", sm.id);
+              disorder_info("SM_CANCEL for nonplaying track %s", sm.id);
               sm.type = SM_STILLBORN;
             }
             strcpy(sm.id, t->id);
            destroy(t);
-            pthread_mutex_unlock(&lock);
          } else {
             /* Probably scratching the playing track well before it's got
              * going, but could indicate a bug, so we log this as an error. */
             sm.type = SM_UNKNOWN;
-           error(0, "SM_CANCEL for unknown track %s", sm.id);
+           disorder_error(0, "SM_CANCEL for unknown track %s", sm.id);
           }
           speaker_send(1, &sm);
           force_report = 1;
          break;
        case SM_RELOAD:
           D(("SM_RELOAD"));
-         if(config_read(1))
-            error(0, "cannot read configuration");
-          info("reloaded configuration");
+         if(config_read(1, NULL))
+            disorder_error(0, "cannot read configuration");
+          disorder_info("reloaded configuration");
          break;
        default:
-         error(0, "unknown message type %d", sm.type);
+         disorder_error(0, "unknown message type %d", sm.type);
         }
     }
     /* Read in any buffered data */
@@ -567,36 +668,59 @@ static void mainloop(void) {
      * interrupted poll(). */
     if(fds[sigpipe_slot].revents & POLLIN) {
       char buffer[64];
+      int ignored; (void)ignored;
 
-      read(sigpipe[0], buffer, sizeof buffer);
+      ignored = read(sigpipe[0], buffer, sizeof buffer);
     }
-    if(playing && playing->used == 0 && playing->eof) {
-      /* The playing track is done.  Tell the server, and destroy it. */
+    /* Send SM_FINISHED when we're near the end of the track.
+     *
+     * This is how we implement gapless play; we hope that the SM_PLAY from the
+     * server arrives before the remaining bytes of the track play out.
+     */
+    if(playing
+       && playing->eof
+       && !playing->finished
+       && playing->used <= early_finish) {
       memset(&sm, 0, sizeof sm);
       sm.type = SM_FINISHED;
       strcpy(sm.id, playing->id);
       speaker_send(1, &sm);
+      playing->finished = 1;
+    }
+    /* When the track is actually finished, deconfigure it */
+    if(playing && playing->eof && !playing->used) {
+      if(!playing->finished) {
+        /* should never happen but we'd like to know if it does */
+        disorder_fatal(0, "track finish state inconsistent");
+      }
       removetrack(playing->id);
-      pthread_mutex_lock(&lock);
       destroy(playing);
       playing = 0;
-      pthread_mutex_unlock(&lock);
-      /* The server will presumalby send as an SM_PLAY by return */
+    }
+    /* Act on the pending SM_PLAY */
+    if(!playing && pending_playing) {
+      playing = pending_playing;
+      pending_playing = 0;
+      force_report = 1;
     }
     /* Impose any state change required by the above */
     if(playable()) {
       if(!activated) {
         activated = 1;
+        pthread_mutex_unlock(&lock);
         backend->activate();
+        pthread_mutex_lock(&lock);
       }
     } else {
       if(activated) {
         activated = 0;
+        pthread_mutex_unlock(&lock);
         backend->deactivate();
+        pthread_mutex_lock(&lock);
       }
     }
     /* If we've not reported our state for a second do so now. */
-    if(force_report || time(0) > last_report)
+    if(force_report || xtime(0) > last_report)
       report();
   }
 }
@@ -611,7 +735,7 @@ int main(int argc, char **argv) {
   struct rlimit rl[1];
 
   set_progname(argv);
-  if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
+  if(!setlocale(LC_CTYPE, "")) disorder_fatal(errno, "error calling setlocale");
   while((n = getopt_long(argc, argv, "hVc:dDSs", options, 0)) >= 0) {
     switch(n) {
     case 'h': help();
@@ -621,7 +745,7 @@ int main(int argc, char **argv) {
     case 'D': debugging = 0; break;
     case 'S': logsyslog = 0; break;
     case 's': logsyslog = 1; break;
-    default: fatal(0, "invalid option");
+    default: disorder_fatal(0, "invalid option");
     }
   }
   if((d = getenv("DISORDER_DEBUG_SPEAKER"))) debugging = atoi(d);
@@ -630,7 +754,7 @@ int main(int argc, char **argv) {
     log_default = &log_syslog;
   }
   config_uaudio_apis = uaudio_apis;
-  if(config_read(1)) fatal(0, "cannot read configuration");
+  if(config_read(1, NULL)) disorder_fatal(0, "cannot read configuration");
   /* ignore SIGPIPE */
   signal(SIGPIPE, SIG_IGN);
   /* set nice value */
@@ -639,19 +763,24 @@ int main(int argc, char **argv) {
   become_mortal();
   /* make sure we're not root, whatever the config says */
   if(getuid() == 0 || geteuid() == 0)
-    fatal(0, "do not run as root");
+    disorder_fatal(0, "do not run as root");
   /* Make sure we can't have more than NFDS files open (it would bust our
    * poll() array) */
   if(getrlimit(RLIMIT_NOFILE, rl) < 0)
-    fatal(errno, "getrlimit RLIMIT_NOFILE");
+    disorder_fatal(errno, "getrlimit RLIMIT_NOFILE");
   if(rl->rlim_cur > NFDS) {
     rl->rlim_cur = NFDS;
     if(setrlimit(RLIMIT_NOFILE, rl) < 0)
-      fatal(errno, "setrlimit to reduce RLIMIT_NOFILE to %lu",
+      disorder_fatal(errno, "setrlimit to reduce RLIMIT_NOFILE to %lu",
             (unsigned long)rl->rlim_cur);
-    info("set RLIM_NOFILE to %lu", (unsigned long)rl->rlim_cur);
+    disorder_info("set RLIM_NOFILE to %lu", (unsigned long)rl->rlim_cur);
   } else
-    info("RLIM_NOFILE is %lu", (unsigned long)rl->rlim_cur);
+    disorder_info("RLIM_NOFILE is %lu", (unsigned long)rl->rlim_cur);
+  /* gcrypt initialization */
+  if(!gcry_check_version(NULL))
+    disorder_fatal(0, "gcry_check_version failed");
+  gcry_control(GCRYCTL_INIT_SECMEM, 0);
+  gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
   /* create a pipe between the backend callback and the poll() loop */
   xpipe(sigpipe);
   nonblock(sigpipe[0]);
@@ -660,15 +789,18 @@ int main(int argc, char **argv) {
                     config->sample_format.channels,
                     config->sample_format.bits,
                     config->sample_format.bits != 8);
+  early_finish = uaudio_sample_size * uaudio_channels * uaudio_rate;
   /* TODO other parameters! */
   backend = uaudio_find(config->api);
   /* backend-specific initialization */
+  if(backend->configure)
+    backend->configure();
   backend->start(speaker_callback, NULL);
   /* create the socket directory */
   byte_xasprintf(&dir, "%s/speaker", config->home);
   unlink(dir);                          /* might be a leftover socket */
   if(mkdir(dir, 0700) < 0 && errno != EEXIST)
-    fatal(errno, "error creating %s", dir);
+    disorder_fatal(errno, "error creating %s", dir);
   /* set up the listen socket */
   listenfd = xsocket(PF_UNIX, SOCK_STREAM, 0);
   memset(&addr, 0, sizeof addr);
@@ -676,18 +808,18 @@ int main(int argc, char **argv) {
   snprintf(addr.sun_path, sizeof addr.sun_path, "%s/speaker/socket",
            config->home);
   if(unlink(addr.sun_path) < 0 && errno != ENOENT)
-    error(errno, "removing %s", addr.sun_path);
+    disorder_error(errno, "removing %s", addr.sun_path);
   xsetsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
   if(bind(listenfd, (const struct sockaddr *)&addr, sizeof addr) < 0)
-    fatal(errno, "error binding socket to %s", addr.sun_path);
+    disorder_fatal(errno, "error binding socket to %s", addr.sun_path);
   xlisten(listenfd, 128);
   nonblock(listenfd);
-  info("listening on %s", addr.sun_path);
+  disorder_info("listening on %s", addr.sun_path);
   memset(&sm, 0, sizeof sm);
   sm.type = SM_READY;
   speaker_send(1, &sm);
   mainloop();
-  info("stopped (parent terminated)");
+  disorder_info("stopped (parent terminated)");
   exit(0);
 }