blob: 0ea30be6317935363ff36047924f948d3dc4e395 [file] [edit]
// Copyright (c) 2017 Facebook Inc.
// Copyright (c) 2015-2017 Georgia Institute of Technology
// All rights reserved.
//
// Copyright 2019 Google LLC
//
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree.
// Needed for syscall.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif // _GNU_SOURCE
/* Standard C headers */
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
/* Configuration header */
#include <fxdiv.h>
#include "threadpool-common.h"
/* POSIX headers */
#if PTHREADPOOL_USE_PTHREADS
#include <pthread.h>
#include <unistd.h>
#else
#include <threads.h>
#endif // PTHREADPOOL_USE_PTHREADS
/* Futex-specific headers */
#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
#include <linux/futex.h>
#include <sys/syscall.h>
/* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
#ifndef SYS_futex
#define SYS_futex __NR_futex
#endif // SYS_futex
#ifndef FUTEX_PRIVATE_FLAG
#define FUTEX_PRIVATE_FLAG 128
#endif // FUTEX_PRIVATE_FLAG
#elif defined(__EMSCRIPTEN__)
/* math.h for INFINITY constant */
#include <emscripten/threading.h>
#include <math.h>
#else
#error \
"Platform-specific implementation of futex_wait and futex_wake_all required"
#endif // defined(__linux__)
#endif // PTHREADPOOL_USE_FUTEX
/* Windows-specific headers */
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <sysinfoapi.h>
#endif
/* Dependencies */
#if PTHREADPOOL_USE_CPUINFO
#include <cpuinfo.h>
#endif
/* Alloca */
#if defined(_MSC_VER)
#include <malloc.h>
#define alloca _alloca
#elif !defined(alloca) && defined(__GNUC__)
#define alloca(s) __builtin_alloca(s)
#else
#include <alloca.h>
#endif
/* Public library header */
#include <pthreadpool.h>
/* Internal library headers */
#include "threadpool-atomics.h"
#include "threadpool-object.h"
#include "threadpool-utils.h"
/* Logging-related headers */
#ifndef PTHREADPOOL_DEBUG_LOGGING
// Default value if not specified at compile time.
#define PTHREADPOOL_DEBUG_LOGGING 0
#endif // PTHREADPOOL_DEBUG_LOGGING
#if PTHREADPOOL_DEBUG_LOGGING
#include <stdio.h>
#if defined(__ARM_ARCH)
// Stand-in for `__rdtsc` on ARM builds (debug logging only): returns the value
// read from the `cntvct_el0` system register with an `mrs` instruction.
// NOTE(review): `cntvct_el0` is an AArch64 register name while the guard is the
// broader `__ARM_ARCH` -- confirm this assembles on 32-bit ARM targets.
static uint64_t __rdtsc() {
uint64_t val;
asm volatile("mrs %0, cntvct_el0" : "=r"(val));
return val;
}
#endif // defined(__ARM_ARCH)
// Returns the current `__rdtsc()` counter value minus `since`, as a `size_t`.
// Pass `since == 0` to get the raw counter value.
static size_t pthreadpool_get_ticks(size_t since) { return __rdtsc() - since; }
// Debug logging: writes "[ticks] function (file:line): " followed by the
// printf-style `format` and its arguments, plus a newline, to stderr.
// The body is wrapped in `do { ... } while (0)` so that the expansion is exactly
// one statement and the terminating `;` is supplied by the call site; this
// keeps the macro safe inside unbraced `if`/`else` bodies.
#define pthreadpool_log_debug(format, ...)                              \
  do {                                                                  \
    fprintf(stderr, "[%zu] %s (%s:%i): " format "\n",                   \
            pthreadpool_get_ticks(0), __FUNCTION__, __FILE__, __LINE__, \
            ##__VA_ARGS__);                                             \
  } while (0)
#else
#define pthreadpool_log_debug(format, ...)
#endif // PTHREADPOOL_DEBUG_LOGGING
#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
// Linux: issues a private FUTEX_WAIT on `address` with the expected value
// `value` and no timeout (NULL). Returns the result of the futex syscall.
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
  const int wait_op = FUTEX_WAIT | FUTEX_PRIVATE_FLAG;
  return syscall(SYS_futex, address, wait_op, value, /*timeout=*/NULL);
}
// Linux: issues a private FUTEX_WAKE on `address`, passing `n` as the wake
// count. Returns the result of the futex syscall.
static int futex_wake_n(pthreadpool_atomic_uint32_t* address, uint32_t n) {
  const int wake_op = FUTEX_WAKE | FUTEX_PRIVATE_FLAG;
  return syscall(SYS_futex, address, wake_op, n);
}
// Linux: wakes waiters on `address` by requesting a wake count of INT_MAX.
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
  const uint32_t wake_count = INT_MAX;
  return futex_wake_n(address, wake_count);
}
#elif defined(__EMSCRIPTEN__)
// Emscripten: waits on `address` with the expected value `value`, passing
// INFINITY as the maximum wait time.
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
  volatile void* const futex_address = (volatile void*)address;
  return emscripten_futex_wait(futex_address, value, INFINITY);
}
// Emscripten: wakes waiters on `address`, passing `n` as the wake count.
static int futex_wake_n(pthreadpool_atomic_uint32_t* address, uint32_t n) {
  volatile void* const futex_address = (volatile void*)address;
  return emscripten_futex_wake(futex_address, n);
}
// Emscripten: wakes waiters on `address` by requesting a wake count of INT_MAX.
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
  volatile void* const futex_address = (volatile void*)address;
  return emscripten_futex_wake(futex_address, INT_MAX);
}
#else
#error \
"Platform-specific implementation of futex_wait and futex_wake_all required"
#endif
#endif
/*
 * Sets the number of threads the pool will use.
 *
 * A request of 0 selects `threadpool->max_num_threads`; any other request is
 * clamped to the range [1, max_num_threads]. The update is performed while
 * holding the execution mutex.
 *
 * Returns the thread count now in effect, or 1 if `threadpool` is NULL.
 */
