From 5d5e3233d3fcf196f90da99566ae38f5c73e76da Mon Sep 17 00:00:00 2001 From: Hui Yu Date: Sun, 6 Jun 2021 13:06:50 +0800 Subject: [PATCH] [Websocket] Support exec --- .github/workflows/build.yml | 15 +- README.md | 9 + examples/Makefile | 6 +- examples/auth_provider/Makefile | 2 +- examples/create_pod/Makefile | 2 +- examples/delete_pod/Makefile | 2 +- examples/exec_pod/.gitignore | 1 + examples/exec_pod/Makefile | 11 + examples/exec_pod/main.c | 85 +++++++ examples/exec_provider/Makefile | 2 +- examples/generic/Makefile | 2 +- examples/list_pod/Makefile | 2 +- examples/list_pod_incluster/Makefile | 2 +- examples/multi_thread/Makefile | 2 +- examples/watch_list_pod/Makefile | 2 +- kubernetes/CMakeLists.txt | 4 + kubernetes/websocket/kube_exec.c | 115 +++++++++ kubernetes/websocket/kube_exec.h | 15 ++ kubernetes/websocket/wsclient.c | 337 +++++++++++++++++++++++++++ kubernetes/websocket/wsclient.h | 41 ++++ 20 files changed, 645 insertions(+), 12 deletions(-) create mode 100644 examples/exec_pod/.gitignore create mode 100644 examples/exec_pod/Makefile create mode 100644 examples/exec_pod/main.c create mode 100644 kubernetes/websocket/kube_exec.c create mode 100644 kubernetes/websocket/kube_exec.h create mode 100644 kubernetes/websocket/wsclient.c create mode 100644 kubernetes/websocket/wsclient.h diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7dbab93..fd05c8e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,6 +16,15 @@ jobs: run: | sudo apt-get update sudo apt-get install -y libssl-dev libcurl4-openssl-dev uncrustify libyaml-dev + - name: Prepare libwebsockets + run: | + git clone https://libwebsockets.org/repo/libwebsockets --depth 1 --branch v4.2-stable + cd libwebsockets + mkdir build + cd build + cmake .. + make + sudo make install - name: Build client library run: | cd kubernetes @@ -35,8 +44,10 @@ jobs: cd examples/ make - name: Create k8s kind cluster - uses: helm/kind-action@v1.1.0 + uses: helm/kind-action@v1.2.0 - name: Test examples run: | + kubectl cluster-info --context kind-chart-testing + kubectl describe node cd examples/ - LD_LIBRARY_PATH=$GITHUB_WORKSPACE/kubernetes/build/ make test + LD_LIBRARY_PATH=$GITHUB_WORKSPACE/kubernetes/build/:/usr/local/lib make test diff --git a/README.md b/README.md index fdd30e9..3a3aa42 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,15 @@ CLIENT_REPO_ROOT=${PWD}/c # Install pre-requisites sudo apt-get install libssl-dev libcurl4-openssl-dev uncrustify libyaml-dev +# Build pre-requisite: libwebsockets +git clone https://libwebsockets.org/repo/libwebsockets --depth 1 --branch v4.2-stable +cd libwebsockets +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=/usr/local/lib .. +make +sudo make install + # Move into the Kubernetes directory cd ${CLIENT_REPO_ROOT}/kubernetes diff --git a/examples/Makefile b/examples/Makefile index 89be907..d6e3aeb 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -8,6 +8,7 @@ all: cd auth_provider; make cd watch_list_pod; make cd multi_thread; make + cd exec_pod; make clean: cd create_pod; make clean @@ -19,10 +20,13 @@ clean: cd auth_provider; make clean cd watch_list_pod; make clean cd multi_thread; make clean + cd exec_pod; make clean test: cd create_pod; make test; sleep 10 cd list_pod; make test cd delete_pod; make test cd generic; make test - cd multi_thread; make test + cd multi_thread; make test; sleep 10 + kubectl describe po test-pod-8 + cd exec_pod; make test \ No newline at end of file diff --git a/examples/auth_provider/Makefile b/examples/auth_provider/Makefile index 931b30d..041c368 100644 --- a/examples/auth_provider/Makefile +++ b/examples/auth_provider/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g all: diff --git a/examples/create_pod/Makefile b/examples/create_pod/Makefile index efc076c..a9efb20 100644 --- a/examples/create_pod/Makefile +++ b/examples/create_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g BIN:=create_pod_bin diff --git a/examples/delete_pod/Makefile b/examples/delete_pod/Makefile index 600eae7..cf35e8b 100644 --- a/examples/delete_pod/Makefile +++ b/examples/delete_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g BIN:=delete_pod_bin diff --git a/examples/exec_pod/.gitignore b/examples/exec_pod/.gitignore new file mode 100644 index 0000000..4be601f --- /dev/null +++ b/examples/exec_pod/.gitignore @@ -0,0 +1 @@ +exec_pod_bin diff --git a/examples/exec_pod/Makefile b/examples/exec_pod/Makefile new file mode 100644 index 0000000..e8f8f2c --- /dev/null +++ b/examples/exec_pod/Makefile @@ -0,0 +1,11 @@ +INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/websocket +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib +CFLAGS:=-g +BIN:=exec_pod_bin + +all: + gcc main.c $(CFLAGS) $(INCLUDE) $(LIBS) -o $(BIN) +clean: + rm ./$(BIN) +test: + ./$(BIN) only-run-case-1 diff --git a/examples/exec_pod/main.c b/examples/exec_pod/main.c new file mode 100644 index 0000000..0696b33 --- /dev/null +++ b/examples/exec_pod/main.c @@ -0,0 +1,85 @@ +#include +#include +#include +#include + +/* + * An example of call back function: + */ +void my_exec_data_callback(void **p_data_received, long *p_data_received_len) +{ + printf("%s: Received %ld bytes:\n%s", __func__, *p_data_received_len, (char *) (*p_data_received)); +} + +int main(int argc, char *argv[]) +{ + char *base_path = NULL; + sslConfig_t *ssl_config = NULL; + list_t *api_keys = NULL; + int rc = load_kube_config(&base_path, &ssl_config, &api_keys, NULL); /* NULL means loading configuration from $HOME/.kube/config */ + if (rc != 0) { + printf("Cannot load kubernetes configuration.\n"); + return -1; + } + + /* The log level mask for libwebsokets */ + int wsc_log_mask = LLL_ERR | LLL_WARN; + /* + * If you need a detail log:*/ + //int wsc_log_mask = LLL_ERR | LLL_WARN | LLL_USER | LLL_NOTICE; + + wsclient_t *wsc = wsclient_create(base_path, ssl_config, api_keys, wsc_log_mask); + if (!wsc) { + fprintf(stderr, "Cannot create a websocket client.\n"); + return -1; + } + + /* + * Case #1 + * Normal mode (tty = 0) + */ + kube_exec(wsc, /* websocket client */ + "default", /* namespace */ + "test-pod-8", /* pod name */ + "my-container", /* container name, NULL means the default container in the pod */ + 1, /* stdin */ + 1, /* stdout */ + 0, /* tty */ + "ls /" /* command */ + ); + + printf("Received %ld bytes:\n%s\n", wsc->data_received_len, (char *) (wsc->data_received)); + + if (argc > 1) { // skip the case #2 in the automation test + goto end; + } + + /* + * Case #2 + * Interactive and tty mode (tty = 1) + */ + /* Use the default callback function provided by libkubernetes */ + wsc->data_callback_func = NULL; + /* If you want to use your call back function: + * wsc->data_callback_func = my_callback_function; + */ + kube_exec(wsc, /* websocket client */ + "default", /* namespace */ + "test-pod-8", /* pod name */ + NULL, /* container name, NULL means the default container in the pod */ + 1, /* stdin */ + 1, /* stdout */ + 1, /* tty */ + "bash" /* command */ + ); + + end: + /* Clean up */ + wsclient_free(wsc); + free_client_config(base_path, ssl_config, api_keys); + base_path = NULL; + ssl_config = NULL; + api_keys = NULL; + + return 0; +} diff --git a/examples/exec_provider/Makefile b/examples/exec_provider/Makefile index 130bc29..d75d129 100644 --- a/examples/exec_provider/Makefile +++ b/examples/exec_provider/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g all: my_exec_provider_bin list_pod_by_exec_provider_bin diff --git a/examples/generic/Makefile b/examples/generic/Makefile index c1be7ec..f1296c9 100644 --- a/examples/generic/Makefile +++ b/examples/generic/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g BIN:=generic_bin diff --git a/examples/list_pod/Makefile b/examples/list_pod/Makefile index 69b124d..e167bee 100644 --- a/examples/list_pod/Makefile +++ b/examples/list_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g BIN:=list_pod_bin diff --git a/examples/list_pod_incluster/Makefile b/examples/list_pod_incluster/Makefile index 981d292..b48ab09 100644 --- a/examples/list_pod_incluster/Makefile +++ b/examples/list_pod_incluster/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g all: diff --git a/examples/multi_thread/Makefile b/examples/multi_thread/Makefile index 8d48bf9..d2bbd99 100644 --- a/examples/multi_thread/Makefile +++ b/examples/multi_thread/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lpthread +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lpthread -lwebsockets -L/usr/local/lib CFLAGS:=-g BIN:= multi_thread_bin diff --git a/examples/watch_list_pod/Makefile b/examples/watch_list_pod/Makefile index ba4bde0..825b113 100644 --- a/examples/watch_list_pod/Makefile +++ b/examples/watch_list_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch -LIBS:=-L../../kubernetes/build -lkubernetes -lyaml +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lwebsockets -L/usr/local/lib CFLAGS:=-g all: diff --git a/kubernetes/CMakeLists.txt b/kubernetes/CMakeLists.txt index bab1aa1..c433986 100644 --- a/kubernetes/CMakeLists.txt +++ b/kubernetes/CMakeLists.txt @@ -41,6 +41,8 @@ set(SRCS config/authn_plugin/authn_plugin_util.c config/authn_plugin/authn_plugin.c watch/watch_util.c + websocket/wsclient.c + websocket/kube_exec.c src/list.c src/apiKey.c src/apiClient.c @@ -804,6 +806,8 @@ set(HDRS config/authn_plugin/authn_plugin_util.h config/authn_plugin/authn_plugin.h watch/watch_util.h + websocket/wsclient.h + websocket/kube_exec.h include/apiClient.h include/list.h include/binary.h diff --git a/kubernetes/websocket/kube_exec.c b/kubernetes/websocket/kube_exec.c new file mode 100644 index 0000000..970c086 --- /dev/null +++ b/kubernetes/websocket/kube_exec.c @@ -0,0 +1,115 @@ +#include +#include "kube_exec.h" + +#define WS_PATH_BUFFER_SIZE 1024 +#define URL_COMMAND_STRING_BUFFER_SIZE 1024 +#define COMMAND_STRING_DELIM " " +#define COMMAND_PREFIX "&command=" +#define CONTAINER_PREFIX "&container=" +#define ESCAPED_STRING_BUFFER_SIZE 8 +#define ESCAPED_STRING_LENGTH 4 +#define TRUE_STRING "true" +#define FALSE_STRING "false" + +static void default_exec_data_callback(void **p_data_received, long *p_data_received_len) +{ + printf("%s: Received %ld bytes:\n%s", __func__, *p_data_received_len, (char *) (*p_data_received)); +} + +static void escape_character_in_url(char *escaped, int escaped_buffer_size, const char chr) +{ + if (!escaped || escaped_buffer_size < ESCAPED_STRING_LENGTH + 1) { + return; + } + + if ('+' == chr) { + strncpy(escaped, "%2B", ESCAPED_STRING_LENGTH); + } else if ('"' == chr) { + strncpy(escaped, "%22", ESCAPED_STRING_LENGTH); + } else if ('%' == chr) { + strncpy(escaped, "%25", ESCAPED_STRING_LENGTH); + } else { + snprintf(escaped, ESCAPED_STRING_LENGTH, "%c", chr); + } +} + +static int assemble_command_in_url(char *command_string_in_url, int url_command_string_buffer_size, const char *original_command_string) +{ + int command_string_length = 0; + const char *p = original_command_string; + char escaped_string[ESCAPED_STRING_BUFFER_SIZE]; + while (*p) { + if (' ' == *p) { + command_string_length += strlen(COMMAND_PREFIX); + if (command_string_length >= url_command_string_buffer_size) { + fprintf(stderr, "%s: The length of command string exceeds the buffer size.", __func__); + return -1; + } + strncat(command_string_in_url, COMMAND_PREFIX, strlen(COMMAND_PREFIX)); + } else { + memset(escaped_string, 0, sizeof(escaped_string)); + escape_character_in_url(escaped_string, ESCAPED_STRING_BUFFER_SIZE, *p); + command_string_length += strlen(escaped_string); + if (command_string_length >= url_command_string_buffer_size) { + fprintf(stderr, "%s: The length of command string exceeds the buffer size.", __func__); + return -1; + } + strncat(command_string_in_url, escaped_string, strlen(escaped_string)); + } + p++; + } + return 0; +} + +int kube_exec(wsclient_t * wsc, const char *namespace_, const char *pod_name, const char *container_name, int stdin, int stdout, int tty, const char *command) +{ + int rc = 0; + + const char *ws_path_template = "/api/v1/namespaces/%s/pods/%s/exec?stdin=%s&stdout=%s&tty=%s&command=%s"; + + /* + * e.g. + * /api/v1/namespaces/default/pods/pod1/exec?stdin=true&stdout=true&tty=false&command=ls&command=/&container=container1 + * /api/v1/namespaces/default/pods/pod1/exec?stdin=true&stdout=true&tty=true&command=bash&container=container1 + */ + + char command_string_in_url[URL_COMMAND_STRING_BUFFER_SIZE]; + memset(command_string_in_url, 0, sizeof(command_string_in_url)); + rc = assemble_command_in_url(command_string_in_url, URL_COMMAND_STRING_BUFFER_SIZE, command); + if (rc != 0) { + return rc; + } + + char ws_path[WS_PATH_BUFFER_SIZE]; + memset(ws_path, 0, sizeof(ws_path)); + snprintf(ws_path, WS_PATH_BUFFER_SIZE, ws_path_template, + namespace_, pod_name, 1 == stdin ? TRUE_STRING : FALSE_STRING, 1 == stdout ? TRUE_STRING : FALSE_STRING, 1 == tty ? TRUE_STRING : FALSE_STRING, command_string_in_url); + + if (container_name && container_name[0] != '\0') { + int length_of_ws_path_with_container = strlen(ws_path) + strlen(CONTAINER_PREFIX) + strlen(container_name); + if (length_of_ws_path_with_container >= WS_PATH_BUFFER_SIZE) { + fprintf(stderr, "%s: Error: The length of the websocket path exceeds the buffer size.", __func__); + return -1; + } + strncat(ws_path, CONTAINER_PREFIX, strlen(CONTAINER_PREFIX)); + strncat(ws_path, container_name, strlen(container_name)); + } + + if (wsc->log_mask & LLL_USER) { + printf("%s: ws_path=%s\n", __func__, ws_path); + } + wsc->path = ws_path; + + wsc_mode_t mode = WSC_MODE_NORMAL; + if (tty) { + mode = WSC_MODE_IT; + if (NULL == wsc->data_callback_func) { + wsc->data_callback_func = default_exec_data_callback; + } + } + + rc = wsclient_run(wsc, mode); + + wsc->path = NULL; + return rc; +} diff --git a/kubernetes/websocket/kube_exec.h b/kubernetes/websocket/kube_exec.h new file mode 100644 index 0000000..0319ad9 --- /dev/null +++ b/kubernetes/websocket/kube_exec.h @@ -0,0 +1,15 @@ +#ifndef _KUBE_EXEC_H +#define _KUBE_EXEC_H + +#include "wsclient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int kube_exec(wsclient_t * wsc, const char *namespace_, const char *pod_name, const char *container_name, int stdin, int stdout, int tty, const char *command); + +#ifdef __cplusplus +} +#endif +#endif /* _EXEC_H */ diff --git a/kubernetes/websocket/wsclient.c b/kubernetes/websocket/wsclient.c new file mode 100644 index 0000000..acaffbb --- /dev/null +++ b/kubernetes/websocket/wsclient.c @@ -0,0 +1,337 @@ +#include "wsclient.h" +#include +#include +#include +#include +#include +#include + +#define WSC_ATTACH_STDIN_BUFFER_SIZE 1024 +#define TTY_STDIN_NUMBER 0 +#define TTY_STDOUT_NUMBER 1 +#define WS_PROTOCOL_DELIM "://" +#define WS_BASE_PATH_DELIM_CHAR ':' /* ip:port */ +#define WS_BASE_PATH_DELIM_LENGTH 1 + +static struct lws_context *g_lws_context; +static bool g_interrupted; + +/* + * The retry and backoff policy we want to use for our client connections + */ + +static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; + +static const lws_retry_bo_t retry = { + .retry_ms_table = backoff_ms, + .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), + .conceal_count = LWS_ARRAY_SIZE(backoff_ms), + + .secs_since_valid_ping = 3, /* force PINGs after secs idle */ + .secs_since_valid_hangup = 10, /* hangup after secs idle */ + + .jitter_percent = 20, +}; + +static int get_server_address_and_port(char **p_ws_addr, int *p_ws_port, const char *base_path) +{ + if (!base_path) { + return -1; + } + + char *p = strstr(base_path, WS_PROTOCOL_DELIM); /* e.g. base_path="https://ip:port" */ + if (NULL == p) { + fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, base_path); + return -1; + } + const char *addr_and_port_string = p + strlen(WS_PROTOCOL_DELIM); /* addr_and_port_string="ip:port" */ + + p = strrchr(addr_and_port_string, WS_BASE_PATH_DELIM_CHAR); + if (NULL == p) { + fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, addr_and_port_string); + return -1; + } + const char *port_string = p + WS_BASE_PATH_DELIM_LENGTH; + + int ws_port = atoi(port_string); + if (0 == ws_port) { + fprintf(stderr, "%s: Cannot get a valid websocket port from <%s>.\n", __func__, port_string); + return -1; + } + + int port_string_length = strlen(addr_and_port_string) - WS_BASE_PATH_DELIM_LENGTH - strlen(port_string); + char *ws_addr = (char *) calloc(sizeof(char), port_string_length + 1); /* the memory of ws_addr will be deleted by wsclient_free() */ + if (!ws_addr) { + fprintf(stderr, "%s: Cannot allocate the memory for websocket server address.\n", __func__); + return -1; + } + strncpy(ws_addr, addr_and_port_string, port_string_length); + + *p_ws_port = ws_port; + *p_ws_addr = ws_addr; + + return 0; +} + +wsclient_t *wsclient_create(const char *base_path, sslConfig_t * ssl_config, list_t * apiKeys, int ws_log_mask) +{ + if (!base_path) { + fprintf(stderr, "%s: The base path is invalid.\n", __func__); + return NULL; + } + + wsclient_t *wsc = (wsclient_t *) calloc(1, sizeof(wsclient_t)); + if (!wsc) { + fprintf(stderr, "%s: Cannot allocate the memory for websocket client.\n", __func__); + return NULL; + } + + int rc = get_server_address_and_port(&wsc->server_address, &wsc->server_port, base_path); + if (0 != rc) { + fprintf(stderr, "%s: Cannot get the websocket server address or port from the base path <%s>.\n", __func__, base_path); + goto error; + } + + wsc->ssl_config = ssl_config; + wsc->log_mask = ws_log_mask; + + return wsc; + +error: + if (wsc) { + free(wsc); + wsc = NULL; + } + return NULL; +} + +void wsclient_free(wsclient_t * wsc) +{ + if (!wsc) { + return; + } + if (wsc->server_address) { + free(wsc->server_address); + wsc->server_address = NULL; + } + if (wsc->data_received) { + free(wsc->data_received); + wsc->data_received = NULL; + } + wsc->data_received_len = 0; + wsc->data_callback_func = NULL; + wsc->ssl_config = NULL; + + free(wsc); +} + +/* + * Scheduled sul callback that starts the connection attempt + */ + +static void connect_client(lws_sorted_usec_list_t * sul) +{ + wsclient_t *wsc = lws_container_of(sul, wsclient_t, sul); + struct lws_client_connect_info i; + + memset(&i, 0, sizeof(i)); + + i.context = g_lws_context; + i.port = wsc->server_port; + i.address = wsc->server_address; + i.path = wsc->path; + i.host = i.address; + i.origin = i.address; + i.ssl_connection = wsc->ssl_config ? LCCSCF_USE_SSL : 0; + + //i.protocol = pro; + //i.local_protocol_name = "websocket-client"; + i.pwsi = &wsc->wsi; + i.retry_and_idle_policy = &retry; + i.userdata = wsc; + + if (!lws_client_connect_via_info(&i)) + /* + * Failed... schedule a retry... we can't use the _retry_wsi() + * convenience wrapper api here because no valid wsi at this + * point. + */ + if (lws_retry_sul_schedule(g_lws_context, 0, sul, &retry, connect_client, &wsc->retry_count)) { + lwsl_err("%s: connection attempts exhausted\n", __func__); + g_interrupted = true; + } +} + +static int callback_wsclient(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + lwsl_user("%s: callback reason %d\n", __func__, reason); + + wsclient_t *wsc = (wsclient_t *) user; + + switch (reason) { + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + lwsl_err("%s: wsclient connection error: %s\n", __func__, in ? (char *) in : "(null)"); + goto do_retry; + break; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + lwsl_user("%s: wsclient connection established\n", __func__); + lws_callback_on_writable(wsi); + break; + + case LWS_CALLBACK_CLIENT_RECEIVE: + lwsl_hexdump_notice(in, len); + if (TTY_STDOUT_NUMBER != ((char *) in)[0]) { + lwsl_user("%s: The first char of data received is not STDOUT.\n", __func__); + return 0; + } + /* + * The first character of data received from + * the kubernets API server is "STDOUT", + * we ignore it + */ + int valid_data_len = len - 1; + if (0 == valid_data_len) { + lwsl_user("%s: The content of data received is empty.\n", __func__); + return 0; + } + char *valid_data = (char *)in + 1 ; + + if (wsc->data_received_len > 0 && wsc->data_received) { + free(wsc->data_received); + wsc->data_received = NULL; + wsc->data_received_len = 0; + } + wsc->data_received = (char *) calloc(sizeof(char), valid_data_len + 1); + if (!wsc->data_received) { + lwsl_err("%s: Cannot allocate memory for received data.\n", __func__); + return -1; + } + memcpy(wsc->data_received, valid_data, valid_data_len); + wsc->data_received_len = valid_data_len; + + if (wsc->data_received_len != 0 && wsc->data_received && wsc->data_callback_func) { + wsc->data_callback_func(&wsc->data_received, &wsc->data_received_len); + } + break; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + if (wsc->data_to_send_len > 0 && wsc->data_to_send) { + int m = lws_write(wsi, wsc->data_to_send + LWS_PRE, wsc->data_to_send_len, LWS_WRITE_BINARY); + if (m < wsc->data_to_send_len) { + lwsl_err("%s: Sending message failed: %d\n", __func__, m); + return -1; + } + lwsl_user("%s: Succeed to send message: %d\n", __func__, m); + if (wsc->data_to_send) { + free(wsc->data_to_send); + wsc->data_to_send = NULL; + wsc->data_to_send_len = 0; + } + } + break; + + case LWS_CALLBACK_CLIENT_CLOSED: + //goto do_retry; + g_interrupted = true; + break; + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, user, in, len); + + do_retry: + /* + * retry the connection to keep it nailed up + * + * For this example, we try to conceal any problem for one set of + * backoff retries and then exit the app. + * + * If you set retry.conceal_count to be larger than the number of + * elements in the backoff table, it will never give up and keep + * retrying at the last backoff delay plus the random jitter amount. + */ + if (lws_retry_sul_schedule_retry_wsi(wsi, &wsc->sul, connect_client, &wsc->retry_count)) { + lwsl_err("%s: wsclient connection attempts exhausted.\n", __func__); + g_interrupted = true; + } + + return 0; +} + +static const struct lws_protocols protocols[] = { + {"", callback_wsclient, 0, 0,}, + {NULL, NULL, 0, 0} +}; + +static void read_from_stdin(wsclient_t * wsc) +{ + char wsc_attach_stdin_buffer[WSC_ATTACH_STDIN_BUFFER_SIZE]; + memset(wsc_attach_stdin_buffer, 0, sizeof(wsc_attach_stdin_buffer)); + + int flag, newflag; + flag = fcntl(STDIN_FILENO, F_GETFL); + newflag = flag | O_NONBLOCK; + fcntl(STDIN_FILENO, F_SETFL, newflag); + fgets(wsc_attach_stdin_buffer, sizeof(wsc_attach_stdin_buffer) - 1, stdin); + + if (wsc_attach_stdin_buffer && strlen(wsc_attach_stdin_buffer) > 0) { + wsc->data_to_send_len = strlen(wsc_attach_stdin_buffer) + 1 /* TTY_STDIN_NUMBER */ ; + wsc->data_to_send = (char *) calloc(LWS_PRE + 1 /* TTY_STDIN_NUMBER */ + wsc->data_to_send_len + 1, 1); + wsc->data_to_send[LWS_PRE] = TTY_STDIN_NUMBER; + strncpy(wsc->data_to_send + LWS_PRE + 1, wsc_attach_stdin_buffer, strlen(wsc_attach_stdin_buffer)); + } + + fcntl(STDIN_FILENO, F_SETFL, flag); +} + +int wsclient_run(wsclient_t * wsc, wsc_mode_t mode) +{ + int n = 0; + lws_set_log_level(wsc->log_mask, NULL); + + struct lws_context_creation_info info; + memset(&info, 0, sizeof(struct lws_context_creation_info)); + + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ + info.protocols = protocols; + /* + * since we know this lws context is only ever going to be used with + * one client wsis / fds / sockets at a time, let lws know it doesn't + * have to use the default allocations for fd tables up to ulimit -n. + * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we + * will use. + */ + info.fd_limit_per_thread = 1 + 1 + 1; + + if (wsc->ssl_config) { + info.client_ssl_ca_filepath = wsc->ssl_config->CACertFile; + info.client_ssl_private_key_filepath = wsc->ssl_config->clientKeyFile; + info.client_ssl_cert_filepath = wsc->ssl_config->clientCertFile; + } + + g_lws_context = lws_create_context(&info); + if (!g_lws_context) { + lwsl_err("%s: wsclient init failed.\n", __func__); + return 1; + } + + /* schedule the first client connection attempt to happen immediately */ + lws_sul_schedule(g_lws_context, 0, &wsc->sul, connect_client, 1); + + while (n >= 0 && false == g_interrupted) { + if (WSC_MODE_IT == mode) { + read_from_stdin(wsc); + } + n = lws_service(g_lws_context, 0); + } + + lwsl_user("%s: wsclient completed.\n", __func__); + lws_context_destroy(g_lws_context); + g_interrupted = false; + + return 0; +} diff --git a/kubernetes/websocket/wsclient.h b/kubernetes/websocket/wsclient.h new file mode 100644 index 0000000..82a17a4 --- /dev/null +++ b/kubernetes/websocket/wsclient.h @@ -0,0 +1,41 @@ +#ifndef _WSCLIENT_H +#define _WSCLIENT_H + +#include +#include "../include/apiClient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*data_callback_func) (void **, long *); + +typedef enum wsc_mode_t { + WSC_MODE_NORMAL = 0, + WSC_MODE_IT +} wsc_mode_t; + +typedef struct wsclient_t { + char *server_address; + int server_port; + char *path; + char *data_to_send; + long data_to_send_len; + void *data_received; + long data_received_len; + data_callback_func data_callback_func; + int log_mask; + lws_sorted_usec_list_t sul; /* schedule connection retry */ + struct lws *wsi; /* related wsi if any */ + uint16_t retry_count; /* count of consequetive retries */ + sslConfig_t *ssl_config; +} wsclient_t; + +wsclient_t *wsclient_create(const char *, sslConfig_t *, list_t *, int); +void wsclient_free(wsclient_t *); +int wsclient_run(wsclient_t *, wsc_mode_t); + +#ifdef __cplusplus +} +#endif +#endif /* _WSCLIENT_H */