Browse Source

ktp: Move some generic code to ktp.c

Serj Kalichev 2 years ago
parent
commit
82260887fb
4 changed files with 228 additions and 170 deletions
  1. 96 0
      klish/ktp/ktp.c
  2. 113 27
      klish/ktp/ktp_session.c
  3. 10 143
      klish/ktp/ktpd_session.c
  4. 9 0
      klish/ktp_session.h

+ 96 - 0
klish/ktp/ktp.c

@@ -9,9 +9,12 @@
 #include <fcntl.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <syslog.h>
 
 #include <faux/str.h>
 #include <faux/msg.h>
+#include <faux/eloop.h>
+#include <faux/async.h>
 #include <klish/ktp_session.h>
 
 
@@ -64,6 +67,25 @@ int ktp_accept(int listen_sock)
 }
 
 
+bool_t ktp_check_header(faux_hdr_t *hdr)
+{
+	assert(hdr);
+	if (!hdr)
+		return BOOL_FALSE;
+
+	if (faux_hdr_magic(hdr) != KTP_MAGIC)
+		return BOOL_FALSE;
+	if (faux_hdr_major(hdr) != KTP_MAJOR)
+		return BOOL_FALSE;
+	if (faux_hdr_minor(hdr) != KTP_MINOR)
+		return BOOL_FALSE;
+	if (faux_hdr_len(hdr) < (int)sizeof(*hdr))
+		return BOOL_FALSE;
+
+	return BOOL_TRUE;
+}
+
+
 faux_msg_t *ktp_msg_preform(ktp_cmd_e cmd, uint32_t status)
 {
 	faux_msg_t *msg = NULL;
@@ -77,3 +99,77 @@ faux_msg_t *ktp_msg_preform(ktp_cmd_e cmd, uint32_t status)
 
 	return msg;
 }
+
+
+bool_t ktp_send_error(faux_async_t *async, ktp_cmd_e cmd, const char *error)
+{
+	faux_msg_t *msg = NULL;
+
+	assert(async);
+	if (!async)
+		return BOOL_FALSE;
+
+	msg = ktp_msg_preform(cmd, KTP_STATUS_ERROR);
+	if (error)
+		faux_msg_add_param(msg, KTP_PARAM_ERROR, error, strlen(error));
+	faux_msg_send_async(msg, async);
+	faux_msg_free(msg);
+
+	return BOOL_TRUE;
+}
+
+
+bool_t ktp_peer_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data)
+{
+	faux_eloop_info_fd_t *info = (faux_eloop_info_fd_t *)associated_data;
+	faux_async_t *async = (faux_async_t *)user_data;
+
+	assert(async);
+
+	// Write data
+	if (info->revents & POLLOUT) {
+		faux_eloop_exclude_fd_event(eloop, info->fd, POLLOUT);
+		if (faux_async_out(async) < 0) {
+			// Someting went wrong
+			faux_eloop_del_fd(eloop, info->fd);
+			syslog(LOG_ERR, "Problem with async output");
+			return BOOL_FALSE; // Stop event loop
+		}
+	}
+
+	// Read data
+	if (info->revents & POLLIN) {
+		if (faux_async_in(async) < 0) {
+			// Someting went wrong
+			faux_eloop_del_fd(eloop, info->fd);
+			syslog(LOG_ERR, "Problem with async input");
+			return BOOL_FALSE; // Stop event loop
+		}
+	}
+
+	// EOF
+	if (info->revents & POLLHUP) {
+		faux_eloop_del_fd(eloop, info->fd);
+		syslog(LOG_DEBUG, "Close connection %d", info->fd);
+		return BOOL_FALSE; // Stop event loop
+	}
+
+	type = type; // Happy compiler
+
+	return BOOL_TRUE;
+}
+
+
+bool_t ktp_stall_cb(faux_async_t *async, size_t len, void *user_data)
+{
+	faux_eloop_t *eloop = (faux_eloop_t *)user_data;
+
+	assert(eloop);
+
+	faux_eloop_include_fd_event(eloop, faux_async_fd(async), POLLOUT);
+
+	len = len; // Happy compiler
+
+	return BOOL_TRUE;
+}

