20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
23 #include <metaproxy/util.hpp>
26 #include <boost/thread/mutex.hpp>
28 #include <yaz/diagbib1.h>
// Short namespace aliases used throughout this file:
// `mp` stands for metaproxy_1 and `yf` for its `filter` sub-namespace.
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
// Member declarations of the filter implementation class (the class header is
// not visible in this view).

// Handles one package; reached through m_p->process(package).
53 void process(metaproxy_1::Package & package);
// Increments the `deads` counter of the target recorded for session_id, if any.
57 void add_dead(
unsigned long session_id);
// Records the session -> target mapping (when the session is not yet mapped)
// and updates the statistics entry kept for that target.
61 void add_session(
unsigned long session_id, std::string target);
// Looks up TargetStat::cost() for `target`; the local result starts at 0 for
// an empty or unknown target.
66 unsigned int cost(std::string target);
// Looks up the `deads` counter for `target`; the local result starts at 0 for
// an empty or unknown target.
67 unsigned int dead(std::string target);
// Constructs the filter and allocates the Impl object held in m_p.
91 yf::LoadBalance::LoadBalance() : m_p(new Impl)
// Configuration is delegated to the Impl object (the enclosing function's
// signature is not visible in this view).
102 m_p->configure(xmlnode);
// Package processing is delegated to the Impl object (the enclosing
// function's signature is not visible in this view).
107 m_p->process(package);
// Impl constructor; its body is not visible in this view.
111 yf::LoadBalance::Impl::Impl()
// Impl destructor; its body is not visible in this view.
115 yf::LoadBalance::Impl::~Impl()
// Body of the package handler (presumably yf::LoadBalance::Impl::process; the
// signature line is not visible in this view).
//
// Request side: on a Z39.50 init request it considers the candidate vhosts,
// builds an init package for a chosen target and records the session -> target
// mapping; for other Z39.50 requests it counts the package against the
// session's target.
// Response side: it un-counts the package and, when either side closed the
// session, updates the dead/session bookkeeping and closes the session.
//
// Set when the front end is the side that closed the session.
125 bool is_closed_front =
false;
// A session that is already closed on entry counts as closed by the front end.
128 if (package.session().is_closed())
130 is_closed_front =
true;
133 Z_GDU *gdu_req = package.request().get();
// Only Z39.50 requests take part in target selection and accounting.
136 if (gdu_req && gdu_req->which == Z_GDU_Z3950)
// Init request: this is where a target is selected for the session.
139 if (gdu_req->u.z3950->which == Z_APDU_initRequest)
// Private copy of the request; `apdu` and `org_init` point into this copy.
141 yazpp_1::GDU base_req(gdu_req);
142 Z_APDU *apdu = base_req.get()->u.z3950;
144 Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
// NOTE(review): odr_en is not used in any line visible here; the error
// response further down goes through a differently named `odr` object.
145 mp::odr odr_en(ODR_ENCODE);
// Presumably moves the vhost entries out of the init request's otherInfo into
// `vhosts` - confirm against mp::util::remove_vhost_otherinfo.
147 std::list<std::string> vhosts;
148 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
154 std::list<std::string>::iterator ivh = vhosts.begin();
// Separate package for the outgoing init, on the same session and origin.
156 Package init_pkg(package.session(), package.origin());
157 init_pkg.copy_filter(package);
// Local `cost` starts at the largest possible value. It shadows the member
// function of the same name, hence the fully qualified call below.
159 unsigned int cost = std::numeric_limits<unsigned int>::max();
// Take m_mutex before consulting the per-target statistics.
161 boost::mutex::scoped_lock scoped_lock(m_mutex);
// The loop header has no increment step; the iterator is advanced in the body
// (in the visible code, through the iterator returned by erase()).
163 for (; ivh != vhosts.end(); )
// Empty vhost names are not costed.
165 if ((*ivh).size() != 0)
168 = yf::LoadBalance::Impl::cost(*ivh);
169 yaz_log(YLOG_LOG,
"Consider %s cost=%u vhcost=%u",
170 (*ivh).c_str(), cost, vhcost);
175 ivh = vhosts.erase(ivh);
// NOTE(review): `target` is declared and assigned on lines not visible here;
// an empty value presumably means no candidate was selected.
184 if (target.length() == 0)
// Build the init request to send on: a copy of base_req whose otherInfo gets
// vhost information set (the remaining call arguments are not visible here).
188 yazpp_1::GDU init_gdu(base_req);
189 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
191 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
194 init_pkg.request() = init_gdu;
// If init_pkg's session is still open, remember which target this session
// uses; init_pkg's response becomes this package's response.
200 if (!init_pkg.session().is_closed())
202 add_session(package.session().id(), target);
204 package.response() = init_pkg.response();
// Failure path: answer with a TEMPORARY_SYSTEM_ERROR init response and close
// the session.
// NOTE(review): `odr` is not declared in any visible line - confirm it is not
// meant to be `odr_en`.
209 package.response() = odr.create_initResponse(
210 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
211 "load_balance: no available targets");
212 package.session().close();
// Close request coming from the front end: flag it, and still count the
// package against the session's target under the lock.
216 else if (gdu_req->u.z3950->which == Z_APDU_close)
218 is_closed_front =
true;
219 boost::mutex::scoped_lock scoped_lock(m_mutex);
220 add_package(package.session().id());
// Remaining branch (its condition is not visible here): count the package
// against the session's target under the lock.
225 boost::mutex::scoped_lock scoped_lock(m_mutex);
226 add_package(package.session().id());
// NOTE(review): the lines between request-side and response-side handling are
// not visible here; presumably the package is passed to the next filter there.
// Set when the session is found closed, or a close APDU is seen, on the
// response side.
233 bool is_closed_back =
false;
236 if (package.session().is_closed())
237 is_closed_back =
true;
239 Z_GDU *gdu_res = package.response().get();
// Only Z39.50 responses are un-counted.
242 if (gdu_res && gdu_res->which == Z_GDU_Z3950)
// Close response: flag it and un-count the package under the lock.
245 if (gdu_res->u.z3950->which == Z_APDU_close)
247 is_closed_back =
true;
248 boost::mutex::scoped_lock scoped_lock(m_mutex);
249 remove_package(package.session().id());
// Remaining branch (its condition is not visible here): un-count the package
// under the lock.
254 boost::mutex::scoped_lock scoped_lock(m_mutex);
255 remove_package(package.session().id());
// Either side closed: update the bookkeeping and close the session.
260 if (is_closed_back || is_closed_front)
262 boost::mutex::scoped_lock scoped_lock(m_mutex);
// A close that did not come from the front end is booked as a dead target.
// add_dead must run before remove_session, because it looks the target up
// through the session mapping that remove_session erases.
265 if (is_closed_front ==
false)
266 add_dead(package.session().id());
268 remove_session(package.session().id());
271 package.session().close();
// Increments the `deads` counter of the target recorded for session_id.
// Nothing is changed when the session has no recorded target or the target
// has no statistics entry. The counter is never pushed past the largest
// unsigned int value, so it cannot wrap around.
276 void yf::LoadBalance::Impl::add_dead(
unsigned long session_id)
278 std::string target = find_session_target(session_id);
// An empty string means no target is known for this session.
280 if (target.size() != 0)
282 std::map<std::string, TargetStat>::iterator itarg;
283 itarg = m_target_stat.find(target);
284 if (itarg != m_target_stat.end()
285 && itarg->second.deads < std::numeric_limits<unsigned int>::max())
287 itarg->second.deads += 1;
// Increments the `packages` counter of the target recorded for session_id.
// Nothing is changed when the session has no recorded target or the target
// has no statistics entry. The counter is never pushed past the largest
// unsigned int value, so it cannot wrap around.
294 void yf::LoadBalance::Impl::add_package(
unsigned long session_id)
296 std::string target = find_session_target(session_id);
// An empty string means no target is known for this session.
298 if (target.size() != 0)
300 std::map<std::string, TargetStat>::iterator itarg;
301 itarg = m_target_stat.find(target);
302 if (itarg != m_target_stat.end()
303 && itarg->second.packages
304 < std::numeric_limits<unsigned int>::max())
306 itarg->second.packages += 1;
// Decrements the `packages` counter of the target recorded for session_id.
// Nothing is changed when the session has no recorded target or the target
// has no statistics entry. The counter is only decremented while it is above
// zero, so it cannot underflow.
311 void yf::LoadBalance::Impl::remove_package(
unsigned long session_id)
313 std::string target = find_session_target(session_id);
// An empty string means no target is known for this session.
315 if (target.size() != 0)
317 std::map<std::string, TargetStat>::iterator itarg;
318 itarg = m_target_stat.find(target);
319 if (itarg != m_target_stat.end()
320 && itarg->second.packages > 0)
322 itarg->second.packages -= 1;
// Registers a session for a target (the rest of the parameter list is not
// visible in this view; the body uses a `target` string).
// The session -> target mapping is inserted only if the session is not mapped
// yet. The target's statistics entry is created when missing; otherwise its
// `sessions` counter is incremented, never past the largest unsigned int.
327 void yf::LoadBalance::Impl::add_session(
unsigned long session_id,
331 std::map<unsigned long, std::string>::iterator isess;
332 isess = m_session_target.find(session_id);
// An existing mapping for this session is left as it is.
333 if (isess == m_session_target.end())
335 m_session_target.insert(std::make_pair(session_id, target));
339 std::map<std::string, TargetStat>::iterator itarg;
340 itarg = m_target_stat.find(target);
// First session for this target: insert a new entry. `stat` is set up on
// lines not visible in this view.
341 if (itarg == m_target_stat.end())
347 m_target_stat.insert(std::make_pair(target, stat));
349 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
351 itarg->second.sessions += 1;
// Drops the bookkeeping for session_id: decrements the `sessions` counter of
// its target, erases the target's statistics entry once it has no sessions
// and no deads left, and erases the session -> target mapping.
// NOTE(review): the statements taken when a lookup fails are not visible in
// this view; presumably the function returns early in those cases.
355 void yf::LoadBalance::Impl::remove_session(
unsigned long session_id)
360 std::map<unsigned long, std::string>::iterator isess;
361 isess = m_session_target.find(session_id);
// Unknown session (the action taken here is not visible).
362 if (isess == m_session_target.end())
365 target = isess->second;
368 std::map<std::string, TargetStat>::iterator itarg;
369 itarg = m_target_stat.find(target);
// No statistics entry for the target: only the session mapping is erased.
370 if (itarg == m_target_stat.end())
372 m_session_target.erase(isess);
// Decrement only while above zero, so the counter cannot underflow.
377 if (itarg->second.sessions > 0)
378 itarg->second.sessions -= 1;
// An entry with no sessions and no deads is erased from m_target_stat.
380 if (itarg->second.sessions == 0 && itarg->second.deads == 0)
382 m_target_stat.erase(itarg);
383 m_session_target.erase(isess);
// Looks up the target recorded for session_id in m_session_target.
// The declaration of `target` and the return statement are not visible in
// this view; callers treat an empty string as "no target known", so
// presumably an empty string comes back for an unknown session.
387 std::string yf::LoadBalance::Impl::find_session_target(
unsigned long session_id)
390 std::map<unsigned long, std::string>::iterator isess;
391 isess = m_session_target.find(session_id);
392 if (isess != m_session_target.end())
393 target = isess->second;
// Looks up the current cost of `target`: TargetStat::cost() of its statistics
// entry. The local result stays 0 for an empty name or a target without an
// entry (the return statement itself is not visible in this view).
399 unsigned int yf::LoadBalance::Impl::cost(std::string target)
401 unsigned int cost = 0;
403 if (target.size() != 0)
405 std::map<std::string, TargetStat>::iterator itarg;
406 itarg = m_target_stat.find(target);
407 if (itarg != m_target_stat.end())
408 cost = itarg->second.cost();
413 unsigned int yf::LoadBalance::Impl::dead(std::string target)
415 unsigned int dead = 0;
417 if (target.size() != 0)
419 std::map<std::string, TargetStat>::iterator itarg;
420 itarg = m_target_stat.find(target);
421 if (itarg != m_target_stat.end())
422 dead = itarg->second.deads;