Skip to content

Commit 74c1da7

Browse files
committed
提交
1 parent 185d286 commit 74c1da7

10 files changed

+311
-33
lines changed

datamove/datamove.go

+6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ func New(options *Options) (t *task.Task, err error) {
4747
if err != nil {
4848
return
4949
}
50+
t.Extend = progress
51+
progress.t = t
5052
return
5153

5254
}
@@ -86,9 +88,13 @@ type Progress struct {
8688
isEnd bool
8789
isStopped bool
8890
dataMoveStop *bool
91+
t *task.Task
8992
}
9093

9194
func (this_ *Progress) ShouldStop() bool {
95+
if this_.t != nil && this_.t.IsStopped() {
96+
return true
97+
}
9298
if this_.dataMoveStop != nil {
9399
if *this_.dataMoveStop {
94100
return true

datamove/datamove_base.go

+29-25
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Options struct {
3232

3333
DataList []map[string]interface{} `json:"dataList"`
3434

35+
Total int64 `json:"total"`
36+
3537
FilePath string `json:"filePath"`
3638
ShouldOwner bool `json:"shouldOwner"` // 需要 建库
3739
ShouldTable bool `json:"shouldTable"` // 需要 建表
@@ -103,6 +105,33 @@ func (this_ *DataSourceConfig) GetTxtFileType() string {
103105
}
104106
return this_.TxtFileType
105107
}
108+
func (this_ *DataSourceConfig) IsData() bool {
109+
return this_.Type == "data"
110+
}
111+
func (this_ *DataSourceConfig) IsDb() bool {
112+
return this_.Type == "database"
113+
}
114+
func (this_ *DataSourceConfig) IsEs() bool {
115+
return this_.Type == "elasticsearch"
116+
}
117+
func (this_ *DataSourceConfig) IsTxt() bool {
118+
return this_.Type == "txt"
119+
}
120+
func (this_ *DataSourceConfig) IsExcel() bool {
121+
return this_.Type == "excel"
122+
}
123+
func (this_ *DataSourceConfig) IsSql() bool {
124+
return this_.Type == "sql"
125+
}
126+
func (this_ *DataSourceConfig) IsKafka() bool {
127+
return this_.Type == "kafka"
128+
}
129+
func (this_ *DataSourceConfig) IsRedis() bool {
130+
return this_.Type == "redis"
131+
}
132+
func (this_ *DataSourceConfig) IsScript() bool {
133+
return this_.Type == "script"
134+
}
106135

107136
type DbOwner struct {
108137
From *dialect.OwnerModel `json:"from"`
@@ -158,28 +187,3 @@ type DbColumn struct {
158187
To *dialect.ColumnModel `json:"to"`
159188
Value string `json:"value"`
160189
}
161-
162-
func (this_ DataSourceConfig) IsData() bool {
163-
return this_.Type == "data"
164-
}
165-
func (this_ DataSourceConfig) IsDb() bool {
166-
return this_.Type == "database"
167-
}
168-
func (this_ DataSourceConfig) IsEs() bool {
169-
return this_.Type == "elasticsearch"
170-
}
171-
func (this_ DataSourceConfig) IsTxt() bool {
172-
return this_.Type == "txt"
173-
}
174-
func (this_ DataSourceConfig) IsExcel() bool {
175-
return this_.Type == "excel"
176-
}
177-
func (this_ DataSourceConfig) IsSql() bool {
178-
return this_.Type == "sql"
179-
}
180-
func (this_ DataSourceConfig) IsKafka() bool {
181-
return this_.Type == "kafka"
182-
}
183-
func (this_ DataSourceConfig) IsRedis() bool {
184-
return this_.Type == "redis"
185-
}

datamove/datamove_datasource.go

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (this_ *Executor) datasourceToDb(from DataSource) (err error) {
174174
if err != nil {
175175
return
176176
}
177+
defer func() { _ = to.Service.GetDb().Close() }()
177178
err = DateMove(this_.Progress, from, to)
178179
if err != nil {
179180
util.Logger.Error("datasource to excel error", zap.Error(err))

datamove/datamove_db.go

+48-5
Original file line numberDiff line numberDiff line change
@@ -178,27 +178,70 @@ func (this_ *Executor) forEachOwnersTables(on func(owner *DbOwner, table *DbTabl
178178
util.Logger.Error("for each owners to do panic error", zap.Error(err))
179179
}
180180
}()
181+
182+
if this_.BySql {
183+
var service db.IService
184+
service, err = this_.newDbService(*this_.From.DbConfig, this_.From.Username, this_.From.Password, "")
185+
if err != nil {
186+
return
187+
}
188+
defer func() {
189+
if service != nil {
190+
_ = service.GetDb().Close()
191+
}
192+
}()
193+
194+
owner := &DbOwner{
195+
From: &dialect.OwnerModel{
196+
OwnerName: this_.OwnerName,
197+
},
198+
To: &dialect.OwnerModel{
199+
OwnerName: this_.OwnerName,
200+
},
201+
}
202+
table := &DbTable{
203+
From: &dialect.TableModel{
204+
TableName: this_.TableName,
205+
},
206+
To: &dialect.TableModel{
207+
TableName: this_.TableName,
208+
},
209+
IndexName: this_.IndexName,
210+
IdName: this_.IdName,
211+
IdScript: this_.IdScript,
212+
}
213+
from := NewDataSourceDb()
214+
from.ParamModel = this_.GetDialectParam()
215+
from.OwnerName = owner.From.OwnerName
216+
from.TableName = table.From.TableName
217+
from.ColumnList = this_.ColumnList
218+
from.Service = service
219+
err = on(owner, table, from)
220+
221+
return
222+
}
223+
181224
util.Logger.Info("for each owners to do", zap.Any("allOwner", this_.AllOwner))
182225
owners := this_.Owners
183226

184227
if this_.AllOwner {
185228
var list []*dialect.OwnerModel
186229
var service db.IService
187-
service, err = this_.newDbService(*this_.From.DbConfig, "", "", "")
230+
service, err = this_.newDbService(*this_.From.DbConfig, this_.From.Username, this_.From.Password, "")
188231
if err != nil {
189232
return
190233
}
191234
defer func() {
192235
if service != nil {
193-
service.Close()
236+
_ = service.GetDb().Close()
194237
}
195238
}()
196239
list, err = worker.OwnersSelect(service.GetDb(), service.GetDialect(), this_.GetDialectParam())
197240
if err != nil {
198241
return
199242
}
200243

201-
service.Close()
244+
_ = service.GetDb().Close()
202245
service = nil
203246

204247
for _, one := range list {
@@ -287,12 +330,12 @@ func (this_ *Executor) forEachOwnerTables(owner *DbOwner, on func(owner *DbOwner
287330
defer func() {
288331

289332
if owner.fromService != nil {
290-
owner.fromService.Close()
333+
_ = owner.fromService.GetDb().Close()
291334
owner.fromService = nil
292335
}
293336

294337
if owner.toService != nil {
295-
owner.toService.Close()
338+
_ = owner.toService.GetDb().Close()
296339
owner.toService = nil
297340
}
298341

datamove/datamove_execute.go

+29
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ func (this_ *Executor) execute() (err error) {
1414
err = errors.New(fmt.Sprint(e))
1515
util.Logger.Error("execute panic error", zap.Error(err))
1616
}
17+
if this_.From != nil && this_.From.DbConfig != nil && this_.From.DbConfig.SSHClient != nil {
18+
_ = this_.From.DbConfig.SSHClient.Close()
19+
}
20+
if this_.To != nil && this_.To.DbConfig != nil && this_.To.DbConfig.SSHClient != nil {
21+
_ = this_.To.DbConfig.SSHClient.Close()
22+
}
23+
24+
if this_.From != nil && this_.From.RedisConfig != nil && this_.From.RedisConfig.SSHClient != nil {
25+
_ = this_.From.RedisConfig.SSHClient.Close()
26+
}
27+
if this_.To != nil && this_.To.RedisConfig != nil && this_.To.RedisConfig.SSHClient != nil {
28+
_ = this_.To.RedisConfig.SSHClient.Close()
29+
}
1730
}()
1831

1932
if this_.From.IsDb() {
@@ -96,6 +109,22 @@ func (this_ *Executor) execute() (err error) {
96109
util.Logger.Error("execute error", zap.Error(err))
97110
return
98111
}
112+
} else if this_.From.IsScript() {
113+
if this_.To.IsSql() {
114+
err = this_.scriptToSql()
115+
} else if this_.To.IsExcel() {
116+
err = this_.scriptToExcel()
117+
} else if this_.To.IsTxt() {
118+
err = this_.scriptToTxt()
119+
} else if this_.To.IsDb() {
120+
err = this_.scriptToDb()
121+
} else if this_.To.IsEs() {
122+
err = this_.scriptToEs()
123+
} else {
124+
err = errors.New(fmt.Sprintf("不支持的 目标 类型[%s]", this_.To.Type))
125+
util.Logger.Error("execute error", zap.Error(err))
126+
return
127+
}
99128
} else {
100129
err = errors.New(fmt.Sprintf("不支持的 源 类型[%s]", this_.To.Type))
101130
util.Logger.Error("execute error", zap.Error(err))

datamove/datamove_script.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datamove
2+
3+
import "github.com/team-ide/go-tool/util"
4+
5+
func (this_ *Executor) scriptToDb() (err error) {
6+
util.Logger.Info("script to db start")
7+
err = this_.onScriptSourceData(this_.datasourceToDb)
8+
util.Logger.Info("script to db end")
9+
return
10+
}
11+
12+
func (this_ *Executor) scriptToEs() (err error) {
13+
util.Logger.Info("script to es start")
14+
err = this_.onScriptSourceData(this_.datasourceToEs)
15+
util.Logger.Info("script to es end")
16+
return
17+
}
18+
19+
func (this_ *Executor) scriptToSql() (err error) {
20+
util.Logger.Info("script to sql start")
21+
err = this_.onScriptSourceData(this_.datasourceToSql)
22+
util.Logger.Info("script to sql end")
23+
return
24+
}
25+
26+
func (this_ *Executor) scriptToTxt() (err error) {
27+
util.Logger.Info("script to txt start")
28+
err = this_.onScriptSourceData(this_.datasourceToTxt)
29+
util.Logger.Info("script to txt end")
30+
return
31+
}
32+
33+
func (this_ *Executor) scriptToExcel() (err error) {
34+
util.Logger.Info("script to excel start")
35+
err = this_.onScriptSourceData(this_.datasourceToExcel)
36+
util.Logger.Info("script to excel end")
37+
return
38+
}
39+
40+
func (this_ *Executor) onScriptSourceData(on func(datasource DataSource) (err error)) (err error) {
41+
datasource := NewDataSourceScript()
42+
datasource.Total = this_.Total
43+
datasource.ColumnList = this_.ColumnList
44+
err = on(datasource)
45+
return
46+
}

datamove/datasource_base.go

+45-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package datamove
22

33
import (
4+
"errors"
45
"github.com/team-ide/go-dialect/dialect"
6+
"github.com/team-ide/go-tool/javascript"
7+
"github.com/team-ide/go-tool/util"
58
)
69

710
type DataSourceBase struct {
8-
SkipNames []string `json:"skipNames"`
9-
ColumnList []*Column `json:"columnList"`
11+
SkipNames []string `json:"skipNames"`
12+
ColumnList []*Column `json:"columnList"`
13+
script *javascript.Script
14+
ScriptContext map[string]interface{}
1015
}
1116

1217
type Column struct {
@@ -30,6 +35,44 @@ func (this_ *DataSourceBase) GetDialectColumnList() []*dialect.ColumnModel {
3035
return columns
3136
}
3237

38+
func (this_ *DataSourceBase) initScriptContext() {
39+
if this_.script == nil {
40+
this_.script, _ = javascript.NewScript()
41+
if this_.ScriptContext != nil {
42+
for k, v := range this_.ScriptContext {
43+
_ = this_.script.Set(k, v)
44+
}
45+
}
46+
}
47+
}
48+
49+
func (this_ *DataSourceBase) SetScriptContext(key string, value interface{}) {
50+
this_.initScriptContext()
51+
_ = this_.script.Set(key, value)
52+
}
53+
54+
func (this_ *DataSourceBase) GetValueByScript(script string) (res interface{}, err error) {
55+
this_.initScriptContext()
56+
if script == "" {
57+
return
58+
}
59+
res, err = this_.script.GetScriptValue(script)
60+
if err != nil {
61+
err = errors.New("script [" + script + "] error:" + err.Error())
62+
return
63+
}
64+
return
65+
}
66+
67+
func (this_ *DataSourceBase) GetStringValueByScript(script string) (res string, err error) {
68+
r, err := this_.GetValueByScript(script)
69+
if err != nil {
70+
return
71+
}
72+
res = util.GetStringValue(r)
73+
return
74+
}
75+
3376
func (this_ *DataSourceBase) ValuesToValues(progress *Progress, cols []interface{}) (res []interface{}, err error) {
3477
vSize := len(cols)
3578
for index, _ := range this_.ColumnList {

0 commit comments

Comments
 (0)