chiark / gitweb /
wip rcopy-repeatedly
[chiark-utils.git] / cprogs / rcopy-repeatedly.c
1 /*
2  * protocol is:
3  *   server sends banner
4  *    - "chiark-realtime-replicator\n" [27 bytes]
5  *    - length of declaration, as 4 hex digits, zero prefixed,
6  *      and a space [5 bytes].  In this protocol version this
7  *      will be "0002" but client _must_ parse it.
8  *   server sends declaration
9  *    - one of "u " or "d" [1 byte]
10  *    - optionally, some more ascii text, reserved for future use
11  *      must be ignored by client (but not sent by server)
12  *    - a newline [1 byte]
13  *   client sends
14  *    - 0x01   go
15  * then for each update
16  *   sender sends one of
17  *    - 0x03   destination file should be deleted
18  *             but note that contents must be retained by receiver
19  *             as it may be used for rle updates
20  *    - 0x04   complete new destination file follows, 64-bit length
21  *        l    8 bytes big endian length
22  *        ...  l bytes data
23  *             receiver must then reply with 0x01 GO
24  */
25
26 #define REPLMSG_GO     0x01
27 #define REPLMSG_RLE8   0x02
28 #define REPLMSG_RM     0x03
29 #define REPLMSG_FILE64 0x04
30
31 static FILE *commsi, *commso;
32 static long interval_usec;
33 static int bytes_per_sec_log2;
34
35 static void vdie(int ec, int eno, const char *fmt, va_list al) {
36   fputs("realtime-replicator: ",stderr);
37   vfprintf(stderr,fmt,al);
38   if (eno!=-1) fprintf(stderr,": %s",strerror(eno));
39   fputc('\n',stderr);
40   exit(ec);
41 }
42 static void die(int ec, int eno, const char *fmt, ...) {
43   va_arg al;
44   va_start(al,fmt);
45   vdiee(ec,eno,fmt,al);
46 }
47
48 static void diem(void) { die(16,errno,"malloc failed"); }
49
50 static void diee(const char *fmt, ...) {
51   va_arg al;
52   va_start(al,fmt);
53   diee(8,errno,fmt,al);
54 }
55 static void die_protocol(const char *fmt, ...) {
56   va_arg al;
57   va_start(al,fmt);
58   diee(12,-1,fmt,al);
59
60 static void die_badrecv(const char *what) {
61   if (ferror(commsi)) diee("communication failed while receiving %s", what);
62   if (feof(commsi)) die_protocol("receiver got unexpected EOF in %s", what);
63   abort();
64 }
65 static void die_badsend(void) {
66   diee("transmission failed");
67 }
68
69 static void receiver_write(const unsigned char *buf, int n,
70                            FILE *newfile, const char *tmpfilename) {
71   int r;
72
73   r= fwrite(dbuf,1,n,newfile);
74   if (r != n) diee("failed to write temporary receiving file `%s'",
75                    tmpfilename);
76 }
77
78 typedef static void copyfile_die_fn(FILE *f, const char *xi);
79  
80 static void copyfile(FILE *sf, copyfile_die_fn *sdie, const char *sxi
81                      FILE *df, copyfile_die_fn *ddie, const char *dxi,
82                      uint64_t l) {
83   char buf[65536];
84
85 fixme rate limit
86 fixme adjustable chunk size
87
88   while (l>=0) {
89     now= l < sizeof(buf) ? l : sizeof(buf);
90
91     
92
93     r= fread(buf,1,now,sf);  if (r!=now) sdie(sf,sxi);
94     r= fwrite(buf,1,now,df);  if (r!=now) ddie(df,dxi);
95     l -= now;
96   }
97 }
98
99 static void copydie_tmpwrite(FILE *f, const char *tmpfilename) {
100   diee("write failed to temporary receiving file `%s'", tmpfilename);
101 }
102 static void copydie_commsi(FILE *f, const char *what) {
103   die_badrecv(what);
104 }
105 static void copydie_commso(FILE *f, const char *what) {
106   die_badsend();
107 }
108   
109 static void receiver(const char *filename) {
110   FILE *newfile;
111   char *tmpfilename;
112
113   if (asprintf(&tmpfilename, ".realtime-replicator.#%s#")==-1) diem();
114   
115   r= unlink(tmpfilename);
116   if (r && errno!=ENOENT)
117     diee("could not remove temporary receiving file `%s'", tmpfilename);
118   
119   for (;;) {
120     send_flush();
121     c= fgetc(commsi);
122
123     switch (c) {
124
125     case EOF:
126       if (ferror(commsi)) die_badrecv("transfer message code");
127       assert(feof(commsi));
128       return;
129
130     case REPLMSG_FILE64:
131       newfile= fopen(tmpfilename, "w");
132       if (!newfile) diee("could not create temporary receiving file `%s'",
133                          tmpfilename);
134       uint8_t lbuf[8];
135       r= fread(lbuf,1,8,commsi);  if (r!=8) die_badrecv("FILE64 l");
136
137       uint64_t l=
138         (lbuf[0] << 28 << 28) |
139         (lbuf[1] << 24 << 24) |
140         (lbuf[2] << 16 << 24) |
141         (lbuf[3] <<  8 << 24) |
142         (lbuf[4]       << 24) |
143         (lbuf[5]       << 16) |
144         (lbuf[6]       <<  8) |
145         (lbuf[7]            ) ;
146
147       copyfile(commsi, copydie_commsi,"FILE64 file data",
148                newfile, copydie_tmpwrite,tmpfilename,
149                l);
150
151       if (fclose(newfile)) diee("could not flush and close temporary"
152                                 " receiving file `%s'", tmpfilename);
153       if (rename(tmpfilename, filename))
154         diee("could not install new version of destination file `%s'",
155              filename);
156
157       sendbyte(REPLMSG_GO);
158
159     default:
160       die_protocol("unknown transfer message code 0x%02x",c);
161
162     }
163   }
164 }
165
166 static void sender(const char *filename) {
167   FILE *f;
168   int told_removed= 0;
169   struct stat stab;
170
171   for (;;) {
172     f= fopen(filename, "r");
173     if (!f) {
174       if (errno!=ENOENT) diee("could not access source file `%s'",filename);
175       if (told_removed) {
176         usleep(interval_usec);
177         continue;
178       }
179       told_removed= 1;
180       sendbyte(REPLMSG_RM);
181       continue;
182     }
183
184     r= fstat(fileno(f),&stab);
185     if (r) diee("could not fstat source file `%s'",filename);
186
187     if (!S_ISREG(stab.st_mode))
188       die(12,-1,"source file `%s' is not a plain file",filename);
189
190     uint8_t hbuf[9]= {
191       REPLMSG_FILE64,
192       stab.st_size >> 28 >> 28,
193       stab.st_size >> 24 >> 24,
194       stab.st_size >> 16 >> 24,
195       stab.st_size >>  8 >> 24,
196       stab.st_size       >> 24,
197       stab.st_size       >> 16,
198       stab.st_size       >>  8,
199       stab.st_size
200     };
201     
202     r= fwrite(hbuf,1,9,commso);  if (r!=9) die_badsend();
203     told_removed= 0;
204
205     copyfile(f, copydie_inputfile,filename,
206              commso, copydie_commso,0);
207
208     send_flush();
209
210     if (fclose(f)) diee("couldn't close source file `%s'",filename);
211
212     c= fgetc(commsi);  if (c==EOF) die_badrecv("ack");
213     if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
214
215     usleep(interval_usec);
216   }
217 }
218
219 int main(int argc, const char **argv) {
220   for (