size_t pthreadpool_set_threads_count(struct pthreadpool* threadpool,
                                     size_t num_threads) {
  if (!threadpool) {
    return 1;
  }

  /* The count must not change while a parallel computation is running, so
   * take the same mutex that guards execution. */
  pthreadpool_mutex_lock(&threadpool->execution_mutex);

  // Map the request onto a feasible value.
  size_t new_count;
  if (num_threads != 0) {
    new_count = min(max(num_threads, 1), threadpool->max_num_threads);
  } else {
    // Zero means "use as many threads as the pool has".
    new_count = threadpool->max_num_threads;
  }

  // Only publish the value if it differs from the current one.
  if (threadpool->threads_count != new_count) {
    pthreadpool_store_release_size_t(&threadpool->threads_count, new_count);
  }
  pthreadpool_log_debug("setting max_num_threads to %zu.", new_count);

  pthreadpool_mutex_unlock(&threadpool->execution_mutex);
  return new_count;
}
// Blocks the caller until `threadpool->num_recruited_threads` equals
// `expected`. The first PTHREADPOOL_SPIN_WAIT_ITERATIONS iterations only call
// `pthreadpool_yield`; after that the caller sleeps between re-checks, on the
// futex in futex builds or on `completion_condvar` otherwise.
static void wait_on_num_recruited_threads(pthreadpool_t threadpool,
uint32_t expected) {
uint32_t num_recruited_threads =
pthreadpool_load_acquire_uint32_t(&threadpool->num_recruited_threads);
#if !PTHREADPOOL_USE_FUTEX
// Without futexes, `completion_mutex` is taken only if we actually have to
// wait; it is then held for the whole loop and handed to
// `pthreadpool_cond_wait` below.
if (num_recruited_threads != expected) {
pthreadpool_mutex_lock(&threadpool->completion_mutex);
#endif // !PTHREADPOOL_USE_FUTEX
for (size_t iter = 0; num_recruited_threads != expected; iter++) {
// Just spin for the first few iterations.
if (iter < PTHREADPOOL_SPIN_WAIT_ITERATIONS) {
pthreadpool_yield(iter);
} else {
pthreadpool_log_debug("waiting on %u recruited threads...",
num_recruited_threads - expected);
#if PTHREADPOOL_USE_FUTEX
// Sleep only while the counter still holds the value we last observed.
futex_wait(&threadpool->num_recruited_threads, num_recruited_threads);
#else
pthreadpool_cond_wait(&threadpool->completion_condvar,
&threadpool->completion_mutex);
#endif // PTHREADPOOL_USE_FUTEX
}
// Re-read the counter after every spin or wake-up.
num_recruited_threads =
pthreadpool_load_acquire_uint32_t(&threadpool->num_recruited_threads);
}
#if !PTHREADPOOL_USE_FUTEX
pthreadpool_mutex_unlock(&threadpool->completion_mutex);
}
#endif // !PTHREADPOOL_USE_FUTEX
}
/*
 * Waits until `threadpool->num_active_threads` becomes positive and returns the
 * observed value.
 *
 * Unless PTHREADPOOL_FLAG_DONT_SPIN_WORKERS is set, the first
 * PTHREADPOOL_SPIN_WAIT_ITERATIONS iterations only call `pthreadpool_yield`.
 * After that, a thread of a pool that has an executor gives up and returns
 * PTHREADPOOL_NUM_ACTIVE_THREADS_DONE so that it can be handed back to the
 * executor; any other thread goes to sleep on the futex or on
 * `num_active_threads_condvar`.
 *
 * `thread_id` is only used for debug logging.
 */
