pazpar2  1.13.0
sel_thread.c
Go to the documentation of this file.
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3 
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 
18 */
19 
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include "sel_thread.h"
25 #include <yaz/log.h>
26 #include <yaz/nmem.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #ifdef WIN32
31 #include <winsock2.h>
32 #endif
33 #include <stdlib.h>
34 #include <yaz/thread_create.h>
35 #include <yaz/mutex.h>
36 #include <yaz/spipe.h>
37 #include <assert.h>
38 
/* One unit of work. Items are chained through `next` into the singly
   linked input, output and free queues kept by struct sel_thread. */
struct work_item {
    void *data;             /* opaque payload passed to the work handler */
    struct work_item *next; /* following item in the same queue, or 0 at the end */
};
43 
44 static struct work_item *queue_remove_last(struct work_item **q)
45 {
46  struct work_item **work_p = q, *work_this = 0;
47 
48  while (*work_p && (*work_p)->next)
49  work_p = &(*work_p)->next;
50  if (*work_p)
51  {
52  work_this = *work_p;
53  *work_p = 0;
54  }
55  return work_this;
56 }
57 
58 static void queue_trav(struct work_item *q, void (*f)(void *data))
59 {
60  for (; q; q = q->next)
61  f(q->data);
62 }
63 
64 struct sel_thread {
65  int write_fd;
66  int read_fd;
67  yaz_spipe_t spipe;
68  NMEM nmem;
69  yaz_thread_t *thread_id;
70  YAZ_MUTEX mutex;
71  YAZ_COND input_data;
72  int stop_flag;
77  void (*work_handler)(void *work_data);
78  void (*work_destroy)(void *work_data);
79 };
80 
/* Counter incremented by sel_thread_add and decremented when a worker takes
   an item. It is file-scope, so it is shared by every sel_thread instance,
   and it is only reported by debug logging that is compiled out (#if 0). */
