Skip to content

Commit ae8dd9b

Browse files
committed
fix: Parameterization of the proc restart, and correction according to code review
Signed-off-by: Jeremy HERGAULT <[email protected]>
1 parent 3e05f02 commit ae8dd9b

File tree

6 files changed

+70
-14
lines changed

6 files changed

+70
-14
lines changed

prosa/src/core/error.rs

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl ProcError for std::io::Error {
5757
impl ProcError for openssl::error::Error {
5858
fn recoverable(&self) -> bool {
5959
if let Some(reason) = self.reason() {
60+
// If it's an SSL protocol error, consider that can be recoverable. It's may be temporary related to a distant.
6061
reason.contains("SSL_")
6162
} else {
6263
false

prosa/src/core/main.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ where
8888
0
8989
}
9090

91-
fn name(&self) -> &String {
92-
&self.name
91+
fn name(&self) -> &str {
92+
self.name.as_str()
9393
}
9494
}
9595

@@ -263,8 +263,8 @@ where
263263
0
264264
}
265265

266-
fn name(&self) -> &String {
267-
&self.name
266+
fn name(&self) -> &str {
267+
self.name.as_str()
268268
}
269269
}
270270

prosa/src/core/proc.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ pub trait ProcSettings {
190190
/// Getter of the processor's adaptor configuration path
191191
fn get_adaptor_config_path(&self) -> Option<&String>;
192192

193+
/// Getter of the restart delay that must be apply to the processor if an error is trigger.
194+
/// Return the duration to be add to every restart, and the max duration wait between restarts in seconds.
195+
fn get_proc_restart_delay(&self) -> (Duration, u32);
196+
193197
/// Getter of the processor's adaptor configuration
194198
fn get_adaptor_config<C>(&self) -> Result<C, ::config::ConfigError>
195199
where
@@ -219,7 +223,7 @@ pub trait ProcBusParam {
219223
fn get_proc_id(&self) -> u32;
220224

221225
/// Provide the ProSA name based on ProSA settings
222-
fn name(&self) -> &String;
226+
fn name(&self) -> &str;
223227
}
224228

225229
impl Debug for dyn ProcBusParam {
@@ -230,6 +234,9 @@ impl Debug for dyn ProcBusParam {
230234

231235
/// Trait to define all processor handle functions
232236
pub trait ProcEpilogue {
237+
/// Getter to know timer for processor restart in case of error
238+
fn get_proc_restart_delay(&self) -> (std::time::Duration, u32);
239+
233240
/// Method to remove the processor with a signal queue to the main task
234241
///
235242
/// Once the processor is removed, all its associated service will be remove
@@ -258,7 +265,7 @@ where
258265
self.id
259266
}
260267

261-
fn name(&self) -> &String {
268+
fn name(&self) -> &str {
262269
self.main.name()
263270
}
264271
}
@@ -457,7 +464,8 @@ where
457464
.thread_name(proc_name.clone())
458465
.build()
459466
.unwrap();
460-
let mut wait_time = Duration::ZERO;
467+
let proc_restart_delay = self.get_proc_restart_delay();
468+
let mut wait_time = proc_restart_delay.0;
461469
loop {
462470
if let Err(proc_err) = rt.block_on(self.internal_run(proc_name.clone())) {
463471
let recovery_duration = proc_err.recovery_duration();
@@ -494,9 +502,9 @@ where
494502
return;
495503
}
496504

497-
// Don't wait more than 5 minutes btween restarts
498-
if wait_time.as_secs() < 300 {
499-
wait_time += Duration::from_millis(50);
505+
// Don't wait more than the restart delay parameter
506+
if wait_time.as_secs() < proc_restart_delay.1 as u64 {
507+
wait_time += proc_restart_delay.0;
500508
wait_time *= 2;
501509
}
502510
}

prosa/src/core/service.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ where
200200
self.proc_id
201201
}
202202

203-
fn name(&self) -> &String {
204-
&self.proc_name
203+
fn name(&self) -> &str {
204+
self.proc_name.as_str()
205205
}
206206
}
207207

prosa_macros/src/proc.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ fn generate_struct_impl_bus_param(
169169
}
170170

171171
#[doc=concat!(" Getter of the ", stringify!(#item_ident), " processor Name")]
172-
fn name(&self) -> &std::string::String {
172+
fn name(&self) -> &std::primitive::str {
173173
self.proc.name()
174174
}
175175
}
@@ -220,16 +220,27 @@ fn generate_struct_impl_config(
220220

221221
fn generate_struct_impl_epilogue(
222222
item_struct: &syn::ItemStruct,
223+
args: &ProcParams,
223224
) -> syn::parse::Result<proc_macro2::TokenStream> {
224225
let item_ident = &item_struct.ident;
225226
let item_generics = &item_struct.generics;
226227

228+
let restart_settings_quote = if args.settings.is_some() {
229+
quote! { prosa::core::proc::ProcSettings::get_proc_restart_delay(&self.settings) }
230+
} else {
231+
quote! { (std::time::Duration::from_millis(50), 300) }
232+
};
233+
227234
Ok(quote! {
228235
// The definition must be done for the protocol
229236
impl #item_generics prosa::core::proc::ProcEpilogue for #item_ident #item_generics
230237
where
231238
M: 'static + std::marker::Send + std::marker::Sync + std::marker::Sized + std::clone::Clone + std::fmt::Debug + prosa_utils::msg::tvf::Tvf + std::default::Default,
232239
{
240+
fn get_proc_restart_delay(&self) -> (std::time::Duration, u32) {
241+
#restart_settings_quote
242+
}
243+
233244
async fn remove_proc(&self, err: std::option::Option<Box<dyn prosa::core::error::ProcError + Send + Sync>>) -> Result<(), prosa::core::error::BusError> {
234245
self.proc.remove_proc(err).await
235246
}
@@ -290,7 +301,7 @@ pub(crate) fn proc_impl(
290301
let struct_output = generate_struct(item_struct, &proc_args)?;
291302
let struct_impl_bus_param = generate_struct_impl_bus_param(&struct_output)?;
292303
let struct_impl_config = generate_struct_impl_config(&struct_output, &proc_args)?;
293-
let struct_impl_epilogue = generate_struct_impl_epilogue(&struct_output)?;
304+
let struct_impl_epilogue = generate_struct_impl_epilogue(&struct_output, &proc_args)?;
294305
Ok(quote! {
295306
#struct_output
296307
#struct_impl_bus_param

prosa_macros/src/settings.rs

+36
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,26 @@ fn generate_proc_settings_struct(
8787
.parse2(quote! { adaptor_config_path: std::option::Option<std::string::String> })
8888
.unwrap(),
8989
);
90+
91+
// Restart duration period
92+
fields.named.push(
93+
syn::Field::parse_named
94+
.parse2(quote! {
95+
#[serde(skip_serializing)]
96+
proc_restart_duration_period: std::option::Option<std::time::Duration>
97+
})
98+
.unwrap(),
99+
);
100+
101+
// Max restart period
102+
fields.named.push(
103+
syn::Field::parse_named
104+
.parse2(quote! {
105+
#[serde(skip_serializing)]
106+
proc_max_restart_period: std::option::Option<std::primitive::u32>
107+
})
108+
.unwrap(),
109+
);
90110
}
91111

92112
Ok(item_struct)
@@ -102,6 +122,10 @@ fn generate_struct_impl_proc_settings(
102122
fn get_adaptor_config_path(&self) -> std::option::Option<&std::string::String> {
103123
self.adaptor_config_path.as_ref()
104124
}
125+
126+
fn get_proc_restart_delay(&self) -> (std::time::Duration, u32) {
127+
(self.proc_restart_duration_period.unwrap_or(std::time::Duration::from_millis(50)), self.proc_max_restart_period.unwrap_or(300))
128+
}
105129
}
106130
})
107131
}
@@ -124,6 +148,18 @@ pub(crate) fn proc_settings_impl(item: syn::Item) -> syn::parse::Result<proc_mac
124148
.unwrap(),
125149
);
126150
x.fields.push_punct(syn::token::Comma::default());
151+
x.fields.push_value(
152+
syn::FieldValue::parse
153+
.parse2(quote! { proc_restart_duration_period: None })
154+
.unwrap(),
155+
);
156+
x.fields.push_punct(syn::token::Comma::default());
157+
x.fields.push_value(
158+
syn::FieldValue::parse
159+
.parse2(quote! { proc_max_restart_period: None })
160+
.unwrap(),
161+
);
162+
x.fields.push_punct(syn::token::Comma::default());
127163
})?
128164
.into_token_stream()),
129165
_ => Err(syn::Error::new(

0 commit comments

Comments
 (0)