chiark / gitweb /
correct next_timestamp logic
authorrjk@greenend.org.uk <>
Sun, 16 Sep 2007 18:52:30 +0000 (19:52 +0100)
committerrjk@greenend.org.uk <>
Sun, 16 Sep 2007 18:52:30 +0000 (19:52 +0100)
clients/playrtp.c

index 7f2b2385169ad839befb3286eb40e1792513d888..5606fb5194f2d80e195bd6a4a4abea7d7db04aa2 100644 (file)
@@ -108,10 +108,15 @@ static struct packet *packets;
 /** @brief Timestamp of next packet to play.
  *
  * This is set to the timestamp of the last packet, plus the number of
 /** @brief Timestamp of next packet to play.
  *
  * This is set to the timestamp of the last packet, plus the number of
- * samples it contained.
+ * samples it contained.  Only valid if @ref active is nonzero.
  */
 static uint32_t next_timestamp;
 
  */
 static uint32_t next_timestamp;
 
+/** @brief True if actively playing
+ *
+ * This is true when playing and false when just buffering. */
+static int active;
+
 /** @brief Lock protecting @ref packets */
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
 /** @brief Lock protecting @ref packets */
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
@@ -127,16 +132,16 @@ static const struct option options[] = {
 };
 
 /** @brief Return true iff a < b in sequence-space arithmetic */
 };
 
 /** @brief Return true iff a < b in sequence-space arithmetic */
-static inline int lt(const struct packet *a, const struct packet *b) {
-  return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000;
+static inline int lt(uint32_t a, uint32_t b) {
+  return (uint32_t)(a - b) & 0x80000000;
 }
 
 }
 
