Browse Source

Limit server's buffers

Serj Kalichev 6 months ago
parent
commit
08e40f4fdb
2 changed files with 28 additions and 17 deletions
  1. 3 0
      klish/ksession/kexec.c
  2. 25 17
      klish/ktp/ktpd_session.c

+ 3 - 0
klish/ksession/kexec.c

@@ -357,6 +357,9 @@ static bool_t kexec_prepare(kexec_t *exec)
 	} else {
 		if (pipe(pipefd) < 0)
 			return BOOL_FALSE;
+		// Write end of 'stdin' pipe must be non-blocked
+		fflags = fcntl(pipefd[1], F_GETFL);
+		fcntl(pipefd[1], F_SETFL, fflags | O_NONBLOCK);
 		r_end = pipefd[0];
 		w_end = pipefd[1];
 	}

+ 25 - 17
klish/ktp/ktpd_session.c

@@ -62,7 +62,8 @@ static bool_t action_stdout_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data);
 static bool_t action_stderr_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data);
-static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr);
+static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr,
+	bool_t process_all_data);
 
 
 ktpd_session_t *ktpd_session_new(int sock, kscheme_t *scheme,
@@ -518,8 +519,8 @@ static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	// Sometimes SIGCHILD signal can appear before all data were really read
 	// from process stdout buffer. So read the least data before closing
 	// file descriptors and send it to client.
-	get_stream(ktpd, kexec_stdout(ktpd->exec), BOOL_FALSE);
-	get_stream(ktpd, kexec_stderr(ktpd->exec), BOOL_TRUE);
+	get_stream(ktpd, kexec_stdout(ktpd->exec), BOOL_FALSE, BOOL_TRUE);
+	get_stream(ktpd, kexec_stderr(ktpd->exec), BOOL_TRUE, BOOL_TRUE);
 	faux_eloop_del_fd(eloop, kexec_stdin(ktpd->exec));
 	faux_eloop_del_fd(eloop, kexec_stdout(ktpd->exec));
 	faux_eloop_del_fd(eloop, kexec_stderr(ktpd->exec));
@@ -913,9 +914,9 @@ static ssize_t stdin_out(int fd, faux_buf_t *buf, bool_t process_all_data)
 		// Not whole data block was written
 		} else if (bytes_written != data_to_write) {
 			break;
-		} else if (!process_all_data) {
-			break;
 		}
+		if (!process_all_data)
+			break;
 	}
 
 	return total_written;
@@ -956,8 +957,6 @@ static bool_t push_stdin(ktpd_session_t *ktpd)
 }
 
 
-//size_t max_kexec_stdin = 0;
-
 static bool_t ktpd_session_process_stdin(ktpd_session_t *ktpd, faux_msg_t *msg)
 {
 	char *line = NULL;
@@ -1007,11 +1006,6 @@ static bool_t ktpd_session_process_stdin(ktpd_session_t *ktpd, faux_msg_t *msg)
 		faux_buf_write(bufin, line, len);
 	}
 
-//size_t l = faux_buf_len(bufin);
-//if (l > max_kexec_stdin) {
-//max_kexec_stdin = l;
-//fprintf(stderr, "max_kexec_stdin=%ld\n", l);
-//}
 	stdin_out(fd, bufin, BOOL_FALSE); // Non-blocking write
 	if (faux_buf_len(bufin) == 0)
 		return BOOL_TRUE;
@@ -1326,7 +1320,8 @@ int ktpd_session_fd(const ktpd_session_t *ktpd)
 }
 
 
-static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr)
+static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr,
+	bool_t process_all_data)
 {
 	ssize_t r = -1;
 	faux_buf_t *faux_buf = NULL;
@@ -1356,7 +1351,7 @@ static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr)
 		if (r > 0)
 			really_readed = r;
 		faux_buf_dwrite_unlock_easy(faux_buf, really_readed);
-	} while (r > 0);
+	} while ((r > 0) && process_all_data);
 
 	len = faux_buf_len(faux_buf);
 	if (0 == len)
@@ -1373,11 +1368,15 @@ static bool_t get_stream(ktpd_session_t *ktpd, int fd, bool_t is_stderr)
 
 	free(buf);
 
+	// Pause stdout/stderr receiving because buffer (to send to client)
+	// is full
+	if (faux_buf_len(faux_async_obuf(ktpd->async)) > BUF_LIMIT)
+		faux_eloop_exclude_fd_event(ktpd->eloop, fd, POLLIN);
+
 	return BOOL_TRUE;
 }
 
 
-
 static bool_t action_stdout_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data)
 {
@@ -1391,7 +1390,7 @@ static bool_t action_stdout_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 		push_stdin(ktpd);
 
 	if (info->revents & POLLIN)
-		get_stream(ktpd, info->fd, BOOL_FALSE);
+		get_stream(ktpd, info->fd, BOOL_FALSE, BOOL_FALSE);
 
 	// Some errors or fd is closed so remove it from polling
 	// EOF || POLERR || POLLNVAL
@@ -1411,7 +1410,7 @@ static bool_t action_stderr_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
 
 	if (info->revents & POLLIN)
-		get_stream(ktpd, info->fd, BOOL_TRUE);
+		get_stream(ktpd, info->fd, BOOL_TRUE, BOOL_FALSE);
 
 	// Some errors or fd is closed so remove it from polling
 	// EOF || POLERR || POLLNVAL
@@ -1442,6 +1441,15 @@ bool_t client_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 			syslog(LOG_ERR, "Can't send data to client");
 			return BOOL_FALSE; // Stop event loop
 		}
+		// Restore stdout and stderr receiving if out buffer is not
+		// full
+		if (ktpd->exec &&
+			faux_buf_len(faux_async_obuf(async)) < BUF_LIMIT) {
+			faux_eloop_include_fd_event(ktpd->eloop,
+				kexec_stdout(ktpd->exec), POLLIN);
+			faux_eloop_include_fd_event(ktpd->eloop,
+				kexec_stderr(ktpd->exec), POLLIN);
+		}
 	}
 
 	// Read data