8 #include "portable/mmap.h"
9 #include "portable/time.h"
10 #include "portable/setproctitle.h"
11 #include "portable/socket.h"
12 #include "portable/wait.h"
16 #ifdef HAVE_SYS_SELECT_H
17 # include <sys/select.h>
21 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
25 #include "inn/innconf.h"
26 #include "inn/messages.h"
32 #include "../storage/ovdb/ovdb.h"
33 #include "../storage/ovdb/ovdb-private.h"
35 #ifndef USE_BERKELEY_DB
/* Stub entry point used when USE_BERKELEY_DB is not defined: the only
   visible action is to report, via die(), that BerkeleyDB support was not
   compiled in.  Both arguments are marked UNUSED. */
38 main(int argc UNUSED, char **argv UNUSED)
40 die("BerkeleyDB support not compiled");
43 #else /* USE_BERKELEY_DB */
46 #define SELECT_TIMEOUT 15
49 /* This will work unless the user sets a larger clienttimeout. */
51 #define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
52 /*#define CLIENT_TIMEOUT 3600*/
55 static int listensock;
60 #define STATE_READCMD 0
61 #define STATE_READGROUP 1
73 static struct reader *readertab;
74 static int readertablen;
75 static int numreaders;
84 static struct child *children;
85 #define wholistens (children[ovdb_conf.numrsprocs].num)
/* `signalled` starts at 0.  sigfunc is the handler this file installs for
   SIGINT, SIGTERM and SIGHUP (see the xsignal_norestart calls).  The handler
   body is not visible here; presumably it sets `signalled` -- TODO confirm. */
87 static int signalled = 0;
89 sigfunc(int sig UNUSED)
/* `updated` starts at 0.  childsig is installed for SIGUSR1 next to the
   ovdb_open(OV_READ|OVDB_SERVER) setup and for SIGCHLD in main().  The
   handler body is not visible here; presumably it sets `updated` -- TODO
   confirm. */
94 static int updated = 0;
96 childsig(int sig UNUSED)
/* SIGUSR1 handler installed by main().  Scans children[0..numrsprocs),
   skipping entries whose pid is -1, and tracks the smallest client count
   (.num) among children that are eligible: every child when maxrsconn is
   zero, otherwise only those with num <= maxrsconn.  SIGUSR1 is then sent
   to children[which]; the assignment of `which` is on a line not shown,
   presumably the index of the child with the smallest count. */
102 parentsig(int sig UNUSED)
104 int i, which, smallest;
106 which = smallest = -1;
107 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
108 if(children[i].pid == -1)
/* NOTE(review): a child with num == maxrsconn passes this test, while the
   server loop only listens when numreaders < maxrsconn -- confirm that
   "<=" rather than "<" is intended. */
