Skip to content

feat: improve documentation and error handling #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions wrappers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
//! Wrappers is a development framework for Postgres Foreign Data Wrappers (FDW).
//! This crate provides the core functionality and implementations for various FDWs.
//!
//! # Features
//! - Easy to implement FDW interface
//! - Support for rich data types
//! - Support both sync and async backends
//! - Built on top of pgrx

use pgrx::prelude::*;

pg_module_magic!();

extension_sql_file!("../sql/bootstrap.sql", bootstrap);
extension_sql_file!("../sql/finalize.sql", finalize);

mod fdw;
mod stats;
/// FDW implementations for various data sources
pub mod fdw;
/// Statistics collection and reporting utilities
pub mod stats;

#[cfg(test)]
pub mod pg_test {
Expand Down
116 changes: 87 additions & 29 deletions wrappers/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
//! Statistics collection module for Foreign Data Wrappers.
//! This module provides functionality to track and report various metrics
//! about FDW usage and performance.

use pgrx::{prelude::*, JsonB};
use std::fmt;
use supabase_wrappers::prelude::report_warning;

// fdw stats table name
const FDW_STATS_TABLE: &str = "wrappers_fdw_stats";
/// The name of the table storing FDW statistics
const WRAPPERS_STATS_TABLE_NAME: &str = "wrappers_fdw_stats";

// metric list
/// Metrics that can be collected for FDWs
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub(crate) enum Metric {
/// Number of times the FDW has been created
CreateTimes,
/// Number of rows read from the foreign source
RowsIn,
/// Number of rows written to the foreign source
RowsOut,
/// Number of bytes read from the foreign source
BytesIn,
/// Number of bytes written to the foreign source
BytesOut,
}

Expand All @@ -28,88 +37,137 @@ impl fmt::Display for Metric {
}
}

// get stats table full qualified name
fn get_stats_table() -> String {
/// Returns the fully qualified name of the statistics table
fn get_stats_table() -> Result<String, &'static str> {
let sql = format!(
"select b.nspname || '.{}'
from pg_catalog.pg_extension a join pg_namespace b on a.extnamespace = b.oid
where a.extname = 'wrappers'",
FDW_STATS_TABLE
WRAPPERS_STATS_TABLE_NAME
);

Spi::get_one(&sql)
.expect("wrappers extension should be installed")
.expect("fdw stats table should be created")
.map_err(|_| "wrappers extension is not installed")?
.ok_or("fdw stats table is not created")
}

/// Checks if the current transaction is read-only
fn is_txn_read_only() -> bool {
Spi::get_one("show transaction_read_only") == Ok(Some("on"))
}

// increase stats value
/// Increments a specific metric for the given FDW
///
/// # Arguments
/// * `fdw_name` - Name of the FDW
/// * `metric` - The metric to increment
/// * `inc` - The increment value
///
/// # Note
/// This function is a no-op in read-only transactions
#[allow(dead_code)]
pub(crate) fn inc_stats(fdw_name: &str, metric: Metric, inc: i64) {
if is_txn_read_only() {
return;
}

let stats_table = match get_stats_table() {
Ok(table) => table,
Err(e) => {
report_warning(&format!("Failed to get stats table: {}", e));
return;
}
};

let sql = format!(
"insert into {} as s (fdw_name, {}) values($1, $2)
on conflict(fdw_name)
do update set
{} = coalesce(s.{}, 0) + excluded.{},
updated_at = timezone('utc'::text, now())",
get_stats_table(),
metric,
metric,
metric,
metric
stats_table, metric, metric, metric, metric
);
Spi::run_with_args(

if let Err(e) = Spi::run_with_args(
&sql,
Some(vec![
(PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()),
(PgBuiltInOids::INT8OID.oid(), inc.into_datum()),
]),
)
.expect("should insert into fdw stats table");
) {
report_warning(&format!("Failed to increment stats: {}", e));
}
}

// get metadata
/// Retrieves metadata for the specified FDW
///
/// # Arguments
/// * `fdw_name` - Name of the FDW
///
/// # Returns
/// Optional JSON metadata associated with the FDW
#[allow(dead_code)]
pub(crate) fn get_metadata(fdw_name: &str) -> Option<JsonB> {
let sql = format!(
"select metadata from {} where fdw_name = $1",
get_stats_table()
);
Spi::get_one_with_args(
let stats_table = match get_stats_table() {
Ok(table) => table,
Err(e) => {
report_warning(&format!("Failed to get stats table: {}", e));
return None;
}
};

let sql = format!("select metadata from {} where fdw_name = $1", stats_table);

match Spi::get_one_with_args(
&sql,
vec![(PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum())],
)
.unwrap_or_default()
) {
Ok(metadata) => metadata,
Err(e) => {
report_warning(&format!("Failed to get metadata: {}", e));
None
}
}
}

// set metadata
/// Sets metadata for the specified FDW
///
/// # Arguments
/// * `fdw_name` - Name of the FDW
/// * `metadata` - JSON metadata to associate with the FDW
///
/// # Note
/// This function is a no-op in read-only transactions
#[allow(dead_code)]
pub(crate) fn set_metadata(fdw_name: &str, metadata: Option<JsonB>) {
if is_txn_read_only() {
return;
}

let stats_table = match get_stats_table() {
Ok(table) => table,
Err(e) => {
report_warning(&format!("Failed to get stats table: {}", e));
return;
}
};

let sql = format!(
"insert into {} as s (fdw_name, metadata) values($1, $2)
on conflict(fdw_name)
do update set
metadata = $2,
updated_at = timezone('utc'::text, now())",
get_stats_table()
stats_table
);

if let Err(err) = Spi::run_with_args(
&sql,
Some(vec![
(PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()),
(PgBuiltInOids::JSONBOID.oid(), metadata.into_datum()),
]),
) {
report_warning(&format!("set fdw stats metadata failed: {}", err));
};
report_warning(&format!("Failed to set metadata: {}", err));
}
}
Loading