Rizin
unix-like reverse engineering framework and cli tools
stream.c File Reference
#include "uv.h"
#include "internal.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h>

Go to the source code of this file.

Macros

#define RETRY_ON_WRITE_ERROR(errno)   (errno == EINTR)
 
#define IS_TRANSIENT_WRITE_ERROR(errno, send_handle)    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
 
#define UV_DEC_BACKLOG(w)   /* no-op */
 
#define UV__CMSG_FD_COUNT   64
 
#define UV__CMSG_FD_SIZE   (UV__CMSG_FD_COUNT * sizeof(int))
 

Functions

static void uv__stream_connect (uv_stream_t *)
 
static void uv__write (uv_stream_t *stream)
 
static void uv__read (uv_stream_t *stream)
 
static void uv__stream_io (uv_loop_t *loop, uv__io_t *w, unsigned int events)
 
static void uv__write_callbacks (uv_stream_t *stream)
 
static size_t uv__write_req_size (uv_write_t *req)
 
void uv__stream_init (uv_loop_t *loop, uv_stream_t *stream, uv_handle_type type)
 
static void uv__stream_osx_interrupt_select (uv_stream_t *stream)
 
int uv__stream_open (uv_stream_t *stream, int fd, int flags)
 
void uv__stream_flush_write_queue (uv_stream_t *stream, int error)
 
void uv__stream_destroy (uv_stream_t *stream)
 
static int uv__emfile_trick (uv_loop_t *loop, int accept_fd)
 
void uv__server_io (uv_loop_t *loop, uv__io_t *w, unsigned int events)
 
int uv_accept (uv_stream_t *server, uv_stream_t *client)
 
int uv_listen (uv_stream_t *stream, int backlog, uv_connection_cb cb)
 
static void uv__drain (uv_stream_t *stream)
 
static ssize_t uv__writev (int fd, struct iovec *vec, size_t n)
 
static int uv__write_req_update (uv_stream_t *stream, uv_write_t *req, size_t n)
 
static void uv__write_req_finish (uv_write_t *req)
 
static int uv__handle_fd (uv_handle_t *handle)
 
uv_handle_type uv__handle_type (int fd)
 
static void uv__stream_eof (uv_stream_t *stream, const uv_buf_t *buf)
 
static int uv__stream_queue_fd (uv_stream_t *stream, int fd)
 
static int uv__stream_recv_cmsg (uv_stream_t *stream, struct msghdr *msg)
 
int uv_shutdown (uv_shutdown_t *req, uv_stream_t *stream, uv_shutdown_cb cb)
 
int uv_write2 (uv_write_t *req, uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t *send_handle, uv_write_cb cb)
 