+ 113 - 27
klish/ktp/ktp_session.c

@@ -25,71 +25,157 @@ typedef enum {
 
 struct ktp_session_s {
 	ktp_session_state_e state;
-	faux_net_t *net;
+	faux_async_t *async;
+	faux_hdr_t *hdr; // Service var: engine will receive header and then msg
+	bool_t done;
+	faux_eloop_t *eloop;
 };
 
 
+static bool_t stop_loop_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data);
+
+
 ktp_session_t *ktp_session_new(int sock)
 {
-	ktp_session_t *session = NULL;
+	ktp_session_t *ktp = NULL;
 
 	if (sock < 0)
 		return NULL;
 
-	session = faux_zmalloc(sizeof(*session));
-	assert(session);
-	if (!session)
+	ktp = faux_zmalloc(sizeof(*ktp));
+	assert(ktp);
+	if (!ktp)
 		return NULL;
 
 	// Init
-	session->state = KTP_SESSION_STATE_UNAUTHORIZED;
-	session->net = faux_net_new();
-	assert(session->net);
-	faux_net_set_fd(session->net, sock);
-
-	return session;
+	ktp->state = KTP_SESSION_STATE_UNAUTHORIZED;
+	ktp->done = BOOL_FALSE;
+
+	// Event loop
+	ktp->eloop = faux_eloop_new(NULL);
+
+	// Async object
+	ktp->async = faux_async_new(sock);
+	assert(ktp->async);
+	// Receive message header first
+	faux_async_set_read_limits(ktp->async,
+		sizeof(faux_hdr_t), sizeof(faux_hdr_t));
+//	faux_async_set_read_cb(ktp->async, ktpd_session_read_cb, ktpd);
+	ktp->hdr = NULL;
+	faux_async_set_stall_cb(ktp->async, ktp_stall_cb, ktp->eloop);
+
+	// Event loop handlers
+	faux_eloop_add_signal(ktp->eloop, SIGINT, stop_loop_ev, ktp);
+	faux_eloop_add_signal(ktp->eloop, SIGTERM, stop_loop_ev, ktp);
+	faux_eloop_add_signal(ktp->eloop, SIGQUIT, stop_loop_ev, ktp);
+	faux_eloop_add_fd(ktp->eloop, ktp_session_fd(ktp), POLLIN,
+		ktp_peer_ev, ktp);
+
+	return ktp;
 }
 
 
-void ktp_session_free(ktp_session_t *session)
+void ktp_session_free(ktp_session_t *ktp)
 {
-	if (!session)
+	if (!ktp)
 		return;
 
-	faux_net_free(session->net);
-	faux_free(session);
+	faux_free(ktp->hdr);
+	close(ktp_session_fd(ktp));
+	faux_async_free(ktp->async);
+	faux_eloop_free(ktp->eloop);
+	faux_free(ktp);
+}
+
+
+bool_t ktp_session_done(const ktp_session_t *ktp)
+{
+	assert(ktp);
+	if (!ktp)
+		return BOOL_TRUE; // Done flag
+
+	return ktp->done;
+}
+
+
+bool_t ktp_session_set_done(ktp_session_t *ktp, bool_t done)
+{
+	assert(ktp);
+	if (!ktp)
+		return BOOL_FALSE;
+
+	ktp->done = done;
+
+	return BOOL_TRUE;
 }
 
 
-bool_t ktp_session_connected(ktp_session_t *session)
+bool_t ktp_session_connected(ktp_session_t *ktp)
 {
-	assert(session);
-	if (!session)
+	assert(ktp);
+	if (!ktp)
 		return BOOL_FALSE;
-	if (KTP_SESSION_STATE_DISCONNECTED == session->state)
+	if (KTP_SESSION_STATE_DISCONNECTED == ktp->state)
 		return BOOL_FALSE;
 
 	return BOOL_TRUE;
 }
 
 
-int ktp_session_fd(const ktp_session_t *session)
+int ktp_session_fd(const ktp_session_t *ktp)
 {
-	assert(session);
-	if (!session)
+	assert(ktp);
+	if (!ktp)
 		return BOOL_FALSE;
 
-	return faux_net_get_fd(session->net);
+	return faux_async_fd(ktp->async);
 }
 
 
 #if 0
