From f4ebcf48e13ce42543ce941c27946fe296eed319 Mon Sep 17 00:00:00 2001 From: ianmdlvl Date: Sat, 4 Oct 2008 00:01:48 +0000 Subject: [PATCH] rcopy-repeatedly seems to mostly work --- cprogs/rcopy-repeatedly.c | 225 ++++++++++++++++++++++++++++---------- debian/changelog | 1 + 2 files changed, 168 insertions(+), 58 deletions(-) diff --git a/cprogs/rcopy-repeatedly.c b/cprogs/rcopy-repeatedly.c index 9c23185..86cd317 100644 --- a/cprogs/rcopy-repeatedly.c +++ b/cprogs/rcopy-repeatedly.c @@ -54,13 +54,19 @@ static const char banner[]= "#rcopy-repeatedly#\n"; static FILE *commsi, *commso; -static double max_bandwidth_proportion_mean= 0.2; -static double max_bandwidth_proportion_burst= 0.8; +static double max_bw_prop_mean= 0.2; +static double max_bw_prop_burst= 0.8; static int txblocksz= INT_MAX, verbose=1; static int min_interval_usec= 100000; /* 100ms */ -static const char *rsh_program= "ssh"; +static int nsargs; +static const char **sargs; + +static const char *rsh_program= 0; static const char *rcopy_repeatedly_program= "rcopy-repeatedly"; +static int server_upcopy=-1; /* -1 means not yet known; 0 means download */ + /* `up' means towards the client, + * since we regard the subprocess as `down' */ static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */ @@ -68,36 +74,53 @@ static char mainbuf[65536]; /* must be at least 2^16 */ #define NORETURN __attribute__((noreturn)) -static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN; -static void vdie(int ec, int eno, const char *fmt, va_list al) { +static void vdie(int ec, const char *pfx, int eno, + const char *fmt, va_list al) NORETURN; +static void vdie(int ec, const char *pfx, int eno, + const char *fmt, va_list al) { fputs("rcopy-repeatedly: ",stderr); + if (server_upcopy>=0) fputs("server: ",stderr); + if (pfx) fprintf(stderr,"%s: ",pfx); vfprintf(stderr,fmt,al); if (eno!=-1) fprintf(stderr,": %s",strerror(eno)); fputc('\n',stderr); exit(ec); } -static void die(int ec, int eno, const char *fmt, ...) NORETURN; -static void die(int ec, int eno, const char *fmt, ...) { +static void die(int ec, const char *pfx, int eno, + const char *fmt, ...) NORETURN; +static void die(int ec, const char *pfx, int eno, + const char *fmt, ...) { va_list al; va_start(al,fmt); - vdie(ec,eno,fmt,al); + vdie(ec,pfx,eno,fmt,al); } static void diem(void) NORETURN; -static void diem(void) { die(16,errno,"malloc failed"); } +static void diem(void) { die(16,0,errno,"malloc failed"); } +static void *xmalloc(size_t sz) { + assert(sz); + void *p= malloc(sz); + if (!p) diem(); + return p; +} +static void *xrealloc(void *p, size_t sz) { + assert(sz); + p= realloc(p,sz); + if (!p) diem(); + return p; +} static void diee(const char *fmt, ...) NORETURN; static void diee(const char *fmt, ...) { va_list al; va_start(al,fmt); - vdie(12,errno,fmt,al); + vdie(12,0,errno,fmt,al); } static void die_protocol(const char *fmt, ...) NORETURN; static void die_protocol(const char *fmt, ...) { va_list al; - fputs("protocol error: ",stderr); va_start(al,fmt); - vdie(10,-1,fmt,al); + vdie(10,"protocol error",-1,fmt,al); } static void die_badrecv(const char *what) NORETURN; @@ -145,6 +168,11 @@ static double mgettime_elapsed(struct timespec ts_base, (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9; } +static void flushstderr(void) { + if (ferror(stderr) || fflush(stderr)) + diee("could not write progress to stderr"); +} + static void verbosespinprintf(const char *fmt, ...) { static const char spinnerchars[]= "/-\\"; static int spinnerchar; @@ -154,12 +182,11 @@ static void verbosespinprintf(const char *fmt, ...) { va_list al; va_start(al,fmt); - fprintf(stderr," %c ",spinnerchars[spinnerchar]); + fprintf(stderr," %c ",spinnerchars[spinnerchar]); spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1; vfprintf(stderr,fmt,al); putc('\r',stderr); - if (ferror(stderr) || fflush(stderr)) - diee("could not write progress to stderr"); + flushstderr(); } static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) { @@ -168,26 +195,26 @@ static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) { double secsperbyte_observed= elapsed / bytes; stream_allow_secsperbyte= - secsperbyte_observed / max_bandwidth_proportion_burst; + secsperbyte_observed * max_bw_prop_mean / max_bw_prop_burst; - double min_update= elapsed * max_bandwidth_proportion_burst - / max_bandwidth_proportion_mean; + double min_update= elapsed / max_bw_prop_mean; if (min_update > 1e3) min_update= 1e3; int min_update_usec= min_update * 1e6; if (*interval_usec_update > min_update_usec) *interval_usec_update= min_update_usec; - verbosespinprintf("%12lluby %10fs %13gkby/s", + verbosespinprintf("%12lluby %10.3fs %13.2fkby/s", (unsigned long long)bytes, elapsed, 1e-3/secsperbyte_observed); } static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi, FILE *df, copyfile_die_fn *ddie, const char *dxi, - uint64_t l) { + uint64_t lstart, int amsender) { struct timespec ts_last; int now, r; + uint64_t l=lstart, done=0; ts_last= ts_sendstart; @@ -195,15 +222,24 @@ static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi, now= l < sizeof(mainbuf) ? l : sizeof(mainbuf); if (now > txblocksz) now= txblocksz; - double elapsed_want= now * stream_allow_secsperbyte; - double elapsed= mgettime_elapsed(ts_last, &ts_last); - double needwait= elapsed_want - elapsed; - if (needwait > 1) needwait= 1; - if (needwait > 0) usleep(ceil(needwait * 1e6)); + if (verbose) { + fprintf(stderr," %3d%% \r", + (int)(done*100.0/lstart)); + flushstderr(); + } + + if (amsender) { + double elapsed_want= now * stream_allow_secsperbyte; + double elapsed= mgettime_elapsed(ts_last, &ts_last); + double needwait= elapsed_want - elapsed; + if (needwait > 1) needwait= 1; + if (needwait > 0) usleep(ceil(needwait * 1e6)); + } r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi); r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi); l -= now; + done += now; } } @@ -225,8 +261,13 @@ static void receiver(const char *filename) { char *tmpfilename; int r, c; - if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename) - ==-1) diem(); + char *lastslash= strrchr(filename,'/'); + if (!lastslash) + r= asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename); + else + r= asprintf(&tmpfilename, "%.*s/.rcopy-repeatedly.#%s#", + (int)(lastslash-filename), filename, lastslash+1); + if (r==-1) diem(); r= unlink(tmpfilename); if (r && errno!=ENOENT) @@ -243,6 +284,13 @@ static void receiver(const char *filename) { assert(feof(commsi)); return; + case REPLMSG_RM: + r= unlink(filename); + if (r && errno!=ENOENT) + diee("source file removed but could not remove destination file `%s'", + filename); + break; + case REPLMSG_FILE64: newfile= fopen(tmpfilename, "wb"); if (!newfile) diee("could not create temporary receiving file `%s'", @@ -262,7 +310,7 @@ static void receiver(const char *filename) { copyfile(commsi, copydie_commsi,"FILE64 file data", newfile, copydie_tmpwrite,tmpfilename, - l); + l, 0); if (fclose(newfile)) diee("could not flush and close temporary" " receiving file `%s'", tmpfilename); @@ -271,6 +319,7 @@ static void receiver(const char *filename) { filename); sendbyte(REPLMSG_GO); + break; default: die_protocol("unknown transfer message code 0x%02x",c); @@ -313,7 +362,7 @@ static void sender(const char *filename) { if (!f) { if (errno!=ENOENT) diee("could not access source file `%s'",filename); if (told != told_remove) { - verbosespinprintf(" ENOENT "); + verbosespinprintf(" ENOENT "); sendbyte(REPLMSG_RM); told= told_remove; } @@ -327,7 +376,7 @@ static void sender(const char *filename) { if (r) diee("could not fstat source file `%s'",filename); if (!S_ISREG(stab.st_mode)) - die(12,-1,"source file `%s' is not a plain file",filename); + die(8,0,-1,"source file `%s' is not a plain file",filename); uint8_t hbuf[9]= { REPLMSG_FILE64, @@ -347,12 +396,10 @@ static void sender(const char *filename) { copyfile(f, copydie_inputfile,filename, commso, copydie_commso,0, - stab.st_size); + stab.st_size, 1); send_flush(); - if (fclose(f)) diee("couldn't close source file `%s'",filename); - c= fgetc(commsi); if (c==EOF) die_badrecv("ack"); if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c); @@ -372,25 +419,57 @@ void usagemessage(void) { " --help\n"); } +typedef struct { + const char *userhost, *path; +} FileSpecification; + +static FileSpecification srcspec, dstspec; + +static void of__server(const struct cmdinfo *ci, const char *val) { + int ncount= nsargs + 1 + !!val; + sargs= xrealloc(sargs, sizeof(*sargs) * ncount); + sargs[nsargs++]= ci->olong; + if (val) + sargs[nsargs++]= val; +} + +static int of__server_int(const struct cmdinfo *ci, const char *val) { + of__server(ci,val); + long v; + char *ep; + errno= 0; v= strtol(val,&ep,10); + if (!*val || *ep || errno || vINT_MAX) + badusage("bad integer argument `%s' for --%s",val,ci->olong); + return v; +} + static void of_help(const struct cmdinfo *ci, const char *val) { usagemessage(); if (ferror(stdout)) diee("could not write usage message to stdout"); exit(0); } -typedef struct { - const char *userhost, *path; -} FileSpecification; +static void of_bw(const struct cmdinfo *ci, const char *val) { + int pct= of__server_int(ci,val); + if (pct<1 || pct>100) + badusage("bandwidth percentage must be between 1 and 100 inclusive"); + *(double*)ci->parg= pct * 0.01; +} -static FileSpecification srcspec, dstspec; -static int upcopy=-1; /* -1 means not yet known; 0 means download */ - /* `up' means towards the client, - * since we regard the subprocess as `down' */ +static void of_server_int(const struct cmdinfo *ci, const char *val) { + *(int*)ci->parg= of__server_int(ci,val); +} static const struct cmdinfo cmdinfos[]= { { "help", .call= of_help }, - { "receiver", .iassignto=&upcopy, .arg=0 }, - { "sender", .iassignto=&upcopy, .arg=1 }, + { "max-bandwidth-percent-mean", 0,1,.call=of_bw,.parg=&max_bw_prop_mean }, + { "max-bandwidth-percent-burst",0,1,.call=of_bw,.parg=&max_bw_prop_burst }, + { "tx-block-size", 0,1,.call=of_server_int, .parg=&txblocksz }, + { "min-interval-usec", 0,1,.call=of_server_int, .parg=&min_interval_usec }, + { "rcopy-repeatedly", 0,1, .sassignto=&rcopy_repeatedly_program }, + { "ssh-program", 0,1, .sassignto=&rsh_program }, + { "receiver", .iassignto=&server_upcopy, .arg=0 }, + { "sender", .iassignto=&server_upcopy, .arg=1 }, { 0 } }; @@ -398,12 +477,12 @@ static void server(const char *filename) { int c; commsi= stdin; commso= stdout; - fprintf(commso, "%s0002 %c\n", banner, upcopy?'u':'d'); + fprintf(commso, "%s0002 %c\n", banner, server_upcopy?'u':'d'); send_flush(); c= fgetc(commsi); if (c==EOF) die_badrecv("initial go"); if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c); - if (upcopy) + if (server_upcopy) sender(filename); else receiver(filename); @@ -419,6 +498,16 @@ static void client(void) { FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec; const char *remotemode= srcspec.userhost ? "--sender" : "--receiver"; + sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs)); + memmove(sargs+5, sargs, sizeof(*sargs) * nsargs); + sargs[0]= rsh_program; + sargs[1]= remotespec->userhost; + sargs[2]= rcopy_repeatedly_program; + sargs[3]= remotemode; + sargs[4]= "--"; + sargs[5+nsargs]= remotespec->path; + sargs[6+nsargs]= 0; + child= fork(); if (child==-1) diee("fork failed"); if (!child) { @@ -426,9 +515,8 @@ static void client(void) { mdup2(uppipe[1],1); close(uppipe[0]); close(downpipe[0]); close(uppipe[1]); close(downpipe[1]); - execlp(rsh_program, - rsh_program, remotespec->userhost, rcopy_repeatedly_program, - remotemode, remotespec->path, (char*)0); + + execvp(rsh_program, (char**)sargs); diee("failed to execute rsh program `%s'",rsh_program); } @@ -444,21 +532,31 @@ static void client(void) { if (r!=sizeof(banbuf)-1 || memcmp(banbuf,banner,sizeof(banner)-1) || - banbuf[sizeof(banner)-1 + 4] != ' ') - die(8,-1,"did not receive banner as expected -" + banbuf[sizeof(banner)-1 + 4] != ' ') { + const char **sap; + int count=0; + for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1; + char *cmdline= xmalloc(count+1); + cmdline[0]=' '; + for (sap=sargs; *sap; sap++) { + strcat(cmdline," "); + strcat(cmdline,*sap); + } + + die(8,0,-1,"did not receive banner as expected -" " shell dirty? ssh broken?\n" " try running\n" - " %s %s %s --sender %s\n" + " %s\n" " and expect the first line to be\n" " %s", - rsh_program, remotespec->userhost, - rcopy_repeatedly_program, remotespec->path, - banner); + cmdline, banner); + } banbuf[sizeof(banbuf)-1]= 0; char *ep; long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16); - if (*ep) die_protocol("declaration length syntax error"); + if (ep!=banbuf + sizeof(banner)-1 + 4 || *ep!=' ') + die_protocol("declaration length syntax error (`%s')",ep); assert(decllen <= sizeof(mainbuf)); if (decllen<2) die_protocol("declaration too short"); @@ -466,12 +564,12 @@ static void client(void) { if (r!=decllen) die_badrecv("declaration"); if (mainbuf[decllen-1] != '\n') die_protocol("declaration missing final newline"); - if (mainbuf[0] != upcopy ? 'u' : 'd') + if (mainbuf[0] != (remotespec==&srcspec ? 'u' : 'd')) die_protocol("declaration incorrect direction indicator"); sendbyte(REPLMSG_GO); - if (upcopy) + if (remotespec==&srcspec) receiver(dstspec.path); else sender(srcspec.path); @@ -490,7 +588,7 @@ static void parse_file_specification(FileSpecification *fs, const char *arg, return; } if (*colon==':') { - char *uh= malloc(colon-arg + 1); if (!uh) diem(); + char *uh= xmalloc(colon-arg + 1); memcpy(uh,arg, colon-arg); uh[colon-arg]= 0; fs->userhost= uh; fs->path= colon+1; @@ -504,7 +602,18 @@ int main(int argc, const char *const *argv) { myopt(&argv, cmdinfos); - if (upcopy>=0) { + if (!rsh_program) rsh_program= getenv("RCOPY_REPEATEDLY_RSH"); + if (!rsh_program) rsh_program= getenv("RSYNC_RSH"); + if (!rsh_program) rsh_program= "ssh"; + + if (max_bw_prop_burst / max_bw_prop_mean < 1.1) + badusage("max bandwidth prop burst must be at least 1.1x" + " max bandwidth prop mean"); + + if (txblocksz<1) badusage("transmit block size must be at least 1"); + if (min_interval_usec<0) badusage("minimum update interval may not be -ve"); + + if (server_upcopy>=0) { if (!argv[0] || argv[1]) badusage("server mode must have just the filename as non-option arg"); server(argv[0]); diff --git a/debian/changelog b/debian/changelog index 185deeb..39e21a8 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,6 +1,7 @@ chiark-utils (4.1.30) unstable; urgency=low * cvs-repomove: work with Solaris's shoddy sed. (Closes: #497670) + * rcopy-repeatedly: new utility -- -- 2.30.2