doc/disorder-choose.8.html
config.aux/compile
server/endian
+clients/rtpmon
#
bin_PROGRAMS=disorder disorderfm disorder-playrtp
-noinst_PROGRAMS=filename-bytes
+noinst_PROGRAMS=filename-bytes rtpmon
noinst_SCRIPTS=dump2wav
AM_CPPFLAGS=-I${top_srcdir}/lib -I../lib
$(LIBDB) $(LIBPTHREAD)
disorder_playrtp_DEPENDENCIES=$(LIBOBJS) ../lib/libdisorder.a
+rtpmon_SOURCES=rtpmon.c
+rtpmon_LDADD=$(LIBOBJS) ../lib/libdisorder.a
+
filename_bytes_SOURCES=filename-bytes.c
install-exec-hook:
#include "version.h"
#include "uaudio.h"
-#define readahead linux_headers_are_borked
-
/** @brief Obsolete synonym */
#ifndef IPV6_JOIN_GROUP
# define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP
/** @brief Output device */
-/** @brief Minimum low watermark
- *
- * We'll stop playing if there's only this many samples in the buffer. */
-unsigned minbuffer = 2 * 44100 / 10; /* 0.2 seconds */
+/** @brief Buffer low watermark in samples */
+unsigned minbuffer = 4 * (2 * 44100) / 10; /* 0.4 seconds */
-/** @brief Buffer high watermark
+/** @brief Maximum buffer size in samples
*
- * We'll only start playing when this many samples are available. */
-static unsigned readahead = 44100; /* 0.5 seconds */
-
-/** @brief Maximum buffer size
- *
- * We'll stop reading from the network if we have this many samples. */
+ * We'll stop reading from the network if we have this many samples.
+ */
static unsigned maxbuffer;
/** @brief Received packets
{ "device", required_argument, 0, 'D' },
{ "min", required_argument, 0, 'm' },
{ "max", required_argument, 0, 'x' },
- { "buffer", required_argument, 0, 'b' },
{ "rcvbuf", required_argument, 0, 'R' },
#if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST
{ "oss", no_argument, 0, 'o' },
* Must be called with @ref lock held.
*/
void playrtp_fill_buffer(void) {
- while(nsamples)
+ /* Discard current buffer contents */
+ while(nsamples) {
+ //fprintf(stderr, "%8u/%u (%u) DROPPING\n", nsamples, maxbuffer, minbuffer);
drop_first_packet();
+ }
info("Buffering...");
- while(nsamples < readahead) {
+ /* Wait until there's at least minbuffer samples available */
+ while(nsamples < minbuffer) {
+ //fprintf(stderr, "%8u/%u (%u) FILLING\n", nsamples, maxbuffer, minbuffer);
pthread_cond_wait(&cond, &lock);
}
+ /* Start from whatever is earliest */
next_timestamp = pheap_first(&packets)->timestamp;
active = 1;
}
"Options:\n"
" --device, -D DEVICE Output device\n"
" --min, -m FRAMES Buffer low water mark\n"
- " --buffer, -b FRAMES Buffer high water mark\n"
" --max, -x FRAMES Buffer maximum size\n"
" --rcvbuf, -R BYTES Socket receive buffer size\n"
" --config, -C PATH Set configuration file\n"
*bufptr++ = (int16_t)ntohs(*ptr++);
--i;
}
- /* We don't junk the packet here; a subsequent call to
- * playrtp_next_packet() will dispose of it (if it's actually done with). */
} else {
/* There is no suitable packet. We introduce 0s up to the next packet, or
* to fill the buffer if there's no next packet or that's too many. The
}
/* Advance timestamp */
next_timestamp += samples;
+ /* Junk obsolete packets */
+ playrtp_next_packet();
pthread_mutex_unlock(&lock);
return samples;
}
struct addrinfo *res;
struct stringlist sl;
char *sockname;
- int rcvbuf, target_rcvbuf = 131072;
+ int rcvbuf, target_rcvbuf = 0;
socklen_t len;
struct ip_mreq mreq;
struct ipv6_mreq mreq6;
mem_init();
if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
backend = uaudio_apis[0];
- while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:R:M:aocC:re:P:", options, 0)) >= 0) {
+ while((n = getopt_long(argc, argv, "hVdD:m:x:L:R:M:aocC:re:P:", options, 0)) >= 0) {
switch(n) {
case 'h': help();
case 'V': version("disorder-playrtp");
case 'd': debugging = 1; break;
case 'D': uaudio_set("device", optarg); break;
case 'm': minbuffer = 2 * atol(optarg); break;
- case 'b': readahead = 2 * atol(optarg); break;
case 'x': maxbuffer = 2 * atol(optarg); break;
case 'L': logfp = fopen(optarg, "w"); break;
case 'R': target_rcvbuf = atoi(optarg); break;
}
if(config_read(0)) fatal(0, "cannot read configuration");
if(!maxbuffer)
- maxbuffer = 4 * readahead;
+ maxbuffer = 2 * minbuffer;
argc -= optind;
argv += optind;
switch(argc) {
rcvbuf, target_rcvbuf);
} else
info("default socket receive buffer %d", rcvbuf);
+ //info("minbuffer %u maxbuffer %u", minbuffer, maxbuffer);
if(logfp)
info("WARNING: -L option can impact performance");
if(control_socket) {
pthread_mutex_unlock(&lock);
backend->activate();
pthread_mutex_lock(&lock);
- /* Wait until the buffer empties out */
+ /* Wait until the buffer empties out
+ *
+ * If there's a packet that we can play right now then we definitely
+ * continue.
+ *
+ * Also if there's at least minbuffer samples we carry on regardless and
+ * insert silence. The assumption is there's been a pause but more data
+ * is now available.
+ */
while(nsamples >= minbuffer
|| (nsamples > 0
&& contains(pheap_first(&packets), next_timestamp))) {
+ //fprintf(stderr, "%8u/%u (%u) PLAYING\n", nsamples, maxbuffer, minbuffer);
pthread_cond_wait(&cond, &lock);
}
+#if 0
+ if(nsamples) {
+ struct packet *p = pheap_first(&packets);
+ fprintf(stderr, "nsamples=%u (%u) next_timestamp=%"PRIx32", first packet is [%"PRIx32",%"PRIx32")\n",
+ nsamples, minbuffer, next_timestamp,p->timestamp,p->timestamp+p->nsamples);
+ }
+#endif
/* Stop playing for a bit until the buffer re-fills */
pthread_mutex_unlock(&lock);
backend->deactivate();
--- /dev/null
+/*
+ * This file is part of DisOrder.
+ * Copyright (C) 2009 Richard Kettlewell
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+/** @file clients/rtpmon.c
+ * @brief RTP monitor
+ *
+ * This progam monitors the rate at which data arrives by RTP and
+ * constantly display it. It is intended for debugging only.
+ *
+ * TODO de-dupe with playrtp.
+ */
+#include "common.h"
+
+#include <getopt.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <locale.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/uio.h>
+
+#include "syscalls.h"
+#include "timeval.h"
+#include "mem.h"
+#include "log.h"
+#include "version.h"
+#include "addr.h"
+#include "configuration.h"
+#include "rtp.h"
+
+/** @brief Record of one packet */
+struct entry {
+ /** @brief When packet arrived */
+ struct timeval when;
+
+ /** @brief Serial number of first sample */
+ uint32_t serial;
+};
+
+/** @brief Bytes per frame */
+static unsigned bpf = 4;
+
+/** @brief Frame serial number */
+static uint32_t serial;
+
+/** @brief Size of ring buffer */
+#define RINGSIZE 131072
+
+/** @brief Ring buffer */
+static struct entry ring[RINGSIZE];
+
+/** @brief Where new packets join the ring */
+static unsigned ringtail;
+
+static const struct option options[] = {
+ { "help", no_argument, 0, 'h' },
+ { "version", no_argument, 0, 'V' },
+ { "bpf", required_argument, 0, 'b' },
+ { 0, 0, 0, 0 }
+};
+
+static void help(void) {
+ xprintf("Usage:\n"
+ " rtpmon [OPTIONS] [ADDRESS] PORT\n"
+ "Options:\n"
+ " --bpf, -b Bytes/frame (default 4)\n"
+ " --help, -h Display usage message\n"
+ " --version, -V Display version number\n"
+ );
+ xfclose(stdout);
+ exit(0);
+}
+
+/** @brief Compute the rate by sampling at two points in the ring buffer */
+static double rate(unsigned earlier, unsigned later) {
+ const uint32_t frames = ring[later].serial - ring[earlier].serial;
+ const int64_t us = tvsub_us(ring[later].when, ring[earlier].when);
+
+ if(us)
+ return 1000000.0 * frames / us;
+ else
+ return 0.0;
+}
+
+/** @brief Called to say we received some bytes
+ * @param when When we received them
+ * @param n How many frames of audio data we received
+ */
+static void frames(const struct timeval *when, size_t n) {
+ const time_t prev = ring[(ringtail - 1) % RINGSIZE].when.tv_sec;
+
+ ring[ringtail].when = *when;
+ ring[ringtail].serial = serial;
+ serial += n;
+ ringtail = (ringtail + 1) % RINGSIZE;
+ // Report once a second
+ if(prev != when->tv_sec) {
+ if(printf("%8.2f %8.2f %8.2f %8.2f %8.2f %8.2f %8.2f\n",
+ rate((ringtail - RINGSIZE / 128) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 64) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 32) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 16) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 8) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 4) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE),
+ rate((ringtail - RINGSIZE / 2) % RINGSIZE,
+ (ringtail - 1) % RINGSIZE)) < 0
+ || fflush(stdout) < 0)
+ fatal(errno, "stdout");
+ }
+}
+
+int main(int argc, char **argv) {
+ int n;
+ struct addrinfo *res;
+ struct stringlist sl;
+ struct ip_mreq mreq;
+ struct ipv6_mreq mreq6;
+ int rtpfd;
+ char *sockname;
+ int is_multicast;
+ union any_sockaddr {
+ struct sockaddr sa;
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ };
+ union any_sockaddr mgroup;
+
+ static const struct addrinfo prefs = {
+ .ai_flags = AI_PASSIVE,
+ .ai_family = PF_INET,
+ .ai_socktype = SOCK_DGRAM,
+ .ai_protocol = IPPROTO_UDP
+ };
+ static const int one = 1;
+
+ mem_init();
+ if(!setlocale(LC_CTYPE, ""))
+ fatal(errno, "error calling setlocale");
+ while((n = getopt_long(argc, argv, "hVb:", options, 0)) >= 0) {
+ switch(n) {
+ case 'h': help();
+ case 'V': version("rtpmon");
+ case 'b': bpf = atoi(optarg); break;
+ default: fatal(0, "invalid option");
+ }
+ }
+ argc -= optind;
+ argv += optind;
+ switch(argc) {
+ case 1:
+ case 2:
+ /* Use command-line ADDRESS+PORT or just PORT */
+ sl.n = argc;
+ sl.s = argv;
+ break;
+ default:
+ fatal(0, "usage: rtpmon [OPTIONS] [ADDRESS] PORT");
+ }
+ if(!(res = get_address(&sl, &prefs, &sockname)))
+ exit(1);
+ /* Create the socket */
+ if((rtpfd = socket(res->ai_family,
+ res->ai_socktype,
+ res->ai_protocol)) < 0)
+ fatal(errno, "error creating socket");
+ /* Allow multiple listeners */
+ xsetsockopt(rtpfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
+ is_multicast = multicast(res->ai_addr);
+ /* The multicast and unicast/broadcast cases are different enough that they
+ * are totally split. Trying to find commonality between them causes more
+ * trouble that it's worth. */
+ if(is_multicast) {
+ /* Stash the multicast group address */
+ memcpy(&mgroup, res->ai_addr, res->ai_addrlen);
+ switch(res->ai_addr->sa_family) {
+ case AF_INET:
+ mgroup.in.sin_port = 0;
+ break;
+ case AF_INET6:
+ mgroup.in6.sin6_port = 0;
+ break;
+ default:
+ fatal(0, "unsupported family %d", (int)res->ai_addr->sa_family);
+ }
+ /* Bind to to the multicast group address */
+ if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
+ fatal(errno, "error binding socket to %s", format_sockaddr(res->ai_addr));
+ /* Add multicast group membership */
+ switch(mgroup.sa.sa_family) {
+ case PF_INET:
+ mreq.imr_multiaddr = mgroup.in.sin_addr;
+ mreq.imr_interface.s_addr = 0; /* use primary interface */
+ if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &mreq, sizeof mreq) < 0)
+ fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP");
+ break;
+ case PF_INET6:
+ mreq6.ipv6mr_multiaddr = mgroup.in6.sin6_addr;
+ memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface);
+ if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
+ &mreq6, sizeof mreq6) < 0)
+ fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP");
+ break;
+ default:
+ fatal(0, "unsupported address family %d", res->ai_family);
+ }
+ /* Report what we did */
+ info("listening on %s multicast group %s",
+ format_sockaddr(res->ai_addr), format_sockaddr(&mgroup.sa));
+ } else {
+ /* Bind to 0/port */
+ switch(res->ai_addr->sa_family) {
+ case AF_INET: {
+ struct sockaddr_in *in = (struct sockaddr_in *)res->ai_addr;
+
+ memset(&in->sin_addr, 0, sizeof (struct in_addr));
+ if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
+ fatal(errno, "error binding socket to 0.0.0.0 port %d",
+ ntohs(in->sin_port));
+ break;
+ }
+ case AF_INET6: {
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)res->ai_addr;
+
+ memset(&in6->sin6_addr, 0, sizeof (struct in6_addr));
+ break;
+ }
+ default:
+ fatal(0, "unsupported family %d", (int)res->ai_addr->sa_family);
+ }
+ if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
+ fatal(errno, "error binding socket to %s", format_sockaddr(res->ai_addr));
+ /* Report what we did */
+ info("listening on %s", format_sockaddr(res->ai_addr));
+ }
+ for(;;) {
+ struct rtp_header header;
+ char buffer[4096];
+ struct iovec iov[2];
+ struct timeval when;
+
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof header;
+ iov[1].iov_base = buffer;
+ iov[1].iov_len = sizeof buffer;
+ n = readv(rtpfd, iov, 2);
+ gettimeofday(&when, 0);
+ if(n < 0) {
+ switch(errno) {
+ case EINTR:
+ continue;
+ default:
+ fatal(errno, "error reading from socket");
+ }
+ }
+ if((size_t)n <= sizeof (struct rtp_header)) {
+ info("ignored a short packet");
+ continue;
+ }
+ frames(&when, (n - sizeof header) / bpf);
+ }
+}
+
+/*
+Local Variables:
+c-basic-offset:2
+comment-column:40
+fill-column:79
+indent-tabs-mode:nil
+End:
+*/
Display version number.
.SS "Buffer Control Options"
You shouldn't need to use these options.
+You should consult the source code for details of their effects.
.TP
.B \-\-min \fIFRAMES\fR, \fB\-m \fIFRAMES\fR
Specifies the buffer low watermark in frames.
-If the number of frames falls below this value then playing will be
-stopped until the buffer fills up.
-.TP
-.B \-\-buffer \fIFRAMES\fR, \fB\-b \fIFRAMES\fR
-Specifies the buffer high watermark in frames.
-Once there are this many frames in the buffer, playing will be (re-)started.
.TP
.B \-\-max \fIFRAMES\fR, \fB\-x \fIFRAMES\fR
Specifies the maximum buffer size in frames.
If there are this many frames in the buffer then reading from the
network socket will be suspended.
-The default is four times the \fB\-\-buffer\fR value.
+The default is twice the \fB\-\-min\fR value.
.TP
.B \-\-rcvbuf \fIBYTES\fR, \fB\-R \fIBYTES\fR
Specifies socket receive buffer size.
-The default is 131072 (128Kbytes).
+The default is not to change the buffer size, i.e. you get whatever the
+local operating system chooses.
The buffer size will not be reduced below the operating system's default.
.SH "REMOTE CONTROL"
The
static long alsa_mixer_max;
/** @brief Actually play sound via ALSA */
-static size_t alsa_play(void *buffer, size_t samples) {
+static size_t alsa_play(void *buffer, size_t samples, unsigned flags) {
+ /* If we're paused we just pretend. We rely on snd_pcm_writei() blocking so
+ * we have to fake up a sleep here. However it doesn't have to be all that
+ * accurate - in particular it's quite acceptable to greatly underestimate
+ * the required wait time. For 'lengthy' waits we do this by the blunt
+ * instrument of halving it. */
+ if(flags & UAUDIO_PAUSED) {
+ if(samples > 64)
+ samples /= 2;
+ const uint64_t ns = ((uint64_t)samples * 1000000000
+ / (uaudio_rate * uaudio_channels));
+ struct timespec ts[1];
+ ts->tv_sec = ns / 1000000000;
+ ts->tv_nsec = ns % 1000000000;
+ while(nanosleep(ts, ts) < 0 && errno == EINTR)
+ ;
+ return samples;
+ }
int err;
/* ALSA wants 'frames', where frame = several concurrently played samples */
const snd_pcm_uframes_t frames = samples / uaudio_channels;
}
-static void alsa_activate(void) {
- uaudio_thread_activate();
-}
-
-static void alsa_deactivate(void) {
- uaudio_thread_deactivate();
-}
-
static void alsa_start(uaudio_callback *callback,
void *userdata) {
if(uaudio_channels != 1 && uaudio_channels != 2)
.options = alsa_options,
.start = alsa_start,
.stop = alsa_stop,
- .activate = alsa_activate,
- .deactivate = alsa_deactivate,
+ .activate = uaudio_thread_activate,
+ .deactivate = uaudio_thread_deactivate,
.open_mixer = alsa_open_mixer,
.close_mixer = alsa_close_mixer,
.get_volume = alsa_get_volume,
/** @brief Child process ID */
static pid_t command_pid;
+/** @brief Whether to suspend on pause */
+static int command_suspend_on_pause;
+
static const char *const command_options[] = {
"command",
"pause-mode",
}
/** @brief Send audio data to subprocess */
-static size_t command_play(void *buffer, size_t nsamples) {
- uaudio_schedule_synchronize();
+static size_t command_play(void *buffer, size_t nsamples, unsigned flags) {
+ uaudio_schedule_sync();
+ /* If we're pausing and want that to be represented by stopping writing, we
+ * just pretend */
+ if((flags & UAUDIO_PAUSED) && command_suspend_on_pause)
+ return nsamples;
const size_t bytes = nsamples * uaudio_sample_size;
int written = write(command_fd, buffer, bytes);
if(written < 0) {
fatal(errno, "error writing to audio command subprocess");
}
}
+ /* TODO what if we write a partial sample? Actually reasonably unlikely but
+ * not impossible. Maybe someone who actually uses this backend should sort
+ * it out. */
const size_t written_samples = written / uaudio_sample_size;
- uaudio_schedule_update(written_samples);
+ uaudio_schedule_sent(written_samples);
return written_samples;
}
unsigned flags = 0;
if(!strcmp(pausemode, "silence"))
- flags |= UAUDIO_THREAD_FAKE_PAUSE;
+ command_suspend_on_pause = 0;
else if(!strcmp(pausemode, "suspend"))
- ;
+ command_suspend_on_pause = 1;
else
fatal(0, "unknown pause mode '%s'", pausemode);
command_open();
command_wait();
}
-static void command_activate(void) {
- uaudio_schedule_reactivated = 1;
- uaudio_thread_activate();
-}
-
-static void command_deactivate(void) {
- uaudio_thread_deactivate();
-}
-
static void command_configure(void) {
uaudio_set("command", config->speaker_command);
uaudio_set("pause-mode", config->pause_mode);
.options = command_options,
.start = command_start,
.stop = command_stop,
- .activate = command_activate,
- .deactivate = command_deactivate,
+ .activate = uaudio_thread_activate,
+ .deactivate = uaudio_thread_deactivate,
.configure = command_configure,
};
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
+#include <time.h>
#include "mem.h"
#include "log.h"
NULL
};
-/** @brief Actually play sound via OSS */
-static size_t oss_play(void *buffer, size_t samples) {
- const size_t bytes = samples * uaudio_sample_size;
- int rc = write(oss_fd, buffer, bytes);
- if(rc < 0)
- fatal(errno, "error writing to sound device");
- return rc / uaudio_sample_size;
-}
-
/** @brief Open the OSS sound device */
static void oss_open(void) {
const char *device = uaudio_get("device", NULL);
#endif
}
-static void oss_activate(void) {
- oss_open();
- uaudio_thread_activate();
+/** @brief Close the OSS sound device */
+static void oss_close(void) {
+ if(oss_fd != -1) {
+ close(oss_fd);
+ oss_fd = -1;
+ }
}
-static void oss_deactivate(void) {
- uaudio_thread_deactivate();
- close(oss_fd);
- oss_fd = -1;
+/** @brief Actually play sound via OSS */
+static size_t oss_play(void *buffer, size_t samples, unsigned flags) {
+ /* cf uaudio-alsa.c:alsa-play() */
+ if(flags & UAUDIO_PAUSED) {
+ if(flags & UAUDIO_PAUSE)
+ oss_close();
+ if(samples > 64)
+ samples /= 2;
+ const uint64_t ns = ((uint64_t)samples * 1000000000
+ / (uaudio_rate * uaudio_channels));
+ struct timespec ts[1];
+ ts->tv_sec = ns / 1000000000;
+ ts->tv_nsec = ns % 1000000000;
+ while(nanosleep(ts, ts) < 0 && errno == EINTR)
+ ;
+ return samples;
+ }
+ if(flags & UAUDIO_RESUME)
+ oss_open();
+ const size_t bytes = samples * uaudio_sample_size;
+ int rc = write(oss_fd, buffer, bytes);
+ if(rc < 0)
+ fatal(errno, "error writing to sound device");
+ return rc / uaudio_sample_size;
}
-
+
static void oss_start(uaudio_callback *callback,
void *userdata) {
if(uaudio_channels != 1 && uaudio_channels != 2)
static void oss_stop(void) {
uaudio_thread_stop();
+ oss_close(); /* might not have been paused */
}
/** @brief Channel names */
.options = oss_options,
.start = oss_start,
.stop = oss_stop,
- .activate = oss_activate,
- .deactivate = oss_deactivate,
+ .activate = uaudio_thread_activate,
+ .deactivate = uaudio_thread_deactivate,
.open_mixer = oss_open_mixer,
.close_mixer = oss_close_mixer,
.get_volume = oss_get_volume,
/** @brief RTP SSRC */
static uint32_t rtp_id;
+/** @brief Base for timestamp */
+static uint32_t rtp_base;
+
/** @brief RTP sequence number */
static uint16_t rtp_sequence;
*/
static int rtp_errors;
-/** @brief Delay threshold in microseconds
- *
- * rtp_play() never attempts to introduce a delay shorter than this.
- */
-static int64_t rtp_delay_threshold;
+/** @brief Set while paused */
+static volatile int rtp_paused;
static const char *const rtp_options[] = {
"rtp-destination",
"rtp-source-port",
"multicast-ttl",
"multicast-loop",
- "delay-threshold",
NULL
};
}
}
-static size_t rtp_play(void *buffer, size_t nsamples) {
+static size_t rtp_play(void *buffer, size_t nsamples, unsigned flags) {
struct rtp_header header;
struct iovec vec[2];
-
+
+#if 0
+ if(flags & (UAUDIO_PAUSE|UAUDIO_RESUME))
+ fprintf(stderr, "rtp_play %zu samples%s%s%s%s\n", nsamples,
+ flags & UAUDIO_PAUSE ? " UAUDIO_PAUSE" : "",
+ flags & UAUDIO_RESUME ? " UAUDIO_RESUME" : "",
+ flags & UAUDIO_PLAYING ? " UAUDIO_PLAYING" : "",
+ flags & UAUDIO_PAUSED ? " UAUDIO_PAUSED" : "");
+#endif
+
/* We do as much work as possible before checking what time it is */
/* Fill out header */
header.vpxcc = 2 << 6; /* V=2, P=0, X=0, CC=0 */
header.seq = htons(rtp_sequence++);
header.ssrc = rtp_id;
- header.mpt = (uaudio_schedule_reactivated ? 0x80 : 0x00) | rtp_payload;
+ header.mpt = rtp_payload;
+ /* If we've come out of a pause, set the marker bit */
+ if(flags & UAUDIO_RESUME)
+ header.mpt |= 0x80;
#if !WORDS_BIGENDIAN
/* Convert samples to network byte order */
uint16_t *u = buffer, *const limit = u + nsamples;
vec[0].iov_len = sizeof header;
vec[1].iov_base = buffer;
vec[1].iov_len = nsamples * uaudio_sample_size;
- uaudio_schedule_synchronize();
- header.timestamp = htonl((uint32_t)uaudio_schedule_timestamp);
+ const uint32_t timestamp = uaudio_schedule_sync();
+ header.timestamp = htonl(rtp_base + (uint32_t)timestamp);
+ /* If we're paused don't actually end a packet, we just pretend */
+ if(flags & UAUDIO_PAUSED) {
+ uaudio_schedule_sent(nsamples);
+ return nsamples;
+ }
int written_bytes;
do {
written_bytes = writev(rtp_fd, vec, 2);
return 0;
} else
rtp_errors /= 2; /* gradual decay */
- written_bytes -= sizeof (struct rtp_header);
- const size_t written_samples = written_bytes / uaudio_sample_size;
- uaudio_schedule_update(written_samples);
- return written_samples;
+ /* TODO what can we sensibly do about short writes here? Really that's just
+ * an error and we ought to be using smaller packets. */
+ uaudio_schedule_sent(nsamples);
+ return nsamples;
}
static void rtp_open(void) {
"rtp-source",
"rtp-source-port",
src);
- rtp_delay_threshold = atoi(uaudio_get("rtp-delay-threshold", "1000"));
/* ...microseconds */
/* Resolve addresses */
* packet contents are highly public so there's no point asking for very
* strong randomness. */
gcry_create_nonce(&rtp_id, sizeof rtp_id);
+ gcry_create_nonce(&rtp_base, sizeof rtp_base);
gcry_create_nonce(&rtp_sequence, sizeof rtp_sequence);
rtp_open();
uaudio_schedule_init();
rtp_fd = -1;
}
-static void rtp_activate(void) {
- uaudio_schedule_reactivated = 1;
- uaudio_thread_activate();
-}
-
-static void rtp_deactivate(void) {
- uaudio_thread_deactivate();
-}
-
static void rtp_configure(void) {
char buffer[64];
snprintf(buffer, sizeof buffer, "%ld", config->multicast_ttl);
uaudio_set("multicast-ttl", buffer);
uaudio_set("multicast-loop", config->multicast_loop ? "yes" : "no");
- snprintf(buffer, sizeof buffer, "%ld", config->rtp_delay_threshold);
- uaudio_set("delay-threshold", buffer);
}
const struct uaudio uaudio_rtp = {
.options = rtp_options,
.start = rtp_start,
.stop = rtp_stop,
- .activate = rtp_activate,
- .deactivate = rtp_deactivate,
+ .activate = uaudio_thread_activate,
+ .deactivate = uaudio_thread_deactivate,
.configure = rtp_configure,
};
*
* The sequence numbers are intended for RTP's use but it's more convenient to
* maintain them here.
+ *
+ * The basic idea:
+ * - we maintain a base time
+ * - we calculate from this how many samples SHOULD have been sent by now
+ * - we compare this with the number of samples sent so far
+ * - we use this to wait until we're ready to send something
+ * - it's up to the caller to send nothing, or send 0s, if it's supposed to
+ * be paused
+ *
+ * An implication of this is that the caller must still call
+ * uaudio_schedule_sync() when deactivated (paused) and pretend to send 0s.
*/
#include "common.h"
#include <unistd.h>
-#include <gcrypt.h>
+#include <time.h>
+#include <errno.h>
#include "uaudio.h"
#include "mem.h"
* timestamp is maintained as a 64-bit integer, giving around six million years
* before wrapping, and truncated to 32 bits when transmitting.
*/
-uint64_t uaudio_schedule_timestamp;
-
-/** @brief Actual time corresponding to @ref uaudio_schedule_timestamp
- *
- * This is the time, on this machine, at which the sample at @ref
- * uaudio_schedule_timestamp ought to be sent, interpreted as the time the last
- * packet was sent plus the time length of the packet. */
-static struct timeval uaudio_schedule_timeval;
-
-/** @brief Set when we (re-)activate, to provoke timestamp resync */
-int uaudio_schedule_reactivated;
+static uint64_t timestamp;
-/** @brief Delay threshold in microseconds
+/** @brief Base time
*
- * uaudio_schedule_play() never attempts to introduce a delay shorter than this.
+ * This is the base time that corresponds to a timestamp of 0.
*/
-static int64_t uaudio_schedule_delay_threshold;
-
-/** @brief Time for current packet */
-static struct timeval uaudio_schedule_now;
+struct timeval base;
/** @brief Synchronize playback operations against real time
+ * @return Sample number
*
- * This function sleeps as necessary to rate-limit playback operations to match
- * the actual playback rate. It also maintains @ref uaudio_schedule_timestamp
- * as an arbitrarily-based sample counter, for use by RTP.
- *
- * You should call this in your API's @ref uaudio_playcallback before writing
- * and call uaudio_schedule_update() afterwards.
*/
-void uaudio_schedule_synchronize(void) {
-retry:
- xgettimeofday(&uaudio_schedule_now, NULL);
- if(uaudio_schedule_reactivated) {
- /* We've been deactivated for some unknown interval. We need to advance
- * rtp_timestamp to account for the dead air. */
- /* On the first run through we'll set the start time. */
- if(!uaudio_schedule_timeval.tv_sec)
- uaudio_schedule_timeval = uaudio_schedule_now;
- /* See how much time we missed.
- *
- * This will be 0 on the first run through, in which case we'll not modify
- * anything.
- *
- * It'll be negative in the (rare) situation where the deactivation
- * interval is shorter than the last packet we sent. In this case we wait
- * for that much time and then return having sent no samples, which will
- * cause uaudio_play_thread_fn() to retry.
- *
- * In the normal case it will be positive.
- */
- const int64_t delay = tvsub_us(uaudio_schedule_now,
- uaudio_schedule_timeval); /* microseconds */
- if(delay < 0) {
- usleep(-delay);
- goto retry;
- }
- /* Advance the RTP timestamp to the present. With 44.1KHz stereo this will
- * overflow the intermediate value with a delay of a bit over 6 years.
- * This seems acceptable. */
- uint64_t update = (delay * uaudio_rate * uaudio_channels) / 1000000;
- /* Don't throw off channel synchronization */
- update -= update % uaudio_channels;
- /* We log nontrivial changes */
- if(update)
- info("advancing uaudio_schedule_timeval by %"PRIu64" samples", update);
- uaudio_schedule_timestamp += update;
- uaudio_schedule_timeval = uaudio_schedule_now;
- uaudio_schedule_reactivated = 0;
+uint32_t uaudio_schedule_sync(void) {
+ const unsigned rate = uaudio_rate * uaudio_channels;
+ struct timeval now;
+
+ xgettimeofday(&now, NULL);
+ /* If we're just starting then we might as well send as much as possible
+ * straight away. */
+ if(!base.tv_sec) {
+ base = now;
+ return timestamp;
+ }
+ /* Calculate how many microseconds ahead of the base time we are */
+ uint64_t us = tvsub_us(now, base);
+ /* Calculate how many samples that is */
+ uint64_t samples = us * rate / 1000000;
+ /* So...
+ *
+ * We've actually sent 'timestamp' samples so far.
+ *
+ * We OUGHT to have sent 'samples' samples so far.
+ *
+ * Suppose it's the SECOND call. timestamp will be (say) 716. 'samples'
+ * will be (say) 10 - there's been a bit of scheduling delay. So in that
+ * case we should wait for 716-10=706 samples worth of time before we can
+ * even send one sample.
+ *
+ * So we wait that long and send our 716 samples.
+ *
+ * On the next call we'll have timestamp=1432 and samples=726, say. So we
+ * wait and send again.
+ *
+ * On the next call there's been a bit of a delay. timestamp=2148 but
+ * samples=2200. So we send our 716 samples immediately.
+ *
+ * If the delay had been longer we might sent further packets back to back to
+ * make up for it.
+ *
+ * Now timestamp=2864 and samples=2210 (say). Now we're back to waiting.
+ */
+ if(samples < timestamp) {
+ /* We should delay a bit */
+ int64_t wait_samples = timestamp - samples;
+ int64_t wait_ns = wait_samples * 1000000000 / rate;
+
+ struct timespec ts[1];
+ ts->tv_sec = wait_ns / 1000000000;
+ ts->tv_nsec = wait_ns % 1000000000;
+#if 0
+ fprintf(stderr,
+ "samples=%8"PRIu64" timestamp=%8"PRIu64" wait=%"PRId64" (%"PRId64"ns)\n",
+ samples, timestamp, wait_samples, wait_ns);
+#endif
+ while(nanosleep(ts, ts) < 0 && errno == EINTR)
+ ;
} else {
- /* Chances are we've been called right on the heels of the previous packet.
- * If we just sent packets as fast as we got audio data we'd get way ahead
- * of the player and some buffer somewhere would fill (or at least become
- * unreasonably large).
- *
- * First find out how far ahead of the target time we are.
- */
- const int64_t ahead = tvsub_us(uaudio_schedule_timeval,
- uaudio_schedule_now); /* microseconds */
- /* Only delay at all if we are nontrivially ahead. */
- if(ahead > uaudio_schedule_delay_threshold) {
- /* Don't delay by the full amount */
- usleep(ahead - uaudio_schedule_delay_threshold / 2);
- /* Refetch time (so we don't get out of step with reality) */
- xgettimeofday(&uaudio_schedule_now, NULL);
- }
+#if 0
+ fprintf(stderr, "samples=%8"PRIu64" timestamp=%8"PRIu64"\n",
+ samples, timestamp);
+#endif
}
+ /* If samples >= timestamp then it's time, or gone time, to play the
+ * timestamp'th sample. So we return immediately. */
+ return timestamp;
}
-/** @brief Update schedule after writing
- *
- * Called by your API's @ref uaudio_playcallback after sending audio data (to a
- * subprocess or network or whatever). A separate function so that the caller
- * doesn't have to know how many samples they're going to write until they've
- * done so.
+/** @brief Report how many samples we actually sent
+ * @param nsamples Number of samples sent
*/
-void uaudio_schedule_update(size_t written_samples) {
- /* uaudio_schedule_timestamp and uaudio_schedule_timestamp are supposed to
- * refer to the first sample of the next packet */
- uaudio_schedule_timestamp += written_samples;
- const unsigned usec = (uaudio_schedule_timeval.tv_usec
- + 1000000 * written_samples / (uaudio_rate
- * uaudio_channels));
- /* ...will only overflow 32 bits if one packet is more than about half an
- * hour long, which is not plausible. */
- uaudio_schedule_timeval.tv_sec += usec / 1000000;
- uaudio_schedule_timeval.tv_usec = usec % 1000000;
+void uaudio_schedule_sent(size_t nsamples) {
+ timestamp += nsamples;
}
/** @brief Initialize audio scheduling
* Should be called from your API's @c start callback.
*/
void uaudio_schedule_init(void) {
- gcry_create_nonce(&uaudio_schedule_timestamp,
- sizeof uaudio_schedule_timestamp);
/* uaudio_schedule_play() will spot this and choose an initial value */
- uaudio_schedule_timeval.tv_sec = 0;
+ base.tv_sec = 0;
}
/*
static uaudio_playcallback *uaudio_thread_play_callback;
static void *uaudio_thread_userdata;
static int uaudio_thread_started;
-static int uaudio_thread_activated;
static int uaudio_thread_collecting;
static pthread_mutex_t uaudio_thread_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t uaudio_thread_cond = PTHREAD_COND_INITIALIZER;
/** @brief Maximum number of samples per chunk */
static size_t uaudio_thread_max;
+/** @brief Set when activated, clear when paused */
+static int uaudio_thread_activated;
+
/** @brief Return number of buffers currently in use */
static int uaudio_buffers_used(void) {
return (uaudio_collect_buffer - uaudio_play_buffer) % UAUDIO_THREAD_BUFFERS;
/* We are definitely active now */
uaudio_thread_collecting = 1;
pthread_cond_broadcast(&uaudio_thread_cond);
- while(uaudio_thread_activated
- || (uaudio_thread_flags & UAUDIO_THREAD_FAKE_PAUSE)) {
+ while(uaudio_thread_activated) {
if(uaudio_buffers_used() < UAUDIO_THREAD_BUFFERS - 1) {
/* At least one buffer is available. We release the lock while
* collecting data so that other already-filled buffers can be played
uaudio_thread_max - b->nsamples,
uaudio_thread_userdata);
}
- } else if(uaudio_thread_flags & UAUDIO_THREAD_FAKE_PAUSE) {
- memset(b->samples, 0, uaudio_thread_min * uaudio_sample_size);
- b->nsamples += uaudio_thread_min;
}
pthread_mutex_lock(&uaudio_thread_lock);
/* Advance to next buffer */
*/
static void *uaudio_play_thread_fn(void attribute((unused)) *arg) {
int resync = 1;
+ unsigned last_flags = 0;
+ unsigned char zero[uaudio_thread_max * uaudio_sample_size];
+ memset(zero, 0, sizeof zero);
- pthread_mutex_lock(&uaudio_thread_lock);
while(uaudio_thread_started) {
+ // If we're paused then just play silence
+ if(!uaudio_thread_activated) {
+ pthread_mutex_unlock(&uaudio_thread_lock);
+ unsigned flags = UAUDIO_PAUSED;
+ if(last_flags & UAUDIO_PLAYING)
+ flags |= UAUDIO_PAUSE;
+ uaudio_thread_play_callback(zero, uaudio_thread_max,
+ last_flags = flags);
+ /* We expect the play callback to block for a reasonable period */
+ pthread_mutex_lock(&uaudio_thread_lock);
+ continue;
+ }
const int used = uaudio_buffers_used();
int go;
pthread_mutex_unlock(&uaudio_thread_lock);
//fprintf(stderr, "P%d.", uaudio_play_buffer);
size_t played = 0;
- while(played < b->nsamples)
+ while(played < b->nsamples) {
+ unsigned flags = UAUDIO_PLAYING;
+ if(last_flags & UAUDIO_PAUSED)
+ flags |= UAUDIO_RESUME;
played += uaudio_thread_play_callback((char *)b->samples
+ played * uaudio_sample_size,
- b->nsamples - played);
+ b->nsamples - played,
+ last_flags = flags);
+ }
pthread_mutex_lock(&uaudio_thread_lock);
/* Move to next buffer */
uaudio_play_buffer = (1 + uaudio_play_buffer) % UAUDIO_THREAD_BUFFERS;
* @param playcallback Callback to play audio data
* @param min Minimum number of samples to play in a chunk
* @param max Maximum number of samples to play in a chunk
- * @param flags Flags
+ * @param flags Flags (not currently used)
*
* @p callback will be called multiple times in quick succession if necessary
* to gather at least @p min samples. Equally @p playcallback may be called
* repeatedly in quick succession to play however much was received in a single
* chunk.
- *
- * Possible flags are:
- * - @ref UAUDIO_THREAD_FAKE_PAUSE
*/
void uaudio_thread_start(uaudio_callback *callback,
void *userdata,
uaudio_thread_max = max;
uaudio_thread_flags = flags;
uaudio_thread_started = 1;
+ uaudio_thread_activated = 0;
for(int n = 0; n < UAUDIO_THREAD_BUFFERS; ++n)
uaudio_buffers[n].samples = xcalloc_noptr(uaudio_thread_max,
uaudio_sample_size);
pthread_mutex_lock(&uaudio_thread_lock);
uaudio_thread_activated = 1;
pthread_cond_broadcast(&uaudio_thread_cond);
- while(!uaudio_thread_collecting)
- pthread_cond_wait(&uaudio_thread_cond, &uaudio_thread_lock);
pthread_mutex_unlock(&uaudio_thread_lock);
}
/** @brief Deactivate audio output */
void uaudio_thread_deactivate(void) {
pthread_mutex_lock(&uaudio_thread_lock);
- uaudio_thread_activated = 0;
+ uaudio_thread_activated = 0;
pthread_cond_broadcast(&uaudio_thread_cond);
- if(!(uaudio_thread_flags & UAUDIO_THREAD_FAKE_PAUSE)) {
- while(uaudio_thread_collecting || uaudio_buffers_used())
- pthread_cond_wait(&uaudio_thread_cond, &uaudio_thread_lock);
- }
pthread_mutex_unlock(&uaudio_thread_lock);
}
/** @brief Callback to play audio data
* @param buffer Pointer to audio buffer
* @param samples Number of samples to play
+ * @param flags Flags word
* @return Number of samples played
*
* Used with uaudio_thread_start() etc.
+ *
+ * @p flags is a bitmap giving the current pause state and transitions:
+ * - @ref UAUDIO_PAUSE if this is the first call of a pause
+ * - @ref UAUDIO_RESUME if this is the first call of a resumse
+ * - @ref UAUDIO_PLAYING if this is outside a pause
+ * - @ref UAUDIO_PAUSED if this is in a pause
+ *
+ * During a pause, the sample data is guaranteed to be 0.
*/
-typedef size_t uaudio_playcallback(void *buffer, size_t samples);
+typedef size_t uaudio_playcallback(void *buffer, size_t samples,
+ unsigned flags);
+
+/** @brief Start of a pause */
+#define UAUDIO_PAUSE 0x0001
+
+/** @brief End of a pause */
+#define UAUDIO_RESUME 0x0002
+
+/** @brief Currently playing */
+#define UAUDIO_PLAYING 0x0004
+
+/** @brief Currently paused */
+#define UAUDIO_PAUSED 0x0008
/** @brief Audio API definition */
struct uaudio {
size_t max,
unsigned flags);
-/** @brief Fake pauses
- *
- * This flag is used for audio backends that cannot sensibly be paused.
- * The thread support code will supply silence while deactivated in this
- * case.
- */
-#define UAUDIO_THREAD_FAKE_PAUSE 0x00000001
-
void uaudio_thread_stop(void);
void uaudio_thread_activate(void);
void uaudio_thread_deactivate(void);
-void uaudio_schedule_synchronize(void);
-void uaudio_schedule_update(size_t written_samples);
+uint32_t uaudio_schedule_sync(void);
+void uaudio_schedule_sent(size_t nsamples);
void uaudio_schedule_init(void);
const struct uaudio *uaudio_find(const char *name);