Rizin
unix-like reverse engineering framework and cli tools
stream.c
Go to the documentation of this file.
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "internal.h"
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <errno.h>
30 
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36 #include <limits.h> /* IOV_MAX */
37 
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
42 
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
45 
/* State shared between a stream and the helper thread that runs select(2)
 * on the stream's descriptor (see uv__stream_try_select()).
 *
 * NOTE(review): code in this file also accesses `s->stream` and `s->async`,
 * but no such members are visible in this listing - lines appear to have
 * been dropped. Confirm against the real file.
 */
46 struct uv__stream_select_s {
  /* Helper thread; started by uv_thread_create() in uv__stream_try_select(). */
48  uv_thread_t thread;
  /* The helper thread leaves its loop once it can take this semaphore. */
49  uv_sem_t close_sem;
  /* The helper thread waits on this after publishing events; it is posted by
   * uv__stream_osx_select_cb().
   */
50  uv_sem_t async_sem;
  /* POLLIN/POLLOUT mask published by the helper thread and cleared by
   * uv__stream_osx_select_cb().
   */
52  int events;
  /* fds[0] of the socketpair; handed back to the caller as the replacement
   * descriptor and written to in order to interrupt select().
   */
53  int fake_fd;
  /* fds[1] of the socketpair; included in the read set of select(). */
54  int int_fd;
  /* The original descriptor that is watched with select(). */
55  int fd;
  /* Read set and its size in bytes; the storage follows this struct in the
   * same allocation.
   */
56  fd_set* sread;
57  size_t sread_sz;
  /* Write set and its size in bytes; the storage follows the read set. */
58  fd_set* swrite;
59  size_t swrite_sz;
60 };
61 
62 /* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
63  * EPROTOTYPE can be returned while trying to write to a socket that is
64  * shutting down. If we retry the write, we should get the expected EPIPE
65  * instead.
66  */
/* RETRY_ON_WRITE_ERROR(err): non-zero when a failed write should simply be
 * retried (EINTR, or EPROTOTYPE on this platform).
 * IS_TRANSIENT_WRITE_ERROR(err, send_handle): non-zero when the error means
 * "try again later" rather than a hard failure; EMSGSIZE only counts when a
 * handle is being sent (send_handle != NULL).
 * Arguments are parenthesized so that any expression can be passed; note that
 * `err` is evaluated more than once.
 */
# define RETRY_ON_WRITE_ERROR(err) ((err) == EINTR || (err) == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(err, send_handle) \
  ((err) == EAGAIN || (err) == EWOULDBLOCK || (err) == ENOBUFS || \
   ((err) == EMSGSIZE && (send_handle) != NULL))
71 #else
/* RETRY_ON_WRITE_ERROR(err): non-zero when a failed write should simply be
 * retried (EINTR).
 * IS_TRANSIENT_WRITE_ERROR(err, send_handle): non-zero when the error means
 * "try again later" rather than a hard failure; send_handle is accepted for
 * signature parity with the other platform branch and is not inspected here.
 * Arguments are parenthesized so that any expression can be passed; note that
 * `err` is evaluated more than once.
 */
# define RETRY_ON_WRITE_ERROR(err) ((err) == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(err, send_handle) \
  ((err) == EAGAIN || (err) == EWOULDBLOCK || (err) == ENOBUFS)
75 #endif /* defined(__APPLE__) */
76 
77 static void uv__stream_connect(uv_stream_t*);
78 static void uv__write(uv_stream_t* stream);
79 static void uv__read(uv_stream_t* stream);
80 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
82 static size_t uv__write_req_size(uv_write_t* req);
83 
84 
88  int err;
89 
91  stream->read_cb = NULL;
92  stream->alloc_cb = NULL;
93  stream->close_cb = NULL;
94  stream->connection_cb = NULL;
95  stream->connect_req = NULL;
96  stream->shutdown_req = NULL;
97  stream->accepted_fd = -1;
98  stream->queued_fds = NULL;
99  stream->delayed_error = 0;
100  QUEUE_INIT(&stream->write_queue);
101  QUEUE_INIT(&stream->write_completed_queue);
102  stream->write_queue_size = 0;
103 
104  if (loop->emfile_fd == -1) {
105  err = uv__open_cloexec("/dev/null", O_RDONLY);
106  if (err < 0)
107  /* In the rare case that "/dev/null" isn't mounted open "/"
108  * instead.
109  */
110  err = uv__open_cloexec("/", O_RDONLY);
111  if (err >= 0)
112  loop->emfile_fd = err;
113  }
114 
115 #if defined(__APPLE__)
116  stream->select = NULL;
117 #endif /* defined(__APPLE_) */
118 
119  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
120 }
121 
122 
124 #if defined(__APPLE__)
125  /* Notify select() thread about state change */
126  uv__stream_select_t* s;
127  int r;
128 
129  s = stream->select;
130  if (s == NULL)
131  return;
132 
133  /* Interrupt select() loop
134  * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
135  * emit read event on other side
136  */
137  do
138  r = write(s->fake_fd, "x", 1);
139  while (r == -1 && errno == EINTR);
140 
141  assert(r == 1);
142 #else /* !defined(__APPLE__) */
143  /* No-op on any other platform */
144 #endif /* !defined(__APPLE__) */
145 }
146 
147 
148 #if defined(__APPLE__)
/* Thread entry point for the select(2) fallback; `arg` is the stream.
 *
 * Loops until close_sem can be taken. Each iteration rebuilds the fd sets
 * (the watched fd for whichever of POLLIN/POLLOUT the stream's io_watcher has
 * active, plus int_fd for reading), blocks in select(), drains int_fd if it
 * became readable, and - if the watched fd reported anything - publishes the
 * mask in s->events, signals s->async and waits on async_sem before
 * continuing.
 *
 * NOTE(review): `stream` is assigned below but its declaration is not visible
 * in this listing (a line appears to have been dropped) - confirm against the
 * real file.
 */
149 static void uv__stream_osx_select(void* arg) {
151  uv__stream_select_t* s;
152  char buf[1024];
153  int events;
154  int fd;
155  int r;
156  int max_fd;
157 
158  stream = arg;
159  s = stream->select;
160  fd = s->fd;
161 
  /* select() needs the highest descriptor number of the two it watches. */
162  if (fd > s->int_fd)
163  max_fd = fd;
164  else
165  max_fd = s->int_fd;
166 
167  while (1) {
168  /* Terminate on semaphore */
169  if (uv_sem_trywait(&s->close_sem) == 0)
170  break;
171 
172  /* Watch fd using select(2) */
173  memset(s->sread, 0, s->sread_sz);
174  memset(s->swrite, 0, s->swrite_sz);
175 
176  if (uv__io_active(&stream->io_watcher, POLLIN))
177  FD_SET(fd, s->sread);
178  if (uv__io_active(&stream->io_watcher, POLLOUT))
179  FD_SET(fd, s->swrite);
  /* Always watch the interrupt side so that a write to fake_fd wakes us. */
180  FD_SET(s->int_fd, s->sread);
181 
182  /* Wait indefinitely for fd events */
183  r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
184  if (r == -1) {
185  if (errno == EINTR)
186  continue;
187 
188  /* XXX: Possible?! */
189  abort();
190  }
191 
192  /* Ignore timeouts */
193  if (r == 0)
194  continue;
195 
196  /* Empty socketpair's buffer in case of interruption */
197  if (FD_ISSET(s->int_fd, s->sread))
198  while (1) {
199  r = read(s->int_fd, buf, sizeof(buf));
200 
  /* A completely filled buffer means there may be more to drain. */
201  if (r == sizeof(buf))
202  continue;
203 
  /* Short read (or end of file): nothing further to drain. */
204  if (r != -1)
205  break;
206 
207  if (errno == EAGAIN || errno == EWOULDBLOCK)
208  break;
209 
210  if (errno == EINTR)
211  continue;
212 
213  abort();
214  }
215 
216  /* Handle events */
217  events = 0;
218  if (FD_ISSET(fd, s->sread))
219  events |= POLLIN;
220  if (FD_ISSET(fd, s->swrite))
221  events |= POLLOUT;
222 
223  assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
224  if (events != 0) {
225  ACCESS_ONCE(int, s->events) = events;
226 
  /* Hand the events over and block until the callback side posts
   * async_sem.
   */
227  uv_async_send(&s->async);
228  uv_sem_wait(&s->async_sem);
229 
230  /* Should be processed at this stage */
231  assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
232  }
233  }
234 }
235 
236 
/* Async callback paired with uv__stream_osx_select(): takes the event mask
 * published in s->events, clears it, forwards POLLIN and/or POLLOUT to
 * uv__stream_io() when the stream's io_watcher is still active for them, and
 * finally posts async_sem. The post is skipped when the stream has the
 * UV_HANDLE_CLOSING flag set after the callbacks ran.
 *
 * NOTE(review): `stream` is assigned below but its declaration is not visible
 * in this listing (a line appears to have been dropped) - confirm against the
 * real file.
 */
