Browse Source

ktpd: Process ACTION

Serj Kalichev 2 years ago
parent
commit
e0b011bc69
3 changed files with 230 additions and 150 deletions
  1. 0 29
      bin/klishd/klishd.c
  2. 230 119
      klish/ktp/ktpd_session.c
  3. 0 2
      klish/ktp_session.h

+ 0 - 29
bin/klishd/klishd.c

@@ -211,10 +211,6 @@ err: // For listen daemon
 	faux_eloop_add_signal(eloop, SIGINT, stop_loop_ev, NULL);
 	faux_eloop_add_signal(eloop, SIGTERM, stop_loop_ev, NULL);
 	faux_eloop_add_signal(eloop, SIGQUIT, stop_loop_ev, NULL);
-	// Theoretically eloop can use SIGCHLD for different child processes but
-	// not only for single ktpd_session's ACTIONs so it's not goot to grab
-	// whole SIGCHLD event handler by ktpd_session object.
-	faux_eloop_add_signal(eloop, SIGCHLD, wait_for_actions_ev, ktpd_session);
 	// Main service loop
 	faux_eloop_loop(eloop);
 
@@ -540,31 +536,6 @@ static bool_t wait_for_child_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 }
 
 
-/** @brief Wait for child processes (ACTIONs).
- */
-static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
-	void *associated_data, void *user_data)
-{
-	int wstatus = 0;
-	pid_t child_pid = -1;
-	ktpd_session_t *ktpd_session = (ktpd_session_t *)user_data;
-
-	// Wait for any child process. Doesn't block.
-	while ((child_pid = waitpid(-1, &wstatus, WNOHANG)) > 0) {
-		if (!ktpd_session)
-			continue;
-		ktpd_session_terminated_action(ktpd_session, child_pid, wstatus);
-	}
-
-	// Happy compiler
-	eloop = eloop;
-	type = type;
-	associated_data = associated_data;
-
-	return BOOL_TRUE;
-}
-
-
 /** @brief Re-read config file.
  *
  * This function can refresh klishd options but plugins (dbs for example) are

+ 230 - 119
klish/ktp/ktpd_session.c

@@ -11,6 +11,7 @@
 #include <sys/un.h>
 #include <syslog.h>
 #include <poll.h>
+#include <sys/wait.h>
 
 #include <faux/str.h>
 #include <faux/async.h>
@@ -30,14 +31,15 @@ typedef enum {
 
 
 struct ktpd_session_s {
-	ksession_t *ksession;
+	ksession_t *session;
 	ktpd_session_state_e state;
 	uid_t uid;
 	gid_t gid;
 	char *user;
 	faux_async_t *async;
 	faux_hdr_t *hdr; // Engine will receive header and then msg
-	faux_eloop_t *eloop;
+	faux_eloop_t *eloop; // External link, dont's free()
+	kexec_t *exec;
 };
 
 
@@ -48,57 +50,63 @@ static bool_t ktpd_session_stall_cb(faux_async_t *async,
 	size_t len, void *user_data);
 static bool_t client_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data);
+static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data);
+static bool_t ktpd_session_exec(ktpd_session_t *ktpd, const char *line,
+	int *retcode, faux_error_t *error);
 
 
 ktpd_session_t *ktpd_session_new(int sock, const kscheme_t *scheme,
 	const char *start_entry, faux_eloop_t *eloop)
 {
-	ktpd_session_t *session = NULL;
+	ktpd_session_t *ktpd = NULL;
 
 	if (sock < 0)
 		return NULL;
 	if (!eloop)
 		return NULL;
 
-	session = faux_zmalloc(sizeof(*session));
-	assert(session);
-	if (!session)
+	ktpd = faux_zmalloc(sizeof(*ktpd));
+	assert(ktpd);
+	if (!ktpd)
 		return NULL;
 
 	// Init
-	session->state = KTPD_SESSION_STATE_NOT_AUTHORIZED;
-	session->eloop = eloop;
-	session->ksession = ksession_new(scheme, start_entry);
-	assert(session->ksession);
+	ktpd->state = KTPD_SESSION_STATE_IDLE;
+	ktpd->eloop = eloop;
+	ktpd->session = ksession_new(scheme, start_entry);
+	assert(ktpd->session);
 
 	// Async object
-	session->async = faux_async_new(sock);
-	assert(session->async);
+	ktpd->async = faux_async_new(sock);
+	assert(ktpd->async);
 	// Receive message header first
-	faux_async_set_read_limits(session->async,
+	faux_async_set_read_limits(ktpd->async,
 		sizeof(faux_hdr_t), sizeof(faux_hdr_t));
-	faux_async_set_read_cb(session->async, ktpd_session_read_cb, session);
-	session->hdr = NULL;
-	faux_async_set_stall_cb(session->async, ktpd_session_stall_cb, session);
+	faux_async_set_read_cb(ktpd->async, ktpd_session_read_cb, ktpd);
+	ktpd->hdr = NULL;
+	faux_async_set_stall_cb(ktpd->async, ktpd_session_stall_cb, ktpd);
 
 	// Eloop callbacks
-	faux_eloop_add_fd(session->eloop, ktpd_session_fd(session), POLLIN,
-		client_ev, session);
+	faux_eloop_add_fd(ktpd->eloop, ktpd_session_fd(ktpd), POLLIN,
+		client_ev, ktpd);
+	faux_eloop_add_signal(ktpd->eloop, SIGCHLD, wait_for_actions_ev, ktpd);
 
-	return session;
+	return ktpd;
 }
 
 
-void ktpd_session_free(ktpd_session_t *session)
+void ktpd_session_free(ktpd_session_t *ktpd)
 {
-	if (!session)
+	if (!ktpd)
 		return;
 
-	ksession_free(session->ksession);
-	faux_free(session->hdr);
-	close(ktpd_session_fd(session));
-	faux_async_free(session->async);
-	faux_free(session);
+	kexec_free(ktpd->exec);
+	ksession_free(ktpd->session);
+	faux_free(ktpd->hdr);
+	close(ktpd_session_fd(ktpd));
+	faux_async_free(ktpd->async);
+	faux_free(ktpd);
 }
 
 
@@ -140,77 +148,53 @@ static bool_t ktpd_session_send_error(ktpd_session_t *session,
 }
 
 
-static bool_t ktpd_session_process_cmd(ktpd_session_t *session, faux_msg_t *msg)
+static bool_t ktpd_session_process_cmd(ktpd_session_t *ktpd, faux_msg_t *msg)
 {
 	char *line = NULL;
 	faux_msg_t *ack = NULL;
-//	kpargv_t *pargv = NULL;
+	int retcode = -1;
 	ktp_cmd_e cmd = KTP_CMD_ACK;
-	kexec_t *exec = NULL;
 	faux_error_t *error = NULL;
+	bool_t rc = BOOL_FALSE;
 
-	assert(session);
+	assert(ktpd);
 	assert(msg);
 
 	// Get line from message
 	if (!(line = faux_msg_get_str_param_by_type(msg, KTP_PARAM_LINE))) {
-		ktpd_session_send_error(session, cmd,
+		ktpd_session_send_error(ktpd, cmd,
 			"The line is not specified");
 		return BOOL_FALSE;
 	}
 
-	// Parsing
-/*	error = faux_error_new();
-	exec = ksession_parse_for_exec(session->ksession, line, error);
-	faux_str_free(line);
+	error = faux_error_new();
 
-	if (exec) {
-		kexec_contexts_node_t *iter = kexec_contexts_iter(exec);
-		kcontext_t *context = NULL;
-		while ((context = kexec_contexts_each(&iter))) {
-			kpargv_debug(kcontext_pargv(context));
-		}
-	} else {
-		char *err = faux_error_cstr(error);
-		ktpd_session_send_error(session, cmd, err);
-		faux_str_free(err);
-		return BOOL_FALSE;
+	rc = ktpd_session_exec(ktpd, line, &retcode, error);
+	if (ktpd->exec) {
+		faux_error_free(error);
+		return BOOL_TRUE; // Continue and wait for ACTION
 	}
 
-	kexec_exec(exec);
-*/
+	// Session status can be changed while parsing
+	if (ksession_done(ktpd->session)) {
+		ktpd_session_send_error(ktpd, cmd, "Interrupted by system");
+		faux_error_free(error);
+		return BOOL_FALSE;
+	}
 
