2 * This file is part of DisOrder.
3 * Copyright (C) 2004, 2005, 2007 Richard Kettlewell
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <sys/types.h>
27 #include <sys/resource.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
/* NOTE(review): this chunk is a line-sampled extraction -- struct headers and
 * many members are missing, and each line carries a fused line-number residue.
 * The visible members appear to belong to the per-timeout, per-fd, per-signal
 * and per-child records plus the top-level ev_source aggregate -- TODO confirm
 * against the complete file. */
/* presumably member of struct timeout: function to call when it expires */
49 ev_timeout_callback *callback;
/* presumably member of struct fd: function to call when the fd trips */
56 ev_fd_callback *callback;
/* members of struct signal: previous disposition (restored on cancel/fork)
 * and the handler callback */
70 struct sigaction oldsa;
71 ev_signal_callback *callback;
/* presumably member of struct child: called when the child changes state */
78 ev_child_callback *callback;
/* members of struct ev_source: one fdmode table per select() mode,
 * sorted pending-timeout list, per-signal table indexed by signal number,
 * and a growable child-process array with its used/allocated counts */
83 struct fdmode mode[ev_nmodes];
84 struct timeout *timeouts;
85 struct signal signals[NSIG];
89 int nchildren, nchildslots;
90 struct child *children;
/* human-readable names for the ev_read/ev_write/ev_except modes, used in
 * debug and error messages; order must match the ev_fdmode enum */
93 static const char *modenames[] = { "read", "write", "except" };
95 /* utilities ******************************************************************/
/* Return nonzero iff timeval *a is strictly later than *b.
 * NOTE(review): the "return 1;" / "return 0;" lines and closing brace are
 * missing from this sampled view; only the comparisons are visible. */
