chiark / gitweb /
sort of works
[disorder] / clients / playrtp.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2007 Richard Kettlewell
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA
19  */
20
21 #include <config.h>
22 #include "types.h"
23
24 #include <getopt.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <pthread.h>
32 #include <locale.h>
33
34 #include "log.h"
35 #include "mem.h"
36 #include "configuration.h"
37 #include "addr.h"
38 #include "syscalls.h"
39 #include "rtp.h"
40 #include "defs.h"
41
42 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
43 # include <CoreAudio/AudioHardware.h>
44 #endif
45 #if API_ALSA
46 #include <alsa/asoundlib.h>
47 #endif
48
49 /** @brief RTP socket */
50 static int rtpfd;
51
52 /** @brief Output device */
53 static const char *device;
54
55 /** @brief Maximum samples per packet we'll support
56  *
57  * NB that two channels = two samples in this program.
58  */
59 #define MAXSAMPLES 2048
60
61 /** @brief Minimum buffer size
62  *
63  * We'll stop playing if there's only this many samples in the buffer. */
64 #define MINBUFFER 8820
65
66 /** @brief Maximum sample size
67  *
68  * The maximum supported size (in bytes) of one sample. */
69 #define MAXSAMPLESIZE 2
70
71 #define READAHEAD 88200                 /* how far to read ahead */
72
73 #define MAXBUFFER (3 * 88200)           /* maximum buffer contents */
74
75 /** @brief Received packet
76  *
77  * Packets are recorded in an ordered linked list. */
78 struct packet {
79   /** @brief Pointer to next packet
80    * The next packet might not be immediately next: if packets are dropped
81    * or mis-ordered there may be gaps at any given moment. */
82   struct packet *next;
83   /** @brief Number of samples in this packet */
84   int nsamples;
85   /** @brief Number of samples used from this packet */
86   int nused;
87   /** @brief Timestamp from RTP packet
88    *
89    * NB that "timestamps" are really sample counters.*/
90   uint32_t timestamp;
91 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
92   /** @brief Converted sample data */
93   float samples_float[MAXSAMPLES];
94 #else
95   /** @brief Raw sample data */
96   unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE];
97 #endif
98 };
99
100 /** @brief Total number of samples available */
101 static unsigned long nsamples;
102
103 /** @brief Linked list of packets
104  *
105  * In ascending order of timestamp. */
106 static struct packet *packets;
107
108 /** @brief Timestamp of next packet to play.
109  *
110  * This is set to the timestamp of the last packet, plus the number of
111  * samples it contained.  Only valid if @ref active is nonzero.
112  */
113 static uint32_t next_timestamp;
114
115 /** @brief True if actively playing
116  *
117  * This is true when playing and false when just buffering. */
118 static int active;
119
120 /** @brief Lock protecting @ref packets */
121 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
122
123 /** @brief Condition variable signalled whenever @ref packets is changed */
124 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
125
126 static const struct option options[] = {
127   { "help", no_argument, 0, 'h' },
128   { "version", no_argument, 0, 'V' },
129   { "debug", no_argument, 0, 'd' },
130   { "device", required_argument, 0, 'D' },
131   { 0, 0, 0, 0 }
132 };
133
134 /** @brief Return true iff a < b in sequence-space arithmetic */
135 static inline int lt(uint32_t a, uint32_t b) {
136   return (uint32_t)(a - b) & 0x80000000;
137 }
138
139 /** @brief Background thread collecting samples
140  *
141  * This function collects samples, perhaps converts them to the target format,
142  * and adds them to the packet list. */
143 static void *listen_thread(void attribute((unused)) *arg) {
144   struct packet *p = 0, **pp;
145   int n;
146   union {
147     struct rtp_header header;
148     uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)];
149   } packet;
150   const uint16_t *const samples = (uint16_t *)(packet.bytes
151                                                + sizeof (struct rtp_header));
152
153   for(;;) {
154     if(!p)
155       p = xmalloc(sizeof *p);
156     n = read(rtpfd, packet.bytes, sizeof packet.bytes);
157     if(n < 0) {
158       switch(errno) {
159       case EINTR:
160         continue;
161       default:
162         fatal(errno, "error reading from socket");
163       }
164     }
165     /* Ignore too-short packets */
166     if((size_t)n <= sizeof (struct rtp_header))
167       continue;
168     p->nused = 0;
169     p->timestamp = ntohl(packet.header.timestamp);
170     /* Ignore packets in the past */
171     if(active && lt(p->timestamp, next_timestamp))
172       continue;
173     /* Convert to target format */
174     switch(packet.header.mpt & 0x7F) {
175     case 10:
176       p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
177 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
178       /* Convert to what Core Audio expects */
179       for(n = 0; n < p->nsamples; ++n)
180         p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
181 #else
182       /* ALSA can do any necessary conversion itself (though it might be better
183        * to do any necessary conversion in the background) */
184       memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header));
185 #endif
186       break;
187       /* TODO support other RFC3551 media types (when the speaker does) */
188     default:
189       fatal(0, "unsupported RTP payload type %d",
190             packet.header.mpt & 0x7F);
191     }
192     pthread_mutex_lock(&lock);
193     /* Stop reading if we've reached the maximum.
194      *
195      * This is rather unsatisfactory: it means that if packets get heavily
196      * out of order then we guarantee dropouts.  But for now... */
197     while(nsamples >= MAXBUFFER)
198       pthread_cond_wait(&cond, &lock);
199     for(pp = &packets;
200         *pp && lt((*pp)->timestamp, p->timestamp);
201         pp = &(*pp)->next)
202       ;
203     /* So now either !*pp or *pp >= p */
204     if(*pp && p->timestamp == (*pp)->timestamp) {
205       /* *pp == p; a duplicate.  Ideally we avoid the translation step here,
206        * but we'll worry about that another time. */
207     } else {
208       p->next = *pp;
209       *pp = p;
210       nsamples += p->nsamples;
211       pthread_cond_broadcast(&cond);
212       p = 0;                            /* we've consumed this packet */
213     }
214     pthread_mutex_unlock(&lock);
215   }
216 }
217
218 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
219 /** @brief Callback from Core Audio */
220 static OSStatus adioproc(AudioDeviceID inDevice,
221                          const AudioTimeStamp *inNow,
222                          const AudioBufferList *inInputData,
223                          const AudioTimeStamp *inInputTime,
224                          AudioBufferList *outOutputData,
225                          const AudioTimeStamp *inOutputTime,
226                          void *inClientData) {
227   UInt32 nbuffers = outOutputData->mNumberBuffers;
228   AudioBuffer *ab = outOutputData->mBuffers;
229   float *samplesOut;                    /* where to write samples to */
230   size_t samplesOutLeft;                /* space left */
231   size_t samplesInLeft;
232   size_t samplesToCopy;
233
234   pthread_mutex_lock(&lock);
235   samplesOut = ab->data;
236   samplesOutLeft = ab->mDataByteSize / sizeof (float);
237   while(packets && nbuffers > 0) {
238     if(packets->used == packets->nsamples) {
239       /* TODO if we dropped a packet then we should introduce a gap here */
240       struct packet *const p = packets;
241       packets = p->next;
242       free(p);
243       pthread_cond_broadcast(&cond);
244       continue;
245     }
246     if(samplesOutLeft == 0) {
247       --nbuffers;
248       ++ab;
249       samplesOut = ab->data;
250       samplesOutLeft = ab->mDataByteSize / sizeof (float);
251       continue;
252     }
253     /* Now: (1) there is some data left to read
254      *      (2) there is some space to put it */
255     samplesInLeft = packets->nsamples - packets->used;
256     samplesToCopy = (samplesInLeft < samplesOutLeft
257                      ? samplesInLeft : samplesOutLeft);
258     memcpy(samplesOut, packet->samples + packets->used, samplesToCopy);
259     packets->used += samplesToCopy;
260     samplesOut += samplesToCopy;
261     samesOutLeft -= samplesToCopy;
262   }
263   pthread_mutex_unlock(&lock);
264   return 0;
265 }
266 #endif
267
268 /** @brief Play an RTP stream
269  *
270  * This is the guts of the program.  It is responsible for:
271  * - starting the listening thread
272  * - opening the audio device
273  * - reading ahead to build up a buffer
274  * - arranging for audio to be played
275  * - detecting when the buffer has got too small and re-buffering
276  */
277 static void play_rtp(void) {
278   pthread_t ltid;
279
280   /* We receive and convert audio data in a background thread */
281   pthread_create(&ltid, 0, listen_thread, 0);
282 #if API_ALSA
283   {
284     snd_pcm_t *pcm;
285     snd_pcm_hw_params_t *hwparams;
286     snd_pcm_sw_params_t *swparams;
287     /* Only support one format for now */
288     const int sample_format = SND_PCM_FORMAT_S16_BE;
289     unsigned rate = 44100;
290     const int channels = 2;
291     const int samplesize = channels * sizeof(uint16_t);
292     snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3;
293     /* If we can write more than this many samples we'll get a wakeup */
294     const int avail_min = 256;
295     snd_pcm_sframes_t frames_written;
296     size_t samples_written;
297     int prepared = 1;
298     int err;
299     int infilling = 0;
300
301     /* Open ALSA */
302     if((err = snd_pcm_open(&pcm,
303                            device ? device : "default",
304                            SND_PCM_STREAM_PLAYBACK,
305                            SND_PCM_NONBLOCK)))
306       fatal(0, "error from snd_pcm_open: %d", err);
307     /* Set up 'hardware' parameters */
308     snd_pcm_hw_params_alloca(&hwparams);
309     if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0)
310       fatal(0, "error from snd_pcm_hw_params_any: %d", err);
311     if((err = snd_pcm_hw_params_set_access(pcm, hwparams,
312                                            SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
313       fatal(0, "error from snd_pcm_hw_params_set_access: %d", err);
314     if((err = snd_pcm_hw_params_set_format(pcm, hwparams,
315                                            sample_format)) < 0)
316       fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d",
317             sample_format, err);
318     if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0)
319       fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d",
320             rate, err);
321     if((err = snd_pcm_hw_params_set_channels(pcm, hwparams,
322                                              channels)) < 0)
323       fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d",
324             channels, err);
325     if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams,
326                                                      &pcm_bufsize)) < 0)
327       fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d",
328             MAXSAMPLES * samplesize * 3, err);
329     if((err = snd_pcm_hw_params(pcm, hwparams)) < 0)
330       fatal(0, "error calling snd_pcm_hw_params: %d", err);
331     /* Set up 'software' parameters */
332     snd_pcm_sw_params_alloca(&swparams);
333     if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0)
334       fatal(0, "error calling snd_pcm_sw_params_current: %d", err);
335     if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0)
336       fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d",
337             avail_min, err);
338     if((err = snd_pcm_sw_params(pcm, swparams)) < 0)
339       fatal(0, "error calling snd_pcm_sw_params: %d", err);
340
341     /* Ready to go */
342
343     pthread_mutex_lock(&lock);
344     for(;;) {
345       /* Wait for the buffer to fill up a bit */
346       info("Buffering...");
347       while(nsamples < READAHEAD)
348         pthread_cond_wait(&cond, &lock);
349       if(!prepared) {
350         if((err = snd_pcm_prepare(pcm)))
351           fatal(0, "error calling snd_pcm_prepare: %d", err);
352         prepared = 1;
353       }
354       /* Start at the first available packet */
355       next_timestamp = packets->timestamp;
356       active = 1;
357       infilling = 0;
358       info("Playing...");
359       /* Wait until the buffer empties out */
360       while(nsamples >= MINBUFFER) {
361         /* Wait for ALSA to ask us for more data */
362         pthread_mutex_unlock(&lock);
363         snd_pcm_wait(pcm, -1);
364         pthread_mutex_lock(&lock);
365         /* ALSA is ready for more data */
366         if(packets && packets->timestamp + packets->nused == next_timestamp) {
367           /* Hooray, we have a packet we can play */
368           const size_t samples_available = packets->nsamples - packets->nused;
369           const size_t frames_available = samples_available / 2;
370
371           frames_written = snd_pcm_writei(pcm,
372                                           packets->samples_raw + packets->nused,
373                                           frames_available);
374           if(frames_written < 0)
375             fatal(0, "error calling snd_pcm_writei: %ld",
376                   (long)frames_written);
377           samples_written = frames_written * 2;
378           packets->nused += samples_written;
379           next_timestamp += samples_written;
380           if(packets->nused == packets->nsamples) {
381             /* We're done with this packet */
382             struct packet *p = packets;
383
384             packets = p->next;
385             nsamples -= p->nsamples;
386             free(p);
387             pthread_cond_broadcast(&cond);
388           }
389           infilling = 0;
390         } else {
391           /* We don't have anything to play!  We'd better play some 0s. */
392           static const uint16_t zeros[1024];
393           size_t samples_available = 1024, frames_available;
394
395           if(!infilling) {
396             info("Infilling...");
397             infilling = 1;
398           }
399           if(packets && next_timestamp + samples_available > packets->timestamp)
400             samples_available = packets->timestamp - next_timestamp;
401           frames_available = samples_available / 2;
402           frames_written = snd_pcm_writei(pcm,
403                                           zeros,
404                                           frames_available);
405           if(frames_written < 0)
406             fatal(0, "error calling snd_pcm_writei: %ld",
407                   (long)frames_written);
408           next_timestamp += samples_written;
409         }
410       }
411       active = 0;
412       /* We stop playing for a bit until the buffer re-fills */
413       pthread_mutex_unlock(&lock);
414       if((err = snd_pcm_nonblock(pcm, 0)))
415         fatal(0, "error calling snd_pcm_nonblock: %d", err);
416       if((err = snd_pcm_drain(pcm)))
417         fatal(0, "error calling snd_pcm_drain: %d", err);
418       if((err = snd_pcm_nonblock(pcm, 1)))
419         fatal(0, "error calling snd_pcm_nonblock: %d", err);
420       prepared = 0;
421       pthread_mutex_lock(&lock);
422     }
423
424   }
425 #elif HAVE_COREAUDIO_AUDIOHARDWARE_H
426   {
427     OSStatus status;
428     UInt32 propertySize;
429     AudioDeviceID adid;
430     AudioStreamBasicDescription asbd;
431
432     /* If this looks suspiciously like libao's macosx driver there's an
433      * excellent reason for that... */
434
435     /* TODO report errors as strings not numbers */
436     propertySize = sizeof adid;
437     status = AudioHardwareGetProperty(kAudioHardwarePropertyDefaultOutputDevice,
438                                       &propertySize, &adid);
439     if(status)
440       fatal(0, "AudioHardwareGetProperty: %d", (int)status);
441     if(adid == kAudioDeviceUnknown)
442       fatal(0, "no output device");
443     propertySize = sizeof asbd;
444     status = AudioDeviceGetProperty(adid, 0, false,
445                                     kAudioDevicePropertyStreamFormat,
446                                     &propertySize, &asbd);
447     if(status)
448       fatal(0, "AudioHardwareGetProperty: %d", (int)status);
449     D(("mSampleRate       %f", asbd.mSampleRate));
450     D(("mFormatID         %08"PRIx32, asbd.mFormatID));
451     D(("mFormatFlags      %08"PRIx32, asbd.mFormatFlags));
452     D(("mBytesPerPacket   %08"PRIx32, asbd.mBytesPerPacket));
453     D(("mFramesPerPacket  %08"PRIx32, asbd.mFramesPerPacket));
454     D(("mBytesPerFrame    %08"PRIx32, asbd.mBytesPerFrame));
455     D(("mChannelsPerFrame %08"PRIx32, asbd.mChannelsPerFrame));
456     D(("mBitsPerChannel   %08"PRIx32, asbd.mBitsPerChannel));
457     D(("mReserved         %08"PRIx32, asbd.mReserved));
458     if(asbd.mFormatID != kAudioFormatLinearPCM)
459       fatal(0, "audio device does not support kAudioFormatLinearPCM");
460     status = AudioDeviceAddIOProc(adid, adioproc, 0);
461     if(status)
462       fatal(0, "AudioDeviceAddIOProc: %d", (int)status);
463     pthread_mutex_lock(&lock);
464     for(;;) {
465       /* Wait for the buffer to fill up a bit */
466       while(nsamples < READAHEAD)
467         pthread_cond_wait(&cond, &lock);
468       /* Start playing now */
469       status = AudioDeviceStart(adid, adioproc);
470       if(status)
471         fatal(0, "AudioDeviceStart: %d", (int)status);
472       /* Wait until the buffer empties out */
473       while(nsamples >= MINBUFFER)
474         pthread_cond_wait(&cond, &lock);
475       /* Stop playing for a bit until the buffer re-fills */
476       status = AudioDeviceStop(adid, adioproc);
477       if(status)
478         fatal(0, "AudioDeviceStop: %d", (int)status);
479       /* Go back round */
480     }
481   }
482 #else
483 # error No known audio API
484 #endif
485 }
486
487 /* display usage message and terminate */
488 static void help(void) {
489   xprintf("Usage:\n"
490           "  disorder-playrtp [OPTIONS] ADDRESS [PORT]\n"
491           "Options:\n"
492           "  --help, -h              Display usage message\n"
493           "  --version, -V           Display version number\n"
494           "  --debug, -d             Turn on debugging\n"
495           "  --device, -D DEVICE     Output device\n");
496   xfclose(stdout);
497   exit(0);
498 }
499
500 /* display version number and terminate */
501 static void version(void) {
502   xprintf("disorder-playrtp version %s\n", disorder_version_string);
503   xfclose(stdout);
504   exit(0);
505 }
506
507 int main(int argc, char **argv) {
508   int n;
509   struct addrinfo *res;
510   struct stringlist sl;
511   char *sockname;
512
513   static const struct addrinfo prefs = {
514     AI_PASSIVE,
515     PF_INET,
516     SOCK_DGRAM,
517     IPPROTO_UDP,
518     0,
519     0,
520     0,
521     0
522   };
523
524   mem_init();
525   if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
526   while((n = getopt_long(argc, argv, "hVdD", options, 0)) >= 0) {
527     switch(n) {
528     case 'h': help();
529     case 'V': version();
530     case 'd': debugging = 1; break;
531     case 'D': device = optarg; break;
532     default: fatal(0, "invalid option");
533     }
534   }
535   argc -= optind;
536   argv += optind;
537   if(argc < 1 || argc > 2)
538     fatal(0, "usage: disorder-playrtp [OPTIONS] ADDRESS [PORT]");
539   sl.n = argc;
540   sl.s = argv;
541   /* Listen for inbound audio data */
542   if(!(res = get_address(&sl, &prefs, &sockname)))
543     exit(1);
544   if((rtpfd = socket(res->ai_family,
545                      res->ai_socktype,
546                      res->ai_protocol)) < 0)
547     fatal(errno, "error creating socket");
548   if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
549     fatal(errno, "error binding socket to %s", sockname);
550   play_rtp();
551   return 0;
552 }
553
554 /*
555 Local Variables:
556 c-basic-offset:2
557 comment-column:40
558 fill-column:79
559 indent-tabs-mode:nil
560 End:
561 */