chiark / gitweb /
abolish xk_Malloc
[inn-innduct.git] / innfeed / endpoint.c
1 /*  $Id: endpoint.c 7738 2008-04-06 09:33:33Z iulius $
2 **
3 **  The implementation of the innfeed EndPoint object class.
4 **
5 **  Written by James Brister <brister@vix.com>
6 **
7 **  The EndPoint class is what gives the illusion (sort of, kind of) of
8 **  threading.  Basically it controls a select loop and a set of EndPoint
9 **  objects.  Each EndPoint has a file descriptor it is interested in.  The
10 **  users of the EndPoint tell the EndPoints to notify them when a read or
11 **  write has been completed (or simple if the file descriptor is read or
12 **  write ready).
13 */
14
15 #include "innfeed.h"
16 #include "config.h"
17 #include "clibrary.h"
18 #include "portable/socket.h"
19 #include "portable/time.h"
20
21 #include <assert.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <signal.h>
25 #include <sys/stat.h>
26 #include <sys/uio.h>
27 #include <syslog.h>
28
29 #ifdef HAVE_LIMITS_H
30 # include <limits.h>
31 #endif
32
33 #ifdef HAVE_SYS_SELECT_H
34 # include <sys/select.h>
35 #endif
36
37 #include "inn/innconf.h"
38 #include "inn/messages.h"
39 #include "libinn.h"
40
41 #include "buffer.h"
42 #include "configfile.h"
43 #include "endpoint.h"
44 #include "host.h"
45
46 static const char *const timer_name[] = {
47   "idle", "blstats", "stsfile", "newart", "readart", "prepart", "read",
48   "write", "cb"
49 };
50
51 #if ! defined (NSIG)
52 #define NSIG 32
53 #endif
54
55
56   /* This is the structure that is the EndPoint */
57 struct endpoint_s 
58 {
59       /* fields for managing multiple reads into the inBuffer. */
60     Buffer *inBuffer ;          /* list of buffers to read into */
61     unsigned int inBufferIdx ;         /* where is list we're at. */
62     size_t inIndex ;            /* where in current read we're at */
63     size_t inMinLen ;           /* minimum amount to read */
64     size_t inAmtRead ;          /* amount read so far */
65     EndpRWCB inCbk ;            /* callback for when read complete */
66     void *inClientData ;        /* callback data */
67     
68       /* fields for managing multiple writes from the outBuffer */
69     Buffer *outBuffer ;         /* list of buffers to write */
70     unsigned int outBufferIdx ;        /* index into buffer list to start write */
71     size_t outIndex ;           /* where in current buffer we write from */
72     size_t outSize ;            /* total of all the buffers */
73     size_t outAmtWritten ;      /* amount written so far */
74     EndpRWCB outProgressCbk ;   /* callback when done */
75     EndpRWCB outDoneCbk ;       /* callback when done */
76     void *outClientData ;       /* callback data */
77
78     EndpWorkCbk workCbk ;       /* callback to run if no I/O to do */
79     void *workData ;            /* data for callback */
80
81     int myFd ;                  /* the file descriptor we're handling */
82     int myErrno ;               /* the errno when I/O fails */
83     
84     double selectHits ;         /* indicates how often it's ready */
85 };
86
87
88
89   /* A private structure. These hold the information on the timer callbacks. */
90 typedef struct timerqelem_s
91 {
92     TimeoutId id ;              /* the id we gave out */
93     time_t when ;               /* The time the timer should go off */
94     EndpTCB func ;              /* the function to call */
95     void *data ;                /* the client callback data */
96     struct timerqelem_s *next ; /* next in the queue */
97 } *TimerElem, TimerElemStruct ;
98
99
100
101   /* set to 1 elsewhere if you want stderr to get what's also being written
102      in doWrite. */
103 int debugWrites ;
104
105 extern const char *InputFile ;
106
107 static EndPoint mainEndPoint ;
108 static bool mainEpIsReg = false ;
109 static unsigned int stdioFdMax = MAX_STDIO_FD ;
110
111 time_t  PrivateTime;
112
113
114 typedef void (*sigfn) (int) ;
115 static sigfn *sigHandlers ;
116
117 static volatile sig_atomic_t *sigFlags ;
118
119
120
121   /* private functions */
122 static IoStatus doRead (EndPoint endp) ;
123 static IoStatus doWrite (EndPoint endp) ;
124 static IoStatus doExcept (EndPoint endp) ;
125 static void pipeHandler (int s) ;
126 static void signalHandler (int s) ;
127 static int hitCompare (const void *v1, const void *v2) ;
128 static void reorderPriorityList (void) ;
129 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d) ;
130 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data) ;
131 static struct timeval *getTimeout (struct timeval *tout) ;
132 static void doTimeout (void) ;
133 static void handleSignals (void) ;
134
135 #if 0
136 static int ff_set (fd_set *set, unsigned int start) ;
137 static int ff_free (fd_set *set, unsigned int start) ;
138 #endif
139 static void endpointCleanup (void) ;
140
141
142   /* Private data */
143 static size_t maxEndPoints ;
144
145 static EndPoint *endPoints ;    /* endpoints indexed on fd */
146 static EndPoint *priorityList ; /* endpoints indexed on priority */
147
148 static int absHighestFd = 0 ;       /* never goes down */
149 static int highestFd = -1 ;
150 static unsigned int endPointCount = 0 ;
151 static unsigned int priorityCount = 0 ;
152
153 static fd_set rdSet ;
154 static fd_set wrSet ;
155 static fd_set exSet ;
156
157 static int keepSelecting ;
158
159 static TimerElem timeoutQueue ;
160 static TimerElem timeoutPool ;
161 static TimeoutId nextId ;
162 static int timeoutQueueLength ;
163
164
165
166
167   /* Create a new EndPoint and hook it to the give file descriptor. All
168      fields are initialized to appropriate values.  On the first time this
169      function is called the global data structs that manages lists of
170      endpoints are intialized. */
171 static bool inited = false ;
172
173 EndPoint newEndPoint (int fd) 
174 {
175   EndPoint ep ;
176
177   if (!inited)
178     {
179       inited = true ;
180       atexit (endpointCleanup) ;
181     }
182   
183   if (fd < 0)
184     return NULL ;
185
186   /* try to dup the fd to a larger number to leave lower values free for
187      broken stdio implementations. */
188   if (stdioFdMax > 0 && ((unsigned int) fd) <= stdioFdMax)
189     {
190       int newfd = fcntl(fd, F_DUPFD, stdioFdMax + 1);
191       if (newfd >= 0)
192         {
193           d_printf (1,"Dupped fd %d to %d\n",fd,newfd) ;
194           if (close (fd) != 0)
195             syswarn ("ME oserr close (%d)", fd) ;
196         }
197       else
198         {
199           d_printf (1,"Couldn't dup fd %d to above %d\n",fd,stdioFdMax) ;
200           newfd = fd ;
201         }
202
203       fd = newfd ;
204     }
205
206   if ((unsigned int) fd >= maxEndPoints)
207     {
208       unsigned int i = maxEndPoints ;
209       
210       maxEndPoints = (((fd + 256) / 256) * 256); /* round up to nearest 256 */ 
211       if (endPoints == NULL)
212         {
213           endPoints = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
214           priorityList = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
215         }
216       else
217         {
218           endPoints = xrealloc (endPoints,sizeof(EndPoint) * maxEndPoints) ;
219           priorityList = xrealloc (priorityList,
220                                    sizeof(EndPoint) * maxEndPoints) ;
221         }
222
223       for ( ; i < maxEndPoints ; i++)
224         endPoints [i] = priorityList [i] = NULL ;
225     }
226   
227   ASSERT (endPoints [fd] == NULL) ;
228
229   if (fd > absHighestFd)
230     {
231       static bool sizelogged = false ;
232           
233 #if defined (FD_SETSIZE)
234       if (fd >= FD_SETSIZE)
235         {
236           sizelogged = true ;
237           warn ("ME fd (%d) looks too big (%d -- FD_SETSIZE)", fd,
238                 FD_SETSIZE) ;
239           return NULL ;
240         }
241 #else
242       if (fd > (sizeof (fd_set) * CHAR_BIT))
243         {
244           sizelogged = true ;
245           warn ("ME fd (%d) looks too big (%d -- sizeof (fd_set) * CHAR_BIT)",
246                 fd, (sizeof (fd_set) * CHAR_BIT)) ;
247           return NULL ;
248         }
249 #endif
250
251       absHighestFd = fd ;
252     }
253       
254   ep = xcalloc (1, sizeof(struct endpoint_s)) ;
255
256   ep->inBuffer = NULL ;
257   ep->inBufferIdx = 0 ;
258   ep->inIndex = 0 ;
259   ep->inMinLen = 0 ;
260   ep->inAmtRead = 0 ;
261   ep->inCbk = NULL ;
262   ep->inClientData = NULL ;
263
264   ep->outBuffer = 0 ;
265   ep->outBufferIdx = 0 ;
266   ep->outIndex = 0 ;
267   ep->outSize = 0 ;
268   ep->outProgressCbk = NULL ;
269   ep->outDoneCbk = NULL ;
270   ep->outClientData = NULL ;
271   ep->outAmtWritten = 0 ;
272
273   ep->workCbk = NULL ;
274   ep->workData = NULL ;
275   
276   ep->myFd = fd ;
277   ep->myErrno = 0 ;
278
279   ep->selectHits = 0.0 ;
280
281   endPoints [fd] = ep ;
282   priorityList [priorityCount++] = ep ;
283   endPointCount++ ;
284
285   highestFd = (fd > highestFd ? fd : highestFd) ;
286
287   return ep ;
288 }
289
290
291
292 /* Delete the given endpoint. The files descriptor is closed and the two
293    Buffer arrays are released. */
294
295 void delEndPoint (EndPoint ep) 
296 {
297   unsigned int idx ;
298
299   if (ep == NULL)
300     return ;
301
302   ASSERT (endPoints [ep->myFd] == ep) ;
303
304   if (mainEndPoint == ep)
305     mainEndPoint = NULL ;
306   
307   if (ep->inBuffer != NULL)
308     freeBufferArray (ep->inBuffer) ;
309   
310   if (ep->outBuffer != NULL)
311     freeBufferArray (ep->outBuffer) ;
312
313   close (ep->myFd) ;
314
315   /* remove from selectable bits */
316   FD_CLR (ep->myFd,&rdSet) ;
317   FD_CLR (ep->myFd,&wrSet) ;
318   FD_CLR (ep->myFd,&exSet) ;
319
320   /* Adjust the global arrays to account for deleted endpoint. */
321   endPoints [ep->myFd] = NULL ;
322   if (ep->myFd == highestFd)
323     while (endPoints [highestFd] == NULL && highestFd >= 0)
324       highestFd-- ;
325
326   for (idx = 0 ; idx < priorityCount ; idx++)
327     if (priorityList [idx] == ep)
328       break ;
329
330   ASSERT (idx < priorityCount) ; /* i.e. was found */
331   ASSERT (priorityList [idx] == ep) ; /* redundant */
332
333   /* this hole will removed in the reorder routine */
334   priorityList [idx] = NULL ;
335
336   endPointCount-- ;
337
338   free (ep) ;
339 }
340
341 int endPointFd (EndPoint endp)
342 {
343   ASSERT (endp != NULL) ;
344
345   return endp->myFd ;
346 }
347
348
349
350
351 /* Request a read to be done next time there's data. The endpoint
352  * ENDP is what will do the read. BUFF is the Buffer the data should
353  * go into. FUNC is the callback function to call when the read is
354  * complete. CLIENTDATA is the client data to pass back into the
355  * callback function. MINLEN is the minimum amount of data to
356  * read. If MINLEN is 0 then then BUFF must be filled, otherwise at
357  * least MINLEN bytes must be read.
358  *
359  * BUFF can be null, in which case no read is actually done, but the
360  * callback function will be called still. This is useful for
361  * listening sockets.
362  *
363  * Returns 0 if the read couln't be prepared (for example if a read
364  * is already outstanding).
365  */
366
367 int prepareRead (EndPoint endp,
368                  Buffer *buffers,
369                  EndpRWCB func,
370                  void *clientData,
371                  int minlen) 
372 {
373   int bufferSizeTotal = 0 ;
374   int idx ;
375   
376   ASSERT (endp != NULL) ;
377   
378   if (endp->inBuffer != NULL || FD_ISSET (endp->myFd,&rdSet)) 
379     return 0 ;                  /* something already there */
380
381   for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
382     {
383       size_t bs = bufferSize (buffers [idx]) ;
384       size_t bds = bufferDataSize (buffers [idx]) ;
385       
386       bufferSizeTotal += (bs - bds) ;
387     }
388   
389   endp->inBuffer = buffers ;
390   endp->inBufferIdx = 0 ;
391   endp->inIndex = 0 ;
392   endp->inMinLen = (minlen > 0 ? minlen : bufferSizeTotal) ;
393   endp->inCbk = func ;
394   endp->inAmtRead = 0 ;
395   endp->inClientData = clientData ;
396
397   FD_SET (endp->myFd, &rdSet) ;
398   if ( InputFile == NULL )
399     FD_SET (endp->myFd, &exSet) ;
400
401   return 1 ;
402 }
403
404
405
406 /* Request a write to be done at a later point. ENDP is the EndPoint
407  * to do the write. BUFF is the Buffer to write from. FUNC is the
408  * function to call when the write is complete. CLIENTDATA is some
409  * data to hand back to the callback function.
410  *
411  * If BUFF is null, then no write will actually by done, but the
412  * callback function will still be called. This is useful for
413  * connecting sockets.
414  *
415  * Returns 0 if the write couldn't be prepared (like if a write is
416  * still in process.
417  */
418 int prepareWrite (EndPoint endp,
419                   Buffer *buffers,
420                   EndpRWCB progress,
421                   EndpRWCB done,
422                   void *clientData) 
423 {
424   int bufferSizeTotal = 0 ;
425   int idx ;
426
427   ASSERT (endp != NULL) ;
428   
429   if (endp->outBuffer != NULL || FD_ISSET (endp->myFd,&wrSet))
430     return 0 ;                  /* something already there */
431
432   for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
433     bufferSizeTotal += bufferDataSize (buffers [idx]) ;
434   
435   endp->outBuffer = buffers ;
436   endp->outBufferIdx = 0 ;
437   endp->outIndex = 0 ;
438   endp->outProgressCbk = progress ;
439   endp->outDoneCbk = done ;
440   endp->outClientData = clientData ;
441   endp->outSize = bufferSizeTotal ;
442   endp->outAmtWritten = 0 ;
443
444   FD_SET (endp->myFd, &wrSet) ;
445   FD_SET (endp->myFd, &exSet) ;
446
447   return 1 ;
448 }
449
450
451 /* Cancel the pending read. */
452 void cancelRead (EndPoint endp)
453 {
454   FD_CLR (endp->myFd,&rdSet) ;
455   if (!FD_ISSET (endp->myFd, &wrSet))
456     FD_CLR (endp->myFd,&exSet) ;
457
458   freeBufferArray (endp->inBuffer) ;
459   
460   endp->inBuffer = NULL ;
461   endp->inBufferIdx = 0 ;
462   endp->inIndex = 0 ;
463   endp->inMinLen = 0 ;
464   endp->inAmtRead = 0 ;
465   endp->inCbk = NULL ;
466   endp->inClientData = NULL ;
467 }
468
469
470 /* cancel all pending writes. The first len bytes of the queued write are
471   copied to buffer. The number of bytes copied (if it is less than *len) is
472   copied to len. If no write was outstanding then len will have 0 stored in
473   it. */
474 void cancelWrite (EndPoint endp, char *buffer UNUSED, size_t *len UNUSED)
475 {
476   FD_CLR (endp->myFd, &wrSet) ;
477   if (!FD_ISSET (endp->myFd, &rdSet))
478     FD_CLR (endp->myFd, &exSet) ;
479
480 #if 0
481 #error XXX need to copy data to buffer and *len
482 #endif
483   
484   freeBufferArray (endp->outBuffer) ;
485   
486   endp->outBuffer = NULL ;
487   endp->outBufferIdx = 0 ;
488   endp->outIndex = 0 ;
489   endp->outProgressCbk = NULL ;
490   endp->outDoneCbk = NULL ;
491   endp->outClientData = NULL ;
492   endp->outSize = 0 ;
493   endp->outAmtWritten = 0 ;
494 }
495
496 /* queue up a new timeout request. to go off at a specific time. */
497 TimeoutId prepareWake (EndpTCB func, time_t timeToWake, void *clientData) 
498 {
499   TimeoutId id ;
500   time_t now = theTime() ;
501
502   if (now > timeToWake)
503     return 0 ;
504   
505   id = timerElemAdd (timeToWake,func,clientData) ;
506
507 #if 0
508   d_printf (1,"Preparing wake %d at date %ld for %d seconds\n",
509            (int) id, (long) now, timeToWake - now) ;
510 #endif
511
512   return id ;
513 }
514
515
516 /* queue up a new timeout request to off TIMETOSLEEP seconds from now */
517 TimeoutId prepareSleep (EndpTCB func, int timeToSleep, void *clientData) 
518 {
519   time_t now = theTime() ;
520   TimeoutId id ;
521   
522   id = timerElemAdd (now + timeToSleep,func,clientData) ;
523
524 #if 0
525   d_printf (1,"Preparing sleep %d at date %ld for %d seconds\n",
526            (int) id, (long) now, timeToSleep) ;
527 #endif
528
529   return id ;
530 }
531
532
533 /* Updates a an existing timeout request to go off TIMETOSLEEP seconds from
534    now, or queues a new request.  Always returns a new ID. */
535 TimeoutId updateSleep (TimeoutId tid, EndpTCB func, int timeToSleep,
536                        void *clientData) 
537 {
538   if (tid == 0)
539     return prepareSleep (func, timeToSleep, clientData) ;
540   else
541     {
542       /* XXX - quick and dirty but CPU wasteful implementation */
543       removeTimeout (tid) ;
544       return prepareSleep (func, timeToSleep, clientData) ;
545     }
546 }
547
548
549 /* Remove a timeout that was previously prepared. 0 is a legal value that
550    is just ignored. */
551 bool removeTimeout (TimeoutId tid)
552 {
553   TimerElem n = timeoutQueue ;
554   TimerElem p = NULL ;
555
556   if (tid == 0)
557     return true ;
558   
559   while (n != NULL && n->id != tid)
560     {
561       p = n ;
562       n = n->next ;
563     }
564
565   if (n == NULL)
566     return false ;
567
568   if (p == NULL)                /* at the head. */
569     timeoutQueue = n->next ;
570   else
571     p->next = n->next ;
572
573   n->next = timeoutPool ;
574   timeoutPool = n ;
575
576   timeoutQueueLength-- ;
577   
578   return true ;
579 }
580
581
582 /* The main routine. This is a near-infinite loop that drives the whole
583    program. */
584 void Run (void) 
585 {
586   fd_set rSet ;
587   fd_set wSet ;
588   fd_set eSet ;
589   unsigned long last_summary = 0 ;
590
591   keepSelecting = 1 ;
592   xsignal (SIGPIPE, pipeHandler) ;
593
594   while (keepSelecting)
595     {
596       struct timeval timeout ;
597       struct timeval *twait ;
598       int sval ;
599       unsigned int idx ;
600       bool modifiedTime = false ;
601       
602       twait = getTimeout (&timeout) ;
603
604       memcpy (&rSet,&rdSet,sizeof (rdSet)) ;
605       memcpy (&wSet,&wrSet,sizeof (wrSet)) ;
606       memcpy (&eSet,&exSet,sizeof (exSet)) ;
607
608       if (highestFd < 0 && twait == NULL) /* no fds and no timeout */
609         break ;
610       else if (twait != NULL && (twait->tv_sec != 0 || twait->tv_usec != 0))
611         {
612             /* if we have any workprocs registered we poll rather than
613                block on the fds */
614           for (idx = 0 ; idx < priorityCount ; idx++)
615             if (priorityList [idx] != NULL &&
616                 priorityList [idx]->workCbk != NULL)
617               {
618                 modifiedTime = true ;
619                 twait->tv_sec = 0 ;
620                 twait->tv_usec = 0 ;
621
622                 break ;
623               }
624         }
625
626       /* calculate host backlog statistics */
627       TMRstart(TMR_BACKLOGSTATS);
628       gCalcHostBlStat ();
629       TMRstop(TMR_BACKLOGSTATS);
630
631       TMRstart(TMR_IDLE);
632       sval = select (highestFd + 1, &rSet, &wSet, &eSet, twait) ;
633       TMRstop(TMR_IDLE);
634
635       timePasses () ;
636       if (innconf->timer)
637         {
638           unsigned long now = TMRnow () ;
639           if (last_summary == 0 
640               || (long) (now - last_summary) > (innconf->timer * 1000))
641             {
642               TMRsummary ("ME", timer_name) ;
643               last_summary = now;
644             }
645         }
646       
647       if (sval == 0 && twait == NULL)
648         die ("No fd's ready and no timeouts") ;
649       else if (sval < 0 && errno == EINTR)
650         {
651           handleSignals () ;
652         }
653       else if (sval < 0) 
654         {
655           syswarn ("ME exception: select failed: %d", sval) ;
656           stopRun () ;
657         }
658       else if (sval > 0)
659         {
660           IoStatus rval ;
661           int readyCount = sval ;
662           int endpointsServiced = 1 ;
663           
664           handleSignals() ;
665           
666           for (idx = 0 ; idx < priorityCount ; idx++)
667             {
668               EndPoint ep = priorityList [idx] ;
669               bool specialCheck = false ;
670
671               if (readyCount > 0 && ep != NULL) 
672                 {
673                   int fd = ep->myFd ;
674                   int selectHit = 0, readMiss = 0, writeMiss = 0 ;
675
676                   /* Every SELECT_RATIO times we service an endpoint in this
677                      loop we check to see if the mainEndPoint fd is ready to
678                      read or write. If so we process it and do the current
679                      endpoint next time around. */
680                   if (((endpointsServiced % (SELECT_RATIO + 1)) == 0) &&
681                       ep != mainEndPoint && mainEndPoint != NULL &&
682                       !mainEpIsReg)
683                     {
684                       fd_set trSet, twSet ;
685                       struct timeval tw ;
686                       int checkRead = FD_ISSET (mainEndPoint->myFd,&rdSet) ;
687                       int checkWrite = FD_ISSET (mainEndPoint->myFd,&wrSet) ;
688
689                       endpointsServiced++;
690
691                       if (checkRead || checkWrite) 
692                         {
693                           fd = mainEndPoint->myFd ;
694
695                           tw.tv_sec = tw.tv_usec = 0 ;
696                           memset (&trSet,0,sizeof (trSet)) ;
697                           memset (&twSet,0,sizeof (twSet)) ;
698                       
699                           if (checkRead)
700                             FD_SET (fd,&trSet) ;
701                           if (checkWrite)
702                             FD_SET (fd,&twSet) ;
703
704                           sval = select (fd + 1,&trSet,&twSet,0,&tw) ;
705
706                           if (sval > 0)
707                             {
708                               idx-- ;
709                               ep = mainEndPoint ;
710                               specialCheck = true ;
711                               if (checkRead && FD_ISSET (fd,&trSet))
712                                 {
713                                   FD_SET (fd,&rSet) ;
714                                   readyCount++ ;
715                                 }
716                               if (checkWrite && FD_ISSET (fd,&twSet))
717                                 {
718                                   FD_SET (fd,&wSet) ;
719                                   readyCount++ ;
720                                 }
721                             }
722                           else if (sval < 0)
723                             {
724                               syswarn ("ME exception: select failed: %d",
725                                        sval) ;
726                               stopRun () ;
727                               return ;
728                             }
729                           else
730                             fd = ep->myFd ; /* back to original fd. */
731                         }
732                     }
733
734                   FD_CLR (fd, &exSet) ;
735
736                   if (FD_ISSET (fd,&rSet))
737                     {
738                       readyCount-- ;
739                       endpointsServiced++ ;
740                       selectHit = 1 ;
741                       
742                       if ((rval = doRead (ep)) != IoIncomplete)
743                         {
744                           Buffer *buff = ep->inBuffer ;
745
746                           FD_CLR (fd, &rdSet) ;
747
748                           /* incase callback wants to issue read */
749                           ep->inBuffer = NULL ; 
750                           
751                           if (ep->inCbk != NULL)
752                             (*ep->inCbk) (ep,rval,buff,ep->inClientData) ;
753                           else
754                             freeBufferArray (buff) ;
755                         }
756                       else
757                         {
758                           if ( InputFile == NULL )
759                             FD_SET (ep->myFd, &exSet) ;
760                         }
761                     }
762                   else if (FD_ISSET(fd,&rdSet))
763                     readMiss = 1;
764
765                   /* get it again as the read callback may have deleted the */
766                   /* endpoint */
767                   if (specialCheck)
768                     ep = mainEndPoint ;
769                   else
770                     ep = priorityList [idx] ;
771                   
772                   if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&wSet))
773                     {
774                       readyCount-- ;
775                       endpointsServiced++ ;
776                       selectHit = 1 ;
777                       
778                       if ((rval = doWrite (ep)) != IoIncomplete &&
779                           rval != IoProgress)
780                         {
781                           Buffer *buff = ep->outBuffer ;
782
783                           FD_CLR (fd, &wrSet) ;
784
785                           /* incase callback wants to issue a write */
786                           ep->outBuffer = NULL ;        
787                           
788                           if (ep->outDoneCbk != NULL)
789                             (*ep->outDoneCbk) (ep,rval,buff,ep->outClientData) ;
790                           else
791                             freeBufferArray (buff) ;
792                         }
793                       else if (rval == IoProgress)
794                         {
795                           Buffer *buff = ep->outBuffer ;
796
797                           if (ep->outProgressCbk != NULL)
798                             (*ep->outProgressCbk) (ep,rval,buff,ep->outClientData) ;
799                         }
800                       else
801                         {
802                           FD_SET (ep->myFd, &exSet) ;
803                         }
804                     }
805                   else if (FD_ISSET(fd,&wrSet))
806                     writeMiss = 1;
807
808                   /* get it again as the write callback may have deleted the */
809                   /* endpoint */
810                   if (specialCheck)
811                     ep = mainEndPoint ;
812                   else
813                     ep = priorityList [idx] ;
814
815                   if (ep != NULL)
816                     {
817                       ep->selectHits *= 0.9 ;
818                       if (selectHit)
819                         ep->selectHits += 1.0 ;
820                       else if (readMiss && writeMiss)
821                         ep->selectHits -= 1.0 ;
822                     }
823                     
824                   if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&eSet))
825                     doExcept (ep) ;
826                 }
827             }
828           
829           reorderPriorityList () ;
830         }
831       else if (sval == 0 && !modifiedTime)
832         doTimeout () ;
833
834         /* now we're done processing all read fds and/or the
835            timeout(s). Next we do the work callbacks for all the endpoints
836            whose fds weren't ready. */
837       for (idx = 0 ; idx < priorityCount ; idx++)
838         {
839           EndPoint ep = priorityList [idx] ;
840
841           if (ep != NULL)
842             {
843               int fd = ep->myFd ;
844               
845               if ( !FD_ISSET (fd,&rSet) && !FD_ISSET (fd,&wSet) )
846                 if (ep->workCbk != NULL)
847                   {
848                     EndpWorkCbk func = ep->workCbk ;
849                     void *data = ep->workData ;
850
851                     ep->workCbk = NULL ;
852                     ep->workData = NULL ;
853                     TMRstart(TMR_CALLBACK);
854                     func (ep,data) ;
855                     TMRstop(TMR_CALLBACK);
856                   }
857               
858             }
859         }
860     }
861 }
862
863 void *addWorkCallback (EndPoint endp, EndpWorkCbk cbk, void *data)
864 {
865   void *oldBk = endp->workData ;
866   
867   endp->workCbk = cbk ;
868   endp->workData = data ;
869
870   return oldBk ;
871 }
872
873 /* Tell the Run routine to stop next time around. */
874 void stopRun (void) 
875 {
876   keepSelecting = 0 ;
877 }
878
879
880 int endPointErrno (EndPoint endp)
881 {
882   return endp->myErrno ;
883 }
884
885 bool readIsPending (EndPoint endp) 
886 {
887   return (endp->inBuffer != NULL ? true : false) ;
888 }
889
890 bool writeIsPending (EndPoint endp)
891 {
892   return (endp->outBuffer != NULL ? true : false) ;
893 }
894
895 void setMainEndPoint (EndPoint endp)
896 {
897   struct stat buf ;
898
899   mainEndPoint = endp ;
900   if (endp->myFd >= 0 && fstat (endp->myFd,&buf) < 0)
901     syslog (LOG_ERR,"Can't fstat mainEndPoint fd (%d): %m", endp->myFd) ;
902   else if (endp->myFd < 0)
903     syslog (LOG_ERR,"Negative fd for mainEndPoint???") ;
904   else
905     mainEpIsReg = (S_ISREG(buf.st_mode) ? true : false) ;
906 }
907
908 int getMainEndPointFd (void)
909 {
910   return(mainEndPoint->myFd) ;
911 }
912
913 void freeTimeoutQueue (void)
914 {
915   TimerElem p, n ;
916
917   p = timeoutQueue ;
918   while (p)
919     {
920       n = p->next ;
921       p->next = timeoutPool ;
922       timeoutPool = p;
923       p = n ;
924       timeoutQueueLength-- ;
925     }
926 }
927
928
929 /***********************************************************************/
930 /*                      STATIC FUNCTIONS BELOW HERE                    */
931 /***********************************************************************/
932
933
934 /*
935  * called when the file descriptor on this endpoint is read ready.
936  */
937 static IoStatus doRead (EndPoint endp) 
938 {
939   int i = 0 ;
940   unsigned int idx ;
941   unsigned int bCount = 0 ;
942   struct iovec *vp = NULL ;
943   Buffer *buffers = endp->inBuffer ;
944   unsigned int currIdx = endp->inBufferIdx ;
945   size_t amt = 0 ;
946   IoStatus rval = IoIncomplete ;
947
948   TMRstart(TMR_READ);
949   for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
950     bCount++ ;
951
952   bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
953
954   i = 0 ;
955
956   /* now set up the iovecs for the readv */
957   if (bCount > 0)
958     {
959       char *bbase ;
960       size_t bds, bs ;
961
962       vp = xcalloc (bCount, sizeof(struct iovec)) ;
963
964       bbase = bufferBase (buffers[currIdx]) ;
965       bds = bufferDataSize (buffers[currIdx]) ;
966       bs = bufferSize (buffers [currIdx]) ;
967
968       /* inIndex is an index in the virtual array of the read, not directly
969          into the buffer. */
970       vp[0].iov_base = bbase + bds + endp->inIndex ;
971       vp[0].iov_len = bs - bds - endp->inIndex ;
972
973       amt = vp[0].iov_len ;
974       
975       for (idx = currIdx + 1 ; idx < bCount ; idx++)
976         {
977           bbase = bufferBase (buffers[idx]) ;
978           bds = bufferDataSize (buffers[idx]) ;
979           bs = bufferSize (buffers [idx]) ;
980       
981           vp [idx].iov_base = bbase ;
982           vp [idx].iov_len = bs - bds ;
983           amt += (bs - bds) ;
984         }
985
986       i = readv (endp->myFd,vp,(int) bCount) ;
987
988       if (i > 0)
989         {
990           size_t readAmt = (size_t) i ;
991             
992           endp->inAmtRead += readAmt ;
993           
994           /* check if we filled the first buffer */
995           if (readAmt >= (size_t) vp[0].iov_len)
996             {                   /* we did */
997               bufferIncrDataSize (buffers[currIdx], vp[0].iov_len) ;
998               readAmt -= vp [0].iov_len ;
999               endp->inBufferIdx++ ;
1000             }
1001           else
1002             {
1003               endp->inIndex += readAmt ;
1004               bufferIncrDataSize (buffers[currIdx], readAmt) ;
1005               readAmt = 0 ;
1006             }
1007           
1008           /* now check the rest of the buffers */
1009           for (idx = 1 ; readAmt > 0 ; idx++)
1010             {
1011               ASSERT (idx < bCount) ;
1012
1013               bs = bufferSize (buffers [currIdx + idx]) ;
1014               bbase = bufferBase (buffers [currIdx + idx]) ;
1015               bds = bufferDataSize (buffers [currIdx + idx]) ;
1016               
1017               if (readAmt >= (bs - bds))
1018                 {
1019                   bufferSetDataSize (buffers [currIdx + idx],bs) ;
1020                   readAmt -= bs ;
1021                   endp->inBufferIdx++ ;
1022                 }
1023               else
1024                 {
1025                   endp->inIndex = readAmt ;
1026                   bufferIncrDataSize (buffers [currIdx + idx], readAmt) ;
1027                   memset (bbase + bds + readAmt, 0, bs - bds - readAmt) ;
1028                   readAmt = 0 ;
1029                 }
1030             }
1031
1032           if (endp->inAmtRead >= endp->inMinLen)
1033             {
1034               endp->inIndex = 0 ;
1035               rval = IoDone ;
1036             }
1037         }
1038       else if (i < 0 && errno != EINTR && errno != EAGAIN)
1039         {
1040           endp->myErrno = errno ;
1041           rval = IoFailed ;
1042         }
1043       else if (i < 0 && errno == EINTR)
1044         {
1045           handleSignals () ;
1046         }
1047       else if (i == 0)
1048         rval = IoEOF ;
1049       else                   /* i < 0 && errno == EAGAIN */
1050         rval = IoIncomplete ;
1051       
1052       free (vp) ;
1053     }
1054   else
1055     rval = IoDone ;
1056   TMRstop(TMR_READ);
1057   return rval ;
1058 }
1059
1060 /* called when the file descriptor on the endpoint is write ready. */
1061 static IoStatus doWrite (EndPoint endp)
1062 {
1063   unsigned int idx ;
1064   int i = 0 ;
1065   size_t amt = 0 ;
1066   unsigned int bCount = 0 ;
1067   struct iovec *vp = NULL ;
1068   Buffer *buffers = endp->outBuffer ;
1069   unsigned int currIdx = endp->outBufferIdx ;
1070   IoStatus rval = IoIncomplete ;
1071   
1072   TMRstart(TMR_WRITE);
1073   for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
1074     bCount++ ;
1075
1076   bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
1077
1078   i = 0 ;
1079   
1080   if (bCount > 0)
1081     {
1082       vp = xcalloc (bCount, sizeof(struct iovec)) ;
1083
1084       vp[0].iov_base = bufferBase (buffers [currIdx]) ;
1085       vp[0].iov_base = (char *) vp[0].iov_base + endp->outIndex ;
1086       vp[0].iov_len = bufferDataSize (buffers [currIdx]) - endp->outIndex ;
1087
1088       amt = vp[0].iov_len ;
1089       
1090       for (idx = 1 ; idx < bCount ; idx++)
1091         {
1092           vp [idx].iov_base = bufferBase (buffers [idx + currIdx]) ;
1093           vp [idx].iov_len = bufferDataSize (buffers [idx + currIdx]) ;
1094           amt += vp[idx].iov_len ;
1095         }
1096
1097 #if 1
1098       if (debugWrites) 
1099         {
1100           /* nasty mixing, but stderr is unbuffered usually. It's debugging only */
1101           d_printf (5,"About to write this:================================\n") ;
1102           writev (2,vp,bCount) ;
1103           d_printf (5,"end=================================================\n") ;
1104         }
1105       
1106 #endif
1107
1108       ASSERT (endp->myFd >= 0) ;
1109       ASSERT (vp != 0) ;
1110       ASSERT (bCount > 0) ;
1111       
1112       i = writev (endp->myFd,vp,(int) bCount) ;
1113
1114       if (i > 0)
1115         {
1116           size_t writeAmt = (size_t) i ;
1117           
1118           endp->outAmtWritten += writeAmt ;
1119
1120           /* now figure out which buffers got completely written */
1121           for (idx = 0 ; writeAmt > 0 ; idx++)
1122             {
1123               if (writeAmt >= (size_t) vp[idx].iov_len)
1124                 {
1125                   endp->outBufferIdx++ ;
1126                   endp->outIndex = 0 ;
1127                   writeAmt -= vp [idx].iov_len ;
1128                 }
1129               else
1130                 {
1131                   /* this buffer was not completly written */
1132                   endp->outIndex += writeAmt ;
1133                   writeAmt = 0 ;
1134                 }
1135             }
1136
1137           if (endp->outAmtWritten == endp->outSize)
1138             rval = IoDone ;
1139           else
1140             rval = IoProgress ;
1141         }
1142       else if (i < 0 && errno == EINTR)
1143         {
1144           handleSignals () ;
1145         }
1146       else if (i < 0 && errno == EAGAIN)
1147         {
1148           rval = IoIncomplete ;
1149         }
1150       else if (i < 0)
1151         {
1152           endp->myErrno = errno ;
1153           rval = IoFailed ;
1154         }
1155       else
1156         d_printf (1,"Wrote 0 bytes in doWrite()?\n") ;
1157
1158       free (vp) ;
1159     }
1160   else
1161     rval = IoDone ;
1162
1163   TMRstop(TMR_WRITE);
1164   return rval ;
1165 }
1166
1167
1168 static IoStatus doExcept (EndPoint endp)
1169 {
1170   int optval;
1171   socklen_t size ;
1172   int fd = endPointFd (endp) ;
1173
1174   if (getsockopt (fd, SOL_SOCKET, SO_ERROR,
1175                   (char *) &optval, &size) != 0)
1176     syswarn ("ME exception: getsockopt (%d)", fd) ;
1177   else if (optval != 0)
1178     {
1179       errno = optval ;
1180       syswarn ("ME exception: fd %d", fd) ;
1181     }
1182   else
1183     syswarn ("ME exception: fd %d: Unknown error", fd) ;
1184
1185 #if 0
1186   sleep (5) ;
1187   abort () ;
1188 #endif
1189
1190   /* Not reached */
1191   return IoFailed ;
1192 }
1193
1194 #if 0
1195 static void endPointPrint (EndPoint ep, FILE *fp)
1196 {
1197   fprintf (fp,"EndPoint [%p]: fd [%d]\n",(void *) ep, ep->myFd) ;
1198 }
1199 #endif
1200
1201 static void signalHandler (int s)
1202 {
1203   sigFlags[s] = 1 ;
1204 #ifndef HAVE_SIGACTION
1205   xsignal (s, signalHandler) ;
1206 #endif
1207 }
1208
1209
1210 static void pipeHandler (int s)
1211 {
1212   xsignal (s, pipeHandler) ;
1213 }
1214
1215
1216 /* compare the hit ratio of two endpoint for qsort. We're sorting the
1217    endpoints on their relative activity */
1218 static int hitCompare (const void *v1, const void *v2)
1219 {
1220   const struct endpoint_s *e1 = *((const struct endpoint_s * const *) v1) ;
1221   const struct endpoint_s *e2 = *((const struct endpoint_s * const *) v2) ;
1222   double e1Hit = e1->selectHits ;
1223   double e2Hit = e2->selectHits ;
1224
1225   if (e1 == mainEndPoint)
1226     return -1 ;
1227   else if (e2 == mainEndPoint)
1228     return 1 ;
1229   else if (e1Hit < e2Hit)
1230     return 1 ;
1231   else if (e1Hit > e2Hit)
1232     return -1 ;
1233
1234   return 0 ;
1235 }
1236
1237
1238
1239 /* We maintain the endpoints in order of the percent times they're ready
1240    for read/write when they've been selected. This helps us favour the more
1241    active endpoints. */
1242 static void reorderPriorityList (void)
1243 {
1244   unsigned int i, j ;
1245   static int thisTime = 4;
1246
1247   /* only sort every 4th time since it's so expensive */
1248   if (--thisTime > 0)
1249     return ;
1250
1251   thisTime = 4;
1252
1253   for (i = j = 0; i < priorityCount; i++)
1254     if (priorityList [i] != NULL)
1255       {
1256         if (i != j)
1257           priorityList [j] = priorityList [i] ;
1258         j++ ;
1259       }
1260
1261   for (i = j; i < priorityCount; i++)
1262     priorityList [ i ] = NULL;
1263
1264   priorityCount = j;
1265
1266   qsort (priorityList, (size_t)priorityCount, sizeof (EndPoint), &hitCompare);
1267 }
1268
1269
1270 #define TIMEOUT_POOL_SIZE ((4096 - 2 * (sizeof (void *))) / (sizeof (TimerElemStruct)))
1271
1272 /* create a new timeout data structure properly initialized. */
1273 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d)
1274 {
1275   TimerElem p ;
1276
1277   if (timeoutPool == NULL)
1278     {
1279       unsigned int j ;
1280
1281       timeoutPool = xmalloc (sizeof(TimerElemStruct) * TIMEOUT_POOL_SIZE) ;
1282
1283       for (j = 0; j < TIMEOUT_POOL_SIZE - 1; j++)
1284         timeoutPool[j] . next = &(timeoutPool [j + 1]) ;
1285       timeoutPool [TIMEOUT_POOL_SIZE-1] . next = NULL ;
1286     }
1287
1288   p = timeoutPool ;
1289   timeoutPool = timeoutPool->next ;
1290
1291   ASSERT (p != NULL) ;
1292   
1293   p->id = i ;
1294   p->when = w ;
1295   p->func = f ;
1296   p->data = d ;
1297   p->next = NULL ;
1298
1299   return p ;
1300 }
1301
1302
1303
1304 /* add a new timeout structure to the global list. */
1305 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data)
1306 {
1307   TimerElem p = newTimerElem (++nextId ? nextId : ++nextId,when,func,data) ;
1308   TimerElem n = timeoutQueue ;
1309   TimerElem q = NULL ;
1310   
1311   while (n != NULL && n->when <= when)
1312     {
1313       q = n ;
1314       n = n->next ;
1315     }
1316
1317   if (n == NULL && q == NULL)   /* empty list so put at head */
1318     timeoutQueue = p ;
1319   else if (q == NULL)           /* put at head of list */
1320     {
1321       p->next = timeoutQueue ;
1322       timeoutQueue = p ;
1323     }
1324   else if (n == NULL)           /* put at end of list */
1325     q->next = p ;
1326   else                          /* in middle of list */
1327     {
1328       p->next = q->next ;
1329       q->next = p ;
1330     }
1331
1332   timeoutQueueLength++ ;
1333   
1334   return p->id ;
1335 }
1336
1337
1338 /* Fills in TOUT with the timeout to use on the next call to
1339  * select. Returns TOUT. If there is no timeout, then returns NULL.  If the
1340  * timeout has already passed, then it calls the timeout handling routine
1341  * first.
1342  */
1343 static struct timeval *getTimeout (struct timeval *tout)
1344 {
1345   struct timeval *rval = NULL ;
1346   
1347   if (timeoutQueue != NULL)
1348     {
1349       time_t now = theTime() ;
1350
1351       while (timeoutQueue && now > timeoutQueue->when)
1352         doTimeout () ;
1353           
1354       if (timeoutQueue != NULL && now == timeoutQueue->when)
1355         {
1356           tout->tv_sec = 0 ;
1357           tout->tv_usec = 0 ;
1358           rval = tout ;
1359         }
1360       else if (timeoutQueue != NULL)
1361         {
1362           tout->tv_sec = timeoutQueue->when - now ;
1363           tout->tv_usec = 0 ;
1364           rval = tout ;
1365         }
1366     }
1367
1368   return rval ;
1369 }
1370
1371       
1372   
1373
1374
1375   
1376 static void doTimeout (void)
1377 {
1378   EndpTCB cbk = timeoutQueue->func ;
1379   void *data = timeoutQueue->data ;
1380   TimerElem p = timeoutQueue ;
1381   TimeoutId tid = timeoutQueue->id ;
1382
1383   timeoutQueue = timeoutQueue->next ;
1384
1385   p->next = timeoutPool ;
1386   timeoutPool = p ;
1387
1388   timeoutQueueLength-- ;
1389   
1390   if (cbk)
1391     (*cbk) (tid, data) ;        /* call the callback function */
1392 }
1393
1394
1395
1396
1397
1398 #if defined (WANT_MAIN)
1399
1400
1401 #define BUFF_SIZE 100
1402
1403
1404 void timerCallback (void *cd) ;
1405 void timerCallback (void *cd)
1406 {
1407   d_printf (1,"Callback \n") ;
1408 }
1409
1410   
1411 void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffer, void *data);
1412 void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffer, void *data)
1413 {
1414   int i ;
1415   
1416   if (status == IoDone)
1417     d_printf (1,"LINE was written\n") ;
1418   else
1419     {
1420       int oldErrno = errno ;
1421       
1422       errno = endPointErrno (ep) ;
1423       perror ("write failed") ;
1424       errno = oldErrno ;
1425     }
1426
1427   for (i = 0 ; buffer && buffer [i] ; i++)
1428     delBuffer (buffer [i]) ;
1429 }
1430
1431 void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffer, void *data);
1432 void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffer, void *d)
1433 {
1434   Buffer *writeBuffers, *readBuffers ;
1435   Buffer newBuff1, newBuff2 ;
1436   Buffer newInputBuffer ;
1437   char *data, *p ;
1438   size_t len ;
1439
1440   if (status == IoFailed)
1441     {
1442       int oldErrno = errno ;
1443
1444       errno = endPointErrno (myEp) ;
1445       perror ("read failed") ;
1446       errno = oldErrno ;
1447
1448       return ;
1449     }
1450   else if (status == IoEOF)
1451     {
1452       d_printf (1,"EOF on endpoint.\n") ;
1453       delEndPoint (myEp) ;
1454
1455       return ;
1456     }
1457   
1458   
1459   data = bufferBase (buffer[0]) ;
1460   len = bufferDataSize (buffer[0]) ;
1461   
1462   if (data [len - 1] == '\r' || data [len - 1] == '\n')
1463     bufferDecrDataSize (buffer [0],1) ;
1464   if (data [len - 1] == '\r' || data [len - 1] == '\n')
1465     bufferDecrDataSize (buffer [0],1) ;
1466
1467   data [len] = '\0' ;
1468   
1469   d_printf (1,"Got a line: %s\n", data) ;
1470
1471   newBuff1 = newBuffer (len + 50) ;
1472   newBuff2 = newBuffer (len + 50) ;
1473   newInputBuffer = newBuffer (BUFF_SIZE) ;
1474   
1475   p = bufferBase (newBuff1) ; 
1476   strcpy (p, "Thanks for that \"") ;
1477   bufferSetDataSize (newBuff1,strlen (p)) ;
1478   
1479   p = bufferBase (newBuff2) ;
1480   strcpy (p,"\" very tasty\n") ;
1481   bufferSetDataSize (newBuff2,strlen (p)) ;
1482
1483   writeBuffers = makeBufferArray (newBuff1,buffer[0],newBuff2,NULL) ;
1484   readBuffers = makeBufferArray (newInputBuffer,NULL) ;
1485   
1486   prepareWrite (myEp,writeBuffers,lineIsWritten,NULL) ;
1487   prepareRead (myEp,readBuffers,lineIsRead,NULL,1) ;
1488
1489 #if 0
1490   myEp->registerWake (&timerCallback,theTime() + 7,0) ;
1491 #endif
1492 }
1493
1494
1495 static void printDate (TimeoutId tid, void *data) ;
1496 static void printDate (TimeoutId tid, void *data)
1497 {
1498   time_t t ;
1499
1500   t = theTime() ;
1501   
1502   d_printf (1,"Timeout (%d) time now is %ld %s",
1503            (int) tid,(long) t,ctime(&t)) ;
1504
1505   if (timeoutQueue == NULL) 
1506     {
1507       int ti = (rand () % 10) + 1 ;
1508
1509       prepareSleep (printDate,ti,data) ;
1510     }
1511 }
1512
1513 TimeoutId rm ;
1514
1515 static void Timeout (TimeoutId tid, void *data) ;
1516 static void Timeout (TimeoutId tid, void *data)
1517 {
1518   static int seeded ;
1519   static int howMany ;
1520   static int i ;
1521   time_t t = theTime() ;
1522
1523   if ( !seeded )
1524     {
1525       srand (t) ;
1526       seeded = 1 ;
1527     }
1528
1529   d_printf (1,"Timeout (%d) time now is %ld %s",
1530            (int) tid, (long) t,ctime(&t)) ;
1531   
1532   if (timeoutQueue != NULL && timeoutQueue->next != NULL)
1533     d_printf (1,"%s timeout id %d\n",
1534              (removeTimeout (rm) ? "REMOVED" : "FAILED TO REMOVE"), rm) ;
1535   rm = 0 ;
1536   
1537   howMany = (rand() % 10) + (timeoutQueue == NULL ? 1 : 0) ;
1538
1539   for (i = 0 ; i < howMany ; i++ )
1540     {
1541       TimeoutId id ;
1542       int count = (rand () % 30) + 1 ;
1543
1544       id = (i % 2 == 0 ? prepareSleep (Timeout,count,data)
1545             : prepareWake (Timeout,t + count,data)) ;
1546
1547       if (rm == 0)
1548         rm = id ;
1549     }
1550 }
1551
1552
1553 void newConn (EndPoint ep, IoStatus status, Buffer *buffer, void *d) ;
1554 void newConn (EndPoint ep, IoStatus status, Buffer *buffer, void *d)
1555 {
1556   EndPoint newEp ;
1557   struct sockaddr_in in ;
1558   Buffer *readBuffers ;
1559   Buffer newBuff = newBuffer (BUFF_SIZE) ;
1560   int len = sizeof (in) ;
1561   int fd ;
1562
1563   memset (&in, 0, sizeof (in)) ;
1564   
1565   fd = accept (ep->myFd, (struct sockaddr *) &in, &len) ;
1566
1567   if (fd < 0)
1568     {
1569       perror ("::accept") ;
1570       return ;
1571     }
1572   
1573   newEp = newEndPoint (fd) ;
1574
1575   prepareRead (ep, NULL, newConn,NULL,0) ;
1576
1577   readBuffers = makeBufferArray (newBuff,NULL) ;
1578
1579   prepareRead (newEp, readBuffers, lineIsRead, NULL, 1) ;
1580
1581   d_printf (1,"Set up a new connection\n");
1582 }
1583
1584
1585 int main (int argc, char **argv)
1586 {
1587   EndPoint accConn ;
1588   struct sockaddr_in accNet ;
1589   int accFd = socket (AF_INET,SOCK_STREAM,0) ;
1590   unsigned short port = atoi (argc > 1 ? argv[1] : "10000") ;
1591   time_t t = theTime() ;
1592
1593
1594   program = strrchr (argv[0],'/') ;
1595
1596   if (!program)
1597     program = argv [0] ;
1598   else
1599     program++ ;
1600
1601   ASSERT (accFd >= 0) ;
1602
1603   memset (&accNet,0,sizeof (accNet)) ;
1604   accNet.sin_family = AF_INET ;
1605   accNet.sin_addr.s_addr = htonl (INADDR_ANY) ;
1606   accNet.sin_port = htons (port) ;
1607
1608 #ifdef LOG_PERROR
1609   openlog (program, LOG_PERROR | LOG_PID, LOG_NEWS) ;
1610 #else
1611   openlog (program, LOG_PID, LOG_NEWS) ;
1612 #endif
1613   
1614   if (bind (accFd, (struct sockaddr *) &accNet, sizeof (accNet)) < 0)
1615     {
1616       perror ("bind: %m") ;
1617       exit (1) ;
1618     }
1619
1620   listen (accFd,5) ;
1621   
1622   accConn = newEndPoint (accFd) ;
1623
1624   prepareRead (accConn,NULL,newConn,NULL,0) ;
1625
1626   prepareSleep (Timeout,5,(void *) 0x10) ;
1627
1628   t = theTime() ;
1629   d_printf (1,"Time now is %s",ctime(&t)) ;
1630   
1631   prepareWake (printDate,t + 16,NULL) ;
1632
1633   Run () ;
1634
1635   return 0;
1636 }
1637 #endif /* WANT_MAIN */
1638
1639 /* Probably doesn't do the right thing for SIGCHLD */
1640 void setSigHandler (int signum, void (*ptr)(int))
1641 {
1642   unsigned int i ;
1643
1644   if (sigHandlers == NULL)
1645     {
1646       sigHandlers = xmalloc (sizeof(sigfn) * NSIG) ;
1647       sigFlags = xmalloc (sizeof(sig_atomic_t) * NSIG) ;
1648       for (i = 0 ; i < NSIG ; i++)
1649         {
1650           sigHandlers [i] = NULL ;
1651           sigFlags [i] = 0 ;
1652         }
1653     }
1654
1655   if (signum >= NSIG)
1656     {
1657       syslog (LOG_ERR,"ME signal number bigger than NSIG: %d vs %d",
1658               signum,NSIG) ;
1659       return ;
1660     }
1661
1662   if (xsignal (signum, signalHandler) == SIG_ERR)
1663     die ("signal failed: %s", strerror(errno)) ;
1664
1665   sigHandlers[signum] = ptr ;
1666 }
1667
1668 static void handleSignals (void)
1669 {
1670   int i ;
1671 #if defined(USE_SIGVEC)
1672   int mask ;
1673 #endif
1674
1675   for (i = 1; i < NSIG; i++)
1676     {
1677       if (sigFlags[i])
1678         {
1679 #if defined(USE_SIGACTION)
1680           sigset_t set, oset ;
1681       
1682           if (sigemptyset (&set) != 0 || sigaddset (&set, i) != 0)
1683             die ("sigemptyset or sigaddset failed") ;
1684           if (sigprocmask (SIG_BLOCK, &set, &oset) != 0)
1685             die ("sigprocmask failed: %s", strerror(errno)) ;
1686 #elif defined(USE_SIGVEC)
1687 # ifndef sigmask
1688 #  define sigmask(s)    (1 << ((s) - 1))
1689 # endif
1690           int mask ;
1691           
1692           mask = sigblock (sigmask(i)) ;
1693 #elif defined(USE_SIGSET)
1694           if (sighold (i) != 0)
1695             die ("sighold failed: %s", strerror(errno)) ;
1696 #else
1697       /* hope for the best */
1698 #endif
1699
1700           sigFlags[i] = 0;
1701
1702           if (sigHandlers[i] != NULL &&
1703               sigHandlers[i] != SIG_IGN &&
1704               sigHandlers[i] != SIG_DFL)
1705             (sigHandlers[i])(i) ;
1706             
1707 #if defined(USE_SIGACTION)
1708           if (sigprocmask (SIG_SETMASK, &oset, (sigset_t *)NULL) != 0)
1709             die ("sigprocmask failed: %s", strerror(errno)) ;
1710 #elif defined(USE_SIGVEC)
1711           sigsetmask (mask) ;
1712 #elif defined(USE_SIGSET)
1713           if (sigrelse (i) != 0)
1714             die ("sigrelse failed: %s", strerror(errno)) ;
1715 #else
1716           /* hope for the best */
1717 #endif
1718         }
1719     }
1720 }
1721
1722
1723 int endpointConfigLoadCbk (void *data)
1724 {
1725   FILE *fp = (FILE *) data ;
1726   long ival ;
1727   int rval = 1 ;
1728
1729   if (getInteger (topScope,"stdio-fdmax",&ival,NO_INHERIT))
1730     {
1731       stdioFdMax = ival ;
1732
1733 #if ! defined (FD_SETSIZE)
1734
1735       if (stdioFdMax > 0)
1736         {
1737           logOrPrint (LOG_ERR,fp,NO_STDIO_FDMAX) ;
1738           stdioFdMax = 0 ;
1739           rval = 0 ;
1740         }
1741
1742 #else
1743
1744       if (stdioFdMax > FD_SETSIZE)
1745         {
1746           logOrPrint (LOG_ERR,fp,
1747                       "ME config: value of %s (%ld) in %s is higher"
1748                       " than maximum of %ld. Using %ld","stdio-fdmax",
1749                       ival,"global scope",
1750                       (long) FD_SETSIZE, (long) FD_SETSIZE) ;
1751           stdioFdMax = FD_SETSIZE ;
1752           rval = 0 ;
1753         }
1754       
1755 #endif
1756       
1757     }
1758   else
1759     stdioFdMax = 0 ;
1760
1761   return rval ;
1762 }
1763
1764
1765
1766 #if 0
1767 /* definitely not the fastest, but the most portable way to find the first
1768   set bit in a mask  */
1769 static int ff_set (fd_set *set,unsigned int start)
1770 {
1771   unsigned int i ;
1772
1773   for (i = start ; i < FD_SETSIZE ; i++)
1774     if (FD_ISSET (i,set))
1775       return (int) i ;
1776
1777   return -1 ;
1778 }
1779
1780
1781 static int ff_free (fd_set *set, unsigned int start)
1782 {
1783   unsigned int i ;
1784
1785   for (i = start ; i < FD_SETSIZE ; i++)
1786     if (!FD_ISSET (i,set))
1787       return i ;
1788
1789
1790   return -1 ;
1791 }
1792 #endif
1793
1794
1795 static void endpointCleanup (void)
1796 {
1797   free (endPoints) ;
1798   free (priorityList) ;
1799   free (sigHandlers) ;
1800   endPoints = NULL ;
1801   priorityList = NULL ;
1802   sigHandlers = NULL ;
1803 }