Rizin
UNIX-like reverse engineering framework and CLI tools
threadpool.c
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
21 
22 #include "uv-common.h"
23 
24 #if !defined(_WIN32)
25 # include "unix/internal.h"
26 #endif
27 
28 #include <stdlib.h>
29 
30 #define MAX_THREADPOOL_SIZE 1024
31 
33 static uv_cond_t cond;
35 static unsigned int idle_threads;
36 static unsigned int slow_io_work_running;
37 static unsigned int nthreads;
41 static QUEUE wq;
44 
static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}
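
/* Annotation (not in the original source): with the default pool of 4
 * threads the threshold is (4 + 1) / 2 == 2, so at most half the pool
 * (rounded up) can be tied up in slow I/O at any one time. */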

static void uv__cancelled(struct uv__work* w) {
  abort();
}

/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}

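
/* Annotation (not in the original source): the invariant above prevents an
 * ABBA deadlock. uv__work_cancel() acquires the global `mutex` first and
 * the loop-local `wq_mutex` second; if a worker ever held `wq_mutex` while
 * waiting on `mutex`, each thread could block on the lock the other holds.
 * The worker therefore always releases `mutex` before it touches
 * `w->loop->wq_mutex`, and re-acquires `mutex` only after dropping it. */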

static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  QUEUE_INSERT_TAIL(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

void uv__threadpool_cleanup(void) {
#ifndef _WIN32
  unsigned int i;

  if (nthreads == 0)
    return;

  post(&exit_message, UV__WORK_CPU);

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
#endif
}

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}
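
/* Usage sketch (illustrative, not part of the original file): the pool size
 * is read from UV_THREADPOOL_SIZE exactly once, via uv_once()/init_once(),
 * so the variable must be set before the first request ever reaches the
 * pool. The value is clamped to [1, MAX_THREADPOOL_SIZE] and defaults to
 * ARRAY_SIZE(default_threads), i.e. 4.
 */
#if 0
#include <stdlib.h>
#include <uv.h>

int main(void) {
  /* POSIX-only sketch: must run before any fs, getaddrinfo or
   * uv_queue_work() request first touches the threadpool. */
  setenv("UV_THREADPOOL_SIZE", "8", 1);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
#endif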


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
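
/* Illustrative sketch (not part of the original file): how an internal
 * consumer hands work to the pool. libuv's fs layer, for example, submits
 * its requests with UV__WORK_SLOW_IO so they are throttled by
 * slow_work_thread_threshold(); the callbacks here are hypothetical.
 */
#if 0
static void example_work(struct uv__work* w) {
  /* Pool thread: blocking I/O is acceptable here. */
}

static void example_done(struct uv__work* w, int status) {
  /* Loop thread, invoked from uv__work_done(); status is UV_ECANCELED
   * when the request was cancelled before it started executing. */
}

static void example_submit(uv_loop_t* loop, struct uv__work* w) {
  uv__work_submit(loop, w, UV__WORK_SLOW_IO, example_work, example_done);
}
#endif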


static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}


void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}
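
/* Usage sketch (illustrative, not part of the original file): work_cb runs
 * on a pool thread, after_work_cb runs back on the loop thread once the
 * result has been posted through uv_async_send()/uv__work_done().
 */
#if 0
#include <stdio.h>
#include <uv.h>

static void on_work(uv_work_t* req) {
  puts("running on a threadpool thread");  /* safe to block here */
}

static void on_after_work(uv_work_t* req, int status) {
  printf("back on the loop thread, status=%d\n", status);
}

int main(void) {
  uv_work_t req;
  uv_queue_work(uv_default_loop(), &req, on_work, on_after_work);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
#endif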


int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
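
/* Usage sketch (illustrative, not part of the original file): cancellation
 * only succeeds while the request is still queued; once a worker has started
 * it, uv__work_cancel() returns UV_EBUSY. A successfully cancelled request
 * still receives its after-work callback, with status UV_ECANCELED.
 */
#if 0
int rc = uv_cancel((uv_req_t*) &req);  /* `req` is a pending uv_work_t */
if (rc == 0) {
  /* after_work_cb will fire with status == UV_ECANCELED */
} else if (rc == UV_EBUSY) {
  /* too late: work_cb is already running or has finished */
}
#endif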