chiark / gitweb /
now builds on linux
authorrjk@greenend.org.uk <>
Sun, 16 Sep 2007 18:08:46 +0000 (19:08 +0100)
committerrjk@greenend.org.uk <>
Sun, 16 Sep 2007 18:08:46 +0000 (19:08 +0100)
.bzrignore
clients/Makefile.am
clients/playrtp.c
lib/rtp.h
server/speaker.c

index fb4ade0..71c11f7 100644 (file)
@@ -96,3 +96,4 @@ tests/testroot
 disorder.plist
 server/uk.org.greenend.rjk.disorder.plist
 doc/guts
+clients/disorder-playrtp
index b197b90..784ed95 100644 (file)
@@ -18,7 +18,7 @@
 # USA
 #
 
-bin_PROGRAMS=disorder disorderfm
+bin_PROGRAMS=disorder disorderfm disorder-playrtp
 noinst_PROGRAMS=test-eclient filename-bytes
 
 AM_CPPFLAGS=-I${top_srcdir}/lib -I../lib
@@ -34,6 +34,10 @@ disorderfm_SOURCES=disorderfm.c \
 disorderfm_LDADD=$(LIBOBJS) ../lib/libdisorder.a $(LIBGC) $(LIBICONV)
 disorderfm_DEPENDENCIES=$(LIBOBJS) ../lib/libdisorder.a
 
+disorder_playrtp_SOURCES=playrtp.c
+disorder_playrtp_LDADD=$(LIBOBJS) ../lib/libdisorder.a $(LIBASOUND)
+disorder_playrtp_DEPENDENCIES=$(LIBOBJS) ../lib/libdisorder.a
+
 filename_bytes_SOURCES=filename-bytes.c
 
 test_eclient_SOURCES=test-eclient.c \
index 7ec35d0..7f2b238 100644 (file)
@@ -29,6 +29,7 @@
 #include <sys/socket.h>
 #include <netdb.h>
 #include <pthread.h>
+#include <locale.h>
 
 #include "log.h"
 #include "mem.h"
 #include "addr.h"
 #include "syscalls.h"
 #include "rtp.h"
-#include "debug.h"
+#include "defs.h"
 
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
 # include <CoreAudio/AudioHardware.h>
 #endif
+#if API_ALSA
+#include <alsa/asoundlib.h>
+#endif
 
+/** @brief RTP socket */
 static int rtpfd;
 
-#define MAXSAMPLES 2048                 /* max samples/frame we'll support */
-/* NB two channels = two samples in this program! */
-#define MINBUFFER 8820                  /* when to stop playing */
+/** @brief Output device */
+static const char *device;
+
+/** @brief Maximum samples per packet we'll support
+ *
+ * NB that two channels = two samples in this program.
+ */
+#define MAXSAMPLES 2048
+
+/** @brief Minimum buffer size
+ *
+ * We'll stop playing if there's only this many samples in the buffer. */
+#define MINBUFFER 8820
+
+/** @brief Maximum sample size
+ *
+ * The maximum supported size (in bytes) of one sample. */
+#define MAXSAMPLESIZE 2
+
 #define READAHEAD 88200                 /* how far to read ahead */
+
 #define MAXBUFFER (3 * 88200)           /* maximum buffer contents */
 
-struct frame {
-  struct frame *next;                   /* another frame */
-  int nsamples;                         /* number of samples */
-  int nused;                            /* number of samples used so far */
-  uint32_t timestamp;                   /* timestamp from packet */
+/** @brief Received packet
+ *
+ * Packets are recorded in an ordered linked list. */
+struct packet {
+  /** @brief Pointer to next packet
+   * The next packet might not be immediately next: if packets are dropped
+   * or mis-ordered there may be gaps at any given moment. */
+  struct packet *next;
+  /** @brief Number of samples in this packet */
+  int nsamples;
+  /** @brief Number of samples used from this packet */
+  int nused;
+  /** @brief Timestamp from RTP packet
+   *
+   * NB that "timestamps" are really sample counters.*/
+  uint32_t timestamp;
 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
-  float samples[MAXSAMPLES];            /* converted sample data */
+  /** @brief Converted sample data */
+  float samples_float[MAXSAMPLES];
+#else
+  /** @brief Raw sample data */
+  unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE];
 #endif
 };
 
