1 /* $Id: endpoint.c 7738 2008-04-06 09:33:33Z iulius $
3 ** The implementation of the innfeed EndPoint object class.
5 ** Written by James Brister <brister@vix.com>
7 ** The EndPoint class is what gives the illusion (sort of, kind of) of
8 ** threading. Basically it controls a select loop and a set of EndPoint
9 ** objects. Each EndPoint has a file descriptor it is interested in. The
10 ** users of the EndPoint tell the EndPoints to notify them when a read or
11 ** write has been completed (or simply if the file descriptor is read or
18 #include "portable/socket.h"
19 #include "portable/time.h"
33 #ifdef HAVE_SYS_SELECT_H
34 # include <sys/select.h>
37 #include "inn/innconf.h"
38 #include "inn/messages.h"
42 #include "configfile.h"
46 static const char *const timer_name[] = {
47 "idle", "blstats", "stsfile", "newart", "readart", "prepart", "read",
56 /* This is the structure that is the EndPoint */
59 /* fields for managing multiple reads into the inBuffer. */
60 Buffer *inBuffer ; /* list of buffers to read into */
61 unsigned int inBufferIdx ; /* where is list we're at. */
62 size_t inIndex ; /* where in current read we're at */
63 size_t inMinLen ; /* minimum amount to read */
64 size_t inAmtRead ; /* amount read so far */
65 EndpRWCB inCbk ; /* callback for when read complete */
66 void *inClientData ; /* callback data */
68 /* fields for managing multiple writes from the outBuffer */
69 Buffer *outBuffer ; /* list of buffers to write */
70 unsigned int outBufferIdx ; /* index into buffer list to start write */
71 size_t outIndex ; /* where in current buffer we write from */
72 size_t outSize ; /* total of all the buffers */
73 size_t outAmtWritten ; /* amount written so far */
74 EndpRWCB outProgressCbk ; /* callback when done */
75 EndpRWCB outDoneCbk ; /* callback when done */
76 void *outClientData ; /* callback data */
78 EndpWorkCbk workCbk ; /* callback to run if no I/O to do */
79 void *workData ; /* data for callback */
81 int myFd ; /* the file descriptor we're handling */
82 int myErrno ; /* the errno when I/O fails */
84 double selectHits ; /* indicates how often it's ready */
89 /* A private structure. These hold the information on the timer callbacks. */
90 typedef struct timerqelem_s
92 TimeoutId id ; /* the id we gave out */
93 time_t when ; /* The time the timer should go off */
94 EndpTCB func ; /* the function to call */
95 void *data ; /* the client callback data */
96 struct timerqelem_s *next ; /* next in the queue */
97 } *TimerElem, TimerElemStruct ;
101 /* set to 1 elsewhere if you want stderr to get what's also being written
105 extern const char *InputFile ;
107 static EndPoint mainEndPoint ;
108 static bool mainEpIsReg = false ;
109 static unsigned int stdioFdMax = MAX_STDIO_FD ;
114 typedef void (*sigfn) (int) ;
115 static sigfn *sigHandlers ;
117 static volatile sig_atomic_t *sigFlags ;
121 /* private functions */
122 static IoStatus doRead (EndPoint endp) ;
123 static IoStatus doWrite (EndPoint endp) ;
124 static IoStatus doExcept (EndPoint endp) ;
125 static void pipeHandler (int s) ;
126 static void signalHandler (int s) ;
127 static int hitCompare (const void *v1, const void *v2) ;
128 static void reorderPriorityList (void) ;
129 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d) ;
130 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data) ;
131 static struct timeval *getTimeout (struct timeval *tout) ;
132 static void doTimeout (void) ;
133 static void handleSignals (void) ;
136 static int ff_set (fd_set *set, unsigned int start) ;
137 static int ff_free (fd_set *set, unsigned int start) ;
139 static void endpointCleanup (void) ;
143 static size_t maxEndPoints ;
145 static EndPoint *endPoints ; /* endpoints indexed on fd */
146 static EndPoint *priorityList ; /* endpoints indexed on priority */
148 static int absHighestFd = 0 ; /* never goes down */
149 static int highestFd = -1 ;
150 static unsigned int endPointCount = 0 ;
151 static unsigned int priorityCount = 0 ;
153 static fd_set rdSet ;
154 static fd_set wrSet ;
155 static fd_set exSet ;
157 static int keepSelecting ;
159 static TimerElem timeoutQueue ;
160 static TimerElem timeoutPool ;
161 static TimeoutId nextId ;
162 static int timeoutQueueLength ;
167 /* Create a new EndPoint and hook it to the given file descriptor. All
168 fields are initialized to appropriate values. The first time this
169 function is called the global data structs that manage lists of
170 endpoints are initialized. */
171 static bool inited = false ;
173 EndPoint newEndPoint (int fd)
180 atexit (endpointCleanup) ;
186 /* try to dup the fd to a larger number to leave lower values free for
187 broken stdio implementations. */
188 if (stdioFdMax > 0 && ((unsigned int) fd) <= stdioFdMax)
190 int newfd = fcntl(fd, F_DUPFD, stdioFdMax + 1);
193 d_printf (1,"Dupped fd %d to %d\n",fd,newfd) ;
195 syswarn ("ME oserr close (%d)", fd) ;
199 d_printf (1,"Couldn't dup fd %d to above %d\n",fd,stdioFdMax) ;
206 if ((unsigned int) fd >= maxEndPoints)
208 unsigned int i = maxEndPoints ;
210 maxEndPoints = (((fd + 256) / 256) * 256); /* round up to nearest 256 */
211 if (endPoints == NULL)
213 endPoints = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
214 priorityList = xmalloc (sizeof(EndPoint) * maxEndPoints) ;
218 endPoints = xrealloc (endPoints,sizeof(EndPoint) * maxEndPoints) ;
219 priorityList = xrealloc (priorityList,
220 sizeof(EndPoint) * maxEndPoints) ;
223 for ( ; i < maxEndPoints ; i++)
224 endPoints [i] = priorityList [i] = NULL ;
227 ASSERT (endPoints [fd] == NULL) ;
229 if (fd > absHighestFd)
231 static bool sizelogged = false ;
233 #if defined (FD_SETSIZE)
234 if (fd >= FD_SETSIZE)
237 warn ("ME fd (%d) looks too big (%d -- FD_SETSIZE)", fd,
242 if (fd > (sizeof (fd_set) * CHAR_BIT))
245 warn ("ME fd (%d) looks too big (%d -- sizeof (fd_set) * CHAR_BIT)",
246 fd, (sizeof (fd_set) * CHAR_BIT)) ;
254 ep = xcalloc (1, sizeof(struct endpoint_s)) ;
256 ep->inBuffer = NULL ;
257 ep->inBufferIdx = 0 ;
262 ep->inClientData = NULL ;
265 ep->outBufferIdx = 0 ;
268 ep->outProgressCbk = NULL ;
269 ep->outDoneCbk = NULL ;
270 ep->outClientData = NULL ;
271 ep->outAmtWritten = 0 ;
274 ep->workData = NULL ;
279 ep->selectHits = 0.0 ;
281 endPoints [fd] = ep ;
282 priorityList [priorityCount++] = ep ;
285 highestFd = (fd > highestFd ? fd : highestFd) ;
292 /* Delete the given endpoint. The file descriptor is closed and the two
293 Buffer arrays are released. */
295 void delEndPoint (EndPoint ep)
302 ASSERT (endPoints [ep->myFd] == ep) ;
304 if (mainEndPoint == ep)
305 mainEndPoint = NULL ;
307 if (ep->inBuffer != NULL)
308 freeBufferArray (ep->inBuffer) ;
310 if (ep->outBuffer != NULL)
311 freeBufferArray (ep->outBuffer) ;
315 /* remove from selectable bits */
316 FD_CLR (ep->myFd,&rdSet) ;
317 FD_CLR (ep->myFd,&wrSet) ;
318 FD_CLR (ep->myFd,&exSet) ;
320 /* Adjust the global arrays to account for deleted endpoint. */
321 endPoints [ep->myFd] = NULL ;
322 if (ep->myFd == highestFd)
323 while (endPoints [highestFd] == NULL && highestFd >= 0)
326 for (idx = 0 ; idx < priorityCount ; idx++)
327 if (priorityList [idx] == ep)
330 ASSERT (idx < priorityCount) ; /* i.e. was found */
331 ASSERT (priorityList [idx] == ep) ; /* redundant */
333 /* this hole will removed in the reorder routine */
334 priorityList [idx] = NULL ;
341 int endPointFd (EndPoint endp)
343 ASSERT (endp != NULL) ;
351 /* Request a read to be done next time there's data. The endpoint
352 * ENDP is what will do the read. BUFF is the Buffer the data should
353 * go into. FUNC is the callback function to call when the read is
354 * complete. CLIENTDATA is the client data to pass back into the
355 * callback function. MINLEN is the minimum amount of data to
356 * read. If MINLEN is 0 then BUFF must be filled, otherwise at
357 * least MINLEN bytes must be read.
359 * BUFF can be null, in which case no read is actually done, but the
360 * callback function will be called still. This is useful for
363 * Returns 0 if the read couldn't be prepared (for example if a read
364 * is already outstanding).
367 int prepareRead (EndPoint endp,
373 int bufferSizeTotal = 0 ;
376 ASSERT (endp != NULL) ;
378 if (endp->inBuffer != NULL || FD_ISSET (endp->myFd,&rdSet))
379 return 0 ; /* something already there */
381 for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
383 size_t bs = bufferSize (buffers [idx]) ;
384 size_t bds = bufferDataSize (buffers [idx]) ;
386 bufferSizeTotal += (bs - bds) ;
389 endp->inBuffer = buffers ;
390 endp->inBufferIdx = 0 ;
392 endp->inMinLen = (minlen > 0 ? minlen : bufferSizeTotal) ;
394 endp->inAmtRead = 0 ;
395 endp->inClientData = clientData ;
397 FD_SET (endp->myFd, &rdSet) ;
398 if ( InputFile == NULL )
399 FD_SET (endp->myFd, &exSet) ;
406 /* Request a write to be done at a later point. ENDP is the EndPoint
407 * to do the write. BUFF is the Buffer to write from. FUNC is the
408 * function to call when the write is complete. CLIENTDATA is some
409 * data to hand back to the callback function.
411 * If BUFF is null, then no write will actually be done, but the
412 * callback function will still be called. This is useful for
413 * connecting sockets.
415 * Returns 0 if the write couldn't be prepared (like if a write is
418 int prepareWrite (EndPoint endp,
424 int bufferSizeTotal = 0 ;
427 ASSERT (endp != NULL) ;
429 if (endp->outBuffer != NULL || FD_ISSET (endp->myFd,&wrSet))
430 return 0 ; /* something already there */
432 for (idx = 0 ; buffers != NULL && buffers [idx] != NULL ; idx++)
433 bufferSizeTotal += bufferDataSize (buffers [idx]) ;
435 endp->outBuffer = buffers ;
436 endp->outBufferIdx = 0 ;
438 endp->outProgressCbk = progress ;
439 endp->outDoneCbk = done ;
440 endp->outClientData = clientData ;
441 endp->outSize = bufferSizeTotal ;
442 endp->outAmtWritten = 0 ;
444 FD_SET (endp->myFd, &wrSet) ;
445 FD_SET (endp->myFd, &exSet) ;
451 /* Cancel the pending read. */
452 void cancelRead (EndPoint endp)
454 FD_CLR (endp->myFd,&rdSet) ;
455 if (!FD_ISSET (endp->myFd, &wrSet))
456 FD_CLR (endp->myFd,&exSet) ;
458 freeBufferArray (endp->inBuffer) ;
460 endp->inBuffer = NULL ;
461 endp->inBufferIdx = 0 ;
464 endp->inAmtRead = 0 ;
466 endp->inClientData = NULL ;
470 /* cancel all pending writes. The first len bytes of the queued write are
471 copied to buffer. The number of bytes copied (if it is less than *len) is
472 copied to len. If no write was outstanding then len will have 0 stored in
474 void cancelWrite (EndPoint endp, char *buffer UNUSED, size_t *len UNUSED)
476 FD_CLR (endp->myFd, &wrSet) ;
477 if (!FD_ISSET (endp->myFd, &rdSet))
478 FD_CLR (endp->myFd, &exSet) ;
481 #error XXX need to copy data to buffer and *len
484 freeBufferArray (endp->outBuffer) ;
486 endp->outBuffer = NULL ;
487 endp->outBufferIdx = 0 ;
489 endp->outProgressCbk = NULL ;
490 endp->outDoneCbk = NULL ;
491 endp->outClientData = NULL ;
493 endp->outAmtWritten = 0 ;
496 /* Queue up a new timeout request to go off at a specific time. */
497 TimeoutId prepareWake (EndpTCB func, time_t timeToWake, void *clientData)
500 time_t now = theTime() ;
502 if (now > timeToWake)
505 id = timerElemAdd (timeToWake,func,clientData) ;
508 d_printf (1,"Preparing wake %d at date %ld for %d seconds\n",
509 (int) id, (long) now, timeToWake - now) ;
516 /* Queue up a new timeout request to go off TIMETOSLEEP seconds from now. */
517 TimeoutId prepareSleep (EndpTCB func, int timeToSleep, void *clientData)
519 time_t now = theTime() ;
522 id = timerElemAdd (now + timeToSleep,func,clientData) ;
525 d_printf (1,"Preparing sleep %d at date %ld for %d seconds\n",
526 (int) id, (long) now, timeToSleep) ;
533 /* Updates an existing timeout request to go off TIMETOSLEEP seconds from
534 now, or queues a new request. Always returns a new ID. */
535 TimeoutId updateSleep (TimeoutId tid, EndpTCB func, int timeToSleep,
539 return prepareSleep (func, timeToSleep, clientData) ;
542 /* XXX - quick and dirty but CPU wasteful implementation */
543 removeTimeout (tid) ;
544 return prepareSleep (func, timeToSleep, clientData) ;
549 /* Remove a timeout that was previously prepared. 0 is a legal value that
551 bool removeTimeout (TimeoutId tid)
553 TimerElem n = timeoutQueue ;
559 while (n != NULL && n->id != tid)
568 if (p == NULL) /* at the head. */
569 timeoutQueue = n->next ;
573 n->next = timeoutPool ;
576 timeoutQueueLength-- ;
582 /* The main routine. This is a near-infinite loop that drives the whole
589 unsigned long last_summary = 0 ;
592 xsignal (SIGPIPE, pipeHandler) ;
594 while (keepSelecting)
596 struct timeval timeout ;
597 struct timeval *twait ;
600 bool modifiedTime = false ;
602 twait = getTimeout (&timeout) ;
604 memcpy (&rSet,&rdSet,sizeof (rdSet)) ;
605 memcpy (&wSet,&wrSet,sizeof (wrSet)) ;
606 memcpy (&eSet,&exSet,sizeof (exSet)) ;
608 if (highestFd < 0 && twait == NULL) /* no fds and no timeout */
610 else if (twait != NULL && (twait->tv_sec != 0 || twait->tv_usec != 0))
612 /* if we have any workprocs registered we poll rather than
614 for (idx = 0 ; idx < priorityCount ; idx++)
615 if (priorityList [idx] != NULL &&
616 priorityList [idx]->workCbk != NULL)
618 modifiedTime = true ;
626 /* calculate host backlog statistics */
627 TMRstart(TMR_BACKLOGSTATS);
629 TMRstop(TMR_BACKLOGSTATS);
632 sval = select (highestFd + 1, &rSet, &wSet, &eSet, twait) ;
638 unsigned long now = TMRnow () ;
639 if (last_summary == 0
640 || (long) (now - last_summary) > (innconf->timer * 1000))
642 TMRsummary ("ME", timer_name) ;
647 if (sval == 0 && twait == NULL)
648 die ("No fd's ready and no timeouts") ;
649 else if (sval < 0 && errno == EINTR)
655 syswarn ("ME exception: select failed: %d", sval) ;
661 int readyCount = sval ;
662 int endpointsServiced = 1 ;
666 for (idx = 0 ; idx < priorityCount ; idx++)
668 EndPoint ep = priorityList [idx] ;
669 bool specialCheck = false ;
671 if (readyCount > 0 && ep != NULL)
674 int selectHit = 0, readMiss = 0, writeMiss = 0 ;
676 /* Every SELECT_RATIO times we service an endpoint in this
677 loop we check to see if the mainEndPoint fd is ready to
678 read or write. If so we process it and do the current
679 endpoint next time around. */
680 if (((endpointsServiced % (SELECT_RATIO + 1)) == 0) &&
681 ep != mainEndPoint && mainEndPoint != NULL &&
684 fd_set trSet, twSet ;
686 int checkRead = FD_ISSET (mainEndPoint->myFd,&rdSet) ;
687 int checkWrite = FD_ISSET (mainEndPoint->myFd,&wrSet) ;
691 if (checkRead || checkWrite)
693 fd = mainEndPoint->myFd ;
695 tw.tv_sec = tw.tv_usec = 0 ;
696 memset (&trSet,0,sizeof (trSet)) ;
697 memset (&twSet,0,sizeof (twSet)) ;
704 sval = select (fd + 1,&trSet,&twSet,0,&tw) ;
710 specialCheck = true ;
711 if (checkRead && FD_ISSET (fd,&trSet))
716 if (checkWrite && FD_ISSET (fd,&twSet))
724 syswarn ("ME exception: select failed: %d",
730 fd = ep->myFd ; /* back to original fd. */
734 FD_CLR (fd, &exSet) ;
736 if (FD_ISSET (fd,&rSet))
739 endpointsServiced++ ;
742 if ((rval = doRead (ep)) != IoIncomplete)
744 Buffer *buff = ep->inBuffer ;
746 FD_CLR (fd, &rdSet) ;
748 /* incase callback wants to issue read */
749 ep->inBuffer = NULL ;
751 if (ep->inCbk != NULL)
752 (*ep->inCbk) (ep,rval,buff,ep->inClientData) ;
754 freeBufferArray (buff) ;
758 if ( InputFile == NULL )
759 FD_SET (ep->myFd, &exSet) ;
762 else if (FD_ISSET(fd,&rdSet))
765 /* get it again as the read callback may have deleted the */
770 ep = priorityList [idx] ;
772 if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&wSet))
775 endpointsServiced++ ;
778 if ((rval = doWrite (ep)) != IoIncomplete &&
781 Buffer *buff = ep->outBuffer ;
783 FD_CLR (fd, &wrSet) ;
785 /* incase callback wants to issue a write */
786 ep->outBuffer = NULL ;
788 if (ep->outDoneCbk != NULL)
789 (*ep->outDoneCbk) (ep,rval,buff,ep->outClientData) ;
791 freeBufferArray (buff) ;
793 else if (rval == IoProgress)
795 Buffer *buff = ep->outBuffer ;
797 if (ep->outProgressCbk != NULL)
798 (*ep->outProgressCbk) (ep,rval,buff,ep->outClientData) ;
802 FD_SET (ep->myFd, &exSet) ;
805 else if (FD_ISSET(fd,&wrSet))
808 /* get it again as the write callback may have deleted the */
813 ep = priorityList [idx] ;
817 ep->selectHits *= 0.9 ;
819 ep->selectHits += 1.0 ;
820 else if (readMiss && writeMiss)
821 ep->selectHits -= 1.0 ;
824 if (readyCount > 0 && ep != NULL && FD_ISSET (fd,&eSet))
829 reorderPriorityList () ;
831 else if (sval == 0 && !modifiedTime)
834 /* now we're done processing all read fds and/or the
835 timeout(s). Next we do the work callbacks for all the endpoints
836 whose fds weren't ready. */
837 for (idx = 0 ; idx < priorityCount ; idx++)
839 EndPoint ep = priorityList [idx] ;
845 if ( !FD_ISSET (fd,&rSet) && !FD_ISSET (fd,&wSet) )
846 if (ep->workCbk != NULL)
848 EndpWorkCbk func = ep->workCbk ;
849 void *data = ep->workData ;
852 ep->workData = NULL ;
853 TMRstart(TMR_CALLBACK);
855 TMRstop(TMR_CALLBACK);
863 void *addWorkCallback (EndPoint endp, EndpWorkCbk cbk, void *data)
865 void *oldBk = endp->workData ;
867 endp->workCbk = cbk ;
868 endp->workData = data ;
873 /* Tell the Run routine to stop next time around. */
880 int endPointErrno (EndPoint endp)
882 return endp->myErrno ;
885 bool readIsPending (EndPoint endp)
887 return (endp->inBuffer != NULL ? true : false) ;
890 bool writeIsPending (EndPoint endp)
892 return (endp->outBuffer != NULL ? true : false) ;
895 void setMainEndPoint (EndPoint endp)
899 mainEndPoint = endp ;
900 if (endp->myFd >= 0 && fstat (endp->myFd,&buf) < 0)
901 syslog (LOG_ERR,"Can't fstat mainEndPoint fd (%d): %m", endp->myFd) ;
902 else if (endp->myFd < 0)
903 syslog (LOG_ERR,"Negative fd for mainEndPoint???") ;
905 mainEpIsReg = (S_ISREG(buf.st_mode) ? true : false) ;
908 int getMainEndPointFd (void)
910 return(mainEndPoint->myFd) ;
913 void freeTimeoutQueue (void)
921 p->next = timeoutPool ;
924 timeoutQueueLength-- ;
929 /***********************************************************************/
930 /* STATIC FUNCTIONS BELOW HERE */
931 /***********************************************************************/
935 * called when the file descriptor on this endpoint is read ready.
937 static IoStatus doRead (EndPoint endp)
941 unsigned int bCount = 0 ;
942 struct iovec *vp = NULL ;
943 Buffer *buffers = endp->inBuffer ;
944 unsigned int currIdx = endp->inBufferIdx ;
946 IoStatus rval = IoIncomplete ;
949 for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
952 bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
956 /* now set up the iovecs for the readv */
962 vp = xcalloc (bCount, sizeof(struct iovec)) ;
964 bbase = bufferBase (buffers[currIdx]) ;
965 bds = bufferDataSize (buffers[currIdx]) ;
966 bs = bufferSize (buffers [currIdx]) ;
968 /* inIndex is an index in the virtual array of the read, not directly
970 vp[0].iov_base = bbase + bds + endp->inIndex ;
971 vp[0].iov_len = bs - bds - endp->inIndex ;
973 amt = vp[0].iov_len ;
975 for (idx = currIdx + 1 ; idx < bCount ; idx++)
977 bbase = bufferBase (buffers[idx]) ;
978 bds = bufferDataSize (buffers[idx]) ;
979 bs = bufferSize (buffers [idx]) ;
981 vp [idx].iov_base = bbase ;
982 vp [idx].iov_len = bs - bds ;
986 i = readv (endp->myFd,vp,(int) bCount) ;
990 size_t readAmt = (size_t) i ;
992 endp->inAmtRead += readAmt ;
994 /* check if we filled the first buffer */
995 if (readAmt >= (size_t) vp[0].iov_len)
997 bufferIncrDataSize (buffers[currIdx], vp[0].iov_len) ;
998 readAmt -= vp [0].iov_len ;
999 endp->inBufferIdx++ ;
1003 endp->inIndex += readAmt ;
1004 bufferIncrDataSize (buffers[currIdx], readAmt) ;
1008 /* now check the rest of the buffers */
1009 for (idx = 1 ; readAmt > 0 ; idx++)
1011 ASSERT (idx < bCount) ;
1013 bs = bufferSize (buffers [currIdx + idx]) ;
1014 bbase = bufferBase (buffers [currIdx + idx]) ;
1015 bds = bufferDataSize (buffers [currIdx + idx]) ;
1017 if (readAmt >= (bs - bds))
1019 bufferSetDataSize (buffers [currIdx + idx],bs) ;
1021 endp->inBufferIdx++ ;
1025 endp->inIndex = readAmt ;
1026 bufferIncrDataSize (buffers [currIdx + idx], readAmt) ;
1027 memset (bbase + bds + readAmt, 0, bs - bds - readAmt) ;
1032 if (endp->inAmtRead >= endp->inMinLen)
1038 else if (i < 0 && errno != EINTR && errno != EAGAIN)
1040 endp->myErrno = errno ;
1043 else if (i < 0 && errno == EINTR)
1049 else /* i < 0 && errno == EAGAIN */
1050 rval = IoIncomplete ;
1060 /* called when the file descriptor on the endpoint is write ready. */
1061 static IoStatus doWrite (EndPoint endp)
1066 unsigned int bCount = 0 ;
1067 struct iovec *vp = NULL ;
1068 Buffer *buffers = endp->outBuffer ;
1069 unsigned int currIdx = endp->outBufferIdx ;
1070 IoStatus rval = IoIncomplete ;
1072 TMRstart(TMR_WRITE);
1073 for (i = currIdx ; buffers && buffers [i] != NULL ; i++)
1076 bCount = (bCount > IOV_MAX ? IOV_MAX : bCount) ;
1082 vp = xcalloc (bCount, sizeof(struct iovec)) ;
1084 vp[0].iov_base = bufferBase (buffers [currIdx]) ;
1085 vp[0].iov_base = (char *) vp[0].iov_base + endp->outIndex ;
1086 vp[0].iov_len = bufferDataSize (buffers [currIdx]) - endp->outIndex ;
1088 amt = vp[0].iov_len ;
1090 for (idx = 1 ; idx < bCount ; idx++)
1092 vp [idx].iov_base = bufferBase (buffers [idx + currIdx]) ;
1093 vp [idx].iov_len = bufferDataSize (buffers [idx + currIdx]) ;
1094 amt += vp[idx].iov_len ;
1100 /* nasty mixing, but stderr is unbuffered usually. It's debugging only */
1101 d_printf (5,"About to write this:================================\n") ;
1102 writev (2,vp,bCount) ;
1103 d_printf (5,"end=================================================\n") ;
1108 ASSERT (endp->myFd >= 0) ;
1110 ASSERT (bCount > 0) ;
1112 i = writev (endp->myFd,vp,(int) bCount) ;
1116 size_t writeAmt = (size_t) i ;
1118 endp->outAmtWritten += writeAmt ;
1120 /* now figure out which buffers got completely written */
1121 for (idx = 0 ; writeAmt > 0 ; idx++)
1123 if (writeAmt >= (size_t) vp[idx].iov_len)
1125 endp->outBufferIdx++ ;
1126 endp->outIndex = 0 ;
1127 writeAmt -= vp [idx].iov_len ;
1131 /* this buffer was not completly written */
1132 endp->outIndex += writeAmt ;
1137 if (endp->outAmtWritten == endp->outSize)
1142 else if (i < 0 && errno == EINTR)
1146 else if (i < 0 && errno == EAGAIN)
1148 rval = IoIncomplete ;
1152 endp->myErrno = errno ;
1156 d_printf (1,"Wrote 0 bytes in doWrite()?\n") ;
1168 static IoStatus doExcept (EndPoint endp)
1172 int fd = endPointFd (endp) ;
1174 if (getsockopt (fd, SOL_SOCKET, SO_ERROR,
1175 (char *) &optval, &size) != 0)
1176 syswarn ("ME exception: getsockopt (%d)", fd) ;
1177 else if (optval != 0)
1180 syswarn ("ME exception: fd %d", fd) ;
1183 syswarn ("ME exception: fd %d: Unknown error", fd) ;
1195 static void endPointPrint (EndPoint ep, FILE *fp)
1197 fprintf (fp,"EndPoint [%p]: fd [%d]\n",(void *) ep, ep->myFd) ;
1201 static void signalHandler (int s)
1204 #ifndef HAVE_SIGACTION
1205 xsignal (s, signalHandler) ;
1210 static void pipeHandler (int s)
1212 xsignal (s, pipeHandler) ;
1216 /* Compare the hit ratio of two endpoints for qsort. We're sorting the
1217 endpoints on their relative activity */
1218 static int hitCompare (const void *v1, const void *v2)
1220 const struct endpoint_s *e1 = *((const struct endpoint_s * const *) v1) ;
1221 const struct endpoint_s *e2 = *((const struct endpoint_s * const *) v2) ;
1222 double e1Hit = e1->selectHits ;
1223 double e2Hit = e2->selectHits ;
1225 if (e1 == mainEndPoint)
1227 else if (e2 == mainEndPoint)
1229 else if (e1Hit < e2Hit)
1231 else if (e1Hit > e2Hit)
1239 /* We maintain the endpoints in order of the percent times they're ready
1240 for read/write when they've been selected. This helps us favour the more
1241 active endpoints. */
1242 static void reorderPriorityList (void)
1245 static int thisTime = 4;
1247 /* only sort every 4th time since it's so expensive */
1253 for (i = j = 0; i < priorityCount; i++)
1254 if (priorityList [i] != NULL)
1257 priorityList [j] = priorityList [i] ;
1261 for (i = j; i < priorityCount; i++)
1262 priorityList [ i ] = NULL;
1266 qsort (priorityList, (size_t)priorityCount, sizeof (EndPoint), &hitCompare);
1270 #define TIMEOUT_POOL_SIZE ((4096 - 2 * (sizeof (void *))) / (sizeof (TimerElemStruct)))
1272 /* create a new timeout data structure properly initialized. */
1273 static TimerElem newTimerElem (TimeoutId i, time_t w, EndpTCB f, void *d)
1277 if (timeoutPool == NULL)
1281 timeoutPool = xmalloc (sizeof(TimerElemStruct) * TIMEOUT_POOL_SIZE) ;
1283 for (j = 0; j < TIMEOUT_POOL_SIZE - 1; j++)
1284 timeoutPool[j] . next = &(timeoutPool [j + 1]) ;
1285 timeoutPool [TIMEOUT_POOL_SIZE-1] . next = NULL ;
1289 timeoutPool = timeoutPool->next ;
1291 ASSERT (p != NULL) ;
1304 /* add a new timeout structure to the global list. */
1305 static TimeoutId timerElemAdd (time_t when, EndpTCB func, void *data)
1307 TimerElem p = newTimerElem (++nextId ? nextId : ++nextId,when,func,data) ;
1308 TimerElem n = timeoutQueue ;
1309 TimerElem q = NULL ;
1311 while (n != NULL && n->when <= when)
1317 if (n == NULL && q == NULL) /* empty list so put at head */
1319 else if (q == NULL) /* put at head of list */
1321 p->next = timeoutQueue ;
1324 else if (n == NULL) /* put at end of list */
1326 else /* in middle of list */
1332 timeoutQueueLength++ ;
1338 /* Fills in TOUT with the timeout to use on the next call to
1339 * select. Returns TOUT. If there is no timeout, then returns NULL. If the
1340 * timeout has already passed, then it calls the timeout handling routine
1343 static struct timeval *getTimeout (struct timeval *tout)
1345 struct timeval *rval = NULL ;
1347 if (timeoutQueue != NULL)
1349 time_t now = theTime() ;
1351 while (timeoutQueue && now > timeoutQueue->when)
1354 if (timeoutQueue != NULL && now == timeoutQueue->when)
1360 else if (timeoutQueue != NULL)
1362 tout->tv_sec = timeoutQueue->when - now ;
1376 static void doTimeout (void)
1378 EndpTCB cbk = timeoutQueue->func ;
1379 void *data = timeoutQueue->data ;
1380 TimerElem p = timeoutQueue ;
1381 TimeoutId tid = timeoutQueue->id ;
1383 timeoutQueue = timeoutQueue->next ;
1385 p->next = timeoutPool ;
1388 timeoutQueueLength-- ;
1391 (*cbk) (tid, data) ; /* call the callback function */
1398 #if defined (WANT_MAIN)
1401 #define BUFF_SIZE 100
1404 void timerCallback (void *cd) ;
1405 void timerCallback (void *cd)
1407 d_printf (1,"Callback \n") ;
1411 void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffer, void *data);
1412 void lineIsWritten (EndPoint ep, IoStatus status, Buffer *buffer, void *data)
1416 if (status == IoDone)
1417 d_printf (1,"LINE was written\n") ;
1420 int oldErrno = errno ;
1422 errno = endPointErrno (ep) ;
1423 perror ("write failed") ;
1427 for (i = 0 ; buffer && buffer [i] ; i++)
1428 delBuffer (buffer [i]) ;
1431 void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffer, void *data);
1432 void lineIsRead (EndPoint myEp, IoStatus status, Buffer *buffer, void *d)
1434 Buffer *writeBuffers, *readBuffers ;
1435 Buffer newBuff1, newBuff2 ;
1436 Buffer newInputBuffer ;
1440 if (status == IoFailed)
1442 int oldErrno = errno ;
1444 errno = endPointErrno (myEp) ;
1445 perror ("read failed") ;
1450 else if (status == IoEOF)
1452 d_printf (1,"EOF on endpoint.\n") ;
1453 delEndPoint (myEp) ;
1459 data = bufferBase (buffer[0]) ;
1460 len = bufferDataSize (buffer[0]) ;
1462 if (data [len - 1] == '\r' || data [len - 1] == '\n')
1463 bufferDecrDataSize (buffer [0],1) ;
1464 if (data [len - 1] == '\r' || data [len - 1] == '\n')
1465 bufferDecrDataSize (buffer [0],1) ;
1469 d_printf (1,"Got a line: %s\n", data) ;
1471 newBuff1 = newBuffer (len + 50) ;
1472 newBuff2 = newBuffer (len + 50) ;
1473 newInputBuffer = newBuffer (BUFF_SIZE) ;
1475 p = bufferBase (newBuff1) ;
1476 strcpy (p, "Thanks for that \"") ;
1477 bufferSetDataSize (newBuff1,strlen (p)) ;
1479 p = bufferBase (newBuff2) ;
1480 strcpy (p,"\" very tasty\n") ;
1481 bufferSetDataSize (newBuff2,strlen (p)) ;
1483 writeBuffers = makeBufferArray (newBuff1,buffer[0],newBuff2,NULL) ;
1484 readBuffers = makeBufferArray (newInputBuffer,NULL) ;
1486 prepareWrite (myEp,writeBuffers,lineIsWritten,NULL) ;
1487 prepareRead (myEp,readBuffers,lineIsRead,NULL,1) ;
1490 myEp->registerWake (&timerCallback,theTime() + 7,0) ;
1495 static void printDate (TimeoutId tid, void *data) ;
1496 static void printDate (TimeoutId tid, void *data)
1502 d_printf (1,"Timeout (%d) time now is %ld %s",
1503 (int) tid,(long) t,ctime(&t)) ;
1505 if (timeoutQueue == NULL)
1507 int ti = (rand () % 10) + 1 ;
1509 prepareSleep (printDate,ti,data) ;
1515 static void Timeout (TimeoutId tid, void *data) ;
1516 static void Timeout (TimeoutId tid, void *data)
1519 static int howMany ;
1521 time_t t = theTime() ;
1529 d_printf (1,"Timeout (%d) time now is %ld %s",
1530 (int) tid, (long) t,ctime(&t)) ;
1532 if (timeoutQueue != NULL && timeoutQueue->next != NULL)
1533 d_printf (1,"%s timeout id %d\n",
1534 (removeTimeout (rm) ? "REMOVED" : "FAILED TO REMOVE"), rm) ;
1537 howMany = (rand() % 10) + (timeoutQueue == NULL ? 1 : 0) ;
1539 for (i = 0 ; i < howMany ; i++ )
1542 int count = (rand () % 30) + 1 ;
1544 id = (i % 2 == 0 ? prepareSleep (Timeout,count,data)
1545 : prepareWake (Timeout,t + count,data)) ;
1553 void newConn (EndPoint ep, IoStatus status, Buffer *buffer, void *d) ;
1554 void newConn (EndPoint ep, IoStatus status, Buffer *buffer, void *d)
1557 struct sockaddr_in in ;
1558 Buffer *readBuffers ;
1559 Buffer newBuff = newBuffer (BUFF_SIZE) ;
1560 int len = sizeof (in) ;
1563 memset (&in, 0, sizeof (in)) ;
1565 fd = accept (ep->myFd, (struct sockaddr *) &in, &len) ;
1569 perror ("::accept") ;
1573 newEp = newEndPoint (fd) ;
1575 prepareRead (ep, NULL, newConn,NULL,0) ;
1577 readBuffers = makeBufferArray (newBuff,NULL) ;
1579 prepareRead (newEp, readBuffers, lineIsRead, NULL, 1) ;
1581 d_printf (1,"Set up a new connection\n");
1585 int main (int argc, char **argv)
1588 struct sockaddr_in accNet ;
1589 int accFd = socket (AF_INET,SOCK_STREAM,0) ;
1590 unsigned short port = atoi (argc > 1 ? argv[1] : "10000") ;
1591 time_t t = theTime() ;
1594 program = strrchr (argv[0],'/') ;
1597 program = argv [0] ;
1601 ASSERT (accFd >= 0) ;
1603 memset (&accNet,0,sizeof (accNet)) ;
1604 accNet.sin_family = AF_INET ;
1605 accNet.sin_addr.s_addr = htonl (INADDR_ANY) ;
1606 accNet.sin_port = htons (port) ;
1609 openlog (program, LOG_PERROR | LOG_PID, LOG_NEWS) ;
1611 openlog (program, LOG_PID, LOG_NEWS) ;
1614 if (bind (accFd, (struct sockaddr *) &accNet, sizeof (accNet)) < 0)
1616 perror ("bind: %m") ;
1622 accConn = newEndPoint (accFd) ;
1624 prepareRead (accConn,NULL,newConn,NULL,0) ;
1626 prepareSleep (Timeout,5,(void *) 0x10) ;
1629 d_printf (1,"Time now is %s",ctime(&t)) ;
1631 prepareWake (printDate,t + 16,NULL) ;
1637 #endif /* WANT_MAIN */
1639 /* Probably doesn't do the right thing for SIGCHLD */
1640 void setSigHandler (int signum, void (*ptr)(int))
1644 if (sigHandlers == NULL)
1646 sigHandlers = xmalloc (sizeof(sigfn) * NSIG) ;
1647 sigFlags = xmalloc (sizeof(sig_atomic_t) * NSIG) ;
1648 for (i = 0 ; i < NSIG ; i++)
1650 sigHandlers [i] = NULL ;
1657 syslog (LOG_ERR,"ME signal number bigger than NSIG: %d vs %d",
1662 if (xsignal (signum, signalHandler) == SIG_ERR)
1663 die ("signal failed: %s", strerror(errno)) ;
1665 sigHandlers[signum] = ptr ;
1668 static void handleSignals (void)
1671 #if defined(USE_SIGVEC)
1675 for (i = 1; i < NSIG; i++)
1679 #if defined(USE_SIGACTION)
1680 sigset_t set, oset ;
1682 if (sigemptyset (&set) != 0 || sigaddset (&set, i) != 0)
1683 die ("sigemptyset or sigaddset failed") ;
1684 if (sigprocmask (SIG_BLOCK, &set, &oset) != 0)
1685 die ("sigprocmask failed: %s", strerror(errno)) ;
1686 #elif defined(USE_SIGVEC)
1688 # define sigmask(s) (1 << ((s) - 1))
1692 mask = sigblock (sigmask(i)) ;
1693 #elif defined(USE_SIGSET)
1694 if (sighold (i) != 0)
1695 die ("sighold failed: %s", strerror(errno)) ;
1697 /* hope for the best */
1702 if (sigHandlers[i] != NULL &&
1703 sigHandlers[i] != SIG_IGN &&
1704 sigHandlers[i] != SIG_DFL)
1705 (sigHandlers[i])(i) ;
1707 #if defined(USE_SIGACTION)
1708 if (sigprocmask (SIG_SETMASK, &oset, (sigset_t *)NULL) != 0)
1709 die ("sigprocmask failed: %s", strerror(errno)) ;
1710 #elif defined(USE_SIGVEC)
1712 #elif defined(USE_SIGSET)
1713 if (sigrelse (i) != 0)
1714 die ("sigrelse failed: %s", strerror(errno)) ;
1716 /* hope for the best */
1723 int endpointConfigLoadCbk (void *data)
1725 FILE *fp = (FILE *) data ;
1729 if (getInteger (topScope,"stdio-fdmax",&ival,NO_INHERIT))
1733 #if ! defined (FD_SETSIZE)
1737 logOrPrint (LOG_ERR,fp,NO_STDIO_FDMAX) ;
1744 if (stdioFdMax > FD_SETSIZE)
1746 logOrPrint (LOG_ERR,fp,
1747 "ME config: value of %s (%ld) in %s is higher"
1748 " than maximum of %ld. Using %ld","stdio-fdmax",
1749 ival,"global scope",
1750 (long) FD_SETSIZE, (long) FD_SETSIZE) ;
1751 stdioFdMax = FD_SETSIZE ;
1767 /* definitely not the fastest, but the most portable way to find the first
1768 set bit in a mask */
1769 static int ff_set (fd_set *set,unsigned int start)
1773 for (i = start ; i < FD_SETSIZE ; i++)
1774 if (FD_ISSET (i,set))
1781 static int ff_free (fd_set *set, unsigned int start)
1785 for (i = start ; i < FD_SETSIZE ; i++)
1786 if (!FD_ISSET (i,set))
1795 static void endpointCleanup (void)
1798 free (priorityList) ;
1799 free (sigHandlers) ;
1801 priorityList = NULL ;
1802 sigHandlers = NULL ;