feat: add messagequeue api

This commit is contained in:
Asger Gitz-Johansen 2024-09-11 07:22:55 +02:00
parent 99174939f5
commit aae388897e
11 changed files with 183 additions and 5 deletions

View File

@ -20,7 +20,7 @@ CFLAGS += -D_POSIX_C_SOURCE=2
CFLAGS += -D_GNU_SOURCE CFLAGS += -D_GNU_SOURCE
CFLAGS += -Wall -Werror -std=c11 CFLAGS += -Wall -Werror -std=c11
CFLAGS += -Iinclude CFLAGS += -Iinclude
CFLAGS += -lpthread -luuid CFLAGS += -lpthread -luuid -lrt
CFLAGS += -fsanitize=address CFLAGS += -fsanitize=address
CFLAGS += -fsanitize=undefined CFLAGS += -fsanitize=undefined
CFLAGS += -g CFLAGS += -g
@ -32,6 +32,7 @@ all: out/bin/sci
out/obj/%.o: src/%.c | $(OBJDIR) out/obj/%.o: src/%.c | $(OBJDIR)
$(CC) -c $? $(CFLAGS) -o $@ $(CC) -c $? $(CFLAGS) -o $@
OBJ += out/obj/api.o
OBJ += out/obj/cli.o OBJ += out/obj/cli.o
OBJ += out/obj/executor.o OBJ += out/obj/executor.o
OBJ += out/obj/log.o OBJ += out/obj/log.o

View File

@ -22,6 +22,7 @@
- [x] custom environment variable passing. Something like `-e MY_TOKEN` ala docker-style - [x] custom environment variable passing. Something like `-e MY_TOKEN` ala docker-style
- [x] address sanitizers please. - [x] address sanitizers please.
- [ ] Ninth things ninth, fix bugs, see https://git.gtz.dk/agj/sci/projects/1 - [ ] Ninth things ninth, fix bugs, see https://git.gtz.dk/agj/sci/projects/1
- [ ] docstring in all header files
- [ ] Tenth things tenth, write manpages, choose license - [ ] Tenth things tenth, write manpages, choose license
- [ ] Eleventh things Eleventh, polish - [ ] Eleventh things Eleventh, polish
- [ ] Twelveth things last, release! - [ ] Twelveth things last, release!

27
include/api.h Normal file
View File

@ -0,0 +1,27 @@
#ifndef SCI_API_H
#define SCI_API_H
// Start the api. This will also trigger an "sci started" event.
// Note that this is a blocking call.
void* api_start(void*);
// Fork the process and have the child run the api.
void api_start_p();
// Destroy all listeners and release the message queue.
void api_destroy();
// Post a newline-separated string with pipeline_id entries
// of the currently running pipelines on the message queue.
void api_list_running_pipelines();
// Trigger a pipeline started event.
void api_pipeline_started(const char* pipeline_id, const char* name);
// Trigger a pipeline ended event.
void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code);
// Trigger an api started event.
void api_started();
#endif // !SCI_API_H

View File

@ -36,12 +36,13 @@ typedef struct {
char* command; char* command;
} pipeline_event; } pipeline_event;
// create a new pipeline_conf struct instance based on a configuration line. // create a new `pipeline_conf` struct instance based on a configuration line.
optional_pipeline_conf pipeline_create(const char* config_line); optional_pipeline_conf pipeline_create(const char* config_line);
void pipeline_event_destroy(pipeline_event* ev); void pipeline_event_destroy(pipeline_event* ev);
void pipeline_destroy(pipeline_conf* conf); void pipeline_destroy(pipeline_conf* conf);
void pipeline_register(pthread_t thread); void pipeline_register(pthread_t thread);
void pipeline_loop(); void pipeline_loop();
void pipeline_cancel(); void pipeline_cancel();
int pipeline_count();
#endif #endif

View File

@ -42,6 +42,8 @@ void per_line(const char* file, line_handler handler);
char* join(const char* a, const char* b); char* join(const char* a, const char* b);
char* join3(const char* a, const char* b, const char* c); char* join3(const char* a, const char* b, const char* c);
char* join4(const char* a, const char* b, const char* c, const char* d); char* join4(const char* a, const char* b, const char* c, const char* d);
char* join5(const char* a, const char* b, const char* c, const char* d, const char* e);
char* join6(const char* a, const char* b, const char* c, const char* d, const char* e, const char* f);
const char* skip_arg(const char* cp); const char* skip_arg(const char* cp);
char* skip_spaces(const char* str); char* skip_spaces(const char* str);

103
src/api.c Normal file
View File

@ -0,0 +1,103 @@
#include "api.h"
#include "log.h"
#include "util.h"
#include <linux/limits.h>
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// TODO: Make sure to write a manpage for this api
#define LIST_REQ "list"
#define MQ_MAX_SIZE 8192
bool api_is_running = false;
mqd_t api_out;
mqd_t api_in;
void api_handle_request(const char* request) {
log_trace("api request: '%s'", request);
// list
if(strncmp(request, LIST_REQ, MQ_MAX_SIZE) == 0) {
api_list_running_pipelines();
return;
}
// else
log_error("unrecognized api request: '%s'", request);
}
void* api_start(void* data) {
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 512;
attr.mq_curmsgs = 0;
api_out = mq_open("/sci_tx", O_CREAT | O_WRONLY, 0666, &attr);
if(api_out == -1) {
perror("mq_open");
return NULL;
}
api_in = mq_open("/sci_rx", O_CREAT | O_RDONLY, 0666, &attr); // TODO: Consider some better mq names
if(api_in == -1) {
perror("mq_open");
return NULL;
}
api_started();
log_info("api listening for requests");
char msg[MQ_MAX_SIZE];
api_is_running = true;
while(api_is_running) {
memset(msg, '\0', MQ_MAX_SIZE);
if(mq_receive(api_in, msg, MQ_MAX_SIZE, NULL) == -1) {
perror("mq_receive");
return NULL;
}
api_handle_request(msg);
}
return NULL;
}
void api_start_p() {
pthread_t conf_listener;
ASSERT_SYSCALL_SUCCESS(pthread_create(&conf_listener, NULL, &api_start, (void*)NULL));
pthread_setname_np(conf_listener, "sci-api");
}
void api_destroy() {
log_trace("closing api");
api_is_running = false;
mq_unlink("/sci");
}
void api_list_running_pipelines() {
// TODO: you need a way of enumerating the pipeline ids before this can be implemented.
log_error("cannot list running pipelines yet, feature is work-in-progress.");
}
void api_pipeline_started(const char* pipeline_id, const char* name) {
char* msg = join("pipeline_new ", pipeline_id);
if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1)
perror("mq_send");
free(msg);
}
void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code) {
char exit_code_str[64];
sprintf(exit_code_str, " %d", exit_code);
char* msg = join6("pipeline_end ", pipeline_id, " ", name, " ", exit_code_str);
if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1)
perror("mq_send");
free(msg);
}
void api_started() {
if(mq_send(api_out, "sci_started", 12, 1) == -1)
perror("mq_send");
}

View File

@ -16,6 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include "executor.h" #include "executor.h"
#include "api.h"
#include "log.h" #include "log.h"
#include "optional.h" #include "optional.h"
#include "pipeline.h" #include "pipeline.h"
@ -155,11 +156,13 @@ void executor(void* data) {
return; return;
} }
api_pipeline_started(pipeline_id, e->name);
log_info("{%s} (%s) spawned", pipeline_id, e->name); log_info("{%s} (%s) spawned", pipeline_id, e->name);
// Wait for process to complete // Wait for process to complete
int status; int status;
waitpid(pid, &status, 0); waitpid(pid, &status, 0);
api_pipeline_ended(pipeline_id, e->name, status);
log_info("{%s} (%s) [pid=%d] exited with status %d", pipeline_id, e->name, pid, status); log_info("{%s} (%s) [pid=%d] exited with status %d", pipeline_id, e->name, pid, status);
char buf[32]; char buf[32];
sprintf(buf, "exited with status %d", status); sprintf(buf, "exited with status %d", status);