int uv_write (uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
 
void uv_try_write_cb (uv_write_t *req, int status)
 
int uv_try_write (uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs)
 
int uv_read_start (uv_stream_t *stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
 
int uv_read_stop (uv_stream_t *stream)
 
int uv_is_readable (const uv_stream_t *stream)
 
int uv_is_writable (const uv_stream_t *stream)
 
void uv__stream_close (uv_stream_t *handle)
 
int uv_stream_set_blocking (uv_stream_t *handle, int blocking)
 

Macro Definition Documentation

◆ IS_TRANSIENT_WRITE_ERROR

#define IS_TRANSIENT_WRITE_ERROR (   errno,
  send_handle 
)     (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)

Definition at line 73 of file stream.c.

◆ RETRY_ON_WRITE_ERROR

#define RETRY_ON_WRITE_ERROR (   errno)    (errno == EINTR)

Definition at line 72 of file stream.c.

◆ UV__CMSG_FD_COUNT

#define UV__CMSG_FD_COUNT   64

Definition at line 1055 of file stream.c.

◆ UV__CMSG_FD_SIZE

#define UV__CMSG_FD_SIZE   (UV__CMSG_FD_COUNT * sizeof(int))

Definition at line 1057 of file stream.c.

◆ UV_DEC_BACKLOG

#define UV_DEC_BACKLOG (   w)    /* no-op */

Definition at line 524 of file stream.c.

Function Documentation

◆ uv__drain()

static void uv__drain ( uv_stream_t * stream)
static

Definition at line 679 of file stream.c.

679  {
681  int err;
682 
683  assert(QUEUE_EMPTY(&stream->write_queue));
684  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
686 
687  /* Shutdown? */
688  if ((stream->flags & UV_HANDLE_SHUTTING) &&
689  !(stream->flags & UV_HANDLE_CLOSING) &&
690  !(stream->flags & UV_HANDLE_SHUT)) {
691  assert(stream->shutdown_req);
692 
693  req = stream->shutdown_req;
694  stream->shutdown_req = NULL;
695  stream->flags &= ~UV_HANDLE_SHUTTING;
696  uv__req_unregister(stream->loop, req);
697 
698  err = 0;
699  if (shutdown(uv__stream_fd(stream), SHUT_WR))
700  err = UV__ERR(errno);
701 
702  if (err == 0)
703  stream->flags |= UV_HANDLE_SHUT;
704 
705  if (req->cb != NULL)
706  req->cb(req, err);
707  }
708 }
static bool err
Definition: armass.c:435
#define NULL
Definition: cris-opc.c:27
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec req
Definition: sflib.h:128
#define UV__ERR(x)
Definition: errno.h:29
voidpf stream
Definition: ioapi.h:138
assert(limit<=UINT32_MAX/2)
#define QUEUE_EMPTY(q)
Definition: queue.h:39
void uv__io_stop(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: core.c:910
#define uv__stream_fd(handle)
Definition: internal.h:282
static void uv__stream_osx_interrupt_select(uv_stream_t *stream)
Definition: stream.c:123
@ UV_HANDLE_SHUT
Definition: uv-common.h:85
@ UV_HANDLE_CLOSING
Definition: uv-common.h:74
@ UV_HANDLE_SHUTTING
Definition: uv-common.h:84
#define uv__req_unregister(loop, req)
Definition: uv-common.h:230

References assert(), err, NULL, QUEUE_EMPTY, req, UV__ERR, uv__io_stop(), uv__req_unregister, uv__stream_fd, uv__stream_osx_interrupt_select(), UV_HANDLE_CLOSING, UV_HANDLE_SHUT, and UV_HANDLE_SHUTTING.

Referenced by uv__stream_io().

◆ uv__emfile_trick()

static int uv__emfile_trick ( uv_loop_t * loop,
int  accept_fd 
)
static

Definition at line 497 of file stream.c.

497  {
498  int err;
499  int emfile_fd;
500 
501  if (loop->emfile_fd == -1)
502  return UV_EMFILE;
503 
504  uv__close(loop->emfile_fd);
505  loop->emfile_fd = -1;
506 
507  do {
508  err = uv__accept(accept_fd);
509  if (err >= 0)
510  uv__close(err);
511  } while (err >= 0 || err == UV_EINTR);
512 
513  emfile_fd = uv__open_cloexec("/", O_RDONLY);
514  if (emfile_fd >= 0)
515  loop->emfile_fd = emfile_fd;
516 
517  return err;
518 }
#define O_RDONLY
Definition: sftypes.h:486
uv_loop_t * loop
Definition: main.c:7
int uv__open_cloexec(const char *path, int flags)
Definition: core.c:1003
int uv__accept(int sockfd)
Definition: core.c:489
int uv__close(int fd)
Definition: core.c:569

References err, loop, O_RDONLY, uv__accept(), uv__close(), and uv__open_cloexec().

Referenced by uv__server_io().

◆ uv__handle_fd()

static int uv__handle_fd ( uv_handle_t * handle)
static

Definition at line 788 of file stream.c.

788  {
789  switch (handle->type) {
790  case UV_NAMED_PIPE:
791  case UV_TCP:
792  return ((uv_stream_t*) handle)->io_watcher.fd;
793 
794  case UV_UDP:
795  return ((uv_udp_t*) handle)->io_watcher.fd;
796 
797  default:
798  return -1;
799  }
800 }
static mcore_handle handle
Definition: asm_mcore.c:8
Definition: uv.h:638

References handle.

Referenced by uv__write(), and uv_write2().

◆ uv__handle_type()

uv_handle_type uv__handle_type ( int  fd)

Definition at line 958 of file stream.c.

958  {
959  struct sockaddr_storage ss;
960  socklen_t sslen;
961  socklen_t len;
962  int type;
963 
964  memset(&ss, 0, sizeof(ss));
965  sslen = sizeof(ss);
966 
967  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
968  return UV_UNKNOWN_HANDLE;
969 
970  len = sizeof type;
971 
972  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
973  return UV_UNKNOWN_HANDLE;
974 
975  if (type == SOCK_STREAM) {
976 #if defined(_AIX) || defined(__DragonFly__)
977  /* on AIX/DragonFly the getsockname call returns an empty sa structure
978  * for sockets of type AF_UNIX. For all other types it will
979  * return a properly filled in structure.
980  */
981  if (sslen == 0)
982  return UV_NAMED_PIPE;
983 #endif
984  switch (ss.ss_family) {
985  case AF_UNIX:
986  return UV_NAMED_PIPE;
987  case AF_INET:
988  case AF_INET6:
989  return UV_TCP;
990  }
991  }
992 
993  if (type == SOCK_DGRAM &&
994  (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
995  return UV_UDP;
996 
997  return UV_UNKNOWN_HANDLE;
998 }
size_t len
Definition: 6502dis.c:15
return memset(p, 0, total)
int type
Definition: mipsasm.c:17
#define AF_UNIX
Definition: sftypes.h:285
unsigned int socklen_t
Definition: sftypes.h:219
@ SOCK_DGRAM
Definition: sftypes.h:227
@ SOCK_STREAM
Definition: sftypes.h:224
#define SO_TYPE
Definition: sftypes.h:431
#define SOL_SOCKET
Definition: sftypes.h:427
#define AF_INET
Definition: sftypes.h:287
#define AF_INET6
Definition: sftypes.h:295
@ UV_UNKNOWN_HANDLE
Definition: uv.h:190
static const z80_opcode fd[]
Definition: z80_tab.h:997

References AF_INET, AF_INET6, AF_UNIX, fd, len, memset(), SO_TYPE, SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, type, and UV_UNKNOWN_HANDLE.

Referenced by uv_pipe_pending_type().

◆ uv__read()

static void uv__read ( uv_stream_t * stream)
static

Definition at line 1116 of file stream.c.

1116  {
1117  uv_buf_t buf;
1118  ssize_t nread;
1119  struct msghdr msg;
1120  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
1121  int count;
1122  int err;
1123  int is_ipc;
1124 
1125  stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1126 
1127  /* Prevent loop starvation when the data comes in as fast as (or faster than)
1128  * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1129  */
1130  count = 32;
1131 
1132  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1133 
1134  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1135  * tcp->read_cb is NULL or not?
1136  */
1137  while (stream->read_cb
1138  && (stream->flags & UV_HANDLE_READING)
1139  && (count-- > 0)) {
1140  assert(stream->alloc_cb != NULL);
1141 
1142  buf = uv_buf_init(NULL, 0);
1143  stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1144  if (buf.base == NULL || buf.len == 0) {
1145  /* User indicates it can't or won't handle the read. */
1146  stream->read_cb(stream, UV_ENOBUFS, &buf);
1147  return;
1148  }
1149 
1150  assert(buf.base != NULL);
1151  assert(uv__stream_fd(stream) >= 0);
1152 
1153  if (!is_ipc) {
1154  do {
1155  nread = read(uv__stream_fd(stream), buf.base, buf.len);
1156  }
1157  while (nread < 0 && errno == EINTR);
1158  } else {
1159  /* ipc uses recvmsg */
1160  msg.msg_flags = 0;
1161  msg.msg_iov = (struct iovec*) &buf;
1162  msg.msg_iovlen = 1;
1163  msg.msg_name = NULL;
1164  msg.msg_namelen = 0;
1165  /* Set up to receive a descriptor even if one isn't in the message */
1166  msg.msg_controllen = sizeof(cmsg_space);
1167  msg.msg_control = cmsg_space;
1168 
1169  do {
1170  nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1171  }
1172  while (nread < 0 && errno == EINTR);
1173  }
1174 
1175  if (nread < 0) {
1176  /* Error */
1177  if (errno == EAGAIN || errno == EWOULDBLOCK) {
1178  /* Wait for the next one. */
1179  if (stream->flags & UV_HANDLE_READING) {
1180  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1182  }
1183  stream->read_cb(stream, 0, &buf);
1184 #if defined(__CYGWIN__) || defined(__MSYS__)
1185  } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1187  return;
1188 #endif
1189  } else {
1190  /* Error. User should call uv_close(). */
1191  stream->read_cb(stream, UV__ERR(errno), &buf);
1192  if (stream->flags & UV_HANDLE_READING) {
1193  stream->flags &= ~UV_HANDLE_READING;
1194  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1195  if (!uv__io_active(&stream->io_watcher, POLLOUT))
1198  }
1199  }
1200  return;
1201  } else if (nread == 0) {
1203  return;
1204  } else {
1205  /* Successful read */
1206  ssize_t buflen = buf.len;
1207 
1208  if (is_ipc) {
1210  if (err != 0) {
1211  stream->read_cb(stream, err, &buf);
1212  return;
1213  }
1214  }
1215 
1216 #if defined(__MVS__)
1217  if (is_ipc && msg.msg_controllen > 0) {
1218  uv_buf_t blankbuf;
1219  int nread;
1220  struct iovec *old;
1221 
1222  blankbuf.base = 0;
1223  blankbuf.len = 0;
1224  old = msg.msg_iov;
1225  msg.msg_iov = (struct iovec*) &blankbuf;
1226  nread = 0;
1227  do {
1228  nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1230  if (err != 0) {
1231  stream->read_cb(stream, err, &buf);
1232  msg.msg_iov = old;
1233  return;
1234  }
1235  } while (nread == 0 && msg.msg_controllen > 0);
1236  msg.msg_iov = old;
1237  }
1238 #endif
1239  stream->read_cb(stream, nread, &buf);
1240 
1241  /* Return if we didn't fill the buffer, there is no more data to read. */
1242  if (nread < buflen) {
1243  stream->flags |= UV_HANDLE_READ_PARTIAL;
1244  return;
1245  }
1246  }
1247  }
1248 }
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void count
Definition: sflib.h:98
voidpf void * buf
Definition: ioapi.h:138
static struct sockaddr static addrlen static backlog const void msg
Definition: sfsocketcall.h:119
#define EINTR
Definition: sftypes.h:114
#define CMSG_SPACE(len)
Definition: sftypes.h:423
#define ECONNRESET
Definition: sftypes.h:164
#define EAGAIN
Definition: sftypes.h:121
int ssize_t
Definition: sftypes.h:39
Definition: sftypes.h:73
Definition: unix.h:123
size_t len
Definition: unix.h:125
char * base
Definition: unix.h:124
Definition: uv.h:767
void uv__io_start(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: core.c:882
int uv__io_active(const uv__io_t *w, unsigned int events)
Definition: core.c:958
ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags)
Definition: core.c:670
ut64 buflen
Definition: core.c:76
static void uv__stream_eof(uv_stream_t *stream, const uv_buf_t *buf)
Definition: stream.c:1001
#define UV__CMSG_FD_SIZE
Definition: stream.c:1057
static int uv__stream_recv_cmsg(uv_stream_t *stream, struct msghdr *msg)
Definition: stream.c:1060
@ UV_HANDLE_READ_PARTIAL
Definition: uv-common.h:86
@ UV_HANDLE_READING
Definition: uv-common.h:90
#define uv__handle_stop(h)
Definition: uv-common.h:266
UV_EXTERN uv_buf_t uv_buf_init(char *base, unsigned int len)
Definition: uv-common.c:157
int read(izstream &zs, T *x, Items items)
Definition: zstream.h:115

References assert(), uv_buf_t::base, buflen, CMSG_SPACE, count, EAGAIN, ECONNRESET, EINTR, err, uv_buf_t::len, msg, NULL, read(), UV__CMSG_FD_SIZE, UV__ERR, uv__handle_stop, uv__io_active(), uv__io_start(), uv__io_stop(), uv__recvmsg(), uv__stream_eof(), uv__stream_fd, uv__stream_osx_interrupt_select(), uv__stream_recv_cmsg(), uv_buf_init(), UV_HANDLE_READ_PARTIAL, and UV_HANDLE_READING.

Referenced by uv__stream_io().

◆ uv__server_io()

void uv__server_io ( uv_loop_t * loop,
uv__io_t * w,
unsigned int  events 
)

Definition at line 528 of file stream.c.

528  {
530  int err;
531 
532  stream = container_of(w, uv_stream_t, io_watcher);
533  assert(events & POLLIN);
534  assert(stream->accepted_fd == -1);
535  assert(!(stream->flags & UV_HANDLE_CLOSING));
536 
537  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
538 
539  /* connection_cb can close the server socket while we're
540  * in the loop so check it on each iteration.
541  */
542  while (uv__stream_fd(stream) != -1) {
543  assert(stream->accepted_fd == -1);
544 
545 #if defined(UV_HAVE_KQUEUE)
546  if (w->rcount <= 0)
547  return;
548 #endif /* defined(UV_HAVE_KQUEUE) */
549 
551  if (err < 0) {
552  if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
553  return; /* Not an error. */
554 
555  if (err == UV_ECONNABORTED)
556  continue; /* Ignore. Nothing we can do about that. */
557 
558  if (err == UV_EMFILE || err == UV_ENFILE) {
560  if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
561  break;
562  }
563 
564  stream->connection_cb(stream, err);
565  continue;
566  }
567 
569  stream->accepted_fd = err;
570  stream->connection_cb(stream, 0);
571 
572  if (stream->accepted_fd != -1) {
573  /* The user hasn't yet accepted called uv_accept() */
574  uv__io_stop(loop, &stream->io_watcher, POLLIN);
575  return;
576  }
577 
578  if (stream->type == UV_TCP &&
579  (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
580  /* Give other processes a chance to accept connections. */
581  struct timespec timeout = { 0, 1 };
583  }
584  }
585 }
#define w
Definition: crypto_rc6.c:13
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg nanosleep
Definition: sflib.h:128
#define container_of(ptr, type, member)
Definition: rz_types.h:650
uv_timer_t timeout
Definition: main.c:9
static int uv__emfile_trick(uv_loop_t *loop, int accept_fd)
Definition: stream.c:497
#define UV_DEC_BACKLOG(w)
Definition: stream.c:524
@ UV_HANDLE_TCP_SINGLE_ACCEPT
Definition: uv-common.h:107

References assert(), container_of, err, loop, nanosleep, NULL, timeout, uv__accept(), uv__emfile_trick(), UV__ERR, uv__io_start(), uv__io_stop(), uv__stream_fd, UV_DEC_BACKLOG, UV_HANDLE_CLOSING, UV_HANDLE_TCP_SINGLE_ACCEPT, and w.

Referenced by uv_pipe_listen(), and uv_tcp_listen().

◆ uv__stream_close()

void uv__stream_close ( uv_stream_t * handle)

Definition at line 1633 of file stream.c.

1633  {
1634  unsigned int i;
1635  uv__stream_queued_fds_t* queued_fds;
1636 
1637 #if defined(__APPLE__)
1638  /* Terminate select loop first */
1639  if (handle->select != NULL) {
1640  uv__stream_select_t* s;
1641 
1642  s = handle->select;
1643 
1644  uv_sem_post(&s->close_sem);
1645  uv_sem_post(&s->async_sem);
1647  uv_thread_join(&s->thread);
1648  uv_sem_destroy(&s->close_sem);
1649  uv_sem_destroy(&s->async_sem);
1650  uv__close(s->fake_fd);
1651  uv__close(s->int_fd);
1652  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1653 
1654  handle->select = NULL;
1655  }
1656 #endif /* defined(__APPLE__) */
1657 
1658  uv__io_close(handle->loop, &handle->io_watcher);
1662 
1663  if (handle->io_watcher.fd != -1) {
1664  /* Don't close stdio file descriptors. Nothing good comes from it. */
1665  if (handle->io_watcher.fd > STDERR_FILENO)
1666  uv__close(handle->io_watcher.fd);
1667  handle->io_watcher.fd = -1;
1668  }
1669 
1670  if (handle->accepted_fd != -1) {
1671  uv__close(handle->accepted_fd);
1672  handle->accepted_fd = -1;
1673  }
1674 
1675  /* Close all queued fds */
1676  if (handle->queued_fds != NULL) {
1677  queued_fds = handle->queued_fds;
1678  for (i = 0; i < queued_fds->offset; i++)
1679  uv__close(queued_fds->fds[i]);
1680  uv__free(handle->queued_fds);
1681  handle->queued_fds = NULL;
1682  }
1683 
1684  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1685 }
lzma_index ** i
Definition: index.h:629
static RzSocket * s
Definition: rtr.c:28
unsigned int offset
Definition: internal.h:153
void uv__io_close(uv_loop_t *loop, uv__io_t *w)
Definition: core.c:942
#define STDERR_FILENO
Definition: private.h:45
int uv_read_stop(uv_stream_t *stream)
Definition: stream.c:1590
void uv__free(void *ptr)
Definition: uv-common.c:81
@ UV_HANDLE_WRITABLE
Definition: uv-common.h:93
@ UV_HANDLE_READABLE
Definition: uv-common.h:92
UV_EXTERN int uv_thread_join(uv_thread_t *tid)
Definition: thread.c:272
UV_EXTERN void uv_sem_destroy(uv_sem_t *sem)
Definition: thread.c:662
UV_EXTERN void uv_sem_post(uv_sem_t *sem)
Definition: thread.c:670
UV_EXTERN void uv_close(uv_handle_t *handle, uv_close_cb close_cb)
Definition: core.c:108

References assert(), uv__stream_queued_fds_s::fds, handle, i, NULL, uv__stream_queued_fds_s::offset, s, STDERR_FILENO, uv__close(), uv__free(), uv__handle_stop, uv__io_active(), uv__io_close(), uv__stream_osx_interrupt_select(), uv_close(), UV_HANDLE_READABLE, UV_HANDLE_WRITABLE, uv_read_stop(), uv_sem_destroy(), uv_sem_post(), and uv_thread_join().

Referenced by uv__pipe_close(), uv__process_close_stream(), uv__tcp_close(), and uv_close().

◆ uv__stream_connect()

static void uv__stream_connect ( uv_stream_t * stream)
static

We get called here directly following a call to connect(2). In order to determine whether we've errored out or succeeded, we must call getsockopt.

Definition at line 1344 of file stream.c.

1344  {
1345  int error;
1346  uv_connect_t* req = stream->connect_req;
1347  socklen_t errorsize = sizeof(int);
1348 
1349  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1350  assert(req);
1351 
1352  if (stream->delayed_error) {
1353  /* To smooth over the differences between unixes, errors that
1354  * were reported synchronously on the first connect can be delayed
1355  * until the next tick--which is now.
1356  */
1357  error = stream->delayed_error;
1358  stream->delayed_error = 0;
1359  } else {
1360  /* Normal situation: we need to get the socket error from the kernel. */
1361  assert(uv__stream_fd(stream) >= 0);
1362  getsockopt(uv__stream_fd(stream),
1363  SOL_SOCKET,
1364  SO_ERROR,
1365  &error,
1366  &errorsize);
1367  error = UV__ERR(error);
1368  }
1369 
1370  if (error == UV__ERR(EINPROGRESS))
1371  return;
1372 
1373  stream->connect_req = NULL;
1374  uv__req_unregister(stream->loop, req);
1375 
1376  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1377  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1378  }
1379 
1380  if (req->cb)
1381  req->cb(req, error);
1382 
1383  if (uv__stream_fd(stream) == -1)
1384  return;
1385 
1386  if (error < 0) {
1387  uv__stream_flush_write_queue(stream, UV_ECANCELED);
1389  }
1390 }
static int
Definition: sfsocketcall.h:114
#define SO_ERROR
Definition: sftypes.h:432
#define EINPROGRESS
Definition: sftypes.h:175
void uv__stream_flush_write_queue(uv_stream_t *stream, int error)
Definition: stream.c:443
static void uv__write_callbacks(uv_stream_t *stream)
Definition: stream.c:927
void error(const char *msg)
Definition: untgz.c:593

References assert(), EINPROGRESS, error(), int, NULL, QUEUE_EMPTY, req, SO_ERROR, SOL_SOCKET, UV__ERR, uv__io_stop(), uv__req_unregister, uv__stream_fd, uv__stream_flush_write_queue(), and uv__write_callbacks().

Referenced by uv__stream_io().

◆ uv__stream_destroy()

void uv__stream_destroy ( uv_stream_t * stream)

Definition at line 458 of file stream.c.

458  {
459  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
460  assert(stream->flags & UV_HANDLE_CLOSED);
461 
462  if (stream->connect_req) {
463  uv__req_unregister(stream->loop, stream->connect_req);
464  stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
465  stream->connect_req = NULL;
466  }
467 
468  uv__stream_flush_write_queue(stream, UV_ECANCELED);
470 
471  if (stream->shutdown_req) {
472  /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
473  * fait accompli at this point. Maybe we should revisit this in v0.11.
474  * A possible reason for leaving it unchanged is that it informs the
475  * callee that the handle has been destroyed.
476  */
477  uv__req_unregister(stream->loop, stream->shutdown_req);
478  stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
479  stream->shutdown_req = NULL;
480  }
481 
482  assert(stream->write_queue_size == 0);
483 }
@ UV_HANDLE_CLOSED
Definition: uv-common.h:75

References assert(), NULL, uv__io_active(), uv__req_unregister, uv__stream_flush_write_queue(), uv__write_callbacks(), and UV_HANDLE_CLOSED.

Referenced by uv__finish_close().

◆ uv__stream_eof()

static void uv__stream_eof ( uv_stream_t * stream,
const uv_buf_t * buf 
)
static

Definition at line 1001 of file stream.c.

1001  {
1002  stream->flags |= UV_HANDLE_READ_EOF;
1003  stream->flags &= ~UV_HANDLE_READING;
1004  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1005  if (!uv__io_active(&stream->io_watcher, POLLOUT))
1008  stream->read_cb(stream, UV_EOF, buf);
1009 }
@ UV_HANDLE_READ_EOF
Definition: uv-common.h:87

References uv__handle_stop, uv__io_active(), uv__io_stop(), uv__stream_osx_interrupt_select(), UV_HANDLE_READ_EOF, and UV_HANDLE_READING.

Referenced by uv__read(), and uv__stream_io().

◆ uv__stream_flush_write_queue()

void uv__stream_flush_write_queue ( uv_stream_t * stream,
int  error 
)

Definition at line 443 of file stream.c.

443  {
444  uv_write_t* req;
445  QUEUE* q;
446  while (!QUEUE_EMPTY(&stream->write_queue)) {
447  q = QUEUE_HEAD(&stream->write_queue);
448  QUEUE_REMOVE(q);
449 
451  req->error = error;
452 
453  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
454  }
455 }
#define QUEUE_DATA(ptr, type, field)
Definition: queue.h:30
#define QUEUE_HEAD(q)
Definition: queue.h:42
#define QUEUE_INSERT_TAIL(h, q)
Definition: queue.h:92
void * QUEUE[2]
Definition: queue.h:21
#define QUEUE_REMOVE(q)
Definition: queue.h:101
Definition: uv.h:525
uv_pipe_t queue
Definition: worker.c:9

References error(), queue, QUEUE_DATA, QUEUE_EMPTY, QUEUE_HEAD, QUEUE_INSERT_TAIL, QUEUE_REMOVE, and req.

Referenced by uv__stream_connect(), and uv__stream_destroy().

◆ uv__stream_init()

void uv__stream_init ( uv_loop_t * loop,
uv_stream_t * stream,
uv_handle_type  type 
)

Definition at line 85 of file stream.c.

87  {
88  int err;
89 
91  stream->read_cb = NULL;
92  stream->alloc_cb = NULL;
93  stream->close_cb = NULL;
94  stream->connection_cb = NULL;
95  stream->connect_req = NULL;
96  stream->shutdown_req = NULL;
97  stream->accepted_fd = -1;
98  stream->queued_fds = NULL;
99  stream->delayed_error = 0;
100  QUEUE_INIT(&stream->write_queue);
101  QUEUE_INIT(&stream->write_completed_queue);
102  stream->write_queue_size = 0;
103 
104  if (loop->emfile_fd == -1) {
105  err = uv__open_cloexec("/dev/null", O_RDONLY);
106  if (err < 0)
107  /* In the rare case that "/dev/null" isn't mounted open "/"
108  * instead.
109  */
110  err = uv__open_cloexec("/", O_RDONLY);
111  if (err >= 0)
112  loop->emfile_fd = err;
113  }
114 
115 #if defined(__APPLE__)
116  stream->select = NULL;
117 #endif /* defined(__APPLE__) */
118 
119  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
120 }
#define QUEUE_INIT(q)
Definition: queue.h:45
void uv__io_init(uv__io_t *w, uv__io_cb cb, int fd)
Definition: core.c:865
static void uv__stream_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
Definition: stream.c:1287
#define uv__handle_init(loop_, h, type_)
Definition: uv-common.h:301

References err, loop, NULL, O_RDONLY, QUEUE_INIT, type, uv__handle_init, uv__io_init(), uv__open_cloexec(), and uv__stream_io().

Referenced by uv_pipe_init(), uv_tcp_init_ex(), and uv_tty_init().

◆ uv__stream_io()

static void uv__stream_io ( uv_loop_t * loop,
uv__io_t * w,
unsigned int  events 
)
static

Definition at line 1287 of file stream.c.

1287  {
1289 
1290  stream = container_of(w, uv_stream_t, io_watcher);
1291 
1292  assert(stream->type == UV_TCP ||
1293  stream->type == UV_NAMED_PIPE ||
1294  stream->type == UV_TTY);
1295  assert(!(stream->flags & UV_HANDLE_CLOSING));
1296 
1297  if (stream->connect_req) {
1299  return;
1300  }
1301 
1302  assert(uv__stream_fd(stream) >= 0);
1303 
1304  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1305  if (events & (POLLIN | POLLERR | POLLHUP))
1306  uv__read(stream);
1307 
1308  if (uv__stream_fd(stream) == -1)
1309  return; /* read_cb closed stream. */
1310 
1311  /* Short-circuit iff POLLHUP is set, the user is still interested in read
1312  * events and uv__read() reported a partial read but not EOF. If the EOF
1313  * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1314  * have to do anything. If the partial read flag is not set, we can't
1315  * report the EOF yet because there is still data to read.
1316  */
1317  if ((events & POLLHUP) &&
1318  (stream->flags & UV_HANDLE_READING) &&
1319  (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1320  !(stream->flags & UV_HANDLE_READ_EOF)) {
1321  uv_buf_t buf = { NULL, 0 };
1323  }
1324 
1325  if (uv__stream_fd(stream) == -1)
1326  return; /* read_cb closed stream. */
1327 
1328  if (events & (POLLOUT | POLLERR | POLLHUP)) {
1329  uv__write(stream);
1331 
1332  /* Write queue drained. */
1333  if (QUEUE_EMPTY(&stream->write_queue))
1334  uv__drain(stream);
1335  }
1336 }
static void uv__write(uv_stream_t *stream)
Definition: stream.c:802
static void uv__read(uv_stream_t *stream)
Definition: stream.c:1116
static void uv__stream_connect(uv_stream_t *)
Definition: stream.c:1344
static void uv__drain(uv_stream_t *stream)
Definition: stream.c:679

References assert(), container_of, NULL, QUEUE_EMPTY, uv__drain(), uv__read(), uv__stream_connect(), uv__stream_eof(), uv__stream_fd, uv__write(), uv__write_callbacks(), UV_HANDLE_CLOSING, UV_HANDLE_READ_EOF, UV_HANDLE_READ_PARTIAL, UV_HANDLE_READING, and w.

Referenced by uv__stream_init().

◆ uv__stream_open()

int uv__stream_open ( uv_stream_t * stream,
int  fd,
int  flags 
)

Definition at line 406 of file stream.c.

406  {
407 #if defined(__APPLE__)
408  int enable;
409 #endif
410 
411  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
412  return UV_EBUSY;
413 
414  assert(fd >= 0);
415  stream->flags |= flags;
416 
417  if (stream->type == UV_TCP) {
418  if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
419  return UV__ERR(errno);
420 
421  /* TODO Use delay the user passed in. */
422  if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
423  uv__tcp_keepalive(fd, 1, 60)) {
424  return UV__ERR(errno);
425  }
426  }
427 
428 #if defined(__APPLE__)
429  enable = 1;
430  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
431  errno != ENOTSOCK &&
432  errno != EINVAL) {
433  return UV__ERR(errno);
434  }
435 #endif
436 
437  stream->io_watcher.fd = fd;
438 
439  return 0;
440 }
static struct sockaddr static addrlen static backlog const void static flags void flags
Definition: sfsocketcall.h:123
#define SO_OOBINLINE
Definition: sftypes.h:438
#define EINVAL
Definition: sftypes.h:132
#define ENOTSOCK
Definition: sftypes.h:148
int uv__tcp_keepalive(int fd, int on, unsigned int delay)
Definition: tcp.c:380
int uv__tcp_nodelay(int fd, int on)
Definition: tcp.c:373
@ UV_HANDLE_TCP_KEEPALIVE
Definition: uv-common.h:106
@ UV_HANDLE_TCP_NODELAY
Definition: uv-common.h:105

References assert(), EINVAL, ENOTSOCK, fd, flags, SO_OOBINLINE, SOL_SOCKET, UV__ERR, uv__tcp_keepalive(), uv__tcp_nodelay(), UV_HANDLE_TCP_KEEPALIVE, and UV_HANDLE_TCP_NODELAY.

Referenced by new_socket(), uv__process_open_stream(), uv_accept(), uv_pipe_connect(), uv_pipe_open(), uv_tcp_open(), and uv_tty_init().

◆ uv__stream_osx_interrupt_select()

static void uv__stream_osx_interrupt_select ( uv_stream_t * stream)
static

Definition at line 123 of file stream.c.

123  {
124 #if defined(__APPLE__)
125  /* Notify select() thread about state change */
126  uv__stream_select_t* s;
127  int r;
128 
129  s = stream->select;
130  if (s == NULL)
131  return;
132 
133  /* Interrupt select() loop
134  * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
135  * emit read event on other side
136  */
137  do
138  r = write(s->fake_fd, "x", 1);
139  while (r == -1 && errno == EINTR);
140 
141  assert(r == 1);
142 #else /* !defined(__APPLE__) */
143  /* No-op on any other platform */
144 #endif /* !defined(__APPLE__) */
145 }
#define r
Definition: crypto_rc6.c:12
static static fork write
Definition: sflib.h:33

References assert(), EINTR, NULL, r, s, and write.

Referenced by uv__drain(), uv__read(), uv__stream_close(), uv__stream_eof(), uv__write(), uv_read_start(), uv_read_stop(), uv_shutdown(), uv_try_write(), and uv_write2().

◆ uv__stream_queue_fd()

static int uv__stream_queue_fd ( uv_stream_t * stream,
int  fd 
)
static

Definition at line 1012 of file stream.c.

/* Append `fd` to stream->queued_fds, creating the queue on first use and
 * growing it by 8 slots whenever it is full.
 * Returns 0 on success, or UV_ENOMEM when an allocation fails; in the failed
 * grow case stream->queued_fds is not modified.
 */
1012  {
1013  uv__stream_queued_fds_t* queued_fds;
1014  unsigned int queue_size;
1015 
1016  queued_fds = stream->queued_fds;
1017  if (queued_fds == NULL) {
/* No queue yet: allocate one with room for 8 descriptors. */
1018  queue_size = 8;
/* NOTE(review): the "queue_size - 1" suggests the struct already embeds one
 * fds element - confirm against the declaration in internal.h.
 */
1019  queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
1020  sizeof(*queued_fds));
1021  if (queued_fds == NULL)
1022  return UV_ENOMEM;
1023  queued_fds->size = queue_size;
1024  queued_fds->offset = 0;
1025  stream->queued_fds = queued_fds;
1026 
1027  /* Grow */
1028  } else if (queued_fds->size == queued_fds->offset) {
1029  queue_size = queued_fds->size + 8;
1030  queued_fds = uv__realloc(queued_fds,
1031  (queue_size - 1) * sizeof(*queued_fds->fds) +
1032  sizeof(*queued_fds));
1033 
1034  /*
1035  * Allocation failure, report back.
1036  * NOTE: if it is fatal - sockets will be closed in uv__stream_close
1037  */
1038  if (queued_fds == NULL)
1039  return UV_ENOMEM;
1040  queued_fds->size = queue_size;
1041  stream->queued_fds = queued_fds;
1042  }
1043 
1044  /* Put fd in a queue */
1045  queued_fds->fds[queued_fds->offset++] = fd;  /* offset = number of queued fds. */
1046 
1047  return 0;
1048 }
unsigned int size
Definition: internal.h:152
void * uv__realloc(void *ptr, size_t size)
Definition: uv-common.c:96
void * uv__malloc(size_t size)
Definition: uv-common.c:75

References fd, uv__stream_queued_fds_s::fds, NULL, uv__stream_queued_fds_s::offset, uv__stream_queued_fds_s::size, uv__malloc(), and uv__realloc().

Referenced by uv__stream_recv_cmsg().

◆ uv__stream_recv_cmsg()

static int uv__stream_recv_cmsg ( uv_stream_t * stream,
struct msghdr * msg 
)
static

Definition at line 1060 of file stream.c.

/* Collect file descriptors carried as SCM_RIGHTS ancillary data in `msg`.
 * The first descriptor fills stream->accepted_fd when that slot is free (-1);
 * every further one is appended via uv__stream_queue_fd().
 * Returns 0, or the uv__stream_queue_fd() error after closing the descriptors
 * of the current control message that had not been stored yet.
 */
1060  {
1061  struct cmsghdr* cmsg;
1062 
1063  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
1064  char* start;
1065  char* end;
1066  int err;
1067  void* pv;
1068  int* pi;
1069  unsigned int i;
1070  unsigned int count;
1071 
/* Only cmsg_type is inspected here; other control messages are reported on
 * stderr and skipped.
 */
1072  if (cmsg->cmsg_type != SCM_RIGHTS) {
1073  fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1074  cmsg->cmsg_type);
1075  continue;
1076  }
1077 
1078  /* silence aliasing warning */
1079  pv = CMSG_DATA(cmsg);
1080  pi = pv;
1081 
1082  /* Count available fds */
/* Increase count until a control message holding that many ints would be
 * exactly cmsg_len bytes long; the assert checks the lengths match exactly.
 */
1083  start = (char*) cmsg;
1084  end = (char*) cmsg + cmsg->cmsg_len;
1085  count = 0;
1086  while (start + CMSG_LEN(count * sizeof(*pi)) < end)
1087  count++;
1088  assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
1089 
1090  for (i = 0; i < count; i++) {
1091  /* Already has accepted fd, queue now */
1092  if (stream->accepted_fd != -1) {
1093  err = uv__stream_queue_fd(stream, pi[i]);
1094  if (err != 0) {
1095  /* Close rest */
/* Starts at the current i, so the fd that failed to queue is closed too. */
1096  for (; i < count; i++)
1097  uv__close(pi[i]);
1098  return err;
1099  }
1100  } else {
1101  stream->accepted_fd = pi[i];
1102  }
1103  }
1104  }
1105 
1106  return 0;
1107 }
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec struct timespec static rem const char static group const void start
Definition: sflib.h:133
while(len< limit &&buf1[len]==buf2[len])++len
#define CMSG_LEN(len)
Definition: sftypes.h:425
@ SCM_RIGHTS
Definition: sftypes.h:383
#define CMSG_FIRSTHDR(mhdr)
Definition: sftypes.h:418
#define CMSG_NXTHDR(mhdr, cmsg)
Definition: sftypes.h:417
#define CMSG_DATA(cmsg)
Definition: sftypes.h:416
int cmsg_type
Definition: sftypes.h:412
size_t cmsg_len
Definition: sftypes.h:409
static int uv__stream_queue_fd(uv_stream_t *stream, int fd)
Definition: stream.c:1012

References assert(), CMSG_DATA, CMSG_FIRSTHDR, cmsghdr::cmsg_len, CMSG_LEN, CMSG_NXTHDR, cmsghdr::cmsg_type, count, test_evm::end, err, i, msg, NULL, SCM_RIGHTS, start, uv__close(), uv__stream_queue_fd(), and while().

Referenced by uv__read().

◆ uv__write()

static void uv__write ( uv_stream_t * stream)
static

Definition at line 802 of file stream.c.

/* Try to write the request at the head of stream->write_queue.
 * Uses sendmsg() with an SCM_RIGHTS control message when the request carries
 * a send_handle, and uv__writev() otherwise. Non-transient failures are
 * recorded in req->error on the `error` path; otherwise POLLOUT is started on
 * the stream's io_watcher (blocking streams loop back to `start` instead).
 *
 * NOTE(review): this listing is missing source lines 819, 901, 913, 919, 922
 * and 923 (the numbering skips them); see the notes at each gap.
 */
802  {
803  struct iovec* iov;
804  QUEUE* q;
805  uv_write_t* req;
806  int iovmax;
807  int iovcnt;
808  ssize_t n;
809  int err;
810 
811 start:
812 
813  assert(uv__stream_fd(stream) >= 0);
814 
815  if (QUEUE_EMPTY(&stream->write_queue))
816  return;
817 
818  q = QUEUE_HEAD(&stream->write_queue);
/* NOTE(review): line 819 is absent; req is used below without a visible
 * assignment. The References list names QUEUE_DATA, so req is presumably
 * derived from q there - confirm against stream.c.
 */
820  assert(req->handle == stream);
821 
822  /*
823  * Cast to iovec. We had to have our own uv_buf_t instead of iovec
824  * because Windows's WSABUF is not an iovec.
825  */
826  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
827  iov = (struct iovec*) &(req->bufs[req->write_index]);
828  iovcnt = req->nbufs - req->write_index;
829 
830  iovmax = uv__getiovmax();
831 
832  /* Limit iov count to avoid EINVALs from writev() */
833  if (iovcnt > iovmax)
834  iovcnt = iovmax;
835 
836  /*
837  * Now do the actual writev. Note that we've been updating the pointers
838  * inside the iov each time we write. So there is no need to offset it.
839  */
840 
841  if (req->send_handle) {
842  int fd_to_send;
843  struct msghdr msg;
844  struct cmsghdr *cmsg;
/* Backing storage for the control message; the union member `alias` gives
 * the char buffer the alignment of struct cmsghdr.
 */
845  union {
846  char data[64];
847  struct cmsghdr alias;
848  } scratch;
849 
850  if (uv__is_closing(req->send_handle)) {
851  err = UV_EBADF;
852  goto error;
853  }
854 
855  fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
856 
857  memset(&scratch, 0, sizeof(scratch));
858 
859  assert(fd_to_send >= 0);
860 
861  msg.msg_name = NULL;
862  msg.msg_namelen = 0;
863  msg.msg_iov = iov;
864  msg.msg_iovlen = iovcnt;
865  msg.msg_flags = 0;
866 
867  msg.msg_control = &scratch.alias;
868  msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
869 
/* One SOL_SOCKET/SCM_RIGHTS header carrying exactly one descriptor. */
870  cmsg = CMSG_FIRSTHDR(&msg);
871  cmsg->cmsg_level = SOL_SOCKET;
872  cmsg->cmsg_type = SCM_RIGHTS;
873  cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
874 
875  /* silence aliasing warning */
876  {
877  void* pv = CMSG_DATA(cmsg);
878  int* pi = pv;
879  *pi = fd_to_send;
880  }
881 
882  do
883  n = sendmsg(uv__stream_fd(stream), &msg, 0);
884  while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
885 
886  /* Ensure the handle isn't sent again in case this is a partial write. */
887  if (n >= 0)
888  req->send_handle = NULL;
889  } else {
890  do
891  n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
892  while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
893  }
894 
/* Transient errors (see IS_TRANSIENT_WRITE_ERROR) fall through and wait for
 * POLLOUT; any other failure is reported through the error path.
 */
895  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
896  err = UV__ERR(errno);
897  goto error;
898  }
899 
/* uv__write_req_update() returns non-zero once every buffer was consumed. */
900  if (n >= 0 && uv__write_req_update(stream, req, n)) {
/* NOTE(review): line 901 is absent; presumably the uv__write_req_finish()
 * call named in the References list - confirm against stream.c.
 */
902  return; /* TODO(bnoordhuis) Start trying to write the next request. */
903  }
904 
905  /* If this is a blocking stream, try again. */
906  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
907  goto start;
908 
909  /* We're not done. */
910  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
911 
912  /* Notify select() thread about state change */
/* NOTE(review): line 913 is absent; presumably the
 * uv__stream_osx_interrupt_select() call this comment announces - confirm.
 */
