X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/blobdiff_plain/b64efe7e78086710c0196e2a9cd46ea03e925e90..655cae6737903aac20835588f82ca592cbdbde99:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index 11df0e0..8931b6e 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -20,7 +20,32 @@ /** @file clients/playrtp.c * @brief RTP player * - * This RTP player supports Linux (ALSA) and Darwin (Core Audio) systems. + * This player supports Linux (ALSA) + * and Apple Mac (Core Audio) + * systems. There is no support for Microsoft Windows yet, and that will in + * fact probably be an entirely separate program. + * + * The program runs (at least) three threads. listen_thread() is responsible + * for reading RTP packets off the wire and adding them to the linked list @ref + * received_packets, assuming they are basically sound. queue_thread() takes + * packets off this linked list and adds them to @ref packets (an operation + * which might be much slower due to contention for @ref lock). + * + * The main thread is responsible for actually playing audio. In ALSA this + * means it waits until ALSA says it's ready for more audio which it then + * plays. + * + * In Core Audio the main thread is only responsible for starting and stopping + * play: the system does the actual playback in its own private thread, and + * calls adioproc() to fetch the audio data. + * + * Sometimes it happens that there is no audio available to play. This may + * be because the server went away, or a packet was dropped, or the server + * deliberately did not send any sound because it encountered a silence. + * + * Assumptions: + * - it is safe to read uint32_t values without a lock protecting them */ #include @@ -47,6 +72,7 @@ #include "defs.h" #include "vector.h" #include "heap.h" +#include "timeval.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include @@ -100,6 +126,9 @@ static unsigned maxbuffer; * timestamp. 
*/ struct packet { + /** @brief Next packet in @ref next_free_packet or @ref received_packets */ + struct packet *next; + /** @brief Number of samples in this packet */ uint32_t nsamples; @@ -113,10 +142,11 @@ struct packet { /** @brief Flags * * Valid values are: - * - @ref IDLE: the idle bit was set in the RTP packet + * - @ref IDLE - the idle bit was set in the RTP packet */ unsigned flags; -#define IDLE 0x0001 /**< idle bit set in RTP packet */ +/** @brief idle bit set in RTP packet*/ +#define IDLE 0x0001 /** @brief Raw sample data * @@ -156,6 +186,35 @@ static inline int lt_packet(const struct packet *a, const struct packet *b) { return lt(a->timestamp, b->timestamp); } +/** @brief Received packets + * Protected by @ref receive_lock + * + * Received packets are added to this list, and queue_thread() picks them off + * it and adds them to @ref packets. Whenever a packet is added to it, @ref + * receive_cond is signalled. + */ +static struct packet *received_packets; + +/** @brief Tail of @ref received_packets + * Protected by @ref receive_lock + */ +static struct packet **received_tail = &received_packets; + +/** @brief Lock protecting @ref received_packets + * + * Only listen_thread() and queue_thread() ever hold this lock. It is vital + * that queue_thread() not hold it any longer than it strictly has to. */ +static pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER; + +/** @brief Condition variable signalled when @ref received_packets is updated + * + * Used by listen_thread() to notify queue_thread() that it has added another + * packet to @ref received_packets. 
*/ +static pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER; + +/** @brief Length of @ref received_packets */ +static uint32_t nreceived; + /** @struct pheap * @brief Binary heap of packets ordered by timestamp */ HEAP_TYPE(pheap, struct packet *, lt_packet); @@ -163,8 +222,12 @@ HEAP_TYPE(pheap, struct packet *, lt_packet); /** @brief Binary heap of received packets */ static struct pheap packets; -/** @brief Total number of samples available */ -static unsigned long nsamples; +/** @brief Total number of samples available + * + * We make this volatile because we inspect it without a protecting lock, + * so the usual pthread_* guarantees aren't available. + */ +static volatile uint32_t nsamples; /** @brief Timestamp of next packet to play. * @@ -178,6 +241,12 @@ static uint32_t next_timestamp; * This is true when playing and false when just buffering. */ static int active; +/** @brief Lock protecting @ref packets */ +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + +/** @brief Condition variable signalled whenever @ref packets is changed */ +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + /** @brief Structure of free packet list */ union free_packet { struct packet p; @@ -210,14 +279,8 @@ static union free_packet *next_free_packet; */ static size_t count_free_packets; -/** @brief Lock protecting @ref packets - * - * This also protects the packet memory allocation infrastructure, @ref - * free_packets and @ref next_free_packet. 
*/ -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; - -/** @brief Condition variable signalled whenever @ref packets is changed */ -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +/** @brief Lock protecting packet allocator */ +static pthread_mutex_t mem_lock = PTHREAD_MUTEX_INITIALIZER; static const struct option options[] = { { "help", no_argument, 0, 'h' }, @@ -227,15 +290,16 @@ static const struct option options[] = { { "min", required_argument, 0, 'm' }, { "max", required_argument, 0, 'x' }, { "buffer", required_argument, 0, 'b' }, + { "rcvbuf", required_argument, 0, 'R' }, + { "multicast", required_argument, 0, 'M' }, { 0, 0, 0, 0 } }; -/** @brief Return a new packet - * - * Assumes that @ref lock is held. */ +/** @brief Return a new packet */ static struct packet *new_packet(void) { struct packet *p; - + + pthread_mutex_lock(&mem_lock); if(free_packets) { p = &free_packets->p; free_packets = free_packets->next; @@ -247,16 +311,17 @@ static struct packet *new_packet(void) { p = &(next_free_packet++)->p; --count_free_packets; } + pthread_mutex_unlock(&mem_lock); return p; } -/** @brief Free a packet - * - * Assumes that @ref lock is held. */ +/** @brief Free a packet */ static void free_packet(struct packet *p) { union free_packet *u = (union free_packet *)p; + pthread_mutex_lock(&mem_lock); u->next = free_packets; free_packets = u; + pthread_mutex_unlock(&mem_lock); } /** @brief Drop the first packet @@ -272,10 +337,57 @@ static void drop_first_packet(void) { } } +/** @brief Background thread adding packets to heap + * + * This just transfers packets from @ref received_packets to @ref packets. It + * is important that it holds @ref receive_lock for as little time as possible, + * in order to minimize the interval between calls to read() in + * listen_thread(). 
+ */ +static void *queue_thread(void attribute((unused)) *arg) { + struct packet *p; + + for(;;) { + /* Get the next packet */ + pthread_mutex_lock(&receive_lock); + while(!received_packets) + pthread_cond_wait(&receive_cond, &receive_lock); + p = received_packets; + received_packets = p->next; + if(!received_packets) + received_tail = &received_packets; + --nreceived; + pthread_mutex_unlock(&receive_lock); + /* Add it to the heap */ + pthread_mutex_lock(&lock); + pheap_insert(&packets, p); + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); + pthread_mutex_unlock(&lock); + } +} + /** @brief Background thread collecting samples * * This function collects samples, perhaps converts them to the target format, - * and adds them to the packet list. */ + * and adds them to the packet list. + * + * It is crucial that the gap between successive calls to read() is as small as + * possible: otherwise packets will be dropped. + * + * We use a binary heap to ensure that the unavoidable effort is at worst + * logarithmic in the total number of packets - in fact if packets are mostly + * received in order then we will largely do constant work per packet since the + * newest packet will always be last. + * + * Of more concern is that we must acquire the lock on the heap to add a packet + * to it. If this proves a problem in practice then the answer would be + * a (probably doubly) linked list with new packets added at the end and a second + * thread which reads packets off the list and adds them to the heap. + * + * We keep memory allocation (mostly) very fast by keeping pre-allocated + * packets around; see @ref new_packet(). 
+ */ static void *listen_thread(void attribute((unused)) *arg) { struct packet *p = 0; int n; @@ -285,11 +397,8 @@ static void *listen_thread(void attribute((unused)) *arg) { struct iovec iov[2]; for(;;) { - if(!p) { - pthread_mutex_lock(&lock); + if(!p) p = new_packet(); - pthread_mutex_unlock(&lock); - } iov[0].iov_base = &header; iov[0].iov_len = sizeof header; iov[1].iov_base = p->samples_raw; @@ -316,7 +425,7 @@ static void *listen_thread(void attribute((unused)) *arg) { timestamp, next_timestamp); continue; } - pthread_mutex_lock(&lock); + p->next = 0; p->flags = 0; p->timestamp = timestamp; /* Convert to target format */ @@ -325,9 +434,6 @@ static void *listen_thread(void attribute((unused)) *arg) { switch(header.mpt & 0x7F) { case 10: p->nsamples = (n - sizeof header) / sizeof(uint16_t); - /* ALSA can do any necessary conversion itself (though it might be better - * to do any necessary conversion in the background) */ - /* TODO we could readv into the buffer */ break; /* TODO support other RFC3551 media types (when the speaker does) */ default: @@ -342,21 +448,27 @@ static void *listen_thread(void attribute((unused)) *arg) { * This is rather unsatisfactory: it means that if packets get heavily * out of order then we guarantee dropouts. But for now... 
*/ if(nsamples >= maxbuffer) { - info("buffer full"); + pthread_mutex_lock(&lock); while(nsamples >= maxbuffer) pthread_cond_wait(&cond, &lock); + pthread_mutex_unlock(&lock); } - /* Add the packet to the heap */ - pheap_insert(&packets, p); - nsamples += p->nsamples; + /* Add the packet to the receive queue */ + pthread_mutex_lock(&receive_lock); + *received_tail = p; + received_tail = &p->next; + ++nreceived; + pthread_cond_signal(&receive_cond); + pthread_mutex_unlock(&receive_lock); /* We'll need a new packet */ p = 0; - pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&lock); } } -/** @brief Return true if @p p contains @p timestamp */ +/** @brief Return true if @p p contains @p timestamp + * + * Containment implies that a sample @p timestamp exists within the packet. + */ static inline int contains(const struct packet *p, uint32_t timestamp) { const uint32_t packet_start = p->timestamp; const uint32_t packet_end = p->timestamp + p->nsamples; @@ -365,6 +477,43 @@ static inline int contains(const struct packet *p, uint32_t timestamp) { && lt(timestamp, packet_end)); } +/** @brief Wait until the buffer is adequately full + * + * Must be called with @ref lock held. + */ +static void fill_buffer(void) { + while(nsamples) + drop_first_packet(); + info("Buffering..."); + while(nsamples < readahead) + pthread_cond_wait(&cond, &lock); + next_timestamp = pheap_first(&packets)->timestamp; + active = 1; +} + +/** @brief Find next packet + * @return Packet to play or NULL if none found + * + * The return packet is merely guaranteed not to be in the past: it might be + * the first packet in the future rather than one that is actually suitable to + * play. + * + * Must be called with @ref lock held. + */ +static struct packet *next_packet(void) { + while(pheap_count(&packets)) { + struct packet *const p = pheap_first(&packets); + if(le(p->timestamp + p->nsamples, next_timestamp)) { + /* This packet is in the past. Drop it and try another one. 
*/ + drop_first_packet(); + } else + /* This packet is NOT in the past. (It might be in the future + * however.) */ + return p; + } + return 0; +} + #if HAVE_COREAUDIO_AUDIOHARDWARE_H /** @brief Callback from Core Audio */ static OSStatus adioproc @@ -377,33 +526,16 @@ static OSStatus adioproc void attribute((unused)) *inClientData) { UInt32 nbuffers = outOutputData->mNumberBuffers; AudioBuffer *ab = outOutputData->mBuffers; - const struct packet *p; uint32_t samples_available; - struct timeval in, out; - gettimeofday(&in, 0); pthread_mutex_lock(&lock); while(nbuffers > 0) { float *samplesOut = ab->mData; size_t samplesOutLeft = ab->mDataByteSize / sizeof (float); while(samplesOutLeft > 0) { - /* Look for a suitable packet, dropping any unsuitable ones along the - * way. Unsuitable packets are ones that are in the past. */ - while(pheap_count(&packets)) { - p = pheap_first(&packets); - if(le(p->timestamp + p->nsamples, next_timestamp)) - /* This packet is in the past. Drop it and try another one. */ - drop_first_packet(); - else - /* This packet is NOT in the past. (It might be in the future - * however.) */ - break; - } - p = pheap_count(&packets) ? pheap_first(&packets) : 0; + const struct packet *p = next_packet(); if(p && contains(p, next_timestamp)) { - if(p->flags & IDLE) - fprintf(stderr, "\nIDLE\n"); /* This packet is ready to play */ const uint32_t packet_end = p->timestamp + p->nsamples; const uint32_t offset = next_timestamp - p->timestamp; @@ -418,7 +550,6 @@ static OSStatus adioproc *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); /* We don't bother junking the packet - that'll be dealt with next time * round */ - write(2, ".", 1); } else { /* No packet is ready to play (and there might be no packet at all) */ samples_available = p ? 
p->timestamp - next_timestamp @@ -430,20 +561,12 @@ static OSStatus adioproc next_timestamp += samples_available; samplesOut += samples_available; samplesOutLeft -= samples_available; - write(2, "?", 1); } } ++ab; --nbuffers; } pthread_mutex_unlock(&lock); - gettimeofday(&out, 0); - { - static double max; - double thistime = (out.tv_sec - in.tv_sec) + (out.tv_usec - in.tv_usec) / 1000000.0; - if(thistime > max) - fprintf(stderr, "adioproc: %8.8fs\n", max = thistime); - } return 0; } #endif @@ -531,7 +654,7 @@ static void wait_alsa(void) { } } -/** @brief Play some sound +/** @brief Play some sound via ALSA * @param s Pointer to sample data * @param n Number of samples * @return 0 on success, -1 on non-fatal error @@ -564,7 +687,6 @@ static int alsa_writei(const void *s, size_t n) { * @return 0 on success, -1 on non-fatal error */ static int alsa_play(const struct packet *p) { - write(2, ".", 1); return alsa_writei(p->samples_raw + next_timestamp - p->timestamp, (p->timestamp + p->nsamples) - next_timestamp); } @@ -579,7 +701,6 @@ static int alsa_infill(const struct packet *p) { if(p && samples_available > p->timestamp - next_timestamp) samples_available = p->timestamp - next_timestamp; - write(2, "?", 1); return alsa_writei(zeros, samples_available); } @@ -601,41 +722,6 @@ static void alsa_reset(int hard_reset) { } #endif -/** @brief Wait until the buffer is adequately full - * - * Must be called with @ref lock held. - */ -static void fill_buffer(void) { - info("Buffering..."); - while(nsamples < readahead) - pthread_cond_wait(&cond, &lock); - next_timestamp = pheap_first(&packets)->timestamp; - active = 1; -} - -/** @brief Find next packet - * @return Packet to play or NULL if none found - * - * The return packet is merely guaranteed not to be in the past: it might be - * the first packet in the future rather than one that is actually suitable to - * play. - * - * Must be called with @ref lock held. 
- */ -static struct packet *next_packet(void) { - while(pheap_count(&packets)) { - struct packet *const p = pheap_first(&packets); - if(le(p->timestamp + p->nsamples, next_timestamp)) { - /* This packet is in the past. Drop it and try another one. */ - drop_first_packet(); - } else - /* This packet is NOT in the past. (It might be in the future - * however.) */ - return p; - } - return 0; -} - /** @brief Play an RTP stream * * This is the guts of the program. It is responsible for: @@ -650,6 +736,8 @@ static void play_rtp(void) { /* We receive and convert audio data in a background thread */ pthread_create(<id, 0, listen_thread, 0); + /* We have a second thread to add received packets to the queue */ + pthread_create(<id, 0, queue_thread, 0); #if API_ALSA { struct packet *p; @@ -670,7 +758,10 @@ static void play_rtp(void) { info("Playing..."); /* Keep playing until the buffer empties out, or ALSA tells us to get * lost */ - while(nsamples >= minbuffer && !escape) { + while((nsamples >= minbuffer + || (nsamples > 0 + && contains(pheap_first(&packets), next_timestamp))) + && !escape) { /* Wait for ALSA to ask us for more data */ pthread_mutex_unlock(&lock); wait_alsa(); @@ -741,7 +832,9 @@ static void play_rtp(void) { if(status) fatal(0, "AudioDeviceStart: %d", (int)status); /* Wait until the buffer empties out */ - while(nsamples >= minbuffer) + while(nsamples >= minbuffer + || (nsamples > 0 + && contains(pheap_first(&packets), next_timestamp))) pthread_cond_wait(&cond, &lock); /* Stop playing for a bit until the buffer re-fills */ status = AudioDeviceStop(adid, adioproc); @@ -765,6 +858,8 @@ static void help(void) { " --min, -m FRAMES Buffer low water mark\n" " --buffer, -b FRAMES Buffer high water mark\n" " --max, -x FRAMES Buffer maximum size\n" + " --rcvbuf, -R BYTES Socket receive buffer size\n" + " --multicast, -M GROUP Join multicast group\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" ); @@ -784,6 +879,11 @@ int main(int 
argc, char **argv) { struct addrinfo *res; struct stringlist sl; char *sockname; + int rcvbuf, target_rcvbuf = 131072; + socklen_t len; + char *multicast_group = 0; + struct ip_mreq mreq; + struct ipv6_mreq mreq6; static const struct addrinfo prefs = { AI_PASSIVE, @@ -798,7 +898,7 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:R:M:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); @@ -808,6 +908,8 @@ int main(int argc, char **argv) { case 'b': readahead = 2 * atol(optarg); break; case 'x': maxbuffer = 2 * atol(optarg); break; case 'L': logfp = fopen(optarg, "w"); break; + case 'R': target_rcvbuf = atoi(optarg); break; + case 'M': multicast_group = optarg; break; default: fatal(0, "invalid option"); } } @@ -828,6 +930,44 @@ int main(int argc, char **argv) { fatal(errno, "error creating socket"); if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0) fatal(errno, "error binding socket to %s", sockname); + if(multicast_group) { + if((n = getaddrinfo(multicast_group, 0, &prefs, &res))) + fatal(0, "getaddrinfo %s: %s", multicast_group, gai_strerror(n)); + switch(res->ai_family) { + case PF_INET: + mreq.imr_multiaddr = ((struct sockaddr_in *)res->ai_addr)->sin_addr; + mreq.imr_interface.s_addr = 0; /* use primary interface */ + if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof mreq) < 0) + fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP"); + break; + case PF_INET6: + mreq6.ipv6mr_multiaddr = ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr; + memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface); + if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mreq6, sizeof mreq6) < 0) + fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP"); + break; + default: + fatal(0, "unsupported address family %d", res->ai_family); + 
} + } + len = sizeof rcvbuf; + if(getsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0) + fatal(errno, "error calling getsockopt SO_RCVBUF"); + if(target_rcvbuf > rcvbuf) { + if(setsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, + &target_rcvbuf, sizeof target_rcvbuf) < 0) + error(errno, "error calling setsockopt SO_RCVBUF %d", + target_rcvbuf); + /* We try to carry on anyway */ + else + info("changed socket receive buffer from %d to %d", + rcvbuf, target_rcvbuf); + } else + info("default socket receive buffer %d", rcvbuf); + if(logfp) + info("WARNING: -L option can impact performance"); play_rtp(); return 0; }