diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h
index b54bb3859e34..b130f20b0698 100644
--- a/src/server/cluster/cluster_defs.h
+++ b/src/server/cluster/cluster_defs.h
@@ -172,12 +172,11 @@ class ClusterShardInfos {
 };
 
 // MigrationState constants are ordered in state changing order
-enum class MigrationState : uint8_t {
-  C_CONNECTING,
-  C_SYNC,
-  C_ERROR,
-  C_FINISHED,
-};
+enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
+
+// Errors during slot migration
+static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
+static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";
 
 // return error message if slot doesn't belong to this node
 facade::ErrorReply SlotOwnershipError(SlotId slot_id);
diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index 88446a1d3fe5..3368baf0e756 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -726,6 +726,8 @@ static string_view StateToStr(MigrationState state) {
       return "ERROR"sv;
     case MigrationState::C_FINISHED:
       return "FINISHED"sv;
+    case MigrationState::C_FATAL:
+      return "FATAL"sv;
   }
   DCHECK(false) << "Unknown State value " << static_cast>(state);
   return "UNDEFINED_STATE"sv;
@@ -765,7 +767,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
   };
 
   for (const auto& m : incoming_migrations_jobs_) {
-    // TODO add error status
     append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
                   m->GetErrorStr());
   }
@@ -834,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config,
       removed_slots.Merge(slots);
       LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
                 << migration.GetHostIp() << ":" << migration.GetPort();
-      migration.Finish();
+      migration.Finish(MigrationState::C_FINISHED);
       res.migrations.push_back(std::move(*it));
       outgoing_migration_jobs_.erase(it);
     }
@@ -925,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
 
   if (!migration) {
     VLOG(1) << "Unrecognized incoming migration from " << source_id;
-    return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
+    return builder->SendSimpleString(kUnknownMigration);
   }
 
   if (migration->GetState() != MigrationState::C_CONNECTING) {
@@ -936,6 +937,10 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
     DeleteSlots(slots);
   }
 
+  if (migration->GetState() == MigrationState::C_FATAL) {
+    return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
+  }
+
   migration->Init(flows_num);
 
   return builder->SendOk();
@@ -955,6 +960,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
   cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));
 
   auto migration = GetIncomingMigration(source_id);
+
   if (!migration) {
     return builder->SendError(kIdNotFound);
   }
@@ -1033,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
                          [source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
   if (m_it == in_migrations.end()) {
     LOG(WARNING) << "migration isn't in config";
-    return builder->SendError(OutgoingMigration::kUnknownMigration);
+    return builder->SendError(kUnknownMigration);
   }
 
   auto migration = GetIncomingMigration(source_id);
@@ -1041,7 +1047,11 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
     return builder->SendError(kIdNotFound);
 
   if (!migration->Join(attempt)) {
-    return builder->SendError("Join timeout happened");
+    if (migration->GetState() == MigrationState::C_FATAL) {
+      return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
+    } else {
+      return builder->SendError("Join timeout happened");
+    }
   }
 
   ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 2a6c8057a262..4c0420c71de5 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -70,6 +70,14 @@ class ClusterShardMigration {
         break;
       }
 
+      auto used_mem = used_mem_current.load(memory_order_relaxed);
+      // End the migration if applying the transaction data would exceed 90% of max_memory_limit.
+      if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
+        cntx->ReportError(std::string{kIncomingMigrationOOM});
+        in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
+        break;
+      }
+
       while (tx_data->opcode == journal::Op::LSN) {
         VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
         last_attempt_.store(tx_data->lsn);
@@ -79,6 +87,11 @@ class ClusterShardMigration {
           VLOG(1) << "Finalized flow " << source_shard_id_;
           return;
         }
+        if (in_migration_->GetState() == MigrationState::C_FATAL) {
+          VLOG(1) << "Flow finalization " << source_shard_id_
+                  << " canceled because the memory limit was reached";
+          return;
+        }
         if (!tx_data->command.cmd_args.empty()) {
           VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
                   << tx_data->command.cmd_args[0];
@@ -181,6 +194,11 @@ bool IncomingSlotMigration::Join(long attempt) {
     return false;
   }
 
+  // If any of the migration shards reported a fatal error (OOM), we can return an error right away.
+  if (GetState() == MigrationState::C_FATAL) {
+    return false;
+  }
+
   // if data was sent after LSN, WaitFor() always returns false so to reduce wait time
   // we check current state and if WaitFor false but GetLastAttempt() == attempt
   // the Join is failed and we can return false
@@ -218,6 +236,11 @@ void IncomingSlotMigration::Stop() {
     }
   }
 
+  // Don't wait if we reached the FATAL state
+  if (state_ == MigrationState::C_FATAL) {
+    return;
+  }
+
   // we need to Join the migration process to prevent data corruption
   const absl::Time start = absl::Now();
   const absl::Duration timeout =
@@ -251,7 +274,12 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {
 
 void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
   shard_flows_[shard]->Start(&cntx_, source);
-  VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
+  VLOG(1) << "Incoming flow " << shard
+          << (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
+          << source_id_;
+  if (GetState() == MigrationState::C_FATAL) {
+    Stop();
+  }
 }
 
 size_t IncomingSlotMigration::GetKeyCount() const {
diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h
index 11edf400996f..481245c6d3f3 100644
--- a/src/server/cluster/incoming_slot_migration.h
+++ b/src/server/cluster/incoming_slot_migration.h
@@ -50,10 +50,20 @@ class IncomingSlotMigration {
     return source_id_;
   }
 
+  // Switch to the FATAL state and store the error message
+  void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
+    errors_count_.fetch_add(1, std::memory_order_relaxed);
+    util::fb2::LockGuard lk_state(state_mu_);
+    util::fb2::LockGuard lk_error(error_mu_);
+    state_ = MigrationState::C_FATAL;
+    last_error_ = std::move(err);
+  }
+
   void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
     errors_count_.fetch_add(1, std::memory_order_relaxed);
     util::fb2::LockGuard lk(error_mu_);
-    last_error_ = std::move(err);
+    if (GetState() != MigrationState::C_FATAL)
+      last_error_ = std::move(err);
   }
 
   std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
@@ -75,6 +85,7 @@ class IncomingSlotMigration {
   std::vector> shard_flows_;
   SlotRanges slots_;
   ExecutionState cntx_;
+  mutable util::fb2::Mutex error_mu_;
   dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
   std::atomic errors_count_ = 0;
 
diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 52d47b07be22..60086d784c56 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -36,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
                      journal::Journal* journal, OutgoingMigration* om)
       : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
-    exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
+    exec_st_.SwitchErrorHandler(
+        [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
   }
 
   ~SliceSlotMigration() {
@@ -138,10 +139,8 @@ void OutgoingMigration::OnAllShards(
   });
 }
 
-void OutgoingMigration::Finish(GenericError error) {
-  auto next_state = MigrationState::C_FINISHED;
+void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
   if (error) {
-    next_state = MigrationState::C_ERROR;
     LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
                  << migration_info_.node_info.id << " with error: " << error.Format();
     exec_st_.ReportError(std::move(error));
@@ -164,6 +163,7 @@ void OutgoingMigration::Finish(GenericError error) {
 
       case MigrationState::C_SYNC:
       case MigrationState::C_ERROR:
+      case MigrationState::C_FATAL:
         should_cancel_flows = true;
         break;
     }
@@ -221,6 +221,14 @@ void OutgoingMigration::SyncFb() {
       continue;
     }
 
+    // Break the outgoing migration if INIT from the incoming node responded with OOM. Usually this
+    // will happen on the second iteration, after the first one failed with OOM. Sending the second
+    // INIT is required to clean up slots on the incoming slot-migration node.
+    if (CheckRespSimpleError(kIncomingMigrationOOM)) {
+      ChangeState(MigrationState::C_FATAL);
+      break;
+    }
+
     if (!CheckRespIsSimpleReply("OK")) {
       if (CheckRespIsSimpleReply(kUnknownMigration)) {
         const absl::Duration passed = absl::Now() - start_time;
@@ -272,7 +280,11 @@ void OutgoingMigration::SyncFb() {
 
   long attempt = 0;
   while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
-    // process commands that were on pause and try again
+    // Break the loop and don't sleep in case of C_FATAL
+    if (GetState() == MigrationState::C_FATAL) {
+      break;
+    }
+    // Process commands that were on pause and try again
     VLOG(1) << "Waiting for migration to finalize...";
     ThisFiber::SleepFor(500ms);
   }
@@ -355,6 +367,12 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     return false;
   }
 
+  // Check for OOM from the incoming slot migration on the ACK request
+  if (CheckRespSimpleError(kIncomingMigrationOOM)) {
+    Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM));
+    return false;
+  }
+
   if (!CheckRespFirstTypes({RespExpr::INT64})) {
     LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
                  << migration_info_.node_info.id << " attempt " << attempt
@@ -371,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   }
 
   if (!exec_st_.GetError()) {
-    Finish();
+    Finish(MigrationState::C_FINISHED);
     keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
     cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
                                          false);
diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h
index 6b020fc7eace..95a32feb9534 100644
--- a/src/server/cluster/outgoing_slot_migration.h
+++ b/src/server/cluster/outgoing_slot_migration.h
@@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
   // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
   // can be called from any thread, but only after Start()
   // if is_error = true and migration is in progress it will be restarted otherwise nothing happens
-  void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
+  void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
 
   MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
 
@@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {
 
   size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
 
-  static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
-
  private:
   // should be run for all shards
   void StartFlow(journal::Journal* journal, io::Sink* dest);
diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc
index bc43095c0c6d..0ff9c13a05e3 100644
--- a/src/server/journal/executor.cc
+++ b/src/server/journal/executor.cc
@@ -33,7 +33,7 @@ template journal::ParsedEntry::CmdData BuildFromParts(Ts... par
     start += part.size();
   }
 
-  return {std::move(buf), std::move(slice_parts)};
+  return {std::move(buf), std::move(slice_parts), cmd_str.size()};
 }
 
 }  // namespace
diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc
index be7d7ba839cb..1a148fad48b4 100644
--- a/src/server/journal/serializer.cc
+++ b/src/server/journal/serializer.cc
@@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
 
   size_t cmd_size = 0;
   SET_OR_RETURN(ReadUInt(), cmd_size);
+  data->cmd_len = cmd_size;
 
   // Read all strings consecutively.
   data->command_buf = make_unique(cmd_size);
@@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
     ptr += size;
     cmd_size -= size;
   }
+
+  data->cmd_len -= cmd_size;
+
   return {};
 }
 
diff --git a/src/server/journal/types.h b/src/server/journal/types.h
index 24183623a5e3..ce06e8347384 100644
--- a/src/server/journal/types.h
+++ b/src/server/journal/types.h
@@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
   struct CmdData {
     std::unique_ptr command_buf;
     CmdArgVec cmd_args;  // represents the parsed command.
+    size_t cmd_len{0};
   };
 
   CmdData cmd;
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index 8ecd6f65db7d..895e2af1a04a 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -339,6 +339,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const {
          ToSV(resp_args_.front().GetBuf()) == reply;
 }
 
+bool ProtocolClient::CheckRespSimpleError(string_view error) const {
+  return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR &&
+         ToSV(resp_args_.front().GetBuf()) == error;
+}
+
 bool ProtocolClient::CheckRespFirstTypes(initializer_list types) const {
   unsigned i = 0;
   for (RespExpr::Type type : types) {
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index 6c08a0bb862f..7e7ddda036b7 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -85,6 +85,9 @@ class ProtocolClient {
   // Check if reps_args contains a simple reply.
   bool CheckRespIsSimpleReply(std::string_view reply) const;
 
+  // Check if resp_args contains a simple error.
+  bool CheckRespSimpleError(std::string_view error) const;
+
   // Check resp_args contains the following types at front.
   bool CheckRespFirstTypes(std::initializer_list types) const;
 
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index ac5faa7c042b..6e7144138f55 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -3236,3 +3236,60 @@ async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: Dfl
     await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
 
     assert await c_nodes[1].type("list") == "none"
+
+
+@dfly_args({"cluster_mode": "yes"})
+async def test_slot_migration_oom(df_factory):
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            proactor_threads=4,
+            maxmemory="1024MB",
+        ),
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            proactor_threads=2,
+            maxmemory="512MB",
+        ),
+    ]
+
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await nodes[0].client.execute_command("DEBUG POPULATE 100 test 10000000")
+
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
+    )
+
+    logging.info("Start migration")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # Wait for the FATAL status on both nodes
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FATAL", 300)
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FATAL")
+
+    # Node 0 slot-migration-status
+    status = await nodes[0].admin_client.execute_command(
+        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
+    )
+    # Direction
+    assert status[0][0] == "out"
+    # Error message
+    assert status[0][4] == "INCOMING_MIGRATION_OOM"
+
+    # Node 1 slot-migration-status
+    status = await nodes[1].admin_client.execute_command(
+        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
+    )
+    # Direction
+    assert status[0][0] == "in"
+    # Error message
+    assert status[0][4] == "INCOMING_MIGRATION_OOM"
diff --git a/tools/cluster_mgr.py b/tools/cluster_mgr.py
index 4530d8902133..007ebbbee053 100755
--- a/tools/cluster_mgr.py
+++ b/tools/cluster_mgr.py
@@ -449,6 +449,8 @@ def migrate(args):
             continue
         if len(sync_status) != 1:
             die_with_err(f"Unexpected number of migrations {len(sync_status)}: {sync_status}")
+        if "FATAL" in sync_status[0]:
+            die_with_err(f"Error in migration: {sync_status}")
         if "FINISHED" in sync_status[0]:
             print(f"Migration finished: {sync_status[0]}")
             break
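
Editor's note (not part of the patch): the core of the change is the memory guard added in incoming_slot_migration.cc, where the incoming node checks, before applying a journal entry, whether current usage plus the entry's cmd_len would cross 90% of the memory limit and, if so, aborts the migration with INCOMING_MIGRATION_OOM. The standalone C++ sketch below only illustrates that check under stated assumptions; WouldExceedLimit, kMemoryThresholdRatio, and the sample sizes are illustrative names and values, not Dragonfly identifiers.

// Minimal sketch of the 90%-of-maxmemory guard, assuming plain variables instead of
// Dragonfly's used_mem_current / max_memory_limit globals.
#include <cstddef>
#include <iostream>

constexpr double kMemoryThresholdRatio = 0.9;  // mirrors the 90% cut-off in the patch

// Returns true when applying a command of cmd_len bytes would push estimated memory
// usage past the threshold, i.e. the condition under which the patch reports
// INCOMING_MIGRATION_OOM and breaks out of the flow.
bool WouldExceedLimit(std::size_t used_mem, std::size_t cmd_len, std::size_t max_memory_limit) {
  return static_cast<double>(used_mem + cmd_len) > kMemoryThresholdRatio * max_memory_limit;
}

int main() {
  const std::size_t max_memory_limit = 512ull * 1024 * 1024;  // 512MB, like the test's target node
  const std::size_t used_mem = 460ull * 1024 * 1024;          // assumed current usage estimate

  // A ~10MB entry (the test populates 10'000'000-byte values) trips the guard,
  // while a small command does not.
  std::cout << std::boolalpha << WouldExceedLimit(used_mem, 10'000'000, max_memory_limit) << '\n';  // true
  std::cout << std::boolalpha << WouldExceedLimit(used_mem, 1'000, max_memory_limit) << '\n';       // false
  return 0;
}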