metaproxy  1.3.55
Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
metaproxy_1::ThreadPoolSocketObserver Class Reference

#include <thread_pool_observer.hpp>

Collaboration diagram for metaproxy_1::ThreadPoolSocketObserver:
Collaboration graph

Classes

class  Rep
class  Worker

Public Member Functions

 ThreadPoolSocketObserver (yazpp_1::ISocketObservable *obs, int no_threads)
virtual ~ThreadPoolSocketObserver ()
void put (IThreadPoolMsg *m)
void cleanup (IThreadPoolMsg *m, void *info)
IThreadPoolMsg * get ()
void run (void *p)
void get_thread_info (int &tbusy, int &total)

Private Member Functions

void socketNotify (int event)

Private Attributes

boost::scoped_ptr< Rep > m_p

Detailed Description

Definition at line 36 of file thread_pool_observer.hpp.

Constructor & Destructor Documentation

ThreadPoolSocketObserver::ThreadPoolSocketObserver ( yazpp_1::ISocketObservable *  obs,
int  no_threads 
)

Definition at line 96 of file thread_pool_observer.cpp.

References m_p.

: m_p(new Rep(obs))
{
    // Register this object on the supplied observable for the read end of
    // the internal pipe, and ask to be notified of read events only.
    obs->addObserver(m_p->m_pipe.read_fd(), this);
    obs->maskObserver(this, SOCKET_OBSERVE_READ);

    // Initialise the shared pool state before any worker thread is created.
    m_p->m_stop_flag = false;
    m_p->m_no_threads = no_threads;
    m_p->m_no_threads_waiting = 0;

    // Start no_threads workers; each new boost::thread is handed a Worker
    // bound to this pool and is added to the thread group.
    for (int started = 0; started != no_threads && started < no_threads; ++started)
    {
        Worker worker(this);
        m_p->m_thrds.add_thread(new boost::thread(worker));
    }
}
ThreadPoolSocketObserver::~ThreadPoolSocketObserver ( )
virtual

Definition at line 114 of file thread_pool_observer.cpp.

References m_p.

// Shuts the pool down: raises the stop flag, wakes every worker, waits for
// all worker threads to finish and finally unregisters this observer.
{
{
// The stop flag is set while holding the input mutex, the same mutex the
// workers hold when they test the flag in run().
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
m_p->m_stop_flag = true;
// Wake all workers blocked on the input condition so they can see the flag.
m_p->m_cond_input_data.notify_all();
}
// The lock is released above before joining, so woken workers can re-acquire it.
m_p->m_thrds.join_all();
// Unregister from the socket observable only after all workers have exited.
m_p->m_socketObservable->deleteObserver(this);
}

Member Function Documentation

void ThreadPoolSocketObserver::cleanup ( IThreadPoolMsg *  m,
void *  info 
)

Definition at line 215 of file thread_pool_observer.cpp.

References m_p.

{
    // Removes from the pending-input queue every message whose
    // cleanup(info) call returns true. The parameter m is not used here.
    // The whole scan runs with the input mutex held.
    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);

    std::deque<IThreadPoolMsg *>::iterator pos = m_p->m_input.begin();
    for (; pos != m_p->m_input.end(); )
    {
        if (!(*pos)->cleanup(info))
        {
            // Message stays queued; move on to the next one.
            ++pos;
            continue;
        }
        // erase() returns the iterator following the removed element.
        pos = m_p->m_input.erase(pos);
    }
}
IThreadPoolMsg* metaproxy_1::ThreadPoolSocketObserver::get ( )
void ThreadPoolSocketObserver::get_thread_info ( int &  tbusy,
int &  total 
)

Definition at line 169 of file thread_pool_observer.cpp.

References m_p.

{
    // Reports the number of busy threads (pool size minus the number of
    // workers currently waiting for input) and the pool size.
    // Note: the counters are read here without taking any mutex.
    Rep &state = *m_p;
    tbusy = state.m_no_threads - state.m_no_threads_waiting;
    total = state.m_no_threads;
}
void ThreadPoolSocketObserver::put ( IThreadPoolMsg *  m)

Definition at line 229 of file thread_pool_observer.cpp.

References m_p, and metaproxy_1::queue_size_per_thread.

