Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(maxcompute): added support for preview rows #72

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/bigquery/auditlog"
"github.com/goto/meteor/plugins/extractors/bigquery/upstream"
"github.com/goto/meteor/plugins/internal/upstream"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down
49 changes: 41 additions & 8 deletions plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,39 @@ import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/goto/meteor/plugins"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"github.com/goto/meteor/plugins/extractors/maxcompute/config"
"github.com/goto/salt/log"
"google.golang.org/protobuf/types/known/structpb"
)

// Client wraps the ODPS SDK handles needed to read MaxCompute metadata and
// preview rows for a single project.
type Client struct {
// client is the ODPS SDK entry point created in New.
client *odps.Odps
// project is the project handle obtained from client for the configured
// project name; schemas are listed through it.
project *odps.Project
// log reports failures encountered while listing schemas and tables.
log log.Logger
// tunnel is used by GetTablePreview to fetch preview rows.
tunnel *tunnel.Tunnel
}

// New builds a Client from conf. It creates an Aliyun account from
// conf.AccessKey, an ODPS handle for conf.EndpointProject with
// conf.ProjectName as the default project, and a tunnel for that project.
// An error from creating the tunnel is returned to the caller as-is.
func New(conf config.Config) *Client {
func New(conf config.Config) (*Client, error) {
aliAccount := account.NewAliyunAccount(conf.AccessKey.ID, conf.AccessKey.Secret)
client := odps.NewOdps(aliAccount, conf.EndpointProject)
client.SetDefaultProjectName(conf.ProjectName)

project := client.Project(conf.ProjectName)
tunnelInstance, err := tunnel.NewTunnelFromProject(project)
if err != nil {
return nil, err
}

return &Client{
client: client,
project: project,
log: plugins.GetLog(),
}
tunnel: tunnelInstance,
}, nil
}

func (c *Client) ListSchema(context.Context) (schemas []*odps.Schema, err error) {
err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) {
if err2 != nil {
err = err2
c.log.Error("failed to process schema", "with error:", err)
return
}
schemas = append(schemas, schema)
Expand All @@ -50,7 +53,6 @@ func (c *Client) ListTable(_ context.Context, schemaName string) (tables []*odps
func(table *odps.Table, err2 error) {
if err2 != nil {
err = err2
c.log.Error("failed to process table", "with error:", err)
return
}
tables = append(tables, table)
Expand All @@ -71,3 +73,34 @@ func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *ta
}
return table.Type().String(), &tableSchema, nil
}

func (c *Client) GetTablePreview(_ context.Context, partitionValue string, table *odps.Table, maxRows int) (
previewFields []string, previewRows *structpb.ListValue, err error,
) {
if table.Type().String() == config.TableTypeView {
return nil, nil, nil
}

records, err := c.tunnel.Preview(table, partitionValue, int64(maxRows))
haveiss marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, err
}

columnNames := make([]string, len(table.Schema().Columns))
for i, column := range table.Schema().Columns {
columnNames[i] = column.Name
}

var protoRows []*structpb.Value
for _, record := range records {
var rowValues []*structpb.Value
for _, value := range record {
rowValues = append(rowValues, structpb.NewStringValue(value.String()))
}
protoRows = append(protoRows, structpb.NewListValue(&structpb.ListValue{Values: rowValues}))
}

protoList := &structpb.ListValue{Values: protoRows}

return columnNames, protoList, nil
}
7 changes: 6 additions & 1 deletion plugins/extractors/maxcompute/config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

// TableTypeView is the table type string that marks a view. It is compared
// against the table type to skip previews and to attach the view SQL and
// lineage.
const TableTypeView = "VIRTUAL_VIEW"

