metaproxy  1.13.0
Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
metaproxy_1::ThreadPoolSocketObserver Class Reference

#include <thread_pool_observer.hpp>

Inheritance diagram for metaproxy_1::ThreadPoolSocketObserver:
Inheritance graph
Collaboration diagram for metaproxy_1::ThreadPoolSocketObserver:
Collaboration graph

Classes

class  Rep
 
class  Worker
 

Public Member Functions

 ThreadPoolSocketObserver (yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)
 
virtual ~ThreadPoolSocketObserver ()
 
void put (IThreadPoolMsg *m)
 
void cleanup (IThreadPoolMsg *m, void *info)
 
IThreadPoolMsgget ()
 
void run (void *p)
 
void get_thread_info (int &tbusy, int &total)
 

Private Member Functions

void add_worker (void)
 
void socketNotify (int event)
 

Private Attributes

boost::scoped_ptr< Repm_p
 

Detailed Description

Definition at line 36 of file thread_pool_observer.hpp.

Constructor & Destructor Documentation

ThreadPoolSocketObserver::ThreadPoolSocketObserver ( yazpp_1::ISocketObservable *  obs,
unsigned  min_threads,
unsigned  max_threads,
unsigned  stack_size 
)

Definition at line 101 of file thread_pool_observer.cpp.

References add_worker(), and m_p.

105  : m_p(new Rep(obs))
106 {
107  obs->addObserver(m_p->m_pipe.read_fd(), this);
108  obs->maskObserver(this, SOCKET_OBSERVE_READ);
109 
110  m_p->m_stop_flag = false;
111  m_p->m_no_threads = 0;
112  m_p->m_min_threads = min_threads;
113  m_p->m_max_threads = max_threads;
114  m_p->m_waiting_threads = 0;
115  unsigned i;
116 #if BOOST_VERSION >= 105000
117  if (stack_size > 0)
118  m_p->attrs.set_stack_size(stack_size);
119 #else
120  if (stack_size)
121  yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
122 #endif
123  for (i = 0; i < min_threads; i++)
124  add_worker();
125 }

Here is the call graph for this function:

ThreadPoolSocketObserver::~ThreadPoolSocketObserver ( )
virtual

Definition at line 127 of file thread_pool_observer.cpp.

References m_p.

128 {
129  {
130  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
131  m_p->m_stop_flag = true;
132  m_p->m_cond_input_data.notify_all();
133  }
134  m_p->m_thrds.join_all();
135  m_p->m_socketObservable->deleteObserver(this);
136 }

Member Function Documentation

void ThreadPoolSocketObserver::add_worker ( void  )
private

Definition at line 138 of file thread_pool_observer.cpp.

References m_p.

Referenced by put(), and ThreadPoolSocketObserver().

139 {
140  Worker w(this);
141 #if BOOST_VERSION >= 105000
142  boost::thread *x = new boost::thread(m_p->attrs, w);
143 #else
144  boost::thread *x = new boost::thread(w);
145 #endif
146  m_p->m_no_threads++;
147  m_p->m_thrds.add_thread(x);
148 }
void ThreadPoolSocketObserver::cleanup ( IThreadPoolMsg m,
void *  info 
)

Definition at line 237 of file thread_pool_observer.cpp.

References m_p.

238 {
239  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
240 
241  std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
242  while (it != m_p->m_input.end())
243  {
244  if ((*it)->cleanup(info))
245  {
246  delete *it;
247  it = m_p->m_input.erase(it);
248  }
249  else
250  it++;
251  }
252 }
IThreadPoolMsg* metaproxy_1::ThreadPoolSocketObserver::get ( )
void ThreadPoolSocketObserver::get_thread_info ( int &  tbusy,
int &  total 
)

Definition at line 191 of file thread_pool_observer.cpp.

References m_p.

192 {
193  tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
194  total = m_p->m_no_threads;
195 }
void ThreadPoolSocketObserver::put ( IThreadPoolMsg m)

Definition at line 254 of file thread_pool_observer.cpp.

References add_worker(), m_p, and metaproxy_1::queue_size_per_thread.

255 {
256  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
257  if (m_p->m_waiting_threads == 0 &&
258  m_p->m_no_threads < m_p->m_max_threads)
259  {
260  add_worker();
261  }
262  while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
263  m_p->m_cond_input_full.wait(input_lock);
264  m_p->m_input.push_back(m);
265  m_p->m_cond_input_data.notify_one();
266 }
const unsigned int queue_size_per_thread

Here is the call graph for this function:

void ThreadPoolSocketObserver::run ( void *  p)

Definition at line 197 of file thread_pool_observer.cpp.

References metaproxy_1::IThreadPoolMsg::handle(), and m_p.

Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().

198 {
199  while(1)
200  {
201  IThreadPoolMsg *in = 0;
202  {
203  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
204  m_p->m_waiting_threads++;
205  while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
206  m_p->m_cond_input_data.wait(input_lock);
207  m_p->m_waiting_threads--;
208  if (m_p->m_stop_flag)
209  break;
210 
211  in = m_p->m_input.front();
212  m_p->m_input.pop_front();
213  m_p->m_cond_input_full.notify_all();
214  }
215  IThreadPoolMsg *out = in->handle();
216  {
217  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
218  m_p->m_output.push_back(out);
219 #ifdef WIN32
220  send(m_p->m_pipe.write_fd(), "", 1, 0);
221 #else
222  ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
223  if (r != 1)
224  {
225  if (r == (ssize_t) (-1))
226  yaz_log(YLOG_WARN|YLOG_ERRNO,
227  "ThreadPoolSocketObserver::run. write fail");
228  else
229  yaz_log(YLOG_WARN,
230  "ThreadPoolSocketObserver::run. write returned 0");
231  }
232 #endif
233  }
234  }
235 }
virtual IThreadPoolMsg * handle()=0

Here is the call graph for this function:

void ThreadPoolSocketObserver::socketNotify ( int  event)
private

Definition at line 150 of file thread_pool_observer.cpp.

References m_p, and metaproxy_1::IThreadPoolMsg::result().

151 {
152  if (event & SOCKET_OBSERVE_READ)
153  {
154  char buf[2];
155 #ifdef WIN32
156  recv(m_p->m_pipe.read_fd(), buf, 1, 0);
157 #else
158  ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
159  if (r != 1)
160  {
161  if (r == (ssize_t) (-1))
162  yaz_log(YLOG_WARN|YLOG_ERRNO,
163  "ThreadPoolSocketObserver::socketNotify. read fail");
164  else
165  yaz_log(YLOG_WARN,
166  "ThreadPoolSocketObserver::socketNotify. read returned 0");
167  }
168 #endif
169  IThreadPoolMsg *out;
170  {
171  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
172  out = m_p->m_output.front();
173  m_p->m_output.pop_front();
174  }
175  if (out)
176  {
177  std::ostringstream os;
178  {
179  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
180  os << "tbusy/total " <<
181  m_p->m_no_threads - m_p->m_waiting_threads <<
182  "/" << m_p->m_no_threads
183  << " queue in/out " << m_p->m_input.size() << "/"
184  << m_p->m_output.size();
185  }
186  out->result(os.str().c_str());
187  }
188  }
189 }
virtual void result(const char *info)=0

Here is the call graph for this function:

Member Data Documentation

boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p
private

The documentation for this class was generated from the following files: