-
Notifications
You must be signed in to change notification settings - Fork 122
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #17 from scylladb/mmt/migrate
migrate
- Loading branch information
Showing
7 changed files
with
247 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ language: go | |
go: | ||
- 1.9 | ||
- 1.8 | ||
- 1.7 | ||
|
||
env: | ||
global: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Copyright (C) 2017 ScyllaDB | ||
// Use of this source code is governed by a ALv2-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package migrate | ||
|
||
import ( | ||
"crypto/md5" | ||
"encoding/hex" | ||
"io" | ||
"os" | ||
) | ||
|
||
var encode = hex.EncodeToString | ||
|
||
func checksum(b []byte) string { | ||
v := md5.Sum(b) | ||
return encode(v[:]) | ||
} | ||
|
||
func fileChecksum(path string) (string, error) { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return "", nil | ||
} | ||
defer f.Close() | ||
|
||
h := md5.New() | ||
if _, err := io.Copy(h, f); err != nil { | ||
return "", err | ||
} | ||
v := h.Sum(nil) | ||
return encode(v[:]), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright (C) 2017 ScyllaDB | ||
// Use of this source code is governed by a ALv2-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package migrate | ||
|
||
import "testing" | ||
|
||
func TestFileChecksum(t *testing.T) { | ||
c, err := fileChecksum("testdata/file") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if c != "bbe02f946d5455d74616fc9777557c22" { | ||
t.Fatal(c) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright (C) 2017 ScyllaDB | ||
// Use of this source code is governed by a ALv2-style | ||
// license that can be found in the LICENSE file. | ||
|
||
// Package migrate provides simple and flexible ScyllaDB and Apache Cassandra® | ||
// migrations. Migrations can be read from a flat directory containing cql files. | ||
// There is no imposed naming schema, migration name is file name and the | ||
// migrations are processed in lexicographical order. Caller provides a | ||
// gocql.Session, the session must use a desired keyspace as migrate would try | ||
// to create migrations table. | ||
package migrate |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
// Copyright (C) 2017 ScyllaDB | ||
// Use of this source code is governed by a ALv2-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package migrate | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
"sort" | ||
"time" | ||
|
||
"github.com/gocql/gocql" | ||
"github.com/scylladb/gocqlx" | ||
"github.com/scylladb/gocqlx/qb" | ||
) | ||
|
||
var infoSchema = `CREATE TABLE IF NOT EXISTS gocqlx_migrations ( | ||
name text, | ||
checksum text, | ||
done tinyint, | ||
start_time timestamp, | ||
end_time timestamp, | ||
PRIMARY KEY(name) | ||
)` | ||
|
||
var selectInfo = "SELECT * FROM gocqlx_migrations" | ||
|
||
// Info contains information on migration applied on a database. | ||
type Info struct { | ||
Name string | ||
Checksum string | ||
Done int | ||
StartTime time.Time | ||
EndTime time.Time | ||
} | ||
|
||
// List provides a listing of applied migrations. | ||
func List(ctx context.Context, session *gocql.Session) ([]*Info, error) { | ||
if err := ensureInfoTable(ctx, session); err != nil { | ||
return nil, err | ||
} | ||
|
||
var v []*Info | ||
err := gocqlx.Select(&v, session.Query(selectInfo).WithContext(ctx)) | ||
if err == gocql.ErrNotFound { | ||
return nil, nil | ||
} | ||
|
||
sort.Slice(v, func(i, j int) bool { | ||
return v[i].Name < v[j].Name | ||
}) | ||
|
||
return v, err | ||
} | ||
|
||
func ensureInfoTable(ctx context.Context, session *gocql.Session) error { | ||
return gocqlx.Query(session.Query(infoSchema).WithContext(ctx), nil).ExecRelease() | ||
} | ||
|
||
// Migrate reads the cql files from a directory and applies required migrations. | ||
func Migrate(ctx context.Context, session *gocql.Session, dir string) error { | ||
// get database migrations | ||
dbm, err := List(ctx, session) | ||
if err != nil { | ||
return fmt.Errorf("failed to list migrations: %s", err) | ||
} | ||
|
||
// get file migrations | ||
fm, err := filepath.Glob(filepath.Join(dir, "*.cql")) | ||
if err != nil { | ||
return fmt.Errorf("failed to list migrations in %q: %s", dir, err) | ||
} | ||
sort.Strings(fm) | ||
|
||
// verify migrations | ||
if len(dbm) > len(fm) { | ||
return fmt.Errorf("database is ahead of %q", dir) | ||
} | ||
|
||
for i := 0; i < len(dbm); i++ { | ||
if dbm[i].Name != filepath.Base(fm[i]) { | ||
fmt.Println(dbm[i].Name, filepath.Base(fm[i]), i) | ||
return errors.New("inconsistent migrations") | ||
} | ||
} | ||
|
||
for i := 0; i < len(dbm); i++ { | ||
c, err := fileChecksum(fm[i]) | ||
if err != nil { | ||
return fmt.Errorf("failed to calculate checksum for %q: %s", fm[i], err) | ||
} | ||
if dbm[i].Checksum != c { | ||
return fmt.Errorf("file %q was tempered with, expected md5 %s", fm[i], dbm[i].Checksum) | ||
} | ||
} | ||
|
||
// apply migrations | ||
if len(dbm) > 0 { | ||
last := len(dbm) - 1 | ||
if err := applyMigration(ctx, session, fm[last], dbm[last].Done); err != nil { | ||
return fmt.Errorf("failed to apply migration %q: %s", fm[last], err) | ||
} | ||
} | ||
|
||
for i := len(dbm); i < len(fm); i++ { | ||
if err := applyMigration(ctx, session, fm[i], 0); err != nil { | ||
return fmt.Errorf("failed to apply migration %q: %s", fm[i], err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func applyMigration(ctx context.Context, session *gocql.Session, path string, done int) error { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
b, err := ioutil.ReadAll(f) | ||
f.Close() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
info := Info{ | ||
Name: filepath.Base(path), | ||
StartTime: time.Now(), | ||
Checksum: checksum(b), | ||
} | ||
|
||
stmt, names := qb.Insert("gocqlx_migrations").Columns( | ||
"name", | ||
"checksum", | ||
"done", | ||
"start_time", | ||
"end_time", | ||
).ToCql() | ||
|
||
iq := gocqlx.Query(session.Query(stmt).WithContext(ctx), names) | ||
defer iq.Release() | ||
|
||
i := 1 | ||
r := bytes.NewBuffer(b) | ||
for { | ||
stmt, err := r.ReadString(';') | ||
if err == io.EOF { | ||
break | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
if i <= done { | ||
i++ | ||
continue | ||
} | ||
|
||
// execute | ||
q := gocqlx.Query(session.Query(stmt).RetryPolicy(nil).WithContext(ctx), nil) | ||
if err := q.ExecRelease(); err != nil { | ||
return fmt.Errorf("statement %d failed: %s", i, err) | ||
} | ||
|
||
// update info | ||
info.Done = i | ||
info.EndTime = time.Now() | ||
if err := iq.BindStruct(info).Exec(); err != nil { | ||
return fmt.Errorf("migration statement %d failed: %s", i, err) | ||
} | ||
|
||
i++ | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
file |