forked from taskflow/taskflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
text_pipeline.cpp
125 lines (109 loc) · 3.09 KB
/
text_pipeline.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// This program demonstrates how to create a pipeline scheduling framework
// that computes the maximum occurrence of the character for each input string.
//
// The pipeline has the following structure:
//
// o -> o -> o
// | |
// v v
// o -> o -> o
// | |
// v v
// o -> o -> o
// | |
// v v
// o -> o -> o (string -> unordered_map<char, size_t> -> pair<char, size_t>)
//
// Input:
// abade
// ddddf
// eefge
// xyzzd
// ijjjj
// jiiii
// kkijk
//
// Output:
// a:2
// d:4
// e:3
// z:2
// j:4
// i:4
// k:3
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
// Function: format the map
// Renders a character-frequency map as "<char>:<count> " pairs,
// each entry followed by a single space (iteration order is the map's own).
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::string out;
  for(const auto& entry : map) {
    out.push_back(entry.first);
    out.push_back(':');
    out += std::to_string(entry.second);
    out.push_back(' ');
  }
  return out;
}
int main() {
tf::Taskflow taskflow("text-processing pipeline");
tf::Executor executor;
const size_t num_lines = 2;
// input data
std::vector<std::string> input = {
"abade",
"ddddf",
"eefge",
"xyzzd",
"ijjjj",
"jiiii",
"kkijk"
};
// custom data storage
using data_type = std::variant<
std::string, std::unordered_map<char, size_t>, std::pair<char, size_t>
>;
std::array<data_type, num_lines> buffer;
// the pipeline consists of three pipes (serial-parallel-serial)
// and up to two concurrent scheduling tokens
tf::Pipeline pl(num_lines,
// first pipe processes the input data
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
if(pf.token() == input.size()) {
pf.stop();
}
else {
buffer[pf.line()] = input[pf.token()];
printf("stage 1: input token = %s\n", input[pf.token()].c_str());
}
}},
// second pipe counts the frequency of each character
tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
std::unordered_map<char, size_t> map;
for(auto c : std::get<std::string>(buffer[pf.line()])) {
map[c]++;
}
buffer[pf.line()] = map;
printf("stage 2: map = %s\n", format_map(map).c_str());
}},
// third pipe reduces the most frequent character
tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
auto& map = std::get<std::unordered_map<char, size_t>>(buffer[pf.line()]);
auto sol = std::max_element(map.begin(), map.end(), [](auto& a, auto& b){
return a.second < b.second;
});
printf("stage 3: %c:%zu\n", sol->first, sol->second);
}}
);
// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
.name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
.name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
.name("pipeline stopped");
// create task dependency
init.precede(task);
task.precede(stop);
// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);
// run the pipeline
executor.run(taskflow).wait();
return 0;
}