From: ianmdlvl Date: Fri, 3 Oct 2008 22:14:43 +0000 (+0000) Subject: rcopy-repeatedly compiles, needs debugging X-Git-Tag: debian_version_4_1_30~7 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=chiark-utils.git;a=commitdiff_plain;h=1dddd3ed4fe7eb7073312ac6c364c612b6bc9b07 rcopy-repeatedly compiles, needs debugging --- diff --git a/cprogs/.cvsignore b/cprogs/.cvsignore index 8d72461..ca39263 100644 --- a/cprogs/.cvsignore +++ b/cprogs/.cvsignore @@ -7,3 +7,4 @@ xacpi-simple mcastsoundd summer watershed +rcopy-repeatedly diff --git a/cprogs/Makefile b/cprogs/Makefile index 584cda1..e9f19e0 100644 --- a/cprogs/Makefile +++ b/cprogs/Makefile @@ -26,7 +26,7 @@ include ../settings.make RWBUFFER_SIZE_MB=16 PROGRAMS= readbuffer writebuffer with-lock-ex xacpi-simple \ - summer watershed + summer watershed rcopy-repeatedly SUIDSBINPROGRAMS= really DAEMONS= trivsoundd MAN1PAGES= readbuffer.1 writebuffer.1 with-lock-ex.1 @@ -41,7 +41,7 @@ writebuffer: writebuffer.o wrbufcore.o rwbuffer.o trivsoundd: trivsoundd.o wrbufcore.o rwbuffer.o really: really.o myopt.o -really.o myopt.o: myopt.h +really.o myopt.o rcopy-repeatedly.o: myopt.h readbuffer.o writebuffer.o rwbuffer.o wrbufcore.o trivsoundd.o: rwbuffer.h xacpi-simple: xacpi-simple.o @@ -50,6 +50,9 @@ xacpi-simple: xacpi-simple.o summer: summer.o $(CC) -o $@ $< -lnettle -lgmp +rcopy-repeatedly: rcopy-repeatedly.o myopt.o + $(CC) -o $@ $^ -lm -lrt + watershed: watershed.o $(CC) -o $@ $< -lnettle -lgmp diff --git a/cprogs/rcopy-repeatedly.c b/cprogs/rcopy-repeatedly.c index 3b64315..399e8aa 100644 --- a/cprogs/rcopy-repeatedly.c +++ b/cprogs/rcopy-repeatedly.c @@ -1,7 +1,11 @@ +/* + * rcopy-repeatedly + */ + /* * protocol is: * server sends banner - * - "chiark-realtime-replicator\n" [27 bytes] + * - "#rcopy-repeatedly#\n" * - length of declaration, as 4 hex digits, zero prefixed, * and a space [5 bytes]. In this protocol version this * will be "0002" but client _must_ parse it. @@ -23,79 +27,168 @@ * receiver must then reply with 0x01 GO */ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "myopt.h" + #define REPLMSG_GO 0x01 -#define REPLMSG_RLE8 0x02 #define REPLMSG_RM 0x03 #define REPLMSG_FILE64 0x04 +static const char banner[]= "#rcopy-repeatedly#\n"; + static FILE *commsi, *commso; -static long interval_usec; -static int bytes_per_sec_log2; +static double max_bandwidth_proportion_mean= 0.2; +static double max_bandwidth_proportion_burst= 0.8; +static int txblocksz= INT_MAX; +static int min_interval_usec= 100000; /* 100ms */ + +static const char *rsh_program= "ssh"; +static const char *rcopy_repeatedly_program= "rcopy-repeatedly"; + +static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */ + +static char mainbuf[65536]; /* must be at least 2^16 */ + +#define NORETURN __attribute__((noreturn)) + +static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN; static void vdie(int ec, int eno, const char *fmt, va_list al) { - fputs("realtime-replicator: ",stderr); + fputs("rcopy-repeatedly: ",stderr); vfprintf(stderr,fmt,al); if (eno!=-1) fprintf(stderr,": %s",strerror(eno)); fputc('\n',stderr); exit(ec); } +static void die(int ec, int eno, const char *fmt, ...) NORETURN; static void die(int ec, int eno, const char *fmt, ...) { - va_arg al; + va_list al; va_start(al,fmt); - vdiee(ec,eno,fmt,al); + vdie(ec,eno,fmt,al); } +static void diem(void) NORETURN; static void diem(void) { die(16,errno,"malloc failed"); } +static void diee(const char *fmt, ...) NORETURN; static void diee(const char *fmt, ...) { - va_arg al; + va_list al; va_start(al,fmt); - diee(8,errno,fmt,al); + vdie(12,errno,fmt,al); } +static void die_protocol(const char *fmt, ...) NORETURN; static void die_protocol(const char *fmt, ...) { - va_arg al; + va_list al; + fputs("protocol error: ",stderr); va_start(al,fmt); - diee(12,-1,fmt,al); + vdie(10,-1,fmt,al); +} +static void die_badrecv(const char *what) NORETURN; static void die_badrecv(const char *what) { if (ferror(commsi)) diee("communication failed while receiving %s", what); if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what); abort(); } +static void die_badsend(void) NORETURN; static void die_badsend(void) { diee("transmission failed"); } -static void receiver_write(const unsigned char *buf, int n, - FILE *newfile, const char *tmpfilename) { - int r; +static void send_flush(void) { + if (ferror(commso) || fflush(commso)) + die_badsend(); +} +static void sendbyte(int c) { + if (putc(c,commso)==EOF) + die_badsend(); +} - r= fwrite(dbuf,1,n,newfile); - if (r != n) diee("failed to write temporary receiving file `%s'", - tmpfilename); +static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); } +static void mdup2(int fd, int fd2) { + if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2); } -typedef static void copyfile_die_fn(FILE *f, const char *xi); +typedef void copyfile_die_fn(FILE *f, const char *xi); + +struct timespec ts_sendstart; + +static void mgettime(struct timespec *ts) { + int r= clock_gettime(CLOCK_MONOTONIC, ts); + if (r) diee("clock_gettime failed"); +} + +static void bandlimit_sendstart(void) { + mgettime(&ts_sendstart); +} + +static double mgettime_elapsed(struct timespec ts_base, + struct timespec *ts_ret) { + mgettime(ts_ret); + return (ts_ret->tv_sec - ts_base.tv_sec) + + (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9; +} + +static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) { + struct timespec ts_buf; + double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf); + double secsperbyte_observed= elapsed / bytes; + + stream_allow_secsperbyte= + secsperbyte_observed / max_bandwidth_proportion_burst; + + double min_update= elapsed * max_bandwidth_proportion_burst + / max_bandwidth_proportion_mean; + if (min_update > 1e3) min_update= 1e3; + int min_update_usec= min_update * 1e6; + + if (*interval_usec_update > min_update_usec) + *interval_usec_update= min_update_usec; +} -static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi +static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi, FILE *df, copyfile_die_fn *ddie, const char *dxi, uint64_t l) { - char buf[65536]; + struct timespec ts_last; + int now, r; -fixme rate limit -fixme adjustable chunk size + ts_last= ts_sendstart; while (l>=0) { - now= l < sizeof(buf) ? l : sizeof(buf); + now= l < sizeof(mainbuf) ? l : sizeof(mainbuf); + if (now > txblocksz) now= txblocksz; - + double elapsed_want= now * stream_allow_secsperbyte; + double elapsed= mgettime_elapsed(ts_last, &ts_last); + double needwait= elapsed_want - elapsed; + if (needwait > 1) needwait= 1; + if (needwait > 0) usleep(ceil(needwait * 1e6)); - r= fread(buf,1,now,sf); if (r!=now) sdie(sf,sxi); - r= fwrite(buf,1,now,df); if (r!=now) ddie(df,dxi); + r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi); + r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi); l -= now; } } +static void copydie_inputfile(FILE *f, const char *filename) { + diee("read failed on source file `%s'", filename); +} static void copydie_tmpwrite(FILE *f, const char *tmpfilename) { diee("write failed to temporary receiving file `%s'", tmpfilename); } @@ -109,8 +202,10 @@ static void copydie_commso(FILE *f, const char *what) { static void receiver(const char *filename) { FILE *newfile; char *tmpfilename; + int r, c; - if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem(); + if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename) + ==-1) diem(); r= unlink(tmpfilename); if (r && errno!=ENOENT) @@ -128,7 +223,7 @@ static void receiver(const char *filename) { return; case REPLMSG_FILE64: - newfile= fopen(tmpfilename, "w"); + newfile= fopen(tmpfilename, "wb"); if (!newfile) diee("could not create temporary receiving file `%s'", tmpfilename); uint8_t lbuf[8]; @@ -164,23 +259,43 @@ static void receiver(const char *filename) { } static void sender(const char *filename) { - FILE *f; - int told_removed= 0; - struct stat stab; - + FILE *f, *fold; + int interval_usec, r, c; + struct stat stabtest, stab; + enum { told_nothing, told_file, told_remove } told; + + interval_usec= 0; + fold= 0; + told= told_nothing; + for (;;) { - f= fopen(filename, "r"); + if (interval_usec) usleep(interval_usec); + interval_usec= min_interval_usec; + + r= stat(filename, &stabtest); + if (r) { + f= 0; + } else { + if (told == told_file && + stabtest.st_mode == stab.st_mode && + stabtest.st_dev == stab.st_dev && + stabtest.st_ino == stab.st_ino) + continue; + f= fopen(filename, "rb"); + } + if (!f) { if (errno!=ENOENT) diee("could not access source file `%s'",filename); - if (told_removed) { - usleep(interval_usec); - continue; + if (told != told_remove) { + sendbyte(REPLMSG_RM); + told= told_remove; } - told_removed= 1; - sendbyte(REPLMSG_RM); continue; } + if (fold) fclose(fold); + fold= 0; + r= fstat(fileno(f),&stab); if (r) diee("could not fstat source file `%s'",filename); @@ -198,12 +313,14 @@ static void sender(const char *filename) { stab.st_size >> 8, stab.st_size }; + + bandlimit_sendstart(); r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend(); - told_removed= 0; copyfile(f, copydie_inputfile,filename, - commso, copydie_commso,0); + commso, copydie_commso,0, + stab.st_size); send_flush(); @@ -212,9 +329,158 @@ static void sender(const char *filename) { c= fgetc(commsi); if (c==EOF) die_badrecv("ack"); if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c); - usleep(interval_usec); + bandlimit_sendend(stab.st_size, &interval_usec); + + fold= f; + told= told_file; + } +} + +void usagemessage(void) { + puts("usage: rcopy-repeatedly [] \n" + " may be or [@]:\n" + " exactly one of each of the two forms must be provided\n" + " a file is taken as remote if it has a : before the first /\n" + "options\n" + " --help\n"); +} + +static void of_help(const struct cmdinfo *ci, const char *val) { + usagemessage(); + if (ferror(stdout)) diee("could not write usage message to stdout"); + exit(0); +} + +typedef struct { + const char *userhost, *path; +} FileSpecification; + +static FileSpecification srcspec, dstspec; +static int upload=-1; /* -1 means not yet known; 0 means download */ + +static const struct cmdinfo cmdinfos[]= { + { "help", .call= of_help }, + { "receiver", .iassignto=&upload, .arg=1 }, + { "sender", .iassignto=&upload, .arg=0 }, + { 0 } +}; + +static void server(const char *filename) { + int c; + commsi= stdin; + commso= stdout; + fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d'); + send_flush(); + c= fgetc(commsi); if (c==EOF) die_badrecv("initial go"); + if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c); + + if (upload) + receiver(filename); + else + sender(filename); +} + +static void client(void) { + int uppipe[2], downpipe[2], r; + pid_t child; + + mpipe(uppipe); + mpipe(downpipe); + + FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec; + const char *remotemode= srcspec.userhost ? "--sender" : "--receiver"; + + child= fork(); + if (child==-1) diee("fork failed"); + if (!child) { + mdup2(uppipe[0],0); + mdup2(downpipe[1],1); + close(uppipe[0]); close(downpipe[0]); + close(uppipe[1]); close(downpipe[1]); + execlp(rsh_program, + rsh_program, remotespec->userhost, rcopy_repeatedly_program, + remotemode, remotespec->path, (char*)0); + diee("failed to execute rsh program `%s'",rsh_program); } + + commso= fdopen(uppipe[1],"wb"); + commsi= fdopen(downpipe[0],"rb"); + if (!commso || !commsi) diee("fdopen failed"); + close(uppipe[0]); close(downpipe[0]); + close(uppipe[1]); close(downpipe[1]); + + char banbuf[sizeof(banner)-1 + 5 + 1]; + r= fread(banbuf,1,sizeof(banbuf)-1,commsi); + if (r!=sizeof(banbuf)-1) die_badrecv("banner"); + + if (memcmp(banbuf,banner,sizeof(banner)-1) || + banbuf[sizeof(banner)-1 + 4] != ' ') + die(8,-1,"banner received was not as expected -" + " shell dirty? ssh broken?\n" + " try running %s %s %s --sender %s\n" + " and expect the first line to be %s", + rsh_program, remotespec->userhost, + rcopy_repeatedly_program, remotespec->path, + banner); + + banbuf[sizeof(banbuf)-1]= 0; + char *ep; + long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16); + if (*ep) die_protocol("declaration length syntax error"); + assert(decllen <= sizeof(mainbuf)); + if (decllen<2) die_protocol("declaration too short"); + + r= fread(mainbuf,1,decllen,commsi); + if (r!=decllen) die_badrecv("declaration"); + if (mainbuf[decllen-1] != '\n') + die_protocol("declaration missing final newline"); + if (mainbuf[0] != upload ? 'u' : 'd') + die_protocol("declaration incorrect direction indicator"); + + sendbyte(REPLMSG_GO); + + if (upload) + sender(srcspec.path); + else + receiver(dstspec.path); } -int main(int argc, const char **argv) { - for ( +static void parse_file_specification(FileSpecification *fs, const char *arg, + const char *what) { + const char *colon; + + if (!arg) badusage("too few arguments - missing %s\n",what); + + for (colon=arg; ; colon++) { + if (!*colon || *colon=='/') { + fs->userhost=0; + fs->path= arg; + return; + } + if (*colon==':') { + char *uh= malloc(colon-arg + 1); if (!fs->userhost) diem(); + memcpy(uh,arg, colon-arg); uh[colon-arg]= 0; + fs->userhost= uh; + fs->path= colon+1; + return; + } + } +} + +int main(int argc, const char *const *argv) { + myopt(&argv, cmdinfos); + + if (upload>=0) { + if (!argv[0] || argv[1]) + badusage("server mode must have just the filename as non-option arg"); + server(argv[0]); + } else { + parse_file_specification(&srcspec, argv[0], "source"); + parse_file_specification(&dstspec, argv[1], "destination"); + if (argv[2]) badusage("too many non-option arguments"); + if (!!srcspec.userhost == !!dstspec.userhost) + badusage("need exactly one remote file argument"); + client(); + } + return 0; +}