-static void ktp_session_bad_socket(ktp_session_t *session)
+static void ktp_session_bad_socket(ktp_session_t *ktp)
 {
-	assert(session);
-	if (!session)
+	assert(ktp);
+	if (!ktp)
 		return;
 
-	session->state = KTP_SESSION_STATE_DISCONNECTED;
+	ktp->state = KTP_SESSION_STATE_DISCONNECTED;
 }
 #endif
+
+
+static bool_t stop_loop_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data)
+{
+	ktp_session_t *ktp = (ktp_session_t *)user_data;
+
+	if (!ktp)
+		return BOOL_FALSE;
+
+	ktp_session_set_done(ktp, BOOL_TRUE);
+
+	// Happy compiler
+	eloop = eloop;
+	type = type;
+	associated_data = associated_data;
+
+	return BOOL_FALSE; // Stop Event Loop
+}
+
+
+bool_t ktp_session_req_cmd(ktp_session_t *ktp, const char *line, int *retcode)
+{
+	faux_eloop_t *eloop = NULL;
+
+	assert(ktp);
+	if (!ktp)
+		return BOOL_FALSE;
+
+	faux_eloop_loop(eloop);
+
+	line = line;
+	retcode = retcode;
+
+	return BOOL_TRUE;
+}

+ 10 - 143
klish/ktp/ktpd_session.c

@@ -46,10 +46,6 @@ struct ktpd_session_s {
 // Static declarations
 static bool_t ktpd_session_read_cb(faux_async_t *async,
 	void *data, size_t len, void *user_data);
-static bool_t ktpd_session_stall_cb(faux_async_t *async,
-	size_t len, void *user_data);
-static bool_t client_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
-	void *associated_data, void *user_data);
 static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data);
 static bool_t ktpd_session_exec(ktpd_session_t *ktpd, const char *line,
@@ -85,11 +81,11 @@ ktpd_session_t *ktpd_session_new(int sock, const kscheme_t *scheme,
 		sizeof(faux_hdr_t), sizeof(faux_hdr_t));
 	faux_async_set_read_cb(ktpd->async, ktpd_session_read_cb, ktpd);
 	ktpd->hdr = NULL;
-	faux_async_set_stall_cb(ktpd->async, ktpd_session_stall_cb, ktpd);
+	faux_async_set_stall_cb(ktpd->async, ktp_stall_cb, ktpd->eloop);
 
 	// Eloop callbacks
 	faux_eloop_add_fd(ktpd->eloop, ktpd_session_fd(ktpd), POLLIN,
-		client_ev, ktpd);
+		ktp_peer_ev, ktpd->async);
 	faux_eloop_add_signal(ktpd->eloop, SIGCHLD, wait_for_actions_ev, ktpd);
 
 	return ktpd;
@@ -110,44 +106,6 @@ void ktpd_session_free(ktpd_session_t *ktpd)
 }
 
 
