chiark / gitweb /
separate thread to add to heap
authorRichard Kettlewell <rjk@greenend.org.uk>
Sun, 23 Sep 2007 11:54:25 +0000 (12:54 +0100)
committerRichard Kettlewell <rjk@greenend.org.uk>
Sun, 23 Sep 2007 11:54:25 +0000 (12:54 +0100)
clients/playrtp.c
lib/timeval.h
server/speaker.c

index d64162be33ee57437e3e37bfd1a5a433bf1ca0c0..1ed0d938eb1210fdbf01a825f32a02bf6de90da1 100644 (file)
  * systems.  There is no support for Microsoft Windows yet, and that will in
  * fact probably an entirely separate program.
  *
- * The program runs (at least) two threads.  listen_thread() is responsible for
- * reading RTP packets off the wire and adding them to the binary heap @ref
- * packets, assuming they are basically sound.
+ * The program runs (at least) three threads.  listen_thread() is responsible
+ * for reading RTP packets off the wire and adding them to the linked list @ref
+ * received_packets, assuming they are basically sound.  queue_thread() takes
+ * packets off this linked list and adds them to @ref packets (an operation
+ * which might be much slower due to contention for @ref lock).
  *
  * The main thread is responsible for actually playing audio.  In ALSA this
  * means it waits until ALSA says it's ready for more audio which it then
@@ -41,6 +43,9 @@
  * Sometimes it happens that there is no audio available to play.  This may
  * because the server went away, or a packet was dropped, or the server
  * deliberately did not send any sound because it encountered a silence.
+ *
+ * Assumptions:
+ * - it is safe to read uint32_t values without a lock protecting them
  */
 
 #include <config.h>
@@ -67,6 +72,7 @@
 #include "defs.h"
 #include "vector.h"
 #include "heap.h"
+#include "timeval.h"
 
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
 # include <CoreAudio/AudioHardware.h>
