Revert "raftstore: fix load base split cannot works in pure follower …
Browse files Browse the repository at this point in the history
…scenario (tikv#16376)

ref tikv#16314

Signed-off-by: nolouch <[email protected]>
nolouch authored Jan 15, 2024
1 parent dafd147 commit 2a553aa
Showing 6 changed files with 5 additions and 118 deletions.
4 changes: 0 additions & 4 deletions components/raftstore/src/store/fsm/peer.rs
@@ -5357,14 +5357,10 @@ where
let allow_replica_read = read_only && msg.get_header().get_replica_read();
let flags = WriteBatchFlags::from_bits_check(msg.get_header().get_flags());
let allow_stale_read = read_only && flags.contains(WriteBatchFlags::STALE_READ);
let split_region = msg.has_admin_request()
&& msg.get_admin_request().get_cmd_type() == AdminCmdType::BatchSplit;
if !self.fsm.peer.is_leader()
&& !is_read_index_request
&& !allow_replica_read
&& !allow_stale_read
// allow proposal split command at non-leader, raft layer will forward it to leader.
&& !split_region
{
self.ctx.raft_metrics.invalid_proposal.not_leader.inc();
let leader = self.fsm.peer.get_peer_from_cache(leader_id);
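With the split exemption reverted, a follower that receives a BatchSplit admin command falls through to the ordinary not-leader rejection again. The standalone sketch below only models that gate with hypothetical names (`ProposalKind`, `follower_accepts`); the real check is the one shown in the hunk above.

```rust
// Hypothetical, dependency-free model of the proposal gate after this revert:
// a non-leader only lets read-index, replica-read, or stale-read requests
// through; BatchSplit no longer gets a special pass.
#[derive(Clone, Copy)]
struct ProposalKind {
    is_read_index: bool,
    allow_replica_read: bool,
    allow_stale_read: bool,
}

fn follower_accepts(is_leader: bool, p: ProposalKind) -> bool {
    is_leader || p.is_read_index || p.allow_replica_read || p.allow_stale_read
}

fn main() {
    // A BatchSplit proposal arriving at a follower is rejected (NotLeader).
    let batch_split = ProposalKind {
        is_read_index: false,
        allow_replica_read: false,
        allow_stale_read: false,
    };
    assert!(!follower_accepts(false, batch_split));

    // A replica read is still served on the follower path.
    let replica_read = ProposalKind {
        allow_replica_read: true,
        ..batch_split
    };
    assert!(follower_accepts(false, replica_read));
}
```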
1 change: 0 additions & 1 deletion components/raftstore/src/store/metrics.rs
@@ -193,7 +193,6 @@ make_static_metric! {
conf_change,
batch,
dropped_read_index,
non_leader_split,
}

pub label_enum RaftInvalidProposal {
7 changes: 1 addition & 6 deletions components/raftstore/src/store/peer.rs
@@ -4343,12 +4343,7 @@ where
}

match req.get_admin_request().get_cmd_type() {
AdminCmdType::Split | AdminCmdType::BatchSplit => {
ctx.insert(ProposalContext::SPLIT);
if !self.is_leader() {
poll_ctx.raft_metrics.propose.non_leader_split.inc();
}
}
AdminCmdType::Split | AdminCmdType::BatchSplit => ctx.insert(ProposalContext::SPLIT),
AdminCmdType::PrepareMerge => {
self.pre_propose_prepare_merge(poll_ctx, req)?;
ctx.insert(ProposalContext::PREPARE_MERGE);
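After the revert, the surviving match arm only tags split commands with `ProposalContext::SPLIT`; the non-leader counter whose label was removed in metrics.rs above is no longer bumped here. Below is a minimal model of that tagging, using plain bit constants as stand-ins for the real `ProposalContext` and a local enum in place of kvproto's `AdminCmdType`.

```rust
// Stand-in types for illustration only; not the raftstore definitions.
#[derive(Clone, Copy, PartialEq)]
enum AdminCmdType {
    Split,
    BatchSplit,
    PrepareMerge,
    Other,
}

const CTX_SPLIT: u8 = 0b01;
const CTX_PREPARE_MERGE: u8 = 0b10;

fn proposal_context(cmd: AdminCmdType) -> u8 {
    let mut ctx = 0u8;
    match cmd {
        // Split commands only set the flag; no extra metric is recorded here.
        AdminCmdType::Split | AdminCmdType::BatchSplit => ctx |= CTX_SPLIT,
        AdminCmdType::PrepareMerge => ctx |= CTX_PREPARE_MERGE,
        AdminCmdType::Other => {}
    }
    ctx
}

fn main() {
    assert_eq!(proposal_context(AdminCmdType::BatchSplit), CTX_SPLIT);
    assert_eq!(proposal_context(AdminCmdType::PrepareMerge), CTX_PREPARE_MERGE);
    assert_eq!(proposal_context(AdminCmdType::Other), 0);
}
```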
22 changes: 3 additions & 19 deletions components/raftstore/src/store/worker/pd.rs
@@ -469,14 +469,6 @@ where
const DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
const DEFAULT_COLLECT_TICK_INTERVAL: Duration = Duration::from_secs(1);

fn default_load_base_split_check_interval() -> Duration {
fail_point!("mock_load_base_split_check_interval", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
Duration::from_millis(t)
});
DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL
}

fn default_collect_tick_interval() -> Duration {
fail_point!("mock_collect_tick_interval", |_| {
Duration::from_millis(1)
@@ -602,7 +594,7 @@ where
cpu_stats_sender: None,
collect_store_infos_interval: interval,
load_base_split_check_interval: cmp::min(
default_load_base_split_check_interval(),
DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL,
interval,
),
// Use `inspect_latency_interval` as the minimal limitation for collecting tick.
@@ -2132,19 +2124,11 @@ where

let f = async move {
for split_info in split_infos {
let Ok(Some((region, leader))) = pd_client
.get_region_leader_by_id(split_info.region_id)
.await
let Ok(Some(region)) =
pd_client.get_region_by_id(split_info.region_id).await
else {
continue;
};
if leader.get_id() != split_info.peer.get_id() {
info!("load base split region on non-leader";
"region_id" => region.get_id(),
"peer_id" => split_info.peer.get_id(),
"leader_id" => leader.get_id(),
);
}
// Try to split the region with the given split key.
if let Some(split_key) = split_info.split_key {
Self::handle_ask_batch_split(
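Two effects of this revert in pd.rs: the load-base-split tick goes back to `cmp::min(DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL, interval)` with no fail_point hook for shrinking it in tests, and the async split loop fetches the region with `get_region_by_id` without comparing the reported peer against the current leader. A runnable sketch of just the interval selection (the free function name here is illustrative):

```rust
use std::cmp;
use std::time::Duration;

const DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL: Duration = Duration::from_secs(1);

// After the revert, the check interval is simply the smaller of the 1s default
// and the caller-provided collect interval.
fn load_base_split_check_interval(interval: Duration) -> Duration {
    cmp::min(DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL, interval)
}

fn main() {
    assert_eq!(
        load_base_split_check_interval(Duration::from_millis(500)),
        Duration::from_millis(500)
    );
    assert_eq!(
        load_base_split_check_interval(Duration::from_secs(5)),
        DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL
    );
}
```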
3 changes: 1 addition & 2 deletions components/raftstore/src/store/worker/split_controller.rs
@@ -285,7 +285,7 @@ impl Recorder {
}

fn update_peer(&mut self, peer: &Peer) {
if self.peer != *peer && peer.get_id() != 0 {
if self.peer != *peer {
self.peer = peer.clone();
}
}
@@ -844,7 +844,6 @@ impl AutoSplitController {
"qps" => qps,
"byte" => byte,
"cpu_usage" => cpu_usage,
"peer" => ?recorder.peer,
);
self.recorders.remove(&region_id);
} else if is_unified_read_pool_busy && is_region_busy {
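In split_controller.rs the `peer.get_id() != 0` guard is dropped from `Recorder::update_peer` and the `peer` field leaves the cleanup log line. A minimal model of the reverted behaviour, with a hypothetical `Peer` standing in for `kvproto::metapb::Peer`:

```rust
// Hypothetical stand-ins; the real Recorder lives in split_controller.rs.
#[derive(Clone, PartialEq, Default, Debug)]
struct Peer {
    id: u64,
    store_id: u64,
}

struct Recorder {
    peer: Peer,
}

impl Recorder {
    // After the revert, any differing peer overwrites the cached one,
    // including a default peer whose id is 0.
    fn update_peer(&mut self, peer: &Peer) {
        if self.peer != *peer {
            self.peer = peer.clone();
        }
    }
}

fn main() {
    let mut r = Recorder {
        peer: Peer { id: 2, store_id: 1 },
    };
    r.update_peer(&Peer::default());
    assert_eq!(r.peer, Peer::default());
}
```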
86 changes: 0 additions & 86 deletions tests/integrations/raftstore/test_split_region.rs
@@ -9,14 +9,11 @@ use std::{

use engine_rocks::RocksEngine;
use engine_traits::{Peekable, CF_DEFAULT, CF_WRITE};
use grpcio::{ChannelBuilder, Environment};
use keys::data_key;
use kvproto::{
kvrpcpb::{Context, Op},
metapb, pdpb,
raft_cmdpb::*,
raft_serverpb::{ExtraMessageType, RaftMessage},
tikvpb_grpc::TikvClient,
};
use pd_client::PdClient;
use raft::eraftpb::MessageType;
@@ -241,89 +238,6 @@ fn test_auto_split_region() {
assert!(resp.get_header().get_error().has_key_not_in_region());
}

#[test_case(test_raftstore::new_server_cluster)]
fn test_load_base_auto_split_with_follower_read() {
fail::cfg("mock_tick_interval", "return(0)").unwrap();
fail::cfg("mock_collect_tick_interval", "return(0)").unwrap();
fail::cfg("mock_load_base_split_check_interval", "return(100)").unwrap();
fail::cfg("mock_region_is_busy", "return(0)").unwrap();
fail::cfg("mock_unified_read_pool_is_busy", "return(0)").unwrap();
let count = 2;
let mut cluster = new_cluster(0, count);
cluster.cfg.split.qps_threshold = Some(10);
cluster.cfg.split.byte_threshold = Some(1);
cluster.cfg.split.sample_threshold = 10;
cluster.cfg.split.detect_times = 2;
cluster.cfg.split.split_balance_score = 0.5;
cluster.run();
let pd_client = Arc::clone(&cluster.pd_client);
let target = pd_client.get_region(b"").unwrap();
let leader = cluster.leader_of_region(target.get_id()).unwrap();
let follower = target
.get_peers()
.iter()
.find(|p| p.get_id() != leader.get_id())
.unwrap()
.clone();

let env: Arc<Environment> = Arc::new(Environment::new(1));
let new_client = |peer: metapb::Peer| {
let cli = TikvClient::new(
ChannelBuilder::new(env.clone())
.connect(&cluster.sim.rl().get_addr(peer.get_store_id())),
);
let epoch = cluster.get_region_epoch(target.get_id());
let mut ctx = Context::default();
ctx.set_region_id(target.get_id());
ctx.set_peer(peer);
ctx.set_region_epoch(epoch);
PeerClient { cli, ctx }
};
let mut region1 = pd_client.get_region(b"k1").unwrap();
let mut region2 = pd_client.get_region(b"k3").unwrap();
assert_eq!(region1.get_id(), region2.get_id());

let leader_client = new_client(leader);
let commit_ts1 = leader_client.must_kv_write(
&pd_client,
vec![new_mutation(Op::Put, &b"k1"[..], &b"v1"[..])],
b"k1".to_vec(),
);
let commit_ts2 = leader_client.must_kv_write(
&pd_client,
vec![new_mutation(Op::Put, &b"k2"[..], &b"v2"[..])],
b"k2".to_vec(),
);
let commit_ts3 = leader_client.must_kv_write(
&pd_client,
vec![new_mutation(Op::Put, &b"k3"[..], &b"v3"[..])],
b"k3".to_vec(),
);
let mut follower_client = new_client(follower);
follower_client.ctx.set_replica_read(true);
for i in 0..100 {
follower_client.kv_read(b"k1".to_vec(), commit_ts1 + i);
follower_client.kv_read(b"k2".to_vec(), commit_ts2 + i);
follower_client.kv_read(b"k3".to_vec(), commit_ts3 + i);
}
thread::sleep(Duration::from_millis(100));
follower_client.kv_read(b"k3".to_vec(), commit_ts3);
for _ in 1..250 {
region1 = pd_client.get_region(b"k0").unwrap();
region2 = pd_client.get_region(b"k4").unwrap();
if region1.get_id() != region2.get_id() {
break;
}
thread::sleep(Duration::from_millis(20))
}
assert_ne!(region1.get_id(), region2.get_id());
fail::remove("mock_tick_interval");
fail::remove("mock_region_is_busy");
fail::remove("mock_collect_tick_interval");
fail::remove("mock_unified_read_pool_is_busy");
fail::remove("mock_load_base_split_check_interval");
}

// A filter that disable commitment by heartbeat.
#[derive(Clone)]
struct EraseHeartbeatCommit;
