metaproxy  1.13.0
thread_pool_observer.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #if HAVE_UNISTD_H
22 #include <unistd.h>
23 #endif
24 
25 #ifdef WIN32
26 #include <windows.h>
27 #include <winsock.h>
28 #endif
29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 
33 #include <ctype.h>
34 #include <stdio.h>
35 
36 #include <deque>
37 #include <sstream>
38 
39 #include <yazpp/socket-observer.h>
40 #include <yaz/log.h>
41 
42 #include "thread_pool_observer.hpp"
43 #include "pipe.hpp"
44 
45 namespace metaproxy_1 {
47  public:
50  void operator() (void) {
51  m_s->run(0);
52  }
53  };
54 
55  class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
57  public:
58  Rep(yazpp_1::ISocketObservable *obs);
59  ~Rep();
60  private:
61  yazpp_1::ISocketObservable *m_socketObservable;
63  boost::thread_group m_thrds;
64  boost::mutex m_mutex_input_data;
65  boost::condition m_cond_input_data;
66  boost::condition m_cond_input_full;
67  boost::mutex m_mutex_output_data;
68  std::deque<IThreadPoolMsg *> m_input;
69  std::deque<IThreadPoolMsg *> m_output;
71 #if BOOST_VERSION >= 105000
72  boost::thread::attributes attrs;
73 #endif
74  unsigned m_no_threads;
75  unsigned m_min_threads;
76  unsigned m_max_threads;
78  };
79  const unsigned int queue_size_per_thread = 64;
80 }
81 
82 
83 
84 using namespace yazpp_1;
85 using namespace metaproxy_1;
86 
87 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
88  : m_socketObservable(obs), m_pipe(9123)
89 {
90 }
91 
93 {
94 }
95 
97 {
98 
99 }
100 
102  yazpp_1::ISocketObservable *obs,
103  unsigned min_threads, unsigned max_threads,
104  unsigned stack_size)
105  : m_p(new Rep(obs))
106 {
107  obs->addObserver(m_p->m_pipe.read_fd(), this);
108  obs->maskObserver(this, SOCKET_OBSERVE_READ);
109 
110  m_p->m_stop_flag = false;
111  m_p->m_no_threads = 0;
112  m_p->m_min_threads = min_threads;
113  m_p->m_max_threads = max_threads;
114  m_p->m_waiting_threads = 0;
115  unsigned i;
116 #if BOOST_VERSION >= 105000
117  if (stack_size > 0)
118  m_p->attrs.set_stack_size(stack_size);
119 #else
120  if (stack_size)
121  yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
122 #endif
123  for (i = 0; i < min_threads; i++)
124  add_worker();
125 }
126 
128 {
129  {
130  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
131  m_p->m_stop_flag = true;
132  m_p->m_cond_input_data.notify_all();
133  }
134  m_p->m_thrds.join_all();
135  m_p->m_socketObservable->deleteObserver(this);
136 }
137 
139 {
140  Worker w(this);
141 #if BOOST_VERSION >= 105000
142  boost::thread *x = new boost::thread(m_p->attrs, w);
143 #else
144  boost::thread *x = new boost::thread(w);
145 #endif
146  m_p->m_no_threads++;
147  m_p->m_thrds.add_thread(x);
148 }
149 
151 {
152  if (event & SOCKET_OBSERVE_READ)
153  {
154  char buf[2];
155 #ifdef WIN32
156  recv(m_p->m_pipe.read_fd(), buf, 1, 0);
157 #else
158  ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
159  if (r != 1)
160  {
161  if (r == (ssize_t) (-1))
162  yaz_log(YLOG_WARN|YLOG_ERRNO,
163  "ThreadPoolSocketObserver::socketNotify. read fail");
164  else
165  yaz_log(YLOG_WARN,
166  "ThreadPoolSocketObserver::socketNotify. read returned 0");
167  }
168 #endif
169  IThreadPoolMsg *out;
170  {
171  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
172  out = m_p->m_output.front();
173  m_p->m_output.pop_front();
174  }
175  if (out)
176  {
177  std::ostringstream os;
178  {
179  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
180  os << "tbusy/total " <<
181  m_p->m_no_threads - m_p->m_waiting_threads <<
182  "/" << m_p->m_no_threads
183  << " queue in/out " << m_p->m_input.size() << "/"
184  << m_p->m_output.size();
185  }
186  out->result(os.str().c_str());
187  }
188  }
189 }
190 
191 void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
192 {
193  tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
194  total = m_p->m_no_threads;
195 }
196 
198 {
199  while(1)
200  {
201  IThreadPoolMsg *in = 0;
202  {
203  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
204  m_p->m_waiting_threads++;
205  while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
206  m_p->m_cond_input_data.wait(input_lock);
207  m_p->m_waiting_threads--;
208  if (m_p->m_stop_flag)
209  break;
210 
211  in = m_p->m_input.front();
212  m_p->m_input.pop_front();
213  m_p->m_cond_input_full.notify_all();
214  }
215  IThreadPoolMsg *out = in->handle();
216  {
217  boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
218  m_p->m_output.push_back(out);
219 #ifdef WIN32
220  send(m_p->m_pipe.write_fd(), "", 1, 0);
221 #else
222  ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
223  if (r != 1)
224  {
225  if (r == (ssize_t) (-1))
226  yaz_log(YLOG_WARN|YLOG_ERRNO,
227  "ThreadPoolSocketObserver::run. write fail");
228  else
229  yaz_log(YLOG_WARN,
230  "ThreadPoolSocketObserver::run. write returned 0");
231  }
232 #endif
233  }
234  }
235 }
236 
238 {
239  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
240 
241  std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
242  while (it != m_p->m_input.end())
243  {
244  if ((*it)->cleanup(info))
245  {
246  delete *it;
247  it = m_p->m_input.erase(it);
248  }
249  else
250  it++;
251  }
252 }
253 
255 {
256  boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
257  if (m_p->m_waiting_threads == 0 &&
258  m_p->m_no_threads < m_p->m_max_threads)
259  {
260  add_worker();
261  }
262  while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
263  m_p->m_cond_input_full.wait(input_lock);
264  m_p->m_input.push_back(m);
265  m_p->m_cond_input_data.notify_one();
266 }
267 
268 /*
269  * Local variables:
270  * c-basic-offset: 4
271  * c-file-style: "Stroustrup"
272  * indent-tabs-mode: nil
273  * End:
274  * vim: shiftwidth=4 tabstop=8 expandtab
275  */
276 
virtual void result(const char *info)=0
virtual IThreadPoolMsg * handle()=0
const unsigned int queue_size_per_thread
void get_thread_info(int &tbusy, int &total)
void cleanup(IThreadPoolMsg *m, void *info)
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)