X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=blobdiff_plain;f=src%2Fjournal-remote%2Fjournal-remote.c;h=1a2c1368a3f18b8308c8d6cbb8cfba09f9f877c8;hb=043945b93824e33e040954612aaa934cd1a43a1b;hp=ba811e0333749e2466889aa330daf163f16158ee;hpb=2eec67acbb00593e414549a7e5b35eb7dd776b1b;p=elogind.git diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c index ba811e033..1a2c1368a 100644 --- a/src/journal-remote/journal-remote.c +++ b/src/journal-remote/journal-remote.c @@ -203,7 +203,7 @@ static int open_output(Writer *w, const char* host) { log_error_errno(r, "Failed to open output journal %s: %m", output); else - log_info("Opened output file %s", w->journal->path); + log_debug("Opened output file %s", w->journal->path); return r; } @@ -285,6 +285,8 @@ static int dispatch_raw_source_event(sd_event_source *event, int fd, uint32_t revents, void *userdata); +static int dispatch_raw_source_until_block(sd_event_source *event, + void *userdata); static int dispatch_blocking_source_event(sd_event_source *event, void *userdata); static int dispatch_raw_connection_event(sd_event_source *event, @@ -372,8 +374,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) { r = sd_event_add_io(s->events, &source->event, fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, - dispatch_raw_source_event, s); - if (r == -EPERM) { + dispatch_raw_source_event, source); + if (r == 0) { + /* Add additional source for buffer processing. It will be + * enabled later. */ + r = sd_event_add_defer(s->events, &source->buffer_event, + dispatch_raw_source_until_block, source); + if (r == 0) + sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF); + } else if (r == -EPERM) { log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name); r = sd_event_add_defer(s->events, &source->event, dispatch_blocking_source_event, source); @@ -743,7 +752,7 @@ static int setup_microhttpd_socket(RemoteServer *s, const char *trust) { int fd; - fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC); + fd = make_socket_fd(LOG_DEBUG, address, SOCK_STREAM | SOCK_CLOEXEC); if (fd < 0) return fd; @@ -840,7 +849,7 @@ static int remoteserver_init(RemoteServer *s, if (n < 0) return log_error_errno(n, "Failed to read listening file descriptors from environment: %m"); else - log_info("Received %d descriptors", n); + log_debug("Received %d descriptors", n); if (MAX(http_socket, https_socket) >= SD_LISTEN_FDS_START + n) { log_error("Received fewer sockets than expected"); @@ -849,7 +858,7 @@ static int remoteserver_init(RemoteServer *s, for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { if (sd_is_socket(fd, AF_UNSPEC, 0, true)) { - log_info("Received a listening socket (fd:%d)", fd); + log_debug("Received a listening socket (fd:%d)", fd); if (fd == http_socket) r = setup_microhttpd_server(s, fd, NULL, NULL, NULL); @@ -864,7 +873,7 @@ static int remoteserver_init(RemoteServer *s, if (r < 0) return log_error_errno(r, "Failed to retrieve remote name: %m"); - log_info("Received a connection socket (fd:%d) from %s", fd, hostname); + log_debug("Received a connection socket (fd:%d) from %s", fd, hostname); r = add_source(s, fd, hostname, true); } else { @@ -904,7 +913,7 @@ static int remoteserver_init(RemoteServer *s, } if (arg_listen_raw) { - log_info("Listening on a socket..."); + log_debug("Listening on a socket..."); r = setup_raw_socket(s, arg_listen_raw); if (r < 0) return r; @@ -926,12 +935,12 @@ static int remoteserver_init(RemoteServer *s, const char *output_name; if (streq(*file, "-")) { - log_info("Using standard input as source."); + log_debug("Using standard input as source."); fd = STDIN_FILENO; output_name = "stdin"; } else { - log_info("Reading file %s...", *file); + log_debug("Reading file %s...", *file); fd = open(*file, O_RDONLY|O_CLOEXEC|O_NOCTTY|O_NONBLOCK); if (fd < 0) @@ -993,15 +1002,18 @@ static void server_destroy(RemoteServer *s) { ********************************************************************** **********************************************************************/ -static int dispatch_raw_source_event(sd_event_source *event, - int fd, - uint32_t revents, - void *userdata) { +static int handle_raw_source(sd_event_source *event, + int fd, + uint32_t revents, + RemoteServer *s) { - RemoteServer *s = userdata; RemoteSource *source; int r; + /* Returns 1 if there might be more data pending, + * 0 if data is currently exhausted, negative on error. + */ + assert(fd >= 0 && fd < (ssize_t) s->sources_size); source = s->sources[fd]; assert(source->fd == fd); @@ -1010,33 +1022,70 @@ static int dispatch_raw_source_event(sd_event_source *event, if (source->state == STATE_EOF) { size_t remaining; - log_info("EOF reached with source fd:%d (%s)", - source->fd, source->name); + log_debug("EOF reached with source fd:%d (%s)", + source->fd, source->name); remaining = source_non_empty(source); if (remaining > 0) - log_warning("Premature EOF. %zu bytes lost.", remaining); + log_notice("Premature EOF. %zu bytes lost.", remaining); remove_source(s, source->fd); - log_info("%zu active sources remaining", s->active); + log_debug("%zu active sources remaining", s->active); return 0; } else if (r == -E2BIG) { - log_error("Entry too big, skipped"); + log_notice_errno(E2BIG, "Entry too big, skipped"); return 1; - } else if (r == -EAGAIN) { + } else if (r == -EAGAIN || r == -EWOULDBLOCK) { return 0; } else if (r < 0) { - log_info_errno(r, "Closing connection: %m"); + log_debug_errno(r, "Closing connection: %m"); remove_source(server, fd); return 0; } else return 1; } +static int dispatch_raw_source_until_block(sd_event_source *event, + void *userdata) { + RemoteSource *source = userdata; + int r; + + /* Make sure event stays around even if source is destroyed */ + sd_event_source_ref(event); + + r = handle_raw_source(event, source->fd, EPOLLIN, server); + if (r != 1) + /* No more data for now */ + sd_event_source_set_enabled(event, SD_EVENT_OFF); + + sd_event_source_unref(event); + + return r; +} + +static int dispatch_raw_source_event(sd_event_source *event, + int fd, + uint32_t revents, + void *userdata) { + RemoteSource *source = userdata; + int r; + + assert(source->event); + assert(source->buffer_event); + + r = handle_raw_source(event, fd, EPOLLIN, server); + if (r == 1) + /* Might have more data. We need to rerun the handler + * until we are sure the buffer is exhausted. */ + sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON); + + return r; +} + static int dispatch_blocking_source_event(sd_event_source *event, void *userdata) { RemoteSource *source = userdata; - return dispatch_raw_source_event(event, source->fd, EPOLLIN, server); + return handle_raw_source(event, source->fd, EPOLLIN, server); } static int accept_connection(const char* type, int fd, @@ -1067,10 +1116,10 @@ static int accept_connection(const char* type, int fd, return r; } - log_info("Accepted %s %s connection from %s", - type, - socket_address_family(addr) == AF_INET ? "IP" : "IPv6", - a); + log_debug("Accepted %s %s connection from %s", + type, + socket_address_family(addr) == AF_INET ? "IP" : "IPv6", + a); *hostname = b;