uvcc
libuv C++ bindings
handle-io.hpp
1 
2 #ifndef UVCC_HANDLE_IO__HPP
3 #define UVCC_HANDLE_IO__HPP
4 
5 #include "uvcc/utility.hpp"
6 #include "uvcc/handle-base.hpp"
7 #include "uvcc/buffer.hpp"
8 #include "uvcc/loop.hpp"
9 
10 #include <cstddef> // size_t
11 #include <uv.h>
12 
13 #include <functional> // function
14 #include <mutex> // lock_guard
15 
16 
17 namespace uv
18 {
19 
20 
21 /*! \ingroup doxy_group__handle
22  \brief The base class for handles representing I/O endpoints: a file, TCP/UDP socket, pipe, TTY.
23  \details Encapsulates common I/O functions and properties.
24  \note `read_start()`/`read_stop()` and `read_pause()`/`read_resume()` functions are mutually exclusive and thread-safe. */
25 class io : public handle
26 {
27  //! \cond
28  friend class handle::instance< io >;
29  friend class output;
30  friend class fs;
31  friend class process;
32  //! \endcond
33 
34 public: /*types*/
35  using uv_t = void;
36  using on_read_t = std::function< void(io _handle, ssize_t _nread, buffer _buffer, int64_t _offset, void *_info) >;
37  /*!< \brief The function type of the callback called by `read_start()` when data was read from an I/O endpoint.
38  \details
39  The `_offset` parameter is the file offset the read operation has been performed at. For I/O endpoints that
40  are not of `uv::file` type this is an artificial value which is calculated from
41  the starting offset value passed to the `read_start()` call plus the total number of bytes read
42  since that `read_start()` call.
43 
44  The `_info` pointer is valid for the duration of the callback only and for `uv::udp` endpoint refers to the
45  `struct udp::io_info` supplemental data, containing information on remote peer address and received
46  message flags. For other I/O endpoint types it is a `nullptr`.
47 
48  \sa libuv API documentation: [`uv_read_cb`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb),
49  [`uv_udp_recv_cb`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_recv_cb).
50  \note On error and EOF state this callback is supplied with a dummy _null-initialized_ `_buffer`.
51 
52  On error and EOF state the libuv API calls the user provided
53  [`uv_read_cb`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb) or
54  [`uv_udp_recv_cb`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_recv_cb) functions with
55  a _null-initialized_ [`uv_buf_t`](http://docs.libuv.org/en/v1.x/misc.html#c.uv_buf_t) buffer structure
56  (where `buf->base = nullptr` and `buf->len = 0`) and does not try to retrieve something from the
57  [`uv_alloc_cb`](http://docs.libuv.org/en/v1.x/handle.html#c.uv_alloc_cb) callback in such cases.
58  So the uvcc `io::on_read_t` callback is supplied with a dummy _null-initialized_ `_buffer`. */
59 
60 protected: /*types*/
61  //! \cond internals
62  //! \addtogroup doxy_group__internals
63  //! \{
64 
65  enum class rdcmd { UNKNOWN, STOP, PAUSE, START, RESUME };
66 
67  struct properties : handle::properties
68  {
69  spinlock rdstate_switch;
70  rdcmd rdcmd_state = rdcmd::UNKNOWN;
71  std::size_t rdsize = 0;
72  int64_t rdoffset = 0;
73  on_buffer_alloc_t alloc_cb;
74  on_read_t read_cb;
75  };
76 
77  struct uv_interface : virtual handle::uv_interface
78  {
79  virtual std::size_t write_queue_size(void*) const noexcept = 0;
80  virtual int read_start(void*, int64_t) const noexcept = 0;
81  virtual int read_stop(void*) const noexcept = 0;
82  };
83 
84  //! \}
85  //! \endcond
86 
87 private: /*types*/
88  using instance = handle::instance< io >;
89 
90 protected: /*constructors*/
91  //! \cond
92  io() noexcept = default;
93 
94  explicit io(uv_t *_uv_handle) : handle(static_cast< handle::uv_t* >(_uv_handle)) {}
95  //! \endcond
96 
97 public: /*constructors*/
98  ~io() = default;
99 
100  io(const io&) = default;
101  io& operator =(const io&) = default;
102 
103  io(io&&) noexcept = default;
104  io& operator =(io&&) noexcept = default;
105 
106 protected: /*functions*/
107  //! \cond
108  static void io_alloc_cb(uv_t *_uv_handle, std::size_t _suggested_size, ::uv_buf_t *_uv_buf)
109  {
110  auto &properties = instance::from(_uv_handle)->properties();
111 
112  auto &alloc_cb = properties.alloc_cb;
113  buffer &&b = alloc_cb(io(_uv_handle), properties.rdsize ? properties.rdsize : _suggested_size);
114 
115  buffer::instance::from(b.uv_buf)->ref(); // add the reference for further moving the buffer instance into io_read_cb() parameter
116  *_uv_buf = b[0];
117  }
118 
119  static void io_read_cb(uv_t *_uv_handle, ssize_t _nread, const ::uv_buf_t *_uv_buf, void *_info)
120  {
121  auto instance_ptr = instance::from(_uv_handle);
122  auto &properties = instance_ptr->properties();
123 
124  instance_ptr->uv_error = _nread;
125 
126  auto &read_cb = properties.read_cb;
127  if (_uv_buf->base)
128  read_cb(io(_uv_handle), _nread, buffer(buffer::instance::uv_buf::from(_uv_buf->base), adopt_ref), properties.rdoffset, _info);
129  // don't forget to specify adopt_ref flag when using ref_guard to unref the object
130  // don't use ref_guard unless it really needs to hold on the object until the scope end
131  // use move/transfer semantics instead if you need just pass the object to another function for further processing
132  else
133  read_cb(io(_uv_handle), _nread, buffer(), properties.rdoffset, _info);
134 
135  if (_nread > 0) properties.rdoffset += _nread;
136  }
137 
138 #if 0
139  template< typename = void > static void file_read_cb(::uv_fs_t*);
140  template< typename = void > static void stream_read_cb(::uv_stream_t*, ssize_t, const ::uv_buf_t*);
141  template< typename = void > static void udp_recv_cb(::uv_udp_t*, ssize_t, const ::uv_buf_t*, const ::sockaddr*, unsigned int);
142 #endif
143  //! \endcond
144 
145 public: /*interface*/
146  /*! \brief The amount of queued bytes waiting to be written/sent to the I/O endpoint.
147  \sa libuv API documentation: [`uv_stream_t.write_queue_size`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_stream_t.write_queue_size),
148  [`uv_udp_t.send_queue_size`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_t.send_queue_size). */
149  std::size_t write_queue_size() const noexcept { return instance::from(uv_handle)->uv_interface()->write_queue_size(uv_handle); }
150 
151  /*! \brief Set the input buffer allocation callback. */
152  on_buffer_alloc_t& on_alloc() const noexcept { return instance::from(uv_handle)->properties().alloc_cb; }
153 
154  /*! \brief Set the read callback function. */
155  on_read_t& on_read() const noexcept { return instance::from(uv_handle)->properties().read_cb; }
156 
157  /*! \brief Start reading incoming data from the I/O endpoint.
158  \details The appropriate input buffer allocation and read callbacks should be explicitly provided
159  beforehand with the `on_alloc()` and `on_read()` functions. Otherwise, a `UV_EINVAL` error is returned
160  without involving any libuv API function.
161 
162  Repeated call to this function results in the automatic call to `read_stop()` first.
163 
164  Parameters are:
165  \arg `_size` - can be set to specify suggested length of the read buffer.
166  \arg `_offset` - the starting offset for reading from. It is primarily intended for `uv::file` I/O endpoints
167  and the default value of \b -1 means using of the current file position. For other I/O endpoint
168  types it is used as a starting value for `_offset` argument of `io::on_read_t` callback function,
169  and the default value of \b -1 means to continue calculating from the offset calculated and internally
170  stored after the last read operation (or, if it has never been started yet, from the initial value of \b 0).
171 
172  \note On successful start this function adds an extra reference to the handle instance,
173  which is released when the counterpart function `read_stop()` (or `read_pause()`) is called.
174 
175  For `uv::stream` and `uv::udp` endpoints the function is just a wrapper around
176  [`uv_read_start()`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_start) and
177  [`uv_udp_recv_start()`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_recv_start) libuv facilities.
178 
179  For `uv::file` endpoint it implements a chain of calls for
180  [`uv_fs_read()`](http://docs.libuv.org/en/v1.x/fs.html#c.uv_fs_read) libuv API function with the user provided
181  callback function. If the user callback does not stop the read loop by calling `read_stop()` function,
182  the next call is automatically performed after the callback returns. The user callback receives a number of bytes
183  read during the current iteration, or EOF or error state just like if it would be a
184  [`uv_read_cb`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb) callback for `uv::stream` endpoint.
185  Note that even on EOF or error state having been reached the loop keeps trying to read from the file until `read_stop()`
186  is explicitly called.
187 
188  \sa libuv API documentation: [`uv_fs_read()`](http://docs.libuv.org/en/v1.x/fs.html#c.uv_fs_read),
189  [`uv_read_start()`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_start),
190  [`uv_udp_recv_start()`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_recv_start). */
191  int read_start(std::size_t _size = 0, int64_t _offset = -1) const
192  {
193  auto instance_ptr = instance::from(uv_handle);
194  auto &properties = instance_ptr->properties();
195 
196  std::lock_guard< decltype(properties.rdstate_switch) > lk(properties.rdstate_switch);
197 
198  if (!properties.alloc_cb or !properties.read_cb) return uv_status(UV_EINVAL);
199 
200  auto rdcmd_state0 = properties.rdcmd_state;
201 
202  properties.rdcmd_state = rdcmd::START;
203  instance_ptr->ref(); // REF:START -- make sure it will exist for the future io_read_cb() calls until read_stop()/read_pause()
204 
205  switch (rdcmd_state0)
206  {
207  case rdcmd::UNKNOWN:
208  case rdcmd::STOP:
209  case rdcmd::PAUSE:
210  break;
211  case rdcmd::START:
212  case rdcmd::RESUME:
213  uv_status(instance_ptr->uv_interface()->read_stop(uv_handle));
214  instance_ptr->unref(); // UNREF:RESTART -- emulate reat_stop()
215  break;
216  }
217 
218  properties.rdsize = _size;
219 
220  uv_status(0);
221  auto uv_ret = instance_ptr->uv_interface()->read_start(uv_handle, _offset);
222  if (uv_ret < 0)
223  {
224  uv_status(uv_ret);
225  properties.rdcmd_state = rdcmd::UNKNOWN;
226  instance_ptr->unref(); // UNREF:START_FAILURE -- release the extra reference on failure
227  }
228 
229  return uv_ret;
230  }
231 
232  /*! \brief Start reading incoming data from the I/O endpoint with provided
233  input buffer allocation callback (`_alloc_cb`) and read callback function (`_read_cb`).
234  \details This is equivalent to
235  ```
236  io.on_alloc() = _alloc_cb;
237  io.on_read() = _read_cb;
238  io.read_start(_size, _offset);
239  ```
240  \sa `io::read_start()` */
241  int read_start(const on_buffer_alloc_t &_alloc_cb, const on_read_t &_read_cb, std::size_t _size = 0, int64_t _offset = -1) const
242  {
243  auto &properties = instance::from(uv_handle)->properties();
244 
245  properties.alloc_cb = _alloc_cb;
246  properties.read_cb = _read_cb;
247 
248  return read_start(_size, _offset);
249  }
250 
251  /*! \brief Stop reading data from the I/O endpoint.
252  \sa libuv API documentation: [`uv_read_stop()`](http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_stop),
253  [`uv_udp_recv_stop()`](http://docs.libuv.org/en/v1.x/udp.html#c.uv_udp_recv_stop). */
254  int read_stop() const
255  {
256  auto instance_ptr = instance::from(uv_handle);
257  auto &properties = instance_ptr->properties();
258 
259  std::lock_guard< decltype(properties.rdstate_switch) > lk(properties.rdstate_switch);
260 
261  auto rdcmd_state0 = properties.rdcmd_state;
262  properties.rdcmd_state = rdcmd::STOP;
263 
264  auto uv_ret = uv_status(instance_ptr->uv_interface()->read_stop(uv_handle));
265 
266  switch (rdcmd_state0)
267  {
268  case rdcmd::UNKNOWN:
269  case rdcmd::STOP:
270  case rdcmd::PAUSE:
271  break;
272  case rdcmd::START:
273  case rdcmd::RESUME:
274  instance_ptr->unref(); // UNREF:STOP -- release the reference from read_start()/read_resume()
275  break;
276  }
277 
278  return uv_ret;
279  }
280 
281  /*! \brief Pause reading data from the I/O endpoint.
282  \details
283  \arg `read_pause(true)` - a \e "pause" command that is functionally equivalent to `read_stop()`. Returns `0`
284  or relevant libuv error code. Exit code of `2` is returned if the handle is not currently in reading state.
285  \arg `read_pause(false)` - a _no-op_ command that returns immediately with exit code of `1`.
286 
287  To be used in conjunction with `read_resume()` control command.
288  \sa `io::read_resume()` */
289  int read_pause(bool _trigger_condition) const
290  {
291  if (!_trigger_condition) return 1;
292 
293  auto instance_ptr = instance::from(uv_handle);
294  auto &properties = instance_ptr->properties();
295 
296  std::lock_guard< decltype(properties.rdstate_switch) > lk(properties.rdstate_switch);
297 
298  int ret = 2;
299  switch (properties.rdcmd_state)
300  {
301  case rdcmd::UNKNOWN:
302  case rdcmd::STOP:
303  case rdcmd::PAUSE:
304  break;
305  case rdcmd::START:
306  case rdcmd::RESUME:
307  properties.rdcmd_state = rdcmd::PAUSE;
308  ret = uv_status(instance_ptr->uv_interface()->read_stop(uv_handle));
309  instance_ptr->unref(); // UNREF:PAUSE -- functionally equivalent to read_stop()
310  break;
311  }
312  return ret;
313  }
314 
315  /*! \brief Resume reading data from the I/O endpoint after having been paused.
316  \details
317  \arg `read_resume(true)` - a \e "resume" command that is functionally equivalent to `read_start()` except
318  that it cannot \e "start" and is refused if the previous control command was not \e "pause" (i.e. `read_pause()`).
319  Returns `0` or relevant libuv error code. Exit code of `2` is returned if the handle is not in read pause state.
320  \arg `read_resume(false)` - a _no-op_ command that returns immediately with exit code of `1`.
321 
322  To be used in conjunction with `read_pause()` control command.
323  \note On successful resuming this function adds an extra reference to the handle instance,
324  which is released when the counterpart function `read_pause()` (or `read_stop()`) is called.
325 
326  The different control command interoperating semantics is described as follows:
327  \verbatim
328  ║ The command having been executed previously
329  The command to ╟─────────┬──────────────┬───────────────────┬─────────────┬──────────────────
330  be currently ║ No │ read_start() │ read_resume(true) │ read_stop() │ read_pause(true)
331  executed ║ command │ │ │ │
332  ═══════════════════╬═════════╪══════════════╪═══════════════════╪═════════════╪══════════════════
333  read_start() ║ "start" │ "restart" │ "restart" │ "start" │ "start"
334  read_resume(true) ║ - │ - │ - │ - │ "resume"
335  read_stop() ║ "stop" │ "stop" │ "stop" │ "stop" │ "stop"
336  read_pause(true) ║ - │ "pause" │ "pause" │ - │ -
337 
338  "resume" is functionally equivalent to "start"
339  "pause" is functionally equivalent to "stop"
340  "restart" is functionally equivalent to "stop" followed by "start"
341  \endverbatim
342 
343  `read_pause()`/`read_resume()` commands are intended for temporarily pausing the read process, for example
344  when a consumer that the data being read is sent to becomes overwhelmed by it and its input queue
345  (and/or a sender output queue) has grown considerably. */
346  int read_resume(bool _trigger_condition)
347  {
348  if (!_trigger_condition) return 1;
349 
350  auto instance_ptr = instance::from(uv_handle);
351  auto &properties = instance_ptr->properties();
352 
353  std::lock_guard< decltype(properties.rdstate_switch) > lk(properties.rdstate_switch);
354 
355  int ret = 2;
356  switch (properties.rdcmd_state)
357  {
358  case rdcmd::UNKNOWN:
359  case rdcmd::STOP:
360  break;
361  case rdcmd::PAUSE:
362  properties.rdcmd_state = rdcmd::RESUME;
363  instance_ptr->ref(); // REF:RESUME -- functionally equivalent to read_start()
364 
365  uv_status(0);
366  ret = instance_ptr->uv_interface()->read_start(uv_handle, properties.rdoffset);
367  if (ret < 0)
368  {
369  uv_status(ret);
370  properties.rdcmd_state = rdcmd::UNKNOWN;
371  instance_ptr->unref(); // UNREF:RESUME_FAILURE -- release the extra reference on failure
372  }
373 
374  break;
375  case rdcmd::START:
376  case rdcmd::RESUME:
377  break;
378  }
379  return ret;
380  }
381 
382  /*! \brief Create an `io` handle object which actual type is derived from an existing file descriptor.
383  \details Supported file descriptors:
384  - Windows: pipe, tty, file
385  - Unix-like systems: pipe, tty, file, tcp/udp socket
386  .
387  \sa libuv API documentation: [`uv_guess_handle()`](http://docs.libuv.org/en/v1.x/misc.html#c.uv_guess_handle). */
388  static io guess_handle(uv::loop&, ::uv_file);
389 
390 public: /*conversion operators*/
391  explicit operator const uv_t*() const noexcept { return static_cast< const uv_t* >(uv_handle); }
392  explicit operator uv_t*() noexcept { return static_cast< uv_t* >(uv_handle); }
393 };
394 
395 
396 }
397 
398 
399 #include "uvcc/handle-stream.hpp"
400 #include "uvcc/handle-fs.hpp"
401 #include "uvcc/handle-udp.hpp"
402 
403 
404 namespace uv
405 {
406 
407 
408 inline io io::guess_handle(uv::loop &_loop, ::uv_file _fd)
409 {
410  io ret;
411 
412  switch (::uv_guess_handle(_fd))
413  {
414  default:
415  case UV_UNKNOWN_HANDLE: ret.uv_status(UV_EBADF);
416  break;
417  case UV_NAMED_PIPE: ret = pipe(_loop, _fd, false, false);
418  break;
419  case UV_TCP: ret = tcp(_loop, _fd, false);
420  break;
421  case UV_TTY: ret = tty(_loop, _fd, true, false);
422  break;
423  case UV_UDP: ret = udp(_loop, static_cast< ::uv_os_sock_t >(_fd));
424  break;
425  case UV_FILE: ret = file(_loop, _fd);
426  break;
427  }
428 
429  return ret;
430 }
431 
432 
433 }
434 
435 
436 #endif
Namespace for all uvcc definitions.
Definition: buffer.hpp:17
operator bool() const noexcept
Equivalent to (base() != nullptr).
Definition: buffer.hpp:248
A simple spinlock mutex built around std::atomic_flag.
Definition: utility.hpp:304
int read_resume(bool _trigger_condition)
Resume reading data from the I/O endpoint after having been paused.
Definition: handle-io.hpp:346
int read_start(const on_buffer_alloc_t &_alloc_cb, const on_read_t &_read_cb, std::size_t _size=0, int64_t _offset=-1) const
Start reading incoming data from the I/O endpoint with provided input buffer allocation callback (_al...
Definition: handle-io.hpp:241
int read_pause(bool _trigger_condition) const
Pause reading data from the I/O endpoint.
Definition: handle-io.hpp:289
std::size_t write_queue_size() const noexcept
The amount of queued bytes waiting to be written/sent to the I/O endpoint.
Definition: handle-io.hpp:149
int read_stop() const
Stop reading data from the I/O endpoint.
Definition: handle-io.hpp:254
The base class for the libuv handles.
Definition: handle-base.hpp:33
The I/O event loop class.
Definition: loop.hpp:33
The base class for handles representing I/O endpoints: a file, TCP/UDP socket, pipe, TTY.
Definition: handle-io.hpp:25
on_buffer_alloc_t & on_alloc() const noexcept
Set the input buffer allocation callback.
Definition: handle-io.hpp:152
static io guess_handle(uv::loop &, ::uv_file)
Create an io handle object which actual type is derived from an existing file descriptor.
Definition: handle-io.hpp:408
int read_start(std::size_t _size=0, int64_t _offset=-1) const
Start reading incoming data from the I/O endpoint.
Definition: handle-io.hpp:191
on_read_t & on_read() const noexcept
Set the read callback function.
Definition: handle-io.hpp:155