237 static void uv__stream_osx_select_cb(uv_async_t* handle) {
238  uv__stream_select_t* s;
240  int events;
241 
242  s = container_of(handle, uv__stream_select_t, async);
243  stream = s->stream;
244 
245  /* Get and reset stream's events */
246  events = s->events;
247  ACCESS_ONCE(int, s->events) = 0;
248 
249  assert(events != 0);
  /* Only POLLIN and POLLOUT may ever be published. */
250  assert(events == (events & (POLLIN | POLLOUT)));
251 
252  /* Invoke callback on event-loop */
253  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
254  uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
255 
256  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
257  uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
258 
259  if (stream->flags & UV_HANDLE_CLOSING)
260  return;
261 
262  /* NOTE: It is important to do it here, otherwise `select()` might be called
263  * before the actual `uv__read()`, leading to the blocking syscall
264  */
265  uv_sem_post(&s->async_sem);
266 }
267 
268 
269 static void uv__stream_osx_cb_close(uv_handle_t* async) {
270  uv__stream_select_t* s;
271 
272  s = container_of(async, uv__stream_select_t, async);
273  uv__free(s);
274 }
275 
276 
277 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
278  /*
279  * kqueue doesn't work with some files from /dev mount on osx.
280  * select(2) in separate thread for those fds
281  */
282 
283  struct kevent filter[1];
284  struct kevent events[1];
285  struct timespec timeout;
286  uv__stream_select_t* s;
287  int fds[2];
288  int err;
289  int ret;
290  int kq;
291  int old_fd;
292  int max_fd;
293  size_t sread_sz;
294  size_t swrite_sz;
295 
296  kq = kqueue();
297  if (kq == -1) {
298  perror("(libuv) kqueue()");
299  return UV__ERR(errno);
300  }
301 
302  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
303 
304  /* Use small timeout, because we only want to capture EINVALs */
305  timeout.tv_sec = 0;
306  timeout.tv_nsec = 1;
307 
308  do
309  ret = kevent(kq, filter, 1, events, 1, &timeout);
310  while (ret == -1 && errno == EINTR);
311 
312  uv__close(kq);
313 
314  if (ret == -1)
315  return UV__ERR(errno);
316 
317  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
318  return 0;
319 
320  /* At this point we definitely know that this fd won't work with kqueue */
321 
322  /*
323  * Create fds for io watcher and to interrupt the select() loop.
324  * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
325  */
326  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
327  return UV__ERR(errno);
328 
329  max_fd = *fd;
330  if (fds[1] > max_fd)
331  max_fd = fds[1];
332 
333  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
334  swrite_sz = sread_sz;
335 
336  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
337  if (s == NULL) {
338  err = UV_ENOMEM;
339  goto failed_malloc;
340  }
341 
342  s->events = 0;
343  s->fd = *fd;
344  s->sread = (fd_set*) ((char*) s + sizeof(*s));
345  s->sread_sz = sread_sz;
346  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
347  s->swrite_sz = swrite_sz;
348 
349  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
350  if (err)
351  goto failed_async_init;
352 
353  s->async.flags |= UV_HANDLE_INTERNAL;
354  uv__handle_unref(&s->async);
355 
356  err = uv_sem_init(&s->close_sem, 0);
357  if (err != 0)
358  goto failed_close_sem_init;
359 
360  err = uv_sem_init(&s->async_sem, 0);
361  if (err != 0)
362  goto failed_async_sem_init;
363 
364  s->fake_fd = fds[0];
365  s->int_fd = fds[1];
366 
367  old_fd = *fd;
368  s->stream = stream;
369  stream->select = s;
370  *fd = s->fake_fd;
371 
372  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
373  if (err != 0)
374  goto failed_thread_create;
375 
376  return 0;
377 
378 failed_thread_create:
379  s->stream = NULL;
380  stream->select = NULL;
381  *fd = old_fd;
382 
383  uv_sem_destroy(&s->async_sem);
384 
385 failed_async_sem_init:
386  uv_sem_destroy(&s->close_sem);
387 
388 failed_close_sem_init:
389  uv__close(fds[0]);
390  uv__close(fds[1]);
391  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
392  return err;
393 
394 failed_async_init:
395  uv__free(s);
396 
397 failed_malloc:
398  uv__close(fds[0]);
399  uv__close(fds[1]);
400 
401  return err;
402 }
403 #endif /* defined(__APPLE__) */
404 
405 
407 #if defined(__APPLE__)
408  int enable;
409 #endif
410 
411  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
412  return UV_EBUSY;
413 
414  assert(fd >= 0);
415  stream->flags |= flags;
416 
417  if (stream->type == UV_TCP) {
418  if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
419  return UV__ERR(errno);
420 
421  /* TODO Use delay the user passed in. */
422  if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
423  uv__tcp_keepalive(fd, 1, 60)) {
424  return UV__ERR(errno);
425  }
426  }
427 
428 #if defined(__APPLE__)
429  enable = 1;
430  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
431  errno != ENOTSOCK &&
432  errno != EINVAL) {
433  return UV__ERR(errno);
434  }
435 #endif
436 
437  stream->io_watcher.fd = fd;
438 
439  return 0;
440 }
441 
442 
444  uv_write_t* req;
445  QUEUE* q;
446  while (!QUEUE_EMPTY(&stream->write_queue)) {
447  q = QUEUE_HEAD(&stream->write_queue);
448  QUEUE_REMOVE(q);
449 
451  req->error = error;
452 
453  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
454  }
455 }
456 
457 
459  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
460  assert(stream->flags & UV_HANDLE_CLOSED);
461 
462  if (stream->connect_req) {
463  uv__req_unregister(stream->loop, stream->connect_req);
464  stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
465  stream->connect_req = NULL;
466  }
467 
468  uv__stream_flush_write_queue(stream, UV_ECANCELED);
470 
471  if (stream->shutdown_req) {
472  /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
473  * fait accompli at this point. Maybe we should revisit this in v0.11.
474  * A possible reason for leaving it unchanged is that it informs the
475  * callee that the handle has been destroyed.
476  */
477  uv__req_unregister(stream->loop, stream->shutdown_req);
478  stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
479  stream->shutdown_req = NULL;
480  }
481 
482  assert(stream->write_queue_size == 0);
483 }
484 
485 
486 /* Implements a best effort approach to mitigating accept() EMFILE errors.
487  * We have a spare file descriptor stashed away that we close to get below
488  * the EMFILE limit. Next, we accept all pending connections and close them
489  * immediately to signal the clients that we're overloaded - and we are, but
490  * we still keep on trucking.
491  *
492  * There is one caveat: it's not reliable in a multi-threaded environment.
493  * The file descriptor limit is per process. Our party trick fails if another
494  * thread opens a file or creates a socket in the time window between us
495  * calling close() and accept().
496  */
497 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
498  int err;
499  int emfile_fd;
500 
501  if (loop->emfile_fd == -1)
502  return UV_EMFILE;
503 
504  uv__close(loop->emfile_fd);
505  loop->emfile_fd = -1;
506 
507  do {
508  err = uv__accept(accept_fd);
509  if (err >= 0)
510  uv__close(err);
511  } while (err >= 0 || err == UV_EINTR);
512 
513  emfile_fd = uv__open_cloexec("/", O_RDONLY);
514  if (emfile_fd >= 0)
515  loop->emfile_fd = emfile_fd;
516 
517  return err;
518 }
519 
520 
/* UV_DEC_BACKLOG(w): with UV_HAVE_KQUEUE defined, decrements w->rcount;
 * otherwise expands to nothing. The kqueue expansion already ends in a
 * semicolon and `w` is not parenthesized, so pass a plain identifier.
 */
521 #if defined(UV_HAVE_KQUEUE)
522 # define UV_DEC_BACKLOG(w) w->rcount--;
523 #else
524 # define UV_DEC_BACKLOG(w) /* no-op */
525 #endif /* defined(UV_HAVE_KQUEUE) */
526 
527 
/* I/O callback for a listening stream; `w` is the stream's io_watcher and
 * `events` must include POLLIN (asserted).
 *
 * While the stream still has a descriptor, each iteration either reports an
 * error through connection_cb or stores the new descriptor in accepted_fd and
 * invokes connection_cb(stream, 0). If the callback leaves accepted_fd set,
 * POLLIN watching is stopped and the function returns.
 *
 * NOTE(review): several lines appear to have been dropped from this listing:
 * the declaration of `stream`, the statements that assign `err`, and the
 * statement that uses `timeout`. Confirm against the real file.
 */
