A simple but full-fledged program that copies its stdin to stdout and also to each file specified as a program argument. It demonstrates the following points:
- Different I/O endpoints and the common input/output operations on them can be handled in a generic, uniform manner. The program can deal with stdin and stdout handles that are system descriptors of any sort of supported I/O endpoint: a file, a TCP/UDP socket, a pipe, or a TTY. Only minimal changes relative to the previous example, the cpio program, are needed in the source code to get a full-fledged application. (A rough sketch of this idea follows below.)
- The very same uvcc buffer can easily be dispatched to several asynchronous operations; it stays alive until the last of those operations has completed.
- An example of a simple buffer pool implementation. The pool helps avoid intense memory allocation requests, though note that it may actually decrease performance. The provided implementation is an auto-growing pool that is not thread-safe: in a multi-threaded environment either only one dedicated thread may acquire items from the pool, or acquire requests from different threads must not interleave with each other. Since this program is a typical single-threaded libuv application, these requirements are fulfilled. The condition (nrefs() == 1) on an item indicates that no references to it are left anywhere at run time other than the one in the pool container itself, so this spare item can be handed out on an acquire request. In debug builds some diagnostic messages are printed out.
See also: stackoverflow.com: "libuv allocated memory buffers re-use techniques"
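As a rough sketch of the first point (not the example's verbatim code), stdin and stdout can be wrapped into uvcc I/O handles whose actual type is detected at run time. The sketch relies only on the uv::io::guess_handle(), uv_status(), and type_name() members referenced in the listing below and assumes the library's umbrella header is available as "uvcc.hpp":

#include "uvcc.hpp"
#include <cstdio>

int main()
{
  // let uvcc derive the concrete handle type (TTY, pipe, file, socket) from the descriptor
  uv::io in = uv::io::guess_handle(uv::loop::Default(), fileno(stdin));
  if (in.uv_status() < 0)
  {
    fprintf(stderr, "stdin open (%s): %s\n", in.type_name(), ::uv_strerror(in.uv_status()));
    return in.uv_status();
  }

  uv::io out = uv::io::guess_handle(uv::loop::Default(), fileno(stdout));
  if (out.uv_status() < 0)
  {
    fprintf(stderr, "stdout open (%s): %s\n", out.type_name(), ::uv_strerror(out.uv_status()));
    return out.uv_status();
  }

  // from here on the program neither knows nor cares what kind of endpoint it is talking to
  return 0;
}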
example/tee1.cpp
sighandler_t sigpipe_handler = signal(SIGPIPE, SIG_IGN);
...
#define DEBUG_LOG(condition, format, ...)  do {\
  ...\
  fprintf(stderr, (format), ##__VA_ARGS__);\
  ...
#define DEBUG_LOG(condition, ...)  (void)(condition)
...
#define PRINT_UV_ERR(code, printf_args...)  do {\
  ...\
  fprintf(stderr, "" printf_args);\
  fprintf(stderr, ": %s (%i): %s\n", ::uv_err_name(code), (int)(code), ::uv_strerror(code));\
  ...
constexpr std::size_t WRITE_QUEUE_SIZE_UPPER_LIMIT = 128*BUFFER_SIZE,
                      WRITE_QUEUE_SIZE_LOWER_LIMIT = 16*BUFFER_SIZE;
...
bool wr_err_reported = false;
...
int main(int _argc, char *_argv[])
{
  ...
  DEBUG_LOG(true, "[debug] stdin: %s handle [0x%08tX]\n", in.type_name(), (ptrdiff_t)static_cast< ::uv_handle_t* >(in));
  ...
  PRINT_UV_ERR(out.uv_status(), "stdout open (%s)", out.type_name());
  return out.uv_status();
  ...
  DEBUG_LOG(true, "[debug] stdout: %s handle [0x%08tX]\n", out.type_name(), (ptrdiff_t)static_cast< ::uv_handle_t* >(out));
  ...
  /* open the files given as program arguments */
  ... S_IRWXU|S_IRWXG|S_IRWXO;
  ...
  for (int i = 1; i < _argc; ++i)
  ...
  PRINT_UV_ERR(f.uv_status(), "file open (%s)", f.path());
  ...
/* the read callback for stdin */
[](uv::io _io, ssize_t _nread, uv::buffer _buf, int64_t _offset, void *_info) -> void
  ...
  if (_nread != UV_EOF)  PRINT_UV_ERR(_nread, "read from stdin (%s)", in.type_name());
  ...
  io_wr.on_request() = write_to_stdout_cb;
  ...
  io_wr.run(out, _buf, _offset, _info);
  ...
  PRINT_UV_ERR(io_wr.uv_status(), "write initiation to stdout (%s) at offset %" PRIi64, out.type_name(), _offset);
  ...
  auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
  int ret = in.read_pause(total_write_pending_bytes >= WRITE_QUEUE_SIZE_UPPER_LIMIT);
  DEBUG_LOG(ret == 0, "[debug] read paused (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
  ...
/* the buffer allocation callback: a simple auto-growing buffer pool */
  ...
  for (std::size_t i = 0; i < buf_pool.size(); ++i)
    if (buf_pool[i].nrefs() == 1)  /* the item is referenced by the pool container only, i.e. it is spare */
    {
      buf_pool[i].len() = default_size;
      ...
      DEBUG_LOG(true, "[debug] buffer pool (size=%zu): spare item #%zu\n", buf_pool.size(), i+1);
      ...
    }
  ...
  DEBUG_LOG(true, "[debug] buffer pool (size=%zu): new item #%zu\n", buf_pool.size(), buf_pool.size());
  return buf_pool.back();
  ...
/* the write-to-stdout completion callback */
  ...
  if (!wr_err_reported)
  ...
  wr_err_reported = true;
  ...
  write_to_files(_buf, _wr.offset());
  ...
  auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
  int ret = in.read_resume(total_write_pending_bytes <= WRITE_QUEUE_SIZE_LOWER_LIMIT);
  DEBUG_LOG(ret == 0, "[debug] read resumed (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
  ...
void write_to_files(uv::buffer _buf, int64_t _offset)
{
  for (auto &file : files)
  ...
  wr.on_request() = write_to_file_cb;
  ...
  wr.run(file, _buf, _offset);
  ...
  file_write_queues_size += _buf.len();
  ...
  PRINT_UV_ERR(wr.uv_status(), "write initiation to file (%s) at offset %" PRIi64, file.path(), wr.offset());
  ...
}
/* the write-to-file completion callback */
  ...
  file_write_queues_size -= _buf.len();
  ...
  auto total_write_pending_bytes = out.write_queue_size() + file_write_queues_size;
  int ret = in.read_resume(total_write_pending_bytes <= WRITE_QUEUE_SIZE_LOWER_LIMIT);
  DEBUG_LOG(ret == 0, "[debug] read resumed (total_write_pending_bytes=%zu)\n", total_write_pending_bytes);
  ...
Try this program with different combinations of stdin/stdout endpoints:
stdin:TTY / stdout:TTY
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt
stdin:PIPE / stdout:PIPE
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /etc/passwd | build/example/tee1 tee-test.txt | grep root
stdin:FILE / stdout:FILE
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt </etc/passwd >tee-stdout.txt
stdin:FILE / stdout:SOCKET
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ nc -l 127.0.0.1 54321 &
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ build/example/tee1 tee-test.txt </etc/passwd >/dev/tcp/127.0.0.1/54321
The same test as with the cpio example program:
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee1 | dd of=/dev/null iflag=fullblock bs=1M count=10000
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.706 s, 271 MB/s
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee1 | dd of=/dev/null iflag=fullblock bs=1M count=10000
write to stdout (pipe) at offset 10485776384: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.9361 s, 269 MB/s
As can be noted, the tee1 program reports a different number of bytes written to stdout than dd has read from its stdin. (The same issue effectively takes place in the tests for the cpio example programs.) This is because tee's read/write loop manages to perform several successful writes into the pipe before being notified that the pipe is broken, at which point the callback for a pending write operation receives an error.
Using a buffer pool does not increase throughput in this case. The buffer pool implemented here is intended for illustrative purposes only; its performance degrades linearly as its size is increased (e.g. by setting WRITE_QUEUE_SIZE_UPPER_LIMIT to values on the order of 1000*BUFFER_SIZE). Another example of a buffer pool is in example/tee2.cpp, which uses the uv::buffer::sink_cb_t feature. Here is the part of tee2.cpp that differs from tee1.cpp:
class buffer_pool
{
private:
  ...
  bool pool_destroying = false;

public:
  ~buffer_pool()
  {
    DEBUG_LOG(true, "[debug] buffer pool destroying: spare_items=%zu total_items=%zu\n", spare_items_pool.size(), num_total_items);
    pool_destroying = true;
  }
  buffer_pool(std::size_t _buffer_size, std::size_t _init_pool_size, std::size_t _init_pool_capacity)
  : buf_size(_buffer_size), num_total_items(0)
  {
    spare_items_pool.reserve(_init_pool_capacity);
    while (_init_pool_size--)  spare_items_pool.emplace_back(new_item());
  }

  buffer_pool(const buffer_pool&) = delete;
  buffer_pool& operator =(const buffer_pool&) = delete;
  buffer_pool(buffer_pool&&) noexcept = default;
  buffer_pool& operator =(buffer_pool&&) noexcept = default;

private:
  uv::buffer new_item()
  {
    ...  /* create a new buffer and set its sink callback, which puts the buffer
            back into spare_items_pool when its last outside reference is dropped */
    {
      if (pool_destroying)  return;
      ...
    };
    ++num_total_items;
    DEBUG_LOG(true, "[debug] buffer pool: new item #%zu\n", num_total_items);
    ...
  }

public:
  std::size_t buffer_size() const noexcept  { return buf_size; }
  std::size_t total_items() const noexcept  { return num_total_items; }
  std::size_t spare_items() const noexcept  { return spare_items_pool.size(); }

  ...  /* acquiring a buffer from the pool: */
  {
    if (spare_items_pool.empty())
      return new_item();
    else
    {
      ...
    }
  }
} buffers(
    BUFFER_SIZE,
    WRITE_QUEUE_SIZE_LOWER_LIMIT/BUFFER_SIZE,
    WRITE_QUEUE_SIZE_UPPER_LIMIT/BUFFER_SIZE + 1
);
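The gain from the sink-callback design is that a buffer puts itself back into the pool in O(1) time the moment its last outside reference is dropped, instead of the pool being scanned linearly for items with nrefs() == 1 on every allocation, as in tee1.cpp. The following sketch is not uvcc code; it only illustrates the same idea in plain C++17, with a std::shared_ptr deleter standing in for uv::buffer's sink callback:

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// A toy pool of fixed-size byte blocks. The shared_ptr deleter plays the role of
// the sink callback: when the last user drops its reference, the block is pushed
// back onto the spare list instead of being freed.
class block_pool
{
  std::size_t block_size_;
  std::vector< std::unique_ptr<char[]> > spare_;

public:
  explicit block_pool(std::size_t _block_size) : block_size_(_block_size) {}

  std::shared_ptr<char[]> acquire()
  {
    std::unique_ptr<char[]> block;
    if (spare_.empty())
      block.reset(new char[block_size_]);   // auto-grow on demand
    else
    {
      block = std::move(spare_.back());     // reuse a spare block: O(1)
      spare_.pop_back();
    }
    // the deleter returns the block to the pool once the last shared_ptr copy
    // (e.g. the one held by a pending write operation) goes away
    return std::shared_ptr<char[]>(block.release(),
                                   [this](char *_p) { spare_.emplace_back(_p); });
  }
};

Unlike this toy version, tee2.cpp also has to guard against buffers outliving the pool itself, which is what the pool_destroying flag in the destructor and in the sink callback is for.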
Test results:
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee2 | dd of=/dev/null iflag=fullblock bs=1M count=10000
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 37.9453 s, 276 MB/s
[mike@u250 /mnt/sda3/wroot/libuv/uvcc/uvcc.git]
$ cat /dev/zero | build/example/tee2 | dd of=/dev/null iflag=fullblock bs=1M count=10000
10000+0 records in
10000+0 records out
10485760000 bytes (10 GB) copied, 38.0056 s, 276 MB/s
write to stdout (pipe) at offset 10485768192: EPIPE (-32): broken pipe