Skip to content

Commit 5227edf

Browse files
Janosch MachowinskiJanosch Machowinski
Janosch Machowinski
authored and
Janosch Machowinski
committed
feat: Added compression_threads_nice_value option
Signed-off-by: Janosch Machowinski <[email protected]>
1 parent a710e87 commit 5227edf

File tree

4 files changed

+26
-10
lines changed

4 files changed

+26
-10
lines changed

rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp

+19-9
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,15 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
143143

144144
const uint64_t kDefaultCompressionQueueSize = 1;
145145
const uint64_t kDefaultCompressionQueueThreads = 4;
146+
const std::optional<int8_t> kDefaultCompressionQueueThreadsNiceValue = std::nullopt;
146147
};
147148

148149
TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)
149150
{
150151
rosbag2_compression::CompressionOptions compression_options{
151152
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
152-
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
153+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
154+
kDefaultCompressionQueueThreadsNiceValue};
153155
initializeWriter(compression_options);
154156

155157
EXPECT_THROW(
@@ -163,7 +165,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format)
163165
{
164166
rosbag2_compression::CompressionOptions compression_options{
165167
"bad_format", rosbag2_compression::CompressionMode::FILE,
166-
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
168+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
169+
kDefaultCompressionQueueThreadsNiceValue};
167170
initializeWriter(compression_options);
168171

169172
EXPECT_THROW(
@@ -175,7 +178,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
175178
{
176179
rosbag2_compression::CompressionOptions compression_options{
177180
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
178-
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
181+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
182+
kDefaultCompressionQueueThreadsNiceValue};
179183

180184
// Set minimum file size greater than max bagfile size option
181185
const uint64_t min_split_file_size = 10;
@@ -196,7 +200,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
196200
{
197201
rosbag2_compression::CompressionOptions compression_options{
198202
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
199-
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
203+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
204+
kDefaultCompressionQueueThreadsNiceValue};
200205
initializeWriter(compression_options);
201206

202207
auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
@@ -211,7 +216,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
211216
{
212217
rosbag2_compression::CompressionOptions compression_options{
213218
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
214-
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
219+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
220+
kDefaultCompressionQueueThreadsNiceValue};
215221
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
216222
EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1);
217223

@@ -235,7 +241,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
235241
DefaultTestCompressor,
236242
rosbag2_compression::CompressionMode::FILE,
237243
kDefaultCompressionQueueSize,
238-
kDefaultCompressionQueueThreads
244+
kDefaultCompressionQueueThreads,
245+
kDefaultCompressionQueueThreadsNiceValue
239246
};
240247

241248
initializeFakeFileStorage();
@@ -277,7 +284,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_
277284
DefaultTestCompressor,
278285
rosbag2_compression::CompressionMode::MESSAGE,
279286
0,
280-
kDefaultCompressionQueueThreads
287+
kDefaultCompressionQueueThreads,
288+
kDefaultCompressionQueueThreadsNiceValue
281289
};
282290

283291
initializeFakeFileStorage();
@@ -315,7 +323,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split
315323
DefaultTestCompressor,
316324
rosbag2_compression::CompressionMode::MESSAGE,
317325
0,
318-
kDefaultCompressionQueueThreads
326+
kDefaultCompressionQueueThreads,
327+
kDefaultCompressionQueueThreadsNiceValue
319328
};
320329

321330
initializeFakeFileStorage();
@@ -363,7 +372,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
363372
DefaultTestCompressor,
364373
rosbag2_compression::CompressionMode::MESSAGE,
365374
kCompressionQueueSize,
366-
kDefaultCompressionQueueThreads
375+
kDefaultCompressionQueueThreads,
376+
kDefaultCompressionQueueThreadsNiceValue
367377
};
368378

369379
initializeFakeFileStorage();

rosbag2_transport/include/rosbag2_transport/record_options.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ struct RecordOptions
4242
std::string compression_format = "";
4343
uint64_t compression_queue_size = 1;
4444
uint64_t compression_threads = 0;
45+
int8_t compression_threads_nice_value = 0;
4546
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
4647
bool include_hidden_topics = false;
4748
bool include_unpublished_topics = false;

rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ std::unique_ptr<rosbag2_cpp::Writer> ReaderWriterFactory::make_writer(
5353
record_options.compression_format,
5454
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
5555
record_options.compression_queue_size,
56-
record_options.compression_threads
56+
record_options.compression_threads,
57+
record_options.compression_threads_nice_value,
5758
};
5859
if (compression_options.compression_threads < 1) {
5960
compression_options.compression_threads = std::thread::hardware_concurrency();

rosbag2_transport/src/rosbag2_transport/record_options.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Node convert<rosbag2_transport::RecordOptions>::encode(
5454
node["compression_format"] = record_options.compression_format;
5555
node["compression_queue_size"] = record_options.compression_queue_size;
5656
node["compression_threads"] = record_options.compression_threads;
57+
node["compression_threads_nice_value"] = record_options.compression_threads_nice_value;
5758
std::map<std::string, rosbag2_transport::Rosbag2QoS> qos_overrides(
5859
record_options.topic_qos_profile_overrides.begin(),
5960
record_options.topic_qos_profile_overrides.end());
@@ -80,6 +81,9 @@ bool convert<rosbag2_transport::RecordOptions>::decode(
8081
optional_assign<std::string>(node, "compression_format", record_options.compression_format);
8182
optional_assign<uint64_t>(node, "compression_queue_size", record_options.compression_queue_size);
8283
optional_assign<uint64_t>(node, "compression_threads", record_options.compression_threads);
84+
optional_assign<int8_t>(
85+
node, "compression_threads_nice_value",
86+
record_options.compression_threads_nice_value);
8387

8488
// yaml-cpp doesn't implement unordered_map
8589
std::map<std::string, rosbag2_transport::Rosbag2QoS> qos_overrides;

0 commit comments

Comments
 (0)