chiark / gitweb /
treewide: more log_*_errno() conversions, multiline calls
[elogind.git] / src / journal-remote / journal-upload-journal.c
1 #include <stdbool.h>
2
3 #include <curl/curl.h>
4
5 #include "util.h"
6 #include "log.h"
7 #include "utf8.h"
8 #include "journal-upload.h"
9
10 /**
11  * Write up to size bytes to buf. Return negative on error, and number of
12  * bytes written otherwise. The last case is a kind of an error too.
13  */
14 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
15         int r;
16         size_t pos = 0;
17
18         assert(size <= SSIZE_MAX);
19
20         while (true) {
21
22                 switch(u->entry_state) {
23                 case ENTRY_CURSOR: {
24                         free(u->current_cursor);
25                         u->current_cursor = NULL;
26
27                         r = sd_journal_get_cursor(u->journal, &u->current_cursor);
28                         if (r < 0) {
29                                 log_error_errno(r, "Failed to get cursor: %m");
30                                 return r;
31                         }
32
33                         r = snprintf(buf + pos, size - pos,
34                                      "__CURSOR=%s\n", u->current_cursor);
35                         if (pos + r > size)
36                                 /* not enough space */
37                                 return pos;
38
39                         u->entry_state ++;
40
41                         if (pos + r == size) {
42                                 /* exactly one character short, but we don't need it */
43                                 buf[size - 1] = '\n';
44                                 return size;
45                         }
46
47                         pos += r;
48                 }       /* fall through */
49
50                 case ENTRY_REALTIME: {
51                         usec_t realtime;
52
53                         r = sd_journal_get_realtime_usec(u->journal, &realtime);
54                         if (r < 0) {
55                                 log_error_errno(r, "Failed to get realtime timestamp: %m");
56                                 return r;
57                         }
58
59                         r = snprintf(buf + pos, size - pos,
60                                      "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
61                         if (r + pos > size)
62                                 /* not enough space */
63                                 return pos;
64
65                         u->entry_state ++;
66
67                         if (r + pos == size) {
68                                 /* exactly one character short, but we don't need it */
69                                 buf[size - 1] = '\n';
70                                 return size;
71                         }
72
73                         pos += r;
74                 }       /* fall through */
75
76                 case ENTRY_MONOTONIC: {
77                         usec_t monotonic;
78                         sd_id128_t boot_id;
79
80                         r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
81                         if (r < 0) {
82                                 log_error_errno(r, "Failed to get monotonic timestamp: %m");
83                                 return r;
84                         }
85
86                         r = snprintf(buf + pos, size - pos,
87                                      "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88                         if (r + pos > size)
89                                 /* not enough space */
90                                 return pos;
91
92                         u->entry_state ++;
93
94                         if (r + pos == size) {
95                                 /* exactly one character short, but we don't need it */
96                                 buf[size - 1] = '\n';
97                                 return size;
98                         }
99
100                         pos += r;
101                 }       /* fall through */
102
103                 case ENTRY_BOOT_ID: {
104                         sd_id128_t boot_id;
105                         char sid[33];
106
107                         r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
108                         if (r < 0) {
109                                 log_error_errno(r, "Failed to get monotonic timestamp: %m");
110                                 return r;
111                         }
112
113                         r = snprintf(buf + pos, size - pos,
114                                      "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
115                         if (r + pos> size)
116                                 /* not enough space */
117                                 return pos;
118
119                         u->entry_state ++;
120
121                         if (r + pos == size) {
122                                 /* exactly one character short, but we don't need it */
123                                 buf[size - 1] = '\n';
124                                 return size;
125                         }
126
127                         pos += r;
128                 }       /* fall through */
129
130                 case ENTRY_NEW_FIELD: {
131                         u->field_pos = 0;
132
133                         r = sd_journal_enumerate_data(u->journal,
134                                                       &u->field_data,
135                                                       &u->field_length);
136                         if (r < 0) {
137                                 log_error_errno(r, "Failed to move to next field in entry: %m");
138                                 return r;
139                         } else if (r == 0) {
140                                 u->entry_state = ENTRY_OUTRO;
141                                 continue;
142                         }
143
144                         if (!utf8_is_printable_newline(u->field_data,
145                                                        u->field_length, false)) {
146                                 u->entry_state = ENTRY_BINARY_FIELD_START;
147                                 continue;
148                         }
149
150                         u->entry_state ++;
151                 }       /* fall through */
152
153                 case ENTRY_TEXT_FIELD:
154                 case ENTRY_BINARY_FIELD: {
155                         bool done;
156                         size_t tocopy;
157
158                         done = size - pos > u->field_length - u->field_pos;
159                         if (done)
160                                 tocopy = u->field_length - u->field_pos;
161                         else
162                                 tocopy = size - pos;
163
164                         memcpy(buf + pos,
165                                (char*) u->field_data + u->field_pos,
166                                tocopy);
167
168                         if (done) {
169                                 buf[pos + tocopy] = '\n';
170                                 pos += tocopy + 1;
171                                 u->entry_state = ENTRY_NEW_FIELD;
172                                 continue;
173                         } else {
174                                 u->field_pos += tocopy;
175                                 return size;
176                         }
177                 }
178
179                 case ENTRY_BINARY_FIELD_START: {
180                         const char *c;
181                         size_t len;
182
183                         c = memchr(u->field_data, '=', u->field_length);
184                         if (!c || c == u->field_data) {
185                                 log_error("Invalid field.");
186                                 return -EINVAL;
187                         }
188
189                         len = c - (const char*)u->field_data;
190
191                         /* need space for label + '\n' */
192                         if (size - pos < len + 1)
193                                 return pos;
194
195                         memcpy(buf + pos, u->field_data, len);
196                         buf[pos + len] = '\n';
197                         pos += len + 1;
198
199                         u->field_pos = len + 1;
200                         u->entry_state ++;
201                 }       /* fall through */
202
203                 case ENTRY_BINARY_FIELD_SIZE: {
204                         uint64_t le64;
205
206                         /* need space for uint64_t */
207                         if (size - pos < 8)
208                                 return pos;
209
210                         le64 = htole64(u->field_length - u->field_pos);
211                         memcpy(buf + pos, &le64, 8);
212                         pos += 8;
213
214                         u->entry_state ++;
215                         continue;
216                 }
217
218                 case ENTRY_OUTRO:
219                         /* need space for '\n' */
220                         if (size - pos < 1)
221                                 return pos;
222
223                         buf[pos++] = '\n';
224                         u->entry_state ++;
225                         u->entries_sent ++;
226
227                         return pos;
228
229                 default:
230                         assert_not_reached("WTF?");
231                 }
232         }
233         assert_not_reached("WTF?");
234 }
235
236 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
237         Uploader *u = userp;
238         int r;
239         sd_journal *j;
240         size_t filled = 0;
241         ssize_t w;
242
243         assert(u);
244         assert(nmemb <= SSIZE_MAX / size);
245
246         j = u->journal;
247
248         while (j && filled < size * nmemb) {
249                 if (u->entry_state == ENTRY_DONE) {
250                         r = sd_journal_next(j);
251                         if (r < 0) {
252                                 log_error_errno(r, "Failed to move to next entry in journal: %m");
253                                 return CURL_READFUNC_ABORT;
254                         } else if (r == 0) {
255                                 if (u->input_event)
256                                         log_debug("No more entries, waiting for journal.");
257                                 else {
258                                         log_info("No more entries, closing journal.");
259                                         close_journal_input(u);
260                                 }
261
262                                 u->uploading = false;
263
264                                 break;
265                         }
266
267                         u->entry_state = ENTRY_CURSOR;
268                 }
269
270                 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
271                 if (w < 0)
272                         return CURL_READFUNC_ABORT;
273                 filled += w;
274
275                 if (filled == 0) {
276                         log_error("Buffer space is too small to write entry.");
277                         return CURL_READFUNC_ABORT;
278                 } else if (u->entry_state != ENTRY_DONE)
279                         /* This means that all available space was used up */
280                         break;
281
282                 log_debug("Entry %zu (%s) has been uploaded.",
283                           u->entries_sent, u->current_cursor);
284         }
285
286         return filled;
287 }
288
289 void close_journal_input(Uploader *u) {
290         assert(u);
291
292         if (u->journal) {
293                 log_debug("Closing journal input.");
294
295                 sd_journal_close(u->journal);
296                 u->journal = NULL;
297         }
298         u->timeout = 0;
299 }
300
301 static int process_journal_input(Uploader *u, int skip) {
302         int r;
303
304         r = sd_journal_next_skip(u->journal, skip);
305         if (r < 0) {
306                 log_error_errno(r, "Failed to skip to next entry: %m");
307                 return r;
308         } else if (r < skip)
309                 return 0;
310
311         /* have data */
312         u->entry_state = ENTRY_CURSOR;
313         return start_upload(u, journal_input_callback, u);
314 }
315
316 int check_journal_input(Uploader *u) {
317         if (u->input_event) {
318                 int r;
319
320                 r = sd_journal_process(u->journal);
321                 if (r < 0) {
322                         log_error_errno(r, "Failed to process journal: %m");
323                         close_journal_input(u);
324                         return r;
325                 }
326
327                 if (r == SD_JOURNAL_NOP)
328                         return 0;
329         }
330
331         return process_journal_input(u, 1);
332 }
333
334 static int dispatch_journal_input(sd_event_source *event,
335                                   int fd,
336                                   uint32_t revents,
337                                   void *userp) {
338         Uploader *u = userp;
339
340         assert(u);
341
342         if (u->uploading) {
343                 log_warning("dispatch_journal_input called when uploading, ignoring.");
344                 return 0;
345         }
346
347         log_debug("Detected journal input, checking for new data.");
348         return check_journal_input(u);
349 }
350
351 int open_journal_for_upload(Uploader *u,
352                             sd_journal *j,
353                             const char *cursor,
354                             bool after_cursor,
355                             bool follow) {
356         int fd, r, events;
357
358         u->journal = j;
359
360         sd_journal_set_data_threshold(j, 0);
361
362         if (follow) {
363                 fd = sd_journal_get_fd(j);
364                 if (fd < 0) {
365                         log_error_errno(fd, "sd_journal_get_fd failed: %m");
366                         return fd;
367                 }
368
369                 events = sd_journal_get_events(j);
370
371                 r = sd_journal_reliable_fd(j);
372                 assert(r >= 0);
373                 if (r > 0)
374                         u->timeout = -1;
375                 else
376                         u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
377
378                 r = sd_event_add_io(u->events, &u->input_event,
379                                     fd, events, dispatch_journal_input, u);
380                 if (r < 0) {
381                         log_error_errno(r, "Failed to register input event: %m");
382                         return r;
383                 }
384
385                 log_debug("Listening for journal events on fd:%d, timeout %d",
386                           fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
387         } else
388                 log_debug("Not listening for journal events.");
389
390         if (cursor) {
391                 r = sd_journal_seek_cursor(j, cursor);
392                 if (r < 0) {
393                         log_error_errno(r, "Failed to seek to cursor %s: %m",
394                                         cursor);
395                         return r;
396                 }
397         }
398
399         return process_journal_input(u, 1 + !!after_cursor);
400 }