static int32_t wait_on_num_active_threads(pthreadpool_t threadpool,
                                          uint32_t thread_id) {
  int32_t curr_active_threads =
      pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
  if (curr_active_threads <= 0) {
#if !PTHREADPOOL_USE_FUTEX
    pthreadpool_mutex_lock(&threadpool->num_active_threads_mutex);
#endif  // !PTHREADPOOL_USE_FUTEX
    for (size_t iter = 0; curr_active_threads <= 0; iter++) {
      // Just spin for the first few iterations.
      if (!(threadpool->flags & PTHREADPOOL_FLAG_DONT_SPIN_WORKERS) &&
          iter < PTHREADPOOL_SPIN_WAIT_ITERATIONS) {
        pthreadpool_yield(iter);
      }
      // If we've borrowed this thread from an executor, then we should return
      // it to the executor instead of blocking it indefinitely.
      else if (threadpool->executor.num_threads) {
#if !PTHREADPOOL_USE_FUTEX
        // We are leaving from inside the locked region: release the mutex
        // first, otherwise every later locker of `num_active_threads_mutex`
        // (including this thread, should it come back) would block forever.
        pthreadpool_mutex_unlock(&threadpool->num_active_threads_mutex);
#endif  // !PTHREADPOOL_USE_FUTEX
        return PTHREADPOOL_NUM_ACTIVE_THREADS_DONE;
      }
      // Otherwise, put this thread to sleep until `num_active_threads` is
      // larger than zero.
      else {
#if PTHREADPOOL_USE_FUTEX
        // First increase the `num_waiting_threads` counter and re-check
        // `num_active_threads` thereafter to avoid slipping past calls to
        // `signal_num_active_threads`.
        pthreadpool_fetch_add_acquire_release_uint32_t(
            &threadpool->num_waiting_threads, 1);
        if ((curr_active_threads = pthreadpool_load_consume_int32_t(
                 &threadpool->num_active_threads)) <= 0) {
          // Use futex/condition signaling.
          pthreadpool_log_debug(
              "thread %u waiting on change in num active threads (curr=%i)...",
              thread_id, curr_active_threads);
          futex_wait(
              (pthreadpool_atomic_uint32_t*)&threadpool->num_active_threads,
              curr_active_threads);
        }
        pthreadpool_decrement_fetch_acquire_release_uint32_t(
            &threadpool->num_waiting_threads);
#else
        pthreadpool_log_debug(
            "thread %u waiting on change in num active threads (curr=%i)...",
            thread_id, curr_active_threads);
        pthreadpool_cond_wait(&threadpool->num_active_threads_condvar,
                              &threadpool->num_active_threads_mutex);
#endif  // PTHREADPOOL_USE_FUTEX
      }
      // Re-read the state after every spin or wake-up.
      curr_active_threads =
          pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
    }
#if !PTHREADPOOL_USE_FUTEX
    pthreadpool_mutex_unlock(&threadpool->num_active_threads_mutex);
#endif  // !PTHREADPOOL_USE_FUTEX
  }
  return curr_active_threads;
}
// Blocks the caller until `threadpool->work_is_done` is observed non-zero.
// Every check is an atomic exchange that writes 0, so the flag is consumed
// (reset) by the same operation that observes it. The first
// PTHREADPOOL_SPIN_WAIT_ITERATIONS iterations only call `pthreadpool_yield`;
// after that the caller sleeps on the futex or on `completion_condvar`.
static void wait_on_work_is_done(pthreadpool_t threadpool) {
int32_t work_is_done =
pthreadpool_exchange_acquire_uint32_t(&threadpool->work_is_done, 0);
#if !PTHREADPOOL_USE_FUTEX
// Without futexes, `completion_mutex` is taken only if we actually have to
// wait; it is then held for the whole loop and handed to
// `pthreadpool_cond_wait` below.
if (!work_is_done) {
pthreadpool_mutex_lock(&threadpool->completion_mutex);
#endif // !PTHREADPOOL_USE_FUTEX
for (size_t iter = 0; !work_is_done; iter++) {
// Just spin for the first few iterations.
if (iter < PTHREADPOOL_SPIN_WAIT_ITERATIONS) {
pthreadpool_yield(iter);
} else {
// Use futex/condition signaling.
pthreadpool_log_debug("thread waiting on work_is_done...");
#if PTHREADPOOL_USE_FUTEX
// `work_is_done` is 0 here, i.e. we sleep only while the flag is still 0.
futex_wait((pthreadpool_atomic_uint32_t*)&threadpool->work_is_done,
work_is_done);
#else
pthreadpool_cond_wait(&threadpool->completion_condvar,
&threadpool->completion_mutex);
#endif // PTHREADPOOL_USE_FUTEX
}
// Re-check (and reset) the flag after every spin or wake-up.
work_is_done =
pthreadpool_exchange_acquire_uint32_t(&threadpool->work_is_done, 0);
}
#if !PTHREADPOOL_USE_FUTEX
pthreadpool_mutex_unlock(&threadpool->completion_mutex);
}
#endif // !PTHREADPOOL_USE_FUTEX
}
// Notifies a waiter that `num_recruited_threads` has changed: a futex wake-all
// on the counter in futex builds, otherwise a signal on `completion_condvar`
// issued while holding `completion_mutex`.
// NOTE(review): `completion_condvar` is also the condvar used for
// `work_is_done`; this relies on `pthreadpool_cond_signal` reaching the right
// waiter -- confirm only one thread ever waits on it at a time.
static void signal_num_recruited_threads(pthreadpool_t threadpool) {
#if PTHREADPOOL_USE_FUTEX
futex_wake_all(&threadpool->num_recruited_threads);
#else
pthreadpool_mutex_lock(&threadpool->completion_mutex);
pthreadpool_cond_signal(&threadpool->completion_condvar);
pthreadpool_mutex_unlock(&threadpool->completion_mutex);
#endif // PTHREADPOOL_USE_FUTEX
}
// Wakes threads sleeping on a change of `num_active_threads`.
// Futex builds: if more than `max_num_waiting` threads are currently counted
// in `num_waiting_threads`, requests a wake-up for the excess
// (`num_waiting_threads - max_num_waiting`); otherwise nothing is woken.
// Other builds: broadcasts on `num_active_threads_condvar` under its mutex;
// `max_num_waiting` is not used in that case.
static void signal_num_active_threads(pthreadpool_t threadpool,
uint32_t max_num_waiting) {
#if PTHREADPOOL_USE_FUTEX
const uint32_t num_waiting_threads =
pthreadpool_load_consume_uint32_t(&threadpool->num_waiting_threads);
if (num_waiting_threads > max_num_waiting) {
futex_wake_n((pthreadpool_atomic_uint32_t*)&threadpool->num_active_threads,
num_waiting_threads - max_num_waiting);
}
#else
pthreadpool_mutex_lock(&threadpool->num_active_threads_mutex);
pthreadpool_cond_broadcast(&threadpool->num_active_threads_condvar);
pthreadpool_mutex_unlock(&threadpool->num_active_threads_mutex);
#endif // PTHREADPOOL_USE_FUTEX
}
// Sets `threadpool->work_is_done` to 1 and wakes the thread waiting for it:
// a futex wake-all in futex builds, otherwise a signal on `completion_condvar`
// issued while holding `completion_mutex`.
static void signal_work_is_done(pthreadpool_t threadpool) {
uint32_t prev_value = pthreadpool_exchange_acquire_release_uint32_t(
&threadpool->work_is_done, 1);
// The flag must have been consumed (reset to 0) before it is raised again.
// Note that `prev_value` is only read by this assert, so it is unused when
// asserts are compiled out.
assert(prev_value == 0);
#if PTHREADPOOL_USE_FUTEX
futex_wake_all(&threadpool->work_is_done);
#else
pthreadpool_mutex_lock(&threadpool->completion_mutex);
pthreadpool_cond_signal(&threadpool->completion_condvar);
pthreadpool_mutex_unlock(&threadpool->completion_mutex);
#endif // PTHREADPOOL_USE_FUTEX
}
// Atomically adds `num_threads` to `threadpool->num_recruited_threads`. Each
// registered thread is expected to undo its share via `pthreadpool_release`.
static void pthreadpool_register_threads(pthreadpool_t threadpool,
uint32_t num_threads) {
pthreadpool_fetch_add_acquire_release_uint32_t(
&threadpool->num_recruited_threads, num_threads);
}
// Drops one thread's registration from `num_recruited_threads`. The thread
// that brings the counter down to zero signals anyone waiting on it (e.g. a
// pending cleanup of the threadpool).
static void pthreadpool_release(pthreadpool_t threadpool) {
  const uint32_t still_recruited =
      pthreadpool_decrement_fetch_acquire_release_uint32_t(
          &threadpool->num_recruited_threads);
  if (still_recruited != 0) {
    // Other threads are still registered; nothing to announce.
    return;
  }
  // We were the last one out: let the waiter know.
  signal_num_recruited_threads(threadpool);
}
static void run_thread_function(struct pthreadpool* threadpool,
uint32_t thread_id) {
// Save the current FPU state, if requested.
const uint32_t flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
struct fpu_state saved_fpu_state = {0};
if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
saved_fpu_state = get_fpu_state();
disable_fpu_denormals();
}
// Call the job function.
const thread_function_t thread_function =
pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
thread_function(threadpool, &threadpool->threads[thread_id]);
// Restore the original FPU state in case we clobbered it.
if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
set_fpu_state(saved_fpu_state);
}
pthreadpool_log_debug("thread %u done working on job %u.", thread_id,
threadpool->job_id);
}
// Called by a thread that has finished its share of the current job; updates
// `num_active_threads` and returns its new value.
//
// While the job is running the counter is positive. The first thread to finish
// atomically replaces a positive value N with -(N - 1); every later finisher
// adds 1. The counter therefore reaches 0 once every thread that joined the
// job has wrapped up. A non-main thread (`thread_id != 0`) that brings the
// counter to 0 calls `signal_work_is_done`.
//
// NOTE(review): the return type is `uint32_t` although the returned value can
// be negative; the caller visible in this file only tests it against zero.
static uint32_t thread_wrap_up(struct pthreadpool* threadpool,
uint32_t thread_id) {
// Get the current state.
int32_t curr_active_threads =
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
assert(curr_active_threads != 0);
assert(curr_active_threads != PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);
// If we are the first thread to finish work, flip the state from
// "running" to "wrapping_up".
bool first_past_the_post = false;
// Retry the compare-exchange for as long as the observed value is still
// positive; the loop also ends if another thread flipped the sign first.
while (curr_active_threads > 0 &&
!(first_past_the_post =
pthreadpool_compare_exchange_sequentially_consistent_int32_t(
&threadpool->num_active_threads, &curr_active_threads,
-(curr_active_threads - 1)))) {
}
if (first_past_the_post) {
pthreadpool_log_debug(
"thread %u switched num_active_threads from `%i' to '%i'.", thread_id,
curr_active_threads, -(curr_active_threads - 1));
curr_active_threads = -(curr_active_threads - 1);
} else {
// Someone else already flipped the sign: count ourselves out by adding one.
curr_active_threads = pthreadpool_fetch_add_acquire_release_int32_t(
&threadpool->num_active_threads, 1) +
1;
}
// If we are the last active thread, let the calling thread know (unless we
// are the calling thread).
if (thread_id != 0 && curr_active_threads == 0) {
pthreadpool_log_debug("thread %u switched num_active_threads to 0.",
thread_id);
signal_work_is_done(threadpool);
}
return curr_active_threads;
}
static pthreadpool_thread_return_t thread_main(void* arg);
// For pools backed by an executor: schedules up to two not-yet-active worker
// threads with ids above `thread_id` (and below `threads_count`) on the
// executor. Does nothing when the pool has no executor.
//
// Each newly scheduled thread is registered in `num_recruited_threads` before
// being handed to `executor->schedule` with `thread_main` as its entry point.
// For callers other than the main thread (`thread_id != 0`), scheduling stops
// as soon as `num_active_threads` is negative or "done".
static void ensure_num_threads(struct pthreadpool* threadpool,
uint32_t thread_id) {
struct pthreadpool_executor* executor = &threadpool->executor;
/* If we're not using an executor, do nothing. */
if (!executor->num_threads) {
return;
}
// Get the number of required threads.
const uint32_t max_num_threads =
pthreadpool_load_acquire_size_t(&threadpool->threads_count);
// Start up to two other threads so that we fan out exponentially.
uint32_t num_threads_to_start = 2;
/* Schedule any missing threads for this threadpool. */
for (uint32_t tid = thread_id + 1;
num_threads_to_start && tid < max_num_threads; tid++) {
// Check whether this thread was active, and if not, schedule it.
struct thread_info* thread = &threadpool->threads[tid];
if (!pthreadpool_load_relaxed_uint32_t(&thread->is_active)) {
// Make sure there is still ongoing work (unless we're the main thread).
const int32_t curr_active_threads =
thread_id ? pthreadpool_load_consume_int32_t(
&threadpool->num_active_threads)
: 1;
if (curr_active_threads < 0 ||
curr_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) {
return;
}
// Schedule this worker thread.
// The exchange returns the previous flag, so only the caller that flips
// `is_active` from 0 to 1 goes on to schedule the thread.
if (!pthreadpool_exchange_sequentially_consistent_uint32_t(
&thread->is_active, 1)) {
// Note that `threadpool->num_recruited_threads` is always non-zero
// because this function is only ever called by the main thread or
// another active thread, so we don't have to worry about activating a
// thread on an already stopped threadpool.
pthreadpool_register_threads(threadpool, 1);
executor->schedule(threadpool->executor_context, thread,
(void (*)(void*))thread_main);
num_threads_to_start--;
}
}
}
}
// Entry point of every worker thread; `arg` points to the thread's own
// `struct thread_info` inside the threadpool's `threads` array.
//
// The thread loops on `num_active_threads`:
//   - "done" value: clear `is_active`, re-check the state and leave the loop
//     unless a job has started in the meantime and we manage to re-claim
//     `is_active`;
//   - zero or negative: wait in `wait_on_num_active_threads`;
//   - positive: try to join the running job by incrementing the counter, run
//     the job function if fewer than `threads_count` threads had joined before
//     us, then report completion through `thread_wrap_up`.
// On exit the thread releases its registration via `pthreadpool_release` and
// returns 0.
static pthreadpool_thread_return_t thread_main(void* arg) {
// Unpack the argument, i.e. extract the pointer to the `pthreadpool` from the
// provided pointer to this thread's `thread_info`.
struct thread_info* thread = (struct thread_info*)arg;
const uint32_t thread_id = thread->thread_number;
// Step back over `thread_id` array elements and the offset of the `threads`
// member to reach the start of the enclosing `struct pthreadpool`.
struct pthreadpool* threadpool =
(struct pthreadpool*)((uintptr_t)thread -
thread_id * sizeof(struct thread_info) -
offsetof(struct pthreadpool, threads));
uint32_t last_job_id = 0;
// Get the current threadpool state.
int32_t curr_active_threads =
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
// Check whether we have to wake up any other threads.
if (curr_active_threads > 0 &&
curr_active_threads != PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) {
ensure_num_threads(threadpool, thread_id);
curr_active_threads =
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
}
pthreadpool_log_debug("thread %u entering main loop.", thread_id);
// Main loop.
while (true) {
if (curr_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) {
// Signal our intent to leave the party.
pthreadpool_store_release_uint32_t(&thread->is_active, 0);
// Double-check that it really was time to go.
// We leave if the state is not "running", or if it is running but another
// thread has already re-claimed our `is_active` flag (exchange returned 1).
if (!((curr_active_threads =
pthreadpool_load_sequentially_consistent_int32_t(
&threadpool->num_active_threads)) > 0 &&
curr_active_threads != PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) ||
pthreadpool_exchange_sequentially_consistent_uint32_t(
&thread->is_active, 1)) {
// We're done here.
pthreadpool_log_debug("thread %u leaving main loop.", thread_id);
break;
}
} else if (curr_active_threads <= 0) {
// If the state is `idle` or `wrapping_up`, wait for a state change to
// "running".
curr_active_threads = wait_on_num_active_threads(threadpool, thread_id);
} else {
// If the threadpool is currently running a job, try to join in on the
// work.
bool got_work = false;
while (!got_work && curr_active_threads > 0 &&
curr_active_threads != PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) {
got_work = pthreadpool_compare_exchange_sequentially_consistent_int32_t(
&threadpool->num_active_threads, &curr_active_threads,
curr_active_threads + 1);
}
// Did we get a foot in?
if (got_work) {
assert(last_job_id < threadpool->job_id);
last_job_id = threadpool->job_id;
// Do we already have too many threads working on this?
// `curr_active_threads` still holds the value from before our increment,
// i.e. the number of threads that had joined ahead of us.
const uint32_t max_active_threads =
pthreadpool_load_acquire_size_t(&threadpool->threads_count);
if (curr_active_threads < max_active_threads) {
// Do the needful.
pthreadpool_log_debug("thread %u working on job %u.", thread_id,
threadpool->job_id);
run_thread_function(threadpool, thread_id);
}
// Ring the bell on the way out.
curr_active_threads = thread_wrap_up(threadpool, thread_id);
}
}
}
// Release our hold on the threadpool.
pthreadpool_release(threadpool);
return 0;
}
/*
 * Returns the number of processors to size the pool with.
 *
 * The value comes from cpuinfo when PTHREADPOOL_USE_CPUINFO is enabled, from
 * `sysconf(_SC_NPROCESSORS_ONLN)` where that is available, and from
 * `GetSystemInfo` on Windows. On the sysconf path the result is never less
 * than 1, even if the query fails.
 */
