chiark / gitweb /
separate thread to add to heap
[disorder] / lib / event.c
CommitLineData
460b9539 1/*
2 * This file is part of DisOrder.
3 * Copyright (C) 2004, 2005 Richard Kettlewell
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 * USA
19 */
20
21#include <config.h>
22
23#include <unistd.h>
24#include <fcntl.h>
25#include <sys/time.h>
26#include <sys/types.h>
27#include <sys/resource.h>
28#include <sys/wait.h>
29#include <unistd.h>
30#include <assert.h>
31#include <signal.h>
32#include <errno.h>
33#include <string.h>
34#include <limits.h>
35#include <sys/socket.h>
36#include <netinet/in.h>
37#include <sys/un.h>
38#include <stdio.h>
39#include "event.h"
40#include "mem.h"
41#include "log.h"
42#include "syscalls.h"
43#include "printf.h"
44#include "sink.h"
45
/* One pending timeout.  Timeouts live on a singly linked list headed by
 * ev_source.timeouts; ev_timeout() inserts them in ascending order of
 * 'when'. */
struct timeout {
  struct timeout *next;                 /* next timeout in the list */
  struct timeval when;                  /* time at which to fire */
  ev_timeout_callback *callback;        /* called when the timeout fires */
  void *u;                              /* passed through to callback */
  int resolve;                          /* set by ev_run() once the callback
                                         * has been invoked; such entries are
                                         * then unlinked from the list */
};
53
/* One registered file descriptor together with its handler */
struct fd {
  int fd;                               /* the file descriptor */
  ev_fd_callback *callback;             /* called when select() reports fd */
  void *u;                              /* passed through to callback */
};
59
/* Everything known about one select() mode (one entry per mode in
 * ev_source.mode[]) */
struct fdmode {
  fd_set enabled;                       /* fds we currently want to select on */
  fd_set tripped;                       /* copy of 'enabled' handed to select();
                                         * afterwards the fds that are ready */
  int nfds, fdslots;                    /* used / allocated entries in fds[] */
  struct fd *fds;                       /* table of registered fds */
  int maxfd;                            /* highest fd registered in this mode */
};
67
/* Per-signal state, indexed by signal number in ev_source.signals[] */
struct signal {
  struct sigaction oldsa;               /* disposition saved by ev_signal() and
                                         * restored when the signal is
                                         * cancelled or at fork */
  ev_signal_callback *callback;         /* handler, or 0 if none registered */
  void *u;                              /* passed through to callback */
};
73
/* One child process that we are waiting for */
struct child {
  pid_t pid;                            /* process ID to wait for */
  int options;                          /* wait4() options; WNOHANG is always
                                         * added by sigchld_callback() */
  ev_child_callback *callback;          /* called when wait4() reports pid */
  void *u;                              /* passed through to callback */
};
80
/* An event source: the complete state of one event loop */
struct ev_source {
  struct fdmode mode[ev_nmodes];        /* fd tables, one per select() mode */
  struct timeout *timeouts;             /* pending timeouts, soonest first */
  struct signal signals[NSIG];          /* per-signal state, by signal number */
  sigset_t sigmask;                     /* signals we handle; ev_run() unblocks
                                         * them only around select() */
  int escape;                           /* set when the fd tables or signal
                                         * setup change, so that ev_run()
                                         * abandons its current dispatch pass */
  int sigpipe[2];                       /* pipe from sighandler() to the loop;
                                         * both ends are -1 until created */
  int nchildren, nchildslots;           /* used / allocated entries in
                                         * children[] */
  struct child *children;               /* table of children being waited for */
};
91
/* Human-readable mode names for debug messages, indexed by mode.
 * NOTE(review): presumably in the same order as the ev_fdmode enumerators in
 * event.h - confirm. */
static const char *modenames[] = { "read", "write", "except" };
93
94/* utilities ******************************************************************/
95
/* Return 1 if time *a is strictly later than time *b, otherwise 0.  Seconds
 * are compared first; microseconds only break ties. */
