diff --git a/internal/topo/node/source_node.go b/internal/topo/node/source_node.go index 8f86b7623e..8ca1c7c915 100644 --- a/internal/topo/node/source_node.go +++ b/internal/topo/node/source_node.go @@ -216,6 +216,11 @@ func (m *SourceNode) ingestEof(ctx api.StreamContext) { m.Broadcast(xsql.EOFTuple(0)) } +// GetSource only used for test +func (m *SourceNode) GetSource() api.Source { + return m.s +} + const ( OffsetKey = "$$offset" ) diff --git a/internal/topo/planner/dataSourcePlan.go b/internal/topo/planner/dataSourcePlan.go index 2eae442d1d..018bea35ce 100644 --- a/internal/topo/planner/dataSourcePlan.go +++ b/internal/topo/planner/dataSourcePlan.go @@ -48,6 +48,8 @@ type DataSourcePlan struct { fields map[string]*ast.JsonStreamField metaMap map[string]string pruneFields []string + // inRuleTest means whether in the rule test mode + inRuleTest bool } func (p DataSourcePlan) Init() *DataSourcePlan { diff --git a/internal/topo/planner/planner_source.go b/internal/topo/planner/planner_source.go index 06001f1e56..ba7d64b94b 100644 --- a/internal/topo/planner/planner_source.go +++ b/internal/topo/planner/planner_source.go @@ -35,6 +35,7 @@ func transformSourceNode(ctx api.StreamContext, t *DataSourcePlan, mockSourcesPr mockProps, isMock := mockSourcesProp[string(t.name)] if isMock { t.streamStmt.Options.TYPE = "simulator" + t.inRuleTest = true } strType := t.streamStmt.Options.TYPE if strType == "" { @@ -192,7 +193,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option index++ } - if t.streamStmt.Options.SHARED { + if t.streamStmt.Options.SHARED && !t.inRuleTest { // Create subtopo in the end to avoid errors in the middle srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name)) if !existed { diff --git a/internal/topo/topo.go b/internal/topo/topo.go index 8e1335f313..8a35cca671 100644 --- a/internal/topo/topo.go +++ b/internal/topo/topo.go @@ -79,6 +79,11 @@ func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error) return tp, nil } +// GetSourceNodes only for test +func (s *Topo) GetSourceNodes() []node.DataSourceNode { + return s.sources +} + func (s *Topo) SetStreams(streams []string) { if s == nil { return diff --git a/internal/trial/manager_test.go b/internal/trial/manager_test.go index dd2a013df9..16a6e20e03 100644 --- a/internal/trial/manager_test.go +++ b/internal/trial/manager_test.go @@ -25,17 +25,53 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver" + "github.com/lf-edge/ekuiper/v2/internal/io/simulator" "github.com/lf-edge/ekuiper/v2/internal/pkg/store" "github.com/lf-edge/ekuiper/v2/internal/processor" + "github.com/lf-edge/ekuiper/v2/internal/topo/node" "github.com/lf-edge/ekuiper/v2/pkg/connection" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) +func TestTrialRuleSharedStream(t *testing.T) { + ip := "127.0.0.1" + port := 10092 + httpserver.InitGlobalServerManager(ip, port, nil) + defer httpserver.ShutDown() + connection.InitConnectionManager4Test() + conf.IsTesting = true + conf.InitConf() + dataDir, err := conf.GetDataLoc() + require.NoError(t, err) + require.NoError(t, store.SetupDefault(dataDir)) + p := processor.NewStreamProcessor() + p.ExecStmt("DROP STREAM sharedemo876") + + _, err = p.ExecStmt("CREATE STREAM sharedemo876 () WITH (DATASOURCE=\"sharedemo876\", SHARED=\"TRUE\")") + require.NoError(t, err) + defer p.ExecStmt("DROP STREAM sharedemo876") + + mockDef1 := `{"id":"sharedrule876","sql":"select * from sharedemo876","mockSource":{"sharedemo876":{"data":[{"name":"demo876","value":1}],"interval":100,"loop":false}},"sinkProps":{"sendSingle":true}}` + id, err := TrialManager.CreateRule(mockDef1) + require.NoError(t, err) + require.Equal(t, "sharedrule876", id) + tp, ok := TrialManager.runs["sharedrule876"] + require.True(t, ok) + srcNodes := tp.topo.GetSourceNodes() + require.Len(t, srcNodes, 1) + srcNode, ok := srcNodes[0].(*node.SourceNode) + require.True(t, ok) + _, ok = srcNode.GetSource().(*simulator.SimulatorSource) + require.True(t, ok) + TrialManager.StopRule("sharedrule876") +} + // Run two test rules in parallel. Rerun one of the rules func TestTrialRule(t *testing.T) { ip := "127.0.0.1" port := 10091 httpserver.InitGlobalServerManager(ip, port, nil) + defer httpserver.ShutDown() connection.InitConnectionManager4Test() conf.IsTesting = true conf.InitConf() @@ -70,7 +106,6 @@ func TestTrialRule(t *testing.T) { // Test 4 Rule without mock testRealSourceTrial(t) - require.Equal(t, 0, len(TrialManager.runs)) } func testValidTrial(t *testing.T, mockDef1 string) {