From 84aa9f9339ef6fa104588dd510c433ef20a96fe1 Mon Sep 17 00:00:00 2001 Message-Id: <84aa9f9339ef6fa104588dd510c433ef20a96fe1.1714829184.git.mdw@distorted.org.uk> From: Mark Wooding Date: Wed, 3 Oct 2007 20:23:09 +0100 Subject: [PATCH] banish fd passing. currently a bit ugly but seems to work Organization: Straylight/Edgeware From: Richard Kettlewell --- lib/speaker-protocol.c | 66 +++----------------------- lib/speaker-protocol.h | 18 ++----- server/play.c | 105 ++++++++++++++++++++++++----------------- server/speaker.c | 99 ++++++++++++++++++++++++++++---------- 4 files changed, 148 insertions(+), 140 deletions(-) diff --git a/lib/speaker-protocol.c b/lib/speaker-protocol.c index 1c5199a..ab8b4ee 100644 --- a/lib/speaker-protocol.c +++ b/lib/speaker-protocol.c @@ -27,7 +27,8 @@ #include #include #include -#include +#include +#include #include "speaker-protocol.h" #include "log.h" @@ -35,85 +36,32 @@ /** @brief Send a speaker message * @param fd File descriptor to send to * @param sm Pointer to message - * @param datafd File descriptoxr to pass with message or -1 - * - * @p datafd will be the output from some decoder. */ -void speaker_send(int fd, const struct speaker_message *sm, int datafd) { - struct msghdr m; - struct iovec iov; - union { - struct cmsghdr cmsg; - char size[CMSG_SPACE(sizeof (int))]; - } u; +void speaker_send(int fd, const struct speaker_message *sm) { int ret; - memset(&m, 0, sizeof m); - m.msg_iov = &iov; - m.msg_iovlen = 1; - iov.iov_base = (void *)sm; - iov.iov_len = sizeof *sm; - if(datafd != -1) { - m.msg_control = (void *)&u.cmsg; - m.msg_controllen = sizeof u; - memset(&u, 0, sizeof u); - u.cmsg.cmsg_len = CMSG_LEN(sizeof (int)); - u.cmsg.cmsg_level = SOL_SOCKET; - u.cmsg.cmsg_type = SCM_RIGHTS; - *(int *)CMSG_DATA(&u.cmsg) = datafd; - } do { - ret = sendmsg(fd, &m, 0); + ret = write(fd, sm, sizeof *sm); } while(ret < 0 && errno == EINTR); if(ret < 0) - fatal(errno, "sendmsg"); + fatal(errno, "write"); } /** @brief Receive a speaker message * @param fd File descriptor to read from * @param sm Where to store received message - * @param datafd Where to store received file descriptor or NULL * @return -ve on @c EAGAIN, 0 at EOF, +ve on success - * - * If @p datafd is NULL but a file descriptor is nonetheless received, - * the process is terminated with an error. */ -int speaker_recv(int fd, struct speaker_message *sm, int *datafd) { - struct msghdr m; - struct iovec iov; - union { - struct cmsghdr cmsg; - char size[CMSG_SPACE(sizeof (int))]; - } u; +int speaker_recv(int fd, struct speaker_message *sm) { int ret; - memset(&m, 0, sizeof m); - m.msg_iov = &iov; - m.msg_iovlen = 1; - iov.iov_base = (void *)sm; - iov.iov_len = sizeof *sm; - if(datafd) { - m.msg_control = (void *)&u.cmsg; - m.msg_controllen = sizeof u; - memset(&u, 0, sizeof u); - u.cmsg.cmsg_len = CMSG_LEN(sizeof (int)); - u.cmsg.cmsg_level = SOL_SOCKET; - u.cmsg.cmsg_type = SCM_RIGHTS; - *datafd = -1; - } do { - ret = recvmsg(fd, &m, MSG_DONTWAIT); + ret = read(fd, sm, sizeof *sm); } while(ret < 0 && errno == EINTR); if(ret < 0) { if(errno != EAGAIN) fatal(errno, "recvmsg"); return -1; } - if((size_t)m.msg_controllen >= CMSG_LEN(sizeof (int))) { - if(!datafd) - fatal(0, "got an unexpected file descriptor from recvmsg"); - else - *datafd = *(int *)CMSG_DATA(&u.cmsg); - } return ret; } diff --git a/lib/speaker-protocol.h b/lib/speaker-protocol.h index 8809f0f..b271373 100644 --- a/lib/speaker-protocol.h +++ b/lib/speaker-protocol.h @@ -32,7 +32,6 @@ struct speaker_message { /** @brief Message type * * Messges from the main server: - * - @ref SM_PREPARE * - @ref SM_PLAY * - @ref SM_PAUSE * - @ref SM_RESUME @@ -54,12 +53,6 @@ struct speaker_message { }; /* messages from the main DisOrder server */ -/** @brief Prepare track @c id - * - * This message will include a file descriptor. The speaker starts buffering - * audio data read from this file against the time that it must be played. - */ -#define SM_PREPARE 0 /** @brief Play track @c id * @@ -96,13 +89,12 @@ struct speaker_message { */ #define SM_PLAYING 131 -void speaker_send(int fd, const struct speaker_message *sm, int datafd); -/* Send a message. DATAFD is passed too if not -1. Does not close DATAFD. */ +void speaker_send(int fd, const struct speaker_message *sm); +/* Send a message. */ -int speaker_recv(int fd, struct speaker_message *sm, int *datafd); -/* Receive a message. If DATAFD is not null then can receive an FD. Return 0 - * on EOF, +ve if a message is read, -1 on EAGAIN, terminates on any other - * error. */ +int speaker_recv(int fd, struct speaker_message *sm); +/* Receive a message. Return 0 on EOF, +ve if a message is read, -1 on EAGAIN, + * terminates on any other error. */ /** @brief One chunk in a stream */ struct stream_header { diff --git a/server/play.c b/server/play.c index b50e19b..1530fe8 100644 --- a/server/play.c +++ b/server/play.c @@ -36,6 +36,7 @@ #include #include #include +#include #include "event.h" #include "log.h" @@ -95,7 +96,7 @@ static int speaker_terminated(ev_source attribute((unused)) *ev, static int speaker_readable(ev_source *ev, int fd, void attribute((unused)) *u) { struct speaker_message sm; - int ret = speaker_recv(fd, &sm, 0); + int ret = speaker_recv(fd, &sm); if(ret < 0) return 0; /* EAGAIN */ if(!ret) { /* EOF */ @@ -169,7 +170,7 @@ void speaker_reload(void) { memset(&sm, 0, sizeof sm); sm.type = SM_RELOAD; - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); } /* timeout for play retry */ @@ -238,9 +239,9 @@ static int player_finished(ev_source *ev, switch(q->state) { case playing_unplayed: case playing_random: - /* If this was an SM_PREPARE track then either it failed or we deliberately - * stopped it because it was removed from the queue or moved down it. So - * leave it state alone for future use. */ + /* If this was a pre-prepared track then either it failed or we + * deliberately stopped it because it was removed from the queue or moved + * down it. So leave it state alone for future use. */ break; default: /* We actually started playing this track. */ @@ -285,13 +286,18 @@ static int find_player(const struct queue_entry *q) { #define START_HARDFAIL 1 /* Track is broken. */ #define START_SOFTFAIL 2 /* Track OK, system (temporarily?) broken */ -/* Play or prepare Q */ +/** @brief Play or prepare @p q + * @param ev Event loop + * @param q Track to play/prepare + * @param prepare_only If true, only prepares track + * @return @ref START_OK, @ref START_HARDFAIL or @ref START_SOFTFTAIL + */ static int start(ev_source *ev, struct queue_entry *q, - int smop) { + int prepare_only) { int n, lfd; const char *p; - int np[2], sp[2]; + int np[2], sfd; struct speaker_message sm; char buffer[64]; int optc; @@ -302,16 +308,19 @@ static int start(ev_source *ev, const char *waitdevice = 0; const char *const *optv; pid_t pid, npid; + struct sockaddr_un addr; + uint32_t l; memset(&sm, 0, sizeof sm); + D(("start %s %d", q->id, prepare_only)); if(find_player_pid(q->id) > 0) { - if(smop == SM_PREPARE) return START_OK; - /* We have already sent an SM_PREPARE for this track so we just need to - * tell the speaker process to start actually playing the queued up audio - * data */ + if(prepare_only) return START_OK; + /* We have already prepared this track so we just need to tell the speaker + * process to start actually playing the queued up audio data */ strcpy(sm.id, q->id); sm.type = SM_PLAY; - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); + D(("sent SM_PLAY for %s", sm.id)); return START_OK; } /* Find the player plugin. */ @@ -320,7 +329,7 @@ static int start(ev_source *ev, return START_HARDFAIL; q->type = play_get_type(q->pl); /* Can't prepare non-raw tracks. */ - if(smop == SM_PREPARE + if(prepare_only && (q->type & DISORDER_PLAYER_TYPEMASK) != DISORDER_PLAYER_RAW) return START_OK; /* Call the prefork function. */ @@ -365,52 +374,62 @@ static int start(ev_source *ev, xclose(lfd); /* tidy up */ setpgid(0, 0); if((q->type & DISORDER_PLAYER_TYPEMASK) == DISORDER_PLAYER_RAW) { - /* "Raw" format players need special treatment: - * 1) their output needs to go via the disorder-normalize process - * 2) the output of that needs to be passed to the disorder-speaker - * process. + /* "Raw" format players always have their output send down a pipe + * to the disorder-normalize process. This will connect to the + * speaker process to actually play the audio data. */ /* np will be the pipe to disorder-normalize */ if(socketpair(PF_UNIX, SOCK_STREAM, 0, np) < 0) fatal(errno, "error calling socketpair"); xshutdown(np[0], SHUT_WR); /* normalize reads from np[0] */ xshutdown(np[1], SHUT_RD); /* decoder writes to np[1] */ - /* sp will be the pipe to disorder-speaker */ - sm.type = smop; - if(socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) - fatal(errno, "error calling socketpair"); - xshutdown(sp[0], SHUT_WR); /* speaker reads from sp[0] */ - xshutdown(sp[1], SHUT_RD); /* normalize writes to sp[1] */ /* Start disorder-normalize */ if(!(npid = xfork())) { if(!xfork()) { + /* Connect to the speaker process */ + memset(&addr, 0, sizeof addr); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof addr.sun_path, + "%s/speaker", config->home); + sfd = xsocket(PF_UNIX, SOCK_STREAM, 0); + if(connect(sfd, &addr, sizeof addr) < 0) + fatal(errno, "connecting to %s", addr.sun_path); + l = strlen(q->id); + if(write(sfd, &l, sizeof l) < 0 + || write(sfd, q->id, l) < 0) + fatal(errno, "writing to %s", addr.sun_path); + /* Await the ack */ + read(sfd, &l, 1); + /* Plumbing */ xdup2(np[0], 0); - xdup2(sp[1], 1); + xdup2(sfd, 1); xclose(np[0]); xclose(np[1]); - xclose(sp[0]); - xclose(sp[1]); + xclose(sfd); + /* Ask the speaker to actually start playing the track; we do it here + * so it's definitely after ack. */ + if(!prepare_only) { + strcpy(sm.id, q->id); + sm.type = SM_PLAY; + speaker_send(speaker_fd, &sm); + D(("sent SM_PLAY for %s", sm.id)); + } execlp("disorder-normalize", "disorder-normalize", (char *)0); fatal(errno, "executing disorder-normalize"); + /* end of the innermost fork */ } _exit(0); - } else { - int w; - - while(waitpid(npid, &w, 0) < 0 && errno == EINTR) - ; + /* end of the middle fork */ } - /* Send the speaker process the file descriptor to read from */ - strcpy(sm.id, q->id); - speaker_send(speaker_fd, &sm, sp[0]); + /* Wait for the middle fork to finish */ + while(waitpid(npid, &n, 0) < 0 && errno == EINTR) + ; /* Pass the file descriptor to the driver in an environment * variable. */ snprintf(buffer, sizeof buffer, "DISORDER_RAW_FD=%d", np[1]); if(putenv(buffer) < 0) fatal(errno, "error calling putenv"); /* Close all the FDs we don't need */ - xclose(sp[0]); - xclose(sp[1]); xclose(np[0]); } if(waitdevice) { @@ -466,7 +485,7 @@ int prepare(ev_source *ev, q->type = play_get_type(q->pl); if((q->type & DISORDER_PLAYER_TYPEMASK) != DISORDER_PLAYER_RAW) return 0; /* Not a raw player */ - return start(ev, q, SM_PREPARE); /* Prepare it */ + return start(ev, q, 1/*prepare_only*/); /* Prepare it */ } void abandon(ev_source attribute((unused)) *ev, @@ -484,7 +503,7 @@ void abandon(ev_source attribute((unused)) *ev, memset(&sm, 0, sizeof sm); sm.type = SM_CANCEL; strcpy(sm.id, q->id); - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); } int add_random_track(void) { @@ -542,7 +561,7 @@ void play(ev_source *ev) { return; D(("taken %p (%s) from queue", (void *)q, q->track)); /* Try to start playing. */ - switch(start(ev, q, SM_PLAY)) { + switch(start(ev, q, 0/*!prepare_only*/)) { case START_HARDFAIL: if(q == qhead.next) { queue_remove(q, 0); /* Abandon this track. */ @@ -645,7 +664,7 @@ void scratch(const char *who, const char *id) { memset(&sm, 0, sizeof sm); sm.type = SM_CANCEL; strcpy(sm.id, playing->id); - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); D(("sending SM_CANCEL for %s", playing->id)); } /* put a scratch track onto the front of the queue (but don't @@ -713,7 +732,7 @@ int pause_playing(const char *who) { case DISORDER_PLAYER_RAW: memset(&sm, 0, sizeof sm); sm.type = SM_PAUSE; - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); break; } if(who) info("paused by %s", who); @@ -744,7 +763,7 @@ void resume_playing(const char *who) { case DISORDER_PLAYER_RAW: memset(&sm, 0, sizeof sm); sm.type = SM_RESUME; - speaker_send(speaker_fd, &sm, -1); + speaker_send(speaker_fd, &sm); break; } if(who) info("resumed by %s", who); diff --git a/server/speaker.c b/server/speaker.c index 70d2162..11916c1 100644 --- a/server/speaker.c +++ b/server/speaker.c @@ -66,6 +66,7 @@ #include #include #include +#include #include "configuration.h" #include "syscalls.h" @@ -91,6 +92,9 @@ struct pollfd fds[NFDS]; /** @brief Next free slot in @ref fds */ int fdno; +/** @brief Listen socket */ +static int listenfd; + static time_t last_report; /* when we last reported */ static int paused; /* pause status */ @@ -179,15 +183,6 @@ static void destroy(struct track *t) { free(t); } -/** @brief Notice a new connection */ -static void acquire(struct track *t, int fd) { - D(("acquire %s %d", t->id, fd)); - if(t->fd != -1) - xclose(t->fd); - t->fd = fd; - nonblock(fd); -} - /** @brief Read data into a sample buffer * @param t Pointer to track * @return 0 on success, -1 on EOF @@ -249,7 +244,7 @@ void abandon(void) { memset(&sm, 0, sizeof sm); sm.type = SM_FINISHED; strcpy(sm.id, playing->id); - speaker_send(1, &sm, 0); + speaker_send(1, &sm); removetrack(playing->id); destroy(playing); playing = 0; @@ -350,7 +345,7 @@ static void report(void) { sm.type = paused ? SM_PAUSED : SM_PLAYING; strcpy(sm.id, playing->id); sm.data = playing->played / config->sample_format.rate; - speaker_send(1, &sm, 0); + speaker_send(1, &sm); } time(&last_report); } @@ -400,7 +395,7 @@ static int playable(void) { static void mainloop(void) { struct track *t; struct speaker_message sm; - int n, fd, stdin_slot, timeout; + int n, fd, stdin_slot, timeout, listen_slot; while(getppid() != 1) { fdno = 0; @@ -409,9 +404,14 @@ static void mainloop(void) { timeout = 1000; /* Always ready for commands from the main server. */ stdin_slot = addfd(0, POLLIN); + /* Also always ready for inbound connections */ + listen_slot = addfd(listenfd, POLLIN); /* Try to read sample data for the currently playing track if there is * buffer space. */ - if(playing && !playing->eof && playing->used < (sizeof playing->buffer)) + if(playing + && playing->fd >= 0 + && !playing->eof + && playing->used < (sizeof playing->buffer)) playing->slot = addfd(playing->fd, POLLIN); else if(playing) playing->slot = -1; @@ -432,7 +432,9 @@ static void mainloop(void) { * nothing important can't be monitored. */ for(t = tracks; t; t = t->next) if(t != playing) { - if(!t->eof && t->used < sizeof t->buffer) { + if(t->fd >= 0 + && !t->eof + && t->used < sizeof t->buffer) { t->slot = addfd(t->fd, POLLIN | POLLHUP); } else t->slot = -1; @@ -459,25 +461,54 @@ static void mainloop(void) { play(3 * FRAMES); } } + /* Perhaps a connection has arrived */ + if(fds[listen_slot].revents & POLLIN) { + struct sockaddr_un addr; + socklen_t addrlen = sizeof addr; + uint32_t l; + char id[24]; + + if((fd = accept(listenfd, &addr, &addrlen)) >= 0) { + if(read(fd, &l, sizeof l) < 4) { + error(errno, "reading length from inbound connection"); + xclose(fd); + } else if(l >= sizeof id) { + error(0, "id length too long"); + xclose(fd); + } else if(read(fd, id, l) < (ssize_t)l) { + error(errno, "reading id from inbound connection"); + xclose(fd); + } else { + id[l] = 0; + D(("id %s fd %d", id, fd)); + t = findtrack(id, 1/*create*/); + write(fd, "", 1); /* write an ack */ + if(t->fd != -1) { + error(0, "got a connection for a track that already has one"); + xclose(fd); + } else { + nonblock(fd); + t->fd = fd; /* yay */ + } + } + } else + error(errno, "accept"); + } /* Perhaps we have a command to process */ if(fds[stdin_slot].revents & POLLIN) { /* There might (in theory) be several commands queued up, but in general * this won't be the case, so we don't bother looping around to pick them * all up. */ - n = speaker_recv(0, &sm, &fd); + n = speaker_recv(0, &sm); + /* TODO */ if(n > 0) switch(sm.type) { - case SM_PREPARE: - D(("SM_PREPARE %s %d", sm.id, fd)); - if(fd == -1) fatal(0, "got SM_PREPARE but no file descriptor"); - t = findtrack(sm.id, 1); - acquire(t, fd); - break; case SM_PLAY: - D(("SM_PLAY %s %d", sm.id, fd)); if(playing) fatal(0, "got SM_PLAY but already playing something"); t = findtrack(sm.id, 1); - if(fd != -1) acquire(t, fd); + D(("SM_PLAY %s fd %d", t->id, t->fd)); + if(t->fd == -1) + error(0, "cannot play track because no connection arrived"); playing = t; /* We attempt to play straight away rather than going round the loop. * play() is clever enough to perform any activation that is @@ -507,7 +538,7 @@ static void mainloop(void) { if(t == playing) { sm.type = SM_FINISHED; strcpy(sm.id, playing->id); - speaker_send(1, &sm, 0); + speaker_send(1, &sm); playing = 0; } destroy(t); @@ -526,7 +557,9 @@ static void mainloop(void) { } /* Read in any buffered data */ for(t = tracks; t; t = t->next) - if(t->slot != -1 && (fds[t->slot].revents & (POLLIN | POLLHUP))) + if(t->fd != -1 + && t->slot != -1 + && (fds[t->slot].revents & (POLLIN | POLLHUP))) fill(t); /* Maybe we finished playing a track somewhere in the above */ maybe_finished(); @@ -542,6 +575,8 @@ static void mainloop(void) { int main(int argc, char **argv) { int n; + struct sockaddr_un addr; + static const int one = 1; set_progname(argv); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); @@ -582,6 +617,20 @@ int main(int argc, char **argv) { backend = backends[n]; /* backend-specific initialization */ backend->init(); + /* set up the listen socket */ + listenfd = xsocket(PF_UNIX, SOCK_STREAM, 0); + memset(&addr, 0, sizeof addr); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof addr.sun_path, "%s/speaker", + config->home); + if(unlink(addr.sun_path) < 0 && errno != ENOENT) + error(errno, "removing %s", addr.sun_path); + xsetsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one); + if(bind(listenfd, &addr, sizeof addr) < 0) + fatal(errno, "error binding socket to %s", addr.sun_path); + xlisten(listenfd, 128); + nonblock(listenfd); + info("listening on %s", addr.sun_path); mainloop(); info("stopped (parent terminated)"); exit(0); -- [mdw]