https://github.com/mtrebi/thread-pool/blob/master/include/ThreadPool.h
SaveQueue.h
cpp
#pragma once
#include <mutex>
#include <queue>
// Thread safe implementation of a Queue using an std::queue
template <typename T>
class SafeQueue {
private:
std::queue<T> m_queue;
std::mutex m_mutex;
public:
SafeQueue() {
}
SafeQueue(SafeQueue& other) {
//TODO:
}
~SafeQueue() {
}
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void enqueue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(t);
}
bool dequeue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
}; ThreadPool.h
cpp
#pragma once
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "SafeQueue.h"
class ThreadPool {
private:
class ThreadWorker {
private:
int m_id;
ThreadPool * m_pool;
public:
ThreadWorker(ThreadPool * pool, const int id)
: m_pool(pool), m_id(id) {
}
void operator()() {
std::function<void()> func;
bool dequeued;
while (!m_pool->m_shutdown) {
{
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
if (m_pool->m_queue.empty()) {
m_pool->m_conditional_lock.wait(lock);
}
dequeued = m_pool->m_queue.dequeue(func);
}
if (dequeued) {
func();
}
}
}
};
bool m_shutdown;
SafeQueue<std::function<void()>> m_queue;
std::vector<std::thread> m_threads;
std::mutex m_conditional_mutex;
std::condition_variable m_conditional_lock;
public:
ThreadPool(const int n_threads)
: m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool & operator=(const ThreadPool &) = delete;
ThreadPool & operator=(ThreadPool &&) = delete;
// Inits thread pool
void init() {
for (int i = 0; i < m_threads.size(); ++i) {
m_threads[i] = std::thread(ThreadWorker(this, i));
}
}
// Waits until threads finish their current task and shutdowns the pool
void shutdown() {
m_shutdown = true;
m_conditional_lock.notify_all();
for (int i = 0; i < m_threads.size(); ++i) {
if(m_threads[i].joinable()) {
m_threads[i].join();
}
}
}
// Submit a function to be executed asynchronously by the pool
template<typename F, typename...Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// Create a function with bounded parameters ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// Encapsulate it into a shared ptr in order to be able to copy construct / assign
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
// Wrap packaged task into void function
std::function<void()> wrapper_func = [task_ptr]() {
(*task_ptr)();
};
// Enqueue generic wrapper function
m_queue.enqueue(wrapper_func);
// Wake up one thread if its waiting
m_conditional_lock.notify_one();
// Return future from promise
return task_ptr->get_future();
}
}; 例子:
cpp
#include <iostream>
#include <random>
#include "../include/ThreadPool.h"
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<int> dist(-1000, 1000);
auto rnd = std::bind(dist, mt);
void simulate_hard_computation() {
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// Simple function that adds multiplies two numbers and prints the result
void multiply(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
// Same as before but now we have an output parameter
void multiply_output(int & out, const int a, const int b) {
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// Same as before but now we have an output parameter
int multiply_return(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
int main(int argc, char *argv[])
{
// Create pool with 3 threads
ThreadPool pool(3);
// Initialize pool
pool.init();
// Submit (partial) multiplication table
for (int i = 1; i < 3; ++i) {
for (int j = 1; j < 10; ++j) {
pool.submit(multiply, i, j);
}
}
// Submit function with output parameter passed by ref
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// Wait for multiplication output to finish
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// Submit function with return parameter
auto future2 = pool.submit(multiply_return, 5, 3);
// Wait for multiplication output to finish
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
pool.shutdown();
return 0;
}