uvcc
libuv C++ bindings
cpio

A simple program that copies its stdin to stdout written in pure C using libuv.

For the sake of simplicity it is assumed that both streams can be handled with as pipes.

There are some points that should be mentioned:

example/cpio-uv.c 
1 
2 #include <uv.h>
3 #include <stdlib.h>
4 #include <stdio.h>
5 
6 #ifndef _WIN32
7 #include <signal.h>
8 sighandler_t sigpipe_handler = signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
9 #endif
10 
11 
12 #define PRINT_UV_ERR(code, printf_args...) do {\
13  fflush(stdout);\
14  fprintf(stderr, "" printf_args);\
15  fprintf(stderr, ": %s (%i): %s\n", uv_err_name(code), (int)(code), uv_strerror(code));\
16  fflush(stderr);\
17 } while (0)
18 
19 
20 /* assume that stdin and stdout can be handled with as pipes */
21 uv_pipe_t in, out;
22 
23 
24 /* define queue size limits and transfer control states */
25 const size_t BUFFER_SIZE = 8192;
26 const size_t WRITE_QUEUE_SIZE_UPPER_LIMIT = 128*BUFFER_SIZE,
27  WRITE_QUEUE_SIZE_LOWER_LIMIT = 16*BUFFER_SIZE;
28 enum { RD_UNKNOWN, RD_STOP, RD_PAUSE, RD_START } rdcmd_state = RD_UNKNOWN;
29 
30 int wr_err_reported = 0;
31 
32 
33 /* forward declarations for callback functions */
34 void alloc_cb(uv_handle_t*, size_t, uv_buf_t*);
35 void read_cb(uv_stream_t*, ssize_t, const uv_buf_t*);
36 void write_cb(uv_write_t*, int);
37 
38 
39 int main(int _argc, char *_argv[])
40 {
41  int ret = 0;
42 
43  uv_loop_t *loop = uv_default_loop();
44 
45  uv_pipe_init(loop, &in, 0);
46  ret = uv_pipe_open(&in, fileno(stdin));
47  if (ret < 0)
48  {
49  PRINT_UV_ERR(ret, "stdin open");
50  return ret;
51  }
52 
53  uv_pipe_init(loop, &out, 0);
54  ret = uv_pipe_open(&out, fileno(stdout));
55  if (ret < 0)
56  {
57  PRINT_UV_ERR(ret, "stdout open");
58  return ret;
59  }
60 
61  rdcmd_state = RD_START;
62  ret = uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
63  if (ret < 0)
64  {
65  PRINT_UV_ERR(ret, "read initiation");
66  return ret;
67  }
68 
69  return uv_run(loop, UV_RUN_DEFAULT);
70 }
71 
72 
73 
74 void alloc_cb(uv_handle_t *_handle, size_t _suggested_size, uv_buf_t *_buf)
75 {
76  /* allocate the memory for a new I/O buffer */
77  *_buf = uv_buf_init((char*)malloc(BUFFER_SIZE), BUFFER_SIZE);
78 }
79 
80 
81 void read_cb(uv_stream_t *_stream, ssize_t _nread, const uv_buf_t *_buf)
82 {
83  if (_nread < 0)
84  {
85  if (_nread != UV_EOF) PRINT_UV_ERR(_nread, "read");
86  uv_read_stop(_stream);
87  }
88  else if (_nread > 0)
89  {
90  /* initialize a new buffer descriptor specifying the actual data length */
91  uv_buf_t buf = uv_buf_init(_buf->base, _nread);
92 
93  /* create a write request descriptor; see note [1] */
94  uv_write_t *wr = (uv_write_t*)malloc(sizeof(uv_write_t));
95 
96  /* save a reference to the I/O buffer somehow along with the write request; see note [2] */
97  wr->data = _buf->base;
98 
99  /* fire up the write request */
100  int ret = uv_write(wr, (uv_stream_t*)&out, &buf, 1, write_cb);
101  /* the I/O buffer being used up should be deleted somewhere after the request has completed;
102  see note [3] */
103  if (ret < 0)
104  {
105  PRINT_UV_ERR(ret, "write initiation");
106  rdcmd_state = RD_STOP;
107  uv_read_stop((uv_stream_t*)&in);
108  }
109 
110  /* stop reading from stdin when a consumer of stdout does not keep up with our transferring rate
111  and stdout write queue size has grown up significantly; see note [4] */
112  if (rdcmd_state == RD_START && out.write_queue_size >= WRITE_QUEUE_SIZE_UPPER_LIMIT)
113  {
114  rdcmd_state = RD_PAUSE;
115  uv_read_stop((uv_stream_t*)&in);
116  }
117  }
118 }
119 
120 
121 void write_cb(uv_write_t *_wr, int _status)
122 {
123  if (_status < 0)
124  {
125  if (!wr_err_reported)
126  {
127  /* report only the very first occurrence of the failure */
128  PRINT_UV_ERR(_status, "write");
129  wr_err_reported = 1;
130  }
131 
132  rdcmd_state = RD_STOP;
133  uv_read_stop((uv_stream_t*)&in);
134  }
135  else
136  {
137  /* resume stdin to stdout transferring when stdout output queue has gone away; see note [4] */
138  if (rdcmd_state == RD_PAUSE && out.write_queue_size <= WRITE_QUEUE_SIZE_LOWER_LIMIT)
139  {
140  rdcmd_state = RD_START;
141  uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
142  }
143  }
144 
145  /* when the write request has completed it's safe to free up the memory allocated for the I/O buffer;
146  see notes [2][3] */
147  free(_wr->data);
148  /* delete the write request descriptor */
149  free(_wr);
150 }
T free(T... args)
T signal(T... args)
T malloc(T... args)

