Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members

Select.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00036 #include "BLOCXX_config.h"
00037 #include "Select.hpp"
00038 #include "AutoPtr.hpp"
00039 #include "Assertion.hpp"
00040 #include "Thread.hpp" // for testCancel()
00041 
00042 #if defined(BLOCXX_WIN32)
00043 #include <cassert>
00044 #endif
00045 
00046 extern "C"
00047 {
00048 
00049 #ifndef BLOCXX_WIN32
00050  #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00051   #include <sys/epoll.h>
00052  #endif
00053  #if defined (BLOCXX_HAVE_SYS_POLL_H)
00054   #include <sys/poll.h>
00055  #endif
00056  #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00057   #include <sys/select.h>
00058  #endif
00059 #endif
00060 
00061 #ifdef BLOCXX_HAVE_SYS_TIME_H
00062  #include <sys/time.h>
00063 #endif
00064 
00065 #include <sys/types.h>
00066 
00067 #ifdef BLOCXX_HAVE_UNISTD_H
00068  #include <unistd.h>
00069 #endif
00070 
00071 #include <errno.h>
00072 }
00073 
00074 namespace BLOCXX_NAMESPACE
00075 {
00076 
00077 namespace Select
00078 {
00079 #if defined(BLOCXX_WIN32)
00080 
00081 int
00082 selectRW(SelectObjectArray& selarray, UInt32 ms)
00083 {
00084    int rc;
00085    size_t hcount = static_cast<DWORD>(selarray.size());
00086    AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00087 
00088    size_t handleidx = 0;
00089    for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00090    {
00091       if(selarray[i].s.sockfd != INVALID_SOCKET
00092          && selarray[i].s.networkevents)
00093       {
00094          ::WSAEventSelect(selarray[i].s.sockfd, 
00095             selarray[i].s.event, selarray[i].s.networkevents);
00096       }
00097             
00098       hdls[handleidx] = selarray[i].s.event;
00099    }
00100 
00101    DWORD timeout = (ms != ~0U) ? ms : INFINITE;
00102    DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);
00103 
00104    assert(cc != WAIT_ABANDONED);
00105 
00106    switch (cc)
00107    {
00108       case WAIT_FAILED:
00109          rc = Select::SELECT_ERROR;
00110          break;
00111       case WAIT_TIMEOUT:
00112          rc = Select::SELECT_TIMEOUT;
00113          break;
00114       default:
00115          rc = cc - WAIT_OBJECT_0;
00116          
00117          // If this is a socket, set it back to 
00118          // blocking mode
00119          if(selarray[rc].s.sockfd != INVALID_SOCKET)
00120          {
00121             if(selarray[rc].s.networkevents
00122                && selarray[rc].s.doreset == false)
00123             {
00124                ::WSAEventSelect(selarray[rc].s.sockfd, 
00125                   selarray[rc].s.event, selarray[rc].s.networkevents);
00126             }
00127             else
00128             {
00129                // Set socket back to blocking
00130                ::WSAEventSelect(selarray[rc].s.sockfd, 
00131                   selarray[rc].s.event, 0);
00132                u_long ioctlarg = 0;
00133                ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00134             }
00135          }
00136          break;
00137    }
00138 
00139    if( rc < 0 )
00140       return rc;
00141 
00142    int availableCount = 0;
00143    for (size_t i = 0; i < selarray.size(); i++)
00144    {
00145       if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00146       {
00147          if( selarray[i].waitForRead )
00148             selarray[i].readAvailable = true;
00149          if( selarray[i].waitForWrite )
00150             selarray[i].writeAvailable = true;
00151          ++availableCount;
00152       }
00153       else
00154       {
00155          selarray[i].readAvailable = false;
00156          selarray[i].writeAvailable = false;
00157       }
00158    }
00159    return availableCount;
00160 }
00161 
00162 
00163 #else
00164 
00166 // epoll version
00167 int
00168 selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
00169 {
00170 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00171    int lerrno = 0, ecc = 0;
00172    int timeout;
00173    AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00174    int epfd = epoll_create(selarray.size());
00175    if(epfd == -1)
00176    {
00177       if (errno == ENOSYS) // kernel doesn't support it
00178       {
00179          return SELECT_NOT_IMPLEMENTED;
00180       }
00181       // Need to return something else?
00182       return Select::SELECT_ERROR;
00183    }
00184 
00185    for (size_t i = 0; i < selarray.size(); i++)
00186    {
00187       BLOCXX_ASSERT(selarray[i].s >= 0);
00188       selarray[i].readAvailable = false;
00189       selarray[i].writeAvailable = false;
00190       selarray[i].wasError = false;
00191       events[i].data.u32 = i;
00192       events[i].events = 0;
00193       if(selarray[i].waitForRead)
00194       {
00195          events[i].events |= (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP);
00196       }
00197       if(selarray[i].waitForWrite)
00198       {
00199          events[i].events |= EPOLLOUT;
00200       }
00201 
00202       if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00203       {
00204 			::close(epfd);
00205          // Need to return something else?
00206          return Select::SELECT_ERROR;
00207       }
00208    }
00209 
00210    // here we spin checking for thread cancellation every so often.
00211    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00212    timeval now, end;
00213    gettimeofday(&now, NULL);
00214    end = now;
00215    end.tv_sec  += ms / 1000;
00216    end.tv_usec += (ms % 1000) * 1000;
00217 
00218    while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00219        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00220    {
00221       timeval tv;
00222       tv.tv_sec = end.tv_sec - now.tv_sec;
00223       if (end.tv_usec >= now.tv_usec)
00224       {
00225          tv.tv_usec = end.tv_usec - now.tv_usec;
00226       }
00227       else
00228       {
00229          tv.tv_sec--;
00230          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00231       }
00232 
00233       if ((tv.tv_sec != 0) 
00234          || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00235       {
00236          tv.tv_sec = 0;
00237          tv.tv_usec = loopMicroSeconds;
00238       }
00239 
00240       timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00241       Thread::testCancel();
00242       ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
00243       lerrno = errno;
00244       Thread::testCancel();
00245       gettimeofday(&now, NULL);
00246    }
00247 
00248 	::close(epfd);
00249    if (ecc < 0)
00250    {
00251       return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
00252    }
00253    if (ecc == 0)
00254    {
00255       return Select::SELECT_TIMEOUT;
00256    }
00257 
00258    for(int i = 0; i < ecc; i++)
00259    {
00260       int ndx = events[i].data.u32;
00261       selarray[ndx].readAvailable = ((events[i].events 
00262          & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0);
00263       selarray[ndx].writeAvailable = ((events[i].events 
00264          & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0);
00265    }
00266 
00267    return ecc;
00268 #else
00269    return SELECT_NOT_IMPLEMENTED;
00270 #endif
00271 }
00272 
00274 // poll() version
00275 int
00276 selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
00277 {
00278 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00279    int lerrno = 0, rc = 0;
00280 
00281    AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00282 
00283    // here we spin checking for thread cancellation every so often.
00284    timeval now, end;
00285    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00286    gettimeofday(&now, NULL);
00287    end = now;
00288    end.tv_sec  += ms / 1000;
00289    end.tv_usec += (ms % 1000) * 1000;
00290    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00291        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00292    {
00293       for (size_t i = 0; i < selarray.size(); i++)
00294       {
00295          BLOCXX_ASSERT(selarray[i].s >= 0);
00296          selarray[i].readAvailable = false;
00297          selarray[i].writeAvailable = false;
00298          selarray[i].wasError = false;
00299          pfds[i].revents = 0;
00300          pfds[i].fd = selarray[i].s;
00301          pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00302          if(selarray[i].waitForWrite)
00303             pfds[i].events |= POLLOUT;
00304       }
00305 
00306       timeval tv;
00307       tv.tv_sec = end.tv_sec - now.tv_sec;
00308       if (end.tv_usec >= now.tv_usec)
00309       {
00310          tv.tv_usec = end.tv_usec - now.tv_usec;
00311       }
00312       else
00313       {
00314          tv.tv_sec--;
00315          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00316       }
00317 
00318       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00319       {
00320          tv.tv_sec = 0;
00321          tv.tv_usec = loopMicroSeconds;
00322       }
00323 
00324       // TODO optimize this. 
00325       int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000; 
00326 
00327       Thread::testCancel();
00328       rc = ::poll(pfds.get(), selarray.size(), loopMSecs); 
00329       lerrno = errno;
00330       Thread::testCancel();
00331 
00332       gettimeofday(&now, NULL);
00333    }
00334    
00335    if (rc < 0)
00336    {
00337       if (lerrno == EINTR)
00338       {
00339 #ifdef BLOCXX_NETWARE
00340          // When the NetWare server is shutting down, select will
00341          // set errno to EINTR on return. If this thread does not
00342          // yield control (cooperative multitasking) then we end
00343          // up in a very tight loop and get a CPUHog server abbend.
00344          pthread_yield();
00345 #endif
00346          return Select::SELECT_INTERRUPTED;
00347       }
00348       else
00349       {
00350          return Select::SELECT_ERROR;
00351       }
00352    }
00353    if (rc == 0)
00354    {
00355       return Select::SELECT_TIMEOUT;
00356    }
00357    for (size_t i = 0; i < selarray.size(); i++)
00358    {
00359       if(pfds[i].events & (POLLERR | POLLNVAL))
00360       {
00361          selarray[i].wasError = true;
00362       }
00363       else
00364       {
00365          if(selarray[i].waitForRead)
00366          {
00367             selarray[i].readAvailable = (pfds[i].revents & 
00368                (POLLIN | POLLPRI | POLLHUP));
00369          }
00370 
00371          if(selarray[i].waitForWrite)
00372          {
00373             selarray[i].writeAvailable = (pfds[i].revents &
00374                (POLLOUT | POLLHUP));
00375          }
00376       }
00377    }
00378 
00379    return rc;
00380 #else
00381    return SELECT_NOT_IMPLEMENTED;
00382 #endif
00383 }
00385 // ::select() version
00386 int
00387 selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
00388 {
00389 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00390    int lerrno, rc = 0;
00391    fd_set ifds;
00392    fd_set ofds;
00393 
00394    // here we spin checking for thread cancellation every so often.
00395    timeval now, end;
00396    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00397    gettimeofday(&now, NULL);
00398    end = now;
00399    end.tv_sec  += ms / 1000;
00400    end.tv_usec += (ms % 1000) * 1000;
00401    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00402        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00403    {
00404       int maxfd = 0;
00405       FD_ZERO(&ifds);
00406       FD_ZERO(&ofds);
00407       for (size_t i = 0; i < selarray.size(); ++i)
00408       {
00409          int fd = selarray[i].s;
00410          BLOCXX_ASSERT(fd >= 0);
00411          if (maxfd < fd)
00412          {
00413             maxfd = fd;
00414          }
00415          if (fd < 0 || fd >= FD_SETSIZE)
00416          {
00417             return Select::SELECT_ERROR;
00418          }
00419          if (selarray[i].waitForRead)
00420          {
00421             FD_SET(fd, &ifds);
00422          }
00423          if (selarray[i].waitForWrite)
00424          {
00425             FD_SET(fd, &ofds);
00426          }
00427       }
00428 
00429       timeval tv;
00430       tv.tv_sec = end.tv_sec - now.tv_sec;
00431       if (end.tv_usec >= now.tv_usec)
00432       {
00433          tv.tv_usec = end.tv_usec - now.tv_usec;
00434       }
00435       else
00436       {
00437          tv.tv_sec--;
00438          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00439       }
00440 
00441       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00442       {
00443          tv.tv_sec = 0;
00444          tv.tv_usec = loopMicroSeconds;
00445       }
00446 
00447       Thread::testCancel();
00448       rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
00449       lerrno = errno;
00450       Thread::testCancel();
00451 
00452       gettimeofday(&now, NULL);
00453    }
00454    
00455    if (rc < 0)
00456    {
00457       if (lerrno == EINTR)
00458       {
00459 #ifdef BLOCXX_NETWARE
00460          // When the NetWare server is shutting down, select will
00461          // set errno to EINTR on return. If this thread does not
00462          // yield control (cooperative multitasking) then we end
00463          // up in a very tight loop and get a CPUHog server abbend.
00464          pthread_yield();
00465 #endif
00466          return Select::SELECT_INTERRUPTED;
00467       }
00468       else
00469       {
00470          return Select::SELECT_ERROR;
00471       }
00472    }
00473    if (rc == 0)
00474    {
00475       return Select::SELECT_TIMEOUT;
00476    }
00477    int availableCount = 0;
00478    int cval;
00479    for (size_t i = 0; i < selarray.size(); i++)
00480    {
00481       selarray[i].wasError = false;
00482       cval = 0;
00483       if (FD_ISSET(selarray[i].s, &ifds))
00484       {
00485          selarray[i].readAvailable = true;
00486          cval = 1;
00487       }
00488       else
00489       {
00490          selarray[i].readAvailable = false;
00491       }
00492 
00493       if (FD_ISSET(selarray[i].s, &ofds))
00494       {
00495          selarray[i].writeAvailable = true;
00496          cval = 1;
00497       }
00498       else
00499       {
00500          selarray[i].writeAvailable = false;
00501       }
00502 
00503       availableCount += cval;
00504 
00505    }
00506       
00507    return availableCount;
00508 #else
00509    return SELECT_NOT_IMPLEMENTED;
00510 #endif
00511 }
00512 
00513 int
00514 selectRW(SelectObjectArray& selarray, UInt32 ms)
00515 {
00516    int rv = selectRWEpoll(selarray, ms);
00517    if (rv != SELECT_NOT_IMPLEMENTED)
00518    {
00519       return rv;
00520    }
00521 
00522    rv = selectRWPoll(selarray, ms);
00523    if (rv != SELECT_NOT_IMPLEMENTED)
00524    {
00525       return rv;
00526    }
00527 
00528    rv = selectRWSelect(selarray, ms);
00529    BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00530    return rv;
00531 }
00532 
00534 #endif   // #else BLOCXX_WIN32
00535 
00536 int
00537 select(const SelectTypeArray& selarray, UInt32 ms)
00538 {
00539    SelectObjectArray soa;
00540    soa.reserve(selarray.size());
00541    for (size_t i = 0; i < selarray.size(); ++i)
00542    {
00543       SelectObject curObj(selarray[i]);
00544       curObj.waitForRead = true;
00545       soa.push_back(curObj);
00546    }
00547    int rv = selectRW(soa, ms);
00548    if (rv < 0)
00549    {
00550       return rv;
00551    }
00552 
00553    // find the first selected object
00554    for (size_t i = 0; i < soa.size(); ++i)
00555    {
00556       if (soa[i].readAvailable)
00557       {
00558          return i;
00559       }
00560    }
00561    return SELECT_ERROR;
00562 }
00563 
00564 } // end namespace Select
00565 
00566 } // end namespace BLOCXX_NAMESPACE
00567 

Generated on Mon Sep 12 23:56:36 2005 for blocxx by  doxygen 1.4.4