uvcc
libuv C++ bindings
tee

A simple but full-fledged program that copies its stdin to stdout and also to each file specified as a program argument.

It demonstrates the following points:

  1. Different I/O endpoints and their common input/output operations can be handled in a generic, uniform manner. The program's stdin and stdout handles can be system descriptors referring to any supported kind of I/O endpoint: a file, TCP/UDP socket, pipe, or TTY. Only minimal changes to the source code of the previous example, the cpio program, are needed to get a full-fledged application.
  2. The very same uvcc buffer can easily be dispatched to several asynchronous operations, and its lifetime lasts until the last of those operations has completed (a distilled sketch follows this list).
  3. An example of a simple buffer pool implementation. The pool helps avoid intense memory allocation requests; note, however, that it may actually decrease performance. The provided implementation is an auto-growing pool that is not thread-safe: in a multi-threaded environment either only one dedicated thread may acquire items from the pool, or acquire requests from different threads must not interleave with each other. Since this program is a typical single-threaded libuv application, these requirements are fulfilled. The condition (nrefs() == 1) on an item indicates that no references to it are left anywhere at runtime other than the one in the pool container itself, so this spare item can be returned on an acquire request.
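To make point 2 concrete, here is a minimal standalone sketch (not part of the example program; the file names, the octal mode, and the write_cb callback are made up for illustration, and only uvcc calls already used in the listing below appear in it). The very same buffer is dispatched to two asynchronous write requests; each pending request holds its own reference, so the buffer stays alive until the last request completes:

#include "uvcc.hpp"
#include <cstdio>
#include <fcntl.h>

uv::buffer buf{ 16 };  // a single reference-counted buffer (contents left uninitialized,
                       // since this sketch only tracks the buffer's lifetime)

void write_cb(uv::fs::write _wr, uv::buffer _buf)
{
  // _buf is another reference to the same buffer; the underlying memory
  // is released only after the last reference is dropped
  std::printf("write to %s done, nrefs now %li\n", _wr.handle().path(), (long)_buf.nrefs());
}

int main()
{
  // error checks omitted for brevity
  uv::file f1(uv::loop::Default(), "out1.tmp", O_CREAT|O_TRUNC|O_WRONLY, 0600),
           f2(uv::loop::Default(), "out2.tmp", O_CREAT|O_TRUNC|O_WRONLY, 0600);

  uv::fs::write wr1, wr2;
  wr1.on_request() = write_cb;
  wr2.on_request() = write_cb;

  wr1.run(f1, buf, 0);  // dispatch the very same buffer...
  wr2.run(f2, buf, 0);  // ...to a second asynchronous operation

  // at this point buf.nrefs() counts the global reference plus one per pending request
  return uv::loop::Default().run(UV_RUN_DEFAULT);
}

When both requests have completed, nrefs() drops back to 1, which is exactly the condition the buffer pool in alloc_cb below uses to detect a spare item (point 3).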

In debug builds, some diagnostic messages are printed out.

See also
stackoverflow.com: "libuv allocated memory buffers re-use techniques"
example/tee1.cpp 
#include "uvcc.hpp"

#include <cstdio>
#include <cinttypes>  // PRI*
#include <vector>
#include <fcntl.h>    // O_*

#ifndef _WIN32
#include <signal.h>
sighandler_t sigpipe_handler = signal(SIGPIPE, SIG_IGN);  // ignore SIGPIPE
#endif


#ifndef NDEBUG
#define DEBUG_LOG(condition, format, ...) do {\
  if ((condition))\
  {\
    fflush(stdout);\
    fprintf(stderr, (format), ##__VA_ARGS__);\
    fflush(stderr);\
  }\
} while (0)
#else
#define DEBUG_LOG(condition, ...) (void)(condition)
#endif


#define PRINT_UV_ERR(code, printf_args...) do {\
  fflush(stdout);\
  fprintf(stderr, "" printf_args);\
  fprintf(stderr, ": %s (%i): %s\n", ::uv_err_name(code), (int)(code), ::uv_strerror(code));\
  fflush(stderr);\
} while (0)


uv::io in = uv::io::guess_handle(uv::loop::Default(), fileno(stdin)),
       out = uv::io::guess_handle(uv::loop::Default(), fileno(stdout));

constexpr std::size_t BUFFER_SIZE = 8192;
constexpr std::size_t WRITE_QUEUE_SIZE_UPPER_LIMIT = 128*BUFFER_SIZE,
                      WRITE_QUEUE_SIZE_LOWER_LIMIT = 16*BUFFER_SIZE;

bool wr_err_reported = false;

std::vector< uv::file > files;
std::size_t file_write_queues_size = 0;


uv::buffer alloc_cb(uv::handle, std::size_t);
void write_to_stdout_cb(uv::output, uv::buffer);


int main(int _argc, char *_argv[])
{
  if (!in)
  {
    PRINT_UV_ERR(in.uv_status(), "stdin open (%s)", in.type_name());
    return in.uv_status();
  }
  DEBUG_LOG(true, "[debug] stdin: %s handle [0x%08tX]\n", in.type_name(), (ptrdiff_t)static_cast< ::uv_handle_t* >(in));

  if (!out)
  {
    PRINT_UV_ERR(out.uv_status(), "stdout open (%s)", out.type_name());
    return out.uv_status();
  }
  DEBUG_LOG(true, "[debug] stdout: %s handle [0x%08tX]\n", out.type_name(), (ptrdiff_t)static_cast< ::uv_handle_t* >(out));

  // open files specified as the program arguments on the command line
  constexpr int mode =
  #ifdef _WIN32
      _S_IREAD|_S_IWRITE;
  #else
      S_IRWXU|S_IRWXG|S_IRWXO;
  #endif
  for (int i = 1; i < _argc; ++i)
  {
    uv::file f(uv::loop::Default(), _argv[i], O_CREAT|O_TRUNC|O_WRONLY, mode);
    if (f)
      files.emplace_back(std::move(f));
    else
      PRINT_UV_ERR(f.uv_status(), "file open (%s)", f.path());
  }

  // attach the handle to the loop for reading incoming data
  in.read_start(alloc_cb,
      [](uv::io _io, ssize_t _nread, uv::buffer _buf, int64_t _offset, void *_info) -> void
      {
        if (_nread < 0)
        {
          if (_nread != UV_EOF)  PRINT_UV_ERR(_nread, "read from stdin (%s)", in.type_name());
          _io.read_stop();
        }
        else if (_nread > 0)
        {
          // set the actual data size
          _buf.len() = _nread;

          // write to stdout
          uv::output io_wr;
          io_wr.on_request() = write_to_stdout_cb;

          io_wr.run(out, _buf, _offset, _info);  // note: UDP connected sockets are not supported by libuv
          if (!io_wr)
          {
            PRINT_UV_ERR(io_wr.uv_status(), "write initiation to stdout (%s) at offset %" PRIi64, out.type_name(), _offset);
            _io.read_stop();
          }
        }

        auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
        int ret = in.read_pause(total_write_pending_bytes >= WRITE_QUEUE_SIZE_UPPER_LIMIT);
        DEBUG_LOG(ret == 0, "[debug] read paused (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
      }
  );
  if (!in)
  {
    PRINT_UV_ERR(in.uv_status(), "read initiation from stdin (%s)", in.type_name());
    return in.uv_status();
  }

  return uv::loop::Default().run(UV_RUN_DEFAULT);
}


uv::buffer alloc_cb(uv::handle, std::size_t)
{
  constexpr std::size_t default_size = BUFFER_SIZE;
  static std::vector< uv::buffer > buf_pool;

  for (std::size_t i = 0; i < buf_pool.size(); ++i)  if (buf_pool[i].nrefs() == 1)  // a spare item
  {
    buf_pool[i].len() = default_size;  // restore the buffer capacity size

    DEBUG_LOG(true, "[debug] buffer pool (size=%zu): spare item #%zu\n", buf_pool.size(), i+1);
    return buf_pool[i];
  }

  buf_pool.emplace_back(uv::buffer{ default_size });

  DEBUG_LOG(true, "[debug] buffer pool (size=%zu): new item #%zu\n", buf_pool.size(), buf_pool.size());
  return buf_pool.back();
}


void write_to_files(uv::buffer, int64_t);

void write_to_stdout_cb(uv::output _wr, uv::buffer _buf)
{
  if (!_wr)
  {
    if (!wr_err_reported)  // report only the very first occurrence of the failure
    {
      PRINT_UV_ERR(_wr.uv_status(), "write to stdout (%s) at offset %" PRIi64, _wr.handle().type_name(), _wr.offset());
      wr_err_reported = true;
    }

    in.read_stop();
  }
  else  // dispatch the data for writing to files only when it has successfully been written to stdout
    write_to_files(_buf, _wr.offset());

  auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
  int ret = in.read_resume(total_write_pending_bytes <= WRITE_QUEUE_SIZE_LOWER_LIMIT);
  DEBUG_LOG(ret == 0, "[debug] read resumed (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
}


void write_to_file_cb(uv::fs::write, uv::buffer);

void write_to_files(uv::buffer _buf, int64_t _offset)
{
  for (auto &file : files)
  {
    uv::fs::write wr;
    wr.on_request() = write_to_file_cb;

    wr.run(file, _buf, _offset);
    if (wr)
      file_write_queues_size += _buf.len();
    else
      PRINT_UV_ERR(wr.uv_status(), "write initiation to file (%s) at offset %" PRIi64, file.path(), wr.offset());
  }
}


void write_to_file_cb(uv::fs::write _wr, uv::buffer _buf)
{
  if (!_wr)
    PRINT_UV_ERR(_wr.uv_status(), "write to file (%s) at offset %" PRIi64, _wr.handle().path(), _wr.offset());
  else
  {
    file_write_queues_size -= _buf.len();

    auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
    int ret = in.read_resume(total_write_pending_bytes <= WRITE_QUEUE_SIZE_LOWER_LIMIT);
    DEBUG_LOG(ret == 0, "[debug] read resumed (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
  }
}


Try this program with different combinations of stdin/stdout endpoints:

stdin:TTY / stdout:TTY

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt


stdin:PIPE / stdout:PIPE

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /etc/passwd | build/example/tee1 tee-test.txt | grep root


stdin:FILE / stdout:FILE

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt </etc/passwd >tee-stdout.txt


stdin:FILE / stdout:SOCKET

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ nc -l 127.0.0.1 54321 &
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt </etc/passwd >/dev/tcp/127.0.0.1/54321


The same test as with the cpio example program:

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee1 | dd of=/dev/null iflag=fullblock bs=1M count=10000
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.706 s, 271 MB/s
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee1 | dd of=/dev/null iflag=fullblock bs=1M count=10000
write to stdout (pipe) at offset 10485776384: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.9361 s, 269 MB/s

As can be noted, the tee1 program reports a different count of bytes written to stdout than dd has read from its stdin. (The same issue effectively takes place in the tests for the cpio example program.) This is because tee's read/write loop manages to perform several successful writes into the pipe before being notified that the pipe is broken, at which point the callback for a pending write operation receives an error.

Using a buffer pool does not increase throughput in this case. The buffer pool implemented here is intended for illustrative purposes only; its performance degrades linearly as its size increases (for example, when WRITE_QUEUE_SIZE_UPPER_LIMIT is set to values on the order of 1000*BUFFER_SIZE), because alloc_cb linearly scans the pool for a spare item. Another example of a buffer pool is in example/tee2.cpp; it uses the uv::buffer::sink_cb_t feature. Here is the part of tee2.cpp that differs from tee1.cpp:

class buffer_pool
{
private: /*data*/
  bool pool_destroying = false;
  std::size_t buf_size;
  std::size_t num_total_items;
  std::vector< uv::buffer > spare_items_pool;

public: /*constructors*/
  ~buffer_pool()
  {
    DEBUG_LOG(true, "[debug] buffer pool destroying: spare_items=%zu total_items=%zu\n", spare_items_pool.size(), num_total_items);
    pool_destroying = true;
  }

  buffer_pool(std::size_t _buffer_size, std::size_t _init_pool_size, std::size_t _init_pool_capacity)
  : buf_size(_buffer_size), num_total_items(0)
  {
    spare_items_pool.reserve(_init_pool_capacity);
    while (_init_pool_size--)  spare_items_pool.emplace_back(new_item());
  }

  buffer_pool(const buffer_pool&) = delete;
  buffer_pool& operator =(const buffer_pool&) = delete;

  buffer_pool(buffer_pool&&) noexcept = default;
  buffer_pool& operator =(buffer_pool&&) noexcept = default;

private: /*functions*/
  uv::buffer new_item()
  {
    uv::buffer ret{ buf_size };
    ret.sink_cb() = [this](uv::buffer &_buf)
    {
      if (pool_destroying)  return;  // buffers being destroyed while `spare_items_pool` empties out
                                     // in the `buffer_pool` destructor must not be added back
                                     // into `spare_items_pool`
      _buf.len() = buf_size;  // restore buffer size
      spare_items_pool.push_back(std::move(_buf));
    };
    ++num_total_items;
    DEBUG_LOG(true, "[debug] buffer pool: new item #%zu\n", num_total_items);
    return ret;
  }

public: /*interface*/
  std::size_t buffer_size() const noexcept  { return buf_size; }
  std::size_t total_items() const noexcept  { return num_total_items; }
  std::size_t spare_items() const noexcept  { return spare_items_pool.size(); }

  uv::buffer get()
  {
    if (spare_items_pool.empty())
      return new_item();
    else
    {
      uv::buffer ret = std::move(spare_items_pool.back());
      spare_items_pool.pop_back();
      return ret;
    }
  }
} buffers(
    BUFFER_SIZE,
    WRITE_QUEUE_SIZE_LOWER_LIMIT/BUFFER_SIZE,
    WRITE_QUEUE_SIZE_UPPER_LIMIT/BUFFER_SIZE + 1
);

uv::buffer alloc_cb(uv::handle, std::size_t)  { return buffers.get(); }
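
Unlike tee1's alloc_cb, this pool does not scan for a spare item on every allocation: each buffer returns itself to spare_items_pool from its sink callback once the last outstanding reference to it is released, so buffers.get() remains O(1) no matter how large the pool grows. The pool_destroying flag keeps buffers from being pushed back into spare_items_pool while the pool itself is being destroyed.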

Test results:

[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee2 | dd of=/dev/null iflag=fullblock bs=1M count=10000
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 37.9453 s, 276 MB/s
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee2 | dd of=/dev/null iflag=fullblock bs=1M count=10000
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.0056 s, 276 MB/s
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe