Skip to content

Commit

Permalink
refactor(backfiller): skip queueing a transaction that is already pro…
Browse files Browse the repository at this point in the history
…cessed at
  • Loading branch information
kespinola committed Dec 14, 2023
1 parent 07132c0 commit 57d42c0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
16 changes: 15 additions & 1 deletion migration/src/m20231208_103949_create_tree_transactions_table.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::{sea_orm::ConnectionTrait, prelude::*};
use sea_orm::Statement;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
manager
.create_table(
Table::create()
Expand All @@ -25,6 +27,14 @@ impl MigrationTrait for Migration {
)
.await?;

let stmt = Statement::from_sql_and_values(
manager.get_database_backend(),
r#"CREATE INDEX signature_processed_at_not_null_index ON tree_transactions (signature, processed_at) WHERE processed_at IS NOT NULL"#,
[]
);

db.execute(stmt).await?;

manager
.create_index(
Index::create()
Expand All @@ -38,6 +48,10 @@ impl MigrationTrait for Migration {
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(Index::drop().name("signature_processed_at_null_index").table(TreeTransactions::Table).to_owned())
.await?;

manager
.drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
.await?;
Expand Down
15 changes: 15 additions & 0 deletions tree_backfiller/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use borsh::BorshDeserialize;
use clap::Args;
use digital_asset_types::dao::tree_transactions;
use flatbuffers::FlatBufferBuilder;
use log::info;
use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;
use sea_orm::{
sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
Expand Down Expand Up @@ -132,6 +133,20 @@ impl TreeResponse {
let slot = i64::try_from(sig.slot)?;
let sig = Signature::from_str(&sig.signature)?;

let tree_transaction_processed = tree_transactions::Entity::find()
.filter(
tree_transactions::Column::Signature
.eq(sig.to_string())
.and(tree_transactions::Column::ProcessedAt.is_not_null()),
)
.one(&conn)
.await?;

if tree_transaction_processed.is_some() {
info!("skipping previously processed transaction {}", sig);
continue;
}

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(sig.to_string()),
tree: Set(self.pubkey.as_ref().to_vec()),
Expand Down

0 comments on commit 57d42c0

Please sign in to comment.