Browse Source

faux.async: Use faux_buf_t with faux_async_t. Unfinished

Serj Kalichev 3 years ago
parent
commit
13f3cc57dc
6 changed files with 172 additions and 142 deletions
  1. 6 0
      faux/async.h
  2. 31 130
      faux/async/async.c
  3. 4 11
      faux/async/private.h
  4. 3 0
      faux/buf.h
  5. 126 1
      faux/buf/buf.c
  6. 2 0
      faux/faux.map

+ 6 - 0
faux/async.h

@@ -10,6 +10,12 @@
 
 #define FAUX_ASYNC_UNLIMITED 0
 
+// Default overflow limit for out buffer ~ 10M
+#define FAUX_ASYNC_OUT_OVERFLOW 10000000l
+// Default overflow limit for in buffer ~ 10M
+#define FAUX_ASYNC_IN_OVERFLOW 10000000l
+
+
 typedef struct faux_async_s faux_async_t;
 
 

+ 31 - 130
faux/async/async.c

@@ -39,13 +39,12 @@
 
 #include "faux/faux.h"
 #include "faux/str.h"
+#include "faux/buf.h"
 #include "faux/net.h"
 #include "faux/async.h"
 
 #include "private.h"
 
-#define DATA_CHUNK 4096
-
 /** @brief Create new async I/O object.
  *
  * Constructor gets associated file descriptor to operate on it. File
@@ -81,22 +80,14 @@ faux_async_t *faux_async_new(int fd)
 	async->read_udata = NULL;
 	async->min = 1;
 	async->max = FAUX_ASYNC_UNLIMITED;
-	async->i_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
-		NULL, NULL, faux_free);
-	async->i_rpos = 0;
-	async->i_wpos = 0;
-	async->i_size = 0;
-	async->i_overflow = 10000000l; // ~ 10M
+	async->ibuf = faux_buf_new(DATA_CHUNK);
+	faux_buf_set_limit(async->ibuf, FAUX_ASYNC_IN_OVERFLOW);
 
 	// Write (Output)
 	async->stall_cb = NULL;
 	async->stall_udata = NULL;
-	async->o_overflow = 10000000l; // ~ 10M
-	async->o_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
-		NULL, NULL, faux_free);
-	async->o_rpos = 0;
-	async->o_wpos = 0;
-	async->o_size = 0;
+	async->obuf = faux_buf_new(DATA_CHUNK);
+	faux_buf_set_limit(async->obuf, FAUX_ASYNC_OUT_OVERFLOW);
 
 	return async;
 }
@@ -111,8 +102,8 @@ void faux_async_free(faux_async_t *async)
 	if (!async)
 		return;
 
-	faux_list_free(async->i_list);
-	faux_list_free(async->o_list);
+	faux_buf_free(async->ibuf);
+	faux_buf_free(async->obuf);
 
 	faux_free(async);
 }
@@ -216,7 +207,7 @@ void faux_async_set_write_overflow(faux_async_t *async, size_t overflow)
 	if (!async)
 		return;
 
-	async->o_overflow = overflow;
+	faux_buf_set_limit(async->obuf, overflow);
 }
 
 
@@ -235,27 +226,7 @@ void faux_async_set_read_overflow(faux_async_t *async, size_t overflow)
 	if (!async)
 		return;
 
-	async->i_overflow = overflow;
-}
-
-
-/** @brief Get amount of unused space within current data chunk.
- *
- * Inernal static function.
- *
- * @param [in] list Internal buffer (list of chunks) to inspect.
- * @param [in] pos Current write position within last chunk
- * @return Size of unused space or < 0 on error.
- */
-static ssize_t free_space(faux_list_t *list, size_t pos)
-{
-	if (!list)
-		return -1;
-
-	if (faux_list_len(list) == 0)
-		return 0;
-
-	return (DATA_CHUNK - pos);
+	faux_buf_set_limit(async->ibuf, overflow);
 }
 
 
