metaproxy  1.3.55
filter_z3950_client.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) 2005-2013 Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #include "filter_z3950_client.hpp"
22 #include <metaproxy/package.hpp>
23 #include <metaproxy/util.hpp>
24 
25 #include <map>
26 #include <stdexcept>
27 #include <list>
28 #include <iostream>
29 
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 #include <boost/thread/xtime.hpp>
33 
34 #include <yaz/zgdu.h>
35 #include <yaz/log.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 
39 #include <yazpp/socket-manager.h>
40 #include <yazpp/pdu-assoc.h>
41 #include <yazpp/z-assoc.h>
42 
43 namespace mp = metaproxy_1;
44 namespace yf = mp::filter;
45 
46 namespace metaproxy_1 {
47  namespace filter {
48  class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
49  friend class Rep;
50  Assoc(yazpp_1::SocketManager *socket_manager,
51  yazpp_1::IPDU_Observable *PDU_Observable,
52  std::string host, int timeout);
53  ~Assoc();
54  void connectNotify();
55  void failNotify();
56  void timeoutNotify();
57  void recv_GDU(Z_GDU *gdu, int len);
58  void fixup_nsd(ODR odr, Z_Records *records);
59  void fixup_nsd(ODR odr, Z_DefaultDiagFormat *nsd);
60  void fixup_init(ODR odr, Z_InitResponse *initrs);
61  yazpp_1::IPDU_Observer* sessionNotify(
62  yazpp_1::IPDU_Observable *the_PDU_Observable,
63  int fd);
64 
65  yazpp_1::SocketManager *m_socket_manager;
66  yazpp_1::IPDU_Observable *m_PDU_Observable;
67  Package *m_package;
68  bool m_in_use;
69  bool m_waiting;
77  std::string m_host;
78  };
79 
81  public:
82  // number of seconds to wait before we give up request
86  std::string m_default_target;
87  std::string m_force_target;
88  boost::mutex m_mutex;
89  boost::condition m_cond_session_ready;
90  std::map<mp::Session,Z3950Client::Assoc *> m_clients;
91  Z3950Client::Assoc *get_assoc(Package &package);
92  void send_and_receive(Package &package,
93  yf::Z3950Client::Assoc *c);
94  void release_assoc(Package &package);
95  };
96  }
97 }
98 
99 using namespace mp;
100 
101 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
102  yazpp_1::IPDU_Observable *PDU_Observable,
103  std::string host, int timeout_sec)
104  : Z_Assoc(PDU_Observable),
105  m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
106  m_package(0), m_in_use(true), m_waiting(false),
107  m_destroyed(false), m_connected(false), m_has_closed(false),
108  m_queue_len(1),
109  m_time_elapsed(0), m_time_max(timeout_sec), m_time_connect_max(10),
110  m_host(host)
111 {
112  // std::cout << "create assoc " << this << "\n";
113 }
114 
115 yf::Z3950Client::Assoc::~Assoc()
116 {
117  // std::cout << "destroy assoc " << this << "\n";
118 }
119 
120 void yf::Z3950Client::Assoc::connectNotify()
121 {
122  m_waiting = false;
123 
124  m_connected = true;
125 }
126 
127 void yf::Z3950Client::Assoc::failNotify()
128 {
129  m_waiting = false;
130 
131  mp::odr odr;
132 
133  if (m_package)
134  {
135  Z_GDU *gdu = m_package->request().get();
136  Z_APDU *apdu = 0;
137  if (gdu && gdu->which == Z_GDU_Z3950)
138  apdu = gdu->u.z3950;
139 
140  m_package->response() = odr.create_close(apdu, Z_Close_peerAbort, 0);
141  m_package->session().close();
142  }
143 }
144 
145 void yf::Z3950Client::Assoc::timeoutNotify()
146 {
147  m_time_elapsed++;
148  if ((m_connected && m_time_elapsed >= m_time_max)
149  || (!m_connected && m_time_elapsed >= m_time_connect_max))
150  {
151  m_waiting = false;
152 
153  mp::odr odr;
154 
155  if (m_package)
156  {
157  Z_GDU *gdu = m_package->request().get();
158  Z_APDU *apdu = 0;
159  if (gdu && gdu->which == Z_GDU_Z3950)
160  apdu = gdu->u.z3950;
161 
162  if (m_connected)
163  m_package->response() =
164  odr.create_close(apdu, Z_Close_lackOfActivity, 0);
165  else
166  m_package->response() =
167  odr.create_close(apdu, Z_Close_peerAbort, 0);
168 
169  m_package->session().close();
170  }
171  }
172 }
173 
174 void yf::Z3950Client::Assoc::fixup_nsd(ODR odr, Z_DefaultDiagFormat *nsd)
175 {
176  std::string addinfo;
177 
178  // should really check for nsd->which.. But union has two members
179  // containing almost same data
180  const char *v2Addinfo = nsd->u.v2Addinfo;
181  // Z_InternationalString *v3Addinfo;
182  if (v2Addinfo && *v2Addinfo)
183  {
184  addinfo.assign(nsd->u.v2Addinfo);
185  addinfo += " ";
186  }
187  addinfo += "(backend=" + m_host + ")";
188  nsd->u.v2Addinfo = odr_strdup(odr, addinfo.c_str());
189 }
190 
191 void yf::Z3950Client::Assoc::fixup_nsd(ODR odr, Z_Records *records)
192 {
193  if (records && records->which == Z_Records_NSD)
194  {
195  fixup_nsd(odr, records->u.nonSurrogateDiagnostic);
196  }
197  if (records && records->which == Z_Records_multipleNSD)
198  {
199  Z_DiagRecs *drecs = records->u.multipleNonSurDiagnostics;
200  int i;
201  for (i = 0; i < drecs->num_diagRecs; i++)
202  {
203  Z_DiagRec *dr = drecs->diagRecs[i];
204 
205  if (dr->which == Z_DiagRec_defaultFormat)
206  fixup_nsd(odr, dr->u.defaultFormat);
207  }
208  }
209 }
210 
211 void yf::Z3950Client::Assoc::fixup_init(ODR odr, Z_InitResponse *initrs)
212 {
213  Z_External *uif = initrs->userInformationField;
214 
215  if (uif && uif->which == Z_External_userInfo1)
216  {
217  Z_OtherInformation *ui = uif->u.userInfo1;
218  int i;
219  for (i = 0; i < ui->num_elements; i++)
220  {
221  Z_OtherInformationUnit *unit = ui->list[i];
222  if (unit->which == Z_OtherInfo_externallyDefinedInfo &&
223  unit->information.externallyDefinedInfo &&
224  unit->information.externallyDefinedInfo->which ==
225  Z_External_diag1)
226  {
227  Z_DiagnosticFormat *diag =
228  unit->information.externallyDefinedInfo->u.diag1;
229  int j;
230  for (j = 0; j < diag->num; j++)
231  {
232  Z_DiagnosticFormat_s *ds = diag->elements[j];
233  if (ds->which == Z_DiagnosticFormat_s_defaultDiagRec)
234  {
235  Z_DefaultDiagFormat *r = ds->u.defaultDiagRec;
236  char *oaddinfo = r->u.v2Addinfo;
237  char *naddinfo = (char *) odr_malloc(
238  odr,
239  (oaddinfo ? strlen(oaddinfo) : 0) + 20 +
240  m_host.length());
241  *naddinfo = '\0';
242  if (oaddinfo && *oaddinfo)
243  {
244  strcat(naddinfo, oaddinfo);
245  strcat(naddinfo, " ");
246  }
247  strcat(naddinfo, "(backend=");
248  strcat(naddinfo, m_host.c_str());
249  strcat(naddinfo, ")");
250 
251  r->u.v2Addinfo = naddinfo;
252  }
253  }
254  }
255  }
256  }
257 }
258 
259 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
260 {
261  m_waiting = false;
262 
263  if (m_package)
264  {
265  mp::odr odr; // must be in scope for response() = assignment
266  if (gdu && gdu->which == Z_GDU_Z3950)
267  {
268  Z_APDU *apdu = gdu->u.z3950;
269  switch (apdu->which)
270  {
271  case Z_APDU_searchResponse:
272  fixup_nsd(odr, apdu->u.searchResponse->records);
273  break;
274  case Z_APDU_presentResponse:
275  fixup_nsd(odr, apdu->u.presentResponse->records);
276  break;
277  case Z_APDU_initResponse:
278  fixup_init(odr, apdu->u.initResponse);
279  break;
280  }
281  }
282  m_package->response() = gdu;
283  }
284 }
285 
286 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
287  yazpp_1::IPDU_Observable *the_PDU_Observable,
288  int fd)
289 {
290  return 0;
291 }
292 
293 
295 {
296  m_p->m_timeout_sec = 30;
297  m_p->m_max_sockets = 0;
298  m_p->m_force_close = false;
299 }
300 
302 }
303 
304 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package)
305 {
306  // only one thread messes with the clients list at a time
307  boost::mutex::scoped_lock lock(m_mutex);
308 
309  std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
310 
311  Z_GDU *gdu = package.request().get();
312 
313  int max_sockets = package.origin().get_max_sockets();
314  if (max_sockets == 0)
315  max_sockets = m_max_sockets;
316 
317  it = m_clients.find(package.session());
318  if (it != m_clients.end())
319  {
320  it->second->m_queue_len++;
321  while (true)
322  {
323 #if 0
324  // double init .. NOT working yet
325  if (gdu && gdu->which == Z_GDU_Z3950 &&
326  gdu->u.z3950->which == Z_APDU_initRequest)
327  {
328  yazpp_1::SocketManager *s = it->second->m_socket_manager;
329  delete it->second; // destroy Z_Assoc
330  delete s; // then manager
331  m_clients.erase(it);
332  break;
333  }
334 #endif
335  if (!it->second->m_in_use)
336  {
337  it->second->m_in_use = true;
338  return it->second;
339  }
340  m_cond_session_ready.wait(lock);
341  }
342  }
343  if (!gdu || gdu->which != Z_GDU_Z3950)
344  {
345  package.move();
346  return 0;
347  }
348  // new Z39.50 session ..
349  Z_APDU *apdu = gdu->u.z3950;
350  // check that it is init. If not, close
351  if (apdu->which != Z_APDU_initRequest)
352  {
353  mp::odr odr;
354 
355  package.response() = odr.create_close(apdu,
356  Z_Close_protocolError,
357  "First PDU was not an "
358  "Initialize Request");
359  package.session().close();
360  return 0;
361  }
362  std::string target = m_force_target;
363  if (!target.length())
364  {
365  target = m_default_target;
366  std::list<std::string> vhosts;
367  mp::util::remove_vhost_otherinfo(&apdu->u.initRequest->otherInfo,
368  vhosts);
369  size_t no_vhosts = vhosts.size();
370  if (no_vhosts == 1)
371  {
372  std::list<std::string>::const_iterator v_it = vhosts.begin();
373  target = *v_it;
374  }
375  else if (no_vhosts == 0)
376  {
377  if (!target.length())
378  {
379  // no default target. So we don't know where to connect
380  mp::odr odr;
381  package.response() = odr.create_initResponse(
382  apdu,
383  YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
384  "z3950_client: No vhost given");
385 
386  package.session().close();
387  return 0;
388  }
389  }
390  else if (no_vhosts > 1)
391  {
392  mp::odr odr;
393  package.response() = odr.create_initResponse(
394  apdu,
395  YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP,
396  "z3950_client: Can not cope with multiple vhosts");
397  package.session().close();
398  return 0;
399  }
400  }
401 
402  // see if we have reached max number of clients (max-sockets)
403 
404  while (max_sockets)
405  {
406  int no_not_in_use = 0;
407  int number = 0;
408  it = m_clients.begin();
409  for (; it != m_clients.end(); it++)
410  {
411  yf::Z3950Client::Assoc *as = it->second;
412  if (!strcmp(as->m_host.c_str(), target.c_str()))
413  {
414  number++;
415  if (!as->m_in_use)
416  no_not_in_use++;
417  }
418  }
419  yaz_log(YLOG_LOG, "Found %d/%d connections for %s", number, max_sockets,
420  target.c_str());
421  if (number < max_sockets)
422  break;
423  if (no_not_in_use == 0) // all in use..
424  {
425  mp::odr odr;
426 
427  package.response() = odr.create_initResponse(
428  apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
429  "z3950_client: max sessions");
430  package.session().close();
431  return 0;
432  }
433  boost::xtime xt;
434  xtime_get(&xt,
435 #if BOOST_VERSION >= 105000
436  boost::TIME_UTC_
437 #else
438  boost::TIME_UTC
439 #endif
440  );
441 
442  xt.sec += 15;
443  if (!m_cond_session_ready.timed_wait(lock, xt))
444  {
445  mp::odr odr;
446 
447  package.response() = odr.create_initResponse(
448  apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
449  "z3950_client: max sessions");
450  package.session().close();
451  return 0;
452  }
453  }
454 
455  yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
456  yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
457  yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as,
458  target.c_str(),
459  m_timeout_sec);
460  m_clients[package.session()] = as;
461  return as;
462 }
463 
464 void yf::Z3950Client::Rep::send_and_receive(Package &package,
465  yf::Z3950Client::Assoc *c)
466 {
467  if (c->m_destroyed)
468  return;
469 
470  c->m_package = &package;
471 
472  if (package.session().is_closed() && c->m_connected && !c->m_has_closed
473  && m_force_close)
474  {
475  mp::odr odr;
476 
477  package.request() = odr.create_close(
478  0, Z_Close_finished, "z3950_client");
479  c->m_package = 0; // don't inspect response
480  }
481  Z_GDU *gdu = package.request().get();
482 
483  if (!gdu || gdu->which != Z_GDU_Z3950)
484  return;
485 
486  if (gdu->u.z3950->which == Z_APDU_close)
487  c->m_has_closed = true;
488 
489  // prepare connect
490  c->m_time_elapsed = 0;
491  c->m_waiting = true;
492  if (!c->m_connected)
493  {
494  if (c->client(c->m_host.c_str()))
495  {
496  mp::odr odr;
497  package.response() =
498  odr.create_close(gdu->u.z3950, Z_Close_peerAbort, 0);
499  package.session().close();
500  return;
501  }
502  c->timeout(1); // so timeoutNotify gets called once per second
503 
504 
505  while (!c->m_destroyed && c->m_waiting
506  && c->m_socket_manager->processEvent() > 0)
507  ;
508  }
509  if (!c->m_connected)
510  {
511  return;
512  }
513 
514  // prepare response
515  c->m_time_elapsed = 0;
516  c->m_waiting = true;
517 
518  // relay the package ..
519  int len;
520  c->send_GDU(gdu, &len);
521 
522  switch (gdu->u.z3950->which)
523  {
524  case Z_APDU_triggerResourceControlRequest:
525  // request only..
526  break;
527  default:
528  // for the rest: wait for a response PDU
529  while (!c->m_destroyed && c->m_waiting
530  && c->m_socket_manager->processEvent() > 0)
531  ;
532  break;
533  }
534 }
535 
536 void yf::Z3950Client::Rep::release_assoc(Package &package)
537 {
538  boost::mutex::scoped_lock lock(m_mutex);
539  std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
540 
541  it = m_clients.find(package.session());
542  if (it != m_clients.end())
543  {
544  it->second->m_in_use = false;
545  it->second->m_queue_len--;
546 
547  if (package.session().is_closed())
548  {
549  // destroy hint (send_and_receive)
550  it->second->m_destroyed = true;
551  if (it->second->m_queue_len == 0)
552  {
553  yazpp_1::SocketManager *s = it->second->m_socket_manager;
554  delete it->second; // destroy Z_Assoc
555  delete s; // then manager
556  m_clients.erase(it);
557  }
558  }
559  m_cond_session_ready.notify_all();
560  }
561 }
562 
563 void yf::Z3950Client::process(Package &package) const
564 {
565  yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
566  if (c)
567  {
568  m_p->send_and_receive(package, c);
569  m_p->release_assoc(package);
570  }
571 }
572 
573 void yf::Z3950Client::configure(const xmlNode *ptr, bool test_only,
574  const char *path)
575 {
576  for (ptr = ptr->children; ptr; ptr = ptr->next)
577  {
578  if (ptr->type != XML_ELEMENT_NODE)
579  continue;
580  if (!strcmp((const char *) ptr->name, "timeout"))
581  {
582  m_p->m_timeout_sec = mp::xml::get_int(ptr, 30);
583  }
584  else if (!strcmp((const char *) ptr->name, "default_target"))
585  {
586  m_p->m_default_target = mp::xml::get_text(ptr);
587  }
588  else if (!strcmp((const char *) ptr->name, "force_target"))
589  {
590  m_p->m_force_target = mp::xml::get_text(ptr);
591  }
592  else if (!strcmp((const char *) ptr->name, "max-sockets"))
593  {
594  m_p->m_max_sockets = mp::xml::get_int(ptr, 0);
595  }
596  else if (!strcmp((const char *) ptr->name, "force_close"))
597  {
598  m_p->m_force_close = mp::xml::get_bool(ptr, 0);
599  }
600  else
601  {
602  throw mp::filter::FilterException("Bad element "
603  + std::string((const char *)
604  ptr->name));
605  }
606  }
607 }
608 
609 static mp::filter::Base* filter_creator()
610 {
611  return new mp::filter::Z3950Client;
612 }
613 
614 extern "C" {
615  struct metaproxy_1_filter_struct metaproxy_1_filter_z3950_client = {
616  0,
617  "z3950_client",
619  };
620 }
621 
622 /*
623  * Local variables:
624  * c-basic-offset: 4
625  * c-file-style: "Stroustrup"
626  * indent-tabs-mode: nil
627  * End:
628  * vim: shiftwidth=4 tabstop=8 expandtab
629  */
630