From 91b5f8aff8c5f646cb6d46d32f71a6ece6e749bf Mon Sep 17 00:00:00 2001 From: Hui Yu Date: Mon, 2 Nov 2020 21:38:52 +0800 Subject: [PATCH] [Example] Multi-threaded programming example --- README.md | 9 +++ examples/Makefile | 2 + examples/auth_provider/Makefile | 2 +- examples/create_pod/Makefile | 2 +- examples/delete_pod/Makefile | 2 +- examples/exec_provider/Makefile | 2 +- examples/generic/Makefile | 2 +- examples/list_pod/Makefile | 2 +- examples/list_pod_incluster/Makefile | 2 +- examples/multi_thread/.gitignore | 1 + examples/multi_thread/Makefile | 16 ++++ examples/multi_thread/create_pod.c | 83 ++++++++++++++++++++ examples/multi_thread/main.c | 55 ++++++++++++++ examples/multi_thread/mt.h | 33 ++++++++ examples/multi_thread/watch_pod.c | 109 +++++++++++++++++++++++++++ examples/watch_list_pod/Makefile | 2 +- kubernetes/include/apiClient.h | 3 + kubernetes/src/apiClient.c | 14 ++++ kubernetes/watch/watch_util.c | 1 + 19 files changed, 334 insertions(+), 8 deletions(-) create mode 100644 examples/multi_thread/.gitignore create mode 100644 examples/multi_thread/Makefile create mode 100644 examples/multi_thread/create_pod.c create mode 100644 examples/multi_thread/main.c create mode 100644 examples/multi_thread/mt.h create mode 100644 examples/multi_thread/watch_pod.c diff --git a/README.md b/README.md index 17ec1ab..418462d 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,15 @@ list all pods in cluster: apiClient_unsetupGlobalEnv(); ``` +## Multi-threaded Usage + +If the C client library is used in multi-threaded program, the following 2 actions must be taken: + +1. After the program starts up, main thread must call the function ```apiClient_setupGlobalEnv()``` before any worker thread is created. + +2. If the C client is no longer used, main thread must call the function ```apiClient_unsetupGlobalEnv()``` after all worker threads end. + +Refer to the [example](https://github.com/kubernetes-client/c/tree/master/examples/multi_thread/) for detail. ## Community, discussion, contribution, and support diff --git a/examples/Makefile b/examples/Makefile index 092a328..c8ecf6f 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -7,6 +7,7 @@ all: cd generic; make cd auth_provider; make cd watch_list_pod; make + cd multi_thread; make clean: cd create_pod; make clean @@ -17,3 +18,4 @@ clean: cd generic; make clean cd auth_provider; make clean cd watch_list_pod; make clean + cd multi_thread; make clean diff --git a/examples/auth_provider/Makefile b/examples/auth_provider/Makefile index ef1f632..931b30d 100644 --- a/examples/auth_provider/Makefile +++ b/examples/auth_provider/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/examples/create_pod/Makefile b/examples/create_pod/Makefile index 8137440..524119f 100644 --- a/examples/create_pod/Makefile +++ b/examples/create_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/examples/delete_pod/Makefile b/examples/delete_pod/Makefile index 6a290e9..37320b1 100644 --- a/examples/delete_pod/Makefile +++ b/examples/delete_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g BIN:=delete_pod_bin diff --git a/examples/exec_provider/Makefile b/examples/exec_provider/Makefile index 5851042..130bc29 100644 --- a/examples/exec_provider/Makefile +++ b/examples/exec_provider/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: my_exec_provider_bin list_pod_by_exec_provider_bin diff --git a/examples/generic/Makefile b/examples/generic/Makefile index 873e2f6..31b5456 100644 --- a/examples/generic/Makefile +++ b/examples/generic/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/examples/list_pod/Makefile b/examples/list_pod/Makefile index eee7d05..8825018 100644 --- a/examples/list_pod/Makefile +++ b/examples/list_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/examples/list_pod_incluster/Makefile b/examples/list_pod_incluster/Makefile index 29c6121..981d292 100644 --- a/examples/list_pod_incluster/Makefile +++ b/examples/list_pod_incluster/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/examples/multi_thread/.gitignore b/examples/multi_thread/.gitignore new file mode 100644 index 0000000..c31daf6 --- /dev/null +++ b/examples/multi_thread/.gitignore @@ -0,0 +1 @@ +multi_thread_bin diff --git a/examples/multi_thread/Makefile b/examples/multi_thread/Makefile new file mode 100644 index 0000000..3ef160d --- /dev/null +++ b/examples/multi_thread/Makefile @@ -0,0 +1,16 @@ +INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lpthread +CFLAGS:=-g + +BIN:= multi_thread_bin +OBJECTS:=main.o watch_pod.o create_pod.o + +all: $(OBJECTS) + $(CC) -o $(BIN) $(OBJECTS) $(LIBS) + +$(OBJECTS): %.o: %.c + $(CC) $(CFLAGS) $(INCLUDE) -c $< -o $@ + +.PHONY : clean +clean: + rm $(BIN) $(OBJECTS) \ No newline at end of file diff --git a/examples/multi_thread/create_pod.c b/examples/multi_thread/create_pod.c new file mode 100644 index 0000000..00f7b78 --- /dev/null +++ b/examples/multi_thread/create_pod.c @@ -0,0 +1,83 @@ +#include "mt.h" + +static void create_a_pod(apiClient_t * apiClient) +{ + char *namespace = "default"; + + v1_pod_t *podinfo = calloc(1, sizeof(v1_pod_t)); + podinfo->api_version = strdup("v1"); + podinfo->kind = strdup("Pod"); + podinfo->spec = calloc(1, sizeof(v1_pod_spec_t)); + + podinfo->metadata = calloc(1, sizeof(v1_object_meta_t)); + /* set pod name */ + podinfo->metadata->name = strdup("test-pod-8"); + + /* set containers for pod */ + list_t *containerlist = list_create(); + v1_container_t *con = calloc(1, sizeof(v1_container_t)); + con->name = strdup("my-container"); + con->image = strdup("ubuntu:latest"); + con->image_pull_policy = strdup("IfNotPresent"); + + /* set command for container */ + list_t *commandlist = list_create(); + char *cmd = strdup("sleep"); + list_addElement(commandlist, cmd); + con->command = commandlist; + + list_t *arglist = list_create(); + char *arg1 = strdup("3600"); + list_addElement(arglist, arg1); + con->args = arglist; + + /* set volume mounts for container */ + list_t *volumemounts = list_create(); + v1_volume_mount_t *volmou = calloc(1, sizeof(v1_volume_mount_t)); + volmou->mount_path = strdup("/test"); + volmou->name = strdup("test"); + list_addElement(volumemounts, volmou); + con->volume_mounts = volumemounts; + + list_addElement(containerlist, con); + podinfo->spec->containers = containerlist; + + /* set volumes for pod */ + list_t *volumelist = list_create(); + v1_volume_t *volume = calloc(1, sizeof(v1_volume_t)); + volume->name = strdup("test"); + + v1_host_path_volume_source_t *hostPath = calloc(1, sizeof(v1_host_path_volume_source_t)); + hostPath->path = strdup("/test"); + volume->host_path = hostPath; + + list_addElement(volumelist, volume); + podinfo->spec->volumes = volumelist; + + /* call API in libkubernetes to create pod */ + v1_pod_t *apod = CoreV1API_createNamespacedPod(apiClient, namespace, podinfo, NULL, NULL, NULL); + printf("code=%ld\n", apiClient->response_code); + + v1_pod_free(apod); + v1_pod_free(podinfo); +} + +void *create_pod_thread_func(void *arg) +{ + kube_params_t *params = (kube_params_t *) arg; + + sleep(10); // wait watch_pod_thread to start for 10 seconds + + apiClient_t *apiClient = apiClient_create_with_base_path(params->basePath, params->sslConfig, params->apiKeys); + if (!apiClient) { + fprintf(stderr, "Cannot create a kubernetes client.\n"); + return ((void *) 1); + } + + create_a_pod(apiClient); + + apiClient_free(apiClient); + apiClient = NULL; + + pthread_exit(NULL); +} diff --git a/examples/multi_thread/main.c b/examples/multi_thread/main.c new file mode 100644 index 0000000..fea0b3b --- /dev/null +++ b/examples/multi_thread/main.c @@ -0,0 +1,55 @@ +#include +#include "mt.h" + +bool g_exit_watch = false; +pthread_mutex_t exit_watch_mutex; + +int main(int argc, char *argv[]) +{ + char *basePath = NULL; + sslConfig_t *sslConfig = NULL; + list_t *apiKeys = NULL; + int rc = load_kube_config(&basePath, &sslConfig, &apiKeys, NULL); /* NULL means loading configuration from $HOME/.kube/config */ + if (rc != 0) { + printf("Cannot load kubernetes configuration.\n"); + return -1; + } + kube_params_t *params = calloc(1, sizeof(kube_params_t)); + params->basePath = basePath; + params->sslConfig = sslConfig; + params->apiKeys = apiKeys; + apiClient_setupGlobalEnv(); + + pthread_t watch_pod_thread; + int err = pthread_create(&watch_pod_thread, NULL, watch_pod_thread_func, (void *) params); + if (0 != err) { + perror("creating thread error"); + exit(-1); + } + + pthread_t create_pod_thread; + err = pthread_create(&create_pod_thread, NULL, create_pod_thread_func, (void *) params); + if (0 != err) { + perror("creating thread error"); + exit(-1); + } + + sleep(30); + + pthread_mutex_lock(&exit_watch_mutex); + g_exit_watch = true; // notify watch thread to exit + pthread_mutex_unlock(&exit_watch_mutex); + + pthread_join(watch_pod_thread, NULL); + pthread_join(create_pod_thread, NULL); + + free_client_config(basePath, sslConfig, apiKeys); + basePath = NULL; + sslConfig = NULL; + apiKeys = NULL; + free(params); + params = NULL; + apiClient_unsetupGlobalEnv(); + + return 0; +} diff --git a/examples/multi_thread/mt.h b/examples/multi_thread/mt.h new file mode 100644 index 0000000..b99ae64 --- /dev/null +++ b/examples/multi_thread/mt.h @@ -0,0 +1,33 @@ +#ifndef __MT_H__ +#define __MT_H__ + +#include "../include/apiClient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include +#include + +typedef struct kube_params_t { + char *basePath; + sslConfig_t *sslConfig; + list_t *apiKeys; +} kube_params_t; + +extern void *watch_pod_thread_func(void *); +extern void *create_pod_thread_func(void *); + +extern bool g_exit_watch; +extern pthread_mutex_t exit_watch_mutex; + +#ifdef __cplusplus +} +#endif +#endif // __MT_H__ diff --git a/examples/multi_thread/watch_pod.c b/examples/multi_thread/watch_pod.c new file mode 100644 index 0000000..c1af55d --- /dev/null +++ b/examples/multi_thread/watch_pod.c @@ -0,0 +1,109 @@ +#include "mt.h" +#include + +#define WATCH_EVENT_KEY_TYPE "type" +#define WATCH_EVENT_KEY_OBJECT "object" + +static void on_pod_event_comes(const char *event_string) +{ + static char fname[] = "process_one_watch_event()"; + + if (!event_string) { + return; + } + + cJSON *event_json_obj = cJSON_Parse(event_string); + if (!event_json_obj) { + fprintf(stderr, "%s: Cannot create JSON from string.[%s].\n", fname, cJSON_GetErrorPtr()); + goto end; + } + + cJSON *json_value_type = cJSON_GetObjectItem(event_json_obj, WATCH_EVENT_KEY_TYPE); + if (!json_value_type || json_value_type->type != cJSON_String) { + fprintf(stderr, "%s: Cannot get type in watch event.\n", fname); + goto end; + } + char *type = strdup(json_value_type->valuestring); + printf("type: %s\n", type); + + cJSON *json_value_object = cJSON_GetObjectItem(event_json_obj, WATCH_EVENT_KEY_OBJECT); + if (!json_value_object || json_value_object->type != cJSON_Object) { + fprintf(stderr, "%s: Cannot get object in watch event.\n", fname); + goto end; + } + v1_pod_t *pod = v1_pod_parseFromJSON(json_value_object); + if (!pod) { + fprintf(stderr, "%s: Cannot get pod from watch event object.\n", fname); + goto end; + } + printf("pod:\n\tname: %s\n", pod->metadata->name); + + end: + if (pod) { + v1_pod_free(pod); + pod = NULL; + } + if (type) { + free(type); + type = NULL; + } + if (event_json_obj) { + cJSON_Delete(event_json_obj); + event_json_obj = NULL; + } +} + +static void my_pod_watch_handler(void **pData, long *pDataLen) +{ + kubernets_watch_handler(pData, pDataLen, on_pod_event_comes); +} + +static int my_watch_progress_func(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) +{ + bool exit_watch = false; + + pthread_mutex_lock(&exit_watch_mutex); + exit_watch = g_exit_watch; + pthread_mutex_unlock(&exit_watch_mutex); + + if (true == exit_watch) { + /* Returning any other non-zero value from this callback will cause libcurl to + * abort the transfer and return CURLE_ABORTED_BY_CALLBACK. + */ + return 1; + } + + return 0; +} + +void *watch_pod_thread_func(void *arg) +{ + kube_params_t *params = (kube_params_t *) arg; + apiClient_t *apiClient = apiClient_create_with_base_path(params->basePath, params->sslConfig, params->apiKeys); + if (!apiClient) { + fprintf(stderr, "Cannot create a kubernetes client.\n"); + return ((void *) 1); + } + + apiClient->data_callback_func = my_pod_watch_handler; + apiClient->progress_func = my_watch_progress_func; + + CoreV1API_listNamespacedPod(apiClient, "default", /*namespace */ + NULL, /* pretty */ + 0, /* allowWatchBookmarks */ + NULL, /* continue */ + NULL, /* fieldSelector */ + NULL, /* labelSelector */ + 0, /* limit */ + NULL, /* resourceVersion */ + 0, /* timeoutSeconds + Setting the value to 0 means the watch never stops. + */ + 1 /* watch */ + ); + + apiClient_free(apiClient); + apiClient = NULL; + + pthread_exit(NULL); +} diff --git a/examples/watch_list_pod/Makefile b/examples/watch_list_pod/Makefile index df3b28f..ba4bde0 100644 --- a/examples/watch_list_pod/Makefile +++ b/examples/watch_list_pod/Makefile @@ -1,5 +1,5 @@ INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch -LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz +LIBS:=-L../../kubernetes/build -lkubernetes -lyaml CFLAGS:=-g all: diff --git a/kubernetes/include/apiClient.h b/kubernetes/include/apiClient.h index e937c5e..c7f6d6b 100644 --- a/kubernetes/include/apiClient.h +++ b/kubernetes/include/apiClient.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "../include/list.h" #include "../include/keyValuePair.h" #include "../include/binary.h" @@ -24,6 +25,8 @@ typedef struct apiClient_t { void *dataReceived; long dataReceivedLen; void (*data_callback_func)(void **, long *); + int (*progress_func)(void *, curl_off_t, curl_off_t, curl_off_t, curl_off_t); + void *progress_data; long response_code; list_t *apiKeys_BearerToken; } apiClient_t; diff --git a/kubernetes/src/apiClient.c b/kubernetes/src/apiClient.c index ca1b1f8..288d406 100644 --- a/kubernetes/src/apiClient.c +++ b/kubernetes/src/apiClient.c @@ -13,6 +13,8 @@ apiClient_t *apiClient_create() { apiClient->dataReceived = NULL; apiClient->dataReceivedLen = 0; apiClient->data_callback_func = NULL; + apiClient->progress_func = NULL; + apiClient->progress_data = NULL; apiClient->response_code = 0; apiClient->apiKeys_BearerToken = NULL; @@ -39,6 +41,8 @@ apiClient_t *apiClient_create_with_base_path(const char *basePath apiClient->dataReceived = NULL; apiClient->dataReceivedLen = 0; apiClient->data_callback_func = NULL; + apiClient->progress_func = NULL; + apiClient->progress_data = NULL; apiClient->response_code = 0; if(apiKeys_BearerToken!= NULL) { apiClient->apiKeys_BearerToken = list_create(); @@ -60,6 +64,8 @@ void apiClient_free(apiClient_t *apiClient) { free(apiClient->basePath); } apiClient->data_callback_func = NULL; + apiClient->progress_func = NULL; + apiClient->progress_data = NULL; if(apiClient->apiKeys_BearerToken) { listEntry_t *listEntry = NULL; list_ForEach(listEntry, apiClient->apiKeys_BearerToken) { @@ -383,6 +389,14 @@ void apiClient_invoke(apiClient_t *apiClient, } } + if (apiClient->progress_func != NULL) { + curl_easy_setopt(handle, CURLOPT_XFERINFOFUNCTION, apiClient->progress_func); + if (apiClient->progress_data != NULL) { + curl_easy_setopt(handle, CURLOPT_XFERINFODATA, apiClient->progress_data); + } + curl_easy_setopt(handle, CURLOPT_NOPROGRESS, 0L); + } + // this would only be generated for apiKey authentication if (apiClient->apiKeys_BearerToken != NULL) { diff --git a/kubernetes/watch/watch_util.c b/kubernetes/watch/watch_util.c index b68da48..36b24a9 100644 --- a/kubernetes/watch/watch_util.c +++ b/kubernetes/watch/watch_util.c @@ -25,6 +25,7 @@ static int wu_convert_to_json_array(list_t * json_array, const char *json_string rc = -1; goto end; } + cJSON_Delete(cjson); list_addElement(json_array, strdup(token)); token = strtok(NULL, JSON_ARRAY_DELIM); }