static size_t get_num_cpus(void) {
#if PTHREADPOOL_USE_CPUINFO
  return cpuinfo_get_processors_count();
#elif defined(_SC_NPROCESSORS_ONLN)
  /* sysconf() reports failure as -1; casting that straight to size_t would
   * yield an enormous processor count, so fall back to a single CPU. */
  const long reported = sysconf(_SC_NPROCESSORS_ONLN);
  size_t num_cpus = reported < 1 ? (size_t)1 : (size_t)reported;
#if defined(__EMSCRIPTEN_PTHREADS__)
  /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE
   * option */
  if (num_cpus >= 8) {
    num_cpus = 8;
  }
#endif
  return num_cpus;
#elif defined(_WIN32)
  SYSTEM_INFO system_info;
  ZeroMemory(&system_info, sizeof(system_info));
  GetSystemInfo(&system_info);
  return (size_t)system_info.dwNumberOfProcessors;
#else
#error \
    "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required"
#endif
}
/*
 * Creates a threadpool that manages its own threads, i.e. one without an
 * executor. `threads_count` is forwarded unchanged to `pthreadpool_create_v2`
 * as the maximum number of threads. Returns whatever
 * `pthreadpool_create_v2` returns.
 */
struct pthreadpool* pthreadpool_create(size_t threads_count) {
  struct pthreadpool_executor* const no_executor = NULL;
  void* const no_executor_context = NULL;
  return pthreadpool_create_v2(no_executor, no_executor_context,
                               threads_count);
}
/*
 * Creates a threadpool.
 *
 * executor:         optional executor that provides the worker threads; when
 *                   NULL the pool creates its own threads.
 * executor_context: opaque pointer handed back to the executor's callbacks.
 * max_num_threads:  upper bound on the number of threads (including the
 *                   calling thread); 0 selects `get_num_cpus()`. With an
 *                   executor the bound is additionally capped at
 *                   `executor->num_threads(executor_context) + 1`.
 *
 * Returns the new threadpool, or NULL if cpuinfo initialization or the
 * allocation fails.
 *
 * NOTE(review): the result of `pthreadpool_thread_create` is not checked.
 */
struct pthreadpool* pthreadpool_create_v2(struct pthreadpool_executor* executor,
                                          void* executor_context,
                                          size_t max_num_threads) {
#if PTHREADPOOL_USE_CPUINFO
  if (!cpuinfo_initialize()) {
    return NULL;
  }
#endif
  if (max_num_threads == 0) {
    max_num_threads = get_num_cpus();
  }
  const uint32_t num_threads =
      executor
          ? min(max_num_threads, executor->num_threads(executor_context) + 1)
          : max_num_threads;
  struct pthreadpool* threadpool = pthreadpool_allocate(num_threads);
  if (threadpool == NULL) {
#if PTHREADPOOL_USE_CPUINFO
    /* Balance the successful cpuinfo_initialize() above; normally this is done
     * by pthreadpool_destroy(), which will never run for this pool. */
    cpuinfo_deinitialize();
#endif
    return NULL;
  }
  if (executor) {
    threadpool->executor = *executor;
    threadpool->executor_context = executor_context;
  }
  threadpool->max_num_threads = num_threads;
  threadpool->threads_count = num_threads;
  for (size_t tid = 0; tid < num_threads; tid++) {
    threadpool->threads[tid].thread_number = tid;
    threadpool->threads[tid].threadpool = threadpool;
  }
  threadpool->num_active_threads = 0;

  /* Initialize the execution mutex. */
  pthreadpool_mutex_init(&threadpool->execution_mutex);
#if !PTHREADPOOL_USE_FUTEX
  /* Initialize the condition variables and mutexes. This is done regardless of
   * the thread count: pthreadpool_destroy() destroys them unconditionally, and
   * releasing the threads locks `num_active_threads_mutex` even for a
   * single-threaded pool. */
  pthreadpool_mutex_init(&threadpool->completion_mutex);
  pthreadpool_cond_init(&threadpool->completion_condvar);
  pthreadpool_mutex_init(&threadpool->num_active_threads_mutex);
  pthreadpool_cond_init(&threadpool->num_active_threads_condvar);
#endif

  /* If we weren't given an executor, start our own threads. */
  if (num_threads > 1 && !executor) {
    /* Caller thread serves as worker #0. Thus, we create system threads
     * starting with worker #1. */
    pthreadpool_register_threads(threadpool, num_threads - 1);
    for (size_t tid = 1; tid < num_threads; tid++) {
      pthreadpool_thread_create(&threadpool->threads[tid].thread_object,
                                &thread_main, &threadpool->threads[tid]);
    }
  }
  return threadpool;
}
// Runs one parallel job on the threadpool and returns when it has finished.
//
// threadpool:      pool to run on (must not be NULL).
// thread_function: per-thread driver invoked by every participating thread.
// params/params_size: optional parameter block copied into
//                  `threadpool->params` (skipped when `params_size` is 0).
// task, context:   stored in the pool for the duration of the job.
// linear_range:    total number of items, split as evenly as possible across
//                  `min(linear_range, threads_count)` threads; must be > 1.
// flags:           job flags stored in `threadpool->flags`.
//
// The whole call is serialized by `execution_mutex`. The calling thread acts
// as thread 0 and takes part in the work.
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
struct pthreadpool* threadpool, thread_function_t thread_function,
const void* params, size_t params_size, void* task, void* context,
size_t linear_range, uint32_t flags) {
assert(threadpool != NULL);
assert(thread_function != NULL);
assert(task != NULL);
assert(linear_range > 1);
/* Protect the global threadpool structures */
pthreadpool_mutex_lock(&threadpool->execution_mutex);
/* Make changes by other threads visible to this thread. */
pthreadpool_fence_acquire();
/* Make sure the threadpool is idle or done. */
const int32_t num_active_threads =
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
assert(num_active_threads == 0 ||
num_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);
/* Setup global arguments */
pthreadpool_store_relaxed_void_p(&threadpool->thread_function,
thread_function);
pthreadpool_store_relaxed_void_p(&threadpool->task, task);
pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
threadpool->job_id += 1;
if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
}
// How many threads should we parallelize over?
// `threads_count` is temporarily lowered for small ranges and restored at the
// end of this function.
const uint32_t prev_num_threads = threadpool->threads_count;
const uint32_t num_threads = min(linear_range, prev_num_threads);
pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads);
pthreadpool_log_debug("main thread starting job %u with %u threads.",
(uint32_t)threadpool->job_id, num_threads);
/* Populate a `thread_info` struct for each thread */
const struct fxdiv_result_size_t range_params =
fxdiv_divide_size_t(linear_range, fxdiv_init_size_t(num_threads));
size_t range_start = 0;
for (size_t tid = 0; tid < num_threads; tid++) {
struct thread_info* thread = &threadpool->threads[tid];
// The first `remainder` threads get one extra item each.
const size_t range_length =
range_params.quotient + (size_t)(tid < range_params.remainder);
const size_t range_end = range_start + range_length;
pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
/* The next subrange starts where the previous ended */
range_start = range_end;
}
// Populate sentinels.
// Threads beyond `num_threads` get an empty range located at the end of the
// overall range.
for (size_t tid = num_threads; tid < threadpool->max_num_threads; tid++) {
struct thread_info* thread = &threadpool->threads[tid];
pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
pthreadpool_store_relaxed_size_t(&thread->range_end, range_start);
pthreadpool_store_relaxed_size_t(&thread->range_length, 0);
}
/* Make changes by this thread visible to other threads. */
pthreadpool_fence_release();
/* Set the number of active threads for this job (currently just this thread).
*/
pthreadpool_store_sequentially_consistent_int32_t(
&threadpool->num_active_threads, 1);
/* Wake up any thread waiting on a change of state. */
// The second argument is the number of sleeping threads that may be left
// asleep because this job does not need them.
signal_num_active_threads(threadpool,
threadpool->max_num_threads - num_threads);
/* Make sure we have enough threads running. */
ensure_num_threads(threadpool, /*thread_id=*/0);
/* Do a bit of work ourselves, as thread zero. */
run_thread_function(threadpool, /*thread_id=*/0);
/* If we weren't the last one out, wait for any other threads to finish. */
if (thread_wrap_up(threadpool, /*thread_id=*/0)) {
/* Wait for any other threads to finish. */
wait_on_work_is_done(threadpool);
}
/* Make changes by other threads visible to this thread. */
pthreadpool_fence_acquire();
/* Re-set the number of threads in case it was reduced for this task. */
if (prev_num_threads != num_threads) {
pthreadpool_store_relaxed_size_t(&threadpool->threads_count,
prev_num_threads);
}
/* Unprotect the global threadpool structures now that we're done. */
pthreadpool_mutex_unlock(&threadpool->execution_mutex);
}
// Switches the threadpool's state to "done" and wakes every thread that is
// sleeping on a state change, so that the threads can leave their main loops.
// Does nothing for a NULL `threadpool`. The pool must be idle or already done.
static void pthreadpool_release_all_threads(struct pthreadpool* threadpool) {
  if (threadpool == NULL) {
    return;
  }

  assert(threadpool->num_active_threads == 0 ||
         threadpool->num_active_threads ==
             PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);

  // Publish the "done" state.
  pthreadpool_store_sequentially_consistent_int32_t(
      &threadpool->num_active_threads, PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);
  pthreadpool_log_debug(
      "main thread switching num_active_threads from %i to %i.", 0,
      PTHREADPOOL_NUM_ACTIVE_THREADS_DONE);

  // Nobody may stay asleep (max_num_waiting == 0).
  signal_num_active_threads(threadpool, 0);
}
/*
 * Releases all threads of a threadpool that is backed by an executor.
 * Has no effect if `threadpool` is NULL or the pool has no executor.
 */
