Skip to content

Commit

Permalink
Merge pull request MaterializeInc#2000 from benesch/catalog-errors
Browse files Browse the repository at this point in the history
catalog: use structured errors
  • Loading branch information
benesch authored Feb 19, 2020
2 parents 9c21e8b + bfc6bec commit ae2d0f1
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 94 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ publish = false
path = "lib.rs"

[dependencies]
backtrace = "0.3.43"
bincode = { version = "1.2", optional = true }
dataflow-types = { path = "../dataflow-types" }
expr = { path = "../expr" }
Expand Down
99 changes: 99 additions & 0 deletions src/catalog/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Materialize, Inc. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt;

use backtrace::Backtrace;

#[derive(Debug)]
pub struct Error {
kind: ErrorKind,
backtrace: Backtrace,
}

#[derive(Debug)]
pub(crate) enum ErrorKind {
Corruption { detail: String },
IdExhaustion,
UnknownDatabase(String),
UnknownSchema(String),
UnknownItem(String),
DatabaseAlreadyExists(String),
SchemaAlreadyExists(String),
ItemAlreadyExists(String),
UnacceptableSchemaName(String),
ReadOnlySystemSchema(String),
UnsatisfiableLoggingDependency { depender_name: String },
Storage(rusqlite::Error),
}

impl Error {
pub(crate) fn new(kind: ErrorKind) -> Error {
Error {
kind,
backtrace: Backtrace::new_unresolved(),
}
}
}

impl From<rusqlite::Error> for Error {
fn from(e: rusqlite::Error) -> Error {
Error::new(ErrorKind::Storage(e))
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match &self.kind {
ErrorKind::Corruption { .. }
| ErrorKind::IdExhaustion
| ErrorKind::UnknownDatabase(_)
| ErrorKind::UnknownSchema(_)
| ErrorKind::UnknownItem(_)
| ErrorKind::DatabaseAlreadyExists(_)
| ErrorKind::SchemaAlreadyExists(_)
| ErrorKind::ItemAlreadyExists(_)
| ErrorKind::UnacceptableSchemaName(_)
| ErrorKind::ReadOnlySystemSchema(_)
| ErrorKind::UnsatisfiableLoggingDependency { .. } => None,
ErrorKind::Storage(e) => Some(e),
}
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self.kind {
ErrorKind::Corruption { detail } => write!(f, "corrupt catalog: {}", detail),
ErrorKind::IdExhaustion => write!(f, "id counter overflows i64"),
ErrorKind::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
ErrorKind::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
ErrorKind::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
ErrorKind::DatabaseAlreadyExists(name) => {
write!(f, "database '{}' already exists", name)
}
ErrorKind::SchemaAlreadyExists(name) => write!(f, "schema '{}' already exists", name),
ErrorKind::ItemAlreadyExists(name) => {
write!(f, "catalog item '{}' already exists", name)
}
ErrorKind::UnacceptableSchemaName(name) => {
write!(f, "unacceptable schema name '{}'", name)
}
ErrorKind::ReadOnlySystemSchema(name) => {
write!(f, "system schema '{}' cannot be modified", name)
}
ErrorKind::UnsatisfiableLoggingDependency { depender_name } => write!(
f,
"catalog item '{}' depends on system logging, but logging is disabled",
depender_name
),
ErrorKind::Storage(e) => write!(f, "sqlite error: {}", e),
}
}
}
48 changes: 28 additions & 20 deletions src/catalog/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ use dataflow_types::{SinkConnector, SourceConnector};
use expr::{EvalEnv, GlobalId, Id, IdHumanizer, OptimizedRelationExpr, ScalarExpr};
use repr::RelationDesc;

use crate::error::{Error, ErrorKind};
use crate::names::{DatabaseSpecifier, FullName, PartialName};

mod error;
pub mod names;

pub mod sql;

