chiark / gitweb /
wip rcopy-repeatedly
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
index 489ab2daf5ec09e15cc2ae0e8d567fb09de6d36c..3b643152148560bf35845d9dfe738cac10bf19ab 100644 (file)
  *   client sends
  *    - 0x01   go
  * then for each update
- *   sender sends
- *    - 0x02   update using rle and 8-bit counts
- *    - zero or more repetitions of
- *        n    single byte giving length of data same as last time
- *        d    single byte giving length of data changed
- *        ...  d bytes of data
- *        where n!=0 or d!=0 (and for first update, n==0)
- *    - 0x00 0x00
- *       indicates file is complete and should be installed
- *   or server may send
+ *   sender sends one of
  *    - 0x03   destination file should be deleted
  *             but note that contents must be retained by receiver
  *             as it may be used for rle updates
+ *    - 0x04   complete new destination file follows, 64-bit length
+ *        l    8 bytes big endian length
+ *        ...  l bytes data
+ *             receiver must then reply with 0x01 GO
  */
 
-#define REPLMSG_GO   0x01
-#define REPLMSG_RLE8 0x02
-#define REPLMSG_RM   0x03
+#define REPLMSG_GO     0x01
+#define REPLMSG_RLE8   0x02
+#define REPLMSG_RM     0x03
+#define REPLMSG_FILE64 0x04
 