914 
915  return;
916 
917 error:
918  req->error = err;
/* NOTE(review): lines 919, 922 and 923 are absent, so the statement guarded
 * by the `if` below is not visible here. The References list names
 * uv__write_req_finish, uv__handle_stop and uv__stream_osx_interrupt_select -
 * confirm the exact sequence against stream.c.
 */
920  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
921  if (!uv__io_active(&stream->io_watcher, POLLIN))
924 }
int n
Definition: mipsasm.c:19
int cmsg_level
Definition: sftypes.h:411
static uv_buf_t iov
Definition: main.c:15
int uv__getiovmax(void)
Definition: core.c:215
static int uv__handle_fd(uv_handle_t *handle)
Definition: stream.c:788
static int uv__write_req_update(uv_stream_t *stream, uv_write_t *req, size_t n)
Definition: stream.c:737
static ssize_t uv__writev(int fd, struct iovec *vec, size_t n)
Definition: stream.c:711
#define IS_TRANSIENT_WRITE_ERROR(errno, send_handle)
Definition: stream.c:73
static void uv__write_req_finish(uv_write_t *req)
Definition: stream.c:762
#define RETRY_ON_WRITE_ERROR(errno)
Definition: stream.c:72
@ UV_HANDLE_BLOCKING_WRITES
Definition: uv-common.h:98
#define uv__is_closing(h)
Definition: uv-common.h:255

