460b9539 |
1 | /* |
2 | * This file is part of DisOrder. |
3 | * Copyright (C) 2004, 2005 Richard Kettlewell |
4 | * |
5 | * This program is free software; you can redistribute it and/or modify |
6 | * it under the terms of the GNU General Public License as published by |
7 | * the Free Software Foundation; either version 2 of the License, or |
8 | * (at your option) any later version. |
9 | * |
10 | * This program is distributed in the hope that it will be useful, but |
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
13 | * General Public License for more details. |
14 | * |
15 | * You should have received a copy of the GNU General Public License |
16 | * along with this program; if not, write to the Free Software |
17 | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 |
18 | * USA |
19 | */ |
20 | |
21 | #include <config.h> |
22 | |
23 | #include <unistd.h> |
24 | #include <fcntl.h> |
25 | #include <sys/time.h> |
26 | #include <sys/types.h> |
27 | #include <sys/resource.h> |
28 | #include <sys/wait.h> |
29 | #include <unistd.h> |
30 | #include <assert.h> |
31 | #include <signal.h> |
32 | #include <errno.h> |
33 | #include <string.h> |
34 | #include <limits.h> |
35 | #include <sys/socket.h> |
36 | #include <netinet/in.h> |
37 | #include <sys/un.h> |
38 | #include <stdio.h> |
39 | #include "event.h" |
40 | #include "mem.h" |
41 | #include "log.h" |
42 | #include "syscalls.h" |
43 | #include "printf.h" |
44 | #include "sink.h" |
45 | |
46 | struct timeout { |
47 | struct timeout *next; |
48 | struct timeval when; |
49 | ev_timeout_callback *callback; |
50 | void *u; |
51 | int resolve; |
52 | }; |
53 | |
54 | struct fd { |
55 | int fd; |
56 | ev_fd_callback *callback; |
57 | void *u; |
58 | }; |
59 | |
/* Per-mode (read/write/except) file descriptor state. */
struct fdmode {
  fd_set enabled;                       /* fds handed to select() */
  fd_set tripped;                       /* copy of @enabled@ that select()
                                         * overwrites with the ready fds */
  int nfds;                             /* entries of @fds@ in use */
  int fdslots;                          /* entries of @fds@ allocated */
  struct fd *fds;                       /* table of registered fds */
  int maxfd;                            /* largest fd registered in this mode */
};
67 | |
68 | struct signal { |
69 | struct sigaction oldsa; |
70 | ev_signal_callback *callback; |
71 | void *u; |
72 | }; |
73 | |
74 | struct child { |
75 | pid_t pid; |
76 | int options; |
77 | ev_child_callback *callback; |
78 | void *u; |
79 | }; |
80 | |
81 | struct ev_source { |
82 | struct fdmode mode[ev_nmodes]; |
83 | struct timeout *timeouts; |
84 | struct signal signals[NSIG]; |
85 | sigset_t sigmask; |
86 | int escape; |
87 | int sigpipe[2]; |
88 | int nchildren, nchildslots; |
89 | struct child *children; |
90 | }; |
91 | |
/* Human-readable mode names for debug output, indexed by ev_fdmode value. */
static const char *modenames[] = {
  "read",
  "write",
  "except"
};
93 | |
94 | /* utilities ******************************************************************/ |
95 | |
/* Return 1 if time @a@ is strictly later than time @b@, otherwise 0.
 * Seconds are compared first; microseconds only break ties. */
static inline int gt(const struct timeval *a, const struct timeval *b) {
  if(a->tv_sec != b->tv_sec)
    return a->tv_sec > b->tv_sec ? 1 : 0;
  return a->tv_usec > b->tv_usec ? 1 : 0;
}
104 | |
/* Return nonzero if time @a@ is later than or equal to time @b@, i.e. if @b@
 * is not strictly later than @a@. */
