Skip to content

Commit

Permalink
fix: fix ruletest when shared stream (#3539)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Feb 11, 2025
1 parent daf5d6e commit a73d1d0
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 2 deletions.
5 changes: 5 additions & 0 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func (m *SourceNode) ingestEof(ctx api.StreamContext) {
m.Broadcast(xsql.EOFTuple(0))
}

// GetSource only used for test
func (m *SourceNode) GetSource() api.Source {
return m.s
}

const (
OffsetKey = "$$offset"
)
Expand Down
2 changes: 2 additions & 0 deletions internal/topo/planner/dataSourcePlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type DataSourcePlan struct {
fields map[string]*ast.JsonStreamField
metaMap map[string]string
pruneFields []string
// inRuleTest means whether in the rule test mode
inRuleTest bool
}

func (p DataSourcePlan) Init() *DataSourcePlan {
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/planner/planner_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func transformSourceNode(ctx api.StreamContext, t *DataSourcePlan, mockSourcesPr
mockProps, isMock := mockSourcesProp[string(t.name)]
if isMock {
t.streamStmt.Options.TYPE = "simulator"
t.inRuleTest = true
}
strType := t.streamStmt.Options.TYPE
if strType == "" {
Expand Down Expand Up @@ -192,7 +193,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
index++
}

if t.streamStmt.Options.SHARED {
if t.streamStmt.Options.SHARED && !t.inRuleTest {
// Create subtopo in the end to avoid errors in the middle
srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name))
if !existed {
Expand Down
5 changes: 5 additions & 0 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error)
return tp, nil
}

// GetSourceNodes only for test
func (s *Topo) GetSourceNodes() []node.DataSourceNode {
return s.sources
}

func (s *Topo) SetStreams(streams []string) {
if s == nil {
return
Expand Down
37 changes: 36 additions & 1 deletion internal/trial/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,53 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/processor"
"github.com/lf-edge/ekuiper/v2/internal/topo/node"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

func TestTrialRuleSharedStream(t *testing.T) {
ip := "127.0.0.1"
port := 10092
httpserver.InitGlobalServerManager(ip, port, nil)
defer httpserver.ShutDown()
connection.InitConnectionManager4Test()
conf.IsTesting = true
conf.InitConf()
dataDir, err := conf.GetDataLoc()
require.NoError(t, err)
require.NoError(t, store.SetupDefault(dataDir))
p := processor.NewStreamProcessor()
p.ExecStmt("DROP STREAM sharedemo876")

_, err = p.ExecStmt("CREATE STREAM sharedemo876 () WITH (DATASOURCE=\"sharedemo876\", SHARED=\"TRUE\")")
require.NoError(t, err)
defer p.ExecStmt("DROP STREAM sharedemo876")

mockDef1 := `{"id":"sharedrule876","sql":"select * from sharedemo876","mockSource":{"sharedemo876":{"data":[{"name":"demo876","value":1}],"interval":100,"loop":false}},"sinkProps":{"sendSingle":true}}`
id, err := TrialManager.CreateRule(mockDef1)
require.NoError(t, err)
require.Equal(t, "sharedrule876", id)
tp, ok := TrialManager.runs["sharedrule876"]
require.True(t, ok)
srcNodes := tp.topo.GetSourceNodes()
require.Len(t, srcNodes, 1)
srcNode, ok := srcNodes[0].(*node.SourceNode)
require.True(t, ok)
_, ok = srcNode.GetSource().(*simulator.SimulatorSource)
require.True(t, ok)
TrialManager.StopRule("sharedrule876")
}

// Run two test rules in parallel. Rerun one of the rules
func TestTrialRule(t *testing.T) {
ip := "127.0.0.1"
port := 10091
httpserver.InitGlobalServerManager(ip, port, nil)
defer httpserver.ShutDown()
connection.InitConnectionManager4Test()
conf.IsTesting = true
conf.InitConf()
Expand Down Expand Up @@ -70,7 +106,6 @@ func TestTrialRule(t *testing.T) {

// Test 4 Rule without mock
testRealSourceTrial(t)
require.Equal(t, 0, len(TrialManager.runs))
}

func testValidTrial(t *testing.T, mockDef1 string) {
Expand Down

0 comments on commit a73d1d0

Please sign in to comment.