chiark / gitweb /
wip compile
[inn-innduct.git] / innd / chan.c
1 /*  $Id: chan.c 6720 2004-05-16 20:54:25Z rra $
2 **
3 **  I/O channel (and buffer) processing.
4 */
5
6 #include "config.h"
7 #include "clibrary.h"
8
9 /* Needed on AIX 4.1 to get fd_set and friends. */
10 #ifdef HAVE_SYS_SELECT_H
11 # include <sys/select.h>
12 #endif
13
14 #include "inn/innconf.h"
15 #include "innd.h"
16
17 /* These errno values don't exist on all systems, but may be returned as an
18    (ignorable) error to setting the accept socket nonblocking.  Define them
19    to 0 if they don't exist so that we can unconditionally compare errno to
20    them in the code. */
21 #ifndef ENOTSOCK
22 # define ENOTSOCK 0
23 #endif
24 #ifndef ENOTTY
25 # define ENOTTY 0
26 #endif
27
28 static const char * const timer_name[] = {
29     "idle", "artclean", "artwrite", "artcncl", "sitesend", "overv",
30     "perl", "python", "nntpread", "artparse", "artlog", "datamove"
31 };
32
33 /* Minutes - basically, keep the connection open but idle */
34 #define PAUSE_BEFORE_DROP               5
35
36 /* Divisor of the BUFFER size. If the amount free at the beginning of the
37    buffer is bigger than the quotient, then it is compacted in the
38    readloop */
39 #define COMP_THRESHOLD 10
40
41 static fd_set   RCHANmask;
42 static fd_set   SCHANmask;
43 static fd_set   WCHANmask;
44 static int      SCHANcount;
45 static int      CHANlastfd;
46 static int      CHANlastsleepfd;
47 static int      CHANccfd;
48 static int      CHANtablesize;
49 static CHANNEL  *CHANtable;
50 static CHANNEL  *CHANcc;
51 static CHANNEL  CHANnull = { CTfree, CSerror, -1 };
52
53 #define PRIORITISE_REMCONN
54 #ifdef PRIORITISE_REMCONN
55 static int      *CHANrcfd;
56 static CHANNEL  **CHANrc;
57 static int      chanlimit;
58 #endif /* PRIORITISE_REMCONN */
59
60 /*
61 ** Tear down our world
62 */
63 void
64 CHANshutdown(void)
65 {
66   CHANNEL               *cp;
67   int                   i;
68
69   if (CHANtable) {
70     for (i = CHANtablesize, cp = &CHANtable[0]; --i >= 0; cp++) {
71       if (cp->In.data) {
72         free(cp->In.data);
73       }
74       if (cp->Out.data) {
75         free(cp->Out.data);
76       }
77     }
78     free(CHANtable);
79     CHANtable = NULL;
80   }
81 }
82
83 /*
84 **  Initialize all the I/O channels.
85 */
86 void
87 CHANsetup(int i)
88 {
89     CHANNEL             *cp;
90
91     FD_ZERO(&RCHANmask);
92     FD_ZERO(&SCHANmask);
93     FD_ZERO(&WCHANmask);
94     CHANshutdown();
95     CHANtablesize = i;
96     CHANtable = xcalloc(CHANtablesize, sizeof(CHANNEL));
97     CHANnull.NextLog = innconf->chaninacttime;
98     memset(&CHANnull.Address, 0, sizeof(CHANnull.Address));
99     for (cp = CHANtable; --i >= 0; cp++)
100         *cp = CHANnull;
101 }
102
103
104 /*
105 **  Create a channel from a descriptor.
106 */
107 CHANNEL *
108 CHANcreate(int fd, CHANNELTYPE Type, CHANNELSTATE State,
109            innd_callback_t Reader, innd_callback_t WriteDone)
110 {
111     CHANNEL             *cp;
112     struct buffer       in  = { 0, 0, 0, NULL };
113     struct buffer       out = { 0, 0, 0, NULL };
114
115     cp = &CHANtable[fd];
116
117     /* Don't overwrite the buffers with CHANnull. */
118     in = cp->In;
119     buffer_resize(&in, START_BUFF_SIZE);
120     in.used = 0;
121     in.left = in.size;
122     out = cp->Out;
123     buffer_resize(&out, SMBUF);
124     buffer_set(&out, "", 0);
125
126     /* Set up the channel's info. */
127     *cp = CHANnull;
128     cp->fd = fd;
129     cp->Type = Type;
130     cp->State = State;
131     cp->Streaming = false;
132     cp->Skip = false;
133     cp->NoResendId = false;
134     cp->privileged = false;
135     cp->Ihave = cp->Ihave_Duplicate = cp->Ihave_Deferred = cp->Ihave_SendIt = cp->Ihave_Cybercan = 0;
136     cp->Check = cp->Check_send = cp->Check_deferred = cp->Check_got = cp->Check_cybercan = 0;
137     cp->Takethis = cp->Takethis_Ok = cp->Takethis_Err = 0;
138     cp->Size = cp->Duplicate = 0;
139     cp->Unwanted_s = cp->Unwanted_f = cp->Unwanted_d = 0;
140     cp->Unwanted_g = cp->Unwanted_u = cp->Unwanted_o = 0;
141     cp->Reader = Reader;
142     cp->WriteDone = WriteDone;
143     cp->Started = cp->LastActive = Now.time;
144     cp->In = in;
145     cp->Out = out;
146     cp->Tracing = Tracing;
147     cp->Sendid.size = 0;
148     cp->Next=0;
149     cp->MaxCnx=0;
150     cp->ActiveCnx=0;
151     cp->ArtBeg = 0;
152     cp->ArtMax = 0;
153     cp->Start = 0;
154     HashClear(&cp->CurrentMessageIDHash);
155     memset(cp->PrecommitWIP, '\0', sizeof(cp->PrecommitWIP));
156     cp->PrecommitiCachenext=0;
157     ARTprepare(cp);
158
159     close_on_exec(fd, true);
160
161 #ifndef _HPUX_SOURCE
162     /* Stupid HPUX 11.00 has a broken listen/accept where setting the listen
163        socket to nonblocking prevents you from successfully setting the
164        socket returned by accept(2) back to blocking mode, no matter what,
165        resulting in all kinds of funny behaviour, data loss, etc. etc.  */
166     if (nonblocking(fd, true) < 0 && errno != ENOTSOCK && errno != ENOTTY)
167         syslog(L_ERROR, "%s cant nonblock %d %m", LogName, fd);
168 #endif
169
170     /* Note control channel, for efficiency. */
171     if (Type == CTcontrol) {
172         CHANcc = cp;
173         CHANccfd = fd;
174     }
175 #ifdef PRIORITISE_REMCONN
176     /* Note remconn channel, for efficiency */
177     if (Type == CTremconn) {
178         int j;
179         for (j = 0 ; j < chanlimit ; j++ ) {
180             if (CHANrcfd[j] == -1) {
181                 break;
182             }
183         }
184         if (j < chanlimit) {
185             CHANrc[j] = cp;
186             CHANrcfd[j] = fd;
187         } else if (chanlimit == 0) {
188             /* assuming two file descriptors(AF_INET and AF_INET6) */
189             chanlimit = 2;
190             CHANrc = xmalloc(chanlimit * sizeof(CHANNEL **));
191             CHANrcfd = xmalloc(chanlimit * sizeof(int *));
192             for (j = 0 ; j < chanlimit ; j++ ) {
193                 CHANrc[j] = NULL;
194                 CHANrcfd[j] = -1;
195             }
196             CHANrc[0] = cp;
197             CHANrcfd[0] = fd;
198         } else {
199             /* extend to double size */
200             CHANrc = xrealloc(CHANrc, chanlimit * 2 * sizeof(CHANNEL **));
201             CHANrcfd = xrealloc(CHANrcfd, chanlimit * 2 * sizeof(int *));
202             for (j = chanlimit ; j < chanlimit * 2 ; j++ ) {
203                 CHANrc[j] = NULL;
204                 CHANrcfd[j] = -1;
205             }
206             CHANrc[chanlimit] = cp;
207             CHANrcfd[chanlimit] = fd;
208             chanlimit *= 2;
209         }
210     }
211 #endif /* PRIORITISE_REMCONN */
212     return cp;
213 }
214
215
216 /*
217 **  Start tracing a channel.
218 */
219 void
220 CHANtracing(CHANNEL *cp, bool Flag)
221 {
222     char                *p;
223
224     p = CHANname(cp);
225     syslog(L_NOTICE, "%s trace %s", p, Flag ? "on" : "off");
226     cp->Tracing = Flag;
227     if (Flag) {
228         syslog(L_NOTICE, "%s trace badwrites %d blockwrites %d badreads %d",
229             p, cp->BadWrites, cp->BlockedWrites, cp->BadReads);
230         syslog(L_NOTICE, "%s trace address %s lastactive %ld nextlog %ld",
231             p, sprint_sockaddr((struct sockaddr *)&cp->Address),
232             (long) cp->LastActive, (long) cp->NextLog);
233         if (FD_ISSET(cp->fd, &SCHANmask))
234             syslog(L_NOTICE, "%s trace sleeping %ld 0x%p",
235                 p, (long)cp->Waketime, (void *)cp->Waker);
236         if (FD_ISSET(cp->fd, &RCHANmask))
237             syslog(L_NOTICE, "%s trace reading %lu %s",
238                 p, (unsigned long) cp->In.used,
239                 MaxLength(cp->In.data, cp->In.data));
240         if (FD_ISSET(cp->fd, &WCHANmask))
241             syslog(L_NOTICE, "%s trace writing %lu %s",
242                 p, (unsigned long) cp->Out.left,
243                 MaxLength(cp->Out.data, cp->Out.data));
244     }
245 }
246
247
248 /*
249 **  Close a channel.
250 */
251 void
252 CHANclose(CHANNEL *cp, const char *name)
253 {
254     char        *label, *tmplabel, buff[SMBUF];
255
256     if (cp->Type == CTfree)
257         syslog(L_ERROR, "%s internal closing free channel %d", name, cp->fd);
258     else {
259         if (cp->Type == CTnntp) {
260             WIPprecomfree(cp);
261             NCclearwip(cp);
262             if (cp->State == CScancel)
263                 syslog(L_NOTICE,
264                "%s closed seconds %ld cancels %ld",
265                name, (long)(Now.time - cp->Started),
266                cp->Received);
267             else {
268             snprintf(buff, sizeof(buff),
269                      "accepted size %.0f duplicate size %.0f", cp->Size,
270                      cp->DuplicateSize);
271             syslog(L_NOTICE,
272                 "%s closed seconds %ld accepted %ld refused %ld rejected %ld duplicate %ld %s",
273                 name, (long)(Now.time - cp->Started),
274                 cp->Received, cp->Refused, cp->Rejected,
275                 cp->Duplicate, buff);
276             }
277             if (cp->Data.Newsgroups.Data != NULL) {
278                 free(cp->Data.Newsgroups.Data);
279                 cp->Data.Newsgroups.Data = NULL;
280             }
281             if (cp->Data.Newsgroups.List != NULL) {
282                 free(cp->Data.Newsgroups.List);
283                 cp->Data.Newsgroups.List = NULL;
284             }
285             if (cp->Data.Distribution.Data != NULL) {
286                 free(cp->Data.Distribution.Data);
287                 cp->Data.Distribution.Data = NULL;
288             }
289             if (cp->Data.Distribution.List != NULL) {
290                 free(cp->Data.Distribution.List);
291                 cp->Data.Distribution.List = NULL;
292             }
293             if (cp->Data.Path.Data != NULL) {
294                 free(cp->Data.Path.Data);
295                 cp->Data.Path.Data = NULL;
296             }
297             if (cp->Data.Path.List != NULL) {
298                 free(cp->Data.Path.List);
299                 cp->Data.Path.List = NULL;
300             }
301             if (cp->Data.Overview.size != 0) {
302                 free(cp->Data.Overview.data);
303                 cp->Data.Overview.data = NULL;
304                 cp->Data.Overview.size = 0;
305                 cp->Data.Overview.left = 0;
306                 cp->Data.Overview.used = 0;
307             }
308             if (cp->Data.XrefBufLength != 0) {
309                 free(cp->Data.Xref);
310                 cp->Data.Xref = NULL;
311                 cp->Data.XrefBufLength = 0;
312             }
313         } else if (cp->Type == CTreject)
314             syslog(L_NOTICE, "%s %ld", name, cp->Rejected);
315         else if (cp->Out.left)
316             syslog(L_NOTICE, "%s closed lost %lu", name,
317                    (unsigned long) cp->Out.left);
318         else
319             syslog(L_NOTICE, "%s closed", name);
320         WCHANremove(cp);
321         RCHANremove(cp);
322         SCHANremove(cp);
323         if (cp->Argument != NULL)
324             /* Set to NULL below. */
325             free(cp->Argument);
326         if (cp->fd >= 0 && close(cp->fd) < 0)
327             syslog(L_ERROR, "%s cant close %s %m", LogName, name);
328  
329         if (cp->MaxCnx > 0 && cp->Type == CTnntp) {
330             int tfd;
331             CHANNEL *tempchan;
332
333             cp->fd = -1;
334             if ((label = RClabelname(cp)) != NULL) {
335                 for(tfd = 0; tfd <= CHANlastfd; tfd++) {
336                     tempchan = &CHANtable[tfd];
337                     if(tempchan->fd > 0 && tempchan->Type == CTnntp &&
338                         ((tmplabel = RClabelname(tempchan)) != NULL) &&
339                         strcmp(label, tmplabel) == 0 &&
340                         tempchan->ActiveCnx == 0) {
341                             tempchan->ActiveCnx = cp->ActiveCnx;
342                             RCHANadd(tempchan);
343                             break;
344                     }
345                 }
346             }
347         }
348     }
349
350     /* Mark it unused. */
351     cp->Type = CTfree;
352     cp->State = CSerror;
353     cp->fd = -1;
354     cp->Argument = NULL;
355     cp->ActiveCnx = 0;
356
357     /* Free the buffers if they got big. */
358     if (cp->In.size > BIG_BUFFER) {
359         cp->In.size = 0;
360         cp->In.used = 0;
361         cp->In.left = 0;
362         free(cp->In.data);
363         cp->In.data = NULL;
364     }
365     if (cp->Out.size > BIG_BUFFER) {
366         cp->Out.size = 0;
367         cp->Out.used = 0;
368         cp->Out.left = 0;
369         free(cp->Out.data);
370         cp->Out.data = NULL;
371     }
372     if (cp->Sendid.size > 0) {
373         cp->Sendid.size = 0;
374         cp->Sendid.used = 0;
375         cp->Sendid.left = 0;
376         free(cp->Sendid.data);
377         cp->Sendid.data = NULL;
378     }
379 }
380
381
382 /*
383 **  Return a printable name for the channel.
384 */
385 char *
386 CHANname(const CHANNEL *cp)
387 {
388     static char         buff[SMBUF];
389     int                 i;
390     SITE *              sp;
391     const char *        p;
392     pid_t               pid;
393
394     switch (cp->Type) {
395     default:
396         snprintf(buff, sizeof(buff), "?%d(#%d@%ld)?", cp->Type, cp->fd,
397                  (long) (cp - CHANtable));
398         break;
399     case CTany:
400         snprintf(buff, sizeof(buff), "any:%d", cp->fd);
401         break;
402     case CTfree:
403         snprintf(buff, sizeof(buff), "free:%d", cp->fd);
404         break;
405     case CTremconn:
406         snprintf(buff, sizeof(buff), "remconn:%d", cp->fd);
407         break;
408     case CTreject:
409         snprintf(buff, sizeof(buff), "%s rejected", RChostname(cp));
410         break;
411     case CTnntp:
412         snprintf(buff, sizeof(buff), "%s:%d",
413                  cp->Address.ss_family == 0 ? "localhost" : RChostname(cp),
414                  cp->fd);
415         break;
416     case CTlocalconn:
417         snprintf(buff, sizeof(buff), "localconn:%d", cp->fd);
418         break;
419     case CTcontrol:
420         snprintf(buff, sizeof(buff), "control:%d", cp->fd);
421         break;
422     case CTexploder:
423     case CTfile:
424     case CTprocess:
425         /* Find the site that has this channel. */
426         for (p = "?", i = nSites, sp = Sites, pid = 0; --i >= 0; sp++)
427             if (sp->Channel == cp) {
428                 p = sp->Name;
429                 if (cp->Type != CTfile)
430                     pid = sp->pid;
431                 break;
432             }
433         if (pid == 0)
434             snprintf(buff, sizeof(buff), "%s:%d:%s",
435                      MaxLength(p, p), cp->fd,
436                      cp->Type == CTfile ? "file" : "proc");
437         else
438             snprintf(buff, sizeof(buff), "%s:%d:%s:%ld",
439                      MaxLength(p, p), cp->fd,
440                      cp->Type == CTfile ? "file" : "proc", (long)pid);
441         break;
442     }
443     return buff;
444 }
445
446
447 /*
448 **  Return the channel for a specified descriptor.
449 */
450 CHANNEL *
451 CHANfromdescriptor(int fd)
452 {
453     if (fd <0 || fd > CHANtablesize)
454         return NULL;
455     return &CHANtable[fd];
456 }
457
458
459 /*
460 **  Iterate over all channels of a specified type.
461 */
462 CHANNEL *
463 CHANiter(int *ip, CHANNELTYPE Type)
464 {
465     CHANNEL             *cp;
466     int                 i;
467
468     if ((i = *ip) >= 0 && i < CHANtablesize) {
469         do {
470             cp = &CHANtable[i];
471             if (cp->Type == CTfree && cp->fd == -1)
472                 continue;
473             if (Type == CTany || cp->Type == Type) {
474                 *ip = ++i;
475                 return cp;
476             }
477         } while (++i < CHANtablesize);
478     }
479     return NULL;
480 }
481
482
483 /*
484 **  Mark a channel as an active reader.
485 */
486 void
487 RCHANadd(CHANNEL *cp)
488 {
489     FD_SET(cp->fd, &RCHANmask);
490     if (cp->fd > CHANlastfd)
491         CHANlastfd = cp->fd;
492
493     if (cp->Type != CTnntp)
494         /* Start reading at the beginning of the buffer. */
495         cp->In.used = 0;
496 }
497
498
499 /*
500 **  Remove a channel from the set of readers.
501 */
502 void
503 RCHANremove(CHANNEL *cp)
504 {
505     if (FD_ISSET(cp->fd, &RCHANmask)) {
506         FD_CLR(cp->fd, &RCHANmask);
507         if (cp->fd == CHANlastfd) {
508             /* This was the highest descriptor, get a new highest. */
509             while (!FD_ISSET(CHANlastfd, &RCHANmask)
510               && !FD_ISSET(CHANlastfd, &WCHANmask)
511               && CHANlastfd > 1)
512                 CHANlastfd--;
513         }
514     }
515 }
516
517
518 /*
519 **  Put a channel to sleep, call a function when it wakes.
520 **  Note that the Argument must be NULL or allocated memory!
521 */
522 void
523 SCHANadd(CHANNEL *cp, time_t Waketime, void *Event, innd_callback_t Waker,
524          void *Argument)
525 {
526     if (!FD_ISSET(cp->fd, &SCHANmask)) {
527         SCHANcount++;
528         FD_SET(cp->fd, &SCHANmask);
529     }
530     if (cp->fd > CHANlastsleepfd)
531         CHANlastsleepfd = cp->fd;
532     cp->Waketime = Waketime;
533     cp->Waker = Waker;
534     if (cp->Argument != Argument) {
535         free(cp->Argument);
536         cp->Argument = Argument;
537     }
538     cp->Event = Event;
539 }
540
541
542 /*
543 **  Take a channel off the sleep list.
544 */
545 void
546 SCHANremove(CHANNEL *cp)
547 {
548     if (FD_ISSET(cp->fd, &SCHANmask)) {
549         FD_CLR(cp->fd, &SCHANmask);
550         SCHANcount--;
551         cp->Waketime = 0;
552         if (cp->fd == CHANlastsleepfd) {
553             /* This was the highest descriptor, get a new highest. */
554             while (!FD_ISSET(CHANlastsleepfd, &SCHANmask)
555               && CHANlastsleepfd > 1)
556                 CHANlastsleepfd--;
557         }
558     }
559 }
560
561
562 /*
563 **  Is a channel on the sleep list?
564 */
565 bool
566 CHANsleeping(CHANNEL *cp)
567 {
568     return FD_ISSET(cp->fd, &SCHANmask);
569 }
570
571
572 /*
573 **  Wake up channels waiting for a specific event.
574 */
575 void
576 SCHANwakeup(void *Event)
577 {
578     CHANNEL             *cp;
579     int                 i;
580
581     for (cp = CHANtable, i = CHANtablesize; --i >= 0; cp++)
582         if (cp->Type != CTfree && cp->Event == Event && CHANsleeping(cp))
583             cp->Waketime = 0;
584 }
585
586
587 /*
588 **  Mark a channel as an active writer.  Don't reset the Out->left field
589 **  since we could have buffered I/O already in there.
590 */
591 void
592 WCHANadd(CHANNEL *cp)
593 {
594     if (cp->Out.left > 0) {
595         FD_SET(cp->fd, &WCHANmask);
596         if (cp->fd > CHANlastfd)
597             CHANlastfd = cp->fd;
598     }
599 }
600
601
602 /*
603 **  Remove a channel from the set of writers.
604 */
605 void
606 WCHANremove(CHANNEL *cp)
607 {
608     if (FD_ISSET(cp->fd, &WCHANmask)) {
609         FD_CLR(cp->fd, &WCHANmask);
610         if (cp->Out.left <= 0) {
611             /* No data left -- reset used so we don't grow the buffer. */
612             cp->Out.used = 0;
613             cp->Out.left = 0;
614         }
615         if (cp->fd == CHANlastfd) {
616             /* This was the highest descriptor, get a new highest. */
617             while (!FD_ISSET(CHANlastfd, &RCHANmask)
618               && !FD_ISSET(CHANlastfd, &WCHANmask)
619               && CHANlastfd > 1)
620                 CHANlastfd--;
621         }
622     }
623 }
624
625
626 /*
627 **  Set a channel to start off with the contents of an existing channel.
628 */
629 void
630 WCHANsetfrombuffer(CHANNEL *cp, struct buffer *bp)
631 {
632     WCHANset(cp, &bp->data[bp->used], bp->left);
633 }
634
635 \f
636
637 /*
638 **  Read in text data, return the amount we read.
639 */
640 int
641 CHANreadtext(CHANNEL *cp)
642 {
643     ptrdiff_t           i, j;
644     struct buffer       *bp;
645     char                *p;
646     int                 oerrno;
647     int                 maxbyte;
648     HDRCONTENT          *hc = cp->Data.HdrContent;
649
650     /* Grow buffer if we're getting close to current limit.  FIXME: The In
651        buffer doesn't use the normal meanings of .used and .left.  */
652     bp = &cp->In;
653     bp->left = bp->size - bp->used;
654     if (bp->left <= LOW_WATER) {
655         i = GROW_AMOUNT(bp->size);
656         bp->size += i;
657         bp->left += i;
658         p = bp->data;
659         TMRstart(TMR_DATAMOVE);
660         bp->data = xrealloc(bp->data, bp->size);
661
662         /* Adjust offets of realloc moved the location of the memory region.
663            FIXME: This is invalid C, although it will work on most (all?)
664            common systems.  The pointers need to be reduced to offets and then
665            turned back into relative pointers rather than adjusting the
666            pointers directly, since as soon as realloc is called, pointers
667            into the old space become invalid and may not be used further. */
668         if ((i = p - bp->data) != 0) {
669             if (cp->State == CSgetheader || cp->State == CSgetbody ||
670                 cp->State == CSeatarticle) {
671                 /* adjust offset only in CSgetheader, CSgetbody or
672                    CSeatarticle */
673                 if (cp->Data.BytesHeader != NULL)
674                   cp->Data.BytesHeader -= i;
675                 for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
676                     if (hc->Value != NULL)
677                         hc->Value -= i;
678                 }
679             }
680         }
681         TMRstop(TMR_DATAMOVE);
682     }
683
684     /* Read in whatever is there, up to some reasonable limit.
685
686        We want to limit the amount of time devoted to processing the incoming
687        data for any given channel.  There's no easy way of doing that, though,
688        so we restrict the data size instead.
689
690        If the data is part of a single large article, then reading and
691        processing many kilobytes at a time costs very little.  If the data is
692        a long list of CHECK commands from a streaming feed, then every line of
693        data will require a history lookup, and we probably don't want to do
694        more than about 10 of those per channel on each cycle of the main
695        select() loop (otherwise we might take too long before giving other
696        channels a turn).  10 lines of CHECK commands suggests a limit of about
697        1KB of data, or less.  innconf->maxcmdreadsize (BUFSIZ by default) is
698        often about 1KB, and is attractive for other reasons, so let's use that
699        as our size limit.  If innconf->maxcmdreadsize is 0, there is no limit.
700
701        Reduce the read size only if we're reading commands.
702
703        FIXME: A better approach would be to limit the number of commands we
704        process for each channel. */
705     if (innconf->maxcmdreadsize <= 0 || cp->State != CSgetcmd
706         || bp->left < innconf->maxcmdreadsize)
707         maxbyte = bp->left;
708     else
709         maxbyte = innconf->maxcmdreadsize;
710     TMRstart(TMR_NNTPREAD);
711     i = read(cp->fd, &bp->data[bp->used], maxbyte);
712     TMRstop(TMR_NNTPREAD);
713     if (i < 0) {
714         /* Solaris (at least 2.4 through 2.6) will occasionally return
715            EAGAIN in response to a read even if the file descriptor already
716            selected true for reading, apparently due to some internal
717            resource exhaustion.  In that case, return -2, which will drop
718            back out to the main loop and go on to the next file descriptor,
719            as if the descriptor never selected true.  This check will
720            probably never trigger on platforms other than Solaris. */
721         if (errno == EAGAIN)
722             return -2;
723         oerrno = errno;
724         p = CHANname(cp);
725         errno = oerrno;
726         sysnotice("%s cant read", p);
727         return -1;
728     }
729     if (i == 0) {
730         p = CHANname(cp);
731         notice("%s readclose", p);
732         return 0;
733     }
734
735     bp->used += i;
736     bp->left -= i;
737     return i;
738 }
739
740
741 /*
742 **  If I/O backs up a lot, we can get EMSGSIZE on some systems.  If that
743 **  happens we want to do the I/O in chunks.  We assume stdio's BUFSIZ is
744 **  a good chunk value.
745 */
746 static int
747 CHANwrite(int fd, char *p, long length)
748 {
749     int i;
750     char        *save;
751
752     do {
753         /* Try the standard case -- write it all. */
754         i = write(fd, p, length);
755         if (i > 0 || (i < 0 && errno != EMSGSIZE && errno != EINTR))
756             return i;
757     } while (i < 0 && errno == EINTR);
758
759     /* Write it in pieces. */
760     for (save = p, i = 0; length; p += i, length -= i) {
761         i = write(fd, p, (length > BUFSIZ ? BUFSIZ : length));
762         if (i <= 0)
763             break;
764     }
765
766     /* Return error, or partial results if we got something. */
767     return p == save ? i : p - save;
768 }
769
770
771 /*
772 **  Try to flush out the buffer.  Use this only on file channels!
773 */
774 bool
775 WCHANflush(CHANNEL *cp)
776 {
777     struct buffer       *bp;
778     int                 i;
779
780     /* Write it. */
781     for (bp = &cp->Out; bp->left > 0; bp->left -= i, bp->used += i) {
782         i = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
783         if (i < 0) {
784             syslog(L_ERROR, "%s cant flush count %lu %m",
785                 CHANname(cp), (unsigned long) bp->left);
786             return false;
787         }
788         if (i == 0) {
789             syslog(L_ERROR, "%s cant flush count %lu",
790                 CHANname(cp), (unsigned long) bp->left);
791             return false;
792         }
793     }
794     WCHANremove(cp);
795     return true;
796 }
797
798 \f
799
800 /*
801 **  Wakeup routine called after a write channel was put to sleep.
802 */
803 static void
804 CHANwakeup(CHANNEL *cp)
805 {
806     syslog(L_NOTICE, "%s wakeup", CHANname(cp));
807     WCHANadd(cp);
808 }
809
810
811 /*
812 **  Attempting to write would block; stop output or give up.
813 */
814 static void
815 CHANwritesleep(CHANNEL *cp, char *p)
816 {
817     int                 i;
818
819     if ((i = ++(cp->BlockedWrites)) > innconf->badiocount)
820         switch (cp->Type) {
821         default:
822             break;
823         case CTreject:
824         case CTnntp:
825         case CTfile:
826         case CTexploder:
827         case CTprocess:
828             syslog(L_ERROR, "%s blocked closing", p);
829             SITEchanclose(cp);
830             CHANclose(cp, p);
831             return;
832         }
833     i *= innconf->blockbackoff;
834     syslog(L_ERROR, "%s blocked sleeping %d", p, i);
835     SCHANadd(cp, Now.time + i, NULL, CHANwakeup, NULL);
836 }
837
838
839 #if     defined(INND_FIND_BAD_FDS)
840 /*
841 **  We got an unknown error in select.  Find out the culprit.
842 **  Not really ready for production use yet, and it's expensive, too.
843 */
844 static void
845 CHANdiagnose(void)
846 {
847     fd_set              Test;
848     int                 i;
849     struct timeval      t;
850
851     FD_ZERO(&Test);
852     for (i = CHANlastfd; i >= 0; i--) {
853         if (FD_ISSET(i, &RCHANmask)) {
854             FD_SET(i, &Test);
855             t.tv_sec = 0;
856             t.tv_usec = 0;
857             if (select(i + 1, &Test, NULL, NULL, &t) < 0
858               && errno != EINTR) {
859                 syslog(L_ERROR, "%s Bad Read File %d", LogName, i);
860                 FD_CLR(i, &RCHANmask);
861                 /* Probably do something about the file descriptor here; call
862                  * CHANclose on it? */
863             }
864             FD_CLR(i, &Test);
865         }
866         if (FD_ISSET(i, &WCHANmask)) {
867             FD_SET(i, &Test);
868             t.tv_sec = 0;
869             t.tv_usec = 0;
870             if (select(i + 1, NULL, &Test, NULL, &t) < 0
871              && errno != EINTR) {
872                 syslog(L_ERROR, "%s Bad Write File %d", LogName, i);
873                 FD_CLR(i, &WCHANmask);
874                 /* Probably do something about the file descriptor here; call
875                  * CHANclose on it? */
876             }
877             FD_CLR(i, &Test);
878         }
879     }
880 }
881 #endif  /* defined(INND_FIND_BAD_FDS) */
882
883 void
884 CHANsetActiveCnx(CHANNEL *cp) {
885     int         found;  
886     CHANNEL     *tempchan;
887     char        *label, *tmplabel;
888     int         tfd;
889     
890     if((cp->fd > 0) && (cp->Type == CTnntp) && (cp->ActiveCnx == 0)) {
891         found = 1;      
892         if ((label = RClabelname(cp)) != NULL) {
893             for(tfd = 0; tfd <= CHANlastfd; tfd++) {
894                 tempchan = &CHANtable[tfd];
895                 if ((tmplabel = RClabelname(tempchan)) == NULL) {
896                     continue;
897                 }
898                 if(strcmp(label, tmplabel) == 0) {
899                     if(tempchan->ActiveCnx != 0)
900                         found++;
901                 }
902             }
903         } 
904         cp->ActiveCnx = found;
905     }   
906 }
907
908 /*
909 **  Main I/O loop.  Wait for data, call the channel's handler when there is
910 **  something to read or when the queued write is finished.  In order to
911 **  be fair (i.e., don't always give descriptor n priority over n+1), we
912 **  remember where we last had something and pick up from there.
913 **
914 **  Yes, the main code has really wandered over to the side a lot.
915 */
916 void
917 CHANreadloop(void)
918 {
919     static char         EXITING[] = "INND exiting because of signal\n";
920     static int          fd;
921     ptrdiff_t           i, j;
922     int                 startpoint;
923     int                 count;
924     int                 lastfd;
925     int                 oerrno;
926     CHANNEL             *cp;
927     struct buffer       *bp;
928     fd_set              MyRead;
929     fd_set              MyWrite;
930     struct timeval      MyTime;
931     long                silence;
932     char                *p;
933     time_t              LastUpdate;
934     HDRCONTENT          *hc;
935
936     STATUSinit();
937     
938     LastUpdate = GetTimeInfo(&Now) < 0 ? 0 : Now.time;
939     for ( ; ; ) {
940         /* See if any processes died. */
941         PROCscan();
942
943         /* Wait for data, note the time. */
944         MyRead = RCHANmask;
945         MyWrite = WCHANmask;
946         MyTime = TimeOut;
947         if (innconf->timer) {
948             unsigned long now = TMRnow();
949
950             if (now >= 1000 * (unsigned long)(innconf->timer)) {
951                 TMRsummary("ME", timer_name);
952                 InndHisLogStats();
953                 MyTime.tv_sec = innconf->timer;
954             }
955             else {
956                 MyTime.tv_sec = innconf->timer - now / 1000;
957             }
958         }
959         TMRstart(TMR_IDLE);
960         count = select(CHANlastfd + 1, &MyRead, &MyWrite, NULL, &MyTime);
961         TMRstop(TMR_IDLE);
962
963         STATUSmainloophook();
964         if (GotTerminate) {
965             write(2, EXITING, strlen(EXITING));
966             CleanupAndExit(0, (char *)NULL);
967         }
968         if (count < 0) {
969             if (errno != EINTR) {
970                 syslog(L_ERROR, "%s cant select %m", LogName);
971 #if     defined(INND_FIND_BAD_FDS)
972                 CHANdiagnose();
973 #endif  /* defined(INND_FIND_BAD_FDS) */
974             }
975             continue;
976         }
977
978         /* Update the "reasonably accurate" time. */
979         if (GetTimeInfo(&Now) < 0)
980             syslog(L_ERROR, "%s cant gettimeinfo %m", LogName);
981         if (Now.time > LastUpdate + TimeOut.tv_sec) {
982             HISsync(History);
983             if (ICDactivedirty) {
984                 ICDwriteactive();
985                 ICDactivedirty = 0;
986             }
987             LastUpdate = Now.time;
988         }
989
990         if (count == 0) {
991             /* No channels active, so flush and skip if nobody's
992              * sleeping. */
993             if (Mode == OMrunning)
994                 ICDwrite();
995             if (SCHANcount == 0)
996                 continue;
997         }
998
999         /* Try the control channel first. */
1000         if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) {
1001             count--;
1002             if (count > 3)
1003                 count = 3; /* might be more requests */
1004             (*CHANcc->Reader)(CHANcc);
1005             FD_CLR(CHANccfd, &MyRead);
1006         }
1007
1008 #ifdef PRIORITISE_REMCONN
1009         /* Try the remconn channel next. */
1010         for (j = 0 ; (j < chanlimit) && (CHANrcfd[j] >= 0) ; j++) {
1011             if (FD_ISSET(CHANrcfd[j], &RCHANmask) && FD_ISSET(CHANrcfd[j], &MyRead)) {
1012                 count--;
1013                 if (count > 3)
1014                     count = 3; /* might be more requests */
1015                 (*CHANrc[j]->Reader)(CHANrc[j]);
1016                 FD_CLR(CHANrcfd[j], &MyRead);
1017             }
1018         }
1019 #endif /* PRIORITISE_REMCONN */
1020
1021         /* Loop through all active channels.  Somebody could have closed
1022          * closed a channel so we double-check the global mask before
1023          * looking at what select returned.  The code here is written so
1024          * that a channel could be reading and writing and sleeping at the
1025          * same time, even though that's not possible.  (Just as well,
1026          * since in SysVr4 the count would be wrong.) */
1027         lastfd = CHANlastfd;
1028         if (lastfd < CHANlastsleepfd)
1029             lastfd = CHANlastsleepfd;
1030         if (fd > lastfd)
1031             fd = 0;
1032         startpoint = fd;
1033         do {
1034             cp = &CHANtable[fd];
1035
1036             if (cp->MaxCnx > 0 && cp->HoldTime > 0) {
1037                 CHANsetActiveCnx(cp);
1038                 if((cp->ActiveCnx > cp->MaxCnx) && (cp->fd > 0)) {
1039                     if(cp->Started + cp->HoldTime < Now.time) {
1040                         CHANclose(cp, CHANname(cp));
1041                     } else {
1042                         if (fd >= lastfd)
1043                             fd = 0;
1044                         else
1045                             fd++;
1046                         cp->ActiveCnx = 0;
1047                         RCHANremove(cp);
1048                     }
1049                     continue;
1050                 }
1051             }
1052             
1053             /* Anything to read? */
1054             if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) {
1055                 count--;
1056                 if (cp->Type == CTfree) {
1057                     syslog(L_ERROR, "%s %d free but was in RMASK",
1058                         CHANname(cp), fd);
1059                     /* Don't call RCHANremove since cp->fd will be -1. */
1060                     FD_CLR(fd, &RCHANmask);
1061                     close(fd);
1062                 }
1063                 else {
1064                     cp->LastActive = Now.time;
1065                     (*cp->Reader)(cp);
1066                 }
1067             }
1068
1069             /* Check and see if the buffer is grossly overallocated and shrink
1070                if needed */
1071             if (cp->In.size > (BIG_BUFFER)) {
1072                 if (cp->In.used != 0) {
1073                     if ((cp->In.size / cp->In.used) > 10) {
1074                         cp->In.size = (cp->In.used * 2) > START_BUFF_SIZE ? (cp->In.used * 2) : START_BUFF_SIZE;
1075                         p = cp->In.data;
1076                         TMRstart(TMR_DATAMOVE);
1077                         cp->In.data = xrealloc(cp->In.data, cp->In.size);
1078                         cp->In.left = cp->In.size - cp->In.used;
1079                         /* do not move data, since xrealloc did it already */
1080                         if ((i = p - cp->In.data) != 0) {
1081                             if (cp->State == CSgetheader ||
1082                                 cp->State == CSgetbody ||
1083                                 cp->State == CSeatarticle) {
1084                                 /* adjust offset only in CSgetheader, CSgetbody
1085                                    or CSeatarticle */
1086                                 if (cp->Data.BytesHeader != NULL)
1087                                   cp->Data.BytesHeader -= i;
1088                                 hc = cp->Data.HdrContent;
1089                                 for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
1090                                     if (hc->Value != NULL)
1091                                         hc->Value -= i;
1092                                 }
1093                             }
1094                         }
1095                         TMRstop(TMR_DATAMOVE);
1096                     }
1097                 } else {
1098                     p = cp->In.data;
1099                     TMRstart(TMR_DATAMOVE);
1100                     cp->In.data = xrealloc(cp->In.data, START_BUFF_SIZE);
1101                     cp->In.size = cp->In.left = START_BUFF_SIZE;
1102                     if ((i = p - cp->In.data) != 0) {
1103                         if (cp->State == CSgetheader ||
1104                             cp->State == CSgetbody ||
1105                             cp->State == CSeatarticle) {
1106                             /* adjust offset only in CSgetheader, CSgetbody
1107                                or CSeatarticle */
1108                             if (cp->Data.BytesHeader != NULL)
1109                               cp->Data.BytesHeader -= i;
1110                             hc = cp->Data.HdrContent;
1111                             for (j = 0 ; j < MAX_ARTHEADER ; j++, hc++) {
1112                                 if (hc->Value != NULL)
1113                                     hc->Value -= i;
1114                             }
1115                         }
1116                     }
1117                     TMRstop(TMR_DATAMOVE);
1118                 }
1119             }
1120             /* Possibly recheck for dead children so we don't get SIGPIPE
1121              * on readerless channels. */
1122             if (PROCneedscan)
1123                 PROCscan();
1124
1125             /* Ready to write? */
1126             if (FD_ISSET(fd, &WCHANmask) && FD_ISSET(fd, &MyWrite)) {
1127                 count--;
1128                 if (cp->Type == CTfree) {
1129                     syslog(L_ERROR, "%s %d free but was in WMASK",
1130                         CHANname(cp), fd);
1131                     /* Don't call WCHANremove since cp->fd will be -1. */
1132                     FD_CLR(fd, &WCHANmask);
1133                     close(fd);
1134                 }
1135                 else {
1136                     bp = &cp->Out;
1137                     if (bp->left) {
1138                         cp->LastActive = Now.time;
1139                         i = CHANwrite(fd, &bp->data[bp->used], bp->left);
1140                         if (i <= 0) {
1141                             oerrno = errno;
1142                             p = CHANname(cp);
1143                             errno = oerrno;
1144                             if (i < 0)
1145                                 sysnotice("%s cant write", p);
1146                             else
1147                                 notice("%s cant write", p);
1148                             cp->BadWrites++;
1149                             if (i < 0 && oerrno == EPIPE) {
1150                                 SITEchanclose(cp);
1151                                 CHANclose(cp, p);
1152                             }
1153                             else if (i < 0 &&
1154                                      (oerrno == EWOULDBLOCK
1155                                       || oerrno == EAGAIN)) {
1156                                 WCHANremove(cp);
1157                                 CHANwritesleep(cp, p);
1158                             }
1159                             else if (cp->BadWrites >= innconf->badiocount) {
1160                                 syslog(L_ERROR, "%s sleeping", p);
1161                                 WCHANremove(cp);
1162                                 SCHANadd(cp,
1163                                          Now.time + innconf->pauseretrytime,
1164                                          NULL, CHANwakeup, NULL);
1165                             }
1166                         }
1167                         else {
1168                             cp->BadWrites = 0;
1169                             cp->BlockedWrites = 0;
1170                             bp->left -= i;
1171                             bp->used += i;
1172                             if (bp->left <= 0) {
1173                                 WCHANremove(cp);
1174                                 (*cp->WriteDone)(cp);
1175                             } else if (bp->used > (bp->size/COMP_THRESHOLD)) {
1176                                 /* compact the buffer, shoving the
1177                                    data back to the beginning.
1178                                    <rmtodd@mailhost.ecn.ou.edu> */
1179                                 buffer_set(bp, &bp->data[bp->used], bp->left);
1180                             }
1181                         }
1182                     }
1183                     else
1184                         /* Should not be possible. */
1185                         WCHANremove(cp);
1186                 }
1187             }
1188
1189             /* Coming off a sleep? */
1190             if (FD_ISSET(fd, &SCHANmask) && cp->Waketime <= Now.time) {
1191                 if (cp->Type == CTfree) {
1192                     syslog(L_ERROR,"%s ERROR s-select free %d",CHANname(cp),fd);
1193                     FD_CLR(fd, &SCHANmask);
1194                      close(fd);
1195                 } else {
1196                     cp->LastActive = Now.time;
1197                     SCHANremove(cp);
1198                     (*cp->Waker)(cp);
1199                 }
1200             }
1201
1202             /* Toss CTreject channel early if it's inactive. */
1203             if (cp->Type == CTreject
1204              && cp->LastActive + REJECT_TIMEOUT < Now.time) {
1205                 p = CHANname(cp);
1206                 syslog(L_NOTICE, "%s timeout reject", p);
1207                 CHANclose(cp, p);
1208             }
1209
1210             /* Has this channel been inactive very long? */
1211             if (cp->Type == CTnntp
1212              && cp->LastActive + cp->NextLog < Now.time) {
1213                 p = CHANname(cp);
1214                 silence = Now.time - cp->LastActive;
1215                 cp->NextLog += innconf->chaninacttime;
1216                 syslog(L_NOTICE, "%s inactive %ld", p, silence / 60L);
1217                 if (silence > innconf->peertimeout) {
1218                     syslog(L_NOTICE, "%s timeout", p);
1219                     CHANclose(cp, p);
1220                 }
1221             }
1222
1223             /* Bump pointer, modulo the table size. */
1224             if (fd >= lastfd)
1225                 fd = 0;
1226             else
1227                 fd++;
1228
1229             /* If there is nothing to do, break out. */
1230             if (count == 0 && SCHANcount == 0)
1231                 break;
1232
1233         } while (fd != startpoint);
1234     }
1235 }