-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathon_transfer_with_sleeps.cpp
118 lines (101 loc) · 3.72 KB
/
on_transfer_with_sleeps.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
* Copyright (c) Lucian Radu Teodorescu
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* General context:
* - server application that processes images
* - execution contexts:
* - 1 dedicated thread for network I/O
* - N worker threads used for CPU-intensive work
* - M threads for auxiliary I/O
* - optional GPU context that may be used on some types of servers
*
* Specific problem description:
* - reading data from the socket before processing the request
* - reading of the data is done on the I/O context
* - no processing of the data needs to be done on the I/O context
*
* Example goals:
* - show how one can change the execution context
* - exemplify the use of `on` and `transfer` algorithms
*/
#include <array>
#include <cstring>
#include <iostream>
#include <string_view>
#include <chrono>
#include <random>
// Pull in the reference implementation of P2300:
#include <execution.hpp>
#include <thread>
// Use a thread pool
#include "schedulers/static_thread_pool.hpp"
namespace ex = std::execution;
size_t legacy_read_from_socket(int sock, char *buffer, size_t buffer_len) {
const char fake_data[] = "Hello, world!";
size_t sz = sizeof(fake_data);
size_t count = std::min(sz, buffer_len);
std::strncpy(buffer, fake_data, count);
return count;
}
void process_read_data(const char *read_data, size_t read_len, int i) {
std::random_device
rd; // Will be used to obtain a seed for the random number engine
std::mt19937 gen(rd()); // Standard mersenne_twister_engine seeded with rd()
std::uniform_int_distribution<> distrib(1, 6);
auto time = distrib(gen);
std::cout << "Processing " << i << " '"
<< std::string_view{read_data, read_len} << " in " << time
<< "s '\n";
std::this_thread::sleep_for(std::chrono::seconds(time));
std::cout << i << "done" << std::endl;
}
int main() {
// Create a thread pool and get a scheduler from it
example::static_thread_pool work_pool{8};
ex::scheduler auto work_sched = work_pool.get_scheduler();
example::static_thread_pool io_pool{1};
ex::scheduler auto io_sched = io_pool.get_scheduler();
std::array<std::byte, 16 * 1024> buffer;
// Fake a couple of requests
for (int i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::seconds(1));
int sock = i;
auto buf = reinterpret_cast<char *>(&buffer[0]);
// A sender that just calls the legacy read function
auto snd_read =
ex::just(sock, buf, buffer.size()) | ex::then(legacy_read_from_socket);
// The entire flow
auto snd =
// start by reading data on the I/O thread
ex::on(io_sched,
std::move(snd_read)) // TODO: doesn't work apple-clang-13
// ex::on(io_sched, ex::just(size_t(13)))
// do the processing on the worker threads pool
| ex::transfer(work_sched)
// process the incoming data (on worker threads)
| ex::then(
[buf, i](int read_len) { process_read_data(buf, read_len, i); })
// done
;
// execute the whole flow asynchronously
ex::start_detached(std::move(snd));
}
using namespace std::chrono_literals;
std::this_thread::sleep_for(100ms);
io_pool.request_stop();
work_pool.request_stop();
return 0;
}