41 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
50 WSAStartup (MAKEWORD (2, 2), &data);
69 const std::string& name_)
77 DL((
REACT,
"TIMEOUT_EVENT......: (%d,%d)\n",
78 timeout_.
sec(),timeout_.
msec()));
84 DL((
REACT,
"---Modified Timer Queue----\n"));
86 DL((
REACT,
"---------------------------\n"));
97 std::ostringstream msg;
104 DL((
ASSAERR,
"readset: fd %d out of range\n", fd_));
115 DL((
ASSAERR,
"writeset: fd %d out of range\n", fd_));
119 msg <<
" WRITE_EVENT";
126 DL((
ASSAERR,
"exceptset: fd %d out of range\n", fd_));
130 msg <<
" EXCEPT_EVENT";
134 DL((
REACT,
"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
135 eh_->
get_id ().c_str (), fd_, (
u_long)eh_, msg.str ().c_str () ));
144 DL((
REACT,
"Modified waitSet:\n"));
158 DL((
REACT,
"---Modified Timer Queue----\n"));
160 DL((
REACT,
"---------------------------\n"));
193 if ((*iter).second == eh_) {
207 if ((*iter).second == eh_) {
221 if ((*iter).second == eh_) {
233 DL((
REACT,
"Found EvtH \"%s\"(%p)\n", eh_->
get_id ().c_str (), eh_));
239 DL((
REACT,
"Modifies waitSet:\n"));
257 DL((
REACT,
"Removing handler for fd=%d\n",fd_));
265 ehp = (*iter).second;
278 ehp = (*iter).second;
291 ehp = (*iter).second;
302 if (ret ==
true && ehp != NULL) {
303 DL((
REACT,
"Removed EvtH \"%s\"(%p)\n", ehp->
get_id ().c_str (), ehp));
309 DL((
REACT,
"Modifies waitSet:\n"));
321 bool num_removed =
false;
323 timeval poll = { 0, 0 };
328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
331 DL((
REACT,
"Detected BAD FD: %d\n", fd ));
336 return (num_removed);
348 DL((
REACT,
"Received cmd to stop Reactor\n"));
370 if ( errno == EINTR ) {
371 EL((
REACT,
"EINTR: interrupted select(2)\n"));
387 if ( errno == EBADF ) {
388 DL((
REACT,
"EBADF: bad file descriptor\n"));
395 DL ((
REACT,
"select(3) error = %d\n", WSAGetLastError()));
413 DL((
REACT,
"m_readySet: %d FDs are ready for processing\n", n));
444 DL((
REACT,
"--------- Timer Queue ----------\n"));
446 DL((
REACT,
"--------------------------------\n"));
449 *howlong_ = tv - now;
452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
457 if (howlong_ != NULL) {
458 DL((
REACT,
"delay (%f)\n",
double (*howlong_) ));
461 DL((
REACT,
"delay (forever)\n"));
500 DL((
REACT,
"======================================\n"));
524 DL((
REACT,
"isAnyReady returned: %d\n",nReady));
529 DL((
REACT,
"=== m_waitSet ===\n"));
534 DL ((
REACT,
"m_readySet after reset():\n"));
538 DL ((
REACT,
"m_readySet after assign:\n"));
548 DL((
REACT,
"::select() returned: %d\n",nReady));
551 DL ((
REACT,
"m_readySet after select:\n"));
579 while (iter != fdSet_.end ())
582 ehp = (*iter).second;
584 if (mask_.
isSet (fd) && ehp != NULL)
587 DL((
REACT,
"Data detected from \"%s\"(fd=%d)\n",
588 eh_id.c_str (), fd));
590 ret = (ehp->*callback_) (fd);
596 DL((
REACT,
"%d bytes pending on fd=%d \"%s\"\n",
597 ret, fd, eh_id.c_str ()));
601 DL((
REACT,
"All data from \"%s\"(fd=%d) are consumed\n",
602 eh_id.c_str (), fd));
611 iter = fdSet_.begin ();
643 DL((
REACT,
"Dispatching %d FDs.\n",ready_));
678 ehp = (*iter).second;
684 ehp = (*iter).second;
690 ehp = (*iter).second;
#define Assure_return(exp_)
Test condition and return bool from a function if assertion fails.
An abstraction to message logging facility.
#define EL(X)
A macro for writing error message to the Logger.
#define DL(X)
A macro for writing debug message to the Logger.
#define trace_with_mask(s, m)
trace_with_mask() is used to trace function call chain in C++ program.
An implementation of Reactor pattern.
virtual int handle_close(int fd)
EOF on peer socket handler callback.
virtual int handle_write(int fd)
Write handler callback.
virtual int handle_except(int fd)
Exception handler callback.
std::string get_id() const
Retrieve EventHandler ID.
virtual int handle_read(int fd)
Read event callback.
bool setFd(handler_t fd_)
Set flag (ON) for the argument fd.
bool isSet(handler_t fd_)
Test whether fd's flag is on.
bool clear(handler_t fd_)
Clear flag (OFF) for the argument fd.
int numSet()
Determine how many bits are set (ON) in the set.
FdSet m_rset
Read fds set.
void sync()
Resync internals after select() call.
FdSet m_eset
Exception fds set.
int max_fd()
Return maximum value of the file descriptor in the Set.
void reset()
Clear all bits in all sets.
void dump()
Write current state of MaskSet object to log file.
FdSet m_wset
Write fds set.
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
bool registerIOHandler(EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
Register I/O Event handler with Reactor.
TimerQueue m_tqueue
The queue of Timers.
MaskSet m_waitSet
Handlers to wait for event on.
Fd2Eh_Map_Type m_writeSet
Event handlers awaiting on WRITE_EVENT.
Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
MaskSet m_readySet
Handlers that are ready for processing.
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
handler_t m_maxfd_plus1
Max file descriptor number (in all sets) plus 1.
void dispatchHandler(FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
int m_fd_setsize
Max number of open files per process.
bool handleError(void)
Handle error in select(2) loop appropriately.
std::map< u_int, EventHandler * > Fd2Eh_Map_Type
no cloning
void deactivate(void)
Deactivate Reactor.
bool dispatch(int minimum_)
Notify all EventHandlers registered on respective events that occurred.
Fd2Eh_Map_Type m_readSet
Event handlers awaiting on READ_EVENT.
bool m_active
Flag that indicates whether Reactor is active or had been stopped.
void stopReactor(void)
Stop Reactor's activity.
TimerId registerTimerHandler(EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
Register Timer Event handler with Reactor.
bool checkFDs(void)
Check mask for bad file descriptors.
int isAnyReady(void)
Return number of file descriptors ready across all sets.
bool removeHandler(EventHandler *eh_, EventType et_=ALL_EVENTS)
Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler(TimerId id_)
Remove Timer event from the queue.
void adjust_maxfdp1(handler_t fd_)
Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).
bool removeIOHandler(handler_t fd_)
Remove IO Event handler from reactor.
Fd2Eh_Map_Type m_exceptSet
Event handlers awaiting on EXCEPT_EVENT.
string fmtString(const char *fmt_=NULL) const
Format timeval structure into readable format.
static TimeVal zeroTime()
Static that returns zero timeval: {0,0}.
void sec(long sec_)
Set seconds.
static TimeVal gettimeofday()
Shields off underlying OS differences in getting current time.
void msec(long msec_)
Set microseconds.
int expire(const TimeVal &tv_)
Traverse the queue, triggering all timers that are past argument timeval.
void dump(void)
Dump Queue information to the log file.
int remove(EventHandler *eh_)
Cancel all timers for the EventHandler eh_.
bool isEmpty()
Is queue empty?
TimerId insert(EventHandler *eh_, const TimeVal &tv_, const TimeVal &delta_, const std::string &name_)
Add timer (EventHandler object) to the queue to be dispatched at the time specified.
TimeVal & top(void)
Return expiration time of the top element in the queue.
EventType
EventType defines events types that Reactor understands.
unsigned long TimerId
Timer Id is used in handle_timeout() calls.
bool isReadEvent(EventType e_)
bool isExceptEvent(EventType e_)
@ REACT
Class Reactor/PriorityQueue messages
@ ASSAERR
ASSA and system errors
@ REACTTRACE
Extended Reactor/PriorityQueue messages
bool isSignalEvent(EventType e_)
bool is_valid_handler(handler_t socket_)
Detect socket() error in a portable way.
bool isTimeoutEvent(EventType e_)
Socket & ends(Socket &os_)
ends manipulator.
int(EventHandler::* EH_IO_Callback)(int)
A type for the pointer to I/O-related callback member function of class EventHandler.
bool isWriteEvent(EventType e_)