From 6a71cdfaaf80b45f95a9036cc6012fb161e800e0 Mon Sep 17 00:00:00 2001 From: ivan770 Date: Sun, 4 Apr 2021 15:46:40 +0200 Subject: [PATCH] Added support for service restarts, restart GCS after 305 secs --- src/db.rs | 2 +- src/gcs.rs | 25 +++++++++++++++++++++++-- src/recognition.rs | 5 +++-- src/service.rs | 39 +++++++++++++++++++++++++++++++-------- 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/src/db.rs b/src/db.rs index 326cfcc..e16cc6a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -36,7 +36,7 @@ impl HandlerDatabase { /// Receive new WebSocket notification. /// - /// This method will yield to executor if there are no transcriptions in queue. + /// This method will yield to executor if there are no notifications in queue. pub async fn recv_notification(&self) -> WsNotification { self.ws_notifications.pop().await } diff --git a/src/gcs.rs b/src/gcs.rs index 36ab20d..f0a8ef0 100644 --- a/src/gcs.rs +++ b/src/gcs.rs @@ -12,7 +12,7 @@ use thiserror::Error; use tonic::{ metadata::{errors::InvalidMetadataValue, MetadataValue}, transport::{Certificate, Channel, ClientTlsConfig, Error as TransportError}, - Request, Status, + Code, Request, Status, }; use tracing::warn; use yup_oauth2::{ @@ -56,10 +56,27 @@ pub enum CloudSpeechError { AuthenticationError(#[from] OauthError), #[error("Unable to call gRPC API: {0}")] - CallError(#[from] Status), + CallError(Status), #[error("Unable to send transcription to background service: {0}")] FlumeError(#[from] SendError), + + #[error("Max GCS stream duration is 305 seconds.")] + TooLongStream(Status), +} + +impl From for CloudSpeechError { + fn from(error: Status) -> Self { + match error.code() { + Code::OutOfRange + if error.message() + == "Exceeded maximum allowed stream duration of 305 seconds." => + { + CloudSpeechError::TooLongStream(error) + } + _ => CloudSpeechError::CallError(error), + } + } } pub mod await_time { @@ -249,6 +266,10 @@ where .map_err(CloudSpeechError::from)) } } + + fn restartable(error: &Self::Error) -> bool { + matches!(error, CloudSpeechError::TooLongStream(_)) + } } impl<'c> FromConfig<'c> for GoogleCloudSpeech diff --git a/src/recognition.rs b/src/recognition.rs index 9246345..91a051e 100644 --- a/src/recognition.rs +++ b/src/recognition.rs @@ -36,6 +36,7 @@ pub struct SpeechRecognitionResponse { /// specific options via [`SpeechRecognitionConfig`] /// /// [`application config`]: Config +#[derive(Clone)] pub struct SpeechRecognitionServiceConfig<'c> { /// Application configuration. pub application_config: &'c Config, @@ -54,9 +55,11 @@ pub struct SpeechRecognitionConfig { pub language: String, /// Enable profanity filter (if provider supports it)? + #[serde(default)] pub profanity_filter: bool, /// Enable punctuation guessing (if provider supports it)? + #[serde(default)] pub punctuation: bool, } @@ -73,10 +76,8 @@ impl Default for SpeechRecognitionConfig { #[pin_project] pub struct SpeechRecognitionSink { id: Uuid, - #[pin] database: Arc, - _error: PhantomData, } diff --git a/src/service.rs b/src/service.rs index 76a6fad..b871ac9 100644 --- a/src/service.rs +++ b/src/service.rs @@ -54,6 +54,13 @@ where /// Start process of audio streaming. fn stream(self, stream: S) -> Self::Fut; + + /// Determine if service should be restarted in case of error. + /// + /// By default, in case of fail there will be no restart for a service. + fn restartable(_: &Self::Error) -> bool { + false + } } /// Create a [`Service`] from provided config, possibly failing to do so. @@ -82,7 +89,8 @@ async fn spawn_service( mut sink: O, ) -> Result<(), >::Error> where - I: Stream>::Input> + Send + Sync + 'static, + C: Clone, + I: Stream>::Input> + Clone + Send + Sync + 'static, O: Sink<<>::Ok as TryStream>::Ok, Error = >::Error> + Send + Unpin, @@ -94,13 +102,28 @@ where >::Error: From<>::Error> + From<::Error>, { - S::from_config(config) - .await? - .stream(stream) - .await? - .map_err(>::Error::from) - .forward(&mut sink) - .await?; + macro_rules! spawner { + ($config:expr, $stream:expr, $sink:expr) => { + S::from_config(config.clone()) + .await? + .stream(stream.clone()) + .await? + .map_err(>::Error::from) + .forward(&mut sink) + .await; + }; + } + + let mut spawned = spawner!(config.clone(), stream.clone(), &mut sink); + + while let Err(ref e) = spawned { + if >::restartable(e) { + tracing::warn!("Restarting service due to error match"); + spawned = spawner!(config.clone(), stream.clone(), &mut sink); + } else { + return spawned; + } + } Ok(()) }