1 /* $Id: connection.c 7793 2008-04-26 08:15:40Z iulius $
3 ** The implementation of the innfeed Connection class.
5 ** Written by James Brister <brister@vix.com>
7 ** The Connection object is what manages the NNTP protocol. If the remote
8 ** doesn't do streaming, then the standard IHAVE lock-step protocol is
9 ** performed. In the streaming situation we have two cases. One where we must
10 ** send CHECK commands, and the other where we can directly send TAKETHIS
11 ** commands without a prior CHECK.
13 ** The Connection object maintains four article queues. The first one is
14 ** where new articles are put if they need to have an IHAVE or CHECK command
15 ** sent for them. The second queue is where the articles move from the first
16 ** after their IHAVE/CHECK command is sent, but the reply has not yet been
17 ** seen. The third queue is where articles go after the IHAVE/CHECK reply has
18 ** been seen (and the reply says to send the article). It is articles in the
19 ** third queue that have the TAKETHIS command sent, or the body of an IHAVE.
20 ** The third queue is also where new articles go if the connection is running
21 ** in no-CHECK mode. The fourth queue is where the articles move to from the
22 ** third queue after their IHAVE-body or TAKETHIS command has been sent. When
23 ** the response to the IHAVE-body or TAKETHIS is received the articles are
24 ** removed from the fourth queue and the Host object controlling this
25 ** Connection is notified of the success or failure of the transfer.
27 ** The whole system is event-driven by the EndPoint class and the Host via
28 ** calls to prepareRead() and prepareWrite() and prepareSleep().
31 ** We should probably store the results of gethostbyname in the connection so
32 ** we can rotate through the address when one fails for connecting. Perhaps
33 ** the gethostbyname should be done in the Host and the connection should
34 ** just be given the address to use.
36 ** Should we worry about articles being stuck on a queue for ever if the
37 ** remote forgets to send a response to a CHECK?
39 ** Perhaps instead of killing the connection on some of the more simple
40 ** errors, we should try to flush the input and keep going.
42 ** Worry about counter overflow.
44 ** Worry about stats gathering when switch to no-check mode.
46 ** XXX if issueQUIT() has a problem and the state goes to cxnDeadS this is
47 ** not handled properly everywhere yet.
53 #include "portable/socket.h"
54 #include "portable/time.h"
63 #if defined (__FreeBSD__)
64 # include <sys/ioctl.h>
67 #include "inn/messages.h"
72 #include "configfile.h"
73 #include "connection.h"
78 #define VALIDATE_CONNECTION(x) ((void) 0)
80 #define VALIDATE_CONNECTION(x) validateConnection (x)
83 extern char **PointersFreedOnExit ;
84 extern const char *pidFile ;
90 /* We keep a linked list of articles the connection is trying to transmit */
91 typedef struct art_holder_s
94 struct art_holder_s *next ;
99 cxnStartingS, /* the connection's start state. */
100 cxnWaitingS, /* not connected. Waiting for an article. */
101 cxnConnectingS, /* in the middle of connecting */
102 cxnIdleS, /* open and ready to feed, has empty queues */
103 cxnIdleTimeoutS, /* timed out in the idle state */
104 cxnFeedingS, /* in the process of feeding articles */
105 cxnSleepingS, /* blocked on reestablishment timer */
106 cxnFlushingS, /* am waiting for queues to drain to bounce connection. */
107 cxnClosingS, /* have been told to close down permanently when queues drained */
108 cxnDeadS /* connection is dead. */
111 /* The Connection class */
114 Host myHost ; /* the host who owns the connection */
115 EndPoint myEp ; /* the endpoint the connection talks through */
116 unsigned int ident ; /* an identifier for syslogging. */
117 CxnState state ; /* the state the connection is in */
121 * The Connection maintains 4 queues of articles.
123 ArtHolder checkHead ; /* head of article list to do CHECK/IHAVE */
124 ArtHolder checkRespHead ; /* head of list waiting on CHECK/IHAVE
126 ArtHolder takeHead ; /* head of list of articles to send
127 TAKETHIS/IHAVE-body */
128 ArtHolder takeRespHead ; /* list of articles waiting on
129 TAKETHIS/IHAVE-body response */
130 unsigned int articleQTotal ; /* number of articles in all four queues */
131 ArtHolder missing ; /* head of missing list */
134 Buffer respBuffer ; /* buffer all responses are read into */
136 char *ipName ; /* the ip name (possibly quad) of the remote */
138 unsigned int maxCheck ; /* the max number of CHECKs to send */
139 unsigned short port ; /* the port number to use */
142 * Timeout values and their callback IDs
145 /* Timer for max amount of time between receiving articles from the
147 unsigned int articleReceiptTimeout ;
148 TimeoutId artReceiptTimerId ;
150 /* Timer for the max amount of time to wait for a response from the
152 unsigned int readTimeout ;
153 TimeoutId readBlockedTimerId ;
155 /* Timer for the max amount of time to wait for any amount of data
156 to be written to the remote */
157 unsigned int writeTimeout ;
158 TimeoutId writeBlockedTimerId ;
160 /* Timer for the max number of seconds to keep the network connection
161 up (long lasting connections give older nntp servers problems). */
162 unsigned int flushTimeout ;
163 TimeoutId flushTimerId ;
165 /* Timer for the number of seconds to sleep before attempting a
167 unsigned int sleepTimeout ;
168 TimeoutId sleepTimerId ;
171 bool loggedNoCr ; /* true if we logged the NOCR_MSG */
172 bool immedRecon ; /* true if we recon immediately after flushing. */
173 bool doesStreaming ; /* true if remote will handle streaming */
174 bool authenticated ; /* true if remote authenticated */
175 bool quitWasIssued ; /* true if QUIT command was sent. */
176 bool needsChecks ; /* true if we issue CHECK commands in
177 streaming mode (rather than just sending
178 TAKETHIS commands) */
180 time_t timeCon ; /* the time the connect happened (including
181 the MODE STREAM command). */
186 unsigned int artsTaken ; /* the number of articles INN gave this cxn */
187 unsigned int checksIssued ; /* the number of CHECKS/IHAVES we
188 sent. Note that if we're running in
189 no-CHECK mode, then we add in the
190 TAKETHIS commands too */
191 unsigned int checksRefused ; /* the number of response 435/438 */
192 unsigned int takesRejected ; /* the number of response 437/439 received */
193 unsigned int takesOkayed ; /* the number of response 235/239 received */
195 double takesSizeRejected ;
196 double takesSizeOkayed ;
198 double onThreshold ; /* for no-CHECK mode */
199 double offThreshold ; /* for no-CHECK mode */
200 double filterValue ; /* current value of IIR filter */
201 double lowPassFilter ; /* time constant for IIR filter */
203 Connection next ; /* for global list. */
/* Head of the global list of Connection objects. newConnection() points a
   new Connection's `next' field at the current head, and gPrintCxnInfo()
   walks the list through `next'. */
206 static Connection gCxnList = NULL ;
/* NOTE(review): presumably the number of Connections on gCxnList -- confirm
   against the code that maintains the list. */
207 static unsigned int gCxnCount = 0 ;
/* Reconnect period settings. cxnConfigLoadCbk() replaces these defaults with
   the "max-reconnect-time" and "initial-reconnect-time" config values. */
208 static unsigned int max_reconnect_period = MAX_RECON_PER ;
/* Also used by newConnection() as the starting value of sleepTimeout. */
209 static unsigned int init_reconnect_period = INIT_RECON_PER ;
/* NOTE(review): presumably records that one-time module initialization has
   been done -- TODO confirm where it is set. */
211 static bool inited = false ;
/* Shared Buffer objects. NOTE(review): judging by the names these hold the
   dot and CRLF byte sequences used when writing to the remote -- TODO
   confirm against the code that creates them. */