void pthreadpool_release_executor_threads(struct pthreadpool* threadpool) {
  if (!threadpool) {
    return;
  }
  if (!threadpool->executor.num_threads) {
    // The pool owns its threads; leave them alone.
    return;
  }
  pthreadpool_release_all_threads(threadpool);
}
/*
 * Installs `executor` / `executor_context` on the threadpool if they differ
 * from what is currently installed (different `num_threads` callback,
 * `schedule` callback or context). Before switching, the threads of the
 * current executor are released. Runs under the execution mutex.
 *
 * Returns true if the executor was replaced, false if nothing changed.
 */
bool pthreadpool_update_executor(pthreadpool_t threadpool,
                                 struct pthreadpool_executor* executor,
                                 void* executor_context) {
  /* Protect the global threadpool structures */
  pthreadpool_mutex_lock(&threadpool->execution_mutex);

  bool changed = true;
  if (threadpool->executor.num_threads == executor->num_threads &&
      threadpool->executor.schedule == executor->schedule &&
      threadpool->executor_context == executor_context) {
    // Same callbacks and same context: keep what we have.
    changed = false;
  }

  if (changed) {
    // Send the old executor's threads home before installing the new one.
    pthreadpool_release_executor_threads(threadpool);
    threadpool->executor = *executor;
    threadpool->executor_context = executor_context;
  }

  /* Unprotect the global threadpool structures now that we're done. */
  pthreadpool_mutex_unlock(&threadpool->execution_mutex);
  return changed;
}
/*
 * Shuts down and frees a threadpool. A NULL `threadpool` is ignored.
 *
 * All threads are told to stop; threads created by the pool itself are joined,
 * while for executor-backed pools the call waits until no recruited thread is
 * left. Afterwards the synchronization objects are destroyed and the pool's
 * memory is deallocated.
 */
