X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/blobdiff_plain/e83d0967d4c0965eb8036248acc20d1bf12ad1d8..bfd27c143e12fd330d30f444fcff72a21cfaf5a7:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index 7ec35d0..33623ab 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -17,6 +17,36 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA */ +/** @file clients/playrtp.c + * @brief RTP player + * + * This player supports Linux (ALSA) + * and Apple Mac (Core Audio) + * systems. There is no support for Microsoft Windows yet, and that will in + * fact probably be an entirely separate program. + * + * The program runs (at least) three threads. listen_thread() is responsible + * for reading RTP packets off the wire and adding them to the linked list @ref + * received_packets, assuming they are basically sound. queue_thread() takes + * packets off this linked list and adds them to @ref packets (an operation + * which might be much slower due to contention for @ref lock). + * + * The main thread is responsible for actually playing audio. In ALSA this + * means it waits until ALSA says it's ready for more audio which it then + * plays. + * + * In Core Audio the main thread is only responsible for starting and stopping + * play: the system does the actual playback in its own private thread, and + * calls adioproc() to fetch the audio data. + * + * Sometimes it happens that there is no audio available to play. This may be + * because the server went away, or a packet was dropped, or the server + * deliberately did not send any sound because it encountered a silence. 
+ * + * Assumptions: + * - it is safe to read uint32_t values without a lock protecting them + */ #include #include "types.h" @@ -29,6 +59,9 @@ #include #include #include +#include +#include +#include #include "log.h" #include "mem.h" @@ -36,68 +69,340 @@ #include "addr.h" #include "syscalls.h" #include "rtp.h" -#include "debug.h" +#include "defs.h" +#include "vector.h" +#include "heap.h" +#include "timeval.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include #endif +#if API_ALSA +#include +#endif +#define readahead linux_headers_are_borked + +/** @brief RTP socket */ static int rtpfd; -#define MAXSAMPLES 2048 /* max samples/frame we'll support */ -/* NB two channels = two samples in this program! */ -#define MINBUFFER 8820 /* when to stop playing */ -#define READAHEAD 88200 /* how far to read ahead */ -#define MAXBUFFER (3 * 88200) /* maximum buffer contents */ - -struct frame { - struct frame *next; /* another frame */ - int nsamples; /* number of samples */ - int nused; /* number of samples used so far */ - uint32_t timestamp; /* timestamp from packet */ -#if HAVE_COREAUDIO_AUDIOHARDWARE_H - float samples[MAXSAMPLES]; /* converted sample data */ -#endif +/** @brief Log output */ +static FILE *logfp; + +/** @brief Output device */ +static const char *device; + +/** @brief Maximum samples per packet we'll support + * + * NB that two channels = two samples in this program. + */ +#define MAXSAMPLES 2048 + +/** @brief Minimum low watermark + * + * We'll stop playing if there's only this many samples in the buffer. */ +static unsigned minbuffer = 2 * 44100 / 10; /* 0.2 seconds */ + +/** @brief Buffer high watermark + * + * We'll only start playing when this many samples are available. */ +static unsigned readahead = 2 * 2 * 44100; + +/** @brief Maximum buffer size + * + * We'll stop reading from the network if we have this many samples. 
*/ +static unsigned maxbuffer; + +/** @brief Number of samples to infill by in one go + * + * This is an upper bound - in practice we expect the underlying audio API to + * only ask for a much smaller number of samples in any one go. + */ +#define INFILL_SAMPLES (44100 * 2) /* 1s */ + +/** @brief Received packet + * + * Received packets are kept in a binary heap (see @ref pheap) ordered by + * timestamp. + */ +struct packet { + /** @brief Next packet in @ref next_free_packet or @ref received_packets */ + struct packet *next; + + /** @brief Number of samples in this packet */ + uint32_t nsamples; + + /** @brief Timestamp from RTP packet + * + * NB that "timestamps" are really sample counters. Use lt() or lt_packet() + * to compare timestamps. + */ + uint32_t timestamp; + + /** @brief Flags + * + * Valid values are: + * - @ref IDLE - the idle bit was set in the RTP packet + */ + unsigned flags; +/** @brief idle bit set in RTP packet*/ +#define IDLE 0x0001 + + /** @brief Raw sample data + * + * Only the first @p nsamples samples are defined; the rest is uninitialized + * data. + */ + uint16_t samples_raw[MAXSAMPLES]; }; -static unsigned long nsamples; /* total samples available */ +/** @brief Return true iff \f$a < b\f$ in sequence-space arithmetic + * + * Specifically it returns true if \f$(a-b) mod 2^{32} \ge 2^{31}\f$. + * + * See also lt_packet(). 
+ */ +static inline int lt(uint32_t a, uint32_t b) { + return (uint32_t)(a - b) & 0x80000000; +} + +/** @brief Return true iff a >= b in sequence-space arithmetic */ +static inline int ge(uint32_t a, uint32_t b) { + return !lt(a, b); +} + +/** @brief Return true iff a > b in sequence-space arithmetic */ +static inline int gt(uint32_t a, uint32_t b) { + return lt(b, a); +} + +/** @brief Return true iff a <= b in sequence-space arithmetic */ +static inline int le(uint32_t a, uint32_t b) { + return !lt(b, a); +} + +/** @brief Ordering for packets, used by @ref pheap */ +static inline int lt_packet(const struct packet *a, const struct packet *b) { + return lt(a->timestamp, b->timestamp); +} + +/** @brief Received packets + * Protected by @ref receive_lock + * + * Received packets are added to this list, and queue_thread() picks them off + * it and adds them to @ref packets. Whenever a packet is added to it, @ref + * receive_cond is signalled. + */ +static struct packet *received_packets; + +/** @brief Tail of @ref received_packets + * Protected by @ref receive_lock + */ +static struct packet **received_tail = &received_packets; + +/** @brief Lock protecting @ref received_packets + * + * Only listen_thread() and queue_thread() ever hold this lock. It is vital + * that queue_thread() not hold it any longer than it strictly has to. */ +static pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER; + +/** @brief Condition variable signalled when @ref received_packets is updated + * + * Used by listen_thread() to notify queue_thread() that it has added another + * packet to @ref received_packets. 
*/ +static pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER; + +/** @brief Length of @ref received_packets */ +static uint32_t nreceived; + +/** @struct pheap + * @brief Binary heap of packets ordered by timestamp */ +HEAP_TYPE(pheap, struct packet *, lt_packet); -static struct frame *frames; /* received frames in ascending order - * of timestamp */ +/** @brief Binary heap of received packets */ +static struct pheap packets; + +/** @brief Total number of samples available + * + * We make this volatile because we inspect it without a protecting lock, + * so the usual pthread_* guarantees aren't available. + */ +static volatile uint32_t nsamples; + +/** @brief Timestamp of next packet to play. + * + * This is set to the timestamp of the last packet, plus the number of + * samples it contained. Only valid if @ref active is nonzero. + */ +static uint32_t next_timestamp; + +/** @brief True if actively playing + * + * This is true when playing and false when just buffering. */ +static int active; + +/** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -/* lock protecting frame list */ -static pthread_cond_t cond = PTHREAD_CONDVAR_INITIALIZER; -/* signalled whenever we add a new frame */ +/** @brief Condition variable signalled whenever @ref packets is changed */ +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + +/** @brief Structure of free packet list */ +union free_packet { + struct packet p; + union free_packet *next; +}; + +/** @brief Linked list of free packets + * + * This is a linked list of formerly used packets. For preference we re-use + * packets that have already been used rather than unused ones, to limit the + * size of the program's working set. If there are no free packets in the list + * we try @ref next_free_packet instead. + * + * Must hold @ref mem_lock when accessing this. 
+ */ +static union free_packet *free_packets; + +/** @brief Array of new free packets + * + * There are @ref count_free_packets ready to use at this address. If there + * are none left we allocate more memory. + * + * Must hold @ref mem_lock when accessing this. + */ +static union free_packet *next_free_packet; + +/** @brief Count of new free packets at @ref next_free_packet + * + * Must hold @ref mem_lock when accessing this. + */ +static size_t count_free_packets; + +/** @brief Lock protecting packet allocator */ +static pthread_mutex_t mem_lock = PTHREAD_MUTEX_INITIALIZER; static const struct option options[] = { { "help", no_argument, 0, 'h' }, { "version", no_argument, 0, 'V' }, { "debug", no_argument, 0, 'd' }, + { "device", required_argument, 0, 'D' }, + { "min", required_argument, 0, 'm' }, + { "max", required_argument, 0, 'x' }, + { "buffer", required_argument, 0, 'b' }, + { "rcvbuf", required_argument, 0, 'R' }, { 0, 0, 0, 0 } }; -/* Return true iff a > b in sequence-space arithmetic */ -static inline int gt(const struct frame *a, const struct frame *b) { - return (uint32_t)(a->timestamp - b->timestamp) < 0x80000000; +/** @brief Return a new packet */ +static struct packet *new_packet(void) { + struct packet *p; + + pthread_mutex_lock(&mem_lock); + if(free_packets) { + p = &free_packets->p; + free_packets = free_packets->next; + } else { + if(!count_free_packets) { + next_free_packet = xcalloc(1024, sizeof (union free_packet)); + count_free_packets = 1024; + } + p = &(next_free_packet++)->p; + --count_free_packets; + } + pthread_mutex_unlock(&mem_lock); + return p; +} + +/** @brief Free a packet */ +static void free_packet(struct packet *p) { + union free_packet *u = (union free_packet *)p; + pthread_mutex_lock(&mem_lock); + u->next = free_packets; + free_packets = u; + pthread_mutex_unlock(&mem_lock); } -/* Background thread that reads frames over the network and add them to the - * list */ -static listen_thread(void attribute((unused)) *arg) { - struct frame *f 
= 0, **ff; - int n, i; - union { - struct rtp_header header; - uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)]; - } packet; - const uint16_t *const samples = (uint16_t *)(packet.bytes - + sizeof (struct rtp_header)); +/** @brief Drop the first packet + * + * Assumes that @ref lock is held. + */ +static void drop_first_packet(void) { + if(pheap_count(&packets)) { + struct packet *const p = pheap_remove(&packets); + nsamples -= p->nsamples; + free_packet(p); + pthread_cond_broadcast(&cond); + } +} + +/** @brief Background thread adding packets to heap + * + * This just transfers packets from @ref received_packets to @ref packets. It + * is important that it holds @ref receive_lock for as little time as possible, + * in order to minimize the interval between calls to read() in + * listen_thread(). + */ +static void *queue_thread(void attribute((unused)) *arg) { + struct packet *p; for(;;) { - if(!f) - f = xmalloc(sizeof *f); - n = read(rtpfd, packet.bytes, sizeof packet.bytes); + /* Get the next packet */ + pthread_mutex_lock(&receive_lock); + while(!received_packets) + pthread_cond_wait(&receive_cond, &receive_lock); + p = received_packets; + received_packets = p->next; + if(!received_packets) + received_tail = &received_packets; + --nreceived; + pthread_mutex_unlock(&receive_lock); + /* Add it to the heap */ + pthread_mutex_lock(&lock); + pheap_insert(&packets, p); + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); + pthread_mutex_unlock(&lock); + } +} + +/** @brief Background thread collecting samples + * + * This function collects samples, perhaps converts them to the target format, + * and adds them to the packet list. + * + * It is crucial that the gap between successive calls to read() is as small as + * possible: otherwise packets will be dropped. 
+ * + * We use a binary heap to ensure that the unavoidable effort is at worst + * logarithmic in the total number of packets - in fact if packets are mostly + * received in order then we will largely do constant work per packet since the + * newest packet will always be last. + * + * Of more concern is that we must acquire the lock on the heap to add a packet + * to it. If this proves a problem in practice then the answer would be + * (probably doubly) linked list with new packets added the end and a second + * thread which reads packets off the list and adds them to the heap. + * + * We keep memory allocation (mostly) very fast by keeping pre-allocated + * packets around; see @ref new_packet(). + */ +static void *listen_thread(void attribute((unused)) *arg) { + struct packet *p = 0; + int n; + struct rtp_header header; + uint16_t seq; + uint32_t timestamp; + struct iovec iov[2]; + + for(;;) { + if(!p) + p = new_packet(); + iov[0].iov_base = &header; + iov[0].iov_len = sizeof header; + iov[1].iov_base = p->samples_raw; + iov[1].iov_len = sizeof p->samples_raw / sizeof *p->samples_raw; + n = readv(rtpfd, iov, 2); if(n < 0) { switch(errno) { case EINTR: @@ -106,93 +411,373 @@ static listen_thread(void attribute((unused)) *arg) { fatal(errno, "error reading from socket"); } } -#if HAVE_COREAUDIO_AUDIOHARDWARE_H + /* Ignore too-short packets */ + if((size_t)n <= sizeof (struct rtp_header)) { + info("ignored a short packet"); + continue; + } + timestamp = htonl(header.timestamp); + seq = htons(header.seq); + /* Ignore packets in the past */ + if(active && lt(timestamp, next_timestamp)) { + info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32, + timestamp, next_timestamp); + continue; + } + p->next = 0; + p->flags = 0; + p->timestamp = timestamp; /* Convert to target format */ - switch(packet.header.mtp & 0x7F) { + if(header.mpt & 0x80) + p->flags |= IDLE; + switch(header.mpt & 0x7F) { case 10: - f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); - 
for(i = 0; i < f->nsamples; ++i) - f->samples[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767); + p->nsamples = (n - sizeof header) / sizeof(uint16_t); break; /* TODO support other RFC3551 media types (when the speaker does) */ default: - fatal(0, "unsupported RTP payload type %d", - packet.header.mpt & 0x7F); + fatal(0, "unsupported RTP payload type %d", + header.mpt & 0x7F); } -#endif - f->used = 0; - f->timestamp = ntohl(packet.header.timestamp); - pthread_mutex_lock(&lock); - /* Stop reading if we've reached the maximum */ - while(nsamples >= MAXBUFFER) - pthread_cond_wait(&cond, &lock); - for(ff = &frames; *ff && !gt(*ff, f); ff = &(*ff)->next) - ; - f->next = *ff; - *ff = f; - nsamples += f->nsamples; - pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&lock); - f = 0; + if(logfp) + fprintf(logfp, "sequence %u timestamp %"PRIx32" length %"PRIx32" end %"PRIx32"\n", + seq, timestamp, p->nsamples, timestamp + p->nsamples); + /* Stop reading if we've reached the maximum. + * + * This is rather unsatisfactory: it means that if packets get heavily + * out of order then we guarantee dropouts. But for now... */ + if(nsamples >= maxbuffer) { + pthread_mutex_lock(&lock); + while(nsamples >= maxbuffer) + pthread_cond_wait(&cond, &lock); + pthread_mutex_unlock(&lock); + } + /* Add the packet to the receive queue */ + pthread_mutex_lock(&receive_lock); + *received_tail = p; + received_tail = &p->next; + ++nreceived; + pthread_cond_signal(&receive_cond); + pthread_mutex_unlock(&receive_lock); + /* We'll need a new packet */ + p = 0; + } +} + +/** @brief Return true if @p p contains @p timestamp + * + * Containment implies that a sample @p timestamp exists within the packet. 
+ */ +static inline int contains(const struct packet *p, uint32_t timestamp) { + const uint32_t packet_start = p->timestamp; + const uint32_t packet_end = p->timestamp + p->nsamples; + + return (ge(timestamp, packet_start) + && lt(timestamp, packet_end)); +} + +/** @brief Wait until the buffer is adequately full + * + * Must be called with @ref lock held. + */ +static void fill_buffer(void) { + while(nsamples) + drop_first_packet(); + info("Buffering..."); + while(nsamples < readahead) + pthread_cond_wait(&cond, &lock); + next_timestamp = pheap_first(&packets)->timestamp; + active = 1; +} + +/** @brief Find next packet + * @return Packet to play or NULL if none found + * + * The return packet is merely guaranteed not to be in the past: it might be + * the first packet in the future rather than one that is actually suitable to + * play. + * + * Must be called with @ref lock held. + */ +static struct packet *next_packet(void) { + while(pheap_count(&packets)) { + struct packet *const p = pheap_first(&packets); + if(le(p->timestamp + p->nsamples, next_timestamp)) { + /* This packet is in the past. Drop it and try another one. */ + drop_first_packet(); + } else + /* This packet is NOT in the past. (It might be in the future + * however.) 
*/ + return p; } + return 0; } #if HAVE_COREAUDIO_AUDIOHARDWARE_H -static OSStatus adioproc(AudioDeviceID inDevice, - const AudioTimeStamp *inNow, - const AudioBufferList *inInputData, - const AudioTimeStamp *inInputTime, - AudioBufferList *outOutputData, - const AudioTimeStamp *inOutputTime, - void *inClientData) { +/** @brief Callback from Core Audio */ +static OSStatus adioproc + (AudioDeviceID attribute((unused)) inDevice, + const AudioTimeStamp attribute((unused)) *inNow, + const AudioBufferList attribute((unused)) *inInputData, + const AudioTimeStamp attribute((unused)) *inInputTime, + AudioBufferList *outOutputData, + const AudioTimeStamp attribute((unused)) *inOutputTime, + void attribute((unused)) *inClientData) { UInt32 nbuffers = outOutputData->mNumberBuffers; AudioBuffer *ab = outOutputData->mBuffers; - float *samplesOut; /* where to write samples to */ - size_t samplesOutLeft; /* space left */ - size_t samplesInLeft; - size_t samplesToCopy; - - pthread_mutex_lock(&lock); - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - while(frames && nbuffers > 0) { - if(frames->used == frames->nsamples) { - /* TODO if we dropped a packet then we should introduce a gap here */ - struct frame *const f = frames; - frames = f->next; - free(f); - pthread_cond_broadcast(&cond); - continue; - } - if(samplesOutLeft == 0) { - --nbuffers; - ++ab; - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - continue; + uint32_t samples_available; + + pthread_mutex_lock(&lock); + while(nbuffers > 0) { + float *samplesOut = ab->mData; + size_t samplesOutLeft = ab->mDataByteSize / sizeof (float); + + while(samplesOutLeft > 0) { + const struct packet *p = next_packet(); + if(p && contains(p, next_timestamp)) { + /* This packet is ready to play */ + const uint32_t packet_end = p->timestamp + p->nsamples; + const uint32_t offset = next_timestamp - p->timestamp; + const uint16_t *ptr = (void *)(p->samples_raw + offset); + + 
samples_available = packet_end - next_timestamp; + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + next_timestamp += samples_available; + samplesOutLeft -= samples_available; + while(samples_available-- > 0) + *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); + /* We don't bother junking the packet - that'll be dealt with next time + * round */ + } else { + /* No packet is ready to play (and there might be no packet at all) */ + samples_available = p ? p->timestamp - next_timestamp + : samplesOutLeft; + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + //info("infill by %"PRIu32, samples_available); + /* Conveniently the buffer is 0 to start with */ + next_timestamp += samples_available; + samplesOut += samples_available; + samplesOutLeft -= samples_available; + } } - /* Now: (1) there is some data left to read - * (2) there is some space to put it */ - samplesInLeft = frames->nsamples - frames->used; - samplesToCopy = (samplesInLeft < samplesOutLeft - ? 
samplesInLeft : samplesOutLeft); - memcpy(samplesOut, frame->samples + frames->used, samplesToCopy); - frames->used += samplesToCopy; - samplesOut += samplesToCopy; - samesOutLeft -= samplesToCopy; + ++ab; + --nbuffers; } pthread_mutex_unlock(&lock); return 0; } #endif -void play_rtp(void) { - pthread_t lt; + +#if API_ALSA +/** @brief PCM handle */ +static snd_pcm_t *pcm; + +/** @brief True when @ref pcm is up and running */ +static int alsa_prepared = 1; + +/** @brief Initialize @ref pcm */ +static void setup_alsa(void) { + snd_pcm_hw_params_t *hwparams; + snd_pcm_sw_params_t *swparams; + /* Only support one format for now */ + const int sample_format = SND_PCM_FORMAT_S16_BE; + unsigned rate = 44100; + const int channels = 2; + const int samplesize = channels * sizeof(uint16_t); + snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3; + /* If we can write more than this many samples we'll get a wakeup */ + const int avail_min = 256; + int err; + + /* Open ALSA */ + if((err = snd_pcm_open(&pcm, + device ? 
device : "default", + SND_PCM_STREAM_PLAYBACK, + SND_PCM_NONBLOCK))) + fatal(0, "error from snd_pcm_open: %d", err); + /* Set up 'hardware' parameters */ + snd_pcm_hw_params_alloca(&hwparams); + if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0) + fatal(0, "error from snd_pcm_hw_params_any: %d", err); + if((err = snd_pcm_hw_params_set_access(pcm, hwparams, + SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_access: %d", err); + if((err = snd_pcm_hw_params_set_format(pcm, hwparams, + sample_format)) < 0) + + fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d", + sample_format, err); + if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d", + rate, err); + if((err = snd_pcm_hw_params_set_channels(pcm, hwparams, + channels)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d", + channels, err); + if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams, + &pcm_bufsize)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d", + MAXSAMPLES * samplesize * 3, err); + if((err = snd_pcm_hw_params(pcm, hwparams)) < 0) + fatal(0, "error calling snd_pcm_hw_params: %d", err); + /* Set up 'software' parameters */ + snd_pcm_sw_params_alloca(&swparams); + if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params_current: %d", err); + if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0) + fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d", + avail_min, err); + if((err = snd_pcm_sw_params(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params: %d", err); +} + +/** @brief Wait until ALSA wants some audio */ +static void wait_alsa(void) { + struct pollfd fds[64]; + int nfds, err; + unsigned short events; + + for(;;) { + do { + if((nfds = snd_pcm_poll_descriptors(pcm, + fds, sizeof fds / sizeof *fds)) < 0) + fatal(0, "error calling 
snd_pcm_poll_descriptors: %d", nfds); + } while(poll(fds, nfds, -1) < 0 && errno == EINTR); + if((err = snd_pcm_poll_descriptors_revents(pcm, fds, nfds, &events))) + fatal(0, "error calling snd_pcm_poll_descriptors_revents: %d", err); + if(events & POLLOUT) + return; + } +} + +/** @brief Play some sound via ALSA + * @param s Pointer to sample data + * @param n Number of samples + * @return 0 on success, -1 on non-fatal error + */ +static int alsa_writei(const void *s, size_t n) { + /* Do the write */ + const snd_pcm_sframes_t frames_written = snd_pcm_writei(pcm, s, n / 2); + if(frames_written < 0) { + /* Something went wrong */ + switch(frames_written) { + case -EAGAIN: + return 0; + case -EPIPE: + error(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + return -1; + default: + fatal(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + } + } else { + /* Success */ + next_timestamp += frames_written * 2; + return 0; + } +} + +/** @brief Play the relevant part of a packet + * @param p Packet to play + * @return 0 on success, -1 on non-fatal error + */ +static int alsa_play(const struct packet *p) { + return alsa_writei(p->samples_raw + next_timestamp - p->timestamp, + (p->timestamp + p->nsamples) - next_timestamp); +} + +/** @brief Play some silence + * @param p Next packet or NULL + * @return 0 on success, -1 on non-fatal error + */ +static int alsa_infill(const struct packet *p) { + static const uint16_t zeros[INFILL_SAMPLES]; + size_t samples_available = INFILL_SAMPLES; + + if(p && samples_available > p->timestamp - next_timestamp) + samples_available = p->timestamp - next_timestamp; + return alsa_writei(zeros, samples_available); +} + +/** @brief Reset ALSA state after we lost synchronization */ +static void alsa_reset(int hard_reset) { + int err; + + if((err = snd_pcm_nonblock(pcm, 0))) + fatal(0, "error calling snd_pcm_nonblock: %d", err); + if(hard_reset) { + if((err = snd_pcm_drop(pcm))) + fatal(0, "error calling snd_pcm_drop: 
%d", err); + } else + if((err = snd_pcm_drain(pcm))) + fatal(0, "error calling snd_pcm_drain: %d", err); + if((err = snd_pcm_nonblock(pcm, 1))) + fatal(0, "error calling snd_pcm_nonblock: %d", err); + alsa_prepared = 0; +} +#endif + +/** @brief Play an RTP stream + * + * This is the guts of the program. It is responsible for: + * - starting the listening thread + * - opening the audio device + * - reading ahead to build up a buffer + * - arranging for audio to be played + * - detecting when the buffer has got too small and re-buffering + */ +static void play_rtp(void) { + pthread_t ltid; /* We receive and convert audio data in a background thread */ - pthread_create(&lt, 0, listen_thread, 0); + pthread_create(&ltid, 0, listen_thread, 0); + /* We have a second thread to add received packets to the queue */ + pthread_create(&ltid, 0, queue_thread, 0); #if API_ALSA - assert(!"implemented"); + { + struct packet *p; + int escape, err; + + /* Open the sound device */ + setup_alsa(); + pthread_mutex_lock(&lock); + for(;;) { + /* Wait for the buffer to fill up a bit */ + fill_buffer(); + if(!alsa_prepared) { + if((err = snd_pcm_prepare(pcm))) + fatal(0, "error calling snd_pcm_prepare: %d", err); + alsa_prepared = 1; + } + escape = 0; + info("Playing..."); + /* Keep playing until the buffer empties out, or ALSA tells us to get + * lost */ + while(nsamples >= minbuffer && !escape) { + /* Wait for ALSA to ask us for more data */ + pthread_mutex_unlock(&lock); + wait_alsa(); + pthread_mutex_lock(&lock); + /* ALSA is ready for more data, find something to play */ + p = next_packet(); + /* Play it or play some silence */ + if(contains(p, next_timestamp)) + escape = alsa_play(p); + else + escape = alsa_infill(p); + } + active = 0; + /* We stop playing for a bit until the buffer re-fills */ + pthread_mutex_unlock(&lock); + alsa_reset(escape); + pthread_mutex_lock(&lock); + } + + } #elif HAVE_COREAUDIO_AUDIOHARDWARE_H { OSStatus status; @@ -218,14 +803,14 @@ void play_rtp(void) { 
if(status) fatal(0, "AudioHardwareGetProperty: %d", (int)status); D(("mSampleRate %f", asbd.mSampleRate)); - D(("mFormatID %08"PRIx32, asbd.mFormatID)); - D(("mFormatFlags %08"PRIx32, asbd.mFormatFlags)); - D(("mBytesPerPacket %08"PRIx32, asbd.mBytesPerPacket)); - D(("mFramesPerPacket %08"PRIx32, asbd.mFramesPerPacket)); - D(("mBytesPerFrame %08"PRIx32, asbd.mBytesPerFrame)); - D(("mChannelsPerFrame %08"PRIx32, asbd.mChannelsPerFrame)); - D(("mBitsPerChannel %08"PRIx32, asbd.mBitsPerChannel)); - D(("mReserved %08"PRIx32, asbd.mReserved)); + D(("mFormatID %08lx", asbd.mFormatID)); + D(("mFormatFlags %08lx", asbd.mFormatFlags)); + D(("mBytesPerPacket %08lx", asbd.mBytesPerPacket)); + D(("mFramesPerPacket %08lx", asbd.mFramesPerPacket)); + D(("mBytesPerFrame %08lx", asbd.mBytesPerFrame)); + D(("mChannelsPerFrame %08lx", asbd.mChannelsPerFrame)); + D(("mBitsPerChannel %08lx", asbd.mBitsPerChannel)); + D(("mReserved %08lx", asbd.mReserved)); if(asbd.mFormatID != kAudioFormatLinearPCM) fatal(0, "audio device does not support kAudioFormatLinearPCM"); status = AudioDeviceAddIOProc(adid, adioproc, 0); @@ -234,19 +819,22 @@ void play_rtp(void) { pthread_mutex_lock(&lock); for(;;) { /* Wait for the buffer to fill up a bit */ - while(nsamples < READAHEAD) - pthread_cond_wait(&cond, &lock); + fill_buffer(); /* Start playing now */ + info("Playing..."); + next_timestamp = pheap_first(&packets)->timestamp; + active = 1; status = AudioDeviceStart(adid, adioproc); if(status) fatal(0, "AudioDeviceStart: %d", (int)status); /* Wait until the buffer empties out */ - while(nsamples >= MINBUFFER) + while(nsamples >= minbuffer) pthread_cond_wait(&cond, &lock); /* Stop playing for a bit until the buffer re-fills */ status = AudioDeviceStop(adid, adioproc); if(status) fatal(0, "AudioDeviceStop: %d", (int)status); + active = 0; /* Go back round */ } } @@ -260,9 +848,14 @@ static void help(void) { xprintf("Usage:\n" " disorder-playrtp [OPTIONS] ADDRESS [PORT]\n" "Options:\n" + " --device, -D 
DEVICE Output device\n" + " --min, -m FRAMES Buffer low water mark\n" + " --buffer, -b FRAMES Buffer high water mark\n" + " --max, -x FRAMES Buffer maximum size\n" + " --rcvbuf, -R BYTES Socket receive buffer size\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" - " --debug, -d Turn on debugging\n"); + ); xfclose(stdout); exit(0); } @@ -278,9 +871,11 @@ int main(int argc, char **argv) { int n; struct addrinfo *res; struct stringlist sl; - const char *sockname; + char *sockname; + int rcvbuf, target_rcvbuf = 131072; + socklen_t len; - static const struct addrinfo prefbind = { + static const struct addrinfo prefs = { AI_PASSIVE, PF_INET, SOCK_DGRAM, @@ -293,14 +888,22 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVd", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:R:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); case 'd': debugging = 1; break; + case 'D': device = optarg; break; + case 'm': minbuffer = 2 * atol(optarg); break; + case 'b': readahead = 2 * atol(optarg); break; + case 'x': maxbuffer = 2 * atol(optarg); break; + case 'L': logfp = fopen(optarg, "w"); break; + case 'R': target_rcvbuf = atoi(optarg); break; default: fatal(0, "invalid option"); } } + if(!maxbuffer) + maxbuffer = 4 * readahead; argc -= optind; argv += optind; if(argc < 1 || argc > 2) @@ -308,7 +911,7 @@ int main(int argc, char **argv) { sl.n = argc; sl.s = argv; /* Listen for inbound audio data */ - if(!(res = get_address(&sl, &pref, &sockname))) + if(!(res = get_address(&sl, &prefs, &sockname))) exit(1); if((rtpfd = socket(res->ai_family, res->ai_socktype, @@ -316,6 +919,22 @@ int main(int argc, char **argv) { fatal(errno, "error creating socket"); if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0) fatal(errno, "error binding socket to %s", sockname); + len = sizeof rcvbuf; + if(getsockopt(rtpfd, 
SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0) + fatal(errno, "error calling getsockopt SO_RCVBUF"); + if(target_rcvbuf > rcvbuf) { + if(setsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, + &target_rcvbuf, sizeof target_rcvbuf) < 0) + error(errno, "error calling setsockopt SO_RCVBUF %d", + target_rcvbuf); + /* We try to carry on anyway */ + else + info("changed socket receive buffer from %d to %d", + rcvbuf, target_rcvbuf); + } else + info("default socket receive buffer %d", rcvbuf); + if(logfp) + info("WARNING: -L option can impact performance"); play_rtp(); return 0; }