diff --git a/go.mod1 b/go.mod1 index c2952fbe4..eb2936c27 100644 --- a/go.mod1 +++ b/go.mod1 @@ -39,10 +39,12 @@ require ( github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 go.etcd.io/bbolt v1.3.5 // indirect + go.uber.org/atomic v1.7.0 + go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 - golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect + golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f golang.org/x/text v0.3.4 golang.org/x/tools v0.0.0-20200904185747-39188db58858 // indirect google.golang.org/grpc v1.27.1 diff --git a/go.sum1 b/go.sum1 index 3c7b31598..26de92e8e 100644 --- a/go.sum1 +++ b/go.sum1 @@ -165,6 +165,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cheggaaa/pb v2.0.7+incompatible h1:gLKifR1UkZ/kLkda5gC0K6c8g+jU2sINPtBeOiNlMhU= github.com/cheggaaa/pb v2.0.7+incompatible/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw= +github.com/cheggaaa/pb/v3 v3.0.5 h1:lmZOti7CraK9RSjzExsY53+WWfub9Qv13B5m4ptEoPE= github.com/cheggaaa/pb/v3 v3.0.5/go.mod h1:X1L61/+36nz9bjIsrDU52qHKOQukUQe2Ge+YvGuquCw= github.com/cheynewallace/tabby v1.1.0/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -206,6 +207,7 @@ github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= @@ -500,6 +502,7 @@ github.com/gostaticanalysis/analysisutil v0.0.3/go.mod h1:eEOZF4jCKGi+aprrirO9e7 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -507,6 +510,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= 
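The two new direct requirements above, go.uber.org/atomic and go.uber.org/multierr, back changes later in this patch: lock-free counters on localFileMeta and error aggregation in writeAndIngestByRanges. A minimal, self-contained sketch of the two usage patterns under those module versions; every name in it is illustrative and not part of the patch:

package main

import (
	"errors"
	"fmt"
	"sync"

	"go.uber.org/atomic"
	"go.uber.org/multierr"
)

func main() {
	// Pattern 1: a counter that many goroutines may update without an explicit
	// mutex, mirroring the new Length/TotalSize fields of localFileMeta.
	var totalSize atomic.Int64
	totalSize.Add(144)
	fmt.Println(totalSize.Load()) // 144

	// Pattern 2: folding errors from concurrent workers into a single error
	// value, mirroring writeAndIngestByRanges further down in this diff.
	var mu sync.Mutex
	var allErr error
	for _, err := range []error{nil, errors.New("range write failed")} {
		mu.Lock()
		allErr = multierr.Append(allErr, err)
		mu.Unlock()
	}
	fmt.Println(allErr) // range write failed
}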
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= +github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY= github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0= github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69/go.mod h1:YLEMZOtU+AZ7dhN9T/IpGhXVGly2bvkJQ+zxj3WeVQo= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= @@ -655,6 +659,7 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= @@ -664,6 +669,7 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11 h1:FxPOTFNqGkuDUGi3H/qkUbQO4ZiBa2brKq5r0l8TGeM= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= @@ -918,6 +924,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1030,6 +1037,7 @@ github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2t github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 
h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q= github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= @@ -1061,6 +1069,7 @@ github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= +github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o= github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= @@ -1234,6 +1243,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE= golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= @@ -1623,6 +1633,7 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= software.sslmate.com/src/go-pkcs12 v0.0.0-20200619203921-c9ed90bd32dc/go.mod h1:/xvNRWUqm0+/ZMiF4EX00vrSCMsE4/NHb+Pt3freEeQ= sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 h1:VO9oZbbkvTwqLimlQt15QNdOOBArT2dw/bvzsMZBiqQ= sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= +sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 h1:e1sMhtVq9AfcEy8AXNb8eSg6gbzfdpYhoNqnPJa+GzI= sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= diff --git a/lightning/backend/backend.go b/lightning/backend/backend.go index 965fa5b19..c81c0743c 100644 --- a/lightning/backend/backend.go +++ b/lightning/backend/backend.go @@ -16,6 +16,7 @@ package backend import ( "context" "fmt" + "sort" "time" "github.com/google/uuid" @@ -82,6 +83,18 @@ func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) { var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf") +type EngineFileSize struct { + // UUID is the engine's UUID. 
+ UUID uuid.UUID + // DiskSize is the estimated total file size on disk right now. + DiskSize int64 + // MemSize is the total memory size used by the engine. This is the + // estimated additional size saved onto disk after calling Flush(). + MemSize int64 + // IsImporting indicates whether the engine is performing Import(). + IsImporting bool +} + // AbstractBackend is the abstract interface behind Backend. // Implementations of this interface must be goroutine safe: you can share an // instance and execute any method anywhere. @@ -128,7 +141,29 @@ type AbstractBackend interface { // - PKIsHandle (true = do not generate _tidb_rowid) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) - LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) + // FlushEngine ensures all KV pairs written to an open engine have been + // synchronized, such that kill-9'ing Lightning afterwards and resuming from + // checkpoint can recover the exact same content. + // + // This method is only relevant for the local backend, and is a no-op for all + // other backends. + FlushEngine(ctx context.Context, engineUUID uuid.UUID) error + + // FlushAllEngines performs FlushEngine on all opened engines. This is a + // very expensive operation and should only be used in some rare situations + // (e.g. preparing to resolve a disk quota violation). + FlushAllEngines(ctx context.Context) error + + // EngineFileSizes obtains the sizes occupied locally by all engines managed + // by this backend. This method is used to compute disk quota. + // It can return nil if the contents are all stored remotely. + EngineFileSizes() []EngineFileSize + + // ResetEngine clears all written KV pairs in this opened engine. + ResetEngine(ctx context.Context, engineUUID uuid.UUID) error + + // LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine. + LocalWriter(ctx context.Context, engineUUID uuid.UUID, maxCacheSize int64) (EngineWriter, error) } func fetchRemoteTableModelsFromTLS(ctx context.Context, tls *common.TLS, schema string) ([]*model.TableInfo, error) { @@ -207,6 +242,61 @@ func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) return be.abstract.FetchRemoteTableModels(ctx, schemaName) } +func (be Backend) FlushAll(ctx context.Context) error { + return be.abstract.FlushAllEngines(ctx) +} + +// CheckDiskQuota verifies whether the total engine file size is below the given +// quota. If the quota is exceeded, this method returns an array of engines, +// which after importing can decrease the total size below the quota. +func (be Backend) CheckDiskQuota(quota int64) ( + largeEngines []uuid.UUID, + inProgressLargeEngines int, + totalDiskSize int64, + totalMemSize int64, +) { + sizes := be.abstract.EngineFileSizes() + sort.Slice(sizes, func(i, j int) bool { + a, b := &sizes[i], &sizes[j] + if a.IsImporting != b.IsImporting { + return a.IsImporting + } + return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize + }) + for _, size := range sizes { + totalDiskSize += size.DiskSize + totalMemSize += size.MemSize + if totalDiskSize+totalMemSize > quota { + if size.IsImporting { + inProgressLargeEngines++ + } else { + largeEngines = append(largeEngines, size.UUID) + } + } + } + return +} + +// UnsafeImportAndReset forces the backend to import the content of an engine +// into the target and then reset the engine to empty. This method will not +// close the engine. Make sure the engine is flushed manually before calling +// this method.
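To make the intended call sequence of these new methods concrete, here is a hedged sketch of a disk-quota check built from FlushAll, CheckDiskQuota and UnsafeImportAndReset (whose implementation follows right below). enforceDiskQuota is a hypothetical helper written as if it lived in this package; it is not part of the patch.

// enforceDiskQuota is illustrative only. It flushes every engine so the
// on-disk sizes are accurate, asks the backend which engines would bring the
// usage back under the quota, and then imports and resets those engines.
// Engines that are currently importing are only counted, never touched.
func enforceDiskQuota(ctx context.Context, be Backend, quota int64) error {
	if err := be.FlushAll(ctx); err != nil {
		return err
	}
	largeEngines, stillImporting, diskSize, memSize := be.CheckDiskQuota(quota)
	switch {
	case diskSize+memSize <= quota:
		return nil // within budget, nothing to do
	case len(largeEngines) == 0 && stillImporting > 0:
		return nil // the oversized engines are already being imported and will shrink soon
	}
	for _, engineUUID := range largeEngines {
		// every engine returned here was flushed by FlushAll above, which is
		// what UnsafeImportAndReset requires.
		if err := be.UnsafeImportAndReset(ctx, engineUUID); err != nil {
			return err
		}
	}
	return nil
}

Because CheckDiskQuota sorts importing engines first and the rest by ascending size, the returned slice ends up holding the largest idle engines, which frees the most space per import.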
+func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error { + // DO NOT call be.abstract.CloseEngine()! The engine should still be writable after + // calling UnsafeImportAndReset(). + closedEngine := ClosedEngine{ + engine: engine{ + backend: be.abstract, + logger: makeLogger("", engineUUID), + uuid: engineUUID, + }, + } + if err := closedEngine.Import(ctx); err != nil { + return err + } + return be.abstract.ResetEngine(ctx, engineUUID) +} + // OpenEngine opens an engine with the given table name and engine ID. func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) { tag, engineUUID := MakeUUID(tableName, engineID) @@ -251,16 +341,13 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) { } // Flush current written data for local backend -func (engine *OpenedEngine) Flush() error { - if l, ok := engine.backend.(*local); ok { - return l.Flush(engine.uuid) - } - return nil +func (engine *OpenedEngine) Flush(ctx context.Context) error { + return engine.backend.FlushEngine(ctx, engine.uuid) } // WriteRows writes a collection of encoded rows into the engine. func (engine *OpenedEngine) WriteRows(ctx context.Context, columnNames []string, rows Rows) error { - writer, err := engine.backend.LocalWriter(ctx, engine.uuid) + writer, err := engine.backend.LocalWriter(ctx, engine.uuid, LocalMemoryTableSize) if err != nil { return err } @@ -271,8 +358,8 @@ func (engine *OpenedEngine) WriteRows(ctx context.Context, columnNames []string, return writer.Close() } -func (engine *OpenedEngine) LocalWriter(ctx context.Context) (*LocalEngineWriter, error) { - w, err := engine.backend.LocalWriter(ctx, engine.uuid) +func (engine *OpenedEngine) LocalWriter(ctx context.Context, maxCacheSize int64) (*LocalEngineWriter, error) { + w, err := engine.backend.LocalWriter(ctx, engine.uuid, maxCacheSize) if err != nil { return nil, err } diff --git a/lightning/backend/backend_test.go b/lightning/backend/backend_test.go index d2417391a..4b413c5f7 100644 --- a/lightning/backend/backend_test.go +++ b/lightning/backend/backend_test.go @@ -125,7 +125,7 @@ func (s *backendSuite) TestWriteEngine(c *C) { Return(nil) mockWriter := mock.NewMockEngineWriter(s.controller) - s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() + s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1). 
Return(nil) @@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil) writer.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil) writer.EXPECT().Close().Return(nil) - s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(writer, nil) + s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil) engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) c.Assert(err, IsNil) @@ -183,7 +183,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil) mockWriter := mock.NewMockEngineWriter(s.controller) - s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() + s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). Return(errors.Annotate(context.Canceled, "fake unrecoverable write error")) @@ -204,7 +204,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil) mockWriter := mock.NewMockEngineWriter(s.controller) - s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() + s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows). Return(errors.New("fake recoverable write batch error")). MinTimes(1) @@ -304,3 +304,77 @@ func (s *backendSuite) TestNewEncoder(c *C) { c.Assert(realEncoder, Equals, encoder) c.Assert(err, IsNil) } + +func (s *backendSuite) TestCheckDiskQuota(c *C) { + s.setUpTest(c) + defer s.tearDownTest() + + uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111") + uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333") + uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555") + uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777") + uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999") + + fileSizes := []kv.EngineFileSize{ + { + UUID: uuid1, + DiskSize: 1000, + MemSize: 0, + IsImporting: false, + }, + { + UUID: uuid3, + DiskSize: 2000, + MemSize: 1000, + IsImporting: true, + }, + { + UUID: uuid5, + DiskSize: 1500, + MemSize: 3500, + IsImporting: false, + }, + { + UUID: uuid7, + DiskSize: 0, + MemSize: 7000, + IsImporting: true, + }, + { + UUID: uuid9, + DiskSize: 4500, + MemSize: 4500, + IsImporting: false, + }, + } + + s.mockBackend.EXPECT().EngineFileSizes().Return(fileSizes).Times(4) + + // No quota exceeded + le, iple, ds, ms := s.backend.CheckDiskQuota(30000) + c.Assert(le, HasLen, 0) + c.Assert(iple, Equals, 0) + c.Assert(ds, Equals, int64(9000)) + c.Assert(ms, Equals, int64(16000)) + + // Quota exceeded, the largest one is out + le, iple, ds, ms = s.backend.CheckDiskQuota(20000) + c.Assert(le, DeepEquals, []uuid.UUID{uuid9}) + c.Assert(iple, Equals, 0) + c.Assert(ds, Equals, int64(9000)) + c.Assert(ms, Equals, int64(16000)) + + // Quota exceeded, the importing one should be ranked least priority + le, iple, ds, ms = s.backend.CheckDiskQuota(12000) + c.Assert(le, DeepEquals, []uuid.UUID{uuid5, uuid9}) + c.Assert(iple, Equals, 0) + c.Assert(ds, Equals, int64(9000)) + c.Assert(ms, Equals, int64(16000)) + + // Quota exceeded, the importing ones should not be 
visible + le, iple, ds, ms = s.backend.CheckDiskQuota(5000) + c.Assert(le, DeepEquals, []uuid.UUID{uuid1, uuid5, uuid9}) + c.Assert(iple, Equals, 1) + c.Assert(ds, Equals, int64(9000)) + c.Assert(ms, Equals, int64(16000)) +} diff --git a/lightning/backend/importer.go b/lightning/backend/importer.go index 231b64c5d..30f999d0a 100644 --- a/lightning/backend/importer.go +++ b/lightning/backend/importer.go @@ -355,7 +355,23 @@ func (importer *importer) FetchRemoteTableModels(ctx context.Context, schema str return fetchRemoteTableModelsFromTLS(ctx, importer.tls, schema) } -func (importer *importer) LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) { +func (importer *importer) EngineFileSizes() []EngineFileSize { + return nil +} + +func (importer *importer) FlushEngine(context.Context, uuid.UUID) error { + return nil +} + +func (importer *importer) FlushAllEngines(context.Context) error { + return nil +} + +func (importer *importer) ResetEngine(context.Context, uuid.UUID) error { + return errors.New("cannot reset an engine in importer backend") +} + +func (importer *importer) LocalWriter(ctx context.Context, engineUUID uuid.UUID, maxCacheSize int64) (EngineWriter, error) { return &ImporterWriter{importer: importer, engineUUID: engineUUID}, nil } diff --git a/lightning/backend/local.go b/lightning/backend/local.go index 0db30b78b..7ea97bf68 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -18,15 +18,12 @@ import ( "context" "encoding/binary" "encoding/json" - "fmt" "io" - "io/ioutil" "os" "path/filepath" "sort" "strings" "sync" - "sync/atomic" "time" "github.com/cockroachdb/pebble" @@ -48,13 +45,17 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/hack" pd "github.com/tikv/pd/client" + "go.uber.org/atomic" + "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "github.com/pingcap/tidb-lightning/lightning/common" + "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/glue" "github.com/pingcap/tidb-lightning/lightning/log" "github.com/pingcap/tidb-lightning/lightning/manual" @@ -63,15 +64,14 @@ import ( ) const ( - dialTimeout = 5 * time.Second - bigValueSize = 1 << 16 // 64K - engineMetaFileSuffix = ".meta" + dialTimeout = 5 * time.Second + bigValueSize = 1 << 16 // 64K gRPCKeepAliveTime = 10 * time.Second gRPCKeepAliveTimeout = 3 * time.Second gRPCBackOffMaxDelay = 3 * time.Second - LocalMemoryTableSize = 128 << 20 + LocalMemoryTableSize = config.LocalMemoryTableSize // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 regionMaxKeyCount = 1_440_000 @@ -91,6 +91,11 @@ var ( localMinPDVersion = *semver.New("4.0.0") ) +var ( + engineMetaKey = []byte{0, 'm', 'e', 't', 'a'} + normalIterStartKey = []byte{1} +) + // Range record start and end key for localStoreDir.DB // so we can write it to tikv in streaming type Range struct { @@ -102,42 +107,46 @@ type Range struct { // localFileMeta contains some field that is necessary to continue the engine restore/import process. // These field should be written to disk when we update chunk checkpoint type localFileMeta struct { - Ts uint64 `json:"ts"` - Length int64 `json:"length"` - TotalSize int64 `json:"total_size"` + Ts uint64 `json:"ts"` + // Length is the number of KV pairs stored by the engine. 
+ Length atomic.Int64 `json:"length"` + // TotalSize is the total pre-compressed KV byte size stored by engine. + TotalSize atomic.Int64 `json:"total_size"` } -type importMutexState int32 +type importMutexState uint32 const ( - importMutexStateNoLock importMutexState = iota - importMutexStateImport + importMutexStateImport importMutexState = 1 << iota importMutexStateFlush importMutexStateClose + importMutexStateLocalIngest ) type LocalFile struct { localFileMeta - db *pebble.DB - Uuid uuid.UUID + db *pebble.DB + Uuid uuid.UUID + localWriters sync.Map // isImportingAtomic is an atomic variable indicating whether the importMutex has been locked. // This should not be used as a "spin lock" indicator. - isImportingAtomic int32 + isImportingAtomic atomic.Uint32 mutex sync.Mutex } func (e *LocalFile) Close() error { - return e.db.Close() + log.L().Debug("closing local engine", zap.Stringer("engine", e.Uuid), zap.Stack("stack")) + if e.db == nil { + return nil + } + err := errors.Trace(e.db.Close()) + e.db = nil + return err } // Cleanup remove meta and db files func (e *LocalFile) Cleanup(dataDir string) error { - metaPath := filepath.Join(dataDir, e.Uuid.String()+engineMetaFileSuffix) - if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) { - return errors.Trace(err) - } - dbPath := filepath.Join(dataDir, e.Uuid.String()) return os.RemoveAll(dbPath) } @@ -178,17 +187,125 @@ func (e *LocalFile) getSizeProperties() (*sizeProperties, error) { return sizeProps, nil } +func (e *LocalFile) isLocked() bool { + return e.isImportingAtomic.Load() != 0 +} + +func (e *LocalFile) getEngineFileSize() EngineFileSize { + metrics := e.db.Metrics() + total := metrics.Total() + var memSize int64 + e.localWriters.Range(func(k, v interface{}) bool { + w := k.(*LocalWriter) + memSize += w.writeBatch.totalSize + if w.writer != nil { + total.Size += int64(w.writer.writer.EstimatedSize()) + } + return true + }) + + return EngineFileSize{ + UUID: e.Uuid, + DiskSize: total.Size, + MemSize: memSize, + IsImporting: e.isLocked(), + } +} + // lock locks the local file for importing. func (e *LocalFile) lock(state importMutexState) { e.mutex.Lock() - atomic.StoreInt32(&e.isImportingAtomic, int32(state)) + e.isImportingAtomic.Store(uint32(state)) +} + +// lockUnless tries to lock the local file unless it is already locked into the state given by +// ignoreStateMask. Returns whether the lock is successful. 
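Since importMutexState is now a bit set (1 << iota) rather than a plain enum, lockUnless, whose body follows, and lockAllEnginesUnless later in this patch can skip any engine whose current state intersects a caller-supplied mask. A small sketch of that bit test; maskExample is a hypothetical function, not part of the patch:

// maskExample is illustrative only. FlushAllEngines (later in this diff) calls
// lockAllEnginesUnless(importMutexStateFlush, ^importMutexStateLocalIngest),
// i.e. "skip every engine locked for anything other than a local ingest".
func maskExample() {
	mask := ^importMutexStateLocalIngest

	importing := importMutexStateImport
	skipImporting := uint32(importing)&uint32(mask) != 0 // true: left alone by FlushAllEngines

	ingesting := importMutexStateLocalIngest
	skipIngesting := uint32(ingesting)&uint32(mask) != 0 // false: it will be locked and flushed

	_, _ = skipImporting, skipIngesting
}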
+func (e *LocalFile) lockUnless(newState, ignoreStateMask importMutexState) bool { + curState := e.isImportingAtomic.Load() + if curState&uint32(ignoreStateMask) != 0 { + return false + } + e.lock(newState) + return true } func (e *LocalFile) unlock() { - atomic.StoreInt32(&e.isImportingAtomic, int32(importMutexStateNoLock)) + if e == nil { + return + } + e.isImportingAtomic.Store(0) e.mutex.Unlock() } +func (e *LocalFile) flushLocalWriters(parentCtx context.Context) error { + eg, ctx := errgroup.WithContext(parentCtx) + e.localWriters.Range(func(k, v interface{}) bool { + eg.Go(func() error { + w := k.(*LocalWriter) + replyErrCh := make(chan error, 1) + w.flushChMutex.RLock() + if w.flushCh != nil { + w.flushCh <- replyErrCh + } else { + replyErrCh <- nil + } + w.flushChMutex.RUnlock() + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-replyErrCh: + return err + } + }) + return true + }) + return eg.Wait() +} + +func (e *LocalFile) flushEngineWithoutLock(ctx context.Context) error { + if err := e.flushLocalWriters(ctx); err != nil { + return err + } + if err := e.saveEngineMeta(); err != nil { + return err + } + flushFinishedCh, err := e.db.AsyncFlush() + if err != nil { + return err + } + select { + case <-flushFinishedCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// saveEngineMeta saves the metadata about the DB into the DB itself. +// This method should be followed by a Flush to ensure the data is actually synchronized +func (e *LocalFile) saveEngineMeta() error { + jsonBytes, err := json.Marshal(&e.localFileMeta) + if err != nil { + return errors.Trace(err) + } + // note: we can't set Sync to true since we disabled WAL. + return errors.Trace(e.db.Set(engineMetaKey, jsonBytes, &pebble.WriteOptions{Sync: false})) +} + +func (e *LocalFile) loadEngineMeta() { + jsonBytes, closer, err := e.db.Get(engineMetaKey) + if err != nil { + log.L().Debug("local db missing engine meta", zap.Stringer("uuid", e.Uuid), zap.Error(err)) + return + } + defer closer.Close() + + err = json.Unmarshal(jsonBytes, &e.localFileMeta) + if err != nil { + log.L().Warn("local db failed to deserialize meta", zap.Stringer("uuid", e.Uuid), zap.ByteString("content", jsonBytes), zap.Error(err)) + } +} + type gRPCConns struct { mu sync.Mutex conns map[uint64]*connPool @@ -204,7 +321,8 @@ func (conns *gRPCConns) Close() { } type local struct { - engines sync.Map + engines sync.Map // sync version of map[uuid.UUID]*LocalFile + conns gRPCConns splitCli split.SplitClient tls *common.TLS @@ -315,7 +433,7 @@ func NewLocalBackend( if shouldCreate { err = os.Mkdir(localFile, 0700) if err != nil { - return MakeBackend(nil), err + return MakeBackend(nil), errors.Annotate(err, "invalid sorted-kv-dir for local backend, please change the config or delete the path") } } @@ -340,24 +458,28 @@ func NewLocalBackend( return MakeBackend(local), nil } -// lock locks the local file. -func (local *local) lockEngine(engineId uuid.UUID, state importMutexState) bool { +// lock locks a local file and returns the LocalFile instance if it exists. +func (local *local) lockEngine(engineId uuid.UUID, state importMutexState) *LocalFile { if e, ok := local.engines.Load(engineId); ok { engine := e.(*LocalFile) engine.lock(state) - return true + return engine } - return false + return nil } -// unlock unlocks the local file from importing. 
-func (local *local) unlockEngine(engineId uuid.UUID) bool { - if e, ok := local.engines.Load(engineId); ok { - engine := e.(*LocalFile) - engine.unlock() +// lockAllEnginesUnless tries to lock all engines, except those which are already locked in the +// state given by ignoreStateMask. Returns the list of locked engines. +func (local *local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*LocalFile { + var allEngines []*LocalFile + local.engines.Range(func(k, v interface{}) bool { + engine := v.(*LocalFile) + if engine.lockUnless(newState, ignoreStateMask) { + allEngines = append(allEngines, engine) + } return true - } - return false + }) + return allEngines } func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { @@ -405,13 +527,15 @@ func (local *local) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grp return local.conns.conns[storeID].get(ctx) } -// Close the importer connection. +// Close the local backend. func (local *local) Close() { - local.engines.Range(func(k, v interface{}) bool { - v.(*LocalFile).Close() - return true - }) + allEngines := local.lockAllEnginesUnless(importMutexStateClose, 0) + local.engines = sync.Map{} + for _, engine := range allEngines { + engine.Close() + engine.unlock() + } local.conns.Close() // if checkpoint is disable or we finish load all data successfully, then files in this @@ -424,16 +548,35 @@ func (local *local) Close() { } } -// Flush ensure the written data is saved successfully, to make sure no data lose after restart -func (local *local) Flush(engineId uuid.UUID) error { - if engine, ok := local.engines.Load(engineId); ok { - engineFile := engine.(*LocalFile) - if err := engineFile.db.Flush(); err != nil { - return err +// FlushEngine ensures the written data is saved successfully, to make sure no data is lost after restart +func (local *local) FlushEngine(ctx context.Context, engineId uuid.UUID) error { + engineFile := local.lockEngine(engineId, importMutexStateFlush) + + // the engine cannot be deleted while we've acquired the lock identified by UUID.
+ + if engineFile == nil { + return errors.Errorf("engine '%s' not found", engineId) + } + defer engineFile.unlock() + return engineFile.flushEngineWithoutLock(ctx) +} + +func (local *local) FlushAllEngines(parentCtx context.Context) (err error) { + allEngines := local.lockAllEnginesUnless(importMutexStateFlush, ^importMutexStateLocalIngest) + defer func() { + for _, engine := range allEngines { + engine.unlock() } - return local.saveEngineMeta(engineFile) + }() + + eg, ctx := errgroup.WithContext(parentCtx) + for _, engineFile := range allEngines { + ef := engineFile + eg.Go(func() error { + return ef.flushEngineWithoutLock(ctx) + }) } - return errors.Errorf("engine '%s' not found", engineId) + return eg.Wait() } func (local *local) RetryImportDelay() time.Duration { @@ -463,53 +606,23 @@ func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D DisableWAL: true, ReadOnly: readOnly, TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - func() pebble.TablePropertyCollector { - return newRangePropertiesCollector() - }, + newRangePropertiesCollector, }, } dbPath := filepath.Join(local.localStoreDir, engineUUID.String()) return pebble.Open(dbPath, opt) } -func (local *local) saveEngineMeta(engine *LocalFile) error { - jsonBytes, err := json.Marshal(&engine.localFileMeta) - if err != nil { - return errors.Trace(err) - } - metaPath := filepath.Join(local.localStoreDir, engine.Uuid.String()+engineMetaFileSuffix) - return errors.Trace(ioutil.WriteFile(metaPath, jsonBytes, 0644)) -} - -func (local *local) LoadEngineMeta(engineUUID uuid.UUID) (localFileMeta, error) { - var meta localFileMeta - - mataPath := filepath.Join(local.localStoreDir, engineUUID.String()+engineMetaFileSuffix) - f, err := os.Open(mataPath) - if err != nil { - return meta, err - } - err = json.NewDecoder(f).Decode(&meta) - return meta, err -} - // This method must be called with holding mutex of LocalFile func (local *local) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error { - meta, err := local.LoadEngineMeta(engineUUID) - if err != nil { - meta = localFileMeta{} - } db, err := local.openEngineDB(engineUUID, false) if err != nil { return err } - if e, ok := local.engines.Load(engineUUID); ok { - engine := e.(*LocalFile) - engine.db = db - engine.localFileMeta = meta - } else { - local.engines.Store(engineUUID, &LocalFile{localFileMeta: meta, db: db, Uuid: engineUUID, isImportingAtomic: int32(importMutexStateNoLock)}) - } + e, _ := local.engines.LoadOrStore(engineUUID, &LocalFile{Uuid: engineUUID}) + engine := e.(*LocalFile) + engine.db = db + engine.loadEngineMeta() return nil } @@ -522,33 +635,26 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error engine, ok := local.engines.Load(engineUUID) if !ok { // recovery mode, we should reopen this engine file - meta, err := local.LoadEngineMeta(engineUUID) + db, err := local.openEngineDB(engineUUID, true) if err != nil { - // if engine meta not exist, just skip - if os.IsNotExist(err) { + // if engine db does not exist, just skip + if os.IsNotExist(errors.Cause(err)) { return nil } return err } - db, err := local.openEngineDB(engineUUID, true) - if err != nil { - return err - } engineFile := &LocalFile{ - localFileMeta: meta, - Uuid: engineUUID, - db: db, - isImportingAtomic: int32(importMutexStateNoLock), + Uuid: engineUUID, + db: db, } + engineFile.loadEngineMeta() local.engines.Store(engineUUID, engineFile) return nil } engineFile := engine.(*LocalFile) - err := engineFile.db.Flush() - if err != nil { 
- return err - } - return local.saveEngineMeta(engineFile) + engineFile.lock(importMutexStateFlush) + defer engineFile.unlock() + return engineFile.flushEngineWithoutLock(ctx) } func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) { @@ -794,7 +900,7 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit } func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error) { - iter := engineFile.db.NewIter(nil) + iter := engineFile.db.NewIter(&pebble.IterOptions{LowerBound: normalIterStartKey}) defer iter.Close() iterError := func(e string) error { @@ -818,9 +924,12 @@ func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error } endKey := nextKey(lastKey) + engineFileTotalSize := engineFile.TotalSize.Load() + engineFileLength := engineFile.Length.Load() + // <= 96MB no need to split into range - if engineFile.TotalSize <= local.regionSplitSize && engineFile.Length <= regionMaxKeyCount { - ranges := []Range{{start: firstKey, end: endKey, length: int(engineFile.Length)}} + if engineFileTotalSize <= local.regionSplitSize && engineFileLength <= regionMaxKeyCount { + ranges := []Range{{start: firstKey, end: endKey, length: int(engineFileLength)}} return ranges, nil } @@ -833,7 +942,7 @@ func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error local.regionSplitSize, regionMaxKeyCount*2/3) log.L().Info("split engine key ranges", zap.Stringer("engine", engineFile.Uuid), - zap.Int64("totalSize", engineFile.TotalSize), zap.Int64("totalCount", engineFile.Length), + zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength), log.ZapRedactBinary("firstKey", firstKey), log.ZapRedactBinary("lastKey", lastKey), zap.Int("ranges", len(ranges))) @@ -1131,46 +1240,48 @@ loopWrite: } func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *LocalFile, ranges []Range, remainRanges *syncdRanges) error { - if engineFile.Length == 0 { + if engineFile.Length.Load() == 0 { // engine is empty, this is likes because it's a index engine but the table contains no index - log.L().Warn("engine contains no data", zap.Stringer("uuid", engineFile.Uuid)) + log.L().Info("engine contains no data", zap.Stringer("uuid", engineFile.Uuid)) return nil } log.L().Debug("the ranges Length write to tikv", zap.Int("Length", len(ranges))) - errCh := make(chan error, len(ranges)) + var allErrLock sync.Mutex + var allErr error + var wg sync.WaitGroup + + wg.Add(len(ranges)) - ctx, cancel := context.WithCancel(ctx) - defer cancel() for _, r := range ranges { startKey := r.start endKey := r.end w := local.rangeConcurrency.Apply() go func(w *worker.Worker) { - defer local.rangeConcurrency.Recycle(w) + defer func() { + local.rangeConcurrency.Recycle(w) + wg.Done() + }() var err error for i := 0; i < maxRetryTimes; i++ { err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey, remainRanges) if err == nil || errors.Cause(err) == context.Canceled { - break + return } log.L().Warn("write and ingest by range failed", zap.Int("retry time", i+1), log.ShortError(err)) } - errCh <- err + + allErrLock.Lock() + allErr = multierr.Append(allErr, err) + allErrLock.Unlock() }(w) } - var err error - for i := 0; i < len(ranges); i++ { - // wait for all sub tasks finish to avoid panic. 
if we return on the first error, - // the outer tasks may close the pebble db but some sub tasks still read from the db - e := <-errCh - if e != nil && err == nil { - err = e - } - } - return err + // wait for all sub tasks finish to avoid panic. if we return on the first error, + // the outer tasks may close the pebble db but some sub tasks still read from the db + wg.Wait() + return allErr } type syncdRanges struct { @@ -1198,14 +1309,17 @@ func (r *syncdRanges) take() []Range { } func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error { - engineFile, ok := local.engines.Load(engineUUID) - if !ok { + lf := local.lockEngine(engineUUID, importMutexStateImport) + if lf == nil { // skip if engine not exist. See the comment of `CloseEngine` for more detail. return nil } + defer lf.unlock() - lf := engineFile.(*LocalFile) - if lf.TotalSize == 0 { + lfTotalSize := lf.TotalSize.Load() + lfLength := lf.Length.Load() + + if lfTotalSize == 0 { log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) return nil } @@ -1223,7 +1337,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro // if all the kv can fit in one region, skip split regions. TiDB will split one region for // the table when table is created. - needSplit := len(ranges) > 1 || lf.TotalSize > local.regionSplitSize || lf.Length > regionMaxKeyCount + needSplit := len(ranges) > 1 || lfTotalSize > local.regionSplitSize || lfLength > regionMaxKeyCount // split region by given ranges for i := 0; i < maxRetryTimes; i++ { @@ -1241,7 +1355,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro } // start to write to kv and ingest - err = local.writeAndIngestByRanges(ctx, engineFile.(*LocalFile), ranges, remains) + err = local.writeAndIngestByRanges(ctx, lf, ranges, remains) if err != nil { log.L().Error("write and ingest engine failed", log.ShortError(err)) return err @@ -1256,19 +1370,17 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro } log.L().Info("import engine success", zap.Stringer("uuid", engineUUID), - zap.Int64("size", lf.TotalSize), zap.Int64("kvs", lf.Length)) + zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength)) return nil } func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 - engineFile, ok := local.engines.Load(engineUUID) - if !ok { + localEngine := local.lockEngine(engineUUID, importMutexStateClose) + if localEngine == nil { log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) return nil } - localEngine := engineFile.(*LocalFile) - localEngine.lock(importMutexStateClose) defer localEngine.unlock() if err := localEngine.Close(); err != nil { return err @@ -1276,35 +1388,38 @@ func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error if err := localEngine.Cleanup(local.localStoreDir); err != nil { return err } - meta, err := local.LoadEngineMeta(engineUUID) - if err != nil { - meta = localFileMeta{} - } db, err := local.openEngineDB(engineUUID, false) if err == nil { localEngine.db = db - localEngine.localFileMeta = meta + localEngine.localFileMeta = localFileMeta{} } return err } func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { + localEngine := local.lockEngine(engineUUID, importMutexStateClose) // release this engine after import success - engineFile, ok := 
local.engines.Load(engineUUID) - if ok { - localEngine := engineFile.(*LocalFile) - err := localEngine.Close() - if err != nil { - return err - } - err = localEngine.Cleanup(local.localStoreDir) - if err != nil { - return err - } - local.engines.Delete(engineUUID) - } else { + if localEngine == nil { log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) + return nil + } + defer localEngine.unlock() + + // since closing the engine causes all subsequent operations on it panic, + // we make sure to delete it from the engine map before calling Close(). + // (note that Close() returning error does _not_ mean the pebble DB + // remains open/usable.) + local.engines.Delete(engineUUID) + err := localEngine.Close() + if err != nil { + return err } + err = localEngine.Cleanup(local.localStoreDir) + if err != nil { + return err + } + localEngine.TotalSize.Store(0) + localEngine.Length.Store(0) return nil } @@ -1333,19 +1448,25 @@ func (local *local) NewEncoder(tbl table.Table, options *SessionOptions) (Encode return NewTableKVEncoder(tbl, options) } -func (local *local) LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) { +func (local *local) LocalWriter(ctx context.Context, engineUUID uuid.UUID, maxCacheSize int64) (EngineWriter, error) { e, ok := local.engines.Load(engineUUID) if !ok { return nil, errors.Errorf("could not find engine for %s", engineUUID.String()) } engineFile := e.(*LocalFile) - return openLocalWriter(engineFile, local.localStoreDir, LocalMemoryTableSize), nil + return openLocalWriter(engineFile, local.localStoreDir, maxCacheSize), nil } func openLocalWriter(f *LocalFile, sstDir string, memtableSizeLimit int64) *LocalWriter { - kvsChan := make(chan []common.KvPair, 1024) - w := &LocalWriter{sstDir: sstDir, kvsChan: kvsChan, local: f, memtableSizeLimit: memtableSizeLimit} - w.consumeWg.Add(1) + w := &LocalWriter{ + sstDir: sstDir, + kvsChan: make(chan []common.KvPair, 1024), + flushCh: make(chan chan error), + consumeCh: make(chan struct{}, 1), + local: f, + memtableSizeLimit: memtableSizeLimit, + } + f.localWriters.Store(w, nil) go w.writeRowsLoop() return w } @@ -1539,7 +1660,7 @@ type RangePropertiesCollector struct { propKeysIdxDistance uint64 } -func newRangePropertiesCollector() *RangePropertiesCollector { +func newRangePropertiesCollector() pebble.TablePropertyCollector { return &RangePropertiesCollector{ props: make([]rangeProperty, 0, 1024), propSizeIdxDistance: defaultPropSizeIndexDistance, @@ -1626,29 +1747,26 @@ func (s *sizeProperties) iter(f func(p *rangeProperty) bool) { }) } +func (local *local) EngineFileSizes() (res []EngineFileSize) { + local.engines.Range(func(k, v interface{}) bool { + engine := v.(*LocalFile) + res = append(res, engine.getEngineFileSize()) + return true + }) + return +} + type LocalWriter struct { - writeErr common.OnceError - local *LocalFile - lastKey []byte - consumeWg sync.WaitGroup - kvsChan chan []common.KvPair - sstDir string - memtableSizeLimit int64 - writeBatch []common.KvPair - isWriteBatchSorted bool -} - -// If this method return false, it would not change `w.lastKey` -func (w *LocalWriter) isSorted(kvs []common.KvPair) bool { - lastKey := w.lastKey - for _, pair := range kvs { - if len(lastKey) > 0 && bytes.Compare(lastKey, pair.Key) >= 0 { - return false - } - lastKey = pair.Key - } - w.lastKey = append(w.lastKey[:0], lastKey...) 
- return true + writeErr common.OnceError + local *LocalFile + consumeCh chan struct{} + kvsChan chan []common.KvPair + flushChMutex sync.RWMutex + flushCh chan chan error + sstDir string + memtableSizeLimit int64 + writeBatch kvMemCache + writer *sstWriter } func (w *LocalWriter) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows Rows) error { @@ -1656,156 +1774,280 @@ func (w *LocalWriter) AppendRows(ctx context.Context, tableName string, columnNa if len(kvs) == 0 { return nil } + if err := w.writeErr.Get(); err != nil { + return err + } w.kvsChan <- kvs w.local.Ts = ts return nil } func (w *LocalWriter) Close() error { + w.local.localWriters.Delete(w) close(w.kvsChan) - w.consumeWg.Wait() - return w.writeErr.Get() + + w.flushChMutex.Lock() + flushCh := w.flushCh + w.flushCh = nil + w.flushChMutex.Unlock() + + // after closing kvsChan, the writeRowsLoop will ingest all cached KVs. + // during this time, the flushCh might still be receiving data. + // so we have this extra loop to immediately consume them to avoid AsyncFlush + for { + select { + case <-w.consumeCh: + return w.writeErr.Get() + case replyErrCh := <-flushCh: + replyErrCh <- nil + } + } +} + +func (w *LocalWriter) genSSTPath() string { + return filepath.Join(w.sstDir, uuid.New().String()+".sst") } func (w *LocalWriter) writeRowsLoop() { - batchSize := int64(0) - totalSize := int64(0) - totalCount := int64(0) - w.isWriteBatchSorted = true - var writer *sstable.Writer = nil - //var wb *pebble.Batch = nil - var filePath string - defer w.consumeWg.Done() + defer func() { + if w.writer != nil { + w.writer.cleanUp() + w.writer = nil + } + w.consumeCh <- struct{}{} + }() var err error - for kvs := range w.kvsChan { - hasSort := w.isSorted(kvs) - if totalCount > 1000 && hasSort { - for _, pair := range kvs { - totalSize += int64(len(pair.Key) + len(pair.Val)) +outside: + for { + w.flushChMutex.RLock() + flushCh := w.flushCh + w.flushChMutex.RUnlock() + + select { + case kvs, ok := <-w.kvsChan: + if !ok { + break outside } - if writer == nil { - writer, filePath, err = w.createWriter() + + w.writeBatch.append(kvs) + if w.writeBatch.totalSize <= w.memtableSizeLimit { + break + } + if w.writer == nil { + w.writer, err = newSSTWriter(w.genSSTPath()) if err != nil { w.writeErr.Set(err) return } - if len(w.writeBatch) > 0 && w.isWriteBatchSorted { - if err := writeKVs(writer, w.writeBatch); err != nil { - w.writeErr.Set(err) - return - } - w.isWriteBatchSorted = true - w.writeBatch = w.writeBatch[:0] - totalSize += batchSize - batchSize = 0 - } } - if err := writeKVs(writer, kvs); err != nil { + + if err = w.writeKVsOrIngest(0); err != nil { w.writeErr.Set(err) return } - } else { - for _, pair := range kvs { - batchSize += int64(len(pair.Key) + len(pair.Val)) - } - if !hasSort { - w.isWriteBatchSorted = false - } - w.writeBatch = append(w.writeBatch, kvs...) 
- if batchSize > w.memtableSizeLimit { - if err := w.flushKVs(); err != nil { - w.writeErr.Set(err) - return - } - if writer == nil { - w.lastKey = w.lastKey[:0] + case replyErrCh := <-flushCh: + err = w.writeKVsOrIngest(localIngestDescriptionFlushed) + if w.writer != nil { + err = w.writer.ingestInto(w.local, localIngestDescriptionFlushed) + if err == nil { + err = w.writer.reopen() } - totalSize += batchSize - batchSize = 0 - w.isWriteBatchSorted = true + } + replyErrCh <- err + if err != nil { + w.writeErr.Set(err) + return } } - totalCount += int64(len(kvs)) } - if batchSize > 0 { - if err := w.flushKVs(); err != nil { + if err = w.writeKVsOrIngest(0); err != nil { + w.writeErr.Set(err) + return + } + if w.writer != nil { + if err := w.writer.ingestInto(w.local, 0); err != nil { w.writeErr.Set(err) - return - } - totalSize += batchSize - log.L().Info("write data by sort index", zap.Int64("bytes", totalSize)) - } - w.local.lock(importMutexStateNoLock) - if writer != nil { - err := writer.Close() - if err == nil { - err = w.local.db.Ingest([]string{filePath}) - // The following two variable should be changed with holding mutex, - // because there may be another thread change localFileMeta object. See it in `local::OpenEngine` - atomic.AddInt64(&w.local.TotalSize, totalSize) - atomic.AddInt64(&w.local.Length, totalCount) } - w.writeErr.Set(err) - log.L().Info("write data by sst writer", zap.Int64("bytes", totalSize)) - } else { - atomic.AddInt64(&w.local.TotalSize, totalSize) - atomic.AddInt64(&w.local.Length, totalCount) } - w.local.unlock() } -func (w *LocalWriter) flushKVs() error { - writer, filePath, err := w.createWriter() +func (w *LocalWriter) writeKVsOrIngest(desc localIngestDescription) error { + if w.writer != nil { + if err := w.writer.writeKVs(&w.writeBatch); err != errorUnorderedSSTInsertion { + return err + } + } + + // if write failed only because of unorderedness, we immediately ingest the memcache. + immWriter, err := newSSTWriter(w.genSSTPath()) if err != nil { return err } - if !w.isWriteBatchSorted { - sort.Slice(w.writeBatch, func(i, j int) bool { - return bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 - }) - } - if err := writeKVs(writer, w.writeBatch); err != nil { + defer immWriter.cleanUp() + + if err = immWriter.writeKVs(&w.writeBatch); err != nil { return err } - if err := writer.Close(); err != nil { - return err + + return immWriter.ingestInto(w.local, desc|localIngestDescriptionImmediate) +} + +var errorUnorderedSSTInsertion = errors.New("inserting KVs into SST without order") + +type localIngestDescription uint8 + +const ( + localIngestDescriptionFlushed localIngestDescription = 1 << iota + localIngestDescriptionImmediate +) + +type sstWriter struct { + path string + writer *sstable.Writer + lastKey []byte + totalSize int64 + totalCount int64 +} + +func newSSTWriter(path string) (*sstWriter, error) { + sw := &sstWriter{path: path} + if err := sw.reopen(); err != nil { + return nil, err } - w.writeBatch = w.writeBatch[:0] - w.local.lock(importMutexStateNoLock) - err = w.local.db.Ingest([]string{filePath}) - w.local.unlock() - return err + return sw, nil +} + +// writeKVs moves the KV pairs in the cache into the SST writer. +// On success, the cache will be cleared. 
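The writeKVsOrIngest helper above leans on the kvMemCache type defined a couple of hunks below: incoming batches accumulate there, and only when the cache's smallest key is not above the SST writer's last written key does the errorUnorderedSSTInsertion fallback sort the cache into a short-lived SST of its own and ingest it immediately. Below is a hedged sketch of the cache's lazy-sorting contract, written as if it lived in this package (cacheExample is hypothetical, not part of the patch); the writeKVs body itself follows right after.

// cacheExample is illustrative only: append records whether ordering was ever
// broken, so sort() costs a single O(n log n) pass at most, immediately before
// the cache is drained into an SST.
func cacheExample() {
	var m kvMemCache
	m.append([]common.KvPair{{Key: []byte("a"), Val: []byte("1")}})
	m.append([]common.KvPair{{Key: []byte("c"), Val: []byte("3")}})
	// still in order: m.notSorted == false, m.totalSize == 4

	m.append([]common.KvPair{{Key: []byte("b"), Val: []byte("2")}})
	// order broken: m.notSorted == true

	m.sort()  // sorts once, after which writeKVs can stream the pairs in key order
	m.clear() // reset after the pairs have been moved into an SST
}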
+func (sw *sstWriter) writeKVs(m *kvMemCache) error { + if len(m.kvs) == 0 { + return nil + } + m.sort() + if bytes.Compare(m.kvs[0].Key, sw.lastKey) <= 0 { + return errorUnorderedSSTInsertion + } + + internalKey := sstable.InternalKey{ + Trailer: uint64(sstable.InternalKeyKindSet), + } + for _, p := range m.kvs { + internalKey.UserKey = p.Key + if err := sw.writer.Add(internalKey, p.Val); err != nil { + return errors.Trace(err) + } + } + sw.totalSize += m.totalSize + sw.totalCount += int64(len(m.kvs)) + sw.lastKey = m.kvs[len(m.kvs)-1].Key + m.clear() + return nil } -func (w *LocalWriter) createWriter() (*sstable.Writer, string, error) { - filePath := filepath.Join(w.sstDir, fmt.Sprintf("%s.sst", uuid.New())) - f, err := os.Create(filePath) +// ingestInto finishes the SST file and ingests it into the target LocalFile database. +// On success, the entire writer will be reset as empty. +func (sw *sstWriter) ingestInto(e *LocalFile, desc localIngestDescription) error { + if sw.totalCount > 0 { + if err := sw.writer.Close(); err != nil { + return errors.Trace(err) + } + if desc&localIngestDescriptionFlushed == 0 { + // No need to acquire lock around ingestion when flushing. + // We already held the lock before flushing. + e.lock(importMutexStateLocalIngest) + defer e.unlock() + } + meta, _ := sw.writer.Metadata() // this method only returns an error if the writer has not been closed yet. + log.L().Info("write data to local DB", + zap.Int64("size", sw.totalSize), + zap.Int64("kvs", sw.totalCount), + zap.Uint8("description", uint8(desc)), + zap.Uint64("sstFileSize", meta.Size), + log.ZapRedactBinary("firstKey", meta.SmallestPoint.UserKey), + log.ZapRedactBinary("lastKey", meta.LargestPoint.UserKey)) + + if err := e.db.Ingest([]string{sw.path}); err != nil { + return errors.Trace(err) + } + e.TotalSize.Add(sw.totalSize) + e.Length.Add(sw.totalCount) + sw.totalSize = 0 + sw.totalCount = 0 + sw.lastKey = nil + } + sw.writer = nil + return nil +} + +// reopen creates a new SST file after ingestInto is successful. +// Returns an error if the SST file was not ingested. +func (sw *sstWriter) reopen() error { + if sw.writer != nil { + return errors.New("cannot reopen an SST writer without ingesting it first") + } + f, err := os.Create(sw.path) if err != nil { - return nil, filePath, err + return errors.Trace(err) } - writer := sstable.NewWriter(f, sstable.WriterOptions{ + sw.writer = sstable.NewWriter(f, sstable.WriterOptions{ TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - func() pebble.TablePropertyCollector { - return newRangePropertiesCollector() - }, + newRangePropertiesCollector, }, BlockSize: 16 * 1024, }) - return writer, filePath, nil + return nil } -func writeKVs(writer *sstable.Writer, kvs []common.KvPair) error { - internalKey := sstable.InternalKey{ - UserKey: []byte{}, - Trailer: uint64(sstable.InternalKeyKindSet), +// cleanUp removes any un-ingested SST file. +func (sw *sstWriter) cleanUp() { + if sw.writer != nil { + sw.writer.Close() + os.Remove(sw.path) } - for _, p := range kvs { - internalKey.UserKey = p.Key - if err := writer.Add(internalKey, p.Val); err != nil { - return err +} + +// kvMemCache is an array of KV pairs. It also keeps track of the total KV size and whether the array is already sorted. +type kvMemCache struct { + kvs []common.KvPair + totalSize int64 + notSorted bool // record "not sorted" instead of "sorted" so that the zero value is correct. +} + +// append more KV pairs to the kvMemCache.
+func (m *kvMemCache) append(kvs []common.KvPair) { + if !m.notSorted { + var lastKey []byte + if len(m.kvs) > 0 { + lastKey = m.kvs[len(m.kvs)-1].Key + } + for _, kv := range kvs { + if bytes.Compare(kv.Key, lastKey) <= 0 { + m.notSorted = true + break + } + lastKey = kv.Key } } - return nil + + m.kvs = append(m.kvs, kvs...) + for _, kv := range kvs { + m.totalSize += int64(len(kv.Key)) + int64(len(kv.Val)) + } +} + +// sort ensures the content is actually sorted. +func (m *kvMemCache) sort() { + if m.notSorted { + sort.Slice(m.kvs, func(i, j int) bool { return bytes.Compare(m.kvs[i].Key, m.kvs[j].Key) < 0 }) + m.notSorted = false + } +} + +// clear resets the cache to contain nothing. +func (m *kvMemCache) clear() { + m.kvs = m.kvs[:0] + m.totalSize = 0 + m.notSorted = false } diff --git a/lightning/backend/local_test.go b/lightning/backend/local_test.go index c7ba16899..70193d3bb 100644 --- a/lightning/backend/local_test.go +++ b/lightning/backend/local_test.go @@ -371,8 +371,8 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) { sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 }) - c.Assert(int(f.Length), Equals, 20000) - c.Assert(int(f.TotalSize), Equals, 144*20000) + c.Assert(int(f.Length.Load()), Equals, 20000) + c.Assert(int(f.TotalSize.Load()), Equals, 144*20000) valid := it.SeekGE(keys[0]) c.Assert(valid, IsTrue) for _, k := range keys { diff --git a/lightning/backend/tidb.go b/lightning/backend/tidb.go index c3c1d2c44..efc0b6709 100644 --- a/lightning/backend/tidb.go +++ b/lightning/backend/tidb.go @@ -550,7 +550,23 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st return } -func (be *tidbBackend) LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) { +func (be *tidbBackend) EngineFileSizes() []EngineFileSize { + return nil +} + +func (be *tidbBackend) FlushEngine(context.Context, uuid.UUID) error { + return nil +} + +func (be *tidbBackend) FlushAllEngines(context.Context) error { + return nil +} + +func (be *tidbBackend) ResetEngine(context.Context, uuid.UUID) error { + return errors.New("cannot reset an engine in TiDB backend") +} + +func (be *tidbBackend) LocalWriter(ctx context.Context, engineUUID uuid.UUID, maxCacheSize int64) (EngineWriter, error) { return &TiDBWriter{be: be, engineUUID: engineUUID}, nil } diff --git a/lightning/common/pause_test.go b/lightning/common/pause_test.go index a7773720e..472433afe 100644 --- a/lightning/common/pause_test.go +++ b/lightning/common/pause_test.go @@ -96,7 +96,7 @@ func (s *pauseSuite) TestPause(c *C) { p.Resume() }() - c.Assert(&wg, unblocksBetween, 500*time.Millisecond, 510*time.Millisecond) + c.Assert(&wg, unblocksBetween, 500*time.Millisecond, 520*time.Millisecond) // if the context is canceled, Wait() should immediately unblock... @@ -130,7 +130,7 @@ func (s *pauseSuite) TestPause(c *C) { p.Resume() }() - c.Assert(&wg, unblocksBetween, 500*time.Millisecond, 510*time.Millisecond) + c.Assert(&wg, unblocksBetween, 500*time.Millisecond, 520*time.Millisecond) } // Run `go test github.com/pingcap/tidb-lightning/lightning/common -check.b -test.v` to get benchmark result. diff --git a/lightning/common/storage.go b/lightning/common/storage.go new file mode 100644 index 000000000..e03820319 --- /dev/null +++ b/lightning/common/storage.go @@ -0,0 +1,23 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO: Deduplicate this implementation with DM! + +package common + +// StorageSize represents the storage's capacity and available size +// Learn from tidb-binlog source code. +type StorageSize struct { + Capacity uint64 + Available uint64 +} diff --git a/lightning/common/storage_test.go b/lightning/common/storage_test.go new file mode 100644 index 000000000..dba3565fb --- /dev/null +++ b/lightning/common/storage_test.go @@ -0,0 +1,34 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common_test + +import ( + . "github.com/pingcap/check" + + "github.com/pingcap/tidb-lightning/lightning/common" +) + +var _ = Suite(&testStorageSuite{}) + +type testStorageSuite struct { +} + +func (t *testStorageSuite) TestGetStorageSize(c *C) { + // only ensure we can get storage size. + d := c.MkDir() + size, err := common.GetStorageSize(d) + c.Assert(err, IsNil) + c.Assert(size.Capacity, Greater, uint64(0)) + c.Assert(size.Available, Greater, uint64(0)) +} diff --git a/lightning/common/storage_unix.go b/lightning/common/storage_unix.go new file mode 100644 index 000000000..fcae82db7 --- /dev/null +++ b/lightning/common/storage_unix.go @@ -0,0 +1,57 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +// TODO: Deduplicate this implementation with DM! + +package common + +import ( + "reflect" + + "golang.org/x/sys/unix" + + "github.com/pingcap/errors" +) + +// GetStorageSize gets storage's capacity and available size +func GetStorageSize(dir string) (size StorageSize, err error) { + var stat unix.Statfs_t + + err = unix.Statfs(dir, &stat) + if err != nil { + return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir) + } + + // When container is run in MacOS, `bsize` obtained by `statfs` syscall is not the fundamental block size, + // but the `iosize` (optimal transfer block size) instead, it's usually 1024 times larger than the `bsize`. + // for example `4096 * 1024`. To get the correct block size, we should use `frsize`. But `frsize` isn't + // guaranteed to be supported everywhere, so we need to check whether it's supported before use it. 
+ // For more details, please refer to: https://github.com/docker/for-mac/issues/2136 + bSize := uint64(stat.Bsize) + field := reflect.ValueOf(&stat).Elem().FieldByName("Frsize") + if field.IsValid() { + if field.Kind() == reflect.Uint64 { + bSize = field.Uint() + } else { + bSize = uint64(field.Int()) + } + } + + // Available blocks * size per block = available space in bytes + size.Available = stat.Bavail * bSize + size.Capacity = stat.Blocks * bSize + + return +} diff --git a/lightning/common/storage_windows.go b/lightning/common/storage_windows.go new file mode 100644 index 000000000..7df060d24 --- /dev/null +++ b/lightning/common/storage_windows.go @@ -0,0 +1,44 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +// TODO: Deduplicate this implementation with DM! + +package common + +import ( + "syscall" + "unsafe" + + "github.com/pingcap/errors" +) + +var ( + kernel32 = syscall.MustLoadDLL("kernel32.dll") + getDiskFreeSpaceExW = kernel32.MustFindProc("GetDiskFreeSpaceExW") +) + +// GetStorageSize gets storage's capacity and available size +func GetStorageSize(dir string) (size StorageSize, err error) { + r, _, e := getDiskFreeSpaceExW.Call( + uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))), + uintptr(unsafe.Pointer(&size.Available)), + uintptr(unsafe.Pointer(&size.Capacity)), + 0, + ) + if r == 0 { + err = errors.Annotatef(e, "cannot get disk capacity at %s", dir) + } + return +} diff --git a/lightning/common/util.go b/lightning/common/util.go index bcd523271..3ee6df1eb 100644 --- a/lightning/common/util.go +++ b/lightning/common/util.go @@ -198,7 +198,18 @@ var stdErrorType = reflect.TypeOf(stderrors.New("")) // IsRetryableError returns whether the error is transient (e.g. network // connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This // function returns `false` (irrecoverable) if `err == nil`. +// +// If the error is a multierr, returns true only if all suberrors are retryable. func IsRetryableError(err error) bool { + for _, singleError := range errors.Errors(err) { + if !isSingleRetryableError(singleError) { + return false + } + } + return true +} + +func isSingleRetryableError(err error) bool { err = errors.Cause(err) switch err { diff --git a/lightning/common/util_test.go b/lightning/common/util_test.go index de392d32d..1330f765c 100644 --- a/lightning/common/util_test.go +++ b/lightning/common/util_test.go @@ -28,6 +28,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/errors" tmysql "github.com/pingcap/tidb/errno" + "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -119,6 +120,11 @@ func (s *utilSuite) TestIsRetryableError(c *C) { // sqlmock errors c.Assert(common.IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse) c.Assert(common.IsRetryableError(errors.New("call to database Close was not expected")), IsTrue) + + // multierr + c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)), IsFalse) + c.Assert(common.IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})), IsTrue) + c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})), IsFalse) } func (s *utilSuite) TestToDSN(c *C) { diff --git a/lightning/config/config.go b/lightning/config/config.go index 8e30659a5..04721bba9 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -19,6 +19,7 @@ import ( "fmt" "net" "net/url" + "os" "path/filepath" "runtime" "strconv" @@ -26,6 +27,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/docker/go-units" gomysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" @@ -70,6 +72,28 @@ const ( defaultChecksumTableConcurrency = 2 ) +const ( + LocalMemoryTableSize = 512 * units.MiB + + // autoDiskQuotaLocalReservedSize is the estimated size a local-backend + // engine may gain after calling Flush(). This is currently defined by its + // max MemTable size (512 MiB). It is used to compensate for the soft limit + // of the disk quota against the hard limit of the disk free space. + // + // With a maximum of 8 engines, this should contribute 4.0 GiB to the + // reserved size. + autoDiskQuotaLocalReservedSize uint64 = LocalMemoryTableSize + + // autoDiskQuotaLocalReservedSpeed is the estimated size increase per + // millisecond per write thread the local backend may gain on all engines. + // This is used to compute the maximum size overshoot between two disk quota + // checks, if the first one has barely passed. + // + // With cron.check-disk-quota = 1m, region-concurrency = 40, this should + // contribute 2.3 GiB to the reserved size. 
+ autoDiskQuotaLocalReservedSpeed uint64 = 1 * units.KiB +) + var ( defaultConfigPaths = []string{"tidb-lightning.toml", "conf/tidb-lightning.toml"} supportedStorageTypes = []string{"file", "local", "s3", "noop"} @@ -260,6 +284,7 @@ type TikvImporter struct { SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"` RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"` SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"` + DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` } @@ -272,8 +297,9 @@ type Checkpoint struct { } type Cron struct { - SwitchMode Duration `toml:"switch-mode" json:"switch-mode"` - LogProgress Duration `toml:"log-progress" json:"log-progress"` + SwitchMode Duration `toml:"switch-mode" json:"switch-mode"` + LogProgress Duration `toml:"log-progress" json:"log-progress"` + CheckDiskQuota Duration `toml:"check-disk-quota" json:"check-disk-quota"` } type Security struct { @@ -347,8 +373,9 @@ func NewConfig() *Config { ChecksumTableConcurrency: defaultChecksumTableConcurrency, }, Cron: Cron{ - SwitchMode: Duration{Duration: 5 * time.Minute}, - LogProgress: Duration{Duration: 5 * time.Minute}, + SwitchMode: Duration{Duration: 5 * time.Minute}, + LogProgress: Duration{Duration: 5 * time.Minute}, + CheckDiskQuota: Duration{Duration: 1 * time.Minute}, }, Mydumper: MydumperRuntime{ ReadBlockSize: ReadBlockSize, @@ -553,6 +580,40 @@ func (cfg *Config) Adjust(ctx context.Context) error { if len(cfg.TikvImporter.SortedKVDir) == 0 { return errors.Errorf("tikv-importer.sorted-kv-dir must not be empty!") } + + storageSizeDir := filepath.Clean(cfg.TikvImporter.SortedKVDir) + sortedKVDirInfo, err := os.Stat(storageSizeDir) + switch { + case os.IsNotExist(err): + // the sorted-kv-dir does not exist, meaning we will create it automatically. + // so we extract the storage size from its parent directory. + storageSizeDir = filepath.Dir(storageSizeDir) + case err == nil: + if !sortedKVDirInfo.IsDir() { + return errors.Errorf("tikv-importer.sorted-kv-dir ('%s') is not a directory", storageSizeDir) + } + default: + return errors.Annotate(err, "invalid tikv-importer.sorted-kv-dir") + } + + if cfg.TikvImporter.DiskQuota == 0 { + enginesCount := uint64(cfg.App.IndexConcurrency + cfg.App.TableConcurrency) + writeAmount := uint64(cfg.App.RegionConcurrency) * uint64(cfg.Cron.CheckDiskQuota.Milliseconds()) + reservedSize := enginesCount*autoDiskQuotaLocalReservedSize + writeAmount*autoDiskQuotaLocalReservedSpeed + + storageSize, err := common.GetStorageSize(storageSizeDir) + if err != nil { + return err + } + if storageSize.Available <= reservedSize { + return errors.Errorf( + "insufficient disk free space on `%s` (only %s, expecting >%s), please use a storage with enough free space, or specify `tikv-importer.disk-quota`", + cfg.TikvImporter.SortedKVDir, + units.BytesSize(float64(storageSize.Available)), + units.BytesSize(float64(reservedSize))) + } + cfg.TikvImporter.DiskQuota = ByteSize(storageSize.Available - reservedSize) + } } if cfg.TikvImporter.Backend == BackendTiDB { diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index 26f4230c4..6e14b7846 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -65,6 +65,7 @@ func assignMinimalLegalValue(cfg *config.Config) { cfg.TiDB.StatusPort = 8901 cfg.TiDB.PdAddr = "234.56.78.90:12345" cfg.Mydumper.SourceDir = "file://." 
+ cfg.TikvImporter.DiskQuota = 1 } func (s *configTestSuite) TestAdjustPdAddrAndPort(c *C) { @@ -144,9 +145,9 @@ func (s *configTestSuite) TestAdjustFileRoutePath(c *C) { rule := &config.FileRouteRule{Path: invalidPath, Type: "sql", Schema: "test", Table: "tbl"} cfg.Mydumper.FileRouters = []*config.FileRouteRule{rule} err := cfg.Adjust(ctx) - c.Assert(err, ErrorMatches, fmt.Sprintf("file route path '%s' is not in source dir '%s'", invalidPath, tmpDir)) + c.Assert(err, ErrorMatches, fmt.Sprintf("\\Qfile route path '%s' is not in source dir '%s'\\E", invalidPath, tmpDir)) - relPath := "test_dir/1.sql" + relPath := filepath.FromSlash("test_dir/1.sql") rule.Path = filepath.Join(tmpDir, relPath) err = cfg.Adjust(ctx) c.Assert(err, IsNil) @@ -620,13 +621,12 @@ func (s *configTestSuite) TestTomlPostRestore(c *C) { func (s *configTestSuite) TestCronEncodeDecode(c *C) { cfg := &config.Config{} - d, _ := time.ParseDuration("1m") - cfg.Cron.SwitchMode.Duration = d - d, _ = time.ParseDuration("2m") - cfg.Cron.LogProgress.Duration = d + cfg.Cron.SwitchMode.Duration = 1 * time.Minute + cfg.Cron.LogProgress.Duration = 2 * time.Minute + cfg.Cron.CheckDiskQuota.Duration = 3 * time.Second var b bytes.Buffer c.Assert(toml.NewEncoder(&b).Encode(cfg.Cron), IsNil) - c.Assert(b.String(), Equals, "switch-mode = \"1m0s\"\nlog-progress = \"2m0s\"\n") + c.Assert(b.String(), Equals, "switch-mode = \"1m0s\"\nlog-progress = \"2m0s\"\ncheck-disk-quota = \"3s\"\n") confStr := "[cron]\r\n" + b.String() cfg2 := &config.Config{} diff --git a/lightning/lightning_test.go b/lightning/lightning_test.go index ec514eea5..20ec20691 100644 --- a/lightning/lightning_test.go +++ b/lightning/lightning_test.go @@ -468,7 +468,7 @@ func (s *lightningServerSuite) TestCheckSystemRequirement(c *C) { } // with max open files 1024, the max table size will be: 65536MB - err := failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/backend/GetRlimitValue", "return(8199)") + err := failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/backend/GetRlimitValue", "return(2049)") c.Assert(err, IsNil) err = failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/backend/SetRlimitError", "return(true)") c.Assert(err, IsNil) diff --git a/lightning/metric/metric.go b/lightning/metric/metric.go index bce7ad4bb..b4ffdc9c5 100644 --- a/lightning/metric/metric.go +++ b/lightning/metric/metric.go @@ -187,6 +187,14 @@ var ( Buckets: prometheus.ExponentialBuckets(1, 2.2679331552660544, 10), }, ) + + LocalStorageUsageBytesGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "lightning", + Name: "local_storage_usage_bytes", + Help: "disk/memory size currently occupied by intermediate files in local backend", + }, []string{"medium"}, + ) ) func init() { @@ -208,6 +216,7 @@ func init() { prometheus.MustRegister(ChecksumSecondsHistogram) prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram) prometheus.MustRegister(ApplyWorkerSecondsHistogram) + prometheus.MustRegister(LocalStorageUsageBytesGauge) } func RecordTableCount(status string, err error) { @@ -239,7 +248,7 @@ func ReadCounter(counter prometheus.Counter) float64 { return metric.Counter.GetValue() } -// ReadCounter reports the sum of all observed values in the histogram. +// ReadHistogramSum reports the sum of all observed values in the histogram. 
func ReadHistogramSum(histogram prometheus.Histogram) float64 { var metric dto.Metric if err := histogram.Write(&metric); err != nil { diff --git a/lightning/mydump/loader_test.go b/lightning/mydump/loader_test.go index 63e98e188..6aff65b63 100644 --- a/lightning/mydump/loader_test.go +++ b/lightning/mydump/loader_test.go @@ -516,17 +516,23 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { }, }, { - DB: "d1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "d1/v1-table.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{}, + DB: "d1", + Name: "v1", + SchemaFile: md.FileInfo{ + TableName: filter.Table{Schema: "d1", Name: "v1"}, + FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-table.sql"), Type: md.SourceTypeTableSchema}, + }, + DataFiles: []md.FileInfo{}, }, }, Views: []*md.MDTableMeta{ { - DB: "d1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "d1/v1-view.sql", Type: md.SourceTypeViewSchema}}, + DB: "d1", + Name: "v1", + SchemaFile: md.FileInfo{ + TableName: filter.Table{Schema: "d1", Name: "v1"}, + FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-view.sql"), Type: md.SourceTypeViewSchema}, + }, }, }, }, diff --git a/lightning/mydump/parquet_parser_test.go b/lightning/mydump/parquet_parser_test.go index c8e92e25c..d17d47365 100644 --- a/lightning/mydump/parquet_parser_test.go +++ b/lightning/mydump/parquet_parser_test.go @@ -5,6 +5,7 @@ import ( "io" "path/filepath" "strconv" + "time" "github.com/pingcap/br/pkg/storage" . "github.com/pingcap/check" @@ -81,6 +82,13 @@ func (s testParquetParserSuite) TestParquetParser(c *C) { } func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { + // those deprecated TIME/TIMESTAMP types depend on the local timezone! + prevTZ := time.Local + time.Local = time.FixedZone("UTC+8", 8*60*60) + defer func() { + time.Local = prevTZ + }() + type Test struct { Date int32 `parquet:"name=date, type=DATE"` TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"` @@ -108,7 +116,7 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { v := &Test{ Date: 18564, //2020-10-29 - TimeMillis: 62775123, // 17:26:15.123 + TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!) 
TimeMicros: 62775123000, // 17:26:15.123 TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356 TimestampMicros: 1603963672356956, //2020-10-29T17:27:52.356956 diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index bd53e13f2..6dcf198c5 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/collate" + "go.uber.org/multierr" "go.uber.org/zap" "modernc.org/mathutil" @@ -127,6 +128,12 @@ func (es *errorSummaries) record(tableName string, err error, status CheckpointS es.summary[tableName] = errorSummary{status: status, err: err} } +const ( + diskQuotaStateIdle int32 = iota + diskQuotaStateChecking + diskQuotaStateImporting +) + type RestoreController struct { cfg *config.Config dbMetas []*mydump.MDDatabaseMeta @@ -154,6 +161,9 @@ type RestoreController struct { closedEngineLimit *worker.Pool store storage.ExternalStorage checksumManager ChecksumManager + + diskQuotaLock sync.RWMutex + diskQuotaState int32 } func NewRestoreController( @@ -828,22 +838,34 @@ func (rc *RestoreController) listenCheckpointUpdates() { } func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan struct{}) { - logProgressTicker := time.NewTicker(rc.cfg.Cron.LogProgress.Duration) - defer logProgressTicker.Stop() + // a nil channel blocks forever. + // if the cron duration is zero we use the nil channel to skip the action. + var logProgressChan <-chan time.Time + if rc.cfg.Cron.LogProgress.Duration > 0 { + logProgressTicker := time.NewTicker(rc.cfg.Cron.LogProgress.Duration) + defer logProgressTicker.Stop() + logProgressChan = logProgressTicker.C + } glueProgressTicker := time.NewTicker(3 * time.Second) defer glueProgressTicker.Stop() var switchModeChan <-chan time.Time - // tide backend don't need to switch tikv to import mode - if rc.cfg.TikvImporter.Backend != config.BackendTiDB { + // tidb backend don't need to switch tikv to import mode + if rc.cfg.TikvImporter.Backend != config.BackendTiDB && rc.cfg.Cron.SwitchMode.Duration > 0 { switchModeTicker := time.NewTicker(rc.cfg.Cron.SwitchMode.Duration) defer switchModeTicker.Stop() switchModeChan = switchModeTicker.C rc.switchToImportMode(ctx) - } else { - switchModeChan = make(chan time.Time) + } + + var checkQuotaChan <-chan time.Time + // only local storage has disk quota concern. + if rc.cfg.TikvImporter.Backend == config.BackendLocal && rc.cfg.Cron.CheckDiskQuota.Duration > 0 { + checkQuotaTicker := time.NewTicker(rc.cfg.Cron.CheckDiskQuota.Duration) + defer checkQuotaTicker.Stop() + checkQuotaChan = checkQuotaTicker.C } start := time.Now() @@ -860,7 +882,7 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan // periodically switch to import mode, as requested by TiKV 3.0 rc.switchToImportMode(ctx) - case <-logProgressTicker.C: + case <-logProgressChan: // log the current progress periodically, so OPS will know that we're still working nanoseconds := float64(time.Since(start).Nanoseconds()) // the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate @@ -945,6 +967,12 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan zap.String("state", state), remaining, ) + + case <-checkQuotaChan: + // verify the total space occupied by sorted-kv-dir is below the quota, + // otherwise we perform an emergency import. 
+ rc.enforceDiskQuota(ctx) + case <-glueProgressTicker.C: finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished)) rc.tidbGlue.Record(glue.RecordFinishedChunk, uint64(finished)) @@ -1334,6 +1362,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController rc.postProcessLock.Lock() } err = t.importKV(ctx, closedIndexEngine, rc, indexEngineID) + rc.saveStatusCheckpoint(t.tableName, indexEngineID, err, CheckpointStatusImported) if !rc.isLocalBackend() { rc.postProcessLock.Unlock() } @@ -1367,7 +1396,19 @@ func (t *TableRestore) restoreEngine( return closedEngine, nil } - indexWriter, err := indexEngine.LocalWriter(ctx) + // In Local backend, the local writer will produce an SST file for batch + // ingest into the local DB every 1000 KV pairs or up to 512 MiB. + // There are (region-concurrency) data writers, and (index-concurrency) index writers. + // Thus, the disk size occupied by these writers are up to + // (region-concurrency + index-concurrency) * 512 MiB. + // This number should not exceed the disk quota. + // Therefore, we need to reduce that "512 MiB" to respect the disk quota: + localWriterMaxCacheSize := int64(rc.cfg.TikvImporter.DiskQuota) // int64(rc.cfg.App.IndexConcurrency+rc.cfg.App.RegionConcurrency) + if localWriterMaxCacheSize > config.LocalMemoryTableSize { + localWriterMaxCacheSize = config.LocalMemoryTableSize + } + + indexWriter, err := indexEngine.LocalWriter(ctx, localWriterMaxCacheSize) if err != nil { return nil, errors.Trace(err) } @@ -1415,7 +1456,7 @@ func (t *TableRestore) restoreEngine( restoreWorker := rc.regionWorkers.Apply() wg.Add(1) - dataWriter, err := dataEngine.LocalWriter(ctx) + dataWriter, err := dataEngine.LocalWriter(ctx, localWriterMaxCacheSize) if err != nil { return nil, errors.Trace(err) } @@ -1461,7 +1502,7 @@ func (t *TableRestore) restoreEngine( ) flushAndSaveAllChunks := func() error { - if err = indexEngine.Flush(); err != nil { + if err = indexEngine.Flush(ctx); err != nil { return errors.Trace(err) } // Currently we write all the checkpoints after data&index engine are flushed. @@ -1532,6 +1573,7 @@ func (t *TableRestore) importEngine( rc.postProcessLock.Lock() } err := t.importKV(ctx, closedEngine, rc, engineID) + rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusImported) if !rc.isLocalBackend() { rc.postProcessLock.Unlock() } @@ -1727,6 +1769,91 @@ func (rc *RestoreController) switchTiKVMode(ctx context.Context, mode sstpb.Swit ) } +func (rc *RestoreController) enforceDiskQuota(ctx context.Context) { + if !atomic.CompareAndSwapInt32(&rc.diskQuotaState, diskQuotaStateIdle, diskQuotaStateChecking) { + // do not run multiple the disk quota check / import simultaneously. + // (we execute the lock check in background to avoid blocking the cron thread) + return + } + + go func() { + // locker is assigned when we detect the disk quota is exceeded. + // before the disk quota is confirmed exceeded, we keep the diskQuotaLock + // unlocked to avoid periodically interrupting the writer threads. + var locker sync.Locker + defer func() { + atomic.StoreInt32(&rc.diskQuotaState, diskQuotaStateIdle) + if locker != nil { + locker.Unlock() + } + }() + + isRetrying := false + + for { + // sleep for a cycle if we are retrying because there is nothing new to import. 
+ if isRetrying { + select { + case <-ctx.Done(): + return + case <-time.After(rc.cfg.Cron.CheckDiskQuota.Duration): + } + } else { + isRetrying = true + } + + quota := int64(rc.cfg.TikvImporter.DiskQuota) + largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := rc.backend.CheckDiskQuota(quota) + metric.LocalStorageUsageBytesGauge.WithLabelValues("disk").Set(float64(totalDiskSize)) + metric.LocalStorageUsageBytesGauge.WithLabelValues("mem").Set(float64(totalMemSize)) + + logger := log.With( + zap.Int64("diskSize", totalDiskSize), + zap.Int64("memSize", totalMemSize), + zap.Int64("quota", quota), + zap.Int("largeEnginesCount", len(largeEngines)), + zap.Int("inProgressLargeEnginesCount", inProgressLargeEngines)) + + if len(largeEngines) == 0 && inProgressLargeEngines == 0 { + logger.Debug("disk quota respected") + return + } + + if locker == nil { + // blocks all writers when we detected disk quota being exceeded. + rc.diskQuotaLock.Lock() + locker = &rc.diskQuotaLock + } + + logger.Warn("disk quota exceeded") + if len(largeEngines) == 0 { + logger.Warn("all large engines are already importing, keep blocking all writes") + continue + } + + // flush all engines so that checkpoints can be updated. + if err := rc.backend.FlushAll(ctx); err != nil { + logger.Error("flush engine for disk quota failed, check again later", log.ShortError(err)) + return + } + + // at this point, all engines are synchronized on disk. + // we then import every large engines one by one and complete. + // if any engine failed to import, we just try again next time, since the data are still intact. + atomic.StoreInt32(&rc.diskQuotaState, diskQuotaStateImporting) + task := logger.Begin(zap.WarnLevel, "importing large engines for disk quota") + var importErr error + for _, engine := range largeEngines { + if err := rc.backend.UnsafeImportAndReset(ctx, engine); err != nil { + importErr = multierr.Append(importErr, err) + } + } + task.End(zap.ErrorLevel, importErr) + return + } + }() +} + func (rc *RestoreController) checkRequirements(ctx context.Context) error { // skip requirement check if explicitly turned off if !rc.cfg.App.CheckRequirements { @@ -2160,42 +2287,53 @@ func (cr *chunkRestore) deliverLoop( } } - // Write KVs into the engine - start := time.Now() - - if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil { - deliverLogger.Error("write to data engine failed", log.ShortError(err)) - return - } - if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil { - deliverLogger.Error("write to index engine failed", log.ShortError(err)) - return + // we are allowed to save checkpoint when the disk quota state moved to "importing" + // since all engines are flushed. 
+ if atomic.LoadInt32(&rc.diskQuotaState) == diskQuotaStateImporting { + saveCheckpoint(rc, t, engineID, cr.chunk) } - deliverDur := time.Since(start) - deliverTotalDur += deliverDur - metric.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) - metric.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumSize())) - metric.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumSize())) - metric.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumKVS())) - metric.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumKVS())) + func() { + rc.diskQuotaLock.RLock() + defer rc.diskQuotaLock.RUnlock() + + // Write KVs into the engine + start := time.Now() + + if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil { + deliverLogger.Error("write to data engine failed", log.ShortError(err)) + return + } + if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil { + deliverLogger.Error("write to index engine failed", log.ShortError(err)) + return + } + + deliverDur := time.Since(start) + deliverTotalDur += deliverDur + metric.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) + metric.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumSize())) + metric.BlockDeliverBytesHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumSize())) + metric.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindData).Observe(float64(dataChecksum.SumKVS())) + metric.BlockDeliverKVPairsHistogram.WithLabelValues(metric.BlockDeliverKindIndex).Observe(float64(indexChecksum.SumKVS())) + }() // Update the table, and save a checkpoint. // (the write to the importer is effective immediately, thus update these here) - // No need to apply a lock since this is the only thread updating these variables. + // No need to apply a lock since this is the only thread updating `cr.chunk.**`. + // In local mode, we should write these checkpoint after engine flushed. cr.chunk.Checksum.Add(&dataChecksum) cr.chunk.Checksum.Add(&indexChecksum) cr.chunk.Chunk.Offset = offset cr.chunk.Chunk.PrevRowIDMax = rowID - // IN local mode, we should write these checkpoint after engine flushed if !rc.isLocalBackend() && (dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0) { // No need to save checkpoint if nothing was delivered. saveCheckpoint(rc, t, engineID, cr.chunk) } - failpoint.Inject("FailAfterWriteRows", func() { - time.Sleep(time.Second) - panic("forcing failure due to FailAfterWriteRows") + failpoint.Inject("SlowDownWriteRows", func() { + deliverLogger.Warn("Slowed down write rows") }) + failpoint.Inject("FailAfterWriteRows", nil) // TODO: for local backend, we may save checkpoint more frequently, e.g. after writen // 10GB kv pairs to data engine, we can do a flush for both data & index engine, then we // can safely update current checkpoint. 
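
The two pieces above (enforceDiskQuota and the RLock taken around WriteRows in deliverLoop) form a small reader/writer protocol. Below is a minimal, self-contained sketch of that protocol with hypothetical names (quotaController, write, enforce are illustrations, not Lightning types): chunk writers hold the read side of an RWMutex around each delivery, the periodic check CAS-guards its state so only one check or import runs at a time, and the exclusive lock is taken only after the quota is confirmed exceeded, standing in for FlushAll plus UnsafeImportAndReset.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	stateIdle int32 = iota
	stateChecking
	stateImporting
)

type quotaController struct {
	mu    sync.RWMutex
	state int32
	used  int64 // bytes currently sitting in the sorted-kv dir (simulated)
	quota int64
}

// write simulates one chunk writer delivering n bytes of KV pairs.
func (q *quotaController) write(n int64) {
	q.mu.RLock() // writers only block while an enforcement pass holds the write lock
	defer q.mu.RUnlock()
	atomic.AddInt64(&q.used, n)
}

// enforce simulates one periodic disk-quota check.
func (q *quotaController) enforce() {
	// never run two checks (or a check and an import) at the same time.
	if !atomic.CompareAndSwapInt32(&q.state, stateIdle, stateChecking) {
		return
	}
	defer atomic.StoreInt32(&q.state, stateIdle)

	if atomic.LoadInt64(&q.used) <= q.quota {
		return // quota respected, nothing to do
	}

	q.mu.Lock() // block all writers while flushing and importing
	defer q.mu.Unlock()
	atomic.StoreInt32(&q.state, stateImporting)
	// stand-in for flushing all engines and importing/resetting the large ones.
	atomic.StoreInt64(&q.used, 0)
	fmt.Println("disk quota exceeded: large engines imported and reset")
}

func main() {
	q := &quotaController{quota: 100}
	q.write(150)
	q.enforce() // prints the message and frees the simulated space
}

The point of taking the exclusive lock lazily is that a check which finds the quota respected never interrupts the writer threads, matching the comment at the top of enforceDiskQuota above.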
diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 7ba75b6ce..4180ea0e0 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -836,18 +836,18 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData(c *C) { mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil).Times(2) mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() mockWriter := mock.NewMockEngineWriter(controller) - mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() + mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(2048)).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).AnyTimes() dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0) c.Assert(err, IsNil) - dataWriter, err := dataEngine.LocalWriter(ctx) + dataWriter, err := dataEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) c.Assert(err, IsNil) - indexWriter, err := indexEngine.LocalWriter(ctx) + indexWriter, err := indexEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) // Deliver nothing. @@ -876,16 +876,16 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) { mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil).Times(2) mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() mockWriter := mock.NewMockEngineWriter(controller) - mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes() + mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(2048)).Return(mockWriter, nil).AnyTimes() dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0) c.Assert(err, IsNil) indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) c.Assert(err, IsNil) - dataWriter, err := dataEngine.LocalWriter(ctx) + dataWriter, err := dataEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) - indexWriter, err := indexEngine.LocalWriter(ctx) + indexWriter, err := indexEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) // Set up the expected API calls to the data engine... 
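
A side note on the mock changes in this file and in mock/backend.go below: since LocalWriter gained a third max-cache-size parameter, every gomock expectation needs one more argument matcher. The tests pin the exact value with int64(2048); when the cache size does not matter to a test, the same expectation could equivalently use gomock.Any() for that argument, for example:

	mockBackend.EXPECT().
		LocalWriter(ctx, gomock.Any(), gomock.Any()).
		Return(mockWriter, nil).
		AnyTimes()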
@@ -1096,9 +1096,9 @@ func (s *chunkRestoreSuite) TestRestore(c *C) { c.Assert(err, IsNil) indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) c.Assert(err, IsNil) - dataWriter, err := dataEngine.LocalWriter(ctx) + dataWriter, err := dataEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) - indexWriter, err := indexEngine.LocalWriter(ctx) + indexWriter, err := indexEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) // Expected API sequence diff --git a/mock/backend.go b/mock/backend.go index 7e73a985b..e7d9a0038 100644 --- a/mock/backend.go +++ b/mock/backend.go @@ -97,6 +97,20 @@ func (mr *MockBackendMockRecorder) CloseEngine(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseEngine", reflect.TypeOf((*MockBackend)(nil).CloseEngine), arg0, arg1) } +// EngineFileSizes mocks base method +func (m *MockBackend) EngineFileSizes() []backend.EngineFileSize { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EngineFileSizes") + ret0, _ := ret[0].([]backend.EngineFileSize) + return ret0 +} + +// EngineFileSizes indicates an expected call of EngineFileSizes +func (mr *MockBackendMockRecorder) EngineFileSizes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EngineFileSizes", reflect.TypeOf((*MockBackend)(nil).EngineFileSizes)) +} + // FetchRemoteTableModels mocks base method func (m *MockBackend) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { m.ctrl.T.Helper() @@ -112,6 +126,34 @@ func (mr *MockBackendMockRecorder) FetchRemoteTableModels(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteTableModels", reflect.TypeOf((*MockBackend)(nil).FetchRemoteTableModels), arg0, arg1) } +// FlushAllEngines mocks base method +func (m *MockBackend) FlushAllEngines(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FlushAllEngines", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// FlushAllEngines indicates an expected call of FlushAllEngines +func (mr *MockBackendMockRecorder) FlushAllEngines(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushAllEngines", reflect.TypeOf((*MockBackend)(nil).FlushAllEngines), arg0) +} + +// FlushEngine mocks base method +func (m *MockBackend) FlushEngine(arg0 context.Context, arg1 uuid.UUID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FlushEngine", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// FlushEngine indicates an expected call of FlushEngine +func (mr *MockBackendMockRecorder) FlushEngine(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushEngine", reflect.TypeOf((*MockBackend)(nil).FlushEngine), arg0, arg1) +} + // ImportEngine mocks base method func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() @@ -127,18 +169,18 @@ func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1 interface{}) *gomock. 
} // LocalWriter mocks base method -func (m *MockBackend) LocalWriter(arg0 context.Context, arg1 uuid.UUID) (backend.EngineWriter, error) { +func (m *MockBackend) LocalWriter(arg0 context.Context, arg1 uuid.UUID, arg2 int64) (backend.EngineWriter, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LocalWriter", arg0, arg1) + ret := m.ctrl.Call(m, "LocalWriter", arg0, arg1, arg2) ret0, _ := ret[0].(backend.EngineWriter) ret1, _ := ret[1].(error) return ret0, ret1 } // LocalWriter indicates an expected call of LocalWriter -func (mr *MockBackendMockRecorder) LocalWriter(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockBackendMockRecorder) LocalWriter(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalWriter", reflect.TypeOf((*MockBackend)(nil).LocalWriter), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalWriter", reflect.TypeOf((*MockBackend)(nil).LocalWriter), arg0, arg1, arg2) } // MakeEmptyRows mocks base method @@ -184,6 +226,20 @@ func (mr *MockBackendMockRecorder) OpenEngine(arg0, arg1 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenEngine", reflect.TypeOf((*MockBackend)(nil).OpenEngine), arg0, arg1) } +// ResetEngine mocks base method +func (m *MockBackend) ResetEngine(arg0 context.Context, arg1 uuid.UUID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ResetEngine", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ResetEngine indicates an expected call of ResetEngine +func (mr *MockBackendMockRecorder) ResetEngine(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetEngine", reflect.TypeOf((*MockBackend)(nil).ResetEngine), arg0, arg1) +} + // RetryImportDelay mocks base method func (m *MockBackend) RetryImportDelay() time.Duration { m.ctrl.T.Helper() diff --git a/tests/README.md b/tests/README.md index 4777998ea..6190ab52e 100644 --- a/tests/README.md +++ b/tests/README.md @@ -55,7 +55,7 @@ The script should exit with a nonzero error code on failure. Several convenient commands are provided: * `run_lightning [CONFIG]` — Starts `tidb-lightning` using `tests/TEST_NAME/CONFIG.toml` -* `run_sql ` — Executes an SQL query on the TiDB database +* `run_sql ` — Executes an SQL query on the TiDB database * `check_contains ` — Checks if the previous `run_sql` result contains the given text (in `-E` format) * `check_not_contains ` — Checks if the previous `run_sql` result does not contain the given diff --git a/tests/_utils/run_lightning b/tests/_utils/run_lightning index 22352f2b8..945439f26 100755 --- a/tests/_utils/run_lightning +++ b/tests/_utils/run_lightning @@ -27,7 +27,7 @@ bin/tidb-lightning.test -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.$$.out" DEV --config "tests/$TEST_NAME/config.toml" \ -d "tests/$TEST_NAME/data" \ --importer '127.0.0.1:8808' \ - --sorted-kv-dir "$TEST_DIR/sorted" \ + --sorted-kv-dir "$TEST_DIR/$TEST_NAME.sorted" \ --enable-checkpoint=0 \ --check-requirements=0 \ "$@" diff --git a/tests/_utils/run_sql b/tests/_utils/run_sql index 50010c7b1..06099b9c3 100755 --- a/tests/_utils/run_sql +++ b/tests/_utils/run_sql @@ -14,12 +14,13 @@ # limitations under the License. 
set -eu -TEST_DIR=/tmp/lightning_test_result +SQL="$1" +shift -echo "[$(date)] Executing SQL: ${*: -1:1}" > "$TEST_DIR/sql_res.$TEST_NAME.txt" +echo "[$(date)] Executing SQL: $SQL" > "$TEST_DIR/sql_res.$TEST_NAME.txt" mysql -uroot -h127.0.0.1 -P4000 \ --ssl-ca="$TEST_DIR/tls/ca.pem" \ --ssl-cert="$TEST_DIR/tls/curl.pem" \ --ssl-key="$TEST_DIR/tls/curl.key" \ - ${@:1:$#-1} \ - --default-character-set utf8 -E -e "${*: -1:1}" >> "$TEST_DIR/sql_res.$TEST_NAME.txt" + "$@" \ + --default-character-set utf8 -E -e "$SQL" >> "$TEST_DIR/sql_res.$TEST_NAME.txt" diff --git a/tests/checkpoint_columns/run.sh b/tests/checkpoint_columns/run.sh index 3ced60652..23b55e5e6 100755 --- a/tests/checkpoint_columns/run.sh +++ b/tests/checkpoint_columns/run.sh @@ -26,7 +26,8 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql" # Set minDeliverBytes to a small enough number to only write only 1 row each time # Set the failpoint to kill the lightning instance as soon as one row is written -export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailAfterWriteRows=return;github.com/pingcap/tidb-lightning/lightning/restore/SetMinDeliverBytes=return(1)" +PKG="github.com/pingcap/tidb-lightning/lightning/restore" +export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cp_tsr' diff --git a/tests/checkpoint_parquet/run.sh b/tests/checkpoint_parquet/run.sh index 52624aca3..ece6cb8cf 100755 --- a/tests/checkpoint_parquet/run.sh +++ b/tests/checkpoint_parquet/run.sh @@ -30,7 +30,8 @@ echo 'CREATE TABLE tbl(iVal INT, s VARCHAR(16));' > "$DBPATH/cppq_tsr.tbl-schema bin/parquet_gen --dir $DBPATH --schema cppq_tsr --table tbl --chunk 1 --rows $ROW_COUNT # Set the failpoint to kill the lightning instance as soon as one batch data is written -export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailAfterWriteRows=return;github.com/pingcap/tidb-lightning/lightning/restore/SetMinDeliverBytes=return(1)" +PKG="github.com/pingcap/tidb-lightning/lightning/restore" +export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cppq_tsr' diff --git a/tests/disk_quota/config.toml b/tests/disk_quota/config.toml new file mode 100644 index 000000000..2859d63cc --- /dev/null +++ b/tests/disk_quota/config.toml @@ -0,0 +1,7 @@ +[tikv-importer] +backend = 'local' +disk-quota = '75MB' +max-kv-pairs = 50 + +[cron] +check-disk-quota = '1s' diff --git a/tests/disk_quota/data/disk_quota-schema-create.sql b/tests/disk_quota/data/disk_quota-schema-create.sql new file mode 100644 index 000000000..f8f98e5a7 --- /dev/null +++ b/tests/disk_quota/data/disk_quota-schema-create.sql @@ -0,0 +1 @@ +create schema disk_quota; diff --git a/tests/disk_quota/data/disk_quota.t-schema.sql b/tests/disk_quota/data/disk_quota.t-schema.sql new file mode 100644 index 000000000..335ff625d --- /dev/null +++ b/tests/disk_quota/data/disk_quota.t-schema.sql @@ -0,0 +1,12 @@ +create table t ( + id int not null primary key, + + -- each stored generated column occupy about 150 KB of data, so we are 750 KB per row. + -- without disk quota the engine size will be 750 KB * 2000 rows = 1.5 GB ≈ 1.4 GiB. + -- (FIXME: making the KV size too large may crash PD?) 
+ sa longblob as (aes_encrypt(rpad(id, 150000, 'a'), 'xxx', 'iviviviviviviviv')) stored, + sb longblob as (aes_encrypt(rpad(id, 150000, 'b'), 'xxx', 'iviviviviviviviv')) stored, + sc longblob as (aes_encrypt(rpad(id, 150000, 'c'), 'xxx', 'iviviviviviviviv')) stored, + sd longblob as (aes_encrypt(rpad(id, 150000, 'd'), 'xxx', 'iviviviviviviviv')) stored, + se longblob as (aes_encrypt(rpad(id, 150000, 'e'), 'xxx', 'iviviviviviviviv')) stored +); diff --git a/tests/disk_quota/data/disk_quota.t.0.sql b/tests/disk_quota/data/disk_quota.t.0.sql new file mode 100644 index 000000000..1b441b25c --- /dev/null +++ b/tests/disk_quota/data/disk_quota.t.0.sql @@ -0,0 +1,51 @@ +insert into t (id) values +(0), (1), (2), (3), (4), (5), (6), (7), (8), (9), +(10), (11), (12), (13), (14), (15), (16), (17), (18), (19), +(20), (21), (22), (23), (24), (25), (26), (27), (28), (29), +(30), (31), (32), (33), (34), (35), (36), (37), (38), (39), +(40), (41), (42), (43), (44), (45), (46), (47), (48), (49), +(50), (51), (52), (53), (54), (55), (56), (57), (58), (59), +(60), (61), (62), (63), (64), (65), (66), (67), (68), (69), +(70), (71), (72), (73), (74), (75), (76), (77), (78), (79), +(80), (81), (82), (83), (84), (85), (86), (87), (88), (89), +(90), (91), (92), (93), (94), (95), (96), (97), (98), (99), +(100), (101), (102), (103), (104), (105), (106), (107), (108), (109), +(110), (111), (112), (113), (114), (115), (116), (117), (118), (119), +(120), (121), (122), (123), (124), (125), (126), (127), (128), (129), +(130), (131), (132), (133), (134), (135), (136), (137), (138), (139), +(140), (141), (142), (143), (144), (145), (146), (147), (148), (149), +(150), (151), (152), (153), (154), (155), (156), (157), (158), (159), +(160), (161), (162), (163), (164), (165), (166), (167), (168), (169), +(170), (171), (172), (173), (174), (175), (176), (177), (178), (179), +(180), (181), (182), (183), (184), (185), (186), (187), (188), (189), +(190), (191), (192), (193), (194), (195), (196), (197), (198), (199), +(200), (201), (202), (203), (204), (205), (206), (207), (208), (209), +(210), (211), (212), (213), (214), (215), (216), (217), (218), (219), +(220), (221), (222), (223), (224), (225), (226), (227), (228), (229), +(230), (231), (232), (233), (234), (235), (236), (237), (238), (239), +(240), (241), (242), (243), (244), (245), (246), (247), (248), (249), +(250), (251), (252), (253), (254), (255), (256), (257), (258), (259), +(260), (261), (262), (263), (264), (265), (266), (267), (268), (269), +(270), (271), (272), (273), (274), (275), (276), (277), (278), (279), +(280), (281), (282), (283), (284), (285), (286), (287), (288), (289), +(290), (291), (292), (293), (294), (295), (296), (297), (298), (299), +(300), (301), (302), (303), (304), (305), (306), (307), (308), (309), +(310), (311), (312), (313), (314), (315), (316), (317), (318), (319), +(320), (321), (322), (323), (324), (325), (326), (327), (328), (329), +(330), (331), (332), (333), (334), (335), (336), (337), (338), (339), +(340), (341), (342), (343), (344), (345), (346), (347), (348), (349), +(350), (351), (352), (353), (354), (355), (356), (357), (358), (359), +(360), (361), (362), (363), (364), (365), (366), (367), (368), (369), +(370), (371), (372), (373), (374), (375), (376), (377), (378), (379), +(380), (381), (382), (383), (384), (385), (386), (387), (388), (389), +(390), (391), (392), (393), (394), (395), (396), (397), (398), (399), +(400), (401), (402), (403), (404), (405), (406), (407), (408), (409), +(410), (411), (412), (413), (414), (415), (416), 
(417), (418), (419), +(420), (421), (422), (423), (424), (425), (426), (427), (428), (429), +(430), (431), (432), (433), (434), (435), (436), (437), (438), (439), +(440), (441), (442), (443), (444), (445), (446), (447), (448), (449), +(450), (451), (452), (453), (454), (455), (456), (457), (458), (459), +(460), (461), (462), (463), (464), (465), (466), (467), (468), (469), +(470), (471), (472), (473), (474), (475), (476), (477), (478), (479), +(480), (481), (482), (483), (484), (485), (486), (487), (488), (489), +(490), (491), (492), (493), (494), (495), (496), (497), (498), (499); diff --git a/tests/disk_quota/data/disk_quota.t.1.sql b/tests/disk_quota/data/disk_quota.t.1.sql new file mode 100644 index 000000000..63b5a57d3 --- /dev/null +++ b/tests/disk_quota/data/disk_quota.t.1.sql @@ -0,0 +1,51 @@ +insert into t (id) values +(500), (501), (502), (503), (504), (505), (506), (507), (508), (509), +(510), (511), (512), (513), (514), (515), (516), (517), (518), (519), +(520), (521), (522), (523), (524), (525), (526), (527), (528), (529), +(530), (531), (532), (533), (534), (535), (536), (537), (538), (539), +(540), (541), (542), (543), (544), (545), (546), (547), (548), (549), +(550), (551), (552), (553), (554), (555), (556), (557), (558), (559), +(560), (561), (562), (563), (564), (565), (566), (567), (568), (569), +(570), (571), (572), (573), (574), (575), (576), (577), (578), (579), +(580), (581), (582), (583), (584), (585), (586), (587), (588), (589), +(590), (591), (592), (593), (594), (595), (596), (597), (598), (599), +(600), (601), (602), (603), (604), (605), (606), (607), (608), (609), +(610), (611), (612), (613), (614), (615), (616), (617), (618), (619), +(620), (621), (622), (623), (624), (625), (626), (627), (628), (629), +(630), (631), (632), (633), (634), (635), (636), (637), (638), (639), +(640), (641), (642), (643), (644), (645), (646), (647), (648), (649), +(650), (651), (652), (653), (654), (655), (656), (657), (658), (659), +(660), (661), (662), (663), (664), (665), (666), (667), (668), (669), +(670), (671), (672), (673), (674), (675), (676), (677), (678), (679), +(680), (681), (682), (683), (684), (685), (686), (687), (688), (689), +(690), (691), (692), (693), (694), (695), (696), (697), (698), (699), +(700), (701), (702), (703), (704), (705), (706), (707), (708), (709), +(710), (711), (712), (713), (714), (715), (716), (717), (718), (719), +(720), (721), (722), (723), (724), (725), (726), (727), (728), (729), +(730), (731), (732), (733), (734), (735), (736), (737), (738), (739), +(740), (741), (742), (743), (744), (745), (746), (747), (748), (749), +(750), (751), (752), (753), (754), (755), (756), (757), (758), (759), +(760), (761), (762), (763), (764), (765), (766), (767), (768), (769), +(770), (771), (772), (773), (774), (775), (776), (777), (778), (779), +(780), (781), (782), (783), (784), (785), (786), (787), (788), (789), +(790), (791), (792), (793), (794), (795), (796), (797), (798), (799), +(800), (801), (802), (803), (804), (805), (806), (807), (808), (809), +(810), (811), (812), (813), (814), (815), (816), (817), (818), (819), +(820), (821), (822), (823), (824), (825), (826), (827), (828), (829), +(830), (831), (832), (833), (834), (835), (836), (837), (838), (839), +(840), (841), (842), (843), (844), (845), (846), (847), (848), (849), +(850), (851), (852), (853), (854), (855), (856), (857), (858), (859), +(860), (861), (862), (863), (864), (865), (866), (867), (868), (869), +(870), (871), (872), (873), (874), (875), (876), (877), (878), (879), +(880), (881), 
(882), (883), (884), (885), (886), (887), (888), (889), +(890), (891), (892), (893), (894), (895), (896), (897), (898), (899), +(900), (901), (902), (903), (904), (905), (906), (907), (908), (909), +(910), (911), (912), (913), (914), (915), (916), (917), (918), (919), +(920), (921), (922), (923), (924), (925), (926), (927), (928), (929), +(930), (931), (932), (933), (934), (935), (936), (937), (938), (939), +(940), (941), (942), (943), (944), (945), (946), (947), (948), (949), +(950), (951), (952), (953), (954), (955), (956), (957), (958), (959), +(960), (961), (962), (963), (964), (965), (966), (967), (968), (969), +(970), (971), (972), (973), (974), (975), (976), (977), (978), (979), +(980), (981), (982), (983), (984), (985), (986), (987), (988), (989), +(990), (991), (992), (993), (994), (995), (996), (997), (998), (999); diff --git a/tests/disk_quota/data/disk_quota.t.2.sql b/tests/disk_quota/data/disk_quota.t.2.sql new file mode 100644 index 000000000..57b739923 --- /dev/null +++ b/tests/disk_quota/data/disk_quota.t.2.sql @@ -0,0 +1,51 @@ +insert into t (id) values +(1000), (1001), (1002), (1003), (1004), (1005), (1006), (1007), (1008), (1009), +(1010), (1011), (1012), (1013), (1014), (1015), (1016), (1017), (1018), (1019), +(1020), (1021), (1022), (1023), (1024), (1025), (1026), (1027), (1028), (1029), +(1030), (1031), (1032), (1033), (1034), (1035), (1036), (1037), (1038), (1039), +(1040), (1041), (1042), (1043), (1044), (1045), (1046), (1047), (1048), (1049), +(1050), (1051), (1052), (1053), (1054), (1055), (1056), (1057), (1058), (1059), +(1060), (1061), (1062), (1063), (1064), (1065), (1066), (1067), (1068), (1069), +(1070), (1071), (1072), (1073), (1074), (1075), (1076), (1077), (1078), (1079), +(1080), (1081), (1082), (1083), (1084), (1085), (1086), (1087), (1088), (1089), +(1090), (1091), (1092), (1093), (1094), (1095), (1096), (1097), (1098), (1099), +(1100), (1101), (1102), (1103), (1104), (1105), (1106), (1107), (1108), (1109), +(1110), (1111), (1112), (1113), (1114), (1115), (1116), (1117), (1118), (1119), +(1120), (1121), (1122), (1123), (1124), (1125), (1126), (1127), (1128), (1129), +(1130), (1131), (1132), (1133), (1134), (1135), (1136), (1137), (1138), (1139), +(1140), (1141), (1142), (1143), (1144), (1145), (1146), (1147), (1148), (1149), +(1150), (1151), (1152), (1153), (1154), (1155), (1156), (1157), (1158), (1159), +(1160), (1161), (1162), (1163), (1164), (1165), (1166), (1167), (1168), (1169), +(1170), (1171), (1172), (1173), (1174), (1175), (1176), (1177), (1178), (1179), +(1180), (1181), (1182), (1183), (1184), (1185), (1186), (1187), (1188), (1189), +(1190), (1191), (1192), (1193), (1194), (1195), (1196), (1197), (1198), (1199), +(1200), (1201), (1202), (1203), (1204), (1205), (1206), (1207), (1208), (1209), +(1210), (1211), (1212), (1213), (1214), (1215), (1216), (1217), (1218), (1219), +(1220), (1221), (1222), (1223), (1224), (1225), (1226), (1227), (1228), (1229), +(1230), (1231), (1232), (1233), (1234), (1235), (1236), (1237), (1238), (1239), +(1240), (1241), (1242), (1243), (1244), (1245), (1246), (1247), (1248), (1249), +(1250), (1251), (1252), (1253), (1254), (1255), (1256), (1257), (1258), (1259), +(1260), (1261), (1262), (1263), (1264), (1265), (1266), (1267), (1268), (1269), +(1270), (1271), (1272), (1273), (1274), (1275), (1276), (1277), (1278), (1279), +(1280), (1281), (1282), (1283), (1284), (1285), (1286), (1287), (1288), (1289), +(1290), (1291), (1292), (1293), (1294), (1295), (1296), (1297), (1298), (1299), +(1300), (1301), (1302), (1303), 
(1304), (1305), (1306), (1307), (1308), (1309), +(1310), (1311), (1312), (1313), (1314), (1315), (1316), (1317), (1318), (1319), +(1320), (1321), (1322), (1323), (1324), (1325), (1326), (1327), (1328), (1329), +(1330), (1331), (1332), (1333), (1334), (1335), (1336), (1337), (1338), (1339), +(1340), (1341), (1342), (1343), (1344), (1345), (1346), (1347), (1348), (1349), +(1350), (1351), (1352), (1353), (1354), (1355), (1356), (1357), (1358), (1359), +(1360), (1361), (1362), (1363), (1364), (1365), (1366), (1367), (1368), (1369), +(1370), (1371), (1372), (1373), (1374), (1375), (1376), (1377), (1378), (1379), +(1380), (1381), (1382), (1383), (1384), (1385), (1386), (1387), (1388), (1389), +(1390), (1391), (1392), (1393), (1394), (1395), (1396), (1397), (1398), (1399), +(1400), (1401), (1402), (1403), (1404), (1405), (1406), (1407), (1408), (1409), +(1410), (1411), (1412), (1413), (1414), (1415), (1416), (1417), (1418), (1419), +(1420), (1421), (1422), (1423), (1424), (1425), (1426), (1427), (1428), (1429), +(1430), (1431), (1432), (1433), (1434), (1435), (1436), (1437), (1438), (1439), +(1440), (1441), (1442), (1443), (1444), (1445), (1446), (1447), (1448), (1449), +(1450), (1451), (1452), (1453), (1454), (1455), (1456), (1457), (1458), (1459), +(1460), (1461), (1462), (1463), (1464), (1465), (1466), (1467), (1468), (1469), +(1470), (1471), (1472), (1473), (1474), (1475), (1476), (1477), (1478), (1479), +(1480), (1481), (1482), (1483), (1484), (1485), (1486), (1487), (1488), (1489), +(1490), (1491), (1492), (1493), (1494), (1495), (1496), (1497), (1498), (1499); diff --git a/tests/disk_quota/data/disk_quota.t.3.sql b/tests/disk_quota/data/disk_quota.t.3.sql new file mode 100644 index 000000000..58bf50641 --- /dev/null +++ b/tests/disk_quota/data/disk_quota.t.3.sql @@ -0,0 +1,51 @@ +insert into t (id) values +(1500), (1501), (1502), (1503), (1504), (1505), (1506), (1507), (1508), (1509), +(1510), (1511), (1512), (1513), (1514), (1515), (1516), (1517), (1518), (1519), +(1520), (1521), (1522), (1523), (1524), (1525), (1526), (1527), (1528), (1529), +(1530), (1531), (1532), (1533), (1534), (1535), (1536), (1537), (1538), (1539), +(1540), (1541), (1542), (1543), (1544), (1545), (1546), (1547), (1548), (1549), +(1550), (1551), (1552), (1553), (1554), (1555), (1556), (1557), (1558), (1559), +(1560), (1561), (1562), (1563), (1564), (1565), (1566), (1567), (1568), (1569), +(1570), (1571), (1572), (1573), (1574), (1575), (1576), (1577), (1578), (1579), +(1580), (1581), (1582), (1583), (1584), (1585), (1586), (1587), (1588), (1589), +(1590), (1591), (1592), (1593), (1594), (1595), (1596), (1597), (1598), (1599), +(1600), (1601), (1602), (1603), (1604), (1605), (1606), (1607), (1608), (1609), +(1610), (1611), (1612), (1613), (1614), (1615), (1616), (1617), (1618), (1619), +(1620), (1621), (1622), (1623), (1624), (1625), (1626), (1627), (1628), (1629), +(1630), (1631), (1632), (1633), (1634), (1635), (1636), (1637), (1638), (1639), +(1640), (1641), (1642), (1643), (1644), (1645), (1646), (1647), (1648), (1649), +(1650), (1651), (1652), (1653), (1654), (1655), (1656), (1657), (1658), (1659), +(1660), (1661), (1662), (1663), (1664), (1665), (1666), (1667), (1668), (1669), +(1670), (1671), (1672), (1673), (1674), (1675), (1676), (1677), (1678), (1679), +(1680), (1681), (1682), (1683), (1684), (1685), (1686), (1687), (1688), (1689), +(1690), (1691), (1692), (1693), (1694), (1695), (1696), (1697), (1698), (1699), +(1700), (1701), (1702), (1703), (1704), (1705), (1706), (1707), (1708), (1709), +(1710), (1711), 
(1712), (1713), (1714), (1715), (1716), (1717), (1718), (1719), +(1720), (1721), (1722), (1723), (1724), (1725), (1726), (1727), (1728), (1729), +(1730), (1731), (1732), (1733), (1734), (1735), (1736), (1737), (1738), (1739), +(1740), (1741), (1742), (1743), (1744), (1745), (1746), (1747), (1748), (1749), +(1750), (1751), (1752), (1753), (1754), (1755), (1756), (1757), (1758), (1759), +(1760), (1761), (1762), (1763), (1764), (1765), (1766), (1767), (1768), (1769), +(1770), (1771), (1772), (1773), (1774), (1775), (1776), (1777), (1778), (1779), +(1780), (1781), (1782), (1783), (1784), (1785), (1786), (1787), (1788), (1789), +(1790), (1791), (1792), (1793), (1794), (1795), (1796), (1797), (1798), (1799), +(1800), (1801), (1802), (1803), (1804), (1805), (1806), (1807), (1808), (1809), +(1810), (1811), (1812), (1813), (1814), (1815), (1816), (1817), (1818), (1819), +(1820), (1821), (1822), (1823), (1824), (1825), (1826), (1827), (1828), (1829), +(1830), (1831), (1832), (1833), (1834), (1835), (1836), (1837), (1838), (1839), +(1840), (1841), (1842), (1843), (1844), (1845), (1846), (1847), (1848), (1849), +(1850), (1851), (1852), (1853), (1854), (1855), (1856), (1857), (1858), (1859), +(1860), (1861), (1862), (1863), (1864), (1865), (1866), (1867), (1868), (1869), +(1870), (1871), (1872), (1873), (1874), (1875), (1876), (1877), (1878), (1879), +(1880), (1881), (1882), (1883), (1884), (1885), (1886), (1887), (1888), (1889), +(1890), (1891), (1892), (1893), (1894), (1895), (1896), (1897), (1898), (1899), +(1900), (1901), (1902), (1903), (1904), (1905), (1906), (1907), (1908), (1909), +(1910), (1911), (1912), (1913), (1914), (1915), (1916), (1917), (1918), (1919), +(1920), (1921), (1922), (1923), (1924), (1925), (1926), (1927), (1928), (1929), +(1930), (1931), (1932), (1933), (1934), (1935), (1936), (1937), (1938), (1939), +(1940), (1941), (1942), (1943), (1944), (1945), (1946), (1947), (1948), (1949), +(1950), (1951), (1952), (1953), (1954), (1955), (1956), (1957), (1958), (1959), +(1960), (1961), (1962), (1963), (1964), (1965), (1966), (1967), (1968), (1969), +(1970), (1971), (1972), (1973), (1974), (1975), (1976), (1977), (1978), (1979), +(1980), (1981), (1982), (1983), (1984), (1985), (1986), (1987), (1988), (1989), +(1990), (1991), (1992), (1993), (1994), (1995), (1996), (1997), (1998), (1999); diff --git a/tests/disk_quota/run.sh b/tests/disk_quota/run.sh new file mode 100644 index 000000000..9356f11ee --- /dev/null +++ b/tests/disk_quota/run.sh @@ -0,0 +1,74 @@ +#!/bin/sh +# +# Copyright 2020 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +check_cluster_version 4 0 0 'local backend' || exit 0 + +# the default mode (aes-128-ecb) can be easily compressed, switch to cbc to reduce the compression effect. 
+run_sql 'DROP DATABASE IF EXISTS disk_quota;' +run_sql "SELECT @@block_encryption_mode" +OLD_ENCRYPTION_MODE=$(read_result) +run_sql "SET GLOBAL block_encryption_mode = 'aes-256-cbc';" + +DISK_QUOTA_DIR="$TEST_DIR/with-disk-quota" +FINISHED_FILE="$TEST_DIR/sorted-with-disk-quota.finished" + +mkdir -p "$DISK_QUOTA_DIR" +rm -f "$FINISHED_FILE" +cleanup() { + touch "$FINISHED_FILE" + run_sql "SET GLOBAL block_encryption_mode = '$OLD_ENCRYPTION_MODE';" +} +trap cleanup EXIT + +# There is normally a 2-second delay between when these SET GLOBAL statements return +# and when the changes actually take effect. So we have this check-and-retry loop +# below to ensure Lightning gets our desired global vars. +for i in $(seq 3); do + sleep 1 + run_sql "SELECT @@block_encryption_mode" + if [ "$(read_result)" = 'aes-256-cbc' ]; then + break + fi +done + +while [ ! -e "$FINISHED_FILE" ]; do + DISK_USAGE=$(du -s -B1 "$DISK_QUOTA_DIR" | cut -f 1) + # the disk quota of 75 MiB is just a soft limit. + # the reserved size we have is 512 MiB + (4 files × 1000 ms × 1 KiB/ms) ≈ 516 MiB, + # which together with the quota sums up to the hard limit of 619610112 bytes (about 591 MiB). + if [ "0$DISK_USAGE" -gt 619610112 ]; then + echo "hard disk quota exceeded, actual size = $DISK_USAGE" > "$FINISHED_FILE" + break + else + sleep 1 + fi +done & + +export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/SlowDownWriteRows=sleep(500)" +run_lightning --sorted-kv-dir "$DISK_QUOTA_DIR/sorted" --log-file "$TEST_DIR/lightning-disk-quota.log" +touch "$FINISHED_FILE" +# if $FINISHED_FILE has content, it can only be because the hard disk quota was exceeded. +[ -s "$FINISHED_FILE" ] && cat "$FINISHED_FILE" && exit 1 + +# check that the disk quota is indeed triggered. +grep -q 'disk quota exceeded' "$TEST_DIR/lightning-disk-quota.log" + +# check that the columns are correct.
+run_sql "select cast(trim(trailing 'a' from aes_decrypt(sa, 'xxx', 'iviviviviviviviv')) as char) a from disk_quota.t where id = 1357" +check_contains 'a: 1357' +run_sql "select cast(trim(trailing 'e' from aes_decrypt(se, 'xxx', 'iviviviviviviviv')) as char) e from disk_quota.t where id = 246" +check_contains 'e: 246' diff --git a/tests/generated_columns/run.sh b/tests/generated_columns/run.sh index 9d02a4c25..2e0ac3864 100644 --- a/tests/generated_columns/run.sh +++ b/tests/generated_columns/run.sh @@ -50,7 +50,7 @@ for BACKEND in 'local' 'tidb' 'importer'; do check_contains 'd: 103' check_contains 'e: 104' - run_sql --binary-as-hex 'SELECT * FROM gencol.various_types' + run_sql 'SELECT * FROM gencol.various_types' --binary-as-hex check_contains 'int64: 3' check_contains 'uint64: 5764801' check_contains 'float32: 0.5625' diff --git a/tests/local_backend/run.sh b/tests/local_backend/run.sh index 10df1057e..805e6bdab 100755 --- a/tests/local_backend/run.sh +++ b/tests/local_backend/run.sh @@ -84,7 +84,7 @@ for ckpt in mysql file; do run_sql 'DROP DATABASE IF EXISTS cpeng;' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_local_backend_test' rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb" - + # before chunk pos is updated, local files could handle lost set +e export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailAfterWriteRows=return" @@ -95,7 +95,7 @@ for ckpt in mysql file; do --enable-checkpoint=1 \ --config=tests/$TEST_NAME/$ckpt.toml >$TEST_DIR/lightning_ctl.output 2>&1 grep -Fq "No table has lost intermediate files according to given config" $TEST_DIR/lightning_ctl.output - + # when position of chunk file doesn't equal to offset, intermediate file should exist set +e export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/LocalBackendSaveCheckpoint=return;github.com/pingcap/tidb-lightning/lightning/restore/FailIfImportedChunk=return(1)" @@ -107,8 +107,8 @@ for ckpt in mysql file; do --config=tests/$TEST_NAME/$ckpt.toml >$TEST_DIR/lightning_ctl.output 2>&1 grep -Eq "These tables are missing intermediate files: \[.+\]" $TEST_DIR/lightning_ctl.output # don't distinguish whole sort-kv directory missing and table's directory missing for now - ls -lA $TEST_DIR/sorted - + ls -lA $TEST_DIR/$TEST_NAME.sorted + # after index engine is imported, local file could handle lost set +e export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailIfIndexEngineImported=return(1)" @@ -120,4 +120,4 @@ for ckpt in mysql file; do --config=tests/$TEST_NAME/$ckpt.toml >$TEST_DIR/lightning_ctl.output 2>&1 grep -Fq "No table has lost intermediate files according to given config" $TEST_DIR/lightning_ctl.output done -rm -r $TEST_DIR/sorted \ No newline at end of file +rm -r $TEST_DIR/$TEST_NAME.sorted \ No newline at end of file diff --git a/tests/parquet/run.sh b/tests/parquet/run.sh index 3d2fd56cc..d0064f6c6 100755 --- a/tests/parquet/run.sh +++ b/tests/parquet/run.sh @@ -26,7 +26,7 @@ for BACKEND in local importer tidb; do fi run_sql 'DROP DATABASE IF EXISTS test' run_sql 'CREATE DATABASE test' - run_sql -D test "source tests/$TEST_NAME/db.sql;" + run_sql "source tests/$TEST_NAME/db.sql;" -D test run_lightning --backend $BACKEND diff --git a/tests/run.sh b/tests/run.sh index a6ddebe5a..8c02ac879 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -17,10 +17,10 @@ export TIFLASH="$TIFLASH" export NEW_COLLATION="$NEW_COLLATION" set -eu -TEST_DIR=/tmp/lightning_test_result +export TEST_DIR=/tmp/lightning_test_result 
SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.sh | cut -d/ -f2 | sort)}" export PATH="tests/_utils:$PATH" -source tests/_utils/run_services +. tests/_utils/run_services trap stop_services EXIT start_services diff --git a/tidb-lightning.toml b/tidb-lightning.toml index a55daa48f..b5ff0359a 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -90,6 +90,17 @@ addr = "127.0.0.1:8287" #send-kv-pairs = 32768 # local storage directory used in "local" backend. #sorted-kv-dir = "" +# Maximum size of the local storage directory. Lightning periodically checks whether the total storage size exceeds this +# value. If so, the "local" backend blocks and immediately ingests the largest engines into the target TiKV until the +# usage falls below the specified capacity. +# Note that the disk-quota IS NOT A HARD LIMIT: the usage may overshoot the quota before the check detects it. +# The overshoot can be up to 6.3 GiB with the default settings (8 open engines, 40 region-concurrency, quota checked +# every minute). +# Setting the disk quota too low may cause engines to overlap each other too much and slow down the import. +# This setting is ignored by the "tidb" and "importer" backends. +# The default value of 0 lets Lightning automatically pick an appropriate capacity from the free disk space +# of sorted-kv-dir, minus the expected overshoot. +#disk-quota = 0 # range-concurrency controls the maximum ingest concurrently while writing to tikv, It can affect the network traffic. # this default config can make full use of a 10Gib bandwidth network, if the network bandwidth is higher, you can increase # this to gain better performance. Larger value will also increase the memory usage slightly. @@ -258,3 +269,5 @@ post-process-at-last = true switch-mode = "5m" # the duration which the an import progress will be printed to the log. log-progress = "5m" +# the interval at which the tikv-importer.disk-quota usage is checked. +check-disk-quota = "1m"
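As a rough illustration of what the new disk-quota and check-disk-quota options describe, here is a minimal, hypothetical Go sketch (not Lightning's actual implementation): on every check interval it measures the size of sorted-kv-dir, much like the `du -s -B1` probe in the disk_quota test above, and triggers ingestion of the largest engines once the usage exceeds the quota. The names dirSize, checkDiskQuotaLoop, and ingestLargestEngines are invented for this sketch.

package main

import (
	"io/fs"
	"path/filepath"
	"time"
)

// dirSize walks dir and sums the sizes of all regular files,
// roughly what `du -s -B1 dir` reports.
func dirSize(dir string) (int64, error) {
	var total int64
	err := filepath.WalkDir(dir, func(_ string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.Type().IsRegular() {
			info, err := d.Info()
			if err != nil {
				return err
			}
			total += info.Size()
		}
		return nil
	})
	return total, err
}

// checkDiskQuotaLoop is a hypothetical periodic checker. The quota stays a
// soft limit: whatever is written between two ticks becomes the overshoot.
func checkDiskQuotaLoop(sortedKVDir string, quota int64, interval time.Duration, ingestLargestEngines func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		usage, err := dirSize(sortedKVDir)
		if err != nil {
			continue // skip a tick on transient filesystem errors
		}
		if usage > quota {
			// In the real backend this would block further writes and ingest the
			// largest engines until usage drops; here it is just a callback stub.
			ingestLargestEngines()
		}
	}
}

func main() {
	// Example values mirroring the test: a 75 MiB quota checked every minute.
	checkDiskQuotaLoop("/tmp/sorted-kv", 75<<20, time.Minute, func() {})
}

The soft-limit nature of the quota is why the test's background loop only declares failure once the directory grows past roughly 591 MiB rather than at 75 MiB.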