Advanced Chunk Processing Library 0.2.0
A comprehensive C++ library for advanced data chunking strategies and processing operations
Loading...
Searching...
No Matches
parallel_chunk_test.cpp
Go to the documentation of this file.
1#include "parallel_chunk.hpp"
2#include "test_base.hpp"
3#include <atomic>
4#include <numeric>
5
6using namespace parallel_chunk;
7
9protected:
10 std::vector<int> test_data;
11 std::vector<std::vector<int>> chunks;
12
13 void SetUp() override {
15
16 try {
17 test_data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
18 chunks = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
19 } catch (const std::exception& e) {
20 FAIL() << "Setup failed: " << e.what();
21 }
22 }
23
24 void TearDown() override {
25 try {
26 test_data.clear();
27 chunks.clear();
28 } catch (...) {
29 // Ensure base teardown still happens
30 }
32 }
33};
34
36 std::unique_lock<std::mutex> lock(global_test_mutex_);
37
38 // Double each number in parallel
39 auto operation = [](std::vector<int>& chunk) {
40 for (int& val : chunk) {
41 val *= 2;
42 }
43 };
44
46
47 lock.unlock();
48
49 EXPECT_EQ(chunks[0], (std::vector<int>{2, 4, 6}));
50 EXPECT_EQ(chunks[1], (std::vector<int>{8, 10, 12}));
51 EXPECT_EQ(chunks[2], (std::vector<int>{14, 16, 18}));
52}
53
55 std::unique_lock<std::mutex> lock(global_test_mutex_);
56
57 // Map: Square each number
58 auto square = [](const int& x) { return x * x; };
59 auto squared_chunks = ParallelChunkProcessor<int>::map<int>(chunks, square);
60
61 // Reduce: Sum all squares
62 auto sum = [](const int& a, const int& b) { return a + b; };
63 int result = ParallelChunkProcessor<int>::reduce(squared_chunks, sum, 0);
64
65 lock.unlock();
66
67 // Expected: 1^2 + 2^2 + ... + 9^2 = 285
68 EXPECT_EQ(result, 285);
69}
70
72 std::vector<std::vector<int>> empty_data;
73 ParallelChunkProcessor<int>::process_chunks(empty_data, [](std::vector<int>& chunk) {
74 for (int& x : chunk) {
75 x *= 2;
76 }
77 });
78 EXPECT_TRUE(empty_data.empty());
79}
80
81TEST_F(ParallelChunkProcessorTest, SingleThreadProcessing) {
82 std::vector<std::vector<int>> chunked_data = {test_data};
83 ParallelChunkProcessor<int>::process_chunks(chunked_data, [](std::vector<int>& chunk) {
84 for (int& x : chunk) {
85 x *= 2;
86 }
87 });
88 EXPECT_EQ(chunked_data[0].size(), test_data.size());
89 for (size_t i = 0; i < test_data.size(); ++i) {
90 EXPECT_EQ(chunked_data[0][i], test_data[i] * 2);
91 }
92}
93
95 std::vector<std::vector<int>> chunked_data = {test_data};
96 EXPECT_THROW(
98 [](std::vector<int>& chunk) {
99 for (size_t i = 0; i < chunk.size(); ++i) {
100 int x = chunk[i];
101 if (x > 3)
102 throw std::runtime_error("test");
103 chunk[i] = x * 2;
104 }
105 }),
106 std::runtime_error);
107}
108
109TEST_F(ParallelChunkProcessorTest, MapWithEmptyChunks) {
110 std::vector<std::vector<int>> empty_chunks;
111 auto square = [](const int& x) { return x * x; };
112 auto result = ParallelChunkProcessor<int>::map<int>(empty_chunks, square);
113 EXPECT_TRUE(result.empty());
114}
115
116TEST_F(ParallelChunkProcessorTest, MapWithSingleElement) {
117 std::vector<std::vector<int>> single_chunk = {{42}};
118 auto square = [](const int& x) { return x * x; };
119 auto result = ParallelChunkProcessor<int>::map<int>(single_chunk, square);
120 EXPECT_EQ(result.size(), 1);
121 EXPECT_EQ(result[0][0], 1764); // 42^2
122}
123
124TEST_F(ParallelChunkProcessorTest, ReduceWithEmptyChunks) {
125 std::vector<std::vector<int>> empty_chunks;
126 auto sum = [](const int& a, const int& b) { return a + b; };
127 int result = ParallelChunkProcessor<int>::reduce(empty_chunks, sum, 0);
128 EXPECT_EQ(result, 0);
129}
130
131TEST_F(ParallelChunkProcessorTest, ReduceWithSingleElement) {
132 std::vector<std::vector<int>> single_chunk = {{42}};
133 auto sum = [](const int& a, const int& b) { return a + b; };
134 int result = ParallelChunkProcessor<int>::reduce(single_chunk, sum, 10);
135 EXPECT_EQ(result, 52); // 10 + 42
136}
137
139 auto double_it = [](const int& x) { return x * 2; };
140 auto multiply = [](const int& a, const int& b) { return a * b; };
141
142 // First, map to double all numbers
143 auto doubled = ParallelChunkProcessor<int>::map<int>(chunks, double_it);
144
145 // Flatten the chunks into a single vector for reduction
146 std::vector<int> flattened;
147 for (const auto& chunk : doubled) {
148 flattened.insert(flattened.end(), chunk.begin(), chunk.end());
149 }
150
151 // Perform reduction on the flattened vector
152 int result = std::accumulate(flattened.begin(), flattened.end(), 1, multiply);
153
154 // Calculate expected result: product of all numbers doubled
155 int expected = 1;
156 for (const auto& chunk : chunks) {
157 for (int x : chunk) {
158 expected *= (x * 2);
159 }
160 }
161 EXPECT_EQ(result, expected);
162}
163
164TEST_F(ParallelChunkProcessorTest, ConcurrentModification) {
165 std::vector<std::vector<int>> data = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
166
167 std::atomic<int> counter{0};
168 ParallelChunkProcessor<int>::process_chunks(data, [&counter](std::vector<int>& chunk) {
169 for (int& x : chunk) {
170 x *= 2;
171 counter++;
172 }
173 });
174
175 EXPECT_EQ(counter, 9); // Should have processed all elements
176 for (size_t i = 0; i < data.size(); i++) {
177 for (size_t j = 0; j < data[i].size(); j++) {
178 EXPECT_EQ(data[i][j], (i * 3 + j + 1) * 2);
179 }
180 }
181}
182
183TEST_F(ParallelChunkProcessorTest, ExceptionPropagation) {
184 std::vector<std::vector<int>> data = {{1}, {2}, {3}, {4}, {5}};
185 int exception_threshold = 2;
186
187 EXPECT_THROW(
188 {
190 data, [exception_threshold](std::vector<int>& chunk) {
191 if (chunk[0] > exception_threshold) {
192 throw std::runtime_error("Value too large");
193 }
194 std::cout << "chunk[0] before increment: " << chunk[0] << std::endl;
195 chunk[0] *= 2;
196 });
197 },
198 std::runtime_error);
199
200 // Check that some chunks were processed before exception
201 bool found_modified = false;
202 bool found_unmodified = false;
203 for (const auto& chunk : data) {
204 if (chunk[0] <= exception_threshold * 2 && chunk[0] % 2 == 0) {
205 found_modified = true;
206 }
207 if (chunk[0] > exception_threshold && chunk[0] == chunk[0] / 2 * 2) {
208 found_unmodified = true;
209 }
210 }
211 EXPECT_TRUE(found_modified || found_unmodified);
212}
void SetUp() override
Definition test_base.cpp:8
void TearDown() override
Definition test_base.cpp:15
std::vector< std::vector< int > > chunks
Parallel chunk processor for concurrent operations.
static T reduce(const std::vector< std::vector< T > > &chunks, std::function< T(const T &, const T &)> operation, T initial)
Reduce chunks in parallel.
static void process_chunks(std::vector< std::vector< T > > &chunks, const std::function< void(std::vector< T > &)> &operation)
Process chunks in parallel.
TEST_F(ParallelChunkProcessorTest, BasicProcessing)