X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?p=elogind.git;a=blobdiff_plain;f=src%2Fjournal-remote%2Fjournal-remote.c;h=1a2c1368a3f18b8308c8d6cbb8cfba09f9f877c8;hp=e6aa07243e488820990d99127f784d3bae37a47b;hb=043945b93824e33e040954612aaa934cd1a43a1b;hpb=f53f7c8fc43df4e38655f2a1f57777c5934fee06 diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c index e6aa07243..1a2c1368a 100644 --- a/src/journal-remote/journal-remote.c +++ b/src/journal-remote/journal-remote.c @@ -285,6 +285,8 @@ static int dispatch_raw_source_event(sd_event_source *event, int fd, uint32_t revents, void *userdata); +static int dispatch_raw_source_until_block(sd_event_source *event, + void *userdata); static int dispatch_blocking_source_event(sd_event_source *event, void *userdata); static int dispatch_raw_connection_event(sd_event_source *event, @@ -372,8 +374,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) { r = sd_event_add_io(s->events, &source->event, fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, - dispatch_raw_source_event, s); - if (r == -EPERM) { + dispatch_raw_source_event, source); + if (r == 0) { + /* Add additional source for buffer processing. It will be + * enabled later. */ + r = sd_event_add_defer(s->events, &source->buffer_event, + dispatch_raw_source_until_block, source); + if (r == 0) + sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF); + } else if (r == -EPERM) { log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name); r = sd_event_add_defer(s->events, &source->event, dispatch_blocking_source_event, source); @@ -993,15 +1002,18 @@ static void server_destroy(RemoteServer *s) { ********************************************************************** **********************************************************************/ -static int dispatch_raw_source_event(sd_event_source *event, - int fd, - uint32_t revents, - void *userdata) { +static int handle_raw_source(sd_event_source *event, + int fd, + uint32_t revents, + RemoteServer *s) { - RemoteServer *s = userdata; RemoteSource *source; int r; + /* Returns 1 if there might be more data pending, + * 0 if data is currently exhausted, negative on error. + */ + assert(fd >= 0 && fd < (ssize_t) s->sources_size); source = s->sources[fd]; assert(source->fd == fd); @@ -1032,11 +1044,48 @@ static int dispatch_raw_source_event(sd_event_source *event, return 1; } +static int dispatch_raw_source_until_block(sd_event_source *event, + void *userdata) { + RemoteSource *source = userdata; + int r; + + /* Make sure event stays around even if source is destroyed */ + sd_event_source_ref(event); + + r = handle_raw_source(event, source->fd, EPOLLIN, server); + if (r != 1) + /* No more data for now */ + sd_event_source_set_enabled(event, SD_EVENT_OFF); + + sd_event_source_unref(event); + + return r; +} + +static int dispatch_raw_source_event(sd_event_source *event, + int fd, + uint32_t revents, + void *userdata) { + RemoteSource *source = userdata; + int r; + + assert(source->event); + assert(source->buffer_event); + + r = handle_raw_source(event, fd, EPOLLIN, server); + if (r == 1) + /* Might have more data. We need to rerun the handler + * until we are sure the buffer is exhausted. */ + sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON); + + return r; +} + static int dispatch_blocking_source_event(sd_event_source *event, void *userdata) { RemoteSource *source = userdata; - return dispatch_raw_source_event(event, source->fd, EPOLLIN, server); + return handle_raw_source(event, source->fd, EPOLLIN, server); } static int accept_connection(const char* type, int fd,