diff --git a/tee-worker/omni-executor/executor-core/src/native_task.rs b/tee-worker/omni-executor/executor-core/src/native_task.rs index 3d99c95ca3..c8269b2b12 100644 --- a/tee-worker/omni-executor/executor-core/src/native_task.rs +++ b/tee-worker/omni-executor/executor-core/src/native_task.rs @@ -57,6 +57,8 @@ pub enum NativeTask { MaybeGoogleCode, // google_code Option, // language ), + #[codec(index = 25)] + PumpxNotifyLimitOrderResult(Identity, u32, String, Option), } impl NativeTaskTrait for NativeTask { @@ -74,6 +76,7 @@ impl NativeTaskTrait for NativeTask { Self::PumpxAddWallet(sender, ..) => sender, Self::PumpxSignLimitOrder(sender, ..) => sender, Self::PumpxTransferWidthdraw(sender, ..) => sender, + Self::PumpxNotifyLimitOrderResult(sender, ..) => sender, } } diff --git a/tee-worker/omni-executor/native-task-handler/src/lib.rs b/tee-worker/omni-executor/native-task-handler/src/lib.rs index 04a2f744ea..b74e560b25 100644 --- a/tee-worker/omni-executor/native-task-handler/src/lib.rs +++ b/tee-worker/omni-executor/native-task-handler/src/lib.rs @@ -939,6 +939,48 @@ async fn handle_native_task< }, }; }, + NativeTask::PumpxNotifyLimitOrderResult(sender, intent_id, result, message) => { + if result != "ok" && result != "nok" { + send_error( + format!("Invalid result value: {}. Must be 'ok' or 'nok'", result), + response_sender, + NativeTaskError::PumpxApiError(PumpxApiError::InvalidInput), + ); + return; + } + + let execution_result = match result.as_str() { + "ok" => IntentCompletedDetail::Success, + "nok" => IntentCompletedDetail::Failure, + _ => unreachable!(), // Already validated above + }; + + if let Some(msg) = message { + log::info!("Limit order result message for intent_id {}: {}", intent_id, msg); + } + + let intent_executed_call = + parentchain_api_interface::tx().omni_account().intent_completed( + sender.to_omni_account().to_subxt_type(), + intent_id, + execution_result, + ); + + let tx = ctx.transaction_signer.sign(intent_executed_call).await; + + if rpc_client.submit_tx(&tx).await.is_err() { + send_error( + "Failed to submit tx".to_string(), + response_sender, + NativeTaskError::InternalError, + ); + ctx.transaction_signer.update_nonce().await; + return; + } + + send_ok(response_sender, NativeTaskOk::PumpxNotifyLimitOrderResult); + return; + }, }; match rpc_client.submit_and_watch_tx_until(&tx, XtStatus::Finalized).await { diff --git a/tee-worker/omni-executor/native-task-handler/src/types.rs b/tee-worker/omni-executor/native-task-handler/src/types.rs index 48fc634da0..256a3c142a 100644 --- a/tee-worker/omni-executor/native-task-handler/src/types.rs +++ b/tee-worker/omni-executor/native-task-handler/src/types.rs @@ -23,6 +23,7 @@ pub enum NativeTaskOk { PumpxAddWallet(AddWalletResponse), PumpxSignLimitOrder(Vec>), PumpxTransferWithdraw(SendTransferTxResponse), + PumpxNotifyLimitOrderResult, } #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] @@ -46,6 +47,7 @@ pub enum PumpxApiError { AddWalletFailed, CreateTransferUnsignedTxFailed, SendTransferTxFailed, + InvalidInput, } #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] diff --git a/tee-worker/omni-executor/rpc-server/src/error_code.rs b/tee-worker/omni-executor/rpc-server/src/error_code.rs index 524b71b442..e5af64186a 100644 --- a/tee-worker/omni-executor/rpc-server/src/error_code.rs +++ b/tee-worker/omni-executor/rpc-server/src/error_code.rs @@ -22,6 +22,7 @@ const PUMPX_API_ERROR_CODE: i32 = -32033; const PUMPX_API_ADD_WALLET_FAILED_CODE: i32 = -32034; const PUMPX_API_CREATE_TRANSFER_UNSIGNED_TX_FAILED_CODE: i32 = -32035; const PUMPX_API_SEND_TRANSFER_TX_FAILED_CODE: i32 = -32036; +const PUMPX_API_INVALID_INPUT_FAILED_CODE: i32 = -32037; const PUMPX_SIGNER_REQUEST_SIGNATURE_FAILED_CODE: i32 = -32050; @@ -47,6 +48,7 @@ pub fn get_native_task_error_code(error: &NativeTaskError) -> i32 { PUMPX_API_CREATE_TRANSFER_UNSIGNED_TX_FAILED_CODE }, PumpxApiError::SendTransferTxFailed => PUMPX_API_SEND_TRANSFER_TX_FAILED_CODE, + PumpxApiError::InvalidInput => PUMPX_API_INVALID_INPUT_FAILED_CODE, }, NativeTaskError::PumpxSignerError(signer_error) => match signer_error { PumpxSignerError::RequestSignatureFailed => PUMPX_SIGNER_REQUEST_SIGNATURE_FAILED_CODE, diff --git a/tee-worker/omni-executor/rpc-server/src/methods/pumpx/mod.rs b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/mod.rs index 44a08544e8..5aab10c8ce 100644 --- a/tee-worker/omni-executor/rpc-server/src/methods/pumpx/mod.rs +++ b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/mod.rs @@ -22,6 +22,9 @@ use get_next_intent_id::*; mod sign_limit_order; use sign_limit_order::*; +mod notify_limit_order_result; +use notify_limit_order_result::*; + pub fn register_pumpx(module: &mut RpcModule) { register_request_jwt(module); register_export_wallet(module); @@ -30,4 +33,5 @@ pub fn register_pumpx(module: &mut RpcModule) { register_submit_swap_order(module); register_get_next_intent_id(module); register_sign_limit_order_params(module); + register_notify_limit_order_result(module); } diff --git a/tee-worker/omni-executor/rpc-server/src/methods/pumpx/notify_limit_order_result.rs b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/notify_limit_order_result.rs new file mode 100644 index 0000000000..a40fc80676 --- /dev/null +++ b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/notify_limit_order_result.rs @@ -0,0 +1,103 @@ +use crate::{ + error_code::*, oneshot, server::RpcContext, verify_auth::verify_auth, Decode, Deserialize, + ErrorCode, +}; +use executor_core::native_task::*; +use executor_crypto::jwt; +use executor_primitives::{utils::hex::FromHexPrefixed, OmniAuth}; +use heima_authentication::auth_token::AuthTokenClaims; +use heima_primitives::{Address32, Identity}; +use jsonrpsee::{types::ErrorObject, RpcModule}; +use native_task_handler::{NativeTaskError, NativeTaskOk, NativeTaskResponse}; +use rsa::pkcs1::DecodeRsaPrivateKey; +use rsa::pkcs1::EncodeRsaPublicKey; +use rsa::RsaPrivateKey; + +#[derive(Debug, Deserialize)] +pub struct NotifyLimitOrderResultParams { + pub intent_id: u32, + pub result: String, + pub message: Option, + pub auth_token: String, +} + +pub fn register_notify_limit_order_result(module: &mut RpcModule) { + module + .register_async_method("pumpx_notifyLimitOrderResult", |params, ctx, _| async move { + let internal_error: ErrorObject = ErrorCode::InternalError.into(); + let params = params.parse::()?; + + let private_key = RsaPrivateKey::from_pkcs1_der(&ctx.jwt_rsa_private_key) + .map_err(|_| internal_error.clone())?; + let public_key = + private_key.to_public_key().to_pkcs1_der().map_err(|_| internal_error.clone())?; + + let Ok(token) = + jwt::decode::(¶ms.auth_token, public_key.as_bytes()) + else { + return Err(ErrorCode::ServerError(AUTH_VERIFICATION_FAILED_CODE).into()); + }; + if token.typ != "access" { + return Err(ErrorCode::ServerError(AUTH_VERIFICATION_FAILED_CODE).into()); + } + + let omni_account = token.sub; + let Ok(address) = Address32::from_hex(&omni_account) else { + log::error!("Not a valid address"); + return Err(internal_error); + }; + + let wrapper = NativeTaskWrapper { + task: NativeTask::PumpxNotifyLimitOrderResult( + Identity::Substrate(address), + params.intent_id, + params.result, + params.message, + ), + nonce: None, + auth: Some(OmniAuth::AuthToken(params.auth_token)), + }; + + if wrapper.task.require_auth() && verify_auth(ctx.clone(), &wrapper).await.is_err() { + return Err(ErrorCode::ServerError(AUTH_VERIFICATION_FAILED_CODE).into()); + } + + let (response_sender, response_receiver) = oneshot::channel(); + + if ctx.native_task_sender.send((wrapper, response_sender)).await.is_err() { + log::error!("Failed to send request to native call executor"); + return Err(ErrorCode::InternalError.into()); + } + + match response_receiver.await { + Ok(response) => { + let native_task_response: NativeTaskResponse = + Decode::decode(&mut response.as_slice()) + .map_err(|_| internal_error.clone())?; + match native_task_response { + Ok(NativeTaskOk::PumpxNotifyLimitOrderResult) => Ok(()), + Err(NativeTaskError::InternalError) => { + log::error!("Internal error in native task"); + Err(internal_error) + }, + Err(native_task_error) => { + log::error!("Native task error: {:?}", native_task_error); + Err(ErrorCode::ServerError(get_native_task_error_code( + &native_task_error, + )) + .into()) + }, + _ => { + log::error!("Unexpected response type"); + Err(internal_error) + }, + } + }, + Err(e) => { + log::error!("Failed to receive response from native call handler: {:?}", e); + Err(internal_error) + }, + } + }) + .expect("Failed to register pumpx_notifyLimitOrderResult method"); +} diff --git a/tee-worker/omni-executor/rpc-server/src/methods/pumpx/sign_limit_order.rs b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/sign_limit_order.rs index b39479f74e..976480e182 100644 --- a/tee-worker/omni-executor/rpc-server/src/methods/pumpx/sign_limit_order.rs +++ b/tee-worker/omni-executor/rpc-server/src/methods/pumpx/sign_limit_order.rs @@ -23,9 +23,11 @@ use rsa::pkcs1::EncodeRsaPublicKey; use crate::error_code::get_native_task_error_code; use crate::error_code::AUTH_VERIFICATION_FAILED_CODE; use crate::server::RpcContext; +use crate::verify_auth::verify_auth; use crate::ErrorCode; use ethers::types::Bytes; use executor_core::native_task::NativeTask; +use executor_core::native_task::NativeTaskTrait; use executor_core::native_task::NativeTaskWrapper; use executor_core::native_task::PumpxChainId; use executor_core::native_task::PumxWalletIndex; @@ -88,7 +90,7 @@ pub fn register_sign_limit_order_params(module: &mut RpcModule) { return Err(internal_error); }; - let task_wrapper = NativeTaskWrapper { + let wrapper = NativeTaskWrapper { task: NativeTask::PumpxSignLimitOrder( Identity::Substrate(address), params.chain_id, @@ -99,9 +101,13 @@ pub fn register_sign_limit_order_params(module: &mut RpcModule) { auth: Some(OmniAuth::AuthToken(params.auth_token)), }; + if wrapper.task.require_auth() && verify_auth(ctx.clone(), &wrapper).await.is_err() { + return Err(ErrorCode::ServerError(AUTH_VERIFICATION_FAILED_CODE).into()); + } + let (response_sender, response_receiver) = oneshot::channel(); - if ctx.native_task_sender.send((task_wrapper, response_sender)).await.is_err() { + if ctx.native_task_sender.send((wrapper, response_sender)).await.is_err() { log::error!("Failed to send request to native call executor"); return Err(internal_error); }