-static void vdie(int ec, const char *fmt, const char *emstr, va_list al) {
+static FILE *commsi, *commso;
+static long interval_usec;
+static int bytes_per_sec_log2;
+
+static void vdie(int ec, int eno, const char *fmt, va_list al) {
   fputs("realtime-replicator: ",stderr);
   vfprintf(stderr,fmt,al);
-  if (emstr) fprintf(stderr,": %s",emstr);
+  if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
   fputc('\n',stderr);
   exit(ec);
 }
-static void die(int ec, const char *fmt, const char *emstr, ...) {
+static void die(int ec, int eno, const char *fmt, ...) {
   va_arg al;
   va_start(al,fmt);
-  vdiee(ec,fmt,emstr,al);
+  vdiee(ec,eno,fmt,al);
 }
 
-static void diem(void) { die(16,"malloc failed",strerror(errno)); }
+static void diem(void) { die(16,errno,"malloc failed"); }
 
 static void diee(const char *fmt, ...) {
-  const char *em= strerror(errno);
   va_arg al;
   va_start(al,fmt);
-  diee(8,fmt,em,al);
+  diee(8,errno,fmt,al);
 }
 static void die_protocol(const char *fmt, ...) {
   va_arg al;
   va_start(al,fmt);
-  diee(12,fmt,0,al);
+  diee(12,-1,fmt,al);
 
-static void die_badrecv(FILE *f, const char *what) {
-  if (ferror(f)) diee("transmission failed while receiving %s", what);
-  if (feof(f)) die_protocol("receiver got unexpected EOF in %s", what);
+static void die_badrecv(const char *what) {
+  if (ferror(commsi)) diee("communication failed while receiving %s", what);
+  if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
   abort();
 }
+static void die_badsend(void) {
+  diee("transmission failed");
+}
 
 static void receiver_write(const unsigned char *buf, int n,
                           FILE *newfile, const char *tmpfilename) {
@@ -72,9 +74,40 @@ static void receiver_write(const unsigned char *buf, int n,
   if (r != n) diee("failed to write temporary receiving file `%s'",
                   tmpfilename);
 }
+
+typedef static void copyfile_die_fn(FILE *f, const char *xi);
  
-static void receiver(const char *filename, FILE *comms) {
-  FILE *lastfile=0, *newfile;
+static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi
+                    FILE *df, copyfile_die_fn *ddie, const char *dxi,
+                    uint64_t l) {
+  char buf[65536];
+
+fixme rate limit
+fixme adjustable chunk size
+
+  while (l>=0) {
+    now= l < sizeof(buf) ? l : sizeof(buf);
+
+    
+
+    r= fread(buf,1,now,sf);  if (r!=now) sdie(sf,sxi);
+    r= fwrite(buf,1,now,df);  if (r!=now) ddie(df,dxi);
+    l -= now;
+  }
+}
+
+static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
+  diee("write failed to temporary receiving file `%s'", tmpfilename);
+}
+static void copydie_commsi(FILE *f, const char *what) {
+  die_badrecv(what);
+}
+static void copydie_commso(FILE *f, const char *what) {
+  die_badsend();
+}
+  
+static void receiver(const char *filename) {
+  FILE *newfile;
   char *tmpfilename;
 
   if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem();
@@ -84,120 +117,104 @@ static void receiver(const char *filename, FILE *comms) {
     diee("could not remove temporary receiving file `%s'", tmpfilename);
   
   for (;;) {
-    c= fgetc(comms);  if (c==EOF) break;
+    send_flush();
+    c= fgetc(commsi);
+
     switch (c) {
 
-    case REPLMSG_RLE8:
+    case EOF:
+      if (ferror(commsi)) die_badrecv("transfer message code");
+      assert(feof(commsi));
+      return;
+
+    case REPLMSG_FILE64:
       newfile= fopen(tmpfilename, "w");
       if (!newfile) diee("could not create temporary receiving file `%s'",
                         tmpfilename);
-      for (;;) {
-       unsigned char lu[2], dbuf[255];
-       r= fread(lu,1,2,comms);  if (r!=2) die_badrecv(comms,"RLE8 elem hdr");
-       if (!lu[0] && !lu[1]) break;
-       if (lu[0]) {
-         if (!lastfile) die_protocol("first RLE8 requests some previous");
-         r= fread(dbuf,1,lu[0],lastfile);
-         if (r!=lu[0]) {
-           if (ferror(lastfile)) diee("could not read current (old) file"
-                                      " `%s'", filename);
-           assert(feof(lastfile));
-           die_protocol("RLE8 requests more previous than is available");
-         }
-         receiver_write(dbuf,lu[0],newfile,tmpfilename);
-       }
-       if (lu[1]) {
-         r= fread(dbuf,1,lu[1],comms);
-         if (r!=lu[1]) die_badrecv(comms,"RLE8 literal data");
-         receiver_write(dbuf,lu[1],newfile,tmpfilename);
-       }
-      }
-      if (fflush(newfile)) diee("could not flush temporary receiving file"
-                               " `%s'", tmpfilename);
+      uint8_t lbuf[8];
+      r= fread(lbuf,1,8,commsi);  if (r!=8) die_badrecv("FILE64 l");
+
+      uint64_t l=
+       (lbuf[0] << 28 << 28) |
+       (lbuf[1] << 24 << 24) |
+       (lbuf[2] << 16 << 24) |
+       (lbuf[3] <<  8 << 24) |
+       (lbuf[4]       << 24) |
+       (lbuf[5]       << 16) |
+       (lbuf[6]       <<  8) |
+       (lbuf[7]            ) ;
+
+      copyfile(commsi, copydie_commsi,"FILE64 file data",
+              newfile, copydie_tmpwrite,tmpfilename,
+              l);
+
+      if (fclose(newfile)) diee("could not flush and close temporary"
+                               " receiving file `%s'", tmpfilename);
       if (rename(tmpfilename, filename))
        diee("could not install new version of destination file `%s'",
             filename);
-      if (fclose(lastfile)) diee("failed to close old (now unlinked) file");
-      lastfile= newfile;
-      r= fseek(lastfile,0,SEEK_SET);
-      if (r) diee("failed to seek installed destination file `%s'", filename);
-      continue;
 
-    case REPLMSG_RM:
-      r= unlink(filename);
-      if (r && errno!=ENOENT)
-       diee("could not remove destination file `%s' (as instructed"
-            " by sender)", filename);
-      continue;
+      sendbyte(REPLMSG_GO);
 
     default:
       die_protocol("unknown transfer message code 0x%02x",c);
+
     }
   }
-  if (feof(comms)) return 0;
-  die_badrecv(comms,"transfer message code");
 }
 
- static void sender(const char *filename, FILE *comms,
-                   unsigned long interval_usec) {
-  struct stat stab, laststab;
-  memset(&stab,0,sizeof(stab));
-  FILE *lastfile, *newfile;
+static void sender(const char *filename) {
+  FILE *f;
   int told_removed= 0;
+  struct stat stab;
 
   for (;;) {
-    r= stat(filename, &stab);
-    if (r) {
-      newfile= 0;
-    } else {
-      if (stab.st_dev == laststab.st_dev &&
-         stab.st_ino == laststab.st_ino) {
-       usleep(interval_usec);
-       continue;
-      }
-      newfile= fopen(filename, "r");
-    }
-    if (!newfile) {
+    f= fopen(filename, "r");
+    if (!f) {
       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
       if (told_removed) {
        usleep(interval_usec);
        continue;
       }
-      sender_sendbyte(REPLMSG_RM);
+      told_removed= 1;
+      sendbyte(REPLMSG_RM);
       continue;
     }
 
-    for (;;) {
-      n= 0;
-      while (n<255) {
-       cl= nextchar(lastfile);
-       cn= nextchar(newfile);
-       if (cl!=cn) { 
-
-       if (!feof(lastfile)) {
-         cl= fgetc(lastfile);
-         if (ferror(lastfile))
-           diee("could not read old source file `%s'",filename);
-         assert(cl != EOF || feof(lastfile));
-       }
-       
-      if (errno!=ENOENT) diee("could not open source file `%s'",filename);
-      sender_send
-
-    if (r) 
-}
+    r= fstat(fileno(f),&stab);
+    if (r) diee("could not fstat source file `%s'",filename);
+
+    if (!S_ISREG(stab.st_mode))
+      die(12,-1,"source file `%s' is not a plain file",filename);
 
-static void sender( 
+    uint8_t hbuf[9]= {
+      REPLMSG_FILE64,
+      stab.st_size >> 28 >> 28,
+      stab.st_size >> 24 >> 24,
+      stab.st_size >> 16 >> 24,
+      stab.st_size >>  8 >> 24,
+      stab.st_size       >> 24,
+      stab.st_size       >> 16,
+      stab.st_size       >>  8,
+      stab.st_size
+    };
+    
+    r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
+    told_removed= 0;
 
-           assert(ferror(newfile));
-           
+    copyfile(f, copydie_inputfile,filename,
+            commso, copydie_commso,0);
 
-         r= fread(dbuf,1,lu[0],comms);
-         if (r!=lu[0]) die_badrecv(comms,
-           
+    send_flush();
 
-       c= fgetc(comms);
+    if (fclose(f)) diee("couldn't close source file `%s'",filename);
 
+    c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
+    if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
+
+    usleep(interval_usec);
+  }
+}
 
 int main(int argc, const char **argv) {
   for (