-static bool_t check_ktp_header(faux_hdr_t *hdr)
-{
-	assert(hdr);
-	if (!hdr)
-		return BOOL_FALSE;
-
-	if (faux_hdr_magic(hdr) != KTP_MAGIC)
-		return BOOL_FALSE;
-	if (faux_hdr_major(hdr) != KTP_MAJOR)
-		return BOOL_FALSE;
-	if (faux_hdr_minor(hdr) != KTP_MINOR)
-		return BOOL_FALSE;
-	if (faux_hdr_len(hdr) < (int)sizeof(*hdr))
-		return BOOL_FALSE;
-
-	return BOOL_TRUE;
-}
-
-
-static bool_t ktpd_session_send_error(ktpd_session_t *ktpd,
-	ktp_cmd_e cmd, const char *error)
-{
-	faux_msg_t *msg = NULL;
-
-	assert(ktpd);
-	if (!ktpd)
-		return BOOL_FALSE;
-
-	msg = ktp_msg_preform(cmd, KTP_STATUS_ERROR);
-	if (error)
-		faux_msg_add_param(msg, KTP_PARAM_ERROR, error, strlen(error));
-	faux_msg_send_async(msg, ktpd->async);
-	faux_msg_free(msg);
-
-	return BOOL_TRUE;
-}
-
-
 static bool_t ktpd_session_process_cmd(ktpd_session_t *ktpd, faux_msg_t *msg)
 {
 	char *line = NULL;
@@ -162,8 +120,7 @@ static bool_t ktpd_session_process_cmd(ktpd_session_t *ktpd, faux_msg_t *msg)
 
 	// Get line from message
 	if (!(line = faux_msg_get_str_param_by_type(msg, KTP_PARAM_LINE))) {
-		ktpd_session_send_error(ktpd, cmd,
-			"The line is not specified");
+		ktp_send_error(ktpd->async, cmd, "The line is not specified");
 		return BOOL_FALSE;
 	}
 
@@ -177,7 +134,7 @@ static bool_t ktpd_session_process_cmd(ktpd_session_t *ktpd, faux_msg_t *msg)
 
 	// Session status can be changed while parsing
 	if (ksession_done(ktpd->session)) {
-		ktpd_session_send_error(ktpd, cmd, "Interrupted by system");
+		ktp_send_error(ktpd->async, cmd, "Interrupted by system");
 		faux_error_free(error);
 		return BOOL_FALSE;
 	}
@@ -191,7 +148,7 @@ static bool_t ktpd_session_process_cmd(ktpd_session_t *ktpd, faux_msg_t *msg)
 		faux_msg_free(ack);
 	} else {
 		char *err = faux_error_cstr(error);
-		ktpd_session_send_error(ktpd, cmd, err);
+		ktp_send_error(ktpd->async, cmd, err);
 		faux_str_free(err);
 		return BOOL_FALSE;
 	}