type Config struct {
ProjectName string `mapstructure:"project_name"`
EndpointProject string `mapstructure:"endpoint_project"`
Expand All @@ -12,5 +14,8 @@ type Config struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
} `mapstructure:"exclude,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"`
MixValues bool `mapstructure:"mix_values,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"`
}
125 changes: 121 additions & 4 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,29 @@ import (
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/maxcompute/client"
"github.com/goto/meteor/plugins/extractors/maxcompute/config"
"github.com/goto/meteor/plugins/internal/upstream"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Extractor reads table metadata from MaxCompute through a Client and builds
// assets from it.
type Extractor struct {
plugins.BaseExtractor
// logger receives the extractor's warnings and errors.
logger log.Logger
// config holds the extractor settings (project, exclusions, preview and
// view-lineage options).
config config.Config
// randFn creates the random index generator used by mixValuesIfNeeded to
// shuffle preview values.
randFn randFn

// client is the MaxCompute client used to list schemas and tables and to
// fetch schemas and previews.
client Client
// newClient constructs client.
// NOTE(review): presumably called during initialisation — not visible here.
newClient NewClientFunc
// eg groups the extractor's goroutines.
// NOTE(review): presumably bounded by config.Concurrency — confirm.
eg *errgroup.Group
}

// randFn takes a seed and returns a generator function. mixValuesIfNeeded
// calls the generator with the number of rows and uses the result directly as
// a row index, so the generator must return a value in [0, n) for argument n.
type randFn func(rndSeed int64) func(int64) int64

// NewClientFunc is the constructor signature for a Client; CreateClient is the
// implementation in this file. It lets callers of New supply their own
// constructor.
type NewClientFunc func(ctx context.Context, logger log.Logger, conf config.Config) (Client, error)

//go:embed README.md
Expand All @@ -52,6 +57,9 @@ exclude:
tables:
- schema_c.table_a
concurrency: 10
max_preview_rows: 3
mix_values: false
build_view_lineage: true
`

var info = plugins.Info{
Expand All @@ -66,6 +74,7 @@ type Client interface {
ListSchema(ctx context.Context) ([]*odps.Schema, error)
ListTable(ctx context.Context, schemaName string) ([]*odps.Table, error)
GetTableSchema(ctx context.Context, table *odps.Table) (string, *tableschema.TableSchema, error)
GetTablePreview(ctx context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error)
}

func New(logger log.Logger, clientFunc NewClientFunc) *Extractor {
Expand Down Expand Up @@ -158,7 +167,7 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return err
}

asset, err := e.buildAsset(schema, table, tableType, tableSchema)
asset, err := e.buildAsset(ctx, schema, table, tableType, tableSchema)
if err != nil {
e.logger.Error("failed to build asset", "table", table.Name(), "error", err)
return err
Expand All @@ -168,7 +177,9 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return nil
}

func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType string, tableSchema *tableschema.TableSchema) (*v1beta2.Asset, error) {
func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
table *odps.Table, tableType string, tableSchema *tableschema.TableSchema,
) (*v1beta2.Asset, error) {
defaultSchema := "default"
schemaName := schema.Name()
if schemaName == "" {
Expand All @@ -177,6 +188,14 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str

tableURN := plugins.MaxComputeURN(e.config.ProjectName, schemaName, tableSchema.TableName)

var previewFields []string
var previewRows *structpb.ListValue
var err error
previewFields, previewRows, err = e.buildPreview(ctx, table, tableSchema)
if err != nil {
e.logger.Warn("error building preview", "err", err, "table", tableSchema.TableName)
}

asset := &v1beta2.Asset{
Urn: tableURN,
Name: tableSchema.TableName,
Expand All @@ -189,6 +208,17 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str

tableAttributesData := e.buildTableAttributesData(schemaName, tableSchema)

if tableType == config.TableTypeView {
query := tableSchema.ViewText
tableAttributesData["sql"] = query
if e.config.BuildViewLineage {
upstreamResources := getUpstreamResources(query)
asset.Lineage = &v1beta2.Lineage{
Upstreams: upstreamResources,
}
}
}

var columns []*v1beta2.Column
for i, col := range tableSchema.Columns {
columnData := &v1beta2.Column{
Expand All @@ -209,6 +239,12 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str
UpdateTime: timestamppb.New(time.Time(tableSchema.LastModifiedTime)),
}

maxPreviewRows := e.config.MaxPreviewRows
if maxPreviewRows > 0 {
tableData.PreviewFields = previewFields
tableData.PreviewRows = previewRows
}

tbl, err := anypb.New(tableData)
if err != nil {
e.logger.Warn("error creating Any struct", "error", err)
Expand All @@ -218,6 +254,22 @@ func (e *Extractor) buildAsset(schema *odps.Schema, _ *odps.Table, tableType str
return asset, nil
}

// getUpstreamResources parses the top-level upstreams referenced by query,
// filters them through upstream.UniqueFilterResources, and returns one
// maxcompute "table" resource per remaining dependency. The result is nil when
// there are no dependencies.
func getUpstreamResources(query string) []*v1beta2.Resource {
	parsed := upstream.ParseTopLevelUpstreamsFromQuery(query)

	var resources []*v1beta2.Resource
	for _, dep := range upstream.UniqueFilterResources(parsed) {
		resource := &v1beta2.Resource{
			Urn:     plugins.MaxComputeURN(dep.Project, dep.Dataset, dep.Name),
			Name:    dep.Name,
			Type:    "table",
			Service: "maxcompute",
		}
		resources = append(resources, resource)
	}
	return resources
}

func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
if dataType.ID() != datatype.STRUCT {
return nil
Expand Down Expand Up @@ -253,8 +305,8 @@ func (e *Extractor) buildTableAttributesData(schemaName string, tableInfo *table
attributesData["sql"] = tableInfo.ViewText
}

partitionNames := make([]string, len(tableInfo.PartitionColumns))
if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 {
partitionNames := make([]string, len(tableInfo.PartitionColumns))
for i, column := range tableInfo.PartitionColumns {
partitionNames[i] = column.Name
}
Expand All @@ -278,6 +330,71 @@ func buildColumnAttributesData(column *tableschema.Column) map[string]interface{
return attributesData
}

// buildPreview returns the preview column names and preview rows for table t.
//
// Previewing is disabled when config.MaxPreviewRows is zero or negative; all
// results are then nil. When config.MixValues is set, the rows are converted to
// plain Go values, passed through mixValuesIfNeeded seeded with the table's
// last-modified Unix time, and converted back into a protobuf list.
//
// Errors are wrapped with context and returned without being logged here, so
// that the caller reports each failure exactly once.
func (e *Extractor) buildPreview(ctx context.Context, t *odps.Table, tSchema *tableschema.TableSchema) ([]string, *structpb.ListValue, error) {
	maxPreviewRows := e.config.MaxPreviewRows
	if maxPreviewRows <= 0 {
		return nil, nil, nil
	}

	previewFields, previewRows, err := e.client.GetTablePreview(ctx, "", t, maxPreviewRows)
	if err != nil {
		return nil, nil, fmt.Errorf("preview table %q: %w", t.Name(), err)
	}

	if !e.config.MixValues {
		return previewFields, previewRows, nil
	}

	// GetValues is nil-safe, so a nil previewRows simply yields no rows.
	values := previewRows.GetValues()
	tempRows := make([]interface{}, len(values))
	for i, val := range values {
		tempRows[i] = val.AsInterface()
	}

	tempRows, err = e.mixValuesIfNeeded(tempRows, time.Time(tSchema.LastModifiedTime).Unix())
	if err != nil {
		return nil, nil, fmt.Errorf("mix values: %w", err)
	}

	previewRows, err = structpb.NewList(tempRows)
	if err != nil {
		return nil, nil, fmt.Errorf("create preview list: %w", err)
	}

	return previewFields, previewRows, nil
}

// mixValuesIfNeeded shuffles preview values within each column so that the
// values of one original row no longer appear together.
//
// rows must hold one []any per row. The input is returned untouched when
// config.MixValues is off or there are fewer than two rows. Otherwise, for
// every column of the first row and every row, the cell is swapped with the
// same column of a row picked by the generator obtained from e.randFn(rndSeed).
// The inner row slices are modified in place.
//
// An error is returned when a row is not a []any, or when a row has fewer
// columns than the first row (which would otherwise index out of range).
func (e *Extractor) mixValuesIfNeeded(rows []interface{}, rndSeed int64) ([]interface{}, error) {
	if !e.config.MixValues || len(rows) < 2 {
		return rows, nil
	}

	table := make([][]any, 0, len(rows))
	for i, row := range rows {
		arr, ok := row.([]any)
		if !ok {
			return nil, fmt.Errorf("row %d is not a slice", i)
		}
		// Only the first row's width is shuffled, so longer rows are fine but
		// shorter ones would panic during the swap below.
		if i > 0 && len(arr) < len(table[0]) {
			return nil, fmt.Errorf("row %d has %d columns, expected at least %d", i, len(arr), len(table[0]))
		}
		table = append(table, arr)
	}

	numRows := len(table)
	numColumns := len(table[0])

	rndGen := e.randFn(rndSeed)
	for col := 0; col < numColumns; col++ {
		for row := 0; row < numRows; row++ {
			randomRow := rndGen(int64(numRows))

			table[row][col], table[randomRow][col] = table[randomRow][col], table[row][col]
		}
	}

	mixedRows := make([]any, numRows)
	for i, row := range table {
		mixedRows[i] = row
	}
	return mixedRows, nil
}

func dataTypeToString(dataType datatype.DataType) string {
if dataType.ID() == datatype.MAP {
return dataType.Name()
Expand Down Expand Up @@ -306,5 +423,5 @@ func init() {
}

// CreateClient builds a MaxCompute client from conf and matches the
// NewClientFunc signature. The context and logger arguments are unused.
//
// The concrete *client.Client is converted to the Client interface only on
// success, so a failed construction yields a nil interface rather than an
// interface wrapping a nil pointer.
func CreateClient(_ context.Context, _ log.Logger, conf config.Config) (Client, error) {
	c, err := client.New(conf)
	if err != nil {
		return nil, err
	}
	return c, nil
}
Loading
Loading