-static unsigned long nsamples;          /* total samples available */
+/** @brief Total number of samples available */
+static unsigned long nsamples;
+
+/** @brief Linked list of packets
+ *
+ * In ascending order of timestamp. */
+static struct packet *packets;
+
+/** @brief Timestamp of next packet to play.
+ *
+ * This is set to the timestamp of the last packet, plus the number of
+ * samples it contained.
+ */
+static uint32_t next_timestamp;
 
-static struct frame *frames;            /* received frames in ascending order
-                                         * of timestamp */
+/** @brief Lock protecting @ref packets */
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-/* lock protecting frame list */
 
-static pthread_cond_t cond = PTHREAD_CONDVAR_INITIALIZER;
-/* signalled whenever we add a new frame */
+/** @brief Condition variable signalled whenever @ref packets is changed */
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 
 static const struct option options[] = {
   { "help", no_argument, 0, 'h' },
   { "version", no_argument, 0, 'V' },
   { "debug", no_argument, 0, 'd' },
+  { "device", required_argument, 0, 'D' },
   { 0, 0, 0, 0 }
 };
 
-/* Return true iff a > b in sequence-space arithmetic */
-static inline int gt(const struct frame *a, const struct frame *b) {
-  return (uint32_t)(a->timestamp - b->timestamp) < 0x80000000;
+/** @brief Return true iff a < b in sequence-space arithmetic */
+static inline int lt(const struct packet *a, const struct packet *b) {
+  return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000;
 }
 
-/* Background thread that reads frames over the network and add them to the
- * list */
-static listen_thread(void attribute((unused)) *arg) {
-  struct frame *f = 0, **ff;
-  int n, i;
+/** Background thread collecting samples
+ *
+ * This function collects samples, perhaps converts them to the target format,
+ * and adds them to the packet list. */
+static void *listen_thread(void attribute((unused)) *arg) {
+  struct packet *f = 0, **ff;
+  int n;
   union {
     struct rtp_header header;
     uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)];
@@ -106,32 +157,50 @@ static listen_thread(void attribute((unused)) *arg) {
         fatal(errno, "error reading from socket");
       }
     }
-#if HAVE_COREAUDIO_AUDIOHARDWARE_H
+    /* Ignore too-short packets */
+    if((size_t)n <= sizeof (struct rtp_header))
+      continue;
     /* Convert to target format */
-    switch(packet.header.mtp & 0x7F) {
+    switch(packet.header.mpt & 0x7F) {
     case 10:
       f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
-      for(i = 0; i < f->nsamples; ++i)
-        f->samples[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767);
+#if HAVE_COREAUDIO_AUDIOHARDWARE_H
+      /* Convert to what Core Audio expects */
+      for(n = 0; n < f->nsamples; ++n)
+        f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
+#else
+      /* ALSA can do any necessary conversion itself (though it might be better
+       * to do any necessary conversion in the background) */
+      memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header));
+#endif
       break;
       /* TODO support other RFC3551 media types (when the speaker does) */
     default:
-      fatal(0, "unsupported RTP payload type %d", 
+      fatal(0, "unsupported RTP payload type %d",
             packet.header.mpt & 0x7F);
     }
-#endif
-    f->used = 0;
+    f->nused = 0;
     f->timestamp = ntohl(packet.header.timestamp);
     pthread_mutex_lock(&lock);
-    /* Stop reading if we've reached the maximum */
+    /* Stop reading if we've reached the maximum.
+     *
+     * This is rather unsatisfactory: it means that if packets get heavily
+     * out of order then we guarantee dropouts.  But for now... */
     while(nsamples >= MAXBUFFER)
       pthread_cond_wait(&cond, &lock);
-    for(ff = &frames; *ff && !gt(*ff, f); ff = &(*ff)->next)
+    for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next)
       ;
-    f->next = *ff;
-    *ff = f;
-    nsamples += f->nsamples;
-    pthread_cond_broadcast(&cond);
+    /* So now either !*ff or *ff >= f */
+    if(*ff && f->timestamp == (*ff)->timestamp) {
+      /* *ff == f; a duplicate.  Ideally we avoid the translation step here,
+       * but we'll worry about that another time. */
+      free(f);
+    } else {
+      f->next = *ff;
+      *ff = f;
+      nsamples += f->nsamples;
+      pthread_cond_broadcast(&cond);
+    }
     pthread_mutex_unlock(&lock);
     f = 0;
   }