@@ -214,7 +171,7 @@ static bool_t ktpd_session_process_completion(ktpd_session_t *ktpd, faux_msg_t *
 
 	// Get line from message
 	if (!(line = faux_msg_get_str_param_by_type(msg, KTP_PARAM_LINE))) {
-		ktpd_session_send_error(ktpd, cmd, NULL);
+		ktp_send_error(ktpd->async, cmd, NULL);
 		return BOOL_FALSE;
 	}
 
@@ -222,7 +179,7 @@ static bool_t ktpd_session_process_completion(ktpd_session_t *ktpd, faux_msg_t *
 	pargv = ksession_parse_for_completion(ktpd->session, line);
 	faux_str_free(line);
 	if (!pargv) {
-		ktpd_session_send_error(ktpd, cmd, NULL);
+		ktp_send_error(ktpd->async, cmd, NULL);
 		return BOOL_FALSE;
 	}
 	kpargv_debug(pargv);
@@ -250,7 +207,7 @@ static bool_t ktpd_session_process_help(ktpd_session_t *ktpd, faux_msg_t *msg)
 
 	// Get line from message
 	if (!(line = faux_msg_get_str_param_by_type(msg, KTP_PARAM_LINE))) {
-		ktpd_session_send_error(ktpd, cmd, NULL);
+		ktp_send_error(ktpd->async, cmd, NULL);
 		return BOOL_FALSE;
 	}
 
@@ -322,14 +279,14 @@ static bool_t ktpd_session_read_cb(faux_async_t *async,
 
 		ktpd->hdr = (faux_hdr_t *)data;
 		// Check for broken header
-		if (!check_ktp_header(ktpd->hdr)) {
+		if (!ktp_check_header(ktpd->hdr)) {
 			faux_free(ktpd->hdr);
 			ktpd->hdr = NULL;
 			return BOOL_FALSE;
 		}
 
 		whole_len = faux_hdr_len(ktpd->hdr);
-		// msg_wo_hdr >= 0 because check_ktp_header() validates whole_len
+		// msg_wo_hdr >= 0 because ktp_check_header() validates whole_len
 		msg_wo_hdr = whole_len - sizeof(faux_hdr_t);
 		// Plan to receive message body
 		if (msg_wo_hdr > 0) {
@@ -364,24 +321,6 @@ static bool_t ktpd_session_read_cb(faux_async_t *async,
 }
 
 
-static bool_t ktpd_session_stall_cb(faux_async_t *async,
-	size_t len, void *user_data)
-{
-	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
-
-	assert(async);
-	assert(ktpd);
-	assert(ktpd->eloop);
-
-	faux_eloop_include_fd_event(ktpd->eloop, ktpd_session_fd(ktpd), POLLOUT);
-
-	async = async; // Happy compiler
-	len = len; // Happy compiler
-
-	return BOOL_TRUE;
-}
-
-
 bool_t ktpd_session_connected(ktpd_session_t *ktpd)
 {
 	assert(ktpd);
@@ -404,78 +343,6 @@ int ktpd_session_fd(const ktpd_session_t *ktpd)
 }
 
 
-bool_t ktpd_session_async_in(ktpd_session_t *ktpd)
-{
-	assert(ktpd);
-	if (!ktpd)
-		return BOOL_FALSE;
-	if (!ktpd_session_connected(ktpd))
-		return BOOL_FALSE;
-
-	if (faux_async_in(ktpd->async) < 0)
-		return BOOL_FALSE;
-
-	return BOOL_TRUE;
-}
-
-
-bool_t ktpd_session_async_out(ktpd_session_t *ktpd)
-{
-	assert(ktpd);
-	if (!ktpd)
-		return BOOL_FALSE;
-	if (!ktpd_session_connected(ktpd))
-		return BOOL_FALSE;
-
-	if (faux_async_out(ktpd->async) < 0)
-		return BOOL_FALSE;
-
-	return BOOL_TRUE;
-}
-
-
-static bool_t client_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
-	void *associated_data, void *user_data)
-{
-	faux_eloop_info_fd_t *info = (faux_eloop_info_fd_t *)associated_data;
-	ktpd_session_t *ktpd = (ktpd_session_t *)user_data;
-
-	assert(ktpd);
-
-	// Write data
-	if (info->revents & POLLOUT) {
-		faux_eloop_exclude_fd_event(eloop, info->fd, POLLOUT);
-		if (!ktpd_session_async_out(ktpd)) {
-			// Someting went wrong
-			faux_eloop_del_fd(eloop, info->fd);
-			syslog(LOG_ERR, "Problem with async output");
-			return BOOL_FALSE; // Stop event loop
-		}
-	}
-
-	// Read data
-	if (info->revents & POLLIN) {
-		if (!ktpd_session_async_in(ktpd)) {
-			// Someting went wrong
-			faux_eloop_del_fd(eloop, info->fd);
-			syslog(LOG_ERR, "Problem with async input");
-			return BOOL_FALSE; // Stop event loop
-		}
-	}
-
-	// EOF
-	if (info->revents & POLLHUP) {
-		faux_eloop_del_fd(eloop, info->fd);
-		syslog(LOG_DEBUG, "Close connection %d", info->fd);
-		return BOOL_FALSE; // Stop event loop
-	}
-
-	type = type; // Happy compiler
-
-	return BOOL_TRUE;
-}
-
-
 static bool_t wait_for_actions_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
 	void *associated_data, void *user_data)
 {

+ 9 - 0
klish/ktp_session.h

@@ -23,12 +23,21 @@ C_DECL_BEGIN
 int ktp_connect_unix(const char *sun_path);
 void ktp_disconnect(int fd);
 int ktp_accept(int listen_sock);
+
+bool_t ktp_check_header(faux_hdr_t *hdr);
 faux_msg_t *ktp_msg_preform(ktp_cmd_e cmd, uint32_t status);
+bool_t ktp_send_error(faux_async_t *async, ktp_cmd_e cmd, const char *error);
+
+bool_t ktp_peer_ev(faux_eloop_t *eloop, faux_eloop_type_e type,
+	void *associated_data, void *user_data);
+bool_t ktp_stall_cb(faux_async_t *async, size_t len, void *user_data);
 
 
 // Client KTP session
 ktp_session_t *ktp_session_new(int sock);
 void ktp_session_free(ktp_session_t *session);
+bool_t ktp_session_done(const ktp_session_t *ktp);
+bool_t ktp_session_set_done(ktp_session_t *ktp, bool_t done);
 bool_t ktp_session_connected(ktp_session_t *session);
 int ktp_session_fd(const ktp_session_t *session);