Rizin
unix-like reverse engineering framework and cli tools
threadpool.c File Reference
#include "uv-common.h"
#include "unix/internal.h"
#include <stdlib.h>

Go to the source code of this file.

Macros

#define MAX_THREADPOOL_SIZE   1024
 

Functions

static unsigned int slow_work_thread_threshold (void)
 
static void uv__cancelled (struct uv__work *w)
 
static void worker (void *arg)
 
static void post (QUEUE *q, enum uv__work_kind kind)
 
void uv__threadpool_cleanup (void)
 
static void init_threads (void)
 
static void reset_once (void)
 
static void init_once (void)
 
void uv__work_submit (uv_loop_t *loop, struct uv__work *w, enum uv__work_kind kind, void(*work)(struct uv__work *w), void(*done)(struct uv__work *w, int status))
 
static int uv__work_cancel (uv_loop_t *loop, uv_req_t *req, struct uv__work *w)
 
void uv__work_done (uv_async_t *handle)
 
static void uv__queue_work (struct uv__work *w)
 
static void uv__queue_done (struct uv__work *w, int err)
 
int uv_queue_work (uv_loop_t *loop, uv_work_t *req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)
 
int uv_cancel (uv_req_t *req)
 

Variables

static uv_once_t once = UV_ONCE_INIT
 
static uv_cond_t cond
 
static uv_mutex_t mutex
 
static unsigned int idle_threads
 
static unsigned int slow_io_work_running
 
static unsigned int nthreads
 
static uv_thread_t * threads
 
static uv_thread_t default_threads [4]
 
static QUEUE exit_message
 
static QUEUE wq
 
static QUEUE run_slow_work_message
 
static QUEUE slow_io_pending_wq
 

Macro Definition Documentation

◆ MAX_THREADPOOL_SIZE

#define MAX_THREADPOOL_SIZE   1024

Definition at line 30 of file threadpool.c.

Function Documentation

◆ init_once()

static void init_once ( void  )
static

Definition at line 243 of file threadpool.c.

243  {
244 #ifndef _WIN32
245  /* Re-initialize the threadpool after fork.
246  * Note that this discards the global mutex and condition as well
247  * as the work queue.
248  */
249  if (pthread_atfork(NULL, NULL, &reset_once))
250  abort();
251 #endif
252  init_threads();
253 }
#define NULL
Definition: cris-opc.c:27
static void init_threads(void)
Definition: threadpool.c:188
static void reset_once(void)
Definition: threadpool.c:236

References init_threads(), NULL, and reset_once().

Referenced by uv__work_submit().

◆ init_threads()

static void init_threads ( void  )
static

Definition at line 188 of file threadpool.c.

188  {
189  unsigned int i;
190  const char* val;
191  uv_sem_t sem;
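192 
193  nthreads = ARRAY_SIZE(default_threads);
194  val = getenv("UV_THREADPOOL_SIZE");
195  if (val != NULL)
196  nthreads = atoi(val);
197  if (nthreads == 0)
198  nthreads = 1;
199  if (nthreads > MAX_THREADPOOL_SIZE)
200  nthreads = MAX_THREADPOOL_SIZE;
201 
202  threads = default_threads;
203  if (nthreads > ARRAY_SIZE(default_threads)) {
204  threads = uv__malloc(nthreads * sizeof(threads[0]));
205  if (threads == NULL) {
206  nthreads = ARRAY_SIZE(default_threads);
207  threads = default_threads;
208  }
209  }
210 
211  if (uv_cond_init(&cond))
212  abort();
213 
214  if (uv_mutex_init(&mutex))
215  abort();
216 
217  QUEUE_INIT(&wq);
218  QUEUE_INIT(&slow_io_pending_wq);
219  QUEUE_INIT(&run_slow_work_message);
220 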
221  if (uv_sem_init(&sem, 0))
222  abort();
223 
224  for (i = 0; i < nthreads; i++)
225  if (uv_thread_create(threads + i, worker, &sem))
226  abort();
227 
228  for (i = 0; i < nthreads; i++)
229  uv_sem_wait(&sem);
230 
231  uv_sem_destroy(&sem);
232 }
#define ARRAY_SIZE(a)
lzma_index ** i
Definition: index.h:629
ut16 val
Definition: armass64_const.h:6
#define QUEUE_INIT(q)
Definition: queue.h:45
char * getenv()
static void worker(void *arg)
Definition: threadpool.c:57
static uv_mutex_t mutex
Definition: threadpool.c:34
static QUEUE run_slow_work_message
Definition: threadpool.c:42
static unsigned int nthreads
Definition: threadpool.c:37
static uv_thread_t * threads
Definition: threadpool.c:38
static uv_cond_t cond
Definition: threadpool.c:33
#define MAX_THREADPOOL_SIZE
Definition: threadpool.c:30
static uv_thread_t default_threads[4]
Definition: threadpool.c:39
static QUEUE wq
Definition: threadpool.c:41
static QUEUE slow_io_pending_wq
Definition: threadpool.c:43
UV_PLATFORM_SEM_T uv_sem_t
Definition: unix.h:139
void * uv__malloc(size_t size)
Definition: uv-common.c:75
UV_EXTERN void uv_sem_destroy(uv_sem_t *sem)
Definition: thread.c:662
UV_EXTERN int uv_thread_create(uv_thread_t *tid, uv_thread_cb entry, void *arg)
Definition: thread.c:210
UV_EXTERN void uv_sem_wait(uv_sem_t *sem)
Definition: thread.c:678
UV_EXTERN int uv_sem_init(uv_sem_t *sem, unsigned int value)
Definition: thread.c:650
UV_EXTERN int uv_mutex_init(uv_mutex_t *handle)
Definition: thread.c:282
UV_EXTERN int uv_cond_init(uv_cond_t *cond)
Definition: thread.c:704

References ARRAY_SIZE, cond, default_threads, getenv(), i, MAX_THREADPOOL_SIZE, mutex, nthreads, NULL, QUEUE_INIT, run_slow_work_message, slow_io_pending_wq, threads, uv__malloc(), uv_cond_init(), uv_mutex_init(), uv_sem_destroy(), uv_sem_init(), uv_sem_wait(), uv_thread_create(), val, worker(), and wq.

Referenced by init_once().

◆ post()

static void post ( QUEUE * q,
enum uv__work_kind  kind 
)
static

Definition at line 142 of file threadpool.c.

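142  {
143  uv_mutex_lock(&mutex);
144  if (kind == UV__WORK_SLOW_IO) {
145  /* Insert into a separate queue. */
146  QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
147  if (!QUEUE_EMPTY(&run_slow_work_message)) {
148  /* Running slow I/O tasks is already scheduled => Nothing to do here.
149  The worker that runs said other task will schedule this one as well. */
150  uv_mutex_unlock(&mutex);
151  return;
152  }
153  q = &run_slow_work_message;
154  }
155 
156  QUEUE_INSERT_TAIL(&wq, q);
157  if (idle_threads > 0)
158  uv_cond_signal(&cond);
159  uv_mutex_unlock(&mutex);
160 }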
#define QUEUE_EMPTY(q)
Definition: queue.h:39
#define QUEUE_INSERT_TAIL(h, q)
Definition: queue.h:92
static unsigned int idle_threads
Definition: threadpool.c:35
@ UV__WORK_SLOW_IO
Definition: uv-common.h:194
UV_EXTERN void uv_mutex_lock(uv_mutex_t *handle)
Definition: thread.c:330
UV_EXTERN void uv_mutex_unlock(uv_mutex_t *handle)
Definition: thread.c:350
UV_EXTERN void uv_cond_signal(uv_cond_t *cond)
Definition: thread.c:769

References cond, idle_threads, mutex, QUEUE_EMPTY, QUEUE_INSERT_TAIL, run_slow_work_message, slow_io_pending_wq, UV__WORK_SLOW_IO, uv_cond_signal(), uv_mutex_lock(), uv_mutex_unlock(), and wq.

Referenced by cs_op_index(), sh_apply_effects(), sh_il_set_param_pc_ctx(), uv__threadpool_cleanup(), and uv__work_submit().

◆ reset_once()

static void reset_once ( void  )
static

Definition at line 236 of file threadpool.c.

236  {
237  uv_once_t child_once = UV_ONCE_INIT;
238  memcpy(&once, &child_once, sizeof(child_once));
239 }
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
static uv_once_t once
Definition: threadpool.c:32
pthread_once_t uv_once_t
Definition: unix.h:135
#define UV_ONCE_INIT
Definition: unix.h:133

References memcpy(), once, and UV_ONCE_INIT.

Referenced by init_once().

◆ slow_work_thread_threshold()

static unsigned int slow_work_thread_threshold ( void  )
static

Definition at line 45 of file threadpool.c.

45  {
46  return (nthreads + 1) / 2;
47 }

References nthreads.

Referenced by worker().

◆ uv__cancelled()

static void uv__cancelled ( struct uv__work * w)
static

Definition at line 49 of file threadpool.c.

49  {
50  abort();
51 }

Referenced by uv__work_cancel(), and uv__work_done().

◆ uv__queue_done()

static void uv__queue_done ( struct uv__work * w,
int  err 
)
static

Definition at line 325 of file threadpool.c.

325  {
326  uv_work_t* req;
327 
328  req = container_of(w, uv_work_t, work_req);
329  uv__req_unregister(req->loop, req);
330 
331  if (req->after_work_cb == NULL)
332  return;
333 
334  req->after_work_cb(req, err);
335 }
static bool err
Definition: armass.c:435
#define w
Definition: crypto_rc6.c:13
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec req
Definition: sflib.h:128
#define container_of(ptr, type, member)
Definition: rz_types.h:650
Definition: uv.h:1066
#define uv__req_unregister(loop, req)
Definition: uv-common.h:230

References container_of, err, NULL, req, uv__req_unregister, and w.

Referenced by uv_queue_work().

◆ uv__queue_work()

static void uv__queue_work ( struct uv__work * w)
static

Definition at line 318 of file threadpool.c.

318  {
319  uv_work_t* req = container_of(w, uv_work_t, work_req);
320 
321  req->work_cb(req);
322 }

References container_of, req, and w.

Referenced by uv_queue_work().

◆ uv__threadpool_cleanup()

void uv__threadpool_cleanup ( void  )

Definition at line 163 of file threadpool.c.

163  {
164 #ifndef _WIN32
165  unsigned int i;
166 
167  if (nthreads == 0)
168  return;
169 
170  post(&exit_message, UV__WORK_CPU);
171 
172  for (i = 0; i < nthreads; i++)
173  if (uv_thread_join(threads + i))
174  abort();
175 
176  if (threads != default_threads)
177  uv__free(threads);
178 
179  uv_mutex_destroy(&mutex);
180  uv_cond_destroy(&cond);
181 
182  threads = NULL;
183  nthreads = 0;
184 #endif
185 }
static QUEUE exit_message
Definition: threadpool.c:40
static void post(QUEUE *q, enum uv__work_kind kind)
Definition: threadpool.c:142
void uv__free(void *ptr)
Definition: uv-common.c:81
@ UV__WORK_CPU
Definition: uv-common.h:192
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
Definition: thread.c:272
UV_EXTERN void uv_mutex_destroy(uv_mutex_t *handle)
Definition: thread.c:324
UV_EXTERN void uv_cond_destroy(uv_cond_t *cond)
Definition: thread.c:735

References cond, default_threads, exit_message, i, mutex, nthreads, NULL, post(), threads, uv__free(), UV__WORK_CPU, uv_cond_destroy(), uv_mutex_destroy(), and uv_thread_join().

Referenced by uv_library_shutdown().

◆ uv__work_cancel()

static int uv__work_cancel ( uv_loop_t * loop,
uv_req_t * req,
struct uv__work * w 
)
static

Definition at line 269 of file threadpool.c.

269  {
270  int cancelled;
271 
272  uv_mutex_lock(&mutex);
273  uv_mutex_lock(&w->loop->wq_mutex);
274 
275  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
276  if (cancelled)
277  QUEUE_REMOVE(&w->wq);
278 
279  uv_mutex_unlock(&w->loop->wq_mutex);
280  uv_mutex_unlock(&mutex);
281 
282  if (!cancelled)
283  return UV_EBUSY;
284 
285  w->work = uv__cancelled;
286  uv_mutex_lock(&loop->wq_mutex);
287  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
288  uv_async_send(&loop->wq_async);
289  uv_mutex_unlock(&loop->wq_mutex);
290 
291  return 0;
292 }
#define QUEUE_REMOVE(q)
Definition: queue.h:101
uv_loop_t * loop
Definition: main.c:7
static void uv__cancelled(struct uv__work *w)
Definition: threadpool.c:49
UV_EXTERN int uv_async_send(uv_async_t *async)
Definition: async.c:63

References loop, mutex, NULL, QUEUE_EMPTY, QUEUE_INSERT_TAIL, QUEUE_REMOVE, uv__cancelled(), uv_async_send(), uv_mutex_lock(), uv_mutex_unlock(), and w.

Referenced by uv_cancel().

◆ uv__work_done()

void uv__work_done ( uv_async_t * handle)

Definition at line 295 of file threadpool.c.

295  {
296  struct uv__work* w;
297  uv_loop_t* loop;
298  QUEUE* q;
299  QUEUE wq;
300  int err;
301 
302  loop = container_of(handle, uv_loop_t, wq_async);
303  uv_mutex_lock(&loop->wq_mutex);
304  QUEUE_MOVE(&loop->wq, &wq);
305  uv_mutex_unlock(&loop->wq_mutex);
306 
307  while (!QUEUE_EMPTY(&wq)) {
308  q = QUEUE_HEAD(&wq);
309  QUEUE_REMOVE(q);
310 
311  w = container_of(q, struct uv__work, wq);
312  err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
313  w->done(w, err);
314  }
315 }
static mcore_handle handle
Definition: asm_mcore.c:8
#define QUEUE_HEAD(q)
Definition: queue.h:42
#define QUEUE_MOVE(h, n)
Definition: queue.h:72
void * QUEUE[2]
Definition: queue.h:21
Definition: uv.h:1780

References container_of, err, handle, loop, QUEUE_EMPTY, QUEUE_HEAD, QUEUE_MOVE, QUEUE_REMOVE, uv__cancelled(), uv_mutex_lock(), uv_mutex_unlock(), w, and wq.

Referenced by uv_loop_init().

◆ uv__work_submit()

void uv__work_submit ( uv_loop_t * loop,
struct uv__work * w,
enum uv__work_kind  kind,
void(*)(struct uv__work *w) work,
void(*)(struct uv__work *w, int status) done 
)

Definition at line 256 of file threadpool.c.

260  {
261  uv_once(&once, init_once);
262  w->loop = loop;
263  w->work = work;
264  w->done = done;
265  post(&w->wq, kind);
266 }
struct tab * done
Definition: enough.c:233
static void init_once(void)
Definition: threadpool.c:243
UV_EXTERN void uv_once(uv_once_t *guard, void(*callback)(void))
Definition: thread.c:419

References done, init_once(), loop, once, post(), uv_once(), w, and uv__work::work.

Referenced by uv_getaddrinfo(), uv_getnameinfo(), uv_queue_work(), and uv_random().

◆ uv_cancel()

int uv_cancel ( uv_req_t * req)

Definition at line 358 of file threadpool.c.

358  {
359  struct uv__work* wreq;
360  uv_loop_t* loop;
361 
362  switch (req->type) {
363  case UV_FS:
364  loop = ((uv_fs_t*) req)->loop;
365  wreq = &((uv_fs_t*) req)->work_req;
366  break;
367  case UV_GETADDRINFO:
368  loop = ((uv_getaddrinfo_t*) req)->loop;
369  wreq = &((uv_getaddrinfo_t*) req)->work_req;
370  break;
371  case UV_GETNAMEINFO:
372  loop = ((uv_getnameinfo_t*) req)->loop;
373  wreq = &((uv_getnameinfo_t*) req)->work_req;
374  break;
375  case UV_RANDOM:
376  loop = ((uv_random_t*) req)->loop;
377  wreq = &((uv_random_t*) req)->work_req;
378  break;
379  case UV_WORK:
380  loop = ((uv_work_t*) req)->loop;
381  wreq = &((uv_work_t*) req)->work_req;
382  break;
383  default:
384  return UV_EINVAL;
385  }
386 
387  return uv__work_cancel(loop, req, wreq);
388 }
Definition: uv.h:1306
static int uv__work_cancel(uv_loop_t *loop, uv_req_t *req, struct uv__work *w)
Definition: threadpool.c:269

References loop, req, and uv__work_cancel().

Referenced by signal_handler().

◆ uv_queue_work()

int uv_queue_work ( uv_loop_t * loop,
uv_work_t * req,
uv_work_cb  work_cb,
uv_after_work_cb  after_work_cb 
)

Definition at line 338 of file threadpool.c.

341  {
342  if (work_cb == NULL)
343  return UV_EINVAL;
344 
345  uv__req_init(loop, req, UV_WORK);
346  req->loop = loop;
347  req->work_cb = work_cb;
348  req->after_work_cb = after_work_cb;
349  uv__work_submit(loop,
350  &req->work_req,
351  UV__WORK_CPU,
352  uv__queue_work,
353  uv__queue_done);
354  return 0;
355 }
static void uv__queue_done(struct uv__work *w, int err)
Definition: threadpool.c:325
static void uv__queue_work(struct uv__work *w)
Definition: threadpool.c:318
void uv__work_submit(uv_loop_t *loop, struct uv__work *w, enum uv__work_kind kind, void(*work)(struct uv__work *w), void(*done)(struct uv__work *w, int status))
Definition: threadpool.c:256
#define uv__req_init(loop, req, typ)
Definition: uv-common.h:329

References loop, NULL, req, uv__queue_done(), uv__queue_work(), uv__req_init, UV__WORK_CPU, and uv__work_submit().

Referenced by main().

◆ worker()

static void worker ( void *  arg)
static

Definition at line 57 of file threadpool.c.

57  {
58  struct uv__work* w;
59  QUEUE* q;
60  int is_slow_work;
61 
62  uv_sem_post((uv_sem_t*) arg);
63  arg = NULL;
64 
65  uv_mutex_lock(&mutex);
66  for (;;) {
67  /* `mutex` should always be locked at this point. */
68 
69  /* Keep waiting while either no work is present or only slow I/O
70  and we're at the threshold for that. */
71  while (QUEUE_EMPTY(&wq) ||
72  (QUEUE_HEAD(&wq) == &run_slow_work_message &&
73  QUEUE_NEXT(&run_slow_work_message) == &wq &&
74  slow_io_work_running >= slow_work_thread_threshold())) {
75  idle_threads += 1;
76  uv_cond_wait(&cond, &mutex);
77  idle_threads -= 1;
78  }
79 
80  q = QUEUE_HEAD(&wq);
81  if (q == &exit_message) {
82  uv_cond_signal(&cond);
83  uv_mutex_unlock(&mutex);
84  break;
85  }
86 
87  QUEUE_REMOVE(q);
88  QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
89 
90  is_slow_work = 0;
91  if (q == &run_slow_work_message) {
92  /* If we're at the slow I/O threshold, re-schedule until after all
93  other work in the queue is done. */
94  if (slow_io_work_running >= slow_work_thread_threshold()) {
95  QUEUE_INSERT_TAIL(&wq, q);
96  continue;
97  }
98 
99  /* If we encountered a request to run slow I/O work but there is none
100  to run, that means it's cancelled => Start over. */
101  if (QUEUE_EMPTY(&slow_io_pending_wq))
102  continue;
103 
104  is_slow_work = 1;
105  slow_io_work_running++;
106 
107  q = QUEUE_HEAD(&slow_io_pending_wq);
108  QUEUE_REMOVE(q);
109  QUEUE_INIT(q);
110 
111  /* If there is more slow I/O work, schedule it to be run as well. */
112  if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
113  QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
114  if (idle_threads > 0)
115  uv_cond_signal(&cond);
116  }
117  }
118 
119  uv_mutex_unlock(&mutex);
120 
121  w = QUEUE_DATA(q, struct uv__work, wq);
122  w->work(w);
123 
124  uv_mutex_lock(&w->loop->wq_mutex);
125  w->work = NULL; /* Signal uv_cancel() that the work req is done
126  executing. */
127  QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
128  uv_async_send(&w->loop->wq_async);
129  uv_mutex_unlock(&w->loop->wq_mutex);
130 
131  /* Lock `mutex` since that is expected at the start of the next
132  * iteration. */
133  uv_mutex_lock(&mutex);
134  if (is_slow_work) {
135  /* `slow_io_work_running` is protected by `mutex`. */
136  slow_io_work_running--;
137  }
138  }
139 }
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
#define QUEUE_NEXT(q)
Definition: queue.h:24
static unsigned int slow_io_work_running
Definition: threadpool.c:36
static unsigned int slow_work_thread_threshold(void)
Definition: threadpool.c:45
UV_EXTERN void uv_sem_post(uv_sem_t *sem)
Definition: thread.c:670
UV_EXTERN void uv_cond_wait(uv_cond_t *cond, uv_mutex_t *mutex)
Definition: thread.c:779

References cond, exit_message, idle_threads, mutex, NULL, QUEUE_DATA, QUEUE_EMPTY, QUEUE_HEAD, QUEUE_INIT, QUEUE_INSERT_TAIL, QUEUE_NEXT, QUEUE_REMOVE, run_slow_work_message, slow_io_pending_wq, slow_io_work_running, slow_work_thread_threshold(), uv_async_send(), uv_cond_signal(), uv_cond_wait(), uv_mutex_lock(), uv_mutex_unlock(), uv_sem_post(), w, and wq.

Referenced by init_threads(), on_new_connection(), and setup_workers().

Variable Documentation

◆ cond

uv_cond_t cond
static

Definition at line 33 of file threadpool.c.

Referenced by init_threads(), post(), uv__threadpool_cleanup(), and worker().

◆ default_threads

uv_thread_t default_threads[4]
static

Definition at line 39 of file threadpool.c.

Referenced by init_threads(), and uv__threadpool_cleanup().

◆ exit_message

QUEUE exit_message
static

Definition at line 40 of file threadpool.c.

Referenced by uv__threadpool_cleanup(), and worker().

◆ idle_threads

unsigned int idle_threads
static

Definition at line 35 of file threadpool.c.

Referenced by post(), and worker().

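◆ mutex

uv_mutex_t mutex
static

Definition at line 34 of file threadpool.c.

Referenced by init_threads(), post(), uv__threadpool_cleanup(), uv__work_cancel(), and worker().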

◆ nthreads

unsigned int nthreads
static

Definition at line 37 of file threadpool.c.

Referenced by init_threads(), slow_work_thread_threshold(), and uv__threadpool_cleanup().

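◆ once

uv_once_t once = UV_ONCE_INIT
static

Definition at line 32 of file threadpool.c.

Referenced by reset_once(), and uv__work_submit().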

◆ run_slow_work_message

QUEUE run_slow_work_message
static

Definition at line 42 of file threadpool.c.

Referenced by init_threads(), post(), and worker().

◆ slow_io_pending_wq

QUEUE slow_io_pending_wq
static

Definition at line 43 of file threadpool.c.

Referenced by init_threads(), post(), and worker().

◆ slow_io_work_running

unsigned int slow_io_work_running
static

Definition at line 36 of file threadpool.c.

Referenced by worker().

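◆ threads

uv_thread_t* threads
static

Definition at line 38 of file threadpool.c.

Referenced by init_threads(), and uv__threadpool_cleanup().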

◆ wq

QUEUE wq
static

Definition at line 41 of file threadpool.c.

Referenced by init_threads(), post(), uv__work_done(), and worker().