Skip to content

Commit

Permalink
asss
Browse files Browse the repository at this point in the history
  • Loading branch information
CascadingRadium committed Nov 6, 2024
1 parent 25d6851 commit 02e5403
Show file tree
Hide file tree
Showing 7 changed files with 467 additions and 5 deletions.
91 changes: 89 additions & 2 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var reflectStaticSizeIndexSnapshot int
// exported variable, or at the index level by setting the FieldTFRCacheThreshold
// in the kvConfig.
var DefaultFieldTFRCacheThreshold uint64 = 10
var DefaultSynonymTermReaderCacheThreshold uint64 = 10

func init() {
var is interface{} = IndexSnapshot{}
Expand Down Expand Up @@ -87,8 +88,9 @@ type IndexSnapshot struct {
m sync.Mutex // Protects the fields that follow.
refs int64

m2 sync.Mutex // Protects the fields that follow.
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
m2 sync.Mutex // Protects the fields that follow.
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
synonymTermReaders map[string][]*IndexSnapshotSynonymTermReader // keyed by thesaurus name, recycled thesaurus readers
}

func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
Expand Down Expand Up @@ -649,6 +651,15 @@ func (is *IndexSnapshot) getFieldTFRCacheThreshold() uint64 {
return DefaultFieldTFRCacheThreshold
}

func (is *IndexSnapshot) getSynonymTermReaderCacheThreshold() uint64 {
if is.parent.config != nil {
if _, ok := is.parent.config["SynonymTermReaderCacheThreshold"]; ok {
return is.parent.config["SynonymTermReaderCacheThreshold"].(uint64)
}
}
return DefaultSynonymTermReaderCacheThreshold
}

func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) {
if !tfr.recycle {
// Do not recycle an optimized unadorned term field reader (used for
Expand Down Expand Up @@ -677,6 +688,25 @@ func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReade
is.m2.Unlock()
}

func (is *IndexSnapshot) recycleSynonymTermReader(str *IndexSnapshotSynonymTermReader) {
is.parent.rootLock.RLock()
obsolete := is.parent.root != is
is.parent.rootLock.RUnlock()
if obsolete {
// if we're not the current root (mutations happened), don't bother recycling
return
}

is.m2.Lock()
if is.synonymTermReaders == nil {
is.synonymTermReaders = map[string][]*IndexSnapshotSynonymTermReader{}
}
if uint64(len(is.synonymTermReaders[str.name])) < is.getSynonymTermReaderCacheThreshold() {
is.synonymTermReaders[str.name] = append(is.synonymTermReaders[str.name], str)
}
is.m2.Unlock()
}

func docNumberToBytes(buf []byte, in uint64) []byte {
if len(buf) != 8 {
if cap(buf) >= 8 {
Expand Down Expand Up @@ -956,3 +986,60 @@ func (is *IndexSnapshot) CloseCopyReader() error {
// close the index snapshot normally
return is.Close()
}

func (is *IndexSnapshot) allocSynonymTermReader(name string) (str *IndexSnapshotSynonymTermReader) {
is.m2.Lock()
if is.synonymTermReaders != nil {
strs := is.synonymTermReaders[name]
last := len(strs) - 1
if last >= 0 {
str = strs[last]
strs[last] = nil
is.synonymTermReaders[name] = strs[:last]
is.m2.Unlock()
return
}
}
is.m2.Unlock()
return &IndexSnapshotSynonymTermReader{}
}

func (is *IndexSnapshot) SynonymTermReader(ctx context.Context, thesaurusName string, term []byte) (index.SynonymTermReader, error) {
rv := is.allocSynonymTermReader(thesaurusName)

rv.name = thesaurusName
rv.snapshot = is
if rv.postings == nil {
rv.postings = make([]segment.SynonymsList, len(is.segment))
}
if rv.iterators == nil {
rv.iterators = make([]segment.SynonymsIterator, len(is.segment))
}
rv.segmentOffset = 0

if rv.thesauri == nil {
rv.thesauri = make([]segment.Thesaurus, len(is.segment))
for i, s := range is.segment {
if synSeg, ok := s.segment.(segment.SynonymSegment); ok {
thes, err := synSeg.Thesaurus(thesaurusName)
if err != nil {
return nil, err
}
rv.thesauri[i] = thes
}
}
}

for i, s := range is.segment {
if _, ok := s.segment.(segment.SynonymSegment); ok {
pl, err := rv.thesauri[i].SynonymsList(term, s.deleted, rv.postings[i])
if err != nil {
return nil, err
}
rv.postings[i] = pl

rv.iterators[i] = pl.Iterator(rv.iterators[i])
}
}
return rv, nil
}
82 changes: 82 additions & 0 deletions index/scorch/snapshot_index_str.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2024 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scorch

import (
"reflect"

"github.com/blevesearch/bleve/v2/size"
segment "github.com/blevesearch/scorch_segment_api/v2"
)

var reflectStaticSizeIndexSnapshotSynonymTermReader int

func init() {
var istr IndexSnapshotSynonymTermReader
reflectStaticSizeIndexSnapshotSynonymTermReader = int(reflect.TypeOf(istr).Size())
}

type IndexSnapshotSynonymTermReader struct {
name string
snapshot *IndexSnapshot
thesauri []segment.Thesaurus
postings []segment.SynonymsList
iterators []segment.SynonymsIterator
segmentOffset int
}

func (i *IndexSnapshotSynonymTermReader) Size() int {
sizeInBytes := reflectStaticSizeIndexSnapshotSynonymTermReader + size.SizeOfPtr +
len(i.name)

for _, thesaurus := range i.thesauri {
sizeInBytes += thesaurus.Size()
}

for _, postings := range i.postings {
sizeInBytes += postings.Size()
}

for _, iterator := range i.iterators {
sizeInBytes += iterator.Size()
}

return sizeInBytes
}

func (i *IndexSnapshotSynonymTermReader) Next() (string, error) {
// find the next hit
for i.segmentOffset < len(i.iterators) {
if i.iterators[i.segmentOffset] != nil {
next, err := i.iterators[i.segmentOffset].Next()
if err != nil {
return "", err
}
if next != nil {
synTerm := next.Term()
return synTerm, nil
}
i.segmentOffset++
}
}
return "", nil
}

func (i *IndexSnapshotSynonymTermReader) Close() error {
if i.snapshot != nil {
i.snapshot.recycleSynonymTermReader(i)
}
return nil
}
23 changes: 22 additions & 1 deletion index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,10 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}

var knnHits []*search.DocumentMatch
var thesauri map[string]map[string][]string
var ok bool
var skipKnnCollector bool
var skipSynonymCollector bool
if req.PreSearchData != nil {
for k, v := range req.PreSearchData {
switch k {
Expand All @@ -516,8 +518,16 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
if !ok {
return nil, fmt.Errorf("knn preSearchData must be of type []*search.DocumentMatch")
}
skipKnnCollector = true
}
case search.SynonymPreSearchDataKey:
if v != nil {
thesauri, ok = v.(search.FieldTermSynonyms)
if !ok {
return nil, fmt.Errorf("synonym preSearchData must be of type map[string]map[string][]string")
}
skipSynonymCollector = true
}
skipKnnCollector = true
}
}
}
Expand All @@ -528,6 +538,10 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}
}

