chiark / gitweb /
Commit 2.4.5-5 as unpacked
[inn-innduct.git] / innfeed / connection.c
1 /*  $Id: connection.c 7793 2008-04-26 08:15:40Z iulius $
2 **
3 **  The implementation of the innfeed Connection class.
4 **
5 **  Written by James Brister <brister@vix.com>
6 **
7 **  The Connection object is what manages the NNTP protocol. If the remote
8 **  doesn't do streaming, then the standard IHAVE lock-step protcol is
9 **  performed. In the streaming situation we have two cases. One where we must
10 **  send CHECK commands, and the other where we can directly send TAKETHIS
11 **  commands without a prior CHECK.
12 **
13 **  The Connection object maintains four article queues. The first one is
14 **  where new articles are put if they need to have an IHAVE or CHECK command
15 **  sent for them. The second queue is where the articles move from the first
16 **  after their IHAVE/CHECK command is sent, but the reply has not yet been
17 **  seen. The third queue is where articles go after the IHAVE/CHECK reply has
18 **  been seen (and the reply says to send the article). It is articles in the
19 **  third queue that have the TAKETHIS command sent, or the body of an IHAVE.
20 **  The third queue is also where new articles go if the connection is running
21 **  in no-CHECK mode. The fourth queue is where the articles move to from the
22 **  third queue after their IHAVE-body or TAKETHIS command has been sent. When
23 **  the response to the IHAVE-body or TAKETHIS is received the articles are
24 **  removed from the fourth queue and the Host object controlling this
25 **  Connection is notified of the success or failure of the transfer.
26 **
27 **  The whole system is event-driven by the EndPoint class and the Host via
28 **  calls to prepareRead() and prepareWrite() and prepareSleep().
29 **
30 **
31 **  We should probably store the results of gethostbyname in the connection so
32 **  we can rotate through the address when one fails for connecting. Perhaps
33 **  the gethostbyname should be done in the Host and the connection should
34 **  just be given the address to use.
35 **
36 **  Should we worry about articles being stuck on a queue for ever if the
37 **  remote forgets to send a response to a CHECK?
38 **
39 **  Perhaps instead of killing the connection on some of the more simple
40 **  errors, we should perhaps try to flush the input and keep going.
41 **
42 **  Worry about counter overflow.
43 **
44 **  Worry about stats gathering when switch to no-check mode.
45 **
46 **  XXX if issueQUIT() has a problem and the state goes to cxnDeadS this is
47 **  not handled properly everywhere yet.
48 */
49
50 #include "innfeed.h"
51 #include "config.h"
52 #include "clibrary.h"
53 #include "portable/socket.h"
54 #include "portable/time.h"
55
56 #include <assert.h>
57 #include <errno.h>
58 #include <fcntl.h>
59 #include <netdb.h>
60 #include <signal.h>
61 #include <syslog.h>
62
63 #if defined (__FreeBSD__)
64 # include <sys/ioctl.h>
65 #endif
66
67 #include "inn/messages.h"
68 #include "libinn.h"
69
70 #include "article.h"
71 #include "buffer.h"
72 #include "configfile.h"
73 #include "connection.h"
74 #include "endpoint.h"
75 #include "host.h"
76
77 #if defined (NDEBUG)
78 #define VALIDATE_CONNECTION(x) ((void) 0)
79 #else
80 #define VALIDATE_CONNECTION(x) validateConnection (x)
81 #endif
82
83 extern char **PointersFreedOnExit ;
84 extern const char *pidFile ;
85
86 /*
87  * Private types.
88  */
89
90 /* We keep a linked list of articles the connection is trying to transmit */
91 typedef struct art_holder_s
92 {
93     Article article ;
94     struct art_holder_s *next ;
95 } *ArtHolder ;
96
97
98 typedef enum {
99   cxnStartingS,                 /* the connection's start state. */
100   cxnWaitingS,                  /* not connected. Waiting for an article. */
101   cxnConnectingS,               /* in the middle of connecting */
102   cxnIdleS,                     /* open and ready to feed, has empty queues */
103   cxnIdleTimeoutS,              /* timed out in the idle state */
104   cxnFeedingS,                  /* in the processes of feeding articles */
105   cxnSleepingS,                 /* blocked on reestablishment timer */
106   cxnFlushingS,                 /* am waiting for queues to drain to bounce connection. */
107   cxnClosingS,                  /* have been told to close down permanently when queues drained */
108   cxnDeadS                      /* connection is dead. */
109 } CxnState ;
110
111 /* The Connection class */
112 struct connection_s
113 {
114     Host myHost ;               /* the host who owns the connection */
115     EndPoint myEp ;             /* the endpoint the connection talks through */
116     unsigned int ident ;               /* an identifier for syslogging. */
117     CxnState state ;            /* the state the connection is in */
118
119
120     /*
121      * The Connection maintains 4 queue of articles.
122      */
123     ArtHolder checkHead ;       /* head of article list to do CHECK/IHAVE */
124     ArtHolder checkRespHead ;   /* head of list waiting on CHECK/IHAVE
125                                    response */
126     ArtHolder takeHead ;        /* head of list of articles to send
127                                    TAKETHIS/IHAVE-body */
128     ArtHolder takeRespHead ;    /* list of articles waiting on
129                                    TAKETHIS/IHAVE-body response */
130     unsigned int articleQTotal ;       /* number of articles in all four queues */
131     ArtHolder missing ;         /* head of missing list */
132
133
134     Buffer respBuffer ;         /* buffer all responses are read into */
135
136     char *ipName ;              /* the ip name (possibly quad) of the remote */
137
138     unsigned int maxCheck ;            /* the max number of CHECKs to send */
139     unsigned short port ;              /* the port number to use */
140
141     /*
142      * Timeout values and their callback IDs
143      */
144
145     /* Timer for max amount of time between receiving articles from the
146        Host */
147     unsigned int articleReceiptTimeout ;
148     TimeoutId artReceiptTimerId ;
149
150     /* Timer for the max amount of time to wait for a response from the
151        remote */
152     unsigned int readTimeout ;
153     TimeoutId readBlockedTimerId ;
154
155     /* Timer for the max amount of time to wait for a any amount of data
156        to be written to the remote */
157     unsigned int writeTimeout ;
158     TimeoutId writeBlockedTimerId ;
159
160     /* Timer for the max number of seconds to keep the network connection
161        up (long lasting connections give older nntp servers problems). */
162     unsigned int flushTimeout ;
163     TimeoutId flushTimerId ;
164
165     /* Timer for the number of seconds to sleep before attempting a
166        reconnect. */
167     unsigned int sleepTimeout ;
168     TimeoutId sleepTimerId ;
169
170
171     bool loggedNoCr ;           /* true if we logged the NOCR_MSG */
172     bool immedRecon ;           /* true if we recon immediately after flushing. */
173     bool doesStreaming ;        /* true if remote will handle streaming */
174     bool authenticated ;        /* true if remote authenticated */
175     bool quitWasIssued ;          /* true if QUIT command was sent. */
176     bool needsChecks ;          /* true if we issue CHECK commands in
177                                    streaming mode (rather than just sending
178                                    TAKETHIS commands) */
179
180     time_t timeCon ;            /* the time the connect happened (including
181                                    the MODE STREAM command). */
182
183     /*
184      * STATISTICS
185      */
186     unsigned int artsTaken ;           /* the number of articles INN gave this cxn */
187     unsigned int checksIssued ;        /* the number of CHECKS/IHAVES we
188                                    sent. Note that if we're running in
189                                    no-CHECK mode, then we add in the
190                                    TAKETHIS commands too */
191     unsigned int checksRefused ;       /* the number of response 435/438 */
192     unsigned int takesRejected ;       /* the number of response 437/439 recevied */
193     unsigned int takesOkayed ;         /* the number of response 235/239 received */
194
195     double takesSizeRejected ;
196     double takesSizeOkayed ;
197
198     double onThreshold ;        /* for no-CHECK mode */
199     double offThreshold ;       /* for no-CHECK mode */
200     double filterValue ;        /* current value of IIR filter */
201     double lowPassFilter ;      /* time constant for IIR filter */
202
203     Connection next ;           /* for global list. */
204 };
205
206 static Connection gCxnList = NULL ;
207 static unsigned int gCxnCount = 0 ;
208 static unsigned int max_reconnect_period = MAX_RECON_PER ;
209 static unsigned int init_reconnect_period = INIT_RECON_PER ;
210 #if 0
211 static bool inited = false ;
212 #endif
213 static Buffer dotFirstBuffer ;
214 static Buffer dotBuffer ;
215 static Buffer crlfBuffer ;
216
217
218 /***************************************************
219  *
220  * Private function declarations.
221  *
222  ***************************************************/
223
224
225 /* I/O Callbacks */
226 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
227 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d) ;
228 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
229 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
230 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d) ;
231 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d) ;
232 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d) ;
233 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
234 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d) ;
235 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
236 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
237 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d) ;
238 static void writeProgress (EndPoint e, IoStatus i, Buffer *b, void *d) ;
239
240
241 /* Timer callbacks */
242 static void responseTimeoutCbk (TimeoutId id, void *data) ;
243 static void writeTimeoutCbk (TimeoutId id, void *data) ;
244 static void reopenTimeoutCbk (TimeoutId id, void *data) ;
245 static void flushCxnCbk (TimeoutId, void *data) ;
246 static void articleTimeoutCbk (TimeoutId id, void *data) ;
247
248 /* Work callbacks */
249 static void cxnWorkProc (EndPoint ep, void *data) ;
250
251
252 static void cxnSleepOrDie (Connection cxn) ;
253
254 /* Response processing. */
255 static void processResponse205 (Connection cxn, char *response) ;
256 static void processResponse238 (Connection cxn, char *response) ;
257 static void processResponse431 (Connection cxn, char *response) ;
258 static void processResponse438 (Connection cxn, char *response) ;
259 static void processResponse239 (Connection cxn, char *response) ;
260 static void processResponse439 (Connection cxn, char *response) ;
261 static void processResponse235 (Connection cxn, char *response) ;
262 static void processResponse335 (Connection cxn, char *response) ;
263 static void processResponse400 (Connection cxn, char *response) ;
264 static void processResponse435 (Connection cxn, char *response) ;
265 static void processResponse436 (Connection cxn, char *response) ;
266 static void processResponse437 (Connection cxn, char *response) ;
267 static void processResponse480 (Connection cxn, char *response) ;
268 static void processResponse503 (Connection cxn, char *response) ;
269
270
271 /* Misc functions */
272 static void cxnSleep (Connection cxn) ;
273 static void cxnDead (Connection cxn) ;
274 static void cxnIdle (Connection cxn) ;
275 static void noSuchMessageId (Connection cxn, unsigned int responseCode,
276                            const char *msgid, const char *response) ;
277 static void abortConnection (Connection cxn) ;
278 static void resetConnection (Connection cxn) ;
279 static void deferAllArticles (Connection cxn) ;
280 static void deferQueuedArticles (Connection cxn) ;
281 static void doSomeWrites (Connection cxn) ;
282 static bool issueIHAVE (Connection cxn) ;
283 static void issueIHAVEBody (Connection cxn) ;
284 static bool issueStreamingCommands (Connection cxn) ;
285 static Buffer buildCheckBuffer (Connection cxn) ;
286 static Buffer *buildTakethisBuffers (Connection cxn, Buffer checkBuffer) ;
287 static void issueQUIT (Connection cxn) ;
288 static void initReadBlockedTimeout (Connection cxn) ;
289 static int prepareWriteWithTimeout (EndPoint endp, Buffer *buffers,
290                                     EndpRWCB done, Connection cxn) ;
291 static void delConnection (Connection cxn) ;
292 static void incrFilter (Connection cxn) ;
293 static void decrFilter (Connection cxn) ;
294 static bool writesNeeded (Connection cxn) ;
295 static void validateConnection (Connection cxn) ;
296 static const char *stateToString (CxnState state) ;
297
298 static void issueModeStream (EndPoint e, Connection cxn) ;
299 static void issueAuthUser (EndPoint e, Connection cxn) ;
300 static void issueAuthPass (EndPoint e, Connection cxn) ;
301
302 static void prepareReopenCbk (Connection cxn) ;
303
304
305 /* Article queue management routines. */
306 static ArtHolder newArtHolder (Article art) ;
307 static void delArtHolder (ArtHolder artH) ;
308 static bool remArtHolder (ArtHolder art, ArtHolder *head, unsigned int *count) ;
309 static void appendArtHolder (ArtHolder artH, ArtHolder *head, unsigned int *count) ;
310 static ArtHolder artHolderByMsgId (const char *msgid, ArtHolder head) ;
311
312 static int fudgeFactor (int initVal) ;
313
314
315
316
317 /***************************************************
318  *
319  * Public functions implementation.
320  *
321  ***************************************************/
322
323
324 int cxnConfigLoadCbk (void *data UNUSED)
325 {
326   long iv ;
327   int rval = 1 ;
328   FILE *fp = (FILE *) data ;
329
330   if (getInteger (topScope,"max-reconnect-time",&iv,NO_INHERIT))
331     {
332       if (iv < 1)
333         {
334           rval = 0 ;
335           logOrPrint (LOG_ERR,fp,
336                       "ME config: value of %s (%ld) in %s cannot be less"
337                       " than 1. Using %ld", "max-reconnect-time",
338                       iv,"global scope",(long) MAX_RECON_PER);
339           iv = MAX_RECON_PER ;
340         }
341     }
342   else
343     iv = MAX_RECON_PER ;
344   max_reconnect_period = (unsigned int) iv ;
345
346   if (getInteger (topScope,"initial-reconnect-time",&iv,NO_INHERIT))
347     {
348       if (iv < 1)
349         {
350           rval = 0 ;
351           logOrPrint (LOG_ERR,fp,
352                       "ME config: value of %s (%ld) in %s cannot be less"
353                       " than 1. Using %ld", "initial-reconnect-time",
354                       iv,"global scope",(long)INIT_RECON_PER);
355           iv = INIT_RECON_PER ;
356         }
357     }
358   else
359     iv = INIT_RECON_PER ;
360   init_reconnect_period = (unsigned int) iv ;
361
362   return rval ;
363 }
364   
365
366
367
368 \f
369 /*
370  * Create a new Connection object and return it. All fields are
371  * initialized to reasonable values.
372  */
373 Connection newConnection (Host host,
374                           unsigned int id,
375                           const char *ipname,
376                           unsigned int articleReceiptTimeout,
377                           unsigned int portNum,
378                           unsigned int respTimeout,
379                           unsigned int flushTimeout,
380                           double lowPassLow,
381                           double lowPassHigh,
382                           double lowPassFilter)
383 {
384   Connection cxn ;
385   bool croak = false ;
386
387   if (ipname == NULL)
388     {
389       d_printf (1,"NULL ipname in newConnection\n") ;
390       croak = true ;
391     }
392
393   if (ipname && strlen (ipname) == 0)
394     {
395       d_printf (1,"Empty ipname in newConnection\n") ;
396       croak = true ;
397     }
398
399   if (croak)
400     return NULL ;
401
402   cxn = xcalloc (1, sizeof(struct connection_s));
403
404   cxn->myHost = host ;
405   cxn->myEp = NULL ;
406   cxn->ident = id ;
407
408   cxn->checkHead = NULL ;
409   cxn->checkRespHead = NULL ;
410   cxn->takeHead = NULL ;
411   cxn->takeRespHead = NULL ;
412
413   cxn->articleQTotal = 0 ;
414   cxn->missing = NULL ;
415
416   cxn->respBuffer = newBuffer (BUFFER_SIZE) ;
417   ASSERT (cxn->respBuffer != NULL) ;
418
419   cxn->ipName = xstrdup (ipname) ;
420   cxn->port = portNum ;
421
422   /* Time out the higher numbered connections faster */
423   cxn->articleReceiptTimeout = articleReceiptTimeout * 10.0 / (10.0 + id) ;
424   cxn->artReceiptTimerId = 0 ;
425
426   cxn->readTimeout = respTimeout ;
427   cxn->readBlockedTimerId = 0 ;
428
429   cxn->writeTimeout = respTimeout ; /* XXX should be a separate value */
430   cxn->writeBlockedTimerId = 0 ;
431
432   cxn->flushTimeout = fudgeFactor (flushTimeout) ;
433   cxn->flushTimerId = 0 ;
434
435   cxn->onThreshold = lowPassHigh * lowPassFilter / 100.0 ;
436   cxn->offThreshold = lowPassLow * lowPassFilter / 100.0 ;
437   cxn->lowPassFilter = lowPassFilter;
438
439   cxn->sleepTimerId = 0 ;
440   cxn->sleepTimeout = init_reconnect_period ;
441
442   resetConnection (cxn) ;
443
444   cxn->next = gCxnList ;
445   gCxnList = cxn ;
446   gCxnCount++ ;
447
448   cxn->state = cxnStartingS ;
449
450   return cxn ;
451 }
452
453
454
455
456 \f
457 /* Create a new endpoint hooked to a non-blocking socket that is trying to
458  * connect to the host info stored in the Connection. On fast machines
459  * connecting locally the connect() may have already succeeded when this
460  * returns, but typically the connect will still be running and when it
461  * completes. The Connection will be notified via a write callback setup by
462  * prepareWrite below. If nothing goes wrong then this will return true
463  * (even if the connect() has not yet completed). If something fails
464  * (hostname lookup etc.) then it returns false (and the Connection is left
465  * in the sleeping state)..
466  *
467  * Pre-state            Reason cxnConnect called
468  * ---------            ------------------------
469  * cxnStartingS         Connection owner issued call.
470  * cxnWaitingS          side effect of cxnTakeArticle() call
471  * cxnConnecting        side effect of cxnFlush() call
472  * cxnSleepingS         side effect of reopenTimeoutCbk() call.
473  */
474 bool cxnConnect (Connection cxn)
475 {
476   const struct sockaddr_storage cxnAddr, cxnSelf ;
477   const struct sockaddr *retAddr;
478   int fd, rval ;
479   const char *peerName = hostPeerName (cxn->myHost) ;
480   char msgbuf[100];
481   const struct sockaddr_in *bind_addr = hostBindAddr (cxn->myHost) ;
482   int family = 0;
483 #ifdef HAVE_INET6
484   char paddr[INET6_ADDRSTRLEN];
485   const struct sockaddr_in6 *bind_addr6 = hostBindAddr6 (cxn->myHost) ;
486 #endif
487
488   ASSERT (cxn->myEp == NULL) ;
489
490   if (!(cxn->state == cxnStartingS ||
491         cxn->state == cxnWaitingS ||
492         cxn->state == cxnFlushingS ||
493         cxn->state == cxnSleepingS))
494     {
495       warn ("%s:%d cxnsleep connection in bad state: %s",
496             hostPeerName (cxn->myHost), cxn->ident,
497             stateToString (cxn->state)) ;
498       cxnSleepOrDie (cxn) ;
499       return false;
500     }
501   
502   if (cxn->state == cxnWaitingS)
503     ASSERT (cxn->articleQTotal == 1) ;
504   else
505     ASSERT (cxn->articleQTotal == 0) ;
506
507   cxn->state = cxnConnectingS ;
508
509 #ifdef HAVE_INET6
510   family = hostAddrFamily (cxn->myHost);
511 #endif
512   retAddr = hostIpAddr (cxn->myHost, family) ;
513
514   if (retAddr == NULL)
515     {
516       cxnSleepOrDie (cxn) ;
517       return false ;
518     }
519
520   memcpy( (void *)&cxnAddr, retAddr, SA_LEN(retAddr) );
521
522 #ifdef HAVE_INET6
523   if( cxnAddr.ss_family == AF_INET6 )
524   {
525     ((struct sockaddr_in6 *)&cxnAddr)->sin6_port = htons(cxn->port) ;
526     fd = socket (PF_INET6, SOCK_STREAM, 0);
527   }
528   else
529 #endif
530   {
531     ((struct sockaddr_in *)&cxnAddr)->sin_port = htons(cxn->port) ;
532     fd = socket (PF_INET, SOCK_STREAM, 0);
533   }
534   if (fd < 0)
535     {
536       syswarn ("%s:%d cxnsleep can't create socket", peerName, cxn->ident) ;
537       d_printf (1,"Can't get a socket: %s\n", strerror (errno)) ;
538
539       cxnSleepOrDie (cxn) ;
540
541       return false ;
542     }
543
544 #ifdef HAVE_INET6
545   /* bind to a specified IPv6 address */
546   if( (cxnAddr.ss_family == AF_INET6) && bind_addr6 )
547     {
548       memcpy( (void *)&cxnSelf, bind_addr6, sizeof(struct sockaddr_in6) );
549       if (bind (fd, (struct sockaddr *) &cxnSelf,
550                   sizeof(struct sockaddr_in6)) < 0)
551         {
552           snprintf(msgbuf, sizeof(msgbuf), "bind (%s): %%m",
553                    inet_ntop(AF_INET6, bind_addr6->sin6_addr.s6_addr,
554                              paddr, sizeof(paddr)) );
555
556           syslog (LOG_ERR, msgbuf) ;
557
558           cxnSleepOrDie (cxn) ;
559
560           return false ;
561         }
562     }
563   else
564 #endif
565   /* bind to a specified IPv4 address */
566 #ifdef HAVE_INET6
567   if ( (cxnAddr.ss_family == AF_INET) && bind_addr )
568 #else
569   if (bind_addr)
570 #endif
571     {
572       memcpy( (void *)&cxnSelf, bind_addr, sizeof(struct sockaddr_in) );
573       if (bind (fd, (struct sockaddr *) &cxnSelf,
574                   sizeof(struct sockaddr_in) ) < 0)
575         {
576           snprintf(msgbuf, sizeof(msgbuf), "bind (%s): %%m",
577                    inet_ntoa(bind_addr->sin_addr));
578           syslog (LOG_ERR, msgbuf) ;
579
580           cxnSleepOrDie (cxn) ;
581
582           return false ;
583         }
584     }
585
586   /* set our file descriptor to non-blocking */
587 #if defined (O_NONBLOCK)
588   rval = fcntl (fd, F_GETFL, 0) ;
589   if (rval >= 0)
590     rval = fcntl (fd, F_SETFL, rval | O_NONBLOCK) ;
591 #else
592   {
593     int state = 1 ;
594     rval = ioctl (fd, FIONBIO, (char *) &state) ;
595   }
596 #endif
597
598   if (rval < 0)
599     {
600       syswarn ("%s:%d cxnsleep can't set socket non-blocking", peerName,
601                cxn->ident) ;
602       close (fd) ;
603
604       cxnSleepOrDie (cxn) ;
605
606       return false ;
607     }
608
609   rval = connect (fd, (struct sockaddr *) &cxnAddr,
610                   SA_LEN((struct sockaddr *)&cxnAddr)) ;
611   if (rval < 0 && errno != EINPROGRESS)
612     {
613       syswarn ("%s:%d connect", peerName, cxn->ident) ;
614       hostIpFailed (cxn->myHost) ;
615       close (fd) ;
616
617       cxnSleepOrDie (cxn) ;
618
619       return false ;
620     }
621
622   if ((cxn->myEp = newEndPoint (fd)) == NULL)
623     {
624       /* If this happens, then fd was bigger than what select could handle,
625          so endpoint.c refused to create the new object. */
626       close (fd) ;
627       cxnSleepOrDie (cxn) ;
628       return false ;
629     }
630   
631
632   if (rval < 0)
633     /* when the write callback gets done the connection went through */
634     prepareWrite (cxn->myEp, NULL, NULL, connectionDone, cxn) ;
635   else
636     connectionDone (cxn->myEp, IoDone, NULL, cxn) ;
637
638   /* connectionDone() could set state to sleeping */
639   return (cxn->state == cxnConnectingS ? true : false) ;
640 }
641
642
643
644
645 \f
646 /* Put the Connection into the wait state.
647  *
648  * Pre-state            Reason cxnWait called
649  * ---------            ------------------------
650  * cxnStartingS         - Connection owner called cxnWait()
651  * cxnSleepingS         - side effect of cxnFlush() call.
652  * cxnConnectingS       - side effect of cxnFlush() call.
653  * cxnFlushingS         - side effect of receiving response 205
654  *                        and Connection had no articles when
655  *                        cxnFlush() was issued.
656  *                      - prepareRead failed.
657  *                      - I/O failed.
658  *
659  */
660 void cxnWait (Connection cxn)
661 {
662   ASSERT (cxn->state == cxnStartingS ||
663           cxn->state == cxnSleepingS ||
664           cxn->state == cxnConnectingS ||
665           cxn->state == cxnFeedingS ||
666           cxn->state == cxnFlushingS) ;
667   VALIDATE_CONNECTION (cxn) ;
668
669   abortConnection (cxn) ;
670
671   cxn->state = cxnWaitingS ;
672
673   hostCxnWaiting (cxn->myHost,cxn) ;   /* tell our Host we're waiting */
674 }
675
676
677
678
679 \f
680 /* Tells the Connection to flush itself (i.e. push out all articles,
681  * issue a QUIT and drop the network connection. If necessary a
682  * reconnect will be done immediately after. Called by the Host, or
683  * by the timer callback.
684  *
685  * Pre-state            Reason cxnFlush called
686  * ---------            ------------------------
687  * ALL (except cxnDeadS - Connection owner called cxnFlush()
688  *  and cxnStartingS)
689  * cxnFeedingS          - side effect of flushCxnCbk() call.
690  */
691 void cxnFlush (Connection cxn)
692 {
693   ASSERT (cxn != NULL) ;
694   ASSERT (cxn->state != cxnStartingS) ;
695   ASSERT (cxn->state != cxnDeadS) ;
696   VALIDATE_CONNECTION (cxn) ;
697
698   switch (cxn->state)
699     {
700       case cxnSleepingS:
701         cxnWait (cxn) ;
702         break ;
703
704       case cxnConnectingS:
705         cxnWait (cxn) ;
706         cxnConnect (cxn) ;
707         break ;
708
709       case cxnIdleTimeoutS:
710       case cxnIdleS:
711         ASSERT (cxn->articleQTotal == 0) ;
712         if (cxn->state != cxnIdleTimeoutS)
713           clearTimer (cxn->artReceiptTimerId) ;
714         clearTimer (cxn->flushTimerId) ;
715         cxn->state = cxnFlushingS ;
716         issueQUIT (cxn) ;
717         break ;
718
719       case cxnClosingS:
720       case cxnFlushingS:
721       case cxnWaitingS:
722         if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
723           issueQUIT (cxn) ;
724         break ;
725
726       case cxnFeedingS:
727         /* we only reconnect immediately if we're not idle when cxnFlush()
728            is called. */
729         if (!cxn->immedRecon)
730           {
731             cxn->immedRecon = (cxn->articleQTotal > 0 ? true : false) ;
732             d_printf (1,"%s:%d immediate reconnect for a cxnFlush()\n",
733                      hostPeerName (cxn->myHost), cxn->ident) ;
734           }
735         
736         clearTimer (cxn->flushTimerId) ;
737
738         cxn->state = cxnFlushingS ;
739
740         if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
741           issueQUIT (cxn) ;
742         break ;
743
744       default:
745         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
746     }
747 }
748
749
750 \f
751 /*
752  * Tells the Connection to dump all articles that are queued and to issue a
753  * QUIT as quickly as possible. Much like cxnClose, except queued articles
754  * are not sent, but are given back to the Host.
755  */
756 void cxnTerminate (Connection cxn)
757 {
758   ASSERT (cxn != NULL) ;
759   ASSERT (cxn->state != cxnDeadS) ;
760   ASSERT (cxn->state != cxnStartingS) ;
761   VALIDATE_CONNECTION (cxn) ;
762   
763   switch (cxn->state)
764     {
765       case cxnFeedingS:
766         d_printf (1,"%s:%d Issuing terminate\n",
767                  hostPeerName (cxn->myHost), cxn->ident) ;
768
769         clearTimer (cxn->flushTimerId) ;
770
771         cxn->state = cxnClosingS ;
772
773         deferQueuedArticles (cxn) ;
774         if (cxn->articleQTotal == 0)
775           issueQUIT (cxn) ; /* send out the QUIT if we can */
776         break ;
777
778       case cxnIdleTimeoutS:
779       case cxnIdleS:
780         ASSERT (cxn->articleQTotal == 0) ;
781         if (cxn->state != cxnIdleTimeoutS)
782           clearTimer (cxn->artReceiptTimerId) ;
783         clearTimer (cxn->flushTimerId) ;
784         cxn->state = cxnClosingS ;
785         issueQUIT (cxn) ;
786         break ;
787
788       case cxnFlushingS: /* we are in the middle of a periodic close. */
789         d_printf (1,"%s:%d Connection already being flushed\n",
790                  hostPeerName (cxn->myHost),cxn->ident);
791         cxn->state = cxnClosingS ;
792         if (cxn->articleQTotal == 0)
793           issueQUIT (cxn) ; /* send out the QUIT if we can */
794         break ;
795
796       case cxnClosingS:
797         d_printf (1,"%s:%d Connection already closing\n",
798                  hostPeerName (cxn->myHost),cxn->ident) ;
799         break ;
800
801       case cxnWaitingS:
802       case cxnConnectingS:
803       case cxnSleepingS:
804         cxnDead (cxn) ;
805         break ;
806
807       default:
808         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
809     }
810
811   VALIDATE_CONNECTION (cxn) ;
812
813   if (cxn->state == cxnDeadS)
814     {
815       d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
816                cxn->ident) ;
817
818       delConnection (cxn) ;
819     }
820 }
821
822
823 \f
824 /* Tells the Connection to do a disconnect and then when it is
825  * disconnected to delete itself.
826  *
827  * Pre-state            Reason cxnClose called
828  * ---------            ------------------------
829  * ALL (except cxnDeadS - Connecton owner called directly.
830  * and cxnStartingS).
831  */
832 void cxnClose (Connection cxn)
833 {
834   ASSERT (cxn != NULL) ;
835   ASSERT (cxn->state != cxnDeadS) ;
836   ASSERT (cxn->state != cxnStartingS) ;
837   VALIDATE_CONNECTION (cxn) ;
838
839   switch (cxn->state)
840     {
841       case cxnFeedingS:
842         d_printf (1,"%s:%d Issuing disconnect\n",
843                  hostPeerName (cxn->myHost), cxn->ident) ;
844
845         clearTimer (cxn->flushTimerId) ;
846
847         cxn->state = cxnClosingS ;
848
849         if (cxn->articleQTotal == 0)
850           issueQUIT (cxn) ; /* send out the QUIT if we can */
851         break ;
852
853       case cxnIdleS:
854       case cxnIdleTimeoutS:
855         ASSERT (cxn->articleQTotal == 0) ;
856         if (cxn->state != cxnIdleTimeoutS)
857           clearTimer (cxn->artReceiptTimerId) ;
858         clearTimer (cxn->flushTimerId) ;
859         cxn->state = cxnClosingS ;
860         issueQUIT (cxn) ;
861         break ;
862
863       case cxnFlushingS: /* we are in the middle of a periodic close. */
864         d_printf (1,"%s:%d Connection already being flushed\n",
865                  hostPeerName (cxn->myHost),cxn->ident);
866         cxn->state = cxnClosingS ;
867         if (cxn->articleQTotal == 0)
868           issueQUIT (cxn) ; /* send out the QUIT if we can */
869         break ;
870
871       case cxnClosingS:
872         d_printf (1,"%s:%d Connection already closing\n",
873                  hostPeerName (cxn->myHost),cxn->ident) ;
874         break ;
875
876       case cxnWaitingS:
877       case cxnConnectingS:
878       case cxnSleepingS:
879         cxnDead (cxn) ;
880         break ;
881
882       default:
883         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
884     }
885
886   VALIDATE_CONNECTION (cxn) ;
887
888   if (cxn->state == cxnDeadS)
889     {
890       d_printf (1,"%s:%d Deleting connection\n",hostPeerName (cxn->myHost),
891                cxn->ident) ;
892
893       delConnection (cxn) ;
894     }
895 }
896
897
898
899
900 \f
901 /* This is what the Host calls to get us to tranfer an article. If
902  * we're running the IHAVE sequence, then we can't take it if we've
903  * got an article already. If we're running the CHECK/TAKETHIS
904  * sequence, then we'll take as many as we can (up to our MAXCHECK
905  * limit).
906  */
907 bool cxnTakeArticle (Connection cxn, Article art)
908 {
909   bool rval = true ;
910
911   ASSERT (cxn != NULL) ;
912   VALIDATE_CONNECTION (cxn) ;
913
914   if ( !cxnQueueArticle (cxn,art) ) /* might change cxnIdleS to cxnFeedingS */
915     return false ;
916
917   if (!(cxn->state == cxnConnectingS ||
918         cxn->state == cxnFeedingS ||
919         cxn->state == cxnWaitingS))
920     {
921       warn ("%s:%d cxnsleep connection in bad state: %s",
922             hostPeerName (cxn->myHost), cxn->ident,
923             stateToString (cxn->state)) ;
924       cxnSleepOrDie (cxn) ;
925       return false ;
926     }
927   
928   if (cxn->state != cxnWaitingS) /* because articleQTotal == 1 */
929     VALIDATE_CONNECTION (cxn) ;
930   else
931     ASSERT (cxn->articleQTotal == 1) ;
932
933   switch (cxn->state)
934     {
935       case cxnWaitingS:
936         cxnConnect (cxn) ;
937         break ;
938
939       case cxnFeedingS:
940         doSomeWrites (cxn) ;
941         break ;
942
943       case cxnConnectingS:
944         break ;
945
946       default:
947         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
948     }
949
950   return rval ;
951 }
952
953
954
955
956 \f
957 /* if there's room in the Connection then stick the article on the
958  * queue, otherwise return false.
959  */
960 bool cxnQueueArticle (Connection cxn, Article art)
961 {
962   ArtHolder newArt ;
963   bool rval = false ;
964
965   ASSERT (cxn != NULL) ;
966   ASSERT (cxn->state != cxnStartingS) ;
967   ASSERT (cxn->state != cxnDeadS) ;
968   VALIDATE_CONNECTION (cxn) ;
969
970   switch (cxn->state)
971     {
972       case cxnClosingS:
973         d_printf (5,"%s:%d Refusing article due to closing\n",
974                  hostPeerName (cxn->myHost),cxn->ident) ;
975         break ;
976
977       case cxnFlushingS:
978         d_printf (5,"%s:%d Refusing article due to flushing\n",
979                  hostPeerName (cxn->myHost),cxn->ident) ;
980         break ;
981
982       case cxnSleepingS:
983         d_printf (5,"%s:%d Refusing article due to sleeping\n",
984                  hostPeerName (cxn->myHost),cxn->ident) ;
985         break ;
986
987       case cxnWaitingS:
988         rval = true ;
989         newArt = newArtHolder (art) ;
990         appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
991         break ;
992
993       case cxnConnectingS:
994         if (cxn->articleQTotal != 0)
995           break ;
996         rval = true ;
997         newArt = newArtHolder (art) ;
998         appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
999         break ;
1000
1001       case cxnIdleS:
1002       case cxnFeedingS:
1003         if (cxn->articleQTotal >= cxn->maxCheck)
1004           d_printf (5, "%s:%d Refusing article due to articleQTotal >= maxCheck (%d > %d)\n",
1005                    hostPeerName (cxn->myHost), cxn->ident,
1006                    cxn->articleQTotal, cxn->maxCheck) ;
1007         else
1008           {
1009             rval = true ;
1010             newArt = newArtHolder (art) ;
1011             if (cxn->needsChecks)
1012               appendArtHolder (newArt, &cxn->checkHead, &cxn->articleQTotal) ;
1013             else
1014               appendArtHolder (newArt, &cxn->takeHead, &cxn->articleQTotal) ;
1015             if (cxn->state == cxnIdleS)
1016               {
1017                 cxn->state = cxnFeedingS ;
1018                 clearTimer (cxn->artReceiptTimerId) ;
1019               }
1020           }
1021         break ;
1022
1023       default:
1024         die ("Invalid state: %s\n", stateToString (cxn->state)) ;
1025     }
1026
1027   if (rval)
1028     {
1029       d_printf (5,"%s:%d accepting article %s\n",hostPeerName (cxn->myHost),
1030                cxn->ident,artMsgId (art)) ;
1031
1032       cxn->artsTaken++ ;
1033     }
1034
1035   return rval ;
1036 }
1037
1038
1039
1040
1041 \f
1042 /*
1043  * generate a log message for activity. Usually called by the Connection's
1044  * owner
1045  */
1046 void cxnLogStats (Connection cxn, bool final)
1047 {
1048   const char *peerName ;
1049   time_t now = theTime() ;
1050
1051   ASSERT (cxn != NULL) ;
1052
1053   /* only log stats when in one of these three states. */
1054   switch (cxn->state)
1055     {
1056       case cxnFeedingS:
1057       case cxnFlushingS:
1058       case cxnClosingS:
1059         break ;
1060
1061       default:
1062         return ;
1063     }
1064
1065   peerName = hostPeerName (cxn->myHost) ;
1066
1067   notice ("%s:%d %s seconds %ld offered %d accepted %d refused %d"
1068           " rejected %d accsize %.0f rejsize %.0f", peerName, cxn->ident,
1069           (final ? "final" : "checkpoint"), (long) (now - cxn->timeCon),
1070           cxn->checksIssued, cxn->takesOkayed, cxn->checksRefused,
1071           cxn->takesRejected, cxn->takesSizeOkayed, cxn->takesSizeRejected) ;
1072
1073   if (final)
1074     {
1075       cxn->artsTaken = 0 ;
1076       cxn->checksIssued = 0 ;
1077       cxn->checksRefused = 0 ;
1078       cxn->takesRejected = 0 ;
1079       cxn->takesOkayed = 0 ;
1080       cxn->takesSizeRejected = 0 ;
1081       cxn->takesSizeOkayed = 0 ;
1082
1083       if (cxn->timeCon > 0)
1084         cxn->timeCon = theTime() ;
1085     }
1086 }
1087
1088
1089
1090
1091 \f
1092 /*
1093  * return the number of articles the connection will accept.
1094  */
1095 size_t cxnQueueSpace (Connection cxn)
1096 {
1097   int rval = 0 ;
1098
1099   ASSERT (cxn != NULL) ;
1100
1101   if (cxn->state == cxnFeedingS ||
1102       cxn->state == cxnIdleS ||
1103       cxn->state == cxnConnectingS ||
1104       cxn->state == cxnWaitingS)
1105     rval = cxn->maxCheck - cxn->articleQTotal ;
1106
1107   return rval ;
1108 }
1109
1110
1111
1112
1113 \f
1114 /*
1115  * Print info on all the connections that currently exist.
1116  */
1117 void gPrintCxnInfo (FILE *fp, unsigned int indentAmt)
1118 {
1119   char indent [INDENT_BUFFER_SIZE] ;
1120   unsigned int i ;
1121   Connection cxn ;
1122
1123   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1124     indent [i] = ' ' ;
1125   indent [i] = '\0' ;
1126
1127   fprintf (fp,"%sGlobal Connection list : (count %d) {\n",
1128            indent,gCxnCount) ;
1129   for (cxn = gCxnList ; cxn != NULL ; cxn = cxn->next)
1130     printCxnInfo (cxn,fp,indentAmt + INDENT_INCR) ;
1131   fprintf (fp,"%s}\n",indent) ;
1132 }
1133
1134
1135
1136
1137 \f
1138 /*
1139  * Print the info about the given connection.
1140  */
1141 void printCxnInfo (Connection cxn, FILE *fp, unsigned int indentAmt)
1142 {
1143   char indent [INDENT_BUFFER_SIZE] ;
1144   unsigned int i ;
1145   ArtHolder artH ;
1146
1147   for (i = 0 ; i < MIN(INDENT_BUFFER_SIZE - 1,indentAmt) ; i++)
1148     indent [i] = ' ' ;
1149   indent [i] = '\0' ;
1150
1151   fprintf (fp,"%sConnection : %p {\n",indent, (void *) cxn) ;
1152   fprintf (fp,"%s    host : %p\n",indent, (void *) cxn->myHost) ;
1153   fprintf (fp,"%s    endpoint : %p\n",indent, (void *) cxn->myEp) ;
1154   fprintf (fp,"%s    state : %s\n",indent, stateToString (cxn->state)) ;
1155   fprintf (fp,"%s    ident : %d\n",indent,cxn->ident) ;
1156   fprintf (fp,"%s    ip-name : %s\n", indent, cxn->ipName) ;
1157   fprintf (fp,"%s    port-number : %d\n",indent,cxn->port) ;
1158   fprintf (fp,"%s    max-checks : %d\n",indent,cxn->maxCheck) ;
1159   fprintf (fp,"%s    does-streaming : %s\n",indent,
1160            boolToString (cxn->doesStreaming)) ;
1161   fprintf (fp,"%s    authenticated : %s\n",indent,
1162            boolToString (cxn->authenticated)) ;
1163   fprintf (fp,"%s    quitWasIssued : %s\n",indent,
1164            boolToString (cxn->quitWasIssued)) ;
1165   fprintf (fp,"%s    needs-checks : %s\n",indent,
1166            boolToString (cxn->needsChecks)) ;
1167
1168   fprintf (fp,"%s    time-connected : %ld\n",indent,(long) cxn->timeCon) ;
1169   fprintf (fp,"%s    articles from INN : %d\n",indent,cxn->artsTaken) ;
1170   fprintf (fp,"%s    articles offered : %d\n",indent,
1171            cxn->checksIssued) ;
1172   fprintf (fp,"%s    articles refused : %d\n",indent,
1173            cxn->checksRefused) ;
1174   fprintf (fp,"%s    articles rejected : %d\n",indent,
1175            cxn->takesRejected) ;
1176   fprintf (fp,"%s    articles accepted : %d\n",indent,
1177            cxn->takesOkayed) ;
1178   fprintf (fp,"%s    low-pass upper limit : %0.6f\n", indent,
1179            cxn->onThreshold) ;
1180   fprintf (fp,"%s    low-pass lower limit : %0.6f\n", indent,
1181            cxn->offThreshold) ;
1182   fprintf (fp,"%s    low-pass filter tc : %0.6f\n", indent,
1183            cxn->lowPassFilter) ;
1184   fprintf (fp,"%s    low-pass filter : %0.6f\n", indent,
1185            cxn->filterValue) ;
1186
1187   fprintf (fp,"%s    article-timeout : %d\n",indent,cxn->articleReceiptTimeout) ;
1188   fprintf (fp,"%s    article-callback : %d\n",indent,cxn->artReceiptTimerId) ;
1189
1190   fprintf (fp,"%s    response-timeout : %d\n",indent,cxn->readTimeout) ;
1191   fprintf (fp,"%s    response-callback : %d\n",indent,cxn->readBlockedTimerId) ;
1192
1193   fprintf (fp,"%s    write-timeout : %d\n",indent,cxn->writeTimeout) ;
1194   fprintf (fp,"%s    write-callback : %d\n",indent,cxn->writeBlockedTimerId) ;
1195
1196   fprintf (fp,"%s    flushTimeout : %d\n",indent,cxn->flushTimeout) ;
1197   fprintf (fp,"%s    flushTimerId : %d\n",indent,cxn->flushTimerId) ;
1198
1199   fprintf (fp,"%s    reopen wait : %d\n",indent,cxn->sleepTimeout) ;
1200   fprintf (fp,"%s    reopen id : %d\n",indent,cxn->sleepTimerId) ;
1201
1202   fprintf (fp,"%s    CHECK queue {\n",indent) ;
1203   for (artH = cxn->checkHead ; artH != NULL ; artH = artH->next)
1204     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1205   fprintf (fp,"%s    }\n",indent) ;
1206
1207   fprintf (fp,"%s    CHECK Response queue {\n",indent) ;
1208   for (artH = cxn->checkRespHead ; artH != NULL ; artH = artH->next)
1209     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1210   fprintf (fp,"%s    }\n",indent) ;
1211
1212   fprintf (fp,"%s    TAKE queue {\n",indent) ;
1213   for (artH = cxn->takeHead ; artH != NULL ; artH = artH->next)
1214     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1215   fprintf (fp,"%s    }\n",indent) ;
1216
1217   fprintf (fp,"%s    TAKE response queue {\n",indent) ;
1218   for (artH = cxn->takeRespHead ; artH != NULL ; artH = artH->next)
1219     printArticleInfo (artH->article,fp,indentAmt + INDENT_INCR) ;
1220   fprintf (fp,"%s    }\n",indent) ;
1221
1222   fprintf (fp,"%s    response buffer {\n",indent) ;
1223   printBufferInfo (cxn->respBuffer,fp,indentAmt + INDENT_INCR) ;
1224   fprintf (fp,"%s    }\n",indent) ;
1225
1226   fprintf (fp,"%s}\n",indent) ;
1227 }
1228
1229
1230
1231
1232 \f
1233 /*
1234  * return the number of articles the connection will accept.
1235  */
1236 bool cxnCheckstate (Connection cxn)
1237 {
1238   bool rval = false ;
1239
1240   ASSERT (cxn != NULL) ;
1241
1242   if (cxn->state == cxnFeedingS ||
1243       cxn->state == cxnIdleS ||
1244       cxn->state == cxnConnectingS)
1245     rval = true ;
1246
1247   return rval ;
1248 }
1249
1250
1251
1252
1253 \f
1254 /**********************************************************************/
1255 /**                       STATIC PRIVATE FUNCTIONS                   **/
1256 /**********************************************************************/
1257
1258
1259 /*
1260  * ENDPOINT CALLBACK AREA.
1261  *
1262  * All the functions in this next section are callbacks fired by the
1263  * EndPoint objects/class (either timers or i/o completion callbacks)..
1264  */
1265
1266 \f
1267 /*
1268  * this is the first stage of the NNTP FSM. This function is called
1269  * when the tcp/ip network connection is setup and we should get
1270  * ready to read the banner message. When this function returns the
1271  * state of the Connection will still be cxnConnectingS unless
1272  * something broken, in which case it probably went into the
1273  * cxnSleepingS state.
1274  */
1275 static void connectionDone (EndPoint e, IoStatus i, Buffer *b, void *d)
1276 {
1277   Buffer *readBuffers ;
1278   Connection cxn = (Connection) d ;
1279   const char *peerName ;
1280   int optval;
1281   socklen_t size ;
1282
1283   ASSERT (b == NULL) ;
1284   ASSERT (cxn->state == cxnConnectingS) ;
1285   ASSERT (!writeIsPending (cxn->myEp)) ;
1286
1287   size = sizeof (optval) ;
1288   peerName = hostPeerName (cxn->myHost) ;
1289
1290   if (i != IoDone)
1291     {
1292       errno = endPointErrno (e) ;
1293       syswarn ("%s:%d cxnsleep i/o failed", peerName, cxn->ident) ;
1294
1295       cxnSleepOrDie (cxn) ;
1296     }
1297   else if (getsockopt (endPointFd (e), SOL_SOCKET, SO_ERROR,
1298                        (char *) &optval, &size) != 0)
1299     {
1300       /* This is bad. Can't even get the SO_ERROR value out of the socket */
1301       syswarn ("%s:%d cxnsleep internal getsockopt", peerName, cxn->ident) ;
1302
1303       cxnSleepOrDie (cxn) ;
1304     }
1305   else if (optval != 0)
1306     {
1307       /* if the connect failed then the only way to know is by getting
1308          the SO_ERROR value out of the socket. */
1309       errno = optval ;
1310       syswarn ("%s:%d cxnsleep connect", peerName, cxn->ident) ;
1311       hostIpFailed (cxn->myHost) ;
1312
1313       cxnSleepOrDie (cxn) ;
1314     }
1315   else
1316     {
1317       readBuffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1318
1319       if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1320         {
1321           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1322
1323           cxnSleepOrDie (cxn) ;
1324         }
1325       else
1326         {
1327           initReadBlockedTimeout (cxn) ;
1328
1329           /* set up the callback for closing down the connection at regular
1330              intervals (due to problems with long running nntpd). */
1331           if (cxn->flushTimeout > 0)
1332             cxn->flushTimerId = prepareSleep (flushCxnCbk,
1333                                               cxn->flushTimeout,cxn) ;
1334
1335           /* The state doesn't change yet until we've read the banner and
1336              tried the MODE STREAM command. */
1337         }
1338     }
1339   VALIDATE_CONNECTION (cxn) ;
1340 }
1341
1342
1343
1344
1345 \f
1346 /*
1347  * This is called when we are so far in the connection setup that
1348  * we're confident it'll work.  If the connection is IPv6, remove
1349  * the IPv4 addresses from the address list.
1350  */
1351 static void connectionIfIpv6DeleteIpv4Addr (Connection cxn)
1352 {
1353 #ifdef HAVE_INET6
1354   struct sockaddr_storage ss;
1355   socklen_t len = sizeof(ss);
1356
1357   if (getpeername (endPointFd (cxn->myEp), (struct sockaddr *)&ss, &len) < 0)
1358     return;
1359   if (ss.ss_family != AF_INET6)
1360     return;
1361
1362   hostDeleteIpv4Addr (cxn->myHost);
1363 #endif
1364 }
1365
1366
1367
1368
1369  
1370 /*
1371  * Called when the banner message has been read off the wire and is
1372  * in the buffer(s). When this function returns the state of the
1373  * Connection will still be cxnConnectingS unless something broken,
1374  * in which case it probably went into the cxnSleepiongS state.
1375  */
1376 static void getBanner (EndPoint e, IoStatus i, Buffer *b, void *d)
1377 {
1378   Buffer *readBuffers ;
1379   Connection cxn = (Connection) d ;
1380   char *p = bufferBase (b[0]) ;
1381   int code ;
1382   bool isOk = false ;
1383   const char *peerName ;
1384   char *rest ;
1385
1386   ASSERT (e == cxn->myEp) ;
1387   ASSERT (b[0] == cxn->respBuffer) ;
1388   ASSERT (b[1] == NULL) ;
1389   ASSERT (cxn->state == cxnConnectingS) ;
1390   ASSERT (!writeIsPending (cxn->myEp));
1391
1392
1393   peerName = hostPeerName (cxn->myHost) ;
1394
1395   bufferAddNullByte (b[0]) ;
1396
1397   if (i != IoDone)
1398     {
1399       errno = endPointErrno (cxn->myEp) ;
1400       syswarn ("%s:%d cxnsleep can't read banner", peerName, cxn->ident) ;
1401       hostIpFailed (cxn->myHost) ;
1402
1403       cxnSleepOrDie (cxn) ;
1404     }
1405   else if (strchr (p, '\n') == NULL)
1406     {                           /* partial read. expand buffer and retry */
1407       expandBuffer (b[0], BUFFER_EXPAND_AMOUNT) ;
1408       readBuffers = makeBufferArray (bufferTakeRef (b[0]), NULL) ;
1409
1410       if ( !prepareRead (e, readBuffers, getBanner, cxn, 1) )
1411         {
1412           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1413
1414           cxnSleepOrDie (cxn) ;
1415         }
1416     }
1417   else if ( !getNntpResponse (p, &code, &rest) )
1418     {
1419       trim_ws (p) ;
1420
1421       warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident, p) ;
1422
1423       cxnSleepOrDie (cxn) ;
1424     }
1425   else
1426     {
1427       trim_ws (p) ;
1428
1429       switch (code)
1430         {
1431           case 200:             /* normal */
1432           case 201:             /* can transfer but not post -- old nntpd */
1433             isOk = true ;
1434             break ;
1435
1436           case 400:
1437             cxnSleepOrDie (cxn) ;
1438             hostIpFailed (cxn->myHost) ;
1439             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1440             break ;
1441
1442           case 502:
1443             warn ("%s:%d cxnsleep no permission to talk: %s", peerName,
1444                   cxn->ident, p) ;
1445             cxnSleepOrDie (cxn) ;
1446             hostIpFailed (cxn->myHost) ;
1447             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1448             break ;
1449
1450           default:
1451             warn ("%s:%d cxnsleep response unknown banner: %d %s", peerName,
1452                   cxn->ident, code, p) ;
1453             d_printf (1,"%s:%d Unknown response code: %d: %s\n",
1454                      hostPeerName (cxn->myHost),cxn->ident, code, p) ;
1455             cxnSleepOrDie (cxn) ;
1456             hostIpFailed (cxn->myHost) ;
1457             hostCxnBlocked (cxn->myHost, cxn, rest) ;
1458             break ;
1459         }
1460
1461       if ( isOk )
1462         {
1463       /* If we got this far and the connection is IPv6, remove
1464          the IPv4 addresses from the address list. */
1465       connectionIfIpv6DeleteIpv4Addr (cxn);
1466
1467           if (hostUsername (cxn->myHost) != NULL
1468               && hostPassword (cxn->myHost) != NULL)
1469             issueAuthUser (e,cxn);
1470           else
1471             issueModeStream (e,cxn);
1472         }
1473     }
1474   freeBufferArray (b) ;
1475 }
1476
1477
1478
1479
1480 \f
1481 static void issueAuthUser (EndPoint e, Connection cxn)
1482 {
1483   Buffer authUserBuffer;
1484   Buffer *authUserCmdBuffers,*readBuffers;
1485   size_t lenBuff = 0 ;
1486   char *t ;
1487
1488   /* 17 == strlen("AUTHINFO USER \r\n\0") */
1489   lenBuff = (17 + strlen (hostUsername (cxn->myHost))) ;
1490   authUserBuffer = newBuffer (lenBuff) ;
1491   t = bufferBase (authUserBuffer) ;
1492
1493   sprintf (t, "AUTHINFO USER %s\r\n", hostUsername (cxn->myHost)) ;
1494   bufferSetDataSize (authUserBuffer, strlen (t)) ;
1495
1496   authUserCmdBuffers = makeBufferArray (authUserBuffer, NULL) ;
1497
1498   if ( !prepareWriteWithTimeout (e, authUserCmdBuffers, authUserIssued,
1499                                  cxn) )
1500     {
1501       die ("%s:%d fatal prepare write for authinfo user failed",
1502            hostPeerName (cxn->myHost), cxn->ident) ;
1503     }
1504
1505   bufferSetDataSize (cxn->respBuffer, 0) ;
1506
1507   readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1508
1509   if ( !prepareRead (e, readBuffers, getAuthUserResponse, cxn, 1) )
1510     {
1511       warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1512             cxn->ident) ;
1513       freeBufferArray (readBuffers) ;
1514       cxnSleepOrDie (cxn) ;
1515     }
1516
1517 }
1518
1519
1520
1521
1522
1523 \f
1524 static void issueAuthPass (EndPoint e, Connection cxn)
1525 {
1526   Buffer authPassBuffer;
1527   Buffer *authPassCmdBuffers,*readBuffers;
1528   size_t lenBuff = 0 ;
1529   char *t ;
1530
1531   /* 17 == strlen("AUTHINFO PASS \r\n\0") */
1532   lenBuff = (17 + strlen (hostPassword (cxn->myHost))) ;
1533   authPassBuffer = newBuffer (lenBuff) ;
1534   t = bufferBase (authPassBuffer) ;
1535
1536   sprintf (t, "AUTHINFO PASS %s\r\n", hostPassword (cxn->myHost)) ;
1537   bufferSetDataSize (authPassBuffer, strlen (t)) ;
1538
1539   authPassCmdBuffers = makeBufferArray (authPassBuffer, NULL) ;
1540
1541   if ( !prepareWriteWithTimeout (e, authPassCmdBuffers, authPassIssued,
1542                                  cxn) )
1543     {
1544       die ("%s:%d fatal prepare write for authinfo pass failed",
1545            hostPeerName (cxn->myHost), cxn->ident) ;
1546     }
1547
1548   bufferSetDataSize (cxn->respBuffer, 0) ;
1549
1550   readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1551
1552   if ( !prepareRead (e, readBuffers, getAuthPassResponse, cxn, 1) )
1553     {
1554       warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1555             cxn->ident) ;
1556       freeBufferArray (readBuffers) ;
1557       cxnSleepOrDie (cxn) ;
1558     }
1559
1560 }
1561
1562
1563
1564
1565
1566 \f
1567 static void issueModeStream (EndPoint e, Connection cxn)
1568 {
1569   Buffer *modeCmdBuffers,*readBuffers ;
1570   Buffer modeBuffer ;
1571   char *p;
1572
1573 #define  MODE_CMD "MODE STREAM\r\n"
1574
1575   modeBuffer = newBuffer (strlen (MODE_CMD) + 1) ;
1576   p = bufferBase (modeBuffer) ;
1577
1578   /* now issue the MODE STREAM command */
1579   d_printf (1,"%s:%d Issuing the streaming command: %s\n",
1580             hostPeerName (cxn->myHost),cxn->ident,MODE_CMD) ;
1581
1582   strcpy (p, MODE_CMD) ;
1583
1584   bufferSetDataSize (modeBuffer, strlen (p)) ;
1585
1586   modeCmdBuffers = makeBufferArray (modeBuffer, NULL) ;
1587
1588   if ( !prepareWriteWithTimeout (e, modeCmdBuffers, modeCmdIssued,
1589                                  cxn) )
1590     {
1591       die ("%s:%d fatal prepare write for mode stream failed",
1592            hostPeerName (cxn->myHost), cxn->ident) ;
1593     }
1594
1595   bufferSetDataSize (cxn->respBuffer, 0) ;
1596
1597   readBuffers = makeBufferArray (bufferTakeRef(cxn->respBuffer),NULL);
1598
1599   if ( !prepareRead (e, readBuffers, getModeResponse, cxn, 1) )
1600     {
1601       warn ("%s:%d cxnsleep prepare read failed", hostPeerName (cxn->myHost),
1602             cxn->ident) ;
1603       freeBufferArray (readBuffers) ;
1604       cxnSleepOrDie (cxn) ;
1605     }
1606 }
1607
1608
1609
1610
1611 \f
1612 /*
1613  *
1614  */
1615 static void getAuthUserResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1616 {
1617   Connection cxn = (Connection) d ;
1618   int code ;
1619   char *p = bufferBase (b[0]) ;
1620   Buffer *buffers ;
1621   const char *peerName ;
1622
1623   ASSERT (e == cxn->myEp) ;
1624   ASSERT (b [0] == cxn->respBuffer) ;
1625   ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
1626   ASSERT (cxn->state == cxnConnectingS) ;
1627   VALIDATE_CONNECTION (cxn) ;
1628
1629   peerName = hostPeerName (cxn->myHost) ;
1630
1631   bufferAddNullByte (b[0]) ;
1632
1633   d_printf (1,"%s:%d Processing authinfo user response: %s", /* no NL */
1634             hostPeerName (cxn->myHost), cxn->ident, p) ;
1635
1636   if (i == IoDone && writeIsPending (cxn->myEp))
1637     {
1638       /* badness. should never happen */
1639       warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1640             cxn->ident) ;
1641
1642       cxnSleepOrDie (cxn) ;
1643     }
1644   else if (i != IoDone)
1645     {
1646       if (i != IoEOF)
1647         {
1648           errno = endPointErrno (e) ;
1649           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1650         }
1651       cxnSleepOrDie (cxn) ;
1652     }
1653   else if (strchr (p, '\n') == NULL)
1654     {
1655       /* partial read */
1656       expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1657
1658       buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1659       if ( !prepareRead (e, buffers, getAuthUserResponse, cxn, 1) )
1660         {
1661           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1662           freeBufferArray (buffers) ;
1663           cxnSleepOrDie (cxn) ;
1664         }
1665     }
1666   else
1667     {
1668       clearTimer (cxn->readBlockedTimerId) ;
1669
1670       if ( !getNntpResponse (p, &code, NULL) )
1671         {
1672           warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1673                 cxn->ident, p) ;
1674
1675           cxnSleepOrDie (cxn) ;
1676         }
1677       else
1678         {
1679           notice ("%s:%d connected", peerName, cxn->ident) ;
1680
1681           switch (code)
1682             {
1683             case 381:
1684               issueAuthPass (e,cxn);
1685               break ;
1686
1687             default:
1688               warn ("%s:%d cxnsleep response to AUTHINFO USER: %s", peerName,
1689                     cxn->ident, p) ;
1690               cxn->authenticated = true;
1691               issueModeStream (e,cxn);
1692               break ;
1693             }
1694
1695         }
1696     }
1697 }
1698
1699
1700
1701
1702 \f
1703 /*
1704  *
1705  */
1706 static void getAuthPassResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1707 {
1708   Connection cxn = (Connection) d ;
1709   int code ;
1710   char *p = bufferBase (b[0]) ;
1711   Buffer *buffers ;
1712   const char *peerName ;
1713
1714   ASSERT (e == cxn->myEp) ;
1715   ASSERT (b [0] == cxn->respBuffer) ;
1716   ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
1717   ASSERT (cxn->state == cxnConnectingS) ;
1718   VALIDATE_CONNECTION (cxn) ;
1719
1720   peerName = hostPeerName (cxn->myHost) ;
1721
1722   bufferAddNullByte (b[0]) ;
1723
1724   d_printf (1,"%s:%d Processing authinfo pass response: %s", /* no NL */
1725             hostPeerName (cxn->myHost), cxn->ident, p) ;
1726
1727   if (i == IoDone && writeIsPending (cxn->myEp))
1728     {
1729       /* badness. should never happen */
1730       warn ("%s:%d cxnsleep authinfo command still pending", peerName,
1731             cxn->ident) ;
1732
1733       cxnSleepOrDie (cxn) ;
1734     }
1735   else if (i != IoDone)
1736     {
1737       if (i != IoEOF)
1738         {
1739           errno = endPointErrno (e) ;
1740           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1741         }
1742       cxnSleepOrDie (cxn) ;
1743     }
1744   else if (strchr (p, '\n') == NULL)
1745     {
1746       /* partial read */
1747       expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1748
1749       buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1750       if ( !prepareRead (e, buffers, getAuthPassResponse, cxn, 1) )
1751         {
1752           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1753           freeBufferArray (buffers) ;
1754           cxnSleepOrDie (cxn) ;
1755         }
1756     }
1757   else
1758     {
1759       clearTimer (cxn->readBlockedTimerId) ;
1760
1761       if ( !getNntpResponse (p, &code, NULL) )
1762         {
1763           warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1764                 cxn->ident, p) ;
1765
1766           cxnSleepOrDie (cxn) ;
1767         }
1768       else
1769         {
1770           switch (code)
1771             {
1772             case 281:
1773               notice ("%s:%d authenticated", peerName, cxn->ident) ;
1774               cxn->authenticated = true ;
1775               issueModeStream (e,cxn);
1776               break ;
1777
1778             default:
1779               warn ("%s:%d cxnsleep response to AUTHINFO PASS: %s", peerName,
1780                     cxn->ident, p) ;
1781               cxnSleepOrDie (cxn) ;
1782               break ;
1783             }
1784
1785         }
1786     }
1787 }
1788
1789
1790
1791
1792 \f
1793 /*
1794  * Process the remote's response to our MODE STREAM command. This is where
1795  * the Connection moves into the cxnFeedingS state. If the remote has given
1796  * us a good welcome banner, but then immediately dropped the connection,
1797  * we'll arrive here with the MODE STREAM command still queued up.
1798  */
1799 static void getModeResponse (EndPoint e, IoStatus i, Buffer *b, void *d)
1800 {
1801   Connection cxn = (Connection) d ;
1802   int code ;
1803   char *p = bufferBase (b[0]) ;
1804   Buffer *buffers ;
1805   const char *peerName ;
1806
1807   ASSERT (e == cxn->myEp) ;
1808   ASSERT (b [0] == cxn->respBuffer) ;
1809   ASSERT (b [1] == NULL) ;      /* only ever one buffer on this read */
1810   ASSERT (cxn->state == cxnConnectingS) ;
1811   VALIDATE_CONNECTION (cxn) ;
1812
1813   peerName = hostPeerName (cxn->myHost) ;
1814
1815   bufferAddNullByte (b[0]) ;
1816
1817   d_printf (1,"%s:%d Processing mode response: %s", /* no NL */
1818            hostPeerName (cxn->myHost), cxn->ident, p) ;
1819
1820   if (i == IoDone && writeIsPending (cxn->myEp))
1821     {                           /* badness. should never happen */
1822       warn ("%s:%d cxnsleep mode stream command still pending", peerName,
1823             cxn->ident) ;
1824
1825       cxnSleepOrDie (cxn) ;
1826     }
1827   else if (i != IoDone)
1828     {
1829       if (i != IoEOF)
1830         {
1831           errno = endPointErrno (e) ;
1832           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1833         }
1834       cxnSleepOrDie (cxn) ;
1835     }
1836   else if (strchr (p, '\n') == NULL)
1837     {                           /* partial read */
1838       expandBuffer (b [0], BUFFER_EXPAND_AMOUNT) ;
1839
1840       buffers = makeBufferArray (bufferTakeRef (b [0]), NULL) ;
1841       if ( !prepareRead (e, buffers, getModeResponse, cxn, 1) )
1842         {
1843           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1844           freeBufferArray (buffers) ;
1845           cxnSleepOrDie (cxn) ;
1846         }
1847     }
1848   else
1849     {
1850       clearTimer (cxn->readBlockedTimerId) ;
1851
1852       if ( !getNntpResponse (p, &code, NULL) )
1853         {
1854           warn ("%s:%d cxnsleep response to MODE STREAM: %s", peerName,
1855                 cxn->ident, p) ;
1856
1857           cxnSleepOrDie (cxn) ;
1858         }
1859       else
1860         {
1861           if (!cxn->authenticated)
1862             notice ("%s:%d connected", peerName, cxn->ident) ;
1863           
1864           switch (code)
1865             {
1866               case 203:             /* will do streaming */
1867                 hostRemoteStreams (cxn->myHost, cxn, true) ;
1868
1869                 if (hostWantsStreaming (cxn->myHost))
1870                   {
1871                     cxn->doesStreaming = true ;
1872                     cxn->maxCheck = hostMaxChecks (cxn->myHost) ;
1873                   }
1874                 else
1875                   cxn->maxCheck = 1 ;
1876             
1877                 break ;
1878                 
1879               default:                      /* won't do it */
1880                 hostRemoteStreams (cxn->myHost, cxn, false) ;
1881                 cxn->maxCheck = 1 ;
1882                 break ;
1883             }
1884           
1885           /* now we consider ourselves completly connected. */
1886           cxn->timeCon = theTime () ;
1887           if (cxn->articleQTotal == 0)
1888             cxnIdle (cxn) ;
1889           else
1890             cxn->state = cxnFeedingS ;
1891           
1892               /* one for the connection and one for the buffer array */
1893           ASSERT (cxn->authenticated || bufferRefCount (cxn->respBuffer) == 2) ;
1894           
1895           /* there was only one line in there, right? */
1896           bufferSetDataSize (cxn->respBuffer, 0) ;
1897           buffers = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
1898           
1899               /* sleepTimeout get changed at each failed attempt, so reset. */
1900           cxn->sleepTimeout = init_reconnect_period ;
1901           
1902           if ( !prepareRead (cxn->myEp, buffers, responseIsRead, cxn, 1) )
1903             {
1904               freeBufferArray (buffers) ;
1905               
1906               cxnSleepOrDie (cxn) ;
1907             }
1908           else
1909             {
1910               /* now we wait for articles from our Host, or we have some
1911                  articles already. On infrequently used connections, the
1912                  network link is torn down and rebuilt as needed. So we may
1913                  be rebuilding the connection here in which case we have an
1914                  article to send. */
1915               if (writesNeeded (cxn) || hostGimmeArticle (cxn->myHost,cxn))
1916                 doSomeWrites (cxn) ;
1917             }
1918         }
1919     }
1920   
1921   freeBufferArray (b) ;
1922 }
1923
1924
1925
1926
1927 \f
1928 /*
1929  * called when a response has been read from the socket. This is
1930  * where the bulk of the processing starts.
1931  */
1932 static void responseIsRead (EndPoint e, IoStatus i, Buffer *b, void *d)
1933 {
1934   Connection cxn = (Connection) d ;
1935   char *response ;
1936   char *endr ;
1937   char *bufBase ;
1938   unsigned int respSize ;
1939   int code ;
1940   char *rest = NULL ;
1941   Buffer buf ;
1942   Buffer *bArr ;
1943   const char *peerName ;
1944
1945   ASSERT (e == cxn->myEp) ;
1946   ASSERT (b != NULL) ;
1947   ASSERT (b [1] == NULL) ;
1948   ASSERT (b [0] == cxn->respBuffer) ;
1949   ASSERT (cxn->state == cxnFeedingS ||
1950           cxn->state == cxnIdleS    ||
1951           cxn->state == cxnClosingS ||
1952           cxn->state == cxnFlushingS) ;
1953   VALIDATE_CONNECTION (cxn) ;
1954
1955   bufferAddNullByte (b [0]) ;
1956
1957   peerName = hostPeerName (cxn->myHost) ;
1958
1959   if (i != IoDone)
1960     {                           /* uh oh. */
1961       if (i != IoEOF)
1962         {
1963           errno = endPointErrno (e) ;
1964           syswarn ("%s:%d cxnsleep can't read response", peerName, cxn->ident);
1965         }
1966       freeBufferArray (b) ;
1967
1968       cxnLogStats (cxn,true) ;
1969
1970       if (cxn->state == cxnClosingS)
1971         {
1972           cxnDead (cxn) ;
1973           delConnection (cxn) ;
1974         }
1975       else
1976         cxnSleep (cxn) ;
1977
1978       return ;
1979     }
1980
1981   buf = b [0] ;
1982   bufBase = bufferBase (buf) ;
1983
1984   /* check that we have (at least) a full line response. If not expand
1985      the buffer and resubmit the read. */
1986   if (strchr (bufBase, '\n') == 0)
1987     {
1988       if (!expandBuffer (buf, BUFFER_EXPAND_AMOUNT))
1989         {
1990           warn ("%s:%d cxnsleep can't expand input buffer", peerName,
1991                 cxn->ident) ;
1992           freeBufferArray (b) ;
1993
1994           cxnSleepOrDie (cxn) ;
1995         }
1996       else if ( !prepareRead (cxn->myEp, b, responseIsRead, cxn, 1))
1997         {
1998           warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
1999           freeBufferArray (b) ;
2000
2001           cxnSleepOrDie (cxn) ;
2002         }
2003
2004       return ;
2005     }
2006
2007
2008   freeBufferArray (b) ; /* connection still has reference to buffer */
2009
2010   
2011   /*
2012    * Now process all the full responses that we have.
2013    */
2014   response = bufBase ;
2015   respSize = bufferDataSize (cxn->respBuffer) ;
2016
2017   while ((endr = strchr (response, '\n')) != NULL)
2018     {
2019       char *next = endr + 1 ;
2020
2021       if (*next == '\r')
2022         next++ ;
2023
2024       endr-- ;
2025       if (*endr != '\r')
2026         endr++ ;
2027
2028       if (next - endr != 2 && !cxn->loggedNoCr)
2029         {
2030           /* only a newline there. we'll live with it */
2031           warn ("%s:%d remote not giving out CR characters", peerName,
2032                 cxn->ident) ;
2033           cxn->loggedNoCr = true ;
2034         }
2035
2036       *endr = '\0' ;
2037
2038       if ( !getNntpResponse (response, &code, &rest) )
2039         {
2040           warn ("%s:%d cxnsleep response format: %s", peerName, cxn->ident,
2041                 response) ;
2042           cxnSleepOrDie (cxn) ;
2043
2044           return ;
2045         }
2046       
2047       d_printf (5,"%s:%d Response %d: %s\n", peerName, cxn->ident, code, response) ;
2048
2049       /* now handle the response code. I'm not using symbolic names on
2050          purpose--the numbers are all you see in the RFC's. */
2051       switch (code)
2052         {
2053           case 205:             /* OK response to QUIT. */
2054             processResponse205 (cxn, response) ;
2055             break ;
2056
2057
2058
2059             /* These three are from the CHECK command */
2060           case 238:             /* no such article found */
2061             /* Do not incrFilter (cxn) now, wait till after
2062                subsequent TAKETHIS */
2063             processResponse238 (cxn, response) ;
2064             break ;
2065
2066           case 431:             /* try again later (also for TAKETHIS) */
2067             decrFilter (cxn) ;
2068             if (hostDropDeferred (cxn->myHost))
2069                 processResponse438 (cxn, response) ;
2070             else
2071                 processResponse431 (cxn, response) ;
2072             break ;
2073
2074           case 438:             /* already have it */
2075             decrFilter (cxn) ;
2076             processResponse438 (cxn, response) ;
2077             break ;
2078
2079
2080
2081             /* These are from the TAKETHIS command */
2082           case 239:             /* article transferred OK */
2083             incrFilter (cxn) ;
2084             processResponse239 (cxn, response) ;
2085             break ;
2086
2087           case 439:             /* article rejected */
2088             decrFilter (cxn) ;
2089             processResponse439 (cxn, response) ;
2090             break ;
2091
2092
2093
2094             /* These are from the IHAVE command */
2095           case 335:             /* send article */
2096             processResponse335 (cxn, response) ;
2097             break ;
2098
2099           case 435:             /* article not wanted */
2100             processResponse435 (cxn, response) ;
2101             break ;
2102
2103           case 436:             /* transfer failed try again later */
2104             if (cxn->takeRespHead == NULL && hostDropDeferred (cxn->myHost))
2105                 processResponse435 (cxn, response) ;
2106             else
2107                 processResponse436 (cxn, response) ;
2108             break ;
2109
2110           case 437:             /* article rejected */
2111             processResponse437 (cxn, response) ;
2112             break ;
2113
2114           case 400:             /* has stopped accepting articles */
2115             processResponse400 (cxn, response) ;
2116             break ;
2117
2118             
2119
2120           case 235:             /* article transfered OK (IHAVE-body) */
2121             processResponse235 (cxn, response) ;
2122             break ;
2123
2124
2125           case 480:             /* Transfer permission denied. */
2126             processResponse480  (cxn,response) ;
2127             break ;
2128             
2129           case 503:             /* remote timeout. */
2130             processResponse503  (cxn,response) ;
2131             break ;
2132
2133           default:
2134             warn ("%s:%d cxnsleep response unknown: %d %s", peerName,
2135                   cxn->ident, code, response) ;
2136             cxnSleepOrDie (cxn) ;
2137             break ;
2138         }
2139
2140       VALIDATE_CONNECTION (cxn) ;
2141
2142       if (cxn->state != cxnFeedingS && cxn->state != cxnClosingS &&
2143           cxn->state != cxnFlushingS && cxn->state != cxnIdleS /* XXX */)
2144         break ;                 /* connection is terminated */
2145
2146       response = next ;
2147     }
2148
2149   d_printf (5,"%s:%d done with responses\n",hostPeerName (cxn->myHost),
2150            cxn->ident) ;
2151
2152   switch (cxn->state)
2153     {
2154       case cxnIdleS:
2155       case cxnFeedingS:
2156       case cxnClosingS:
2157       case cxnFlushingS:
2158         /* see if we need to drop in to or out of no-CHECK mode */
2159         if (cxn->state == cxnFeedingS && cxn->doesStreaming)
2160           {
2161             if ((cxn->filterValue > cxn->onThreshold) && cxn->needsChecks) {
2162               cxn->needsChecks = false;
2163               hostLogNoCheckMode (cxn->myHost, true,
2164                                   cxn->offThreshold/cxn->lowPassFilter,
2165                                   cxn->filterValue/cxn->lowPassFilter,
2166                                   cxn->onThreshold/cxn->lowPassFilter) ;
2167               /* on and log */
2168             } else if ((cxn->filterValue < cxn->offThreshold) &&
2169                      !cxn->needsChecks) {
2170               cxn->needsChecks = true;
2171               hostLogNoCheckMode (cxn->myHost, false,
2172                                   cxn->offThreshold/cxn->lowPassFilter,
2173                                   cxn->filterValue/cxn->lowPassFilter,
2174                                   cxn->onThreshold/cxn->lowPassFilter) ;
2175               /* off and log */
2176             }
2177           }
2178
2179         /* Now handle possible remaining partial reponse and set up for
2180            next read. */
2181         if (*response != '\0')
2182           {                       /* partial response */
2183             unsigned int leftAmt = respSize - (response - bufBase) ;
2184
2185             d_printf (2,"%s:%d handling a partial response\n",
2186                      hostPeerName (cxn->myHost),cxn->ident) ;
2187
2188             /* first we shift what's left in the buffer down to the
2189                bottom, if needed, or just expand the buffer */
2190             if (response != bufBase)
2191               {
2192                 /* so next read appends */
2193                 memmove (bufBase, response, leftAmt) ;
2194                 bufferSetDataSize (cxn->respBuffer, leftAmt) ;
2195               }
2196             else if (!expandBuffer (cxn->respBuffer, BUFFER_EXPAND_AMOUNT))
2197               die ("%s:%d cxnsleep can't expand input buffer", peerName,
2198                    cxn->ident) ;
2199           }
2200         else
2201           bufferSetDataSize (cxn->respBuffer, 0) ;
2202
2203         bArr = makeBufferArray (bufferTakeRef (cxn->respBuffer), NULL) ;
2204
2205         if ( !prepareRead (e, bArr, responseIsRead, cxn, 1) )
2206           {
2207             warn ("%s:%d cxnsleep prepare read failed", peerName, cxn->ident) ;
2208             freeBufferArray (bArr) ;
2209             cxnWait (cxn) ;
2210             return ;
2211           }
2212         else
2213           {
2214             /* only setup the timer if we're still waiting for a response
2215                to something. There's not necessarily a 1-to-1 mapping
2216                between reads and writes in streaming mode. May have been
2217                set already above (that would be unlikely I think). */
2218             VALIDATE_CONNECTION (cxn) ;
2219
2220             d_printf (5,"%s:%d about to do some writes\n",
2221                      hostPeerName (cxn->myHost),cxn->ident) ;
2222
2223             doSomeWrites (cxn) ;
2224
2225             /* If the read timer is (still) running, update it to give
2226                those terminally slow hosts that take forever to drain
2227                the network buffers and just dribble out responses the
2228                benefit of the doubt.  XXX - maybe should just increase
2229                timeout for these! */
2230             if (cxn->readBlockedTimerId)
2231               cxn->readBlockedTimerId = updateSleep (cxn->readBlockedTimerId,
2232                                                      responseTimeoutCbk,
2233                                                      cxn->readTimeout,
2234                                                      cxn) ;
2235           }
2236         VALIDATE_CONNECTION (cxn) ;
2237         break ;
2238
2239       case cxnWaitingS:         /* presumably after a code 205 or 400 */
2240       case cxnConnectingS:      /* presumably after a code 205 or 400 */
2241       case cxnSleepingS:        /* probably after a 480 */
2242         break ;
2243
2244       case cxnDeadS:
2245         delConnection (cxn) ;
2246         break ;
2247
2248       case cxnStartingS:
2249       default:
2250         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2251     }
2252 }
2253
2254
2255
2256
2257 \f
2258 /*
2259  * called when the write of the QUIT command has completed.
2260  */
2261 static void quitWritten (EndPoint e, IoStatus i, Buffer *b, void *d)
2262 {
2263   Connection cxn = (Connection) d ;
2264   const char *peerName ;
2265
2266   peerName = hostPeerName (cxn->myHost) ;
2267
2268   clearTimer (cxn->writeBlockedTimerId) ;
2269
2270   ASSERT (cxn->myEp == e) ;
2271   VALIDATE_CONNECTION (cxn) ;
2272
2273   if (i != IoDone)
2274     {
2275       errno = endPointErrno (e) ;
2276       syswarn ("%s:%d cxnsleep can't write QUIT", peerName, cxn->ident) ;
2277       if (cxn->state == cxnClosingS)
2278         {
2279           cxnDead (cxn) ;
2280           delConnection (cxn) ;
2281         }
2282       else
2283         cxnWait (cxn) ;
2284     }
2285   else
2286     /* The QUIT command has been sent, so start the response timer. */
2287     initReadBlockedTimeout (cxn) ;
2288
2289   freeBufferArray (b) ;
2290 }
2291
2292
2293
2294
2295 \f
2296 /*
2297  * called when the write of the IHAVE-body data is finished
2298  */
2299 static void ihaveBodyDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2300 {
2301   Connection cxn = (Connection) d ;
2302
2303   ASSERT (e == cxn->myEp) ;
2304
2305   clearTimer (cxn->writeBlockedTimerId) ;
2306
2307   if (i != IoDone)
2308     {
2309       errno = endPointErrno (e) ;
2310       syswarn ("%s:%d cxnsleep can't write IHAVE body",
2311                hostPeerName (cxn->myHost), cxn->ident) ;
2312
2313       cxnLogStats (cxn,true) ;
2314
2315       if (cxn->state == cxnClosingS)
2316         {
2317           cxnDead (cxn) ;
2318           delConnection (cxn) ;
2319         }
2320       else
2321         cxnSleep (cxn) ;
2322     }
2323   else
2324     /* The article has been sent, so start the response timer. */
2325     initReadBlockedTimeout (cxn) ;
2326
2327
2328   freeBufferArray (b) ;
2329
2330   return ;
2331 }
2332
2333
2334
2335
2336 \f
2337 /*
2338  * Called when a command set (IHAVE, CHECK, TAKETHIS) has been
2339  * written to the remote.
2340  */
2341 static void commandWriteDone (EndPoint e, IoStatus i, Buffer *b, void *d)
2342 {
2343   Connection cxn = (Connection) d ;
2344   const char *peerName ;
2345
2346   ASSERT (e == cxn->myEp) ;
2347
2348   peerName = hostPeerName (cxn->myHost) ;
2349
2350   freeBufferArray (b) ;
2351
2352   clearTimer (cxn->writeBlockedTimerId) ;
2353
2354   if (i != IoDone)
2355     {
2356       errno = endPointErrno (e) ;
2357       syswarn ("%s:%d cxnsleep can't write command", peerName, cxn->ident) ;
2358
2359       cxnLogStats (cxn,true) ;
2360
2361       if (cxn->state == cxnClosingS)
2362         {
2363           cxnDead (cxn) ;
2364           delConnection (cxn) ;
2365         }
2366       else
2367         {
2368           /* XXX - so cxnSleep() doesn't die in VALIDATE_CONNECTION () */
2369           deferAllArticles (cxn) ;
2370           cxnIdle (cxn) ;
2371
2372           cxnSleep (cxn) ;
2373         }
2374     }
2375   else
2376     {
2377       /* Some(?) hosts return the 439 response even before we're done
2378          sending, so don't go idle until here */
2379       if (cxn->state == cxnFeedingS && cxn->articleQTotal == 0)
2380         cxnIdle (cxn) ;
2381       else
2382         /* The command set has been sent, so start the response timer.
2383            XXX - we'd like finer grained control */
2384         initReadBlockedTimeout (cxn) ;
2385
2386       if ( cxn->doesStreaming )
2387         doSomeWrites (cxn) ;        /* pump data as fast as possible */
2388                                     /* XXX - will clear the read timeout */
2389     }
2390 }
2391
2392
2393
2394
2395 \f
2396 /*
2397  * Called when the MODE STREAM command has been written down the pipe.
2398  */
2399 static void modeCmdIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2400 {
2401   Connection cxn = (Connection) d ;
2402
2403   ASSERT (e == cxn->myEp) ;
2404
2405   clearTimer (cxn->writeBlockedTimerId) ;
2406
2407   /* The mode command has been sent, so start the response timer */
2408   initReadBlockedTimeout (cxn) ;
2409
2410   if (i != IoDone)
2411     {
2412       d_printf (1,"%s:%d MODE STREAM command failed to write\n",
2413                hostPeerName (cxn->myHost), cxn->ident) ;
2414
2415       syswarn ("%s:%d cxnsleep can't write MODE STREAM",
2416                hostPeerName (cxn->myHost), cxn->ident) ;
2417
2418       cxnSleepOrDie (cxn) ;
2419     }
2420
2421   freeBufferArray (b) ;
2422 }
2423
2424
2425
2426
2427 \f
2428 /*
2429  * Called when the AUTHINFO USER command has been written down the pipe.
2430  */
2431 static void authUserIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2432 {
2433   Connection cxn = (Connection) d ;
2434
2435   ASSERT (e == cxn->myEp) ;
2436
2437   clearTimer (cxn->writeBlockedTimerId) ;
2438
2439   /* The authinfo user command has been sent, so start the response timer */
2440   initReadBlockedTimeout (cxn) ;
2441
2442   if (i != IoDone)
2443     {
2444       d_printf (1,"%s:%d AUTHINFO USER command failed to write\n",
2445                hostPeerName (cxn->myHost), cxn->ident) ;
2446
2447       syswarn ("%s:%d cxnsleep can't write AUTHINFO USER",
2448                hostPeerName (cxn->myHost), cxn->ident) ;
2449
2450       cxnSleepOrDie (cxn) ;
2451     }
2452
2453   freeBufferArray (b) ;
2454 }
2455
2456
2457
2458
2459
2460 \f
2461 /*
2462  * Called when the AUTHINFO USER command has been written down the pipe.
2463  */
2464 static void authPassIssued (EndPoint e, IoStatus i, Buffer *b, void *d)
2465 {
2466   Connection cxn = (Connection) d ;
2467
2468   ASSERT (e == cxn->myEp) ;
2469
2470   clearTimer (cxn->writeBlockedTimerId) ;
2471
2472   /* The authinfo pass command has been sent, so start the response timer */
2473   initReadBlockedTimeout (cxn) ;
2474
2475   if (i != IoDone)
2476     {
2477       d_printf (1,"%s:%d AUTHINFO PASS command failed to write\n",
2478                hostPeerName (cxn->myHost), cxn->ident) ;
2479
2480       syswarn ("%s:%d cxnsleep can't write AUTHINFO PASS",
2481                hostPeerName (cxn->myHost), cxn->ident) ;
2482
2483       cxnSleepOrDie (cxn) ;
2484     }
2485
2486   freeBufferArray (b) ;
2487 }
2488
2489
2490
2491
2492
2493 \f
2494 /*
2495  * Called whenever some amount of data has been written to the pipe but
2496  * more data remains to be written
2497  */
2498 static void writeProgress (EndPoint e UNUSED, IoStatus i, Buffer *b UNUSED,
2499                            void *d)
2500 {
2501   Connection cxn = (Connection) d ;
2502
2503   ASSERT (i == IoProgress) ;
2504
2505   if (cxn->writeTimeout > 0)
2506     cxn->writeBlockedTimerId = updateSleep (cxn->writeBlockedTimerId,
2507                                             writeTimeoutCbk, cxn->writeTimeout,
2508                                             cxn) ;
2509 }
2510
2511
2512
2513
2514 \f
2515 /*
2516  * Timers.
2517  */
2518
2519 /*
2520  * This is called when the timeout for the reponse from the remote
2521  * goes off. We tear down the connection and notify our host.
2522  */
2523 static void responseTimeoutCbk (TimeoutId id, void *data)
2524 {
2525   Connection cxn = (Connection) data ;
2526   const char *peerName ;
2527
2528   ASSERT (id == cxn->readBlockedTimerId) ;
2529   ASSERT (cxn->state == cxnConnectingS ||
2530           cxn->state == cxnFeedingS ||
2531           cxn->state == cxnFlushingS ||
2532           cxn->state == cxnClosingS) ;
2533   VALIDATE_CONNECTION (cxn) ;
2534
2535   /* XXX - let abortConnection clear readBlockedTimerId, otherwise
2536      VALIDATE_CONNECTION() will croak */
2537
2538   peerName = hostPeerName (cxn->myHost) ;
2539
2540   warn ("%s:%d cxnsleep non-responsive connection", peerName, cxn->ident) ;
2541   d_printf (1,"%s:%d shutting down non-repsonsive connection\n",
2542            hostPeerName (cxn->myHost), cxn->ident) ;
2543
2544   cxnLogStats (cxn,true) ;
2545
2546   if (cxn->state == cxnClosingS)
2547     {
2548       abortConnection (cxn) ;
2549       delConnection (cxn) ;
2550     }
2551   else  
2552     cxnSleep (cxn) ;              /* will notify the Host */
2553 }
2554
2555
2556
2557
2558 \f
2559 /*
2560  * This is called when the data write timeout for the remote
2561  * goes off. We tear down the connection and notify our host.
2562  */
2563 static void writeTimeoutCbk (TimeoutId id, void *data)
2564 {
2565   Connection cxn = (Connection) data ;
2566   const char *peerName ;
2567
2568   ASSERT (id == cxn->writeBlockedTimerId) ;
2569   ASSERT (cxn->state == cxnConnectingS ||
2570           cxn->state == cxnFeedingS ||
2571           cxn->state == cxnFlushingS ||
2572           cxn->state == cxnClosingS) ;
2573   VALIDATE_CONNECTION (cxn) ;
2574
2575   /* XXX - let abortConnection clear writeBlockedTimerId, otherwise
2576      VALIDATE_CONNECTION() will croak */
2577
2578   peerName = hostPeerName (cxn->myHost) ;
2579
2580   warn ("%s:%d cxnsleep write timeout", peerName, cxn->ident) ;
2581   d_printf (1,"%s:%d shutting down non-responsive connection\n",
2582            hostPeerName (cxn->myHost), cxn->ident) ;
2583
2584   cxnLogStats (cxn,true) ;
2585
2586   if (cxn->state == cxnClosingS)
2587     {
2588       abortConnection (cxn) ;
2589       delConnection (cxn) ;
2590     }
2591   else  
2592     cxnSleep (cxn) ;              /* will notify the Host */
2593 }
2594
2595
2596
2597
2598 \f
2599 /*
2600  * Called by the EndPoint class when the timer goes off
2601  */
2602 void reopenTimeoutCbk (TimeoutId id, void *data)
2603 {
2604   Connection cxn = (Connection) data ;
2605
2606   ASSERT (id == cxn->sleepTimerId) ;
2607
2608   cxn->sleepTimerId = 0 ;
2609   
2610   if (cxn->state != cxnSleepingS)
2611     {
2612       warn ("%s:%d cxnsleep connection in bad state: %s",
2613             hostPeerName (cxn->myHost), cxn->ident,
2614             stateToString (cxn->state)) ;
2615       cxnSleepOrDie (cxn) ;
2616     }
2617   else
2618     cxnConnect (cxn) ;
2619 }
2620
2621
2622
2623
2624 \f
2625 /*
2626  * timeout callback to close down long running connection.
2627  */
2628 static void flushCxnCbk (TimeoutId id, void *data)
2629 {
2630   Connection cxn = (Connection) data ;
2631
2632   ASSERT (id == cxn->flushTimerId) ;
2633   VALIDATE_CONNECTION (cxn) ;
2634
2635   cxn->flushTimerId = 0 ;
2636
2637   if (!(cxn->state == cxnFeedingS || cxn->state == cxnConnectingS ||
2638         cxn->state == cxnIdleS))
2639     {
2640       warn ("%s:%d cxnsleep connection in bad state: %s",
2641             hostPeerName (cxn->myHost), cxn->ident,
2642             stateToString (cxn->state)) ;
2643       cxnSleepOrDie (cxn) ;
2644     }
2645   else
2646     {
2647       d_printf (1,"%s:%d Handling periodic connection close.\n",
2648                hostPeerName (cxn->myHost), cxn->ident) ;
2649
2650       notice ("%s:%d periodic close", hostPeerName (cxn->myHost), cxn->ident) ;
2651
2652       cxnFlush (cxn) ;
2653     }
2654 }
2655
2656
2657
2658
2659 \f
2660 /*
2661  * Timer callback for when the connection has not received an
2662  * article from INN. When that happens we tear down the network
2663  * connection to help recycle fds
2664  */
2665 static void articleTimeoutCbk (TimeoutId id, void *data)
2666 {
2667   Connection cxn = (Connection) data ;
2668   const char *peerName = hostPeerName (cxn->myHost) ;
2669
2670   ASSERT (cxn->artReceiptTimerId == id) ;
2671   VALIDATE_CONNECTION (cxn) ;
2672
2673   cxn->artReceiptTimerId = 0 ;
2674
2675   if (cxn->state != cxnIdleS)
2676     {
2677       warn ("%s:%d cxnsleep connection in bad state: %s",
2678             hostPeerName (cxn->myHost), cxn->ident,
2679             stateToString (cxn->state)) ;
2680       cxnSleepOrDie (cxn) ;
2681
2682       return ;
2683     }
2684
2685   /* it's doubtful (right?) that this timer could go off and there'd
2686      still be articles in the queue. */
2687   if (cxn->articleQTotal > 0)
2688     {
2689       warn ("%s:%d idle connection still has articles", peerName, cxn->ident) ;
2690     }
2691   else
2692     {
2693       notice ("%s:%d idle tearing down connection", peerName, cxn->ident) ;
2694       cxn->state = cxnIdleTimeoutS ;
2695       cxnFlush (cxn) ;
2696     }
2697 }
2698
2699
2700
2701
2702 \f
2703 /*
2704  * function to be called when the fd is not ready for reading, but there is
2705  * an article on tape or in the queue to be done. Things are done this way
2706  * so that a Connection doesn't hog time trying to find the next good
2707  * article for writing. With a large backlog of expired articles that would
2708  * take a long time. Instead the Connection just tries its next article on
2709  * tape or queue, and if that's no good then it registers this callback so
2710  * that other Connections have a chance of being serviced.
2711  */
2712 static void cxnWorkProc (EndPoint ep UNUSED, void *data)
2713 {
2714   Connection cxn = (Connection) data ;
2715
2716   d_printf (2,"%s:%d calling work proc\n",
2717            hostPeerName (cxn->myHost),cxn->ident) ;
2718
2719   if (writesNeeded (cxn))
2720     doSomeWrites (cxn) ;        /* may re-register the work proc... */
2721   else if (cxn->state == cxnFlushingS || cxn->state == cxnClosingS)
2722     {
2723       if (cxn->articleQTotal == 0)
2724         issueQUIT (cxn) ;
2725     }
2726   else
2727     d_printf (2,"%s:%d no writes were needed....\n",
2728              hostPeerName (cxn->myHost), cxn->ident) ;
2729 }
2730
2731
2732
2733 /****************************************************************************
2734  *
2735  * END EndPoint callback area.
2736  *
2737  ****************************************************************************/
2738
2739
2740
2741
2742 \f
2743 /****************************************************************************
2744  *
2745  * REPONSE CODE PROCESSING.
2746  *
2747  ***************************************************************************/
2748
2749
2750 /*
2751  * A connection needs to sleep, but if it's closing it needs to die instead.
2752  */
2753 static void cxnSleepOrDie (Connection cxn)
2754 {
2755   if (cxn->state == cxnClosingS)
2756     cxnDead (cxn) ;
2757   else
2758     cxnSleep (cxn) ;
2759 }
2760
2761
2762 /*
2763  * Handle the response 205 to our QUIT command, which means the
2764  * remote is going away and we can happily cleanup
2765  */
2766 static void processResponse205 (Connection cxn, char *response UNUSED)
2767 {
2768   bool immedRecon ;
2769
2770   VALIDATE_CONNECTION (cxn) ;
2771
2772   if (!(cxn->state == cxnFeedingS ||
2773         cxn->state == cxnIdleS ||
2774         cxn->state == cxnFlushingS ||
2775         cxn->state == cxnClosingS)) 
2776     {
2777       warn ("%s:%d cxnsleep connection in bad state: %s",
2778             hostPeerName (cxn->myHost), cxn->ident,
2779             stateToString (cxn->state)) ;
2780       cxnSleepOrDie (cxn) ;
2781       return ;
2782     }
2783
2784   switch (cxn->state)
2785     {
2786       case cxnFlushingS:
2787       case cxnClosingS:
2788         ASSERT (cxn->articleQTotal == 0) ;
2789
2790         cxnLogStats (cxn,true) ;
2791
2792         immedRecon = cxn->immedRecon ;
2793
2794         hostCxnDead (cxn->myHost,cxn) ;
2795
2796         if (cxn->state == cxnFlushingS && immedRecon)
2797           {
2798             abortConnection (cxn) ;
2799             if (!cxnConnect (cxn))
2800               notice ("%s:%d flush re-connect failed",
2801                       hostPeerName (cxn->myHost), cxn->ident) ;
2802           }
2803         else if (cxn->state == cxnFlushingS)
2804           cxnWait (cxn) ;
2805         else
2806           cxnDead (cxn) ;
2807         break ;
2808
2809       case cxnIdleS:
2810       case cxnFeedingS:
2811         /* this shouldn't ever happen... */
2812         warn ("%s:%d cxnsleep response unexpected: %d",
2813               hostPeerName (cxn->myHost), cxn->ident, 205) ;
2814         cxnSleepOrDie (cxn) ;
2815         break ;
2816
2817       default:
2818         die ("Bad connection state: %s\n",stateToString (cxn->state)) ;
2819     }
2820 }
2821
2822
2823
2824
2825 \f
2826 /*
2827  * Handle a response code of 238 which is the "no such article"
2828  * reply to the CHECK command (i.e. remote wants it).
2829  */
2830 static void processResponse238 (Connection cxn, char *response)
2831 {
2832   char *msgid ;
2833   ArtHolder artHolder ;
2834
2835   if (!cxn->doesStreaming)
2836     {
2837       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2838             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2839             response) ;
2840       cxnSleepOrDie (cxn) ;
2841       return ;
2842     }
2843   
2844   if (!(cxn->state == cxnFlushingS ||
2845         cxn->state == cxnFeedingS ||
2846         cxn->state == cxnClosingS))
2847     {
2848       warn ("%s:%d cxnsleep connection in bad state: %s",
2849             hostPeerName (cxn->myHost), cxn->ident,
2850             stateToString (cxn->state)) ;
2851       cxnSleepOrDie (cxn) ;
2852       return ;
2853     }
2854
2855   VALIDATE_CONNECTION (cxn) ;
2856
2857   msgid = getMsgId (response) ;
2858
2859   if (cxn->checkRespHead == NULL) /* peer is confused */
2860     {
2861       warn ("%s:%d cxnsleep response unexpected: %d",
2862             hostPeerName (cxn->myHost),cxn->ident,238) ;
2863       cxnSleepOrDie (cxn) ;
2864     }
2865   else if (msgid == NULL || strlen (msgid) == 0 ||
2866            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2867     noSuchMessageId (cxn,238,msgid,response) ;
2868   else
2869     {
2870       /* now remove the article from the check queue and move it onto the
2871          transmit queue. Another function wil take care of transmitting */
2872       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2873       if (cxn->state != cxnClosingS)
2874         appendArtHolder (artHolder, &cxn->takeHead, &cxn->articleQTotal) ;
2875       else
2876         {
2877           hostTakeBackArticle (cxn->myHost, cxn, artHolder->article) ;
2878           delArtHolder (artHolder) ;
2879         }
2880     }
2881
2882   if (msgid != NULL)
2883     free (msgid) ;
2884 }
2885
2886
2887
2888
2889 \f
2890 /*
2891  * process the response "try again later" to the CHECK command If this
2892  * returns true then the connection is still usable.
2893  */
2894 static void processResponse431 (Connection cxn, char *response)
2895 {
2896   char *msgid ;
2897   ArtHolder artHolder ;
2898
2899   if (!cxn->doesStreaming)
2900     {
2901       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2902             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2903             response) ;
2904       cxnSleepOrDie (cxn) ;
2905       return ;
2906     }
2907   
2908   if (!(cxn->state == cxnFlushingS ||
2909         cxn->state == cxnFeedingS ||
2910         cxn->state == cxnClosingS))
2911     {
2912       warn ("%s:%d cxnsleep connection in bad state: %s",
2913             hostPeerName (cxn->myHost), cxn->ident,
2914             stateToString (cxn->state)) ;
2915       cxnSleepOrDie (cxn) ;
2916       return ;
2917     }
2918   
2919   VALIDATE_CONNECTION (cxn) ;
2920
2921   msgid = getMsgId (response) ;
2922
2923   if (cxn->checkRespHead == NULL) /* peer is confused */
2924     {
2925       warn ("%s:%d cxnsleep response unexpected: %d",
2926             hostPeerName (cxn->myHost),cxn->ident,431) ;
2927       cxnSleepOrDie (cxn) ;
2928     }
2929   else if (msgid == NULL || strlen (msgid) == 0 ||
2930            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2931     noSuchMessageId (cxn,431,msgid,response) ;
2932   else
2933     {
2934       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2935       if (cxn->articleQTotal == 0)
2936         cxnIdle (cxn) ;
2937       hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
2938       delArtHolder (artHolder) ;
2939     }
2940
2941   if (msgid != NULL)
2942     free (msgid) ;
2943 }
2944
2945
2946
2947
2948 \f
2949 /*
2950  * process the "already have it" response to the CHECK command.  If this
2951  * returns true then the connection is still usable.
2952  */
2953 static void processResponse438 (Connection cxn, char *response)
2954 {
2955   char *msgid ;
2956   ArtHolder artHolder ;
2957
2958   if (!cxn->doesStreaming)
2959     {
2960       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
2961             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
2962             response) ;
2963       cxnSleepOrDie (cxn) ;
2964       return ;
2965     }
2966   
2967   if (!(cxn->state == cxnFlushingS ||
2968         cxn->state == cxnFeedingS ||
2969         cxn->state == cxnClosingS))
2970     {
2971       warn ("%s:%d cxnsleep connection in bad state: %s",
2972             hostPeerName (cxn->myHost), cxn->ident,
2973             stateToString (cxn->state)) ;
2974       cxnSleepOrDie (cxn) ;
2975       return ;
2976     }
2977   
2978   VALIDATE_CONNECTION (cxn) ;
2979
2980   msgid = getMsgId (response) ;
2981
2982   if (cxn->checkRespHead == NULL) /* peer is confused */
2983     {
2984       warn ("%s:%d cxnsleep response unexpected: %d",
2985             hostPeerName (cxn->myHost),cxn->ident,438) ;
2986       cxnSleepOrDie (cxn) ;
2987     }
2988   else if (msgid == NULL || strlen (msgid) == 0 ||
2989            (artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
2990     noSuchMessageId (cxn,438,msgid,response) ;
2991   else
2992     {
2993       cxn->checksRefused++ ;
2994
2995       remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
2996       if (cxn->articleQTotal == 0)
2997         cxnIdle (cxn) ;
2998       hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
2999       delArtHolder (artHolder) ;
3000     }
3001
3002   if (msgid != NULL)
3003     free (msgid) ;
3004 }
3005
3006
3007
3008
3009 \f
3010 /*
3011  * process the "article transferred ok" response to the TAKETHIS.
3012  */
3013 static void processResponse239 (Connection cxn, char *response)
3014 {
3015   char *msgid ;
3016   ArtHolder artHolder ;
3017
3018   if (!cxn->doesStreaming)
3019     {
3020       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3021             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3022             response) ;
3023       cxnSleepOrDie (cxn) ;
3024       return ;
3025     }
3026   
3027   if (!(cxn->state == cxnFlushingS ||
3028         cxn->state == cxnFeedingS ||
3029         cxn->state == cxnClosingS))
3030     {
3031       warn ("%s:%d cxnsleep connection in bad state: %s",
3032             hostPeerName (cxn->myHost), cxn->ident,
3033             stateToString (cxn->state)) ;
3034       cxnSleepOrDie (cxn) ;
3035       return ;
3036     }
3037
3038   VALIDATE_CONNECTION (cxn) ;
3039
3040   msgid = getMsgId (response) ;
3041
3042   if (cxn->takeRespHead == NULL) /* peer is confused */
3043     {
3044       warn ("%s:%d cxnsleep response unexpected: %d",
3045             hostPeerName (cxn->myHost),cxn->ident,239) ;
3046       cxnSleepOrDie (cxn) ;
3047     }
3048   else if (msgid == NULL || strlen (msgid) == 0 ||
3049            (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3050     noSuchMessageId (cxn,239,msgid,response) ;
3051   else
3052     {
3053       cxn->takesOkayed++ ;
3054       cxn->takesSizeOkayed += artSize(artHolder->article);
3055
3056       remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3057       if (cxn->articleQTotal == 0)
3058         cxnIdle (cxn) ;
3059       hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3060       delArtHolder (artHolder) ;
3061     }
3062
3063   if (msgid != NULL)
3064     free (msgid) ;
3065 }
3066
3067
3068 \f
3069 /*
3070  *  Set the thresholds for no-CHECK mode; negative means leave existing value
3071  */
3072
3073 void cxnSetCheckThresholds (Connection cxn,
3074                             double lowFilter, double highFilter,
3075                             double lowPassFilter)
3076 {
3077   /* Adjust current value for new scaling */
3078   if (cxn->lowPassFilter > 0.0)
3079     cxn->filterValue = cxn->filterValue / cxn->lowPassFilter * lowPassFilter;
3080
3081   /* Stick in new values */
3082   if (highFilter >= 0)
3083     cxn->onThreshold = highFilter * lowPassFilter / 100.0;
3084   if (lowFilter >= 0)
3085     cxn->offThreshold = lowFilter * lowPassFilter / 100.0;
3086   cxn->lowPassFilter = lowPassFilter;
3087 }
3088
3089 \f
3090 /*
3091  *  Blow away the connection gracelessly and immedately clean up
3092  */
3093 void cxnNuke (Connection cxn)
3094 {
3095   abortConnection (cxn) ;
3096   hostCxnDead (cxn->myHost,cxn) ;
3097   delConnection(cxn) ;
3098 }
3099
3100 \f
3101 /*
3102  * process a "article rejected do not try again" response to the
3103  * TAKETHIS.
3104  */
3105 static void processResponse439 (Connection cxn, char *response)
3106 {
3107   char *msgid ;
3108   ArtHolder artHolder ;
3109
3110   if (!cxn->doesStreaming)
3111     {
3112       warn ("%s:%d cxnsleep unexpected streaming response for non-streaming"
3113             " connection: %s", hostPeerName (cxn->myHost), cxn->ident,
3114             response) ;
3115       cxnSleepOrDie (cxn) ;
3116       return ;
3117     }
3118   
3119   if (!(cxn->state == cxnFlushingS ||
3120         cxn->state == cxnFeedingS ||
3121         cxn->state == cxnClosingS))
3122     {
3123       warn ("%s:%d cxnsleep connection in bad state: %s",
3124             hostPeerName (cxn->myHost), cxn->ident,
3125             stateToString (cxn->state)) ;
3126       cxnSleepOrDie (cxn) ;
3127       return ;
3128     }
3129
3130   VALIDATE_CONNECTION (cxn) ;
3131
3132   msgid = getMsgId (response) ;
3133
3134   if (cxn->takeRespHead == NULL) /* peer is confused */
3135     {
3136       /* NNTPRelay return 439 for check <messid> if messid is bad */
3137       if (cxn->checkRespHead == NULL) /* peer is confused */
3138         {
3139           warn ("%s:%d cxnsleep response unexpected: %d",
3140                 hostPeerName (cxn->myHost),cxn->ident,439) ;
3141           cxnSleepOrDie (cxn) ;
3142         }
3143       else
3144         {
3145           if ((artHolder = artHolderByMsgId (msgid, cxn->checkRespHead)) == NULL)
3146             noSuchMessageId (cxn,439,msgid,response) ;
3147           else
3148             {
3149               cxn->checksRefused++ ;
3150               remArtHolder (artHolder, &cxn->checkRespHead, &cxn->articleQTotal) ;
3151               if (cxn->articleQTotal == 0)
3152                 cxnIdle (cxn) ;
3153               hostArticleNotWanted (cxn->myHost, cxn, artHolder->article);
3154               delArtHolder (artHolder) ;
3155             }
3156         }
3157     }
3158   else if (msgid == NULL || strlen (msgid) == 0 ||
3159            (artHolder = artHolderByMsgId (msgid, cxn->takeRespHead)) == NULL)
3160     noSuchMessageId (cxn,439,msgid,response) ;
3161   else
3162     {
3163       cxn->takesRejected++ ;
3164       cxn->takesSizeRejected += artSize(artHolder->article);
3165
3166       remArtHolder (artHolder, &cxn->takeRespHead, &cxn->articleQTotal) ;
3167       /* Some(?) hosts return the 439 response even before we're done
3168           sending */
3169       if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3170         cxnIdle (cxn) ;
3171       hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3172       delArtHolder (artHolder) ;
3173     }
3174
3175   if (msgid != NULL)
3176     free (msgid) ;
3177 }
3178
3179
3180
3181
3182
3183 \f
3184 /*
3185  * process the "article transferred ok" response to the IHAVE-body.
3186  */
3187 static void processResponse235 (Connection cxn, char *response UNUSED)
3188 {
3189   ArtHolder artHolder ;
3190
3191   if (cxn->doesStreaming)
3192     {
3193       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3194             " streaming connection: %s", hostPeerName (cxn->myHost),
3195             cxn->ident,response) ;
3196       cxnSleepOrDie (cxn) ;
3197       return ;
3198     }
3199
3200   if (!(cxn->state == cxnFlushingS ||
3201         cxn->state == cxnFeedingS ||
3202         cxn->state == cxnClosingS))
3203     {
3204       warn ("%s:%d cxnsleep connection in bad state: %s",
3205             hostPeerName (cxn->myHost), cxn->ident,
3206             stateToString (cxn->state)) ;
3207       cxnSleepOrDie (cxn) ;
3208       return ;
3209     }
3210
3211   ASSERT (cxn->articleQTotal == 1) ;
3212   ASSERT (cxn->takeRespHead != NULL) ;
3213   VALIDATE_CONNECTION (cxn) ;
3214
3215   if (cxn->takeRespHead == NULL) /* peer is confused */
3216     {
3217       warn ("%s:%d cxnsleep response unexpected: %d",
3218             hostPeerName (cxn->myHost),cxn->ident,235) ;
3219       cxnSleepOrDie (cxn) ;
3220     }
3221   else
3222     {
3223       /* now remove the article from the queue and tell the Host to forget
3224          about it. */
3225       artHolder = cxn->takeRespHead ;
3226       
3227       cxn->takeRespHead = NULL ;
3228       cxn->articleQTotal = 0 ;
3229       cxn->takesOkayed++ ;
3230       cxn->takesSizeOkayed += artSize(artHolder->article);
3231       
3232       if (cxn->articleQTotal == 0)
3233         cxnIdle (cxn) ;
3234
3235       hostArticleAccepted (cxn->myHost, cxn, artHolder->article) ;
3236       delArtHolder (artHolder) ;
3237     }
3238 }
3239
3240
3241
3242
3243 \f
3244 /*
3245  * process the "send article to be transfered" reponse to the IHAVE.
3246  */
3247 static void processResponse335 (Connection cxn, char *response UNUSED)
3248 {
3249   if (cxn->doesStreaming)
3250     {
3251       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3252             " streaming connection: %s", hostPeerName (cxn->myHost),
3253             cxn->ident,response) ;
3254       cxnSleepOrDie (cxn) ;
3255       return ;
3256     }
3257
3258   if (!(cxn->state == cxnFlushingS ||
3259         cxn->state == cxnFeedingS ||
3260         cxn->state == cxnClosingS))
3261     {
3262       warn ("%s:%d cxnsleep connection in bad state: %s",
3263             hostPeerName (cxn->myHost), cxn->ident,
3264             stateToString (cxn->state)) ;
3265       cxnSleepOrDie (cxn) ;
3266       return ;
3267     }
3268
3269   if (cxn->checkRespHead == NULL)
3270     {
3271       warn ("%s:%d cxnsleep response unexpected: %d",
3272             hostPeerName (cxn->myHost),cxn->ident,335) ;
3273       cxnSleepOrDie (cxn) ;
3274     }
3275   else 
3276     {
3277       VALIDATE_CONNECTION (cxn) ;
3278       /* now move the article into the third queue */
3279       cxn->takeHead = cxn->checkRespHead ;
3280       cxn->checkRespHead = NULL ;
3281       
3282       issueIHAVEBody (cxn) ;
3283     }
3284 }
3285
3286
3287
3288
3289 \f
3290 /*
3291  * process the "not accepting articles" response. This could be to any of
3292  * the IHAVE/CHECK/TAKETHIS command, but not the banner--that's handled
3293  * elsewhere.
3294  */
3295 static void processResponse400 (Connection cxn, char *response)
3296 {
3297   if (!(cxn->state == cxnFlushingS ||
3298         cxn->state == cxnFeedingS ||
3299         cxn->state == cxnIdleS ||
3300         cxn->state == cxnClosingS))
3301     {
3302       warn ("%s:%d cxnsleep connection in bad state: %s",
3303             hostPeerName (cxn->myHost), cxn->ident,
3304             stateToString (cxn->state)) ;
3305       cxnSleepOrDie (cxn) ;
3306       return ;
3307     }
3308
3309   VALIDATE_CONNECTION (cxn) ;
3310
3311   /* We may get a response 400 multiple times when in streaming mode. */
3312   notice ("%s:%d remote cannot accept articles: %s",
3313           hostPeerName(cxn->myHost), cxn->ident, response) ;
3314   
3315   /* right here there may still be data queued to write and so we'll fail
3316      trying to issue the quit ('cause a write will be pending). Furthermore,
3317      the data pending may be half way through an command, and so just
3318      tossing the buffer is nt sufficient. But figuring out where we are and
3319      doing a tidy job is hard */
3320   if (writeIsPending (cxn->myEp))
3321     cxnSleepOrDie (cxn) ;
3322   else
3323     {
3324       if (cxn->articleQTotal > 0)
3325         {
3326           /* Defer the articles here so that cxnFlush() doesn't set up an
3327              immediate reconnect. */
3328           deferAllArticles (cxn) ;
3329           clearTimer (cxn->readBlockedTimerId) ;
3330           /* XXX - so cxnSleep() doesn't die when it validates the connection */
3331           cxnIdle (cxn) ;
3332         }
3333       /* XXX - it would be nice if we QUIT first, but we'd have to go
3334          into a state where we just search for the 205 response, and
3335          only go into the sleep state at that point */
3336       cxnSleepOrDie (cxn) ;
3337     }
3338 }
3339
3340
3341
3342
3343 \f
3344 /*
3345  * process the "not wanted" reponse to the IHAVE.
3346  */
3347 static void processResponse435 (Connection cxn, char *response UNUSED)
3348 {
3349   ArtHolder artHolder ;
3350
3351   if (cxn->doesStreaming)
3352     {
3353       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3354             " streaming connection: %s", hostPeerName (cxn->myHost),
3355             cxn->ident,response) ;
3356       cxnSleepOrDie (cxn) ;
3357       return ;
3358     }
3359
3360   if (!(cxn->state == cxnFlushingS ||
3361         cxn->state == cxnFeedingS ||
3362         cxn->state == cxnClosingS))
3363     {
3364       warn ("%s:%d cxnsleep connection in bad state: %s",
3365             hostPeerName (cxn->myHost), cxn->ident,
3366             stateToString (cxn->state)) ;
3367       cxnSleepOrDie (cxn) ;
3368       return ;
3369     }
3370
3371   /* Some servers, such as early versions of Diablo, had a bug where they'd
3372      respond with a 435 code (which should only be used for refusing an
3373      article before it was offered) after an article has been sent. */
3374   if (cxn->checkRespHead == NULL)
3375     {
3376       warn ("%s:%d cxnsleep response unexpected: %d",
3377             hostPeerName (cxn->myHost), cxn->ident, 435) ;
3378       cxnSleepOrDie (cxn) ;
3379       return ;
3380     }
3381
3382   ASSERT (cxn->articleQTotal == 1) ;
3383   VALIDATE_CONNECTION (cxn) ;
3384
3385   cxn->articleQTotal-- ;
3386   cxn->checksRefused++ ;
3387
3388   artHolder = cxn->checkRespHead ;
3389   cxn->checkRespHead = NULL ;
3390
3391   if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3392     cxnIdle (cxn) ;
3393
3394   hostArticleNotWanted (cxn->myHost, cxn, artHolder->article) ;
3395   delArtHolder (artHolder) ;
3396
3397 #if 0
3398   d_printf (1,"%s:%d On exiting 435 article queue total is %d (%d %d %d %d)\n",
3399            hostPeerName (cxn->myHost), cxn->ident,
3400            cxn->articleQTotal,
3401            (int) (cxn->checkHead != NULL),
3402            (int) (cxn->checkRespHead != NULL),
3403            (int) (cxn->takeHead != NULL),
3404            (int) (cxn->takeRespHead != NULL));
3405 #endif
3406 }
3407
3408
3409
3410
3411 \f
3412 /*
3413  * process the "transfer failed" response to the IHAVE-body, (seems this
3414  * can come from the IHAVE too).
3415  */
3416 static void processResponse436 (Connection cxn, char *response UNUSED)
3417 {
3418   ArtHolder artHolder ;
3419
3420   if (cxn->doesStreaming)
3421     {
3422       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3423             " streaming connection: %s", hostPeerName (cxn->myHost),
3424             cxn->ident,response) ;
3425       cxnSleepOrDie (cxn) ;
3426       return ;
3427     }
3428
3429   if (!(cxn->state == cxnFlushingS ||
3430         cxn->state == cxnFeedingS ||
3431         cxn->state == cxnClosingS))
3432     {
3433       warn ("%s:%d cxnsleep connection in bad state: %s",
3434             hostPeerName (cxn->myHost), cxn->ident,
3435             stateToString (cxn->state)) ;
3436       cxnSleepOrDie (cxn) ;
3437       return ;
3438     }
3439
3440   ASSERT (cxn->articleQTotal == 1) ;
3441   ASSERT (cxn->takeRespHead != NULL || cxn->checkRespHead != NULL) ;
3442   VALIDATE_CONNECTION (cxn) ;
3443
3444   cxn->articleQTotal-- ;
3445
3446   if (cxn->takeRespHead != NULL) /* IHAVE-body command barfed */
3447     {
3448       artHolder = cxn->takeRespHead ;
3449       cxn->takeRespHead = NULL ;
3450     }
3451   else                          /* IHAVE command barfed */
3452     {
3453       artHolder = cxn->checkRespHead ;
3454       cxn->checkRespHead = NULL ;
3455     }
3456
3457   if (cxn->articleQTotal == 0 && !writeIsPending(cxn->myEp))
3458     cxnIdle (cxn) ;
3459   
3460   hostArticleDeferred (cxn->myHost, cxn, artHolder->article) ;
3461   delArtHolder (artHolder) ;
3462 }
3463
3464
3465
3466
3467 \f
3468 /*
3469  * Process the "article rejected do not try again" response to the
3470  * IHAVE-body.
3471  */
3472 static void processResponse437 (Connection cxn, char *response UNUSED)
3473 {
3474   ArtHolder artHolder ;
3475
3476   if (cxn->doesStreaming)
3477     {
3478       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3479             " streaming connection: %s", hostPeerName (cxn->myHost),
3480             cxn->ident,response) ;
3481       cxnSleepOrDie (cxn) ;
3482       return ;
3483     }
3484
3485   if (!(cxn->state == cxnFlushingS ||
3486         cxn->state == cxnFeedingS ||
3487         cxn->state == cxnClosingS))
3488     {
3489       warn ("%s:%d cxnsleep connection in bad state: %s",
3490             hostPeerName (cxn->myHost), cxn->ident,
3491             stateToString (cxn->state)) ;
3492       cxnSleepOrDie (cxn) ;
3493       return ;
3494     }
3495
3496   ASSERT (cxn->articleQTotal == 1) ;
3497   ASSERT (cxn->takeRespHead != NULL) ;
3498   VALIDATE_CONNECTION (cxn) ;
3499
3500   cxn->articleQTotal-- ;
3501   cxn->takesRejected++ ;
3502
3503   artHolder = cxn->takeRespHead ;
3504   cxn->takeRespHead = NULL ;
3505   cxn->takesSizeRejected += artSize(artHolder->article);
3506
3507   /* Some servers return the 437 response before we're done sending. */
3508   if (cxn->articleQTotal == 0 && !writeIsPending (cxn->myEp))
3509     cxnIdle (cxn) ;
3510
3511   hostArticleRejected (cxn->myHost, cxn, artHolder->article) ;
3512   delArtHolder (artHolder) ;
3513 }
3514
3515
3516 /* Process the response 480 Transfer permission defined. We're probably
3517    talking to a remote nnrpd on a system that forgot to put us in
3518    the hosts.nntp */
3519 static void processResponse480 (Connection cxn, char *response UNUSED)
3520 {
3521   if (cxn->doesStreaming)
3522     {
3523       warn ("%s:%d cxnsleep unexpected non-streaming response for"
3524             " streaming connection: %s", hostPeerName (cxn->myHost),
3525             cxn->ident,response) ;
3526       cxnSleepOrDie (cxn) ;
3527       return ;
3528     }
3529
3530   if (!(cxn->state == cxnFlushingS ||
3531         cxn->state == cxnFeedingS ||
3532         cxn->state == cxnClosingS))
3533     {
3534       warn ("%s:%d cxnsleep connection in bad state: %s",
3535             hostPeerName (cxn->myHost), cxn->ident,
3536             stateToString (cxn->state)) ;
3537       cxnSleepOrDie (cxn) ;
3538       return ;
3539     }
3540
3541   VALIDATE_CONNECTION (cxn) ;
3542
3543   warn ("%s:%d cxnsleep transfer permission denied",
3544         hostPeerName (cxn->myHost), cxn->ident) ;
3545   
3546   if (cxn->state == cxnClosingS)
3547     cxnDead (cxn) ;
3548   else
3549     cxnSleep (cxn) ;
3550 }
3551
3552
3553
3554
3555 \f
3556 /*
3557  * Handle the response 503, which means the timeout of nnrpd.
3558  */
3559 static void processResponse503 (Connection cxn, char *response UNUSED)
3560 {
3561   bool immedRecon ;
3562
3563   VALIDATE_CONNECTION (cxn) ;
3564
3565   if (!(cxn->state == cxnFeedingS ||
3566         cxn->state == cxnIdleS ||
3567         cxn->state == cxnFlushingS ||
3568         cxn->state == cxnClosingS))
3569     {
3570       warn ("%s:%d cxnsleep connection in bad state: %s",
3571             hostPeerName (cxn->myHost), cxn->ident,
3572             stateToString (cxn->state)) ;
3573       cxnSleepOrDie (cxn) ;
3574       return ;
3575     }
3576
3577   if (cxn->articleQTotal != 0)
3578     notice ("%s:%d flush re-connect failed", hostPeerName (cxn->myHost),
3579             cxn->ident) ;