瀏覽代碼

faux.async: async input. Test for async input

Serj Kalichev 3 年之前
父節點
當前提交
f33098a299
共有 5 個文件被更改,包括 221 次插入20 次删除
  1. 3 9
      faux/async.h
  2. 126 6
      faux/async/async.c
  3. 4 1
      faux/async/private.h
  4. 86 3
      faux/async/testc_async.c
  5. 2 1
      faux/testc_module/testc_module.c

+ 3 - 9
faux/async.h

@@ -10,14 +10,6 @@
 
 typedef struct faux_async_s faux_async_t;
 
-/*
-typedef enum {
-	FAUX_ELOOP_NULL = 0,
-	FAUX_ELOOP_SIGNAL = 1,
-	FAUX_ELOOP_SCHED = 2,
-	FAUX_ELOOP_FD = 3
-} faux_eloop_type_e;
-*/
 
 // Callback function prototypes
 typedef bool_t (*faux_async_read_cb_f)(faux_async_t *async,
@@ -36,9 +28,11 @@ void faux_async_set_read_cb(faux_async_t *async,
 bool_t faux_async_set_read_limits(faux_async_t *async, size_t min, size_t max);
 void faux_async_set_stall_cb(faux_async_t *async,
 	faux_async_stall_cb_f stall_cb, void *user_data);
-void faux_async_set_overflow(faux_async_t *async, size_t overflow);
+void faux_async_set_write_overflow(faux_async_t *async, size_t overflow);
+void faux_async_set_read_overflow(faux_async_t *async, size_t overflow);
 ssize_t faux_async_write(faux_async_t *async, void *data, size_t len);
 ssize_t faux_async_out(faux_async_t *async);
+ssize_t faux_async_in(faux_async_t *async);
 
 C_DECL_END
 

+ 126 - 6
faux/async/async.c

@@ -65,11 +65,12 @@ faux_async_t *faux_async_new(int fd)
 	async->i_rpos = 0;
 	async->i_wpos = 0;
 	async->i_size = 0;
+	async->i_overflow = 10000000l; // ~ 10M
 
 	// Write (Output)
 	async->stall_cb = NULL;
 	async->stall_udata = NULL;
-	async->overflow = 10000000l; // ~ 10M
+	async->o_overflow = 10000000l; // ~ 10M
 	async->o_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
 		NULL, NULL, faux_free);
 	async->o_rpos = 0;
@@ -179,7 +180,7 @@ void faux_async_set_stall_cb(faux_async_t *async,
 }
 
 
-/** @brief Set overflow value.
+/** @brief Set write overflow value.
  *
  * "Overflow" is a value when engine consider data consumer as a stalled.
  * Data gets into the async I/O object buffer but object can't write it to
@@ -188,13 +189,32 @@ void faux_async_set_stall_cb(faux_async_t *async,
  * @param [in] async Allocated and initialized async I/O object.
  * @param [in] overflow Overflow value.
  */
-void faux_async_set_overflow(faux_async_t *async, size_t overflow)
+void faux_async_set_write_overflow(faux_async_t *async, size_t overflow)
 {
 	assert(async);
 	if (!async)
 		return;
 
-	async->overflow = overflow;
+	async->o_overflow = overflow;
+}
+
+
+/** @brief Set read overflow value.
+ *
+ * "Overflow" is a value when engine consider data consumer as a stalled.
+ * Data gets into the async I/O object buffer but object can't write it to
+ * serviced fd for too long time. So it accumulates great amount of data.
+ *
+ * @param [in] async Allocated and initialized async I/O object.
+ * @param [in] overflow Overflow value.
+ */
+void faux_async_set_read_overflow(faux_async_t *async, size_t overflow)
+{
+	assert(async);
+	if (!async)
+		return;
+
+	async->i_overflow = overflow;
 }
 
 
@@ -247,7 +267,7 @@ ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
 		async->o_wpos += copy_len;
 		data_left -= copy_len;
 		async->o_size += copy_len;
-		if (async->o_size >= async->overflow)
+		if (async->o_size >= async->o_overflow)
 			return -1;
 	}
 
@@ -294,7 +314,7 @@ ssize_t faux_async_out(faux_async_t *async)
 		node = faux_list_head(async->o_list);
 		if (!node) // List is empty while o_size > 0
 			return -1;
-		chunk_ptr = faux_list_data(faux_list_head(async->o_list));
+		chunk_ptr = faux_list_data(node);
 		data_to_write = data_avail(async->o_list,
 			async->o_rpos, async->o_wpos);
 		if (data_to_write <= 0) // Strange case
