chiark / gitweb /
Merge branch 'mdw/gstdecode'
[disorder] / clients / playrtp.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2007-2009, 2011, 2013 Richard Kettlewell
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * 
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 /** @file clients/playrtp.c
19  * @brief RTP player
20  *
21  * This player supports Linux (<a href="http://www.alsa-project.org/">ALSA</a>)
22  * and Apple Mac (<a
23  * href="http://developer.apple.com/audio/coreaudio.html">Core Audio</a>)
24  * systems.  There is no support for Microsoft Windows yet, and that will in
25  * fact probably an entirely separate program.
26  *
27  * The program runs (at least) three threads:
28  *
29  * listen_thread() is responsible for reading RTP packets off the wire and
30  * adding them to the linked list @ref received_packets, assuming they are
31  * basically sound.
32  *
33  * queue_thread() takes packets off this linked list and adds them to @ref
34  * packets (an operation which might be much slower due to contention for @ref
35  * lock).
36  *
37  * control_thread() accepts commands from Disobedience (or anything else).
38  *
39  * The main thread activates and deactivates audio playing via the @ref
40  * lib/uaudio.h API (which probably implies at least one further thread).
41  *
42  * Sometimes it happens that there is no audio available to play.  This may
43  * because the server went away, or a packet was dropped, or the server
44  * deliberately did not send any sound because it encountered a silence.
45  *
46  * Assumptions:
47  * - it is safe to read uint32_t values without a lock protecting them
48  */
49
50 #include "common.h"
51
52 #include <getopt.h>
53 #include <sys/socket.h>
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include <netdb.h>
57 #include <pthread.h>
58 #include <locale.h>
59 #include <sys/uio.h>
60 #include <errno.h>
61 #include <netinet/in.h>
62 #include <sys/time.h>
63 #include <sys/un.h>
64 #include <unistd.h>
65 #include <sys/mman.h>
66 #include <fcntl.h>
67 #include <math.h>
68 #include <arpa/inet.h>
69 #include <ifaddrs.h>
70 #include <net/if.h>
71
72 #include "log.h"
73 #include "mem.h"
74 #include "configuration.h"
75 #include "addr.h"
76 #include "syscalls.h"
77 #include "rtp.h"
78 #include "defs.h"
79 #include "vector.h"
80 #include "heap.h"
81 #include "timeval.h"
82 #include "client.h"
83 #include "playrtp.h"
84 #include "inputline.h"
85 #include "version.h"
86 #include "uaudio.h"
87
88 /** @brief Obsolete synonym */
89 #ifndef IPV6_JOIN_GROUP
90 # define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP
91 #endif
92
93 /** @brief RTP socket */
94 static int rtpfd;
95
96 /** @brief Log output */
97 static FILE *logfp;
98
99 /** @brief Output device */
100
101 /** @brief Buffer low watermark in samples */
102 unsigned minbuffer = 4 * (2 * 44100) / 10;  /* 0.4 seconds */
103
104 /** @brief Maximum buffer size in samples
105  *
106  * We'll stop reading from the network if we have this many samples.
107  */
108 static unsigned maxbuffer;
109
110 /** @brief Received packets
111  * Protected by @ref receive_lock
112  *
113  * Received packets are added to this list, and queue_thread() picks them off
114  * it and adds them to @ref packets.  Whenever a packet is added to it, @ref
115  * receive_cond is signalled.
116  */
117 struct packet *received_packets;
118
119 /** @brief Tail of @ref received_packets
120  * Protected by @ref receive_lock
121  */
122 struct packet **received_tail = &received_packets;
123
124 /** @brief Lock protecting @ref received_packets 
125  *
126  * Only listen_thread() and queue_thread() ever hold this lock.  It is vital
127  * that queue_thread() not hold it any longer than it strictly has to. */
128 pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER;
129
130 /** @brief Condition variable signalled when @ref received_packets is updated
131  *
132  * Used by listen_thread() to notify queue_thread() that it has added another
133  * packet to @ref received_packets. */
134 pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER;
135
136 /** @brief Length of @ref received_packets */
137 uint32_t nreceived;
138
139 /** @brief Binary heap of received packets */
140 struct pheap packets;
141
142 /** @brief Total number of samples available
143  *
144  * We make this volatile because we inspect it without a protecting lock,
145  * so the usual pthread_* guarantees aren't available.
146  */
147 volatile uint32_t nsamples;
148
149 /** @brief Timestamp of next packet to play.
150  *
151  * This is set to the timestamp of the last packet, plus the number of
152  * samples it contained.  Only valid if @ref active is nonzero.
153  */
154 uint32_t next_timestamp;
155
156 /** @brief True if actively playing
157  *
158  * This is true when playing and false when just buffering. */
159 int active;
160
161 /** @brief Lock protecting @ref packets */
162 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
163
164 /** @brief Condition variable signalled whenever @ref packets is changed */
165 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
166
167 /** @brief Backend to play with */
168 static const struct uaudio *backend;
169
170 HEAP_DEFINE(pheap, struct packet *, lt_packet);
171
172 /** @brief Control socket or NULL */
173 const char *control_socket;
174
175 /** @brief Buffer for debugging dump
176  *
177  * The debug dump is enabled by the @c --dump option.  It records the last 20s
178  * of audio to the specified file (which will be about 3.5Mbytes).  The file is
179  * written as as ring buffer, so the start point will progress through it.
180  *
181  * Use clients/dump2wav to convert this to a WAV file, which can then be loaded
182  * into (e.g.) Audacity for further inspection.
183  *
184  * All three backends (ALSA, OSS, Core Audio) now support this option.
185  *
186  * The idea is to allow the user a few seconds to react to an audible artefact.
187  */
188 int16_t *dump_buffer;
189
190 /** @brief Current index within debugging dump */
191 size_t dump_index;
192
193 /** @brief Size of debugging dump in samples */
194 size_t dump_size = 44100/*Hz*/ * 2/*channels*/ * 20/*seconds*/;
195
196 static const struct option options[] = {
197   { "help", no_argument, 0, 'h' },
198   { "version", no_argument, 0, 'V' },
199   { "debug", no_argument, 0, 'd' },
200   { "device", required_argument, 0, 'D' },
201   { "min", required_argument, 0, 'm' },
202   { "max", required_argument, 0, 'x' },
203   { "rcvbuf", required_argument, 0, 'R' },
204 #if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST
205   { "oss", no_argument, 0, 'o' },
206 #endif
207 #if HAVE_ALSA_ASOUNDLIB_H
208   { "alsa", no_argument, 0, 'a' },
209 #endif
210 #if HAVE_COREAUDIO_AUDIOHARDWARE_H
211   { "core-audio", no_argument, 0, 'c' },
212 #endif
213   { "api", required_argument, 0, 'A' },
214   { "dump", required_argument, 0, 'r' },
215   { "command", required_argument, 0, 'e' },
216   { "pause-mode", required_argument, 0, 'P' },
217   { "socket", required_argument, 0, 's' },
218   { "config", required_argument, 0, 'C' },
219   { "monitor", no_argument, 0, 'M' },
220   { 0, 0, 0, 0 }
221 };
222
223 /** @brief Control thread
224  *
225  * This thread is responsible for accepting control commands from Disobedience
226  * (or other controllers) over an AF_UNIX stream socket with a path specified
227  * by the @c --socket option.  The protocol uses simple string commands and
228  * replies:
229  *
230  * - @c stop will shut the player down
231  * - @c query will send back the reply @c running
232  * - anything else is ignored
233  *
234  * Commands and response strings terminated by shutting down the connection or
235  * by a newline.  No attempt is made to multiplex multiple clients so it is
236  * important that the command be sent as soon as the connection is made - it is
237  * assumed that both parties to the protocol are entirely cooperating with one
238  * another.
239  */
240 static void *control_thread(void attribute((unused)) *arg) {
241   struct sockaddr_un sa;
242   int sfd, cfd;
243   char *line;
244   socklen_t salen;
245   FILE *fp;
246
247   assert(control_socket);
248   unlink(control_socket);
249   memset(&sa, 0, sizeof sa);
250   sa.sun_family = AF_UNIX;
251   strcpy(sa.sun_path, control_socket);
252   sfd = xsocket(PF_UNIX, SOCK_STREAM, 0);
253   if(bind(sfd, (const struct sockaddr *)&sa, sizeof sa) < 0)
254     disorder_fatal(errno, "error binding to %s", control_socket);
255   if(listen(sfd, 128) < 0)
256     disorder_fatal(errno, "error calling listen on %s", control_socket);
257   disorder_info("listening on %s", control_socket);
258   for(;;) {
259     salen = sizeof sa;
260     cfd = accept(sfd, (struct sockaddr *)&sa, &salen);
261     if(cfd < 0) {
262       switch(errno) {
263       case EINTR:
264       case EAGAIN:
265         break;
266       default:
267         disorder_fatal(errno, "error calling accept on %s", control_socket);
268       }
269     }
270     if(!(fp = fdopen(cfd, "r+"))) {
271       disorder_error(errno, "error calling fdopen for %s connection", control_socket);
272       close(cfd);
273       continue;
274     }
275     if(!inputline(control_socket, fp, &line, '\n')) {
276       if(!strcmp(line, "stop")) {
277         disorder_info("stopped via %s", control_socket);
278         exit(0);                          /* terminate immediately */
279       }
280       if(!strcmp(line, "query"))
281         fprintf(fp, "running");
282       xfree(line);
283     }
284     if(fclose(fp) < 0)
285       disorder_error(errno, "error closing %s connection", control_socket);
286   }
287 }
288
289 /** @brief Drop the first packet
290  *
291  * Assumes that @ref lock is held. 
292  */
293 static void drop_first_packet(void) {
294   if(pheap_count(&packets)) {
295     struct packet *const p = pheap_remove(&packets);
296     nsamples -= p->nsamples;
297     playrtp_free_packet(p);
298     pthread_cond_broadcast(&cond);
299   }
300 }
301
302 /** @brief Background thread adding packets to heap
303  *
304  * This just transfers packets from @ref received_packets to @ref packets.  It
305  * is important that it holds @ref receive_lock for as little time as possible,
306  * in order to minimize the interval between calls to read() in
307  * listen_thread().
308  */
309 static void *queue_thread(void attribute((unused)) *arg) {
310   struct packet *p;
311
312   for(;;) {
313     /* Get the next packet */
314     pthread_mutex_lock(&receive_lock);
315     while(!received_packets) {
316       pthread_cond_wait(&receive_cond, &receive_lock);
317     }
318     p = received_packets;
319     received_packets = p->next;
320     if(!received_packets)
321       received_tail = &received_packets;
322     --nreceived;
323     pthread_mutex_unlock(&receive_lock);
324     /* Add it to the heap */
325     pthread_mutex_lock(&lock);
326     pheap_insert(&packets, p);
327     nsamples += p->nsamples;
328     pthread_cond_broadcast(&cond);
329     pthread_mutex_unlock(&lock);
330   }
331 #if HAVE_STUPID_GCC44
332   return NULL;
333 #endif
334 }
335
336 /** @brief Background thread collecting samples
337  *
338  * This function collects samples, perhaps converts them to the target format,
339  * and adds them to the packet list.
340  *
341  * It is crucial that the gap between successive calls to read() is as small as
342  * possible: otherwise packets will be dropped.
343  *
344  * We use a binary heap to ensure that the unavoidable effort is at worst
345  * logarithmic in the total number of packets - in fact if packets are mostly
346  * received in order then we will largely do constant work per packet since the
347  * newest packet will always be last.
348  *
349  * Of more concern is that we must acquire the lock on the heap to add a packet
350  * to it.  If this proves a problem in practice then the answer would be
351  * (probably doubly) linked list with new packets added the end and a second
352  * thread which reads packets off the list and adds them to the heap.
353  *
354  * We keep memory allocation (mostly) very fast by keeping pre-allocated
355  * packets around; see @ref playrtp_new_packet().
356  */
357 static void *listen_thread(void attribute((unused)) *arg) {
358   struct packet *p = 0;
359   int n;
360   struct rtp_header header;
361   uint16_t seq;
362   uint32_t timestamp;
363   struct iovec iov[2];
364
365   for(;;) {
366     if(!p)
367       p = playrtp_new_packet();
368     iov[0].iov_base = &header;
369     iov[0].iov_len = sizeof header;
370     iov[1].iov_base = p->samples_raw;
371     iov[1].iov_len = sizeof p->samples_raw / sizeof *p->samples_raw;
372     n = readv(rtpfd, iov, 2);
373     if(n < 0) {
374       switch(errno) {
375       case EINTR:
376         continue;
377       default:
378         disorder_fatal(errno, "error reading from socket");
379       }
380     }
381     /* Ignore too-short packets */
382     if((size_t)n <= sizeof (struct rtp_header)) {
383       disorder_info("ignored a short packet");
384       continue;
385     }
386     timestamp = htonl(header.timestamp);
387     seq = htons(header.seq);
388     /* Ignore packets in the past */
389     if(active && lt(timestamp, next_timestamp)) {
390       disorder_info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32,
391            timestamp, next_timestamp);
392       continue;
393     }
394     /* Ignore packets with the extension bit set. */
395     if(header.vpxcc & 0x10)
396       continue;
397     p->next = 0;
398     p->flags = 0;
399     p->timestamp = timestamp;
400     /* Convert to target format */
401     if(header.mpt & 0x80)
402       p->flags |= IDLE;
403     switch(header.mpt & 0x7F) {
404     case 10:                            /* L16 */
405       p->nsamples = (n - sizeof header) / sizeof(uint16_t);
406       break;
407       /* TODO support other RFC3551 media types (when the speaker does) */
408     default:
409       disorder_fatal(0, "unsupported RTP payload type %d", header.mpt & 0x7F);
410     }
411     /* See if packet is silent */
412     const uint16_t *s = p->samples_raw;
413     n = p->nsamples;
414     for(; n > 0; --n)
415       if(*s++)
416         break;
417     if(!n)
418       p->flags |= SILENT;
419     if(logfp)
420       fprintf(logfp, "sequence %u timestamp %"PRIx32" length %"PRIx32" end %"PRIx32"\n",
421               seq, timestamp, p->nsamples, timestamp + p->nsamples);
422     /* Stop reading if we've reached the maximum.
423      *
424      * This is rather unsatisfactory: it means that if packets get heavily
425      * out of order then we guarantee dropouts.  But for now... */
426     if(nsamples >= maxbuffer) {
427       pthread_mutex_lock(&lock);
428       while(nsamples >= maxbuffer) {
429         pthread_cond_wait(&cond, &lock);
430       }
431       pthread_mutex_unlock(&lock);
432     }
433     /* Add the packet to the receive queue */
434     pthread_mutex_lock(&receive_lock);
435     *received_tail = p;
436     received_tail = &p->next;
437     ++nreceived;
438     pthread_cond_signal(&receive_cond);
439     pthread_mutex_unlock(&receive_lock);
440     /* We'll need a new packet */
441     p = 0;
442   }
443 }
444
445 /** @brief Wait until the buffer is adequately full
446  *
447  * Must be called with @ref lock held.
448  */
449 void playrtp_fill_buffer(void) {
450   /* Discard current buffer contents */
451   while(nsamples) {
452     //fprintf(stderr, "%8u/%u (%u) DROPPING\n", nsamples, maxbuffer, minbuffer);
453     drop_first_packet();
454   }
455   disorder_info("Buffering...");
456   /* Wait until there's at least minbuffer samples available */
457   while(nsamples < minbuffer) {
458     //fprintf(stderr, "%8u/%u (%u) FILLING\n", nsamples, maxbuffer, minbuffer);
459     pthread_cond_wait(&cond, &lock);
460   }
461   /* Start from whatever is earliest */
462   next_timestamp = pheap_first(&packets)->timestamp;
463   active = 1;
464 }
465
466 /** @brief Find next packet
467  * @return Packet to play or NULL if none found
468  *
469  * The return packet is merely guaranteed not to be in the past: it might be
470  * the first packet in the future rather than one that is actually suitable to
471  * play.
472  *
473  * Must be called with @ref lock held.
474  */
475 struct packet *playrtp_next_packet(void) {
476   while(pheap_count(&packets)) {
477     struct packet *const p = pheap_first(&packets);
478     if(le(p->timestamp + p->nsamples, next_timestamp)) {
479       /* This packet is in the past.  Drop it and try another one. */
480       drop_first_packet();
481     } else
482       /* This packet is NOT in the past.  (It might be in the future
483        * however.) */
484       return p;
485   }
486   return 0;
487 }
488
489 /* display usage message and terminate */
490 static void help(void) {
491   xprintf("Usage:\n"
492           "  disorder-playrtp [OPTIONS] [[ADDRESS] PORT]\n"
493           "Options:\n"
494           "  --device, -D DEVICE     Output device\n"
495           "  --min, -m FRAMES        Buffer low water mark\n"
496           "  --max, -x FRAMES        Buffer maximum size\n"
497           "  --rcvbuf, -R BYTES      Socket receive buffer size\n"
498           "  --config, -C PATH       Set configuration file\n"
499           "  --api, -A API           Select audio API.  Possibilities:\n"
500           "                            ");
501   int first = 1;
502   for(int n = 0; uaudio_apis[n]; ++n) {
503     if(uaudio_apis[n]->flags & UAUDIO_API_CLIENT) {
504       if(first)
505         first = 0;
506       else
507         xprintf(", ");
508       xprintf("%s", uaudio_apis[n]->name);
509     }
510   }
511   xprintf("\n"
512           "  --command, -e COMMAND   Pipe audio to command.\n"
513           "  --pause-mode, -P silence  For -e: pauses send silence (default)\n"
514           "  --pause-mode, -P suspend  For -e: pauses suspend writes\n"
515           "  --help, -h              Display usage message\n"
516           "  --version, -V           Display version number\n"
517           );
518   xfclose(stdout);
519   exit(0);
520 }
521
522 static size_t playrtp_callback(void *buffer,
523                                size_t max_samples,
524                                void attribute((unused)) *userdata) {
525   size_t samples;
526   int silent = 0;
527
528   pthread_mutex_lock(&lock);
529   /* Get the next packet, junking any that are now in the past */
530   const struct packet *p = playrtp_next_packet();
531   if(p && contains(p, next_timestamp)) {
532     /* This packet is ready to play; the desired next timestamp points
533      * somewhere into it. */
534
535     /* Timestamp of end of packet */
536     const uint32_t packet_end = p->timestamp + p->nsamples;
537
538     /* Offset of desired next timestamp into current packet */
539     const uint32_t offset = next_timestamp - p->timestamp;
540
541     /* Pointer to audio data */
542     const uint16_t *ptr = (void *)(p->samples_raw + offset);
543
544     /* Compute number of samples left in packet, limited to output buffer
545      * size */
546     samples = packet_end - next_timestamp;
547     if(samples > max_samples)
548       samples = max_samples;
549
550     /* Copy into buffer, converting to native endianness */
551     size_t i = samples;
552     int16_t *bufptr = buffer;
553     while(i > 0) {
554       *bufptr++ = (int16_t)ntohs(*ptr++);
555       --i;
556     }
557     silent = !!(p->flags & SILENT);
558   } else {
559     /* There is no suitable packet.  We introduce 0s up to the next packet, or
560      * to fill the buffer if there's no next packet or that's too many.  The
561      * comparison with max_samples deals with the otherwise troubling overflow
562      * case. */
563     samples = p ? p->timestamp - next_timestamp : max_samples;
564     if(samples > max_samples)
565       samples = max_samples;
566     //info("infill by %zu", samples);
567     memset(buffer, 0, samples * uaudio_sample_size);
568     silent = 1;
569   }
570   /* Debug dump */
571   if(dump_buffer) {
572     for(size_t i = 0; i < samples; ++i) {
573       dump_buffer[dump_index++] = ((int16_t *)buffer)[i];
574       dump_index %= dump_size;
575     }
576   }
577   /* Advance timestamp */
578   next_timestamp += samples;
579   /* If we're getting behind then try to drop just silent packets
580    *
581    * In theory this shouldn't be necessary.  The server is supposed to send
582    * packets at the right rate and compares the number of samples sent with the
583    * time in order to ensure this.
584    *
585    * However, various things could throw this off:
586    *
587    * - the server's clock could advance at the wrong rate.  This would cause it
588    *   to mis-estimate the right number of samples to have sent and
589    *   inappropriately throttle or speed up.
590    *
591    * - playback could happen at the wrong rate.  If the playback host's sound
592    *   card has a slightly incorrect clock then eventually it will get out
593    *   of step.
594    *
595    * So if we play back slightly slower than the server sends for either of
596    * these reasons then eventually our buffer, and the socket's buffer, will
597    * fill, and the kernel will start dropping packets.  The result is audible
598    * and not very nice.
599    *
600    * Therefore if we're getting behind, we pre-emptively drop silent packets,
601    * since a change in the duration of a silence is less noticeable than a
602    * dropped packet from the middle of continuous music.
603    *
604    * (If things go wrong the other way then eventually we run out of packets to
605    * play and are forced to play silence.  This doesn't seem to happen in
606    * practice but if it does then in the same way we can artificially extend
607    * silent packets to compensate.)
608    *
609    * Dropped packets are always logged; use 'disorder-playrtp --monitor' to
610    * track how close to target buffer occupancy we are on a once-a-minute
611    * basis.
612    */
613   if(nsamples > minbuffer && silent) {
614     disorder_info("dropping %zu samples (%"PRIu32" > %"PRIu32")",
615                   samples, nsamples, minbuffer);
616     samples = 0;
617   }
618   /* Junk obsolete packets */
619   playrtp_next_packet();
620   pthread_mutex_unlock(&lock);
621   return samples;
622 }
623
624 static int compare_family(const struct ifaddrs *a,
625                           const struct ifaddrs *b,
626                           int family) {
627   int afamily = a->ifa_addr->sa_family;
628   int bfamily = b->ifa_addr->sa_family;
629   if(afamily != bfamily) {
630     /* Preferred family wins */
631     if(afamily == family) return 1;
632     if(bfamily == family) return -1;
633     /* Either there's no preference or it doesn't help.  Prefer IPv4 */
634     if(afamily == AF_INET) return 1;
635     if(bfamily == AF_INET) return -1;
636     /* Failing that prefer IPv6 */
637     if(afamily == AF_INET6) return 1;
638     if(bfamily == AF_INET6) return -1;
639   }
640   return 0;
641 }
642
643 static int compare_flags(const struct ifaddrs *a,
644                          const struct ifaddrs *b) {
645   unsigned aflags = a->ifa_flags, bflags = b->ifa_flags;
646   /* Up interfaces are better than down ones */
647   unsigned aup = aflags & IFF_UP, bup = bflags & IFF_UP;
648   if(aup != bup)
649     return aup > bup ? 1 : -1;
650 #if IFF_DYNAMIC
651   /* Static addresses are better than dynamic */
652   unsigned adynamic = aflags & IFF_DYNAMIC, bdynamic = bflags & IFF_DYNAMIC;
653   if(adynamic != bdynamic)
654     return adynamic < bdynamic ? 1 : -1;
655 #endif
656   unsigned aloopback = aflags & IFF_LOOPBACK, bloopback = bflags & IFF_LOOPBACK;
657   /* Static addresses are better than dynamic */
658   if(aloopback != bloopback)
659     return aloopback < bloopback ? 1 : -1;
660   return 0;
661 }
662
663 static int compare_interfaces(const struct ifaddrs *a,
664                               const struct ifaddrs *b,
665                               int family) {
666   int c;
667   if((c = compare_family(a, b, family))) return c;
668   return compare_flags(a, b);
669 }
670
671 int main(int argc, char **argv) {
672   int n, err;
673   struct addrinfo *res;
674   struct stringlist sl;
675   char *sockname;
676   int rcvbuf, target_rcvbuf = 0;
677   socklen_t len;
678   struct ip_mreq mreq;
679   struct ipv6_mreq mreq6;
680   disorder_client *c = NULL;
681   char *address, *port;
682   int is_multicast;
683   union any_sockaddr {
684     struct sockaddr sa;
685     struct sockaddr_in in;
686     struct sockaddr_in6 in6;
687   };
688   union any_sockaddr mgroup;
689   const char *dumpfile = 0;
690   pthread_t ltid;
691   int monitor = 0;
692   static const int one = 1;
693
694   static const struct addrinfo prefs = {
695     .ai_flags = AI_PASSIVE,
696     .ai_family = PF_INET,
697     .ai_socktype = SOCK_DGRAM,
698     .ai_protocol = IPPROTO_UDP
699   };
700
701   /* Timing information is often important to debugging playrtp, so we include
702    * timestamps in the logs */
703   logdate = 1;
704   mem_init();
705   if(!setlocale(LC_CTYPE, "")) disorder_fatal(errno, "error calling setlocale");
706   while((n = getopt_long(argc, argv, "hVdD:m:x:L:R:aocC:re:P:MA:", options, 0)) >= 0) {
707     switch(n) {
708     case 'h': help();
709     case 'V': version("disorder-playrtp");
710     case 'd': debugging = 1; break;
711     case 'D': uaudio_set("device", optarg); break;
712     case 'm': minbuffer = 2 * atol(optarg); break;
713     case 'x': maxbuffer = 2 * atol(optarg); break;
714     case 'L': logfp = fopen(optarg, "w"); break;
715     case 'R': target_rcvbuf = atoi(optarg); break;
716 #if HAVE_ALSA_ASOUNDLIB_H
717     case 'a':
718       disorder_error(0, "deprecated option; use --api alsa instead");
719       backend = &uaudio_alsa; break;
720 #endif
721 #if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST
722     case 'o':
723       disorder_error(0, "deprecated option; use --api oss instead");
724       backend = &uaudio_oss; 
725       break;
726 #endif
727 #if HAVE_COREAUDIO_AUDIOHARDWARE_H      
728     case 'c':
729       disorder_error(0, "deprecated option; use --api coreaudio instead");
730       backend = &uaudio_coreaudio;
731       break;
732 #endif
733     case 'A': backend = uaudio_find(optarg); break;
734     case 'C': configfile = optarg; break;
735     case 's': control_socket = optarg; break;
736     case 'r': dumpfile = optarg; break;
737     case 'e': backend = &uaudio_command; uaudio_set("command", optarg); break;
738     case 'P': uaudio_set("pause-mode", optarg); break;
739     case 'M': monitor = 1; break;
740     default: disorder_fatal(0, "invalid option");
741     }
742   }
743   if(config_read(0, NULL)) disorder_fatal(0, "cannot read configuration");
744   if(!backend) {
745     backend = uaudio_default(uaudio_apis, UAUDIO_API_CLIENT);
746     if(!backend)
747       disorder_fatal(0, "no default uaudio API found");
748     disorder_info("default audio API %s", backend->name);
749   }
750   if(backend == &uaudio_rtp) {
751     /* This means that you have NO local sound output.  This can happen if you
752      * use a non-Apple GCC on a Mac (because it doesn't know how to compile
753      * CoreAudio/AudioHardware.h). */
754     disorder_fatal(0, "cannot play RTP through RTP");
755   }
756   if(!maxbuffer)
757     maxbuffer = 2 * minbuffer;
758   argc -= optind;
759   argv += optind;
760   switch(argc) {
761   case 0:
762     /* Get configuration from server */
763     if(!(c = disorder_new(1))) exit(EXIT_FAILURE);
764     if(disorder_connect(c)) exit(EXIT_FAILURE);
765     if(disorder_rtp_address(c, &address, &port)) exit(EXIT_FAILURE);
766     sl.n = 2;
767     sl.s = xcalloc(2, sizeof *sl.s);
768     sl.s[0] = address;
769     sl.s[1] = port;
770     break;
771   case 1:
772   case 2:
773     /* Use command-line ADDRESS+PORT or just PORT */
774     sl.n = argc;
775     sl.s = argv;
776     break;
777   default:
778     disorder_fatal(0, "usage: disorder-playrtp [OPTIONS] [[ADDRESS] PORT]");
779   }
780   disorder_info("version "VERSION" process ID %lu",
781                 (unsigned long)getpid());
782   struct sockaddr *addr;
783   socklen_t addr_len;
784   if(!strcmp(sl.s[0], "-")) {
785     /* Pick address family to match known-working connectivity to the server */
786     int family = disorder_client_af(c);
787     /* Get a list of interfaces */
788     struct ifaddrs *ifa, *bestifa = NULL;
789     if(getifaddrs(&ifa) < 0)
790       disorder_fatal(errno, "error calling getifaddrs");
791     /* Try to pick a good one */
792     for(; ifa; ifa = ifa->ifa_next) {
793       if(bestifa == NULL
794          || compare_interfaces(ifa, bestifa, family) > 0)
795         bestifa = ifa;
796     }
797     if(!bestifa)
798       disorder_fatal(0, "failed to select a network interface");
799     family = bestifa->ifa_addr->sa_family;
800     if((rtpfd = socket(family,
801                        SOCK_DGRAM,
802                        IPPROTO_UDP)) < 0)
803       disorder_fatal(errno, "error creating socket (family %d)", family);
804     /* Bind the address */
805     if(bind(rtpfd, bestifa->ifa_addr,
806             family == AF_INET
807             ? sizeof (struct sockaddr_in) : sizeof (struct sockaddr_in6)) < 0)
808       disorder_fatal(errno, "error binding socket");
809     static struct sockaddr_storage bound_address;
810     addr = (struct sockaddr *)&bound_address;
811     addr_len = sizeof bound_address;
812     if(getsockname(rtpfd, addr, &addr_len) < 0)
813       disorder_fatal(errno, "error getting socket address");
814     /* Convert to string */
815     char addrname[128], portname[32];
816     if(getnameinfo(addr, addr_len,
817                    addrname, sizeof addrname,
818                    portname, sizeof portname,
819                    NI_NUMERICHOST|NI_NUMERICSERV) < 0)
820       disorder_fatal(errno, "getnameinfo");
821     /* Ask for audio data */
822     if(disorder_rtp_request(c, addrname, portname)) exit(EXIT_FAILURE);
823     /* Report what we did */
824     disorder_info("listening on %s", format_sockaddr(addr));
825   } else {
826     /* Look up address and port */
827     if(!(res = get_address(&sl, &prefs, &sockname)))
828       exit(1);
829     addr = res->ai_addr;
830     addr_len = res->ai_addrlen;
831     /* Create the socket */
832     if((rtpfd = socket(res->ai_family,
833                        res->ai_socktype,
834                        res->ai_protocol)) < 0)
835       disorder_fatal(errno, "error creating socket");
836     /* Allow multiple listeners */
837     xsetsockopt(rtpfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
838     is_multicast = multicast(addr);
839     /* The multicast and unicast/broadcast cases are different enough that they
840      * are totally split.  Trying to find commonality between them causes more
841      * trouble that it's worth. */
842     if(is_multicast) {
843       /* Stash the multicast group address */
844       memcpy(&mgroup, addr, addr_len);
845       switch(res->ai_addr->sa_family) {
846       case AF_INET:
847         mgroup.in.sin_port = 0;
848         break;
849       case AF_INET6:
850         mgroup.in6.sin6_port = 0;
851         break;
852       default:
853         disorder_fatal(0, "unsupported address family %d",
854                        (int)addr->sa_family);
855       }
856       /* Bind to to the multicast group address */
857       if(bind(rtpfd, addr, addr_len) < 0)
858         disorder_fatal(errno, "error binding socket to %s",
859                        format_sockaddr(addr));
860       /* Add multicast group membership */
861       switch(mgroup.sa.sa_family) {
862       case PF_INET:
863         mreq.imr_multiaddr = mgroup.in.sin_addr;
864         mreq.imr_interface.s_addr = 0;      /* use primary interface */
865         if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
866                       &mreq, sizeof mreq) < 0)
867           disorder_fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP");
868         break;
869       case PF_INET6:
870         mreq6.ipv6mr_multiaddr = mgroup.in6.sin6_addr;
871         memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface);
872         if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
873                       &mreq6, sizeof mreq6) < 0)
874           disorder_fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP");
875         break;
876       default:
877         disorder_fatal(0, "unsupported address family %d", res->ai_family);
878       }
879       /* Report what we did */
880       disorder_info("listening on %s multicast group %s",
881                     format_sockaddr(addr), format_sockaddr(&mgroup.sa));
882     } else {
883       /* Bind to 0/port */
884       switch(addr->sa_family) {
885       case AF_INET: {
886         struct sockaddr_in *in = (struct sockaddr_in *)addr;
887       
888         memset(&in->sin_addr, 0, sizeof (struct in_addr));
889         break;
890       }
891       case AF_INET6: {
892         struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr;
893       
894         memset(&in6->sin6_addr, 0, sizeof (struct in6_addr));
895         break;
896       }
897       default:
898         disorder_fatal(0, "unsupported family %d", (int)addr->sa_family);
899       }
900       if(bind(rtpfd, addr, addr_len) < 0)
901         disorder_fatal(errno, "error binding socket to %s",
902                        format_sockaddr(addr));
903       /* Report what we did */
904       disorder_info("listening on %s", format_sockaddr(addr));
905     }
906   }
907   len = sizeof rcvbuf;
908   if(getsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0)
909     disorder_fatal(errno, "error calling getsockopt SO_RCVBUF");
910   if(target_rcvbuf > rcvbuf) {
911     if(setsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF,
912                   &target_rcvbuf, sizeof target_rcvbuf) < 0)
913       disorder_error(errno, "error calling setsockopt SO_RCVBUF %d", 
914                      target_rcvbuf);
915       /* We try to carry on anyway */
916     else
917       disorder_info("changed socket receive buffer from %d to %d",
918                     rcvbuf, target_rcvbuf);
919   } else
920     disorder_info("default socket receive buffer %d", rcvbuf);
921   //info("minbuffer %u maxbuffer %u", minbuffer, maxbuffer);
922   if(logfp)
923     disorder_info("WARNING: -L option can impact performance");
924   if(control_socket) {
925     pthread_t tid;
926
927     if((err = pthread_create(&tid, 0, control_thread, 0)))
928       disorder_fatal(err, "pthread_create control_thread");
929   }
930   if(dumpfile) {
931     int fd;
932     unsigned char buffer[65536];
933     size_t written;
934
935     if((fd = open(dumpfile, O_RDWR|O_TRUNC|O_CREAT, 0666)) < 0)
936       disorder_fatal(errno, "opening %s", dumpfile);
937     /* Fill with 0s to a suitable size */
938     memset(buffer, 0, sizeof buffer);
939     for(written = 0; written < dump_size * sizeof(int16_t);
940         written += sizeof buffer) {
941       if(write(fd, buffer, sizeof buffer) < 0)
942         disorder_fatal(errno, "clearing %s", dumpfile);
943     }
944     /* Map the buffer into memory for convenience */
945     dump_buffer = mmap(0, dump_size * sizeof(int16_t), PROT_READ|PROT_WRITE,
946                        MAP_SHARED, fd, 0);
947     if(dump_buffer == (void *)-1)
948       disorder_fatal(errno, "mapping %s", dumpfile);
949     disorder_info("dumping to %s", dumpfile);
950   }
951   /* Set up output.  Currently we only support L16 so there's no harm setting
952    * the format before we know what it is! */
953   uaudio_set_format(44100/*Hz*/, 2/*channels*/,
954                     16/*bits/channel*/, 1/*signed*/);
955   uaudio_set("application", "disorder-playrtp");
956   backend->start(playrtp_callback, NULL);
957   /* We receive and convert audio data in a background thread */
958   if((err = pthread_create(&ltid, 0, listen_thread, 0)))
959     disorder_fatal(err, "pthread_create listen_thread");
960   /* We have a second thread to add received packets to the queue */
961   if((err = pthread_create(&ltid, 0, queue_thread, 0)))
962     disorder_fatal(err, "pthread_create queue_thread");
963   pthread_mutex_lock(&lock);
964   time_t lastlog = 0;
965   for(;;) {
966     /* Wait for the buffer to fill up a bit */
967     playrtp_fill_buffer();
968     /* Start playing now */
969     disorder_info("Playing...");
970     next_timestamp = pheap_first(&packets)->timestamp;
971     active = 1;
972     pthread_mutex_unlock(&lock);
973     backend->activate();
974     pthread_mutex_lock(&lock);
975     /* Wait until the buffer empties out
976      *
977      * If there's a packet that we can play right now then we definitely
978      * continue.
979      *
980      * Also if there's at least minbuffer samples we carry on regardless and
981      * insert silence.  The assumption is there's been a pause but more data
982      * is now available.
983      */
984     while(nsamples >= minbuffer
985           || (nsamples > 0
986               && contains(pheap_first(&packets), next_timestamp))) {
987       if(monitor) {
988         time_t now = xtime(0);
989
990         if(now >= lastlog + 60) {
991           int offset = nsamples - minbuffer;
992           double offtime = (double)offset / (uaudio_rate * uaudio_channels);
993           disorder_info("%+d samples off (%d.%02ds, %d bytes)",
994                         offset,
995                         (int)fabs(offtime) * (offtime < 0 ? -1 : 1),
996                         (int)(fabs(offtime) * 100) % 100,
997                         offset * uaudio_bits / CHAR_BIT);
998           lastlog = now;
999         }
1000       }
1001       //fprintf(stderr, "%8u/%u (%u) PLAYING\n", nsamples, maxbuffer, minbuffer);
1002       pthread_cond_wait(&cond, &lock);
1003     }
1004 #if 0
1005     if(nsamples) {
1006       struct packet *p = pheap_first(&packets);
1007       fprintf(stderr, "nsamples=%u (%u) next_timestamp=%"PRIx32", first packet is [%"PRIx32",%"PRIx32")\n",
1008               nsamples, minbuffer, next_timestamp,p->timestamp,p->timestamp+p->nsamples);
1009     }
1010 #endif
1011     /* Stop playing for a bit until the buffer re-fills */
1012     pthread_mutex_unlock(&lock);
1013     backend->deactivate();
1014     pthread_mutex_lock(&lock);
1015     active = 0;
1016     /* Go back round */
1017   }
1018   return 0;
1019 }
1020
1021 /*
1022 Local Variables:
1023 c-basic-offset:2
1024 comment-column:40
1025 fill-column:79
1026 indent-tabs-mode:nil
1027 End:
1028 */