Skip to content

DIPs agent rebased (2) #1113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
27f84ad
feat: indexing fees / dips (wip)
pcarranzav Feb 7, 2025
c8b33c8
all: update dips code for multinetworks
dwerner Apr 17, 2025
4eb8949
fix: manage dips deployments in any mode
pcarranzav May 9, 2025
0090a3a
fix: use the correct name for the indexing_agreements table
pcarranzav Mar 31, 2025
3780d05
fix: use the correct timestamp column names
pcarranzav Mar 31, 2025
f1681df
fix: lint
pcarranzav May 9, 2025
ec6c084
chore: debug logging
pcarranzav May 9, 2025
4e0e2e7
fix: lint again
pcarranzav May 9, 2025
b487bd2
fix: allow actions on deployments that are not published yet
pcarranzav May 12, 2025
d1ae77d
fix: validate deployment ID in actions anyways
pcarranzav May 12, 2025
59fffae
fix: use old validation, lint
pcarranzav May 12, 2025
3db46f9
fix: ensure agreement allocation ids are stored if the allocation con…
pcarranzav May 23, 2025
3f1016f
fix: lint
pcarranzav May 27, 2025
f661e39
fix: dont requireSupported, use NEVER as blocklist
pcarranzav May 27, 2025
fb37804
fix: include dips deployments that are not published when evaluating
pcarranzav May 27, 2025
f79ed40
fix: matching
pcarranzav May 27, 2025
24d5f9f
fix: use DIPs-specific decision basis
pcarranzav May 27, 2025
931774f
fix: parseDecisionBasis
pcarranzav May 27, 2025
6c489c1
fix: migration
pcarranzav May 27, 2025
9903420
fix: cleanup
pcarranzav May 27, 2025
76c9975
fix: test
pcarranzav May 27, 2025
73245ea
fix: migration
pcarranzav May 28, 2025
329faaa
fix: grapql
pcarranzav May 28, 2025
7039228
fix: match allocations for cancelled agreements
pcarranzav May 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/action-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ The action execution worker will only grab items from the action queue to execut

## Allocation management modes:
- `auto`: The indexer-agent will act similarly to the legacy paradigm. When it identifies allocation actions it will add them to the queue with ActionStatus = `approved`; the execution worker process will pick up the approved actions within 30 seconds and execute them.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them. An exception to this is indexing agreements (DIPs), for which actions will be queued and executed even in this mode.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed. An exception to this is indexing agreements (DIPs), for which actions will be queued as approved and executed even in this mode.

## Actions CLI
The indexer-cli provides an `actions` module for manually working with the action queue. It uses the #Graphql API hosted by the indexer management server to interact with the actions queue.
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ const setup = async () => {
const network = await Network.create(
logger,
networkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
166 changes: 137 additions & 29 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,20 @@ export class Agent {
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// There should be a DipsManager in the operator
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
logger.debug('Ensuring indexing rules for DIPs', {
protocolNetwork: network.specification.networkIdentifier,
})
await operator.dipsManager.ensureAgreementRules()
} else {
logger.debug(
'DIPs is disabled, skipping indexing rule enforcement',
)
}
logger.trace('Fetching indexing rules', {
protocolNetwork: network.specification.networkIdentifier,
})
Expand Down Expand Up @@ -324,12 +338,21 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
// Skip fetching active deployments if the deployment management mode is manual, DIPs is disabled, and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
if (this.deploymentManagement === DeploymentManagementMode.AUTO) {
let dipsEnabled = false
await this.multiNetworks.map(async ({ network }) => {
if (network.specification.indexerOptions.enableDips) {
dipsEnabled = true
}
})
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
dipsEnabled
) {
logger.debug('Fetching active deployments')
const assignments =
await this.graphNode.subgraphDeploymentsAssignments(
Expand All @@ -338,7 +361,7 @@ export class Agent {
return assignments.map(assignment => assignment.id)
} else {
logger.info(
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled",
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and DIPs is disabled",
)
return []
}
Expand All @@ -351,24 +374,50 @@ export class Agent {
},
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.subgraphDeployments()
}),
{
onError: error =>
logger.warn(
`Failed to obtain network deployments, trying again later`,
{ error },
),
},
)
const networkAndDipsDeployments: Eventual<
NetworkMapped<SubgraphDeployment[]>
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching network deployments', {
protocolNetwork: network.specification.networkIdentifier,
})
const deployments = network.networkMonitor.subgraphDeployments()
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const resolvedDeployments = await deployments
const dipsDeployments = await Promise.all(
(await operator.dipsManager.getActiveDipsDeployments()).map(
deployment =>
network.networkMonitor.subgraphDeployment(
deployment.ipfsHash,
),
),
)
for (const deployment of dipsDeployments) {
if (
resolvedDeployments.find(
d => d.id.bytes32 === deployment.id.bytes32,
) == null
) {
resolvedDeployments.push(deployment)
}
}
return resolvedDeployments
}
return deployments
}),
{
onError: error =>
logger.warn(
`Failed to obtain network deployments, trying again later`,
{ error },
),
},
)

const eligibleTransferDeployments: Eventual<
NetworkMapped<TransferredSubgraphDeployment[]>
Expand Down Expand Up @@ -423,13 +472,13 @@ export class Agent {
const intermediateNetworkDeploymentAllocationDecisions: Eventual<
NetworkMapped<AllocationDecision[]>
> = join({
networkDeployments,
networkAndDipsDeployments,
indexingRules,
}).tryMap(
({ indexingRules, networkDeployments }) => {
({ indexingRules, networkAndDipsDeployments }) => {
return mapValues(
this.multiNetworks.zip(indexingRules, networkDeployments),
([indexingRules, networkDeployments]: [
this.multiNetworks.zip(indexingRules, networkAndDipsDeployments),
([indexingRules, networkAndDipsDeployments]: [
IndexingRuleAttributes[],
SubgraphDeployment[],
]) => {
Expand All @@ -438,7 +487,11 @@ export class Agent {
logger.trace('Evaluating which deployments are worth allocating to')
return indexingRules.length === 0
? []
: evaluateDeployments(logger, networkDeployments, indexingRules)
: evaluateDeployments(
logger,
networkAndDipsDeployments,
indexingRules,
)
},
)
},
Expand Down Expand Up @@ -729,9 +782,42 @@ export class Agent {
}
break
case DeploymentManagementMode.MANUAL:
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
await this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// Reconcile DIPs deployments anyways
this.logger.warn(
`Deployment management is manual, but DIPs is enabled. Reconciling DIPs deployments anyways.`,
)
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const dipsDeployments =
await operator.dipsManager.getActiveDipsDeployments()
const newTargetDeployments = new Set([
...activeDeployments,
...dipsDeployments,
])
try {
await this.reconcileDeployments(
activeDeployments,
Array.from(newTargetDeployments),
eligibleAllocations,
)
} catch (err) {
logger.warn(
`Exited early while reconciling deployments. Skipped reconciling actions.`,
{
err: indexerError(IndexerErrorCode.IE005, err),
},
)
return
}
} else {
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
}
})
break
default:
throw new Error(
Expand All @@ -752,6 +838,23 @@ export class Agent {
})
return
}

await this.multiNetworks.mapNetworkMapped(
activeAllocations,
async ({ network, operator }, activeAllocations: Allocation[]) => {
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
this.logger.debug(
`Matching agreement allocations for network ${network.specification.networkIdentifier}`,
)
await operator.dipsManager.matchAgreementAllocations(
activeAllocations,
)
}
},
)
},
)
}
Expand Down Expand Up @@ -1053,6 +1156,7 @@ export class Agent {
maxAllocationEpochs: number,
network: Network,
operator: Operator,
forceAction: boolean = false,
): Promise<void> {
const logger = this.logger.child({
deployment: deploymentAllocationDecision.deployment.ipfsHash,
Expand All @@ -1074,6 +1178,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
case true: {
// If no active allocations and subgraph health passes safety check, create one
Expand Down Expand Up @@ -1110,6 +1215,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
mostRecentlyClosedAllocation,
forceAction,
)
}
} else if (activeDeploymentAllocations.length > 0) {
Expand All @@ -1118,6 +1224,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
} else {
// Refresh any expiring allocations
Expand All @@ -1134,6 +1241,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
expiringAllocations,
forceAction,
)
}
}
Expand Down
36 changes: 35 additions & 1 deletion packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ export const start = {
default: 1,
group: 'Indexer Infrastructure',
})
.option('enable-dips', {
description: 'Whether to enable Indexing Fees (DIPs)',
type: 'boolean',
default: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dipper-endpoint', {
description: 'Gateway endpoint for DIPs receipts',
type: 'string',
array: false,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-allocation-amount', {
description: 'Amount of GRT to allocate for DIPs',
type: 'number',
default: 1,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -330,6 +350,9 @@ export const start = {
) {
return 'Invalid --rebate-claim-max-batch-size provided. Must be > 0 and an integer.'
}
if (argv['enable-dips'] && !argv['dipper-endpoint']) {
return 'Invalid --dipper-endpoint provided. Must be provided when --enable-dips is true.'
}
return true
})
},
Expand Down Expand Up @@ -365,6 +388,10 @@ export async function createNetworkSpecification(
allocateOnNetworkSubgraph: argv.allocateOnNetworkSubgraph,
register: argv.register,
finalityTime: argv.chainFinalizeTime,
enableDips: argv.enableDips,
dipperEndpoint: argv.dipperEndpoint,
dipsAllocationAmount: argv.dipsAllocationAmount,
dipsEpochsMargin: argv.dipsEpochsMargin,
}

const transactionMonitoring = {
Expand Down Expand Up @@ -583,7 +610,14 @@ export async function run(
const networks: Network[] = await pMap(
networkSpecifications,
async (spec: NetworkSpecification) =>
Network.create(logger, spec, queryFeeModels, graphNode, metrics),
Network.create(
logger,
spec,
managementModels,
queryFeeModels,
graphNode,
metrics,
),
)

// --------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import type { Logger } from '@graphprotocol/common-ts'
import type { QueryInterface } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
}

interface Context {
context: MigrationContext
}

export async function up({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

if (await queryInterface.tableExists('IndexingRules')) {
logger.debug('Adding dips to decision basis')

await queryInterface.sequelize.query(
`ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE 'dips'`,
)
} else {
logger.debug('IndexingRules table does not exist, skipping migration')
}

logger.info('Migration completed')
}

export async function down({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.info('Removing dips from decision basis')
await queryInterface.sequelize.query(
`ALTER TYPE "enum_IndexingRules_decisionBasis" DROP VALUE 'dips'`,
)

logger.info('Migration completed')
}
1 change: 1 addition & 0 deletions packages/indexer-cli/src/__tests__/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export const setup = async (multiNetworksEnabled: boolean) => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
Loading
Loading