Skip to content

Commit

Permalink
revert Transaction and AsyncTransaction function signatures update
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Feb 20, 2025
1 parent 3b68270 commit 5a6a9a9
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 76 deletions.
10 changes: 2 additions & 8 deletions refinery_core/src/drivers/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use std::convert::Infallible;
impl Transaction for Config {
type Error = Infallible;

fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
_queries: T,
) -> Result<usize, Self::Error> {
fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
Ok(0)
}
}
Expand All @@ -36,10 +33,7 @@ impl Query<Vec<Migration>> for Config {
impl AsyncTransaction for Config {
type Error = Infallible;

async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
&mut self,
_queries: T,
) -> Result<usize, Self::Error> {
async fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
Ok(0)
}
}
Expand Down
14 changes: 4 additions & 10 deletions refinery_core/src/drivers/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ fn query_applied_migrations(
impl Transaction for Conn {
type Error = MError;

fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let mut transaction = self.start_transaction(get_tx_opts())?;
let mut count = 0;
for query in queries {
for query in queries.iter() {
transaction.query_iter(query)?;
count += 1;
}
Expand All @@ -61,14 +58,11 @@ impl Transaction for Conn {
impl Transaction for PooledConn {
type Error = MError;

fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let mut transaction = self.start_transaction(get_tx_opts())?;
let mut count = 0;

for query in queries {
for query in queries.iter() {
transaction.query_iter(query)?;
count += 1;
}
Expand Down
7 changes: 2 additions & 5 deletions refinery_core/src/drivers/mysql_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,15 @@ async fn query_applied_migrations<'a>(
impl AsyncTransaction for Pool {
type Error = MError;

async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let mut conn = self.get_conn().await?;
let mut options = TxOpts::new();
options.with_isolation_level(Some(IsolationLevel::ReadCommitted));

let mut transaction = conn.start_transaction(options).await?;
let mut count = 0;
for query in queries {
transaction.query_drop(query).await?;
transaction.query_drop(*query).await?;
count += 1;
}
transaction.commit().await?;
Expand Down
7 changes: 2 additions & 5 deletions refinery_core/src/drivers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ fn query_applied_migrations(
impl Transaction for PgClient {
type Error = PgError;

fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let mut transaction = PgClient::transaction(self)?;
let mut count = 0;
for query in queries {
for query in queries.iter() {
PgTransaction::batch_execute(&mut transaction, query)?;
count += 1;
}
Expand Down
7 changes: 2 additions & 5 deletions refinery_core/src/drivers/rusqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ fn query_applied_migrations(

impl Transaction for RqlConnection {
type Error = RqlError;
fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let transaction = self.transaction()?;
let mut count = 0;
for query in queries {
for query in queries.iter() {
transaction.execute_batch(query)?;
count += 1;
}
Expand Down
7 changes: 2 additions & 5 deletions refinery_core/src/drivers/tiberius.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,13 @@ where
{
type Error = Error;

async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
// Tiberius doesn't support transactions, see https://github.com/prisma/tiberius/issues/28
self.simple_query("BEGIN TRAN T1;").await?;
let mut count = 0;
for query in queries {
// Drop the returning `QueryStream<'a>` to avoid compiler complaning regarding lifetimes
if let Err(err) = self.simple_query(query).await.map(drop) {
if let Err(err) = self.simple_query(*query).await.map(drop) {
if let Err(err) = self.simple_query("ROLLBACK TRAN T1").await {
log::error!("could not ROLLBACK transaction, {}", err);
}
Expand Down
5 changes: 1 addition & 4 deletions refinery_core/src/drivers/tokio_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ async fn query_applied_migrations(
impl AsyncTransaction for Client {
type Error = PgError;

async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
&mut self,
queries: T,
) -> Result<usize, Self::Error> {
async fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error> {
let transaction = self.transaction().await?;
let mut count = 0;
for query in queries {
Expand Down
28 changes: 10 additions & 18 deletions refinery_core/src/traits/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use std::string::ToString;
pub trait AsyncTransaction {
type Error: std::error::Error + Send + Sync + 'static;

async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
&mut self,
queries: T,
) -> Result<usize, Self::Error>;
async fn execute(&mut self, query: &[&str]) -> Result<usize, Self::Error>;
}

#[async_trait]
Expand Down Expand Up @@ -46,13 +43,10 @@ async fn migrate<T: AsyncTransaction>(
migration.set_applied();
let update_query = insert_migration_query(&migration, migration_table_name);
transaction
.execute(
[
migration.sql().as_ref().expect("sql must be Some!"),
update_query.as_str(),
]
.into_iter(),
)
.execute(&[
migration.sql().as_ref().expect("sql must be Some!"),
&update_query,
])
.await
.migration_err(
&format!("error applying migration {}", migration),
Expand Down Expand Up @@ -111,10 +105,10 @@ async fn migrate_grouped<T: AsyncTransaction>(
);
}

let refs = grouped_migrations.iter().map(AsRef::as_ref);
let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect();

transaction
.execute(refs)
.execute(refs.as_ref())
.await
.migration_err("error applying migrations", None)?;

Expand Down Expand Up @@ -172,11 +166,9 @@ where
target: Target,
migration_table_name: &str,
) -> Result<Report, Error> {
self.execute(
[Self::assert_migrations_table_query(migration_table_name).as_str()].into_iter(),
)
.await
.migration_err("error asserting migrations table", None)?;
self.execute(&[&Self::assert_migrations_table_query(migration_table_name)])
.await
.migration_err("error asserting migrations table", None)?;

let applied_migrations = self
.get_applied_migrations(migration_table_name)
Expand Down
27 changes: 11 additions & 16 deletions refinery_core/src/traits/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::ops::Deref;

use crate::error::WrapMigrationError;
use crate::traits::{
insert_migration_query, verify_migrations, ASSERT_MIGRATIONS_TABLE_QUERY,
Expand All @@ -10,10 +8,7 @@ use crate::{Error, Migration, Report, Target};
pub trait Transaction {
type Error: std::error::Error + Send + Sync + 'static;

fn execute<'a, T: Iterator<Item = &'a str>>(
&mut self,
queries: T,
) -> Result<usize, Self::Error>;
fn execute(&mut self, queries: &[&str]) -> Result<usize, Self::Error>;
}

pub trait Query<T>: Transaction {
Expand All @@ -25,7 +20,7 @@ pub fn migrate<T: Transaction>(
migrations: Vec<Migration>,
target: Target,
migration_table_name: &str,
grouped: bool,
batched: bool,
) -> Result<Report, Error> {
let mut migration_batch = Vec::new();
let mut applied_migrations = Vec::new();
Expand Down Expand Up @@ -54,7 +49,7 @@ pub fn migrate<T: Transaction>(
migration_batch.push(insert_migration);
}

match (target, grouped) {
match (target, batched) {
(Target::Fake | Target::FakeVersion(_), _) => {
log::info!("not going to apply any migration as fake flag is enabled");
}
Expand All @@ -73,14 +68,16 @@ pub fn migrate<T: Transaction>(
}
};

if grouped {
let refs: Vec<&str> = migration_batch.iter().map(AsRef::as_ref).collect();

if batched {
transaction
.execute(migration_batch.iter().map(Deref::deref))
.execute(refs.as_ref())
.migration_err("error applying migrations", None)?;
} else {
for (i, update) in migration_batch.into_iter().enumerate() {
for (i, update) in refs.iter().enumerate() {
transaction
.execute([update.as_str()].into_iter())
.execute(&[update])
.migration_err("error applying update", Some(&applied_migrations[0..i / 2]))?;
}
}
Expand Down Expand Up @@ -108,10 +105,8 @@ where
fn assert_migrations_table(&mut self, migration_table_name: &str) -> Result<usize, Error> {
// Needed cause some database vendors like Mssql have a non sql standard way of checking the migrations table,
// thou on this case it's just to be consistent with the async trait `AsyncMigrate`
self.execute(
[Self::assert_migrations_table_query(migration_table_name).as_str()].into_iter(),
)
.migration_err("error asserting migrations table", None)
self.execute(&[Self::assert_migrations_table_query(migration_table_name).as_str()])
.migration_err("error asserting migrations table", None)
}

fn get_last_applied_migration(
Expand Down

0 comments on commit 5a6a9a9

Please sign in to comment.