From 769ad57b1b8334ce43102e5d8f7bea28aa263e13 Mon Sep 17 00:00:00 2001 From: Timur Yusupov Date: Wed, 1 Nov 2023 23:42:49 +0300 Subject: [PATCH] [BACKPORT 2.14.10][#19730] docdb: added SST write retries on corruption Summary: Added support for `rocksdb_max_sst_write_retries` flag: maximum allowed number of attempts to write SST file in case of detected corruption after write (by default 0 which means no retries). Implemented for both flushes and compactions. For now, we only support retries when sub-compaction results in single output file (which is the case for yugabyte-db as of today). If in future we detect corruption after the first output file in case of multiple sub-compaction output files and retries are enabled, DFATAL will be logged and no retry will be done. Original commit: f58c809658e350fc60b6a8339992cde011052beb / D29784 Backported from 2.14 commit: daf620d87613ae7b0a745f5bc08d0584f3e427a8 / D29953 Jira: DB-8560 Test Plan: Jenkins: urgent DBTest.SstTailZerosCheckFlushRetries and DBTest.SstTailZerosCheckCompactionRetries Reviewers: bogdan, timur, rthallam Reviewed By: bogdan Subscribers: ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D30001 --- src/yb/rocksdb/db/builder.cc | 256 +++++++++------- src/yb/rocksdb/db/compaction_job.cc | 450 ++++++++++++++++------------ src/yb/rocksdb/db/compaction_job.h | 10 +- src/yb/rocksdb/db/db_sst_test.cc | 67 ++++- src/yb/rocksdb/env.h | 3 +- src/yb/rocksdb/util/env.cc | 6 +- 6 files changed, 486 insertions(+), 306 deletions(-) diff --git a/src/yb/rocksdb/db/builder.cc b/src/yb/rocksdb/db/builder.cc index 72780b57e169..c87e28490781 100644 --- a/src/yb/rocksdb/db/builder.cc +++ b/src/yb/rocksdb/db/builder.cc @@ -55,6 +55,12 @@ DECLARE_uint64(rocksdb_check_sst_file_tail_for_zeros); +DEFINE_uint64(rocksdb_max_sst_write_retries, 0, + "Maximum allowed number of retries to write SST file in case of detected corruption after " + "write."); +TAG_FLAG(rocksdb_max_sst_write_retries, runtime); 
+TAG_FLAG(rocksdb_max_sst_write_retries, advanced); + namespace rocksdb { class TableFactory; @@ -128,138 +134,164 @@ Status BuildTable(const std::string& dbname, TableProperties* table_properties) { // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; + auto* env = db_options.env; + const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported(); + const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); + const std::string data_fname = TableBaseToDataFileName(base_fname); + Status s; - meta->fd.total_file_size = 0; - meta->fd.base_file_size = 0; - iter->SeekToFirst(); + auto num_retries_left = FLAGS_rocksdb_max_sst_write_retries; + for (;;) { + s = Status::OK(); + meta->fd.total_file_size = 0; + meta->fd.base_file_size = 0; + iter->SeekToFirst(); - const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported(); + bool do_retry = false; - const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), - meta->fd.GetPathId()); - const std::string data_fname = is_split_sst ? TableBaseToDataFileName(base_fname) : ""; - if (iter->Valid()) { - std::shared_ptr base_file_writer; - std::shared_ptr data_file_writer; - s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer); - if (s.ok() && is_split_sst) { - s = CreateWritableFileWriter(data_fname, env_options, io_priority, env, &data_file_writer); - } - if (!s.ok()) { - return s; - } - std::unique_ptr builder(NewTableBuilder( - ioptions, internal_comparator, int_tbl_prop_collector_factories, - column_family_id, base_file_writer.get(), data_file_writer.get(), compression, - compression_opts)); - - MergeHelper merge(env, internal_comparator->user_comparator(), - ioptions.merge_operator, nullptr, ioptions.info_log, - ioptions.min_partial_merge_operands, - true /* internal key corruption is not ok */, - snapshots.empty() ? 
0 : snapshots.back()); - - CompactionIterator c_iter(iter, internal_comparator->user_comparator(), - &merge, kMaxSequenceNumber, &snapshots, - earliest_write_conflict_snapshot, - true /* internal key corruption is not ok */); - c_iter.SeekToFirst(); - const bool non_empty = c_iter.Valid(); - if (non_empty) { - meta->UpdateKey(c_iter.key(), UpdateBoundariesType::kSmallest); - } + if (iter->Valid()) { + std::shared_ptr base_file_writer; + std::shared_ptr data_file_writer; + s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer); + if (s.ok() && is_split_sst) { + s = CreateWritableFileWriter( + data_fname, env_options, io_priority, env, &data_file_writer); + } + if (!s.ok()) { + return s; + } + std::unique_ptr builder(NewTableBuilder( + ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, + base_file_writer.get(), data_file_writer.get(), compression, compression_opts)); + + MergeHelper merge( + env, internal_comparator->user_comparator(), ioptions.merge_operator, nullptr, + ioptions.info_log, ioptions.min_partial_merge_operands, + true /* internal key corruption is not ok */, snapshots.empty() ? 
0 : snapshots.back()); - boost::container::small_vector user_values; - for (; c_iter.Valid(); c_iter.Next()) { - const Slice& key = c_iter.key(); - const Slice& value = c_iter.value(); - builder->Add(key, value); - meta->UpdateBoundarySeqNo(GetInternalKeySeqno(key)); - if (db_options.boundary_extractor) { - user_values.clear(); - auto status = db_options.boundary_extractor->Extract(ExtractUserKey(key), &user_values); - if (!status.ok()) { + CompactionIterator c_iter( + iter, internal_comparator->user_comparator(), &merge, kMaxSequenceNumber, &snapshots, + earliest_write_conflict_snapshot, true /* internal key corruption is not ok */); + c_iter.SeekToFirst(); + const bool non_empty = c_iter.Valid(); + if (non_empty) { + meta->UpdateKey(c_iter.key(), UpdateBoundariesType::kSmallest); + } + + boost::container::small_vector user_values; + for (; c_iter.Valid(); c_iter.Next()) { + const Slice& key = c_iter.key(); + const Slice& value = c_iter.value(); + builder->Add(key, value); + meta->UpdateBoundarySeqNo(GetInternalKeySeqno(key)); + if (db_options.boundary_extractor) { + user_values.clear(); + auto status = db_options.boundary_extractor->Extract(ExtractUserKey(key), &user_values); + if (!status.ok()) { + builder->Abandon(); + return status; + } + meta->UpdateBoundaryUserValues(user_values, UpdateBoundariesType::kAll); + } + } + + if (non_empty) { + meta->UpdateKey(builder->LastKey(), UpdateBoundariesType::kLargest); + } + + // Finish and check for builder errors + bool empty = builder->NumEntries() == 0; + s = c_iter.status(); + if (!s.ok() || empty) { builder->Abandon(); - return status; + } else { + s = builder->Finish(); } - meta->UpdateBoundaryUserValues(user_values, UpdateBoundariesType::kAll); - } - } - if (non_empty) { - meta->UpdateKey(builder->LastKey(), UpdateBoundariesType::kLargest); - } + if (s.ok() && !empty) { + meta->fd.total_file_size = builder->TotalFileSize(); + meta->fd.base_file_size = builder->BaseFileSize(); + meta->marked_for_compaction = 
builder->NeedCompact(); + assert(meta->fd.GetTotalFileSize() > 0); + if (table_properties) { + *table_properties = builder->GetTableProperties(); + } + } - // Finish and check for builder errors - bool empty = builder->NumEntries() == 0; - s = c_iter.status(); - if (!s.ok() || empty) { - builder->Abandon(); - } else { - s = builder->Finish(); - } + // Finish and check for file errors + if (s.ok() && !empty && !ioptions.disable_data_sync) { + if (is_split_sst) { + RETURN_NOT_OK(data_file_writer->Sync(ioptions.use_fsync)); + } + RETURN_NOT_OK(base_file_writer->Sync(ioptions.use_fsync)); + } + if (s.ok() && !empty && is_split_sst) { + s = data_file_writer->Close(); + } + if (s.ok() && !empty) { + s = base_file_writer->Close(); + } - if (s.ok() && !empty) { - meta->fd.total_file_size = builder->TotalFileSize(); - meta->fd.base_file_size = builder->BaseFileSize(); - meta->marked_for_compaction = builder->NeedCompact(); - assert(meta->fd.GetTotalFileSize() > 0); - if (table_properties) { - *table_properties = builder->GetTableProperties(); - } - } + const auto rocksdb_check_sst_file_tail_for_zeros = + FLAGS_rocksdb_check_sst_file_tail_for_zeros; + if (s.ok() && !empty && PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) { + s = CheckSstTailForZeros( + db_options, env_options, is_split_sst ? data_fname : base_fname, + rocksdb_check_sst_file_tail_for_zeros); + if (!s.ok()) { + // Retry flush if more attempts are allowed. + do_retry = true; + } + } - // Finish and check for file errors - if (s.ok() && !empty && !ioptions.disable_data_sync) { - if (is_split_sst) { - RETURN_NOT_OK(data_file_writer->Sync(ioptions.use_fsync)); + if (s.ok() && !empty) { + // Verify that the table is usable + std::unique_ptr it(table_cache->NewIterator( + ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(), + nullptr, (internal_stats == nullptr) ? 
nullptr : internal_stats->GetFileReadHist(0), + false)); + s = it->status(); + if (s.ok() && paranoid_file_checks) { + for (it->SeekToFirst(); it->Valid(); it->Next()) { + } + s = it->status(); + } + if (!s.ok()) { + // Retry flush if more attempts are allowed. + do_retry = true; + } + } } - RETURN_NOT_OK(base_file_writer->Sync(ioptions.use_fsync)); - } - if (s.ok() && !empty && is_split_sst) { - s = data_file_writer->Close(); - } - if (s.ok() && !empty) { - s = base_file_writer->Close(); - } - const auto rocksdb_check_sst_file_tail_for_zeros = - FLAGS_rocksdb_check_sst_file_tail_for_zeros; - if (s.ok() && !empty && PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) { - s = CheckSstTailForZeros( - db_options, env_options, is_split_sst ? data_fname : base_fname, - rocksdb_check_sst_file_tail_for_zeros); - } + // Check for input iterator errors + if (!iter->status().ok()) { + s = iter->status(); + } - if (s.ok() && !empty) { - // Verify that the table is usable - std::unique_ptr it(table_cache->NewIterator( - ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(), nullptr, - (internal_stats == nullptr) ? 
nullptr - : internal_stats->GetFileReadHist(0), - false)); - s = it->status(); - if (s.ok() && paranoid_file_checks) { - for (it->SeekToFirst(); it->Valid(); it->Next()) { + if (!s.ok() || meta->fd.GetTotalFileSize() == 0) { + if (!env->CleanupFile(base_fname, db_options.log_prefix)) { + do_retry = false; + } + if (is_split_sst) { + if (!env->CleanupFile(data_fname, db_options.log_prefix)) { + do_retry = false; + } } - s = it->status(); } - } - } - // Check for input iterator errors - if (!iter->status().ok()) { - s = iter->status(); - } + if (!do_retry || num_retries_left == 0) { + break; + } - if (!s.ok() || meta->fd.GetTotalFileSize() == 0) { - env->CleanupFile(base_fname); - if (is_split_sst) { - env->CleanupFile(data_fname); - } + --num_retries_left; + RLOG( + InfoLogLevel::INFO_LEVEL, db_options.info_log, "Retrying flush to %s", + base_fname.c_str()); } + return s; } diff --git a/src/yb/rocksdb/db/compaction_job.cc b/src/yb/rocksdb/db/compaction_job.cc index 4c3935bf26d2..162fa0b55e15 100644 --- a/src/yb/rocksdb/db/compaction_job.cc +++ b/src/yb/rocksdb/db/compaction_job.cc @@ -28,57 +28,51 @@ #endif #include -#include #include #include #include -#include #include -#include #include -#include #include #include +#include "yb/rocksdb/db.h" +#include "yb/rocksdb/env.h" +#include "yb/rocksdb/perf_level.h" +#include "yb/rocksdb/statistics.h" +#include "yb/rocksdb/table.h" + #include "yb/rocksdb/db/builder.h" #include "yb/rocksdb/db/compaction_context.h" #include "yb/rocksdb/db/dbformat.h" #include "yb/rocksdb/db/event_helpers.h" #include "yb/rocksdb/db/filename.h" #include "yb/rocksdb/db/file_numbers.h" -#include "yb/rocksdb/db/log_reader.h" -#include "yb/rocksdb/db/log_writer.h" #include "yb/rocksdb/db/memtable.h" #include "yb/rocksdb/db/memtable_list.h" #include "yb/rocksdb/db/merge_helper.h" #include "yb/rocksdb/db/version_set.h" -#include "yb/rocksdb/port/likely.h" -#include "yb/rocksdb/port/port.h" -#include "yb/rocksdb/db.h" -#include "yb/rocksdb/env.h" 
-#include "yb/rocksdb/statistics.h" -#include "yb/rocksdb/status.h" -#include "yb/rocksdb/table.h" + #include "yb/rocksdb/table/internal_iterator.h" #include "yb/rocksdb/table/table_builder.h" -#include "yb/rocksdb/util/coding.h" + #include "yb/rocksdb/util/file_reader_writer.h" +#include "yb/rocksdb/util/file_util.h" #include "yb/rocksdb/util/log_buffer.h" #include "yb/rocksdb/util/logging.h" #include "yb/rocksdb/util/sst_file_manager_impl.h" #include "yb/rocksdb/util/mutexlock.h" -#include "yb/rocksdb/perf_level.h" #include "yb/rocksdb/util/stop_watch.h" #include "yb/rocksdb/util/sync_point.h" #include "yb/util/logging.h" #include "yb/util/result.h" -#include "yb/util/stats/perf_step_timer.h" #include "yb/util/stats/iostats_context_imp.h" #include "yb/util/string_util.h" DECLARE_uint64(rocksdb_check_sst_file_tail_for_zeros); +DECLARE_uint64(rocksdb_max_sst_write_retries); namespace rocksdb { @@ -105,6 +99,9 @@ struct CompactionJob::SubcompactionState : public CompactionFeed { // State kept for output being generated std::vector outputs; + + // Current output: + FileNumber output_file_number; std::unique_ptr base_outfile; std::unique_ptr data_outfile; std::unique_ptr builder; @@ -600,112 +597,157 @@ void CompactionJob::ProcessKeyValueCompaction( prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); } - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - auto compaction_filter = cfd->ioptions()->compaction_filter; + const auto* const cfd = sub_compact->compaction->column_family_data(); + auto* compaction_filter = cfd->ioptions()->compaction_filter; + std::unique_ptr compaction_filter_from_factory = nullptr; - if (compaction_filter == nullptr) { - compaction_filter_from_factory = - sub_compact->compaction->CreateCompactionFilter(); - compaction_filter = compaction_filter_from_factory.get(); - } + Status status; - MergeHelper merge( - env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, - compaction_filter, 
db_options_.info_log.get(), - cfd->ioptions()->min_partial_merge_operands, - false /* internal key corruption is expected */, - existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), - compact_->compaction->level(), db_options_.statistics.get()); - - TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); - - Slice* start = sub_compact->start; - Slice* end = sub_compact->end; - if (start != nullptr) { - IterKey start_iter; - start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); - input->Seek(start_iter.GetKey()); - } else { - input->SeekToFirst(); - } + // Clear, so it will be set by using file_numbers_provider_ later. + sub_compact->output_file_number = 0; + + auto num_retries_left = FLAGS_rocksdb_max_sst_write_retries; + for(;;) { + status = Status::OK(); + // We want to remove output file in case of detected corruption if we are going to retry write. + const auto retries_allowed = + [&num_retries_left, &outputs = sub_compact->outputs] { + // Only support retrying compaction if the first output file has been corrupted. + // As of 2023-11-01 yugabyte-db always generated single output file as a result of + // compaction. 
+ return outputs.size() == 1 && num_retries_left > 0; + }; + + if (compaction_filter == nullptr) { + compaction_filter_from_factory = sub_compact->compaction->CreateCompactionFilter(); + compaction_filter = compaction_filter_from_factory.get(); + } - if (db_options_.compaction_context_factory) { - auto context = CompactionContextOptions { - .level0_inputs = *compact_->compaction->inputs(0), - .boundary_extractor = sub_compact->boundary_extractor, - }; - sub_compact->context = (*db_options_.compaction_context_factory)(sub_compact, context); - sub_compact->feed = sub_compact->context->Feed(); - } else { - sub_compact->feed = sub_compact; - } + MergeHelper merge( + env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, compaction_filter, + db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands, + false /* internal key corruption is expected */, + existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), compact_->compaction->level(), + db_options_.statistics.get()); + + TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); + + Slice* start = sub_compact->start; + Slice* end = sub_compact->end; + if (start != nullptr) { + IterKey start_iter; + start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); + input->Seek(start_iter.GetKey()); + } else { + input->SeekToFirst(); + } - Status status; - sub_compact->c_iter = std::make_unique( - input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), - &existing_snapshots_, earliest_write_conflict_snapshot_, false, - sub_compact->compaction, compaction_filter); + if (db_options_.compaction_context_factory) { + auto context = CompactionContextOptions{ + .level0_inputs = *compact_->compaction->inputs(0), + .boundary_extractor = sub_compact->boundary_extractor, + }; + sub_compact->context = (*db_options_.compaction_context_factory)(sub_compact, context); + sub_compact->feed = sub_compact->context->Feed(); + } else { + sub_compact->feed = sub_compact; + } - if 
(sub_compact->context) { - sub_compact->c_iter->AddLiveRanges(sub_compact->context->GetLiveRanges()); - } + sub_compact->c_iter = std::make_unique( + input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), + &existing_snapshots_, earliest_write_conflict_snapshot_, false, sub_compact->compaction, + compaction_filter); - sub_compact->open_compaction_output_file = [this, holder, sub_compact]() { - return OpenCompactionOutputFile(holder, sub_compact); - }; + if (sub_compact->context) { + sub_compact->c_iter->AddLiveRanges(sub_compact->context->GetLiveRanges()); + } - auto c_iter = sub_compact->c_iter.get(); - c_iter->SeekToFirst(); - const auto& c_iter_stats = c_iter->iter_stats(); - // TODO(noetzli): check whether we could check !shutting_down_->... only - // only occasionally (see diff D42687) - while (status.ok() && !shutting_down_->load(std::memory_order_acquire) && - !cfd->IsDropped() && c_iter->Valid()) { - // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() - // returns true. - const Slice& key = c_iter->key(); - const Slice& value = c_iter->value(); - - // If an end key (exclusive) is specified, check if the current key is - // >= than it and exit if it is because the iterator is out of its range - if (end != nullptr && - cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { - break; - } else if (sub_compact->compaction->ShouldStopBefore(key) && - sub_compact->builder != nullptr) { - status = FinishCompactionOutputFile(input->status(), sub_compact); + sub_compact->open_compaction_output_file = [this, holder, sub_compact]() { + if (sub_compact->output_file_number == 0) { + // This is a first attempt - generate file number. 
+ sub_compact->output_file_number = file_numbers_provider_->NewFileNumber(holder); + } + return OpenCompactionOutputFile(sub_compact->output_file_number, sub_compact); + }; + + auto c_iter = sub_compact->c_iter.get(); + c_iter->SeekToFirst(); + const auto& c_iter_stats = c_iter->iter_stats(); + + // TODO(noetzli): check whether we could check !shutting_down_->... only + // only occasionally (see diff D42687) + while (status.ok() && !shutting_down_->load(std::memory_order_acquire) && !cfd->IsDropped() && + c_iter->Valid()) { + // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() + // returns true. + const Slice& key = c_iter->key(); + const Slice& value = c_iter->value(); + + // If an end key (exclusive) is specified, check if the current key is + // >= than it and exit if it is because the iterator is out of its range + if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { + break; + } else if ( + sub_compact->compaction->ShouldStopBefore(key) && sub_compact->builder != nullptr) { + status = FinishCompactionOutputFile( + input->status(), sub_compact, ShouldDeleteCorruptedFile{retries_allowed()}); + if (!status.ok()) { + break; + } + } + + if (c_iter_stats.num_input_records % kRecordStatsEvery == kRecordStatsEvery - 1) { + RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); + c_iter->ResetRecordCounts(); + RecordCompactionIOStats(); + } + + status = sub_compact->feed->Feed(key, value); if (!status.ok()) { break; } - } - if (c_iter_stats.num_input_records % kRecordStatsEvery == - kRecordStatsEvery - 1) { - RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); - c_iter->ResetRecordCounts(); - RecordCompactionIOStats(); + // Close output file if it is big enough + // TODO(aekmekji): determine if file should be closed earlier than this + // during subcompactions (i.e. 
if output size, estimated by input size, is + // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB + // and 0.6MB instead of 1MB and 0.2MB) + if (sub_compact->builder && sub_compact->builder->TotalFileSize() >= + sub_compact->compaction->max_output_file_size()) { + status = FinishCompactionOutputFile( + input->status(), sub_compact, ShouldDeleteCorruptedFile{retries_allowed()}); + } + + c_iter->Next(); } - status = sub_compact->feed->Feed(key, value); - if (!status.ok()) { - break; + if (status.ok()) { + status = sub_compact->feed->Flush(); } - // Close output file if it is big enough - // TODO(aekmekji): determine if file should be closed earlier than this - // during subcompactions (i.e. if output size, estimated by input size, is - // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB - // and 0.6MB instead of 1MB and 0.2MB) - if (sub_compact->builder && - sub_compact->builder->TotalFileSize() >= sub_compact->compaction->max_output_file_size()) { - status = FinishCompactionOutputFile(input->status(), sub_compact); + if (status.ok() && + (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { + status = STATUS(ShutdownInProgress, + "Database shutdown or Column family drop during compaction"); + } + if (status.ok() && sub_compact->builder != nullptr) { + status = FinishCompactionOutputFile( + input->status(), sub_compact, ShouldDeleteCorruptedFile{retries_allowed()}); } - c_iter->Next(); - } + if (status.IsTryAgain() && num_retries_left > 0) { + if (sub_compact->outputs.size() != 0) { + LOG(DFATAL) << "Retries are not supported when first sub-compaction file is not " + "corrupted, sub_compact->outputs.size(): " + << sub_compact->outputs.size(); + break; + } + RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Retrying compaction"); + --num_retries_left; + continue; + } - if (status.ok()) { - status = sub_compact->feed->Flush(); + break; } // This is used to persist the history cutoff hybrid time chosen 
for the DocDB compaction @@ -714,6 +756,8 @@ void CompactionJob::ProcessKeyValueCompaction( largest_user_frontier_ = sub_compact->context->GetLargestUserFrontier(); } + const auto& c_iter_stats = sub_compact->c_iter->iter_stats(); + sub_compact->num_input_records = c_iter_stats.num_input_records; sub_compact->compaction_job_stats.num_input_deletion_records = c_iter_stats.num_input_deletion_records; @@ -727,14 +771,6 @@ void CompactionJob::ProcessKeyValueCompaction( RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordCompactionIOStats(); - if (status.ok() && - (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { - status = STATUS(ShutdownInProgress, - "Database shutdown or Column family drop during compaction"); - } - if (status.ok() && sub_compact->builder != nullptr) { - status = FinishCompactionOutputFile(input->status(), sub_compact); - } if (status.ok()) { status = input->status(); } @@ -796,8 +832,50 @@ void CompactionJob::CloseFile(Status* status, std::unique_ptrreset(); } +Status CompactionJob::CheckOutputFile(SubcompactionState* sub_compact) { + const auto is_split_sst = sub_compact->compaction->column_family_data() + ->ioptions() + ->table_factory->IsSplitSstForWriteSupported(); + const auto& meta = sub_compact->current_output()->meta; + + const auto output_number = sub_compact->current_output()->meta.fd.GetNumber(); + DCHECK_NE(output_number, 0); + + const auto rocksdb_check_sst_file_tail_for_zeros = + FLAGS_rocksdb_check_sst_file_tail_for_zeros; + if (PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) { + const auto base_fname = TableFileName( + db_options_.db_paths, meta.fd.GetNumber(), sub_compact->compaction->output_path_id()); + RETURN_NOT_OK(CheckSstTailForZeros( + db_options_, env_options_, is_split_sst ? 
TableBaseToDataFileName(base_fname) : base_fname, + rocksdb_check_sst_file_tail_for_zeros)); + } + + if (sub_compact->builder->NumEntries() == 0) { + return Status::OK(); + } + + // Verify that the table is usable + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); + { + std::unique_ptr iter(cfd->table_cache()->NewIterator( + ReadOptions(), env_options_, cfd->internal_comparator(), meta.fd, meta.UserFilter(), + nullptr, cfd->internal_stats()->GetFileReadHist(compact_->compaction->output_level()), + false)); + RETURN_NOT_OK(iter->status()); + + if (paranoid_file_checks_) { + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} + RETURN_NOT_OK(iter->status()); + } + } + + return Status::OK(); +} + Status CompactionJob::FinishCompactionOutputFile( - const Status& input_status, SubcompactionState* sub_compact) { + const Status& input_status, SubcompactionState* sub_compact, + ShouldDeleteCorruptedFile should_delete_corrupted_file) { assert(sub_compact != nullptr); assert(sub_compact->base_outfile); const bool is_split_sst = sub_compact->compaction->column_family_data()->ioptions() @@ -817,15 +895,15 @@ Status CompactionJob::FinishCompactionOutputFile( TableProperties table_properties; // Check for iterator errors - Status s = input_status; + Status status = input_status; auto& meta = sub_compact->current_output()->meta; const uint64_t current_entries = sub_compact->builder->NumEntries(); meta.marked_for_compaction = sub_compact->builder->NeedCompact(); - if (s.ok() && sub_compact->context) { - s = sub_compact->context->UpdateMeta(&meta); + if (status.ok() && sub_compact->context) { + status = sub_compact->context->UpdateMeta(&meta); } - if (s.ok()) { - s = sub_compact->builder->Finish(); + if (status.ok()) { + status = sub_compact->builder->Finish(); } else { sub_compact->builder->Abandon(); } @@ -838,84 +916,87 @@ Status CompactionJob::FinishCompactionOutputFile( // Finish and check for file errors if (sub_compact->data_outfile) { - 
CloseFile(&s, &sub_compact->data_outfile); + CloseFile(&status, &sub_compact->data_outfile); } - CloseFile(&s, &sub_compact->base_outfile); + CloseFile(&status, &sub_compact->base_outfile); - const auto rocksdb_check_sst_file_tail_for_zeros = - FLAGS_rocksdb_check_sst_file_tail_for_zeros; - if (s.ok() && PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) { - const auto base_fname = TableFileName( - db_options_.db_paths, meta.fd.GetNumber(), sub_compact->compaction->output_path_id()); - s = CheckSstTailForZeros( - db_options_, env_options_, is_split_sst ? TableBaseToDataFileName(base_fname) : base_fname, - rocksdb_check_sst_file_tail_for_zeros); - } + auto* cfd = sub_compact->compaction->column_family_data(); + const auto base_file_path = + TableFileName(cfd->ioptions()->db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); + const auto data_file_path = TableBaseToDataFileName(base_file_path); - if (s.ok() && current_entries > 0) { - // Verify that the table is usable - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - InternalIterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), env_options_, cfd->internal_comparator(), meta.fd, meta.UserFilter(), - nullptr, cfd->internal_stats()->GetFileReadHist( - compact_->compaction->output_level()), - false); - s = iter->status(); + bool has_base_file = true; + bool has_data_file = is_split_sst; - if (s.ok() && paranoid_file_checks_) { - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} - s = iter->status(); + if (status.ok()) { + status = CheckOutputFile(sub_compact); + if (!status.ok() && should_delete_corrupted_file) { + auto deleted = env_->CleanupFile(base_file_path, db_options_.log_prefix); + if (deleted) { + has_base_file = false; + if (has_data_file) { + deleted = env_->CleanupFile(data_file_path, db_options_.log_prefix); + has_data_file = !deleted; + } + } + + if (deleted) { + sub_compact->outputs.pop_back(); + // Allow retries at higher level. 
+ status = status.CloneAndReplaceCode(Status::Code::kTryAgain); + } } + } - delete iter; - if (s.ok()) { + if (status.ok()) { + // Don't reuse output file number in case we are continuing compaction to a new file. + sub_compact->output_file_number = 0; + if (current_entries > 0) { auto tp = sub_compact->builder->GetTableProperties(); - sub_compact->current_output()->table_properties = - std::make_shared(tp); + sub_compact->current_output()->table_properties = std::make_shared(tp); TableFileCreationInfo info(std::move(tp)); info.db_name = dbname_; info.cf_name = cfd->GetName(); - info.file_path = - TableFileName(cfd->ioptions()->db_paths, meta.fd.GetNumber(), - meta.fd.GetPathId()); + info.file_path = base_file_path; info.file_size = meta.fd.GetTotalFileSize(); info.job_id = job_id_; RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 - " keys, %" PRIu64 " bytes %s %s", - cfd->GetName().c_str(), job_id_, output_number, current_entries, - current_total_bytes, - meta.marked_for_compaction ? "(need compaction)" : ToString(compaction_reason).c_str(), - meta.FrontiersToString().c_str()); + "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 + " keys, %" PRIu64 " bytes %s %s", + cfd->GetName().c_str(), job_id_, output_number, current_entries, + current_total_bytes, + meta.marked_for_compaction ? 
"(need compaction)" : ToString(compaction_reason).c_str(), + meta.FrontiersToString().c_str()); EventHelpers::LogAndNotifyTableFileCreation( event_logger_, cfd->ioptions()->listeners, meta.fd, info); } } - // Report new file to SstFileManagerImpl - auto sfm = - static_cast(db_options_.sst_file_manager.get()); - if (sfm && meta.fd.GetPathId() == 0) { - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - auto fn = TableFileName(cfd->ioptions()->db_paths, meta.fd.GetNumber(), - meta.fd.GetPathId()); - RETURN_NOT_OK(sfm->OnAddFile(fn)); - if (is_split_sst) { - RETURN_NOT_OK(sfm->OnAddFile(TableBaseToDataFileName(fn))); - } - if (sfm->IsMaxAllowedSpaceReached()) { - InstrumentedMutexLock l(db_mutex_); - if (db_bg_error_->ok()) { - s = STATUS(IOError, "Max allowed space was reached"); - *db_bg_error_ = s; - TEST_SYNC_POINT( - "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached"); + { + // Report new file to SstFileManagerImpl + auto sfm = + static_cast(db_options_.sst_file_manager.get()); + if (sfm && meta.fd.GetPathId() == 0) { + if (has_base_file) { + RETURN_NOT_OK(sfm->OnAddFile(base_file_path)); + } + if (has_data_file) { + RETURN_NOT_OK(sfm->OnAddFile(data_file_path)); + } + if (sfm->IsMaxAllowedSpaceReached()) { + InstrumentedMutexLock l(db_mutex_); + if (db_bg_error_->ok()) { + status = STATUS(IOError, "Max allowed space was reached"); + *db_bg_error_ = status; + TEST_SYNC_POINT( + "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached"); + } } } } sub_compact->builder.reset(); - return s; + return status; } Status CompactionJob::InstallCompactionResults( @@ -983,21 +1064,25 @@ Status CompactionJob::OpenFile(const std::string table_name, uint64_t file_numbe } Status CompactionJob::OpenCompactionOutputFile( - FileNumbersHolder* holder, SubcompactionState* sub_compact) { + FileNumber file_number, SubcompactionState* sub_compact) { RSTATUS_DCHECK(sub_compact != nullptr, InternalError, "sub_compact is NULL"); 
   RSTATUS_DCHECK(sub_compact->builder == nullptr, InternalError,
                  "Sub compact builder already present");
