chiark / gitweb /
rcopy-repeatedly compiles, needs debugging
authorianmdlvl <ianmdlvl>
Fri, 3 Oct 2008 22:14:43 +0000 (22:14 +0000)
committerianmdlvl <ianmdlvl>
Fri, 3 Oct 2008 22:14:43 +0000 (22:14 +0000)
cprogs/.cvsignore
cprogs/Makefile
cprogs/rcopy-repeatedly.c

index 8d7246181d8ed83511bee4ff02682aba49a54741..ca3926348253309e38cc5350411d323e3c9ae37a 100644 (file)
@@ -7,3 +7,4 @@ xacpi-simple
 mcastsoundd
 summer
 watershed
 mcastsoundd
 summer
 watershed
+rcopy-repeatedly
index 584cda1d506cc71d7dd36dd38c9adda9207d81f0..e9f19e01bc7f12caeb9d8a3f4c59be337acc0833 100644 (file)
@@ -26,7 +26,7 @@ include ../settings.make
 RWBUFFER_SIZE_MB=16
 
 PROGRAMS=              readbuffer writebuffer with-lock-ex xacpi-simple \
 RWBUFFER_SIZE_MB=16
 
 PROGRAMS=              readbuffer writebuffer with-lock-ex xacpi-simple \
-                       summer watershed
+                       summer watershed rcopy-repeatedly
 SUIDSBINPROGRAMS=      really
 DAEMONS=               trivsoundd
 MAN1PAGES=             readbuffer.1 writebuffer.1 with-lock-ex.1
 SUIDSBINPROGRAMS=      really
 DAEMONS=               trivsoundd
 MAN1PAGES=             readbuffer.1 writebuffer.1 with-lock-ex.1
@@ -41,7 +41,7 @@ writebuffer:                  writebuffer.o   wrbufcore.o     rwbuffer.o
 trivsoundd:                    trivsoundd.o    wrbufcore.o     rwbuffer.o 
 really:                                really.o myopt.o
 
 trivsoundd:                    trivsoundd.o    wrbufcore.o     rwbuffer.o 
 really:                                really.o myopt.o
 
-really.o myopt.o: myopt.h
+really.o myopt.o rcopy-repeatedly.o: myopt.h
 readbuffer.o writebuffer.o rwbuffer.o wrbufcore.o trivsoundd.o:        rwbuffer.h
 
 xacpi-simple:  xacpi-simple.o
 readbuffer.o writebuffer.o rwbuffer.o wrbufcore.o trivsoundd.o:        rwbuffer.h
 
 xacpi-simple:  xacpi-simple.o
@@ -50,6 +50,9 @@ xacpi-simple: xacpi-simple.o
 summer:                summer.o
                $(CC) -o $@ $< -lnettle -lgmp
 
 summer:                summer.o
                $(CC) -o $@ $< -lnettle -lgmp
 
+rcopy-repeatedly: rcopy-repeatedly.o myopt.o
+               $(CC) -o $@ $^ -lm -lrt
+
 watershed:     watershed.o
                $(CC) -o $@ $< -lnettle -lgmp
 
 watershed:     watershed.o
                $(CC) -o $@ $< -lnettle -lgmp
 
index 3b643152148560bf35845d9dfe738cac10bf19ab..399e8aacc6e19a37a617bbae3f1d92cf5886d6ef 100644 (file)
@@ -1,7 +1,11 @@
+/*
+ * rcopy-repeatedly
+ */     
+  
 /*
  * protocol is:
  *   server sends banner
 /*
  * protocol is:
  *   server sends banner
- *    - "chiark-realtime-replicator\n" [27 bytes]
+ *    - "#rcopy-repeatedly#\n"
  *    - length of declaration, as 4 hex digits, zero prefixed,
  *      and a space [5 bytes].  In this protocol version this
  *      will be "0002" but client _must_ parse it.
  *    - length of declaration, as 4 hex digits, zero prefixed,
  *      and a space [5 bytes].  In this protocol version this
  *      will be "0002" but client _must_ parse it.
  *             receiver must then reply with 0x01 GO
  */
 
  *             receiver must then reply with 0x01 GO
  */
 
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <time.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <assert.h>
+#include <math.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "myopt.h"
+
 #define REPLMSG_GO     0x01
 #define REPLMSG_GO     0x01