static inline int ge(const struct timeval *a, const struct timeval *b) {
  return gt(b, a) ? 0 : 1;
}
108 | |
109 | /* creation *******************************************************************/ |
110 | |
111 | ev_source *ev_new(void) { |
112 | ev_source *ev = xmalloc(sizeof *ev); |
113 | int n; |
114 | |
115 | memset(ev, 0, sizeof *ev); |
116 | for(n = 0; n < ev_nmodes; ++n) |
117 | FD_ZERO(&ev->mode[n].enabled); |
118 | ev->sigpipe[0] = ev->sigpipe[1] = -1; |
119 | sigemptyset(&ev->sigmask); |
120 | return ev; |
121 | } |
122 | |
123 | /* event loop *****************************************************************/ |
124 | |
125 | int ev_run(ev_source *ev) { |
126 | for(;;) { |
127 | struct timeval now; |
128 | struct timeval delta; |
129 | int n, mode; |
130 | int ret; |
131 | int maxfd; |
132 | struct timeout *t, **tt; |
133 | |
134 | xgettimeofday(&now, 0); |
135 | /* Handle timeouts. We don't want to handle any timeouts that are added |
136 | * while we're handling them (otherwise we'd have to break out of infinite |
137 | * loops, preferrably without starving better-behaved subsystems). Hence |
138 | * the slightly complicated two-phase approach here. */ |
139 | for(t = ev->timeouts; |
140 | t && ge(&now, &t->when); |
141 | t = t->next) { |
142 | t->resolve = 1; |
143 | D(("calling timeout for %ld.%ld callback %p %p", |
144 | (long)t->when.tv_sec, (long)t->when.tv_usec, |
145 | (void *)t->callback, t->u)); |
146 | ret = t->callback(ev, &now, t->u); |
147 | if(ret) |
148 | return ret; |
149 | } |
150 | tt = &ev->timeouts; |
151 | while((t = *tt)) { |
152 | if(t->resolve) |
153 | *tt = t->next; |
154 | else |
155 | tt = &t->next; |
156 | } |
157 | maxfd = 0; |
158 | for(mode = 0; mode < ev_nmodes; ++mode) { |
159 | ev->mode[mode].tripped = ev->mode[mode].enabled; |
160 | if(ev->mode[mode].maxfd > maxfd) |
161 | maxfd = ev->mode[mode].maxfd; |
162 | } |
163 | xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0); |
164 | do { |
165 | if(ev->timeouts) { |
166 | xgettimeofday(&now, 0); |
167 | delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec; |
168 | delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec; |
169 | if(delta.tv_usec < 0) { |
170 | delta.tv_usec += 1000000; |
171 | --delta.tv_sec; |
172 | } |
173 | if(delta.tv_sec < 0) |
174 | delta.tv_sec = delta.tv_usec = 0; |
175 | n = select(maxfd + 1, |
176 | &ev->mode[ev_read].tripped, |
177 | &ev->mode[ev_write].tripped, |
178 | &ev->mode[ev_except].tripped, |
179 | &delta); |
180 | } else { |
181 | n = select(maxfd + 1, |
182 | &ev->mode[ev_read].tripped, |
183 | &ev->mode[ev_write].tripped, |
184 | &ev->mode[ev_except].tripped, |
185 | 0); |
186 | } |
187 | } while(n < 0 && errno == EINTR); |
188 | xsigprocmask(SIG_BLOCK, &ev->sigmask, 0); |
189 | if(n < 0) { |
190 | error(errno, "error calling select"); |
191 | return -1; |
192 | } |
193 | if(n > 0) { |
194 | /* if anything deranges the meaning of an fd, or re-orders the |
195 | * fds[] tables, we'd better give up; such operations will |
196 | * therefore set @escape@. */ |
197 | ev->escape = 0; |
198 | for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode) |
199 | for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) { |
200 | int fd = ev->mode[mode].fds[n].fd; |
201 | if(FD_ISSET(fd, &ev->mode[mode].tripped)) { |
202 | D(("calling %s fd %d callback %p %p", modenames[mode], fd, |
203 | (void *)ev->mode[mode].fds[n].callback, |
204 | ev->mode[mode].fds[n].u)); |
205 | ret = ev->mode[mode].fds[n].callback(ev, fd, |
206 | ev->mode[mode].fds[n].u); |
207 | if(ret) |
208 | return ret; |
209 | } |
210 | } |
211 | } |
212 | /* we'll pick up timeouts back round the loop */ |
213 | } |
214 | } |
215 | |
216 | /* file descriptors ***********************************************************/ |
217 | |
218 | int ev_fd(ev_source *ev, |
219 | ev_fdmode mode, |
220 | int fd, |
221 | ev_fd_callback *callback, |
222 | void *u) { |
223 | int n; |
224 | |
225 | D(("registering %s fd %d callback %p %p", modenames[mode], fd, |
226 | (void *)callback, u)); |
227 | assert(mode < ev_nmodes); |
228 | if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) { |
229 | ev->mode[mode].fdslots = (ev->mode[mode].fdslots |
230 | ? 2 * ev->mode[mode].fdslots : 16); |
231 | D(("expanding %s fd table to %d entries", modenames[mode], |
232 | ev->mode[mode].fdslots)); |
233 | ev->mode[mode].fds = xrealloc(ev->mode[mode].fds, |
234 | ev->mode[mode].fdslots * sizeof (struct fd)); |
235 | } |
236 | n = ev->mode[mode].nfds++; |
237 | FD_SET(fd, &ev->mode[mode].enabled); |
238 | ev->mode[mode].fds[n].fd = fd; |
239 | ev->mode[mode].fds[n].callback = callback; |
240 | ev->mode[mode].fds[n].u = u; |
241 | if(fd > ev->mode[mode].maxfd) |
242 | ev->mode[mode].maxfd = fd; |
243 | ev->escape = 1; |
244 | return 0; |
245 | } |
246 | |
247 | int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) { |
248 | int n; |
249 | int maxfd; |
250 | |
251 | D(("cancelling mode %s fd %d", modenames[mode], fd)); |
252 | /* find the right struct fd */ |
253 | for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n) |
254 | ; |
255 | assert(n < ev->mode[mode].nfds); |
256 | /* swap in the last fd and reduce the count */ |
257 | if(n != ev->mode[mode].nfds - 1) |
258 | ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1]; |
259 | --ev->mode[mode].nfds; |
260 | /* if that was the biggest fd, find the new biggest one */ |
261 | if(fd == ev->mode[mode].maxfd) { |
262 | maxfd = 0; |
263 | for(n = 0; n < ev->mode[mode].nfds; ++n) |
264 | if(ev->mode[mode].fds[n].fd > maxfd) |
265 | maxfd = ev->mode[mode].fds[n].fd; |
266 | ev->mode[mode].maxfd = maxfd; |
267 | } |
268 | /* don't tell select about this fd any more */ |
269 | FD_CLR(fd, &ev->mode[mode].enabled); |
270 | ev->escape = 1; |
271 | return 0; |
272 | } |
273 | |
274 | int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) { |
275 | D(("enabling mode %s fd %d", modenames[mode], fd)); |
276 | FD_SET(fd, &ev->mode[mode].enabled); |
277 | return 0; |
278 | } |
279 | |
280 | int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) { |
281 | D(("disabling mode %s fd %d", modenames[mode], fd)); |
282 | FD_CLR(fd, &ev->mode[mode].enabled); |
283 | FD_CLR(fd, &ev->mode[mode].tripped); |
284 | return 0; |
285 | } |
286 | |
287 | /* timeouts *******************************************************************/ |
288 | |
289 | int ev_timeout(ev_source *ev, |
290 | ev_timeout_handle *handlep, |
291 | const struct timeval *when, |
292 | ev_timeout_callback *callback, |
293 | void *u) { |
294 | struct timeout *t, *p, **pp; |
295 | |
296 | D(("registering timeout at %ld.%ld callback %p %p", |
297 | when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0, |
298 | (void *)callback, u)); |
299 | t = xmalloc(sizeof *t); |
300 | if(when) |
301 | t->when = *when; |
302 | t->callback = callback; |
303 | t->u = u; |
304 | pp = &ev->timeouts; |
305 | while((p = *pp) && gt(&t->when, &p->when)) |
306 | pp = &p->next; |
307 | t->next = p; |
308 | *pp = t; |
309 | if(handlep) |
310 | *handlep = t; |
311 | return 0; |
312 | } |
313 | |
314 | int ev_timeout_cancel(ev_source *ev, |
315 | ev_timeout_handle handle) { |
316 | struct timeout *t = handle, *p, **pp; |
317 | |
318 | for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next) |
319 | ; |
320 | if(p) { |
321 | *pp = p->next; |
322 | return 0; |
323 | } else |
324 | return -1; |
325 | } |
326 | |
327 | /* signals ********************************************************************/ |
328 | |
329 | static int sigfd[NSIG]; |
330 | |
331 | static void sighandler(int s) { |
332 | unsigned char sc = s; |
333 | static const char errmsg[] = "error writing to signal pipe"; |
334 | |
335 | /* probably the reader has stopped listening for some reason */ |
336 | if(write(sigfd[s], &sc, 1) < 0) { |
337 | write(2, errmsg, sizeof errmsg - 1); |
338 | abort(); |
339 | } |
340 | } |
341 | |
342 | static int signal_read(ev_source *ev, |
343 | int attribute((unused)) fd, |
344 | void attribute((unused)) *u) { |
345 | unsigned char s; |
346 | int n; |
347 | int ret; |
348 | |
349 | if((n = read(ev->sigpipe[0], &s, 1)) == 1) |
350 | if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u))) |
351 | return ret; |
352 | assert(n != 0); |
353 | if(n < 0 && (errno != EINTR && errno != EAGAIN)) { |
354 | error(errno, "error reading from signal pipe %d", ev->sigpipe[0]); |
355 | return -1; |
356 | } |
357 | return 0; |
358 | } |
359 | |
360 | static void close_sigpipe(ev_source *ev) { |
361 | int save_errno = errno; |
362 | |
363 | xclose(ev->sigpipe[0]); |
364 | xclose(ev->sigpipe[1]); |
365 | ev->sigpipe[0] = ev->sigpipe[1] = -1; |
366 | errno = save_errno; |
367 | } |
368 | |
369 | int ev_signal(ev_source *ev, |
370 | int sig, |
371 | ev_signal_callback *callback, |
372 | void *u) { |
373 | int n; |
374 | struct sigaction sa; |
375 | |
376 | D(("registering signal %d handler callback %p %p", sig, (void *)callback, u)); |
377 | assert(sig > 0); |
378 | assert(sig < NSIG); |
379 | assert(sig <= UCHAR_MAX); |
380 | if(ev->sigpipe[0] == -1) { |
381 | D(("creating signal pipe")); |
382 | xpipe(ev->sigpipe); |
383 | D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1])); |
384 | for(n = 0; n < 2; ++n) { |
385 | nonblock(ev->sigpipe[n]); |
386 | cloexec(ev->sigpipe[n]); |
387 | } |
388 | if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0)) { |
389 | close_sigpipe(ev); |
390 | return -1; |
391 | } |
392 | } |
393 | sigaddset(&ev->sigmask, sig); |
394 | xsigprocmask(SIG_BLOCK, &ev->sigmask, 0); |
395 | sigfd[sig] = ev->sigpipe[1]; |
396 | ev->signals[sig].callback = callback; |
397 | ev->signals[sig].u = u; |
398 | sa.sa_handler = sighandler; |
399 | sigfillset(&sa.sa_mask); |
400 | sa.sa_flags = SA_RESTART; |
401 | xsigaction(sig, &sa, &ev->signals[sig].oldsa); |
402 | ev->escape = 1; |
403 | return 0; |
404 | } |
405 | |
406 | int ev_signal_cancel(ev_source *ev, |
407 | int sig) { |
408 | sigset_t ss; |
409 | |
410 | xsigaction(sig, &ev->signals[sig].oldsa, 0); |
411 | ev->signals[sig].callback = 0; |
412 | ev->escape = 1; |
413 | sigdelset(&ev->sigmask, sig); |
414 | sigemptyset(&ss); |
415 | sigaddset(&ss, sig); |
416 | xsigprocmask(SIG_UNBLOCK, &ss, 0); |
417 | return 0; |
418 | } |
419 | |
420 | void ev_signal_atfork(ev_source *ev) { |
421 | int sig; |
422 | |
423 | if(ev->sigpipe[0] != -1) { |
424 | /* revert any handled signals to their original state */ |
425 | for(sig = 1; sig < NSIG; ++sig) { |
426 | if(ev->signals[sig].callback != 0) |
427 | xsigaction(sig, &ev->signals[sig].oldsa, 0); |
428 | } |
429 | /* and then unblock them */ |
430 | xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0); |
431 | /* don't want a copy of the signal pipe open inside the fork */ |
432 | xclose(ev->sigpipe[0]); |
433 | xclose(ev->sigpipe[1]); |
434 | } |
435 | } |
436 | |
437 | /* child processes ************************************************************/ |
438 | |
439 | static int sigchld_callback(ev_source *ev, |
440 | int attribute((unused)) sig, |
441 | void attribute((unused)) *u) { |
442 | struct rusage ru; |
443 | pid_t r; |
444 | int status, n, ret, revisit; |
445 | |
446 | do { |
447 | revisit = 0; |
448 | for(n = 0; n < ev->nchildren; ++n) { |
449 | r = wait4(ev->children[n].pid, |
450 | &status, |
451 | ev->children[n].options | WNOHANG, |
452 | &ru); |
453 | if(r > 0) { |
454 | ev_child_callback *c = ev->children[n].callback; |
455 | void *cu = ev->children[n].u; |
456 | |
457 | if(WIFEXITED(status) || WIFSIGNALED(status)) |
458 | ev_child_cancel(ev, r); |
459 | revisit = 1; |
460 | if((ret = c(ev, r, status, &ru, cu))) |
461 | return ret; |
462 | } else if(r < 0) { |
463 | /* We should "never" get an ECHILD but it can in fact happen. For |
464 | * instance on Linux 2.4.31, and probably other versions, if someone |
465 | * straces a child process and then a different child process |
466 | * terminates, when we wait4() the trace process we will get ECHILD |
467 | * because it has been reparented to strace. Obviously this is a |
468 | * hopeless design flaw in the tracing infrastructure, but we don't |
469 | * want the disorder server to bomb out because of it. So we just log |
470 | * the problem and ignore it. |
471 | */ |
472 | error(errno, "error calling wait4 for PID %lu (broken ptrace?)", |
473 | (unsigned long)ev->children[n].pid); |
474 | if(errno != ECHILD) |
475 | return -1; |
476 | } |
477 | } |
478 | } while(revisit); |
479 | return 0; |
480 | } |
481 | |
482 | int ev_child_setup(ev_source *ev) { |
483 | D(("installing SIGCHLD handler")); |
484 | return ev_signal(ev, SIGCHLD, sigchld_callback, 0); |
485 | } |
486 | |
487 | int ev_child(ev_source *ev, |
488 | pid_t pid, |
489 | int options, |
490 | ev_child_callback *callback, |
491 | void *u) { |
492 | int n; |
493 | |
494 | D(("registering child handling %ld options %d callback %p %p", |
495 | (long)pid, options, (void *)callback, u)); |
496 | assert(ev->signals[SIGCHLD].callback == sigchld_callback); |
497 | if(ev->nchildren >= ev->nchildslots) { |
498 | ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16; |
499 | ev->children = xrealloc(ev->children, |
500 | ev->nchildslots * sizeof (struct child)); |
501 | } |
502 | n = ev->nchildren++; |
503 | ev->children[n].pid = pid; |
504 | ev->children[n].options = options; |
505 | ev->children[n].callback = callback; |
506 | ev->children[n].u = u; |
507 | return 0; |
508 | } |
509 | |
510 | int ev_child_cancel(ev_source *ev, |
511 | pid_t pid) { |
512 | int n; |
513 | |
514 | for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n) |
515 | ; |
516 | assert(n < ev->nchildren); |
517 | if(n != ev->nchildren - 1) |
518 | ev->children[n] = ev->children[ev->nchildren - 1]; |
519 | --ev->nchildren; |
520 | return 0; |
521 | } |
522 | |
523 | /* socket listeners ***********************************************************/ |
524 | |
525 | struct listen_state { |
526 | ev_listen_callback *callback; |
527 | void *u; |
528 | }; |
529 | |
530 | static int listen_callback(ev_source *ev, int fd, void *u) { |
531 | const struct listen_state *l = u; |
532 | int newfd; |
533 | union { |
534 | struct sockaddr_in in; |
535 | #if HAVE_STRUCT_SOCKADDR_IN6 |
536 | struct sockaddr_in6 in6; |
537 | #endif |
538 | struct sockaddr_un un; |
539 | struct sockaddr sa; |
540 | } addr; |
541 | socklen_t addrlen; |
542 | int ret; |
543 | |
544 | D(("callback for listener fd %d", fd)); |
545 | while((addrlen = sizeof addr), |
546 | (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) { |
547 | if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u))) |
548 | return ret; |
549 | } |
550 | switch(errno) { |
551 | case EINTR: |
552 | case EAGAIN: |
553 | break; |
554 | #ifdef ECONNABORTED |
555 | case ECONNABORTED: |
556 | error(errno, "error calling accept"); |
557 | break; |
558 | #endif |
559 | #ifdef EPROTO |
560 | case EPROTO: |
561 | /* XXX on some systems EPROTO should be fatal, but we don't know if |
562 | * we're running on one of them */ |
563 | error(errno, "error calling accept"); |
564 | break; |
565 | #endif |
566 | default: |
567 | fatal(errno, "error calling accept"); |
568 | break; |
569 | } |
570 | if(errno != EINTR && errno != EAGAIN) |
571 | error(errno, "error calling accept"); |
572 | return 0; |
573 | } |
574 | |
575 | int ev_listen(ev_source *ev, |
576 | int fd, |
577 | ev_listen_callback *callback, |
578 | void *u) { |
579 | struct listen_state *l = xmalloc(sizeof *l); |
580 | |
581 | D(("registering listener fd %d callback %p %p", fd, (void *)callback, u)); |
582 | l->callback = callback; |
583 | l->u = u; |
584 | return ev_fd(ev, ev_read, fd, listen_callback, l); |
585 | } |
586 | |
587 | int ev_listen_cancel(ev_source *ev, int fd) { |
588 | D(("cancelling listener fd %d", fd)); |
589 | return ev_fd_cancel(ev, ev_read, fd); |
590 | } |
591 | |
592 | /* buffer *********************************************************************/ |
593 | |
/* A growable byte buffer.  The bytes in use lie between @start@ and @end@;
 * the allocation runs from @base@ to @top@. */
