chiark / gitweb /
treewide: Correct typos and spell plural of bus consistent
[elogind.git] / src / shared / barrier.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 David Herrmann <dh.herrmann@gmail.com>
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <poll.h>
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <sys/eventfd.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31
32 #include "barrier.h"
33 #include "macro.h"
34 #include "util.h"
35
36 /**
37  * Barriers
38  * This barrier implementation provides a simple synchronization method based
39  * on file-descriptors that can safely be used between threads and processes. A
40  * barrier object contains 2 shared counters based on eventfd. Both processes
41  * can now place barriers and wait for the other end to reach a random or
42  * specific barrier.
43  * Barriers are numbered, so you can either wait for the other end to reach any
44  * barrier or the last barrier that you placed. This way, you can use barriers
45  * for one-way *and* full synchronization. Note that even-though barriers are
46  * numbered, these numbers are internal and recycled once both sides reached the
47  * same barrier (implemented as a simple signed counter). It is thus not
48  * possible to address barriers by their ID.
49  *
50  * Barrier-API: Both ends can place as many barriers via barrier_place() as
51  * they want and each pair of barriers on both sides will be implicitly linked.
52  * Each side can use the barrier_wait/sync_*() family of calls to wait for the
53  * other side to place a specific barrier. barrier_wait_next() waits until the
54  * other side calls barrier_place(). No links between the barriers are
55  * considered and this simply serves as most basic asynchronous barrier.
56  * barrier_sync_next() is like barrier_wait_next() and waits for the other side
57  * to place their next barrier via barrier_place(). However, it only waits for
58  * barriers that are linked to a barrier we already placed. If the other side
59  * already placed more barriers than we did, barrier_sync_next() returns
60  * immediately.
61  * barrier_sync() extends barrier_sync_next() and waits until the other end
62  * placed as many barriers via barrier_place() as we did. If they already placed
63  * as many as we did (or more), it returns immediately.
64  *
65  * Additionally to basic barriers, an abortion event is available.
66  * barrier_abort() places an abortion event that cannot be undone. An abortion
67  * immediately cancels all placed barriers and replaces them. Any running and
68  * following wait/sync call besides barrier_wait_abortion() will immediately
69  * return false on both sides (otherwise, they always return true).
70  * barrier_abort() can be called multiple times on both ends and will be a
71  * no-op if already called on this side.
72  * barrier_wait_abortion() can be used to wait for the other side to call
73  * barrier_abort() and is the only wait/sync call that does not return
74  * immediately if we aborted outself. It only returns once the other side
75  * called barrier_abort().
76  *
77  * Barriers can be used for in-process and inter-process synchronization.
78  * However, for in-process synchronization you could just use mutexes.
79  * Therefore, main target is IPC and we require both sides to *not* share the FD
80  * table. If that's given, barriers provide target tracking: If the remote side
81  * exit()s, an abortion event is implicitly queued on the other side. This way,
82  * a sync/wait call will be woken up if the remote side crashed or exited
83  * unexpectedly. However, note that these abortion events are only queued if the
84  * barrier-queue has been drained. Therefore, it is safe to place a barrier and
85  * exit. The other side can safely wait on the barrier even though the exit
86  * queued an abortion event. Usually, the abortion event would overwrite the
87  * barrier, however, that's not true for exit-abortion events. Those are only
88  * queued if the barrier-queue is drained (thus, the receiving side has placed
89  * more barriers than the remote side).
90  */
91
92 /**
93  * barrier_create() - Initialize a barrier object
94  * @obj: barrier to initialize
95  *
96  * This initializes a barrier object. The caller is responsible of allocating
97  * the memory and keeping it valid. The memory does not have to be zeroed
98  * beforehand.
99  * Two eventfd objects are allocated for each barrier. If allocation fails, an
100  * error is returned.
101  *
102  * If this function fails, the barrier is reset to an invalid state so it is
103  * safe to call barrier_destroy() on the object regardless whether the
104  * initialization succeeded or not.
105  *
106  * The caller is responsible to destroy the object via barrier_destroy() before
107  * releasing the underlying memory.
108  *
109  * Returns: 0 on success, negative error code on failure.
110  */
111 int barrier_create(Barrier *b) {
112         _cleanup_(barrier_destroyp) Barrier *staging = b;
113         int r;
114
115         assert(b);
116
117         b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
118         if (b->me < 0)
119                 return -errno;
120
121         b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
122         if (b->them < 0)
123                 return -errno;
124
125         r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
126         if (r < 0)
127                 return -errno;
128
129         staging = NULL;
130         return 0;
131 }
132
133 /**
134  * barrier_destroy() - Destroy a barrier object
135  * @b: barrier to destroy or NULL
136  *
137  * This destroys a barrier object that has previously been passed to
138  * barrier_create(). The object is released and reset to invalid
139  * state. Therefore, it is safe to call barrier_destroy() multiple
140  * times or even if barrier_create() failed. However, barrier must be
141  * always initialized with BARRIER_NULL.
142  *
143  * If @b is NULL, this is a no-op.
144  */
145 void barrier_destroy(Barrier *b) {
146         if (!b)
147                 return;
148
149         b->me = safe_close(b->me);
150         b->them = safe_close(b->them);
151         safe_close_pair(b->pipe);
152         b->barriers = 0;
153 }
154
155 /**
156  * barrier_set_role() - Set the local role of the barrier
157  * @b: barrier to operate on
158  * @role: role to set on the barrier
159  *
160  * This sets the roles on a barrier object. This is needed to know
161  * which side of the barrier you're on. Usually, the parent creates
162  * the barrier via barrier_create() and then calls fork() or clone().
163  * Therefore, the FDs are duplicated and the child retains the same
164  * barrier object.
165  *
166  * Both sides need to call barrier_set_role() after fork() or clone()
167  * are done. If this is not done, barriers will not work correctly.
168  *
169  * Note that barriers could be supported without fork() or clone(). However,
170  * this is currently not needed so it hasn't been implemented.
171  */
172 void barrier_set_role(Barrier *b, unsigned int role) {
173         int fd;
174
175         assert(b);
176         assert(role == BARRIER_PARENT || role == BARRIER_CHILD);
177         /* make sure this is only called once */
178         assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
179
180         if (role == BARRIER_PARENT)
181                 b->pipe[1] = safe_close(b->pipe[1]);
182         else {
183                 b->pipe[0] = safe_close(b->pipe[0]);
184
185                 /* swap me/them for children */
186                 fd = b->me;
187                 b->me = b->them;
188                 b->them = fd;
189         }
190 }
191
192 /* places barrier; returns false if we aborted, otherwise true */
193 static bool barrier_write(Barrier *b, uint64_t buf) {
194         ssize_t len;
195
196         /* prevent new sync-points if we already aborted */
197         if (barrier_i_aborted(b))
198                 return false;
199
200         do {
201                 len = write(b->me, &buf, sizeof(buf));
202         } while (len < 0 && IN_SET(errno, EAGAIN, EINTR));
203
204         if (len != sizeof(buf))
205                 goto error;
206
207         /* lock if we aborted */
208         if (buf >= (uint64_t)BARRIER_ABORTION) {
209                 if (barrier_they_aborted(b))
210                         b->barriers = BARRIER_WE_ABORTED;
211                 else
212                         b->barriers = BARRIER_I_ABORTED;
213         } else if (!barrier_is_aborted(b))
214                 b->barriers += buf;
215
216         return !barrier_i_aborted(b);
217
218 error:
219         /* If there is an unexpected error, we have to make this fatal. There
220          * is no way we can recover from sync-errors. Therefore, we close the
221          * pipe-ends and treat this as abortion. The other end will notice the
222          * pipe-close and treat it as abortion, too. */
223
224         safe_close_pair(b->pipe);
225         b->barriers = BARRIER_WE_ABORTED;
226         return false;
227 }
228
229 /* waits for barriers; returns false if they aborted, otherwise true */
230 static bool barrier_read(Barrier *b, int64_t comp) {
231         if (barrier_they_aborted(b))
232                 return false;
233
234         while (b->barriers > comp) {
235                 struct pollfd pfd[2] = {
236                         { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
237                           .events = POLLHUP },
238                         { .fd = b->them,
239                           .events = POLLIN }};
240                 uint64_t buf;
241                 int r;
242
243                 r = poll(pfd, 2, -1);
244                 if (r < 0 && IN_SET(errno, EAGAIN, EINTR))
245                         continue;
246                 else if (r < 0)
247                         goto error;
248
249                 if (pfd[1].revents) {
250                         ssize_t len;
251
252                         /* events on @them signal new data for us */
253                         len = read(b->them, &buf, sizeof(buf));
254                         if (len < 0 && IN_SET(errno, EAGAIN, EINTR))
255                                 continue;
256
257                         if (len != sizeof(buf))
258                                 goto error;
259                 } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
260                         /* POLLHUP on the pipe tells us the other side exited.
261                          * We treat this as implicit abortion. But we only
262                          * handle it if there's no event on the eventfd. This
263                          * guarantees that exit-abortions do not overwrite real
264                          * barriers. */
265                         buf = BARRIER_ABORTION;
266                 else
267                         continue;
268
269                 /* lock if they aborted */
270                 if (buf >= (uint64_t)BARRIER_ABORTION) {
271                         if (barrier_i_aborted(b))
272                                 b->barriers = BARRIER_WE_ABORTED;
273                         else
274                                 b->barriers = BARRIER_THEY_ABORTED;
275                 } else if (!barrier_is_aborted(b))
276                         b->barriers -= buf;
277         }
278
279         return !barrier_they_aborted(b);
280
281 error:
282         /* If there is an unexpected error, we have to make this fatal. There
283          * is no way we can recover from sync-errors. Therefore, we close the
284          * pipe-ends and treat this as abortion. The other end will notice the
285          * pipe-close and treat it as abortion, too. */
286
287         safe_close_pair(b->pipe);
288         b->barriers = BARRIER_WE_ABORTED;
289         return false;
290 }
291
292 /**
293  * barrier_place() - Place a new barrier
294  * @b: barrier object
295  *
296  * This places a new barrier on the barrier object. If either side already
297  * aborted, this is a no-op and returns "false". Otherwise, the barrier is
298  * placed and this returns "true".
299  *
300  * Returns: true if barrier was placed, false if either side aborted.
301  */
302 bool barrier_place(Barrier *b) {
303         assert(b);
304
305         if (barrier_is_aborted(b))
306                 return false;
307
308         barrier_write(b, BARRIER_SINGLE);
309         return true;
310 }
311
312 /**
313  * barrier_abort() - Abort the synchronization
314  * @b: barrier object to abort
315  *
316  * This aborts the barrier-synchronization. If barrier_abort() was already
317  * called on this side, this is a no-op. Otherwise, the barrier is put into the
318  * ABORT-state and will stay there. The other side is notified about the
319  * abortion. Any following attempt to place normal barriers or to wait on normal
320  * barriers will return immediately as "false".
321  *
322  * You can wait for the other side to call barrier_abort(), too. Use
323  * barrier_wait_abortion() for that.
324  *
325  * Returns: false if the other side already aborted, true otherwise.
326  */
327 bool barrier_abort(Barrier *b) {
328         assert(b);
329
330         barrier_write(b, BARRIER_ABORTION);
331         return !barrier_they_aborted(b);
332 }
333
334 /**
335  * barrier_wait_next() - Wait for the next barrier of the other side
336  * @b: barrier to operate on
337  *
338  * This waits until the other side places its next barrier. This is independent
339  * of any barrier-links and just waits for any next barrier of the other side.
340  *
341  * If either side aborted, this returns false.
342  *
343  * Returns: false if either side aborted, true otherwise.
344  */
345 bool barrier_wait_next(Barrier *b) {
346         assert(b);
347
348         if (barrier_is_aborted(b))
349                 return false;
350
351         barrier_read(b, b->barriers - 1);
352         return !barrier_is_aborted(b);
353 }
354
355 /**
356  * barrier_wait_abortion() - Wait for the other side to abort
357  * @b: barrier to operate on
358  *
359  * This waits until the other side called barrier_abort(). This can be called
360  * regardless whether the local side already called barrier_abort() or not.
361  *
362  * If the other side has already aborted, this returns immediately.
363  *
364  * Returns: false if the local side aborted, true otherwise.
365  */
366 bool barrier_wait_abortion(Barrier *b) {
367         assert(b);
368
369         barrier_read(b, BARRIER_THEY_ABORTED);
370         return !barrier_i_aborted(b);
371 }
372
373 /**
374  * barrier_sync_next() - Wait for the other side to place a next linked barrier
375  * @b: barrier to operate on
376  *
377  * This is like barrier_wait_next() and waits for the other side to call
378  * barrier_place(). However, this only waits for linked barriers. That means, if
379  * the other side already placed more barriers than (or as much as) we did, this
380  * returns immediately instead of waiting.
381  *
382  * If either side aborted, this returns false.
383  *
384  * Returns: false if either side aborted, true otherwise.
385  */
386 bool barrier_sync_next(Barrier *b) {
387         assert(b);
388
389         if (barrier_is_aborted(b))
390                 return false;
391
392         barrier_read(b, MAX((int64_t)0, b->barriers - 1));
393         return !barrier_is_aborted(b);
394 }
395
396 /**
397  * barrier_sync() - Wait for the other side to place as many barriers as we did
398  * @b: barrier to operate on
399  *
400  * This is like barrier_sync_next() but waits for the other side to call
401  * barrier_place() as often as we did (in total). If they already placed as much
402  * as we did (or more), this returns immediately instead of waiting.
403  *
404  * If either side aborted, this returns false.
405  *
406  * Returns: false if either side aborted, true otherwise.
407  */
408 bool barrier_sync(Barrier *b) {
409         assert(b);
410
411         if (barrier_is_aborted(b))
412                 return false;
413
414         barrier_read(b, 0);
415         return !barrier_is_aborted(b);
416 }