chiark / gitweb /
rename backlog_nextscan_periods to until_backlog_nextscan
[innduct.git] / innfeed / tape.c
1 /*  $Id: tape.c 6716 2004-05-16 20:26:56Z rra $
2 **
3 **  The implementation of the innfeed Tape class.
4 **
5 **  Written by James Brister <brister@vix.com>
6 **
7 **  The implementation of the Tape class. Tapes are read-only or write-only
8 **  files that are accessed sequentially. Their basic unit of i/o is an
9 **  Article. Tapes work out of a single directory and manage all file names
10 **  themselves.
11 **
12 **  Tapes will checkpoint themselves periodically so that when innfeed exits
13 **  or crashes things can restart close to where they were last. The period
14 **  checkpointing is handled entirely by the Tape class, but the checkpoint
15 **  period needs to be set by some external user before the first tape is
16 **  created.
17 */
18
19 #include "innfeed.h"
20 #include "config.h"
21 #include "clibrary.h"
22
23 #include <assert.h>
24 #include <ctype.h>
25 #include <errno.h>
26 #include <sys/param.h>
27 #include <sys/stat.h>
28 #include <syslog.h>
29
30 #if defined (HAVE_DIRENT_H)
31 #include <dirent.h>
32 typedef struct dirent DIRENTRY ;
33 #else
34 #include <sys/dir.h>
35 typedef struct direct DIRENTRY ;
36 #endif
37
38 #ifdef TIME_WITH_SYS_TIME
39 # include <sys/time.h>
40 # include <time.h>
41 #else
42 # ifdef HAVE_SYS_TIME_H
43 #  include <sys/time.h>
44 # else
45 #  include <time.h>
46 # endif
47 #endif
48
49 #include "inn/innconf.h"
50 #include "inn/messages.h"
51 #include "libinn.h"
52
53 #include "article.h"
54 #include "configfile.h"
55 #include "endpoint.h"
56 #include "tape.h"
57
58 extern char *dflTapeDir;
59 extern bool genHtml ;
60
61 #if 0
62 /* a structure for temporary storage of articles. */
63 typedef struct q_e_s
64 {
65     Article article ;
66     struct q_e_s *next ;
67 } *QueueElem ;
68 #endif
69
70 /* The Tape class type. */
71 struct tape_s
72 {
73     /* the pathname of the file the administrator can drop in by hand. */
74     char *handFilename ;
75
76     /* the pathname of the file the Tape will read from */
77     char *inputFilename ;
78
79     /* the pathname of the file the Tape will write to. */
80     char *outputFilename ;
81
82     /* the pathname of the file used in locking */
83     char *lockFilename ;
84
85     /* the peer we're doing this for. */
86     char *peerName ;
87     
88     FILE *inFp ;                /* input FILE */
89     FILE *outFp ;               /* output FILE */
90
91     time_t lastRotated ;        /* time files last got switched */
92     bool checkNew ;             /* set bool when we need to check for
93                                    hand-crafted file. */
94     
95 #if 0
96     /* the tape holds a small output queue in memory to avoid thrashing. */
97     QueueElem head ;            
98     QueueElem tail ;
99     unsigned int qLength ;             /* amount on the queue */
100 #endif
101     
102     long outputSize ;           /* the current size of the output file. */
103     long lossage ;
104
105     /* the number of bytes we try to keep the output under. We actually
106        wait for the outputSize to get 10% greater than this amount before
107        shrinking the file down again. A value of zero means no limit. */
108     long outputLowLimit ;
109     long outputHighLimit ;
110     double backlogFactor ;
111     
112     bool scribbled ;            /* have we scribbled a checkpoint value in
113                                    the file. */
114     long tellpos ;              /* for input file checkpointing. */
115     bool changed ;              /* true if tape was read since last
116                                    checkpoint or start. */
117
118     /* true if articles that are output are NOT later input. */
119     bool noRotate ;     
120
121     /* true if no articles should ever be spooled */
122     bool noBacklog ;
123 };
124
125
126 static void checkpointTape (Tape tape) ;
127 static void removeTapeGlobally (Tape tape) ;
128 static void addTapeGlobally (Tape tape) ;
129 static void prepareFiles (Tape tape) ;
130 static void tapeCkNewFileCbk (TimeoutId id, void *d) ;
131 static void tapeCheckpointCallback (TimeoutId id, void *d) ;
132 #if 0
133 static void flushTape (Tape tape) ;
134 #endif
135 static void tapesSetCheckNew (void) ;
136 static void initTape (Tape nt) ;
137 static void tapeCleanup (void) ;
138
139
140
141 /* pathname of directory we store tape files in. */
142 static char *tapeDirectory ;
143
144 /* the callback ID of the checkpoint timer callback. */
145 static TimeoutId checkPtId ;
146 static TimeoutId ckNewFileId ;
147
148 /* number of seconds between tape checkpoints. */
149 static unsigned int tapeCkPtPeriod ;
150 static unsigned int tapeCkNewFilePeriod ;
151
152 static time_t rotatePeriod = TAPE_ROTATE_PERIOD ;
153
154 /* global list of tapes so we can checkpoint them periodically */
155 static Tape *activeTapes ;
156
157 /* Size of the activeTapes array */
158 static size_t activeTapeSize ;
159
160 /* index of last element in activeTapes that's being used. */
161 static size_t activeTapeIdx ;
162
163 #if 0
164 /* default limit of the size of output tapes. */
165 static long defaultSizeLimit ;
166 #endif
167
168 unsigned int tapeHighwater ;
169
170 bool debugShrinking = false ;
171
172 extern bool talkToSelf ;        /* main.c */
173
174
175
176
177 /* callback when config file is loaded */
178 int tapeConfigLoadCbk (void *data)
179 {
180   int rval = 1 ;
181   long iv ;
182   int bv ;
183   FILE *fp = (FILE *) data ;
184   char *dir, *p ;
185
186   if (getString (topScope,"backlog-directory",&p,NO_INHERIT))
187     {
188       dir = buildFilename(innconf->pathspool, p);
189       free(p);
190       if (tapeDirectory != NULL && strcmp (tapeDirectory,dir) != 0)
191         {
192           warn ("ME config: cannot change backlog-directory of a running"
193                 " process") ;
194           free (dir) ;
195           dir = xstrdup (tapeDirectory) ;
196         }
197
198       if (!isDirectory (dir) && isDirectory (dflTapeDir))
199         {
200           logOrPrint (LOG_ERR,fp,
201                       "ME config: definition of backlog-directory (%s) is a"
202                       " non-existant directory. Using %s",
203                       dir,dflTapeDir) ;
204           free (dir) ;
205           dir = xstrdup (dflTapeDir) ;
206         }
207       else if (!isDirectory (dir))
208         logAndExit (1,"ME config: no usable value for backlog-directory") ;
209     }
210   else if (!isDirectory (dflTapeDir))
211   {
212     logAndExit (1,"ME config: no usable value for backlog-directory") ;
213     return -1; /* not reached */
214   }
215   else
216     dir = xstrdup (dflTapeDir) ;
217   
218   if (tapeDirectory != NULL)
219     free (tapeDirectory) ;
220   tapeDirectory = dir ;
221
222
223   
224   if (getInteger (topScope,"backlog-highwater",&iv,NO_INHERIT))
225     {
226       if (iv < 0)
227         {
228           rval = 0 ;
229           logOrPrint (LOG_ERR,fp,
230                       "ME config: value of %s (%ld) in %s cannot be less"
231                       " than 0. Using %ld","backlog-highwater",
232                       iv,"global scope",(long)TAPE_HIGHWATER);
233           iv = TAPE_HIGHWATER ;
234         }
235     }
236   else
237     iv = TAPE_HIGHWATER ;
238   tapeHighwater = (unsigned int) iv ;
239
240
241   
242   if (getInteger (topScope,"backlog-rotate-period",&iv,NO_INHERIT))
243     {
244       if (iv < 0)
245         {
246           rval = 0 ;
247           logOrPrint (LOG_ERR,fp,
248                       "ME config: value of %s (%ld) in %s cannot be less"
249                       " than 0. Using %ld",
250                       "backlog-rotate-period",
251                       iv,"global scope",(long)TAPE_ROTATE_PERIOD);
252           iv = TAPE_ROTATE_PERIOD ;
253         }
254     }
255   else
256     iv = TAPE_ROTATE_PERIOD ;
257   rotatePeriod = (unsigned int) iv ;
258
259
260   if (getInteger (topScope,"backlog-ckpt-period",&iv,NO_INHERIT))
261     {
262       if (iv < 0)
263         {
264           rval = 0 ;
265           logOrPrint (LOG_ERR,fp,
266                       "ME config: value of %s (%ld) in %s cannot be less"
267                       " than 0. Using %ld",
268                       "backlog-ckpt-period",iv,
269                       "global scope",(long)TAPE_CHECKPOINT_PERIOD);
270           iv = TAPE_CHECKPOINT_PERIOD ;
271         }
272     }
273   else
274     iv = TAPE_CHECKPOINT_PERIOD ;
275   tapeCkPtPeriod = (unsigned int) iv ;
276
277
278   if (getInteger (topScope,"backlog-newfile-period",&iv,NO_INHERIT))
279     {
280       if (iv < 0)
281         {
282           rval = 0 ;
283           logOrPrint (LOG_ERR,fp,
284                       "ME config: value of %s (%ld) in %s cannot be less"
285                       " than 0. Using %ld",
286                       "backlog-newfile-period",
287                       iv,"global scope",(long)TAPE_NEWFILE_PERIOD);
288           iv = TAPE_NEWFILE_PERIOD ;
289         }
290     }
291   else
292     iv = TAPE_NEWFILE_PERIOD ;
293   tapeCkNewFilePeriod = (unsigned int) iv ;
294
295
296   if (getBool (topScope,"debug-shrinking",&bv,NO_INHERIT))
297     debugShrinking = (bv ? true : false) ;
298   
299   return rval ;
300 }
301
302
303 /* Create a new Tape object. There are three potential files involved in
304    I/O. 'peerName' is what  the admin may have dropped in by
305    hand. 'peerName.input' is the file that was being used as input the last
306    time things were run. 'peerName.output' is the file that was being used
307    as output. The file 'peerName' is appended to 'peerName.input' (or
308    renamed if 'peerName.input' doesn't exist). Then 'peerName.output' is
309    appeneded (or renamed) to 'peerName.input' */
310
311 static bool inited = false ;
312 Tape newTape (const char *peerName, bool dontRotate)
313 {
314   Tape nt = xmalloc (sizeof(struct tape_s)) ;
315   size_t pLen = strlen (peerName) ;
316   size_t dLen = strlen (tapeDirectory) ;
317
318   if (!inited)
319     {
320       inited = true ;
321       atexit (tapeCleanup) ;
322     }
323   
324   ASSERT (nt != NULL) ;
325
326   if (endsIn (peerName,INPUT_TAIL))
327     die ("Sorry, can't have a peer name ending in \"%s\"",INPUT_TAIL) ;
328
329   if (endsIn (peerName,OUTPUT_TAIL))
330     die ("Sorry, can't have a peer name ending in \"%s\"",OUTPUT_TAIL) ;
331
332   if (endsIn (peerName,LOCK_TAIL))
333     die ("Sorry, can't have a peer name ending in \"%s\"",LOCK_TAIL) ;
334
335   nt->peerName = xstrdup (peerName) ;
336   
337   nt->handFilename = xmalloc (pLen + dLen + 2) ;
338   sprintf (nt->handFilename,"%s/%s",tapeDirectory,peerName) ;
339
340   nt->lockFilename = xmalloc (pLen + dLen + strlen(LOCK_TAIL) + 2) ;
341   sprintf (nt->lockFilename,"%s/%s%s",tapeDirectory,peerName,LOCK_TAIL) ;
342
343   nt->inputFilename = xmalloc (pLen + dLen + strlen(INPUT_TAIL) + 2) ;
344   sprintf (nt->inputFilename,"%s/%s%s",tapeDirectory,peerName,INPUT_TAIL) ;
345
346   nt->outputFilename = xmalloc (pLen + dLen + strlen(OUTPUT_TAIL) + 2) ;
347   sprintf (nt->outputFilename,"%s/%s%s",tapeDirectory,peerName,OUTPUT_TAIL) ;
348
349   if ( !lockFile (nt->lockFilename) )
350     {
351       warn ("ME lock failed for host: %s", nt->lockFilename) ;
352       
353       free (nt->handFilename) ;
354       free (nt->lockFilename) ;
355       free (nt->inputFilename) ;
356       free (nt->outputFilename) ;
357       free (nt) ;
358
359       return NULL ;
360     }
361
362   nt->noRotate = false ;        /* for first time prepare */
363   initTape (nt) ;
364   nt->noRotate = dontRotate ;
365
366   addTapeGlobally (nt) ;
367
368   if (checkPtId == 0 && tapeCkPtPeriod > 0)     /* only done once. */
369     checkPtId = prepareSleep (tapeCheckpointCallback,tapeCkPtPeriod,NULL);
370
371   if (ckNewFileId == 0 && tapeCkNewFilePeriod > 0)
372     ckNewFileId = prepareSleep (tapeCkNewFileCbk,tapeCkNewFilePeriod,NULL) ;
373
374   return nt ;
375 }
376
377 static void initTape (Tape nt)
378 {
379   value *peerVal = findPeer (nt->peerName) ;
380   scope *s = (peerVal == NULL ? NULL : peerVal->v.scope_val) ;
381
382   nt->inFp = NULL ;
383   nt->outFp = NULL ;
384
385   nt->lastRotated = 0 ;
386   nt->checkNew = false ;
387   
388 #if 0
389   nt->head = NULL ;
390   nt->tail = NULL ;
391   nt->qLength = 0 ;
392 #endif
393
394   nt->scribbled = false ;
395   nt->tellpos = 0 ;
396
397   nt->changed = false ;
398   
399   nt->outputSize = 0 ;
400   nt->lossage = 0 ;
401
402   nt->noBacklog = false ;
403   nt->backlogFactor = 0.0 ;
404   nt->outputLowLimit = 0 ;
405   nt->outputHighLimit = 0 ;
406
407   if (!talkToSelf)
408     {
409       int bval ;
410
411       if (getBool (s, "no-backlog", &bval, INHERIT))
412         nt->noBacklog = (bval ? true : false);
413       else
414         nt->noBacklog = TAPE_DISABLE;
415
416       if (getInteger (s,"backlog-limit",&nt->outputLowLimit,INHERIT))
417         {
418           if (!getReal (s,"backlog-factor",&nt->backlogFactor,INHERIT))
419             {
420               if (!getInteger (s,"backlog-limit-highwater",
421                                &nt->outputHighLimit,INHERIT))
422                 {
423                   warn ("%s no backlog-factor or backlog-high-limit",
424                         nt->peerName) ;
425                   nt->outputLowLimit = 0 ;
426                   nt->outputHighLimit = 0 ;
427                   nt->backlogFactor = 0.0 ;
428                 }
429             }
430           else
431             nt->outputHighLimit = (long)(nt->outputLowLimit * nt->backlogFactor);
432         }
433       else
434         warn ("ME config: no definition for required key backlog-limit") ;
435     }
436   
437   d_printf (1, "%s spooling: %s\n", nt->peerName, 
438            nt->noBacklog ? "disabled" : "enabled");
439
440   d_printf (1,"%s tape backlog limit: [%ld %ld]\n",nt->peerName,
441            nt->outputLowLimit,
442            nt->outputHighLimit) ;
443   
444   prepareFiles (nt) ;
445 }
446
447
448 void gFlushTapes (void)
449 {
450   unsigned int i ;
451
452   notice ("ME flushing tapes") ;
453   for (i = 0 ; i < activeTapeIdx ; i++)
454     tapeFlush (activeTapes [i]) ;
455 }
456
457
458
459 /* close the input and output tapes and reinitialize everything in the
460   tape. */
461 void tapeFlush (Tape tape)
462 {
463   if (tape->inFp != NULL)
464     {
465       checkpointTape (tape) ;
466       fclose (tape->inFp) ;
467     }
468
469   if (tape->outFp != NULL)
470     fclose (tape->outFp) ;
471
472   initTape (tape) ;
473 }
474
475
476
477 void gPrintTapeInfo (FILE *fp, unsigned int indentAmt)
478 {
479   char indent [INDENT_BUFFER_SIZE] ;
480   unsigned int i ;
481   
482   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
483     indent [i] = ' ' ;
484   indent [i] = '\0' ;
485
486   fprintf (fp,"%sGlobal Tape List : (count %lu) {\n",
487            indent,(unsigned long) activeTapeIdx) ;
488
489   for (i = 0 ; i < activeTapeIdx ; i++)
490     printTapeInfo (activeTapes [i],fp,indentAmt + INDENT_INCR) ;
491   fprintf (fp,"%s}\n",indent) ;
492 }
493
494
495 void tapeLogGlobalStatus (FILE *fp)
496 {
497   fprintf (fp,"%sBacklog file global values:%s\n",
498            genHtml ? "<B>" : "", genHtml ? "</B>" : "") ;
499   fprintf (fp,"        directory: %s\n",tapeDirectory) ;
500   fprintf (fp,"    rotate period: %-3ld seconds\n",(long) rotatePeriod) ;
501   fprintf (fp,"checkpoint period: %-3ld seconds\n",(long) tapeCkPtPeriod) ;
502   fprintf (fp,"   newfile period: %-3ld seconds\n",(long) tapeCkNewFilePeriod);
503   fprintf (fp,"\n") ;
504 }
505
506
507 void tapeLogStatus (Tape tape, FILE *fp)
508 {
509   if (tape == NULL)
510     fprintf (fp,"(no tape)\n") ;
511   else if (tape->noBacklog)
512       fprintf (fp,"                 spooling: DISABLED\n");
513   else if (tape->outputLowLimit == 0)
514       fprintf (fp,"                 spooling: UNLIMITED\n");
515   else 
516     {
517       fprintf (fp,"                 backlog low limit: %ld\n",
518                tape->outputLowLimit) ;
519       fprintf (fp,"               backlog upper limit: %ld",
520                tape->outputHighLimit) ;
521       if (tape->backlogFactor > 0.0)
522         fprintf (fp," (factor %1.2f)",tape->backlogFactor) ;
523       fputc ('\n',fp) ;
524       fprintf (fp,"                 backlog shrinkage: ") ;
525       fprintf (fp,"%ld bytes (from current file)\n",tape->lossage) ;
526     }
527 }
528
529 void printTapeInfo (Tape tape, FILE *fp, unsigned int indentAmt)
530 {
531   char indent [INDENT_BUFFER_SIZE] ;
532   unsigned int i ;
533 #if 0
534   QueueElem qe ;
535 #endif
536   
537   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
538     indent [i] = ' ' ;
539   indent [i] = '\0' ;
540
541   fprintf (fp,"%sTape : %p {\n",indent,(void *) tape) ;
542
543   if (tape == NULL)
544     {
545       fprintf (fp,"%s}\n",indent) ;
546       return ;
547     }
548   
549   fprintf (fp,"%s    master-file : %s\n", indent, tape->handFilename) ;
550   fprintf (fp,"%s    input-file : %s\n", indent, tape->inputFilename) ;
551   fprintf (fp,"%s    output-file : %s\n",indent, tape->outputFilename) ;
552   fprintf (fp,"%s    lock-file : %s\n",indent, tape->lockFilename) ;
553   fprintf (fp,"%s    peerName : %s\n",indent,tape->peerName) ;
554   fprintf (fp,"%s    input-FILE : %p\n",indent, (void *) tape->inFp) ;
555   fprintf (fp,"%s    output-FILE : %p\n",indent, (void *) tape->outFp) ;
556   fprintf (fp,"%s    output-limit : %ld\n",indent, tape->outputLowLimit) ;
557
558 #if 0
559   fprintf (fp,"%s    in-memory article queue (length  %d) {\n",indent,
560            tape->qLength) ;
561
562   for (qe = tape->head ; qe != NULL ; qe = qe->next)
563     {
564 #if 0
565       printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
566 #else
567       fprintf (fp,"%s    %p\n",indent,qe->article) ;
568 #endif
569     }
570
571   fprintf (fp,"%s    }\n",indent) ;
572 #endif
573
574   fprintf (fp,"%s    tell-position : %ld\n",indent,(long) tape->tellpos) ;
575   fprintf (fp,"%s    input-FILE-changed : %s\n",indent,
576            boolToString (tape->changed)) ;
577
578   fprintf (fp,"%s    no-rotate : %s\n",indent, boolToString (tape->noRotate));
579   
580   fprintf (fp,"%s}\n",indent) ;
581 }
582
583
584
585
586 /* delete the tape. Spools the in-memory articles to disk. */
587 void delTape (Tape tape)
588 {
589   struct stat st ;
590   
591   if (tape == NULL)
592     return ;
593
594 #if 1
595   
596   if (tape->outFp != NULL && fclose (tape->outFp) != 0)
597     syswarn ("ME ioerr fclose %s", tape->outputFilename) ;
598
599   if (stat(tape->outputFilename, &st) == 0 && st.st_size == 0)
600     {
601       d_printf (1,"removing empty output tape: %s\n",tape->outputFilename) ;
602       unlink (tape->outputFilename) ;
603     }
604   
605   tape->outFp = NULL ;
606   tape->outputSize = 0 ;
607
608 #else
609   
610   tapeClose (tape) ;
611
612 #endif
613
614   if (tape->inFp != NULL)
615     {
616       checkpointTape (tape) ;
617       fclose (tape->inFp) ;
618     }
619
620   unlockFile (tape->lockFilename) ;
621
622   freeCharP (tape->handFilename) ;
623   freeCharP (tape->inputFilename) ;
624   freeCharP (tape->outputFilename) ;
625   freeCharP (tape->lockFilename) ;
626   freeCharP (tape->peerName) ;
627              
628   removeTapeGlobally (tape) ;
629   
630   free (tape) ;
631 }
632
633
634 void tapeTakeArticle (Tape tape, Article article)
635 {
636 #if 0
637   QueueElem elem ;
638 #endif
639   int amt ;
640   const char *fname, *msgid ;
641
642   ASSERT (tape != NULL) ;
643
644   /* return immediately if spooling disabled - jgarzik */
645   if (tape->noBacklog)
646     {
647       delArticle (article) ;
648       return;
649     }
650
651   fname = artFileName (article) ;
652   msgid = artMsgId (article) ;
653   amt = fprintf (tape->outFp,"%s %s\n", fname, msgid) ;
654
655   /* I'd rather know where I am each time, and I don't trust all
656     fprintf's to give me character counts. */
657 #if defined (TRUST_FPRINTF)
658
659   tape->outputSize += amt ;
660
661 #else
662 #if defined (NO_TRUST_STRLEN)
663
664   tape->outputSize = ftello (tape->outFp) ;
665
666 #else
667
668   tape->outputSize += strlen(fname) + strlen(msgid) + 2 ; /* " " + "\n" */
669
670 #endif
671 #endif
672   
673   delArticle (article) ;
674
675   if (debugShrinking)
676     {
677       struct stat sb ;
678
679       fflush (tape->outFp) ;
680       
681       if (fstat (fileno (tape->outFp),&sb) != 0)
682         syswarn ("ME oserr fstat %s",tape->outputFilename) ;
683       else if (sb.st_size != tape->outputSize) 
684         syslog (LOG_ERR,"fstat and ftello do not agree: %ld %ld for %s\n",
685                 (long)sb.st_size,tape->outputSize,tape->outputFilename) ;
686     }
687   
688   if (tape->outputHighLimit > 0 && tape->outputSize >= tape->outputHighLimit)
689     {
690       long oldSize = tape->outputSize ;
691       shrinkfile (tape->outFp,tape->outputLowLimit,tape->outputFilename,"a+");
692       tape->outputSize = ftello (tape->outFp) ;
693       tape->lossage += oldSize - tape->outputSize ;
694     }
695 }
696
697
698 /* Pick an article off a tape and return it. NULL is returned if there
699    are no more articles. */
700 Article getArticle (Tape tape)
701 {
702   char line [2048] ;            /* ick. 1024 for filename + 1024 for msgid */
703   char *p, *q ;
704   char *msgid, *filename ;
705   Article art = NULL ;
706   time_t now = theTime() ;
707
708   ASSERT (tape != NULL) ;
709
710   if (tape->inFp == NULL && (now - tape->lastRotated) > rotatePeriod)
711     prepareFiles (tape) ;       /* will flush queue too. */
712
713   while (tape->inFp != NULL && art == NULL)
714     {
715       tape->changed = true ;
716       
717       if (fgets (line,sizeof (line), tape->inFp) == NULL)
718         {
719           if (ferror (tape->inFp))
720             syswarn ("ME ioerr on tape file %s", tape->inputFilename) ;
721           else if ( !feof (tape->inFp) )
722             syswarn ("ME oserr fgets %s", tape->inputFilename) ;
723           
724           if (fclose (tape->inFp) != 0)
725             syswarn ("ME ioerr fclose %s", tape->inputFilename) ;
726
727           d_printf (1,"No more articles on tape %s\n",tape->inputFilename) ;
728
729           tape->inFp = NULL ;
730           tape->scribbled = false ;
731
732           unlink (tape->inputFilename) ;
733
734           if ((now - tape->lastRotated) > rotatePeriod)
735             prepareFiles (tape) ; /* rotate files to try next. */
736         }
737       else
738         {
739           msgid = filename = NULL ;
740           
741           for (p = line ; *p && CTYPE(isspace, *p) ; p++) /* eat whitespace */
742             /* nada */ ;
743
744           if (*p != '\0')
745             {
746               q = strchr (p,' ') ;
747
748               if (q != NULL)
749                 {
750                   filename = p ;
751                   *q = '\0' ;
752       
753                   for (q++ ; *q && CTYPE(isspace, *q) ; q++)
754                     /* nada */ ;
755
756                   if (*q != '\0')
757                     {
758                       if (((p = strchr (q, ' ')) != NULL) ||
759                           ((p = strchr (q, '\n')) != NULL))
760                         *p = '\0' ;
761
762                       if (p != NULL)
763                         msgid = q ;
764                       else
765                         filename = NULL ; /* no trailing newline or blank */
766                     }
767                   else
768                     filename = NULL ; /* line had one field and some blanks */
769                 }
770               else
771                 filename = NULL ; /* line only had one field */
772             }
773
774           /* See if message ID looks valid. */
775           if (msgid) {
776             for (p = msgid; *p; p++)
777               ;
778             if (p > msgid) p--;
779             if (*msgid != '<' || *p != '>') {
780               warn ("ME tape invalid messageID in %s: %s",
781                     tape->inputFilename,msgid);
782               msgid = NULL;
783             }
784           }
785
786           if (filename != NULL && msgid != NULL)
787             art = newArticle (filename, msgid) ;
788
789           /* art may be NULL here if the file is no longer valid. */
790         }
791     }
792   
793 #if 0
794   /* now we either have an article or there is no more on disk */
795   if (art == NULL)
796     {
797       if (tape->inFp != NULL && ((c = fgetc (tape->inFp)) != EOF))
798         ungetc (c,tape->inFp) ; /* shouldn't happen */
799       else if (tape->inFp != NULL)
800         {
801           /* last article read was the end of the tape. */
802           if (fclose (tape->inFp) != 0)
803             syswarn ("ME ioerr fclose %s", tape->inputFilename) ;
804           
805           tape->inFp = NULL ;
806           tape->scribbled = false ;
807           
808           /* toss out the old input file and prepare the new one */
809           unlink (tape->inputFilename) ;
810           
811           if (now - tape->lastRotated > rotatePeriod)
812             prepareFiles (tape) ;
813         }
814     }
815 #endif
816   
817   if (art == NULL)
818     d_printf (2,"%s All out of articles in the backlog\n", tape->peerName) ;
819   else
820     d_printf (2,"%s Peeled article %s from backlog\n",tape->peerName,
821              artMsgId (art)) ;
822       
823   return art ;
824 }
825
826
827 /****************************************************/
828 /**                  CLASS FUNCTIONS               **/
829 /****************************************************/
830
831 /* Cause all the Tapes to checkpoint themselves. */
832 void checkPointTapes (void)
833 {
834   unsigned int i ;
835
836   for (i = 0 ; i < activeTapeIdx ; i++)
837     checkpointTape (activeTapes [i]) ;
838 }
839
840
841 /* make all the tapes set their checkNew flag. */
842 static void tapesSetCheckNew (void) 
843 {
844   unsigned int i ;
845
846   for (i = 0 ; i < activeTapeIdx ; i++)
847     activeTapes[i]->checkNew = true ;
848 }
849
850
851 #if 0
852 /* Set the pathname of the directory for storing tapes in. */
853 void setTapeDirectory (const char *newDir)
854 {
855   /* the activeTape variable gets set when the first Tape object
856      is created */
857   if (activeTapes != NULL)
858     {
859       syslog (LOG_CRIT,"Resetting backlog directory") ;
860       abort() ;
861     }
862   
863   if (tapeDirectory != NULL)
864     freeCharP (tapeDirectory) ;
865   
866   tapeDirectory = xstrdup (newDir) ;
867
868   addPointerFreedOnExit (tapeDirectory) ;
869 }
870 #endif
871
872 /* Get the pathname of the directory tapes are stored in. */
873 const char *getTapeDirectory (void)
874 {
875   ASSERT (tapeDirectory != NULL) ;
876
877 #if 0
878   if (tapeDirectory == NULL)
879     {
880       tapeDirectory = xstrdup (dflTapeDir) ;
881       addPointerFreedOnExit (tapeDirectory) ;
882     }
883 #endif
884   
885   return tapeDirectory ;
886 }
887
888
889 #if 0
890 void setOutputSizeLimit (long val)
891 {
892   defaultSizeLimit = val ;
893 }
894 #endif
895
896
897
898
899
900
901 /**********************************************************************/
902 /*                          PRIVATE FUNCTIONS                         */
903 /**********************************************************************/
904
905
906 /* Add a new tape to the class-level list of active tapes.  */
907 static void addTapeGlobally (Tape tape)
908 {
909   ASSERT (tape != NULL) ;
910   
911   if (activeTapeSize == activeTapeIdx)
912     {
913       unsigned int i ;
914       
915       activeTapeSize += 10 ;
916       if (activeTapes != NULL)
917         activeTapes = xrealloc (activeTapes, sizeof(Tape) * activeTapeSize) ;
918       else
919         activeTapes = xmalloc (sizeof(Tape) * activeTapeSize) ;
920
921       for (i = activeTapeIdx ; i < activeTapeSize ; i++)
922         activeTapes [i] = NULL ;
923     }
924   activeTapes [activeTapeIdx++] = tape ;
925 }
926
927
928 /* Remove a tape for the class-level list of active tapes. */
929 static void removeTapeGlobally (Tape tape)
930 {
931   unsigned int i ;
932
933   if (tape == NULL)
934     return ;
935   
936   ASSERT (activeTapeIdx > 0) ;
937   
938   for (i = 0 ; i < activeTapeIdx ; i++)
939     if (activeTapes [i] == tape)
940       break ;
941
942   ASSERT (i < activeTapeIdx) ;
943
944   for ( ; i < (activeTapeIdx - 1) ; i++)
945     activeTapes [i] = activeTapes [i + 1] ;
946
947   activeTapes [--activeTapeIdx] = NULL ;
948
949   if (activeTapeIdx == 0)
950     {
951       free (activeTapes) ;
952       activeTapes = NULL ;
953     }
954 }
955
956
957 /* Have a tape checkpoint itself so that next process can pick up where
958    this one left off. */
959 static void checkpointTape (Tape tape)
960 {
961   if (tape->inFp == NULL)       /* no input file being read. */
962     return ;
963
964   if (!tape->changed)   /* haven't read since last checkpoint */
965     {
966       d_printf (1,"Not checkpointing unchanged tape: %s\n", tape->peerName) ;
967       return ;
968     }
969   
970   if ((tape->tellpos = ftello (tape->inFp)) < 0)
971     {
972       syswarn ("ME oserr ftello %s", tape->inputFilename) ;
973       return ;
974     }
975
976   /* strlen of "18446744073709551616\n" (2^64) */
977 #define BITS64 21
978
979   /* make sure we're not right at the beginning of the file so we can write. */
980   if (tape->tellpos > BITS64)
981     {
982       rewind (tape->inFp) ;
983
984       /* scribble blanks over the first lines characters */
985       if (!tape->scribbled) 
986         {
987           int currloc = 0 ;
988           int c ;
989
990           while ((c = fgetc (tape->inFp)) != '\n' || currloc <= BITS64)
991             if (c == EOF)
992               return ;
993             else
994               currloc++ ;
995
996           rewind (tape->inFp) ;
997           
998           while (currloc-- > 0)
999             fputc (' ',tape->inFp) ;
1000
1001           rewind (tape->inFp) ;
1002
1003           fflush (tape->inFp) ;
1004           tape->scribbled = true ;
1005         }
1006       
1007       fprintf (tape->inFp,"%ld",tape->tellpos) ;
1008
1009       if (fseeko (tape->inFp,tape->tellpos,SEEK_SET) != 0)
1010         syswarn ("ME oserr fseeko(%s,%ld,SEEK_SET)",tape->inputFilename,
1011                  tape->tellpos) ;
1012     }
1013   
1014   tape->changed = false ;
1015 }
1016
1017
1018
1019 /* Prepare the tape file(s) for input and output */
1020
1021 /* For a given Tape there are
1022  * three possible files: PEER.input PEER and
1023  * PEER.output. PEER.input and PEER.output are private to
1024  * innfeed. PEER is where a sysadmin can drop a file that (s)he
1025  * wants to inject into the process. If the first line of the input file
1026  * contains only an integer (possibly surrounded by spaces), then this is
1027  * taken to be the position to seek to before reading.
1028  *
1029  * prepareFiles will process them in a manner much like the following shell
1030  * commands:
1031  *
1032  * if [ ! -f PEER.input ]; then
1033  *      if [ -f PEER ]; then mv PEER PEER.input
1034  *      elif [ -f PEER.output ]; then mv PEER.output PEER; fi
1035  * fi
1036  *
1037  * At this point PEER.input is opened for reading if it exists.
1038  *
1039  * The checkpoint file is left as-is unless the PEER.input file
1040  * happens to be newer that the checkpoint file.
1041  */
1042 static void prepareFiles (Tape tape)
1043 {
1044   bool inpExists ;
1045   bool outExists ;
1046   bool newExists ;
1047
1048 #if 0
1049   /* flush any in memory articles to disk */
1050   if (tape->head != NULL && tape->outFp != NULL)
1051     flushTape(tape) ;
1052 #endif
1053
1054   tape->tellpos = 0 ;
1055
1056   /*  First time through, or something external has set checkNew */
1057   if (tape->lastRotated == 0 || tape->checkNew)
1058     {
1059       newExists = fileExistsP (tape->handFilename) ;
1060       if (newExists)
1061         notice ("%s new hand-prepared backlog file", tape->peerName) ;
1062     }
1063   else
1064     newExists = false ;
1065   
1066   if (tape->lastRotated == 0)    /* first time here */
1067     {
1068       inpExists = fileExistsP (tape->inputFilename) ;
1069       outExists = fileExistsP (tape->outputFilename) ;
1070     }
1071   else
1072     {
1073       inpExists = (tape->inFp != NULL) ? true : false ; /* can this ever be true?? */
1074       outExists = (tape->outFp != NULL && tape->outputSize > 0) ? true : false ;
1075     }
1076   
1077
1078   /* move the hand-dropped file to the input file if needed. */
1079   if (newExists && !inpExists)
1080     {
1081       if (rename (tape->handFilename,tape->inputFilename) != 0)
1082         syswarn ("ME oserr rename %s, %s", tape->handFilename,
1083                  tape->inputFilename);
1084       else
1085         {
1086           notice ("%s grabbing external tape file", tape->peerName) ;
1087           inpExists = true ;
1088         }
1089     }
1090   
1091   /* now move the output file to the input file, if needed and only if in
1092      not in NOROTATE mode. */
1093   if (outExists && !inpExists && !tape->noRotate)
1094     {
1095       if (tape->outFp != NULL)
1096         {
1097           fclose (tape->outFp) ;
1098           tape->outFp = NULL ;
1099         }
1100       
1101       if (rename (tape->outputFilename,tape->inputFilename) != 0)
1102         syswarn ("ME oserr rename %s, %s", tape->outputFilename,
1103                  tape->inputFilename) ;
1104       else
1105         inpExists = true ;
1106
1107       outExists = false ;
1108     }
1109
1110   /* now open up the input file and seek to the proper position. */
1111   if (inpExists)
1112     {
1113       int c ;
1114       long flength ;
1115       
1116       if ((tape->inFp = fopen (tape->inputFilename,"r+")) == NULL)
1117         syswarn ("ME fopen %s", tape->inputFilename) ;
1118       else
1119         {
1120           char buffer [64] ;
1121
1122           if (fgets (buffer,sizeof (buffer) - 1, tape->inFp) == NULL)
1123             {
1124               if (feof (tape->inFp))
1125                 {
1126                   d_printf (1,"Empty input file: %s\n",tape->inputFilename) ;
1127                   unlink (tape->inputFilename) ;
1128                 }
1129               else
1130                 syswarn ("ME oserr fgets %s", tape->inputFilename) ;
1131               
1132               fclose (tape->inFp) ;
1133               tape->inFp = NULL ;
1134               tape->scribbled = false ;
1135             }
1136           else
1137             {
1138               unsigned int len = strlen (buffer) ;
1139               long newPos = 0 ;
1140
1141               if (len > 0 && buffer [len - 1] == '\n')
1142                 buffer [--len] = '\0' ;
1143
1144               if (len > 0 && strspn (buffer,"0123456789 \n") == len)
1145                 {
1146                   if (sscanf (buffer,"%ld",&newPos) == 1)
1147                     {
1148                       tape->scribbled = true ;
1149                       tape->tellpos = newPos ;
1150                     }
1151                 }
1152
1153               if ((flength = fileLength (fileno (tape->inFp))) < tape->tellpos)
1154                 {
1155                   warn ("ME tape short: %s %ld %ld", tape->inputFilename,
1156                         flength, tape->tellpos) ;
1157                   tape->tellpos = 0 ;
1158                 }
1159               else if (tape->tellpos == 0)
1160                 rewind (tape->inFp) ;
1161               else if (fseeko (tape->inFp,tape->tellpos - 1,SEEK_SET) != 0)
1162                 syswarn ("ME oserr fseeko(%s,%ld,SEEK_SET)",
1163                          tape->inputFilename,tape->tellpos) ;
1164               else if ((c = fgetc (tape->inFp)) != '\n')
1165                 {
1166                   while (c != EOF && c != '\n')
1167                     c = fgetc (tape->inFp) ;
1168                   
1169                   if (c == EOF)
1170                     {
1171                       fclose (tape->inFp) ;
1172                       unlink (tape->inputFilename) ;
1173                       tape->inFp = NULL ;
1174                       tape->scribbled = false ;
1175                       prepareFiles (tape) ;
1176                     }
1177                   else
1178                     {
1179                       long oldPos = tape->tellpos ;
1180                       
1181                       tape->changed = true ;
1182                       checkpointTape (tape) ;
1183
1184                       warn ("ME internal checkpoint line boundary missed:"
1185                             " %s %ld vs. %ld",tape->inputFilename,
1186                               tape->tellpos, oldPos) ;
1187                     }
1188                 }
1189             }
1190         }
1191     }
1192   
1193   tape->lastRotated = theTime() ;
1194   tape->checkNew = false ;
1195
1196   /* now open up the output file. */
1197   if (tape->outFp == NULL)
1198     {
1199       if ((tape->outFp = fopen (tape->outputFilename,"a+")) == NULL)
1200         {
1201           syswarn ("ME tape open failed (a+) %s", tape->outputFilename) ;
1202           return ;
1203         }
1204       fseeko (tape->outFp,0,SEEK_END) ;
1205       tape->outputSize = ftello (tape->outFp) ;
1206       tape->lossage = 0 ;
1207     }
1208 }
1209
1210
1211 static void tapeCkNewFileCbk (TimeoutId id, void *d UNUSED)
1212 {
1213   ASSERT (id == ckNewFileId) ;
1214   ASSERT (tapeCkNewFilePeriod > 0) ;
1215
1216   tapesSetCheckNew () ;
1217
1218   ckNewFileId = prepareSleep (tapeCkNewFileCbk,tapeCkNewFilePeriod,NULL) ;
1219 }
1220
1221
1222 /* The timer callback function that will checkpoint all the active tapes. */
1223 static void tapeCheckpointCallback (TimeoutId id, void *d UNUSED)
1224 {
1225   ASSERT (id == checkPtId) ;
1226   ASSERT (tapeCkPtPeriod > 0) ;
1227   
1228   d_printf (1,"Checkpointing tapes\n") ;
1229   
1230   checkPointTapes () ;
1231
1232   checkPtId = prepareSleep (tapeCheckpointCallback,tapeCkPtPeriod,NULL) ;
1233 }
1234
1235
1236
1237 #if 0
1238 static void flushTape (Tape tape)
1239 {
1240   QueueElem elem ;
1241
1242   /* flush out queue to disk. */
1243   elem = tape->head ;
1244   while (elem != NULL)
1245     {
1246       tape->head = tape->head->next ;
1247       fprintf (tape->outFp,"%s %s\n", artFileName (elem->article),
1248                artMsgId (elem->article)) ;
1249       
1250       delArticle (elem->article) ;
1251       
1252       free (elem) ;
1253       elem = tape->head ;
1254     }
1255   tape->tail = NULL ;
1256   tape->qLength = 0;
1257       
1258
1259   /* I'd rather know where I am each time, and I don't trust all
1260      fprintf's to give me character counts. */
1261   tape->outputSize = ftello (tape->outFp) ;
1262   if (debugShrinking)
1263     {
1264       struct stat buf ;
1265       static bool logged = false ;
1266       
1267       fflush (tape->outFp) ;
1268       if (fstat (fileno (tape->outFp),&buf) != 0 && !logged)
1269         {
1270           syslog (LOG_ERR,FSTAT_FAILURE,tape->outputFilename) ;
1271           logged = true ;
1272         }
1273       else if (buf.st_size != tape->outputSize)
1274         {
1275           warn ("ME fstat and ftello do not agree for %s",
1276                 tape->outputFilename) ;
1277           logged = true ;
1278         }
1279     }
1280   
1281   if (tape->outputHighLimit > 0 && tape->outputSize > tape->outputHighLimit)
1282     {
1283       shrinkfile (tape->outFp,tape->outputLowLimit,tape->outputFilename,"a+");
1284       tape->outputSize = ftello (tape->outFp) ;
1285     }
1286 }
1287 #endif
1288
1289
1290 static void tapeCleanup (void)
1291 {
1292   free (tapeDirectory) ;
1293   tapeDirectory = NULL ;
1294 }