chiark / gitweb /
399e8aacc6e19a37a617bbae3f1d92cf5886d6ef
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
1 /*
2  * rcopy-repeatedly
3  */     
4   
5 /*
6  * protocol is:
7  *   server sends banner
8  *    - "#rcopy-repeatedly#\n"
9  *    - length of declaration, as 4 hex digits, zero prefixed,
10  *      and a space [5 bytes].  In this protocol version this
11  *      will be "0002" but client _must_ parse it.
12  *   server sends declaration
13  *    - one of "u " or "d" [1 byte]
14  *    - optionally, some more ascii text, reserved for future use
15  *      must be ignored by client (but not sent by server)
16  *    - a newline [1 byte]
17  *   client sends
18  *    - 0x01   go
19  * then for each update
20  *   sender sends one of
21  *    - 0x03   destination file should be deleted
22  *             but note that contents must be retained by receiver
23  *             as it may be used for rle updates
24  *    - 0x04   complete new destination file follows, 64-bit length
25  *        l    8 bytes big endian length
26  *        ...  l bytes data
27  *             receiver must then reply with 0x01 GO
28  */
29
30 #define _GNU_SOURCE
31
32 #include <stdio.h>
33 #include <time.h>
34 #include <stdarg.h>
35 #include <stdlib.h>
36 #include <stdint.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <limits.h>
40 #include <assert.h>
41 #include <math.h>
42
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <unistd.h>
46
47 #include "myopt.h"
48
49 #define REPLMSG_GO     0x01
50 #define REPLMSG_RM     0x03
51 #define REPLMSG_FILE64 0x04
52
53 static const char banner[]= "#rcopy-repeatedly#\n";
54
55 static FILE *commsi, *commso;
56
57 static double max_bandwidth_proportion_mean= 0.2;
58 static double max_bandwidth_proportion_burst= 0.8;
59 static int txblocksz= INT_MAX;
60 static int min_interval_usec= 100000; /* 100ms */
61
62 static const char *rsh_program= "ssh";
63 static const char *rcopy_repeatedly_program= "rcopy-repeatedly";
64
65 static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
66
67 static char mainbuf[65536]; /* must be at least 2^16 */
68
69 #define NORETURN __attribute__((noreturn))
70
71 static void vdie(int ec, int eno, const char *fmt, va_list al) NORETURN;
72 static void vdie(int ec, int eno, const char *fmt, va_list al) {
73   fputs("rcopy-repeatedly: ",stderr);
74   vfprintf(stderr,fmt,al);
75   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
76   fputc('\n',stderr);
77   exit(ec);
78 }
79 static void die(int ec, int eno, const char *fmt, ...) NORETURN;
80 static void die(int ec, int eno, const char *fmt, ...) {
81   va_list al;
82   va_start(al,fmt);
83   vdie(ec,eno,fmt,al);
84 }
85
86 static void diem(void) NORETURN;
87 static void diem(void) { die(16,errno,"malloc failed"); }
88
89 static void diee(const char *fmt, ...) NORETURN;
90 static void diee(const char *fmt, ...) {
91   va_list al;
92   va_start(al,fmt);
93   vdie(12,errno,fmt,al);
94 }
95 static void die_protocol(const char *fmt, ...) NORETURN;
96 static void die_protocol(const char *fmt, ...) {
97   va_list al;
98   fputs("protocol error: ",stderr);
99   va_start(al,fmt);
100   vdie(10,-1,fmt,al);
101 }
102
103 static void die_badrecv(const char *what) NORETURN;
104 static void die_badrecv(const char *what) {
105   if (ferror(commsi)) diee("communication failed while receiving %s", what);
106   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
107   abort();
108 }
109 static void die_badsend(void) NORETURN;
110 static void die_badsend(void) {
111   diee("transmission failed");
112 }
113
114 static void send_flush(void) {
115   if (ferror(commso) || fflush(commso))
116     die_badsend();
117 }
118 static void sendbyte(int c) {
119   if (putc(c,commso)==EOF)
120     die_badsend();
121 }
122
123 static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
124 static void mdup2(int fd, int fd2) {
125   if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
126 }
127
128 typedef void copyfile_die_fn(FILE *f, const char *xi);
129
130 struct timespec ts_sendstart;
131
132 static void mgettime(struct timespec *ts) {
133   int r= clock_gettime(CLOCK_MONOTONIC, ts);
134   if (r) diee("clock_gettime failed");
135 }
136
137 static void bandlimit_sendstart(void) {
138   mgettime(&ts_sendstart);
139 }
140
141 static double mgettime_elapsed(struct timespec ts_base,
142                                struct timespec *ts_ret) {
143   mgettime(ts_ret);
144   return (ts_ret->tv_sec - ts_base.tv_sec) +
145          (ts_ret->tv_nsec - ts_base.tv_nsec)*1e-9;
146 }
147
148 static void bandlimit_sendend(uint64_t bytes, int *interval_usec_update) {
149   struct timespec ts_buf;
150   double elapsed= mgettime_elapsed(ts_sendstart, &ts_buf);
151   double secsperbyte_observed= elapsed / bytes;
152
153   stream_allow_secsperbyte=
154     secsperbyte_observed / max_bandwidth_proportion_burst;
155
156   double min_update= elapsed * max_bandwidth_proportion_burst
157     / max_bandwidth_proportion_mean;
158   if (min_update > 1e3) min_update= 1e3;
159   int min_update_usec= min_update * 1e6;
160
161   if (*interval_usec_update > min_update_usec)
162     *interval_usec_update= min_update_usec;
163 }
164  
165 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi,
166                      FILE *df, copyfile_die_fn *ddie, const char *dxi,
167                      uint64_t l) {
168   struct timespec ts_last;
169   int now, r;
170
171   ts_last= ts_sendstart;
172
173   while (l>=0) {
174     now= l < sizeof(mainbuf) ? l : sizeof(mainbuf);
175     if (now > txblocksz) now= txblocksz;
176
177     double elapsed_want= now * stream_allow_secsperbyte;
178     double elapsed= mgettime_elapsed(ts_last, &ts_last);
179     double needwait= elapsed_want - elapsed;
180     if (needwait > 1) needwait= 1;
181     if (needwait > 0) usleep(ceil(needwait * 1e6));
182
183     r= fread(mainbuf,1,now,sf);  if (r!=now) sdie(sf,sxi);
184     r= fwrite(mainbuf,1,now,df);  if (r!=now) ddie(df,dxi);
185     l -= now;
186   }
187 }
188
189 static void copydie_inputfile(FILE *f, const char *filename) {
190   diee("read failed on source file `%s'", filename);
191 }
192 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
193   diee("write failed to temporary receiving file `%s'", tmpfilename);
194 }
195 static void copydie_commsi(FILE *f, const char *what) {
196   die_badrecv(what);
197 }
198 static void copydie_commso(FILE *f, const char *what) {
199   die_badsend();
200 }
201   
202 static void receiver(const char *filename) {
203   FILE *newfile;
204   char *tmpfilename;
205   int r, c;
206
207   if (asprintf(&tmpfilename, ".rcopy-repeatedly.#%s#", filename)
208       ==-1) diem();
209   
210   r= unlink(tmpfilename);
211   if (r && errno!=ENOENT)
212     diee("could not remove temporary receiving file `%s'", tmpfilename);
213   
214   for (;;) {
215     send_flush();
216     c= fgetc(commsi);
217
218     switch (c) {
219
220     case EOF:
221       if (ferror(commsi)) die_badrecv("transfer message code");
222       assert(feof(commsi));
223       return;
224
225     case REPLMSG_FILE64:
226       newfile= fopen(tmpfilename, "wb");
227       if (!newfile) diee("could not create temporary receiving file `%s'",
228                          tmpfilename);
229       uint8_t lbuf[8];
230       r= fread(lbuf,1,8,commsi);  if (r!=8) die_badrecv("FILE64 l");
231
232       uint64_t l=
233         (lbuf[0] << 28 << 28) |
234         (lbuf[1] << 24 << 24) |
235         (lbuf[2] << 16 << 24) |
236         (lbuf[3] <<  8 << 24) |
237         (lbuf[4]       << 24) |
238         (lbuf[5]       << 16) |
239         (lbuf[6]       <<  8) |
240         (lbuf[7]            ) ;
241
242       copyfile(commsi, copydie_commsi,"FILE64 file data",
243                newfile, copydie_tmpwrite,tmpfilename,
244                l);
245
246       if (fclose(newfile)) diee("could not flush and close temporary"
247                                 " receiving file `%s'", tmpfilename);
248       if (rename(tmpfilename, filename))
249         diee("could not install new version of destination file `%s'",
250              filename);
251
252       sendbyte(REPLMSG_GO);
253
254     default:
255       die_protocol("unknown transfer message code 0x%02x",c);
256
257     }
258   }
259 }
260
261 static void sender(const char *filename) {
262   FILE *f, *fold;
263   int interval_usec, r, c;
264   struct stat stabtest, stab;
265   enum { told_nothing, told_file, told_remove } told;
266
267   interval_usec= 0;
268   fold= 0;
269   told= told_nothing;
270   
271   for (;;) {
272     if (interval_usec) usleep(interval_usec);
273     interval_usec= min_interval_usec;
274
275     r= stat(filename, &stabtest);
276     if (r) {
277       f= 0;
278     } else {
279       if (told == told_file &&
280           stabtest.st_mode == stab.st_mode &&
281           stabtest.st_dev == stab.st_dev &&
282           stabtest.st_ino == stab.st_ino)
283         continue;
284       f= fopen(filename, "rb");
285     }
286     
287     if (!f) {
288       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
289       if (told != told_remove) {
290         sendbyte(REPLMSG_RM);
291         told= told_remove;
292       }
293       continue;
294     }
295
296     if (fold) fclose(fold);
297     fold= 0;
298
299     r= fstat(fileno(f),&stab);
300     if (r) diee("could not fstat source file `%s'",filename);
301
302     if (!S_ISREG(stab.st_mode))
303       die(12,-1,"source file `%s' is not a plain file",filename);
304
305     uint8_t hbuf[9]= {
306       REPLMSG_FILE64,
307       stab.st_size >> 28 >> 28,
308       stab.st_size >> 24 >> 24,
309       stab.st_size >> 16 >> 24,
310       stab.st_size >>  8 >> 24,
311       stab.st_size       >> 24,
312       stab.st_size       >> 16,
313       stab.st_size       >>  8,
314       stab.st_size
315     };
316
317     bandlimit_sendstart();
318     
319     r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
320
321     copyfile(f, copydie_inputfile,filename,
322              commso, copydie_commso,0,
323              stab.st_size);
324
325     send_flush();
326
327     if (fclose(f)) diee("couldn't close source file `%s'",filename);
328
329     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
330     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
331
332     bandlimit_sendend(stab.st_size, &interval_usec);
333
334     fold= f;
335     told= told_file;
336   }
337 }
338
339 void usagemessage(void) {
340   puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
341        " <file> may be <local-file> or [<user>@]<host>:<file>\n"
342        " exactly one of each of the two forms must be provided\n"
343        " a file is taken as remote if it has a : before the first /\n"
344        "options\n"
345        " --help\n");
346 }
347
348 static void of_help(const struct cmdinfo *ci, const char *val) {
349   usagemessage();
350   if (ferror(stdout)) diee("could not write usage message to stdout");
351   exit(0);
352 }
353
354 typedef struct {
355   const char *userhost, *path;
356 } FileSpecification;
357
358 static FileSpecification srcspec, dstspec;
359 static int upload=-1; /* -1 means not yet known; 0 means download */
360
361 static const struct cmdinfo cmdinfos[]= {
362   { "help",     .call= of_help },
363   { "receiver", .iassignto=&upload, .arg=1 },
364   { "sender",   .iassignto=&upload, .arg=0 },
365   { 0 }
366 };
367
368 static void server(const char *filename) {
369   int c;
370   commsi= stdin;
371   commso= stdout;
372   fprintf(commso, "%s0002 %c\n", banner, upload?'u':'d');
373   send_flush();
374   c= fgetc(commsi);  if (c==EOF) die_badrecv("initial go");
375   if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
376
377   if (upload)
378     receiver(filename);
379   else
380     sender(filename);
381 }
382
383 static void client(void) {
384   int uppipe[2], downpipe[2], r;
385   pid_t child;
386
387   mpipe(uppipe);
388   mpipe(downpipe);
389
390   FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
391   const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
392
393   child= fork();
394   if (child==-1) diee("fork failed");
395   if (!child) {
396     mdup2(uppipe[0],0);
397     mdup2(downpipe[1],1);
398     close(uppipe[0]); close(downpipe[0]);
399     close(uppipe[1]); close(downpipe[1]);
400     execlp(rsh_program,
401            rsh_program, remotespec->userhost, rcopy_repeatedly_program,
402            remotemode, remotespec->path, (char*)0);
403     diee("failed to execute rsh program `%s'",rsh_program);
404   }
405
406   commso= fdopen(uppipe[1],"wb");
407   commsi= fdopen(downpipe[0],"rb");
408   if (!commso || !commsi) diee("fdopen failed");
409   close(uppipe[0]); close(downpipe[0]);
410   close(uppipe[1]); close(downpipe[1]);
411   
412   char banbuf[sizeof(banner)-1 + 5 + 1];
413   r= fread(banbuf,1,sizeof(banbuf)-1,commsi);
414   if (r!=sizeof(banbuf)-1) die_badrecv("banner");
415
416   if (memcmp(banbuf,banner,sizeof(banner)-1) ||
417       banbuf[sizeof(banner)-1 + 4] != ' ')
418     die(8,-1,"banner received was not as expected -"
419         " shell dirty? ssh broken?\n"
420         " try running   %s %s %s --sender %s\n"
421         " and expect the first line to be   %s",
422         rsh_program, remotespec->userhost,
423         rcopy_repeatedly_program, remotespec->path,
424         banner);
425   
426   banbuf[sizeof(banbuf)-1]= 0;
427   char *ep;
428   long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
429   if (*ep) die_protocol("declaration length syntax error");
430   assert(decllen <= sizeof(mainbuf));
431   if (decllen<2) die_protocol("declaration too short");
432
433   r= fread(mainbuf,1,decllen,commsi);
434   if (r!=decllen) die_badrecv("declaration");
435   if (mainbuf[decllen-1] != '\n')
436     die_protocol("declaration missing final newline");
437   if (mainbuf[0] != upload ? 'u' : 'd')
438     die_protocol("declaration incorrect direction indicator");
439
440   sendbyte(REPLMSG_GO);
441
442   if (upload)
443     sender(srcspec.path);
444   else
445     receiver(dstspec.path);
446 }
447
448 static void parse_file_specification(FileSpecification *fs, const char *arg,
449                                      const char *what) {
450   const char *colon;
451   
452   if (!arg) badusage("too few arguments - missing %s\n",what);
453
454   for (colon=arg; ; colon++) {
455     if (!*colon || *colon=='/') {
456       fs->userhost=0;
457       fs->path= arg;
458       return;
459     }
460     if (*colon==':') {
461       char *uh= malloc(colon-arg + 1);  if (!fs->userhost) diem();
462       memcpy(uh,arg, colon-arg);  uh[colon-arg]= 0;
463       fs->userhost= uh;
464       fs->path= colon+1;
465       return;
466     }
467   }
468 }
469
470 int main(int argc, const char *const *argv) {
471   myopt(&argv, cmdinfos);
472
473   if (upload>=0) {
474     if (!argv[0] || argv[1])
475       badusage("server mode must have just the filename as non-option arg");
476     server(argv[0]);
477   } else {
478     parse_file_specification(&srcspec, argv[0], "source");
479     parse_file_specification(&dstspec, argv[1], "destination");
480     if (argv[2]) badusage("too many non-option arguments");
481     if (!!srcspec.userhost == !!dstspec.userhost)
482       badusage("need exactly one remote file argument");
483     client();
484   }
485   return 0;
486 }