213 static Buffer dotFirstBuffer ;
214 static Buffer dotBuffer ;
215 static Buffer crlfBuffer ;
218 /***************************************************
220 * Private function declarations.
222 ***************************************************/
226 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
227 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d) ;
228 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
229 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
230 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
231 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d) ;
232 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d) ;
233 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
234 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
235 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
236 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
237 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
238 static void writeProgress (EndPoint e, IoStatus i, Buffer *b, void *d) ;
241 /* Timer callbacks */
242 static void responseTimeoutCbk (TimeoutId id, void *data) ;
243 static void writeTimeoutCbk (TimeoutId id, void *data) ;
244 static void reopenTimeoutCbk (TimeoutId id, void *data) ;
245 static void flushCxnCbk (TimeoutId, void *data) ;
246 static void articleTimeoutCbk (TimeoutId id, void *data) ;
249 static void cxnWorkProc (EndPoint ep, void *data) ;
252 static void cxnSleepOrDie (Connection cxn) ;
254 /* Response processing. */
255 static void processResponse205 (Connection cxn, char *response) ;
256 static void processResponse238 (Connection cxn, char *response) ;
257 static void processResponse431 (Connection cxn, char *response) ;
258 static void processResponse438 (Connection cxn, char *response) ;
259 static void processResponse239 (Connection cxn, char *response) ;
260 static void processResponse439 (Connection cxn, char *response) ;
261 static void processResponse235 (Connection cxn, char *response) ;
262 static void processResponse335 (Connection cxn, char *response) ;
263 static void processResponse400 (Connection cxn, char *response) ;
264 static void processResponse435 (Connection cxn, char *response) ;
265 static void processResponse436 (Connection cxn, char *response) ;
266 static void processResponse437 (Connection cxn, char *response) ;
267 static void processResponse480 (Connection cxn, char *response) ;
268 static void processResponse503 (Connection cxn, char *response) ;
272 static void cxnSleep (Connection cxn) ;
273 static void cxnDead (Connection cxn) ;
274 static void cxnIdle (Connection cxn) ;
275 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
276 const char *msgid, const char *response) ;
277 static void abortConnection (Connection cxn) ;
278 static void resetConnection (Connection cxn) ;
279 static void deferAllArticles (Connection cxn) ;
280 static void deferQueuedArticles (Connection cxn) ;
281 static void doSomeWrites (Connection cxn) ;
282 static bool issueIHAVE (Connection cxn) ;
283 static void issueIHAVEBody (Connection cxn) ;
284 static bool issueStreamingCommands (Connection cxn) ;
285 static Buffer buildCheckBuffer (Connection cxn) ;
286 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer) ;
287 static void issueQUIT (Connection cxn) ;
288 static void initReadBlockedTimeout (Connection cxn) ;
289 static int prepareWriteWithTimeout (EndPoint endp, Buffer *buffers,
290 EndpRWCB done, Connection cxn) ;
291 static void delConnection (Connection cxn) ;
292 static void incrFilter (Connection cxn) ;
293 static void decrFilter (Connection cxn) ;
294 static bool writesNeeded (Connection cxn) ;
295 static void validateConnection (Connection cxn) ;
296 static const char *stateToString (CxnState state) ;
298 static void issueModeStream (EndPoint e, Connection cxn) ;
299 static void issueAuthUser (EndPoint e, Connection cxn) ;
300 static void issueAuthPass (EndPoint e, Connection cxn) ;
302 static void prepareReopenCbk (Connection cxn) ;
305 /* Article queue management routines. */
306 static ArtHolder newArtHolder (Article art) ;
307 static void delArtHolder (ArtHolder artH) ;
308 static bool remArtHolder (ArtHolder art, ArtHolder *head, unsigned int *count) ;
309 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count) ;
310 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head) ;
312 static int fudgeFactor (int initVal) ;
317 /***************************************************
319 * Public functions implementation.
321 ***************************************************/
324 int cxnConfigLoadCbk (void *data UNUSED)
328 FILE *fp = (FILE *) data ;
330 if (getInteger (topScope,"max-reconnect-time",&iv,NO_INHERIT))
335 logOrPrint (LOG_ERR,fp,
336 "ME config: value of %s (%ld) in %s cannot be less"
337 " than 1. Using %ld", "max-reconnect-time",
338 iv,"global scope",(long) MAX_RECON_PER);
344 max_reconnect_period = (unsigned int) iv ;
346 if (getInteger (topScope,"initial-reconnect-time",&iv,NO_INHERIT))
351 logOrPrint (LOG_ERR,fp,
352 "ME config: value of %s (%ld) in %s cannot be less"
353 " than 1. Using %ld", "initial-reconnect-time",
354 iv,"global scope",(long)INIT_RECON_PER);
355 iv = INIT_RECON_PER ;
359 iv = INIT_RECON_PER ;
360 init_reconnect_period = (unsigned int) iv ;
370 * Create a new Connection object and return it. All fields are
371 * initialized to reasonable values.
373 Connection newConnection (Host host,
376 unsigned int articleReceiptTimeout,
377 unsigned int portNum,
378 unsigned int respTimeout,
379 unsigned int flushTimeout,
382 double lowPassFilter)
389 d_printf (1,"NULL ipname in newConnection\n") ;
393 if (ipname && strlen (ipname) == 0)
395 d_printf (1,"Empty ipname in newConnection\n") ;
402 cxn = xcalloc (1, sizeof(struct connection_s));
408 cxn->checkHead = NULL ;
409 cxn->checkRespHead = NULL ;
410 cxn->takeHead = NULL ;
411 cxn->takeRespHead = NULL ;
413 cxn->articleQTotal = 0 ;
414 cxn->missing = NULL ;
416 cxn->respBuffer = newBuffer (BUFFER_SIZE) ;
417 ASSERT (cxn->respBuffer != NULL) ;
419 cxn->ipName = xstrdup (ipname) ;
420 cxn->port = portNum ;
422 /* Time out the higher numbered connections faster */
423 cxn->articleReceiptTimeout = articleReceiptTimeout * 10.0 / (10.0 + id) ;
424 cxn->artReceiptTimerId = 0 ;
426 cxn->readTimeout = respTimeout ;
427 cxn->readBlockedTimerId = 0 ;
429 cxn->writeTimeout = respTimeout ; /* XXX should be a separate value */
430 cxn->writeBlockedTimerId = 0 ;
432 cxn->flushTimeout = fudgeFactor (flushTimeout) ;
433 cxn->flushTimerId = 0 ;
435 cxn->onThreshold = lowPassHigh * lowPassFilter / 100.0 ;
436 cxn->offThreshold = lowPassLow * lowPassFilter / 100.0 ;
437 cxn->lowPassFilter = lowPassFilter;
439 cxn->sleepTimerId = 0 ;
440 cxn->sleepTimeout = init_reconnect_period ;
442 resetConnection (cxn) ;
444 cxn->next = gCxnList ;
448 cxn->state = cxnStartingS ;
457 /* Create a new endpoint hooked to a non-blocking socket that is trying to
458 * connect to the host info stored in the Connection. On fast machines
459 * connecting locally the connect() may have already succeeded when this
460 * returns, but typically the connect will still be running. When it
461 * completes, the Connection will be notified via a write callback set up by
462 * prepareWrite below. If nothing goes wrong then this will return true
463 * (even if the connect() has not yet completed). If something fails
464 * (hostname lookup etc.) then it returns false (and the Connection is left
465 * in the sleeping state).
467 * Pre-state Reason cxnConnect called
468 * --------- ------------------------
469 * cxnStartingS Connection owner issued call.
470 * cxnWaitingS side effect of cxnTakeArticle() call
471 * cxnFlushingS side effect of cxnFlush() call
472 * cxnSleepingS side effect of reopenTimeoutCbk() call.
474 bool cxnConnect (Connection cxn)
476 const struct sockaddr_storage cxnAddr, cxnSelf ;
477 const struct sockaddr *retAddr;
479 const char *peerName = hostPeerName (cxn->myHost) ;
481 const struct sockaddr_in *bind_addr = hostBindAddr (cxn->myHost) ;
484 char paddr[INET6_ADDRSTRLEN];
485 const struct sockaddr_in6 *bind_addr6 = hostBindAddr6 (cxn->myHost) ;
488 ASSERT (cxn->myEp == NULL) ;
490 if (!(cxn->state == cxnStartingS ||
491 cxn->state == cxnWaitingS ||
492 cxn->state == cxnFlushingS ||
493 cxn->state == cxnSleepingS))
495 warn ("%s:%d cxnsleep connection in bad state: %s",
496 hostPeerName (cxn->myHost), cxn->ident,
497 stateToString (cxn->state)) ;
498 cxnSleepOrDie (cxn) ;
502 if (cxn->state == cxnWaitingS)
503 ASSERT (cxn->articleQTotal == 1) ;
505 ASSERT (cxn->articleQTotal == 0) ;
507 cxn->state = cxnConnectingS ;
510 family = hostAddrFamily (cxn->myHost);
512 retAddr = hostIpAddr (cxn->myHost, family) ;
516 cxnSleepOrDie (cxn) ;
520 memcpy( (void *)&cxnAddr, retAddr, SA_LEN(retAddr) );
523 if( cxnAddr.ss_family == AF_INET6 )
525 ((struct sockaddr_in6 *)&cxnAddr)->sin6_port = htons(cxn->port) ;
526 fd = socket (PF_INET6, SOCK_STREAM, 0);
531 ((struct sockaddr_in *)&cxnAddr)->sin_port = htons(cxn->port) ;
532 fd = socket (PF_INET, SOCK_STREAM, 0);
536 syswarn ("%s:%d cxnsleep can't create socket", peerName, cxn->ident) ;
537 d_printf (1,"Can't get a socket: %s\n", strerror (errno)) ;
539 cxnSleepOrDie (cxn) ;
545 /* bind to a specified IPv6 address */
546 if( (cxnAddr.ss_family == AF_INET6) && bind_addr6 )
548 memcpy( (void *)&cxnSelf, bind_addr6, sizeof(struct sockaddr_in6) );
549 if (bind (fd, (struct sockaddr *) &cxnSelf,
550 sizeof(struct sockaddr_in6)) < 0)
552 snprintf(msgbuf, sizeof(msgbuf), "bind (%s): %%m",
553 inet_ntop(AF_INET6, bind_addr6->sin6_addr.s6_addr,
554 paddr, sizeof(paddr)) );
556 syslog (LOG_ERR, msgbuf) ;
558 cxnSleepOrDie (cxn) ;
565 /* bind to a specified IPv4 address */
567 if ( (cxnAddr.ss_family == AF_INET) && bind_addr )
572 memcpy( (void *)&cxnSelf, bind_addr, sizeof(struct sockaddr_in) );
573 if (bind (fd, (struct sockaddr *) &cxnSelf,
574 sizeof(struct sockaddr_in) ) < 0)
576 snprintf(msgbuf, sizeof(msgbuf), "bind (%s): %%m",
577 inet_ntoa(bind_addr->sin_addr));
578 syslog (LOG_ERR, msgbuf) ;
580 cxnSleepOrDie (cxn) ;
586 /* set our file descriptor to non-blocking */
587 #if defined (O_NONBLOCK)
588 rval = fcntl (fd, F_GETFL, 0) ;
590 rval = fcntl (fd, F_SETFL, rval | O_NONBLOCK) ;
594 rval = ioctl (fd, FIONBIO, (char *) &state) ;
600 syswarn ("%s:%d cxnsleep can't set socket non-blocking", peerName,
604 cxnSleepOrDie (cxn) ;
609 rval = connect (fd, (struct sockaddr *) &cxnAddr,
610 SA_LEN((struct sockaddr *)&cxnAddr)) ;
611 if (rval < 0 && errno != EINPROGRESS)
613 syswarn ("%s:%d connect", peerName, cxn->ident) ;
614 hostIpFailed (cxn->myHost) ;
617 cxnSleepOrDie (cxn) ;
622 if ((cxn->myEp = newEndPoint (fd)) == NULL)
624 /* If this happens, then fd was bigger than what select could handle,
625 so endpoint.c refused to create the new object. */
627 cxnSleepOrDie (cxn) ;
633 /* when the write callback gets done the connection went through */
634 prepareWrite (cxn->myEp, NULL, NULL, connectionDone, cxn) ;
636 connectionDone (cxn->myEp, IoDone, NULL, cxn) ;
638 /* connectionDone() could set state to sleeping */
639 return (cxn->state == cxnConnectingS ? true : false) ;
646 /* Put the Connection into the wait state.
648 * Pre-state Reason cxnWait called
649 * --------- ------------------------
650 * cxnStartingS - Connection owner called cxnWait()
651 * cxnSleepingS - side effect of cxnFlush() call.
652 * cxnConnectingS - side effect of cxnFlush() call.
653 * cxnFlushingS - side effect of receiving response 205
654 * and Connection had no articles when
655 * cxnFlush() was issued.
656 * - prepareRead failed.
660 void cxnWait (Connection cxn)
662 ASSERT (cxn->state == cxnStartingS ||
663 cxn->state == cxnSleepingS ||
664 cxn->state == cxnConnectingS ||
665 cxn->state == cxnFeedingS ||
666 cxn->state == cxnFlushingS) ;
667 VALIDATE_CONNECTION (cxn) ;
669 abortConnection (cxn) ;
671 cxn->state = cxnWaitingS ;
673 hostCxnWaiting (cxn->myHost,cxn) ; /* tell our Host we're waiting */
680 /* Tells the Connection to flush itself (i.e. push out all articles,
681 * issue a QUIT and drop the network connection). If necessary a
682 * reconnect will be done immediately after. Called by the Host, or
683 * by the timer callback.
685 * Pre-state Reason cxnFlush called
686 * --------- ------------------------
687 * ALL (except cxnDeadS - Connection owner called cxnFlush()
689 * cxnFeedingS - side effect of flushCxnCbk() call.
691 void cxnFlush (Connection cxn)
693 ASSERT (cxn != NULL) ;
694 ASSERT (cxn->state != cxnStartingS) ;
695 ASSERT (cxn->state != cxnDeadS) ;
696 VALIDATE_CONNECTION (cxn) ;
709 case cxnIdleTimeoutS:
711 ASSERT (cxn->articleQTotal == 0) ;
712 if (cxn->state != cxnIdleTimeoutS)
713 clearTimer (cxn->artReceiptTimerId) ;
714 clearTimer (cxn->flushTimerId) ;
715 cxn->state = cxnFlushingS ;
722 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
727 /* we only reconnect immediately if we're not idle when cxnFlush()
729 if (!cxn->immedRecon)
731 cxn->immedRecon = (cxn->articleQTotal > 0 ? true : false) ;
732 d_printf (1,"%s:%d immediate reconnect for a cxnFlush()\n",
733 hostPeerName (cxn->myHost), cxn->ident) ;
736 clearTimer (cxn->flushTimerId) ;
738 cxn->state = cxnFlushingS ;
740 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
745 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
752 * Tells the Connection to dump all articles that are queued and to issue a
753 * QUIT as quickly as possible. Much like cxnClose, except queued articles
754 * are not sent, but are given back to the Host.
756 void cxnTerminate (Connection cxn)
758 ASSERT (cxn != NULL) ;
759 ASSERT (cxn->state != cxnDeadS) ;
760 ASSERT (cxn->state != cxnStartingS) ;
761 VALIDATE_CONNECTION (cxn) ;
766 d_printf (1,"%s:%d Issuing terminate\n",
767 hostPeerName (cxn->myHost), cxn->ident) ;
769 clearTimer (cxn->flushTimerId) ;
771 cxn->state = cxnClosingS ;
773 deferQueuedArticles (cxn) ;
774 if (cxn->articleQTotal == 0)
775 issueQUIT (cxn) ; /* send out the QUIT if we can */
778 case cxnIdleTimeoutS:
780 ASSERT (cxn->articleQTotal == 0) ;
781 if (cxn->state != cxnIdleTimeoutS)
782 clearTimer (cxn->artReceiptTimerId) ;
783 clearTimer (cxn->flushTimerId) ;
784 cxn->state = cxnClosingS ;
788 case cxnFlushingS: /* we are in the middle of a periodic close. */
789 d_printf (1,"%s:%d Connection already being flushed\n",
790 hostPeerName (cxn->myHost),cxn->ident);
791 cxn->state = cxnClosingS ;
792 if (cxn->articleQTotal == 0)
793 issueQUIT (cxn) ; /* send out the QUIT if we can */
797 d_printf (1,"%s:%d Connection already closing\n",
798 hostPeerName (cxn->myHost),cxn->ident) ;
808 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
811 VALIDATE_CONNECTION (cxn) ;
813 if (cxn->state == cxnDeadS)
815 d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
818 delConnection (cxn) ;
824 /* Tells the Connection to do a disconnect and then when it is
825 * disconnected to delete itself.
827 * Pre-state Reason cxnClose called
828 * --------- ------------------------
829 * ALL (except cxnDeadS - Connection owner called directly.
832 void cxnClose (Connection cxn)
834 ASSERT (cxn != NULL) ;
835 ASSERT (cxn->state != cxnDeadS) ;
836 ASSERT (cxn->state != cxnStartingS) ;
837 VALIDATE_CONNECTION (cxn) ;
842 d_printf (1,"%s:%d Issuing disconnect\n",
843 hostPeerName (cxn->myHost), cxn->ident) ;
845 clearTimer (cxn->flushTimerId) ;
847 cxn->state = cxnClosingS ;
849 if (cxn->articleQTotal == 0)
850 issueQUIT (cxn) ; /* send out the QUIT if we can */
854 case cxnIdleTimeoutS:
855 ASSERT (cxn->articleQTotal == 0) ;
856 if (cxn->state != cxnIdleTimeoutS)
857 clearTimer (cxn->artReceiptTimerId) ;
858 clearTimer (cxn->flushTimerId) ;
859 cxn->state = cxnClosingS ;
863 case cxnFlushingS: /* we are in the middle of a periodic close. */
864 d_printf (1,"%s:%d Connection already being flushed\n",
865 hostPeerName (cxn->myHost),cxn->ident);
866 cxn->state = cxnClosingS ;
867 if (cxn->articleQTotal == 0)
868 issueQUIT (cxn) ; /* send out the QUIT if we can */
872 d_printf (1,"%s:%d Connection already closing\n",
873 hostPeerName (cxn->myHost),cxn->ident) ;
883 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
886 VALIDATE_CONNECTION (cxn) ;
888 if (cxn->state == cxnDeadS)
890 d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
893 delConnection (cxn) ;
901 /* This is what the Host calls to get us to transfer an article. If
902 * we're running the IHAVE sequence, then we can't take it if we've
903 * got an article already. If we're running the CHECK/TAKETHIS
904 * sequence, then we'll take as many as we can (up to our MAXCHECK
907 bool cxnTakeArticle (Connection cxn, Article art)
911 ASSERT (cxn != NULL) ;
912 VALIDATE_CONNECTION (cxn) ;
914 if ( !cxnQueueArticle (cxn,art) ) /* might change cxnIdleS to cxnFeedingS */
917 if (!(cxn->state == cxnConnectingS ||
918 cxn->state == cxnFeedingS ||
919 cxn->state == cxnWaitingS))
921 warn ("%s:%d cxnsleep connection in bad state: %s",
922 hostPeerName (cxn->myHost), cxn->ident,
923 stateToString (cxn->state)) ;
924 cxnSleepOrDie (cxn) ;
928 if (cxn->state != cxnWaitingS) /* because articleQTotal == 1 */
929 VALIDATE_CONNECTION (cxn) ;
931 ASSERT (cxn->articleQTotal == 1) ;
947 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
957 /* if there's room in the Connection then stick the article on the
958 * queue, otherwise return false.
960 bool cxnQueueArticle (Connection cxn, Article art)
965 ASSERT (cxn != NULL) ;
966 ASSERT (cxn->state != cxnStartingS) ;
967 ASSERT (cxn->state != cxnDeadS) ;
968 VALIDATE_CONNECTION (cxn) ;
973 d_printf (5,"%s:%d Refusing article due to closing\n",
974 hostPeerName (cxn->myHost),cxn->ident) ;
978 d_printf (5,"%s:%d Refusing article due to flushing\n",
979 hostPeerName (cxn->myHost),cxn->ident) ;
983 d_printf (5,"%s:%d Refusing article due to sleeping\n",
984 hostPeerName (cxn->myHost),cxn->ident) ;
989 newArt = newArtHolder (art) ;
990 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
994 if (cxn->articleQTotal != 0)
997 newArt = newArtHolder (art) ;
998 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
1003 if (cxn->articleQTotal >= cxn->maxCheck)
1004 d_printf (5, "%s:%d Refusing article due to articleQTotal >= maxCheck (%d > %d)\n",
1005 hostPeerName (cxn->myHost), cxn->ident,
1006 cxn->articleQTotal, cxn->maxCheck) ;
1010 newArt = newArtHolder (art) ;
1011 if (cxn->needsChecks)
1012 appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
1014 appendArtHolder (newArt, &cxn->takeHead, &cxn->articleQTotal) ;
1015 if (cxn->state == cxnIdleS)
1017 cxn->state = cxnFeedingS ;
1018 clearTimer (cxn->artReceiptTimerId) ;
1024 die ("Invalid state: %s\n", stateToString (cxn->state)) ;
1029 d_printf (5,"%s:%d accepting article %s\n",hostPeerName (cxn->myHost),
1030 cxn->ident,artMsgId (art)) ;
1043 * generate a log message for activity. Usually called by the Connection's
1046 void cxnLogStats (Connection cxn, bool final)
1048 const char *peerName ;
1049 time_t now = theTime() ;
1051 ASSERT (cxn != NULL) ;
1053 /* only log stats when in one of these three states. */
1065 peerName = hostPeerName (cxn->myHost) ;
1067 notice ("%s:%d %s seconds %ld offered %d accepted %d refused %d"
1068 " rejected %d accsize %.0f rejsize %.0f", peerName, cxn->ident,
1069 (final ? "final" : "checkpoint"), (long) (now - cxn->timeCon),
1070 cxn->checksIssued, cxn->takesOkayed, cxn->checksRefused,
1071 cxn->takesRejected, cxn->takesSizeOkayed, cxn->takesSizeRejected) ;
1075 cxn->artsTaken = 0 ;
1076 cxn->checksIssued = 0 ;
1077 cxn->checksRefused = 0 ;
1078 cxn->takesRejected = 0 ;
1079 cxn->takesOkayed = 0 ;
1080 cxn->takesSizeRejected = 0 ;
1081 cxn->takesSizeOkayed = 0 ;
1083 if (cxn->timeCon > 0)
1084 cxn->timeCon = theTime() ;
1093 * return the number of articles the connection will accept.
1095 size_t cxnQueueSpace (Connection cxn)
1099 ASSERT (cxn != NULL) ;
1101 if (cxn->state == cxnFeedingS ||
1102 cxn->state == cxnIdleS ||
1103 cxn->state == cxnConnectingS ||
1104 cxn->state == cxnWaitingS)
1105 rval = cxn->maxCheck - cxn->articleQTotal ;
1115 * Print info on all the connections that currently exist.
1117 void gPrintCxnInfo (FILE *fp, unsigned int indentAmt)
1119 char indent [INDENT_BUFFER_SIZE] ;
1123 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1127 fprintf (fp,"%sGlobal Connection list : (count %d) {\n",
1129 for (cxn = gCxnList ; cxn != NULL ; cxn = cxn->next)
1130 printCxnInfo (cxn,fp,indentAmt + INDENT_INCR) ;
1131 fprintf (fp,"%s}\n",indent) ;
1139 * Print the info about the given connection.
1141 void printCxnInfo (Connection cxn, FILE *fp, unsigned int indentAmt)
1143 char indent [INDENT_BUFFER_SIZE] ;
1147 for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1151 fprintf (fp,"%sConnection : %p {\n",indent, (void *) cxn) ;
1152 fprintf (fp,"%s host : %p\n",indent, (void *) cxn->myHost) ;
1153 fprintf (fp,"%s endpoint : %p\n",indent, (void *) cxn->myEp) ;
1154 fprintf (fp,"%s state : %s\n",indent, stateToString (cxn->state)) ;
1155 fprintf (fp,"%s ident : %d\n",indent,cxn->ident) ;
1156 fprintf (fp,"%s ip-name : %s\n", indent, cxn->ipName) ;
1157 fprintf (fp,"%s port-number : %d\n",indent,cxn->port) ;
1158 fprintf (fp,"%s max-checks : %d\n",indent,cxn->maxCheck) ;
1159 fprintf (fp,"%s does-streaming : %s\n",indent,
1160 boolToString (cxn->doesStreaming)) ;
1161 fprintf (fp,"%s authenticated : %s\n",indent,
1162 boolToString (cxn->authenticated)) ;
1163 fprintf (fp,"%s quitWasIssued : %s\n",indent,
1164 boolToString (cxn->quitWasIssued)) ;
1165 fprintf (fp,"%s needs-checks : %s\n",indent,
1166 boolToString (cxn->needsChecks)) ;
1168 fprintf (fp,"%s time-connected : %ld\n",indent,(long) cxn->timeCon) ;
1169 fprintf (fp,"%s articles from INN : %d\n",indent,cxn->artsTaken) ;
1170 fprintf (fp,"%s articles offered : %d\n",indent,
1171 cxn->checksIssued) ;
1172 fprintf (fp,"%s articles refused : %d\n",indent,
1173 cxn->checksRefused) ;
1174 fprintf (fp,"%s articles rejected : %d\n",indent,
1175 cxn->takesRejected) ;
1176 fprintf (fp,"%s articles accepted : %d\n",indent,
1178 fprintf (fp,"%s low-pass upper limit : %0.6f\n", indent,
1180 fprintf (fp,"%s low-pass lower limit : %0.6f\n", indent,
1181 cxn->offThreshold) ;
1182 fprintf (fp,"%s low-pass filter tc : %0.6f\n", indent,
1183 cxn->lowPassFilter) ;
1184 fprintf (fp,"%s low-pass filter : %0.6f\n", indent,
1187 fprintf (fp,"%s article-timeout : %d\n",indent,cxn->articleReceiptTimeout) ;
1188 fprintf (fp,"%s article-callback : %d\n",indent,cxn->artReceiptTimerId) ;
1190 fprintf (fp,"%s response-timeout : %d\n",indent,cxn->readTimeout) ;
1191 fprintf (fp,"%s response-callback : %d\n",indent,cxn->readBlockedTimerId) ;
1193 fprintf (fp,"%s write-timeout : %d\n",indent,cxn->writeTimeout) ;
1194 fprintf (fp,"%s write-callback : %d\n",indent,cxn->writeBlockedTimerId) ;
1196 fprintf (fp,"%s flushTimeout : %d\n",indent,cxn->flushTimeout) ;
1197 fprintf (fp,"%s flushTimerId : %d\n",indent,cxn->flushTimerId) ;
1199 fprintf (fp,"%s reopen wait : %d\n",indent,cxn->sleepTimeout) ;
1200 fprintf (fp,"%s reopen id : %d\n",indent,cxn->sleepTimerId) ;
1202 fprintf (fp,"%s CHECK queue {\n",indent) ;
1203 for (artH = cxn->checkHead ; artH != NULL ; artH = artH->next)
1204 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1205 fprintf (fp,"%s }\n",indent) ;
1207 fprintf (fp,"%s CHECK Response queue {\n",indent) ;
1208 for (artH = cxn->checkRespHead ; artH != NULL ; artH = artH->next)
1209 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1210 fprintf (fp,"%s }\n",indent) ;
1212 fprintf (fp,"%s TAKE queue {\n",indent) ;
1213 for (artH = cxn->takeHead ; artH != NULL ; artH = artH->next)
1214 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1215 fprintf (fp,"%s }\n",indent) ;
1217 fprintf (fp,"%s TAKE response queue {\n",indent) ;
1218 for (artH = cxn->takeRespHead ; artH != NULL ; artH = artH->next)
1219 printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1220 fprintf (fp,"%s }\n",indent) ;
1222 fprintf (fp,"%s response buffer {\n",indent) ;
1223 printBufferInfo (cxn->respBuffer,fp,indentAmt + INDENT_INCR) ;
1224 fprintf (fp,"%s }\n",indent) ;
1226 fprintf (fp,"%s}\n",indent) ;
1234 * check whether the connection is in the cxnFeedingS, cxnIdleS or cxnConnectingS state.
1236 bool cxnCheckstate (Connection cxn)
1240 ASSERT (cxn != NULL) ;
1242 if (cxn->state == cxnFeedingS ||
1243 cxn->state == cxnIdleS ||
1244 cxn->state == cxnConnectingS)
1254 /**********************************************************************/
1255 /** STATIC PRIVATE FUNCTIONS **/
1256 /**********************************************************************/
1260 * ENDPOINT CALLBACK AREA.
1262 * All the functions in this next section are callbacks fired by the
1263 * EndPoint objects/class (either timers or i/o completion callbacks).
1268 * this is the first stage of the NNTP FSM. This function is called
1269 * when the tcp/ip network connection is setup and we should get
1270 * ready to read the banner message. When this function returns the
1271 * state of the Connection will still be cxnConnectingS unless
1272 * something broke, in which case it probably went into the
1273 * cxnSleepingS state.
/* EndPoint callback that handles the outcome of the connect attempt.
   e is the endpoint, i the I/O status, b the buffer array (asserted to be
   NULL) and d the Connection the callback was registered with.
   The socket's SO_ERROR value is fetched with getsockopt(); every visible
   failure path logs a message and calls cxnSleepOrDie(), and a non-zero
   SO_ERROR value is also reported to the Host with hostIpFailed().
   Otherwise a read into cxn->respBuffer is queued with getBanner() as its
   callback, the read-blocked timeout is started and, when
   cxn->flushTimeout is positive, the periodic flush timer is armed. */
1275 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d)
1277 Buffer *readBuffers ;
1278 Connection cxn = (Connection) d ;
1279 const char *peerName ;
1283 ASSERT (b == NULL) ;
1284 ASSERT (cxn->state == cxnConnectingS) ;
1285 ASSERT (!writeIsPending (cxn->myEp)) ;
1287 size = sizeof (optval) ;
1288 peerName = hostPeerName (cxn->myHost) ;
/* NOTE(review): the test guarding this first failure branch is not
   visible here; presumably it checks the I/O status i -- confirm. */
1292 errno = endPointErrno (e) ;
1293 syswarn ("%s:%d cxnsleep i/o failed", peerName, cxn->ident) ;
1295 cxnSleepOrDie (cxn) ;
1297 else if (getsockopt (endPointFd (e), SOL_SOCKET, SO_ERROR,
1298 (char *) &optval, &size) != 0)
1300 /* This is bad. Can't even get the SO_ERROR value out of the socket */
1301 syswarn ("%s:%d cxnsleep internal getsockopt", peerName, cxn->ident) ;
1303 cxnSleepOrDie (cxn) ;
1305 else if (optval != 0)
1307 /* if the connect failed then the only way to know is by getting
1308 the SO_ERROR value out of the socket. */
1310 syswarn ("%s:%d cxnsleep connect", peerName, cxn->ident) ;
1311 hostIpFailed (cxn->myHost) ;
1313 cxnSleepOrDie (cxn) ;
/* the banner read gets its own buffer array holding a fresh reference
   to the connection's response buffer */
1317 readBuffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1319 if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1321 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1323 cxnSleepOrDie (cxn) ;
1327 initReadBlockedTimeout (cxn) ;
1329 /* set up the callback for closing down the connection at regular
1330 intervals (due to problems with long running nntpd). */
1331 if (cxn->flushTimeout > 0)
1332 cxn->flushTimerId = prepareSleep (flushCxnCbk,
1333 cxn->flushTimeout,cxn) ;
1335 /* The state doesn't change yet until we've read the banner and
1336 tried the MODE STREAM command. */
1339 VALIDATE_CONNECTION (cxn) ;
1347 * This is called when we are so far in the connection setup that
1348 * we're confident it'll work. If the connection is IPv6, remove
1349 * the IPv4 addresses from the address list.
/* Looks up the peer address of cxn's endpoint socket with getpeername(),
   inspects its address family and calls hostDeleteIpv4Addr() on cxn's
   Host.
   NOTE(review): the statements guarded by the two tests below are not
   visible here; presumably each one returns early (getpeername() failed,
   or the peer is not AF_INET6) -- confirm against the full source. */
1351 static void connectionIfIpv6DeleteIpv4Addr (Connection cxn)
1354 struct sockaddr_storage ss;
1355 socklen_t len = sizeof(ss);
1357 if (getpeername (endPointFd (cxn->myEp), (struct sockaddr *)&ss, &len) < 0)
1359 if (ss.ss_family != AF_INET6)
1362 hostDeleteIpv4Addr (cxn->myHost);
1371 * Called when the banner message has been read off the wire and is
1372 * in the buffer(s). When this function returns the state of the
1373 * Connection will still be cxnConnectingS unless something broke,
1374 * in which case it probably went into the cxnSleepingS state.
/* Read callback for the remote's banner line.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array (asserted to hold only cxn->respBuffer) and d the
   Connection, which must be in cxnConnectingS with no write pending.
   The buffer is NUL terminated and searched for a newline; when none is
   found the buffer is expanded and the read is queued again with this
   same callback. A complete line is parsed with getNntpResponse(); codes
   200 and 201 are the ones listed as acceptable. The other visible paths
   log a warning, call cxnSleepOrDie() and report the problem to the Host
   through hostIpFailed() and hostCxnBlocked(). Further down,
   connectionIfIpv6DeleteIpv4Addr() is called and authentication is
   started with issueAuthUser() when the Host has both a username and a
   password; issueModeStream() is the other continuation.
   b is released with freeBufferArray() at the end. */
1376 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d)
1378 Buffer *readBuffers ;
1379 Connection cxn = (Connection) d ;
1380 char *p = bufferBase (b[0]) ;
1383 const char *peerName ;
1386 ASSERT (e == cxn->myEp) ;
1387 ASSERT (b[0] == cxn->respBuffer) ;
1388 ASSERT (b[1] == NULL) ;
1389 ASSERT (cxn->state == cxnConnectingS) ;
1390 ASSERT (!writeIsPending (cxn->myEp));
1393 peerName = hostPeerName (cxn->myHost) ;
/* terminate the data so the string functions below can be used on p */
1395 bufferAddNullByte (b[0]) ;
/* NOTE(review): the test guarding this read-failure branch is not
   visible here; presumably it checks the I/O status i -- confirm. */
1399 errno = endPointErrno (cxn->myEp) ;
1400 syswarn ("%s:%d cxnsleep can't read banner", peerName, cxn->ident) ;
1401 hostIpFailed (cxn->myHost) ;
1403 cxnSleepOrDie (cxn) ;
1405 else if (strchr (p, '\n') == NULL)
1406 { /* partial read. expand buffer and retry */
1407 expandBuffer (b[0], BUFFER_EXPAND_AMOUNT) ;
1408 readBuffers = makeBufferArray (bufferTakeRef (b[0]), NULL) ;
1410 if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1412 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1414 cxnSleepOrDie (cxn) ;
1417 else if ( !getNntpResponse (p, &code, &rest) )
1421 warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident, p) ;
1423 cxnSleepOrDie (cxn) ;
1431 case 200: /* normal */
1432 case 201: /* can transfer but not post -- old nntpd */
/* NOTE(review): the case labels for the refusal paths below are not
   visible here. */
1437 cxnSleepOrDie (cxn) ;
1438 hostIpFailed (cxn->myHost) ;
1439 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1443 warn ("%s:%d cxnsleep no permission to talk: %s", peerName,
1445 cxnSleepOrDie (cxn) ;
1446 hostIpFailed (cxn->myHost) ;
1447 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1451 warn ("%s:%d cxnsleep response unknown banner: %d %s", peerName,
1452 cxn->ident, code, p) ;
1453 d_printf (1,"%s:%d Unknown response code: %d: %s\n",
1454 hostPeerName (cxn->myHost),cxn->ident, code, p) ;
1455 cxnSleepOrDie (cxn) ;
1456 hostIpFailed (cxn->myHost) ;
1457 hostCxnBlocked (cxn->myHost, cxn, rest) ;
1463 /* If we got this far and the connection is IPv6, remove
1464 the IPv4 addresses from the address list. */
1465 connectionIfIpv6DeleteIpv4Addr (cxn);
/* authenticate first only when both credentials are configured.
   NOTE(review): the else in front of issueModeStream() is not visible
   here -- confirm. */
1467 if (hostUsername (cxn->myHost) != NULL
1468 && hostPassword (cxn->myHost) != NULL)
1469 issueAuthUser (e,cxn);
1471 issueModeStream (e,cxn);
1474 freeBufferArray (b) ;
/* Builds an "AUTHINFO USER <name>" command line from hostUsername() of
   cxn's Host and queues it for writing on endpoint e with
   authUserIssued() as the write callback; failing to queue the write is
   fatal (die()). The response buffer is then emptied and a read is
   queued with getAuthUserResponse() as its callback; if that read cannot
   be queued the new buffer array is freed and cxnSleepOrDie() is called. */
1481 static void issueAuthUser (EndPoint e, Connection cxn)
1483 Buffer authUserBuffer;
1484 Buffer *authUserCmdBuffers,*readBuffers;
1485 size_t lenBuff = 0 ;
1488 /* 17 == strlen ("AUTHINFO USER \r\n") + 1 for the terminating NUL */
1489 lenBuff = (17 + strlen (hostUsername (cxn->myHost))) ;
1490 authUserBuffer = newBuffer (lenBuff) ;
1491 t = bufferBase (authUserBuffer) ;
1493 sprintf (t, "AUTHINFO USER %s\r\n", hostUsername (cxn->myHost)) ;
/* the data size is the command text without the terminating NUL */
1494 bufferSetDataSize (authUserBuffer, strlen (t)) ;
1496 authUserCmdBuffers = makeBufferArray (authUserBuffer, NULL) ;
1498 if ( !prepareWriteWithTimeout (e, authUserCmdBuffers, authUserIssued,
1501 die ("%s:%d fatal prepare write for authinfo user failed",
1502 hostPeerName (cxn->myHost), cxn->ident) ;
/* start the reply with an empty response buffer */
1505 bufferSetDataSize (cxn->respBuffer, 0) ;
1507 readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1509 if ( !prepareRead (e, readBuffers, getAuthUserResponse, cxn, 1) )
1511 warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1513 freeBufferArray (readBuffers) ;
1514 cxnSleepOrDie (cxn) ;
/* Builds an "AUTHINFO PASS <password>" command line from hostPassword()
   of cxn's Host and queues it for writing on endpoint e with
   authPassIssued() as the write callback; failing to queue the write is
   fatal (die()). The response buffer is then emptied and a read is
   queued with getAuthPassResponse() as its callback; if that read cannot
   be queued the new buffer array is freed and cxnSleepOrDie() is called. */
1524 static void issueAuthPass (EndPoint e, Connection cxn)
1526 Buffer authPassBuffer;
1527 Buffer *authPassCmdBuffers,*readBuffers;
1528 size_t lenBuff = 0 ;
1531 /* 17 == strlen ("AUTHINFO PASS \r\n") + 1 for the terminating NUL */
1532 lenBuff = (17 + strlen (hostPassword (cxn->myHost))) ;
1533 authPassBuffer = newBuffer (lenBuff) ;
1534 t = bufferBase (authPassBuffer) ;
1536 sprintf (t, "AUTHINFO PASS %s\r\n", hostPassword (cxn->myHost)) ;
/* the data size is the command text without the terminating NUL */
1537 bufferSetDataSize (authPassBuffer, strlen (t)) ;
1539 authPassCmdBuffers = makeBufferArray (authPassBuffer, NULL) ;
1541 if ( !prepareWriteWithTimeout (e, authPassCmdBuffers, authPassIssued,
1544 die ("%s:%d fatal prepare write for authinfo pass failed",
1545 hostPeerName (cxn->myHost), cxn->ident) ;
/* start the reply with an empty response buffer */
1548 bufferSetDataSize (cxn->respBuffer, 0) ;
1550 readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1552 if ( !prepareRead (e, readBuffers, getAuthPassResponse, cxn, 1) )
1554 warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1556 freeBufferArray (readBuffers) ;
1557 cxnSleepOrDie (cxn) ;
/* Copies the fixed MODE_CMD text ("MODE STREAM" plus CR LF) into a new
   buffer and queues it for writing on endpoint e with modeCmdIssued() as
   the write callback; failing to queue the write is fatal (die()).
   The response buffer is then emptied and a read is queued with
   getModeResponse() as its callback; if that read cannot be queued the
   new buffer array is freed and cxnSleepOrDie() is called. */
1567 static void issueModeStream (EndPoint e, Connection cxn)
1569 Buffer *modeCmdBuffers,*readBuffers ;
1573 #define MODE_CMD "MODE STREAM\r\n"
/* one extra byte so strcpy() below has room for the terminating NUL */
1575 modeBuffer = newBuffer (strlen (MODE_CMD) + 1) ;
1576 p = bufferBase (modeBuffer) ;
1578 /* now issue the MODE STREAM command */
1579 d_printf (1,"%s:%d Issuing the streaming command: %s\n",
1580 hostPeerName (cxn->myHost),cxn->ident,MODE_CMD) ;
1582 strcpy (p, MODE_CMD) ;
1584 bufferSetDataSize (modeBuffer, strlen (p)) ;
1586 modeCmdBuffers = makeBufferArray (modeBuffer, NULL) ;
1588 if ( !prepareWriteWithTimeout (e, modeCmdBuffers, modeCmdIssued,
1591 die ("%s:%d fatal prepare write for mode stream failed",
1592 hostPeerName (cxn->myHost), cxn->ident) ;
/* start the reply with an empty response buffer */
1595 bufferSetDataSize (cxn->respBuffer, 0) ;
1597 readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1599 if ( !prepareRead (e, readBuffers, getModeResponse, cxn, 1) )
1601 warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1603 freeBufferArray (readBuffers) ;
1604 cxnSleepOrDie (cxn) ;
/* Read callback for the reply to the AUTHINFO USER command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array (asserted to hold only cxn->respBuffer) and d the
   Connection, which must be in cxnConnectingS.
   A write still pending, a failed read, or a failure to queue a follow-up
   read is logged and handed to cxnSleepOrDie(). When the buffer holds no
   newline yet it is expanded and the read is queued again with this same
   callback. For a complete line the read-blocked timer is cleared and
   the line is parsed with getNntpResponse(); the visible continuations
   are issueAuthPass(), or a logged warning followed by marking the
   connection authenticated and calling issueModeStream().
   NOTE(review): the case labels selecting between those continuations
   are not visible here -- confirm against the full source. */
1615 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1617 Connection cxn = (Connection) d ;
1619 char *p = bufferBase (b[0]) ;
1621 const char *peerName ;
1623 ASSERT (e == cxn->myEp) ;
1624 ASSERT (b [0] == cxn->respBuffer) ;
1625 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1626 ASSERT (cxn->state == cxnConnectingS) ;
1627 VALIDATE_CONNECTION (cxn) ;
1629 peerName = hostPeerName (cxn->myHost) ;
/* terminate the data so the string functions below can be used on p */
1631 bufferAddNullByte (b[0]) ;
1633 d_printf (1,"%s:%d Processing authinfo user response: %s", /* no NL */
1634 hostPeerName (cxn->myHost), cxn->ident, p) ;
1636 if (i == IoDone && writeIsPending (cxn->myEp))
1638 /* badness. should never happen */
1639 warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1642 cxnSleepOrDie (cxn) ;
1644 else if (i != IoDone)
1648 errno = endPointErrno (e) ;
1649 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1651 cxnSleepOrDie (cxn) ;
1653 else if (strchr (p, '\n') == NULL)
/* no complete line yet: make room and read some more */
1656 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1658 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1659 if ( !prepareRead (e, buffers, getAuthUserResponse, cxn, 1) )
1661 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1662 freeBufferArray (buffers) ;
1663 cxnSleepOrDie (cxn) ;
/* a full line arrived, so the response timer is no longer needed */
1668 clearTimer (cxn->readBlockedTimerId) ;
1670 if ( !getNntpResponse (p, &code, NULL) )
1672 warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1675 cxnSleepOrDie (cxn) ;
1679 notice ("%s:%d connected", peerName, cxn->ident) ;
1684 issueAuthPass (e,cxn);
1688 warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1690 cxn->authenticated = true;
1691 issueModeStream (e,cxn);
/* Read callback for the reply to the AUTHINFO PASS command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array (asserted to hold only cxn->respBuffer) and d the
   Connection, which must be in cxnConnectingS.
   A write still pending, a failed read, or a failure to queue a follow-up
   read is logged and handed to cxnSleepOrDie(). When the buffer holds no
   newline yet it is expanded and the read is queued again with this same
   callback. For a complete line the read-blocked timer is cleared and
   the line is parsed with getNntpResponse(); the visible outcomes are
   logging "authenticated", setting cxn->authenticated and calling
   issueModeStream(), or a warning followed by cxnSleepOrDie().
   NOTE(review): the case labels selecting between those outcomes are
   not visible here -- confirm against the full source. */
1706 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1708 Connection cxn = (Connection) d ;
1710 char *p = bufferBase (b[0]) ;
1712 const char *peerName ;
1714 ASSERT (e == cxn->myEp) ;
1715 ASSERT (b [0] == cxn->respBuffer) ;
1716 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1717 ASSERT (cxn->state == cxnConnectingS) ;
1718 VALIDATE_CONNECTION (cxn) ;
1720 peerName = hostPeerName (cxn->myHost) ;
/* terminate the data so the string functions below can be used on p */
1722 bufferAddNullByte (b[0]) ;
1724 d_printf (1,"%s:%d Processing authinfo pass response: %s", /* no NL */
1725 hostPeerName (cxn->myHost), cxn->ident, p) ;
1727 if (i == IoDone && writeIsPending (cxn->myEp))
1729 /* badness. should never happen */
1730 warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1733 cxnSleepOrDie (cxn) ;
1735 else if (i != IoDone)
1739 errno = endPointErrno (e) ;
1740 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1742 cxnSleepOrDie (cxn) ;
1744 else if (strchr (p, '\n') == NULL)
/* no complete line yet: make room and read some more */
1747 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1749 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1750 if ( !prepareRead (e, buffers, getAuthPassResponse, cxn, 1) )
1752 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1753 freeBufferArray (buffers) ;
1754 cxnSleepOrDie (cxn) ;
/* a full line arrived, so the response timer is no longer needed */
1759 clearTimer (cxn->readBlockedTimerId) ;
1761 if ( !getNntpResponse (p, &code, NULL) )
1763 warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1766 cxnSleepOrDie (cxn) ;
1773 notice ("%s:%d authenticated", peerName, cxn->ident) ;
1774 cxn->authenticated = true ;
1775 issueModeStream (e,cxn);
1779 warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1781 cxnSleepOrDie (cxn) ;
1794 * Process the remote's response to our MODE STREAM command. This is where
1795 * the Connection moves into the cxnFeedingS state. If the remote has given
1796 * us a good welcome banner, but then immediately dropped the connection,
1797 * we'll arrive here with the MODE STREAM command still queued up.
/* Read callback for the reply to the MODE STREAM command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array (asserted to hold only cxn->respBuffer) and d the
   Connection, which must be in cxnConnectingS.
   A write still pending, a failed read, or a failure to queue a follow-up
   read is logged and handed to cxnSleepOrDie(). When the buffer holds no
   newline yet it is expanded and the read is queued again with this same
   callback. For a complete line the read-blocked timer is cleared and
   the line is parsed with getNntpResponse(). Code 203 tells the Host the
   remote streams (hostRemoteStreams (..., true)) and, if the Host wants
   streaming, sets cxn->doesStreaming and cxn->maxCheck; any other code
   reports hostRemoteStreams (..., false). After that the connect time is
   recorded, the response buffer is emptied, cxn->sleepTimeout is reset
   to init_reconnect_period and a read is queued with responseIsRead() as
   its callback. If there is something to write, or the Host hands over
   an article, doSomeWrites() is called.
   b is released with freeBufferArray() at the end. */
1799 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1801 Connection cxn = (Connection) d ;
1803 char *p = bufferBase (b[0]) ;
1805 const char *peerName ;
1807 ASSERT (e == cxn->myEp) ;
1808 ASSERT (b [0] == cxn->respBuffer) ;
1809 ASSERT (b [1] == NULL) ; /* only ever one buffer on this read */
1810 ASSERT (cxn->state == cxnConnectingS) ;
1811 VALIDATE_CONNECTION (cxn) ;
1813 peerName = hostPeerName (cxn->myHost) ;
/* terminate the data so the string functions below can be used on p */
1815 bufferAddNullByte (b[0]) ;
1817 d_printf (1,"%s:%d Processing mode response: %s", /* no NL */
1818 hostPeerName (cxn->myHost), cxn->ident, p) ;
1820 if (i == IoDone && writeIsPending (cxn->myEp))
1821 { /* badness. should never happen */
1822 warn ("%s:%d cxnsleep mode stream command still pending", peerName,
1825 cxnSleepOrDie (cxn) ;
1827 else if (i != IoDone)
1831 errno = endPointErrno (e) ;
1832 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1834 cxnSleepOrDie (cxn) ;
1836 else if (strchr (p, '\n') == NULL)
1837 { /* partial read */
1838 expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1840 buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1841 if ( !prepareRead (e, buffers, getModeResponse, cxn, 1) )
1843 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1844 freeBufferArray (buffers) ;
1845 cxnSleepOrDie (cxn) ;
/* a full line arrived, so the response timer is no longer needed */
1850 clearTimer (cxn->readBlockedTimerId) ;
1852 if ( !getNntpResponse (p, &code, NULL) )
1854 warn ("%s:%d cxnsleep response to MODE STREAM: %s", peerName,
1857 cxnSleepOrDie (cxn) ;
/* the authentication path has already logged its own notice */
1861 if (!cxn->authenticated)
1862 notice ("%s:%d connected", peerName, cxn->ident) ;
1866 case 203: /* will do streaming */
1867 hostRemoteStreams (cxn->myHost, cxn, true) ;
1869 if (hostWantsStreaming (cxn->myHost))
1871 cxn->doesStreaming = true ;
1872 cxn->maxCheck = hostMaxChecks (cxn->myHost) ;
1879 default: /* won't do it */
1880 hostRemoteStreams (cxn->myHost, cxn, false) ;
1885 /* now we consider ourselves completely connected. */
1886 cxn->timeCon = theTime () ;
/* NOTE(review): the statement taken when the queue is empty is not
   visible here; presumably the feeding state below is the alternative
   branch -- confirm. */
1887 if (cxn->articleQTotal == 0)
1890 cxn->state = cxnFeedingS ;
1892 /* one for the connection and one for the buffer array */
1893 ASSERT (cxn->authenticated || bufferRefCount (cxn->respBuffer) == 2) ;
1895 /* there was only one line in there, right? */
1896 bufferSetDataSize (cxn->respBuffer, 0) ;
1897 buffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1899 /* sleepTimeout gets changed at each failed attempt, so reset. */
1900 cxn->sleepTimeout = init_reconnect_period ;
1902 if ( !prepareRead (cxn->myEp, buffers, responseIsRead, cxn, 1) )
1904 freeBufferArray (buffers) ;
1906 cxnSleepOrDie (cxn) ;
1910 /* now we wait for articles from our Host, or we have some
1911 articles already. On infrequently used connections, the
1912 network link is torn down and rebuilt as needed. So we may
1913 be rebuilding the connection here in which case we have an
1915 if (writesNeeded (cxn) || hostGimmeArticle (cxn->myHost,cxn))
1916 doSomeWrites (cxn) ;
1921 freeBufferArray (b) ;
1929 * called when a response has been read from the socket. This is
1930 * where the bulk of the processing starts.
/* Read callback that processes the NNTP responses collected in
   cxn->respBuffer.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array (asserted to hold only cxn->respBuffer) and d the
   Connection, asserted to be feeding, idle, closing or flushing.
   When the buffer holds no newline yet it is expanded and the read is
   queued again with this same callback. Otherwise every complete line is
   parsed with getNntpResponse() and dispatched on its numeric code to
   the matching processResponseNNN() handler; an unparsable line or an
   unknown code is logged and handed to cxnSleepOrDie(), and the loop is
   left as soon as the connection is no longer feeding, closing, flushing
   or idle. For a feeding connection on a streaming link cxn->filterValue
   is then compared with the on/off thresholds to switch cxn->needsChecks
   (no-CHECK mode). A trailing partial response is moved to the start of
   the buffer, a new read is queued and doSomeWrites() is called.
   NOTE(review): the switch on the connection state that selects the
   final actions is only partly visible here -- confirm the details
   against the full source. */
1932 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d)
1934 Connection cxn = (Connection) d ;
1938 unsigned int respSize ;
1943 const char *peerName ;
1945 ASSERT (e == cxn->myEp) ;
1946 ASSERT (b != NULL) ;
1947 ASSERT (b [1] == NULL) ;
1948 ASSERT (b [0] == cxn->respBuffer) ;
1949 ASSERT (cxn->state == cxnFeedingS ||
1950 cxn->state == cxnIdleS ||
1951 cxn->state == cxnClosingS ||
1952 cxn->state == cxnFlushingS) ;
1953 VALIDATE_CONNECTION (cxn) ;
1955 bufferAddNullByte (b [0]) ;
1957 peerName = hostPeerName (cxn->myHost) ;
1963 errno = endPointErrno (e) ;
1964 syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1966 freeBufferArray (b) ;
1968 cxnLogStats (cxn,true) ;
1970 if (cxn->state == cxnClosingS)
1973 delConnection (cxn) ;
1982 bufBase = bufferBase (buf) ;
1984 /* check that we have (at least) a full line response. If not expand
1985 the buffer and resubmit the read. */
1986 if (strchr (bufBase, '\n') == 0)
1988 if (!expandBuffer (buf, BUFFER_EXPAND_AMOUNT))
1990 warn ("%s:%d cxnsleep can't expand input buffer", peerName,
1992 freeBufferArray (b) ;
1994 cxnSleepOrDie (cxn) ;
1996 else if ( !prepareRead (cxn->myEp, b, responseIsRead, cxn, 1))
1998 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1999 freeBufferArray (b) ;
2001 cxnSleepOrDie (cxn) ;
2008 freeBufferArray (b) ; /* connection still has reference to buffer */
2012 * Now process all the full responses that we have.
2014 response = bufBase ;
2015 respSize = bufferDataSize (cxn->respBuffer) ;
2017 while ((endr = strchr (response, '\n')) != NULL)
2019 char *next = endr + 1 ;
2028 if (next - endr != 2 && !cxn->loggedNoCr)
2030 /* only a newline there. we'll live with it */
2031 warn ("%s:%d remote not giving out CR characters", peerName,
/* remember the warning so it is logged only once per connection */
2033 cxn->loggedNoCr = true ;
2038 if ( !getNntpResponse (response, &code, &rest) )
2040 warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident,
2042 cxnSleepOrDie (cxn) ;
2047 d_printf (5,"%s:%d Response %d: %s\n", peerName, cxn->ident, code, response) ;
2049 /* now handle the response code. I'm not using symbolic names on
2050 purpose--the numbers are all you see in the RFC's. */
2053 case 205: /* OK response to QUIT. */
2054 processResponse205 (cxn, response) ;
2059 /* These three are from the CHECK command */
2060 case 238: /* no such article found */
2061 /* Do not incrFilter (cxn) now, wait till after
2062 subsequent TAKETHIS */
2063 processResponse238 (cxn, response) ;
2066 case 431: /* try again later (also for TAKETHIS) */
/* a 431 is handed to the 438 handler when hostDropDeferred() is true
   for the Host */
2068 if (hostDropDeferred (cxn->myHost))
2069 processResponse438 (cxn, response) ;
2071 processResponse431 (cxn, response) ;
2074 case 438: /* already have it */
2076 processResponse438 (cxn, response) ;
2081 /* These are from the TAKETHIS command */
2082 case 239: /* article transferred OK */
2084 processResponse239 (cxn, response) ;
2087 case 439: /* article rejected */
2089 processResponse439 (cxn, response) ;
2094 /* These are from the IHAVE command */
2095 case 335: /* send article */
2096 processResponse335 (cxn, response) ;
2099 case 435: /* article not wanted */
2100 processResponse435 (cxn, response) ;
2103 case 436: /* transfer failed try again later */
/* a 436 is handed to the 435 handler when no TAKE response is
   outstanding and hostDropDeferred() is true for the Host */
2104 if (cxn->takeRespHead == NULL && hostDropDeferred (cxn->myHost))
2105 processResponse435 (cxn, response) ;
2107 processResponse436 (cxn, response) ;
2110 case 437: /* article rejected */
2111 processResponse437 (cxn, response) ;
2114 case 400: /* has stopped accepting articles */
2115 processResponse400 (cxn, response) ;
2120 case 235: /* article transferred OK (IHAVE-body) */
2121 processResponse235 (cxn, response) ;
2125 case 480: /* Transfer permission denied. */
2126 processResponse480 (cxn,response) ;
2129 case 503: /* remote timeout. */
2130 processResponse503 (cxn,response) ;
2134 warn ("%s:%d cxnsleep response unknown: %d %s", peerName,
2135 cxn->ident, code, response) ;
2136 cxnSleepOrDie (cxn) ;
2140 VALIDATE_CONNECTION (cxn) ;
2142 if (cxn->state != cxnFeedingS && cxn->state != cxnClosingS &&
2143 cxn->state != cxnFlushingS && cxn->state != cxnIdleS /* XXX */)
2144 break ; /* connection is terminated */
2149 d_printf (5,"%s:%d done with responses\n",hostPeerName (cxn->myHost),
2158 /* see if we need to drop in to or out of no-CHECK mode */
2159 if (cxn->state == cxnFeedingS && cxn->doesStreaming)
2161 if ((cxn->filterValue > cxn->onThreshold) && cxn->needsChecks) {
2162 cxn->needsChecks = false;
2163 hostLogNoCheckMode (cxn->myHost, true,
2164 cxn->offThreshold/cxn->lowPassFilter,
2165 cxn->filterValue/cxn->lowPassFilter,
2166 cxn->onThreshold/cxn->lowPassFilter) ;
2168 } else if ((cxn->filterValue < cxn->offThreshold) &&
2169 !cxn->needsChecks) {
2170 cxn->needsChecks = true;
2171 hostLogNoCheckMode (cxn->myHost, false,
2172 cxn->offThreshold/cxn->lowPassFilter,
2173 cxn->filterValue/cxn->lowPassFilter,
2174 cxn->onThreshold/cxn->lowPassFilter) ;
2179 /* Now handle possible remaining partial response and set up for
2181 if (*response != '\0')
2182 { /* partial response */
2183 unsigned int leftAmt = respSize - (response - bufBase) ;
2185 d_printf (2,"%s:%d handling a partial response\n",
2186 hostPeerName (cxn->myHost),cxn->ident) ;
2188 /* first we shift what's left in the buffer down to the
2189 bottom, if needed, or just expand the buffer */
2190 if (response != bufBase)
2192 /* so next read appends */
2193 memmove (bufBase, response, leftAmt) ;
2194 bufferSetDataSize (cxn->respBuffer, leftAmt) ;
2196 else if (!expandBuffer (cxn->respBuffer, BUFFER_EXPAND_AMOUNT))
2197 die ("%s:%d cxnsleep can't expand input buffer", peerName,
2201 bufferSetDataSize (cxn->respBuffer, 0) ;
2203 bArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
2205 if ( !prepareRead (e, bArr, responseIsRead, cxn, 1) )
2207 warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
2208 freeBufferArray (bArr) ;
2214 /* only setup the timer if we're still waiting for a response
2215 to something. There's not necessarily a 1-to-1 mapping
2216 between reads and writes in streaming mode. May have been
2217 set already above (that would be unlikely I think). */
2218 VALIDATE_CONNECTION (cxn) ;
2220 d_printf (5,"%s:%d about to do some writes\n",
2221 hostPeerName (cxn->myHost),cxn->ident) ;
2223 doSomeWrites (cxn) ;
2225 /* If the read timer is (still) running, update it to give
2226 those terminally slow hosts that take forever to drain
2227 the network buffers and just dribble out responses the
2228 benefit of the doubt. XXX - maybe should just increase
2229 timeout for these! */
2230 if (cxn->readBlockedTimerId)
2231 cxn->readBlockedTimerId = updateSleep (cxn->readBlockedTimerId,
2236 VALIDATE_CONNECTION (cxn) ;
2239 case cxnWaitingS: /* presumably after a code 205 or 400 */
2240 case cxnConnectingS: /* presumably after a code 205 or 400 */
2241 case cxnSleepingS: /* probably after a 480 */
2245 delConnection (cxn) ;
2250 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2259 * called when the write of the QUIT command has completed.
/* Write callback for the QUIT command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written and d the Connection.
   The write-blocked timer is cleared first. The visible failure path
   logs the endpoint's error and, for a closing connection, ends in
   delConnection(); on the success path the read-blocked (response)
   timeout is started. b is released with freeBufferArray().
   NOTE(review): the test on i that guards the failure path is not
   visible here -- confirm against the full source. */
2261 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d)
2263 Connection cxn = (Connection) d ;
2264 const char *peerName ;
2266 peerName = hostPeerName (cxn->myHost) ;
2268 clearTimer (cxn->writeBlockedTimerId) ;
2270 ASSERT (cxn->myEp == e) ;
2271 VALIDATE_CONNECTION (cxn) ;
2275 errno = endPointErrno (e) ;
2276 syswarn ("%s:%d cxnsleep can't write QUIT", peerName, cxn->ident) ;
2277 if (cxn->state == cxnClosingS)
2280 delConnection (cxn) ;
2286 /* The QUIT command has been sent, so start the response timer. */
2287 initReadBlockedTimeout (cxn) ;
2289 freeBufferArray (b) ;
2297 * called when the write of the IHAVE-body data is finished
/* Write callback for the body of an article sent after IHAVE.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written and d the Connection.
   The write-blocked timer is cleared first. The visible failure path
   logs the endpoint's error, logs the connection statistics and, for a
   closing connection, ends in delConnection(); on the success path the
   read-blocked (response) timeout is started. b is released with
   freeBufferArray().
   NOTE(review): the test on i that guards the failure path is not
   visible here -- confirm against the full source. */
2299 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2301 Connection cxn = (Connection) d ;
2303 ASSERT (e == cxn->myEp) ;
2305 clearTimer (cxn->writeBlockedTimerId) ;
2309 errno = endPointErrno (e) ;
2310 syswarn ("%s:%d cxnsleep can't write IHAVE body",
2311 hostPeerName (cxn->myHost), cxn->ident) ;
2313 cxnLogStats (cxn,true) ;
2315 if (cxn->state == cxnClosingS)
2318 delConnection (cxn) ;
2324 /* The article has been sent, so start the response timer. */
2325 initReadBlockedTimeout (cxn) ;
2328 freeBufferArray (b) ;
2338 * Called when a command set (IHAVE, CHECK, TAKETHIS) has been
2339 * written to the remote.
/* Write callback for a command set (IHAVE, CHECK or TAKETHIS).
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written (freed straight away) and d the
   Connection.
   The write-blocked timer is cleared. The visible failure path logs the
   endpoint's error and the connection statistics; a closing connection
   ends in delConnection(), and deferAllArticles() is called on the
   other visible path. On the success path the read-blocked (response)
   timeout is started and, on a streaming connection, doSomeWrites() is
   called to keep data flowing.
   NOTE(review): the test on i that guards the failure path and the
   statement executed when a feeding connection has an empty queue are
   not visible here -- confirm against the full source. */
2341 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2343 Connection cxn = (Connection) d ;
2344 const char *peerName ;
2346 ASSERT (e == cxn->myEp) ;
2348 peerName = hostPeerName (cxn->myHost) ;
2350 freeBufferArray (b) ;
2352 clearTimer (cxn->writeBlockedTimerId) ;
2356 errno = endPointErrno (e) ;
2357 syswarn ("%s:%d cxnsleep can't write command", peerName, cxn->ident) ;
2359 cxnLogStats (cxn,true) ;
2361 if (cxn->state == cxnClosingS)
2364 delConnection (cxn) ;
2368 /* XXX - so cxnSleep() doesn't die in VALIDATE_CONNECTION () */
2369 deferAllArticles (cxn) ;
2377 /* Some(?) hosts return the 439 response even before we're done
2378 sending, so don't go idle until here */
2379 if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2382 /* The command set has been sent, so start the response timer.
2383 XXX - we'd like finer grained control */
2384 initReadBlockedTimeout (cxn) ;
2386 if ( cxn->doesStreaming )
2387 doSomeWrites (cxn) ; /* pump data as fast as possible */
2388 /* XXX - will clear the read timeout */
2397 * Called when the MODE STREAM command has been written down the pipe.
/* Write callback for the MODE STREAM command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written and d the Connection.
   The write-blocked timer is cleared and the read-blocked (response)
   timeout is started. The visible failure path logs the problem and
   calls cxnSleepOrDie(). b is released with freeBufferArray().
   NOTE(review): the test on i that guards the failure path is not
   visible here -- confirm against the full source. */
2399 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2401 Connection cxn = (Connection) d ;
2403 ASSERT (e == cxn->myEp) ;
2405 clearTimer (cxn->writeBlockedTimerId) ;
2407 /* The mode command has been sent, so start the response timer */
2408 initReadBlockedTimeout (cxn) ;
2412 d_printf (1,"%s:%d MODE STREAM command failed to write\n",
2413 hostPeerName (cxn->myHost), cxn->ident) ;
2415 syswarn ("%s:%d cxnsleep can't write MODE STREAM",
2416 hostPeerName (cxn->myHost), cxn->ident) ;
2418 cxnSleepOrDie (cxn) ;
2421 freeBufferArray (b) ;
2429 * Called when the AUTHINFO USER command has been written down the pipe.
/* Write callback for the AUTHINFO USER command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written and d the Connection.
   The write-blocked timer is cleared and the read-blocked (response)
   timeout is started. The visible failure path logs the problem and
   calls cxnSleepOrDie(). b is released with freeBufferArray().
   NOTE(review): the test on i that guards the failure path is not
   visible here -- confirm against the full source. */
2431 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2433 Connection cxn = (Connection) d ;
2435 ASSERT (e == cxn->myEp) ;
2437 clearTimer (cxn->writeBlockedTimerId) ;
2439 /* The authinfo user command has been sent, so start the response timer */
2440 initReadBlockedTimeout (cxn) ;
2444 d_printf (1,"%s:%d AUTHINFO USER command failed to write\n",
2445 hostPeerName (cxn->myHost), cxn->ident) ;
2447 syswarn ("%s:%d cxnsleep can't write AUTHINFO USER",
2448 hostPeerName (cxn->myHost), cxn->ident) ;
2450 cxnSleepOrDie (cxn) ;
2453 freeBufferArray (b) ;
2462 * Called when the AUTHINFO PASS command has been written down the pipe.
/* Write callback for the AUTHINFO PASS command.
   e is the endpoint (asserted to be cxn->myEp), i the I/O status, b the
   buffer array that was written and d the Connection.
   The write-blocked timer is cleared and the read-blocked (response)
   timeout is started. The visible failure path logs the problem and
   calls cxnSleepOrDie(). b is released with freeBufferArray().
   NOTE(review): the test on i that guards the failure path is not
   visible here -- confirm against the full source. */
2464 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2466 Connection cxn = (Connection) d ;
2468 ASSERT (e == cxn->myEp) ;
2470 clearTimer (cxn->writeBlockedTimerId) ;
2472 /* The authinfo pass command has been sent, so start the response timer */
2473 initReadBlockedTimeout (cxn) ;
2477 d_printf (1,"%s:%d AUTHINFO PASS command failed to write\n",
2478 hostPeerName (cxn->myHost), cxn->ident) ;
2480 syswarn ("%s:%d cxnsleep can't write AUTHINFO PASS",
2481 hostPeerName (cxn->myHost), cxn->ident) ;
2483 cxnSleepOrDie (cxn) ;
2486 freeBufferArray (b) ;
2495 * Called whenever some amount of data has been written to the pipe but
2496 * more data remains to be written
/* Progress callback for a write that has moved some data but is not yet
   complete. i is asserted to be IoProgress; e and b are unused.
   When cxn->writeTimeout is positive the write-blocked timer is
   refreshed through updateSleep() with writeTimeoutCbk() as its callback
   and cxn->writeTimeout as its period. */
2498 static void writeProgress (EndPoint e UNUSED, IoStatus i, Buffer *b UNUSED,
2501 Connection cxn = (Connection) d ;
2503 ASSERT (i == IoProgress) ;
2505 if (cxn->writeTimeout > 0)
2506 cxn->writeBlockedTimerId = updateSleep (cxn->writeBlockedTimerId,
2507 writeTimeoutCbk, cxn->writeTimeout,
2520 * This is called when the timeout for the response from the remote
2521 * goes off. We tear down the connection and notify our host.
/* Timer callback for the read-blocked (response) timeout.
   id is asserted to be cxn->readBlockedTimerId and data is the
   Connection, asserted to be connecting, feeding, flushing or closing.
   The non-responsive connection is logged together with its statistics.
   A closing connection is aborted with abortConnection() and removed
   with delConnection(); the other visible path calls cxnSleep().
   NOTE(review): the else in front of cxnSleep() is not visible here --
   confirm against the full source. */
2523 static void responseTimeoutCbk (TimeoutId id, void *data)
2525 Connection cxn = (Connection) data ;
2526 const char *peerName ;
2528 ASSERT (id == cxn->readBlockedTimerId) ;
2529 ASSERT (cxn->state == cxnConnectingS ||
2530 cxn->state == cxnFeedingS ||
2531 cxn->state == cxnFlushingS ||
2532 cxn->state == cxnClosingS) ;
2533 VALIDATE_CONNECTION (cxn) ;
2535 /* XXX - let abortConnection clear readBlockedTimerId, otherwise
2536 VALIDATE_CONNECTION() will croak */
2538 peerName = hostPeerName (cxn->myHost) ;
2540 warn ("%s:%d cxnsleep non-responsive connection", peerName, cxn->ident) ;
2541 d_printf (1,"%s:%d shutting down non-repsonsive connection\n",
2542 hostPeerName (cxn->myHost), cxn->ident) ;
2544 cxnLogStats (cxn,true) ;
2546 if (cxn->state == cxnClosingS)
2548 abortConnection (cxn) ;
2549 delConnection (cxn) ;
2552 cxnSleep (cxn) ; /* will notify the Host */
2560 * This is called when the data write timeout for the remote
2561 * goes off. We tear down the connection and notify our host.
/* Timer callback for the write-blocked timeout.
   id is asserted to be cxn->writeBlockedTimerId and data is the
   Connection, asserted to be connecting, feeding, flushing or closing.
   The write timeout is logged together with the connection statistics.
   A closing connection is aborted with abortConnection() and removed
   with delConnection(); the other visible path calls cxnSleep().
   NOTE(review): the else in front of cxnSleep() is not visible here --
   confirm against the full source. */
2563 static void writeTimeoutCbk (TimeoutId id, void *data)
2565 Connection cxn = (Connection) data ;
2566 const char *peerName ;
2568 ASSERT (id == cxn->writeBlockedTimerId) ;
2569 ASSERT (cxn->state == cxnConnectingS ||
2570 cxn->state == cxnFeedingS ||
2571 cxn->state == cxnFlushingS ||
2572 cxn->state == cxnClosingS) ;
2573 VALIDATE_CONNECTION (cxn) ;
2575 /* XXX - let abortConnection clear writeBlockedTimerId, otherwise
2576 VALIDATE_CONNECTION() will croak */
2578 peerName = hostPeerName (cxn->myHost) ;
2580 warn ("%s:%d cxnsleep write timeout", peerName, cxn->ident) ;
2581 d_printf (1,"%s:%d shutting down non-responsive connection\n",
2582 hostPeerName (cxn->myHost), cxn->ident) ;
2584 cxnLogStats (cxn,true) ;
2586 if (cxn->state == cxnClosingS)
2588 abortConnection (cxn) ;
2589 delConnection (cxn) ;
2592 cxnSleep (cxn) ; /* will notify the Host */
2600 * Called by the EndPoint class when the timer goes off
/* Timer callback for the sleep (reopen) timer.
   id is asserted to be cxn->sleepTimerId and data is the Connection.
   The stored timer id is reset to 0. A connection that is not in
   cxnSleepingS is logged as being in a bad state and handed to
   cxnSleepOrDie().
   NOTE(review): what happens for a connection that is sleeping is not
   visible here -- confirm against the full source. */
2602 void reopenTimeoutCbk (TimeoutId id, void *data)
2604 Connection cxn = (Connection) data ;
2606 ASSERT (id == cxn->sleepTimerId) ;
2608 cxn->sleepTimerId = 0 ;
2610 if (cxn->state != cxnSleepingS)
2612 warn ("%s:%d cxnsleep connection in bad state: %s",
2613 hostPeerName (cxn->myHost), cxn->ident,
2614 stateToString (cxn->state)) ;
2615 cxnSleepOrDie (cxn) ;
2626 * timeout callback to close down long running connection.
/* Timer callback for the periodic flush timer.
   id is asserted to be cxn->flushTimerId and data is the Connection.
   The stored timer id is reset to 0. A connection that is not feeding,
   connecting or idle is logged as being in a bad state and handed to
   cxnSleepOrDie(); on the other visible path the periodic close is
   logged.
   NOTE(review): the statement that actually starts the close is not
   visible here -- confirm against the full source. */
2628 static void flushCxnCbk (TimeoutId id, void *data)
2630 Connection cxn = (Connection) data ;
2632 ASSERT (id == cxn->flushTimerId) ;
2633 VALIDATE_CONNECTION (cxn) ;
2635 cxn->flushTimerId = 0 ;
2637 if (!(cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
2638 cxn->state == cxnIdleS))
2640 warn ("%s:%d cxnsleep connection in bad state: %s",
2641 hostPeerName (cxn->myHost), cxn->ident,
2642 stateToString (cxn->state)) ;
2643 cxnSleepOrDie (cxn) ;
2647 d_printf (1,"%s:%d Handling periodic connection close.\n",
2648 hostPeerName (cxn->myHost), cxn->ident) ;
2650 notice ("%s:%d periodic close", hostPeerName (cxn->myHost), cxn->ident) ;
2661 * Timer callback for when the connection has not received an
2662 * article from INN. When that happens we tear down the network
2663 * connection to help recycle fds
/* Timer callback for the article-receipt timer.
   id is asserted to be cxn->artReceiptTimerId and data is the
   Connection. The stored timer id is reset to 0. A connection that is
   not in cxnIdleS is logged as being in a bad state and handed to
   cxnSleepOrDie(). A connection that still has queued articles gets a
   warning; on the tear-down path a notice is logged and the state is
   set to cxnIdleTimeoutS.
   NOTE(review): the statements that follow the state change and the
   branch structure around the queue test are not visible here --
   confirm against the full source. */
2665 static void articleTimeoutCbk (TimeoutId id, void *data)
2667 Connection cxn = (Connection) data ;
2668 const char *peerName = hostPeerName (cxn->myHost) ;
2670 ASSERT (cxn->artReceiptTimerId == id) ;
2671 VALIDATE_CONNECTION (cxn) ;
2673 cxn->artReceiptTimerId = 0 ;
2675 if (cxn->state != cxnIdleS)
2677 warn ("%s:%d cxnsleep connection in bad state: %s",
2678 hostPeerName (cxn->myHost), cxn->ident,
2679 stateToString (cxn->state)) ;
2680 cxnSleepOrDie (cxn) ;
2685 /* it's doubtful (right?) that this timer could go off and there'd
2686 still be articles in the queue. */
2687 if (cxn->articleQTotal > 0)
2689 warn ("%s:%d idle connection still has articles", peerName, cxn->ident) ;
2693 notice ("%s:%d idle tearing down connection", peerName, cxn->ident) ;
2694 cxn->state = cxnIdleTimeoutS ;
2704 * function to be called when the fd is not ready for reading, but there is
2705 * an article on tape or in the queue to be done. Things are done this way
2706 * so that a Connection doesn't hog time trying to find the next good
2707 * article for writing. With a large backlog of expired articles that would
2708 * take a long time. Instead the Connection just tries its next article on
2709 * tape or queue, and if that's no good then it registers this callback so
2710 * that other Connections have a chance of being serviced.
/* Work procedure registered with the EndPoint; ep is unused and data is
   the Connection. When writesNeeded() reports pending work
   doSomeWrites() is called. Otherwise a flushing or closing connection
   is checked for an empty article queue, and on the remaining visible
   path a debug message notes that no writes were needed.
   NOTE(review): the statement executed for a flushing or closing
   connection with an empty queue is not visible here -- confirm against
   the full source. */
2712 static void cxnWorkProc (EndPoint ep UNUSED, void *data)
2714 Connection cxn = (Connection) data ;
2716 d_printf (2,"%s:%d calling work proc\n",
2717 hostPeerName (cxn->myHost),cxn->ident) ;
2719 if (writesNeeded (cxn))
2720 doSomeWrites (cxn) ; /* may re-register the work proc... */
2721 else if (cxn->state == cxnFlushingS || cxn->state == cxnClosingS)
2723 if (cxn->articleQTotal == 0)
2727 d_printf (2,"%s:%d no writes were needed....\n",
2728 hostPeerName (cxn->myHost), cxn->ident) ;
2733 /****************************************************************************
2735 * END EndPoint callback area.
2737 ****************************************************************************/
2743 /****************************************************************************
2745 * RESPONSE CODE PROCESSING.
2747 ***************************************************************************/
2751 * A connection needs to sleep, but if it's closing it needs to die instead.
2753 static void cxnSleepOrDie (Connection cxn)
2755 if (cxn->state == cxnClosingS)
2763 * Handle the response 205 to our QUIT command, which means the
2764 * remote is going away and we can happily cleanup
2766 static void processResponse205 (Connection cxn, char *response UNUSED)
2770 VALIDATE_CONNECTION (cxn) ;
2772 if (!(cxn->state == cxnFeedingS ||
2773 cxn->state == cxnIdleS ||
2774 cxn->state == cxnFlushingS ||
2775 cxn->state == cxnClosingS))
2777 warn ("%s:%d cxnsleep connection in bad state: %s",
2778 hostPeerName (cxn->myHost), cxn->ident,
2779 stateToString (cxn->state)) ;
2780 cxnSleepOrDie (cxn) ;
2788 ASSERT (cxn->articleQTotal == 0) ;
2790 cxnLogStats (cxn,true) ;
2792 immedRecon = cxn->immedRecon ;
2794 hostCxnDead (cxn->myHost,cxn) ;
2796 if (cxn->state == cxnFlushingS && immedRecon)
2798 abortConnection (cxn) ;
2799 if (!cxnConnect (cxn))
2800 notice ("%s:%d flush re-connect failed",
2801 hostPeerName (cxn->myHost), cxn->ident) ;
2803 else if (cxn->state == cxnFlushingS)
2811 /* this shouldn't ever happen... */
2812 warn ("%s:%d cxnsleep response unexpected: %d",
2813 hostPeerName (cxn->myHost), cxn->ident, 205) ;
2814 cxnSleepOrDie (cxn) ;
2818 die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2827 * Handle a response code of 238 which is the "no such article"
2828 * reply to the CHECK command (i.e. remote wants it).
2830 static void processResponse238 (Connection cxn, char *response)
2833 ArtHolder artHolder ;
2835 if (!cxn->doesStreaming)
2837 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2838 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2840 cxnSleepOrDie (cxn) ;
2844 if (!(cxn->state == cxnFlushingS ||
2845 cxn->state == cxnFeedingS ||
2846 cxn->state == cxnClosingS))
2848 warn ("%s:%d cxnsleep connection in bad state: %s",
2849 hostPeerName (cxn->myHost), cxn->ident,
2850 stateToString (cxn->state)) ;
2851 cxnSleepOrDie (cxn) ;
2855 VALIDATE_CONNECTION (cxn) ;
2857 msgid = getMsgId (response) ;
2859 if (cxn->checkRespHead == NULL) /* peer is confused */
2861 warn ("%s:%d cxnsleep response unexpected: %d",
2862 hostPeerName (cxn->myHost),cxn->ident,238) ;
2863 cxnSleepOrDie (cxn) ;
2865 else if (msgid == NULL || strlen (msgid) == 0 ||
2866 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2867 noSuchMessageId (cxn,238,msgid,response) ;
2870 /* now remove the article from the check queue and move it onto the
2871 transmit queue. Another function wil take care of transmitting */
2872 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2873 if (cxn->state != cxnClosingS)
2874 appendArtHolder (artHolder, &cxn->takeHead, &cxn->articleQTotal) ;
2877 hostTakeBackArticle (cxn->myHost, cxn, artHolder->article) ;
2878 delArtHolder (artHolder) ;
2891 * process the response "try again later" to the CHECK command If this
2892 * returns true then the connection is still usable.
2894 static void processResponse431 (Connection cxn, char *response)
2897 ArtHolder artHolder ;
2899 if (!cxn->doesStreaming)
2901 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2902 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2904 cxnSleepOrDie (cxn) ;
2908 if (!(cxn->state == cxnFlushingS ||
2909 cxn->state == cxnFeedingS ||
2910 cxn->state == cxnClosingS))
2912 warn ("%s:%d cxnsleep connection in bad state: %s",
2913 hostPeerName (cxn->myHost), cxn->ident,
2914 stateToString (cxn->state)) ;
2915 cxnSleepOrDie (cxn) ;
2919 VALIDATE_CONNECTION (cxn) ;
2921 msgid = getMsgId (response) ;
2923 if (cxn->checkRespHead == NULL) /* peer is confused */
2925 warn ("%s:%d cxnsleep response unexpected: %d",
2926 hostPeerName (cxn->myHost),cxn->ident,431) ;
2927 cxnSleepOrDie (cxn) ;
2929 else if (msgid == NULL || strlen (msgid) == 0 ||
2930 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2931 noSuchMessageId (cxn,431,msgid,response) ;
2934 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2935 if (cxn->articleQTotal == 0)
2937 hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
2938 delArtHolder (artHolder) ;
2950 * process the "already have it" response to the CHECK command. If this
2951 * returns true then the connection is still usable.
2953 static void processResponse438 (Connection cxn, char *response)
2956 ArtHolder artHolder ;
2958 if (!cxn->doesStreaming)
2960 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2961 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2963 cxnSleepOrDie (cxn) ;
2967 if (!(cxn->state == cxnFlushingS ||
2968 cxn->state == cxnFeedingS ||
2969 cxn->state == cxnClosingS))
2971 warn ("%s:%d cxnsleep connection in bad state: %s",
2972 hostPeerName (cxn->myHost), cxn->ident,
2973 stateToString (cxn->state)) ;
2974 cxnSleepOrDie (cxn) ;
2978 VALIDATE_CONNECTION (cxn) ;
2980 msgid = getMsgId (response) ;
2982 if (cxn->checkRespHead == NULL) /* peer is confused */
2984 warn ("%s:%d cxnsleep response unexpected: %d",
2985 hostPeerName (cxn->myHost),cxn->ident,438) ;
2986 cxnSleepOrDie (cxn) ;
2988 else if (msgid == NULL || strlen (msgid) == 0 ||
2989 (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2990 noSuchMessageId (cxn,438,msgid,response) ;
2993 cxn->checksRefused++ ;
2995 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2996 if (cxn->articleQTotal == 0)
2998 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
2999 delArtHolder (artHolder) ;
3011 * process the "article transferred ok" response to the TAKETHIS.
3013 static void processResponse239 (Connection cxn, char *response)
3016 ArtHolder artHolder ;
3018 if (!cxn->doesStreaming)
3020 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3021 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3023 cxnSleepOrDie (cxn) ;
3027 if (!(cxn->state == cxnFlushingS ||
3028 cxn->state == cxnFeedingS ||
3029 cxn->state == cxnClosingS))
3031 warn ("%s:%d cxnsleep connection in bad state: %s",
3032 hostPeerName (cxn->myHost), cxn->ident,
3033 stateToString (cxn->state)) ;
3034 cxnSleepOrDie (cxn) ;
3038 VALIDATE_CONNECTION (cxn) ;
3040 msgid = getMsgId (response) ;
3042 if (cxn->takeRespHead == NULL) /* peer is confused */
3044 warn ("%s:%d cxnsleep response unexpected: %d",
3045 hostPeerName (cxn->myHost),cxn->ident,239) ;
3046 cxnSleepOrDie (cxn) ;
3048 else if (msgid == NULL || strlen (msgid) == 0 ||
3049 (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3050 noSuchMessageId (cxn,239,msgid,response) ;
3053 cxn->takesOkayed++ ;
3054 cxn->takesSizeOkayed += artSize(artHolder->article);
3056 remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3057 if (cxn->articleQTotal == 0)
3059 hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3060 delArtHolder (artHolder) ;
3070 * Set the thresholds for no-CHECK mode; negative means leave existing value
3073 void cxnSetCheckThresholds (Connection cxn,
3074 double lowFilter, double highFilter,
3075 double lowPassFilter)
3077 /* Adjust current value for new scaling */
3078 if (cxn->lowPassFilter > 0.0)
3079 cxn->filterValue = cxn->filterValue / cxn->lowPassFilter * lowPassFilter;
3081 /* Stick in new values */
3082 if (highFilter >= 0)
3083 cxn->onThreshold = highFilter * lowPassFilter / 100.0;
3085 cxn->offThreshold = lowFilter * lowPassFilter / 100.0;
3086 cxn->lowPassFilter = lowPassFilter;
3091 * Blow away the connection gracelessly and immedately clean up
3093 void cxnNuke (Connection cxn)
3095 abortConnection (cxn) ;
3096 hostCxnDead (cxn->myHost,cxn) ;
3097 delConnection(cxn) ;
3102 * process a "article rejected do not try again" response to the
3105 static void processResponse439 (Connection cxn, char *response)
3108 ArtHolder artHolder ;
3110 if (!cxn->doesStreaming)
3112 warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3113 " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3115 cxnSleepOrDie (cxn) ;
3119 if (!(cxn->state == cxnFlushingS ||
3120 cxn->state == cxnFeedingS ||
3121 cxn->state == cxnClosingS))
3123 warn ("%s:%d cxnsleep connection in bad state: %s",
3124 hostPeerName (cxn->myHost), cxn->ident,
3125 stateToString (cxn->state)) ;
3126 cxnSleepOrDie (cxn) ;
3130 VALIDATE_CONNECTION (cxn) ;
3132 msgid = getMsgId (response) ;
3134 if (cxn->takeRespHead == NULL) /* peer is confused */
3136 /* NNTPRelay return 439 for check <messid> if messid is bad */
3137 if (cxn->checkRespHead == NULL) /* peer is confused */
3139 warn ("%s:%d cxnsleep response unexpected: %d",
3140 hostPeerName (cxn->myHost),cxn->ident,439) ;
3141 cxnSleepOrDie (cxn) ;
3145 if ((artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
3146 noSuchMessageId (cxn,439,msgid,response) ;
3149 cxn->checksRefused++ ;
3150 remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
3151 if (cxn->articleQTotal == 0)
3153 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
3154 delArtHolder (artHolder) ;
3158 else if (msgid == NULL || strlen (msgid) == 0 ||
3159 (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3160 noSuchMessageId (cxn,439,msgid,response) ;
3163 cxn->takesRejected++ ;
3164 cxn->takesSizeRejected += artSize(artHolder->article);
3166 remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3167 /* Some(?) hosts return the 439 response even before we're done
3169 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3171 hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3172 delArtHolder (artHolder) ;
3185 * process the "article transferred ok" response to the IHAVE-body.
3187 static void processResponse235 (Connection cxn, char *response UNUSED)
3189 ArtHolder artHolder ;
3191 if (cxn->doesStreaming)
3193 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3194 " streaming connection: %s", hostPeerName (cxn->myHost),
3195 cxn->ident,response) ;
3196 cxnSleepOrDie (cxn) ;
3200 if (!(cxn->state == cxnFlushingS ||
3201 cxn->state == cxnFeedingS ||
3202 cxn->state == cxnClosingS))
3204 warn ("%s:%d cxnsleep connection in bad state: %s",
3205 hostPeerName (cxn->myHost), cxn->ident,
3206 stateToString (cxn->state)) ;
3207 cxnSleepOrDie (cxn) ;
3211 ASSERT (cxn->articleQTotal == 1) ;
3212 ASSERT (cxn->takeRespHead != NULL) ;
3213 VALIDATE_CONNECTION (cxn) ;
3215 if (cxn->takeRespHead == NULL) /* peer is confused */
3217 warn ("%s:%d cxnsleep response unexpected: %d",
3218 hostPeerName (cxn->myHost),cxn->ident,235) ;
3219 cxnSleepOrDie (cxn) ;
3223 /* now remove the article from the queue and tell the Host to forget
3225 artHolder = cxn->takeRespHead ;
3227 cxn->takeRespHead = NULL ;
3228 cxn->articleQTotal = 0 ;
3229 cxn->takesOkayed++ ;
3230 cxn->takesSizeOkayed += artSize(artHolder->article);
3232 if (cxn->articleQTotal == 0)
3235 hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3236 delArtHolder (artHolder) ;
3245 * process the "send article to be transfered" reponse to the IHAVE.
3247 static void processResponse335 (Connection cxn, char *response UNUSED)
3249 if (cxn->doesStreaming)
3251 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3252 " streaming connection: %s", hostPeerName (cxn->myHost),
3253 cxn->ident,response) ;
3254 cxnSleepOrDie (cxn) ;
3258 if (!(cxn->state == cxnFlushingS ||
3259 cxn->state == cxnFeedingS ||
3260 cxn->state == cxnClosingS))
3262 warn ("%s:%d cxnsleep connection in bad state: %s",
3263 hostPeerName (cxn->myHost), cxn->ident,
3264 stateToString (cxn->state)) ;
3265 cxnSleepOrDie (cxn) ;
3269 if (cxn->checkRespHead == NULL)
3271 warn ("%s:%d cxnsleep response unexpected: %d",
3272 hostPeerName (cxn->myHost),cxn->ident,335) ;
3273 cxnSleepOrDie (cxn) ;
3277 VALIDATE_CONNECTION (cxn) ;
3278 /* now move the article into the third queue */
3279 cxn->takeHead = cxn->checkRespHead ;
3280 cxn->checkRespHead = NULL ;
3282 issueIHAVEBody (cxn) ;
3291 * process the "not accepting articles" response. This could be to any of
3292 * the IHAVE/CHECK/TAKETHIS command, but not the banner--that's handled
3295 static void processResponse400 (Connection cxn, char *response)
3297 if (!(cxn->state == cxnFlushingS ||
3298 cxn->state == cxnFeedingS ||
3299 cxn->state == cxnIdleS ||
3300 cxn->state == cxnClosingS))
3302 warn ("%s:%d cxnsleep connection in bad state: %s",
3303 hostPeerName (cxn->myHost), cxn->ident,
3304 stateToString (cxn->state)) ;
3305 cxnSleepOrDie (cxn) ;
3309 VALIDATE_CONNECTION (cxn) ;
3311 /* We may get a response 400 multiple times when in streaming mode. */
3312 notice ("%s:%d remote cannot accept articles: %s",
3313 hostPeerName(cxn->myHost), cxn->ident, response) ;
3315 /* right here there may still be data queued to write and so we'll fail
3316 trying to issue the quit ('cause a write will be pending). Furthermore,
3317 the data pending may be half way through a command, and so just
3318 tossing the buffer is not sufficient. But figuring out where we are and
3319 doing a tidy job is hard */
3320 if (writeIsPending (cxn->myEp))
3321 cxnSleepOrDie (cxn) ;
3324 if (cxn->articleQTotal > 0)
3326 /* Defer the articles here so that cxnFlush() doesn't set up an
3327 immediate reconnect. */
3328 deferAllArticles (cxn) ;
3329 clearTimer (cxn->readBlockedTimerId) ;
3330 /* XXX - so cxnSleep() doesn't die when it validates the connection */
3333 /* XXX - it would be nice if we QUIT first, but we'd have to go
3334 into a state where we just search for the 205 response, and
3335 only go into the sleep state at that point */
3336 cxnSleepOrDie (cxn) ;
3345 * process the "not wanted" response to the IHAVE.
3347 static void processResponse435 (Connection cxn, char *response UNUSED)
3349 ArtHolder artHolder ;
3351 if (cxn->doesStreaming)
3353 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3354 " streaming connection: %s", hostPeerName (cxn->myHost),
3355 cxn->ident,response) ;
3356 cxnSleepOrDie (cxn) ;
3360 if (!(cxn->state == cxnFlushingS ||
3361 cxn->state == cxnFeedingS ||
3362 cxn->state == cxnClosingS))
3364 warn ("%s:%d cxnsleep connection in bad state: %s",
3365 hostPeerName (cxn->myHost), cxn->ident,
3366 stateToString (cxn->state)) ;
3367 cxnSleepOrDie (cxn) ;
3371 /* Some servers, such as early versions of Diablo, had a bug where they'd
3372 respond with a 435 code (which should only be used for refusing an
3373 article before it was offered) after an article has been sent. */
3374 if (cxn->checkRespHead == NULL)
3376 warn ("%s:%d cxnsleep response unexpected: %d",
3377 hostPeerName (cxn->myHost), cxn->ident, 435) ;
3378 cxnSleepOrDie (cxn) ;
3382 ASSERT (cxn->articleQTotal == 1) ;
3383 VALIDATE_CONNECTION (cxn) ;
3385 cxn->articleQTotal-- ;
3386 cxn->checksRefused++ ;
3388 artHolder = cxn->checkRespHead ;
3389 cxn->checkRespHead = NULL ;
3391 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3394 hostArticleNotWanted (cxn->myHost, cxn, artHolder->article) ;
3395 delArtHolder (artHolder) ;
3398 d_printf (1,"%s:%d On exiting 435 article queue total is %d (%d %d %d %d)\n",
3399 hostPeerName (cxn->myHost), cxn->ident,
3401 (int) (cxn->checkHead != NULL),
3402 (int) (cxn->checkRespHead != NULL),
3403 (int) (cxn->takeHead != NULL),
3404 (int) (cxn->takeRespHead != NULL));
3413 * process the "transfer failed" response to the IHAVE-body, (seems this
3414 * can come from the IHAVE too).
3416 static void processResponse436 (Connection cxn, char *response UNUSED)
3418 ArtHolder artHolder ;
3420 if (cxn->doesStreaming)
3422 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3423 " streaming connection: %s", hostPeerName (cxn->myHost),
3424 cxn->ident,response) ;
3425 cxnSleepOrDie (cxn) ;
3429 if (!(cxn->state == cxnFlushingS ||
3430 cxn->state == cxnFeedingS ||
3431 cxn->state == cxnClosingS))
3433 warn ("%s:%d cxnsleep connection in bad state: %s",
3434 hostPeerName (cxn->myHost), cxn->ident,
3435 stateToString (cxn->state)) ;
3436 cxnSleepOrDie (cxn) ;
3440 ASSERT (cxn->articleQTotal == 1) ;
3441 ASSERT (cxn->takeRespHead != NULL || cxn->checkRespHead != NULL) ;
3442 VALIDATE_CONNECTION (cxn) ;
3444 cxn->articleQTotal-- ;
3446 if (cxn->takeRespHead != NULL) /* IHAVE-body command barfed */
3448 artHolder = cxn->takeRespHead ;
3449 cxn->takeRespHead = NULL ;
3451 else /* IHAVE command barfed */
3453 artHolder = cxn->checkRespHead ;
3454 cxn->checkRespHead = NULL ;
3457 if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3460 hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
3461 delArtHolder (artHolder) ;
3469 * Process the "article rejected do not try again" response to the
3472 static void processResponse437 (Connection cxn, char *response UNUSED)
3474 ArtHolder artHolder ;
3476 if (cxn->doesStreaming)
3478 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3479 " streaming connection: %s", hostPeerName (cxn->myHost),
3480 cxn->ident,response) ;
3481 cxnSleepOrDie (cxn) ;
3485 if (!(cxn->state == cxnFlushingS ||
3486 cxn->state == cxnFeedingS ||
3487 cxn->state == cxnClosingS))
3489 warn ("%s:%d cxnsleep connection in bad state: %s",
3490 hostPeerName (cxn->myHost), cxn->ident,
3491 stateToString (cxn->state)) ;
3492 cxnSleepOrDie (cxn) ;
3496 ASSERT (cxn->articleQTotal == 1) ;
3497 ASSERT (cxn->takeRespHead != NULL) ;
3498 VALIDATE_CONNECTION (cxn) ;
3500 cxn->articleQTotal-- ;
3501 cxn->takesRejected++ ;
3503 artHolder = cxn->takeRespHead ;
3504 cxn->takeRespHead = NULL ;
3505 cxn->takesSizeRejected += artSize(artHolder->article);
3507 /* Some servers return the 437 response before we're done sending. */
3508 if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3511 hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3512 delArtHolder (artHolder) ;
3516 /* Process the response 480 Transfer permission denied. We're probably
3517 talking to a remote nnrpd on a system that forgot to put us in
3519 static void processResponse480 (Connection cxn, char *response UNUSED)
3521 if (cxn->doesStreaming)
3523 warn ("%s:%d cxnsleep unexpected non-streaming response for"
3524 " streaming connection: %s", hostPeerName (cxn->myHost),
3525 cxn->ident,response) ;
3526 cxnSleepOrDie (cxn) ;
3530 if (!(cxn->state == cxnFlushingS ||
3531 cxn->state == cxnFeedingS ||
3532 cxn->state == cxnClosingS))
3534 warn ("%s:%d cxnsleep connection in bad state: %s",
3535 hostPeerName (cxn->myHost), cxn->ident,
3536 stateToString (cxn->state)) ;
3537 cxnSleepOrDie (cxn) ;
3541 VALIDATE_CONNECTION (cxn) ;
3543 warn ("%s:%d cxnsleep transfer permission denied",
3544 hostPeerName (cxn->myHost), cxn->ident) ;
3546 if (cxn->state == cxnClosingS)
3557 * Handle the response 503, which means the timeout of nnrpd.
3559 static void processResponse503 (Connection cxn, char *response UNUSED)
3563 VALIDATE_CONNECTION (cxn) ;
3565 if (!(cxn->state == cxnFeedingS ||
3566 cxn->state == cxnIdleS ||
3567 cxn->state == cxnFlushingS ||
3568 cxn->state == cxnClosingS))
3570 warn ("%s:%d cxnsleep connection in bad state: %s",
3571 hostPeerName (cxn->myHost), cxn->ident,
3572 stateToString (cxn->state)) ;
3573 cxnSleepOrDie (cxn) ;
3577 if (cxn->articleQTotal != 0)
3578 notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3581 cxnLogStats (cxn,true) ;
3583 immedRecon = cxn->immedRecon ;
3585 hostCxnDead (cxn->myHost,cxn) ;
3587 if (cxn->state == cxnFlushingS && immedRecon)
3589 abortConnection (cxn) ;
3590 if (!cxnConnect (cxn))
3591 notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3594 else if (cxn->state == cxnFlushingS)
3605 /****************************************************************************
3607 * END RESPONSE CODE PROCESSING.
3609 ***************************************************************************/
3616 * puts the Connection into the sleep state.
3618 static void cxnSleep (Connection cxn)
3620 ASSERT (cxn != NULL) ;
3621 ASSERT (cxn->state == cxnFlushingS ||
3622 cxn->state == cxnIdleS ||
3623 cxn->state == cxnFeedingS ||
3624 cxn->state == cxnConnectingS) ;
3625 VALIDATE_CONNECTION (cxn) ;
3627 abortConnection (cxn) ;
3629 prepareReopenCbk (cxn) ; /* XXX - we don't want to reopen if idle */
3630 cxn->state = cxnSleepingS ;
3632 /* tell our Host we're asleep so it doesn't try to give us articles */
3633 hostCxnSleeping (cxn->myHost,cxn) ;
3638 static void cxnDead (Connection cxn)
3640 ASSERT (cxn != NULL) ;
3641 VALIDATE_CONNECTION (cxn) ;
3643 abortConnection (cxn) ;
3644 cxn->state = cxnDeadS ;
3650 * Sets the idle timer. If no articles arrive before the timer expires, the
3651 * connection will be closed.
3653 static void cxnIdle (Connection cxn)
3655 ASSERT (cxn != NULL) ;
3656 ASSERT (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
3657 cxn->state == cxnFlushingS || cxn->state == cxnClosingS) ;
3658 ASSERT (cxn->articleQTotal == 0) ;
3659 ASSERT (cxn->writeBlockedTimerId == 0) ;
3660 ASSERT (!writeIsPending (cxn->myEp)) ;
3661 ASSERT (cxn->sleepTimerId == 0) ;
3663 if (cxn->state == cxnFeedingS || cxn->state == cxnConnectingS)
3665 if (cxn->articleReceiptTimeout > 0)
3667 clearTimer (cxn->artReceiptTimerId) ;
3668 cxn->artReceiptTimerId = prepareSleep (articleTimeoutCbk,
3669 cxn->articleReceiptTimeout,
3673 if (cxn->readTimeout > 0 && cxn->state == cxnFeedingS)
3674 clearTimer (cxn->readBlockedTimerId) ;
3676 cxn->state = cxnIdleS ;
3677 ASSERT (cxn->readBlockedTimerId == 0) ;
3686 * Called when a response from the remote refers to a non-existent
3687 * message-id. The network connection is aborted and the Connection
3688 * object goes into sleep mode.
3690 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
3691 const char *msgid, const char *response)
3693 const char *peerName = hostPeerName (cxn->myHost) ;
3695 if (msgid == NULL || strlen (msgid) == 0)
3696 warn ("%s:%d cxnsleep message-id missing in reponse code %d: %s",
3697 peerName, cxn->ident, responseCode, response) ;
3699 warn ("%s:%d cxnsleep message-id invalid message-id in reponse code"
3700 " %d: %s", peerName, cxn->ident, responseCode, msgid) ;
3702 cxnLogStats (cxn,true) ;
3704 if (cxn->state != cxnClosingS)
3715 * a processing error has occurred (for example in parsing a response), or
3716 * we're at the end of the FSM and we're cleaning up.
3718 static void abortConnection (Connection cxn)
3720 ASSERT (cxn != NULL) ;
3721 VALIDATE_CONNECTION (cxn) ;
3723 /* cxn->myEp could be NULL if we get here during cxnConnect (via
3725 if (cxn->myEp != NULL)
3728 delEndPoint (cxn->myEp) ;
3732 clearTimer (cxn->sleepTimerId) ;
3733 clearTimer (cxn->artReceiptTimerId) ;
3734 clearTimer (cxn->readBlockedTimerId) ;
3735 clearTimer (cxn->writeBlockedTimerId) ;
3736 clearTimer (cxn->flushTimerId) ;
3738 deferAllArticles (cxn) ; /* give any articles back to Host */
3740 bufferSetDataSize (cxn->respBuffer,0) ;
3742 resetConnection (cxn) ;
3744 if (cxn->state == cxnFeedingS ||
3745 cxn->state == cxnIdleS ||
3746 cxn->state == cxnFlushingS ||
3747 cxn->state == cxnClosingS)
3748 hostCxnDead (cxn->myHost,cxn) ;
3755 * Set up the callback used when the Connection is sleeping (i.e. will try
3756 * to reopen the connection).
3758 static void prepareReopenCbk (Connection cxn)
3760 ASSERT (cxn->sleepTimerId == 0) ;
3762 if (!(cxn->state == cxnConnectingS ||
3763 cxn->state == cxnIdleS ||
3764 cxn->state == cxnFeedingS ||
3765 cxn->state == cxnFlushingS ||
3766 cxn->state == cxnStartingS))
3768 warn ("%s:%d cxnsleep connection in bad state: %s",
3769 hostPeerName (cxn->myHost), cxn->ident,
3770 stateToString (cxn->state)) ;
3771 cxnSleepOrDie (cxn) ;
3775 d_printf (1,"%s:%d Setting up a reopen callback\n",
3776 hostPeerName (cxn->myHost), cxn->ident) ;
3778 cxn->sleepTimerId = prepareSleep (reopenTimeoutCbk, cxn->sleepTimeout, cxn) ;
3780 /* bump the sleep timer amount each time to wait longer and longer. Gets
3781 reset in resetConnection() */
3782 cxn->sleepTimeout *= 2 ;
3783 if (cxn->sleepTimeout > max_reconnect_period)
3784 cxn->sleepTimeout = max_reconnect_period ;
3792 * (re)set all state variables to initial condition.
3794 static void resetConnection (Connection cxn)
3796 ASSERT (cxn != NULL) ;
3798 bufferSetDataSize (cxn->respBuffer,0) ;
3800 cxn->loggedNoCr = false ;
3802 cxn->immedRecon = false ;
3803 cxn->doesStreaming = false ; /* who knows, next time around maybe... */
3804 cxn->authenticated = false ;
3805 cxn->quitWasIssued = false ;
3806 cxn->needsChecks = true ;
3809 cxn->artsTaken = 0 ;
3810 cxn->checksIssued = 0 ;
3811 cxn->checksRefused = 0 ;
3812 cxn->takesRejected = 0 ;
3813 cxn->takesOkayed = 0 ;
3814 cxn->takesSizeRejected = 0 ;
3815 cxn->takesSizeOkayed = 0 ;
3817 cxn->filterValue = 0.0 ;
3823 * Give back all articles that are queued, but not actually in progress.
3824 * XXX merge come of this with deferAllArticles
3826 static void deferQueuedArticles (Connection cxn)
3830 for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3833 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3835 cxn->articleQTotal-- ;
3837 cxn->checkHead = NULL ;
3839 for (q = NULL, p = cxn->takeHead ; cxn->doesStreaming && p != NULL ; p = q)
3842 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3844 cxn->articleQTotal-- ;
3846 cxn->takeHead = NULL ;
3852 * Give back any articles we have to the Host for later retrys.
3854 static void deferAllArticles (Connection cxn)
3858 for (q = NULL, p = cxn->checkHead ; p != NULL ; p = q)
3861 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3863 cxn->articleQTotal-- ;
3865 cxn->checkHead = NULL ;
3867 for (q = NULL, p = cxn->checkRespHead ; p != NULL ; p = q)
3870 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3872 cxn->articleQTotal-- ;
3874 cxn->checkRespHead = NULL ;
3876 for (q = NULL, p = cxn->takeHead ; p != NULL ; p = q)
3879 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3881 cxn->articleQTotal-- ;
3883 cxn->takeHead = NULL ;
3885 for (q = NULL, p = cxn->takeRespHead ; p != NULL ; p = q)
3888 hostTakeBackArticle (cxn->myHost, cxn, p->article) ;
3890 cxn->articleQTotal-- ;
3892 cxn->takeRespHead = NULL ;
3894 ASSERT (cxn->articleQTotal == 0) ;
3902 * Called when there's an article to be pushed out to the remote. Even if
3903 * the Connection has an article it's possible that nothing will be written
3904 * (e.g. if the article on the queue doesn't exist any more)
3906 static void doSomeWrites (Connection cxn)
3908 bool doneSome = false ;
3910 /* If there's a write pending we can't do anything now. */
3911 if ( writeIsPending (cxn->myEp) )
3913 else if ( writesNeeded (cxn) ) /* something on a queue. */
3915 if (cxn->doesStreaming)
3916 doneSome = issueStreamingCommands (cxn) ;
3918 doneSome = issueIHAVE (cxn) ;
3920 /* doneSome will be false if article(s) were gone, but if the Host
3921 has something available, then it would have been put on the queue
3922 for next time around. */
3925 if (writesNeeded (cxn)) /* Host gave us something */
3926 addWorkCallback (cxn->myEp,cxnWorkProc,cxn) ; /* for next time. */
3927 else if (cxn->articleQTotal == 0)
3929 /* if we were in cxnFeedingS, then issueStreamingCommands
3930 already called cxnIdle(). */
3931 if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3932 issueQUIT (cxn) ; /* and nothing to wait for... */
3936 else if (cxn->state == cxnClosingS || cxn->state == cxnFlushingS)
3937 { /* nothing to do... */
3938 if (cxn->articleQTotal == 0)
3939 issueQUIT (cxn) ; /* and nothing to wait for before closing */
3947 /* Queue up a buffer with the IHAVE command in it for the article at
3948 * the head of the transmission queue.
3950 * If the article is missing, then the Host will be notified and
3951 * another article may be put on the Connections queue. This new
3952 * article is ignored for now, but a work callback is registered so
3953 * that it can be looked at later.
3955 static bool issueIHAVE (Connection cxn)
3957 Buffer ihaveBuff, *writeArr ;
3963 size_t bufLen = 256 ;
3966 ASSERT (!cxn->doesStreaming) ;
3967 ASSERT (cxn->state == cxnFlushingS ||
3968 cxn->state == cxnFeedingS ||
3969 cxn->state == cxnClosingS) ;
3970 ASSERT (cxn->articleQTotal == 1) ;
3971 ASSERT (cxn->checkHead != NULL) ;
3972 ASSERT (writeIsPending (cxn->myEp) == false) ;
3973 VALIDATE_CONNECTION (cxn) ;
3975 artH = cxn->checkHead ;
3976 article = cxn->checkHead->article ;
3977 msgid = artMsgId (artH->article) ;
3979 ASSERT (msgid != NULL) ;
3980 ASSERT (article != NULL) ;
3982 if ((tmp = (strlen (msgid) + 10)) > bufLen)
3985 ihaveBuff = newBuffer (bufLen) ;
3987 ASSERT (ihaveBuff != NULL) ;
3989 p = bufferBase (ihaveBuff) ;
3990 sprintf (p, "IHAVE %s\r\n", msgid) ;
3991 bufferSetDataSize (ihaveBuff, strlen (p)) ;
3993 d_printf (5,"%s:%d Command IHAVE %s\n",
3994 hostPeerName (cxn->myHost),cxn->ident,msgid) ;
3996 writeArr = makeBufferArray (ihaveBuff, NULL) ;
3997 if ( !prepareWriteWithTimeout (cxn->myEp, writeArr, commandWriteDone,
4000 die ("%s:%d fatal prepare write for IHAVE failed",
4001 hostPeerName (cxn->myHost), cxn->ident) ;
4004 /* now move the article to the second queue */
4005 cxn->checkRespHead = cxn->checkHead ;
4006 cxn->checkHead = NULL ;
4008 cxn->checksIssued++ ;
4009 hostArticleOffered (cxn->myHost, cxn) ;
4021 * Do a prepare write with the article body as the body portion of the
4024 static void issueIHAVEBody (Connection cxn)
4026 Buffer *writeArray ;
4029 ASSERT (cxn != NULL) ;
4030 ASSERT (!cxn->doesStreaming) ;
4031 ASSERT (cxn->state == cxnFlushingS ||
4032 cxn->state == cxnFeedingS ||
4033 cxn->state == cxnClosingS) ;
4034 ASSERT (cxn->articleQTotal == 1) ;
4035 ASSERT (cxn->takeHead != NULL) ;
4036 VALIDATE_CONNECTION (cxn) ;
4038 article = cxn->takeHead->article ;
4039 ASSERT (article != NULL) ;
4041 if (cxn->state != cxnClosingS)
4042 writeArray = artGetNntpBuffers (article) ;
4046 if (writeArray == NULL)
4048 /* missing article (expired for example) will get us here. */
4049 if (dotBuffer == NULL)
4051 dotBuffer = newBufferByCharP (".\r\n",3,3) ;
4052 dotFirstBuffer = newBufferByCharP ("\r\n.",3,3) ;
4053 crlfBuffer = newBufferByCharP ("\r\n",2,2) ;
4056 /* we'll just write the empty buffer and the remote will complain
4058 writeArray = makeBufferArray (bufferTakeRef (dotBuffer),NULL) ;
4062 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, ihaveBodyDone, cxn) )
4064 die ("%s:%d fatal prepare write failed in issueIHAVEBody",
4065 hostPeerName (cxn->myHost), cxn->ident) ;
4069 d_printf (5,"%s:%d prepared write for IHAVE body.\n",
4070 hostPeerName (cxn->myHost),cxn->ident) ;
4073 /* now move the article to the last queue */
4074 cxn->takeRespHead = cxn->takeHead ;
4075 cxn->takeHead = NULL ;
4084 /* Process the two command queues. Slaps all the CHECKs together and
4085 * then does the TAKETHIS commands.
4087 * If no articles on the queue(s) are valid, then the Host is
4088 * notified. It may queue up new articles on the Connection, but
4089 * these are ignored for now. A work proc is registered so the
4090 * articles can be processed later.
/* Issue every pending streaming command for the connection with a single
   prepared write. buildCheckBuffer() supplies the CHECK commands and
   buildTakethisBuffers() adds the TAKETHIS commands and article buffers.
   The holders on checkHead and takeHead are then appended to the ends of
   checkRespHead and takeRespHead and the source queue heads are cleared.
   Holders parked on cxn->missing are reported to the Host through
   hostArticleIsMissing() once articleQTotal is 0. A failed prepare is
   fatal (die).
   NOTE(review): this listing omits some lines of the function (braces,
   else branches, local declarations and the return statements), so the
   exact guards around the statements below and the returned values
   should be confirmed against the full source. */
4092 static bool issueStreamingCommands (Connection cxn)
4094 Buffer checkBuffer = NULL ; /* the buffer with the CHECK commands in it. */
4095 Buffer *writeArray = NULL ;
4099 ASSERT (cxn != NULL) ;
4100 ASSERT (cxn->myEp != NULL) ;
4101 ASSERT (cxn->doesStreaming) ;
4102 VALIDATE_CONNECTION (cxn) ;
4104 checkBuffer = buildCheckBuffer (cxn) ; /* may be null if none to issue */
4106 if (checkBuffer != NULL)
4108 /* Now shift the articles to their new queue. */
4109 for (p = cxn->checkRespHead ; p != NULL && p->next != NULL ; p = p->next)
4110 /* nada--finding end of queue*/ ;
4113 cxn->checkRespHead = cxn->checkHead ;
4115 p->next = cxn->checkHead ;
4117 cxn->checkHead = NULL ;
4121 writeArray = buildTakethisBuffers (cxn,checkBuffer) ; /* may be null */
4123 /* If not null, then writeArray will have checkBuffer (if it wasn't NULL)
4124 in the first spot and the takethis buffers after that. */
4127 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray,
4128 commandWriteDone, cxn) )
4130 die ("%s:%d fatal prepare write for STREAMING commands failed",
4131 hostPeerName (cxn->myHost), cxn->ident) ;
4136 /* now shift articles over to their new queue. */
4137 for (p = cxn->takeRespHead ; p != NULL && p->next != NULL ; p = p->next)
4138 /* nada--finding end of queue */ ;
4141 cxn->takeRespHead = cxn->takeHead ;
4143 p->next = cxn->takeHead ;
4145 cxn->takeHead = NULL ;
4148 /* we defer the missing article notification to here because if there
4149 was a big backlog of missing articles *and* we're running in
4150 no-CHECK mode, then the Host would be putting bad articles on the
4151 queue we're taking them off of. */
4152 if (cxn->missing && cxn->articleQTotal == 0)
4154 for (p = cxn->missing ; p != NULL ; p = q)
4156 hostArticleIsMissing (cxn->myHost, cxn, p->article) ;
4160 cxn->missing = NULL ;
4170 * build up the buffer of all the CHECK commands.
/* Build one Buffer that holds a CHECK command line for the articles on
   cxn->checkHead. The work is done in two passes over the queue: the first
   adds up the space needed (8 bytes of fixed text plus the message-id per
   command, plus one null byte overall), the second formats each command
   with sprintf, logs it at debug level 5, increments cxn->checksIssued and
   calls hostArticleOffered(). The data size of the buffer is set to the
   formatted length without the null byte. Returns checkBuffer, which
   starts out as NULL.
   NOTE(review): the loop headers, any per-article validity test and the
   advance of the write pointer are not visible in this listing. */
4172 static Buffer buildCheckBuffer (Connection cxn)
4175 size_t lenBuff = 0 ;
4176 Buffer checkBuffer = NULL ;
4177 const char *peerName = hostPeerName (cxn->myHost) ;
4179 p = cxn->checkHead ;
4182 Article article = p->article ;
4185 msgid = artMsgId (article) ;
4187 lenBuff += (8 + strlen (msgid)) ; /* 8 == strlen("CHECK \r\n") */
4192 lenBuff++ ; /* for the null byte */
4194 /* now build up the single buffer that contains all the CHECK commands */
4200 checkBuffer = newBuffer (lenBuff) ;
4201 t = bufferBase (checkBuffer) ;
4203 p = cxn->checkHead ;
4206 const char *msgid = artMsgId (p->article) ;
4208 sprintf (t,"CHECK %s\r\n", msgid) ;
4209 d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4211 tlen += strlen (t) ;
4215 cxn->checksIssued++ ;
4216 hostArticleOffered (cxn->myHost,cxn) ;
4221 ASSERT (tlen + 1 == lenBuff) ;
4223 bufferSetDataSize (checkBuffer, tlen) ;
4226 return checkBuffer ;
4235 * Construct an array of TAKETHIS commands and the command bodies. Any
4236 * articles on the queue that are missing will be removed and the Host will
/* Build the NULL-terminated array of Buffers for the next write.
   When checkBuffer is not NULL it goes in the first slot. For each usable
   article on cxn->takeHead the array then receives one freshly allocated
   buffer holding the TAKETHIS command line, followed by a new reference
   (bufferTakeRef) to each of the NNTP buffers of the article. Articles
   whose file is no longer valid are unlinked from the queue,
   cxn->articleQTotal is decremented and the holder is chained onto
   cxn->missing. When the connection does not need CHECKs, each TAKETHIS
   also increments cxn->checksIssued and calls hostArticleOffered().
   If there is nothing on takeHead but checkBuffer is not NULL, the result
   is makeBufferArray (checkBuffer, NULL). rval starts out as NULL.
   NOTE(review): the main loop header, the validity test that selects the
   drop branch, and the handling of the case where every article was
   missing are not visible in this listing. */
4239 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer)
4241 size_t lenArray = 0 ;
4243 Buffer *rval = NULL ;
4244 const char *peerName = hostPeerName (cxn->myHost) ;
4246 if (checkBuffer != NULL)
4249 if (cxn->takeHead != NULL) /* some TAKETHIS commands to be done. */
4252 unsigned int takeBuffLen ;
4253 unsigned int writeIdx = 0 ;
4255 /* count up all the buffers we'll be writing. One extra each time for
4256 the TAKETHIS command buffer*/
4257 for (p = cxn->takeHead ; p != NULL ; p = p->next)
4258 if (artContentsOk (p->article))
4259 lenArray += (1 + artNntpBufferCount (p->article)) ;
4261 /* now allocate the array for the buffers and put them all in it */
4262 /* 1 for the terminator */
4263 rval = xmalloc (sizeof(Buffer) * (lenArray + 1)) ;
4265 if (checkBuffer != NULL)
4266 rval [writeIdx++] = checkBuffer ;
4275 Buffer *articleBuffers ;
4278 article = p->article ;
4279 nntpLen = artNntpBufferCount (article) ;
4280 msgid = artMsgId (article) ;
4283 { /* file no longer valid so drop from queue */
4286 if (q == NULL) /* it's the first in the queue */
4287 cxn->takeHead = p->next ;
4292 ASSERT (cxn->articleQTotal > 0) ;
4293 cxn->articleQTotal-- ;
4295 ta->next = cxn->missing ;
4300 articleBuffers = artGetNntpBuffers (article) ;
4302 /* set up the buffer with the TAKETHIS command in it.
4303 12 == strlen ("TAKETHIS \r\n") plus 1 for the null byte sprintf adds */
4304 takeBuffLen = 12 + strlen (msgid) ;
4305 takeBuffer = newBuffer (takeBuffLen) ;
4306 t = bufferBase (takeBuffer) ;
4308 sprintf (t, "TAKETHIS %s\r\n", msgid) ;
4309 bufferSetDataSize (takeBuffer, strlen (t)) ;
4311 d_printf (5,"%s:%d Command %s\n", peerName, cxn->ident, t) ;
4313 ASSERT (writeIdx <= lenArray) ;
4314 rval [writeIdx++] = takeBuffer ;
4316 /* now add all the buffers that make up the body of the TAKETHIS
4318 for (i = 0 ; i < nntpLen ; i++)
4320 ASSERT (writeIdx <= lenArray) ;
4321 rval [writeIdx++] = bufferTakeRef (articleBuffers [i]) ;
4324 freeBufferArray (articleBuffers) ;
4326 if ( !cxn->needsChecks )
4328 /* this isn't quite right. An article may be counted
4329 twice if we switch to no-CHECK mode after its
4330 CHECK was issued, but before its TAKETHIS was done
4331 just now. I'm not going to worry unless someone
4334 cxn->checksIssued++ ;
4335 hostArticleOffered (cxn->myHost,cxn) ;
4344 rval [writeIdx] = NULL ;
4346 { /* all articles were missing and no CHECKS */
4351 else if (checkBuffer != NULL) /* no TAKETHIS to do, but some CHECKS */
4352 rval = makeBufferArray (checkBuffer, NULL) ;
4362 * for one reason or another we need to disconnect gracefully. We send a
/* Send the QUIT command on the connection. Both command queues (takeHead
   and checkHead) must already be empty. The function checks
   cxn->quitWasIssued, warns when a write is still pending on the EndPoint,
   and tests for the closing state before it builds the command; it then
   sets quitWasIssued and prepares the write with quitWritten as the
   completion callback. A failed prepare is fatal (die).
   NOTE(review): the statements controlled by the three leading if tests
   are not visible in this listing; presumably they return early. */
4365 static void issueQUIT (Connection cxn)
4367 Buffer quitBuffer, *writeArray ;
4368 const char *peerName = hostPeerName (cxn->myHost) ;
4370 ASSERT (cxn->takeHead == NULL) ;
4371 ASSERT (cxn->checkHead == NULL) ;
4372 VALIDATE_CONNECTION (cxn) ;
4374 if (cxn->quitWasIssued)
4377 if (writeIsPending (cxn->myEp))
4379 warn ("%s:%d internal QUIT while write pending", peerName,
4382 if (cxn->state == cxnClosingS)
/* 7 bytes: the 6 characters of the command line plus the null byte that
   strcpy writes; only the 6 characters count as data. */
4389 quitBuffer = newBuffer (7) ;
4390 strcpy (bufferBase (quitBuffer), "QUIT\r\n") ;
4391 bufferSetDataSize (quitBuffer, 6) ;
4393 writeArray = makeBufferArray (quitBuffer, NULL) ;
4395 d_printf (1,"%s:%d Sending a quit command\n",
4396 hostPeerName (cxn->myHost),cxn->ident) ;
4398 cxn->quitWasIssued = true ; /* not exactly true, but good enough */
4400 if ( !prepareWriteWithTimeout (cxn->myEp, writeArray, quitWritten,
4403 die ("%s:%d fatal prepare write for QUIT command failed", peerName,
4414 * Set up the timer for the blocked reads
/* Restart the read-blocked (response) timer of the connection. Any timer
   recorded in cxn->readBlockedTimerId is cleared first; a new one is
   registered with prepareSleep(), using responseTimeoutCbk as the callback,
   only when cxn->readTimeout is greater than 0. Must not be called while
   the connection is in cxnIdleS (asserted). */
4416 static void initReadBlockedTimeout (Connection cxn)
4418 ASSERT (cxn != NULL) ;
4419 ASSERT (cxn->state != cxnIdleS ) ;
4421 /* set up the response timer. */
4422 clearTimer (cxn->readBlockedTimerId) ;
4424 if (cxn->readTimeout > 0)
4425 cxn->readBlockedTimerId = prepareSleep (responseTimeoutCbk, cxn->readTimeout, cxn) ;
4433 * Set up the timer for the blocked writes and prepare the write
/* Wrapper around prepareWrite() that also manages the connection timers:
   the read-blocked timer is cleared, the write-blocked timer is cleared and
   then re-armed with writeTimeoutCbk when cxn->writeTimeout is greater than
   0, and finally the write is prepared with writeProgress as the progress
   callback and done as the completion callback. Returns whatever
   prepareWrite() returns; the callers visible in this file treat a zero
   result as a fatal failure.
   NOTE(review): the remaining parameter declarations (buffers, done, cxn)
   are not visible in this listing. */
4435 static int prepareWriteWithTimeout (EndPoint endp,
4440 /* Clear the read timer, since we can't expect a response until everything
4442 XXX - would be nice to have a timeout for responses if we're sending a
4443 string of commands. */
4444 clearTimer (cxn->readBlockedTimerId) ;
4446 /* set up the write timer. */
4447 clearTimer (cxn->writeBlockedTimerId) ;
4449 if (cxn->writeTimeout > 0)
4450 cxn->writeBlockedTimerId = prepareSleep (writeTimeoutCbk, cxn->writeTimeout,
4453 /* set up the write. */
4454 return prepareWrite (endp, buffers, writeProgress, done, cxn) ;
4462 * Does the actual deletion of a connection and all its private data.
/* Destroy a connection and its private data. The connection is looked up
   in the global list gCxnList (it must be present, asserted), its EndPoint
   is deleted when it has one, the response buffer is deleted, the Host is
   told through hostCxnGone(), ipName is freed and the four timers are
   cleared. All four article queues must already be empty (asserted).
   The trailing section is the program-exit path described by its own
   comment: it frees PointersFreedOnExit and the timeout queue and logs the
   finishing time.
   NOTE(review): the list-unlink branches, the final release of cxn and the
   condition guarding the exit path (presumably the shutDown value returned
   by hostCxnGone) are not visible in this listing. */
4464 static void delConnection (Connection cxn)
4472 d_printf (1,"Deleting connection: %s:%d\n",
4473 hostPeerName (cxn->myHost),cxn->ident) ;
4475 for (c = gCxnList, q = NULL ; c != NULL ; q = c, c = c->next)
4479 gCxnList = gCxnList->next ;
4485 ASSERT (c != NULL) ;
4487 if (cxn->myEp != NULL)
4488 delEndPoint (cxn->myEp) ;
4490 ASSERT (cxn->checkHead == NULL) ;
4491 ASSERT (cxn->checkRespHead == NULL) ;
4492 ASSERT (cxn->takeHead == NULL) ;
4493 ASSERT (cxn->takeRespHead == NULL) ;
4495 delBuffer (cxn->respBuffer) ;
4497 /* tell the Host we're outta here. */
4498 shutDown = hostCxnGone (cxn->myHost, cxn) ;
4503 free (cxn->ipName) ;
4505 clearTimer (cxn->artReceiptTimerId) ;
4506 clearTimer (cxn->readBlockedTimerId) ;
4507 clearTimer (cxn->writeBlockedTimerId) ;
4508 clearTimer (cxn->flushTimerId) ;
4514 /* exit program if that was the last connexion for the last host */
4515 /* XXX what about if there are ever multiple listeners?
4516 XXX this will be executed if all hosts on only one of the
4517 XXX listeners have gone */
4518 time_t now = theTime () ;
4519 char dateString [30] ;
4520 char **p = PointersFreedOnExit ;
4522 /* finish out all outstanding memory */
4525 free (PointersFreedOnExit) ;
4526 freeTimeoutQueue () ;
4528 strlcpy (dateString,ctime (&now), sizeof(dateString)) ;
4530 notice ("ME finishing at %s", dateString) ;
4541 * Bump up the value of the low pass filter on the connection.
/* Update the low pass filter of the connection upwards: filterValue is
   first scaled by (1.0 - 1.0 / lowPassFilter) and then increased by 1.0.
   NOTE(review): divides by cxn->lowPassFilter; presumably it is never
   zero -- confirm where the field is set. */
4543 static void incrFilter (Connection cxn)
4545 cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4546 cxn->filterValue += 1.0 ;
4554 * decrement the value of the low pass filter on the connection.
/* Update the low pass filter of the connection downwards: filterValue is
   scaled by (1.0 - 1.0 / lowPassFilter) and nothing is added, so the value
   decays towards 0.
   NOTE(review): divides by cxn->lowPassFilter; presumably it is never
   zero -- confirm where the field is set. */
4556 static void decrFilter (Connection cxn)
4558 cxn->filterValue *= (1.0 - (1.0 / cxn->lowPassFilter)) ;
4566 * return true if we have articles we need to issue commands for.
/* Report whether commands still have to be issued: true when either the
   CHECK queue (checkHead) or the TAKETHIS queue (takeHead) is non-empty.
   The two response queues are not consulted. */
4568 static bool writesNeeded (Connection cxn)
4570 return (cxn->checkHead != NULL || cxn->takeHead != NULL ? true : false) ;
4578 * do some simple tests to make sure it's OK.
/* Consistency check for a Connection; every violation trips an ASSERT.
   Part one walks the four article queues (takeHead, takeRespHead,
   checkHead, checkRespHead), logs the size of each at debug level 4 and
   asserts that the total equals cxn->articleQTotal. Part two asserts the
   invariants that belong to the current state: which timers must or must
   not be armed, whether an EndPoint may exist, whether timeCon is set, and
   how many articles may be queued.
   NOTE(review): the counter updates inside the loops, the switch header
   and most of the case labels are not visible in this listing, so the
   state each group of assertions belongs to has to be confirmed against
   the full source. */
4580 static void validateConnection (Connection cxn)
4588 /* count up the articles the Connection has and make sure that matches. */
4589 for (p = cxn->takeHead ; p != NULL ; p = p->next)
4591 d_printf (4,"TAKE queue: %d\n",i) ;
4594 for (p = cxn->takeRespHead ; p != NULL ; p = p->next)
4596 d_printf (4,"TAKE response queue: %d\n",i - old) ;
4599 for (p = cxn->checkHead ; p != NULL ; p = p->next)
4601 d_printf (4,"CHECK queue: %d\n",i - old) ;
4604 for (p = cxn->checkRespHead ; p != NULL ; p = p->next)
4606 d_printf (4,"CHECK response queue: %d\n",i - old) ;
4608 ASSERT (i == cxn->articleQTotal) ;
4612 case cxnConnectingS:
4613 ASSERT (cxn->doesStreaming == false) ;
4614 ASSERT (cxn->articleQTotal <= 1) ;
4615 ASSERT (cxn->artReceiptTimerId == 0) ;
4616 ASSERT (cxn->sleepTimerId == 0) ;
4617 /* ASSERT (cxn->timeCon == 0) ; */
4621 ASSERT (cxn->articleQTotal == 0) ;
4622 ASSERT (cxn->myEp == NULL) ;
4623 ASSERT (cxn->artReceiptTimerId == 0) ;
4624 ASSERT (cxn->readBlockedTimerId == 0) ;
4625 ASSERT (cxn->writeBlockedTimerId == 0) ;
4626 ASSERT (cxn->flushTimerId == 0) ;
4627 ASSERT (cxn->sleepTimerId == 0) ;
4628 ASSERT (cxn->timeCon == 0) ;
4633 if (!cxn->doesStreaming)
4634 ASSERT (cxn->articleQTotal <= 1) ;
4635 ASSERT (cxn->artReceiptTimerId == 0) ;
4636 ASSERT (cxn->flushTimerId == 0) ;
4637 ASSERT (cxn->sleepTimerId == 0) ;
4638 ASSERT (cxn->timeCon != 0) ;
4639 ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4643 if (cxn->doesStreaming)
4644 /* Some(?) hosts return the 439 response even before we're done
4645 sending, so don't go idle until here */
4646 ASSERT (cxn->articleQTotal > 0 || writeIsPending (cxn->myEp)) ;
4648 ASSERT (cxn->articleQTotal == 1) ;
4649 if (cxn->readTimeout > 0 && !writeIsPending (cxn->myEp) &&
4650 cxn->checkRespHead != NULL && cxn->takeRespHead != NULL)
4651 ASSERT (cxn->readBlockedTimerId != 0) ;
4652 if (cxn->writeTimeout > 0 && writeIsPending (cxn->myEp))
4653 ASSERT (cxn->writeBlockedTimerId != 0) ;
4654 ASSERT (cxn->sleepTimerId == 0) ;
4655 ASSERT (cxn->timeCon != 0) ;
4656 ASSERT (cxn->doesStreaming || cxn->maxCheck == 1) ;
4660 ASSERT (cxn->articleQTotal == 0) ;
4661 if (cxn->articleReceiptTimeout > 0)
4662 ASSERT (cxn->artReceiptTimerId != 0) ;
4663 ASSERT (cxn->readBlockedTimerId == 0) ;
4664 ASSERT (cxn->writeBlockedTimerId == 0) ;
4665 ASSERT (cxn->sleepTimerId == 0) ;
4666 ASSERT (cxn->timeCon != 0) ;
4667 ASSERT (!writeIsPending (cxn->myEp)) ;
4670 case cxnIdleTimeoutS:
4671 ASSERT (cxn->articleQTotal == 0) ;
4672 ASSERT (cxn->artReceiptTimerId == 0) ;
4673 ASSERT (cxn->writeBlockedTimerId == 0) ;
4674 ASSERT (cxn->sleepTimerId == 0) ;
4675 ASSERT (cxn->timeCon != 0) ;
4676 ASSERT (!writeIsPending (cxn->myEp)) ;
4680 ASSERT (cxn->articleQTotal == 0) ;
4681 ASSERT (cxn->myEp == NULL) ;
4682 ASSERT (cxn->artReceiptTimerId == 0) ;
4683 ASSERT (cxn->readBlockedTimerId == 0) ;
4684 ASSERT (cxn->writeBlockedTimerId == 0) ;
4685 ASSERT (cxn->flushTimerId == 0) ;
4686 ASSERT (cxn->timeCon == 0) ;
4690 ASSERT (cxn->articleQTotal == 0) ;
4691 ASSERT (cxn->myEp == NULL) ;
4692 ASSERT (cxn->artReceiptTimerId == 0) ;
4693 ASSERT (cxn->readBlockedTimerId == 0) ;
4694 ASSERT (cxn->writeBlockedTimerId == 0) ;
4695 ASSERT (cxn->flushTimerId == 0) ;
4696 ASSERT (cxn->sleepTimerId == 0) ;
4697 ASSERT (cxn->timeCon == 0) ;
4710 * Generate a printable string of the parameter.
/* Produce a printable name for a connection state. The name is copied into
   the function-local static buffer rval (64 bytes); a value that matches
   none of the known states is formatted as UNKNOWN STATE followed by its
   number. Because the buffer is static, every call overwrites the text
   produced by the previous call, so the result must be used or copied
   before the function is called again.
   NOTE(review): the switch header, most case labels and the return
   statement are not visible in this listing; presumably rval is
   returned. */
4712 static const char *stateToString (CxnState state)
4714 static char rval [64] ;
4719 strlcpy (rval,"cxnStartingS", sizeof(rval)) ;
4723 strlcpy (rval,"cxnWaitingS", sizeof(rval)) ;
4726 case cxnConnectingS:
4727 strlcpy (rval,"cxnConnectingS", sizeof(rval)) ;
4731 strlcpy (rval,"cxnIdleS", sizeof(rval)) ;
4734 case cxnIdleTimeoutS:
4735 strlcpy (rval,"cxnIdleTimeoutS", sizeof(rval)) ;
4739 strlcpy (rval,"cxnFeedingS", sizeof(rval)) ;
4743 strlcpy (rval,"cxnSleepingS", sizeof(rval)) ;
4747 strlcpy (rval,"cxnFlushingS", sizeof(rval)) ;
4751 strlcpy (rval,"cxnClosingS", sizeof(rval)) ;
4755 strlcpy (rval,"cxnDeadS", sizeof(rval)) ;
4759 snprintf (rval,sizeof(rval),"UNKNOWN STATE: %d",state) ;
4770 /****************************************************************************
4772 * Functions for managing the internal queue of Articles on each Connection.
4774 ****************************************************************************/
/* Allocate a new article holder with xmalloc and store the given article
   in it.
   NOTE(review): the initialisation of the remaining fields and the return
   statement are not visible in this listing; presumably the new holder is
   returned with its next link cleared. */
4776 static ArtHolder newArtHolder (Article article)
4778 ArtHolder a = xmalloc (sizeof(struct art_holder_s)) ;
4780 a->article = article ;
4791 * Deletes the article holder
4793 static void delArtHolder (ArtHolder artH)
4804 * remove the article holder from the queue. Adjust the count and if nxtPtr
4805 * points at the element then adjust that too.
/* Unlink artH from the singly linked queue that starts at *head. The queue
   is scanned until artH or the end is reached; the holder is then removed
   either by advancing *head (artH was first) or by pointing the link of the
   preceding element at artH->next. head and count must not be NULL
   (asserted).
   NOTE(review): the loop body, the update of *count and the return values
   are not visible in this listing; presumably the result tells whether
   artH was found. */
4807 static bool remArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4811 ASSERT (head != NULL) ;
4812 ASSERT (count != NULL) ;
4816 while (h != NULL && h != artH)
4826 *head = (*head)->next ;
4828 i->next = artH->next ;
4840 * append the ArticleHolder to the queue
/* Append artH to the queue that starts at *head. The loop walks to the
   last element of the queue (p stays NULL when the queue is empty). head
   and count must not be NULL (asserted).
   NOTE(review): the statements that link artH in and update *count are
   not visible in this listing. */
4842 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count)
4846 ASSERT (head != NULL) ;
4847 ASSERT (count != NULL) ;
4849 for (p = *head ; p != NULL && p->next != NULL ; p = p->next)
4866 * find the article holder on the queue by comparing the message-id.
/* Search the queue starting at head for the holder whose article has the
   given message-id; the comparison is an exact strcmp match.
   NOTE(review): the return statements and the step to the next element
   are not visible in this listing; presumably the matching holder is
   returned, or NULL when there is none. */
4868 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head)
4870 while (head != NULL)
4872 if (strcmp (msgid, artMsgId (head->article)) == 0)
4884 * Randomize a number by the given percentage
4887 static int fudgeFactor (int initVal)
4890 static bool seeded ;
4894 time_t t = theTime () ;
4896 /* this may have been done already in endpoint.c. Is that a problem??? */
4901 newValue = initVal + (initVal / 10 - (rand() % (initVal / 5)));