/** @brief Timestamp of next packet to play.
*
* This is set to the timestamp of the last packet, plus the number of
- * samples it contained.
+ * samples it contained. Only valid if @ref active is nonzero.
*/
static uint32_t next_timestamp;
+/** @brief True if actively playing
+ *
+ * This is true when playing and false when just buffering. */
+static int active;
+
/** @brief Lock protecting @ref packets */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
};
/** @brief Return true iff a < b in sequence-space arithmetic */
-static inline int lt(const struct packet *a, const struct packet *b) {
- return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000;
+static inline int lt(uint32_t a, uint32_t b) {
+ /* a precedes b iff the difference, taken modulo 2^32, has its top bit set.
+  * Compare against zero so the result is a plain 0/1 rather than relying on
+  * the implementation-defined conversion of 0x80000000 to int. */
+ return ((uint32_t)(a - b) & 0x80000000) != 0;
}
-/** Background thread collecting samples
+/** @brief Background thread collecting samples
*
* This function collects samples, perhaps converts them to the target format,
* and adds them to the packet list. */
static void *listen_thread(void attribute((unused)) *arg) {
- struct packet *f = 0, **ff;
+ struct packet *p = 0, **pp;
int n;
union {
struct rtp_header header;
+ sizeof (struct rtp_header));
for(;;) {
- if(!f)
- f = xmalloc(sizeof *f);
+ if(!p)
+ p = xmalloc(sizeof *p);
n = read(rtpfd, packet.bytes, sizeof packet.bytes);
if(n < 0) {
switch(errno) {
/* Ignore too-short packets */
if((size_t)n <= sizeof (struct rtp_header))
continue;
+ p->nused = 0;
+ p->timestamp = ntohl(packet.header.timestamp);
+ /* Ignore packets in the past */
+ if(active && lt(p->timestamp, next_timestamp))
+ continue;
/* Convert to target format */
switch(packet.header.mpt & 0x7F) {
case 10:
- f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
+ p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
/* Convert to what Core Audio expects */
- for(n = 0; n < f->nsamples; ++n)
- f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
+ for(n = 0; n < p->nsamples; ++n)
+ p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
#else
/* ALSA can do any necessary conversion itself (though it might be better
* to do any necessary conversion in the background) */
- memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header));
+ memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header));
#endif
break;
/* TODO support other RFC3551 media types (when the speaker does) */
fatal(0, "unsupported RTP payload type %d",
packet.header.mpt & 0x7F);
}
- f->nused = 0;
- f->timestamp = ntohl(packet.header.timestamp);
pthread_mutex_lock(&lock);
/* Stop reading if we've reached the maximum.
*
* out of order then we guarantee dropouts. But for now... */
while(nsamples >= MAXBUFFER)
pthread_cond_wait(&cond, &lock);
- for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next)
+ for(pp = &packets;
+ *pp && lt((*pp)->timestamp, p->timestamp);
+ pp = &(*pp)->next)
;
- /* So now either !*ff or *ff >= f */
- if(*ff && f->timestamp == (*ff)->timestamp) {
- /* *ff == f; a duplicate. Ideally we avoid the translation step here,
+ /* So now either !*pp or *pp >= p */
+ if(*pp && p->timestamp == (*pp)->timestamp) {
+ /* *pp == p; a duplicate. Ideally we avoid the translation step here,
* but we'll worry about that another time. */
- free(f);
} else {
- f->next = *ff;
- *ff = f;
- nsamples += f->nsamples;
+ p->next = *pp;
+ *pp = p;
+ nsamples += p->nsamples;
pthread_cond_broadcast(&cond);
+ p = 0; /* we've consumed this packet */
}
pthread_mutex_unlock(&lock);
- f = 0;
}
}
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
+/** @brief Callback from Core Audio */
static OSStatus adioproc(AudioDeviceID inDevice,
const AudioTimeStamp *inNow,
const AudioBufferList *inInputData,
while(packets && nbuffers > 0) {
if(packets->used == packets->nsamples) {
/* TODO if we dropped a packet then we should introduce a gap here */
- struct packet *const f = packets;
- packets = f->next;
- free(f);
+ struct packet *const p = packets;
+ packets = p->next;
+ free(p);
pthread_cond_broadcast(&cond);
continue;
}
}
#endif
+/** @brief Play an RTP stream
+ *
+ * This is the guts of the program. It is responsible for:
+ * - starting the listening thread
+ * - opening the audio device
+ * - reading ahead to build up a buffer
+ * - arranging for audio to be played
+ * - detecting when the buffer has got too small and re-buffering
+ */
static void play_rtp(void) {
pthread_t ltid;
fatal(0, "error calling snd_pcm_prepare: %d", err);
prepared = 1;
}
+ /* Start at the first available packet */
+ next_timestamp = packets->timestamp;
+ active = 1;
/* Wait until the buffer empties out */
while(nsamples >= MINBUFFER) {
/* Wait for ALSA to ask us for more data */
pthread_mutex_unlock(&lock);
snd_pcm_wait(pcm, -1);
pthread_mutex_lock(&lock);
- /* ALSA wants more data */
+ /* ALSA is ready for more data */
if(packets && packets->timestamp + packets->nused == next_timestamp) {
/* Hooray, we have a packet we can play */
const size_t samples_available = packets->nsamples - packets->nused;
packets->nused += samples_written;
next_timestamp += samples_written;
if(packets->nused == packets->nsamples) {
- struct packet *f = packets;
+ /* We're done with this packet */
+ struct packet *p = packets;
- packets = f->next;
- nsamples -= f->nsamples;
- free(f);
+ packets = p->next;
+ nsamples -= p->nsamples;
+ free(p);
pthread_cond_broadcast(&cond);
}
} else {
next_timestamp += samples_written;
}
}
+ active = 0;
/* We stop playing for a bit until the buffer re-fills */
pthread_mutex_unlock(&lock);
if((err = snd_pcm_drain(pcm)))