diff --git a/src/streaming/sink.cpp b/src/streaming/sink.cpp index 7bad2ae..f2faabc 100644 --- a/src/streaming/sink.cpp +++ b/src/streaming/sink.cpp @@ -27,6 +27,28 @@ bucket_exists(std::string_view bucket_name, return bucket_exists; } +std::vector +construct_metadata_paths(ZarrVersion version) +{ + std::vector paths; + switch (version) { + case ZarrVersion_2: + paths.emplace_back(".zattrs"); + paths.emplace_back(".zgroup"); + paths.emplace_back("acquire.json"); + break; + case ZarrVersion_3: + paths.emplace_back("zarr.json"); + paths.emplace_back("acquire.json"); + break; + default: + throw std::runtime_error("Invalid Zarr version " + + std::to_string(static_cast(version))); + } + + return paths; +} + bool make_file_sinks(std::vector& file_paths, std::shared_ptr thread_pool, @@ -36,6 +58,12 @@ make_file_sinks(std::vector& file_paths, return true; } + const auto parents = zarr::get_parent_paths(file_paths); + if (!zarr::make_dirs(parents, thread_pool)) { + LOG_ERROR("Failed to make parent directories"); + return false; + } + std::atomic all_successful = 1; const auto n_files = file_paths.size(); @@ -73,6 +101,72 @@ make_file_sinks(std::vector& file_paths, return (bool)all_successful; } + +bool +make_file_sinks( + const std::string& base_dir, + const std::vector& file_paths, + std::shared_ptr thread_pool, + std::unordered_map>& sinks) +{ + if (file_paths.empty()) { + return true; + } + + // create the parent directories if they don't exist + const std::string prefix = base_dir.empty() ? "" : base_dir + "/"; + { + std::vector paths_with_parents(file_paths.size()); + for (auto i = 0; i < file_paths.size(); ++i) { + paths_with_parents[i] = prefix + file_paths[i]; + } + + if (!zarr::make_dirs(zarr::get_parent_paths(paths_with_parents), + thread_pool)) { + LOG_ERROR("Failed to make parent directories"); + return false; + } + } + + std::atomic all_successful = 1; + + const auto n_files = file_paths.size(); + std::latch latch(n_files); + + sinks.clear(); + for (const auto& filename : file_paths) { + sinks[filename] = nullptr; + std::unique_ptr* psink = &sinks[filename]; + const auto file_path = prefix + filename; + + EXPECT(thread_pool->push_job( + [filename = file_path, psink, &latch, &all_successful]( + std::string& err) -> bool { + bool success = false; + + try { + if (all_successful) { + *psink = + std::make_unique(filename); + } + success = true; + } catch (const std::exception& exc) { + err = "Failed to create file '" + filename + + "': " + exc.what(); + } + + latch.count_down(); + all_successful.fetch_and((char)success); + + return success; + }), + "Failed to push job to thread pool."); + } + + latch.wait(); + + return (bool)all_successful; +} } // namespace bool @@ -167,6 +261,11 @@ zarr::make_dirs(const std::vector& dir_paths, std::latch latch(unique_paths.size()); for (const auto& path : unique_paths) { + if (path.empty()) { + latch.count_down(); + continue; + } + auto job = [&path, &latch, &all_successful](std::string& err) { bool success = true; if (fs::is_directory(path)) { @@ -241,9 +340,6 @@ zarr::make_data_file_sinks(std::string_view base_path, try { paths = construct_data_paths(base_path, dimensions, parts_along_dimension); - const auto parents = get_parent_paths(paths); - EXPECT(make_dirs(parents, thread_pool), - "Failed to create directories."); } catch (const std::exception& exc) { LOG_ERROR("Failed to create dataset paths: ", exc.what()); return false; @@ -252,6 +348,24 @@ zarr::make_data_file_sinks(std::string_view base_path, return make_file_sinks(paths, thread_pool, part_sinks); } +bool +zarr::make_metadata_file_sinks( + ZarrVersion version, + std::string_view base_path, + std::shared_ptr thread_pool, + std::unordered_map>& metadata_sinks) +{ + if (base_path.starts_with("file://")) { + base_path = base_path.substr(7); + } + EXPECT(!base_path.empty(), "Base path must not be empty."); + + std::vector file_paths = construct_metadata_paths(version); + + return make_file_sinks( + base_path.data(), file_paths, thread_pool, metadata_sinks); +} + std::unique_ptr zarr::make_s3_sink(std::string_view bucket_name, std::string_view object_key, diff --git a/src/streaming/sink.creator.cpp b/src/streaming/sink.creator.cpp index 5e7adef..c6e313a 100644 --- a/src/streaming/sink.creator.cpp +++ b/src/streaming/sink.creator.cpp @@ -35,23 +35,6 @@ zarr::SinkCreator::make_data_sinks( return make_s3_objects_(bucket_name, paths, part_sinks); } -bool -zarr::SinkCreator::make_metadata_sinks( - size_t version, - std::string_view base_path, - std::unordered_map>& metadata_sinks) -{ - if (base_path.starts_with("file://")) { - base_path = base_path.substr(7); - } - EXPECT(!base_path.empty(), "Base path must not be empty."); - - std::vector file_paths = - make_metadata_sink_paths_(version, base_path, true); - - return make_files_(base_path.data(), file_paths, metadata_sinks); -} - bool zarr::SinkCreator::make_metadata_sinks( size_t version, @@ -240,57 +223,6 @@ zarr::SinkCreator::make_dirs_(std::queue& dir_paths) return (bool)all_successful; } -bool -zarr::SinkCreator::make_files_( - const std::string& base_dir, - const std::vector& file_paths, - std::unordered_map>& sinks) -{ - if (file_paths.empty()) { - return true; - } - - std::atomic all_successful = 1; - - const auto n_files = file_paths.size(); - std::latch latch(n_files); - - sinks.clear(); - for (const auto& filename : file_paths) { - sinks[filename] = nullptr; - std::unique_ptr* psink = &sinks[filename]; - - const std::string prefix = base_dir.empty() ? "" : base_dir + "/"; - const auto file_path = prefix + filename; - - EXPECT(thread_pool_->push_job( - [filename = file_path, psink, &latch, &all_successful]( - std::string& err) -> bool { - bool success = false; - - try { - if (all_successful) { - *psink = std::make_unique(filename); - } - success = true; - } catch (const std::exception& exc) { - err = "Failed to create file '" + filename + - "': " + exc.what(); - } - - latch.count_down(); - all_successful.fetch_and((char)success); - - return success; - }), - "Failed to push job to thread pool."); - } - - latch.wait(); - - return (bool)all_successful; -} - bool zarr::SinkCreator::bucket_exists_(std::string_view bucket_name) { diff --git a/src/streaming/sink.creator.hh b/src/streaming/sink.creator.hh index 0876e15..2c47bc5 100644 --- a/src/streaming/sink.creator.hh +++ b/src/streaming/sink.creator.hh @@ -35,20 +35,6 @@ class SinkCreator const DimensionPartsFun& parts_along_dimension, std::vector>& part_sinks); - /** - * @brief Create a collection of metadata sinks for a Zarr dataset. - * @param[in] version The Zarr version. - * @param[in] base_path The base URI for the dataset. - * @param[out] metadata_sinks The sinks created, keyed by path. - * @return True iff all metadata sinks were created successfully. - * @throws std::runtime_error if @p base_uri is not valid, or if, for S3 - * sinks, the bucket does not exist. - */ - [[nodiscard]] bool make_metadata_sinks( - size_t version, - std::string_view base_path, - std::unordered_map>& metadata_sinks); - /** * @brief * @param version diff --git a/src/streaming/sink.hh b/src/streaming/sink.hh index 9b21545..1731be7 100644 --- a/src/streaming/sink.hh +++ b/src/streaming/sink.hh @@ -94,6 +94,24 @@ make_data_file_sinks(std::string_view base_path, std::shared_ptr thread_pool, std::vector>& part_sinks); +/** + * @brief Create a collection of metadata sinks for a Zarr dataset. + * @param[in] version The Zarr version. + * @param[in] base_path The base URI for the dataset. + * @param[in] thread_pool Pointer to a thread pool object. Used to create files + * in parallel. + * @param[out] metadata_sinks The sinks created, keyed by path. + * @return True iff all metadata sinks were created successfully. + * @throws std::runtime_error if @p base_uri is not valid, or if, for S3 + * sinks, the bucket does not exist. + */ +[[nodiscard]] bool +make_metadata_file_sinks( + ZarrVersion version, + std::string_view base_path, + std::shared_ptr thread_pool, + std::unordered_map>& metadata_sinks); + /** * @brief Create a sink from an S3 bucket name and object key. * @param bucket_name The name of the bucket in which the object is stored. diff --git a/src/streaming/zarr.stream.cpp b/src/streaming/zarr.stream.cpp index d24e760..704e387 100644 --- a/src/streaming/zarr.stream.cpp +++ b/src/streaming/zarr.stream.cpp @@ -674,8 +674,8 @@ ZarrStream_s::create_metadata_sinks_() return false; } } else { - if (!creator.make_metadata_sinks( - version_, store_path_, metadata_sinks_)) { + if (!make_metadata_file_sinks( + version_, store_path_, thread_pool_, metadata_sinks_)) { set_error_("Error creating metadata sinks"); return false; } diff --git a/tests/unit-tests/CMakeLists.txt b/tests/unit-tests/CMakeLists.txt index 8c7843f..94ed400 100644 --- a/tests/unit-tests/CMakeLists.txt +++ b/tests/unit-tests/CMakeLists.txt @@ -17,8 +17,8 @@ set(tests file-sink-write s3-sink-write s3-sink-write-multipart - sink-creator-make-metadata-sinks make-data-sinks + make-metadata-sinks array-writer-downsample-writer-config array-writer-write-frame-to-chunks zarrv2-writer-write-even diff --git a/tests/unit-tests/sink-creator-make-metadata-sinks.cpp b/tests/unit-tests/make-metadata-sinks.cpp similarity index 87% rename from tests/unit-tests/sink-creator-make-metadata-sinks.cpp rename to tests/unit-tests/make-metadata-sinks.cpp index 26317b1..799c89d 100644 --- a/tests/unit-tests/sink-creator-make-metadata-sinks.cpp +++ b/tests/unit-tests/make-metadata-sinks.cpp @@ -46,13 +46,11 @@ get_credentials(std::string& endpoint, } // namespace void -sink_creator_make_v2_metadata_sinks( - std::shared_ptr thread_pool) +make_v2_metadata_file_sinks(std::shared_ptr thread_pool) { - zarr::SinkCreator sink_creator(thread_pool, nullptr); - std::unordered_map> metadata_sinks; - CHECK(sink_creator.make_metadata_sinks(2, test_dir, metadata_sinks)); + CHECK(make_metadata_file_sinks( + ZarrVersion_2, test_dir, thread_pool, metadata_sinks)); CHECK(metadata_sinks.size() == 3); CHECK(metadata_sinks.contains(".zattrs")); @@ -64,7 +62,10 @@ sink_creator_make_v2_metadata_sinks( sink.reset(nullptr); // close the file fs::path file_path(test_dir + "/" + key); - CHECK(fs::is_regular_file(file_path)); + EXPECT(fs::is_regular_file(file_path), + "Metadata file ", + file_path, + " not found."); // cleanup fs::remove(file_path); } @@ -81,8 +82,8 @@ sink_creator_make_v2_metadata_sinks( zarr::SinkCreator sink_creator(thread_pool, connection_pool); std::unordered_map> metadata_sinks; - CHECK( - sink_creator.make_metadata_sinks(2, bucket_name, test_dir, metadata_sinks)); + CHECK(sink_creator.make_metadata_sinks( + 2, bucket_name, test_dir, metadata_sinks)); CHECK(metadata_sinks.size() == 3); CHECK(metadata_sinks.contains(".zattrs")); @@ -110,13 +111,11 @@ sink_creator_make_v2_metadata_sinks( } void -sink_creator_make_v3_metadata_sinks( - std::shared_ptr thread_pool) +make_v3_metadata_file_sinks(std::shared_ptr thread_pool) { - zarr::SinkCreator sink_creator(thread_pool, nullptr); - std::unordered_map> metadata_sinks; - CHECK(sink_creator.make_metadata_sinks(3, test_dir, metadata_sinks)); + CHECK(make_metadata_file_sinks( + ZarrVersion_3, test_dir, thread_pool, metadata_sinks)); CHECK(metadata_sinks.size() == 2); CHECK(metadata_sinks.contains("zarr.json")); @@ -144,8 +143,8 @@ sink_creator_make_v3_metadata_sinks( zarr::SinkCreator sink_creator(thread_pool, connection_pool); std::unordered_map> metadata_sinks; - CHECK( - sink_creator.make_metadata_sinks(3, bucket_name, test_dir, metadata_sinks)); + CHECK(sink_creator.make_metadata_sinks( + 3, bucket_name, test_dir, metadata_sinks)); CHECK(metadata_sinks.size() == 2); CHECK(metadata_sinks.contains("zarr.json")); @@ -180,8 +179,8 @@ main() [](const std::string& err) { LOG_ERROR("Failed: ", err.c_str()); }); try { - sink_creator_make_v2_metadata_sinks(thread_pool); - sink_creator_make_v3_metadata_sinks(thread_pool); + make_v2_metadata_file_sinks(thread_pool); + make_v3_metadata_file_sinks(thread_pool); } catch (const std::exception& e) { LOG_ERROR("Failed: ", e.what()); return 1;