#include #include #include #include #include #include #include #include #include #include "buffer.h" #include "util.h" buffer_t *buffer_alloc(void) { buffer_t *buf=malloc(sizeof(buffer_t)); if (!buf) die("buffer_alloc: %s\n", strerror(errno)); memset(buf, 0, sizeof(buffer_t)); buf->in_fd=buf->out_fd=NULL; return buf; } void buffer_passthrough_shutdown_writer(buffer_t *buf) { if (!buf->out_active) return; fd_del_writer(buf->out_fd, &buf->out_fd_list); buf->out_active=0; if (!buf->in_active && buf->notify_finished) (buf->notify_finished)(buf); } void buffer_passthrough_shutdown_reader(buffer_t *buf) { if (!buf->in_active) return; buf->in_active=0; fd_del_reader(buf->in_fd, &buf->in_fd_list); if (!buf->out_active && buf->notify_finished) (buf->notify_finished)(buf); } void buffer_passthrough_waitforempty(buffer_t *buf) { if (buf->used) buf->notify_empty=buffer_passthrough_shutdown_writer; else buffer_passthrough_shutdown_writer(buf); } void buffer_readok(void *object, void *arg1 UNUSED, int arg2 UNUSED) { buffer_t *buf=(buffer_t *)object; int bytesread, end; if (buf->size==buf->used) return; if (buf->used==0) buf->offset=0; end=buf->offset+buf->used; if (buf->offset==0 || end>=buf->size) { if (end>buf->size) end-=buf->size; bytesread=fd_read(buf->in_fd, buf->buffer + end, buf->size-buf->used); } else { struct iovec vec[2]; vec[0].iov_base=buf->buffer+end; vec[0].iov_len=buf->size-end; vec[1].iov_base=buf->buffer; vec[1].iov_len=buf->offset; bytesread=fd_readv(buf->in_fd, vec, 2); } if (bytesread==-1) { if (errno==EINTR || errno==EAGAIN) return; buf->in_error=errno; buf->in_active=0; warn("readv: %s\n", strerror(errno)); fd_unrequest_read(&buf->readok); fd_del_reader(buf->in_fd, &buf->in_fd_list); if (buf->notify_readerror) (buf->notify_readerror)(buf); if (!buf->out_active && buf->notify_finished) (buf->notify_finished)(buf); return; } buf->used+=bytesread; if (bytesread==0) { buf->eof=1; buf->in_active=0; fd_unrequest_read(&buf->readok); fd_del_reader(buf->in_fd, &buf->in_fd_list); if (buf->notify_eof) (buf->notify_eof)(buf); if (!buf->out_active && buf->notify_finished) (buf->notify_finished)(buf); } if (buf->used==buf->size && buf->notify_full) (buf->notify_full)(buf); if (buf->size && buf->used==buf->size) fd_unrequest_read(&buf->readok); if (buf->out_fd && bytesread && buf->used==bytesread) fd_request_write(buf->out_fd, &buf->writeok); if (buf->notify_data) (buf->notify_data)(buf); } void buffer_writeok(void *object, void *arg1 UNUSED, int arg2 UNUSED) { buffer_t *buf=(buffer_t *)object; int byteswritten, end, size; if (!buf->used) return; size=buf->used; if (size>buf->chunk_size) size=buf->chunk_size; end=buf->offset+size; if (end<=buf->size) { byteswritten=fd_write(buf->out_fd, buf->buffer+buf->offset, size); } else { struct iovec vec[2]; vec[0].iov_base=buf->buffer + buf->offset; vec[0].iov_len=buf->size - buf->offset; vec[1].iov_base=buf->buffer; vec[1].iov_len=size - vec[0].iov_len; byteswritten=fd_writev(buf->out_fd, vec, 2); } if (byteswritten==-1) { if (errno==EAGAIN && size>1) buf->chunk_size=size/2; if (errno==EINTR || errno==EAGAIN) return; buf->out_error=errno; buf->out_active=0; warn("writev: %s\n", strerror(buf->out_error)); fd_unrequest_write(&buf->writeok); fd_del_writer(buf->out_fd, &buf->out_fd_list); if (buf->in_active) { fd_unrequest_read(&buf->readok); fd_del_reader(buf->out_fd, &buf->out_fd_list); buf->in_active=0; } if (buf->notify_writeerror) (buf->notify_writeerror)(buf); if (!buf->in_active && buf->notify_finished) (buf->notify_finished)(buf); return; } buf->used-=byteswritten; if (buf->used) { buf->offset+=byteswritten; if (buf->offset>=buf->size) buf->offset-=buf->size; } else { buf->offset=0; } if (buf->chunk_sizesize) buf->chunk_size *= 2; if (buf->used==0 && buf->notify_empty) (buf->notify_empty)(buf); if (buf->used==0) fd_unrequest_write(&buf->writeok); if (buf->in_active && buf->in_fd && buf->used==buf->size-byteswritten) fd_request_read(buf->in_fd, &buf->readok); if (buf->notify_data) (buf->notify_data)(buf); } void buffer_in_misc(void *object, void *arg1 UNUSED, int arg2) { buffer_t *buf=(buffer_t *)object; if (arg2==FD_CLOSE) { if (buf->readok.list.next!=&buf->readok.list) { fd_unrequest_read(&buf->readok); fd_del_reader(buf->in_fd, &buf->in_fd_list); buf->in_active=0; } } } void buffer_out_misc(void *object, void *arg1 UNUSED, int arg2) { buffer_t *buf=(buffer_t *)object; if (arg2==FD_CLOSE) { if (buf->writeok.list.next!=&buf->writeok.list) { fd_unrequest_write(&buf->writeok); fd_del_writer(buf->out_fd, &buf->out_fd_list); } buf->out_active=0; if (buf->in_active) { fd_unrequest_read(&buf->readok); fd_del_reader(buf->in_fd, &buf->in_fd_list); buf->in_active=0; } } } void buffer_init(buffer_t *buf, fd_t *in_fd, fd_t *out_fd, int size, void *arg) { buf->in_fd=in_fd; buf->out_fd=out_fd; buf->offset=0; buf->used=0; buf->size=size; buf->eof=0; buf->in_error=0; buf->in_active=in_fd?in_fd->fd_in:0; buf->out_error=0; buf->out_active=out_fd?out_fd->fd_out:0; buf->chunk_size=size; buf->buffer=NULL; buf->arg=arg; if (size && (!(buf->buffer=malloc(size)))) die("buffer_init: %s\n", strerror(errno)); buf->notify_data=NULL; buf->notify_full=NULL; buf->notify_empty=NULL; buf->notify_eof=NULL; buf->notify_readerror=NULL; buf->notify_writeerror=NULL; buf->notify_finished=NULL; notifier_init(&buf->readok, buf, buffer_readok); notifier_init(&buf->writeok, buf, buffer_writeok); notifier_init(&buf->in_misc, buf, buffer_in_misc); notifier_init(&buf->out_misc, buf, buffer_out_misc); list_init(&buf->in_fd_list); list_init(&buf->out_fd_list); } void buffer_init_incoming(buffer_t *buf, fd_t *fd, int size, void *arg) { buffer_init(buf, fd, NULL, size, arg); fd_add_reader(buf->in_fd, &buf->in_fd_list); if (size) fd_request_read(buf->in_fd, &buf->readok); fd_request_misc(buf->in_fd, &buf->in_misc); } void buffer_init_outgoing(buffer_t *buf, fd_t *fd, int size, void *arg) { buffer_init(buf, NULL, fd, size, arg); fd_add_writer(buf->out_fd, &buf->out_fd_list); fd_request_misc(buf->out_fd, &buf->out_misc); } void buffer_init_passthrough(buffer_t *buf, fd_t *in_fd, fd_t *out_fd, int size, void *arg) { buffer_init(buf, in_fd, out_fd, size, arg); fd_add_reader(buf->in_fd, &buf->in_fd_list); fd_add_writer(buf->out_fd, &buf->out_fd_list); if (size) fd_request_read(buf->in_fd, &buf->readok); fd_request_misc(buf->in_fd, &buf->in_misc); fd_request_misc(buf->out_fd, &buf->out_misc); buf->notify_eof=buffer_passthrough_waitforempty; buf->notify_readerror=buffer_passthrough_waitforempty; buf->notify_writeerror=buffer_passthrough_shutdown_reader; } int buffer_extract_chars(buffer_t *buf, char *dst, int size) { int len, residue; if (size>buf->used) size=buf->used; if (size && buf->size && buf->in_fd && buf->used==buf->size) fd_request_read(buf->in_fd, &buf->readok); len = (buf->offset+size > buf->size)? buf->size-buf->offset : size; memcpy(dst, buf->buffer+buf->offset, len); buf->offset+=len, buf->used-=len; if (buf->offset==buf->size) buf->offset=0; residue=size-len; if (residue) { memcpy(dst+len, buf->buffer, residue); buf->offset=residue, buf->used-=residue; } return size; } int buffer_append_chars(buffer_t *buf, char *src, int size) { int len, residue, start; if (size && buf->size && buf->out_fd && buf->used==0) fd_request_write(buf->out_fd, &buf->writeok); if (size>buf->size-buf->used) size=buf->size-buf->used; start=buf->offset+buf->used; if (start>buf->size) start-=buf->size; len = (start+size > buf->size)? buf->size-start : size; memcpy(buf->buffer+start, src, len); buf->used+=len; residue=size-len; if (residue) { memcpy(buf->buffer, src+len, residue); buf->used+=residue; } return size; }