#include <thread_pool_observer.hpp>
Private Attributes
boost::scoped_ptr< Rep > m_p
Detailed Description
Definition at line 36 of file thread_pool_observer.hpp.
Constructor & Destructor Documentation
ThreadPoolSocketObserver::ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads)
Definition at line 96 of file thread_pool_observer.cpp.
References m_p.
{
    // Watch the read end of the internal pipe through the supplied socket
    // observable, and ask to be notified about read events on it.
    obs->addObserver(m_p->m_pipe.read_fd(), this);
    obs->maskObserver(this, SOCKET_OBSERVE_READ);

    // Initial pool state; written before any worker thread is created.
    m_p->m_stop_flag = false;
    m_p->m_no_threads = no_threads;
    m_p->m_no_threads_waiting = 0;

    // Start no_threads workers. Each thread is built from a Worker bound to
    // this pool and is handed to the thread group m_thrds.
    for (int remaining = no_threads; remaining > 0; --remaining)
    {
        Worker worker(this);
        m_p->m_thrds.add_thread(new boost::thread(worker));
    }
}
ThreadPoolSocketObserver::~ThreadPoolSocketObserver() [virtual]
Definition at line 114 of file thread_pool_observer.cpp.
References m_p.
{
    // Shut the pool down: tell the workers to stop, wait for them to finish,
    // then detach from the socket observable.
    {
        boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
        // The flag must be raised under the same mutex the workers hold
        // while testing it in run(); a bare notify_all() would only make
        // them re-check the predicate and go back to waiting.
        m_p->m_stop_flag = true;
        m_p->m_cond_input_data.notify_all();
    }
    // Workers keep using this object until they leave run(), so wait for
    // all of them before any further teardown.
    m_p->m_thrds.join_all();

    m_p->m_socketObservable->deleteObserver(this);
}
Member Function Documentation
void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
Definition at line 215 of file thread_pool_observer.cpp.
References m_p.
{
    // Walk the pending input queue and drop every message whose own
    // cleanup(info) returns true. The parameter m is not used in this body.
    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);

    typedef std::deque<IThreadPoolMsg *>::iterator MsgIter;
    for (MsgIter pos = m_p->m_input.begin(); pos != m_p->m_input.end(); )
    {
        IThreadPoolMsg *pending = *pos;
        if (pending->cleanup(info))
            pos = m_p->m_input.erase(pos);  // erase() yields the next element
        else
            ++pos;
    }
}
void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
void ThreadPoolSocketObserver::run(void *p)
Definition at line 175 of file thread_pool_observer.cpp.
References metaproxy_1::IThreadPoolMsg::handle(), and m_p.
Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().
{
    // Worker loop: take one message from the input queue, process it with
    // handle(), put the result on the output queue and wake the observer by
    // writing one byte to the pipe. Returns once the stop flag is set.
    // The parameter p is not used in this body.
    while(1)
    {
        IThreadPoolMsg *in = 0;
        {
            boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
            // Count this thread as idle for as long as it is blocked here.
            m_p->m_no_threads_waiting++;
            while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
                m_p->m_cond_input_data.wait(input_lock);
            m_p->m_no_threads_waiting--;
            // Leave only when asked to stop; an unconditional break here
            // would end the worker on its very first wake-up.
            if (m_p->m_stop_flag)
                break;

            in = m_p->m_input.front();
            m_p->m_input.pop_front();
            // A slot was freed in the input queue; wake anyone waiting on
            // the "input full" condition.
            m_p->m_cond_input_full.notify_all();
        }
        // Process the message without holding any queue lock.
        IThreadPoolMsg *out = in->handle();
        {
            boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
            m_p->m_output.push_back(out);
            // One byte per queued result; socketNotify() reads one byte
            // from the other end of the pipe per result it collects.
#ifdef WIN32
            send(m_p->m_pipe.write_fd(), "", 1, 0);
#else
            ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
            if (r != 1)
            {
                if (r == (ssize_t) (-1))
                    yaz_log(YLOG_WARN|YLOG_ERRNO,
                            "ThreadPoolSocketObserver::run. write fail");
                else
                    yaz_log(YLOG_WARN,
                            "ThreadPoolSocketObserver::run. write returned 0");
            }
#endif
        }
    }
}
void ThreadPoolSocketObserver::socketNotify(int event) [private]
Definition at line 126 of file thread_pool_observer.cpp.
References m_p, and metaproxy_1::IThreadPoolMsg::result().
{
    // Called when the observed pipe has an event. On a read event, consume
    // the one-byte wake-up written by run(), take one finished message off
    // the output queue and pass it a short pool-statistics string through
    // result().
    if (event & SOCKET_OBSERVE_READ)
    {
        char buf[2];
#ifdef WIN32
        recv(m_p->m_pipe.read_fd(), buf, 1, 0);
#else
        ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
        if (r != 1)
        {
            if (r == (ssize_t) (-1))
                yaz_log(YLOG_WARN|YLOG_ERRNO,
                        "ThreadPoolSocketObserver::socketNotify. read fail");
            else
                yaz_log(YLOG_WARN,
                        "ThreadPoolSocketObserver::socketNotify. read returned 0");
        }
#endif
        IThreadPoolMsg *out = 0;
        std::deque<IThreadPoolMsg *>::size_type output_pending = 0;
        {
            boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
            // front() on an empty deque is undefined behaviour, so guard
            // against a notification with nothing queued (for example
            // after the failed read logged above).
            if (!m_p->m_output.empty())
            {
                out = m_p->m_output.front();
                m_p->m_output.pop_front();
            }
            // Sample the output queue length while its own mutex is held.
            output_pending = m_p->m_output.size();
        }
        if (out)
        {
            std::ostringstream os;
            {
                // Thread counters and the input queue are guarded by the
                // input mutex.
                boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
                os << "tbusy/total "
                   << m_p->m_no_threads - m_p->m_no_threads_waiting
                   << "/" << m_p->m_no_threads
                   << " queue in/out " << m_p->m_input.size()
                   << "/" << output_pending;
            }
            out->result(os.str().c_str());
        }
    }
}
Member Data Documentation
boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p [private]
The documentation for this class was generated from the following files: