[Example] Multi-threaded programming example

This commit is contained in:
Hui Yu
2020-11-02 21:38:52 +08:00
parent 3bdbebcae6
commit 91b5f8aff8
19 changed files with 334 additions and 8 deletions

View File

@@ -125,6 +125,15 @@ list all pods in cluster:
apiClient_unsetupGlobalEnv(); apiClient_unsetupGlobalEnv();
``` ```
## Multi-threaded Usage
If the C client library is used in multi-threaded program, the following 2 actions must be taken:
1. After the program starts up, main thread must call the function ```apiClient_setupGlobalEnv()``` before any worker thread is created.
2. If the C client is no longer used, main thread must call the function ```apiClient_unsetupGlobalEnv()``` after all worker threads end.
Refer to the [example](https://github.com/kubernetes-client/c/tree/master/examples/multi_thread/) for detail.
## Community, discussion, contribution, and support ## Community, discussion, contribution, and support

View File

@@ -7,6 +7,7 @@ all:
cd generic; make cd generic; make
cd auth_provider; make cd auth_provider; make
cd watch_list_pod; make cd watch_list_pod; make
cd multi_thread; make
clean: clean:
cd create_pod; make clean cd create_pod; make clean
@@ -17,3 +18,4 @@ clean:
cd generic; make clean cd generic; make clean
cd auth_provider; make clean cd auth_provider; make clean
cd watch_list_pod; make clean cd watch_list_pod; make clean
cd multi_thread; make clean

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
BIN:=delete_pod_bin BIN:=delete_pod_bin

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: my_exec_provider_bin list_pod_by_exec_provider_bin all: my_exec_provider_bin list_pod_by_exec_provider_bin

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

1
examples/multi_thread/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
multi_thread_bin

View File

@@ -0,0 +1,16 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch
LIBS:=-L../../kubernetes/build -lkubernetes -lyaml -lpthread
CFLAGS:=-g
BIN:= multi_thread_bin
OBJECTS:=main.o watch_pod.o create_pod.o
all: $(OBJECTS)
$(CC) -o $(BIN) $(OBJECTS) $(LIBS)
$(OBJECTS): %.o: %.c
$(CC) $(CFLAGS) $(INCLUDE) -c $< -o $@
.PHONY : clean
clean:
rm $(BIN) $(OBJECTS)

View File

@@ -0,0 +1,83 @@
#include "mt.h"
static void create_a_pod(apiClient_t * apiClient)
{
char *namespace = "default";
v1_pod_t *podinfo = calloc(1, sizeof(v1_pod_t));
podinfo->api_version = strdup("v1");
podinfo->kind = strdup("Pod");
podinfo->spec = calloc(1, sizeof(v1_pod_spec_t));
podinfo->metadata = calloc(1, sizeof(v1_object_meta_t));
/* set pod name */
podinfo->metadata->name = strdup("test-pod-8");
/* set containers for pod */
list_t *containerlist = list_create();
v1_container_t *con = calloc(1, sizeof(v1_container_t));
con->name = strdup("my-container");
con->image = strdup("ubuntu:latest");
con->image_pull_policy = strdup("IfNotPresent");
/* set command for container */
list_t *commandlist = list_create();
char *cmd = strdup("sleep");
list_addElement(commandlist, cmd);
con->command = commandlist;
list_t *arglist = list_create();
char *arg1 = strdup("3600");
list_addElement(arglist, arg1);
con->args = arglist;
/* set volume mounts for container */
list_t *volumemounts = list_create();
v1_volume_mount_t *volmou = calloc(1, sizeof(v1_volume_mount_t));
volmou->mount_path = strdup("/test");
volmou->name = strdup("test");
list_addElement(volumemounts, volmou);
con->volume_mounts = volumemounts;
list_addElement(containerlist, con);
podinfo->spec->containers = containerlist;
/* set volumes for pod */
list_t *volumelist = list_create();
v1_volume_t *volume = calloc(1, sizeof(v1_volume_t));
volume->name = strdup("test");
v1_host_path_volume_source_t *hostPath = calloc(1, sizeof(v1_host_path_volume_source_t));
hostPath->path = strdup("/test");
volume->host_path = hostPath;
list_addElement(volumelist, volume);
podinfo->spec->volumes = volumelist;
/* call API in libkubernetes to create pod */
v1_pod_t *apod = CoreV1API_createNamespacedPod(apiClient, namespace, podinfo, NULL, NULL, NULL);
printf("code=%ld\n", apiClient->response_code);
v1_pod_free(apod);
v1_pod_free(podinfo);
}
void *create_pod_thread_func(void *arg)
{
kube_params_t *params = (kube_params_t *) arg;
sleep(10); // wait watch_pod_thread to start for 10 seconds
apiClient_t *apiClient = apiClient_create_with_base_path(params->basePath, params->sslConfig, params->apiKeys);
if (!apiClient) {
fprintf(stderr, "Cannot create a kubernetes client.\n");
return ((void *) 1);
}
create_a_pod(apiClient);
apiClient_free(apiClient);
apiClient = NULL;
pthread_exit(NULL);
}

View File

@@ -0,0 +1,55 @@
#include <kube_config.h>
#include "mt.h"
bool g_exit_watch = false;
pthread_mutex_t exit_watch_mutex;
int main(int argc, char *argv[])
{
char *basePath = NULL;
sslConfig_t *sslConfig = NULL;
list_t *apiKeys = NULL;
int rc = load_kube_config(&basePath, &sslConfig, &apiKeys, NULL); /* NULL means loading configuration from $HOME/.kube/config */
if (rc != 0) {
printf("Cannot load kubernetes configuration.\n");
return -1;
}
kube_params_t *params = calloc(1, sizeof(kube_params_t));
params->basePath = basePath;
params->sslConfig = sslConfig;
params->apiKeys = apiKeys;
apiClient_setupGlobalEnv();
pthread_t watch_pod_thread;
int err = pthread_create(&watch_pod_thread, NULL, watch_pod_thread_func, (void *) params);
if (0 != err) {
perror("creating thread error");
exit(-1);
}
pthread_t create_pod_thread;
err = pthread_create(&create_pod_thread, NULL, create_pod_thread_func, (void *) params);
if (0 != err) {
perror("creating thread error");
exit(-1);
}
sleep(30);
pthread_mutex_lock(&exit_watch_mutex);
g_exit_watch = true; // notify watch thread to exit
pthread_mutex_unlock(&exit_watch_mutex);
pthread_join(watch_pod_thread, NULL);
pthread_join(create_pod_thread, NULL);
free_client_config(basePath, sslConfig, apiKeys);
basePath = NULL;
sslConfig = NULL;
apiKeys = NULL;
free(params);
params = NULL;
apiClient_unsetupGlobalEnv();
return 0;
}

View File

@@ -0,0 +1,33 @@
#ifndef __MT_H__
#define __MT_H__
#include "../include/apiClient.h"
#ifdef __cplusplus
extern "C" {
#endif
#include <malloc.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <apiClient.h>
#include <CoreV1API.h>
typedef struct kube_params_t {
char *basePath;
sslConfig_t *sslConfig;
list_t *apiKeys;
} kube_params_t;
extern void *watch_pod_thread_func(void *);
extern void *create_pod_thread_func(void *);
extern bool g_exit_watch;
extern pthread_mutex_t exit_watch_mutex;
#ifdef __cplusplus
}
#endif
#endif // __MT_H__

View File

@@ -0,0 +1,109 @@
#include "mt.h"
#include <watch_util.h>
#define WATCH_EVENT_KEY_TYPE "type"
#define WATCH_EVENT_KEY_OBJECT "object"
static void on_pod_event_comes(const char *event_string)
{
static char fname[] = "process_one_watch_event()";
if (!event_string) {
return;
}
cJSON *event_json_obj = cJSON_Parse(event_string);
if (!event_json_obj) {
fprintf(stderr, "%s: Cannot create JSON from string.[%s].\n", fname, cJSON_GetErrorPtr());
goto end;
}
cJSON *json_value_type = cJSON_GetObjectItem(event_json_obj, WATCH_EVENT_KEY_TYPE);
if (!json_value_type || json_value_type->type != cJSON_String) {
fprintf(stderr, "%s: Cannot get type in watch event.\n", fname);
goto end;
}
char *type = strdup(json_value_type->valuestring);
printf("type: %s\n", type);
cJSON *json_value_object = cJSON_GetObjectItem(event_json_obj, WATCH_EVENT_KEY_OBJECT);
if (!json_value_object || json_value_object->type != cJSON_Object) {
fprintf(stderr, "%s: Cannot get object in watch event.\n", fname);
goto end;
}
v1_pod_t *pod = v1_pod_parseFromJSON(json_value_object);
if (!pod) {
fprintf(stderr, "%s: Cannot get pod from watch event object.\n", fname);
goto end;
}
printf("pod:\n\tname: %s\n", pod->metadata->name);
end:
if (pod) {
v1_pod_free(pod);
pod = NULL;
}
if (type) {
free(type);
type = NULL;
}
if (event_json_obj) {
cJSON_Delete(event_json_obj);
event_json_obj = NULL;
}
}
static void my_pod_watch_handler(void **pData, long *pDataLen)
{
kubernets_watch_handler(pData, pDataLen, on_pod_event_comes);
}
static int my_watch_progress_func(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{
bool exit_watch = false;
pthread_mutex_lock(&exit_watch_mutex);
exit_watch = g_exit_watch;
pthread_mutex_unlock(&exit_watch_mutex);
if (true == exit_watch) {
/* Returning any other non-zero value from this callback will cause libcurl to
* abort the transfer and return CURLE_ABORTED_BY_CALLBACK.
*/
return 1;
}
return 0;
}
void *watch_pod_thread_func(void *arg)
{
kube_params_t *params = (kube_params_t *) arg;
apiClient_t *apiClient = apiClient_create_with_base_path(params->basePath, params->sslConfig, params->apiKeys);
if (!apiClient) {
fprintf(stderr, "Cannot create a kubernetes client.\n");
return ((void *) 1);
}
apiClient->data_callback_func = my_pod_watch_handler;
apiClient->progress_func = my_watch_progress_func;
CoreV1API_listNamespacedPod(apiClient, "default", /*namespace */
NULL, /* pretty */
0, /* allowWatchBookmarks */
NULL, /* continue */
NULL, /* fieldSelector */
NULL, /* labelSelector */
0, /* limit */
NULL, /* resourceVersion */
0, /* timeoutSeconds
Setting the value to 0 means the watch never stops.
*/
1 /* watch */
);
apiClient_free(apiClient);
apiClient = NULL;
pthread_exit(NULL);
}

View File

@@ -1,5 +1,5 @@
INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch INCLUDE:=-I../../kubernetes/include -I../../kubernetes/model -I../../kubernetes/api -I../../kubernetes/config -I../../kubernetes/watch
LIBS:=-L../../kubernetes/build -lkubernetes -lcurl -lyaml -lpthread -lssl -lz LIBS:=-L../../kubernetes/build -lkubernetes -lyaml
CFLAGS:=-g CFLAGS:=-g
all: all:

View File

@@ -6,6 +6,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include <curl/curl.h>
#include "../include/list.h" #include "../include/list.h"
#include "../include/keyValuePair.h" #include "../include/keyValuePair.h"
#include "../include/binary.h" #include "../include/binary.h"
@@ -24,6 +25,8 @@ typedef struct apiClient_t {
void *dataReceived; void *dataReceived;
long dataReceivedLen; long dataReceivedLen;
void (*data_callback_func)(void **, long *); void (*data_callback_func)(void **, long *);
int (*progress_func)(void *, curl_off_t, curl_off_t, curl_off_t, curl_off_t);
void *progress_data;
long response_code; long response_code;
list_t *apiKeys_BearerToken; list_t *apiKeys_BearerToken;
} apiClient_t; } apiClient_t;

View File

@@ -13,6 +13,8 @@ apiClient_t *apiClient_create() {
apiClient->dataReceived = NULL; apiClient->dataReceived = NULL;
apiClient->dataReceivedLen = 0; apiClient->dataReceivedLen = 0;
apiClient->data_callback_func = NULL; apiClient->data_callback_func = NULL;
apiClient->progress_func = NULL;
apiClient->progress_data = NULL;
apiClient->response_code = 0; apiClient->response_code = 0;
apiClient->apiKeys_BearerToken = NULL; apiClient->apiKeys_BearerToken = NULL;
@@ -39,6 +41,8 @@ apiClient_t *apiClient_create_with_base_path(const char *basePath
apiClient->dataReceived = NULL; apiClient->dataReceived = NULL;
apiClient->dataReceivedLen = 0; apiClient->dataReceivedLen = 0;
apiClient->data_callback_func = NULL; apiClient->data_callback_func = NULL;
apiClient->progress_func = NULL;
apiClient->progress_data = NULL;
apiClient->response_code = 0; apiClient->response_code = 0;
if(apiKeys_BearerToken!= NULL) { if(apiKeys_BearerToken!= NULL) {
apiClient->apiKeys_BearerToken = list_create(); apiClient->apiKeys_BearerToken = list_create();
@@ -60,6 +64,8 @@ void apiClient_free(apiClient_t *apiClient) {
free(apiClient->basePath); free(apiClient->basePath);
} }
apiClient->data_callback_func = NULL; apiClient->data_callback_func = NULL;
apiClient->progress_func = NULL;
apiClient->progress_data = NULL;
if(apiClient->apiKeys_BearerToken) { if(apiClient->apiKeys_BearerToken) {
listEntry_t *listEntry = NULL; listEntry_t *listEntry = NULL;
list_ForEach(listEntry, apiClient->apiKeys_BearerToken) { list_ForEach(listEntry, apiClient->apiKeys_BearerToken) {
@@ -383,6 +389,14 @@ void apiClient_invoke(apiClient_t *apiClient,
} }
} }
if (apiClient->progress_func != NULL) {
curl_easy_setopt(handle, CURLOPT_XFERINFOFUNCTION, apiClient->progress_func);
if (apiClient->progress_data != NULL) {
curl_easy_setopt(handle, CURLOPT_XFERINFODATA, apiClient->progress_data);
}
curl_easy_setopt(handle, CURLOPT_NOPROGRESS, 0L);
}
// this would only be generated for apiKey authentication // this would only be generated for apiKey authentication
if (apiClient->apiKeys_BearerToken != NULL) if (apiClient->apiKeys_BearerToken != NULL)
{ {

View File

@@ -25,6 +25,7 @@ static int wu_convert_to_json_array(list_t * json_array, const char *json_string
rc = -1; rc = -1;
goto end; goto end;
} }
cJSON_Delete(cjson);
list_addElement(json_array, strdup(token)); list_addElement(json_array, strdup(token));
token = strtok(NULL, JSON_ARRAY_DELIM); token = strtok(NULL, JSON_ARRAY_DELIM);
} }