From: rjk@greenend.org.uk <> Date: Sun, 16 Sep 2007 18:08:46 +0000 (+0100) Subject: now builds on linux X-Git-Tag: debian-1_5_99dev8~243^2~66 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/commitdiff_plain/0b75463fed40851e872ddb495ed679a62ff7ed20 now builds on linux --- diff --git a/.bzrignore b/.bzrignore index fb4ade0..71c11f7 100644 --- a/.bzrignore +++ b/.bzrignore @@ -96,3 +96,4 @@ tests/testroot disorder.plist server/uk.org.greenend.rjk.disorder.plist doc/guts +clients/disorder-playrtp diff --git a/clients/Makefile.am b/clients/Makefile.am index b197b90..784ed95 100644 --- a/clients/Makefile.am +++ b/clients/Makefile.am @@ -18,7 +18,7 @@ # USA # -bin_PROGRAMS=disorder disorderfm +bin_PROGRAMS=disorder disorderfm disorder-playrtp noinst_PROGRAMS=test-eclient filename-bytes AM_CPPFLAGS=-I${top_srcdir}/lib -I../lib @@ -34,6 +34,10 @@ disorderfm_SOURCES=disorderfm.c \ disorderfm_LDADD=$(LIBOBJS) ../lib/libdisorder.a $(LIBGC) $(LIBICONV) disorderfm_DEPENDENCIES=$(LIBOBJS) ../lib/libdisorder.a +disorder_playrtp_SOURCES=playrtp.c +disorder_playrtp_LDADD=$(LIBOBJS) ../lib/libdisorder.a $(LIBASOUND) +disorder_playrtp_DEPENDENCIES=$(LIBOBJS) ../lib/libdisorder.a + filename_bytes_SOURCES=filename-bytes.c test_eclient_SOURCES=test-eclient.c \ diff --git a/clients/playrtp.c b/clients/playrtp.c index 7ec35d0..7f2b238 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "log.h" #include "mem.h" @@ -36,57 +37,107 @@ #include "addr.h" #include "syscalls.h" #include "rtp.h" -#include "debug.h" +#include "defs.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include #endif +#if API_ALSA +#include +#endif +/** @brief RTP socket */ static int rtpfd; -#define MAXSAMPLES 2048 /* max samples/frame we'll support */ -/* NB two channels = two samples in this program! */ -#define MINBUFFER 8820 /* when to stop playing */ +/** @brief Output device */ +static const char *device; + +/** @brief Maximum samples per packet we'll support + * + * NB that two channels = two samples in this program. + */ +#define MAXSAMPLES 2048 + +/** @brief Minimum buffer size + * + * We'll stop playing if there's only this many samples in the buffer. */ +#define MINBUFFER 8820 + +/** @brief Maximum sample size + * + * The maximum supported size (in bytes) of one sample. */ +#define MAXSAMPLESIZE 2 + #define READAHEAD 88200 /* how far to read ahead */ + #define MAXBUFFER (3 * 88200) /* maximum buffer contents */ -struct frame { - struct frame *next; /* another frame */ - int nsamples; /* number of samples */ - int nused; /* number of samples used so far */ - uint32_t timestamp; /* timestamp from packet */ +/** @brief Received packet + * + * Packets are recorded in an ordered linked list. */ +struct packet { + /** @brief Pointer to next packet + * The next packet might not be immediately next: if packets are dropped + * or mis-ordered there may be gaps at any given moment. */ + struct packet *next; + /** @brief Number of samples in this packet */ + int nsamples; + /** @brief Number of samples used from this packet */ + int nused; + /** @brief Timestamp from RTP packet + * + * NB that "timestamps" are really sample counters.*/ + uint32_t timestamp; #if HAVE_COREAUDIO_AUDIOHARDWARE_H - float samples[MAXSAMPLES]; /* converted sample data */ + /** @brief Converted sample data */ + float samples_float[MAXSAMPLES]; +#else + /** @brief Raw sample data */ + unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE]; #endif }; -static unsigned long nsamples; /* total samples available */ +/** @brief Total number of samples available */ +static unsigned long nsamples; + +/** @brief Linked list of packets + * + * In ascending order of timestamp. */ +static struct packet *packets; + +/** @brief Timestamp of next packet to play. + * + * This is set to the timestamp of the last packet, plus the number of + * samples it contained. + */ +static uint32_t next_timestamp; -static struct frame *frames; /* received frames in ascending order - * of timestamp */ +/** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -/* lock protecting frame list */ -static pthread_cond_t cond = PTHREAD_CONDVAR_INITIALIZER; -/* signalled whenever we add a new frame */ +/** @brief Condition variable signalled whenever @ref packets is changed */ +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static const struct option options[] = { { "help", no_argument, 0, 'h' }, { "version", no_argument, 0, 'V' }, { "debug", no_argument, 0, 'd' }, + { "device", required_argument, 0, 'D' }, { 0, 0, 0, 0 } }; -/* Return true iff a > b in sequence-space arithmetic */ -static inline int gt(const struct frame *a, const struct frame *b) { - return (uint32_t)(a->timestamp - b->timestamp) < 0x80000000; +/** @brief Return true iff a < b in sequence-space arithmetic */ +static inline int lt(const struct packet *a, const struct packet *b) { + return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000; } -/* Background thread that reads frames over the network and add them to the - * list */ -static listen_thread(void attribute((unused)) *arg) { - struct frame *f = 0, **ff; - int n, i; +/** Background thread collecting samples + * + * This function collects samples, perhaps converts them to the target format, + * and adds them to the packet list. */ +static void *listen_thread(void attribute((unused)) *arg) { + struct packet *f = 0, **ff; + int n; union { struct rtp_header header; uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)]; @@ -106,32 +157,50 @@ static listen_thread(void attribute((unused)) *arg) { fatal(errno, "error reading from socket"); } } -#if HAVE_COREAUDIO_AUDIOHARDWARE_H + /* Ignore too-short packets */ + if((size_t)n <= sizeof (struct rtp_header)) + continue; /* Convert to target format */ - switch(packet.header.mtp & 0x7F) { + switch(packet.header.mpt & 0x7F) { case 10: f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); - for(i = 0; i < f->nsamples; ++i) - f->samples[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767); +#if HAVE_COREAUDIO_AUDIOHARDWARE_H + /* Convert to what Core Audio expects */ + for(n = 0; n < f->nsamples; ++n) + f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); +#else + /* ALSA can do any necessary conversion itself (though it might be better + * to do any necessary conversion in the background) */ + memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header)); +#endif break; /* TODO support other RFC3551 media types (when the speaker does) */ default: - fatal(0, "unsupported RTP payload type %d", + fatal(0, "unsupported RTP payload type %d", packet.header.mpt & 0x7F); } -#endif - f->used = 0; + f->nused = 0; f->timestamp = ntohl(packet.header.timestamp); pthread_mutex_lock(&lock); - /* Stop reading if we've reached the maximum */ + /* Stop reading if we've reached the maximum. + * + * This is rather unsatisfactory: it means that if packets get heavily + * out of order then we guarantee dropouts. But for now... */ while(nsamples >= MAXBUFFER) pthread_cond_wait(&cond, &lock); - for(ff = &frames; *ff && !gt(*ff, f); ff = &(*ff)->next) + for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next) ; - f->next = *ff; - *ff = f; - nsamples += f->nsamples; - pthread_cond_broadcast(&cond); + /* So now either !*ff or *ff >= f */ + if(*ff && f->timestamp == (*ff)->timestamp) { + /* *ff == f; a duplicate. Ideally we avoid the translation step here, + * but we'll worry about that another time. */ + free(f); + } else { + f->next = *ff; + *ff = f; + nsamples += f->nsamples; + pthread_cond_broadcast(&cond); + } pthread_mutex_unlock(&lock); f = 0; } @@ -152,14 +221,14 @@ static OSStatus adioproc(AudioDeviceID inDevice, size_t samplesInLeft; size_t samplesToCopy; - pthread_mutex_lock(&lock); + pthread_mutex_lock(&lock); samplesOut = ab->data; samplesOutLeft = ab->mDataByteSize / sizeof (float); - while(frames && nbuffers > 0) { - if(frames->used == frames->nsamples) { + while(packets && nbuffers > 0) { + if(packets->used == packets->nsamples) { /* TODO if we dropped a packet then we should introduce a gap here */ - struct frame *const f = frames; - frames = f->next; + struct packet *const f = packets; + packets = f->next; free(f); pthread_cond_broadcast(&cond); continue; @@ -173,11 +242,11 @@ static OSStatus adioproc(AudioDeviceID inDevice, } /* Now: (1) there is some data left to read * (2) there is some space to put it */ - samplesInLeft = frames->nsamples - frames->used; + samplesInLeft = packets->nsamples - packets->used; samplesToCopy = (samplesInLeft < samplesOutLeft ? samplesInLeft : samplesOutLeft); - memcpy(samplesOut, frame->samples + frames->used, samplesToCopy); - frames->used += samplesToCopy; + memcpy(samplesOut, packet->samples + packets->used, samplesToCopy); + packets->used += samplesToCopy; samplesOut += samplesToCopy; samesOutLeft -= samplesToCopy; } @@ -186,13 +255,133 @@ static OSStatus adioproc(AudioDeviceID inDevice, } #endif -void play_rtp(void) { - pthread_t lt; +static void play_rtp(void) { + pthread_t ltid; /* We receive and convert audio data in a background thread */ - pthread_create(<, 0, listen_thread, 0); + pthread_create(<id, 0, listen_thread, 0); #if API_ALSA - assert(!"implemented"); + { + snd_pcm_t *pcm; + snd_pcm_hw_params_t *hwparams; + snd_pcm_sw_params_t *swparams; + /* Only support one format for now */ + const int sample_format = SND_PCM_FORMAT_S16_BE; + unsigned rate = 44100; + const int channels = 2; + const int samplesize = channels * sizeof(uint16_t); + snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3; + /* If we can write more than this many samples we'll get a wakeup */ + const int avail_min = 256; + snd_pcm_sframes_t frames_written; + size_t samples_written; + int prepared = 1; + int err; + + /* Open ALSA */ + if((err = snd_pcm_open(&pcm, + device ? device : "default", + SND_PCM_STREAM_PLAYBACK, + SND_PCM_NONBLOCK))) + fatal(0, "error from snd_pcm_open: %d", err); + /* Set up 'hardware' parameters */ + snd_pcm_hw_params_alloca(&hwparams); + if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0) + fatal(0, "error from snd_pcm_hw_params_any: %d", err); + if((err = snd_pcm_hw_params_set_access(pcm, hwparams, + SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_access: %d", err); + if((err = snd_pcm_hw_params_set_format(pcm, hwparams, + sample_format)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d", + sample_format, err); + if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d", + rate, err); + if((err = snd_pcm_hw_params_set_channels(pcm, hwparams, + channels)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d", + channels, err); + if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams, + &pcm_bufsize)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d", + MAXSAMPLES * samplesize * 3, err); + if((err = snd_pcm_hw_params(pcm, hwparams)) < 0) + fatal(0, "error calling snd_pcm_hw_params: %d", err); + /* Set up 'software' parameters */ + snd_pcm_sw_params_alloca(&swparams); + if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params_current: %d", err); + if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0) + fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d", + avail_min, err); + if((err = snd_pcm_sw_params(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params: %d", err); + + /* Ready to go */ + + pthread_mutex_lock(&lock); + for(;;) { + /* Wait for the buffer to fill up a bit */ + while(nsamples < READAHEAD) + pthread_cond_wait(&cond, &lock); + if(!prepared) { + if((err = snd_pcm_prepare(pcm))) + fatal(0, "error calling snd_pcm_prepare: %d", err); + prepared = 1; + } + /* Wait until the buffer empties out */ + while(nsamples >= MINBUFFER) { + /* Wait for ALSA to ask us for more data */ + pthread_mutex_unlock(&lock); + snd_pcm_wait(pcm, -1); + pthread_mutex_lock(&lock); + /* ALSA wants more data */ + if(packets && packets->timestamp + packets->nused == next_timestamp) { + /* Hooray, we have a packet we can play */ + const size_t samples_available = packets->nsamples - packets->nused; + const size_t frames_available = samples_available / 2; + + frames_written = snd_pcm_writei(pcm, + packets->samples_raw + packets->nused, + frames_available); + if(frames_written < 0) + fatal(0, "error calling snd_pcm_writei: %d", err); + samples_written = frames_written * 2; + packets->nused += samples_written; + next_timestamp += samples_written; + if(packets->nused == packets->nsamples) { + struct packet *f = packets; + + packets = f->next; + nsamples -= f->nsamples; + free(f); + pthread_cond_broadcast(&cond); + } + } else { + /* We don't have anything to play! We'd better play some 0s. */ + static const uint16_t zeros[1024]; + size_t samples_available = 1024, frames_available; + if(packets && next_timestamp + samples_available > packets->timestamp) + samples_available = packets->timestamp - next_timestamp; + frames_available = samples_available / 2; + frames_written = snd_pcm_writei(pcm, + zeros, + frames_available); + if(frames_written < 0) + fatal(0, "error calling snd_pcm_writei: %d", err); + next_timestamp += samples_written; + } + } + /* We stop playing for a bit until the buffer re-fills */ + pthread_mutex_unlock(&lock); + if((err = snd_pcm_drain(pcm))) + fatal(0, "error calling snd_pcm_drain: %d", err); + prepared = 0; + pthread_mutex_lock(&lock); + } + + } #elif HAVE_COREAUDIO_AUDIOHARDWARE_H { OSStatus status; @@ -262,7 +451,8 @@ static void help(void) { "Options:\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" - " --debug, -d Turn on debugging\n"); + " --debug, -d Turn on debugging\n" + " --device, -D DEVICE Output device\n"); xfclose(stdout); exit(0); } @@ -278,9 +468,9 @@ int main(int argc, char **argv) { int n; struct addrinfo *res; struct stringlist sl; - const char *sockname; + char *sockname; - static const struct addrinfo prefbind = { + static const struct addrinfo prefs = { AI_PASSIVE, PF_INET, SOCK_DGRAM, @@ -293,11 +483,12 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVd", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); case 'd': debugging = 1; break; + case 'D': device = optarg; break; default: fatal(0, "invalid option"); } } @@ -308,7 +499,7 @@ int main(int argc, char **argv) { sl.n = argc; sl.s = argv; /* Listen for inbound audio data */ - if(!(res = get_address(&sl, &pref, &sockname))) + if(!(res = get_address(&sl, &prefs, &sockname))) exit(1); if((rtpfd = socket(res->ai_family, res->ai_socktype, diff --git a/lib/rtp.h b/lib/rtp.h index 9159bfb..63ade9b 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -22,7 +22,7 @@ #define RTP_H /* RTP is defined in RFC1889 */ -struct attribute((packed)) rtp { +struct attribute((packed)) rtp_header { uint8_t vpxcc; uint8_t mpt; uint16_t seq; diff --git a/server/speaker.c b/server/speaker.c index b3d62ab..4e79cdb 100644 --- a/server/speaker.c +++ b/server/speaker.c @@ -555,7 +555,7 @@ static void fork_cmd(void) { static void play(size_t frames) { size_t avail_bytes, written_frames; ssize_t written_bytes; - struct rtp header; + struct rtp_header header; struct iovec vec[2]; if(activate()) {