chiark
/
gitweb
/
~mdw
/
disorder
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
0b75463
)
correct next_timestamp logic
author
rjk@greenend.org.uk
<>
Sun, 16 Sep 2007 18:52:30 +0000
(19:52 +0100)
committer
rjk@greenend.org.uk
<>
Sun, 16 Sep 2007 18:52:30 +0000
(19:52 +0100)
clients/playrtp.c
patch
|
blob
|
blame
|
history
diff --git
a/clients/playrtp.c
b/clients/playrtp.c
index 7f2b2385169ad839befb3286eb40e1792513d888..5606fb5194f2d80e195bd6a4a4abea7d7db04aa2 100644
(file)
--- a/
clients/playrtp.c
+++ b/
clients/playrtp.c
@@
-108,10
+108,15
@@
static struct packet *packets;
/** @brief Timestamp of next packet to play.
*
* This is set to the timestamp of the last packet, plus the number of
/** @brief Timestamp of next packet to play.
*
* This is set to the timestamp of the last packet, plus the number of
- * samples it contained.
+ * samples it contained.  Only valid if @ref active is nonzero.
*/
static uint32_t next_timestamp;
*/
static uint32_t next_timestamp;
+/** @brief True if actively playing
+ *
+ * This is true when playing and false when just buffering. */
+static int active;
+
/** @brief Lock protecting @ref packets */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
/** @brief Lock protecting @ref packets */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
@@
-127,16
+132,16
@@
static const struct option options[] = {
};
/** @brief Return true iff a < b in sequence-space arithmetic */
};
/** @brief Return true iff a < b in sequence-space arithmetic */
-static inline int lt(const struct packet *a, const struct packet *b) {
- return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000;
+static inline int lt(uint32_t a, uint32_t b) {
+ return (uint32_t)(a - b) & 0x80000000;
}
}
-/** Background thread collecting samples
+/** @brief Background thread collecting samples
*
* This function collects samples, perhaps converts them to the target format,
* and adds them to the packet list. */
static void *listen_thread(void attribute((unused)) *arg) {
*
* This function collects samples, perhaps converts them to the target format,
* and adds them to the packet list. */
static void *listen_thread(void attribute((unused)) *arg) {
- struct packet *
f = 0, **ff
;
+ struct packet *
p = 0, **pp
;
int n;
union {
struct rtp_header header;
int n;
union {
struct rtp_header header;
@@
-146,8
+151,8
@@
static void *listen_thread(void attribute((unused)) *arg) {
+ sizeof (struct rtp_header));
for(;;) {
+ sizeof (struct rtp_header));
for(;;) {
- if(!
f
)
-
f = xmalloc(sizeof *f
);
+ if(!
p
)
+
p = xmalloc(sizeof *p
);
n = read(rtpfd, packet.bytes, sizeof packet.bytes);
if(n < 0) {
switch(errno) {
n = read(rtpfd, packet.bytes, sizeof packet.bytes);
if(n < 0) {
switch(errno) {
@@
-160,18
+165,23
@@
static void *listen_thread(void attribute((unused)) *arg) {
/* Ignore too-short packets */
if((size_t)n <= sizeof (struct rtp_header))
continue;
/* Ignore too-short packets */
if((size_t)n <= sizeof (struct rtp_header))
continue;
+ p->nused = 0;
+ p->timestamp = ntohl(packet.header.timestamp);
+ /* Ignore packets in the past */
+ if(active && lt(p->timestamp, next_timestamp))
+ continue;
/* Convert to target format */
switch(packet.header.mpt & 0x7F) {
case 10:
/* Convert to target format */
switch(packet.header.mpt & 0x7F) {
case 10:
-
f
->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
+
p
->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t);
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
/* Convert to what Core Audio expects */
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
/* Convert to what Core Audio expects */
- for(n = 0; n <
f
->nsamples; ++n)
-
f
->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
+ for(n = 0; n <
p
->nsamples; ++n)
+
p
->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767);
#else
/* ALSA can do any necessary conversion itself (though it might be better
* to do any necessary conversion in the background) */
#else
/* ALSA can do any necessary conversion itself (though it might be better
* to do any necessary conversion in the background) */
- memcpy(
f
->samples_raw, samples, n - sizeof (struct rtp_header));
+ memcpy(
p
->samples_raw, samples, n - sizeof (struct rtp_header));
#endif
break;
/* TODO support other RFC3551 media types (when the speaker does) */
#endif
break;
/* TODO support other RFC3551 media types (when the speaker does) */
@@
-179,8
+189,6
@@
static void *listen_thread(void attribute((unused)) *arg) {
fatal(0, "unsupported RTP payload type %d",
packet.header.mpt & 0x7F);
}
fatal(0, "unsupported RTP payload type %d",
packet.header.mpt & 0x7F);
}
- f->nused = 0;
- f->timestamp = ntohl(packet.header.timestamp);
pthread_mutex_lock(&lock);
/* Stop reading if we've reached the maximum.
*
pthread_mutex_lock(&lock);
/* Stop reading if we've reached the maximum.
*
@@
-188,25
+196,27
@@
static void *listen_thread(void attribute((unused)) *arg) {
* out of order then we guarantee dropouts. But for now... */
while(nsamples >= MAXBUFFER)
pthread_cond_wait(&cond, &lock);
* out of order then we guarantee dropouts. But for now... */
while(nsamples >= MAXBUFFER)
pthread_cond_wait(&cond, &lock);
- for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next)
+ for(pp = &packets;
+ *pp && lt((*pp)->timestamp, p->timestamp);
+ pp = &(*pp)->next)
;
;
- /* So now either !*
ff or *ff >= f
*/
- if(*
ff && f->timestamp == (*ff
)->timestamp) {
- /* *
ff == f
; a duplicate. Ideally we avoid the translation step here,
+ /* So now either !*
pp or *pp >= p
*/
+ if(*
pp && p->timestamp == (*pp
)->timestamp) {
+ /* *
pp == p
; a duplicate. Ideally we avoid the translation step here,
* but we'll worry about that another time. */
* but we'll worry about that another time. */
- free(f);
} else {
} else {
-
f->next = *ff
;
- *
ff = f
;
- nsamples +=
f
->nsamples;
+
p->next = *pp
;
+ *
pp = p
;
+ nsamples +=
p
->nsamples;
pthread_cond_broadcast(&cond);
pthread_cond_broadcast(&cond);
+ p = 0; /* we've consumed this packet */
}
pthread_mutex_unlock(&lock);
}
pthread_mutex_unlock(&lock);
- f = 0;
}
}
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
}
}
#if HAVE_COREAUDIO_AUDIOHARDWARE_H
+/** @brief Callback from Core Audio */
static OSStatus adioproc(AudioDeviceID inDevice,
const AudioTimeStamp *inNow,
const AudioBufferList *inInputData,
static OSStatus adioproc(AudioDeviceID inDevice,
const AudioTimeStamp *inNow,
const AudioBufferList *inInputData,
@@
-227,9
+237,9
@@
static OSStatus adioproc(AudioDeviceID inDevice,
while(packets && nbuffers > 0) {
if(packets->used == packets->nsamples) {
/* TODO if we dropped a packet then we should introduce a gap here */
while(packets && nbuffers > 0) {
if(packets->used == packets->nsamples) {
/* TODO if we dropped a packet then we should introduce a gap here */
- struct packet *const
f
= packets;
- packets =
f
->next;
- free(
f
);
+ struct packet *const
p
= packets;
+ packets =
p
->next;
+ free(
p
);
pthread_cond_broadcast(&cond);
continue;
}
pthread_cond_broadcast(&cond);
continue;
}
@@
-255,6
+265,15
@@
static OSStatus adioproc(AudioDeviceID inDevice,
}
#endif
}
#endif
+/** @brief Play an RTP stream
+ *
+ * This is the guts of the program. It is responsible for:
+ * - starting the listening thread
+ * - opening the audio device
+ * - reading ahead to build up a buffer
+ * - arranging for audio to be played
+ * - detecting when the buffer has got too small and re-buffering
+ */
static void play_rtp(void) {
pthread_t ltid;
static void play_rtp(void) {
pthread_t ltid;
@@
-330,13
+349,16
@@
static void play_rtp(void) {
fatal(0, "error calling snd_pcm_prepare: %d", err);
prepared = 1;
}
fatal(0, "error calling snd_pcm_prepare: %d", err);
prepared = 1;
}
+ /* Start at the first available packet */
+ next_timestamp = packets->timestamp;
+ active = 1;
/* Wait until the buffer empties out */
while(nsamples >= MINBUFFER) {
/* Wait for ALSA to ask us for more data */
pthread_mutex_unlock(&lock);
snd_pcm_wait(pcm, -1);
pthread_mutex_lock(&lock);
/* Wait until the buffer empties out */
while(nsamples >= MINBUFFER) {
/* Wait for ALSA to ask us for more data */
pthread_mutex_unlock(&lock);
snd_pcm_wait(pcm, -1);
pthread_mutex_lock(&lock);
- /* ALSA wants more data */
+ /* ALSA is ready for more data */
if(packets && packets->timestamp + packets->nused == next_timestamp) {
/* Hooray, we have a packet we can play */
const size_t samples_available = packets->nsamples - packets->nused;
if(packets && packets->timestamp + packets->nused == next_timestamp) {
/* Hooray, we have a packet we can play */
const size_t samples_available = packets->nsamples - packets->nused;
@@
-351,11
+373,12
@@
static void play_rtp(void) {
packets->nused += samples_written;
next_timestamp += samples_written;
if(packets->nused == packets->nsamples) {
packets->nused += samples_written;
next_timestamp += samples_written;
if(packets->nused == packets->nsamples) {
- struct packet *f = packets;
+ /* We're done with this packet */
+ struct packet *p = packets;
- packets =
f
->next;
- nsamples -=
f
->nsamples;
- free(
f
);
+ packets =
p
->next;
+ nsamples -=
p
->nsamples;
+ free(
p
);
pthread_cond_broadcast(&cond);
}
} else {
pthread_cond_broadcast(&cond);
}
} else {
@@
-373,6
+396,7
@@
static void play_rtp(void) {
next_timestamp += samples_written;
}
}
next_timestamp += samples_written;
}
}
+ active = 0;
/* We stop playing for a bit until the buffer re-fills */
pthread_mutex_unlock(&lock);
if((err = snd_pcm_drain(pcm)))
/* We stop playing for a bit until the buffer re-fills */
pthread_mutex_unlock(&lock);
if((err = snd_pcm_drain(pcm)))