Skip to content

Commit

Permalink
refactor: revert db changes and reimplement skip_metdata_json_downloa…
Browse files Browse the repository at this point in the history
…d logic
  • Loading branch information
Nagaprasadvr committed Jan 17, 2025
1 parent 91c1ada commit 5bcb9fe
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 232 deletions.
30 changes: 17 additions & 13 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use {
backon::{ExponentialBuilder, Retryable},
clap::Parser,
digital_asset_types::dao::{asset_data, sea_orm_active_enums::LastRequestedStatusCode},
digital_asset_types::dao::asset_data::{self},
futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt},
indicatif::HumanDuration,
log::{debug, error},
reqwest::{Client, Url as ReqwestUrl},
sea_orm::{entity::*, SqlxPostgresConnector},
sea_orm::{entity::*, ConnectionTrait, SqlxPostgresConnector, TransactionTrait},
serde::{Deserialize, Serialize},
tokio::{
sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender},
Expand All @@ -17,9 +17,9 @@ use {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DownloadMetadataInfo {
pub asset_data_id: Vec<u8>,
pub uri: String,
pub slot: i64,
asset_data_id: Vec<u8>,
uri: String,
slot: i64,
}

impl DownloadMetadataInfo {
Expand Down Expand Up @@ -206,17 +206,12 @@ pub async fn perform_metadata_json_task(
download_metadata_info: &DownloadMetadataInfo,
) -> Result<asset_data::Model, MetadataJsonTaskError> {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let start = Instant::now();
let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await;
let time_elapsed = start.elapsed().as_millis() as u64;
match fetch_res {
match fetch_metadata_json(client, &download_metadata_info.uri).await {
Ok(metadata) => {
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
metadata: Set(metadata),
reindex: Set(Some(false)),
last_requested_status_code: Set(Some(LastRequestedStatusCode::Success)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

Expand All @@ -228,8 +223,6 @@ pub async fn perform_metadata_json_task(
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
reindex: Set(Some(true)),
last_requested_status_code: Set(Some(LastRequestedStatusCode::Failure)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

Expand Down Expand Up @@ -263,3 +256,14 @@ impl DownloadMetadata {
.map(|_| ())
}
}

pub async fn skip_metadata_json_download<T>(asset_data_id: &[u8], uri: &str, conn: &T) -> bool
where
T: ConnectionTrait + TransactionTrait,
{
asset_data::Entity::find_by_id(asset_data_id.to_vec())
.one(conn)
.await
.unwrap_or(None)
.is_some_and(|model| model.metadata_url.eq(uri))
}
7 changes: 0 additions & 7 deletions digital_asset_types/src/dao/generated/asset_data.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3
use super::sea_orm_active_enums::ChainMutability;
use super::sea_orm_active_enums::LastRequestedStatusCode;
use super::sea_orm_active_enums::Mutability;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
Expand All @@ -28,8 +27,6 @@ pub struct Model {
pub raw_name: Option<Vec<u8>>,
pub raw_symbol: Option<Vec<u8>>,
pub base_info_seq: Option<i64>,
pub fetch_duration_in_ms: Option<u64>,
pub last_requested_status_code: Option<LastRequestedStatusCode>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
Expand All @@ -45,8 +42,6 @@ pub enum Column {
RawName,
RawSymbol,
BaseInfoSeq,
FetchDurationInMs,
LastRequestedStatusCode,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
Expand Down Expand Up @@ -79,8 +74,6 @@ impl ColumnTrait for Column {
Self::RawName => ColumnType::Binary.def().null(),
Self::RawSymbol => ColumnType::Binary.def().null(),
Self::BaseInfoSeq => ColumnType::BigInteger.def().null(),
Self::FetchDurationInMs => ColumnType::Unsigned.def().null(),
Self::LastRequestedStatusCode => LastRequestedStatusCode::db_type().null(),
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,3 @@ pub enum Instruction {
#[sea_orm(string_value = "verify_creator")]
VerifyCreator,
}

#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(
rs_type = "String",
db_type = "Enum",
enum_name = "last_requested_status_code"
)]
pub enum LastRequestedStatusCode {
#[sea_orm(string_value = "success")]
Success,
#[sea_orm(string_value = "failure")]
Failure,
}
2 changes: 0 additions & 2 deletions digital_asset_types/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ pub fn create_asset_data(
raw_name: Some(metadata.name.into_bytes().to_vec().clone()),
raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()),
base_info_seq: Some(0),
fetch_duration_in_ms: None,
last_requested_status_code: None,
},
)
}
Expand Down
2 changes: 0 additions & 2 deletions digital_asset_types/tests/json_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content {
raw_name: Some(String::from("Handalf").into_bytes().to_vec()),
raw_symbol: Some(String::from("").into_bytes().to_vec()),
base_info_seq: Some(0),
fetch_duration_in_ms: None,
last_requested_status_code: None,
};

v1_content_from_json(&asset_data).unwrap()
Expand Down
2 changes: 0 additions & 2 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ mod m20240319_120101_add_mpl_core_enum_vals;
mod m20240320_120101_add_mpl_core_info_items;
mod m20240520_120101_add_mpl_core_external_plugins_columns;
mod m20240718_161232_change_supply_columns_to_numeric;
mod m20241104_093312_add_metadata_json_fetch_heuristics_columns;
mod m20241119_060310_add_token_inscription_enum_variant;

pub mod model;
Expand Down Expand Up @@ -99,7 +98,6 @@ impl MigratorTrait for Migrator {
Box::new(m20240320_120101_add_mpl_core_info_items::Migration),
Box::new(m20240520_120101_add_mpl_core_external_plugins_columns::Migration),
Box::new(m20240718_161232_change_supply_columns_to_numeric::Migration),
Box::new(m20241104_093312_add_metadata_json_fetch_heuristics_columns::Migration),
Box::new(m20241119_060310_add_token_inscription_enum_variant::Migration),
]
}
Expand Down

This file was deleted.

2 changes: 0 additions & 2 deletions migration/src/model/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ pub enum AssetData {
RawName,
RawSymbol,
BaseInfoSeq,
FetchDurationInMs,
LastRequestedStatusCode,
}

#[derive(Copy, Clone, Iden)]
Expand Down
63 changes: 36 additions & 27 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::error::{ProgramTransformerError, ProgramTransformerResult},
das_core::DownloadMetadataInfo,
das_core::{skip_metadata_json_download, DownloadMetadataInfo},
digital_asset_types::dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items,
sea_orm_active_enums::{
Expand Down Expand Up @@ -378,36 +378,45 @@ pub async fn upsert_asset_data<T>(
where
T: ConnectionTrait + TransactionTrait,
{
let model = asset_data::ActiveModel {
let skip_metadata_json_download = skip_metadata_json_download(&id, &metadata_url, txn).await;

let mut model = asset_data::ActiveModel {
id: ActiveValue::Set(id.clone()),
chain_data_mutability: ActiveValue::Set(chain_data_mutability),
chain_data: ActiveValue::Set(chain_data),
metadata_url: ActiveValue::Set(metadata_url.clone()),
metadata_mutability: ActiveValue::Set(metadata_mutability),
metadata: ActiveValue::Set(JsonValue::String("processing".to_string())),
slot_updated: ActiveValue::Set(slot_updated),
reindex: ActiveValue::Set(Some(true)),
raw_name: ActiveValue::Set(Some(raw_name)),
raw_symbol: ActiveValue::Set(Some(raw_symbol)),
base_info_seq: ActiveValue::Set(Some(seq)),
..Default::default()
};

let mut columns_to_update = vec![
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
];
if !skip_metadata_json_download {
model.metadata_url = ActiveValue::Set(metadata_url.clone());
model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string()));
model.reindex = ActiveValue::Set(Some(true));

columns_to_update.extend_from_slice(&[
asset_data::Column::MetadataUrl,
asset_data::Column::Metadata,
asset_data::Column::Reindex,
]);
}

let mut query = asset_data::Entity::insert(model)
.on_conflict(
OnConflict::columns([asset_data::Column::Id])
.update_columns([
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataUrl,
asset_data::Column::MetadataMutability,
asset_data::Column::Metadata,
asset_data::Column::SlotUpdated,
asset_data::Column::Reindex,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
])
.update_columns(columns_to_update)
.to_owned(),
)
.build(DbBackend::Postgres);
Expand All @@ -421,20 +430,20 @@ where
query.sql
);

let result = txn
.execute(query)
txn.execute(query)
.await
.map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?;

if result.rows_affected() > 0 {
Ok(Some(DownloadMetadataInfo::new(
id,
metadata_url,
slot_updated,
)))
} else {
Ok(None)
// If the metadata JSON already exists, skip the download.
if skip_metadata_json_download {
return Ok(None);
}

Ok(Some(DownloadMetadataInfo::new(
id,
metadata_url,
slot_updated,
)))
}

#[allow(clippy::too_many_arguments)]
Expand Down
8 changes: 1 addition & 7 deletions program_transformers/src/bubblegum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
error::{ProgramTransformerError, ProgramTransformerResult},
skip_metadata_json_download, DownloadMetadataNotifier,
DownloadMetadataNotifier,
},
blockbuster::{
instruction::InstructionBundle,
Expand Down Expand Up @@ -72,9 +72,6 @@ where
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? {
if skip_metadata_json_download(&info, txn).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down Expand Up @@ -102,9 +99,6 @@ where
if let Some(info) =
update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await?
{
if skip_metadata_json_download(&info, txn).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down
Loading

0 comments on commit 5bcb9fe

Please sign in to comment.