static inline int gt(const struct timeval *a, const struct timeval *b) {
  if(a->tv_sec != b->tv_sec)
    return a->tv_sec > b->tv_sec;
  return a->tv_usec > b->tv_usec;
}
104
/* Return 1 if time *a is later than or equal to time *b, otherwise 0 (that
 * is, exactly when *b is not strictly later than *a). */
static inline int ge(const struct timeval *a, const struct timeval *b) {
  if(a->tv_sec != b->tv_sec)
    return a->tv_sec > b->tv_sec;
  return a->tv_usec >= b->tv_usec;
}
108
109/* creation *******************************************************************/
110
111ev_source *ev_new(void) {
112 ev_source *ev = xmalloc(sizeof *ev);
113 int n;
114
115 memset(ev, 0, sizeof *ev);
116 for(n = 0; n < ev_nmodes; ++n)
117 FD_ZERO(&ev->mode[n].enabled);
118 ev->sigpipe[0] = ev->sigpipe[1] = -1;
119 sigemptyset(&ev->sigmask);
120 return ev;
121}
122
123/* event loop *****************************************************************/
124
125int ev_run(ev_source *ev) {
126 for(;;) {
127 struct timeval now;
128 struct timeval delta;
129 int n, mode;
130 int ret;
131 int maxfd;
132 struct timeout *t, **tt;
133
134 xgettimeofday(&now, 0);
135 /* Handle timeouts. We don't want to handle any timeouts that are added
136 * while we're handling them (otherwise we'd have to break out of infinite
137 * loops, preferrably without starving better-behaved subsystems). Hence
138 * the slightly complicated two-phase approach here. */
139 for(t = ev->timeouts;
140 t && ge(&now, &t->when);
141 t = t->next) {
142 t->resolve = 1;
143 D(("calling timeout for %ld.%ld callback %p %p",
144 (long)t->when.tv_sec, (long)t->when.tv_usec,
145 (void *)t->callback, t->u));
146 ret = t->callback(ev, &now, t->u);
147 if(ret)
148 return ret;
149 }
150 tt = &ev->timeouts;
151 while((t = *tt)) {
152 if(t->resolve)
153 *tt = t->next;
154 else
155 tt = &t->next;
156 }
157 maxfd = 0;
158 for(mode = 0; mode < ev_nmodes; ++mode) {
159 ev->mode[mode].tripped = ev->mode[mode].enabled;
160 if(ev->mode[mode].maxfd > maxfd)
161 maxfd = ev->mode[mode].maxfd;
162 }
163 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
164 do {
165 if(ev->timeouts) {
166 xgettimeofday(&now, 0);
167 delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
168 delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
169 if(delta.tv_usec < 0) {
170 delta.tv_usec += 1000000;
171 --delta.tv_sec;
172 }
173 if(delta.tv_sec < 0)
174 delta.tv_sec = delta.tv_usec = 0;
175 n = select(maxfd + 1,
176 &ev->mode[ev_read].tripped,
177 &ev->mode[ev_write].tripped,
178 &ev->mode[ev_except].tripped,
179 &delta);
180 } else {
181 n = select(maxfd + 1,
182 &ev->mode[ev_read].tripped,
183 &ev->mode[ev_write].tripped,
184 &ev->mode[ev_except].tripped,
185 0);
186 }
187 } while(n < 0 && errno == EINTR);
188 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
189 if(n < 0) {
190 error(errno, "error calling select");
191 return -1;
192 }
193 if(n > 0) {
194 /* if anything deranges the meaning of an fd, or re-orders the
195 * fds[] tables, we'd better give up; such operations will
196 * therefore set @escape@. */
197 ev->escape = 0;
198 for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
199 for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
200 int fd = ev->mode[mode].fds[n].fd;
201 if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
202 D(("calling %s fd %d callback %p %p", modenames[mode], fd,
203 (void *)ev->mode[mode].fds[n].callback,
204 ev->mode[mode].fds[n].u));
205 ret = ev->mode[mode].fds[n].callback(ev, fd,
206 ev->mode[mode].fds[n].u);
207 if(ret)
208 return ret;
209 }
210 }
211 }
212 /* we'll pick up timeouts back round the loop */
213 }
214}
215
216/* file descriptors ***********************************************************/
217
218int ev_fd(ev_source *ev,
219 ev_fdmode mode,
220 int fd,
221 ev_fd_callback *callback,
222 void *u) {
223 int n;
224
225 D(("registering %s fd %d callback %p %p", modenames[mode], fd,
226 (void *)callback, u));
227 assert(mode < ev_nmodes);
228 if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
229 ev->mode[mode].fdslots = (ev->mode[mode].fdslots
230 ? 2 * ev->mode[mode].fdslots : 16);
231 D(("expanding %s fd table to %d entries", modenames[mode],
232 ev->mode[mode].fdslots));
233 ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
234 ev->mode[mode].fdslots * sizeof (struct fd));
235 }
236 n = ev->mode[mode].nfds++;
237 FD_SET(fd, &ev->mode[mode].enabled);
238 ev->mode[mode].fds[n].fd = fd;
239 ev->mode[mode].fds[n].callback = callback;
240 ev->mode[mode].fds[n].u = u;
241 if(fd > ev->mode[mode].maxfd)
242 ev->mode[mode].maxfd = fd;
243 ev->escape = 1;
244 return 0;
245}
246
247int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
248 int n;
249 int maxfd;
250
251 D(("cancelling mode %s fd %d", modenames[mode], fd));
252 /* find the right struct fd */
253 for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
254 ;
255 assert(n < ev->mode[mode].nfds);
256 /* swap in the last fd and reduce the count */
257 if(n != ev->mode[mode].nfds - 1)
258 ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
259 --ev->mode[mode].nfds;
260 /* if that was the biggest fd, find the new biggest one */
261 if(fd == ev->mode[mode].maxfd) {
262 maxfd = 0;
263 for(n = 0; n < ev->mode[mode].nfds; ++n)
264 if(ev->mode[mode].fds[n].fd > maxfd)
265 maxfd = ev->mode[mode].fds[n].fd;
266 ev->mode[mode].maxfd = maxfd;
267 }
268 /* don't tell select about this fd any more */
269 FD_CLR(fd, &ev->mode[mode].enabled);
270 ev->escape = 1;
271 return 0;
272}
273
274int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
275 D(("enabling mode %s fd %d", modenames[mode], fd));
276 FD_SET(fd, &ev->mode[mode].enabled);
277 return 0;
278}
279
280int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
281 D(("disabling mode %s fd %d", modenames[mode], fd));
282 FD_CLR(fd, &ev->mode[mode].enabled);
283 FD_CLR(fd, &ev->mode[mode].tripped);
284 return 0;
285}
286
287/* timeouts *******************************************************************/
288
289int ev_timeout(ev_source *ev,
290 ev_timeout_handle *handlep,
291 const struct timeval *when,
292 ev_timeout_callback *callback,
293 void *u) {
294 struct timeout *t, *p, **pp;
295
296 D(("registering timeout at %ld.%ld callback %p %p",
297 when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
298 (void *)callback, u));
299 t = xmalloc(sizeof *t);
300 if(when)
301 t->when = *when;
302 t->callback = callback;
303 t->u = u;
304 pp = &ev->timeouts;
305 while((p = *pp) && gt(&t->when, &p->when))
306 pp = &p->next;
307 t->next = p;
308 *pp = t;
309 if(handlep)
310 *handlep = t;
311 return 0;
312}
313
314int ev_timeout_cancel(ev_source *ev,
315 ev_timeout_handle handle) {
316 struct timeout *t = handle, *p, **pp;
317
318 for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
319 ;
320 if(p) {
321 *pp = p->next;
322 return 0;
323 } else
324 return -1;
325}
326
327/* signals ********************************************************************/
328
/* For each signal number, the fd that sighandler() writes to when that signal
 * arrives.  Filled in by ev_signal() with the write end of the signal pipe. */