if !skipSynonymCollector && mappingHasSynonymSources(i.m) {
terms := getTermsFromQuery(req.Query)
}

setKnnHitsInCollector(knnHits, req, coll)

// This callback and variable handles the tracking of bytes read
Expand Down Expand Up @@ -1127,3 +1141,10 @@ func (i *indexImpl) FireIndexEvent() {
internalEventIndex.FireIndexEvent()
}
}

func mappingHasSynonymSources(m mapping.IndexMapping) bool {
if im, ok := m.(*mapping.IndexMappingImpl); ok {
return len(im.SynonymSources) > 0
}
return false
}
15 changes: 15 additions & 0 deletions mapping/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type IndexMappingImpl struct {
IndexDynamic bool `json:"index_dynamic"`
DocValuesDynamic bool `json:"docvalues_dynamic"`
CustomAnalysis *customAnalysis `json:"analysis,omitempty"`
SynonymSources map[string]*SynonymSource `json:"synonym_sources,omitempty"`
cache *registry.Cache
}

Expand Down Expand Up @@ -186,6 +187,12 @@ func (im *IndexMappingImpl) Validate() error {
return err
}
}
for _, synSource := range im.SynonymSources {
err = synSource.Validate(im.cache)
if err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -283,6 +290,14 @@ func (im *IndexMappingImpl) UnmarshalJSON(data []byte) error {
if err != nil {
return err
}
case "synonym_sources":
if im.SynonymSources == nil {
im.SynonymSources = make(map[string]*SynonymSource)
}
err := util.UnmarshalJSON(v, &im.SynonymSources)
if err != nil {
return err
}
default:
invalidKeys = append(invalidKeys, k)
}
Expand Down
56 changes: 56 additions & 0 deletions mapping/synonym.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2024 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mapping

import (
"fmt"

"github.com/blevesearch/bleve/v2/registry"
)

type SynonymSource struct {
CollectionName string `json:"collection"`
AnalyzerName string `json:"analyzer"`
}

func (s *SynonymSource) Collection() string {
return s.CollectionName
}

func (s *SynonymSource) Analyzer() string {
return s.AnalyzerName
}

func (s *SynonymSource) SetCollection(c string) {
s.CollectionName = c
}

func (s *SynonymSource) SetAnalyzer(a string) {
s.AnalyzerName = a
}

func (s *SynonymSource) Validate(c *registry.Cache) error {
if s.CollectionName == "" {
return fmt.Errorf("collection name is required")
}
if s.AnalyzerName == "" {
return fmt.Errorf("analyzer name is required")
}
_, err := c.AnalyzerNamed(s.AnalyzerName)
if err != nil {
return fmt.Errorf("analyzer named '%s' not found", s.AnalyzerName)
}
return nil
}
Loading

0 comments on commit 02e5403

Please sign in to comment.