Skip to content

Commit

Permalink
feat: implement cache claim
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw authored and hannahhoward committed Oct 22, 2024
1 parent 047b505 commit 12c4721
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 12 deletions.
6 changes: 6 additions & 0 deletions pkg/service/contentclaims/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func NewService(indexer types.Service) map[ucan.Ability]server.ServiceMethod[ok.
assert.Equals,
func(cap ucan.Capability[assert.EqualsCaveats], inv invocation.Invocation, ctx server.InvocationContext) (ok.Unit, receipt.Effects, error) {
err := indexer.PublishClaim(context.TODO(), inv)
if err != nil {
log.Errorf("publishing equals claim: %w", err)
}
return ok.Unit{}, nil, err
},
),
Expand Down Expand Up @@ -65,6 +68,9 @@ func NewService(indexer types.Service) map[ucan.Ability]server.ServiceMethod[ok.

claim := delegation.NewDelegation(rootbl, bs)
err = indexer.CacheClaim(context.TODO(), provider, claim)
if err != nil {
log.Errorf("caching claim: %w", err)
}
return ok.Unit{}, nil, err
},
),
Expand Down
49 changes: 38 additions & 11 deletions pkg/service/providerindex/providerindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,16 @@ func (pi *ProviderIndex) filterBySpace(results []model.ProviderResult, mh mh.Mul
return results, nil
}

// Publish should do the following:
// 1. Write the entries to the cache with no expiration until publishing is complete
// 2. Generate an advertisement for the advertised hashes and publish/announce it
func (pi *ProviderIndex) Publish(ctx context.Context, provider peer.AddrInfo, contextID string, digests []mh.Multihash, meta meta.Metadata) error {
// Cache writes provider results for the given digests to the provider store
// without publishing or announcing an IPNI advertisement for them.
func (pi *ProviderIndex) Cache(ctx context.Context, provider peer.AddrInfo, contextID string, digests []mh.Multihash, meta meta.Metadata) error {
	// Entries are written with an expiry: since nothing is published to IPNI,
	// the IPNI notifier will never signal publication, so cached records must
	// age out on their own.
	const withExpiry = true
	return Cache(ctx, pi.providerStore, provider, contextID, digests, meta, withExpiry)
}

func Cache(ctx context.Context, providerStore types.ProviderStore, provider peer.AddrInfo, contextID string, digests []mh.Multihash, meta meta.Metadata, expire bool) error {
log := log.With("contextID", []byte(contextID))
log.Infof("caching %d provider results for provider: %s", len(digests), provider.ID)

mdb, err := meta.MarshalBinary()
if err != nil {
return fmt.Errorf("marshaling metadata: %w", err)
Expand All @@ -152,14 +158,13 @@ func (pi *ProviderIndex) Publish(ctx context.Context, provider peer.AddrInfo, co
Provider: &provider,
}

var joberr error
q := jobqueue.NewJobQueue(
func(ctx context.Context, digest mh.Multihash) error {
return appendProviderResult(ctx, pi.providerStore, digest, pr)
return appendProviderResult(ctx, providerStore, digest, pr, expire)
},
jobqueue.WithConcurrency(5),
jobqueue.WithErrorHandler(func(err error) {
log.Errorf("appending provider result: %w", err)
}),
jobqueue.WithErrorHandler(func(err error) { joberr = err }),
)
q.Startup()
for _, d := range digests {
Expand All @@ -168,7 +173,29 @@ func (pi *ProviderIndex) Publish(ctx context.Context, provider peer.AddrInfo, co
return err
}
}
q.Shutdown(ctx)
err = q.Shutdown(ctx)
if err != nil {
return fmt.Errorf("shutting down job queue: %w", err)
}
if joberr != nil {
return fmt.Errorf("appending provider result: %w", joberr)
}

log.Infof("cached %d provider results", len(digests))
return nil
}

// Publish should do the following:
// 1. Write the entries to the cache with no expiration until publishing is complete
// 2. Generate an advertisement for the advertised hashes and publish/announce it
func (pi *ProviderIndex) Publish(ctx context.Context, provider peer.AddrInfo, contextID string, digests []mh.Multihash, meta meta.Metadata) error {
log := log.With("contextID", []byte(contextID))

// cache but do not expire (entries will be expired via the notifier)
err := Cache(ctx, pi.providerStore, provider, contextID, digests, meta, false)
if err != nil {
return fmt.Errorf("caching provider results: %w", err)
}

id, err := pi.publisher.Publish(ctx, provider, contextID, digests, meta)
if err != nil {
Expand All @@ -179,15 +206,15 @@ func (pi *ProviderIndex) Publish(ctx context.Context, provider peer.AddrInfo, co
}

// TODO: atomic append...
func appendProviderResult(ctx context.Context, providerStore types.ProviderStore, digest mh.Multihash, meta model.ProviderResult) error {
func appendProviderResult(ctx context.Context, providerStore types.ProviderStore, digest mh.Multihash, meta model.ProviderResult, expire bool) error {
metas, err := providerStore.Get(ctx, digest)
if err != nil {
if err != types.ErrKeyNotFound {
return fmt.Errorf("getting existing provider results for digest: %s: %w", digest.B58String(), err)
}
}
metas = append(metas, meta)
err = providerStore.Set(ctx, digest, metas, false)
err = providerStore.Set(ctx, digest, metas, expire)
if err != nil {
return fmt.Errorf("setting provider results for digest: %s: %w", digest.B58String(), err)
}
Expand Down
62 changes: 61 additions & 1 deletion pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type ProviderIndex interface {
// 2. With returned provider results, filter additionally for claim type. If space dids are set, calculate an encodedcontextid's by hashing space DID and Hash, and filter for a matching context id
// Future TODO: kick off a conversion task to update the records
Find(context.Context, providerindex.QueryKey) ([]model.ProviderResult, error)
// Cache writes entries to the cache but does not publish/announce an
// advertisement for them. Entries expire after a pre-determined time.
Cache(ctx context.Context, provider peer.AddrInfo, contextID string, digests []multihash.Multihash, meta meta.Metadata) error
// Publish should do the following:
// 1. Write the entries to the cache with no expiration until publishing is complete
// 2. Generate an advertisement for the advertised hashes and publish/announce it
Expand Down Expand Up @@ -309,7 +312,7 @@ func fetchRetrievalURL(provider peer.AddrInfo, shard cid.Cid) (*url.URL, error)
// ideally however, IPNI would enable UCAN chains for publishing so that we could publish it directly from the storage service
// it doesn't for now, so we let SPs publish themselves and cache directly with us
func (is *IndexingService) CacheClaim(ctx context.Context, provider peer.AddrInfo, claim delegation.Delegation) error {
return errors.New("not implemented")
return CacheClaim(ctx, is.blobIndexLookup, is.claimLookup, is.providerIndex, is.provider, claim)
}

// PublishClaim caches and publishes a content claim
Expand Down Expand Up @@ -347,6 +350,63 @@ func NewIndexingService(blobIndexLookup BlobIndexLookup, claimLookup ClaimLookup
return is
}

// CacheClaim caches a content claim in the indexer without publishing an IPNI
// advertisement for it. Only location commitments are currently supported;
// any other capability yields ErrUnrecognizedClaim.
func CacheClaim(ctx context.Context, blobIndex BlobIndexLookup, claimLookup ClaimLookup, provIndex ProviderIndex, provider peer.AddrInfo, claim delegation.Delegation) error {
	caps := claim.Capabilities()
	// Guard against a claim with no capabilities — indexing caps[0] below
	// would otherwise panic. Mirrors the check in cacheLocationCommitment.
	if len(caps) == 0 {
		return fmt.Errorf("missing capabilities in claim: %s", claim.Link())
	}
	switch caps[0].Can() {
	case assert.LocationAbility:
		return cacheLocationCommitment(ctx, provIndex, provider, claim)
	default:
		return ErrUnrecognizedClaim
	}
}

// cacheLocationCommitment reads the location commitment caveats from the
// claim and caches a single provider result for the committed content hash.
// The context ID is derived from the space DID and content hash; entries are
// cached with expiry via provIndex.Cache (no IPNI advertisement is made).
func cacheLocationCommitment(ctx context.Context, provIndex ProviderIndex, provider peer.AddrInfo, claim delegation.Delegation) error {
	caps := claim.Capabilities()
	if len(caps) == 0 {
		return fmt.Errorf("missing capabilities in claim: %s", claim.Link())
	}

	if caps[0].Can() != assert.LocationAbility {
		return fmt.Errorf("unsupported claim: %s", caps[0].Can())
	}

	nb, rerr := assert.LocationCaveatsReader.Read(caps[0].Nb())
	if rerr != nil {
		// NOTE: previously reported as "reading index claim data" — this is a
		// location commitment, not an index claim.
		return fmt.Errorf("reading location commitment data: %w", rerr)
	}

	digests := []multihash.Multihash{nb.Content.Hash()}
	contextID, err := types.ContextID{Space: &nb.Space, Hash: nb.Content.Hash()}.ToEncoded()
	if err != nil {
		return fmt.Errorf("encoding advertisement context ID: %w", err)
	}

	// Expiration is optional on the claim — zero means no expiry was set.
	var exp int
	if claim.Expiration() != nil {
		exp = *claim.Expiration()
	}

	// Byte range is also optional; only populate metadata when present.
	var rng *metadata.Range
	if nb.Range != nil {
		rng = &metadata.Range{Offset: nb.Range.Offset, Length: nb.Range.Length}
	}

	meta := metadata.MetadataContext.New(
		&metadata.LocationCommitmentMetadata{
			Expiration: int64(exp),
			Claim:      asCID(claim.Link()),
			Range:      rng,
		},
	)

	err = provIndex.Cache(ctx, provider, string(contextID), digests, meta)
	if err != nil {
		return fmt.Errorf("caching claim: %w", err)
	}

	return nil
}

func PublishClaim(ctx context.Context, blobIndex BlobIndexLookup, claimLookup ClaimLookup, provIndex ProviderIndex, provider peer.AddrInfo, claim delegation.Delegation) error {
caps := claim.Capabilities()
switch caps[0].Can() {
Expand Down

0 comments on commit 12c4721

Please sign in to comment.