/*
* This file is part of DisOrder
- * Copyright (C) 2005-2009 Richard Kettlewell
+ * Copyright (C) 2005-2013 Richard Kettlewell
* Portions (C) 2007 Mark Wooding
*
* This program is free software: you can redistribute it and/or modify
* assumed that the main server won't start outrageously many decoders.
*
* Audio is supplied from this buffer to the uaudio play callback. Playback is
- * enabled when a track is to be played and disabled when the its last bytes
- * have been return by the callback; pause and resume is implemneted the
+ * enabled when a track is to be played and disabled when its last bytes
+ * have been returned by the callback; pause and resume is implemented the
* obvious way. If the callback finds itself required to play when there is no
* playing track it returns dead air.
*
+ * To implement gapless playback, the server is notified that a track has
+ * finished slightly early. @ref SM_PLAY is therefore allowed to arrive while
+ * the previous track is still playing provided an early @ref SM_FINISHED has
+ * been sent for it.
+ *
* @b Encodings. The encodings supported depend entirely on the uaudio backend
* chosen. See @ref uaudio.h, etc.
*
#include <syslog.h>
#include <unistd.h>
#include <errno.h>
-#include <ao/ao.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <time.h>
#include <sys/un.h>
#include <sys/stat.h>
#include <pthread.h>
+#include <sys/resource.h>
+#include <gcrypt.h>
#include "configuration.h"
#include "syscalls.h"
/** @brief Maximum number of FDs to poll for */
#define NFDS 1024
+/** @brief Number of bytes before end of track to send SM_FINISHED
+ *
+ * Generally set to 1 second.
+ */
+static size_t early_finish;
+
/** @brief Track structure
*
* Known tracks are kept in a linked list. Usually there will be at most two
struct track *next;
/** @brief Input file descriptor */
- int fd; /* input FD */
+ int fd;
/** @brief Track ID */
char id[24];
* out life not playable.
*/
int playable;
+
+ /** @brief Set when finished
+ *
+ * This is set when we've notified the server that the track is finished.
+ * Once this has happened (typically very late in the track's lifetime) the
+ * track cannot be paused or cancelled.
+ */
+ int finished;
/** @brief Input buffer
*
/** @brief Lock protecting data structures
*
* This lock protects values shared between the main thread and the callback.
- * It is needed e.g. if changing @ref playing or if modifying buffer pointers.
- * It is not needed to add a new track, to read values only modified in the
- * same thread, etc.
+ *
+ * It is held 'all' the time by the main thread, the exceptions being when
+ * called activate/deactivate callbacks and when calling (potentially) slow
+ * system calls (in particular poll(), where in fact the main thread will spend
+ * most of its time blocked).
+ *
+ * The callback holds it when it's running.
*/
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-/** @brief Linked list of all prepared tracks */
+/** @brief Linked list of all prepared tracks
+ *
+ * This includes @ref playing and @ref pending_playing.
+ */
static struct track *tracks;
/** @brief Playing track, or NULL
*
- * This means the DESIRED playing track. It does not reflect any other state
- * (e.g. activation of uaudio backend).
+ * This means the track the speaker process intends to play. It does not
+ * reflect any other state (e.g. activation of uaudio backend).
+ *
+ * This track remains on @ref track.
*/
static struct track *playing;
+/** @brief Pending playing track, or NULL
+ *
+ * This means the track the server wants the speaker to play.
+ *
+ * This track remains on @p track.
+ */
+static struct track *pending_playing;
+
/** @brief Array of file descriptors for poll() */
static struct pollfd fds[NFDS];
-/** @brief Next free slot in @ref fds */
+/** @brief Next free slot in @ref fds
+ *
+ * This is used when filling in the @ref fds array each iteration through the
+ * event loop.
+ */
static int fdno;
/** @brief Listen socket */
/** @brief Find track @p id, maybe creating it if not found
* @param id Track ID to find
- * @param create If nonzero, create track structure of @p id not found
+ * @param create If nonzero, create track structure of @p id if not found
* @return Pointer to track structure or NULL
*/
static struct track *findtrack(const char *id, int create) {
* This is effectively the read callback on @c t->fd. It is called from the
* main loop whenever the track's file descriptor is readable, assuming the
* buffer has not reached the maximum allowed occupancy.
+ *
+ * Errors count as EOF.
*/
static int speaker_fill(struct track *t) {
size_t where, left;
t->id, t->eof, t->used));
if(t->eof)
return -1;
- pthread_mutex_lock(&lock);
if(t->used < sizeof t->buffer) {
/* there is room left in the buffer */
where = (t->start + t->used) % sizeof t->buffer;
n = read(t->fd, t->buffer + where, left);
} while(n < 0 && errno == EINTR);
pthread_mutex_lock(&lock);
- if(n < 0) {
- if(errno != EAGAIN)
- fatal(errno, "error reading sample stream");
+ if(n < 0 && errno == EAGAIN) {
+ /* EAGAIN means more later */
rc = 0;
- } else if(n == 0) {
- D(("fill %s: eof detected", t->id));
+ } else if(n <= 0) {
+ /* n=0 means EOF. n<0 means some error occurred. We log the error but
+ * otherwise treat it as identical to EOF. */
+ if(n < 0)
+ disorder_error(errno, "error reading sample stream for %s", t->id);
+ else
+ D(("fill %s: eof detected", t->id));
t->eof = 1;
/* A track always becomes playable at EOF; we're not going to see any
* more data. */
t->playable = 1;
rc = 0;
}
- }
- pthread_mutex_unlock(&lock);
+ } else
+ rc = 0;
return rc;
}
*
* We want to play audio if there is a current track; and it is not paused; and
* it is playable according to the rules for @ref track::playable.
+ *
+ * We don't allow tracks to be paused if we've already told the server we've
+ * finished them; that would cause such tracks to survive much longer than the
+ * few samples they're supposed to, with report() remaining silent for the
+ * duration. The effect is that if you hit pause towards the end of a track,
+ * what should happen is that it finished but the next one is paused right at
+ * its start.
*/
static int playable(void) {
return playing
- && !paused
+ && (!paused || playing->finished)
&& playing->playable;
}
struct speaker_message sm;
if(playing) {
+ /* Had better not send a report for a track that the server thinks has
+ * finished, that would be confusing. */
+ if(playing->finished)
+ return;
memset(&sm, 0, sizeof sm);
sm.type = paused ? SM_PAUSED : SM_PLAYING;
- strcpy(sm.id, playing->id);
- pthread_mutex_lock(&lock);
+ strcpy(sm.u.id, playing->id);
sm.data = playing->played / (uaudio_rate * uaudio_channels);
- pthread_mutex_unlock(&lock);
speaker_send(1, &sm);
+ xtime(&last_report);
}
- time(&last_report);
}
/** @brief Add a file descriptor to the set to poll() for
static size_t speaker_callback(void *buffer,
size_t max_samples,
void attribute((unused)) *userdata) {
- const size_t max_bytes = max_samples * uaudio_sample_size;
+ size_t max_bytes = max_samples * uaudio_sample_size;
size_t provided_samples = 0;
+ /* Be sure to keep the amount of data in a buffer a whole number of frames:
+ * otherwise the playing threads can become stuck. */
+ max_bytes -= max_bytes % (uaudio_sample_size * uaudio_channels);
+
pthread_mutex_lock(&lock);
/* TODO perhaps we should immediately go silent if we've been asked to pause
* or cancel the playing track (maybe block in the cancel case and see what
/* Limit to what we were asked for */
if(bytes > max_bytes)
bytes = max_bytes;
+ /* And truncate to a whole number of frames. */
+ bytes -= bytes % (uaudio_sample_size * uaudio_channels);
/* Provide it */
memcpy(buffer, playing->buffer + playing->start, bytes);
playing->start += bytes;
/* Wrap around to start of buffer */
if(playing->start == sizeof playing->buffer)
playing->start = 0;
- /* See if we've reached the end of the track */
- if(playing->used == 0 && playing->eof)
- write(sigpipe[1], "", 1);
+ /* See if we've reached the end of the track; if so make sure the event
+ * loop wakes up. */
+ if(playing->used == 0 && playing->eof) {
+ int ignored = write(sigpipe[1], "", 1);
+ (void) ignored;
+ }
provided_samples = bytes / uaudio_sample_size;
playing->played += provided_samples;
}
if(!provided_samples) {
memset(buffer, 0, max_bytes);
provided_samples = max_samples;
+ if(playing)
+ disorder_info("%zu samples silence, playing->used=%zu",
+ provided_samples, playing->used);
+ else
+ disorder_info("%zu samples silence, playing=NULL", provided_samples);
}
pthread_mutex_unlock(&lock);
return provided_samples;
struct speaker_message sm;
int n, fd, stdin_slot, timeout, listen_slot, sigpipe_slot;
+ pthread_mutex_lock(&lock);
/* Keep going while our parent process is alive */
while(getppid() != 1) {
int force_report = 0;
fdno = 0;
- /* By default we will wait up to a second before thinking about current
- * state. */
- timeout = 1000;
+ /* By default we will wait up to half a second before thinking about
+ * current state. */
+ timeout = 500;
/* Always ready for commands from the main server. */
stdin_slot = addfd(0, POLLIN);
/* Also always ready for inbound connections */
playing->slot = addfd(playing->fd, POLLIN);
else if(playing)
playing->slot = -1;
+ /* Allow the poll() to be interrupted at the end of a track */
+ sigpipe_slot = addfd(sigpipe[0], POLLIN);
/* If any other tracks don't have a full buffer, try to read sample data
* from them. We do this last of all, so that if we run out of slots,
* nothing important can't be monitored. */
} else
t->slot = -1;
}
- sigpipe_slot = addfd(sigpipe[1], POLLIN);
/* Wait for something interesting to happen */
+ pthread_mutex_unlock(&lock);
n = poll(fds, fdno, timeout);
+ pthread_mutex_lock(&lock);
if(n < 0) {
if(errno == EINTR) continue;
- fatal(errno, "error calling poll");
+ disorder_fatal(errno, "error calling poll");
}
/* Perhaps a connection has arrived */
if(fds[listen_slot].revents & POLLIN) {
char id[24];
if((fd = accept(listenfd, (struct sockaddr *)&addr, &addrlen)) >= 0) {
+ /* We do blocking reads for the header. In theory this means that the
+ * connecting process could wedge the speaker indefinitely. In
+ * practice that would mean that the main server was broken anyway.
+ * Still, this is ugly, and a rewrite would be nice. */
blocking(fd);
if(read(fd, &l, sizeof l) < 4) {
- error(errno, "reading length from inbound connection");
+ disorder_error(errno, "reading length from inbound connection");
xclose(fd);
} else if(l >= sizeof id) {
- error(0, "id length too long");
+ disorder_error(0, "id length too long");
xclose(fd);
} else if(read(fd, id, l) < (ssize_t)l) {
- error(errno, "reading id from inbound connection");
+ disorder_error(errno, "reading id from inbound connection");
xclose(fd);
} else {
id[l] = 0;
D(("id %s fd %d", id, fd));
t = findtrack(id, 1/*create*/);
if (write(fd, "", 1) < 0) /* write an ack */
- error(errno, "writing ack to inbound connection");
+ disorder_error(errno, "writing ack to inbound connection for %s",
+ id);
if(t->fd != -1) {
- error(0, "%s: already got a connection", id);
+ disorder_error(0, "%s: already got a connection", id);
xclose(fd);
} else {
nonblock(fd);
t->fd = fd; /* yay */
}
+ /* Notify the server that the connection arrived */
+ sm.type = SM_ARRIVED;
+ strcpy(sm.u.id, id);
+ speaker_send(1, &sm);
}
} else
- error(errno, "accept");
+ disorder_error(errno, "accept");
}
/* Perhaps we have a command to process */
if(fds[stdin_slot].revents & POLLIN) {
* this won't be the case, so we don't bother looping around to pick them
* all up. */
n = speaker_recv(0, &sm);
- /* TODO */
if(n > 0)
+ /* As a rule we don't send success replies to most commands - we just
+ * force the regular status update to be sent immediately rather than
+ * on schedule. */
switch(sm.type) {
case SM_PLAY:
- if(playing)
- fatal(0, "got SM_PLAY but already playing something");
- t = findtrack(sm.id, 1);
+ /* SM_PLAY is only allowed if the server reasonably believes that
+ * nothing is playing */
+ if(playing) {
+ /* If finished isn't set then the server can't believe that this
+ * track has finished */
+ if(!playing->finished)
+ disorder_fatal(0, "got SM_PLAY but already playing something");
+ /* If pending_playing is set then the server must believe that that
+ * is playing */
+ if(pending_playing)
+ disorder_fatal(0, "got SM_PLAY but have a pending playing track");
+ }
+ t = findtrack(sm.u.id, 1);
D(("SM_PLAY %s fd %d", t->id, t->fd));
if(t->fd == -1)
- error(0, "cannot play track because no connection arrived");
- playing = t;
- force_report = 1;
+ disorder_error(0,
+ "cannot play track because no connection arrived");
+ /* TODO as things stand we often report this error message but then
+ * appear to proceed successfully. Understanding why requires a look
+ * at play.c: we call prepare() which makes the connection in a child
+ * process, and then sends the SM_PLAY in the parent process. The
+ * latter may well be faster. As it happens this is harmless; we'll
+ * just sit around sending silence until the decoder connects and
+ * starts sending some sample data. But is is annoying and ought to
+ * be fixed. */
+ pending_playing = t;
+ /* If nothing is currently playing then we'll switch to the pending
+ * track below so there's no point distinguishing the situations
+ * here. */
break;
case SM_PAUSE:
D(("SM_PAUSE"));
force_report = 1;
break;
case SM_CANCEL:
- D(("SM_CANCEL %s", sm.id));
- t = removetrack(sm.id);
+ D(("SM_CANCEL %s", sm.u.id));
+ t = removetrack(sm.u.id);
if(t) {
- pthread_mutex_lock(&lock);
- if(t == playing) {
- /* scratching the playing track */
+ if(t == playing || t == pending_playing) {
+ /* Scratching the track that the server believes is playing,
+ * which might either be the actual playing track or a pending
+ * playing track */
sm.type = SM_FINISHED;
- playing = 0;
+ if(t == playing)
+ playing = 0;
+ else
+ pending_playing = 0;
} else {
/* Could be scratching the playing track before it's quite got
* going, or could be just removing a track from the queue. We
* log more because there's been a bug here recently than because
* it's particularly interesting; the log message will be removed
* if no further problems show up. */
- info("SM_CANCEL for nonplaying track %s", sm.id);
+ disorder_info("SM_CANCEL for nonplaying track %s", sm.u.id);
sm.type = SM_STILLBORN;
}
- strcpy(sm.id, t->id);
+ strcpy(sm.u.id, t->id);
destroy(t);
- pthread_mutex_unlock(&lock);
} else {
/* Probably scratching the playing track well before it's got
* going, but could indicate a bug, so we log this as an error. */
sm.type = SM_UNKNOWN;
- error(0, "SM_CANCEL for unknown track %s", sm.id);
+ disorder_error(0, "SM_CANCEL for unknown track %s", sm.u.id);
}
speaker_send(1, &sm);
force_report = 1;
break;
case SM_RELOAD:
D(("SM_RELOAD"));
- if(config_read(1))
- error(0, "cannot read configuration");
- info("reloaded configuration");
+ if(config_read(1, NULL))
+ disorder_error(0, "cannot read configuration");
+ disorder_info("reloaded configuration");
break;
+ case SM_RTP_REQUEST:
+ /* TODO the error behavior here is really unhelpful */
+ if(rtp_add_recipient(&sm.u.address))
+ disorder_error(0, "unacceptable RTP destination");
+ break;
+ case SM_RTP_CANCEL:
+ if(rtp_remove_recipient(&sm.u.address))
+ disorder_error(0, "unacceptable RTP destination for removal");
+ break;
default:
- error(0, "unknown message type %d", sm.type);
+ disorder_error(0, "unknown message type %d", sm.type);
}
}
/* Read in any buffered data */
* interrupted poll(). */
if(fds[sigpipe_slot].revents & POLLIN) {
char buffer[64];
+ int ignored; (void)ignored;
- read(sigpipe[0], buffer, sizeof buffer);
+ ignored = read(sigpipe[0], buffer, sizeof buffer);
}
- if(playing && playing->used == 0 && playing->eof) {
- /* The playing track is done. Tell the server, and destroy it. */
+ /* Send SM_FINISHED when we're near the end of the track.
+ *
+ * This is how we implement gapless play; we hope that the SM_PLAY from the
+ * server arrives before the remaining bytes of the track play out.
+ */
+ if(playing
+ && playing->eof
+ && !playing->finished
+ && playing->used <= early_finish) {
memset(&sm, 0, sizeof sm);
sm.type = SM_FINISHED;
- strcpy(sm.id, playing->id);
+ strcpy(sm.u.id, playing->id);
speaker_send(1, &sm);
+ playing->finished = 1;
+ }
+ /* When the track is actually finished, deconfigure it */
+ if(playing && playing->eof && !playing->used) {
+ if(!playing->finished) {
+ /* should never happen but we'd like to know if it does */
+ disorder_fatal(0, "track finish state inconsistent");
+ }
removetrack(playing->id);
- pthread_mutex_lock(&lock);
destroy(playing);
playing = 0;
- pthread_mutex_unlock(&lock);
- /* The server will presumalby send as an SM_PLAY by return */
+ }
+ /* Act on the pending SM_PLAY */
+ if(!playing && pending_playing) {
+ playing = pending_playing;
+ pending_playing = 0;
+ force_report = 1;
}
/* Impose any state change required by the above */
if(playable()) {
if(!activated) {
activated = 1;
+ pthread_mutex_unlock(&lock);
backend->activate();
+ pthread_mutex_lock(&lock);
}
} else {
if(activated) {
activated = 0;
+ pthread_mutex_unlock(&lock);
backend->deactivate();
+ pthread_mutex_lock(&lock);
}
}
/* If we've not reported our state for a second do so now. */
- if(force_report || time(0) > last_report)
+ if(force_report || xtime(0) > last_report)
report();
}
}
struct rlimit rl[1];
set_progname(argv);
- if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale");
+ if(!setlocale(LC_CTYPE, "")) disorder_fatal(errno, "error calling setlocale");
while((n = getopt_long(argc, argv, "hVc:dDSs", options, 0)) >= 0) {
switch(n) {
case 'h': help();
case 'D': debugging = 0; break;
case 'S': logsyslog = 0; break;
case 's': logsyslog = 1; break;
- default: fatal(0, "invalid option");
+ default: disorder_fatal(0, "invalid option");
}
}
if((d = getenv("DISORDER_DEBUG_SPEAKER"))) debugging = atoi(d);
log_default = &log_syslog;
}
config_uaudio_apis = uaudio_apis;
- if(config_read(1)) fatal(0, "cannot read configuration");
+ if(config_read(1, NULL)) disorder_fatal(0, "cannot read configuration");
/* ignore SIGPIPE */
signal(SIGPIPE, SIG_IGN);
/* set nice value */
become_mortal();
/* make sure we're not root, whatever the config says */
if(getuid() == 0 || geteuid() == 0)
- fatal(0, "do not run as root");
+ disorder_fatal(0, "do not run as root");
/* Make sure we can't have more than NFDS files open (it would bust our
* poll() array) */
if(getrlimit(RLIMIT_NOFILE, rl) < 0)
- fatal(errno, "getrlimit RLIMIT_NOFILE");
+ disorder_fatal(errno, "getrlimit RLIMIT_NOFILE");
if(rl->rlim_cur > NFDS) {
rl->rlim_cur = NFDS;
if(setrlimit(RLIMIT_NOFILE, rl) < 0)
- fatal(errno, "setrlimit to reduce RLIMIT_NOFILE to %lu",
+ disorder_fatal(errno, "setrlimit to reduce RLIMIT_NOFILE to %lu",
(unsigned long)rl->rlim_cur);
- info("set RLIM_NOFILE to %lu", (unsigned long)rl->rlim_cur);
+ disorder_info("set RLIM_NOFILE to %lu", (unsigned long)rl->rlim_cur);
} else
- info("RLIM_NOFILE is %lu", (unsigned long)rl->rlim_cur);
+ disorder_info("RLIM_NOFILE is %lu", (unsigned long)rl->rlim_cur);
+ /* gcrypt initialization */
+ if(!gcry_check_version(NULL))
+ disorder_fatal(0, "gcry_check_version failed");
+ gcry_control(GCRYCTL_INIT_SECMEM, 0);
+ gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
/* create a pipe between the backend callback and the poll() loop */
xpipe(sigpipe);
nonblock(sigpipe[0]);
config->sample_format.channels,
config->sample_format.bits,
config->sample_format.bits != 8);
+ early_finish = uaudio_sample_size * uaudio_channels * uaudio_rate;
/* TODO other parameters! */
backend = uaudio_find(config->api);
/* backend-specific initialization */
if(backend->configure)
backend->configure();
+ uaudio_set("application", "disorder-speaker");
backend->start(speaker_callback, NULL);
- /* create the socket directory */
- byte_xasprintf(&dir, "%s/speaker", config->home);
+ /* create the private socket directory */
+ byte_xasprintf(&dir, "%s/private", config->home);
unlink(dir); /* might be a leftover socket */
if(mkdir(dir, 0700) < 0 && errno != EEXIST)
- fatal(errno, "error creating %s", dir);
+ disorder_fatal(errno, "error creating %s", dir);
/* set up the listen socket */
listenfd = xsocket(PF_UNIX, SOCK_STREAM, 0);
memset(&addr, 0, sizeof addr);
addr.sun_family = AF_UNIX;
- snprintf(addr.sun_path, sizeof addr.sun_path, "%s/speaker/socket",
+ snprintf(addr.sun_path, sizeof addr.sun_path, "%s/private/speaker",
config->home);
if(unlink(addr.sun_path) < 0 && errno != ENOENT)
- error(errno, "removing %s", addr.sun_path);
+ disorder_error(errno, "removing %s", addr.sun_path);
xsetsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
if(bind(listenfd, (const struct sockaddr *)&addr, sizeof addr) < 0)
- fatal(errno, "error binding socket to %s", addr.sun_path);
+ disorder_fatal(errno, "error binding socket to %s", addr.sun_path);
xlisten(listenfd, 128);
nonblock(listenfd);
- info("listening on %s", addr.sun_path);
+ disorder_info("version "VERSION" process ID %lu",
+ (unsigned long)getpid());
+ disorder_info("listening on %s", addr.sun_path);
memset(&sm, 0, sizeof sm);
sm.type = SM_READY;
speaker_send(1, &sm);
mainloop();
- info("stopped (parent terminated)");
+ disorder_info("stopped (parent terminated)");
exit(0);
}