Browse Source

Client temporarily stops stdin getting if out buffer is full

Serj Kalichev 6 months ago
parent
commit
95d2b64f79
4 changed files with 50 additions and 5 deletions
  1. 38 0
      bin/klish/klish.c
  2. 2 0
      bin/klish/private.h
  3. 7 5
      klish/ktp/ktp_session.c
  4. 3 0
      klish/ktp_session.h

+ 38 - 0
bin/klish/klish.c

@@ -65,6 +65,8 @@ typedef struct ctx_s {
 
 
 // KTP session static functions
+static bool_t async_stdin_sent_cb(ktp_session_t *ktp, size_t len,
+	void *user_data);
 static bool_t stdout_cb(ktp_session_t *ktp, const char *line, size_t len,
 	void *user_data);
 static bool_t stderr_cb(ktp_session_t *ktp, const char *line, size_t len,
@@ -212,6 +214,7 @@ int main(int argc, char **argv)
 	ctx.opts = opts;
 	ctx.pager_working = TRI_UNDEFINED;
 
+	ktp_session_set_cb(ktp, KTP_SESSION_CB_STDIN, async_stdin_sent_cb, &ctx);
 	ktp_session_set_cb(ktp, KTP_SESSION_CB_STDOUT, stdout_cb, &ctx);
 	ktp_session_set_cb(ktp, KTP_SESSION_CB_STDERR, stderr_cb, &ctx);
 	ktp_session_set_cb(ktp, KTP_SESSION_CB_AUTH_ACK, auth_ack_cb, &ctx);
@@ -579,6 +582,7 @@ static bool_t stdin_cb(faux_eloop_t *eloop, faux_eloop_type_e type,
 	ktp_session_state_e state = KTP_SESSION_STATE_ERROR;
 	faux_eloop_info_fd_t *info = (faux_eloop_info_fd_t *)associated_data;
 	bool_t close_stdin = BOOL_FALSE;
+	size_t obuf_len = 0;
 
 	if (!ctx)
 		return BOOL_FALSE;
@@ -588,6 +592,14 @@ static bool_t stdin_cb(faux_eloop_t *eloop, faux_eloop_type_e type,
 	if (info->revents & (POLLHUP | POLLERR | POLLNVAL))
 		close_stdin = BOOL_TRUE;
 
+	// Temporarily stop stdin reading because too much data is buffered
+	// and all data can't be sent to server yet
+	obuf_len = faux_buf_len(faux_async_obuf(ktp_session_async(ctx->ktp)));
+	if (obuf_len > OBUF_LIMIT) {
+		faux_eloop_del_fd(eloop, STDIN_FILENO);
+		return BOOL_TRUE;
+	}
+
 	state = ktp_session_state(ctx->ktp);
 
 	// Standard klish command line
@@ -632,6 +644,25 @@ static bool_t stdin_cb(faux_eloop_t *eloop, faux_eloop_type_e type,
 }
 
 
+static bool_t async_stdin_sent_cb(ktp_session_t *ktp, size_t len,
+	void *user_data)
+{
+	ctx_t *ctx = (ctx_t *)user_data;
+
+	assert(ktp);
+
+	// This callbacks is executed when any number of bytes is really written
+	// to server socket. So if stdin transmit was stopped due to obuf
+	// overflow it's time to rearm transmission
+	faux_eloop_add_fd(ktp_session_eloop(ktp), STDIN_FILENO, POLLIN,
+		stdin_cb, ctx);
+
+	len = len; // Happy compiler
+
+	return BOOL_TRUE;
+}
+
+
 static bool_t send_winch_notification(ctx_t *ctx)
 {
 	size_t width = 0;
@@ -1013,6 +1044,8 @@ bool_t help_ack_cb(ktp_session_t *ktp, const faux_msg_t *msg, void *udata)
 }
 
 
+//size_t max_stdout_len = 0;
+
 static bool_t stdout_cb(ktp_session_t *ktp, const char *line, size_t len,
 	void *udata)
 {
@@ -1020,6 +1053,11 @@ static bool_t stdout_cb(ktp_session_t *ktp, const char *line, size_t len,
 
 	assert(ctx);
 
+//if (len > max_stdout_len) {
+//max_stdout_len = len;
+//fprintf(stderr, "max_stdout_len=%ld\n", max_stdout_len);
+//}
+
 	// Start pager if necessary
 	if (
 		ctx->opts->pager_enabled && // Pager enabled within config file

+ 2 - 0
bin/klish/private.h

@@ -9,6 +9,8 @@
 #define DEFAULT_CFGFILE "/etc/klish/klish.conf"
 #define DEFAULT_PAGER "/usr/bin/less -I -F -e -X -K -d -R"
 
+#define OBUF_LIMIT 65536
+
 /** @brief Command line and config file options
  */
 struct options {

+ 7 - 5
klish/ktp/ktp_session.c

@@ -80,10 +80,6 @@ ktp_session_t *ktp_session_new(int sock, faux_eloop_t *eloop)
 	// Async object
 	ktp->async = faux_async_new(sock);
 	assert(ktp->async);
-	// Workaround. Make buffer a large else we have lost stdin
-	// TODO: It must be refactored. So large buffer is bad idea
-	faux_async_set_write_overflow(ktp->async, 1000000000l);
-	faux_async_set_read_overflow(ktp->async, 1000000000l);
 	// Receive message header first
 	faux_async_set_read_limits(ktp->async,
 		sizeof(faux_hdr_t), sizeof(faux_hdr_t));
@@ -258,13 +254,19 @@ static bool_t server_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 
 	// Write data
 	if (info->revents & POLLOUT) {
+		ssize_t len = 0;
 		faux_eloop_exclude_fd_event(eloop, info->fd, POLLOUT);
-		if (faux_async_out_easy(ktp->async) < 0) {
+		if ((len = faux_async_out_easy(ktp->async)) < 0) {
 			// Someting went wrong
 			faux_eloop_del_fd(eloop, info->fd);
 			syslog(LOG_ERR, "Problem with async output");
 			return BOOL_FALSE; // Stop event loop
 		}
+		// Execute external callback
+		if (ktp->cb[KTP_SESSION_CB_STDIN].fn)
+			((ktp_session_stdin_cb_fn)
+				ktp->cb[KTP_SESSION_CB_STDIN].fn)(
+				ktp, len, ktp->cb[KTP_SESSION_CB_STDIN].udata);
 	}
 
 	// Read data

+ 3 - 0
klish/ktp_session.h

@@ -56,6 +56,7 @@ typedef enum {
 } ktp_session_state_e;
 
 typedef enum {
+	KTP_SESSION_CB_STDIN,
 	KTP_SESSION_CB_STDOUT,
 	KTP_SESSION_CB_STDERR,
 	KTP_SESSION_CB_AUTH_ACK,
@@ -67,6 +68,8 @@ typedef enum {
 	KTP_SESSION_CB_MAX,
 } ktp_session_cb_e;
 
+typedef bool_t (*ktp_session_stdin_cb_fn)(ktp_session_t *ktp,
+	size_t len, void *udata);
 typedef bool_t (*ktp_session_stdout_cb_fn)(ktp_session_t *ktp,
 	const char *line, size_t len, void *udata);
 typedef bool_t (*ktp_session_event_cb_fn)(ktp_session_t *ktp,