@@ -341,3 +361,103 @@ ssize_t faux_async_out(faux_async_t *async)
 
 	return total_written;
 }
+
+
+ssize_t faux_async_in(faux_async_t *async)
+{
+	void *new_chunk = NULL;
+	ssize_t total_readed = 0;
+	ssize_t bytes_readed = 0;
+	ssize_t bytes_free = 0; // Free space within current (last) chunk
+
+	assert(async);
+	if (!async)
+		return -1;
+
+	do {
+		char *chunk_ptr = NULL;
+
+		// Allocate new chunk if necessary
+		bytes_free = free_space(async->i_list, async->i_wpos);
+		if (bytes_free < 0)
+			return -1;
+		if (0 == bytes_free) { // We need to allocate additional chunk
+			new_chunk = faux_malloc(DATA_CHUNK);
+			assert(new_chunk);
+			faux_list_add(async->i_list, new_chunk);
+			async->i_wpos = 0;
+			bytes_free = free_space(async->i_list, async->i_wpos);
+		}
+
+		// Read data to last chunk
+		chunk_ptr = faux_list_data(faux_list_tail(async->i_list));
+		bytes_readed = read(async->fd, chunk_ptr + async->i_wpos, bytes_free);
+		if (bytes_readed < 0) {
+			if ( // Something went wrong
+				(errno != EINTR) &&
+				(errno != EAGAIN) &&
+				(errno != EWOULDBLOCK)
+			)
+				return -1;
+		}
+		if (bytes_readed > 0) {
+			async->i_wpos += bytes_readed;
+			async->i_size += bytes_readed;
+			total_readed += bytes_readed;
+		}
+		if (async->i_size >= async->i_overflow)
+			return -1;
+
+		// Check for amount of stored data
+		while (async->i_size >= async->min) {
+
+			size_t copy_len = async->min;
+			size_t full_size = 0;
+			char *buf = NULL;
+			char *buf_ptr = NULL;
+
+			if (0 == async->max) { // Indefinite
+				copy_len = async->i_size; // Take all data
+			} else {
+				copy_len = (async->i_size < async->max) ?
+					async->i_size : async->max;
+			}
+
+			full_size = copy_len; // Save full length value
+			buf = faux_malloc(full_size);
+			buf_ptr = buf;
+			while (copy_len > 0) {
+				size_t data_to_write = 0;
+				faux_list_node_t *node = faux_list_head(async->i_list);
+				char *chunk_ptr = NULL;
+
+				if (!node) // Something went wrong
+					return -1;
+				chunk_ptr = faux_list_data(node);
+				data_to_write = data_avail(async->i_list,
+					async->i_rpos, async->i_wpos);
+				if (copy_len < data_to_write)
+					data_to_write = copy_len;
+				memcpy(buf_ptr, chunk_ptr + async->i_rpos,
+					data_to_write);
+				copy_len -= data_to_write;
+				async->i_size -= data_to_write;
+				async->i_rpos += data_to_write;
+				buf_ptr += data_to_write;
+				if (data_avail(async->i_list,
+					async->i_rpos, async->i_wpos) <= 0) {
+					async->i_rpos = 0;
+					faux_list_del(async->i_list, node);
+				}
+			}
+			// Execute callback
+			if (async->read_cb)
+				async->read_cb(async, buf,
+					full_size, async->read_udata);
+
+		}
+
+	} while (bytes_readed == bytes_free);
+
+	return total_readed;
+}

+ 4 - 1
faux/async/private.h