@@ -152,14 +221,14 @@ static OSStatus adioproc(AudioDeviceID inDevice,
   size_t samplesInLeft;
   size_t samplesToCopy;
 
-  pthread_mutex_lock(&lock);  
+  pthread_mutex_lock(&lock);
   samplesOut = ab->data;
   samplesOutLeft = ab->mDataByteSize / sizeof (float);
-  while(frames && nbuffers > 0) {
-    if(frames->used == frames->nsamples) {
+  while(packets && nbuffers > 0) {
+    if(packets->used == packets->nsamples) {
       /* TODO if we dropped a packet then we should introduce a gap here */
-      struct frame *const f = frames;
-      frames = f->next;
+      struct packet *const f = packets;
+      packets = f->next;
       free(f);
       pthread_cond_broadcast(&cond);
       continue;
@@ -173,11 +242,11 @@ static OSStatus adioproc(AudioDeviceID inDevice,
     }
     /* Now: (1) there is some data left to read
      *      (2) there is some space to put it */
-    samplesInLeft = frames->nsamples - frames->used;
+    samplesInLeft = packets->nsamples - packets->used;
     samplesToCopy = (samplesInLeft < samplesOutLeft
                      ? samplesInLeft : samplesOutLeft);
-    memcpy(samplesOut, frame->samples + frames->used, samplesToCopy);
-    frames->used += samplesToCopy;
+    memcpy(samplesOut, packet->samples + packets->used, samplesToCopy);
+    packets->used += samplesToCopy;
     samplesOut += samplesToCopy;
     samesOutLeft -= samplesToCopy;
   }
@@ -186,13 +255,133 @@ static OSStatus adioproc(AudioDeviceID inDevice,
 }
 #endif
 
-void play_rtp(void) {
-  pthread_t lt;
+static void play_rtp(void) {
+  pthread_t ltid;
 
   /* We receive and convert audio data in a background thread */
-  pthread_create(&lt, 0, listen_thread, 0);
+  pthread_create(&ltid, 0, listen_thread, 0);
 #if API_ALSA
-  assert(!"implemented");
+  {
+    snd_pcm_t *pcm;
+    snd_pcm_hw_params_t *hwparams;
+    snd_pcm_sw_params_t *swparams;
+    /* Only support one format for now */
+    const int sample_format = SND_PCM_FORMAT_S16_BE;
+    unsigned rate = 44100;
+    const int channels = 2;
+    const int samplesize = channels * sizeof(uint16_t);
+    snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3;
+    /* If we can write more than this many samples we'll get a wakeup */
+    const int avail_min = 256;
+    snd_pcm_sframes_t frames_written;
+    size_t samples_written;
+    int prepared = 1;
+    int err;
+
+    /* Open ALSA */
+    if((err = snd_pcm_open(&pcm,
+                           device ? device : "default",
+                           SND_PCM_STREAM_PLAYBACK,
+                           SND_PCM_NONBLOCK)))
+      fatal(0, "error from snd_pcm_open: %d", err);
+    /* Set up 'hardware' parameters */
+    snd_pcm_hw_params_alloca(&hwparams);
+    if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_any: %d", err);
+    if((err = snd_pcm_hw_params_set_access(pcm, hwparams,
+                                           SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_set_access: %d", err);
+    if((err = snd_pcm_hw_params_set_format(pcm, hwparams,
+                                           sample_format)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d",
+            sample_format, err);
+    if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d",
+            rate, err);
+    if((err = snd_pcm_hw_params_set_channels(pcm, hwparams,
+                                             channels)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d",
+            channels, err);
+    if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams,
+                                                     &pcm_bufsize)) < 0)
+      fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d",
+            MAXSAMPLES * samplesize * 3, err);
+    if((err = snd_pcm_hw_params(pcm, hwparams)) < 0)
+      fatal(0, "error calling snd_pcm_hw_params: %d", err);
+    /* Set up 'software' parameters */
+    snd_pcm_sw_params_alloca(&swparams);
+    if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0)
+      fatal(0, "error calling snd_pcm_sw_params_current: %d", err);
+    if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0)
+      fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d",
+            avail_min, err);
+    if((err = snd_pcm_sw_params(pcm, swparams)) < 0)
+      fatal(0, "error calling snd_pcm_sw_params: %d", err);
+
+    /* Ready to go */
+
+    pthread_mutex_lock(&lock);
+    for(;;) {
+      /* Wait for the buffer to fill up a bit */
+      while(nsamples < READAHEAD)
+        pthread_cond_wait(&cond, &lock);
+      if(!prepared) {
+        if((err = snd_pcm_prepare(pcm)))
+          fatal(0, "error calling snd_pcm_prepare: %d", err);
+        prepared = 1;
+      }
+      /* Wait until the buffer empties out */
+      while(nsamples >= MINBUFFER) {
+        /* Wait for ALSA to ask us for more data */
+        pthread_mutex_unlock(&lock);
+        snd_pcm_wait(pcm, -1);
+        pthread_mutex_lock(&lock);
+        /* ALSA wants more data */
+        if(packets && packets->timestamp + packets->nused == next_timestamp) {
+          /* Hooray, we have a packet we can play */
+          const size_t samples_available = packets->nsamples - packets->nused;
+          const size_t frames_available = samples_available / 2;
+
+          frames_written = snd_pcm_writei(pcm,
+                                          packets->samples_raw + packets->nused,
+                                          frames_available);
+          if(frames_written < 0)
+            fatal(0, "error calling snd_pcm_writei: %d", err);
+          samples_written = frames_written * 2;
+          packets->nused += samples_written;
+          next_timestamp += samples_written;
+          if(packets->nused == packets->nsamples) {
+            struct packet *f = packets;
+
+            packets = f->next;
+            nsamples -= f->nsamples;
+            free(f);
+            pthread_cond_broadcast(&cond);
+          }
+        } else {
+          /* We don't have anything to play!  We'd better play some 0s. */
+          static const uint16_t zeros[1024];
+          size_t samples_available = 1024, frames_available;
+          if(packets && next_timestamp + samples_available > packets->timestamp)
+            samples_available = packets->timestamp - next_timestamp;
+          frames_available = samples_available / 2;
+          frames_written = snd_pcm_writei(pcm,
+                                          zeros,
+                                          frames_available);
+          if(frames_written < 0)
+            fatal(0, "error calling snd_pcm_writei: %d", err);
+          next_timestamp += samples_written;
+        }
+      }
+      /* We stop playing for a bit until the buffer re-fills */
+      pthread_mutex_unlock(&lock);
+      if((err = snd_pcm_drain(pcm)))
+        fatal(0, "error calling snd_pcm_drain: %d", err);
+      prepared = 0;
+      pthread_mutex_lock(&lock);
+    }
+
+  }
 #elif HAVE_COREAUDIO_AUDIOHARDWARE_H
   {
     OSStatus status;
@@ -262,7 +451,8 @@ static void help(void) {
          "Options:\n"
          "  --help, -h              Display usage message\n"
          "  --version, -V           Display version number\n"
-         "  --debug, -d             Turn on debugging\n");
+         "  --debug, -d             Turn on debugging\n"
+          "  --device, -D DEVICE     Output device\n");
   xfclose(stdout);
   exit(0);
 }
@@ -278,9 +468,9 @@ int main(int argc, char **argv) {
   int n;
   struct addrinfo *res;
   struct stringlist sl;
-  const char *sockname;
+  char *sockname;
 
-  static const struct addrinfo prefbind = {
+  static const struct addrinfo prefs = {
     AI_PASSIVE,
     PF_INET,
     SOCK_DGRAM,
@@ -293,11 +483,12 @@ int main(int argc, char **argv) {
 
   mem_init();
   if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
-  while((n = getopt_long(argc, argv, "hVd", options, 0)) >= 0) {
+  while((n = getopt_long(argc, argv, "hVdD", options, 0)) >= 0) {
     switch(n) {
     case 'h': help();
     case 'V': version();
     case 'd': debugging = 1; break;
+    case 'D': device = optarg; break;
     default: fatal(0, "invalid option");
     }
   }
@@ -308,7 +499,7 @@ int main(int argc, char **argv) {
   sl.n = argc;
   sl.s = argv;
   /* Listen for inbound audio data */
-  if(!(res = get_address(&sl, &pref, &sockname)))
+  if(!(res = get_address(&sl, &prefs, &sockname)))
     exit(1);
   if((rtpfd = socket(res->ai_family,
                      res->ai_socktype,
index 9159bfb..63ade9b 100644 (file)
--- a/lib/rtp.h
+++ b/lib/rtp.h
@@ -22,7 +22,7 @@
 #define RTP_H
 
 /* RTP is defined in RFC1889 */
-struct attribute((packed)) rtp {
+struct attribute((packed)) rtp_header {
   uint8_t vpxcc;
   uint8_t mpt;
   uint16_t seq;
index b3d62ab..4e79cdb 100644 (file)
@@ -555,7 +555,7 @@ static void fork_cmd(void) {
 static void play(size_t frames) {
   size_t avail_bytes, written_frames;
   ssize_t written_bytes;
-  struct rtp header;
+  struct rtp_header header;
   struct iovec vec[2];
 
   if(activate()) {