diff --git a/include/pipeline.h b/include/pipeline.h index 19bf3a4..b4d806a 100644 --- a/include/pipeline.h +++ b/include/pipeline.h @@ -38,8 +38,10 @@ typedef struct { // create a new pipeline_conf struct instance based on a configuration line. optional_pipeline_conf pipeline_create(const char* config_line); +void pipeline_event_destroy(pipeline_event* ev); void pipeline_destroy(pipeline_conf* conf); void pipeline_register(pthread_t thread); void pipeline_loop(); +void pipeline_cancel(); #endif diff --git a/src/executor.c b/src/executor.c index 719f3cf..90d7329 100644 --- a/src/executor.c +++ b/src/executor.c @@ -152,7 +152,7 @@ end: argv_free(argv); close(fd.value); free(pipeline_id); - free(data); + pipeline_event_destroy(data); char** cursor = envp; while(*cursor != NULL) { free(*cursor); diff --git a/src/main.c b/src/main.c index 51129dc..b714080 100644 --- a/src/main.c +++ b/src/main.c @@ -26,21 +26,26 @@ #include #include -threadpool* pool = NULL; +threadpool* worker_pool = NULL; char* trigger_dir = "/tmp/sci"; bool config_file_changed = false; void on_event(pipeline_event* const e) { - if(!threadpool_add_work(pool, executor, (void*)e)) + if(!threadpool_add_work(worker_pool, executor, (void*)e)) log_error("could not add work to the threadpool"); } +void listener_thread_cleanup(void* data) { + // We're now done with the config. + pipeline_destroy((pipeline_conf*)data); +} + void* listen_for_changes_thread(void* data) { pipeline_conf* conf = (pipeline_conf*)data; - while(1) // TODO: Should be while(sigint_has_not_been_caught) instead + pthread_cleanup_push(listener_thread_cleanup, conf); + while(1) listen_for_changes(conf, &on_event); - // We're now done with the config. - pipeline_destroy(conf); + pthread_cleanup_pop(1); return NULL; } diff --git a/src/notify.c b/src/notify.c index 93291a3..c10591f 100644 --- a/src/notify.c +++ b/src/notify.c @@ -24,7 +24,7 @@ #define BUF_LEN EV_SIZE * 32 void listen_for_changes(const pipeline_conf* config, notify_callback callback) { - // TODO: callback is a bit slow sometimes. We should also poll once after calling callback + // TODO: callback is potentially slow. We should also poll once after calling callback const char* filename = config->trigger; if(access(filename, F_OK) != 0) { log_trace("file does not exist yet, creating it."); @@ -37,17 +37,43 @@ void listen_for_changes(const pipeline_conf* config, notify_callback callback) { log_trace("listening for changes in file: %s", filename); char buffer[BUF_LEN]; int r = read(fd, buffer, BUF_LEN); - assert(r != -1); + if(r == -1) { + perror("read"); + return; + } for(int i = 0; i < r; ) { struct inotify_event* e = (struct inotify_event*)&buffer[i]; pipeline_event* ev = malloc(sizeof(pipeline_event)); ev->event = e; - ev->name = config->name; - ev->url = config->url; - ev->trigger = config->trigger; - ev->command = config->command; + ev->name = strdup(config->name); + ev->url = strdup(config->url); + ev->trigger = strdup(config->trigger); + ev->command = strdup(config->command); callback(ev); i += EV_SIZE + e->len; } ASSERT_SYSCALL_SUCCESS(close(fd)); } + +void listen_for_config_changes(const char* config_filepath, config_change_callback callback) { + if(access(config_filepath, F_OK) != 0) { + perror("access"); + return; + } + int fd = inotify_init(); + ASSERT_SYSCALL_SUCCESS(fd); + inotify_add_watch(fd, config_filepath, IN_ATTRIB); + log_trace("listening for changes in file: %s", config_filepath); + char buffer[BUF_LEN]; + int r = read(fd, buffer, BUF_LEN); + if(r == -1) { + perror("read"); + return; + } + assert(r != -1); + for(int i = 0; i < r; ) { + callback(); + i += EV_SIZE + ((struct inotify_event*)&buffer[i])->len;; + } + ASSERT_SYSCALL_SUCCESS(close(fd)); +} diff --git a/src/pipeline.c b/src/pipeline.c index 6d5f923..eb5284f 100644 --- a/src/pipeline.c +++ b/src/pipeline.c @@ -19,6 +19,7 @@ #include "pipeline.h" #include "threadlist.h" #include "util.h" +#include #include #include @@ -54,6 +55,7 @@ optional_pipeline_conf pipeline_create(const char* config_line) { free(opts[j]); return result; } + regfree(®); result.value = malloc(sizeof(pipeline_conf)); result.value->name = opts[0]; @@ -89,3 +91,19 @@ void pipeline_loop() { clear_thread_list(root); root = NULL; } + +void pipeline_cancel() { + pthread_list_node* cursor = root; + while(cursor != NULL) { + pthread_cancel(cursor->thread); + cursor = cursor->next; + } +} + +void pipeline_event_destroy(pipeline_event* ev) { + free(ev->name); + free(ev->trigger); + free(ev->url); + free(ev->command); + free(ev); +} diff --git a/src/threadlist.c b/src/threadlist.c index a3de7e7..9d81d2e 100644 --- a/src/threadlist.c +++ b/src/threadlist.c @@ -71,7 +71,8 @@ void clear_thread_list(pthread_list_node* root) { pthread_list_node* cursor = root; while(cursor != NULL) { pthread_join(cursor->thread, NULL); + pthread_list_node* prev = cursor; cursor = cursor->next; - free(cursor->previous); + free(prev); } }