metaproxy  1.3.55
filter_session_shared.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) 2005-2013 Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23 
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30 
31 #include <metaproxy/util.hpp>
33 
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43 
// Short namespace aliases used by the out-of-class definitions in this file:
// "mp" for the metaproxy_1 namespace and "yf" for its filter sub-namespace.
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46 
// Declarations of the helper classes nested in SessionShared.
// NOTE(review): the embedded listing numbers skip in several places
// (e.g. 50 -> 52, 64 -> 66, 72 -> 74); the class header lines and some member
// declarations are not present in this listing, so each section below is
// introduced only by its original comment. Confirm against the real source.
47 namespace metaproxy_1 {
48 
49  namespace filter {
50  // key for session.. We'll only share sessions with same InitKey
 // (the key orders by the encoded idAuthentication, then the encoded
 // otherInfo of the init request - see InitKey::operator< below)
52  public:
53  bool operator < (const SessionShared::InitKey &k) const;
54  InitKey(Z_InitRequest *req);
55  InitKey(const InitKey &);
56  ~InitKey();
57  private:
 // ODR handle created in the constructors and destroyed in ~InitKey
62  ODR m_odr;
63  };
64  // worker thread .. for expiry of sessions
66  public:
 // thread entry point; the definition below calls Rep::expire()
68  void operator() (void);
69  private:
71  };
72  // backend result set
74  public:
75  std::string m_result_set_id;
78  yazpp_1::Yaz_Z_Query m_query;
 // records the current time as the set's last use
80  void timestamp();
81  yazpp_1::RecordCache m_record_cache;
82  BackendSet(
83  const std::string &result_set_id,
84  const Databases &databases,
85  const yazpp_1::Yaz_Z_Query &query);
 // sends a search for this set via search_package; on failure an error
 // response is stored in frontend_package and false is returned
86  bool search(
87  Package &frontend_package,
88  Package &search_package,
89  const Z_APDU *apdu_req,
90  const BackendInstancePtr bp,
91  Z_Records **z_records);
92  };
93  // backend connection instance
95  friend class Rep;
96  friend class BackendClass;
97  friend class BackendSet;
98  public:
99  mp::Session m_session;
 // true while the instance is handed out to a frontend request
101  bool m_in_use;
 // package used to send Close to the backend; deleted in the destructor
105  mp::Package * m_close_package;
107  void timestamp();
108  };
109  // backends of some class (all with same InitKey)
110  class SessionShared::BackendClass : boost::noncopyable {
111  friend class Rep;
112  friend struct Frontend;
 // opens a new backend session by sending the stored init request
115  BackendInstancePtr create_backend(const Package &package);
 // returns an idle backend, or creates one when none is idle
117  BackendInstancePtr get_backend(const Package &package);
 // closes and removes idle backends whose last use exceeds the expiry ttl
120  void expire_class();
121  yazpp_1::GDU m_init_request;
122  yazpp_1::GDU m_init_response;
 // guards m_backend_list and the per-instance m_in_use flags
123  boost::mutex m_mutex_backend_class;
128  public:
129  BackendClass(const yazpp_1::GDU &init_request,
130  int resultset_ttl,
131  int resultset_max,
132  int session_ttl);
133  ~BackendClass();
134  };
135  // frontend result set
138  yazpp_1::Yaz_Z_Query m_query;
139  public:
140  const Databases &get_databases();
141  const yazpp_1::Yaz_Z_Query &get_query();
142  FrontendSet(
143  const Databases &databases,
144  const yazpp_1::Yaz_Z_Query &query);
145  FrontendSet();
146  };
147  // frontend session
149  Frontend(Rep *rep);
150  ~Frontend();
 // true while a package for this session is being processed
152  bool m_in_use;
 // options sent by the client in its init request
153  Z_Options m_init_options;
154  void search(Package &package, Z_APDU *apdu);
155  void present(Package &package, Z_APDU *apdu);
156  void scan(Package &package, Z_APDU *apdu);
157 
 // finds or creates a backend result set for databases + query
158  void get_set(mp::Package &package,
159  const Z_APDU *apdu_req,
160  const Databases &databases,
161  yazpp_1::Yaz_Z_Query &query,
162  BackendInstancePtr &found_backend,
163  BackendSetPtr &found_set);
 // picks an idle backend whose result set may be replaced, or one that
 // still has room for another set
164  void override_set(BackendInstancePtr &found_backend,
165  std::string &result_set_id,
166  const Databases &databases,
167  bool out_of_sessions);
168 
172  };
173  // representation
175  friend class SessionShared;
176  friend struct Frontend;
177 
178  FrontendPtr get_frontend(Package &package);
179  void release_frontend(Package &package);
180  Rep();
181  public:
 // endless expiry loop run by the worker thread
