chiark / gitweb /
290cfb9f5648c0a52314dff4588f94e29ddbd3dc
[elogind.git] / src / journal-remote / journal-upload-journal.c
1 #include <stdbool.h>
2
3 #include <curl/curl.h>
4
5 #include "util.h"
6 #include "log.h"
7 #include "utf8.h"
8 #include "journal-upload.h"
9
10 /**
11  * Write up to size bytes to buf. Return negative on error, and number of
12  * bytes written otherwise. The last case is a kind of an error too.
13  */
14 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
15         int r;
16         size_t pos = 0;
17
18         assert(size <= SSIZE_MAX);
19
20         while (true) {
21
22                 switch(u->entry_state) {
23                 case ENTRY_CURSOR: {
24                         free(u->current_cursor);
25                         u->current_cursor = NULL;
26
27                         r = sd_journal_get_cursor(u->journal, &u->current_cursor);
28                         if (r < 0) {
29                                 log_error_errno(-r, "Failed to get cursor: %m");
30                                 return r;
31                         }
32
33                         r = snprintf(buf + pos, size - pos,
34                                      "__CURSOR=%s\n", u->current_cursor);
35                         if (pos + r > size)
36                                 /* not enough space */
37                                 return pos;
38
39                         u->entry_state ++;
40
41                         if (pos + r == size) {
42                                 /* exactly one character short, but we don't need it */
43                                 buf[size - 1] = '\n';
44                                 return size;
45                         }
46
47                         pos += r;
48                 }       /* fall through */
49
50                 case ENTRY_REALTIME: {
51                         usec_t realtime;
52
53                         r = sd_journal_get_realtime_usec(u->journal, &realtime);
54                         if (r < 0) {
55                                 log_error_errno(-r, "Failed to get realtime timestamp: %m");
56                                 return r;
57                         }
58
59                         r = snprintf(buf + pos, size - pos,
60                                      "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
61                         if (r + pos > size)
62                                 /* not enough space */
63                                 return pos;
64
65                         u->entry_state ++;
66
67                         if (r + pos == size) {
68                                 /* exactly one character short, but we don't need it */
69                                 buf[size - 1] = '\n';
70                                 return size;
71                         }
72
73                         pos += r;
74                 }       /* fall through */
75
76                 case ENTRY_MONOTONIC: {
77                         usec_t monotonic;
78                         sd_id128_t boot_id;
79
80                         r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
81                         if (r < 0) {
82                                 log_error_errno(-r, "Failed to get monotonic timestamp: %m");
83                                 return r;
84                         }
85
86                         r = snprintf(buf + pos, size - pos,
87                                      "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88                         if (r + pos > size)
89                                 /* not enough space */
90                                 return pos;
91
92                         u->entry_state ++;
93
94                         if (r + pos == size) {
95                                 /* exactly one character short, but we don't need it */
96                                 buf[size - 1] = '\n';
97                                 return size;
98                         }
99
100                         pos += r;
101                 }       /* fall through */
102
103                 case ENTRY_BOOT_ID: {
104                         sd_id128_t boot_id;
105                         char sid[33];
106
107                         r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
108                         if (r < 0) {
109                                 log_error_errno(-r, "Failed to get monotonic timestamp: %m");
110                                 return r;
111                         }
112
113                         r = snprintf(buf + pos, size - pos,
114                                      "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
115                         if (r + pos> size)
116                                 /* not enough space */
117                                 return pos;
118
119                         u->entry_state ++;
120
121                         if (r + pos == size) {
122                                 /* exactly one character short, but we don't need it */
123                                 buf[size - 1] = '\n';
124                                 return size;
125                         }
126
127                         pos += r;
128                 }       /* fall through */
129
130                 case ENTRY_NEW_FIELD: {
131                         u->field_pos = 0;
132
133                         r = sd_journal_enumerate_data(u->journal,
134                                                       &u->field_data,
135                                                       &u->field_length);
136                         if (r < 0) {
137                                 log_error("Failed to move to next field in entry: %s",
138                                           strerror(-r));
139                                 return r;
140                         } else if (r == 0) {
141                                 u->entry_state = ENTRY_OUTRO;
142                                 continue;
143                         }
144
145                         if (!utf8_is_printable_newline(u->field_data,
146                                                        u->field_length, false)) {
147                                 u->entry_state = ENTRY_BINARY_FIELD_START;
148                                 continue;
149                         }
150
151                         u->entry_state ++;
152                 }       /* fall through */
153
154                 case ENTRY_TEXT_FIELD:
155                 case ENTRY_BINARY_FIELD: {
156                         bool done;
157                         size_t tocopy;
158
159                         done = size - pos > u->field_length - u->field_pos;
160                         if (done)
161                                 tocopy = u->field_length - u->field_pos;
162                         else
163                                 tocopy = size - pos;
164
165                         memcpy(buf + pos,
166                                (char*) u->field_data + u->field_pos,
167                                tocopy);
168
169                         if (done) {
170                                 buf[pos + tocopy] = '\n';
171                                 pos += tocopy + 1;
172                                 u->entry_state = ENTRY_NEW_FIELD;
173                                 continue;
174                         } else {
175                                 u->field_pos += tocopy;
176                                 return size;
177                         }
178                 }
179
180                 case ENTRY_BINARY_FIELD_START: {
181                         const char *c;
182                         size_t len;
183
184                         c = memchr(u->field_data, '=', u->field_length);
185                         if (!c || c == u->field_data) {
186                                 log_error("Invalid field.");
187                                 return -EINVAL;
188                         }
189
190                         len = c - (const char*)u->field_data;
191
192                         /* need space for label + '\n' */
193                         if (size - pos < len + 1)
194                                 return pos;
195
196                         memcpy(buf + pos, u->field_data, len);
197                         buf[pos + len] = '\n';
198                         pos += len + 1;
199
200                         u->field_pos = len + 1;
201                         u->entry_state ++;
202                 }       /* fall through */
203
204                 case ENTRY_BINARY_FIELD_SIZE: {
205                         uint64_t le64;
206
207                         /* need space for uint64_t */
208                         if (size - pos < 8)
209                                 return pos;
210
211                         le64 = htole64(u->field_length - u->field_pos);
212                         memcpy(buf + pos, &le64, 8);
213                         pos += 8;
214
215                         u->entry_state ++;
216                         continue;
217                 }
218
219                 case ENTRY_OUTRO:
220                         /* need space for '\n' */
221                         if (size - pos < 1)
222                                 return pos;
223
224                         buf[pos++] = '\n';
225                         u->entry_state ++;
226                         u->entries_sent ++;
227
228                         return pos;
229
230                 default:
231                         assert_not_reached("WTF?");
232                 }
233         }
234         assert_not_reached("WTF?");
235 }
236
237 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
238         Uploader *u = userp;
239         int r;
240         sd_journal *j;
241         size_t filled = 0;
242         ssize_t w;
243
244         assert(u);
245         assert(nmemb <= SSIZE_MAX / size);
246
247         j = u->journal;
248
249         while (j && filled < size * nmemb) {
250                 if (u->entry_state == ENTRY_DONE) {
251                         r = sd_journal_next(j);
252                         if (r < 0) {
253                                 log_error("Failed to move to next entry in journal: %s",
254                                           strerror(-r));
255                                 return CURL_READFUNC_ABORT;
256                         } else if (r == 0) {
257                                 if (u->input_event)
258                                         log_debug("No more entries, waiting for journal.");
259                                 else {
260                                         log_info("No more entries, closing journal.");
261                                         close_journal_input(u);
262                                 }
263
264                                 u->uploading = false;
265
266                                 break;
267                         }
268
269                         u->entry_state = ENTRY_CURSOR;
270                 }
271
272                 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
273                 if (w < 0)
274                         return CURL_READFUNC_ABORT;
275                 filled += w;
276
277                 if (filled == 0) {
278                         log_error("Buffer space is too small to write entry.");
279                         return CURL_READFUNC_ABORT;
280                 } else if (u->entry_state != ENTRY_DONE)
281                         /* This means that all available space was used up */
282                         break;
283
284                 log_debug("Entry %zu (%s) has been uploaded.",
285                           u->entries_sent, u->current_cursor);
286         }
287
288         return filled;
289 }
290
291 void close_journal_input(Uploader *u) {
292         assert(u);
293
294         if (u->journal) {
295                 log_debug("Closing journal input.");
296
297                 sd_journal_close(u->journal);
298                 u->journal = NULL;
299         }
300         u->timeout = 0;
301 }
302
303 static int process_journal_input(Uploader *u, int skip) {
304         int r;
305
306         r = sd_journal_next_skip(u->journal, skip);
307         if (r < 0) {
308                 log_error_errno(-r, "Failed to skip to next entry: %m");
309                 return r;
310         } else if (r < skip)
311                 return 0;
312
313         /* have data */
314         u->entry_state = ENTRY_CURSOR;
315         return start_upload(u, journal_input_callback, u);
316 }
317
318 int check_journal_input(Uploader *u) {
319         if (u->input_event) {
320                 int r;
321
322                 r = sd_journal_process(u->journal);
323                 if (r < 0) {
324                         log_error_errno(-r, "Failed to process journal: %m");
325                         close_journal_input(u);
326                         return r;
327                 }
328
329                 if (r == SD_JOURNAL_NOP)
330                         return 0;
331         }
332
333         return process_journal_input(u, 1);
334 }
335
336 static int dispatch_journal_input(sd_event_source *event,
337                                   int fd,
338                                   uint32_t revents,
339                                   void *userp) {
340         Uploader *u = userp;
341
342         assert(u);
343
344         if (u->uploading) {
345                 log_warning("dispatch_journal_input called when uploading, ignoring.");
346                 return 0;
347         }
348
349         log_debug("Detected journal input, checking for new data.");
350         return check_journal_input(u);
351 }
352
353 int open_journal_for_upload(Uploader *u,
354                             sd_journal *j,
355                             const char *cursor,
356                             bool after_cursor,
357                             bool follow) {
358         int fd, r, events;
359
360         u->journal = j;
361
362         sd_journal_set_data_threshold(j, 0);
363
364         if (follow) {
365                 fd = sd_journal_get_fd(j);
366                 if (fd < 0) {
367                         log_error_errno(-fd, "sd_journal_get_fd failed: %m");
368                         return fd;
369                 }
370
371                 events = sd_journal_get_events(j);
372
373                 r = sd_journal_reliable_fd(j);
374                 assert(r >= 0);
375                 if (r > 0)
376                         u->timeout = -1;
377                 else
378                         u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
379
380                 r = sd_event_add_io(u->events, &u->input_event,
381                                     fd, events, dispatch_journal_input, u);
382                 if (r < 0) {
383                         log_error_errno(-r, "Failed to register input event: %m");
384                         return r;
385                 }
386
387                 log_debug("Listening for journal events on fd:%d, timeout %d",
388                           fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
389         } else
390                 log_debug("Not listening for journal events.");
391
392         if (cursor) {
393                 r = sd_journal_seek_cursor(j, cursor);
394                 if (r < 0) {
395                         log_error("Failed to seek to cursor %s: %s",
396                                   cursor, strerror(-r));
397                         return r;
398                 }
399         }
400
401         return process_journal_input(u, 1 + !!after_cursor);
402 }