chiark / gitweb /
Merge from Mark's branch.
[disorder] / lib / event.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2004, 2005 Richard Kettlewell
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA
19  */
20
21 #include <config.h>
22
23 #include <unistd.h>
24 #include <fcntl.h>
25 #include <sys/time.h>
26 #include <sys/types.h>
27 #include <sys/resource.h>
28 #include <sys/wait.h>
29 #include <unistd.h>
30 #include <assert.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <string.h>
34 #include <limits.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <sys/un.h>
38 #include <stdio.h>
39 #include "event.h"
40 #include "mem.h"
41 #include "log.h"
42 #include "syscalls.h"
43 #include "printf.h"
44 #include "sink.h"
45
46 struct timeout {
47   struct timeout *next;
48   struct timeval when;
49   ev_timeout_callback *callback;
50   void *u;
51   int resolve;
52 };
53
54 struct fd {
55   int fd;
56   ev_fd_callback *callback;
57   void *u;
58 };
59
60 struct fdmode {
61   fd_set enabled;
62   fd_set tripped;
63   int nfds, fdslots;
64   struct fd *fds;
65   int maxfd;
66 };
67
68 struct signal {
69   struct sigaction oldsa;
70   ev_signal_callback *callback;
71   void *u;
72 };
73
74 struct child {
75   pid_t pid;
76   int options;
77   ev_child_callback *callback;
78   void *u;
79 };
80
81 struct ev_source {
82   struct fdmode mode[ev_nmodes];
83   struct timeout *timeouts;
84   struct signal signals[NSIG];
85   sigset_t sigmask;
86   int escape;
87   int sigpipe[2];
88   int nchildren, nchildslots;
89   struct child *children;
90 };
91
92 static const char *modenames[] = { "read", "write", "except" };
93
94 /* utilities ******************************************************************/
95
96 static inline int gt(const struct timeval *a, const struct timeval *b) {
97   if(a->tv_sec > b->tv_sec)
98     return 1;
99   if(a->tv_sec == b->tv_sec
100      && a->tv_usec > b->tv_usec)
101     return 1;
102   return 0;
103 }
104
105 static inline int ge(const struct timeval *a, const struct timeval *b) {
106   return !gt(b, a);
107 }
108
109 /* creation *******************************************************************/
110
111 ev_source *ev_new(void) {
112   ev_source *ev = xmalloc(sizeof *ev);
113   int n;
114
115   memset(ev, 0, sizeof *ev);
116   for(n = 0; n < ev_nmodes; ++n)
117     FD_ZERO(&ev->mode[n].enabled);
118   ev->sigpipe[0] = ev->sigpipe[1] = -1;
119   sigemptyset(&ev->sigmask);
120   return ev;
121 }
122
123 /* event loop *****************************************************************/
124
125 int ev_run(ev_source *ev) {
126   for(;;) {
127     struct timeval now;
128     struct timeval delta;
129     int n, mode;
130     int ret;
131     int maxfd;
132     struct timeout *t, **tt;
133
134     xgettimeofday(&now, 0);
135     /* Handle timeouts.  We don't want to handle any timeouts that are added
136      * while we're handling them (otherwise we'd have to break out of infinite
137      * loops, preferrably without starving better-behaved subsystems).  Hence
138      * the slightly complicated two-phase approach here. */
139     for(t = ev->timeouts;
140         t && ge(&now, &t->when);
141         t = t->next) {
142       t->resolve = 1;
143       D(("calling timeout for %ld.%ld callback %p %p",
144          (long)t->when.tv_sec, (long)t->when.tv_usec,
145          (void *)t->callback, t->u));
146       ret = t->callback(ev, &now, t->u);
147       if(ret)
148         return ret;
149     }
150     tt = &ev->timeouts;
151     while((t = *tt)) {
152       if(t->resolve)
153         *tt = t->next;
154       else
155         tt = &t->next;
156     }
157     maxfd = 0;
158     for(mode = 0; mode < ev_nmodes; ++mode) {
159       ev->mode[mode].tripped = ev->mode[mode].enabled;
160       if(ev->mode[mode].maxfd > maxfd)
161         maxfd = ev->mode[mode].maxfd;
162     }
163     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
164     do {
165       if(ev->timeouts) {
166         xgettimeofday(&now, 0);
167         delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
168         delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
169         if(delta.tv_usec < 0) {
170           delta.tv_usec += 1000000;
171           --delta.tv_sec;
172         }
173         if(delta.tv_sec < 0)
174           delta.tv_sec = delta.tv_usec = 0;
175         n = select(maxfd + 1,
176                    &ev->mode[ev_read].tripped,
177                    &ev->mode[ev_write].tripped,
178                    &ev->mode[ev_except].tripped,
179                    &delta);
180       } else {
181         n = select(maxfd + 1,
182                    &ev->mode[ev_read].tripped,
183                    &ev->mode[ev_write].tripped,
184                    &ev->mode[ev_except].tripped,
185                    0);
186       }
187     } while(n < 0 && errno == EINTR);
188     xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
189     if(n < 0) {
190       error(errno, "error calling select");
191       return -1;
192     }
193     if(n > 0) {
194       /* if anything deranges the meaning of an fd, or re-orders the
195        * fds[] tables, we'd better give up; such operations will
196        * therefore set @escape@. */
197       ev->escape = 0;
198       for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
199         for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
200           int fd = ev->mode[mode].fds[n].fd;
201           if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
202             D(("calling %s fd %d callback %p %p", modenames[mode], fd,
203                (void *)ev->mode[mode].fds[n].callback,
204                ev->mode[mode].fds[n].u));
205             ret = ev->mode[mode].fds[n].callback(ev, fd,
206                                                  ev->mode[mode].fds[n].u);
207             if(ret)
208               return ret;
209           }
210         }
211     }
212     /* we'll pick up timeouts back round the loop */
213   }
214 }
215
216 /* file descriptors ***********************************************************/
217
218 int ev_fd(ev_source *ev,
219           ev_fdmode mode,
220           int fd,
221           ev_fd_callback *callback,
222           void *u) {
223   int n;
224
225   D(("registering %s fd %d callback %p %p", modenames[mode], fd,
226      (void *)callback, u));
227   assert(mode < ev_nmodes);
228   if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
229     ev->mode[mode].fdslots = (ev->mode[mode].fdslots
230                                ? 2 * ev->mode[mode].fdslots : 16);
231     D(("expanding %s fd table to %d entries", modenames[mode],
232        ev->mode[mode].fdslots));
233     ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
234                                   ev->mode[mode].fdslots * sizeof (struct fd));
235   }
236   n = ev->mode[mode].nfds++;
237   FD_SET(fd, &ev->mode[mode].enabled);
238   ev->mode[mode].fds[n].fd = fd;
239   ev->mode[mode].fds[n].callback = callback;
240   ev->mode[mode].fds[n].u = u;
241   if(fd > ev->mode[mode].maxfd)
242     ev->mode[mode].maxfd = fd;
243   ev->escape = 1;
244   return 0;
245 }
246
247 int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
248   int n;
249   int maxfd;
250
251   D(("cancelling mode %s fd %d", modenames[mode], fd));
252   /* find the right struct fd */
253   for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
254     ;
255   assert(n < ev->mode[mode].nfds);
256   /* swap in the last fd and reduce the count */
257   if(n != ev->mode[mode].nfds - 1)
258     ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
259   --ev->mode[mode].nfds;
260   /* if that was the biggest fd, find the new biggest one */
261   if(fd == ev->mode[mode].maxfd) {
262     maxfd = 0;
263     for(n = 0; n < ev->mode[mode].nfds; ++n)
264       if(ev->mode[mode].fds[n].fd > maxfd)
265         maxfd = ev->mode[mode].fds[n].fd;
266     ev->mode[mode].maxfd = maxfd;
267   }
268   /* don't tell select about this fd any more */
269   FD_CLR(fd, &ev->mode[mode].enabled);
270   ev->escape = 1;
271   return 0;
272 }
273
274 int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
275   D(("enabling mode %s fd %d", modenames[mode], fd));
276   FD_SET(fd, &ev->mode[mode].enabled);
277   return 0;
278 }
279
280 int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
281   D(("disabling mode %s fd %d", modenames[mode], fd));
282   FD_CLR(fd, &ev->mode[mode].enabled);
283   FD_CLR(fd, &ev->mode[mode].tripped);
284   return 0;
285 }
286
287 /* timeouts *******************************************************************/
288
289 int ev_timeout(ev_source *ev,
290                ev_timeout_handle *handlep,
291                const struct timeval *when,
292                ev_timeout_callback *callback,
293                void *u) {
294   struct timeout *t, *p, **pp;
295
296   D(("registering timeout at %ld.%ld callback %p %p",
297      when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
298      (void *)callback, u));
299   t = xmalloc(sizeof *t);
300   if(when)
301     t->when = *when;
302   t->callback = callback;
303   t->u = u;
304   pp = &ev->timeouts;
305   while((p = *pp) && gt(&t->when, &p->when))
306     pp = &p->next;
307   t->next = p;
308   *pp = t;
309   if(handlep)
310     *handlep = t;
311   return 0;
312 }
313
314 int ev_timeout_cancel(ev_source *ev,
315                       ev_timeout_handle handle) {
316   struct timeout *t = handle, *p, **pp;
317
318   for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
319     ;
320   if(p) {
321     *pp = p->next;
322     return 0;
323   } else
324     return -1;
325 }
326
327 /* signals ********************************************************************/
328
329 static int sigfd[NSIG];
330
331 static void sighandler(int s) {
332   unsigned char sc = s;
333   static const char errmsg[] = "error writing to signal pipe";
334
335   /* probably the reader has stopped listening for some reason */
336   if(write(sigfd[s], &sc, 1) < 0) {
337     write(2, errmsg, sizeof errmsg - 1);
338     abort();
339   }
340 }
341
342 static int signal_read(ev_source *ev,
343                        int attribute((unused)) fd,
344                        void attribute((unused)) *u) {
345   unsigned char s;
346   int n;
347   int ret;
348
349   if((n = read(ev->sigpipe[0], &s, 1)) == 1)
350     if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
351       return ret;
352   assert(n != 0);
353   if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
354     error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
355     return -1;
356   }
357   return 0;
358 }
359
360 static void close_sigpipe(ev_source *ev) {
361   int save_errno = errno;
362
363   xclose(ev->sigpipe[0]);
364   xclose(ev->sigpipe[1]);
365   ev->sigpipe[0] = ev->sigpipe[1] = -1;
366   errno = save_errno;
367 }
368
369 int ev_signal(ev_source *ev,
370               int sig,
371               ev_signal_callback *callback,
372               void *u) {
373   int n;
374   struct sigaction sa;
375
376   D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
377   assert(sig > 0);
378   assert(sig < NSIG);
379   assert(sig <= UCHAR_MAX);
380   if(ev->sigpipe[0] == -1) {
381     D(("creating signal pipe"));
382     xpipe(ev->sigpipe);
383     D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
384     for(n = 0; n < 2; ++n) {
385       nonblock(ev->sigpipe[n]);
386       cloexec(ev->sigpipe[n]);
387     }
388     if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0)) {
389       close_sigpipe(ev);
390       return -1;
391     }
392   }
393   sigaddset(&ev->sigmask, sig);
394   xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
395   sigfd[sig] = ev->sigpipe[1];
396   ev->signals[sig].callback = callback;
397   ev->signals[sig].u = u;
398   sa.sa_handler = sighandler;
399   sigfillset(&sa.sa_mask);
400   sa.sa_flags = SA_RESTART;
401   xsigaction(sig, &sa, &ev->signals[sig].oldsa);
402   ev->escape = 1;
403   return 0;
404 }
405
406 int ev_signal_cancel(ev_source *ev,
407                      int sig) {
408   sigset_t ss;
409
410   xsigaction(sig, &ev->signals[sig].oldsa, 0);
411   ev->signals[sig].callback = 0;
412   ev->escape = 1;
413   sigdelset(&ev->sigmask, sig);
414   sigemptyset(&ss);
415   sigaddset(&ss, sig);
416   xsigprocmask(SIG_UNBLOCK, &ss, 0);
417   return 0;
418 }
419
420 void ev_signal_atfork(ev_source *ev) {
421   int sig;
422
423   if(ev->sigpipe[0] != -1) {
424     /* revert any handled signals to their original state */
425     for(sig = 1; sig < NSIG; ++sig) {
426       if(ev->signals[sig].callback != 0)
427         xsigaction(sig, &ev->signals[sig].oldsa, 0);
428     }
429     /* and then unblock them */
430     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
431     /* don't want a copy of the signal pipe open inside the fork */
432     xclose(ev->sigpipe[0]);
433     xclose(ev->sigpipe[1]);
434   }
435 }
436
437 /* child processes ************************************************************/
438
439 static int sigchld_callback(ev_source *ev,
440                             int attribute((unused)) sig,
441                             void attribute((unused)) *u) {
442   struct rusage ru;
443   pid_t r;
444   int status, n, ret, revisit;
445
446   do {
447     revisit = 0;
448     for(n = 0; n < ev->nchildren; ++n) {
449       r = wait4(ev->children[n].pid,
450                 &status,
451                 ev->children[n].options | WNOHANG,
452                 &ru);
453       if(r > 0) {
454         ev_child_callback *c = ev->children[n].callback;
455         void *cu = ev->children[n].u;
456
457         if(WIFEXITED(status) || WIFSIGNALED(status))
458           ev_child_cancel(ev, r);
459         revisit = 1;
460         if((ret = c(ev, r, status, &ru, cu)))
461           return ret;
462       } else if(r < 0) {
463         /* We should "never" get an ECHILD but it can in fact happen.  For
464          * instance on Linux 2.4.31, and probably other versions, if someone
465          * straces a child process and then a different child process
466          * terminates, when we wait4() the trace process we will get ECHILD
467          * because it has been reparented to strace.  Obviously this is a
468          * hopeless design flaw in the tracing infrastructure, but we don't
469          * want the disorder server to bomb out because of it.  So we just log
470          * the problem and ignore it.
471          */
472         error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
473               (unsigned long)ev->children[n].pid);
474         if(errno != ECHILD)
475           return -1;
476       }
477     }
478   } while(revisit);
479   return 0;
480 }
481
482 int ev_child_setup(ev_source *ev) {
483   D(("installing SIGCHLD handler"));
484   return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
485 }
486
487 int ev_child(ev_source *ev,
488              pid_t pid,
489              int options,
490              ev_child_callback *callback,
491              void *u) {
492   int n;
493
494   D(("registering child handling %ld options %d callback %p %p",
495      (long)pid, options, (void *)callback, u));
496   assert(ev->signals[SIGCHLD].callback == sigchld_callback);
497   if(ev->nchildren >= ev->nchildslots) {
498     ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
499     ev->children = xrealloc(ev->children,
500                             ev->nchildslots * sizeof (struct child));
501   }
502   n = ev->nchildren++;
503   ev->children[n].pid = pid;
504   ev->children[n].options = options;
505   ev->children[n].callback = callback;
506   ev->children[n].u = u;
507   return 0;
508 }
509
510 int ev_child_cancel(ev_source *ev,
511                     pid_t pid) {
512   int n;
513
514   for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
515     ;
516   assert(n < ev->nchildren);
517   if(n != ev->nchildren - 1)
518     ev->children[n] = ev->children[ev->nchildren - 1];
519   --ev->nchildren;
520   return 0;
521 }
522
523 /* socket listeners ***********************************************************/
524
525 struct listen_state {
526   ev_listen_callback *callback;
527   void *u;
528 };
529
530 static int listen_callback(ev_source *ev, int fd, void *u) {
531   const struct listen_state *l = u;
532   int newfd;
533   union {
534     struct sockaddr_in in;
535 #if HAVE_STRUCT_SOCKADDR_IN6
536     struct sockaddr_in6 in6;
537 #endif
538     struct sockaddr_un un;
539     struct sockaddr sa;
540   } addr;
541   socklen_t addrlen;
542   int ret;
543
544   D(("callback for listener fd %d", fd));
545   while((addrlen = sizeof addr),
546         (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
547     if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
548       return ret;
549   }
550   switch(errno) {
551   case EINTR:
552   case EAGAIN:
553     break;
554 #ifdef ECONNABORTED
555   case ECONNABORTED:
556     error(errno, "error calling accept");
557     break;
558 #endif
559 #ifdef EPROTO
560   case EPROTO:
561     /* XXX on some systems EPROTO should be fatal, but we don't know if
562      * we're running on one of them */
563     error(errno, "error calling accept");
564     break;
565 #endif
566   default:
567     fatal(errno, "error calling accept");
568     break;
569   }
570   if(errno != EINTR && errno != EAGAIN)
571     error(errno, "error calling accept");
572   return 0;
573 }
574
575 int ev_listen(ev_source *ev,
576               int fd,
577               ev_listen_callback *callback,
578               void *u) {
579   struct listen_state *l = xmalloc(sizeof *l);
580
581   D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
582   l->callback = callback;
583   l->u = u;
584   return ev_fd(ev, ev_read, fd, listen_callback, l);
585 }
586
587 int ev_listen_cancel(ev_source *ev, int fd) {
588   D(("cancelling listener fd %d", fd));
589   return ev_fd_cancel(ev, ev_read, fd);
590 }
591
592 /* buffer *********************************************************************/
593
594 struct buffer {
595   char *base, *start, *end, *top;
596 };
597
598 /* make sure there is @bytes@ available at @b->end@ */
599 static void buffer_space(struct buffer *b, size_t bytes) {
600   D(("buffer_space %p %p %p %p want %lu",
601      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
602      (unsigned long)bytes));
603   if(b->start == b->end)
604     b->start = b->end = b->base;
605   if((size_t)(b->top - b->end) < bytes) {
606     if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
607       size_t newspace = b->end - b->start + bytes, n;
608       char *newbase;
609
610       for(n = 16; n < newspace; n *= 2)
611         ;
612       newbase = xmalloc_noptr(n);
613       memcpy(newbase, b->start, b->end - b->start);
614       b->base = newbase;
615       b->end = newbase + (b->end - b->start);
616       b->top = newbase + n;
617       b->start = newbase;               /* must be last */
618     } else {
619       memmove(b->base, b->start, b->end - b->start);
620       b->end = b->base + (b->end - b->start);
621       b->start = b->base;
622     }
623   }
624   D(("result %p %p %p %p",
625      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
626 }
627
628 /* buffered writer ************************************************************/
629
630 struct ev_writer {
631   struct sink s;
632   struct buffer b;
633   int fd;
634   int eof;
635   ev_error_callback *callback;
636   void *u;
637   ev_source *ev;
638 };
639
640 static int writer_callback(ev_source *ev, int fd, void *u) {
641   ev_writer *w = u;
642   int n;
643
644   n = write(fd, w->b.start, w->b.end - w->b.start);
645   D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
646      fd, (long)(w->b.end - w->b.start), n, errno));
647   if(n >= 0) {
648     w->b.start += n;
649     if(w->b.start == w->b.end) {
650       if(w->eof) {
651         ev_fd_cancel(ev, ev_write, fd);
652         return w->callback(ev, fd, 0, w->u);
653       } else
654         ev_fd_disable(ev, ev_write, fd);
655     }
656   } else {
657     switch(errno) {
658     case EINTR:
659     case EAGAIN:
660       break;
661     default:
662       ev_fd_cancel(ev, ev_write, fd);
663       return w->callback(ev, fd, errno, w->u);
664     }
665   }
666   return 0;
667 }
668
669 static int ev_writer_write(struct sink *sk, const void *s, int n) {
670   ev_writer *w = (ev_writer *)sk;
671   
672   buffer_space(&w->b, n);
673   if(w->b.start == w->b.end)
674     ev_fd_enable(w->ev, ev_write, w->fd);
675   memcpy(w->b.end, s, n);
676   w->b.end += n;
677   return 0;
678 }
679
680 ev_writer *ev_writer_new(ev_source *ev,
681                          int fd,
682                          ev_error_callback *callback,
683                          void *u) {
684   ev_writer *w = xmalloc(sizeof *w);
685
686   D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
687   w->s.write = ev_writer_write;
688   w->fd = fd;
689   w->callback = callback;
690   w->u = u;
691   w->ev = ev;
692   if(ev_fd(ev, ev_write, fd, writer_callback, w))
693     return 0;
694   ev_fd_disable(ev, ev_write, fd);
695   return w;
696 }
697
698 struct sink *ev_writer_sink(ev_writer *w) {
699   return &w->s;
700 }
701
702 static int writer_shutdown(ev_source *ev,
703                            const attribute((unused)) struct timeval *now,
704                            void *u) {
705   ev_writer *w = u;
706
707   return w->callback(ev, w->fd, 0, w->u);
708 }
709
710 int ev_writer_close(ev_writer *w) {
711   D(("close writer fd %d", w->fd));
712   w->eof = 1;
713   if(w->b.start == w->b.end) {
714     /* we're already finished */
715     ev_fd_cancel(w->ev, ev_write, w->fd);
716     return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
717   }
718   return 0;
719 }
720
721 int ev_writer_cancel(ev_writer *w) {
722   D(("cancel writer fd %d", w->fd));
723   return ev_fd_cancel(w->ev, ev_write, w->fd);
724 }
725
726 int ev_writer_flush(ev_writer *w) {
727   return writer_callback(w->ev, w->fd, w);
728 }
729
730 /* buffered reader ************************************************************/
731
732 struct ev_reader {
733   struct buffer b;
734   int fd;
735   ev_reader_callback *callback;
736   ev_error_callback *error_callback;
737   void *u;
738   ev_source *ev;
739   int eof;
740 };
741
742 static int reader_callback(ev_source *ev, int fd, void *u) {
743   ev_reader *r = u;
744   int n;
745
746   buffer_space(&r->b, 1);
747   n = read(fd, r->b.end, r->b.top - r->b.end);
748   D(("read fd %d buffer %d returned %d errno %d",
749      fd, (int)(r->b.top - r->b.end), n, errno));
750   if(n > 0) {
751     r->b.end += n;
752     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
753   } else if(n == 0) {
754     r->eof = 1;
755     ev_fd_cancel(ev, ev_read, fd);
756     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
757   } else {
758     switch(errno) {
759     case EINTR:
760     case EAGAIN:
761       break;
762     default:
763       ev_fd_cancel(ev, ev_read, fd);
764       return r->error_callback(ev, fd, errno, r->u);
765     }
766   }
767   return 0;
768 }
769
770 ev_reader *ev_reader_new(ev_source *ev,
771                          int fd,
772                          ev_reader_callback *callback,
773                          ev_error_callback *error_callback,
774                          void *u) {
775   ev_reader *r = xmalloc(sizeof *r);
776
777   D(("registering reader fd %d callback %p %p %p",
778      fd, (void *)callback, (void *)error_callback, u));
779   r->fd = fd;
780   r->callback = callback;
781   r->error_callback = error_callback;
782   r->u = u;
783   r->ev = ev;
784   if(ev_fd(ev, ev_read, fd, reader_callback, r))
785     return 0;
786   return r;
787 }
788
789 void ev_reader_buffer(ev_reader *r, size_t nbytes) {
790   buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
791 }
792
793 void ev_reader_consume(ev_reader *r, size_t n) {
794   r->b.start += n;
795 }
796
797 int ev_reader_cancel(ev_reader *r) {
798   D(("cancel reader fd %d", r->fd));
799   return ev_fd_cancel(r->ev, ev_read, r->fd);
800 }
801
802 int ev_reader_disable(ev_reader *r) {
803   D(("disable reader fd %d", r->fd));
804   return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
805 }
806
807 static int reader_continuation(ev_source attribute((unused)) *ev,
808                                const attribute((unused)) struct timeval *now,
809                                void *u) {
810   ev_reader *r = u;
811
812   D(("reader continuation callback fd %d", r->fd));
813   if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
814   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
815 }
816
817 int ev_reader_incomplete(ev_reader *r) {
818   if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
819   return ev_timeout(r->ev, 0, 0, reader_continuation, r);
820 }
821
822 static int reader_enabled(ev_source *ev,
823                           const attribute((unused)) struct timeval *now,
824                           void *u) {
825   ev_reader *r = u;
826
827   D(("reader enabled callback fd %d", r->fd));
828   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
829 }
830
831 int ev_reader_enable(ev_reader *r) {
832   D(("enable reader fd %d", r->fd));
833   return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
834           || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
835 }
836
837 /*
838 Local Variables:
839 c-basic-offset:2
840 comment-column:40
841 fill-column:79
842 End:
843 */