-#define REPLMSG_RLE8   0x02
 #define REPLMSG_RM     0x03
 #define REPLMSG_FILE64 0x04
 
 #define REPLMSG_RM     0x03
 #define REPLMSG_FILE64 0x04
 
+static const char banner[]= "#rcopy-repeatedly#\n";
+
 static FILE *commsi, *commso;
 static FILE *commsi, *commso;
-static long interval_usec;
-static int bytes_per_sec_log2;
 
 
+static double max_bandwidth_proportion_mean= 0.2;
+static double max_bandwidth_proportion_burst= 0.8;
+static int txblocksz= INT_MAX;
+static int min_interval_usec= 100000; /* 100ms */
+
+static const char *rsh_program= "ssh";
+static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
+
+static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
+
+static char mainbuf[65536]; /* must be at least 2^16 */
+
+#define NORETURN __attribute__((noreturn))
+
+static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
 static void vdie(int ec, int eno, const char *fmt, va_list al) {
 static void vdie(int ec, int eno, const char *fmt, va_list al) {
-  fputs("realtime-replicator: ",stderr);
+  fputs("rcopy-repeatedly: ",stderr);
   vfprintf(stderr,fmt,al);
   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
   fputc('\n',stderr);
   exit(ec);
 }
   vfprintf(stderr,fmt,al);
   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
   fputc('\n',stderr);
   exit(ec);
 }
+static void die(int ec, int eno, const char *fmt, ...) NORETURN;
 static void die(int ec, int eno, const char *fmt, ...) {
 static void die(int ec, int eno, const char *fmt, ...) {
-  va_arg al;
+  va_list al;
   va_start(al,fmt);
   va_start(al,fmt);
-  vdiee(ec,eno,fmt,al);
+  vdie(ec,eno,fmt,al);
 }
 
 }
 
+static void diem(void) NORETURN;
 static void diem(void) { die(16,errno,"malloc failed"); }
 
 static void diem(void) { die(16,errno,"malloc failed"); }
 
+static void diee(const char *fmt, ...) NORETURN;
 static void diee(const char *fmt, ...) {
 static void diee(const char *fmt, ...) {
-  va_arg al;
+  va_list al;
   va_start(al,fmt);
   va_start(al,fmt);
-  diee(8,errno,fmt,al);
+  vdie(12,errno,fmt,al);
 }
 }
