-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathsnapshot.go
182 lines (150 loc) · 5.54 KB
/
snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package graviton
import "fmt"
import "encoding/binary"
// Snapshot are used to access any arbitrary snapshot of entire database at any point in time
// snapshot refers to collective state of all trees + data (key-values) + history
// each commit ( tree.Commit() or Commit(tree1, tree2 .....)) creates a new snapshot
// each snapshot is represented by an incrementing uint64 number, 0 represents most recent snapshot.
// TODO we may need to provide API to export DB at specific snapshot version
type Snapshot struct {
store *Store
version uint64
findex, fpos uint32
vroot *inner
}
// Load a specific snapshot from the store, 0th version = load most recent version as a special case
// note: 0th tree is not stored in disk
// also note that commits are being done so versions might be change
func (store *Store) LoadSnapshot(version uint64) (*Snapshot, error) {
var err error
_, highest_version, findex, fpos, err := store.findhighestsnapshotinram() // only latest version can be reached from the table
if err != nil {
return nil, err
}
if version > highest_version {
return nil, fmt.Errorf("Database highest version: %d you requested %d.Not Possible!!", highest_version, version)
}
if version <= 0 || version == highest_version { // user requested most recent version
if findex == 0 && fpos == 0 { // if storage is newly create, lets build up a new version root
return &Snapshot{store: store, version: highest_version, findex: uint32(findex), fpos: uint32(fpos), vroot: newInner(0)}, nil
} else {
if _, vroot, err := store.loadrootusingpos(findex, fpos); err != nil {
return nil, err
} else {
return &Snapshot{store: store, version: highest_version, findex: uint32(findex), fpos: uint32(fpos), vroot: vroot}, nil
}
}
}
// user requested an arbitrary version between 1 and highest_version -1
if findex, fpos, err = store.ReadVersionData(version); err != nil {
return nil, err
}
_, vroot, err := store.loadrootusingpos(findex, fpos)
if err != nil {
return nil, err
}
return &Snapshot{store: store, version: version, findex: findex, fpos: fpos, vroot: vroot}, nil
}
func (store *Store) loadrootusingpos(findex, fpos uint32) (string, *inner, error) {
var buf [512]byte
bytes_count, err := store.read(findex, fpos, buf[:])
if bytes_count >= 3 {
tmp := &inner{hash: make([]byte, 0, HASHSIZE)}
err := tmp.Unmarshal(buf[:bytes_count])
if err != nil {
return "", nil, err
} else {
tmp.findex, tmp.fpos = findex, fpos
return string(tmp.bucket_name), tmp, nil
}
}
return "", nil, err
}
// load tree using the specfic global version
func (s *Snapshot) loadTree(key []byte) (tree *Tree, err error) {
var bname string
var root *inner
var position []byte
if position, err = s.vroot.Get(s.store, sum(key)); err == nil { // underscore is first character
if bname, root, err = s.store.loadrootusingpos(decode(position)); err == nil {
tree = &Tree{store: s.store, root: root, treename: bname, snapshot_version: s.version}
tree.Hash()
}
}
return tree, err
}
// Load a versioned tree from the store all trees have there own version number
func (s *Snapshot) GetTreeWithVersion(treename string, version uint64) (*Tree, error) {
var buf = [512]byte{':'}
if err := check_tree_name(treename); err != nil {
return nil, err
}
if version == 0 {
return &Tree{root: newInner(0), treename: treename, store: s.store, snapshot_version: s.version}, nil
}
done := 1
done += copy(buf[done:], []byte(treename))
done += binary.PutUvarint(buf[done:], version)
return s.loadTree(buf[:done])
}
// Gets the snapshot version number
func (s *Snapshot) GetVersion() uint64 {
return s.version
}
// Gets highest stored version number of the specific tree
func (s *Snapshot) GetTreeHighestVersion(treename string) (uint64, error) {
var buf = [512]byte{':'}
if err := check_tree_name(treename); err != nil {
return 0, err
}
done := 1
done += copy(buf[done:], []byte(treename))
vversion, err := s.vroot.Get(s.store, sum(buf[:done]))
if err != nil { // return no found
return 0, nil // fmt.Errorf("version is not stored")
}
version, versionsize := binary.Uvarint(vversion)
if versionsize <= 0 {
return 0, fmt.Errorf("version could not be decoded probably data corruption")
}
return version, nil
}
// Gets most recent tree committed to the store
func (s *Snapshot) GetTree(treename string) (*Tree, error) {
if version, err := s.GetTreeHighestVersion(treename); err != nil {
return nil, err
} else {
return s.GetTreeWithVersion(treename, version)
}
}
// Gets the tree which has specific roothash
func (s *Snapshot) GetTreeWithRootHash(roothash []byte) (*Tree, error) {
return s.loadTree(roothash)
}
// Gets the tree which has specific tag
// NOTE: same tags might point to different trees in different snapshots of db
func (s *Snapshot) GetTreeWithTag(tag string) (*Tree, error) {
return s.loadTree([]byte(tag))
}
func check_tree_name(bucket string) error {
if len(bucket) > TREE_NAME_LIMIT {
return fmt.Errorf("Bucket name is too big than allowed limit of 127 bytes")
}
if len(bucket) >= 1 && bucket[0] == ':' {
return fmt.Errorf("Bucket cannot start with ':'")
}
return nil
}
// store highest version of tree
func (s *Snapshot) putTreeHighestVersion(treename string, version uint64) error {
var buf = [512]byte{':'}
var value [12]byte
if err := check_tree_name(treename); err != nil {
return err
}
done := 1
done += copy(buf[done:], []byte(treename))
valuesize := binary.PutUvarint(value[:], version)
leaf := newLeaf(sum(buf[:done]), buf[:done], value[:valuesize])
return s.vroot.Insert(s.store, leaf)
}