chiark / gitweb /
fixes
[inn-innduct.git] / backends / batcher.c
1 /*  $Id: batcher.c 6762 2004-05-17 04:24:53Z rra $
2 **
3 **  Read batchfiles on standard input and spew out batches.
4 */
5
6 #include "config.h"
7 #include "clibrary.h"
8 #include <ctype.h>
9 #include <errno.h>
10 #include <fcntl.h>
11 #include <signal.h>
12 #include <syslog.h> 
13 #include <sys/stat.h>
14
15 #include "inn/innconf.h"
16 #include "inn/messages.h"
17 #include "inn/timer.h"
18 #include "libinn.h"
19 #include "paths.h"
20 #include "storage.h"
21
22
23 /*
24 **  Global variables.
25 */
26 static bool     BATCHopen;
27 static bool     STATprint;
28 static double   STATbegin;
29 static double   STATend;
30 static char     *Host;
31 static char     *InitialString;
32 static char     *Input;
33 static char     *Processor;
34 static int      ArtsInBatch;
35 static int      ArtsWritten;
36 static int      BATCHcount;
37 static int      MaxBatches;
38 static int      BATCHstatus;
39 static long     BytesInBatch = 60 * 1024;
40 static long     BytesWritten;
41 static long     MaxArts;
42 static long     MaxBytes;
43 static sig_atomic_t     GotInterrupt;
44 static const char *Separator = "#! rnews %ld";
45 static char     *ERRLOG;
46
47 /*
48 **  Start a batch process.
49 */
50 static FILE *
51 BATCHstart(void)
52 {
53     FILE        *F;
54     char        buff[SMBUF];
55
56     if (Processor && *Processor) {
57         snprintf(buff, sizeof(buff), Processor, Host);
58         F = popen(buff, "w");
59         if (F == NULL)
60             return NULL;
61     }
62     else
63         F = stdout;
64     BATCHopen = true;
65     BATCHcount++;
66     return F;
67 }
68
69
70 /*
71 **  Close a batch, return exit status.
72 */
73 static int
74 BATCHclose(FILE *F)
75 {
76     BATCHopen = false;
77     if (F == stdout)
78         return fflush(stdout) == EOF ? 1 : 0;
79     return pclose(F);
80 }
81
82
83 /*
84 **  Update the batch file and exit.
85 */
86 static void
87 RequeueAndExit(off_t Cookie, char *line, long BytesInArt)
88 {
89     static char LINE1[] = "batcher %s times user %.3f system %.3f elapsed %.3f";
90     static char LINE2[] ="batcher %s stats batches %d articles %d bytes %ld";
91     char        *spool;
92     char        buff[BIG_BUFFER];
93     int         i;
94     FILE        *F;
95     double      usertime;
96     double      systime;
97
98     /* Do statistics. */
99     STATend = TMRnow_double();
100     if (GetResourceUsage(&usertime, &systime) < 0) {
101         usertime = 0;
102         systime = 0;
103     }
104
105     if (STATprint) {
106         printf(LINE1, Host, usertime, systime, STATend - STATbegin);
107         printf("\n");
108         printf(LINE2, Host, BATCHcount, ArtsWritten, BytesWritten);
109         printf("\n");
110     }
111
112     syslog(L_NOTICE, LINE1, Host, usertime, systime, STATend - STATbegin);
113     syslog(L_NOTICE, LINE2, Host, BATCHcount, ArtsWritten, BytesWritten);
114
115     /* Last batch exit okay? */
116     if (BATCHstatus == 0) {
117         if (feof(stdin) && Cookie != -1) {
118             /* Yes, and we're all done -- remove input and exit. */
119             fclose(stdin);
120             if (Input)
121                 unlink(Input);
122             exit(0);
123         }
124     }
125
126     /* Make an appropriate spool file. */
127     if (Input == NULL)
128         spool = concatpath(innconf->pathoutgoing, Host);
129     else
130         spool = concat(Input, ".bch", (char *) 0);
131     if ((F = xfopena(spool)) == NULL)
132         sysdie("%s cannot open %s", Host, spool);
133
134     /* If we can back up to where the batch started, do so. */
135     i = 0;
136     if (Cookie != -1 && fseeko(stdin, Cookie, SEEK_SET) == -1) {
137         syswarn("%s cannot seek", Host);
138         i = 1;
139     }
140
141     /* Write the line we had; if the fseeko worked, this will be an
142      * extra line, but that's okay. */
143     if (line && fprintf(F, "%s %ld\n", line, BytesInArt) == EOF) {
144         syswarn("%s cannot write spool", Host);
145         i = 1;
146     }
147
148     /* Write rest of stdin to spool. */
149     while (fgets(buff, sizeof buff, stdin) != NULL) 
150         if (fputs(buff, F) == EOF) {
151             syswarn("%s cannot write spool", Host);
152             i = 1;
153             break;
154         }
155     if (fclose(F) == EOF) {
156         syswarn("%s cannot close spool", Host);
157         i = 1;
158     }
159
160     /* If we had a named input file, try to rename the spool. */
161     if (Input != NULL && rename(spool, Input) < 0) {
162         syswarn("%s cannot rename spool", Host);
163         i = 1;
164     }
165
166     exit(i);
167     /* NOTREACHED */
168 }
169
170
171 /*
172 **  Mark that we got interrupted.
173 */
174 static RETSIGTYPE
175 CATCHinterrupt(int s)
176 {
177     GotInterrupt = true;
178
179     /* Let two interrupts kill us. */
180     xsignal(s, SIG_DFL);
181 }
182
183
184 int
185 main(int ac, char *av[])
186 {
187     bool        Redirect;
188     FILE        *F;
189     const char  *AltSpool;
190     char        *p;
191     char        *data;
192     char        line[BIG_BUFFER];
193     char        buff[BIG_BUFFER];
194     int         BytesInArt;
195     long        BytesInCB;
196     off_t       Cookie;
197     size_t      datasize;
198     int         i;
199     int         ArtsInCB;
200     int         length;
201     TOKEN       token;
202     ARTHANDLE   *art;
203     char        *artdata;
204
205     /* Set defaults. */
206     openlog("batcher", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
207     message_program_name = "batcher";
208     if (!innconf_read(NULL))
209         exit(1);
210     AltSpool = NULL;
211     Redirect = true;
212     umask(NEWSUMASK);
213     ERRLOG = concatpath(innconf->pathlog, _PATH_ERRLOG);
214
215     /* Parse JCL. */
216     while ((i = getopt(ac, av, "a:A:b:B:i:N:p:rs:S:v")) != EOF)
217         switch (i) {
218         default:
219             die("usage error");
220             break;
221         case 'a':
222             ArtsInBatch = atoi(optarg);
223             break;
224         case 'A':
225             MaxArts = atol(optarg);
226             break;
227         case 'b':
228             BytesInBatch = atol(optarg);
229             break;
230         case 'B':
231             MaxBytes = atol(optarg);
232             break;
233         case 'i':
234             InitialString = optarg;
235             break;
236         case 'N':
237             MaxBatches = atoi(optarg);
238             break;
239         case 'p':
240             Processor = optarg;
241             break;
242         case 'r':
243             Redirect = false;
244             break;
245         case 's':
246             Separator = optarg;
247             break;
248         case 'S':
249             AltSpool = optarg;
250             break;
251         case 'v':
252             STATprint = true;
253             break;
254         }
255     if (MaxArts && ArtsInBatch == 0)
256         ArtsInBatch = MaxArts;
257     if (MaxBytes && BytesInBatch == 0)
258         BytesInBatch = MaxBytes;
259
260     /* Parse arguments. */
261     ac -= optind;
262     av += optind;
263     if (ac != 1 && ac != 2)
264         die("usage error");
265     Host = av[0];
266     if ((Input = av[1]) != NULL) {
267         if (Input[0] != '/')
268             Input = concatpath(innconf->pathoutgoing, av[1]);
269         if (freopen(Input, "r", stdin) == NULL)
270             sysdie("%s cannot open %s", Host, Input);
271     }
272
273     if (Redirect)
274         freopen(ERRLOG, "a", stderr);
275
276     /* Go to where the articles are. */
277     if (chdir(innconf->patharticles) < 0)
278         sysdie("%s cannot chdir to %s", Host, innconf->patharticles);
279
280     /* Set initial counters, etc. */
281     datasize = 8 * 1024;
282     data = xmalloc(datasize);
283     BytesInCB = 0;
284     ArtsInCB = 0;
285     Cookie = -1;
286     GotInterrupt = false;
287     xsignal(SIGHUP, CATCHinterrupt);
288     xsignal(SIGINT, CATCHinterrupt);
289     xsignal(SIGTERM, CATCHinterrupt);
290     /* xsignal(SIGPIPE, CATCHinterrupt); */
291     STATbegin = TMRnow_double();
292
293     SMinit();
294     F = NULL;
295     while (fgets(line, sizeof line, stdin) != NULL) {
296         /* Record line length in case we do an ftello. Not portable to
297          * systems with non-Unix file formats. */
298         length = strlen(line);
299         Cookie = ftello(stdin) - length;
300
301         /* Get lines like "name size" */
302         if ((p = strchr(line, '\n')) == NULL) {
303             warn("%s skipping %.40s: too long", Host, line);
304             continue;
305         }
306         *p = '\0';
307         if (line[0] == '\0' || line[0] == '#')
308             continue;
309         if ((p = strchr(line, ' ')) != NULL) {
310             *p++ = '\0';
311             /* Try to be forgiving of bad input. */
312             BytesInArt = CTYPE(isdigit, (int)*p) ? atol(p) : -1;
313         }
314         else
315             BytesInArt = -1;
316
317         /* Strip of leading spool pathname. */
318         if (line[0] == '/'
319          && line[strlen(innconf->patharticles)] == '/'
320          && strncmp(line, innconf->patharticles, strlen(innconf->patharticles)) == 0)
321             p = line + strlen(innconf->patharticles) + 1;
322         else
323             p = line;
324
325         /* Open the file. */
326         if (IsToken(p)) {
327             token = TextToToken(p);
328             if ((art = SMretrieve(token, RETR_ALL)) == NULL) {
329                 if ((SMerrno != SMERR_NOENT) && (SMerrno != SMERR_UNINIT))
330                     warn("%s skipping %.40s: %s", Host, p, SMerrorstr);
331                 continue;
332             }
333             BytesInArt = -1;
334             artdata = FromWireFmt(art->data, art->len, (size_t *)&BytesInArt);
335             SMfreearticle(art);
336         } else {
337             warn("%s skipping %.40s: not token", Host, p);
338             continue;
339         }
340
341         /* Have an open article, do we need to open a batch?  This code
342          * is here (rather then up before the while loop) so that we
343          * can avoid sending an empty batch.  The goto makes the code
344          * a bit more clear. */
345         if (F == NULL) {
346             if (GotInterrupt) {
347                 RequeueAndExit(Cookie, (char *)NULL, 0L);
348             }
349             if ((F = BATCHstart()) == NULL) {
350                 syswarn("%s cannot start batch %d", Host, BATCHcount);
351                 break;
352             }
353             if (InitialString && *InitialString) {
354                 fprintf(F, "%s\n", InitialString);
355                 BytesInCB += strlen(InitialString) + 1;
356                 BytesWritten += strlen(InitialString) + 1;
357             }
358             goto SendIt;
359         }
360
361         /* We're writing a batch, see if adding the current article
362          * would exceed the limits. */
363         if ((ArtsInBatch > 0 && ArtsInCB + 1 >= ArtsInBatch)
364          || (BytesInBatch > 0 && BytesInCB + BytesInArt >= BytesInBatch)) {
365             if ((BATCHstatus = BATCHclose(F)) != 0) {
366                 if (BATCHstatus == -1)
367                     syswarn("%s cannot close batch %d", Host, BATCHcount);
368                 else
369                     syswarn("%s batch %d exit status %d", Host, BATCHcount,
370                             BATCHstatus);
371                 break;
372             }
373             ArtsInCB = 0;
374             BytesInCB = 0;
375
376             /* See if we can start a new batch. */
377             if ((MaxBatches > 0 && BATCHcount >= MaxBatches)
378              || (MaxBytes > 0 && BytesWritten + BytesInArt >= MaxBytes)
379              || (MaxArts > 0 && ArtsWritten + 1 >= MaxArts)) {
380                 break;
381             }
382
383             if (GotInterrupt) {
384                 RequeueAndExit(Cookie, (char *)NULL, 0L);
385             }
386
387             if ((F = BATCHstart()) == NULL) {
388                 syswarn("%s cannot start batch %d", Host, BATCHcount);
389                 break;
390             }
391         }
392
393     SendIt:
394         /* Now we can start to send the article! */
395         if (Separator && *Separator) {
396             snprintf(buff, sizeof(buff), Separator, BytesInArt);
397             BytesInCB += strlen(buff) + 1;
398             BytesWritten += strlen(buff) + 1;
399             if (fprintf(F, "%s\n", buff) == EOF || ferror(F)) {
400                 syswarn("%s cannot write separator", Host);
401                 break;
402             }
403         }
404
405         /* Write the article.  In case of interrupts, retry the read but not
406            the fwrite because we can't check that reliably and portably. */
407         if ((fwrite(artdata, 1, BytesInArt, F) != BytesInArt) || ferror(F))
408             break;
409
410         /* Update the counts. */
411         BytesInCB += BytesInArt;
412         BytesWritten += BytesInArt;
413         ArtsInCB++;
414         ArtsWritten++;
415
416         if (GotInterrupt) {
417             Cookie = -1;
418             BATCHstatus = BATCHclose(F);
419             RequeueAndExit(Cookie, line, BytesInArt);
420         }
421     }
422
423     if (BATCHopen)
424         BATCHstatus = BATCHclose(F);
425     RequeueAndExit(Cookie, NULL, 0);
426
427     return 0;
428 }