8 #include "journal-upload.h"
11 * Write up to size bytes to buf. Return negative on error, and number of
12 * bytes written otherwise. The last case is a kind of an error too.
/*
 * write_entry: serialize the next piece of the current journal entry into
 * buf (at most size bytes), driven by the state stored in u->entry_state.
 *
 * States visible in this listing and what they emit:
 *   - cursor state:            __CURSOR=<cursor> line
 *   - ENTRY_REALTIME:          __REALTIME_TIMESTAMP=<usec> line
 *   - ENTRY_MONOTONIC:         __MONOTONIC_TIMESTAMP=<usec> line
 *   - ENTRY_BOOT_ID:           _BOOT_ID=<id> line
 *   - ENTRY_NEW_FIELD:         fetches the next data field of the entry
 *   - ENTRY_TEXT_FIELD / ENTRY_BINARY_FIELD: copies field bytes
 *   - ENTRY_BINARY_FIELD_START: field name followed by a newline
 *   - ENTRY_BINARY_FIELD_SIZE:  64-bit little-endian length of the value
 *
 * NOTE(review): many lines of this function are not visible here (the
 * embedded line numbers skip), including most branch conditions, the
 * state transitions and the return statements. Comments below describe
 * only what the visible lines do.
 */
14 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
/* The function returns ssize_t, so the requested size must fit in it. */
18 assert(size <= SSIZE_MAX);
22 switch(u->entry_state) {
/* Release the cursor string kept from earlier before fetching a new one. */
24 free(u->current_cursor);
25 u->current_cursor = NULL;
27 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
29 log_error("Failed to get cursor: %s", strerror(-r));
/* Format the cursor line into the space remaining after pos. */
33 r = snprintf(buf + pos, size - pos,
34 "__CURSOR=%s\n", u->current_cursor);
36 /* not enough space */
/* snprintf reported a length equal to the remaining space (see the
 * ENTRY_BOOT_ID case for how this situation is completed). */
41 if (pos + r == size) {
42 /* exactly one character short, but we don't need it */
50 case ENTRY_REALTIME: {
53 r = sd_journal_get_realtime_usec(u->journal, &realtime);
55 log_error("Failed to get realtime timestamp: %s", strerror(-r));
59 r = snprintf(buf + pos, size - pos,
60 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
62 /* not enough space */
67 if (r + pos == size) {
68 /* exactly one character short, but we don't need it */
76 case ENTRY_MONOTONIC: {
/* Both outputs are requested, but only monotonic is formatted below. */
80 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
82 log_error("Failed to get monotonic timestamp: %s", strerror(-r));
86 r = snprintf(buf + pos, size - pos,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
89 /* not enough space */
94 if (r + pos == size) {
95 /* exactly one character short, but we don't need it */
103 case ENTRY_BOOT_ID: {
/* Only the boot ID is requested; the timestamp output is passed as NULL. */
107 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
/* NOTE(review): this message says monotonic timestamp although this
 * state is fetching the boot ID - consider rewording it. */
109 log_error("Failed to get monotonic timestamp: %s", strerror(-r));
113 r = snprintf(buf + pos, size - pos,
114 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
116 /* not enough space */
/* When the formatted length equals the remaining space, snprintf has
 * used the last byte for its terminating NUL, cutting off the final
 * newline of the format. The terminator is not needed in this buffer,
 * so the last byte is overwritten with the newline. */
121 if (r + pos == size) {
122 /* exactly one character short, but we don't need it */
123 buf[size - 1] = '\n';
130 case ENTRY_NEW_FIELD: {
/* Fetch the next data field of the current entry. */
133 r = sd_journal_enumerate_data(u->journal,
137 log_error("Failed to move to next field in entry: %s",
/* Switches to the outro state; the guarding condition is not visible
 * here - presumably taken when no fields are left. TODO confirm. */
141 u->entry_state = ENTRY_OUTRO;
/* Fields that fail the printable-UTF-8 check are sent in binary form. */
145 if (!utf8_is_printable_newline(u->field_data,
146 u->field_length, false)) {
147 u->entry_state = ENTRY_BINARY_FIELD_START;
154 case ENTRY_TEXT_FIELD:
155 case ENTRY_BINARY_FIELD: {
/* done: the remaining space is strictly larger than the field bytes
 * still to copy, i.e. at least one byte is left over - presumably for
 * the newline stored after the data below. */
159 done = size - pos > u->field_length - u->field_pos;
/* Number of field bytes not yet copied. NOTE(review): the surrounding
 * condition is not visible in this listing. */
161 tocopy = u->field_length - u->field_pos;
/* Source pointer: field data advanced by the bytes already sent. */
166 (char*) u->field_data + u->field_pos,
170 buf[pos + tocopy] = '\n';
/* Continue with the next field. */
172 u->entry_state = ENTRY_NEW_FIELD;
/* Record how much of the field has been consumed. */
175 u->field_pos += tocopy;
180 case ENTRY_BINARY_FIELD_START: {
/* Locate the separator between field name and value. */
184 c = memchr(u->field_data, '=', u->field_length);
/* Reject data without a separator or with an empty name. */
185 if (!c || c == u->field_data) {
186 log_error("Invalid field.");
/* len is the length of the field name (bytes before the separator). */
190 len = c - (const char*)u->field_data;
192 /* need space for label + '\n' */
193 if (size - pos < len + 1)
196 memcpy(buf + pos, u->field_data, len);
197 buf[pos + len] = '\n';
/* Skip name and separator: field_pos now indexes the first value byte. */
200 u->field_pos = len + 1;
204 case ENTRY_BINARY_FIELD_SIZE: {
207 /* need space for uint64_t */
/* Emit the number of value bytes as a 64-bit little-endian integer. */
211 le64 = htole64(u->field_length - u->field_pos);
212 memcpy(buf + pos, &le64, 8);
220 /* need space for '\n' */
231 assert_not_reached("WTF?");
234 assert_not_reached("WTF?");
/*
 * journal_input_callback: data-source callback passed to start_upload()
 * by process_journal_input(). Fills buf, whose capacity is size * nmemb
 * bytes, by calling write_entry() repeatedly and moving to the next
 * journal entry whenever the current one reaches ENTRY_DONE.
 *
 * Returns CURL_READFUNC_ABORT on the visible failure paths. Given that
 * constant and the (buf, size, nmemb, userp) signature this is presumably
 * a libcurl read callback - TODO confirm against start_upload().
 *
 * NOTE(review): several lines (local declarations, branch conditions and
 * the final return) are not visible in this listing.
 */
237 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
/* Guarantees size * nmemb does not exceed SSIZE_MAX.
 * NOTE(review): this divides by size, so size is assumed non-zero. */
245 assert(nmemb <= SSIZE_MAX / size);
/* Keep filling while a journal handle exists and buf has room left. */
249 while (j && filled < size * nmemb) {
/* The previous entry is complete: advance to the next one. */
250 if (u->entry_state == ENTRY_DONE) {
251 r = sd_journal_next(j);
253 log_error("Failed to move to next entry in journal: %s",
255 return CURL_READFUNC_ABORT;
/* Two outcomes are logged when no entry is available; the condition
 * selecting between waiting and closing is not visible here. */
258 log_debug("No more entries, waiting for journal.");
260 log_info("No more entries, closing journal.");
261 close_journal_input(u);
264 u->uploading = false;
/* Restart serialization from the cursor state. */
269 u->entry_state = ENTRY_CURSOR;
/* Append to buf after the bytes already produced in this call. */
272 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
274 return CURL_READFUNC_ABORT;
278 log_error("Buffer space is too small to write entry.");
279 return CURL_READFUNC_ABORT;
280 } else if (u->entry_state != ENTRY_DONE)
281 /* This means that all available space was used up */
284 log_debug("Entry %zu (%s) has been uploaded.",
285 u->entries_sent, u->current_cursor);
/*
 * close_journal_input: log that journal input is being closed and close
 * the journal handle held in u->journal.
 *
 * NOTE(review): other lines of this function are not visible in this
 * listing, so any further cleanup it performs is not documented here.
 */
291 void close_journal_input(Uploader *u) {
295 log_debug("Closing journal input.");
297 sd_journal_close(u->journal);
/*
 * process_journal_input: advance the journal by skip entries, reset the
 * serialization state to ENTRY_CURSOR and start an upload that pulls its
 * data from journal_input_callback, with u as the callback argument.
 *
 * Returns the result of start_upload() on the visible path.
 * NOTE(review): the handling after a failed skip (beyond the log message)
 * is not visible in this listing.
 */
303 static int process_journal_input(Uploader *u, int skip) {
306 r = sd_journal_next_skip(u->journal, skip);
308 log_error("Failed to skip to next entry: %s", strerror(-r));
/* Serialization of an entry begins with its cursor. */
314 u->entry_state = ENTRY_CURSOR;
315 return start_upload(u, journal_input_callback, u);
/*
 * check_journal_input: look for new journal data and, on the visible
 * final path, resume uploading via process_journal_input(u, 1).
 *
 * When an input event source is registered (u->input_event), the journal
 * is first processed with sd_journal_process(); a failure there is logged
 * and the journal input is closed.
 *
 * NOTE(review): the return values of the error path and the statement
 * executed for SD_JOURNAL_NOP are not visible in this listing.
 */
318 int check_journal_input(Uploader *u) {
319 if (u->input_event) {
322 r = sd_journal_process(u->journal);
324 log_error("Failed to process journal: %s", strerror(-r));
325 close_journal_input(u);
/* SD_JOURNAL_NOP: presumably nothing changed, so no upload is started -
 * TODO confirm against the elided statement. */
329 if (r == SD_JOURNAL_NOP)
333 return process_journal_input(u, 1);
/*
 * dispatch_journal_input: I/O event handler registered through
 * sd_event_add_io() in open_journal_for_upload(). On the visible main
 * path it logs that journal input was detected and delegates to
 * check_journal_input().
 *
 * NOTE(review): the remaining parameters and the condition guarding the
 * warning below are not visible in this listing; judging by the message,
 * the event is ignored while an upload is in progress.
 */
336 static int dispatch_journal_input(sd_event_source *event,
345 log_warning("dispatch_journal_input called when uploading, ignoring.");
349 log_debug("Detected journal input, checking for new data.");
350 return check_journal_input(u);
/*
 * open_journal_for_upload: prepare journal j as the upload source for u.
 *
 * Visible steps: disable the data size threshold, obtain the journal file
 * descriptor and its poll events, register dispatch_journal_input as an
 * I/O event handler on u->events, seek to cursor, and start processing.
 *
 * Returns the result of process_journal_input() on the visible final
 * path.
 * NOTE(review): the parameter list after u and several conditions (for
 * example which branch logs that events are not listened for, and when
 * the seek is performed) are not visible in this listing.
 */
353 int open_journal_for_upload(Uploader *u,
/* A threshold of 0 turns the field size limit off, so field data is
 * returned in full (see sd_journal_set_data_threshold(3)). */
362 sd_journal_set_data_threshold(j, 0);
365 fd = sd_journal_get_fd(j);
367 log_error("sd_journal_get_fd failed: %s", strerror(-fd));
371 events = sd_journal_get_events(j);
373 r = sd_journal_reliable_fd(j);
/* Use the polling timeout; presumably chosen when the fd is not
 * reliable - the condition is not visible here, TODO confirm. */
378 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
/* Have the event loop call dispatch_journal_input on journal activity;
 * the created source is stored in u->input_event. */
380 r = sd_event_add_io(u->events, &u->input_event,
381 fd, events, dispatch_journal_input, u);
383 log_error("Failed to register input event: %s", strerror(-r));
/* A timeout of (uint64_t) -1 is printed as -1; other values are
 * narrowed to int for display. */
387 log_debug("Listening for journal events on fd:%d, timeout %d",
388 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
390 log_debug("Not listening for journal events.");
393 r = sd_journal_seek_cursor(j, cursor);
395 log_error("Failed to seek to cursor %s: %s",
396 cursor, strerror(-r));
/* Skip one entry, or two when after_cursor is set - presumably so that
 * the entry at the cursor itself is not sent again. */
401 return process_journal_input(u, 1 + !!after_cursor);