+static void die_protocol(const char *fmt, ...) NORETURN;
 static void die_protocol(const char *fmt, ...) {
 static void die_protocol(const char *fmt, ...) {
-  va_arg al;
+  va_list al;
+  fputs("protocol error: ",stderr);
   va_start(al,fmt);
   va_start(al,fmt);
-  diee(12,-1,fmt,al);
+  vdie(10,-1,fmt,al);
+}
 
 
+static void die_badrecv(const char *what) NORETURN;
 static void die_badrecv(const char *what) {
   if (ferror(commsi)) diee("communication failed while receiving %s", what);
   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
   abort();
 }
 static void die_badrecv(const char *what) {
   if (ferror(commsi)) diee("communication failed while receiving %s", what);
   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
   abort();
 }
+static void die_badsend(void) NORETURN;
 static void die_badsend(void) {
   diee("transmission failed");
 }
 
 static void die_badsend(void) {
   diee("transmission failed");
 }
 
-static void receiver_write(const unsigned char *buf, int n,
-                          FILE *newfile, const char *tmpfilename) {
-  int r;
+static void send_flush(void) {
+  if (ferror(commso) || fflush(commso))
+    die_badsend();
+}
+static void sendbyte(int c) {
+  if (putc(c,commso)==EOF)
+    die_badsend();
+}
 
 
-  r= fwrite(dbuf,1,n,newfile);
-  if (r != n) diee("failed to write temporary receiving file `%s'",
-                  tmpfilename);
+static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
+static void mdup2(int fd, int fd2) {
+  if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
 }
 
 }
 
-typedef static void copyfile_die_fn(FILE *f, const char *xi);
+typedef void copyfile_die_fn(FILE *f, const char *xi);
+
+struct timespec ts_sendstart;
+
+static void mgettime(struct timespec *ts) {
+  int r= clock_gettime(CLOCK_MONOTONIC, ts);
+  if (r) diee("clock_gettime failed");
+}
+
+static void bandlimit_sendstart(void) {
+  mgettime(&ts_sendstart);
+}
+
+static double mgettime_elapsed(struct timespec ts_base,
+                              struct timespec *ts_ret) {
+  mgettime(ts_ret);
+  return (ts_ret->tv_sec - ts_base.tv_sec) +
+         (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
+}
+
+static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
+  struct timespec ts_buf;
+  double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
+  double secsperbyte_observed= elapsed / bytes;
+
+  stream_allow_secsperbyte=
+    secsperbyte_observed / max_bandwidth_proportion_burst;
+
+  double min_update= elapsed * max_bandwidth_proportion_burst
+    / max_bandwidth_proportion_mean;
+  if (min_update > 1e3) min_update= 1e3;
+  int min_update_usec= min_update * 1e6;
+
+  if (*interval_usec_update > min_update_usec)
+    *interval_usec_update= min_update_usec;
+}
  
  
-static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi
+static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
                     FILE *df, copyfile_die_fn *ddie, const char *dxi,
                     uint64_t l) {
                     FILE *df, copyfile_die_fn *ddie, const char *dxi,
                     uint64_t l) {
-  char buf[65536];
+  struct timespec ts_last;
+  int now, r;
 
 
-fixme rate limit
-fixme adjustable chunk size
+  ts_last= ts_sendstart;
 
   while (l>=0) {
 
   while (l>=0) {
-    now= l < sizeof(buf) ? l : sizeof(buf);
+    now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
+    if (now > txblocksz) now= txblocksz;
 
 
-    
+    double elapsed_want= now * stream_allow_secsperbyte;
+    double elapsed= mgettime_elapsed(ts_last, &ts_last);
+    double needwait= elapsed_want - elapsed;
+    if (needwait > 1) needwait= 1;
+    if (needwait > 0) usleep(ceil(needwait * 1e6));
 
 
-    r= fread(buf,1,now,sf);  if (r!=now) sdie(sf,sxi);
-    r= fwrite(buf,1,now,df);  if (r!=now) ddie(df,dxi);
+    r= fread(mainbuf,1,now,sf);  if (r!=now) sdie(sf,sxi);
+    r= fwrite(mainbuf,1,now,df);  if (r!=now) ddie(df,dxi);
     l -= now;
   }
 }
 
     l -= now;
   }
 }
 
+static void copydie_inputfile(FILE *f, const char *filename) {
+  diee("read failed on source file `%s'", filename);
+}
 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
   diee("write failed to temporary receiving file `%s'", tmpfilename);
 }
 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
   diee("write failed to temporary receiving file `%s'", tmpfilename);
 }
