chiark / gitweb /
Turn SMstore thing into notes in manpage
[inn-innduct.git] / frontends / ovdb_server.c
1 /*
2  * ovdb_server.c
3  * ovdb read server
4  */
5
6 #include "config.h"
7 #include "clibrary.h"
8 #include "portable/mmap.h"
9 #include "portable/time.h"
10 #include "portable/setproctitle.h"
11 #include "portable/socket.h"
12 #include "portable/wait.h"
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <signal.h>
16 #ifdef HAVE_SYS_SELECT_H
17 # include <sys/select.h>
18 #endif
19 #include <syslog.h>
20
21 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
22 # include <sys/un.h>
23 #endif
24
25 #include "inn/innconf.h"
26 #include "inn/messages.h"
27 #include "libinn.h"
28 #include "paths.h"
29 #include "storage.h"
30 #include "ov.h"
31
32 #include "../storage/ovdb/ovdb.h"
33 #include "../storage/ovdb/ovdb-private.h"
34
35 #ifndef USE_BERKELEY_DB
36
37 int
38 main(int argc UNUSED, char **argv UNUSED)
39 {
40     die("BerkeleyDB support not compiled");
41 }
42
43 #else /* USE_BERKELEY_DB */
44
45
46 #define SELECT_TIMEOUT 15
47
48
49 /* This will work unless user sets a larger clienttimeout
50    in readers.conf */
51 #define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
52 /*#define CLIENT_TIMEOUT 3600*/
53
54
55 static int listensock;
56
57 #define MODE_READ   0
58 #define MODE_WRITE  1
59 #define MODE_CLOSED 2
60 #define STATE_READCMD 0
61 #define STATE_READGROUP 1
62 struct reader {
63     int fd;
64     int mode;
65     int state;
66     int buflen;
67     int bufpos;
68     void *buf;
69     time_t lastactive;
70     void *currentsearch;
71 };
72
73 static struct reader *readertab;
74 static int readertablen;
75 static int numreaders;
76 static time_t now;
77 static pid_t parent;
78
79 struct child {
80     pid_t pid;
81     int num;
82     time_t started;
83 };
84 static struct child *children;
85 #define wholistens (children[ovdb_conf.numrsprocs].num)
86
87 static int signalled = 0;
88 static void
89 sigfunc(int sig UNUSED)
90 {
91     signalled = 1;
92 }
93
94 static int updated = 0;
95 static void
96 childsig(int sig UNUSED)
97 {
98     updated = 1;
99 }
100
101 static void
102 parentsig(int sig UNUSED)
103 {
104     int i, which, smallest;
105     if(wholistens < 0) {
106         which = smallest = -1;
107         for(i = 0; i < ovdb_conf.numrsprocs; i++) {
108             if(children[i].pid == -1)
109                 continue;
110             if(!ovdb_conf.maxrsconn || children[i].num <= ovdb_conf.maxrsconn) {
111                 if(smallest == -1 || children[i].num < smallest) {
112                     smallest = children[i].num;
113                     which = i;
114                 }
115             }
116         }
117         if(which != -1) {
118             wholistens = which;
119             kill(children[which].pid, SIGUSR1);
120         } else {
121             wholistens = -2;
122         }
123         updated = 1;
124     }
125 }
126
127 static int putpid(const char *path)
128 {
129     char buf[30];
130     int fd = open(path, O_WRONLY|O_TRUNC|O_CREAT, 0664);
131     if(fd == -1) {
132         syswarn("cannot open %s", path);
133         return -1;
134     }
135     snprintf(buf, sizeof(buf), "%d\n", getpid());
136     if(write(fd, buf, strlen(buf)) < 0) {
137         syswarn("cannot write to %s", path);
138         close(fd);
139         return -1;
140     }
141     close(fd);
142     return 0;
143 }
144
145 static void
146 do_groupstats(struct reader *r)
147 {
148     struct rs_groupstats *reply;
149     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
150     reply = xmalloc(sizeof(struct rs_groupstats));
151
152     /*syslog(LOG_DEBUG, "OVDB: rs: do_groupstats '%s'", group);*/
153     if(ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count, &reply->flag)) {
154         reply->status = CMD_GROUPSTATS;
155         reply->aliaslen = 0;
156     } else {
157         reply->status = CMD_GROUPSTATS | RPLY_ERROR;
158     }
159     free(r->buf);
160     r->buf = reply;
161     r->buflen = sizeof(struct rs_groupstats);
162     r->bufpos = 0;
163     r->mode = MODE_WRITE;
164 }
165
166 static void
167 do_opensrch(struct reader *r)
168 {
169     struct rs_cmd *cmd = r->buf;
170     struct rs_opensrch *reply;
171     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
172     reply = xmalloc(sizeof(struct rs_opensrch));
173
174     /*syslog(LOG_DEBUG, "OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);*/
175
176     if(r->currentsearch != NULL) {
177         /* can only open one search at a time */
178         reply->status = CMD_OPENSRCH | RPLY_ERROR;
179     } else {
180         reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
181         if(reply->handle == NULL) {
182             reply->status = CMD_OPENSRCH | RPLY_ERROR;
183         } else {
184             reply->status = CMD_OPENSRCH;
185         }
186         r->currentsearch = reply->handle;
187     }
188     free(r->buf);
189     r->buf = reply;
190     r->buflen = sizeof(struct rs_opensrch);
191     r->bufpos = 0;
192     r->mode = MODE_WRITE;
193 }
194
195 static void
196 do_srch(struct reader *r)
197 {
198     struct rs_cmd *cmd = r->buf;
199     struct rs_srch *reply;
200     ARTNUM artnum;
201     TOKEN token;
202     time_t arrived;
203     int len;
204     char *data;
205
206     if(ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
207         reply = xmalloc(sizeof(struct rs_srch) + len);
208         reply->status = CMD_SRCH;
209         reply->artnum = artnum;
210         reply->token = token;
211         reply->arrived = arrived;
212         reply->len = len;
213         memcpy((char *)reply + sizeof(struct rs_srch), data, len);
214         r->buflen = sizeof(struct rs_srch) + len;
215     } else {
216         reply = xmalloc(sizeof(struct rs_srch));
217         reply->status = CMD_SRCH | RPLY_ERROR;
218         r->buflen = sizeof(struct rs_srch);
219     }
220     free(r->buf);
221     r->buf = reply;
222     r->bufpos = 0;
223     r->mode = MODE_WRITE;
224 }
225
226 static void
227 do_closesrch(struct reader *r)
228 {
229     struct rs_cmd *cmd = r->buf;
230
231     ovdb_closesearch(cmd->handle);
232     free(r->buf);
233     r->buf = NULL;
234     r->bufpos = r->buflen = 0;
235     r->mode = MODE_READ;
236     r->currentsearch = NULL;
237 }
238
239 static void
240 do_artinfo(struct reader *r)
241 {
242     struct rs_cmd *cmd = r->buf;
243     struct rs_artinfo *reply;
244     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
245     TOKEN token;
246
247     /*syslog(LOG_DEBUG, "OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);*/
248     if(ovdb_getartinfo(group, cmd->artlo, &token)) {
249         reply = xmalloc(sizeof(struct rs_artinfo));
250         reply->status = CMD_ARTINFO;
251         reply->token = token;
252         r->buflen = sizeof(struct rs_artinfo);
253     } else {
254         reply = xmalloc(sizeof(struct rs_artinfo));
255         reply->status = CMD_ARTINFO | RPLY_ERROR;
256         r->buflen = sizeof(struct rs_artinfo);
257     }
258     free(r->buf);
259     r->buf = reply;
260     r->bufpos = 0;
261     r->mode = MODE_WRITE;
262 }
263
264
265 static int
266 process_cmd(struct reader *r)
267 {
268     struct rs_cmd *cmd = r->buf;
269
270     if(r->state == STATE_READCMD) {
271         switch(cmd->what) {
272         case CMD_GROUPSTATS:
273         case CMD_OPENSRCH:
274         case CMD_ARTINFO:
275             r->state = STATE_READGROUP;
276             if(cmd->grouplen == 0) {
277                 /* shoudn't happen... */
278                 r->mode = MODE_CLOSED;
279                 close(r->fd);
280                 free(r->buf);
281                 r->buf = NULL;
282                 return 0;
283             }
284             r->buflen += cmd->grouplen;
285             r->buf = xrealloc(r->buf, r->buflen);
286             return 1;
287         }
288     }
289
290     switch(cmd->what) {
291     case CMD_GROUPSTATS:
292         ((char *)r->buf)[r->buflen - 1] = 0;    /* make sure group is null-terminated */
293         do_groupstats(r);
294         break;
295     case CMD_OPENSRCH:
296         ((char *)r->buf)[r->buflen - 1] = 0;
297         do_opensrch(r);
298         break;
299     case CMD_SRCH:
300         do_srch(r);
301         break;
302     case CMD_CLOSESRCH:
303         do_closesrch(r);
304         break;
305     case CMD_ARTINFO:
306         ((char *)r->buf)[r->buflen - 1] = 0;
307         do_artinfo(r);
308         break;
309     default:
310         r->mode = MODE_CLOSED;
311         close(r->fd);
312         free(r->buf);
313         r->buf = NULL;
314         break;
315     }
316
317     return 0;
318 }
319
320 static void
321 handle_read(struct reader *r)
322 {
323     int n;
324     r->lastactive = now;
325
326     if(r->buf == NULL) {
327         r->state = STATE_READCMD;
328         r->buf = xmalloc(sizeof(struct rs_cmd));
329         r->buflen = sizeof(struct rs_cmd);
330         r->bufpos = 0;
331     }
332 again:
333     n = read(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
334     if(n <= 0) {
335         if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
336             return;
337         r->mode = MODE_CLOSED;
338         close(r->fd);
339         free(r->buf);
340         r->buf = NULL;
341     }
342     r->bufpos += n;
343
344     if(r->bufpos >= r->buflen)
345         if(process_cmd(r))
346             goto again;
347 }
348
349 static void
350 handle_write(struct reader *r)
351 {
352     int n;
353     r->lastactive = now;
354
355     if(r->buf == NULL)  /* shouldn't happen */
356         return;
357
358     n = write(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
359     if(n <= 0) {
360         if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
361             return;
362         r->mode = MODE_CLOSED;
363         close(r->fd);
364         free(r->buf);
365         r->buf = NULL;
366     }
367     r->bufpos += n;
368
369     if(r->bufpos >= r->buflen) {
370         free(r->buf);
371         r->buf = NULL;
372         r->bufpos = r->buflen = 0;
373         r->mode = MODE_READ;
374     }
375 }
376
377 static void
378 newclient(int fd)
379 {
380     struct reader *r;
381     int i;
382
383     nonblocking(fd, 1);
384
385     if(numreaders >= readertablen) {
386         readertablen += 50;
387         readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
388         for(i = numreaders; i < readertablen; i++) {
389             readertab[i].mode = MODE_CLOSED;
390             readertab[i].buf = NULL;
391         }
392     }
393
394     r = &(readertab[numreaders]);
395     numreaders++;
396
397     r->fd = fd;
398     r->mode = MODE_WRITE;
399     r->buflen = sizeof(OVDB_SERVER_BANNER);
400     r->bufpos = 0;
401     r->buf = xstrdup(OVDB_SERVER_BANNER);
402     r->lastactive = now;
403     r->currentsearch = NULL;
404
405     handle_write(r);
406 }
407
408 static void
409 delclient(int which)
410 {
411     int i;
412     struct reader *r = &(readertab[which]);
413
414     if(r->mode != MODE_CLOSED)
415         close(r->fd);
416
417     if(r->buf != NULL) {
418         free(r->buf);
419     }
420     if(r->currentsearch != NULL) {
421         ovdb_closesearch(r->currentsearch);
422         r->currentsearch = NULL;
423     }
424
425     /* numreaders will get decremented by the calling function */
426     for(i = which; i < numreaders-1; i++)
427         readertab[i] = readertab[i+1];
428
429     readertab[i].mode = MODE_CLOSED;
430     readertab[i].buf = NULL;
431 }
432
433 static pid_t
434 serverproc(int me)
435 {
436     fd_set rdset, wrset;
437     int i, ret, count, lastfd, lastnumreaders;
438     socklen_t salen;
439     struct sockaddr_in sa;
440     struct timeval tv;
441     pid_t pid;
442
443     pid = fork();
444     if (pid != 0)
445         return pid;
446
447     if (!ovdb_open(OV_READ|OVDB_SERVER))
448         die("cannot open overview");
449     xsignal_norestart(SIGINT, sigfunc);
450     xsignal_norestart(SIGTERM, sigfunc);
451     xsignal_norestart(SIGHUP, sigfunc);
452     xsignal_norestart(SIGUSR1, childsig);
453     xsignal(SIGPIPE, SIG_IGN);
454
455     numreaders = lastnumreaders = 0;
456     if(ovdb_conf.maxrsconn) {
457         readertablen = ovdb_conf.maxrsconn;
458     } else {
459         readertablen = 50;
460     }
461     readertab = xmalloc(readertablen * sizeof(struct reader));
462     for(i = 0; i < readertablen; i++) {
463         readertab[i].mode = MODE_CLOSED;
464         readertab[i].buf = NULL;
465     }
466
467     setproctitle("0 clients");
468
469     /* main loop */
470     while(!signalled) {
471         FD_ZERO(&rdset);
472         FD_ZERO(&wrset);
473         lastfd = 0;
474         if(wholistens == me) {
475             if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
476                 FD_SET(listensock, &rdset);
477                 lastfd = listensock;
478                 setproctitle("%d client%s *", numreaders,
479                              numreaders == 1 ? "" : "s");
480             } else {
481                 wholistens = -1;
482                 kill(parent, SIGUSR1);
483             }
484         }
485
486         for(i = 0; i < numreaders; i++) {
487             switch(readertab[i].mode) {
488             case MODE_READ:
489                 FD_SET(readertab[i].fd, &rdset);
490                 break;
491             case MODE_WRITE:
492                 FD_SET(readertab[i].fd, &wrset);
493                 break;
494             default:
495                 continue;
496             }
497             if(readertab[i].fd > lastfd)
498                 lastfd = readertab[i].fd;
499         }
500         tv.tv_usec = 0;
501         tv.tv_sec = SELECT_TIMEOUT;
502         count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);
503
504         if(signalled)
505             break;
506         if(count <= 0)
507             continue;
508
509         now = time(NULL);
510
511         if(FD_ISSET(listensock, &rdset)) {
512             if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
513                 salen = sizeof(sa);
514                 ret = accept(listensock, (struct sockaddr *)&sa, &salen);
515                 if(ret >= 0) {
516                     newclient(ret);
517                     wholistens = -1;
518                     children[me].num = numreaders;
519                     kill(parent, SIGUSR1);
520                 }
521             }
522         }
523
524         for(i = 0; i < numreaders; i++) {
525             switch(readertab[i].mode) {
526             case MODE_READ:
527                 if(FD_ISSET(readertab[i].fd, &rdset))
528                     handle_read(&(readertab[i]));
529                 break;
530             case MODE_WRITE:
531                 if(FD_ISSET(readertab[i].fd, &wrset))
532                     handle_write(&(readertab[i]));
533                 break;
534             }
535         }
536
537         for(i = 0; i < numreaders; i++) {
538             if(readertab[i].mode == MODE_CLOSED
539                   || readertab[i].lastactive + CLIENT_TIMEOUT < now) {
540                 delclient(i);
541                 numreaders--;
542                 i--;
543             }
544         }
545         if(children[me].num != numreaders) {
546             children[me].num = numreaders;
547             kill(parent, SIGUSR1);
548         }
549         if(numreaders != lastnumreaders) {
550             lastnumreaders = numreaders;
551             setproctitle("%d client%s", numreaders,
552                          numreaders == 1 ? "" : "s");
553         }
554     }
555
556     ovdb_close();
557     exit(0);
558 }
559
560 static int
561 reap(void)
562 {
563     int i, cs;
564     pid_t c;
565
566     while((c = waitpid(-1, &cs, WNOHANG)) > 0) {
567         for(i = 0; i < ovdb_conf.numrsprocs; i++) {
568             if(c == children[i].pid) {
569                 if(children[i].started + 30 > time(NULL))
570                     return 1;
571
572                 children[i].num = 0;
573
574                 if(wholistens == i)
575                     wholistens = -1;
576
577                 if((children[i].pid = serverproc(i)) == -1)
578                     return 1;
579
580                 children[i].started = time(NULL);
581                 break;
582             }
583         }
584     }
585     if(wholistens == -1)
586         parentsig(SIGUSR1);
587     return 0;
588 }
589
590 #ifndef MAP_ANON
591 #ifdef MAP_ANONYMOUS
592 #define MAP_ANON MAP_ANONYMOUS
593 #endif
594 #endif
595
596 static void *
597 sharemem(size_t len)
598 {
599 #ifdef MAP_ANON
600     return mmap(0, len, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
601 #else
602     int fd = open("/dev/zero", O_RDWR, 0);
603     char *ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
604     close(fd);
605     return ptr;
606 #endif
607 }
608
609 int
610 main(int argc, char *argv[])
611 {
612     int i, ret;
613     socklen_t salen;
614     char *path, *pidfile;
615 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
616     struct sockaddr_un sa;
617 #else
618     struct sockaddr_in sa;
619 #endif
620     struct timeval tv;
621     fd_set rdset;
622
623     setproctitle_init(argc, argv);
624
625     openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
626     message_program_name = "ovdb_server";
627
628     if(argc != 2 || strcmp(argv[1], SPACES))
629         die("should be started by ovdb_init");
630     message_handlers_warn(1, message_log_syslog_err);
631     message_handlers_die(1, message_log_syslog_err);
632
633     if (!innconf_read(NULL))
634         exit(1);
635
636     if(strcmp(innconf->ovmethod, "ovdb"))
637         die("ovmethod not set to ovdb in inn.conf");
638
639     read_ovdb_conf();
640
641 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
642     listensock = socket(AF_UNIX, SOCK_STREAM, 0);
643 #else
644     listensock = socket(AF_INET, SOCK_STREAM, 0);
645 #endif
646     if(listensock < 0)
647         sysdie("cannot create socket");
648
649     nonblocking(listensock, 1);
650
651 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
652     sa.sun_family = AF_UNIX;
653     path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
654     strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
655     unlink(sa.sun_path);
656     free(path);
657     ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
658 #else
659     sa.sin_family = AF_INET;
660     sa.sin_port = htons(OVDB_SERVER_PORT);
661     sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
662     
663     ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
664
665     if(ret != 0 && errno == EADDRNOTAVAIL) {
666         sa.sin_family = AF_INET;
667         sa.sin_port = htons(OVDB_SERVER_PORT);
668         sa.sin_addr.s_addr = INADDR_ANY;
669         ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
670     }
671 #endif
672
673     if(ret != 0)
674         sysdie("cannot bind socket");
675     if(listen(listensock, MAXLISTEN) < 0)
676         sysdie("cannot listen on socket");
677
678     pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
679     if(putpid(pidfile))
680         exit(1);
681
682     xsignal_norestart(SIGINT, sigfunc);
683     xsignal_norestart(SIGTERM, sigfunc);
684     xsignal_norestart(SIGHUP, sigfunc);
685
686     xsignal_norestart(SIGUSR1, parentsig);
687     xsignal_norestart(SIGCHLD, childsig);
688     parent = getpid();
689
690     children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs+1));
691
692     if(children == NULL)
693         sysdie("cannot mmap shared memory");
694     for(i = 0; i < ovdb_conf.numrsprocs+1; i++) {
695         children[i].pid = -1;
696         children[i].num = 0;
697     }
698
699     for(i = 0; i < ovdb_conf.numrsprocs; i++) {
700         if((children[i].pid = serverproc(i)) == -1) {
701             for(i--; i >= 0; i--)
702                 kill(children[i].pid, SIGTERM);
703             exit(1);
704         }
705         children[i].started = time(NULL);
706         sleep(1);
707     }
708
709     while(!signalled) {
710         if(reap())
711             break;
712
713         if(wholistens == -2) {
714             FD_ZERO(&rdset);
715             FD_SET(listensock, &rdset);
716             tv.tv_usec = 0;
717             tv.tv_sec = SELECT_TIMEOUT;
718             ret = select(listensock+1, &rdset, NULL, NULL, &tv);
719
720             if(ret == 1 && wholistens == -2) {
721                 salen = sizeof(sa);
722                 ret = accept(listensock, (struct sockaddr *)&sa, &salen);
723                 if(ret >= 0)
724                    close(ret);
725             }
726         } else {
727             pause();
728         }
729     }
730
731     for(i = 0; i < ovdb_conf.numrsprocs; i++)
732         if(children[i].pid != -1)
733             kill(children[i].pid, SIGTERM);
734
735     while(wait(&ret) > 0)
736         ;
737
738     unlink(pidfile);
739
740     exit(0);
741 }
742
743
744 #endif /* USE_BERKELEY_DB */