metaproxy  1.21.0
filter_session_shared.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23 
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30 
31 #include <metaproxy/util.hpp>
33 
34 #include <yaz/copy_types.h>
35 #include <yaz/log.h>
36 #include <yaz/zgdu.h>
37 #include <yaz/otherinfo.h>
38 #include <yaz/diagbib1.h>
39 #include <yazpp/z-query.h>
40 #include <yazpp/record-cache.h>
41 #include <map>
42 #include <iostream>
43 #include <time.h>
44 #include <limits.h>
45 
46 namespace mp = metaproxy_1;
47 namespace yf = metaproxy_1::filter;
48 
49 namespace metaproxy_1 {
50 
51  namespace filter {
52  // key for session.. We'll only share sessions with same InitKey
// InitKey is the ordering key used to look up a backend class. It is built
// from an init request by encoding idAuthentication and otherInfo into an
// ODR owned by the key; operator< orders keys by those two encoded buffers
// (idAuthentication first, then otherInfo).
// NOTE(review): the class header line and the private buffer/size members
// are absent from this listing - confirm against the real source file.
54  public:
// Strict ordering over the encoded buffers.
55  bool operator < (const SessionShared::InitKey &k) const;
// Builds the key by encoding req->idAuthentication and req->otherInfo.
56  InitKey(Z_InitRequest *req);
// Deep copy: the copy gets its own ODR and its own buffer copies.
57  InitKey(const InitKey &);
// Destroys the ODR created by the constructor.
58  ~InitKey();
59  private:
// ODR created in encode mode by each constructor; released in ~InitKey().
64  ODR m_odr;
65  };
66  // worker thread .. for expiry of sessions
// Function object with a call operator taking no arguments.
// NOTE(review): presumably started on the thread group held by Rep and
// calling Rep::expire() - the class header, constructor and members are not
// visible in this listing, so confirm against the real source file.
68  public:
70  void operator() (void);
71  private:
73  };
74  // backend result set
// One result set held on a backend connection: its backend-side set name,
// the query that produced it, and a cache of records fetched from it.
// NOTE(review): the class header and several members (databases, result
// count, last-use time, NMEM handle) are missing from this listing.
76  public:
// Result set name used in the search request sent to the backend.
77  std::string m_result_set_id;
// Query that produced this set; compared against new queries in get_set().
80  yazpp_1::Yaz_Z_Query m_query;
// Stores the current time as the set's last-use time.
82  void timestamp();
// Cache consulted by Frontend::present() before going to the backend.
83  yazpp_1::RecordCache m_record_cache;
84
// additionalSearchInfo sent with the search request / received in the
// search response. The response copy is cloned into the set's own NMEM in
// search().
85  Z_OtherInformation *additionalSearchInfoRequest;
86  Z_OtherInformation *additionalSearchInfoResponse;
88  BackendSet(
89  const std::string &result_set_id,
90  const Databases &databases,
91  const yazpp_1::Yaz_Z_Query &query,
92  Z_OtherInformation *additionalSearchInfoRequest);
// Releases the NMEM created by the constructor.
93  ~BackendSet();
// Sends the search via search_package. Returns true and sets *z_records on
// a search response; otherwise writes an error response into
// frontend_package and returns false.
94  bool search(
95  Package &frontend_package,
96  Package &search_package,
97  const Z_APDU *apdu_req,
98  const BackendInstancePtr bp,
99  Z_Records **z_records);
100  };
101  // backend connection instance
// One backend connection (session) belonging to a BackendClass.
// NOTE(review): the class header and several members (sequence numbers,
// last-use time, the list of sets, the destructor declaration) are missing
// from this listing.
103  friend class Rep;
104  friend class BackendClass;
105  friend class BackendSet;
106  public:
// Session used for every package sent to this backend.
107  mp::Session m_session;
// True while a frontend holds this backend; set in use_backend() and
// create_backend(), cleared in release_backend().
109  bool m_in_use;
// Package allocated in create_backend(); used to send a close when the
// backend is removed or destroyed. Deleted by the destructor.
113  mp::Package * m_close_package;
// Records the current time as last use; asserts that the backend is in use.
115  void timestamp();
116  };
117  // backends of some class (all with same InitKey)
// Holds every backend connection created for one InitKey, plus the init
// request used to open new ones and the init response saved from a backend.
// NOTE(review): many members (backend list, counters, TTLs, sequence
// numbers, release/remove/use_backend declarations) are missing from this
// listing.
118  class SessionShared::BackendClass : boost::noncopyable {
119  friend class Rep;
120  friend struct Frontend;
// Opens a new backend by sending the stored init request. Returns a null
// pointer on failure, with code/addinfo set from the backend's diagnostic
// when one is present.
123  BackendInstancePtr create_backend(const Package &package,
124  int &code, std::string &addinfo);
// Returns the idle backend with the lowest sequence number, or falls back
// to create_backend() when none is idle.
126  BackendInstancePtr get_backend(const Package &package,
127  int &code, std::string &addinfo);
130  bool expire_instances();
// Init request copied and re-sent for each new backend.
131  yazpp_1::GDU m_init_request;
// Init response saved in create_backend() while no backend has succeeded.
132  yazpp_1::GDU m_init_response;
// Guards the backend list, the counters and the in-use flags.
133  boost::mutex m_mutex_backend_class;
// Notified when a backend is released and when backend creation fails.
134  boost::condition m_cond_set_ready;
144  public:
// NOTE(review): the fifth parameter is called preferredRecordSize here but
// preferredMessageSize in the definition, where it initialises
// m_preferredMessageSize.
145  BackendClass(const yazpp_1::GDU &init_request,
146  int resultset_ttl,
147  int resultset_max,
148  int session_ttl,
149  Odr_int preferredRecordSize,
150  Odr_int maximumRecordSize);
151  ~BackendClass();
152  };
153  // frontend result set
// What a frontend session remembers about one of its named result sets:
// the databases and the query. get_set() uses these to find or re-create a
// matching backend set.
// NOTE(review): the class header and the m_databases member are missing
// from this listing.
156  yazpp_1::Yaz_Z_Query m_query;
157  public:
// Accessors returning references to the stored databases / query.
158  const Databases &get_databases();
159  const yazpp_1::Yaz_Z_Query &get_query();
160  FrontendSet(
161  const Databases &databases,
162  const yazpp_1::Yaz_Z_Query &query);
164  };
165  // frontend session
// State for one client (frontend) session.
// NOTE(review): the struct header and several members (m_is_virtual,
// m_frontend_sets, m_backend_class, m_p) are missing from this listing.
167  Frontend(Rep *rep);
168  ~Frontend();
170  bool m_in_use;
// Options the client sent in its init request; Rep::init() uses them to
// mask the options returned in the init response.
171  Z_Options m_init_options;
// Request handlers; each writes its response into the package.
172  void search(Package &package, Z_APDU *apdu);
173  void present(Package &package, Z_APDU *apdu);
174  void scan(Package &package, Z_APDU *apdu);
175
// Walks an RPN tree and replaces each result-set-id operand with the query
// stored for that frontend set. Returns 0 or a Bib-1 diagnostic code, and
// sets rset to the last result set id seen.
176  int result_set_ref(ODR o,
177  const Databases &databases,
178  Z_RPNStructure *s, std::string &rset);
// Finds or creates a backend set for (databases, query). On success
// found_backend is marked in use and found_set is non-null. On failure
// found_set is null and an error response has been stored in the package.
179  void get_set(mp::Package &package,
180  const Z_APDU *apdu_req,
181  const Databases &databases,
182  yazpp_1::Yaz_Z_Query &query,
183  BackendInstancePtr &found_backend,
184  BackendSetPtr &found_set);
// Picks an idle backend whose result set may be replaced, or one with room
// for another set. The caller must hold the backend class mutex.
185  void override_set(BackendInstancePtr &found_backend,
186  std::string &result_set_id,
187  const Databases &databases,
188  bool out_of_sessions);
189
193  };
194  // representation
// Private implementation state of the SessionShared filter.
// NOTE(review): the class header and several members (m_backend_map,
// the TTL/max settings, m_optimize_search, m_ignore_auth, the message and
// record size settings) are missing from this listing.
196  friend class SessionShared;
197  friend struct Frontend;
198
199  FrontendPtr get_frontend(Package &package);
200  void release_frontend(Package &package);
201  Rep();
202  public:
203  ~Rep();
204  void expire();
205  private:
206  void expire_classes();
// Counts backend classes and instances; the counts are currently unused.
207  void stat();
// Handles a client init request: picks or creates the backend class for
// the request's InitKey and produces the init response.
208  void init(Package &package, const Z_GDU *gdu,
209  FrontendPtr frontend);
210  void start();
211  boost::mutex m_mutex;
212  boost::condition m_cond_session_ready;
213  boost::condition m_cond_expire_ready;
// Frontend state keyed by client session.
214  std::map<mp::Session, FrontendPtr> m_clients;
215
// Held while the backend class map is searched, updated or iterated.
217  boost::mutex m_mutex_backend_map;
218  boost::thread_group m_thrds;
// When set, get_set() retries once on a new backend after a temporary
// system error diagnostic or a negative result count.
223  bool m_restart;
229  };
230  }
231 }
232 
233 yf::SessionShared::FrontendSet::FrontendSet(
234  const Databases &databases,
235  const yazpp_1::Yaz_Z_Query &query)
236  : m_databases(databases), m_query(query)
237 {
238 }
239 
240 const yf::SessionShared::Databases &
241 yf::SessionShared::FrontendSet::get_databases()
242 {
243  return m_databases;
244 }
245 
246 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
247 {
248  return m_query;
249 }
250 
251 yf::SessionShared::InitKey::InitKey(const InitKey &k)
252 {
253  m_odr = odr_createmem(ODR_ENCODE);
254 
255  m_idAuthentication_size = k.m_idAuthentication_size;
256  m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
257  memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
258  m_idAuthentication_size);
259 
260  m_otherInfo_size = k.m_otherInfo_size;
261  m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
262  memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
263  m_otherInfo_size);
264 }
265 
266 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
267 {
268  m_odr = odr_createmem(ODR_ENCODE);
269 
270  Z_IdAuthentication *t = req->idAuthentication;
271  z_IdAuthentication(m_odr, &t, 1, 0);
272  m_idAuthentication_buf =
273  odr_getbuf(m_odr, &m_idAuthentication_size, 0);
274 
275  Z_OtherInformation *o = req->otherInfo;
276  z_OtherInformation(m_odr, &o, 1, 0);
277  m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
278 }
279 
280 yf::SessionShared::InitKey::~InitKey()
281 {
282  odr_destroy(m_odr);
283 }
284 
285 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
286  const
287 {
288  int c;
289  c = mp::util::memcmp2(
290  (void*) m_idAuthentication_buf, m_idAuthentication_size,
292  if (c < 0)
293  return true;
294  else if (c > 0)
295  return false;
296 
297  c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
298  (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
299  if (c < 0)
300  return true;
301  else if (c > 0)
302  return false;
303  return false;
304 }
305 
306 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
307 {
308  boost::mutex::scoped_lock lock(m_mutex_backend_class);
309  m_cond_set_ready.notify_all();
310  b->m_in_use = false;
311 }
312 
313 
314 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
315 {
316  {
317  boost::mutex::scoped_lock lock(m_mutex_backend_class);
318  BackendInstanceList::iterator it = m_backend_list.begin();
319  for (;;)
320  {
321  if (it == m_backend_list.end())
322  return;
323  if (*it == b)
324  {
325  it = m_backend_list.erase(it);
326  break;
327  }
328  it++;
329  }
330  }
331  mp::odr odr;
332  b->m_close_package->response() = odr.create_close(
333  0, Z_Close_lackOfActivity, 0);
334  b->m_close_package->session().close();
335  b->m_close_package->move();
336 }
337 
338 
339 yf::SessionShared::BackendInstancePtr
340 yf::SessionShared::BackendClass::get_backend(
341  const mp::Package &frontend_package,
342  int &code, std::string &addinfo)
343 {
344  {
345  boost::mutex::scoped_lock lock(m_mutex_backend_class);
346 
347  BackendInstanceList::const_iterator it = m_backend_list.begin();
348 
349  BackendInstancePtr backend1; // null
350 
351  for (; it != m_backend_list.end(); it++)
352  {
353  if (!(*it)->m_in_use)
354  {
355  if (!backend1
356  || (*it)->m_sequence_this < backend1->m_sequence_this)
357  backend1 = *it;
358  }
359  }
360  if (backend1)
361  {
362  use_backend(backend1);
363  return backend1;
364  }
365  }
366  return create_backend(frontend_package, code, addinfo);
367 }
368 
369 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
370 {
371  backend->m_in_use = true;
372  backend->m_sequence_this = m_sequence_top++;
373 }
374 
375 void yf::SessionShared::BackendInstance::timestamp()
376 {
377  assert(m_in_use);
378  time(&m_time_last_use);
379 }
380 
381 yf::SessionShared::BackendInstance::~BackendInstance()
382 {
383  if (m_close_package)
384  {
385  mp::odr odr;
386  m_close_package->response() = odr.create_close(
387  0, Z_Close_lackOfActivity, 0);
388  m_close_package->session().close();
389  m_close_package->move();
390  }
391  delete m_close_package;
392 }
393 
394 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
395  const mp::Package &frontend_package,
396  int &code, std::string &addinfo)
397 {
398  BackendInstancePtr null;
400  bp->m_close_package =
401  new mp::Package(bp->m_session, frontend_package.origin());
402  bp->m_close_package->copy_filter(frontend_package);
403 
404  Package init_package(bp->m_session, frontend_package.origin());
405 
406  init_package.copy_filter(frontend_package);
407 
408  yazpp_1::GDU actual_init_request = m_init_request;
409  Z_GDU *init_pdu = actual_init_request.get();
410 
411  assert(init_pdu->which == Z_GDU_Z3950);
412  assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
413 
414  Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
415  ODR_MASK_ZERO(req->options);
416 
417  ODR_MASK_SET(req->options, Z_Options_search);
418  ODR_MASK_SET(req->options, Z_Options_present);
419  ODR_MASK_SET(req->options, Z_Options_namedResultSets);
420  ODR_MASK_SET(req->options, Z_Options_scan);
421 
422  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
423  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
424  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
425 
426  if (m_preferredMessageSize)
427  *req->preferredMessageSize = m_preferredMessageSize;
428  if (m_maximumRecordSize)
429  *req->maximumRecordSize = m_maximumRecordSize;
430 
431  init_package.request() = init_pdu;
432 
433  {
434  boost::mutex::scoped_lock lock(m_mutex_backend_class);
435  m_no_init++;
436  }
437 
438  init_package.move();
439 
440  boost::mutex::scoped_lock lock(m_mutex_backend_class);
441 
442  addinfo.clear();
443  code = 0;
444  m_named_result_sets = false;
445  Z_GDU *gdu = init_package.response().get();
446 
447  if (gdu && gdu->which == Z_GDU_Z3950
448  && gdu->u.z3950->which == Z_APDU_initResponse)
449  {
450  Z_InitResponse *res = gdu->u.z3950->u.initResponse;
451 
452  if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
453  {
454  m_named_result_sets = true;
455  }
456  // save init-response until we get one that succeeds
457  if (m_no_succeeded == 0)
458  m_init_response = gdu->u.z3950;
459  if (*gdu->u.z3950->u.initResponse->result
460  && !init_package.session().is_closed())
461  {
462  bp->m_in_use = true;
463  time(&bp->m_time_last_use);
464  bp->m_sequence_this = 0;
465  bp->m_result_set_sequence = 0;
466  m_backend_list.push_back(bp);
467  m_no_succeeded++;
468  return bp;
469  }
470  else
471  {
472  Z_External *uif =
473  gdu->u.z3950->u.initResponse->userInformationField;
474  if (uif && uif->which == Z_External_userInfo1)
475  {
476  Z_OtherInformation *ui = uif->u.userInfo1;
477  if (ui && ui->num_elements >= 1)
478  {
479  Z_OtherInformationUnit *unit = ui->list[0];
480  if (unit->which == Z_OtherInfo_externallyDefinedInfo &&
481  unit->information.externallyDefinedInfo &&
482  unit->information.externallyDefinedInfo->which ==
483  Z_External_diag1)
484  {
485  Z_DiagnosticFormat *diag =
486  unit->information.externallyDefinedInfo->u.diag1;
487  if (diag->num >= 1)
488  {
489  Z_DiagnosticFormat_s *ds = diag->elements[0];
490  if (ds->which ==
491  Z_DiagnosticFormat_s_defaultDiagRec)
492  {
493  Z_DefaultDiagFormat *e =
494  ds->u.defaultDiagRec;
495  code = *e->condition;
496  if (e->which == Z_DefaultDiagFormat_v2Addinfo
497  && e->u.v2Addinfo)
498  {
499  addinfo = e->u.v2Addinfo;
500  }
501  else if (
502  e->which == Z_DefaultDiagFormat_v3Addinfo
503  && e->u.v3Addinfo)
504  {
505  addinfo = e->u.v3Addinfo;
506  }
507  }
508  }
509  }
510  }
511  }
512  }
513  }
514  if (!init_package.session().is_closed())
515  {
516  init_package.copy_filter(frontend_package);
517  init_package.session().close();
518  init_package.move();
519  }
520  m_no_failed++;
521  m_cond_set_ready.notify_all();
522  return null;
523 }
524 
525 
526 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
527  int resultset_ttl,
528  int resultset_max,
529  int session_ttl,
530  Odr_int preferredMessageSize,
531  Odr_int maximumRecordSize)
532  : m_named_result_sets(false), m_init_request(init_request),
533  m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
534  m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max),
535  m_preferredMessageSize(preferredMessageSize),
536  m_maximumRecordSize(maximumRecordSize),
537  m_no_failed(0), m_no_succeeded(0), m_no_init(0)
538 {}
539 
540 yf::SessionShared::BackendClass::~BackendClass()
541 {}
542 
543 void yf::SessionShared::Rep::stat()
544 {
545  int no_classes = 0;
546  int no_instances = 0;
547  BackendClassMap::const_iterator it;
548  {
549  boost::mutex::scoped_lock lock(m_mutex_backend_map);
550  for (it = m_backend_map.begin(); it != m_backend_map.end(); it++)
551  {
552  BackendClassPtr bc = it->second;
553  no_classes++;
554  BackendInstanceList::iterator bit = bc->m_backend_list.begin();
555  for (; bit != bc->m_backend_list.end(); bit++)
556  no_instances++;
557  }
558  }
559 }
560 
561 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
562  FrontendPtr frontend)
563 {
564  Z_InitRequest *req = gdu->u.z3950->u.initRequest;
565 
566  frontend->m_is_virtual = true;
567  frontend->m_init_options = *req->options;
568  if (m_ignore_auth)
569  req->idAuthentication = 0;
570  InitKey k(req);
571  {
572  boost::mutex::scoped_lock lock(m_mutex_backend_map);
573  BackendClassMap::const_iterator it;
574  it = m_backend_map.find(k);
575  if (it == m_backend_map.end())
576  {
577  BackendClassPtr b(new BackendClass(gdu->u.z3950,
578  m_resultset_ttl,
579  m_resultset_max,
580  m_session_ttl,
581  m_preferredMessageSize,
582  m_maximumRecordSize));
583  m_backend_map[k] = b;
584  frontend->m_backend_class = b;
585  }
586  else
587  {
588  frontend->m_backend_class = it->second;
589  }
590  }
591  BackendClassPtr bc = frontend->m_backend_class;
592  mp::odr odr;
593 
594  // we only need to get init response from "first" target in
595  // backend class - the assumption being that init response is
596  // same for all
597  bool create_first_one = false;
598  {
599  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
600  if (!bc->m_no_succeeded)
601  create_first_one = true;
602  else
603  {
604  // wait for first one to finish
605  while (!bc->m_no_failed && !bc->m_no_succeeded && bc->m_no_init)
606  {
607  bc->m_cond_set_ready.wait(lock);
608  }
609  }
610  }
611  if (create_first_one)
612  {
613 
614  int code;
615  std::string addinfo;
616  BackendInstancePtr backend = bc->create_backend(package, code, addinfo);
617  if (backend)
618  bc->release_backend(backend);
619  }
620  yazpp_1::GDU init_response;
621  {
622  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
623 
624  init_response = bc->m_init_response;
625  }
626 
627  if (init_response.get())
628  {
629  Z_GDU *response_gdu = init_response.get();
630  mp::util::transfer_referenceId(odr, gdu->u.z3950,
631  response_gdu->u.z3950);
632  Z_InitResponse *init_res = response_gdu->u.z3950->u.initResponse;
633  Z_Options *server_options = init_res->options;
634  Z_Options *client_options = &frontend->m_init_options;
635  int i;
636  for (i = 0; i < 30; i++)
637  if (!ODR_MASK_GET(client_options, i))
638  ODR_MASK_CLEAR(server_options, i);
639 
640  if (!m_preferredMessageSize ||
641  *init_res->preferredMessageSize > *req->preferredMessageSize)
642  *init_res->preferredMessageSize = *req->preferredMessageSize;
643 
644  if (!m_maximumRecordSize ||
645  *init_res->maximumRecordSize > *req->maximumRecordSize)
646  *init_res->maximumRecordSize = *req->maximumRecordSize;
647 
648  package.response() = init_response;
649  if (!*response_gdu->u.z3950->u.initResponse->result)
650  package.session().close();
651  }
652  else
653  {
654  Z_APDU *apdu =
655  odr.create_initResponse(
656  gdu->u.z3950, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
657  "session_shared: target closed connection during init");
658  *apdu->u.initResponse->result = 0;
659  package.response() = apdu;
660  package.session().close();
661  }
662 }
663 
664 void yf::SessionShared::BackendSet::timestamp()
665 {
666  time(&m_time_last_use);
667 }
668 
669 yf::SessionShared::BackendSet::BackendSet(
670  const std::string &result_set_id,
671  const Databases &databases,
672  const yazpp_1::Yaz_Z_Query &query,
673  Z_OtherInformation *additionalSearchInfo) :
674  m_result_set_id(result_set_id),
675  m_databases(databases), m_result_set_size(0), m_query(query)
676 {
677  timestamp();
678  mem_additionalSearchInfo = nmem_create();
681  yaz_clone_z_OtherInformation(additionalSearchInfo,
683 }
684 
685 yf::SessionShared::BackendSet::~BackendSet()
686 {
687  nmem_destroy(mem_additionalSearchInfo);
688 }
689 
690 static int get_diagnostic(Z_DefaultDiagFormat *r)
691 {
692  return *r->condition;
693 }
694 
695 bool yf::SessionShared::BackendSet::search(
696  mp::Package &frontend_package,
697  mp::Package &search_package,
698  const Z_APDU *frontend_apdu,
699  const BackendInstancePtr bp,
700  Z_Records **z_records)
701 {
702  mp::odr odr;
703  Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
704  Z_SearchRequest *req = apdu_req->u.searchRequest;
705 
706  req->additionalSearchInfo = additionalSearchInfoRequest;
707  req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
708  req->query = m_query.get_Z_Query();
709 
710  req->num_databaseNames = m_databases.size();
711  req->databaseNames = (char**)
712  odr_malloc(odr, req->num_databaseNames * sizeof(char *));
713  Databases::const_iterator it = m_databases.begin();
714  size_t i = 0;
715  for (; it != m_databases.end(); it++)
716  req->databaseNames[i++] = odr_strdup(odr, it->c_str());
717 
718  if (frontend_apdu->which == Z_APDU_searchRequest)
719  req->preferredRecordSyntax =
720  frontend_apdu->u.searchRequest->preferredRecordSyntax;
721 
722  search_package.request() = apdu_req;
723 
724  search_package.move();
725 
726  Z_GDU *gdu = search_package.response().get();
727  if (!search_package.session().is_closed()
728  && gdu && gdu->which == Z_GDU_Z3950
729  && gdu->u.z3950->which == Z_APDU_searchResponse)
730  {
731  Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
732  *z_records = b_resp->records;
733  m_result_set_size = *b_resp->resultCount;
734 
735  additionalSearchInfoResponse = yaz_clone_z_OtherInformation(
736  b_resp->additionalSearchInfo, mem_additionalSearchInfo);
737  return true;
738  }
739  Z_APDU *f_apdu = 0;
740  const char *addinfo = "session_shared: "
741  "target closed connection during search";
742  if (frontend_apdu->which == Z_APDU_searchRequest)
743  f_apdu = odr.create_searchResponse(
744  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
745  else if (frontend_apdu->which == Z_APDU_presentRequest)
746  f_apdu = odr.create_presentResponse(
747  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
748  else
749  f_apdu = odr.create_close(
750  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
751  frontend_package.response() = f_apdu;
752  return false;
753 }
754 
755 void yf::SessionShared::Frontend::override_set(
756  BackendInstancePtr &found_backend,
757  std::string &result_set_id,
758  const Databases &databases,
759  bool out_of_sessions)
760 {
761  BackendClassPtr bc = m_backend_class;
762  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
763  time_t now;
764  time(&now);
765 
766  size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
767  for (; it != bc->m_backend_list.end(); it++)
768  {
769  if (!(*it)->m_in_use)
770  {
771  BackendSetList::iterator set_it = (*it)->m_sets.begin();
772  for (; set_it != (*it)->m_sets.end(); set_it++)
773  {
774  bool same_databases = mp::util::match((*set_it)->m_databases,
775  databases);
776  if ((max_sets > 1 || same_databases)
777  &&
778  (out_of_sessions ||
779  now < (*set_it)->m_time_last_use ||
780  now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
781  {
782  found_backend = *it;
783  result_set_id = (*set_it)->m_result_set_id;
784  found_backend->m_sets.erase(set_it);
785  return;
786  }
787  }
788  }
789  }
790  for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
791  {
792  if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
793  {
794  found_backend = *it;
795  if (bc->m_named_result_sets)
796  {
797  result_set_id = boost::io::str(
798  boost::format("%1%") %
799  found_backend->m_result_set_sequence);
800  found_backend->m_result_set_sequence++;
801  }
802  else
803  result_set_id = "default";
804  return;
805  }
806  }
807 }
808 
809 void yf::SessionShared::Frontend::get_set(mp::Package &package,
810  const Z_APDU *apdu_req,
811  const Databases &databases,
812  yazpp_1::Yaz_Z_Query &query,
813  BackendInstancePtr &found_backend,
814  BackendSetPtr &found_set)
815 {
816  bool session_restarted = false;
817  Z_OtherInformation *additionalSearchInfo = 0;
818 
819  if (apdu_req->which == Z_APDU_searchRequest)
820  additionalSearchInfo = apdu_req->u.searchRequest->additionalSearchInfo;
821 
822 restart:
823  std::string result_set_id;
824  bool out_of_sessions = false;
825  BackendClassPtr bc = m_backend_class;
826  {
827  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
828 
829  if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
830  out_of_sessions = true;
831 
832  if (m_p->m_optimize_search)
833  {
834  // look at each backend and see if we have a similar search
835  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
836  while (it != bc->m_backend_list.end())
837  {
838  bool restart = false;
839  BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
840  for (; set_it != (*it)->m_sets.end(); set_it++)
841  {
842  // for real present request we don't care
843  // if additionalSearchInfo matches: same records
844  if (mp::util::match((*set_it)->m_databases, databases)
845  && query.match(&(*set_it)->m_query)
846  && (apdu_req->which != Z_APDU_searchRequest ||
847  yaz_compare_z_OtherInformation(
848  additionalSearchInfo,
849  (*set_it)->additionalSearchInfoRequest)))
850  {
851  if ((*it)->m_in_use)
852  {
853  bc->m_cond_set_ready.wait(lock);
854  restart = true;
855  break;
856  }
857  else
858  {
859  found_set = *set_it;
860  found_backend = *it;
861  bc->use_backend(found_backend);
862  // found matching set. No need to search again
863  return;
864  }
865  }
866  }
867  if (restart)
868  it = bc->m_backend_list.begin();
869  else
870  it++;
871  }
872  }
873  override_set(found_backend, result_set_id, databases, out_of_sessions);
874  if (found_backend)
875  bc->use_backend(found_backend);
876  }
877  if (!found_backend)
878  {
879  int code;
880  std::string addinfo;
881  // create a new backend set (and new set) if we're not out of sessions
882  if (!out_of_sessions)
883  found_backend = bc->create_backend(package, code, addinfo);
884  if (!found_backend)
885  {
886  Z_APDU *f_apdu = 0;
887  mp::odr odr;
888 
889  if (out_of_sessions)
890  {
891  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
892  addinfo = "session_shared: all sessions in use";
893  }
894  else
895  {
896  if (code == 0)
897  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
898  if (addinfo.length() == 0)
899  addinfo = "session_shared: could not create backend";
900  }
901  if (apdu_req->which == Z_APDU_searchRequest)
902  {
903  f_apdu = odr.create_searchResponse(
904  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
905  }
906  else if (apdu_req->which == Z_APDU_presentRequest)
907  {
908  f_apdu = odr.create_presentResponse(
909  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
910  }
911  else
912  {
913  f_apdu = odr.create_close(
914  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
915  }
916  package.response() = f_apdu;
917  return;
918  }
919  if (bc->m_named_result_sets)
920  {
921  result_set_id = boost::io::str(
922  boost::format("%1%") % found_backend->m_result_set_sequence);
923  found_backend->m_result_set_sequence++;
924  }
925  else
926  result_set_id = "default";
927  }
928  found_backend->timestamp();
929 
930  // we must search ...
931  BackendSetPtr new_set(new BackendSet(result_set_id,
932  databases, query,
933  additionalSearchInfo));
934 
935  found_set = new_set;
936  found_set->timestamp();
937 
938  Z_Records *z_records = 0;
939 
940  Package search_package(found_backend->m_session, package.origin());
941  search_package.copy_filter(package);
942 
943  if (!new_set->search(package, search_package,
944  apdu_req, found_backend, &z_records))
945  {
946  bc->remove_backend(found_backend);
947  found_set.reset();
948  return; // search error
949  }
950 
951  if (z_records)
952  {
953  int condition = 0;
954  if (z_records->which == Z_Records_NSD)
955  {
956  condition =
957  get_diagnostic(z_records->u.nonSurrogateDiagnostic);
958  }
959  else if (z_records->which == Z_Records_multipleNSD)
960  {
961  if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
962  &&
963 
964  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
965  Z_DiagRec_defaultFormat)
966  {
967  condition = get_diagnostic(
968  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
969 
970  }
971  }
972  if (m_p->m_restart && !session_restarted &&
973  condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
974  {
975  package.log("session_shared", YLOG_LOG, "restart");
976  bc->remove_backend(found_backend);
977  session_restarted = true;
978  found_backend.reset();
979  goto restart;
980 
981  }
982 
983  if (condition)
984  {
985  mp::odr odr;
986  if (apdu_req->which == Z_APDU_searchRequest)
987  {
988  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req,
989  0, 0);
990  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
991  *f_resp->searchStatus = Z_SearchResponse_none;
992  f_resp->records = z_records;
993  package.response() = f_apdu;
994  }
995  if (apdu_req->which == Z_APDU_presentRequest)
996  {
997  Z_APDU *f_apdu = odr.create_presentResponse(apdu_req,
998  0, 0);
999  Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
1000  f_resp->records = z_records;
1001  package.response() = f_apdu;
1002  }
1003  bc->release_backend(found_backend);
1004  found_set.reset();
1005  return; // search error
1006  }
1007  }
1008  if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0)
1009  {
1010  package.log("session_shared", YLOG_LOG, "restart");
1011  bc->remove_backend(found_backend);
1012  session_restarted = true;
1013  found_backend.reset();
1014  goto restart;
1015  }
1016  found_backend->m_sets.push_back(found_set);
1017 }
1018 
1019 int yf::SessionShared::Frontend::result_set_ref(ODR o,
1020  const Databases &databases,
1021  Z_RPNStructure *s,
1022  std::string &rset)
1023 {
1024  int ret = 0;
1025  switch (s->which)
1026  {
1027  case Z_RPNStructure_simple:
1028  if (s->u.simple->which == Z_Operand_resultSetId)
1029  {
1030  const char *id = s->u.simple->u.resultSetId;
1031  rset = id;
1032 
1033  FrontendSets::iterator fset_it = m_frontend_sets.find(id);
1034  if (fset_it == m_frontend_sets.end())
1035  {
1036  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1037  }
1038  else if (!mp::util::match(fset_it->second->get_databases(),
1039  databases))
1040  {
1041  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1042  }
1043  else
1044  {
1045  yazpp_1::Yaz_Z_Query query = fset_it->second->get_query();
1046  Z_Query *q = yaz_copy_Z_Query(query.get_Z_Query(), o);
1047  if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1048  {
1049  s->which = q->u.type_1->RPNStructure->which;
1050  s->u.simple = q->u.type_1->RPNStructure->u.simple;
1051  }
1052  else
1053  {
1054  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1055  }
1056  }
1057  }
1058  break;
1059  case Z_RPNStructure_complex:
1060  ret = result_set_ref(o, databases, s->u.complex->s1, rset);
1061  if (!ret)
1062  ret = result_set_ref(o, databases, s->u.complex->s2, rset);
1063  break;
1064  }
1065  return ret;
1066 }
1067 
1068 void yf::SessionShared::Frontend::search(mp::Package &package,
1069  Z_APDU *apdu_req)
1070 {
1071  Z_SearchRequest *req = apdu_req->u.searchRequest;
1072  FrontendSets::iterator fset_it =
1073  m_frontend_sets.find(req->resultSetName);
1074  if (fset_it != m_frontend_sets.end())
1075  {
1076  // result set already exist
1077  // if replace indicator is off: we return diagnostic if
1078  // result set already exist.
1079  if (*req->replaceIndicator == 0)
1080  {
1081  mp::odr odr;
1082  Z_APDU *apdu =
1083  odr.create_searchResponse(
1084  apdu_req,
1085  YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
1086  0);
1087  package.response() = apdu;
1088  return;
1089  }
1090  m_frontend_sets.erase(fset_it);
1091  }
1092 
1093  Databases databases;
1094  int i;
1095  for (i = 0; i < req->num_databaseNames; i++)
1096  databases.push_back(req->databaseNames[i]);
1097 
1098 
1099  yazpp_1::Yaz_Z_Query query;
1100  query.set_Z_Query(req->query);
1101 
1102  Z_Query *q = query.get_Z_Query();
1103  if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1104  {
1105  mp::odr odr;
1106  std::string rset;
1107  int diag = result_set_ref(odr, databases, q->u.type_1->RPNStructure,
1108  rset);
1109  if (diag)
1110  {
1111  Z_APDU *apdu =
1112  odr.create_searchResponse(
1113  apdu_req,
1114  diag,
1115  rset.c_str());
1116  package.response() = apdu;
1117  return;
1118  }
1119  query.set_Z_Query(q);
1120  }
1121 
1122  BackendSetPtr found_set; // null
1123  BackendInstancePtr found_backend; // null
1124 
1125  get_set(package, apdu_req, databases, query, found_backend, found_set);
1126  if (!found_set)
1127  return;
1128 
1129  mp::odr odr;
1130  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
1131  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
1132  *f_resp->resultCount = found_set->m_result_set_size;
1133  f_resp->additionalSearchInfo = found_set->additionalSearchInfoResponse;
1134  package.response() = f_apdu;
1135 
1136  FrontendSetPtr fset(new FrontendSet(databases, query));
1137  m_frontend_sets[req->resultSetName] = fset;
1138 
1139  m_backend_class->release_backend(found_backend);
1140 }
1141 
1142 void yf::SessionShared::Frontend::present(mp::Package &package,
1143  Z_APDU *apdu_req)
1144 {
1145  mp::odr odr;
1146  Z_PresentRequest *req = apdu_req->u.presentRequest;
1147 
1148  FrontendSets::iterator fset_it =
1149  m_frontend_sets.find(req->resultSetId);
1150 
1151  if (fset_it == m_frontend_sets.end())
1152  {
1153  Z_APDU *apdu =
1154  odr.create_presentResponse(
1155  apdu_req,
1156  YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1157  req->resultSetId);
1158  package.response() = apdu;
1159  return;
1160  }
1161  FrontendSetPtr fset = fset_it->second;
1162 
1163  Databases databases = fset->get_databases();
1164  yazpp_1::Yaz_Z_Query query = fset->get_query();
1165 
1166  BackendClassPtr bc = m_backend_class;
1167  BackendSetPtr found_set; // null
1168  BackendInstancePtr found_backend;
1169 
1170  get_set(package, apdu_req, databases, query, found_backend, found_set);
1171  if (!found_set)
1172  return;
1173 
1174  Z_NamePlusRecordList *npr_res = 0;
1175  // record_cache.lookup types are int's. Avoid non-fitting values
1176  if (*req->resultSetStartPoint > 0
1177  && *req->resultSetStartPoint < INT_MAX
1178  && *req->numberOfRecordsRequested > 0
1179  && *req->numberOfRecordsRequested < INT_MAX
1180  && found_set->m_record_cache.lookup(odr, &npr_res,
1181  *req->resultSetStartPoint,
1182  *req->numberOfRecordsRequested,
1183  req->preferredRecordSyntax,
1184  req->recordComposition))
1185  {
1186  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1187  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1188 
1189  yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF
1190  " records in cache %p",
1191  *req->resultSetStartPoint,
1192  *req->numberOfRecordsRequested,
1193  &found_set->m_record_cache);
1194 
1195  *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
1196  *f_resp->nextResultSetPosition =
1197  *req->resultSetStartPoint + *req->numberOfRecordsRequested;
1198  // f_resp->presentStatus assumed OK.
1199  f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
1200  f_resp->records->which = Z_Records_DBOSD;
1201  f_resp->records->u.databaseOrSurDiagnostics = npr_res;
1202  package.response() = f_apdu_res;
1203  bc->release_backend(found_backend);
1204  return;
1205  }
1206 
1207  found_backend->timestamp();
1208 
1209  Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
1210  Z_PresentRequest *p_req = p_apdu->u.presentRequest;
1211  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1212  p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
1213  *p_req->resultSetStartPoint = *req->resultSetStartPoint;
1214  *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
1215  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1216  p_req->recordComposition = req->recordComposition;
1217 
1218  Package present_package(found_backend->m_session, package.origin());
1219  present_package.copy_filter(package);
1220 
1221  present_package.request() = p_apdu;
1222 
1223  present_package.move();
1224 
1225  Z_GDU *gdu = present_package.response().get();
1226  if (!present_package.session().is_closed()
1227  && gdu && gdu->which == Z_GDU_Z3950
1228  && gdu->u.z3950->which == Z_APDU_presentResponse)
1229  {
1230  Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
1231  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1232  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1233 
1234  f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
1235  f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
1236  f_resp->presentStatus= b_resp->presentStatus;
1237  f_resp->records = b_resp->records;
1238  f_resp->otherInfo = b_resp->otherInfo;
1239  package.response() = f_apdu_res;
1240 
1241  if (b_resp->records && b_resp->records->which == Z_Records_DBOSD)
1242  {
1243  Z_NamePlusRecordList *npr =
1244  b_resp->records->u.databaseOrSurDiagnostics;
1245  // record_cache.add types are int's. Avoid non-fitting values
1246  if (*req->resultSetStartPoint > 0
1247  && npr->num_records + *req->resultSetStartPoint < INT_MAX)
1248  {
1249 #if 0
1250  yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
1251  " records to cache %p",
1252  *req->resultSetStartPoint,
1253  *f_resp->numberOfRecordsReturned,
1254  &found_set->m_record_cache);
1255 #endif
1256  found_set->m_record_cache.add(
1257  odr, npr, *req->resultSetStartPoint,
1258  p_req->recordComposition);
1259  }
1260  }
1261  bc->release_backend(found_backend);
1262  }
1263  else
1264  {
1265  bc->remove_backend(found_backend);
1266  Z_APDU *f_apdu_res =
1267  odr.create_presentResponse(
1268  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1269  "session_shared: target closed connection during present");
1270  package.response() = f_apdu_res;
1271  }
1272 }
1273 
1274 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
1275  Z_APDU *apdu_req)
1276 {
1277  BackendClassPtr bc = m_backend_class;
1278  int code;
1279  std::string addinfo;
1280  BackendInstancePtr backend = bc->get_backend(frontend_package,
1281  code, addinfo);
1282  if (!backend)
1283  {
1284  mp::odr odr;
1285  if (code == 0)
1286  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1287  if (addinfo.length() == 0)
1288  addinfo = "session_shared: could not create backend";
1289  Z_APDU *apdu = odr.create_scanResponse(
1290  apdu_req, code, addinfo.c_str());
1291  frontend_package.response() = apdu;
1292  }
1293  else
1294  {
1295  Package scan_package(backend->m_session, frontend_package.origin());
1296  backend->timestamp();
1297  scan_package.copy_filter(frontend_package);
1298  scan_package.request() = apdu_req;
1299  scan_package.move();
1300  frontend_package.response() = scan_package.response();
1301  if (scan_package.session().is_closed())
1302  {
1303  frontend_package.session().close();
1304  bc->remove_backend(backend);
1305  }
1306  else
1307  bc->release_backend(backend);
1308  }
1309 }
1310 
1311 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
1312 {
1313 }
1314 
1315 void yf::SessionShared::Worker::operator() (void)
1316 {
1317  m_p->expire();
1318 }
1319 
1320 bool yf::SessionShared::BackendClass::expire_instances()
1321 {
1322  time_t now;
1323  time(&now);
1324  boost::mutex::scoped_lock lock(m_mutex_backend_class);
1325  BackendInstanceList::iterator bit = m_backend_list.begin();
1326  while (bit != m_backend_list.end())
1327  {
1328  time_t last_use = (*bit)->m_time_last_use;
1329 
1330  if ((*bit)->m_in_use)
1331  {
1332  yaz_log(YLOG_LOG, "session_shared id=%ld in_use",
1333  (*bit)->m_session.id());
1334  bit++;
1335  }
1336  else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1337  {
1338  yaz_log(YLOG_LOG, "session_shared id=%ld erase",
1339  (*bit)->m_session.id());
1340  bit = m_backend_list.erase(bit);
1341  }
1342  else
1343  {
1344  yaz_log(YLOG_LOG, "session_shared id=%ld skip now-last_use=%ld",
1345  (*bit)->m_session.id(), (long) (now - last_use));
1346  bit++;
1347  }
1348  }
1349  if (m_backend_list.empty())
1350  return true;
1351  return false;
1352 }
1353 
1354 void yf::SessionShared::Rep::expire_classes()
1355 {
1356  boost::mutex::scoped_lock lock(m_mutex_backend_map);
1357  BackendClassMap::iterator b_it = m_backend_map.begin();
1358  while (b_it != m_backend_map.end())
1359  {
1360  if (b_it->second->expire_instances())
1361  {
1362  m_backend_map.erase(b_it);
1363  b_it = m_backend_map.begin();
1364  }
1365  else
1366  b_it++;
1367  }
1368 }
1369 
1370 void yf::SessionShared::Rep::expire()
1371 {
1372  while (true)
1373  {
1374  boost::xtime xt;
1375  boost::xtime_get(&xt,
1376 #if BOOST_VERSION >= 105000
1377  boost::TIME_UTC_
1378 #else
1379  boost::TIME_UTC
1380 #endif
1381  );
1382  xt.sec += m_session_ttl;
1383  {
1384  boost::mutex::scoped_lock lock(m_mutex);
1385  m_cond_expire_ready.timed_wait(lock, xt);
1386  if (close_down)
1387  break;
1388  }
1389  stat();
1390  expire_classes();
1391  }
1392 }
1393 
1394 yf::SessionShared::Rep::Rep()
1395 {
1396  m_resultset_ttl = 30;
1397  m_resultset_max = 10;
1398  m_session_ttl = 90;
1399  m_optimize_search = true;
1400  m_ignore_auth = false;
1401  m_restart = false;
1402  m_session_max = 100;
1403  m_preferredMessageSize = 0;
1404  m_maximumRecordSize = 0;
1405  close_down = false;
1406 }
1407 
1408 yf::SessionShared::Rep::~Rep()
1409 {
1410  {
1411  boost::mutex::scoped_lock lock(m_mutex);
1412  close_down = true;
1413  m_cond_expire_ready.notify_all();
1414  }
1415  m_thrds.join_all();
1416 }
1417 
1418 void yf::SessionShared::Rep::start()
1419 {
1420  yf::SessionShared::Worker w(this);
1421  m_thrds.add_thread(new boost::thread(w));
1422 }
1423 
1424 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1425 {
1426 }
1427 
1428 yf::SessionShared::~SessionShared() {
1429 }
1430 
1431 void yf::SessionShared::start() const
1432 {
1433  m_p->start();
1434 }
1435 
1436 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1437 {
1438 }
1439 
1440 yf::SessionShared::Frontend::~Frontend()
1441 {
1442 }
1443 
1444 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1445 {
1446  boost::mutex::scoped_lock lock(m_mutex);
1447 
1448  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1449 
1450  while(true)
1451  {
1452  it = m_clients.find(package.session());
1453  if (it == m_clients.end())
1454  break;
1455 
1456  if (!it->second->m_in_use)
1457  {
1458  it->second->m_in_use = true;
1459  return it->second;
1460  }
1461  m_cond_session_ready.wait(lock);
1462  }
1463  FrontendPtr f(new Frontend(this));
1464  m_clients[package.session()] = f;
1465  f->m_in_use = true;
1466  return f;
1467 }
1468 
1469 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1470 {
1471  boost::mutex::scoped_lock lock(m_mutex);
1472  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1473 
1474  it = m_clients.find(package.session());
1475  if (it != m_clients.end())
1476  {
1477  if (package.session().is_closed())
1478  {
1479  m_clients.erase(it);
1480  }
1481  else
1482  {
1483  it->second->m_in_use = false;
1484  }
1485  m_cond_session_ready.notify_all();
1486  }
1487 }
1488 
1489 
1490 void yf::SessionShared::process(mp::Package &package) const
1491 {
1492  FrontendPtr f = m_p->get_frontend(package);
1493 
1494  Z_GDU *gdu = package.request().get();
1495 
1496  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1497  Z_APDU_initRequest && !f->m_is_virtual)
1498  {
1499  m_p->init(package, gdu, f);
1500  }
1501  else if (!f->m_is_virtual)
1502  package.move();
1503  else if (gdu && gdu->which == Z_GDU_Z3950)
1504  {
1505  Z_APDU *apdu = gdu->u.z3950;
1506  if (apdu->which == Z_APDU_initRequest)
1507  {
1508  mp::odr odr;
1509 
1510  package.response() = odr.create_close(
1511  apdu,
1512  Z_Close_protocolError,
1513  "double init");
1514 
1515  package.session().close();
1516  }
1517  else if (apdu->which == Z_APDU_close)
1518  {
1519  mp::odr odr;
1520 
1521  package.response() = odr.create_close(
1522  apdu,
1523  Z_Close_peerAbort, "received close from client");
1524  package.session().close();
1525  }
1526  else if (apdu->which == Z_APDU_searchRequest)
1527  {
1528  f->search(package, apdu);
1529  }
1530  else if (apdu->which == Z_APDU_presentRequest)
1531  {
1532  f->present(package, apdu);
1533  }
1534  else if (apdu->which == Z_APDU_scanRequest)
1535  {
1536  f->scan(package, apdu);
1537  }
1538  else
1539  {
1540  mp::odr odr;
1541 
1542  package.response() = odr.create_close(
1543  apdu, Z_Close_protocolError,
1544  "unsupported APDU in filter_session_shared");
1545 
1546  package.session().close();
1547  }
1548  }
1549  m_p->release_frontend(package);
1550 }
1551 
1552 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1553  const char *path)
1554 {
1555  for (ptr = ptr->children; ptr; ptr = ptr->next)
1556  {
1557  if (ptr->type != XML_ELEMENT_NODE)
1558  continue;
1559  if (!strcmp((const char *) ptr->name, "resultset"))
1560  {
1561  const struct _xmlAttr *attr;
1562  for (attr = ptr->properties; attr; attr = attr->next)
1563  {
1564  if (!strcmp((const char *) attr->name, "ttl"))
1565  m_p->m_resultset_ttl =
1566  mp::xml::get_int(attr->children, 30);
1567  else if (!strcmp((const char *) attr->name, "max"))
1568  {
1569  m_p->m_resultset_max =
1570  mp::xml::get_int(attr->children, 10);
1571  }
1572  else if (!strcmp((const char *) attr->name, "optimizesearch"))
1573  {
1574  m_p->m_optimize_search =
1575  mp::xml::get_bool(attr->children, true);
1576  }
1577  else if (!strcmp((const char *) attr->name, "restart"))
1578  {
1579  m_p->m_restart = mp::xml::get_bool(attr->children, true);
1580  }
1581  else
1582  throw mp::filter::FilterException(
1583  "Bad attribute " + std::string((const char *)
1584  attr->name));
1585  }
1586  }
1587  else if (!strcmp((const char *) ptr->name, "session"))
1588  {
1589  const struct _xmlAttr *attr;
1590  for (attr = ptr->properties; attr; attr = attr->next)
1591  {
1592  if (!strcmp((const char *) attr->name, "ttl"))
1593  m_p->m_session_ttl =
1594  mp::xml::get_int(attr->children, 90);
1595  else if (!strcmp((const char *) attr->name, "max"))
1596  m_p->m_session_max =
1597  mp::xml::get_int(attr->children, 100);
1598  else
1599  throw mp::filter::FilterException(
1600  "Bad attribute " + std::string((const char *)
1601  attr->name));
1602  }
1603  }
1604  else if (!strcmp((const char *) ptr->name, "init"))
1605  {
1606  const struct _xmlAttr *attr;
1607  for (attr = ptr->properties; attr; attr = attr->next)
1608  {
1609  if (!strcmp((const char *) attr->name, "maximum-record-size"))
1610  m_p->m_maximumRecordSize =
1611  mp::xml::get_int(attr->children, 0);
1612  else if (!strcmp((const char *) attr->name,
1613  "preferred-message-size"))
1614  m_p->m_preferredMessageSize =
1615  mp::xml::get_int(attr->children, 0);
1616  else if (!strcmp((const char *) attr->name,
1617  "ignore-auth"))
1618  m_p->m_ignore_auth =
1619  mp::xml::get_bool(attr->children, false);
1620  else
1621  throw mp::filter::FilterException(
1622  "Bad attribute " + std::string((const char *)
1623  attr->name));
1624  }
1625  }
1626  else
1627  {
1628  throw mp::filter::FilterException("Bad element "
1629  + std::string((const char *)
1630  ptr->name));
1631  }
1632  }
1633 }
1634 
1635 static mp::filter::Base* filter_creator()
1636 {
1637  return new mp::filter::SessionShared;
1638 }
1639 
1640 extern "C" {
1641  struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1642  0,
1643  "session_shared",
1645  };
1646 }
1647 
1648 /*
1649  * Local variables:
1650  * c-basic-offset: 4
1651  * c-file-style: "Stroustrup"
1652  * indent-tabs-mode: nil
1653  * End:
1654  * vim: shiftwidth=4 tabstop=8 expandtab
1655  */
1656 
BackendClass(const yazpp_1::GDU &init_request, int resultset_ttl, int resultset_max, int session_ttl, Odr_int preferredRecordSize, Odr_int maximumRecordSize)
BackendInstancePtr get_backend(const Package &package, int &code, std::string &addinfo)
BackendInstancePtr create_backend(const Package &package, int &code, std::string &addinfo)
BackendSet(const std::string &result_set_id, const Databases &databases, const yazpp_1::Yaz_Z_Query &query, Z_OtherInformation *additionalSearchInfoRequest)
bool search(Package &frontend_package, Package &search_package, const Z_APDU *apdu_req, const BackendInstancePtr bp, Z_Records **z_records)
bool operator<(const SessionShared::InitKey &k) const
void init(Package &package, const Z_GDU *gdu, FrontendPtr frontend)
std::map< mp::Session, FrontendPtr > m_clients
std::list< BackendSetPtr > BackendSetList
boost::shared_ptr< BackendInstance > BackendInstancePtr
boost::shared_ptr< BackendSet > BackendSetPtr
boost::shared_ptr< BackendClass > BackendClassPtr
std::list< BackendInstancePtr > BackendInstanceList
std::map< InitKey, BackendClassPtr > BackendClassMap
boost::shared_ptr< FrontendSet > FrontendSetPtr
boost::shared_ptr< Frontend > FrontendPtr
std::map< std::string, FrontendSetPtr > FrontendSets
static mp::filter::Base * filter_creator()
static int get_diagnostic(Z_DefaultDiagFormat *r)
struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared
void search(Package &package, Z_APDU *apdu)
void override_set(BackendInstancePtr &found_backend, std::string &result_set_id, const Databases &databases, bool out_of_sessions)
int result_set_ref(ODR o, const Databases &databases, Z_RPNStructure *s, std::string &rset)
void present(Package &package, Z_APDU *apdu)
void get_set(mp::Package &package, const Z_APDU *apdu_req, const Databases &databases, yazpp_1::Yaz_Z_Query &query, BackendInstancePtr &found_backend, BackendSetPtr &found_set)