chiark / gitweb /
chattier event loop error logging
[disorder] / lib / event.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2004, 2005, 2007 Richard Kettlewell
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA
19  */
20
21 #include <config.h>
22
23 #include <unistd.h>
24 #include <fcntl.h>
25 #include <sys/time.h>
26 #include <sys/types.h>
27 #include <sys/resource.h>
28 #include <sys/wait.h>
29 #include <unistd.h>
30 #include <assert.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <string.h>
34 #include <limits.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <sys/un.h>
38 #include <stdio.h>
39 #include "event.h"
40 #include "mem.h"
41 #include "log.h"
42 #include "syscalls.h"
43 #include "printf.h"
44 #include "sink.h"
45
46 struct timeout {
47   struct timeout *next;
48   struct timeval when;
49   ev_timeout_callback *callback;
50   void *u;
51   int resolve;
52 };
53
54 struct fd {
55   int fd;
56   ev_fd_callback *callback;
57   void *u;
58   const char *what;
59 };
60
61 struct fdmode {
62   fd_set enabled;
63   fd_set tripped;
64   int nfds, fdslots;
65   struct fd *fds;
66   int maxfd;
67 };
68
69 struct signal {
70   struct sigaction oldsa;
71   ev_signal_callback *callback;
72   void *u;
73 };
74
75 struct child {
76   pid_t pid;
77   int options;
78   ev_child_callback *callback;
79   void *u;
80 };
81
82 struct ev_source {
83   struct fdmode mode[ev_nmodes];
84   struct timeout *timeouts;
85   struct signal signals[NSIG];
86   sigset_t sigmask;
87   int escape;
88   int sigpipe[2];
89   int nchildren, nchildslots;
90   struct child *children;
91 };
92
93 static const char *modenames[] = { "read", "write", "except" };
94
95 /* utilities ******************************************************************/
96
97 static inline int gt(const struct timeval *a, const struct timeval *b) {
98   if(a->tv_sec > b->tv_sec)
99     return 1;
100   if(a->tv_sec == b->tv_sec
101      && a->tv_usec > b->tv_usec)
102     return 1;
103   return 0;
104 }
105
106 static inline int ge(const struct timeval *a, const struct timeval *b) {
107   return !gt(b, a);
108 }
109
110 /* creation *******************************************************************/
111
112 ev_source *ev_new(void) {
113   ev_source *ev = xmalloc(sizeof *ev);
114   int n;
115
116   memset(ev, 0, sizeof *ev);
117   for(n = 0; n < ev_nmodes; ++n)
118     FD_ZERO(&ev->mode[n].enabled);
119   ev->sigpipe[0] = ev->sigpipe[1] = -1;
120   sigemptyset(&ev->sigmask);
121   return ev;
122 }
123
124 /* event loop *****************************************************************/
125
126 int ev_run(ev_source *ev) {
127   for(;;) {
128     struct timeval now;
129     struct timeval delta;
130     int n, mode;
131     int ret;
132     int maxfd;
133     struct timeout *t, **tt;
134     struct stat sb;
135
136     xgettimeofday(&now, 0);
137     /* Handle timeouts.  We don't want to handle any timeouts that are added
138      * while we're handling them (otherwise we'd have to break out of infinite
139      * loops, preferrably without starving better-behaved subsystems).  Hence
140      * the slightly complicated two-phase approach here. */
141     for(t = ev->timeouts;
142         t && ge(&now, &t->when);
143         t = t->next) {
144       t->resolve = 1;
145       D(("calling timeout for %ld.%ld callback %p %p",
146          (long)t->when.tv_sec, (long)t->when.tv_usec,
147          (void *)t->callback, t->u));
148       ret = t->callback(ev, &now, t->u);
149       if(ret)
150         return ret;
151     }
152     tt = &ev->timeouts;
153     while((t = *tt)) {
154       if(t->resolve)
155         *tt = t->next;
156       else
157         tt = &t->next;
158     }
159     maxfd = 0;
160     for(mode = 0; mode < ev_nmodes; ++mode) {
161       ev->mode[mode].tripped = ev->mode[mode].enabled;
162       if(ev->mode[mode].maxfd > maxfd)
163         maxfd = ev->mode[mode].maxfd;
164     }
165     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
166     do {
167       if(ev->timeouts) {
168         xgettimeofday(&now, 0);
169         delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
170         delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
171         if(delta.tv_usec < 0) {
172           delta.tv_usec += 1000000;
173           --delta.tv_sec;
174         }
175         if(delta.tv_sec < 0)
176           delta.tv_sec = delta.tv_usec = 0;
177         n = select(maxfd + 1,
178                    &ev->mode[ev_read].tripped,
179                    &ev->mode[ev_write].tripped,
180                    &ev->mode[ev_except].tripped,
181                    &delta);
182       } else {
183         n = select(maxfd + 1,
184                    &ev->mode[ev_read].tripped,
185                    &ev->mode[ev_write].tripped,
186                    &ev->mode[ev_except].tripped,
187                    0);
188       }
189     } while(n < 0 && errno == EINTR);
190     xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
191     if(n < 0) {
192       error(errno, "error calling select");
193       if(errno == EBADF) {
194         /* If there's a bad FD in the mix then check them all and log what we
195          * find, to ease debugging */
196         for(mode = 0; mode < ev_nmodes; ++mode) {
197           for(n = 0; n < ev->mode[mode].nfds; ++n) {
198             const int fd = ev->mode[mode].fds[n].fd;
199
200             if(FD_ISSET(fd, &ev->mode[mode].enabled)
201                && fstat(fd, &sb) < 0)
202               error(errno, "fstat %d (%s)", fd, ev->mode[mode].fds[n].what);
203           }
204         }
205       }
206       return -1;
207     }
208     if(n > 0) {
209       /* if anything deranges the meaning of an fd, or re-orders the
210        * fds[] tables, we'd better give up; such operations will
211        * therefore set @escape@. */
212       ev->escape = 0;
213       for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
214         for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
215           int fd = ev->mode[mode].fds[n].fd;
216           if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
217             D(("calling %s fd %d callback %p %p", modenames[mode], fd,
218                (void *)ev->mode[mode].fds[n].callback,
219                ev->mode[mode].fds[n].u));
220             ret = ev->mode[mode].fds[n].callback(ev, fd,
221                                                  ev->mode[mode].fds[n].u);
222             if(ret)
223               return ret;
224           }
225         }
226     }
227     /* we'll pick up timeouts back round the loop */
228   }
229 }
230
231 /* file descriptors ***********************************************************/
232
233 int ev_fd(ev_source *ev,
234           ev_fdmode mode,
235           int fd,
236           ev_fd_callback *callback,
237           void *u,
238           const char *what) {
239   int n;
240
241   D(("registering %s fd %d callback %p %p", modenames[mode], fd,
242      (void *)callback, u));
243   assert(mode < ev_nmodes);
244   if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) {
245     ev->mode[mode].fdslots = (ev->mode[mode].fdslots
246                                ? 2 * ev->mode[mode].fdslots : 16);
247     D(("expanding %s fd table to %d entries", modenames[mode],
248        ev->mode[mode].fdslots));
249     ev->mode[mode].fds = xrealloc(ev->mode[mode].fds,
250                                   ev->mode[mode].fdslots * sizeof (struct fd));
251   }
252   n = ev->mode[mode].nfds++;
253   FD_SET(fd, &ev->mode[mode].enabled);
254   ev->mode[mode].fds[n].fd = fd;
255   ev->mode[mode].fds[n].callback = callback;
256   ev->mode[mode].fds[n].u = u;
257   ev->mode[mode].fds[n].what = what;
258   if(fd > ev->mode[mode].maxfd)
259     ev->mode[mode].maxfd = fd;
260   ev->escape = 1;
261   return 0;
262 }
263
264 int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) {
265   int n;
266   int maxfd;
267
268   D(("cancelling mode %s fd %d", modenames[mode], fd));
269   /* find the right struct fd */
270   for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n)
271     ;
272   assert(n < ev->mode[mode].nfds);
273   /* swap in the last fd and reduce the count */
274   if(n != ev->mode[mode].nfds - 1)
275     ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1];
276   --ev->mode[mode].nfds;
277   /* if that was the biggest fd, find the new biggest one */
278   if(fd == ev->mode[mode].maxfd) {
279     maxfd = 0;
280     for(n = 0; n < ev->mode[mode].nfds; ++n)
281       if(ev->mode[mode].fds[n].fd > maxfd)
282         maxfd = ev->mode[mode].fds[n].fd;
283     ev->mode[mode].maxfd = maxfd;
284   }
285   /* don't tell select about this fd any more */
286   FD_CLR(fd, &ev->mode[mode].enabled);
287   ev->escape = 1;
288   return 0;
289 }
290
291 int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) {
292   D(("enabling mode %s fd %d", modenames[mode], fd));
293   FD_SET(fd, &ev->mode[mode].enabled);
294   return 0;
295 }
296
297 int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
298   D(("disabling mode %s fd %d", modenames[mode], fd));
299   FD_CLR(fd, &ev->mode[mode].enabled);
300   FD_CLR(fd, &ev->mode[mode].tripped);
301   return 0;
302 }
303
304 /* timeouts *******************************************************************/
305
306 int ev_timeout(ev_source *ev,
307                ev_timeout_handle *handlep,
308                const struct timeval *when,
309                ev_timeout_callback *callback,
310                void *u) {
311   struct timeout *t, *p, **pp;
312
313   D(("registering timeout at %ld.%ld callback %p %p",
314      when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0,
315      (void *)callback, u));
316   t = xmalloc(sizeof *t);
317   if(when)
318     t->when = *when;
319   t->callback = callback;
320   t->u = u;
321   pp = &ev->timeouts;
322   while((p = *pp) && gt(&t->when, &p->when))
323     pp = &p->next;
324   t->next = p;
325   *pp = t;
326   if(handlep)
327     *handlep = t;
328   return 0;
329 }
330
331 int ev_timeout_cancel(ev_source *ev,
332                       ev_timeout_handle handle) {
333   struct timeout *t = handle, *p, **pp;
334
335   for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
336     ;
337   if(p) {
338     *pp = p->next;
339     return 0;
340   } else
341     return -1;
342 }
343
344 /* signals ********************************************************************/
345
346 static int sigfd[NSIG];
347
348 static void sighandler(int s) {
349   unsigned char sc = s;
350   static const char errmsg[] = "error writing to signal pipe";
351
352   /* probably the reader has stopped listening for some reason */
353   if(write(sigfd[s], &sc, 1) < 0) {
354     write(2, errmsg, sizeof errmsg - 1);
355     abort();
356   }
357 }
358
359 static int signal_read(ev_source *ev,
360                        int attribute((unused)) fd,
361                        void attribute((unused)) *u) {
362   unsigned char s;
363   int n;
364   int ret;
365
366   if((n = read(ev->sigpipe[0], &s, 1)) == 1)
367     if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u)))
368       return ret;
369   assert(n != 0);
370   if(n < 0 && (errno != EINTR && errno != EAGAIN)) {
371     error(errno, "error reading from signal pipe %d", ev->sigpipe[0]);
372     return -1;
373   }
374   return 0;
375 }
376
377 static void close_sigpipe(ev_source *ev) {
378   int save_errno = errno;
379
380   xclose(ev->sigpipe[0]);
381   xclose(ev->sigpipe[1]);
382   ev->sigpipe[0] = ev->sigpipe[1] = -1;
383   errno = save_errno;
384 }
385
386 int ev_signal(ev_source *ev,
387               int sig,
388               ev_signal_callback *callback,
389               void *u) {
390   int n;
391   struct sigaction sa;
392
393   D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
394   assert(sig > 0);
395   assert(sig < NSIG);
396   assert(sig <= UCHAR_MAX);
397   if(ev->sigpipe[0] == -1) {
398     D(("creating signal pipe"));
399     xpipe(ev->sigpipe);
400     D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
401     for(n = 0; n < 2; ++n) {
402       nonblock(ev->sigpipe[n]);
403       cloexec(ev->sigpipe[n]);
404     }
405     if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0, "sigpipe read")) {
406       close_sigpipe(ev);
407       return -1;
408     }
409   }
410   sigaddset(&ev->sigmask, sig);
411   xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
412   sigfd[sig] = ev->sigpipe[1];
413   ev->signals[sig].callback = callback;
414   ev->signals[sig].u = u;
415   sa.sa_handler = sighandler;
416   sigfillset(&sa.sa_mask);
417   sa.sa_flags = SA_RESTART;
418   xsigaction(sig, &sa, &ev->signals[sig].oldsa);
419   ev->escape = 1;
420   return 0;
421 }
422
423 int ev_signal_cancel(ev_source *ev,
424                      int sig) {
425   sigset_t ss;
426
427   xsigaction(sig, &ev->signals[sig].oldsa, 0);
428   ev->signals[sig].callback = 0;
429   ev->escape = 1;
430   sigdelset(&ev->sigmask, sig);
431   sigemptyset(&ss);
432   sigaddset(&ss, sig);
433   xsigprocmask(SIG_UNBLOCK, &ss, 0);
434   return 0;
435 }
436
437 void ev_signal_atfork(ev_source *ev) {
438   int sig;
439
440   if(ev->sigpipe[0] != -1) {
441     /* revert any handled signals to their original state */
442     for(sig = 1; sig < NSIG; ++sig) {
443       if(ev->signals[sig].callback != 0)
444         xsigaction(sig, &ev->signals[sig].oldsa, 0);
445     }
446     /* and then unblock them */
447     xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
448     /* don't want a copy of the signal pipe open inside the fork */
449     xclose(ev->sigpipe[0]);
450     xclose(ev->sigpipe[1]);
451   }
452 }
453
454 /* child processes ************************************************************/
455
456 static int sigchld_callback(ev_source *ev,
457                             int attribute((unused)) sig,
458                             void attribute((unused)) *u) {
459   struct rusage ru;
460   pid_t r;
461   int status, n, ret, revisit;
462
463   do {
464     revisit = 0;
465     for(n = 0; n < ev->nchildren; ++n) {
466       r = wait4(ev->children[n].pid,
467                 &status,
468                 ev->children[n].options | WNOHANG,
469                 &ru);
470       if(r > 0) {
471         ev_child_callback *c = ev->children[n].callback;
472         void *cu = ev->children[n].u;
473
474         if(WIFEXITED(status) || WIFSIGNALED(status))
475           ev_child_cancel(ev, r);
476         revisit = 1;
477         if((ret = c(ev, r, status, &ru, cu)))
478           return ret;
479       } else if(r < 0) {
480         /* We should "never" get an ECHILD but it can in fact happen.  For
481          * instance on Linux 2.4.31, and probably other versions, if someone
482          * straces a child process and then a different child process
483          * terminates, when we wait4() the trace process we will get ECHILD
484          * because it has been reparented to strace.  Obviously this is a
485          * hopeless design flaw in the tracing infrastructure, but we don't
486          * want the disorder server to bomb out because of it.  So we just log
487          * the problem and ignore it.
488          */
489         error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
490               (unsigned long)ev->children[n].pid);
491         if(errno != ECHILD)
492           return -1;
493       }
494     }
495   } while(revisit);
496   return 0;
497 }
498
499 int ev_child_setup(ev_source *ev) {
500   D(("installing SIGCHLD handler"));
501   return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
502 }
503
504 int ev_child(ev_source *ev,
505              pid_t pid,
506              int options,
507              ev_child_callback *callback,
508              void *u) {
509   int n;
510
511   D(("registering child handling %ld options %d callback %p %p",
512      (long)pid, options, (void *)callback, u));
513   assert(ev->signals[SIGCHLD].callback == sigchld_callback);
514   if(ev->nchildren >= ev->nchildslots) {
515     ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16;
516     ev->children = xrealloc(ev->children,
517                             ev->nchildslots * sizeof (struct child));
518   }
519   n = ev->nchildren++;
520   ev->children[n].pid = pid;
521   ev->children[n].options = options;
522   ev->children[n].callback = callback;
523   ev->children[n].u = u;
524   return 0;
525 }
526
527 int ev_child_cancel(ev_source *ev,
528                     pid_t pid) {
529   int n;
530
531   for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n)
532     ;
533   assert(n < ev->nchildren);
534   if(n != ev->nchildren - 1)
535     ev->children[n] = ev->children[ev->nchildren - 1];
536   --ev->nchildren;
537   return 0;
538 }
539
540 /* socket listeners ***********************************************************/
541
542 struct listen_state {
543   ev_listen_callback *callback;
544   void *u;
545 };
546
547 static int listen_callback(ev_source *ev, int fd, void *u) {
548   const struct listen_state *l = u;
549   int newfd;
550   union {
551     struct sockaddr_in in;
552 #if HAVE_STRUCT_SOCKADDR_IN6
553     struct sockaddr_in6 in6;
554 #endif
555     struct sockaddr_un un;
556     struct sockaddr sa;
557   } addr;
558   socklen_t addrlen;
559   int ret;
560
561   D(("callback for listener fd %d", fd));
562   while((addrlen = sizeof addr),
563         (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) {
564     if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u)))
565       return ret;
566   }
567   switch(errno) {
568   case EINTR:
569   case EAGAIN:
570     break;
571 #ifdef ECONNABORTED
572   case ECONNABORTED:
573     error(errno, "error calling accept");
574     break;
575 #endif
576 #ifdef EPROTO
577   case EPROTO:
578     /* XXX on some systems EPROTO should be fatal, but we don't know if
579      * we're running on one of them */
580     error(errno, "error calling accept");
581     break;
582 #endif
583   default:
584     fatal(errno, "error calling accept");
585     break;
586   }
587   if(errno != EINTR && errno != EAGAIN)
588     error(errno, "error calling accept");
589   return 0;
590 }
591
592 int ev_listen(ev_source *ev,
593               int fd,
594               ev_listen_callback *callback,
595               void *u,
596               const char *what) {
597   struct listen_state *l = xmalloc(sizeof *l);
598
599   D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
600   l->callback = callback;
601   l->u = u;
602   return ev_fd(ev, ev_read, fd, listen_callback, l, what);
603 }
604
605 int ev_listen_cancel(ev_source *ev, int fd) {
606   D(("cancelling listener fd %d", fd));
607   return ev_fd_cancel(ev, ev_read, fd);
608 }
609
610 /* buffer *********************************************************************/
611
612 struct buffer {
613   char *base, *start, *end, *top;
614 };
615
616 /* make sure there is @bytes@ available at @b->end@ */
617 static void buffer_space(struct buffer *b, size_t bytes) {
618   D(("buffer_space %p %p %p %p want %lu",
619      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
620      (unsigned long)bytes));
621   if(b->start == b->end)
622     b->start = b->end = b->base;
623   if((size_t)(b->top - b->end) < bytes) {
624     if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
625       size_t newspace = b->end - b->start + bytes, n;
626       char *newbase;
627
628       for(n = 16; n < newspace; n *= 2)
629         ;
630       newbase = xmalloc_noptr(n);
631       memcpy(newbase, b->start, b->end - b->start);
632       b->base = newbase;
633       b->end = newbase + (b->end - b->start);
634       b->top = newbase + n;
635       b->start = newbase;               /* must be last */
636     } else {
637       memmove(b->base, b->start, b->end - b->start);
638       b->end = b->base + (b->end - b->start);
639       b->start = b->base;
640     }
641   }
642   D(("result %p %p %p %p",
643      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
644 }
645
646 /* buffered writer ************************************************************/
647
648 struct ev_writer {
649   struct sink s;
650   struct buffer b;
651   int fd;
652   int eof;
653   ev_error_callback *callback;
654   void *u;
655   ev_source *ev;
656 };
657
658 static int writer_callback(ev_source *ev, int fd, void *u) {
659   ev_writer *w = u;
660   int n;
661
662   n = write(fd, w->b.start, w->b.end - w->b.start);
663   D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
664      fd, (long)(w->b.end - w->b.start), n, errno));
665   if(n >= 0) {
666     w->b.start += n;
667     if(w->b.start == w->b.end) {
668       if(w->eof) {
669         ev_fd_cancel(ev, ev_write, fd);
670         return w->callback(ev, fd, 0, w->u);
671       } else
672         ev_fd_disable(ev, ev_write, fd);
673     }
674   } else {
675     switch(errno) {
676     case EINTR:
677     case EAGAIN:
678       break;
679     default:
680       ev_fd_cancel(ev, ev_write, fd);
681       return w->callback(ev, fd, errno, w->u);
682     }
683   }
684   return 0;
685 }
686
687 static int ev_writer_write(struct sink *sk, const void *s, int n) {
688   ev_writer *w = (ev_writer *)sk;
689   
690   buffer_space(&w->b, n);
691   if(w->b.start == w->b.end)
692     ev_fd_enable(w->ev, ev_write, w->fd);
693   memcpy(w->b.end, s, n);
694   w->b.end += n;
695   return 0;
696 }
697
698 ev_writer *ev_writer_new(ev_source *ev,
699                          int fd,
700                          ev_error_callback *callback,
701                          void *u,
702                          const char *what) {
703   ev_writer *w = xmalloc(sizeof *w);
704
705   D(("registering writer fd %d callback %p %p", fd, (void *)callback, u));
706   w->s.write = ev_writer_write;
707   w->fd = fd;
708   w->callback = callback;
709   w->u = u;
710   w->ev = ev;
711   if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
712     return 0;
713   ev_fd_disable(ev, ev_write, fd);
714   return w;
715 }
716
717 struct sink *ev_writer_sink(ev_writer *w) {
718   return &w->s;
719 }
720
721 static int writer_shutdown(ev_source *ev,
722                            const attribute((unused)) struct timeval *now,
723                            void *u) {
724   ev_writer *w = u;
725
726   return w->callback(ev, w->fd, 0, w->u);
727 }
728
729 int ev_writer_close(ev_writer *w) {
730   D(("close writer fd %d", w->fd));
731   w->eof = 1;
732   if(w->b.start == w->b.end) {
733     /* we're already finished */
734     ev_fd_cancel(w->ev, ev_write, w->fd);
735     return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
736   }
737   return 0;
738 }
739
740 int ev_writer_cancel(ev_writer *w) {
741   D(("cancel writer fd %d", w->fd));
742   return ev_fd_cancel(w->ev, ev_write, w->fd);
743 }
744
745 int ev_writer_flush(ev_writer *w) {
746   return writer_callback(w->ev, w->fd, w);
747 }
748
749 /* buffered reader ************************************************************/
750
751 struct ev_reader {
752   struct buffer b;
753   int fd;
754   ev_reader_callback *callback;
755   ev_error_callback *error_callback;
756   void *u;
757   ev_source *ev;
758   int eof;
759 };
760
761 static int reader_callback(ev_source *ev, int fd, void *u) {
762   ev_reader *r = u;
763   int n;
764
765   buffer_space(&r->b, 1);
766   n = read(fd, r->b.end, r->b.top - r->b.end);
767   D(("read fd %d buffer %d returned %d errno %d",
768      fd, (int)(r->b.top - r->b.end), n, errno));
769   if(n > 0) {
770     r->b.end += n;
771     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
772   } else if(n == 0) {
773     r->eof = 1;
774     ev_fd_cancel(ev, ev_read, fd);
775     return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
776   } else {
777     switch(errno) {
778     case EINTR:
779     case EAGAIN:
780       break;
781     default:
782       ev_fd_cancel(ev, ev_read, fd);
783       return r->error_callback(ev, fd, errno, r->u);
784     }
785   }
786   return 0;
787 }
788
789 ev_reader *ev_reader_new(ev_source *ev,
790                          int fd,
791                          ev_reader_callback *callback,
792                          ev_error_callback *error_callback,
793                          void *u,
794                          const char *what) {
795   ev_reader *r = xmalloc(sizeof *r);
796
797   D(("registering reader fd %d callback %p %p %p",
798      fd, (void *)callback, (void *)error_callback, u));
799   r->fd = fd;
800   r->callback = callback;
801   r->error_callback = error_callback;
802   r->u = u;
803   r->ev = ev;
804   if(ev_fd(ev, ev_read, fd, reader_callback, r, what))
805     return 0;
806   return r;
807 }
808
809 void ev_reader_buffer(ev_reader *r, size_t nbytes) {
810   buffer_space(&r->b, nbytes - (r->b.end - r->b.start));
811 }
812
813 void ev_reader_consume(ev_reader *r, size_t n) {
814   r->b.start += n;
815 }
816
817 int ev_reader_cancel(ev_reader *r) {
818   D(("cancel reader fd %d", r->fd));
819   return ev_fd_cancel(r->ev, ev_read, r->fd);
820 }
821
822 int ev_reader_disable(ev_reader *r) {
823   D(("disable reader fd %d", r->fd));
824   return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
825 }
826
827 static int reader_continuation(ev_source attribute((unused)) *ev,
828                                const attribute((unused)) struct timeval *now,
829                                void *u) {
830   ev_reader *r = u;
831
832   D(("reader continuation callback fd %d", r->fd));
833   if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
834   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
835 }
836
837 int ev_reader_incomplete(ev_reader *r) {
838   if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1;
839   return ev_timeout(r->ev, 0, 0, reader_continuation, r);
840 }
841
842 static int reader_enabled(ev_source *ev,
843                           const attribute((unused)) struct timeval *now,
844                           void *u) {
845   ev_reader *r = u;
846
847   D(("reader enabled callback fd %d", r->fd));
848   return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
849 }
850
851 int ev_reader_enable(ev_reader *r) {
852   D(("enable reader fd %d", r->fd));
853   return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
854           || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
855 }
856
857 /*
858 Local Variables:
859 c-basic-offset:2
860 comment-column:40
861 fill-column:79
862 End:
863 */