static int input_queue_length = 0;
82 
83 static void *sel_thread_handler(void *vp)
84 {
85  sel_thread_t p = (sel_thread_t) vp;
86 
87  while (1)
88  {
89  struct work_item *work_this = 0;
90  /* wait for some work */
91  yaz_mutex_enter(p->mutex);
92  while (!p->stop_flag && !p->input_queue)
93  yaz_cond_wait(p->input_data, p->mutex, 0);
94  /* see if we were waken up because we're shutting down */
95  if (p->stop_flag)
96  break;
97  /* got something. Take the last one out of input_queue */
98 
99  assert(p->input_queue);
100  work_this = queue_remove_last(&p->input_queue);
101  input_queue_length--;
102 #if 0
103  yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
104 #endif
105  assert(work_this);
106 
107  yaz_mutex_leave(p->mutex);
108 
109  /* work on this item */
110  p->work_handler(work_this->data);
111 
112  /* put it back into output queue */
113  yaz_mutex_enter(p->mutex);
114  work_this->next = p->output_queue;
115  p->output_queue = work_this;
116  yaz_mutex_leave(p->mutex);
117 
118  /* wake up select/poll with a single byte */
119 #ifdef WIN32
120  (void) send(p->write_fd, "", 1, 0);
121 #else
122  (void) write(p->write_fd, "", 1);
123 #endif
124  }
125  yaz_mutex_leave(p->mutex);
126  return 0;
127 }
128 
129 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
130  void (*work_destroy)(void *work_data),
131  int *read_fd, int no_of_threads)
132 {
133  int i;
134  NMEM nmem = nmem_create();
135  sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
136 
137  assert(work_handler);
138  /* work_destroy may be NULL */
139  assert(read_fd);
140  assert(no_of_threads >= 1);
141 
142  p->nmem = nmem;
143 
144 #ifdef WIN32
145  /* use port 12119 temporarily on Windos and hope for the best */
146  p->spipe = yaz_spipe_create(12119, 0);
147 #else
148  p->spipe = yaz_spipe_create(0, 0);
149 #endif
150  if (!p->spipe)
151  {
152  nmem_destroy(nmem);
153  return 0;
154  }
155 
156  *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
157  p->write_fd = yaz_spipe_get_write_fd(p->spipe);
158 
159  p->input_queue = 0;
160  p->output_queue = 0;
161  p->free_queue = 0;
164  p->no_threads = 0; /* we if need to destroy */
165  p->stop_flag = 0;
166  p->mutex = 0;
167  yaz_mutex_create(&p->mutex);
168  yaz_cond_create(&p->input_data);
169  if (p->input_data == 0) /* condition variable could not be created? */
170  {
172  return 0;
173  }
174 
175  p->no_threads = no_of_threads;
176  p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
177  for (i = 0; i < p->no_threads; i++)
178  p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
179  return p;
180 }
181 
183 {
184  int i;
185  yaz_mutex_enter(p->mutex);
186  p->stop_flag = 1;
187  yaz_cond_broadcast(p->input_data);
188  yaz_mutex_leave(p->mutex);
189 
190  for (i = 0; i< p->no_threads; i++)
191  yaz_thread_join(&p->thread_id[i], 0);
192 
193  if (p->work_destroy)
194  {
197  }
198 
199  yaz_spipe_destroy(p->spipe);
200  yaz_cond_destroy(&p->input_data);
201  yaz_mutex_destroy(&p->mutex);
202  nmem_destroy(p->nmem);
203 }
204 
206 {
207  struct work_item *work_p;
208 
209  yaz_mutex_enter(p->mutex);
210 
211  if (p->free_queue)
212  {
213  work_p = p->free_queue;
214  p->free_queue = p->free_queue->next;
215  }
216  else
217  work_p = nmem_malloc(p->nmem, sizeof(*work_p));
218 
219  work_p->data = data;
220  work_p->next = p->input_queue;
221  p->input_queue = work_p;
222  input_queue_length++;
223 #if 0
224  yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
225 #endif
226  yaz_cond_signal(p->input_data);
227  yaz_mutex_leave(p->mutex);
228 }
229 
231 {
232  struct work_item *work_this = 0;
233  void *data = 0;
234  char read_buf[1];
235 
236  yaz_mutex_enter(p->mutex);
237 
238  /* got something. Take the last one out of output_queue */
239  work_this = queue_remove_last(&p->output_queue);
240  if (work_this)
241  {
242  /* put freed item in free list */
243  work_this->next = p->free_queue;
244  p->free_queue = work_this;
245 
246  data = work_this->data;
247 #ifdef WIN32
248  (void) recv(p->read_fd, read_buf, 1, 0);
249 #else
250  (void) read(p->read_fd, read_buf, 1);
251 #endif
252  }
253  yaz_mutex_leave(p->mutex);
254  return data;
255 }
256 
257 /*
258  * Local variables:
259  * c-basic-offset: 4
260  * c-file-style: "Stroustrup"
261  * indent-tabs-mode: nil
262  * End:
263  * vim: shiftwidth=4 tabstop=8 expandtab
264  */
265 
struct work_item * output_queue
Definition: sel_thread.c:75
void sel_thread_destroy(sel_thread_t p)
destroys select thread
Definition: sel_thread.c:182
struct sel_thread * sel_thread_t
select thread handler type
Definition: sel_thread.h:27
yaz_spipe_t spipe
Definition: sel_thread.c:67
struct work_item * input_queue
Definition: sel_thread.c:74
struct work_item * free_queue
Definition: sel_thread.c:76
void * data
Definition: sel_thread.c:40
static void queue_trav(struct work_item *q, void(*f)(void *data))
Definition: sel_thread.c:58
struct work_item * next
Definition: sel_thread.c:41
static struct work_item * queue_remove_last(struct work_item **q)
Definition: sel_thread.c:44
YAZ_MUTEX mutex
Definition: sel_thread.c:70
NMEM nmem
Definition: sel_thread.c:68
static void * sel_thread_handler(void *vp)
Definition: sel_thread.c:83
int write_fd
Definition: sel_thread.c:65
int no_threads
Definition: sel_thread.c:73
static void work_destroy(void *vp)
how work is destructed
int stop_flag
Definition: sel_thread.c:72
void sel_thread_add(sel_thread_t p, void *data)
adds work to be carried out in thread
Definition: sel_thread.c:205
yaz_thread_t * thread_id
Definition: sel_thread.c:69
sel_thread_t sel_thread_create(void(*work_handler)(void *work_data), void(*work_destroy)(void *work_data), int *read_fd, int no_of_threads)
creates select thread
Definition: sel_thread.c:129
static int input_queue_length
Definition: sel_thread.c:81
void * sel_thread_result(sel_thread_t p)
gets result of work
Definition: sel_thread.c:230
int read_fd
Definition: sel_thread.c:66
YAZ_COND input_data
Definition: sel_thread.c:71
void(* work_destroy)(void *work_data)
Definition: sel_thread.c:78
static void work_handler(void *work_data)
Definition: eventl.c:228
void(* work_handler)(void *work_data)
Definition: sel_thread.c:77