Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
Rigidity committed Jan 4, 2025
1 parent 5a43e29 commit 6f8f8da
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 185 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ chrono = "0.4.38"
glob = "0.3.1"
num-bigint = "0.4.6"
uuid = "1.11.0"
mime-sniffer = "0.1.3"

# Tracing
tracing = "0.1.40"
Expand Down
2 changes: 2 additions & 0 deletions crates/sage-api/src/records/nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ pub struct NftRecord {
pub struct NftData {
pub blob: Option<String>,
pub mime_type: Option<String>,
pub hash_matches: bool,
pub metadata_json: Option<String>,
pub metadata_hash_matches: bool,
}
2 changes: 1 addition & 1 deletion crates/sage-assets/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ workspace = true
[dependencies]
chia = { workspace = true }
clvmr = { workspace = true }
tokio = { workspace = true, features = ["time"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
num-bigint = { workspace = true, features = ["serde"] }
Expand All @@ -27,3 +26,4 @@ futures-lite = { workspace = true }
futures-util = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
mime-sniffer = { workspace = true }
Empty file removed crates/sage-assets/src/cats.rs
Empty file.
2 changes: 0 additions & 2 deletions crates/sage-assets/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
mod cats;
mod nfts;

pub use cats::*;
pub use nfts::*;
116 changes: 34 additions & 82 deletions crates/sage-assets/src/nfts/fetch_nft_uri.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::time::Duration;

use chia::protocol::Bytes32;
use clvmr::sha2::Sha256;
use futures_lite::StreamExt;
use futures_util::stream::FuturesUnordered;
use mime_sniffer::MimeTypeSniffer;
use reqwest::header::CONTENT_TYPE;
use thiserror::Error;
use tokio::time::timeout;
use tracing::debug;

#[derive(Debug, Clone)]
Expand All @@ -18,71 +16,44 @@ pub struct Data {

#[derive(Debug, Error)]
pub enum UriError {
#[error("Failed to fetch data for URI {0}: {1}")]
Fetch(String, reqwest::Error),

#[error("Timed out fetching data for URI {0}")]
FetchTimeout(String),

#[error("Missing or invalid content type for URI {0}")]
InvalidContentType(String),
#[error("Failed to fetch NFT data: {0}")]
Fetch(#[from] reqwest::Error),

#[error("Failed to stream response bytes for URI {0}: {1}")]
Stream(String, reqwest::Error),
#[error("Missing or invalid content type")]
InvalidContentType,

#[error("Timed out streaming response bytes for URI {0}")]
StreamTimeout(String),
#[error("Mime type mismatch, expected {expected} but found {found}")]
MimeTypeMismatch { expected: String, found: String },

#[error("Mime type mismatch for URI {uri}, expected {expected} but found {found}")]
MimeTypeMismatch {
uri: String,
expected: String,
found: String,
},

#[error("Hash mismatch for URI {uri}, expected {expected} but found {found}")]
HashMismatch {
uri: String,
expected: Bytes32,
found: Bytes32,
},
#[error("Hash mismatch, expected {expected} but found {found}")]
HashMismatch { expected: Bytes32, found: Bytes32 },

#[error("No URIs provided")]
NoUris,
}

pub async fn fetch_uri(
uri: &str,
request_timeout: Duration,
stream_timeout: Duration,
) -> Result<Data, UriError> {
let response = match timeout(request_timeout, reqwest::get(uri)).await {
Ok(Ok(response)) => response,
Ok(Err(error)) => {
return Err(UriError::Fetch(uri.to_string(), error));
}
Err(_) => {
return Err(UriError::FetchTimeout(uri.to_string()));
}
pub async fn fetch_uri(uri: String) -> Result<Data, UriError> {
let response = reqwest::get(uri).await?;

let mime_type = match response.headers().get(CONTENT_TYPE) {
Some(header) => Some(
header
.to_str()
.map(ToString::to_string)
.map_err(|_| UriError::InvalidContentType)?,
),
None => None,
};

let Some(mime_type) = response
.headers()
.get(CONTENT_TYPE)
.cloned()
.and_then(|value| value.to_str().map(ToString::to_string).ok())
else {
return Err(UriError::InvalidContentType(uri.to_string()));
};
let blob = response.bytes().await?.to_vec();

let blob = match timeout(stream_timeout, response.bytes()).await {
Ok(Ok(data)) => data.to_vec(),
Ok(Err(error)) => {
return Err(UriError::Stream(uri.to_string(), error));
}
Err(_) => {
return Err(UriError::StreamTimeout(uri.to_string()));
}
let mime_type = if let Some(mime_type) = mime_type {
mime_type
} else {
blob.as_slice()
.sniff_mime_type()
.unwrap_or("image/png")
.to_string()
};

let mut hasher = Sha256::new();
Expand All @@ -96,23 +67,16 @@ pub async fn fetch_uri(
})
}

pub async fn fetch_uris(
uris: Vec<String>,
request_timeout: Duration,
stream_timeout: Duration,
) -> Result<Data, UriError> {
pub async fn fetch_uris_without_hash(uris: Vec<String>) -> Result<Data, UriError> {
let mut futures = FuturesUnordered::new();

for uri in uris {
futures.push(async move {
let result = fetch_uri(&uri, request_timeout, stream_timeout).await;
(uri, result)
});
futures.push(fetch_uri(uri));
}

let mut data = None;

while let Some((uri, result)) = futures.next().await {
while let Some(result) = futures.next().await {
let item = result?;

let Some(old_data) = data.take() else {
Expand All @@ -122,41 +86,29 @@ pub async fn fetch_uris(

if old_data.hash != item.hash {
return Err(UriError::HashMismatch {
uri,
expected: old_data.hash,
found: item.hash,
});
}

if old_data.mime_type != item.mime_type {
return Err(UriError::MimeTypeMismatch {
uri,
expected: old_data.mime_type,
found: item.mime_type,
});
}

assert_eq!(old_data.blob, item.blob);

data = Some(item);
data = Some(old_data);
}

data.ok_or(UriError::NoUris)
}

pub async fn lookup_from_uris_with_hash(
uris: Vec<String>,
request_timeout: Duration,
stream_timeout: Duration,
hash: Bytes32,
) -> Option<Data> {
pub async fn fetch_uris_with_hash(uris: Vec<String>, hash: Bytes32) -> Option<Data> {
let mut futures = FuturesUnordered::new();

for uri in uris {
futures.push(async move {
let result = fetch_uri(&uri, request_timeout, stream_timeout).await;
(uri, result)
});
futures.push(async move { (uri.clone(), fetch_uri(uri).await) });
}

while let Some((uri, result)) = futures.next().await {
Expand Down
27 changes: 19 additions & 8 deletions crates/sage-database/src/primitives/nfts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
pub struct NftData {
pub blob: Vec<u8>,
pub mime_type: String,
pub hash_matches: bool,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -233,8 +234,13 @@ impl<'a> DatabaseTx<'a> {
insert_nft_uri(&mut *self.tx, uri, hash).await
}

pub async fn set_nft_uri_checked(&mut self, uri: String, hash: Bytes32) -> Result<()> {
set_nft_uri_checked(&mut *self.tx, uri, hash).await
pub async fn set_nft_uri_checked(
&mut self,
uri: String,
hash: Bytes32,
hash_matches: Option<bool>,
) -> Result<()> {
set_nft_uri_checked(&mut *self.tx, uri, hash, hash_matches).await
}

pub async fn insert_nft_data(&mut self, hash: Bytes32, nft_data: NftData) -> Result<()> {
Expand Down Expand Up @@ -515,10 +521,11 @@ async fn insert_nft_uri(conn: impl SqliteExecutor<'_>, uri: String, hash: Bytes3
let hash = hash.as_ref();

sqlx::query!(
"INSERT OR IGNORE INTO `nft_uris` (`hash`, `uri`, `checked`) VALUES (?, ?, ?)",
"INSERT OR IGNORE INTO `nft_uris` (`hash`, `uri`, `checked`, `hash_matches`) VALUES (?, ?, ?, ?)",
hash,
uri,
false
false,
None::<bool>
)
.execute(conn)
.await?;
Expand Down Expand Up @@ -550,11 +557,13 @@ async fn set_nft_uri_checked(
conn: impl SqliteExecutor<'_>,
uri: String,
hash: Bytes32,
hash_matches: Option<bool>,
) -> Result<()> {
let hash = hash.as_ref();

sqlx::query!(
"UPDATE `nft_uris` SET `checked` = 1 WHERE `hash` = ? AND `uri` = ?",
"UPDATE `nft_uris` SET `checked` = 1, `hash_matches` = ? WHERE `hash` = ? AND `uri` = ?",
hash_matches,
hash,
uri
)
Expand All @@ -574,10 +583,11 @@ async fn insert_nft_data(
let mime_type = nft_data.mime_type;

sqlx::query!(
"INSERT OR IGNORE INTO `nft_data` (`hash`, `data`, `mime_type`) VALUES (?, ?, ?)",
"REPLACE INTO `nft_data` (`hash`, `data`, `mime_type`, `hash_matches`) VALUES (?, ?, ?, ?)",
hash,
data,
mime_type
mime_type,
nft_data.hash_matches
)
.execute(conn)
.await?;
Expand All @@ -589,7 +599,7 @@ async fn fetch_nft_data(conn: impl SqliteExecutor<'_>, hash: Bytes32) -> Result<
let hash = hash.as_ref();

let row = sqlx::query!(
"SELECT `data`, `mime_type` FROM `nft_data` WHERE `hash` = ?",
"SELECT `data`, `mime_type`, `hash_matches` FROM `nft_data` WHERE `hash` = ?",
hash
)
.fetch_optional(conn)
Expand All @@ -598,6 +608,7 @@ async fn fetch_nft_data(conn: impl SqliteExecutor<'_>, hash: Bytes32) -> Result<
Ok(row.map(|row| NftData {
blob: row.data,
mime_type: row.mime_type,
hash_matches: row.hash_matches,
}))
}

Expand Down
Loading

0 comments on commit 6f8f8da

Please sign in to comment.