chiark / gitweb /
playrtp: support multiple unicast mode
[disorder] / clients / playrtp.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2007-2009, 2011, 2013 Richard Kettlewell
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * 
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 /** @file clients/playrtp.c
19  * @brief RTP player
20  *
21  * This player supports Linux (<a href="http://www.alsa-project.org/">ALSA</a>)
22  * and Apple Mac (<a
23  * href="http://developer.apple.com/audio/coreaudio.html">Core Audio</a>)
24  * systems.  There is no support for Microsoft Windows yet, and that will in
25  * fact probably an entirely separate program.
26  *
27  * The program runs (at least) three threads:
28  *
29  * listen_thread() is responsible for reading RTP packets off the wire and
30  * adding them to the linked list @ref received_packets, assuming they are
31  * basically sound.
32  *
33  * queue_thread() takes packets off this linked list and adds them to @ref
34  * packets (an operation which might be much slower due to contention for @ref
35  * lock).
36  *
37  * control_thread() accepts commands from Disobedience (or anything else).
38  *
39  * The main thread activates and deactivates audio playing via the @ref
40  * lib/uaudio.h API (which probably implies at least one further thread).
41  *
42  * Sometimes it happens that there is no audio available to play.  This may
43  * because the server went away, or a packet was dropped, or the server
44  * deliberately did not send any sound because it encountered a silence.
45  *
46  * Assumptions:
47  * - it is safe to read uint32_t values without a lock protecting them
48  */
49
50 #include "common.h"
51
52 #include <getopt.h>
53 #include <sys/socket.h>
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include <netdb.h>
57 #include <pthread.h>
58 #include <locale.h>
59 #include <sys/uio.h>
60 #include <errno.h>
61 #include <netinet/in.h>
62 #include <sys/time.h>
63 #include <sys/un.h>
64 #include <unistd.h>
65 #include <sys/mman.h>
66 #include <fcntl.h>
67 #include <math.h>
68 #include <arpa/inet.h>
69 #include <ifaddrs.h>
70 #include <net/if.h>
71
72 #include "log.h"
73 #include "mem.h"
74 #include "configuration.h"
75 #include "addr.h"
76 #include "syscalls.h"
77 #include "rtp.h"
78 #include "defs.h"
79 #include "vector.h"
80 #include "heap.h"
81 #include "timeval.h"
82 #include "client.h"
83 #include "playrtp.h"
84 #include "inputline.h"
85 #include "version.h"
86 #include "uaudio.h"
87
88 /** @brief Obsolete synonym */
89 #ifndef IPV6_JOIN_GROUP
90 # define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP
91 #endif
92
93 /** @brief RTP socket */
94 static int rtpfd;
95
96 /** @brief Log output */
97 static FILE *logfp;
98
99 /** @brief Output device */
100
101 /** @brief Buffer low watermark in samples */
102 unsigned minbuffer = 4 * (2 * 44100) / 10;  /* 0.4 seconds */
103
104 /** @brief Maximum buffer size in samples
105  *
106  * We'll stop reading from the network if we have this many samples.
107  */
108 static unsigned maxbuffer;
109
110 /** @brief Received packets
111  * Protected by @ref receive_lock
112  *
113  * Received packets are added to this list, and queue_thread() picks them off
114  * it and adds them to @ref packets.  Whenever a packet is added to it, @ref
115  * receive_cond is signalled.
116  */
117 struct packet *received_packets;
118
119 /** @brief Tail of @ref received_packets
120  * Protected by @ref receive_lock
121  */
122 struct packet **received_tail = &received_packets;
123
124 /** @brief Lock protecting @ref received_packets 
125  *
126  * Only listen_thread() and queue_thread() ever hold this lock.  It is vital
127  * that queue_thread() not hold it any longer than it strictly has to. */
128 pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER;
129
130 /** @brief Condition variable signalled when @ref received_packets is updated
131  *
132  * Used by listen_thread() to notify queue_thread() that it has added another
133  * packet to @ref received_packets. */
134 pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER;
135
136 /** @brief Length of @ref received_packets */
137 uint32_t nreceived;
138
139 /** @brief Binary heap of received packets */
140 struct pheap packets;
141
142 /** @brief Total number of samples available
143  *
144  * We make this volatile because we inspect it without a protecting lock,
145  * so the usual pthread_* guarantees aren't available.
146  */
147 volatile uint32_t nsamples;
148
149 /** @brief Timestamp of next packet to play.
150  *
151  * This is set to the timestamp of the last packet, plus the number of
152  * samples it contained.  Only valid if @ref active is nonzero.
153  */
154 uint32_t next_timestamp;
155
156 /** @brief True if actively playing
157  *
158  * This is true when playing and false when just buffering. */
159 int active;
160
161 /** @brief Lock protecting @ref packets */
162 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
163
164 /** @brief Condition variable signalled whenever @ref packets is changed */
165 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
166
167 /** @brief Backend to play with */
168 static const struct uaudio *backend;
169
170 HEAP_DEFINE(pheap, struct packet *, lt_packet);
171
172 /** @brief Control socket or NULL */
173 const char *control_socket;
174
175 /** @brief Buffer for debugging dump
176  *
177  * The debug dump is enabled by the @c --dump option.  It records the last 20s
178  * of audio to the specified file (which will be about 3.5Mbytes).  The file is
179  * written as as ring buffer, so the start point will progress through it.
180  *
181  * Use clients/dump2wav to convert this to a WAV file, which can then be loaded
182  * into (e.g.) Audacity for further inspection.
183  *
184  * All three backends (ALSA, OSS, Core Audio) now support this option.
185  *
186  * The idea is to allow the user a few seconds to react to an audible artefact.
187  */
188 int16_t *dump_buffer;
189
190 /** @brief Current index within debugging dump */
191 size_t dump_index;
192
193 /** @brief Size of debugging dump in samples */
194 size_t dump_size = 44100/*Hz*/ * 2/*channels*/ * 20/*seconds*/;
195
196 static const struct option options[] = {
197   { "help", no_argument, 0, 'h' },
198   { "version", no_argument, 0, 'V' },
199   { "debug", no_argument, 0, 'd' },
200   { "device", required_argument, 0, 'D' },
201   { "min", required_argument, 0, 'm' },
202   { "max", required_argument, 0, 'x' },
203   { "rcvbuf", required_argument, 0, 'R' },
204 #if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST
205   { "oss", no_argument, 0, 'o' },
206 #endif
207 #if HAVE_ALSA_ASOUNDLIB_H
208   { "alsa", no_argument, 0, 'a' },
209 #endif
210 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
211   { "core-audio", no_argument, 0, 'c' },
212 #endif
213   { "api", required_argument, 0, 'A' },
214   { "dump", required_argument, 0, 'r' },
215   { "command", required_argument, 0, 'e' },
216   { "pause-mode", required_argument, 0, 'P' },
217   { "socket", required_argument, 0, 's' },
218   { "config", required_argument, 0, 'C' },
219   { "monitor", no_argument, 0, 'M' },
220   { 0, 0, 0, 0 }
221 };
222
223 /** @brief Control thread
224  *
225  * This thread is responsible for accepting control commands from Disobedience
226  * (or other controllers) over an AF_UNIX stream socket with a path specified
227  * by the @c --socket option.  The protocol uses simple string commands and
228  * replies:
229  *
230  * - @c stop will shut the player down
231  * - @c query will send back the reply @c running
232  * - anything else is ignored
233  *
234  * Commands and response strings terminated by shutting down the connection or
235  * by a newline.  No attempt is made to multiplex multiple clients so it is
236  * important that the command be sent as soon as the connection is made - it is
237  * assumed that both parties to the protocol are entirely cooperating with one
238  * another.
239  */
240 static void *control_thread(void attribute((unused)) *arg) {
241   struct sockaddr_un sa;
242   int sfd, cfd;
243   char *line;
244   socklen_t salen;
245   FILE *fp;
246
247   assert(control_socket);
248   unlink(control_socket);
249   memset(&sa, 0, sizeof sa);
250   sa.sun_family = AF_UNIX;
251   strcpy(sa.sun_path, control_socket);
252   sfd = xsocket(PF_UNIX, SOCK_STREAM, 0);
253   if(bind(sfd, (const struct sockaddr *)&sa, sizeof sa) < 0)
254     disorder_fatal(errno, "error binding to %s", control_socket);
255   if(listen(sfd, 128) < 0)
256     disorder_fatal(errno, "error calling listen on %s", control_socket);
257   disorder_info("listening on %s", control_socket);
258   for(;;) {
259     salen = sizeof sa;
260     cfd = accept(sfd, (struct sockaddr *)&sa, &salen);
261     if(cfd < 0) {
262       switch(errno) {
263       case EINTR:
264       case EAGAIN:
265         break;
266       default:
267         disorder_fatal(errno, "error calling accept on %s", control_socket);
268       }
269     }
270     if(!(fp = fdopen(cfd, "r+"))) {
271       disorder_error(errno, "error calling fdopen for %s connection", control_socket);
272       close(cfd);
273       continue;
274     }
275     if(!inputline(control_socket, fp, &line, '\n')) {
276       if(!strcmp(line, "stop")) {
277         disorder_info("stopped via %s", control_socket);
278         exit(0);                          /* terminate immediately */
279       }
280       if(!strcmp(line, "query"))
281         fprintf(fp, "running");
282       xfree(line);
283     }
284     if(fclose(fp) < 0)
285       disorder_error(errno, "error closing %s connection", control_socket);
286   }
287 }
288
289 /** @brief Drop the first packet
290  *
291  * Assumes that @ref lock is held. 
292  */
293 static void drop_first_packet(void) {
294   if(pheap_count(&packets)) {
295     struct packet *const p = pheap_remove(&packets);
296     nsamples -= p->nsamples;
297     playrtp_free_packet(p);
298     pthread_cond_broadcast(&cond);
299   }
300 }
301
302 /** @brief Background thread adding packets to heap
303  *
304  * This just transfers packets from @ref received_packets to @ref packets.  It
305  * is important that it holds @ref receive_lock for as little time as possible,
306  * in order to minimize the interval between calls to read() in
307  * listen_thread().
308  */
309 static void *queue_thread(void attribute((unused)) *arg) {
310   struct packet *p;
311
312   for(;;) {
313     /* Get the next packet */
314     pthread_mutex_lock(&receive_lock);
315     while(!received_packets) {
316       pthread_cond_wait(&receive_cond, &receive_lock);
317     }
318     p = received_packets;
319     received_packets = p->next;
320     if(!received_packets)
321       received_tail = &received_packets;
322     --nreceived;
323     pthread_mutex_unlock(&receive_lock);
324     /* Add it to the heap */
325     pthread_mutex_lock(&lock);
326     pheap_insert(&packets, p);
327     nsamples += p->nsamples;
328     pthread_cond_broadcast(&cond);
329     pthread_mutex_unlock(&lock);
330   }
331 #if HAVE_STUPID_GCC44
332   return NULL;
333 #endif
334 }
335
336 /** @brief Background thread collecting samples
337  *
338  * This function collects samples, perhaps converts them to the target format,
339  * and adds them to the packet list.
340  *
341  * It is crucial that the gap between successive calls to read() is as small as
342  * possible: otherwise packets will be dropped.
343  *
344  * We use a binary heap to ensure that the unavoidable effort is at worst
345  * logarithmic in the total number of packets - in fact if packets are mostly
346  * received in order then we will largely do constant work per packet since the
347  * newest packet will always be last.
348  *
349  * Of more concern is that we must acquire the lock on the heap to add a packet
350  * to it.  If this proves a problem in practice then the answer would be
351  * (probably doubly) linked list with new packets added the end and a second
352  * thread which reads packets off the list and adds them to the heap.
353  *
354  * We keep memory allocation (mostly) very fast by keeping pre-allocated
355  * packets around; see @ref playrtp_new_packet().
356  */
357 static void *listen_thread(void attribute((unused)) *arg) {
358   struct packet *p = 0;
359   int n;
360   struct rtp_header header;
361   uint16_t seq;
362   uint32_t timestamp;
363   struct iovec iov[2];
364
365   for(;;) {
366     if(!p)
367       p = playrtp_new_packet();
368     iov[0].iov_base = &header;
369     iov[0].iov_len = sizeof header;
370     iov[1].iov_base = p->samples_raw;
371     iov[1].iov_len = sizeof p->samples_raw / sizeof *p->samples_raw;
372     n = readv(rtpfd, iov, 2);
373     if(n < 0) {
374       switch(errno) {
375       case EINTR:
376         continue;
377       default:
378         disorder_fatal(errno, "error reading from socket");
379       }
380     }
381     /* Ignore too-short packets */
382     if((size_t)n <= sizeof (struct rtp_header)) {
383       disorder_info("ignored a short packet");
384       continue;
385     }
386     timestamp = htonl(header.timestamp);
387     seq = htons(header.seq);
388     /* Ignore packets in the past */
389     if(active && lt(timestamp, next_timestamp)) {
390       disorder_info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32,
391            timestamp, next_timestamp);
392       continue;
393     }
394     /* Ignore packets with the extension bit set. */
395     if(header.vpxcc & 0x10)
396       continue;
397     p->next = 0;
398     p->flags = 0;
399     p->timestamp = timestamp;
400     /* Convert to target format */
401     if(header.mpt & 0x80)
402       p->flags |= IDLE;
403     switch(header.mpt & 0x7F) {
404     case 10:                            /* L16 */
405       p->nsamples = (n - sizeof header) / sizeof(uint16_t);
406       break;
407       /* TODO support other RFC3551 media types (when the speaker does) */
408     default:
409       disorder_fatal(0, "unsupported RTP payload type %d", header.mpt & 0x7F);
410     }
411     /* See if packet is silent */
412     const uint16_t *s = p->samples_raw;
413     n = p->nsamples;
414     for(; n > 0; --n)
415       if(*s++)
416         break;
417     if(!n)
418       p->flags |= SILENT;
419     if(logfp)
420       fprintf(logfp, "sequence %u timestamp %"PRIx32" length %"PRIx32" end %"PRIx32"\n",
421               seq, timestamp, p->nsamples, timestamp + p->nsamples);
422     /* Stop reading if we've reached the maximum.
423      *
424      * This is rather unsatisfactory: it means that if packets get heavily
425      * out of order then we guarantee dropouts.  But for now... */
426     if(nsamples >= maxbuffer) {
427       pthread_mutex_lock(&lock);
428       while(nsamples >= maxbuffer) {
429         pthread_cond_wait(&cond, &lock);
430       }
431       pthread_mutex_unlock(&lock);
432     }
433     /* Add the packet to the receive queue */
434     pthread_mutex_lock(&receive_lock);
435     *received_tail = p;
436     received_tail = &p->next;
437     ++nreceived;
438     pthread_cond_signal(&receive_cond);
439     pthread_mutex_unlock(&receive_lock);
440     /* We'll need a new packet */
441     p = 0;
442   }
443 }
444
445 /** @brief Wait until the buffer is adequately full
446  *
447  * Must be called with @ref lock held.
448  */
449 void playrtp_fill_buffer(void) {
450   /* Discard current buffer contents */
451   while(nsamples) {
452     //fprintf(stderr, "%8u/%u (%u) DROPPING\n", nsamples, maxbuffer, minbuffer);
453     drop_first_packet();
454   }
455   disorder_info("Buffering...");
456   /* Wait until there's at least minbuffer samples available */
457   while(nsamples < minbuffer) {
458     //fprintf(stderr, "%8u/%u (%u) FILLING\n", nsamples, maxbuffer, minbuffer);
459     pthread_cond_wait(&cond, &lock);
460   }
461   /* Start from whatever is earliest */
462   next_timestamp = pheap_first(&packets)->timestamp;
463   active = 1;
464 }
465
466 /** @brief Find next packet
467  * @return Packet to play or NULL if none found
468  *
469  * The return packet is merely guaranteed not to be in the past: it might be
470  * the first packet in the future rather than one that is actually suitable to
471  * play.
472  *
473  * Must be called with @ref lock held.
474  */
475 struct packet *playrtp_next_packet(void) {
476   while(pheap_count(&packets)) {
477     struct packet *const p = pheap_first(&packets);
478     if(le(p->timestamp + p->nsamples, next_timestamp)) {
479       /* This packet is in the past.  Drop it and try another one. */
480       drop_first_packet();
481     } else
482       /* This packet is NOT in the past.  (It might be in the future
483        * however.) */
484       return p;
485   }
486   return 0;
487 }
488
489 /* display usage message and terminate */
490 static void help(void) {
491   xprintf("Usage:\n"
492           "  disorder-playrtp [OPTIONS] [[ADDRESS] PORT]\n"
493           "Options:\n"
494           "  --device, -D DEVICE     Output device\n"
495           "  --min, -m FRAMES        Buffer low water mark\n"
496           "  --max, -x FRAMES        Buffer maximum size\n"
497           "  --rcvbuf, -R BYTES      Socket receive buffer size\n"
498           "  --config, -C PATH       Set configuration file\n"
499           "  --api, -A API           Select audio API.  Possibilities:\n"
500           "                            ");
501   int first = 1;
502   for(int n = 0; uaudio_apis[n]; ++n) {
503     if(uaudio_apis[n]->flags & UAUDIO_API_CLIENT) {
504       if(first)
505         first = 0;
506       else
507         xprintf(", ");
508       xprintf("%s", uaudio_apis[n]->name);
509     }
510   }
511   xprintf("\n"
512           "  --command, -e COMMAND   Pipe audio to command.\n"
513           "  --pause-mode, -P silence  For -e: pauses send silence (default)\n"
514           "  --pause-mode, -P suspend  For -e: pauses suspend writes\n"
515           "  --help, -h              Display usage message\n"
516           "  --version, -V           Display version number\n"
517           );
518   xfclose(stdout);
519   exit(0);
520 }
521
522 static size_t playrtp_callback(void *buffer,
523                                size_t max_samples,
524                                void attribute((unused)) *userdata) {
525   size_t samples;
526   int silent = 0;
527
528   pthread_mutex_lock(&lock);
529   /* Get the next packet, junking any that are now in the past */
530   const struct packet *p = playrtp_next_packet();
531   if(p && contains(p, next_timestamp)) {
532     /* This packet is ready to play; the desired next timestamp points
533      * somewhere into it. */
534
535     /* Timestamp of end of packet */
536     const uint32_t packet_end = p->timestamp + p->nsamples;
537
538     /* Offset of desired next timestamp into current packet */
539     const uint32_t offset = next_timestamp - p->timestamp;
540
541     /* Pointer to audio data */
542     const uint16_t *ptr = (void *)(p->samples_raw + offset);
543
544     /* Compute number of samples left in packet, limited to output buffer
545      * size */
546     samples = packet_end - next_timestamp;
547     if(samples > max_samples)
548       samples = max_samples;
549
550     /* Copy into buffer, converting to native endianness */
551     size_t i = samples;
552     int16_t *bufptr = buffer;
553     while(i > 0) {
554       *bufptr++ = (int16_t)ntohs(*ptr++);
555       --i;
556     }
557     silent = !!(p->flags & SILENT);
558   } else {
559     /* There is no suitable packet.  We introduce 0s up to the next packet, or
560      * to fill the buffer if there's no next packet or that's too many.  The
561      * comparison with max_samples deals with the otherwise troubling overflow
562      * case. */
563     samples = p ? p->timestamp - next_timestamp : max_samples;
564     if(samples > max_samples)
565       samples = max_samples;
566     //info("infill by %zu", samples);
567     memset(buffer, 0, samples * uaudio_sample_size);
568     silent = 1;
569   }
570   /* Debug dump */
571   if(dump_buffer) {
572     for(size_t i = 0; i < samples; ++i) {
573       dump_buffer[dump_index++] = ((int16_t *)buffer)[i];
574       dump_index %= dump_size;
575     }
576   }
577   /* Advance timestamp */
578   next_timestamp += samples;
579   /* If we're getting behind then try to drop just silent packets
580    *
581    * In theory this shouldn't be necessary.  The server is supposed to send
582    * packets at the right rate and compares the number of samples sent with the
583    * time in order to ensure this.
584    *
585    * However, various things could throw this off:
586    *
587    * - the server's clock could advance at the wrong rate.  This would cause it
588    *   to mis-estimate the right number of samples to have sent and
589    *   inappropriately throttle or speed up.
590    *
591    * - playback could happen at the wrong rate.  If the playback host's sound
592    *   card has a slightly incorrect clock then eventually it will get out
593    *   of step.
594    *
595    * So if we play back slightly slower than the server sends for either of
596    * these reasons then eventually our buffer, and the socket's buffer, will
597    * fill, and the kernel will start dropping packets.  The result is audible
598    * and not very nice.
599    *
600    * Therefore if we're getting behind, we pre-emptively drop silent packets,
601    * since a change in the duration of a silence is less noticeable than a
602    * dropped packet from the middle of continuous music.
603    *
604    * (If things go wrong the other way then eventually we run out of packets to
605    * play and are forced to play silence.  This doesn't seem to happen in
606    * practice but if it does then in the same way we can artificially extend
607    * silent packets to compensate.)
608    *
609    * Dropped packets are always logged; use 'disorder-playrtp --monitor' to
610    * track how close to target buffer occupancy we are on a once-a-minute
611    * basis.
612    */
613   if(nsamples > minbuffer && silent) {
614     disorder_info("dropping %zu samples (%"PRIu32" > %"PRIu32")",
615                   samples, nsamples, minbuffer);
616     samples = 0;
617   }
618   /* Junk obsolete packets */
619   playrtp_next_packet();
620   pthread_mutex_unlock(&lock);
621   return samples;
622 }
623
624 static int compare_family(const struct ifaddrs *a,
625                           const struct ifaddrs *b,
626                           int family) {
627   int afamily = a->ifa_addr->sa_family;
628   int bfamily = b->ifa_addr->sa_family;
629   if(afamily != bfamily) {
630     /* Preferred family wins */
631     if(afamily == family) return 1;
632     if(bfamily == family) return -1;
633     /* Either there's no preference or it doesn't help.  Prefer IPv4 */
634     if(afamily == AF_INET) return 1;
635     if(bfamily == AF_INET) return -1;
636     /* Failing that prefer IPv6 */
637     if(afamily == AF_INET6) return 1;
638     if(bfamily == AF_INET6) return -1;
639   }
640   return 0;
641 }
642
643 static int compare_flags(const struct ifaddrs *a,
644                          const struct ifaddrs *b) {
645   unsigned aflags = a->ifa_flags, bflags = b->ifa_flags;
646   /* Up interfaces are better than down ones */
647   unsigned aup = aflags & IFF_UP, bup = bflags & IFF_UP;
648   if(aup != bup)
649     return aup > bup ? 1 : -1;
650   /* Static addresses are better than dynamic */
651   unsigned adynamic = aflags & IFF_DYNAMIC, bdynamic = bflags & IFF_DYNAMIC;
652   if(adynamic != bdynamic)
653     return adynamic < bdynamic ? 1 : -1;
654   unsigned aloopback = aflags & IFF_LOOPBACK, bloopback = bflags & IFF_LOOPBACK;
655   /* Static addresses are better than dynamic */
656   if(aloopback != bloopback)
657     return aloopback < bloopback ? 1 : -1;
658   return 0;
659 }
660
661 static int compare_interfaces(const struct ifaddrs *a,
662                               const struct ifaddrs *b,
663                               int family) {
664   int c;
665   if((c = compare_family(a, b, family))) return c;
666   return compare_flags(a, b);
667 }
668
669 int main(int argc, char **argv) {
670   int n, err;
671   struct addrinfo *res;
672   struct stringlist sl;
673   char *sockname;
674   int rcvbuf, target_rcvbuf = 0;
675   socklen_t len;
676   struct ip_mreq mreq;
677   struct ipv6_mreq mreq6;
678   disorder_client *c = NULL;
679   char *address, *port;
680   int is_multicast;
681   union any_sockaddr {
682     struct sockaddr sa;
683     struct sockaddr_in in;
684     struct sockaddr_in6 in6;
685   };
686   union any_sockaddr mgroup;
687   const char *dumpfile = 0;
688   pthread_t ltid;
689   int monitor = 0;
690   static const int one = 1;
691
692   static const struct addrinfo prefs = {
693     .ai_flags = AI_PASSIVE,
694     .ai_family = PF_INET,
695     .ai_socktype = SOCK_DGRAM,
696     .ai_protocol = IPPROTO_UDP
697   };
698
699   /* Timing information is often important to debugging playrtp, so we include
700    * timestamps in the logs */
701   logdate = 1;
702   mem_init();
703   if(!setlocale(LC_CTYPE, "")) disorder_fatal(errno, "error calling setlocale");
704   while((n = getopt_long(argc, argv, "hVdD:m:x:L:R:aocC:re:P:MA:", options, 0)) >= 0) {
705     switch(n) {
706     case 'h': help();
707     case 'V': version("disorder-playrtp");
708     case 'd': debugging = 1; break;
709     case 'D': uaudio_set("device", optarg); break;
710     case 'm': minbuffer = 2 * atol(optarg); break;
711     case 'x': maxbuffer = 2 * atol(optarg); break;
712     case 'L': logfp = fopen(optarg, "w"); break;
713     case 'R': target_rcvbuf = atoi(optarg); break;
714 #if HAVE_ALSA_ASOUNDLIB_H
715     case 'a':
716       disorder_error(0, "deprecated option; use --api alsa instead");
717       backend = &uaudio_alsa; break;
718 #endif
719 #if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST
720     case 'o':
721       disorder_error(0, "deprecated option; use --api oss instead");
722       backend = &uaudio_oss; 
723       break;
724 #endif
725 #if HAVE_COREAUDIO_AUDIOHARDWARE_H      
726     case 'c':
727       disorder_error(0, "deprecated option; use --api coreaudio instead");
728       backend = &uaudio_coreaudio;
729       break;
730 #endif
731     case 'A': backend = uaudio_find(optarg); break;
732     case 'C': configfile = optarg; break;
733     case 's': control_socket = optarg; break;
734     case 'r': dumpfile = optarg; break;
735     case 'e': backend = &uaudio_command; uaudio_set("command", optarg); break;
736     case 'P': uaudio_set("pause-mode", optarg); break;
737     case 'M': monitor = 1; break;
738     default: disorder_fatal(0, "invalid option");
739     }
740   }
741   if(config_read(0, NULL)) disorder_fatal(0, "cannot read configuration");
742   if(!backend) {
743     backend = uaudio_default(uaudio_apis, UAUDIO_API_CLIENT);
744     if(!backend)
745       disorder_fatal(0, "no default uaudio API found");
746     disorder_info("default audio API %s", backend->name);
747   }
748   if(backend == &uaudio_rtp) {
749     /* This means that you have NO local sound output.  This can happen if you
750      * use a non-Apple GCC on a Mac (because it doesn't know how to compile
751      * CoreAudio/AudioHardware.h). */
752     disorder_fatal(0, "cannot play RTP through RTP");
753   }
754   if(!maxbuffer)
755     maxbuffer = 2 * minbuffer;
756   argc -= optind;
757   argv += optind;
758   switch(argc) {
759   case 0:
760     /* Get configuration from server */
761     if(!(c = disorder_new(1))) exit(EXIT_FAILURE);
762     if(disorder_connect(c)) exit(EXIT_FAILURE);
763     if(disorder_rtp_address(c, &address, &port)) exit(EXIT_FAILURE);
764     sl.n = 2;
765     sl.s = xcalloc(2, sizeof *sl.s);
766     sl.s[0] = address;
767     sl.s[1] = port;
768     break;
769   case 1:
770   case 2:
771     /* Use command-line ADDRESS+PORT or just PORT */
772     sl.n = argc;
773     sl.s = argv;
774     break;
775   default:
776     disorder_fatal(0, "usage: disorder-playrtp [OPTIONS] [[ADDRESS] PORT]");
777   }
778   disorder_info("version "VERSION" process ID %lu",
779                 (unsigned long)getpid());
780   struct sockaddr *addr;
781   socklen_t addr_len;
782   if(!strcmp(sl.s[0], "-")) {
783     /* Pick address family to match known-working connectivity to the server */
784     int family = disorder_client_af(c);
785     /* Get a list of interfaces */
786     struct ifaddrs *ifa, *bestifa = NULL;
787     if(getifaddrs(&ifa) < 0)
788       disorder_fatal(errno, "error calling getifaddrs");
789     /* Try to pick a good one */
790     for(; ifa; ifa = ifa->ifa_next) {
791       if(bestifa == NULL
792          || compare_interfaces(ifa, bestifa, family) > 0)
793         bestifa = ifa;
794     }
795     if(!bestifa)
796       disorder_fatal(0, "failed to select a network interface");
797     family = bestifa->ifa_addr->sa_family;
798     if((rtpfd = socket(family,
799                        SOCK_DGRAM,
800                        IPPROTO_UDP)) < 0)
801       disorder_fatal(errno, "error creating socket (family %d)", family);
802     /* Bind the address */
803     if(bind(rtpfd, bestifa->ifa_addr,
804             family == AF_INET
805             ? sizeof (struct sockaddr_in) : sizeof (struct sockaddr_in6)) < 0)
806       disorder_fatal(errno, "error binding socket");
807     static struct sockaddr_storage bound_address;
808     addr = (struct sockaddr *)&bound_address;
809     addr_len = sizeof bound_address;
810     if(getsockname(rtpfd, addr, &addr_len) < 0)
811       disorder_fatal(errno, "error getting socket address");
812     /* Convert to string */
813     char addrname[128], portname[32];
814     if(getnameinfo(addr, addr_len,
815                    addrname, sizeof addrname,
816                    portname, sizeof portname,
817                    NI_NUMERICHOST|NI_NUMERICSERV) < 0)
818       disorder_fatal(errno, "getnameinfo");
819     /* Ask for audio data */
820     if(disorder_rtp_request(c, addrname, portname)) exit(EXIT_FAILURE);
821     /* Report what we did */
822     disorder_info("listening on %s", format_sockaddr(addr));
823   } else {
824     /* Look up address and port */
825     if(!(res = get_address(&sl, &prefs, &sockname)))
826       exit(1);
827     addr = res->ai_addr;
828     addr_len = res->ai_addrlen;
829     /* Create the socket */
830     if((rtpfd = socket(res->ai_family,
831                        res->ai_socktype,
832                        res->ai_protocol)) < 0)
833       disorder_fatal(errno, "error creating socket");
834     /* Allow multiple listeners */
835     xsetsockopt(rtpfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
836     is_multicast = multicast(addr);
837     /* The multicast and unicast/broadcast cases are different enough that they
838      * are totally split.  Trying to find commonality between them causes more
839      * trouble that it's worth. */
840     if(is_multicast) {
841       /* Stash the multicast group address */
842       memcpy(&mgroup, addr, addr_len);
843       switch(res->ai_addr->sa_family) {
844       case AF_INET:
845         mgroup.in.sin_port = 0;
846         break;
847       case AF_INET6:
848         mgroup.in6.sin6_port = 0;
849         break;
850       default:
851         disorder_fatal(0, "unsupported address family %d",
852                        (int)addr->sa_family);
853       }
854       /* Bind to to the multicast group address */
855       if(bind(rtpfd, addr, addr_len) < 0)
856         disorder_fatal(errno, "error binding socket to %s",
857                        format_sockaddr(addr));
858       /* Add multicast group membership */
859       switch(mgroup.sa.sa_family) {
860       case PF_INET:
861         mreq.imr_multiaddr = mgroup.in.sin_addr;
862         mreq.imr_interface.s_addr = 0;      /* use primary interface */
863         if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
864                       &mreq, sizeof mreq) < 0)
865           disorder_fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP");
866         break;
867       case PF_INET6:
868         mreq6.ipv6mr_multiaddr = mgroup.in6.sin6_addr;
869         memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface);
870         if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
871                       &mreq6, sizeof mreq6) < 0)
872           disorder_fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP");
873         break;
874       default:
875         disorder_fatal(0, "unsupported address family %d", res->ai_family);
876       }
877       /* Report what we did */
878       disorder_info("listening on %s multicast group %s",
879                     format_sockaddr(addr), format_sockaddr(&mgroup.sa));
880     } else {
881       /* Bind to 0/port */
882       switch(addr->sa_family) {
883       case AF_INET: {
884         struct sockaddr_in *in = (struct sockaddr_in *)addr;
885       
886         memset(&in->sin_addr, 0, sizeof (struct in_addr));
887         break;
888       }
889       case AF_INET6: {
890         struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr;
891       
892         memset(&in6->sin6_addr, 0, sizeof (struct in6_addr));
893         break;
894       }
895       default:
896         disorder_fatal(0, "unsupported family %d", (int)addr->sa_family);
897       }
898       if(bind(rtpfd, addr, addr_len) < 0)
899         disorder_fatal(errno, "error binding socket to %s",
900                        format_sockaddr(addr));
901       /* Report what we did */
902       disorder_info("listening on %s", format_sockaddr(addr));
903     }
904   }
905   len = sizeof rcvbuf;
906   if(getsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0)
907     disorder_fatal(errno, "error calling getsockopt SO_RCVBUF");
908   if(target_rcvbuf > rcvbuf) {
909     if(setsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF,
910                   &target_rcvbuf, sizeof target_rcvbuf) < 0)
911       disorder_error(errno, "error calling setsockopt SO_RCVBUF %d", 
912                      target_rcvbuf);
913       /* We try to carry on anyway */
914     else
915       disorder_info("changed socket receive buffer from %d to %d",
916                     rcvbuf, target_rcvbuf);
917   } else
918     disorder_info("default socket receive buffer %d", rcvbuf);
919   //info("minbuffer %u maxbuffer %u", minbuffer, maxbuffer);
920   if(logfp)
921     disorder_info("WARNING: -L option can impact performance");
922   if(control_socket) {
923     pthread_t tid;
924
925     if((err = pthread_create(&tid, 0, control_thread, 0)))
926       disorder_fatal(err, "pthread_create control_thread");
927   }
928   if(dumpfile) {
929     int fd;
930     unsigned char buffer[65536];
931     size_t written;
932
933     if((fd = open(dumpfile, O_RDWR|O_TRUNC|O_CREAT, 0666)) < 0)
934       disorder_fatal(errno, "opening %s", dumpfile);
935     /* Fill with 0s to a suitable size */
936     memset(buffer, 0, sizeof buffer);
937     for(written = 0; written < dump_size * sizeof(int16_t);
938         written += sizeof buffer) {
939       if(write(fd, buffer, sizeof buffer) < 0)
940         disorder_fatal(errno, "clearing %s", dumpfile);
941     }
942     /* Map the buffer into memory for convenience */
943     dump_buffer = mmap(0, dump_size * sizeof(int16_t), PROT_READ|PROT_WRITE,
944                        MAP_SHARED, fd, 0);
945     if(dump_buffer == (void *)-1)
946       disorder_fatal(errno, "mapping %s", dumpfile);
947     disorder_info("dumping to %s", dumpfile);
948   }
949   /* Set up output.  Currently we only support L16 so there's no harm setting
950    * the format before we know what it is! */
951   uaudio_set_format(44100/*Hz*/, 2/*channels*/,
952                     16/*bits/channel*/, 1/*signed*/);
953   uaudio_set("application", "disorder-playrtp");
954   backend->start(playrtp_callback, NULL);
955   /* We receive and convert audio data in a background thread */
956   if((err = pthread_create(&ltid, 0, listen_thread, 0)))
957     disorder_fatal(err, "pthread_create listen_thread");
958   /* We have a second thread to add received packets to the queue */
959   if((err = pthread_create(&ltid, 0, queue_thread, 0)))
960     disorder_fatal(err, "pthread_create queue_thread");
961   pthread_mutex_lock(&lock);
962   time_t lastlog = 0;
963   for(;;) {
964     /* Wait for the buffer to fill up a bit */
965     playrtp_fill_buffer();
966     /* Start playing now */
967     disorder_info("Playing...");
968     next_timestamp = pheap_first(&packets)->timestamp;
969     active = 1;
970     pthread_mutex_unlock(&lock);
971     backend->activate();
972     pthread_mutex_lock(&lock);
973     /* Wait until the buffer empties out
974      *
975      * If there's a packet that we can play right now then we definitely
976      * continue.
977      *
978      * Also if there's at least minbuffer samples we carry on regardless and
979      * insert silence.  The assumption is there's been a pause but more data
980      * is now available.
981      */
982     while(nsamples >= minbuffer
983           || (nsamples > 0
984               && contains(pheap_first(&packets), next_timestamp))) {
985       if(monitor) {
986         time_t now = xtime(0);
987
988         if(now >= lastlog + 60) {
989           int offset = nsamples - minbuffer;
990           double offtime = (double)offset / (uaudio_rate * uaudio_channels);
991           disorder_info("%+d samples off (%d.%02ds, %d bytes)",
992                         offset,
993                         (int)fabs(offtime) * (offtime < 0 ? -1 : 1),
994                         (int)(fabs(offtime) * 100) % 100,
995                         offset * uaudio_bits / CHAR_BIT);
996           lastlog = now;
997         }
998       }
999       //fprintf(stderr, "%8u/%u (%u) PLAYING\n", nsamples, maxbuffer, minbuffer);
1000       pthread_cond_wait(&cond, &lock);
1001     }
1002 #if 0
1003     if(nsamples) {
1004       struct packet *p = pheap_first(&packets);
1005       fprintf(stderr, "nsamples=%u (%u) next_timestamp=%"PRIx32", first packet is [%"PRIx32",%"PRIx32")\n",
1006               nsamples, minbuffer, next_timestamp,p->timestamp,p->timestamp+p->nsamples);
1007     }
1008 #endif
1009     /* Stop playing for a bit until the buffer re-fills */
1010     pthread_mutex_unlock(&lock);
1011     backend->deactivate();
1012     pthread_mutex_lock(&lock);
1013     active = 0;
1014     /* Go back round */
1015   }
1016   return 0;
1017 }
1018
1019 /*
1020 Local Variables:
1021 c-basic-offset:2
1022 comment-column:40
1023 fill-column:79
1024 indent-tabs-mode:nil
1025 End:
1026 */