#include "wsclient.h" #include #include #include #include #include #include #define WSC_ATTACH_STDIN_BUFFER_SIZE 1024 #define TTY_STDIN_NUMBER 0 #define TTY_STDOUT_NUMBER 1 #define WS_PROTOCOL_DELIM "://" #define WS_BASE_PATH_DELIM_CHAR ':' /* ip:port */ #define WS_BASE_PATH_DELIM_LENGTH 1 static struct lws_context *g_lws_context; static bool g_interrupted; /* * The retry and backoff policy we want to use for our client connections */ static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; static const lws_retry_bo_t retry = { .retry_ms_table = backoff_ms, .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), .conceal_count = LWS_ARRAY_SIZE(backoff_ms), .secs_since_valid_ping = 3, /* force PINGs after secs idle */ .secs_since_valid_hangup = 10, /* hangup after secs idle */ .jitter_percent = 20, }; static int get_server_address_and_port(char **p_ws_addr, int *p_ws_port, const char *base_path) { if (!base_path) { return -1; } char *p = strstr(base_path, WS_PROTOCOL_DELIM); /* e.g. base_path="https://ip:port" */ if (NULL == p) { fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, base_path); return -1; } const char *addr_and_port_string = p + strlen(WS_PROTOCOL_DELIM); /* addr_and_port_string="ip:port" */ p = strrchr(addr_and_port_string, WS_BASE_PATH_DELIM_CHAR); if (NULL == p) { fprintf(stderr, "%s: %s is not a valid websocket base path.\n", __func__, addr_and_port_string); return -1; } const char *port_string = p + WS_BASE_PATH_DELIM_LENGTH; int ws_port = atoi(port_string); if (0 == ws_port) { fprintf(stderr, "%s: Cannot get a valid websocket port from <%s>.\n", __func__, port_string); return -1; } int port_string_length = strlen(addr_and_port_string) - WS_BASE_PATH_DELIM_LENGTH - strlen(port_string); char *ws_addr = (char *) calloc(sizeof(char), port_string_length + 1); /* the memory of ws_addr will be deleted by wsclient_free() */ if (!ws_addr) { fprintf(stderr, "%s: Cannot allocate the memory for websocket server address.\n", __func__); return -1; } strncpy(ws_addr, addr_and_port_string, port_string_length); *p_ws_port = ws_port; *p_ws_addr = ws_addr; return 0; } wsclient_t *wsclient_create(const char *base_path, sslConfig_t * ssl_config, list_t * apiKeys, int ws_log_mask) { if (!base_path) { fprintf(stderr, "%s: The base path is invalid.\n", __func__); return NULL; } wsclient_t *wsc = (wsclient_t *) calloc(1, sizeof(wsclient_t)); if (!wsc) { fprintf(stderr, "%s: Cannot allocate the memory for websocket client.\n", __func__); return NULL; } int rc = get_server_address_and_port(&wsc->server_address, &wsc->server_port, base_path); if (0 != rc) { fprintf(stderr, "%s: Cannot get the websocket server address or port from the base path <%s>.\n", __func__, base_path); goto error; } wsc->ssl_config = ssl_config; wsc->log_mask = ws_log_mask; return wsc; error: if (wsc) { free(wsc); wsc = NULL; } return NULL; } void wsclient_free(wsclient_t * wsc) { if (!wsc) { return; } if (wsc->server_address) { free(wsc->server_address); wsc->server_address = NULL; } if (wsc->data_received) { free(wsc->data_received); wsc->data_received = NULL; } wsc->data_received_len = 0; wsc->data_callback_func = NULL; wsc->ssl_config = NULL; free(wsc); } /* * Scheduled sul callback that starts the connection attempt */ static void connect_client(lws_sorted_usec_list_t * sul) { wsclient_t *wsc = lws_container_of(sul, wsclient_t, sul); struct lws_client_connect_info i; memset(&i, 0, sizeof(i)); i.context = g_lws_context; i.port = wsc->server_port; i.address = wsc->server_address; i.path = wsc->path; i.host = i.address; i.origin = i.address; i.ssl_connection = wsc->ssl_config ? LCCSCF_USE_SSL : 0; //i.protocol = pro; //i.local_protocol_name = "websocket-client"; i.pwsi = &wsc->wsi; i.retry_and_idle_policy = &retry; i.userdata = wsc; if (!lws_client_connect_via_info(&i)) /* * Failed... schedule a retry... we can't use the _retry_wsi() * convenience wrapper api here because no valid wsi at this * point. */ if (lws_retry_sul_schedule(g_lws_context, 0, sul, &retry, connect_client, &wsc->retry_count)) { lwsl_err("%s: connection attempts exhausted\n", __func__); g_interrupted = true; } } static int callback_wsclient(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { lwsl_user("%s: callback reason %d\n", __func__, reason); wsclient_t *wsc = (wsclient_t *) user; switch (reason) { case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: lwsl_err("%s: wsclient connection error: %s\n", __func__, in ? (char *) in : "(null)"); goto do_retry; break; case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: wsclient connection established\n", __func__); lws_callback_on_writable(wsi); break; case LWS_CALLBACK_CLIENT_RECEIVE: lwsl_hexdump_notice(in, len); if (TTY_STDOUT_NUMBER != ((char *) in)[0]) { lwsl_user("%s: The first char of data received is not STDOUT.\n", __func__); return 0; } /* * The first character of data received from * the kubernets API server is "STDOUT", * we ignore it */ int valid_data_len = len - 1; if (0 == valid_data_len) { lwsl_user("%s: The content of data received is empty.\n", __func__); return 0; } char *valid_data = (char *)in + 1 ; if (wsc->data_received_len > 0 && wsc->data_received) { free(wsc->data_received); wsc->data_received = NULL; wsc->data_received_len = 0; } wsc->data_received = (char *) calloc(sizeof(char), valid_data_len + 1); if (!wsc->data_received) { lwsl_err("%s: Cannot allocate memory for received data.\n", __func__); return -1; } memcpy(wsc->data_received, valid_data, valid_data_len); wsc->data_received_len = valid_data_len; if (wsc->data_received_len != 0 && wsc->data_received && wsc->data_callback_func) { wsc->data_callback_func(&wsc->data_received, &wsc->data_received_len); } break; case LWS_CALLBACK_CLIENT_WRITEABLE: if (wsc->data_to_send_len > 0 && wsc->data_to_send) { int m = lws_write(wsi, wsc->data_to_send + LWS_PRE, wsc->data_to_send_len, LWS_WRITE_BINARY); if (m < wsc->data_to_send_len) { lwsl_err("%s: Sending message failed: %d\n", __func__, m); return -1; } lwsl_user("%s: Succeed to send message: %d\n", __func__, m); if (wsc->data_to_send) { free(wsc->data_to_send); wsc->data_to_send = NULL; wsc->data_to_send_len = 0; } } break; case LWS_CALLBACK_CLIENT_CLOSED: //goto do_retry; g_interrupted = true; break; default: break; } return lws_callback_http_dummy(wsi, reason, user, in, len); do_retry: /* * retry the connection to keep it nailed up * * For this example, we try to conceal any problem for one set of * backoff retries and then exit the app. * * If you set retry.conceal_count to be larger than the number of * elements in the backoff table, it will never give up and keep * retrying at the last backoff delay plus the random jitter amount. */ if (lws_retry_sul_schedule_retry_wsi(wsi, &wsc->sul, connect_client, &wsc->retry_count)) { lwsl_err("%s: wsclient connection attempts exhausted.\n", __func__); g_interrupted = true; } return 0; } static const struct lws_protocols protocols[] = { {"", callback_wsclient, 0, 0,}, {NULL, NULL, 0, 0} }; static void read_from_stdin(wsclient_t * wsc) { char wsc_attach_stdin_buffer[WSC_ATTACH_STDIN_BUFFER_SIZE]; memset(wsc_attach_stdin_buffer, 0, sizeof(wsc_attach_stdin_buffer)); int flag, newflag; flag = fcntl(STDIN_FILENO, F_GETFL); newflag = flag | O_NONBLOCK; fcntl(STDIN_FILENO, F_SETFL, newflag); fgets(wsc_attach_stdin_buffer, sizeof(wsc_attach_stdin_buffer) - 1, stdin); if (wsc_attach_stdin_buffer && strlen(wsc_attach_stdin_buffer) > 0) { wsc->data_to_send_len = strlen(wsc_attach_stdin_buffer) + 1 /* TTY_STDIN_NUMBER */ ; wsc->data_to_send = (char *) calloc(LWS_PRE + 1 /* TTY_STDIN_NUMBER */ + wsc->data_to_send_len + 1, 1); wsc->data_to_send[LWS_PRE] = TTY_STDIN_NUMBER; strncpy(wsc->data_to_send + LWS_PRE + 1, wsc_attach_stdin_buffer, strlen(wsc_attach_stdin_buffer)); } fcntl(STDIN_FILENO, F_SETFL, flag); } int wsclient_run(wsclient_t * wsc, wsc_mode_t mode) { int n = 0; lws_set_log_level(wsc->log_mask, NULL); struct lws_context_creation_info info; memset(&info, 0, sizeof(struct lws_context_creation_info)); info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ info.protocols = protocols; /* * since we know this lws context is only ever going to be used with * one client wsis / fds / sockets at a time, let lws know it doesn't * have to use the default allocations for fd tables up to ulimit -n. * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we * will use. */ info.fd_limit_per_thread = 1 + 1 + 1; if (wsc->ssl_config) { info.client_ssl_ca_filepath = wsc->ssl_config->CACertFile; info.client_ssl_private_key_filepath = wsc->ssl_config->clientKeyFile; info.client_ssl_cert_filepath = wsc->ssl_config->clientCertFile; } g_lws_context = lws_create_context(&info); if (!g_lws_context) { lwsl_err("%s: wsclient init failed.\n", __func__); return 1; } /* schedule the first client connection attempt to happen immediately */ lws_sul_schedule(g_lws_context, 0, &wsc->sul, connect_client, 1); while (n >= 0 && false == g_interrupted) { if (WSC_MODE_IT == mode) { read_from_stdin(wsc); } n = lws_service(g_lws_context, 0); } lwsl_user("%s: wsclient completed.\n", __func__); lws_context_destroy(g_lws_context); g_interrupted = false; return 0; }