30 const std::function<
void(std::vector<T>&)>& operation) {
31 std::mutex exception_mutex;
32 std::exception_ptr exception_ptr =
nullptr;
33 std::vector<std::thread> threads;
35 for (
auto& chunk : chunks) {
36 threads.emplace_back([&chunk, &operation, &exception_mutex, &exception_ptr]() {
40 std::lock_guard<std::mutex> lock(exception_mutex);
42 exception_ptr = std::current_exception();
48 for (
auto& thread : threads) {
53 std::rethrow_exception(exception_ptr);
// map (excerpt). The leading numbers appear to be listing line numbers, and
// several listing lines of this function are not present in this view, so the
// notes below describe only the statements that are visible.
//
// Parameters:
//   chunks    - input chunks, taken by const reference.
//   operation - std::function<U(const T&)>, taken by value.
// Returns a std::vector<std::vector<U>> by value.
64 static std::vector<std::vector<U>>
map(
const std::vector<std::vector<T>>& chunks,
65 std::function<U(
const T&)> operation) {
// One (initially empty) output vector per input chunk; the tasks below index
// into it with the same `i` they use for `chunks`.
66 std::vector<std::vector<U>> result(chunks.size());
// Handles for the launched tasks; the tasks themselves return nothing.
67 std::vector<std::future<void>> futures;
// Launch one task per chunk index with the std::launch::async policy.
69 for (
size_t i = 0; i < chunks.size(); ++i) {
// The lambda copies `i` and captures everything else it uses by reference.
70 futures.push_back(std::async(std::launch::async, [&, i]() {
// Reserve capacity in the i-th output equal to the i-th chunk's length.
71 result[i].reserve(chunks[i].size());
// Append the transformed elements of chunk i to result[i].
// NOTE(review): the callable passed to std::transform and the end of the
// lambda are not visible here - presumably `operation`; confirm.
72 std::transform(chunks[i].begin(), chunks[i].end(), std::back_inserter(result[i]),
// Visit every stored future. The loop body is not visible in this view
// (presumably it waits on each task - TODO confirm).
77 for (
auto& future : futures) {
91 static T
reduce(
const std::vector<std::vector<T>>& chunks,
92 std::function<T(
const T&,
const T&)> operation, T initial) {
93 std::vector<std::future<T>> futures;
95 for (
const auto& chunk : chunks) {
96 futures.push_back(std::async(std::launch::async, [&]() {
97 return std::accumulate(chunk.begin(), chunk.end(), T(), operation);
102 for (
auto& future : futures) {
103 result = operation(result, future.get());