From aae388897e8c23e0c3da3d81d41df924ab9942e6 Mon Sep 17 00:00:00 2001 From: Asger Gitz-Johansen Date: Wed, 11 Sep 2024 07:22:55 +0200 Subject: [PATCH] feat: add messagequeue api --- Makefile | 3 +- TODO.md | 1 + include/api.h | 27 ++++++++++++ include/pipeline.h | 3 +- include/util.h | 2 + src/api.c | 103 +++++++++++++++++++++++++++++++++++++++++++++ src/executor.c | 3 ++ src/main.c | 7 +++ src/pipeline.c | 10 +++++ src/sci.1 | 6 +-- src/util.c | 23 ++++++++++ 11 files changed, 183 insertions(+), 5 deletions(-) create mode 100644 include/api.h create mode 100644 src/api.c diff --git a/Makefile b/Makefile index 741ec62..bdc5072 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ CFLAGS += -D_POSIX_C_SOURCE=2 CFLAGS += -D_GNU_SOURCE CFLAGS += -Wall -Werror -std=c11 CFLAGS += -Iinclude -CFLAGS += -lpthread -luuid +CFLAGS += -lpthread -luuid -lrt CFLAGS += -fsanitize=address CFLAGS += -fsanitize=undefined CFLAGS += -g @@ -32,6 +32,7 @@ all: out/bin/sci out/obj/%.o: src/%.c | $(OBJDIR) $(CC) -c $? $(CFLAGS) -o $@ +OBJ += out/obj/api.o OBJ += out/obj/cli.o OBJ += out/obj/executor.o OBJ += out/obj/log.o diff --git a/TODO.md b/TODO.md index b576e75..dba99af 100644 --- a/TODO.md +++ b/TODO.md @@ -22,6 +22,7 @@ - [x] custom environment variable passing. Something like `-e MY_TOKEN` ala docker-style - [x] address sanitizers please. - [ ] Ninth things ninth, fix bugs, see https://git.gtz.dk/agj/sci/projects/1 + - [ ] docstring in all header files - [ ] Tenth things tenth, write manpages, choose license - [ ] Eleventh things Eleventh, polish - [ ] Twelveth things last, release! diff --git a/include/api.h b/include/api.h new file mode 100644 index 0000000..6b75679 --- /dev/null +++ b/include/api.h @@ -0,0 +1,27 @@ +#ifndef SCI_API_H +#define SCI_API_H + +// Start the api. This will also trigger an "sci started" event. +// Note that this is a blocking call. +void* api_start(void*); + +// Fork the process and have the child run the api. +void api_start_p(); + +// Destroy all listeners and release the message queue. +void api_destroy(); + +// Post a newline-separated string with pipeline_id entries +// of the currently running pipelines on the message queue. +void api_list_running_pipelines(); + +// Trigger a pipeline started event. +void api_pipeline_started(const char* pipeline_id, const char* name); + +// Trigger a pipeline ended event. +void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code); + +// Trigger an api started event. +void api_started(); + +#endif // !SCI_API_H diff --git a/include/pipeline.h b/include/pipeline.h index b4d806a..e0ebc9c 100644 --- a/include/pipeline.h +++ b/include/pipeline.h @@ -36,12 +36,13 @@ typedef struct { char* command; } pipeline_event; -// create a new pipeline_conf struct instance based on a configuration line. +// create a new `pipeline_conf` struct instance based on a configuration line. optional_pipeline_conf pipeline_create(const char* config_line); void pipeline_event_destroy(pipeline_event* ev); void pipeline_destroy(pipeline_conf* conf); void pipeline_register(pthread_t thread); void pipeline_loop(); void pipeline_cancel(); +int pipeline_count(); #endif diff --git a/include/util.h b/include/util.h index 12ccf3b..1f054f5 100644 --- a/include/util.h +++ b/include/util.h @@ -42,6 +42,8 @@ void per_line(const char* file, line_handler handler); char* join(const char* a, const char* b); char* join3(const char* a, const char* b, const char* c); char* join4(const char* a, const char* b, const char* c, const char* d); +char* join5(const char* a, const char* b, const char* c, const char* d, const char* e); +char* join6(const char* a, const char* b, const char* c, const char* d, const char* e, const char* f); const char* skip_arg(const char* cp); char* skip_spaces(const char* str); diff --git a/src/api.c b/src/api.c new file mode 100644 index 0000000..b849c1f --- /dev/null +++ b/src/api.c @@ -0,0 +1,103 @@ +#include "api.h" +#include "log.h" +#include "util.h" +#include +#include +#include +#include +#include +#include + +// TODO: Make sure to write a manpage for this api +#define LIST_REQ "list" +#define MQ_MAX_SIZE 8192 + +bool api_is_running = false; +mqd_t api_out; +mqd_t api_in; + +void api_handle_request(const char* request) { + log_trace("api request: '%s'", request); + + // list + if(strncmp(request, LIST_REQ, MQ_MAX_SIZE) == 0) { + api_list_running_pipelines(); + return; + } + + // else + log_error("unrecognized api request: '%s'", request); +} + +void* api_start(void* data) { + struct mq_attr attr; + attr.mq_flags = 0; + attr.mq_maxmsg = 10; + attr.mq_msgsize = 512; + attr.mq_curmsgs = 0; + + api_out = mq_open("/sci_tx", O_CREAT | O_WRONLY, 0666, &attr); + if(api_out == -1) { + perror("mq_open"); + return NULL; + } + + api_in = mq_open("/sci_rx", O_CREAT | O_RDONLY, 0666, &attr); // TODO: Consider some better mq names + if(api_in == -1) { + perror("mq_open"); + return NULL; + } + + api_started(); + + log_info("api listening for requests"); + char msg[MQ_MAX_SIZE]; + api_is_running = true; + while(api_is_running) { + memset(msg, '\0', MQ_MAX_SIZE); + if(mq_receive(api_in, msg, MQ_MAX_SIZE, NULL) == -1) { + perror("mq_receive"); + return NULL; + } + api_handle_request(msg); + } + return NULL; +} + +void api_start_p() { + pthread_t conf_listener; + ASSERT_SYSCALL_SUCCESS(pthread_create(&conf_listener, NULL, &api_start, (void*)NULL)); + pthread_setname_np(conf_listener, "sci-api"); +} + +void api_destroy() { + log_trace("closing api"); + api_is_running = false; + mq_unlink("/sci"); +} + +void api_list_running_pipelines() { + // TODO: you need a way of enumerating the pipeline ids before this can be implemented. + log_error("cannot list running pipelines yet, feature is work-in-progress."); +} + +void api_pipeline_started(const char* pipeline_id, const char* name) { + char* msg = join("pipeline_new ", pipeline_id); + if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1) + perror("mq_send"); + free(msg); +} + +void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code) { + char exit_code_str[64]; + sprintf(exit_code_str, " %d", exit_code); + char* msg = join6("pipeline_end ", pipeline_id, " ", name, " ", exit_code_str); + if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1) + perror("mq_send"); + free(msg); +} + +void api_started() { + if(mq_send(api_out, "sci_started", 12, 1) == -1) + perror("mq_send"); +} diff --git a/src/executor.c b/src/executor.c index c5820e8..5a5f0c6 100644 --- a/src/executor.c +++ b/src/executor.c @@ -16,6 +16,7 @@ along with this program. If not, see . */ #include "executor.h" +#include "api.h" #include "log.h" #include "optional.h" #include "pipeline.h" @@ -155,11 +156,13 @@ void executor(void* data) { return; } + api_pipeline_started(pipeline_id, e->name); log_info("{%s} (%s) spawned", pipeline_id, e->name); // Wait for process to complete int status; waitpid(pid, &status, 0); + api_pipeline_ended(pipeline_id, e->name, status); log_info("{%s} (%s) [pid=%d] exited with status %d", pipeline_id, e->name, pid, status); char buf[32]; sprintf(buf, "exited with status %d", status); diff --git a/src/main.c b/src/main.c index 93c27c1..6eabfb5 100644 --- a/src/main.c +++ b/src/main.c @@ -15,6 +15,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . */ +#include "api.h" #include "cli.h" #include "executor.h" #include "log.h" @@ -22,10 +23,12 @@ #include "pipeline.h" #include "threadpool.h" #include "util.h" +#include #include #include #include #include +#include threadpool* worker_pool = NULL; char* trigger_dir = "/tmp/sci"; @@ -117,6 +120,7 @@ int main(int argc, char** argv) { fprintf(stderr, "no pipeline config file provided see -h for usage\n"); exit(EXIT_FAILURE); } + if(access(args.config_file.value, F_OK) != 0) { fprintf(stderr, "no such file or directory %s\n", args.config_file.value); exit(EXIT_FAILURE); @@ -134,6 +138,8 @@ int main(int argc, char** argv) { if(args.environment_vars.has_value) set_shared_environment(args.environment_vars.value); + + api_start_p(); log_info("spawning trigger thread for config file"); pthread_t conf_listener; @@ -149,4 +155,5 @@ int main(int argc, char** argv) { pthread_cancel(conf_listener); threadpool_destroy(worker_pool); destroy_options(args); + api_destroy(); } diff --git a/src/pipeline.c b/src/pipeline.c index dbd41bb..824127d 100644 --- a/src/pipeline.c +++ b/src/pipeline.c @@ -113,3 +113,13 @@ void pipeline_event_destroy(pipeline_event* ev) { free(ev->command); free(ev); } + +int pipeline_count() { + int result = 0; + pthread_list_node* cursor = root; + while(cursor != NULL) { + cursor = cursor->next; + result++; + } + return result; +} diff --git a/src/sci.1 b/src/sci.1 index f7f3d2a..443b147 100644 --- a/src/sci.1 +++ b/src/sci.1 @@ -42,7 +42,7 @@ The operation of is configured through a .I pipelines.conf configuration file (see -.I sci(7) +.BR sci (5) for configuration language details) and each pipeline will have an associated pipeline trigger file that can be By default, pipeline triggers are placed in /tmp/sci but this can be overridden with the @@ -85,5 +85,5 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . -.SH SEE ALSO -\" TODO: write sci(7) +.SH "SEE ALSO" +.BR sci (5), diff --git a/src/util.c b/src/util.c index f4bd13f..9c8e147 100644 --- a/src/util.c +++ b/src/util.c @@ -82,6 +82,29 @@ char* join4(const char* a, const char* b, const char* c, const char* d) { return result; } +char* join5(const char* a, const char* b, const char* c, const char* d, const char* e) { + size_t alen = strlen(a); + size_t blen = strlen(b); + size_t clen = strlen(c); + size_t dlen = strlen(d); + size_t elen = strlen(e); + char* result = malloc(alen + blen + clen + dlen + elen + 1); + sprintf(result, "%s%s%s%s%s", a, b, c, d, e); + return result; +} + +char* join6(const char* a, const char* b, const char* c, const char* d, const char* e, const char* f) { + size_t alen = strlen(a); + size_t blen = strlen(b); + size_t clen = strlen(c); + size_t dlen = strlen(d); + size_t elen = strlen(e); + size_t flen = strlen(f); + char* result = malloc(alen + blen + clen + dlen + elen + flen + 1); + sprintf(result, "%s%s%s%s%s%s", a, b, c, d, e, f); + return result; +} + const char* skip_arg(const char* cp) { while(*cp && !isspace(*cp)) cp++;