182  void expire();
183  private:
184  void init(Package &package, const Z_GDU *gdu,
185  FrontendPtr frontend);
186  void start();
 // guards m_clients; m_cond_session_ready is signalled on release
187  boost::mutex m_mutex;
188  boost::condition m_cond_session_ready;
189  std::map<mp::Session, FrontendPtr> m_clients;
190 
 // guards the map of backend classes
192  boost::mutex m_mutex_backend_map;
193  boost::thread_group m_thrds;
 // when set, a failed search is retried once on a fresh backend
198  bool m_restart;
200  };
201  }
202 }
203 
204 yf::SessionShared::FrontendSet::FrontendSet(
205  const Databases &databases,
206  const yazpp_1::Yaz_Z_Query &query)
207  : m_databases(databases), m_query(query)
208 {
209 }
210 
211 const yf::SessionShared::Databases &
212 yf::SessionShared::FrontendSet::get_databases()
213 {
214  return m_databases;
215 }
216 
217 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
218 {
219  return m_query;
220 }
221 
222 yf::SessionShared::InitKey::InitKey(const InitKey &k)
223 {
224  m_odr = odr_createmem(ODR_ENCODE);
225 
226  m_idAuthentication_size = k.m_idAuthentication_size;
227  m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
228  memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
229  m_idAuthentication_size);
230 
231  m_otherInfo_size = k.m_otherInfo_size;
232  m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
233  memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
234  m_otherInfo_size);
235 }
236 
237 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
238 {
239  m_odr = odr_createmem(ODR_ENCODE);
240 
241  Z_IdAuthentication *t = req->idAuthentication;
242  z_IdAuthentication(m_odr, &t, 1, 0);
243  m_idAuthentication_buf =
244  odr_getbuf(m_odr, &m_idAuthentication_size, 0);
245 
246  Z_OtherInformation *o = req->otherInfo;
247  z_OtherInformation(m_odr, &o, 1, 0);
248  m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
249 }
250 
251 yf::SessionShared::InitKey::~InitKey()
252 {
253  odr_destroy(m_odr);
254 }
255 
256 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
257  const
258 {
259  int c;
260  c = mp::util::memcmp2(
261  (void*) m_idAuthentication_buf, m_idAuthentication_size,
262  (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
263  if (c < 0)
264  return true;
265  else if (c > 0)
266  return false;
267 
268  c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
269  (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
270  if (c < 0)
271  return true;
272  else if (c > 0)
273  return false;
274  return false;
275 }
276 
277 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
278 {
279  boost::mutex::scoped_lock lock(m_mutex_backend_class);
280  b->m_in_use = false;
281 }
282 
283 
284 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
285 {
286  BackendInstanceList::iterator it = m_backend_list.begin();
287 
288  while (it != m_backend_list.end())
289  {
290  if (*it == b)
291  {
292  mp::odr odr;
293  (*it)->m_close_package->response() = odr.create_close(
294  0, Z_Close_lackOfActivity, 0);
295  (*it)->m_close_package->session().close();
296  (*it)->m_close_package->move();
297 
298  it = m_backend_list.erase(it);
299  }
300  else
301  it++;
302  }
303 }
304 
305 
306 
307 yf::SessionShared::BackendInstancePtr
308 yf::SessionShared::BackendClass::get_backend(
309  const mp::Package &frontend_package)
310 {
311  {
312  boost::mutex::scoped_lock lock(m_mutex_backend_class);
313 
314  BackendInstanceList::const_iterator it = m_backend_list.begin();
315 
316  BackendInstancePtr backend1; // null
317 
318  for (; it != m_backend_list.end(); it++)
319  {
320  if (!(*it)->m_in_use)
321  {
322  if (!backend1
323  || (*it)->m_sequence_this < backend1->m_sequence_this)
324  backend1 = *it;
325  }
326  }
327  if (backend1)
328  {
329  use_backend(backend1);
330  return backend1;
331  }
332  }
333  return create_backend(frontend_package);
334 }
335 
336 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
337 {
338  backend->m_in_use = true;
339  backend->m_sequence_this = m_sequence_top++;
340 }
341 
342 void yf::SessionShared::BackendInstance::timestamp()
343 {
344  assert(m_in_use);
345  time(&m_time_last_use);
346 }
347 
348 yf::SessionShared::BackendInstance::~BackendInstance()
349 {
350  delete m_close_package;
351 }
352 
353 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
354  const mp::Package &frontend_package)
355 {
356  BackendInstancePtr bp(new BackendInstance);
357  BackendInstancePtr null;
358 
359  bp->m_close_package =
360  new mp::Package(bp->m_session, frontend_package.origin());
361  bp->m_close_package->copy_filter(frontend_package);
362 
363  Package init_package(bp->m_session, frontend_package.origin());
364 
365  init_package.copy_filter(frontend_package);
366 
367  yazpp_1::GDU actual_init_request = m_init_request;
368  Z_GDU *init_pdu = actual_init_request.get();
369 
370  assert(init_pdu->which == Z_GDU_Z3950);
371  assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
372 
373  Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
374  ODR_MASK_ZERO(req->options);
375 
376  ODR_MASK_SET(req->options, Z_Options_search);
377  ODR_MASK_SET(req->options, Z_Options_present);
378  ODR_MASK_SET(req->options, Z_Options_namedResultSets);
379  ODR_MASK_SET(req->options, Z_Options_scan);
380 
381  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
382  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
383  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
384 
385  init_package.request() = init_pdu;
386 
387  init_package.move();
388 
389  boost::mutex::scoped_lock lock(m_mutex_backend_class);
390 
391  m_named_result_sets = false;
392  Z_GDU *gdu = init_package.response().get();
393 
394  if (gdu && gdu->which == Z_GDU_Z3950
395  && gdu->u.z3950->which == Z_APDU_initResponse)
396  {
397  Z_InitResponse *res = gdu->u.z3950->u.initResponse;
398  m_init_response = gdu->u.z3950;
399  if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
400  {
401  m_named_result_sets = true;
402  }
403  if (*gdu->u.z3950->u.initResponse->result
404  && !init_package.session().is_closed())
405  {
406  bp->m_in_use = true;
407  time(&bp->m_time_last_use);
408  bp->m_sequence_this = 0;
409  bp->m_result_set_sequence = 0;
410  m_backend_list.push_back(bp);
411  return bp;
412  }
413  }
414  else
415  {
416  yazpp_1::GDU empty_gdu;
417  m_init_response = empty_gdu;
418  }
419 
420  if (!init_package.session().is_closed())
421  {
422  init_package.copy_filter(frontend_package);
423  init_package.session().close();
424  init_package.move();
425  }
426  return null;
427 }
428 
429 
430 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
431  int resultset_ttl,
432  int resultset_max,
433  int session_ttl)
434  : m_named_result_sets(false), m_init_request(init_request),
435  m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
436  m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
437 {}
438 
439 yf::SessionShared::BackendClass::~BackendClass()
440 {}
441 
442 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
443  FrontendPtr frontend)
444 {
445  Z_InitRequest *req = gdu->u.z3950->u.initRequest;
446 
447  frontend->m_is_virtual = true;
448  frontend->m_init_options = *req->options;
449  InitKey k(req);
450  {
451  boost::mutex::scoped_lock lock(m_mutex_backend_map);
452  BackendClassMap::const_iterator it;
453  it = m_backend_map.find(k);
454  if (it == m_backend_map.end())
455  {
456  BackendClassPtr b(new BackendClass(gdu->u.z3950,
457  m_resultset_ttl,
458  m_resultset_max,
459  m_session_ttl));
460  m_backend_map[k] = b;
461  frontend->m_backend_class = b;
462  }
463  else
464  {
465  frontend->m_backend_class = it->second;
466  }
467  }
468  BackendClassPtr bc = frontend->m_backend_class;
469  mp::odr odr;
470 
471  // we only need to get init response from "first" target in
472  // backend class - the assumption being that init response is
473  // same for all
474  if (bc->m_backend_list.size() == 0)
475  {
476  BackendInstancePtr backend = bc->create_backend(package);
477 
478  if (backend)
479  bc->release_backend(backend);
480  }
481 
482  yazpp_1::GDU init_response;
483  {
484  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
485 
486  init_response = bc->m_init_response;
487  }
488 
489  if (init_response.get())
490  {
491  Z_GDU *response_gdu = init_response.get();
492  mp::util::transfer_referenceId(odr, gdu->u.z3950,
493  response_gdu->u.z3950);
494  Z_Options *server_options =
495  response_gdu->u.z3950->u.initResponse->options;
496  Z_Options *client_options = &frontend->m_init_options;
497  int i;
498  for (i = 0; i < 30; i++)
499  if (!ODR_MASK_GET(client_options, i))
500  ODR_MASK_CLEAR(server_options, i);
501  package.response() = init_response;
502  if (!*response_gdu->u.z3950->u.initResponse->result)
503  package.session().close();
504  }
505  else
506  {
507  Z_APDU *apdu =
508  odr.create_initResponse(
509  gdu->u.z3950, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
510  "session_shared: target closed connection during init");
511  *apdu->u.initResponse->result = 0;
512  package.response() = apdu;
513  package.session().close();
514  }
515 }
516 
517 void yf::SessionShared::BackendSet::timestamp()
518 {
519  time(&m_time_last_use);
520 }
521 
522 yf::SessionShared::BackendSet::BackendSet(
523  const std::string &result_set_id,
524  const Databases &databases,
525  const yazpp_1::Yaz_Z_Query &query) :
526  m_result_set_id(result_set_id),
527  m_databases(databases), m_result_set_size(0), m_query(query)
528 {
529  timestamp();
530 }
531 
532 static int get_diagnostic(Z_DefaultDiagFormat *r)
533 {
534  return *r->condition;
535 }
536 
537 bool yf::SessionShared::BackendSet::search(
538  mp::Package &frontend_package,
539  mp::Package &search_package,
540  const Z_APDU *frontend_apdu,
541  const BackendInstancePtr bp,
542  Z_Records **z_records)
543 {
544  mp::odr odr;
545  Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
546  Z_SearchRequest *req = apdu_req->u.searchRequest;
547 
548  req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
549  req->query = m_query.get_Z_Query();
550 
551  req->num_databaseNames = m_databases.size();
552  req->databaseNames = (char**)
553  odr_malloc(odr, req->num_databaseNames * sizeof(char *));
554  Databases::const_iterator it = m_databases.begin();
555  size_t i = 0;
556  for (; it != m_databases.end(); it++)
557  req->databaseNames[i++] = odr_strdup(odr, it->c_str());
558 
559  if (frontend_apdu->which == Z_APDU_searchRequest)
560  req->preferredRecordSyntax =
561  frontend_apdu->u.searchRequest->preferredRecordSyntax;
562 
563  search_package.request() = apdu_req;
564 
565  search_package.move();
566 
567  Z_GDU *gdu = search_package.response().get();
568  if (!search_package.session().is_closed()
569  && gdu && gdu->which == Z_GDU_Z3950
570  && gdu->u.z3950->which == Z_APDU_searchResponse)
571  {
572  Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
573  *z_records = b_resp->records;
574  m_result_set_size = *b_resp->resultCount;
575  return true;
576  }
577  Z_APDU *f_apdu = 0;
578  const char *addinfo = "session_shared: "
579  "target closed connection during search";
580  if (frontend_apdu->which == Z_APDU_searchRequest)
581  f_apdu = odr.create_searchResponse(
582  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
583  else if (frontend_apdu->which == Z_APDU_presentRequest)
584  f_apdu = odr.create_presentResponse(
585  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
586  else
587  f_apdu = odr.create_close(
588  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
589  frontend_package.response() = f_apdu;
590  return false;
591 }
592 
593 void yf::SessionShared::Frontend::override_set(
594  BackendInstancePtr &found_backend,
595  std::string &result_set_id,
596  const Databases &databases,
597  bool out_of_sessions)
598 {
599  BackendClassPtr bc = m_backend_class;
600  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
601  time_t now;
602  time(&now);
603 
604  size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
605  for (; it != bc->m_backend_list.end(); it++)
606  {
607  if (!(*it)->m_in_use)
608  {
609  BackendSetList::iterator set_it = (*it)->m_sets.begin();
610  for (; set_it != (*it)->m_sets.end(); set_it++)
611  {
612  if ((max_sets > 1 || (*set_it)->m_databases == databases)
613  &&
614  (out_of_sessions ||
615  now < (*set_it)->m_time_last_use ||
616  now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
617  {
618  found_backend = *it;
619  result_set_id = (*set_it)->m_result_set_id;
620  found_backend->m_sets.erase(set_it);
621  return;
622  }
623  }
624  }
625  }
626  for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
627  {
628  if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
629  {
630  found_backend = *it;
631  if (bc->m_named_result_sets)
632  {
633  result_set_id = boost::io::str(
634  boost::format("%1%") %
635  found_backend->m_result_set_sequence);
636  found_backend->m_result_set_sequence++;
637  }
638  else
639  result_set_id = "default";
640  return;
641  }
642  }
643 }
644 
645 void yf::SessionShared::Frontend::get_set(mp::Package &package,
646  const Z_APDU *apdu_req,
647  const Databases &databases,
648  yazpp_1::Yaz_Z_Query &query,
649  BackendInstancePtr &found_backend,
650  BackendSetPtr &found_set)
651 {
652  bool session_restarted = false;
653 
654 restart:
655  std::string result_set_id;
656  bool out_of_sessions = false;
657  BackendClassPtr bc = m_backend_class;
658  {
659  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
660 
661  if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
662  out_of_sessions = true;
663 
664  if (m_p->m_optimize_search)
665  {
666  // look at each backend and see if we have a similar search
667  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
668  for (; it != bc->m_backend_list.end(); it++)
669  {
670  if (!(*it)->m_in_use)
671  {
672  BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
673  for (; set_it != (*it)->m_sets.end(); set_it++)
674  {
675  if ((*set_it)->m_databases == databases
676  && query.match(&(*set_it)->m_query))
677  {
678  found_set = *set_it;
679  found_backend = *it;
680  bc->use_backend(found_backend);
681  // found matching set. No need to search again
682  return;
683  }
684  }
685  }
686  }
687  }
688  override_set(found_backend, result_set_id, databases, out_of_sessions);
689  if (found_backend)
690  bc->use_backend(found_backend);
691  }
692  if (!found_backend)
693  {
694  // create a new backend set (and new set) if we're not out of sessions
695  if (!out_of_sessions)
696  found_backend = bc->create_backend(package);
697 
698  if (!found_backend)
699  {
700  Z_APDU *f_apdu = 0;
701  mp::odr odr;
702  const char *addinfo = 0;
703 
704  if (out_of_sessions)
705  addinfo = "session_shared: all sessions in use";
706  else
707  addinfo = "session_shared: could not create backend";
708  if (apdu_req->which == Z_APDU_searchRequest)
709  {
710  f_apdu = odr.create_searchResponse(
711  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
712  }
713  else if (apdu_req->which == Z_APDU_presentRequest)
714  {
715  f_apdu = odr.create_presentResponse(
716  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
717  }
718  else
719  {
720  f_apdu = odr.create_close(
721  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
722  }
723  package.response() = f_apdu;
724  return;
725  }
726  if (bc->m_named_result_sets)
727  {
728  result_set_id = boost::io::str(
729  boost::format("%1%") % found_backend->m_result_set_sequence);
730  found_backend->m_result_set_sequence++;
731  }
732  else
733  result_set_id = "default";
734  }
735  found_backend->timestamp();
736 
737  // we must search ...
738  BackendSetPtr new_set(new BackendSet(result_set_id,
739  databases, query));
740  Z_Records *z_records = 0;
741 
742  Package search_package(found_backend->m_session, package.origin());
743  search_package.copy_filter(package);
744 
745  if (!new_set->search(package, search_package,
746  apdu_req, found_backend, &z_records))
747  {
748  bc->remove_backend(found_backend);
749  return; // search error
750  }
751 
752  if (z_records)
753  {
754  int condition = 0;
755  if (z_records->which == Z_Records_NSD)
756  {
757  condition =
758  get_diagnostic(z_records->u.nonSurrogateDiagnostic);
759  }
760  else if (z_records->which == Z_Records_multipleNSD)
761  {
762  if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
763  &&
764 
765  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
766  Z_DiagRec_defaultFormat)
767  {
768  condition = get_diagnostic(
769  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
770 
771  }
772  }
773  if (m_p->m_restart && !session_restarted &&
774  condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
775  {
776  package.log("session_shared", YLOG_LOG, "restart");
777  bc->remove_backend(found_backend);
778  session_restarted = true;
779  found_backend.reset();
780  goto restart;
781 
782  }
783 
784  if (condition)
785  {
786  mp::odr odr;
787  if (apdu_req->which == Z_APDU_searchRequest)
788  {
789  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req,
790  0, 0);
791  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
792  *f_resp->searchStatus = Z_SearchResponse_none;
793  f_resp->records = z_records;
794  package.response() = f_apdu;
795  }
796  if (apdu_req->which == Z_APDU_presentRequest)
797  {
798  Z_APDU *f_apdu = odr.create_presentResponse(apdu_req,
799  0, 0);
800  Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
801  f_resp->records = z_records;
802  package.response() = f_apdu;
803  }
804  bc->release_backend(found_backend);
805  return; // search error
806  }
807  }
808  if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0)
809  {
810  package.log("session_shared", YLOG_LOG, "restart");
811  bc->remove_backend(found_backend);
812  session_restarted = true;
813  found_backend.reset();
814  goto restart;
815  }
816 
817  found_set = new_set;
818  found_set->timestamp();
819  found_backend->m_sets.push_back(found_set);
820 }
821 
822 void yf::SessionShared::Frontend::search(mp::Package &package,
823  Z_APDU *apdu_req)
824 {
825  Z_SearchRequest *req = apdu_req->u.searchRequest;
826  FrontendSets::iterator fset_it =
827  m_frontend_sets.find(req->resultSetName);
828  if (fset_it != m_frontend_sets.end())
829  {
830  // result set already exist
831  // if replace indicator is off: we return diagnostic if
832  // result set already exist.
833  if (*req->replaceIndicator == 0)
834  {
835  mp::odr odr;
836  Z_APDU *apdu =
837  odr.create_searchResponse(
838  apdu_req,
839  YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
840  0);
841  package.response() = apdu;
842 
843  return;
844  }
845  m_frontend_sets.erase(fset_it);
846  }
847 
848  yazpp_1::Yaz_Z_Query query;
849  query.set_Z_Query(req->query);
850  Databases databases;
851  int i;
852  for (i = 0; i < req->num_databaseNames; i++)
853  databases.push_back(req->databaseNames[i]);
854 
855  BackendSetPtr found_set; // null
856  BackendInstancePtr found_backend; // null
857 
858  get_set(package, apdu_req, databases, query, found_backend, found_set);
859  if (!found_set)
860  return;
861 
862  mp::odr odr;
863  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
864  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
865  *f_resp->resultCount = found_set->m_result_set_size;
866  package.response() = f_apdu;
867 
868  FrontendSetPtr fset(new FrontendSet(databases, query));
869  m_frontend_sets[req->resultSetName] = fset;
870 
871  m_backend_class->release_backend(found_backend);
872 }
873 
874 void yf::SessionShared::Frontend::present(mp::Package &package,
875  Z_APDU *apdu_req)
876 {
877  mp::odr odr;
878  Z_PresentRequest *req = apdu_req->u.presentRequest;
879 
880  FrontendSets::iterator fset_it =
881  m_frontend_sets.find(req->resultSetId);
882 
883  if (fset_it == m_frontend_sets.end())
884  {
885  Z_APDU *apdu =
886  odr.create_presentResponse(
887  apdu_req,
888  YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
889  req->resultSetId);
890  package.response() = apdu;
891  return;
892  }
893  FrontendSetPtr fset = fset_it->second;
894 
895  Databases databases = fset->get_databases();
896  yazpp_1::Yaz_Z_Query query = fset->get_query();
897 
898  BackendClassPtr bc = m_backend_class;
899  BackendSetPtr found_set; // null
900  BackendInstancePtr found_backend;
901 
902  get_set(package, apdu_req, databases, query, found_backend, found_set);
903  if (!found_set)
904  return;
905 
906  Z_NamePlusRecordList *npr_res = 0;
907  if (found_set->m_record_cache.lookup(odr, &npr_res,
908  *req->resultSetStartPoint,
909  *req->numberOfRecordsRequested,
910  req->preferredRecordSyntax,
911  req->recordComposition))
912  {
913  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
914  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
915 
916  yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF
917  " records in cache %p",
918  *req->resultSetStartPoint,
919  *req->numberOfRecordsRequested,
920  &found_set->m_record_cache);
921 
922  *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
923  *f_resp->nextResultSetPosition =
924  *req->resultSetStartPoint + *req->numberOfRecordsRequested;
925  // f_resp->presentStatus assumed OK.
926  f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
927  f_resp->records->which = Z_Records_DBOSD;
928  f_resp->records->u.databaseOrSurDiagnostics = npr_res;
929  package.response() = f_apdu_res;
930  bc->release_backend(found_backend);
931  return;
932  }
933 
934  found_backend->timestamp();
935 
936  Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
937  Z_PresentRequest *p_req = p_apdu->u.presentRequest;
938  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
939  p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
940  *p_req->resultSetStartPoint = *req->resultSetStartPoint;
941  *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
942  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
943  p_req->recordComposition = req->recordComposition;
944 
945  Package present_package(found_backend->m_session, package.origin());
946  present_package.copy_filter(package);
947 
948  present_package.request() = p_apdu;
949 
950  present_package.move();
951 
952  Z_GDU *gdu = present_package.response().get();
953  if (!present_package.session().is_closed()
954  && gdu && gdu->which == Z_GDU_Z3950
955  && gdu->u.z3950->which == Z_APDU_presentResponse)
956  {
957  Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
958  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
959  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
960 
961  f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
962  f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
963  f_resp->presentStatus= b_resp->presentStatus;
964  f_resp->records = b_resp->records;
965  f_resp->otherInfo = b_resp->otherInfo;
966  package.response() = f_apdu_res;
967 
968  if (b_resp->records && b_resp->records->which == Z_Records_DBOSD)
969  {
970  yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
971  " records to cache %p",
972  *req->resultSetStartPoint,
973  *f_resp->numberOfRecordsReturned,
974  &found_set->m_record_cache);
975  found_set->m_record_cache.add(
976  odr,
977  b_resp->records->u.databaseOrSurDiagnostics,
978  *req->resultSetStartPoint,
979  *f_resp->numberOfRecordsReturned);
980  }
981  bc->release_backend(found_backend);
982  }
983  else
984  {
985  bc->remove_backend(found_backend);
986  Z_APDU *f_apdu_res =
987  odr.create_presentResponse(
988  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
989  "session_shared: target closed connection during present");
990  package.response() = f_apdu_res;
991  }
992 }
993 
994 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
995  Z_APDU *apdu_req)
996 {
997  BackendClassPtr bc = m_backend_class;
998  BackendInstancePtr backend = bc->get_backend(frontend_package);
999  if (!backend)
1000  {
1001  mp::odr odr;
1002  Z_APDU *apdu = odr.create_scanResponse(
1003  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1004  "session_shared: could not create backend");
1005  frontend_package.response() = apdu;
1006  }
1007  else
1008  {
1009  Package scan_package(backend->m_session, frontend_package.origin());
1010  backend->timestamp();
1011  scan_package.copy_filter(frontend_package);
1012  scan_package.request() = apdu_req;
1013  scan_package.move();
1014  frontend_package.response() = scan_package.response();
1015  if (scan_package.session().is_closed())
1016  {
1017  frontend_package.session().close();
1018  bc->remove_backend(backend);
1019  }
1020  else
1021  bc->release_backend(backend);
1022  }
1023 }
1024 
1025 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
1026 {
1027 }
1028 
1029 void yf::SessionShared::Worker::operator() (void)
1030 {
1031  m_p->expire();
1032 }
1033 
1034 void yf::SessionShared::BackendClass::expire_class()
1035 {
1036  time_t now;
1037  time(&now);
1038  boost::mutex::scoped_lock lock(m_mutex_backend_class);
1039  BackendInstanceList::iterator bit = m_backend_list.begin();
1040  while (bit != m_backend_list.end())
1041  {
1042  time_t last_use = (*bit)->m_time_last_use;
1043 
1044  if ((*bit)->m_in_use)
1045  {
1046  bit++;
1047  }
1048  else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1049  {
1050  mp::odr odr;
1051  (*bit)->m_close_package->response() = odr.create_close(
1052  0, Z_Close_lackOfActivity, 0);
1053  (*bit)->m_close_package->session().close();
1054  (*bit)->m_close_package->move();
1055 
1056  bit = m_backend_list.erase(bit);
1057  }
1058  else
1059  {
1060  bit++;
1061  }
1062  }
1063 }
1064 
1065 void yf::SessionShared::Rep::expire()
1066 {
1067  while (true)
1068  {
1069  boost::xtime xt;
1070  boost::xtime_get(&xt,
1071 #if BOOST_VERSION >= 105000
1072  boost::TIME_UTC_
1073 #else
1074  boost::TIME_UTC
1075 #endif
1076  );
1077  xt.sec += m_session_ttl / 3;
1078  boost::thread::sleep(xt);
1079 
1080  BackendClassMap::const_iterator b_it = m_backend_map.begin();
1081  for (; b_it != m_backend_map.end(); b_it++)
1082  b_it->second->expire_class();
1083  }
1084 }
1085 
1086 yf::SessionShared::Rep::Rep()
1087 {
1088  m_resultset_ttl = 30;
1089  m_resultset_max = 10;
1090  m_session_ttl = 90;
1091  m_optimize_search = true;
1092  m_restart = false;
1093  m_session_max = 100;
1094 }
1095 
1096 void yf::SessionShared::Rep::start()
1097 {
1098  yf::SessionShared::Worker w(this);
1099  m_thrds.add_thread(new boost::thread(w));
1100 }
1101 
1102 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1103 {
1104 }
1105 
1106 yf::SessionShared::~SessionShared() {
1107 }
1108 
1109 void yf::SessionShared::start() const
1110 {
1111  m_p->start();
1112 }
1113 
1114 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1115 {
1116 }
1117 
1118 yf::SessionShared::Frontend::~Frontend()
1119 {
1120 }
1121 
1122 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1123 {
1124  boost::mutex::scoped_lock lock(m_mutex);
1125 
1126  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1127 
1128  while(true)
1129  {
1130  it = m_clients.find(package.session());
1131  if (it == m_clients.end())
1132  break;
1133 
1134  if (!it->second->m_in_use)
1135  {
1136  it->second->m_in_use = true;
1137  return it->second;
1138  }
1139  m_cond_session_ready.wait(lock);
1140  }
1141  FrontendPtr f(new Frontend(this));
1142  m_clients[package.session()] = f;
1143  f->m_in_use = true;
1144  return f;
1145 }
1146 
1147 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1148 {
1149  boost::mutex::scoped_lock lock(m_mutex);
1150  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1151 
1152  it = m_clients.find(package.session());
1153  if (it != m_clients.end())
1154  {
1155  if (package.session().is_closed())
1156  {
1157  m_clients.erase(it);
1158  }
1159  else
1160  {
1161  it->second->m_in_use = false;
1162  }
1163  m_cond_session_ready.notify_all();
1164  }
1165 }
1166 
1167 
1168 void yf::SessionShared::process(mp::Package &package) const
1169 {
1170  FrontendPtr f = m_p->get_frontend(package);
1171 
1172  Z_GDU *gdu = package.request().get();
1173 
1174  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1175  Z_APDU_initRequest && !f->m_is_virtual)
1176  {
1177  m_p->init(package, gdu, f);
1178  }
1179  else if (!f->m_is_virtual)
1180  package.move();
1181  else if (gdu && gdu->which == Z_GDU_Z3950)
1182  {
1183  Z_APDU *apdu = gdu->u.z3950;
1184  if (apdu->which == Z_APDU_initRequest)
1185  {
1186  mp::odr odr;
1187 
1188  package.response() = odr.create_close(
1189  apdu,
1190  Z_Close_protocolError,
1191  "double init");
1192 
1193  package.session().close();
1194  }
1195  else if (apdu->which == Z_APDU_close)
1196  {
1197  mp::odr odr;
1198 
1199  package.response() = odr.create_close(
1200  apdu,
1201  Z_Close_peerAbort, "received close from client");
1202  package.session().close();
1203  }
1204  else if (apdu->which == Z_APDU_searchRequest)
1205  {
1206  f->search(package, apdu);
1207  }
1208  else if (apdu->which == Z_APDU_presentRequest)
1209  {
1210  f->present(package, apdu);
1211  }
1212  else if (apdu->which == Z_APDU_scanRequest)
1213  {
1214  f->scan(package, apdu);
1215  }
1216  else
1217  {
1218  mp::odr odr;
1219 
1220  package.response() = odr.create_close(
1221  apdu, Z_Close_protocolError,
1222  "unsupported APDU in filter_session_shared");
1223 
1224  package.session().close();
1225  }
1226  }
1227  m_p->release_frontend(package);
1228 }
1229 
1230 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1231  const char *path)
1232 {
1233  for (ptr = ptr->children; ptr; ptr = ptr->next)
1234  {
1235  if (ptr->type != XML_ELEMENT_NODE)
1236  continue;
1237  if (!strcmp((const char *) ptr->name, "resultset"))
1238  {
1239  const struct _xmlAttr *attr;
1240  for (attr = ptr->properties; attr; attr = attr->next)
1241  {
1242  if (!strcmp((const char *) attr->name, "ttl"))
1243  m_p->m_resultset_ttl =
1244  mp::xml::get_int(attr->children, 30);
1245  else if (!strcmp((const char *) attr->name, "max"))
1246  {
1247  m_p->m_resultset_max =
1248  mp::xml::get_int(attr->children, 10);
1249  }
1250  else if (!strcmp((const char *) attr->name, "optimizesearch"))
1251  {
1252  m_p->m_optimize_search =
1253  mp::xml::get_bool(attr->children, true);
1254  }
1255  else if (!strcmp((const char *) attr->name, "restart"))
1256  {
1257  m_p->m_restart = mp::xml::get_bool(attr->children, true);
1258  }
1259  else
1260  throw mp::filter::FilterException(
1261  "Bad attribute " + std::string((const char *)
1262  attr->name));
1263  }
1264  }
1265  else if (!strcmp((const char *) ptr->name, "session"))
1266  {
1267  const struct _xmlAttr *attr;
1268  for (attr = ptr->properties; attr; attr = attr->next)
1269  {
1270  if (!strcmp((const char *) attr->name, "ttl"))
1271  m_p->m_session_ttl =
1272  mp::xml::get_int(attr->children, 90);
1273  else if (!strcmp((const char *) attr->name, "max"))
1274  m_p->m_session_max =
1275  mp::xml::get_int(attr->children, 100);
1276  else
1277  throw mp::filter::FilterException(
1278  "Bad attribute " + std::string((const char *)
1279  attr->name));
1280  }
1281  }
1282  else
1283  {
1284  throw mp::filter::FilterException("Bad element "
1285  + std::string((const char *)
1286  ptr->name));
1287  }
1288  }
1289 }
1290 
1291 static mp::filter::Base* filter_creator()
1292 {
1293  return new mp::filter::SessionShared;
1294 }
1295 
1296 extern "C" {
1297  struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1298  0,
1299  "session_shared",
1301  };
1302 }
1303 
1304 /*
1305  * Local variables:
1306  * c-basic-offset: 4
1307  * c-file-style: "Stroustrup"
1308  * indent-tabs-mode: nil
1309  * End:
1310  * vim: shiftwidth=4 tabstop=8 expandtab
1311  */
1312