528 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
530  int err;
531 
532  stream = container_of(w, uv_stream_t, io_watcher);
533  assert(events & POLLIN);
534  assert(stream->accepted_fd == -1);
535  assert(!(stream->flags & UV_HANDLE_CLOSING));
536 
537  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
538 
539  /* connection_cb can close the server socket while we're
540  * in the loop so check it on each iteration.
541  */
542  while (uv__stream_fd(stream) != -1) {
543  assert(stream->accepted_fd == -1);
544 
545 #if defined(UV_HAVE_KQUEUE)
  /* Stop once the watcher's rcount has reached zero. */
546  if (w->rcount <= 0)
547  return;
548 #endif /* defined(UV_HAVE_KQUEUE) */
549 
  /* NOTE(review): no assignment to `err` is visible before this test. */
551  if (err < 0) {
552  if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
553  return; /* Not an error. */
554 
555  if (err == UV_ECONNABORTED)
556  continue; /* Ignore. Nothing we can do about that. */
557 
558  if (err == UV_EMFILE || err == UV_ENFILE) {
  /* NOTE(review): `err` is re-tested here without a visible reassignment. */
560  if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
561  break;
562  }
563 
564  stream->connection_cb(stream, err);
565  continue;
566  }
567 
569  stream->accepted_fd = err;
570  stream->connection_cb(stream, 0);
571 
572  if (stream->accepted_fd != -1) {
573  /* The user hasn't called uv_accept() yet */
574  uv__io_stop(loop, &stream->io_watcher, POLLIN);
575  return;
576  }
577 
578  if (stream->type == UV_TCP &&
579  (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
580  /* Give other processes a chance to accept connections. */
581  struct timespec timeout = { 0, 1 };
583  }
584  }
585 }
586 
587 
588 #undef UV_DEC_BACKLOG
589 
590 
/* Hand the descriptor stored in server->accepted_fd over to `client`.
 *
 * Returns UV_EAGAIN when no descriptor is pending, UV_EINVAL when the client
 * type is not handled by the switch (accepted_fd is left untouched in that
 * case), and otherwise the result of opening the client on the descriptor.
 * When opening fails the descriptor is closed.
 *
 * Afterwards the first queued descriptor (if any) becomes the new
 * accepted_fd; with no queue, accepted_fd is reset to -1 and - on success -
 * POLLIN watching on the server is restarted.
 *
 * NOTE(review): the uv__stream_open() call below is cut short in this listing
 * (its final argument line is missing) - confirm against the real file.
 */
591 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
592  int err;
593 
594  assert(server->loop == client->loop);
595 
596  if (server->accepted_fd == -1)
597  return UV_EAGAIN;
598 
599  switch (client->type) {
600  case UV_NAMED_PIPE:
601  case UV_TCP:
602  err = uv__stream_open(client,
603  server->accepted_fd,
605  if (err) {
606  /* TODO handle error */
607  uv__close(server->accepted_fd);
608  goto done;
609  }
610  break;
611 
612  case UV_UDP:
613  err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
614  if (err) {
615  uv__close(server->accepted_fd);
616  goto done;
617  }
618  break;
619 
620  default:
621  return UV_EINVAL;
622  }
623 
624  client->flags |= UV_HANDLE_BOUND;
625 
626 done:
627  /* Process queued fds */
628  if (server->queued_fds != NULL) {
629  uv__stream_queued_fds_t* queued_fds;
630 
631  queued_fds = server->queued_fds;
632 
633  /* Read first */
634  server->accepted_fd = queued_fds->fds[0];
635 
636  /* All read, free */
637  assert(queued_fds->offset > 0);
638  if (--queued_fds->offset == 0) {
639  uv__free(queued_fds);
640  server->queued_fds = NULL;
641  } else {
642  /* Shift rest */
643  memmove(queued_fds->fds,
644  queued_fds->fds + 1,
645  queued_fds->offset * sizeof(*queued_fds->fds));
646  }
647  } else {
648  server->accepted_fd = -1;
  /* Only resume watching for connections when the hand-over succeeded. */
649  if (err == 0)
650  uv__io_start(server->loop, &server->io_watcher, POLLIN);
651  }
652  return err;
653 }
654 
655 
657  int err;
658 
659  switch (stream->type) {
660  case UV_TCP:
661  err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
662  break;
663 
664  case UV_NAMED_PIPE:
665  err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
666  break;
667 
668  default:
669  err = UV_EINVAL;
670  }
671 
672  if (err == 0)
674 
675  return err;
676 }
677 
678 
/* Called once the write queue is empty (asserted): stops POLLOUT watching
 * and, if a shutdown was requested and the stream is neither closing nor
 * already shut, performs shutdown(SHUT_WR) and completes the pending shutdown
 * request by invoking its callback (when set) with 0 or the error code.
 *
 * NOTE(review): `req` is used below but its declaration is not visible in
 * this listing (lines appear to have been dropped) - confirm against the real
 * file.
 */
679 static void uv__drain(uv_stream_t* stream) {
681  int err;
682 
683  assert(QUEUE_EMPTY(&stream->write_queue));
684  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
686 
687  /* Shutdown? */
688  if ((stream->flags & UV_HANDLE_SHUTTING) &&
689  !(stream->flags & UV_HANDLE_CLOSING) &&
690  !(stream->flags & UV_HANDLE_SHUT)) {
691  assert(stream->shutdown_req);
692 
  /* Detach the request from the stream before running the callback. */
693  req = stream->shutdown_req;
694  stream->shutdown_req = NULL;
695  stream->flags &= ~UV_HANDLE_SHUTTING;
696  uv__req_unregister(stream->loop, req);
697 
698  err = 0;
699  if (shutdown(uv__stream_fd(stream), SHUT_WR))
700  err = UV__ERR(errno);
701 
  /* The SHUT flag is only set when shutdown(2) succeeded. */
702  if (err == 0)
703  stream->flags |= UV_HANDLE_SHUT;
704 
705  if (req->cb != NULL)
706  req->cb(req, err);
707  }
708 }
709 
710 
/* Writes the `n` buffers described by `vec` to `fd`. A single buffer goes
 * through write(2); anything else goes through writev(2). Returns whatever
 * the underlying call returns.
 */
static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n != 1)
    return writev(fd, vec, n);

  return write(fd, vec[0].iov_base, vec[0].iov_len);
}
717 
718 
720  size_t size;
721 
722  assert(req->bufs != NULL);
723  size = uv__count_bufs(req->bufs + req->write_index,
724  req->nbufs - req->write_index);
725  assert(req->handle->write_queue_size >= size);
726 
727  return size;
728 }
729 
730 
731 /* Returns 1 if all write request data has been written, or 0 if there is still
732  * more data to write.
733  *
734  * Note: the return value only says something about the *current* request.
735  * There may still be other write requests sitting in the queue.
736  */
738  uv_write_t* req,
739  size_t n) {
740  uv_buf_t* buf;
741  size_t len;
742 
743  assert(n <= stream->write_queue_size);
744  stream->write_queue_size -= n;
745 
746  buf = req->bufs + req->write_index;
747 
748  do {
749  len = n < buf->len ? n : buf->len;
750  buf->base += len;
751  buf->len -= len;
752  buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
753  n -= len;
754  } while (n > 0);
755 
756  req->write_index = buf - req->bufs;
757 
758  return req->write_index == req->nbufs;
759 }
760 
761 
763  uv_stream_t* stream = req->handle;
764 
765  /* Pop the req off tcp->write_queue. */
766  QUEUE_REMOVE(&req->queue);
767 
768  /* Only free when there was no error. On error, we touch up write_queue_size
769  * right before making the callback. The reason we don't do that right away
770  * is that a write_queue_size > 0 is our only way to signal to the user that
771  * they should stop writing - which they should if we got an error. Something
772  * to revisit in future revisions of the libuv API.
773  */
774  if (req->error == 0) {
775  if (req->bufs != req->bufsml)
776  uv__free(req->bufs);
777  req->bufs = NULL;
778  }
779 
780  /* Add it to the write_completed_queue where it will have its
781  * callback called in the near future.
782  */
783  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
784  uv__io_feed(stream->loop, &stream->io_watcher);
785 }
786 
787 
789  switch (handle->type) {
790  case UV_NAMED_PIPE:
791  case UV_TCP:
792  return ((uv_stream_t*) handle)->io_watcher.fd;
793 
794  case UV_UDP:
795  return ((uv_udp_t*) handle)->io_watcher.fd;
796 
797  default:
798  return -1;
799  }
800 }
801 
/* Attempt to write the request at the head of the stream's write queue.
 *
 * The request's remaining buffers (from write_index on) are treated as an
 * iovec array, capped at uv__getiovmax() entries. If the request carries a
 * send_handle, the data goes out through sendmsg() with an SCM_RIGHTS control
 * message holding that handle's descriptor; otherwise uv__writev() is used.
 * Calls are repeated while RETRY_ON_WRITE_ERROR() holds. An error that is not
 * transient jumps to `error`. On a blocking stream an unfinished request is
 * retried immediately; otherwise POLLOUT watching is started.
 *
 * NOTE(review): several lines appear to have been dropped from this listing:
 * the assignment of `req` from `q`, the statement(s) run when the request has
 * been written completely, the call following the "Notify select() thread"
 * comment, and part of the `error:` path. Confirm against the real file.
 */