@@ -275,8 +246,7 @@ static ssize_t free_space(faux_list_t *list, size_t pos)
  */
 ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
 {
-	void *new_chunk = NULL;
-	size_t data_left = len;
+	ssize_t data_written = len;
 
 	assert(async);
 	if (!async)
@@ -285,34 +255,9 @@ ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
 	if (!data)
 		return -1;
 
-	while (data_left != 0) {
-		ssize_t bytes_free = 0;
-		size_t copy_len = 0;
-		char *chunk_ptr = NULL;
-
-		// Allocate new chunk if necessary
-		bytes_free = free_space(async->o_list, async->o_wpos);
-		if (bytes_free < 0)
-			return -1;
-		if (0 == bytes_free) {
-			new_chunk = faux_malloc(DATA_CHUNK);
-			assert(new_chunk);
-			faux_list_add(async->o_list, new_chunk);
-			async->o_wpos = 0;
-			bytes_free = free_space(async->o_list, async->o_wpos);
-		}
-
-		// Copy data
-		chunk_ptr = faux_list_data(faux_list_tail(async->o_list));
-		copy_len = (data_left < (size_t)bytes_free) ? data_left : (size_t)bytes_free;
-		memcpy(chunk_ptr + async->o_wpos, data + len - data_left,
-			copy_len);
-		async->o_wpos += copy_len;
-		data_left -= copy_len;
-		async->o_size += copy_len;
-		if (async->o_size >= async->o_overflow)
-			return -1;
-	}
+	data_written = faux_buf_write(async->obuf, data, len);
+	if (data_written < 0)
+		return -1;
 
 	// Try to real write data to fd in nonblocked mode
 	faux_async_out(async);
@@ -442,33 +387,23 @@ ssize_t faux_async_out(faux_async_t *async)
  */
 ssize_t faux_async_in(faux_async_t *async)
 {
-	void *new_chunk = NULL;
 	ssize_t total_readed = 0;
 	ssize_t bytes_readed = 0;
-	ssize_t bytes_free = 0; // Free space within current (last) chunk
+	ssize_t locked_len = 0;
 
 	assert(async);
 	if (!async)
 		return -1;
 
 	do {
-		char *chunk_ptr = NULL;
+		void *data = NULL;
+		size_t bytes_stored = 0;
 
-		// Allocate new chunk if necessary
-		bytes_free = free_space(async->i_list, async->i_wpos);
-		if (bytes_free < 0)
+		locked_len = faux_buf_dwrite_lock_easy(async->ibuf, &data);
+		if (locked_len <= 0)
 			return -1;
-		if (0 == bytes_free) { // We need to allocate additional chunk
-			new_chunk = faux_malloc(DATA_CHUNK);
-			assert(new_chunk);
-			faux_list_add(async->i_list, new_chunk);
-			async->i_wpos = 0;
-			bytes_free = free_space(async->i_list, async->i_wpos);
-		}
-
-		// Read data to last chunk
-		chunk_ptr = faux_list_data(faux_list_tail(async->i_list));
-		bytes_readed = read(async->fd, chunk_ptr + async->i_wpos, bytes_free);
+		// Read data
+		bytes_readed = read(async->fd, data, locked_len);
 		if (bytes_readed < 0) {
 			if ( // Something went wrong
 				(errno != EINTR) &&
@@ -477,64 +412,30 @@ ssize_t faux_async_in(faux_async_t *async)
 			)
 				return -1;
 		}
-		if (bytes_readed > 0) {
-			async->i_wpos += bytes_readed;
-			async->i_size += bytes_readed;
-			total_readed += bytes_readed;
-		}
-		if (async->i_size >= async->i_overflow)
-			return -1;
+		faux_buf_dwrite_unlock_easy(async->ibuf, bytes_readed);
+		total_readed += bytes_readed;
 
 		// Check for amount of stored data
-		while (async->i_size >= async->min) {
-
+		while ((bytes_stored = faux_buf_len(async->ibuf)) >= async->min) {
 			size_t copy_len = 0;
-			size_t full_size = 0;
 			char *buf = NULL;
-			char *buf_ptr = NULL;
 
 			if (FAUX_ASYNC_UNLIMITED == async->max) { // Indefinite
-				copy_len = async->i_size; // Take all data
+				copy_len = bytes_stored; // Take all data
 			} else {
-				copy_len = (async->i_size < async->max) ?
-					async->i_size : async->max;
+				copy_len = (bytes_stored < async->max) ?
+					bytes_stored : async->max;
 			}
+			buf = faux_malloc(copy_len);
+			assert(buf);
+			faux_buf_read(async->ibuf, buf, copy_len);
 
-			full_size = copy_len; // Save full length value
-			buf = faux_malloc(full_size);
-			buf_ptr = buf;
-			while (copy_len > 0) {
-				size_t data_to_write = 0;
-				faux_list_node_t *node = faux_list_head(async->i_list);
-				char *chunk_ptr = NULL;
-
-				if (!node) // Something went wrong
-					return -1;
-				chunk_ptr = faux_list_data(node);
-				data_to_write = data_avail(async->i_list,
-					async->i_rpos, async->i_wpos);
-				if (copy_len < data_to_write)
-					data_to_write = copy_len;
-				memcpy(buf_ptr, chunk_ptr + async->i_rpos,
-					data_to_write);
-				copy_len -= data_to_write;
-				async->i_size -= data_to_write;
-				async->i_rpos += data_to_write;
-				buf_ptr += data_to_write;
-				if (data_avail(async->i_list,
-					async->i_rpos, async->i_wpos) <= 0) {
-					async->i_rpos = 0;
-					faux_list_del(async->i_list, node);
-				}
-			}
 			// Execute callback
 			if (async->read_cb)
 				async->read_cb(async, buf,
-					full_size, async->read_udata);
-
+					copy_len, async->read_udata);
 		}
-
-	} while (bytes_readed == bytes_free);
+	} while (bytes_readed == locked_len);
 
 	return total_readed;
 }

