Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate #17

Merged
merged 2 commits into from
Sep 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ language: go
go:
- 1.9
- 1.8
- 1.7

env:
global:
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ hood it uses `sqlx/reflectx` package so `sqlx` models will also work with `gocql

## Features

* Builders for `SELECT`, `INSERT`, `UPDATE` `DELETE` and `BATCH`
* Builders for `SELECT`, `INSERT`, `UPDATE` `DELETE` and `BATCH` (supporting collections, counters and functions)
* Queries with named parameters (:identifier) support
* Functions support
* Binding parameters form struct or map
* Scanning results into structs and slices
* Automatic query releasing
* Schema migrations
* Fast!

## Example
Expand Down
34 changes: 34 additions & 0 deletions migrate/checksum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) 2017 ScyllaDB
// Use of this source code is governed by a ALv2-style
// license that can be found in the LICENSE file.

package migrate

import (
"crypto/md5"
"encoding/hex"
"io"
"os"
)

var encode = hex.EncodeToString

func checksum(b []byte) string {
v := md5.Sum(b)
return encode(v[:])
}

func fileChecksum(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", nil
}
defer f.Close()

h := md5.New()
if _, err := io.Copy(h, f); err != nil {
return "", err
}
v := h.Sum(nil)
return encode(v[:]), nil
}
17 changes: 17 additions & 0 deletions migrate/checksum_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (C) 2017 ScyllaDB
// Use of this source code is governed by a ALv2-style
// license that can be found in the LICENSE file.

package migrate

import "testing"

func TestFileChecksum(t *testing.T) {
c, err := fileChecksum("testdata/file")
if err != nil {
t.Fatal(err)
}
if c != "bbe02f946d5455d74616fc9777557c22" {
t.Fatal(c)
}
}
11 changes: 11 additions & 0 deletions migrate/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (C) 2017 ScyllaDB
// Use of this source code is governed by a ALv2-style
// license that can be found in the LICENSE file.

// Package migrate provides simple and flexible ScyllaDB and Apache Cassandra®
// migrations. Migrations can be read from a flat directory containing cql files.
// There is no imposed naming schema, migration name is file name and the
// migrations are processed in lexicographical order. Caller provides a
// gocql.Session, the session must use a desired keyspace as migrate would try
// to create migrations table.
package migrate
182 changes: 182 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright (C) 2017 ScyllaDB
// Use of this source code is governed by a ALv2-style
// license that can be found in the LICENSE file.

package migrate

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"time"

"github.com/gocql/gocql"
"github.com/scylladb/gocqlx"
"github.com/scylladb/gocqlx/qb"
)

var infoSchema = `CREATE TABLE IF NOT EXISTS gocqlx_migrations (
name text,
checksum text,
done tinyint,
start_time timestamp,
end_time timestamp,
PRIMARY KEY(name)
)`

var selectInfo = "SELECT * FROM gocqlx_migrations"

// Info contains information on migration applied on a database.
type Info struct {
Name string
Checksum string
Done int
StartTime time.Time
EndTime time.Time
}

// List provides a listing of applied migrations.
func List(ctx context.Context, session *gocql.Session) ([]*Info, error) {
if err := ensureInfoTable(ctx, session); err != nil {
return nil, err
}

var v []*Info
err := gocqlx.Select(&v, session.Query(selectInfo).WithContext(ctx))
if err == gocql.ErrNotFound {
return nil, nil
}

sort.Slice(v, func(i, j int) bool {
return v[i].Name < v[j].Name
})

return v, err
}

func ensureInfoTable(ctx context.Context, session *gocql.Session) error {
return gocqlx.Query(session.Query(infoSchema).WithContext(ctx), nil).ExecRelease()
}

// Migrate reads the cql files from a directory and applies required migrations.
func Migrate(ctx context.Context, session *gocql.Session, dir string) error {
// get database migrations
dbm, err := List(ctx, session)
if err != nil {
return fmt.Errorf("failed to list migrations: %s", err)
}

// get file migrations
fm, err := filepath.Glob(filepath.Join(dir, "*.cql"))
if err != nil {
return fmt.Errorf("failed to list migrations in %q: %s", dir, err)
}
sort.Strings(fm)

// verify migrations
if len(dbm) > len(fm) {
return fmt.Errorf("database is ahead of %q", dir)
}

for i := 0; i < len(dbm); i++ {
if dbm[i].Name != filepath.Base(fm[i]) {
fmt.Println(dbm[i].Name, filepath.Base(fm[i]), i)
return errors.New("inconsistent migrations")
}
}

for i := 0; i < len(dbm); i++ {
c, err := fileChecksum(fm[i])
if err != nil {
return fmt.Errorf("failed to calculate checksum for %q: %s", fm[i], err)
}
if dbm[i].Checksum != c {
return fmt.Errorf("file %q was tempered with, expected md5 %s", fm[i], dbm[i].Checksum)
}
}

// apply migrations
if len(dbm) > 0 {
last := len(dbm) - 1
if err := applyMigration(ctx, session, fm[last], dbm[last].Done); err != nil {
return fmt.Errorf("failed to apply migration %q: %s", fm[last], err)
}
}

for i := len(dbm); i < len(fm); i++ {
if err := applyMigration(ctx, session, fm[i], 0); err != nil {
return fmt.Errorf("failed to apply migration %q: %s", fm[i], err)
}
}

return nil
}

func applyMigration(ctx context.Context, session *gocql.Session, path string, done int) error {
f, err := os.Open(path)
if err != nil {
return err
}

b, err := ioutil.ReadAll(f)
f.Close()
if err != nil {
return err
}

info := Info{
Name: filepath.Base(path),
StartTime: time.Now(),
Checksum: checksum(b),
}

stmt, names := qb.Insert("gocqlx_migrations").Columns(
"name",
"checksum",
"done",
"start_time",
"end_time",
).ToCql()

iq := gocqlx.Query(session.Query(stmt).WithContext(ctx), names)
defer iq.Release()

i := 1
r := bytes.NewBuffer(b)
for {
stmt, err := r.ReadString(';')
if err == io.EOF {
break
}
if err != nil {
return err
}
if i <= done {
i++
continue
}

// execute
q := gocqlx.Query(session.Query(stmt).RetryPolicy(nil).WithContext(ctx), nil)
if err := q.ExecRelease(); err != nil {
return fmt.Errorf("statement %d failed: %s", i, err)
}

// update info
info.Done = i
info.EndTime = time.Now()
if err := iq.BindStruct(info).Exec(); err != nil {
return fmt.Errorf("migration statement %d failed: %s", i, err)
}

i++
}

return nil
}
1 change: 1 addition & 0 deletions migrate/testdata/file
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
file