chiark / gitweb /
c198329cb0196677e1768fad0b1e90f3e6962660
[elogind.git] / src / shared / barrier.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 David Herrmann <dh.herrmann@gmail.com>
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <poll.h>
26 #include <stdbool.h>
27 #include <stdint.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <sys/eventfd.h>
32 #include <sys/types.h>
33 #include <unistd.h>
34
35 #include "barrier.h"
36 #include "macro.h"
37 #include "util.h"
38
39 /**
40  * Barriers
41  * This barrier implementation provides a simple synchronization method based
42  * on file-descriptors that can safely be used between threads and processes. A
43  * barrier object contains 2 shared counters based on eventfd. Both processes
44  * can now place barriers and wait for the other end to reach a random or
45  * specific barrier.
46  * Barriers are numbered, so you can either wait for the other end to reach any
47  * barrier or the last barrier that you placed. This way, you can use barriers
48  * for one-way *and* full synchronization. Note that even-though barriers are
49  * numbered, these numbers are internal and recycled once both sides reached the
50  * same barrier (implemented as a simple signed counter). It is thus not
51  * possible to address barriers by their ID.
52  *
53  * Barrier-API: Both ends can place as many barriers via barrier_place() as
54  * they want and each pair of barriers on both sides will be implicitly linked.
55  * Each side can use the barrier_wait/sync_*() family of calls to wait for the
56  * other side to place a specific barrier. barrier_wait_next() waits until the
57  * other side calls barrier_place(). No links between the barriers are
58  * considered and this simply serves as most basic asynchronous barrier.
59  * barrier_sync_next() is like barrier_wait_next() and waits for the other side
60  * to place their next barrier via barrier_place(). However, it only waits for
61  * barriers that are linked to a barrier we already placed. If the other side
62  * already placed more barriers than we did, barrier_sync_next() returns
63  * immediately.
64  * barrier_sync() extends barrier_sync_next() and waits until the other end
65  * placed as many barriers via barrier_place() as we did. If they already placed
66  * as many as we did (or more), it returns immediately.
67  *
68  * Additionally to basic barriers, an abortion event is available.
69  * barrier_abort() places an abortion event that cannot be undone. An abortion
70  * immediately cancels all placed barriers and replaces them. Any running and
71  * following wait/sync call besides barrier_wait_abortion() will immediately
72  * return false on both sides (otherwise, they always return true).
73  * barrier_abort() can be called multiple times on both ends and will be a
74  * no-op if already called on this side.
75  * barrier_wait_abortion() can be used to wait for the other side to call
76  * barrier_abort() and is the only wait/sync call that does not return
77  * immediately if we aborted outself. It only returns once the other side
78  * called barrier_abort().
79  *
80  * Barriers can be used for in-process and inter-process synchronization.
81  * However, for in-process synchronization you could just use mutexes.
82  * Therefore, main target is IPC and we require both sides to *not* share the FD
83  * table. If that's given, barriers provide target tracking: If the remote side
84  * exit()s, an abortion event is implicitly queued on the other side. This way,
85  * a sync/wait call will be woken up if the remote side crashed or exited
86  * unexpectedly. However, note that these abortion events are only queued if the
87  * barrier-queue has been drained. Therefore, it is safe to place a barrier and
88  * exit. The other side can safely wait on the barrier even though the exit
89  * queued an abortion event. Usually, the abortion event would overwrite the
90  * barrier, however, that's not true for exit-abortion events. Those are only
91  * queued if the barrier-queue is drained (thus, the receiving side has placed
92  * more barriers than the remote side).
93  */
94
95 /**
96  * barrier_init() - Initialize a barrier object
97  * @obj: barrier to initialize
98  *
99  * This initializes a barrier object. The caller is responsible of allocating
100  * the memory and keeping it valid. The memory does not have to be zeroed
101  * beforehand.
102  * Two eventfd objects are allocated for each barrier. If allocation fails, an
103  * error is returned.
104  *
105  * If this function fails, the barrier is reset to an invalid state so it is
106  * safe to call barrier_destroy() on the object regardless whether the
107  * initialization succeeded or not.
108  *
109  * The caller is responsible to destroy the object via barrier_destroy() before
110  * releasing the underlying memory.
111  *
112  * Returns: 0 on success, negative error code on failure.
113  */
114 int barrier_init(Barrier *obj) {
115         _cleanup_(barrier_destroy) Barrier b = { };
116         int r;
117
118         assert_return(obj, -EINVAL);
119
120         b.me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
121         if (b.me < 0)
122                 return -errno;
123
124         b.them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
125         if (b.them < 0)
126                 return -errno;
127
128         r = pipe2(b.pipe, O_CLOEXEC | O_NONBLOCK);
129         if (r < 0)
130                 return -errno;
131
132         memcpy(obj, &b, sizeof(b));
133         zero(b);
134         return 0;
135 }
136
137 /**
138  * barrier_destroy() - Destroy a barrier object
139  * @b: barrier to destroy or NULL
140  *
141  * This destroys a barrier object that has previously been initialized via
142  * barrier_init(). The object is released and reset to invalid state.
143  * Therefore, it is safe to call barrier_destroy() multiple times or even if
144  * barrier_init() failed. However, you must not call barrier_destroy() if you
145  * never called barrier_init() on the object before.
146  *
147  * It is safe to initialize a barrier via zero() / memset(.., 0, ...). Even
148  * though it has embedded FDs, barrier_destroy() can deal with zeroed objects
149  * just fine.
150  *
151  * If @b is NULL, this is a no-op.
152  */
153 void barrier_destroy(Barrier *b) {
154         if (!b)
155                 return;
156
157         /* @me and @them cannot be both FD 0. Lets be pedantic and check the
158          * pipes and barriers, too. If all are 0, the object was zero()ed and
159          * is invalid. This allows users to use zero(barrier) to reset the
160          * backing memory. */
161         if (b->me == 0 &&
162             b->them == 0 &&
163             b->pipe[0] == 0 &&
164             b->pipe[1] == 0 &&
165             b->barriers == 0)
166                 return;
167
168         b->me = safe_close(b->me);
169         b->them = safe_close(b->them);
170         b->pipe[0] = safe_close(b->pipe[0]);
171         b->pipe[1] = safe_close(b->pipe[1]);
172         b->barriers = 0;
173 }
174
175 /**
176  * barrier_set_role() - Set the local role of the barrier
177  * @b: barrier to operate on
178  * @role: role to set on the barrier
179  *
180  * This sets the roles on a barrier object. This is needed to know which
181  * side of the barrier you're on. Usually, the parent creates the barrier via
182  * barrier_init() and then calls fork() or clone(). Therefore, the FDs are
183  * duplicated and the child retains the same barrier object.
184  *
185  * Both sides need to call barrier_set_role() after fork() or clone() are done.
186  * If this is not done, barriers will not work correctly.
187  *
188  * Note that barriers could be supported without fork() or clone(). However,
189  * this is currently not needed so it hasn't been implemented.
190  */
191 void barrier_set_role(Barrier *b, unsigned int role) {
192         int fd;
193
194         assert(b);
195         assert(role == BARRIER_PARENT || role == BARRIER_CHILD);
196         /* make sure this is only called once */
197         assert(b->pipe[1] >= 0 && b->pipe[1] >= 0);
198
199         if (role == BARRIER_PARENT) {
200                 b->pipe[1] = safe_close(b->pipe[1]);
201         } else {
202                 b->pipe[0] = safe_close(b->pipe[0]);
203
204                 /* swap me/them for children */
205                 fd = b->me;
206                 b->me = b->them;
207                 b->them = fd;
208         }
209 }
210
211 /* places barrier; returns false if we aborted, otherwise true */
212 static bool barrier_write(Barrier *b, uint64_t buf) {
213         ssize_t len;
214
215         /* prevent new sync-points if we already aborted */
216         if (barrier_i_aborted(b))
217                 return false;
218
219         do {
220                 len = write(b->me, &buf, sizeof(buf));
221         } while (len < 0 && (errno == EAGAIN || errno == EINTR));
222
223         if (len != sizeof(buf))
224                 goto error;
225
226         /* lock if we aborted */
227         if (buf >= (uint64_t)BARRIER_ABORTION) {
228                 if (barrier_they_aborted(b))
229                         b->barriers = BARRIER_WE_ABORTED;
230                 else
231                         b->barriers = BARRIER_I_ABORTED;
232         } else if (!barrier_is_aborted(b)) {
233                 b->barriers += buf;
234         }
235
236         return !barrier_i_aborted(b);
237
238 error:
239         /* If there is an unexpected error, we have to make this fatal. There
240          * is no way we can recover from sync-errors. Therefore, we close the
241          * pipe-ends and treat this as abortion. The other end will notice the
242          * pipe-close and treat it as abortion, too. */
243
244         b->pipe[0] = safe_close(b->pipe[0]);
245         b->pipe[1] = safe_close(b->pipe[1]);
246         b->barriers = BARRIER_WE_ABORTED;
247         return false;
248 }
249
250 /* waits for barriers; returns false if they aborted, otherwise true */
251 static bool barrier_read(Barrier *b, int64_t comp) {
252         uint64_t buf;
253         ssize_t len;
254         struct pollfd pfd[2] = { };
255         int r;
256
257         if (barrier_they_aborted(b))
258                 return false;
259
260         while (b->barriers > comp) {
261                 pfd[0].fd = (b->pipe[0] >= 0) ? b->pipe[0] : b->pipe[1];
262                 pfd[0].events = POLLHUP;
263                 pfd[0].revents = 0;
264                 pfd[1].fd = b->them;
265                 pfd[1].events = POLLIN;
266                 pfd[1].revents = 0;
267
268                 r = poll(pfd, 2, -1);
269                 if (r < 0 && (errno == EAGAIN || errno == EINTR))
270                         continue;
271                 else if (r < 0)
272                         goto error;
273
274                 if (pfd[1].revents) {
275                         /* events on @them signal us new data */
276                         len = read(b->them, &buf, sizeof(buf));
277                         if (len < 0 && (errno == EAGAIN || errno == EINTR))
278                                 continue;
279
280                         if (len != sizeof(buf))
281                                 goto error;
282                 } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) {
283                         /* POLLHUP on the pipe tells us the other side exited.
284                          * We treat this as implicit abortion. But we only
285                          * handle it if there's no event on the eventfd. This
286                          * guarantees that exit-abortions do not overwrite real
287                          * barriers. */
288                         buf = BARRIER_ABORTION;
289                 }
290
291                 /* lock if they aborted */
292                 if (buf >= (uint64_t)BARRIER_ABORTION) {
293                         if (barrier_i_aborted(b))
294                                 b->barriers = BARRIER_WE_ABORTED;
295                         else
296                                 b->barriers = BARRIER_THEY_ABORTED;
297                 } else if (!barrier_is_aborted(b)) {
298                         b->barriers -= buf;
299                 }
300         }
301
302         return !barrier_they_aborted(b);
303
304 error:
305         /* If there is an unexpected error, we have to make this fatal. There
306          * is no way we can recover from sync-errors. Therefore, we close the
307          * pipe-ends and treat this as abortion. The other end will notice the
308          * pipe-close and treat it as abortion, too. */
309
310         b->pipe[0] = safe_close(b->pipe[0]);
311         b->pipe[1] = safe_close(b->pipe[1]);
312         b->barriers = BARRIER_WE_ABORTED;
313         return false;
314 }
315
316 /**
317  * barrier_place() - Place a new barrier
318  * @b: barrier object
319  *
320  * This places a new barrier on the barrier object. If either side already
321  * aborted, this is a no-op and returns "false". Otherwise, the barrier is
322  * placed and this returns "true".
323  *
324  * Returns: true if barrier was placed, false if either side aborted.
325  */
326 bool barrier_place(Barrier *b) {
327         assert(b);
328
329         if (barrier_is_aborted(b))
330                 return false;
331
332         barrier_write(b, BARRIER_SINGLE);
333         return true;
334 }
335
336 /**
337  * barrier_abort() - Abort the synchronization
338  * @b: barrier object to abort
339  *
340  * This aborts the barrier-synchronization. If barrier_abort() was already
341  * called on this side, this is a no-op. Otherwise, the barrier is put into the
342  * ABORT-state and will stay there. The other side is notified about the
343  * abortion. Any following attempt to place normal barriers or to wait on normal
344  * barriers will return immediately as "false".
345  *
346  * You can wait for the other side to call barrier_abort(), too. Use
347  * barrier_wait_abortion() for that.
348  *
349  * Returns: false if the other side already aborted, true otherwise.
350  */
351 bool barrier_abort(Barrier *b) {
352         assert(b);
353
354         barrier_write(b, BARRIER_ABORTION);
355         return !barrier_they_aborted(b);
356 }
357
358 /**
359  * barrier_wait_next() - Wait for the next barrier of the other side
360  * @b: barrier to operate on
361  *
362  * This waits until the other side places its next barrier. This is independent
363  * of any barrier-links and just waits for any next barrier of the other side.
364  *
365  * If either side aborted, this returns false.
366  *
367  * Returns: false if either side aborted, true otherwise.
368  */
369 bool barrier_wait_next(Barrier *b) {
370         assert(b);
371
372         if (barrier_is_aborted(b))
373                 return false;
374
375         barrier_read(b, b->barriers - 1);
376         return !barrier_is_aborted(b);
377 }
378
379 /**
380  * barrier_wait_abortion() - Wait for the other side to abort
381  * @b: barrier to operate on
382  *
383  * This waits until the other side called barrier_abort(). This can be called
384  * regardless whether the local side already called barrier_abort() or not.
385  *
386  * If the other side has already aborted, this returns immediately.
387  *
388  * Returns: false if the local side aborted, true otherwise.
389  */
390 bool barrier_wait_abortion(Barrier *b) {
391         assert(b);
392
393         barrier_read(b, BARRIER_THEY_ABORTED);
394         return !barrier_i_aborted(b);
395 }
396
397 /**
398  * barrier_sync_next() - Wait for the other side to place a next linked barrier
399  * @b: barrier to operate on
400  *
401  * This is like barrier_wait_next() and waits for the other side to call
402  * barrier_place(). However, this only waits for linked barriers. That means, if
403  * the other side already placed more barriers than (or as much as) we did, this
404  * returns immediately instead of waiting.
405  *
406  * If either side aborted, this returns false.
407  *
408  * Returns: false if either side aborted, true otherwise.
409  */
410 bool barrier_sync_next(Barrier *b) {
411         assert(b);
412
413         if (barrier_is_aborted(b))
414                 return false;
415
416         barrier_read(b, MAX((int64_t)0, b->barriers - 1));
417         return !barrier_is_aborted(b);
418 }
419
420 /**
421  * barrier_sync() - Wait for the other side to place as many barriers as we did
422  * @b: barrier to operate on
423  *
424  * This is like barrier_sync_next() but waits for the other side to call
425  * barrier_place() as often as we did (in total). If they already placed as much
426  * as we did (or more), this returns immediately instead of waiting.
427  *
428  * If either side aborted, this returns false.
429  *
430  * Returns: false if either side aborted, true otherwise.
431  */
432 bool barrier_sync(Barrier *b) {
433         assert(b);
434
435         if (barrier_is_aborted(b))
436                 return false;
437
438         barrier_read(b, 0);
439         return !barrier_is_aborted(b);
440 }