struct buffer {
  char *base;                           /* start of the allocation */
  char *start;                          /* first byte in use */
  char *end;                            /* one past the last byte in use */
  char *top;                            /* one past the end of the allocation */
};
597 | |
598 | /* make sure there is @bytes@ available at @b->end@ */ |
599 | static void buffer_space(struct buffer *b, size_t bytes) { |
600 | D(("buffer_space %p %p %p %p want %lu", |
601 | (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top, |
602 | (unsigned long)bytes)); |
603 | if(b->start == b->end) |
604 | b->start = b->end = b->base; |
605 | if((size_t)(b->top - b->end) < bytes) { |
606 | if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) { |
607 | size_t newspace = b->end - b->start + bytes, n; |
608 | char *newbase; |
609 | |
610 | for(n = 16; n < newspace; n *= 2) |
611 | ; |
612 | newbase = xmalloc_noptr(n); |
613 | memcpy(newbase, b->start, b->end - b->start); |
614 | b->base = newbase; |
615 | b->end = newbase + (b->end - b->start); |
616 | b->top = newbase + n; |
617 | b->start = newbase; /* must be last */ |
618 | } else { |
619 | memmove(b->base, b->start, b->end - b->start); |
620 | b->end = b->base + (b->end - b->start); |
621 | b->start = b->base; |
622 | } |
623 | } |
624 | D(("result %p %p %p %p", |
625 | (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top)); |
626 | } |
627 | |
628 | /* buffered writer ************************************************************/ |
629 | |
630 | struct ev_writer { |
631 | struct sink s; |
632 | struct buffer b; |
633 | int fd; |
634 | int eof; |
635 | ev_error_callback *callback; |
636 | void *u; |
637 | ev_source *ev; |
638 | }; |
639 | |
640 | static int writer_callback(ev_source *ev, int fd, void *u) { |
641 | ev_writer *w = u; |
642 | int n; |
643 | |
644 | n = write(fd, w->b.start, w->b.end - w->b.start); |
645 | D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d", |
646 | fd, (long)(w->b.end - w->b.start), n, errno)); |
647 | if(n >= 0) { |
648 | w->b.start += n; |
649 | if(w->b.start == w->b.end) { |
650 | if(w->eof) { |
651 | ev_fd_cancel(ev, ev_write, fd); |
652 | return w->callback(ev, fd, 0, w->u); |
653 | } else |
654 | ev_fd_disable(ev, ev_write, fd); |
655 | } |
656 | } else { |
657 | switch(errno) { |
658 | case EINTR: |
659 | case EAGAIN: |
660 | break; |
661 | default: |
662 | ev_fd_cancel(ev, ev_write, fd); |
663 | return w->callback(ev, fd, errno, w->u); |
664 | } |
665 | } |
666 | return 0; |
667 | } |
668 | |
669 | static int ev_writer_write(struct sink *sk, const void *s, int n) { |
670 | ev_writer *w = (ev_writer *)sk; |
671 | |
672 | buffer_space(&w->b, n); |
673 | if(w->b.start == w->b.end) |
674 | ev_fd_enable(w->ev, ev_write, w->fd); |
675 | memcpy(w->b.end, s, n); |
676 | w->b.end += n; |
677 | return 0; |
678 | } |
679 | |
680 | ev_writer *ev_writer_new(ev_source *ev, |
681 | int fd, |
682 | ev_error_callback *callback, |
683 | void *u) { |
684 | ev_writer *w = xmalloc(sizeof *w); |
685 | |
686 | D(("registering writer fd %d callback %p %p", fd, (void *)callback, u)); |
687 | w->s.write = ev_writer_write; |
688 | w->fd = fd; |
689 | w->callback = callback; |
690 | w->u = u; |
691 | w->ev = ev; |
692 | if(ev_fd(ev, ev_write, fd, writer_callback, w)) |
693 | return 0; |
694 | ev_fd_disable(ev, ev_write, fd); |
695 | return w; |
696 | } |
697 | |
698 | struct sink *ev_writer_sink(ev_writer *w) { |
699 | return &w->s; |
700 | } |
701 | |
702 | static int writer_shutdown(ev_source *ev, |
703 | const attribute((unused)) struct timeval *now, |
704 | void *u) { |
705 | ev_writer *w = u; |
706 | |
707 | return w->callback(ev, w->fd, 0, w->u); |
708 | } |
709 | |
710 | int ev_writer_close(ev_writer *w) { |
711 | D(("close writer fd %d", w->fd)); |
712 | w->eof = 1; |
713 | if(w->b.start == w->b.end) { |
714 | /* we're already finished */ |
715 | ev_fd_cancel(w->ev, ev_write, w->fd); |
716 | return ev_timeout(w->ev, 0, 0, writer_shutdown, w); |
717 | } |
718 | return 0; |
719 | } |
720 | |
721 | int ev_writer_cancel(ev_writer *w) { |
722 | D(("cancel writer fd %d", w->fd)); |
723 | return ev_fd_cancel(w->ev, ev_write, w->fd); |
724 | } |
725 | |
726 | int ev_writer_flush(ev_writer *w) { |
727 | return writer_callback(w->ev, w->fd, w); |
728 | } |
729 | |
730 | /* buffered reader ************************************************************/ |
731 | |
732 | struct ev_reader { |
733 | struct buffer b; |
734 | int fd; |
735 | ev_reader_callback *callback; |
736 | ev_error_callback *error_callback; |
737 | void *u; |
738 | ev_source *ev; |
739 | int eof; |
740 | }; |
741 | |
742 | static int reader_callback(ev_source *ev, int fd, void *u) { |
743 | ev_reader *r = u; |
744 | int n; |
745 | |
746 | buffer_space(&r->b, 1); |
747 | n = read(fd, r->b.end, r->b.top - r->b.end); |
748 | D(("read fd %d buffer %d returned %d errno %d", |
749 | fd, (int)(r->b.top - r->b.end), n, errno)); |
750 | if(n > 0) { |
751 | r->b.end += n; |
752 | return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u); |
753 | } else if(n == 0) { |
754 | r->eof = 1; |
755 | ev_fd_cancel(ev, ev_read, fd); |
756 | return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u); |
757 | } else { |
758 | switch(errno) { |
759 | case EINTR: |
760 | case EAGAIN: |
761 | break; |
762 | default: |
763 | ev_fd_cancel(ev, ev_read, fd); |
764 | return r->error_callback(ev, fd, errno, r->u); |
765 | } |
766 | } |
767 | return 0; |
768 | } |
769 | |
770 | ev_reader *ev_reader_new(ev_source *ev, |
771 | int fd, |
772 | ev_reader_callback *callback, |
773 | ev_error_callback *error_callback, |
774 | void *u) { |
775 | ev_reader *r = xmalloc(sizeof *r); |
776 | |
777 | D(("registering reader fd %d callback %p %p %p", |
778 | fd, (void *)callback, (void *)error_callback, u)); |
779 | r->fd = fd; |
780 | r->callback = callback; |
781 | r->error_callback = error_callback; |
782 | r->u = u; |
783 | r->ev = ev; |
784 | if(ev_fd(ev, ev_read, fd, reader_callback, r)) |
785 | return 0; |
786 | return r; |
787 | } |
788 | |
789 | void ev_reader_buffer(ev_reader *r, size_t nbytes) { |
790 | buffer_space(&r->b, nbytes - (r->b.end - r->b.start)); |
791 | } |
792 | |
793 | void ev_reader_consume(ev_reader *r, size_t n) { |
794 | r->b.start += n; |
795 | } |
796 | |
797 | int ev_reader_cancel(ev_reader *r) { |
798 | D(("cancel reader fd %d", r->fd)); |
799 | return ev_fd_cancel(r->ev, ev_read, r->fd); |
800 | } |
801 | |
802 | int ev_reader_disable(ev_reader *r) { |
803 | D(("disable reader fd %d", r->fd)); |
804 | return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd); |
805 | } |
806 | |
807 | static int reader_continuation(ev_source attribute((unused)) *ev, |
808 | const attribute((unused)) struct timeval *now, |
809 | void *u) { |
810 | ev_reader *r = u; |
811 | |
812 | D(("reader continuation callback fd %d", r->fd)); |
813 | if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1; |
814 | return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u); |
815 | } |
816 | |
817 | int ev_reader_incomplete(ev_reader *r) { |
818 | if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1; |
819 | return ev_timeout(r->ev, 0, 0, reader_continuation, r); |
820 | } |
821 | |
822 | static int reader_enabled(ev_source *ev, |
823 | const attribute((unused)) struct timeval *now, |
824 | void *u) { |
825 | ev_reader *r = u; |
826 | |
827 | D(("reader enabled callback fd %d", r->fd)); |
828 | return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u); |
829 | } |
830 | |
831 | int ev_reader_enable(ev_reader *r) { |
832 | D(("enable reader fd %d", r->fd)); |
833 | return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd)) |
834 | || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0; |
835 | } |
836 | |
837 | /* |
838 | Local Variables: |
839 | c-basic-offset:2 |
840 | comment-column:40 |
841 | fill-column:79 |
842 | End: |
843 | */ |