@@ -5,6 +5,7 @@
 
 struct faux_async_s {
 	int fd;
+
 	// Read
 	faux_async_read_cb_f read_cb; // Read callback
 	void *read_udata;
@@ -14,6 +15,8 @@ struct faux_async_s {
 	size_t i_rpos;
 	size_t i_wpos;
 	size_t i_size;
+	size_t i_overflow;
+
 	// Write
 	faux_async_stall_cb_f stall_cb; // Stall callback
 	void *stall_udata;
@@ -21,5 +24,5 @@ struct faux_async_s {
 	size_t o_rpos;
 	size_t o_wpos;
 	size_t o_size;
-	size_t overflow;
+	size_t o_overflow;
 };

+ 86 - 3
faux/async/testc_async.c

@@ -9,6 +9,7 @@
 #include "faux/async.h"
 #include "faux/testc_helpers.h"
 
+
 static bool_t stall_cb(faux_async_t *async, size_t len, void *user_data)
 {
 	bool_t *o_flag = (bool_t *)user_data;
@@ -16,7 +17,6 @@ static bool_t stall_cb(faux_async_t *async, size_t len, void *user_data)
 	if (!o_flag)
 		return BOOL_FALSE;
 	*o_flag = BOOL_TRUE;
-//	printf("Stall %lu\n", len);
 
 	async = async; // Happy compiler
 	len = len; // Happy compiler
@@ -24,7 +24,8 @@ static bool_t stall_cb(faux_async_t *async, size_t len, void *user_data)
 	return BOOL_TRUE;
 }
 
-int testc_faux_async(void)
+
+int testc_faux_async_write(void)
 {
 	const size_t len = 9000000l;
 	const size_t read_chunk = 1000;
@@ -58,7 +59,7 @@ int testc_faux_async(void)
 
 	out = faux_async_new(pipefd[1]);
 	faux_async_set_stall_cb(out, stall_cb, &o_flag);
-	faux_async_set_overflow(out, len + 1);
+	faux_async_set_write_overflow(out, len + 1);
 	if (faux_async_write(out, src_file, len) < 0) {
 		fprintf(stderr, "faux_async_write() error\n");
 		goto parse_error;
@@ -102,3 +103,85 @@ parse_error:
 
 	return ret;
 }
+
+
+static bool_t read_cb(faux_async_t *async, void *data, size_t len, void *user_data)
+{
+	int fd = *((int *)user_data);
+
+	faux_write_block(fd, data, len);
+	faux_free(data);
+
+	async = async; // Happy compiler
+
+	return BOOL_TRUE;
+}
+
+
+int testc_faux_async_read(void)
+{
+	const size_t len = 9000000l;
+	const size_t write_chunk = 2000;
+	const size_t read_chunk = 5000;
+	size_t left = 0;
+	char *src_file = NULL;
+	int ret = -1; // Pessimistic return value
+	char *src_fn = NULL;
+	char *dst_fn = NULL;
+	unsigned int i = 0;
+	unsigned char counter = 0;
+	faux_async_t *out = NULL;
+	int pipefd[2] = {-1, -1};
+	int fd = -1;
+
+	// Prepare files
+	src_file = faux_zmalloc(len);
+	for (i = 0; i < len; i++) {
+		src_file[i] = counter;
+		counter++;
+	}
+	src_fn = faux_testc_tmpfile_deploy(src_file, len);
+
+	if (pipe(pipefd) < 0)
+		goto parse_error;
+
+	dst_fn = faux_str_sprintf("%s/dst", getenv(FAUX_TESTC_TMPDIR_ENV));
+	fd = open(dst_fn, O_WRONLY | O_CREAT | O_TRUNC, 0600);
+
+	out = faux_async_new(pipefd[0]);
+	faux_async_set_read_cb(out, read_cb, &fd);
+	faux_async_set_read_limits(out, read_chunk, read_chunk);
+
+	// Sync pipe write and async pipe read
+	left = len;
+	while (left > 0) {
+		ssize_t bytes_written = 0;
+
+		bytes_written = write(pipefd[1], src_file + len - left,
+			left < write_chunk ? left : write_chunk);
+		if (bytes_written < 0)
+			continue;
+		left -= bytes_written;
+		faux_async_in(out);
+	}
+
+	// Compare etalon file and generated file
+	if (faux_testc_file_cmp(dst_fn, src_fn) != 0) {
+		fprintf(stderr, "Destination file %s is not equal to source %s\n",
+			dst_fn, src_fn);
+		goto parse_error;
+	}
+
+	ret = 0; // success
+
+parse_error:
+	if (pipefd[0] >= 0)
+		close(pipefd[0]);
+	if (pipefd[1] >= 0)
+		close(pipefd[1]);
+	faux_async_free(out);
+	faux_str_free(dst_fn);
+	faux_str_free(src_fn);
+
+	return ret;
+}

+ 2 - 1
faux/testc_module/testc_module.c

@@ -42,7 +42,8 @@ const char *testc_module[][2] = {
 	{"testc_faux_vec", "Complex test of variable length vector"},
 
 	// async
-	{"testc_faux_async", "Async read/write operations"},
+	{"testc_faux_async_write", "Async write operations"},
+	{"testc_faux_async_read", "Async read operations"},
 
 	// End of list
 	{NULL, NULL}