802 static void uv__write(uv_stream_t* stream) {
803  struct iovec* iov;
804  QUEUE* q;
805  uv_write_t* req;
806  int iovmax;
807  int iovcnt;
808  ssize_t n;
809  int err;
810 
811 start:
812 
813  assert(uv__stream_fd(stream) >= 0);
814 
815  if (QUEUE_EMPTY(&stream->write_queue))
816  return;
817 
818  q = QUEUE_HEAD(&stream->write_queue);
820  assert(req->handle == stream);
821 
822  /*
823  * Cast to iovec. We had to have our own uv_buf_t instead of iovec
824  * because Windows's WSABUF is not an iovec.
825  */
826  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
827  iov = (struct iovec*) &(req->bufs[req->write_index]);
828  iovcnt = req->nbufs - req->write_index;
829 
830  iovmax = uv__getiovmax();
831 
832  /* Limit iov count to avoid EINVALs from writev() */
833  if (iovcnt > iovmax)
834  iovcnt = iovmax;
835 
836  /*
837  * Now do the actual writev. Note that we've been updating the pointers
838  * inside the iov each time we write. So there is no need to offset it.
839  */
840 
841  if (req->send_handle) {
842  int fd_to_send;
843  struct msghdr msg;
844  struct cmsghdr *cmsg;
  /* Control-message buffer; the union member gives it cmsghdr alignment. */
845  union {
846  char data[64];
847  struct cmsghdr alias;
848  } scratch;
849 
  /* Refuse to send a handle that is already closing. */
850  if (uv__is_closing(req->send_handle)) {
851  err = UV_EBADF;
852  goto error;
853  }
854 
855  fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
856 
857  memset(&scratch, 0, sizeof(scratch));
858 
859  assert(fd_to_send >= 0);
860 
861  msg.msg_name = NULL;
862  msg.msg_namelen = 0;
863  msg.msg_iov = iov;
864  msg.msg_iovlen = iovcnt;
865  msg.msg_flags = 0;
866 
867  msg.msg_control = &scratch.alias;
868  msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
869 
  /* A single SCM_RIGHTS control message carrying one descriptor. */
870  cmsg = CMSG_FIRSTHDR(&msg);
871  cmsg->cmsg_level = SOL_SOCKET;
872  cmsg->cmsg_type = SCM_RIGHTS;
873  cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
874 
875  /* silence aliasing warning */
876  {
877  void* pv = CMSG_DATA(cmsg);
878  int* pi = pv;
879  *pi = fd_to_send;
880  }
881 
882  do
883  n = sendmsg(uv__stream_fd(stream), &msg, 0);
884  while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
885 
886  /* Ensure the handle isn't sent again in case this is a partial write. */
887  if (n >= 0)
888  req->send_handle = NULL;
889  } else {
890  do
891  n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
892  while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
893  }
894 
  /* Anything other than a "try again later" error is fatal for this
   * request.
   */
895  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
896  err = UV__ERR(errno);
897  goto error;
898  }
899 
900  if (n >= 0 && uv__write_req_update(stream, req, n)) {
902  return; /* TODO(bnoordhuis) Start trying to write the next request. */
903  }
904 
905  /* If this is a blocking stream, try again. */
906  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
907  goto start;
908 
909  /* We're not done. */
910  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
911 
912  /* Notify select() thread about state change */
914 
915  return;
916 
917 error:
918  req->error = err;
920  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
921  if (!uv__io_active(&stream->io_watcher, POLLIN))
924 }
925 
926 
928  uv_write_t* req;
929  QUEUE* q;
930  QUEUE pq;
931 
932  if (QUEUE_EMPTY(&stream->write_completed_queue))
933  return;
934 
935  QUEUE_MOVE(&stream->write_completed_queue, &pq);
936 
937  while (!QUEUE_EMPTY(&pq)) {
938  /* Pop a req off write_completed_queue. */
939  q = QUEUE_HEAD(&pq);
941  QUEUE_REMOVE(q);
942  uv__req_unregister(stream->loop, req);
943 
944  if (req->bufs != NULL) {
945  stream->write_queue_size -= uv__write_req_size(req);
946  if (req->bufs != req->bufsml)
947  uv__free(req->bufs);
948  req->bufs = NULL;
949  }
950 
951  /* NOTE: call callback AFTER freeing the request data. */
952  if (req->cb)
953  req->cb(req, req->error);
954  }
955 }
956 
957 
959  struct sockaddr_storage ss;
960  socklen_t sslen;
961  socklen_t len;
962  int type;
963 
964  memset(&ss, 0, sizeof(ss));
965  sslen = sizeof(ss);
966 
967  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
968  return UV_UNKNOWN_HANDLE;
969 
970  len = sizeof type;
971 
972  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
973  return UV_UNKNOWN_HANDLE;
974 
975  if (type == SOCK_STREAM) {
976 #if defined(_AIX) || defined(__DragonFly__)
977  /* on AIX/DragonFly the getsockname call returns an empty sa structure
978  * for sockets of type AF_UNIX. For all other types it will
979  * return a properly filled in structure.
980  */
981  if (sslen == 0)
982  return UV_NAMED_PIPE;
983 #endif
984  switch (ss.ss_family) {
985  case AF_UNIX:
986  return UV_NAMED_PIPE;
987  case AF_INET:
988  case AF_INET6:
989  return UV_TCP;
990  }
991  }
992 
993  if (type == SOCK_DGRAM &&
994  (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
995  return UV_UDP;
996 
997  return UV_UNKNOWN_HANDLE;
998 }
999 
1000 
1002  stream->flags |= UV_HANDLE_READ_EOF;
1003  stream->flags &= ~UV_HANDLE_READING;
1004  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1005  if (!uv__io_active(&stream->io_watcher, POLLOUT))
1008  stream->read_cb(stream, UV_EOF, buf);
1009 }
1010 
1011 
1013  uv__stream_queued_fds_t* queued_fds;
1014  unsigned int queue_size;
1015 
1016  queued_fds = stream->queued_fds;
1017  if (queued_fds == NULL) {
1018  queue_size = 8;
1019  queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
1020  sizeof(*queued_fds));
1021  if (queued_fds == NULL)
1022  return UV_ENOMEM;
1023  queued_fds->size = queue_size;
1024  queued_fds->offset = 0;
1025  stream->queued_fds = queued_fds;
1026 
1027  /* Grow */
1028  } else if (queued_fds->size == queued_fds->offset) {
1029  queue_size = queued_fds->size + 8;
1030  queued_fds = uv__realloc(queued_fds,
1031  (queue_size - 1) * sizeof(*queued_fds->fds) +
1032  sizeof(*queued_fds));
1033 
1034  /*
1035  * Allocation failure, report back.
1036  * NOTE: if it is fatal - sockets will be closed in uv__stream_close
1037  */
1038  if (queued_fds == NULL)
1039  return UV_ENOMEM;
1040  queued_fds->size = queue_size;
1041  stream->queued_fds = queued_fds;
1042  }
1043 
1044  /* Put fd in a queue */
1045  queued_fds->fds[queued_fds->offset++] = fd;
1046 
1047  return 0;
1048 }
1049 
1050 
/* UV__CMSG_FD_COUNT: number of descriptors allowed for in one control
 * message (60 under __PASE__, 64 otherwise).
 * UV__CMSG_FD_SIZE: the same amount expressed in bytes of int payload.
 */
