1 /* $Id: host.c 7833 2008-05-18 20:04:35Z iulius $
3 ** The implementation of the innfeed Host class.
5 ** Written by James Brister <brister@vix.com>
11 #include "portable/socket.h"
20 #include <sys/param.h>
26 #include "inn/innconf.h"
27 #include "inn/messages.h"
32 #include "configfile.h"
33 #include "connection.h"
36 #include "innlistener.h"
44 #define VALUE_TOO_HIGH 1
45 #define VALUE_TOO_LOW 2
46 #define VALUE_MISSING 3
47 #define VALUE_WRONG_TYPE 4
49 #define METHOD_STATIC 0
51 #define METHOD_QUEUE 2
52 #define METHOD_COMBINED 3
54 /* the limit of number of connections open when a host is
55 set to 0 to mean "infinite" */
57 #define MAXCONLIMIT(xx) ((xx==0)?MAXCON:xx)
59 #define BACKLOGFILTER 0.7
60 #define BACKLOGLWM 20.0
61 #define BACKLOGHWM 50.0
63 /* time between retrying blocked hosts in seconds */
64 #define TRYBLOCKEDHOSTPERIOD 120
66 extern char *configFile ;
67 #if defined(hpux) || defined(__hpux) || defined(_SCO_DS)
71 /* the host keeps a couple lists of these */
72 typedef struct proc_q_elem
75 struct proc_q_elem *next ;
76 struct proc_q_elem *prev ;
77 time_t whenToRequeue ;
80 typedef struct host_param_s
84 struct sockaddr_in *bindAddr;
86 struct sockaddr_in6 *bindAddr6;
89 unsigned int articleTimeout;
90 unsigned int responseTimeout;
91 unsigned int initialConnections;
92 unsigned int absMaxConnections;
93 unsigned int maxChecks;
94 unsigned short portNum;
96 unsigned int closePeriod;
97 unsigned int dynamicMethod;
101 double lowPassLow; /* as percentages */
103 double lowPassFilter;
104 unsigned int backlogLimit ;
105 unsigned int backlogLimitHigh ;
106 double backlogFactor ;
107 double dynBacklogFilter ;
108 double dynBacklogLowWaterMark ;
109 double dynBacklogHighWaterMark ;
110 bool backlogFeedFirst ;
117 InnListener listener ; /* who created me. */
118 struct sockaddr **ipAddrs ; /* the ip addresses of the remote */
119 int nextIpAddr ; /* the next ip address to hand out */
121 Connection *connections ; /* NULL-terminated list of all connections */
122 bool *cxnActive ; /* true if the corresponding cxn is active */
123 bool *cxnSleeping ; /* true if the connection is sleeping */
124 unsigned int maxConnections; /* maximum no of cxns controlled by method */
125 unsigned int activeCxns ; /* number of connections currently active */
126 unsigned int sleepingCxns ; /* number of connections currently sleeping */
127 Connection blockedCxn ; /* the first connection to get the 400 banner*/
128 Connection notThisCxn ; /* don't offer articles to this connection */
130 HostParams params; /* Parameters from config file */
132 bool remoteStreams ; /* true if remote supports streaming */
134 ProcQElem queued ; /* articles done nothing with yet. */
135 ProcQElem queuedTail ;
137 ProcQElem processed ; /* articles given to a Connection */
138 ProcQElem processedTail ;
140 ProcQElem deferred ; /* articles which have been deferred by */
141 ProcQElem deferredTail ; /* a connection */
143 TimeoutId statsId ; /* timeout id for stats logging. */
144 TimeoutId ChkCxnsId ; /* timeout id for dynamic connections */
145 TimeoutId deferredId ; /* timeout id for deferred articles */
149 bool backedUp ; /* set to true when all cxns are full */
150 unsigned int backlog ; /* number of arts in `queued' queue */
151 unsigned int deferLen ; /* number of arts in `deferred' queue */
153 bool loggedModeOn ; /* true if we logged going into no-CHECK mode */
154 bool loggedModeOff ; /* true if we logged going out of no-CHECK mode */
156 bool loggedBacklog ; /* true if we already logged the fact */
157 bool notifiedChangedRemBlckd ; /* true if we logged a new response 400 */
158 bool removeOnReload ; /* true if host should be removed at end of
161 bool isDynamic; /* true if host created dynamically */
163 /* these numbers get reset periodically (after a 'final' logging). */
164 unsigned int artsOffered ; /* # of articles we offered to remote. */
165 unsigned int artsAccepted ; /* # of articles succesfully transferred */
166 unsigned int artsNotWanted ; /* # of articles remote already had */
167 unsigned int artsRejected ; /* # of articles remote rejected */
168 unsigned int artsDeferred ; /* # of articles remote asked us to retry */
169 unsigned int artsMissing ; /* # of articles whose file was missing. */
170 unsigned int artsToTape ; /* # of articles given to tape */
171 unsigned int artsQueueOverflow ; /* # of articles that overflowed `queued' */
172 unsigned int artsCxnDrop ; /* # of articles caught in dead cxn */
173 unsigned int artsHostSleep ; /* # of articles spooled by sleeping host */
174 unsigned int artsHostClose ; /* # of articles caught by closing host */
175 unsigned int artsFromTape ; /* # of articles we pulled off tape */
176 double artsSizeAccepted ; /* size of articles succesfully transferred */
177 double artsSizeRejected ; /* size of articles remote rejected */
179 /* Dynamic Peerage - MGF */
180 unsigned int artsProcLastPeriod ; /* # of articles processed in last period */
181 unsigned int secsInLastPeriod ; /* Number of seconds in last period */
182 unsigned int lastCheckPoint ; /* total articles at end of last period */
183 unsigned int lastSentCheckPoint ; /* total articles sent end of last period */
184 unsigned int lastTotalCheckPoint ; /* total articles total end of last period */
185 bool maxCxnChk ; /* check for maxConnections */
186 time_t lastMaxCxnTime ; /* last time a maxConnections increased */
187 time_t lastChkTime; /* last time a check was made for maxConnect */
188 unsigned int nextCxnTimeChk ; /* next check for maxConnect */
190 double backlogFilter; /* IIR filter for size of backlog */
192 /* These numbers are as above, but for the life of the process. */
193 unsigned int gArtsOffered ;
194 unsigned int gArtsAccepted ;
195 unsigned int gArtsNotWanted ;
196 unsigned int gArtsRejected ;
197 unsigned int gArtsDeferred ;
198 unsigned int gArtsMissing ;
199 unsigned int gArtsToTape ;
200 unsigned int gArtsQueueOverflow ;
201 unsigned int gArtsCxnDrop ;
202 unsigned int gArtsHostSleep ;
203 unsigned int gArtsHostClose ;
204 unsigned int gArtsFromTape ;
205 double gArtsSizeAccepted ;
206 double gArtsSizeRejected ;
207 unsigned int gCxnQueue ;
208 unsigned int gNoQueue ;
210 time_t firstConnectTime ; /* time of first connect. */
211 time_t connectTime ; /* the time the first connection was fully
212 set up (MODE STREAM and everything
214 time_t spoolTime ; /* the time the Host had to revert to
215 spooling articles to tape. */
216 time_t lastSpoolTime ; /* the time the last time the Host had to
217 revert to spooling articles to tape. */
218 time_t nextIpLookup ; /* time of last IP name resolution */
220 char *blockedReason ; /* what the 400 from the remote says. */
222 Host next ; /* for global list of hosts. */
224 unsigned long dlAccum ; /* cumulative deferLen */
225 unsigned int blNone ; /* number of times the backlog was 0 */
226 unsigned int blFull ; /* number of times the backlog was full */
227 unsigned int blQuartile[4] ; /* number of times in each quartile */
228 unsigned long blAccum ; /* cumulative backlog for computing mean */
229 unsigned int blCount ; /* the sample count */
232 /* A holder for the info we got out of the config file, but couldn't create
233 the Host object for (normally due to lock-file problems).*/
235 typedef struct host_holder_s
238 struct host_holder_s *next ;
242 /* These numbers are as above, but for all hosts over
243 the life of the process. */
244 long procArtsOffered ;
245 long procArtsAccepted ;
246 long procArtsNotWanted ;
247 long procArtsRejected ;
248 long procArtsDeferred ;
249 long procArtsMissing ;
250 double procArtsSizeAccepted ;
251 double procArtsSizeRejected ;
252 long procArtsToTape ;
253 long procArtsFromTape ;
255 static HostParams defaultParams=NULL;
257 static HostHolder blockedHosts ; /* lists of hosts we can't lock */
258 static TimeoutId tryBlockedHostsId = 0 ;
259 static time_t lastStatusLog ;
262 * Host object private methods.
264 static void articleGone (Host host, Connection cxn, Article article) ;
265 static void hostStopSpooling (Host host) ;
266 static void hostStartSpooling (Host host) ;
267 static void hostLogStats (Host host, bool final) ;
268 static void hostStatsTimeoutCbk (TimeoutId tid, void *data) ;
269 static void hostDeferredArtCbk (TimeoutId tid, void *data) ;
270 static void backlogToTape (Host host) ;
271 static void queuesToTape (Host host) ;
272 static bool amClosing (Host host) ;
273 static void hostLogStatus (void) ;
274 static void hostPrintStatus (Host host, FILE *fp) ;
275 static int validateBool (FILE *fp, const char *name,
276 int required, bool setval,
277 scope * sc, unsigned int inh);
278 static int validateReal (FILE *fp, const char *name, double low,
279 double high, int required, double setval,
280 scope * sc, unsigned int inh);
281 static int validateInteger (FILE *fp, const char *name,
282 long low, long high, int required, long setval,
283 scope * sc, unsigned int inh);
285 static HostParams newHostParams(HostParams p);
286 static void freeHostParams(HostParams params);
288 static HostHolder FindBlockedHost(const char *name);
289 static void addBlockedHost(HostParams params);
290 static void tryBlockedHosts(TimeoutId tid, void *data);
291 static Host newHost (InnListener listener, HostParams p);
293 static HostParams getHostInfo (void);
294 static HostParams hostDetails (scope *s,
299 static Host findHostByName (char *name) ;
300 static void hostCleanup (void) ;
301 static void hostAlterMaxConnections(Host host,
302 unsigned int absMaxCxns, unsigned int maxCxns,
305 /* article queue management functions */
306 static Article remHead (ProcQElem *head, ProcQElem *tail) ;
307 static void queueArticle (Article article, ProcQElem *head, ProcQElem *tail,
309 static bool remArticle (Article article, ProcQElem *head, ProcQElem *tail) ;
319 /* if true then when a Host logs its stats, it has all its connections
321 static bool logConnectionStats = (bool) LOG_CONNECTION_STATS ;
323 /* The frequency in seconds with which a Host will log its stats. */
324 static time_t statsPeriod = STATS_PERIOD ;
325 static time_t statsResetPeriod = STATS_RESET_PERIOD ;
327 static Host gHostList = NULL ;
329 static unsigned int gHostCount = 0 ;
331 static unsigned int maxIpNameLen = 0 ;
332 static unsigned int maxPeerNameLen = 0 ;
334 static unsigned int hostHighwater = HOST_HIGHWATER ;
335 static time_t start ;
336 static char startTime [30] ; /* for ctime(3) */
339 static char *statusFile = NULL ;
340 static unsigned int dnsRetPeriod ;
341 static unsigned int dnsExpPeriod ;
343 bool genHtml = false ;
345 /*******************************************************************/
346 /* PUBLIC FUNCTIONS */
347 /*******************************************************************/
350 /* function called when the config file is loaded */
351 int hostConfigLoadCbk (void *data)
355 FILE *fp = (FILE *) data ;
359 d_printf(1,"hostConfigLoadCbk\n");
363 freeHostParams(defaultParams);
367 /* get optional global defaults */
368 if (getInteger (topScope,"dns-retry",&iv,NO_INHERIT))
373 logOrPrint (LOG_ERR,fp,
374 "ME config: value of %s (%ld) in %s cannot be less"
375 " than 1. Using %ld","dns-retry",
376 iv,"global scope",(long)DNS_RETRY_PERIOD) ;
377 iv = DNS_RETRY_PERIOD ;
381 iv = DNS_RETRY_PERIOD ;
382 dnsRetPeriod = (unsigned int) iv ;
385 if (getInteger (topScope,"dns-expire",&iv,NO_INHERIT))
390 logOrPrint (LOG_ERR,fp,
391 "ME config: value of %s (%ld) in %s cannot be less"
392 " than 1. Using %ld","dns-expire",iv,
393 "global scope",(long)DNS_EXPIRE_PERIOD) ;
394 iv = DNS_EXPIRE_PERIOD ;
398 iv = DNS_EXPIRE_PERIOD ;
399 dnsExpPeriod = (unsigned int) iv ;
401 if (getBool (topScope,"gen-html",&bval,NO_INHERIT))
402 genHtml = (bval ? true : false) ;
406 if (getString (topScope,"status-file",&p,NO_INHERIT))
408 hostSetStatusFile (p) ;
412 hostSetStatusFile (INNFEED_STATUS) ;
415 if (getBool (topScope,"connection-stats",&bval,NO_INHERIT))
416 logConnectionStats = (bval ? true : false) ;
418 logConnectionStats = (LOG_CONNECTION_STATS ? true : false) ;
421 if (getInteger (topScope,"host-queue-highwater", &iv,NO_INHERIT))
426 logOrPrint (LOG_ERR,fp,
427 "ME config: value of %s (%ld) in %s cannot be less"
428 " than 0. Using %ld","host-queue-highwater",
429 iv,"global scope",(long) HOST_HIGHWATER) ;
430 iv = HOST_HIGHWATER ;
434 iv = HOST_HIGHWATER ;
435 hostHighwater = (unsigned int) iv ;
437 if (getInteger (topScope,"stats-period",&iv,NO_INHERIT))
442 logOrPrint (LOG_ERR,fp,
443 "ME config: value of %s (%ld) in %s cannot be less"
444 " than 0. Using %ld","stats-period",
445 iv,"global scope",(long)STATS_PERIOD) ;
451 statsPeriod = (unsigned int) iv ;
454 if (getInteger (topScope,"stats-reset",&iv,NO_INHERIT))
459 logOrPrint (LOG_ERR,fp,
460 "ME config: value of %s (%ld) in %s cannot be less"
461 " than 0. Using %ld","stats-reset",iv,
462 "global scope",(long)STATS_RESET_PERIOD) ;
463 iv = STATS_RESET_PERIOD ;
467 iv = STATS_RESET_PERIOD ;
468 statsResetPeriod = (unsigned int) iv ;
470 defaultParams=hostDetails(topScope, NULL, true, fp);
471 ASSERT(defaultParams!=NULL);
477 * make a new HostParams structure copying an existing one
478 * or from compiled defaults
481 HostParams newHostParams(HostParams p)
485 params = xmalloc (sizeof(struct host_param_s)) ;
489 /* Copy old stuff in */
490 memcpy ((char *) params, (char *) p, sizeof(struct host_param_s));
491 if (params->peerName)
492 params->peerName = xstrdup(params->peerName);
494 params->ipName = xstrdup(params->ipName);
495 if (params->bindAddr)
497 struct sockaddr_in *s = params->bindAddr;
498 params->bindAddr = xmalloc(sizeof(*s));
499 memcpy(params->bindAddr, s, sizeof(*s));
502 if (params->bindAddr6)
504 struct sockaddr_in6 *s = params->bindAddr6;
505 params->bindAddr6 = xmalloc(sizeof(*s));
506 memcpy(params->bindAddr6, s, sizeof(*s));
512 /* Fill in defaults */
513 params->peerName=NULL;
515 params->bindAddr=NULL;
517 params->bindAddr6=NULL;
520 params->articleTimeout=ARTTOUT;
521 params->responseTimeout=RESPTOUT;
522 params->initialConnections=INIT_CXNS;
523 params->absMaxConnections=MAX_CXNS;
524 params->maxChecks=MAX_Q_SIZE;
525 params->portNum=PORTNUM;
526 params->forceIPv4=FORCE_IPv4;
527 params->closePeriod=CLOSE_PERIOD;
528 params->dynamicMethod=METHOD_STATIC;
529 params->wantStreaming=STREAM;
530 params->dropDeferred=false;
531 params->minQueueCxn=false;
532 params->lowPassLow=NOCHECKLOW;
533 params->lowPassHigh=NOCHECKHIGH;
534 params->lowPassFilter=FILTERVALUE;
535 params->backlogLimit=BLOGLIMIT;
536 params->backlogLimitHigh=BLOGLIMIT_HIGH ;
537 params->backlogFactor=LIMIT_FUDGE ;
538 params->dynBacklogFilter = BACKLOGFILTER ;
539 params->dynBacklogLowWaterMark = BACKLOGLWM;
540 params->dynBacklogHighWaterMark = BACKLOGHWM;
541 params->backlogFeedFirst=false;
542 params->username=NULL;
543 params->password=NULL;
549 * Free up a param structure
552 void freeHostParams(HostParams params)
554 ASSERT(params != NULL);
555 if (params->peerName)
556 free (params->peerName) ;
558 free (params->ipName) ;
559 if (params->bindAddr)
560 free (params->bindAddr) ;
562 if (params->bindAddr6)
563 free (params->bindAddr6) ;
568 static void hostReconfigure(Host h, HostParams params)
570 unsigned int i, absMaxCxns ;
571 double oldBacklogFilter ;
573 if (strcmp(h->params->ipName, params->ipName) != 0)
575 free (h->params->ipName) ;
576 h->params->ipName = xstrdup (params->ipName) ;
577 h->nextIpLookup = theTime () ;
580 /* Put in new parameters
581 Unfortunately we can't blat on top of absMaxConnections
582 as we need to do some resizing here
585 ASSERT (h->params != NULL);
587 oldBacklogFilter = h->params->dynBacklogFilter;
588 i = h->params->absMaxConnections; /* keep old value */
589 absMaxCxns = params->absMaxConnections;
590 /* Use this set of params and allocate, and free
593 freeHostParams(h->params);
595 h->params->absMaxConnections = i; /* restore old value */
597 /* If the backlog filter value has changed, reset the
598 * filter as the value therein will be screwy
600 if (h->params->dynBacklogFilter != oldBacklogFilter)
601 h->backlogFilter = ((h->params->dynBacklogLowWaterMark
602 + h->params->dynBacklogHighWaterMark)
603 /200.0 /(1.0-h->params->dynBacklogFilter));
605 /* We call this anyway - it does nothing if the values
606 * haven't changed. This is because doing things like
607 * just changing "dynamic-method" requires this call
610 hostAlterMaxConnections(h, absMaxCxns, h->maxConnections, false);
612 for ( i = 0 ; i < MAXCONLIMIT(h->params->absMaxConnections) ; i++ )
613 if (h->connections[i] != NULL)
614 cxnSetCheckThresholds (h->connections[i],
615 h->params->lowPassLow,
616 h->params->lowPassHigh,
617 h->params->lowPassFilter) ;
619 /* XXX how to handle initCxns change? */
623 void configHosts (bool talkSelf)
629 /* Remove the current blocked host list */
630 for (hh = blockedHosts, hi = NULL ; hh != NULL ; hh = hi)
632 freeHostParams(hh->params);
636 blockedHosts = NULL ;
638 closeDroppedArticleFile () ;
639 openDroppedArticleFile () ;
641 while ((params = getHostInfo ()) !=NULL )
643 h = findHostByName (params->peerName) ;
644 /* We know the host isn't blocked as we cleared the blocked list */
645 /* Have we already got this host up and running ?*/
648 hostReconfigure(h, params);
649 h->removeOnReload = false ; /* Don't remove at the end */
654 /* It's a host we haven't seen from the config file before */
655 nHost = newHost (mainListener, params);
659 addBlockedHost(params);
661 warn ("ME locked cannot setup peer %s", params->peerName) ;
665 if (params->initialConnections == 0 && talkSelf)
666 notice ("%s config ignored batch mode with initial"
667 " connection count of 0", params->peerName) ;
669 if ( !listenerAddPeer (mainListener,nHost) )
670 die ("failed to add a new peer\n") ;
677 for (h = gHostList; h != NULL; h = q)
680 if (h->removeOnReload)
684 /* change to the new default parameters */
685 params = newHostParams(defaultParams);
686 ASSERT(params->peerName == NULL);
687 ASSERT(params->ipName == NULL);
688 ASSERT(h->params->peerName != NULL);
689 ASSERT(h->params->ipName != NULL);
690 params->peerName = xstrdup(h->params->peerName);
691 params->ipName = xstrdup(h->params->ipName);
692 hostReconfigure(h, params);
693 h->removeOnReload = true;
696 hostClose (h) ; /* h may be deleted in here. */
699 /* prime it for the next config file read */
700 h->removeOnReload = true ;
707 void hostAlterMaxConnections(Host host,
708 unsigned int absMaxCxns, unsigned int maxCxns,
711 unsigned int lAbsMaxCxns;
714 /* Fix 0 unlimited case */
715 lAbsMaxCxns = MAXCONLIMIT(absMaxCxns);
717 /* Don't accept 0 for maxCxns */
718 maxCxns=MAXCONLIMIT(maxCxns);
720 if ( host->params->dynamicMethod == METHOD_STATIC)
722 /* If running static, ignore the maxCxns passed in, we'll
725 maxCxns = lAbsMaxCxns;
728 if ( maxCxns > lAbsMaxCxns)
730 /* ensure maxCxns is of the correct form */
731 maxCxns = lAbsMaxCxns;
734 if ((maxCxns < host->maxConnections) && (host->connections != NULL))
736 /* We are going to have to nuke some connections, as the current
737 max is now greater than the new max
739 for ( i = host->maxConnections ; i > maxCxns ; i-- )
741 /* XXX this is harsh, and arguably there could be a
742 cleaner way of doing it. the problem being addressed
743 by doing it this way is that eventually a connection
744 closed cleanly via cxnClose can end up ultimately
745 calling hostCxnDead after h->maxConnections has
746 been lowered and the relevant arrays downsized.
747 If trashing the old, unallocated space in
748 hostCxnDead doesn't kill the process, the
749 ASSERT against h->maxConnections surely will.
751 if (host->connections[i - 1] != NULL)
753 cxnLogStats (host->connections [i-1], true) ;
754 cxnNuke (host->connections[i-1]) ;
755 host->connections[i-1] = NULL;
758 host->maxConnections = maxCxns ;
761 if (host->connections)
762 for (i = host->maxConnections ; i <= MAXCONLIMIT(host->params->absMaxConnections) ; i++)
764 /* Ensure we've got an empty values only beyond the maxConnection
767 ASSERT (host->connections[i] == NULL);
770 if ((lAbsMaxCxns != MAXCONLIMIT(host->params->absMaxConnections)) ||
771 (host->connections == NULL))
773 /* we need to change the size of the connection array */
774 if (host->connections == NULL)
776 /* not yet allocated */
778 host->connections = xcalloc (lAbsMaxCxns + 1, sizeof(Connection)) ;
780 ASSERT (host->cxnActive == NULL);
781 host->cxnActive = xcalloc (lAbsMaxCxns, sizeof(bool)) ;
783 ASSERT (host->cxnSleeping == NULL) ;
784 host->cxnSleeping = xcalloc (lAbsMaxCxns, sizeof(bool)) ;
786 for (i = 0 ; i < lAbsMaxCxns ; i++)
788 host->connections [i] = NULL ;
789 host->cxnActive[i] = false ;
790 host->cxnSleeping[i] = false ;
792 host->connections[lAbsMaxCxns] = NULL;
797 xrealloc (host->connections,
798 sizeof(Connection) * (lAbsMaxCxns + 1));
799 host->cxnActive = xrealloc (host->cxnActive,
800 sizeof(bool) * lAbsMaxCxns) ;
801 host->cxnSleeping = xrealloc (host->cxnSleeping,
802 sizeof(bool) * lAbsMaxCxns) ;
804 if (lAbsMaxCxns > MAXCONLIMIT(host->params->absMaxConnections))
806 for (i = MAXCONLIMIT(host->params->absMaxConnections) ;
807 i < lAbsMaxCxns ; i++)
809 host->connections[i+1] = NULL; /* array always 1 larger */
810 host->cxnActive[i] = false ;
811 host->cxnSleeping[i] = false ;
815 host->params->absMaxConnections = absMaxCxns;
817 /* if maximum was raised, establish the new connexions
818 (but don't start using them).
820 if ( maxCxns > host->maxConnections)
822 i = host->maxConnections ;
823 /* need to set host->maxConnections before cxnWait() */
824 host->maxConnections = maxCxns;
826 while ( i < maxCxns )
828 host->cxnActive [i] = false ;
829 host->cxnSleeping [i] = false ;
830 /* create a new connection */
831 host->connections [i] =
832 newConnection (host, i,
833 host->params->ipName,
834 host->params->articleTimeout,
835 host->params->portNum,
836 host->params->responseTimeout,
837 host->params->closePeriod,
838 host->params->lowPassLow,
839 host->params->lowPassHigh,
840 host->params->lowPassFilter) ;
842 /* connect if low enough numbered, or we were forced to */
843 if ((i < host->params->initialConnections) || makeConnect)
844 cxnConnect (host->connections [i]) ;
846 cxnWait (host->connections [i]) ;
854 * Find a host on the blocked host list
857 static HostHolder FindBlockedHost(const char *name)
859 HostHolder hh = blockedHosts;
861 if ((hh->params) && (hh->params->peerName) &&
862 (strcmp(name,hh->params->peerName) == 0))
869 static void addBlockedHost(HostParams params)
873 hh = xmalloc (sizeof(struct host_holder_s)) ;
874 /* Use this set of params */
878 hh->next = blockedHosts ;
883 * We iterate through the blocked host list and try and reconnect ones
884 * where we couldn't get a lock
886 static void tryBlockedHosts(TimeoutId tid UNUSED , void *data UNUSED )
891 hh = blockedHosts; /* Get start of our queue */
892 blockedHosts = NULL ; /* remove them all from the queue of hosts */
901 if (params && params->peerName)
903 if (findHostByName(params->peerName)!=NULL)
905 /* Wierd, someone's managed to start it when it's on
906 * the blocked list. Just silently discard.
908 freeHostParams(params);
913 nHost = newHost (mainListener, params);
917 addBlockedHost(params);
919 warn ("ME locked cannot setup peer %s", params->peerName) ;
923 d_printf(1,"Unblocked host %s\n",params->peerName);
925 if (params->initialConnections == 0 &&
926 listenerIsDummy(mainListener) /*talk to self*/)
927 notice ("%s config ignored batch mode with initial"
928 " connection count of 0", params->peerName) ;
930 if ( !listenerAddPeer (mainListener,nHost) )
931 die ("failed to add a new peer\n") ;
936 tryBlockedHostsId = prepareSleep(tryBlockedHosts,
937 TRYBLOCKEDHOSTPERIOD, NULL);
942 * Create a new Host object with default parameters. Called by the
946 Host newDefaultHost (InnListener listener,
952 if (FindBlockedHost(name)==NULL)
955 p=newHostParams(defaultParams);
958 /* relies on fact listener and names are null in default*/
959 p->peerName=xstrdup(name);
960 p->ipName=xstrdup(name);
962 h=newHost (listener,p);
965 /* Couldn't get a lock - add to list of blocked peers */
968 warn ("ME locked cannot setup peer %s", p->peerName);
974 h->removeOnReload = true;
976 notice ("ME unconfigured peer %s added", p->peerName) ;
982 * Create a new host and attach the supplied param structure
985 static bool inited = false ;
986 Host newHost (InnListener listener, HostParams p)
990 ASSERT (p->maxChecks > 0) ;
995 atexit (hostCleanup) ;
999 * Once only, init the first blocked host check
1001 if (tryBlockedHostsId==0)
1002 tryBlockedHostsId = prepareSleep(tryBlockedHosts,
1003 TRYBLOCKEDHOSTPERIOD, NULL);
1005 nh = xcalloc (1, sizeof(struct host_s)) ;
1008 nh->listener = listener;
1010 nh->connections = NULL; /* We'll get these allocated later */
1011 nh->cxnActive = NULL;
1012 nh->cxnSleeping = NULL;
1014 nh->activeCxns = 0 ;
1015 nh->sleepingCxns = 0 ;
1017 nh->blockedCxn = NULL ;
1018 nh->notThisCxn = NULL ;
1021 nh->queuedTail = NULL ;
1023 nh->processed = NULL ;
1024 nh->processedTail = NULL ;
1026 nh->deferred = NULL ;
1027 nh->deferredTail = NULL ;
1033 nh->myTape = newTape (nh->params->peerName,
1034 listenerIsDummy (nh->listener)) ;
1035 if (nh->myTape == NULL)
1036 { /* tape couldn't be locked, probably */
1037 free (nh->connections) ;
1038 free (nh->cxnActive) ;
1039 free (nh->cxnSleeping) ;
1042 return NULL ; /* note we don't free up p */
1045 nh->backedUp = false ;
1049 nh->loggedBacklog = false ;
1050 nh->loggedModeOn = false ;
1051 nh->loggedModeOff = false ;
1052 nh->notifiedChangedRemBlckd = false ;
1053 nh->removeOnReload = false ; /* ready for config file reload */
1054 nh->isDynamic = false ;
1056 nh->artsOffered = 0 ;
1057 nh->artsAccepted = 0 ;
1058 nh->artsNotWanted = 0 ;
1059 nh->artsRejected = 0 ;
1060 nh->artsDeferred = 0 ;
1061 nh->artsMissing = 0 ;
1062 nh->artsToTape = 0 ;
1063 nh->artsQueueOverflow = 0 ;
1064 nh->artsCxnDrop = 0 ;
1065 nh->artsHostSleep = 0 ;
1066 nh->artsHostClose = 0 ;
1067 nh->artsFromTape = 0 ;
1068 nh->artsSizeAccepted = 0 ;
1069 nh->artsSizeRejected = 0 ;
1071 nh->artsProcLastPeriod = 0;
1072 nh->secsInLastPeriod = 0;
1073 nh->lastCheckPoint = 0;
1074 nh->lastSentCheckPoint = 0;
1075 nh->lastTotalCheckPoint = 0;
1076 nh->maxCxnChk = true;
1077 nh->lastMaxCxnTime = time(0);
1078 nh->lastChkTime = time(0);
1079 nh->nextCxnTimeChk = 30;
1080 nh->backlogFilter = ((nh->params->dynBacklogLowWaterMark
1081 + nh->params->dynBacklogHighWaterMark)
1082 /200.0 /(1.0-nh->params->dynBacklogFilter));
1084 nh->gArtsOffered = 0 ;
1085 nh->gArtsAccepted = 0 ;
1086 nh->gArtsNotWanted = 0 ;
1087 nh->gArtsRejected = 0 ;
1088 nh->gArtsDeferred = 0 ;
1089 nh->gArtsMissing = 0 ;
1090 nh->gArtsToTape = 0 ;
1091 nh->gArtsQueueOverflow = 0 ;
1092 nh->gArtsCxnDrop = 0 ;
1093 nh->gArtsHostSleep = 0 ;
1094 nh->gArtsHostClose = 0 ;
1095 nh->gArtsFromTape = 0 ;
1096 nh->gArtsSizeAccepted = 0 ;
1097 nh->gArtsSizeRejected = 0 ;
1101 nh->firstConnectTime = 0 ;
1102 nh->connectTime = 0 ;
1108 nh->blQuartile[0] = nh->blQuartile[1] = nh->blQuartile[2] =
1109 nh->blQuartile[3] = 0 ;
1115 nh->maxConnections = 0; /* we currently have no connections allocated */
1117 /* Note that the following will override the initialCxns specified as
1118 maxCxns if we are on non-dyamic feed
1120 hostAlterMaxConnections(nh, nh->params->absMaxConnections,
1121 nh->params->initialConnections, false);
1123 nh->next = gHostList ;
1127 if (maxIpNameLen == 0)
1130 strlcpy (startTime,ctime (&start),sizeof (startTime)) ;
1134 if (strlen (nh->params->ipName) > maxIpNameLen)
1135 maxIpNameLen = strlen (nh->params->ipName) ;
1136 if (strlen (nh->params->peerName) > maxPeerNameLen)
1137 maxPeerNameLen = strlen (nh->params->peerName) ;
1142 struct sockaddr *hostIpAddr (Host host, int family)
1145 struct sockaddr **newIpAddrPtrs = NULL;
1146 struct sockaddr_storage *newIpAddrs = NULL;
1147 struct sockaddr *returnAddr;
1149 ASSERT(host->params != NULL);
1151 /* check to see if need to look up the host name */
1152 if (host->nextIpLookup <= theTime())
1156 struct addrinfo *res, *p;
1157 struct addrinfo hints;
1159 memset(&hints, 0, sizeof(hints));
1160 hints.ai_family = family ? family : AF_UNSPEC;
1161 hints.ai_socktype = SOCK_STREAM;
1162 #ifdef AI_ADDRCONFIG
1163 hints.ai_flags = AI_ADDRCONFIG;
1165 if((gai_ret = getaddrinfo(host->params->ipName, NULL, &hints, &res)) != 0
1168 warn ("%s can't resolve hostname %s: %s", host->params->peerName,
1169 host->params->ipName, gai_ret == 0 ? "no addresses returned"
1170 : gai_strerror(gai_ret)) ;
1174 /* figure number of pointers that need space */
1176 for ( p = res ; p ; p = p->ai_next ) ++i;
1178 newIpAddrPtrs = (struct sockaddr **)
1179 xmalloc ( (i + 1) * sizeof(struct sockaddr *) );
1181 newIpAddrs = (struct sockaddr_storage *)
1182 xmalloc ( i * sizeof(struct sockaddr_storage) );
1185 /* copy the addresses from the getaddrinfo linked list */
1186 for( p = res ; p ; p = p->ai_next )
1188 memcpy( &newIpAddrs[i], p->ai_addr, p->ai_addrlen );
1189 newIpAddrPtrs[i] = (struct sockaddr *)(&newIpAddrs[i]);
1192 newIpAddrPtrs[i] = NULL ;
1193 freeaddrinfo( res );
1196 struct hostent *hostEnt ;
1197 struct in_addr ipAddr;
1199 /* see if the ipName we're given is a dotted quad */
1200 if ( !inet_aton (host->params->ipName,&ipAddr) )
1202 if ((hostEnt = gethostbyname (host->params->ipName)) == NULL)
1204 warn ("%s can't resolve hostname %s: %s", host->params->peerName,
1205 host->params->ipName, hstrerror(h_errno)) ;
1209 /* figure number of pointers that need space */
1210 for (i = 0 ; hostEnt->h_addr_list[i] ; i++)
1213 newIpAddrPtrs = xmalloc ((i + 1) * sizeof(struct sockaddr *));
1214 newIpAddrs = xmalloc (i * sizeof(struct sockaddr_storage));
1216 /* copy the addresses from gethostbyname() static space */
1218 for (i = 0 ; hostEnt->h_addr_list[i] ; i++)
1220 make_sin( (struct sockaddr_in *)(&newIpAddrs[i]),
1221 (struct in_addr *)(hostEnt->h_addr_list[i]) );
1222 newIpAddrPtrs[i] = (struct sockaddr *)(&newIpAddrs[i]);
1224 newIpAddrPtrs[i] = NULL ;
1229 newIpAddrPtrs = (struct sockaddr **)
1230 xmalloc ( 2 * sizeof( struct sockaddr * ) );
1231 newIpAddrs = (struct sockaddr_storage *)
1232 xmalloc ( sizeof( struct sockaddr_storage ) );
1234 make_sin( (struct sockaddr_in *)newIpAddrs, &ipAddr );
1235 newIpAddrPtrs[0] = (struct sockaddr *)newIpAddrs;
1236 newIpAddrPtrs[1] = NULL;
1244 if(host->ipAddrs[0])
1245 free (host->ipAddrs[0]);
1246 free (host->ipAddrs) ;
1248 host->ipAddrs = newIpAddrPtrs ;
1249 host->nextIpAddr = 0 ;
1250 host->nextIpLookup = theTime () + dnsExpPeriod ;
1254 /* failed to setup new addresses */
1255 host->nextIpLookup = theTime () + dnsRetPeriod ;
1260 returnAddr = host->ipAddrs[host->nextIpAddr] ;
1270 * Delete IPv4 addresses from the address list.
1272 void hostDeleteIpv4Addr (Host host)
1278 for (i = 0, j = 0; host->ipAddrs[i]; i++) {
1279 if (host->ipAddrs[i]->sa_family != AF_INET)
1280 host->ipAddrs[j++] = host->ipAddrs[i];
1282 host->ipAddrs[j] = 0;
1283 if (host->nextIpAddr >= j)
1284 host->nextIpAddr = 0;
1289 void hostIpFailed (Host host)
1292 if (host->ipAddrs[++host->nextIpAddr] == NULL)
1293 host->nextIpAddr = 0 ;
1297 void gPrintHostInfo (FILE *fp, unsigned int indentAmt)
1300 char indent [INDENT_BUFFER_SIZE] ;
1303 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1307 fprintf (fp,"%sGlobal Host list : (count %d) {\n",indent,gHostCount) ;
1309 for (h = gHostList ; h != NULL ; h = h->next)
1310 printHostInfo (h,fp,indentAmt + INDENT_INCR) ;
1312 fprintf (fp,"%s}\n",indent) ;
1316 void printHostInfo (Host host, FILE *fp, unsigned int indentAmt)
1318 char indent [INDENT_BUFFER_SIZE] ;
1321 double cnt = (host->blCount) ? (host->blCount) : 1.0;
1323 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1327 fprintf (fp,"%sHost : %p {\n",indent,(void *) host) ;
1331 fprintf (fp,"%s}\n",indent) ;
1335 fprintf (fp,"%s peer-name : %s\n",indent,host->params->peerName) ;
1336 fprintf (fp,"%s ip-name : %s\n",indent,host->params->ipName) ;
1338 if (host->params->family == AF_INET6)
1340 fprintf (fp,"%s bindaddress : none\n",indent);
1345 fprintf (fp,"%s bindaddress : %s\n",indent,
1346 host->params->bindAddr == NULL ||
1347 host->params->bindAddr->sin_addr.s_addr == 0 ? "any" :
1348 inet_ntoa(host->params->bindAddr->sin_addr));
1351 if (host->params->family == AF_INET)
1353 fprintf (fp,"%s bindaddress6 : none\n",indent);
1358 fprintf (fp,"%s bindaddress6 : %s\n",indent,
1359 host->params->bindAddr6 == NULL ? "any" :
1360 inet_ntop(AF_INET6, &host->params->bindAddr6->sin6_addr,
1364 fprintf (fp,"%s abs-max-connections : %d\n",indent,
1365 host->params->absMaxConnections) ;
1366 fprintf (fp,"%s active-connections : %d\n",indent,host->activeCxns) ;
1367 fprintf (fp,"%s sleeping-connections : %d\n",indent,host->sleepingCxns) ;
1368 fprintf (fp,"%s initial-connections : %d\n",indent,
1369 host->params->initialConnections) ;
1370 fprintf (fp,"%s want-streaming : %s\n",indent,
1371 boolToString (host->params->wantStreaming)) ;
1372 fprintf (fp,"%s drop-deferred : %s\n",indent,
1373 boolToString (host->params->dropDeferred)) ;
1374 fprintf (fp,"%s min-queue-connection : %s\n",indent,
1375 boolToString (host->params->minQueueCxn)) ;
1376 fprintf (fp,"%s remote-streams : %s\n",indent,
1377 boolToString (host->remoteStreams)) ;
1378 fprintf (fp,"%s max-checks : %d\n",indent,host->params->maxChecks) ;
1379 fprintf (fp,"%s article-timeout : %d\n",indent,
1380 host->params->articleTimeout) ;
1381 fprintf (fp,"%s response-timeout : %d\n",indent,
1382 host->params->responseTimeout) ;
1383 fprintf (fp,"%s close-period : %d\n",indent,
1384 host->params->closePeriod) ;
1385 fprintf (fp,"%s port : %d\n",indent,host->params->portNum) ;
1386 fprintf (fp,"%s dynamic-method : %d\n",indent,
1387 host->params->dynamicMethod) ;
1388 fprintf (fp,"%s dynamic-backlog-filter : %2.1f\n",indent,
1389 host->params->dynBacklogFilter) ;
1390 fprintf (fp,"%s dynamic-backlog-lwm : %2.1f\n",indent,
1391 host->params->dynBacklogLowWaterMark) ;
1392 fprintf (fp,"%s dynamic-backlog-hwm : %2.1f\n",indent,
1393 host->params->dynBacklogHighWaterMark) ;
1394 fprintf (fp,"%s no-check on : %2.1f\n",indent,
1395 host->params->lowPassHigh) ;
1396 fprintf (fp,"%s no-check off : %2.1f\n",indent,
1397 host->params->lowPassLow) ;
1398 fprintf (fp,"%s no-check filter : %2.1f\n",indent,
1399 host->params->lowPassFilter) ;
1400 fprintf (fp,"%s backlog-limit : %d\n",indent,
1401 host->params->backlogLimit) ;
1402 fprintf (fp,"%s backlog-limit-high : %d\n",indent,
1403 host->params->backlogLimitHigh) ;
1404 fprintf (fp,"%s backlog-factor : %2.1f\n",indent,
1405 host->params->backlogFactor) ;
1406 fprintf (fp,"%s max-connections : %d\n",indent,
1407 host->maxConnections) ;
1408 fprintf (fp,"%s backlog-feed-first : %s\n",indent,
1409 boolToString (host->params->backlogFeedFirst)) ;
1412 fprintf (fp,"%s statistics-id : %d\n",indent,host->statsId) ;
1413 fprintf (fp,"%s ChkCxns-id : %d\n",indent,host->ChkCxnsId) ;
1414 fprintf (fp,"%s deferred-id : %d\n",indent,host->deferredId) ;
1415 fprintf (fp,"%s backed-up : %s\n",indent,boolToString (host->backedUp));
1416 fprintf (fp,"%s backlog : %d\n",indent,host->backlog) ;
1417 fprintf (fp,"%s deferLen : %d\n",indent,host->deferLen) ;
1418 fprintf (fp,"%s loggedModeOn : %s\n",indent,
1419 boolToString (host->loggedModeOn)) ;
1420 fprintf (fp,"%s loggedModeOff : %s\n",indent,
1421 boolToString (host->loggedModeOff)) ;
1422 fprintf (fp,"%s logged-backlog : %s\n",indent,
1423 boolToString (host->loggedBacklog)) ;
1424 fprintf (fp,"%s streaming-type changed : %s\n",indent,
1425 boolToString (host->notifiedChangedRemBlckd)) ;
1426 fprintf (fp,"%s articles offered : %d\n",indent,host->artsOffered) ;
1427 fprintf (fp,"%s articles accepted : %d\n",indent,host->artsAccepted) ;
1428 fprintf (fp,"%s articles not wanted : %d\n",indent,
1429 host->artsNotWanted) ;
1430 fprintf (fp,"%s articles rejected : %d\n",indent,host->artsRejected);
1431 fprintf (fp,"%s articles deferred : %d\n",indent,host->artsDeferred) ;
1432 fprintf (fp,"%s articles missing : %d\n",indent,host->artsMissing) ;
1433 fprintf (fp,"%s articles spooled : %d\n",indent,host->artsToTape) ;
1434 fprintf (fp,"%s because of queue overflow : %d\n",indent,
1435 host->artsQueueOverflow) ;
1436 fprintf (fp,"%s when the we closed the host : %d\n",indent,
1437 host->artsHostClose) ;
1438 fprintf (fp,"%s because the host was asleep : %d\n",indent,
1439 host->artsHostSleep) ;
1440 fprintf (fp,"%s articles unspooled : %d\n",indent,host->artsFromTape) ;
1441 fprintf (fp,"%s articles requeued from dropped connections : %d\n",indent,
1442 host->artsCxnDrop) ;
1444 fprintf (fp,"%s process articles offered : %d\n",indent,
1445 host->gArtsOffered) ;
1446 fprintf (fp,"%s process articles accepted : %d\n",indent,
1447 host->gArtsAccepted) ;
1448 fprintf (fp,"%s process articles not wanted : %d\n",indent,
1449 host->gArtsNotWanted) ;
1450 fprintf (fp,"%s process articles rejected : %d\n",indent,
1451 host->gArtsRejected);
1452 fprintf (fp,"%s process articles deferred : %d\n",indent,
1453 host->gArtsDeferred) ;
1454 fprintf (fp,"%s process articles missing : %d\n",indent,
1455 host->gArtsMissing) ;
1456 fprintf (fp,"%s process articles spooled : %d\n",indent,
1457 host->gArtsToTape) ;
1458 fprintf (fp,"%s because of queue overflow : %d\n",indent,
1459 host->gArtsQueueOverflow) ;
1460 fprintf (fp,"%s when the we closed the host : %d\n",indent,
1461 host->gArtsHostClose) ;
1462 fprintf (fp,"%s because the host was asleep : %d\n",indent,
1463 host->gArtsHostSleep) ;
1464 fprintf (fp,"%s process articles unspooled : %d\n",indent,
1465 host->gArtsFromTape) ;
1466 fprintf (fp,"%s process articles requeued from dropped connections : %d\n",
1467 indent, host->gArtsCxnDrop) ;
1469 fprintf (fp,"%s average (mean) defer length : %.1f\n", indent,
1470 (double) host->dlAccum / cnt) ;
1471 fprintf (fp,"%s average (mean) queue length : %.1f\n", indent,
1472 (double) host->blAccum / cnt) ;
1473 fprintf (fp,"%s percentage of the time empty : %.1f\n", indent,
1474 100.0 * host->blNone / cnt) ;
1475 fprintf (fp,"%s percentage of the time >0%%-25%% : %.1f\n", indent,
1476 100.0 * host->blQuartile[0] / cnt) ;
1477 fprintf (fp,"%s percentage of the time 25%%-50%% : %.1f\n", indent,
1478 100.0 * host->blQuartile[1] / cnt) ;
1479 fprintf (fp,"%s percentage of the time 50%%-75%% : %.1f\n", indent,
1480 100.0 * host->blQuartile[2] / cnt) ;
1481 fprintf (fp,"%s percentage of the time 75%%-<100%% : %.1f\n", indent,
1482 100.0 * host->blQuartile[3] / cnt) ;
1483 fprintf (fp,"%s percentage of the time full : %.1f\n", indent,
1484 100.0 * host->blFull / cnt) ;
1485 fprintf (fp,"%s number of samples : %u\n", indent, host->blCount) ;
1487 fprintf (fp,"%s firstConnectTime : %s",indent,
1488 ctime (&host->firstConnectTime));
1489 fprintf (fp,"%s connectTime : %s",indent,ctime (&host->connectTime));
1490 fprintf (fp,"%s spoolTime : %s",indent,ctime (&host->spoolTime)) ;
1491 fprintf (fp,"%s last-spool-time : %s",indent,
1492 ctime (&host->lastSpoolTime)) ;
1495 fprintf (fp,"%s tape {\n",indent) ;
1496 printTapeInfo (host->myTape,fp,indentAmt + INDENT_INCR) ;
1497 fprintf (fp,"%s }\n",indent) ;
1499 fprintf (fp,"%s tape : %p\n",indent,(void *) host->myTape) ;
1502 fprintf (fp,"%s QUEUED articles {\n",indent) ;
1503 for (qe = host->queued ; qe != NULL ; qe = qe->next)
1506 printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
1508 fprintf (fp,"%s %p\n",indent,(void *) qe->article) ;
1512 fprintf (fp,"%s }\n",indent) ;
1514 fprintf (fp,"%s IN PROCESS articles {\n",indent) ;
1515 for (qe = host->processed ; qe != NULL ; qe = qe->next)
1518 printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
1520 fprintf (fp,"%s %p\n",indent,(void *) qe->article) ;
1524 fprintf (fp,"%s }\n",indent) ;
1525 fprintf (fp,"%s DEFERRED articles {\n",indent) ;
1526 for (qe = host->deferred ; qe != NULL ; qe = qe->next)
1529 printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
1531 fprintf (fp,"%s %p\n",indent,(void *) qe->article) ;
1535 fprintf (fp,"%s }\n",indent) ;
1536 fprintf (fp,"%s DEFERRED articles {\n",indent) ;
1537 for (qe = host->deferred ; qe != NULL ; qe = qe->next)
1540 printArticleInfo (qe->article,fp,indentAmt + INDENT_INCR) ;
1542 fprintf (fp,"%s %p\n",indent,(void *) qe->article) ;
1546 fprintf (fp,"%s }\n",indent) ;
1550 fprintf (fp,"%s Connections {\n",indent) ;
1551 for (i = 0 ; i < host->maxConnections ; i++)
1554 if (host->connections[i] != NULL)
1555 printCxnInfo (*cxn,fp,indentAmt + INDENT_INCR) ;
1557 fprintf (fp,"%s %p\n",indent,(void *) host->connections[i]) ;
1560 fprintf (fp,"%s }\n",indent) ;
1562 fprintf (fp,"%s Active Connections {\n%s ",indent,indent) ;
1563 for (i = 0 ; i < host->maxConnections ; i++)
1564 if (host->cxnActive[i])
1565 fprintf (fp," [%d:%p]",i,(void *) host->connections[i]) ;
1566 fprintf (fp,"\n%s }\n",indent) ;
1568 fprintf (fp,"%s Sleeping Connections {\n%s ",indent,indent) ;
1569 for (i = 0 ; i < host->maxConnections ; i++)
1570 if (host->cxnSleeping[i])
1571 fprintf (fp," [%d:%p]",i,(void *) host->connections[i]) ;
1572 fprintf (fp,"\n%s }\n",indent) ;
1574 fprintf (fp,"%s}\n",indent) ;
1583 /* close down all the connections of the Host. All articles that are in
1584 * processes are still pushed out and then a QUIT is issued. The Host will
1585 * also spool all inprocess articles to tape incase the process is about to
1586 * be killed (they'll be refused next time around). When all Connections
1587 * report that they're gone, then the Host will delete itself.
1589 void hostClose (Host host)
1592 unsigned int cxnCount ;
1594 d_printf (1,"Closing host %s\n",host->params->peerName) ;
1596 queuesToTape (host) ;
1597 delTape (host->myTape) ;
1598 host->myTape = NULL ;
1600 hostLogStats (host,true) ;
1602 clearTimer (host->statsId) ;
1603 clearTimer (host->ChkCxnsId) ;
1604 clearTimer (host->deferredId) ;
1606 host->connectTime = 0 ;
1608 /* when we call cxnTerminate() on the last Connection, the Host objects
1609 will end up getting deleted out from under us (via hostCxnGone()). If
1610 we are running with a malloc that scribbles over memory after freeing
1611 it, then we'd fail in the second for loop test. Trying to access
1612 host->maxConnections. */
1613 for (i = 0, cxnCount = 0 ; i < host->maxConnections ; i++)
1614 cxnCount += (host->connections [i] != NULL ? 1 : 0) ;
1615 for (i = 0 ; i < cxnCount ; i++)
1616 if (host->connections[i] != NULL)
1617 cxnTerminate (host->connections [i]) ;
1622 * check if host should get more connections opened, or some closed...
1624 void hostChkCxns(TimeoutId tid UNUSED, void *data) {
1625 Host host = (Host) data;
1626 unsigned int currArticles, currSentArticles, currTotalArticles, newMaxCxns ;
1627 double lastAPS, currAPS, percentTaken, ratio ;
1628 double backlogRatio, backlogMult;
1630 if(!host->maxCxnChk)
1633 ASSERT(host->params != NULL);
1635 if(host->secsInLastPeriod > 0)
1636 lastAPS = host->artsProcLastPeriod / (host->secsInLastPeriod * 1.0);
1638 lastAPS = host->artsProcLastPeriod * 1.0;
1640 newMaxCxns = host->maxConnections;
1642 currArticles = (host->gArtsAccepted + host->gArtsRejected +
1643 (host->gArtsNotWanted / 4)) - host->lastCheckPoint ;
1645 host->lastCheckPoint = (host->gArtsAccepted + host->gArtsRejected +
1646 (host->gArtsNotWanted / 4));
1648 currSentArticles = host->gArtsAccepted + host->gArtsRejected
1649 - host->lastSentCheckPoint ;
1651 host->lastSentCheckPoint = host->gArtsAccepted + host->gArtsRejected;
1653 currTotalArticles = host->gArtsAccepted + host->gArtsRejected
1654 + host->gArtsRejected + host->gArtsQueueOverflow
1655 - host->lastTotalCheckPoint ;
1657 host->lastTotalCheckPoint = host->gArtsAccepted + host->gArtsRejected
1658 + host->gArtsRejected + host->gArtsQueueOverflow ;
1660 currAPS = currArticles / (host->nextCxnTimeChk * 1.0) ;
1662 percentTaken = currSentArticles * 1.0 /
1663 ((currTotalArticles==0)?1:currTotalArticles);
1665 /* Get how full the queue is currently */
1666 backlogRatio = (host->backlog * 1.0 / hostHighwater);
1667 backlogMult = 1.0/(1.0-host->params->dynBacklogFilter);
1669 d_printf(1,"%s hostChkCxns - entry filter=%3.3f blmult=%3.3f blratio=%3.3f\n",host->params->peerName,host->backlogFilter, backlogMult, backlogRatio);
1671 ratio = 0.0; /* ignore APS by default */
1673 switch (host->params->dynamicMethod)
1675 case METHOD_COMBINED:
1676 /* When a high % of articles is being taken, take notice of the
1677 * APS values. However for smaller %s, quickly start to ignore this
1678 * and concentrate on queue sizes
1680 ratio = percentTaken * percentTaken;
1683 /* backlogFilter is an IIR filtered version of the backlogRatio.
1685 host->backlogFilter *= host->params->dynBacklogFilter;
1686 /* Penalise anything over the backlog HWM twice as severely
1687 * (otherwise we end up feeding some sites constantly
1688 * just below the HWM. This way random noise makes
1689 * such sites jump to one more connection
1691 * Use factor (1-ratio) so if ratio is near 1 we ignore this
1693 if (backlogRatio>host->params->dynBacklogLowWaterMark/100.0)
1694 host->backlogFilter += (backlogRatio+1.0)/2.0 * (1.0-ratio);
1696 host->backlogFilter += backlogRatio * (1.0-ratio);
1699 * Now bump it around for APS too
1701 if ((currAPS - lastAPS) >= 0.1)
1702 host->backlogFilter += ratio*((currAPS - lastAPS) + 1.0);
1703 else if ((currAPS - lastAPS) < -.2)
1704 host->backlogFilter -= ratio;
1706 d_printf(1,"%s hostChkCxns - entry hwm=%3.3f lwm=%3.3f new=%3.3f [%3.3f,%3.3f]\n",
1707 host->params->peerName,host->params->dynBacklogHighWaterMark,
1708 host->params->dynBacklogLowWaterMark,host->backlogFilter,
1709 (host->params->dynBacklogLowWaterMark * backlogMult / 100.0),
1710 (host->params->dynBacklogHighWaterMark * backlogMult / 100.0));
1712 if (host->backlogFilter <
1713 (host->params->dynBacklogLowWaterMark * backlogMult / 100.0))
1715 else if (host->backlogFilter >
1716 (host->params->dynBacklogHighWaterMark * backlogMult / 100.0))
1720 /* well not much to do, just check maxConnection = absMaxConnections */
1721 ASSERT (host->maxConnections == MAXCONLIMIT(host->params->absMaxConnections));
1724 if ((currAPS - lastAPS) >= 0.1)
1725 newMaxCxns += (int)(currAPS - lastAPS) + 1 ;
1726 else if ((currAPS - lastAPS) < -.2)
1731 d_printf(1, "hostChkCxns: Chngs %f\n", currAPS - lastAPS);
1733 if (newMaxCxns < 1) newMaxCxns=1;
1734 if (newMaxCxns > MAXCONLIMIT(host->params->absMaxConnections))
1735 newMaxCxns = MAXCONLIMIT(host->params->absMaxConnections);
1737 if (newMaxCxns != host->maxConnections)
1739 notice ("%s hostChkCxns - maxConnections was %d now %d",
1740 host->params->peerName, host->maxConnections,newMaxCxns);
1742 host->backlogFilter= ((host->params->dynBacklogLowWaterMark
1743 + host->params->dynBacklogHighWaterMark)
1744 /200.0 * backlogMult);
1745 host->artsProcLastPeriod = currArticles ;
1746 host->secsInLastPeriod = host->nextCxnTimeChk ;
1748 /* Alter MaxConnections and in doing so ensure we connect new
1749 cxns immediately if we are adding stuff
1751 hostAlterMaxConnections(host, host->params->absMaxConnections,
1755 if(host->nextCxnTimeChk <= 240) host->nextCxnTimeChk *= 2;
1756 else host->nextCxnTimeChk = 300;
1757 d_printf(1, "prepareSleep hostChkCxns, %d\n", host->nextCxnTimeChk);
1758 host->ChkCxnsId = prepareSleep(hostChkCxns, host->nextCxnTimeChk, host);
1763 * have the Host transmit the Article if possible.
1765 void hostSendArticle (Host host, Article article)
1767 ASSERT(host->params != NULL);
1768 if (host->spoolTime > 0)
1769 { /* all connections are asleep */
1770 host->artsHostSleep++ ;
1771 host->gArtsHostSleep++ ;
1772 host->artsToTape++ ;
1773 host->gArtsToTape++ ;
1775 tapeTakeArticle (host->myTape, article) ;
1779 /* at least one connection is feeding or waiting and there's no backlog */
1780 if (host->queued == NULL)
1784 Connection cxn = NULL ;
1786 extraRef = artTakeRef (article) ; /* the referrence we give away */
1788 /* stick on the queue of articles we've handed off--we're hopeful. */
1789 queueArticle (article,&host->processed,&host->processedTail, 0) ;
1791 if (host->params->minQueueCxn) {
1792 Connection x_cxn = NULL ;
1793 unsigned int x_queue = host->params->maxChecks + 1 ;
1795 for (idx = 0 ; x_queue > 0 && idx < host->maxConnections ; idx++)
1796 if ((cxn = host->connections[idx]) != host->notThisCxn) {
1797 if (!host->cxnActive [idx]) {
1798 if (!host->cxnSleeping [idx]) {
1799 if (cxnTakeArticle (cxn, extraRef)) {
1803 d_printf (1,"%s Inactive connection %d refused an article\n",
1804 host->params->peerName,idx) ;
1807 unsigned int queue = host->params->maxChecks - cxnQueueSpace (cxn) ;
1808 if (queue < x_queue) {
1815 if (x_cxn != NULL && cxnTakeArticle (x_cxn, extraRef)) {
1816 if (x_queue == 0) host->gNoQueue++ ;
1817 else host->gCxnQueue += x_queue ;
1823 /* first we try to give it to one of our active connections. We
1824 simply start at the bottom and work our way up. This way
1825 connections near the end of the list will get closed sooner from
1827 for (idx = 0 ; idx < host->maxConnections ; idx++)
1829 if (host->cxnActive [idx] &&
1830 (cxn = host->connections[idx]) != host->notThisCxn &&
1831 cxnTakeArticle (cxn, extraRef)) {
1832 unsigned int queue = host->params->maxChecks - cxnQueueSpace (cxn) - 1;
1833 if (queue == 0) host->gNoQueue++ ;
1834 else host->gCxnQueue += queue ;
1839 /* Wasn't taken so try to give it to one of the waiting connections. */
1840 for (idx = 0 ; idx < host->maxConnections ; idx++)
1841 if (!host->cxnActive [idx] && !host->cxnSleeping [idx] &&
1842 (cxn = host->connections[idx]) != host->notThisCxn)
1844 if (cxnTakeArticle (cxn, extraRef)) {
1845 unsigned int queue = host->params->maxChecks - cxnQueueSpace (cxn) - 1;
1846 if (queue == 0) host->gNoQueue++ ;
1847 else host->gCxnQueue += queue ;
1850 d_printf (1,"%s Inactive connection %d refused an article\n",
1851 host->params->peerName,idx) ;
1855 /* this'll happen if all connections are feeding and all
1856 their queues are full, or if those not feeding are asleep. */
1857 d_printf (1, "Couldn't give the article to a connection\n") ;
1859 delArticle (extraRef) ;
1861 remArticle (article,&host->processed,&host->processedTail) ;
1862 if (!cxnCheckstate (cxn))
1864 host->artsToTape++ ;
1865 host->gArtsToTape++ ;
1867 tapeTakeArticle (host->myTape,article) ;
1872 /* either all the per connection queues were full or we already had
1873 a backlog, so there was no sense in checking. */
1874 queueArticle (article,&host->queued,&host->queuedTail, 0) ;
1877 backlogToTape (host) ;
1887 * called by the Host's connection when the remote is refusing postings
1888 * from us becasue we're not allowed (banner code 400).
1890 void hostCxnBlocked (Host host, Connection cxn, char *reason)
1892 ASSERT(host->params != NULL);
1897 for (i = 0 ; i < host->maxConnections ; i++)
1898 if (host->connections [i] == cxn)
1899 ASSERT (host->cxnActive [i] == false) ;
1903 if (host->blockedReason == NULL)
1904 host->blockedReason = xstrdup (reason) ;
1906 if (host->activeCxns == 0 && host->spoolTime == 0)
1908 host->blockedCxn = cxn ; /* to limit log notices */
1909 notice ("%s remote cannot accept articles initial: %s",
1910 host->params->peerName, reason) ;
1912 else if (host->activeCxns > 0 && !host->notifiedChangedRemBlckd)
1914 notice ("%s remote cannot accept articles change: %s",
1915 host->params->peerName, reason) ;
1916 host->notifiedChangedRemBlckd = true ;
1918 else if (host->spoolTime != 0 && host->blockedCxn == cxn)
1920 notice ("%s remote cannot accept articles still: %s",
1921 host->params->peerName, reason) ;
1933 * Called by the Connection when it gets a response back to the MODE
1934 * STREAM command. It's now that we consider the connection usable.
1936 void hostRemoteStreams (Host host, Connection cxn, bool doesStreaming)
1940 host->blockedCxn = NULL ;
1941 if (host->blockedReason != NULL)
1942 free (host->blockedReason) ;
1943 host->blockedReason = NULL ;
1945 /* we may have told the connection to quit while it was in the middle
1947 if (amClosing (host))
1950 if (host->connectTime == 0) /* first connection for this cycle. */
1952 if (doesStreaming && host->params->wantStreaming)
1953 notice ("%s remote MODE STREAM", host->params->peerName) ;
1954 else if (doesStreaming)
1955 notice ("%s remote MODE STREAM disabled", host->params->peerName) ;
1957 notice ("%s remote MODE STREAM failed", host->params->peerName) ;
1959 if (host->spoolTime > 0)
1960 hostStopSpooling (host) ;
1962 /* set up the callback for statistics logging. */
1963 if (host->statsId != 0)
1964 clearTimer (host->statsId) ;
1965 host->statsId = prepareSleep (hostStatsTimeoutCbk, statsPeriod, host) ;
1967 if (host->ChkCxnsId != 0)
1968 clearTimer (host->ChkCxnsId);
1969 host->ChkCxnsId = prepareSleep (hostChkCxns, 30, host) ;
1971 host->remoteStreams = (host->params->wantStreaming ? doesStreaming : false) ;
1973 host->connectTime = theTime() ;
1974 if (host->firstConnectTime == 0)
1975 host->firstConnectTime = host->connectTime ;
1977 else if (host->remoteStreams != doesStreaming && host->params->wantStreaming)
1978 notice ("%s remote MODE STREAM change", host->params->peerName) ;
1980 for (i = 0 ; i < host->maxConnections ; i++)
1981 if (host->connections [i] == cxn)
1983 host->cxnActive [i] = true ;
1984 if (host->cxnSleeping [i])
1985 host->sleepingCxns-- ;
1986 host->cxnSleeping [i] = false ;
1990 ASSERT (i != host->maxConnections) ;
1992 host->activeCxns++ ;
2004 * Called by the connection when it is no longer connected to the
2005 * remote. Perhaps due to getting a code 400 to an IHAVE, or due to a
2008 void hostCxnDead (Host host, Connection cxn)
2012 for (i = 0 ; i < host->maxConnections ; i++)
2013 if (host->connections [i] == cxn)
2015 if (host->cxnActive [i]) /* won't be active if got 400 on banner */
2017 host->cxnActive [i] = false ;
2018 host->activeCxns-- ;
2020 if (!amClosing (host) && host->activeCxns == 0)
2022 clearTimer (host->statsId) ;
2023 clearTimer (host->ChkCxnsId) ;
2024 hostLogStats (host,true) ;
2025 host->connectTime = 0 ;
2028 else if (host->cxnSleeping [i]) /* cxnNuke can be called on sleepers */
2030 host->cxnSleeping [i] = false ;
2031 host->sleepingCxns-- ;
2037 ASSERT (i < host->maxConnections) ;
2048 * Called by the Connection when it is going to sleep so the Host won't
2049 * bother trying to give it Articles
2051 void hostCxnSleeping (Host host, Connection cxn)
2055 for (i = 0 ; i < host->maxConnections ; i++)
2056 if (host->connections [i] == cxn)
2058 if (!host->cxnSleeping [i])
2060 host->cxnSleeping [i] = true ;
2061 host->sleepingCxns++ ;
2064 if (host->spoolTime == 0 && host->sleepingCxns >= host->maxConnections)
2065 hostStartSpooling (host) ;
2070 ASSERT (i < host->maxConnections) ;
2082 * Called by the Connection when it goes into the waiting state.
2084 void hostCxnWaiting (Host host, Connection cxn)
2088 for (i = 0 ; i < host->maxConnections ; i++)
2089 if (host->connections [i] == cxn)
2091 if (host->cxnSleeping [i])
2092 host->sleepingCxns-- ;
2093 host->cxnSleeping [i] = false ;
2097 ASSERT (i < host->maxConnections) ;
2099 if (host->spoolTime > 0)
2100 hostStopSpooling (host) ;
2112 * Called by the Connection when it is about to delete itself.
2114 bool hostCxnGone (Host host, Connection cxn)
2117 bool oneThere = false ;
2118 char msgstr[SMBUF] ;
2120 /* forget about the Connection and see if we are still holding any live
2121 connections still. */
2122 for (i = 0 ; i < host->maxConnections ; i++)
2123 if (host->connections [i] == cxn)
2125 if (!amClosing (host))
2127 warn ("%s:%d connection vanishing", host->params->peerName, i) ;
2129 host->connections [i] = NULL ;
2130 if (host->cxnActive [i])
2132 host->cxnActive [i] = false ;
2133 host->activeCxns-- ;
2135 else if (host->cxnSleeping [i])
2137 host->cxnSleeping [i] = false ;
2138 host->sleepingCxns-- ;
2141 else if (host->connections [i] != NULL)
2144 /* remove the host if it has no connexions */
2147 time_t now = theTime() ;
2148 unsigned int hostsLeft ;
2150 if (host->firstConnectTime > 0) {
2151 snprintf(msgstr, sizeof(msgstr), "accsize %.0f rejsize %.0f",
2152 host->gArtsSizeAccepted, host->gArtsSizeRejected);
2153 notice ("%s global seconds %ld offered %d accepted %d refused %d"
2154 " rejected %d missing %d %s spooled %d unspooled %d",
2155 host->params->peerName, (long) (now - host->firstConnectTime),
2156 host->gArtsOffered, host->gArtsAccepted,
2157 host->gArtsNotWanted, host->gArtsRejected,
2158 host->gArtsMissing, msgstr,
2159 host->gArtsToTape, host->gArtsFromTape) ;
2162 hostsLeft = listenerHostGone (host->listener, host) ;
2165 if (hostsLeft == 0) {
2166 snprintf(msgstr, sizeof(msgstr), "accsize %.0f rejsize %.0f",
2167 procArtsSizeAccepted, procArtsSizeRejected);
2168 notice ("ME global seconds %ld offered %ld accepted %ld refused %ld"
2169 " rejected %ld missing %ld %s spooled %ld unspooled %ld",
2170 (long) (now - start),
2171 procArtsOffered, procArtsAccepted,
2172 procArtsNotWanted,procArtsRejected,
2173 procArtsMissing, msgstr,
2174 procArtsToTape, procArtsFromTape) ;
2177 /* return true if that was the last host */
2178 return (hostsLeft == 0 ? true : false) ;
2181 /* return false because there is still at least one host (this one) */
2192 * The connections has offered an article to the remote.
2194 void hostArticleOffered (Host host, Connection cxn UNUSED)
2196 host->artsOffered++ ;
2197 host->gArtsOffered++ ;
2208 * Article was succesfully transferred.
2210 void hostArticleAccepted (Host host, Connection cxn, Article article)
2212 const char *filename = artFileName (article) ;
2213 const char *msgid = artMsgId (article) ;
2214 double len = artSize (article);
2216 d_printf (5,"Article %s (%s) was transferred\n", msgid, filename) ;
2218 host->artsAccepted++ ;
2219 host->gArtsAccepted++ ;
2220 procArtsAccepted++ ;
2221 host->artsSizeAccepted += len ;
2222 host->gArtsSizeAccepted += len ;
2223 procArtsSizeAccepted += len ;
2225 /* host has two references to the article here... the parameter `article'
2228 delArticle (article) ; /* drop the parameter reference */
2230 if (!amClosing (host))
2231 articleGone (host,cxn,article) ; /* and the one in the queue */
2241 * remote said no thanks to an article.
2243 void hostArticleNotWanted (Host host, Connection cxn, Article article)
2245 const char *filename = artFileName (article) ;
2246 const char *msgid = artMsgId (article) ;
2248 d_printf (5,"Article %s (%s) was not wanted\n", msgid, filename) ;
2250 host->artsNotWanted++ ;
2251 host->gArtsNotWanted++ ;
2252 procArtsNotWanted++ ;
2255 /* host has two references to the article here... `article' and the
2258 delArticle (article) ; /* drop the `article' reference */
2260 if (!amClosing (host))
2261 articleGone (host,cxn,article) ; /* and the one in the queue */
2271 * remote rejected the article after it was was transferred
2273 void hostArticleRejected (Host host, Connection cxn, Article article)
2275 const char *filename = artFileName (article) ;
2276 const char *msgid = artMsgId (article) ;
2277 double len = artSize (article);
2279 d_printf (5,"Article %s (%s) was rejected\n", msgid, filename) ;
2281 host->artsRejected++ ;
2282 host->gArtsRejected++ ;
2283 procArtsRejected++ ;
2284 host->artsSizeRejected += len ;
2285 host->gArtsSizeRejected += len ;
2286 procArtsSizeRejected += len ;
2288 /* host has two references to the article here... `article' and the queue */
2290 delArticle (article) ; /* drop the `article' reference */
2292 if (!amClosing (host))
2293 articleGone (host,cxn,article) ;
2303 * The remote wants us to retry the article later.
2305 void hostArticleDeferred (Host host, Connection cxn, Article article)
2307 host->artsDeferred++ ;
2308 host->gArtsDeferred++ ;
2309 procArtsDeferred++ ;
2312 if (!amClosing (host))
2315 int deferTimeout = 5 ; /* XXX - should be tunable */
2316 time_t now = theTime() ;
2318 extraRef = artTakeRef (article) ; /* hold a reference until requeued */
2319 articleGone (host,cxn,article) ; /* drop from the queue */
2321 if (host->deferred == NULL)
2323 if (host->deferredId != 0)
2324 clearTimer (host->deferredId) ;
2325 host->deferredId = prepareSleep (hostDeferredArtCbk, deferTimeout,
2329 queueArticle (article,&host->deferred,&host->deferredTail,
2330 now + deferTimeout) ;
2332 backlogToTape (host) ;
2333 delArticle (extraRef) ;
2336 delArticle(article); /*drop parameter reference if not sent to tape*/
2346 * The Connection is giving the article back to the Host, but it doesn't
2347 * want a new one in return.
2349 void hostTakeBackArticle (Host host, Connection cxn UNUSED, Article article)
2351 if (!amClosing (host))
2355 host->artsCxnDrop++ ;
2356 host->gArtsCxnDrop++ ;
2357 extraRef = artTakeRef (article) ; /* hold a reference until requeued */
2358 articleGone (host,NULL,article) ; /* drop from the queue */
2359 host->notThisCxn = cxn;
2360 hostSendArticle (host, article) ; /* requeue it */
2361 host->notThisCxn = NULL;
2362 delArticle (extraRef) ;
2365 delArticle(article); /*drop parameter reference if not sent to tape*/
2376 * The disk file for the article is no longer valid
2378 void hostArticleIsMissing (Host host, Connection cxn, Article article)
2380 const char *filename = artFileName (article) ;
2381 const char *msgid = artMsgId (article) ;
2383 d_printf (5, "%s article is missing %s %s\n", host->params->peerName, msgid, filename) ;
2385 host->artsMissing++ ;
2386 host->gArtsMissing++ ;
2389 /* host has two references to the article here... `article' and the
2392 delArticle (article) ; /* drop the `article' reference */
2394 if (!amClosing (host))
2395 articleGone (host,cxn,article) ; /* and the one in the queue */
2404 /* The Connection wants something to do. This is called by the Connection
2405 * after it has transferred an article. This is what keeps the pipes full
2406 * of data off the tapes if the input from inn is idle.
2408 bool hostGimmeArticle (Host host, Connection cxn)
2410 Article article = NULL ;
2411 bool gaveSomething = false ;
2412 size_t amtToGive = cxnQueueSpace (cxn) ; /* may be more than one */
2415 if (amClosing (host))
2417 d_printf (5,"%s no article to give due to closing\n",host->params->peerName) ;
2423 d_printf (5,"%s Queue space is zero....\n",host->params->peerName) ;
2425 while (amtToGive > 0)
2428 unsigned int queue = host->params->maxChecks - amtToGive ;
2430 if (host->params->backlogFeedFirst) {
2431 if ((article = getArticle (host->myTape)) != NULL)
2433 else if ((article = remHead (&host->queued,&host->queuedTail)) != NULL)
2439 if ((article = remHead (&host->queued,&host->queuedTail)) != NULL)
2441 else if ((article = getArticle (host->myTape)) != NULL)
2450 tookIt = cxnQueueArticle (cxn,artTakeRef (article)) ;
2452 ASSERT (tookIt == true) ;
2454 if (queue == 0) host->gNoQueue++ ;
2455 else host->gCxnQueue += queue ;
2457 queueArticle (article,&host->processed,&host->processedTail, 0) ;
2460 gaveSomething = true ;
2464 /* go to the tapes */
2465 tookIt = cxnQueueArticle (cxn,artTakeRef (article)) ;
2467 ASSERT (tookIt == true) ;
2469 if (queue == 0) host->gNoQueue++ ;
2470 else host->gCxnQueue += queue ;
2472 host->artsFromTape++ ;
2473 host->gArtsFromTape++ ;
2474 procArtsFromTape++ ;
2475 queueArticle (article,&host->processed,&host->processedTail, 0) ;
2478 gaveSomething = true ;
2483 /* we had nothing left to give... */
2485 if (host->processed == NULL) /* and if nothing outstanding... */
2486 listenerHostIsIdle (host->listener,host) ; /* tell our owner */
2494 return gaveSomething ;
2504 * get the name that INN uses for this host
2506 const char *hostPeerName (Host host)
2508 ASSERT (host != NULL) ;
2510 return host->params->peerName ;
2514 * get the IPv4 bindaddress
2516 const struct sockaddr_in *hostBindAddr (Host host)
2518 ASSERT (host != NULL) ;
2520 return host->params->bindAddr ;
2525 * get the IPv6 bindaddress
2527 const struct sockaddr_in6 *hostBindAddr6 (Host host)
2529 ASSERT (host != NULL) ;
2531 return host->params->bindAddr6 ;
2535 * get the address family
2537 int hostAddrFamily (Host host)
2539 ASSERT (host != NULL) ;
2541 return host->params->family ;
2546 * get the username and password for authentication
2548 const char *hostUsername (Host host)
2550 ASSERT (host != NULL) ;
2552 return host->params->username ;
2554 const char *hostPassword (Host host)
2556 ASSERT (host != NULL) ;
2558 return host->params->password ;
2562 /* return true if the Connections for this host should attempt to do
2564 bool hostWantsStreaming (Host host)
2566 return host->params->wantStreaming ;
2569 unsigned int hostMaxChecks (Host host)
2571 return host->params->maxChecks ;
2574 bool hostDropDeferred (Host host)
2576 return host->params->dropDeferred ;
2585 /**********************************************************************/
2586 /** CLASS FUNCTIONS **/
2587 /**********************************************************************/
2590 * Set the state of whether each Connection is told to log its stats when
2591 * its controlling Host logs its stats.
2593 void hostLogConnectionStats (bool val)
2595 logConnectionStats = val ;
2599 bool hostLogConnectionStatsP (void)
2601 return logConnectionStats ;
2607 * Called by one of the Host's Connection's when it (the Connection)
2608 * switches into or out of no-CHECK mode.
2610 void hostLogNoCheckMode (Host host, bool on, double low, double cur, double high)
2612 if (on && host->loggedModeOn == false)
2614 notice ("%s mode no-CHECK entered [%.2f,%.2f,%.2f]",
2615 host->params->peerName, low, cur, high) ;
2616 host->loggedModeOn = true ;
2618 else if (!on && host->loggedModeOff == false)
2620 notice ("%s mode no-CHECK exited [%.2f,%.2f,%.2f]",
2621 host->params->peerName, low, cur, high) ;
2622 host->loggedModeOff = true ;
2628 void hostSetStatusFile (const char *filename)
2632 if (filename == NULL)
2633 die ("Can't set status file name with a NULL filename\n") ;
2634 else if (*filename == '\0')
2635 die ("Can't set status file name with a empty string\n") ;
2637 if (*filename == '/')
2638 statusFile = xstrdup (filename) ;
2640 statusFile = concatpath (innconf->pathlog,filename) ;
2642 if ((fp = fopen (statusFile,"w")) == NULL)
2644 syslog (LOG_ERR,"Status file is not a valid pathname: %s",
2653 void gHostStats (void)
2656 time_t now = theTime() ;
2657 char msgstr[SMBUF] ;
2659 for (h = gHostList ; h != NULL ; h = h->next)
2660 if (h->firstConnectTime > 0) {
2661 snprintf(msgstr, sizeof(msgstr), "accsize %.0f rejsize %.0f",
2662 h->gArtsSizeAccepted, h->gArtsSizeRejected);
2663 notice ("%s global seconds %ld offered %d accepted %d refused %d"
2664 " rejected %d missing %d %s spooled %d unspooled %d",
2665 h->params->peerName,
2666 (long) (now - h->firstConnectTime),
2667 h->gArtsOffered, h->gArtsAccepted,
2668 h->gArtsNotWanted, h->gArtsRejected,
2669 h->gArtsMissing, msgstr,
2670 h->gArtsToTape, h->gArtsFromTape) ;
2676 /**********************************************************************/
2677 /** PRIVATE FUNCTIONS **/
2678 /**********************************************************************/
2684 #define NO_INHERIT 0
2687 static HostParams hostDetails (scope *s,
2693 int bv, vival, inherit ;
2699 p=newHostParams(isDefault?NULL:defaultParams);
2703 ASSERT (name==NULL);
2709 p->peerName=xstrdup(name);
2714 if (getString (s,IP_NAME,&q,NO_INHERIT))
2717 p->ipName = xstrdup (name) ;
2720 if (getString (s,"username",&q,NO_INHERIT))
2722 if (getString (s,"password",&q,NO_INHERIT))
2725 if (p->username != NULL && p->password == NULL)
2726 logOrPrint (LOG_ERR,fp,"cannot find password for %s",p->peerName);
2727 if (p->username == NULL && p->password != NULL)
2728 logOrPrint (LOG_ERR,fp,"cannot find username for %s",p->peerName);
2733 if (getString(s,"bindaddress6",&q,isDefault?NO_INHERIT:INHERIT))
2735 struct addrinfo *res, hints;
2737 if (strcmp(q, "none") == 0)
2738 p->family = AF_INET;
2739 else if (p->family == AF_INET)
2742 if (strcmp(q, "any") != 0 && strcmp(q, "all") != 0 &&
2743 strcmp(q, "none") != 0)
2745 memset( &hints, 0, sizeof( hints ) );
2746 hints.ai_flags = AI_NUMERICHOST;
2747 if( getaddrinfo( q, NULL, &hints, &res ) )
2749 logOrPrint (LOG_ERR, fp,
2750 "unable to determine IPv6 bind address for %s",
2755 p->bindAddr6 = (struct sockaddr_in6 *) xmalloc (res->ai_addrlen);
2756 memcpy( p->bindAddr6, res->ai_addr, res->ai_addrlen );
2762 if (getString(s,"bindaddress",&q,isDefault?NO_INHERIT:INHERIT))
2764 struct in_addr addr ;
2767 if (strcmp(q, "none") == 0) {
2769 logOrPrint (LOG_ERR,fp,"cannot set both bindaddress and bindaddress6"
2770 " to \"none\" -- ignoring them for %s",p->peerName);
2773 p->family = AF_INET6;
2775 } else if (p->family == AF_INET6)
2779 if (strcmp(q, "any") != 0 && strcmp(q, "all") != 0 &&
2780 strcmp(q, "none") != 0)
2782 if (!inet_aton(q,&addr))
2784 logOrPrint (LOG_ERR, fp,
2785 "unable to determine IPv4 bind address for %s",
2790 p->bindAddr = (struct sockaddr_in *)
2791 xmalloc (sizeof(struct sockaddr_in));
2792 make_sin( (struct sockaddr_in *)p->bindAddr, &addr );
2797 /* check required global defaults are there and have good values */
2800 #define GETINT(sc,f,n,min,max,req,val,inh) \
2801 vival = validateInteger(f,n,min,max,req,val,sc,inh); \
2802 if (isDefault) do{ \
2803 if(vival==VALUE_WRONG_TYPE) \
2805 logOrPrint(LOG_CRIT,fp,"cannot continue"); \
2808 else if(vival != VALUE_OK) \
2812 getInteger (sc,n,&iv,inh) ; \
2813 val = (unsigned int) iv ;
2815 #define GETREAL(sc,f,n,min,max,req,val,inh) \
2816 vival = validateReal(f,n,min,max,req,val,sc,inh); \
2817 if (isDefault) do{ \
2818 if(vival==VALUE_WRONG_TYPE) \
2820 logOrPrint(LOG_CRIT,fp,"cannot continue"); \
2823 else if(vival != VALUE_OK) \
2827 getReal (sc,n,&rv,inh) ; \
2830 #define GETBOOL(sc,f,n,req,val,inh) \
2831 vival = validateBool(f,n,req,val,sc,inh); \
2832 if (isDefault) do{ \
2833 if(vival==VALUE_WRONG_TYPE) \
2835 logOrPrint(LOG_CRIT,fp,"cannot continue"); \
2838 else if(vival != VALUE_OK) \
2842 getBool (sc,n,&bv,inh) ; \
2843 val = (bv ? true : false);
2845 inherit = isDefault?NO_INHERIT:INHERIT;
2846 GETINT(s,fp,"article-timeout",0,LONG_MAX,REQ,p->articleTimeout, inherit);
2847 GETINT(s,fp,"response-timeout",0,LONG_MAX,REQ,p->responseTimeout, inherit);
2848 GETINT(s,fp,"close-period",0,LONG_MAX,REQ,p->closePeriod, inherit);
2849 GETINT(s,fp,"initial-connections",0,LONG_MAX,REQ,p->initialConnections, inherit);
2850 GETINT(s,fp,"max-connections",0,LONG_MAX,REQ,p->absMaxConnections, inherit);
2851 GETINT(s,fp,"max-queue-size",1,LONG_MAX,REQ,p->maxChecks, inherit);
2852 GETBOOL(s,fp,"streaming",REQ,p->wantStreaming, inherit);
2853 GETBOOL(s,fp,"drop-deferred",REQ,p->dropDeferred, inherit);
2854 GETBOOL(s,fp,"min-queue-connection",REQ,p->minQueueCxn, inherit);
2855 GETREAL(s,fp,"no-check-high",0.0,100.0,REQ,p->lowPassHigh, inherit);
2856 GETREAL(s,fp,"no-check-low",0.0,100.0,REQ,p->lowPassLow, inherit);
2857 GETREAL(s,fp,"no-check-filter",0.1,DBL_MAX,REQ,p->lowPassFilter, inherit);
2858 GETINT(s,fp,"port-number",0,LONG_MAX,REQ,p->portNum, inherit);
2859 GETINT(s,fp,"backlog-limit",0,LONG_MAX,REQ,p->backlogLimit, inherit);
2862 GETBOOL(s,fp,"force-ipv4",NOTREQ,p->forceIPv4,inherit);
2864 p->family = AF_INET;
2867 if (findValue (s,"backlog-factor",inherit) == NULL &&
2868 findValue (s,"backlog-limit-high",inherit) == NULL)
2870 logOrPrint (LOG_ERR,fp,
2871 "ME config: must define at least one of backlog-factor"
2872 " and backlog-limit-high. Adding %s: %f", "backlog-factor",
2874 addReal (s,"backlog-factor",LIMIT_FUDGE) ;
2878 GETBOOL(s,fp,"backlog-feed-first",NOTREQ,p->backlogFeedFirst, inherit);
2880 /* Innfeed should emit a warning if backlog-feed-first is set
2881 to "true" for any peer that doesn't have max-connections and
2882 initial-connections both set to "1" */
2883 if ((p->backlogFeedFirst)
2884 && ((p->initialConnections <= 1) || (p->absMaxConnections != 1)))
2886 if (p->peerName != NULL)
2887 logOrPrint (LOG_WARNING,fp,
2888 "ME config: innfeed will make more than one connection"
2889 " to peer %s, but backlog-feed-first is set", p->peerName);
2891 logOrPrint (LOG_WARNING,fp,
2892 "ME config: innfeed will make more than one connection"
2893 " to peer, but backlog-feed-first is set");
2896 GETINT(s,fp,"backlog-limit-high",0,LONG_MAX,NOTREQNOADD,p->backlogLimitHigh, inherit);
2897 GETREAL(s,fp,"backlog-factor",1.0,DBL_MAX,NOTREQNOADD,p->backlogFactor, inherit);
2899 GETINT(s,fp,"dynamic-method",0,3,REQ,p->dynamicMethod, inherit);
2900 GETREAL(s,fp,"dynamic-backlog-filter",0.0,DBL_MAX,REQ,p->dynBacklogFilter, inherit);
2901 GETREAL(s,fp,"dynamic-backlog-low",0.0,100.0,REQ,p->dynBacklogLowWaterMark, inherit);
2902 GETREAL(s,fp,"dynamic-backlog-high",0.0,100.0,REQ,p->dynBacklogHighWaterMark, inherit);
2908 logOrPrint (LOG_ERR,fp,
2909 "ME config: no-check-low value greater than no-check-high"
2910 " (%f vs %f). Setting to %f and %f", l, h, NOCHECKLOW,
2913 v = findValue (s,"no-check-low",NO_INHERIT) ;
2914 v->v.real_val = p->lowPassLow = NOCHECKLOW ;
2915 v = findValue (s,"no-check-high",NO_INHERIT) ;
2916 v->v.real_val = p->lowPassHigh = NOCHECKHIGH ;
2918 else if (h - l < 5.0)
2919 logOrPrint (LOG_WARNING,fp,
2920 "ME config: no-check-low and no-check-high are close"
2921 " together (%f vs %f)",l,h) ;
2929 static HostParams getHostInfo (void)
2931 static int idx = 0 ;
2936 bool isGood = false ;
2938 if (topScope == NULL)
2941 while ((v = getNextPeer (&idx)) != NULL)
2946 s = v->v.scope_val ;
2948 p=hostDetails(s,v->name,false,NULL);
2956 idx = 0 ; /* start over next time around */
2963 * fully delete and clean up the Host object.
2965 void delHost (Host host)
2969 for (h = gHostList, q = NULL ; h != NULL ; q = h, h = h->next)
2973 gHostList = gHostList->next ;
2979 ASSERT (h != NULL) ;
2981 delTape (host->myTape) ;
2983 free (host->connections) ;
2984 free (host->cxnActive) ;
2985 free (host->cxnSleeping) ;
2986 free (host->params->peerName) ;
2987 free (host->params->ipName) ;
2991 if(host->ipAddrs[0])
2992 free (host->ipAddrs[0]);
2993 free (host->ipAddrs) ;
3002 static Host findHostByName (char *name)
3006 for (h = gHostList; h != NULL; h = h->next)
3007 if ( strcmp(h->params->peerName, name) == 0 )
3016 * the article can be dropped from the process queue and the connection can
3017 * take a new article if there are any to be had.
3019 static void articleGone (Host host, Connection cxn, Article article)
3021 if ( !remArticle (article,&host->processed,&host->processedTail) )
3022 die ("remArticle in articleGone failed") ;
3024 delArticle (article) ;
3027 hostGimmeArticle (host,cxn) ; /* may not give anything over */
3037 * One of the Connections for this Host has reestablished itself, so stop
3038 * spooling article info to disk.
3040 static void hostStopSpooling (Host host)
3042 ASSERT (host->spoolTime != 0) ;
3044 clearTimer (host->statsId) ;
3045 hostLogStats (host,true) ;
3047 host->spoolTime = 0 ;
3057 * No connections are active and we're getting response 201 or 400 (or some
3058 * such) so that we need to start spooling article info to disk.
3060 static void hostStartSpooling (Host host)
3062 ASSERT (host->spoolTime == 0) ;
3064 queuesToTape (host) ;
3066 hostLogStats (host,true) ;
3068 host->spoolTime = theTime() ;
3069 if (host->firstConnectTime == 0)
3070 host->firstConnectTime = host->spoolTime ;
3072 /* don't want to log too frequently */
3073 if (SPOOL_LOG_PERIOD > 0 &&
3074 (host->spoolTime - host->lastSpoolTime) > SPOOL_LOG_PERIOD)
3076 notice ("%s spooling no active connections", host->params->peerName) ;
3077 host->lastSpoolTime = host->spoolTime ;
3080 host->connectTime = 0 ;
3082 host->notifiedChangedRemBlckd = false ;
3084 clearTimer (host->statsId) ;
3085 host->statsId = prepareSleep (hostStatsTimeoutCbk, statsPeriod, host) ;
3095 * Time to log the statistics for the Host. If FINAL is true then the
3096 * counters will be reset.
3098 static void hostLogStats (Host host, bool final)
3100 time_t now = theTime() ;
3101 time_t *startPeriod ;
3102 double cnt = (host->blCount) ? (host->blCount) : 1.0;
3103 char msgstr[SMBUF] ;
3105 if (host->spoolTime == 0 && host->connectTime == 0)
3106 return ; /* host has never connected and never started spooling*/
3108 startPeriod = (host->spoolTime != 0 ? &host->spoolTime : &host->connectTime);
3110 if (now - *startPeriod >= statsResetPeriod)
3113 if (host->spoolTime != 0)
3114 notice ("%s %s seconds %ld spooled %d on_close %d sleeping %d",
3115 host->params->peerName, (final ? "final" : "checkpoint"),
3116 (long) (now - host->spoolTime), host->artsToTape,
3117 host->artsHostClose, host->artsHostSleep) ;
3119 snprintf(msgstr, sizeof(msgstr), "accsize %.0f rejsize %.0f",
3120 host->artsSizeAccepted, host->artsSizeRejected);
3121 notice ("%s %s seconds %ld offered %d accepted %d refused %d rejected %d"
3122 " missing %d %s spooled %d on_close %d unspooled %d"
3123 " deferred %d/%.1f requeued %d"
3124 " queue %.1f/%d:%.0f,%.0f,%.0f,%.0f,%.0f,%.0f",
3125 host->params->peerName, (final ? "final" : "checkpoint"),
3126 (long) (now - host->connectTime),
3127 host->artsOffered, host->artsAccepted,
3128 host->artsNotWanted, host->artsRejected,
3129 host->artsMissing, msgstr,
3131 host->artsHostClose, host->artsFromTape,
3132 host->artsDeferred, (double)host->dlAccum/cnt,
3134 (double)host->blAccum/cnt, hostHighwater,
3135 (100.0*host->blNone)/cnt,
3136 (100.0*host->blQuartile[0])/cnt, (100.0*host->blQuartile[1])/cnt,
3137 (100.0*host->blQuartile[2])/cnt, (100.0*host->blQuartile[3])/cnt,
3138 (100.0*host->blFull)/cnt) ;
3141 if (logConnectionStats)
3145 for (i = 0 ; i < host->maxConnections ; i++)
3146 if (host->connections [i] != NULL && host->cxnActive [i])
3147 cxnLogStats (host->connections [i],final) ;
3150 /* one 'spooling backlog' message per stats logging period */
3151 host->loggedBacklog = false ;
3152 host->loggedModeOn = host->loggedModeOff = false ;
3156 host->artsOffered = 0 ;
3157 host->artsAccepted = 0 ;
3158 host->artsNotWanted = 0 ;
3159 host->artsRejected = 0 ;
3160 host->artsDeferred = 0 ;
3161 host->artsMissing = 0 ;
3162 host->artsToTape = 0 ;
3163 host->artsQueueOverflow = 0 ;
3164 host->artsCxnDrop = 0 ;
3165 host->artsHostSleep = 0 ;
3166 host->artsHostClose = 0 ;
3167 host->artsFromTape = 0 ;
3168 host->artsSizeAccepted = 0 ;
3169 host->artsSizeRejected = 0 ;
3171 *startPeriod = theTime () ; /* in of case STATS_RESET_PERIOD */
3174 /* reset these each log period */
3177 host->blQuartile[0] = host->blQuartile[1] = host->blQuartile[2] =
3178 host->blQuartile[3] = 0;
3184 /* XXX turn this section on to get a snapshot at each log period. */
3185 if (gPrintInfo != NULL)
3198 convsize(double size, char **tsize)
3201 static char tTB[]="TB";
3202 static char tGB[]="GB";
3203 static char tMB[]="MB";
3204 static char tKB[]="KB";
3205 static char tB []="B";
3207 if (size/((double)1024*1024*1024*1000)>=1.) {
3208 dsize=size/((double)1024*1024*1024*1024);
3210 } else if (size/(1024*1024*1000)>=1.) {
3211 dsize=size/(1024*1024*1024);
3213 } else if (size/(1024*1000)>=1.) {
3214 dsize=size/(1024*1024);
3216 } else if (size/1000>=1.) {
3228 * Log the status of the Hosts.
3230 extern char *versionInfo ;
3231 static void hostLogStatus (void)
3235 bool anyToLog = false ;
3236 unsigned int peerNum = 0, actConn = 0, slpConn = 0, maxcon = 0 ;
3237 static bool logged = false ;
3238 static bool flogged = false ;
3240 if (statusFile == NULL && !logged)
3242 syslog (LOG_ERR,"No status file to write to") ;
3249 for (h = gHostList ; h != NULL ; h = h->next)
3250 if (h->myTape != NULL) /* the host deletes its tape when it's closing */
3254 actConn += h->activeCxns ;
3255 slpConn += h->sleepingCxns ;
3256 maxcon += h->maxConnections ;
3262 lastStatusLog = theTime() ;
3264 TMRstart(TMR_STATUSFILE);
3265 if ((fp = fopen (statusFile,"w")) == NULL)
3268 syswarn ("ME oserr status file open: %s", statusFile) ;
3273 char timeString [30] ;
3277 double size, totalsize;
3283 sec = (long) (now - start) ;
3284 strlcpy (timeString,ctime (&now),sizeof (timeString)) ;
3288 fprintf (fp, "<HTML>\n"
3290 "<META HTTP-EQUIV=\"Refresh\" CONTENT=\"300;\">\n"
3294 fprintf (fp, "<PRE>\n");
3297 fprintf (fp,"%s\npid %d started %s\nUpdated: %s",
3298 versionInfo,(int) myPid,startTime,timeString) ;
3299 fprintf (fp,"(peers: %d active-cxns: %d sleeping-cxns: %d idle-cxns: %d)\n\n",
3300 peerNum, actConn, slpConn,(maxcon - (actConn + slpConn))) ;
3302 fprintf (fp,"Configuration file: %s\n\n",configFile) ;
3306 fprintf (fp, "</PRE>\n");
3307 fprintf (fp,"<UL>\n");
3308 for (h = gHostList ; h != NULL ; h = h->next)
3309 fprintf (fp,"<LI><A href=\"#%s\">%s</A></LI>\n",
3310 h->params->peerName, h->params->peerName);
3311 fprintf (fp,"</UL>\n\n");
3312 fprintf (fp,"<PRE>\n");
3315 mainLogStatus (fp) ;
3316 listenerLogStatus (fp) ;
3319 Default peer configuration parameters:
3320 article timeout: 600 initial connections: 1
3321 response timeout: 300 max connections: 5
3322 close period: 6000 max checks: 25
3323 want streaming: true dynamic method: 1
3324 no-check on: 95.0% dynamic backlog low: 25%
3325 no-check off: 90.0% dynamic backlog high: 50%
3326 no-check filter: 50.0 dynamic backlog filter: 0.7
3327 backlog low limit: 1024 port num: 119
3328 backlog high limit: 1280 backlog feed first: false
3331 fprintf(fp,"%sDefault peer configuration parameters:%s\n",
3332 genHtml ? "<B>" : "", genHtml ? "</B>" : "") ;
3333 fprintf(fp," article timeout: %-5d initial connections: %d\n",
3334 defaultParams->articleTimeout,
3335 defaultParams->initialConnections) ;
3336 fprintf(fp," response timeout: %-5d max connections: %d\n",
3337 defaultParams->responseTimeout,
3338 defaultParams->absMaxConnections) ;
3339 fprintf(fp," close period: %-5d max checks: %d\n",
3340 defaultParams->closePeriod,
3341 defaultParams->maxChecks) ;
3342 fprintf(fp," want streaming: %-5s dynamic method: %d\n",
3343 defaultParams->wantStreaming ? "true " : "false",
3344 defaultParams->dynamicMethod) ;
3345 fprintf(fp," no-check on: %-2.1f%% dynamic backlog low: %-2.1f%%\n",
3346 defaultParams->lowPassHigh,
3347 defaultParams->dynBacklogLowWaterMark) ;
3348 fprintf(fp," no-check off: %-2.1f%% dynamic backlog high: %-2.1f%%\n",
3349 defaultParams->lowPassLow,
3350 defaultParams->dynBacklogHighWaterMark) ;
3351 fprintf(fp," no-check filter: %-2.1f dynamic backlog filter: %-2.1f\n",
3352 defaultParams->lowPassFilter,
3353 defaultParams->dynBacklogFilter) ;
3354 fprintf(fp," backlog limit low: %-7d drop-deferred: %s\n",
3355 defaultParams->backlogLimit,
3356 defaultParams->dropDeferred ? "true " : "false");
3357 fprintf(fp," backlog limit high: %-7d min-queue-cxn: %s\n",
3358 defaultParams->backlogLimitHigh,
3359 defaultParams->minQueueCxn ? "true " : "false");
3360 fprintf(fp," backlog feed first: %s\n",
3361 defaultParams->backlogFeedFirst ? "true " : "false");
3362 fprintf(fp," backlog factor: %1.1f\n\n",
3363 defaultParams->backlogFactor);
3365 tapeLogGlobalStatus (fp) ;
3368 fprintf(fp,"%sglobal (process)%s\n",
3369 genHtml ? "<B>" : "", genHtml ? "</B>" : "") ;
3371 fprintf (fp, " seconds: %ld\n", sec) ;
3372 if (sec == 0) sec = 1 ;
3373 offered = procArtsOffered ? procArtsOffered : 1 ;
3374 totalsize = procArtsSizeAccepted+procArtsSizeRejected ;
3375 if (totalsize == 0) totalsize = 1. ;
3377 fprintf (fp, " offered: %-5ld\t%6.2f art/s\n",
3379 (double)procArtsOffered/sec) ;
3380 fprintf (fp, " accepted: %-5ld\t%6.2f art/s\t%5.1f%%\n",
3382 (double)procArtsAccepted/sec,
3383 (double)procArtsAccepted*100./offered) ;
3384 fprintf (fp, " refused: %-5ld\t%6.2f art/s\t%5.1f%%\n",
3386 (double)procArtsNotWanted/sec,
3387 (double)procArtsNotWanted*100./offered) ;
3388 fprintf (fp, " rejected: %-5ld\t%6.2f art/s\t%5.1f%%\n",
3390 (double)procArtsRejected/sec,
3391 (double)procArtsRejected*100./offered) ;
3392 fprintf (fp, " missing: %-5ld\t%6.2f art/s\t%5.1f%%\n",
3394 (double)procArtsMissing/sec,
3395 (double)procArtsMissing*100./offered) ;
3396 fprintf (fp, " deferred: %-5ld\t%6.2f art/s\t%5.1f%%\n",
3398 (double)procArtsDeferred/sec,
3399 (double)procArtsDeferred*100./offered) ;
3401 size=convsize(procArtsSizeAccepted, &tsize);
3402 fprintf (fp, "accpt size: %.3g %s", size, tsize) ;
3403 size=convsize(procArtsSizeAccepted/sec, &tsize);
3404 fprintf (fp, " \t%6.3g %s/s\t%5.1f%%\n",
3406 procArtsSizeAccepted*100./totalsize) ;
3408 size=convsize(procArtsSizeRejected, &tsize);
3409 fprintf (fp, "rejct size: %.3g %s", size, tsize) ;
3410 size=convsize(procArtsSizeRejected/sec, &tsize);
3411 fprintf (fp, " \t%6.3g %s/s\t%5.1f%%\n",
3413 procArtsSizeRejected*100./totalsize) ;
3417 for (h = gHostList ; h != NULL ; h = h->next)
3418 hostPrintStatus (h,fp) ;
3422 fprintf (fp,"</PRE>\n") ;
3423 fprintf (fp,"</BODY>\n") ;
3424 fprintf (fp,"</HTML>\n") ;
3429 TMRstop(TMR_STATUSFILE);
3433 * This prints status information for each host. An example of the
3434 * format of the output is:
3437 * seconds: 351 art. timeout: 400 ip name: foo.bar
3438 * offered: 1194 resp. timeout: 240 port: 119
3439 * accepted: 178 want streaming: yes active cxns: 6
3440 * refused: 948 is streaming: yes sleeping cxns: 0
3441 * rejected: 31 max checks: 25 initial cxns: 5
3442 * missing: 0 no-check on: 95.0% idle cxns: 4
3443 * deferred: 0 no-check off: 95.0% max cxns: 8/10
3444 * requeued: 0 no-check fltr: 50.0 queue length: 0.0/200
3445 * spooled: 0 dynamic method: 0 empty: 100.0%
3446 *[overflow]: 0 dyn b'log low: 25% >0%-25%: 0.0%
3447 *[on_close]: 0 dyn b'log high: 50% 25%-50%: 0.0%
3448 *[sleeping]: 0 dyn b'log stat: 37% 50%-75%: 0.0%
3449 * unspooled: 0 dyn b'log fltr: 0.7 75%-<100%: 0.0%
3450 * no queue: 1234 avr.cxns queue: 0.0 full: 0.0%
3451 *accpt size: 121.1 MB drop-deferred: false defer length: 0
3452 *rejct size: 27.1 MB min-queue-cxn: false
3453 * backlog low limit: 1000000
3454 * backlog high limit: 2000000 (factor 2.0)
3455 * backlog shrinkage: 0 bytes (from current file)
3456 * offered: 1.13 art/s accepted: 0.69 art/s (101.71 KB/s)
3457 * refused: 0.01 art/s rejected: 0.42 art/s (145.11 KB/s)
3458 * missing 0 spooled 0
3461 static void hostPrintStatus (Host host, FILE *fp)
3463 time_t now = theTime() ;
3464 double cnt = (host->blCount) ? (host->blCount) : 1.0;
3467 char buf[]="1234.1 MB";
3469 ASSERT (host != NULL) ;
3470 ASSERT (fp != NULL) ;
3473 fprintf (fp,"<A name=\"%s\"><B>%s</B></A>",host->params->peerName,
3474 host->params->peerName);
3476 fprintf (fp,"%s",host->params->peerName);
3478 if (host->blockedReason != NULL)
3479 fprintf (fp," (remote status: ``%s'')",host->blockedReason) ;
3483 fprintf (fp, " seconds: %-7ld art. timeout: %-5d ip name: %s\n",
3484 host->firstConnectTime > 0 ? (long)(now - host->firstConnectTime) : 0,
3485 host->params->articleTimeout, host->params->ipName) ;
3487 fprintf (fp, " offered: %-7ld resp. timeout: %-5d port: %d\n",
3488 (long) host->gArtsOffered, host->params->responseTimeout,
3489 host->params->portNum);
3491 fprintf (fp, " accepted: %-7ld want streaming: %s active cxns: %d\n",
3492 (long) host->gArtsAccepted,
3493 (host->params->wantStreaming ? "yes" : "no "),
3496 fprintf (fp, " refused: %-7ld is streaming: %s sleeping cxns: %d\n",
3497 (long) host->gArtsNotWanted,
3498 (host->remoteStreams ? "yes" : "no "),
3499 host->sleepingCxns) ;
3501 fprintf (fp, " rejected: %-7ld max checks: %-5d initial cxns: %d\n",
3502 (long) host->gArtsRejected, host->params->maxChecks,
3503 host->params->initialConnections) ;
3505 fprintf (fp, " missing: %-7ld no-check on: %-3.1f%% idle cxns: %d\n",
3506 (long) host->gArtsMissing, host->params->lowPassHigh,
3507 host->maxConnections - (host->activeCxns + host->sleepingCxns)) ;
3509 fprintf (fp, " deferred: %-7ld no-check off: %-3.1f%% max cxns: %d/%d\n",
3510 (long) host->gArtsDeferred, host->params->lowPassLow,
3511 host->maxConnections, host->params->absMaxConnections) ;
3513 fprintf (fp, " requeued: %-7ld no-check fltr: %-3.1f queue length: %-3.1f/%d\n",
3514 (long) host->gArtsCxnDrop, host->params->lowPassFilter,
3515 (double)host->blAccum / cnt, hostHighwater) ;
3517 fprintf (fp, " spooled: %-7ld dynamic method: %-5d empty: %-3.1f%%\n",
3518 (long) host->gArtsToTape,
3519 host->params->dynamicMethod,
3520 100.0 * host->blNone / cnt) ;
3522 fprintf (fp, "[overflow]: %-7ld dyn b'log low: %-3.1f%% >0%%-25%%: %-3.1f%%\n",
3523 (long) host->gArtsQueueOverflow,
3524 host->params->dynBacklogLowWaterMark,
3525 100.0 * host->blQuartile[0] / cnt) ;
3527 fprintf (fp, "[on_close]: %-7ld dyn b'log high: %-3.1f%% 25%%-50%%: %-3.1f%%\n",
3528 (long) host->gArtsHostClose,
3529 host->params->dynBacklogHighWaterMark,
3530 100.0 * host->blQuartile[1] / cnt) ;
3532 fprintf (fp, "[sleeping]: %-7ld dyn b'log stat: %-3.1f%% 50%%-75%%: %-3.1f%%\n",
3533 (long) host->gArtsHostSleep,
3534 host->backlogFilter*100.0*(1.0-host->params->dynBacklogFilter),
3535 100.0 * host->blQuartile[2] / cnt) ;
3537 fprintf (fp, " unspooled: %-7ld dyn b'log fltr: %-3.1f 75%%-<100%%: %-3.1f%%\n",
3538 (long) host->gArtsFromTape,
3539 host->params->dynBacklogLowWaterMark,
3540 100.0 * host->blQuartile[3] / cnt) ;
3542 fprintf (fp, " no queue: %-7ld avr.cxns queue: %-3.1f full: %-3.1f%%\n",
3543 (long) host->gNoQueue,
3544 (double) host->gCxnQueue / (host->gArtsOffered ? host->gArtsOffered :1) ,
3545 100.0 * host->blFull / cnt) ;
3546 size=convsize(host->gArtsSizeAccepted, &tsize);
3547 snprintf(buf,sizeof(buf),"%.3g %s", size, tsize);
3548 fprintf (fp, "accpt size: %-8s drop-deferred: %-5s defer length: %-3.1f\n",
3549 buf, host->params->dropDeferred ? "true " : "false",
3550 (double)host->dlAccum / cnt) ;
3551 size=convsize(host->gArtsSizeRejected, &tsize);
3552 snprintf(buf,sizeof(buf),"%.3g %s", size, tsize);
3553 fprintf (fp, "rejct size: %-8s min-queue-cxn: %s\n",
3554 buf, host->params->minQueueCxn ? "true " : "false");
3556 tapeLogStatus (host->myTape,fp) ;
3559 time_t sec = (time_t) (now - host->connectTime);
3560 double or, ar, rr, jr;
3564 or = (double) host->artsOffered / (double) sec;
3565 ar = (double) host->artsAccepted / (double) sec;
3566 rr = (double) host->artsNotWanted / (double) sec;
3567 jr = (double) host->artsRejected / (double) sec;
3568 ars = convsize (host->artsSizeAccepted/sec, &tars);
3569 jrs = convsize (host->artsSizeRejected/sec, &tjrs);
3570 fprintf(fp, " offered: %5.2f art/s accepted: %5.2f art/s, %.3g %s/s\n",
3572 fprintf(fp, " refused: %5.2f art/s rejected: %5.2f art/s, %.3g %s/s\n",
3575 fprintf(fp, " missing %d spooled %d\n",
3576 host->artsMissing, host->artsToTape);
3579 #ifdef XXX_STATSHACK
3581 time_t now = time(NULL), sec = (long) (now - host->connectTime);
3582 float or, ar, rr, jr;
3585 or = (float) host->artsOffered / (float) sec;
3586 ar = (float) host->artsAccepted / (float) sec;
3587 rr = (float) host->artsNotWanted / (float) sec;
3588 jr = (float) host->artsRejected / (float) sec;
3589 fprintf(fp, "\t\tor %02.2f ar %02.2f rr %02.2f jr %02.2f\n",
3592 fprintf(fp, "\tmissing %d spooled %d\n",
3593 host->artsMissing,host->backlogSpooled);
3595 #endif /* XXX_STATSHACK */
3597 fprintf (fp, "\n\n");
3607 * The callback function for the statistics timer to call.
3609 static void hostStatsTimeoutCbk (TimeoutId tid UNUSED, void *data)
3611 Host host = (Host) data ;
3612 time_t now = theTime () ;
3614 ASSERT (tid == host->statsId) ;
3616 if (!amClosing (host))
3617 hostLogStats (host, false) ;
3619 if (now - lastStatusLog >= statsPeriod)
3622 host->statsId = prepareSleep (hostStatsTimeoutCbk, statsPeriod, host) ;
3627 * The callback function for the deferred article timer to call.
3629 static void hostDeferredArtCbk (TimeoutId tid UNUSED, void *data)
3631 Host host = (Host) data ;
3632 time_t now = theTime () ;
3635 ASSERT (tid == host->deferredId) ;
3637 while (host->deferred && host->deferred->whenToRequeue <= now)
3639 article = remHead (&host->deferred,&host->deferredTail) ;
3641 hostSendArticle (host, article) ; /* requeue it */
3645 host->deferredId = prepareSleep (hostDeferredArtCbk,
3646 host->deferred->whenToRequeue - now,
3649 host->deferredId = 0;
3653 /* if the host has too many unprocessed articles so we send some to the tape. */
3654 static void backlogToTape (Host host)
3658 while ((host->backlog + host->deferLen) > hostHighwater)
3660 if (!host->loggedBacklog)
3662 host->loggedBacklog = true ;
3665 if (host->deferred != NULL)
3667 article = remHead (&host->deferred,&host->deferredTail) ;
3672 article = remHead (&host->queued,&host->queuedTail) ;
3676 ASSERT(article != NULL);
3678 host->artsQueueOverflow++ ;
3679 host->gArtsQueueOverflow++ ;
3680 host->artsToTape++ ;
3681 host->gArtsToTape++ ;
3683 tapeTakeArticle (host->myTape,article) ;
3694 * Returns true of the Host is in the middle of closing down.
3696 static bool amClosing (Host host)
3698 return (host->myTape == NULL ? true : false) ;
3708 * flush all queued articles all the way out to disk.
3710 static void queuesToTape (Host host)
3714 while ((art = remHead (&host->processed,&host->processedTail)) != NULL)
3716 host->artsHostClose++ ;
3717 host->gArtsHostClose++ ;
3718 host->artsToTape++ ;
3719 host->gArtsToTape++ ;
3721 tapeTakeArticle (host->myTape,art) ;
3724 while ((art = remHead (&host->queued,&host->queuedTail)) != NULL)
3727 host->artsHostClose++ ;
3728 host->gArtsHostClose++ ;
3729 host->artsToTape++ ;
3730 host->gArtsToTape++ ;
3732 tapeTakeArticle (host->myTape,art) ;
3735 while ((art = remHead (&host->deferred,&host->deferredTail)) != NULL)
3738 host->artsHostClose++ ;
3739 host->gArtsHostClose++ ;
3740 host->artsToTape++ ;
3741 host->gArtsToTape++ ;
3743 tapeTakeArticle (host->myTape,art) ;
3746 while ((art = remHead (&host->deferred,&host->deferredTail)) != NULL)
3749 host->artsHostClose++ ;
3750 host->gArtsHostClose++ ;
3751 host->artsToTape++ ;
3752 host->gArtsToTape++ ;
3754 tapeTakeArticle (host->myTape,art) ;
3764 #define QUEUE_ELEM_POOL_SIZE ((4096 - 2 * (sizeof (void *))) / (sizeof (struct proc_q_elem)))
3766 static ProcQElem queueElemPool ;
3769 * Add an article to the given queue.
3771 static void queueArticle (Article article, ProcQElem *head, ProcQElem *tail,
3776 if (queueElemPool == NULL)
3781 xmalloc (sizeof(struct proc_q_elem) * QUEUE_ELEM_POOL_SIZE) ;
3783 for (i = 0; i < QUEUE_ELEM_POOL_SIZE - 1; i++)
3784 queueElemPool[i] . next = &(queueElemPool [i + 1]) ;
3785 queueElemPool [QUEUE_ELEM_POOL_SIZE-1] . next = NULL ;
3788 elem = queueElemPool ;
3789 ASSERT (elem != NULL) ;
3790 queueElemPool = queueElemPool->next ;
3792 elem->article = article ;
3794 elem->prev = *tail ;
3795 elem->whenToRequeue = when ;
3797 (*tail)->next = elem ;
3810 * remove the article from the queue
3812 static bool remArticle (Article article, ProcQElem *head, ProcQElem *tail)
3816 ASSERT (head != NULL) ;
3817 ASSERT (tail != NULL) ;
3819 /* we go backwards down the list--probably faster */
3821 while (elem != NULL && elem->article != article)
3826 if (elem->prev != NULL)
3827 elem->prev->next = elem->next ;
3828 if (elem->next != NULL)
3829 elem->next->prev = elem->prev ;
3831 *head = elem->next ;
3833 *tail = elem->prev ;
3835 elem->next = queueElemPool ;
3836 queueElemPool = elem ;
3851 * remove the article that's at the head of the queue and return
3852 * it. Returns NULL if the queue is empty.
3854 static Article remHead (ProcQElem *head, ProcQElem *tail)
3859 ASSERT (head != NULL) ;
3860 ASSERT (tail != NULL) ;
3861 ASSERT ((*head == NULL && *tail == NULL) ||
3862 (*head != NULL && *tail != NULL)) ;
3868 art = elem->article ;
3869 *head = elem->next ;
3870 if (elem->next != NULL)
3871 elem->next->prev = NULL ;
3876 elem->next = queueElemPool ;
3877 queueElemPool = elem ;
3884 static int validateInteger (FILE *fp, const char *name,
3885 long low, long high, int required, long setval,
3886 scope * sc, unsigned int inh)
3888 int rval = VALUE_OK ;
3891 char *p = strrchr (name,':') ;
3893 v = findValue (sc,name,inh) ;
3894 if (v == NULL && required != NOTREQNOADD)
3896 s = findScope (sc,name,0) ;
3897 addInteger (s,p ? p + 1 : name,setval) ;
3898 if (required == REQ)
3900 rval = VALUE_MISSING ;
3901 logOrPrint (LOG_ERR,fp,
3902 "ME config: no definition for required key %s",name) ;
3905 logOrPrint (LOG_INFO,fp,
3906 "ME config: adding missing key/value %s: %ld",name
3909 else if (v != NULL && v->type != intval)
3911 rval = VALUE_WRONG_TYPE ;
3912 logOrPrint (LOG_ERR,fp,"ME config: value of %s is not an integer",name) ;
3914 else if (v != NULL && low != LONG_MIN && v->v.int_val < low)
3916 rval = VALUE_TOO_LOW ;
3917 logOrPrint (LOG_ERR,fp,
3918 "ME config: value of %s (%ld) in %s is lower than minimum"
3919 " of %ld. Using %ld",name,v->v.int_val,
3920 "global scope",low,low) ;
3921 v->v.int_val = low ;
3923 else if (v != NULL && high != LONG_MAX && v->v.int_val > high)
3925 rval = VALUE_TOO_HIGH ;
3926 logOrPrint(LOG_ERR,fp,
3927 "ME config: value of %s (%ld) in %s is higher than maximum"
3928 " of %ld. Using %ld",name,v->v.int_val,
3929 "global scope",high,high);
3930 v->v.int_val = high ;
3938 static int validateReal (FILE *fp, const char *name, double low,
3939 double high, int required, double setval,
3940 scope * sc, unsigned int inh)
3942 int rval = VALUE_OK ;
3945 char *p = strrchr (name,':') ;
3947 v = findValue (sc,name,inh) ;
3948 if (v == NULL && required != NOTREQNOADD)
3950 s = findScope (sc,name,0) ;
3951 addReal (s,p ? p + 1 : name,setval) ;
3952 if (required == REQ)
3954 rval = VALUE_MISSING ;
3955 logOrPrint (LOG_ERR,fp,
3956 "ME config: no definition for required key %s",name) ;
3959 logOrPrint (LOG_INFO,fp,
3960 "ME config: adding missing key/value %s: %f",name,
3963 else if (v != NULL && v->type != realval)
3965 rval = VALUE_WRONG_TYPE ;
3966 logOrPrint (LOG_ERR,fp,
3967 "ME config: value of %s is not a floating point number",
3970 else if (v != NULL && low != -DBL_MAX && v->v.real_val < low)
3972 logOrPrint (LOG_ERR,fp,
3973 "ME config: value of %s (%f) is lower than minimum of %f",
3974 name,v->v.real_val,low) ;
3975 v->v.real_val = setval ;
3977 else if (v != NULL && high != DBL_MAX && v->v.real_val > high)
3979 logOrPrint (LOG_ERR,fp,
3980 "ME config: value of %s (%f) is higher than maximum of %f",
3981 name,v->v.real_val,high) ;
3982 v->v.real_val = setval ;
3990 static int validateBool (FILE *fp, const char *name, int required, bool setval,
3991 scope * sc, unsigned int inh)
3993 int rval = VALUE_OK ;
3996 char *p = strrchr (name,':') ;
3998 v = findValue (sc,name,inh) ;
3999 if (v == NULL && required != NOTREQNOADD)
4001 s = findScope (sc,name,0) ;
4002 addBoolean (s,p ? p + 1 : name, setval ? 1 : 0) ;
4003 if (required == REQ)
4005 rval = VALUE_MISSING ;
4006 logOrPrint (LOG_ERR,fp,
4007 "ME config: no definition for required key %s",name) ;
4010 logOrPrint (LOG_INFO,fp,
4011 "ME config: adding missing key/value %s: %s",name,
4012 (setval ? "true" : "false")) ;
4014 else if (v != NULL && v->type != boolval)
4016 rval = VALUE_WRONG_TYPE ;
4017 logOrPrint (LOG_ERR,fp,"ME config: value of %s is not a boolean",name) ;
4024 void gCalcHostBlStat (void)
4028 for (h = gHostList ; h != NULL ; h = h->next)
4030 h->dlAccum += h->deferLen ;
4031 h->blAccum += h->backlog ;
4032 if (h->backlog == 0)
4034 else if (h->backlog >= (hostHighwater - h->deferLen))
4037 h->blQuartile[(4*h->backlog) / (hostHighwater - h->deferLen)]++ ;
4041 static void hostCleanup (void)
4043 if (statusFile != NULL)