metaproxy  1.3.55
filter_multi.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) 2005-2013 Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <yaz/log.h>
20 
21 #include "config.hpp"
22 
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25 
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30 
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33 
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38 
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43 
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46 
47 namespace metaproxy_1 {
48  namespace filter {
52  };
55  int m_count;
56  bool operator < (const BackendSet &k) const;
57  bool operator == (const BackendSet &k) const;
58  };
60  std::string m_norm_term;
61  std::string m_display_term;
62  int m_count;
63  bool operator < (const ScanTermInfo &) const;
64  bool operator == (const ScanTermInfo &) const;
65  Z_Entry *get_entry(ODR odr);
66  };
68  class PresentJob {
69  public:
71  int m_pos; // position for backend (1=first, 2=second,..
72  int m_start; // present request start
73  PresentJob(BackendPtr ptr, int pos) :
74  m_backend(ptr), m_pos(pos), m_start(0) {};
75  };
76  FrontendSet(std::string setname);
77  FrontendSet();
78  ~FrontendSet();
79 
80  void round_robin(int pos, int number, std::list<PresentJob> &job);
81  void serve_order(int pos, int number, std::list<PresentJob> &job);
82 
83  std::list<BackendSet> m_backend_sets;
84  std::string m_setname;
85  };
86  struct Multi::Backend {
88  std::string m_backend_database;
89  std::string m_vhost;
90  std::string m_route;
91  std::string m_auth;
92  void operator() (void); // thread operation
93  };
94  struct Multi::Frontend {
95  Frontend(Rep *rep);
96  ~Frontend();
97  bool m_is_multi;
98  bool m_in_use;
99  std::list<BackendPtr> m_backend_list;
100  std::map<std::string,Multi::FrontendSet> m_sets;
101 
102  void multi_move(std::list<BackendPtr> &blist);
103  void init(Package &package, Z_GDU *gdu);
104  void close(Package &package);
105  void search(Package &package, Z_APDU *apdu);
106  void present(Package &package, Z_APDU *apdu);
107  void scan(Package &package, Z_APDU *apdu);
108  void relay_apdu(Package &package, Z_APDU *apdu);
109  void record_diagnostics(Z_Records *records,
110  Z_DiagRecs * &z_diag,
111  ODR odr,
112  int &no_successful);
114  };
115  class Multi::Map {
116  std::string m_target_pattern;
117  std::string m_route;
118  std::string m_auth;
119  public:
120  Map(std::string pattern, std::string route, std::string auth) :
121  m_target_pattern(pattern), m_route(route), m_auth(auth) {};
122  bool match(const std::string target, std::string *ret,
123  std::string *auth) const {
124  if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
125  {
126  *ret = m_route;
127  *auth = m_auth;
128  return true;
129  }
130  return false;
131  };
132  };
133  class Multi::Rep {
134  friend class Multi;
135  friend struct Frontend;
136 
137  Rep();
138  FrontendPtr get_frontend(Package &package);
139  void release_frontend(Package &package);
140  private:
141  std::list<Multi::Map> m_route_patterns;
142  boost::mutex m_mutex;
143  boost::condition m_cond_session_ready;
144  std::map<mp::Session, FrontendPtr> m_clients;
148  };
149  }
150 }
151 
152 yf::Multi::Rep::Rep()
153 {
154  m_hide_unavailable = false;
155  m_hide_errors = false;
156  m_merge_type = round_robin;
157 }
158 
159 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
160 {
161  return m_count < k.m_count;
162 }
163 
164 yf::Multi::Frontend::Frontend(Rep *rep)
165 {
166  m_p = rep;
167  m_is_multi = false;
168 }
169 
170 yf::Multi::Frontend::~Frontend()
171 {
172 }
173 
174 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
175 {
176  boost::mutex::scoped_lock lock(m_mutex);
177 
178  std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
179 
180  while(true)
181  {
182  it = m_clients.find(package.session());
183  if (it == m_clients.end())
184  break;
185 
186  if (!it->second->m_in_use)
187  {
188  it->second->m_in_use = true;
189  return it->second;
190  }
191  m_cond_session_ready.wait(lock);
192  }
193  FrontendPtr f(new Frontend(this));
194  m_clients[package.session()] = f;
195  f->m_in_use = true;
196  return f;
197 }
198 
199 void yf::Multi::Rep::release_frontend(mp::Package &package)
200 {
201  boost::mutex::scoped_lock lock(m_mutex);
202  std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
203 
204  it = m_clients.find(package.session());
205  if (it != m_clients.end())
206  {
207  if (package.session().is_closed())
208  {
209  it->second->close(package);
210  m_clients.erase(it);
211  }
212  else
213  {
214  it->second->m_in_use = false;
215  }
216  m_cond_session_ready.notify_all();
217  }
218 }
219 
220 yf::Multi::FrontendSet::FrontendSet(std::string setname)
221  : m_setname(setname)
222 {
223 }
224 
225 
226 yf::Multi::FrontendSet::FrontendSet()
227 {
228 }
229 
230 
231 yf::Multi::FrontendSet::~FrontendSet()
232 {
233 }
234 
235 yf::Multi::Multi() : m_p(new Multi::Rep)
236 {
237 }
238 
240 }
241 
242 
243 void yf::Multi::Backend::operator() (void)
244 {
245  m_package->move(m_route);
246 }
247 
248 
249 void yf::Multi::Frontend::close(mp::Package &package)
250 {
251  std::list<BackendPtr>::const_iterator bit;
252  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
253  {
254  BackendPtr b = *bit;
255 
256  b->m_package->copy_filter(package);
257  b->m_package->request() = (Z_GDU *) 0;
258  b->m_package->session().close();
259  b->m_package->move(b->m_route);
260  }
261 }
262 
263 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
264 {
265  std::list<BackendPtr>::const_iterator bit;
266  boost::thread_group g;
267  for (bit = blist.begin(); bit != blist.end(); bit++)
268  {
269  g.add_thread(new boost::thread(**bit));
270  }
271  g.join_all();
272 }
273 
274 void yf::Multi::FrontendSet::serve_order(int start, int number,
275  std::list<PresentJob> &jobs)
276 {
277  int i;
278  for (i = 0; i < number; i++)
279  {
280  std::list<BackendSet>::const_iterator bsit;
281  int voffset = 0;
282  int offset = start + i - 1;
283  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
284  bsit++)
285  {
286  if (offset >= voffset && offset < voffset + bsit->m_count)
287  {
288  PresentJob job(bsit->m_backend, offset - voffset + 1);
289  jobs.push_back(job);
290  break;
291  }
292  voffset += bsit->m_count;
293  }
294  }
295 }
296 
297 void yf::Multi::FrontendSet::round_robin(int start, int number,
298  std::list<PresentJob> &jobs)
299 {
300  std::list<int> pos;
301  std::list<BackendSet>::const_iterator bsit;
302  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
303  {
304  pos.push_back(1);
305  }
306 
307  int p = 1;
308 #if 1
309  // optimization step!
310  int omin = 0;
311  while(true)
312  {
313  int min = 0;
314  int no_left = 0;
315  // find min count for each set which is > omin
316  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
317  {
318  if (bsit->m_count > omin)
319  {
320  if (no_left == 0 || bsit->m_count < min)
321  min = bsit->m_count;
322  no_left++;
323  }
324  }
325  if (no_left == 0) // if nothing greater than omin, bail out.
326  break;
327  int skip = no_left * min;
328  if (p + skip > start) // step gets us "into" present range?
329  {
330  // Yes. skip until start.. Rounding off is deliberate!
331  min = (start-p) / no_left;
332  p += no_left * min;
333 
334  // update positions in each set..
335  std::list<int>::iterator psit = pos.begin();
336  for (psit = pos.begin(); psit != pos.end(); psit++)
337  *psit += min;
338  break;
339  }
340  // skip on each set.. before "present range"..
341  p = p + skip;
342 
343  std::list<int>::iterator psit = pos.begin();
344  for (psit = pos.begin(); psit != pos.end(); psit++)
345  *psit += min;
346 
347  omin = min; // update so we consider next class (with higher count)
348  }
349 #endif
350  int fetched = 0;
351  bool more = true;
352  while (more)
353  {
354  more = false;
355  std::list<int>::iterator psit = pos.begin();
356  bsit = m_backend_sets.begin();
357 
358  for (; bsit != m_backend_sets.end(); psit++,bsit++)
359  {
360  if (fetched >= number)
361  {
362  more = false;
363  break;
364  }
365  if (*psit <= bsit->m_count)
366  {
367  if (p >= start)
368  {
369  PresentJob job(bsit->m_backend, *psit);
370  jobs.push_back(job);
371  fetched++;
372  }
373  (*psit)++;
374  p++;
375  more = true;
376  }
377  }
378  }
379 }
380 
381 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
382 {
383  Z_InitRequest *req = gdu->u.z3950->u.initRequest;
384 
385  std::list<std::string> targets;
386 
387  mp::util::get_vhost_otherinfo(req->otherInfo, targets);
388 
389  if (targets.size() < 1)
390  {
391  package.move();
392  return;
393  }
394 
395  std::list<std::string>::const_iterator t_it = targets.begin();
396  for (; t_it != targets.end(); t_it++)
397  {
398  Session s;
399  Backend *b = new Backend;
400  b->m_vhost = *t_it;
401 
402  std::list<Multi::Map>::const_iterator it =
403  m_p->m_route_patterns.begin();
404  while (it != m_p->m_route_patterns.end()) {
405  if (it->match(*t_it, &b->m_route, &b->m_auth))
406  break;
407  it++;
408  }
409  // b->m_route = m_p->m_target_route[*t_it];
410  // b->m_route unset
411  b->m_package = PackagePtr(new Package(s, package.origin()));
412 
413  m_backend_list.push_back(BackendPtr(b));
414  }
415  m_is_multi = true;
416 
417  // create init request
418  std::list<BackendPtr>::iterator bit;
419  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
420  {
421  mp::odr odr;
422  BackendPtr b = *bit;
423  Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
424 
425  std::list<std::string>vhost_one;
426  vhost_one.push_back(b->m_vhost);
427  mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
428  odr, vhost_one);
429 
430 
431  Z_InitRequest *breq = init_apdu->u.initRequest;
432 
433  if (b->m_auth.length())
434  {
435  breq->idAuthentication =
436  (Z_IdAuthentication *)
437  odr_malloc(odr, sizeof(*breq->idAuthentication));
438  breq->idAuthentication->which = Z_IdAuthentication_open;
439  breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
440  }
441  else
442  breq->idAuthentication = req->idAuthentication;
443 
444  *breq->preferredMessageSize = *req->preferredMessageSize;
445  *breq->maximumRecordSize = *req->maximumRecordSize;
446 
447  ODR_MASK_SET(breq->options, Z_Options_search);
448  ODR_MASK_SET(breq->options, Z_Options_present);
449  ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
450  ODR_MASK_SET(breq->options, Z_Options_scan);
451 
452  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
453  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
454  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
455 
456  b->m_package->request() = init_apdu;
457 
458  b->m_package->copy_filter(package);
459  }
460  multi_move(m_backend_list);
461 
462  // create the frontend init response based on each backend init response
463  mp::odr odr;
464 
465  Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
466  Z_InitResponse *f_resp = f_apdu->u.initResponse;
467 
468  ODR_MASK_SET(f_resp->options, Z_Options_search);
469  ODR_MASK_SET(f_resp->options, Z_Options_present);
470  ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
471  ODR_MASK_SET(f_resp->options, Z_Options_scan);
472 
473  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
474  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
475  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
476 
477  int no_failed = 0;
478  int no_succeeded = 0;
479 
480  Odr_int preferredMessageSize = *req->preferredMessageSize;
481  Odr_int maximumRecordSize = *req->maximumRecordSize;
482  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
483  {
484  PackagePtr p = (*bit)->m_package;
485 
486  if (p->session().is_closed())
487  {
488  // failed. Remove from list and increment number of failed
489  no_failed++;
490  bit = m_backend_list.erase(bit);
491  continue;
492  }
493  Z_GDU *gdu = p->response().get();
494  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
495  Z_APDU_initResponse)
496  {
497  int i;
498  Z_APDU *b_apdu = gdu->u.z3950;
499  Z_InitResponse *b_resp = b_apdu->u.initResponse;
500 
501  // common options for all backends
502  for (i = 0; i <= Z_Options_stringSchema; i++)
503  {
504  if (!ODR_MASK_GET(b_resp->options, i))
505  ODR_MASK_CLEAR(f_resp->options, i);
506  }
507  // common protocol version
508  for (i = 0; i <= Z_ProtocolVersion_3; i++)
509  if (!ODR_MASK_GET(b_resp->protocolVersion, i))
510  ODR_MASK_CLEAR(f_resp->protocolVersion, i);
511  if (*b_resp->result)
512  {
513  no_succeeded++;
514  if (preferredMessageSize > *b_resp->preferredMessageSize)
515  preferredMessageSize = *b_resp->preferredMessageSize;
516  if (maximumRecordSize > *b_resp->maximumRecordSize)
517  maximumRecordSize = *b_resp->maximumRecordSize;
518  }
519  else
520  {
521  if (!f_resp->userInformationField
522  && b_resp->userInformationField)
523  f_resp->userInformationField = b_resp->userInformationField;
524  no_failed++;
525  }
526  }
527  else
528  no_failed++;
529  bit++;
530  }
531  *f_resp->preferredMessageSize = preferredMessageSize;
532  *f_resp->maximumRecordSize = maximumRecordSize;
533 
534  if (m_p->m_hide_unavailable)
535  {
536  if (no_succeeded == 0)
537  {
538  *f_resp->result = 0;
539  package.session().close();
540  }
541  }
542  else
543  {
544  if (no_failed)
545  {
546  *f_resp->result = 0;
547  package.session().close();
548  }
549  }
550  package.response() = f_apdu;
551 }
552 
553 void yf::Multi::Frontend::record_diagnostics(Z_Records *records,
554  Z_DiagRecs * &z_diag,
555  ODR odr,
556  int &no_successful)
557 {
558  // see we get any errors (AKA diagnstics)
559  if (records)
560  {
561  if (records->which == Z_Records_NSD)
562  {
563  if (!z_diag)
564  {
565  z_diag = (Z_DiagRecs *)
566  odr_malloc(odr, sizeof(*z_diag));
567  z_diag->num_diagRecs = 0;
568  z_diag->diagRecs = (Z_DiagRec**)
569  odr_malloc(odr, sizeof(*z_diag->diagRecs));
570  }
571  else
572  {
573  Z_DiagRec **n = (Z_DiagRec **)
574  odr_malloc(odr,
575  (1 + z_diag->num_diagRecs) * sizeof(*n));
576  memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
577  * sizeof(*n));
578  z_diag->diagRecs = n;
579  }
580  Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
581  nr->which = Z_DiagRec_defaultFormat;
582  nr->u.defaultFormat =
583  records->u.nonSurrogateDiagnostic;
584  z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
585  }
586  else if (records->which == Z_Records_multipleNSD)
587  {
588  Z_DiagRecs * dr =records->u.multipleNonSurDiagnostics;
589 
590  if (!z_diag)
591  {
592  z_diag = (Z_DiagRecs *) odr_malloc(odr, sizeof(*z_diag));
593  z_diag->num_diagRecs = 0;
594  z_diag->diagRecs = 0;
595  }
596  Z_DiagRec **n = (Z_DiagRec **)
597  odr_malloc(odr,
598  (dr->num_diagRecs + z_diag->num_diagRecs) *
599  sizeof(*n));
600  if (z_diag->num_diagRecs)
601  memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs * sizeof(*n));
602  memcpy(n + z_diag->num_diagRecs,
603  dr->diagRecs, dr->num_diagRecs * sizeof(*n));
604  z_diag->diagRecs = n;
605  z_diag->num_diagRecs += dr->num_diagRecs;
606  }
607  else
608  no_successful++; // probably piggyback
609  }
610  else
611  no_successful++; // no records and no diagnostics
612 }
613 
614 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
615 {
616  // create search request
617  Z_SearchRequest *req = apdu_req->u.searchRequest;
618 
619  std::list<BackendPtr>::const_iterator bit;
620  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
621  {
622  PackagePtr p = (*bit)->m_package;
623  yazpp_1::GDU gdu1(apdu_req);
624  mp::odr odr;
625  Z_SearchRequest *req1 = gdu1.get()->u.z3950->u.searchRequest;
626 
627  // they are altered now - to disable piggyback
628  *req1->smallSetUpperBound = 0;
629  *req1->largeSetLowerBound = 1;
630  *req1->mediumSetPresentNumber = 0;
631 
632  if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
633  &req1->num_databaseNames,
634  &req1->databaseNames))
635  {
636  req1->num_databaseNames = req->num_databaseNames;
637  req1->databaseNames = req->databaseNames;
638  }
639  p->request() = gdu1;
640  p->copy_filter(package);
641  }
642  multi_move(m_backend_list);
643 
644  // look at each response
645  FrontendSet resultSet(std::string(req->resultSetName));
646 
647  mp::odr odr;
648  Odr_int result_set_size = 0;
649  Z_DiagRecs *z_diag = 0;
650  int no_successful = 0;
651  PackagePtr close_p;
652 
653  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
654  {
655  PackagePtr p = (*bit)->m_package;
656 
657  // save closing package for at least one target
658  if (p->session().is_closed())
659  close_p = p;
660 
661  Z_GDU *gdu = p->response().get();
662  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
663  Z_APDU_searchResponse)
664  {
665  Z_APDU *b_apdu = gdu->u.z3950;
666  Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
667 
668  record_diagnostics(b_resp->records, z_diag, odr, no_successful);
669 
670  BackendSet backendSet;
671  backendSet.m_backend = *bit;
672  backendSet.m_count = *b_resp->resultCount;
673  result_set_size += *b_resp->resultCount;
674  resultSet.m_backend_sets.push_back(backendSet);
675  }
676  }
677 
678  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
679  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
680 
681  yaz_log(YLOG_DEBUG, "no_successful=%d is_closed=%s hide_errors=%s",
682  no_successful,
683  close_p ? "true" : "false",
684  m_p->m_hide_errors ? "true" : "false");
685  *f_resp->resultCount = result_set_size;
686  if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
687  {
688  package.session().close();
689  package.response() = close_p->response();
690  return;
691  }
692  if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
693  {
694  f_resp->records = (Z_Records *)
695  odr_malloc(odr, sizeof(*f_resp->records));
696  if (z_diag->num_diagRecs > 1)
697  {
698  f_resp->records->which = Z_Records_multipleNSD;
699  f_resp->records->u.multipleNonSurDiagnostics = z_diag;
700  }
701  else
702  {
703  f_resp->records->which = Z_Records_NSD;
704  f_resp->records->u.nonSurrogateDiagnostic =
705  z_diag->diagRecs[0]->u.defaultFormat;
706  }
707  }
708  // assume OK
709  m_sets[resultSet.m_setname] = resultSet;
710 
711  Odr_int number;
712  mp::util::piggyback(*req->smallSetUpperBound,
713  *req->largeSetLowerBound,
714  *req->mediumSetPresentNumber,
715  0, 0,
716  result_set_size,
717  number, 0);
718  Package pp(package.session(), package.origin());
719  if (z_diag == 0 && number > 0)
720  {
721  pp.copy_filter(package);
722  Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
723  Z_PresentRequest *p_req = p_apdu->u.presentRequest;
724  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
725  p_req->resultSetId = req->resultSetName;
726  *p_req->resultSetStartPoint = 1;
727  *p_req->numberOfRecordsRequested = number;
728  pp.request() = p_apdu;
729  present(pp, p_apdu);
730 
731  if (pp.session().is_closed())
732  package.session().close();
733 
734  Z_GDU *gdu = pp.response().get();
735  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
736  Z_APDU_presentResponse)
737  {
738  Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
739  f_resp->records = p_res->records;
740  *f_resp->numberOfRecordsReturned =
741  *p_res->numberOfRecordsReturned;
742  *f_resp->nextResultSetPosition =
743  *p_res->nextResultSetPosition;
744  }
745  else
746  {
747  package.response() = pp.response();
748  return;
749  }
750  }
751  package.response() = f_apdu; // in this scope because of p
752 }
753 
754 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
755 {
756  // create present request
757  Z_PresentRequest *req = apdu_req->u.presentRequest;
758 
759  Sets_it it;
760  it = m_sets.find(std::string(req->resultSetId));
761  if (it == m_sets.end())
762  {
763  mp::odr odr;
764  Z_APDU *apdu =
765  odr.create_presentResponse(
766  apdu_req,
767  YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
768  req->resultSetId);
769  package.response() = apdu;
770  return;
771  }
772  std::list<Multi::FrontendSet::PresentJob> jobs;
773  int start = *req->resultSetStartPoint;
774  int number = *req->numberOfRecordsRequested;
775 
776  if (m_p->m_merge_type == round_robin)
777  it->second.round_robin(start, number, jobs);
778  else if (m_p->m_merge_type == serve_order)
779  it->second.serve_order(start, number, jobs);
780 
781  if (0)
782  {
783  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
784  for (jit = jobs.begin(); jit != jobs.end(); jit++)
785  {
786  yaz_log(YLOG_DEBUG, "job pos=%d", jit->m_pos);
787  }
788  }
789 
790  std::list<BackendPtr> present_backend_list;
791 
792  std::list<BackendSet>::const_iterator bsit;
793  bsit = it->second.m_backend_sets.begin();
794  for (; bsit != it->second.m_backend_sets.end(); bsit++)
795  {
796  int start = -1;
797  int end = -1;
798  {
799  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
800  for (jit = jobs.begin(); jit != jobs.end(); jit++)
801  {
802  if (jit->m_backend == bsit->m_backend)
803  {
804  if (start == -1 || jit->m_pos < start)
805  start = jit->m_pos;
806  if (end == -1 || jit->m_pos > end)
807  end = jit->m_pos;
808  }
809  }
810  }
811  if (start != -1)
812  {
813  std::list<Multi::FrontendSet::PresentJob>::iterator jit;
814  for (jit = jobs.begin(); jit != jobs.end(); jit++)
815  {
816  if (jit->m_backend == bsit->m_backend)
817  {
818  if (jit->m_pos >= start && jit->m_pos <= end)
819  jit->m_start = start;
820  }
821  }
822 
823  PackagePtr p = bsit->m_backend->m_package;
824 
825  *req->resultSetStartPoint = start;
826  *req->numberOfRecordsRequested = end - start + 1;
827 
828  p->request() = apdu_req;
829  p->copy_filter(package);
830 
831  present_backend_list.push_back(bsit->m_backend);
832  }
833  }
834  multi_move(present_backend_list);
835 
836  // look at each response
837  Z_DiagRecs *z_diag = 0;
838  int no_successful = 0;
839  mp::odr odr;
840  PackagePtr close_p;
841 
842  std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
843  for (; pbit != present_backend_list.end(); pbit++)
844  {
845  PackagePtr p = (*pbit)->m_package;
846 
847  // save closing package for at least one target
848  if (p->session().is_closed())
849  close_p = p;
850 
851  Z_GDU *gdu = p->response().get();
852  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
853  Z_APDU_presentResponse)
854  {
855  Z_APDU *b_apdu = gdu->u.z3950;
856  Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
857 
858  record_diagnostics(b_resp->records, z_diag, odr, no_successful);
859  }
860  }
861 
862  Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
863  Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
864 
865  if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
866  {
867  package.session().close();
868  package.response() = close_p->response();
869  return;
870  }
871  if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
872  {
873  f_resp->records = (Z_Records *)
874  odr_malloc(odr, sizeof(*f_resp->records));
875  if (z_diag->num_diagRecs > 1)
876  {
877  f_resp->records->which = Z_Records_multipleNSD;
878  f_resp->records->u.multipleNonSurDiagnostics = z_diag;
879  }
880  else
881  {
882  f_resp->records->which = Z_Records_NSD;
883  f_resp->records->u.nonSurrogateDiagnostic =
884  z_diag->diagRecs[0]->u.defaultFormat;
885  }
886  }
887  else if (number < 0 || (size_t) number > jobs.size())
888  {
889  f_apdu =
890  odr.create_presentResponse(
891  apdu_req,
892  YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
893  0);
894  }
895  else
896  {
897  f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
898  Z_Records * records = f_resp->records;
899  records->which = Z_Records_DBOSD;
900  records->u.databaseOrSurDiagnostics =
901  (Z_NamePlusRecordList *)
902  odr_malloc(odr, sizeof(Z_NamePlusRecordList));
903  Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
904  nprl->num_records = jobs.size();
905  nprl->records = (Z_NamePlusRecord**)
906  odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
907  int i = 0;
908  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
909  for (jit = jobs.begin(); jit != jobs.end(); jit++)
910  {
911  PackagePtr p = jit->m_backend->m_package;
912 
913  Z_GDU *gdu = p->response().get();
914  Z_APDU *b_apdu = gdu->u.z3950;
915  int inside_pos = jit->m_pos - jit->m_start;
916  Z_Records *records = b_apdu->u.presentResponse->records;
917 
918  if (records && records->which == Z_Records_DBOSD
919  && inside_pos <
920  records->u.databaseOrSurDiagnostics->num_records)
921  {
922  nprl->records[i] = (Z_NamePlusRecord*)
923  odr_malloc(odr, sizeof(Z_NamePlusRecord));
924  *nprl->records[i] = *records->
925  u.databaseOrSurDiagnostics->records[inside_pos];
926  nprl->records[i]->databaseName =
927  odr_strdup(odr, jit->m_backend->m_vhost.c_str());
928  i++;
929  }
930  }
931  nprl->num_records = i; // usually same as jobs.size();
932  *f_resp->nextResultSetPosition = start + i;
933  *f_resp->numberOfRecordsReturned = i;
934  }
935  package.response() = f_apdu;
936 }
937 
938 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
939 {
940  return m_norm_term < k.m_norm_term;
941 }
942 
943 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
944 {
945  return m_norm_term == k.m_norm_term;
946 }
947 
948 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
949 {
950  Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
951  e->which = Z_Entry_termInfo;
952  Z_TermInfo *t;
953  t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
954  t->suggestedAttributes = 0;
955  t->displayTerm = 0;
956  t->alternativeTerm = 0;
957  t->byAttributes = 0;
958  t->otherTermInfo = 0;
959  t->globalOccurrences = odr_intdup(odr, m_count);
960  t->term = (Z_Term *)
961  odr_malloc(odr, sizeof(*t->term));
962  t->term->which = Z_Term_general;
963  Odr_oct *o;
964  t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
965 
966  o->len = o->size = m_norm_term.size();
967  o->buf = (unsigned char *) odr_malloc(odr, o->len);
968  memcpy(o->buf, m_norm_term.c_str(), o->len);
969  return e;
970 }
971 
972 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
973 {
974  std::list<BackendPtr>::const_iterator bit;
975  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
976  {
977  PackagePtr p = (*bit)->m_package;
978  mp::odr odr;
979 
980  p->request() = apdu_req;
981  p->copy_filter(package);
982  }
983  multi_move(m_backend_list);
984  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
985  {
986  PackagePtr p = (*bit)->m_package;
987 
988  if (p->session().is_closed()) // if any backend closes, close frontend
989  package.session().close();
990 
991  package.response() = p->response();
992  }
993 }
994 
995 
996 void yf::Multi::Frontend::scan(mp::Package &package, Z_APDU *apdu_req)
997 {
998  Z_ScanRequest *req = apdu_req->u.scanRequest;
999 
1000  std::list<BackendPtr>::const_iterator bit;
1001  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1002  {
1003  PackagePtr p = (*bit)->m_package;
1004  yazpp_1::GDU gdu1(apdu_req);
1005  mp::odr odr;
1006  Z_ScanRequest *req1 = gdu1.get()->u.z3950->u.scanRequest;
1007 
1008  if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1009  &req1->num_databaseNames,
1010  &req1->databaseNames))
1011  {
1012  req1->num_databaseNames = req->num_databaseNames;
1013  req1->databaseNames = req->databaseNames;
1014  }
1015  p->request() = gdu1;
1016  p->copy_filter(package);
1017  }
1018  multi_move(m_backend_list);
1019 
1020  ScanTermInfoList entries_before;
1021  ScanTermInfoList entries_after;
1022  int no_before = 0;
1023  int no_after = 0;
1024 
1025  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1026  {
1027  PackagePtr p = (*bit)->m_package;
1028 
1029  if (p->session().is_closed()) // if any backend closes, close frontend
1030  package.session().close();
1031 
1032  Z_GDU *gdu = p->response().get();
1033  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1034  Z_APDU_scanResponse)
1035  {
1036  Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1037 
1038  if (res->entries && res->entries->nonsurrogateDiagnostics)
1039  {
1040  // failure
1041  mp::odr odr;
1042  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1043  Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1044 
1045  f_res->entries->nonsurrogateDiagnostics =
1046  res->entries->nonsurrogateDiagnostics;
1047  f_res->entries->num_nonsurrogateDiagnostics =
1048  res->entries->num_nonsurrogateDiagnostics;
1049 
1050  package.response() = f_apdu;
1051  return;
1052  }
1053 
1054  if (res->entries && res->entries->entries)
1055  {
1056  Z_Entry **entries = res->entries->entries;
1057  int num_entries = res->entries->num_entries;
1058  int position = 1;
1059  if (req->preferredPositionInResponse)
1060  position = *req->preferredPositionInResponse;
1061  if (res->positionOfTerm)
1062  position = *res->positionOfTerm;
1063 
1064  // before
1065  int i;
1066  for (i = 0; i<position-1 && i<num_entries; i++)
1067  {
1068  Z_Entry *ent = entries[i];
1069 
1070  if (ent->which == Z_Entry_termInfo)
1071  {
1072  ScanTermInfo my;
1073 
1074  Odr_int *occur = ent->u.termInfo->globalOccurrences;
1075  my.m_count = occur ? *occur : 0;
1076 
1077  if (ent->u.termInfo->term->which == Z_Term_general)
1078  {
1079  my.m_norm_term = std::string(
1080  (const char *)
1081  ent->u.termInfo->term->u.general->buf,
1082  ent->u.termInfo->term->u.general->len);
1083  }
1084  if (my.m_norm_term.length())
1085  {
1086  ScanTermInfoList::iterator it =
1087  entries_before.begin();
1088  while (it != entries_before.end() && my <*it)
1089  it++;
1090  if (it != entries_before.end() && my == *it)
1091  {
1092  it->m_count += my.m_count;
1093  }
1094  else
1095  {
1096  entries_before.insert(it, my);
1097  no_before++;
1098  }
1099  }
1100  }
1101  }
1102  // after
1103  if (position <= 0)
1104  i = 0;
1105  else
1106  i = position-1;
1107  for ( ; i<num_entries; i++)
1108  {
1109  Z_Entry *ent = entries[i];
1110 
1111  if (ent->which == Z_Entry_termInfo)
1112  {
1113  ScanTermInfo my;
1114 
1115  Odr_int *occur = ent->u.termInfo->globalOccurrences;
1116  my.m_count = occur ? *occur : 0;
1117 
1118  if (ent->u.termInfo->term->which == Z_Term_general)
1119  {
1120  my.m_norm_term = std::string(
1121  (const char *)
1122  ent->u.termInfo->term->u.general->buf,
1123  ent->u.termInfo->term->u.general->len);
1124  }
1125  if (my.m_norm_term.length())
1126  {
1127  ScanTermInfoList::iterator it =
1128  entries_after.begin();
1129  while (it != entries_after.end() && *it < my)
1130  it++;
1131  if (it != entries_after.end() && my == *it)
1132  {
1133  it->m_count += my.m_count;
1134  }
1135  else
1136  {
1137  entries_after.insert(it, my);
1138  no_after++;
1139  }
1140  }
1141  }
1142  }
1143 
1144  }
1145  }
1146  else
1147  {
1148  // if any target does not return scan response - return that
1149  package.response() = p->response();
1150  return;
1151  }
1152  }
1153 
1154  if (false)
1155  {
1156  std::cout << "BEFORE\n";
1157  ScanTermInfoList::iterator it = entries_before.begin();
1158  for(; it != entries_before.end(); it++)
1159  {
1160  std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1161  }
1162 
1163  std::cout << "AFTER\n";
1164  it = entries_after.begin();
1165  for(; it != entries_after.end(); it++)
1166  {
1167  std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1168  }
1169  }
1170 
1171  if (false)
1172  {
1173  mp::odr odr;
1174  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1175  package.response() = f_apdu;
1176  }
1177  else
1178  {
1179  mp::odr odr;
1180  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1181  Z_ScanResponse *resp = f_apdu->u.scanResponse;
1182 
1183  int number_returned = *req->numberOfTermsRequested;
1184  int position_returned = *req->preferredPositionInResponse;
1185 
1186  resp->entries->num_entries = number_returned;
1187  resp->entries->entries = (Z_Entry**)
1188  odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1189  int i;
1190 
1191  int lbefore = entries_before.size();
1192  if (lbefore < position_returned-1)
1193  position_returned = lbefore+1;
1194 
1195  ScanTermInfoList::iterator it = entries_before.begin();
1196  for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1197  {
1198  resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1199  }
1200 
1201  it = entries_after.begin();
1202 
1203  if (position_returned <= 0)
1204  i = 0;
1205  else
1206  i = position_returned-1;
1207  for (; i<number_returned && it != entries_after.end(); i++, it++)
1208  {
1209  resp->entries->entries[i] = it->get_entry(odr);
1210  }
1211 
1212  number_returned = i;
1213 
1214  resp->positionOfTerm = odr_intdup(odr, position_returned);
1215  resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1216  resp->entries->num_entries = number_returned;
1217 
1218  package.response() = f_apdu;
1219  }
1220 }
1221 
1222 
1223 void yf::Multi::process(mp::Package &package) const
1224 {
1225  FrontendPtr f = m_p->get_frontend(package);
1226 
1227  Z_GDU *gdu = package.request().get();
1228 
1229  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1230  Z_APDU_initRequest && !f->m_is_multi)
1231  {
1232  f->init(package, gdu);
1233  }
1234  else if (!f->m_is_multi)
1235  package.move();
1236  else if (gdu && gdu->which == Z_GDU_Z3950)
1237  {
1238  Z_APDU *apdu = gdu->u.z3950;
1239  if (apdu->which == Z_APDU_initRequest)
1240  {
1241  mp::odr odr;
1242 
1243  package.response() = odr.create_close(
1244  apdu,
1245  Z_Close_protocolError,
1246  "double init");
1247 
1248  package.session().close();
1249  }
1250  else if (apdu->which == Z_APDU_searchRequest)
1251  {
1252  f->search(package, apdu);
1253  }
1254  else if (apdu->which == Z_APDU_presentRequest)
1255  {
1256  f->present(package, apdu);
1257  }
1258  else if (apdu->which == Z_APDU_scanRequest)
1259  {
1260  f->scan(package, apdu);
1261  }
1262  else if (apdu->which == Z_APDU_close)
1263  {
1264  f->relay_apdu(package, apdu);
1265  }
1266  else
1267  {
1268  mp::odr odr;
1269 
1270  package.response() = odr.create_close(
1271  apdu, Z_Close_protocolError,
1272  "unsupported APDU in filter multi");
1273 
1274  package.session().close();
1275  }
1276  }
1277  m_p->release_frontend(package);
1278 }
1279 
1280 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1281  const char *path)
1282 {
1283  for (ptr = ptr->children; ptr; ptr = ptr->next)
1284  {
1285  if (ptr->type != XML_ELEMENT_NODE)
1286  continue;
1287  if (!strcmp((const char *) ptr->name, "target"))
1288  {
1289  std::string auth;
1290  std::string route = mp::xml::get_route(ptr, auth);
1291  std::string target = mp::xml::get_text(ptr);
1292  if (target.length() == 0)
1293  target = route;
1294  m_p->m_route_patterns.push_back(Multi::Map(target, route, auth));
1295  }
1296  else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1297  {
1298  m_p->m_hide_unavailable = true;
1299  }
1300  else if (!strcmp((const char *) ptr->name, "hideerrors"))
1301  {
1302  m_p->m_hide_errors = true;
1303  }
1304  else if (!strcmp((const char *) ptr->name, "mergetype"))
1305  {
1306  std::string mergetype = mp::xml::get_text(ptr);
1307  if (mergetype == "roundrobin")
1308  m_p->m_merge_type = round_robin;
1309  else if (mergetype == "serveorder")
1310  m_p->m_merge_type = serve_order;
1311  else
1312  throw mp::filter::FilterException
1313  ("Bad mergetype " + mergetype + " in multi filter");
1314 
1315  }
1316  else
1317  {
1318  throw mp::filter::FilterException
1319  ("Bad element "
1320  + std::string((const char *) ptr->name)
1321  + " in multi filter");
1322  }
1323  }
1324 }
1325 
1326 static mp::filter::Base* filter_creator()
1327 {
1328  return new mp::filter::Multi;
1329 }
1330 
1331 extern "C" {
1332  struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1333  0,
1334  "multi",
1336  };
1337 }
1338 
1339 
1340 /*
1341  * Local variables:
1342  * c-basic-offset: 4
1343  * c-file-style: "Stroustrup"
1344  * indent-tabs-mode: nil
1345  * End:
1346  * vim: shiftwidth=4 tabstop=8 expandtab
1347  */
1348