Skip to content

Commit

Permalink
Presearch Code Refactor
Browse files Browse the repository at this point in the history
- Refactor the presearch code path to make
   it more generic and extensible.
  • Loading branch information
CascadingRadium committed Dec 6, 2024
1 parent 60489d6 commit f539dab
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 63 deletions.
114 changes: 96 additions & 18 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type indexAliasImpl struct {
indexes []Index
mutex sync.RWMutex
open bool
// if all the indexes in tha alias have the same mapping
// then the user can set the mapping here to avoid
// checking the mapping of each index in the alias
mapping mapping.IndexMapping
}

// NewIndexAlias creates a new IndexAlias over the provided
Expand Down Expand Up @@ -168,7 +172,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// indicates that this index alias is set as an Index
// in another alias, so we need to do a preSearch search
// and NOT a real search
return preSearchDataSearch(ctx, req, i.indexes...)
flags := &preSearchFlags{
knn: requestHasKNN(req), // set knn flag if the request has KNN
}
return preSearchDataSearch(ctx, req, flags, i.indexes...)
}

// at this point we know we are doing a real search
Expand All @@ -184,7 +191,7 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
if req.PreSearchData != nil {
if requestHasKNN(req) {
var err error
preSearchData, err = redistributeKNNPreSearchData(req, i.indexes)
preSearchData, err = redistributePreSearchData(req, i.indexes)
if err != nil {
return nil, err
}
Expand All @@ -208,9 +215,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// - the request requires preSearch
var preSearchDuration time.Duration
var sr *SearchResult
if req.PreSearchData == nil && preSearchRequired(req) {
flags := preSearchRequired(req, i.mapping)
if req.PreSearchData == nil && flags != nil {
searchStart := time.Now()
preSearchResult, err := preSearch(ctx, req, i.indexes...)
preSearchResult, err := preSearch(ctx, req, flags, i.indexes...)
if err != nil {
return nil, err
}
Expand All @@ -221,17 +229,17 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
return preSearchResult, nil
}
// finalize the preSearch result now
finalizePreSearchResult(req, preSearchResult)
finalizePreSearchResult(req, flags, preSearchResult)

// if there are no errors, then merge the data in the preSearch result
// and construct the preSearchData to be used in the actual search
// if the request is satisfied by the preSearch result, then we can
// directly return the preSearch result as the final result
if requestSatisfiedByPreSearch(req) {
if requestSatisfiedByPreSearch(req, flags) {
sr = finalizeSearchResult(req, preSearchResult)
// no need to run the 2nd phase MultiSearch(..)
} else {
preSearchData, err = constructPreSearchData(req, preSearchResult, i.indexes)
preSearchData, err = constructPreSearchData(req, flags, preSearchResult, i.indexes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,6 +360,20 @@ func (i *indexAliasImpl) Close() error {
return nil
}

// SetIndexMapping sets the mapping for the alias and must be used
// ONLY when all the indexes in the alias have the same mapping.
// This is to avoid checking the mapping of each index in the alias
// when executing a search request.
func (i *indexAliasImpl) SetIndexMapping(m mapping.IndexMapping) error {
i.mutex.Lock()
defer i.mutex.Unlock()
if !i.open {
return ErrorIndexClosed
}
i.mapping = m
return nil
}

func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
i.mutex.RLock()
defer i.mutex.RUnlock()
Expand All @@ -360,6 +382,11 @@ func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
return nil
}

// if the mapping is already set, return it
if i.mapping != nil {
return i.mapping
}

err := i.isAliasToSingleIndex()
if err != nil {
return nil
Expand Down Expand Up @@ -520,21 +547,35 @@ type asyncSearchResult struct {
Err error
}

func preSearchRequired(req *SearchRequest) bool {
return requestHasKNN(req)
// preSearchFlags is a struct to hold flags indicating why preSearch is required
type preSearchFlags struct {
knn bool
}

// preSearchRequired checks if preSearch is required and returns the presearch flags struct
// indicating which preSearch is required
func preSearchRequired(req *SearchRequest, m mapping.IndexMapping) *preSearchFlags {
// Check for KNN query
knn := requestHasKNN(req)
if knn {
return &preSearchFlags{
knn: knn,
}
}
return nil
}

func preSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
// create a dummy request with a match none query
// since we only care about the preSearchData in PreSearch
dummyRequest := &SearchRequest{
Query: query.NewMatchNoneQuery(),
}
newCtx := context.WithValue(ctx, search.PreSearchKey, true)
if requestHasKNN(req) {
if flags.knn {
addKnnToDummyRequest(dummyRequest, req)
}
return preSearchDataSearch(newCtx, dummyRequest, indexes...)
return preSearchDataSearch(newCtx, dummyRequest, flags, indexes...)
}

// if the request is satisfied by just the preSearch result,
Expand Down Expand Up @@ -585,20 +626,20 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
return preSearchResult
}

func requestSatisfiedByPreSearch(req *SearchRequest) bool {
if requestHasKNN(req) && isKNNrequestSatisfiedByPreSearch(req) {
func requestSatisfiedByPreSearch(req *SearchRequest, flags *preSearchFlags) bool {
if flags.knn && isKNNrequestSatisfiedByPreSearch(req) {
return true
}
return false
}

func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
func constructPreSearchData(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
mergedOut := make(map[string]map[string]interface{}, len(indexes))
for _, index := range indexes {
mergedOut[index.Name()] = make(map[string]interface{})
}
var err error
if requestHasKNN(req) {
if flags.knn {
mergedOut, err = constructKnnPreSearchData(mergedOut, preSearchResult, indexes)
if err != nil {
return nil, err
Expand All @@ -607,7 +648,7 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
return mergedOut, nil
}

func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearchDataSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
asyncResults := make(chan *asyncSearchResult, len(indexes))
// run search on each index in separate go routine
var waitGroup sync.WaitGroup
Expand Down Expand Up @@ -638,7 +679,7 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
if prp == nil {
// first valid preSearch result
// create a new preSearch result processor
prp = createPreSearchResultProcessor(req)
prp = createPreSearchResultProcessor(req, flags)
}
prp.add(asr.Result, asr.Name)
if sr == nil {
Expand Down Expand Up @@ -684,6 +725,43 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
return sr, nil
}

// redistributePreSearchData redistributes the preSearchData coming from the individual constituent index aliases
// of an alias to the individual indexes in the alias. This is necessary when the preSearchData is specific to each
// index in the alias. This is used only in the case of an alias tree, where the indexes are at the leaves of the tree,
// and the master alias is at the root. At each level of the tree, the preSearchData needs to be redistributed to the
// indexes/aliases at that level. Because the preSearchData is specific to each final index at the leaf.
func redistributePreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
rv := make(map[string]map[string]interface{})
for _, index := range indexes {
rv[index.Name()] = make(map[string]interface{})
}
if knnHits, ok := req.PreSearchData[search.KnnPreSearchDataKey].([]*search.DocumentMatch); ok {
// the preSearchData for KNN is a list of DocumentMatch objects
// that need to be redistributed to the right index.
// This is used only in the case of an alias tree, where the indexes
// are at the leaves of the tree, and the master alias is at the root.
// At each level of the tree, the preSearchData needs to be redistributed
// to the indexes/aliases at that level. Because the preSearchData is
// specific to each final index at the leaf.
segregatedKnnHits, err := validateAndDistributeKNNHits(knnHits, indexes)
if err != nil {
return nil, err
}
for _, index := range indexes {
rv[index.Name()][search.KnnPreSearchDataKey] = segregatedKnnHits[index.Name()]
}
}
return rv, nil
}

// finalizePreSearchResult finalizes the preSearch result by applying the finalization steps
// specific to the preSearch flags
func finalizePreSearchResult(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult) {
if flags.knn {
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
}
}

// hitsInCurrentPage returns the hits in the current page
// using the From and Size parameters in the request
func hitsInCurrentPage(req *SearchRequest, hits []*search.DocumentMatch) []*search.DocumentMatch {
Expand Down
45 changes: 38 additions & 7 deletions pre_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type preSearchResultProcessor interface {
finalize(*SearchResult)
}

// -----------------------------------------------------------------------------
// KNN preSearchResultProcessor for handling KNN presearch results
type knnPreSearchResultProcessor struct {
addFn func(sr *SearchResult, indexName string)
finalizeFn func(sr *SearchResult)
Expand All @@ -44,16 +46,45 @@ func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) {
}

// -----------------------------------------------------------------------------
// Master struct that can hold any number of presearch result processors
type compositePreSearchResultProcessor struct {
presearchResultProcessors []preSearchResultProcessor
}

func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) {
if requestHasKNN(req) {
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
// Implements the add method, which forwards to all the internal processors
func (m *compositePreSearchResultProcessor) add(sr *SearchResult, indexName string) {
for _, p := range m.presearchResultProcessors {
p.add(sr, indexName)
}
}

func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor {
if requestHasKNN(req) {
return newKnnPreSearchResultProcessor(req)
// Implements the finalize method, which forwards to all the internal processors
func (m *compositePreSearchResultProcessor) finalize(sr *SearchResult) {
for _, p := range m.presearchResultProcessors {
p.finalize(sr)
}
}

// -----------------------------------------------------------------------------
// Function to create the appropriate preSearchResultProcessor(s)
func createPreSearchResultProcessor(req *SearchRequest, flags *preSearchFlags) preSearchResultProcessor {
var processors []preSearchResultProcessor
// Add KNN processor if the request has KNN
if flags.knn {
if knnProcessor := newKnnPreSearchResultProcessor(req); knnProcessor != nil {
processors = append(processors, knnProcessor)
}
}
// Return based on the number of processors, optimizing for the common case of 1 processor
// If there are no processors, return nil
switch len(processors) {
case 0:
return nil
case 1:
return processors[0]
default:
return &compositePreSearchResultProcessor{
presearchResultProcessors: processors,
}
}
return &knnPreSearchResultProcessor{} // equivalent to nil
}
5 changes: 5 additions & 0 deletions search.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,3 +589,8 @@ func (r *SearchRequest) SortFunc() func(data sort.Interface) {

return sort.Sort
}

func isMatchNoneQuery(q query.Query) bool {
_, ok := q.(*query.MatchNoneQuery)
return ok
}
39 changes: 2 additions & 37 deletions search_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, rea
continue
}

if _, ok := filterQ.(*query.MatchNoneQuery); ok {
if isMatchNoneQuery(filterQ) {
// Filtering required since no hits are eligible.
requiresFiltering[idx] = true
// a match none query just means none the documents are eligible
Expand Down Expand Up @@ -559,7 +559,7 @@ func requestHasKNN(req *SearchRequest) bool {
func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
// if req.Query is not match_none => then we need to go to phase 2
// to perform the actual query.
if _, ok := req.Query.(*query.MatchNoneQuery); !ok {
if !isMatchNoneQuery(req.Query) {
return false
}
// req.Query is a match_none query
Expand Down Expand Up @@ -598,41 +598,6 @@ func addKnnToDummyRequest(dummyReq *SearchRequest, realReq *SearchRequest) {
dummyReq.Sort = realReq.Sort
}

// the preSearchData for KNN is a list of DocumentMatch objects
// that need to be redistributed to the right index.
// This is used only in the case of an alias tree, where the indexes
// are at the leaves of the tree, and the master alias is at the root.
// At each level of the tree, the preSearchData needs to be redistributed
// to the indexes/aliases at that level. Because the preSearchData is
// specific to each final index at the leaf.
func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
knnHits, ok := req.PreSearchData[search.KnnPreSearchDataKey].([]*search.DocumentMatch)
if !ok {
return nil, fmt.Errorf("request does not have knn preSearchData for redistribution")
}
segregatedKnnHits, err := validateAndDistributeKNNHits(knnHits, indexes)
if err != nil {
return nil, err
}

rv := make(map[string]map[string]interface{})
for _, index := range indexes {
rv[index.Name()] = make(map[string]interface{})
}

for _, index := range indexes {
for k, v := range req.PreSearchData {
switch k {
case search.KnnPreSearchDataKey:
rv[index.Name()][k] = segregatedKnnHits[index.Name()]
default:
rv[index.Name()][k] = v
}
}
}
return rv, nil
}

func newKnnPreSearchResultProcessor(req *SearchRequest) *knnPreSearchResultProcessor {
kArray := make([]int64, len(req.KNN))
for i, knnReq := range req.KNN {
Expand Down
2 changes: 1 addition & 1 deletion search_no_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func requestHasKNN(req *SearchRequest) bool {
func addKnnToDummyRequest(dummyReq *SearchRequest, realReq *SearchRequest) {
}

func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
func validateAndDistributeKNNHits(knnHits []*search.DocumentMatch, indexes []Index) (map[string][]*search.DocumentMatch, error) {
return nil, nil
}

Expand Down

0 comments on commit f539dab

Please sign in to comment.