/// A `Catalog` keeps track of the SQL objects known to the planner.
Expand Down Expand Up @@ -205,7 +206,7 @@ impl Catalog {
/// Opens or creates a `Catalog` that stores data at `path`. The
/// `initialize` callback will be invoked after database and schemas are
/// loaded but before any persisted user items are loaded.
pub fn open<S, F>(path: Option<&Path>, f: F) -> Result<Catalog, failure::Error>
pub fn open<S, F>(path: Option<&Path>, f: F) -> Result<Catalog, Error>
where
S: CatalogItemSerializer,
F: FnOnce(&mut Self),
Expand Down Expand Up @@ -267,15 +268,20 @@ impl Catalog {
// safely even if the error message we're sniffing out changes.
lazy_static! {
static ref LOGGING_ERROR: Regex =
Regex::new("catalog item 'mz_catalog.[^']*' does not exist").unwrap();
Regex::new("unknown catalog item 'mz_catalog.[^']*'").unwrap();
}
let item = match S::deserialize(&catalog, def) {
Ok(item) => item,
Err(e) if LOGGING_ERROR.is_match(&e.to_string()) => bail!(
"catalog item '{}' depends on system logging, but logging is disabled",
name
),
Err(e) => bail!("corrupt catalog: failed to deserialize item: {}", e),
Err(e) if LOGGING_ERROR.is_match(&e.to_string()) => {
return Err(Error::new(ErrorKind::UnsatisfiableLoggingDependency {
depender_name: name.to_string(),
}));
}
Err(e) => {
return Err(Error::new(ErrorKind::Corruption {
detail: format!("failed to deserialize item: {}", e),
}))
}
};
catalog.insert_item(id, name, item);
}
Expand All @@ -301,7 +307,7 @@ impl Catalog {
self.storage.clone()
}

pub fn allocate_id(&mut self) -> Result<GlobalId, failure::Error> {
pub fn allocate_id(&mut self) -> Result<GlobalId, Error> {
self.storage().allocate_id()
}

Expand All @@ -315,7 +321,7 @@ impl Catalog {
current_database: DatabaseSpecifier,
search_path: &[&str],
name: &PartialName,
) -> Result<FullName, failure::Error> {
) -> Result<FullName, Error> {
if let (Some(database_name), Some(schema_name)) = (&name.database, &name.schema) {
// `name` is fully specified already. No resolution required.
return Ok(FullName {
Expand Down Expand Up @@ -350,7 +356,7 @@ impl Catalog {
}
}

bail!("catalog item '{}' does not exist", name);
Err(Error::new(ErrorKind::UnknownItem(name.to_string())))
}

/// Returns the named catalog item, if it exists.
Expand All @@ -366,9 +372,9 @@ impl Catalog {
/// Returns the named catalog item, or an error if it does not exist.
///
/// See also [`Catalog::try_get`].
pub fn get(&self, name: &FullName) -> Result<&CatalogEntry, failure::Error> {
pub fn get(&self, name: &FullName) -> Result<&CatalogEntry, Error> {
self.try_get(name)
.ok_or_else(|| failure::err_msg(format!("catalog item '{}' does not exist", name)))
.ok_or_else(|| Error::new(ErrorKind::UnknownItem(name.to_string())))
}

pub fn get_by_id(&self, id: &GlobalId) -> &CatalogEntry {
Expand All @@ -383,7 +389,7 @@ impl Catalog {
pub fn database_resolver<'a>(
&'a self,
database_spec: DatabaseSpecifier,
) -> Result<DatabaseResolver<'a>, failure::Error> {
) -> Result<DatabaseResolver<'a>, Error> {
match &database_spec {
DatabaseSpecifier::Ambient => Ok(DatabaseResolver {
database_spec,
Expand All @@ -396,7 +402,7 @@ impl Catalog {
database,
ambient_schemas: &self.ambient_schemas,
}),
None => bail!("unknown database '{}'", name),
None => Err(Error::new(ErrorKind::UnknownDatabase(name.to_owned()))),
},
}
}
Expand Down Expand Up @@ -526,7 +532,7 @@ impl Catalog {
}
}

pub fn transact(&mut self, ops: Vec<Op>) -> Result<Vec<OpStatus>, failure::Error> {
pub fn transact(&mut self, ops: Vec<Op>) -> Result<Vec<OpStatus>, Error> {
trace!("transact: {:?}", ops);

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -569,12 +575,12 @@ impl Catalog {
schema_name,
} => {
if schema_name.starts_with("mz_") || schema_name.starts_with("pg_") {
bail!("unacceptable schema name '{}'", schema_name);
return Err(Error::new(ErrorKind::UnacceptableSchemaName(schema_name)));
}
let (database_id, database_name) = match database_name {
DatabaseSpecifier::Name(name) => (tx.load_database_id(&name)?, name),
DatabaseSpecifier::Ambient => {
bail!("writing to {} is not allowed", schema_name)
return Err(Error::new(ErrorKind::ReadOnlySystemSchema(schema_name)));
}
};
Action::CreateSchema {
Expand All @@ -587,7 +593,9 @@ impl Catalog {
let database_id = match &name.database {
DatabaseSpecifier::Name(name) => tx.load_database_id(&name)?,
DatabaseSpecifier::Ambient => {
bail!("writing to {} is not allowed", name.schema)
return Err(Error::new(ErrorKind::ReadOnlySystemSchema(
name.to_string(),
)));
}
};
let schema_id = tx.load_schema_id(database_id, &name.schema)?;
Expand All @@ -606,7 +614,7 @@ impl Catalog {
let (database_id, database_name) = match database_name {
DatabaseSpecifier::Name(name) => (tx.load_database_id(&name)?, name),
DatabaseSpecifier::Ambient => {
bail!("dropping {} is not allowed", schema_name)
return Err(Error::new(ErrorKind::ReadOnlySystemSchema(schema_name)));
}
};
tx.remove_schema(database_id, &schema_name)?;
Expand Down
Loading

0 comments on commit ae2d0f1

Please sign in to comment.