diff --git a/go.mod b/go.mod index 08fac947..f4ff9cf4 100644 --- a/go.mod +++ b/go.mod @@ -240,6 +240,7 @@ require ( go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.22.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sys v0.19.0 // indirect diff --git a/plugins/extractors/maxcompute/client/client.go b/plugins/extractors/maxcompute/client/client.go index 35a980ed..3d9dc205 100644 --- a/plugins/extractors/maxcompute/client/client.go +++ b/plugins/extractors/maxcompute/client/client.go @@ -6,6 +6,7 @@ import ( "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema" + "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel" "github.com/goto/meteor/plugins" "github.com/goto/meteor/plugins/extractors/maxcompute/config" "github.com/goto/salt/log" @@ -14,6 +15,7 @@ import ( type Client struct { client *odps.Odps project *odps.Project + tunnel *tunnel.Tunnel log log.Logger } @@ -23,11 +25,16 @@ func New(conf config.Config) *Client { client.SetDefaultProjectName(conf.ProjectName) project := client.Project(conf.ProjectName) + tunnelInstance, err := tunnel.NewTunnelFromProject(project) + if err != nil { + plugins.GetLog().Error("failed to create tunnel", "with error:", err) + } return &Client{ client: client, project: project, log: plugins.GetLog(), + tunnel: tunnelInstance, } } @@ -71,3 +78,12 @@ func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *ta } return table.Type().String(), &tableSchema, nil } + +func (c *Client) MaxPreviewRows(table *odps.Table, partitionValue string, limit int64) int { + records, err := c.tunnel.Preview(table, partitionValue, limit) + if err != nil { + c.log.Error("failed to preview table", "with error:", err) + return 0 + } + return len(records) +} diff --git a/plugins/extractors/maxcompute/config/config.go b/plugins/extractors/maxcompute/config/config.go index a0f17a5a..8b9836f5 100644 --- a/plugins/extractors/maxcompute/config/config.go +++ b/plugins/extractors/maxcompute/config/config.go @@ -12,5 +12,7 @@ type Config struct { Schemas []string `mapstructure:"schemas"` Tables []string `mapstructure:"tables"` } `mapstructure:"exclude,omitempty"` + MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"` + // MixValues bool `mapstructure:"mix_values,omitempty"` Concurrency int `mapstructure:"concurrency,omitempty"` } diff --git a/plugins/extractors/maxcompute/maxcompute.go b/plugins/extractors/maxcompute/maxcompute.go index 7741020b..706c6b02 100644 --- a/plugins/extractors/maxcompute/maxcompute.go +++ b/plugins/extractors/maxcompute/maxcompute.go @@ -52,6 +52,7 @@ exclude: tables: - schema_c.table_a concurrency: 10 +max_preview_rows: 3 ` var info = plugins.Info{ @@ -66,6 +67,7 @@ type Client interface { ListSchema(ctx context.Context) ([]*odps.Schema, error) ListTable(ctx context.Context, schemaName string) ([]*odps.Table, error) GetTableSchema(ctx context.Context, table *odps.Table) (string, *tableschema.TableSchema, error) + MaxPreviewRows(table *odps.Table, partitionValue string, limit int64) int } func New(logger log.Logger, clientFunc NewClientFunc) *Extractor { @@ -168,7 +170,7 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table return nil } -func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) { +func (e *Extractor) buildAsset(schema *odps.Schema, table *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) { defaultSchema := "default" schemaName := schema.Name() if schemaName == "" { @@ -187,7 +189,7 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str Service: "maxcompute", } - tableAttributesData := e.buildTableAttributesData(schemaName, tableSchema) + tableAttributesData := e.buildTableAttributesData(schemaName, table, tableSchema) var columns []*v1beta2.Column for i, col := range tableSchema.Columns { @@ -240,7 +242,7 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column { return columns } -func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *tableschema.TableSchema) map[string]interface{} { +func (e *Extractor) buildTableAttributesData(schemaName string, table *odps.Table, tableInfo *tableschema.TableSchema) map[string]interface{} { attributesData := map[string]interface{}{} attributesData["project_name"] = e.config.ProjectName @@ -253,14 +255,16 @@ func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *table attributesData["sql"] = tableInfo.ViewText } + partitionNames := make([]string, len(tableInfo.PartitionColumns)) if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 { - partitionNames := make([]string, len(tableInfo.PartitionColumns)) for i, column := range tableInfo.PartitionColumns { partitionNames[i] = column.Name } attributesData["partition_fields"] = partitionNames } + attributesData["max_preview_rows"] = e.client.MaxPreviewRows(table, "", int64(e.config.MaxPreviewRows)) + return attributesData } diff --git a/plugins/extractors/maxcompute/maxcompute_test.go b/plugins/extractors/maxcompute/maxcompute_test.go index 3caa88f1..18dcf7da 100644 --- a/plugins/extractors/maxcompute/maxcompute_test.go +++ b/plugins/extractors/maxcompute/maxcompute_test.go @@ -180,12 +180,15 @@ func TestExtract(t *testing.T) { "secret": "access_key_secret", }, "endpoint_project": "https://example.com/some-api", + "max_preview_rows": 3, }, }, func(mockClient *mocks.MaxComputeClient) { mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1, nil) + mockClient.EXPECT().MaxPreviewRows(table1[0], 0, 10).Return(3) mockClient.EXPECT().GetTableSchema(mock.Anything, table1[0]).Return("VIRTUAL_VIEW", schemaMapping[table1[0].Name()], nil) mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil) + mockClient.EXPECT().MaxPreviewRows(table1[1], 0, 10).Return(3) mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil) }) diff --git a/plugins/extractors/maxcompute/mocks/maxcompute_client_mock.go b/plugins/extractors/maxcompute/mocks/maxcompute_client_mock.go index de4f1dbe..919e258d 100644 --- a/plugins/extractors/maxcompute/mocks/maxcompute_client_mock.go +++ b/plugins/extractors/maxcompute/mocks/maxcompute_client_mock.go @@ -208,6 +208,54 @@ func (_c *MaxComputeClient_ListTable_Call) RunAndReturn(run func(context.Context return _c } +// MaxPreviewRows provides a mock function with given fields: table, partitionValue, limit +func (_m *MaxComputeClient) MaxPreviewRows(table *odps.Table, partitionValue string, limit int64) int { + ret := _m.Called(table, partitionValue, limit) + + if len(ret) == 0 { + panic("no return value specified for MaxPreviewRows") + } + + var r0 int + if rf, ok := ret.Get(0).(func(*odps.Table, string, int64) int); ok { + r0 = rf(table, partitionValue, limit) + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MaxComputeClient_MaxPreviewRows_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MaxPreviewRows' +type MaxComputeClient_MaxPreviewRows_Call struct { + *mock.Call +} + +// MaxPreviewRows is a helper method to define mock.On call +// - table *odps.Table +// - partitionValue string +// - limit int64 +func (_e *MaxComputeClient_Expecter) MaxPreviewRows(table interface{}, partitionValue interface{}, limit interface{}) *MaxComputeClient_MaxPreviewRows_Call { + return &MaxComputeClient_MaxPreviewRows_Call{Call: _e.mock.On("MaxPreviewRows", table, partitionValue, limit)} +} + +func (_c *MaxComputeClient_MaxPreviewRows_Call) Run(run func(table *odps.Table, partitionValue string, limit int64)) *MaxComputeClient_MaxPreviewRows_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*odps.Table), args[1].(string), args[2].(int64)) + }) + return _c +} + +func (_c *MaxComputeClient_MaxPreviewRows_Call) Return(_a0 int) *MaxComputeClient_MaxPreviewRows_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MaxComputeClient_MaxPreviewRows_Call) RunAndReturn(run func(*odps.Table, string, int64) int) *MaxComputeClient_MaxPreviewRows_Call { + _c.Call.Return(run) + return _c +} + // NewMaxComputeClient creates a new instance of MaxComputeClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMaxComputeClient(t interface {