Skip to content

Commit

Permalink
Rewrap and propagate GRPC errors
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander V. Nikolaev <[email protected]>
  • Loading branch information
avnik committed Dec 11, 2024
1 parent 6dcc4d6 commit b5ee0d7
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 34 deletions.
69 changes: 57 additions & 12 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;

use crate::endpoint::{EndpointConfig, TlsConfig};
use crate::error::rewrap_error;

type Client = pb::admin_service_client::AdminServiceClient<Channel>;

Expand Down Expand Up @@ -94,7 +95,12 @@ impl AdminClient {
vm_name,
args,
};
let _response = self.connect_to().await?.start_application(request).await?;
let _response = self
.connect_to()
.await?
.start_application(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

Expand All @@ -104,7 +110,12 @@ impl AdminClient {
vm_name: None,
args: Vec::new(),
};
let _response = self.connect_to().await?.stop_application(request).await?;
let _response = self
.connect_to()
.await?
.stop_application(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

Expand All @@ -114,7 +125,12 @@ impl AdminClient {
vm_name: None,
args: Vec::new(),
};
let _response = self.connect_to().await?.pause_application(request).await?;
let _response = self
.connect_to()
.await?
.pause_application(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

Expand All @@ -124,31 +140,56 @@ impl AdminClient {
vm_name: None,
args: Vec::new(),
};
let _response = self.connect_to().await?.resume_application(request).await?;
let _response = self
.connect_to()
.await?
.resume_application(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

pub async fn reboot(&self) -> anyhow::Result<()> {
let request = pb::admin::Empty {};
let _response = self.connect_to().await?.reboot(request).await?;
let _response = self
.connect_to()
.await?
.reboot(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

pub async fn poweroff(&self) -> anyhow::Result<()> {
let request = pb::admin::Empty {};
let _response = self.connect_to().await?.poweroff(request).await?;
let _response = self
.connect_to()
.await?
.poweroff(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

pub async fn suspend(&self) -> anyhow::Result<()> {
let request = pb::admin::Empty {};
let _response = self.connect_to().await?.suspend(request).await?;
let _response = self
.connect_to()
.await?
.suspend(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

pub async fn wakeup(&self) -> anyhow::Result<()> {
let request = pb::admin::Empty {};
let _response = self.connect_to().await?.wakeup(request).await?;
let _response = self
.connect_to()
.await?
.wakeup(request)
.await
.map_err(rewrap_error)?;
Ok(())
}

Expand All @@ -165,7 +206,8 @@ impl AdminClient {
self.connect_to()
.await?
.query_list(pb::admin::Empty {})
.await?
.await
.map_err(rewrap_error)?
.into_inner()
.list
.into_iter()
Expand All @@ -177,15 +219,17 @@ impl AdminClient {
self.connect_to()
.await?
.set_locale(pb::admin::LocaleRequest { locale })
.await?;
.await
.map_err(rewrap_error)?;
Ok(())
}

pub async fn set_timezone(&self, timezone: String) -> anyhow::Result<()> {
self.connect_to()
.await?
.set_timezone(pb::admin::TimezoneRequest { timezone })
.await?;
.await
.map_err(rewrap_error)?;
Ok(())
}

Expand All @@ -199,7 +243,8 @@ impl AdminClient {
.connect_to()
.await?
.watch(pb::admin::Empty {})
.await?
.await
.map_err(rewrap_error)?
.into_inner();

let list = match watch.try_next().await? {
Expand Down
17 changes: 17 additions & 0 deletions client/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use anyhow::Error;
use tonic::Status;
use tonic_types::StatusExt;
use tracing::{debug, error};

pub fn rewrap_error(status: Status) -> Error {
let mut err = Error::msg(status.message().to_owned());
let details = status.get_error_details();
if let Some(debug_info) = details.debug_info() {
err = err.context(format!("Detail: {}", debug_info.detail));
err = debug_info
.stack_entries
.iter()
.fold(err, |err, each| err.context(format!("Stack: {}", each)))
};
err
}
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod client;
pub mod endpoint;
pub mod error;
pub use crate::client::{AdminClient, QueryResult};
2 changes: 1 addition & 1 deletion nixos/modules/admin.nix
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ in
"HOST_KEY" = "${cfg.tls.keyPath}";
}
// attrsets.optionalAttrs cfg.debug {
"RUST_BACKTRACE" = "1";
# "RUST_BACKTRACE" = "1";
"GIVC_LOG" = "debug";
};
};
Expand Down
3 changes: 2 additions & 1 deletion nixos/tests/admin.nix
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ in
];
tls = mkTls "ghaf-host";
};
systemd.services."microvm@foot-vm" = {
systemd.services."microvm@chromium-vm" = {
preStart = ''
expr $(cat /tmp/[email protected] 2>/dev/null || true) + 1 >/tmp/[email protected]
'';
Expand Down Expand Up @@ -363,6 +363,7 @@ in
print(hostvm.succeed("${cli} --addr ${nodes.adminvm.config.givc.admin.addr} --port ${nodes.adminvm.config.givc.admin.port} --cacert ${nodes.hostvm.givc.host.tls.caCertPath} --cert ${nodes.hostvm.givc.host.tls.certPath} --key ${nodes.hostvm.givc.host.tls.keyPath} ${if tls then "" else "--notls"} --name ${nodes.adminvm.config.givc.admin.name} start --vm chromium-vm clearexit"))
with subtest("VM restart"):
appvm.crash()
hostvm.succeed("systemctl stop [email protected]")
time.sleep(20)
appvm.start()
time.sleep(20)
Expand Down
3 changes: 2 additions & 1 deletion src/admin/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl RegistryEntry {
impl RegistryEntry {
pub fn try_from_request(
req: pb::RegistryRequest,
vm_name: String,
vm_name: Option<String>,
) -> Result<Self, anyhow::Error> {
let ty = UnitType::try_from(req.r#type)?;
let status = req
Expand All @@ -107,6 +107,7 @@ impl RegistryEntry {
.ok_or(anyhow!("endpoint missing"))
.and_then(EndpointEntry::try_from)?;
let watch = ty.service == ServiceType::Mgr;
let vm_name = vm_name.unwrap_or(format!("givc-{}.service", endpoint.tls_name));
// FIXME: We currently ignore `req.parent`, what we should do if we got both parent and endpoint
// Protocol very inconsistent here
Ok(Self {
Expand Down
29 changes: 21 additions & 8 deletions src/admin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl AdminServiceImpl {
let vmservice = format_service_name(vmname, None);

/* Return error if the vm is not registered */
let endpoint = self.agent_endpoint(&vmservice)?;
let endpoint = self
.agent_endpoint(&vmservice)
.with_context(|| format!("{vmservice} not registered"))?;
let client = SystemDClient::new(endpoint);

/* Check status of the unit */
Expand All @@ -183,7 +185,7 @@ impl AdminServiceImpl {
Err(anyhow!("Service {unit} is not loaded!"))
}
Err(e) => {
eprintln!("Error retrieving unit status: {e}");
error!("Error retrieving unit status: {e}");
Err(e)
}
}
Expand All @@ -196,7 +198,7 @@ impl AdminServiceImpl {
let status = client
.get_remote_status(name.to_string())
.await
.with_context(|| format!("cannot retrieve vm status for {name}"))?;
.with_context(|| format!("cannot retrieve vm status for {name}, host agent failed"))?;

if status.load_state != "loaded" {
bail!("vm {name} not loaded")
Expand All @@ -223,26 +225,33 @@ impl AdminServiceImpl {
}

pub async fn handle_error(&self, entry: RegistryEntry) -> anyhow::Result<()> {
info!(
error!(
"Handling error for {} vm type {} service type {}",
entry.name, entry.r#type.vm, entry.r#type.service
);
match (entry.r#type.vm, entry.r#type.service) {
(VmType::AppVM, ServiceType::App) => {
if entry.status.is_exitted() {
debug!("Deregister exitted {}", entry.name);
self.registry.deregister(&entry.name)?;
}
Ok(())
}
(VmType::AppVM, ServiceType::Mgr) | (VmType::SysVM, ServiceType::Mgr) => {
info!("AppVM/Mgr: deregister");
self.registry.deregister(&entry.name)?;
if let Placement::Managed { vm: vm_name, .. } = entry.placement {
debug!("Restarting VM {vm_name}");
self.start_vm(&vm_name)
.await
.with_context(|| format!("handing error, by restart VM {}", entry.name))?;
}
Ok(()) // FIXME: should use `?` from line above, why it didn't work?
}
(x, y) => bail!("Don't known how to handle_error for VM type: {x:?}:{y:?}"),
(x, y) => {
error!("Don't known how to handle_error for VM type: {x:?}:{y:?}");
Ok(())
}
}
}

Expand Down Expand Up @@ -290,6 +299,7 @@ impl AdminServiceImpl {
if let Err(err) = self.monitor_routine().await {
error!("Error during watch: {err}");
}
info!("{:#?}", self.registry)
}
}

Expand All @@ -311,8 +321,12 @@ impl AdminServiceImpl {

// Entry unused in "go" code
match self.registry.by_name(&systemd_agent_name) {
std::result::Result::Ok(e) => e,
Ok(e) => {
info!("VM {e:#?} already spawned");
e
}
Err(_) => {
info!("Starting up VM {vm_name}");
self.start_vm(&vm_name)
.await
.with_context(|| format!("Starting vm for {name}"))?;
Expand Down Expand Up @@ -369,8 +383,7 @@ impl pb::admin_service_server::AdminService for AdminService {
let vm_name = request
.extensions()
.get::<SecurityInfo>()
.map(move |si| si.hostname().unwrap_or("bogus, no hostname in cert".into()))
.unwrap_or("bogus: no TLS".into());
.map(move |si| si.hostname().unwrap_or("bogus, no hostname in cert".into()));

let req = request.into_inner();
info!("Registering service {:?}", req);
Expand Down
Loading

0 comments on commit b5ee0d7

Please sign in to comment.