Skip to content

Commit

Permalink
Deactivate stream limiter, add timeout on protocol and opti operation…
Browse files Browse the repository at this point in the history
…s fetch. (#4220)

* Deactivate stream_limiter

Signed-off-by: Litchi Pi <[email protected]>

* Deactivate stream_limiter entirely

Signed-off-by: Litchi Pi <[email protected]>

* fixup

Signed-off-by: Litchi Pi <[email protected]>

* Add timeout on protocol and optimize operations verifications.

* Update sandbox version

* Remove imcompatibility

* Add timeout of protocol in tests.

---------

Signed-off-by: Litchi Pi <[email protected]>
Co-authored-by: Litchi Pi <[email protected]>
  • Loading branch information
AurelienFT and litchipi authored Jul 5, 2023
1 parent 300c2aa commit 05b7aa0
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 43 deletions.
10 changes: 1 addition & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ substruct = { git = "https://github.com/sydhds/substruct" }
socket2 = "0.4.7"
crossbeam = "0.8.2"
mio = { version = "0.8", features = ["net", "os-poll"] }
stream_limiter = "3.2.0"

# custom modules
massa_consensus_exports = { path = "../massa-consensus-exports" }
Expand Down
27 changes: 13 additions & 14 deletions massa-bootstrap/src/bindings/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ use massa_signature::{PublicKey, Signature};
use rand::{rngs::StdRng, RngCore, SeedableRng};
use std::time::Instant;
use std::{net::TcpStream, time::Duration};
use stream_limiter::{Limiter, LimiterOptions};

/// Bootstrap client binder
pub struct BootstrapClientBinder {
remote_pubkey: PublicKey,
duplex: Limiter<TcpStream>,
duplex: TcpStream, //Limiter<TcpStream>,
prev_message: Option<Hash>,
version_serializer: VersionSerializer,
cfg: BootstrapClientConfig,
Expand All @@ -47,11 +46,11 @@ impl BootstrapClientBinder {
duplex: TcpStream,
remote_pubkey: PublicKey,
cfg: BootstrapClientConfig,
limit: Option<u64>,
_limit: Option<u64>,
) -> Self {
let limit_opts =
limit.map(|limit| LimiterOptions::new(limit, Duration::from_millis(1000), limit));
let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts);
// let limit_opts =
// limit.map(|limit| LimiterOptions::new(limit, Duration::from_millis(1000), limit));
// let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts);
BootstrapClientBinder {
remote_pubkey,
duplex,
Expand Down Expand Up @@ -215,10 +214,10 @@ impl BootstrapClientBinder {

impl crate::bindings::BindingReadExact for BootstrapClientBinder {
fn set_read_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error> {
if let Some(ref mut opts) = self.duplex.read_opt {
opts.timeout = duration;
}
self.duplex.stream.set_read_timeout(duration)
// if let Some(ref mut opts) = self.duplex.read_opt {
// opts.timeout = duration;
// }
self.duplex.set_read_timeout(duration)
}
}

Expand All @@ -230,10 +229,10 @@ impl std::io::Read for BootstrapClientBinder {

impl crate::bindings::BindingWriteExact for BootstrapClientBinder {
fn set_write_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error> {
if let Some(ref mut opts) = self.duplex.write_opt {
opts.timeout = duration;
}
self.duplex.stream.set_write_timeout(duration)
// if let Some(ref mut opts) = self.duplex.write_opt {
// opts.timeout = duration;
// }
self.duplex.set_write_timeout(duration)
}
}

Expand Down
18 changes: 9 additions & 9 deletions massa-bootstrap/src/bindings/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
thread,
time::Duration,
};
use stream_limiter::{Limiter, LimiterOptions};
// use stream_limiter::{Limiter, LimiterOptions};
use tracing::error;

use super::BindingWriteExact;
Expand All @@ -43,7 +43,7 @@ pub struct BootstrapServerBinder {
max_datastore_key_length: u8,
randomness_size_bytes: usize,
local_keypair: KeyPair,
duplex: Limiter<TcpStream>,
duplex: TcpStream, //Limiter<TcpStream>,
prev_message: Option<Hash>,
version_serializer: VersionSerializer,
version_deserializer: VersionDeserializer,
Expand All @@ -62,7 +62,7 @@ impl BootstrapServerBinder {
duplex: TcpStream,
local_keypair: KeyPair,
cfg: BootstrapSrvBindCfg,
rw_limit: Option<u64>,
_rw_limit: Option<u64>,
) -> Self {
let BootstrapSrvBindCfg {
max_bytes_read_write: _limit,
Expand All @@ -73,10 +73,10 @@ impl BootstrapServerBinder {
write_error_timeout,
} = cfg;

let limit_opts = rw_limit.map(|limit| -> LimiterOptions {
LimiterOptions::new(limit, Duration::from_millis(1000), limit)
});
let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts);
// let limit_opts = rw_limit.map(|limit| -> LimiterOptions {
// LimiterOptions::new(limit, Duration::from_millis(1000), limit)
// });
// let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts);
BootstrapServerBinder {
max_consensus_block_ids: consensus_bootstrap_part_size,
local_keypair,
Expand Down Expand Up @@ -326,7 +326,7 @@ impl io::Read for BootstrapServerBinder {

impl crate::bindings::BindingReadExact for BootstrapServerBinder {
fn set_read_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error> {
self.duplex.stream.set_read_timeout(duration)
self.duplex.set_read_timeout(duration)
}
}

Expand All @@ -342,6 +342,6 @@ impl io::Write for BootstrapServerBinder {

impl crate::bindings::BindingWriteExact for BootstrapServerBinder {
fn set_write_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error> {
self.duplex.stream.set_write_timeout(duration)
self.duplex.set_write_timeout(duration)
}
}
4 changes: 2 additions & 2 deletions massa-models/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ lazy_static::lazy_static! {
/// node version
pub static ref VERSION: Version = {
if cfg!(feature = "sandbox") {
"SAND.24.0"
"SAND.24.1"
} else {
"TEST.24.0"
"TEST.24.1"
}
.parse()
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion massa-node/base_config/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"openrpc": "1.2.4",
"info": {
"title": "Massa OpenRPC Specification",
"version": "TEST.24.0",
"version": "TEST.24.1",
"description": "Massa OpenRPC Specification document. Find more information on https://docs.massa.net/en/latest/technical-doc/api.html",
"termsOfService": "https://open-rpc.org",
"contact": {
Expand Down
1 change: 1 addition & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ async fn launch(
try_connection_timer: SETTINGS.protocol.try_connection_timer,
max_in_connections: SETTINGS.protocol.max_in_connections,
timeout_connection: SETTINGS.protocol.timeout_connection,
message_timeout: SETTINGS.protocol.message_timeout,
routable_ip: SETTINGS
.protocol
.routable_ip
Expand Down
2 changes: 2 additions & 0 deletions massa-node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ pub struct ProtocolSettings {
pub try_connection_timer: MassaTime,
/// Timeout connection
pub timeout_connection: MassaTime,
/// Message timeout
pub message_timeout: MassaTime,
/// Nb in connections
pub max_in_connections: usize,
/// Peers limits per category
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-exports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ nom = "=7.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# TODO tag peernet version
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "7b2a1a9" }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "deactivate_stream_limiter" } #rev = "7b2a1a9" }
tempfile = { version = "3.3", optional = true } # use with testing feature
mockall = "0.11.4"

Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ pub struct ProtocolConfig {
pub max_in_connections: usize,
/// Timeout connection
pub timeout_connection: MassaTime,
/// Timeout message
pub message_timeout: MassaTime,
/// Number of bytes per second that can be read/write in a connection (should be a 10 multiplier)
pub read_write_limit_bytes_per_second: u128,
/// Optional routable ip
Expand Down
1 change: 1 addition & 0 deletions massa-protocol-exports/src/test_exports/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl Default for ProtocolConfig {
max_endorsements_per_message: 1000,
max_size_listeners_per_peer: 100,
max_size_peers_announcement: 100,
message_timeout: MassaTime::from_millis(10000),
last_start_period: 0,
read_write_limit_bytes_per_second: 1024 * 1000,
timeout_connection: MassaTime::from_millis(1000),
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ serde_json = "1.0"
nom = "=7.1"
num_enum = "0.5"
# TODO tag peernet version
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "7b2a1a9" }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "deactivate_stream_limiter" } #rev = "7b2a1a9" }
tempfile = { version = "3.3", optional = true } # use with testing feature
rayon = "1.7.0"
schnellru = "0.2.1"
Expand Down
10 changes: 5 additions & 5 deletions massa-protocol-worker/src/handlers/block_handler/retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,12 +1010,12 @@ impl RetrievalThread {
}
return Ok(());
};
operations.retain(|op| block_operation_ids.contains(&op.id));
let block_ids_set: PreHashSet<OperationId> =
block_operation_ids.iter().copied().collect();
operations.retain(|op| block_ids_set.contains(&op.id));
// add operations to local storage and claim ref
info.storage.store_operations(operations);
let block_ids_set = block_operation_ids.clone().into_iter().collect();
let known_operations = info.storage.claim_operation_refs(&block_ids_set);

let known_operations = info.storage.get_op_refs();
// Ban the node if:
// - mismatch with asked operations (asked operations are the one that are not in storage) + operations already in storage and block operations
// - full operations serialized size overflow
Expand All @@ -1035,7 +1035,7 @@ impl RetrievalThread {
self.consensus_controller
.mark_invalid_block(block_id, header);
} else {
if known_operations != block_ids_set {
if known_operations != &block_ids_set {
warn!(
"Peer id {} didn't sent us all the full operations for block id {}.",
from_peer_id, block_id
Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ pub fn start_protocol_controller(
our_keypair: keypair.clone(),
},
);
peernet_config.write_timeout = config.message_timeout.to_duration();
peernet_config.read_timeout = config.message_timeout.to_duration();

let initial_peers_infos = serde_json::from_str::<HashMap<PeerId, PeerData>>(
&std::fs::read_to_string(&config.initial_peers)?,
Expand Down

0 comments on commit 05b7aa0

Please sign in to comment.