chiark / gitweb /
rcopy-repeatedly seems to mostly work
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
index 399e8aacc6e19a37a617bbae3f1d92cf5886d6ef..86cd3170388c2d9a66d32b7925616411a8a38874 100644 (file)
@@ -54,13 +54,19 @@ static const char banner[]= "#rcopy-repeatedly#\n";
 
 static FILE *commsi, *commso;
 
-static double max_bandwidth_proportion_mean= 0.2;
-static double max_bandwidth_proportion_burst= 0.8;
-static int txblocksz= INT_MAX;
+static double max_bw_prop_mean= 0.2;
+static double max_bw_prop_burst= 0.8;
+static int txblocksz= INT_MAX, verbose=1;
 static int min_interval_usec= 100000; /* 100ms */
 
-static const char *rsh_program= "ssh";
+static int nsargs;
+static const char **sargs;
+
+static const char *rsh_program= 0;
 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
+static int server_upcopy=-1; /* -1 means not yet known; 0 means download */
+  /* `up' means towards the client,
+   * since we regard the subprocess as `down' */
 
 static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
 
@@ -68,36 +74,53 @@ static char mainbuf[65536]; /* must be at least 2^16 */
 
 #define NORETURN __attribute__((noreturn))
 
-static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
-static void vdie(int ec, int eno, const char *fmt, va_list al) {
+static void vdie(int ec, const char *pfx, int eno,
+                const char *fmt, va_list al) NORETURN;
+static void vdie(int ec, const char *pfx, int eno,
+                const char *fmt, va_list al) {
   fputs("rcopy-repeatedly: ",stderr);
+  if (server_upcopy>=0) fputs("server: ",stderr);
+  if (pfx) fprintf(stderr,"%s: ",pfx);
   vfprintf(stderr,fmt,al);
   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
   fputc('\n',stderr);
   exit(ec);
 }
-static void die(int ec, int eno, const char *fmt, ...) NORETURN;
-static void die(int ec, int eno, const char *fmt, ...) {
+static void die(int ec, const char *pfx, int eno,
+               const char *fmt, ...) NORETURN;
+static void die(int ec, const char *pfx, int eno,
+               const char *fmt, ...) {
   va_list al;
   va_start(al,fmt);
-  vdie(ec,eno,fmt,al);
+  vdie(ec,pfx,eno,fmt,al);
 }
 
 static void diem(void) NORETURN;
-static void diem(void) { die(16,errno,"malloc failed"); }
+static void diem(void) { die(16,0,errno,"malloc failed"); }
+static void *xmalloc(size_t sz) {
+  assert(sz);
+  void *p= malloc(sz);
+  if (!p) diem();
+  return p;
+}
+static void *xrealloc(void *p, size_t sz) {
+  assert(sz);
+  p= realloc(p,sz);
+  if (!p) diem();
+  return p;
+}
 
 static void diee(const char *fmt, ...) NORETURN;
 static void diee(const char *fmt, ...) {
   va_list al;
   va_start(al,fmt);
-  vdie(12,errno,fmt,al);
+  vdie(12,0,errno,fmt,al);
 }
 static void die_protocol(const char *fmt, ...) NORETURN;
 static void die_protocol(const char *fmt, ...) {
   va_list al;
-  fputs("protocol error: ",stderr);
   va_start(al,fmt);
-  vdie(10,-1,fmt,al);
+  vdie(10,"protocol error",-1,fmt,al);
 }
 
 static void die_badrecv(const char *what) NORETURN;
@@ -145,44 +168,78 @@ static double mgettime_elapsed(struct timespec ts_base,
          (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
 }
 
+static void flushstderr(void) {
+  if (ferror(stderr) || fflush(stderr))
+    diee("could not write progress to stderr");
+}
+
+static void verbosespinprintf(const char *fmt, ...) {
+  static const char spinnerchars[]= "/-\\";
+  static int spinnerchar;
+
+  if (!verbose)
+    return;
+
+  va_list al;
+  va_start(al,fmt);
+  fprintf(stderr,"      %c ",spinnerchars[spinnerchar]);
+  spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1;
+  vfprintf(stderr,fmt,al);
+  putc('\r',stderr);
+  flushstderr();
+}
+
 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
   struct timespec ts_buf;
   double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
   double secsperbyte_observed= elapsed / bytes;
 
   stream_allow_secsperbyte=
-    secsperbyte_observed / max_bandwidth_proportion_burst;
+    secsperbyte_observed * max_bw_prop_mean / max_bw_prop_burst;
 
-  double min_update= elapsed * max_bandwidth_proportion_burst
-    / max_bandwidth_proportion_mean;
+  double min_update= elapsed / max_bw_prop_mean;
   if (min_update > 1e3) min_update= 1e3;
   int min_update_usec= min_update * 1e6;
 
   if (*interval_usec_update > min_update_usec)
     *interval_usec_update= min_update_usec;
+
+  verbosespinprintf("%12lluby %10.3fs %13.2fkby/s",
+                   (unsigned long long)bytes, elapsed,
+                   1e-3/secsperbyte_observed);
 }
  
 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
                     FILE *df, copyfile_die_fn *ddie, const char *dxi,
-                    uint64_t l) {
+                    uint64_t lstart, int amsender) {
   struct timespec ts_last;
   int now, r;
+  uint64_t l=lstart, done=0;
 
   ts_last= ts_sendstart;
 
-  while (l>=0) {
+  while (l>0) {
     now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
     if (now > txblocksz) now= txblocksz;
 
-    double elapsed_want= now * stream_allow_secsperbyte;
-    double elapsed= mgettime_elapsed(ts_last, &ts_last);
-    double needwait= elapsed_want - elapsed;
-    if (needwait > 1) needwait= 1;
-    if (needwait > 0) usleep(ceil(needwait * 1e6));
+    if (verbose) {
+      fprintf(stderr," %3d%% \r",
+             (int)(done*100.0/lstart));
+      flushstderr();
+    }
+
+    if (amsender) {
+      double elapsed_want= now * stream_allow_secsperbyte;
+      double elapsed= mgettime_elapsed(ts_last, &ts_last);
+      double needwait= elapsed_want - elapsed;
+      if (needwait > 1) needwait= 1;
+      if (needwait > 0) usleep(ceil(needwait * 1e6));
+    }
 
     r= fread(mainbuf,1,now,sf);  if (r!=now) sdie(sf,sxi);
     r= fwrite(mainbuf,1,now,df);  if (r!=now) ddie(df,dxi);
     l -= now;
+    done += now;
   }
 }
 
@@ -204,8 +261,13 @@ static void receiver(const char *filename) {
   char *tmpfilename;
   int r, c;
 
-  if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
-      ==-1) diem();
+  char *lastslash= strrchr(filename,'/');
+  if (!lastslash)
+    r= asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename);
+  else
+    r= asprintf(&tmpfilename, "%.*s/.rcopy-repeatedly.#%s#",
+               (int)(lastslash-filename), filename, lastslash+1);
+  if (r==-1) diem();
   
   r= unlink(tmpfilename);
   if (r && errno!=ENOENT)
@@ -222,6 +284,13 @@ static void receiver(const char *filename) {
       assert(feof(commsi));
       return;
 
+    case REPLMSG_RM:
+      r= unlink(filename);
+      if (r && errno!=ENOENT)
+       diee("source file removed but could not remove destination file `%s'",
+            filename);
+      break;
+      
     case REPLMSG_FILE64:
       newfile= fopen(tmpfilename, "wb");
       if (!newfile) diee("could not create temporary receiving file `%s'",
@@ -241,7 +310,7 @@ static void receiver(const char *filename) {
 
       copyfile(commsi, copydie_commsi,"FILE64 file data",
               newfile, copydie_tmpwrite,tmpfilename,
-              l);
+              l, 0);
 
       if (fclose(newfile)) diee("could not flush and close temporary"
                                " receiving file `%s'", tmpfilename);
@@ -250,6 +319,7 @@ static void receiver(const char *filename) {
             filename);
 
       sendbyte(REPLMSG_GO);
+      break;
 
     default:
       die_protocol("unknown transfer message code 0x%02x",c);
@@ -269,7 +339,10 @@ static void sender(const char *filename) {
   told= told_nothing;
   
   for (;;) {
-    if (interval_usec) usleep(interval_usec);
+    if (interval_usec) {
+      send_flush();
+      usleep(interval_usec);
+    }
     interval_usec= min_interval_usec;
 
     r= stat(filename, &stabtest);
@@ -277,9 +350,11 @@ static void sender(const char *filename) {
       f= 0;
     } else {
       if (told == told_file &&
-         stabtest.st_mode == stab.st_mode &&
-         stabtest.st_dev == stab.st_dev &&
-         stabtest.st_ino == stab.st_ino)
+         stabtest.st_mode  == stab.st_mode  &&
+         stabtest.st_dev   == stab.st_dev   &&
+         stabtest.st_ino   == stab.st_ino   &&
+         stabtest.st_mtime == stab.st_mtime &&
+         stabtest.st_size  == stab.st_size)
        continue;
       f= fopen(filename, "rb");
     }
@@ -287,6 +362,7 @@ static void sender(const char *filename) {
     if (!f) {
       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
       if (told != told_remove) {
+       verbosespinprintf(" ENOENT                                       ");
        sendbyte(REPLMSG_RM);
        told= told_remove;
       }
@@ -300,7 +376,7 @@ static void sender(const char *filename) {
     if (r) diee("could not fstat source file `%s'",filename);
 
     if (!S_ISREG(stab.st_mode))
-      die(12,-1,"source file `%s' is not a plain file",filename);
+      die(8,0,-1,"source file `%s' is not a plain file",filename);
 
     uint8_t hbuf[9]= {
       REPLMSG_FILE64,
@@ -320,12 +396,10 @@ static void sender(const char *filename) {
 
     copyfile(f, copydie_inputfile,filename,
             commso, copydie_commso,0,
-            stab.st_size);
+            stab.st_size, 1);
 
     send_flush();
 
-    if (fclose(f)) diee("couldn't close source file `%s'",filename);
-
     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
 
@@ -345,23 +419,57 @@ void usagemessage(void) {
        " --help\n");
 }
 
+typedef struct {
+  const char *userhost, *path;
+} FileSpecification;
+
+static FileSpecification srcspec, dstspec;
+
+static void of__server(const struct cmdinfo *ci, const char *val) {
+  int ncount= nsargs + 1 + !!val;
+  sargs= xrealloc(sargs, sizeof(*sargs) * ncount);
+  sargs[nsargs++]= ci->olong;
+  if (val)
+    sargs[nsargs++]= val;
+}
+
+static int of__server_int(const struct cmdinfo *ci, const char *val) {
+  of__server(ci,val);
+  long v;
+  char *ep;
+  errno= 0; v= strtol(val,&ep,10);
+  if (!*val || *ep || errno || v<INT_MIN || v>INT_MAX)
+    badusage("bad integer argument `%s' for --%s",val,ci->olong);
+  return v;
+}
+
 static void of_help(const struct cmdinfo *ci, const char *val) {
   usagemessage();
   if (ferror(stdout)) diee("could not write usage message to stdout");
   exit(0);
 }
 
-typedef struct {
-  const char *userhost, *path;
-} FileSpecification;
+static void of_bw(const struct cmdinfo *ci, const char *val) {
+  int pct= of__server_int(ci,val);
+  if (pct<1 || pct>100)
+    badusage("bandwidth percentage must be between 1 and 100 inclusive");
+  *(double*)ci->parg= pct * 0.01;
+}
 
-static FileSpecification srcspec, dstspec;
-static int upload=-1; /* -1 means not yet known; 0 means download */
+static void of_server_int(const struct cmdinfo *ci, const char *val) {
+  *(int*)ci->parg= of__server_int(ci,val);
+}
 
 static const struct cmdinfo cmdinfos[]= {
   { "help",     .call= of_help },
-  { "receiver", .iassignto=&upload, .arg=1 },
-  { "sender",   .iassignto=&upload, .arg=0 },
+  { "max-bandwidth-percent-mean", 0,1,.call=of_bw,.parg=&max_bw_prop_mean  },
+  { "max-bandwidth-percent-burst",0,1,.call=of_bw,.parg=&max_bw_prop_burst },
+  { "tx-block-size",     0,1,.call=of_server_int, .parg=&txblocksz         },
+  { "min-interval-usec", 0,1,.call=of_server_int, .parg=&min_interval_usec },
+  { "rcopy-repeatedly",  0,1, .sassignto=&rcopy_repeatedly_program         },
+  { "ssh-program",       0,1, .sassignto=&rsh_program                      },
+  { "receiver", .iassignto=&server_upcopy, .arg=0                          },
+  { "sender",   .iassignto=&server_upcopy, .arg=1                          },
   { 0 }
 };
 
@@ -369,15 +477,15 @@ static void server(const char *filename) {
   int c;
   commsi= stdin;
   commso= stdout;
-  fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
+  fprintf(commso, "%s0002 %c\n", banner, server_upcopy?'u':'d');
   send_flush();
   c= fgetc(commsi);  if (c==EOF) die_badrecv("initial go");
   if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
 
-  if (upload)
-    receiver(filename);
-  else
+  if (server_upcopy)
     sender(filename);
+  else
+    receiver(filename);
 }
 
 static void client(void) {
@@ -390,43 +498,65 @@ static void client(void) {
   FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
   const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
 
+  sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs));
+  memmove(sargs+5, sargs, sizeof(*sargs) * nsargs);
+  sargs[0]= rsh_program;
+  sargs[1]= remotespec->userhost;
+  sargs[2]= rcopy_repeatedly_program;
+  sargs[3]= remotemode;
+  sargs[4]= "--";
+  sargs[5+nsargs]= remotespec->path;
+  sargs[6+nsargs]= 0;
+    
   child= fork();
   if (child==-1) diee("fork failed");
   if (!child) {
-    mdup2(uppipe[0],0);
-    mdup2(downpipe[1],1);
+    mdup2(downpipe[0],0);
+    mdup2(uppipe[1],1);
     close(uppipe[0]); close(downpipe[0]);
     close(uppipe[1]); close(downpipe[1]);
-    execlp(rsh_program,
-          rsh_program, remotespec->userhost, rcopy_repeatedly_program,
-          remotemode, remotespec->path, (char*)0);
+
+    execvp(rsh_program, (char**)sargs);
     diee("failed to execute rsh program `%s'",rsh_program);
   }
 
-  commso= fdopen(uppipe[1],"wb");
-  commsi= fdopen(downpipe[0],"rb");
+  commso= fdopen(downpipe[1],"wb");
+  commsi= fdopen(uppipe[0],"rb");
   if (!commso || !commsi) diee("fdopen failed");
-  close(uppipe[0]); close(downpipe[0]);
-  close(uppipe[1]); close(downpipe[1]);
+  close(downpipe[0]);
+  close(uppipe[1]);
   
   char banbuf[sizeof(banner)-1 + 5 + 1];
   r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
-  if (r!=sizeof(banbuf)-1) die_badrecv("banner");
-
-  if (memcmp(banbuf,banner,sizeof(banner)-1) ||
-      banbuf[sizeof(banner)-1 + 4] != ' ')
-    die(8,-1,"banner received was not as expected -"
+  if (ferror(commsi)) die_badrecv("read banner");
+
+  if (r!=sizeof(banbuf)-1 ||
+      memcmp(banbuf,banner,sizeof(banner)-1) ||
+      banbuf[sizeof(banner)-1 + 4] != ' ') {
+    const char **sap;
+    int count=0;
+    for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1;
+    char *cmdline= xmalloc(count+1);
+    cmdline[0]=' ';
+    for (sap=sargs; *sap; sap++) {
+      strcat(cmdline," ");
+      strcat(cmdline,*sap);
+    }
+    
+    die(8,0,-1,"did not receive banner as expected -"
        " shell dirty? ssh broken?\n"
-       " try running   %s %s %s --sender %s\n"
-       " and expect the first line to be   %s",
-       rsh_program, remotespec->userhost,
-       rcopy_repeatedly_program, remotespec->path,
-       banner);
+       " try running\n"
+       "  %s\n"
+       " and expect the first line to be\n"
+       "  %s",
+       cmdline, banner);
+  }
   
   banbuf[sizeof(banbuf)-1]= 0;
   char *ep;
   long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
-  if (*ep) die_protocol("declaration length syntax error");
+  if (ep!=banbuf + sizeof(banner)-1 + 4 || *ep!=' ')
+    die_protocol("declaration length syntax error (`%s')",ep);
   assert(decllen <= sizeof(mainbuf));
   if (decllen<2) die_protocol("declaration too short");
 
@@ -434,15 +564,15 @@ static void client(void) {
   if (r!=decllen) die_badrecv("declaration");
   if (mainbuf[decllen-1] != '\n')
     die_protocol("declaration missing final newline");
-  if (mainbuf[0] != upload ? 'u' : 'd')
+  if (mainbuf[0] != (remotespec==&srcspec ? 'u' : 'd'))
     die_protocol("declaration incorrect direction indicator");
 
   sendbyte(REPLMSG_GO);
 
-  if (upload)
-    sender(srcspec.path);
-  else
+  if (remotespec==&srcspec)
     receiver(dstspec.path);
+  else
+    sender(srcspec.path);
 }
 
 static void parse_file_specification(FileSpecification *fs, const char *arg,
@@ -458,7 +588,7 @@ static void parse_file_specification(FileSpecification *fs, const char *arg,
       return;
     }
     if (*colon==':') {
-      char *uh= malloc(colon-arg + 1);  if (!fs->userhost) diem();
+      char *uh= xmalloc(colon-arg + 1);
       memcpy(uh,arg, colon-arg);  uh[colon-arg]= 0;
       fs->userhost= uh;
       fs->path= colon+1;
@@ -468,9 +598,22 @@ static void parse_file_specification(FileSpecification *fs, const char *arg,
 }
 
 int main(int argc, const char *const *argv) {
+  setvbuf(stderr,0,_IOLBF,BUFSIZ);
+
   myopt(&argv, cmdinfos);
 
-  if (upload>=0) {
+  if (!rsh_program) rsh_program= getenv("RCOPY_REPEATEDLY_RSH");
+  if (!rsh_program) rsh_program= getenv("RSYNC_RSH");
+  if (!rsh_program) rsh_program= "ssh";
+
+  if (max_bw_prop_burst / max_bw_prop_mean < 1.1)
+    badusage("max bandwidth prop burst must be at least 1.1x"
+             " max bandwidth prop mean");
+
+  if (txblocksz<1) badusage("transmit block size must be at least 1");
+  if (min_interval_usec<0) badusage("minimum update interval may not be -ve");
+
+  if (server_upcopy>=0) {
     if (!argv[0] || argv[1])
       badusage("server mode must have just the filename as non-option arg");
     server(argv[0]);