97 static inline int gt(const struct timeval *a, const struct timeval *b) {
98 if(a->tv_sec > b->tv_sec)
100 if(a->tv_sec == b->tv_sec
101 && a->tv_usec > b->tv_usec)
/* Return nonzero iff *a is later than or equal to *b -- body not visible in
 * this view; presumably implemented in terms of gt(). TODO confirm. */
106 static inline int ge(const struct timeval *a, const struct timeval *b) {
110 /* creation *******************************************************************/
/* Allocate and initialize a new event loop: zero the whole structure, clear
 * every per-mode enabled fd_set, mark the signal pipe as not-yet-created (-1)
 * and start with an empty blocked-signal mask.
 * NOTE(review): the declaration of loop variable n and the final
 * "return ev;" are missing from this sampled view. */
112 ev_source *ev_new(void) {
113 ev_source *ev = xmalloc(sizeof *ev);
116 memset(ev, 0, sizeof *ev);
117 for(n = 0; n < ev_nmodes; ++n)
118 FD_ZERO(&ev->mode[n].enabled);
119 ev->sigpipe[0] = ev->sigpipe[1] = -1;
120 sigemptyset(&ev->sigmask);
124 /* event loop *****************************************************************/
/* The main event loop.  Each iteration: (1) dispatch expired timeouts,
 * (2) select() on all enabled fds (with the handled signals unblocked only
 * for the duration of the call, so they can interrupt it), (3) dispatch
 * callbacks for tripped fds.  A nonzero return from any callback aborts the
 * loop (visible at the timeout-dispatch site; presumably the same for fd
 * callbacks -- TODO confirm).
 * NOTE(review): this is a sampled view -- the function header's local
 * declarations, several control-flow lines, return statements and closing
 * braces are missing; do not treat the visible text as compilable. */
126 int ev_run(ev_source *ev) {
129 struct timeval delta;
133 struct timeout *t, **tt;
136 xgettimeofday(&now, 0);
137 /* Handle timeouts. We don't want to handle any timeouts that are added
138 * while we're handling them (otherwise we'd have to break out of infinite
139 * loops, preferrably without starving better-behaved subsystems). Hence
140 * the slightly complicated two-phase approach here. */
141 for(t = ev->timeouts;
142 t && ge(&now, &t->when);
145 D(("calling timeout for %ld.%ld callback %p %p",
146 (long)t->when.tv_sec, (long)t->when.tv_usec,
147 (void *)t->callback, t->u));
/* a nonzero callback return value aborts the loop (ret's use not visible
 * here -- presumably "if(ret) return ret;" follows in the full file) */
148 ret = t->callback(ev, &now, t->u);
/* snapshot enabled fds into tripped (select() mutates its arguments) and
 * find the largest fd across all modes */
160 for(mode = 0; mode < ev_nmodes; ++mode) {
161 ev->mode[mode].tripped = ev->mode[mode].enabled;
162 if(ev->mode[mode].maxfd > maxfd)
163 maxfd = ev->mode[mode].maxfd;
/* let handled signals through while we wait in select() */
165 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
/* compute the delay until the earliest pending timeout; clamp a negative
 * result to zero (already-due timeout => poll without blocking) */
168 xgettimeofday(&now, 0);
169 delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
170 delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
171 if(delta.tv_usec < 0) {
172 delta.tv_usec += 1000000;
176 delta.tv_sec = delta.tv_usec = 0;
/* two select() call sites: presumably one with the &delta timeout (when a
 * timeout is pending) and one blocking indefinitely -- TODO confirm */
177 n = select(maxfd + 1,
178 &ev->mode[ev_read].tripped,
179 &ev->mode[ev_write].tripped,
180 &ev->mode[ev_except].tripped,
183 n = select(maxfd + 1,
184 &ev->mode[ev_read].tripped,
185 &ev->mode[ev_write].tripped,
186 &ev->mode[ev_except].tripped,
/* retry when a (handled) signal interrupted the wait */
189 } while(n < 0 && errno == EINTR);
/* re-block handled signals while we run callbacks */
190 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
192 error(errno, "error calling select");
194 /* If there's a bad FD in the mix then check them all and log what we
195 * find, to ease debugging */
196 for(mode = 0; mode < ev_nmodes; ++mode) {
197 for(n = 0; n < ev->mode[mode].nfds; ++n) {
198 const int fd = ev->mode[mode].fds[n].fd;
200 if(FD_ISSET(fd, &ev->mode[mode].enabled)
201 && fstat(fd, &sb) < 0)
202 error(errno, "fstat %d (%s)", fd, ev->mode[mode].fds[n].what);
209 /* if anything deranges the meaning of an fd, or re-orders the
210 * fds[] tables, we'd better give up; such operations will
211 * therefore set @escape@. */
213 for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
214 for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
215 int fd = ev->mode[mode].fds[n].fd;
216 if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
217 D(("calling %s fd %d callback %p %p", modenames[mode], fd,
218 (void *)ev->mode[mode].fds[n].callback,
219 ev->mode[mode].fds[n].u));
220 ret = ev->mode[mode].fds[n].callback(ev, fd,
221 ev->mode[mode].fds[n].u);
227 /* we'll pick up timeouts back round the loop */
231 /* file descriptors ***********************************************************/
/* Register CALLBACK for FD in the given MODE (read/write/except), with user
 * data U and a human-readable label WHAT for diagnostics.  Grows the
 * per-mode fd table geometrically (16, then doubling), enables the fd in the
 * select() set and keeps maxfd up to date.
 * NOTE(review): sampled view -- the full parameter list, the declaration of
 * n, the closing brace of the realloc branch and the final return are not
 * visible here. */
233 int ev_fd(ev_source *ev,
236 ev_fd_callback *callback,
241 D(("registering %s fd %d callback %p %p", modenames[mode], fd,
242 (void *)callback, u));
243 assert(mode < ev_nmodes);
244 if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
245 ev->mode[mode].fdslots = (ev->mode[mode].fdslots
246 ? 2 * ev->mode[mode].fdslots : 16);
247 D(("expanding %s fd table to %d entries", modenames[mode],
248 ev->mode[mode].fdslots));
249 ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
250 ev->mode[mode].fdslots * sizeof (struct fd));
252 n = ev->mode[mode].nfds++;
253 FD_SET(fd, &ev->mode[mode].enabled);
254 ev->mode[mode].fds[n].fd = fd;
255 ev->mode[mode].fds[n].callback = callback;
256 ev->mode[mode].fds[n].u = u;
257 ev->mode[mode].fds[n].what = what;
/* track the largest registered fd so select() gets the right nfds */
258 if(fd > ev->mode[mode].maxfd)
259 ev->mode[mode].maxfd = fd;
/* Deregister FD from MODE: locate its slot (asserting it exists), swap the
 * last table entry into its place, shrink the count, recompute maxfd if the
 * removed fd was the largest, and clear it from the enabled set.
 * NOTE(review): sampled view -- local declarations (n, maxfd), the empty
 * search-loop body, and the final return/brace are not visible. */
264 int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
268 D(("cancelling mode %s fd %d", modenames[mode], fd));
269 /* find the right struct fd */
270 for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
272 assert(n < ev->mode[mode].nfds);
273 /* swap in the last fd and reduce the count */
274 if(n != ev->mode[mode].nfds - 1)
275 ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
276 --ev->mode[mode].nfds;
277 /* if that was the biggest fd, find the new biggest one */
278 if(fd == ev->mode[mode].maxfd) {
280 for(n = 0; n < ev->mode[mode].nfds; ++n)
281 if(ev->mode[mode].fds[n].fd > maxfd)
282 maxfd = ev->mode[mode].fds[n].fd;
283 ev->mode[mode].maxfd = maxfd;
285 /* don't tell select about this fd any more */
286 FD_CLR(fd, &ev->mode[mode].enabled);
/* Re-enable a registered fd in MODE's select() set (returns are not visible
 * in this sampled view; presumably 0). */
291 int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
292 D(("enabling mode %s fd %d", modenames[mode], fd));
293 FD_SET(fd, &ev->mode[mode].enabled);
/* Disable a registered fd: remove it from the enabled set and also from the
 * current tripped set so no pending callback fires for it this iteration. */
297 int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
298 D(("disabling mode %s fd %d", modenames[mode], fd));
299 FD_CLR(fd, &ev->mode[mode].enabled);
300 FD_CLR(fd, &ev->mode[mode].tripped);
304 /* timeouts *******************************************************************/
/* Register CALLBACK to run at (or after) *WHEN, inserting the new record
 * into the time-ordered singly linked ev->timeouts list.  HANDLEP, if
 * non-null, presumably receives a handle usable with ev_timeout_cancel --
 * TODO confirm, the assignment is not visible in this sampled view.  A null
 * WHEN is tolerated by the debug trace (logged as 0.0); how the rest of the
 * function treats it is not visible here. */
306 int ev_timeout(ev_source *ev,
307 ev_timeout_handle *handlep,
308 const struct timeval *when,
309 ev_timeout_callback *callback,
311 struct timeout *t, *p, **pp;
313 D(("registering timeout at %ld.%ld callback %p %p",
314 when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
315 (void *)callback, u));
316 t = xmalloc(sizeof *t);
319 t->callback = callback;
/* walk to the insertion point: list is kept sorted by expiry time */
322 while((p = *pp) && gt(&t->when, &p->when))
/* Cancel a pending timeout previously returned via ev_timeout's handle:
 * search the pending list for the record and (presumably, in lines missing
 * from this view) unlink it. */
331 int ev_timeout_cancel(ev_source *ev,
332 ev_timeout_handle handle) {
333 struct timeout *t = handle, *p, **pp;
335 for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
344 /* signals ********************************************************************/
/* Map from signal number to the write end of the owning loop's signal pipe.
 * File-scope (not per-ev_source) because signal handlers get no user data. */
346 static int sigfd[NSIG];
/* Async-signal-safe handler: write the signal number as one byte down the
 * pipe so the event loop picks it up via signal_read.  On failure, report
 * with a raw write(2) to stderr (stdio is not signal-safe).
 * NOTE(review): the byte only fits because ev_signal asserts
 * sig <= UCHAR_MAX (L185). */
348 static void sighandler(int s) {
349 unsigned char sc = s;
350 static const char errmsg[] = "error writing to signal pipe";
352 /* probably the reader has stopped listening for some reason */
353 if(write(sigfd[s], &sc, 1) < 0) {
354 write(2, errmsg, sizeof errmsg - 1);
/* fd callback for the read end of the signal pipe: drain one byte at a time
 * and dispatch the registered per-signal callback; a nonzero callback return
 * (presumably) propagates out and aborts the loop.  EINTR/EAGAIN on the
 * nonblocking pipe are expected and ignored; other read errors are logged.
 * NOTE(review): sampled view -- local declarations and returns missing. */
359 static int signal_read(ev_source *ev,
360 int attribute((unused)) fd,
361 void attribute((unused)) *u) {
366 if((n = read(ev->sigpipe[0], &s, 1)) == 1)
367 if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
370 if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
371 error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
/* Close both ends of the signal pipe and mark it absent (-1), preserving the
 * caller's errno across the xclose calls (used on error paths). */
377 static void close_sigpipe(ev_source *ev) {
378 int save_errno = errno;
380 xclose(ev->sigpipe[0]);
381 xclose(ev->sigpipe[1]);
382 ev->sigpipe[0] = ev->sigpipe[1] = -1;
/* Register CALLBACK for signal SIG.  Lazily creates the self-pipe on first
 * use (both ends nonblocking and close-on-exec, read end wired into the
 * loop via ev_fd), adds SIG to the blocked mask (it is only unblocked
 * around select() in ev_run), records the callback, and installs
 * sighandler with SA_RESTART and all signals masked during delivery.  The
 * previous disposition is saved in signals[sig].oldsa for later restore.
 * NOTE(review): sampled view -- the sig/u parameters, struct sigaction sa
 * declaration, the pipe-creation call itself and error-path/return lines
 * are not visible here. */
386 int ev_signal(ev_source *ev,
388 ev_signal_callback *callback,
393 D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
/* the handler squeezes sig into a single byte (see sighandler) */
396 assert(sig <= UCHAR_MAX);
397 if(ev->sigpipe[0] == -1) {
398 D(("creating signal pipe"));
400 D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
401 for(n = 0; n < 2; ++n) {
402 nonblock(ev->sigpipe[n]);
403 cloexec(ev->sigpipe[n]);
405 if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0, "sigpipe read")) {
410 sigaddset(&ev->sigmask, sig);
411 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
412 sigfd[sig] = ev->sigpipe[1];
413 ev->signals[sig].callback = callback;
414 ev->signals[sig].u = u;
415 sa.sa_handler = sighandler;
416 sigfillset(&sa.sa_mask);
417 sa.sa_flags = SA_RESTART;
418 xsigaction(sig, &sa, &ev->signals[sig].oldsa);
/* Deregister handling of a signal: restore the saved disposition, forget
 * the callback, remove SIG from the loop's mask and unblock it (via a local
 * sigset ss whose construction is not visible in this sampled view). */
423 int ev_signal_cancel(ev_source *ev,
427 xsigaction(sig, &ev->signals[sig].oldsa, 0);
428 ev->signals[sig].callback = 0;
430 sigdelset(&ev->sigmask, sig);
433 xsigprocmask(SIG_UNBLOCK, &ss, 0);
/* To be called in the child after fork(): restore original dispositions for
 * every handled signal, unblock them, and close the inherited signal pipe
 * so the child does not hold the parent's pipe ends open.  No-op when no
 * signal pipe was ever created. */
437 void ev_signal_atfork(ev_source *ev) {
440 if(ev->sigpipe[0] != -1) {
441 /* revert any handled signals to their original state */
442 for(sig = 1; sig < NSIG; ++sig) {
443 if(ev->signals[sig].callback != 0)
444 xsigaction(sig, &ev->signals[sig].oldsa, 0);
446 /* and then unblock them */
447 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
448 /* don't want a copy of the signal pipe open inside the fork */
449 xclose(ev->sigpipe[0]);
450 xclose(ev->sigpipe[1]);
454 /* child processes ************************************************************/
/* SIGCHLD handler (registered through ev_signal by ev_child_setup): poll
 * every registered child with wait4(..., WNOHANG).  When a child has fully
 * terminated (exited or killed) it is removed from the table BEFORE its
 * callback runs, so the callback may safely re-register; a nonzero callback
 * return (presumably) aborts the loop.  ECHILD from wait4 is logged but
 * deliberately tolerated -- see the comment below about ptrace reparenting.
 * NOTE(review): sampled view -- declarations of r/ru, the loop-restart
 * handling via "revisit", and returns are not visible. */
456 static int sigchld_callback(ev_source *ev,
457 int attribute((unused)) sig,
458 void attribute((unused)) *u) {
461 int status, n, ret, revisit;
465 for(n = 0; n < ev->nchildren; ++n) {
466 r = wait4(ev->children[n].pid,
468 ev->children[n].options | WNOHANG,
471 ev_child_callback *c = ev->children[n].callback;
472 void *cu = ev->children[n].u;
/* remove terminated children first so the callback can re-register */
474 if(WIFEXITED(status) || WIFSIGNALED(status))
475 ev_child_cancel(ev, r);
477 if((ret = c(ev, r, status, &ru, cu)))
480 /* We should "never" get an ECHILD but it can in fact happen. For
481 * instance on Linux 2.4.31, and probably other versions, if someone
482 * straces a child process and then a different child process
483 * terminates, when we wait4() the trace process we will get ECHILD
484 * because it has been reparented to strace. Obviously this is a
485 * hopeless design flaw in the tracing infrastructure, but we don't
486 * want the disorder server to bomb out because of it. So we just log
487 * the problem and ignore it.
489 error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
490 (unsigned long)ev->children[n].pid);
/* Install the loop's SIGCHLD handling; must run before any ev_child()
 * registration (ev_child asserts the handler is in place). */
499 int ev_child_setup(ev_source *ev) {
500 D(("installing SIGCHLD handler"));
501 return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
/* Register CALLBACK for state changes of child PID (OPTIONS are wait4()
 * flags, OR'd with WNOHANG at poll time).  Requires ev_child_setup to have
 * run first.  Grows the children array geometrically (16, then doubling).
 * NOTE(review): sampled view -- the pid/options parameters, the assignment
 * n = ev->nchildren++ and the final return are not visible. */
504 int ev_child(ev_source *ev,
507 ev_child_callback *callback,
511 D(("registering child handling %ld options %d callback %p %p",
512 (long)pid, options, (void *)callback, u));
513 assert(ev->signals[SIGCHLD].callback == sigchld_callback);
514 if(ev->nchildren >= ev->nchildslots) {
515 ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
516 ev->children = xrealloc(ev->children,
517 ev->nchildslots * sizeof (struct child));
520 ev->children[n].pid = pid;
521 ev->children[n].options = options;
522 ev->children[n].callback = callback;
523 ev->children[n].u = u;
/* Stop watching child PID: find its slot (asserting it exists), swap the
 * last entry into its place, and shrink the count -- same compaction scheme
 * as ev_fd_cancel.  (Count decrement and return not visible in this view.) */
527 int ev_child_cancel(ev_source *ev,
531 for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
533 assert(n < ev->nchildren);
534 if(n != ev->nchildren - 1)
535 ev->children[n] = ev->children[ev->nchildren - 1];
540 /* socket listeners ***********************************************************/
/* Per-listener state handed to listen_callback via ev_fd's user pointer
 * (the u member itself is missing from this sampled view). */
542 struct listen_state {
543 ev_listen_callback *callback;
/* fd callback for a listening socket: accept() in a loop until it would
 * block, invoking the user's callback for each new connection (nonzero
 * return presumably aborts).  addr is a union of sockaddr flavors (in,
 * optionally in6, un) sized for the largest -- the union declaration is
 * partially missing from this view.  The trailing error ladder
 * distinguishes transient accept() errors (logged), EPROTO (logged with a
 * portability caveat), apparently-fatal errors, and EINTR/EAGAIN (ignored);
 * the governing if/switch lines are not visible here. */
547 static int listen_callback(ev_source *ev, int fd, void *u) {
548 const struct listen_state *l = u;
551 struct sockaddr_in in;
552 #if HAVE_STRUCT_SOCKADDR_IN6
553 struct sockaddr_in6 in6;
555 struct sockaddr_un un;
561 D(("callback for listener fd %d", fd));
562 while((addrlen = sizeof addr),
563 (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
564 if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
573 error(errno, "error calling accept");
578 /* XXX on some systems EPROTO should be fatal, but we don't know if
579 * we're running on one of them */
580 error(errno, "error calling accept");
584 fatal(errno, "error calling accept");
587 if(errno != EINTR && errno != EAGAIN)
588 error(errno, "error calling accept");
/* Register a listening socket: wrap the user callback in a listen_state and
 * register listen_callback for readability on FD.  (The l->u assignment is
 * missing from this sampled view.) */
592 int ev_listen(ev_source *ev,
594 ev_listen_callback *callback,
597 struct listen_state *l = xmalloc(sizeof *l);
599 D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
600 l->callback = callback;
602 return ev_fd(ev, ev_read, fd, listen_callback, l, what);
/* Stop listening on FD -- just a thin wrapper over ev_fd_cancel. */
605 int ev_listen_cancel(ev_source *ev, int fd) {
606 D(("cancelling listener fd %d", fd));
607 return ev_fd_cancel(ev, ev_read, fd);
610 /* buffer *********************************************************************/
/* Growable byte buffer: base..top is the allocation, start..end the live
 * data (struct header missing from this sampled view). */
613 char *base, *start, *end, *top;
616 /* make sure there is @bytes@ available at @b->end@ */
/* Strategy: (1) an empty buffer is rewound to base for free; (2) if total
 * free space (tail + leading gap) is still short, allocate a power-of-two
 * sized replacement (>= 16) and copy the live data across; (3) otherwise
 * just memmove the live data down to base.  Uses memmove in the slide case
 * because source and destination may overlap.
 * NOTE(review): sampled view -- the newbase declaration, b->base
 * reassignment and some closing braces are not visible. */
617 static void buffer_space(struct buffer *b, size_t bytes) {
618 D(("buffer_space %p %p %p %p want %lu",
619 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
620 (unsigned long)bytes));
621 if(b->start == b->end)
622 b->start = b->end = b->base;
623 if((size_t)(b->top - b->end) < bytes) {
624 if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
625 size_t newspace = b->end - b->start + bytes, n;
628 for(n = 16; n < newspace; n *= 2)
630 newbase = xmalloc_noptr(n);
631 memcpy(newbase, b->start, b->end - b->start);
633 b->end = newbase + (b->end - b->start);
634 b->top = newbase + n;
635 b->start = newbase; /* must be last */
637 memmove(b->base, b->start, b->end - b->start);
638 b->end = b->base + (b->end - b->start);
642 D(("result %p %p %p %p",
643 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
646 /* buffered writer ************************************************************/
/* presumably member of struct ev_writer: invoked on error or completion */
653 ev_error_callback *callback;
/* fd-writable callback for a buffered writer: flush as much pending data as
 * write() takes.  When the buffer drains completely: if the writer was
 * closed (condition lines missing from this view), cancel the fd and fire
 * the user callback with errno 0; otherwise just disable write events until
 * more data arrives.  On a write error, cancel the fd and report errno to
 * the user callback.  (The w declaration/assignment and buffer-start
 * advance are not visible in this sampled view.) */
658 static int writer_callback(ev_source *ev, int fd, void *u) {
662 n = write(fd, w->b.start, w->b.end - w->b.start);
663 D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
664 fd, (long)(w->b.end - w->b.start), n, errno));
667 if(w->b.start == w->b.end) {
669 ev_fd_cancel(ev, ev_write, fd);
670 return w->callback(ev, fd, 0, w->u);
672 ev_fd_disable(ev, ev_write, fd);
680 ev_fd_cancel(ev, ev_write, fd);
681 return w->callback(ev, fd, errno, w->u);
/* sink->write implementation: the sink pointer is the first member of
 * ev_writer, so the cast recovers the writer.  Ensure buffer space, enable
 * write events if the buffer was previously empty (it had been disabled by
 * writer_callback on drain), then append the bytes.  (The b.end advance and
 * return are not visible in this sampled view.) */
687 static int ev_writer_write(struct sink *sk, const void *s, int n) {
688 ev_writer *w = (ev_writer *)sk;
690 buffer_space(&w->b, n);
691 if(w->b.start == w->b.end)
692 ev_fd_enable(w->ev, ev_write, w->fd);
693 memcpy(w->b.end, s, n);
/* Create a buffered writer on FD: wire the sink's write method, record the
 * error/completion callback, register writer_callback for writability
 * (returning 0 on failure -- the failure return is among the lines missing
 * from this sampled view) and start with write events disabled since the
 * buffer is empty. */
698 ev_writer *ev_writer_new(ev_source *ev,
700 ev_error_callback *callback,
703 ev_writer *w = xmalloc(sizeof *w);
705 D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
706 w->s.write = ev_writer_write;
708 w->callback = callback;
711 if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
713 ev_fd_disable(ev, ev_write, fd);
/* Expose the writer's generic sink interface (body not visible here;
 * presumably "return &w->s;"). */
717 struct sink *ev_writer_sink(ev_writer *w) {
/* Timeout trampoline used by ev_writer_close: deliver the final
 * "finished, no error" notification from loop context rather than from
 * inside the close call.  (The w = u assignment is not visible here.) */
721 static int writer_shutdown(ev_source *ev,
722 const attribute((unused)) struct timeval *now,
726 return w->callback(ev, w->fd, 0, w->u);
/* Close a writer: if nothing is buffered, cancel the fd now and schedule
 * writer_shutdown via an immediate timeout; otherwise (lines missing from
 * this view) presumably mark the writer to finish once the buffer drains. */
729 int ev_writer_close(ev_writer *w) {
730 D(("close writer fd %d", w->fd));
732 if(w->b.start == w->b.end) {
733 /* we're already finished */
734 ev_fd_cancel(w->ev, ev_write, w->fd);
735 return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
/* Abandon a writer immediately, discarding any buffered data: just drop its
 * fd registration. */
740 int ev_writer_cancel(ev_writer *w) {
741 D(("cancel writer fd %d", w->fd));
742 return ev_fd_cancel(w->ev, ev_write, w->fd);
/* Attempt a synchronous flush by invoking the write-ready callback once. */
745 int ev_writer_flush(ev_writer *w) {
746 return writer_callback(w->ev, w->fd, w);
749 /* buffered reader ************************************************************/
/* presumably members of struct ev_reader: data callback plus error callback */
754 ev_reader_callback *callback;
755 ev_error_callback *error_callback;
/* fd-readable callback for a buffered reader: grow the buffer by at least
 * one byte, read what fits, then hand the WHOLE buffered region to the user
 * callback (consumption is explicit via ev_reader_consume).  A zero read is
 * EOF: cancel the fd and make the final callback with eof=1.  Read errors
 * cancel the fd and go to error_callback.  (The r/n declarations, the
 * b.end advance, and the n==0/n<0 condition lines are not visible in this
 * sampled view.) */
761 static int reader_callback(ev_source *ev, int fd, void *u) {
765 buffer_space(&r->b, 1);
766 n = read(fd, r->b.end, r->b.top - r->b.end);
767 D(("read fd %d buffer %d returned %d errno %d",
768 fd, (int)(r->b.top - r->b.end), n, errno));
771 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
774 ev_fd_cancel(ev, ev_read, fd);
775 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
782 ev_fd_cancel(ev, ev_read, fd);
783 return r->error_callback(ev, fd, errno, r->u);
/* Create a buffered reader on FD: record both callbacks and register
 * reader_callback for readability.  (Remaining field initialization, the
 * failure return and "return r;" are among the lines missing from this
 * sampled view.) */
789 ev_reader *ev_reader_new(ev_source *ev,
791 ev_reader_callback *callback,
792 ev_error_callback *error_callback,
795 ev_reader *r = xmalloc(sizeof *r);
797 D(("registering reader fd %d callback %p %p %p",
798 fd, (void *)callback, (void *)error_callback, u));
800 r->callback = callback;
801 r->error_callback = error_callback;
804 if(ev_fd(ev, ev_read, fd, reader_callback, r, what))
/* Pre-reserve capacity so the reader can hold NBYTES total: ask for the
 * shortfall beyond what is already buffered.
 * NOTE(review): if nbytes < current fill this size_t subtraction wraps --
 * presumably callers guarantee nbytes is an upper bound; TODO confirm. */
809 void ev_reader_buffer(ev_reader *r, size_t nbytes) {
810 buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
/* Mark N bytes at the front of the buffer as consumed (the start-pointer
 * advance is not visible in this sampled view). */
813 void ev_reader_consume(ev_reader *r, size_t n) {
/* Abandon the reader: drop its fd registration. */
817 int ev_reader_cancel(ev_reader *r) {
818 D(("cancel reader fd %d", r->fd));
819 return ev_fd_cancel(r->ev, ev_read, r->fd);
/* Stop delivering read events for this reader; a no-op after EOF (the fd
 * registration is already cancelled by then). */
822 int ev_reader_disable(ev_reader *r) {
823 D(("disable reader fd %d", r->fd));
824 return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
/* Timeout trampoline for ev_reader_incomplete: re-enable read events and
 * re-present the still-buffered data to the user callback from loop
 * context.  (The r = u assignment is not visible in this sampled view.) */
827 static int reader_continuation(ev_source attribute((unused)) *ev,
828 const attribute((unused)) struct timeval *now,
832 D(("reader continuation callback fd %d", r->fd));
833 if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
834 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
/* Called by a data callback that could not consume everything yet: pause
 * read events now and schedule reader_continuation to retry immediately
 * from the event loop, avoiding re-entrant callback recursion. */
837 int ev_reader_incomplete(ev_reader *r) {
838 if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
839 return ev_timeout(r->ev, 0, 0, reader_continuation, r);
/* Timeout trampoline for ev_reader_enable: re-present whatever is already
 * buffered to the user callback from loop context.  (The r = u assignment
 * is not visible in this sampled view.) */
842 static int reader_enabled(ev_source *ev,
843 const attribute((unused)) struct timeval *now,
847 D(("reader enabled callback fd %d", r->fd));
848 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
/* Resume a disabled reader: re-enable read events (skipped after EOF) and
 * schedule reader_enabled so any data already buffered is delivered without
 * waiting for new input; -1 on failure of either step.
 * NOTE(review): the function's closing brace lies beyond this view. */
851 int ev_reader_enable(ev_reader *r) {
852 D(("enable reader fd %d", r->fd));
853 return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
854 || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;