chiark / gitweb /
Merge and end branch-hostside-wip-2008-01-25 PROPERLY; cvs up -j branch-hostside...
[trains.git] / hostside / obc.c
index 7fb3c1aa632221f786d5f5a67920234000ae1466..33ab3f26b453ade0203c75020028309e2fd3d865 100644 (file)
@@ -1,4 +1,7 @@
-/**/
+/*
+ * daemons
+ * output buffer chains
+ */
 
 #include <assert.h>
 #include <stdlib.h>
@@ -6,51 +9,76 @@
 #include <stdarg.h>
 #include <string.h>
 
-#include "hostside.h"
+#include "daemons.h"
 #include "../layout/dlist.h"
 
-static void *writeable(oop_source *evts, int fd,
-                      oop_event evt, void *ch_v) {
-  OutBufferChain *ch= ch_v;
+struct OutBuffer {
+  OutBuffer *back, *next;
+  char *m;
+  int l;
+};
+
+int obc_tryflush(OutBufferChain *ch) {
   OutBuffer *ob;
   int r;
   
-  assert(fd == ch->fd);
-  assert(evt == OOP_WRITE);
-
   for (;;) {
     ob= ch->obs.head;
     if (!ob) {
-      events->cancel_fd(events, fd, OOP_WRITE);
-      return OOP_CONTINUE;
+      events->cancel_fd(events, ch->fd, OOP_WRITE);
+      return 0;
     }
     if (ch->done_of_head == ob->l) {
       LIST_UNLINK(ch->obs, ob);
-      free(ob);
       free(ob->m);
+      free(ob);
       ch->done_of_head= 0;
       continue;
     }
     r= write(ch->fd, ob->m + ch->done_of_head, ob->l - ch->done_of_head);
     if (r==-1) {
       if (errno==EINTR) continue;
-      if (errno==EWOULDBLOCK) return OOP_CONTINUE;
+      if (errno==EWOULDBLOCK) return errno;
       ch->error(ch,"write",strerror(errno));
+      return errno;
     }
     assert(r>=0);
     ch->done_of_head += r;
+    ch->total -= r;
     assert(ch->done_of_head <= ob->l);
   }
 }
 
+static void *writeable(oop_source *evts, int fd,
+                      oop_event evt, void *ch_v) {
+  OutBufferChain *ch= ch_v;
+  assert(fd == ch->fd);
+  assert(evt == OOP_WRITE);
+  obc_tryflush(ch);
+  return OOP_CONTINUE;
+}
+
 static void addlink(OutBufferChain *ch, OutBuffer *ob) {
   if (!ch->obs.head)
     events->on_fd(events, ch->fd, OOP_WRITE, writeable, ch);
   LIST_LINK_TAIL(ch->obs, ob);
+  ch->total += ob->l;
+  obc_tryflush(ch);
+  if (ch->total > ch->limit) {
+    char what[128];
+    snprintf(what,sizeof(what)-1,"`%.*s...'", ob->l,ob->m);
+    what[sizeof(what)-1]= 0;
+    ch->error(ch,"buffer limit exceeded",what);
+  }
 }
 
 void obc_init(OutBufferChain *ch) {
+  int r;
   ch->done_of_head= 0;
+  ch->total= 0;
+  if (!ch->limit) ch->limit= 128*1024;
+  r= oop_fd_nonblock(ch->fd, 1);
+  if (r) diee("nonblock(OutBufferChain->fd,1)");
   LIST_INIT(ch->obs);
 }
 
@@ -69,6 +97,12 @@ void oprintf(OutBufferChain *ch, const char *msg, ...) {
   va_end(al);
 }
 
+void voerror(OutBufferChain *ch, const char *fmt, va_list al) {
+  oprintf(ch,"error ");
+  ovprintf(ch,fmt,al);
+  owrite(ch,"\n",1);
+}
+
 void owrite(OutBufferChain *ch, const char *data, int l) {
   OutBuffer *ob;
   ob= mmalloc(sizeof(*ob));