static FILE *commsi, *commso;
-static double max_bandwidth_proportion_mean= 0.2;
-static double max_bandwidth_proportion_burst= 0.8;
-static int txblocksz= INT_MAX;
+static double max_bw_prop_mean= 0.2;
+static double max_bw_prop_burst= 0.8;
+static int txblocksz= INT_MAX, verbose=1;
static int min_interval_usec= 100000; /* 100ms */
-static const char *rsh_program= "ssh";
+static int nsargs;
+static const char **sargs;
+
+static const char *rsh_program= 0;
static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
+static int server_upcopy=-1; /* -1 means not yet known; 0 means download */
+ /* `up' means towards the client,
+ * since we regard the subprocess as `down' */
static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
#define NORETURN __attribute__((noreturn))
-static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
-static void vdie(int ec, int eno, const char *fmt, va_list al) {
+static void vdie(int ec, const char *pfx, int eno,
+ const char *fmt, va_list al) NORETURN;
+static void vdie(int ec, const char *pfx, int eno,
+ const char *fmt, va_list al) {
fputs("rcopy-repeatedly: ",stderr);
+ if (server_upcopy>=0) fputs("server: ",stderr);
+ if (pfx) fprintf(stderr,"%s: ",pfx);
vfprintf(stderr,fmt,al);
if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
fputc('\n',stderr);
exit(ec);
}
-static void die(int ec, int eno, const char *fmt, ...) NORETURN;
-static void die(int ec, int eno, const char *fmt, ...) {
+static void die(int ec, const char *pfx, int eno,
+ const char *fmt, ...) NORETURN;
+static void die(int ec, const char *pfx, int eno,
+ const char *fmt, ...) {
va_list al;
va_start(al,fmt);
- vdie(ec,eno,fmt,al);
+ vdie(ec,pfx,eno,fmt,al);
}
static void diem(void) NORETURN;
-static void diem(void) { die(16,errno,"malloc failed"); }
+static void diem(void) { die(16,0,errno,"malloc failed"); }
+static void *xmalloc(size_t sz) {
+ assert(sz);
+ void *p= malloc(sz);
+ if (!p) diem();
+ return p;
+}
+static void *xrealloc(void *p, size_t sz) {
+ assert(sz);
+ p= realloc(p,sz);
+ if (!p) diem();
+ return p;
+}
static void diee(const char *fmt, ...) NORETURN;
static void diee(const char *fmt, ...) {
va_list al;
va_start(al,fmt);
- vdie(12,errno,fmt,al);
+ vdie(12,0,errno,fmt,al);
}
static void die_protocol(const char *fmt, ...) NORETURN;
static void die_protocol(const char *fmt, ...) {
va_list al;
- fputs("protocol error: ",stderr);
va_start(al,fmt);
- vdie(10,-1,fmt,al);
+ vdie(10,"protocol error",-1,fmt,al);
}
static void die_badrecv(const char *what) NORETURN;
(ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
}
+static void flushstderr(void) {
+ if (ferror(stderr) || fflush(stderr))
+ diee("could not write progress to stderr");
+}
+
+static void verbosespinprintf(const char *fmt, ...) {
+ static const char spinnerchars[]= "/-\\";
+ static int spinnerchar;
+
+ if (!verbose)
+ return;
+
+ va_list al;
+ va_start(al,fmt);
+ fprintf(stderr," %c ",spinnerchars[spinnerchar]);
+ spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1;
+ vfprintf(stderr,fmt,al);
+ putc('\r',stderr);
+ flushstderr();
+}
+
static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
struct timespec ts_buf;
double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
double secsperbyte_observed= elapsed / bytes;
stream_allow_secsperbyte=
- secsperbyte_observed / max_bandwidth_proportion_burst;
+ secsperbyte_observed * max_bw_prop_mean / max_bw_prop_burst;
- double min_update= elapsed * max_bandwidth_proportion_burst
- / max_bandwidth_proportion_mean;
+ double min_update= elapsed / max_bw_prop_mean;
if (min_update > 1e3) min_update= 1e3;
int min_update_usec= min_update * 1e6;
if (*interval_usec_update > min_update_usec)
*interval_usec_update= min_update_usec;
+
+ verbosespinprintf("%12lluby %10.3fs %13.2fkby/s",
+ (unsigned long long)bytes, elapsed,
+ 1e-3/secsperbyte_observed);
}
static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
FILE *df, copyfile_die_fn *ddie, const char *dxi,
- uint64_t l) {
+ uint64_t lstart, int amsender) {
struct timespec ts_last;
int now, r;
+ uint64_t l=lstart, done=0;
ts_last= ts_sendstart;
- while (l>=0) {
+ while (l>0) {
now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
if (now > txblocksz) now= txblocksz;
- double elapsed_want= now * stream_allow_secsperbyte;
- double elapsed= mgettime_elapsed(ts_last, &ts_last);
- double needwait= elapsed_want - elapsed;
- if (needwait > 1) needwait= 1;
- if (needwait > 0) usleep(ceil(needwait * 1e6));
+ if (verbose) {
+ fprintf(stderr," %3d%% \r",
+ (int)(done*100.0/lstart));
+ flushstderr();
+ }
+
+ if (amsender) {
+ double elapsed_want= now * stream_allow_secsperbyte;
+ double elapsed= mgettime_elapsed(ts_last, &ts_last);
+ double needwait= elapsed_want - elapsed;
+ if (needwait > 1) needwait= 1;
+ if (needwait > 0) usleep(ceil(needwait * 1e6));
+ }
r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi);
r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi);
l -= now;
+ done += now;
}
}
char *tmpfilename;
int r, c;
- if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
- ==-1) diem();
+ char *lastslash= strrchr(filename,'/');
+ if (!lastslash)
+ r= asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename);
+ else
+ r= asprintf(&tmpfilename, "%.*s/.rcopy-repeatedly.#%s#",
+ (int)(lastslash-filename), filename, lastslash+1);
+ if (r==-1) diem();
r= unlink(tmpfilename);
if (r && errno!=ENOENT)
assert(feof(commsi));
return;
+ case REPLMSG_RM:
+ r= unlink(filename);
+ if (r && errno!=ENOENT)
+ diee("source file removed but could not remove destination file `%s'",
+ filename);
+ break;
+
case REPLMSG_FILE64:
newfile= fopen(tmpfilename, "wb");
if (!newfile) diee("could not create temporary receiving file `%s'",
copyfile(commsi, copydie_commsi,"FILE64 file data",
newfile, copydie_tmpwrite,tmpfilename,
- l);
+ l, 0);
if (fclose(newfile)) diee("could not flush and close temporary"
" receiving file `%s'", tmpfilename);
filename);
sendbyte(REPLMSG_GO);
+ break;
default:
die_protocol("unknown transfer message code 0x%02x",c);
told= told_nothing;
for (;;) {
- if (interval_usec) usleep(interval_usec);
+ if (interval_usec) {
+ send_flush();
+ usleep(interval_usec);
+ }
interval_usec= min_interval_usec;
r= stat(filename, &stabtest);
f= 0;
} else {
if (told == told_file &&
- stabtest.st_mode == stab.st_mode &&
- stabtest.st_dev == stab.st_dev &&
- stabtest.st_ino == stab.st_ino)
+ stabtest.st_mode == stab.st_mode &&
+ stabtest.st_dev == stab.st_dev &&
+ stabtest.st_ino == stab.st_ino &&
+ stabtest.st_mtime == stab.st_mtime &&
+ stabtest.st_size == stab.st_size)
continue;
f= fopen(filename, "rb");
}
if (!f) {
if (errno!=ENOENT) diee("could not access source file `%s'",filename);
if (told != told_remove) {
+ verbosespinprintf(" ENOENT ");
sendbyte(REPLMSG_RM);
told= told_remove;
}
if (r) diee("could not fstat source file `%s'",filename);
if (!S_ISREG(stab.st_mode))
- die(12,-1,"source file `%s' is not a plain file",filename);
+ die(8,0,-1,"source file `%s' is not a plain file",filename);
uint8_t hbuf[9]= {
REPLMSG_FILE64,
copyfile(f, copydie_inputfile,filename,
commso, copydie_commso,0,
- stab.st_size);
+ stab.st_size, 1);
send_flush();
- if (fclose(f)) diee("couldn't close source file `%s'",filename);
-
c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
" --help\n");
}
+typedef struct {
+ const char *userhost, *path;
+} FileSpecification;
+
+static FileSpecification srcspec, dstspec;
+
+static void of__server(const struct cmdinfo *ci, const char *val) {
+ int ncount= nsargs + 1 + !!val;
+ sargs= xrealloc(sargs, sizeof(*sargs) * ncount);
+ sargs[nsargs++]= ci->olong;
+ if (val)
+ sargs[nsargs++]= val;
+}
+
+static int of__server_int(const struct cmdinfo *ci, const char *val) {
+ of__server(ci,val);
+ long v;
+ char *ep;
+ errno= 0; v= strtol(val,&ep,10);
+ if (!*val || *ep || errno || v<INT_MIN || v>INT_MAX)
+ badusage("bad integer argument `%s' for --%s",val,ci->olong);
+ return v;
+}
+
static void of_help(const struct cmdinfo *ci, const char *val) {
usagemessage();
if (ferror(stdout)) diee("could not write usage message to stdout");
exit(0);
}
-typedef struct {
- const char *userhost, *path;
-} FileSpecification;
+static void of_bw(const struct cmdinfo *ci, const char *val) {
+ int pct= of__server_int(ci,val);
+ if (pct<1 || pct>100)
+ badusage("bandwidth percentage must be between 1 and 100 inclusive");
+ *(double*)ci->parg= pct * 0.01;
+}
-static FileSpecification srcspec, dstspec;
-static int upload=-1; /* -1 means not yet known; 0 means download */
+static void of_server_int(const struct cmdinfo *ci, const char *val) {
+ *(int*)ci->parg= of__server_int(ci,val);
+}
static const struct cmdinfo cmdinfos[]= {
{ "help", .call= of_help },
- { "receiver", .iassignto=&upload, .arg=1 },
- { "sender", .iassignto=&upload, .arg=0 },
+ { "max-bandwidth-percent-mean", 0,1,.call=of_bw,.parg=&max_bw_prop_mean },
+ { "max-bandwidth-percent-burst",0,1,.call=of_bw,.parg=&max_bw_prop_burst },
+ { "tx-block-size", 0,1,.call=of_server_int, .parg=&txblocksz },
+ { "min-interval-usec", 0,1,.call=of_server_int, .parg=&min_interval_usec },
+ { "rcopy-repeatedly", 0,1, .sassignto=&rcopy_repeatedly_program },
+ { "ssh-program", 0,1, .sassignto=&rsh_program },
+ { "receiver", .iassignto=&server_upcopy, .arg=0 },
+ { "sender", .iassignto=&server_upcopy, .arg=1 },
{ 0 }
};
int c;
commsi= stdin;
commso= stdout;
- fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
+ fprintf(commso, "%s0002 %c\n", banner, server_upcopy?'u':'d');
send_flush();
c= fgetc(commsi); if (c==EOF) die_badrecv("initial go");
if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
- if (upload)
- receiver(filename);
- else
+ if (server_upcopy)
sender(filename);
+ else
+ receiver(filename);
}
static void client(void) {
FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
+ sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs));
+ memmove(sargs+5, sargs, sizeof(*sargs) * nsargs);
+ sargs[0]= rsh_program;
+ sargs[1]= remotespec->userhost;
+ sargs[2]= rcopy_repeatedly_program;
+ sargs[3]= remotemode;
+ sargs[4]= "--";
+ sargs[5+nsargs]= remotespec->path;
+ sargs[6+nsargs]= 0;
+
child= fork();
if (child==-1) diee("fork failed");
if (!child) {
- mdup2(uppipe[0],0);
- mdup2(downpipe[1],1);
+ mdup2(downpipe[0],0);
+ mdup2(uppipe[1],1);
close(uppipe[0]); close(downpipe[0]);
close(uppipe[1]); close(downpipe[1]);
- execlp(rsh_program,
- rsh_program, remotespec->userhost, rcopy_repeatedly_program,
- remotemode, remotespec->path, (char*)0);
+
+ execvp(rsh_program, (char**)sargs);
diee("failed to execute rsh program `%s'",rsh_program);
}
- commso= fdopen(uppipe[1],"wb");
- commsi= fdopen(downpipe[0],"rb");
+ commso= fdopen(downpipe[1],"wb");
+ commsi= fdopen(uppipe[0],"rb");
if (!commso || !commsi) diee("fdopen failed");
- close(uppipe[0]); close(downpipe[0]);
- close(uppipe[1]); close(downpipe[1]);
+ close(downpipe[0]);
+ close(uppipe[1]);
char banbuf[sizeof(banner)-1 + 5 + 1];
r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
- if (r!=sizeof(banbuf)-1) die_badrecv("banner");
-
- if (memcmp(banbuf,banner,sizeof(banner)-1) ||
- banbuf[sizeof(banner)-1 + 4] != ' ')
- die(8,-1,"banner received was not as expected -"
+ if (ferror(commsi)) die_badrecv("read banner");
+
+ if (r!=sizeof(banbuf)-1 ||
+ memcmp(banbuf,banner,sizeof(banner)-1) ||
+ banbuf[sizeof(banner)-1 + 4] != ' ') {
+ const char **sap;
+ int count=0;
+ for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1;
+ char *cmdline= xmalloc(count+1);
+ cmdline[0]=' ';
+ for (sap=sargs; *sap; sap++) {
+ strcat(cmdline," ");
+ strcat(cmdline,*sap);
+ }
+
+ die(8,0,-1,"did not receive banner as expected -"
" shell dirty? ssh broken?\n"
- " try running %s %s %s --sender %s\n"
- " and expect the first line to be %s",
- rsh_program, remotespec->userhost,
- rcopy_repeatedly_program, remotespec->path,
- banner);
+ " try running\n"
+ " %s\n"
+ " and expect the first line to be\n"
+ " %s",
+ cmdline, banner);
+ }
banbuf[sizeof(banbuf)-1]= 0;
char *ep;
long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
- if (*ep) die_protocol("declaration length syntax error");
+ if (ep!=banbuf + sizeof(banner)-1 + 4 || *ep!=' ')
+ die_protocol("declaration length syntax error (`%s')",ep);
assert(decllen <= sizeof(mainbuf));
if (decllen<2) die_protocol("declaration too short");
if (r!=decllen) die_badrecv("declaration");
if (mainbuf[decllen-1] != '\n')
die_protocol("declaration missing final newline");
- if (mainbuf[0] != upload ? 'u' : 'd')
+ if (mainbuf[0] != (remotespec==&srcspec ? 'u' : 'd'))
die_protocol("declaration incorrect direction indicator");
sendbyte(REPLMSG_GO);
- if (upload)
- sender(srcspec.path);
- else
+ if (remotespec==&srcspec)
receiver(dstspec.path);
+ else
+ sender(srcspec.path);
}
static void parse_file_specification(FileSpecification *fs, const char *arg,
return;
}
if (*colon==':') {
- char *uh= malloc(colon-arg + 1); if (!fs->userhost) diem();
+ char *uh= xmalloc(colon-arg + 1);
memcpy(uh,arg, colon-arg); uh[colon-arg]= 0;
fs->userhost= uh;
fs->path= colon+1;
}
int main(int argc, const char *const *argv) {
+ setvbuf(stderr,0,_IOLBF,BUFSIZ);
+
myopt(&argv, cmdinfos);
- if (upload>=0) {
+ if (!rsh_program) rsh_program= getenv("RCOPY_REPEATEDLY_RSH");
+ if (!rsh_program) rsh_program= getenv("RSYNC_RSH");
+ if (!rsh_program) rsh_program= "ssh";
+
+ if (max_bw_prop_burst / max_bw_prop_mean < 1.1)
+ badusage("max bandwidth prop burst must be at least 1.1x"
+ " max bandwidth prop mean");
+
+ if (txblocksz<1) badusage("transmit block size must be at least 1");
+ if (min_interval_usec<0) badusage("minimum update interval may not be -ve");
+
+ if (server_upcopy>=0) {
if (!argv[0] || argv[1])
badusage("server mode must have just the filename as non-option arg");
server(argv[0]);