static int sigfd[NSIG];
330
331static void sighandler(int s) {
332 unsigned char sc = s;
333 static const char errmsg[] = "error writing to signal pipe";
334
335 /* probably the reader has stopped listening for some reason */
336 if(write(sigfd[s], &sc, 1) < 0) {
337 write(2, errmsg, sizeof errmsg - 1);
338 abort();
339 }
340}
341
342static int signal_read(ev_source *ev,
343 int attribute((unused)) fd,
344 void attribute((unused)) *u) {
345 unsigned char s;
346 int n;
347 int ret;
348
349 if((n = read(ev->sigpipe[0], &s, 1)) == 1)
350 if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
351 return ret;
352 assert(n != 0);
353 if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
354 error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
355 return -1;
356 }
357 return 0;
358}
359
360static void close_sigpipe(ev_source *ev) {
361 int save_errno = errno;
362
363 xclose(ev->sigpipe[0]);
364 xclose(ev->sigpipe[1]);
365 ev->sigpipe[0] = ev->sigpipe[1] = -1;
366 errno = save_errno;
367}
368
369int ev_signal(ev_source *ev,
370 int sig,
371 ev_signal_callback *callback,
372 void *u) {
373 int n;
374 struct sigaction sa;
375
376 D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
377 assert(sig > 0);
378 assert(sig < NSIG);
379 assert(sig <= UCHAR_MAX);
380 if(ev->sigpipe[0] == -1) {
381 D(("creating signal pipe"));
382 xpipe(ev->sigpipe);
383 D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
384 for(n = 0; n < 2; ++n) {
385 nonblock(ev->sigpipe[n]);
386 cloexec(ev->sigpipe[n]);
387 }
388 if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0)) {
389 close_sigpipe(ev);
390 return -1;
391 }
392 }
393 sigaddset(&ev->sigmask, sig);
394 xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
395 sigfd[sig] = ev->sigpipe[1];
396 ev->signals[sig].callback = callback;
397 ev->signals[sig].u = u;
398 sa.sa_handler = sighandler;
399 sigfillset(&sa.sa_mask);
400 sa.sa_flags = SA_RESTART;
401 xsigaction(sig, &sa, &ev->signals[sig].oldsa);
402 ev->escape = 1;
403 return 0;
404}
405
406int ev_signal_cancel(ev_source *ev,
407 int sig) {
408 sigset_t ss;
409
410 xsigaction(sig, &ev->signals[sig].oldsa, 0);
411 ev->signals[sig].callback = 0;
412 ev->escape = 1;
413 sigdelset(&ev->sigmask, sig);
414 sigemptyset(&ss);
415 sigaddset(&ss, sig);
416 xsigprocmask(SIG_UNBLOCK, &ss, 0);
417 return 0;
418}
419
420void ev_signal_atfork(ev_source *ev) {
421 int sig;
422
423 if(ev->sigpipe[0] != -1) {
424 /* revert any handled signals to their original state */
425 for(sig = 1; sig < NSIG; ++sig) {
426 if(ev->signals[sig].callback != 0)
427 xsigaction(sig, &ev->signals[sig].oldsa, 0);
428 }
429 /* and then unblock them */
430 xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
431 /* don't want a copy of the signal pipe open inside the fork */
432 xclose(ev->sigpipe[0]);
433 xclose(ev->sigpipe[1]);
434 }
435}
436
437/* child processes ************************************************************/
438
439static int sigchld_callback(ev_source *ev,
440 int attribute((unused)) sig,
441 void attribute((unused)) *u) {
442 struct rusage ru;
443 pid_t r;
444 int status, n, ret, revisit;
445
446 do {
447 revisit = 0;
448 for(n = 0; n < ev->nchildren; ++n) {
449 r = wait4(ev->children[n].pid,
450 &status,
451 ev->children[n].options | WNOHANG,
452 &ru);
453 if(r > 0) {
454 ev_child_callback *c = ev->children[n].callback;
455 void *cu = ev->children[n].u;
456
457 if(WIFEXITED(status) || WIFSIGNALED(status))
458 ev_child_cancel(ev, r);
459 revisit = 1;
460 if((ret = c(ev, r, status, &ru, cu)))
461 return ret;
462 } else if(r < 0) {
463 /* We should "never" get an ECHILD but it can in fact happen. For
464 * instance on Linux 2.4.31, and probably other versions, if someone
465 * straces a child process and then a different child process
466 * terminates, when we wait4() the trace process we will get ECHILD
467 * because it has been reparented to strace. Obviously this is a
468 * hopeless design flaw in the tracing infrastructure, but we don't
469 * want the disorder server to bomb out because of it. So we just log
470 * the problem and ignore it.
471 */
472 error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
473 (unsigned long)ev->children[n].pid);
474 if(errno != ECHILD)
475 return -1;
476 }
477 }
478 } while(revisit);
479 return 0;
480}
481
482int ev_child_setup(ev_source *ev) {
483 D(("installing SIGCHLD handler"));
484 return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
485}
486
487int ev_child(ev_source *ev,
488 pid_t pid,
489 int options,
490 ev_child_callback *callback,
491 void *u) {
492 int n;
493
494 D(("registering child handling %ld options %d callback %p %p",
495 (long)pid, options, (void *)callback, u));
496 assert(ev->signals[SIGCHLD].callback == sigchld_callback);
497 if(ev->nchildren >= ev->nchildslots) {
498 ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
499 ev->children = xrealloc(ev->children,
500 ev->nchildslots * sizeof (struct child));
501 }
502 n = ev->nchildren++;
503 ev->children[n].pid = pid;
504 ev->children[n].options = options;
505 ev->children[n].callback = callback;
506 ev->children[n].u = u;
507 return 0;
508}
509
510int ev_child_cancel(ev_source *ev,
511 pid_t pid) {
512 int n;
513
514 for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
515 ;
516 assert(n < ev->nchildren);
517 if(n != ev->nchildren - 1)
518 ev->children[n] = ev->children[ev->nchildren - 1];
519 --ev->nchildren;
520 return 0;
521}
522
523/* socket listeners ***********************************************************/
524
/* State attached to a listening socket registered with ev_listen() */
struct listen_state {
  ev_listen_callback *callback;         /* called for each accepted connection */
  void *u;                              /* passed through to callback */
};
529
530static int listen_callback(ev_source *ev, int fd, void *u) {
531 const struct listen_state *l = u;
532 int newfd;
533 union {
534 struct sockaddr_in in;
535#if HAVE_STRUCT_SOCKADDR_IN6
536 struct sockaddr_in6 in6;
537#endif
538 struct sockaddr_un un;
539 struct sockaddr sa;
540 } addr;
541 socklen_t addrlen;
542 int ret;
543
544 D(("callback for listener fd %d", fd));
545 while((addrlen = sizeof addr),
546 (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
547 if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
548 return ret;
549 }
550 switch(errno) {
551 case EINTR:
552 case EAGAIN:
553 break;
554#ifdef ECONNABORTED
555 case ECONNABORTED:
556 error(errno, "error calling accept");
557 break;
558#endif
559#ifdef EPROTO
560 case EPROTO:
561 /* XXX on some systems EPROTO should be fatal, but we don't know if
562 * we're running on one of them */
563 error(errno, "error calling accept");
564 break;
565#endif
566 default:
567 fatal(errno, "error calling accept");
568 break;
569 }
570 if(errno != EINTR && errno != EAGAIN)
571 error(errno, "error calling accept");
572 return 0;
573}
574
575int ev_listen(ev_source *ev,
576 int fd,
577 ev_listen_callback *callback,
578 void *u) {
579 struct listen_state *l = xmalloc(sizeof *l);
580
581 D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
582 l->callback = callback;
583 l->u = u;
584 return ev_fd(ev, ev_read, fd, listen_callback, l);
585}
586
587int ev_listen_cancel(ev_source *ev, int fd) {
588 D(("cancelling listener fd %d", fd));
589 return ev_fd_cancel(ev, ev_read, fd);
590}
591
592/* buffer *********************************************************************/
593
/* A growable byte buffer.  The four pointers satisfy
 * base <= start <= end <= top:
 *   base  - start of the allocation
 *   start - first byte of data
 *   end   - one past the last byte of data; new data is appended here
 *   top   - one past the end of the allocation
 */
struct buffer {
  char *base, *start, *end, *top;
};
597
598/* make sure there is @bytes@ available at @b->end@ */
599static void buffer_space(struct buffer *b, size_t bytes) {
600 D(("buffer_space %p %p %p %p want %lu",
601 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
602 (unsigned long)bytes));
603 if(b->start == b->end)
604 b->start = b->end = b->base;
605 if((size_t)(b->top - b->end) < bytes) {
606 if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
607 size_t newspace = b->end - b->start + bytes, n;
608 char *newbase;
609
610 for(n = 16; n < newspace; n *= 2)
611 ;
612 newbase = xmalloc_noptr(n);
613 memcpy(newbase, b->start, b->end - b->start);
614 b->base = newbase;
615 b->end = newbase + (b->end - b->start);
616 b->top = newbase + n;
617 b->start = newbase; /* must be last */
618 } else {
619 memmove(b->base, b->start, b->end - b->start);
620 b->end = b->base + (b->end - b->start);
621 b->start = b->base;
622 }
623 }
624 D(("result %p %p %p %p",
625 (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
626}
627
628/* buffered writer ************************************************************/
629
/* A buffered writer: data is queued in a buffer and written to the fd by the
 * event loop whenever the fd is writable. */
struct ev_writer {
  struct sink s;                        /* must stay the first member:
                                         * ev_writer_write() casts the sink
                                         * pointer back to an ev_writer */
  struct buffer b;                      /* data waiting to be written */
  int fd;                               /* file descriptor to write to */
  int eof;                              /* set by ev_writer_close() */
  ev_error_callback *callback;          /* called on write error (with the
                                         * errno value) or when closing has
                                         * completed (with 0) */
  void *u;                              /* passed through to callback */
  ev_source *ev;                        /* owning event source */
};
639
640static int writer_callback(ev_source *ev, int fd, void *u) {
641 ev_writer *w = u;
642 int n;
643
644 n = write(fd, w->b.start, w->b.end - w->b.start);
645 D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
646 fd, (long)(w->b.end - w->b.start), n, errno));
647 if(n >= 0) {
648 w->b.start += n;
649 if(w->b.start == w->b.end) {
650 if(w->eof) {
651 ev_fd_cancel(ev, ev_write, fd);
652 return w->callback(ev, fd, 0, w->u);
653 } else
654 ev_fd_disable(ev, ev_write, fd);
655 }
656 } else {
657 switch(errno) {
658 case EINTR:
659 case EAGAIN:
660 break;
661 default:
662 ev_fd_cancel(ev, ev_write, fd);
663 return w->callback(ev, fd, errno, w->u);
664 }
665 }
666 return 0;
667}
668
669static int ev_writer_write(struct sink *sk, const void *s, int n) {
670 ev_writer *w = (ev_writer *)sk;
671
672 buffer_space(&w->b, n);
673 if(w->b.start == w->b.end)
674 ev_fd_enable(w->ev, ev_write, w->fd);
675 memcpy(w->b.end, s, n);
676 w->b.end += n;
677 return 0;
678}
679
680ev_writer *ev_writer_new(ev_source *ev,
681 int fd,
682 ev_error_callback *callback,
683 void *u) {
684 ev_writer *w = xmalloc(sizeof *w);
685
686 D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
687 w->s.write = ev_writer_write;
688 w->fd = fd;
689 w->callback = callback;
690 w->u = u;
691 w->ev = ev;
692 if(ev_fd(ev, ev_write, fd, writer_callback, w))
693 return 0;
694 ev_fd_disable(ev, ev_write, fd);
695 return w;
696}
697
698struct sink *ev_writer_sink(ev_writer *w) {
699 return &w->s;
700}
701
702static int writer_shutdown(ev_source *ev,
703 const attribute((unused)) struct timeval *now,
704 void *u) {
705 ev_writer *w = u;
706
707 return w->callback(ev, w->fd, 0, w->u);
708}
709
710int ev_writer_close(ev_writer *w) {
711 D(("close writer fd %d", w->fd));
712 w->eof = 1;
713 if(w->b.start == w->b.end) {
714 /* we're already finished */
715 ev_fd_cancel(w->ev, ev_write, w->fd);
716 return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
717 }
718 return 0;
719}
720
721int ev_writer_cancel(ev_writer *w) {
722 D(("cancel writer fd %d", w->fd));
723 return ev_fd_cancel(w->ev, ev_write, w->fd);
724}
725
726int ev_writer_flush(ev_writer *w) {
727 return writer_callback(w->ev, w->fd, w);
728}
729
730/* buffered reader ************************************************************/
731
/* A buffered reader: the event loop reads from the fd into a buffer and hands
 * the accumulated data to a callback. */
struct ev_reader {
  struct buffer b;                      /* data read but not yet consumed */
  int fd;                               /* file descriptor to read from */
  ev_reader_callback *callback;         /* called with the buffered data */
  ev_error_callback *error_callback;    /* called on read error */
  void *u;                              /* passed through to both callbacks */
  ev_source *ev;                        /* owning event source */
  int eof;                              /* set once read() has returned 0 */
};
741
742static int reader_callback(ev_source *ev, int fd, void *u) {
743 ev_reader *r = u;
744 int n;
745
746 buffer_space(&r->b, 1);
747 n = read(fd, r->b.end, r->b.top - r->b.end);
748 D(("read fd %d buffer %d returned %d errno %d",
749 fd, (int)(r->b.top - r->b.end), n, errno));
750 if(n > 0) {
751 r->b.end += n;
752 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
753 } else if(n == 0) {
754 r->eof = 1;
755 ev_fd_cancel(ev, ev_read, fd);
756 return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
757 } else {
758 switch(errno) {
759 case EINTR:
760 case EAGAIN:
761 break;
762 default:
763 ev_fd_cancel(ev, ev_read, fd);
764 return r->error_callback(ev, fd, errno, r->u);
765 }
766 }
767 return 0;
768}
769
770ev_reader *ev_reader_new(ev_source *ev,
771 int fd,
772 ev_reader_callback *callback,
773 ev_error_callback *error_callback,
774 void *u) {
775 ev_reader *r = xmalloc(sizeof *r);
776
777 D(("registering reader fd %d callback %p %p %p",
778 fd, (void *)callback, (void *)error_callback, u));
779 r->fd = fd;
780 r->callback = callback;
781 r->error_callback = error_callback;
782 r->u = u;
783 r->ev = ev;
784 if(ev_fd(ev, ev_read, fd, reader_callback, r))
785 return 0;
786 return r;
787}
788
789void ev_reader_buffer(ev_reader *r, size_t nbytes) {
790 buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
791}
792
793void ev_reader_consume(ev_reader *r, size_t n) {
794 r->b.start += n;
795}
796
797int ev_reader_cancel(ev_reader *r) {
798 D(("cancel reader fd %d", r->fd));
799 return ev_fd_cancel(r->ev, ev_read, r->fd);
800}
801
802int ev_reader_disable(ev_reader *r) {
803 D(("disable reader fd %d", r->fd));
804 return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
805}
806
807static int reader_continuation(ev_source attribute((unused)) *ev,
808 const attribute((unused)) struct timeval *now,
809 void *u) {
810 ev_reader *r = u;
811
812 D(("reader continuation callback fd %d", r->fd));
813 if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
814 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
815}
816
817int ev_reader_incomplete(ev_reader *r) {
818 if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
819 return ev_timeout(r->ev, 0, 0, reader_continuation, r);
820}
821
822static int reader_enabled(ev_source *ev,
823 const attribute((unused)) struct timeval *now,
824 void *u) {
825 ev_reader *r = u;
826
827 D(("reader enabled callback fd %d", r->fd));
828 return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
829}
830
831int ev_reader_enable(ev_reader *r) {
832 D(("enable reader fd %d", r->fd));
833 return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
834 || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
835}
836
837/*
838Local Variables:
839c-basic-offset:2
840comment-column:40
841fill-column:79
842End:
843*/