110 if(!ovdb_conf.maxrsconn || children[i].num <= ovdb_conf.maxrsconn) {
111 if(smallest == -1 || children[i].num < smallest) {
112 smallest = children[i].num;
119 kill(children[which].pid, SIGUSR1);
/* Write the current process id, followed by a newline, to the file `path`.
   The file is opened write-only, created with mode 0664 if missing, and
   truncated.  Open and write failures are reported with syswarn().  The
   return values are on lines not shown here. */
127 static int putpid(const char *path)
130 int fd = open(path, O_WRONLY|O_TRUNC|O_CREAT, 0664);
132 syswarn("cannot open %s", path);
135 snprintf(buf, sizeof(buf), "%d\n", getpid());
/* NOTE(review): only write() < 0 is treated as an error; a short write
   would go unnoticed. */
136 if(write(fd, buf, strlen(buf)) < 0) {
137 syswarn("cannot write to %s", path);
/* Build the reply to a group-statistics request.  The group name is read
   from r->buf immediately after the struct rs_cmd header.  A freshly
   allocated struct rs_groupstats receives lo/hi/count/flag from
   ovdb_groupstats(); status is CMD_GROUPSTATS when that call returns
   non-zero and CMD_GROUPSTATS | RPLY_ERROR on the other path.  r->buflen is
   set to the reply size and the reader is switched to MODE_WRITE. */
146 do_groupstats(struct reader *r)
148 struct rs_groupstats *reply;
149 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
150 reply = xmalloc(sizeof(struct rs_groupstats));
152 /*syslog(LOG_DEBUG, "OVDB: rs: do_groupstats '%s'", group);*/
153 if(ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count, &reply->flag)) {
154 reply->status = CMD_GROUPSTATS;
157 reply->status = CMD_GROUPSTATS | RPLY_ERROR;
161 r->buflen = sizeof(struct rs_groupstats);
163 r->mode = MODE_WRITE;
/* Build the reply to an open-search request.  The group name follows the
   struct rs_cmd header in r->buf; the article range comes from cmd->artlo
   and cmd->arthi.  If the reader already has a search open
   (r->currentsearch != NULL) the reply status is CMD_OPENSRCH | RPLY_ERROR.
   Otherwise ovdb_opensearch() is called: a NULL handle yields the same
   error status, a non-NULL handle yields CMD_OPENSRCH.  The handle is
   stored in r->currentsearch, r->buflen is set to the reply size, and the
   reader is switched to MODE_WRITE. */
167 do_opensrch(struct reader *r)
169 struct rs_cmd *cmd = r->buf;
170 struct rs_opensrch *reply;
171 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
172 reply = xmalloc(sizeof(struct rs_opensrch));
174 /*syslog(LOG_DEBUG, "OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);*/
176 if(r->currentsearch != NULL) {
177 /* can only open one search at a time */
178 reply->status = CMD_OPENSRCH | RPLY_ERROR;
180 reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
181 if(reply->handle == NULL) {
182 reply->status = CMD_OPENSRCH | RPLY_ERROR;
184 reply->status = CMD_OPENSRCH;
186 r->currentsearch = reply->handle;
190 r->buflen = sizeof(struct rs_opensrch);
192 r->mode = MODE_WRITE;
/* Build the reply to a search-step request.  When ovdb_search() returns
   non-zero, the reply is a struct rs_srch header (status CMD_SRCH, artnum,
   token, arrived) followed directly by `len` bytes of overview data, and
   r->buflen is header size plus len.  On the other path the reply is a
   bare header with status CMD_SRCH | RPLY_ERROR.  The reader is then
   switched to MODE_WRITE. */
196 do_srch(struct reader *r)
198 struct rs_cmd *cmd = r->buf;
199 struct rs_srch *reply;
/* NOTE(review): in the lines visible here cmd->handle is used exactly as
   received from the client, without comparing it to r->currentsearch --
   confirm that clients of this socket are trusted. */
206 if(ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
207 reply = xmalloc(sizeof(struct rs_srch) + len);
208 reply->status = CMD_SRCH;
209 reply->artnum = artnum;
210 reply->token = token;
211 reply->arrived = arrived;
/* The overview data is appended immediately after the fixed header. */
213 memcpy((char *)reply + sizeof(struct rs_srch), data, len);
214 r->buflen = sizeof(struct rs_srch) + len;
216 reply = xmalloc(sizeof(struct rs_srch));
217 reply->status = CMD_SRCH | RPLY_ERROR;
218 r->buflen = sizeof(struct rs_srch);
223 r->mode = MODE_WRITE;
/* Handle a close-search request: close the search identified by
   cmd->handle with ovdb_closesearch(), reset the buffer position and length
   to 0, and clear r->currentsearch.  No reply is built in the visible
   lines. */
227 do_closesrch(struct reader *r)
229 struct rs_cmd *cmd = r->buf;
231 ovdb_closesearch(cmd->handle);
234 r->bufpos = r->buflen = 0;
236 r->currentsearch = NULL;
/* Build the reply to an article-info request.  The group name follows the
   struct rs_cmd header in r->buf and the article number is cmd->artlo.
   When ovdb_getartinfo() returns non-zero the reply carries status
   CMD_ARTINFO and the token; on the other path the status is
   CMD_ARTINFO | RPLY_ERROR.  Either way the reply is one struct rs_artinfo,
   r->buflen is set to its size, and the reader is switched to MODE_WRITE. */
240 do_artinfo(struct reader *r)
242 struct rs_cmd *cmd = r->buf;
243 struct rs_artinfo *reply;
244 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
247 /*syslog(LOG_DEBUG, "OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);*/
248 if(ovdb_getartinfo(group, cmd->artlo, &token)) {
249 reply = xmalloc(sizeof(struct rs_artinfo));
250 reply->status = CMD_ARTINFO;
251 reply->token = token;
252 r->buflen = sizeof(struct rs_artinfo);
254 reply = xmalloc(sizeof(struct rs_artinfo));
255 reply->status = CMD_ARTINFO | RPLY_ERROR;
256 r->buflen = sizeof(struct rs_artinfo);
261 r->mode = MODE_WRITE;
/* Act on the bytes collected so far in r->buf.  In STATE_READCMD the
   visible path moves the reader to STATE_READGROUP: a zero grouplen closes
   the connection (MODE_CLOSED); otherwise the buffer is enlarged by
   cmd->grouplen bytes so the group name can be read after the header.
   Later paths force the last buffer byte to 0 before the group name is
   used.  The final visible assignment sets MODE_CLOSED; the condition that
   reaches it is on lines not shown. */
266 process_cmd(struct reader *r)
268 struct rs_cmd *cmd = r->buf;
270 if(r->state == STATE_READCMD) {
275 r->state = STATE_READGROUP;
276 if(cmd->grouplen == 0) {
277 /* shouldn't happen... */
278 r->mode = MODE_CLOSED;
/* NOTE(review): grouplen is taken from the request and added to buflen
   with no upper bound in the visible lines -- confirm it is validated
   elsewhere or that clients are trusted. */
284 r->buflen += cmd->grouplen;
285 r->buf = xrealloc(r->buf, r->buflen);
292 ((char *)r->buf)[r->buflen - 1] = 0; /* make sure group is null-terminated */
296 ((char *)r->buf)[r->buflen - 1] = 0;
306 ((char *)r->buf)[r->buflen - 1] = 0;
310 r->mode = MODE_CLOSED;
/* Read more request bytes from a client.  The visible lines set
   STATE_READCMD and allocate a buffer of sizeof(struct rs_cmd) (the
   condition guarding that setup is not shown), then read() into r->buf at
   offset r->bufpos for at most the remaining buflen - bufpos bytes.
   EAGAIN, EINTR and EWOULDBLOCK are treated as transient; another outcome
   marks the reader MODE_CLOSED.  The final check fires once bufpos has
   reached buflen; its action is on a line not shown. */
321 handle_read(struct reader *r)
327 r->state = STATE_READCMD;
328 r->buf = xmalloc(sizeof(struct rs_cmd));
329 r->buflen = sizeof(struct rs_cmd);
333 n = read(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
335 if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
337 r->mode = MODE_CLOSED;
344 if(r->bufpos >= r->buflen)
/* Send pending reply bytes to a client.  Writes from r->buf at offset
   r->bufpos for the remaining buflen - bufpos bytes.  EAGAIN, EINTR and
   EWOULDBLOCK are treated as transient; another outcome marks the reader
   MODE_CLOSED.  Once bufpos has reached buflen, the position and length
   are reset to 0. */
350 handle_write(struct reader *r)
355 if(r->buf == NULL) /* shouldn't happen */
358 n = write(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
360 if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
362 r->mode = MODE_CLOSED;
369 if(r->bufpos >= r->buflen) {
372 r->bufpos = r->buflen = 0;
/* Fragment that sets up a new reader slot (the function header is not
   shown here).  If the table is full it is grown with xrealloc() and the
   slots from numreaders up to readertablen are marked MODE_CLOSED with a
   NULL buffer; how readertablen is increased is on a line not shown.  The
   slot at index numreaders is then put in MODE_WRITE with a copy of
   OVDB_SERVER_BANNER as its pending output and no open search. */
385 if(numreaders >= readertablen) {
387 readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
388 for(i = numreaders; i < readertablen; i++) {
389 readertab[i].mode = MODE_CLOSED;
390 readertab[i].buf = NULL;
394 r = &(readertab[numreaders]);
398 r->mode = MODE_WRITE;
/* If OVDB_SERVER_BANNER is a string literal, sizeof counts its terminating
   NUL, so the NUL would be part of the bytes sent -- TODO confirm. */
399 r->buflen = sizeof(OVDB_SERVER_BANNER);
401 r->buf = xstrdup(OVDB_SERVER_BANNER);
403 r->currentsearch = NULL;
/* Fragment that removes reader number `which` from readertab (the function
   header is not shown here).  Any search still open for the reader is
   closed with ovdb_closesearch().  The entries after `which` are shifted
   down by one, and the slot left at the end is reset to MODE_CLOSED with a
   NULL buffer.  What happens under the mode != MODE_CLOSED test is on
   lines not shown. */
412 struct reader *r = &(readertab[which]);
414 if(r->mode != MODE_CLOSED)
420 if(r->currentsearch != NULL) {
421 ovdb_closesearch(r->currentsearch);
422 r->currentsearch = NULL;
425 /* numreaders will get decremented by the calling function */
426 for(i = which; i < numreaders-1; i++)
427 readertab[i] = readertab[i+1];
/* After the loop, i is the index of the now-vacant last slot. */
429 readertab[i].mode = MODE_CLOSED;
430 readertab[i].buf = NULL;
/* Fragment of the per-child server code (the function header is not shown
   here; presumably this is serverproc(), which this file calls once per
   child -- TODO confirm).  Visible steps: open the overview with
   OV_READ|OVDB_SERVER, install signal handlers, allocate and initialise
   readertab, then build fd sets, select(), accept a new client, service
   readable and writable clients, check for closed or idle clients, and
   report changes in the client count to the parent with SIGUSR1. */
437 int i, ret, count, lastfd, lastnumreaders;
439 struct sockaddr_in sa;
447 if (!ovdb_open(OV_READ|OVDB_SERVER))
448 die("cannot open overview");
449 xsignal_norestart(SIGINT, sigfunc);
450 xsignal_norestart(SIGTERM, sigfunc);
451 xsignal_norestart(SIGHUP, sigfunc);
452 xsignal_norestart(SIGUSR1, childsig);
/* SIGPIPE is ignored, so a write to a disconnected client does not deliver
   a fatal signal. */
453 xsignal(SIGPIPE, SIG_IGN);
455 numreaders = lastnumreaders = 0;
/* With a connection limit configured, the table starts with exactly that
   many slots; the size used otherwise is on a line not shown. */
456 if(ovdb_conf.maxrsconn) {
457 readertablen = ovdb_conf.maxrsconn;
461 readertab = xmalloc(readertablen * sizeof(struct reader));
462 for(i = 0; i < readertablen; i++) {
463 readertab[i].mode = MODE_CLOSED;
464 readertab[i].buf = NULL;
467 setproctitle("0 clients");
/* Only the child recorded in `wholistens` watches the listening socket,
   and only while it is below the connection limit (if any). */
474 if(wholistens == me) {
475 if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
476 FD_SET(listensock, &rdset);
478 setproctitle("%d client%s *", numreaders,
479 numreaders == 1 ? "" : "s");
482 kill(parent, SIGUSR1);
/* Register each client fd in the read or write set according to its mode,
   tracking the highest fd for select(). */
486 for(i = 0; i < numreaders; i++) {
487 switch(readertab[i].mode) {
489 FD_SET(readertab[i].fd, &rdset);
492 FD_SET(readertab[i].fd, &wrset);
497 if(readertab[i].fd > lastfd)
498 lastfd = readertab[i].fd;
501 tv.tv_sec = SELECT_TIMEOUT;
502 count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);
511 if(FD_ISSET(listensock, &rdset)) {
512 if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
514 ret = accept(listensock, (struct sockaddr *)&sa, &salen);
/* Publish the client count in the shared children[] table and notify the
   parent. */
518 children[me].num = numreaders;
519 kill(parent, SIGUSR1);
/* Service every client whose fd select() reported ready for its mode. */
524 for(i = 0; i < numreaders; i++) {
525 switch(readertab[i].mode) {
527 if(FD_ISSET(readertab[i].fd, &rdset))
528 handle_read(&(readertab[i]));
531 if(FD_ISSET(readertab[i].fd, &wrset))
532 handle_write(&(readertab[i]));
/* Pick out clients that are closed, or whose lastactive + CLIENT_TIMEOUT
   is earlier than `now`; the action taken on them is on lines not shown. */
537 for(i = 0; i < numreaders; i++) {
538 if(readertab[i].mode == MODE_CLOSED
539 || readertab[i].lastactive + CLIENT_TIMEOUT < now) {
545 if(children[me].num != numreaders) {
546 children[me].num = numreaders;
547 kill(parent, SIGUSR1);
/* Refresh the process title only when the client count has changed. */
549 if(numreaders != lastnumreaders) {
550 lastnumreaders = numreaders;
551 setproctitle("%d client%s", numreaders,
552 numreaders == 1 ? "" : "s");
/* Fragment that collects exited children (the function header is not shown
   here).  waitpid() is polled with WNOHANG until no more exited children
   are reported.  Each reported pid is matched against children[]; a child
   that exited less than 30 seconds after its recorded start time is
   treated specially on lines not shown.  The visible lines then start a
   replacement with serverproc(i) and record its start time. */
566 while((c = waitpid(-1, &cs, WNOHANG)) > 0) {
567 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
568 if(c == children[i].pid) {
569 if(children[i].started + 30 > time(NULL))
577 if((children[i].pid = serverproc(i)) == -1)
580 children[i].started = time(NULL);
/* Fragment that obtains `len` bytes of readable, writable, MAP_SHARED
   memory (the function header and the preprocessor conditions selecting
   between the variants are not shown here).  One variant maps anonymous
   memory, with MAP_ANON defined as MAP_ANONYMOUS by the line below; the
   other maps /dev/zero. */
592 #define MAP_ANON MAP_ANONYMOUS
600 return mmap(0, len, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
/* NOTE(review): the open() result is not checked in the visible lines
   before it is passed to mmap(). */
602 int fd = open("/dev/zero", O_RDWR, 0);
603 char *ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
/* Entry point for the BerkeleyDB build.  Visible steps: initialise
   process-title and logging support, insist on being started with exactly
   one argument equal to SPACES, read inn.conf and require ovmethod "ovdb",
   create, bind and listen on the server socket (Unix-domain when
   HAVE_UNIX_DOMAIN_SOCKETS is defined, otherwise TCP), install signal
   handlers, allocate the shared children[] table, start numrsprocs server
   children, run a loop in which the parent itself waits on the listening
   socket while wholistens is -2, and finally send SIGTERM to the children
   and wait for them. */
610 main(int argc, char *argv[])
614 char *path, *pidfile;
615 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
616 struct sockaddr_un sa;
618 struct sockaddr_in sa;
623 setproctitle_init(argc, argv);
625 openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
626 message_program_name = "ovdb_server";
/* Refuse to run unless invoked with the single argument SPACES. */
628 if(argc != 2 || strcmp(argv[1], SPACES))
629 die("should be started by ovdb_init");
/* From here on, warnings and fatal errors are sent to syslog. */
630 message_handlers_warn(1, message_log_syslog_err);
631 message_handlers_die(1, message_log_syslog_err);
633 if (!innconf_read(NULL))
636 if(strcmp(innconf->ovmethod, "ovdb"))
637 die("ovmethod not set to ovdb in inn.conf");
641 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
642 listensock = socket(AF_UNIX, SOCK_STREAM, 0);
644 listensock = socket(AF_INET, SOCK_STREAM, 0);
647 sysdie("cannot create socket");
649 nonblocking(listensock, 1);
651 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
652 sa.sun_family = AF_UNIX;
653 path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
/* NOTE(review): strlcpy truncates silently if the path does not fit in
   sun_path; its return value is not checked in the visible lines. */
654 strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
657 ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
/* TCP variant: bind to the loopback address first. */
659 sa.sin_family = AF_INET;
660 sa.sin_port = htons(OVDB_SERVER_PORT);
661 sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
663 ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
/* If the loopback address is unavailable, retry on INADDR_ANY.
   NOTE(review): that makes the port reachable on every interface --
   confirm this fallback is acceptable. */
665 if(ret != 0 && errno == EADDRNOTAVAIL) {
666 sa.sin_family = AF_INET;
667 sa.sin_port = htons(OVDB_SERVER_PORT);
668 sa.sin_addr.s_addr = INADDR_ANY;
669 ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
674 sysdie("cannot bind socket");
675 if(listen(listensock, MAXLISTEN) < 0)
676 sysdie("cannot listen on socket");
678 pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
682 xsignal_norestart(SIGINT, sigfunc);
683 xsignal_norestart(SIGTERM, sigfunc);
684 xsignal_norestart(SIGHUP, sigfunc);
686 xsignal_norestart(SIGUSR1, parentsig);
687 xsignal_norestart(SIGCHLD, childsig);
/* One entry per server child plus one extra; the extra entry's .num field
   is what the `wholistens` macro refers to. */
690 children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs+1));
693 sysdie("cannot mmap shared memory");
694 for(i = 0; i < ovdb_conf.numrsprocs+1; i++) {
695 children[i].pid = -1;
/* Start the server children; if one cannot be started, SIGTERM is sent to
   those already running. */
699 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
700 if((children[i].pid = serverproc(i)) == -1) {
701 for(i--; i >= 0; i--)
702 kill(children[i].pid, SIGTERM);
705 children[i].started = time(NULL);
/* While wholistens is -2 the parent waits on the listening socket itself,
   for at most SELECT_TIMEOUT seconds per select() call. */
713 if(wholistens == -2) {
715 FD_SET(listensock, &rdset);
717 tv.tv_sec = SELECT_TIMEOUT;
718 ret = select(listensock+1, &rdset, NULL, NULL, &tv);
/* wholistens is checked again after select() before the parent accepts;
   what is done with the accepted connection is on lines not shown. */
720 if(ret == 1 && wholistens == -2) {
722 ret = accept(listensock, (struct sockaddr *)&sa, &salen);
/* Shutdown: send SIGTERM to every child that has a pid recorded, then
   wait until wait() reports no more children. */
731 for(i = 0; i < ovdb_conf.numrsprocs; i++)
732 if(children[i].pid != -1)
733 kill(children[i].pid, SIGTERM);
735 while(wait(&ret) > 0)
744 #endif /* USE_BERKELEY_DB */