diff --git a/.sci.sh b/.sci.sh index f9a8444..850a563 100755 --- a/.sci.sh +++ b/.sci.sh @@ -2,6 +2,8 @@ set -e echo ">>> checking if required environment is set..." test -n "$DOCKER_TOKEN" +: "${PACKAGE_REGISTRY_USER:=agj}" +export PACKAGE_REGISTRY_USER which make echo ">>> compiling..." @@ -30,7 +32,7 @@ echo ">>> building debbuilder image..." docker build -t debbuilder -f deb-builder.dockerfile . echo ">>> building .deb in debbuilder docker image..." -docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN debbuilder sh -c '\ +docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN -e PACKAGE_REGISTRY_USER debbuilder sh -c '\ cd && \ mkdir -p artifacts && \ cp /src/sci-$VERSION.tar.gz . && \ @@ -47,8 +49,7 @@ docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN debbuilder sh -c '\ cp ../*.tar.xz ~/artifacts && \ cp ../*.tar.gz ~/artifacts && \ cd && \ - curl --user agj:$DOCKER_TOKEN \ + curl --user "$PACKAGE_REGISTRY_USER:$DOCKER_TOKEN" \ --upload-file sci_$VERSION-1_amd64.deb \ "https://git.gtz.dk/api/packages/agj/debian/pool/bionic/main/upload" ' -# TODO: push-user should be some sci-bot or something, not your account. This will do for now though diff --git a/Makefile b/Makefile index bdc5072..a3b2814 100644 --- a/Makefile +++ b/Makefile @@ -85,6 +85,9 @@ install: out/bin/sci mkdir -p $(DESTDIR)$(MANPREFIX)/man1 sed "s/VERSION/$(VERSION)/g" < src/sci.1 > $(DESTDIR)$(MANPREFIX)/man1/sci.1 chmod 644 $(DESTDIR)$(MANPREFIX)/man1/sci.1 + mkdir -p $(DESTDIR)$(MANPREFIX)/man7 + sed "s/VERSION/$(VERSION)/g" < src/sci-api.7 > $(DESTDIR)$(MANPREFIX)/man7/sci-api.7 + chmod 644 $(DESTDIR)$(MANPREFIX)/man7/sci-api.7 uninstall: # uninstall binaries @@ -93,3 +96,4 @@ uninstall: # uninstall services (only if system is using systemd though) # uninstall manpages rm -f $(DESTDIR)$(MANPREFIX)/man1/sci.1 + rm -f $(DESTDIR)$(MANPREFIX)/man7/sci-api.7 diff --git a/README.md b/README.md index 72fa79f..79f9065 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,9 @@ sudo make install PREFIX=/some/path ### Arch Linux It is recommended that you use an arch linux distribution when building. - +The `arch-builder.dockerfile` image provides a repeatable build environment. ```sh +docker build -t archbuilder -f arch-builder.dockerfile . ``` ### Debian @@ -65,7 +66,8 @@ docker build -t debbuilder deb-builder.dockerfile . ``` ## Brainstorm -If you dont want to congest your CI server. Too bad. Write faster ci suites. (TODO: implement runners) +If you dont want to congest your CI server, keep pipeline scripts fast or run them on separate machines. +Runner orchestration is intentionally outside the current core daemon. I would like to try to avoid writing a million REST APIs, as that just results in bloat usually. diff --git a/include/threadlist.h b/include/threadlist.h index ce62459..0590cfd 100644 --- a/include/threadlist.h +++ b/include/threadlist.h @@ -22,6 +22,7 @@ // doubly linked list implementation for managing threads typedef struct pthread_list_node { pthread_t thread; + pthread_mutex_t mutex; struct pthread_list_node* previous; struct pthread_list_node* next; } pthread_list_node; diff --git a/src/api.c b/src/api.c index b849c1f..912c275 100644 --- a/src/api.c +++ b/src/api.c @@ -8,13 +8,61 @@ #include #include -// TODO: Make sure to write a manpage for this api #define LIST_REQ "list" #define MQ_MAX_SIZE 8192 +#define SCI_API_EVENTS_QUEUE "/sci_events" +#define SCI_API_REQUESTS_QUEUE "/sci_requests" bool api_is_running = false; -mqd_t api_out; -mqd_t api_in; +mqd_t api_out = (mqd_t)-1; +mqd_t api_in = (mqd_t)-1; + +typedef struct api_pipeline_node { + char* pipeline_id; + char* name; + struct api_pipeline_node* next; +} api_pipeline_node; + +pthread_mutex_t api_pipeline_mutex = PTHREAD_MUTEX_INITIALIZER; +api_pipeline_node* api_running_pipelines = NULL; + +static void api_send(const char* msg) { + if(mq_send(api_out, msg, strnlen(msg, MQ_MAX_SIZE), 1) == -1) + perror("mq_send"); +} + +static void api_register_pipeline(const char* pipeline_id, const char* name) { + api_pipeline_node* node = malloc(sizeof(api_pipeline_node)); + node->pipeline_id = strdup(pipeline_id); + node->name = strdup(name); + + pthread_mutex_lock(&api_pipeline_mutex); + node->next = api_running_pipelines; + api_running_pipelines = node; + pthread_mutex_unlock(&api_pipeline_mutex); +} + +static void api_unregister_pipeline(const char* pipeline_id) { + pthread_mutex_lock(&api_pipeline_mutex); + api_pipeline_node* previous = NULL; + api_pipeline_node* cursor = api_running_pipelines; + while(cursor != NULL) { + if(strncmp(cursor->pipeline_id, pipeline_id, MQ_MAX_SIZE) == 0) { + if(previous == NULL) + api_running_pipelines = cursor->next; + else + previous->next = cursor->next; + free(cursor->pipeline_id); + free(cursor->name); + free(cursor); + pthread_mutex_unlock(&api_pipeline_mutex); + return; + } + previous = cursor; + cursor = cursor->next; + } + pthread_mutex_unlock(&api_pipeline_mutex); +} void api_handle_request(const char* request) { log_trace("api request: '%s'", request); @@ -33,16 +81,16 @@ void* api_start(void* data) { struct mq_attr attr; attr.mq_flags = 0; attr.mq_maxmsg = 10; - attr.mq_msgsize = 512; + attr.mq_msgsize = MQ_MAX_SIZE; attr.mq_curmsgs = 0; - api_out = mq_open("/sci_tx", O_CREAT | O_WRONLY, 0666, &attr); + api_out = mq_open(SCI_API_EVENTS_QUEUE, O_CREAT | O_WRONLY, 0666, &attr); if(api_out == -1) { perror("mq_open"); return NULL; } - api_in = mq_open("/sci_rx", O_CREAT | O_RDONLY, 0666, &attr); // TODO: Consider some better mq names + api_in = mq_open(SCI_API_REQUESTS_QUEUE, O_CREAT | O_RDONLY, 0666, &attr); if(api_in == -1) { perror("mq_open"); return NULL; @@ -73,31 +121,53 @@ void api_start_p() { void api_destroy() { log_trace("closing api"); api_is_running = false; - mq_unlink("/sci"); + if(api_in != (mqd_t)-1) + mq_close(api_in); + if(api_out != (mqd_t)-1) + mq_close(api_out); + mq_unlink(SCI_API_REQUESTS_QUEUE); + mq_unlink(SCI_API_EVENTS_QUEUE); } void api_list_running_pipelines() { - // TODO: you need a way of enumerating the pipeline ids before this can be implemented. - log_error("cannot list running pipelines yet, feature is work-in-progress."); + char msg[MQ_MAX_SIZE]; + msg[0] = '\0'; + size_t used = 0; + + pthread_mutex_lock(&api_pipeline_mutex); + api_pipeline_node* cursor = api_running_pipelines; + while(cursor != NULL) { + int written = snprintf(msg + used, MQ_MAX_SIZE - used, "%s%s", used == 0 ? "" : "\n", cursor->pipeline_id); + if(written < 0) + break; + if((size_t)written >= MQ_MAX_SIZE - used) { + log_error("running pipeline list exceeds api message size"); + msg[MQ_MAX_SIZE - 1] = '\0'; + break; + } + used += written; + cursor = cursor->next; + } + pthread_mutex_unlock(&api_pipeline_mutex); + api_send(msg); } void api_pipeline_started(const char* pipeline_id, const char* name) { + api_register_pipeline(pipeline_id, name); char* msg = join("pipeline_new ", pipeline_id); - if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1) - perror("mq_send"); + api_send(msg); free(msg); } void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code) { + api_unregister_pipeline(pipeline_id); char exit_code_str[64]; - sprintf(exit_code_str, " %d", exit_code); + snprintf(exit_code_str, sizeof(exit_code_str), "%d", exit_code); char* msg = join6("pipeline_end ", pipeline_id, " ", name, " ", exit_code_str); - if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1) - perror("mq_send"); + api_send(msg); free(msg); } void api_started() { - if(mq_send(api_out, "sci_started", 12, 1) == -1) - perror("mq_send"); + api_send("sci_started"); } diff --git a/src/notify.c b/src/notify.c index c10591f..80d53a8 100644 --- a/src/notify.c +++ b/src/notify.c @@ -18,30 +18,15 @@ #include "notify.h" #include "util.h" #include "log.h" +#include +#include #include #define EV_SIZE sizeof(struct inotify_event) #define BUF_LEN EV_SIZE * 32 -void listen_for_changes(const pipeline_conf* config, notify_callback callback) { - // TODO: callback is potentially slow. We should also poll once after calling callback - const char* filename = config->trigger; - if(access(filename, F_OK) != 0) { - log_trace("file does not exist yet, creating it."); - FILE* f = fopen(filename, "w+"); - fclose(f); - } - int fd = inotify_init(); - ASSERT_SYSCALL_SUCCESS(fd); - inotify_add_watch(fd, filename, IN_ATTRIB); - log_trace("listening for changes in file: %s", filename); - char buffer[BUF_LEN]; - int r = read(fd, buffer, BUF_LEN); - if(r == -1) { - perror("read"); - return; - } - for(int i = 0; i < r; ) { +static void dispatch_pipeline_events(const pipeline_conf* config, notify_callback callback, char* buffer, ssize_t len) { + for(int i = 0; i < len; ) { struct inotify_event* e = (struct inotify_event*)&buffer[i]; pipeline_event* ev = malloc(sizeof(pipeline_event)); ev->event = e; @@ -52,6 +37,35 @@ void listen_for_changes(const pipeline_conf* config, notify_callback callback) { callback(ev); i += EV_SIZE + e->len; } +} + +void listen_for_changes(const pipeline_conf* config, notify_callback callback) { + const char* filename = config->trigger; + if(access(filename, F_OK) != 0) { + log_trace("file does not exist yet, creating it."); + FILE* f = fopen(filename, "w+"); + fclose(f); + } + int fd = inotify_init(); + ASSERT_SYSCALL_SUCCESS(fd); + ASSERT_SYSCALL_SUCCESS(inotify_add_watch(fd, filename, IN_ATTRIB)); + log_trace("listening for changes in file: %s", filename); + char buffer[BUF_LEN]; + ssize_t r = read(fd, buffer, BUF_LEN); + if(r == -1) { + perror("read"); + return; + } + dispatch_pipeline_events(config, callback, buffer, r); + + int flags = fcntl(fd, F_GETFL, 0); + if(flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1) { + while((r = read(fd, buffer, BUF_LEN)) > 0) + dispatch_pipeline_events(config, callback, buffer, r); + if(r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) + perror("read"); + } + ASSERT_SYSCALL_SUCCESS(close(fd)); } @@ -62,10 +76,10 @@ void listen_for_config_changes(const char* config_filepath, config_change_callba } int fd = inotify_init(); ASSERT_SYSCALL_SUCCESS(fd); - inotify_add_watch(fd, config_filepath, IN_ATTRIB); + ASSERT_SYSCALL_SUCCESS(inotify_add_watch(fd, config_filepath, IN_ATTRIB)); log_trace("listening for changes in file: %s", config_filepath); char buffer[BUF_LEN]; - int r = read(fd, buffer, BUF_LEN); + ssize_t r = read(fd, buffer, BUF_LEN); if(r == -1) { perror("read"); return; diff --git a/src/sci-api.7 b/src/sci-api.7 new file mode 100644 index 0000000..3b78f71 --- /dev/null +++ b/src/sci-api.7 @@ -0,0 +1,35 @@ +.TH sci-api 7 2024-08-17 "VERSION" "Simple CI manual" + +.SH NAME +sci-api - POSIX message queue interface for sci + +.SH DESCRIPTION +.B sci +exposes a small local API through POSIX message queues. +Clients send requests to +.I /sci_requests +and receive events and responses from +.I /sci_events. + +.SH REQUESTS +.TP +.B list +Return the currently running pipeline IDs as a newline-separated message. +An empty message means no pipelines are running. + +.SH EVENTS +.TP +.B sci_started +The daemon API thread has started. +.TP +.BI pipeline_new " pipeline_id" +A pipeline process has started. +.TP +.BI pipeline_end " pipeline_id name exit_status" +A pipeline process has exited. + +.SH LIMITS +Messages are limited to 8192 bytes. + +.SH SEE ALSO +.BR sci (1) diff --git a/src/sci.1 b/src/sci.1 index 443b147..83094f6 100644 --- a/src/sci.1 +++ b/src/sci.1 @@ -87,3 +87,4 @@ along with this program. If not, see . .SH "SEE ALSO" .BR sci (5), +.BR sci-api (7), diff --git a/src/threadlist.c b/src/threadlist.c index 9d81d2e..f76970e 100644 --- a/src/threadlist.c +++ b/src/threadlist.c @@ -18,19 +18,23 @@ #include "threadlist.h" #include -// TODO: mutex should really be per-list. -pthread_mutex_t threadlist_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_list_node* threadlist_root(pthread_list_node* node) { + while(node != NULL && node->previous != NULL) + node = node->previous; + return node; +} pthread_list_node* create_thread_node(pthread_t thread) { pthread_list_node* new = malloc(sizeof(pthread_list_node)); new->previous = NULL; new->next = NULL; new->thread = thread; + pthread_mutex_init(&new->mutex, NULL); return new; } pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) { - pthread_mutex_lock(&threadlist_mutex); + pthread_mutex_lock(&root->mutex); pthread_list_node* cursor = root; while(cursor->next != NULL) cursor = cursor->next; @@ -38,41 +42,49 @@ pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) { new->previous = cursor; new->next = NULL; new->thread = thread; + pthread_mutex_init(&new->mutex, NULL); cursor->next = new; - pthread_mutex_unlock(&threadlist_mutex); + pthread_mutex_unlock(&root->mutex); return new; } pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node) { - pthread_mutex_lock(&threadlist_mutex); + pthread_mutex_lock(&root->mutex); pthread_list_node* cursor = root; while(cursor->next != NULL) cursor = cursor->next; node->previous = cursor; node->next = NULL; cursor->next = node; - pthread_mutex_unlock(&threadlist_mutex); + pthread_mutex_unlock(&root->mutex); return node; } void remove_thread_node(pthread_list_node* node) { - pthread_mutex_lock(&threadlist_mutex); + if(node == NULL) + return; + pthread_list_node* root = threadlist_root(node); + pthread_mutex_lock(&root->mutex); pthread_list_node* prev = node->previous; pthread_list_node* next = node->next; - free(node); if(prev != NULL) prev->next = next; if(next != NULL) next->previous = prev; - pthread_mutex_unlock(&threadlist_mutex); + pthread_mutex_unlock(&root->mutex); + pthread_mutex_destroy(&node->mutex); + free(node); } void clear_thread_list(pthread_list_node* root) { + if(root == NULL) + return; pthread_list_node* cursor = root; while(cursor != NULL) { pthread_join(cursor->thread, NULL); pthread_list_node* prev = cursor; cursor = cursor->next; + pthread_mutex_destroy(&prev->mutex); free(prev); } } diff --git a/src/threadpool.c b/src/threadpool.c index 0cf9aad..8f19191 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -41,28 +41,33 @@ static void threadpool_work_destroy(threadpool_work* work) { free(work); } -static threadpool_work* threadpool_work_pop(threadpool* pool) { - threadpool_work* work; +static optional_threadpool_work threadpool_work_pop(threadpool* pool) { + optional_threadpool_work result; + result.value = NULL; + result.has_value = false; if (pool == NULL) - return NULL; + return result; - work = pool->work_first; + threadpool_work* work = pool->work_first; if (work == NULL) - return NULL; // TODO: This should propbably be using optionals + return result; if (work->next == NULL) { pool->work_first = NULL; pool->work_last = NULL; - return work; + result.value = work; + result.has_value = true; + return result; } pool->work_first = work->next; - return work; + result.value = work; + result.has_value = true; + return result; } static void* threadpool_worker(void* arg) { log_trace("threadpool worker spawned"); threadpool* pool = arg; - threadpool_work* work; while(true) { // Wait for work pthread_mutex_lock(&(pool->work_mutex)); @@ -70,18 +75,20 @@ static void* threadpool_worker(void* arg) { pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex)); if (pool->stop) break; - work = threadpool_work_pop(pool); - pool->working_count++; + optional_threadpool_work work = threadpool_work_pop(pool); + if(work.has_value) + pool->working_count++; pthread_mutex_unlock(&(pool->work_mutex)); // Do the work - if (work != NULL) { - work->func(work->arg); - threadpool_work_destroy(work); + if (work.has_value) { + work.value->func(work.value->arg); + threadpool_work_destroy(work.value); } pthread_mutex_lock(&(pool->work_mutex)); - pool->working_count--; + if(work.has_value) + pool->working_count--; if (!pool->stop && pool->working_count == 0 && pool->work_first == NULL) pthread_cond_signal(&(pool->working_cond)); pthread_mutex_unlock(&(pool->work_mutex));