@@ -109,8 +202,10 @@ static void copydie_commso(FILE *f, const char *what) {
 static void receiver(const char *filename) {
   FILE *newfile;
   char *tmpfilename;
 static void receiver(const char *filename) {
   FILE *newfile;
   char *tmpfilename;
+  int r, c;
 
 
-  if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem();
+  if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
+      ==-1) diem();
   
   r= unlink(tmpfilename);
   if (r && errno!=ENOENT)
   
   r= unlink(tmpfilename);
   if (r && errno!=ENOENT)
@@ -128,7 +223,7 @@ static void receiver(const char *filename) {
       return;
 
     case REPLMSG_FILE64:
       return;
 
     case REPLMSG_FILE64:
-      newfile= fopen(tmpfilename, "w");
+      newfile= fopen(tmpfilename, "wb");
       if (!newfile) diee("could not create temporary receiving file `%s'",
                         tmpfilename);
       uint8_t lbuf[8];
       if (!newfile) diee("could not create temporary receiving file `%s'",
                         tmpfilename);
       uint8_t lbuf[8];
@@ -164,23 +259,43 @@ static void receiver(const char *filename) {
 }
 
 static void sender(const char *filename) {
 }
 
 static void sender(const char *filename) {
-  FILE *f;
-  int told_removed= 0;
-  struct stat stab;
-
+  FILE *f, *fold;
+  int interval_usec, r, c;
+  struct stat stabtest, stab;
+  enum { told_nothing, told_file, told_remove } told;
+
+  interval_usec= 0;
+  fold= 0;
+  told= told_nothing;
+  
   for (;;) {
   for (;;) {
-    f= fopen(filename, "r");
+    if (interval_usec) usleep(interval_usec);
+    interval_usec= min_interval_usec;
+
+    r= stat(filename, &stabtest);
+    if (r) {
+      f= 0;
+    } else {
+      if (told == told_file &&
+         stabtest.st_mode == stab.st_mode &&
+         stabtest.st_dev == stab.st_dev &&
+         stabtest.st_ino == stab.st_ino)
+       continue;
+      f= fopen(filename, "rb");
+    }
+    
     if (!f) {
       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
     if (!f) {
       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
-      if (told_removed) {
-       usleep(interval_usec);
-       continue;
+      if (told != told_remove) {
+       sendbyte(REPLMSG_RM);
+       told= told_remove;
       }
       }
-      told_removed= 1;
-      sendbyte(REPLMSG_RM);
       continue;
     }
 
       continue;
     }
 
+    if (fold) fclose(fold);
+    fold= 0;
+
     r= fstat(fileno(f),&stab);
     if (r) diee("could not fstat source file `%s'",filename);
 
     r= fstat(fileno(f),&stab);
     if (r) diee("could not fstat source file `%s'",filename);
 
@@ -198,12 +313,14 @@ static void sender(const char *filename) {
       stab.st_size       >>  8,
       stab.st_size
     };
       stab.st_size       >>  8,
       stab.st_size
     };
+
+    bandlimit_sendstart();
     
     r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
     
     r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
-    told_removed= 0;
 
     copyfile(f, copydie_inputfile,filename,
 
     copyfile(f, copydie_inputfile,filename,
-            commso, copydie_commso,0);
+            commso, copydie_commso,0,
+            stab.st_size);
 
     send_flush();
 
 
     send_flush();
 
@@ -212,9 +329,158 @@ static void sender(const char *filename) {
     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
 
     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
 
-    usleep(interval_usec);
+    bandlimit_sendend(stab.st_size, &interval_usec);
+
+    fold= f;
+    told= told_file;
+  }
+}
+
+void usagemessage(void) {
+  puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
+       " <file> may be <local-file> or [<user>@]<host>:<file>\n"
+       " exactly one of each of the two forms must be provided\n"
+       " a file is taken as remote if it has a : before the first /\n"
+       "options\n"
+       " --help\n");
+}
+
+static void of_help(const struct cmdinfo *ci, const char *val) {
+  usagemessage();
+  if (ferror(stdout)) diee("could not write usage message to stdout");
+  exit(0);
+}
+
+typedef struct {
+  const char *userhost, *path;
+} FileSpecification;
+
+static FileSpecification srcspec, dstspec;
+static int upload=-1; /* -1 means not yet known; 0 means download */
+
+static const struct cmdinfo cmdinfos[]= {
+  { "help",     .call= of_help },
+  { "receiver", .iassignto=&upload, .arg=1 },
+  { "sender",   .iassignto=&upload, .arg=0 },
+  { 0 }
+};
+
+static void server(const char *filename) {
+  int c;
+  commsi= stdin;
+  commso= stdout;
+  fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
+  send_flush();
+  c= fgetc(commsi);  if (c==EOF) die_badrecv("initial go");
+  if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
+
+  if (upload)
+    receiver(filename);
+  else
+    sender(filename);
+}
+
+static void client(void) {
+  int uppipe[2], downpipe[2], r;
+  pid_t child;
+
+  mpipe(uppipe);
+  mpipe(downpipe);
+
+  FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
+  const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
+
+  child= fork();
+  if (child==-1) diee("fork failed");
+  if (!child) {
+    mdup2(uppipe[0],0);
+    mdup2(downpipe[1],1);
+    close(uppipe[0]); close(downpipe[0]);
+    close(uppipe[1]); close(downpipe[1]);
+    execlp(rsh_program,
+          rsh_program, remotespec->userhost, rcopy_repeatedly_program,
+          remotemode, remotespec->path, (char*)0);
+    diee("failed to execute rsh program `%s'",rsh_program);
   }
   }
+
+  commso= fdopen(uppipe[1],"wb");
+  commsi= fdopen(downpipe[0],"rb");
+  if (!commso || !commsi) diee("fdopen failed");
+  close(uppipe[0]); close(downpipe[0]);
+  close(uppipe[1]); close(downpipe[1]);
+  
+  char banbuf[sizeof(banner)-1 + 5 + 1];
+  r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
+  if (r!=sizeof(banbuf)-1) die_badrecv("banner");
+
+  if (memcmp(banbuf,banner,sizeof(banner)-1) ||
+      banbuf[sizeof(banner)-1 + 4] != ' ')
+    die(8,-1,"banner received was not as expected -"
+       " shell dirty? ssh broken?\n"
+       " try running   %s %s %s --sender %s\n"
+       " and expect the first line to be   %s",
+       rsh_program, remotespec->userhost,
+       rcopy_repeatedly_program, remotespec->path,
+       banner);
+  
+  banbuf[sizeof(banbuf)-1]= 0;
+  char *ep;
+  long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
+  if (*ep) die_protocol("declaration length syntax error");
+  assert(decllen <= sizeof(mainbuf));
+  if (decllen<2) die_protocol("declaration too short");
+
+  r= fread(mainbuf,1,decllen,commsi);
+  if (r!=decllen) die_badrecv("declaration");
+  if (mainbuf[decllen-1] != '\n')
+    die_protocol("declaration missing final newline");
+  if (mainbuf[0] != upload ? 'u' : 'd')
+    die_protocol("declaration incorrect direction indicator");
+
+  sendbyte(REPLMSG_GO);
+
+  if (upload)
+    sender(srcspec.path);
+  else
+    receiver(dstspec.path);
 }
 
 }
 
-int main(int argc, const char **argv) {
-  for (
+static void parse_file_specification(FileSpecification *fs, const char *arg,
+                                    const char *what) {
+  const char *colon;
+  
+  if (!arg) badusage("too few arguments - missing %s\n",what);
+
+  for (colon=arg; ; colon++) {
+    if (!*colon || *colon=='/') {
+      fs->userhost=0;
+      fs->path= arg;
+      return;
+    }
+    if (*colon==':') {
+      char *uh= malloc(colon-arg + 1);  if (!fs->userhost) diem();
+      memcpy(uh,arg, colon-arg);  uh[colon-arg]= 0;
+      fs->userhost= uh;
+      fs->path= colon+1;
+      return;
+    }
+  }
+}
+
+int main(int argc, const char *const *argv) {
+  myopt(&argv, cmdinfos);
+
+  if (upload>=0) {
+    if (!argv[0] || argv[1])
+      badusage("server mode must have just the filename as non-option arg");
+    server(argv[0]);
+  } else {
+    parse_file_specification(&srcspec, argv[0], "source");
+    parse_file_specification(&dstspec, argv[1], "destination");
+    if (argv[2]) badusage("too many non-option arguments");
+    if (!!srcspec.userhost == !!dstspec.userhost)
+      badusage("need exactly one remote file argument");
+    client();
+  }
+  return 0;
+}