From e8abb0c863332445fe50d2f564602b5e11044a70 Mon Sep 17 00:00:00 2001
From: Guotao Yu
Date: Tue, 12 May 2026 14:53:04 +0800
Subject: [PATCH 1/3] feat(manifest): add ManifestFilterManager and
 ManifestMergeManager

Implement two manifest management classes for table write operations:

- ManifestFilterManager: filters manifest entries by row filter expression,
  file path, or partition value; supports FailMissingDeletePaths validation.
  Rewrites manifests that contain matching files, marking entries as DELETED;
  passes through manifests that cannot contain matching files unchanged.

- ManifestMergeManager: merges small manifests using greedy bin-packing,
  grouping by partition_spec_id (manifests with different specs are never
  merged). Oversized manifests pass through unchanged. ADDED entries from
  prior manifests become EXISTING when merged (matching Java semantics).

Also fix Schema destructor declaration to resolve a clang compilation issue
with incomplete SchemaCache type in unique_ptr (PIMPL pattern fix).
---
 src/iceberg/CMakeLists.txt                    |   2 +
 .../manifest/manifest_filter_manager.cc       | 276 ++++++++++++++++++
 .../manifest/manifest_filter_manager.h        | 168 +++++++++++
 .../manifest/manifest_merge_manager.cc        | 139 +++++++++
 src/iceberg/manifest/manifest_merge_manager.h |  91 ++++++
 src/iceberg/manifest/meson.build              |   2 +
 src/iceberg/meson.build                       |   2 +
 src/iceberg/schema.cc                         |   2 +
 src/iceberg/schema.h                          |   2 +
 src/iceberg/test/CMakeLists.txt               |   2 +
 .../test/manifest_filter_manager_test.cc      | 272 ++++++++++++++++++
 .../test/manifest_merge_manager_test.cc       | 246 ++++++++++++++++
 12 files changed, 1204 insertions(+)
 create mode 100644 src/iceberg/manifest/manifest_filter_manager.cc
 create mode 100644 src/iceberg/manifest/manifest_filter_manager.h
 create mode 100644 src/iceberg/manifest/manifest_merge_manager.cc
 create mode 100644 src/iceberg/manifest/manifest_merge_manager.h
 create mode 100644 src/iceberg/test/manifest_filter_manager_test.cc
 create mode 100644 src/iceberg/test/manifest_merge_manager_test.cc

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index c4e193b89..76407a94c 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -45,8 +45,10 @@ set(ICEBERG_SOURCES
     location_provider.cc
     manifest/manifest_adapter.cc
     manifest/manifest_entry.cc
+    manifest/manifest_filter_manager.cc
     manifest/manifest_group.cc
     manifest/manifest_list.cc
+    manifest/manifest_merge_manager.cc
     manifest/manifest_reader.cc
     manifest/manifest_util.cc
     manifest/manifest_writer.cc
diff --git a/src/iceberg/manifest/manifest_filter_manager.cc b/src/iceberg/manifest/manifest_filter_manager.cc
new file mode 100644
index 000000000..67fd737bf
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.cc
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({std::move(expr), case_sensitive});
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Without a partition filter helper, we conservatively say yes.
+  // A full implementation would use ManifestFileUtil::canContainAny; for now
+  // we open the manifest and let per-entry checks decide.
+  (void)manifest;
+  return true;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest,
+                                                        const TableMetadata& metadata) {
+  if (delete_exprs_.empty()) return false;
+  const int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& de : delete_exprs_) {
+    auto* evaluator_ptr = GetManifestEvaluator(metadata, spec_id, de).value_or(nullptr);
+    // Conservative: if the manifest cannot be ruled out, it has to be opened.
+    if (evaluator_ptr == nullptr) return true;
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest,
+                                                   const TableMetadata& metadata) {
+  // A manifest with no live files cannot contain files to delete. An absent count
+  // means "unknown" and must be treated as possibly non-zero, not as zero.
+  const bool may_have_added =
+      !manifest.added_files_count.has_value() || *manifest.added_files_count > 0;
+  const bool may_have_existing =
+      !manifest.existing_files_count.has_value() || *manifest.existing_files_count > 0;
+  if (!may_have_added && !may_have_existing) return false;
+
+  return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) ||
+         CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = manifest_evaluator_cache_[spec_id];
+  // `de` must be an element of delete_exprs_: its position is the cache index.
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
+                                          de.expr, spec, *schema, de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = metrics_evaluator_cache_[spec_id];
+  // `de` must be an element of delete_exprs_: its position is the cache index.
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx],
+                            InclusiveMetricsEvaluator::Make(de.expr, *schema,
+                                                            de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                         const TableMetadata& metadata,
+                                         int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+
+  // Path-based check. A matched path is no longer pending for the
+  // FailMissingDeletePaths() validation.
+  if (delete_paths_.count(file.file_path) > 0) {
+    pending_paths_.erase(file.file_path);
+    return true;
+  }
+
+  // Partition-drop check
+  const int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
+  if (drop_partitions_.contains(spec_id, file.partition)) {
+    return true;
+  }
+
+  // Expression-based check (inclusive metrics). Only a positive match deletes a
+  // file: a failed evaluator must never count as a match, otherwise a row filter
+  // whose evaluator cannot be created would mark every scanned file DELETED.
+  // FilterManifests() creates the evaluators up front so that creation errors
+  // reach the caller.
+  // NOTE(review): returning bool leaves no way to report an Evaluate() failure
+  // from here; consider Result<bool> in a follow-up.
+  for (const auto& de : delete_exprs_) {
+    auto* eval = GetMetricsEvaluator(metadata, spec_id, de).value_or(nullptr);
+    if (eval == nullptr) continue;
+    auto matches = eval->Evaluate(file);
+    if (matches.has_value() && matches.value()) return true;
+  }
+
+  return false;
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+    const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
+    const ManifestWriterFactory& writer_factory) {
+  // True when FailMissingDeletePaths() is in effect and a registered path has not
+  // been matched by any entry so far.
+  const auto has_missing_paths = [this] {
+    return fail_missing_delete_paths_ && !pending_paths_.empty();
+  };
+  // Builds the error that lists every still-unmatched delete path.
+  const auto missing_paths_error = [this] {
+    std::string missing;
+    for (const auto& p : pending_paths_) {
+      if (!missing.empty()) missing += ", ";
+      missing += p;
+    }
+    return InvalidArgument("Missing delete paths: {}", missing);
+  };
+
+  // No base snapshot → nothing to filter, so every registered path is missing
+  if (!base_snapshot) {
+    if (has_missing_paths()) return missing_paths_error();
+    return std::vector<ManifestFile>{};
+  }
+
+  // Load the relevant manifests from the manifest list
+  ICEBERG_ASSIGN_OR_RAISE(auto list_reader,
+                          ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
+  ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
+
+  // Keep only manifests for this manager's content type
+  std::vector<ManifestFile> manifests;
+  manifests.reserve(all_manifests.size());
+  for (const auto& m : all_manifests) {
+    if (m.content == manifest_content_) manifests.push_back(m);
+  }
+
+  // No conditions registered → return unchanged
+  if (!DeletesFiles()) return manifests;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+
+  std::vector<ManifestFile> result;
+  result.reserve(manifests.size());
+
+  for (const auto& manifest : manifests) {
+    // Fast path: metadata skip
+    if (!CanContainDeletedFiles(manifest, metadata)) {
+      result.push_back(manifest);
+      continue;
+    }
+
+    const int32_t spec_id = manifest.partition_spec_id;
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+
+    // Create the per-file evaluators up front: a row filter that cannot be turned
+    // into an evaluator fails the operation instead of being guessed at per entry.
+    for (const auto& de : delete_exprs_) {
+      ICEBERG_RETURN_UNEXPECTED(GetMetricsEvaluator(metadata, spec_id, de));
+    }
+
+    // Read all live entries from the manifest
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, file_io_, schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
+
+    // Decide each entry exactly once; the decisions are reused by the rewrite below.
+    std::vector<char> delete_flags;
+    bool has_deletes = false;
+    for (const auto& entry : entries) {
+      const bool should_delete = ShouldDelete(entry, metadata, spec_id);
+      delete_flags.push_back(should_delete ? 1 : 0);
+      has_deletes = has_deletes || should_delete;
+    }
+
+    if (!has_deletes) {
+      result.push_back(manifest);
+      continue;
+    }
+
+    // Rewrite the manifest with deleted entries marked
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_));
+    size_t entry_index = 0;
+    for (const auto& entry : entries) {
+      if (delete_flags[entry_index++] != 0) {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+      } else {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+      }
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto filtered_manifest, writer->ToManifestFile());
+    result.push_back(std::move(filtered_manifest));
+  }
+
+  // Validate that all registered delete paths were found
+  if (has_missing_paths()) return missing_paths_error();
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_filter_manager.h b/src/iceberg/manifest/manifest_filter_manager.h
new file mode 100644
index 000000000..546f7c0fc
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.h
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_filter_manager.h
+/// Filters an existing snapshot's manifest list, marking data files as DELETED
+/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+/// \brief Factory type for creating ManifestWriter instances during filtering/merging.
+///
+/// The factory receives the partition spec ID (to look up the spec) and the manifest
+/// content type, and returns a new ManifestWriter ready for writing. The caller
+/// (e.g. MergingSnapshotUpdate) captures metadata, FileIO, and snapshot ID
+/// inside the lambda.
+using ManifestWriterFactory =
+    std::function<Result<std::unique_ptr<ManifestWriter>>(int32_t spec_id,
+                                                          ManifestContent content)>;
+
+/// \brief Filters an existing snapshot's manifest list.
+///
+/// The manager accumulates delete conditions incrementally, then applies them all
+/// at once in a single FilterManifests() call. Manifests that contain no deleted
+/// entries are returned unchanged (nothing is rewritten). Manifests that do contain
+/// deleted entries are rewritten with those entries marked DELETED.
+///
+/// The manager is content-agnostic: pass ManifestContent::kData to process data
+/// manifests, or ManifestContent::kDeletes to process delete manifests.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestFilterManager {
+ public:
+  ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io);
+
+  ManifestFilterManager(const ManifestFilterManager&) = delete;
+  ManifestFilterManager& operator=(const ManifestFilterManager&) = delete;
+
+  /// \brief Register a row-filter expression.
+  ///
+  /// Any manifest entry whose column metrics indicate the file may satisfy the
+  /// expression will be marked DELETED.
+  ///
+  /// \param expr The expression to match files against
+  /// \param case_sensitive Whether field name matching is case-sensitive
+  void DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                         bool case_sensitive = true);
+
+  /// \brief Register an exact file path for deletion.
+  ///
+  /// Any manifest entry whose file_path matches this path will be marked DELETED.
+  ///
+  /// \param path The exact file path to delete
+  void DeleteFile(std::string_view path);
+
+  /// \brief Register a partition for dropping.
+  ///
+  /// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED.
+  ///
+  /// \param spec_id The partition spec ID
+  /// \param partition The partition values to drop
+  void DropPartition(int32_t spec_id, PartitionValues partition);
+
+  /// \brief Set a flag that makes FilterManifests() fail if any registered
+  /// delete path was not found in any manifest entry.
+  void FailMissingDeletePaths();
+
+  /// \brief Returns true if any delete condition has been registered.
+  bool DeletesFiles() const;
+
+  /// \brief Apply all accumulated delete conditions to the base snapshot's manifests.
+  ///
+  /// Manifests that cannot possibly contain deleted files are returned unchanged.
+  /// Manifests that do contain deleted files are rewritten using writer_factory.
+  ///
+  /// \param metadata Table metadata (provides specs and schema for evaluators)
+  /// \param base_snapshot The snapshot whose manifests to filter (may be null)
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The filtered manifest list, or an error
+  Result<std::vector<ManifestFile>> FilterManifests(
+      const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  struct DeleteExpr {
+    std::shared_ptr<Expression> expr;
+    bool case_sensitive;
+  };
+
+  /// \brief Returns true if the manifest might contain files matching any expression.
+  bool CanContainExpressionDeletes(const ManifestFile& manifest,
+                                   const TableMetadata& metadata);
+
+  /// \brief Returns true if the manifest might contain files in a dropped partition.
+  bool CanContainDroppedPartitions(const ManifestFile& manifest);
+
+  /// \brief Returns true if the manifest might contain path-deleted files.
+  bool CanContainDroppedFiles() const;
+
+  /// \brief Returns true if the manifest possibly contains any deleted file.
+  bool CanContainDeletedFiles(const ManifestFile& manifest,
+                              const TableMetadata& metadata);
+
+  /// \brief Get or create a ManifestEvaluator for the given spec and expression.
+  Result<ManifestEvaluator*> GetManifestEvaluator(const TableMetadata& metadata,
+                                                  int32_t spec_id,
+                                                  const DeleteExpr& de);
+
+  /// \brief Get or create an InclusiveMetricsEvaluator for the given spec and expression.
+  Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator(const TableMetadata& metadata,
+                                                         int32_t spec_id,
+                                                         const DeleteExpr& de);
+
+  /// \brief Check whether a single entry should be deleted.
+  bool ShouldDelete(const ManifestEntry& entry, const TableMetadata& metadata,
+                    int32_t manifest_spec_id);
+
+  const ManifestContent manifest_content_;
+  std::shared_ptr<FileIO> file_io_;
+
+  std::vector<DeleteExpr> delete_exprs_;
+  std::unordered_set<std::string> delete_paths_;
+  std::unordered_set<std::string> pending_paths_;
+  PartitionSet drop_partitions_;
+  bool fail_missing_delete_paths_{false};
+
+  // Cache: (spec_id, expr_index) → ManifestEvaluator
+  std::unordered_map<int32_t, std::vector<std::unique_ptr<ManifestEvaluator>>>
+      manifest_evaluator_cache_;
+  // Cache: (spec_id, expr_index) → InclusiveMetricsEvaluator
+  std::unordered_map<int32_t, std::vector<std::unique_ptr<InclusiveMetricsEvaluator>>>
+      metrics_evaluator_cache_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.cc b/src/iceberg/manifest/manifest_merge_manager.cc
new file mode 100644
index 000000000..ae0051453
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.cc
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge,
+                                           bool merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, const TableMetadata& metadata,
+    std::shared_ptr<FileIO> file_io, const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing; this is the order returned when nothing is merged
+  std::vector<ManifestFile> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  all.insert(all.end(), new_manifests.begin(), new_manifests.end());
+  all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
+
+  if (!merge_enabled_ || static_cast<int32_t>(all.size()) < min_count_to_merge_) {
+    return all;
+  }
+
+  // Group manifests by content type and partition_spec_id — never merge across either
+  std::map<std::pair<ManifestContent, int32_t>, std::vector<ManifestFile>> by_group;
+  for (const auto& m : all) {
+    by_group[{m.content, m.partition_spec_id}].push_back(m);
+  }
+
+  std::vector<ManifestFile> result;
+  result.reserve(all.size());
+  for (auto& [key, group] : by_group) {
+    ICEBERG_ASSIGN_OR_RAISE(auto merged,
+                            MergeGroup(group, metadata, file_io, writer_factory));
+    result.insert(result.end(), std::make_move_iterator(merged.begin()),
+                  std::make_move_iterator(merged.end()));
+  }
+  return result;
+}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
+    const std::vector<ManifestFile>& group, const TableMetadata& metadata,
+    std::shared_ptr<FileIO> file_io, const ManifestWriterFactory& writer_factory) {
+  std::vector<ManifestFile> result;
+  std::vector<ManifestFile> current_bin;
+  int64_t bin_size = 0;
+
+  for (const auto& manifest : group) {
+    if (manifest.manifest_length >= target_size_bytes_) {
+      // Oversized manifest passes through unchanged without affecting the current bin
+      result.push_back(manifest);
+    } else if (bin_size + manifest.manifest_length > target_size_bytes_ &&
+               !current_bin.empty()) {
+      // Adding this manifest would overflow the bin — flush and start a new bin
+      ICEBERG_ASSIGN_OR_RAISE(auto merged,
+                              FlushBin(current_bin, metadata, file_io, writer_factory));
+      result.push_back(std::move(merged));
+      current_bin.clear();
+      current_bin.push_back(manifest);
+      bin_size = manifest.manifest_length;
+    } else {
+      current_bin.push_back(manifest);
+      bin_size += manifest.manifest_length;
+    }
+  }
+
+  if (!current_bin.empty()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto merged,
+                            FlushBin(current_bin, metadata, file_io, writer_factory));
+    result.push_back(std::move(merged));
+  }
+
+  return result;
+}
+
+Result<ManifestFile> ManifestMergeManager::FlushBin(
+    const std::vector<ManifestFile>& bin, const TableMetadata& metadata,
+    std::shared_ptr<FileIO> file_io, const ManifestWriterFactory& writer_factory) {
+  // A single-manifest bin requires no merging
+  if (bin.size() == 1) return bin[0];
+
+  const ManifestFile& first = bin[0];
+  int32_t spec_id = first.partition_spec_id;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, first.content));
+
+  for (const auto& manifest : bin) {
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, file_io, schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      // Every live entry is rewritten as EXISTING; DELETED entries stay tombstones.
+      // NOTE(review): entry snapshot ids are not consulted here — confirm vs. Java.
+      if (entry.status == ManifestStatus::kDeleted) {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+      } else {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+      }
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.h b/src/iceberg/manifest/manifest_merge_manager.h
new file mode 100644
index 000000000..c26df875d
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_merge_manager.h
+/// Merges small manifests into fewer larger ones according to table properties.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_filter_manager.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Merges small manifests into larger ones using greedy bin-packing.
+///
+/// Manifests are grouped by content type and partition_spec_id before merging;
+/// manifests of different groups are never merged together. Within a group, manifests
+/// are accumulated into bins until a bin would exceed target_size_bytes, at which
+/// point the bin is flushed (written) and a new one started. Manifests already
+/// at or above target_size_bytes pass through unchanged.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestMergeManager {
+ public:
+  /// \brief Construct a merge manager with the given configuration.
+  ///
+  /// \param target_size_bytes Target output manifest size in bytes
+  /// \param min_count_to_merge Minimum number of manifests before any merging occurs
+  /// \param merge_enabled Whether merging is enabled at all
+  ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge,
+                       bool merge_enabled);
+
+  ManifestMergeManager(const ManifestMergeManager&) = delete;
+  ManifestMergeManager& operator=(const ManifestMergeManager&) = delete;
+
+  /// \brief Merge existing and new manifests according to configured thresholds.
+  ///
+  /// \param existing_manifests Manifests already in the base snapshot
+  /// \param new_manifests Newly written manifests to incorporate
+  /// \param metadata Table metadata (provides specs and schema for readers)
+  /// \param file_io File IO used to open existing manifests for reading
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The merged manifest list, or an error
+  Result<std::vector<ManifestFile>> MergeManifests(
+      const std::vector<ManifestFile>& existing_manifests,
+      const std::vector<ManifestFile>& new_manifests, const TableMetadata& metadata,
+      std::shared_ptr<FileIO> file_io, const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief Merge a group of manifests sharing the same content type and spec_id.
+  ///
+  /// Returns the merged manifests for this group (pass-throughs + newly written).
+  Result<std::vector<ManifestFile>> MergeGroup(
+      const std::vector<ManifestFile>& group, const TableMetadata& metadata,
+      std::shared_ptr<FileIO> file_io, const ManifestWriterFactory& writer_factory);
+
+  /// \brief Write a merged manifest from all manifests in a bin.
+  Result<ManifestFile> FlushBin(const std::vector<ManifestFile>& bin,
+                                const TableMetadata& metadata,
+                                std::shared_ptr<FileIO> file_io,
+                                const ManifestWriterFactory& writer_factory);
+
+  const int64_t target_size_bytes_;
+  const int32_t min_count_to_merge_;
+  const bool merge_enabled_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index 41e685ffc..d4b039a67 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -18,8 +18,10 @@
 install_headers(
     [
         'manifest_entry.h',
+        'manifest_filter_manager.h',
         'manifest_group.h',
         'manifest_list.h',
+        'manifest_merge_manager.h',
         'manifest_reader.h',
         'manifest_writer.h',
         'rolling_manifest_writer.h',
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c2947f3fe..8136f46ac 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -67,8 +67,10 @@ iceberg_sources = files(
     'location_provider.cc',
     'manifest/manifest_adapter.cc',
     'manifest/manifest_entry.cc',
+    'manifest/manifest_filter_manager.cc',
     'manifest/manifest_group.cc',
     'manifest/manifest_list.cc',
+    'manifest/manifest_merge_manager.cc',
     'manifest/manifest_reader.cc',
     'manifest/manifest_util.cc',
     'manifest/manifest_writer.cc',
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 00905378a..5fdd47998 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -40,6 +40,8 @@ Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id)
       schema_id_(schema_id),
       cache_(std::make_unique<SchemaCache>(this)) {}
 
+Schema::~Schema() = default;
+
 Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
                                              int32_t schema_id,
                                              std::vector<int32_t> identifier_field_ids) {
diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h
index 3c84bc2af..791ed5c8f 100644
--- a/src/iceberg/schema.h
+++ b/src/iceberg/schema.h
@@ -57,6 +57,8 @@ class ICEBERG_EXPORT Schema : public StructType {
   explicit Schema(std::vector<SchemaField> fields,
                   int32_t schema_id = kInitialSchemaId);
 
+  ~Schema() override;
+
   /// \brief Create a schema.
/// /// \param fields The fields that make up the schema. diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1d80b29a5..ca77e4e2f 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -180,6 +180,7 @@ if(ICEBERG_BUILD_BUNDLE) delete_file_index_test.cc manifest_group_test.cc manifest_list_versions_test.cc + manifest_merge_manager_test.cc manifest_reader_stats_test.cc manifest_reader_test.cc manifest_writer_versions_test.cc @@ -205,6 +206,7 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES expire_snapshots_test.cc fast_append_test.cc + manifest_filter_manager_test.cc name_mapping_update_test.cc snapshot_manager_test.cc transaction_test.cc diff --git a/src/iceberg/test/manifest_filter_manager_test.cc b/src/iceberg/test/manifest_filter_manager_test.cc new file mode 100644 index 000000000..271b054df --- /dev/null +++ b/src/iceberg/test/manifest_filter_manager_test.cc @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/manifest/manifest_filter_manager.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/expression/expression.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class ManifestFilterManagerTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + // Two files in different partitions (identity(x)) + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = 100; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + // Append files, commit, refresh, and return the current snapshot. 
+ Result> CommitFiles( + std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto fa, table_->NewFastAppend()); + for (const auto& f : files) fa->AppendFile(f); + ICEBERG_RETURN_UNEXPECTED(fa->Commit()); + ICEBERG_RETURN_UNEXPECTED(table_->Refresh()); + return table_->current_snapshot(); + } + + ManifestWriterFactory MakeWriterFactory(const TableMetadata& metadata) { + auto fv = metadata.format_version; + return [this, fv, &metadata](int32_t spec_id, + ManifestContent content) mutable + -> Result> { + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + auto path = std::format("{}/metadata/flt-{}.avro", table_location_, + manifest_counter_++); + return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, + schema, content); + }; + } + + // Read all entries from a list of ManifestFiles. + Result> ReadAllEntries( + const std::vector& manifests, const TableMetadata& metadata) { + std::vector result; + for (const auto& m : manifests) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(m.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(m, file_io_, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + result.insert(result.end(), entries.begin(), entries.end()); + } + return result; + } + + static constexpr int64_t kTestSnapshotId = 55555L; + int manifest_counter_ = 0; + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(ManifestFilterManagerTest, NullSnapshotReturnsEmpty) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, nullptr, factory)); + EXPECT_TRUE(result.empty()); +} + +TEST_F(ManifestFilterManagerTest, 
DeletesFilesReturnsCorrectState) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + EXPECT_FALSE(mgr.DeletesFiles()); + mgr.DeleteFile("/some/path.parquet"); + EXPECT_TRUE(mgr.DeletesFiles()); +} + +TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Load original manifests so we can compare paths + ICEBERG_UNWRAP_OR_FAIL(auto list_reader, + ManifestListReader::Make(snap->manifest_list, file_io_)); + ICEBERG_UNWRAP_OR_FAIL(auto orig_manifests, list_reader->Files()); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ASSERT_EQ(result.size(), orig_manifests.size()); + for (size_t i = 0; i < result.size(); ++i) { + // No rewrite → same manifest path + EXPECT_EQ(result[i].manifest_path, orig_manifests[i].manifest_path); + } +} + +TEST_F(ManifestFilterManagerTest, DeleteFileByPath) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile(file_a_->file_path); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + int deleted_count = 0; + int live_count = 0; + for (const auto& e : entries) { + if (e.status == ManifestStatus::kDeleted) { + ++deleted_count; + ASSERT_NE(e.data_file, nullptr); + EXPECT_EQ(e.data_file->file_path, file_a_->file_path); + } else { + ++live_count; + } + } + EXPECT_EQ(deleted_count, 1); + EXPECT_EQ(live_count, 1); +} + +TEST_F(ManifestFilterManagerTest, RowFilterAlwaysTrueDeletesAll) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, 
file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteByRowFilter(Expressions::AlwaysTrue()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + for (const auto& e : entries) { + EXPECT_EQ(e.status, ManifestStatus::kDeleted) + << "Expected all entries to be DELETED"; + } +} + +TEST_F(ManifestFilterManagerTest, RowFilterAlwaysFalseDeletesNone) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteByRowFilter(Expressions::AlwaysFalse()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + for (const auto& e : entries) { + // AlwaysFalse means nothing can match → entries remain ADDED or EXISTING + EXPECT_NE(e.status, ManifestStatus::kDeleted) + << "Expected no entries to be DELETED"; + } +} + +TEST_F(ManifestFilterManagerTest, DropPartition) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Drop partition of file_a (partition_x = 1) + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DropPartition(spec_->spec_id(), + PartitionValues(std::vector{Literal::Long(1L)})); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + int deleted_count = 0; + for (const auto& e : entries) { + if (e.status == ManifestStatus::kDeleted) { + ++deleted_count; + ASSERT_TRUE(e.data_file != nullptr); + 
EXPECT_EQ(e.data_file->file_path, file_a_->file_path); + } + } + EXPECT_EQ(deleted_count, 1); +} + +TEST_F(ManifestFilterManagerTest, FailMissingDeletePathsReturnsError) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile("/does/not/exist.parquet"); + mgr.FailMissingDeletePaths(); + + auto result = mgr.FilterManifests(*metadata, snap, factory); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(ManifestFilterManagerTest, MultipleConditionsOrCombined) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + // Both files should be deleted: file_a by path, file_b by AlwaysTrue expression + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.DeleteFile(file_a_->file_path); + mgr.DeleteByRowFilter(Expressions::AlwaysTrue()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + for (const auto& e : entries) { + EXPECT_EQ(e.status, ManifestStatus::kDeleted); + } +} + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_merge_manager_test.cc b/src/iceberg/test/manifest_merge_manager_test.cc new file mode 100644 index 000000000..14f23893f --- /dev/null +++ b/src/iceberg/test/manifest_merge_manager_test.cc @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_merge_manager.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/arrow/arrow_io_util.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_filter_manager.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/sort_order.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +constexpr int8_t kFormatVersion = 2; +constexpr int64_t kSnapshotId = 12345L; +constexpr int32_t kSpecId0 = 0; +constexpr int32_t kSpecId1 = 1; + +} // namespace + +class ManifestMergeManagerTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + file_io_ = arrow::MakeMockFileIO(); + + // Simple schema: one long column + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "x", int64()), + }); + spec0_ = PartitionSpec::Make( + kSpecId0, + {PartitionField(1, 1000, "x", Transform::Identity())}) + .value(); + spec1_ = PartitionSpec::Make( + kSpecId1, + {PartitionField(1, 1001, 
"x_bucket", Transform::Bucket(8))}) + .value(); + + // Build minimal TableMetadata with both specs + auto builder = TableMetadataBuilder::BuildFromEmpty(kFormatVersion); + builder->SetCurrentSchema(schema_, schema_->HighestFieldId().value_or(0)); + builder->SetDefaultPartitionSpec(spec0_); + builder->AddPartitionSpec(spec1_); + builder->SetDefaultSortOrder(SortOrder::Unsorted()); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + metadata_ = std::shared_ptr(std::move(metadata)); + } + + // Write a small manifest with N data files and return the ManifestFile descriptor. + Result WriteManifest(int32_t spec_id, int num_files, + int64_t file_size_override = 512) { + auto path = std::format("manifest-{}.avro", manifest_counter_++); + auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; + ICEBERG_ASSIGN_OR_RAISE(auto writer, + ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, + file_io_, spec, schema_, + ManifestContent::kData)); + for (int i = 0; i < num_files; ++i) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = std::format("data/file-{}-{}.parquet", manifest_counter_, i); + f->file_format = FileFormatType::kParquet; + // Identity spec uses LONG partition values; Bucket spec uses INT + Literal part_val = (spec_id == kSpecId0) ? 
Literal::Long(i) : Literal::Int(i % 8); + f->partition = PartitionValues(std::vector{part_val}); + f->file_size_in_bytes = 1024; + f->record_count = 10; + f->partition_spec_id = spec_id; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(f)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, writer->ToManifestFile()); + // Override length so we can control bin-packing behaviour in tests + manifest_file.manifest_length = file_size_override; + return manifest_file; + } + + ManifestWriterFactory MakeWriterFactory() { + return [this](int32_t spec_id, ManifestContent content) + -> Result> { + ++factory_call_count_; + auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; + auto path = std::format("merged-{}.avro", manifest_counter_++); + return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, file_io_, + spec, schema_, content); + }; + } + + // Count total entries across all manifests. + Result CountEntries(const std::vector& manifests) { + int total = 0; + for (const auto& m : manifests) { + auto spec = m.partition_spec_id == kSpecId0 ? 
spec0_ : spec1_; + ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(m, file_io_, schema_, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + total += static_cast(entries.size()); + } + return total; + } + + std::shared_ptr file_io_; + std::shared_ptr schema_; + std::shared_ptr spec0_; + std::shared_ptr spec1_; + std::shared_ptr metadata_; + int manifest_counter_ = 0; + int factory_call_count_ = 0; +}; + +TEST_F(ManifestMergeManagerTest, MergeDisabled) { + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/false); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m0, m1}, {m2}, *metadata_, file_io_, + MakeWriterFactory())); + // merge disabled → all 3 manifests returned, factory never called + EXPECT_EQ(result.size(), 3U); + EXPECT_EQ(factory_call_count_, 0); +} + +TEST_F(ManifestMergeManagerTest, BelowMinCountThreshold) { + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1)); + + // min_count=3, only 2 manifests total → no merge + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m0}, {m1}, *metadata_, file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + EXPECT_EQ(factory_call_count_, 0); +} + +TEST_F(ManifestMergeManagerTest, MergeOccursAtThreshold) { + // 3 small manifests (each 100 bytes), target=1024 → all fit in one bin + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); + 
ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m0, m1}, {m2}, *metadata_, file_io_, + MakeWriterFactory())); + // All 3 merged into 1 manifest (total 3 entries) + EXPECT_EQ(result.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto count1, CountEntries(result)); + EXPECT_EQ(count1, 3); +} + +TEST_F(ManifestMergeManagerTest, OversizedManifestPassedThrough) { + // m_large exceeds target → must not be merged; m_small fits + ICEBERG_UNWRAP_OR_FAIL(auto m_large, WriteManifest(kSpecId0, 2, /*size=*/2000)); + ICEBERG_UNWRAP_OR_FAIL(auto m_small, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_small2, WriteManifest(kSpecId0, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, + mgr.MergeManifests({m_large, m_small}, {m_small2}, *metadata_, file_io_, + MakeWriterFactory())); + // m_large passes through; m_small and m_small2 merge into 1 + EXPECT_EQ(result.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto count2, CountEntries(result)); + EXPECT_EQ(count2, 4); // 2 + 1 + 1 +} + +TEST_F(ManifestMergeManagerTest, CrossSpecManifestsNotMerged) { + // Manifests with different spec IDs must never be merged together + ICEBERG_UNWRAP_OR_FAIL(auto m_spec0a, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec0b, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec1a, WriteManifest(kSpecId1, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m_spec1b, WriteManifest(kSpecId1, 1, /*size=*/100)); + + // With 4 manifests (target large enough for each pair), we get 2 merged outputs + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, + mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, *metadata_, + file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + // Verify spec IDs are preserved per output manifest + for (const auto& m : result) { 
+ EXPECT_THAT(m.partition_spec_id, ::testing::AnyOf(kSpecId0, kSpecId1)); + } +} + +TEST_F(ManifestMergeManagerTest, WriterFactoryCalledOncePerMergedManifest) { + // 4 small manifests in two groups → 2 merged outputs → factory called twice + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId1, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m3, WriteManifest(kSpecId1, 1, /*size=*/100)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, + mgr.MergeManifests({m0, m2}, {m1, m3}, *metadata_, file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 2U); + EXPECT_EQ(factory_call_count_, 2); +} + +} // namespace iceberg From a38da04b73b30db7e367bb9f596f1e6ce9700858 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 14 May 2026 11:58:11 +0800 Subject: [PATCH 2/3] fix(manifest): align managers with Java review feedback Squash the review-fix commits after 2b124ca3 and the current tracked working-tree changes into one commit. 
This aligns ManifestFilterManager and ManifestMergeManager more closely with Java by: - adding object-level DeleteFile support and FilesToBeDeleted tracking - renaming DeletesFiles to ContainsDeletes and adding FailAnyDelete - separating merge grouping by content and adding kDeletes coverage - matching ListPacker.packEnd bin-packing behavior - aligning defined public APIs and row-filter semantics with Java, including manager-wide CaseSensitive handling and residual-based metrics evaluation - applying doc and reviewer follow-up fixes --- .../manifest/manifest_filter_manager.cc | 186 +++++++++++++----- .../manifest/manifest_filter_manager.h | 78 ++++++-- .../manifest/manifest_merge_manager.cc | 121 ++++++++---- src/iceberg/manifest/manifest_merge_manager.h | 35 +++- .../test/manifest_filter_manager_test.cc | 71 +++++-- .../test/manifest_merge_manager_test.cc | 175 +++++++++++++--- 6 files changed, 500 insertions(+), 166 deletions(-) diff --git a/src/iceberg/manifest/manifest_filter_manager.cc b/src/iceberg/manifest/manifest_filter_manager.cc index 67fd737bf..ddad60eda 100644 --- a/src/iceberg/manifest/manifest_filter_manager.cc +++ b/src/iceberg/manifest/manifest_filter_manager.cc @@ -25,6 +25,8 @@ #include "iceberg/expression/inclusive_metrics_evaluator.h" #include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/expression/residual_evaluator.h" +#include "iceberg/expression/strict_metrics_evaluator.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" @@ -39,15 +41,29 @@ ManifestFilterManager::ManifestFilterManager(ManifestContent content, std::shared_ptr file_io) : manifest_content_(content), file_io_(std::move(file_io)) {} -void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr expr, - bool case_sensitive) { - delete_exprs_.push_back({std::move(expr), case_sensitive}); +void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr expr) { + 
ICEBERG_CHECK_OR_DIE(expr != nullptr, "Cannot delete files using filter: null"); + delete_exprs_.push_back({.expr = std::move(expr)}); +} + +void ManifestFilterManager::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + manifest_evaluator_cache_.clear(); + residual_evaluator_cache_.clear(); } void ManifestFilterManager::DeleteFile(std::string_view path) { - std::string p(path); - delete_paths_.insert(p); - pending_paths_.insert(std::move(p)); + delete_paths_.insert(std::string(path)); +} + +void ManifestFilterManager::DeleteFile(std::shared_ptr file) { + ICEBERG_CHECK_OR_DIE(file != nullptr, "Cannot delete file: null"); + delete_paths_.insert(file->file_path); + delete_files_.insert(std::move(file)); +} + +const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const { + return delete_files_; } void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) { @@ -58,7 +74,9 @@ void ManifestFilterManager::FailMissingDeletePaths() { fail_missing_delete_paths_ = true; } -bool ManifestFilterManager::DeletesFiles() const { +void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; } + +bool ManifestFilterManager::ContainsDeletes() const { return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty(); } @@ -68,20 +86,22 @@ bool ManifestFilterManager::CanContainDroppedFiles() const { bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) { if (drop_partitions_.empty()) return false; - // Without a partition filter helper, we conservatively say yes. - // A full implementation would use ManifestFileUtil::canContainAny; for now - // we open the manifest and let per-entry checks decide. - (void)manifest; - return true; + // Only manifests whose partition spec matches a registered drop can contain + // entries for that partition. PartitionKey is pair. 
+ int32_t spec_id = manifest.partition_spec_id; + for (const auto& key : drop_partitions_) { + if (key.first == spec_id) return true; + } + return false; } bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest, const TableMetadata& metadata) { if (delete_exprs_.empty()) return false; int32_t spec_id = manifest.partition_spec_id; - for (size_t i = 0; i < delete_exprs_.size(); ++i) { - auto* evaluator_ptr = GetManifestEvaluator(metadata, spec_id, delete_exprs_[i]) - .value_or(nullptr); + for (const auto& delete_expr : delete_exprs_) { + auto* evaluator_ptr = + GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr); if (evaluator_ptr == nullptr) return true; // conservative on error auto result = evaluator_ptr->Evaluate(manifest); if (!result.has_value() || result.value()) return true; @@ -92,8 +112,12 @@ bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& mani bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest, const TableMetadata& metadata) { // A manifest with no live files cannot contain files to delete. - bool has_live = (manifest.added_files_count.value_or(0) > 0) || - (manifest.existing_files_count.value_or(0) > 0); + // Null counts mean the count is unknown — treat conservatively as possibly non-zero + // (matches Java's ManifestFile.hasAddedFiles / hasExistingFiles behaviour). 
+ bool has_live = !manifest.added_files_count.has_value() || + manifest.added_files_count.value() > 0 || + !manifest.existing_files_count.has_value() || + manifest.existing_files_count.value() > 0; if (!has_live) return false; return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) || @@ -111,51 +135,79 @@ Result ManifestFilterManager::GetManifestEvaluator( ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter( - de.expr, spec, *schema, de.case_sensitive)); + de.expr, spec, *schema, case_sensitive_)); } return vec[idx].get(); } -Result ManifestFilterManager::GetMetricsEvaluator( +Result ManifestFilterManager::GetResidualEvaluator( const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) { - auto& vec = metrics_evaluator_cache_[spec_id]; + auto& vec = residual_evaluator_cache_[spec_id]; size_t idx = &de - delete_exprs_.data(); if (idx >= vec.size()) { vec.resize(delete_exprs_.size()); } if (!vec[idx]) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - ICEBERG_ASSIGN_OR_RAISE(vec[idx], - InclusiveMetricsEvaluator::Make(de.expr, *schema, - de.case_sensitive)); + ICEBERG_ASSIGN_OR_RAISE( + vec[idx], ResidualEvaluator::Make(de.expr, *spec, *schema, case_sensitive_)); } return vec[idx].get(); } -bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry, - const TableMetadata& metadata, - int32_t manifest_spec_id) { +Result ManifestFilterManager::ShouldDelete(const ManifestEntry& entry, + const TableMetadata& metadata, + int32_t manifest_spec_id) { if (!entry.data_file) return false; const DataFile& file = *entry.data_file; - - // Path-based check - if (delete_paths_.count(file.file_path)) { - pending_paths_.erase(file.file_path); - return true; + int32_t spec_id = 
file.partition_spec_id.value_or(manifest_spec_id); + std::shared_ptr schema; + if (!delete_exprs_.empty()) { + ICEBERG_ASSIGN_OR_RAISE(schema, metadata.Schema()); } - // Partition-drop check - int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id); - if (drop_partitions_.contains(spec_id, file.partition)) { + // Path-based and partition-drop checks + if (delete_paths_.count(file.file_path) || + drop_partitions_.contains(spec_id, file.partition)) { + if (fail_any_delete_) { + return InvalidArgument("Operation would delete existing data: {}", file.file_path); + } return true; } - // Expression-based check (inclusive metrics) + // Expression-based check. + // Java semantics: compute a partition residual first, then use strict/inclusive + // metrics on that residual. Data manifests reject partial matches; delete manifests + // tolerate them because only data-file deletes require all-rows-match validation. for (const auto& de : delete_exprs_) { - auto* eval = GetMetricsEvaluator(metadata, spec_id, de).value_or(nullptr); - if (eval == nullptr) return true; // conservative on error - auto result = eval->Evaluate(file); - if (!result.has_value() || result.value()) return true; + ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval, + GetResidualEvaluator(metadata, spec_id, de)); + ICEBERG_ASSIGN_OR_RAISE(auto residual_expr, + residual_eval->ResidualFor(file.partition)); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_eval, + StrictMetricsEvaluator::Make(residual_expr, schema, case_sensitive_)); + ICEBERG_ASSIGN_OR_RAISE(auto strict_match, strict_eval->Evaluate(file)); + if (strict_match) { + if (fail_any_delete_) { + return InvalidArgument("Operation would delete existing data: {}", + file.file_path); + } + return true; + } + + ICEBERG_ASSIGN_OR_RAISE(auto incl_eval, InclusiveMetricsEvaluator::Make( + residual_expr, *schema, case_sensitive_)); + ICEBERG_ASSIGN_OR_RAISE(auto incl_match, incl_eval->Evaluate(file)); + if (incl_match) { + if (manifest_content_ == 
ManifestContent::kDeletes) { + continue; + } + return InvalidArgument( + "Cannot delete file where some, but not all, rows match filter: {}", + file.file_path); + } } return false; @@ -164,12 +216,25 @@ bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry, Result> ManifestFilterManager::FilterManifests( const TableMetadata& metadata, const std::shared_ptr& base_snapshot, const ManifestWriterFactory& writer_factory) { + // Validate required delete paths before any early return — even an empty base + // snapshot must report missing required paths (matches Java's validateRequiredDeletes). + if (fail_missing_delete_paths_ && !delete_paths_.empty() && !base_snapshot) { + return InvalidArgument("Missing delete paths: {}", [&] { + std::string s; + for (const auto& p : delete_paths_) { + if (!s.empty()) s += ", "; + s += p; + } + return s; + }()); + } + // No base snapshot → nothing to filter if (!base_snapshot) return std::vector{}; // Load the relevant manifests from the manifest list - ICEBERG_ASSIGN_OR_RAISE(auto list_reader, - ManifestListReader::Make(base_snapshot->manifest_list, file_io_)); + ICEBERG_ASSIGN_OR_RAISE( + auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list, file_io_)); ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files()); // Keep only manifests for this manager's content type @@ -180,12 +245,16 @@ Result> ManifestFilterManager::FilterManifests( } // No conditions registered → return unchanged - if (!DeletesFiles()) return manifests; + if (!ContainsDeletes()) return manifests; ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); std::vector result; result.reserve(manifests.size()); + // Track which registered delete paths were actually found across all manifests. + // Using delete_paths_ as the immutable source of truth makes FilterManifests + // idempotent across commit retries (matches Java's validateRequiredDeletes design). 
+ std::unordered_set found_paths; for (const auto& manifest : manifests) { // Fast path: metadata skip @@ -199,13 +268,14 @@ Result> ManifestFilterManager::FilterManifests( // Read all live entries from the manifest ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, file_io_, schema, spec)); + ManifestReader::Make(manifest, file_io_, schema, spec)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); // Check whether any entry should be deleted bool has_deletes = false; for (const auto& entry : entries) { - if (ShouldDelete(entry, metadata, spec_id)) { + ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id)); + if (should_delete) { has_deletes = true; break; } @@ -216,10 +286,17 @@ Result> ManifestFilterManager::FilterManifests( continue; } - // Rewrite the manifest with deleted entries marked + // Rewrite the manifest with deleted entries marked; record found paths. ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_)); for (const auto& entry : entries) { - if (ShouldDelete(entry, metadata, spec_id)) { + ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id)); + if (should_delete) { + if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) { + found_paths.insert(entry.data_file->file_path); + } + if (entry.data_file) { + delete_files_.insert(std::make_shared(*entry.data_file)); + } ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry)); } else { ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry)); @@ -230,14 +307,19 @@ Result> ManifestFilterManager::FilterManifests( result.push_back(std::move(filtered_manifest)); } - // Validate that all registered delete paths were found - if (fail_missing_delete_paths_ && !pending_paths_.empty()) { + // Validate that all registered delete paths were found. Uses delete_paths_ (not a + // consumed set) so repeated calls on the same manager produce the same result. 
+ if (fail_missing_delete_paths_) { std::string missing; - for (const auto& p : pending_paths_) { - if (!missing.empty()) missing += ", "; - missing += p; + for (const auto& p : delete_paths_) { + if (!found_paths.count(p)) { + if (!missing.empty()) missing += ", "; + missing += p; + } + } + if (!missing.empty()) { + return InvalidArgument("Missing delete paths: {}", missing); } - return InvalidArgument("Missing delete paths: {}", missing); } return result; diff --git a/src/iceberg/manifest/manifest_filter_manager.h b/src/iceberg/manifest/manifest_filter_manager.h index 546f7c0fc..8f766ef77 100644 --- a/src/iceberg/manifest/manifest_filter_manager.h +++ b/src/iceberg/manifest/manifest_filter_manager.h @@ -32,11 +32,14 @@ #include "iceberg/expression/inclusive_metrics_evaluator.h" #include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/expression/residual_evaluator.h" +#include "iceberg/expression/strict_metrics_evaluator.h" #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/util/data_file_set.h" #include "iceberg/util/partition_value_util.h" namespace iceberg { @@ -47,9 +50,8 @@ namespace iceberg { /// content type, and returns a new ManifestWriter ready for writing. The caller /// (i.e. MergingSnapshotUpdate in PR2) captures metadata, FileIO, and snapshot ID /// inside the lambda. -using ManifestWriterFactory = - std::function>(int32_t spec_id, - ManifestContent content)>; +using ManifestWriterFactory = std::function>( + int32_t spec_id, ManifestContent content)>; /// \brief Filters an existing snapshot's manifest list. /// @@ -61,6 +63,11 @@ using ManifestWriterFactory = /// The manager is content-agnostic: pass ManifestContent::kData to process data /// manifests, or ManifestContent::kDeletes to process delete manifests. 
/// +/// \note For ManifestContent::kDeletes, this implementation applies +/// path/partition/row-filter/object-level deletes. Java's additional cleanup +/// semantics (dropDeleteFilesOlderThan, removeDanglingDeletesFor / DV cleanup) +/// are not yet implemented and are planned for a later layer. +/// /// \note This class is non-copyable and non-movable. class ICEBERG_EXPORT ManifestFilterManager { public: @@ -75,9 +82,10 @@ class ICEBERG_EXPORT ManifestFilterManager { /// expression will be marked DELETED. /// /// \param expr The expression to match files against - /// \param case_sensitive Whether field name matching is case-sensitive - void DeleteByRowFilter(std::shared_ptr expr, - bool case_sensitive = true); + void DeleteByRowFilter(std::shared_ptr expr); + + /// \brief Set whether row-filter field binding is case-sensitive. + void CaseSensitive(bool case_sensitive); /// \brief Register an exact file path for deletion. /// @@ -86,6 +94,24 @@ class ICEBERG_EXPORT ManifestFilterManager { /// \param path The exact file path to delete void DeleteFile(std::string_view path); + /// \brief Register a file object for deletion. + /// + /// Any manifest entry whose file_path matches file->file_path will be marked + /// DELETED. The file object is retained in FilesToBeDeleted(), allowing callers + /// (e.g. RowDelta in PR5) to enumerate deleted file objects for DV cleanup. + /// Duplicate registrations (same path) are silently ignored. + /// + /// \param file The data/delete file to delete (must not be null) + void DeleteFile(std::shared_ptr file); + + /// \brief Returns the set of file objects marked for deletion by this manager. + /// + /// This includes files registered via DeleteFile(DataFile) and files discovered + /// during FilterManifests() that were deleted by path, partition, or row-filter + /// matching. Used by higher-level operations (e.g. RowDelta) to enumerate the + /// deleted data files for delete-file cleanup. 
+ const DataFileSet& FilesToBeDeleted() const; + /// \brief Register a partition for dropping. /// /// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED. @@ -98,8 +124,15 @@ class ICEBERG_EXPORT ManifestFilterManager { /// delete path was not found in any manifest entry. void FailMissingDeletePaths(); + /// \brief Set a flag that makes FilterManifests() return an error if any + /// manifest entry matches a delete condition. + /// + /// Used by conflict-detection logic (e.g. OverwriteFiles in PR4) to detect + /// whether a concurrent delete would invalidate a set of newly written files. + void FailAnyDelete(); + /// \brief Returns true if any delete condition has been registered. - bool DeletesFiles() const; + bool ContainsDeletes() const; /// \brief Apply all accumulated delete conditions to the base snapshot's manifests. /// @@ -117,7 +150,6 @@ class ICEBERG_EXPORT ManifestFilterManager { private: struct DeleteExpr { std::shared_ptr expr; - bool case_sensitive; }; /// \brief Returns true if the manifest might contain files matching any expression. @@ -125,6 +157,12 @@ class ICEBERG_EXPORT ManifestFilterManager { const TableMetadata& metadata); /// \brief Returns true if the manifest might contain files in a dropped partition. + /// + /// Checks whether the manifest's partition_spec_id matches any spec_id registered + /// via DropPartition(). Manifests from a different spec cannot contain the dropped + /// partition values. A more precise implementation could also compare + /// partition_summaries bounds, but that requires decoding binary bounds against the + /// PartitionSpec, which is not yet available at this call site. bool CanContainDroppedPartitions(const ManifestFile& manifest); /// \brief Returns true if the manifest might contain path-deleted files. @@ -136,33 +174,33 @@ class ICEBERG_EXPORT ManifestFilterManager { /// \brief Get or create a ManifestEvaluator for the given spec and expression. 
Result GetManifestEvaluator(const TableMetadata& metadata, - int32_t spec_id, - const DeleteExpr& de); + int32_t spec_id, const DeleteExpr& de); - /// \brief Get or create an InclusiveMetricsEvaluator for the given spec and expression. - Result GetMetricsEvaluator(const TableMetadata& metadata, - int32_t spec_id, - const DeleteExpr& de); + /// \brief Get or create a ResidualEvaluator for the given spec and expression. + Result GetResidualEvaluator(const TableMetadata& metadata, + int32_t spec_id, const DeleteExpr& de); /// \brief Check whether a single entry should be deleted. - bool ShouldDelete(const ManifestEntry& entry, const TableMetadata& metadata, - int32_t manifest_spec_id); + Result ShouldDelete(const ManifestEntry& entry, const TableMetadata& metadata, + int32_t manifest_spec_id); const ManifestContent manifest_content_; std::shared_ptr file_io_; std::vector delete_exprs_; std::unordered_set delete_paths_; - std::unordered_set pending_paths_; + DataFileSet delete_files_; PartitionSet drop_partitions_; bool fail_missing_delete_paths_{false}; + bool fail_any_delete_{false}; + bool case_sensitive_{true}; // Cache: (spec_id, expr_index) → ManifestEvaluator std::unordered_map>> manifest_evaluator_cache_; - // Cache: (spec_id, expr_index) → InclusiveMetricsEvaluator - std::unordered_map>> - metrics_evaluator_cache_; + // Cache: (spec_id, expr_index) → ResidualEvaluator + std::unordered_map>> + residual_evaluator_cache_; }; } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_merge_manager.cc b/src/iceberg/manifest/manifest_merge_manager.cc index ae0051453..aacdc7a52 100644 --- a/src/iceberg/manifest/manifest_merge_manager.cc +++ b/src/iceberg/manifest/manifest_merge_manager.cc @@ -19,7 +19,9 @@ #include "iceberg/manifest/manifest_merge_manager.h" +#include #include +#include #include #include "iceberg/manifest/manifest_entry.h" @@ -30,37 +32,48 @@ namespace iceberg { ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes, - int32_t 
min_count_to_merge, - bool merge_enabled) + int32_t min_count_to_merge, bool merge_enabled) : target_size_bytes_(target_size_bytes), min_count_to_merge_(min_count_to_merge), merge_enabled_(merge_enabled) {} Result> ManifestMergeManager::MergeManifests( const std::vector& existing_manifests, - const std::vector& new_manifests, const TableMetadata& metadata, - std::shared_ptr file_io, const ManifestWriterFactory& writer_factory) { + const std::vector& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr file_io, + const ManifestWriterFactory& writer_factory) { // Combine new then existing (new-first ordering is preserved in output) std::vector all; all.reserve(new_manifests.size() + existing_manifests.size()); all.insert(all.end(), new_manifests.begin(), new_manifests.end()); all.insert(all.end(), existing_manifests.begin(), existing_manifests.end()); - if (!merge_enabled_ || static_cast(all.size()) < min_count_to_merge_) { + if (all.empty() || !merge_enabled_) { return all; } - // Group manifests by partition_spec_id — never merge across specs - std::map> by_spec; + // Match Java's separate DataFileMergeManager/DeleteFileMergeManager behavior by + // tracking the first (newest) manifest independently per content type. + std::map first_by_content; + for (const auto& manifest : all) { + first_by_content.try_emplace(manifest.content, &manifest); + } + + // Group manifests by (partition_spec_id, content) — never merge across specs or + // content types. Use reverse spec ordering to match Java's reverse-TreeMap behaviour, + // which is observable in v3 tables where first-row IDs are assigned in output order. 
+ using GroupKey = std::pair; + std::map, std::greater> by_spec; for (const auto& m : all) { - by_spec[m.partition_spec_id].push_back(m); + by_spec[{m.partition_spec_id, m.content}].push_back(m); } std::vector result; result.reserve(all.size()); - for (auto& [spec_id, group] : by_spec) { - ICEBERG_ASSIGN_OR_RAISE(auto merged, - MergeGroup(group, metadata, file_io, writer_factory)); + for (auto& [key, group] : by_spec) { + const auto* first = first_by_content.at(key.second); + ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, *first, snapshot_id, metadata, + file_io, writer_factory)); result.insert(result.end(), std::make_move_iterator(merged.begin()), std::make_move_iterator(merged.end())); } @@ -68,45 +81,65 @@ Result> ManifestMergeManager::MergeManifests( } Result> ManifestMergeManager::MergeGroup( - const std::vector& group, const TableMetadata& metadata, - std::shared_ptr file_io, const ManifestWriterFactory& writer_factory) { - std::vector result; + const std::vector& group, const ManifestFile& first, + int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr file_io, + const ManifestWriterFactory& writer_factory) { + // Mirror Java's ListPacker.packEnd(group, ManifestFile::length) with lookback 1: + // 1. Process manifests in reverse order (oldest-first). + // 2. Greedy forward-pack with lookback=1: emit the current bin when the next item + // doesn't fit, then start a new bin. + // 3. Reverse each bin (restoring original item order within a bin). + // 4. Reverse the bin list (newest manifest's bin ends up first). + // Effect: the newest manifest is in the first, possibly under-filled, bin — exactly + // what Java's comment describes ("the manifest that gets under-filled is the first one, + // which will be merged the next time"). 
+ std::vector> bins; std::vector current_bin; int64_t bin_size = 0; - for (const auto& manifest : group) { - if (manifest.manifest_length >= target_size_bytes_) { - // Oversized manifest passes through unchanged without affecting the current bin - result.push_back(manifest); - } else if (bin_size + manifest.manifest_length > target_size_bytes_ && - !current_bin.empty()) { - // Adding this manifest would overflow the bin — flush and start a new bin - ICEBERG_ASSIGN_OR_RAISE(auto merged, - FlushBin(current_bin, metadata, file_io, writer_factory)); - result.push_back(std::move(merged)); + for (auto it = group.rbegin(); it != group.rend(); ++it) { + const auto& manifest = *it; + if (!current_bin.empty() && + bin_size + manifest.manifest_length > target_size_bytes_) { + bins.push_back(std::move(current_bin)); current_bin.clear(); bin_size = 0; - current_bin.push_back(manifest); - bin_size = manifest.manifest_length; - } else { - current_bin.push_back(manifest); - bin_size += manifest.manifest_length; } + current_bin.push_back(manifest); + bin_size += manifest.manifest_length; } - if (!current_bin.empty()) { - ICEBERG_ASSIGN_OR_RAISE(auto merged, - FlushBin(current_bin, metadata, file_io, writer_factory)); - result.push_back(std::move(merged)); + bins.push_back(std::move(current_bin)); + } + + for (auto& bin : bins) { + std::ranges::reverse(bin); + } + std::ranges::reverse(bins); + + // Process each bin: if the bin contains the newest manifest and is too small, + // pass its contents through unchanged (mirrors Java's minCountToMerge logic). 
+ std::vector result; + result.reserve(group.size()); + for (auto& bin : bins) { + bool contains_first = std::ranges::find(bin, first) != bin.end(); + if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) { + result.insert(result.end(), bin.begin(), bin.end()); + } else { + ICEBERG_ASSIGN_OR_RAISE( + auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory)); + result.push_back(std::move(merged)); + } } return result; } Result ManifestMergeManager::FlushBin( - const std::vector& bin, const TableMetadata& metadata, - std::shared_ptr file_io, const ManifestWriterFactory& writer_factory) { - // A single-manifest bin requires no merging + const std::vector& bin, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr file_io, + const ManifestWriterFactory& writer_factory) { + // A single-manifest bin requires no merging. if (bin.size() == 1) return bin[0]; const ManifestFile& first = bin[0]; @@ -119,14 +152,21 @@ Result ManifestMergeManager::FlushBin( for (const auto& manifest : bin) { ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, file_io, schema, spec)); + ManifestReader::Make(manifest, file_io, schema, spec)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); for (const auto& entry : entries) { - // When merging, ADDED entries from prior manifests become EXISTING; - // DELETED entries are preserved as-is (tombstones). + bool is_current = + entry.snapshot_id.has_value() && entry.snapshot_id.value() == snapshot_id; if (entry.status == ManifestStatus::kDeleted) { - ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry)); + // Carry forward only the current snapshot's deletes; drop older tombstones. + if (is_current) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry)); + } + } else if (entry.status == ManifestStatus::kAdded && is_current) { + // Files added by the current snapshot retain their ADDED status. 
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); } else { + // Files added by prior snapshots (ADDED or EXISTING) become EXISTING. ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry)); } } @@ -135,5 +175,4 @@ Result ManifestMergeManager::FlushBin( ICEBERG_RETURN_UNEXPECTED(writer->Close()); return writer->ToManifestFile(); } - } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_merge_manager.h b/src/iceberg/manifest/manifest_merge_manager.h index c26df875d..ea0e37825 100644 --- a/src/iceberg/manifest/manifest_merge_manager.h +++ b/src/iceberg/manifest/manifest_merge_manager.h @@ -58,27 +58,50 @@ class ICEBERG_EXPORT ManifestMergeManager { /// \brief Merge existing and new manifests according to configured thresholds. /// + /// Manifests are grouped by (partition_spec_id, content) — data and delete manifests + /// are never merged together. Within each group, a greedy bin-packing algorithm + /// combines manifests up to target_size_bytes. The bin that contains the newest + /// manifest for that content type is protected by min_count_to_merge: if it has fewer + /// than that many items it is passed through unchanged. + /// + /// \note Java's ManifestMergeManager additionally exposes replacedManifestsCount() + /// and cleanUncommitted(committed) for retry / rollback support. In this C++ + /// implementation those responsibilities are handled by the caller + /// (MergingSnapshotUpdate in PR2) via a tracked ManifestWriterFactory wrapper. + /// /// \param existing_manifests Manifests already in the base snapshot /// \param new_manifests Newly written manifests to incorporate + /// \param snapshot_id The ID of the snapshot being committed. Used to preserve + /// ADDED/DELETED status for entries written by this snapshot and to suppress + /// stale DELETED tombstones from prior snapshots. 
/// \param metadata Table metadata (provides specs and schema for readers) /// \param file_io File IO used to open existing manifests for reading /// \param writer_factory Factory to create new ManifestWriter instances /// \return The merged manifest list, or an error Result> MergeManifests( const std::vector& existing_manifests, - const std::vector& new_manifests, const TableMetadata& metadata, - std::shared_ptr file_io, const ManifestWriterFactory& writer_factory); + const std::vector& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr file_io, + const ManifestWriterFactory& writer_factory); private: /// \brief Merge a group of manifests sharing the same spec_id. /// - /// Returns the merged manifests for this group (pass-throughs + newly written). + /// \param first The first (newest) manifest for this group's content type, used to + /// apply the min_count_to_merge threshold on the bin that contains it. Result> MergeGroup( - const std::vector& group, const TableMetadata& metadata, - std::shared_ptr file_io, const ManifestWriterFactory& writer_factory); + const std::vector& group, const ManifestFile& first, + int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr file_io, + const ManifestWriterFactory& writer_factory); /// \brief Write a merged manifest from all manifests in a bin. 
- Result FlushBin(const std::vector& bin, + /// + /// Entries are written snapshot-aware: + /// - ADDED from snapshot_id → WriteAddedEntry (preserve status) + /// - DELETED from snapshot_id → WriteDeletedEntry (preserve tombstone) + /// - DELETED from older snapshots → dropped (stale tombstones are not carried forward) + /// - All other entries → WriteExistingEntry + Result FlushBin(const std::vector& bin, int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr file_io, const ManifestWriterFactory& writer_factory); diff --git a/src/iceberg/test/manifest_filter_manager_test.cc b/src/iceberg/test/manifest_filter_manager_test.cc index 271b054df..4057c8c70 100644 --- a/src/iceberg/test/manifest_filter_manager_test.cc +++ b/src/iceberg/test/manifest_filter_manager_test.cc @@ -27,6 +27,7 @@ #include #include "iceberg/avro/avro_register.h" +#include "iceberg/exception.h" #include "iceberg/expression/expression.h" #include "iceberg/expression/expressions.h" #include "iceberg/manifest/manifest_entry.h" @@ -84,15 +85,14 @@ class ManifestFilterManagerTest : public MinimalUpdateTestBase { ManifestWriterFactory MakeWriterFactory(const TableMetadata& metadata) { auto fv = metadata.format_version; - return [this, fv, &metadata](int32_t spec_id, - ManifestContent content) mutable + return [this, fv, &metadata](int32_t spec_id, ManifestContent content) mutable -> Result> { ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - auto path = std::format("{}/metadata/flt-{}.avro", table_location_, - manifest_counter_++); - return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, - schema, content); + auto path = + std::format("{}/metadata/flt-{}.avro", table_location_, manifest_counter_++); + return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, schema, + content); }; } @@ -103,7 +103,8 @@ class ManifestFilterManagerTest : public MinimalUpdateTestBase { 
for (const auto& m : manifests) { ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(m.partition_spec_id)); ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(m, file_io_, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(m, file_io_, schema, spec)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); result.insert(result.end(), entries.begin(), entries.end()); } @@ -126,11 +127,22 @@ TEST_F(ManifestFilterManagerTest, NullSnapshotReturnsEmpty) { EXPECT_TRUE(result.empty()); } -TEST_F(ManifestFilterManagerTest, DeletesFilesReturnsCorrectState) { +TEST_F(ManifestFilterManagerTest, ContainsDeletesReturnsCorrectState) { ManifestFilterManager mgr(ManifestContent::kData, file_io_); - EXPECT_FALSE(mgr.DeletesFiles()); + EXPECT_FALSE(mgr.ContainsDeletes()); mgr.DeleteFile("/some/path.parquet"); - EXPECT_TRUE(mgr.DeletesFiles()); + EXPECT_TRUE(mgr.ContainsDeletes()); +} + +TEST_F(ManifestFilterManagerTest, DeleteByRowFilterRejectsNull) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + EXPECT_THROW(mgr.DeleteByRowFilter(nullptr), IcebergError); +} + +TEST_F(ManifestFilterManagerTest, DeleteFileObjectRejectsNull) { + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + std::shared_ptr null_file; + EXPECT_THROW(mgr.DeleteFile(null_file), IcebergError); } TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) { @@ -140,7 +152,7 @@ TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) { // Load original manifests so we can compare paths ICEBERG_UNWRAP_OR_FAIL(auto list_reader, - ManifestListReader::Make(snap->manifest_list, file_io_)); + ManifestListReader::Make(snap->manifest_list, file_io_)); ICEBERG_UNWRAP_OR_FAIL(auto orig_manifests, list_reader->Files()); ManifestFilterManager mgr(ManifestContent::kData, file_io_); @@ -191,8 +203,7 @@ TEST_F(ManifestFilterManagerTest, RowFilterAlwaysTrueDeletesAll) { 
ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); for (const auto& e : entries) { - EXPECT_EQ(e.status, ManifestStatus::kDeleted) - << "Expected all entries to be DELETED"; + EXPECT_EQ(e.status, ManifestStatus::kDeleted) << "Expected all entries to be DELETED"; } } @@ -209,9 +220,39 @@ TEST_F(ManifestFilterManagerTest, RowFilterAlwaysFalseDeletesNone) { ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); for (const auto& e : entries) { // AlwaysFalse means nothing can match → entries remain ADDED or EXISTING - EXPECT_NE(e.status, ManifestStatus::kDeleted) - << "Expected no entries to be DELETED"; + EXPECT_NE(e.status, ManifestStatus::kDeleted) << "Expected no entries to be DELETED"; + } +} + +TEST_F(ManifestFilterManagerTest, RowFilterUsesPartitionResiduals) { + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_})); + auto* metadata = table_->metadata().get(); + auto factory = MakeWriterFactory(*metadata); + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.CaseSensitive(false); + mgr.DeleteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + + ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory)); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata)); + int deleted_count = 0; + int live_count = 0; + for (const auto& e : entries) { + ASSERT_NE(e.data_file, nullptr); + if (e.status == ManifestStatus::kDeleted) { + ++deleted_count; + EXPECT_EQ(e.data_file->file_path, file_a_->file_path); + } else { + ++live_count; + EXPECT_EQ(e.data_file->file_path, file_b_->file_path); + } } + + EXPECT_EQ(deleted_count, 1); + EXPECT_EQ(live_count, 1); + ASSERT_EQ(mgr.FilesToBeDeleted().size(), 1U); + EXPECT_EQ(mgr.FilesToBeDeleted().begin()->get()->file_path, file_a_->file_path); } TEST_F(ManifestFilterManagerTest, DropPartition) { diff --git a/src/iceberg/test/manifest_merge_manager_test.cc b/src/iceberg/test/manifest_merge_manager_test.cc index 
14f23893f..1915bda5d 100644 --- a/src/iceberg/test/manifest_merge_manager_test.cc +++ b/src/iceberg/test/manifest_merge_manager_test.cc @@ -69,13 +69,11 @@ class ManifestMergeManagerTest : public ::testing::Test { schema_ = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "x", int64()), }); - spec0_ = PartitionSpec::Make( - kSpecId0, - {PartitionField(1, 1000, "x", Transform::Identity())}) + spec0_ = PartitionSpec::Make(kSpecId0, + {PartitionField(1, 1000, "x", Transform::Identity())}) .value(); spec1_ = PartitionSpec::Make( - kSpecId1, - {PartitionField(1, 1001, "x_bucket", Transform::Bucket(8))}) + kSpecId1, {PartitionField(1, 1001, "x_bucket", Transform::Bucket(8))}) .value(); // Build minimal TableMetadata with both specs @@ -90,16 +88,18 @@ class ManifestMergeManagerTest : public ::testing::Test { // Write a small manifest with N data files and return the ManifestFile descriptor. Result WriteManifest(int32_t spec_id, int num_files, - int64_t file_size_override = 512) { + int64_t file_size_override = 512, + ManifestContent content = ManifestContent::kData) { auto path = std::format("manifest-{}.avro", manifest_counter_++); auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; ICEBERG_ASSIGN_OR_RAISE(auto writer, - ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, - file_io_, spec, schema_, - ManifestContent::kData)); + ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, + file_io_, spec, schema_, content)); for (int i = 0; i < num_files; ++i) { auto f = std::make_shared(); - f->content = DataFile::Content::kData; + f->content = (content == ManifestContent::kDeletes) + ? 
DataFile::Content::kPositionDeletes + : DataFile::Content::kData; f->file_path = std::format("data/file-{}-{}.parquet", manifest_counter_, i); f->file_format = FileFormatType::kParquet; // Identity spec uses LONG partition values; Bucket spec uses INT @@ -118,13 +118,13 @@ class ManifestMergeManagerTest : public ::testing::Test { } ManifestWriterFactory MakeWriterFactory() { - return [this](int32_t spec_id, ManifestContent content) - -> Result> { + return [this](int32_t spec_id, + ManifestContent content) -> Result> { ++factory_call_count_; auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; auto path = std::format("merged-{}.avro", manifest_counter_++); - return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, file_io_, - spec, schema_, content); + return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, file_io_, spec, + schema_, content); }; } @@ -133,7 +133,8 @@ class ManifestMergeManagerTest : public ::testing::Test { int total = 0; for (const auto& m : manifests) { auto spec = m.partition_spec_id == kSpecId0 ? 
spec0_ : spec1_; - ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(m, file_io_, schema_, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(m, file_io_, schema_, spec)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); total += static_cast(entries.size()); } @@ -156,8 +157,8 @@ TEST_F(ManifestMergeManagerTest, MergeDisabled) { ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/false); ICEBERG_UNWRAP_OR_FAIL( - auto result, mgr.MergeManifests({m0, m1}, {m2}, *metadata_, file_io_, - MakeWriterFactory())); + auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); // merge disabled → all 3 manifests returned, factory never called EXPECT_EQ(result.size(), 3U); EXPECT_EQ(factory_call_count_, 0); @@ -169,8 +170,9 @@ TEST_F(ManifestMergeManagerTest, BelowMinCountThreshold) { // min_count=3, only 2 manifests total → no merge ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); - ICEBERG_UNWRAP_OR_FAIL( - auto result, mgr.MergeManifests({m0}, {m1}, *metadata_, file_io_, MakeWriterFactory())); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m0}, {m1}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); EXPECT_EQ(result.size(), 2U); EXPECT_EQ(factory_call_count_, 0); } @@ -183,8 +185,8 @@ TEST_F(ManifestMergeManagerTest, MergeOccursAtThreshold) { ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); ICEBERG_UNWRAP_OR_FAIL( - auto result, mgr.MergeManifests({m0, m1}, {m2}, *metadata_, file_io_, - MakeWriterFactory())); + auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); // All 3 merged into 1 manifest (total 3 entries) EXPECT_EQ(result.size(), 1U); ICEBERG_UNWRAP_OR_FAIL(auto count1, CountEntries(result)); @@ -198,12 +200,13 @@ TEST_F(ManifestMergeManagerTest, OversizedManifestPassedThrough) { ICEBERG_UNWRAP_OR_FAIL(auto m_small2, 
WriteManifest(kSpecId0, 1, /*size=*/100)); ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); - ICEBERG_UNWRAP_OR_FAIL( - auto result, - mgr.MergeManifests({m_large, m_small}, {m_small2}, *metadata_, file_io_, - MakeWriterFactory())); - // m_large passes through; m_small and m_small2 merge into 1 - EXPECT_EQ(result.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m_large, m_small}, {m_small2}, kSnapshotId, + *metadata_, file_io_, MakeWriterFactory())); + // m_large is oversized and acts as a bin boundary — the two small manifests on either + // side of it are never merged together. m_small2 (the newest) is also protected by + // minCountToMerge (size 1 < 2). All three remain separate. + EXPECT_EQ(result.size(), 3U); ICEBERG_UNWRAP_OR_FAIL(auto count2, CountEntries(result)); EXPECT_EQ(count2, 4); // 2 + 1 + 1 } @@ -219,8 +222,8 @@ TEST_F(ManifestMergeManagerTest, CrossSpecManifestsNotMerged) { ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); ICEBERG_UNWRAP_OR_FAIL( auto result, - mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, *metadata_, - file_io_, MakeWriterFactory())); + mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, kSnapshotId, + *metadata_, file_io_, MakeWriterFactory())); EXPECT_EQ(result.size(), 2U); // Verify spec IDs are preserved per output manifest for (const auto& m : result) { @@ -236,11 +239,119 @@ TEST_F(ManifestMergeManagerTest, WriterFactoryCalledOncePerMergedManifest) { ICEBERG_UNWRAP_OR_FAIL(auto m3, WriteManifest(kSpecId1, 1, /*size=*/100)); ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); - ICEBERG_UNWRAP_OR_FAIL( - auto result, - mgr.MergeManifests({m0, m2}, {m1, m3}, *metadata_, file_io_, MakeWriterFactory())); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({m0, m2}, {m1, m3}, kSnapshotId, *metadata_, + file_io_, MakeWriterFactory())); EXPECT_EQ(result.size(), 2U); 
EXPECT_EQ(factory_call_count_, 2); } +TEST_F(ManifestMergeManagerTest, MixedContentManifestsNotMerged) { + // Data and delete manifests sharing the same spec_id must never be merged together. + // The grouping key is (spec_id, content), so they land in separate bins. + ICEBERG_UNWRAP_OR_FAIL( + auto d0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData)); + ICEBERG_UNWRAP_OR_FAIL( + auto d1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData)); + ICEBERG_UNWRAP_OR_FAIL( + auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + ICEBERG_UNWRAP_OR_FAIL( + auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({d0, del0}, {d1, del1}, kSnapshotId, *metadata_, + file_io_, MakeWriterFactory())); + // 2 data → 1 merged data manifest; 2 delete → 1 merged delete manifest + EXPECT_EQ(result.size(), 2U); + int data_count = 0; + int delete_count = 0; + for (const auto& m : result) { + if (m.content == ManifestContent::kData) ++data_count; + if (m.content == ManifestContent::kDeletes) ++delete_count; + } + EXPECT_EQ(data_count, 1); + EXPECT_EQ(delete_count, 1); +} + +TEST_F(ManifestMergeManagerTest, MixedContentUsesFirstManifestPerContent) { + ICEBERG_UNWRAP_OR_FAIL( + auto d0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData)); + ICEBERG_UNWRAP_OR_FAIL( + auto d1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData)); + ICEBERG_UNWRAP_OR_FAIL( + auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + ICEBERG_UNWRAP_OR_FAIL( + auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + + // Java applies minCountToMerge independently in DataFileMergeManager and + // DeleteFileMergeManager. 
With mixed input, each content type's newest manifest must + // be protected by the threshold independently. + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({d0, del0}, {d1, del1}, kSnapshotId, *metadata_, + file_io_, MakeWriterFactory())); + + // Each content type has exactly two manifests, below min_count=3, so neither pair + // should be merged. + ASSERT_EQ(result.size(), 4U); + int data_count = 0; + int delete_count = 0; + for (const auto& manifest : result) { + if (manifest.content == ManifestContent::kData) { + ++data_count; + } else if (manifest.content == ManifestContent::kDeletes) { + ++delete_count; + } + } + EXPECT_EQ(data_count, 2); + EXPECT_EQ(delete_count, 2); +} + +TEST_F(ManifestMergeManagerTest, DeleteManifestsMerged) { + // Delete manifests are bin-packed and merged just like data manifests. + ICEBERG_UNWRAP_OR_FAIL( + auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + ICEBERG_UNWRAP_OR_FAIL( + auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + ICEBERG_UNWRAP_OR_FAIL( + auto del2, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes)); + + ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL(auto result, + mgr.MergeManifests({del0, del1}, {del2}, kSnapshotId, *metadata_, + file_io_, MakeWriterFactory())); + EXPECT_EQ(result.size(), 1U); + EXPECT_EQ(result[0].content, ManifestContent::kDeletes); + ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result)); + EXPECT_EQ(count, 3); +} + +TEST_F(ManifestMergeManagerTest, PackEndOlderManifestsMergedNotNewest) { + // packEnd semantics: for [m0_new, m1_old, m2_old] with target=250 (pairs fit but + // triples don't), Java packs from the end so m1+m2 (the older pair) get merged and + // m0 (the newest) is left in its own under-filled bin at the front of the output. 
+ // This is the opposite of naive forward packing, which would merge m0+m1. + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + + // target=250 fits two 100-byte manifests but not three. + // min_count=3 so m0's single-element bin is kept as-is (below threshold). + ManifestMergeManager mgr(/*target=*/250, /*min_count=*/3, /*enabled=*/true); + ICEBERG_UNWRAP_OR_FAIL( + auto result, mgr.MergeManifests({m1, m2}, {m0}, kSnapshotId, *metadata_, file_io_, + MakeWriterFactory())); + // Expected: [m0 (pass-through), merged(m1+m2)] + ASSERT_EQ(result.size(), 2U); + // First output is the newest manifest m0, passed through unchanged (under-filled bin). + EXPECT_EQ(result[0].manifest_length, m0.manifest_length); + // Second output is the merged older pair — it must be a newly written manifest + // (different path than either original). + EXPECT_NE(result[1].manifest_path, m1.manifest_path); + EXPECT_NE(result[1].manifest_path, m2.manifest_path); + ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result)); + EXPECT_EQ(count, 3); +} + } // namespace iceberg From ca5c34ab0d9fe901e8f765ac5be054ebf9919dfc Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 14 May 2026 13:08:28 +0800 Subject: [PATCH 3/3] fix(manifest): fix clang-tidy warnings in ManifestMergeManager --- src/iceberg/manifest/manifest_merge_manager.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/iceberg/manifest/manifest_merge_manager.cc b/src/iceberg/manifest/manifest_merge_manager.cc index aacdc7a52..28fe016d4 100644 --- a/src/iceberg/manifest/manifest_merge_manager.cc +++ b/src/iceberg/manifest/manifest_merge_manager.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -63,7 +64,7 @@ Result> ManifestMergeManager::MergeManifests( // content types. 
Use reverse spec ordering to match Java's reverse-TreeMap behaviour, // which is observable in v3 tables where first-row IDs are assigned in output order. using GroupKey = std::pair; - std::map, std::greater> by_spec; + std::map, std::greater<>> by_spec; for (const auto& m : all) { by_spec[{m.partition_spec_id, m.content}].push_back(m); } @@ -97,8 +98,7 @@ Result> ManifestMergeManager::MergeGroup( std::vector current_bin; int64_t bin_size = 0; - for (auto it = group.rbegin(); it != group.rend(); ++it) { - const auto& manifest = *it; + for (const auto& manifest : std::views::reverse(group)) { if (!current_bin.empty() && bin_size + manifest.manifest_length > target_size_bytes_) { bins.push_back(std::move(current_bin));