async.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. /** @file async.c
  2. * @brief Asynchronous input and output.
  3. *
  4. * Class uses non-blocking input and output and has internal input and output
  5. * buffers. Class has associated file descriptor to work with it.
  6. *
  7. * For async writing user uses faux_async_write() function. It writes all
  8. * given data to internal buffer and then tries to really write it to file
  9. * descriptor. If not all data was written in non-blocking mode then function
  10. * executes special callback "stall" function to inform us about non-empty
  11. * output buffer. "Stall" callback function can make programm to inspect fd
  12. * for write possibility. Then programm must call faux_async_out() to really
  13. * write the rest of the data to fd. Function also can execute "stall" callback.
  14. *
  15. * For async reading user can call faux_sync_in(). For example this function
  16. * can be called after select() or poll() when data is available on interested
  17. * fd. Function reads data in non-blocking mode and stores to internal buffer.
  18. * User can specify read "limits" - min and max. When amount of reded data is
  19. * greater or equal to "min" limit then "read" callback will be executed.
  20. * The "read" callback will get allocated buffer with received data. The
  21. * length of the data is greater or equal to "min" limit and less or equal to
  22. * "max" limit.
  23. */
  24. #ifdef HAVE_CONFIG_H
  25. #include "config.h"
  26. #endif /* HAVE_CONFIG_H */
  27. #include <stdlib.h>
  28. #include <stdint.h>
  29. #include <stdio.h>
  30. #include <string.h>
  31. #include <assert.h>
  32. #include <unistd.h>
  33. #include <errno.h>
  34. #include <sys/types.h>
  35. #include <sys/stat.h>
  36. #include <fcntl.h>
  37. #include "faux/faux.h"
  38. #include "faux/str.h"
  39. #include "faux/net.h"
  40. #include "faux/async.h"
  41. #include "private.h"
  42. #define DATA_CHUNK 4096
  43. /** @brief Create new async I/O object.
  44. *
  45. * Constructor gets associated file descriptor to operate on it. File
  46. * descriptor must be nonblocked. If not so then constructor will set
  47. * nonblock flag itself.
  48. *
  49. * @param [in] fd File descriptor.
  50. * @return Allocated object or NULL on error.
  51. */
  52. faux_async_t *faux_async_new(int fd)
  53. {
  54. faux_async_t *async = NULL;
  55. int fflags = 0;
  56. // Prepare FD
  57. if (fd < 0) // Illegal fd
  58. return NULL;
  59. if ((fflags = fcntl(fd, F_GETFL)) == -1)
  60. return NULL;
  61. if (fcntl(fd, F_SETFL, fflags | O_NONBLOCK) == -1)
  62. return NULL;
  63. async = faux_zmalloc(sizeof(*async));
  64. assert(async);
  65. if (!async)
  66. return NULL;
  67. // Init
  68. async->fd = fd;
  69. // Read (Input)
  70. async->read_cb = NULL;
  71. async->read_udata = NULL;
  72. async->min = 1;
  73. async->max = FAUX_ASYNC_UNLIMITED;
  74. async->i_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
  75. NULL, NULL, faux_free);
  76. async->i_rpos = 0;
  77. async->i_wpos = 0;
  78. async->i_size = 0;
  79. async->i_overflow = 10000000l; // ~ 10M
  80. // Write (Output)
  81. async->stall_cb = NULL;
  82. async->stall_udata = NULL;
  83. async->o_overflow = 10000000l; // ~ 10M
  84. async->o_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
  85. NULL, NULL, faux_free);
  86. async->o_rpos = 0;
  87. async->o_wpos = 0;
  88. async->o_size = 0;
  89. return async;
  90. }
  91. /** @brief Free async I/O object.
  92. *
  93. * @param [in] Async I/O object.
  94. */
  95. void faux_async_free(faux_async_t *async)
  96. {
  97. if (!async)
  98. return;
  99. faux_list_free(async->i_list);
  100. faux_list_free(async->o_list);
  101. faux_free(async);
  102. }
  103. /** @brief Get file descriptor from async I/O object.
  104. *
  105. * @param [in] async Allocated and initialized async I/O object.
  106. * @return Serviced file descriptor.
  107. */
  108. int faux_async_fd(const faux_async_t *async)
  109. {
  110. assert(async);
  111. if (!async)
  112. return -1;
  113. return async->fd;
  114. }
  115. /** @brief Set read callback and associated user data.
  116. *
  117. * If callback function pointer is NULL then class will drop all readed data.
  118. *
  119. * @param [in] async Allocated and initialized async I/O object.
  120. * @param [in] read_cb Read callback.
  121. * @param [in] user_data Associated user data.
  122. */
  123. void faux_async_set_read_cb(faux_async_t *async,
  124. faux_async_read_cb_fn read_cb, void *user_data)
  125. {
  126. assert(async);
  127. if (!async)
  128. return;
  129. async->read_cb = read_cb;
  130. async->read_udata = user_data;
  131. }
  132. /** @brief Set read limits.
  133. *
  134. * Read limits define conditions when the read callback will be executed.
  135. * Buffer must contain data amount greater or equal to "min" value. Callback
  136. * will not get data amount greater than "max" value. If min == max then
  137. * callback will be executed with fixed data size. The "max" value can be "0".
  138. * It means indefinite i.e. data transferred to callback can be really large.
  139. *
  140. * @param [in] async Allocated and initialized async I/O object.
  141. * @param [in] min Minimal data amount.
  142. * @param [in] max Maximal data amount. The "0" means indefinite.
  143. * @return BOOL_TRUE - success, BOOL_FALSE - error.
  144. */
  145. bool_t faux_async_set_read_limits(faux_async_t *async, size_t min, size_t max)
  146. {
  147. assert(async);
  148. if (!async)
  149. return BOOL_FALSE;
  150. if (min < 1)
  151. return BOOL_FALSE;
  152. if ((min > max) && (max != 0))
  153. return BOOL_FALSE;
  154. async->min = min;
  155. async->max = max;
  156. return BOOL_TRUE;
  157. }
  158. /** @brief Set stall callback and associated user data.
  159. *
  160. * @param [in] async Allocated and initialized async I/O object.
  161. * @param [in] stall_cb Stall callback.
  162. * @param [in] user_data Associated user data.
  163. */
  164. void faux_async_set_stall_cb(faux_async_t *async,
  165. faux_async_stall_cb_fn stall_cb, void *user_data)
  166. {
  167. assert(async);
  168. if (!async)
  169. return;
  170. async->stall_cb = stall_cb;
  171. async->stall_udata = user_data;
  172. }
  173. /** @brief Set write overflow value.
  174. *
  175. * "Overflow" is a value when engine consider data consumer as a stalled.
  176. * Data gets into the async I/O object buffer but object can't write it to
  177. * serviced fd for too long time. So it accumulates great amount of data.
  178. *
  179. * @param [in] async Allocated and initialized async I/O object.
  180. * @param [in] overflow Overflow value.
  181. */
  182. void faux_async_set_write_overflow(faux_async_t *async, size_t overflow)
  183. {
  184. assert(async);
  185. if (!async)
  186. return;
  187. async->o_overflow = overflow;
  188. }
  189. /** @brief Set read overflow value.
  190. *
  191. * "Overflow" is a value when engine consider data consumer as a stalled.
  192. * Data gets into the async I/O object buffer but object can't write it to
  193. * serviced fd for too long time. So it accumulates great amount of data.
  194. *
  195. * @param [in] async Allocated and initialized async I/O object.
  196. * @param [in] overflow Overflow value.
  197. */
  198. void faux_async_set_read_overflow(faux_async_t *async, size_t overflow)
  199. {
  200. assert(async);
  201. if (!async)
  202. return;
  203. async->i_overflow = overflow;
  204. }
  205. /** @brief Get amount of unused space within current data chunk.
  206. *
  207. * Inernal static function.
  208. *
  209. * @param [in] list Internal buffer (list of chunks) to inspect.
  210. * @param [in] pos Current write position within last chunk
  211. * @return Size of unused space or < 0 on error.
  212. */
  213. static ssize_t free_space(faux_list_t *list, size_t pos)
  214. {
  215. if (!list)
  216. return -1;
  217. if (faux_list_len(list) == 0)
  218. return 0;
  219. return (DATA_CHUNK - pos);
  220. }
  221. /** @brief Async data write.
  222. *
  223. * All given data will be stored to internal buffer (list of data chunks).
  224. * Then function will try to write stored data to file descriptor in
  225. * non-blocking mode. Note some data can be left within buffer. In this case
  226. * the "stall" callback will be executed to inform about it. To try to write
  227. * the rest of the data user can be call faux_async_out() function. Both
  228. * functions will not block.
  229. *
  230. * @param [in] async Allocated and initialized async I/O object.
  231. * @param [in] data Data buffer to write.
  232. * @param [in] len Data length to write.
  233. * @return Length of stored/writed data or < 0 on error.
  234. */
  235. ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
  236. {
  237. void *new_chunk = NULL;
  238. size_t data_left = len;
  239. assert(async);
  240. if (!async)
  241. return -1;
  242. assert(data);
  243. if (!data)
  244. return -1;
  245. while (data_left != 0) {
  246. ssize_t bytes_free = 0;
  247. size_t copy_len = 0;
  248. char *chunk_ptr = NULL;
  249. // Allocate new chunk if necessary
  250. bytes_free = free_space(async->o_list, async->o_wpos);
  251. if (bytes_free < 0)
  252. return -1;
  253. if (0 == bytes_free) {
  254. new_chunk = faux_malloc(DATA_CHUNK);
  255. assert(new_chunk);
  256. faux_list_add(async->o_list, new_chunk);
  257. async->o_wpos = 0;
  258. bytes_free = free_space(async->o_list, async->o_wpos);
  259. }
  260. // Copy data
  261. chunk_ptr = faux_list_data(faux_list_tail(async->o_list));
  262. copy_len = (data_left < (size_t)bytes_free) ? data_left : (size_t)bytes_free;
  263. memcpy(chunk_ptr + async->o_wpos, data + len - data_left,
  264. copy_len);
  265. async->o_wpos += copy_len;
  266. data_left -= copy_len;
  267. async->o_size += copy_len;
  268. if (async->o_size >= async->o_overflow)
  269. return -1;
  270. }
  271. // Try to real write data to fd in nonblocked mode
  272. faux_async_out(async);
  273. return len;
  274. }
  275. /** @brief Get amount of available data within first chunk.
  276. *
  277. * Inernal static function.
  278. *
  279. * @param [in] list Internal buffer (list of chunks) to inspect.
  280. * @param [in] rpos Current read position within chunk.
  281. * @param [in] wpos Current write position within chunk.
  282. * @return Available data length or < 0 on error.
  283. */
  284. static ssize_t data_avail(faux_list_t *list, size_t rpos, size_t wpos)
  285. {
  286. size_t len = 0;
  287. if (!list)
  288. return -1;
  289. len = faux_list_len(list);
  290. if (len == 0)
  291. return 0;
  292. if (len > 1)
  293. return (DATA_CHUNK - rpos);
  294. // Single chunk
  295. return (wpos - rpos);
  296. }
  297. /** @brief Write output buffer to fd in non-blocking mode.
  298. *
  299. * Previously data must be written to internal buffer by faux_async_write()
  300. * function. But some data can be left within internal buffer because can't be
  301. * written to fd in non-blocking mode. This function tries to write the rest of
  302. * data to fd in non-blocking mode. So function doesn't block. It can be called
  303. * after select() or poll() if fd is ready to be written to. If function can't
  304. * to write all buffer to fd it executes "stall" callback to inform about it.
  305. *
  306. * @param [in] async Allocated and initialized async I/O object.
  307. * @return Length of data actually written or < 0 on error.
  308. */
  309. ssize_t faux_async_out(faux_async_t *async)
  310. {
  311. ssize_t total_written = 0;
  312. assert(async);
  313. if (!async)
  314. return -1;
  315. while (async->o_size > 0) {
  316. faux_list_node_t *node = NULL;
  317. char *chunk_ptr = NULL;
  318. ssize_t data_to_write = 0;
  319. ssize_t bytes_written = 0;
  320. bool_t postpone = BOOL_FALSE;
  321. node = faux_list_head(async->o_list);
  322. if (!node) // List is empty while o_size > 0
  323. return -1;
  324. chunk_ptr = faux_list_data(node);
  325. data_to_write = data_avail(async->o_list,
  326. async->o_rpos, async->o_wpos);
  327. if (data_to_write <= 0) // Strange case
  328. return -1;
  329. bytes_written = write(async->fd, chunk_ptr + async->o_rpos,
  330. data_to_write);
  331. if (bytes_written > 0) {
  332. async->o_size -= bytes_written;
  333. total_written += bytes_written;
  334. }
  335. if (bytes_written < 0) {
  336. if ( // Something went wrong
  337. (errno != EINTR) &&
  338. (errno != EAGAIN) &&
  339. (errno != EWOULDBLOCK)
  340. )
  341. return -1;
  342. // Postpone next read
  343. postpone = BOOL_TRUE;
  344. // Not whole data block was written
  345. } else if (bytes_written != data_to_write) {
  346. async->o_rpos += bytes_written;
  347. // Postpone next read
  348. postpone = BOOL_TRUE;
  349. }
  350. // Postponed
  351. if (postpone) {
  352. // Execute callback
  353. if (async->stall_cb)
  354. async->stall_cb(async, async->o_size,
  355. async->stall_udata);
  356. break;
  357. }
  358. // Not postponed. Current chunk was fully written. So
  359. // remove it from list.
  360. async->o_rpos = 0;
  361. faux_list_del(async->o_list, node);
  362. }
  363. return total_written;
  364. }
  365. /** @brief Read data and store it to internal buffer in non-blocking mode.
  366. *
  367. * Reads fd and puts data to internal buffer. It can't be blocked. If length of
  368. * data stored within internal buffer is greater or equal than "min" limit then
  369. * function will execute "read" callback. It allocates linear buffer, copies
  370. * data to it and give it to callback. Note this function will never free
  371. * allocated buffer. So callback must do it or it must be done later. Function
  372. * will not allocate buffer larger than "max" read limit. If "max" limit is "0"
  373. * (it means indefinite) then function will pass all available data to callback.
  374. *
  375. * @param [in] async Allocated and initialized async I/O object.
  376. * @return Length of data actually readed or < 0 on error.
  377. */
  378. ssize_t faux_async_in(faux_async_t *async)
  379. {
  380. void *new_chunk = NULL;
  381. ssize_t total_readed = 0;
  382. ssize_t bytes_readed = 0;
  383. ssize_t bytes_free = 0; // Free space within current (last) chunk
  384. assert(async);
  385. if (!async)
  386. return -1;
  387. do {
  388. char *chunk_ptr = NULL;
  389. // Allocate new chunk if necessary
  390. bytes_free = free_space(async->i_list, async->i_wpos);
  391. if (bytes_free < 0)
  392. return -1;
  393. if (0 == bytes_free) { // We need to allocate additional chunk
  394. new_chunk = faux_malloc(DATA_CHUNK);
  395. assert(new_chunk);
  396. faux_list_add(async->i_list, new_chunk);
  397. async->i_wpos = 0;
  398. bytes_free = free_space(async->i_list, async->i_wpos);
  399. }
  400. // Read data to last chunk
  401. chunk_ptr = faux_list_data(faux_list_tail(async->i_list));
  402. bytes_readed = read(async->fd, chunk_ptr + async->i_wpos, bytes_free);
  403. if (bytes_readed < 0) {
  404. if ( // Something went wrong
  405. (errno != EINTR) &&
  406. (errno != EAGAIN) &&
  407. (errno != EWOULDBLOCK)
  408. )
  409. return -1;
  410. }
  411. if (bytes_readed > 0) {
  412. async->i_wpos += bytes_readed;
  413. async->i_size += bytes_readed;
  414. total_readed += bytes_readed;
  415. }
  416. if (async->i_size >= async->i_overflow)
  417. return -1;
  418. // Check for amount of stored data
  419. while (async->i_size >= async->min) {
  420. size_t copy_len = 0;
  421. size_t full_size = 0;
  422. char *buf = NULL;
  423. char *buf_ptr = NULL;
  424. if (FAUX_ASYNC_UNLIMITED == async->max) { // Indefinite
  425. copy_len = async->i_size; // Take all data
  426. } else {
  427. copy_len = (async->i_size < async->max) ?
  428. async->i_size : async->max;
  429. }
  430. full_size = copy_len; // Save full length value
  431. buf = faux_malloc(full_size);
  432. buf_ptr = buf;
  433. while (copy_len > 0) {
  434. size_t data_to_write = 0;
  435. faux_list_node_t *node = faux_list_head(async->i_list);
  436. char *chunk_ptr = NULL;
  437. if (!node) // Something went wrong
  438. return -1;
  439. chunk_ptr = faux_list_data(node);
  440. data_to_write = data_avail(async->i_list,
  441. async->i_rpos, async->i_wpos);
  442. if (copy_len < data_to_write)
  443. data_to_write = copy_len;
  444. memcpy(buf_ptr, chunk_ptr + async->i_rpos,
  445. data_to_write);
  446. copy_len -= data_to_write;
  447. async->i_size -= data_to_write;
  448. async->i_rpos += data_to_write;
  449. buf_ptr += data_to_write;
  450. if (data_avail(async->i_list,
  451. async->i_rpos, async->i_wpos) <= 0) {
  452. async->i_rpos = 0;
  453. faux_list_del(async->i_list, node);
  454. }
  455. }
  456. // Execute callback
  457. if (async->read_cb)
  458. async->read_cb(async, buf,
  459. full_size, async->read_udata);
  460. }
  461. } while (bytes_readed == bytes_free);
  462. return total_readed;
  463. }