feat: add threadpool and started pooling executors

This commit is contained in:
Asger Gitz-Johansen 2024-08-03 13:08:46 +02:00
parent e35dbdb1f3
commit dcc25f88a7
15 changed files with 544 additions and 85 deletions

View File

@ -15,6 +15,8 @@ BINDIR := out/bin
CFLAGS += -DSCI_VERSION="\"$(VERSION)\"" CFLAGS += -DSCI_VERSION="\"$(VERSION)\""
CFLAGS += -DSCI_NAME="\"$(NAME)\"" CFLAGS += -DSCI_NAME="\"$(NAME)\""
CFLAGS += -DSCI_DESCRIPTION="\"$(DESCRIPTION)\"" CFLAGS += -DSCI_DESCRIPTION="\"$(DESCRIPTION)\""
CFLAGS += -D_POSIX_C_SOURCE=2
CFLAGS += -D_GNU_SOURCE
# compiler flags # compiler flags
CFLAGS += -Wall -Werror -std=c23 -g CFLAGS += -Wall -Werror -std=c23 -g
# includes # includes
@ -33,6 +35,10 @@ OBJ += out/obj/main.o
OBJ += out/obj/cli.o OBJ += out/obj/cli.o
OBJ += out/obj/log.o OBJ += out/obj/log.o
OBJ += out/obj/notify.o OBJ += out/obj/notify.o
OBJ += out/obj/util.o
OBJ += out/obj/pipeline.o
OBJ += out/obj/threadlist.o
OBJ += out/obj/threadpool.o
out/bin/sci: $(OBJ) | $(BINDIR) out/bin/sci: $(OBJ) | $(BINDIR)
$(CC) -o $@ $(CFLAGS) $^ $(CC) -o $@ $(CFLAGS) $^

View File

@ -83,7 +83,10 @@ If you want `compile_commands.json` files, you should use [bear](https://github.
output file using cli options. output file using cli options.
- [x] Third things third, implement a thing that simultaneously watches two different files (multithreading). - [x] Third things third, implement a thing that simultaneously watches two different files (multithreading).
it should be cancellable with ctrl+c, but it should just contiuously print event notifications. it should be cancellable with ctrl+c, but it should just contiuously print event notifications.
- [ ] Fourth things fourth, implement a prototype that reads a space-separated file and populates a struct. - [x] Fourth things fourth, implement a prototype that reads a space-separated file and populates a struct.
- [ ] Fifth things fifth, implement a prototype that spawns a new thread that executes a shell command.
- [ ] Voila! You're there!
- [ ] Second iteration! Now Reimplement it and do it better!
### Note Regarding `inotify` usage ### Note Regarding `inotify` usage
From the manpage: From the manpage:

View File

@ -4,11 +4,8 @@
#include "optional.h" #include "optional.h"
typedef struct { typedef struct {
// prototyping optional_str config_file;
optional_str file; int executors;
optional_str file2;
// actual
int verbosity; int verbosity;
bool help; bool help;
bool version; bool version;
@ -20,7 +17,7 @@ typedef struct {
cli_options new_options(); cli_options new_options();
// Delete a cli_options struct instance. // Delete a cli_options struct instance.
void free_options(cli_options v); void destroy_options(cli_options v);
// Print the help message. // Print the help message.
void print_help(FILE * out, char* prog_name); void print_help(FILE * out, char* prog_name);

View File

@ -1,11 +1,12 @@
#ifndef SCI_NOTIFY_H #ifndef SCI_NOTIFY_H
#define SCI_NOTIFY_H #define SCI_NOTIFY_H
#include "pipeline.h"
#include <sys/inotify.h> #include <sys/inotify.h>
typedef void(*notify_callback)(struct inotify_event* const); typedef void(*notify_callback)(pipeline_event* const);
// Start listening for changes to the provided file. // Start listening for changes to the provided file.
// Note that the `struct inotify_event*` provided is a managed pointer. // Note that the `struct inotify_event*` provided is a managed pointer.
void listen_for_changes(const char* filename, notify_callback callback); void listen_for_changes(const pipeline_conf* config, notify_callback callback);
#endif #endif

25
include/pipeline.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef SCI_PIPELINE_H
#define SCI_PIPELINE_H
#include "optional.h"
#include <pthread.h>
#include <sys/inotify.h>
typedef struct {
char* name;
char* url;
char* trigger;
char* command;
} pipeline_conf;
typedef optional_type(pipeline_conf*) optional_pipeline_conf;
typedef struct {
const struct inotify_event* event;
const char* command;
} pipeline_event;
// create a new pipeline_conf struct instance based on a configuration line.
optional_pipeline_conf pipeline_create(const char* config_line);
void pipeline_register(pthread_t thread);
void pipeline_loop();
#endif

33
include/threadlist.h Normal file
View File

@ -0,0 +1,33 @@
#ifndef SCI_THREADLIST_H
#define SCI_THREADLIST_H
#include <pthread.h>
// doubly linked list implementation for managing threads
typedef struct pthread_list_node {
pthread_t thread;
struct pthread_list_node* previous;
struct pthread_list_node* next;
} pthread_list_node;
// Create a new root node.
pthread_list_node* create_thread_node(pthread_t thread);
// Add a new node to the end of the list.
// Returns the new node.
pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root);
pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node);
// Remove a node from the list.
// This will not call pthread_join, so make sure that the thread is joined before calling.
void remove_thread_node(pthread_list_node* node);
// Completely clear the thread list.
// This will call pthread_join on all nodes.
// The list is completely invalid after this call and should be discarded.
// Note:
// - `root` has already been free'd.
// - this function is not thread-safe.
void clear_thread_list(pthread_list_node* root);
#endif

42
include/threadpool.h Normal file
View File

@ -0,0 +1,42 @@
#ifndef SCI_THREADPOOL_H
#define SCI_THREADPOOL_H
#include <stdbool.h>
#include <stddef.h>
#include <pthread.h>
// Very inspired by: https://nachtimwald.com/2019/04/12/thread-pool-in-c/
// a work function
typedef void (*thread_func)(void *arg);
// linked list for work functions
typedef struct threadpool_work {
thread_func func;
void* arg;
struct threadpool_work* next;
} threadpool_work;
// thread pool object
typedef struct {
threadpool_work* work_first;
threadpool_work* work_last;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_count;
size_t thread_count;
bool stop;
} threadpool;
// create a new threadpool instance with `num` amount of worker threads
threadpool* threadpool_create(size_t num);
// destroy a threadpool instance
void threadpool_destroy(threadpool* pool);
// add work to the threadpool
bool threadpool_add_work(threadpool* pool, thread_func func, void* arg);
// wait for the remaining work in the threadpool to complete
void threadpool_wait(threadpool* pool);
#endif

View File

@ -13,5 +13,13 @@
assert(fd != -1); \ assert(fd != -1); \
} \ } \
} while (0) } while (0)
#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0]))
// remove whitespace characters on both ends of the string.
// Retuirns a new string that you must free.
char* trim(const char* const str);
typedef void(*line_handler)(const char*);
void per_line(const char* file, line_handler handler);
#endif #endif

View File

