Skip to content

Commit

Permalink
feat: added support for preview rows
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Nov 28, 2024
1 parent 4e0947b commit bd9f06c
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand Down
16 changes: 16 additions & 0 deletions plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/maxcompute/config"
"github.com/goto/salt/log"
Expand All @@ -14,6 +15,7 @@ import (
// Client bundles the MaxCompute (ODPS) SDK handles and the logger used by
// the methods defined on it.
type Client struct {
// client is the ODPS SDK handle; New sets its default project name.
client *odps.Odps
// project is the handle obtained from client.Project for the configured project.
project *odps.Project
// tunnel is created from project and used to preview table rows.
// NOTE(review): New only logs an error from tunnel.NewTunnelFromProject and
// still stores the result, so this is presumably nil when creation fails.
tunnel *tunnel.Tunnel
// log is the plugin-wide logger returned by plugins.GetLog().
log log.Logger
}

Expand All @@ -23,11 +25,16 @@ func New(conf config.Config) *Client {
client.SetDefaultProjectName(conf.ProjectName)

project := client.Project(conf.ProjectName)
tunnelInstance, err := tunnel.NewTunnelFromProject(project)
if err != nil {
plugins.GetLog().Error("failed to create tunnel", "with error:", err)
}

return &Client{
client: client,
project: project,
log: plugins.GetLog(),
tunnel: tunnelInstance,
}
}

Expand Down Expand Up @@ -71,3 +78,12 @@ func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *ta
}
return table.Type().String(), &tableSchema, nil
}

// MaxPreviewRows previews up to limit records of table through the tunnel and
// returns how many records came back. Despite its name, the result is the
// number of rows actually fetched, not a configured maximum.
//
// partitionValue is forwarded unchanged to the tunnel's Preview call.
//
// The method is best-effort: any failure is logged and reported as 0 rows
// rather than returned as an error, so callers cannot distinguish "empty
// table" from "preview failed".
func (c *Client) MaxPreviewRows(table *odps.Table, partitionValue string, limit int64) int {
	// New only logs a tunnel-creation failure and still builds the Client,
	// so the tunnel may be nil here; guard against a nil-pointer panic.
	if c.tunnel == nil {
		c.log.Error("failed to preview table", "with error:", "tunnel is not initialized")
		return 0
	}

	records, err := c.tunnel.Preview(table, partitionValue, limit)
	if err != nil {
		c.log.Error("failed to preview table", "with error:", err)
		return 0
	}
	return len(records)
}
2 changes: 2 additions & 0 deletions plugins/extractors/maxcompute/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ type Config struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
} `mapstructure:"exclude,omitempty"`
MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"`
// MixValues bool `mapstructure:"mix_values,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
}
12 changes: 8 additions & 4 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ exclude:
tables:
- schema_c.table_a
concurrency: 10
max_preview_rows: 3
`

var info = plugins.Info{
Expand All @@ -66,6 +67,7 @@ type Client interface {
ListSchema(ctx context.Context) ([]*odps.Schema, error)
ListTable(ctx context.Context, schemaName string) ([]*odps.Table, error)
GetTableSchema(ctx context.Context, table *odps.Table) (string, *tableschema.TableSchema, error)
MaxPreviewRows(table *odps.Table, partitionValue string, limit int64) int
}

func New(logger log.Logger, clientFunc NewClientFunc) *Extractor {
Expand Down Expand Up @@ -168,7 +170,7 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return nil
}

func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) {
func (e *Extractor) buildAsset(schema *odps.Schema, table *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) {
defaultSchema := "default"
schemaName := schema.Name()
if schemaName == "" {
Expand All @@ -187,7 +189,7 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str
Service: "maxcompute",
}

tableAttributesData := e.buildTableAttributesData(schemaName, tableSchema)
tableAttributesData := e.buildTableAttributesData(schemaName, table, tableSchema)

var columns []*v1beta2.Column
for i, col := range tableSchema.Columns {
Expand Down Expand Up @@ -240,7 +242,7 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
return columns
}

func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *tableschema.TableSchema) map[string]interface{} {
func (e *Extractor) buildTableAttributesData(schemaName string, table *odps.Table, tableInfo *tableschema.TableSchema) map[string]interface{} {
attributesData := map[string]interface{}{}

attributesData["project_name"] = e.config.ProjectName
Expand All @@ -253,14 +255,16 @@ func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *table
attributesData["sql"] = tableInfo.ViewText
}

partitionNames := make([]string, len(tableInfo.PartitionColumns))
if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 {
partitionNames := make([]string, len(tableInfo.PartitionColumns))
for i, column := range tableInfo.PartitionColumns {
partitionNames[i] = column.Name
}
attributesData["partition_fields"] = partitionNames
}

attributesData["max_preview_rows"] = e.client.MaxPreviewRows(table, "", int64(e.config.MaxPreviewRows))

return attributesData
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/extractors/maxcompute/maxcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,15 @@ func TestExtract(t *testing.T) {
"secret": "access_key_secret",
},
"endpoint_project": "https://example.com/some-api",
"max_preview_rows": 3,
},
}, func(mockClient *mocks.MaxComputeClient) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1, nil)
mockClient.EXPECT().MaxPreviewRows(table1[0], 0, 10).Return(3)
mockClient.EXPECT().GetTableSchema(mock.Anything, table1[0]).Return("VIRTUAL_VIEW", schemaMapping[table1[0].Name()], nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil)
mockClient.EXPECT().MaxPreviewRows(table1[1], 0, 10).Return(3)
mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil)
})

Expand Down
48 changes: 48 additions & 0 deletions plugins/extractors/maxcompute/mocks/maxcompute_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bd9f06c

Please sign in to comment.