chiark / gitweb /
9c231857d5a16bc846e5500740c254f8b1cd5f5d
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
1 /*
2  * rcopy-repeatedly
3  */     
4   
5 /*
6  * protocol is:
7  *   server sends banner
8  *    - "#rcopy-repeatedly#\n"
9  *    - length of declaration, as 4 hex digits, zero prefixed,
10  *      and a space [5 bytes].  In this protocol version this
11  *      will be "0002" but client _must_ parse it.
12  *   server sends declaration
13  *    - one of "u " or "d" [1 byte]
14  *    - optionally, some more ascii text, reserved for future use
15  *      must be ignored by client (but not sent by server)
16  *    - a newline [1 byte]
17  *   client sends
18  *    - 0x01   go
19  * then for each update
20  *   sender sends one of
21  *    - 0x03   destination file should be deleted
22  *             but note that contents must be retained by receiver
23  *             as it may be used for rle updates
24  *    - 0x04   complete new destination file follows, 64-bit length
25  *        l    8 bytes big endian length
26  *        ...  l bytes data
27  *             receiver must then reply with 0x01 GO
28  */
29
30 #define _GNU_SOURCE
31
32 #include <stdio.h>
33 #include <time.h>
34 #include <stdarg.h>
35 #include <stdlib.h>
36 #include <stdint.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <limits.h>
40 #include <assert.h>
41 #include <math.h>
42
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <unistd.h>
46
47 #include "myopt.h"
48
49 #define REPLMSG_GO     0x01
50 #define REPLMSG_RM     0x03
51 #define REPLMSG_FILE64 0x04
52
53 static const char banner[]= "#rcopy-repeatedly#\n";
54
55 static FILE *commsi, *commso;
56
57 static double max_bandwidth_proportion_mean= 0.2;
58 static double max_bandwidth_proportion_burst= 0.8;
59 static int txblocksz= INT_MAX, verbose=1;
60 static int min_interval_usec= 100000; /* 100ms */
61
62 static const char *rsh_program= "ssh";
63 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
64
65 static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
66
67 static char mainbuf[65536]; /* must be at least 2^16 */
68
69 #define NORETURN __attribute__((noreturn))
70
71 static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
72 static void vdie(int ec, int eno, const char *fmt, va_list al) {
73   fputs("rcopy-repeatedly: ",stderr);
74   vfprintf(stderr,fmt,al);
75   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
76   fputc('\n',stderr);
77   exit(ec);
78 }
79 static void die(int ec, int eno, const char *fmt, ...) NORETURN;
80 static void die(int ec, int eno, const char *fmt, ...) {
81   va_list al;
82   va_start(al,fmt);
83   vdie(ec,eno,fmt,al);
84 }
85
86 static void diem(void) NORETURN;
87 static void diem(void) { die(16,errno,"malloc failed"); }
88
89 static void diee(const char *fmt, ...) NORETURN;
90 static void diee(const char *fmt, ...) {
91   va_list al;
92   va_start(al,fmt);
93   vdie(12,errno,fmt,al);
94 }
95 static void die_protocol(const char *fmt, ...) NORETURN;
96 static void die_protocol(const char *fmt, ...) {
97   va_list al;
98   fputs("protocol error: ",stderr);
99   va_start(al,fmt);
100   vdie(10,-1,fmt,al);
101 }
102
103 static void die_badrecv(const char *what) NORETURN;
104 static void die_badrecv(const char *what) {
105   if (ferror(commsi)) diee("communication failed while receiving %s", what);
106   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
107   abort();
108 }
109 static void die_badsend(void) NORETURN;
110 static void die_badsend(void) {
111   diee("transmission failed");
112 }
113
114 static void send_flush(void) {
115   if (ferror(commso) || fflush(commso))
116     die_badsend();
117 }
118 static void sendbyte(int c) {
119   if (putc(c,commso)==EOF)
120     die_badsend();
121 }
122
123 static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
124 static void mdup2(int fd, int fd2) {
125   if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
126 }
127
128 typedef void copyfile_die_fn(FILE *f, const char *xi);
129
130 struct timespec ts_sendstart;
131
132 static void mgettime(struct timespec *ts) {
133   int r= clock_gettime(CLOCK_MONOTONIC, ts);
134   if (r) diee("clock_gettime failed");
135 }
136
137 static void bandlimit_sendstart(void) {
138   mgettime(&ts_sendstart);
139 }
140
141 static double mgettime_elapsed(struct timespec ts_base,
142                                struct timespec *ts_ret) {
143   mgettime(ts_ret);
144   return (ts_ret->tv_sec - ts_base.tv_sec) +
145          (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
146 }
147
148 static void verbosespinprintf(const char *fmt, ...) {
149   static const char spinnerchars[]= "/-\\";
150   static int spinnerchar;
151
152   if (!verbose)
153     return;
154
155   va_list al;
156   va_start(al,fmt);
157   fprintf(stderr," %c ",spinnerchars[spinnerchar]);
158   spinnerchar++; spinnerchar %= sizeof(spinnerchars)-1;
159   vfprintf(stderr,fmt,al);
160   putc('\r',stderr);
161   if (ferror(stderr) || fflush(stderr))
162     diee("could not write progress to stderr");
163 }
164
165 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
166   struct timespec ts_buf;
167   double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
168   double secsperbyte_observed= elapsed / bytes;
169
170   stream_allow_secsperbyte=
171     secsperbyte_observed / max_bandwidth_proportion_burst;
172
173   double min_update= elapsed * max_bandwidth_proportion_burst
174     / max_bandwidth_proportion_mean;
175   if (min_update > 1e3) min_update= 1e3;
176   int min_update_usec= min_update * 1e6;
177
178   if (*interval_usec_update > min_update_usec)
179     *interval_usec_update= min_update_usec;
180
181   verbosespinprintf("%12lluby %10fs %13gkby/s",
182                     (unsigned long long)bytes, elapsed,
183                     1e-3/secsperbyte_observed);
184 }
185  
186 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
187                      FILE *df, copyfile_die_fn *ddie, const char *dxi,
188                      uint64_t l) {
189   struct timespec ts_last;
190   int now, r;
191
192   ts_last= ts_sendstart;
193
194   while (l>0) {
195     now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
196     if (now > txblocksz) now= txblocksz;
197
198     double elapsed_want= now * stream_allow_secsperbyte;
199     double elapsed= mgettime_elapsed(ts_last, &ts_last);
200     double needwait= elapsed_want - elapsed;
201     if (needwait > 1) needwait= 1;
202     if (needwait > 0) usleep(ceil(needwait * 1e6));
203
204     r= fread(mainbuf,1,now,sf);  if (r!=now) sdie(sf,sxi);
205     r= fwrite(mainbuf,1,now,df);  if (r!=now) ddie(df,dxi);
206     l -= now;
207   }
208 }
209
210 static void copydie_inputfile(FILE *f, const char *filename) {
211   diee("read failed on source file `%s'", filename);
212 }
213 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
214   diee("write failed to temporary receiving file `%s'", tmpfilename);
215 }
216 static void copydie_commsi(FILE *f, const char *what) {
217   die_badrecv(what);
218 }
219 static void copydie_commso(FILE *f, const char *what) {
220   die_badsend();
221 }
222   
223 static void receiver(const char *filename) {
224   FILE *newfile;
225   char *tmpfilename;
226   int r, c;
227
228   if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
229       ==-1) diem();
230   
231   r= unlink(tmpfilename);
232   if (r && errno!=ENOENT)
233     diee("could not remove temporary receiving file `%s'", tmpfilename);
234   
235   for (;;) {
236     send_flush();
237     c= fgetc(commsi);
238
239     switch (c) {
240
241     case EOF:
242       if (ferror(commsi)) die_badrecv("transfer message code");
243       assert(feof(commsi));
244       return;
245
246     case REPLMSG_FILE64:
247       newfile= fopen(tmpfilename, "wb");
248       if (!newfile) diee("could not create temporary receiving file `%s'",
249                          tmpfilename);
250       uint8_t lbuf[8];
251       r= fread(lbuf,1,8,commsi);  if (r!=8) die_badrecv("FILE64 l");
252
253       uint64_t l=
254         (lbuf[0] << 28 << 28) |
255         (lbuf[1] << 24 << 24) |
256         (lbuf[2] << 16 << 24) |
257         (lbuf[3] <<  8 << 24) |
258         (lbuf[4]       << 24) |
259         (lbuf[5]       << 16) |
260         (lbuf[6]       <<  8) |
261         (lbuf[7]            ) ;
262
263       copyfile(commsi, copydie_commsi,"FILE64 file data",
264                newfile, copydie_tmpwrite,tmpfilename,
265                l);
266
267       if (fclose(newfile)) diee("could not flush and close temporary"
268                                 " receiving file `%s'", tmpfilename);
269       if (rename(tmpfilename, filename))
270         diee("could not install new version of destination file `%s'",
271              filename);
272
273       sendbyte(REPLMSG_GO);
274
275     default:
276       die_protocol("unknown transfer message code 0x%02x",c);
277
278     }
279   }
280 }
281
282 static void sender(const char *filename) {
283   FILE *f, *fold;
284   int interval_usec, r, c;
285   struct stat stabtest, stab;
286   enum { told_nothing, told_file, told_remove } told;
287
288   interval_usec= 0;
289   fold= 0;
290   told= told_nothing;
291   
292   for (;;) {
293     if (interval_usec) {
294       send_flush();
295       usleep(interval_usec);
296     }
297     interval_usec= min_interval_usec;
298
299     r= stat(filename, &stabtest);
300     if (r) {
301       f= 0;
302     } else {
303       if (told == told_file &&
304           stabtest.st_mode  == stab.st_mode  &&
305           stabtest.st_dev   == stab.st_dev   &&
306           stabtest.st_ino   == stab.st_ino   &&
307           stabtest.st_mtime == stab.st_mtime &&
308           stabtest.st_size  == stab.st_size)
309         continue;
310       f= fopen(filename, "rb");
311     }
312     
313     if (!f) {
314       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
315       if (told != told_remove) {
316         verbosespinprintf(" ENOENT                            ");
317         sendbyte(REPLMSG_RM);
318         told= told_remove;
319       }
320       continue;
321     }
322
323     if (fold) fclose(fold);
324     fold= 0;
325
326     r= fstat(fileno(f),&stab);
327     if (r) diee("could not fstat source file `%s'",filename);
328
329     if (!S_ISREG(stab.st_mode))
330       die(12,-1,"source file `%s' is not a plain file",filename);
331
332     uint8_t hbuf[9]= {
333       REPLMSG_FILE64,
334       stab.st_size >> 28 >> 28,
335       stab.st_size >> 24 >> 24,
336       stab.st_size >> 16 >> 24,
337       stab.st_size >>  8 >> 24,
338       stab.st_size       >> 24,
339       stab.st_size       >> 16,
340       stab.st_size       >>  8,
341       stab.st_size
342     };
343
344     bandlimit_sendstart();
345     
346     r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
347
348     copyfile(f, copydie_inputfile,filename,
349              commso, copydie_commso,0,
350              stab.st_size);
351
352     send_flush();
353
354     if (fclose(f)) diee("couldn't close source file `%s'",filename);
355
356     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
357     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
358
359     bandlimit_sendend(stab.st_size, &interval_usec);
360
361     fold= f;
362     told= told_file;
363   }
364 }
365
366 void usagemessage(void) {
367   puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
368        " <file> may be <local-file> or [<user>@]<host>:<file>\n"
369        " exactly one of each of the two forms must be provided\n"
370        " a file is taken as remote if it has a : before the first /\n"
371        "options\n"
372        " --help\n");
373 }
374
375 static void of_help(const struct cmdinfo *ci, const char *val) {
376   usagemessage();
377   if (ferror(stdout)) diee("could not write usage message to stdout");
378   exit(0);
379 }
380
381 typedef struct {
382   const char *userhost, *path;
383 } FileSpecification;
384
385 static FileSpecification srcspec, dstspec;
386 static int upcopy=-1; /* -1 means not yet known; 0 means download */
387   /* `up' means towards the client,
388    * since we regard the subprocess as `down' */
389
390 static const struct cmdinfo cmdinfos[]= {
391   { "help",     .call= of_help },
392   { "receiver", .iassignto=&upcopy, .arg=0 },
393   { "sender",   .iassignto=&upcopy, .arg=1 },
394   { 0 }
395 };
396
397 static void server(const char *filename) {
398   int c;
399   commsi= stdin;
400   commso= stdout;
401   fprintf(commso, "%s0002 %c\n", banner, upcopy?'u':'d');
402   send_flush();
403   c= fgetc(commsi);  if (c==EOF) die_badrecv("initial go");
404   if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
405
406   if (upcopy)
407     sender(filename);
408   else
409     receiver(filename);
410 }
411
412 static void client(void) {
413   int uppipe[2], downpipe[2], r;
414   pid_t child;
415
416   mpipe(uppipe);
417   mpipe(downpipe);
418
419   FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
420   const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
421
422   child= fork();
423   if (child==-1) diee("fork failed");
424   if (!child) {
425     mdup2(downpipe[0],0);
426     mdup2(uppipe[1],1);
427     close(uppipe[0]); close(downpipe[0]);
428     close(uppipe[1]); close(downpipe[1]);
429     execlp(rsh_program,
430            rsh_program, remotespec->userhost, rcopy_repeatedly_program,
431            remotemode, remotespec->path, (char*)0);
432     diee("failed to execute rsh program `%s'",rsh_program);
433   }
434
435   commso= fdopen(downpipe[1],"wb");
436   commsi= fdopen(uppipe[0],"rb");
437   if (!commso || !commsi) diee("fdopen failed");
438   close(downpipe[0]);
439   close(uppipe[1]);
440   
441   char banbuf[sizeof(banner)-1 + 5 + 1];
442   r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
443   if (ferror(commsi)) die_badrecv("read banner");
444
445   if (r!=sizeof(banbuf)-1 ||
446       memcmp(banbuf,banner,sizeof(banner)-1) ||
447       banbuf[sizeof(banner)-1 + 4] != ' ')
448     die(8,-1,"did not receive banner as expected -"
449         " shell dirty? ssh broken?\n"
450         " try running\n"
451         "  %s %s %s --sender %s\n"
452         " and expect the first line to be\n"
453         "  %s",
454         rsh_program, remotespec->userhost,
455         rcopy_repeatedly_program, remotespec->path,
456         banner);
457   
458   banbuf[sizeof(banbuf)-1]= 0;
459   char *ep;
460   long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
461   if (*ep) die_protocol("declaration length syntax error");
462   assert(decllen <= sizeof(mainbuf));
463   if (decllen<2) die_protocol("declaration too short");
464
465   r= fread(mainbuf,1,decllen,commsi);
466   if (r!=decllen) die_badrecv("declaration");
467   if (mainbuf[decllen-1] != '\n')
468     die_protocol("declaration missing final newline");
469   if (mainbuf[0] != upcopy ? 'u' : 'd')
470     die_protocol("declaration incorrect direction indicator");
471
472   sendbyte(REPLMSG_GO);
473
474   if (upcopy)
475     receiver(dstspec.path);
476   else
477     sender(srcspec.path);
478 }
479
480 static void parse_file_specification(FileSpecification *fs, const char *arg,
481                                      const char *what) {
482   const char *colon;
483   
484   if (!arg) badusage("too few arguments - missing %s\n",what);
485
486   for (colon=arg; ; colon++) {
487     if (!*colon || *colon=='/') {
488       fs->userhost=0;
489       fs->path= arg;
490       return;
491     }
492     if (*colon==':') {
493       char *uh= malloc(colon-arg + 1);  if (!uh) diem();
494       memcpy(uh,arg, colon-arg);  uh[colon-arg]= 0;
495       fs->userhost= uh;
496       fs->path= colon+1;
497       return;
498     }
499   }
500 }
501
502 int main(int argc, const char *const *argv) {
503   setvbuf(stderr,0,_IOLBF,BUFSIZ);
504
505   myopt(&argv, cmdinfos);
506
507   if (upcopy>=0) {
508     if (!argv[0] || argv[1])
509       badusage("server mode must have just the filename as non-option arg");
510     server(argv[0]);
511   } else {
512     parse_file_specification(&srcspec, argv[0], "source");
513     parse_file_specification(&dstspec, argv[1], "destination");
514     if (argv[2]) badusage("too many non-option arguments");
515     if (!!srcspec.userhost == !!dstspec.userhost)
516       badusage("need exactly one remote file argument");
517     client();
518   }
519   return 0;
520 }