chiark / gitweb /
missing samples_written assign
[disorder] / clients / playrtp.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2007 Richard Kettlewell
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA
19  */
20
21 #include <config.h>
22 #include "types.h"
23
24 #include <getopt.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <pthread.h>
32 #include <locale.h>
33
34 #include "log.h"
35 #include "mem.h"
36 #include "configuration.h"
37 #include "addr.h"
38 #include "syscalls.h"
39 #include "rtp.h"
40 #include "defs.h"
41
42 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
43 # include <CoreAudio/AudioHardware.h>
44 #endif
45 #if API_ALSA
46 #include <alsa/asoundlib.h>
47 #endif
48
49 #define readahead linux_headers_are_borked
50
51 /** @brief RTP socket */
52 static int rtpfd;
53
54 /** @brief Output device */
55 static const char *device;
56
57 /** @brief Maximum samples per packet we'll support
58  *
59  * NB that two channels = two samples in this program.
60  */
61 #define MAXSAMPLES 2048
62
63 /** @brief Minimum buffer size
64  *
65  * We'll stop playing if there's only this many samples in the buffer. */
66 static unsigned minbuffer = 2 * 44100 / 10;  /* 0.2 seconds */
67
68 /** @brief Maximum sample size
69  *
70  * The maximum supported size (in bytes) of one sample. */
71 #define MAXSAMPLESIZE 2
72
73 /** @brief Buffer size
74  *
75  * We'll only start playing when this many samples are available. */
76 static unsigned readahead = 4 * 2 * 44100; /* 4 seconds */
77
78 #define MAXBUFFER (3 * 88200)           /* maximum buffer contents */
79
80 /** @brief Received packet
81  *
82  * Packets are recorded in an ordered linked list. */
83 struct packet {
84   /** @brief Pointer to next packet
85    * The next packet might not be immediately next: if packets are dropped
86    * or mis-ordered there may be gaps at any given moment. */
87   struct packet *next;
88   /** @brief Number of samples in this packet */
89   int nsamples;
90   /** @brief Number of samples used from this packet */
91   int nused;
92   /** @brief Timestamp from RTP packet
93    *
94    * NB that "timestamps" are really sample counters.*/
95   uint32_t timestamp;
96 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
97   /** @brief Converted sample data */
98   float samples_float[MAXSAMPLES];
99 #else
100   /** @brief Raw sample data */
101   unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE];
102 #endif
103 };
104
105 /** @brief Total number of samples available */
106 static unsigned long nsamples;
107
108 /** @brief Linked list of packets
109  *
110  * In ascending order of timestamp. */
111 static struct packet *packets;
112
113 /** @brief Timestamp of next packet to play.
114  *
115  * This is set to the timestamp of the last packet, plus the number of
116  * samples it contained.  Only valid if @ref active is nonzero.
117  */
118 static uint32_t next_timestamp;
119
120 /** @brief True if actively playing
121  *
122  * This is true when playing and false when just buffering. */
123 static int active;
124
125 /** @brief Lock protecting @ref packets */
126 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
127
128 /** @brief Condition variable signalled whenever @ref packets is changed */
129 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
130
131 static const struct option options[] = {
132   { "help", no_argument, 0, 'h' },
133   { "version", no_argument, 0, 'V' },
134   { "debug", no_argument, 0, 'd' },
135   { "device", required_argument, 0, 'D' },
136   { "min", required_argument, 0, 'm' },
137   { "buffer", required_argument, 0, 'b' },
138   { 0, 0, 0, 0 }
139 };
140
141 /** @brief Return true iff a < b in sequence-space arithmetic */
142 static inline int lt(uint32_t a, uint32_t b) {
143   return (uint32_t)(a - b) & 0x80000000;
144 }
145
146 /** @brief Background thread collecting samples
147  *
148  * This function collects samples, perhaps converts them to the target format,
149  * and adds them to the packet list. */
150 static void *listen_thread(void attribute((unused)) *arg) {
151   struct packet *p = 0, **pp;
152   int n;
153   union {
154     struct rtp_header header;
155     uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)];
156   } packet;
157   const uint16_t *const samples = (uint16_t *)(packet.bytes
158                                                + sizeof (struct rtp_header));
159
160   for(;;) {
161     if(!p)
162       p = xmalloc(sizeof *p);
163     n = read(rtpfd, packet.bytes, sizeof packet.bytes);
164     if(n < 0) {
165       switch(errno) {
166       case EINTR:
167         continue;
168       default:
169         fatal(errno, "error reading from socket");
170       }
171     }
172     /* Ignore too-short packets */
173     if((size_t)n <= sizeof (struct rtp_header))
174       continue;
175     p->nused = 0;
176     p->timestamp = ntohl(packet.header.timestamp);
177     /* Ignore packets in the past */
178     if(active && lt(p->timestamp, next_timestamp))
179       continue;
180     /* Convert to target format */
181     switch(packet.header.mpt & 0x7F) {
182     case 10:
183       p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
184 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
185       /* Convert to what Core Audio expects */
186       for(n = 0; n < p->nsamples; ++n)
187         p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
188 #else
189       /* ALSA can do any necessary conversion itself (though it might be better
190        * to do any necessary conversion in the background) */
191       memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header));
192 #endif
193       break;
194       /* TODO support other RFC3551 media types (when the speaker does) */
195     default:
196       fatal(0, "unsupported RTP payload type %d",
197             packet.header.mpt & 0x7F);
198     }
199     pthread_mutex_lock(&lock);
200     /* Stop reading if we've reached the maximum.
201      *
202      * This is rather unsatisfactory: it means that if packets get heavily
203      * out of order then we guarantee dropouts.  But for now... */
204     while(nsamples >= MAXBUFFER)
205       pthread_cond_wait(&cond, &lock);
206     for(pp = &packets;
207         *pp && lt((*pp)->timestamp, p->timestamp);
208         pp = &(*pp)->next)
209       ;
210     /* So now either !*pp or *pp >= p */
211     if(*pp && p->timestamp == (*pp)->timestamp) {
212       /* *pp == p; a duplicate.  Ideally we avoid the translation step here,
213        * but we'll worry about that another time. */
214     } else {
215       p->next = *pp;
216       *pp = p;
217       nsamples += p->nsamples;
218       pthread_cond_broadcast(&cond);
219       p = 0;                            /* we've consumed this packet */
220     }
221     pthread_mutex_unlock(&lock);
222   }
223 }
224
225 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
226 /** @brief Callback from Core Audio */
227 static OSStatus adioproc(AudioDeviceID inDevice,
228                          const AudioTimeStamp *inNow,
229                          const AudioBufferList *inInputData,
230                          const AudioTimeStamp *inInputTime,
231                          AudioBufferList *outOutputData,
232                          const AudioTimeStamp *inOutputTime,
233                          void *inClientData) {
234   UInt32 nbuffers = outOutputData->mNumberBuffers;
235   AudioBuffer *ab = outOutputData->mBuffers;
236   float *samplesOut;                    /* where to write samples to */
237   size_t samplesOutLeft;                /* space left */
238   size_t samplesInLeft;
239   size_t samplesToCopy;
240
241   pthread_mutex_lock(&lock);
242   samplesOut = ab->data;
243   samplesOutLeft = ab->mDataByteSize / sizeof (float);
244   while(packets && nbuffers > 0) {
245     if(packets->used == packets->nsamples) {
246       /* TODO if we dropped a packet then we should introduce a gap here */
247       struct packet *const p = packets;
248       packets = p->next;
249       free(p);
250       pthread_cond_broadcast(&cond);
251       continue;
252     }
253     if(samplesOutLeft == 0) {
254       --nbuffers;
255       ++ab;
256       samplesOut = ab->data;
257       samplesOutLeft = ab->mDataByteSize / sizeof (float);
258       continue;
259     }
260     /* Now: (1) there is some data left to read
261      *      (2) there is some space to put it */
262     samplesInLeft = packets->nsamples - packets->used;
263     samplesToCopy = (samplesInLeft < samplesOutLeft
264                      ? samplesInLeft : samplesOutLeft);
265     memcpy(samplesOut, packet->samples + packets->used, samplesToCopy);
266     packets->used += samplesToCopy;
267     samplesOut += samplesToCopy;
268     samesOutLeft -= samplesToCopy;
269   }
270   pthread_mutex_unlock(&lock);
271   return 0;
272 }
273 #endif
274
275 /** @brief Play an RTP stream
276  *
277  * This is the guts of the program.  It is responsible for:
278  * - starting the listening thread
279  * - opening the audio device
280  * - reading ahead to build up a buffer
281  * - arranging for audio to be played
282  * - detecting when the buffer has got too small and re-buffering
283  */
284 static void play_rtp(void) {
285   pthread_t ltid;
286
287   /* We receive and convert audio data in a background thread */
288   pthread_create(&ltid, 0, listen_thread, 0);
289 #if API_ALSA
290   {
291     snd_pcm_t *pcm;
292     snd_pcm_hw_params_t *hwparams;
293     snd_pcm_sw_params_t *swparams;
294     /* Only support one format for now */
295     const int sample_format = SND_PCM_FORMAT_S16_BE;
296     unsigned rate = 44100;
297     const int channels = 2;
298     const int samplesize = channels * sizeof(uint16_t);
299     snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3;
300     /* If we can write more than this many samples we'll get a wakeup */
301     const int avail_min = 256;
302     snd_pcm_sframes_t frames_written;
303     size_t samples_written;
304     int prepared = 1;
305     int err;
306     int infilling = 0;
307
308     /* Open ALSA */
309     if((err = snd_pcm_open(&pcm,
310                            device ? device : "default",
311                            SND_PCM_STREAM_PLAYBACK,
312                            SND_PCM_NONBLOCK)))
313       fatal(0, "error from snd_pcm_open: %d", err);
314     /* Set up 'hardware' parameters */
315     snd_pcm_hw_params_alloca(&hwparams);
316     if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0)
317       fatal(0, "error from snd_pcm_hw_params_any: %d", err);
318     if((err = snd_pcm_hw_params_set_access(pcm, hwparams,
319                                            SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
320       fatal(0, "error from snd_pcm_hw_params_set_access: %d", err);
321     if((err = snd_pcm_hw_params_set_format(pcm, hwparams,
322                                            sample_format)) < 0)
323       fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d",
324             sample_format, err);
325     if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0)
326       fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d",
327             rate, err);
328     if((err = snd_pcm_hw_params_set_channels(pcm, hwparams,
329                                              channels)) < 0)
330       fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d",
331             channels, err);
332     if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams,
333                                                      &pcm_bufsize)) < 0)
334       fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d",
335             MAXSAMPLES * samplesize * 3, err);
336     if((err = snd_pcm_hw_params(pcm, hwparams)) < 0)
337       fatal(0, "error calling snd_pcm_hw_params: %d", err);
338     /* Set up 'software' parameters */
339     snd_pcm_sw_params_alloca(&swparams);
340     if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0)
341       fatal(0, "error calling snd_pcm_sw_params_current: %d", err);
342     if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0)
343       fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d",
344             avail_min, err);
345     if((err = snd_pcm_sw_params(pcm, swparams)) < 0)
346       fatal(0, "error calling snd_pcm_sw_params: %d", err);
347
348     /* Ready to go */
349
350     pthread_mutex_lock(&lock);
351     for(;;) {
352       /* Wait for the buffer to fill up a bit */
353       info("Buffering...");
354       while(nsamples < readahead)
355         pthread_cond_wait(&cond, &lock);
356       if(!prepared) {
357         if((err = snd_pcm_prepare(pcm)))
358           fatal(0, "error calling snd_pcm_prepare: %d", err);
359         prepared = 1;
360       }
361       /* Start at the first available packet */
362       next_timestamp = packets->timestamp;
363       active = 1;
364       infilling = 0;
365       info("Playing...");
366       /* Wait until the buffer empties out */
367       while(nsamples >= minbuffer) {
368         /* Wait for ALSA to ask us for more data */
369         pthread_mutex_unlock(&lock);
370         snd_pcm_wait(pcm, -1);
371         pthread_mutex_lock(&lock);
372         /* ALSA is ready for more data */
373         if(packets && packets->timestamp + packets->nused == next_timestamp) {
374           /* Hooray, we have a packet we can play */
375           const size_t samples_available = packets->nsamples - packets->nused;
376           const size_t frames_available = samples_available / 2;
377
378           frames_written = snd_pcm_writei(pcm,
379                                           packets->samples_raw + packets->nused,
380                                           frames_available);
381           if(frames_written < 0) {
382             if(frames_written != -EAGAIN)
383               fatal(0, "error calling snd_pcm_writei: %ld",
384                     (long)frames_written);
385           } else {
386             samples_written = frames_written * 2;
387             packets->nused += samples_written;
388             next_timestamp += samples_written;
389             if(packets->nused == packets->nsamples) {
390               /* We're done with this packet */
391               struct packet *p = packets;
392               
393               packets = p->next;
394               nsamples -= p->nsamples;
395               free(p);
396               pthread_cond_broadcast(&cond);
397             }
398             infilling = 0;
399           }
400         } else {
401           /* We don't have anything to play!  We'd better play some 0s. */
402           static const uint16_t zeros[1024];
403           size_t samples_available = 1024, frames_available;
404
405           if(!infilling) {
406             info("Infilling...");
407             infilling = 1;
408           }
409           if(packets && next_timestamp + samples_available > packets->timestamp)
410             samples_available = packets->timestamp - next_timestamp;
411           frames_available = samples_available / 2;
412           frames_written = snd_pcm_writei(pcm,
413                                           zeros,
414                                           frames_available);
415           if(frames_written < 0) {
416             if(frames_written != -EAGAIN)
417               fatal(0, "error calling snd_pcm_writei: %ld",
418                     (long)frames_written);
419           } else {
420             samples_written = frames_written * 2;
421             next_timestamp += samples_written;
422           }
423         }
424       }
425       active = 0;
426       /* We stop playing for a bit until the buffer re-fills */
427       pthread_mutex_unlock(&lock);
428       if((err = snd_pcm_nonblock(pcm, 0)))
429         fatal(0, "error calling snd_pcm_nonblock: %d", err);
430       if((err = snd_pcm_drain(pcm)))
431         fatal(0, "error calling snd_pcm_drain: %d", err);
432       if((err = snd_pcm_nonblock(pcm, 1)))
433         fatal(0, "error calling snd_pcm_nonblock: %d", err);
434       prepared = 0;
435       pthread_mutex_lock(&lock);
436     }
437
438   }
439 #elif HAVE_COREAUDIO_AUDIOHARDWARE_H
440   {
441     OSStatus status;
442     UInt32 propertySize;
443     AudioDeviceID adid;
444     AudioStreamBasicDescription asbd;
445
446     /* If this looks suspiciously like libao's macosx driver there's an
447      * excellent reason for that... */
448
449     /* TODO report errors as strings not numbers */
450     propertySize = sizeof adid;
451     status = AudioHardwareGetProperty(kAudioHardwarePropertyDefaultOutputDevice,
452                                       &propertySize, &adid);
453     if(status)
454       fatal(0, "AudioHardwareGetProperty: %d", (int)status);
455     if(adid == kAudioDeviceUnknown)
456       fatal(0, "no output device");
457     propertySize = sizeof asbd;
458     status = AudioDeviceGetProperty(adid, 0, false,
459                                     kAudioDevicePropertyStreamFormat,
460                                     &propertySize, &asbd);
461     if(status)
462       fatal(0, "AudioHardwareGetProperty: %d", (int)status);
463     D(("mSampleRate       %f", asbd.mSampleRate));
464     D(("mFormatID         %08"PRIx32, asbd.mFormatID));
465     D(("mFormatFlags      %08"PRIx32, asbd.mFormatFlags));
466     D(("mBytesPerPacket   %08"PRIx32, asbd.mBytesPerPacket));
467     D(("mFramesPerPacket  %08"PRIx32, asbd.mFramesPerPacket));
468     D(("mBytesPerFrame    %08"PRIx32, asbd.mBytesPerFrame));
469     D(("mChannelsPerFrame %08"PRIx32, asbd.mChannelsPerFrame));
470     D(("mBitsPerChannel   %08"PRIx32, asbd.mBitsPerChannel));
471     D(("mReserved         %08"PRIx32, asbd.mReserved));
472     if(asbd.mFormatID != kAudioFormatLinearPCM)
473       fatal(0, "audio device does not support kAudioFormatLinearPCM");
474     status = AudioDeviceAddIOProc(adid, adioproc, 0);
475     if(status)
476       fatal(0, "AudioDeviceAddIOProc: %d", (int)status);
477     pthread_mutex_lock(&lock);
478     for(;;) {
479       /* Wait for the buffer to fill up a bit */
480       while(nsamples < readahead)
481         pthread_cond_wait(&cond, &lock);
482       /* Start playing now */
483       status = AudioDeviceStart(adid, adioproc);
484       if(status)
485         fatal(0, "AudioDeviceStart: %d", (int)status);
486       /* Wait until the buffer empties out */
487       while(nsamples >= minbuffer)
488         pthread_cond_wait(&cond, &lock);
489       /* Stop playing for a bit until the buffer re-fills */
490       status = AudioDeviceStop(adid, adioproc);
491       if(status)
492         fatal(0, "AudioDeviceStop: %d", (int)status);
493       /* Go back round */
494     }
495   }
496 #else
497 # error No known audio API
498 #endif
499 }
500
501 /* display usage message and terminate */
502 static void help(void) {
503   xprintf("Usage:\n"
504           "  disorder-playrtp [OPTIONS] ADDRESS [PORT]\n"
505           "Options:\n"
506           "  --help, -h              Display usage message\n"
507           "  --version, -V           Display version number\n"
508           "  --debug, -d             Turn on debugging\n"
509           "  --device, -D DEVICE     Output device\n"
510           "  --min, -m FRAMES        Buffer low water mark\n"
511           "  --buffer, -b FRAMES     Buffer high water mark\n");
512   xfclose(stdout);
513   exit(0);
514 }
515
516 /* display version number and terminate */
517 static void version(void) {
518   xprintf("disorder-playrtp version %s\n", disorder_version_string);
519   xfclose(stdout);
520   exit(0);
521 }
522
523 int main(int argc, char **argv) {
524   int n;
525   struct addrinfo *res;
526   struct stringlist sl;
527   char *sockname;
528
529   static const struct addrinfo prefs = {
530     AI_PASSIVE,
531     PF_INET,
532     SOCK_DGRAM,
533     IPPROTO_UDP,
534     0,
535     0,
536     0,
537     0
538   };
539
540   mem_init();
541   if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
542   while((n = getopt_long(argc, argv, "hVdD:m:b:", options, 0)) >= 0) {
543     switch(n) {
544     case 'h': help();
545     case 'V': version();
546     case 'd': debugging = 1; break;
547     case 'D': device = optarg; break;
548     case 'm': minbuffer = 2 * atol(optarg); break;
549     case 'b': readahead = 2 * atol(optarg); break;
550     default: fatal(0, "invalid option");
551     }
552   }
553   argc -= optind;
554   argv += optind;
555   if(argc < 1 || argc > 2)
556     fatal(0, "usage: disorder-playrtp [OPTIONS] ADDRESS [PORT]");
557   sl.n = argc;
558   sl.s = argv;
559   /* Listen for inbound audio data */
560   if(!(res = get_address(&sl, &prefs, &sockname)))
561     exit(1);
562   if((rtpfd = socket(res->ai_family,
563                      res->ai_socktype,
564                      res->ai_protocol)) < 0)
565     fatal(errno, "error creating socket");
566   if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
567     fatal(errno, "error binding socket to %s", sockname);
568   play_rtp();
569   return 0;
570 }
571
572 /*
573 Local Variables:
574 c-basic-offset:2
575 comment-column:40
576 fill-column:79
577 indent-tabs-mode:nil
578 End:
579 */