References assert(), CMSG_DATA, CMSG_FIRSTHDR, cmsghdr::cmsg_len, CMSG_LEN, cmsghdr::cmsg_level, CMSG_SPACE, cmsghdr::cmsg_type, err, error(), iov, IS_TRANSIENT_WRITE_ERROR, memset(), msg, n, NULL, queue, QUEUE_DATA, QUEUE_EMPTY, QUEUE_HEAD, req, RETRY_ON_WRITE_ERROR, SCM_RIGHTS, SOL_SOCKET, start, UV__ERR, uv__getiovmax(), uv__handle_fd(), uv__handle_stop, uv__io_active(), uv__io_start(), uv__io_stop(), uv__is_closing, uv__stream_fd, uv__stream_osx_interrupt_select(), uv__write_req_finish(), uv__write_req_update(), uv__writev(), and UV_HANDLE_BLOCKING_WRITES.

Referenced by uv__stream_io(), and uv_write2().

◆ uv__write_callbacks()

static void uv__write_callbacks ( uv_stream_t * stream)
static

Definition at line 927 of file stream.c.

/* Run the completion callbacks of every request currently in
 * stream->write_completed_queue.
 * The queue is first moved into the local `pq`, so the loop below only
 * iterates over the requests that were present when the move happened.
 */