{
    // Queues message m for the worker threads. The call blocks while the
    // input queue already holds at least
    // m_no_threads * queue_size_per_thread messages.
    boost::mutex::scoped_lock queue_guard(m_p->m_mutex_input_data);

    for (;;)
    {
        if (m_p->m_input.size() < m_p->m_no_threads * queue_size_per_thread)
            break;
        // wait() releases the mutex while blocked; run() signals this
        // condition after it removes a message from the queue.
        m_p->m_cond_input_full.wait(queue_guard);
    }

    m_p->m_input.push_back(m);
    // One new message is available, so one waiting worker is woken.
    m_p->m_cond_input_data.notify_one();
}
void ThreadPoolSocketObserver::run ( void *  p)

Definition at line 175 of file thread_pool_observer.cpp.

References metaproxy_1::IThreadPoolMsg::handle(), and m_p.

Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().

// Worker loop (invoked from Worker::operator()()): repeatedly takes one
// message from the input queue, calls handle() on it, appends the result to
// the output queue and writes one byte to the internal pipe. The loop ends
// when the stop flag is set. The parameter p is not used in this body.
{
while(1)
{
IThreadPoolMsg *in = 0;
{
// Everything in this scope runs with the input mutex held.
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
// Count this thread as waiting while it is blocked for input.
m_p->m_no_threads_waiting++;
// Re-check the predicate after every wake-up: stop requested or data available.
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
m_p->m_no_threads_waiting--;
// On stop, leave the loop without taking any message still in the queue.
if (m_p->m_stop_flag)
break;
in = m_p->m_input.front();
m_p->m_input.pop_front();
// A slot was freed: wake callers blocked in put() on a full queue.
m_p->m_cond_input_full.notify_all();
}
// The message is handled with no lock held.
IThreadPoolMsg *out = in->handle();
{
// The output queue is guarded by its own mutex.
boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
m_p->m_output.push_back(out);
// Write a single byte to the pipe; socketNotify() reads one byte from the
// read end before taking a result off the output queue.
#ifdef WIN32
send(m_p->m_pipe.write_fd(), "", 1, 0);
#else
ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
if (r != 1)
{
// A failed or short write is only logged; the result stays queued.
if (r == (ssize_t) (-1))
yaz_log(YLOG_WARN|YLOG_ERRNO,
"ThreadPoolSocketObserver::run. write fail");
else
yaz_log(YLOG_WARN,
"ThreadPoolSocketObserver::run. write returned 0");
}
#endif
}
}
}

Here is the call graph for this function:

void ThreadPoolSocketObserver::socketNotify ( int  event)
private

Definition at line 126 of file thread_pool_observer.cpp.

References m_p, and metaproxy_1::IThreadPoolMsg::result().

// Called when the observed pipe descriptor signals an event. On a read
// event it consumes one byte from the pipe, takes one finished message off
// the output queue (if any) and calls result() on it with a short status
// string describing thread and queue usage.
{
    if (event & SOCKET_OBSERVE_READ)
    {
        // Consume the single byte a worker wrote in run().
        char buf[2];
#ifdef WIN32
        recv(m_p->m_pipe.read_fd(), buf, 1, 0);
#else
        ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
        if (r != 1)
        {
            if (r == (ssize_t) (-1))
                yaz_log(YLOG_WARN|YLOG_ERRNO,
                        "ThreadPoolSocketObserver::socketNotify. read fail");
            else
                yaz_log(YLOG_WARN,
                        "ThreadPoolSocketObserver::socketNotify. read returned 0");
        }
#endif
        // Declared here and initialised to null so that "no result
        // available" is handled by the if (out) test below.
        IThreadPoolMsg *out = 0;
        // Snapshot of the output queue length, taken under the output
        // mutex (the mutex workers hold when they push results).
        std::size_t output_pending = 0;
        {
            boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
            // front()/pop_front() on an empty deque is undefined behaviour,
            // so only dequeue when a result is actually present (the read
            // above may have failed and merely been logged).
            if (!m_p->m_output.empty())
            {
                out = m_p->m_output.front();
                m_p->m_output.pop_front();
            }
            output_pending = m_p->m_output.size();
        }
        if (out)
        {
            std::ostringstream os;
            {
                // Thread counters and the input queue are read under the
                // input mutex.
                boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
                os  << "tbusy/total " <<
                    m_p->m_no_threads - m_p->m_no_threads_waiting <<
                    "/" << m_p->m_no_threads
                    << " queue in/out " << m_p->m_input.size() << "/"
                    << output_pending;
            }
            // result() is called with no lock held.
            out->result(os.str().c_str());
        }
    }
}

Here is the call graph for this function:

Member Data Documentation

boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p
private

The documentation for this class was generated from the following files: