wip: fix existing todos
Co-authored-by: Github CoPilot
This commit is contained in:
@@ -2,6 +2,8 @@
|
|||||||
set -e
|
set -e
|
||||||
echo ">>> checking if required environment is set..."
|
echo ">>> checking if required environment is set..."
|
||||||
test -n "$DOCKER_TOKEN"
|
test -n "$DOCKER_TOKEN"
|
||||||
|
: "${PACKAGE_REGISTRY_USER:=agj}"
|
||||||
|
export PACKAGE_REGISTRY_USER
|
||||||
which make
|
which make
|
||||||
|
|
||||||
echo ">>> compiling..."
|
echo ">>> compiling..."
|
||||||
@@ -30,7 +32,7 @@ echo ">>> building debbuilder image..."
|
|||||||
docker build -t debbuilder -f deb-builder.dockerfile .
|
docker build -t debbuilder -f deb-builder.dockerfile .
|
||||||
|
|
||||||
echo ">>> building .deb in debbuilder docker image..."
|
echo ">>> building .deb in debbuilder docker image..."
|
||||||
docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN debbuilder sh -c '\
|
docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN -e PACKAGE_REGISTRY_USER debbuilder sh -c '\
|
||||||
cd && \
|
cd && \
|
||||||
mkdir -p artifacts && \
|
mkdir -p artifacts && \
|
||||||
cp /src/sci-$VERSION.tar.gz . && \
|
cp /src/sci-$VERSION.tar.gz . && \
|
||||||
@@ -47,8 +49,7 @@ docker run --rm -it -v .:/src -e VERSION -e DOCKER_TOKEN debbuilder sh -c '\
|
|||||||
cp ../*.tar.xz ~/artifacts && \
|
cp ../*.tar.xz ~/artifacts && \
|
||||||
cp ../*.tar.gz ~/artifacts && \
|
cp ../*.tar.gz ~/artifacts && \
|
||||||
cd && \
|
cd && \
|
||||||
curl --user agj:$DOCKER_TOKEN \
|
curl --user "$PACKAGE_REGISTRY_USER:$DOCKER_TOKEN" \
|
||||||
--upload-file sci_$VERSION-1_amd64.deb \
|
--upload-file sci_$VERSION-1_amd64.deb \
|
||||||
"https://git.gtz.dk/api/packages/agj/debian/pool/bionic/main/upload"
|
"https://git.gtz.dk/api/packages/agj/debian/pool/bionic/main/upload"
|
||||||
'
|
'
|
||||||
# TODO: push-user should be some sci-bot or something, not your account. This will do for now though
|
|
||||||
|
|||||||
@@ -85,6 +85,9 @@ install: out/bin/sci
|
|||||||
mkdir -p $(DESTDIR)$(MANPREFIX)/man1
|
mkdir -p $(DESTDIR)$(MANPREFIX)/man1
|
||||||
sed "s/VERSION/$(VERSION)/g" < src/sci.1 > $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
sed "s/VERSION/$(VERSION)/g" < src/sci.1 > $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
||||||
chmod 644 $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
chmod 644 $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
||||||
|
mkdir -p $(DESTDIR)$(MANPREFIX)/man7
|
||||||
|
sed "s/VERSION/$(VERSION)/g" < src/sci-api.7 > $(DESTDIR)$(MANPREFIX)/man7/sci-api.7
|
||||||
|
chmod 644 $(DESTDIR)$(MANPREFIX)/man7/sci-api.7
|
||||||
|
|
||||||
uninstall:
|
uninstall:
|
||||||
# uninstall binaries
|
# uninstall binaries
|
||||||
@@ -93,3 +96,4 @@ uninstall:
|
|||||||
# uninstall services (only if system is using systemd though)
|
# uninstall services (only if system is using systemd though)
|
||||||
# uninstall manpages
|
# uninstall manpages
|
||||||
rm -f $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
rm -f $(DESTDIR)$(MANPREFIX)/man1/sci.1
|
||||||
|
rm -f $(DESTDIR)$(MANPREFIX)/man7/sci-api.7
|
||||||
|
|||||||
@@ -50,8 +50,9 @@ sudo make install PREFIX=/some/path
|
|||||||
|
|
||||||
### Arch Linux
|
### Arch Linux
|
||||||
It is recommended that you use an arch linux distribution when building.
|
It is recommended that you use an arch linux distribution when building.
|
||||||
<!-- TODO: add a dockerfile for arch building -->
|
The `arch-builder.dockerfile` image provides a repeatable build environment.
|
||||||
```sh
|
```sh
|
||||||
|
docker build -t archbuilder -f arch-builder.dockerfile .
|
||||||
```
|
```
|
||||||
|
|
||||||
### Debian
|
### Debian
|
||||||
@@ -65,7 +66,8 @@ docker build -t debbuilder deb-builder.dockerfile .
|
|||||||
```
|
```
|
||||||
|
|
||||||
## Brainstorm
|
## Brainstorm
|
||||||
If you dont want to congest your CI server. Too bad. Write faster ci suites. (TODO: implement runners)
|
If you dont want to congest your CI server, keep pipeline scripts fast or run them on separate machines.
|
||||||
|
Runner orchestration is intentionally outside the current core daemon.
|
||||||
|
|
||||||
I would like to try to avoid writing a million REST APIs, as that just results in bloat usually.
|
I would like to try to avoid writing a million REST APIs, as that just results in bloat usually.
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@
|
|||||||
// doubly linked list implementation for managing threads
|
// doubly linked list implementation for managing threads
|
||||||
typedef struct pthread_list_node {
|
typedef struct pthread_list_node {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
struct pthread_list_node* previous;
|
struct pthread_list_node* previous;
|
||||||
struct pthread_list_node* next;
|
struct pthread_list_node* next;
|
||||||
} pthread_list_node;
|
} pthread_list_node;
|
||||||
|
|||||||
@@ -8,13 +8,61 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
// TODO: Make sure to write a manpage for this api
|
|
||||||
#define LIST_REQ "list"
|
#define LIST_REQ "list"
|
||||||
#define MQ_MAX_SIZE 8192
|
#define MQ_MAX_SIZE 8192
|
||||||
|
#define SCI_API_EVENTS_QUEUE "/sci_events"
|
||||||
|
#define SCI_API_REQUESTS_QUEUE "/sci_requests"
|
||||||
|
|
||||||
bool api_is_running = false;
|
bool api_is_running = false;
|
||||||
mqd_t api_out;
|
mqd_t api_out = (mqd_t)-1;
|
||||||
mqd_t api_in;
|
mqd_t api_in = (mqd_t)-1;
|
||||||
|
|
||||||
|
typedef struct api_pipeline_node {
|
||||||
|
char* pipeline_id;
|
||||||
|
char* name;
|
||||||
|
struct api_pipeline_node* next;
|
||||||
|
} api_pipeline_node;
|
||||||
|
|
||||||
|
pthread_mutex_t api_pipeline_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
api_pipeline_node* api_running_pipelines = NULL;
|
||||||
|
|
||||||
|
static void api_send(const char* msg) {
|
||||||
|
if(mq_send(api_out, msg, strnlen(msg, MQ_MAX_SIZE), 1) == -1)
|
||||||
|
perror("mq_send");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void api_register_pipeline(const char* pipeline_id, const char* name) {
|
||||||
|
api_pipeline_node* node = malloc(sizeof(api_pipeline_node));
|
||||||
|
node->pipeline_id = strdup(pipeline_id);
|
||||||
|
node->name = strdup(name);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&api_pipeline_mutex);
|
||||||
|
node->next = api_running_pipelines;
|
||||||
|
api_running_pipelines = node;
|
||||||
|
pthread_mutex_unlock(&api_pipeline_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void api_unregister_pipeline(const char* pipeline_id) {
|
||||||
|
pthread_mutex_lock(&api_pipeline_mutex);
|
||||||
|
api_pipeline_node* previous = NULL;
|
||||||
|
api_pipeline_node* cursor = api_running_pipelines;
|
||||||
|
while(cursor != NULL) {
|
||||||
|
if(strncmp(cursor->pipeline_id, pipeline_id, MQ_MAX_SIZE) == 0) {
|
||||||
|
if(previous == NULL)
|
||||||
|
api_running_pipelines = cursor->next;
|
||||||
|
else
|
||||||
|
previous->next = cursor->next;
|
||||||
|
free(cursor->pipeline_id);
|
||||||
|
free(cursor->name);
|
||||||
|
free(cursor);
|
||||||
|
pthread_mutex_unlock(&api_pipeline_mutex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
previous = cursor;
|
||||||
|
cursor = cursor->next;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&api_pipeline_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
void api_handle_request(const char* request) {
|
void api_handle_request(const char* request) {
|
||||||
log_trace("api request: '%s'", request);
|
log_trace("api request: '%s'", request);
|
||||||
@@ -33,16 +81,16 @@ void* api_start(void* data) {
|
|||||||
struct mq_attr attr;
|
struct mq_attr attr;
|
||||||
attr.mq_flags = 0;
|
attr.mq_flags = 0;
|
||||||
attr.mq_maxmsg = 10;
|
attr.mq_maxmsg = 10;
|
||||||
attr.mq_msgsize = 512;
|
attr.mq_msgsize = MQ_MAX_SIZE;
|
||||||
attr.mq_curmsgs = 0;
|
attr.mq_curmsgs = 0;
|
||||||
|
|
||||||
api_out = mq_open("/sci_tx", O_CREAT | O_WRONLY, 0666, &attr);
|
api_out = mq_open(SCI_API_EVENTS_QUEUE, O_CREAT | O_WRONLY, 0666, &attr);
|
||||||
if(api_out == -1) {
|
if(api_out == -1) {
|
||||||
perror("mq_open");
|
perror("mq_open");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
api_in = mq_open("/sci_rx", O_CREAT | O_RDONLY, 0666, &attr); // TODO: Consider some better mq names
|
api_in = mq_open(SCI_API_REQUESTS_QUEUE, O_CREAT | O_RDONLY, 0666, &attr);
|
||||||
if(api_in == -1) {
|
if(api_in == -1) {
|
||||||
perror("mq_open");
|
perror("mq_open");
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -73,31 +121,53 @@ void api_start_p() {
|
|||||||
void api_destroy() {
|
void api_destroy() {
|
||||||
log_trace("closing api");
|
log_trace("closing api");
|
||||||
api_is_running = false;
|
api_is_running = false;
|
||||||
mq_unlink("/sci");
|
if(api_in != (mqd_t)-1)
|
||||||
|
mq_close(api_in);
|
||||||
|
if(api_out != (mqd_t)-1)
|
||||||
|
mq_close(api_out);
|
||||||
|
mq_unlink(SCI_API_REQUESTS_QUEUE);
|
||||||
|
mq_unlink(SCI_API_EVENTS_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
void api_list_running_pipelines() {
|
void api_list_running_pipelines() {
|
||||||
// TODO: you need a way of enumerating the pipeline ids before this can be implemented.
|
char msg[MQ_MAX_SIZE];
|
||||||
log_error("cannot list running pipelines yet, feature is work-in-progress.");
|
msg[0] = '\0';
|
||||||
|
size_t used = 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&api_pipeline_mutex);
|
||||||
|
api_pipeline_node* cursor = api_running_pipelines;
|
||||||
|
while(cursor != NULL) {
|
||||||
|
int written = snprintf(msg + used, MQ_MAX_SIZE - used, "%s%s", used == 0 ? "" : "\n", cursor->pipeline_id);
|
||||||
|
if(written < 0)
|
||||||
|
break;
|
||||||
|
if((size_t)written >= MQ_MAX_SIZE - used) {
|
||||||
|
log_error("running pipeline list exceeds api message size");
|
||||||
|
msg[MQ_MAX_SIZE - 1] = '\0';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
used += written;
|
||||||
|
cursor = cursor->next;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&api_pipeline_mutex);
|
||||||
|
api_send(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void api_pipeline_started(const char* pipeline_id, const char* name) {
|
void api_pipeline_started(const char* pipeline_id, const char* name) {
|
||||||
|
api_register_pipeline(pipeline_id, name);
|
||||||
char* msg = join("pipeline_new ", pipeline_id);
|
char* msg = join("pipeline_new ", pipeline_id);
|
||||||
if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1)
|
api_send(msg);
|
||||||
perror("mq_send");
|
|
||||||
free(msg);
|
free(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code) {
|
void api_pipeline_ended(const char* pipeline_id, const char* name, int exit_code) {
|
||||||
|
api_unregister_pipeline(pipeline_id);
|
||||||
char exit_code_str[64];
|
char exit_code_str[64];
|
||||||
sprintf(exit_code_str, " %d", exit_code);
|
snprintf(exit_code_str, sizeof(exit_code_str), "%d", exit_code);
|
||||||
char* msg = join6("pipeline_end ", pipeline_id, " ", name, " ", exit_code_str);
|
char* msg = join6("pipeline_end ", pipeline_id, " ", name, " ", exit_code_str);
|
||||||
if(mq_send(api_out, msg, strnlen(msg, 256), 1) == -1)
|
api_send(msg);
|
||||||
perror("mq_send");
|
|
||||||
free(msg);
|
free(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void api_started() {
|
void api_started() {
|
||||||
if(mq_send(api_out, "sci_started", 12, 1) == -1)
|
api_send("sci_started");
|
||||||
perror("mq_send");
|
|
||||||
}
|
}
|
||||||
|
|||||||
+35
-21
@@ -18,30 +18,15 @@
|
|||||||
#include "notify.h"
|
#include "notify.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
#define EV_SIZE sizeof(struct inotify_event)
|
#define EV_SIZE sizeof(struct inotify_event)
|
||||||
#define BUF_LEN EV_SIZE * 32
|
#define BUF_LEN EV_SIZE * 32
|
||||||
|
|
||||||
void listen_for_changes(const pipeline_conf* config, notify_callback callback) {
|
static void dispatch_pipeline_events(const pipeline_conf* config, notify_callback callback, char* buffer, ssize_t len) {
|
||||||
// TODO: callback is potentially slow. We should also poll once after calling callback
|
for(int i = 0; i < len; ) {
|
||||||
const char* filename = config->trigger;
|
|
||||||
if(access(filename, F_OK) != 0) {
|
|
||||||
log_trace("file does not exist yet, creating it.");
|
|
||||||
FILE* f = fopen(filename, "w+");
|
|
||||||
fclose(f);
|
|
||||||
}
|
|
||||||
int fd = inotify_init();
|
|
||||||
ASSERT_SYSCALL_SUCCESS(fd);
|
|
||||||
inotify_add_watch(fd, filename, IN_ATTRIB);
|
|
||||||
log_trace("listening for changes in file: %s", filename);
|
|
||||||
char buffer[BUF_LEN];
|
|
||||||
int r = read(fd, buffer, BUF_LEN);
|
|
||||||
if(r == -1) {
|
|
||||||
perror("read");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for(int i = 0; i < r; ) {
|
|
||||||
struct inotify_event* e = (struct inotify_event*)&buffer[i];
|
struct inotify_event* e = (struct inotify_event*)&buffer[i];
|
||||||
pipeline_event* ev = malloc(sizeof(pipeline_event));
|
pipeline_event* ev = malloc(sizeof(pipeline_event));
|
||||||
ev->event = e;
|
ev->event = e;
|
||||||
@@ -52,6 +37,35 @@ void listen_for_changes(const pipeline_conf* config, notify_callback callback) {
|
|||||||
callback(ev);
|
callback(ev);
|
||||||
i += EV_SIZE + e->len;
|
i += EV_SIZE + e->len;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void listen_for_changes(const pipeline_conf* config, notify_callback callback) {
|
||||||
|
const char* filename = config->trigger;
|
||||||
|
if(access(filename, F_OK) != 0) {
|
||||||
|
log_trace("file does not exist yet, creating it.");
|
||||||
|
FILE* f = fopen(filename, "w+");
|
||||||
|
fclose(f);
|
||||||
|
}
|
||||||
|
int fd = inotify_init();
|
||||||
|
ASSERT_SYSCALL_SUCCESS(fd);
|
||||||
|
ASSERT_SYSCALL_SUCCESS(inotify_add_watch(fd, filename, IN_ATTRIB));
|
||||||
|
log_trace("listening for changes in file: %s", filename);
|
||||||
|
char buffer[BUF_LEN];
|
||||||
|
ssize_t r = read(fd, buffer, BUF_LEN);
|
||||||
|
if(r == -1) {
|
||||||
|
perror("read");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dispatch_pipeline_events(config, callback, buffer, r);
|
||||||
|
|
||||||
|
int flags = fcntl(fd, F_GETFL, 0);
|
||||||
|
if(flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1) {
|
||||||
|
while((r = read(fd, buffer, BUF_LEN)) > 0)
|
||||||
|
dispatch_pipeline_events(config, callback, buffer, r);
|
||||||
|
if(r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
|
||||||
|
perror("read");
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT_SYSCALL_SUCCESS(close(fd));
|
ASSERT_SYSCALL_SUCCESS(close(fd));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -62,10 +76,10 @@ void listen_for_config_changes(const char* config_filepath, config_change_callba
|
|||||||
}
|
}
|
||||||
int fd = inotify_init();
|
int fd = inotify_init();
|
||||||
ASSERT_SYSCALL_SUCCESS(fd);
|
ASSERT_SYSCALL_SUCCESS(fd);
|
||||||
inotify_add_watch(fd, config_filepath, IN_ATTRIB);
|
ASSERT_SYSCALL_SUCCESS(inotify_add_watch(fd, config_filepath, IN_ATTRIB));
|
||||||
log_trace("listening for changes in file: %s", config_filepath);
|
log_trace("listening for changes in file: %s", config_filepath);
|
||||||
char buffer[BUF_LEN];
|
char buffer[BUF_LEN];
|
||||||
int r = read(fd, buffer, BUF_LEN);
|
ssize_t r = read(fd, buffer, BUF_LEN);
|
||||||
if(r == -1) {
|
if(r == -1) {
|
||||||
perror("read");
|
perror("read");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
.TH sci-api 7 2024-08-17 "VERSION" "Simple CI manual"
|
||||||
|
|
||||||
|
.SH NAME
|
||||||
|
sci-api - POSIX message queue interface for sci
|
||||||
|
|
||||||
|
.SH DESCRIPTION
|
||||||
|
.B sci
|
||||||
|
exposes a small local API through POSIX message queues.
|
||||||
|
Clients send requests to
|
||||||
|
.I /sci_requests
|
||||||
|
and receive events and responses from
|
||||||
|
.I /sci_events.
|
||||||
|
|
||||||
|
.SH REQUESTS
|
||||||
|
.TP
|
||||||
|
.B list
|
||||||
|
Return the currently running pipeline IDs as a newline-separated message.
|
||||||
|
An empty message means no pipelines are running.
|
||||||
|
|
||||||
|
.SH EVENTS
|
||||||
|
.TP
|
||||||
|
.B sci_started
|
||||||
|
The daemon API thread has started.
|
||||||
|
.TP
|
||||||
|
.BI pipeline_new " pipeline_id"
|
||||||
|
A pipeline process has started.
|
||||||
|
.TP
|
||||||
|
.BI pipeline_end " pipeline_id name exit_status"
|
||||||
|
A pipeline process has exited.
|
||||||
|
|
||||||
|
.SH LIMITS
|
||||||
|
Messages are limited to 8192 bytes.
|
||||||
|
|
||||||
|
.SH SEE ALSO
|
||||||
|
.BR sci (1)
|
||||||
@@ -87,3 +87,4 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
|
|
||||||
.SH "SEE ALSO"
|
.SH "SEE ALSO"
|
||||||
.BR sci (5),
|
.BR sci (5),
|
||||||
|
.BR sci-api (7),
|
||||||
|
|||||||
+21
-9
@@ -18,19 +18,23 @@
|
|||||||
#include "threadlist.h"
|
#include "threadlist.h"
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
// TODO: mutex should really be per-list.
|
static pthread_list_node* threadlist_root(pthread_list_node* node) {
|
||||||
pthread_mutex_t threadlist_mutex = PTHREAD_MUTEX_INITIALIZER;
|
while(node != NULL && node->previous != NULL)
|
||||||
|
node = node->previous;
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
pthread_list_node* create_thread_node(pthread_t thread) {
|
pthread_list_node* create_thread_node(pthread_t thread) {
|
||||||
pthread_list_node* new = malloc(sizeof(pthread_list_node));
|
pthread_list_node* new = malloc(sizeof(pthread_list_node));
|
||||||
new->previous = NULL;
|
new->previous = NULL;
|
||||||
new->next = NULL;
|
new->next = NULL;
|
||||||
new->thread = thread;
|
new->thread = thread;
|
||||||
|
pthread_mutex_init(&new->mutex, NULL);
|
||||||
return new;
|
return new;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) {
|
pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) {
|
||||||
pthread_mutex_lock(&threadlist_mutex);
|
pthread_mutex_lock(&root->mutex);
|
||||||
pthread_list_node* cursor = root;
|
pthread_list_node* cursor = root;
|
||||||
while(cursor->next != NULL)
|
while(cursor->next != NULL)
|
||||||
cursor = cursor->next;
|
cursor = cursor->next;
|
||||||
@@ -38,41 +42,49 @@ pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) {
|
|||||||
new->previous = cursor;
|
new->previous = cursor;
|
||||||
new->next = NULL;
|
new->next = NULL;
|
||||||
new->thread = thread;
|
new->thread = thread;
|
||||||
|
pthread_mutex_init(&new->mutex, NULL);
|
||||||
cursor->next = new;
|
cursor->next = new;
|
||||||
pthread_mutex_unlock(&threadlist_mutex);
|
pthread_mutex_unlock(&root->mutex);
|
||||||
return new;
|
return new;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node) {
|
pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node) {
|
||||||
pthread_mutex_lock(&threadlist_mutex);
|
pthread_mutex_lock(&root->mutex);
|
||||||
pthread_list_node* cursor = root;
|
pthread_list_node* cursor = root;
|
||||||
while(cursor->next != NULL)
|
while(cursor->next != NULL)
|
||||||
cursor = cursor->next;
|
cursor = cursor->next;
|
||||||
node->previous = cursor;
|
node->previous = cursor;
|
||||||
node->next = NULL;
|
node->next = NULL;
|
||||||
cursor->next = node;
|
cursor->next = node;
|
||||||
pthread_mutex_unlock(&threadlist_mutex);
|
pthread_mutex_unlock(&root->mutex);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove_thread_node(pthread_list_node* node) {
|
void remove_thread_node(pthread_list_node* node) {
|
||||||
pthread_mutex_lock(&threadlist_mutex);
|
if(node == NULL)
|
||||||
|
return;
|
||||||
|
pthread_list_node* root = threadlist_root(node);
|
||||||
|
pthread_mutex_lock(&root->mutex);
|
||||||
pthread_list_node* prev = node->previous;
|
pthread_list_node* prev = node->previous;
|
||||||
pthread_list_node* next = node->next;
|
pthread_list_node* next = node->next;
|
||||||
free(node);
|
|
||||||
if(prev != NULL)
|
if(prev != NULL)
|
||||||
prev->next = next;
|
prev->next = next;
|
||||||
if(next != NULL)
|
if(next != NULL)
|
||||||
next->previous = prev;
|
next->previous = prev;
|
||||||
pthread_mutex_unlock(&threadlist_mutex);
|
pthread_mutex_unlock(&root->mutex);
|
||||||
|
pthread_mutex_destroy(&node->mutex);
|
||||||
|
free(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
void clear_thread_list(pthread_list_node* root) {
|
void clear_thread_list(pthread_list_node* root) {
|
||||||
|
if(root == NULL)
|
||||||
|
return;
|
||||||
pthread_list_node* cursor = root;
|
pthread_list_node* cursor = root;
|
||||||
while(cursor != NULL) {
|
while(cursor != NULL) {
|
||||||
pthread_join(cursor->thread, NULL);
|
pthread_join(cursor->thread, NULL);
|
||||||
pthread_list_node* prev = cursor;
|
pthread_list_node* prev = cursor;
|
||||||
cursor = cursor->next;
|
cursor = cursor->next;
|
||||||
|
pthread_mutex_destroy(&prev->mutex);
|
||||||
free(prev);
|
free(prev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+19
-12
@@ -41,28 +41,33 @@ static void threadpool_work_destroy(threadpool_work* work) {
|
|||||||
free(work);
|
free(work);
|
||||||
}
|
}
|
||||||
|
|
||||||
static threadpool_work* threadpool_work_pop(threadpool* pool) {
|
static optional_threadpool_work threadpool_work_pop(threadpool* pool) {
|
||||||
threadpool_work* work;
|
optional_threadpool_work result;
|
||||||
|
result.value = NULL;
|
||||||
|
result.has_value = false;
|
||||||
if (pool == NULL)
|
if (pool == NULL)
|
||||||
return NULL;
|
return result;
|
||||||
|
|
||||||
work = pool->work_first;
|
threadpool_work* work = pool->work_first;
|
||||||
if (work == NULL)
|
if (work == NULL)
|
||||||
return NULL; // TODO: This should propbably be using optionals
|
return result;
|
||||||
|
|
||||||
if (work->next == NULL) {
|
if (work->next == NULL) {
|
||||||
pool->work_first = NULL;
|
pool->work_first = NULL;
|
||||||
pool->work_last = NULL;
|
pool->work_last = NULL;
|
||||||
return work;
|
result.value = work;
|
||||||
|
result.has_value = true;
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
pool->work_first = work->next;
|
pool->work_first = work->next;
|
||||||
return work;
|
result.value = work;
|
||||||
|
result.has_value = true;
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* threadpool_worker(void* arg) {
|
static void* threadpool_worker(void* arg) {
|
||||||
log_trace("threadpool worker spawned");
|
log_trace("threadpool worker spawned");
|
||||||
threadpool* pool = arg;
|
threadpool* pool = arg;
|
||||||
threadpool_work* work;
|
|
||||||
while(true) {
|
while(true) {
|
||||||
// Wait for work
|
// Wait for work
|
||||||
pthread_mutex_lock(&(pool->work_mutex));
|
pthread_mutex_lock(&(pool->work_mutex));
|
||||||
@@ -70,17 +75,19 @@ static void* threadpool_worker(void* arg) {
|
|||||||
pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex));
|
pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex));
|
||||||
if (pool->stop)
|
if (pool->stop)
|
||||||
break;
|
break;
|
||||||
work = threadpool_work_pop(pool);
|
optional_threadpool_work work = threadpool_work_pop(pool);
|
||||||
|
if(work.has_value)
|
||||||
pool->working_count++;
|
pool->working_count++;
|
||||||
pthread_mutex_unlock(&(pool->work_mutex));
|
pthread_mutex_unlock(&(pool->work_mutex));
|
||||||
|
|
||||||
// Do the work
|
// Do the work
|
||||||
if (work != NULL) {
|
if (work.has_value) {
|
||||||
work->func(work->arg);
|
work.value->func(work.value->arg);
|
||||||
threadpool_work_destroy(work);
|
threadpool_work_destroy(work.value);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&(pool->work_mutex));
|
pthread_mutex_lock(&(pool->work_mutex));
|
||||||
|
if(work.has_value)
|
||||||
pool->working_count--;
|
pool->working_count--;
|
||||||
if (!pool->stop && pool->working_count == 0 && pool->work_first == NULL)
|
if (!pool->stop && pool->working_count == 0 && pool->work_first == NULL)
|
||||||
pthread_cond_signal(&(pool->working_cond));
|
pthread_cond_signal(&(pool->working_cond));
|
||||||
|
|||||||
Reference in New Issue
Block a user