Skip to content

Commit

Permalink
enhance: Append drop partition msg to wal (milvus-io#35326)
Browse files Browse the repository at this point in the history
issue: milvus-io#33285

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Aug 7, 2024
1 parent 838f063 commit 72a1754
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 76 deletions.
3 changes: 3 additions & 0 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ dir: 'internal/mocks/{{trimPrefix .PackagePath "github.com/milvus-io/milvus/inte
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/distributed/streaming:
interfaces:
WALAccesser:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:
Expand Down
10 changes: 7 additions & 3 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)

var singleton *walAccesserImpl = nil
var singleton WALAccesser = nil

func SetWAL(w WALAccesser) {
singleton = w
}

// Init initializes the wal accesser with the given etcd client.
// should be called before any other operations.
Expand All @@ -19,8 +23,8 @@ func Init(c *clientv3.Client) {

// Release releases the resources of the wal accesser.
func Release() {
if singleton != nil {
singleton.Close()
if w, ok := singleton.(*walAccesserImpl); ok && w != nil {
w.Close()
}
}

Expand Down
141 changes: 141 additions & 0 deletions internal/mocks/distributed/mock_streaming/mock_WALAccesser.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/rootcoord/drop_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
return true
}

gc := newMockGarbageCollector()
gc := mockrootcoord.NewGarbageCollector(t)
deleteCollectionCalled := false
deleteCollectionChan := make(chan struct{}, 1)
gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection) (Timestamp, error) {
gc.EXPECT().GcCollectionData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, coll *model.Collection) (Timestamp, error) {
deleteCollectionCalled = true
deleteCollectionChan <- struct{}{}
return 0, nil
}
})

core := newTestCore(
withValidProxyManager(),
Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/drop_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
redoTask.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: t.core},
pchans: t.collMeta.PhysicalChannelNames,
vchans: t.collMeta.VirtualChannelNames,
partition: &model.Partition{
PartitionID: partID,
PartitionName: t.Req.GetPartitionName(),
Expand Down
6 changes: 3 additions & 3 deletions internal/rootcoord/drop_partition_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
return nil
})

gc := newMockGarbageCollector()
gc := mockrootcoord.NewGarbageCollector(t)
deletePartitionCalled := false
deletePartitionChan := make(chan struct{}, 1)
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) {
gc.EXPECT().GcPartitionData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pChannels, vchannel []string, coll *model.Partition) (Timestamp, error) {
deletePartitionChan <- struct{}{}
deletePartitionCalled = true
time.Sleep(confirmGCInterval)
return 0, nil
}
})

broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
Expand Down
53 changes: 48 additions & 5 deletions internal/rootcoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/streamingutil"
ms "github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
)

//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord --filename=garbage_collector.go --with-expecter --testonly
type GarbageCollector interface {
ReDropCollection(collMeta *model.Collection, ts Timestamp)
RemoveCreatingCollection(collMeta *model.Collection)
ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp)
ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp)
RemoveCreatingPartition(dbID int64, partition *model.Partition, ts Timestamp)
GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error)
GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
}

type bgGarbageCollector struct {
Expand Down Expand Up @@ -110,14 +113,15 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection
_ = redo.Execute(context.Background())
}

func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp) {
func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(pChannels...)

redo := newBaseRedoTask(c.s.stepExecutor)
redo.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: c.s},
pchans: pChannels,
vchans: vchannels,
partition: partition,
isSkip: !Params.CommonCfg.TTMsgEnabled.GetAsBool(),
})
Expand Down Expand Up @@ -227,6 +231,41 @@ func (c *bgGarbageCollector) notifyPartitionGc(ctx context.Context, pChannels []
return ts, nil
}

func (c *bgGarbageCollector) notifyPartitionGcByStreamingService(ctx context.Context, vchannels []string, partition *model.Partition) (uint64, error) {
req := &msgpb.DropPartitionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DropPartition),
commonpbutil.WithTimeStamp(0), // ts is given by streamingnode.
commonpbutil.WithSourceID(c.s.session.ServerID),
),
PartitionName: partition.PartitionName,
CollectionID: partition.CollectionID,
PartitionID: partition.PartitionID,
}

msgs := make([]message.MutableMessage, 0, len(vchannels))
for _, vchannel := range vchannels {
msg, err := message.NewDropPartitionMessageBuilderV1().
WithVChannel(vchannel).
WithHeader(&message.DropPartitionMessageHeader{
CollectionId: partition.CollectionID,
PartitionId: partition.PartitionID,
}).
WithBody(req).
BuildMutable()
if err != nil {
return 0, err
}
msgs = append(msgs, msg)
}
resp := streaming.WAL().Append(ctx, msgs...)
if err := resp.IsAnyError(); err != nil {
return 0, err
}
// TODO: sheep, return resp.MaxTimeTick(), nil
return c.s.tsoAllocator.GenerateTSO(1)
}

func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
Expand All @@ -241,13 +280,17 @@ func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.C
return ddlTs, nil
}

func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
defer c.s.ddlTsLockManager.AddRefCnt(-1)
defer c.s.ddlTsLockManager.Unlock()

ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
if streamingutil.IsStreamingServiceEnabled() {
ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition)
} else {
ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
}
if err != nil {
return 0, err
}
Expand Down
30 changes: 27 additions & 3 deletions internal/rootcoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"

"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
"github.com/milvus-io/milvus/internal/proto/querypb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
)
Expand Down Expand Up @@ -353,7 +356,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
})

t.Run("failed to RemovePartition", func(t *testing.T) {
Expand Down Expand Up @@ -393,7 +396,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
Expand Down Expand Up @@ -438,7 +441,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
Expand Down Expand Up @@ -536,3 +539,24 @@ func TestGarbageCollector_RemoveCreatingPartition(t *testing.T) {
<-signal
})
}

func TestGcPartitionData(t *testing.T) {
defer cleanTestEnv()

streamingutil.SetStreamingServiceEnabled()
defer streamingutil.UnsetStreamingServiceEnabled()

wal := mock_streaming.NewMockWALAccesser(t)
wal.EXPECT().Append(mock.Anything, mock.Anything, mock.Anything).Return(streaming.AppendResponses{})
streaming.SetWAL(wal)

tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil)

core := newTestCore(withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core)
core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator)

_, err := gc.GcPartitionData(context.Background(), nil, []string{"ch-0", "ch-1"}, &model.Partition{})
assert.NoError(t, err)
}
Loading

0 comments on commit 72a1754

Please sign in to comment.