void pthreadpool_destroy(struct pthreadpool* threadpool) {
  if (threadpool == NULL) {
    return;
  }

  /* Tell all threads to stop. */
  pthreadpool_release_all_threads(threadpool);

  if (threadpool->executor.num_threads) {
    /* Wait for executor-provided threads. */
    wait_on_num_recruited_threads(threadpool, 0);
  } else {
    /* Wait until all threads return; thread #0 is the caller's own thread. */
    const size_t num_workers = threadpool->max_num_threads;
    for (size_t tid = 1; tid < num_workers; tid++) {
      pthreadpool_thread_join(threadpool->threads[tid].thread_object, NULL);
    }
  }

  /* Release resources */
  pthreadpool_mutex_destroy(&threadpool->execution_mutex);
#if !PTHREADPOOL_USE_FUTEX
  pthreadpool_mutex_destroy(&threadpool->num_active_threads_mutex);
  pthreadpool_cond_destroy(&threadpool->num_active_threads_condvar);
  pthreadpool_mutex_destroy(&threadpool->completion_mutex);
  pthreadpool_cond_destroy(&threadpool->completion_condvar);
#endif
#if PTHREADPOOL_USE_CPUINFO
  cpuinfo_deinitialize();
#endif
  pthreadpool_log_debug("destroying threadpool at %p.", threadpool);
  pthreadpool_deallocate(threadpool);
}