927  {
928  uv_write_t* req;
929  QUEUE* q;
930  QUEUE pq;
931 
932  if (QUEUE_EMPTY(&stream->write_completed_queue))
933  return;
934 
935  QUEUE_MOVE(&stream->write_completed_queue, &pq);
936 
937  while (!QUEUE_EMPTY(&pq)) {
938  /* Pop a req off write_completed_queue. */
939  q = QUEUE_HEAD(&pq);
/* NOTE(review): listing line 940 is absent; req is used below without a
 * visible assignment. The References list names QUEUE_DATA, so req is
 * presumably derived from q there - confirm against stream.c.
 */
941  QUEUE_REMOVE(q);
942  uv__req_unregister(stream->loop, req);
943 
/* bufs is still set only for requests whose buffers were not released
 * earlier; account for their unwritten bytes and free the array unless it is
 * the inline bufsml storage.
 */
944  if (req->bufs != NULL) {
945  stream->write_queue_size -= uv__write_req_size(req);
946  if (req->bufs != req->bufsml)
947  uv__free(req->bufs);
948  req->bufs = NULL;
949  }
950 
951  /* NOTE: call callback AFTER freeing the request data. */
952  if (req->cb)
953  req->cb(req, req->error);
954  }
955 }
#define QUEUE_MOVE(h, n)
Definition: queue.h:72
static size_t uv__write_req_size(uv_write_t *req)
Definition: stream.c:719

References NULL, queue, QUEUE_DATA, QUEUE_EMPTY, QUEUE_HEAD, QUEUE_MOVE, QUEUE_REMOVE, req, uv__free(), uv__req_unregister, and uv__write_req_size().

Referenced by uv__stream_connect(), uv__stream_destroy(), and uv__stream_io().

◆ uv__write_req_finish()

static void uv__write_req_finish ( uv_write_t * req)
static

Definition at line 762 of file stream.c.

/* Move `req` from its current queue to the write_completed_queue of its
 * stream and feed the stream's io_watcher to the loop via uv__io_feed().
 * The buffer array is released here only when the request finished without
 * an error.
 */
762  {
763  uv_stream_t* stream = req->handle;
764 
765  /* Pop the req off tcp->write_queue. */
766  QUEUE_REMOVE(&req->queue);
767 
768  /* Only free when there was no error. On error, we touch up write_queue_size
769  * right before making the callback. The reason we don't do that right away
770  * is that a write_queue_size > 0 is our only way to signal to the user that
771  * they should stop writing - which they should if we got an error. Something
772  * to revisit in future revisions of the libuv API.
773  */
774  if (req->error == 0) {
775  if (req->bufs != req->bufsml)  /* bufsml is inline storage; never freed. */
776  uv__free(req->bufs);
777  req->bufs = NULL;
778  }
779 
780  /* Add it to the write_completed_queue where it will have its
781  * callback called in the near future.
782  */
783  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
784  uv__io_feed(stream->loop, &stream->io_watcher);
785 }
void uv__io_feed(uv_loop_t *loop, uv__io_t *w)
Definition: core.c:952

References NULL, QUEUE_INSERT_TAIL, QUEUE_REMOVE, req, uv__free(), and uv__io_feed().

Referenced by uv__write().

◆ uv__write_req_size()

static size_t uv__write_req_size ( uv_write_t * req)
static

Definition at line 719 of file stream.c.

/* Return the number of bytes `req` still has to write: the summed length of
 * its buffers from write_index onwards, as computed by uv__count_bufs().
 * Requires req->bufs != NULL and asserts that the result does not exceed the
 * handle's write_queue_size.
 */
719  {
720  size_t size;
721 
722  assert(req->bufs != NULL);
723  size = uv__count_bufs(req->bufs + req->write_index,
724  req->nbufs - req->write_index);
725  assert(req->handle->write_queue_size >= size);
726 
727  return size;
728 }
voidpf void uLong size
Definition: ioapi.h:138
size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs)
Definition: uv-common.c:573

References assert(), NULL, req, and uv__count_bufs().

Referenced by uv__write_callbacks(), and uv_try_write().

◆ uv__write_req_update()

static int uv__write_req_update ( uv_stream_t * stream,
uv_write_t * req,
size_t  n 
)
static

Definition at line 737 of file stream.c.

/* Account for `n` bytes having been written for `req`.
 * Subtracts n from stream->write_queue_size, advances base/len of the
 * request's buffers past the written bytes and stores the new write_index.
 * Returns non-zero when write_index has reached nbufs (all buffers consumed),
 * zero otherwise.
 */
739  {
740  uv_buf_t* buf;
741  size_t len;
742 
743  assert(n <= stream->write_queue_size);
744  stream->write_queue_size -= n;
745 
746  buf = req->bufs + req->write_index;
747 
/* The body runs at least once, so even with n == 0 a zero-length buffer at
 * the current position is stepped over.
 */
748  do {
749  len = n < buf->len ? n : buf->len;  /* Bytes consumed from this buffer. */
750  buf->base += len;
751  buf->len -= len;
752  buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
753  n -= len;
754  } while (n > 0);
755 
756  req->write_index = buf - req->bufs;
757 
758  return req->write_index == req->nbufs;
759 }

References assert(), len, n, and req.

Referenced by uv__write().

◆ uv__writev()

static ssize_t uv__writev ( int  fd,
struct iovec * vec,
size_t  n 
)
static

Definition at line 711 of file stream.c.

/* Write `n` iovec entries starting at `vec` to `fd`.
 * A single entry is sent with write(); anything else goes through writev().
 * Returns whatever the chosen system call returns.
 */
