diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 0d472ac8b..d7864ddb8 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -429,6 +429,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } skipped := true + // make the newly merged segments part of the newSnapshot being constructed for i, newMergedSegment := range nextMerge.new { // checking if this newly merged segment is worth keeping based on // obsoleted doc count since the merge intro started diff --git a/index/scorch/merge.go b/index/scorch/merge.go index be6de3863..1e318237d 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -443,6 +443,9 @@ type mergeTaskIntroStatus struct { skipped bool } +// this is important when it comes to introducing multiple merged segments in a +// single introducer channel push. That way there is a check to ensure that the +// file count doesn't explode during the index's lifetime. type mergedSegmentHistory struct { workerID uint64 oldNewDocIDs []uint64 @@ -501,6 +504,9 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1) filename := zapFileName(newSegmentID) path := s.path + string(os.PathSeparator) + filename + + // the newly merged segment is already flushed out to disk, just needs + // to be opened using mmap. newDocNums, _, err := s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s) if err != nil { @@ -527,7 +533,7 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj // close the new merged segments _ = closeNewMergedSegments(newMergedSegments) - // tbd: need a better way to handle error + // tbd: need a better way to consolidate errors return nil, nil, errs[0] } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index c2d6754af..b48b49711 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -369,11 +369,13 @@ type flushable struct { totDocs uint64 } -var DefaultNumPersisterWorkers = 1 +// number workers which parallely perform an in-memory merge of the segments followed +// by a flush operation. +var DefaultNumPersisterWorkers = 4 // maximum size of data that a single worker is allowed to perform the in-memory // merge operation. -var DefaultMaxSizeInMemoryMerge = 0 +var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024 func legacyFlushBehaviour() bool { // DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the leagcy @@ -417,6 +419,8 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( flushSet = append(flushSet, val) } else { + // constructs a flushSet where each flushable object contains a set of segments + // to be merged and flushed out to disk. for i, snapshot := range snapshot.segment { if totSize >= DefaultMaxSizeInMemoryMerge { if len(sbs) >= DefaultMinSegmentsForInMemoryMerge { @@ -480,12 +484,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( return false, nil } - // deploy the workers, have a wait group which waits for the flush set to complete - // each worker - // 1. merges the segments using mergeSegmentBases() - // wait for group to finish - // - // construct equiv snapshot and do a persistSnapshotDirect() + // drains out (after merging in memory) the segments in the flushSet parallely newSnapshot, newSegmentIDs, err := s.mergeSegmentBasesParallel(snapshot, flushSet) if err != nil { return false, err @@ -694,7 +693,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, } filenames = append(filenames, filename) case segment.UnpersistedSegment: - // need to persist this to disk + // need to persist this to disk if its not part of exclude list (which + // restricts which in-memory segment to be persisted to disk) if _, ok := exclude[segmentSnapshot.id]; !ok { filename := zapFileName(segmentSnapshot.id) path := filepath.Join(path, filename) diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go index d21156dd9..8165774e7 100644 --- a/index/scorch/scorch_test.go +++ b/index/scorch/scorch_test.go @@ -438,7 +438,6 @@ func TestIndexInsertThenDelete(t *testing.T) { t.Fatal(err) } - fmt.Println("start delete") err = idx.Delete("1") if err != nil { t.Errorf("Error deleting entry from index: %v", err) diff --git a/index/scorch/snapshot_index_vr.go b/index/scorch/snapshot_index_vr.go index 2d226214a..320364bc7 100644 --- a/index/scorch/snapshot_index_vr.go +++ b/index/scorch/snapshot_index_vr.go @@ -118,6 +118,7 @@ func (i *IndexSnapshotVectorReader) Next(preAlloced *index.VectorDoc) ( nnum := next.Number() rv.ID = docNumberToBytes(rv.ID, nnum+globalOffset) rv.Score = float64(next.Score()) + i.currID = rv.ID i.currPosting = next diff --git a/search/scorer/scorer_term.go b/search/scorer/scorer_term.go index 883af40ab..ca268648b 100644 --- a/search/scorer/scorer_term.go +++ b/search/scorer/scorer_term.go @@ -94,6 +94,7 @@ func (s *TermQueryScorer) SetQueryNorm(qnorm float64) { // update the query weight s.queryWeight = s.queryBoost * s.idf * s.queryNorm + if s.options.Explain { childrenExplanations := make([]*search.Explanation, 3) childrenExplanations[0] = &search.Explanation{