1051 #if defined(__PASE__)
1052 /* on IBMi PASE the control message length can not exceed 256. */
1053 # define UV__CMSG_FD_COUNT 60
1054 #else
1055 # define UV__CMSG_FD_COUNT 64
1056 #endif
1057 #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
1058 
1059 
1061  struct cmsghdr* cmsg;
1062 
1063  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
1064  char* start;
1065  char* end;
1066  int err;
1067  void* pv;
1068  int* pi;
1069  unsigned int i;
1070  unsigned int count;
1071 
1072  if (cmsg->cmsg_type != SCM_RIGHTS) {
1073  fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1074  cmsg->cmsg_type);
1075  continue;
1076  }
1077 
1078  /* silence aliasing warning */
1079  pv = CMSG_DATA(cmsg);
1080  pi = pv;
1081 
1082  /* Count available fds */
1083  start = (char*) cmsg;
1084  end = (char*) cmsg + cmsg->cmsg_len;
1085  count = 0;
1086  while (start + CMSG_LEN(count * sizeof(*pi)) < end)
1087  count++;
1088  assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
1089 
1090  for (i = 0; i < count; i++) {
1091  /* Already has accepted fd, queue now */
1092  if (stream->accepted_fd != -1) {
1093  err = uv__stream_queue_fd(stream, pi[i]);
1094  if (err != 0) {
1095  /* Close rest */
1096  for (; i < count; i++)
1097  uv__close(pi[i]);
1098  return err;
1099  }
1100  } else {
1101  stream->accepted_fd = pi[i];
1102  }
1103  }
1104  }
1105 
1106  return 0;
1107 }
1108 
1109 
1110 #ifdef __clang__
1111 # pragma clang diagnostic push
1112 # pragma clang diagnostic ignored "-Wgnu-folding-constant"
1113 # pragma clang diagnostic ignored "-Wvla-extension"
1114 #endif
1115 
/* Read from the stream and deliver the data through read_cb.
 *
 * Performs at most 32 reads per call, and only while read_cb is set and the
 * UV_HANDLE_READING flag is on. For each read a buffer is requested through
 * alloc_cb (suggested size 64 KiB); an empty buffer ends the call with
 * read_cb(stream, UV_ENOBUFS, &buf). Data is read with read(2), or with
 * uv__recvmsg() on IPC pipes so that control messages can be received.
 * EAGAIN/EWOULDBLOCK re-arms POLLIN (if still reading) and reports 0 to
 * read_cb; other errors are reported to read_cb and reading is stopped. A
 * read that does not fill the buffer sets UV_HANDLE_READ_PARTIAL and returns.
 *
 * NOTE(review): several lines appear to have been dropped from this listing,
 * among them the end-of-stream handling for nread == 0, the statements that
 * assign `err` in the IPC paths, and the statements guarded by the
 * `!uv__io_active(..., POLLOUT)` test. Confirm against the real file.
 */
