Files
c/kubernetes/websocket/wsclient.c

338 lines
11 KiB
C
Raw Normal View History

2021-06-06 13:06:50 +08:00
#include "wsclient.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#define WSC_ATTACH_STDIN_BUFFER_SIZE 1024
#define TTY_STDIN_NUMBER 0
#define TTY_STDOUT_NUMBER 1
#define WS_PROTOCOL_DELIM "://"
#define WS_BASE_PATH_DELIM_CHAR ':' /* ip:port */
#define WS_BASE_PATH_DELIM_LENGTH 1
static struct lws_context *g_lws_context;
static bool g_interrupted;
/*
* The retry and backoff policy we want to use for our client connections
*/
static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
static const lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.secs_since_valid_ping = 3, /* force PINGs after secs idle */
.secs_since_valid_hangup = 10, /* hangup after secs idle */
.jitter_percent = 20,
};
static int get_server_address_and_port(char **p_ws_addr, int *p_ws_port, const char *base_path)
{
if (!base_path) {
return -1;
}
char *p = strstr(base_path, WS_PROTOCOL_DELIM); /* e.g. base_path="https://ip:port" */
if (NULL == p) {
fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, base_path);
return -1;
}
const char *addr_and_port_string = p + strlen(WS_PROTOCOL_DELIM); /* addr_and_port_string="ip:port" */
p = strrchr(addr_and_port_string, WS_BASE_PATH_DELIM_CHAR);
if (NULL == p) {
fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, addr_and_port_string);
return -1;
}
const char *port_string = p + WS_BASE_PATH_DELIM_LENGTH;
int ws_port = atoi(port_string);
if (0 == ws_port) {
fprintf(stderr, "%s: Cannot get a valid websocket port from <%s>.\n", __func__, port_string);
return -1;
}
int port_string_length = strlen(addr_and_port_string) - WS_BASE_PATH_DELIM_LENGTH - strlen(port_string);
char *ws_addr = (char *) calloc(sizeof(char), port_string_length + 1); /* the memory of ws_addr will be deleted by wsclient_free() */
if (!ws_addr) {
fprintf(stderr, "%s: Cannot allocate the memory for websocket server address.\n", __func__);
return -1;
}
strncpy(ws_addr, addr_and_port_string, port_string_length);
*p_ws_port = ws_port;
*p_ws_addr = ws_addr;
return 0;
}
wsclient_t *wsclient_create(const char *base_path, sslConfig_t * ssl_config, list_t * apiKeys, int ws_log_mask)
{
if (!base_path) {
fprintf(stderr, "%s: The base path is invalid.\n", __func__);
return NULL;
}
wsclient_t *wsc = (wsclient_t *) calloc(1, sizeof(wsclient_t));
if (!wsc) {
fprintf(stderr, "%s: Cannot allocate the memory for websocket client.\n", __func__);
return NULL;
}
int rc = get_server_address_and_port(&wsc->server_address, &wsc->server_port, base_path);
if (0 != rc) {
fprintf(stderr, "%s: Cannot get the websocket server address or port from the base path <%s>.\n", __func__, base_path);
goto error;
}
wsc->ssl_config = ssl_config;
wsc->log_mask = ws_log_mask;
return wsc;
error:
if (wsc) {
free(wsc);
wsc = NULL;
}
return NULL;
}
void wsclient_free(wsclient_t * wsc)
{
if (!wsc) {
return;
}
if (wsc->server_address) {
free(wsc->server_address);
wsc->server_address = NULL;
}
if (wsc->data_received) {
free(wsc->data_received);
wsc->data_received = NULL;
}
wsc->data_received_len = 0;
wsc->data_callback_func = NULL;
wsc->ssl_config = NULL;
free(wsc);
}
/*
* Scheduled sul callback that starts the connection attempt
*/
static void connect_client(lws_sorted_usec_list_t * sul)
{
wsclient_t *wsc = lws_container_of(sul, wsclient_t, sul);
struct lws_client_connect_info i;
memset(&i, 0, sizeof(i));
i.context = g_lws_context;
i.port = wsc->server_port;
i.address = wsc->server_address;
i.path = wsc->path;
i.host = i.address;
i.origin = i.address;
i.ssl_connection = wsc->ssl_config ? LCCSCF_USE_SSL : 0;
//i.protocol = pro;
//i.local_protocol_name = "websocket-client";
i.pwsi = &wsc->wsi;
i.retry_and_idle_policy = &retry;
i.userdata = wsc;
if (!lws_client_connect_via_info(&i))
/*
* Failed... schedule a retry... we can't use the _retry_wsi()
* convenience wrapper api here because no valid wsi at this
* point.
*/
if (lws_retry_sul_schedule(g_lws_context, 0, sul, &retry, connect_client, &wsc->retry_count)) {
lwsl_err("%s: connection attempts exhausted\n", __func__);
g_interrupted = true;
}
}
static int callback_wsclient(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
lwsl_user("%s: callback reason %d\n", __func__, reason);
wsclient_t *wsc = (wsclient_t *) user;
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("%s: wsclient connection error: %s\n", __func__, in ? (char *) in : "(null)");
goto do_retry;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
lwsl_user("%s: wsclient connection established\n", __func__);
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
lwsl_hexdump_notice(in, len);
if (TTY_STDOUT_NUMBER != ((char *) in)[0]) {
lwsl_user("%s: The first char of data received is not STDOUT.\n", __func__);
return 0;
}
/*
* The first character of data received from
* the kubernets API server is "STDOUT",
* we ignore it
*/
int valid_data_len = len - 1;
if (0 == valid_data_len) {
lwsl_user("%s: The content of data received is empty.\n", __func__);
return 0;
}
char *valid_data = (char *)in + 1 ;
if (wsc->data_received_len > 0 && wsc->data_received) {
free(wsc->data_received);
wsc->data_received = NULL;
wsc->data_received_len = 0;
}
wsc->data_received = (char *) calloc(sizeof(char), valid_data_len + 1);
if (!wsc->data_received) {
lwsl_err("%s: Cannot allocate memory for received data.\n", __func__);
return -1;
}
memcpy(wsc->data_received, valid_data, valid_data_len);
wsc->data_received_len = valid_data_len;
if (wsc->data_received_len != 0 && wsc->data_received && wsc->data_callback_func) {
wsc->data_callback_func(&wsc->data_received, &wsc->data_received_len);
}
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
if (wsc->data_to_send_len > 0 && wsc->data_to_send) {
int m = lws_write(wsi, wsc->data_to_send + LWS_PRE, wsc->data_to_send_len, LWS_WRITE_BINARY);
if (m < wsc->data_to_send_len) {
lwsl_err("%s: Sending message failed: %d\n", __func__, m);
return -1;
}
lwsl_user("%s: Succeed to send message: %d\n", __func__, m);
if (wsc->data_to_send) {
free(wsc->data_to_send);
wsc->data_to_send = NULL;
wsc->data_to_send_len = 0;
}
}
break;
case LWS_CALLBACK_CLIENT_CLOSED:
//goto do_retry;
g_interrupted = true;
break;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
do_retry:
/*
* retry the connection to keep it nailed up
*
* For this example, we try to conceal any problem for one set of
* backoff retries and then exit the app.
*
* If you set retry.conceal_count to be larger than the number of
* elements in the backoff table, it will never give up and keep
* retrying at the last backoff delay plus the random jitter amount.
*/
if (lws_retry_sul_schedule_retry_wsi(wsi, &wsc->sul, connect_client, &wsc->retry_count)) {
lwsl_err("%s: wsclient connection attempts exhausted.\n", __func__);
g_interrupted = true;
}
return 0;
}
static const struct lws_protocols protocols[] = {
{"", callback_wsclient, 0, 0,},
{NULL, NULL, 0, 0}
};
static void read_from_stdin(wsclient_t * wsc)
{
char wsc_attach_stdin_buffer[WSC_ATTACH_STDIN_BUFFER_SIZE];
memset(wsc_attach_stdin_buffer, 0, sizeof(wsc_attach_stdin_buffer));
int flag, newflag;
flag = fcntl(STDIN_FILENO, F_GETFL);
newflag = flag | O_NONBLOCK;
fcntl(STDIN_FILENO, F_SETFL, newflag);
fgets(wsc_attach_stdin_buffer, sizeof(wsc_attach_stdin_buffer) - 1, stdin);
if (wsc_attach_stdin_buffer && strlen(wsc_attach_stdin_buffer) > 0) {
wsc->data_to_send_len = strlen(wsc_attach_stdin_buffer) + 1 /* TTY_STDIN_NUMBER */ ;
wsc->data_to_send = (char *) calloc(LWS_PRE + 1 /* TTY_STDIN_NUMBER */ + wsc->data_to_send_len + 1, 1);
wsc->data_to_send[LWS_PRE] = TTY_STDIN_NUMBER;
strncpy(wsc->data_to_send + LWS_PRE + 1, wsc_attach_stdin_buffer, strlen(wsc_attach_stdin_buffer));
}
fcntl(STDIN_FILENO, F_SETFL, flag);
}
int wsclient_run(wsclient_t * wsc, wsc_mode_t mode)
{
int n = 0;
lws_set_log_level(wsc->log_mask, NULL);
struct lws_context_creation_info info;
memset(&info, 0, sizeof(struct lws_context_creation_info));
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
/*
* since we know this lws context is only ever going to be used with
* one client wsis / fds / sockets at a time, let lws know it doesn't
* have to use the default allocations for fd tables up to ulimit -n.
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
* will use.
*/
info.fd_limit_per_thread = 1 + 1 + 1;
if (wsc->ssl_config) {
info.client_ssl_ca_filepath = wsc->ssl_config->CACertFile;
info.client_ssl_private_key_filepath = wsc->ssl_config->clientKeyFile;
info.client_ssl_cert_filepath = wsc->ssl_config->clientCertFile;
}
g_lws_context = lws_create_context(&info);
if (!g_lws_context) {
lwsl_err("%s: wsclient init failed.\n", __func__);
return 1;
}
/* schedule the first client connection attempt to happen immediately */
lws_sul_schedule(g_lws_context, 0, &wsc->sul, connect_client, 1);
while (n >= 0 && false == g_interrupted) {
if (WSC_MODE_IT == mode) {
read_from_stdin(wsc);
}
n = lws_service(g_lws_context, 0);
}
lwsl_user("%s: wsclient completed.\n", __func__);
lws_context_destroy(g_lws_context);
g_interrupted = false;
return 0;
}