chattier event loop error logging
[disorder] / lib / event.c
/*
 * This file is part of DisOrder.
 * Copyright (C) 2004, 2005, 2007 Richard Kettlewell
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA
 */

#include <config.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <limits.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <stdio.h>
#include "event.h"
#include "mem.h"
#include "log.h"
#include "syscalls.h"
#include "printf.h"
#include "sink.h"

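/* This file implements a select()-based event loop.  Callers register
 * callbacks for file descriptors, timeouts, signals, child processes and
 * listening sockets; ev_run() dispatches them until some callback returns
 * nonzero.  A minimal usage sketch (the callback names, user pointer and
 * file descriptor here are hypothetical):
 *
 *   ev_source *ev = ev_new();
 *   ev_child_setup(ev);               /+ catch SIGCHLD for ev_child() +/
 *   ev_fd(ev, ev_read, my_fd, my_read_callback, my_u, "my socket");
 *   ev_run(ev);                       /+ loops until a callback returns nonzero +/
 */
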
struct timeout {
  struct timeout *next;
  struct timeval when;
  ev_timeout_callback *callback;
  void *u;
  int resolve;
};

struct fd {
  int fd;
  ev_fd_callback *callback;
  void *u;
  const char *what;
};

struct fdmode {
  fd_set enabled;
  fd_set tripped;
  int nfds, fdslots;
  struct fd *fds;
  int maxfd;
};

struct signal {
  struct sigaction oldsa;
  ev_signal_callback *callback;
  void *u;
};

struct child {
  pid_t pid;
  int options;
  ev_child_callback *callback;
  void *u;
};

struct ev_source {
  struct fdmode mode[ev_nmodes];
  struct timeout *timeouts;
  struct signal signals[NSIG];
  sigset_t sigmask;
  int escape;
  int sigpipe[2];
  int nchildren, nchildslots;
  struct child *children;
};

static const char *modenames[] = { "read", "write", "except" };

/* utilities ******************************************************************/

static inline int gt(const struct timeval *a, const struct timeval *b) {
  if(a->tv_sec > b->tv_sec)
    return 1;
  if(a->tv_sec == b->tv_sec
     && a->tv_usec > b->tv_usec)
    return 1;
  return 0;
}

static inline int ge(const struct timeval *a, const struct timeval *b) {
  return !gt(b, a);
}

/* creation *******************************************************************/

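/* Create a new event loop.  All modes start with empty fd sets, there are no
 * timeouts or signal handlers, and the signal pipe is created lazily by
 * ev_signal(). */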
ev_source *ev_new(void) {
  ev_source *ev = xmalloc(sizeof *ev);
  int n;

  memset(ev, 0, sizeof *ev);
  for(n = 0; n < ev_nmodes; ++n)
    FD_ZERO(&ev->mode[n].enabled);
  ev->sigpipe[0] = ev->sigpipe[1] = -1;
  sigemptyset(&ev->sigmask);
  return ev;
}

/* event loop *****************************************************************/

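/* Run the event loop.  Each iteration dispatches any timeouts that are due,
 * then select()s on the registered file descriptors (unblocking handled
 * signals only while waiting) and calls the callback for each tripped fd.
 * Returns nonzero as soon as any callback returns nonzero, or -1 if select()
 * fails; otherwise it loops forever. */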
int ev_run(ev_source *ev) {
  for(;;) {
    struct timeval now;
    struct timeval delta;
    int n, mode;
    int ret;
    int maxfd;
    struct timeout *t, **tt;
    struct stat sb;

    xgettimeofday(&now, 0);
    /* Handle timeouts.  We don't want to handle any timeouts that are added
     * while we're handling them (otherwise we'd have to break out of infinite
     * loops, preferably without starving better-behaved subsystems).  Hence
     * the slightly complicated two-phase approach here. */
    for(t = ev->timeouts;
        t && ge(&now, &t->when);
        t = t->next) {
      t->resolve = 1;
      D(("calling timeout for %ld.%ld callback %p %p",
         (long)t->when.tv_sec, (long)t->when.tv_usec,
         (void *)t->callback, t->u));
      ret = t->callback(ev, &now, t->u);
      if(ret)
        return ret;
    }
    tt = &ev->timeouts;
    while((t = *tt)) {
      if(t->resolve)
        *tt = t->next;
      else
        tt = &t->next;
    }
    maxfd = 0;
    for(mode = 0; mode < ev_nmodes; ++mode) {
      ev->mode[mode].tripped = ev->mode[mode].enabled;
      if(ev->mode[mode].maxfd > maxfd)
        maxfd = ev->mode[mode].maxfd;
    }
    xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
    do {
      if(ev->timeouts) {
        xgettimeofday(&now, 0);
        delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
        delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
        if(delta.tv_usec < 0) {
          delta.tv_usec += 1000000;
          --delta.tv_sec;
        }
        if(delta.tv_sec < 0)
          delta.tv_sec = delta.tv_usec = 0;
        n = select(maxfd + 1,
                   &ev->mode[ev_read].tripped,
                   &ev->mode[ev_write].tripped,
                   &ev->mode[ev_except].tripped,
                   &delta);
      } else {
        n = select(maxfd + 1,
                   &ev->mode[ev_read].tripped,
                   &ev->mode[ev_write].tripped,
                   &ev->mode[ev_except].tripped,
                   0);
      }
    } while(n < 0 && errno == EINTR);
    xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
    if(n < 0) {
      error(errno, "error calling select");
      if(errno == EBADF) {
        /* If there's a bad FD in the mix then check them all and log what we
         * find, to ease debugging */
        for(mode = 0; mode < ev_nmodes; ++mode) {
          for(n = 0; n < ev->mode[mode].nfds; ++n) {
            const int fd = ev->mode[mode].fds[n].fd;

            if(FD_ISSET(fd, &ev->mode[mode].enabled)
               && fstat(fd, &sb) < 0)
              error(errno, "fstat %d (%s)", fd, ev->mode[mode].fds[n].what);
          }
        }
      }
      return -1;
    }
    if(n > 0) {
      /* if anything deranges the meaning of an fd, or re-orders the
       * fds[] tables, we'd better give up; such operations will
       * therefore set @escape@. */
      ev->escape = 0;
      for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
        for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
          int fd = ev->mode[mode].fds[n].fd;
          if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
            D(("calling %s fd %d callback %p %p", modenames[mode], fd,
               (void *)ev->mode[mode].fds[n].callback,
               ev->mode[mode].fds[n].u));
            ret = ev->mode[mode].fds[n].callback(ev, fd,
                                                 ev->mode[mode].fds[n].u);
            if(ret)
              return ret;
          }
        }
    }
    /* we'll pick up timeouts back round the loop */
  }
}

/* file descriptors ***********************************************************/

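/* Register a callback on file descriptor @fd@ for mode @mode@ (read, write or
 * exception).  @what@ is a human-readable description used in error messages.
 * The fd table grows on demand; registering an fd sets @escape@ so that
 * ev_run() restarts its scan rather than walking a reallocated table. */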
int ev_fd(ev_source *ev,
          ev_fdmode mode,
          int fd,
          ev_fd_callback *callback,
          void *u,
          const char *what) {
  int n;

  D(("registering %s fd %d callback %p %p", modenames[mode], fd,
     (void *)callback, u));
  assert(mode < ev_nmodes);
  if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
    ev->mode[mode].fdslots = (ev->mode[mode].fdslots
                              ? 2 * ev->mode[mode].fdslots : 16);
    D(("expanding %s fd table to %d entries", modenames[mode],
       ev->mode[mode].fdslots));
    ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
                                  ev->mode[mode].fdslots * sizeof (struct fd));
  }
  n = ev->mode[mode].nfds++;
  FD_SET(fd, &ev->mode[mode].enabled);
  ev->mode[mode].fds[n].fd = fd;
  ev->mode[mode].fds[n].callback = callback;
  ev->mode[mode].fds[n].u = u;
  ev->mode[mode].fds[n].what = what;
  if(fd > ev->mode[mode].maxfd)
    ev->mode[mode].maxfd = fd;
  ev->escape = 1;
  return 0;
}

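/* Remove file descriptor @fd@ from mode @mode@.  The last entry in the fd
 * table is swapped into the vacated slot, maxfd is recomputed if necessary,
 * and @escape@ is set because the table has been re-ordered. */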
int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
  int n;
  int maxfd;

  D(("cancelling mode %s fd %d", modenames[mode], fd));
  /* find the right struct fd */
  for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
    ;
  assert(n < ev->mode[mode].nfds);
  /* swap in the last fd and reduce the count */
  if(n != ev->mode[mode].nfds - 1)
    ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
  --ev->mode[mode].nfds;
  /* if that was the biggest fd, find the new biggest one */
  if(fd == ev->mode[mode].maxfd) {
    maxfd = 0;
    for(n = 0; n < ev->mode[mode].nfds; ++n)
      if(ev->mode[mode].fds[n].fd > maxfd)
        maxfd = ev->mode[mode].fds[n].fd;
    ev->mode[mode].maxfd = maxfd;
  }
  /* don't tell select about this fd any more */
  FD_CLR(fd, &ev->mode[mode].enabled);
  ev->escape = 1;
  return 0;
}

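/* Temporarily enable or disable an already-registered fd.  These only adjust
 * the fd_sets passed to select(); the struct fd entry itself is untouched. */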
int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
  D(("enabling mode %s fd %d", modenames[mode], fd));
  FD_SET(fd, &ev->mode[mode].enabled);
  return 0;
}

int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
  D(("disabling mode %s fd %d", modenames[mode], fd));
  FD_CLR(fd, &ev->mode[mode].enabled);
  FD_CLR(fd, &ev->mode[mode].tripped);
  return 0;
}

/* timeouts *******************************************************************/

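/* Register a callback to run at (or as soon as possible after) @when@.
 * Timeouts are kept in a list sorted by expiry time; if @handlep@ is non-null
 * it receives a handle suitable for ev_timeout_cancel().  For instance, a
 * caller might (hypothetically) schedule itself ten seconds ahead:
 *
 *   struct timeval when;
 *   xgettimeofday(&when, 0);
 *   when.tv_sec += 10;
 *   ev_timeout(ev, 0, &when, my_timeout_callback, my_u);
 *
 * Passing a null @when@ (as the writer/reader code below does) schedules the
 * callback for the next pass round the event loop, assuming xmalloc() returns
 * zeroed memory. */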
int ev_timeout(ev_source *ev,
               ev_timeout_handle *handlep,
               const struct timeval *when,
               ev_timeout_callback *callback,
               void *u) {
  struct timeout *t, *p, **pp;

  D(("registering timeout at %ld.%ld callback %p %p",
     when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
     (void *)callback, u));
  t = xmalloc(sizeof *t);
  if(when)
    t->when = *when;
  t->callback = callback;
  t->u = u;
  pp = &ev->timeouts;
  while((p = *pp) && gt(&t->when, &p->when))
    pp = &p->next;
  t->next = p;
  *pp = t;
  if(handlep)
    *handlep = t;
  return 0;
}

int ev_timeout_cancel(ev_source *ev,
                      ev_timeout_handle handle) {
  struct timeout *t = handle, *p, **pp;

  for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
    ;
  if(p) {
    *pp = p->next;
    return 0;
  } else
    return -1;
}

/* signals ********************************************************************/

static int sigfd[NSIG];

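/* Signal handlers write the signal number down a pipe (the "self-pipe"
 * trick); the event loop picks it up via signal_read() below.  If the write
 * fails the process aborts, since the reader end has presumably gone away. */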
static void sighandler(int s) {
  unsigned char sc = s;
  static const char errmsg[] = "error writing to signal pipe";

  /* probably the reader has stopped listening for some reason */
  if(write(sigfd[s], &sc, 1) < 0) {
    write(2, errmsg, sizeof errmsg - 1);
    abort();
  }
}

static int signal_read(ev_source *ev,
                       int attribute((unused)) fd,
                       void attribute((unused)) *u) {
  unsigned char s;
  int n;
  int ret;

  if((n = read(ev->sigpipe[0], &s, 1)) == 1)
    if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
      return ret;
  assert(n != 0);
  if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
    error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
    return -1;
  }
  return 0;
}

static void close_sigpipe(ev_source *ev) {
  int save_errno = errno;

  xclose(ev->sigpipe[0]);
  xclose(ev->sigpipe[1]);
  ev->sigpipe[0] = ev->sigpipe[1] = -1;
  errno = save_errno;
}

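/* Register a callback for signal @sig@.  The first registration creates the
 * (non-blocking, close-on-exec) signal pipe and hooks it into the event loop.
 * The signal is added to @sigmask@, so it is blocked except while ev_run() is
 * waiting in select(); the handler itself is installed with SA_RESTART. */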
int ev_signal(ev_source *ev,
              int sig,
              ev_signal_callback *callback,
              void *u) {
  int n;
  struct sigaction sa;

  D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
  assert(sig > 0);
  assert(sig < NSIG);
  assert(sig <= UCHAR_MAX);
  if(ev->sigpipe[0] == -1) {
    D(("creating signal pipe"));
    xpipe(ev->sigpipe);
    D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
    for(n = 0; n < 2; ++n) {
      nonblock(ev->sigpipe[n]);
      cloexec(ev->sigpipe[n]);
    }
    if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0, "sigpipe read")) {
      close_sigpipe(ev);
      return -1;
    }
  }
  sigaddset(&ev->sigmask, sig);
  xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
  sigfd[sig] = ev->sigpipe[1];
  ev->signals[sig].callback = callback;
  ev->signals[sig].u = u;
  sa.sa_handler = sighandler;
  sigfillset(&sa.sa_mask);
  sa.sa_flags = SA_RESTART;
  xsigaction(sig, &sa, &ev->signals[sig].oldsa);
  ev->escape = 1;
  return 0;
}

int ev_signal_cancel(ev_source *ev,
                     int sig) {
  sigset_t ss;

  xsigaction(sig, &ev->signals[sig].oldsa, 0);
  ev->signals[sig].callback = 0;
  ev->escape = 1;
  sigdelset(&ev->sigmask, sig);
  sigemptyset(&ss);
  sigaddset(&ss, sig);
  xsigprocmask(SIG_UNBLOCK, &ss, 0);
  return 0;
}

void ev_signal_atfork(ev_source *ev) {
  int sig;

  if(ev->sigpipe[0] != -1) {
    /* revert any handled signals to their original state */
    for(sig = 1; sig < NSIG; ++sig) {
      if(ev->signals[sig].callback != 0)
        xsigaction(sig, &ev->signals[sig].oldsa, 0);
    }
    /* and then unblock them */
    xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
    /* don't want a copy of the signal pipe open inside the fork */
    xclose(ev->sigpipe[0]);
    xclose(ev->sigpipe[1]);
  }
}

/* child processes ************************************************************/

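/* SIGCHLD handler: poll every registered child with wait4(..., WNOHANG) and
 * hand any status change to its callback.  A child that has exited or been
 * killed is removed from the table first, since the callback may itself
 * register or cancel children; the scan repeats while anything was reaped,
 * because ev_child_cancel() re-orders the table. */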
static int sigchld_callback(ev_source *ev,
                            int attribute((unused)) sig,
                            void attribute((unused)) *u) {
  struct rusage ru;
  pid_t r;
  int status, n, ret, revisit;

  do {
    revisit = 0;
    for(n = 0; n < ev->nchildren; ++n) {
      r = wait4(ev->children[n].pid,
                &status,
                ev->children[n].options | WNOHANG,
                &ru);
      if(r > 0) {
        ev_child_callback *c = ev->children[n].callback;
        void *cu = ev->children[n].u;

        if(WIFEXITED(status) || WIFSIGNALED(status))
          ev_child_cancel(ev, r);
        revisit = 1;
        if((ret = c(ev, r, status, &ru, cu)))
          return ret;
      } else if(r < 0) {
        /* We should "never" get an ECHILD but it can in fact happen.  For
         * instance on Linux 2.4.31, and probably other versions, if someone
         * straces a child process and then a different child process
         * terminates, when we wait4() the traced process we will get ECHILD
         * because it has been reparented to strace.  Obviously this is a
         * hopeless design flaw in the tracing infrastructure, but we don't
         * want the disorder server to bomb out because of it.  So we just log
         * the problem and ignore it.
         */
        error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
              (unsigned long)ev->children[n].pid);
        if(errno != ECHILD)
          return -1;
      }
    }
  } while(revisit);
  return 0;
}

int ev_child_setup(ev_source *ev) {
  D(("installing SIGCHLD handler"));
  return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
}

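/* Register a callback for child process @pid@; @options@ is passed to wait4()
 * (WNOHANG is always added).  ev_child_setup() must have been called first so
 * that SIGCHLD is being caught. */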
int ev_child(ev_source *ev,
             pid_t pid,
             int options,
             ev_child_callback *callback,
             void *u) {
  int n;

  D(("registering child handling %ld options %d callback %p %p",
     (long)pid, options, (void *)callback, u));
  assert(ev->signals[SIGCHLD].callback == sigchld_callback);
  if(ev->nchildren >= ev->nchildslots) {
    ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
    ev->children = xrealloc(ev->children,
                            ev->nchildslots * sizeof (struct child));
  }
  n = ev->nchildren++;
  ev->children[n].pid = pid;
  ev->children[n].options = options;
  ev->children[n].callback = callback;
  ev->children[n].u = u;
  return 0;
}

int ev_child_cancel(ev_source *ev,
                    pid_t pid) {
  int n;

  for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
    ;
  assert(n < ev->nchildren);
  if(n != ev->nchildren - 1)
    ev->children[n] = ev->children[ev->nchildren - 1];
  --ev->nchildren;
  return 0;
}

/* socket listeners ***********************************************************/

struct listen_state {
  ev_listen_callback *callback;
  void *u;
};

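/* Read callback for a listening socket: accept() connections until the call
 * would block, passing each new fd (and peer address) to the registered
 * callback.  EINTR/EAGAIN end the loop quietly; ECONNABORTED and EPROTO are
 * logged and ignored; anything else is fatal. */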
static int listen_callback(ev_source *ev, int fd, void *u) {
  const struct listen_state *l = u;
  int newfd;
  union {
    struct sockaddr_in in;
#if HAVE_STRUCT_SOCKADDR_IN6
    struct sockaddr_in6 in6;
#endif
    struct sockaddr_un un;
    struct sockaddr sa;
  } addr;
  socklen_t addrlen;
  int ret;

  D(("callback for listener fd %d", fd));
  while((addrlen = sizeof addr),
        (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
    if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
      return ret;
  }
  switch(errno) {
  case EINTR:
  case EAGAIN:
    break;
#ifdef ECONNABORTED
  case ECONNABORTED:
    error(errno, "error calling accept");
    break;
#endif
#ifdef EPROTO
  case EPROTO:
    /* XXX on some systems EPROTO should be fatal, but we don't know if
     * we're running on one of them */
    error(errno, "error calling accept");
    break;
#endif
  default:
    fatal(errno, "error calling accept");
    break;
  }
  return 0;
}

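/* Register @fd@ as a listening socket; @callback@ is invoked once per
 * accepted connection.  @what@ describes the socket for error messages. */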
int ev_listen(ev_source *ev,
              int fd,
              ev_listen_callback *callback,
              void *u,
              const char *what) {
  struct listen_state *l = xmalloc(sizeof *l);

  D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
  l->callback = callback;
  l->u = u;
  return ev_fd(ev, ev_read, fd, listen_callback, l, what);
}

int ev_listen_cancel(ev_source *ev, int fd) {
  D(("cancelling listener fd %d", fd));
  return ev_fd_cancel(ev, ev_read, fd);
}

/* buffer *********************************************************************/

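/* A simple growable byte buffer.  The invariant is
 * base <= start <= end <= top: live data occupies [start, end), [end, top) is
 * free space, and [base, top) is the whole allocation. */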
struct buffer {
  char *base, *start, *end, *top;
};

/* make sure there is @bytes@ available at @b->end@ */
static void buffer_space(struct buffer *b, size_t bytes) {
  D(("buffer_space %p %p %p %p want %lu",
     (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
     (unsigned long)bytes));
  if(b->start == b->end)
    b->start = b->end = b->base;
  if((size_t)(b->top - b->end) < bytes) {
    if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
      size_t newspace = b->end - b->start + bytes, n;
      char *newbase;

      for(n = 16; n < newspace; n *= 2)
        ;
      newbase = xmalloc_noptr(n);
      memcpy(newbase, b->start, b->end - b->start);
      b->base = newbase;
      b->end = newbase + (b->end - b->start);
      b->top = newbase + n;
      b->start = newbase;               /* must be last */
    } else {
      memmove(b->base, b->start, b->end - b->start);
      b->end = b->base + (b->end - b->start);
      b->start = b->base;
    }
  }
  D(("result %p %p %p %p",
     (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
}

/* buffered writer ************************************************************/

struct ev_writer {
  struct sink s;
  struct buffer b;
  int fd;
  int eof;
  ev_error_callback *callback;
  void *u;
  ev_source *ev;
};

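/* Write callback: flush as much buffered data as the fd will take.  When the
 * buffer empties the fd is disabled (or, if ev_writer_close() has been
 * called, cancelled and the error callback invoked with error 0).  Write
 * errors other than EINTR/EAGAIN cancel the fd and report the errno. */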
static int writer_callback(ev_source *ev, int fd, void *u) {
  ev_writer *w = u;
  int n;

  n = write(fd, w->b.start, w->b.end - w->b.start);
  D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
     fd, (long)(w->b.end - w->b.start), n, errno));
  if(n >= 0) {
    w->b.start += n;
    if(w->b.start == w->b.end) {
      if(w->eof) {
        ev_fd_cancel(ev, ev_write, fd);
        return w->callback(ev, fd, 0, w->u);
      } else
        ev_fd_disable(ev, ev_write, fd);
    }
  } else {
    switch(errno) {
    case EINTR:
    case EAGAIN:
      break;
    default:
      ev_fd_cancel(ev, ev_write, fd);
      return w->callback(ev, fd, errno, w->u);
    }
  }
  return 0;
}

static int ev_writer_write(struct sink *sk, const void *s, int n) {
  ev_writer *w = (ev_writer *)sk;

  buffer_space(&w->b, n);
  if(w->b.start == w->b.end)
    ev_fd_enable(w->ev, ev_write, w->fd);
  memcpy(w->b.end, s, n);
  w->b.end += n;
  return 0;
}

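/* Create a buffered writer on @fd@.  Data is queued through the sink returned
 * by ev_writer_sink() and written out as the fd becomes writable; @callback@
 * is invoked on write error or after ev_writer_close() has flushed
 * everything.  The write fd starts disabled since the buffer is empty.
 * Returns a null pointer if the fd cannot be registered. */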
ev_writer *ev_writer_new(ev_source *ev,
                         int fd,
                         ev_error_callback *callback,
                         void *u,
                         const char *what) {
  ev_writer *w = xmalloc(sizeof *w);

  D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
  w->s.write = ev_writer_write;
  w->fd = fd;
  w->callback = callback;
  w->u = u;
  w->ev = ev;
  if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
    return 0;
  ev_fd_disable(ev, ev_write, fd);
  return w;
}

struct sink *ev_writer_sink(ev_writer *w) {
  return &w->s;
}

static int writer_shutdown(ev_source *ev,
                           const attribute((unused)) struct timeval *now,
                           void *u) {
  ev_writer *w = u;

  return w->callback(ev, w->fd, 0, w->u);
}

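/* Mark the writer as finished.  If the buffer is already empty the write fd
 * is cancelled at once and the error callback (with error 0) is invoked from
 * the event loop via writer_shutdown() and a null-expiry timeout; otherwise
 * the remaining data drains first and writer_callback() does the same job. */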
int ev_writer_close(ev_writer *w) {
  D(("close writer fd %d", w->fd));
  w->eof = 1;
  if(w->b.start == w->b.end) {
    /* we're already finished */
    ev_fd_cancel(w->ev, ev_write, w->fd);
    return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
  }
  return 0;
}

int ev_writer_cancel(ev_writer *w) {
  D(("cancel writer fd %d", w->fd));
  return ev_fd_cancel(w->ev, ev_write, w->fd);
}

int ev_writer_flush(ev_writer *w) {
  return writer_callback(w->ev, w->fd, w);
}

/* buffered reader ************************************************************/

struct ev_reader {
  struct buffer b;
  int fd;
  ev_reader_callback *callback;
  ev_error_callback *error_callback;
  void *u;
  ev_source *ev;
  int eof;
};

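/* Read callback: append whatever is available to the buffer and pass the
 * whole unconsumed region to the reader callback.  A zero-byte read means end
 * of file: the fd is cancelled and the callback gets eof = 1.  Errors other
 * than EINTR/EAGAIN cancel the fd and go to the error callback. */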
static int reader_callback(ev_source *ev, int fd, void *u) {
  ev_reader *r = u;
  int n;

  buffer_space(&r->b, 1);
  n = read(fd, r->b.end, r->b.top - r->b.end);
  D(("read fd %d buffer %d returned %d errno %d",
     fd, (int)(r->b.top - r->b.end), n, errno));
  if(n > 0) {
    r->b.end += n;
    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
  } else if(n == 0) {
    r->eof = 1;
    ev_fd_cancel(ev, ev_read, fd);
    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
  } else {
    switch(errno) {
    case EINTR:
    case EAGAIN:
      break;
    default:
      ev_fd_cancel(ev, ev_read, fd);
      return r->error_callback(ev, fd, errno, r->u);
    }
  }
  return 0;
}

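/* Create a buffered reader on @fd@.  @callback@ receives the accumulated,
 * unconsumed input (it should call ev_reader_consume() for whatever it has
 * dealt with); @error_callback@ handles read errors.  Returns a null pointer
 * if the fd cannot be registered. */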
ev_reader *ev_reader_new(ev_source *ev,
                         int fd,
                         ev_reader_callback *callback,
                         ev_error_callback *error_callback,
                         void *u,
                         const char *what) {
  ev_reader *r = xmalloc(sizeof *r);

  D(("registering reader fd %d callback %p %p %p",
     fd, (void *)callback, (void *)error_callback, u));
  r->fd = fd;
  r->callback = callback;
  r->error_callback = error_callback;
  r->u = u;
  r->ev = ev;
  if(ev_fd(ev, ev_read, fd, reader_callback, r, what))
    return 0;
  return r;
}

void ev_reader_buffer(ev_reader *r, size_t nbytes) {
  buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
}

void ev_reader_consume(ev_reader *r, size_t n) {
  r->b.start += n;
}

int ev_reader_cancel(ev_reader *r) {
  D(("cancel reader fd %d", r->fd));
  return ev_fd_cancel(r->ev, ev_read, r->fd);
}

int ev_reader_disable(ev_reader *r) {
  D(("disable reader fd %d", r->fd));
  return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
}

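/* ev_reader_incomplete() and ev_reader_enable() both re-deliver the current
 * buffer contents to the callback on the next pass round the event loop (via
 * a null-expiry timeout) rather than calling it directly.
 * ev_reader_incomplete() additionally leaves the fd disabled until that
 * continuation runs. */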
static int reader_continuation(ev_source attribute((unused)) *ev,
                               const attribute((unused)) struct timeval *now,
                               void *u) {
  ev_reader *r = u;

  D(("reader continuation callback fd %d", r->fd));
  if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
  return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
}

int ev_reader_incomplete(ev_reader *r) {
  if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
  return ev_timeout(r->ev, 0, 0, reader_continuation, r);
}

static int reader_enabled(ev_source *ev,
                          const attribute((unused)) struct timeval *now,
                          void *u) {
  ev_reader *r = u;

  D(("reader enabled callback fd %d", r->fd));
  return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
}

int ev_reader_enable(ev_reader *r) {
  D(("enable reader fd %d", r->fd));
  return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
          || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
}

/*
Local Variables:
c-basic-offset:2
comment-column:40
fill-column:79
End:
*/