chiark / gitweb /
abolish daft stream burst thing
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
1 /*
2  * rcopy-repeatedly
3  */     
4   
5 /*
6  * protocol is:
7  *   server sends banner
8  *    - "#rcopy-repeatedly#\n"
9  *    - length of declaration, as 4 hex digits, zero prefixed,
10  *      and a newline [5 bytes].  In this protocol version this
11  *      will be "0002" but client _must_ parse it.
12  *   server sends declaration
13  *    - one of "u " or "d" [1 byte]
14  *    - optionally, some more ascii text, reserved for future use
15  *      must be ignored by declaree (but not sent by declarer)
16  *    - a newline [1 byte]
17  *   client sends
18  *    - 0x02   START
19  *        n    2 bytes big endian declaration length
20  *        ...  client's declaration (ascii text, including newline)
21  8             see above
22  * then for each update
23  *   sender sends one of
24  *    - 0x03   destination file should be deleted
25  *             but note that contents must be retained by receiver
26  *             as it may be used for rle updates
27  *    - 0x04   complete new destination file follows, 64-bit length
28  *        l    8 bytes big endian length
29  *        ...  l bytes data
30  *             receiver must then reply with 0x01 ACK
31  */
32
33 #define _GNU_SOURCE
34
35 #include <stdio.h>
36 #include <time.h>
37 #include <stdarg.h>
38 #include <stdlib.h>
39 #include <stdint.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <limits.h>
43 #include <assert.h>
44 #include <math.h>
45
46 #include <sys/types.h>
47 #include <sys/stat.h>
48 #include <unistd.h>
49
50 #include "myopt.h"
51
52 #define REPLMSG_ACK    0x01
53 #define REPLMSG_START  0x02
54 #define REPLMSG_RM     0x03
55 #define REPLMSG_FILE64 0x04
56
57 static const char banner[]= "#rcopy-repeatedly#\n";
58
59 static FILE *commsi, *commso;
60
61 static double max_bw_prop= 0.2;
62 static int txblocksz= INT_MAX, verbose=1;
63 static int min_interval_usec= 100000; /* 100ms */
64
65 static int nsargs;
66 static const char **sargs;
67
68 static const char *rsh_program= 0;
69 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
70 static int server_upcopy=-1; /* -1 means not yet known; 0 means download */
71   /* `up' means towards the client,
72    * since we regard the subprocess as `down' */
73
74 static int udchar;
75
76 static char mainbuf[65536]; /* must be at least 2^16 */
77
78 #define NORETURN __attribute__((noreturn))
79
80 static void vdie(int ec, const char *pfx, int eno,
81                  const char *fmt, va_list al) NORETURN;
82 static void vdie(int ec, const char *pfx, int eno,
83                  const char *fmt, va_list al) {
84   fputs("rcopy-repeatedly: ",stderr);
85   if (server_upcopy>=0) fputs("server: ",stderr);
86   if (pfx) fprintf(stderr,"%s: ",pfx);
87   vfprintf(stderr,fmt,al);
88   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
89   fputc('\n',stderr);
90   exit(ec);
91 }
92 static void die(int ec, const char *pfx, int eno,
93                 const char *fmt, ...) NORETURN;
94 static void die(int ec, const char *pfx, int eno,
95                 const char *fmt, ...) {
96   va_list al;
97   va_start(al,fmt);
98   vdie(ec,pfx,eno,fmt,al);
99 }
100
101 static void diem(void) NORETURN;
102 static void diem(void) { die(16,0,errno,"malloc failed"); }
103 static void *xmalloc(size_t sz) {
104   assert(sz);
105   void *p= malloc(sz);
106   if (!p) diem();
107   return p;
108 }
109 static void *xrealloc(void *p, size_t sz) {
110   assert(sz);
111   p= realloc(p,sz);
112   if (!p) diem();
113   return p;
114 }
115
116 static void diee(const char *fmt, ...) NORETURN;
117 static void diee(const char *fmt, ...) {
118   va_list al;
119   va_start(al,fmt);
120   vdie(12,0,errno,fmt,al);
121 }
122 static void die_protocol(const char *fmt, ...) NORETURN;
123 static void die_protocol(const char *fmt, ...) {
124   va_list al;
125   va_start(al,fmt);
126   vdie(10,"protocol error",-1,fmt,al);
127 }
128
129 static void die_badrecv(const char *what) NORETURN;
130 static void die_badrecv(const char *what) {
131   if (ferror(commsi)) diee("communication failed while receiving %s", what);
132   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
133   abort();
134 }
135 static void die_badsend(void) NORETURN;
136 static void die_badsend(void) {
137   diee("transmission failed");
138 }
139
140 static void send_flush(void) {
141   if (ferror(commso) || fflush(commso))
142     die_badsend();
143 }
144 static void sendbyte(int c) {
145   if (putc(c,commso)==EOF)
146     die_badsend();
147 }
148
149 static void mfreadcommsi(void *buf, int l, const char *what) {
150   int r= fread(buf,1,l,commsi);  if (r!=l) die_badrecv(what);
151 }
152 static void mfwritecommso(const void *buf, int l) {
153   int r= fwrite(buf,1,l,commso);  if (r!=l) die_badsend();
154 }
155
156 static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
157 static void mdup2(int fd, int fd2) {
158   if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
159 }
160
161 typedef void copyfile_die_fn(FILE *f, const char *xi);
162
163 struct timespec ts_sendstart;
164
165 static void mgettime(struct timespec *ts) {
166   int r= clock_gettime(CLOCK_MONOTONIC, ts);
167   if (r) diee("clock_gettime failed");
168 }
169
170 static void bandlimit_sendstart(void) {
171   mgettime(&ts_sendstart);
172 }
173
174 static double mgettime_elapsed(struct timespec ts_base,
175                                struct timespec *ts_ret) {
176   mgettime(ts_ret);
177   return (ts_ret->tv_sec - ts_base.tv_sec) +
178          (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
179 }
180
181 static void flushstderr(void) {
182   if (ferror(stderr) || fflush(stderr))
183     diee("could not write progress to stderr");
184 }
185
186 static void verbosespinprintf(const char *fmt, ...) {
187   static const char spinnerchars[]= "/-\\";
188   static int spinnerchar;
189
190   if (!verbose)
191     return;
192
193   va_list al;
194   va_start(al,fmt);
195   fprintf(stderr,"      %c ",spinnerchars[spinnerchar]);
196   spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1;
197   vfprintf(stderr,fmt,al);
198   putc('\r',stderr);
199   flushstderr();
200 }
201
202 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
203   struct timespec ts_buf;
204   double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
205   double secsperbyte_observed= elapsed / bytes;
206
207   double min_update= elapsed / max_bw_prop;
208   if (min_update > 1e3) min_update= 1e3;
209   int min_update_usec= min_update * 1e6;
210
211   if (*interval_usec_update < min_update_usec)
212     *interval_usec_update= min_update_usec;
213
214   verbosespinprintf("%12lluby %10.3fs %13.2fkby/s %8dms",
215                     (unsigned long long)bytes, elapsed,
216                     1e-3/secsperbyte_observed, *interval_usec_update/1000);
217 }
218  
219 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
220                      FILE *df, copyfile_die_fn *ddie, const char *dxi,
221                      uint64_t lstart, int amsender) {
222   struct timespec ts_last;
223   int now, r;
224   uint64_t l=lstart, done=0;
225
226   ts_last= ts_sendstart;
227
228   while (l>0) {
229     now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
230     if (now > txblocksz) now= txblocksz;
231
232     r= fread(mainbuf,1,now,sf);  if (r!=now) sdie(sf,sxi);
233     r= fwrite(mainbuf,1,now,df);  if (r!=now) ddie(df,dxi);
234     l -= now;
235     done += now;
236
237     if (verbose) {
238       fprintf(stderr," %3d%% \r",
239               (int)(done*100.0/lstart));
240       flushstderr();
241     }
242   }
243 }
244
245 static void copydie_inputfile(FILE *f, const char *filename) {
246   diee("read failed on source file `%s'", filename);
247 }
248 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
249   diee("write failed to temporary receiving file `%s'", tmpfilename);
250 }
251 static void copydie_commsi(FILE *f, const char *what) {
252   die_badrecv(what);
253 }
254 static void copydie_commso(FILE *f, const char *what) {
255   die_badsend();
256 }
257   
258 static int generate_declaration(void) {
259   /* returns length; declaration is left in mainbuf */
260   char *p= mainbuf;
261   *p++= udchar;
262   *p++= '\n';
263   return p - mainbuf;
264 }
265
266 static void read_declaration(int decllen) {
267   assert(decllen <= sizeof(mainbuf));
268   if (decllen<2) die_protocol("declaration too short");
269   mfreadcommsi(mainbuf,decllen,"declaration");
270   if (mainbuf[decllen-1] != '\n')
271     die_protocol("declaration missing final newline");
272   if (mainbuf[0] != udchar)
273     die_protocol("declaration incorrect direction indicator");
274 }
275
276 static void receiver(const char *filename) {
277   FILE *newfile;
278   char *tmpfilename;
279   int r, c;
280
281   char *lastslash= strrchr(filename,'/');
282   if (!lastslash)
283     r= asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename);
284   else
285     r= asprintf(&tmpfilename, "%.*s/.rcopy-repeatedly.#%s#",
286                 (int)(lastslash-filename), filename, lastslash+1);
287   if (r==-1) diem();
288   
289   r= unlink(tmpfilename);
290   if (r && errno!=ENOENT)
291     diee("could not remove temporary receiving file `%s'", tmpfilename);
292   
293   for (;;) {
294     send_flush();
295     c= fgetc(commsi);
296
297     switch (c) {
298
299     case EOF:
300       if (ferror(commsi)) die_badrecv("transfer message code");
301       assert(feof(commsi));
302       return;
303
304     case REPLMSG_RM:
305       r= unlink(filename);
306       if (r && errno!=ENOENT)
307         diee("source file removed but could not remove destination file `%s'",
308              filename);
309       break;
310       
311     case REPLMSG_FILE64:
312       newfile= fopen(tmpfilename, "wb");
313       if (!newfile) diee("could not create temporary receiving file `%s'",
314                          tmpfilename);
315       uint8_t lbuf[8];
316       mfreadcommsi(lbuf,8,"FILE64 l");
317
318       uint64_t l=
319         (lbuf[0] << 28 << 28) |
320         (lbuf[1] << 24 << 24) |
321         (lbuf[2] << 16 << 24) |
322         (lbuf[3] <<  8 << 24) |
323         (lbuf[4]       << 24) |
324         (lbuf[5]       << 16) |
325         (lbuf[6]       <<  8) |
326         (lbuf[7]            ) ;
327
328       copyfile(commsi, copydie_commsi,"FILE64 file data",
329                newfile, copydie_tmpwrite,tmpfilename,
330                l, 0);
331
332       if (fclose(newfile)) diee("could not flush and close temporary"
333                                 " receiving file `%s'", tmpfilename);
334       if (rename(tmpfilename, filename))
335         diee("could not install new version of destination file `%s'",
336              filename);
337
338       sendbyte(REPLMSG_ACK);
339       break;
340
341     default:
342       die_protocol("unknown transfer message code 0x%02x",c);
343
344     }
345   }
346 }
347
348 static void sender(const char *filename) {
349   FILE *f, *fold;
350   int interval_usec, r, c;
351   struct stat stabtest, stab;
352   enum { told_nothing, told_file, told_remove } told;
353
354   interval_usec= 0;
355   fold= 0;
356   told= told_nothing;
357   
358   for (;;) {
359     if (interval_usec) {
360       send_flush();
361       usleep(interval_usec);
362     }
363     interval_usec= min_interval_usec;
364
365     r= stat(filename, &stabtest);
366     if (r) {
367       f= 0;
368     } else {
369       if (told == told_file &&
370           stabtest.st_mode  == stab.st_mode  &&
371           stabtest.st_dev   == stab.st_dev   &&
372           stabtest.st_ino   == stab.st_ino   &&
373           stabtest.st_mtime == stab.st_mtime &&
374           stabtest.st_size  == stab.st_size)
375         continue;
376       f= fopen(filename, "rb");
377     }
378     
379     if (!f) {
380       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
381       if (told != told_remove) {
382         verbosespinprintf
383           (" ENOENT                                                    ");
384         sendbyte(REPLMSG_RM);
385         told= told_remove;
386       }
387       continue;
388     }
389
390     if (fold) fclose(fold);
391     fold= 0;
392
393     r= fstat(fileno(f),&stab);
394     if (r) diee("could not fstat source file `%s'",filename);
395
396     if (!S_ISREG(stab.st_mode))
397       die(8,0,-1,"source file `%s' is not a plain file",filename);
398
399     uint8_t hbuf[9]= {
400       REPLMSG_FILE64,
401       stab.st_size >> 28 >> 28,
402       stab.st_size >> 24 >> 24,
403       stab.st_size >> 16 >> 24,
404       stab.st_size >>  8 >> 24,
405       stab.st_size       >> 24,
406       stab.st_size       >> 16,
407       stab.st_size       >>  8,
408       stab.st_size
409     };
410
411     bandlimit_sendstart();
412
413     mfwritecommso(hbuf,9);
414
415     copyfile(f, copydie_inputfile,filename,
416              commso, copydie_commso,0,
417              stab.st_size, 1);
418
419     send_flush();
420
421     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
422     if (c!=REPLMSG_ACK) die_protocol("got %#02x instead of ACK",c);
423
424     bandlimit_sendend(stab.st_size, &interval_usec);
425
426     fold= f;
427     told= told_file;
428   }
429 }
430
431 typedef struct {
432   const char *userhost, *path;
433 } FileSpecification;
434
435 static FileSpecification srcspec, dstspec;
436
437 static void of__server(const struct cmdinfo *ci, const char *val) {
438   int ncount= nsargs + 1 + !!val;
439   sargs= xrealloc(sargs, sizeof(*sargs) * ncount);
440   sargs[nsargs++]= ci->olong;
441   if (val)
442     sargs[nsargs++]= val;
443 }
444
445 static int of__server_int(const struct cmdinfo *ci, const char *val) {
446   of__server(ci,val);
447   long v;
448   char *ep;
449   errno= 0; v= strtol(val,&ep,10);
450   if (!*val || *ep || errno || v<INT_MIN || v>INT_MAX)
451     badusage("bad integer argument `%s' for --%s",val,ci->olong);
452   return v;
453 }
454
455 static void of_help(const struct cmdinfo *ci, const char *val) {
456   usagemessage();
457   if (ferror(stdout)) diee("could not write usage message to stdout");
458   exit(0);
459 }
460
461 static void of_bw(const struct cmdinfo *ci, const char *val) {
462   int pct= of__server_int(ci,val);
463   if (pct<1 || pct>100)
464     badusage("bandwidth percentage must be between 1 and 100 inclusive");
465   *(double*)ci->parg= pct * 0.01;
466 }
467
468 static void of_server_int(const struct cmdinfo *ci, const char *val) {
469   *(int*)ci->parg= of__server_int(ci,val);
470 }
471
472 void usagemessage(void) {
473   printf(
474          "usage: rcopy-repeatedly [<options>] <file> <file>\n"
475          "  <file> may be <local-file> or [<user>@]<host>:<file>\n"
476          "  exactly one of each of the two forms must be provided\n"
477          "  a file is taken as remote if it has a : before the first /\n"
478          "general options:\n"
479          "  --help\n"
480          "  --quiet | -q\n"
481          "options for bandwidth (and cpu time) control:\n"
482          "  --max-bandwidth-percent  (default %d)\n"
483          "  --tx-block-size      (default/max %d)\n"
484          "  --min-interval-usec  (default %d)\n"
485          "options for finding programs:\n"
486          "  --rcopy-repeatedly  (default: rcopy-repeatedly)\n"
487          "  --rsh-program       (default: $RCOPY_REPEATEDLY_RSH or $RSYNC_RSH or ssh)\n"
488          "options passed to server side via ssh:\n"
489          "  --receiver --sender, bandwidth control options\n",
490          (int)(max_bw_prop*100), (int)sizeof(mainbuf), min_interval_usec);
491 }
492
493 static const struct cmdinfo cmdinfos[]= {
494   { "help",     .call= of_help },
495   { "max-bandwidth-percent", 0,1,.call=of_bw,.parg=&max_bw_prop            },
496   { "tx-block-size",0,     1,.call=of_server_int, .parg=&txblocksz         },
497   { "min-interval-usec",0, 1,.call=of_server_int, .parg=&min_interval_usec },
498   { "rcopy-repeatedly",0,  1, .sassignto=&rcopy_repeatedly_program         },
499   { "rsh-program",0,       1, .sassignto=&rsh_program                      },
500   { "quiet",'q',  .iassignto= &verbose,       .arg=0                       },
501   { "receiver",   .iassignto= &server_upcopy, .arg=0                       },
502   { "sender",     .iassignto= &server_upcopy, .arg=1                       },
503   { 0 }
504 };
505
506 static void server(const char *filename) {
507   int c, l;
508   char buf[2];
509
510   udchar= server_upcopy?'u':'d';
511
512   commsi= stdin;
513   commso= stdout;
514   l= generate_declaration();
515   fprintf(commso, "%s%04x\n", banner, l);
516   mfwritecommso(mainbuf, l);
517   send_flush();
518
519   c= fgetc(commsi);
520   if (c==EOF) {
521     if (feof(commsi)) exit(14);
522     assert(ferror(commsi));  die_badrecv("initial START message");
523   }
524   if (c!=REPLMSG_START) die_protocol("initial START was %#02x instead",c);
525
526   mfreadcommsi(buf,2,"START l");
527   l= (buf[0] << 8) | buf[1];
528
529   read_declaration(l);
530
531   if (server_upcopy)
532     sender(filename);
533   else
534     receiver(filename);
535 }
536
537 static void client(void) {
538   int uppipe[2], downpipe[2], r;
539   pid_t child;
540   FileSpecification *remotespec;
541   const char *remotemode;
542
543   mpipe(uppipe);
544   mpipe(downpipe);
545
546   if (srcspec.userhost) {
547     udchar= 'u';
548     remotespec= &srcspec;
549     remotemode= "--sender";
550   } else {
551     udchar= 'd';
552     remotespec= &dstspec;
553     remotemode= "--receiver";
554   }
555
556   sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs));
557   memmove(sargs+5, sargs, sizeof(*sargs) * nsargs);
558   sargs[0]= rsh_program;
559   sargs[1]= remotespec->userhost;
560   sargs[2]= rcopy_repeatedly_program;
561   sargs[3]= remotemode;
562   sargs[4]= "--";
563   sargs[5+nsargs]= remotespec->path;
564   sargs[6+nsargs]= 0;
565     
566   child= fork();
567   if (child==-1) diee("fork failed");
568   if (!child) {
569     mdup2(downpipe[0],0);
570     mdup2(uppipe[1],1);
571     close(uppipe[0]); close(downpipe[0]);
572     close(uppipe[1]); close(downpipe[1]);
573
574     execvp(rsh_program, (char**)sargs);
575     diee("failed to execute rsh program `%s'",rsh_program);
576   }
577
578   commso= fdopen(downpipe[1],"wb");
579   commsi= fdopen(uppipe[0],"rb");
580   if (!commso || !commsi) diee("fdopen failed");
581   close(downpipe[0]);
582   close(uppipe[1]);
583   
584   char banbuf[sizeof(banner)-1 + 5 + 1];
585   r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
586   if (ferror(commsi)) die_badrecv("read banner");
587
588   if (r!=sizeof(banbuf)-1 ||
589       memcmp(banbuf,banner,sizeof(banner)-1) ||
590       banbuf[sizeof(banner)-1 + 4] != '\n') {
591     const char **sap;
592     int count=0;
593     for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1;
594     char *cmdline= xmalloc(count+1);
595     cmdline[0]=' ';
596     for (sap=sargs; *sap; sap++) {
597       strcat(cmdline," ");
598       strcat(cmdline,*sap);
599     }
600     
601     die(8,0,-1,"did not receive banner as expected -"
602         " shell dirty? ssh broken?\n"
603         " try running\n"
604         "  %s\n"
605         " and expect the first line to be\n"
606         "  %s",
607         cmdline, banner);
608   }
609   
610   banbuf[sizeof(banbuf)-1]= 0;
611   char *ep;
612   long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
613   if (ep != banbuf + sizeof(banner)-1 + 4)
614     die_protocol("declaration length syntax error");
615
616   read_declaration(decllen);
617
618   int l= generate_declaration();
619   sendbyte(REPLMSG_START);
620   sendbyte((l >> 8) & 0x0ff);
621   sendbyte( l       & 0x0ff);
622   mfwritecommso(mainbuf,l);
623
624   if (remotespec==&srcspec)
625     receiver(dstspec.path);
626   else
627     sender(srcspec.path);
628 }
629
630 static void parse_file_specification(FileSpecification *fs, const char *arg,
631                                      const char *what) {
632   const char *colon;
633   
634   if (!arg) badusage("too few arguments - missing %s\n",what);
635
636   for (colon=arg; ; colon++) {
637     if (!*colon || *colon=='/') {
638       fs->userhost=0;
639       fs->path= arg;
640       return;
641     }
642     if (*colon==':') {
643       char *uh= xmalloc(colon-arg + 1);
644       memcpy(uh,arg, colon-arg);  uh[colon-arg]= 0;
645       fs->userhost= uh;
646       fs->path= colon+1;
647       return;
648     }
649   }
650 }
651
652 int main(int argc, const char *const *argv) {
653   setvbuf(stderr,0,_IOLBF,BUFSIZ);
654
655   myopt(&argv, cmdinfos);
656
657   if (!rsh_program) rsh_program= getenv("RCOPY_REPEATEDLY_RSH");
658   if (!rsh_program) rsh_program= getenv("RSYNC_RSH");
659   if (!rsh_program) rsh_program= "ssh";
660
661   if (txblocksz<1) badusage("transmit block size must be at least 1");
662   if (min_interval_usec<0) badusage("minimum update interval may not be -ve");
663
664   if (server_upcopy>=0) {
665     if (!argv[0] || argv[1])
666       badusage("server mode must have just the filename as non-option arg");
667     server(argv[0]);
668   } else {
669     parse_file_specification(&srcspec, argv[0], "source");
670     parse_file_specification(&dstspec, argv[1], "destination");
671     if (argv[2]) badusage("too many non-option arguments");
672     if (!!srcspec.userhost == !!dstspec.userhost)
673       badusage("need exactly one remote file argument");
674     client();
675   }
676   return 0;
677 }