8 * - "#rcopy-repeatedly#\n"
9 * - length of declaration, as 4 hex digits, zero prefixed,
10 * and a space [5 bytes]. In this protocol version this
11 * will be "0002" but client _must_ parse it.
12 * server sends declaration
13 * - one of "u " or "d" [1 byte]
14 * - optionally, some more ascii text, reserved for future use
15 * must be ignored by client (but not sent by server)
16 * - a newline [1 byte]
19 * then for each update
21 * - 0x03 destination file should be deleted
22 * but note that contents must be retained by receiver
23 * as it may be used for rle updates
24 * - 0x04 complete new destination file follows, 64-bit length
25 * l 8 bytes big endian length
27 * receiver must then reply with 0x01 GO
43 #include <sys/types.h>
49 #define REPLMSG_GO 0x01
50 #define REPLMSG_RM 0x03
51 #define REPLMSG_FILE64 0x04
53 static const char banner[]= "#rcopy-repeatedly#\n";
55 static FILE *commsi, *commso;
57 static double max_bandwidth_proportion_mean= 0.2;
58 static double max_bandwidth_proportion_burst= 0.8;
59 static int txblocksz= INT_MAX;
60 static int min_interval_usec= 100000; /* 100ms */
62 static const char *rsh_program= "ssh";
63 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
65 static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
67 static char mainbuf[65536]; /* must be at least 2^16 */
69 #define NORETURN __attribute__((noreturn))
71 static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
72 static void vdie(int ec, int eno, const char *fmt, va_list al) {
73 fputs("rcopy-repeatedly: ",stderr);
74 vfprintf(stderr,fmt,al);
75 if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
79 static void die(int ec, int eno, const char *fmt, ...) NORETURN;
80 static void die(int ec, int eno, const char *fmt, ...) {
86 static void diem(void) NORETURN;
87 static void diem(void) { die(16,errno,"malloc failed"); }
89 static void diee(const char *fmt, ...) NORETURN;
90 static void diee(const char *fmt, ...) {
93 vdie(12,errno,fmt,al);
95 static void die_protocol(const char *fmt, ...) NORETURN;
96 static void die_protocol(const char *fmt, ...) {
98 fputs("protocol error: ",stderr);
103 static void die_badrecv(const char *what) NORETURN;
104 static void die_badrecv(const char *what) {
105 if (ferror(commsi)) diee("communication failed while receiving %s", what);
106 if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
109 static void die_badsend(void) NORETURN;
110 static void die_badsend(void) {
111 diee("transmission failed");
114 static void send_flush(void) {
115 if (ferror(commso) || fflush(commso))
118 static void sendbyte(int c) {
119 if (putc(c,commso)==EOF)
123 static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
124 static void mdup2(int fd, int fd2) {
125 if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
128 typedef void copyfile_die_fn(FILE *f, const char *xi);
130 struct timespec ts_sendstart;
132 static void mgettime(struct timespec *ts) {
133 int r= clock_gettime(CLOCK_MONOTONIC, ts);
134 if (r) diee("clock_gettime failed");
137 static void bandlimit_sendstart(void) {
138 mgettime(&ts_sendstart);
141 static double mgettime_elapsed(struct timespec ts_base,
142 struct timespec *ts_ret) {
144 return (ts_ret->tv_sec - ts_base.tv_sec) +
145 (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
148 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
149 struct timespec ts_buf;
150 double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
151 double secsperbyte_observed= elapsed / bytes;
153 stream_allow_secsperbyte=
154 secsperbyte_observed / max_bandwidth_proportion_burst;
156 double min_update= elapsed * max_bandwidth_proportion_burst
157 / max_bandwidth_proportion_mean;
158 if (min_update > 1e3) min_update= 1e3;
159 int min_update_usec= min_update * 1e6;
161 if (*interval_usec_update > min_update_usec)
162 *interval_usec_update= min_update_usec;
165 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
166 FILE *df, copyfile_die_fn *ddie, const char *dxi,
168 struct timespec ts_last;
171 ts_last= ts_sendstart;
174 now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
175 if (now > txblocksz) now= txblocksz;
177 double elapsed_want= now * stream_allow_secsperbyte;
178 double elapsed= mgettime_elapsed(ts_last, &ts_last);
179 double needwait= elapsed_want - elapsed;
180 if (needwait > 1) needwait= 1;
181 if (needwait > 0) usleep(ceil(needwait * 1e6));
183 r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi);
184 r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi);
189 static void copydie_inputfile(FILE *f, const char *filename) {
190 diee("read failed on source file `%s'", filename);
192 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
193 diee("write failed to temporary receiving file `%s'", tmpfilename);
195 static void copydie_commsi(FILE *f, const char *what) {
198 static void copydie_commso(FILE *f, const char *what) {
202 static void receiver(const char *filename) {
207 if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
210 r= unlink(tmpfilename);
211 if (r && errno!=ENOENT)
212 diee("could not remove temporary receiving file `%s'", tmpfilename);
221 if (ferror(commsi)) die_badrecv("transfer message code");
222 assert(feof(commsi));
226 newfile= fopen(tmpfilename, "wb");
227 if (!newfile) diee("could not create temporary receiving file `%s'",
230 r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l");
233 (lbuf[0] << 28 << 28) |
234 (lbuf[1] << 24 << 24) |
235 (lbuf[2] << 16 << 24) |
236 (lbuf[3] << 8 << 24) |
242 copyfile(commsi, copydie_commsi,"FILE64 file data",
243 newfile, copydie_tmpwrite,tmpfilename,
246 if (fclose(newfile)) diee("could not flush and close temporary"
247 " receiving file `%s'", tmpfilename);
248 if (rename(tmpfilename, filename))
249 diee("could not install new version of destination file `%s'",
252 sendbyte(REPLMSG_GO);
255 die_protocol("unknown transfer message code 0x%02x",c);
261 static void sender(const char *filename) {
263 int interval_usec, r, c;
264 struct stat stabtest, stab;
265 enum { told_nothing, told_file, told_remove } told;
272 if (interval_usec) usleep(interval_usec);
273 interval_usec= min_interval_usec;
275 r= stat(filename, &stabtest);
279 if (told == told_file &&
280 stabtest.st_mode == stab.st_mode &&
281 stabtest.st_dev == stab.st_dev &&
282 stabtest.st_ino == stab.st_ino)
284 f= fopen(filename, "rb");
288 if (errno!=ENOENT) diee("could not access source file `%s'",filename);
289 if (told != told_remove) {
290 sendbyte(REPLMSG_RM);
296 if (fold) fclose(fold);
299 r= fstat(fileno(f),&stab);
300 if (r) diee("could not fstat source file `%s'",filename);
302 if (!S_ISREG(stab.st_mode))
303 die(12,-1,"source file `%s' is not a plain file",filename);
307 stab.st_size >> 28 >> 28,
308 stab.st_size >> 24 >> 24,
309 stab.st_size >> 16 >> 24,
310 stab.st_size >> 8 >> 24,
317 bandlimit_sendstart();
319 r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend();
321 copyfile(f, copydie_inputfile,filename,
322 commso, copydie_commso,0,
327 if (fclose(f)) diee("couldn't close source file `%s'",filename);
329 c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
330 if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
332 bandlimit_sendend(stab.st_size, &interval_usec);
339 void usagemessage(void) {
340 puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
341 " <file> may be <local-file> or [<user>@]<host>:<file>\n"
342 " exactly one of each of the two forms must be provided\n"
343 " a file is taken as remote if it has a : before the first /\n"
348 static void of_help(const struct cmdinfo *ci, const char *val) {
350 if (ferror(stdout)) diee("could not write usage message to stdout");
355 const char *userhost, *path;
358 static FileSpecification srcspec, dstspec;
359 static int upload=-1; /* -1 means not yet known; 0 means download */
361 static const struct cmdinfo cmdinfos[]= {
362 { "help", .call= of_help },
363 { "receiver", .iassignto=&upload, .arg=1 },
364 { "sender", .iassignto=&upload, .arg=0 },
368 static void server(const char *filename) {
372 fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
374 c= fgetc(commsi); if (c==EOF) die_badrecv("initial go");
375 if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
383 static void client(void) {
384 int uppipe[2], downpipe[2], r;
390 FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
391 const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
394 if (child==-1) diee("fork failed");
397 mdup2(downpipe[1],1);
398 close(uppipe[0]); close(downpipe[0]);
399 close(uppipe[1]); close(downpipe[1]);
401 rsh_program, remotespec->userhost, rcopy_repeatedly_program,
402 remotemode, remotespec->path, (char*)0);
403 diee("failed to execute rsh program `%s'",rsh_program);
406 commso= fdopen(uppipe[1],"wb");
407 commsi= fdopen(downpipe[0],"rb");
408 if (!commso || !commsi) diee("fdopen failed");
409 close(uppipe[0]); close(downpipe[0]);
410 close(uppipe[1]); close(downpipe[1]);
412 char banbuf[sizeof(banner)-1 + 5 + 1];
413 r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
414 if (r!=sizeof(banbuf)-1) die_badrecv("banner");
416 if (memcmp(banbuf,banner,sizeof(banner)-1) ||
417 banbuf[sizeof(banner)-1 + 4] != ' ')
418 die(8,-1,"banner received was not as expected -"
419 " shell dirty? ssh broken?\n"
420 " try running %s %s %s --sender %s\n"
421 " and expect the first line to be %s",
422 rsh_program, remotespec->userhost,
423 rcopy_repeatedly_program, remotespec->path,
426 banbuf[sizeof(banbuf)-1]= 0;
428 long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
429 if (*ep) die_protocol("declaration length syntax error");
430 assert(decllen <= sizeof(mainbuf));
431 if (decllen<2) die_protocol("declaration too short");
433 r= fread(mainbuf,1,decllen,commsi);
434 if (r!=decllen) die_badrecv("declaration");
435 if (mainbuf[decllen-1] != '\n')
436 die_protocol("declaration missing final newline");
437 if (mainbuf[0] != upload ? 'u' : 'd')
438 die_protocol("declaration incorrect direction indicator");
440 sendbyte(REPLMSG_GO);
443 sender(srcspec.path);
445 receiver(dstspec.path);
448 static void parse_file_specification(FileSpecification *fs, const char *arg,
452 if (!arg) badusage("too few arguments - missing %s\n",what);
454 for (colon=arg; ; colon++) {
455 if (!*colon || *colon=='/') {
461 char *uh= malloc(colon-arg + 1); if (!fs->userhost) diem();
462 memcpy(uh,arg, colon-arg); uh[colon-arg]= 0;
470 int main(int argc, const char *const *argv) {
471 myopt(&argv, cmdinfos);
474 if (!argv[0] || argv[1])
475 badusage("server mode must have just the filename as non-option arg");
478 parse_file_specification(&srcspec, argv[0], "source");
479 parse_file_specification(&dstspec, argv[1], "destination");
480 if (argv[2]) badusage("too many non-option arguments");
481 if (!!srcspec.userhost == !!dstspec.userhost)
482 badusage("need exactly one remote file argument");