Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make standalone function make_metadata_file_sinks
Browse files Browse the repository at this point in the history
aliddell committed Jan 22, 2025
1 parent 42f78b6 commit 06250fa
Showing 7 changed files with 154 additions and 105 deletions.
120 changes: 117 additions & 3 deletions src/streaming/sink.cpp
Original file line number Diff line number Diff line change
@@ -27,6 +27,28 @@ bucket_exists(std::string_view bucket_name,
return bucket_exists;
}

/// @brief List the metadata file names required for a dataset of the given
///        Zarr version.
/// @param version The Zarr format version.
/// @return Relative paths of the metadata files to create.
/// @throws std::runtime_error if @p version is not a supported Zarr version.
std::vector<std::string>
construct_metadata_paths(ZarrVersion version)
{
    if (version == ZarrVersion_2) {
        return { ".zattrs", ".zgroup", "acquire.json" };
    }

    if (version == ZarrVersion_3) {
        return { "zarr.json", "acquire.json" };
    }

    throw std::runtime_error("Invalid Zarr version " +
                             std::to_string(static_cast<int>(version)));
}

bool
make_file_sinks(std::vector<std::string>& file_paths,
std::shared_ptr<zarr::ThreadPool> thread_pool,
@@ -36,6 +58,12 @@ make_file_sinks(std::vector<std::string>& file_paths,
return true;
}

const auto parents = zarr::get_parent_paths(file_paths);
if (!zarr::make_dirs(parents, thread_pool)) {
LOG_ERROR("Failed to make parent directories");
return false;
}

std::atomic<char> all_successful = 1;

const auto n_files = file_paths.size();
@@ -73,6 +101,72 @@ make_file_sinks(std::vector<std::string>& file_paths,

return (bool)all_successful;
}

/// @brief Create a FileSink for each of @p file_paths, relative to @p base_dir.
/// @details Parent directories are created first, then the files themselves
///          are opened in parallel on @p thread_pool.
/// @param[in] base_dir Directory prefix applied to every path in
///            @p file_paths; may be empty, in which case paths are used as is.
/// @param[in] file_paths Relative paths of the files to create.
/// @param[in] thread_pool Pool used to create the files in parallel.
/// @param[out] sinks The created sinks, keyed by relative path.
/// @return True iff every file was created successfully.
bool
make_file_sinks(
  const std::string& base_dir,
  const std::vector<std::string>& file_paths,
  std::shared_ptr<zarr::ThreadPool> thread_pool,
  std::unordered_map<std::string, std::unique_ptr<zarr::Sink>>& sinks)
{
    if (file_paths.empty()) {
        return true;
    }

    // create the parent directories if they don't exist
    const std::string prefix = base_dir.empty() ? "" : base_dir + "/";
    {
        std::vector<std::string> paths_with_parents(file_paths.size());
        // size_t index: avoids the signed/unsigned comparison against size()
        for (size_t i = 0; i < file_paths.size(); ++i) {
            paths_with_parents[i] = prefix + file_paths[i];
        }

        if (!zarr::make_dirs(zarr::get_parent_paths(paths_with_parents),
                             thread_pool)) {
            LOG_ERROR("Failed to make parent directories");
            return false;
        }
    }

    std::atomic<char> all_successful = 1;

    const auto n_files = file_paths.size();
    std::latch latch(n_files);

    sinks.clear();
    for (const auto& filename : file_paths) {
        sinks[filename] = nullptr;
        // pointers to unordered_map values remain valid across later inserts,
        // so psink is safe to use from the worker thread
        std::unique_ptr<zarr::Sink>* psink = &sinks[filename];
        const auto file_path = prefix + filename;

        EXPECT(thread_pool->push_job(
                 [filename = file_path, psink, &latch, &all_successful](
                   std::string& err) -> bool {
                     bool success = false;

                     try {
                         // skip the work (but still report success) once a
                         // sibling job has already failed
                         if (all_successful) {
                             *psink =
                               std::make_unique<zarr::FileSink>(filename);
                         }
                         success = true;
                     } catch (const std::exception& exc) {
                         err = "Failed to create file '" + filename +
                               "': " + exc.what();
                     }

                     latch.count_down();
                     all_successful.fetch_and((char)success);

                     return success;
                 }),
               "Failed to push job to thread pool.");
    }

    latch.wait();

    return (bool)all_successful;
}
} // namespace

