Advanced Chunk Processing Library 0.2.0
A comprehensive C++ library for advanced data chunking strategies and processing operations
Loading...
Searching...
No Matches
parallel_chunk::ParallelChunkProcessor< T > Class Template Reference

Parallel chunk processor for concurrent operations. More...

#include <parallel_chunk.hpp>

Public Types

using ChunkOperation = std::function< void(std::vector< T > &)>
 

Static Public Member Functions

template<typename U >
static std::vector< std::vector< U > > map (const std::vector< std::vector< T > > &chunks, std::function< U(const T &)> operation)
 Map operation over chunks in parallel.
 
static void process_chunks (std::vector< std::vector< T > > &chunks, const std::function< void(std::vector< T > &)> &operation)
 Process chunks in parallel.
 
static T reduce (const std::vector< std::vector< T > > &chunks, std::function< T(const T &, const T &)> operation, T initial)
 Reduce chunks in parallel.
 

Detailed Description

template<typename T>
class parallel_chunk::ParallelChunkProcessor< T >

Parallel chunk processor for concurrent operations.

Template Parameters
TThe type of elements to process

Definition at line 19 of file parallel_chunk.hpp.

Member Typedef Documentation

◆ ChunkOperation

template<typename T >
using parallel_chunk::ParallelChunkProcessor< T >::ChunkOperation = std::function<void(std::vector<T>&)>

Definition at line 21 of file parallel_chunk.hpp.

Member Function Documentation

◆ map()

template<typename T >
template<typename U >
static std::vector< std::vector< U > > parallel_chunk::ParallelChunkProcessor< T >::map ( const std::vector< std::vector< T > > &  chunks,
std::function< U(const T &)>  operation 
)
inlinestatic

Map operation over chunks in parallel.

Parameters
chunksInput chunks
operationMapping operation
Returns
Transformed chunks

Definition at line 64 of file parallel_chunk.hpp.

65 {
66 std::vector<std::vector<U>> result(chunks.size());
67 std::vector<std::future<void>> futures;
68
69 for (size_t i = 0; i < chunks.size(); ++i) {
70 futures.push_back(std::async(std::launch::async, [&, i]() {
71 result[i].reserve(chunks[i].size());
72 std::transform(chunks[i].begin(), chunks[i].end(), std::back_inserter(result[i]),
73 operation);
74 }));
75 }
76
77 for (auto& future : futures) {
78 future.wait();
79 }
80
81 return result;
82 }

◆ process_chunks()

template<typename T >
static void parallel_chunk::ParallelChunkProcessor< T >::process_chunks ( std::vector< std::vector< T > > &  chunks,
const std::function< void(std::vector< T > &)> &  operation 
)
inlinestatic

Process chunks in parallel.

Parameters
chunksVector of chunks to process
operationOperation to apply to each chunk
num_threadsNumber of threads to use (default: hardware concurrency)

Definition at line 29 of file parallel_chunk.hpp.

30 {
31 std::mutex exception_mutex;
32 std::exception_ptr exception_ptr = nullptr;
33 std::vector<std::thread> threads;
34
35 for (auto& chunk : chunks) {
36 threads.emplace_back([&chunk, &operation, &exception_mutex, &exception_ptr]() {
37 try {
38 operation(chunk);
39 } catch (...) {
40 std::lock_guard<std::mutex> lock(exception_mutex);
41 if (!exception_ptr) {
42 exception_ptr = std::current_exception();
43 }
44 }
45 });
46 }
47
48 for (auto& thread : threads) {
49 thread.join();
50 }
51
52 if (exception_ptr) {
53 std::rethrow_exception(exception_ptr);
54 }
55 }

Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().

◆ reduce()

template<typename T >
static T parallel_chunk::ParallelChunkProcessor< T >::reduce ( const std::vector< std::vector< T > > &  chunks,
std::function< T(const T &, const T &)>  operation,
initial 
)
inlinestatic

Reduce chunks in parallel.

Parameters
chunksInput chunks
operationReduction operation
initialInitial value
Returns
Reduced value

Definition at line 91 of file parallel_chunk.hpp.

92 {
93 std::vector<std::future<T>> futures;
94
95 for (const auto& chunk : chunks) {
96 futures.push_back(std::async(std::launch::async, [&]() {
97 return std::accumulate(chunk.begin(), chunk.end(), T(), operation);
98 }));
99 }
100
101 T result = initial;
102 for (auto& future : futures) {
103 result = operation(result, future.get());
104 }
105
106 return result;
107 }

Referenced by TEST_F(), TEST_F(), and TEST_F().


The documentation for this class was generated from the following file: