libassa  3.5.1
Reactor.cpp
Go to the documentation of this file.
1 // -*- c++ -*-
2 //------------------------------------------------------------------------------
3 // Reactor.cpp
4 //------------------------------------------------------------------------------
5 // Copyright (C) 1997-2002,2005-2007 Vladislav Grinchenko
6 //
7 // This library is free software; you can redistribute it and/or
8 // modify it under the terms of the GNU Library General Public
9 // License as published by the Free Software Foundation; either
10 // version 2 of the License, or (at your option) any later version.
11 //-----------------------------------------------------------------------------
12 // Created: 05/25/1999
13 //-----------------------------------------------------------------------------
14 #include <iostream>
15 #include <sstream>
16 #include <string>
17 
18 #include "assa/Reactor.h"
19 #include "assa/Logger.h"
20 
21 using namespace ASSA;
22 
24 Reactor () :
25  m_fd_setsize (1024),
26  m_maxfd_plus1 (0),
27  m_active (true)
28 {
29  trace_with_mask("Reactor::Reactor",REACTTRACE);
30 
34 #if defined(WIN32)
35  m_fd_setsize = FD_SETSIZE;
36 
37 #else // POSIX
38  struct rlimit rlim;
39  rlim.rlim_max = 0;
40 
41  if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
42  m_fd_setsize = rlim.rlim_cur;
43  }
44 #endif
45 
48 #if defined (WIN32)
49  WSADATA data;
50  WSAStartup (MAKEWORD (2, 2), &data);
51 #endif
52 }
53 
55 ~Reactor()
56 {
57  trace_with_mask("Reactor::~Reactor",REACTTRACE);
58 
59  m_readSet.clear ();
60  m_writeSet.clear ();
61  m_exceptSet.clear ();
62  deactivate ();
63 }
64 
65 TimerId
68  const TimeVal& timeout_,
69  const std::string& name_)
70 {
71  trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
72  Assure_return (eh_);
73 
75  TimeVal t (now + timeout_);
76 
77  DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
78  timeout_.sec(),timeout_.msec()));
79  DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
80  DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
81 
82  TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);
83 
84  DL((REACT,"---Modified Timer Queue----\n"));
85  m_tqueue.dump();
86  DL((REACT,"---------------------------\n"));
87 
88  return (tid);
89 }
90 
91 bool
94 {
95  trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
96 
97  std::ostringstream msg;
98  Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
99 
100  if (isReadEvent (et_))
101  {
102  if (!m_waitSet.m_rset.setFd (fd_))
103  {
104  DL((ASSAERR,"readset: fd %d out of range\n", fd_));
105  return (false);
106  }
107  m_readSet[fd_] = eh_;
108  msg << "READ_EVENT";
109  }
110 
111  if (isWriteEvent (et_))
112  {
113  if (!m_waitSet.m_wset.setFd (fd_))
114  {
115  DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
116  return (false);
117  }
118  m_writeSet[fd_] = eh_;
119  msg << " WRITE_EVENT";
120  }
121 
122  if (isExceptEvent (et_))
123  {
124  if (!m_waitSet.m_eset.setFd (fd_))
125  {
126  DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
127  return (false);
128  }
129  m_exceptSet[fd_] = eh_;
130  msg << " EXCEPT_EVENT";
131  }
132  msg << std::ends;
133 
134  DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
135  eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
136 
137 #if !defined (WIN32)
138  if (m_maxfd_plus1 < fd_+1) {
139  m_maxfd_plus1 = fd_+1;
140  DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
141  }
142 #endif
143 
144  DL((REACT,"Modified waitSet:\n"));
145  m_waitSet.dump ();
146 
147  return (true);
148 }
149 
150 bool
153 {
154  trace_with_mask("Reactor::removeTimer",REACTTRACE);
155  bool ret;
156 
157  if ((ret = m_tqueue.remove (tid_))) {
158  DL((REACT,"---Modified Timer Queue----\n"));
159  m_tqueue.dump();
160  DL((REACT,"---------------------------\n"));
161  }
162  else {
163  EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
164  }
165  return (ret);
166 }
167 
171 bool
173 removeHandler (EventHandler* eh_, EventType event_)
174 {
175  trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
176 
177  bool ret = false;
178  handler_t fd;
179  Fd2Eh_Map_Iter iter;
180 
181  if (eh_ == NULL) {
182  return false;
183  }
184 
185  if (isTimeoutEvent (event_)) {
186  ret = m_tqueue.remove (eh_);
187  ret = true;
188  }
189 
190  if (isReadEvent (event_)) {
191  iter = m_readSet.begin ();
192  while (iter != m_readSet.end ()) {
193  if ((*iter).second == eh_) {
194  fd = (*iter).first;
195  m_readSet.erase (iter);
196  m_waitSet.m_rset.clear (fd);
197  ret = true;
198  break;
199  }
200  iter++;
201  }
202  }
203 
204  if (isWriteEvent (event_)) {
205  iter = m_writeSet.begin ();
206  while (iter != m_writeSet.end ()) {
207  if ((*iter).second == eh_) {
208  fd = (*iter).first;
209  m_writeSet.erase (iter);
210  m_waitSet.m_wset.clear (fd);
211  ret = true;
212  break;
213  }
214  iter++;
215  }
216  }
217 
218  if (isExceptEvent (event_)) {
219  iter = m_exceptSet.begin ();
220  while (iter != m_exceptSet.end ()) {
221  if ((*iter).second == eh_) {
222  fd = (*iter).first;
223  m_exceptSet.erase (iter);
224  m_waitSet.m_eset.clear (fd);
225  ret = true;
226  break;
227  }
228  iter++;
229  }
230  }
231 
232  if (ret == true) {
233  DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
234  eh_->handle_close (fd);
235  }
236 
237  adjust_maxfdp1 (fd);
238 
239  DL((REACT,"Modifies waitSet:\n"));
240  m_waitSet.dump ();
241 
242  return (ret);
243 }
244 
245 bool
248 {
249  trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
250 
251  bool ret = false;
252  EventHandler* ehp = NULL;
253  Fd2Eh_Map_Iter iter;
254 
256 
257  DL((REACT,"Removing handler for fd=%d\n",fd_));
258 
263  if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
264  {
265  ehp = (*iter).second;
266  m_readSet.erase (iter);
267  m_waitSet.m_rset.clear (fd_);
268  m_readySet.m_rset.clear (fd_);
269  if (m_readSet.size () > 0) {
270  iter = m_readSet.end ();
271  iter--;
272  }
273  ret = true;
274  }
275 
276  if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
277  {
278  ehp = (*iter).second;
279  m_writeSet.erase (iter);
280  m_waitSet.m_wset.clear (fd_);
281  m_readySet.m_wset.clear (fd_);
282  if (m_writeSet.size () > 0) {
283  iter = m_writeSet.end ();
284  iter--;
285  }
286  ret = true;
287  }
288 
289  if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
290  {
291  ehp = (*iter).second;
292  m_exceptSet.erase (iter);
293  m_waitSet.m_eset.clear (fd_);
294  m_readySet.m_eset.clear (fd_);
295  if (m_exceptSet.size () > 0) {
296  iter = m_exceptSet.end ();
297  iter--;
298  }
299  ret = true;
300  }
301 
302  if (ret == true && ehp != NULL) {
303  DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
304  ehp->handle_close (fd_);
305  }
306 
307  adjust_maxfdp1 (fd_);
308 
309  DL((REACT,"Modifies waitSet:\n"));
310  m_waitSet.dump ();
311 
312  return (ret);
313 }
314 
315 bool
317 checkFDs (void)
318 {
319  trace_with_mask("Reactor::checkFDs",REACTTRACE);
320 
321  bool num_removed = false;
322  FdSet mask;
323  timeval poll = { 0, 0 };
324 
325  for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
326  if ( m_readSet[fd] != NULL ) {
327  mask.setFd (fd);
328  if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
329  removeIOHandler (fd);
330  num_removed = true;
331  DL((REACT,"Detected BAD FD: %d\n", fd ));
332  }
333  mask.clear (fd);
334  }
335  }
336  return (num_removed);
337 }
338 
339 bool
341 handleError (void)
342 {
343  trace_with_mask("Reactor::handleError",REACTTRACE);
344 
347  if ( !m_active ) {
348  DL((REACT,"Received cmd to stop Reactor\n"));
349  return (false);
350  }
351 
352  /*---
353  TODO: If select(2) returns before time expires, with
354  a descriptor ready or with EINTR, timeval is not
355  going to be updated with number of seconds remaining.
356  This is true for all systems except Linux, which will
357  do so. Therefore, to restart correctly in case of
358  EINTR, we ought to take time measurement before and
359  after select, and try to select() for remaining time.
360 
361  For now, we restart with the initial timing value.
362  ---*/
363  /*---
364  BSD kernel never restarts select(2). SVR4 will restart if
365  the SA_RESTART flag is specified when the signal handler
366  for the signal delivered is installed. This means taht for
367  portability, we must handle signal interrupts.
368  ---*/
369 
370  if ( errno == EINTR ) {
371  EL((REACT,"EINTR: interrupted select(2)\n"));
372  /*
373  If I was sitting in select(2) and received SIGTERM,
374  the signal handler would have set m_active to 'false',
375  and this function would have returned 'false' as above.
376  For any other non-critical signals (USR1,...),
377  we retry select.
378  */
379  return (true);
380  }
381  /*
382  EBADF - bad file number. One of the file descriptors does
383  not reference an open file to open(), close(), ioctl().
384  This can happen if user closed fd and forgot to remove
385  handler from Reactor.
386  */
387  if ( errno == EBADF ) {
388  DL((REACT,"EBADF: bad file descriptor\n"));
389  return (checkFDs ());
390  }
391  /*
392  Any other error from select
393  */
394 #if defined (WIN32)
395  DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
396 #else
397  EL((ASSAERR,"select(3) error\n"));
398 #endif
399  return (false);
400 }
401 
402 int
404 isAnyReady (void)
405 {
406  trace_with_mask("Reactor::isAnyReady",REACTTRACE);
407 
408  int n = m_readySet.m_rset.numSet () +
411 
412  if ( n > 0 ) {
413  DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
414  m_readySet.dump ();
415  }
416  return (n);
417 }
418 
419 void
421 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
422 {
423  trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
424 
425  TimeVal now;
426  TimeVal tv;
427 
428  if (m_tqueue.isEmpty () ) {
429  howlong_ = maxwait_;
430  goto done;
431  }
432  now = TimeVal::gettimeofday ();
433  tv = m_tqueue.top ();
434 
435  if (tv < now) {
436  /*---
437  It took too long to get here (fraction of a millisecond),
438  and top timer had already expired. In this case,
439  perform non-blocking select in order to drain the timer queue.
440  ---*/
441  *howlong_ = 0;
442  }
443  else {
444  DL((REACT,"--------- Timer Queue ----------\n"));
445  m_tqueue.dump();
446  DL((REACT,"--------------------------------\n"));
447 
448  if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
449  *howlong_ = tv - now;
450  }
451  else {
452  *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
453  }
454  }
455 
456  done:
457  if (howlong_ != NULL) {
458  DL((REACT,"delay (%f)\n", double (*howlong_) ));
459  }
460  else {
461  DL((REACT,"delay (forever)\n"));
462  }
463 }
464 
468 void
470 waitForEvents (void)
471 {
472  while ( m_active ) {
473  waitForEvents ((TimeVal*) NULL);
474  }
475 }
476 
493 void
495 waitForEvents (TimeVal* tv_)
496 {
497  trace_with_mask("Reactor::waitForEvents",REACTTRACE);
498 
499  TimerCountdown traceTime (tv_);
500  DL((REACT,"======================================\n"));
501 
502  /*--- Expire all stale Timers ---*/
504 
505  /* Test to see if Reactor has been deactivated as a result
506  * of processing done by any TimerHandlers.
507  */
508  if (!m_active) {
509  return;
510  }
511 
512  int nReady;
513  TimeVal delay;
514  TimeVal* dlp = &delay;
515 
516  /*---
517  In case if not all data have been processed by the EventHandler,
518  and EventHandler stated so in its callback's return value
519  to dispatcher (), it will be called again. This way
520  underlying file/socket stream can efficiently utilize its
521  buffering mechaninsm.
522  ---*/
523  if ((nReady = isAnyReady ())) {
524  DL((REACT,"isAnyReady returned: %d\n",nReady));
525  dispatch (nReady);
526  return;
527  }
528 
529  DL((REACT,"=== m_waitSet ===\n"));
530  m_waitSet.dump ();
531 
532  do {
533  m_readySet.reset ();
534  DL ((REACT,"m_readySet after reset():\n"));
535  m_readySet.dump ();
536 
538  DL ((REACT,"m_readySet after assign:\n"));
539  m_readySet.dump ();
540 
541  calculateTimeout (dlp, tv_);
542 
543  nReady = ::select (m_maxfd_plus1,
547  dlp);
548  DL((REACT,"::select() returned: %d\n",nReady));
549 
550  m_readySet.sync ();
551  DL ((REACT,"m_readySet after select:\n"));
552  m_readySet.dump ();
553 
554  }
555  while (nReady < 0 && handleError ());
556 
557  dispatch (nReady);
558 }
559 
566 void
568 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
569 {
570  trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
571 
572  int ret = 0;
573  handler_t fd;
574  EventHandler* ehp = NULL;
575  std::string eh_id;
576 
577  Fd2Eh_Map_Iter iter = fdSet_.begin ();
578 
579  while (iter != fdSet_.end ())
580  {
581  fd = (*iter).first;
582  ehp = (*iter).second;
583 
584  if (mask_.isSet (fd) && ehp != NULL)
585  {
586  eh_id = ehp->get_id ();
587  DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
588  eh_id.c_str (), fd));
589 
590  ret = (ehp->*callback_) (fd); /* Fire up a callback */
591 
592  if (ret == -1) {
593  removeIOHandler (fd);
594  }
595  else if (ret > 0) {
596  DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
597  ret, fd, eh_id.c_str ()));
598  //return; <-- would starve other connections
599  }
600  else {
601  DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
602  eh_id.c_str (), fd));
603  mask_.clear (fd);
604  }
611  iter = fdSet_.begin ();
612  }
613  else {
614  iter++;
615  }
616  }
617 }
618 
624 bool
626 dispatch (int ready_)
627 {
628  trace_with_mask("Reactor::dispatch", REACTTRACE);
629 
631 
632  if ( ready_ < 0 )
633  {
634 #if !defined (WIN32)
635  EL((ASSAERR,"::select(3) error\n"));
636 #endif
637  return (false);
638  }
639  if ( ready_ == 0 ) {
640  return (true);
641  }
642 
643  DL((REACT,"Dispatching %d FDs.\n",ready_));
644  DL((REACT,"m_readySet:\n"));
645  m_readySet.dump ();
646 
647  /*--- Writes first ---*/
649  m_writeSet,
651 
652  /*--- Exceptions next ---*/
654  m_exceptSet,
656 
657  /*--- Finally, the Reads ---*/
659  m_readSet,
661 
662  return (true);
663 }
664 
665 void
667 stopReactor (void)
668 {
669  trace_with_mask("Reactor::stopReactor", REACTTRACE);
670 
671  m_active = false;
672 
673  Fd2Eh_Map_Iter iter;
674  EventHandler* ehp;
675 
676  while (m_readSet.size () > 0) {
677  iter = m_readSet.begin ();
678  ehp = (*iter).second;
679  removeHandler (ehp);
680  }
681 
682  while (m_writeSet.size () > 0) {
683  iter = m_writeSet.begin ();
684  ehp = (*iter).second;
685  removeHandler (ehp);
686  }
687 
688  while (m_exceptSet.size () > 0) {
689  iter = m_exceptSet.begin ();
690  ehp = (*iter).second;
691  removeHandler (ehp);
692  }
693 }
694 
699 void
702 {
703 #if !defined (WIN32) /* POSIX */
704 
705  trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
706 
707  if (m_maxfd_plus1 == fd_ + 1)
708  {
709  m_maxfd_plus1 = m_waitSet.max_fd () + 1;
710  DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
711  }
712 #endif
713 }
#define Assure_return(exp_)
Test condition and return bool from a function if assertion fails.
Definition: Assure.h:64
An abstraction to message logging facility.
#define EL(X)
A macro for writing error message to the Logger.
Definition: Logger.h:285
#define DL(X)
A macro for writing debug message to the Logger.
Definition: Logger.h:273
#define trace_with_mask(s, m)
trace_with_mask() is used to trace function call chain in C++ program.
Definition: Logger.h:437
unsigned long u_long
Definition: Logger_Impl.h:41
int handler_t
Definition: Logger_Impl.h:82
An implementation of Reactor pattern.
EventHandler class.
Definition: EventHandler.h:103
virtual int handle_close(int fd)
EOF on peer socket handler callback.
Definition: EventHandler.h:212
virtual int handle_write(int fd)
Write handler callback.
Definition: EventHandler.h:180
virtual int handle_except(int fd)
Exception handler callback.
Definition: EventHandler.h:188
std::string get_id() const
Retrieve EventHandler ID.
Definition: EventHandler.h:157
virtual int handle_read(int fd)
Read event callback.
Definition: EventHandler.h:172
Class FdSet.
Definition: FdSet.h:52
bool setFd(handler_t fd_)
Set flag (ON) for the argument fd.
Definition: FdSet.cpp:20
bool isSet(handler_t fd_)
Test whether fd's flag is on.
Definition: FdSet.h:122
bool clear(handler_t fd_)
Clear flag (OFF) for the argument fd.
Definition: FdSet.cpp:39
int numSet()
Determine how many bits are set (ON) in the set.
Definition: FdSet.h:126
FdSet m_rset
Read fds set.
Definition: MaskSet.h:28
void sync()
Resync internals after select() call.
Definition: MaskSet.h:52
FdSet m_eset
Exception fds set.
Definition: MaskSet.h:34
int max_fd()
Return maximum value of the file descriptor in the Set.
Definition: MaskSet.h:71
void reset()
Clear all bits in all sets.
Definition: MaskSet.h:62
void dump()
Write current state of MaskSet object to log file.
Definition: MaskSet.h:80
FdSet m_wset
Write fds set.
Definition: MaskSet.h:31
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
Definition: Reactor.cpp:421
bool registerIOHandler(EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
Register I/O Event handler with Reactor.
Definition: Reactor.cpp:93
TimerQueue m_tqueue
The queue of Timers.
Definition: Reactor.h:227
MaskSet m_waitSet
Handlers to wait for event on.
Definition: Reactor.h:221
Fd2Eh_Map_Type m_writeSet
Event handlers awaiting on WRITE_EVENT.
Definition: Reactor.h:215
Reactor()
Constructor.
Definition: Reactor.cpp:24
Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
Definition: Reactor.h:155
MaskSet m_readySet
Handlers that are ready for processing.
Definition: Reactor.h:224
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
Definition: Reactor.cpp:470
handler_t m_maxfd_plus1
Max file descriptor number (in all sets) plus 1.
Definition: Reactor.h:206
void dispatchHandler(FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
Definition: Reactor.cpp:568
int m_fd_setsize
Max number of open files per process.
Definition: Reactor.h:200
bool handleError(void)
Handle error in select(2) loop appropriately.
Definition: Reactor.cpp:341
std::map< u_int, EventHandler * > Fd2Eh_Map_Type
no cloning
Definition: Reactor.h:154
void deactivate(void)
Deactivate Reactor.
Definition: Reactor.h:234
bool dispatch(int minimum_)
Notify all EventHandlers registered on respecful events occured.
Definition: Reactor.cpp:626
Fd2Eh_Map_Type m_readSet
Event handlers awaiting on READ_EVENT.
Definition: Reactor.h:212
bool m_active
Flag that indicates whether Reactor is active or had been stopped.
Definition: Reactor.h:209
void stopReactor(void)
Stop Reactor's activity.
Definition: Reactor.cpp:667
TimerId registerTimerHandler(EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
Register Timer Event handler with Reactor.
Definition: Reactor.cpp:67
bool checkFDs(void)
Check mask for bad file descriptors.
Definition: Reactor.cpp:317
int isAnyReady(void)
Return number of file descriptors ready accross all sets.
Definition: Reactor.cpp:404
bool removeHandler(EventHandler *eh_, EventType et_=ALL_EVENTS)
Remove Event handler from reactor for either all I/O events or timeout event or both.
Definition: Reactor.cpp:173
~Reactor()
Destructor.
Definition: Reactor.cpp:55
bool removeTimerHandler(TimerId id_)
Remove Timer event from the queue.
Definition: Reactor.cpp:152
void adjust_maxfdp1(handler_t fd_)
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
Definition: Reactor.cpp:701
bool removeIOHandler(handler_t fd_)
Remove IO Event handler from reactor.
Definition: Reactor.cpp:247
Fd2Eh_Map_Type m_exceptSet
Event handlers awaiting on EXCEPT_EVENT.
Definition: Reactor.h:218
string fmtString(const char *fmt_=NULL) const
Format timeval structure into readable format.
Definition: TimeVal.cpp:146
static TimeVal zeroTime()
Static that returns zero timeval: {0,0}.
Definition: TimeVal.h:157
void sec(long sec_)
Set seconds.
Definition: TimeVal.h:64
static TimeVal gettimeofday()
Shields off underlying OS differences in getting current time.
Definition: TimeVal.cpp:44
void msec(long msec_)
Set microseconds.
Definition: TimeVal.h:70
int expire(const TimeVal &tv_)
Traverse the queue, triggering all timers that are past argument timeval.
Definition: TimerQueue.cpp:89
void dump(void)
Dump Queue information to the log file.
Definition: TimerQueue.cpp:152
int remove(EventHandler *eh_)
Cancel all timers for the EventHandler eh_.
Definition: TimerQueue.cpp:31
bool isEmpty()
Is queue empty?
Definition: TimerQueue.h:110
TimerId insert(EventHandler *eh_, const TimeVal &tv_, const TimeVal &delta_, const std::string &name_)
Add timer (EventHandler object) to the queue to be dispatch at the time specified.
Definition: TimerQueue.cpp:138
TimeVal & top(void)
Return expiration time of the top element in the queue.
Definition: TimerQueue.h:117
Definition: Acceptor.h:40
EventType
EventType defines events types that Reactor understands.
Definition: EventHandler.h:35
unsigned long TimerId
Timer Id is used in handle_timeout() calls.
Definition: EventHandler.h:27
bool isReadEvent(EventType e_)
Definition: EventHandler.h:51
bool isExceptEvent(EventType e_)
Definition: EventHandler.h:63
@ REACT
Class Reactor/PrioriyQueue messages
Definition: LogMask.h:39
@ ASSAERR
ASSA and system errors
Definition: LogMask.h:34
@ REACTTRACE
Extended Reactor/PrioriyQueue messages
Definition: LogMask.h:40
bool isSignalEvent(EventType e_)
Definition: EventHandler.h:75
bool is_valid_handler(handler_t socket_)
Detect socket() error in a portable way.
Definition: Logger_Impl.h:100
bool isTimeoutEvent(EventType e_)
Definition: EventHandler.h:69
Socket & ends(Socket &os_)
ends manipulator.
Definition: Socket.h:622
int(EventHandler::* EH_IO_Callback)(int)
A type for the pointer to I/O-related callback member function of class EventHandler.
Definition: EventHandler.h:236
bool isWriteEvent(EventType e_)
Definition: EventHandler.h:57