-	{
-	int retcode = 0;
-	bool_t r = BOOL_FALSE;
-	r = ksession_exec_locally(session->ksession, line, &retcode, error);
-	faux_str_free(line);
-	if (!r)
-		printf("ksession_exec_locally() return value is false\n");
-	printf("kexec retcode is %d\n", retcode);
+	if (rc) {
+		char *err = faux_error_cstr(error);
+		ktpd_session_send_error(ktpd, cmd, err);
+		faux_str_free(err);
+		return BOOL_FALSE;
+	} else {
+		ack = ktp_msg_preform(cmd, KTP_STATUS_NONE);
+		faux_msg_send_async(ack, ktpd->async);
+		faux_msg_free(ack);
 	}
 
-//	ktpd_session_exec(session, exec);
-
-//	kpargv_debug(pargv);
-//	if (kpargv_status(pargv) != KPARSE_OK) {
-//		char *error = NULL;
-//		error = faux_str_sprintf("Can't parse line: %s",
-//			kpargv_status_str(pargv));
-//		kpargv_free(pargv);
-//		ktpd_session_send_error(session, cmd, error);
-//		return BOOL_FALSE;
-//	}
-//
-//	kpargv_free(pargv);
-	kexec_free(exec);
 	faux_error_free(error);
 
-	// Send ACK message
-	ack = ktp_msg_preform(cmd, KTP_STATUS_NONE);
-	faux_msg_send_async(ack, session->async);
-	faux_msg_free(ack);
-
 	return BOOL_TRUE;
 }
 
