21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
31 #include <metaproxy/util.hpp>
34 #include <yaz/copy_types.h>
37 #include <yaz/otherinfo.h>
38 #include <yaz/diagbib1.h>
39 #include <yazpp/z-query.h>
40 #include <yazpp/record-cache.h>
89 const std::string &result_set_id,
91 const yazpp_1::Yaz_Z_Query &query,
95 Package &frontend_package,
96 Package &search_package,
97 const Z_APDU *apdu_req,
99 Z_Records **z_records);
124 int &code, std::string &addinfo);
127 int &code, std::string &addinfo);
149 Odr_int preferredRecordSize,
150 Odr_int maximumRecordSize);
162 const yazpp_1::Yaz_Z_Query &query);
172 void search(Package &package, Z_APDU *apdu);
173 void present(Package &package, Z_APDU *apdu);
174 void scan(Package &package, Z_APDU *apdu);
178 Z_RPNStructure *s, std::string &rset);
179 void get_set(mp::Package &package,
180 const Z_APDU *apdu_req,
182 yazpp_1::Yaz_Z_Query &query,
186 std::string &result_set_id,
188 bool out_of_sessions);
208 void init(Package &package,
const Z_GDU *gdu,
233 yf::SessionShared::FrontendSet::FrontendSet(
235 const yazpp_1::Yaz_Z_Query &query)
236 : m_databases(databases), m_query(query)
240 const yf::SessionShared::Databases &
241 yf::SessionShared::FrontendSet::get_databases()
246 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
251 yf::SessionShared::InitKey::InitKey(
const InitKey &k)
253 m_odr = odr_createmem(ODR_ENCODE);
256 m_idAuthentication_buf = (
char*)odr_malloc(m_odr, m_idAuthentication_size);
258 m_idAuthentication_size);
261 m_otherInfo_buf = (
char*)odr_malloc(m_odr, m_otherInfo_size);
266 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
268 m_odr = odr_createmem(ODR_ENCODE);
270 Z_IdAuthentication *t = req->idAuthentication;
271 z_IdAuthentication(m_odr, &t, 1, 0);
272 m_idAuthentication_buf =
273 odr_getbuf(m_odr, &m_idAuthentication_size, 0);
275 Z_OtherInformation *o = req->otherInfo;
276 z_OtherInformation(m_odr, &o, 1, 0);
277 m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
280 yf::SessionShared::InitKey::~InitKey()
289 c = mp::util::memcmp2(
290 (
void*) m_idAuthentication_buf, m_idAuthentication_size,
297 c = mp::util::memcmp2((
void*) m_otherInfo_buf, m_otherInfo_size,
308 boost::mutex::scoped_lock lock(m_mutex_backend_class);
309 m_cond_set_ready.notify_all();
317 boost::mutex::scoped_lock lock(m_mutex_backend_class);
318 BackendInstanceList::iterator it = m_backend_list.begin();
321 if (it == m_backend_list.end())
325 it = m_backend_list.erase(it);
332 b->m_close_package->response() = odr.create_close(
333 0, Z_Close_lackOfActivity, 0);
334 b->m_close_package->session().close();
335 b->m_close_package->move();
339 yf::SessionShared::BackendInstancePtr
340 yf::SessionShared::BackendClass::get_backend(
341 const mp::Package &frontend_package,
342 int &code, std::string &addinfo)
345 boost::mutex::scoped_lock lock(m_mutex_backend_class);
347 BackendInstanceList::const_iterator it = m_backend_list.begin();
351 for (; it != m_backend_list.end(); it++)
353 if (!(*it)->m_in_use)
356 || (*it)->m_sequence_this < backend1->m_sequence_this)
362 use_backend(backend1);
366 return create_backend(frontend_package, code, addinfo);
371 backend->m_in_use =
true;
372 backend->m_sequence_this = m_sequence_top++;
375 void yf::SessionShared::BackendInstance::timestamp()
378 time(&m_time_last_use);
381 yf::SessionShared::BackendInstance::~BackendInstance()
386 m_close_package->response() = odr.create_close(
387 0, Z_Close_lackOfActivity, 0);
388 m_close_package->session().close();
389 m_close_package->move();
391 delete m_close_package;
394 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
395 const mp::Package &frontend_package,
396 int &code, std::string &addinfo)
400 bp->m_close_package =
401 new mp::Package(bp->m_session, frontend_package.origin());
402 bp->m_close_package->copy_filter(frontend_package);
404 Package init_package(bp->m_session, frontend_package.origin());
406 init_package.copy_filter(frontend_package);
408 yazpp_1::GDU actual_init_request = m_init_request;
409 Z_GDU *init_pdu = actual_init_request.get();
411 assert(init_pdu->which == Z_GDU_Z3950);
412 assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
414 Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
415 ODR_MASK_ZERO(req->options);
417 ODR_MASK_SET(req->options, Z_Options_search);
418 ODR_MASK_SET(req->options, Z_Options_present);
419 ODR_MASK_SET(req->options, Z_Options_namedResultSets);
420 ODR_MASK_SET(req->options, Z_Options_scan);
422 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
423 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
424 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
426 if (m_preferredMessageSize)
427 *req->preferredMessageSize = m_preferredMessageSize;
428 if (m_maximumRecordSize)
429 *req->maximumRecordSize = m_maximumRecordSize;
431 init_package.request() = init_pdu;
434 boost::mutex::scoped_lock lock(m_mutex_backend_class);
440 boost::mutex::scoped_lock lock(m_mutex_backend_class);
444 m_named_result_sets =
false;
445 Z_GDU *gdu = init_package.response().get();
447 if (gdu && gdu->which == Z_GDU_Z3950
448 && gdu->u.z3950->which == Z_APDU_initResponse)
450 Z_InitResponse *res = gdu->u.z3950->u.initResponse;
452 if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
454 m_named_result_sets =
true;
457 if (m_no_succeeded == 0)
458 m_init_response = gdu->u.z3950;
459 if (*gdu->u.z3950->u.initResponse->result
460 && !init_package.session().is_closed())
463 time(&bp->m_time_last_use);
464 bp->m_sequence_this = 0;
465 bp->m_result_set_sequence = 0;
466 m_backend_list.push_back(bp);
473 gdu->u.z3950->u.initResponse->userInformationField;
474 if (uif && uif->which == Z_External_userInfo1)
476 Z_OtherInformation *ui = uif->u.userInfo1;
477 if (ui && ui->num_elements >= 1)
479 Z_OtherInformationUnit *unit = ui->list[0];
480 if (unit->which == Z_OtherInfo_externallyDefinedInfo &&
481 unit->information.externallyDefinedInfo &&
482 unit->information.externallyDefinedInfo->which ==
485 Z_DiagnosticFormat *diag =
486 unit->information.externallyDefinedInfo->u.diag1;
489 Z_DiagnosticFormat_s *ds = diag->elements[0];
491 Z_DiagnosticFormat_s_defaultDiagRec)
493 Z_DefaultDiagFormat *e =
494 ds->u.defaultDiagRec;
495 code = *e->condition;
496 if (e->which == Z_DefaultDiagFormat_v2Addinfo
499 addinfo = e->u.v2Addinfo;
502 e->which == Z_DefaultDiagFormat_v3Addinfo
505 addinfo = e->u.v3Addinfo;
514 if (!init_package.session().is_closed())
516 init_package.copy_filter(frontend_package);
517 init_package.session().close();
521 m_cond_set_ready.notify_all();
526 yf::SessionShared::BackendClass::BackendClass(
const yazpp_1::GDU &init_request,
530 Odr_int preferredMessageSize,
531 Odr_int maximumRecordSize)
532 : m_named_result_sets(false), m_init_request(init_request),
533 m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
534 m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max),
535 m_preferredMessageSize(preferredMessageSize),
536 m_maximumRecordSize(maximumRecordSize),
537 m_no_failed(0), m_no_succeeded(0), m_no_init(0)
540 yf::SessionShared::BackendClass::~BackendClass()
543 void yf::SessionShared::Rep::stat()
546 int no_instances = 0;
547 BackendClassMap::const_iterator it;
549 boost::mutex::scoped_lock lock(m_mutex_backend_map);
550 for (it = m_backend_map.begin(); it != m_backend_map.end(); it++)
554 BackendInstanceList::iterator bit = bc->m_backend_list.begin();
555 for (; bit != bc->m_backend_list.end(); bit++)
561 void yf::SessionShared::Rep::init(mp::Package &package,
const Z_GDU *gdu,
564 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
566 frontend->m_is_virtual =
true;
567 frontend->m_init_options = *req->options;
569 req->idAuthentication = 0;
572 boost::mutex::scoped_lock lock(m_mutex_backend_map);
573 BackendClassMap::const_iterator it;
574 it = m_backend_map.find(k);
575 if (it == m_backend_map.end())
581 m_preferredMessageSize,
582 m_maximumRecordSize));
583 m_backend_map[k] = b;
584 frontend->m_backend_class = b;
588 frontend->m_backend_class = it->second;
597 bool create_first_one =
false;
599 boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
600 if (!bc->m_no_succeeded)
601 create_first_one =
true;
605 while (!bc->m_no_failed && !bc->m_no_succeeded && bc->m_no_init)
607 bc->m_cond_set_ready.wait(lock);
611 if (create_first_one)
618 bc->release_backend(backend);
620 yazpp_1::GDU init_response;
622 boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
624 init_response = bc->m_init_response;
627 if (init_response.get())
629 Z_GDU *response_gdu = init_response.get();
630 mp::util::transfer_referenceId(odr, gdu->u.z3950,
631 response_gdu->u.z3950);
632 Z_InitResponse *init_res = response_gdu->u.z3950->u.initResponse;
633 Z_Options *server_options = init_res->options;
634 Z_Options *client_options = &frontend->m_init_options;
636 for (i = 0; i < 30; i++)
637 if (!ODR_MASK_GET(client_options, i))
638 ODR_MASK_CLEAR(server_options, i);
640 if (!m_preferredMessageSize ||
641 *init_res->preferredMessageSize > *req->preferredMessageSize)
642 *init_res->preferredMessageSize = *req->preferredMessageSize;
644 if (!m_maximumRecordSize ||
645 *init_res->maximumRecordSize > *req->maximumRecordSize)
646 *init_res->maximumRecordSize = *req->maximumRecordSize;
648 package.response() = init_response;
649 if (!*response_gdu->u.z3950->u.initResponse->result)
650 package.session().close();
655 odr.create_initResponse(
656 gdu->u.z3950, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
657 "session_shared: target closed connection during init");
658 *apdu->u.initResponse->result = 0;
659 package.response() = apdu;
660 package.session().close();
664 void yf::SessionShared::BackendSet::timestamp()
666 time(&m_time_last_use);
669 yf::SessionShared::BackendSet::BackendSet(
670 const std::string &result_set_id,
672 const yazpp_1::Yaz_Z_Query &query,
673 Z_OtherInformation *additionalSearchInfo) :
674 m_result_set_id(result_set_id),
675 m_databases(databases), m_result_set_size(0), m_query(query)
681 yaz_clone_z_OtherInformation(additionalSearchInfo,
685 yf::SessionShared::BackendSet::~BackendSet()
687 nmem_destroy(mem_additionalSearchInfo);
692 return *r->condition;
695 bool yf::SessionShared::BackendSet::search(
696 mp::Package &frontend_package,
697 mp::Package &search_package,
698 const Z_APDU *frontend_apdu,
700 Z_Records **z_records)
703 Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
704 Z_SearchRequest *req = apdu_req->u.searchRequest;
706 req->additionalSearchInfo = additionalSearchInfoRequest;
707 req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
708 req->query = m_query.get_Z_Query();
710 req->num_databaseNames = m_databases.size();
711 req->databaseNames = (
char**)
712 odr_malloc(odr, req->num_databaseNames *
sizeof(
char *));
713 Databases::const_iterator it = m_databases.begin();
715 for (; it != m_databases.end(); it++)
716 req->databaseNames[i++] = odr_strdup(odr, it->c_str());
718 if (frontend_apdu->which == Z_APDU_searchRequest)
719 req->preferredRecordSyntax =
720 frontend_apdu->u.searchRequest->preferredRecordSyntax;
722 search_package.request() = apdu_req;
724 search_package.move();
726 Z_GDU *gdu = search_package.response().get();
727 if (!search_package.session().is_closed()
728 && gdu && gdu->which == Z_GDU_Z3950
729 && gdu->u.z3950->which == Z_APDU_searchResponse)
731 Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
732 *z_records = b_resp->records;
733 m_result_set_size = *b_resp->resultCount;
735 additionalSearchInfoResponse = yaz_clone_z_OtherInformation(
736 b_resp->additionalSearchInfo, mem_additionalSearchInfo);
740 const char *addinfo =
"session_shared: "
741 "target closed connection during search";
742 if (frontend_apdu->which == Z_APDU_searchRequest)
743 f_apdu = odr.create_searchResponse(
744 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
745 else if (frontend_apdu->which == Z_APDU_presentRequest)
746 f_apdu = odr.create_presentResponse(
747 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
749 f_apdu = odr.create_close(
750 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
751 frontend_package.response() = f_apdu;
755 void yf::SessionShared::Frontend::override_set(
757 std::string &result_set_id,
759 bool out_of_sessions)
762 BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
766 size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
767 for (; it != bc->m_backend_list.end(); it++)
769 if (!(*it)->m_in_use)
771 BackendSetList::iterator set_it = (*it)->m_sets.begin();
772 for (; set_it != (*it)->m_sets.end(); set_it++)
774 bool same_databases = mp::util::match((*set_it)->m_databases,
776 if ((max_sets > 1 || same_databases)
779 now < (*set_it)->m_time_last_use ||
780 now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
783 result_set_id = (*set_it)->m_result_set_id;
784 found_backend->m_sets.erase(set_it);
790 for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
792 if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
795 if (bc->m_named_result_sets)
797 result_set_id = boost::io::str(
798 boost::format(
"%1%") %
799 found_backend->m_result_set_sequence);
800 found_backend->m_result_set_sequence++;
803 result_set_id =
"default";
809 void yf::SessionShared::Frontend::get_set(mp::Package &package,
810 const Z_APDU *apdu_req,
812 yazpp_1::Yaz_Z_Query &query,
816 bool session_restarted =
false;
817 Z_OtherInformation *additionalSearchInfo = 0;
819 if (apdu_req->which == Z_APDU_searchRequest)
820 additionalSearchInfo = apdu_req->u.searchRequest->additionalSearchInfo;
823 std::string result_set_id;
824 bool out_of_sessions =
false;
827 boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
829 if ((
int) bc->m_backend_list.size() >=
m_p->m_session_max)
830 out_of_sessions =
true;
832 if (
m_p->m_optimize_search)
835 BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
836 while (it != bc->m_backend_list.end())
838 bool restart =
false;
839 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
840 for (; set_it != (*it)->m_sets.end(); set_it++)
844 if (mp::util::match((*set_it)->m_databases, databases)
845 && query.match(&(*set_it)->m_query)
846 && (apdu_req->which != Z_APDU_searchRequest ||
847 yaz_compare_z_OtherInformation(
848 additionalSearchInfo,
849 (*set_it)->additionalSearchInfoRequest)))
853 bc->m_cond_set_ready.wait(lock);
861 bc->use_backend(found_backend);
868 it = bc->m_backend_list.begin();
873 override_set(found_backend, result_set_id, databases, out_of_sessions);
875 bc->use_backend(found_backend);
882 if (!out_of_sessions)
883 found_backend = bc->create_backend(package, code, addinfo);
891 code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
892 addinfo =
"session_shared: all sessions in use";
897 code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
898 if (addinfo.length() == 0)
899 addinfo =
"session_shared: could not create backend";
901 if (apdu_req->which == Z_APDU_searchRequest)
903 f_apdu = odr.create_searchResponse(
904 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
906 else if (apdu_req->which == Z_APDU_presentRequest)
908 f_apdu = odr.create_presentResponse(
909 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
913 f_apdu = odr.create_close(
914 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo.c_str());
916 package.response() = f_apdu;
919 if (bc->m_named_result_sets)
921 result_set_id = boost::io::str(
922 boost::format(
"%1%") % found_backend->m_result_set_sequence);
923 found_backend->m_result_set_sequence++;
926 result_set_id =
"default";
928 found_backend->timestamp();
933 additionalSearchInfo));
936 found_set->timestamp();
938 Z_Records *z_records = 0;
940 Package search_package(found_backend->m_session, package.origin());
941 search_package.copy_filter(package);
943 if (!new_set->search(package, search_package,
944 apdu_req, found_backend, &z_records))
946 bc->remove_backend(found_backend);
954 if (z_records->which == Z_Records_NSD)
959 else if (z_records->which == Z_Records_multipleNSD)
961 if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
964 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
965 Z_DiagRec_defaultFormat)
968 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
972 if (
m_p->m_restart && !session_restarted &&
973 condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
975 package.log(
"session_shared", YLOG_LOG,
"restart");
976 bc->remove_backend(found_backend);
977 session_restarted =
true;
978 found_backend.reset();
986 if (apdu_req->which == Z_APDU_searchRequest)
988 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req,
990 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
991 *f_resp->searchStatus = Z_SearchResponse_none;
992 f_resp->records = z_records;
993 package.response() = f_apdu;
995 if (apdu_req->which == Z_APDU_presentRequest)
997 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req,
999 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
1000 f_resp->records = z_records;
1001 package.response() = f_apdu;
1003 bc->release_backend(found_backend);
1008 if (
m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0)
1010 package.log(
"session_shared", YLOG_LOG,
"restart");
1011 bc->remove_backend(found_backend);
1012 session_restarted =
true;
1013 found_backend.reset();
1016 found_backend->m_sets.push_back(found_set);
1019 int yf::SessionShared::Frontend::result_set_ref(ODR o,
1027 case Z_RPNStructure_simple:
1028 if (s->u.simple->which == Z_Operand_resultSetId)
1030 const char *
id = s->u.simple->u.resultSetId;
1033 FrontendSets::iterator fset_it = m_frontend_sets.find(
id);
1034 if (fset_it == m_frontend_sets.end())
1036 ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1038 else if (!mp::util::match(fset_it->second->get_databases(),
1041 ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1045 yazpp_1::Yaz_Z_Query query = fset_it->second->get_query();
1046 Z_Query *q = yaz_copy_Z_Query(query.get_Z_Query(), o);
1047 if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1049 s->which = q->u.type_1->RPNStructure->which;
1050 s->u.simple = q->u.type_1->RPNStructure->u.simple;
1054 ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST;
1059 case Z_RPNStructure_complex:
1060 ret = result_set_ref(o, databases, s->u.complex->s1, rset);
1062 ret = result_set_ref(o, databases, s->u.complex->s2, rset);
1068 void yf::SessionShared::Frontend::search(mp::Package &package,
1071 Z_SearchRequest *req = apdu_req->u.searchRequest;
1072 FrontendSets::iterator fset_it =
1073 m_frontend_sets.find(req->resultSetName);
1074 if (fset_it != m_frontend_sets.end())
1079 if (*req->replaceIndicator == 0)
1083 odr.create_searchResponse(
1085 YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
1087 package.response() = apdu;
1090 m_frontend_sets.erase(fset_it);
1095 for (i = 0; i < req->num_databaseNames; i++)
1096 databases.push_back(req->databaseNames[i]);
1099 yazpp_1::Yaz_Z_Query query;
1100 query.set_Z_Query(req->query);
1102 Z_Query *q = query.get_Z_Query();
1103 if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101)
1107 int diag = result_set_ref(odr, databases, q->u.type_1->RPNStructure,
1112 odr.create_searchResponse(
1116 package.response() = apdu;
1119 query.set_Z_Query(q);
1125 get_set(package, apdu_req, databases, query, found_backend, found_set);
1130 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
1131 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
1132 *f_resp->resultCount = found_set->m_result_set_size;
1133 f_resp->additionalSearchInfo = found_set->additionalSearchInfoResponse;
1134 package.response() = f_apdu;
1137 m_frontend_sets[req->resultSetName] = fset;
1139 m_backend_class->release_backend(found_backend);
1142 void yf::SessionShared::Frontend::present(mp::Package &package,
1146 Z_PresentRequest *req = apdu_req->u.presentRequest;
1148 FrontendSets::iterator fset_it =
1149 m_frontend_sets.find(req->resultSetId);
1151 if (fset_it == m_frontend_sets.end())
1154 odr.create_presentResponse(
1156 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1158 package.response() = apdu;
1163 Databases databases = fset->get_databases();
1164 yazpp_1::Yaz_Z_Query query = fset->get_query();
1170 get_set(package, apdu_req, databases, query, found_backend, found_set);
1174 Z_NamePlusRecordList *npr_res = 0;
1176 if (*req->resultSetStartPoint > 0
1177 && *req->resultSetStartPoint < INT_MAX
1178 && *req->numberOfRecordsRequested > 0
1179 && *req->numberOfRecordsRequested < INT_MAX
1180 && found_set->m_record_cache.lookup(odr, &npr_res,
1181 *req->resultSetStartPoint,
1182 *req->numberOfRecordsRequested,
1183 req->preferredRecordSyntax,
1184 req->recordComposition))
1186 Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1187 Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1189 yaz_log(YLOG_LOG,
"Found " ODR_INT_PRINTF
"+" ODR_INT_PRINTF
1190 " records in cache %p",
1191 *req->resultSetStartPoint,
1192 *req->numberOfRecordsRequested,
1193 &found_set->m_record_cache);
1195 *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
1196 *f_resp->nextResultSetPosition =
1197 *req->resultSetStartPoint + *req->numberOfRecordsRequested;
1199 f_resp->records = (Z_Records *) odr_malloc(odr,
sizeof(Z_Records));
1200 f_resp->records->which = Z_Records_DBOSD;
1201 f_resp->records->u.databaseOrSurDiagnostics = npr_res;
1202 package.response() = f_apdu_res;
1203 bc->release_backend(found_backend);
1207 found_backend->timestamp();
1209 Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
1210 Z_PresentRequest *p_req = p_apdu->u.presentRequest;
1211 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1212 p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
1213 *p_req->resultSetStartPoint = *req->resultSetStartPoint;
1214 *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
1215 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
1216 p_req->recordComposition = req->recordComposition;
1218 Package present_package(found_backend->m_session, package.origin());
1219 present_package.copy_filter(package);
1221 present_package.request() = p_apdu;
1223 present_package.move();
1225 Z_GDU *gdu = present_package.response().get();
1226 if (!present_package.session().is_closed()
1227 && gdu && gdu->which == Z_GDU_Z3950
1228 && gdu->u.z3950->which == Z_APDU_presentResponse)
1230 Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
1231 Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
1232 Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
1234 f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
1235 f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
1236 f_resp->presentStatus= b_resp->presentStatus;
1237 f_resp->records = b_resp->records;
1238 f_resp->otherInfo = b_resp->otherInfo;
1239 package.response() = f_apdu_res;
1241 if (b_resp->records && b_resp->records->which == Z_Records_DBOSD)
1243 Z_NamePlusRecordList *npr =
1244 b_resp->records->u.databaseOrSurDiagnostics;
1246 if (*req->resultSetStartPoint > 0
1247 && npr->num_records + *req->resultSetStartPoint < INT_MAX)
1250 yaz_log(YLOG_LOG,
"Adding " ODR_INT_PRINTF
"+" ODR_INT_PRINTF
1251 " records to cache %p",
1252 *req->resultSetStartPoint,
1253 *f_resp->numberOfRecordsReturned,
1254 &found_set->m_record_cache);
1256 found_set->m_record_cache.add(
1257 odr, npr, *req->resultSetStartPoint,
1258 p_req->recordComposition);
1261 bc->release_backend(found_backend);
1265 bc->remove_backend(found_backend);
1266 Z_APDU *f_apdu_res =
1267 odr.create_presentResponse(
1268 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1269 "session_shared: target closed connection during present");
1270 package.response() = f_apdu_res;
1274 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
1279 std::string addinfo;
1286 code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1287 if (addinfo.length() == 0)
1288 addinfo =
"session_shared: could not create backend";
1289 Z_APDU *apdu = odr.create_scanResponse(
1290 apdu_req, code, addinfo.c_str());
1291 frontend_package.response() = apdu;
1295 Package scan_package(backend->m_session, frontend_package.origin());
1296 backend->timestamp();
1297 scan_package.copy_filter(frontend_package);
1298 scan_package.request() = apdu_req;
1299 scan_package.move();
1300 frontend_package.response() = scan_package.response();
1301 if (scan_package.session().is_closed())
1303 frontend_package.session().close();
1304 bc->remove_backend(backend);
1307 bc->release_backend(backend);
1315 void yf::SessionShared::Worker::operator() (
void)
1320 bool yf::SessionShared::BackendClass::expire_instances()
1324 boost::mutex::scoped_lock lock(m_mutex_backend_class);
1325 BackendInstanceList::iterator bit = m_backend_list.begin();
1326 while (bit != m_backend_list.end())
1328 time_t last_use = (*bit)->m_time_last_use;
1330 if ((*bit)->m_in_use)
1332 yaz_log(YLOG_LOG,
"session_shared id=%ld in_use",
1333 (*bit)->m_session.id());
1336 else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1338 yaz_log(YLOG_LOG,
"session_shared id=%ld erase",
1339 (*bit)->m_session.id());
1340 bit = m_backend_list.erase(bit);
1344 yaz_log(YLOG_LOG,
"session_shared id=%ld skip now-last_use=%ld",
1345 (*bit)->m_session.id(), (
long) (now - last_use));
1349 if (m_backend_list.empty())
1354 void yf::SessionShared::Rep::expire_classes()
1356 boost::mutex::scoped_lock lock(m_mutex_backend_map);
1357 BackendClassMap::iterator b_it = m_backend_map.begin();
1358 while (b_it != m_backend_map.end())
1360 if (b_it->second->expire_instances())
1362 m_backend_map.erase(b_it);
1363 b_it = m_backend_map.begin();
1370 void yf::SessionShared::Rep::expire()
1375 boost::xtime_get(&xt,
1376 #
if BOOST_VERSION >= 105000
1382 xt.sec += m_session_ttl;
1384 boost::mutex::scoped_lock lock(m_mutex);
1385 m_cond_expire_ready.timed_wait(lock, xt);
1394 yf::SessionShared::Rep::Rep()
1396 m_resultset_ttl = 30;
1397 m_resultset_max = 10;
1399 m_optimize_search =
true;
1400 m_ignore_auth =
false;
1402 m_session_max = 100;
1403 m_preferredMessageSize = 0;
1404 m_maximumRecordSize = 0;
1408 yf::SessionShared::Rep::~Rep()
1411 boost::mutex::scoped_lock lock(m_mutex);
1413 m_cond_expire_ready.notify_all();
1418 void yf::SessionShared::Rep::start()
1420 yf::SessionShared::Worker w(
this);
1421 m_thrds.add_thread(
new boost::thread(w));
1428 yf::SessionShared::~SessionShared() {
1431 void yf::SessionShared::start()
const
1436 yf::SessionShared::Frontend::Frontend(
Rep *rep) : m_is_virtual(false), m_p(rep)
1440 yf::SessionShared::Frontend::~Frontend()
1444 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1446 boost::mutex::scoped_lock lock(m_mutex);
1448 std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1452 it = m_clients.find(package.session());
1453 if (it == m_clients.end())
1456 if (!it->second->m_in_use)
1458 it->second->m_in_use =
true;
1461 m_cond_session_ready.wait(lock);
1464 m_clients[package.session()] = f;
1469 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1471 boost::mutex::scoped_lock lock(m_mutex);
1472 std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1474 it = m_clients.find(package.session());
1475 if (it != m_clients.end())
1477 if (package.session().is_closed())
1479 m_clients.erase(it);
1483 it->second->m_in_use =
false;
1485 m_cond_session_ready.notify_all();
1490 void yf::SessionShared::process(mp::Package &package)
const
1494 Z_GDU *gdu = package.request().get();
1496 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1497 Z_APDU_initRequest && !f->m_is_virtual)
1499 m_p->init(package, gdu, f);
1501 else if (!f->m_is_virtual)
1503 else if (gdu && gdu->which == Z_GDU_Z3950)
1505 Z_APDU *apdu = gdu->u.z3950;
1506 if (apdu->which == Z_APDU_initRequest)
1510 package.response() = odr.create_close(
1512 Z_Close_protocolError,
1515 package.session().close();
1517 else if (apdu->which == Z_APDU_close)
1521 package.response() = odr.create_close(
1523 Z_Close_peerAbort,
"received close from client");
1524 package.session().close();
1526 else if (apdu->which == Z_APDU_searchRequest)
1528 f->search(package, apdu);
1530 else if (apdu->which == Z_APDU_presentRequest)
1532 f->present(package, apdu);
1534 else if (apdu->which == Z_APDU_scanRequest)
1536 f->scan(package, apdu);
1542 package.response() = odr.create_close(
1543 apdu, Z_Close_protocolError,
1544 "unsupported APDU in filter_session_shared");
1546 package.session().close();
1549 m_p->release_frontend(package);
1552 void yf::SessionShared::configure(
const xmlNode *ptr,
bool test_only,
1555 for (ptr = ptr->children; ptr; ptr = ptr->next)
1557 if (ptr->type != XML_ELEMENT_NODE)
1559 if (!strcmp((
const char *) ptr->name,
"resultset"))
1561 const struct _xmlAttr *attr;
1562 for (attr = ptr->properties; attr; attr = attr->next)
1564 if (!strcmp((
const char *) attr->name,
"ttl"))
1565 m_p->m_resultset_ttl =
1566 mp::xml::get_int(attr->children, 30);
1567 else if (!strcmp((
const char *) attr->name,
"max"))
1569 m_p->m_resultset_max =
1570 mp::xml::get_int(attr->children, 10);
1572 else if (!strcmp((
const char *) attr->name,
"optimizesearch"))
1574 m_p->m_optimize_search =
1575 mp::xml::get_bool(attr->children,
true);
1577 else if (!strcmp((
const char *) attr->name,
"restart"))
1579 m_p->m_restart = mp::xml::get_bool(attr->children,
true);
1582 throw mp::filter::FilterException(
1583 "Bad attribute " + std::string((
const char *)
1587 else if (!strcmp((
const char *) ptr->name,
"session"))
1589 const struct _xmlAttr *attr;
1590 for (attr = ptr->properties; attr; attr = attr->next)
1592 if (!strcmp((
const char *) attr->name,
"ttl"))
1593 m_p->m_session_ttl =
1594 mp::xml::get_int(attr->children, 90);
1595 else if (!strcmp((
const char *) attr->name,
"max"))
1596 m_p->m_session_max =
1597 mp::xml::get_int(attr->children, 100);
1599 throw mp::filter::FilterException(
1600 "Bad attribute " + std::string((
const char *)
1604 else if (!strcmp((
const char *) ptr->name,
"init"))
1606 const struct _xmlAttr *attr;
1607 for (attr = ptr->properties; attr; attr = attr->next)
1609 if (!strcmp((
const char *) attr->name,
"maximum-record-size"))
1610 m_p->m_maximumRecordSize =
1611 mp::xml::get_int(attr->children, 0);
1612 else if (!strcmp((
const char *) attr->name,
1613 "preferred-message-size"))
1614 m_p->m_preferredMessageSize =
1615 mp::xml::get_int(attr->children, 0);
1616 else if (!strcmp((
const char *) attr->name,
1618 m_p->m_ignore_auth =
1619 mp::xml::get_bool(attr->children,
false);
1621 throw mp::filter::FilterException(
1622 "Bad attribute " + std::string((
const char *)
1628 throw mp::filter::FilterException(
"Bad element "
1629 + std::string((
const char *)
1637 return new mp::filter::SessionShared;
static mp::filter::Base * filter_creator()
static int get_diagnostic(Z_DefaultDiagFormat *r)
struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared