+/*
+ * rcopy-repeatedly
+ */
+
/*
* protocol is:
* server sends banner
- * - "chiark-realtime-replicator\n" [27 bytes]
+ * - "#rcopy-repeatedly#\n"
* - length of declaration, as 4 hex digits, zero prefixed,
* and a space [5 bytes]. In this protocol version this
* will be "0002" but client _must_ parse it.
* receiver must then reply with 0x01 GO
*/
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <time.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <assert.h>
+#include <math.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "myopt.h"
+
#define REPLMSG_GO 0x01
-#define REPLMSG_RLE8 0x02
#define REPLMSG_RM 0x03
#define REPLMSG_FILE64 0x04
+static const char banner[]= "#rcopy-repeatedly#\n";
+
static FILE *commsi, *commso;
-static long interval_usec;
-static int bytes_per_sec_log2;
+static double max_bandwidth_proportion_mean= 0.2;
+static double max_bandwidth_proportion_burst= 0.8;
+static int txblocksz= INT_MAX;
+static int min_interval_usec= 100000; /* 100ms */
+
+static const char *rsh_program= "ssh";
+static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
+
+static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
+
+static char mainbuf[65536]; /* must be at least 2^16 */
+
+#define NORETURN __attribute__((noreturn))
+
+static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
static void vdie(int ec, int eno, const char *fmt, va_list al) {
- fputs("realtime-replicator: ",stderr);
+ fputs("rcopy-repeatedly: ",stderr);
vfprintf(stderr,fmt,al);
if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
fputc('\n',stderr);
exit(ec);
}
+static void die(int ec, int eno, const char *fmt, ...) NORETURN;
static void die(int ec, int eno, const char *fmt, ...) {
- va_arg al;
+ va_list al;
va_start(al,fmt);
- vdiee(ec,eno,fmt,al);
+ vdie(ec,eno,fmt,al);
}
+static void diem(void) NORETURN;
static void diem(void) { die(16,errno,"malloc failed"); }
+static void diee(const char *fmt, ...) NORETURN;
static void diee(const char *fmt, ...) {
- va_arg al;
+ va_list al;
va_start(al,fmt);
- diee(8,errno,fmt,al);
+ vdie(12,errno,fmt,al);
}
+static void die_protocol(const char *fmt, ...) NORETURN;
static void die_protocol(const char *fmt, ...) {
- va_arg al;
+ va_list al;
+ fputs("protocol error: ",stderr);
va_start(al,fmt);
- diee(12,-1,fmt,al);
+ vdie(10,-1,fmt,al);
+}
+static void die_badrecv(const char *what) NORETURN;
static void die_badrecv(const char *what) {
if (ferror(commsi)) diee("communication failed while receiving %s", what);
if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
abort();
}
+static void die_badsend(void) NORETURN;
static void die_badsend(void) {
diee("transmission failed");
}
-static void receiver_write(const unsigned char *buf, int n,
- FILE *newfile, const char *tmpfilename) {
- int r;
+static void send_flush(void) {
+ if (ferror(commso) || fflush(commso))
+ die_badsend();
+}
+static void sendbyte(int c) {
+ if (putc(c,commso)==EOF)
+ die_badsend();
+}
- r= fwrite(dbuf,1,n,newfile);
- if (r != n) diee("failed to write temporary receiving file `%s'",
- tmpfilename);
+static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
+static void mdup2(int fd, int fd2) {
+ if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
}
-typedef static void copyfile_die_fn(FILE *f, const char *xi);
+typedef void copyfile_die_fn(FILE *f, const char *xi);
+
+struct timespec ts_sendstart;
+
+static void mgettime(struct timespec *ts) {
+ int r= clock_gettime(CLOCK_MONOTONIC, ts);
+ if (r) diee("clock_gettime failed");
+}
+
+static void bandlimit_sendstart(void) {
+ mgettime(&ts_sendstart);
+}
+
+static double mgettime_elapsed(struct timespec ts_base,
+ struct timespec *ts_ret) {
+ mgettime(ts_ret);
+ return (ts_ret->tv_sec - ts_base.tv_sec) +
+ (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
+}
+
+static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
+ struct timespec ts_buf;
+ double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
+ double secsperbyte_observed= elapsed / bytes;
+
+ stream_allow_secsperbyte=
+ secsperbyte_observed / max_bandwidth_proportion_burst;
+
+ double min_update= elapsed * max_bandwidth_proportion_burst
+ / max_bandwidth_proportion_mean;
+ if (min_update > 1e3) min_update= 1e3;
+ int min_update_usec= min_update * 1e6;
+
+ if (*interval_usec_update > min_update_usec)
+ *interval_usec_update= min_update_usec;
+}
-static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi
+static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
FILE *df, copyfile_die_fn *ddie, const char *dxi,
uint64_t l) {
- char buf[65536];
+ struct timespec ts_last;
+ int now, r;
-fixme rate limit
-fixme adjustable chunk size
+ ts_last= ts_sendstart;
while (l>=0) {
- now= l < sizeof(buf) ? l : sizeof(buf);
+ now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
+ if (now > txblocksz) now= txblocksz;
-
+ double elapsed_want= now * stream_allow_secsperbyte;
+ double elapsed= mgettime_elapsed(ts_last, &ts_last);
+ double needwait= elapsed_want - elapsed;
+ if (needwait > 1) needwait= 1;
+ if (needwait > 0) usleep(ceil(needwait * 1e6));
- r= fread(buf,1,now,sf); if (r!=now) sdie(sf,sxi);
- r= fwrite(buf,1,now,df); if (r!=now) ddie(df,dxi);
+ r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi);
+ r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi);
l -= now;
}
}
+static void copydie_inputfile(FILE *f, const char *filename) {
+ diee("read failed on source file `%s'", filename);
+}
static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
diee("write failed to temporary receiving file `%s'", tmpfilename);
}
static void receiver(const char *filename) {
FILE *newfile;
char *tmpfilename;
+ int r, c;
- if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem();
+ if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
+ ==-1) diem();
r= unlink(tmpfilename);
if (r && errno!=ENOENT)
return;
case REPLMSG_FILE64:
- newfile= fopen(tmpfilename, "w");
+ newfile= fopen(tmpfilename, "wb");
if (!newfile) diee("could not create temporary receiving file `%s'",
tmpfilename);
uint8_t lbuf[8];
}
static void sender(const char *filename) {
- FILE *f;
- int told_removed= 0;
- struct stat stab;
-
+ FILE *f, *fold;
+ int interval_usec, r, c;
+ struct stat stabtest, stab;
+ enum { told_nothing, told_file, told_remove } told;
+
+ interval_usec= 0;
+ fold= 0;
+ told= told_nothing;
+
for (;;) {
- f= fopen(filename, "r");
+ if (interval_usec) usleep(interval_usec);
+ interval_usec= min_interval_usec;
+
+ r= stat(filename, &stabtest);
+ if (r) {
+ f= 0;
+ } else {
+ if (told == told_file &&
+ stabtest.st_mode == stab.st_mode &&
+ stabtest.st_dev == stab.st_dev &&
+ stabtest.st_ino == stab.st_ino)
+ continue;
+ f= fopen(filename, "rb");
+ }
+
if (!f) {
if (errno!=ENOENT) diee("could not access source file `%s'",filename);
- if (told_removed) {
- usleep(interval_usec);
- continue;
+ if (told != told_remove) {
+ sendbyte(REPLMSG_RM);
+ told= told_remove;
}
- told_removed= 1;
- sendbyte(REPLMSG_RM);
continue;
}
+ if (fold) fclose(fold);
+ fold= 0;
+
r= fstat(fileno(f),&stab);
if (r) diee("could not fstat source file `%s'",filename);
stab.st_size >> 8,
stab.st_size
};
+
+ bandlimit_sendstart();
r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend();
- told_removed= 0;
copyfile(f, copydie_inputfile,filename,
- commso, copydie_commso,0);
+ commso, copydie_commso,0,
+ stab.st_size);
send_flush();
c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
- usleep(interval_usec);
+ bandlimit_sendend(stab.st_size, &interval_usec);
+
+ fold= f;
+ told= told_file;
+ }
+}
+
+void usagemessage(void) {
+ puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
+ " <file> may be <local-file> or [<user>@]<host>:<file>\n"
+ " exactly one of each of the two forms must be provided\n"
+ " a file is taken as remote if it has a : before the first /\n"
+ "options\n"
+ " --help\n");
+}
+
+static void of_help(const struct cmdinfo *ci, const char *val) {
+ usagemessage();
+ if (ferror(stdout)) diee("could not write usage message to stdout");
+ exit(0);
+}
+
+typedef struct {
+ const char *userhost, *path;
+} FileSpecification;
+
+static FileSpecification srcspec, dstspec;
+static int upload=-1; /* -1 means not yet known; 0 means download */
+
+static const struct cmdinfo cmdinfos[]= {
+ { "help", .call= of_help },
+ { "receiver", .iassignto=&upload, .arg=1 },
+ { "sender", .iassignto=&upload, .arg=0 },
+ { 0 }
+};
+
+static void server(const char *filename) {
+ int c;
+ commsi= stdin;
+ commso= stdout;
+ fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
+ send_flush();
+ c= fgetc(commsi); if (c==EOF) die_badrecv("initial go");
+ if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
+
+ if (upload)
+ receiver(filename);
+ else
+ sender(filename);
+}
+
+static void client(void) {
+ int uppipe[2], downpipe[2], r;
+ pid_t child;
+
+ mpipe(uppipe);
+ mpipe(downpipe);
+
+ FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
+ const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
+
+ child= fork();
+ if (child==-1) diee("fork failed");
+ if (!child) {
+ mdup2(uppipe[0],0);
+ mdup2(downpipe[1],1);
+ close(uppipe[0]); close(downpipe[0]);
+ close(uppipe[1]); close(downpipe[1]);
+ execlp(rsh_program,
+ rsh_program, remotespec->userhost, rcopy_repeatedly_program,
+ remotemode, remotespec->path, (char*)0);
+ diee("failed to execute rsh program `%s'",rsh_program);
}
+
+ commso= fdopen(uppipe[1],"wb");
+ commsi= fdopen(downpipe[0],"rb");
+ if (!commso || !commsi) diee("fdopen failed");
+ close(uppipe[0]); close(downpipe[0]);
+ close(uppipe[1]); close(downpipe[1]);
+
+ char banbuf[sizeof(banner)-1 + 5 + 1];
+ r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
+ if (r!=sizeof(banbuf)-1) die_badrecv("banner");
+
+ if (memcmp(banbuf,banner,sizeof(banner)-1) ||
+ banbuf[sizeof(banner)-1 + 4] != ' ')
+ die(8,-1,"banner received was not as expected -"
+ " shell dirty? ssh broken?\n"
+ " try running %s %s %s --sender %s\n"
+ " and expect the first line to be %s",
+ rsh_program, remotespec->userhost,
+ rcopy_repeatedly_program, remotespec->path,
+ banner);
+
+ banbuf[sizeof(banbuf)-1]= 0;
+ char *ep;
+ long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
+ if (*ep) die_protocol("declaration length syntax error");
+ assert(decllen <= sizeof(mainbuf));
+ if (decllen<2) die_protocol("declaration too short");
+
+ r= fread(mainbuf,1,decllen,commsi);
+ if (r!=decllen) die_badrecv("declaration");
+ if (mainbuf[decllen-1] != '\n')
+ die_protocol("declaration missing final newline");
+ if (mainbuf[0] != upload ? 'u' : 'd')
+ die_protocol("declaration incorrect direction indicator");
+
+ sendbyte(REPLMSG_GO);
+
+ if (upload)
+ sender(srcspec.path);
+ else
+ receiver(dstspec.path);
}
-int main(int argc, const char **argv) {
- for (
+static void parse_file_specification(FileSpecification *fs, const char *arg,
+ const char *what) {
+ const char *colon;
+
+ if (!arg) badusage("too few arguments - missing %s\n",what);
+
+ for (colon=arg; ; colon++) {
+ if (!*colon || *colon=='/') {
+ fs->userhost=0;
+ fs->path= arg;
+ return;
+ }
+ if (*colon==':') {
+ char *uh= malloc(colon-arg + 1); if (!fs->userhost) diem();
+ memcpy(uh,arg, colon-arg); uh[colon-arg]= 0;
+ fs->userhost= uh;
+ fs->path= colon+1;
+ return;
+ }
+ }
+}
+
+int main(int argc, const char *const *argv) {
+ myopt(&argv, cmdinfos);
+
+ if (upload>=0) {
+ if (!argv[0] || argv[1])
+ badusage("server mode must have just the filename as non-option arg");
+ server(argv[0]);
+ } else {
+ parse_file_specification(&srcspec, argv[0], "source");
+ parse_file_specification(&dstspec, argv[1], "destination");
+ if (argv[2]) badusage("too many non-option arguments");
+ if (!!srcspec.userhost == !!dstspec.userhost)
+ badusage("need exactly one remote file argument");
+ client();
+ }
+ return 0;
+}