View File

@ -15,6 +15,7 @@
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include "api.h"
#include "cli.h" #include "cli.h"
#include "executor.h" #include "executor.h"
#include "log.h" #include "log.h"
@ -22,10 +23,12 @@
#include "pipeline.h" #include "pipeline.h"
#include "threadpool.h" #include "threadpool.h"
#include "util.h" #include "util.h"
#include <mqueue.h>
#include <signal.h> #include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <time.h>
threadpool* worker_pool = NULL; threadpool* worker_pool = NULL;
char* trigger_dir = "/tmp/sci"; char* trigger_dir = "/tmp/sci";
@ -117,6 +120,7 @@ int main(int argc, char** argv) {
fprintf(stderr, "no pipeline config file provided see -h for usage\n"); fprintf(stderr, "no pipeline config file provided see -h for usage\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if(access(args.config_file.value, F_OK) != 0) { if(access(args.config_file.value, F_OK) != 0) {
fprintf(stderr, "no such file or directory %s\n", args.config_file.value); fprintf(stderr, "no such file or directory %s\n", args.config_file.value);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -134,6 +138,8 @@ int main(int argc, char** argv) {
if(args.environment_vars.has_value) if(args.environment_vars.has_value)
set_shared_environment(args.environment_vars.value); set_shared_environment(args.environment_vars.value);
api_start_p();
log_info("spawning trigger thread for config file"); log_info("spawning trigger thread for config file");
pthread_t conf_listener; pthread_t conf_listener;
@ -149,4 +155,5 @@ int main(int argc, char** argv) {
pthread_cancel(conf_listener); pthread_cancel(conf_listener);
threadpool_destroy(worker_pool); threadpool_destroy(worker_pool);
destroy_options(args); destroy_options(args);
api_destroy();
} }

View File

@ -113,3 +113,13 @@ void pipeline_event_destroy(pipeline_event* ev) {
free(ev->command); free(ev->command);
free(ev); free(ev);
} }
int pipeline_count() {
int result = 0;
pthread_list_node* cursor = root;
while(cursor != NULL) {
cursor = cursor->next;
result++;
}
return result;
}

View File

@ -42,7 +42,7 @@ The operation of
is configured through a is configured through a
.I pipelines.conf .I pipelines.conf
configuration file (see configuration file (see
.I sci(7) .BR sci (5)
for configuration language details) for configuration language details)
and each pipeline will have an associated pipeline trigger file that can be and each pipeline will have an associated pipeline trigger file that can be
By default, pipeline triggers are placed in /tmp/sci but this can be overridden with the By default, pipeline triggers are placed in /tmp/sci but this can be overridden with the
@ -85,5 +85,5 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
.SH SEE ALSO .SH "SEE ALSO"
\" TODO: write sci(7) .BR sci (5),

View File

@ -82,6 +82,29 @@ char* join4(const char* a, const char* b, const char* c, const char* d) {
return result; return result;
} }
char* join5(const char* a, const char* b, const char* c, const char* d, const char* e) {
size_t alen = strlen(a);
size_t blen = strlen(b);
size_t clen = strlen(c);
size_t dlen = strlen(d);
size_t elen = strlen(e);
char* result = malloc(alen + blen + clen + dlen + elen + 1);
sprintf(result, "%s%s%s%s%s", a, b, c, d, e);
return result;
}
char* join6(const char* a, const char* b, const char* c, const char* d, const char* e, const char* f) {
size_t alen = strlen(a);
size_t blen = strlen(b);
size_t clen = strlen(c);
size_t dlen = strlen(d);
size_t elen = strlen(e);
size_t flen = strlen(f);
char* result = malloc(alen + blen + clen + dlen + elen + flen + 1);
sprintf(result, "%s%s%s%s%s%s", a, b, c, d, e, f);
return result;
}
const char* skip_arg(const char* cp) { const char* skip_arg(const char* cp) {
while(*cp && !isspace(*cp)) while(*cp && !isspace(*cp))
cp++; cp++;