1 /* $Id: innlistener.c 6716 2004-05-16 20:26:56Z rra $
3 ** The implementation of the innfeed InnListener class.
5 ** Written by James Brister <brister@vix.com>
19 #include "inn/messages.h"
24 #include "configfile.h"
27 #include "innlistener.h"
31 #define LISTENER_INPUT_BUFFER (1024 * 8) /* byte size of the input buffer */
32 #define EOF_SLEEP_TIME 1 /* seconds to sleep when EOF on InputFile */
43 TimeoutId inputEOFSleepId ;
48 static unsigned int listenerCount = 0 ;
49 static InnListener listenerList = NULL ;
51 InnListener mainListener ;
53 static FILE *droppedFp = NULL ;
54 static long droppedCount = 0 ;
55 static int droppedFileCount = 0 ;
56 static char *dropArtFile = NULL ;
57 static bool fastExit = false ;
59 extern const char *pidFile ;
60 extern const char *InputFile ;
61 extern bool RollInputFile ;
65 static void giveArticleToPeer (InnListener lis,
66 Article article, const char *peerName) ;
67 static void newArticleCommand (EndPoint ep, IoStatus i,
68 Buffer *buffs, void *data) ;
69 static void wakeUp (TimeoutId id, void *data) ;
70 static void writeCheckPoint (int offsetAdjust) ;
71 static void dropArticle (const char *peer, Article article) ;
72 static void listenerCleanup (void) ;
74 static bool inited = false ;
77 void listenerLogStatus (FILE *fp)
79 fprintf (fp,"%sListener Status:%s\n",
80 genHtml ? "<B>" : "", genHtml ? "</B>" : "") ;
81 fprintf (fp," Dropped article file: %s\n",dropArtFile) ;
82 fprintf (fp," Dropped article count: %ld\n",(long) droppedCount) ;
86 InnListener newListener (EndPoint endp, bool isDummy, bool dynamicPeers)
88 InnListener l = xcalloc (1, sizeof(struct innlistener_s)) ;
94 atexit (listenerCleanup) ;
99 l->hostLen = MAX_HOSTS ;
100 l->myHosts = xcalloc (l->hostLen, sizeof(Host)) ;
102 l->inputBuffer = newBuffer (LISTENER_INPUT_BUFFER) ;
103 l->dummyListener = isDummy ;
104 l->dynamicPeers = dynamicPeers ;
106 addPointerFreedOnExit ((char *)bufferBase(l->inputBuffer)) ;
107 addPointerFreedOnExit ((char *)l->myHosts) ;
108 addPointerFreedOnExit ((char *)l) ;
110 readArray = makeBufferArray (bufferTakeRef (l->inputBuffer), NULL) ;
111 prepareRead (endp,readArray,newArticleCommand,l,1) ;
113 l->next = listenerList ;
121 void gPrintListenerInfo (FILE *fp, unsigned int indentAmt)
124 char indent [INDENT_BUFFER_SIZE] ;
127 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
131 fprintf (fp,"%sGlobal InnListener list : %p (count %d) {\n",
132 indent,(void *) listenerList,listenerCount) ;
133 for (p = listenerList ; p != NULL ; p = p->next)
134 printListenerInfo (p,fp,indentAmt + INDENT_INCR) ;
135 fprintf (fp,"%s}\n",indent) ;
140 void printListenerInfo (InnListener listener, FILE *fp, unsigned int indentAmt)
142 char indent [INDENT_BUFFER_SIZE] ;
145 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
149 fprintf (fp,"%sInnListener : %p {\n",indent,(void *) listener) ;
150 fprintf (fp,"%s endpoint : %p\n", indent,(void *) listener->myep) ;
151 fprintf (fp,"%s dummy-listener : %s\n",indent,
152 boolToString (listener->dummyListener)) ;
153 fprintf (fp,"%s dynamicPeers : %s\n",indent,
154 boolToString (listener->dynamicPeers)) ;
156 fprintf (fp,"%s input-buffer {\n",indent) ;
157 printBufferInfo (listener->inputBuffer,fp,indentAmt + INDENT_INCR) ;
158 fprintf (fp,"%s }\n",indent) ;
160 fprintf (fp,"%s hosts {\n",indent) ;
161 for (i = 0 ; i < listener->hostLen ; i++)
164 if (listener->myHosts [i] != NULL)
165 printHostInfo (listener->myHosts [i],fp,indentAmt + INDENT_INCR) ;
167 fprintf (fp,"%s %p\n",indent,(void *) listener->myHosts[i]) ;
171 fprintf (fp,"%s }\n",indent) ;
173 fprintf (fp,"%s}\n",indent) ;
176 /* Unlink the pidFile if and only if it is our pidFile.
177 There is still a racecondition here but very small. */
178 static void unlinkPidFile (void)
183 if ((fp = fopen(pidFile, "r")) == NULL)
186 if (fgets(buf, 32, fp) != NULL && atoi(buf) == getpid())
191 /* Close down all hosts on this listener. When they're all gone the
192 Listener will be deleted. */
193 void shutDown (InnListener l)
198 d_printf (1,"Shutting down the listener\n") ;
200 /* When shutting down the mainListener, stop writing to the
201 StatusFile and remove our pidFile. */
202 if (l == mainListener)
204 /*hostCleanup (); should do this but .. */
205 hostSetStatusFile ("/dev/null");
209 closeDroppedArticleFile () ;
213 if (l->inputEOFSleepId != 0)
214 removeTimeout (l->inputEOFSleepId) ;
215 l->inputEOFSleepId = 0 ;
216 delEndPoint (l->myep) ;
220 for (i = 0, count = 0 ; i < l->hostLen ; i++)
221 if (l->myHosts [i] != NULL)
223 hostClose (l->myHosts[i]) ;
227 if (count == 0 || fastExit)
229 time_t now = theTime () ;
230 char dateString [30] ;
233 strlcpy (dateString,ctime (&now),sizeof (dateString)) ;
234 dateString [24] = '\0' ;
237 notice ("ME finishing (quickly) at %s", dateString) ;
239 notice ("ME finishing at %s", dateString) ;
247 bool listenerAddPeer (InnListener listener, Host hostObj)
251 d_printf (1,"Adding peer: %s\n", hostPeerName (hostObj)) ;
253 for (i = 0 ; i < listener->hostLen ; i++)
255 if (listener->myHosts [i] == NULL)
257 listener->myHosts [i] = hostObj ;
267 /* return true if this listener doesn't ever generate articles. */
268 bool listenerIsDummy (InnListener listener)
270 return listener->dummyListener ;
273 /* Called by the Host when it (the Host) is about to delete itself. */
274 unsigned int listenerHostGone (InnListener listener, Host host)
277 unsigned int someThere = 0 ;
279 d_printf (1,"Host is gone: %s\n", hostPeerName (host)) ;
281 for (i = 0 ; i < listener->hostLen ; i++)
282 if (listener->myHosts [i] == host)
283 listener->myHosts [i] = NULL ;
284 else if (listener->myHosts [i] != NULL)
291 /* called by the Host when it has nothing to do. */
292 void listenerHostIsIdle (InnListener listener, Host host)
294 ASSERT (listener != NULL) ;
295 ASSERT (host != NULL) ;
297 d_printf (1,"Host is idle: %s\n", hostPeerName (host)) ;
299 if (!listener->dummyListener)
302 /* if this listener is a dummy (i.e. not generating articles cause we're
303 just dealing with backlog files) then forget about the host and when
304 last one is gone we exit. */
310 void openInputFile (void)
316 ASSERT (InputFile && *InputFile) ;
318 fd = open(InputFile, O_RDWR) ;
320 die ("open %s: %s\n", InputFile, strerror(errno)) ;
322 mainFd = getMainEndPointFd() ;
325 if (dup2(fd, mainFd) < 0)
326 die ("dup2 %d %d: %s\n", fd, mainFd, strerror(errno)) ;
330 i = read(mainFd, buf, sizeof (buf)) ;
332 die ("read %s: %s\n", InputFile, strerror(errno)) ;
336 buf [ sizeof(buf) - 1 ] = '\0';
337 offset = (off_t) strtol (p, &p, 10) ;
338 if (offset > 0 && *p == '\n')
339 lseek (mainFd, offset, SEEK_SET) ;
341 lseek (mainFd, 0, SEEK_SET) ;
343 syslog(LOG_NOTICE, "ME opened %s", InputFile);
347 int listenerConfigLoadCbk (void *data UNUSED)
351 if (getBool (topScope,"fast-exit",&bval,NO_INHERIT))
352 fastExit = (bval ? true : false) ;
357 /**********************************************************************/
358 /** STATIC PRIVATE FUNCTIONS **/
359 /**********************************************************************/
362 /* EndPoint callback function for when the InnListener's fd is ready for
364 static void newArticleCommand (EndPoint ep, IoStatus i,
365 Buffer *buffs, void *data)
367 InnListener lis = (InnListener) data ;
368 char *msgid, *msgidEnd ;
369 char *fileName, *fileNameEnd ;
370 char *peer, *peerEnd ;
372 char *bbase = bufferBase (buffs [0]) ;
373 size_t blen = bufferDataSize (buffs [0]) ;
375 static int checkPointCounter ;
378 ASSERT (ep == lis->myep) ;
380 bufferAddNullByte (buffs [0]) ;
384 if ( lis == mainListener && InputFile != NULL )
388 syslog(LOG_NOTICE, "ME reached EOF in %s", InputFile);
390 RollInputFile = false ;
391 readArray = makeBufferArray (bufferTakeRef (buffs [0]),NULL) ;
392 prepareRead (ep, readArray, newArticleCommand, data, 1) ;
396 lis->inputEOFSleepId =
397 prepareSleep (wakeUp, EOF_SLEEP_TIME, data) ;
403 d_printf (1,"Got EOF on listener\n") ;
404 notice ("ME source lost . Exiting");
408 else if (i == IoFailed)
410 errno = endPointErrno (ep) ;
412 if (errno != ECONNABORTED)
414 syswarn ("ME source read error, exiting") ;
415 d_printf (1,"Got IO Error on listener\n") ;
418 else if (strchr (bbase, '\n') == NULL) /* partial read */
420 /* check for input corrupted by NULs - if they
421 precede the newline, we never get out of here */
422 if (strlen(bbase) < blen)
424 warn ("ME source format bad, exiting: %s", bbase) ;
429 if (blen == bufferSize(buffs [0])) {
430 if (!expandBuffer (buffs [0], BUFFER_EXPAND_AMOUNT)) {
431 warn ("ME error expanding input buffer") ;
436 readArray = makeBufferArray (bufferTakeRef (buffs [0]),NULL) ;
437 if (!prepareRead (ep, readArray, newArticleCommand, data, 1)) {
438 warn ("ME error prepare read failed") ;
439 freeBufferArray (readArray) ;
446 /* now iterate over each full command we got on the input. */
448 while ((cmd < (bbase + blen)) && ((endc = strchr (cmd,'\n')) != NULL))
451 char *next = endc + 1;
462 /* check for input corrupted by NULs - if they are preceded
463 by newline, we may skip a large chunk without noticing */
464 if (*next == '\0' && next < bbase + blen)
466 warn ("ME source format bad, exiting: %s", cmd) ;
472 d_printf (2,"INN Command: %s\n", cmd) ;
474 /* pick out the leading string (the filename) */
475 if ((fileName = findNonBlankString (cmd,&fileNameEnd)) == NULL)
477 warn ("ME source format bad, exiting: %s", cmd) ;
483 *fileNameEnd = '\0' ; /* for the benefit of newArticle() */
485 /* now pick out the next string (the message id) */
486 if ((msgid = findNonBlankString (fileNameEnd + 1,&msgidEnd)) == NULL)
488 *fileNameEnd = ' ' ; /* to make syslog work properly */
489 warn ("ME source format bad, exiting: %s", cmd) ;
495 *msgidEnd = '\0' ; /* for the benefit of newArticle() */
497 /* now create an article object and give it all the peers on the
498 rest of the command line. Will return null if file is missing. */
499 article = newArticle (fileName, msgid) ;
502 /* Check the message ID length */
503 if (strlen(msgid) > NNTP_MSGID_MAXLEN) {
504 warn ("ME message id exceeds limit of %d octets: %s",
505 NNTP_MSGID_MAXLEN, msgid) ;
506 *(msgidEnd+1) = '\0' ;
510 /* Check if message ID starts with < and ends with > */
511 if (*msgid != '<' || *(msgidEnd-1) != '>') {
512 warn ("ME source format bad, exiting: %s", cmd) ;
513 *(msgidEnd+1) = '\0';
516 /* now get all the peernames off the rest of the command lines */
522 /* pick out the next peer name */
523 if ((peer = findNonBlankString (peerEnd + 1,&peerEnd))==NULL)
524 break ; /* even no peer names is OK. */ /* XXX REALLY? */
528 /* See if this is a valid peername */
529 for(s = peer; *s; s++)
530 if (!CTYPE(isalnum, *s) && *s != '.' && *s != '-' && *s != '_')
533 warn ("ME invalid peername %s", peer) ;
537 giveArticleToPeer (lis,article,peer) ;
539 while (peerEnd < endc) ;
541 delArticle (article) ;
545 /* write a checkpoint marker if we've done another large chunk */
546 if (InputFile && *InputFile && ++checkPointCounter == 1000)
548 /* adjust the seek pointer value by the current location
549 within the input buffer */
550 writeCheckPoint (blen - (cmd - bbase)) ;
551 checkPointCounter = 0 ;
556 if (*cmd != '\0') /* partial command left in buffer */
559 unsigned int leftAmt = blen - (cmd - bbase) ;
561 ASSERT (cmd != bbase) ;
562 /* first we shift whats left in the buffer down to the bottom */
563 memmove (bbase,cmd,leftAmt) ;
564 bufferSetDataSize (buffs [0],leftAmt) ;
566 bArr = makeBufferArray (bufferTakeRef (buffs [0]),NULL) ;
568 if ( !prepareRead (lis->myep, bArr, newArticleCommand, lis, 1) )
570 warn ("ME error prepare read failed") ;
572 freeBufferArray (bArr) ;
579 else if ( !readIsPending (lis->myep) )
580 { /* XXX read should never be pending here */
581 Buffer *bArr = makeBufferArray (bufferTakeRef (buffs [0]),NULL) ;
583 bufferSetDataSize (buffs [0],0) ;
585 if ( !prepareRead (lis->myep, bArr, newArticleCommand, lis, 1) )
587 warn ("ME error prepare read failed") ;
596 freeBufferArray (buffs) ;
599 /* EndPoint callback function for when the sleep due to
600 having reached EOF on InputFile is done. */
601 static void wakeUp (TimeoutId id, void *data)
603 InnListener lis = (InnListener) data ;
606 ASSERT (id == lis->inputEOFSleepId) ;
608 lis->inputEOFSleepId = 0 ;
609 readArray = makeBufferArray (bufferTakeRef (lis->inputBuffer), NULL) ;
610 prepareRead (lis->myep,readArray,newArticleCommand,lis,1) ;
614 /* Find the Host object for the peer and hand off a reference to the
615 article for it to transmit. */
616 static void giveArticleToPeer (InnListener lis,
617 Article article, const char *peerName)
621 for (i = 0 ; i < lis->hostLen ; i++)
622 if (lis->myHosts[i] != NULL)
623 if (strcmp (peerName,hostPeerName (lis->myHosts [i])) == 0)
625 d_printf (1,"Giving article to peer: %s\n", peerName) ;
626 hostSendArticle (lis->myHosts [i],artTakeRef (article)) ;
630 if (i == lis->hostLen)
632 d_printf (1,"Failed to give article to peer: -%s-\n", peerName) ;
634 if (lis->dynamicPeers)
638 d_printf (1, "Adding peer dynamically\n") ;
640 newHostObj = newDefaultHost (lis, peerName);
642 if (newHostObj == NULL)
644 /* Most likely we couldn't get the lock, i.e. the
647 dropArticle (peerName,article) ;
649 else if ( !listenerAddPeer (lis, newHostObj) )
651 /* XXX need to remember we've gone over the limit and not try
653 warn ("ME internal too many hosts. (max is %lu)",
654 (unsigned long) lis->hostLen) ;
655 dropArticle (peerName,article) ;
659 d_printf (1,"Giving article to peer: %s\n", peerName) ;
660 hostSendArticle (newHostObj,artTakeRef (article)) ;
665 dropArticle (peerName,article) ;
671 static void writeCheckPoint (int offsetAdjust)
673 char offsetString[16], *writePointer ;
675 int writeBytes, writeReturn, mainFd ;
677 mainFd = getMainEndPointFd() ;
678 offset = lseek (mainFd, 0, SEEK_CUR) ;
680 syslog (LOG_ERR, "ME tell(mainFd): %m") ;
683 snprintf (offsetString, sizeof(offsetString), "%ld\n",
684 (long)(offset - offsetAdjust) ) ;
685 if ( lseek (mainFd, 0, SEEK_SET) != 0 )
686 syslog (LOG_ERR, "ME seek(mainFd, 0, 0): %m") ;
689 writeBytes = strlen (offsetString) ;
690 writePointer = offsetString ;
693 writeReturn = write (mainFd, writePointer, writeBytes) ;
696 syslog (LOG_ERR,"ME write input checkpoint: %m") ;
699 writePointer += writeReturn ;
700 writeBytes -= writeReturn ;
701 } while (writeBytes) ;
702 if ( lseek (mainFd, offset, SEEK_SET) != offset )
703 die ("ME seek(mainFd, %ld, SEEK_SET): %s\n", (long)offset,
710 void openDroppedArticleFile (void)
712 pid_t myPid = getpid () ;
713 const char *tapeDir = getTapeDirectory() ;
716 if (dropArtFile != NULL)
719 len = pathMax(tapeDir) + 1;
720 dropArtFile = xmalloc(len);
721 snprintf (dropArtFile,len,"%s/innfeed-dropped.%c%06d",
722 tapeDir, droppedFileCount + 'A', (int) myPid) ;
724 if ((droppedFp = fopen (dropArtFile,"w")) == NULL)
726 syswarn ("ME cant open %s: loosing articles", dropArtFile) ;
731 if ((droppedFp = fopen ("/dev/null","w")) == NULL)
733 die ("ME error opening /dev/null") ;
740 void closeDroppedArticleFile (void)
744 if (droppedFp == NULL)
748 pos = ftello (droppedFp) ;
753 if (pos == 0 && dropArtFile != NULL)
754 unlink (dropArtFile) ;
755 else if (pos != 0 && dropArtFile == NULL)
756 warn ("ME lost %ld articles", droppedCount) ;
758 notice ("ME dropped %ld articles", droppedCount) ;
760 droppedFileCount = (droppedFileCount + 1) % 26 ;
764 static void dropArticle (const char *peerName, Article article)
766 static bool logged = false ;
770 warn ("ME dropping articles into %s", dropArtFile) ;
775 fprintf (droppedFp,"%s %s %s\n",artFileName (article),
776 artMsgId (article), peerName) ;
780 static void listenerCleanup (void)