-  FileNumber file_number = file_numbers_provider_->NewFileNumber(holder);
+
+  const auto* cfd = sub_compact->compaction->column_family_data();
+  const bool is_split_sst = cfd->ioptions()->table_factory->IsSplitSstForWriteSupported();
 
   // Make the output file
-  unique_ptr<WritableFile> base_writable_file;
-  unique_ptr<WritableFile> data_writable_file;
-  const std::string base_fname = TableFileName(db_options_.db_paths, file_number,
+  std::unique_ptr<WritableFile> base_writable_file;
+  std::unique_ptr<WritableFile> data_writable_file;
+  const auto table_name = sub_compact->compaction->column_family_data()->GetName();
+  const auto base_fname = TableFileName(db_options_.db_paths, file_number,
                                         sub_compact->compaction->output_path_id());
-  const std::string data_fname = TableBaseToDataFileName(base_fname);
-  const std::string table_name = sub_compact->compaction->column_family_data()->GetName();
   RETURN_NOT_OK(OpenFile(table_name, file_number, "base", base_fname, &base_writable_file));
-  RETURN_NOT_OK(OpenFile(table_name, file_number, "data", data_fname, &data_writable_file));
+  if (is_split_sst) {
+    const auto data_fname = TableBaseToDataFileName(base_fname);
+    RETURN_NOT_OK(OpenFile(table_name, file_number, "data", data_fname, &data_writable_file));
+  }
 
   SubcompactionState::Output out;
   out.meta.fd =
@@ -1013,8 +1098,6 @@ Status CompactionJob::OpenCompactionOutputFile(
 
   sub_compact->outputs.push_back(out);
 
-  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
-
   {
     auto setup_outfile = [this, sub_compact] (
         size_t preallocation_block_size, std::unique_ptr<WritableFile>* writable_file,
@@ -1027,7 +1110,6 @@ Status CompactionJob::OpenCompactionOutputFile(
           std::move(*writable_file), env_options_, sub_compact->compaction->suspender()));
     };
 
-    const bool is_split_sst = cfd->ioptions()->table_factory->IsSplitSstForWriteSupported();
     const size_t preallocation_data_block_size = static_cast<size_t>(
         sub_compact->compaction->OutputFilePreallocationSize());
     // if we don't have separate data file - preallocate size for base file
diff --git a/src/yb/rocksdb/db/compaction_job.h b/src/yb/rocksdb/db/compaction_job.h
index e22f3d6d0d7a..295235abebb4 100644
--- a/src/yb/rocksdb/db/compaction_job.h
+++ b/src/yb/rocksdb/db/compaction_job.h
@@ -69,6 +69,8 @@ class Arena;
 class FileNumbersProvider;
 class FileNumbersHolder;
 
+YB_STRONGLY_TYPED_BOOL(ShouldDeleteCorruptedFile);
+
 class CompactionJob {
  public:
   CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
@@ -113,14 +115,16 @@ class CompactionJob {
   // kv-pairs
   void ProcessKeyValueCompaction(FileNumbersHolder* holder, SubcompactionState* sub_compact);
 
-  Status FinishCompactionOutputFile(const Status& input_status,
-                                    SubcompactionState* sub_compact);
+  Status CheckOutputFile(SubcompactionState* sub_compact);
+  Status FinishCompactionOutputFile(
+      const Status& input_status, SubcompactionState* sub_compact,
+      ShouldDeleteCorruptedFile should_delete_corrupted_file);
   Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
   void RecordCompactionIOStats();
   Status OpenFile(const std::string table_name, uint64_t file_number,
       const std::string file_type_label, const std::string fname,
       std::unique_ptr<WritableFile>* writable_file);
-  Status OpenCompactionOutputFile(FileNumbersHolder* holder, SubcompactionState* sub_compact);
+  Status OpenCompactionOutputFile(FileNumber file_number, SubcompactionState* sub_compact);
   void CleanupCompaction();
   void UpdateCompactionJobStats(
       const InternalStats::CompactionStats& stats) const;
diff --git a/src/yb/rocksdb/db/db_sst_test.cc b/src/yb/rocksdb/db/db_sst_test.cc
index 3e7c1426e664..ca251e47c2a9 100644
--- a/src/yb/rocksdb/db/db_sst_test.cc
+++ b/src/yb/rocksdb/db/db_sst_test.cc
@@ -35,6 +35,7 @@
 #include "yb/util/test_macros.h"
 
 DECLARE_uint64(rocksdb_check_sst_file_tail_for_zeros);
+DECLARE_uint64(rocksdb_max_sst_write_retries);
 DECLARE_bool(TEST_simulate_fully_zeroed_file);
 
 namespace rocksdb {
@@ -953,11 +954,16 @@ class SstTailZerosCheckTest : public DBTest {
           LOG(INFO) << "Do not corrupt " << file_path << " again";
           continue;
         }
+        if (bytes_to_corrupt > 0 && corrupt_files_limit != 0) {
+          if (corrupt_files_limit > 0) {
+            --corrupt_files_limit;
+          }
+          ASSERT_OK(CorruptFile(
+              file_path, /* offset = */ -bytes_to_corrupt, bytes_to_corrupt,
+              yb::CorruptionType::kZero));
+        }
         // Still proceed with no-op corruption when bytes_to_corrupt == 0 and insert file path
         // into corrupt_files, so we don't try to corrupt it again for test purposes.
-        ASSERT_OK(CorruptFile(
-            file_path, /* offset = */ -bytes_to_corrupt, bytes_to_corrupt,
-            yb::CorruptionType::kZero));
         corrupt_files.insert(file_path);
       }
     });
@@ -993,10 +999,13 @@ class SstTailZerosCheckTest : public DBTest {
   }
 
   void DestroyAndReopen(const Options& options) {
+    num_keys_written = 0;
+    corrupt_file_type = FileType::kTableSBlockFile;
     bytes_to_corrupt = 0;
-    num_keys_written = 0;
+    corrupt_files.clear();
+    corrupt_files_limit = -1;
     return DBHolder::DestroyAndReopen(options);
   }
@@ -1013,9 +1022,12 @@ class SstTailZerosCheckTest : public DBTest {
   Random rnd{301};
   int num_keys_written = 0;
+
   uint64_t bytes_to_corrupt = 0;
   FileType corrupt_file_type = FileType::kTableSBlockFile;
+
+  std::unordered_set<std::string> corrupt_files;
+  int corrupt_files_limit = -1; // -1 - no limit.
 };
 
 TEST_F_EX(DBTest, SstTailZerosCheckFlush, SstTailZerosCheckTest) {
@@ -1035,6 +1047,25 @@ TEST_F_EX(DBTest, SstTailZerosCheckFlush, SstTailZerosCheckTest) {
   ASSERT_NOK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 }
 
+TEST_F_EX(DBTest, SstTailZerosCheckFlushRetries, SstTailZerosCheckTest) {
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_sst_write_retries) = 1;
+
+  auto options = GetOptions();
+
+  for (auto corrupt_file_type_value : {kTableSBlockFile, kTableFile}) {
+    DestroyAndReopen(options);
+
+    bytes_to_corrupt = kSstFileTailSizeToCheck;
+    corrupt_file_type = corrupt_file_type_value;
+
+    // Should be able to flush both after retry.
+    ASSERT_OK(GenAndFlushFiles(/* num_files = */ 2));
+    ASSERT_EQ("2", FilesPerLevel(0));
+    ASSERT_EQ(2, corrupt_files.size());
+    ASSERT_EQ(num_keys_written, ASSERT_RESULT(CountKeys()));
+  }
+}
+
 TEST_F_EX(DBTest, SstTailZerosCheckCompaction, SstTailZerosCheckTest) {
   auto options = GetOptions();
   for (auto compaction_output_bytes_to_corrupt :
@@ -1069,6 +1100,34 @@ TEST_F_EX(DBTest, SstTailZerosCheckCompaction, SstTailZerosCheckTest) {
   }
 }
 
+TEST_F_EX(DBTest, SstTailZerosCheckCompactionRetries, SstTailZerosCheckTest) {
+  FLAGS_rocksdb_max_sst_write_retries = 1;
+
+  auto options = GetOptions();
+
+  for (auto corrupt_file_type_value : {kTableSBlockFile, kTableFile}) {
+    DestroyAndReopen(options);
+
+    corrupt_file_type = corrupt_file_type_value;
+
+    // Do not corrupt flushed files.
+    bytes_to_corrupt = 0;
+    ASSERT_OK(GenAndFlushFiles(/* num_files = */ 2));
+    ASSERT_EQ("2", FilesPerLevel(0));
+    ASSERT_EQ(2, corrupt_files.size()) << yb::AsString(corrupt_files);
+
+    // Corrupt compaction output file once.
+    bytes_to_corrupt = kSstFileTailSizeToCheck;
+    corrupt_files_limit = 1;
+
+    // Should be able to compact after retry.
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+    ASSERT_EQ("0,1", FilesPerLevel(0));
+    ASSERT_EQ(3, corrupt_files.size()) << yb::AsString(corrupt_files);
+    ASSERT_EQ(num_keys_written, ASSERT_RESULT(CountKeys()));
+  }
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/src/yb/rocksdb/env.h b/src/yb/rocksdb/env.h
index 931d3e64203c..01cbef776c10 100644
--- a/src/yb/rocksdb/env.h
+++ b/src/yb/rocksdb/env.h
@@ -290,7 +290,8 @@ class Env {
   virtual Status DeleteFile(const std::string& fname) = 0;
 
   // Delete file, print warning on failure.
-  void CleanupFile(const std::string& fname);
+  // Returns true iff file has been deleted.
+  bool CleanupFile(const std::string& fname, const std::string& log_prefix = "");
 
   // Create the specified directory. Returns error if directory exists.
   virtual Status CreateDir(const std::string& dirname) = 0;
diff --git a/src/yb/rocksdb/util/env.cc b/src/yb/rocksdb/util/env.cc
index e29e4dbb56ce..654b656fdae1 100644
--- a/src/yb/rocksdb/util/env.cc
+++ b/src/yb/rocksdb/util/env.cc
@@ -83,8 +83,10 @@ yb::Result<uint64_t> Env::GetFileSize(const std::string& fname) {
   return result;
 }
 
-void Env::CleanupFile(const std::string& fname) {
-  WARN_NOT_OK(DeleteFile(fname), "Failed to cleanup " + fname);
+bool Env::CleanupFile(const std::string& fname, const std::string& log_prefix) {
+  Status s;
+  WARN_NOT_OK(s = DeleteFile(fname), log_prefix + "Failed to cleanup " + fname);
+  return s.ok();
 }
 
 void Env::GetChildrenWarnNotOk(const std::string& dir,