2 * This file is part of DisOrder.
3 * Copyright (C) 2004, 2005 Richard Kettlewell
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <sys/types.h>
27 #include <sys/resource.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
/* NOTE(review): this excerpt is a numbered listing of the original file; the
 * leading integers are the original line numbers and are NOT contiguous, so
 * many lines (struct headers, braces, returns) are elided.  Code lines below
 * are reproduced verbatim; only comments have been added. */
/* Fragments of the per-registration state structs.  Each field belongs to a
 * struct (timeout, fd, signal, child respectively) whose opening/closing
 * lines are missing from this excerpt. */
49 ev_timeout_callback *callback;
/* callback invoked when a registered fd becomes ready */
56 ev_fd_callback *callback;
/* previous signal disposition, restored by ev_signal_cancel/ev_signal_atfork */
69 struct sigaction oldsa;
70 ev_signal_callback *callback;
77 ev_child_callback *callback;
/* Core ev_source state (struct header not visible here): one fdmode per
 * select() mode, a sorted timeout list, a per-signal table sized NSIG, and a
 * growable child-process table (nchildslots = allocated capacity). */
82 struct fdmode mode[ev_nmodes];
83 struct timeout *timeouts;
84 struct signal signals[NSIG];
88 int nchildren, nchildslots;
89 struct child *children;
/* Debug names indexed by ev_fdmode; order must match the ev_read/ev_write/
 * ev_except enum (enum itself not visible in this excerpt). */
92 static const char *modenames[] = { "read", "write", "except" };
94 /* utilities ******************************************************************/
/* gt(a, b): nonzero iff timeval *a is strictly later than *b.
 * NOTE(review): the return statements (original lines 98, 101 and the
 * final return 0) are elided from this excerpt; only the comparison
 * conditions are visible. */