The following is the above program being rewritten using uvcc. All the considered points are taken into account by design of uvcc.

example/cpio-uvcc.cpp 
1 
2 #include "uvcc.hpp"
3 #include <cstdio>
4 
5 #ifndef _WIN32
6 #include <signal.h>
7 sighandler_t sigpipe_handler = signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
8 #endif
9 
10 
11 #define PRINT_UV_ERR(code, printf_args...) do {\
12  fflush(stdout);\
13  fprintf(stderr, "" printf_args);\
14  fprintf(stderr, ": %s (%i): %s\n", ::uv_err_name(code), (int)(code), ::uv_strerror(code));\
15  fflush(stderr);\
16 } while (0)
17 
18 
19 uv::pipe in(uv::loop::Default(), fileno(stdin), false, false),
20  out(uv::loop::Default(), fileno(stdout), false, false);
21 
22 constexpr std::size_t BUFFER_SIZE = 8192;
23 constexpr std::size_t WRITE_QUEUE_SIZE_UPPER_LIMIT = 128*BUFFER_SIZE,
24  WRITE_QUEUE_SIZE_LOWER_LIMIT = 16*BUFFER_SIZE;
25 
26 bool wr_err_reported = false;
27 
28 
29 void read_cb(uv::io, ssize_t, uv::buffer, int64_t, void*);
30 void write_cb(uv::write, uv::buffer);
31 
32 
33 int main(int _argc, char *_argv[])
34 {
35  if (!in)
36  {
37  PRINT_UV_ERR(in.uv_status(), "stdin open");
38  return in.uv_status();
39  }
40  if (!out)
41  {
42  PRINT_UV_ERR(out.uv_status(), "stdout open");
43  return out.uv_status();
44  }
45 
46  in.read_start(
47  [](uv::handle, std::size_t){ return uv::buffer{ BUFFER_SIZE }; },
48  read_cb
49  );
50  if (!in)
51  {
52  PRINT_UV_ERR(in.uv_status(), "read initiation");
53  return in.uv_status();
54  }
55 
56  return uv::loop::Default().run(UV_RUN_DEFAULT);
57 }
58 
59 
60 
61 void read_cb(uv::io _io, ssize_t _nread, uv::buffer _buffer, int64_t, void*)
62 {
63  if (_nread < 0)
64  {
65  if (_nread != UV_EOF) PRINT_UV_ERR(_nread, "read");
66  _io.read_stop();
67  }
68  else if (_nread > 0)
69  {
70  _buffer.len() = _nread;
71 
72  uv::write wr;
73  wr.on_request() = write_cb;
74 
75  wr.run(out, _buffer);
76  if (!wr)
77  {
78  PRINT_UV_ERR(wr.uv_status(), "write initiation");
79  _io.read_stop();
80  }
81 
82  in.read_pause(out.write_queue_size() >= WRITE_QUEUE_SIZE_UPPER_LIMIT);
83  }
84 }
85 
86 
87 void write_cb(uv::write _wr, uv::buffer)
88 {
89  if (!_wr)
90  {
91  if (!wr_err_reported)
92  {
93  PRINT_UV_ERR(_wr.uv_status(), "write");
94  wr_err_reported = true;
95  }
96 
97  in.read_stop();
98  }
99  else
100  in.read_resume(out.write_queue_size() <= WRITE_QUEUE_SIZE_LOWER_LIMIT);
101 }
int run(stream &_stream, const buffer &_buf)
Run the request.
decltype(uv_t::len) & len(const std::size_t _i=0) const noexcept
The .len field of the _i-th buffer structure.
Definition: buffer.hpp:242
int read_stop() const
Stop reading data from the I/O endpoint.
Definition: handle-io.hpp:254
The base class for the libuv handles.
Definition: handle-base.hpp:33
static loop & Default() noexcept
Returns the initialized loop that can be used as a global default loop throughout the program...
Definition: loop.hpp:220
int run(::uv_run_mode _mode)
Go into a loop and process events and their callbacks with the current thread.
Definition: loop.hpp:255
int uv_status() const noexcept
The status value returned by the last executed libuv API function on this request.
Stream write request type.
T signal(T... args)
The base class for handles representing I/O endpoints: a file, TCP/UDP socket, pipe, TTY.
Definition: handle-io.hpp:25
Pipe handle.
Encapsulates uv_buf_t data type and provides uv_buf_t[] functionality.
Definition: buffer.hpp:28

Here is a performance comparison between the two variants:

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/cpio-uv | dd of=/dev/null iflag=fullblock bs=1M count=10000
write: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.5126 s, 272 MB/s
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/cpio-uv | dd of=/dev/null iflag=fullblock bs=1M count=10000
write: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.4723 s, 273 MB/s

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/cpio-uvcc | dd of=/dev/null iflag=fullblock bs=1M count=10000
write: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.7677 s, 270 MB/s
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/cpio-uvcc | dd of=/dev/null iflag=fullblock bs=1M count=10000
write: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.9242 s, 269 MB/s

Obviously there is an impact of reference counting operations and other C++ related overheads that slightly reduce the performance.