@@ -120,6 +126,9 @@ static unsigned maxbuffer;
  * timestamp.
  */
 struct packet {
+  /** @brief Next packet in @ref next_free_packet or @ref received_packets */
+  struct packet *next;
+  
   /** @brief Number of samples in this packet */
   uint32_t nsamples;
 
@@ -177,6 +186,35 @@ static inline int lt_packet(const struct packet *a, const struct packet *b) {
   return lt(a->timestamp, b->timestamp);
 }
 
+/** @brief Received packets
+ * Protected by @ref receive_lock
+ *
+ * Received packets are added to this list, and queue_thread() picks them off
+ * it and adds them to @ref packets.  Whenever a packet is added to it, @ref
+ * receive_cond is signalled.
+ */
+static struct packet *received_packets;
+
+/** @brief Tail of @ref received_packets
+ * Protected by @ref receive_lock
+ */
+static struct packet **received_tail = &received_packets;
+
+/** @brief Lock protecting @ref received_packets 
+ *
+ * Only listen_thread() and queue_thread() ever hold this lock.  It is vital
+ * that queue_thread() not hold it any longer than it strictly has to. */
+static pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/** @brief Condition variable signalled when @ref received_packets is updated
+ *
+ * Used by listen_thread() to notify queue_thread() that it has added another
+ * packet to @ref received_packets. */
+static pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER;
+
+/** @brief Length of @ref received_packets */
+static uint32_t nreceived;
+
 /** @struct pheap 
  * @brief Binary heap of packets ordered by timestamp */
 HEAP_TYPE(pheap, struct packet *, lt_packet);
@@ -184,8 +222,12 @@ HEAP_TYPE(pheap, struct packet *, lt_packet);
 /** @brief Binary heap of received packets */
 static struct pheap packets;
 
-/** @brief Total number of samples available */
-static unsigned long nsamples;
+/** @brief Total number of samples available
+ *
+ * We make this volatile because we inspect it without a protecting lock,
+ * so the usual pthread_* guarantees aren't available.
+ */
+static volatile uint32_t nsamples;
 
 /** @brief Timestamp of next packet to play.
  *
@@ -199,6 +241,12 @@ static uint32_t next_timestamp;
  * This is true when playing and false when just buffering. */
 static int active;
 
+/** @brief Lock protecting @ref packets */
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+
+/** @brief Condition variable signalled whenever @ref packets is changed */
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
 /** @brief Structure of free packet list */
 union free_packet {
   struct packet p;
@@ -231,14 +279,8 @@ static union free_packet *next_free_packet;
  */
 static size_t count_free_packets;
 
-/** @brief Lock protecting @ref packets 
- *
- * This also protects the packet memory allocation infrastructure, @ref
- * free_packets and @ref next_free_packet. */
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-
-/** @brief Condition variable signalled whenever @ref packets is changed */
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+/** @brief Lock protecting packet allocator */
+static pthread_mutex_t mem_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static const struct option options[] = {
   { "help", no_argument, 0, 'h' },
@@ -251,12 +293,11 @@ static const struct option options[] = {
   { 0, 0, 0, 0 }
 };
 
-/** @brief Return a new packet
- *
- * Assumes that @ref lock is held. */
+/** @Brief Return a new packet */
 static struct packet *new_packet(void) {
   struct packet *p;
-
+  
+  pthread_mutex_lock(&mem_lock);
   if(free_packets) {
     p = &free_packets->p;
     free_packets = free_packets->next;
@@ -268,16 +309,17 @@ static struct packet *new_packet(void) {
     p = &(next_free_packet++)->p;
     --count_free_packets;
   }
+  pthread_mutex_unlock(&mem_lock);
   return p;
 }
 
-/** @brief Free a packet
- *
- * Assumes that @ref lock is held. */
+/** @brief Free a packet */
 static void free_packet(struct packet *p) {
   union free_packet *u = (union free_packet *)p;
+  pthread_mutex_lock(&mem_lock);
   u->next = free_packets;
   free_packets = u;
+  pthread_mutex_unlock(&mem_lock);
 }
 
 /** @brief Drop the first packet
@@ -293,6 +335,36 @@ static void drop_first_packet(void) {
   }
 }
 
+/** @brief Background thread adding packets to heap
+ *
+ * This just transfers packets from @ref received_packets to @ref packets.  It
+ * is important that it holds @ref receive_lock for as little time as possible,
+ * in order to minimize the interval between calls to read() in
+ * listen_thread().
+ */
+static void *queue_thread(void attribute((unused)) *arg) {
+  struct packet *p;
+
+  for(;;) {
+    /* Get the next packet */
+    pthread_mutex_lock(&receive_lock);
+    while(!received_packets)
+      pthread_cond_wait(&receive_cond, &receive_lock);
+    p = received_packets;
+    received_packets = p->next;
+    if(!received_packets)
+      received_tail = &received_packets;
+    --nreceived;
+    pthread_mutex_unlock(&receive_lock);
+    /* Add it to the heap */
+    pthread_mutex_lock(&lock);
+    pheap_insert(&packets, p);
+    nsamples += p->nsamples;
+    pthread_cond_broadcast(&cond);
+    pthread_mutex_unlock(&lock);
+  }
+}
+
 /** @brief Background thread collecting samples
  *
  * This function collects samples, perhaps converts them to the target format,
@@ -323,11 +395,8 @@ static void *listen_thread(void attribute((unused)) *arg) {
   struct iovec iov[2];
 
   for(;;) {
-    if(!p) {
-      pthread_mutex_lock(&lock);
+    if(!p)
       p = new_packet();
-      pthread_mutex_unlock(&lock);
-    }
     iov[0].iov_base = &header;
     iov[0].iov_len = sizeof header;
     iov[1].iov_base = p->samples_raw;
@@ -354,7 +423,7 @@ static void *listen_thread(void attribute((unused)) *arg) {
            timestamp, next_timestamp);
       continue;
     }
-    pthread_mutex_lock(&lock);
+    p->next = 0;
     p->flags = 0;
     p->timestamp = timestamp;
     /* Convert to target format */
@@ -377,18 +446,20 @@ static void *listen_thread(void attribute((unused)) *arg) {
      * This is rather unsatisfactory: it means that if packets get heavily
      * out of order then we guarantee dropouts.  But for now... */
     if(nsamples >= maxbuffer) {
-      //info("Buffer full");
-      write(2, "B", 1);
+      pthread_mutex_lock(&lock);
       while(nsamples >= maxbuffer)
         pthread_cond_wait(&cond, &lock);
+      pthread_mutex_unlock(&lock);
     }
-    /* Add the packet to the heap */
-    pheap_insert(&packets, p);
-    nsamples += p->nsamples;
+    /* Add the packet to the receive queue */
+    pthread_mutex_lock(&receive_lock);
+    *received_tail = p;
+    received_tail = &p->next;
+    ++nreceived;
+    pthread_cond_signal(&receive_cond);
+    pthread_mutex_unlock(&receive_lock);
     /* We'll need a new packet */
     p = 0;
-    pthread_cond_broadcast(&cond);
-    pthread_mutex_unlock(&lock);
   }
 }
 
@@ -461,8 +532,6 @@ static OSStatus adioproc
     while(samplesOutLeft > 0) {
       const struct packet *p = next_packet();
       if(p && contains(p, next_timestamp)) {
-        if(p->flags & IDLE)
-          write(2, "I", 1);
         /* This packet is ready to play */
         const uint32_t packet_end = p->timestamp + p->nsamples;
         const uint32_t offset = next_timestamp - p->timestamp;
@@ -477,7 +546,6 @@ static OSStatus adioproc
           *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767);
         /* We don't bother junking the packet - that'll be dealt with next time
          * round */
-        write(2, ".", 1);
       } else {
         /* No packet is ready to play (and there might be no packet at all) */
         samples_available = p ? p->timestamp - next_timestamp
@@ -489,7 +557,6 @@ static OSStatus adioproc
         next_timestamp += samples_available;
         samplesOut += samples_available;
         samplesOutLeft -= samples_available;
-        write(2, "?", 1);
       }
     }
     ++ab;
@@ -595,7 +662,6 @@ static int alsa_writei(const void *s, size_t n) {
     /* Something went wrong */
     switch(frames_written) {
     case -EAGAIN:
-      write(2, "#", 1);
       return 0;
     case -EPIPE:
       error(0, "error calling snd_pcm_writei: %ld",
@@ -617,9 +683,6 @@ static int alsa_writei(const void *s, size_t n) {
  * @return 0 on success, -1 on non-fatal error
  */
 static int alsa_play(const struct packet *p) {
-  if(p->flags & IDLE)
-    write(2, "I", 1);
-  write(2, ".", 1);
   return alsa_writei(p->samples_raw + next_timestamp - p->timestamp,
                      (p->timestamp + p->nsamples) - next_timestamp);
 }
@@ -634,7 +697,6 @@ static int alsa_infill(const struct packet *p) {
 
   if(p && samples_available > p->timestamp - next_timestamp)
     samples_available = p->timestamp - next_timestamp;
-  write(2, "?", 1);
   return alsa_writei(zeros, samples_available);
 }
 
@@ -670,6 +732,8 @@ static void play_rtp(void) {
 
   /* We receive and convert audio data in a background thread */
   pthread_create(&ltid, 0, listen_thread, 0);
+  /* We have a second thread to add received packets to the queue */
+  pthread_create(&ltid, 0, queue_thread, 0);
 #if API_ALSA
   {
     struct packet *p;
index ff8ccf08191678b91257db58e236b605f96602d5..9ec8c31038307b3245aed9c411fa46743fc57f46 100644 (file)
@@ -38,8 +38,8 @@ static inline struct timeval tvsub(const struct timeval a,
   return r;
 }
 
-static inline uint64_t tvsub_us(const struct timeval a,
-                                const struct timeval b) {
+static inline int64_t tvsub_us(const struct timeval a,
+                               const struct timeval b) {
   return (((uint64_t)a.tv_sec * 1000000 + a.tv_usec)
           - ((uint64_t)b.tv_sec * 1000000 + b.tv_usec));
 }
index c5fc668db06008aff26bfd07aa4219178a602635..f9badeb28f28f71b38bb32f063ec4caab4f337d6 100644 (file)
 #define NETWORK_BYTES 1024
 
 /** @brief Maximum RTP playahead (seconds) */
-#define RTP_AHEAD 2
+#define RTP_AHEAD 1
 
 /** @brief Maximum number of FDs to poll for */
 #define NFDS 256
@@ -971,8 +971,8 @@ int main(int argc, char **argv) {
         struct timeval now;
         uint64_t target_us;
         uint64_t target_rtp_time;
-        const uint64_t ahead = RTP_AHEAD * config->sample_format.rate
-          * config->sample_format.channels;
+        const uint64_t samples_ahead = RTP_AHEAD * config->sample_format.rate
+                                              * config->sample_format.channels;
 
         static unsigned logit;
 
@@ -988,14 +988,16 @@ int main(int argc, char **argv) {
                                      * config->sample_format.channels)
 
                           / 1000000;
+#if 0
         /* TODO remove logging guff */
         if(!(logit++ & 1023))
           info("rtp_time %llu target %llu difference %lld [%lld]", 
                rtp_time, target_rtp_time,
                rtp_time - target_rtp_time,
-               ahead);
+               samples_ahead);
+#endif
         if(rtp_time < target_rtp_time
-           || rtp_time - target_rtp_time < ahead)
+           || rtp_time - target_rtp_time < samples_ahead)
           bfd_slot = addfd(bfd, POLLOUT);
         break;
       }