From 2c7c9eae95f4161c90d270afc9d5f41a37874bba Mon Sep 17 00:00:00 2001 Message-Id: <2c7c9eae95f4161c90d270afc9d5f41a37874bba.1715669760.git.mdw@distorted.org.uk> From: Mark Wooding Date: Wed, 19 Sep 2007 00:37:49 +0100 Subject: [PATCH] abolish linked list of packets. (linux still to do.) Organization: Straylight/Edgeware From: Richard Kettlewell --- clients/playrtp.c | 233 +++++++++++++++++++++++++++------------------- 1 file changed, 137 insertions(+), 96 deletions(-) diff --git a/clients/playrtp.c b/clients/playrtp.c index fe9d1e2..398f47b 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "log.h" #include "mem.h" @@ -86,37 +87,29 @@ static unsigned maxbuffer; /** @brief Number of samples to infill by in one go */ #define INFILL_SAMPLES (44100 * 2) /* 1s */ -/** @brief Received packet - * - * Packets are recorded in an ordered linked list. */ +/** @brief Received packet */ struct packet { - /** @brief Pointer to next packet - * The next packet might not be immediately next: if packets are dropped - * or mis-ordered there may be gaps at any given moment. */ - struct packet *next; /** @brief Number of samples in this packet */ uint32_t nsamples; /** @brief Timestamp from RTP packet * * NB that "timestamps" are really sample counters.*/ uint32_t timestamp; -#if HAVE_COREAUDIO_AUDIOHARDWARE_H - /** @brief Converted sample data */ - float samples_float[MAXSAMPLES]; -#else /** @brief Raw sample data */ unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE]; -#endif }; /** @brief Total number of samples available */ static unsigned long nsamples; -/** @brief Linked list of packets +/** @brief Mapping of sequence numbers to packets * - * In ascending order of timestamp. Really this should be a heap for more - * efficient access. */ -static struct packet *packets; + * This isn't very efficient - 256KB on 32-bit machines, 512KB if you do a + * 64-bit build for some reason. It can be optimized later if need be. */ +static struct packet *packets[65536]; + +/** @brief Total number of packets */ +static unsigned npackets; /** @brief Timestamp of next packet to play. * @@ -130,6 +123,24 @@ static uint32_t next_timestamp; * This is true when playing and false when just buffering. */ static int active; +/** @brief Sequence number of next packet we expxect to play */ +static uint16_t sequence; + +/** @brief Structure of free packet list */ +union free_packet { + struct packet p; + union free_packet *next; +}; + +/** @brief Linked list of free packets */ +static union free_packet *free_packets; + +/** @brief Array of new free packets */ +static union free_packet *next_free_packet; + +/** @brief Count of new free packets */ +static size_t count_free_packets; + /** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; @@ -147,6 +158,35 @@ static const struct option options[] = { { 0, 0, 0, 0 } }; +/** @brief Return a new packet + * + * Assumes that @ref lock is held. */ +static struct packet *new_packet(void) { + struct packet *p; + + if(free_packets) { + p = &free_packets->p; + free_packets = free_packets->next; + } else { + if(!count_free_packets) { + next_free_packet = xcalloc(1024, sizeof (union free_packet)); + count_free_packets = 1024; + } + p = &(next_free_packet++)->p; + --count_free_packets; + } + return p; +} + +/** @brief Free a packet + * + * Assumes that @ref lock is held. */ +static void free_packet(struct packet *p) { + union free_packet *u = (union free_packet *)p; + u->next = free_packets; + free_packets = u; +} + /** @brief Return true iff a < b in sequence-space arithmetic */ static inline int lt(uint32_t a, uint32_t b) { return (uint32_t)(a - b) & 0x80000000; @@ -168,12 +208,14 @@ static inline int le(uint32_t a, uint32_t b) { } /** @brief Drop the packet at the head of the queue */ -static void drop_first_packet(void) { - struct packet *const p = packets; - packets = p->next; - nsamples -= p->nsamples; - free(p); - pthread_cond_broadcast(&cond); +static void drop_packet(unsigned sequence) { + if(packets[sequence]) { + nsamples -= packets[sequence]->nsamples; + free_packet(packets[sequence]); + packets[sequence] = 0; + pthread_cond_broadcast(&cond); + --npackets; + } } /** @brief Background thread collecting samples @@ -181,19 +223,24 @@ static void drop_first_packet(void) { * This function collects samples, perhaps converts them to the target format, * and adds them to the packet list. */ static void *listen_thread(void attribute((unused)) *arg) { - struct packet *p = 0, **pp; + struct packet *p = 0; int n; - union { - struct rtp_header header; - uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)]; - } packet; - const uint16_t *const samples = (uint16_t *)(packet.bytes - + sizeof (struct rtp_header)); + struct rtp_header header; + uint16_t seq; + uint32_t timestamp; + struct iovec iov[2]; for(;;) { - if(!p) - p = xmalloc(sizeof *p); - n = read(rtpfd, packet.bytes, sizeof packet.bytes); + if(!p) { + pthread_mutex_lock(&lock); + p = new_packet(); + pthread_mutex_unlock(&lock); + } + iov[0].iov_base = &header; + iov[0].iov_len = sizeof header; + iov[1].iov_base = p->samples_raw; + iov[1].iov_len = sizeof p->samples_raw; + n = readv(rtpfd, iov, 2); if(n < 0) { switch(errno) { case EINTR: @@ -207,41 +254,33 @@ static void *listen_thread(void attribute((unused)) *arg) { info("ignored a short packet"); continue; } - p->timestamp = ntohl(packet.header.timestamp); + timestamp = htonl(header.timestamp); + seq = htons(header.seq); /* Ignore packets in the past */ - if(active && lt(p->timestamp, next_timestamp)) { + if(active && lt(timestamp, next_timestamp)) { info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32, - p->timestamp, next_timestamp); + timestamp, next_timestamp); continue; } + pthread_mutex_lock(&lock); + p = new_packet(); + p->timestamp = timestamp; /* Convert to target format */ - switch(packet.header.mpt & 0x7F) { + switch(header.mpt & 0x7F) { case 10: - p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); -#if HAVE_COREAUDIO_AUDIOHARDWARE_H - /* Convert to what Core Audio expects */ - { - size_t i; - - for(i = 0; i < p->nsamples; ++i) - p->samples_float[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767); - } -#else + p->nsamples = (n - sizeof header) / sizeof(uint16_t); /* ALSA can do any necessary conversion itself (though it might be better * to do any necessary conversion in the background) */ - memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header)); -#endif + /* TODO we could readv into the buffer */ break; /* TODO support other RFC3551 media types (when the speaker does) */ default: fatal(0, "unsupported RTP payload type %d", - packet.header.mpt & 0x7F); + header.mpt & 0x7F); } if(logfp) fprintf(logfp, "sequence %u timestamp %"PRIx32" length %"PRIx32" end %"PRIx32"\n", - ntohs(packet.header.seq), - p->timestamp, p->nsamples, p->timestamp + p->nsamples); - pthread_mutex_lock(&lock); + seq, timestamp, p->nsamples, timestamp + p->nsamples); /* Stop reading if we've reached the maximum. * * This is rather unsatisfactory: it means that if packets get heavily @@ -251,28 +290,30 @@ static void *listen_thread(void attribute((unused)) *arg) { while(nsamples >= maxbuffer) pthread_cond_wait(&cond, &lock); } - for(pp = &packets; - *pp && lt((*pp)->timestamp, p->timestamp); - pp = &(*pp)->next) - ; - /* So now either !*pp or *pp >= p */ - if(*pp && p->timestamp == (*pp)->timestamp) { - /* *pp == p; a duplicate. Ideally we avoid the translation step here, - * but we'll worry about that another time. */ - info("dropped a duplicated"); - } else { - if(*pp) - info("receiving packets out of order"); - p->next = *pp; - *pp = p; - nsamples += p->nsamples; - pthread_cond_broadcast(&cond); - p = 0; /* we've consumed this packet */ - } + /* If there's a packet there already we overwrite it; perhaps it is left + * over from an earlier stage. */ + drop_packet(seq); + /* Record this packet */ + packets[seq] = p; + /* If we currently have no idea where to start playing, this is it */ + if(!npackets) + sequence = seq; + ++npackets; + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); pthread_mutex_unlock(&lock); } } +/** @brief Return true if @p p contains @p timestamp */ +static inline int contains(const struct packet *p, uint32_t timestamp) { + const uint32_t packet_start = p->timestamp; + const uint32_t packet_end = p->timestamp + p->nsamples; + + return (ge(timestamp, packet_start) + && lt(timestamp, packet_end)); +} + #if HAVE_COREAUDIO_AUDIOHARDWARE_H /** @brief Callback from Core Audio */ static OSStatus adioproc @@ -285,48 +326,47 @@ static OSStatus adioproc void attribute((unused)) *inClientData) { UInt32 nbuffers = outOutputData->mNumberBuffers; AudioBuffer *ab = outOutputData->mBuffers; + const struct packet *p; pthread_mutex_lock(&lock); while(nbuffers > 0) { float *samplesOut = ab->mData; size_t samplesOutLeft = ab->mDataByteSize / sizeof (float); - + while(samplesOutLeft > 0) { - if(packets) { - /* There's a packet */ - const uint32_t packet_start = packets->timestamp; - const uint32_t packet_end = packets->timestamp + packets->nsamples; - - if(le(packet_end, next_timestamp)) { - /* This packet is in the past */ - info("dropping buffered past packet %"PRIx32" < %"PRIx32, - packet_start, next_timestamp); - drop_first_packet(); - continue; - } - if(ge(next_timestamp, packet_start) - && lt(next_timestamp, packet_end)) { + /* Look for a suitable packet, dropping any unsuitable ones along the + * way. Unsuitable packets are ones that are in the past. */ + while(npackets + && (!packets[sequence] + || le(packets[sequence]->timestamp + + packets[sequence]->nsamples, + next_timestamp))) + drop_packet(sequence++); + p = packets[sequence]; + if(p) { + if(contains(p, next_timestamp)) { /* This packet is suitable */ - const uint32_t offset = next_timestamp - packet_start; + const uint32_t packet_end = p->timestamp + p->nsamples; + const uint32_t offset = next_timestamp - p->timestamp; + const uint16_t *ptr = + (void *)(p->samples_raw + offset * sizeof (uint16_t)); uint32_t samples_available = packet_end - next_timestamp; if(samples_available > samplesOutLeft) samples_available = samplesOutLeft; - memcpy(samplesOut, - packets->samples_float + offset, - samples_available * sizeof(float)); - samplesOut += samples_available; next_timestamp += samples_available; samplesOutLeft -= samples_available; - if(ge(next_timestamp, packet_end)) - drop_first_packet(); + while(samples_available-- > 0) + *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); + /* We don't bother junking the packet or advancing sequence - that'll + * be dealt with next time round */ continue; } } /* We didn't find a suitable packet (though there might still be * unsuitable ones). We infill with 0s. */ - if(packets) { + if(p) { /* There is a next packet, only infill up to that point */ - uint32_t samples_available = packets->timestamp - next_timestamp; + uint32_t samples_available = p->timestamp - next_timestamp; if(samples_available > samplesOutLeft) samples_available = samplesOutLeft; @@ -443,8 +483,9 @@ static void play_rtp(void) { fatal(0, "error calling snd_pcm_prepare: %d", err); prepared = 1; } + assert(sequence != -1); /* Start at the first available packet */ - next_timestamp = packets->timestamp; + next_timestamp = packets[sequence]->timestamp; active = 1; infilling = 0; escape = 0; @@ -624,7 +665,7 @@ static void play_rtp(void) { pthread_cond_wait(&cond, &lock); /* Start playing now */ info("Playing..."); - next_timestamp = packets->timestamp; + next_timestamp = packets[sequence]->timestamp; active = 1; status = AudioDeviceStart(adid, adioproc); if(status) -- [mdw]