From 8792b03f1151e481b144a257c58bfc8efa7aaf8a Mon Sep 17 00:00:00 2001 From: ianmdlvl Date: Fri, 3 Oct 2008 20:18:06 +0000 Subject: [PATCH] wip rcopy-repeatedly --- cprogs/rcopy-repeatedly.c | 233 ++++++++++++++++++++------------------ 1 file changed, 125 insertions(+), 108 deletions(-) diff --git a/cprogs/rcopy-repeatedly.c b/cprogs/rcopy-repeatedly.c index 489ab2d..3b64315 100644 --- a/cprogs/rcopy-repeatedly.c +++ b/cprogs/rcopy-repeatedly.c @@ -13,56 +13,58 @@ * client sends * - 0x01 go * then for each update - * sender sends - * - 0x02 update using rle and 8-bit counts - * - zero or more repetitions of - * n single byte giving length of data same as last time - * d single byte giving length of data changed - * ... d bytes of data - * where n!=0 or d!=0 (and for first update, n==0) - * - 0x00 0x00 - * indicates file is complete and should be installed - * or server may send + * sender sends one of * - 0x03 destination file should be deleted * but note that contents must be retained by receiver * as it may be used for rle updates + * - 0x04 complete new destination file follows, 64-bit length + * l 8 bytes big endian length + * ... l bytes data + * receiver must then reply with 0x01 GO */ -#define REPLMSG_GO 0x01 -#define REPLMSG_RLE8 0x02 -#define REPLMSG_RM 0x03 +#define REPLMSG_GO 0x01 +#define REPLMSG_RLE8 0x02 +#define REPLMSG_RM 0x03 +#define REPLMSG_FILE64 0x04 -static void vdie(int ec, const char *fmt, const char *emstr, va_list al) { +static FILE *commsi, *commso; +static long interval_usec; +static int bytes_per_sec_log2; + +static void vdie(int ec, int eno, const char *fmt, va_list al) { fputs("realtime-replicator: ",stderr); vfprintf(stderr,fmt,al); - if (emstr) fprintf(stderr,": %s",emstr); + if (eno!=-1) fprintf(stderr,": %s",strerror(eno)); fputc('\n',stderr); exit(ec); } -static void die(int ec, const char *fmt, const char *emstr, ...) { +static void die(int ec, int eno, const char *fmt, ...) { va_arg al; va_start(al,fmt); - vdiee(ec,fmt,emstr,al); + vdiee(ec,eno,fmt,al); } -static void diem(void) { die(16,"malloc failed",strerror(errno)); } +static void diem(void) { die(16,errno,"malloc failed"); } static void diee(const char *fmt, ...) { - const char *em= strerror(errno); va_arg al; va_start(al,fmt); - diee(8,fmt,em,al); + diee(8,errno,fmt,al); } static void die_protocol(const char *fmt, ...) { va_arg al; va_start(al,fmt); - diee(12,fmt,0,al); + diee(12,-1,fmt,al); -static void die_badrecv(FILE *f, const char *what) { - if (ferror(f)) diee("transmission failed while receiving %s", what); - if (feof(f)) die_protocol("receiver got unexpected EOF in %s", what); +static void die_badrecv(const char *what) { + if (ferror(commsi)) diee("communication failed while receiving %s", what); + if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what); abort(); } +static void die_badsend(void) { + diee("transmission failed"); +} static void receiver_write(const unsigned char *buf, int n, FILE *newfile, const char *tmpfilename) { @@ -72,9 +74,40 @@ static void receiver_write(const unsigned char *buf, int n, if (r != n) diee("failed to write temporary receiving file `%s'", tmpfilename); } + +typedef static void copyfile_die_fn(FILE *f, const char *xi); -static void receiver(const char *filename, FILE *comms) { - FILE *lastfile=0, *newfile; +static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi + FILE *df, copyfile_die_fn *ddie, const char *dxi, + uint64_t l) { + char buf[65536]; + +fixme rate limit +fixme adjustable chunk size + + while (l>=0) { + now= l < sizeof(buf) ? l : sizeof(buf); + + + + r= fread(buf,1,now,sf); if (r!=now) sdie(sf,sxi); + r= fwrite(buf,1,now,df); if (r!=now) ddie(df,dxi); + l -= now; + } +} + +static void copydie_tmpwrite(FILE *f, const char *tmpfilename) { + diee("write failed to temporary receiving file `%s'", tmpfilename); +} +static void copydie_commsi(FILE *f, const char *what) { + die_badrecv(what); +} +static void copydie_commso(FILE *f, const char *what) { + die_badsend(); +} + +static void receiver(const char *filename) { + FILE *newfile; char *tmpfilename; if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem(); @@ -84,120 +117,104 @@ static void receiver(const char *filename, FILE *comms) { diee("could not remove temporary receiving file `%s'", tmpfilename); for (;;) { - c= fgetc(comms); if (c==EOF) break; + send_flush(); + c= fgetc(commsi); + switch (c) { - case REPLMSG_RLE8: + case EOF: + if (ferror(commsi)) die_badrecv("transfer message code"); + assert(feof(commsi)); + return; + + case REPLMSG_FILE64: newfile= fopen(tmpfilename, "w"); if (!newfile) diee("could not create temporary receiving file `%s'", tmpfilename); - for (;;) { - unsigned char lu[2], dbuf[255]; - r= fread(lu,1,2,comms); if (r!=2) die_badrecv(comms,"RLE8 elem hdr"); - if (!lu[0] && !lu[1]) break; - if (lu[0]) { - if (!lastfile) die_protocol("first RLE8 requests some previous"); - r= fread(dbuf,1,lu[0],lastfile); - if (r!=lu[0]) { - if (ferror(lastfile)) diee("could not read current (old) file" - " `%s'", filename); - assert(feof(lastfile)); - die_protocol("RLE8 requests more previous than is available"); - } - receiver_write(dbuf,lu[0],newfile,tmpfilename); - } - if (lu[1]) { - r= fread(dbuf,1,lu[1],comms); - if (r!=lu[1]) die_badrecv(comms,"RLE8 literal data"); - receiver_write(dbuf,lu[1],newfile,tmpfilename); - } - } - if (fflush(newfile)) diee("could not flush temporary receiving file" - " `%s'", tmpfilename); + uint8_t lbuf[8]; + r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l"); + + uint64_t l= + (lbuf[0] << 28 << 28) | + (lbuf[1] << 24 << 24) | + (lbuf[2] << 16 << 24) | + (lbuf[3] << 8 << 24) | + (lbuf[4] << 24) | + (lbuf[5] << 16) | + (lbuf[6] << 8) | + (lbuf[7] ) ; + + copyfile(commsi, copydie_commsi,"FILE64 file data", + newfile, copydie_tmpwrite,tmpfilename, + l); + + if (fclose(newfile)) diee("could not flush and close temporary" + " receiving file `%s'", tmpfilename); if (rename(tmpfilename, filename)) diee("could not install new version of destination file `%s'", filename); - if (fclose(lastfile)) diee("failed to close old (now unlinked) file"); - lastfile= newfile; - r= fseek(lastfile,0,SEEK_SET); - if (r) diee("failed to seek installed destination file `%s'", filename); - continue; - case REPLMSG_RM: - r= unlink(filename); - if (r && errno!=ENOENT) - diee("could not remove destination file `%s' (as instructed" - " by sender)", filename); - continue; + sendbyte(REPLMSG_GO); default: die_protocol("unknown transfer message code 0x%02x",c); + } } - if (feof(comms)) return 0; - die_badrecv(comms,"transfer message code"); } - static void sender(const char *filename, FILE *comms, - unsigned long interval_usec) { - struct stat stab, laststab; - memset(&stab,0,sizeof(stab)); - FILE *lastfile, *newfile; +static void sender(const char *filename) { + FILE *f; int told_removed= 0; + struct stat stab; for (;;) { - r= stat(filename, &stab); - if (r) { - newfile= 0; - } else { - if (stab.st_dev == laststab.st_dev && - stab.st_ino == laststab.st_ino) { - usleep(interval_usec); - continue; - } - newfile= fopen(filename, "r"); - } - if (!newfile) { + f= fopen(filename, "r"); + if (!f) { if (errno!=ENOENT) diee("could not access source file `%s'",filename); if (told_removed) { usleep(interval_usec); continue; } - sender_sendbyte(REPLMSG_RM); + told_removed= 1; + sendbyte(REPLMSG_RM); continue; } - for (;;) { - n= 0; - while (n<255) { - cl= nextchar(lastfile); - cn= nextchar(newfile); - if (cl!=cn) { - - if (!feof(lastfile)) { - cl= fgetc(lastfile); - if (ferror(lastfile)) - diee("could not read old source file `%s'",filename); - assert(cl != EOF || feof(lastfile)); - } - - if (errno!=ENOENT) diee("could not open source file `%s'",filename); - sender_send - - if (r) -} + r= fstat(fileno(f),&stab); + if (r) diee("could not fstat source file `%s'",filename); + + if (!S_ISREG(stab.st_mode)) + die(12,-1,"source file `%s' is not a plain file",filename); -static void sender( + uint8_t hbuf[9]= { + REPLMSG_FILE64, + stab.st_size >> 28 >> 28, + stab.st_size >> 24 >> 24, + stab.st_size >> 16 >> 24, + stab.st_size >> 8 >> 24, + stab.st_size >> 24, + stab.st_size >> 16, + stab.st_size >> 8, + stab.st_size + }; + + r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend(); + told_removed= 0; - assert(ferror(newfile)); - + copyfile(f, copydie_inputfile,filename, + commso, copydie_commso,0); - r= fread(dbuf,1,lu[0],comms); - if (r!=lu[0]) die_badrecv(comms, - + send_flush(); - c= fgetc(comms); + if (fclose(f)) diee("couldn't close source file `%s'",filename); + c= fgetc(commsi); if (c==EOF) die_badrecv("ack"); + if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c); + + usleep(interval_usec); + } +} int main(int argc, const char **argv) { for ( -- 2.30.2