X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/disorder/blobdiff_plain/346ba8d53eb0c9cead1d3818eedc1b004ea938af..416609cff1292890e48a89a0d90bcf421e735a32:/lib/eclient.c diff --git a/lib/eclient.c b/lib/eclient.c index 519e603..012db98 100644 --- a/lib/eclient.c +++ b/lib/eclient.c @@ -1,6 +1,6 @@ /* * This file is part of DisOrder. - * Copyright (C) 2006 Richard Kettlewell + * Copyright (C) 2006, 2007 Richard Kettlewell * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -17,6 +17,9 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA */ +/** @file lib/eclient.c + * @brief Client code for event-driven programs + */ #include #include "types.h" @@ -34,6 +37,7 @@ #include #include #include +#include #include "log.h" #include "mem.h" @@ -55,18 +59,23 @@ /* TODO: more commands */ +/** @brief How often to send data to the server when receiving logs */ +#define LOG_PROD_INTERVAL 10 + /* Types *********************************************************************/ +/** @brief Client state */ enum client_state { - state_disconnected, /* not connected */ - state_connecting, /* waiting for connect() */ - state_connected, /* connected but not authenticated */ - state_idle, /* not doing anything */ - state_cmdresponse, /* waiting for command resonse */ - state_body, /* accumulating body */ - state_log, /* monitoring log */ + state_disconnected, /**< @brief not connected */ + state_connecting, /**< @brief waiting for connect() */ + state_connected, /**< @brief connected but not authenticated */ + state_idle, /**< @brief not doing anything */ + state_cmdresponse, /**< @brief waiting for command resonse */ + state_body, /**< @brief accumulating body */ + state_log, /**< @brief monitoring log */ }; +/** @brief Names for @ref client_state */ static const char *const states[] = { "disconnected", "connecting", @@ -79,42 +88,64 @@ static const char *const states[] = { struct operation; /* forward decl */ +/** @brief Type of an operation callback */ typedef void operation_callback(disorder_eclient *c, struct operation *op); -/* A pending operation. This can be either a command or part of the - * authentication protocol. In the former case new commands are appended to - * the list, in the latter case they are inserted at the front. */ +/** @brief A pending operation. + * + * This can be either a command or part of the authentication protocol. In the + * former case new commands are appended to the list, in the latter case they + * are inserted at the front. */ struct operation { - struct operation *next; /* next operation */ - char *cmd; /* command to send or 0 */ - operation_callback *opcallback; /* internal completion callback */ - void (*completed)(); /* user completion callback or 0 */ - void *v; /* data for COMPLETED */ - disorder_eclient *client; /* owning client */ - int sent; /* true if sent to server */ + struct operation *next; /**< @brief next operation */ + char *cmd; /**< @brief command to send or 0 */ + operation_callback *opcallback; /**< @brief internal completion callback */ + void (*completed)(); /**< @brief user completion callback or 0 */ + void *v; /**< @brief data for COMPLETED */ + disorder_eclient *client; /**< @brief owning client */ + + /** @brief true if sent to server + * + * This is cleared by disorder_eclient_close(), forcing all queued + * commands to be transparently resent. + */ + int sent; }; +/** @brief Client structure */ struct disorder_eclient { const char *ident; - int fd; - enum client_state state; /* current state */ - int authenticated; /* true when authenicated */ - struct dynstr output; /* output buffer */ - struct dynstr input; /* input buffer */ - int eof; /* input buffer is at EOF */ - /* error reporting callbacks */ - const disorder_eclient_callbacks *callbacks; - void *u; - /* operation queuue */ - struct operation *ops, **opstail; + int fd; /**< @brief connection to server */ + enum client_state state; /**< @brief current state */ + int authenticated; /**< @brief true when authenicated */ + struct dynstr output; /**< @brief output buffer */ + struct dynstr input; /**< @brief input buffer */ + int eof; /**< @brief input buffer is at EOF */ + const disorder_eclient_callbacks *callbacks; /**< @brief error callbacks */ + void *u; /**< @brief user data */ + struct operation *ops; /**< @brief queue of operations */ + struct operation **opstail; /**< @brief queue tail */ /* accumulated response */ - int rc; /* response code */ - char *line; /* complete line */ - struct vector vec; /* body */ - /* log client callback */ + int rc; /**< @brief response code */ + char *line; /**< @brief complete line */ + struct vector vec; /**< @brief body */ + const disorder_eclient_log_callbacks *log_callbacks; - void *log_v; - unsigned long statebits; /* current state */ + /**< @brief log callbacks + * + * Once disorder_eclient_log() has been issued this is always set. When we + * re-connect it is checked to re-issue the log command. + */ + void *log_v; /**< @brief user data */ + unsigned long statebits; /**< @brief latest state */ + + time_t last_prod; + /**< @brief last time we sent a prod + * + * When we are receiving log data we send a "prod" byte to the server from + * time to time so that we detect broken connections reasonably quickly. The + * server just ignores these bytes. + */ }; /* Forward declarations ******************************************************/ @@ -157,16 +188,22 @@ static void logentry_removed(disorder_eclient *c, int nvec, char **vec); static void logentry_scratched(disorder_eclient *c, int nvec, char **vec); static void logentry_state(disorder_eclient *c, int nvec, char **vec); static void logentry_volume(disorder_eclient *c, int nvec, char **vec); +static void logentry_rescanned(disorder_eclient *c, int nvec, char **vec); /* Tables ********************************************************************/ -static const struct logentry_handler { - const char *name; - int min, max; +/** @brief One possible log entry */ +struct logentry_handler { + const char *name; /**< @brief Entry name */ + int min; /**< @brief Minimum arguments */ + int max; /**< @brief Maximum arguments */ void (*handler)(disorder_eclient *c, int nvec, - char **vec); -} logentry_handlers[] = { + char **vec); /**< @brief Handler function */ +}; + +/** @brief Table for parsing log entries */ +static const struct logentry_handler logentry_handlers[] = { #define LE(X, MIN, MAX) { #X, MIN, MAX, logentry_##X } LE(completed, 1, 1), LE(failed, 2, 2), @@ -176,6 +213,7 @@ static const struct logentry_handler { LE(recent_added, 2, INT_MAX), LE(recent_removed, 1, 1), LE(removed, 1, 2), + LE(rescanned, 0, 0), LE(scratched, 2, 2), LE(state, 1, 1), LE(volume, 2, 2) @@ -183,6 +221,10 @@ static const struct logentry_handler { /* Setup and teardown ********************************************************/ +/** @brief Create a new client + * + * Does NOT connect the client - connections are made (and re-made) on demand. + */ disorder_eclient *disorder_eclient_new(const disorder_eclient_callbacks *cb, void *u) { disorder_eclient *c = xmalloc(sizeof *c); @@ -197,6 +239,13 @@ disorder_eclient *disorder_eclient_new(const disorder_eclient_callbacks *cb, return c; } +/** @brief Disconnect a client + * @param c Client to disconnect + * + * NB that this routine just disconnnects the TCP connection. It does not + * destroy the client! If you continue to use it then it will attempt to + * reconnect. + */ void disorder_eclient_close(disorder_eclient *c) { struct operation *op; @@ -207,6 +256,7 @@ void disorder_eclient_close(disorder_eclient *c) { xclose(c->fd); c->fd = -1; c->state = state_disconnected; + c->statebits = 0; } c->output.nvec = 0; c->input.nvec = 0; @@ -215,11 +265,23 @@ void disorder_eclient_close(disorder_eclient *c) { /* We'll need to resend all operations */ for(op = c->ops; op; op = op->next) op->sent = 0; + /* Drop our use a hint that we're disconnected */ + if(c->log_callbacks && c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits); +} + +/** @brief Return current state */ +unsigned long disorder_eclient_state(const disorder_eclient *c) { + return c->statebits | (c->state > state_connected ? DISORDER_CONNECTED : 0); } /* Error reporting ***********************************************************/ -/* called when a connection error occurs */ +/** @brief called when a connection error occurs + * + * After this called we will be disconnected (by disorder_eclient_close()), + * so there will be a reconnection before any commands can be sent. + */ static int comms_error(disorder_eclient *c, const char *fmt, ...) { va_list ap; char *s; @@ -233,7 +295,7 @@ static int comms_error(disorder_eclient *c, const char *fmt, ...) { return -1; } -/* called when the server reports an error */ +/** @brief called when the server reports an error */ static int protocol_error(disorder_eclient *c, struct operation *op, int code, const char *fmt, ...) { va_list ap; @@ -249,9 +311,19 @@ static int protocol_error(disorder_eclient *c, struct operation *op, /* State machine *************************************************************/ +/** @brief Called when there's something to do + * @param c Client + * @param mode bitmap of @ref DISORDER_POLL_READ and/or @ref DISORDER_POLL_WRITE. + * + * This should be called from by your code when the file descriptor is readable + * or writable (as requested by the @c poll callback, see @ref + * disorder_eclient_callbacks) and in any case from time to time (with @p mode + * = 0) to allow for retries to work. + */ void disorder_eclient_polled(disorder_eclient *c, unsigned mode) { struct operation *op; - + time_t now; + D(("disorder_eclient_polled fd=%d state=%s mode=[%s %s]", c->fd, states[c->state], mode & DISORDER_POLL_READ ? "READ" : "", @@ -264,6 +336,11 @@ void disorder_eclient_polled(disorder_eclient *c, unsigned mode) { if(c->state == state_disconnected) { D(("state_disconnected")); + /* If there is no password yet then we cannot connect */ + if(!config->password) { + comms_error(c, "no password is configured"); + return; + } with_sockaddr(c, start_connect); /* might now be state_disconnected (on error), state_connecting (slow * connect) or state_connected (fast connect). If state_disconnected then @@ -319,6 +396,14 @@ void disorder_eclient_polled(disorder_eclient *c, unsigned mode) { c->callbacks->report(c->u, 0); } + /* Queue up a byte to send */ + if(c->state == state_log + && c->output.nvec == 0 + && time(&now) - c->last_prod > LOG_PROD_INTERVAL) { + put(c, "x", 1); + c->last_prod = now; + } + if(c->state == state_cmdresponse || c->state == state_body || c->state == state_log) { @@ -381,7 +466,7 @@ void disorder_eclient_polled(disorder_eclient *c, unsigned mode) { if(c->fd != -1) c->callbacks->poll(c->u, c, c->fd, mode); } -/* Called to start connecting */ +/** @brief Called to start connecting */ static int start_connect(void *cc, const struct sockaddr *sa, socklen_t len, @@ -398,6 +483,7 @@ static int start_connect(void *cc, return comms_error(c, "socket: %s", strerror(errno)); c->eof = 0; nonblock(c->fd); + cloexec(c->fd); if(connect(c->fd, sa, len) < 0) { switch(errno) { case EINTR: @@ -415,7 +501,7 @@ static int start_connect(void *cc, return 0; } -/* Called when maybe connected */ +/** @brief Called when poll triggers while waiting for a connection */ static void maybe_connected(disorder_eclient *c) { /* We either connected, or got an error. */ int err; @@ -430,8 +516,14 @@ static void maybe_connected(disorder_eclient *c) { comms_error(c, "connecting to %s: %s", c->ident, strerror(err)); /* sets state_disconnected */ } else { + char *r; + /* The connection succeeded */ c->state = state_connected; + byte_xasprintf(&r, "connected to %s", c->ident); + c->callbacks->report(c->u, r); + /* If this is a log client we expect to get a bunch of updates from the + * server straight away */ } } @@ -442,16 +534,38 @@ static void authbanner_opcallback(disorder_eclient *c, size_t nonce_len; const unsigned char *nonce; const char *res; + char **rvec; + int nrvec; + const char *protocol, *algorithm, *challenge; D(("authbanner_opcallback")); - if(c->rc / 100 != 2) { - /* Banner told us to go away. We cannot proceed. */ + if(c->rc / 100 != 2 + || !(rvec = split(c->line + 4, &nrvec, SPLIT_QUOTES, 0, 0)) + || nrvec < 1) { + /* Banner told us to go away, or was malformed. We cannot proceed. */ + protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); + disorder_eclient_close(c); + return; + } + if(nrvec != 3) { protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); disorder_eclient_close(c); + } + protocol = *rvec++; + algorithm = *rvec++; + challenge = *rvec++; + if(strcmp(protocol, "2")) { + protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); + disorder_eclient_close(c); + } + nonce = unhex(challenge, &nonce_len); + res = authhash(nonce, nonce_len, config->password, algorithm); + if(!res) { + protocol_error(c, op, c->rc, "%s: unknown authentication algorithm '%s'", + c->ident, algorithm); + disorder_eclient_close(c); return; } - nonce = unhex(c->line + 4, &nonce_len); - res = authhash(nonce, nonce_len, config->password); stash_command(c, 1/*queuejump*/, authuser_opcallback, 0/*completed*/, 0/*v*/, "user", quoteutf8(config->username), quoteutf8(res), (char *)0); @@ -459,6 +573,8 @@ static void authbanner_opcallback(disorder_eclient *c, static void authuser_opcallback(disorder_eclient *c, struct operation *op) { + char *r; + D(("authuser_opcallback")); if(c->rc / 100 != 2) { /* Wrong password or something. We cannot proceed. */ @@ -468,6 +584,8 @@ static void authuser_opcallback(disorder_eclient *c, } /* OK, we're authenticated now. */ c->authenticated = 1; + byte_xasprintf(&r, "authenticated with %s", c->ident); + c->callbacks->report(c->u, r); if(c->log_callbacks && !(c->ops && c->ops->opcallback == log_opcallback)) /* We are a log client, switch to logging mode */ stash_command(c, 0/*queuejump*/, log_opcallback, 0/*completed*/, c->log_v, @@ -569,7 +687,7 @@ static void process_line(disorder_eclient *c, char *line) { case 3: /* We need to collect the body. */ c->state = state_body; - c->vec.nvec = 0; + vector_init(&c->vec); break; case 4: assert(c->log_callbacks != 0); @@ -708,13 +826,19 @@ static void stash_command(disorder_eclient *c, /* Command support ***********************************************************/ -/* for commands with a simple string response */ +/* for commands with a quoted string response */ static void string_response_opcallback(disorder_eclient *c, struct operation *op) { D(("string_response_callback")); if(c->rc / 100 == 2) { - if(op->completed) - ((disorder_eclient_string_response *)op->completed)(op->v, c->line + 4); + if(op->completed) { + char **rr = split(c->line + 4, 0, SPLIT_QUOTES, 0, 0); + + if(rr && *rr) + ((disorder_eclient_string_response *)op->completed)(op->v, *rr); + else + protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); + } } else protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); } @@ -1069,14 +1193,84 @@ int disorder_eclient_search(disorder_eclient *c, "search", terms, (char *)0); } +int disorder_eclient_nop(disorder_eclient *c, + disorder_eclient_no_response *completed, + void *v) { + return simple(c, no_response_opcallback, (void (*)())completed, v, + "nop", (char *)0); +} + +/** @brief Get the last @p max added tracks + * @param c Client + * @param completed Called with list + * @param max Number of tracks to get, 0 for all + * @param v Passed to @p completed + * + * The first track in the list is the most recently added. + */ +int disorder_eclient_new_tracks(disorder_eclient *c, + disorder_eclient_list_response *completed, + int max, + void *v) { + char limit[32]; + + sprintf(limit, "%d", max); + return simple(c, list_response_opcallback, (void (*)())completed, v, + "new", limit, (char *)0); +} + +static void rtp_response_opcallback(disorder_eclient *c, + struct operation *op) { + D(("rtp_response_opcallback")); + if(c->rc / 100 == 2) { + if(op->completed) { + int nvec; + char **vec = split(c->line + 4, &nvec, SPLIT_QUOTES, 0, 0); + + ((disorder_eclient_list_response *)op->completed)(op->v, nvec, vec); + } + } else + protocol_error(c, op, c->rc, "%s: %s", c->ident, c->line); +} + +/** @brief Determine the RTP target address + * @param c Client + * @param completed Called with address details + * @param v Passed to @p completed + * + * The address details will be two elements, the first being the hostname and + * the second the service (port). + */ +int disorder_eclient_rtp_address(disorder_eclient *c, + disorder_eclient_list_response *completed, + void *v) { + return simple(c, rtp_response_opcallback, (void (*)())completed, v, + "rtp-address", (char *)0); +} + /* Log clients ***************************************************************/ +/** @brief Monitor the server log + * @param c Client + * @param callbacks Functions to call when anything happens + * @param v Passed to @p callbacks functions + * + * Once a client is being used for logging it cannot be used for anything else. + * There is magic in authuser_opcallback() to re-submit the @c log command + * after reconnection. + * + * NB that the @c state callback may be called from within this function, + * i.e. not solely later on from the event loop callback. + */ int disorder_eclient_log(disorder_eclient *c, const disorder_eclient_log_callbacks *callbacks, void *v) { if(c->log_callbacks) return -1; c->log_callbacks = callbacks; c->log_v = v; + /* Repoort initial state */ + if(c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits); stash_command(c, 0/*queuejump*/, log_opcallback, 0/*completed*/, v, "log", (char *)0); return 0; @@ -1102,7 +1296,7 @@ static void logline(disorder_eclient *c, const char *line) { char **vec; uintmax_t when; - D(("log_opcallback [%s]", line)); + D(("logline [%s]", line)); vec = split(line, &nvec, SPLIT_QUOTES, logline_error, c); if(nvec < 2) return; /* probably an error, already * reported */ @@ -1124,13 +1318,19 @@ static void logline(disorder_eclient *c, const char *line) { static void logentry_completed(disorder_eclient *c, int attribute((unused)) nvec, char **vec) { if(!c->log_callbacks->completed) return; + c->statebits &= ~DISORDER_PLAYING; c->log_callbacks->completed(c->log_v, vec[0]); + if(c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits | DISORDER_CONNECTED); } static void logentry_failed(disorder_eclient *c, int attribute((unused)) nvec, char **vec) { if(!c->log_callbacks->failed)return; + c->statebits &= ~DISORDER_PLAYING; c->log_callbacks->failed(c->log_v, vec[0], vec[1]); + if(c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits | DISORDER_CONNECTED); } static void logentry_moved(disorder_eclient *c, @@ -1142,7 +1342,10 @@ static void logentry_moved(disorder_eclient *c, static void logentry_playing(disorder_eclient *c, int attribute((unused)) nvec, char **vec) { if(!c->log_callbacks->playing) return; + c->statebits |= DISORDER_PLAYING; c->log_callbacks->playing(c->log_v, vec[0], vec[1]); + if(c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits | DISORDER_CONNECTED); } static void logentry_queue(disorder_eclient *c, @@ -1179,10 +1382,20 @@ static void logentry_removed(disorder_eclient *c, c->log_callbacks->removed(c->log_v, vec[0], vec[1]); } +static void logentry_rescanned(disorder_eclient *c, + int attribute((unused)) nvec, + char attribute((unused)) **vec) { + if(!c->log_callbacks->rescanned) return; + c->log_callbacks->rescanned(c->log_v); +} + static void logentry_scratched(disorder_eclient *c, int attribute((unused)) nvec, char **vec) { if(!c->log_callbacks->scratched) return; + c->statebits &= ~DISORDER_PLAYING; c->log_callbacks->scratched(c->log_v, vec[0], vec[1]); + if(c->log_callbacks->state) + c->log_callbacks->state(c->log_v, c->statebits | DISORDER_CONNECTED); } static const struct { @@ -1193,23 +1406,26 @@ static const struct { { DISORDER_PLAYING_ENABLED, "enable_play", "disable_play" }, { DISORDER_RANDOM_ENABLED, "enable_random", "disable_random" }, { DISORDER_TRACK_PAUSED, "pause", "resume" }, + { DISORDER_PLAYING, "playing", "completed" }, + { DISORDER_PLAYING, 0, "scratched" }, + { DISORDER_PLAYING, 0, "failed" }, }; -#define NSTATES (int)(sizeof states / sizeof *states) +#define NSTATES (int)(sizeof statestrings / sizeof *statestrings) static void logentry_state(disorder_eclient *c, int attribute((unused)) nvec, char **vec) { int n; for(n = 0; n < NSTATES; ++n) - if(!strcmp(vec[0], statestrings[n].enable)) { + if(statestrings[n].enable && !strcmp(vec[0], statestrings[n].enable)) { c->statebits |= statestrings[n].bit; break; - } else if(!strcmp(vec[0], statestrings[n].disable)) { + } else if(statestrings[n].disable && !strcmp(vec[0], statestrings[n].disable)) { c->statebits &= ~statestrings[n].bit; break; } if(!c->log_callbacks->state) return; - c->log_callbacks->state(c->log_v, c->statebits); + c->log_callbacks->state(c->log_v, c->statebits | DISORDER_CONNECTED); } static void logentry_volume(disorder_eclient *c, @@ -1225,6 +1441,45 @@ static void logentry_volume(disorder_eclient *c, c->log_callbacks->volume(c->log_v, (int)l, (int)r); } +/** @brief Convert @p statebits to a string */ +char *disorder_eclient_interpret_state(unsigned long statebits) { + struct dynstr d[1]; + size_t n; + + static const struct { + unsigned long bit; + const char *name; + } bits[] = { + { DISORDER_PLAYING_ENABLED, "playing_enabled" }, + { DISORDER_RANDOM_ENABLED, "random_enabled" }, + { DISORDER_TRACK_PAUSED, "track_paused" }, + { DISORDER_PLAYING, "playing" }, + { DISORDER_CONNECTED, "connected" }, + }; +#define NBITS (sizeof bits / sizeof *bits) + + dynstr_init(d); + if(!statebits) + dynstr_append(d, '0'); + for(n = 0; n < NBITS; ++n) + if(statebits & bits[n].bit) { + if(d->nvec) + dynstr_append(d, '|'); + dynstr_append_string(d, bits[n].name); + statebits ^= bits[n].bit; + } + if(statebits) { + char s[20]; + + if(d->nvec) + dynstr_append(d, '|'); + sprintf(s, "%#lx", statebits); + dynstr_append_string(d, s); + } + dynstr_terminate(d); + return d->vec; +} + /* Local Variables: c-basic-offset:2