X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=cprogs%2Frcopy-repeatedly.c;h=86cd3170388c2d9a66d32b7925616411a8a38874;hb=e07e49f983756cbeb25ac4ac813e18defdca0de6;hp=489ab2daf5ec09e15cc2ae0e8d567fb09de6d36c;hpb=b04c264af5257661e52015a8895f81046a39e8d8;p=chiark-utils.git diff --git a/cprogs/rcopy-repeatedly.c b/cprogs/rcopy-repeatedly.c index 489ab2d..86cd317 100644 --- a/cprogs/rcopy-repeatedly.c +++ b/cprogs/rcopy-repeatedly.c @@ -1,7 +1,11 @@ +/* + * rcopy-repeatedly + */ + /* * protocol is: * server sends banner - * - "chiark-realtime-replicator\n" [27 bytes] + * - "#rcopy-repeatedly#\n" * - length of declaration, as 4 hex digits, zero prefixed, * and a space [5 bytes]. In this protocol version this * will be "0002" but client _must_ parse it. @@ -13,191 +17,613 @@ * client sends * - 0x01 go * then for each update - * sender sends - * - 0x02 update using rle and 8-bit counts - * - zero or more repetitions of - * n single byte giving length of data same as last time - * d single byte giving length of data changed - * ... d bytes of data - * where n!=0 or d!=0 (and for first update, n==0) - * - 0x00 0x00 - * indicates file is complete and should be installed - * or server may send + * sender sends one of * - 0x03 destination file should be deleted * but note that contents must be retained by receiver * as it may be used for rle updates + * - 0x04 complete new destination file follows, 64-bit length + * l 8 bytes big endian length + * ... l bytes data + * receiver must then reply with 0x01 GO */ -#define REPLMSG_GO 0x01 -#define REPLMSG_RLE8 0x02 -#define REPLMSG_RM 0x03 +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "myopt.h" + +#define REPLMSG_GO 0x01 +#define REPLMSG_RM 0x03 +#define REPLMSG_FILE64 0x04 + +static const char banner[]= "#rcopy-repeatedly#\n"; + +static FILE *commsi, *commso; + +static double max_bw_prop_mean= 0.2; +static double max_bw_prop_burst= 0.8; +static int txblocksz= INT_MAX, verbose=1; +static int min_interval_usec= 100000; /* 100ms */ -static void vdie(int ec, const char *fmt, const char *emstr, va_list al) { - fputs("realtime-replicator: ",stderr); +static int nsargs; +static const char **sargs; + +static const char *rsh_program= 0; +static const char *rcopy_repeatedly_program= "rcopy-repeatedly"; +static int server_upcopy=-1; /* -1 means not yet known; 0 means download */ + /* `up' means towards the client, + * since we regard the subprocess as `down' */ + +static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */ + +static char mainbuf[65536]; /* must be at least 2^16 */ + +#define NORETURN __attribute__((noreturn)) + +static void vdie(int ec, const char *pfx, int eno, + const char *fmt, va_list al) NORETURN; +static void vdie(int ec, const char *pfx, int eno, + const char *fmt, va_list al) { + fputs("rcopy-repeatedly: ",stderr); + if (server_upcopy>=0) fputs("server: ",stderr); + if (pfx) fprintf(stderr,"%s: ",pfx); vfprintf(stderr,fmt,al); - if (emstr) fprintf(stderr,": %s",emstr); + if (eno!=-1) fprintf(stderr,": %s",strerror(eno)); fputc('\n',stderr); exit(ec); } -static void die(int ec, const char *fmt, const char *emstr, ...) { - va_arg al; +static void die(int ec, const char *pfx, int eno, + const char *fmt, ...) NORETURN; +static void die(int ec, const char *pfx, int eno, + const char *fmt, ...) { + va_list al; va_start(al,fmt); - vdiee(ec,fmt,emstr,al); + vdie(ec,pfx,eno,fmt,al); } -static void diem(void) { die(16,"malloc failed",strerror(errno)); } +static void diem(void) NORETURN; +static void diem(void) { die(16,0,errno,"malloc failed"); } +static void *xmalloc(size_t sz) { + assert(sz); + void *p= malloc(sz); + if (!p) diem(); + return p; +} +static void *xrealloc(void *p, size_t sz) { + assert(sz); + p= realloc(p,sz); + if (!p) diem(); + return p; +} +static void diee(const char *fmt, ...) NORETURN; static void diee(const char *fmt, ...) { - const char *em= strerror(errno); - va_arg al; + va_list al; va_start(al,fmt); - diee(8,fmt,em,al); + vdie(12,0,errno,fmt,al); } +static void die_protocol(const char *fmt, ...) NORETURN; static void die_protocol(const char *fmt, ...) { - va_arg al; + va_list al; va_start(al,fmt); - diee(12,fmt,0,al); + vdie(10,"protocol error",-1,fmt,al); +} -static void die_badrecv(FILE *f, const char *what) { - if (ferror(f)) diee("transmission failed while receiving %s", what); - if (feof(f)) die_protocol("receiver got unexpected EOF in %s", what); +static void die_badrecv(const char *what) NORETURN; +static void die_badrecv(const char *what) { + if (ferror(commsi)) diee("communication failed while receiving %s", what); + if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what); abort(); } +static void die_badsend(void) NORETURN; +static void die_badsend(void) { + diee("transmission failed"); +} + +static void send_flush(void) { + if (ferror(commso) || fflush(commso)) + die_badsend(); +} +static void sendbyte(int c) { + if (putc(c,commso)==EOF) + die_badsend(); +} + +static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); } +static void mdup2(int fd, int fd2) { + if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2); +} + +typedef void copyfile_die_fn(FILE *f, const char *xi); + +struct timespec ts_sendstart; -static void receiver_write(const unsigned char *buf, int n, - FILE *newfile, const char *tmpfilename) { - int r; +static void mgettime(struct timespec *ts) { + int r= clock_gettime(CLOCK_MONOTONIC, ts); + if (r) diee("clock_gettime failed"); +} + +static void bandlimit_sendstart(void) { + mgettime(&ts_sendstart); +} + +static double mgettime_elapsed(struct timespec ts_base, + struct timespec *ts_ret) { + mgettime(ts_ret); + return (ts_ret->tv_sec - ts_base.tv_sec) + + (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9; +} - r= fwrite(dbuf,1,n,newfile); - if (r != n) diee("failed to write temporary receiving file `%s'", - tmpfilename); +static void flushstderr(void) { + if (ferror(stderr) || fflush(stderr)) + diee("could not write progress to stderr"); +} + +static void verbosespinprintf(const char *fmt, ...) { + static const char spinnerchars[]= "/-\\"; + static int spinnerchar; + + if (!verbose) + return; + + va_list al; + va_start(al,fmt); + fprintf(stderr," %c ",spinnerchars[spinnerchar]); + spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1; + vfprintf(stderr,fmt,al); + putc('\r',stderr); + flushstderr(); +} + +static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) { + struct timespec ts_buf; + double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf); + double secsperbyte_observed= elapsed / bytes; + + stream_allow_secsperbyte= + secsperbyte_observed * max_bw_prop_mean / max_bw_prop_burst; + + double min_update= elapsed / max_bw_prop_mean; + if (min_update > 1e3) min_update= 1e3; + int min_update_usec= min_update * 1e6; + + if (*interval_usec_update > min_update_usec) + *interval_usec_update= min_update_usec; + + verbosespinprintf("%12lluby %10.3fs %13.2fkby/s", + (unsigned long long)bytes, elapsed, + 1e-3/secsperbyte_observed); } -static void receiver(const char *filename, FILE *comms) { - FILE *lastfile=0, *newfile; +static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi, + FILE *df, copyfile_die_fn *ddie, const char *dxi, + uint64_t lstart, int amsender) { + struct timespec ts_last; + int now, r; + uint64_t l=lstart, done=0; + + ts_last= ts_sendstart; + + while (l>0) { + now= l < sizeof(mainbuf) ? l : sizeof(mainbuf); + if (now > txblocksz) now= txblocksz; + + if (verbose) { + fprintf(stderr," %3d%% \r", + (int)(done*100.0/lstart)); + flushstderr(); + } + + if (amsender) { + double elapsed_want= now * stream_allow_secsperbyte; + double elapsed= mgettime_elapsed(ts_last, &ts_last); + double needwait= elapsed_want - elapsed; + if (needwait > 1) needwait= 1; + if (needwait > 0) usleep(ceil(needwait * 1e6)); + } + + r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi); + r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi); + l -= now; + done += now; + } +} + +static void copydie_inputfile(FILE *f, const char *filename) { + diee("read failed on source file `%s'", filename); +} +static void copydie_tmpwrite(FILE *f, const char *tmpfilename) { + diee("write failed to temporary receiving file `%s'", tmpfilename); +} +static void copydie_commsi(FILE *f, const char *what) { + die_badrecv(what); +} +static void copydie_commso(FILE *f, const char *what) { + die_badsend(); +} + +static void receiver(const char *filename) { + FILE *newfile; char *tmpfilename; + int r, c; - if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem(); + char *lastslash= strrchr(filename,'/'); + if (!lastslash) + r= asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename); + else + r= asprintf(&tmpfilename, "%.*s/.rcopy-repeatedly.#%s#", + (int)(lastslash-filename), filename, lastslash+1); + if (r==-1) diem(); r= unlink(tmpfilename); if (r && errno!=ENOENT) diee("could not remove temporary receiving file `%s'", tmpfilename); for (;;) { - c= fgetc(comms); if (c==EOF) break; + send_flush(); + c= fgetc(commsi); + switch (c) { - case REPLMSG_RLE8: - newfile= fopen(tmpfilename, "w"); + case EOF: + if (ferror(commsi)) die_badrecv("transfer message code"); + assert(feof(commsi)); + return; + + case REPLMSG_RM: + r= unlink(filename); + if (r && errno!=ENOENT) + diee("source file removed but could not remove destination file `%s'", + filename); + break; + + case REPLMSG_FILE64: + newfile= fopen(tmpfilename, "wb"); if (!newfile) diee("could not create temporary receiving file `%s'", tmpfilename); - for (;;) { - unsigned char lu[2], dbuf[255]; - r= fread(lu,1,2,comms); if (r!=2) die_badrecv(comms,"RLE8 elem hdr"); - if (!lu[0] && !lu[1]) break; - if (lu[0]) { - if (!lastfile) die_protocol("first RLE8 requests some previous"); - r= fread(dbuf,1,lu[0],lastfile); - if (r!=lu[0]) { - if (ferror(lastfile)) diee("could not read current (old) file" - " `%s'", filename); - assert(feof(lastfile)); - die_protocol("RLE8 requests more previous than is available"); - } - receiver_write(dbuf,lu[0],newfile,tmpfilename); - } - if (lu[1]) { - r= fread(dbuf,1,lu[1],comms); - if (r!=lu[1]) die_badrecv(comms,"RLE8 literal data"); - receiver_write(dbuf,lu[1],newfile,tmpfilename); - } - } - if (fflush(newfile)) diee("could not flush temporary receiving file" - " `%s'", tmpfilename); + uint8_t lbuf[8]; + r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l"); + + uint64_t l= + (lbuf[0] << 28 << 28) | + (lbuf[1] << 24 << 24) | + (lbuf[2] << 16 << 24) | + (lbuf[3] << 8 << 24) | + (lbuf[4] << 24) | + (lbuf[5] << 16) | + (lbuf[6] << 8) | + (lbuf[7] ) ; + + copyfile(commsi, copydie_commsi,"FILE64 file data", + newfile, copydie_tmpwrite,tmpfilename, + l, 0); + + if (fclose(newfile)) diee("could not flush and close temporary" + " receiving file `%s'", tmpfilename); if (rename(tmpfilename, filename)) diee("could not install new version of destination file `%s'", filename); - if (fclose(lastfile)) diee("failed to close old (now unlinked) file"); - lastfile= newfile; - r= fseek(lastfile,0,SEEK_SET); - if (r) diee("failed to seek installed destination file `%s'", filename); - continue; - case REPLMSG_RM: - r= unlink(filename); - if (r && errno!=ENOENT) - diee("could not remove destination file `%s' (as instructed" - " by sender)", filename); - continue; + sendbyte(REPLMSG_GO); + break; default: die_protocol("unknown transfer message code 0x%02x",c); + } } - if (feof(comms)) return 0; - die_badrecv(comms,"transfer message code"); } - static void sender(const char *filename, FILE *comms, - unsigned long interval_usec) { - struct stat stab, laststab; - memset(&stab,0,sizeof(stab)); - FILE *lastfile, *newfile; - int told_removed= 0; +static void sender(const char *filename) { + FILE *f, *fold; + int interval_usec, r, c; + struct stat stabtest, stab; + enum { told_nothing, told_file, told_remove } told; + interval_usec= 0; + fold= 0; + told= told_nothing; + for (;;) { - r= stat(filename, &stab); + if (interval_usec) { + send_flush(); + usleep(interval_usec); + } + interval_usec= min_interval_usec; + + r= stat(filename, &stabtest); if (r) { - newfile= 0; + f= 0; } else { - if (stab.st_dev == laststab.st_dev && - stab.st_ino == laststab.st_ino) { - usleep(interval_usec); + if (told == told_file && + stabtest.st_mode == stab.st_mode && + stabtest.st_dev == stab.st_dev && + stabtest.st_ino == stab.st_ino && + stabtest.st_mtime == stab.st_mtime && + stabtest.st_size == stab.st_size) continue; - } - newfile= fopen(filename, "r"); + f= fopen(filename, "rb"); } - if (!newfile) { + + if (!f) { if (errno!=ENOENT) diee("could not access source file `%s'",filename); - if (told_removed) { - usleep(interval_usec); - continue; + if (told != told_remove) { + verbosespinprintf(" ENOENT "); + sendbyte(REPLMSG_RM); + told= told_remove; } - sender_sendbyte(REPLMSG_RM); continue; } - for (;;) { - n= 0; - while (n<255) { - cl= nextchar(lastfile); - cn= nextchar(newfile); - if (cl!=cn) { + if (fold) fclose(fold); + fold= 0; + + r= fstat(fileno(f),&stab); + if (r) diee("could not fstat source file `%s'",filename); + + if (!S_ISREG(stab.st_mode)) + die(8,0,-1,"source file `%s' is not a plain file",filename); + + uint8_t hbuf[9]= { + REPLMSG_FILE64, + stab.st_size >> 28 >> 28, + stab.st_size >> 24 >> 24, + stab.st_size >> 16 >> 24, + stab.st_size >> 8 >> 24, + stab.st_size >> 24, + stab.st_size >> 16, + stab.st_size >> 8, + stab.st_size + }; + + bandlimit_sendstart(); + + r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend(); + + copyfile(f, copydie_inputfile,filename, + commso, copydie_commso,0, + stab.st_size, 1); + + send_flush(); + + c= fgetc(commsi); if (c==EOF) die_badrecv("ack"); + if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c); + + bandlimit_sendend(stab.st_size, &interval_usec); + + fold= f; + told= told_file; + } +} + +void usagemessage(void) { + puts("usage: rcopy-repeatedly [] \n" + " may be or [@]:\n" + " exactly one of each of the two forms must be provided\n" + " a file is taken as remote if it has a : before the first /\n" + "options\n" + " --help\n"); +} + +typedef struct { + const char *userhost, *path; +} FileSpecification; + +static FileSpecification srcspec, dstspec; + +static void of__server(const struct cmdinfo *ci, const char *val) { + int ncount= nsargs + 1 + !!val; + sargs= xrealloc(sargs, sizeof(*sargs) * ncount); + sargs[nsargs++]= ci->olong; + if (val) + sargs[nsargs++]= val; +} + +static int of__server_int(const struct cmdinfo *ci, const char *val) { + of__server(ci,val); + long v; + char *ep; + errno= 0; v= strtol(val,&ep,10); + if (!*val || *ep || errno || vINT_MAX) + badusage("bad integer argument `%s' for --%s",val,ci->olong); + return v; +} + +static void of_help(const struct cmdinfo *ci, const char *val) { + usagemessage(); + if (ferror(stdout)) diee("could not write usage message to stdout"); + exit(0); +} + +static void of_bw(const struct cmdinfo *ci, const char *val) { + int pct= of__server_int(ci,val); + if (pct<1 || pct>100) + badusage("bandwidth percentage must be between 1 and 100 inclusive"); + *(double*)ci->parg= pct * 0.01; +} + +static void of_server_int(const struct cmdinfo *ci, const char *val) { + *(int*)ci->parg= of__server_int(ci,val); +} + +static const struct cmdinfo cmdinfos[]= { + { "help", .call= of_help }, + { "max-bandwidth-percent-mean", 0,1,.call=of_bw,.parg=&max_bw_prop_mean }, + { "max-bandwidth-percent-burst",0,1,.call=of_bw,.parg=&max_bw_prop_burst }, + { "tx-block-size", 0,1,.call=of_server_int, .parg=&txblocksz }, + { "min-interval-usec", 0,1,.call=of_server_int, .parg=&min_interval_usec }, + { "rcopy-repeatedly", 0,1, .sassignto=&rcopy_repeatedly_program }, + { "ssh-program", 0,1, .sassignto=&rsh_program }, + { "receiver", .iassignto=&server_upcopy, .arg=0 }, + { "sender", .iassignto=&server_upcopy, .arg=1 }, + { 0 } +}; + +static void server(const char *filename) { + int c; + commsi= stdin; + commso= stdout; + fprintf(commso, "%s0002 %c\n", banner, server_upcopy?'u':'d'); + send_flush(); + c= fgetc(commsi); if (c==EOF) die_badrecv("initial go"); + if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c); + + if (server_upcopy) + sender(filename); + else + receiver(filename); +} + +static void client(void) { + int uppipe[2], downpipe[2], r; + pid_t child; + + mpipe(uppipe); + mpipe(downpipe); + + FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec; + const char *remotemode= srcspec.userhost ? "--sender" : "--receiver"; + + sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs)); + memmove(sargs+5, sargs, sizeof(*sargs) * nsargs); + sargs[0]= rsh_program; + sargs[1]= remotespec->userhost; + sargs[2]= rcopy_repeatedly_program; + sargs[3]= remotemode; + sargs[4]= "--"; + sargs[5+nsargs]= remotespec->path; + sargs[6+nsargs]= 0; + + child= fork(); + if (child==-1) diee("fork failed"); + if (!child) { + mdup2(downpipe[0],0); + mdup2(uppipe[1],1); + close(uppipe[0]); close(downpipe[0]); + close(uppipe[1]); close(downpipe[1]); + + execvp(rsh_program, (char**)sargs); + diee("failed to execute rsh program `%s'",rsh_program); + } + + commso= fdopen(downpipe[1],"wb"); + commsi= fdopen(uppipe[0],"rb"); + if (!commso || !commsi) diee("fdopen failed"); + close(downpipe[0]); + close(uppipe[1]); + + char banbuf[sizeof(banner)-1 + 5 + 1]; + r= fread(banbuf,1,sizeof(banbuf)-1,commsi); + if (ferror(commsi)) die_badrecv("read banner"); + + if (r!=sizeof(banbuf)-1 || + memcmp(banbuf,banner,sizeof(banner)-1) || + banbuf[sizeof(banner)-1 + 4] != ' ') { + const char **sap; + int count=0; + for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1; + char *cmdline= xmalloc(count+1); + cmdline[0]=' '; + for (sap=sargs; *sap; sap++) { + strcat(cmdline," "); + strcat(cmdline,*sap); + } + + die(8,0,-1,"did not receive banner as expected -" + " shell dirty? ssh broken?\n" + " try running\n" + " %s\n" + " and expect the first line to be\n" + " %s", + cmdline, banner); + } + + banbuf[sizeof(banbuf)-1]= 0; + char *ep; + long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16); + if (ep!=banbuf + sizeof(banner)-1 + 4 || *ep!=' ') + die_protocol("declaration length syntax error (`%s')",ep); + assert(decllen <= sizeof(mainbuf)); + if (decllen<2) die_protocol("declaration too short"); + + r= fread(mainbuf,1,decllen,commsi); + if (r!=decllen) die_badrecv("declaration"); + if (mainbuf[decllen-1] != '\n') + die_protocol("declaration missing final newline"); + if (mainbuf[0] != (remotespec==&srcspec ? 'u' : 'd')) + die_protocol("declaration incorrect direction indicator"); + + sendbyte(REPLMSG_GO); - if (!feof(lastfile)) { - cl= fgetc(lastfile); - if (ferror(lastfile)) - diee("could not read old source file `%s'",filename); - assert(cl != EOF || feof(lastfile)); - } - - if (errno!=ENOENT) diee("could not open source file `%s'",filename); - sender_send + if (remotespec==&srcspec) + receiver(dstspec.path); + else + sender(srcspec.path); +} + +static void parse_file_specification(FileSpecification *fs, const char *arg, + const char *what) { + const char *colon; + + if (!arg) badusage("too few arguments - missing %s\n",what); - if (r) + for (colon=arg; ; colon++) { + if (!*colon || *colon=='/') { + fs->userhost=0; + fs->path= arg; + return; + } + if (*colon==':') { + char *uh= xmalloc(colon-arg + 1); + memcpy(uh,arg, colon-arg); uh[colon-arg]= 0; + fs->userhost= uh; + fs->path= colon+1; + return; + } + } } -static void sender( +int main(int argc, const char *const *argv) { + setvbuf(stderr,0,_IOLBF,BUFSIZ); - assert(ferror(newfile)); - + myopt(&argv, cmdinfos); - r= fread(dbuf,1,lu[0],comms); - if (r!=lu[0]) die_badrecv(comms, - + if (!rsh_program) rsh_program= getenv("RCOPY_REPEATEDLY_RSH"); + if (!rsh_program) rsh_program= getenv("RSYNC_RSH"); + if (!rsh_program) rsh_program= "ssh"; - c= fgetc(comms); + if (max_bw_prop_burst / max_bw_prop_mean < 1.1) + badusage("max bandwidth prop burst must be at least 1.1x" + " max bandwidth prop mean"); + if (txblocksz<1) badusage("transmit block size must be at least 1"); + if (min_interval_usec<0) badusage("minimum update interval may not be -ve"); -int main(int argc, const char **argv) { - for ( + if (server_upcopy>=0) { + if (!argv[0] || argv[1]) + badusage("server mode must have just the filename as non-option arg"); + server(argv[0]); + } else { + parse_file_specification(&srcspec, argv[0], "source"); + parse_file_specification(&dstspec, argv[1], "destination"); + if (argv[2]) badusage("too many non-option arguments"); + if (!!srcspec.userhost == !!dstspec.userhost) + badusage("need exactly one remote file argument"); + client(); + } + return 0; +}