00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017
00018 #include "assa/Reactor.h"
00019 #include "assa/Logger.h"
00020
00021 using namespace ASSA;
00022
00023 Reactor::
00024 Reactor () :
00025 m_fd_setsize (1024),
00026 m_maxfd_plus1 (0),
00027 m_active (true)
00028 {
00029 trace_with_mask("Reactor::Reactor",REACTTRACE);
00030
00034 #if defined(WIN32)
00035 m_fd_setsize = FD_SETSIZE;
00036
00037 #else // POSIX
00038 struct rlimit rlim;
00039 rlim.rlim_max = 0;
00040
00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042 m_fd_setsize = rlim.rlim_cur;
00043 }
00044 #endif
00045
00048 #if defined (WIN32)
00049 WSADATA data;
00050 WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }
00053
00054 Reactor::
00055 ~Reactor()
00056 {
00057 trace_with_mask("Reactor::~Reactor",REACTTRACE);
00058
00059 m_readSet.clear ();
00060 m_writeSet.clear ();
00061 m_exceptSet.clear ();
00062 deactivate ();
00063 }
00064
00065 TimerId
00066 Reactor::
00067 registerTimerHandler (EventHandler* eh_,
00068 const TimeVal& timeout_,
00069 const std::string& name_)
00070 {
00071 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00072 Assure_return (eh_);
00073
00074 TimeVal now (TimeVal::gettimeofday());
00075 TimeVal t (now + timeout_);
00076
00077 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
00078 timeout_.sec(),timeout_.msec()));
00079 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00080 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00081
00082 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);
00083
00084 DL((REACT,"---Modified Timer Queue----\n"));
00085 m_tqueue.dump();
00086 DL((REACT,"---------------------------\n"));
00087
00088 return (tid);
00089 }
00090
00091 bool
00092 Reactor::
00093 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
00094 {
00095 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00096
00097 std::ostringstream msg;
00098 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00099
00100 if (isReadEvent (et_))
00101 {
00102 if (!m_waitSet.m_rset.setFd (fd_))
00103 {
00104 DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00105 return (false);
00106 }
00107 m_readSet[fd_] = eh_;
00108 msg << "READ_EVENT";
00109 }
00110
00111 if (isWriteEvent (et_))
00112 {
00113 if (!m_waitSet.m_wset.setFd (fd_))
00114 {
00115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00116 return (false);
00117 }
00118 m_writeSet[fd_] = eh_;
00119 msg << " WRITE_EVENT";
00120 }
00121
00122 if (isExceptEvent (et_))
00123 {
00124 if (!m_waitSet.m_eset.setFd (fd_))
00125 {
00126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00127 return (false);
00128 }
00129 m_exceptSet[fd_] = eh_;
00130 msg << " EXCEPT_EVENT";
00131 }
00132 msg << std::ends;
00133
00134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
00135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00136
00137 #if !defined (WIN32)
00138 if (m_maxfd_plus1 < fd_+1) {
00139 m_maxfd_plus1 = fd_+1;
00140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00141 }
00142 #endif
00143
00144 DL((REACT,"Modified waitSet:\n"));
00145 m_waitSet.dump ();
00146
00147 return (true);
00148 }
00149
00150 bool
00151 Reactor::
00152 removeTimerHandler (TimerId tid_)
00153 {
00154 trace_with_mask("Reactor::removeTimer",REACTTRACE);
00155 bool ret;
00156
00157 if ((ret = m_tqueue.remove (tid_))) {
00158 DL((REACT,"---Modified Timer Queue----\n"));
00159 m_tqueue.dump();
00160 DL((REACT,"---------------------------\n"));
00161 }
00162 else {
00163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00164 }
00165 return (ret);
00166 }
00167
00171 bool
00172 Reactor::
00173 removeHandler (EventHandler* eh_, EventType event_)
00174 {
00175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00176
00177 bool ret = false;
00178 handler_t fd;
00179 Fd2Eh_Map_Iter iter;
00180
00181 if (eh_ == NULL) {
00182 return false;
00183 }
00184
00185 if (isTimeoutEvent (event_)) {
00186 ret = m_tqueue.remove (eh_);
00187 ret = true;
00188 }
00189
00190 if (isReadEvent (event_)) {
00191 iter = m_readSet.begin ();
00192 while (iter != m_readSet.end ()) {
00193 if ((*iter).second == eh_) {
00194 fd = (*iter).first;
00195 m_readSet.erase (iter);
00196 m_waitSet.m_rset.clear (fd);
00197 ret = true;
00198 break;
00199 }
00200 iter++;
00201 }
00202 }
00203
00204 if (isWriteEvent (event_)) {
00205 iter = m_writeSet.begin ();
00206 while (iter != m_writeSet.end ()) {
00207 if ((*iter).second == eh_) {
00208 fd = (*iter).first;
00209 m_writeSet.erase (iter);
00210 m_waitSet.m_wset.clear (fd);
00211 ret = true;
00212 break;
00213 }
00214 iter++;
00215 }
00216 }
00217
00218 if (isExceptEvent (event_)) {
00219 iter = m_exceptSet.begin ();
00220 while (iter != m_exceptSet.end ()) {
00221 if ((*iter).second == eh_) {
00222 fd = (*iter).first;
00223 m_exceptSet.erase (iter);
00224 m_waitSet.m_eset.clear (fd);
00225 ret = true;
00226 break;
00227 }
00228 iter++;
00229 }
00230 }
00231
00232 if (ret == true) {
00233 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
00234 eh_->handle_close (fd);
00235 }
00236
00237 adjust_maxfdp1 (fd);
00238
00239 DL((REACT,"Modifies waitSet:\n"));
00240 m_waitSet.dump ();
00241
00242 return (ret);
00243 }
00244
00245 bool
00246 Reactor::
00247 removeIOHandler (handler_t fd_)
00248 {
00249 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00250
00251 bool ret = false;
00252 EventHandler* ehp = NULL;
00253 Fd2Eh_Map_Iter iter;
00254
00255 Assure_return (ASSA::is_valid_handler (fd_));
00256
00257 DL((REACT,"Removing handler for fd=%d\n",fd_));
00258
00263 if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
00264 {
00265 ehp = (*iter).second;
00266 m_readSet.erase (iter);
00267 m_waitSet.m_rset.clear (fd_);
00268 m_readySet.m_rset.clear (fd_);
00269 if (m_readSet.size () > 0) {
00270 iter = m_readSet.end ();
00271 iter--;
00272 }
00273 ret = true;
00274 }
00275
00276 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
00277 {
00278 ehp = (*iter).second;
00279 m_writeSet.erase (iter);
00280 m_waitSet.m_wset.clear (fd_);
00281 m_readySet.m_wset.clear (fd_);
00282 if (m_writeSet.size () > 0) {
00283 iter = m_writeSet.end ();
00284 iter--;
00285 }
00286 ret = true;
00287 }
00288
00289 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
00290 {
00291 ehp = (*iter).second;
00292 m_exceptSet.erase (iter);
00293 m_waitSet.m_eset.clear (fd_);
00294 m_readySet.m_eset.clear (fd_);
00295 if (m_exceptSet.size () > 0) {
00296 iter = m_exceptSet.end ();
00297 iter--;
00298 }
00299 ret = true;
00300 }
00301
00302 if (ret == true && ehp != NULL) {
00303 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
00304 ehp->handle_close (fd_);
00305 }
00306
00307 adjust_maxfdp1 (fd_);
00308
00309 DL((REACT,"Modifies waitSet:\n"));
00310 m_waitSet.dump ();
00311
00312 return (ret);
00313 }
00314
00315 bool
00316 Reactor::
00317 checkFDs (void)
00318 {
00319 trace_with_mask("Reactor::checkFDs",REACTTRACE);
00320
00321 bool num_removed = false;
00322 FdSet mask;
00323 timeval poll = { 0, 0 };
00324
00325 for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00326 if ( m_readSet[fd] != NULL ) {
00327 mask.setFd (fd);
00328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00329 removeIOHandler (fd);
00330 num_removed = true;
00331 DL((REACT,"Detected BAD FD: %d\n", fd ));
00332 }
00333 mask.clear (fd);
00334 }
00335 }
00336 return (num_removed);
00337 }
00338
00339 bool
00340 Reactor::
00341 handleError (void)
00342 {
00343 trace_with_mask("Reactor::handleError",REACTTRACE);
00344
00347 if ( !m_active ) {
00348 DL((REACT,"Received cmd to stop Reactor\n"));
00349 return (false);
00350 }
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 if ( errno == EINTR ) {
00371 EL((REACT,"EINTR: interrupted select(2)\n"));
00372
00373
00374
00375
00376
00377
00378
00379 return (true);
00380 }
00381
00382
00383
00384
00385
00386
00387 if ( errno == EBADF ) {
00388 DL((REACT,"EBADF: bad file descriptor\n"));
00389 return (checkFDs ());
00390 }
00391
00392
00393
00394 #if defined (WIN32)
00395 DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00396 #else
00397 EL((ASSAERR,"select(3) error\n"));
00398 #endif
00399 return (false);
00400 }
00401
00402 int
00403 Reactor::
00404 isAnyReady (void)
00405 {
00406 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00407
00408 int n = m_readySet.m_rset.numSet () +
00409 m_readySet.m_wset.numSet () +
00410 m_readySet.m_eset.numSet ();
00411
00412 if ( n > 0 ) {
00413 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00414 m_readySet.dump ();
00415 }
00416 return (n);
00417 }
00418
00419 void
00420 Reactor::
00421 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00422 {
00423 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00424
00425 TimeVal now;
00426 TimeVal tv;
00427
00428 if (m_tqueue.isEmpty () ) {
00429 howlong_ = maxwait_;
00430 goto done;
00431 }
00432 now = TimeVal::gettimeofday ();
00433 tv = m_tqueue.top ();
00434
00435 if (tv < now) {
00436
00437
00438
00439
00440
00441 *howlong_ = 0;
00442 }
00443 else {
00444 DL((REACT,"--------- Timer Queue ----------\n"));
00445 m_tqueue.dump();
00446 DL((REACT,"--------------------------------\n"));
00447
00448 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00449 *howlong_ = tv - now;
00450 }
00451 else {
00452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00453 }
00454 }
00455
00456 done:
00457 if (howlong_ != NULL) {
00458 DL((REACT,"delay (%f)\n", double (*howlong_) ));
00459 }
00460 else {
00461 DL((REACT,"delay (forever)\n"));
00462 }
00463 }
00464
00468 void
00469 Reactor::
00470 waitForEvents (void)
00471 {
00472 while ( m_active ) {
00473 waitForEvents ((TimeVal*) NULL);
00474 }
00475 }
00476
00493 void
00494 Reactor::
00495 waitForEvents (TimeVal* tv_)
00496 {
00497 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00498
00499 TimerCountdown traceTime (tv_);
00500 DL((REACT,"======================================\n"));
00501
00502
00503 m_tqueue.expire (TimeVal::gettimeofday ());
00504
00505
00506
00507
00508 if (!m_active) {
00509 return;
00510 }
00511
00512 int nReady;
00513 TimeVal delay;
00514 TimeVal* dlp = &delay;
00515
00516
00517
00518
00519
00520
00521
00522
00523 if ((nReady = isAnyReady ())) {
00524 DL((REACT,"isAnyReady returned: %d\n",nReady));
00525 dispatch (nReady);
00526 return;
00527 }
00528
00529 DL((REACT,"=== m_waitSet ===\n"));
00530 m_waitSet.dump ();
00531
00532 do {
00533 m_readySet.reset ();
00534 DL ((REACT,"m_readySet after reset():\n"));
00535 m_readySet.dump ();
00536
00537 m_readySet = m_waitSet;
00538 DL ((REACT,"m_readySet after assign:\n"));
00539 m_readySet.dump ();
00540
00541 calculateTimeout (dlp, tv_);
00542
00543 nReady = ::select (m_maxfd_plus1,
00544 &m_readySet.m_rset,
00545 &m_readySet.m_wset,
00546 &m_readySet.m_eset,
00547 dlp);
00548 DL((REACT,"::select() returned: %d\n",nReady));
00549
00550 m_readySet.sync ();
00551 DL ((REACT,"m_readySet after select:\n"));
00552 m_readySet.dump ();
00553
00554 }
00555 while (nReady < 0 && handleError ());
00556
00557 dispatch (nReady);
00558 }
00559
00566 void
00567 Reactor::
00568 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
00569 {
00570 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00571
00572 int ret = 0;
00573 handler_t fd;
00574 EventHandler* ehp = NULL;
00575 std::string eh_id;
00576
00577 Fd2Eh_Map_Iter iter = fdSet_.begin ();
00578
00579 while (iter != fdSet_.end ())
00580 {
00581 fd = (*iter).first;
00582 ehp = (*iter).second;
00583
00584 if (mask_.isSet (fd) && ehp != NULL)
00585 {
00586 eh_id = ehp->get_id ();
00587 DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00588 eh_id.c_str (), fd));
00589
00590 ret = (ehp->*callback_) (fd);
00591
00592 if (ret == -1) {
00593 removeIOHandler (fd);
00594 }
00595 else if (ret > 0) {
00596 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00597 ret, fd, eh_id.c_str ()));
00598
00599 }
00600 else {
00601 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
00602 eh_id.c_str (), fd));
00603 mask_.clear (fd);
00604 }
00611 iter = fdSet_.begin ();
00612 }
00613 else {
00614 iter++;
00615 }
00616 }
00617 }
00618
00624 bool
00625 Reactor::
00626 dispatch (int ready_)
00627 {
00628 trace_with_mask("Reactor::dispatch", REACTTRACE);
00629
00630 m_tqueue.expire (TimeVal::gettimeofday ());
00631
00632 if ( ready_ < 0 )
00633 {
00634 #if !defined (WIN32)
00635 EL((ASSAERR,"::select(3) error\n"));
00636 #endif
00637 return (false);
00638 }
00639 if ( ready_ == 0 ) {
00640 return (true);
00641 }
00642
00643 DL((REACT,"Dispatching %d FDs.\n",ready_));
00644 DL((REACT,"m_readySet:\n"));
00645 m_readySet.dump ();
00646
00647
00648 dispatchHandler (m_readySet.m_wset,
00649 m_writeSet,
00650 &EventHandler::handle_write);
00651
00652
00653 dispatchHandler (m_readySet.m_eset,
00654 m_exceptSet,
00655 &EventHandler::handle_except);
00656
00657
00658 dispatchHandler (m_readySet.m_rset,
00659 m_readSet,
00660 &EventHandler::handle_read);
00661
00662 return (true);
00663 }
00664
00665 void
00666 Reactor::
00667 stopReactor (void)
00668 {
00669 trace_with_mask("Reactor::stopReactor", REACTTRACE);
00670
00671 m_active = false;
00672
00673 Fd2Eh_Map_Iter iter;
00674 EventHandler* ehp;
00675
00676 while (m_readSet.size () > 0) {
00677 iter = m_readSet.begin ();
00678 ehp = (*iter).second;
00679 removeHandler (ehp);
00680 }
00681
00682 while (m_writeSet.size () > 0) {
00683 iter = m_writeSet.begin ();
00684 ehp = (*iter).second;
00685 removeHandler (ehp);
00686 }
00687
00688 while (m_exceptSet.size () > 0) {
00689 iter = m_exceptSet.begin ();
00690 ehp = (*iter).second;
00691 removeHandler (ehp);
00692 }
00693 }
00694
00699 void
00700 Reactor::
00701 adjust_maxfdp1 (handler_t fd_)
00702 {
00703 #if !defined (WIN32)
00704
00705 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00706
00707 if (m_maxfd_plus1 == fd_ + 1)
00708 {
00709 m_maxfd_plus1 = m_waitSet.max_fd () + 1;
00710 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00711 }
00712 #endif
00713 }