bool
@@ -167,6 +261,11 @@ zarr::make_dirs(const std::vector<std::string>& dir_paths,

std::latch latch(unique_paths.size());
for (const auto& path : unique_paths) {
if (path.empty()) {
latch.count_down();
continue;
}

auto job = [&path, &latch, &all_successful](std::string& err) {
bool success = true;
if (fs::is_directory(path)) {
@@ -241,9 +340,6 @@ zarr::make_data_file_sinks(std::string_view base_path,
try {
paths =
construct_data_paths(base_path, dimensions, parts_along_dimension);
const auto parents = get_parent_paths(paths);
EXPECT(make_dirs(parents, thread_pool),
"Failed to create directories.");
} catch (const std::exception& exc) {
LOG_ERROR("Failed to create dataset paths: ", exc.what());
return false;
@@ -252,6 +348,24 @@ zarr::make_data_file_sinks(std::string_view base_path,
return make_file_sinks(paths, thread_pool, part_sinks);
}

/// @brief Create the metadata file sinks for a filesystem-backed Zarr dataset.
/// @param[in] version The Zarr format version.
/// @param[in] base_path The base path for the dataset; a "file://" prefix is
///            stripped if present.
/// @param[in] thread_pool Pool used to create the files in parallel.
/// @param[out] metadata_sinks The sinks created, keyed by relative path.
/// @return True iff all metadata sinks were created successfully.
/// @throws std::runtime_error if @p base_path is empty after stripping, or if
///         @p version is not a supported Zarr version.
bool
zarr::make_metadata_file_sinks(
  ZarrVersion version,
  std::string_view base_path,
  std::shared_ptr<ThreadPool> thread_pool,
  std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks)
{
    if (base_path.starts_with("file://")) {
        base_path = base_path.substr(7);
    }
    EXPECT(!base_path.empty(), "Base path must not be empty.");

    std::vector<std::string> file_paths = construct_metadata_paths(version);

    // Construct the string from the view's full range: passing .data() would
    // rely on null termination, which std::string_view does not guarantee.
    return make_file_sinks(
      std::string(base_path), file_paths, thread_pool, metadata_sinks);
}

std::unique_ptr<zarr::Sink>
zarr::make_s3_sink(std::string_view bucket_name,
std::string_view object_key,
68 changes: 0 additions & 68 deletions src/streaming/sink.creator.cpp
Original file line number Diff line number Diff line change
@@ -35,23 +35,6 @@ zarr::SinkCreator::make_data_sinks(
return make_s3_objects_(bucket_name, paths, part_sinks);
}

/// @brief Create the metadata file sinks for a filesystem-backed Zarr dataset.
/// @param[in] version The Zarr version (2 or 3).
/// @param[in] base_path The base path for the dataset; a "file://" prefix is
///            stripped if present.
/// @param[out] metadata_sinks The sinks created, keyed by relative path.
/// @return True iff all metadata sinks were created successfully.
/// @throws std::runtime_error if @p base_path is empty after stripping.
bool
zarr::SinkCreator::make_metadata_sinks(
  size_t version,
  std::string_view base_path,
  std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks)
{
    if (base_path.starts_with("file://")) {
        base_path = base_path.substr(7);
    }
    EXPECT(!base_path.empty(), "Base path must not be empty.");

    std::vector<std::string> file_paths =
      make_metadata_sink_paths_(version, base_path, true);

    // Construct the string from the view's full range: passing .data() would
    // rely on null termination, which std::string_view does not guarantee.
    return make_files_(std::string(base_path), file_paths, metadata_sinks);
}

bool
zarr::SinkCreator::make_metadata_sinks(
size_t version,
@@ -240,57 +223,6 @@ zarr::SinkCreator::make_dirs_(std::queue<std::string>& dir_paths)
return (bool)all_successful;
}

/// @brief Create a FileSink for each of @p file_paths under @p base_dir.
/// @details Files are created in parallel on the member thread pool; a latch
///          blocks until every job has finished.
/// @param[in] base_dir Directory prefix applied to every path in
///            @p file_paths; may be empty, in which case paths are used as is.
/// @param[in] file_paths Relative paths of the files to create.
/// @param[out] sinks The created sinks, keyed by relative path.
/// @return True iff every file was created successfully.
bool
zarr::SinkCreator::make_files_(
  const std::string& base_dir,
  const std::vector<std::string>& file_paths,
  std::unordered_map<std::string, std::unique_ptr<Sink>>& sinks)
{
    if (file_paths.empty()) {
        return true;
    }

    std::atomic<char> all_successful = 1;

    const auto n_files = file_paths.size();
    std::latch latch(n_files);

    // loop-invariant: compute the path prefix once, not per iteration
    const std::string prefix = base_dir.empty() ? "" : base_dir + "/";

    sinks.clear();
    for (const auto& filename : file_paths) {
        sinks[filename] = nullptr;
        // pointers to unordered_map values remain valid across later inserts,
        // so psink is safe to use from the worker thread
        std::unique_ptr<Sink>* psink = &sinks[filename];

        const auto file_path = prefix + filename;

        EXPECT(thread_pool_->push_job(
                 [filename = file_path, psink, &latch, &all_successful](
                   std::string& err) -> bool {
                     bool success = false;

                     try {
                         // skip the work (but still report success) once a
                         // sibling job has already failed
                         if (all_successful) {
                             *psink = std::make_unique<FileSink>(filename);
                         }
                         success = true;
                     } catch (const std::exception& exc) {
                         err = "Failed to create file '" + filename +
                               "': " + exc.what();
                     }

                     latch.count_down();
                     all_successful.fetch_and((char)success);

                     return success;
                 }),
               "Failed to push job to thread pool.");
    }

    latch.wait();

    return (bool)all_successful;
}

bool
zarr::SinkCreator::bucket_exists_(std::string_view bucket_name)
{
14 changes: 0 additions & 14 deletions src/streaming/sink.creator.hh
Original file line number Diff line number Diff line change
@@ -35,20 +35,6 @@ class SinkCreator
const DimensionPartsFun& parts_along_dimension,
std::vector<std::unique_ptr<Sink>>& part_sinks);

/**
* @brief Create a collection of metadata sinks for a Zarr dataset.
* @param[in] version The Zarr version.
* @param[in] base_path The base URI for the dataset.
* @param[out] metadata_sinks The sinks created, keyed by path.
* @return True iff all metadata sinks were created successfully.
* @throws std::runtime_error if @p base_path is not valid, or if, for S3
* sinks, the bucket does not exist.
*/
[[nodiscard]] bool make_metadata_sinks(
size_t version,
std::string_view base_path,
std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks);

/**
* @brief
* @param version
18 changes: 18 additions & 0 deletions src/streaming/sink.hh
Original file line number Diff line number Diff line change
@@ -94,6 +94,24 @@ make_data_file_sinks(std::string_view base_path,
std::shared_ptr<ThreadPool> thread_pool,
std::vector<std::unique_ptr<Sink>>& part_sinks);

/**
* @brief Create a collection of metadata sinks for a Zarr dataset.
* @param[in] version The Zarr version.
* @param[in] base_path The base URI for the dataset.
* @param[in] thread_pool Pointer to a thread pool object. Used to create files
* in parallel.
* @param[out] metadata_sinks The sinks created, keyed by path.
* @return True iff all metadata sinks were created successfully.
* @throws std::runtime_error if @p base_path is not valid, or if, for S3
* sinks, the bucket does not exist.
*/
[[nodiscard]] bool
make_metadata_file_sinks(
ZarrVersion version,
std::string_view base_path,
std::shared_ptr<ThreadPool> thread_pool,
std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks);

/**
* @brief Create a sink from an S3 bucket name and object key.
* @param bucket_name The name of the bucket in which the object is stored.
4 changes: 2 additions & 2 deletions src/streaming/zarr.stream.cpp
Original file line number Diff line number Diff line change
@@ -674,8 +674,8 @@ ZarrStream_s::create_metadata_sinks_()
return false;
}
} else {
if (!creator.make_metadata_sinks(
version_, store_path_, metadata_sinks_)) {
if (!make_metadata_file_sinks(
version_, store_path_, thread_pool_, metadata_sinks_)) {
set_error_("Error creating metadata sinks");
return false;
}
2 changes: 1 addition & 1 deletion tests/unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -17,8 +17,8 @@ set(tests
file-sink-write
s3-sink-write
s3-sink-write-multipart
sink-creator-make-metadata-sinks
make-data-sinks
make-metadata-sinks
array-writer-downsample-writer-config
array-writer-write-frame-to-chunks
zarrv2-writer-write-even
Original file line number Diff line number Diff line change
@@ -46,13 +46,11 @@ get_credentials(std::string& endpoint,
} // namespace

void
sink_creator_make_v2_metadata_sinks(
std::shared_ptr<zarr::ThreadPool> thread_pool)
make_v2_metadata_file_sinks(std::shared_ptr<zarr::ThreadPool> thread_pool)
{
zarr::SinkCreator sink_creator(thread_pool, nullptr);

std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
CHECK(sink_creator.make_metadata_sinks(2, test_dir, metadata_sinks));
CHECK(make_metadata_file_sinks(
ZarrVersion_2, test_dir, thread_pool, metadata_sinks));

CHECK(metadata_sinks.size() == 3);
CHECK(metadata_sinks.contains(".zattrs"));
@@ -64,7 +62,10 @@ sink_creator_make_v2_metadata_sinks(
sink.reset(nullptr); // close the file

fs::path file_path(test_dir + "/" + key);
CHECK(fs::is_regular_file(file_path));
EXPECT(fs::is_regular_file(file_path),
"Metadata file ",
file_path,
" not found.");
// cleanup
fs::remove(file_path);
}
@@ -81,8 +82,8 @@ sink_creator_make_v2_metadata_sinks(
zarr::SinkCreator sink_creator(thread_pool, connection_pool);

std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
CHECK(
sink_creator.make_metadata_sinks(2, bucket_name, test_dir, metadata_sinks));
CHECK(sink_creator.make_metadata_sinks(
2, bucket_name, test_dir, metadata_sinks));

CHECK(metadata_sinks.size() == 3);
CHECK(metadata_sinks.contains(".zattrs"));
@@ -110,13 +111,11 @@ sink_creator_make_v2_metadata_sinks(
}

void
sink_creator_make_v3_metadata_sinks(
std::shared_ptr<zarr::ThreadPool> thread_pool)
make_v3_metadata_file_sinks(std::shared_ptr<zarr::ThreadPool> thread_pool)
{
zarr::SinkCreator sink_creator(thread_pool, nullptr);

std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
CHECK(sink_creator.make_metadata_sinks(3, test_dir, metadata_sinks));
CHECK(make_metadata_file_sinks(
ZarrVersion_3, test_dir, thread_pool, metadata_sinks));

CHECK(metadata_sinks.size() == 2);
CHECK(metadata_sinks.contains("zarr.json"));
@@ -144,8 +143,8 @@ sink_creator_make_v3_metadata_sinks(
zarr::SinkCreator sink_creator(thread_pool, connection_pool);

std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
CHECK(
sink_creator.make_metadata_sinks(3, bucket_name, test_dir, metadata_sinks));
CHECK(sink_creator.make_metadata_sinks(
3, bucket_name, test_dir, metadata_sinks));

CHECK(metadata_sinks.size() == 2);
CHECK(metadata_sinks.contains("zarr.json"));
@@ -180,8 +179,8 @@ main()
[](const std::string& err) { LOG_ERROR("Failed: ", err.c_str()); });

try {
sink_creator_make_v2_metadata_sinks(thread_pool);
sink_creator_make_v3_metadata_sinks(thread_pool);
make_v2_metadata_file_sinks(thread_pool);
make_v3_metadata_file_sinks(thread_pool);
} catch (const std::exception& e) {
LOG_ERROR("Failed: ", e.what());
return 1;

0 comments on commit 06250fa

Please sign in to comment.