Advanced Chunk Processing Library 0.2.0
A comprehensive C++ library for advanced data chunking strategies and processing operations
Loading...
Searching...
No Matches
parallel_chunk.hpp
Go to the documentation of this file.
1#ifndef PARALLEL_CHUNK_HPP
2#define PARALLEL_CHUNK_HPP
3
4#include "chunk.hpp"
5#include <exception>
6#include <functional>
7#include <future>
8#include <mutex>
9#include <thread>
10#include <vector>
11
12namespace parallel_chunk {
13
14/**
15 * @brief Parallel chunk processor for concurrent operations
16 * @tparam T The type of elements to process
17 */
18template <typename T>
20public:
21 using ChunkOperation = std::function<void(std::vector<T>&)>;
22
23 /**
24 * @brief Process chunks in parallel
25 * @param chunks Vector of chunks to process
26 * @param operation Operation to apply to each chunk
27 * @param num_threads Number of threads to use (default: hardware concurrency)
28 */
29 static void process_chunks(std::vector<std::vector<T>>& chunks,
30 const std::function<void(std::vector<T>&)>& operation) {
31 std::mutex exception_mutex;
32 std::exception_ptr exception_ptr = nullptr;
33 std::vector<std::thread> threads;
34
35 for (auto& chunk : chunks) {
36 threads.emplace_back([&chunk, &operation, &exception_mutex, &exception_ptr]() {
37 try {
38 operation(chunk);
39 } catch (...) {
40 std::lock_guard<std::mutex> lock(exception_mutex);
41 if (!exception_ptr) {
42 exception_ptr = std::current_exception();
43 }
44 }
45 });
46 }
47
48 for (auto& thread : threads) {
49 thread.join();
50 }
51
52 if (exception_ptr) {
53 std::rethrow_exception(exception_ptr);
54 }
55 }
56
57 /**
58 * @brief Map operation over chunks in parallel
59 * @param chunks Input chunks
60 * @param operation Mapping operation
61 * @return Transformed chunks
62 */
63 template <typename U>
64 static std::vector<std::vector<U>> map(const std::vector<std::vector<T>>& chunks,
65 std::function<U(const T&)> operation) {
66 std::vector<std::vector<U>> result(chunks.size());
67 std::vector<std::future<void>> futures;
68
69 for (size_t i = 0; i < chunks.size(); ++i) {
70 futures.push_back(std::async(std::launch::async, [&, i]() {
71 result[i].reserve(chunks[i].size());
72 std::transform(chunks[i].begin(), chunks[i].end(), std::back_inserter(result[i]),
73 operation);
74 }));
75 }
76
77 for (auto& future : futures) {
78 future.wait();
79 }
80
81 return result;
82 }
83
84 /**
85 * @brief Reduce chunks in parallel
86 * @param chunks Input chunks
87 * @param operation Reduction operation
88 * @param initial Initial value
89 * @return Reduced value
90 */
91 static T reduce(const std::vector<std::vector<T>>& chunks,
92 std::function<T(const T&, const T&)> operation, T initial) {
93 std::vector<std::future<T>> futures;
94
95 for (const auto& chunk : chunks) {
96 futures.push_back(std::async(std::launch::async, [&]() {
97 return std::accumulate(chunk.begin(), chunk.end(), T(), operation);
98 }));
99 }
100
101 T result = initial;
102 for (auto& future : futures) {
103 result = operation(result, future.get());
104 }
105
106 return result;
107 }
108};
109
110} // namespace parallel_chunk
111
112#endif // PARALLEL_CHUNK_HPP
Parallel chunk processor for concurrent operations.
static std::vector< std::vector< U > > map(const std::vector< std::vector< T > > &chunks, std::function< U(const T &)> operation)
Map operation over chunks in parallel.
std::function< void(std::vector< T > &)> ChunkOperation
static T reduce(const std::vector< std::vector< T > > &chunks, std::function< T(const T &, const T &)> operation, T initial)
Reduce chunks in parallel.
static void process_chunks(std::vector< std::vector< T > > &chunks, const std::function< void(std::vector< T > &)> &operation)
Process chunks in parallel.