X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/blobdiff_plain/cfa7dda12a65898b5eab2ed504fd765fef5c7527..af66d051899a3ac7d37095807ff14069f0815285:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index 2a4dc86..9757be6 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -65,6 +65,10 @@ #include #include #include +#include +#include +#include +#include #include "log.h" #include "mem.h" @@ -76,10 +80,17 @@ #include "vector.h" #include "heap.h" #include "timeval.h" +#include "client.h" #include "playrtp.h" +#include "inputline.h" #define readahead linux_headers_are_borked +/** @brief Obsolete synonym */ +#ifndef IPV6_JOIN_GROUP +# define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP +#endif + /** @brief RTP socket */ static int rtpfd; @@ -163,7 +174,7 @@ pthread_cond_t cond = PTHREAD_COND_INITIALIZER; #if HAVE_ALSA_ASOUNDLIB_H # define DEFAULT_BACKEND playrtp_alsa -#elif HAVE_SYS_SOUNDCARD_H +#elif HAVE_SYS_SOUNDCARD_H || EMPEG_HOST # define DEFAULT_BACKEND playrtp_oss #elif HAVE_COREAUDIO_AUDIOHARDWARE_H # define DEFAULT_BACKEND playrtp_coreaudio @@ -176,6 +187,9 @@ static void (*backend)(void) = &DEFAULT_BACKEND; HEAP_DEFINE(pheap, struct packet *, lt_packet); +/** @brief Control socket or NULL */ +const char *control_socket; + static const struct option options[] = { { "help", no_argument, 0, 'h' }, { "version", no_argument, 0, 'V' }, @@ -185,8 +199,7 @@ static const struct option options[] = { { "max", required_argument, 0, 'x' }, { "buffer", required_argument, 0, 'b' }, { "rcvbuf", required_argument, 0, 'R' }, - { "multicast", required_argument, 0, 'M' }, -#if HAVE_SYS_SOUNDCARD_H +#if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST { "oss", no_argument, 0, 'o' }, #endif #if HAVE_ALSA_ASOUNDLIB_H @@ -195,9 +208,77 @@ static const struct option options[] = { #if HAVE_COREAUDIO_AUDIOHARDWARE_H { "core-audio", no_argument, 0, 'c' }, #endif + { "socket", required_argument, 0, 's' }, + { "config", required_argument, 0, 'C' }, { 0, 0, 0, 0 } }; 
+/** @brief Control thread + * + * This thread is responsible for accepting control commands from Disobedience + * (or other controllers) over an AF_UNIX stream socket with a path specified + * by the @c --socket option. The protocol uses simple string commands and + * replies: + * + * - @c stop will shut the player down + * - @c query will send back the reply @c running + * - anything else is ignored + * + * Commands and response strings terminated by shutting down the connection or + * by a newline. No attempt is made to multiplex multiple clients so it is + * important that the command be sent as soon as the connection is made - it is + * assumed that both parties to the protocol are entirely cooperating with one + * another. + */ +static void *control_thread(void attribute((unused)) *arg) { + struct sockaddr_un sa; + int sfd, cfd; + char *line; + socklen_t salen; + FILE *fp; + + assert(control_socket); + unlink(control_socket); + memset(&sa, 0, sizeof sa); + sa.sun_family = AF_UNIX; + strcpy(sa.sun_path, control_socket); + sfd = xsocket(PF_UNIX, SOCK_STREAM, 0); + if(bind(sfd, (const struct sockaddr *)&sa, sizeof sa) < 0) + fatal(errno, "error binding to %s", control_socket); + if(listen(sfd, 128) < 0) + fatal(errno, "error calling listen on %s", control_socket); + info("listening on %s", control_socket); + for(;;) { + salen = sizeof sa; + cfd = accept(sfd, (struct sockaddr *)&sa, &salen); + if(cfd < 0) { + switch(errno) { + case EINTR: + case EAGAIN: + break; + default: + fatal(errno, "error calling accept on %s", control_socket); + } + } + if(!(fp = fdopen(cfd, "r+"))) { + error(errno, "error calling fdopen for %s connection", control_socket); + close(cfd); + continue; + } + if(!inputline(control_socket, fp, &line, '\n')) { + if(!strcmp(line, "stop")) { + info("stopped via %s", control_socket); + exit(0); /* terminate immediately */ + } + if(!strcmp(line, "query")) + fprintf(fp, "running"); + xfree(line); + } + if(fclose(fp) < 0) + error(errno, "error closing 
%s connection", control_socket); + } +} + /** @brief Drop the first packet * * Assumes that @ref lock is held. @@ -387,11 +468,14 @@ struct packet *playrtp_next_packet(void) { */ static void play_rtp(void) { pthread_t ltid; + int err; /* We receive and convert audio data in a background thread */ - pthread_create(&ltid, 0, listen_thread, 0); + if((err = pthread_create(&ltid, 0, listen_thread, 0))) + fatal(err, "pthread_create listen_thread"); /* We have a second thread to add received packets to the queue */ - pthread_create(&ltid, 0, queue_thread, 0); + if((err = pthread_create(&ltid, 0, queue_thread, 0))) + fatal(err, "pthread_create queue_thread"); /* The rest of the work is backend-specific */ backend(); } @@ -406,11 +490,11 @@ static void help(void) { " --buffer, -b FRAMES Buffer high water mark\n" " --max, -x FRAMES Buffer maximum size\n" " --rcvbuf, -R BYTES Socket receive buffer size\n" - " --multicast, -M GROUP Join multicast group\n" + " --config, -C PATH Set configuration file\n" #if HAVE_ALSA_ASOUNDLIB_H " --alsa, -a Use ALSA to play audio\n" #endif -#if HAVE_SYS_SOUNDCARD_H +#if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST " --oss, -o Use OSS to play audio\n" #endif #if HAVE_COREAUDIO_AUDIOHARDWARE_H @@ -431,15 +515,23 @@ static void version(void) { } int main(int argc, char **argv) { - int n; + int n, err; struct addrinfo *res; struct stringlist sl; char *sockname; int rcvbuf, target_rcvbuf = 131072; socklen_t len; - char *multicast_group = 0; struct ip_mreq mreq; struct ipv6_mreq mreq6; + disorder_client *c; + char *address, *port; + int is_multicast; + union any_sockaddr { + struct sockaddr sa; + struct sockaddr_in in; + struct sockaddr_in6 in6; + }; + union any_sockaddr mgroup; static const struct addrinfo prefs = { AI_PASSIVE, @@ -454,7 +546,7 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = 
getopt_long(argc, argv, "hVdD:m:b:x:L:R:M:aocC:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); @@ -465,49 +557,91 @@ int main(int argc, char **argv) { case 'x': maxbuffer = 2 * atol(optarg); break; case 'L': logfp = fopen(optarg, "w"); break; case 'R': target_rcvbuf = atoi(optarg); break; - case 'M': multicast_group = optarg; break; #if HAVE_ALSA_ASOUNDLIB_H case 'a': backend = playrtp_alsa; break; #endif -#if HAVE_SYS_SOUNDCARD_H +#if HAVE_SYS_SOUNDCARD_H || EMPEG_HOST case 'o': backend = playrtp_oss; break; #endif #if HAVE_COREAUDIO_AUDIOHARDWARE_H case 'c': backend = playrtp_coreaudio; break; #endif + case 'C': configfile = optarg; break; + case 's': control_socket = optarg; break; default: fatal(0, "invalid option"); } } + if(config_read(0)) fatal(0, "cannot read configuration"); if(!maxbuffer) maxbuffer = 4 * readahead; argc -= optind; argv += optind; - if(argc < 1 || argc > 2) - fatal(0, "usage: disorder-playrtp [OPTIONS] ADDRESS [PORT]"); - sl.n = argc; - sl.s = argv; - /* Listen for inbound audio data */ + switch(argc) { + case 0: + /* Get configuration from server */ + if(!(c = disorder_new(1))) exit(EXIT_FAILURE); + if(disorder_connect(c)) exit(EXIT_FAILURE); + if(disorder_rtp_address(c, &address, &port)) exit(EXIT_FAILURE); + sl.n = 2; + sl.s = xcalloc(2, sizeof *sl.s); + sl.s[0] = address; + sl.s[1] = port; + break; + case 1: + case 2: + /* Use command-line ADDRESS+PORT or just PORT */ + sl.n = argc; + sl.s = argv; + break; + default: + fatal(0, "usage: disorder-playrtp [OPTIONS] [[ADDRESS] PORT]"); + } + /* Look up address and port */ if(!(res = get_address(&sl, &prefs, &sockname))) exit(1); + /* Create the socket */ if((rtpfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) fatal(errno, "error creating socket"); + /* Stash the multicast group address */ + if((is_multicast = multicast(res->ai_addr))) { + memcpy(&mgroup, res->ai_addr, res->ai_addrlen); + switch(res->ai_addr->sa_family) { + case AF_INET: + 
mgroup.in.sin_port = 0; + break; + case AF_INET6: + mgroup.in6.sin6_port = 0; + break; + } + } + /* Bind to 0/port */ + switch(res->ai_addr->sa_family) { + case AF_INET: + memset(&((struct sockaddr_in *)res->ai_addr)->sin_addr, 0, + sizeof (struct in_addr)); + break; + case AF_INET6: + memset(&((struct sockaddr_in6 *)res->ai_addr)->sin6_addr, 0, + sizeof (struct in6_addr)); + break; + default: + fatal(0, "unsupported family %d", (int)res->ai_addr->sa_family); + } if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0) fatal(errno, "error binding socket to %s", sockname); - if(multicast_group) { - if((n = getaddrinfo(multicast_group, 0, &prefs, &res))) - fatal(0, "getaddrinfo %s: %s", multicast_group, gai_strerror(n)); - switch(res->ai_family) { + if(is_multicast) { + switch(mgroup.sa.sa_family) { case PF_INET: - mreq.imr_multiaddr = ((struct sockaddr_in *)res->ai_addr)->sin_addr; + mreq.imr_multiaddr = mgroup.in.sin_addr; mreq.imr_interface.s_addr = 0; /* use primary interface */ if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof mreq) < 0) fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP"); break; case PF_INET6: - mreq6.ipv6mr_multiaddr = ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr; + mreq6.ipv6mr_multiaddr = mgroup.in6.sin6_addr; memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface); if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof mreq6) < 0) @@ -516,7 +650,10 @@ int main(int argc, char **argv) { default: fatal(0, "unsupported address family %d", res->ai_family); } - } + info("listening on %s multicast group %s", + format_sockaddr(res->ai_addr), format_sockaddr(&mgroup.sa)); + } else + info("listening on %s", format_sockaddr(res->ai_addr)); len = sizeof rcvbuf; if(getsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0) fatal(errno, "error calling getsockopt SO_RCVBUF"); @@ -533,6 +670,12 @@ int main(int argc, char **argv) { info("default socket receive buffer %d", rcvbuf); if(logfp) info("WARNING: -L 
option can impact performance"); + if(control_socket) { + pthread_t tid; + + if((err = pthread_create(&tid, 0, control_thread, 0))) + fatal(err, "pthread_create control_thread"); + } play_rtp(); return 0; }