metaproxy  1.13.0
filter_session_shared.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23 
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30 
31 #include <metaproxy/util.hpp>
33 
34 #include <yaz/copy_types.h>
35 #include <yaz/log.h>
36 #include <yaz/zgdu.h>
37 #include <yaz/otherinfo.h>
38 #include <yaz/diagbib1.h>
39 #include <yazpp/z-query.h>
40 #include <yazpp/record-cache.h>
41 #include <map>
42 #include <iostream>
43 #include <time.h>
44 #include <limits.h>
45 
46 namespace mp = metaproxy_1;
47 namespace yf = metaproxy_1::filter;
48 
49 namespace metaproxy_1 {
50 
51  namespace filter {
52  // key for session.. We'll only share sessions with same InitKey
54  public:
55  bool operator < (const SessionShared::InitKey &k) const;
56  InitKey(Z_InitRequest *req);
57  InitKey(const InitKey &);
58  ~InitKey();
59  private:
64  ODR m_odr;
65  };
66  // worker thread .. for expiry of sessions
68  public:
70  void operator() (void);
71  private:
73  };
74  // backend result set
76  public:
77  std::string m_result_set_id;
80  yazpp_1::Yaz_Z_Query m_query;
82  void timestamp();
83  yazpp_1::RecordCache m_record_cache;
84 
85  Z_OtherInformation *additionalSearchInfoRequest;
86  Z_OtherInformation *additionalSearchInfoResponse;
88  BackendSet(
89  const std::string &result_set_id,
90  const Databases &databases,
91  const yazpp_1::Yaz_Z_Query &query,
92  Z_OtherInformation *additionalSearchInfoRequest);
93  ~BackendSet();
94  bool search(
95  Package &frontend_package,
96  Package &search_package,
97  const Z_APDU *apdu_req,
98  const BackendInstancePtr bp,
99  Z_Records **z_records);
100  };
101  // backend connection instance
103  friend class Rep;
104  friend class BackendClass;
105  friend class BackendSet;
106  public:
107  mp::Session m_session;
109  bool m_in_use;
113  mp::Package * m_close_package;
114  ~BackendInstance();
115  void timestamp();
116  };
117  // backends of some class (all with same InitKey)
// All backend sessions that were opened with the same init request.
// NOTE(review): several member declarations (listing lines 121-122 and
// 135-143) are not visible in this listing; only the visible ones are
// documented here.
118  class SessionShared::BackendClass : boost::noncopyable {
119  friend class Rep;
120  friend struct Frontend;
// Open a new backend session by sending an init request; returns a null
// pointer on failure, with code/addinfo taken from the backend's
// diagnostic when one is present.
123  BackendInstancePtr create_backend(const Package &package,
124  int &code, std::string &addinfo);
// Drop the instance from the list (if present) and send a close for it.
125  void remove_backend(BackendInstancePtr b);
// Reuse the idle instance with the lowest sequence number, or create a
// new one when none is idle.
126  BackendInstancePtr get_backend(const Package &package,
127  int &code, std::string &addinfo);
// Mark the instance as in use and stamp it with the next sequence number.
128  void use_backend(BackendInstancePtr b);
// Mark the instance as idle and wake threads waiting on m_cond_set_ready.
129  void release_backend(BackendInstancePtr b);
// Definition not visible in this listing; presumably expires idle
// instances - TODO confirm.
130  bool expire_instances();
// Init request used as the template for every backend init.
131  yazpp_1::GDU m_init_request;
// Init response saved by create_backend until one init has succeeded.
132  yazpp_1::GDU m_init_response;
// Protects the backend list and the counters of this class.
133  boost::mutex m_mutex_backend_class;
// Notified when a backend is released or an init attempt has failed.
134  boost::condition m_cond_set_ready;
144  public:
// NOTE(review): the fifth parameter is called preferredRecordSize here
// but preferredMessageSize in the definition, where it initializes
// m_preferredMessageSize.
145  BackendClass(const yazpp_1::GDU &init_request,
146  int resultset_ttl,
147  int resultset_max,
148  int session_ttl,
149  Odr_int preferredRecordSize,
150  Odr_int maximumRecordSize);
151  ~BackendClass();
152  };
153  // frontend result set
156  yazpp_1::Yaz_Z_Query m_query;
157  public:
158  const Databases &get_databases();
159  const yazpp_1::Yaz_Z_Query &get_query();
160  FrontendSet(
161  const Databases &databases,
162  const yazpp_1::Yaz_Z_Query &query);
163  FrontendSet();
164  };
165  // frontend session
167  Frontend(Rep *rep);
168  ~Frontend();
170  bool m_in_use;
171  Z_Options m_init_options;
172  void search(Package &package, Z_APDU *apdu);
173  void present(Package &package, Z_APDU *apdu);
174  void scan(Package &package, Z_APDU *apdu);
175 
176  int result_set_ref(ODR o,
177  const Databases &databases,
178  Z_RPNStructure *s, std::string &rset);
179  void get_set(mp::Package &package,
180  const Z_APDU *apdu_req,
181  const Databases &databases,
182  yazpp_1::Yaz_Z_Query &query,
183  BackendInstancePtr &found_backend,
184  BackendSetPtr &found_set);
185  void override_set(BackendInstancePtr &found_backend,
186  std::string &result_set_id,
187  const Databases &databases,
188  bool out_of_sessions);
189 
193  };
194  // representation
196  friend class SessionShared;
197  friend struct Frontend;
198 
199  FrontendPtr get_frontend(Package &package);
200  void release_frontend(Package &package);
201  Rep();
202  public:
203  ~Rep();
204  void expire();
205  private:
206  void expire_classes();
207  void stat();
208  void init(Package &package, const Z_GDU *gdu,
209  FrontendPtr frontend);
210  void start();
211  boost::mutex m_mutex;
212  boost::condition m_cond_session_ready;
213  boost::condition m_cond_expire_ready;
214  std::map<mp::Session, FrontendPtr> m_clients;
215 
217  boost::mutex m_mutex_backend_map;
218  boost::thread_group m_thrds;
223  bool m_restart;
228  };
229  }
230 }
231 
232 yf::SessionShared::FrontendSet::FrontendSet(
233  const Databases &databases,
234  const yazpp_1::Yaz_Z_Query &query)
235  : m_databases(databases), m_query(query)
236 {
237 }
238 
239 const yf::SessionShared::Databases &
240 yf::SessionShared::FrontendSet::get_databases()
241 {
242  return m_databases;
243 }
244 
245 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
246 {
247  return m_query;
248 }
249 
250 yf::SessionShared::InitKey::InitKey(const InitKey &k)
251 {
252  m_odr = odr_createmem(ODR_ENCODE);
253 
254  m_idAuthentication_size = k.m_idAuthentication_size;
255  m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
256  memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
257  m_idAuthentication_size);
258 
259  m_otherInfo_size = k.m_otherInfo_size;
260  m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
261  memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
262  m_otherInfo_size);
263 }
264 
265 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
266 {
267  m_odr = odr_createmem(ODR_ENCODE);
268 
269  Z_IdAuthentication *t = req->idAuthentication;
270  z_IdAuthentication(m_odr, &t, 1, 0);
271  m_idAuthentication_buf =
272  odr_getbuf(m_odr, &m_idAuthentication_size, 0);
273 
274  Z_OtherInformation *o = req->otherInfo;
275  z_OtherInformation(m_odr, &o, 1, 0);
276  m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
277 }
278 
279 yf::SessionShared::InitKey::~InitKey()
280 {
281  odr_destroy(m_odr);
282 }
283 
284 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
285  const
286 {
287  int c;
288  c = mp::util::memcmp2(
289  (void*) m_idAuthentication_buf, m_idAuthentication_size,
291  if (c < 0)
292  return true;
293  else if (c > 0)
294  return false;
295 
296  c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
297  (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
298  if (c < 0)
299  return true;
300  else if (c > 0)
301  return false;
302  return false;
303 }
304 
305 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
306 {
307  boost::mutex::scoped_lock lock(m_mutex_backend_class);
308  m_cond_set_ready.notify_all();
309  b->m_in_use = false;
310 }
311 
312 
313 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
314 {
315  {
316  boost::mutex::scoped_lock lock(m_mutex_backend_class);
317  BackendInstanceList::iterator it = m_backend_list.begin();
318  for (;;)
319  {
320  if (it == m_backend_list.end())
321  return;
322  if (*it == b)
323  {
324  it = m_backend_list.erase(it);
325  break;
326  }
327  it++;
328  }
329  }
330  mp::odr odr;
331  b->m_close_package->response() = odr.create_close(
332  0, Z_Close_lackOfActivity, 0);
333  b->m_close_package->session().close();
334  b->m_close_package->move();
335 }
336 
337 
338 yf::SessionShared::BackendInstancePtr
339 yf::SessionShared::BackendClass::get_backend(
340  const mp::Package &frontend_package,
341  int &code, std::string &addinfo)
342 {
343  {
344  boost::mutex::scoped_lock lock(m_mutex_backend_class);
345 
346  BackendInstanceList::const_iterator it = m_backend_list.begin();
347 
348  BackendInstancePtr backend1; // null
349 
350  for (; it != m_backend_list.end(); it++)
351  {
352  if (!(*it)->m_in_use)
353  {
354  if (!backend1
355  || (*it)->m_sequence_this < backend1->m_sequence_this)
356  backend1 = *it;
357  }
358  }
359  if (backend1)
360  {
361  use_backend(backend1);
362  return backend1;
363  }
364  }
365  return create_backend(frontend_package, code, addinfo);
366 }
367 
368 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
369 {
370  backend->m_in_use = true;
371  backend->m_sequence_this = m_sequence_top++;
372 }
373 
374 void yf::SessionShared::BackendInstance::timestamp()
375 {
376  assert(m_in_use);
377  time(&m_time_last_use);
378 }
379 
380 yf::SessionShared::BackendInstance::~BackendInstance()
381 {
382  if (m_close_package)
383  {
384  mp::odr odr;
385  m_close_package->response() = odr.create_close(
386  0, Z_Close_lackOfActivity, 0);
387  m_close_package->session().close();
388  m_close_package->move();
389  }
390  delete m_close_package;
391 }
392 
// Open a new backend session for this class.
//
// An init request based on m_init_request is sent through the filter
// chain of frontend_package. On success the new instance is appended to
// m_backend_list, marked as in use and returned. On failure a null pointer
// is returned; code/addinfo then carry the first default-format diagnostic
// found in the init response's userInformationField (code stays 0 and
// addinfo empty when there is none), m_no_failed is incremented and
// waiters on m_cond_set_ready are notified.
//
// NOTE(review): the declaration of `bp` (listing line 398) and listing
// lines 425 and 427 are not visible in this listing.
393 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
394  const mp::Package &frontend_package,
395  int &code, std::string &addinfo)
396 {
397  BackendInstancePtr null;
// package kept with the instance; it is used later to send the close
399  bp->m_close_package =
400  new mp::Package(bp->m_session, frontend_package.origin());
401  bp->m_close_package->copy_filter(frontend_package);
402 
403  Package init_package(bp->m_session, frontend_package.origin());
404 
405  init_package.copy_filter(frontend_package);
406 
// work on a copy of the stored init request
407  yazpp_1::GDU actual_init_request = m_init_request;
408  Z_GDU *init_pdu = actual_init_request.get();
409 
410  assert(init_pdu->which == Z_GDU_Z3950);
411  assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
412 
413  Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
// request exactly these options from the backend: search, present,
// named result sets and scan
414  ODR_MASK_ZERO(req->options);
415 
416  ODR_MASK_SET(req->options, Z_Options_search);
417  ODR_MASK_SET(req->options, Z_Options_present);
418  ODR_MASK_SET(req->options, Z_Options_namedResultSets);
419  ODR_MASK_SET(req->options, Z_Options_scan);
420 
421  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
422  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
423  ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
424 
426  *req->preferredMessageSize = m_preferredMessageSize;
428  *req->maximumRecordSize = m_maximumRecordSize;
429 
430  init_package.request() = init_pdu;
431 
// count the attempt under the lock; the init itself runs without it
432  {
433  boost::mutex::scoped_lock lock(m_mutex_backend_class);
434  m_no_init++;
435  }
436 
437  init_package.move();
438 
// from here to the end of the function the class mutex is held
439  boost::mutex::scoped_lock lock(m_mutex_backend_class);
440 
441  addinfo.clear();
442  code = 0;
443  m_named_result_sets = false;
444  Z_GDU *gdu = init_package.response().get();
445 
446  if (gdu && gdu->which == Z_GDU_Z3950
447  && gdu->u.z3950->which == Z_APDU_initResponse)
448  {
449  Z_InitResponse *res = gdu->u.z3950->u.initResponse;
450 
451  if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
452  {
453  m_named_result_sets = true;
454  }
455  // save init-response until we get one that succeeds
456  if (m_no_succeeded == 0)
457  m_init_response = gdu->u.z3950;
458  if (*gdu->u.z3950->u.initResponse->result
459  && !init_package.session().is_closed())
460  {
// init accepted: register the instance, already marked as in use
461  bp->m_in_use = true;
462  time(&bp->m_time_last_use);
463  bp->m_sequence_this = 0;
464  bp->m_result_set_sequence = 0;
465  m_backend_list.push_back(bp);
466  m_no_succeeded++;
467  return bp;
468  }
469  else
470  {
// init rejected: look for a diagnostic in the first element of the
// userInformationField and report its condition and addinfo
471  Z_External *uif =
472  gdu->u.z3950->u.initResponse->userInformationField;
473  if (uif && uif->which == Z_External_userInfo1)
474  {
475  Z_OtherInformation *ui = uif->u.userInfo1;
476  if (ui && ui->num_elements >= 1)
477  {
478  Z_OtherInformationUnit *unit = ui->list[0];
479  if (unit->which == Z_OtherInfo_externallyDefinedInfo &&
480  unit->information.externallyDefinedInfo &&
481  unit->information.externallyDefinedInfo->which ==
482  Z_External_diag1)
483  {
484  Z_DiagnosticFormat *diag =
485  unit->information.externallyDefinedInfo->u.diag1;
486  if (diag->num >= 1)
487  {
488  Z_DiagnosticFormat_s *ds = diag->elements[0];
489  if (ds->which ==
490  Z_DiagnosticFormat_s_defaultDiagRec)
491  {
492  Z_DefaultDiagFormat *e =
493  ds->u.defaultDiagRec;
494  code = *e->condition;
495  if (e->which == Z_DefaultDiagFormat_v2Addinfo
496  && e->u.v2Addinfo)
497  {
498  addinfo = e->u.v2Addinfo;
499  }
500  else if (
501  e->which == Z_DefaultDiagFormat_v3Addinfo
502  && e->u.v3Addinfo)
503  {
504  addinfo = e->u.v3Addinfo;
505  }
506  }
507  }
508  }
509  }
510  }
511  }
512  }
// failure path: close the backend session if it is still open
513  if (!init_package.session().is_closed())
514  {
515  init_package.copy_filter(frontend_package);
516  init_package.session().close();
517  init_package.move();
518  }
519  m_no_failed++;
520  m_cond_set_ready.notify_all();
521  return null;
522 }
523 
524 
525 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
526  int resultset_ttl,
527  int resultset_max,
528  int session_ttl,
529  Odr_int preferredMessageSize,
530  Odr_int maximumRecordSize)
531  : m_named_result_sets(false), m_init_request(init_request),
532  m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
533  m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max),
534  m_preferredMessageSize(preferredMessageSize),
535  m_maximumRecordSize(maximumRecordSize),
536  m_no_failed(0), m_no_succeeded(0), m_no_init(0)
537 {}
538 
539 yf::SessionShared::BackendClass::~BackendClass()
540 {}
541 
542 void yf::SessionShared::Rep::stat()
543 {
544  int no_classes = 0;
545  int no_instances = 0;
546  BackendClassMap::const_iterator it;
547  {
548  boost::mutex::scoped_lock lock(m_mutex_backend_map);
549  for (it = m_backend_map.begin(); it != m_backend_map.end(); it++)
550  {
551  BackendClassPtr bc = it->second;
552  no_classes++;
553  BackendInstanceList::iterator bit = bc->m_backend_list.begin();
554  for (; bit != bc->m_backend_list.end(); bit++)
555  no_instances++;
556  }
557  }
558 }
559 
// Handle an init request from a frontend.
//
// The frontend is attached to the backend class whose InitKey matches the
// request; a new class is created when there is none. If no backend of
// that class has been initialized successfully yet, one is created here
// and released again straight away. The frontend is then answered with
// the init response stored in the class (options the client did not ask
// for are cleared, message/record sizes are capped), or with a temporary
// system error and a closed session when no response is available.
//
// NOTE(review): the BackendClass constructor arguments (listing lines
// 575-579) are not visible in this listing.
560 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
561  FrontendPtr frontend)
562 {
563  Z_InitRequest *req = gdu->u.z3950->u.initRequest;
564 
565  frontend->m_is_virtual = true;
566  frontend->m_init_options = *req->options;
567  InitKey k(req);
// find or create the backend class for this key (map mutex held)
568  {
569  boost::mutex::scoped_lock lock(m_mutex_backend_map);
570  BackendClassMap::const_iterator it;
571  it = m_backend_map.find(k);
572  if (it == m_backend_map.end())
573  {
574  BackendClassPtr b(new BackendClass(gdu->u.z3950,
580  m_backend_map[k] = b;
581  frontend->m_backend_class = b;
582  }
583  else
584  {
585  frontend->m_backend_class = it->second;
586  }
587  }
588  BackendClassPtr bc = frontend->m_backend_class;
589  mp::odr odr;
590 
591  // we only need to get init response from "first" target in
592  // backend class - the assumption being that init response is
593  // same for all
594  bool create_first_one = false;
595  {
596  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
597  if (!bc->m_no_succeeded)
598  create_first_one = true;
599  else
600  {
// NOTE(review): this branch is only reached when m_no_succeeded is
// non-zero, so the loop condition below is false on entry and the wait
// never happens - confirm the intended condition.
601  // wait for first one to finish
602  while (!bc->m_no_failed && !bc->m_no_succeeded && bc->m_no_init)
603  {
604  bc->m_cond_set_ready.wait(lock);
605  }
606  }
607  }
608  if (create_first_one)
609  {
610 
// code/addinfo are filled in by create_backend but not used here
611  int code;
612  std::string addinfo;
613  BackendInstancePtr backend = bc->create_backend(package, code, addinfo);
614  if (backend)
615  bc->release_backend(backend);
616  }
// take a copy of the stored init response under the class mutex
617  yazpp_1::GDU init_response;
618  {
619  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
620 
621  init_response = bc->m_init_response;
622  }
623 
624  if (init_response.get())
625  {
626  Z_GDU *response_gdu = init_response.get();
627  mp::util::transfer_referenceId(odr, gdu->u.z3950,
628  response_gdu->u.z3950);
629  Z_InitResponse *init_res = response_gdu->u.z3950->u.initResponse;
630  Z_Options *server_options = init_res->options;
631  Z_Options *client_options = &frontend->m_init_options;
// clear every option bit (0..29) that the client did not request
632  int i;
633  for (i = 0; i < 30; i++)
634  if (!ODR_MASK_GET(client_options, i))
635  ODR_MASK_CLEAR(server_options, i);
636 
// use the client's sizes when no size is configured or when the
// backend's value is larger than what the client asked for
637  if (!m_preferredMessageSize ||
638  *init_res->preferredMessageSize > *req->preferredMessageSize)
639  *init_res->preferredMessageSize = *req->preferredMessageSize;
640 
641  if (!m_maximumRecordSize ||
642  *init_res->maximumRecordSize > *req->maximumRecordSize)
643  *init_res->maximumRecordSize = *req->maximumRecordSize;
644 
645  package.response() = init_response;
// a rejected init also closes the frontend session
646  if (!*response_gdu->u.z3950->u.initResponse->result)
647  package.session().close();
648  }
649  else
650  {
// no init response stored: reject the init and close the session
651  Z_APDU *apdu =
652  odr.create_initResponse(
653  gdu->u.z3950, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
654  "session_shared: target closed connection during init");
655  *apdu->u.initResponse->result = 0;
656  package.response() = apdu;
657  package.session().close();
658  }
659 }
660 
661 void yf::SessionShared::BackendSet::timestamp()
662 {
663  time(&m_time_last_use);
664 }
665 
666 yf::SessionShared::BackendSet::BackendSet(
667  const std::string &result_set_id,
668  const Databases &databases,
669  const yazpp_1::Yaz_Z_Query &query,
670  Z_OtherInformation *additionalSearchInfo) :
671  m_result_set_id(result_set_id),
672  m_databases(databases), m_result_set_size(0), m_query(query)
673 {
674  timestamp();
675  mem_additionalSearchInfo = nmem_create();
676  additionalSearchInfoResponse = 0;
677  additionalSearchInfoRequest =
678  yaz_clone_z_OtherInformation(additionalSearchInfo,
679  mem_additionalSearchInfo);
680 }
681 
682 yf::SessionShared::BackendSet::~BackendSet()
683 {
684  nmem_destroy(mem_additionalSearchInfo);
685 }
686 
687 static int get_diagnostic(Z_DefaultDiagFormat *r)
688 {
689  return *r->condition;
690 }
691 
692 bool yf::SessionShared::BackendSet::search(
693  mp::Package &frontend_package,
694  mp::Package &search_package,
695  const Z_APDU *frontend_apdu,
696  const BackendInstancePtr bp,
697  Z_Records **z_records)
698 {
699  mp::odr odr;
700  Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
701  Z_SearchRequest *req = apdu_req->u.searchRequest;
702 
703  req->additionalSearchInfo = additionalSearchInfoRequest;
704  req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
705  req->query = m_query.get_Z_Query();
706 
707  req->num_databaseNames = m_databases.size();
708  req->databaseNames = (char**)
709  odr_malloc(odr, req->num_databaseNames * sizeof(char *));
710  Databases::const_iterator it = m_databases.begin();
711  size_t i = 0;
712  for (; it != m_databases.end(); it++)
713  req->databaseNames[i++] = odr_strdup(odr, it->c_str());
714 
715  if (frontend_apdu->which == Z_APDU_searchRequest)
716  req->preferredRecordSyntax =
717  frontend_apdu->u.searchRequest->preferredRecordSyntax;
718 
719  search_package.request() = apdu_req;
720 
721  search_package.move();
722 
723  Z_GDU *gdu = search_package.response().get();
724  if (!search_package.session().is_closed()
725  && gdu && gdu->which == Z_GDU_Z3950
726  && gdu->u.z3950->which == Z_APDU_searchResponse)
727  {
728  Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
729  *z_records = b_resp->records;
730  m_result_set_size = *b_resp->resultCount;
731 
732  additionalSearchInfoResponse = yaz_clone_z_OtherInformation(
733  b_resp->additionalSearchInfo, mem_additionalSearchInfo);
734  return true;
735  }
736  Z_APDU *f_apdu = 0;
737  const char *addinfo = "session_shared: "
738  "target closed connection during search";
739  if (frontend_apdu->which == Z_APDU_searchRequest)
740  f_apdu = odr.create_searchResponse(
741  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
742  else if (frontend_apdu->which == Z_APDU_presentRequest)
743  f_apdu = odr.create_presentResponse(
744  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
745  else
746  f_apdu = odr.create_close(
747  frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
748  frontend_package.response() = f_apdu;
749  return false;
750 }
751 
752 void yf::SessionShared::Frontend::override_set(
753  BackendInstancePtr &found_backend,
754  std::string &result_set_id,
755  const Databases &databases,
756  bool out_of_sessions)
757 {
758  BackendClassPtr bc = m_backend_class;
759  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
760  time_t now;
761  time(&now);
762 
763  size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
764  for (; it != bc->m_backend_list.end(); it++)
765  {
766  if (!(*it)->m_in_use)
767  {
768  BackendSetList::iterator set_it = (*it)->m_sets.begin();
769  for (; set_it != (*it)->m_sets.end(); set_it++)
770  {
771  if ((max_sets > 1 || (*set_it)->m_databases == databases)
772  &&
773  (out_of_sessions ||
774  now < (*set_it)->m_time_last_use ||
775  now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
776  {
777  found_backend = *it;
778  result_set_id = (*set_it)->m_result_set_id;
779  found_backend->m_sets.erase(set_it);
780  return;
781  }
782  }
783  }
784  }
785  for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
786  {
787  if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
788  {
789  found_backend = *it;
790  if (bc->m_named_result_sets)
791  {
792  result_set_id = boost::io::str(
793  boost::format("%1%") %
794  found_backend->m_result_set_sequence);
795  found_backend->m_result_set_sequence++;
796  }
797  else
798  result_set_id = "default";
799  return;
800  }
801  }
802 }
803 
804 void yf::SessionShared::Frontend::get_set(mp::Package &package,
805  const Z_APDU *apdu_req,
806  const Databases &databases,
807  yazpp_1::Yaz_Z_Query &query,
808  BackendInstancePtr &found_backend,
809  BackendSetPtr &found_set)
810 {
811  bool session_restarted = false;
812  Z_OtherInformation *additionalSearchInfo = 0;
813 
814  if (apdu_req->which == Z_APDU_searchRequest)
815  additionalSearchInfo = apdu_req->u.searchRequest->additionalSearchInfo;
816 
817 restart:
818  std::string result_set_id;
819  bool out_of_sessions = false;
820  BackendClassPtr bc = m_backend_class;
821  {
822  boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
823 
824  if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
825  out_of_sessions = true;
826 
827  if (m_p->m_optimize_search)
828  {
829  // look at each backend and see if we have a similar search
830  BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
831  while (it != bc->m_backend_list.end())
832  {
833  bool restart = false;
834  BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
835  for (; set_it != (*it)->m_sets.end(); set_it++)
836  {
837  // for real present request we don't care
838  // if additionalSearchInfo matches: same records
839  if ((*set_it)->m_databases == databases
840  && query.match(&(*set_it)->m_query)
841  && (apdu_req->which != Z_APDU_searchRequest ||
842  yaz_compare_z_OtherInformation(
843  additionalSearchInfo,
844  (*set_it)->additionalSearchInfoRequest)))
845  {
846  if ((*it)->m_in_use)
847  {
848  bc->m_cond_set_ready.wait(lock);
849  restart = true;
850  break;
851  }
852  else
853  {
854  found_set = *set_it;
855  found_backend = *it;
856  bc->use_backend(found_backend);
857  // found matching set. No need to search again
858  return;
859  }
860  }
861  }
862  if (restart)
863  it = bc->m_backend_list.begin();
864  else
865  it++;
866  }
867  }
868  override_set(found_backend, result_set_id, databases, out_of_sessions);
869  if (found_backend)
870  bc->use_backend(found_backend);
871  }
872  if (!found_backend)
873  {
874  int code;
875  std::string addinfo;
876  // create a new backend set (and new set) if we're not out of sessions
877  if (!out_of_sessions)
878  found_backend = bc->create_backend(package, code, addinfo);
879  if (!found_backend)
880  {
881  Z_APDU *f_apdu = 0;
882  mp::odr odr;
883 
884  if (out_of_sessions)
885  {
886  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
887  addinfo = "session_shared: all sessions in use";
888  }
889  else
890  {
891  if (code == 0)
892  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
893  if (addinfo.length() == 0)
894  addinfo = "session_shared: could not create backend";
895  }
896  if (apdu_req->which == Z_APDU_searchRequest)
897  {
898  f_apdu = odr.create_searchResponse(
899  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
900  }
901  else if (apdu_req->which == Z_APDU_presentRequest)
902  {
903  f_apdu = odr.create_presentResponse(
904  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
905  }
906  else
907  {
908  f_apdu = odr.create_close(
909  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
910  }
911  package.response() = f_apdu;
912  return;
913  }
914  if (bc->m_named_result_sets)
915  {
916  result_set_id = boost::io::str(
917  boost::format("%1%") % found_backend->m_result_set_sequence);
918  found_backend->m_result_set_sequence++;
919  }
920  else
921  result_set_id = "default";
922  }
923  found_backend->timestamp();
924 
925  // we must search ...
926  BackendSetPtr new_set(new BackendSet(result_set_id,
927  databases, query,
928  additionalSearchInfo));
929 
930  found_set = new_set;
931  found_set->timestamp();
932 
933  Z_Records *z_records = 0;
934 
935  Package search_package(found_backend->m_session, package.origin());
936  search_package.copy_filter(package);
937 
938  if (!new_set->search(package, search_package,
939  apdu_req, found_backend, &z_records))
940  {
941  bc->remove_backend(found_backend);
942  found_set.reset();
943  return; // search error
944  }
945 
946  if (z_records)
947  {
948  int condition = 0;
949  if (z_records->which == Z_Records_NSD)
950  {
951  condition =
952  get_diagnostic(z_records->u.nonSurrogateDiagnostic);
953  }
954  else if (z_records->which == Z_Records_multipleNSD)
955  {
956  if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
957  &&
958 
959  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
960  Z_DiagRec_defaultFormat)
961  {
962  condition = get_diagnostic(
963  z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
964 
965  }
966  }
967  if (m_p->m_restart && !session_restarted &&
968  condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
969  {
970  package.log("session_shared", YLOG_LOG, "restart");
971  bc->remove_backend(found_backend);
972  session_restarted = true;
973  found_backend.reset();
974  goto restart;
975 
976  }
977 
978  if (condition)
979  {
980  mp::odr odr;
981  if (apdu_req->which == Z_APDU_searchRequest)
982  {
983  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req,
984  0, 0);
985  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
986  *f_resp->searchStatus = Z_SearchResponse_none;
987  f_resp->records = z_records;
988  package.response() = f_apdu;
989  }
990  if (apdu_req->which == Z_APDU_presentRequest)
991  {
992  Z_APDU *f_apdu = odr.create_presentResponse(apdu_req,
993  0, 0);
994  Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
995  f_resp->records = z_records;
996  package.response() = f_apdu;
997  }
998  bc->release_backend(found_backend);
999  found_set.reset();
1000  return; // search error
1001  }
1002  }
1003  if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0)
1004  {
1005  package.log("session_shared", YLOG_LOG, "restart");
1006  bc->remove_backend(found_backend);
1007  session_restarted = true;
1008  found_backend.reset();
1009  goto restart;
1010  }
1011  found_backend->m_sets.push_back(found_set);
1012 }
1013 
1014 int yf::SessionShared::Frontend::result_set_ref(ODR o,
1015  const Databases &databases,
1016  Z_RPNStructure *s,
1017  std::string &rset)
1018 {
1019  int ret = 0;
1020  switch (s->which)
1021  {
1022  case Z_RPNStructure_simple:
1023  if (s->u.simple->which == Z_Operand_resultSetId)
1024  {
1025  const char *id = s->u.simple->u.resultSetId;
1026  rset = id;
1027 
1028  FrontendSets::iterator fset_it = m_frontend_sets.find(id);
1029  if (fset_it == m_frontend_sets.end())
1030  {
1031  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1032  }
1033  else if (fset_it->second->get_databases() != databases)
1034  {
1035  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1036  }
1037  else
1038  {
1039  yazpp_1::Yaz_Z_Query query = fset_it->second->get_query();
1040  Z_Query *q = yaz_copy_Z_Query(query.get_Z_Query(), o);
1041  if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1042  {
1043  s->which = q->u.type_1->RPNStructure->which;
1044  s->u.simple = q->u.type_1->RPNStructure->u.simple;
1045  }
1046  else
1047  {
1048  ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1049  }
1050  }
1051  }
1052  break;
1053  case Z_RPNStructure_complex:
1054  ret = result_set_ref(o, databases, s->u.complex->s1, rset);
1055  if (!ret)
1056  ret = result_set_ref(o, databases, s->u.complex->s2, rset);
1057  break;
1058  }
1059  return ret;
1060 }
1061 
1062 void yf::SessionShared::Frontend::search(mp::Package &package,
1063  Z_APDU *apdu_req)
1064 {
1065  Z_SearchRequest *req = apdu_req->u.searchRequest;
1066  FrontendSets::iterator fset_it =
1067  m_frontend_sets.find(req->resultSetName);
1068  if (fset_it != m_frontend_sets.end())
1069  {
1070  // result set already exist
1071  // if replace indicator is off: we return diagnostic if
1072  // result set already exist.
1073  if (*req->replaceIndicator == 0)
1074  {
1075  mp::odr odr;
1076  Z_APDU *apdu =
1077  odr.create_searchResponse(
1078  apdu_req,
1079  YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
1080  0);
1081  package.response() = apdu;
1082  return;
1083  }
1084  m_frontend_sets.erase(fset_it);
1085  }
1086 
1087  Databases databases;
1088  int i;
1089  for (i = 0; i < req->num_databaseNames; i++)
1090  databases.push_back(req->databaseNames[i]);
1091 
1092 
1093  yazpp_1::Yaz_Z_Query query;
1094  query.set_Z_Query(req->query);
1095 
1096  Z_Query *q = query.get_Z_Query();
1097  if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1098  {
1099  mp::odr odr;
1100  std::string rset;
1101  int diag = result_set_ref(odr, databases, q->u.type_1->RPNStructure,
1102  rset);
1103  if (diag)
1104  {
1105  Z_APDU *apdu =
1106  odr.create_searchResponse(
1107  apdu_req,
1108  diag,
1109  rset.c_str());
1110  package.response() = apdu;
1111  return;
1112  }
1113  query.set_Z_Query(q);
1114  }
1115 
1116  BackendSetPtr found_set; // null
1117  BackendInstancePtr found_backend; // null
1118 
1119  get_set(package, apdu_req, databases, query, found_backend, found_set);
1120  if (!found_set)
1121  return;
1122 
1123  mp::odr odr;
1124  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
1125  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
1126  *f_resp->resultCount = found_set->m_result_set_size;
1127  f_resp->additionalSearchInfo = found_set->additionalSearchInfoResponse;
1128  package.response() = f_apdu;
1129 
1130  FrontendSetPtr fset(new FrontendSet(databases, query));
1131  m_frontend_sets[req->resultSetName] = fset;
1132 
1133  m_backend_class->release_backend(found_backend);
1134 }
1135 
1136 void yf::SessionShared::Frontend::present(mp::Package &package,
1137  Z_APDU *apdu_req)
1138 {
1139  mp::odr odr;
1140  Z_PresentRequest *req = apdu_req->u.presentRequest;
1141 
1142  FrontendSets::iterator fset_it =
1143  m_frontend_sets.find(req->resultSetId);
1144 
1145  if (fset_it == m_frontend_sets.end())
1146  {
1147  Z_APDU *apdu =
1148  odr.create_presentResponse(
1149  apdu_req,
1150  YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1151  req->resultSetId);
1152  package.response() = apdu;
1153  return;
1154  }
1155  FrontendSetPtr fset = fset_it->second;
1156 
1157  Databases databases = fset->get_databases();
1158  yazpp_1::Yaz_Z_Query query = fset->get_query();
1159 
1160  BackendClassPtr bc = m_backend_class;
1161  BackendSetPtr found_set; // null
1162  BackendInstancePtr found_backend;
1163 
1164  get_set(package, apdu_req, databases, query, found_backend, found_set);
1165  if (!found_set)
1166  return;
1167 
1168  Z_NamePlusRecordList *npr_res = 0;
1169  // record_cache.lookup types are int's. Avoid non-fitting values
1170  if (*req->resultSetStartPoint > 0
1171  && *req->resultSetStartPoint < INT_MAX
1172  && *req->numberOfRecordsRequested > 0
1173  && *req->numberOfRecordsRequested < INT_MAX
1174  && found_set->m_record_cache.lookup(odr, &npr_res,
1175  *req->resultSetStartPoint,
1176  *req->numberOfRecordsRequested,
1177  req->preferredRecordSyntax,
1178  req->recordComposition))
1179  {
1180  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1181  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1182 
1183  yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF
1184  " records in cache %p",
1185  *req->resultSetStartPoint,
1186  *req->numberOfRecordsRequested,
1187  &found_set->m_record_cache);
1188 
1189  *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
1190  *f_resp->nextResultSetPosition =
1191  *req->resultSetStartPoint + *req->numberOfRecordsRequested;
1192  // f_resp->presentStatus assumed OK.
1193  f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
1194  f_resp->records->which = Z_Records_DBOSD;
1195  f_resp->records->u.databaseOrSurDiagnostics = npr_res;
1196  package.response() = f_apdu_res;
1197  bc->release_backend(found_backend);
1198  return;
1199  }
1200 
1201  found_backend->timestamp();
1202 
1203  Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
1204  Z_PresentRequest *p_req = p_apdu->u.presentRequest;
1205  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1206  p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
1207  *p_req->resultSetStartPoint = *req->resultSetStartPoint;
1208  *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
1209  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1210  p_req->recordComposition = req->recordComposition;
1211 
1212  Package present_package(found_backend->m_session, package.origin());
1213  present_package.copy_filter(package);
1214 
1215  present_package.request() = p_apdu;
1216 
1217  present_package.move();
1218 
1219  Z_GDU *gdu = present_package.response().get();
1220  if (!present_package.session().is_closed()
1221  && gdu && gdu->which == Z_GDU_Z3950
1222  && gdu->u.z3950->which == Z_APDU_presentResponse)
1223  {
1224  Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
1225  Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1226  Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1227 
1228  f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
1229  f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
1230  f_resp->presentStatus= b_resp->presentStatus;
1231  f_resp->records = b_resp->records;
1232  f_resp->otherInfo = b_resp->otherInfo;
1233  package.response() = f_apdu_res;
1234 
1235  if (b_resp->records && b_resp->records->which == Z_Records_DBOSD)
1236  {
1237  Z_NamePlusRecordList *npr =
1238  b_resp->records->u.databaseOrSurDiagnostics;
1239  // record_cache.add types are int's. Avoid non-fitting values
1240  if (*req->resultSetStartPoint > 0
1241  && npr->num_records + *req->resultSetStartPoint < INT_MAX)
1242  {
1243 #if 0
1244  yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
1245  " records to cache %p",
1246  *req->resultSetStartPoint,
1247  *f_resp->numberOfRecordsReturned,
1248  &found_set->m_record_cache);
1249 #endif
1250  found_set->m_record_cache.add(
1251  odr, npr, *req->resultSetStartPoint,
1252  p_req->recordComposition);
1253  }
1254  }
1255  bc->release_backend(found_backend);
1256  }
1257  else
1258  {
1259  bc->remove_backend(found_backend);
1260  Z_APDU *f_apdu_res =
1261  odr.create_presentResponse(
1262  apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1263  "session_shared: target closed connection during present");
1264  package.response() = f_apdu_res;
1265  }
1266 }
1267 
1268 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
1269  Z_APDU *apdu_req)
1270 {
1271  BackendClassPtr bc = m_backend_class;
1272  int code;
1273  std::string addinfo;
1274  BackendInstancePtr backend = bc->get_backend(frontend_package,
1275  code, addinfo);
1276  if (!backend)
1277  {
1278  mp::odr odr;
1279  if (code == 0)
1280  code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1281  if (addinfo.length() == 0)
1282  addinfo = "session_shared: could not create backend";
1283  Z_APDU *apdu = odr.create_scanResponse(
1284  apdu_req, code, addinfo.c_str());
1285  frontend_package.response() = apdu;
1286  }
1287  else
1288  {
1289  Package scan_package(backend->m_session, frontend_package.origin());
1290  backend->timestamp();
1291  scan_package.copy_filter(frontend_package);
1292  scan_package.request() = apdu_req;
1293  scan_package.move();
1294  frontend_package.response() = scan_package.response();
1295  if (scan_package.session().is_closed())
1296  {
1297  frontend_package.session().close();
1298  bc->remove_backend(backend);
1299  }
1300  else
1301  bc->release_backend(backend);
1302  }
1303 }
1304 
1305 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
1306 {
1307 }
1308 
1309 void yf::SessionShared::Worker::operator() (void)
1310 {
1311  m_p->expire();
1312 }
1313 
1314 bool yf::SessionShared::BackendClass::expire_instances()
1315 {
1316  time_t now;
1317  time(&now);
1318  boost::mutex::scoped_lock lock(m_mutex_backend_class);
1319  BackendInstanceList::iterator bit = m_backend_list.begin();
1320  while (bit != m_backend_list.end())
1321  {
1322  time_t last_use = (*bit)->m_time_last_use;
1323 
1324  if ((*bit)->m_in_use)
1325  {
1326  bit++;
1327  }
1328  else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1329  {
1330  bit = m_backend_list.erase(bit);
1331  }
1332  else
1333  {
1334  bit++;
1335  }
1336  }
1337  if (m_backend_list.empty())
1338  return true;
1339  return false;
1340 }
1341 
1342 void yf::SessionShared::Rep::expire_classes()
1343 {
1344  boost::mutex::scoped_lock lock(m_mutex_backend_map);
1345  BackendClassMap::iterator b_it = m_backend_map.begin();
1346  while (b_it != m_backend_map.end())
1347  {
1348  if (b_it->second->expire_instances())
1349  {
1350  m_backend_map.erase(b_it);
1351  b_it = m_backend_map.begin();
1352  }
1353  else
1354  b_it++;
1355  }
1356 }
1357 
1358 void yf::SessionShared::Rep::expire()
1359 {
1360  while (true)
1361  {
1362  boost::xtime xt;
1363  boost::xtime_get(&xt,
1364 #if BOOST_VERSION >= 105000
1365  boost::TIME_UTC_
1366 #else
1367  boost::TIME_UTC
1368 #endif
1369  );
1370  xt.sec += m_session_ttl;
1371  {
1372  boost::mutex::scoped_lock lock(m_mutex);
1373  m_cond_expire_ready.timed_wait(lock, xt);
1374  if (close_down)
1375  break;
1376  }
1377  stat();
1378  expire_classes();
1379  }
1380 }
1381 
1382 yf::SessionShared::Rep::Rep()
1383 {
1384  m_resultset_ttl = 30;
1385  m_resultset_max = 10;
1386  m_session_ttl = 90;
1387  m_optimize_search = true;
1388  m_restart = false;
1389  m_session_max = 100;
1391  m_maximumRecordSize = 0;
1392  close_down = false;
1393 }
1394 
1395 yf::SessionShared::Rep::~Rep()
1396 {
1397  {
1398  boost::mutex::scoped_lock lock(m_mutex);
1399  close_down = true;
1400  m_cond_expire_ready.notify_all();
1401  }
1402  m_thrds.join_all();
1403 }
1404 
1405 void yf::SessionShared::Rep::start()
1406 {
1407  yf::SessionShared::Worker w(this);
1408  m_thrds.add_thread(new boost::thread(w));
1409 }
1410 
1411 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1412 {
1413 }
1414 
1415 yf::SessionShared::~SessionShared() {
1416 }
1417 
1418 void yf::SessionShared::start() const
1419 {
1420  m_p->start();
1421 }
1422 
1423 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1424 {
1425 }
1426 
1427 yf::SessionShared::Frontend::~Frontend()
1428 {
1429 }
1430 
1431 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1432 {
1433  boost::mutex::scoped_lock lock(m_mutex);
1434 
1435  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1436 
1437  while(true)
1438  {
1439  it = m_clients.find(package.session());
1440  if (it == m_clients.end())
1441  break;
1442 
1443  if (!it->second->m_in_use)
1444  {
1445  it->second->m_in_use = true;
1446  return it->second;
1447  }
1448  m_cond_session_ready.wait(lock);
1449  }
1450  FrontendPtr f(new Frontend(this));
1451  m_clients[package.session()] = f;
1452  f->m_in_use = true;
1453  return f;
1454 }
1455 
1456 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1457 {
1458  boost::mutex::scoped_lock lock(m_mutex);
1459  std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1460 
1461  it = m_clients.find(package.session());
1462  if (it != m_clients.end())
1463  {
1464  if (package.session().is_closed())
1465  {
1466  m_clients.erase(it);
1467  }
1468  else
1469  {
1470  it->second->m_in_use = false;
1471  }
1472  m_cond_session_ready.notify_all();
1473  }
1474 }
1475 
1476 
1477 void yf::SessionShared::process(mp::Package &package) const
1478 {
1479  FrontendPtr f = m_p->get_frontend(package);
1480 
1481  Z_GDU *gdu = package.request().get();
1482 
1483  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1484  Z_APDU_initRequest && !f->m_is_virtual)
1485  {
1486  m_p->init(package, gdu, f);
1487  }
1488  else if (!f->m_is_virtual)
1489  package.move();
1490  else if (gdu && gdu->which == Z_GDU_Z3950)
1491  {
1492  Z_APDU *apdu = gdu->u.z3950;
1493  if (apdu->which == Z_APDU_initRequest)
1494  {
1495  mp::odr odr;
1496 
1497  package.response() = odr.create_close(
1498  apdu,
1499  Z_Close_protocolError,
1500  "double init");
1501 
1502  package.session().close();
1503  }
1504  else if (apdu->which == Z_APDU_close)
1505  {
1506  mp::odr odr;
1507 
1508  package.response() = odr.create_close(
1509  apdu,
1510  Z_Close_peerAbort, "received close from client");
1511  package.session().close();
1512  }
1513  else if (apdu->which == Z_APDU_searchRequest)
1514  {
1515  f->search(package, apdu);
1516  }
1517  else if (apdu->which == Z_APDU_presentRequest)
1518  {
1519  f->present(package, apdu);
1520  }
1521  else if (apdu->which == Z_APDU_scanRequest)
1522  {
1523  f->scan(package, apdu);
1524  }
1525  else
1526  {
1527  mp::odr odr;
1528 
1529  package.response() = odr.create_close(
1530  apdu, Z_Close_protocolError,
1531  "unsupported APDU in filter_session_shared");
1532 
1533  package.session().close();
1534  }
1535  }
1536  m_p->release_frontend(package);
1537 }
1538 
1539 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1540  const char *path)
1541 {
1542  for (ptr = ptr->children; ptr; ptr = ptr->next)
1543  {
1544  if (ptr->type != XML_ELEMENT_NODE)
1545  continue;
1546  if (!strcmp((const char *) ptr->name, "resultset"))
1547  {
1548  const struct _xmlAttr *attr;
1549  for (attr = ptr->properties; attr; attr = attr->next)
1550  {
1551  if (!strcmp((const char *) attr->name, "ttl"))
1552  m_p->m_resultset_ttl =
1553  mp::xml::get_int(attr->children, 30);
1554  else if (!strcmp((const char *) attr->name, "max"))
1555  {
1556  m_p->m_resultset_max =
1557  mp::xml::get_int(attr->children, 10);
1558  }
1559  else if (!strcmp((const char *) attr->name, "optimizesearch"))
1560  {
1561  m_p->m_optimize_search =
1562  mp::xml::get_bool(attr->children, true);
1563  }
1564  else if (!strcmp((const char *) attr->name, "restart"))
1565  {
1566  m_p->m_restart = mp::xml::get_bool(attr->children, true);
1567  }
1568  else
1569  throw mp::filter::FilterException(
1570  "Bad attribute " + std::string((const char *)
1571  attr->name));
1572  }
1573  }
1574  else if (!strcmp((const char *) ptr->name, "session"))
1575  {
1576  const struct _xmlAttr *attr;
1577  for (attr = ptr->properties; attr; attr = attr->next)
1578  {
1579  if (!strcmp((const char *) attr->name, "ttl"))
1580  m_p->m_session_ttl =
1581  mp::xml::get_int(attr->children, 90);
1582  else if (!strcmp((const char *) attr->name, "max"))
1583  m_p->m_session_max =
1584  mp::xml::get_int(attr->children, 100);
1585  else
1586  throw mp::filter::FilterException(
1587  "Bad attribute " + std::string((const char *)
1588  attr->name));
1589  }
1590  }
1591  else if (!strcmp((const char *) ptr->name, "init"))
1592  {
1593  const struct _xmlAttr *attr;
1594  for (attr = ptr->properties; attr; attr = attr->next)
1595  {
1596  if (!strcmp((const char *) attr->name, "maximum-record-size"))
1597  m_p->m_maximumRecordSize =
1598  mp::xml::get_int(attr->children, 0);
1599  else if (!strcmp((const char *) attr->name,
1600  "preferred-message-size"))
1601  m_p->m_preferredMessageSize =
1602  mp::xml::get_int(attr->children, 0);
1603  else
1604  throw mp::filter::FilterException(
1605  "Bad attribute " + std::string((const char *)
1606  attr->name));
1607  }
1608  }
1609  else
1610  {
1611  throw mp::filter::FilterException("Bad element "
1612  + std::string((const char *)
1613  ptr->name));
1614  }
1615  }
1616 }
1617 
1618 static mp::filter::Base* filter_creator()
1619 {
1620  return new mp::filter::SessionShared;
1621 }
1622 
1623 extern "C" {
1624  struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1625  0,
1626  "session_shared",
1628  };
1629 }
1630 
1631 /*
1632  * Local variables:
1633  * c-basic-offset: 4
1634  * c-file-style: "Stroustrup"
1635  * indent-tabs-mode: nil
1636  * End:
1637  * vim: shiftwidth=4 tabstop=8 expandtab
1638  */
1639 
boost::shared_ptr< FrontendSet > FrontendSetPtr
boost::shared_ptr< BackendSet > BackendSetPtr
struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared
boost::shared_ptr< Frontend > FrontendPtr
std::list< BackendInstancePtr > BackendInstanceList
boost::shared_ptr< BackendInstance > BackendInstancePtr
std::list< BackendSetPtr > BackendSetList
static int get_diagnostic(Z_DefaultDiagFormat *r)
bool operator<(const SessionShared::InitKey &k) const
std::map< std::string, FrontendSetPtr > FrontendSets
static mp::filter::Base * filter_creator()
boost::shared_ptr< BackendClass > BackendClassPtr
std::map< mp::Session, FrontendPtr > m_clients
std::map< InitKey, BackendClassPtr > BackendClassMap