* server sends banner
* - "#rcopy-repeatedly#\n"
* - length of declaration, as 4 hex digits, zero prefixed,
- * and a space [5 bytes]. In this protocol version this
+ * and a newline [5 bytes]. In this protocol version this
* will be "0002" but client _must_ parse it.
* server sends declaration
* - one of "u " or "d" [1 byte]
* - optionally, some more ascii text, reserved for future use
- * must be ignored by client (but not sent by server)
+ * must be ignored by declaree (but not sent by declarer)
* - a newline [1 byte]
* client sends
- * - 0x01 go
+ * - 0x02 START
+ * n 2 bytes big endian declaration length
+ * ... client's declaration (ascii text, including newline)
+ 8 see above
* then for each update
* sender sends one of
* - 0x03 destination file should be deleted
* - 0x04 complete new destination file follows, 64-bit length
* l 8 bytes big endian length
* ... l bytes data
- * receiver must then reply with 0x01 GO
+ * receiver must then reply with 0x01 ACK
*/
#define _GNU_SOURCE
#include "myopt.h"
-#define REPLMSG_GO 0x01
+#define REPLMSG_ACK 0x01
+#define REPLMSG_START 0x02
#define REPLMSG_RM 0x03
#define REPLMSG_FILE64 0x04
/* `up' means towards the client,
* since we regard the subprocess as `down' */
+static int udchar;
static double stream_allow_secsperbyte= 1/1e6; /* for initial transfer */
static char mainbuf[65536]; /* must be at least 2^16 */
die_badsend();
}
+static void mfreadcommsi(void *buf, int l, const char *what) {
+ int r= fread(buf,1,l,commsi); if (r!=l) die_badrecv(what);
+}
+static void mfwritecommso(const void *buf, int l) {
+ int r= fwrite(buf,1,l,commso); if (r!=l) die_badsend();
+}
+
static void mpipe(int p[2]) { if (pipe(p)) diee("could not create pipe"); }
static void mdup2(int fd, int fd2) {
if (dup2(fd,fd2)!=fd2) diee("could not dup2(%d,%d)",fd,fd2);
die_badsend();
}
+static int generate_declaration(void) {
+ /* returns length; declaration is left in mainbuf */
+ char *p= mainbuf;
+ *p++= udchar;
+ *p++= '\n';
+ return p - mainbuf;
+}
+
+static void read_declaration(int decllen) {
+ assert(decllen <= sizeof(mainbuf));
+ if (decllen<2) die_protocol("declaration too short");
+ mfreadcommsi(mainbuf,decllen,"declaration");
+ if (mainbuf[decllen-1] != '\n')
+ die_protocol("declaration missing final newline");
+ if (mainbuf[0] != udchar)
+ die_protocol("declaration incorrect direction indicator");
+}
+
static void receiver(const char *filename) {
FILE *newfile;
char *tmpfilename;
if (!newfile) diee("could not create temporary receiving file `%s'",
tmpfilename);
uint8_t lbuf[8];
- r= fread(lbuf,1,8,commsi); if (r!=8) die_badrecv("FILE64 l");
+ mfreadcommsi(lbuf,8,"FILE64 l");
uint64_t l=
(lbuf[0] << 28 << 28) |
diee("could not install new version of destination file `%s'",
filename);
- sendbyte(REPLMSG_GO);
+ sendbyte(REPLMSG_ACK);
break;
default:
};
bandlimit_sendstart();
-
- r= fwrite(hbuf,1,9,commso); if (r!=9) die_badsend();
+
+ mfwritecommso(hbuf,9);
copyfile(f, copydie_inputfile,filename,
commso, copydie_commso,0,
send_flush();
c= fgetc(commsi); if (c==EOF) die_badrecv("ack");
- if (c!=REPLMSG_GO) die_protocol("got %#02x instead of GO",c);
+ if (c!=REPLMSG_ACK) die_protocol("got %#02x instead of ACK",c);
bandlimit_sendend(stab.st_size, &interval_usec);
}
}
-void usagemessage(void) {
- puts("usage: rcopy-repeatedly [<options>] <file> <file>\n"
- " <file> may be <local-file> or [<user>@]<host>:<file>\n"
- " exactly one of each of the two forms must be provided\n"
- " a file is taken as remote if it has a : before the first /\n"
- "options\n"
- " --help\n");
-}
-
typedef struct {
const char *userhost, *path;
} FileSpecification;
*(int*)ci->parg= of__server_int(ci,val);
}
+void usagemessage(void) {
+ printf(
+ "usage: rcopy-repeatedly [<options>] <file> <file>\n"
+ " <file> may be <local-file> or [<user>@]<host>:<file>\n"
+ " exactly one of each of the two forms must be provided\n"
+ " a file is taken as remote if it has a : before the first /\n"
+ "general options:\n"
+ " --help\n"
+ " --quiet | -q\n"
+ "options for bandwidth (and cpu time) control:\n"
+ " --max-bandwidth-percent-mean (default %d)\n"
+ " --max-bandwidth-percent-burst (default %d)\n"
+ " --tx-block-size (default/max %d)\n"
+ " --min-interval-usec (default %d)\n"
+ "options for finding programs:\n"
+ " --rcopy-repeatedly (default: rcopy-repeatedly)\n"
+ " --rsh-program (default: $RCOPY_REPEATEDLY_RSH or $RSYNC_RSH or ssh)\n"
+ "options passed to server side via ssh:\n"
+ " --receiver --sender, bandwidth control options\n",
+ (int)(max_bw_prop_mean*100), (int)(max_bw_prop_burst*100),
+ (int)sizeof(mainbuf), min_interval_usec);
+}
+
static const struct cmdinfo cmdinfos[]= {
{ "help", .call= of_help },
{ "max-bandwidth-percent-mean", 0,1,.call=of_bw,.parg=&max_bw_prop_mean },
{ "max-bandwidth-percent-burst",0,1,.call=of_bw,.parg=&max_bw_prop_burst },
- { "tx-block-size", 0,1,.call=of_server_int, .parg=&txblocksz },
- { "min-interval-usec", 0,1,.call=of_server_int, .parg=&min_interval_usec },
- { "rcopy-repeatedly", 0,1, .sassignto=&rcopy_repeatedly_program },
- { "ssh-program", 0,1, .sassignto=&rsh_program },
- { "receiver", .iassignto=&server_upcopy, .arg=0 },
- { "sender", .iassignto=&server_upcopy, .arg=1 },
+ { "tx-block-size",0, 1,.call=of_server_int, .parg=&txblocksz },
+ { "min-interval-usec",0, 1,.call=of_server_int, .parg=&min_interval_usec },
+ { "rcopy-repeatedly",0, 1, .sassignto=&rcopy_repeatedly_program },
+ { "rsh-program",0, 1, .sassignto=&rsh_program },
+ { "quiet",'q', .iassignto= &verbose, .arg=0 },
+ { "receiver", .iassignto= &server_upcopy, .arg=0 },
+ { "sender", .iassignto= &server_upcopy, .arg=1 },
{ 0 }
};
static void server(const char *filename) {
- int c;
+ int c, l;
+ char buf[2];
+
+ udchar= server_upcopy?'u':'d';
+
commsi= stdin;
commso= stdout;
- fprintf(commso, "%s0002 %c\n", banner, server_upcopy?'u':'d');
+ l= generate_declaration();
+ fprintf(commso, "%s%04x\n", banner, l);
+ mfwritecommso(mainbuf, l);
send_flush();
- c= fgetc(commsi); if (c==EOF) die_badrecv("initial go");
- if (c!=REPLMSG_GO) die_protocol("initial go message was %#02x instead",c);
+
+ c= fgetc(commsi);
+ if (c==EOF) {
+ if (feof(commsi)) exit(14);
+ assert(ferror(commsi)); die_badrecv("initial START message");
+ }
+ if (c!=REPLMSG_START) die_protocol("initial START was %#02x instead",c);
+
+ mfreadcommsi(buf,2,"START l");
+ l= (buf[0] << 8) | buf[1];
+
+ read_declaration(l);
if (server_upcopy)
sender(filename);
static void client(void) {
int uppipe[2], downpipe[2], r;
pid_t child;
+ FileSpecification *remotespec;
+ const char *remotemode;
mpipe(uppipe);
mpipe(downpipe);
- FileSpecification *remotespec= srcspec.userhost ? &srcspec : &dstspec;
- const char *remotemode= srcspec.userhost ? "--sender" : "--receiver";
+ if (srcspec.userhost) {
+ udchar= 'u';
+ remotespec= &srcspec;
+ remotemode= "--sender";
+ } else {
+ udchar= 'd';
+ remotespec= &dstspec;
+ remotemode= "--receiver";
+ }
sargs= xrealloc(sargs, sizeof(*sargs) * (7 + nsargs));
memmove(sargs+5, sargs, sizeof(*sargs) * nsargs);
if (r!=sizeof(banbuf)-1 ||
memcmp(banbuf,banner,sizeof(banner)-1) ||
- banbuf[sizeof(banner)-1 + 4] != ' ') {
+ banbuf[sizeof(banner)-1 + 4] != '\n') {
const char **sap;
int count=0;
for (count=0, sap=sargs; *sap; sap++) count+= strlen(*sap)+1;
banbuf[sizeof(banbuf)-1]= 0;
char *ep;
long decllen= strtoul(banbuf + sizeof(banner)-1, &ep, 16);
- if (ep!=banbuf + sizeof(banner)-1 + 4 || *ep!=' ')
- die_protocol("declaration length syntax error (`%s')",ep);
- assert(decllen <= sizeof(mainbuf));
- if (decllen<2) die_protocol("declaration too short");
+ if (ep != banbuf + sizeof(banner)-1 + 4)
+ die_protocol("declaration length syntax error");
- r= fread(mainbuf,1,decllen,commsi);
- if (r!=decllen) die_badrecv("declaration");
- if (mainbuf[decllen-1] != '\n')
- die_protocol("declaration missing final newline");
- if (mainbuf[0] != (remotespec==&srcspec ? 'u' : 'd'))
- die_protocol("declaration incorrect direction indicator");
+ read_declaration(decllen);
- sendbyte(REPLMSG_GO);
+ int l= generate_declaration();
+ sendbyte(REPLMSG_START);
+ sendbyte((l >> 8) & 0x0ff);
+ sendbyte( l & 0x0ff);
+ mfwritecommso(mainbuf,l);
if (remotespec==&srcspec)
receiver(dstspec.path);