711  {
712  if (n == 1)
713  return write(fd, vec->iov_base, vec->iov_len);
714  else
715  return writev(fd, vec, n);
716 }
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz writev
Definition: sflib.h:82

References fd, n, write, and writev.

Referenced by uv__write().

◆ uv_accept()

int uv_accept ( uv_stream_t * server,
uv_stream_t * client 
)

Definition at line 591 of file stream.c.

/* Hand the descriptor stored in server->accepted_fd over to `client`.
 * Returns UV_EAGAIN when no descriptor is pending, UV_EINVAL for client
 * types other than UV_NAMED_PIPE, UV_TCP and UV_UDP, and otherwise the result
 * of uv__stream_open() / uv_udp_open() (the descriptor is closed when that
 * fails). Afterwards the next queued descriptor, if any, becomes
 * server->accepted_fd.
 */
591  {
592  int err;
593 
594  assert(server->loop == client->loop);
595 
596  if (server->accepted_fd == -1)
597  return UV_EAGAIN;
598 
599  switch (client->type) {
600  case UV_NAMED_PIPE:
601  case UV_TCP:
602  err = uv__stream_open(client,
603  server->accepted_fd,
/* NOTE(review): listing line 604 is absent; it holds the remaining `flags`
 * argument of uv__stream_open() and the closing parenthesis - confirm the
 * value against stream.c.
 */
605  if (err) {
606  /* TODO handle error */
607  uv__close(server->accepted_fd);
608  goto done;
609  }
610  break;
611 
612  case UV_UDP:
613  err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
614  if (err) {
615  uv__close(server->accepted_fd);
616  goto done;
617  }
618  break;
619 
620  default:
/* Returns directly: accepted_fd and the fd queue are left untouched. */
621  return UV_EINVAL;
622  }
623 
624  client->flags |= UV_HANDLE_BOUND;
625 
626 done:
627  /* Process queued fds */
628  if (server->queued_fds != NULL) {
629  uv__stream_queued_fds_t* queued_fds;
630 
631  queued_fds = server->queued_fds;
632 
633  /* Read first */
634  server->accepted_fd = queued_fds->fds[0];
635 
636  /* All read, free */
637  assert(queued_fds->offset > 0);
638  if (--queued_fds->offset == 0) {
639  uv__free(queued_fds);
640  server->queued_fds = NULL;
641  } else {
642  /* Shift rest */
/* offset was already decremented, so it equals the number of entries left. */
643  memmove(queued_fds->fds,
644  queued_fds->fds + 1,
645  queued_fds->offset * sizeof(*queued_fds->fds));
646  }
647  } else {
/* Nothing queued: clear the slot and, on success, poll for POLLIN again. */
648  server->accepted_fd = -1;
649  if (err == 0)
650  uv__io_start(server->loop, &server->io_watcher, POLLIN);
651  }
652  return err;
653 }
struct tab * done
Definition: enough.c:233
int uv__stream_open(uv_stream_t *stream, int fd, int flags)
Definition: stream.c:406
@ UV_HANDLE_BOUND
Definition: uv-common.h:91
UV_EXTERN int uv_udp_open(uv_udp_t *handle, uv_os_sock_t sock)
Definition: udp.c:991

◆ uv_is_readable()

int uv_is_readable ( const uv_stream_t * stream)

Definition at line 1606 of file stream.c.

/* Return 1 when UV_HANDLE_READABLE is set in stream->flags, 0 otherwise. */
1606  {
1607  return !!(stream->flags & UV_HANDLE_READABLE);  /* !! normalizes to 0 or 1. */
1608 }

◆ uv_is_writable()

int uv_is_writable ( const uv_stream_t * stream)

Definition at line 1611 of file stream.c.

/* Return 1 when UV_HANDLE_WRITABLE is set in stream->flags, 0 otherwise. */
1611  {
1612  return !!(stream->flags & UV_HANDLE_WRITABLE);  /* !! normalizes to 0 or 1. */
1613 }

◆ uv_listen()

int uv_listen ( uv_stream_t * stream,
int  backlog,
uv_connection_cb  cb 
)

Definition at line 656 of file stream.c.

/* Start listening on `stream` by dispatching on its type: UV_TCP goes to
 * uv_tcp_listen(), UV_NAMED_PIPE to uv_pipe_listen(); any other type yields
 * UV_EINVAL. Returns the error code of the chosen call (0 on success).
 */
656  {
657  int err;
658 
659  switch (stream->type) {
660  case UV_TCP:
661  err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
662  break;
663 
664  case UV_NAMED_PIPE:
665  err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
666  break;
667 
668  default:
669  err = UV_EINVAL;
670  }
671 
672  if (err == 0)
/* NOTE(review): listing line 673 (the statement guarded by the `if` above) is
 * absent; presumably a uv__handle_start() call on the stream, since that
 * macro appears among this listing's cross-references - confirm.
 */
674 
675  return err;
676 }
Definition: uv.h:547
int uv_pipe_listen(uv_pipe_t *handle, int backlog, uv_connection_cb cb)
Definition: pipe.c:94
int uv_tcp_listen(uv_tcp_t *tcp, int backlog, uv_connection_cb cb)
Definition: tcp.c:328
#define uv__handle_start(h)
Definition: uv-common.h:258
static const char * cb[]
Definition: z80_tab.h:176

◆ uv_read_start()

int uv_read_start ( uv_stream_t * stream,
uv_alloc_cb  alloc_cb,
uv_read_cb  read_cb 
)

Definition at line 1555 of file stream.c.

/* Begin reading from `stream`: store the callbacks, set UV_HANDLE_READING
 * and start POLLIN on the stream's io_watcher.
 * Returns UV_EINVAL when the handle is closing, UV_ENOTCONN when it is not
 * readable, and 0 otherwise. `alloc_cb` must be non-NULL (asserted).
 */
1557  {
1558  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1559  stream->type == UV_TTY);
1560 
1561  if (stream->flags & UV_HANDLE_CLOSING)
1562  return UV_EINVAL;
1563 
1564  if (!(stream->flags & UV_HANDLE_READABLE))
1565  return UV_ENOTCONN;
1566 
1567  /* The UV_HANDLE_READING flag is independent of the state of the tcp - it
1568  * just expresses the desired state of the user.
1569  */
1570  stream->flags |= UV_HANDLE_READING;
1571 
1572  /* TODO: try to do the read inline? */
1573  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
1574  * not start the IO watcher.
1575  */
1576  assert(uv__stream_fd(stream) >= 0);
1577  assert(alloc_cb);
1578 
1579  stream->read_cb = read_cb;
1580  stream->alloc_cb = alloc_cb;
1581 
1582  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
/* NOTE(review): listing lines 1583-1584 are absent; presumably the handle is
 * marked active and uv__stream_osx_interrupt_select() is called (its
 * "Referenced by" list includes uv_read_start) - confirm against stream.c.
 */
1585 
1586  return 0;
1587 }

◆ uv_read_stop()

int uv_read_stop ( uv_stream_t * stream)

Definition at line 1590 of file stream.c.

/* Stop reading from `stream`: clear UV_HANDLE_READING, stop POLLIN on the
 * io_watcher and reset both read callbacks to NULL.
 * Always returns 0; it is a no-op when the stream is not currently reading.
 */
1590  {
1591  if (!(stream->flags & UV_HANDLE_READING))
1592  return 0;
1593 
1594  stream->flags &= ~UV_HANDLE_READING;
1595  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1596  if (!uv__io_active(&stream->io_watcher, POLLOUT))
/* NOTE(review): listing lines 1597-1598 are absent, so the statement guarded
 * by the `if` above is not visible; presumably the handle is stopped when no
 * POLLOUT interest remains, followed by a
 * uv__stream_osx_interrupt_select() call - confirm against stream.c.
 */
1599 
1600  stream->read_cb = NULL;
1601  stream->alloc_cb = NULL;
1602  return 0;
1603 }

Referenced by uv__stream_close().

◆ uv_shutdown()

int uv_shutdown ( uv_shutdown_t * req,
uv_stream_t * stream,
uv_shutdown_cb  cb 
)

Definition at line 1259 of file stream.c.

/* Register `req` as the shutdown request of `stream`: initialize it, store
 * `cb`, set UV_HANDLE_SHUTTING and start POLLOUT on the io_watcher.
 * Returns UV_ENOTCONN when the stream is not writable or is already shut or
 * shutting down, and 0 otherwise.
 */
1259  {
1260  assert(stream->type == UV_TCP ||
1261  stream->type == UV_TTY ||
1262  stream->type == UV_NAMED_PIPE);
1263 
1264  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1265  stream->flags & UV_HANDLE_SHUT ||
1266  stream->flags & UV_HANDLE_SHUTTING ||
/* NOTE(review): listing line 1267 is absent; it holds the last operand of
 * this condition and the opening brace - confirm against stream.c.
 */
1268  return UV_ENOTCONN;
1269  }
1270 
1271  assert(uv__stream_fd(stream) >= 0);
1272 
1273  /* Initialize request */
1274  uv__req_init(stream->loop, req, UV_SHUTDOWN);
1275  req->handle = stream;
1276  req->cb = cb;
1277  stream->shutdown_req = req;
1278  stream->flags |= UV_HANDLE_SHUTTING;
1279 
1280  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* NOTE(review): listing line 1281 is absent; presumably a
 * uv__stream_osx_interrupt_select() call (its "Referenced by" list includes
 * uv_shutdown) - confirm.
 */
