metaproxy  1.13.0
filter_load_balance.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
22 #include "filter_load_balance.hpp"
23 #include <metaproxy/util.hpp>
24 
25 
26 #include <boost/thread/mutex.hpp>
27 
28 #include <yaz/diagbib1.h>
29 #include <yaz/log.h>
30 #include <yaz/zgdu.h>
31 
32 // remove max macro if already defined (defined later in <limits>)
33 #ifdef max
34 #undef max
35 #endif
36 
37 #include <list>
38 #include <map>
39 #include <limits>
40 
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
43 
44 namespace metaproxy_1
45 {
46  namespace filter
47  {
49  {
50  public:
51  Impl();
52  ~Impl();
53  void process(metaproxy_1::Package & package);
54  void configure(const xmlNode * ptr);
55  private:
56  // statistic manipulating functions,
57  void add_dead(unsigned long session_id);
58  //void clear_dead(unsigned long session_id);
59  void add_package(unsigned long session_id);
60  void remove_package(unsigned long session_id);
61  void add_session(unsigned long session_id, std::string target);
62  void remove_session(unsigned long session_id);
63  std::string find_session_target(unsigned long session_id);
64 
65  // cost functions
66  unsigned int cost(std::string target);
67  unsigned int dead(std::string target);
68 
69  // local classes
70  class TargetStat {
71  public:
72  unsigned int sessions;
73  unsigned int packages;
74  unsigned int deads;
75  unsigned int cost() {
76  unsigned int c = sessions + packages + deads;
77  return c;
78  }
79  };
80 
81  // local protected databases
82  boost::mutex m_mutex;
83  std::map<std::string, TargetStat> m_target_stat;
84  std::map<unsigned long, std::string> m_session_target;
85  };
86  }
87 }
88 
89 // define Pimpl wrapper forwarding to Impl
90 
91 yf::LoadBalance::LoadBalance() : m_p(new Impl)
92 {
93 }
94 
95 yf::LoadBalance::~LoadBalance()
96 { // must have a destructor because of boost::scoped_ptr
97 }
98 
99 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
100  const char *path)
101 {
102  m_p->configure(xmlnode);
103 }
104 
105 void yf::LoadBalance::process(mp::Package &package) const
106 {
107  m_p->process(package);
108 }
109 
110 
111 yf::LoadBalance::Impl::Impl()
112 {
113 }
114 
115 yf::LoadBalance::Impl::~Impl()
116 {
117 }
118 
119 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
120 {
121 }
122 
123 void yf::LoadBalance::Impl::process(mp::Package &package)
124 {
125  bool is_closed_front = false;
126 
127  // checking for closed front end packages
128  if (package.session().is_closed())
129  {
130  is_closed_front = true;
131  }
132 
133  Z_GDU *gdu_req = package.request().get();
134 
135  // passing anything but z3950 packages
136  if (gdu_req && gdu_req->which == Z_GDU_Z3950)
137  {
138  // target selecting only on Z39.50 init request
139  if (gdu_req->u.z3950->which == Z_APDU_initRequest)
140  {
141  yazpp_1::GDU base_req(gdu_req);
142  Z_APDU *apdu = base_req.get()->u.z3950;
143 
144  Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145  mp::odr odr_en(ODR_ENCODE);
146 
147  std::list<std::string> vhosts;
148  mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
149  // get lowest of all vhosts.. Remove them if individually if
150  // they turn out to be bad..
151  while (1)
152  {
153  std::list<std::string>::iterator ivh = vhosts.begin();
154  std::list<std::string>::iterator ivh_pick = vhosts.end();
155 
156  Package init_pkg(package.session(), package.origin());
157  init_pkg.copy_filter(package);
158 
159  unsigned int cost_i = std::numeric_limits<unsigned int>::max();
160  {
161  boost::mutex::scoped_lock scoped_lock(m_mutex);
162 
163  for (; ivh != vhosts.end(); ivh++)
164  {
165  if ((*ivh).size() != 0)
166  {
167  unsigned int cost
168  = yf::LoadBalance::Impl::cost(*ivh);
169 
170  std::ostringstream os;
171  os << "LB" << " "
172  << package << " "
173  << "0.000000" << " "
174  << "Consider " << *ivh
175  << " cost=" << cost;
176  yaz_log(YLOG_LOG, "%s", os.str().c_str());
177  if (cost_i > cost)
178  {
179  ivh_pick = ivh;
180  cost_i = cost;
181  }
182  }
183  }
184  }
185  if (ivh_pick == vhosts.end())
186  break;
187  std::string target = *ivh_pick;
188  vhosts.erase(ivh_pick);
189  // copying new target into init package
190  yazpp_1::GDU init_gdu(base_req);
191  Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
192 
193  mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
194  odr_en, target, 1);
195 
196  init_pkg.request() = init_gdu;
197 
198  // moving all package types
199  init_pkg.move();
200 
201  // checking for closed back end packages
202  if (!init_pkg.session().is_closed())
203  {
204  add_session(package.session().id(), target);
205 
206  package.response() = init_pkg.response();
207  return;
208  }
209  std::ostringstream os;
210  os << "LB" << " "
211  << package << " "
212  << "0.000000" << " "
213  << "Failed " << target;
214  yaz_log(YLOG_LOG, "%s", os.str().c_str());
215  }
216  mp::odr odr;
217  package.response() = odr.create_initResponse(
218  apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
219  "load_balance: no available targets");
220  package.session().close();
221  return;
222  }
223  // frontend Z39.50 close request is added to statistics and marked
224  else if (gdu_req->u.z3950->which == Z_APDU_close)
225  {
226  is_closed_front = true;
227  boost::mutex::scoped_lock scoped_lock(m_mutex);
228  add_package(package.session().id());
229  }
230  // any other Z39.50 package is added to statistics
231  else
232  {
233  boost::mutex::scoped_lock scoped_lock(m_mutex);
234  add_package(package.session().id());
235  }
236  }
237 
238  // moving all package types
239  package.move();
240 
241  bool is_closed_back = false;
242 
243  // checking for closed back end packages
244  if (package.session().is_closed())
245  is_closed_back = true;
246 
247  Z_GDU *gdu_res = package.response().get();
248 
249  // passing anything but z3950 packages
250  if (gdu_res && gdu_res->which == Z_GDU_Z3950)
251  {
252  // session closing only on Z39.50 close response
253  if (gdu_res->u.z3950->which == Z_APDU_close)
254  {
255  is_closed_back = true;
256  boost::mutex::scoped_lock scoped_lock(m_mutex);
257  remove_package(package.session().id());
258  }
259  // any other Z39.50 package is removed from statistics
260  else
261  {
262  boost::mutex::scoped_lock scoped_lock(m_mutex);
263  remove_package(package.session().id());
264  }
265  }
266 
267  // finally removing sessions and marking deads
268  if (is_closed_back || is_closed_front)
269  {
270  boost::mutex::scoped_lock scoped_lock(m_mutex);
271 
272  // marking backend dead if backend closed without fronted close
273  if (is_closed_front == false)
274  add_dead(package.session().id());
275 
276  remove_session(package.session().id());
277 
278  // making sure that package is closed
279  package.session().close();
280  }
281 }
282 
283 // statistic manipulating functions,
284 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
285 {
286  std::string target = find_session_target(session_id);
287 
288  if (target.size() != 0)
289  {
290  std::map<std::string, TargetStat>::iterator itarg;
291  itarg = m_target_stat.find(target);
292  if (itarg != m_target_stat.end()
293  && itarg->second.deads < std::numeric_limits<unsigned int>::max())
294  {
295  itarg->second.deads += 1;
296  // std:.cout << "add_dead " << session_id << " " << target
297  // << " d:" << itarg->second.deads << "\n";
298  }
299  }
300 }
301 
302 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
303 {
304  std::string target = find_session_target(session_id);
305 
306  if (target.size() != 0)
307  {
308  std::map<std::string, TargetStat>::iterator itarg;
309  itarg = m_target_stat.find(target);
310  if (itarg != m_target_stat.end()
311  && itarg->second.packages
312  < std::numeric_limits<unsigned int>::max())
313  {
314  itarg->second.packages += 1;
315  }
316  }
317 }
318 
319 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
320 {
321  std::string target = find_session_target(session_id);
322 
323  if (target.size() != 0)
324  {
325  std::map<std::string, TargetStat>::iterator itarg;
326  itarg = m_target_stat.find(target);
327  if (itarg != m_target_stat.end()
328  && itarg->second.packages > 0)
329  {
330  itarg->second.packages -= 1;
331  }
332  }
333 }
334 
335 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
336  std::string target)
337 {
338  // finding and adding session
339  std::map<unsigned long, std::string>::iterator isess;
340  isess = m_session_target.find(session_id);
341  if (isess == m_session_target.end())
342  {
343  m_session_target.insert(std::make_pair(session_id, target));
344  }
345 
346  // finding and adding target statistics
347  std::map<std::string, TargetStat>::iterator itarg;
348  itarg = m_target_stat.find(target);
349  if (itarg == m_target_stat.end())
350  {
351  TargetStat stat;
352  stat.sessions = 1;
353  stat.packages = 0;
354  stat.deads = 0;
355  m_target_stat.insert(std::make_pair(target, stat));
356  }
357  else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
358  {
359  itarg->second.sessions += 1;
360  }
361 }
362 
363 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
364 {
365  std::map<unsigned long, std::string>::iterator isess;
366  isess = m_session_target.find(session_id);
367  if (isess == m_session_target.end())
368  return;
369 
370  std::string target = isess->second;
371  m_session_target.erase(isess);
372 
373  // finding target statistics
374  std::map<std::string, TargetStat>::iterator itarg;
375  itarg = m_target_stat.find(target);
376  if (itarg == m_target_stat.end())
377  return;
378 
379  if (itarg->second.sessions > 0)
380  itarg->second.sessions -= 1;
381 
382  if (itarg->second.sessions == 0 || itarg->second.deads == 0)
383  m_target_stat.erase(itarg);
384 }
385 
386 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
387 {
388  std::string target;
389  std::map<unsigned long, std::string>::iterator isess;
390  isess = m_session_target.find(session_id);
391  if (isess != m_session_target.end())
392  target = isess->second;
393  return target;
394 }
395 
396 
397 // cost functions
398 unsigned int yf::LoadBalance::Impl::cost(std::string target)
399 {
400  unsigned int cost = 0;
401 
402  if (target.size() != 0)
403  {
404  std::map<std::string, TargetStat>::iterator itarg;
405  itarg = m_target_stat.find(target);
406  if (itarg != m_target_stat.end())
407  cost = itarg->second.cost();
408  }
409  return cost;
410 }
411 
412 unsigned int yf::LoadBalance::Impl::dead(std::string target)
413 {
414  unsigned int dead = 0;
415 
416  if (target.size() != 0)
417  {
418  std::map<std::string, TargetStat>::iterator itarg;
419  itarg = m_target_stat.find(target);
420  if (itarg != m_target_stat.end())
421  dead = itarg->second.deads;
422  }
423  return dead;
424 }
425 
426 
427 static mp::filter::Base* filter_creator()
428 {
429  return new mp::filter::LoadBalance;
430 }
431 
432 extern "C" {
433  struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
434  0,
435  "load_balance",
437  };
438 }
439 
440 
441 /*
442  * Local variables:
443  * c-basic-offset: 4
444  * c-file-style: "Stroustrup"
445  * indent-tabs-mode: nil
446  * End:
447  * vim: shiftwidth=4 tabstop=8 expandtab
448  */
449 
std::string find_session_target(unsigned long session_id)
static mp::filter::Base * filter_creator()
unsigned int dead(std::string target)
void remove_package(unsigned long session_id)
void configure(const xmlNode *ptr)
void add_session(unsigned long session_id, std::string target)
std::map< unsigned long, std::string > m_session_target
void remove_session(unsigned long session_id)
struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance
void process(metaproxy_1::Package &package)
void add_package(unsigned long session_id)
unsigned int cost(std::string target)
void add_dead(unsigned long session_id)
std::map< std::string, TargetStat > m_target_stat