metaproxy  1.13.0
filter_frontend_net.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #if HAVE_GETRLIMIT
22 #include <sys/resource.h>
23 #endif
24 #include <sstream>
25 #include <iomanip>
26 #include <metaproxy/util.hpp>
27 #include "pipe.hpp"
28 #include <metaproxy/filter.hpp>
29 #include <metaproxy/package.hpp>
30 #include "thread_pool_observer.hpp"
31 #include "filter_frontend_net.hpp"
32 #include <yazpp/z-assoc.h>
33 #include <yazpp/pdu-assoc.h>
34 #include <yazpp/socket-manager.h>
35 #include <yazpp/limit-connect.h>
36 #include <yaz/timing.h>
37 #include <yaz/log.h>
38 #include <yaz/daemon.h>
39 #include <yaz/malloc_info.h>
40 #include "gduutil.hpp"
41 #include <signal.h>
42 
43 #include <iostream>
44 
45 namespace mp = metaproxy_1;
46 namespace yf = metaproxy_1::filter;
47 
48 namespace metaproxy_1 {
49  namespace filter {
51  friend class Rep;
52  friend class FrontendNet;
53  std::string port;
54  std::string route;
55  std::string cert_fname;
57  };
59  friend class Rep;
60  friend class FrontendNet;
61  std::string pattern;
62  int verbose;
63  int value;
64  };
66  friend class FrontendNet;
67 
71  std::vector<Port> m_ports;
74  std::list<IP_Pattern> connect_max;
75  std::list<IP_Pattern> http_req_max;
76  std::string m_msg_config;
77  std::string m_stat_req;
78  yazpp_1::SocketManager mySocketManager;
80  yazpp_1::PDU_Assoc **pdu;
81  int m_duration_freq[22];
82  double m_duration_lim[22];
87  public:
88  Rep();
89  ~Rep();
90  };
91  class FrontendNet::My_Timer_Thread : public yazpp_1::ISocketObserver {
92  private:
93  yazpp_1::ISocketObservable *m_obs;
95  bool m_timeout;
96  public:
97  My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
98  void socketNotify(int event);
99  bool timeout();
100  };
101  class FrontendNet::ZAssocChild : public yazpp_1::Z_Assoc {
102  public:
103  ~ZAssocChild();
104  ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
105  mp::ThreadPoolSocketObserver *m_thread_pool_observer,
106  const mp::Package *package,
107  Port *port,
108  Rep *rep,
109  yazpp_1::LimitConnect &limit,
110  const char *peername);
113  private:
114  yazpp_1::IPDU_Observer* sessionNotify(
115  yazpp_1::IPDU_Observable *the_PDU_Observable,
116  int fd);
117  void recv_GDU(Z_GDU *apdu, int len);
118  void report(Z_HTTP_Request *hreq);
119  void failNotify();
120  void timeoutNotify();
121  void connectNotify();
122  private:
124  mp::Session m_session;
125  mp::Origin m_origin;
127  const mp::Package *m_package;
129  yazpp_1::LimitConnect &m_limit_http_req;
130  };
132  public:
133  ThreadPoolPackage(mp::Package *package,
134  yf::FrontendNet::ZAssocChild *ses,
135  Rep *rep);
137  IThreadPoolMsg *handle();
138  void result(const char *t_info);
139  bool cleanup(void *info);
140  private:
141  yaz_timing_t timer;
143  mp::Package *m_package;
145  };
146  class FrontendNet::ZAssocServer : public yazpp_1::Z_Assoc {
147  public:
148  ~ZAssocServer();
149  ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
151  Rep *rep);
152  void set_package(const mp::Package *package);
153  void set_thread_pool(ThreadPoolSocketObserver *observer);
154  private:
155  yazpp_1::IPDU_Observer* sessionNotify(
156  yazpp_1::IPDU_Observable *the_PDU_Observable,
157  int fd);
158  void recv_GDU(Z_GDU *apdu, int len);
159 
160  void failNotify();
161  void timeoutNotify();
162  void connectNotify();
163  private:
165  const mp::Package *m_package;
166  yazpp_1::LimitConnect limit_connect;
167  yazpp_1::LimitConnect limit_http_req;
170  };
171  }
172 }
173 
174 yf::FrontendNet::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
175  ZAssocChild *ses,
176  Rep *rep) :
177  m_assoc_child(ses), m_package(package), m_p(rep)
178 {
179  timer = yaz_timing_create();
180 }
181 
182 yf::FrontendNet::ThreadPoolPackage::~ThreadPoolPackage()
183 {
184  yaz_timing_destroy(&timer); // timer may be NULL
185  delete m_package;
186 }
187 
188 bool yf::FrontendNet::ThreadPoolPackage::cleanup(void *info)
189 {
190  mp::Session *ses = (mp::Session *) info;
191  if (*ses == m_package->session())
192  {
193  m_assoc_child->m_no_requests--;
194  return true;
195  }
196  return false;
197 }
198 
199 void yf::FrontendNet::ThreadPoolPackage::result(const char *t_info)
200 {
201  m_assoc_child->m_no_requests--;
202 
203  yazpp_1::GDU *gdu = &m_package->response();
204 
205  if (gdu->get())
206  {
207  int len;
208  m_assoc_child->send_GDU(gdu->get(), &len);
209 
210  yaz_timing_stop(timer);
211  double duration = yaz_timing_get_real(timer);
212 
213  size_t ent = 0;
214  while (m_p->m_duration_lim[ent] != 0.0 && duration > m_p->m_duration_lim[ent])
215  ent++;
216  m_p->m_duration_freq[ent]++;
217 
218  m_p->m_duration_total += duration;
219 
220  if (m_p->m_duration_max < duration)
221  m_p->m_duration_max = duration;
222 
223  if (m_p->m_duration_min == 0.0 || m_p->m_duration_min > duration)
224  m_p->m_duration_min = duration;
225 
226  if (m_p->m_msg_config.length())
227  {
228  Z_GDU *z_gdu = gdu->get();
229 
230  std::ostringstream os;
231  os << m_p->m_msg_config << " "
232  << *m_package << " "
233  << std::fixed << std::setprecision (6) << duration << " ";
234 
235  if (z_gdu)
236  os << *z_gdu;
237  else
238  os << "-";
239 
240  yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
241  }
242  }
243  else if (!m_package->session().is_closed())
244  {
245  // no response package and yet the session is still open..
246  // means that request is unhandled..
247  yazpp_1::GDU *gdu_req = &m_package->request();
248  Z_GDU *z_gdu = gdu_req->get();
249  if (z_gdu && z_gdu->which == Z_GDU_Z3950)
250  {
251  // For Z39.50, response with a Close and shutdown
252  mp::odr odr;
253  int len;
254  Z_APDU *apdu_response = odr.create_close(
255  z_gdu->u.z3950, Z_Close_systemProblem,
256  "unhandled Z39.50 request");
257 
258  m_assoc_child->send_Z_PDU(apdu_response, &len);
259  }
260  else if (z_gdu && z_gdu->which == Z_GDU_HTTP_Request)
261  {
262  // For HTTP, respond with Server Error
263  int len;
264  mp::odr odr;
265  Z_GDU *zgdu_res
266  = odr.create_HTTP_Response(m_package->session(),
267  z_gdu->u.HTTP_Request, 500);
268  m_assoc_child->send_GDU(zgdu_res, &len);
269  }
270  m_package->session().close();
271  }
272 
273  if (m_assoc_child->m_no_requests == 0 && m_package->session().is_closed())
274  {
275  m_assoc_child->close();
276  }
277 
278 
279  delete this;
280 }
281 
282 mp::IThreadPoolMsg *yf::FrontendNet::ThreadPoolPackage::handle()
283 {
284  m_package->move(m_assoc_child->m_port->route);
285  return this;
286 }
287 
288 yf::FrontendNet::ZAssocChild::ZAssocChild(
289  yazpp_1::IPDU_Observable *PDU_Observable,
290  mp::ThreadPoolSocketObserver *my_thread_pool,
291  const mp::Package *package,
292  Port *port, Rep *rep,
293  yazpp_1::LimitConnect &limit_http_req,
294  const char *peername)
295  : Z_Assoc(PDU_Observable), m_p(rep), m_limit_http_req(limit_http_req)
296 {
297  m_thread_pool_observer = my_thread_pool;
298  m_no_requests = 0;
299  m_delete_flag = false;
300  m_package = package;
301  m_port = port;
302  std::string addr;
303  addr.append(peername);
304  addr.append(" ");
305  addr.append(port->port);
306  m_origin.set_tcpip_address(addr, m_session.id());
307  timeout(m_p->m_session_timeout);
308 }
309 
310 yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocChild::sessionNotify(
311  yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
312 {
313  return 0;
314 }
315 
316 yf::FrontendNet::ZAssocChild::~ZAssocChild()
317 {
318 }
319 
320 void yf::FrontendNet::ZAssocChild::report(Z_HTTP_Request *hreq)
321 {
322  mp::odr o;
323 
324  Z_GDU *gdu_res = o.create_HTTP_Response(m_session, hreq, 200);
325 
326  Z_HTTP_Response *hres = gdu_res->u.HTTP_Response;
327 
328  mp::wrbuf w;
329  size_t i;
330  int number_total = 0;
331 
332  for (i = 0; m_p->m_duration_lim[i] != 0.0; i++)
333  number_total += m_p->m_duration_freq[i];
334  number_total += m_p->m_duration_freq[i];
335 
336  wrbuf_puts(w, "<?xml version=\"1.0\"?>\n");
337  wrbuf_puts(w, "<frontend_net>\n");
338  wrbuf_printf(w, " <responses frequency=\"%d\">\n", number_total);
339  for (i = 0; m_p->m_duration_lim[i] != 0.0; i++)
340  {
341  if (m_p->m_duration_freq[i] > 0)
342  wrbuf_printf(
343  w, " <response duration_start=\"%f\" "
344  "duration_end=\"%f\" frequency=\"%d\"/>\n",
345  i > 0 ? m_p->m_duration_lim[i - 1] : 0.0,
347  }
348 
349  if (m_p->m_duration_freq[i] > 0)
350  wrbuf_printf(
351  w, " <response duration_start=\"%f\" frequency=\"%d\"/>\n",
352  m_p->m_duration_lim[i - 1], m_p->m_duration_freq[i]);
353 
354  if (m_p->m_duration_max != 0.0)
355  wrbuf_printf(
356  w, " <response duration_max=\"%f\"/>\n",
358  if (m_p->m_duration_min != 0.0)
359  wrbuf_printf(
360  w, " <response duration_min=\"%f\"/>\n",
362  if (m_p->m_duration_total != 0.0)
363  wrbuf_printf(
364  w, " <response duration_average=\"%f\"/>\n",
365  m_p->m_duration_total / number_total);
366 
367  wrbuf_puts(w, " </responses>\n");
368 
369  int thread_busy;
370  int thread_total;
371  m_thread_pool_observer->get_thread_info(thread_busy, thread_total);
372 
373  wrbuf_printf(w, " <thread_info busy=\"%d\" total=\"%d\"/>\n",
374  thread_busy, thread_total);
375 
376  wrbuf_malloc_info(w);
377 
378  {
379  char buf[200];
380  if (nmem_get_status(buf, sizeof(buf) - 1) == 0)
381  wrbuf_puts(w, buf);
382  }
383  wrbuf_puts(w, "</frontend_net>\n");
384 
385  hres->content_len = w.len();
386  hres->content_buf = (char *) w.buf();
387 
388  int len;
389  send_GDU(gdu_res, &len);
390 }
391 
392 void yf::FrontendNet::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
393 {
394  m_no_requests++;
395 
396  mp::Package *p = new mp::Package(m_session, m_origin);
397 
398  if (z_pdu && z_pdu->which == Z_GDU_HTTP_Request)
399  {
400  Z_HTTP_Request *hreq = z_pdu->u.HTTP_Request;
401 
402  const char *f = z_HTTP_header_lookup(hreq->headers, "X-Forwarded-For");
403  if (f)
404  p->origin().set_tcpip_address(std::string(f), m_session.id());
405 
406  if (m_p->m_stat_req.length()
407  && !strcmp(hreq->path, m_p->m_stat_req.c_str()))
408  {
409  report(hreq);
410  delete p;
411  delete this;
412  return;
413  }
414  }
415 
416  p->copy_route(*m_package);
417  p->request() = yazpp_1::GDU(z_pdu);
418 
419  if (m_p->m_msg_config.length())
420  {
421  if (z_pdu)
422  {
423  std::ostringstream os;
424  os << m_p->m_msg_config << " "
425  << *p << " "
426  << "0.000000" << " "
427  << *z_pdu;
428  yaz_log(YLOG_LOG, "%s", os.str().c_str());
429  }
430  }
431  if (z_pdu && z_pdu->which == Z_GDU_HTTP_Request)
432  {
433  Z_HTTP_Request *hreq = z_pdu->u.HTTP_Request;
434  std::string peername = p->origin().get_address();
435 
436  m_limit_http_req.cleanup(false);
437  int con_sz = m_limit_http_req.get_total(peername.c_str());
438  std::list<IP_Pattern>::const_iterator it = m_p->http_req_max.begin();
439  for (; it != m_p->http_req_max.end(); it++)
440  {
441  if (mp::util::match_ip(it->pattern, peername))
442  {
443  if (it->verbose > 1 ||
444  (it->value && con_sz >= it->value && it->verbose > 0))
445  yaz_log(YLOG_LOG, "http-req-max pattern=%s ip=%s con_sz=%d value=%d", it->pattern.c_str(), peername.c_str(), con_sz, it->value);
446  if (it->value == 0 || con_sz < it->value)
447  break;
448  mp::odr o;
449  Z_GDU *gdu_res = o.create_HTTP_Response(m_session, hreq, 500);
450  int len;
451  send_GDU(gdu_res, &len);
452  delete p;
453  delete this;
454  return;
455  }
456  }
457  m_limit_http_req.add_connect(peername.c_str());
458  }
459  ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
461 }
462 
463 void yf::FrontendNet::ZAssocChild::failNotify()
464 {
465  // TODO: send Package to signal "close"
466  if (m_session.is_closed())
467  {
468  if (m_no_requests == 0)
469  delete this;
470  return;
471  }
472  m_no_requests++;
473 
474  m_session.close();
475 
476  mp::Package *p = new mp::Package(m_session, m_origin);
477 
478  ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
479  p->copy_route(*m_package);
480  m_thread_pool_observer->cleanup(tp, &m_session);
482 }
483 
484 void yf::FrontendNet::ZAssocChild::timeoutNotify()
485 {
486  failNotify();
487 }
488 
489 void yf::FrontendNet::ZAssocChild::connectNotify()
490 {
491 
492 }
493 
494 yf::FrontendNet::ZAssocServer::ZAssocServer(
495  yazpp_1::IPDU_Observable *PDU_Observable,
496  Port *port,
497  Rep *rep)
498  :
499  Z_Assoc(PDU_Observable), m_port(port), m_p(rep)
500 {
501  m_package = 0;
502 }
503 
504 
505 void yf::FrontendNet::ZAssocServer::set_package(const mp::Package *package)
506 {
507  m_package = package;
508 }
509 
510 void yf::FrontendNet::ZAssocServer::set_thread_pool(
512 {
513  m_thread_pool_observer = observer;
514 }
515 
516 yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocServer::sessionNotify(
517  yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
518 {
519 
520  const char *peername = the_PDU_Observable->getpeername();
521  if (!peername)
522  peername = "unknown";
523  else
524  {
525  const char *cp = strchr(peername, ':');
526  if (cp)
527  peername = cp + 1;
528  }
529  if (peername)
530  {
531  limit_connect.cleanup(false);
532  int con_sz = limit_connect.get_total(peername);
533  std::list<IP_Pattern>::const_iterator it = m_p->connect_max.begin();
534  for (; it != m_p->connect_max.end(); it++)
535  {
536  if (mp::util::match_ip(it->pattern, peername))
537  {
538  if (it->verbose > 1 ||
539  (it->value && con_sz >= it->value && it->verbose > 0))
540  yaz_log(YLOG_LOG, "connect-max pattern=%s ip=%s con_sz=%d value=%d", it->pattern.c_str(), peername, con_sz, it->value);
541  if (it->value == 0 || con_sz < it->value)
542  break;
543  return 0;
544  }
545  }
546  limit_connect.add_connect(peername);
547  }
548  ZAssocChild *my = new ZAssocChild(the_PDU_Observable,
551  peername);
552  return my;
553 }
554 
555 yf::FrontendNet::ZAssocServer::~ZAssocServer()
556 {
557 }
558 
559 void yf::FrontendNet::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
560 {
561 }
562 
563 void yf::FrontendNet::ZAssocServer::failNotify()
564 {
565 }
566 
567 void yf::FrontendNet::ZAssocServer::timeoutNotify()
568 {
569 }
570 
571 void yf::FrontendNet::ZAssocServer::connectNotify()
572 {
573 }
574 
575 yf::FrontendNet::FrontendNet() : m_p(new Rep)
576 {
577 }
578 
579 yf::FrontendNet::Rep::Rep()
580 {
581  m_max_threads = m_no_threads = 5;
582  m_stack_size = 0;
583  m_listen_duration = 0;
584  m_session_timeout = 300; // 5 minutes
585  az = 0;
586  size_t i;
587  for (i = 0; i < 22; i++)
588  m_duration_freq[i] = 0;
589  m_duration_lim[0] = 0.000001;
590  m_duration_lim[1] = 0.00001;
591  m_duration_lim[2] = 0.0001;
592  m_duration_lim[3] = 0.001;
593  m_duration_lim[4] = 0.01;
594  m_duration_lim[5] = 0.1;
595  m_duration_lim[6] = 0.2;
596  m_duration_lim[7] = 0.3;
597  m_duration_lim[8] = 0.5;
598  m_duration_lim[9] = 1.0;
599  m_duration_lim[10] = 1.5;
600  m_duration_lim[11] = 2.0;
601  m_duration_lim[12] = 3.0;
602  m_duration_lim[13] = 4.0;
603  m_duration_lim[14] = 5.0;
604  m_duration_lim[15] = 6.0;
605  m_duration_lim[16] = 8.0;
606  m_duration_lim[17] = 10.0;
607  m_duration_lim[18] = 15.0;
608  m_duration_lim[19] = 20.0;
609  m_duration_lim[20] = 30.0;
610  m_duration_lim[21] = 0.0;
611  m_duration_max = 0.0;
612  m_duration_min = 0.0;
613  m_duration_total = 0.0;
614  m_stop_signo = 0;
615 }
616 
617 yf::FrontendNet::Rep::~Rep()
618 {
619  if (az)
620  {
621  size_t i;
622  for (i = 0; i < m_ports.size(); i++)
623  delete az[i];
624  delete [] az;
625  delete [] pdu;
626  }
627  az = 0;
628 }
629 
630 yf::FrontendNet::~FrontendNet()
631 {
632 }
633 
634 void yf::FrontendNet::stop(int signo) const
635 {
636  m_p->m_stop_signo = signo;
637 }
638 
639 void yf::FrontendNet::start() const
640 {
641 #if HAVE_GETRLIMIT
642  struct rlimit limit_data;
643  getrlimit(RLIMIT_NOFILE, &limit_data);
644  yaz_log(YLOG_LOG, "getrlimit NOFILE cur=%ld max=%ld",
645  (long) limit_data.rlim_cur, (long) limit_data.rlim_max);
646 #endif
647 }
648 
649 bool yf::FrontendNet::My_Timer_Thread::timeout()
650 {
651  return m_timeout;
652 }
653 
654 yf::FrontendNet::My_Timer_Thread::My_Timer_Thread(
655  yazpp_1::ISocketObservable *obs,
656  int duration) :
657  m_obs(obs), m_pipe(9123), m_timeout(false)
658 {
659  obs->addObserver(m_pipe.read_fd(), this);
660  obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
661  obs->timeoutObserver(this, duration);
662 }
663 
664 void yf::FrontendNet::My_Timer_Thread::socketNotify(int event)
665 {
666  m_timeout = true;
667  m_obs->deleteObserver(this);
668 }
669 
670 void yf::FrontendNet::process(mp::Package &package) const
671 {
672  if (m_p->az == 0)
673  return;
674  size_t i;
675  My_Timer_Thread *tt = 0;
676 
677  if (m_p->m_listen_duration)
680 
683  m_p->m_stack_size);
684 
685  for (i = 0; i<m_p->m_ports.size(); i++)
686  {
687  m_p->az[i]->set_package(&package);
688  m_p->az[i]->set_thread_pool(&tp);
689  }
690  while (m_p->mySocketManager.processEvent() > 0)
691  {
692  if (m_p->m_stop_signo == SIGTERM)
693  {
694  yaz_log(YLOG_LOG, "metaproxy received SIGTERM");
695  if (m_p->az)
696  {
697  size_t i;
698  for (i = 0; i < m_p->m_ports.size(); i++)
699  {
700  m_p->pdu[i]->shutdown();
701  m_p->az[i]->server("");
702  }
703  yaz_daemon_stop();
704  }
705  break; /* stop right away */
706  }
707 #ifndef WIN32
708  if (m_p->m_stop_signo == SIGUSR1)
709  { /* just stop listeners and cont till all sessions are done*/
710  yaz_log(YLOG_LOG, "metaproxy received SIGUSR1");
711  m_p->m_stop_signo = 0;
712  if (m_p->az)
713  {
714  size_t i;
715  for (i = 0; i < m_p->m_ports.size(); i++)
716  m_p->az[i]->server("");
717  yaz_daemon_stop();
718  }
719  }
720 #endif
721  int no = m_p->mySocketManager.getNumberOfObservers();
722  if (no <= 1)
723  break;
724  if (tt && tt->timeout())
725  break;
726  }
727  delete tt;
728 }
729 
730 void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
731  const char *path)
732 {
733  if (!ptr || !ptr->children)
734  {
735  throw yf::FilterException("No ports for Frontend");
736  }
737  std::vector<Port> ports;
738  for (ptr = ptr->children; ptr; ptr = ptr->next)
739  {
740  if (ptr->type != XML_ELEMENT_NODE)
741  continue;
742  if (!strcmp((const char *) ptr->name, "port"))
743  {
744  Port port;
745 
746  const char *names[5] = {"route", "max_recv_bytes", "port",
747  "cert_fname", 0};
748  std::string values[4];
749 
750  mp::xml::parse_attr(ptr, names, values);
751  port.route = values[0];
752  if (values[1].length() > 0)
753  port.max_recv_bytes = atoi(values[1].c_str());
754  else
755  port.max_recv_bytes = 0;
756  if (values[2].length() > 0)
757  port.port = values[2];
758  else
759  port.port = mp::xml::get_text(ptr);
760  port.cert_fname = values[3];
761  ports.push_back(port);
762  }
763  else if (!strcmp((const char *) ptr->name, "threads"))
764  {
765  std::string threads_str = mp::xml::get_text(ptr);
766  int threads = atoi(threads_str.c_str());
767  if (threads < 1)
768  throw yf::FilterException("Bad value for threads: "
769  + threads_str);
770  m_p->m_no_threads = threads;
771  }
772  else if (!strcmp((const char *) ptr->name, "max-threads"))
773  {
774  std::string threads_str = mp::xml::get_text(ptr);
775  int threads = atoi(threads_str.c_str());
776  if (threads < 1)
777  throw yf::FilterException("Bad value for max-threads: "
778  + threads_str);
779  m_p->m_max_threads = threads;
780  }
781  else if (!strcmp((const char *) ptr->name, "stack-size"))
782  {
783  std::string sz_str = mp::xml::get_text(ptr);
784  int sz = atoi(sz_str.c_str());
785  if (sz < 0)
786  throw yf::FilterException("Bad value for stack-size: "
787  + sz_str);
788  m_p->m_stack_size = sz * 1024;
789  }
790  else if (!strcmp((const char *) ptr->name, "timeout"))
791  {
792  std::string timeout_str = mp::xml::get_text(ptr);
793  int timeout = atoi(timeout_str.c_str());
794  if (timeout < 1)
795  throw yf::FilterException("Bad value for timeout: "
796  + timeout_str);
797  m_p->m_session_timeout = timeout;
798  }
799  else if (!strcmp((const char *) ptr->name, "connect-max"))
800  {
801  const char *names[3] = {"ip", "verbose", 0};
802  std::string values[2];
803 
804  mp::xml::parse_attr(ptr, names, values);
805  IP_Pattern m;
806  m.value = mp::xml::get_int(ptr, 0);
807  m.pattern = values[0];
808  m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
809  m_p->connect_max.push_back(m);
810  }
811  else if (!strcmp((const char *) ptr->name, "http-req-max"))
812  {
813  const char *names[3] = {"ip", "verbose", 0};
814  std::string values[2];
815 
816  mp::xml::parse_attr(ptr, names, values);
817  IP_Pattern m;
818  m.value = mp::xml::get_int(ptr, 0);
819  m.pattern = values[0];
820  m.verbose = values[1].length() ? atoi(values[1].c_str()) : 1;
821  m_p->http_req_max.push_back(m);
822  }
823  else if (!strcmp((const char *) ptr->name, "message"))
824  {
825  m_p->m_msg_config = mp::xml::get_text(ptr);
826  }
827  else if (!strcmp((const char *) ptr->name, "stat-req"))
828  {
829  m_p->m_stat_req = mp::xml::get_text(ptr);
830  }
831  else
832  {
833  throw yf::FilterException("Bad element "
834  + std::string((const char *)
835  ptr->name));
836  }
837  }
838  if (m_p->m_msg_config.length() > 0 && m_p->m_stat_req.length() == 0)
839  { // allow stats if message is enabled for filter
840  m_p->m_stat_req = "/fn_stat";
841  }
842  if (test_only)
843  return;
844  set_ports(ports);
845 }
846 
847 void yf::FrontendNet::set_ports(std::vector<std::string> &ports)
848 {
849  std::vector<Port> nports;
850  size_t i;
851 
852  for (i = 0; i < ports.size(); i++)
853  {
854  Port nport;
855 
856  nport.port = ports[i];
857 
858  nports.push_back(nport);
859  }
860  set_ports(nports);
861 }
862 
863 
864 void yf::FrontendNet::set_ports(std::vector<Port> &ports)
865 {
866  m_p->m_ports = ports;
867 
868  m_p->az = new yf::FrontendNet::ZAssocServer *[m_p->m_ports.size()];
869  m_p->pdu = new yazpp_1::PDU_Assoc *[m_p->m_ports.size()];
870 
871  // Create yf::FrontendNet::ZAssocServer for each port
872  size_t i;
873  for (i = 0; i < m_p->m_ports.size(); i++)
874  m_p->az[i] = 0;
875  for (i = 0; i < m_p->m_ports.size(); i++)
876  {
877  // create a PDU assoc object (one per yf::FrontendNet::ZAssocServer)
878  yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
879 
880  if (m_p->m_ports[i].cert_fname.length())
881  as->set_cert_fname(m_p->m_ports[i].cert_fname.c_str());
882  // create ZAssoc with PDU Assoc
883  m_p->pdu[i] = as;
884  m_p->az[i] = new yf::FrontendNet::ZAssocServer(
885  as, &m_p->m_ports[i], m_p.get());
886  if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
887  {
888  throw yf::FilterException("Unable to bind to address "
889  + std::string(m_p->m_ports[i].port));
890  }
891  COMSTACK cs = as->get_comstack();
892 
893  if (cs && m_p->m_ports[i].max_recv_bytes)
894  cs_set_max_recv_bytes(cs, m_p->m_ports[i].max_recv_bytes);
895 
896  }
897 }
898 
899 void yf::FrontendNet::set_listen_duration(int d)
900 {
901  m_p->m_listen_duration = d;
902 }
903 
904 static yf::Base* filter_creator()
905 {
906  return new yf::FrontendNet;
907 }
908 
909 extern "C" {
910  struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
911  0,
912  "frontend_net",
914  };
915 }
916 
917 /*
918  * Local variables:
919  * c-basic-offset: 4
920  * c-file-style: "Stroustrup"
921  * indent-tabs-mode: nil
922  * End:
923  * vim: shiftwidth=4 tabstop=8 expandtab
924  */
925 
static yf::Base * filter_creator()
void set_thread_pool(ThreadPoolSocketObserver *observer)
void set_ports(std::vector< Port > &ports)
set ports
void set_package(const mp::Package *package)
struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net
void get_thread_info(int &tbusy, int &total)
mp::ThreadPoolSocketObserver * m_thread_pool_observer
mp::ThreadPoolSocketObserver * m_thread_pool_observer
void cleanup(IThreadPoolMsg *m, void *info)