96 static inline int gt(const struct timeval *a, const struct timeval *b) {
97 if(a->tv_sec > b->tv_sec)
99 if(a->tv_sec == b->tv_sec
100 && a->tv_usec > b->tv_usec)
/* ge(a, b): "a is at or after b" -- body elided; presumably !gt(b, a),
 * consistent with its use in ev_run's timeout scan.  TODO confirm. */
105 static inline int ge(const struct timeval *a, const struct timeval *b) {
109 /* creation *******************************************************************/
/* ev_new(): allocate and zero-initialize a new event loop.
 * Clears each mode's enabled fd_set, marks the signal pipe as not-yet-created
 * (-1, tested by ev_signal), and starts with an empty blocked-signal mask.
 * NOTE(review): the declaration of n and the return statement are elided. */
111 ev_source *ev_new(void) {
/* xmalloc is a project wrapper -- presumably aborts on OOM; not visible here */
112 ev_source *ev = xmalloc(sizeof *ev);
115 memset(ev, 0, sizeof *ev);
116 for(n = 0; n < ev_nmodes; ++n)
117 FD_ZERO(&ev->mode[n].enabled);
/* -1 = "signal pipe not open"; ev_signal creates it lazily */
118 ev->sigpipe[0] = ev->sigpipe[1] = -1;
119 sigemptyset(&ev->sigmask);
123 /* event loop *****************************************************************/
/* ev_run(): the main event loop.  Each iteration: (1) fire due timeouts,
 * (2) snapshot enabled fds into per-mode "tripped" sets, (3) unblock handled
 * signals and select() with a timeout derived from the earliest pending
 * timeout, (4) re-block signals and dispatch fd callbacks.
 * NOTE(review): many lines are elided (loop advance, phase-two timeout
 * handling at original 147-157, the do{ opener, select's timeout arguments,
 * error/return paths), so comments below only cover what is visible. */
125 int ev_run(ev_source *ev) {
128 struct timeval delta;
132 struct timeout *t, **tt;
134 xgettimeofday(&now, 0);
135 /* Handle timeouts. We don't want to handle any timeouts that are added
136 * while we're handling them (otherwise we'd have to break out of infinite
137 * loops, preferrably without starving better-behaved subsystems). Hence
138 * the slightly complicated two-phase approach here. */
139 for(t = ev->timeouts;
140 t && ge(&now, &t->when);
143 D(("calling timeout for %ld.%ld callback %p %p",
144 (long)t->when.tv_sec, (long)t->when.tv_usec,
145 (void *)t->callback, t->u));
146 ret = t->callback(ev, &now, t->u);
/* Rebuild the select() inputs: copy each mode's enabled set into tripped
 * (select mutates its arguments) and track the largest registered fd. */
158 for(mode = 0; mode < ev_nmodes; ++mode) {
159 ev->mode[mode].tripped = ev->mode[mode].enabled;
160 if(ev->mode[mode].maxfd > maxfd)
161 maxfd = ev->mode[mode].maxfd;
/* Handled signals are blocked except across select(), so a signal can only
 * interrupt us while we are safely parked in the kernel. */
163 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
166 xgettimeofday(&now, 0);
/* Compute time until the earliest timeout (this branch presumably runs only
 * when ev->timeouts != 0; the guarding if is elided -- TODO confirm). */
167 delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
168 delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
/* normalize a negative usec by borrowing from tv_sec (decrement elided) */
169 if(delta.tv_usec < 0) {
170 delta.tv_usec += 1000000;
/* already-due timeout: poll rather than wait */
174 delta.tv_sec = delta.tv_usec = 0;
/* Two select() calls: presumably one with &delta and one with a null
 * timeout for the no-pending-timeouts case; the final arguments and the
 * if/else wiring are elided. */
175 n = select(maxfd + 1,
176 &ev->mode[ev_read].tripped,
177 &ev->mode[ev_write].tripped,
178 &ev->mode[ev_except].tripped,
181 n = select(maxfd + 1,
182 &ev->mode[ev_read].tripped,
183 &ev->mode[ev_write].tripped,
184 &ev->mode[ev_except].tripped,
/* retry if a signal (other than one that terminates the loop) interrupted us */
187 } while(n < 0 && errno == EINTR);
188 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
190 error(errno, "error calling select");
194 /* if anything deranges the meaning of an fd, or re-orders the
195 * fds[] tables, we'd better give up; such operations will
196 * therefore set @escape@. */
198 for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
199 for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
200 int fd = ev->mode[mode].fds[n].fd;
201 if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
202 D(("calling %s fd %d callback %p %p", modenames[mode], fd,
203 (void *)ev->mode[mode].fds[n].callback,
204 ev->mode[mode].fds[n].u));
205 ret = ev->mode[mode].fds[n].callback(ev, fd,
206 ev->mode[mode].fds[n].u);
212 /* we'll pick up timeouts back round the loop */
216 /* file descriptors ***********************************************************/
/* ev_fd(): register CALLBACK for FD in the given MODE (read/write/except).
 * Grows the per-mode fd table geometrically (16, then doubling), enables the
 * fd in the select() set and maintains maxfd.
 * NOTE(review): the full parameter list (mode, fd, u), the declaration of n
 * and the return statement are elided from this excerpt. */
218 int ev_fd(ev_source *ev,
221 ev_fd_callback *callback,
225 D(("registering %s fd %d callback %p %p", modenames[mode], fd,
226 (void *)callback, u));
227 assert(mode < ev_nmodes);
/* grow the table when full: 16 slots initially, doubling thereafter */
228 if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
229 ev->mode[mode].fdslots = (ev->mode[mode].fdslots
230 ? 2 * ev->mode[mode].fdslots : 16);
231 D(("expanding %s fd table to %d entries", modenames[mode],
232 ev->mode[mode].fdslots));
233 ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
234 ev->mode[mode].fdslots * sizeof (struct fd));
236 n = ev->mode[mode].nfds++;
237 FD_SET(fd, &ev->mode[mode].enabled);
238 ev->mode[mode].fds[n].fd = fd;
239 ev->mode[mode].fds[n].callback = callback;
240 ev->mode[mode].fds[n].u = u;
/* keep maxfd current for select(maxfd + 1, ...) in ev_run */
241 if(fd > ev->mode[mode].maxfd)
242 ev->mode[mode].maxfd = fd;
/* ev_fd_cancel(): deregister FD from MODE.  Uses swap-with-last removal (so
 * the fds[] table is compact but unordered), recomputes maxfd if the removed
 * fd was the largest, and clears it from the enabled set.
 * NOTE(review): declarations of n/maxfd and the return are elided; removal
 * re-orders fds[], which is why ev_run honours ev->escape. */
247 int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
251 D(("cancelling mode %s fd %d", modenames[mode], fd));
252 /* find the right struct fd */
253 for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
/* cancelling an unregistered fd is a caller bug */
255 assert(n < ev->mode[mode].nfds);
256 /* swap in the last fd and reduce the count */
257 if(n != ev->mode[mode].nfds - 1)
258 ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
259 --ev->mode[mode].nfds;
260 /* if that was the biggest fd, find the new biggest one */
261 if(fd == ev->mode[mode].maxfd) {
263 for(n = 0; n < ev->mode[mode].nfds; ++n)
264 if(ev->mode[mode].fds[n].fd > maxfd)
265 maxfd = ev->mode[mode].fds[n].fd;
266 ev->mode[mode].maxfd = maxfd;
268 /* don't tell select about this fd any more */
269 FD_CLR(fd, &ev->mode[mode].enabled);
/* ev_fd_enable(): resume select()ing on FD in MODE (returns elided). */
274 int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
275 D(("enabling mode %s fd %d", modenames[mode], fd));
276 FD_SET(fd, &ev->mode[mode].enabled);
/* ev_fd_disable(): stop select()ing on FD; also clears any pending "tripped"
 * bit so a ready event noticed this iteration is not dispatched. */
280 int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
281 D(("disabling mode %s fd %d", modenames[mode], fd));
282 FD_CLR(fd, &ev->mode[mode].enabled);
283 FD_CLR(fd, &ev->mode[mode].tripped);
287 /* timeouts *******************************************************************/
/* ev_timeout(): register CALLBACK to fire at *WHEN (a null WHEN apparently
 * means "immediately" given the `when ?` guards below -- TODO confirm).
 * Inserts into the time-ordered singly linked list ev->timeouts.
 * NOTE(review): the trailing `void *u` parameter, t->when/t->u assignment,
 * the list insertion tail, handlep store and return are elided. */
289 int ev_timeout(ev_source *ev,
290 ev_timeout_handle *handlep,
291 const struct timeval *when,
292 ev_timeout_callback *callback,
294 struct timeout *t, *p, **pp;
296 D(("registering timeout at %ld.%ld callback %p %p",
297 when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
298 (void *)callback, u));
299 t = xmalloc(sizeof *t);
302 t->callback = callback;
/* walk to the first entry strictly later than t->when, keeping FIFO order
 * among equal deadlines (gt, not ge) */
305 while((p = *pp) && gt(&t->when, &p->when))
/* ev_timeout_cancel(): remove a pending timeout by HANDLE.  Scans the list
 * for the matching node; the unlink and return are elided from this view. */
314 int ev_timeout_cancel(ev_source *ev,
315 ev_timeout_handle handle) {
316 struct timeout *t = handle, *p, **pp;
318 for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
327 /* signals ********************************************************************/
/* Map from signal number to the write end of the owning loop's signal pipe.
 * File-scope because a signal handler cannot receive a context pointer. */
329 static int sigfd[NSIG];
/* sighandler(): async-signal-safe handler -- writes the signal number as one
 * byte down the self-pipe; ev_run's fd dispatch then delivers it in normal
 * context.  NOTE(review): the closing braces / any abort path are elided. */
331 static void sighandler(int s) {
332 unsigned char sc = s;
333 static const char errmsg[] = "error writing to signal pipe";
335 /* probably the reader has stopped listening for some reason */
336 if(write(sigfd[s], &sc, 1) < 0) {
/* raw write(2) to stderr: only async-signal-safe reporting is allowed here */
337 write(2, errmsg, sizeof errmsg - 1);
/* signal_read(): fd callback on the read end of the signal pipe.  Drains one
 * byte per loop iteration and dispatches to the registered per-signal
 * callback; a nonzero callback result terminates (the return is elided).
 * EINTR/EAGAIN are expected (pipe is nonblocking) and not errors. */
342 static int signal_read(ev_source *ev,
343 int attribute((unused)) fd,
344 void attribute((unused)) *u) {
349 if((n = read(ev->sigpipe[0], &s, 1)) == 1)
350 if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
353 if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
354 error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
/* close_sigpipe(): close both ends of the signal pipe and mark it unopened.
 * Preserves errno across the xclose() calls (restore line elided) so a
 * caller's pending error report is not clobbered. */
360 static void close_sigpipe(ev_source *ev) {
361 int save_errno = errno;
363 xclose(ev->sigpipe[0]);
364 xclose(ev->sigpipe[1]);
365 ev->sigpipe[0] = ev->sigpipe[1] = -1;
/* ev_signal(): register CALLBACK for signal SIG via the self-pipe trick.
 * Lazily creates the nonblocking close-on-exec signal pipe on first use,
 * registers its read end with ev_fd, blocks SIG outside select() (see
 * ev_run), then installs sighandler with SA_RESTART and all signals masked
 * during delivery.  The previous disposition is saved for later restore.
 * NOTE(review): the `int sig` parameter, pipe() call, error cleanup paths
 * and return are elided from this excerpt. */
369 int ev_signal(ev_source *ev,
371 ev_signal_callback *callback,
376 D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
/* the signal number is squeezed into one byte on the pipe (see sighandler) */
379 assert(sig <= UCHAR_MAX);
380 if(ev->sigpipe[0] == -1) {
381 D(("creating signal pipe"));
383 D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
384 for(n = 0; n < 2; ++n) {
385 nonblock(ev->sigpipe[n]);
386 cloexec(ev->sigpipe[n]);
388 if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0)) {
/* keep SIG blocked except while parked in select() */
393 sigaddset(&ev->sigmask, sig);
394 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
395 sigfd[sig] = ev->sigpipe[1];
396 ev->signals[sig].callback = callback;
397 ev->signals[sig].u = u;
398 sa.sa_handler = sighandler;
399 sigfillset(&sa.sa_mask);
400 sa.sa_flags = SA_RESTART;
/* save the old disposition so cancel/atfork can restore it */
401 xsigaction(sig, &sa, &ev->signals[sig].oldsa);
/* ev_signal_cancel(): restore SIG's saved disposition, forget its callback,
 * remove it from the loop's mask and unblock it.  NOTE(review): the `int
 * sig` parameter, the sigset_t ss setup (apparently a singleton set holding
 * just SIG -- TODO confirm) and the return are elided. */
406 int ev_signal_cancel(ev_source *ev,
410 xsigaction(sig, &ev->signals[sig].oldsa, 0);
411 ev->signals[sig].callback = 0;
413 sigdelset(&ev->sigmask, sig);
416 xsigprocmask(SIG_UNBLOCK, &ss, 0);
/* ev_signal_atfork(): called in the child after fork().  Restores every
 * handled signal's original disposition, unblocks them, and closes the
 * inherited signal pipe fds so the child cannot interfere with the parent's
 * loop.  NOTE(review): the sigpipe reset to -1 / closing brace are elided. */
420 void ev_signal_atfork(ev_source *ev) {
423 if(ev->sigpipe[0] != -1) {
424 /* revert any handled signals to their original state */
425 for(sig = 1; sig < NSIG; ++sig) {
426 if(ev->signals[sig].callback != 0)
427 xsigaction(sig, &ev->signals[sig].oldsa, 0);
429 /* and then unblock them */
430 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
431 /* don't want a copy of the signal pipe open inside the fork */
432 xclose(ev->sigpipe[0]);
433 xclose(ev->sigpipe[1]);
437 /* child processes ************************************************************/
/* sigchld_callback(): SIGCHLD handler (registered by ev_child_setup).  Polls
 * each registered child with wait4(..., WNOHANG); on a real termination the
 * registration is cancelled BEFORE invoking the user callback (so the
 * callback may re-register), then the callback gets the status and rusage.
 * NOTE(review): the rusage variable, r's declaration, the `revisit` re-scan
 * logic suggested by the declaration, the wait4 &ru argument, several
 * branches and the return are all elided from this excerpt. */
439 static int sigchld_callback(ev_source *ev,
440 int attribute((unused)) sig,
441 void attribute((unused)) *u) {
444 int status, n, ret, revisit;
448 for(n = 0; n < ev->nchildren; ++n) {
449 r = wait4(ev->children[n].pid,
451 ev->children[n].options | WNOHANG,
/* capture callback/u before ev_child_cancel reorders children[] */
454 ev_child_callback *c = ev->children[n].callback;
455 void *cu = ev->children[n].u;
457 if(WIFEXITED(status) || WIFSIGNALED(status))
458 ev_child_cancel(ev, r);
460 if((ret = c(ev, r, status, &ru, cu)))
463 /* We should "never" get an ECHILD but it can in fact happen. For
464 * instance on Linux 2.4.31, and probably other versions, if someone
465 * straces a child process and then a different child process
466 * terminates, when we wait4() the trace process we will get ECHILD
467 * because it has been reparented to strace. Obviously this is a
468 * hopeless design flaw in the tracing infrastructure, but we don't
469 * want the disorder server to bomb out because of it. So we just log
470 * the problem and ignore it.
472 error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
473 (unsigned long)ev->children[n].pid);
/* ev_child_setup(): install the SIGCHLD handler that drives ev_child
 * callbacks.  Must be called before ev_child (see the assert there). */
482 int ev_child_setup(ev_source *ev) {
483 D(("installing SIGCHLD handler"));
484 return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
/* ev_child(): register CALLBACK for termination of child PID.  Grows the
 * children table geometrically (16 then doubling), mirroring ev_fd's table.
 * NOTE(review): the pid/options parameters, the assignment of n, and the
 * return are elided from this excerpt. */
487 int ev_child(ev_source *ev,
490 ev_child_callback *callback,
494 D(("registering child handling %ld options %d callback %p %p",
495 (long)pid, options, (void *)callback, u));
/* ev_child_setup must have been called first */
496 assert(ev->signals[SIGCHLD].callback == sigchld_callback);
497 if(ev->nchildren >= ev->nchildslots) {
498 ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
499 ev->children = xrealloc(ev->children,
500 ev->nchildslots * sizeof (struct child));
503 ev->children[n].pid = pid;
504 ev->children[n].options = options;
505 ev->children[n].callback = callback;
506 ev->children[n].u = u;
/* ev_child_cancel(): remove the registration for PID by swapping in the last
 * table entry (same compaction scheme as ev_fd_cancel).  The pid parameter,
 * count decrement and return are elided from this excerpt. */
510 int ev_child_cancel(ev_source *ev,
514 for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
516 assert(n < ev->nchildren);
517 if(n != ev->nchildren - 1)
518 ev->children[n] = ev->children[ev->nchildren - 1];
523 /* socket listeners ***********************************************************/
/* Per-listener state passed as the user pointer to listen_callback. */
525 struct listen_state {
526 ev_listen_callback *callback;
/* listen_callback(): fd callback for a listening socket.  Accepts connections
 * until accept() would block, handing each new fd to the user callback.
 * The union covering sockaddr_in/in6/un (named `addr` with member `sa`,
 * per the accept call) has its wrapper lines elided, as are the errno
 * switch labels distinguishing the error() and fatal() branches below. */
530 static int listen_callback(ev_source *ev, int fd, void *u) {
531 const struct listen_state *l = u;
534 struct sockaddr_in in;
535 #if HAVE_STRUCT_SOCKADDR_IN6
536 struct sockaddr_in6 in6;
538 struct sockaddr_un un;
544 D(("callback for listener fd %d", fd));
/* addrlen must be reset before every accept(); the comma operator does that */
545 while((addrlen = sizeof addr),
546 (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
547 if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
/* error-path reporting: which errno values hit which branch is not visible */
556 error(errno, "error calling accept");
561 /* XXX on some systems EPROTO should be fatal, but we don't know if
562 * we're running on one of them */
563 error(errno, "error calling accept");
567 fatal(errno, "error calling accept");
/* EINTR/EAGAIN are the normal "no more connections" cases */
570 if(errno != EINTR && errno != EAGAIN)
571 error(errno, "error calling accept");
/* ev_listen(): register CALLBACK to accept connections on listening FD.
 * Wraps the callback/u in a listen_state and registers listen_callback for
 * reads.  The fd parameter and l->u assignment are elided. */
575 int ev_listen(ev_source *ev,
577 ev_listen_callback *callback,
579 struct listen_state *l = xmalloc(sizeof *l);
581 D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
582 l->callback = callback;
584 return ev_fd(ev, ev_read, fd, listen_callback, l);
/* ev_listen_cancel(): stop accepting on FD (the listen_state is leaked to
 * the garbage collector / allocator policy -- not visible here). */
587 int ev_listen_cancel(ev_source *ev, int fd) {
588 D(("cancelling listener fd %d", fd));
589 return ev_fd_cancel(ev, ev_read, fd);
592 /* buffer *********************************************************************/
/* Growable byte buffer: base..top is the allocation, start..end the live
 * data (struct wrapper lines elided). */
595 char *base, *start, *end, *top;
598 /* make sure there is @bytes@ available at @b->end@ */
599 static void buffer_space(struct buffer *b, size_t bytes) {
600 D(("buffer_space %p %p %p %p want %lu",
601 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
602 (unsigned long)bytes));
/* empty buffer: rewind to the base so the whole allocation is reusable */
603 if(b->start == b->end)
604 b->start = b->end = b->base;
605 if((size_t)(b->top - b->end) < bytes) {
/* not enough total slack even after compaction -> reallocate */
606 if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
607 size_t newspace = b->end - b->start + bytes, n;
/* round the new size up to a power of two, minimum 16 */
610 for(n = 16; n < newspace; n *= 2)
612 newbase = xmalloc_noptr(n);
613 memcpy(newbase, b->start, b->end - b->start);
615 b->end = newbase + (b->end - b->start);
616 b->top = newbase + n;
617 b->start = newbase; /* must be last */
/* enough room if we slide the live data back to the base (may overlap,
 * hence memmove); the start reset line is elided */
619 memmove(b->base, b->start, b->end - b->start);
620 b->end = b->base + (b->end - b->start);
624 D(("result %p %p %p %p",
625 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
628 /* buffered writer ************************************************************/
/* Error callback field of the ev_writer struct (other members -- sink, fd,
 * buffer, ev pointer -- are elided along with the struct wrapper). */
635 ev_error_callback *callback;
/* writer_callback(): write-ready callback for a buffered writer.  Flushes as
 * much buffered data as write(2) accepts; when the buffer drains completely
 * the fd registration is cancelled and the user callback is told (errno 0).
 * The partial-write start+=n accounting, the n<0 errno filtering and the
 * declarations are elided from this excerpt. */
640 static int writer_callback(ev_source *ev, int fd, void *u) {
644 n = write(fd, w->b.start, w->b.end - w->b.start);
645 D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
646 fd, (long)(w->b.end - w->b.start), n, errno));
649 if(w->b.start == w->b.end) {
/* fully drained: stop watching and report success */
651 ev_fd_cancel(ev, ev_write, fd);
652 return w->callback(ev, fd, 0, w->u);
/* still data pending: just stop polling until more is queued (apparently;
 * the surrounding condition is elided -- TODO confirm) */
654 ev_fd_disable(ev, ev_write, fd);
/* hard write error: tear down and report errno to the user callback */
662 ev_fd_cancel(ev, ev_write, fd);
663 return w->callback(ev, fd, errno, w->u);
/* ev_writer_write(): sink->write implementation.  Ensures buffer space, and
 * -- since an empty buffer means the write fd was disabled by
 * writer_callback -- re-enables it before appending the new bytes.
 * The end+=n advance and return are elided. */
669 static int ev_writer_write(struct sink *sk, const void *s, int n) {
/* the sink is the first member of ev_writer, so this cast recovers w */
670 ev_writer *w = (ev_writer *)sk;
672 buffer_space(&w->b, n);
673 if(w->b.start == w->b.end)
674 ev_fd_enable(w->ev, ev_write, w->fd);
675 memcpy(w->b.end, s, n);
/* ev_writer_new(): create a buffered writer on FD.  Registers
 * writer_callback for writes but immediately disables the fd -- it is only
 * enabled when data is queued (see ev_writer_write).  The fd parameter,
 * remaining field assignments, the error return and the final return are
 * elided. */
680 ev_writer *ev_writer_new(ev_source *ev,
682 ev_error_callback *callback,
684 ev_writer *w = xmalloc(sizeof *w);
686 D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
687 w->s.write = ev_writer_write;
689 w->callback = callback;
692 if(ev_fd(ev, ev_write, fd, writer_callback, w))
694 ev_fd_disable(ev, ev_write, fd);
/* ev_writer_sink(): expose the writer's generic sink interface (body elided;
 * presumably returns &w->s). */
698 struct sink *ev_writer_sink(ev_writer *w) {
/* writer_shutdown(): immediate-timeout continuation used by ev_writer_close
 * so the "finished" callback runs from the event loop rather than from
 * inside the close call.  The recovery of w from the user pointer is
 * elided. */
702 static int writer_shutdown(ev_source *ev,
703 const attribute((unused)) struct timeval *now,
707 return w->callback(ev, w->fd, 0, w->u);
/* ev_writer_close(): finish with a writer.  If the buffer is already empty,
 * cancel the fd and schedule writer_shutdown via a zero timeout; the
 * still-buffered path (presumably flagged for writer_callback to complete)
 * is elided. */
710 int ev_writer_close(ev_writer *w) {
711 D(("close writer fd %d", w->fd));
713 if(w->b.start == w->b.end) {
714 /* we're already finished */
715 ev_fd_cancel(w->ev, ev_write, w->fd);
/* null when + null handlep = fire as soon as possible, no cancel handle */
716 return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
/* ev_writer_cancel(): abandon the writer without flushing buffered data. */
721 int ev_writer_cancel(ev_writer *w) {
722 D(("cancel writer fd %d", w->fd));
723 return ev_fd_cancel(w->ev, ev_write, w->fd);
/* ev_writer_flush(): attempt an immediate synchronous flush by invoking the
 * write-ready callback directly. */
726 int ev_writer_flush(ev_writer *w) {
727 return writer_callback(w->ev, w->fd, w);
730 /* buffered reader ************************************************************/
/* Callback fields of the ev_reader struct (other members -- buffer, fd, ev,
 * eof flag -- and the struct wrapper are elided). */
735 ev_reader_callback *callback;
736 ev_error_callback *error_callback;
/* reader_callback(): read-ready callback.  Ensures at least one byte of
 * buffer space, reads, and dispatches: data -> callback with eof=0; n==0 ->
 * cancel the fd and call back with eof=1; hard error -> error_callback.
 * The end+=n advance, the n==0/n<0 condition lines and the eof flag update
 * are elided from this excerpt. */
742 static int reader_callback(ev_source *ev, int fd, void *u) {
746 buffer_space(&r->b, 1);
747 n = read(fd, r->b.end, r->b.top - r->b.end);
748 D(("read fd %d buffer %d returned %d errno %d",
749 fd, (int)(r->b.top - r->b.end), n, errno));
/* deliver everything currently buffered; callback consumes via
 * ev_reader_consume */
752 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
/* end of file: final delivery with eof=1 */
755 ev_fd_cancel(ev, ev_read, fd);
756 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
/* read error */
763 ev_fd_cancel(ev, ev_read, fd);
764 return r->error_callback(ev, fd, errno, r->u);
/* ev_reader_new(): create a buffered reader on FD with data and error
 * callbacks.  The fd parameter, remaining field assignments, error path and
 * return are elided. */
770 ev_reader *ev_reader_new(ev_source *ev,
772 ev_reader_callback *callback,
773 ev_error_callback *error_callback,
775 ev_reader *r = xmalloc(sizeof *r);
777 D(("registering reader fd %d callback %p %p %p",
778 fd, (void *)callback, (void *)error_callback, u));
780 r->callback = callback;
781 r->error_callback = error_callback;
784 if(ev_fd(ev, ev_read, fd, reader_callback, r))
/* ev_reader_buffer(): pre-size the buffer so NBYTES can be held beyond the
 * data already buffered (end - start). */
789 void ev_reader_buffer(ev_reader *r, size_t nbytes) {
790 buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
/* ev_reader_consume(): discard the first N buffered bytes (body elided;
 * presumably advances r->b.start by n -- TODO confirm). */
793 void ev_reader_consume(ev_reader *r, size_t n) {
/* ev_reader_cancel(): abandon the reader, dropping any buffered data. */
797 int ev_reader_cancel(ev_reader *r) {
798 D(("cancel reader fd %d", r->fd));
799 return ev_fd_cancel(r->ev, ev_read, r->fd);
/* ev_reader_disable(): pause reading; after EOF the fd registration is
 * already gone, so this is a no-op returning success. */
802 int ev_reader_disable(ev_reader *r) {
803 D(("disable reader fd %d", r->fd));
804 return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
/* reader_continuation(): zero-timeout continuation used by
 * ev_reader_incomplete -- re-enables the fd and re-delivers the buffered
 * data to the callback (the recovery of r from the user pointer is
 * elided). */
807 static int reader_continuation(ev_source attribute((unused)) *ev,
808 const attribute((unused)) struct timeval *now,
812 D(("reader continuation callback fd %d", r->fd));
813 if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
814 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
/* ev_reader_incomplete(): called from inside a reader callback to say "I
 * could not consume everything; call me again".  Bounces through a zero
 * timeout so the re-delivery happens from the event loop, avoiding
 * unbounded recursion. */
817 int ev_reader_incomplete(ev_reader *r) {
818 if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
819 return ev_timeout(r->ev, 0, 0, reader_continuation, r);
/* reader_enabled(): zero-timeout continuation for ev_reader_enable --
 * re-delivers any already-buffered data once reading has been resumed
 * (recovery of r from the user pointer is elided). */
822 static int reader_enabled(ev_source *ev,
823 const attribute((unused)) struct timeval *now,
827 D(("reader enabled callback fd %d", r->fd));
828 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
/* ev_reader_enable(): resume a paused reader.  Skips re-enabling after EOF,
 * and always schedules reader_enabled via a zero timeout so buffered data is
 * re-offered to the callback.  NOTE(review): this runs past the end of the
 * excerpt; the closing lines are not visible. */
831 int ev_reader_enable(ev_reader *r) {
832 D(("enable reader fd %d", r->fd));
833 return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
834 || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;