From dcc25f88a7329c963101b436813b7a4dcade4950 Mon Sep 17 00:00:00 2001 From: Asger Gitz-Johansen Date: Sat, 3 Aug 2024 13:08:46 +0200 Subject: [PATCH] feat: add threadpool and started pooling executors --- Makefile | 6 ++ README.md | 5 +- include/cli.h | 9 +-- include/notify.h | 5 +- include/pipeline.h | 25 +++++++ include/threadlist.h | 33 +++++++++ include/threadpool.h | 42 ++++++++++++ include/util.h | 8 +++ src/cli.c | 43 +++++------- src/main.c | 112 ++++++++++++++++++------------ src/notify.c | 20 ++++-- src/pipeline.c | 65 ++++++++++++++++++ src/threadlist.c | 60 ++++++++++++++++ src/threadpool.c | 158 +++++++++++++++++++++++++++++++++++++++++++ src/util.c | 38 +++++++++++ 15 files changed, 544 insertions(+), 85 deletions(-) create mode 100644 include/pipeline.h create mode 100644 include/threadlist.h create mode 100644 include/threadpool.h create mode 100644 src/pipeline.c create mode 100644 src/threadlist.c create mode 100644 src/threadpool.c create mode 100644 src/util.c diff --git a/Makefile b/Makefile index d4fdfeb..39cfd01 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ BINDIR := out/bin CFLAGS += -DSCI_VERSION="\"$(VERSION)\"" CFLAGS += -DSCI_NAME="\"$(NAME)\"" CFLAGS += -DSCI_DESCRIPTION="\"$(DESCRIPTION)\"" +CFLAGS += -D_POSIX_C_SOURCE=2 +CFLAGS += -D_GNU_SOURCE # compiler flags CFLAGS += -Wall -Werror -std=c23 -g # includes @@ -33,6 +35,10 @@ OBJ += out/obj/main.o OBJ += out/obj/cli.o OBJ += out/obj/log.o OBJ += out/obj/notify.o +OBJ += out/obj/util.o +OBJ += out/obj/pipeline.o +OBJ += out/obj/threadlist.o +OBJ += out/obj/threadpool.o out/bin/sci: $(OBJ) | $(BINDIR) $(CC) -o $@ $(CFLAGS) $^ diff --git a/README.md b/README.md index a8978fa..e5344d8 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,10 @@ If you want `compile_commands.json` files, you should use [bear](https://github. output file using cli options. - [x] Third things third, implement a thing that simultaneously watches two different files (multithreading). 
it should be cancellable with ctrl+c, but it should just contiuously print event notifications. - - [ ] Fourth things fourth, implement a prototype that reads a space-separated file and populates a struct. + - [x] Fourth things fourth, implement a prototype that reads a space-separated file and populates a struct. + - [ ] Fifth things fifth, implement a prototype that spawns a new thread that executes a shell command. + - [ ] Voila! You're there! + - [ ] Second iteration! Now Reimplement it and do it better! ### Note Regarding `inotify` usage From the manpage: diff --git a/include/cli.h b/include/cli.h index 9dbbdda..a8cc973 100644 --- a/include/cli.h +++ b/include/cli.h @@ -4,11 +4,8 @@ #include "optional.h" typedef struct { - // prototyping - optional_str file; - optional_str file2; - - // actual + optional_str config_file; + int executors; int verbosity; bool help; bool version; @@ -20,7 +17,7 @@ typedef struct { cli_options new_options(); // Delete a cli_options struct instance. -void free_options(cli_options v); +void destroy_options(cli_options v); // Print the help message. void print_help(FILE * out, char* prog_name); diff --git a/include/notify.h b/include/notify.h index 1ff00e9..7b98366 100644 --- a/include/notify.h +++ b/include/notify.h @@ -1,11 +1,12 @@ #ifndef SCI_NOTIFY_H #define SCI_NOTIFY_H +#include "pipeline.h" #include -typedef void(*notify_callback)(struct inotify_event* const); +typedef void(*notify_callback)(pipeline_event* const); // Start listening for changes to the provided file. // Note that the `struct inotify_event*` provided is a managed pointer. 
-void listen_for_changes(const char* filename, notify_callback callback); +void listen_for_changes(const pipeline_conf* config, notify_callback callback); #endif diff --git a/include/pipeline.h b/include/pipeline.h new file mode 100644 index 0000000..42c01d7 --- /dev/null +++ b/include/pipeline.h @@ -0,0 +1,25 @@ +#ifndef SCI_PIPELINE_H +#define SCI_PIPELINE_H +#include "optional.h" +#include +#include + +typedef struct { + char* name; + char* url; + char* trigger; + char* command; +} pipeline_conf; +typedef optional_type(pipeline_conf*) optional_pipeline_conf; + +typedef struct { + const struct inotify_event* event; + const char* command; +} pipeline_event; + +// create a new pipeline_conf struct instance based on a configuration line. +optional_pipeline_conf pipeline_create(const char* config_line); +void pipeline_register(pthread_t thread); +void pipeline_loop(); + +#endif diff --git a/include/threadlist.h b/include/threadlist.h new file mode 100644 index 0000000..9c9a0cc --- /dev/null +++ b/include/threadlist.h @@ -0,0 +1,33 @@ +#ifndef SCI_THREADLIST_H +#define SCI_THREADLIST_H +#include + +// doubly linked list implementation for managing threads +typedef struct pthread_list_node { + pthread_t thread; + struct pthread_list_node* previous; + struct pthread_list_node* next; +} pthread_list_node; + +// Create a new root node. +pthread_list_node* create_thread_node(pthread_t thread); + +// Add a new node to the end of the list. +// Returns the new node. +pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root); + +pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node); + +// Remove a node from the list. +// This will not call pthread_join, so make sure that the thread is joined before calling. +void remove_thread_node(pthread_list_node* node); + +// Completely clear the thread list. +// This will call pthread_join on all nodes. +// The list is completely invalid after this call and should be discarded. 
+// Note: +// - `root` has already been free'd. +// - this function is not thread-safe. +void clear_thread_list(pthread_list_node* root); + +#endif diff --git a/include/threadpool.h b/include/threadpool.h new file mode 100644 index 0000000..6a2879e --- /dev/null +++ b/include/threadpool.h @@ -0,0 +1,42 @@ +#ifndef SCI_THREADPOOL_H +#define SCI_THREADPOOL_H +#include +#include +#include +// Very inspired by: https://nachtimwald.com/2019/04/12/thread-pool-in-c/ + +// a work function +typedef void (*thread_func)(void *arg); + +// linked list for work functions +typedef struct threadpool_work { + thread_func func; + void* arg; + struct threadpool_work* next; +} threadpool_work; + +// thread pool object +typedef struct { + threadpool_work* work_first; + threadpool_work* work_last; + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + pthread_cond_t working_cond; + size_t working_count; + size_t thread_count; + bool stop; +} threadpool; + +// create a new threadpool instance with `num` amount of worker threads +threadpool* threadpool_create(size_t num); + +// destroy a threadpool instance +void threadpool_destroy(threadpool* pool); + +// add work to the threadpool +bool threadpool_add_work(threadpool* pool, thread_func func, void* arg); + +// wait for the remaining work in the threadpool to complete +void threadpool_wait(threadpool* pool); + +#endif diff --git a/include/util.h b/include/util.h index a0939a0..b143e8c 100644 --- a/include/util.h +++ b/include/util.h @@ -13,5 +13,13 @@ assert(fd != -1); \ } \ } while (0) +#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0])) + +// remove whitespace characters on both ends of the string. +// Returns a new string that you must free. 
+char* trim(const char* const str); + +typedef void(*line_handler)(const char*); +void per_line(const char* file, line_handler handler); #endif diff --git a/src/cli.c b/src/cli.c index ef764e8..ea5ec39 100644 --- a/src/cli.c +++ b/src/cli.c @@ -6,16 +6,11 @@ cli_options new_options() { cli_options result; - result.file.has_value = false; - result.file.value = NULL; - - result.file2.has_value = false; - result.file2.value = NULL; - + result.config_file.has_value = false; + result.config_file.value = NULL; + result.executors = 32; result.verbosity = 1; - result.help = false; - result.version = false; char *no_color = getenv("NO_COLOR"); @@ -29,26 +24,23 @@ cli_options new_options() { return result; } -void free_options(cli_options v) { - if(v.file.has_value) - free(v.file.value); - if(v.file2.has_value) - free(v.file2.value); +void destroy_options(cli_options v) { + if(v.config_file.has_value) + free(v.config_file.value); + if(v.log_file.has_value) + free(v.log_file.value); } // +#include +#include #include #include +#include +#include #include -#include +void executor(void* data) { + const char* command = data; + system(command); +} -void on_event(struct inotify_event* const e) { - const char* msg = - "got an event:\n" - " wd: %d\n" - " mask: %d\n" - " cookie: %d\n" - " len: %d\n" - " name: %s" - ; - log_info(msg, e->wd, e->mask, e->cookie, e->len, e->name); +threadpool* pool = NULL; + +void on_event(pipeline_event* const e) { + threadpool_add_work(pool, executor, (void*)e->command); } void* listen_for_changes_thread(void* data) { - const char* f = (const char*)data; - listen_for_changes(f, &on_event); + const pipeline_conf* conf = (const pipeline_conf*)data; + while(1) // TODO: Should be while(sigint_has_not_been_caught) instead + listen_for_changes(conf, &on_event); + // We're now done with the config. 
+ free(conf->name); + free(conf->url); + free(conf->trigger); + free(conf->command); + free(data); return NULL; } +// Spawn a thread that fires events whenever the provided trigger file has changed. +// Usage: +// `pthread_t t = spawn_listener(config);` +// `pthread_join(t, NULL);` +pthread_t spawn_listener(const pipeline_conf* conf) { + log_info("spawning trigger thread for %s", conf->name); + pthread_t result; + ASSERT_SYSCALL_SUCCESS(pthread_create(&result, NULL, &listen_for_changes_thread, (void*)conf)); + pthread_setname_np(result, "sci-listener"); + return result; +} + +void config_interpret_line(const char* line) { + optional_pipeline_conf conf = pipeline_create(line); + if(!conf.has_value) { + log_error("unable to register pipeline"); + return; + } + char* dest; + // NOTE: trigger names are allowed max 32 characters + dest = malloc(sizeof(*dest) * (9+33)); + dest[0] = '\0'; + strncat(dest, "/tmp/sci/", 10); + strncat(dest, conf.value->trigger, 33); + free(conf.value->trigger); + conf.value->trigger = dest; + pthread_t t = spawn_listener(conf.value); + pipeline_register(t); +} + int main(int argc, char** argv) { cli_options args = parse(argc, argv); log_settings settings; @@ -33,51 +74,34 @@ int main(int argc, char** argv) { settings.use_colors = args.use_colors; settings.out_file = args.log_file.has_value ? 
fopen(args.log_file.value, "w+") : stdout; log_init(settings); + pool = threadpool_create(args.executors); if(args.help) { print_help(stdout, argv[0]); exit(EXIT_SUCCESS); } + if(args.version) { fprintf(stdout, SCI_VERSION "\n"); exit(EXIT_SUCCESS); } - if(!args.file.has_value) { + if(!args.config_file.has_value) { fprintf(stderr, "no file provided see -h for usage\n"); exit(EXIT_FAILURE); } - if(!args.file2.has_value) { - fprintf(stderr, "no second file provided see -h for usage\n"); + + struct stat st = {0}; + if(stat("/tmp/sci", &st) == -1) + mkdir("/tmp/sci", 0700); + + if(access(args.config_file.value, F_OK) != 0) { + fprintf(stderr, "no such file or directory %s\n", args.config_file.value); exit(EXIT_FAILURE); } + + per_line(args.config_file.value, &config_interpret_line); - if(access(args.file.value, F_OK) != 0) { - fprintf(stderr, "no such file or directory %s\n", args.file.value); - exit(EXIT_FAILURE); - } - if(access(args.file2.value, F_OK) != 0) { - fprintf(stderr, "no such file or directory %s\n", args.file2.value); - exit(EXIT_FAILURE); - } - - - log_trace("spawning trigger thread for %s", args.file.value); - pthread_t file1_thread; - pthread_attr_t attr1; - ASSERT_SYSCALL_SUCCESS(pthread_attr_init(&attr1)); - ASSERT_SYSCALL_SUCCESS(pthread_create(&file1_thread, &attr1, &listen_for_changes_thread, (void*)args.file.value)); - ASSERT_SYSCALL_SUCCESS(pthread_attr_destroy(&attr1)); - - log_trace("spawning trigger thread for %s", args.file2.value); - pthread_t file2_thread; - pthread_attr_t attr2; - ASSERT_SYSCALL_SUCCESS(pthread_attr_init(&attr2)); - ASSERT_SYSCALL_SUCCESS(pthread_create(&file2_thread, &attr1, &listen_for_changes_thread, (void*)args.file2.value)); - ASSERT_SYSCALL_SUCCESS(pthread_attr_destroy(&attr2)); - - pthread_join(file1_thread, NULL); - pthread_join(file2_thread, NULL); - - free_options(args); + pipeline_loop(); + threadpool_destroy(pool); } diff --git a/src/notify.c b/src/notify.c index 39167e7..158c597 100644 --- a/src/notify.c +++ 
b/src/notify.c @@ -5,20 +5,28 @@ #define EV_SIZE sizeof(struct inotify_event) #define BUF_LEN EV_SIZE * 32 -void listen_for_changes(const char* filename, notify_callback callback) { +void listen_for_changes(const pipeline_conf* config, notify_callback callback) { + // TODO: callback is a bit slow sometimes. We should also poll once after calling callback + const char* filename = config->trigger; + if(access(filename, F_OK) != 0) { + log_trace("file does not exist yet, creating it."); + FILE* f = fopen(filename, "w+"); + fclose(f); + } int fd = inotify_init(); ASSERT_SYSCALL_SUCCESS(fd); inotify_add_watch(fd, filename, IN_ATTRIB); - - log_info("listening for changes in file: %s", filename); - + log_trace("listening for changes in file: %s", filename); char buffer[BUF_LEN]; int r = read(fd, buffer, BUF_LEN); assert(r != -1); for(int i = 0; i < r; ) { struct inotify_event* e = (struct inotify_event*)&buffer[i]; - callback(e); + pipeline_event ev; + ev.event = e; + ev.command = config->command; + callback(&ev); i += EV_SIZE + e->len; } - ASSERT_SYSCALL_SUCCESS(close(fd)); // TODO: have a hashmap of threads (see readme) + ASSERT_SYSCALL_SUCCESS(close(fd)); } diff --git a/src/pipeline.c b/src/pipeline.c new file mode 100644 index 0000000..7b2656c --- /dev/null +++ b/src/pipeline.c @@ -0,0 +1,65 @@ +#include "log.h" +#include "pipeline.h" +#include "threadlist.h" +#include "util.h" +#include +#include +#include + +pthread_list_node* root = NULL; + +optional_pipeline_conf pipeline_create(const char* config_line) { + optional_pipeline_conf result; + result.has_value = false; + const char* pattern = "[^[:blank:]]+|\"[^\"]*\""; + regex_t reg; + regmatch_t pmatch[1]; + regoff_t off, len; + assert(regcomp(&reg, pattern, REG_EXTENDED) == 0); + const char* cursor = config_line; + char* opts[4]; + int i = 0; + for( ; i < 4; i++) { + if(regexec(&reg, cursor, ARRAY_SIZE(pmatch), pmatch, 0)) + break; + off = pmatch[0].rm_so + (cursor - config_line); + len = pmatch[0].rm_eo - 
pmatch[0].rm_so; + opts[i] = strndup(config_line + off, len); + cursor += pmatch[0].rm_eo; + } + if(i != 4) { + log_error("invalid configuration!\nline is invalid: \"%s\""); + for(int j = i-1; j >= 0; j--) + free(opts[j]); + return result; + } + + result.value = malloc(sizeof(pipeline_conf)); + result.value->name = opts[0]; + result.value->url = opts[1]; + result.value->trigger = opts[2]; + result.value->command = opts[3]; + const char* msg = + "config:\n" + "name: %s\n" + "url: %s\n" + "trigger: %s\n" + "command: %s" + ; + log_trace(msg, result.value->name, result.value->url, result.value->trigger, result.value->command); + result.has_value = true; + return result; +} + +void pipeline_register(pthread_t thread) { + if(root == NULL) { + root = create_thread_node(thread); + return; + } + add_thread(thread, root); +} + +void pipeline_loop() { + clear_thread_list(root); + root = NULL; +} diff --git a/src/threadlist.c b/src/threadlist.c new file mode 100644 index 0000000..e3ee75c --- /dev/null +++ b/src/threadlist.c @@ -0,0 +1,60 @@ +#include "threadlist.h" +#include + +// TODO: mutex should really be per-list. 
+pthread_mutex_t threadlist_mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_list_node* create_thread_node(pthread_t thread) { + pthread_list_node* new = malloc(sizeof(pthread_list_node)); + new->previous = NULL; + new->next = NULL; + new->thread = thread; + return new; +} + +pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) { + pthread_mutex_lock(&threadlist_mutex); + pthread_list_node* cursor = root; + while(cursor->next != NULL) + cursor = cursor->next; + pthread_list_node* new = malloc(sizeof(pthread_list_node)); + new->previous = cursor; + new->next = NULL; + new->thread = thread; + cursor->next = new; + pthread_mutex_unlock(&threadlist_mutex); + return new; +} + +pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node) { + pthread_mutex_lock(&threadlist_mutex); + pthread_list_node* cursor = root; + while(cursor->next != NULL) + cursor = cursor->next; + node->previous = cursor; + node->next = NULL; + cursor->next = node; + pthread_mutex_unlock(&threadlist_mutex); + return node; +} + +void remove_thread_node(pthread_list_node* node) { + pthread_mutex_lock(&threadlist_mutex); + pthread_list_node* prev = node->previous; + pthread_list_node* next = node->next; + free(node); + if(prev != NULL) + prev->next = next; + if(next != NULL) + next->previous = prev; + pthread_mutex_unlock(&threadlist_mutex); +} + +void clear_thread_list(pthread_list_node* root) { + pthread_list_node* cursor = root; + while(cursor != NULL) { + pthread_join(cursor->thread, NULL); + cursor = cursor->next; + free(cursor->previous); + } +} diff --git a/src/threadpool.c b/src/threadpool.c new file mode 100644 index 0000000..9fd14c0 --- /dev/null +++ b/src/threadpool.c @@ -0,0 +1,158 @@ +#include "threadpool.h" +#include "log.h" +#include + +static threadpool_work* threadpool_work_create(thread_func func, void *arg) { + threadpool_work* result; + if (func == NULL) + return NULL; + result = malloc(sizeof(threadpool_work)); + result->func = func; + 
result->arg = arg; + result->next = NULL; + return result; +} + +static void threadpool_work_destroy(threadpool_work* work) { + if (work == NULL) + return; + free(work); +} + +static threadpool_work* threadpool_work_pop(threadpool* pool) { + threadpool_work* work; + if (pool == NULL) + return NULL; + + work = pool->work_first; + if (work == NULL) + return NULL; // TODO: This should probably be using optionals + + if (work->next == NULL) { + pool->work_first = NULL; + pool->work_last = NULL; + return work; + } + pool->work_first = work->next; + return work; +} + +static void* threadpool_worker(void *arg) { + log_trace("threadpool worker spawned"); + threadpool* pool = arg; + threadpool_work* work; + while(true) { + // Wait for work + pthread_mutex_lock(&(pool->work_mutex)); + while (pool->work_first == NULL && !pool->stop) + pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex)); + if (pool->stop) + break; + work = threadpool_work_pop(pool); + pool->working_count++; + pthread_mutex_unlock(&(pool->work_mutex)); + + // Do the work + if (work != NULL) { + work->func(work->arg); + threadpool_work_destroy(work); + } + + pthread_mutex_lock(&(pool->work_mutex)); + pool->working_count--; + if (!pool->stop && pool->working_count == 0 && pool->work_first == NULL) + pthread_cond_signal(&(pool->working_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + } + pool->thread_count--; + pthread_cond_signal(&(pool->working_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + return NULL; +} + +threadpool* threadpool_create(size_t num) { + log_trace("creating threadpool of size %d", num); + threadpool* result; + pthread_t thread; + size_t i; + if (num == 0) + num = 2; + result = calloc(1, sizeof(threadpool)); + result->thread_count = num; + pthread_mutex_init(&(result->work_mutex), NULL); + pthread_cond_init(&(result->work_cond), NULL); + pthread_cond_init(&(result->working_cond), NULL); + result->work_first = NULL; + result->work_last = NULL; + for (i=0; iwork_mutex)); 
work = pool->work_first; + while (work != NULL) { + work2 = work->next; + threadpool_worker(work); + work = work2; + } + pool->work_first = NULL; + pool->stop = true; + pthread_cond_broadcast(&(pool->work_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + + threadpool_wait(pool); + + pthread_mutex_destroy(&(pool->work_mutex)); + pthread_cond_destroy(&(pool->work_cond)); + pthread_cond_destroy(&(pool->working_cond)); + + free(pool); +} + +bool threadpool_add_work(threadpool* pool, thread_func func, void *arg) { + log_trace("adding work task to pool"); + threadpool_work* work; + if (pool == NULL) + return false; + + work = threadpool_work_create(func, arg); + if (work == NULL) + return false; + + pthread_mutex_lock(&(pool->work_mutex)); + if (pool->work_first == NULL) { + pool->work_first = work; + pool->work_last = pool->work_first; + } else { + pool->work_last->next = work; + pool->work_last = work; + } + pthread_cond_broadcast(&(pool->work_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + + return true; +} + +void threadpool_wait(threadpool* pool) { + if (pool == NULL) + return; + pthread_mutex_lock(&(pool->work_mutex)); + while (1) { + if (pool->work_first != NULL || (!pool->stop && pool->working_count != 0) || (pool->stop && pool->thread_count != 0)) + pthread_cond_wait(&(pool->working_cond), &(pool->work_mutex)); + else + break; + } + pthread_mutex_unlock(&(pool->work_mutex)); +} diff --git a/src/util.c b/src/util.c new file mode 100644 index 0000000..bfb7de7 --- /dev/null +++ b/src/util.c @@ -0,0 +1,38 @@ +#include "util.h" +#include "log.h" +#include +#include + +char* trim(const char* const str) { + char* begin = strdup(str); + char* end; + while(isspace((unsigned char)*begin)) + begin++; + if(*begin == 0) + return begin; + end = begin + strlen(begin) - 1; + while(end > begin && isspace((unsigned char)*end)) + end--; + *(end + 1) = '\0'; + return begin; +} + +void per_line(const char* file, line_handler handler) { + FILE* stream; + char* line = 
NULL; + size_t len = 0; + ssize_t nread; + log_trace("reading file %s", file); + stream = fopen(file, "r"); + if(stream == NULL) { + perror("fopen"); + return; + } + while((nread = getline(&line, &len, stream)) != -1) { + char* line_trimmed = trim(line); + handler(line_trimmed); + free(line_trimmed); + } + free(line); + fclose(stream); +}