@ -6,16 +6,11 @@
cli_options new_options() { cli_options new_options() {
cli_options result; cli_options result;
result.file.has_value = false; result.config_file.has_value = false;
result.file.value = NULL; result.config_file.value = NULL;
result.executors = 32;
result.file2.has_value = false;
result.file2.value = NULL;
result.verbosity = 1; result.verbosity = 1;
result.help = false; result.help = false;
result.version = false; result.version = false;
char *no_color = getenv("NO_COLOR"); char *no_color = getenv("NO_COLOR");
@ -29,26 +24,23 @@ cli_options new_options() {
return result; return result;
} }
void free_options(cli_options v) { void destroy_options(cli_options v) {
if(v.file.has_value) if(v.config_file.has_value)
free(v.file.value); free(v.config_file.value);
if(v.file2.has_value) if(v.log_file.has_value)
free(v.file2.value); free(v.log_file.value);
} }
// <max // <max
const char* optstring = "f:F:v:Cl:hV"; const char* optstring = "f:e:v:Cl:hV";
const char* help_msg = const char* help_msg =
"Usage: %s [-v level] [-C] [-l file] [-h] [-V]\n" "Usage: %s [-f file] [-e count] [-v level] [-C] [-l file] [-h] [-V]\n"
"\n" "\n"
SCI_DESCRIPTION "\n" SCI_DESCRIPTION "\n"
"\n" "\n"
"THIS PROGRAM IS STILL JUST A PROTOTYPE, AND NOT\n"
"ACTUALLY USEFUL YET\n"
"\n"
"OPTIONS:\n" "OPTIONS:\n"
" -f file (WIP) set file\n" " -f file Set sci config file\n"
" -F file (WIP) set another file\n" " -e count Set the amount of worker threads\n"
" -v level Set verbosity level [0-4]\n" " -v level Set verbosity level [0-4]\n"
" -C Force color output, ignoring $NO_COLOR\n" " -C Force color output, ignoring $NO_COLOR\n"
" -l file Set log to output to a file\n" " -l file Set log to output to a file\n"
@ -67,16 +59,15 @@ cli_options parse(int argc, char** argv) {
while((opt = getopt(argc, argv, optstring)) != -1) { while((opt = getopt(argc, argv, optstring)) != -1) {
switch(opt) { switch(opt) {
case 'f': case 'f':
options.file.value = strdup(optarg); options.config_file.value = strdup(optarg);
options.file.has_value = true; options.config_file.has_value = true;
break;
case 'F':
options.file2.value = strdup(optarg);
options.file2.has_value = true;
break; break;
case 'v': case 'v':
options.verbosity = atoi(optarg); options.verbosity = atoi(optarg);
break; break;
case 'e':
options.executors = atoi(optarg);
break;
case 'C': case 'C':
options.use_colors = true; options.use_colors = true;
break; break;

View File

@ -1,31 +1,72 @@
#include "cli.h" #include "cli.h"
#include "log.h" #include "log.h"
#include "notify.h" #include "notify.h"
#include "pipeline.h"
#include "threadpool.h"
#include "util.h" #include "util.h"
#include <bits/pthreadtypes.h>
#include <pthread.h>
#include <regex.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h> void executor(void* data) {
const char* command = data;
system(command);
}
void on_event(struct inotify_event* const e) { threadpool* pool = NULL;
const char* msg =
"got an event:\n" void on_event(pipeline_event* const e) {
" wd: %d\n" threadpool_add_work(pool, executor, (void*)e->command);
" mask: %d\n"
" cookie: %d\n"
" len: %d\n"
" name: %s"
;
log_info(msg, e->wd, e->mask, e->cookie, e->len, e->name);
} }
void* listen_for_changes_thread(void* data) { void* listen_for_changes_thread(void* data) {
const char* f = (const char*)data; const pipeline_conf* conf = (const pipeline_conf*)data;
listen_for_changes(f, &on_event); while(1) // TODO: Should be while(sigint_has_not_been_caught) instead
listen_for_changes(conf, &on_event);
// We're now done with the config.
free(conf->name);
free(conf->url);
free(conf->trigger);
free(conf->command);
free(data);
return NULL; return NULL;
} }
// Spawn a thread that fires events whenever the provided trigger file has changed.
// Usage:
// `pthread_t t = spawn_listener(config);`
// `pthread_join(t, NULL);`
pthread_t spawn_listener(const pipeline_conf* conf) {
log_info("spawning trigger thread for %s", conf->name);
pthread_t result;
ASSERT_SYSCALL_SUCCESS(pthread_create(&result, NULL, &listen_for_changes_thread, (void*)conf));
pthread_setname_np(result, "sci-listener");
return result;
}
void config_interpret_line(const char* line) {
optional_pipeline_conf conf = pipeline_create(line);
if(!conf.has_value) {
log_error("unable to register pipeline");
return;
}
char* dest;
// NOTE: trigger names are allowed max 32 characters
dest = malloc(sizeof(*dest) * (9+33));
dest[0] = '\0';
strncat(dest, "/tmp/sci/", 10);
strncat(dest, conf.value->trigger, 33);
free(conf.value->trigger);
conf.value->trigger = dest;
pthread_t t = spawn_listener(conf.value);
pipeline_register(t);
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
cli_options args = parse(argc, argv); cli_options args = parse(argc, argv);
log_settings settings; log_settings settings;
@ -33,51 +74,34 @@ int main(int argc, char** argv) {
settings.use_colors = args.use_colors; settings.use_colors = args.use_colors;
settings.out_file = args.log_file.has_value ? fopen(args.log_file.value, "w+") : stdout; settings.out_file = args.log_file.has_value ? fopen(args.log_file.value, "w+") : stdout;
log_init(settings); log_init(settings);
pool = threadpool_create(args.executors);
if(args.help) { if(args.help) {
print_help(stdout, argv[0]); print_help(stdout, argv[0]);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
if(args.version) { if(args.version) {
fprintf(stdout, SCI_VERSION "\n"); fprintf(stdout, SCI_VERSION "\n");
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
if(!args.file.has_value) { if(!args.config_file.has_value) {
fprintf(stderr, "no file provided see -h for usage\n"); fprintf(stderr, "no file provided see -h for usage\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if(!args.file2.has_value) {
fprintf(stderr, "no second file provided see -h for usage\n"); struct stat st = {0};
if(stat("/tmp/sci", &st) == -1)
mkdir("/tmp/sci", 0700);
if(access(args.config_file.value, F_OK) != 0) {
fprintf(stderr, "no such file or directory %s\n", args.config_file.value);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
per_line(args.config_file.value, &config_interpret_line);
if(access(args.file.value, F_OK) != 0) { pipeline_loop();
fprintf(stderr, "no such file or directory %s\n", args.file.value); threadpool_destroy(pool);
exit(EXIT_FAILURE);
}
if(access(args.file2.value, F_OK) != 0) {
fprintf(stderr, "no such file or directory %s\n", args.file2.value);
exit(EXIT_FAILURE);
}
log_trace("spawning trigger thread for %s", args.file.value);
pthread_t file1_thread;
pthread_attr_t attr1;
ASSERT_SYSCALL_SUCCESS(pthread_attr_init(&attr1));
ASSERT_SYSCALL_SUCCESS(pthread_create(&file1_thread, &attr1, &listen_for_changes_thread, (void*)args.file.value));
ASSERT_SYSCALL_SUCCESS(pthread_attr_destroy(&attr1));
log_trace("spawning trigger thread for %s", args.file2.value);
pthread_t file2_thread;
pthread_attr_t attr2;
ASSERT_SYSCALL_SUCCESS(pthread_attr_init(&attr2));
ASSERT_SYSCALL_SUCCESS(pthread_create(&file2_thread, &attr1, &listen_for_changes_thread, (void*)args.file2.value));
ASSERT_SYSCALL_SUCCESS(pthread_attr_destroy(&attr2));
pthread_join(file1_thread, NULL);
pthread_join(file2_thread, NULL);
free_options(args);
} }

View File

@ -5,20 +5,28 @@
#define EV_SIZE sizeof(struct inotify_event) #define EV_SIZE sizeof(struct inotify_event)
#define BUF_LEN EV_SIZE * 32 #define BUF_LEN EV_SIZE * 32
void listen_for_changes(const char* filename, notify_callback callback) { void listen_for_changes(const pipeline_conf* config, notify_callback callback) {
// TODO: callback is a bit slow sometimes. We should also poll once after calling callback
const char* filename = config->trigger;
if(access(filename, F_OK) != 0) {
log_trace("file does not exist yet, creating it.");
FILE* f = fopen(filename, "w+");
fclose(f);
}
int fd = inotify_init(); int fd = inotify_init();
ASSERT_SYSCALL_SUCCESS(fd); ASSERT_SYSCALL_SUCCESS(fd);
inotify_add_watch(fd, filename, IN_ATTRIB); inotify_add_watch(fd, filename, IN_ATTRIB);
log_trace("listening for changes in file: %s", filename);
log_info("listening for changes in file: %s", filename);
char buffer[BUF_LEN]; char buffer[BUF_LEN];
int r = read(fd, buffer, BUF_LEN); int r = read(fd, buffer, BUF_LEN);
assert(r != -1); assert(r != -1);
for(int i = 0; i < r; ) { for(int i = 0; i < r; ) {
struct inotify_event* e = (struct inotify_event*)&buffer[i]; struct inotify_event* e = (struct inotify_event*)&buffer[i];
callback(e); pipeline_event ev;
ev.event = e;
ev.command = config->command;
callback(&ev);
i += EV_SIZE + e->len; i += EV_SIZE + e->len;
} }
ASSERT_SYSCALL_SUCCESS(close(fd)); // TODO: have a hashmap of threads (see readme) ASSERT_SYSCALL_SUCCESS(close(fd));
} }

65
src/pipeline.c Normal file
View File

@ -0,0 +1,65 @@
#include "log.h"
#include "pipeline.h"
#include "threadlist.h"
#include "util.h"
#include <assert.h>
#include <regex.h>
#include <stdlib.h>
pthread_list_node* root = NULL;
optional_pipeline_conf pipeline_create(const char* config_line) {
optional_pipeline_conf result;
result.has_value = false;
const char* pattern = "[^[:blank:]]+|\"[^\"]*\"";
regex_t reg;
regmatch_t pmatch[1];
regoff_t off, len;
assert(regcomp(&reg, pattern, REG_EXTENDED) == 0);
const char* cursor = config_line;
char* opts[4];
int i = 0;
for( ; i < 4; i++) {
if(regexec(&reg, cursor, ARRAY_SIZE(pmatch), pmatch, 0))
break;
off = pmatch[0].rm_so + (cursor - config_line);
len = pmatch[0].rm_eo - pmatch[0].rm_so;
opts[i] = strndup(config_line + off, len);
cursor += pmatch[0].rm_eo;
}
if(i != 4) {
log_error("invalid configuration!\nline is invalid: \"%s\"");
for(int j = i-1; j >= 0; j--)
free(opts[j]);
return result;
}
result.value = malloc(sizeof(pipeline_conf));
result.value->name = opts[0];
result.value->url = opts[1];
result.value->trigger = opts[2];
result.value->command = opts[3];
const char* msg =
"config:\n"
"name: %s\n"
"url: %s\n"
"trigger: %s\n"
"command: %s"
;
log_trace(msg, result.value->name, result.value->url, result.value->trigger, result.value->command);
result.has_value = true;
return result;
}
void pipeline_register(pthread_t thread) {
if(root == NULL) {
root = create_thread_node(thread);
return;
}
add_thread(thread, root);
}
void pipeline_loop() {
clear_thread_list(root);
root = NULL;
}

60
src/threadlist.c Normal file
View File

@ -0,0 +1,60 @@
#include "threadlist.h"
#include <stdlib.h>
// TODO: mutex should really be per-list.
pthread_mutex_t threadlist_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_list_node* create_thread_node(pthread_t thread) {
pthread_list_node* new = malloc(sizeof(pthread_list_node));
new->previous = NULL;
new->next = NULL;
new->thread = thread;
return new;
}
pthread_list_node* add_thread(pthread_t thread, pthread_list_node* root) {
pthread_mutex_lock(&threadlist_mutex);
pthread_list_node* cursor = root;
while(cursor->next != NULL)
cursor = cursor->next;
pthread_list_node* new = malloc(sizeof(pthread_list_node));
new->previous = cursor;
new->next = NULL;
new->thread = thread;
cursor->next = new;
pthread_mutex_unlock(&threadlist_mutex);
return new;
}
pthread_list_node* add_thread_node(pthread_list_node* root, pthread_list_node* node) {
pthread_mutex_lock(&threadlist_mutex);
pthread_list_node* cursor = root;
while(cursor->next != NULL)
cursor = cursor->next;
node->previous = cursor;
node->next = NULL;
cursor->next = node;
pthread_mutex_unlock(&threadlist_mutex);
return node;
}
void remove_thread_node(pthread_list_node* node) {
pthread_mutex_lock(&threadlist_mutex);
pthread_list_node* prev = node->previous;
pthread_list_node* next = node->next;
free(node);
if(prev != NULL)
prev->next = next;
if(next != NULL)
next->previous = prev;
pthread_mutex_unlock(&threadlist_mutex);
}
void clear_thread_list(pthread_list_node* root) {
pthread_list_node* cursor = root;
while(cursor != NULL) {
pthread_join(cursor->thread, NULL);
cursor = cursor->next;
free(cursor->previous);
}
}

158
src/threadpool.c Normal file
View File

@ -0,0 +1,158 @@
#include "threadpool.h"
#include "log.h"
#include <stdlib.h>
static threadpool_work* threadpool_work_create(thread_func func, void *arg) {
threadpool_work* result;
if (func == NULL)
return NULL;
result = malloc(sizeof(threadpool_work));
result->func = func;
result->arg = arg;
result->next = NULL;
return result;
}
static void threadpool_work_destroy(threadpool_work* work) {
if (work == NULL)
return;
free(work);
}
static threadpool_work* threadpool_work_pop(threadpool* pool) {
threadpool_work* work;
if (pool == NULL)
return NULL;
work = pool->work_first;
if (work == NULL)
return NULL; // TODO: This should propbably be using optionals
if (work->next == NULL) {
pool->work_first = NULL;
pool->work_last = NULL;
return work;
}
pool->work_first = work->next;
return work;
}
static void* threadpool_worker(void *arg) {
log_trace("threadpool worker spawned");
threadpool* pool = arg;
threadpool_work* work;
while(true) {
// Wait for work
pthread_mutex_lock(&(pool->work_mutex));
while (pool->work_first == NULL && !pool->stop)
pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex));
if (pool->stop)
break;
work = threadpool_work_pop(pool);
pool->working_count++;
pthread_mutex_unlock(&(pool->work_mutex));
// Do the work
if (work != NULL) {
work->func(work->arg);
threadpool_work_destroy(work);
}
pthread_mutex_lock(&(pool->work_mutex));
pool->working_count--;
if (!pool->stop && pool->working_count == 0 && pool->work_first == NULL)
pthread_cond_signal(&(pool->working_cond));
pthread_mutex_unlock(&(pool->work_mutex));
}
pool->thread_count--;
pthread_cond_signal(&(pool->working_cond));
pthread_mutex_unlock(&(pool->work_mutex));
return NULL;
}
threadpool* threadpool_create(size_t num) {
log_trace("creating threadpool of size %d", num);
threadpool* result;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
result = calloc(1, sizeof(threadpool));
result->thread_count = num;
pthread_mutex_init(&(result->work_mutex), NULL);
pthread_cond_init(&(result->work_cond), NULL);
pthread_cond_init(&(result->working_cond), NULL);
result->work_first = NULL;
result->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, threadpool_worker, result);
pthread_setname_np(thread, "sci-worker");
pthread_detach(thread);
}
return result;
}
// TODO: add log statements
void threadpool_destroy(threadpool* pool) {
log_trace("destroying threadpool");
threadpool_work* work;
threadpool_work* work2;
if (pool == NULL)
return;
pthread_mutex_lock(&(pool->work_mutex));
work = pool->work_first;
while (work != NULL) {
work2 = work->next;
threadpool_worker(work);
work = work2;
}
pool->work_first = NULL;
pool->stop = true;
pthread_cond_broadcast(&(pool->work_cond));
pthread_mutex_unlock(&(pool->work_mutex));
threadpool_wait(pool);
pthread_mutex_destroy(&(pool->work_mutex));
pthread_cond_destroy(&(pool->work_cond));
pthread_cond_destroy(&(pool->working_cond));
free(pool);
}
bool threadpool_add_work(threadpool* pool, thread_func func, void *arg) {
log_trace("adding work task to pool");
threadpool_work* work;
if (pool == NULL)
return false;
work = threadpool_work_create(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(pool->work_mutex));
if (pool->work_first == NULL) {
pool->work_first = work;
pool->work_last = pool->work_first;
} else {
pool->work_last->next = work;
pool->work_last = work;
}
pthread_cond_broadcast(&(pool->work_cond));
pthread_mutex_unlock(&(pool->work_mutex));
return true;
}
void threadpool_wait(threadpool* pool) {
if (pool == NULL)
return;
pthread_mutex_lock(&(pool->work_mutex));
while (1) {
if (pool->work_first != NULL || (!pool->stop && pool->working_count != 0) || (pool->stop && pool->thread_count != 0))
pthread_cond_wait(&(pool->working_cond), &(pool->work_mutex));
else
break;
}
pthread_mutex_unlock(&(pool->work_mutex));
}

38
src/util.c Normal file
View File

@ -0,0 +1,38 @@
#include "util.h"
#include "log.h"
#include <ctype.h>
#include <stdlib.h>
char* trim(const char* const str) {
char* begin = strdup(str);
char* end;
while(isspace((unsigned char)*begin))
begin++;
if(*begin == 0)
return begin;
end = begin + strlen(begin) - 1;
while(end > begin && isspace((unsigned char)*end))
end--;
*(end + 1) = '\0';
return begin;
}
void per_line(const char* file, line_handler handler) {
FILE* stream;
char* line = NULL;
size_t len = 0;
ssize_t nread;
log_trace("reading file %s", file);
stream = fopen(file, "r");
if(stream == NULL) {
perror("fopen");
return;
}
while((nread = getline(&line, &len, stream)) != -1) {
char* line_trimmed = trim(line);
handler(line_trimmed);
free(line_trimmed);
}
free(line);
fclose(stream);
}