metaproxy  1.21.0
Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
metaproxy_1::ThreadPoolSocketObserver Class Reference

#include <thread_pool_observer.hpp>

Inheritance diagram for metaproxy_1::ThreadPoolSocketObserver:
Inheritance graph
Collaboration diagram for metaproxy_1::ThreadPoolSocketObserver:
Collaboration graph

Classes

class  Rep
 
class  Worker
 

Public Member Functions

 ThreadPoolSocketObserver (yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)
 
virtual ~ThreadPoolSocketObserver ()
 
void put (IThreadPoolMsg *m)
 
void cleanup (IThreadPoolMsg *m, void *info)
 
IThreadPoolMsg * get ()
 
void run (void *p)
 
void get_thread_info (int &tbusy, int &total)
 

Private Member Functions

void add_worker (void)
 
void socketNotify (int event)
 

Private Attributes

boost::scoped_ptr< Rep > m_p
 

Detailed Description

Definition at line 36 of file thread_pool_observer.hpp.

Constructor & Destructor Documentation

◆ ThreadPoolSocketObserver()

ThreadPoolSocketObserver::ThreadPoolSocketObserver ( yazpp_1::ISocketObservable *  obs,
unsigned  min_threads,
unsigned  max_threads,
unsigned  stack_size 
)

Definition at line 102 of file thread_pool_observer.cpp.

106  : m_p(new Rep(obs))
107 {
108  obs->addObserver(m_p->m_pipe.read_fd(), this);
109  obs->maskObserver(this, SOCKET_OBSERVE_READ);
110 
111  m_p->m_stop_flag = false;
112  m_p->m_no_threads = 0;
113  m_p->m_min_threads = min_threads;
114  m_p->m_max_threads = max_threads;
115  m_p->m_waiting_threads = 0;
116  unsigned i;
117 #if BOOST_VERSION >= 105000
118  if (stack_size > 0)
119  m_p->attrs.set_stack_size(stack_size);
120 #else
121  if (stack_size)
122  yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
123 #endif
124  for (i = 0; i < min_threads; i++)
125  add_worker();
126 }

References add_worker(), and m_p.

Here is the call graph for this function:

◆ ~ThreadPoolSocketObserver()

ThreadPoolSocketObserver::~ThreadPoolSocketObserver ( )
virtual

Definition at line 128 of file thread_pool_observer.cpp.

129 {
130  {
131  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
132  m_p->m_stop_flag = true;
133  m_p->m_cond_input_data.notify_all();
134  }
135  m_p->m_thrds.join_all();
136  m_p->m_socketObservable->deleteObserver(this);
137 }

References m_p.

Member Function Documentation

◆ add_worker()

void ThreadPoolSocketObserver::add_worker ( void  )
private

Definition at line 139 of file thread_pool_observer.cpp.

140 {
141  Worker w(this);
142 #if BOOST_VERSION >= 105000
143  boost::thread *x = new boost::thread(m_p->attrs, w);
144 #else
145  boost::thread *x = new boost::thread(w);
146 #endif
147  m_p->m_no_threads++;
148  m_p->m_thrds.add_thread(x);
149 }

References m_p.

Referenced by put(), and ThreadPoolSocketObserver().

◆ cleanup()

void ThreadPoolSocketObserver::cleanup ( IThreadPoolMsg *  m,
void *  info 
)

Definition at line 247 of file thread_pool_observer.cpp.

248 {
249  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
250 
251  std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
252  while (it != m_p->m_input.end())
253  {
254  if ((*it)->cleanup(info))
255  {
256  delete *it;
257  it = m_p->m_input.erase(it);
258  }
259  else
260  it++;
261  }
262 }

References m_p.

◆ get()

IThreadPoolMsg* metaproxy_1::ThreadPoolSocketObserver::get ( )

◆ get_thread_info()

void ThreadPoolSocketObserver::get_thread_info ( int &  tbusy,
int &  total 
)

Definition at line 201 of file thread_pool_observer.cpp.

