chiark / gitweb /
bbb7112a7969c1a89bfe488bb4a45269b4302b7c
[disorder] / lib / event.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2004, 2005, 2007 Richard Kettlewell
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA
19  */
20 /** @file lib/event.c
21  * @brief DisOrder event loop
22  */
23
24 #include <config.h>
25
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <sys/resource.h>
31 #include <sys/wait.h>
32 #include <unistd.h>
33 #include <assert.h>
34 #include <signal.h>
35 #include <errno.h>
36 #include <string.h>
37 #include <limits.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <sys/un.h>
41 #include <stdio.h>
42 #include "event.h"
43 #include "mem.h"
44 #include "log.h"
45 #include "syscalls.h"
46 #include "printf.h"
47 #include "sink.h"
48 #include "vector.h"
49
50 /** @brief A timeout */
51 struct timeout {
52   struct timeout *next;
53   struct timeval when;
54   ev_timeout_callback *callback;
55   void *u;
56   int resolve;
57 };
58
59 /** @brief A file descriptor in one mode */
60 struct fd {
61   int fd;
62   ev_fd_callback *callback;
63   void *u;
64   const char *what;
65 };
66
67 /** @brief All the file descriptors in a given mode */
68 struct fdmode {
69   /** @brief Mask of active file descriptors passed to @c select() */
70   fd_set enabled;
71
72   /** @brief File descriptor mask returned from @c select() */
73   fd_set tripped;
74
75   /** @brief Number of file descriptors in @p fds */
76   int nfds;
77
78   /** @brief Number of slots in @p fds */
79   int fdslots;
80
81   /** @brief Array of all active file descriptors */
82   struct fd *fds;
83
84   /** @brief Highest-numbered file descriptor or 0 */
85   int maxfd;
86 };
87
88 /** @brief A signal handler */
89 struct signal {
90   struct sigaction oldsa;
91   ev_signal_callback *callback;
92   void *u;
93 };
94
95 /** @brief A child process */
96 struct child {
97   pid_t pid;
98   int options;
99   ev_child_callback *callback;
100   void *u;
101 };
102
103 /** @brief An event loop */
104 struct ev_source {
105   /** @brief File descriptors, per mode */
106   struct fdmode mode[ev_nmodes];
107
108   /** @brief Sorted linked list of timeouts
109    *
110    * We could use @ref HEAP_TYPE now, but there aren't many timeouts.
111    */
112   struct timeout *timeouts;
113
114   /** @brief Array of handled signals */
115   struct signal signals[NSIG];
116
117   /** @brief Mask of handled signals */
118   sigset_t sigmask;
119
120   /** @brief Escape early from handling of @c select() results
121    *
122    * This is set if any of the file descriptor arrays are invalidated, since
123    * it's then not safe for processing of them to continue.
124    */
125   int escape;
126
127   /** @brief Signal handling pipe
128    *
129    * The signal handle writes signal numbers down this pipe.
130    */
131   int sigpipe[2];
132
133   /** @brief Number of child processes in @p children */
134   int nchildren;
135
136   /** @brief Number of slots in @p children */
137   int nchildslots;
138
139   /** @brief Array of child processes */
140   struct child *children;
141 };
142
143 /** @brief Names of file descriptor modes */
144 static const char *modenames[] = { "read", "write", "except" };
145
146 /* utilities ******************************************************************/
147
148 /** @brief Great-than comparison for timevals
149  *
150  * Ought to be in @file lib/timeval.h
151  */
152 static inline int gt(const struct timeval *a, const struct timeval *b) {
153   if(a->tv_sec > b->tv_sec)
154     return 1;
155   if(a->tv_sec == b->tv_sec
156      && a->tv_usec > b->tv_usec)
157     return 1;
158   return 0;
159 }
160
161 /** @brief Greater-than-or-equal comparison for timevals
162  *
163  * Ought to be in @file lib/timeval.h
164  */
165 static inline int ge(const struct timeval *a, const struct timeval *b) {
166   return !gt(b, a);
167 }
168
169 /* creation *******************************************************************/
170
171 /** @brief Create a new event loop */
172 ev_source *ev_new(void) {
173   ev_source *ev = xmalloc(sizeof *ev);
174   int n;
175
176   memset(ev, 0, sizeof *ev);
177   for(n = 0; n < ev_nmodes; ++n)
178     FD_ZERO(&ev->mode[n].enabled);
179   ev->sigpipe[0] = ev->sigpipe[1] = -1;
180   sigemptyset(&ev->sigmask);
181   return ev;
182 }
183
184 /* event loop *****************************************************************/
185
186 /** @brief Run the event loop
187  * @return -1 on error, non-0 if any callback returned non-0
188  */
189 int ev_run(ev_source *ev) {
190   for(;;) {
191     struct timeval now;
192     struct timeval delta;
193     int n, mode;
194     int ret;
195     int maxfd;
196     struct timeout *t, **tt;
197     struct stat sb;
198
199     xgettimeofday(&now, 0);
200     /* Handle timeouts.  We don't want to handle any timeouts that are added
201      * while we're handling them (otherwise we'd have to break out of infinite
202      * loops, preferrably without starving better-behaved subsystems).  Hence
203      * the slightly complicated two-phase approach here. */
204     for(t = ev->timeouts;
205         t && ge(&now, &t->when);
206         t = t->next) {
207       t->resolve = 1;
208       D(("calling timeout for %ld.%ld callback %p %p",
209          (long)t->when.tv_sec, (long)t->when.tv_usec,
210          (void *)t->callback, t->u));
211       ret = t->callback(ev, &now, t->u);
212       if(ret)
213         return ret;
214     }
215     tt = &ev->timeouts;
216     while((t = *tt)) {
217       if(t->resolve)
218         *tt = t->next;
219       else
220         tt = &t->next;
221     }
222     maxfd = 0;
223     for(mode = 0; mode < ev_nmodes; ++mode) {
224       ev->mode[mode].tripped = ev->mode[mode].enabled;
225       if(ev->mode[mode].maxfd > maxfd)
226         maxfd = ev->mode[mode].maxfd;
227     }
228     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
229     do {
230       if(ev->timeouts) {
231         xgettimeofday(&now, 0);
232         delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
233         delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
234         if(delta.tv_usec < 0) {
235           delta.tv_usec += 1000000;
236           --delta.tv_sec;
237         }
238         if(delta.tv_sec < 0)
239           delta.tv_sec = delta.tv_usec = 0;
240         n = select(maxfd + 1,
241                    &ev->mode[ev_read].tripped,
242                    &ev->mode[ev_write].tripped,
243                    &ev->mode[ev_except].tripped,
244                    &delta);
245       } else {
246         n = select(maxfd + 1,
247                    &ev->mode[ev_read].tripped,
248                    &ev->mode[ev_write].tripped,
249                    &ev->mode[ev_except].tripped,
250                    0);
251       }
252     } while(n < 0 && errno == EINTR);
253     xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
254     if(n < 0) {
255       error(errno, "error calling select");
256       if(errno == EBADF) {
257         /* If there's a bad FD in the mix then check them all and log what we
258          * find, to ease debugging */
259         for(mode = 0; mode < ev_nmodes; ++mode) {
260           for(n = 0; n < ev->mode[mode].nfds; ++n) {
261             const int fd = ev->mode[mode].fds[n].fd;
262
263             if(FD_ISSET(fd, &ev->mode[mode].enabled)
264                && fstat(fd, &sb) < 0)
265               error(errno, "mode %s fstat %d (%s)",
266                     modenames[mode], fd, ev->mode[mode].fds[n].what);
267           }
268           for(n = 0; n < maxfd; ++n)
269             if(FD_ISSET(n, &ev->mode[mode].enabled)
270                && fstat(n, &sb) < 0)
271               error(errno, "mode %s fstat %d", modenames[mode], n);
272         }
273       }
274       return -1;
275     }
276     if(n > 0) {
277       /* if anything deranges the meaning of an fd, or re-orders the
278        * fds[] tables, we'd better give up; such operations will
279        * therefore set @escape@. */
280       ev->escape = 0;
281       for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
282         for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
283           int fd = ev->mode[mode].fds[n].fd;
284           if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
285             D(("calling %s fd %d callback %p %p", modenames[mode], fd,
286                (void *)ev->mode[mode].fds[n].callback,
287                ev->mode[mode].fds[n].u));
288             ret = ev->mode[mode].fds[n].callback(ev, fd,
289                                                  ev->mode[mode].fds[n].u);
290             if(ret)
291               return ret;
292           }
293         }
294     }
295     /* we'll pick up timeouts back round the loop */
296   }
297 }
298
299 /* file descriptors ***********************************************************/
300
301 /** @brief Register a file descriptor
302  * @param ev Event loop
303  * @param mode @c ev_read or @c ev_write
304  * @param fd File descriptor
305  * @param callback Called when @p is readable/writable
306  * @param u Passed to @p callback
307  * @param what Text description
308  * @return 0 on success, non-0 on error
309  *
310  * Sets @ref ev_source::escape, so no further processing of file descriptors
311  * will occur this time round the event loop.
312  */
313 int ev_fd(ev_source *ev,
314           ev_fdmode mode,
315           int fd,
316           ev_fd_callback *callback,
317           void *u,
318           const char *what) {
319   int n;
320
321   D(("registering %s fd %d callback %p %p", modenames[mode], fd,
322      (void *)callback, u));
323   assert(mode < ev_nmodes);
324   if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
325     ev->mode[mode].fdslots = (ev->mode[mode].fdslots
326                                ? 2 * ev->mode[mode].fdslots : 16);
327     D(("expanding %s fd table to %d entries", modenames[mode],
328        ev->mode[mode].fdslots));
329     ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
330                                   ev->mode[mode].fdslots * sizeof (struct fd));
331   }
332   n = ev->mode[mode].nfds++;
333   FD_SET(fd, &ev->mode[mode].enabled);
334   ev->mode[mode].fds[n].fd = fd;
335   ev->mode[mode].fds[n].callback = callback;
336   ev->mode[mode].fds[n].u = u;
337   ev->mode[mode].fds[n].what = what;
338   if(fd > ev->mode[mode].maxfd)
339     ev->mode[mode].maxfd = fd;
340   ev->escape = 1;
341   return 0;
342 }
343
344 /** @brief Cancel a file descriptor
345  * @param ev Event loop
346  * @param mode @c ev_read or @c ev_write
347  * @param fd File descriptor
348  * @return 0 on success, non-0 on error
349  *
350  * Sets @ref ev_source::escape, so no further processing of file descriptors
351  * will occur this time round the event loop.
352  */
353 int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
354   int n;
355   int maxfd;
356
357   D(("cancelling mode %s fd %d", modenames[mode], fd));
358   /* find the right struct fd */
359   for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
360     ;
361   assert(n < ev->mode[mode].nfds);
362   /* swap in the last fd and reduce the count */
363   if(n != ev->mode[mode].nfds - 1)
364     ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
365   --ev->mode[mode].nfds;
366   /* if that was the biggest fd, find the new biggest one */
367   if(fd == ev->mode[mode].maxfd) {
368     maxfd = 0;
369     for(n = 0; n < ev->mode[mode].nfds; ++n)
370       if(ev->mode[mode].fds[n].fd > maxfd)
371         maxfd = ev->mode[mode].fds[n].fd;
372     ev->mode[mode].maxfd = maxfd;
373   }
374   /* don't tell select about this fd any more */
375   FD_CLR(fd, &ev->mode[mode].enabled);
376   ev->escape = 1;
377   return 0;
378 }
379
380 /** @brief Re-enable a file descriptor
381  * @param ev Event loop
382  * @param mode @c ev_read or @c ev_write
383  * @param fd File descriptor
384  * @return 0 on success, non-0 on error
385  *
386  * It is harmless if @p fd is currently disabled, but it must not have been
387  * cancelled.
388  */
389 int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
390   D(("enabling mode %s fd %d", modenames[mode], fd));
391   FD_SET(fd, &ev->mode[mode].enabled);
392   return 0;
393 }
394
395 /** @brief Temporarily disable a file descriptor
396  * @param ev Event loop
397  * @param mode @c ev_read or @c ev_write
398  * @param fd File descriptor
399  * @return 0 on success, non-0 on error
400  *
401  * Re-enable with ev_fd_enable().  It is harmless if @p fd is already disabled,
402  * but it must not have been cancelled.
403  */
404 int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
405   D(("disabling mode %s fd %d", modenames[mode], fd));
406   FD_CLR(fd, &ev->mode[mode].enabled);
407   FD_CLR(fd, &ev->mode[mode].tripped);
408   return 0;
409 }
410
411 /** @brief Log a report of file descriptor state */
412 void ev_report(ev_source *ev) {
413   int n, fd;
414   ev_fdmode mode;
415   struct dynstr d[1];
416   char b[4096];
417
418   dynstr_init(d);
419   for(mode = 0; mode < ev_nmodes; ++mode) {
420     info("mode %s maxfd %d", modenames[mode], ev->mode[mode].maxfd);
421     for(n = 0; n < ev->mode[mode].nfds; ++n) {
422       fd = ev->mode[mode].fds[n].fd;
423       info("fd %s %d%s%s (%s)", modenames[mode], fd,
424            FD_ISSET(fd, &ev->mode[mode].enabled) ? " enabled" : "",
425            FD_ISSET(fd, &ev->mode[mode].tripped) ? " tripped" : "",
426            ev->mode[mode].fds[n].what);
427     }
428     d->nvec = 0;
429     for(fd = 0; fd <= ev->mode[mode].maxfd; ++fd) {
430       if(!FD_ISSET(fd, &ev->mode[mode].enabled))
431         continue;
432       for(n = 0; n < ev->mode[mode].nfds; ++n) {
433         if(ev->mode[mode].fds[n].fd == fd)
434           break;
435       }
436       if(n < ev->mode[mode].nfds)
437         snprintf(b, sizeof b, "%d(%s)", fd, ev->mode[mode].fds[n].what);
438       else
439         snprintf(b, sizeof b, "%d", fd);
440       dynstr_append(d, ' ');
441       dynstr_append_string(d, b);
442     }
443     dynstr_terminate(d);
444     info("%s enabled:%s", modenames[mode], d->vec);
445   }
446 }
447
448 /* timeouts *******************************************************************/
449
450 /** @brief Register a timeout
451  * @param ev Event source
452  * @param handle Where to store timeout handle, or @c NULL
453  * @param when Earliest time to call @p callback, or @c NULL
454  * @param callback Function to call at or after @p when
455  * @param u Passed to @p callback
456  * @return 0 on success, non-0 on error
457  *
458  * If @p when is a null pointer then a time of 0 is assumed.  The effect is to
459  * call the timeout handler from ev_run() next time around the event loop.
460  * This is used internally to schedule various operations if it is not
461  * convenient to call them from the current place in the call stack, or
462  * externally to ensure that other clients of the event loop get a look in when
463  * performing some lengthy operation.
464  */
465 int ev_timeout(ev_source *ev,
466                ev_timeout_handle *handlep,
467                const struct timeval *when,
468                ev_timeout_callback *callback,
469                void *u) {
470   struct timeout *t, *p, **pp;
471
472   D(("registering timeout at %ld.%ld callback %p %p",
473      when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
474      (void *)callback, u));
475   t = xmalloc(sizeof *t);
476   if(when)
477     t->when = *when;
478   t->callback = callback;
479   t->u = u;
480   pp = &ev->timeouts;
481   while((p = *pp) && gt(&t->when, &p->when))
482     pp = &p->next;
483   t->next = p;
484   *pp = t;
485   if(handlep)
486     *handlep = t;
487   return 0;
488 }
489
490 /** @brief Cancel a timeout
491  * @param ev Event loop
492  * @param handle Handle returned from ev_timeout()
493  * @return 0 on success, non-0 on error
494  */
495 int ev_timeout_cancel(ev_source *ev,
496                       ev_timeout_handle handle) {
497   struct timeout *t = handle, *p, **pp;
498
499   for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
500     ;
501   if(p) {
502     *pp = p->next;
503     return 0;
504   } else
505     return -1;
506 }
507
508 /* signals ********************************************************************/
509
510 /** @brief Mapping of signals to pipe write ends
511  *
512  * The pipes are per-event loop, it's possible in theory for there to be
513  * multiple event loops (e.g. in different threads), although in fact DisOrder
514  * does not do this.
515  */
516 static int sigfd[NSIG];
517
518 /** @brief The signal handler
519  * @param s Signal number
520  *
521  * Writes to @c sigfd[s].
522  */
523 static void sighandler(int s) {
524   unsigned char sc = s;
525   static const char errmsg[] = "error writing to signal pipe";
526
527   /* probably the reader has stopped listening for some reason */
528   if(write(sigfd[s], &sc, 1) < 0) {
529     write(2, errmsg, sizeof errmsg - 1);
530     abort();
531   }
532 }
533
534 /** @brief Read callback for signals */
535 static int signal_read(ev_source *ev,
536                        int attribute((unused)) fd,
537                        void attribute((unused)) *u) {
538   unsigned char s;
539   int n;
540   int ret;
541
542   if((n = read(ev->sigpipe[0], &s, 1)) == 1)
543     if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
544       return ret;
545   assert(n != 0);
546   if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
547     error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
548     return -1;
549   }
550   return 0;
551 }
552
553 /** @brief Close the signal pipe */
554 static void close_sigpipe(ev_source *ev) {
555   int save_errno = errno;
556
557   xclose(ev->sigpipe[0]);
558   xclose(ev->sigpipe[1]);
559   ev->sigpipe[0] = ev->sigpipe[1] = -1;
560   errno = save_errno;
561 }
562
563 /** @brief Register a signal handler
564  * @param ev Event loop
565  * @param sig Signal to handle
566  * @param callback Called when signal is delivered
567  * @param u Passed to @p callback
568  * @return 0 on success, non-0 on error
569  *
570  * Note that @p callback is called from inside ev_run(), not from inside the
571  * signal handler, so the usual restrictions on signal handlers do not apply.
572  */
573 int ev_signal(ev_source *ev,
574               int sig,
575               ev_signal_callback *callback,
576               void *u) {
577   int n;
578   struct sigaction sa;
579
580   D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
581   assert(sig > 0);
582   assert(sig < NSIG);
583   assert(sig <= UCHAR_MAX);
584   if(ev->sigpipe[0] == -1) {
585     D(("creating signal pipe"));
586     xpipe(ev->sigpipe);
587     D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
588     for(n = 0; n < 2; ++n) {
589       nonblock(ev->sigpipe[n]);
590       cloexec(ev->sigpipe[n]);
591     }
592     if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0, "sigpipe read")) {
593       close_sigpipe(ev);
594       return -1;
595     }
596   }
597   sigaddset(&ev->sigmask, sig);
598   xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
599   sigfd[sig] = ev->sigpipe[1];
600   ev->signals[sig].callback = callback;
601   ev->signals[sig].u = u;
602   sa.sa_handler = sighandler;
603   sigfillset(&sa.sa_mask);
604   sa.sa_flags = SA_RESTART;
605   xsigaction(sig, &sa, &ev->signals[sig].oldsa);
606   ev->escape = 1;
607   return 0;
608 }
609
610 /** @brief Cancel a signal handler
611  * @param ev Event loop
612  * @param sig Signal to cancel
613  * @return 0 on success, non-0 on error
614  */
615 int ev_signal_cancel(ev_source *ev,
616                      int sig) {
617   sigset_t ss;
618
619   xsigaction(sig, &ev->signals[sig].oldsa, 0);
620   ev->signals[sig].callback = 0;
621   ev->escape = 1;
622   sigdelset(&ev->sigmask, sig);
623   sigemptyset(&ss);
624   sigaddset(&ss, sig);
625   xsigprocmask(SIG_UNBLOCK, &ss, 0);
626   return 0;
627 }
628
629 /** @brief Clean up signal handling
630  * @param ev Event loop
631  *
632  * This function can be called from inside a fork.  It restores signal
633  * handlers, unblocks the signals, and closes the signal pipe for @p ev.
634  */
635 void ev_signal_atfork(ev_source *ev) {
636   int sig;
637
638   if(ev->sigpipe[0] != -1) {
639     /* revert any handled signals to their original state */
640     for(sig = 1; sig < NSIG; ++sig) {
641       if(ev->signals[sig].callback != 0)
642         xsigaction(sig, &ev->signals[sig].oldsa, 0);
643     }
644     /* and then unblock them */
645     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
646     /* don't want a copy of the signal pipe open inside the fork */
647     xclose(ev->sigpipe[0]);
648     xclose(ev->sigpipe[1]);
649   }
650 }
651
652 /* child processes ************************************************************/
653
654 /** @brief Called on SIGCHLD */
655 static int sigchld_callback(ev_source *ev,
656                             int attribute((unused)) sig,
657                             void attribute((unused)) *u) {
658   struct rusage ru;
659   pid_t r;
660   int status, n, ret, revisit;
661
662   do {
663     revisit = 0;
664     for(n = 0; n < ev->nchildren; ++n) {
665       r = wait4(ev->children[n].pid,
666                 &status,
667                 ev->children[n].options | WNOHANG,
668                 &ru);
669       if(r > 0) {
670         ev_child_callback *c = ev->children[n].callback;
671         void *cu = ev->children[n].u;
672
673         if(WIFEXITED(status) || WIFSIGNALED(status))
674           ev_child_cancel(ev, r);
675         revisit = 1;
676         if((ret = c(ev, r, status, &ru, cu)))
677           return ret;
678       } else if(r < 0) {
679         /* We should "never" get an ECHILD but it can in fact happen.  For
680          * instance on Linux 2.4.31, and probably other versions, if someone
681          * straces a child process and then a different child process
682          * terminates, when we wait4() the trace process we will get ECHILD
683          * because it has been reparented to strace.  Obviously this is a
684          * hopeless design flaw in the tracing infrastructure, but we don't
685          * want the disorder server to bomb out because of it.  So we just log
686          * the problem and ignore it.
687          */
688         error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
689               (unsigned long)ev->children[n].pid);
690         if(errno != ECHILD)
691           return -1;
692       }
693     }
694   } while(revisit);
695   return 0;
696 }
697
698 /** @brief Configure event loop for child process handling
699  * @return 0 on success, non-0 on error
700  *
701  * Currently at most one event loop can handle child processes and it must be
702  * distinguished from others by calling this function on it.  This could be
703  * fixed but since no process ever makes use of more than one event loop there
704  * is no need.
705  */
706 int ev_child_setup(ev_source *ev) {
707   D(("installing SIGCHLD handler"));
708   return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
709 }
710
711 /** @brief Wait for a child process to terminate
712  * @param ev Event loop
713  * @param pid Process ID of child
714  * @param options Options to pass to @c wait4()
715  * @param callback Called when child terminates (or possibly when it stops)
716  * @param u Passed to @p callback
717  * @return 0 on success, non-0 on error
718  *
719  * You must have called ev_child_setup() on @p ev once first.
720  */
721 int ev_child(ev_source *ev,
722              pid_t pid,
723              int options,
724              ev_child_callback *callback,
725              void *u) {
726   int n;
727
728   D(("registering child handling %ld options %d callback %p %p",
729      (long)pid, options, (void *)callback, u));
730   assert(ev->signals[SIGCHLD].callback == sigchld_callback);
731   if(ev->nchildren >= ev->nchildslots) {
732     ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
733     ev->children = xrealloc(ev->children,
734                             ev->nchildslots * sizeof (struct child));
735   }
736   n = ev->nchildren++;
737   ev->children[n].pid = pid;
738   ev->children[n].options = options;
739   ev->children[n].callback = callback;
740   ev->children[n].u = u;
741   return 0;
742 }
743
744 /** @brief Stop waiting for a child process
745  * @param ev Event loop
746  * @param pid Child process ID
747  * @return 0 on success, non-0 on error
748  */ 
749 int ev_child_cancel(ev_source *ev,
750                     pid_t pid) {
751   int n;
752
753   for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
754     ;
755   assert(n < ev->nchildren);
756   if(n != ev->nchildren - 1)
757     ev->children[n] = ev->children[ev->nchildren - 1];
758   --ev->nchildren;
759   return 0;
760 }
761
762 /* socket listeners ***********************************************************/
763
764 /** @brief State for a socket listener */
765 struct listen_state {
766   ev_listen_callback *callback;
767   void *u;
768 };
769
770 /** @brief Called when a listenign socket is readable */
771 static int listen_callback(ev_source *ev, int fd, void *u) {
772   const struct listen_state *l = u;
773   int newfd;
774   union {
775     struct sockaddr_in in;
776 #if HAVE_STRUCT_SOCKADDR_IN6
777     struct sockaddr_in6 in6;
778 #endif
779     struct sockaddr_un un;
780     struct sockaddr sa;
781   } addr;
782   socklen_t addrlen;
783   int ret;
784
785   D(("callback for listener fd %d", fd));
786   while((addrlen = sizeof addr),
787         (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
788     if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
789       return ret;
790   }
791   switch(errno) {
792   case EINTR:
793   case EAGAIN:
794     break;
795 #ifdef ECONNABORTED
796   case ECONNABORTED:
797     error(errno, "error calling accept");
798     break;
799 #endif
800 #ifdef EPROTO
801   case EPROTO:
802     /* XXX on some systems EPROTO should be fatal, but we don't know if
803      * we're running on one of them */
804     error(errno, "error calling accept");
805     break;
806 #endif
807   default:
808     fatal(errno, "error calling accept");
809     break;
810   }
811   if(errno != EINTR && errno != EAGAIN)
812     error(errno, "error calling accept");
813   return 0;
814 }
815
816 /** @brief Listen on a socket for inbound stream connections
817  * @param ev Event source
818  * @param fd File descriptor of socket
819  * @param callback Called when a new connection arrives
820  * @param u Passed to @p callback
821  * @param what Text description of socket
822  * @return 0 on success, non-0 on error
823  */
824 int ev_listen(ev_source *ev,
825               int fd,
826               ev_listen_callback *callback,
827               void *u,
828               const char *what) {
829   struct listen_state *l = xmalloc(sizeof *l);
830
831   D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
832   l->callback = callback;
833   l->u = u;
834   return ev_fd(ev, ev_read, fd, listen_callback, l, what);
835 }
836
837 /** @brief Stop listening on a socket
838  * @param ev Event loop
839  * @param fd File descriptor of socket
840  * @return 0 on success, non-0 on error
841  */ 
842 int ev_listen_cancel(ev_source *ev, int fd) {
843   D(("cancelling listener fd %d", fd));
844   return ev_fd_cancel(ev, ev_read, fd);
845 }
846
847 /* buffer *********************************************************************/
848
849 /** @brief Buffer structure */
850 struct buffer {
851   char *base, *start, *end, *top;
852 };
853
854 /* @brief Make sure there is @p bytes available at @c b->end */
855 static void buffer_space(struct buffer *b, size_t bytes) {
856   D(("buffer_space %p %p %p %p want %lu",
857      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
858      (unsigned long)bytes));
859   if(b->start == b->end)
860     b->start = b->end = b->base;
861   if((size_t)(b->top - b->end) < bytes) {
862     if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
863       size_t newspace = b->end - b->start + bytes, n;
864       char *newbase;
865
866       for(n = 16; n < newspace; n *= 2)
867         ;
868       newbase = xmalloc_noptr(n);
869       memcpy(newbase, b->start, b->end - b->start);
870       b->base = newbase;
871       b->end = newbase + (b->end - b->start);
872       b->top = newbase + n;
873       b->start = newbase;               /* must be last */
874     } else {
875       memmove(b->base, b->start, b->end - b->start);
876       b->end = b->base + (b->end - b->start);
877       b->start = b->base;
878     }
879   }
880   D(("result %p %p %p %p",
881      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
882 }
883
884 /* buffered writer ************************************************************/
885
886 /** @brief State structure for a buffered writer */
887 struct ev_writer {
888   struct sink s;
889   struct buffer b;
890   int fd;
891   int eof;
892   ev_error_callback *callback;
893   void *u;
894   ev_source *ev;
895 };
896
897 /** @brief Called when a writer's file descriptor is writable */
898 static int writer_callback(ev_source *ev, int fd, void *u) {
899   ev_writer *w = u;
900   int n;
901
902   n = write(fd, w->b.start, w->b.end - w->b.start);
903   D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
904      fd, (long)(w->b.end - w->b.start), n, errno));
905   if(n >= 0) {
906     w->b.start += n;
907     if(w->b.start == w->b.end) {
908       if(w->eof) {
909         ev_fd_cancel(ev, ev_write, fd);
910         return w->callback(ev, fd, 0, w->u);
911       } else
912         ev_fd_disable(ev, ev_write, fd);
913     }
914   } else {
915     switch(errno) {
916     case EINTR:
917     case EAGAIN:
918       break;
919     default:
920       ev_fd_cancel(ev, ev_write, fd);
921       return w->callback(ev, fd, errno, w->u);
922     }
923   }
924   return 0;
925 }
926
927 /** @brief Write bytes to a writer's buffer
928  *
929  * This is the sink write callback.
930  *
931  * Calls ev_fd_enable() if necessary (i.e. if the buffer was empty but
932  * now is not).
933  */
934 static int ev_writer_write(struct sink *sk, const void *s, int n) {
935   ev_writer *w = (ev_writer *)sk;
936   
937   buffer_space(&w->b, n);
938   if(w->b.start == w->b.end)
939     ev_fd_enable(w->ev, ev_write, w->fd);
940   memcpy(w->b.end, s, n);
941   w->b.end += n;
942   return 0;
943 }
944
945 /** @brief Create a new buffered writer
946  * @param ev Event loop
947  * @param fd File descriptor to write to
948  * @param callback Called if an error occurs and when finished
949  * @param u Passed to @p callback
950  * @param what Text description
951  * @return New writer or @c NULL
952  */ 
953 ev_writer *ev_writer_new(ev_source *ev,
954                          int fd,
955                          ev_error_callback *callback,
956                          void *u,
957                          const char *what) {
958   ev_writer *w = xmalloc(sizeof *w);
959
960   D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
961   w->s.write = ev_writer_write;
962   w->fd = fd;
963   w->callback = callback;
964   w->u = u;
965   w->ev = ev;
966   if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
967     return 0;
968   ev_fd_disable(ev, ev_write, fd);
969   return w;
970 }
971
972 /** @brief Return the sink associated with a writer
973  * @param w Writer
974  * @return Pointer to sink
975  *
976  * Writing to the sink will arrange for those bytes to be written to the file
977  * descriptor as and when it is writable.
978  */
979 struct sink *ev_writer_sink(ev_writer *w) {
980   return &w->s;
981 }
982
983 /** @brief Shutdown callback
984  *
985  * See ev_writer_close().
986  */
987 static int writer_shutdown(ev_source *ev,
988                            const attribute((unused)) struct timeval *now,
989                            void *u) {
990   ev_writer *w = u;
991
992   return w->callback(ev, w->fd, 0, w->u);
993 }
994
995 /** @brief Close a writer
996  * @param w Writer to close
997  * @return 0 on success, non-0 on error
998  *
999  * Close a writer.  No more bytes should be written to its sink.
1000  *
1001  * When the last byte has been written the callback will be called with an
1002  * error code of 0.  It is guaranteed that this will NOT happen before
1003  * ev_writer_close() returns (although the file descriptor for the writer might
1004  * be cancelled by the time it returns).
1005  */
1006 int ev_writer_close(ev_writer *w) {
1007   D(("close writer fd %d", w->fd));
1008   w->eof = 1;
1009   if(w->b.start == w->b.end) {
1010     /* we're already finished */
1011     ev_fd_cancel(w->ev, ev_write, w->fd);
1012     return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
1013   }
1014   return 0;
1015 }
1016
1017 /** @brief Cancel a writer discarding any buffered data
1018  * @param w Writer to close
1019  * @return 0 on success, non-0 on error
1020  *
1021  * This cancels a writer immediately.  Any unwritten buffered data is discarded
1022  * and the error callback is never called.  This is appropriate to call if (for
1023  * instance) the read half of a TCP connection is known to have failed and the
1024  * writer is therefore obsolete.
1025  */
1026 int ev_writer_cancel(ev_writer *w) {
1027   D(("cancel writer fd %d", w->fd));
1028   return ev_fd_cancel(w->ev, ev_write, w->fd);
1029 }
1030
1031 /** @brief Attempt to flush a writer
1032  * @param w Writer to flush
1033  * @return 0 on success, non-0 on error
1034  *
1035  * Does a speculative write of any buffered data.  Does not block if it cannot
1036  * be written.
1037  */
1038 int ev_writer_flush(ev_writer *w) {
1039   return writer_callback(w->ev, w->fd, w);
1040 }
1041
1042 /* buffered reader ************************************************************/
1043
1044 /** @brief State structure for a buffered reader */
1045 struct ev_reader {
1046   struct buffer b;
1047   int fd;
1048   ev_reader_callback *callback;
1049   ev_error_callback *error_callback;
1050   void *u;
1051   ev_source *ev;
1052   int eof;
1053 };
1054
1055 /** @brief Called when a reader's @p fd is readable */
1056 static int reader_callback(ev_source *ev, int fd, void *u) {
1057   ev_reader *r = u;
1058   int n;
1059
1060   buffer_space(&r->b, 1);
1061   n = read(fd, r->b.end, r->b.top - r->b.end);
1062   D(("read fd %d buffer %d returned %d errno %d",
1063      fd, (int)(r->b.top - r->b.end), n, errno));
1064   if(n > 0) {
1065     r->b.end += n;
1066     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
1067   } else if(n == 0) {
1068     r->eof = 1;
1069     ev_fd_cancel(ev, ev_read, fd);
1070     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
1071   } else {
1072     switch(errno) {
1073     case EINTR:
1074     case EAGAIN:
1075       break;
1076     default:
1077       ev_fd_cancel(ev, ev_read, fd);
1078       return r->error_callback(ev, fd, errno, r->u);
1079     }
1080   }
1081   return 0;
1082 }
1083
1084 /** @brief Create a new buffered reader
1085  * @param ev Event loop
1086  * @param fd File descriptor to read from
1087  * @param callback Called when new data is available
1088  * @param error_callback Called if an error occurs
1089  * @param u Passed to callbacks
1090  * @param what Text description
1091  * @return New reader or @c NULL
1092  */
1093 ev_reader *ev_reader_new(ev_source *ev,
1094                          int fd,
1095                          ev_reader_callback *callback,
1096                          ev_error_callback *error_callback,
1097                          void *u,
1098                          const char *what) {
1099   ev_reader *r = xmalloc(sizeof *r);
1100
1101   D(("registering reader fd %d callback %p %p %p",
1102      fd, (void *)callback, (void *)error_callback, u));
1103   r->fd = fd;
1104   r->callback = callback;
1105   r->error_callback = error_callback;
1106   r->u = u;
1107   r->ev = ev;
1108   if(ev_fd(ev, ev_read, fd, reader_callback, r, what))
1109     return 0;
1110   return r;
1111 }
1112
1113 void ev_reader_buffer(ev_reader *r, size_t nbytes) {
1114   buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
1115 }
1116
1117 /** @brief Consume @p n bytes from the reader's buffer
1118  * @param r Reader
1119  * @param n Number of bytes to consume
1120  *
1121  * Tells the reader than the next @p n bytes have been dealt with and can now
1122  * be discarded.
1123  */
1124 void ev_reader_consume(ev_reader *r, size_t n) {
1125   r->b.start += n;
1126 }
1127
1128 /** @brief Cancel a reader
1129  * @param r Reader
1130  * @return 0 on success, non-0 on error
1131  */
1132 int ev_reader_cancel(ev_reader *r) {
1133   D(("cancel reader fd %d", r->fd));
1134   return ev_fd_cancel(r->ev, ev_read, r->fd);
1135 }
1136
1137 /** @brief Temporarily disable a reader
1138  * @param r Reader
1139  * @return 0 on success, non-0 on error
1140  *
1141  * No further callbacks for this reader will be made.  Re-enable with
1142  * ev_reader_enable().
1143  */
1144 int ev_reader_disable(ev_reader *r) {
1145   D(("disable reader fd %d", r->fd));
1146   return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
1147 }
1148
1149 /** @brief Called from ev_run() for ev_reader_incomplete() */
1150 static int reader_continuation(ev_source attribute((unused)) *ev,
1151                                const attribute((unused)) struct timeval *now,
1152                                void *u) {
1153   ev_reader *r = u;
1154
1155   D(("reader continuation callback fd %d", r->fd));
1156   if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
1157   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
1158 }
1159
1160 /** @brief Arrange another callback
1161  * @param r reader
1162  * @return 0 on success, non-0 on error
1163  *
1164  * Indicates that the reader can process more input but would like to yield to
1165  * other clients of the event loop.  Input will be disabled but it will be
1166  * re-enabled on the next iteration of the event loop and the read callback
1167  * will be called again (even if no further bytes are available).
1168  */
1169 int ev_reader_incomplete(ev_reader *r) {
1170   if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
1171   return ev_timeout(r->ev, 0, 0, reader_continuation, r);
1172 }
1173
1174 static int reader_enabled(ev_source *ev,
1175                           const attribute((unused)) struct timeval *now,
1176                           void *u) {
1177   ev_reader *r = u;
1178
1179   D(("reader enabled callback fd %d", r->fd));
1180   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
1181 }
1182
1183 /** @brief Re-enable reading
1184  * @param r reader
1185  * @return 0 on success, non-0 on error
1186  *
1187  * If there is unconsumed data then you get a callback next time round the
1188  * event loop even if nothing new has been read.
1189  *
1190  * The idea is in your read callback you come across a line (or whatever) that
1191  * can't be processed immediately.  So you set up processing and disable
1192  * reading with ev_reader_disable().  Later when you finish processing you
1193  * re-enable.  You'll automatically get another callback directly from the
1194  * event loop (i.e. not from inside ev_reader_enable()) so you can handle the
1195  * next line (or whatever) if the whole thing has in fact already arrived.
1196  */
1197 int ev_reader_enable(ev_reader *r) {
1198   D(("enable reader fd %d", r->fd));
1199   return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
1200           || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
1201 }
1202
1203 /*
1204 Local Variables:
1205 c-basic-offset:2
1206 comment-column:40
1207 fill-column:79
1208 End:
1209 */