#include <thread_pool_observer.hpp>
Definition at line 36 of file thread_pool_observer.hpp.
◆ ThreadPoolSocketObserver()
ThreadPoolSocketObserver::ThreadPoolSocketObserver |
( |
yazpp_1::ISocketObservable * |
obs, |
|
|
unsigned |
min_threads, |
|
|
unsigned |
max_threads, |
|
|
unsigned |
stack_size |
|
) |
| |
Definition at line 102 of file thread_pool_observer.cpp.
108 obs->addObserver(
m_p->m_pipe.read_fd(),
this);
109 obs->maskObserver(
this, SOCKET_OBSERVE_READ);
111 m_p->m_stop_flag =
false;
112 m_p->m_no_threads = 0;
113 m_p->m_min_threads = min_threads;
114 m_p->m_max_threads = max_threads;
115 m_p->m_waiting_threads = 0;
117 #if BOOST_VERSION >= 105000
119 m_p->attrs.set_stack_size(stack_size);
122 yaz_log(YLOG_WARN,
"stack_size has no effect (Requires Boost 1.50)");
124 for (i = 0; i < min_threads; i++)
References add_worker(), and m_p.
◆ ~ThreadPoolSocketObserver()
ThreadPoolSocketObserver::~ThreadPoolSocketObserver |
( |
| ) |
|
|
virtual |
Definition at line 128 of file thread_pool_observer.cpp.
131 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
132 m_p->m_stop_flag =
true;
133 m_p->m_cond_input_data.notify_all();
135 m_p->m_thrds.join_all();
136 m_p->m_socketObservable->deleteObserver(
this);
References m_p.
◆ add_worker()
void ThreadPoolSocketObserver::add_worker |
( |
void |
| ) |
|
|
private |
◆ cleanup()
void ThreadPoolSocketObserver::cleanup |
( |
IThreadPoolMsg * |
m, |
|
|
void * |
info |
|
) |
| |
Definition at line 247 of file thread_pool_observer.cpp.
249 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
251 std::deque<IThreadPoolMsg *>::iterator it =
m_p->m_input.begin();
252 while (it !=
m_p->m_input.end())
254 if ((*it)->cleanup(info))
257 it =
m_p->m_input.erase(it);
References m_p.
◆ get()
◆ get_thread_info()
void ThreadPoolSocketObserver::get_thread_info |
( |
int & |
tbusy, |
|
|
int & |
total |
|
) |
| |
◆ put()
◆ run()
void ThreadPoolSocketObserver::run |
( |
void * |
p | ) |
|
Definition at line 207 of file thread_pool_observer.cpp.
213 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
214 m_p->m_waiting_threads++;
215 while (!
m_p->m_stop_flag &&
m_p->m_input.size() == 0)
216 m_p->m_cond_input_data.wait(input_lock);
217 m_p->m_waiting_threads--;
218 if (
m_p->m_stop_flag)
221 in =
m_p->m_input.front();
222 m_p->m_input.pop_front();
223 m_p->m_cond_input_full.notify_all();
227 boost::mutex::scoped_lock output_lock(
m_p->m_mutex_output_data);
228 m_p->m_output.push_back(out);
230 send(
m_p->m_pipe.write_fd(),
"", 1, 0);
232 ssize_t r = write(
m_p->m_pipe.write_fd(),
"", 1);
235 if (r == (ssize_t) (-1))
236 yaz_log(YLOG_WARN|YLOG_ERRNO,
237 "ThreadPoolSocketObserver::run. write fail");
240 "ThreadPoolSocketObserver::run. write returned 0");
References metaproxy_1::IThreadPoolMsg::handle(), and m_p.
Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().
◆ socketNotify()
void ThreadPoolSocketObserver::socketNotify |
( |
int |
event | ) |
|
|
private |
Definition at line 151 of file thread_pool_observer.cpp.
153 if (event & SOCKET_OBSERVE_READ)
157 recv(
m_p->m_pipe.read_fd(), buf, 1, 0);
159 ssize_t r = read(
m_p->m_pipe.read_fd(), buf, 1);
162 if (r == (ssize_t) (-1))
163 yaz_log(YLOG_WARN|YLOG_ERRNO,
164 "ThreadPoolSocketObserver::socketNotify. read fail");
167 "ThreadPoolSocketObserver::socketNotify. read returned 0");
173 size_t output_size = 0;
175 boost::mutex::scoped_lock output_lock(
m_p->m_mutex_output_data);
176 if (
m_p->m_output.empty()) {
179 out =
m_p->m_output.front();
180 m_p->m_output.pop_front();
181 output_size =
m_p->m_output.size();
185 size_t input_size = 0;
186 std::ostringstream os;
188 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
189 input_size =
m_p->m_input.size();
192 <<
m_p->m_no_threads -
m_p->m_waiting_threads
193 <<
"/" <<
m_p->m_no_threads
194 <<
" queue in/out " << input_size <<
"/" << output_size;
195 out->
result(os.str().c_str());
References m_p, and metaproxy_1::IThreadPoolMsg::result().
◆ m_p
boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p |
|
private |
The documentation for this class was generated from the following files: