Skip to content

Commit

Permalink
feat(indexing): parallelize primary and legacy queries for faster lookups
Browse files Browse the repository at this point in the history

Previously, the Query method in IndexingService executed lookups sequentially:
1. Query the normal indexing service.
2. If no results were found, query the block index store.

This forced an unnecessary delay when results were only available in the block index store.

Changes:
- Run the primary (indexing service) and legacy (block index store) queries in parallel.
- If the primary query returns results, the legacy query is immediately canceled.
- Maintain the original behavior: prioritize primary results and only fall back to legacy if needed.

- closes #122
  • Loading branch information
frrist committed Feb 12, 2025
1 parent feb3f7b commit 01fa090
Showing 1 changed file with 81 additions and 49 deletions.
130 changes: 81 additions & 49 deletions pkg/service/legacy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,73 +47,105 @@ func (l *IndexingService) Publish(ctx context.Context, claim delegation.Delegati
}

func (l *IndexingService) Query(ctx context.Context, q types.Query) (types.QueryResult, error) {
results, err := l.indexingService.Query(ctx, q)
if err != nil {
return nil, err
}

if len(results.Claims()) > 0 || len(results.Indexes()) > 0 || len(q.Hashes) == 0 {
return results, nil
// Create a cancellable context for the blockIndexStore query so it can be stopped early if the indexingService query succeeds.
bisCtx, cancelBis := context.WithCancel(ctx)
defer cancelBis()

// The result of the blockIndexStore query is delivered over a channel and is only used if the
// indexingService query doesn't yield anything.
type indexResult struct {
claims map[cid.Cid]delegation.Delegation
indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView]
err error
}

var locs []cassert.LocationCaveats
for _, h := range q.Hashes {
// let's see if we can materialize some location claims
content := ctypes.FromHash(h)
records, err := l.blockIndexStore.Query(ctx, content.Hash())
if err != nil {
if errors.Is(err, types.ErrKeyNotFound) {
continue
indexResCh := make(chan indexResult, 1)

// Query the indexStore async.
go func() {
var locs []cassert.LocationCaveats
for _, h := range q.Hashes {
// let's see if we can materialize some location claims
content := ctypes.FromHash(h)
records, err := l.blockIndexStore.Query(bisCtx, content.Hash())
if err != nil {
if errors.Is(err, types.ErrKeyNotFound) {
continue
}
// bail if non-continuable error.
indexResCh <- indexResult{err: err}
return

Check warning on line 76 in pkg/service/legacy/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/legacy/service.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}
return nil, err
}

for _, r := range records {
u, err := url.Parse(r.CarPath)
if err != nil || !u.IsAbs() {
// non-URL is legacy region/bucket/key format
// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
parts := strings.Split(r.CarPath, "/")
key := strings.Join(parts[2:], "/")
shard, err := bucketKeyToShardLink(key)
if err != nil {
for _, r := range records {
u, err := url.Parse(r.CarPath)
if err != nil || !u.IsAbs() {
// non-URL is legacy region/bucket/key format
// e.g. us-west-2/dotstorage-prod-1/raw/bafy...
parts := strings.Split(r.CarPath, "/")
key := strings.Join(parts[2:], "/")
shard, err := bucketKeyToShardLink(key)
if err != nil {
continue
}

u = l.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
continue
}

u = l.bucketURL.JoinPath(fmt.Sprintf("/%s/%s.car", shard.String(), shard.String()))
locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
continue
}
}

locs = append(locs, cassert.LocationCaveats{
Content: content,
Location: []url.URL{*u},
Range: &cassert.Range{Offset: r.Offset, Length: &r.Length},
})
claims := make(map[cid.Cid]delegation.Delegation, len(locs))
for _, loc := range locs {
claim, err := cassert.Location.Delegate(
l.id,
l.id,
l.id.DID().String(),
loc,
delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix())),
)
if err != nil {
indexResCh <- indexResult{err: err}
return
}

Check warning on line 120 in pkg/service/legacy/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/legacy/service.go#L118-L120

Added lines #L118 - L120 were not covered by tests
claims[link.ToCID(claim.Link())] = claim
}

indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0)
indexResCh <- indexResult{claims: claims, indexes: indexes, err: nil}
}()

results, err := l.indexingService.Query(ctx, q)
if err != nil {
// If the indexingService query fails, bail with the error. The blockIndexStore query is
// cancelled by the deferred cancelBis call at the top of the method.
return nil, err
}
if len(results.Claims()) > 0 || len(results.Indexes()) > 0 || len(q.Hashes) == 0 {
// The indexingService produced a result (or the query has no hashes), so return it. The
// blockIndexStore query is cancelled by the deferred cancelBis call at the top of the method.
return results, nil
}

claims := map[cid.Cid]delegation.Delegation{}
for _, loc := range locs {
claim, err := cassert.Location.Delegate(
l.id,
l.id,
l.id.DID().String(),
loc,
delegation.WithExpiration(int(time.Now().Add(time.Hour).Unix())),
)
if err != nil {
return nil, err
}
claims[link.ToCID(claim.Link())] = claim
// indexService query yields empty result, check the indexStore.

bsRes := <-indexResCh
if bsRes.err != nil {
return nil, bsRes.err

Check warning on line 144 in pkg/service/legacy/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/legacy/service.go#L144

Added line #L144 was not covered by tests
}

indexes := bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](0)
return queryresult.Build(claims, indexes)
// the indexService yielded an empty result, use the indexStore query result.
return queryresult.Build(bsRes.claims, bsRes.indexes)
}

var _ types.Service = (*IndexingService)(nil)
Expand Down

0 comments on commit 01fa090

Please sign in to comment.