* client sends
* - 0x01 go
* then for each update
- * sender sends
- * - 0x02 update using rle and 8-bit counts
- * - zero or more repetitions of
- * n single byte giving length of data same as last time
- * d single byte giving length of data changed
- * ... d bytes of data
- * where n!=0 or d!=0 (and for first update, n==0)
- * - 0x00 0x00
- * indicates file is complete and should be installed
- * or server may send
+ * sender sends one of
* - 0x03 destination file should be deleted
* but note that contents must be retained by receiver
* as it may be used for rle updates
+ * - 0x04 complete new destination file follows, 64-bit length
+ * l 8 bytes big endian length
+ * ... l bytes data
+ * receiver must then reply with 0x01 GO
*/
-#define REPLMSG_GO 0x01
-#define REPLMSG_RLE8 0x02
-#define REPLMSG_RM 0x03
+#define REPLMSG_GO 0x01
+#define REPLMSG_RLE8 0x02
+#define REPLMSG_RM 0x03
+#define REPLMSG_FILE64 0x04
-static void vdie(int ec, const char *fmt, const char *emstr, va_list al) {
+static FILE *commsi, *commso;
+static long interval_usec;
+static int bytes_per_sec_log2;
+
+static void vdie(int ec, int eno, const char *fmt, va_list al) {
fputs("realtime-replicator: ",stderr);
vfprintf(stderr,fmt,al);
- if (emstr) fprintf(stderr,": %s",emstr);
+ if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
fputc('\n',stderr);
exit(ec);
}
-static void die(int ec, const char *fmt, const char *emstr, ...) {
+static void die(int ec, int eno, const char *fmt, ...) {
va_arg al;
va_start(al,fmt);
- vdiee(ec,fmt,emstr,al);
+ vdiee(ec,eno,fmt,al);
}
-static void diem(void) { die(16,"malloc failed",strerror(errno)); }
+static void diem(void) { die(16,errno,"malloc failed"); }
static void diee(const char *fmt, ...) {
- const char *em= strerror(errno);
va_arg al;
va_start(al,fmt);
- diee(8,fmt,em,al);
+ diee(8,errno,fmt,al);
}
static void die_protocol(const char *fmt, ...) {
va_arg al;
va_start(al,fmt);
- diee(12,fmt,0,al);
+ diee(12,-1,fmt,al);
-static void die_badrecv(FILE *f, const char *what) {
- if (ferror(f)) diee("transmission failed while receiving %s", what);
- if (feof(f)) die_protocol("receiver got unexpected EOF in %s", what);
+static void die_badrecv(const char *what) {
+ if (ferror(commsi)) diee("communication failed while receiving %s", what);
+ if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
abort();
}
+static void die_badsend(void) {
+ diee("transmission failed");
+}
static void receiver_write(const unsigned char *buf, int n,
FILE *newfile, const char *tmpfilename) {
if (r != n) diee("failed to write temporary receiving file `%s'",
tmpfilename);
}
+
+typedef static void copyfile_die_fn(FILE *f, const char *xi);
-static void receiver(const char *filename, FILE *comms) {
- FILE *lastfile=0, *newfile;
+static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi
+ FILE *df, copyfile_die_fn *ddie, const char *dxi,
+ uint64_t l) {
+ char buf[65536];
+
+fixme rate limit
+fixme adjustable chunk size
+
+ while (l>=0) {
+ now= l < sizeof(buf) ? l : sizeof(buf);
+
+
+
+ r= fread(buf,1,now,sf); if (r!=now) sdie(sf,sxi);
+ r= fwrite(buf,1,now,df); if (r!=now) ddie(df,dxi);
+ l -= now;
+ }
+}
+
+static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
+ diee("write failed to temporary receiving file `%s'", tmpfilename);
+}
+static void copydie_commsi(FILE *f, const char *what) {
+ die_badrecv(what);
+}
+static void copydie_commso(FILE *f, const char *what) {
+ die_badsend();
+}
+
+static void receiver(const char *filename) {
+ FILE *newfile;
char *tmpfilename;
if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem();
diee("could not remove temporary receiving file `%s'", tmpfilename);
for (;;) {
- c= fgetc(comms); if (c==EOF) break;
+ send_flush();
+ c= fgetc(commsi);
+
switch (c) {
- case REPLMSG_RLE8:
+ case EOF:
+ if (ferror(commsi)) die_badrecv("transfer message code");
+ assert(feof(commsi));
+ return;
+
+ case REPLMSG_FILE64:
newfile= fopen(tmpfilename, "w");
if (!newfile) diee("could not create temporary receiving file `%s'",
tmpfilename);
- for (;;) {
- unsigned char lu[2], dbuf[255];
- r= fread(lu,1,2,comms); if (r!=2) die_badrecv(comms,"RLE8 elem hdr");
- if (!lu[0] && !lu[1]) break;
- if (lu[0]) {
- if (!lastfile) die_protocol("first RLE8 requests some previous");
- r= fread(dbuf,1,lu[0],lastfile);
- if (r!=lu[0]) {
- if (ferror(lastfile)) diee("could not read current (old) file"
- " `%s'", filename);
- assert(feof(lastfile));
- die_protocol("RLE8 requests more previous than is available");
- }
- receiver_write(dbuf,lu[0],newfile,tmpfilename);
- }
- if (lu[1]) {
- r= fread(dbuf,1,lu[1],comms);
- if (r!=lu[1]) die_badrecv(comms,"RLE8 literal data");
- receiver_write(dbuf,lu[1],newfile,tmpfilename);
- }
- }
- if (fflush(newfile)) diee("could not flush temporary receiving file"
- " `%s'", tmpfilename);
+ uint8_t lbuf[8];
+ r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l");
+
+ uint64_t l=
+ (lbuf[0] << 28 << 28) |
+ (lbuf[1] << 24 << 24) |
+ (lbuf[2] << 16 << 24) |
+ (lbuf[3] << 8 << 24) |
+ (lbuf[4] << 24) |
+ (lbuf[5] << 16) |
+ (lbuf[6] << 8) |
+ (lbuf[7] ) ;
+
+ copyfile(commsi, copydie_commsi,"FILE64 file data",
+ newfile, copydie_tmpwrite,tmpfilename,
+ l);
+
+ if (fclose(newfile)) diee("could not flush and close temporary"
+ " receiving file `%s'", tmpfilename);
if (rename(tmpfilename, filename))
diee("could not install new version of destination file `%s'",
filename);
- if (fclose(lastfile)) diee("failed to close old (now unlinked) file");
- lastfile= newfile;
- r= fseek(lastfile,0,SEEK_SET);
- if (r) diee("failed to seek installed destination file `%s'", filename);
- continue;
- case REPLMSG_RM:
- r= unlink(filename);
- if (r && errno!=ENOENT)
- diee("could not remove destination file `%s' (as instructed"
- " by sender)", filename);
- continue;
+ sendbyte(REPLMSG_GO);
default:
die_protocol("unknown transfer message code 0x%02x",c);
+
}
}
- if (feof(comms)) return 0;
- die_badrecv(comms,"transfer message code");
}
- static void sender(const char *filename, FILE *comms,
- unsigned long interval_usec) {
- struct stat stab, laststab;
- memset(&stab,0,sizeof(stab));
- FILE *lastfile, *newfile;
+static void sender(const char *filename) {
+ FILE *f;
int told_removed= 0;
+ struct stat stab;
for (;;) {
- r= stat(filename, &stab);
- if (r) {
- newfile= 0;
- } else {
- if (stab.st_dev == laststab.st_dev &&
- stab.st_ino == laststab.st_ino) {
- usleep(interval_usec);
- continue;
- }
- newfile= fopen(filename, "r");
- }
- if (!newfile) {
+ f= fopen(filename, "r");
+ if (!f) {
if (errno!=ENOENT) diee("could not access source file `%s'",filename);
if (told_removed) {
usleep(interval_usec);
continue;
}
- sender_sendbyte(REPLMSG_RM);
+ told_removed= 1;
+ sendbyte(REPLMSG_RM);
continue;
}
- for (;;) {
- n= 0;
- while (n<255) {
- cl= nextchar(lastfile);
- cn= nextchar(newfile);
- if (cl!=cn) {
-
- if (!feof(lastfile)) {
- cl= fgetc(lastfile);
- if (ferror(lastfile))
- diee("could not read old source file `%s'",filename);
- assert(cl != EOF || feof(lastfile));
- }
-
- if (errno!=ENOENT) diee("could not open source file `%s'",filename);
- sender_send
-
- if (r)
-}
+ r= fstat(fileno(f),&stab);
+ if (r) diee("could not fstat source file `%s'",filename);
+
+ if (!S_ISREG(stab.st_mode))
+ die(12,-1,"source file `%s' is not a plain file",filename);
-static void sender(
+ uint8_t hbuf[9]= {
+ REPLMSG_FILE64,
+ stab.st_size >> 28 >> 28,
+ stab.st_size >> 24 >> 24,
+ stab.st_size >> 16 >> 24,
+ stab.st_size >> 8 >> 24,
+ stab.st_size >> 24,
+ stab.st_size >> 16,
+ stab.st_size >> 8,
+ stab.st_size
+ };
+
+ r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend();
+ told_removed= 0;
- assert(ferror(newfile));
-
+ copyfile(f, copydie_inputfile,filename,
+ commso, copydie_commso,0);
- r= fread(dbuf,1,lu[0],comms);
- if (r!=lu[0]) die_badrecv(comms,
-
+ send_flush();
- c= fgetc(comms);
+ if (fclose(f)) diee("couldn't close source file `%s'",filename);
+ c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
+ if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
+
+ usleep(interval_usec);
+ }
+}
int main(int argc, const char **argv) {
for (