@@ -232,7 +216,7 @@ static bool_t ktpd_session_process_completion(ktpd_session_t *session, faux_msg_
 	}
 
 	// Parsing
-	pargv = ksession_parse_for_completion(session->ksession, line);
+	pargv = ksession_parse_for_completion(session->session, line);
 	faux_str_free(line);
 	if (!pargv) {
 		ktpd_session_send_error(session, cmd, NULL);
@@ -268,7 +252,7 @@ static bool_t ktpd_session_process_help(ktpd_session_t *session, faux_msg_t *msg
 	}
 
 /*	// Parsing
-	pargv = ksession_parse_line(session->ksession, line, KPURPOSE_HELP);
+	pargv = ksession_parse_line(session->session, line, KPURPOSE_HELP);
 	faux_str_free(line);
 	kpargv_free(pargv);
 */
@@ -321,27 +305,27 @@ static bool_t ktpd_session_dispatch(ktpd_session_t *session, faux_msg_t *msg)
 static bool_t ktpd_session_read_cb(faux_async_t *async,
 	void *data, size_t len, void *user_data)
 {
-	ktpd_session_t *session = (ktpd_session_t *)user_data;
+	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
 	faux_msg_t *completed_msg = NULL;
 
 	assert(async);
 	assert(data);
-	assert(session);
+	assert(ktpd);
 
 	// Receive header
-	if (!session->hdr) {
+	if (!ktpd->hdr) {
 		size_t whole_len = 0;
 		size_t msg_wo_hdr = 0;
 
-		session->hdr = (faux_hdr_t *)data;
+		ktpd->hdr = (faux_hdr_t *)data;
 		// Check for broken header
-		if (!check_ktp_header(session->hdr)) {
-			faux_free(session->hdr);
-			session->hdr = NULL;
+		if (!check_ktp_header(ktpd->hdr)) {
+			faux_free(ktpd->hdr);
+			ktpd->hdr = NULL;
 			return BOOL_FALSE;
 		}
 
-		whole_len = faux_hdr_len(session->hdr);
+		whole_len = faux_hdr_len(ktpd->hdr);
 		// msg_wo_hdr >= 0 because check_ktp_header() validates whole_len
 		msg_wo_hdr = whole_len - sizeof(faux_hdr_t);
 		// Plan to receive message body
@@ -351,24 +335,28 @@ static bool_t ktpd_session_read_cb(faux_async_t *async,
 			return BOOL_TRUE;
 		}
 		// Here message is completed (msg body has zero length)
-		completed_msg = faux_msg_deserialize_parts(session->hdr, NULL, 0);
+		completed_msg = faux_msg_deserialize_parts(ktpd->hdr, NULL, 0);
 
 	// Receive message body
 	} else {
-		completed_msg = faux_msg_deserialize_parts(session->hdr, data, len);
+		completed_msg = faux_msg_deserialize_parts(ktpd->hdr, data, len);
 		faux_free(data);
 	}
 
 	// Plan to receive msg header
-	faux_async_set_read_limits(session->async,
+	faux_async_set_read_limits(ktpd->async,
 		sizeof(faux_hdr_t), sizeof(faux_hdr_t));
-	faux_free(session->hdr);
-	session->hdr = NULL; // Ready to recv new header
+	faux_free(ktpd->hdr);
+	ktpd->hdr = NULL; // Ready to recv new header
 
 	// Here message is completed
-	ktpd_session_dispatch(session, completed_msg);
+	ktpd_session_dispatch(ktpd, completed_msg);
 	faux_msg_free(completed_msg);
 
+	// Session status can be changed while parsing
+	if (ksession_done(ktpd->session))
+		return BOOL_FALSE;
+
 	return BOOL_TRUE;
 }
 
@@ -376,13 +364,13 @@ static bool_t ktpd_session_read_cb(faux_async_t *async,
 static bool_t ktpd_session_stall_cb(faux_async_t *async,
 	size_t len, void *user_data)
 {
-	ktpd_session_t *session = (ktpd_session_t *)user_data;
+	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
 
 	assert(async);
-	assert(session);
-	assert(session->eloop);
+	assert(ktpd);
+	assert(ktpd->eloop);
 
-	faux_eloop_include_fd_event(session->eloop, ktpd_session_fd(session), POLLOUT);
+	faux_eloop_include_fd_event(ktpd->eloop, ktpd_session_fd(ktpd), POLLOUT);
 
 	async = async; // Happy compiler
 	len = len; // Happy compiler
@@ -391,52 +379,52 @@ static bool_t ktpd_session_stall_cb(faux_async_t *async,
 }
 
 
-bool_t ktpd_session_connected(ktpd_session_t *session)
+bool_t ktpd_session_connected(ktpd_session_t *ktpd)
 {
-	assert(session);
-	if (!session)
+	assert(ktpd);
+	if (!ktpd)
 		return BOOL_FALSE;
-	if (KTPD_SESSION_STATE_DISCONNECTED == session->state)
+	if (KTPD_SESSION_STATE_DISCONNECTED == ktpd->state)
 		return BOOL_FALSE;
 
 	return BOOL_TRUE;
 }
 
 
-int ktpd_session_fd(const ktpd_session_t *session)
+int ktpd_session_fd(const ktpd_session_t *ktpd)
 {
-	assert(session);
-	if (!session)
+	assert(ktpd);
+	if (!ktpd)
 		return BOOL_FALSE;
 
-	return faux_async_fd(session->async);
+	return faux_async_fd(ktpd->async);
 }
 
 
-bool_t ktpd_session_async_in(ktpd_session_t *session)
+bool_t ktpd_session_async_in(ktpd_session_t *ktpd)
 {
-	assert(session);
-	if (!session)
+	assert(ktpd);
+	if (!ktpd)
 		return BOOL_FALSE;
-	if (!ktpd_session_connected(session))
+	if (!ktpd_session_connected(ktpd))
 		return BOOL_FALSE;
 
-	if (faux_async_in(session->async) < 0)
+	if (faux_async_in(ktpd->async) < 0)
 		return BOOL_FALSE;
 
 	return BOOL_TRUE;
 }
 
 
-bool_t ktpd_session_async_out(ktpd_session_t *session)
+bool_t ktpd_session_async_out(ktpd_session_t *ktpd)
 {
-	assert(session);
-	if (!session)
+	assert(ktpd);
+	if (!ktpd)
 		return BOOL_FALSE;
-	if (!ktpd_session_connected(session))
+	if (!ktpd_session_connected(ktpd))
 		return BOOL_FALSE;
 
-	if (faux_async_out(session->async) < 0)
+	if (faux_async_out(ktpd->async) < 0)
 		return BOOL_FALSE;
 
 	return BOOL_TRUE;
@@ -485,15 +473,138 @@ static bool_t client_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 }
 
 
-bool_t ktpd_session_terminated_action(ktpd_session_t *session,
-	pid_t pid, int wstatus)
+static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data)
 {
-	assert(session);
-	if (!session)
+	int wstatus = 0;
+	pid_t child_pid = -1;
+	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
+	int retcode = -1;
+	faux_msg_t *ack = NULL;
+	ktp_cmd_e cmd = KTP_CMD_ACK;
+
+	if (!ktpd)
 		return BOOL_FALSE;
 
-	syslog(LOG_ERR, "ACTION process %d was terminated: %d",
-		pid, WEXITSTATUS(wstatus));
+	// Wait for any child process. Doesn't block.
+	while ((child_pid = waitpid(-1, &wstatus, WNOHANG)) > 0) {
+		if (ktpd->exec)
+			kexec_continue_command_execution(ktpd->exec, child_pid,
+				wstatus);
+	}
+	if (!ktpd->exec)
+		return BOOL_TRUE;
+
+	// Check if kexec is done now
+	if (!kexec_retcode(ktpd->exec, &retcode))
+		return BOOL_TRUE; // Continue
+
+	faux_eloop_del_fd(eloop, kexec_stdout(ktpd->exec));
+
+	kexec_free(ktpd->exec);
+	ktpd->exec = NULL;
+	ktpd->state = KTPD_SESSION_STATE_IDLE;
+
+	// Send ACK message
+	ack = ktp_msg_preform(cmd, KTP_STATUS_NONE);
+	faux_msg_send_async(ack, ktpd->async);
+	faux_msg_free(ack);
+
+	type = type; // Happy compiler
+	associated_data = associated_data; // Happy compiler
+
+	return BOOL_TRUE;
+}
+
+
+static bool_t action_stdout_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data)
+{
+	faux_eloop_info_fd_t *info = (faux_eloop_info_fd_t *)associated_data;
+	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
+	kexec_t *exec = NULL;
+	ssize_t r = -1;
+	faux_buf_t *faux_buf = NULL;
+	void *linear_buf = NULL;
+	char *buf = NULL;
+	ssize_t len = 0;
+
+	if (!ktpd)
+		return BOOL_TRUE;
+	exec = ktpd->exec;
+	if (!exec)
+		return BOOL_TRUE;
+
+	faux_buf = kexec_bufout(exec);
+	assert(faux_buf);
+
+	do {
+		ssize_t really_readed = 0;
+		ssize_t linear_len =
+			faux_buf_dwrite_lock_easy(faux_buf, &linear_buf);
+		// Non-blocked read. The fd became non-blocked while
+		// kexec_prepare().
+		r = read(info->fd, linear_buf, linear_len);
+		if (r > 0)
+			really_readed = r;
+		faux_buf_dwrite_unlock_easy(faux_buf, really_readed);
+	} while (r > 0);
+
+	len = faux_buf_len(faux_buf);
+	if (0 == len)
+		return BOOL_TRUE;
+
+	buf = malloc(len);
+	faux_buf_read(faux_buf, buf, len);
+write(STDOUT_FILENO, buf, len);
+	free(buf);
+
+	// Happy compiler
+	eloop = eloop;
+	type = type;
+
+	return BOOL_TRUE;
+}
+
+
+static bool_t ktpd_session_exec(ktpd_session_t *ktpd, const char *line,
+	int *retcode, faux_error_t *error)
+{
+	kexec_t *exec = NULL;
+
+	assert(ktpd);
+	if (!ktpd)
+		return BOOL_FALSE;
+
+	// Parsing
+	exec = ksession_parse_for_exec(ktpd->session, line, error);
+	if (!exec)
+		return BOOL_FALSE;
+
+	// Session status can be changed while parsing
+	if (ksession_done(ktpd->session)) {
+		kexec_free(exec);
+		return BOOL_FALSE; // Because action is not completed
+	}
+
+	// Execute kexec and then wait for completion using global Eloop
+	if (!kexec_exec(exec)) {
+		kexec_free(exec);
+		return BOOL_FALSE; // Something went wrong
+	}
+	// If kexec contains only non-exec (for example dry-run) ACTIONs then
+	// we don't need event loop and can return here.
+	if (kexec_retcode(exec, retcode)) {
+		kexec_free(exec);
+		return BOOL_TRUE;
+	}
+
+	// Save kexec pointer to use later
+	ktpd->state = KTPD_SESSION_STATE_WAIT_FOR_PROCESS;
+	ktpd->exec = exec;
+
+	faux_eloop_add_fd(ktpd->eloop, kexec_stdout(exec), POLLIN,
+		action_stdout_ev, ktpd);
 
 	return BOOL_TRUE;
 }

+ 0 - 2
klish/ktp_session.h

@@ -41,8 +41,6 @@ bool_t ktpd_session_connected(ktpd_session_t *session);
 int ktpd_session_fd(const ktpd_session_t *session);
 bool_t ktpd_session_async_in(ktpd_session_t *session);
 bool_t ktpd_session_async_out(ktpd_session_t *session);
-bool_t ktpd_session_terminated_action(ktpd_session_t *session,
-	pid_t pid, int wstatus);
 
 C_DECL_END