diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index a871788fe89..07ac28297b1 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -5357,14 +5357,10 @@ where let allow_replica_read = read_only && msg.get_header().get_replica_read(); let flags = WriteBatchFlags::from_bits_check(msg.get_header().get_flags()); let allow_stale_read = read_only && flags.contains(WriteBatchFlags::STALE_READ); - let split_region = msg.has_admin_request() - && msg.get_admin_request().get_cmd_type() == AdminCmdType::BatchSplit; if !self.fsm.peer.is_leader() && !is_read_index_request && !allow_replica_read && !allow_stale_read - // allow proposal split command at non-leader, raft layer will forward it to leader. - && !split_region { self.ctx.raft_metrics.invalid_proposal.not_leader.inc(); let leader = self.fsm.peer.get_peer_from_cache(leader_id); diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index ccc28306059..8595ed0bcf6 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -193,7 +193,6 @@ make_static_metric! { conf_change, batch, dropped_read_index, - non_leader_split, } pub label_enum RaftInvalidProposal { diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index b9a3a491563..fa5c8346c0c 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -4343,12 +4343,7 @@ where } match req.get_admin_request().get_cmd_type() { - AdminCmdType::Split | AdminCmdType::BatchSplit => { - ctx.insert(ProposalContext::SPLIT); - if !self.is_leader() { - poll_ctx.raft_metrics.propose.non_leader_split.inc(); - } - } + AdminCmdType::Split | AdminCmdType::BatchSplit => ctx.insert(ProposalContext::SPLIT), AdminCmdType::PrepareMerge => { self.pre_propose_prepare_merge(poll_ctx, req)?; ctx.insert(ProposalContext::PREPARE_MERGE); diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index 3ec4c65c4c5..152dc7b3ef6 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -469,14 +469,6 @@ where const DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL: Duration = Duration::from_secs(1); const DEFAULT_COLLECT_TICK_INTERVAL: Duration = Duration::from_secs(1); -fn default_load_base_split_check_interval() -> Duration { - fail_point!("mock_load_base_split_check_interval", |t| { - let t = t.unwrap().parse::().unwrap(); - Duration::from_millis(t) - }); - DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL -} - fn default_collect_tick_interval() -> Duration { fail_point!("mock_collect_tick_interval", |_| { Duration::from_millis(1) @@ -602,7 +594,7 @@ where cpu_stats_sender: None, collect_store_infos_interval: interval, load_base_split_check_interval: cmp::min( - default_load_base_split_check_interval(), + DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL, interval, ), // Use `inspect_latency_interval` as the minimal limitation for collecting tick. @@ -2132,19 +2124,11 @@ where let f = async move { for split_info in split_infos { - let Ok(Some((region, leader))) = pd_client - .get_region_leader_by_id(split_info.region_id) - .await + let Ok(Some(region)) = + pd_client.get_region_by_id(split_info.region_id).await else { continue; }; - if leader.get_id() != split_info.peer.get_id() { - info!("load base split region on non-leader"; - "region_id" => region.get_id(), - "peer_id" => split_info.peer.get_id(), - "leader_id" => leader.get_id(), - ); - } // Try to split the region with the given split key. if let Some(split_key) = split_info.split_key { Self::handle_ask_batch_split( diff --git a/components/raftstore/src/store/worker/split_controller.rs b/components/raftstore/src/store/worker/split_controller.rs index eb281db4f4e..b3d97413ab3 100644 --- a/components/raftstore/src/store/worker/split_controller.rs +++ b/components/raftstore/src/store/worker/split_controller.rs @@ -285,7 +285,7 @@ impl Recorder { } fn update_peer(&mut self, peer: &Peer) { - if self.peer != *peer && peer.get_id() != 0 { + if self.peer != *peer { self.peer = peer.clone(); } } @@ -844,7 +844,6 @@ impl AutoSplitController { "qps" => qps, "byte" => byte, "cpu_usage" => cpu_usage, - "peer" => ?recorder.peer, ); self.recorders.remove(®ion_id); } else if is_unified_read_pool_busy && is_region_busy { diff --git a/tests/integrations/raftstore/test_split_region.rs b/tests/integrations/raftstore/test_split_region.rs index 831ce113a64..b6874f10df2 100644 --- a/tests/integrations/raftstore/test_split_region.rs +++ b/tests/integrations/raftstore/test_split_region.rs @@ -9,14 +9,11 @@ use std::{ use engine_rocks::RocksEngine; use engine_traits::{Peekable, CF_DEFAULT, CF_WRITE}; -use grpcio::{ChannelBuilder, Environment}; use keys::data_key; use kvproto::{ - kvrpcpb::{Context, Op}, metapb, pdpb, raft_cmdpb::*, raft_serverpb::{ExtraMessageType, RaftMessage}, - tikvpb_grpc::TikvClient, }; use pd_client::PdClient; use raft::eraftpb::MessageType; @@ -241,89 +238,6 @@ fn test_auto_split_region() { assert!(resp.get_header().get_error().has_key_not_in_region()); } -#[test_case(test_raftstore::new_server_cluster)] -fn test_load_base_auto_split_with_follower_read() { - fail::cfg("mock_tick_interval", "return(0)").unwrap(); - fail::cfg("mock_collect_tick_interval", "return(0)").unwrap(); - fail::cfg("mock_load_base_split_check_interval", "return(100)").unwrap(); - fail::cfg("mock_region_is_busy", "return(0)").unwrap(); - fail::cfg("mock_unified_read_pool_is_busy", "return(0)").unwrap(); - let count = 2; - let mut cluster = new_cluster(0, count); - cluster.cfg.split.qps_threshold = Some(10); - cluster.cfg.split.byte_threshold = Some(1); - cluster.cfg.split.sample_threshold = 10; - cluster.cfg.split.detect_times = 2; - cluster.cfg.split.split_balance_score = 0.5; - cluster.run(); - let pd_client = Arc::clone(&cluster.pd_client); - let target = pd_client.get_region(b"").unwrap(); - let leader = cluster.leader_of_region(target.get_id()).unwrap(); - let follower = target - .get_peers() - .iter() - .find(|p| p.get_id() != leader.get_id()) - .unwrap() - .clone(); - - let env: Arc = Arc::new(Environment::new(1)); - let new_client = |peer: metapb::Peer| { - let cli = TikvClient::new( - ChannelBuilder::new(env.clone()) - .connect(&cluster.sim.rl().get_addr(peer.get_store_id())), - ); - let epoch = cluster.get_region_epoch(target.get_id()); - let mut ctx = Context::default(); - ctx.set_region_id(target.get_id()); - ctx.set_peer(peer); - ctx.set_region_epoch(epoch); - PeerClient { cli, ctx } - }; - let mut region1 = pd_client.get_region(b"k1").unwrap(); - let mut region2 = pd_client.get_region(b"k3").unwrap(); - assert_eq!(region1.get_id(), region2.get_id()); - - let leader_client = new_client(leader); - let commit_ts1 = leader_client.must_kv_write( - &pd_client, - vec![new_mutation(Op::Put, &b"k1"[..], &b"v1"[..])], - b"k1".to_vec(), - ); - let commit_ts2 = leader_client.must_kv_write( - &pd_client, - vec![new_mutation(Op::Put, &b"k2"[..], &b"v2"[..])], - b"k2".to_vec(), - ); - let commit_ts3 = leader_client.must_kv_write( - &pd_client, - vec![new_mutation(Op::Put, &b"k3"[..], &b"v3"[..])], - b"k3".to_vec(), - ); - let mut follower_client = new_client(follower); - follower_client.ctx.set_replica_read(true); - for i in 0..100 { - follower_client.kv_read(b"k1".to_vec(), commit_ts1 + i); - follower_client.kv_read(b"k2".to_vec(), commit_ts2 + i); - follower_client.kv_read(b"k3".to_vec(), commit_ts3 + i); - } - thread::sleep(Duration::from_millis(100)); - follower_client.kv_read(b"k3".to_vec(), commit_ts3); - for _ in 1..250 { - region1 = pd_client.get_region(b"k0").unwrap(); - region2 = pd_client.get_region(b"k4").unwrap(); - if region1.get_id() != region2.get_id() { - break; - } - thread::sleep(Duration::from_millis(20)) - } - assert_ne!(region1.get_id(), region2.get_id()); - fail::remove("mock_tick_interval"); - fail::remove("mock_region_is_busy"); - fail::remove("mock_collect_tick_interval"); - fail::remove("mock_unified_read_pool_is_busy"); - fail::remove("mock_load_base_split_check_interval"); -} - // A filter that disable commitment by heartbeat. #[derive(Clone)] struct EraseHeartbeatCommit;