+ 4 - 11
faux/async/private.h

@@ -1,7 +1,8 @@
 #include "faux/faux.h"
-#include "faux/list.h"
+#include "faux/buf.h"
 #include "faux/net.h"
 
+#define DATA_CHUNK 4096
 
 struct faux_async_s {
 	int fd;
@@ -11,18 +12,10 @@ struct faux_async_s {
 	void *read_udata;
 	size_t min;
 	size_t max;
-	faux_list_t *i_list;
-	size_t i_rpos;
-	size_t i_wpos;
-	size_t i_size;
-	size_t i_overflow;
+	faux_buf_t *ibuf;
 
 	// Write
 	faux_async_stall_cb_fn stall_cb; // Stall callback
 	void *stall_udata;
-	faux_list_t *o_list;
-	size_t o_rpos;
-	size_t o_wpos;
-	size_t o_size;
-	size_t o_overflow;
+	faux_buf_t *obuf;
 };

+ 3 - 0
faux/buf.h

@@ -33,6 +33,9 @@ ssize_t faux_buf_dwrite_lock(faux_buf_t *buf, size_t len,
 	struct iovec **iov_out, size_t *iov_num_out);
 ssize_t faux_buf_dwrite_unlock(faux_buf_t *buf, size_t really_written,
 	struct iovec *iov);
+ssize_t faux_buf_dwrite_lock_easy(faux_buf_t *buf, void **data);
+ssize_t faux_buf_dwrite_unlock_easy(faux_buf_t *buf, size_t really_written);
+
 
 C_DECL_END
 

+ 126 - 1
faux/buf/buf.c

@@ -418,6 +418,65 @@ ssize_t faux_buf_dread_lock(faux_buf_t *buf, size_t len,
 }
 
 
+/** @brief Locks data for reading.
+ *
+ * The complementary function is faux_buf_dread_unlock_easy().
+ * This function has the same functionality as faux_buf_dread_lock() but
+ * chooses the length of locked space itself to return single continuous buffer.
+ *
+ * @param [in] buf Allocated and initialized dynamic buffer object.
+ * @param [out] data Continuous buffer for direct reading.
+ * @return Length of data actually locked or < 0 on error.
+ */
+ssize_t faux_buf_dread_lock_easy(faux_buf_t *buf, void **data)
+{
+	struct iovec *iov = NULL;
+	size_t iov_num = 0;
+	ssize_t len_to_lock = 0;
+	ssize_t avail = 0;
+	ssize_t locked_len = 0;
+
+	assert(buf);
+	if (!buf)
+		return -1;
+	assert(data);
+	if (!data)
+		return -1;
+
+	// Don't use already locked buffer
+	if (faux_buf_is_rlocked(buf))
+		return -1;
+
+	avail = faux_buf_ravail(buf);
+	if (avail < 0)
+		return -1;
+	if (0 == avail)
+		avail = buf->chunk_size; // Next chunk
+
+	len_to_lock = ((size_t)avail < buf->len) ? (size_t)avail : buf->len;
+	// Nothing to lock
+	if (0 == len_to_lock) {
+		*data = NULL;
+		return 0;
+	}
+
+	locked_len = faux_buf_dread_lock(buf, len_to_lock, &iov, &iov_num);
+	if (locked_len <= 0)
+		return -1;
+	if (iov_num < 1) {
+		faux_free(iov);
+		return -1;
+	}
+
+	*data = iov[0].iov_base;
+	locked_len = iov[0].iov_len;
+
+	faux_free(iov);
+
+	return locked_len;
+}
+
+
 /** @brief Frees "struct iovec" array and unlocks read data.
  *
  * The length of actually readed data can be less than length of locked data.
@@ -634,6 +693,57 @@ ssize_t faux_buf_dwrite_lock(faux_buf_t *buf, size_t len,
 }
 
 
+/** @brief Gets a data buffer for direct writing and locks it.
+ *
+ * The complementary function is faux_buf_dwrite_unlock_easy().
+ * This function has the same functionality as faux_buf_dwrite_lock() but
+ * chooses the length of locked space itself to return single continuous buffer.
+ *
+ * @param [in] buf Allocated and initialized dynamic buffer object.
+ * @param [out] data Continuous buffer for direct writing.
+ * @return Length of data actually locked or < 0 on error.
+ */
+ssize_t faux_buf_dwrite_lock_easy(faux_buf_t *buf, void **data)
+{
+	struct iovec *iov = NULL;
+	size_t iov_num = 0;
+	ssize_t len = 0;
+	ssize_t locked_len = 0;
+
+	assert(buf);
+	if (!buf)
+		return -1;
+	assert(data);
+	if (!data)
+		return -1;
+
+	// Don't use already locked buffer
+	if (faux_buf_is_wlocked(buf))
+		return -1;
+
+	len = faux_buf_wavail(buf);
+	if (len < 0)
+		return -1;
+	if (0 == len)
+		len = buf->chunk_size; // It will use next chunk
+
+	locked_len = faux_buf_dwrite_lock(buf, len, &iov, &iov_num);
+	if (locked_len <= 0)
+		return -1;
+	if (iov_num < 1) {
+		faux_free(iov);
+		return -1;
+	}
+
+	*data = iov[0].iov_base;
+	locked_len = iov[0].iov_len;
+
+	faux_free(iov);
+
+	return locked_len;
+}
+
+
 /** @brief Frees "struct iovec" array and unlocks written data.
  *
  * The length of actually written data can be less than length of locked data.
@@ -646,7 +756,6 @@ ssize_t faux_buf_dwrite_lock(faux_buf_t *buf, size_t len,
  * @param [in] buf Allocated and initialized dynamic buffer object.
  * @param [in] really_written Length of data actually written.
  * @param [out] iov "struct iovec" array to free.
- * @param [out] iov_num_out Number of "struct iovec" array elements.
  * @return Length of data actually unlocked or < 0 on error.
  */
 ssize_t faux_buf_dwrite_unlock(faux_buf_t *buf, size_t really_written,
@@ -706,3 +815,19 @@ ssize_t faux_buf_dwrite_unlock(faux_buf_t *buf, size_t really_written,
 
 	return really_written;
 }
+
+
+/** @brief Unlocks written data.
+ *
+ * It's a function complementary to faux_buf_dwrite_lock_easy().
+ * It has the same functionality as faux_buf_dwrite_unlock() but doesn't free
+ * "struct iovec" array.
+ *
+ * @param [in] buf Allocated and initialized dynamic buffer object.
+ * @param [in] really_written Length of data actually written.
+ * @return Length of data actually unlocked or < 0 on error.
+ */
+ssize_t faux_buf_dwrite_unlock_easy(faux_buf_t *buf, size_t really_written)
+{
+	return faux_buf_dwrite_unlock(buf, really_written, NULL);
+}

+ 2 - 0
faux/faux.map

@@ -334,6 +334,8 @@ FAUX_2.0 {
 		faux_buf_dread_unlock;
 		faux_buf_dwrite_lock;
 		faux_buf_dwrite_unlock;
+		faux_buf_dwrite_lock_easy;
+		faux_buf_dwrite_unlock_easy;
 
 		testc_version_major;
 		testc_version_minor;