8 * - "#rcopy-repeatedly#\n"
9 * - length of declaration, as 4 hex digits, zero prefixed,
10 * and a space [5 bytes]. In this protocol version this
11 * will be "0002" but client _must_ parse it.
12 * server sends declaration
13 * - one of "u " or "d" [1 byte]
14 * - optionally, some more ascii text, reserved for future use
15 * must be ignored by client (but not sent by server)
16 * - a newline [1 byte]
19 * then for each update
21 * - 0x03 destination file should be deleted
22 * but note that contents must be retained by receiver
23 * as it may be used for rle updates
24 * - 0x04 complete new destination file follows, 64-bit length
25 * l 8 bytes big endian length
27 * receiver must then reply with 0x01 GO
43 #include <sys/types.h>
49 #define REPLMSG_GO 0x01
50 #define REPLMSG_RM 0x03
51 #define REPLMSG_FILE64 0x04
53 static const char banner[]= "#rcopy-repeatedly#\n";
55 static FILE *commsi, *commso;
57 static double max_bandwidth_proportion_mean= 0.2;
58 static double max_bandwidth_proportion_burst= 0.8;
59 static int txblocksz= INT_MAX, verbose=1;
60 static int min_interval_usec= 100000; /* 100ms */
62 static const char *rsh_program= "ssh";
63 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
65 static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
67 static char mainbuf[65536]; /* must be at least 2^16 */
69 #define NORETURN __attribute__((noreturn))
71 static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
72 static void vdie(int ec, int eno, const char *fmt, va_list al) {
73 fputs("rcopy-repeatedly: ",stderr);
74 vfprintf(stderr,fmt,al);
75 if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
79 static void die(int ec, int eno, const char *fmt, ...) NORETURN;
80 static void die(int ec, int eno, const char *fmt, ...) {
86 static void diem(void) NORETURN;
87 static void diem(void) { die(16,errno,"malloc failed"); }
89 static void diee(const char *fmt, ...) NORETURN;
90 static void diee(const char *fmt, ...) {
93 vdie(12,errno,fmt,al);
95 static void die_protocol(const char *fmt, ...) NORETURN;
96 static void die_protocol(const char *fmt, ...) {
98 fputs("protocol error: ",stderr);
103 static void die_badrecv(const char *what) NORETURN;
104 static void die_badrecv(const char *what) {
105 if (ferror(commsi)) diee("communication failed while receiving %s", what);
106 if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
109 static void die_badsend(void) NORETURN;
110 static void die_badsend(void) {
111 diee("transmission failed");
114 static void send_flush(void) {
115 if (ferror(commso) || fflush(commso))
118 static void sendbyte(int c) {
119 if (putc(c,commso)==EOF)
123 static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
124 static void mdup2(int fd, int fd2) {
125 if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
128 typedef void copyfile_die_fn(FILE *f, const char *xi);
130 struct timespec ts_sendstart;
132 static void mgettime(struct timespec *ts) {
133 int r= clock_gettime(CLOCK_MONOTONIC, ts);
134 if (r) diee("clock_gettime failed");
137 static void bandlimit_sendstart(void) {
138 mgettime(&ts_sendstart);
141 static double mgettime_elapsed(struct timespec ts_base,
142 struct timespec *ts_ret) {
144 return (ts_ret->tv_sec - ts_base.tv_sec) +
145 (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
148 static void verbosespinprintf(const char *fmt, ...) {
149 static const char spinnerchars[]= "/-\\";
150 static int spinnerchar;
157 fprintf(stderr," %c ",spinnerchars[spinnerchar]);
158 spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1;
159 vfprintf(stderr,fmt,al);
161 if (ferror(stderr) || fflush(stderr))
162 diee("could not write progress to stderr");
165 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
166 struct timespec ts_buf;
167 double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
168 double secsperbyte_observed= elapsed / bytes;
170 stream_allow_secsperbyte=
171 secsperbyte_observed / max_bandwidth_proportion_burst;
173 double min_update= elapsed * max_bandwidth_proportion_burst
174 / max_bandwidth_proportion_mean;
175 if (min_update > 1e3) min_update= 1e3;
176 int min_update_usec= min_update * 1e6;
178 if (*interval_usec_update > min_update_usec)
179 *interval_usec_update= min_update_usec;
181 verbosespinprintf("%12lluby %10fs %13gkby/s",
182 (unsigned long long)bytes, elapsed,
183 1e-3/secsperbyte_observed);
186 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
187 FILE *df, copyfile_die_fn *ddie, const char *dxi,
189 struct timespec ts_last;
192 ts_last= ts_sendstart;
195 now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
196 if (now > txblocksz) now= txblocksz;
198 double elapsed_want= now * stream_allow_secsperbyte;
199 double elapsed= mgettime_elapsed(ts_last, &ts_last);
200 double needwait= elapsed_want - elapsed;
201 if (needwait > 1) needwait= 1;
202 if (needwait > 0) usleep(ceil(needwait * 1e6));
204 r= fread(mainbuf,1,now,sf); if (r!=now) sdie(sf,sxi);
205 r= fwrite(mainbuf,1,now,df); if (r!=now) ddie(df,dxi);
210 static void copydie_inputfile(FILE *f, const char *filename) {
211 diee("read failed on source file `%s'", filename);
213 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
214 diee("write failed to temporary receiving file `%s'", tmpfilename);
216 static void copydie_commsi(FILE *f, const char *what) {
219 static void copydie_commso(FILE *f, const char *what) {
223 static void receiver(const char *filename) {
228 if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
231 r= unlink(tmpfilename);
232 if (r && errno!=ENOENT)
233 diee("could not remove temporary receiving file `%s'", tmpfilename);
242 if (ferror(commsi)) die_badrecv("transfer message code");
243 assert(feof(commsi));
247 newfile= fopen(tmpfilename, "wb");
248 if (!newfile) diee("could not create temporary receiving file `%s'",
251 r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l");
254 (lbuf[0] << 28 << 28) |
255 (lbuf[1] << 24 << 24) |
256 (lbuf[2] << 16 << 24) |
257 (lbuf[3] << 8 << 24) |
263 copyfile(commsi, copydie_commsi,"FILE64 file data",
264 newfile, copydie_tmpwrite,tmpfilename,
267 if (fclose(newfile)) diee("could not flush and close temporary"
268 " receiving file `%s'", tmpfilename);
269 if (rename(tmpfilename, filename))
270 diee("could not install new version of destination file `%s'",
273 sendbyte(REPLMSG_GO);
276 die_protocol("unknown transfer message code 0x%02x",c);
282 static void sender(const char *filename) {
284 int interval_usec, r, c;
285 struct stat stabtest, stab;
286 enum { told_nothing, told_file, told_remove } told;
295 usleep(interval_usec);
297 interval_usec= min_interval_usec;
299 r= stat(filename, &stabtest);
303 if (told == told_file &&
304 stabtest.st_mode == stab.st_mode &&
305 stabtest.st_dev == stab.st_dev &&
306 stabtest.st_ino == stab.st_ino &&
307 stabtest.st_mtime == stab.st_mtime &&
308 stabtest.st_size == stab.st_size)
310 f= fopen(filename, "rb");
314 if (errno!=ENOENT) diee("could not access source file `%s'",filename);
315 if (told != told_remove) {
316 verbosespinprintf(" ENOENT ");
317 sendbyte(REPLMSG_RM);
323 if (fold) fclose(fold);
326 r= fstat(fileno(f),&stab);
327 if (r) diee("could not fstat source file `%s'",filename);
329 if (!S_ISREG(stab.st_mode))
330 die(12,-1,"source file `%s' is not a plain file",filename);
334 stab.st_size >> 28 >> 28,
335 stab.st_size >> 24 >> 24,
336 stab.st_size >> 16 >> 24,
337 stab.st_size >> 8 >> 24,
344 bandlimit_sendstart();
346 r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend();
348 copyfile(f, copydie_inputfile,filename,
349 commso, copydie_commso,0,
354 if (fclose(f)) diee("couldn't close source file `%s'",filename);
356 c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
357 if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
359 bandlimit_sendend(stab.st_size, &interval_usec);
366 void usagemessage(void) {
367 puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
368 " <file> may be <local-file> or [<user>@]<host>:<file>\n"
369 " exactly one of each of the two forms must be provided\n"
370 " a file is taken as remote if it has a : before the first /\n"
375 static void of_help(const struct cmdinfo *ci, const char *val) {
377 if (ferror(stdout)) diee("could not write usage message to stdout");
382 const char *userhost, *path;
385 static FileSpecification srcspec, dstspec;
386 static int upcopy=-1; /* -1 means not yet known; 0 means download */
387 /* `up' means towards the client,
388 * since we regard the subprocess as `down' */
390 static const struct cmdinfo cmdinfos[]= {
391 { "help", .call= of_help },
392 { "receiver", .iassignto=&upcopy, .arg=0 },
393 { "sender", .iassignto=&upcopy, .arg=1 },
397 static void server(const char *filename) {
401 fprintf(commso, "%s0002 %c\n", banner, upcopy?'u':'d');
403 c= fgetc(commsi); if (c==EOF) die_badrecv("initial go");
404 if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
412 static void client(void) {
413 int uppipe[2], downpipe[2], r;
419 FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
420 const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
423 if (child==-1) diee("fork failed");
425 mdup2(downpipe[0],0);
427 close(uppipe[0]); close(downpipe[0]);
428 close(uppipe[1]); close(downpipe[1]);
430 rsh_program, remotespec->userhost, rcopy_repeatedly_program,
431 remotemode, remotespec->path, (char*)0);
432 diee("failed to execute rsh program `%s'",rsh_program);
435 commso= fdopen(downpipe[1],"wb");
436 commsi= fdopen(uppipe[0],"rb");
437 if (!commso || !commsi) diee("fdopen failed");
441 char banbuf[sizeof(banner)-1 + 5 + 1];
442 r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
443 if (ferror(commsi)) die_badrecv("read banner");
445 if (r!=sizeof(banbuf)-1 ||
446 memcmp(banbuf,banner,sizeof(banner)-1) ||
447 banbuf[sizeof(banner)-1 + 4] != ' ')
448 die(8,-1,"did not receive banner as expected -"
449 " shell dirty? ssh broken?\n"
451 " %s %s %s --sender %s\n"
452 " and expect the first line to be\n"
454 rsh_program, remotespec->userhost,
455 rcopy_repeatedly_program, remotespec->path,
458 banbuf[sizeof(banbuf)-1]= 0;
460 long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
461 if (*ep) die_protocol("declaration length syntax error");
462 assert(decllen <= sizeof(mainbuf));
463 if (decllen<2) die_protocol("declaration too short");
465 r= fread(mainbuf,1,decllen,commsi);
466 if (r!=decllen) die_badrecv("declaration");
467 if (mainbuf[decllen-1] != '\n')
468 die_protocol("declaration missing final newline");
469 if (mainbuf[0] != upcopy ? 'u' : 'd')
470 die_protocol("declaration incorrect direction indicator");
472 sendbyte(REPLMSG_GO);
475 receiver(dstspec.path);
477 sender(srcspec.path);
480 static void parse_file_specification(FileSpecification *fs, const char *arg,
484 if (!arg) badusage("too few arguments - missing %s\n",what);
486 for (colon=arg; ; colon++) {
487 if (!*colon || *colon=='/') {
493 char *uh= malloc(colon-arg + 1); if (!uh) diem();
494 memcpy(uh,arg, colon-arg); uh[colon-arg]= 0;
502 int main(int argc, const char *const *argv) {
503 setvbuf(stderr,0,_IOLBF,BUFSIZ);
505 myopt(&argv, cmdinfos);
508 if (!argv[0] || argv[1])
509 badusage("server mode must have just the filename as non-option arg");
512 parse_file_specification(&srcspec, argv[0], "source");
513 parse_file_specification(&dstspec, argv[1], "destination");
514 if (argv[2]) badusage("too many non-option arguments");
515 if (!!srcspec.userhost == !!dstspec.userhost)
516 badusage("need exactly one remote file argument");