libassa  3.5.1
ASSA::Reactor Class Reference

#include <Reactor.h>

Public Member Functions

 Reactor ()
 Constructor.

 ~Reactor ()
 Destructor.

TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.

bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.

bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.

bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.

bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.

void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.

void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.

void stopReactor (void)
 Stop Reactor's activity.

void deactivate (void)
 Deactivate Reactor.

Private Types

typedef std::map< u_int, EventHandler * > Fd2Eh_Map_Type
 no cloning

typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter

Private Member Functions

 Reactor (const Reactor &)

Reactor & operator= (const Reactor &)
 no cloning

void adjust_maxfdp1 (handler_t fd_)
 Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).

bool handleError (void)
 Handle error in select(2) loop appropriately.

bool dispatch (int minimum_)
 Notify all EventHandlers registered for the respective events that have occurred.

int isAnyReady (void)
 Return number of file descriptors ready across all sets.

bool checkFDs (void)
 Check mask for bad file descriptors.

void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.

void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.
 

Private Attributes

int m_fd_setsize
 Max number of open files per process.

handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.

bool m_active
 Flag that indicates whether Reactor is active or has been stopped.

Fd2Eh_Map_Type m_readSet
 Event handlers waiting for READ_EVENT.

Fd2Eh_Map_Type m_writeSet
 Event handlers waiting for WRITE_EVENT.

Fd2Eh_Map_Type m_exceptSet
 Event handlers waiting for EXCEPT_EVENT.

MaskSet m_waitSet
 Handlers to wait for event on.

MaskSet m_readySet
 Handlers that are ready for processing.

TimerQueue m_tqueue
 The queue of Timers.
 

Detailed Description

Definition at line 57 of file Reactor.h.

Member Typedef Documentation

◆ Fd2Eh_Map_Iter

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter
private

Definition at line 155 of file Reactor.h.

◆ Fd2Eh_Map_Type

typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type
private

no cloning

Definition at line 154 of file Reactor.h.

Constructor & Destructor Documentation

◆ Reactor() [1/2]

Reactor::Reactor ( )

Constructor.

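Sets the maximum number of sockets supported per process. Win32 defines it as 64 in winsock2.h; on POSIX systems the value is taken from the soft RLIMIT_NOFILE limit reported by getrlimit(), with 1024 as the fallback.

On Win32, the constructor also initializes the winsock2 library.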

Definition at line 23 of file Reactor.cpp.

Reactor::Reactor ()
    :
    m_fd_setsize (1024),
    m_maxfd_plus1 (0),
    m_active (true)
{
    trace_with_mask("Reactor::Reactor",REACTTRACE);

#if defined(WIN32)
    m_fd_setsize = FD_SETSIZE;

#else // POSIX
    struct rlimit rlim;
    rlim.rlim_max = 0;

    if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
        m_fd_setsize = rlim.rlim_cur;
    }
#endif

#if defined (WIN32)
    WSADATA data;
    WSAStartup (MAKEWORD (2, 2), &data);
#endif
}

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.
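
The POSIX branch of the constructor can be tried in isolation. The following is a minimal sketch, not libassa code: query_fd_limit is a hypothetical helper name, and the sketch assumes a POSIX system where getrlimit(2) is available.

```cpp
#include <sys/resource.h>  // getrlimit, RLIMIT_NOFILE (POSIX)

// Hypothetical helper: returns the soft limit on open files for this
// process, or `fallback` if the limit cannot be queried.
long query_fd_limit(long fallback)
{
    struct rlimit rlim;
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
        return fallback;                 // query failed: keep the default
    }
    if (rlim.rlim_cur == RLIM_INFINITY) {
        return fallback;                 // no finite soft limit to report
    }
    return static_cast<long>(rlim.rlim_cur);
}
```

Calling query_fd_limit(1024) mirrors the role of the 1024 default in the initializer list: the default survives only when the system call fails.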

◆ ~Reactor()

Reactor::~Reactor ( )

Destructor.

Definition at line 54 of file Reactor.cpp.

Reactor::~Reactor ()
{
    trace_with_mask("Reactor::~Reactor",REACTTRACE);

    m_readSet.clear ();
    m_writeSet.clear ();
    m_exceptSet.clear ();
    deactivate ();
}

References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

◆ Reactor() [2/2]

ASSA::Reactor::Reactor ( const Reactor & )
private

Member Function Documentation

◆ adjust_maxfdp1()

void Reactor::adjust_maxfdp1 ( handler_t  fd_)
private

Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).

If the descriptor that has just been removed was the highest one (that is, maxfd+1 equals fd_ + 1), maxfd+1 is recomputed from the highest descriptor remaining in the wait set.

The Win32 implementation of select() ignores this value altogether.

Definition at line 700 of file Reactor.cpp.

void Reactor::adjust_maxfdp1 (handler_t fd_)
{
#if !defined (WIN32) /* POSIX */

    trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);

    if (m_maxfd_plus1 == fd_ + 1)
    {
        m_maxfd_plus1 = m_waitSet.max_fd () + 1;
        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif
}

References DL, m_maxfd_plus1, m_waitSet, ASSA::MaskSet::max_fd(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by removeHandler(), and removeIOHandler().
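
The bookkeeping described above can be illustrated without libassa. This is a sketch under stated assumptions: FdTracker is a hypothetical stand-in for the wait set, and resetting the bound to 0 when no descriptors remain is an assumption of the sketch, not documented behavior of MaskSet::max_fd().

```cpp
#include <set>

// Hypothetical stand-in for the wait set: tracks registered descriptors
// and the "highest descriptor plus one" bound that select(2) expects.
struct FdTracker {
    std::set<int> fds;
    int maxfd_plus1 = 0;

    void add(int fd) {
        fds.insert(fd);
        if (maxfd_plus1 < fd + 1) {
            maxfd_plus1 = fd + 1;
        }
    }

    void remove(int fd) {
        fds.erase(fd);
        // Only removing the current highest descriptor changes the bound.
        if (maxfd_plus1 == fd + 1) {
            maxfd_plus1 = fds.empty() ? 0 : *fds.rbegin() + 1;
        }
    }
};
```

Removing a descriptor that is not the highest leaves the bound untouched, which is why the real method compares m_maxfd_plus1 with fd_ + 1 before doing any work.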

◆ calculateTimeout()

void Reactor::calculateTimeout ( TimeVal *&  howlong_,
TimeVal *  maxwait_ 
)
private

Calculate closest timeout.

If the TimerQueue is not empty, howlong_ is set to the smaller of maxwait_ and the time remaining until the first timer in the queue expires. Otherwise, howlong_ is set to maxwait_.

Parameters
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 420 of file Reactor.cpp.

void Reactor::calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
{
    trace_with_mask("Reactor::calculateTimeout",REACTTRACE);

    TimeVal now;
    TimeVal tv;

    if (m_tqueue.isEmpty () ) {
        howlong_ = maxwait_;
        goto done;
    }
    now = TimeVal::gettimeofday ();
    tv = m_tqueue.top ();

    if (tv < now) {
        /*---
          It took too long to get here (fraction of a millisecond),
          and top timer had already expired. In this case,
          perform non-blocking select in order to drain the timer queue.
          ---*/
        *howlong_ = 0;
    }
    else {
        DL((REACT,"--------- Timer Queue ----------\n"));
        m_tqueue.dump();
        DL((REACT,"--------------------------------\n"));

        if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
            *howlong_ = tv - now;
        }
        else {
            *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
        }
    }

 done:
    if (howlong_ != NULL) {
        DL((REACT,"delay (%f)\n", double (*howlong_) ));
    }
    else {
        DL((REACT,"delay (forever)\n"));
    }
}

References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().

Referenced by waitForEvents().
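
The timeout rule can be restated with standard library types. The sketch below is an illustration only: next_wait, Clock and Duration are hypothetical names, std::optional stands in for the nullable TimeVal pointers, and C++17 is assumed.

```cpp
#include <algorithm>
#include <chrono>
#include <optional>

using Clock    = std::chrono::steady_clock;
using Duration = std::chrono::milliseconds;

// std::nullopt plays the role of a NULL timeout pointer: block forever.
std::optional<Duration> next_wait(std::optional<Clock::time_point> next_timer,
                                  std::optional<Duration> max_wait,
                                  Clock::time_point now)
{
    if (!next_timer) {
        return max_wait;                 // no timers: caller's limit applies
    }
    if (*next_timer < now) {
        return Duration::zero();         // timer already expired: just poll
    }
    Duration until_timer =
        std::chrono::duration_cast<Duration>(*next_timer - now);
    if (!max_wait || *max_wait == Duration::zero()) {
        return until_timer;              // mirrors the NULL / zero-time branch
    }
    return std::min(*max_wait, until_timer);
}
```

As in the listing, a zero maximum wait combined with a pending timer yields the time until that timer rather than an immediate return.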

◆ checkFDs()

bool Reactor::checkFDs ( void  )
private

Check mask for bad file descriptors.

Returns
true if any bad fd(s) were found and removed; false otherwise

Definition at line 316 of file Reactor.cpp.

bool Reactor::checkFDs (void)
{
    trace_with_mask("Reactor::checkFDs",REACTTRACE);

    bool num_removed = false;
    FdSet mask;
    timeval poll = { 0, 0 };

    for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
        if ( m_readSet[fd] != NULL ) {
            mask.setFd (fd);
            if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
                removeIOHandler (fd);
                num_removed = true;
                DL((REACT,"Detected BAD FD: %d\n", fd ));
            }
            mask.clear (fd);
        }
    }
    return (num_removed);
}

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

◆ deactivate()

void ASSA::Reactor::deactivate ( void  )
inline

Deactivate Reactor.

This function sets an internal flag that tells the Reactor's internal event handling loop to abort its activity. It is mostly used when a slow system call is interrupted by a signal handler: the OS restarts the system call after control returns from the signal handler, so the signal handler (GenServer::handle_signal()) should call this method to request a delayed deactivation of the Reactor.

Definition at line 234 of file Reactor.h.

234 { m_active = false; }

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().
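
The flag-based shutdown pattern described above can be sketched with the standard signal facilities. All names here (g_active, on_terminate, run_until_deactivated) are hypothetical; the sketch only shows the general idea of a handler that sets a flag while the loop decides when to stop.

```cpp
#include <csignal>

// Hypothetical stand-in for Reactor::m_active. sig_atomic_t is the type
// a signal handler may safely write.
volatile std::sig_atomic_t g_active = 1;

extern "C" void on_terminate(int /*signum*/)
{
    g_active = 0;       // only set the flag; the loop notices it later
}

// Runs `step` until the flag is cleared; returns the number of iterations.
template <typename Step>
int run_until_deactivated(Step step)
{
    int iterations = 0;
    while (g_active) {
        step();
        ++iterations;
    }
    return iterations;
}
```

A caller would install the handler with std::signal(SIGTERM, on_terminate) before entering the loop. The current step always runs to completion, which is the delayed deactivation the description refers to.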

◆ dispatch()

bool Reactor::dispatch ( int  minimum_)
private

Notify all EventHandlers registered for the respective events that have occurred.

Many UNIX systems will count a particular file descriptor in ready_ only ONCE, even if it was flagged by select(2) in, say, both read and write masks.

Parameters
minimum_ number of file descriptors ready (named ready_ in the definition).

Definition at line 625 of file Reactor.cpp.

bool Reactor::dispatch (int ready_)
{
    trace_with_mask("Reactor::dispatch", REACTTRACE);

    m_tqueue.expire (TimeVal::gettimeofday ());

    if ( ready_ < 0 )
    {
#if !defined (WIN32)
        EL((ASSAERR,"::select(3) error\n"));
#endif
        return (false);
    }
    if ( ready_ == 0 ) {
        return (true);
    }

    DL((REACT,"Dispatching %d FDs.\n",ready_));
    DL((REACT,"m_readySet:\n"));
    m_readySet.dump ();

    /*--- Writes first ---*/
    dispatchHandler (m_readySet.m_wset,
                     m_writeSet,
                     &EventHandler::handle_write);

    /*--- Exceptions next ---*/
    dispatchHandler (m_readySet.m_eset,
                     m_exceptSet,
                     &EventHandler::handle_except);

    /*--- Finally, the Reads ---*/
    dispatchHandler (m_readySet.m_rset,
                     m_readSet,
                     &EventHandler::handle_read);

    return (true);
}

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

◆ dispatchHandler()

void Reactor::dispatchHandler ( FdSet &  mask_,
Fd2Eh_Map_Type &  fdSet_,
EH_IO_Callback  callback_ 
)
private

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

This spot needs re-thinking.

When several high data-rate connections send data at the same time, the one that connected first gets the lowest FD number and is therefore served in preference to every connection made later.

WIN32 HACK: The scan is restarted from the beginning after every callback. Without the restart, firing an EventHandler callback might invalidate the iterator and cause a crash (this happens with Connectors in sync mode).

Definition at line 567 of file Reactor.cpp.

void Reactor::dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
{
    trace_with_mask("Reactor::dispatchHandler",REACTTRACE);

    int ret = 0;
    handler_t fd;
    EventHandler* ehp = NULL;
    std::string eh_id;

    Fd2Eh_Map_Iter iter = fdSet_.begin ();

    while (iter != fdSet_.end ())
    {
        fd = (*iter).first;
        ehp = (*iter).second;

        if (mask_.isSet (fd) && ehp != NULL)
        {
            eh_id = ehp->get_id ();
            DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
                eh_id.c_str (), fd));

            ret = (ehp->*callback_) (fd); /* Fire up a callback */

            if (ret == -1) {
                removeIOHandler (fd);
            }
            else if (ret > 0) {
                DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
                    ret, fd, eh_id.c_str ()));
                //return; <-- would starve other connections
            }
            else {
                DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
                    eh_id.c_str (), fd));
                mask_.clear (fd);
            }
            iter = fdSet_.begin ();
        }
        else {
            iter++;
        }
    }
}

References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().
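
The restart-after-callback scan can be shown with standard containers. This is a sketch, not the library's code: dispatch_ready, Callback and HandlerMap are hypothetical names, a std::set<int> stands in for the FdSet mask, and any negative return value (not only -1) is treated as a removal request.

```cpp
#include <functional>
#include <map>
#include <set>

// A callback returns <0 to be removed, 0 when all data is consumed,
// and >0 when data is still pending.
using Callback   = std::function<int(int fd)>;
using HandlerMap = std::map<int, Callback>;

void dispatch_ready(std::set<int>& ready, HandlerMap& handlers)
{
    auto iter = handlers.begin();
    while (iter != handlers.end()) {
        int fd = iter->first;
        if (ready.count(fd) == 0) {
            ++iter;
            continue;
        }
        Callback cb = iter->second;   // copy: the callback may erase its entry
        int ret = cb(fd);
        if (ret < 0) {
            handlers.erase(fd);       // handler asked to be removed
            ready.erase(fd);
        } else if (ret == 0) {
            ready.erase(fd);          // nothing pending: stop revisiting fd
        }
        // The callback may have changed the map, so restart the scan
        // instead of trusting the old iterator.
        iter = handlers.begin();
    }
}
```

A handler that keeps returning a positive value is called again on the next pass of the scan, ahead of higher-numbered descriptors, which is the preference for early connections that the description mentions.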

◆ handleError()

bool Reactor::handleError ( void  )
private

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 340 of file Reactor.cpp.

bool Reactor::handleError (void)
{
    trace_with_mask("Reactor::handleError",REACTTRACE);

    if ( !m_active ) {
        DL((REACT,"Received cmd to stop Reactor\n"));
        return (false);
    }

    /*---
      TODO: If select(2) returns before time expires, with
      a descriptor ready or with EINTR, timeval is not
      going to be updated with number of seconds remaining.
      This is true for all systems except Linux, which will
      do so. Therefore, to restart correctly in case of
      EINTR, we ought to take time measurement before and
      after select, and try to select() for remaining time.

      For now, we restart with the initial timing value.
      ---*/
    /*---
      BSD kernel never restarts select(2). SVR4 will restart if
      the SA_RESTART flag is specified when the signal handler
      for the signal delivered is installed. This means that for
      portability, we must handle signal interrupts.
      ---*/

    if ( errno == EINTR ) {
        EL((REACT,"EINTR: interrupted select(2)\n"));
        /*
          If I was sitting in select(2) and received SIGTERM,
          the signal handler would have set m_active to 'false',
          and this function would have returned 'false' as above.
          For any other non-critical signals (USR1,...),
          we retry select.
        */
        return (true);
    }
    /*
      EBADF - bad file number. One of the file descriptors does
      not reference an open file to open(), close(), ioctl().
      This can happen if user closed fd and forgot to remove
      handler from Reactor.
    */
    if ( errno == EBADF ) {
        DL((REACT,"EBADF: bad file descriptor\n"));
        return (checkFDs ());
    }
    /*
      Any other error from select
    */
#if defined (WIN32)
    DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
#else
    EL((ASSAERR,"select(3) error\n"));
#endif
    return (false);
}

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().
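
The decision made after a failed select(2) can be written as a pure function, which makes the three outcomes easy to see. SelectAction and classify_select_error are hypothetical names used for illustration; the real method returns a bool and calls checkFDs() itself.

```cpp
#include <cerrno>

enum class SelectAction { Stop, Retry, CheckDescriptors };

// `active` stands in for m_active, `err` for the errno left by select(2).
SelectAction classify_select_error(bool active, int err)
{
    if (!active) {
        return SelectAction::Stop;              // deactivated by a signal handler
    }
    if (err == EINTR) {
        return SelectAction::Retry;             // harmless signal: select again
    }
    if (err == EBADF) {
        return SelectAction::CheckDescriptors;  // retry only if a bad fd was removed
    }
    return SelectAction::Stop;                  // any other error
}
```

The m_active test comes first, so an EINTR caused by a terminating signal stops the loop instead of retrying.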

◆ isAnyReady()

int Reactor::isAnyReady ( void  )
private

Return number of file descriptors ready across all sets.

Definition at line 403 of file Reactor.cpp.

int Reactor::isAnyReady (void)
{
    trace_with_mask("Reactor::isAnyReady",REACTTRACE);

    int n = m_readySet.m_rset.numSet () +
        m_readySet.m_wset.numSet () +
        m_readySet.m_eset.numSet ();

    if ( n > 0 ) {
        DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
        m_readySet.dump ();
    }
    return (n);
}

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

◆ operator=()

Reactor& ASSA::Reactor::operator= ( const Reactor & )
private

no cloning

◆ registerIOHandler()

bool Reactor::registerIOHandler ( EventHandler *  eh_,
handler_t  fd_,
EventType  et_ = RWE_EVENTS 
)

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns
true if success, false if error

Definition at line 92 of file Reactor.cpp.

bool Reactor::registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
{
    trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);

    std::ostringstream msg;
    Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));

    if (isReadEvent (et_))
    {
        if (!m_waitSet.m_rset.setFd (fd_))
        {
            DL((ASSAERR,"readset: fd %d out of range\n", fd_));
            return (false);
        }
        m_readSet[fd_] = eh_;
        msg << "READ_EVENT";
    }

    if (isWriteEvent (et_))
    {
        if (!m_waitSet.m_wset.setFd (fd_))
        {
            DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
            return (false);
        }
        m_writeSet[fd_] = eh_;
        msg << " WRITE_EVENT";
    }

    if (isExceptEvent (et_))
    {
        if (!m_waitSet.m_eset.setFd (fd_))
        {
            DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
            return (false);
        }
        m_exceptSet[fd_] = eh_;
        msg << " EXCEPT_EVENT";
    }
    msg << std::ends;

    DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
        eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));

#if !defined (WIN32)
    if (m_maxfd_plus1 < fd_+1) {
        m_maxfd_plus1 = fd_+1;
        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif

    DL((REACT,"Modified waitSet:\n"));
    m_waitSet.dump ();

    return (true);
}

References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open().
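
The way one registration call fans out into per-event tables can be sketched as follows. Everything in the block is hypothetical: the bit values are placeholders and are not claimed to match the library's EventType constants, and Registry and Handler are illustration-only types.

```cpp
#include <map>

// Placeholder bit values; the real EventType constants may differ.
enum EventBits : unsigned {
    READ_BIT   = 0x1,
    WRITE_BIT  = 0x2,
    EXCEPT_BIT = 0x4,
    RWE_BITS   = READ_BIT | WRITE_BIT | EXCEPT_BIT
};

struct Handler { const char* name; };

struct Registry {
    std::map<int, Handler*> readers, writers, excepters;
    int maxfd_plus1 = 0;

    // Stores the handler in every table selected by `events`.
    bool add(Handler* h, int fd, unsigned events = RWE_BITS) {
        if (h == nullptr || fd < 0) {
            return false;
        }
        if (events & READ_BIT)   readers[fd]   = h;
        if (events & WRITE_BIT)  writers[fd]   = h;
        if (events & EXCEPT_BIT) excepters[fd] = h;
        if (maxfd_plus1 < fd + 1) {
            maxfd_plus1 = fd + 1;
        }
        return true;
    }
};
```

Keeping one table per event type lets the dispatcher walk only the handlers that care about a given kind of readiness.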

◆ registerTimerHandler()

TimerId Reactor::registerTimerHandler ( EventHandler *  eh_,
const TimeVal &  tv_,
const std::string &  name_ = "<unknown>" 
)

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters
eh_ Pointer to the EventHandler
tv_ Timeout value (named timeout_ in the definition)
name_ Name of the timer
Returns
Timer ID that can be used to cancel timer and find out its name.

Definition at line 66 of file Reactor.cpp.

TimerId Reactor::registerTimerHandler (EventHandler* eh_, const TimeVal& timeout_, const std::string& name_)
{
    trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
    Assure_return (eh_);

    TimeVal now (TimeVal::gettimeofday ());
    TimeVal t (now + timeout_);

    DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
        timeout_.sec(),timeout_.msec()));
    DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
    DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));

    TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);

    DL((REACT,"---Modified Timer Queue----\n"));
    m_tqueue.dump();
    DL((REACT,"---------------------------\n"));

    return (tid);
}

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.
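
The scheduling step, converting a relative timeout into an absolute expiration time and keeping the earliest timer on top, can be sketched with a standard priority queue. TimerList, Timer and LaterFirst are hypothetical names, and the counter-based ids are an assumption of the sketch, not a description of how TimerId values are produced.

```cpp
#include <chrono>
#include <queue>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

struct Timer {
    unsigned long id;
    Clock::time_point expires;      // absolute time: now + timeout
    std::string name;
};

struct LaterFirst {
    bool operator()(const Timer& a, const Timer& b) const {
        return a.expires > b.expires;   // makes the earliest timer the top
    }
};

class TimerList {
public:
    unsigned long add(Clock::time_point now, Clock::duration timeout,
                      const std::string& name = "<unknown>") {
        m_queue.push(Timer{ ++m_last_id, now + timeout, name });
        return m_last_id;
    }
    bool empty() const { return m_queue.empty(); }
    const Timer& top() const { return m_queue.top(); }
private:
    std::priority_queue<Timer, std::vector<Timer>, LaterFirst> m_queue;
    unsigned long m_last_id = 0;
};
```

Storing the absolute expiration time means the wait loop only has to compare the top element with the current time.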

◆ removeHandler()

bool Reactor::removeHandler ( EventHandler *  eh_,
EventType  et_ = ALL_EVENTS 
)

Remove Event handler from reactor for either all I/O events or timeout event or both.

Removes the handler from all events that match et_ (named event_ in the definition).

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters
eh_ Pointer to the EventHandler
et_ Event Type to remove. Default will remove Event Handler for all events.
Returns
true if success, false if wasn't registered for any events.

Definition at line 172 of file Reactor.cpp.

bool Reactor::removeHandler (EventHandler* eh_, EventType event_)
{
    trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);

    bool ret = false;
    handler_t fd;
    Fd2Eh_Map_Iter iter;

    if (eh_ == NULL) {
        return false;
    }

    if (isTimeoutEvent (event_)) {
        ret = m_tqueue.remove (eh_);
        ret = true;
    }

    if (isReadEvent (event_)) {
        iter = m_readSet.begin ();
        while (iter != m_readSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_readSet.erase (iter);
                m_waitSet.m_rset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    }

    if (isWriteEvent (event_)) {
        iter = m_writeSet.begin ();
        while (iter != m_writeSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_writeSet.erase (iter);
                m_waitSet.m_wset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    }

    if (isExceptEvent (event_)) {
        iter = m_exceptSet.begin ();
        while (iter != m_exceptSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_exceptSet.erase (iter);
                m_waitSet.m_eset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    }

    if (ret == true) {
        DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
        eh_->handle_close (fd);
    }

    adjust_maxfdp1 (fd);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}

References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::RemoteLogger::log_close(), and stopReactor().

◆ removeIOHandler()

bool Reactor::removeIOHandler ( handler_t  fd_)

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters
fd_ File descriptor
Returns
true on success, false if fd_ is out of range

We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().

Definition at line 246 of file Reactor.cpp.

bool Reactor::removeIOHandler (handler_t fd_)
{
    trace_with_mask("Reactor::removeIOHandler",REACTTRACE);

    bool ret = false;
    EventHandler* ehp = NULL;
    Fd2Eh_Map_Iter iter;

    Assure_return (ASSA::is_valid_handler (fd_));

    DL((REACT,"Removing handler for fd=%d\n",fd_));

    if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
    {
        ehp = (*iter).second;
        m_readSet.erase (iter);
        m_waitSet.m_rset.clear (fd_);
        m_readySet.m_rset.clear (fd_);
        if (m_readSet.size () > 0) {
            iter = m_readSet.end ();
            iter--;
        }
        ret = true;
    }

    if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
    {
        ehp = (*iter).second;
        m_writeSet.erase (iter);
        m_waitSet.m_wset.clear (fd_);
        m_readySet.m_wset.clear (fd_);
        if (m_writeSet.size () > 0) {
            iter = m_writeSet.end ();
            iter--;
        }
        ret = true;
    }

    if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
    {
        ehp = (*iter).second;
        m_exceptSet.erase (iter);
        m_waitSet.m_eset.clear (fd_);
        m_readySet.m_eset.clear (fd_);
        if (m_exceptSet.size () > 0) {
            iter = m_exceptSet.end ();
            iter--;
        }
        ret = true;
    }

    if (ret == true && ehp != NULL) {
        DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
        ehp->handle_close (fd_);
    }

    adjust_maxfdp1 (fd_);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}

References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

◆ removeTimerHandler()

bool Reactor::removeTimerHandler ( TimerId  id_)

Remove Timer event from the queue.

This removes one particular timer.

Parameters
id_ Timer Id returned by registerTimerHandler() (named tid_ in the definition).
Returns
true if timer found and removed; false otherwise

Definition at line 151 of file Reactor.cpp.

bool Reactor::removeTimerHandler (TimerId tid_)
{
    trace_with_mask("Reactor::removeTimer",REACTTRACE);
    bool ret;

    if ((ret = m_tqueue.remove (tid_))) {
        DL((REACT,"---Modified Timer Queue----\n"));
        m_tqueue.dump();
        DL((REACT,"---------------------------\n"));
    }
    else {
        EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
    }
    return (ret);
}

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

◆ stopReactor()

void Reactor::stopReactor ( void  )

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from a method other than EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 666 of file Reactor.cpp.

void Reactor::stopReactor (void)
{
    trace_with_mask("Reactor::stopReactor", REACTTRACE);

    m_active = false;

    Fd2Eh_Map_Iter iter;
    EventHandler* ehp;

    while (m_readSet.size () > 0) {
        iter = m_readSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_writeSet.size () > 0) {
        iter = m_writeSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_exceptSet.size () > 0) {
        iter = m_exceptSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }
}

References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

◆ waitForEvents() [1/2]

void Reactor::waitForEvents ( TimeVal *  tv_)

Wait for events for time specified.

Passing NULL replicates the behavior of waitForEvents(void). Passing tv_ {0, 0} causes non-blocking polling for all events. This method blocks for up to the tv_ time interval while processing events. If an event occurs, it processes the event(s) and returns. tv_ is adjusted by subtracting the time spent in event processing.

Parameters
tv_ [RW] is time to wait for.

+----------+--------+-----------------------+--------------------------+
| select() | errno  | Events                | Behavior                 |
+----------+--------+-----------------------+--------------------------+
| < 0      | EINTR  | Interrupted by signal | Retry                    |
+----------+--------+-----------------------+--------------------------+
| < 0      | EBADF  | Bad file descriptor   | Remove bad fds and retry |
+----------+--------+-----------------------+--------------------------+
| < 0      | others | Some other error      | Fall through             |
+----------+--------+-----------------------+--------------------------+
| == 0     | 0      | Timed out             | Fall through             |
+----------+--------+-----------------------+--------------------------+
| > 0      | 0      | Got some work to do   | Fall through             |
+----------+--------+-----------------------+--------------------------+

Definition at line 494 of file Reactor.cpp.

void Reactor::waitForEvents (TimeVal* tv_)
{
    trace_with_mask("Reactor::waitForEvents",REACTTRACE);

    TimerCountdown traceTime (tv_);
    DL((REACT,"======================================\n"));

    /*--- Expire all stale Timers ---*/
    m_tqueue.expire (TimeVal::gettimeofday ());

    /* Test to see if Reactor has been deactivated as a result
     * of processing done by any TimerHandlers.
     */
    if (!m_active) {
        return;
    }

    int nReady;
    TimeVal delay;
    TimeVal* dlp = &delay;

    /*---
      In case if not all data have been processed by the EventHandler,
      and EventHandler stated so in its callback's return value
      to dispatcher (), it will be called again. This way
      underlying file/socket stream can efficiently utilize its
      buffering mechanism.
      ---*/
    if ((nReady = isAnyReady ())) {
        DL((REACT,"isAnyReady returned: %d\n",nReady));
        dispatch (nReady);
        return;
    }

    DL((REACT,"=== m_waitSet ===\n"));
    m_waitSet.dump ();

    do {
        m_readySet.reset ();
        DL ((REACT,"m_readySet after reset():\n"));
        m_readySet.dump ();

        m_readySet = m_waitSet;
        DL ((REACT,"m_readySet after assign:\n"));
        m_readySet.dump ();

        calculateTimeout (dlp, tv_);

        nReady = ::select (m_maxfd_plus1,
                           &m_readySet.m_rset,
                           &m_readySet.m_wset,
                           &m_readySet.m_eset,
                           dlp);
        DL((REACT,"::select() returned: %d\n",nReady));

        m_readySet.sync ();
        DL ((REACT,"m_readySet after select:\n"));
        m_readySet.dump ();

    }
    while (nReady < 0 && handleError ());

    dispatch (nReady);
}
void sync()
Resync internals after select() call.
Definition: MaskSet.h:52
void reset()
Clear all bits in all sets.
Definition: MaskSet.h:62
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
Definition: Reactor.cpp:421
bool handleError(void)
Handle error in select(2) loop appropriately.
Definition: Reactor.cpp:341
bool dispatch(int minimum_)
Notify all EventHandlers registered on the respective events that occurred.
Definition: Reactor.cpp:626
int isAnyReady(void)
Return number of file descriptors ready across all sets.
Definition: Reactor.cpp:404

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

◆ waitForEvents() [2/2]

void Reactor::waitForEvents ( void  )

Main waiting loop that blocks indefinitely processing events.

Block forever version.

Definition at line 469 of file Reactor.cpp.

void Reactor::waitForEvents (void)
{
    while ( m_active ) {
        waitForEvents ((TimeVal*) NULL);
    }
}
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
Definition: Reactor.cpp:470

References m_active.
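
The block-forever loop ends only when something running inside a single pass clears the active flag. A toy model of that control flow, with a hypothetical MiniLoop class that is not part of libassa and does no real I/O:

```cpp
// Hypothetical miniature loop, for illustration only.
class MiniLoop {
public:
    explicit MiniLoop(int stopAfter)
        : m_active(true), m_passes(0), m_stopAfter(stopAfter) {}

    // One bounded pass. Here the "handler" simply counts passes and
    // clears the flag once enough of them have run.
    void waitOnce() {
        if (++m_passes >= m_stopAfter) {
            m_active = false;
        }
    }

    // Block-forever version: keep making passes while still active.
    void run() {
        while (m_active) {
            waitOnce();
        }
    }

    int  passes() const { return m_passes; }
    bool active() const { return m_active; }

private:
    bool m_active;
    int  m_passes;
    int  m_stopAfter;
};
```

Constructing MiniLoop with 3 and calling run() performs three passes and then returns, because the third pass clears the flag that the while condition tests.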

Member Data Documentation

◆ m_active

bool ASSA::Reactor::m_active
private

Flag that indicates whether Reactor is active or has been stopped.

Definition at line 209 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().

◆ m_exceptSet

Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet
private

Event handlers waiting on EXCEPT_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

◆ m_fd_setsize

int ASSA::Reactor::m_fd_setsize
private

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 200 of file Reactor.h.

Referenced by checkFDs(), and Reactor().
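
On POSIX systems the soft and hard limits on open files can also be read programmatically with getrlimit(2). The sketch below assumes that route; the helper name openFileLimits is hypothetical, and nothing here claims that Reactor obtains its value this way.

```cpp
#include <sys/resource.h>

// Reads the per-process limits on open file descriptors.
// Returns false if the query fails; on success fills both outputs.
inline bool openFileLimits(rlim_t& soft, rlim_t& hard) {
    rlimit rl;
    if (::getrlimit(RLIMIT_NOFILE, &rl) != 0) {
        return false;
    }
    soft = rl.rlim_cur;  // soft limit, adjustable up to the hard limit
    hard = rl.rlim_max;  // hard limit, the ceiling for the soft limit
    return true;
}
```

The soft value corresponds to what the ulimit/limit shell utilities report for open files.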

◆ m_maxfd_plus1

handler_t ASSA::Reactor::m_maxfd_plus1
private

Max file descriptor number (in all sets) plus 1.

This value is ignored by the WIN32 implementation of select().

Definition at line 206 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().
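
As an illustration of what "max file descriptor plus 1" means for select(2), the value can be derived from maps keyed by descriptor number. This is a sketch under assumptions: FdMap and maxFdPlusOne are hypothetical names, the mapped type is a placeholder, and adjust_maxfdp1() may compute the value differently.

```cpp
#include <algorithm>
#include <initializer_list>
#include <map>

// Hypothetical descriptor-to-handler map; the mapped type is a placeholder.
using FdMap = std::map<unsigned int, const char*>;

// Returns the highest descriptor found in any of the three maps plus one,
// or 0 when all of them are empty.
inline int maxFdPlusOne(const FdMap& readers,
                        const FdMap& writers,
                        const FdMap& excepts) {
    int maxfd = -1;
    for (const FdMap* m : {&readers, &writers, &excepts}) {
        if (!m->empty()) {
            // std::map is ordered, so the last key is the largest.
            maxfd = std::max(maxfd, static_cast<int>(m->rbegin()->first));
        }
    }
    return maxfd + 1;
}
```

With descriptors 3, 7 and 5 spread over the three maps, the function yields 8, which is the value select() expects as its first argument on POSIX systems.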

◆ m_readSet

Fd2Eh_Map_Type ASSA::Reactor::m_readSet
private

Event handlers waiting on READ_EVENT.

Definition at line 212 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

◆ m_readySet

MaskSet ASSA::Reactor::m_readySet
private

Handlers that are ready for processing.

Definition at line 224 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

◆ m_tqueue

TimerQueue ASSA::Reactor::m_tqueue
private

The queue of Timers.

Definition at line 227 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

◆ m_waitSet

MaskSet ASSA::Reactor::m_waitSet
private

Handlers to wait for events on.

Definition at line 221 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

◆ m_writeSet

Fd2Eh_Map_Type ASSA::Reactor::m_writeSet
private

Event handlers waiting on WRITE_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().


The documentation for this class was generated from the following files: