Skip to content

Commit

Permalink
feat: Parallelize Legacy Indexing Service Lookup (#125)
Browse files Browse the repository at this point in the history
Previously, the Query method in IndexingService executed lookups
sequentially:
1. Query the normal indexing service.
2. If no results were found, query the block index store.

This forced an unnecessary delay when results were only available in the
block index store.

Changes:
- Run the primary (indexing service) and legacy (block index store)
queries in parallel.
- If the primary query returns results, the legacy query is immediately
canceled.
- Maintain the original behavior: prioritize primary results and only
fall back to legacy if needed.

- closes #122

Co-authored-by: frrist <[email protected]>
  • Loading branch information
frrist and frrist authored Feb 13, 2025
1 parent feb3f7b commit 61a312d
Showing 1 changed file with 81 additions and 49 deletions.
130 changes: 81 additions & 49 deletions pkg/service/legacy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,73 +47,105 @@ func (l *IndexingService) Publish(ctx context.Context, claim delegation.Delegati
}

func (l *IndexingService) Query(ctx context.Context, q types.Query) (types.QueryResult, error) {
results, err := l.indexingService.Query(ctx, q)
if err != nil {
return nil, err
}

if len(results.Claims()) > 0 || len(results.Indexes()) > 0 || len(q.Hashes) == 0 {
return results, nil
// Create a cancellable context for querying the indexStore so we can stop it early if the indexService succeeds.
bisCtx, cancelBis := context.WithCancel(ctx)
defer cancelBis()

// We'll capture the results of indexStore query in a channel, using the result if the indexService
// doesn't yield anything.
type indexResult struct {
claims map[cid.Cid]delegation.Delegation
indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView]
err error
}

var locs []cassert.LocationCaveats
for _, h := range q.Hashes {
// lets see if we can materialize some location claims
content := ctypes.FromHash(h)
records, err := l.blockIndexStore.Query(ctx, content.Hash())
if err != nil {
if errors.Is(err, types.ErrKeyNotFound) {
continue
indexResCh := make(chan indexResult, 1)

// Query the indexStore async.
go func() {
var locs []cassert.LocationCaveats
for _, h := range q.Hashes {
// lets see if we can materialize some location claims
content := ctypes.FromHash(h)
records, err := l.blockIndexStore.Query(bisCtx, content.Hash())
if err != nil {
if errors.Is(err, types.ErrKeyNotFound) {
continue
}
// bail if non-continuable error.
indexResCh <- indexResult{err: err}
return
}
return nil, err
}

for _, r := range records {
u, err := url.Parse(r.CarPath)
if err != nil || !u.IsAbs() {
// non-URL is legacy region/bucket/key format
// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
parts := strings.Split(r.CarPath, "/")
key := strings.Join(parts[2:], "/")
shard, err := bucketKeyToShardLink(key)
if err != nil {
for _, r := range records {
u, err := url.Parse(r.CarPath)
if err != nil || !u.IsAbs() {
// non-URL is legacy region/bucket/key format
// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
parts := strings.Split(r.CarPath, "/")
key := strings.Join(parts[2:], "/")
shard, err := bucketKeyToShardLink(key)
if err != nil {
continue
}

u = l.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
continue
}

u = l.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
continue
}
}

locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
claims := make(map[cid.Cid]delegation.Delegation, len(locs))
for _, loc := range locs {
claim, err := cassert.Location.Delegate(
l.id,
l.id,
l.id.DID().String(),
loc,
delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix())),
)
if err != nil {
indexResCh <- indexResult{err: err}
return
}
claims[link.ToCID(claim.Link())] = claim
}

indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0)
indexResCh <- indexResult{claims: claims, indexes: indexes, err: nil}
}()

results, err := l.indexingService.Query(ctx, q)
if err != nil {
// if we fail to query the indexService, kill the indexStore query and bail with error.
// cancellation of the indexStore query is handled in defer statement at top of method.
return nil, err
}
if len(results.Claims()) > 0 || len(results.Indexes()) > 0 || len(q.Hashes) == 0 {
// indexService produced a result, kill the blockIndex query and return results.
// cancellation of the blockStore query is handled in defer statement at top of method.
return results, nil
}

claims := map[cid.Cid]delegation.Delegation{}
for _, loc := range locs {
claim, err := cassert.Location.Delegate(
l.id,
l.id,
l.id.DID().String(),
loc,
delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix())),
)
if err != nil {
return nil, err
}
claims[link.ToCID(claim.Link())] = claim
// indexService query yields empty result, check the indexStore.

bsRes := <-indexResCh
if bsRes.err != nil {
return nil, bsRes.err
}

indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0)
return queryresult.Build(claims, indexes)
// the indexService yielded an empty result, use the indexStore query result.
return queryresult.Build(bsRes.claims, bsRes.indexes)
}

var _ types.Service = (*IndexingService)(nil)
Expand Down

0 comments on commit 61a312d

Please sign in to comment.