1282 
1283  return 0;
1284 }
#define uv__req_init(loop, req, typ)
Definition: uv-common.h:329

◆ uv_stream_set_blocking()

int uv_stream_set_blocking ( uv_stream_t handle,
int  blocking 
)

Definition at line 1688 of file stream.c.

/* Switch the descriptor of `handle` between blocking and non-blocking mode.
 * A non-zero `blocking` is passed to uv__nonblock() as 0 and vice versa;
 * the uv__nonblock() result is returned unchanged.
 */
1688  {
1689  /* Don't need to check the file descriptor, uv__nonblock()
1690  * will fail with EBADF if it's not valid.
1691  */
1692  return uv__nonblock(uv__stream_fd(handle), !blocking);
1693 }
#define uv__nonblock
Definition: internal.h:170

◆ uv_try_write()

int uv_try_write ( uv_stream_t * stream,
const uv_buf_t  bufs[],
unsigned int  nbufs 
)

Definition at line 1507 of file stream.c.

/* Attempt an immediate write of `bufs` without leaving a request queued.
 * Issues uv_write() with a stack-allocated request, then removes that request
 * from the stream again and subtracts the unwritten bytes from
 * write_queue_size.
 * Returns UV_EAGAIN when a connect is pending or data is already queued, the
 * uv_write() error when it fails, the request error (or UV_EAGAIN) when
 * nothing could be written although there was data to write, and otherwise
 * the number of bytes written.
 */
1509  {
1510  int r;
1511  int has_pollout;
1512  size_t written;
1513  size_t req_size;
1514  uv_write_t req;
1515 
1516  /* Connecting or already writing some data */
1517  if (stream->connect_req != NULL || stream->write_queue_size != 0)
1518  return UV_EAGAIN;
1519 
/* Remember whether POLLOUT was already active so it can be restored below. */
1520  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
1521 
1522  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
1523  if (r != 0)
1524  return r;
1525 
1526  /* Remove not written bytes from write queue size */
1527  written = uv__count_bufs(bufs, nbufs);
1528  if (req.bufs != NULL)
1529  req_size = uv__write_req_size(&req);
1530  else
1531  req_size = 0;
1532  written -= req_size;
1533  stream->write_queue_size -= req_size;
1534 
1535  /* Unqueue request, regardless of immediateness */
1536  QUEUE_REMOVE(&req.queue);
1537  uv__req_unregister(stream->loop, &req);
1538  if (req.bufs != req.bufsml)
1539  uv__free(req.bufs);
1540  req.bufs = NULL;
1541 
1542  /* Do not poll for writable if we weren't doing so before this call */
1543  if (!has_pollout) {
1544  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
/* NOTE(review): listing line 1545 is absent; presumably a
 * uv__stream_osx_interrupt_select() call (its "Referenced by" list includes
 * uv_try_write) - confirm.
 */
1546  }
1547 
1548  if (written == 0 && req_size != 0)
1549  return req.error < 0 ? req.error : UV_EAGAIN;
1550  else
1551  return written;
1552 }
static char bufs[4][128]
Buffers for uint64_to_str() and uint64_to_nicestr()
Definition: util.c:18
void uv_try_write_cb(uv_write_t *req, int status)
Definition: stream.c:1501
int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
Definition: stream.c:1492

◆ uv_try_write_cb()

void uv_try_write_cb ( uv_write_t * req,
int  status 
)

Definition at line 1501 of file stream.c.

/* Placeholder callback that uv_try_write() passes to uv_write().
 * It must never run; reaching it aborts the process. Both parameters are
 * unused.
 */
1501  {
1502  /* Should not be called */
1503  abort();
1504 }

Referenced by uv_try_write().

◆ uv_write()

int uv_write ( uv_write_t * req,
uv_stream_t * handle,
const uv_buf_t  bufs[],
unsigned int  nbufs,
uv_write_cb  cb 
)

Definition at line 1492 of file stream.c.

/* Convenience wrapper: forwards to uv_write2() with a NULL send_handle and
 * returns its result.
 */
1496  {
1497  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1498 }
int uv_write2(uv_write_t *req, uv_stream_t *stream, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t *send_handle, uv_write_cb cb)
Definition: stream.c:1393

Referenced by uv_try_write(), and uv_write2().

◆ uv_write2()

int uv_write2 ( uv_write_t * req,
uv_stream_t * stream,
const uv_buf_t  bufs[],
unsigned int  nbufs,
uv_stream_t * send_handle,
uv_write_cb  cb 
)

Definition at line 1393 of file stream.c.

/* Queue a write of `bufs` on `stream`, optionally passing `send_handle`
 * along with it.
 * The buffer descriptors are copied into the request (inline bufsml storage
 * when they fit, a heap array otherwise). The write is attempted immediately
 * when no connect is pending and write_queue_size was 0 on entry; if a
 * connect is pending nothing is started, otherwise POLLOUT is started.
 * Returns 0 on success, UV_EBADF for an invalid stream or send_handle
 * descriptor, UV_EPIPE when the stream is not writable, UV_EINVAL when a
 * send_handle is given on anything but an IPC named pipe, UV_ENOSYS for a
 * send_handle on Cygwin/MSYS, and UV_ENOMEM when the buffer array cannot be
 * allocated. `nbufs` must be greater than 0 (asserted).
 */
1398  {
1399  int empty_queue;
1400 
1401  assert(nbufs > 0);
1402  assert((stream->type == UV_TCP ||
1403  stream->type == UV_NAMED_PIPE ||
1404  stream->type == UV_TTY) &&
1405  "uv_write (unix) does not yet support other types of streams");
1406 
1407  if (uv__stream_fd(stream) < 0)
1408  return UV_EBADF;
1409 
1410  if (!(stream->flags & UV_HANDLE_WRITABLE))
1411  return UV_EPIPE;
1412 
1413  if (send_handle) {
1414  if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1415  return UV_EINVAL;
1416 
1417  /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1418  * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1419  * evaluates to a function that operates on a uv_stream_t with a couple of
1420  * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1421  * which works but only by accident.
1422  */
1423  if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1424  return UV_EBADF;
1425 
1426 #if defined(__CYGWIN__) || defined(__MSYS__)
1427  /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1428  See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
1429  return UV_ENOSYS;
1430 #endif
1431  }
1432 
1433  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1434  * it means there are error-state requests in the write_completed_queue that
1435  * will touch up write_queue_size later, see also uv__write_req_finish().
1436  * We could check that write_queue is empty instead but that implies making
1437  * a write() syscall when we know that the handle is in error mode.
1438  */
1439  empty_queue = (stream->write_queue_size == 0);
1440 
1441  /* Initialize the req */
1442  uv__req_init(stream->loop, req, UV_WRITE);
1443  req->cb = cb;
1444  req->handle = stream;
1445  req->error = 0;
1446  req->send_handle = send_handle;
1447  QUEUE_INIT(&req->queue);
1448 
/* Use the inline bufsml array when it is large enough, the heap otherwise. */
1449  req->bufs = req->bufsml;
1450  if (nbufs > ARRAY_SIZE(req->bufsml))
1451  req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1452 
/* NOTE(review): this return happens after uv__req_init(); whether the request
 * registration needs to be undone on this path is not visible here - confirm
 * against uv__req_init / uv__req_unregister in uv-common.h.
 */
1453  if (req->bufs == NULL)
1454  return UV_ENOMEM;
1455 
1456  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1457  req->nbufs = nbufs;
1458  req->write_index = 0;
1459  stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1460 
1461  /* Append the request to write_queue. */
1462  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1463 
1464  /* If the queue was empty when this function began, we should attempt to
1465  * do the write immediately. Otherwise start the write_watcher and wait
1466  * for the fd to become writable.
1467  */
1468  if (stream->connect_req) {
1469  /* Still connecting, do nothing. */
1470  }
1471  else if (empty_queue) {
1472  uv__write(stream);
1473  }
1474  else {
1475  /*
1476  * blocking streams should never have anything in the queue.
1477  * if this assert fires then somehow the blocking stream isn't being
1478  * sufficiently flushed in uv__write.
1479  */
/* NOTE(review): listing lines 1480 and 1482 are absent. Line 1480 is
 * presumably the assert the comment above describes, and line 1482 presumably
 * a uv__stream_osx_interrupt_select() call (its "Referenced by" list includes
 * uv_write2) - confirm against stream.c.
 */
1481  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1483  }
1484 
1485  return 0;
1486 }
#define ARRAY_SIZE(a)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))

Referenced by uv_write().