Procházet zdrojové kódy

faux.async: write and out functions

Serj Kalichev před 3 roky
rodič
revize
3745c9d052
3 změnil soubory, kde provedl 137 přidání a 4 odebrání
  1. 8 0
      faux/async.h
  2. 123 2
      faux/async/async.c
  3. 6 2
      faux/async/private.h

+ 8 - 0
faux/async.h

@@ -31,6 +31,14 @@ C_DECL_BEGIN
 faux_async_t *faux_async_new(int fd);
 void faux_async_free(faux_async_t *async);
 int faux_async_fd(const faux_async_t *async);
+void faux_async_set_read_cb(faux_async_t *async,
+	faux_async_read_cb_f read_cb, void *user_data);
+bool_t faux_async_set_read_limits(faux_async_t *async, size_t min, size_t max);
+void faux_async_set_stall_cb(faux_async_t *async,
+	faux_async_stall_cb_f stall_cb, void *user_data);
+void faux_async_set_overflow(faux_async_t *async, size_t overflow);
+ssize_t faux_async_write(faux_async_t *async, void *data, size_t len);
+ssize_t faux_async_out(faux_async_t *async);
 
 C_DECL_END
 

+ 123 - 2
faux/async/async.c

@@ -62,7 +62,9 @@ faux_async_t *faux_async_new(int fd)
 	async->max = 0; // Indefinite
 	async->i_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
 		NULL, NULL, faux_free);
-	async->i_pos = 0;
+	async->i_rpos = 0;
+	async->i_wpos = 0;
+	async->i_size = 0;
 
 	// Write (Output)
 	async->stall_cb = NULL;
@@ -70,7 +72,9 @@ faux_async_t *faux_async_new(int fd)
 	async->overflow = 10000000l; // ~ 10M
 	async->o_list = faux_list_new(FAUX_LIST_UNSORTED, FAUX_LIST_NONUNIQUE,
 		NULL, NULL, faux_free);
-	async->o_pos = 0;
+	async->o_rpos = 0;
+	async->o_wpos = 0;
+	async->o_size = 0;
 
 	return async;
 }
@@ -192,3 +196,120 @@ void faux_async_set_overflow(faux_async_t *async, size_t overflow)
 
 	async->overflow = overflow;
 }
+
+
+static ssize_t free_space(faux_list_t *list, size_t pos)
+{
+	if (!list)
+		return -1;
+
+	if (faux_list_len(list) == 0)
+		return 0;
+
+	return (DATA_CHUNK - pos);
+}
+
+
+ssize_t faux_async_write(faux_async_t *async, void *data, size_t len)
+{
+	void *new_chunk = NULL;
+	size_t data_left = len;
+
+	assert(async);
+	if (!async)
+		return -1;
+	assert(data);
+	if (!data)
+		return -1;
+
+	while (data_left != 0) {
+		ssize_t bytes_free = 0;
+		size_t copy_len = 0;
+		char *chunk_ptr = NULL;
+
+		// Allocate new chunk if necessary
+		bytes_free = free_space(async->o_list, async->o_wpos);
+		if (bytes_free < 0)
+			return -1;
+		if (0 == bytes_free) {
+			new_chunk = faux_malloc(DATA_CHUNK);
+			assert(new_chunk);
+			faux_list_add(async->o_list, new_chunk);
+			async->o_wpos = 0;
+			bytes_free = free_space(async->o_list, async->o_wpos);
+		}
+
+		// Copy data
+		chunk_ptr = faux_list_data(faux_list_tail(async->o_list));
+		copy_len = (len < (size_t)bytes_free) ? len : (size_t)bytes_free;
+		memcpy(chunk_ptr + async->o_wpos, data + len - data_left,
+			copy_len);
+		async->o_wpos += copy_len;
+		data_left -= copy_len;
+		async->o_size += copy_len;
+		if (async->o_size >= async->overflow)
+			return -1;
+	}
+
+	// Try to real write data to fd in nonblocked mode
+	faux_async_out(async);
+
+	return len;
+}
+
+
+static ssize_t data_avail(faux_list_t *list, size_t rpos, size_t wpos)
+{
+	size_t len = 0;
+
+	if (!list)
+		return -1;
+
+	len = faux_list_len(list);
+	if (len == 0)
+		return 0;
+	if (len > 1)
+		return (DATA_CHUNK - rpos);
+
+	// Single chunk
+	return (wpos - rpos);
+}
+
+
+ssize_t faux_async_out(faux_async_t *async)
+{
+	assert(async);
+	if (!async)
+		return -1;
+
+	while (async->o_size > 0) {
+		faux_list_node_t *node = NULL;
+		char *chunk_ptr = NULL;
+		ssize_t data_to_write = 0;
+		ssize_t bytes_written = 0;
+
+		node = faux_list_head(async->o_list);
+		chunk_ptr = faux_list_data(faux_list_head(async->o_list));
+		data_to_write = data_avail(async->o_list,
+			async->o_rpos, async->o_wpos);
+		if (data_to_write < 0)
+			return -1;
+		bytes_written = write(async->fd, chunk_ptr + async->o_rpos,
+			data_to_write);
+		if (bytes_written <= 0)
+			return bytes_written;
+		async->o_size -= bytes_written;
+		if (bytes_written != data_to_write) {
+			async->o_rpos += bytes_written;
+			// Execute callback
+			if (async->stall_cb)
+				async->stall_cb(async, async->o_size,
+					async->stall_udata);
+			return async->o_size;
+		}
+		async->o_rpos = 0;
+		faux_list_del(async->o_list, node);
+	}
+
+	return 0;
+}

+ 6 - 2
faux/async/private.h

@@ -11,11 +11,15 @@ struct faux_async_s {
 	size_t min;
 	size_t max;
 	faux_list_t *i_list;
-	size_t i_pos;
+	size_t i_rpos;
+	size_t i_wpos;
+	size_t i_size;
 	// Write
 	faux_async_stall_cb_f stall_cb; // Stall callback
 	void *stall_udata;
 	faux_list_t *o_list;
-	size_t o_pos;
+	size_t o_rpos;
+	size_t o_wpos;
+	size_t o_size;
 	size_t overflow;
 };