1 /* $Id: tape.c 6716 2004-05-16 20:26:56Z rra $
3 ** The implementation of the innfeed Tape class.
5 ** Written by James Brister <brister@vix.com>
7 ** The implementation of the Tape class. Tapes are read-only or write-only
8 ** files that are accessed sequentially. Their basic unit of i/o is an
9 ** Article. Tapes work out of a single directory and manage all file names
12 ** Tapes will checkpoint themselves periodically so that when innfeed exits
13 ** or crashes things can restart close to where they were last. The period
14 ** checkpointing is handled entirely by the Tape class, but the checkpoint
15 ** period needs to be set by some external user before the first tape is
26 #include <sys/param.h>
30 #if defined (HAVE_DIRENT_H)
32 typedef struct dirent DIRENTRY ;
35 typedef struct direct DIRENTRY ;
38 #ifdef TIME_WITH_SYS_TIME
39 # include <sys/time.h>
42 # ifdef HAVE_SYS_TIME_H
43 # include <sys/time.h>
49 #include "inn/innconf.h"
50 #include "inn/messages.h"
54 #include "configfile.h"
58 extern char *dflTapeDir;
62 /* a structure for temporary storage of articles. */
70 /* The Tape class type. */
73 /* the pathname of the file the administrator can drop in by hand. */
76 /* the pathname of the file the Tape will read from */
79 /* the pathname of the file the Tape will write to. */
80 char *outputFilename ;
82 /* the pathname of the file used in locking */
85 /* the peer we're doing this for. */
88 FILE *inFp ; /* input FILE */
89 FILE *outFp ; /* output FILE */
91 time_t lastRotated ; /* time files last got switched */
92 bool checkNew ; /* set bool when we need to check for
96 /* the tape holds a small output queue in memory to avoid thrashing. */
99 unsigned int qLength ; /* amount on the queue */
102 long outputSize ; /* the current size of the output file. */
105 /* the number of bytes we try to keep the output under. We actually
106 wait for the outputSize to get 10% greater than this amount before
107 shrinking the file down again. A value of zero means no limit. */
108 long outputLowLimit ;
109 long outputHighLimit ;
110 double backlogFactor ;
112 bool scribbled ; /* have we scribbled a checkpoint value in
114 long tellpos ; /* for input file checkpointing. */
115 bool changed ; /* true if tape was read since last
116 checkpoint or start. */
118 /* true if articles that are output are NOT later input. */
121 /* true if no articles should ever be spooled */
126 static void checkpointTape (Tape tape) ;
127 static void removeTapeGlobally (Tape tape) ;
128 static void addTapeGlobally (Tape tape) ;
129 static void prepareFiles (Tape tape) ;
130 static void tapeCkNewFileCbk (TimeoutId id, void *d) ;
131 static void tapeCheckpointCallback (TimeoutId id, void *d) ;
133 static void flushTape (Tape tape) ;
135 static void tapesSetCheckNew (void) ;
136 static void initTape (Tape nt) ;
137 static void tapeCleanup (void) ;
141 /* pathname of directory we store tape files in. */
142 static char *tapeDirectory ;
144 /* the callback ID of the checkpoint timer callback. */
145 static TimeoutId checkPtId ;
146 static TimeoutId ckNewFileId ;
148 /* number of seconds between tape checkpoints. */
149 static unsigned int tapeCkPtPeriod ;
150 static unsigned int tapeCkNewFilePeriod ;
152 static time_t rotatePeriod = TAPE_ROTATE_PERIOD ;
154 /* global list of tapes so we can checkpoint them periodically */
155 static Tape *activeTapes ;
157 /* Size of the activeTapes array */
158 static size_t activeTapeSize ;
160 /* index of last element in activeTapes that's being used. */
161 static size_t activeTapeIdx ;
164 /* default limit of the size of output tapes. */
165 static long defaultSizeLimit ;
168 unsigned int tapeHighwater ;
170 bool debugShrinking = false ;
172 extern bool talkToSelf ; /* main.c */
177 /* callback when config file is loaded */
178 int tapeConfigLoadCbk (void *data)
183 FILE *fp = (FILE *) data ;
186 if (getString (topScope,"backlog-directory",&p,NO_INHERIT))
188 dir = buildFilename(innconf->pathspool, p);
190 if (tapeDirectory != NULL && strcmp (tapeDirectory,dir) != 0)
192 warn ("ME config: cannot change backlog-directory of a running"
195 dir = xstrdup (tapeDirectory) ;
198 if (!isDirectory (dir) && isDirectory (dflTapeDir))
200 logOrPrint (LOG_ERR,fp,
201 "ME config: definition of backlog-directory (%s) is a"
202 " non-existant directory. Using %s",
205 dir = xstrdup (dflTapeDir) ;
207 else if (!isDirectory (dir))
208 logAndExit (1,"ME config: no usable value for backlog-directory") ;
210 else if (!isDirectory (dflTapeDir))
212 logAndExit (1,"ME config: no usable value for backlog-directory") ;
213 return -1; /* not reached */
216 dir = xstrdup (dflTapeDir) ;
218 if (tapeDirectory != NULL)
219 free (tapeDirectory) ;
220 tapeDirectory = dir ;
224 if (getInteger (topScope,"backlog-highwater",&iv,NO_INHERIT))
229 logOrPrint (LOG_ERR,fp,
230 "ME config: value of %s (%ld) in %s cannot be less"
231 " than 0. Using %ld","backlog-highwater",
232 iv,"global scope",(long)TAPE_HIGHWATER);
233 iv = TAPE_HIGHWATER ;
237 iv = TAPE_HIGHWATER ;
238 tapeHighwater = (unsigned int) iv ;
242 if (getInteger (topScope,"backlog-rotate-period",&iv,NO_INHERIT))
247 logOrPrint (LOG_ERR,fp,
248 "ME config: value of %s (%ld) in %s cannot be less"
249 " than 0. Using %ld",
250 "backlog-rotate-period",
251 iv,"global scope",(long)TAPE_ROTATE_PERIOD);
252 iv = TAPE_ROTATE_PERIOD ;
256 iv = TAPE_ROTATE_PERIOD ;
257 rotatePeriod = (unsigned int) iv ;
260 if (getInteger (topScope,"backlog-ckpt-period",&iv,NO_INHERIT))
265 logOrPrint (LOG_ERR,fp,
266 "ME config: value of %s (%ld) in %s cannot be less"
267 " than 0. Using %ld",
268 "backlog-ckpt-period",iv,
269 "global scope",(long)TAPE_CHECKPOINT_PERIOD);
270 iv = TAPE_CHECKPOINT_PERIOD ;
274 iv = TAPE_CHECKPOINT_PERIOD ;
275 tapeCkPtPeriod = (unsigned int) iv ;
278 if (getInteger (topScope,"backlog-newfile-period",&iv,NO_INHERIT))
283 logOrPrint (LOG_ERR,fp,
284 "ME config: value of %s (%ld) in %s cannot be less"
285 " than 0. Using %ld",
286 "backlog-newfile-period",
287 iv,"global scope",(long)TAPE_NEWFILE_PERIOD);
288 iv = TAPE_NEWFILE_PERIOD ;
292 iv = TAPE_NEWFILE_PERIOD ;
293 tapeCkNewFilePeriod = (unsigned int) iv ;
296 if (getBool (topScope,"debug-shrinking",&bv,NO_INHERIT))
297 debugShrinking = (bv ? true : false) ;
303 /* Create a new Tape object. There are three potential files involved in
304 I/O. 'peerName' is what the admin may have dropped in by
305 hand. 'peerName.input' is the file that was being used as input the last
306 time things were run. 'peerName.output' is the file that was being used
307 as output. The file 'peerName' is appended to 'peerName.input' (or
308 renamed if 'peerName.input' doesn't exist). Then 'peerName.output' is
309 appeneded (or renamed) to 'peerName.input' */
311 static bool inited = false ;
312 Tape newTape (const char *peerName, bool dontRotate)
314 Tape nt = xmalloc (sizeof(struct tape_s)) ;
315 size_t pLen = strlen (peerName) ;
316 size_t dLen = strlen (tapeDirectory) ;
321 atexit (tapeCleanup) ;
324 ASSERT (nt != NULL) ;
326 if (endsIn (peerName,INPUT_TAIL))
327 die ("Sorry, can't have a peer name ending in \"%s\"",INPUT_TAIL) ;
329 if (endsIn (peerName,OUTPUT_TAIL))
330 die ("Sorry, can't have a peer name ending in \"%s\"",OUTPUT_TAIL) ;
332 if (endsIn (peerName,LOCK_TAIL))
333 die ("Sorry, can't have a peer name ending in \"%s\"",LOCK_TAIL) ;
335 nt->peerName = xstrdup (peerName) ;
337 nt->handFilename = xmalloc (pLen + dLen + 2) ;
338 sprintf (nt->handFilename,"%s/%s",tapeDirectory,peerName) ;
340 nt->lockFilename = xmalloc (pLen + dLen + strlen(LOCK_TAIL) + 2) ;
341 sprintf (nt->lockFilename,"%s/%s%s",tapeDirectory,peerName,LOCK_TAIL) ;
343 nt->inputFilename = xmalloc (pLen + dLen + strlen(INPUT_TAIL) + 2) ;
344 sprintf (nt->inputFilename,"%s/%s%s",tapeDirectory,peerName,INPUT_TAIL) ;
346 nt->outputFilename = xmalloc (pLen + dLen + strlen(OUTPUT_TAIL) + 2) ;
347 sprintf (nt->outputFilename,"%s/%s%s",tapeDirectory,peerName,OUTPUT_TAIL) ;
349 if ( !lockFile (nt->lockFilename) )
351 warn ("ME lock failed for host: %s", nt->lockFilename) ;
353 free (nt->handFilename) ;
354 free (nt->lockFilename) ;
355 free (nt->inputFilename) ;
356 free (nt->outputFilename) ;
362 nt->noRotate = false ; /* for first time prepare */
364 nt->noRotate = dontRotate ;
366 addTapeGlobally (nt) ;
368 if (checkPtId == 0 && tapeCkPtPeriod > 0) /* only done once. */
369 checkPtId = prepareSleep (tapeCheckpointCallback,tapeCkPtPeriod,NULL);
371 if (ckNewFileId == 0 && tapeCkNewFilePeriod > 0)
372 ckNewFileId = prepareSleep (tapeCkNewFileCbk,tapeCkNewFilePeriod,NULL) ;
377 static void initTape (Tape nt)
379 value *peerVal = findPeer (nt->peerName) ;
380 scope *s = (peerVal == NULL ? NULL : peerVal->v.scope_val) ;
385 nt->lastRotated = 0 ;
386 nt->checkNew = false ;
394 nt->scribbled = false ;
397 nt->changed = false ;
402 nt->noBacklog = false ;
403 nt->backlogFactor = 0.0 ;
404 nt->outputLowLimit = 0 ;
405 nt->outputHighLimit = 0 ;
411 if (getBool (s, "no-backlog", &bval, INHERIT))
412 nt->noBacklog = (bval ? true : false);
414 nt->noBacklog = TAPE_DISABLE;
416 if (getInteger (s,"backlog-limit",&nt->outputLowLimit,INHERIT))
418 if (!getReal (s,"backlog-factor",&nt->backlogFactor,INHERIT))
420 if (!getInteger (s,"backlog-limit-highwater",
421 &nt->outputHighLimit,INHERIT))
423 warn ("%s no backlog-factor or backlog-high-limit",
425 nt->outputLowLimit = 0 ;
426 nt->outputHighLimit = 0 ;
427 nt->backlogFactor = 0.0 ;
431 nt->outputHighLimit = (long)(nt->outputLowLimit * nt->backlogFactor);
434 warn ("ME config: no definition for required key backlog-limit") ;
437 d_printf (1, "%s spooling: %s\n", nt->peerName,
438 nt->noBacklog ? "disabled" : "enabled");
440 d_printf (1,"%s tape backlog limit: [%ld %ld]\n",nt->peerName,
442 nt->outputHighLimit) ;
448 void gFlushTapes (void)
452 notice ("ME flushing tapes") ;
453 for (i = 0 ; i < activeTapeIdx ; i++)
454 tapeFlush (activeTapes [i]) ;
459 /* close the input and output tapes and reinitialize everything in the
461 void tapeFlush (Tape tape)
463 if (tape->inFp != NULL)
465 checkpointTape (tape) ;
466 fclose (tape->inFp) ;
469 if (tape->outFp != NULL)
470 fclose (tape->outFp) ;
477 void gPrintTapeInfo (FILE *fp, unsigned int indentAmt)
479 char indent [INDENT_BUFFER_SIZE] ;
482 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
486 fprintf (fp,"%sGlobal Tape List : (count %lu) {\n",
487 indent,(unsigned long) activeTapeIdx) ;
489 for (i = 0 ; i < activeTapeIdx ; i++)
490 printTapeInfo (activeTapes [i],fp,indentAmt + INDENT_INCR) ;
491 fprintf (fp,"%s}\n",indent) ;
495 void tapeLogGlobalStatus (FILE *fp)
497 fprintf (fp,"%sBacklog file global values:%s\n",
498 genHtml ? "<B>" : "", genHtml ? "</B>" : "") ;
499 fprintf (fp," directory: %s\n",tapeDirectory) ;
500 fprintf (fp," rotate period: %-3ld seconds\n",(long) rotatePeriod) ;
501 fprintf (fp,"checkpoint period: %-3ld seconds\n",(long) tapeCkPtPeriod) ;
502 fprintf (fp," newfile period: %-3ld seconds\n",(long) tapeCkNewFilePeriod);
507 void tapeLogStatus (Tape tape, FILE *fp)
510 fprintf (fp,"(no tape)\n") ;
511 else if (tape->noBacklog)
512 fprintf (fp," spooling: DISABLED\n");
513 else if (tape->outputLowLimit == 0)
514 fprintf (fp," spooling: UNLIMITED\n");
517 fprintf (fp," backlog low limit: %ld\n",
518 tape->outputLowLimit) ;
519 fprintf (fp," backlog upper limit: %ld",
520 tape->outputHighLimit) ;
521 if (tape->backlogFactor > 0.0)
522 fprintf (fp," (factor %1.2f)",tape->backlogFactor) ;
524 fprintf (fp," backlog shrinkage: ") ;
525 fprintf (fp,"%ld bytes (from current file)\n",tape->lossage) ;
529 void printTapeInfo (Tape tape, FILE *fp, unsigned int indentAmt)
531 char indent [INDENT_BUFFER_SIZE] ;
537 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
541 fprintf (fp,"%sTape : %p {\n",indent,(void *) tape) ;
545 fprintf (fp,"%s}\n",indent) ;
549 fprintf (fp,"%s master-file : %s\n", indent, tape->handFilename) ;
550 fprintf (fp,"%s input-file : %s\n", indent, tape->inputFilename) ;
551 fprintf (fp,"%s output-file : %s\n",indent, tape->outputFilename) ;
552 fprintf (fp,"%s lock-file : %s\n",indent, tape->lockFilename) ;
553 fprintf (fp,"%s peerName : %s\n",indent,tape->peerName) ;
554 fprintf (fp,"%s input-FILE : %p\n",indent, (void *) tape->inFp) ;
555 fprintf (fp,"%s output-FILE : %p\n",indent, (void *) tape->outFp) ;
556 fprintf (fp,"%s output-limit : %ld\n",indent, tape->outputLowLimit) ;
559 fprintf (fp,"%s in-memory article queue (length %d) {\n",indent,
562 for (qe = tape->head ; qe != NULL ; qe = qe->next)
565 printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
567 fprintf (fp,"%s %p\n",indent,qe->article) ;
571 fprintf (fp,"%s }\n",indent) ;
574 fprintf (fp,"%s tell-position : %ld\n",indent,(long) tape->tellpos) ;
575 fprintf (fp,"%s input-FILE-changed : %s\n",indent,
576 boolToString (tape->changed)) ;
578 fprintf (fp,"%s no-rotate : %s\n",indent, boolToString (tape->noRotate));
580 fprintf (fp,"%s}\n",indent) ;
586 /* delete the tape. Spools the in-memory articles to disk. */
587 void delTape (Tape tape)
596 if (tape->outFp != NULL && fclose (tape->outFp) != 0)
597 syswarn ("ME ioerr fclose %s", tape->outputFilename) ;
599 if (stat(tape->outputFilename, &st) == 0 && st.st_size == 0)
601 d_printf (1,"removing empty output tape: %s\n",tape->outputFilename) ;
602 unlink (tape->outputFilename) ;
606 tape->outputSize = 0 ;
614 if (tape->inFp != NULL)
616 checkpointTape (tape) ;
617 fclose (tape->inFp) ;
620 unlockFile (tape->lockFilename) ;
622 freeCharP (tape->handFilename) ;
623 freeCharP (tape->inputFilename) ;
624 freeCharP (tape->outputFilename) ;
625 freeCharP (tape->lockFilename) ;
626 freeCharP (tape->peerName) ;
628 removeTapeGlobally (tape) ;
634 void tapeTakeArticle (Tape tape, Article article)
640 const char *fname, *msgid ;
642 ASSERT (tape != NULL) ;
644 /* return immediately if spooling disabled - jgarzik */
647 delArticle (article) ;
651 fname = artFileName (article) ;
652 msgid = artMsgId (article) ;
653 amt = fprintf (tape->outFp,"%s %s\n", fname, msgid) ;
655 /* I'd rather know where I am each time, and I don't trust all
656 fprintf's to give me character counts. */
657 #if defined (TRUST_FPRINTF)
659 tape->outputSize += amt ;
662 #if defined (NO_TRUST_STRLEN)
664 tape->outputSize = ftello (tape->outFp) ;
668 tape->outputSize += strlen(fname) + strlen(msgid) + 2 ; /* " " + "\n" */
673 delArticle (article) ;
679 fflush (tape->outFp) ;
681 if (fstat (fileno (tape->outFp),&sb) != 0)
682 syswarn ("ME oserr fstat %s",tape->outputFilename) ;
683 else if (sb.st_size != tape->outputSize)
684 syslog (LOG_ERR,"fstat and ftello do not agree: %ld %ld for %s\n",
685 (long)sb.st_size,tape->outputSize,tape->outputFilename) ;
688 if (tape->outputHighLimit > 0 && tape->outputSize >= tape->outputHighLimit)
690 long oldSize = tape->outputSize ;
691 shrinkfile (tape->outFp,tape->outputLowLimit,tape->outputFilename,"a+");
692 tape->outputSize = ftello (tape->outFp) ;
693 tape->lossage += oldSize - tape->outputSize ;
698 /* Pick an article off a tape and return it. NULL is returned if there
699 are no more articles. */
700 Article getArticle (Tape tape)
702 char line [2048] ; /* ick. 1024 for filename + 1024 for msgid */
704 char *msgid, *filename ;
706 time_t now = theTime() ;
708 ASSERT (tape != NULL) ;
710 if (tape->inFp == NULL && (now - tape->lastRotated) > rotatePeriod)
711 prepareFiles (tape) ; /* will flush queue too. */
713 while (tape->inFp != NULL && art == NULL)
715 tape->changed = true ;
717 if (fgets (line,sizeof (line), tape->inFp) == NULL)
719 if (ferror (tape->inFp))
720 syswarn ("ME ioerr on tape file %s", tape->inputFilename) ;
721 else if ( !feof (tape->inFp) )
722 syswarn ("ME oserr fgets %s", tape->inputFilename) ;
724 if (fclose (tape->inFp) != 0)
725 syswarn ("ME ioerr fclose %s", tape->inputFilename) ;
727 d_printf (1,"No more articles on tape %s\n",tape->inputFilename) ;
730 tape->scribbled = false ;
732 unlink (tape->inputFilename) ;
734 if ((now - tape->lastRotated) > rotatePeriod)
735 prepareFiles (tape) ; /* rotate files to try next. */
739 msgid = filename = NULL ;
741 for (p = line ; *p && CTYPE(isspace, *p) ; p++) /* eat whitespace */
753 for (q++ ; *q && CTYPE(isspace, *q) ; q++)
758 if (((p = strchr (q, ' ')) != NULL) ||
759 ((p = strchr (q, '\n')) != NULL))
765 filename = NULL ; /* no trailing newline or blank */
768 filename = NULL ; /* line had one field and some blanks */
771 filename = NULL ; /* line only had one field */
774 /* See if message ID looks valid. */
776 for (p = msgid; *p; p++)
779 if (*msgid != '<' || *p != '>') {
780 warn ("ME tape invalid messageID in %s: %s",
781 tape->inputFilename,msgid);
786 if (filename != NULL && msgid != NULL)
787 art = newArticle (filename, msgid) ;
789 /* art may be NULL here if the file is no longer valid. */
794 /* now we either have an article or there is no more on disk */
797 if (tape->inFp != NULL && ((c = fgetc (tape->inFp)) != EOF))
798 ungetc (c,tape->inFp) ; /* shouldn't happen */
799 else if (tape->inFp != NULL)
801 /* last article read was the end of the tape. */
802 if (fclose (tape->inFp) != 0)
803 syswarn ("ME ioerr fclose %s", tape->inputFilename) ;
806 tape->scribbled = false ;
808 /* toss out the old input file and prepare the new one */
809 unlink (tape->inputFilename) ;
811 if (now - tape->lastRotated > rotatePeriod)
812 prepareFiles (tape) ;
818 d_printf (2,"%s All out of articles in the backlog\n", tape->peerName) ;
820 d_printf (2,"%s Peeled article %s from backlog\n",tape->peerName,
827 /****************************************************/
828 /** CLASS FUNCTIONS **/
829 /****************************************************/
831 /* Cause all the Tapes to checkpoint themselves. */
832 void checkPointTapes (void)
836 for (i = 0 ; i < activeTapeIdx ; i++)
837 checkpointTape (activeTapes [i]) ;
841 /* make all the tapes set their checkNew flag. */
842 static void tapesSetCheckNew (void)
846 for (i = 0 ; i < activeTapeIdx ; i++)
847 activeTapes[i]->checkNew = true ;
852 /* Set the pathname of the directory for storing tapes in. */
853 void setTapeDirectory (const char *newDir)
855 /* the activeTape variable gets set when the first Tape object
857 if (activeTapes != NULL)
859 syslog (LOG_CRIT,"Resetting backlog directory") ;
863 if (tapeDirectory != NULL)
864 freeCharP (tapeDirectory) ;
866 tapeDirectory = xstrdup (newDir) ;
868 addPointerFreedOnExit (tapeDirectory) ;
872 /* Get the pathname of the directory tapes are stored in. */
873 const char *getTapeDirectory (void)
875 ASSERT (tapeDirectory != NULL) ;
878 if (tapeDirectory == NULL)
880 tapeDirectory = xstrdup (dflTapeDir) ;
881 addPointerFreedOnExit (tapeDirectory) ;
885 return tapeDirectory ;
890 void setOutputSizeLimit (long val)
892 defaultSizeLimit = val ;
901 /**********************************************************************/
902 /* PRIVATE FUNCTIONS */
903 /**********************************************************************/
906 /* Add a new tape to the class-level list of active tapes. */
907 static void addTapeGlobally (Tape tape)
909 ASSERT (tape != NULL) ;
911 if (activeTapeSize == activeTapeIdx)
915 activeTapeSize += 10 ;
916 if (activeTapes != NULL)
917 activeTapes = xrealloc (activeTapes, sizeof(Tape) * activeTapeSize) ;
919 activeTapes = xmalloc (sizeof(Tape) * activeTapeSize) ;
921 for (i = activeTapeIdx ; i < activeTapeSize ; i++)
922 activeTapes [i] = NULL ;
924 activeTapes [activeTapeIdx++] = tape ;
928 /* Remove a tape for the class-level list of active tapes. */
929 static void removeTapeGlobally (Tape tape)
936 ASSERT (activeTapeIdx > 0) ;
938 for (i = 0 ; i < activeTapeIdx ; i++)
939 if (activeTapes [i] == tape)
942 ASSERT (i < activeTapeIdx) ;
944 for ( ; i < (activeTapeIdx - 1) ; i++)
945 activeTapes [i] = activeTapes [i + 1] ;
947 activeTapes [--activeTapeIdx] = NULL ;
949 if (activeTapeIdx == 0)
957 /* Have a tape checkpoint itself so that next process can pick up where
958 this one left off. */
959 static void checkpointTape (Tape tape)
961 if (tape->inFp == NULL) /* no input file being read. */
964 if (!tape->changed) /* haven't read since last checkpoint */
966 d_printf (1,"Not checkpointing unchanged tape: %s\n", tape->peerName) ;
970 if ((tape->tellpos = ftello (tape->inFp)) < 0)
972 syswarn ("ME oserr ftello %s", tape->inputFilename) ;
976 /* strlen of "18446744073709551616\n" (2^64) */
979 /* make sure we're not right at the beginning of the file so we can write. */
980 if (tape->tellpos > BITS64)
982 rewind (tape->inFp) ;
984 /* scribble blanks over the first lines characters */
985 if (!tape->scribbled)
990 while ((c = fgetc (tape->inFp)) != '\n' || currloc <= BITS64)
996 rewind (tape->inFp) ;
998 while (currloc-- > 0)
999 fputc (' ',tape->inFp) ;
1001 rewind (tape->inFp) ;
1003 fflush (tape->inFp) ;
1004 tape->scribbled = true ;
1007 fprintf (tape->inFp,"%ld",tape->tellpos) ;
1009 if (fseeko (tape->inFp,tape->tellpos,SEEK_SET) != 0)
1010 syswarn ("ME oserr fseeko(%s,%ld,SEEK_SET)",tape->inputFilename,
1014 tape->changed = false ;
1019 /* Prepare the tape file(s) for input and output */
1021 /* For a given Tape there are
1022 * three possible files: PEER.input PEER and
1023 * PEER.output. PEER.input and PEER.output are private to
1024 * innfeed. PEER is where a sysadmin can drop a file that (s)he
1025 * wants to inject into the process. If the first line of the input file
1026 * contains only an integer (possibly surrounded by spaces), then this is
1027 * taken to be the position to seek to before reading.
1029 * prepareFiles will process them in a manner much like the following shell
1032 * if [ ! -f PEER.input ]; then
1033 * if [ -f PEER ]; then mv PEER PEER.input
1034 * elif [ -f PEER.output ]; then mv PEER.output PEER; fi
1037 * At this point PEER.input is opened for reading if it exists.
1039 * The checkpoint file is left as-is unless the PEER.input file
1040 * happens to be newer that the checkpoint file.
1042 static void prepareFiles (Tape tape)
1049 /* flush any in memory articles to disk */
1050 if (tape->head != NULL && tape->outFp != NULL)
1056 /* First time through, or something external has set checkNew */
1057 if (tape->lastRotated == 0 || tape->checkNew)
1059 newExists = fileExistsP (tape->handFilename) ;
1061 notice ("%s new hand-prepared backlog file", tape->peerName) ;
1066 if (tape->lastRotated == 0) /* first time here */
1068 inpExists = fileExistsP (tape->inputFilename) ;
1069 outExists = fileExistsP (tape->outputFilename) ;
1073 inpExists = (tape->inFp != NULL) ? true : false ; /* can this ever be true?? */
1074 outExists = (tape->outFp != NULL && tape->outputSize > 0) ? true : false ;
1078 /* move the hand-dropped file to the input file if needed. */
1079 if (newExists && !inpExists)
1081 if (rename (tape->handFilename,tape->inputFilename) != 0)
1082 syswarn ("ME oserr rename %s, %s", tape->handFilename,
1083 tape->inputFilename);
1086 notice ("%s grabbing external tape file", tape->peerName) ;
1091 /* now move the output file to the input file, if needed and only if in
1092 not in NOROTATE mode. */
1093 if (outExists && !inpExists && !tape->noRotate)
1095 if (tape->outFp != NULL)
1097 fclose (tape->outFp) ;
1098 tape->outFp = NULL ;
1101 if (rename (tape->outputFilename,tape->inputFilename) != 0)
1102 syswarn ("ME oserr rename %s, %s", tape->outputFilename,
1103 tape->inputFilename) ;
1110 /* now open up the input file and seek to the proper position. */
1116 if ((tape->inFp = fopen (tape->inputFilename,"r+")) == NULL)
1117 syswarn ("ME fopen %s", tape->inputFilename) ;
1122 if (fgets (buffer,sizeof (buffer) - 1, tape->inFp) == NULL)
1124 if (feof (tape->inFp))
1126 d_printf (1,"Empty input file: %s\n",tape->inputFilename) ;
1127 unlink (tape->inputFilename) ;
1130 syswarn ("ME oserr fgets %s", tape->inputFilename) ;
1132 fclose (tape->inFp) ;
1134 tape->scribbled = false ;
1138 unsigned int len = strlen (buffer) ;
1141 if (len > 0 && buffer [len - 1] == '\n')
1142 buffer [--len] = '\0' ;
1144 if (len > 0 && strspn (buffer,"0123456789 \n") == len)
1146 if (sscanf (buffer,"%ld",&newPos) == 1)
1148 tape->scribbled = true ;
1149 tape->tellpos = newPos ;
1153 if ((flength = fileLength (fileno (tape->inFp))) < tape->tellpos)
1155 warn ("ME tape short: %s %ld %ld", tape->inputFilename,
1156 flength, tape->tellpos) ;
1159 else if (tape->tellpos == 0)
1160 rewind (tape->inFp) ;
1161 else if (fseeko (tape->inFp,tape->tellpos - 1,SEEK_SET) != 0)
1162 syswarn ("ME oserr fseeko(%s,%ld,SEEK_SET)",
1163 tape->inputFilename,tape->tellpos) ;
1164 else if ((c = fgetc (tape->inFp)) != '\n')
1166 while (c != EOF && c != '\n')
1167 c = fgetc (tape->inFp) ;
1171 fclose (tape->inFp) ;
1172 unlink (tape->inputFilename) ;
1174 tape->scribbled = false ;
1175 prepareFiles (tape) ;
1179 long oldPos = tape->tellpos ;
1181 tape->changed = true ;
1182 checkpointTape (tape) ;
1184 warn ("ME internal checkpoint line boundary missed:"
1185 " %s %ld vs. %ld",tape->inputFilename,
1186 tape->tellpos, oldPos) ;
1193 tape->lastRotated = theTime() ;
1194 tape->checkNew = false ;
1196 /* now open up the output file. */
1197 if (tape->outFp == NULL)
1199 if ((tape->outFp = fopen (tape->outputFilename,"a+")) == NULL)
1201 syswarn ("ME tape open failed (a+) %s", tape->outputFilename) ;
1204 fseeko (tape->outFp,0,SEEK_END) ;
1205 tape->outputSize = ftello (tape->outFp) ;
1211 static void tapeCkNewFileCbk (TimeoutId id, void *d UNUSED)
1213 ASSERT (id == ckNewFileId) ;
1214 ASSERT (tapeCkNewFilePeriod > 0) ;
1216 tapesSetCheckNew () ;
1218 ckNewFileId = prepareSleep (tapeCkNewFileCbk,tapeCkNewFilePeriod,NULL) ;
1222 /* The timer callback function that will checkpoint all the active tapes. */
1223 static void tapeCheckpointCallback (TimeoutId id, void *d UNUSED)
1225 ASSERT (id == checkPtId) ;
1226 ASSERT (tapeCkPtPeriod > 0) ;
1228 d_printf (1,"Checkpointing tapes\n") ;
1230 checkPointTapes () ;
1232 checkPtId = prepareSleep (tapeCheckpointCallback,tapeCkPtPeriod,NULL) ;
1238 static void flushTape (Tape tape)
1242 /* flush out queue to disk. */
1244 while (elem != NULL)
1246 tape->head = tape->head->next ;
1247 fprintf (tape->outFp,"%s %s\n", artFileName (elem->article),
1248 artMsgId (elem->article)) ;
1250 delArticle (elem->article) ;
1259 /* I'd rather know where I am each time, and I don't trust all
1260 fprintf's to give me character counts. */
1261 tape->outputSize = ftello (tape->outFp) ;
1265 static bool logged = false ;
1267 fflush (tape->outFp) ;
1268 if (fstat (fileno (tape->outFp),&buf) != 0 && !logged)
1270 syslog (LOG_ERR,FSTAT_FAILURE,tape->outputFilename) ;
1273 else if (buf.st_size != tape->outputSize)
1275 warn ("ME fstat and ftello do not agree for %s",
1276 tape->outputFilename) ;
1281 if (tape->outputHighLimit > 0 && tape->outputSize > tape->outputHighLimit)
1283 shrinkfile (tape->outFp,tape->outputLowLimit,tape->outputFilename,"a+");
1284 tape->outputSize = ftello (tape->outFp) ;
1290 static void tapeCleanup (void)
1292 free (tapeDirectory) ;
1293 tapeDirectory = NULL ;