202 {
203  tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
204  total = m_p->m_no_threads;
205 }

References m_p.

◆ put()

void ThreadPoolSocketObserver::put ( IThreadPoolMsg *  m)

Definition at line 264 of file thread_pool_observer.cpp.

265 {
266  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
267  if (m_p->m_waiting_threads == 0 &&
268  m_p->m_no_threads < m_p->m_max_threads)
269  {
270  add_worker();
271  }
272  while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
273  m_p->m_cond_input_full.wait(input_lock);
274  m_p->m_input.push_back(m);
275  m_p->m_cond_input_data.notify_one();
276 }
const unsigned int queue_size_per_thread

References add_worker(), m_p, and metaproxy_1::queue_size_per_thread.

Here is the call graph for this function:

◆ run()

void ThreadPoolSocketObserver::run ( void *  p)

Definition at line 207 of file thread_pool_observer.cpp.

208 {
209  while(1)
210  {
211  IThreadPoolMsg *in = 0;
212  {
213  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
214  m_p->m_waiting_threads++;
215  while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
216  m_p->m_cond_input_data.wait(input_lock);
217  m_p->m_waiting_threads--;
218  if (m_p->m_stop_flag)
219  break;
220 
221  in = m_p->m_input.front();
222  m_p->m_input.pop_front();
223  m_p->m_cond_input_full.notify_all();
224  }
225  IThreadPoolMsg *out = in->handle();
226  {
227  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
228  m_p->m_output.push_back(out);
229 #ifdef WIN32
230  send(m_p->m_pipe.write_fd(), "", 1, 0);
231 #else
232  ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
233  if (r != 1)
234  {
235  if (r == (ssize_t) (-1))
236  yaz_log(YLOG_WARN|YLOG_ERRNO,
237  "ThreadPoolSocketObserver::run. write fail");
238  else
239  yaz_log(YLOG_WARN,
240  "ThreadPoolSocketObserver::run. write returned 0");
241  }
242 #endif
243  }
244  }
245 }
virtual IThreadPoolMsg * handle()=0

References metaproxy_1::IThreadPoolMsg::handle(), and m_p.

Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().

Here is the call graph for this function:

◆ socketNotify()

void ThreadPoolSocketObserver::socketNotify ( int  event)
private

Definition at line 151 of file thread_pool_observer.cpp.

152 {
153  if (event & SOCKET_OBSERVE_READ)
154  {
155  char buf[2];
156 #ifdef WIN32
157  recv(m_p->m_pipe.read_fd(), buf, 1, 0);
158 #else
159  ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
160  if (r != 1)
161  {
162  if (r == (ssize_t) (-1))
163  yaz_log(YLOG_WARN|YLOG_ERRNO,
164  "ThreadPoolSocketObserver::socketNotify. read fail");
165  else
166  yaz_log(YLOG_WARN,
167  "ThreadPoolSocketObserver::socketNotify. read returned 0");
168  }
169 #endif
170  while (1)
171  {
172  IThreadPoolMsg *out;
173  size_t output_size = 0;
174  {
175  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
176  if (m_p->m_output.empty()) {
177  break;
178  }
179  out = m_p->m_output.front();
180  m_p->m_output.pop_front();
181  output_size = m_p->m_output.size();
182  }
183  if (out)
184  {
185  size_t input_size = 0;
186  std::ostringstream os;
187  {
188  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
189  input_size = m_p->m_input.size();
190  }
191  os << "tbusy/total "
192  << m_p->m_no_threads - m_p->m_waiting_threads
193  << "/" << m_p->m_no_threads
194  << " queue in/out " << input_size << "/" << output_size;
195  out->result(os.str().c_str());
196  }
197  }
198  }
199 }
virtual void result(const char *info)=0

References m_p, and metaproxy_1::IThreadPoolMsg::result().

Here is the call graph for this function:

Member Data Documentation

◆ m_p

boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p
private

The documentation for this class was generated from the following files: