From: rjk@greenend.org.uk <> Date: Sun, 16 Sep 2007 18:52:30 +0000 (+0100) Subject: correct next_timestamp logic X-Git-Tag: debian-1_5_99dev8~243^2~65 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/commitdiff_plain/09ee2f0d809da23b6b442233b5c3d94a6e64bdd2?ds=sidebyside correct next_timestamp logic --- diff --git a/clients/playrtp.c b/clients/playrtp.c index 7f2b238..5606fb5 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -108,10 +108,15 @@ static struct packet *packets; /** @brief Timestamp of next packet to play. * * This is set to the timestamp of the last packet, plus the number of - * samples it contained. + * samples it contained. Only valid if @ref active is nonzero. */ static uint32_t next_timestamp; +/** @brief True if actively playing + * + * This is true when playing and false when just buffering. */ +static int active; + /** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; @@ -127,16 +132,16 @@ static const struct option options[] = { }; /** @brief Return true iff a < b in sequence-space arithmetic */ -static inline int lt(const struct packet *a, const struct packet *b) { - return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000; +static inline int lt(uint32_t a, uint32_t b) { + return (uint32_t)(a - b) & 0x80000000; } -/** Background thread collecting samples +/** @brief Background thread collecting samples * * This function collects samples, perhaps converts them to the target format, * and adds them to the packet list. */ static void *listen_thread(void attribute((unused)) *arg) { - struct packet *f = 0, **ff; + struct packet *p = 0, **pp; int n; union { struct rtp_header header; @@ -146,8 +151,8 @@ static void *listen_thread(void attribute((unused)) *arg) { + sizeof (struct rtp_header)); for(;;) { - if(!f) - f = xmalloc(sizeof *f); + if(!p) + p = xmalloc(sizeof *p); n = read(rtpfd, packet.bytes, sizeof packet.bytes); if(n < 0) { switch(errno) { @@ -160,18 +165,23 @@ static void *listen_thread(void attribute((unused)) *arg) { /* Ignore too-short packets */ if((size_t)n <= sizeof (struct rtp_header)) continue; + p->nused = 0; + p->timestamp = ntohl(packet.header.timestamp); + /* Ignore packets in the past */ + if(active && lt(p->timestamp, next_timestamp)) + continue; /* Convert to target format */ switch(packet.header.mpt & 0x7F) { case 10: - f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); + p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); #if HAVE_COREAUDIO_AUDIOHARDWARE_H /* Convert to what Core Audio expects */ - for(n = 0; n < f->nsamples; ++n) - f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); + for(n = 0; n < p->nsamples; ++n) + p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); #else /* ALSA can do any necessary conversion itself (though it might be better * to do any necessary conversion in the background) */ - memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header)); + memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header)); #endif break; /* TODO support other RFC3551 media types (when the speaker does) */ @@ -179,8 +189,6 @@ static void *listen_thread(void attribute((unused)) *arg) { fatal(0, "unsupported RTP payload type %d", packet.header.mpt & 0x7F); } - f->nused = 0; - f->timestamp = ntohl(packet.header.timestamp); pthread_mutex_lock(&lock); /* Stop reading if we've reached the maximum. * @@ -188,25 +196,27 @@ static void *listen_thread(void attribute((unused)) *arg) { * out of order then we guarantee dropouts. But for now... */ while(nsamples >= MAXBUFFER) pthread_cond_wait(&cond, &lock); - for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next) + for(pp = &packets; + *pp && lt((*pp)->timestamp, p->timestamp); + pp = &(*pp)->next) ; - /* So now either !*ff or *ff >= f */ - if(*ff && f->timestamp == (*ff)->timestamp) { - /* *ff == f; a duplicate. Ideally we avoid the translation step here, + /* So now either !*pp or *pp >= p */ + if(*pp && p->timestamp == (*pp)->timestamp) { + /* *pp == p; a duplicate. Ideally we avoid the translation step here, * but we'll worry about that another time. */ - free(f); } else { - f->next = *ff; - *ff = f; - nsamples += f->nsamples; + p->next = *pp; + *pp = p; + nsamples += p->nsamples; pthread_cond_broadcast(&cond); + p = 0; /* we've consumed this packet */ } pthread_mutex_unlock(&lock); - f = 0; } } #if HAVE_COREAUDIO_AUDIOHARDWARE_H +/** @brief Callback from Core Audio */ static OSStatus adioproc(AudioDeviceID inDevice, const AudioTimeStamp *inNow, const AudioBufferList *inInputData, @@ -227,9 +237,9 @@ static OSStatus adioproc(AudioDeviceID inDevice, while(packets && nbuffers > 0) { if(packets->used == packets->nsamples) { /* TODO if we dropped a packet then we should introduce a gap here */ - struct packet *const f = packets; - packets = f->next; - free(f); + struct packet *const p = packets; + packets = p->next; + free(p); pthread_cond_broadcast(&cond); continue; } @@ -255,6 +265,15 @@ static OSStatus adioproc(AudioDeviceID inDevice, } #endif +/** @brief Play an RTP stream + * + * This is the guts of the program. It is responsible for: + * - starting the listening thread + * - opening the audio device + * - reading ahead to build up a buffer + * - arranging for audio to be played + * - detecting when the buffer has got too small and re-buffering + */ static void play_rtp(void) { pthread_t ltid; @@ -330,13 +349,16 @@ static void play_rtp(void) { fatal(0, "error calling snd_pcm_prepare: %d", err); prepared = 1; } + /* Start at the first available packet */ + next_timestamp = packets->timestamp; + active = 1; /* Wait until the buffer empties out */ while(nsamples >= MINBUFFER) { /* Wait for ALSA to ask us for more data */ pthread_mutex_unlock(&lock); snd_pcm_wait(pcm, -1); pthread_mutex_lock(&lock); - /* ALSA wants more data */ + /* ALSA is ready for more data */ if(packets && packets->timestamp + packets->nused == next_timestamp) { /* Hooray, we have a packet we can play */ const size_t samples_available = packets->nsamples - packets->nused; @@ -351,11 +373,12 @@ static void play_rtp(void) { packets->nused += samples_written; next_timestamp += samples_written; if(packets->nused == packets->nsamples) { - struct packet *f = packets; + /* We're done with this packet */ + struct packet *p = packets; - packets = f->next; - nsamples -= f->nsamples; - free(f); + packets = p->next; + nsamples -= p->nsamples; + free(p); pthread_cond_broadcast(&cond); } } else { @@ -373,6 +396,7 @@ static void play_rtp(void) { next_timestamp += samples_written; } } + active = 0; /* We stop playing for a bit until the buffer re-fills */ pthread_mutex_unlock(&lock); if((err = snd_pcm_drain(pcm)))