async.c 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /** @file async.c
  2. */
  3. #ifdef HAVE_CONFIG_H
  4. #include "config.h"
  5. #endif /* HAVE_CONFIG_H */
  6. #include <stdlib.h>
  7. #include <stdint.h>
  8. #include <stdio.h>
  9. #include <string.h>
  10. #include <assert.h>
  11. #include <unistd.h>
  12. #include <errno.h>
  13. #include <sys/types.h>
  14. #include <sys/stat.h>
  15. #include <fcntl.h>
  16. #include "faux/faux.h"
  17. #include "faux/str.h"
  18. #include "faux/net.h"
  19. #include "faux/async.h"
  20. #include "private.h"
  21. #define DATA_CHUNK 4096
  22. /** @brief Create new async I/O object.
  23. *
  24. * Constructor gets associated file descriptor to operate on it. File
  25. * descriptor must be nonblocked. If not so then constructor will set
  26. * nonblock flag itself.
  27. *
  28. * @param [in] fd File descriptor.
  29. * @return Allocated object or NULL on error.
  30. */
  31. faux_async_t *faux_async_new(int fd)
  32. {
  33. faux_async_t *async = NULL;
  34. int fflags = 0;
  35. // Prepare FD
  36. if (fd < 0) // Illegal fd
  37. return NULL;
  38. if ((fflags = fcntl(fd, F_GETFL)) == -1)
  39. return NULL;
  40. if (fcntl(fd, F_SETFL, fflags | O_NONBLOCK) == -1)
  41. return NULL;
  42. async = faux_zmalloc(sizeof(*async));
  43. assert(async);
  44. if (!async)
  45. return NULL;
  46. // Init
  47. async->fd = fd;
  48. // Read (Input)
  49. async->read_cb = NULL;
  50. async->read_udata = NULL;
  51. async->min = 1;
  52. async->max = 0; // Indefinite
  53. async->i_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
  54. NULL, NULL, faux_free);
  55. async->i_rpos = 0;
  56. async->i_wpos = 0;
  57. async->i_size = 0;
  58. // Write (Output)
  59. async->stall_cb = NULL;
  60. async->stall_udata = NULL;
  61. async->overflow = 10000000l; // ~ 10M
  62. async->o_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
  63. NULL, NULL, faux_free);
  64. async->o_rpos = 0;
  65. async->o_wpos = 0;
  66. async->o_size = 0;
  67. return async;
  68. }
  69. /** @brief Free async I/O object.
  70. *
  71. * @param [in] Async I/O object.
  72. */
  73. void faux_async_free(faux_async_t *async)
  74. {
  75. if (!async)
  76. return;
  77. faux_list_free(async->i_list);
  78. faux_list_free(async->o_list);
  79. faux_free(async);
  80. }
  81. /** @brief Get file descriptor from async I/O object.
  82. *
  83. * @param [in] async Allocated and initialized async I/O object.
  84. * @return Serviced file descriptor.
  85. */
  86. int faux_async_fd(const faux_async_t *async)
  87. {
  88. assert(async);
  89. if (!async)
  90. return -1;
  91. return async->fd;
  92. }
  93. /** @brief Set read callback and associated user data.
  94. *
  95. * If callback function pointer is NULL then class will drop all readed data.
  96. *
  97. * @param [in] async Allocated and initialized async I/O object.
  98. * @param [in] read_cb Read callback.
  99. * @param [in] user_data Associated user data.
  100. */
  101. void faux_async_set_read_cb(faux_async_t *async,
  102. faux_async_read_cb_f read_cb, void *user_data)
  103. {
  104. assert(async);
  105. if (!async)
  106. return;
  107. async->read_cb = read_cb;
  108. async->read_udata = user_data;
  109. }
  110. /** @brief Set read limits.
  111. *
  112. * Read limits define conditions when the read callback will be executed.
  113. * Buffer must contain data amount greater or equal to "min" value. Callback
  114. * will not get data amount greater than "max" value. If min == max then
  115. * callback will be executed with fixed data size. The "max" value can be "0".
  116. * It means indefinite i.e. data transferred to callback can be really large.
  117. *
  118. * @param [in] async Allocated and initialized async I/O object.
  119. * @param [in] min Minimal data amount.
  120. * @param [in] max Maximal data amount. The "0" means indefinite.
  121. * @return BOOL_TRUE - success, BOOL_FALSE - error.
  122. */
  123. bool_t faux_async_set_read_limits(faux_async_t *async, size_t min, size_t max)
  124. {
  125. assert(async);
  126. if (!async)
  127. return BOOL_FALSE;
  128. if (min < 1)
  129. return BOOL_FALSE;
  130. if ((min > max) && (max != 0))
  131. return BOOL_FALSE;
  132. async->min = min;
  133. async->max = max;
  134. return BOOL_TRUE;
  135. }
  136. /** @brief Set stall callback and associated user data.
  137. *
  138. * @param [in] async Allocated and initialized async I/O object.
  139. * @param [in] stall_cb Stall callback.
  140. * @param [in] user_data Associated user data.
  141. */
  142. void faux_async_set_stall_cb(faux_async_t *async,
  143. faux_async_stall_cb_f stall_cb, void *user_data)
  144. {
  145. assert(async);
  146. if (!async)
  147. return;
  148. async->stall_cb = stall_cb;
  149. async->stall_udata = user_data;
  150. }
  151. /** @brief Set overflow value.
  152. *
  153. * "Overflow" is a value when engine consider data consumer as a stalled.
  154. * Data gets into the async I/O object buffer but object can't write it to
  155. * serviced fd for too long time. So it accumulates great amount of data.
  156. *
  157. * @param [in] async Allocated and initialized async I/O object.
  158. * @param [in] overflow Overflow value.
  159. */
  160. void faux_async_set_overflow(faux_async_t *async, size_t overflow)
  161. {
  162. assert(async);
  163. if (!async)
  164. return;
  165. async->overflow = overflow;
  166. }
  167. static ssize_t free_space(faux_list_t *list, size_t pos)
  168. {
  169. if (!list)
  170. return -1;
  171. if (faux_list_len(list) == 0)
  172. return 0;
  173. return (DATA_CHUNK - pos);
  174. }
  175. ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
  176. {
  177. void *new_chunk = NULL;
  178. size_t data_left = len;
  179. assert(async);
  180. if (!async)
  181. return -1;
  182. assert(data);
  183. if (!data)
  184. return -1;
  185. while (data_left != 0) {
  186. ssize_t bytes_free = 0;
  187. size_t copy_len = 0;
  188. char *chunk_ptr = NULL;
  189. // Allocate new chunk if necessary
  190. bytes_free = free_space(async->o_list, async->o_wpos);
  191. if (bytes_free < 0)
  192. return -1;
  193. if (0 == bytes_free) {
  194. new_chunk = faux_malloc(DATA_CHUNK);
  195. assert(new_chunk);
  196. faux_list_add(async->o_list, new_chunk);
  197. async->o_wpos = 0;
  198. bytes_free = free_space(async->o_list, async->o_wpos);
  199. }
  200. // Copy data
  201. chunk_ptr = faux_list_data(faux_list_tail(async->o_list));
  202. copy_len = (data_left < (size_t)bytes_free) ? data_left : (size_t)bytes_free;
  203. memcpy(chunk_ptr + async->o_wpos, data + len - data_left,
  204. copy_len);
  205. async->o_wpos += copy_len;
  206. data_left -= copy_len;
  207. async->o_size += copy_len;
  208. if (async->o_size >= async->overflow)
  209. return -1;
  210. }
  211. // Try to real write data to fd in nonblocked mode
  212. faux_async_out(async);
  213. return len;
  214. }
  215. static ssize_t data_avail(faux_list_t *list, size_t rpos, size_t wpos)
  216. {
  217. size_t len = 0;
  218. if (!list)
  219. return -1;
  220. len = faux_list_len(list);
  221. if (len == 0)
  222. return 0;
  223. if (len > 1)
  224. return (DATA_CHUNK - rpos);
  225. // Single chunk
  226. return (wpos - rpos);
  227. }
  228. ssize_t faux_async_out(faux_async_t *async)
  229. {
  230. ssize_t total_written = 0;
  231. assert(async);
  232. if (!async)
  233. return -1;
  234. while (async->o_size > 0) {
  235. faux_list_node_t *node = NULL;
  236. char *chunk_ptr = NULL;
  237. ssize_t data_to_write = 0;
  238. ssize_t bytes_written = 0;
  239. bool_t postpone = BOOL_FALSE;
  240. node = faux_list_head(async->o_list);
  241. if (!node) // List is empty while o_size > 0
  242. return -1;
  243. chunk_ptr = faux_list_data(faux_list_head(async->o_list));
  244. data_to_write = data_avail(async->o_list,
  245. async->o_rpos, async->o_wpos);
  246. if (data_to_write <= 0) // Strange case
  247. return -1;
  248. bytes_written = write(async->fd, chunk_ptr + async->o_rpos,
  249. data_to_write);
  250. if (bytes_written > 0) {
  251. async->o_size -= bytes_written;
  252. total_written += bytes_written;
  253. }
  254. if (bytes_written < 0) {
  255. if ( // Something went wrong
  256. (errno != EINTR) &&
  257. (errno != EAGAIN) &&
  258. (errno != EWOULDBLOCK)
  259. )
  260. return -1;
  261. // Postpone next read
  262. postpone = BOOL_TRUE;
  263. // Not whole data block was written
  264. } else if (bytes_written != data_to_write) {
  265. async->o_rpos += bytes_written;
  266. // Postpone next read
  267. postpone = BOOL_TRUE;
  268. }
  269. // Postponed
  270. if (postpone) {
  271. // Execute callback
  272. if (async->stall_cb)
  273. async->stall_cb(async, async->o_size,
  274. async->stall_udata);
  275. break;
  276. }
  277. // Not postponed. Current chunk was fully written. So
  278. // remove it from list.
  279. async->o_rpos = 0;
  280. faux_list_del(async->o_list, node);
  281. }
  282. return total_written;
  283. }