-/** Background thread collecting samples
+/** @brief Background thread collecting samples
  *
  * This function collects samples, perhaps converts them to the target format,
  * and adds them to the packet list. */
 static void *listen_thread(void attribute((unused)) *arg) {
  *
  * This function collects samples, perhaps converts them to the target format,
  * and adds them to the packet list. */
 static void *listen_thread(void attribute((unused)) *arg) {
-  struct packet *f = 0, **ff;
+  struct packet *p = 0, **pp;
   int n;
   union {
     struct rtp_header header;
   int n;
   union {
     struct rtp_header header;
@@ -146,8 +151,8 @@ static void *listen_thread(void attribute((unused)) *arg) {
                                                + sizeof (struct rtp_header));
 
   for(;;) {
                                                + sizeof (struct rtp_header));
 
   for(;;) {
-    if(!f)
-      f = xmalloc(sizeof *f);
+    if(!p)
+      p = xmalloc(sizeof *p);
     n = read(rtpfd, packet.bytes, sizeof packet.bytes);
     if(n < 0) {
       switch(errno) {
     n = read(rtpfd, packet.bytes, sizeof packet.bytes);
     if(n < 0) {
       switch(errno) {
@@ -160,18 +165,23 @@ static void *listen_thread(void attribute((unused)) *arg) {
     /* Ignore too-short packets */
     if((size_t)n <= sizeof (struct rtp_header))
       continue;
     /* Ignore too-short packets */
     if((size_t)n <= sizeof (struct rtp_header))
       continue;
+    p->nused = 0;
+    p->timestamp = ntohl(packet.header.timestamp);
+    /* Ignore packets in the past */
+    if(active && lt(p->timestamp, next_timestamp))
+      continue;
     /* Convert to target format */
     switch(packet.header.mpt & 0x7F) {
     case 10:
     /* Convert to target format */
     switch(packet.header.mpt & 0x7F) {
     case 10:
-      f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
+      p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
       /* Convert to what Core Audio expects */
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
       /* Convert to what Core Audio expects */
-      for(n = 0; n < f->nsamples; ++n)
-        f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
+      for(n = 0; n < p->nsamples; ++n)
+        p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
 #else
       /* ALSA can do any necessary conversion itself (though it might be better
        * to do any necessary conversion in the background) */
 #else
       /* ALSA can do any necessary conversion itself (though it might be better
        * to do any necessary conversion in the background) */
-      memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header));
+      memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header));
 #endif
       break;
       /* TODO support other RFC3551 media types (when the speaker does) */
 #endif
       break;
       /* TODO support other RFC3551 media types (when the speaker does) */
@@ -179,8 +189,6 @@ static void *listen_thread(void attribute((unused)) *arg) {
       fatal(0, "unsupported RTP payload type %d",
             packet.header.mpt & 0x7F);
     }
       fatal(0, "unsupported RTP payload type %d",
             packet.header.mpt & 0x7F);
     }
-    f->nused = 0;
-    f->timestamp = ntohl(packet.header.timestamp);
     pthread_mutex_lock(&lock);
     /* Stop reading if we've reached the maximum.
      *
     pthread_mutex_lock(&lock);
     /* Stop reading if we've reached the maximum.
      *
@@ -188,25 +196,27 @@ static void *listen_thread(void attribute((unused)) *arg) {
      * out of order then we guarantee dropouts.  But for now... */
     while(nsamples >= MAXBUFFER)
       pthread_cond_wait(&cond, &lock);
      * out of order then we guarantee dropouts.  But for now... */
     while(nsamples >= MAXBUFFER)
       pthread_cond_wait(&cond, &lock);
-    for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next)
+    for(pp = &packets;
+        *pp && lt((*pp)->timestamp, p->timestamp);
+        pp = &(*pp)->next)
       ;
       ;
-    /* So now either !*ff or *ff >= f */
-    if(*ff && f->timestamp == (*ff)->timestamp) {
-      /* *ff == f; a duplicate.  Ideally we avoid the translation step here,
+    /* So now either !*pp or *pp >= p */
+    if(*pp && p->timestamp == (*pp)->timestamp) {
+      /* *pp == p; a duplicate.  Ideally we avoid the translation step here,
        * but we'll worry about that another time. */
        * but we'll worry about that another time. */
-      free(f);
     } else {
     } else {
-      f->next = *ff;
-      *ff = f;
-      nsamples += f->nsamples;
+      p->next = *pp;
+      *pp = p;
+      nsamples += p->nsamples;
       pthread_cond_broadcast(&cond);
       pthread_cond_broadcast(&cond);
+      p = 0;                            /* we've consumed this packet */
     }
     pthread_mutex_unlock(&lock);
     }
     pthread_mutex_unlock(&lock);
-    f = 0;
   }
 }
 
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
   }
 }
 
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
+/** @brief Callback from Core Audio */
 static OSStatus adioproc(AudioDeviceID inDevice,
                          const AudioTimeStamp *inNow,
                          const AudioBufferList *inInputData,
 static OSStatus adioproc(AudioDeviceID inDevice,
                          const AudioTimeStamp *inNow,
                          const AudioBufferList *inInputData,
@@ -227,9 +237,9 @@ static OSStatus adioproc(AudioDeviceID inDevice,
   while(packets && nbuffers > 0) {
     if(packets->used == packets->nsamples) {
       /* TODO if we dropped a packet then we should introduce a gap here */
   while(packets && nbuffers > 0) {
     if(packets->used == packets->nsamples) {
       /* TODO if we dropped a packet then we should introduce a gap here */
-      struct packet *const f = packets;
-      packets = f->next;
-      free(f);
+      struct packet *const p = packets;
+      packets = p->next;
+      free(p);
       pthread_cond_broadcast(&cond);
       continue;
     }
       pthread_cond_broadcast(&cond);
       continue;
     }
@@ -255,6 +265,15 @@ static OSStatus adioproc(AudioDeviceID inDevice,
 }
 #endif
 
 }
 #endif
 
+/** @brief Play an RTP stream
+ *
+ * This is the guts of the program.  It is responsible for:
+ * - starting the listening thread
+ * - opening the audio device
+ * - reading ahead to build up a buffer
+ * - arranging for audio to be played
+ * - detecting when the buffer has got too small and re-buffering
+ */
 static void play_rtp(void) {
   pthread_t ltid;
 
 static void play_rtp(void) {
   pthread_t ltid;
 
@@ -330,13 +349,16 @@ static void play_rtp(void) {
           fatal(0, "error calling snd_pcm_prepare: %d", err);
         prepared = 1;
       }
           fatal(0, "error calling snd_pcm_prepare: %d", err);
         prepared = 1;
       }
+      /* Start at the first available packet */
+      next_timestamp = packets->timestamp;
+      active = 1;
       /* Wait until the buffer empties out */
       while(nsamples >= MINBUFFER) {
         /* Wait for ALSA to ask us for more data */
         pthread_mutex_unlock(&lock);
         snd_pcm_wait(pcm, -1);
         pthread_mutex_lock(&lock);
       /* Wait until the buffer empties out */
       while(nsamples >= MINBUFFER) {
         /* Wait for ALSA to ask us for more data */
         pthread_mutex_unlock(&lock);
         snd_pcm_wait(pcm, -1);
         pthread_mutex_lock(&lock);
-        /* ALSA wants more data */
+        /* ALSA is ready for more data */
         if(packets && packets->timestamp + packets->nused == next_timestamp) {
           /* Hooray, we have a packet we can play */
           const size_t samples_available = packets->nsamples - packets->nused;
         if(packets && packets->timestamp + packets->nused == next_timestamp) {
           /* Hooray, we have a packet we can play */
           const size_t samples_available = packets->nsamples - packets->nused;
@@ -351,11 +373,12 @@ static void play_rtp(void) {
           packets->nused += samples_written;
           next_timestamp += samples_written;
           if(packets->nused == packets->nsamples) {
           packets->nused += samples_written;
           next_timestamp += samples_written;
           if(packets->nused == packets->nsamples) {
-            struct packet *f = packets;
+            /* We're done with this packet */
+            struct packet *p = packets;
 
 
-            packets = f->next;
-            nsamples -= f->nsamples;
-            free(f);
+            packets = p->next;
+            nsamples -= p->nsamples;
+            free(p);
             pthread_cond_broadcast(&cond);
           }
         } else {
             pthread_cond_broadcast(&cond);
           }
         } else {
@@ -373,6 +396,7 @@ static void play_rtp(void) {
           next_timestamp += samples_written;
         }
       }
           next_timestamp += samples_written;
         }
       }
+      active = 0;
       /* We stop playing for a bit until the buffer re-fills */
       pthread_mutex_unlock(&lock);
       if((err = snd_pcm_drain(pcm)))
       /* We stop playing for a bit until the buffer re-fills */
       pthread_mutex_unlock(&lock);
       if((err = snd_pcm_drain(pcm)))