Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
feat: cron track deal
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 29, 2023
1 parent d65fa26 commit 23de3fc
Show file tree
Hide file tree
Showing 28 changed files with 1,203 additions and 56 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"@ucanto/client": "8.0.0",
"@ucanto/principal": "8.0.0",
"@ucanto/transport": "8.0.0",
"@web3-storage/data-segment": "^3.0.1",
"@web3-storage/filecoin-api": "^1.4.3",
"@web3-storage/filecoin-client": "1.3.0",
"sst": "^2.8.3",
Expand Down
2 changes: 2 additions & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"@aws-sdk/util-dynamodb": "^3.398.0",
"@ipld/dag-json": "^10.1.3",
"@ipld/dag-ucan": "3.3.2",
"@ucanto/client": "^8.0.1",
"@ucanto/interface": "^8.0.0",
"@ucanto/principal": "^8.0.0",
"@ucanto/server": "^8.0.2",
Expand All @@ -30,6 +31,7 @@
"ava": "^5.3.1",
"delay": "^6.0.0",
"nanoid": "^4.0.2",
"p-defer": "^4.0.0",
"p-wait-for": "^5.0.2",
"sqs-consumer": "^7.2.2",
"sst": "^2.8.3",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/data/deal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export enum Status {
}
export type UnixTime = number

// todo: proof
export type Deal = {
aggregate: PieceLink
storefront: string
Expand Down
23 changes: 23 additions & 0 deletions packages/core/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export const DatabaseOperationErrorName = /** @type {const} */ (
'DatabaseOperationFailed'
)
export class DatabaseOperationFailed extends Error {
get reason() {
return this.message
}
get name() {
return DatabaseOperationErrorName
}
}

export const UnexpectedDealForApprovalErrorName = /** @type {const} */ (
'UnexpectedDealForApprovalFailed'
)
export class UnexpectedDealForApprovalFailed extends Error {
get reason() {
return this.message
}
get name() {
return UnexpectedDealForApprovalErrorName
}
}
2 changes: 1 addition & 1 deletion packages/core/src/queue/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export function createQueueClient <Record> (conf: Target, context: QueueContext<

let encodedMessage: string
try {
encodedMessage = await context.encodeMessage(record, encodedKey)
encodedMessage = context.encodeMessage(record, encodedKey)
} catch (error: any) {
return {
error: new EncodeRecordFailed(error.message)
Expand Down
31 changes: 30 additions & 1 deletion packages/core/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import * as ed25519 from '@ucanto/principal/ed25519'
import * as Server from '@ucanto/server'
import * as DID from '@ipld/dag-ucan/did'
import { Signer } from '@ucanto/interface'
import { CAR } from '@ucanto/transport'
import { CAR, HTTP } from '@ucanto/transport'
import { connect } from '@ucanto/client'

import { createService } from '@web3-storage/filecoin-api/dealer'

Expand All @@ -29,6 +30,34 @@ export function getServiceSigner(config: ServiceSignerCtx) {
return signer
}

/**
*
* @param {{ did: string, url: string }} config
* @returns
*/
export function getServiceConnection (config: ServiceConnectionCtx) {
const servicePrincipal = DID.parse(config.did) // 'did:web:tracker.web3.storage'
const serviceURL = new URL(config.url) // 'https://tracker.web3.storage'

const serviceConnection = connect({
id: servicePrincipal,
codec: CAR.outbound,
channel: HTTP.open({
url: serviceURL,
method: 'POST',
}) as ed25519.Channel<Record<string, any>>,
})

return serviceConnection
}

export type ServiceConnectionCtx = {
// url of deployed service
url: string
// public DID for the service (did:key:... derived from PRIVATE_KEY if not set)
did: string
}

export type ServiceSignerCtx = {
// multiformats private key of primary signing key
privateKey: string
Expand Down
67 changes: 67 additions & 0 deletions packages/core/src/workflow/aggregate-accept.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { ConnectionView } from '@ucanto/principal/ed25519'
import { Store } from '@web3-storage/filecoin-api/types'
import { InvocationConfig } from '@web3-storage/filecoin-client/types'
import { Dealer } from '@web3-storage/filecoin-client'

import { Deal, Status } from '../data/deal.js'
import { DealerMessageRecord } from '@web3-storage/filecoin-api/types'
import { UnexpectedDealForApprovalFailed } from '../errors.js'

/**
* Invokes `aggregate/accept` capability to self sign receipt.
* Get `pieces` in the aggregate and invoke `aggregate/accept`.
*/
export async function aggregateAccept ({
deal,
dealerServiceConnection,
dealerInvocationConfig,
offerStore
}: AggregateAcceptContext) {
// Deal should only get to here if it was approved
if (deal.stat !== Status.Approved) {
return {
error: new UnexpectedDealForApprovalFailed(
`aggregate ${deal.aggregate.link.toString()} has state ${deal.stat}`
)
}
}

// Get pieces included in this aggregate
const offer = await offerStore.get(deal)
if (offer.error) {
return {
error: offer.error
}
}

// Invoke `aggregate/accept` capability to issue self signed receipt
const dealAddResponse = await Dealer.dealAdd(
dealerInvocationConfig,
deal.aggregate,
offer.ok.pieces,
dealerInvocationConfig.issuer.did(),
'random-label-to-deprecate',
{ connection: dealerServiceConnection }
)

if (dealAddResponse.out.error) {
return {
error: dealAddResponse.out.error
}
}

// TODO: Add to deal queue trigger
// is this `filecoinDealQueue`?

return {
ok: {},
error: undefined,
}
}

export interface AggregateAcceptContext {
deal: Deal
dealerServiceConnection: ConnectionView<any>
dealerInvocationConfig: InvocationConfig
offerStore: Store<DealerMessageRecord>
}
165 changes: 165 additions & 0 deletions packages/core/src/workflow/deal-track.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { DynamoDBClient, QueryCommand, UpdateItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
import { ConnectionView } from '@ucanto/principal/ed25519'
import { Chain } from '@web3-storage/filecoin-client'
import { InvocationConfig } from '@web3-storage/filecoin-client/types'

import { Status, Deal, decode as decodeDeal } from '../data/deal.js'
import { DatabaseOperationFailed } from '../errors.js'

/**
* Track filecoin deals Job.
* Find `OFFERED` deals pending approval and find out if these deals are on chain.
* If `OFFERED` deals are now on chain, update them to `APPROVED`.
*/
export async function dealTrack ({
tableName,
tableClient,
dealTrackerServiceConnection,
dealTrackerInvocationConfig
}: DealTrackContext) {
// Get offered deals pending approval/rejection
const offeredDeals = await getOfferedDeals({ tableName, tableClient })
if (offeredDeals.error) {
return offeredDeals
}

// Update approved deals from the ones resolved
const updatedResponses = await Promise.all(
offeredDeals.ok.map(deal => updateApprovedDeals({
deal,
tableName,
tableClient,
dealTrackerServiceConnection,
dealTrackerInvocationConfig
}))
)

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find(r => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error
}
}

// Return successful update operation
// Include in response the ones that were Updated, and the ones still pending response.
// TODO: this response body can probably be used to flag deals that we are waiting for response for too late if we add details.
const updatedDealsCount = updatedResponses.filter(r => r.ok?.updated).length
return {
ok: {
updatedCount: updatedDealsCount,
pendingCount: updatedResponses.length - updatedDealsCount
},
error: undefined
}
}

/**
* Find out if deal is on chain.
* When on chain, updates its stat in store.
*/
async function updateApprovedDeals ({
deal,
dealTrackerServiceConnection,
dealTrackerInvocationConfig,
tableName,
tableClient,
}: DealTrackContext & { deal: Deal }) {
// Query current state
const info = await Chain.chainInfo(
dealTrackerInvocationConfig,
deal.aggregate,
{ connection: dealTrackerServiceConnection }
)

if (info.out.error) {
return info.out
}

// If there are no deals for it, we can skip
// @ts-expect-error deals not yet typed
if (!Object.keys(info.out.ok.deals || {}).length) {
return {
ok: {
updated: false
}
}
}

// Update entry with stat `Status.Approved`
const updateRes = await tableClient.send(
new UpdateItemCommand({
TableName: tableName,
Key: marshall({
aggregate: deal.aggregate.link().toString()
}),
ExpressionAttributeValues: {
':ns': { N: `${Status.Approved}` }
},
UpdateExpression: `SET stat = :ns`,
ReturnValues: 'ALL_NEW',
})
)

if (updateRes.$metadata.httpStatusCode !== 200) {
return {
error: new DatabaseOperationFailed(`failed to update status of aggregate ${deal.aggregate.link().toString()}`)
}
}

return {
ok: {
updated: true
}
}
}

/**
* Query deals table to get deals that are currently in `Offered` state.
*/
async function getOfferedDeals ({
tableName,
tableClient
}: DealTable) {
// TODO: Pagination
// A single Query only returns a result set that fits within the 1 MB size limit.
// This should be enough for our throughput here, specially if we sort by insertion date.
const cmd = new QueryCommand({
TableName: tableName,
KeyConditions: {
stat: {
ComparisonOperator: 'EQ',
AttributeValueList: [{ N: `${Status.Offered}` }]
}
},
// Index on `stat` column
IndexName: 'indexStat',
})

const resp = await tableClient.send(cmd)
if (resp.$metadata.httpStatusCode !== 200) {
return {
error: new DatabaseOperationFailed(`failed to query available aggregates with stat ${Status.Offered}`)
}
}

return {
ok: (resp?.Items && resp?.Items.map(i =>
// @ts-expect-error unmarshall not typed
decodeDeal.record((unmarshall(i))))
) || []
}
}

export interface DealTrackContext extends DealTable {
dealTrackerServiceConnection: ConnectionView<any>
dealTrackerInvocationConfig: InvocationConfig
}

export interface DealTable {
tableName: string
tableClient: DynamoDBClient
}
2 changes: 1 addition & 1 deletion packages/core/src/workflow/dealer-store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Store, DealerRecord } from '@web3-storage/filecoin-api/types'
import { Store } from '@web3-storage/filecoin-api/types'
import { Deal } from '../data/deal.js'
import { decode as offerDecode } from '../data/offer.js'

Expand Down
2 changes: 1 addition & 1 deletion packages/core/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import anyTest from 'ava'
*
* @typedef {import("ava").TestFn<any>} Test
* @typedef {import("ava").TestFn<S3Context & DbContext & QueueContext>} TestService
* @typedef {import("ava").TestFn<DbContext>} TestWorkflow
* @typedef {import("ava").TestFn<S3Context & DbContext>} TestWorkflow
*/

// eslint-disable-next-line unicorn/prefer-export-from
Expand Down
21 changes: 21 additions & 0 deletions packages/core/test/helpers/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as Server from '@ucanto/server'

export const OperationErrorName = /** @type {const} */ ('OperationFailed')
export class OperationFailed extends Server.Failure {
/**
* @param {string} message
* @param {import('@web3-storage/data-segment').PieceLink} piece
*/
constructor(message, piece) {
super(message)
this.piece = piece
}

get reason() {
return this.message
}

get name() {
return OperationErrorName
}
}
Loading

0 comments on commit 23de3fc

Please sign in to comment.