1116 static void uv__read(uv_stream_t* stream) {
1117  uv_buf_t buf;
1118  ssize_t nread;
1119  struct msghdr msg;
1120  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
1121  int count;
1122  int err;
1123  int is_ipc;
1124 
1125  stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1126 
1127  /* Prevent loop starvation when the data comes in as fast as (or faster than)
1128  * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1129  */
1130  count = 32;
1131 
1132  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1133 
1134  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1135  * tcp->read_cb is NULL or not?
1136  */
1137  while (stream->read_cb
1138  && (stream->flags & UV_HANDLE_READING)
1139  && (count-- > 0)) {
1140  assert(stream->alloc_cb != NULL);
1141 
  /* Start from an empty buffer so that an alloc_cb which does nothing is
   * detected below.
   */
1142  buf = uv_buf_init(NULL, 0);
1143  stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1144  if (buf.base == NULL || buf.len == 0) {
1145  /* User indicates it can't or won't handle the read. */
1146  stream->read_cb(stream, UV_ENOBUFS, &buf);
1147  return;
1148  }
1149 
1150  assert(buf.base != NULL);
1151  assert(uv__stream_fd(stream) >= 0);
1152 
1153  if (!is_ipc) {
1154  do {
1155  nread = read(uv__stream_fd(stream), buf.base, buf.len);
1156  }
1157  while (nread < 0 && errno == EINTR);
1158  } else {
1159  /* ipc uses recvmsg */
1160  msg.msg_flags = 0;
1161  msg.msg_iov = (struct iovec*) &buf;
1162  msg.msg_iovlen = 1;
1163  msg.msg_name = NULL;
1164  msg.msg_namelen = 0;
1165  /* Set up to receive a descriptor even if one isn't in the message */
1166  msg.msg_controllen = sizeof(cmsg_space);
1167  msg.msg_control = cmsg_space;
1168 
1169  do {
1170  nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1171  }
1172  while (nread < 0 && errno == EINTR);
1173  }
1174 
1175  if (nread < 0) {
1176  /* Error */
1177  if (errno == EAGAIN || errno == EWOULDBLOCK) {
1178  /* Wait for the next one. */
1179  if (stream->flags & UV_HANDLE_READING) {
1180  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1182  }
  /* Report "nothing read" along with the buffer that was requested. */
1183  stream->read_cb(stream, 0, &buf);
1184 #if defined(__CYGWIN__) || defined(__MSYS__)
1185  } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1187  return;
1188 #endif
1189  } else {
1190  /* Error. User should call uv_close(). */
1191  stream->read_cb(stream, UV__ERR(errno), &buf);
1192  if (stream->flags & UV_HANDLE_READING) {
1193  stream->flags &= ~UV_HANDLE_READING;
1194  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1195  if (!uv__io_active(&stream->io_watcher, POLLOUT))
1198  }
1199  }
1200  return;
1201  } else if (nread == 0) {
  /* NOTE(review): the end-of-stream handling is not visible here. */
1203  return;
1204  } else {
1205  /* Successful read */
1206  ssize_t buflen = buf.len;
1207 
1208  if (is_ipc) {
  /* NOTE(review): the assignment of `err` is not visible here. */
1210  if (err != 0) {
1211  stream->read_cb(stream, err, &buf);
1212  return;
1213  }
1214  }
1215 
1216 #if defined(__MVS__)
1217  if (is_ipc && msg.msg_controllen > 0) {
1218  uv_buf_t blankbuf;
1219  int nread;
1220  struct iovec *old;
1221 
1222  blankbuf.base = 0;
1223  blankbuf.len = 0;
1224  old = msg.msg_iov;
1225  msg.msg_iov = (struct iovec*) &blankbuf;
1226  nread = 0;
1227  do {
1228  nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1230  if (err != 0) {
1231  stream->read_cb(stream, err, &buf);
1232  msg.msg_iov = old;
1233  return;
1234  }
1235  } while (nread == 0 && msg.msg_controllen > 0);
1236  msg.msg_iov = old;
1237  }
1238 #endif
1239  stream->read_cb(stream, nread, &buf);
1240 
1241  /* Return if we didn't fill the buffer, there is no more data to read. */
1242  if (nread < buflen) {
1243  stream->flags |= UV_HANDLE_READ_PARTIAL;
1244  return;
1245  }
1246  }
1247  }
1248 }
1249 
1250 
1251 #ifdef __clang__
1252 # pragma clang diagnostic pop
1253 #endif
1254 
1255 #undef UV__CMSG_FD_COUNT
1256 #undef UV__CMSG_FD_SIZE
1257 
1258 
1260  assert(stream->type == UV_TCP ||
1261  stream->type == UV_TTY ||
1262  stream->type == UV_NAMED_PIPE);
1263 
1264  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1265  stream->flags & UV_HANDLE_SHUT ||
1266  stream->flags & UV_HANDLE_SHUTTING ||
1268  return UV_ENOTCONN;
1269  }
1270 
1271  assert(uv__stream_fd(stream) >= 0);
1272 
1273  /* Initialize request */
1274  uv__req_init(stream->loop, req, UV_SHUTDOWN);
1275  req->handle = stream;
1276  req->cb = cb;
1277  stream->shutdown_req = req;
1278  stream->flags |= UV_HANDLE_SHUTTING;
1279 
1280  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1282 
1283  return 0;
1284 }
1285 
1286 
1287 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1289 
1290  stream = container_of(w, uv_stream_t, io_watcher);
1291 
1292  assert(stream->type == UV_TCP ||
1293  stream->type == UV_NAMED_PIPE ||
1294  stream->type == UV_TTY);
1295  assert(!(stream->flags & UV_HANDLE_CLOSING));
1296 
1297  if (stream->connect_req) {
1299  return;
1300  }
1301 
1302  assert(uv__stream_fd(stream) >= 0);
1303 
1304  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1305  if (events & (POLLIN | POLLERR | POLLHUP))
1306  uv__read(stream);
1307 
1308  if (uv__stream_fd(stream) == -1)
1309  return; /* read_cb closed stream. */
1310 
1311  /* Short-circuit iff POLLHUP is set, the user is still interested in read
1312  * events and uv__read() reported a partial read but not EOF. If the EOF
1313  * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1314  * have to do anything. If the partial read flag is not set, we can't
1315  * report the EOF yet because there is still data to read.
1316  */
1317  if ((events & POLLHUP) &&
1318  (stream->flags & UV_HANDLE_READING) &&
1319  (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1320  !(stream->flags & UV_HANDLE_READ_EOF)) {
1321  uv_buf_t buf = { NULL, 0 };
1323  }
1324 
1325  if (uv__stream_fd(stream) == -1)
1326  return; /* read_cb closed stream. */
1327 
1328  if (events & (POLLOUT | POLLERR | POLLHUP)) {
1329  uv__write(stream);
1331 
1332  /* Write queue drained. */
1333  if (QUEUE_EMPTY(&stream->write_queue))
1334  uv__drain(stream);
1335  }
1336 }
1337 
1338 
1345  int error;
1346  uv_connect_t* req = stream->connect_req;
1347  socklen_t errorsize = sizeof(int);
1348 
1349  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1350  assert(req);
1351 
1352  if (stream->delayed_error) {
1353  /* To smooth over the differences between unixes errors that
1354  * were reported synchronously on the first connect can be delayed
1355  * until the next tick--which is now.
1356  */
1357  error = stream->delayed_error;
1358  stream->delayed_error = 0;
1359  } else {
1360  /* Normal situation: we need to get the socket error from the kernel. */
1361  assert(uv__stream_fd(stream) >= 0);
1362  getsockopt(uv__stream_fd(stream),
1363  SOL_SOCKET,
1364  SO_ERROR,
1365  &error,
1366  &errorsize);
1367  error = UV__ERR(error);
1368  }
1369 
1370  if (error == UV__ERR(EINPROGRESS))
1371  return;
1372 
1373  stream->connect_req = NULL;
1374  uv__req_unregister(stream->loop, req);
1375 
1376  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1377  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1378  }
1379 
1380  if (req->cb)
1381  req->cb(req, error);
1382 
1383  if (uv__stream_fd(stream) == -1)
1384  return;
1385 
1386  if (error < 0) {
1387  uv__stream_flush_write_queue(stream, UV_ECANCELED);
1389  }
1390 }
1391 
1392 
1395  const uv_buf_t bufs[],
1396  unsigned int nbufs,
1397  uv_stream_t* send_handle,
1398  uv_write_cb cb) {
1399  int empty_queue;
1400 
1401  assert(nbufs > 0);
1402  assert((stream->type == UV_TCP ||
1403  stream->type == UV_NAMED_PIPE ||
1404  stream->type == UV_TTY) &&
1405  "uv_write (unix) does not yet support other types of streams");
1406 
1407  if (uv__stream_fd(stream) < 0)
1408  return UV_EBADF;
1409 
1410  if (!(stream->flags & UV_HANDLE_WRITABLE))
1411  return UV_EPIPE;
1412 
1413  if (send_handle) {
1414  if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1415  return UV_EINVAL;
1416 
1417  /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1418  * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1419  * evaluates to a function that operates on a uv_stream_t with a couple of
1420  * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1421  * which works but only by accident.
1422  */
1423  if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1424  return UV_EBADF;
1425 
1426 #if defined(__CYGWIN__) || defined(__MSYS__)
1427  /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1428  See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
1429  return UV_ENOSYS;
1430 #endif
1431  }
1432 
1433  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1434  * it means there are error-state requests in the write_completed_queue that
1435  * will touch up write_queue_size later, see also uv__write_req_finish().
1436  * We could check that write_queue is empty instead but that implies making
1437  * a write() syscall when we know that the handle is in error mode.
1438  */
1439  empty_queue = (stream->write_queue_size == 0);
1440 
1441  /* Initialize the req */
1442  uv__req_init(stream->loop, req, UV_WRITE);
1443  req->cb = cb;
1444  req->handle = stream;
1445  req->error = 0;
1446  req->send_handle = send_handle;
1447  QUEUE_INIT(&req->queue);
1448 
1449  req->bufs = req->bufsml;
1450  if (nbufs > ARRAY_SIZE(req->bufsml))
1451  req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1452 
1453  if (req->bufs == NULL)
1454  return UV_ENOMEM;
1455 
1456  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1457  req->nbufs = nbufs;
1458  req->write_index = 0;
1459  stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1460 
1461  /* Append the request to write_queue. */
1462  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1463 
1464  /* If the queue was empty when this function began, we should attempt to
1465  * do the write immediately. Otherwise start the write_watcher and wait
1466  * for the fd to become writable.
1467  */
1468  if (stream->connect_req) {
1469  /* Still connecting, do nothing. */
1470  }
1471  else if (empty_queue) {
1472  uv__write(stream);
1473  }
1474  else {
1475  /*
1476  * blocking streams should never have anything in the queue.
1477  * if this assert fires then somehow the blocking stream isn't being
1478  * sufficiently flushed in uv__write.
1479  */
1481  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1483  }
1484 
1485  return 0;
1486 }
1487 
1488 
1489 /* The buffers to be written must remain valid until the callback is called.
1490  * This is not required for the uv_buf_t array.
1491  */
1494  const uv_buf_t bufs[],
1495  unsigned int nbufs,
1496  uv_write_cb cb) {
1497  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1498 }
1499 
1500 
1502  /* Should not be called */
1503  abort();
1504 }
1505 
1506 
1508  const uv_buf_t bufs[],
1509  unsigned int nbufs) {
1510  int r;
1511  int has_pollout;
1512  size_t written;
1513  size_t req_size;
1514  uv_write_t req;
1515 
1516  /* Connecting or already writing some data */
1517  if (stream->connect_req != NULL || stream->write_queue_size != 0)
1518  return UV_EAGAIN;
1519 
1520  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
1521 
1522  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
1523  if (r != 0)
1524  return r;
1525 
1526  /* Remove not written bytes from write queue size */
1527  written = uv__count_bufs(bufs, nbufs);
1528  if (req.bufs != NULL)
1529  req_size = uv__write_req_size(&req);
1530  else
1531  req_size = 0;
1532  written -= req_size;
1533  stream->write_queue_size -= req_size;
1534 
1535  /* Unqueue request, regardless of immediateness */
1536  QUEUE_REMOVE(&req.queue);
1537  uv__req_unregister(stream->loop, &req);
1538  if (req.bufs != req.bufsml)
1539  uv__free(req.bufs);
1540  req.bufs = NULL;
1541 
1542  /* Do not poll for writable if we weren't polling before calling this */
1543  if (!has_pollout) {
1544  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1546  }
1547 
1548  if (written == 0 && req_size != 0)
1549  return req.error < 0 ? req.error : UV_EAGAIN;
1550  else
1551  return written;
1552 }
1553 
1554 
1556  uv_alloc_cb alloc_cb,
1557  uv_read_cb read_cb) {
1558  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1559  stream->type == UV_TTY);
1560 
1561  if (stream->flags & UV_HANDLE_CLOSING)
1562  return UV_EINVAL;
1563 
1564  if (!(stream->flags & UV_HANDLE_READABLE))
1565  return UV_ENOTCONN;
1566 
1567  /* The UV_HANDLE_READING flag is independent of the state of the stream - it
1568  * just expresses the state desired by the user.
1569  */
1570  stream->flags |= UV_HANDLE_READING;
1571 
1572  /* TODO: try to do the read inline? */
1573  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
1574  * not start the IO watcher.
1575  */
1576  assert(uv__stream_fd(stream) >= 0);
1577  assert(alloc_cb);
1578 
1579  stream->read_cb = read_cb;
1580  stream->alloc_cb = alloc_cb;
1581 
1582  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1585 
1586  return 0;
1587 }
1588 
1589 
1591  if (!(stream->flags & UV_HANDLE_READING))
1592  return 0;
1593 
1594  stream->flags &= ~UV_HANDLE_READING;
1595  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1596  if (!uv__io_active(&stream->io_watcher, POLLOUT))
1599 
1600  stream->read_cb = NULL;
1601  stream->alloc_cb = NULL;
1602  return 0;
1603 }
1604 
1605 
1607  return !!(stream->flags & UV_HANDLE_READABLE);
1608 }
1609 
1610 
1612  return !!(stream->flags & UV_HANDLE_WRITABLE);
1613 }
1614 
1615 
1616 #if defined(__APPLE__)
1617 int uv___stream_fd(const uv_stream_t* handle) {
1618  const uv__stream_select_t* s;
1619 
1620  assert(handle->type == UV_TCP ||
1621  handle->type == UV_TTY ||
1622  handle->type == UV_NAMED_PIPE);
1623 
1624  s = handle->select;
1625  if (s != NULL)
1626  return s->fd;
1627 
1628  return handle->io_watcher.fd;
1629 }
1630 #endif /* defined(__APPLE__) */
1631 
1632 
1634  unsigned int i;
1635  uv__stream_queued_fds_t* queued_fds;
1636 
1637 #if defined(__APPLE__)
1638  /* Terminate select loop first */
1639  if (handle->select != NULL) {
1640  uv__stream_select_t* s;
1641 
1642  s = handle->select;
1643 
1644  uv_sem_post(&s->close_sem);
1645  uv_sem_post(&s->async_sem);
1647  uv_thread_join(&s->thread);
1648  uv_sem_destroy(&s->close_sem);
1649  uv_sem_destroy(&s->async_sem);
1650  uv__close(s->fake_fd);
1651  uv__close(s->int_fd);
1652  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1653 
1654  handle->select = NULL;
1655  }
1656 #endif /* defined(__APPLE__) */
1657 
1658  uv__io_close(handle->loop, &handle->io_watcher);
1662 
1663  if (handle->io_watcher.fd != -1) {
1664  /* Don't close stdio file descriptors. Nothing good comes from it. */
1665  if (handle->io_watcher.fd > STDERR_FILENO)
1666  uv__close(handle->io_watcher.fd);
1667  handle->io_watcher.fd = -1;
1668  }
1669 
1670  if (handle->accepted_fd != -1) {
1671  uv__close(handle->accepted_fd);
1672  handle->accepted_fd = -1;
1673  }
1674 
1675  /* Close all queued fds */
1676  if (handle->queued_fds != NULL) {
1677  queued_fds = handle->queued_fds;
1678  for (i = 0; i < queued_fds->offset; i++)
1679  uv__close(queued_fds->fds[i]);
1680  uv__free(handle->queued_fds);
1681  handle->queued_fds = NULL;
1682  }
1683 
1684  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1685 }
1686 
1687 
1689  /* Don't need to check the file descriptor, uv__nonblock()
1690  * will fail with EBADF if it's not valid.
1691  */
1692  return uv__nonblock(uv__stream_fd(handle), !blocking);
1693 }
size_t len
Definition: 6502dis.c:15
#define ARRAY_SIZE(a)
lzma_index ** i
Definition: index.h:629
static const char * arg(RzAnalysis *a, csh *handle, cs_insn *insn, char *buf, int n)
Definition: arm_esil32.c:136
static bool err
Definition: armass.c:435
static mcore_handle handle
Definition: asm_mcore.c:8
#define NULL
Definition: cris-opc.c:27
#define r
Definition: crypto_rc6.c:12
#define w
Definition: crypto_rc6.c:13
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec req
Definition: sflib.h:128
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg nanosleep
Definition: sflib.h:128
static static fork write
Definition: sflib.h:33
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void count
Definition: sflib.h:98
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec struct timespec static rem const char static group const void start
Definition: sflib.h:133
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz writev
Definition: sflib.h:82
struct tab * done
Definition: enough.c:233
#define UV__ERR(x)
Definition: errno.h:29
ssize_t sread(int, void *, size_t, int)
voidpf void uLong size
Definition: ioapi.h:138
voidpf stream
Definition: ioapi.h:138
voidpf void * buf
Definition: ioapi.h:138
return memset(p, 0, total)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
static static fork const void static count static fd const char const char static newpath char char char static envp time_t static t const char static mode static whence const char static dir time_t static t unsigned static seconds const char struct utimbuf static buf static inc static sig const char static mode static oldfd struct tms static buf static getgid static geteuid const char static filename static arg static mask struct ustat static ubuf static getppid static setsid static egid sigset_t static set struct timeval struct timezone static tz select
Definition: sflib.h:108
static const char struct stat static buf struct stat static buf static vhangup int status
Definition: sflib.h:145
assert(limit<=UINT32_MAX/2)
while(len< limit &&buf1[len]==buf2[len])++len
int n
Definition: mipsasm.c:19
int type
Definition: mipsasm.c:17
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
#define QUEUE_EMPTY(q)
Definition: queue.h:39
#define QUEUE_HEAD(q)
Definition: queue.h:42
#define QUEUE_INSERT_TAIL(h, q)
Definition: queue.h:92
#define QUEUE_MOVE(h, n)
Definition: queue.h:72
#define QUEUE_INIT(q)
Definition: queue.h:45
void * QUEUE[2]
Definition: queue.h:21
#define QUEUE_REMOVE(q)
Definition: queue.h:101
static RzSocket * s
Definition: rtr.c:28
#define container_of(ptr, type, member)
Definition: rz_types.h:650
static struct sockaddr static addrlen static backlog const void static flags void flags
Definition: sfsocketcall.h:123
static int
Definition: sfsocketcall.h:114
static struct sockaddr static addrlen static backlog const void msg
Definition: sfsocketcall.h:119
#define SO_ERROR
Definition: sftypes.h:432
#define AF_UNIX
Definition: sftypes.h:285
#define SO_OOBINLINE
Definition: sftypes.h:438
#define CMSG_LEN(len)
Definition: sftypes.h:425
@ SCM_RIGHTS
Definition: sftypes.h:383
#define EINVAL
Definition: sftypes.h:132
#define CMSG_FIRSTHDR(mhdr)
Definition: sftypes.h:418
#define FD_ISSET(d, set)
Definition: sftypes.h:214
#define FD_SET(d, set)
Definition: sftypes.h:212
unsigned int uint32_t
Definition: sftypes.h:29
#define EINTR
Definition: sftypes.h:114
#define CMSG_NXTHDR(mhdr, cmsg)
Definition: sftypes.h:417
#define CMSG_DATA(cmsg)
Definition: sftypes.h:416
#define CMSG_SPACE(len)
Definition: sftypes.h:423
#define EINPROGRESS
Definition: sftypes.h:175
unsigned int socklen_t
Definition: sftypes.h:219
#define O_RDONLY
Definition: sftypes.h:486
@ SOCK_DGRAM
Definition: sftypes.h:227
@ SOCK_STREAM
Definition: sftypes.h:224
#define SO_TYPE
Definition: sftypes.h:431
#define SOL_SOCKET
Definition: sftypes.h:427
#define AF_INET
Definition: sftypes.h:287
#define AF_INET6
Definition: sftypes.h:295
#define ECONNRESET
Definition: sftypes.h:164
#define ENOTSOCK
Definition: sftypes.h:148
#define EAGAIN
Definition: sftypes.h:121
int ssize_t
Definition: sftypes.h:39
int cmsg_type
Definition: sftypes.h:412
size_t cmsg_len
Definition: sftypes.h:409
int cmsg_level
Definition: sftypes.h:411
Definition: sftypes.h:73
Definition: unix.h:96
unsigned int size
Definition: internal.h:152
unsigned int offset
Definition: internal.h:153
Definition: uv.h:844
Definition: unix.h:123
size_t len
Definition: unix.h:125
char * base
Definition: unix.h:124
Definition: uv.h:1780
Definition: uv.h:767
Definition: uv.h:547
Definition: uv.h:638
Definition: uv.h:525
uv_loop_t * loop
Definition: main.c:7
uv_async_t async
Definition: main.c:8
static uv_buf_t iov
Definition: main.c:15
uv_timer_t timeout
Definition: main.c:9
void uv__io_stop(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: core.c:910
int uv__open_cloexec(const char *path, int flags)
Definition: core.c:1003
int uv__getiovmax(void)
Definition: core.c:215
void uv__io_start(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: core.c:882
void uv__io_close(uv_loop_t *loop, uv__io_t *w)
Definition: core.c:942
int uv__io_active(const uv__io_t *w, unsigned int events)
Definition: core.c:958
int uv__accept(int sockfd)
Definition: core.c:489
ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags)
Definition: core.c:670
void uv__io_init(uv__io_t *w, uv__io_cb cb, int fd)
Definition: core.c:865
void uv__io_feed(uv_loop_t *loop, uv__io_t *w)
Definition: core.c:952
int uv__close(int fd)
Definition: core.c:569
int uv__tcp_keepalive(int fd, int on, unsigned int delay)
Definition: tcp.c:380
int uv_pipe_listen(uv_pipe_t *handle, int backlog, uv_connection_cb cb)
Definition: pipe.c:94
int uv_tcp_listen(uv_tcp_t *tcp, int backlog, uv_connection_cb cb)
Definition: tcp.c:328
#define uv__stream_fd(handle)
Definition: internal.h:282
#define uv__nonblock
Definition: internal.h:170
#define ROUND_UP(a, b)
Definition: internal.h:82
#define ACCESS_ONCE(type, var)
Definition: internal.h:79
int uv__tcp_nodelay(int fd, int on)
Definition: tcp.c:373
ut64 buflen
Definition: core.c:76
#define STDERR_FILENO
Definition: private.h:45
static char bufs[4][128]
Buffers for uint64_to_str() and uint64_to_nicestr()
Definition: util.c:18
static int uv__handle_fd(uv_handle_t *handle)
Definition: stream.c:788
static size_t uv__write_req_size(uv_write_t *req)
Definition: stream.c:719
int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
Definition: stream.c:1688
uv_handle_type uv__handle_type(int fd)
Definition: stream.c:958
int uv_listen(uv_stream_t *stream, int backlog, uv_connection_cb cb)
Definition: stream.c:656
void uv__server_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: stream.c:528
static int uv__write_req_update(uv_stream_t *stream, uv_write_t *req, size_t n)
Definition: stream.c:737
static void uv__write(uv_stream_t *stream)
Definition: stream.c:802
int uv_write2(uv_write_t *req, uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t *send_handle, uv_write_cb cb)
Definition: stream.c:1393
int uv__stream_open(uv_stream_t *stream, int fd, int flags)
Definition: stream.c:406
static void uv__read(uv_stream_t *stream)
Definition: stream.c:1116
int uv_accept(uv_stream_t *server, uv_stream_t *client)
Definition: stream.c:591
int uv_is_readable(const uv_stream_t *stream)
Definition: stream.c:1606
static int uv__stream_queue_fd(uv_stream_t *stream, int fd)
Definition: stream.c:1012
void uv__stream_flush_write_queue(uv_stream_t *stream, int error)
Definition: stream.c:443
static ssize_t uv__writev(int fd, struct iovec *vec, size_t n)
Definition: stream.c:711
static void uv__stream_eof(uv_stream_t *stream, const uv_buf_t *buf)
Definition: stream.c:1001
#define IS_TRANSIENT_WRITE_ERROR(errno, send_handle)
Definition: stream.c:73
static void uv__stream_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: stream.c:1287
#define UV__CMSG_FD_SIZE
Definition: stream.c:1057
static void uv__stream_connect(uv_stream_t *)
Definition: stream.c:1344
static void uv__write_req_finish(uv_write_t *req)
Definition: stream.c:762
void uv__stream_destroy(uv_stream_t *stream)
Definition: stream.c:458
static void uv__write_callbacks(uv_stream_t *stream)
Definition: stream.c:927
static void uv__drain(uv_stream_t *stream)
Definition: stream.c:679
void uv__stream_close(uv_stream_t *handle)
Definition: stream.c:1633
int uv_try_write(uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs)
Definition: stream.c:1507
#define RETRY_ON_WRITE_ERROR(errno)
Definition: stream.c:72
static int uv__emfile_trick(uv_loop_t *loop, int accept_fd)
Definition: stream.c:497
int uv_shutdown(uv_shutdown_t *req, uv_stream_t *stream, uv_shutdown_cb cb)
Definition: stream.c:1259
int uv_read_start(uv_stream_t *stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
Definition: stream.c:1555
void uv_try_write_cb(uv_write_t *req, int status)
Definition: stream.c:1501
#define UV_DEC_BACKLOG(w)
Definition: stream.c:524
static void uv__stream_osx_interrupt_select(uv_stream_t *stream)
Definition: stream.c:123
static int uv__stream_recv_cmsg(uv_stream_t *stream, struct msghdr *msg)
Definition: stream.c:1060
int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
Definition: stream.c:1492
int uv_read_stop(uv_stream_t *stream)
Definition: stream.c:1590
void uv__stream_init(uv_loop_t *loop, uv_stream_t *stream, uv_handle_type type)
Definition: stream.c:85
int uv_is_writable(const uv_stream_t *stream)
Definition: stream.c:1611
UV_PLATFORM_SEM_T uv_sem_t
Definition: unix.h:139
pthread_t uv_thread_t
Definition: unix.h:136
void error(const char *msg)
Definition: untgz.c:593
void * uv__realloc(void *ptr, size_t size)
Definition: uv-common.c:96
size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs)
Definition: uv-common.c:573
void * uv__malloc(size_t size)
Definition: uv-common.c:75
void uv__free(void *ptr)
Definition: uv-common.c:81
@ UV_HANDLE_TCP_KEEPALIVE
Definition: uv-common.h:106
@ UV_HANDLE_READ_PARTIAL
Definition: uv-common.h:86
@ UV_HANDLE_TCP_NODELAY
Definition: uv-common.h:105
@ UV_HANDLE_BLOCKING_WRITES
Definition: uv-common.h:98
@ UV_HANDLE_TCP_SINGLE_ACCEPT
Definition: uv-common.h:107
@ UV_HANDLE_READING
Definition: uv-common.h:90
@ UV_HANDLE_READ_EOF
Definition: uv-common.h:87
@ UV_HANDLE_SHUT
Definition: uv-common.h:85
@ UV_HANDLE_CLOSING
Definition: uv-common.h:74
@ UV_HANDLE_SHUTTING
Definition: uv-common.h:84
@ UV_HANDLE_WRITABLE
Definition: uv-common.h:93
@ UV_HANDLE_CLOSED
Definition: uv-common.h:75
@ UV_HANDLE_INTERNAL
Definition: uv-common.h:78
@ UV_HANDLE_BOUND
Definition: uv-common.h:91
@ UV_HANDLE_READABLE
Definition: uv-common.h:92
#define uv__handle_unref(h)
Definition: uv-common.h:283
#define uv__is_closing(h)
Definition: uv-common.h:255
#define uv__handle_init(loop_, h, type_)
Definition: uv-common.h:301
#define uv__handle_stop(h)
Definition: uv-common.h:266
#define uv__req_init(loop, req, typ)
Definition: uv-common.h:329
#define uv__req_unregister(loop, req)
Definition: uv-common.h:230
#define uv__handle_start(h)
Definition: uv-common.h:258
void(* uv_write_cb)(uv_write_t *req, int status)
Definition: uv.h:315
void(* uv_connection_cb)(uv_stream_t *server, int status)
Definition: uv.h:318
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
Definition: thread.c:272
uv_handle_type
Definition: uv.h:189
@ UV_UNKNOWN_HANDLE
Definition: uv.h:190
UV_EXTERN void uv_sem_destroy(uv_sem_t *sem)
Definition: thread.c:662
UV_EXTERN uv_buf_t uv_buf_init(char *base, unsigned int len)
Definition: uv-common.c:157
UV_EXTERN void uv_sem_post(uv_sem_t *sem)
Definition: thread.c:670
UV_EXTERN int uv_sem_trywait(uv_sem_t *sem)
Definition: thread.c:686
UV_EXTERN int uv_udp_open(uv_udp_t *handle, uv_os_sock_t sock)
Definition: udp.c:991
void(* uv_read_cb)(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Definition: uv.h:312
void(* uv_alloc_cb)(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
Definition: uv.h:309
UV_EXTERN void uv_close(uv_handle_t *handle, uv_close_cb close_cb)
Definition: core.c:108
UV_EXTERN int uv_thread_create(uv_thread_t *tid, uv_thread_cb entry, void *arg)
Definition: thread.c:210
UV_EXTERN int uv_async_init(uv_loop_t *, uv_async_t *async, uv_async_cb async_cb)
Definition: async.c:45
UV_EXTERN int uv_async_send(uv_async_t *async)
Definition: async.c:63
UV_EXTERN void uv_sem_wait(uv_sem_t *sem)
Definition: thread.c:678
UV_EXTERN int uv_sem_init(uv_sem_t *sem, unsigned int value)
Definition: thread.c:650
void(* uv_shutdown_cb)(uv_shutdown_t *req, int status)
Definition: uv.h:317
uv_pipe_t queue
Definition: worker.c:9
static const z80_opcode fd[]
Definition: z80_tab.h:997
static const char * cb[]
Definition: z80_tab.h:176
int read(izstream &zs, T *x, Items items)
Definition: zstream.h:115