From 682c914d993ccfb2ce4963693901ec529d1937f5 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 30 Oct 2024 14:45:34 +0530 Subject: [PATCH] MB-57888: WIP: Updated Merge Process to Support Index Update --- build.go | 2 ++ merge.go | 61 ++++++++++++++++++++++++++++++---- section_faiss_vector_index.go | 16 +++++++-- section_inverted_text_index.go | 27 +++++++++++---- segment.go | 7 ++++ 5 files changed, 96 insertions(+), 17 deletions(-) diff --git a/build.go b/build.go index cbbd2ab..dc336db 100644 --- a/build.go +++ b/build.go @@ -21,6 +21,7 @@ import ( "math" "os" + index "github.com/blevesearch/bleve_index_api" "github.com/blevesearch/vellum" ) @@ -169,6 +170,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 sectionsIndexOffset: sectionsIndexOffset, fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), docValueOffset: 0, // docValueOffsets identified automatically by the section + updatedFields: make(map[string]index.FieldInfo), fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), diff --git a/merge.go b/merge.go index 683d920..904dd2c 100644 --- a/merge.go +++ b/merge.go @@ -24,6 +24,7 @@ import ( "sort" "github.com/RoaringBitmap/roaring" + index "github.com/blevesearch/bleve_index_api" seg "github.com/blevesearch/scorch_segment_api/v2" "github.com/golang/snappy" ) @@ -48,6 +49,9 @@ func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path st default: panic(fmt.Sprintf("oops, unexpected segment type: %T", segment)) } + if s, ok := segment.(seg.UpdatableSegment); ok { + segmentBases[segmenti].updatedFields = s.UpdatedFields() + } } return mergeSegmentBases(segmentBases, drops, path, DefaultChunkMode, closeCh, s) } @@ -109,6 +113,19 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat return newDocNums, uint64(cr.Count()), nil } +func filterFields(fieldsInv []string, fieldInfo map[string]*index.FieldInfo) []string { + rv := make([]string, 0) + for _, field := range fieldsInv { + if val, ok := fieldInfo[field]; ok { + if val.All { + continue + } + } + rv = append(rv, field) + } + return rv +} + func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) ( newDocNums [][]uint64, numDocs, storedIndexOffset uint64, @@ -117,6 +134,8 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, var fieldsSame bool fieldsSame, fieldsInv = mergeFields(segments) + updatedFields := mergeUpdatedFields(segments) + fieldsInv = filterFields(fieldsInv, updatedFields) fieldsMap = mapFields(fieldsInv) numDocs = computeNewDocCount(segments, drops) @@ -130,15 +149,16 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, // offsets in the fields section index of the file (the final merged file). mergeOpaque := map[int]resetable{} args := map[string]interface{}{ - "chunkMode": chunkMode, - "fieldsSame": fieldsSame, - "fieldsMap": fieldsMap, - "numDocs": numDocs, + "chunkMode": chunkMode, + "fieldsSame": fieldsSame, + "fieldsMap": fieldsMap, + "numDocs": numDocs, + "updatedFields": updatedFields, } if numDocs > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh) + fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -358,7 +378,7 @@ type varintEncoder func(uint64) (int, error) func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, - w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) { + w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.FieldInfo) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. var newDocNum uint64 @@ -397,7 +417,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // optimize when the field mapping is the same across all // segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer - if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { + if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 { err := segment.copyStoredDocs(newDocNum, docNumOffsets, w) if err != nil { return 0, nil, err @@ -471,6 +491,11 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // now walk the non-"_id" fields in order for fieldID := 1; fieldID < len(fieldsInv); fieldID++ { + if val, ok := updatedFields[fieldsInv[fieldID]]; ok { + if val.Store { + continue + } + } storedFieldValues := vals[fieldID] stf := typs[fieldID] @@ -606,6 +631,28 @@ func mergeFields(segments []*SegmentBase) (bool, []string) { return fieldsSame, rv } +func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.FieldInfo { + fieldInfo := make(map[string]*index.FieldInfo) + + for _, segment := range segments { + for field, info := range segment.updatedFields { + if _, ok := fieldInfo[field]; !ok { + fieldInfo[field] = &index.FieldInfo{ + All: info.All, + Index: info.Index, + Store: info.Store, + } + } else { + fieldInfo[field].All = fieldInfo[field].All || info.All + fieldInfo[field].Index = fieldInfo[field].Index || info.Index + fieldInfo[field].Store = fieldInfo[field].Store || info.Store + } + } + + } + return fieldInfo +} + func isClosed(closeCh chan struct{}) bool { select { case <-closeCh: diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 6242d2b..0bdebe4 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -105,6 +105,9 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se if _, ok := sb.fieldsMap[fieldName]; !ok { continue } + if vo.updatedFields[fieldName].Index { + continue + } // check if the section address is a valid one for "fieldName" in the // segment sb. the local fieldID (fetched by the fieldsMap of the sb) @@ -703,9 +706,10 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable) func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable { rv := &vectorIndexOpaque{ - fieldAddrs: make(map[uint16]int), - vecIDMap: make(map[int64]*vecInfo), - vecFieldMap: make(map[uint16]*indexContent), + fieldAddrs: make(map[uint16]int), + vecIDMap: make(map[int64]*vecInfo), + vecFieldMap: make(map[uint16]*indexContent), + updatedFields: make(map[string]*index.FieldInfo), } for k, v := range args { rv.Set(k, v) @@ -744,6 +748,8 @@ type vectorIndexOpaque struct { // index to be build. vecFieldMap map[uint16]*indexContent + updatedFields map[string]*index.FieldInfo + tmp0 []byte } @@ -790,4 +796,8 @@ func (v *vectorIndexOpaque) Reset() (err error) { } func (v *vectorIndexOpaque) Set(key string, val interface{}) { + switch key { + case "updatedFields": + v.updatedFields = val.(map[string]*index.FieldInfo) + } } diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index db7b1a9..2975bad 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -82,7 +82,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap, fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, - w *CountHashWriter, closeCh chan struct{}) (map[int]int, uint64, error) { + updatedFields map[string]*index.FieldInfo, w *CountHashWriter, + closeCh chan struct{}) (map[int]int, uint64, error) { var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) var bufLoc []uint64 @@ -126,9 +127,15 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. return nil, 0, seg.ErrClosed } - dict, err2 := segment.dictionary(fieldName) - if err2 != nil { - return nil, 0, err2 + var dict *Dictionary + var err2 error + if !updatedFields[fieldName].Index { + dict, err2 = segment.dictionary(fieldName) + if err2 != nil { + return nil, 0, err2 + } + } else { + dict = nil } if dict != nil && dict.fst != nil { itr, err2 := dict.fst.Iterator(nil, nil) @@ -244,7 +251,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. postItr = postings.iterator(true, true, true, postItr) - if fieldsSame { + if fieldsSame && len(updatedFields) == 0 { // can optimize by copying freq/norm/loc bytes directly lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying( term, postItr, newDocNums[itrI], newRoaring, @@ -317,7 +324,9 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. if isClosed(closeCh) { return nil, 0, seg.ErrClosed } - + if updatedFields[fieldName].DocValues { + continue + } fieldIDPlus1 := uint16(segment.fieldsMap[fieldName]) if dvIter, exists := segment.fieldDvReaders[SectionInvertedTextIndex][fieldIDPlus1-1]; exists && dvIter != nil { @@ -398,7 +407,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S w *CountHashWriter, closeCh chan struct{}) error { io := i.getInvertedIndexOpaque(opaque) fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv, - io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh) + io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh) if err != nil { return err } @@ -967,6 +976,8 @@ type invertedIndexOpaque struct { fieldAddrs map[int]int + updatedFields map[string]*index.FieldInfo + bytesWritten uint64 fieldsSame bool numDocs uint64 @@ -1034,5 +1045,7 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) { i.FieldsMap = val.(map[string]uint16) case "numDocs": i.numDocs = val.(uint64) + case "updatedFields": + i.updatedFields = val.(map[string]*index.FieldInfo) } } diff --git a/segment.go b/segment.go index 41abde2..f3bd8f3 100644 --- a/segment.go +++ b/segment.go @@ -25,6 +25,7 @@ import ( "unsafe" "github.com/RoaringBitmap/roaring" + index "github.com/blevesearch/bleve_index_api" mmap "github.com/blevesearch/mmap-go" segment "github.com/blevesearch/scorch_segment_api/v2" "github.com/blevesearch/vellum" @@ -109,6 +110,8 @@ type SegmentBase struct { bytesRead uint64 bytesWritten uint64 + updatedFields map[string]index.FieldInfo + m sync.Mutex fieldFSTs map[uint16]*vellum.FST @@ -952,3 +955,7 @@ func (s *SegmentBase) loadDvReaders() error { return nil } + +func (s *SegmentBase) UpdatedFields() map[string]index.FieldInfo { + return s.updatedFields +}