00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00036 #include "BLOCXX_config.h"
00037 #include "Select.hpp"
00038 #include "AutoPtr.hpp"
00039 #include "Assertion.hpp"
00040 #include "Thread.hpp"
00041
00042 #if defined(BLOCXX_WIN32)
00043 #include <cassert>
00044 #endif
00045
00046 extern "C"
00047 {
00048
00049 #ifndef BLOCXX_WIN32
00050 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00051 #include <sys/epoll.h>
00052 #endif
00053 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00054 #include <sys/poll.h>
00055 #endif
00056 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00057 #include <sys/select.h>
00058 #endif
00059 #endif
00060
00061 #ifdef BLOCXX_HAVE_SYS_TIME_H
00062 #include <sys/time.h>
00063 #endif
00064
00065 #include <sys/types.h>
00066
00067 #ifdef BLOCXX_HAVE_UNISTD_H
00068 #include <unistd.h>
00069 #endif
00070
00071 #include <errno.h>
00072 }
00073
00074 namespace BLOCXX_NAMESPACE
00075 {
00076
00077 namespace Select
00078 {
00079 #if defined(BLOCXX_WIN32)
00080
00081 int
00082 selectRW(SelectObjectArray& selarray, UInt32 ms)
00083 {
00084 int rc;
00085 size_t hcount = static_cast<DWORD>(selarray.size());
00086 AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00087
00088 size_t handleidx = 0;
00089 for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00090 {
00091 if(selarray[i].s.sockfd != INVALID_SOCKET
00092 && selarray[i].s.networkevents)
00093 {
00094 ::WSAEventSelect(selarray[i].s.sockfd,
00095 selarray[i].s.event, selarray[i].s.networkevents);
00096 }
00097
00098 hdls[handleidx] = selarray[i].s.event;
00099 }
00100
00101 DWORD timeout = (ms != ~0U) ? ms : INFINITE;
00102 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);
00103
00104 assert(cc != WAIT_ABANDONED);
00105
00106 switch (cc)
00107 {
00108 case WAIT_FAILED:
00109 rc = Select::SELECT_ERROR;
00110 break;
00111 case WAIT_TIMEOUT:
00112 rc = Select::SELECT_TIMEOUT;
00113 break;
00114 default:
00115 rc = cc - WAIT_OBJECT_0;
00116
00117
00118
00119 if(selarray[rc].s.sockfd != INVALID_SOCKET)
00120 {
00121 if(selarray[rc].s.networkevents
00122 && selarray[rc].s.doreset == false)
00123 {
00124 ::WSAEventSelect(selarray[rc].s.sockfd,
00125 selarray[rc].s.event, selarray[rc].s.networkevents);
00126 }
00127 else
00128 {
00129
00130 ::WSAEventSelect(selarray[rc].s.sockfd,
00131 selarray[rc].s.event, 0);
00132 u_long ioctlarg = 0;
00133 ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00134 }
00135 }
00136 break;
00137 }
00138
00139 if( rc < 0 )
00140 return rc;
00141
00142 int availableCount = 0;
00143 for (size_t i = 0; i < selarray.size(); i++)
00144 {
00145 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00146 {
00147 if( selarray[i].waitForRead )
00148 selarray[i].readAvailable = true;
00149 if( selarray[i].waitForWrite )
00150 selarray[i].writeAvailable = true;
00151 ++availableCount;
00152 }
00153 else
00154 {
00155 selarray[i].readAvailable = false;
00156 selarray[i].writeAvailable = false;
00157 }
00158 }
00159 return availableCount;
00160 }
00161
00162
00163 #else
00164
00166
00167 int
00168 selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
00169 {
00170 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00171 int lerrno = 0, ecc = 0;
00172 int timeout;
00173 AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00174 int epfd = epoll_create(selarray.size());
00175 if(epfd == -1)
00176 {
00177 if (errno == ENOSYS)
00178 {
00179 return SELECT_NOT_IMPLEMENTED;
00180 }
00181
00182 return Select::SELECT_ERROR;
00183 }
00184
00185 for (size_t i = 0; i < selarray.size(); i++)
00186 {
00187 BLOCXX_ASSERT(selarray[i].s >= 0);
00188 selarray[i].readAvailable = false;
00189 selarray[i].writeAvailable = false;
00190 selarray[i].wasError = false;
00191 events[i].data.u32 = i;
00192 events[i].events = 0;
00193 if(selarray[i].waitForRead)
00194 {
00195 events[i].events |= (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP);
00196 }
00197 if(selarray[i].waitForWrite)
00198 {
00199 events[i].events |= EPOLLOUT;
00200 }
00201
00202 if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00203 {
00204 ::close(epfd);
00205
00206 return Select::SELECT_ERROR;
00207 }
00208 }
00209
00210
00211 const Int32 loopMicroSeconds = 100 * 1000;
00212 timeval now, end;
00213 gettimeofday(&now, NULL);
00214 end = now;
00215 end.tv_sec += ms / 1000;
00216 end.tv_usec += (ms % 1000) * 1000;
00217
00218 while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00219 || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00220 {
00221 timeval tv;
00222 tv.tv_sec = end.tv_sec - now.tv_sec;
00223 if (end.tv_usec >= now.tv_usec)
00224 {
00225 tv.tv_usec = end.tv_usec - now.tv_usec;
00226 }
00227 else
00228 {
00229 tv.tv_sec--;
00230 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00231 }
00232
00233 if ((tv.tv_sec != 0)
00234 || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00235 {
00236 tv.tv_sec = 0;
00237 tv.tv_usec = loopMicroSeconds;
00238 }
00239
00240 timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00241 Thread::testCancel();
00242 ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
00243 lerrno = errno;
00244 Thread::testCancel();
00245 gettimeofday(&now, NULL);
00246 }
00247
00248 ::close(epfd);
00249 if (ecc < 0)
00250 {
00251 return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
00252 }
00253 if (ecc == 0)
00254 {
00255 return Select::SELECT_TIMEOUT;
00256 }
00257
00258 for(int i = 0; i < ecc; i++)
00259 {
00260 int ndx = events[i].data.u32;
00261 selarray[ndx].readAvailable = ((events[i].events
00262 & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0);
00263 selarray[ndx].writeAvailable = ((events[i].events
00264 & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0);
00265 }
00266
00267 return ecc;
00268 #else
00269 return SELECT_NOT_IMPLEMENTED;
00270 #endif
00271 }
00272
00274
00275 int
00276 selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
00277 {
00278 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00279 int lerrno = 0, rc = 0;
00280
00281 AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00282
00283
00284 timeval now, end;
00285 const Int32 loopMicroSeconds = 100 * 1000;
00286 gettimeofday(&now, NULL);
00287 end = now;
00288 end.tv_sec += ms / 1000;
00289 end.tv_usec += (ms % 1000) * 1000;
00290 while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00291 || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00292 {
00293 for (size_t i = 0; i < selarray.size(); i++)
00294 {
00295 BLOCXX_ASSERT(selarray[i].s >= 0);
00296 selarray[i].readAvailable = false;
00297 selarray[i].writeAvailable = false;
00298 selarray[i].wasError = false;
00299 pfds[i].revents = 0;
00300 pfds[i].fd = selarray[i].s;
00301 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00302 if(selarray[i].waitForWrite)
00303 pfds[i].events |= POLLOUT;
00304 }
00305
00306 timeval tv;
00307 tv.tv_sec = end.tv_sec - now.tv_sec;
00308 if (end.tv_usec >= now.tv_usec)
00309 {
00310 tv.tv_usec = end.tv_usec - now.tv_usec;
00311 }
00312 else
00313 {
00314 tv.tv_sec--;
00315 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00316 }
00317
00318 if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00319 {
00320 tv.tv_sec = 0;
00321 tv.tv_usec = loopMicroSeconds;
00322 }
00323
00324
00325 int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000;
00326
00327 Thread::testCancel();
00328 rc = ::poll(pfds.get(), selarray.size(), loopMSecs);
00329 lerrno = errno;
00330 Thread::testCancel();
00331
00332 gettimeofday(&now, NULL);
00333 }
00334
00335 if (rc < 0)
00336 {
00337 if (lerrno == EINTR)
00338 {
00339 #ifdef BLOCXX_NETWARE
00340
00341
00342
00343
00344 pthread_yield();
00345 #endif
00346 return Select::SELECT_INTERRUPTED;
00347 }
00348 else
00349 {
00350 return Select::SELECT_ERROR;
00351 }
00352 }
00353 if (rc == 0)
00354 {
00355 return Select::SELECT_TIMEOUT;
00356 }
00357 for (size_t i = 0; i < selarray.size(); i++)
00358 {
00359 if(pfds[i].events & (POLLERR | POLLNVAL))
00360 {
00361 selarray[i].wasError = true;
00362 }
00363 else
00364 {
00365 if(selarray[i].waitForRead)
00366 {
00367 selarray[i].readAvailable = (pfds[i].revents &
00368 (POLLIN | POLLPRI | POLLHUP));
00369 }
00370
00371 if(selarray[i].waitForWrite)
00372 {
00373 selarray[i].writeAvailable = (pfds[i].revents &
00374 (POLLOUT | POLLHUP));
00375 }
00376 }
00377 }
00378
00379 return rc;
00380 #else
00381 return SELECT_NOT_IMPLEMENTED;
00382 #endif
00383 }
00385
00386 int
00387 selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
00388 {
00389 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00390 int lerrno, rc = 0;
00391 fd_set ifds;
00392 fd_set ofds;
00393
00394
00395 timeval now, end;
00396 const Int32 loopMicroSeconds = 100 * 1000;
00397 gettimeofday(&now, NULL);
00398 end = now;
00399 end.tv_sec += ms / 1000;
00400 end.tv_usec += (ms % 1000) * 1000;
00401 while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00402 || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
00403 {
00404 int maxfd = 0;
00405 FD_ZERO(&ifds);
00406 FD_ZERO(&ofds);
00407 for (size_t i = 0; i < selarray.size(); ++i)
00408 {
00409 int fd = selarray[i].s;
00410 BLOCXX_ASSERT(fd >= 0);
00411 if (maxfd < fd)
00412 {
00413 maxfd = fd;
00414 }
00415 if (fd < 0 || fd >= FD_SETSIZE)
00416 {
00417 return Select::SELECT_ERROR;
00418 }
00419 if (selarray[i].waitForRead)
00420 {
00421 FD_SET(fd, &ifds);
00422 }
00423 if (selarray[i].waitForWrite)
00424 {
00425 FD_SET(fd, &ofds);
00426 }
00427 }
00428
00429 timeval tv;
00430 tv.tv_sec = end.tv_sec - now.tv_sec;
00431 if (end.tv_usec >= now.tv_usec)
00432 {
00433 tv.tv_usec = end.tv_usec - now.tv_usec;
00434 }
00435 else
00436 {
00437 tv.tv_sec--;
00438 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00439 }
00440
00441 if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00442 {
00443 tv.tv_sec = 0;
00444 tv.tv_usec = loopMicroSeconds;
00445 }
00446
00447 Thread::testCancel();
00448 rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
00449 lerrno = errno;
00450 Thread::testCancel();
00451
00452 gettimeofday(&now, NULL);
00453 }
00454
00455 if (rc < 0)
00456 {
00457 if (lerrno == EINTR)
00458 {
00459 #ifdef BLOCXX_NETWARE
00460
00461
00462
00463
00464 pthread_yield();
00465 #endif
00466 return Select::SELECT_INTERRUPTED;
00467 }
00468 else
00469 {
00470 return Select::SELECT_ERROR;
00471 }
00472 }
00473 if (rc == 0)
00474 {
00475 return Select::SELECT_TIMEOUT;
00476 }
00477 int availableCount = 0;
00478 int cval;
00479 for (size_t i = 0; i < selarray.size(); i++)
00480 {
00481 selarray[i].wasError = false;
00482 cval = 0;
00483 if (FD_ISSET(selarray[i].s, &ifds))
00484 {
00485 selarray[i].readAvailable = true;
00486 cval = 1;
00487 }
00488 else
00489 {
00490 selarray[i].readAvailable = false;
00491 }
00492
00493 if (FD_ISSET(selarray[i].s, &ofds))
00494 {
00495 selarray[i].writeAvailable = true;
00496 cval = 1;
00497 }
00498 else
00499 {
00500 selarray[i].writeAvailable = false;
00501 }
00502
00503 availableCount += cval;
00504
00505 }
00506
00507 return availableCount;
00508 #else
00509 return SELECT_NOT_IMPLEMENTED;
00510 #endif
00511 }
00512
00513 int
00514 selectRW(SelectObjectArray& selarray, UInt32 ms)
00515 {
00516 int rv = selectRWEpoll(selarray, ms);
00517 if (rv != SELECT_NOT_IMPLEMENTED)
00518 {
00519 return rv;
00520 }
00521
00522 rv = selectRWPoll(selarray, ms);
00523 if (rv != SELECT_NOT_IMPLEMENTED)
00524 {
00525 return rv;
00526 }
00527
00528 rv = selectRWSelect(selarray, ms);
00529 BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00530 return rv;
00531 }
00532
00534 #endif // #else BLOCXX_WIN32
00535
00536 int
00537 select(const SelectTypeArray& selarray, UInt32 ms)
00538 {
00539 SelectObjectArray soa;
00540 soa.reserve(selarray.size());
00541 for (size_t i = 0; i < selarray.size(); ++i)
00542 {
00543 SelectObject curObj(selarray[i]);
00544 curObj.waitForRead = true;
00545 soa.push_back(curObj);
00546 }
00547 int rv = selectRW(soa, ms);
00548 if (rv < 0)
00549 {
00550 return rv;
00551 }
00552
00553
00554 for (size_t i = 0; i < soa.size(); ++i)
00555 {
00556 if (soa[i].readAvailable)
00557 {
00558 return i;
00559 }
00560 }
00561 return SELECT_ERROR;
00562 }
00563
00564 }
00565
00566 }
00567