From 189e98302534f1bf59c7ed0b29ba6dd025b45fc1 Mon Sep 17 00:00:00 2001 Message-Id: <189e98302534f1bf59c7ed0b29ba6dd025b45fc1.1713583052.git.mdw@distorted.org.uk> From: Mark Wooding Date: Sun, 23 Sep 2007 12:54:25 +0100 Subject: [PATCH] separate thread to add to heap Organization: Straylight/Edgeware From: Richard Kettlewell --- clients/playrtp.c | 146 +++++++++++++++++++++++++++++++++------------- lib/timeval.h | 4 +- server/speaker.c | 12 ++-- 3 files changed, 114 insertions(+), 48 deletions(-) diff --git a/clients/playrtp.c b/clients/playrtp.c index d64162b..1ed0d93 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -26,9 +26,11 @@ * systems. There is no support for Microsoft Windows yet, and that will in * fact probably an entirely separate program. * - * The program runs (at least) two threads. listen_thread() is responsible for - * reading RTP packets off the wire and adding them to the binary heap @ref - * packets, assuming they are basically sound. + * The program runs (at least) three threads. listen_thread() is responsible + * for reading RTP packets off the wire and adding them to the linked list @ref + * received_packets, assuming they are basically sound. queue_thread() takes + * packets off this linked list and adds them to @ref packets (an operation + * which might be much slower due to contention for @ref lock). * * The main thread is responsible for actually playing audio. In ALSA this * means it waits until ALSA says it's ready for more audio which it then @@ -41,6 +43,9 @@ * Sometimes it happens that there is no audio available to play. This may * because the server went away, or a packet was dropped, or the server * deliberately did not send any sound because it encountered a silence. + * + * Assumptions: + * - it is safe to read uint32_t values without a lock protecting them */ #include @@ -67,6 +72,7 @@ #include "defs.h" #include "vector.h" #include "heap.h" +#include "timeval.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include @@ -120,6 +126,9 @@ static unsigned maxbuffer; * timestamp. */ struct packet { + /** @brief Next packet in @ref next_free_packet or @ref received_packets */ + struct packet *next; + /** @brief Number of samples in this packet */ uint32_t nsamples; @@ -177,6 +186,35 @@ static inline int lt_packet(const struct packet *a, const struct packet *b) { return lt(a->timestamp, b->timestamp); } +/** @brief Received packets + * Protected by @ref receive_lock + * + * Received packets are added to this list, and queue_thread() picks them off + * it and adds them to @ref packets. Whenever a packet is added to it, @ref + * receive_cond is signalled. + */ +static struct packet *received_packets; + +/** @brief Tail of @ref received_packets + * Protected by @ref receive_lock + */ +static struct packet **received_tail = &received_packets; + +/** @brief Lock protecting @ref received_packets + * + * Only listen_thread() and queue_thread() ever hold this lock. It is vital + * that queue_thread() not hold it any longer than it strictly has to. */ +static pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER; + +/** @brief Condition variable signalled when @ref received_packets is updated + * + * Used by listen_thread() to notify queue_thread() that it has added another + * packet to @ref received_packets. */ +static pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER; + +/** @brief Length of @ref received_packets */ +static uint32_t nreceived; + /** @struct pheap * @brief Binary heap of packets ordered by timestamp */ HEAP_TYPE(pheap, struct packet *, lt_packet); @@ -184,8 +222,12 @@ HEAP_TYPE(pheap, struct packet *, lt_packet); /** @brief Binary heap of received packets */ static struct pheap packets; -/** @brief Total number of samples available */ -static unsigned long nsamples; +/** @brief Total number of samples available + * + * We make this volatile because we inspect it without a protecting lock, + * so the usual pthread_* guarantees aren't available. + */ +static volatile uint32_t nsamples; /** @brief Timestamp of next packet to play. * @@ -199,6 +241,12 @@ static uint32_t next_timestamp; * This is true when playing and false when just buffering. */ static int active; +/** @brief Lock protecting @ref packets */ +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + +/** @brief Condition variable signalled whenever @ref packets is changed */ +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + /** @brief Structure of free packet list */ union free_packet { struct packet p; @@ -231,14 +279,8 @@ static union free_packet *next_free_packet; */ static size_t count_free_packets; -/** @brief Lock protecting @ref packets - * - * This also protects the packet memory allocation infrastructure, @ref - * free_packets and @ref next_free_packet. */ -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; - -/** @brief Condition variable signalled whenever @ref packets is changed */ -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +/** @brief Lock protecting packet allocator */ +static pthread_mutex_t mem_lock = PTHREAD_MUTEX_INITIALIZER; static const struct option options[] = { { "help", no_argument, 0, 'h' }, @@ -251,12 +293,11 @@ static const struct option options[] = { { 0, 0, 0, 0 } }; -/** @brief Return a new packet - * - * Assumes that @ref lock is held. */ +/** @Brief Return a new packet */ static struct packet *new_packet(void) { struct packet *p; - + + pthread_mutex_lock(&mem_lock); if(free_packets) { p = &free_packets->p; free_packets = free_packets->next; @@ -268,16 +309,17 @@ static struct packet *new_packet(void) { p = &(next_free_packet++)->p; --count_free_packets; } + pthread_mutex_unlock(&mem_lock); return p; } -/** @brief Free a packet - * - * Assumes that @ref lock is held. */ +/** @brief Free a packet */ static void free_packet(struct packet *p) { union free_packet *u = (union free_packet *)p; + pthread_mutex_lock(&mem_lock); u->next = free_packets; free_packets = u; + pthread_mutex_unlock(&mem_lock); } /** @brief Drop the first packet @@ -293,6 +335,36 @@ static void drop_first_packet(void) { } } +/** @brief Background thread adding packets to heap + * + * This just transfers packets from @ref received_packets to @ref packets. It + * is important that it holds @ref receive_lock for as little time as possible, + * in order to minimize the interval between calls to read() in + * listen_thread(). + */ +static void *queue_thread(void attribute((unused)) *arg) { + struct packet *p; + + for(;;) { + /* Get the next packet */ + pthread_mutex_lock(&receive_lock); + while(!received_packets) + pthread_cond_wait(&receive_cond, &receive_lock); + p = received_packets; + received_packets = p->next; + if(!received_packets) + received_tail = &received_packets; + --nreceived; + pthread_mutex_unlock(&receive_lock); + /* Add it to the heap */ + pthread_mutex_lock(&lock); + pheap_insert(&packets, p); + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); + pthread_mutex_unlock(&lock); + } +} + /** @brief Background thread collecting samples * * This function collects samples, perhaps converts them to the target format, @@ -323,11 +395,8 @@ static void *listen_thread(void attribute((unused)) *arg) { struct iovec iov[2]; for(;;) { - if(!p) { - pthread_mutex_lock(&lock); + if(!p) p = new_packet(); - pthread_mutex_unlock(&lock); - } iov[0].iov_base = &header; iov[0].iov_len = sizeof header; iov[1].iov_base = p->samples_raw; @@ -354,7 +423,7 @@ static void *listen_thread(void attribute((unused)) *arg) { timestamp, next_timestamp); continue; } - pthread_mutex_lock(&lock); + p->next = 0; p->flags = 0; p->timestamp = timestamp; /* Convert to target format */ @@ -377,18 +446,20 @@ static void *listen_thread(void attribute((unused)) *arg) { * This is rather unsatisfactory: it means that if packets get heavily * out of order then we guarantee dropouts. But for now... */ if(nsamples >= maxbuffer) { - //info("Buffer full"); - write(2, "B", 1); + pthread_mutex_lock(&lock); while(nsamples >= maxbuffer) pthread_cond_wait(&cond, &lock); + pthread_mutex_unlock(&lock); } - /* Add the packet to the heap */ - pheap_insert(&packets, p); - nsamples += p->nsamples; + /* Add the packet to the receive queue */ + pthread_mutex_lock(&receive_lock); + *received_tail = p; + received_tail = &p->next; + ++nreceived; + pthread_cond_signal(&receive_cond); + pthread_mutex_unlock(&receive_lock); /* We'll need a new packet */ p = 0; - pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&lock); } } @@ -461,8 +532,6 @@ static OSStatus adioproc while(samplesOutLeft > 0) { const struct packet *p = next_packet(); if(p && contains(p, next_timestamp)) { - if(p->flags & IDLE) - write(2, "I", 1); /* This packet is ready to play */ const uint32_t packet_end = p->timestamp + p->nsamples; const uint32_t offset = next_timestamp - p->timestamp; @@ -477,7 +546,6 @@ static OSStatus adioproc *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); /* We don't bother junking the packet - that'll be dealt with next time * round */ - write(2, ".", 1); } else { /* No packet is ready to play (and there might be no packet at all) */ samples_available = p ? p->timestamp - next_timestamp @@ -489,7 +557,6 @@ static OSStatus adioproc next_timestamp += samples_available; samplesOut += samples_available; samplesOutLeft -= samples_available; - write(2, "?", 1); } } ++ab; @@ -595,7 +662,6 @@ static int alsa_writei(const void *s, size_t n) { /* Something went wrong */ switch(frames_written) { case -EAGAIN: - write(2, "#", 1); return 0; case -EPIPE: error(0, "error calling snd_pcm_writei: %ld", @@ -617,9 +683,6 @@ static int alsa_writei(const void *s, size_t n) { * @return 0 on success, -1 on non-fatal error */ static int alsa_play(const struct packet *p) { - if(p->flags & IDLE) - write(2, "I", 1); - write(2, ".", 1); return alsa_writei(p->samples_raw + next_timestamp - p->timestamp, (p->timestamp + p->nsamples) - next_timestamp); } @@ -634,7 +697,6 @@ static int alsa_infill(const struct packet *p) { if(p && samples_available > p->timestamp - next_timestamp) samples_available = p->timestamp - next_timestamp; - write(2, "?", 1); return alsa_writei(zeros, samples_available); } @@ -670,6 +732,8 @@ static void play_rtp(void) { /* We receive and convert audio data in a background thread */ pthread_create(<id, 0, listen_thread, 0); + /* We have a second thread to add received packets to the queue */ + pthread_create(<id, 0, queue_thread, 0); #if API_ALSA { struct packet *p; diff --git a/lib/timeval.h b/lib/timeval.h index ff8ccf0..9ec8c31 100644 --- a/lib/timeval.h +++ b/lib/timeval.h @@ -38,8 +38,8 @@ static inline struct timeval tvsub(const struct timeval a, return r; } -static inline uint64_t tvsub_us(const struct timeval a, - const struct timeval b) { +static inline int64_t tvsub_us(const struct timeval a, + const struct timeval b) { return (((uint64_t)a.tv_sec * 1000000 + a.tv_usec) - ((uint64_t)b.tv_sec * 1000000 + b.tv_usec)); } diff --git a/server/speaker.c b/server/speaker.c index c5fc668..f9badeb 100644 --- a/server/speaker.c +++ b/server/speaker.c @@ -107,7 +107,7 @@ #define NETWORK_BYTES 1024 /** @brief Maximum RTP playahead (seconds) */ -#define RTP_AHEAD 2 +#define RTP_AHEAD 1 /** @brief Maximum number of FDs to poll for */ #define NFDS 256 @@ -971,8 +971,8 @@ int main(int argc, char **argv) { struct timeval now; uint64_t target_us; uint64_t target_rtp_time; - const uint64_t ahead = RTP_AHEAD * config->sample_format.rate - * config->sample_format.channels; + const uint64_t samples_ahead = RTP_AHEAD * config->sample_format.rate + * config->sample_format.channels; static unsigned logit; @@ -988,14 +988,16 @@ int main(int argc, char **argv) { * config->sample_format.channels) / 1000000; +#if 0 /* TODO remove logging guff */ if(!(logit++ & 1023)) info("rtp_time %llu target %llu difference %lld [%lld]", rtp_time, target_rtp_time, rtp_time - target_rtp_time, - ahead); + samples_ahead); +#endif if(rtp_time < target_rtp_time - || rtp_time - target_rtp_time < ahead) + || rtp_time - target_rtp_time < samples_ahead) bfd_slot = addfd(bfd, POLLOUT); break; } -- [mdw]