@@ -14,7 +14,8 @@ import (
14
14
15
15
type TX struct {
16
16
Lock * sync.Mutex
17
- Engine engine.Engine
17
+ Cache engine.Engine // store active data, which will be written to engine after commit or deleted with rollback
18
+ Engine engine.Engine // store stable data.
18
19
State * TxState
19
20
}
20
21
@@ -35,24 +36,25 @@ func (tx *TX) Begin(readOnly bool) error {
35
36
return errors .Wrap (err , "scanActive" )
36
37
}
37
38
tx .State = & TxState {
38
- TxID : txId ,
39
- ReadOnly : readOnly ,
40
- ActiveTx : active ,
39
+ TxID : txId ,
40
+ ReadOnly : readOnly ,
41
+ ActiveTx : active ,
42
+ ActiveKeys : map [string ]struct {}{},
41
43
}
42
44
if ! readOnly {
43
45
activeKey , err := encodeTxActiveKey (txId )
44
46
if err != nil {
45
47
return errors .Wrap (err , fmt .Sprintf ("getTxActiveKey with %d" , txId ))
46
48
}
47
- tx .Engine .Set (activeKey , []byte {})
49
+ tx .Cache .Set (activeKey , []byte {})
48
50
}
49
51
return nil
50
52
}
51
53
52
54
func (tx * TX ) scanActive () (map [TXID ]struct {}, error ) {
53
55
start := internal .NewBound (TxActivePrefix , internal .Include )
54
56
end := internal .NewBound (TxActivePrefixEnd , internal .Exclude )
55
- iter := tx .Engine .Iter (start , end )
57
+ iter := tx .Cache .Iter (start , end )
56
58
ret := map [TXID ]struct {}{}
57
59
for iter .IsValid () {
58
60
key := iter .Key ()
@@ -86,7 +88,12 @@ func (tx *TX) Write(key string, value []byte) error {
86
88
}
87
89
start := internal .NewBound (startTxKey , internal .Include )
88
90
end := internal .NewBound (endTxKey , internal .Include )
89
- iter := tx .Engine .Iter (start , end )
91
+ cacheIter := tx .Cache .Iter (start , end )
92
+ engineIter := tx .Engine .Iter (start , end )
93
+ iter , err := internal .NewTwoMergeIterstor (cacheIter , engineIter )
94
+ if err != nil {
95
+ return errors .Wrap (err , "NewTwoMergeIterstor" )
96
+ }
90
97
for iter .IsValid () {
91
98
check_id , _ , err := decodeTxKey (iter .Key ())
92
99
if err != nil {
@@ -98,16 +105,12 @@ func (tx *TX) Write(key string, value []byte) error {
98
105
iter .Next ()
99
106
}
100
107
101
- tnWriteKey , err := encodeTxWriteKey (tx .State .TxID , key )
102
- if err != nil {
103
- return errors .Wrap (err , fmt .Sprintf ("encodeTxWriteKey with key: %s" , key ))
104
- }
105
- tx .Engine .Set (tnWriteKey , []byte {})
106
108
txKey , err := encodeTxKey (tx .State .TxID , key )
107
109
if err != nil {
108
110
return errors .Wrap (err , fmt .Sprintf ("encodeTxKey with key: %s" , key ))
109
111
}
110
- tx .Engine .Set (txKey , value )
112
+ tx .Cache .Set (txKey , value )
113
+ tx .State .ActiveKeys [txKey ] = struct {}{}
111
114
return nil
112
115
}
113
116
@@ -123,67 +126,38 @@ func (tx *TX) Commit() error {
123
126
if tx .State .ReadOnly {
124
127
return errors .Wrap (ErrorReadOnly , fmt .Sprintf ("tx with id %d is read only, not need commit" , tx .State .TxID ))
125
128
}
126
- startKey , err := encodeTxWriteKey (tx .State .TxID , "" )
127
- if err != nil {
128
- return errors .Wrap (err , fmt .Sprintf ("encodeTxWriteKey with id: %d" , tx .State .TxID ))
129
- }
130
- endKey := getPrefixEnd (startKey )
131
- start := internal .NewBound (startKey , internal .Include )
132
- end := internal .NewBound (endKey , internal .Exclude )
133
- iter := tx .Engine .Iter (start , end )
134
- removeKeys := []string {}
135
- for iter .IsValid () {
136
- removeKeys = append (removeKeys , iter .Key ())
137
- iter .Next ()
138
- }
139
- for _ , key := range removeKeys {
140
- tx .Engine .Delete (key )
129
+ for k := range tx .State .ActiveKeys {
130
+ if v , ok := tx .Cache .Get (k ); ok {
131
+ tx .Engine .Set (k , v )
132
+ tx .Cache .DeleteReal (k )
133
+ } else {
134
+ return ErrorTxWriteKeyNotAtCache
135
+ }
141
136
}
142
-
143
137
activeKey , err := encodeTxActiveKey (tx .State .TxID )
144
138
if err != nil {
145
139
return errors .Wrap (err , fmt .Sprintf ("getTxActiveKey with %d" , tx .State .TxID ))
146
140
}
147
- tx .Engine . Delete (activeKey )
141
+ tx .Cache . DeleteReal (activeKey )
148
142
return nil
149
143
}
150
144
151
145
func (tx * TX ) RollBack () error {
152
146
if tx .State .ReadOnly {
153
147
return errors .Wrap (ErrorReadOnly , fmt .Sprintf ("tx with id %d is read only, not need rollback" , tx .State .TxID ))
154
148
}
155
- startKey , err := encodeTxWriteKey (tx .State .TxID , "" )
156
- if err != nil {
157
- return errors .Wrap (err , fmt .Sprintf ("encodeTxWriteKey with id: %d" , tx .State .TxID ))
158
- }
159
- endKey := getPrefixEnd (startKey )
160
- start := internal .NewBound (startKey , internal .Include )
161
- end := internal .NewBound (endKey , internal .Exclude )
162
- iter := tx .Engine .Iter (start , end )
163
- removeKeys := []string {}
164
- for iter .IsValid () {
165
- removeKeys = append (removeKeys , iter .Key ())
166
- _ , origin_key , err := decodeTxWriteKey (iter .Key ())
167
- if err != nil {
168
- return errors .Wrap (err , fmt .Sprintf ("decodeTxWriteKey with key: %s" , iter .Key ()))
169
- }
170
- txKey , err := encodeTxKey (tx .State .TxID , origin_key )
171
- if err != nil {
172
- return errors .Wrap (err , fmt .Sprintf ("encodeTxKey with key: %s" , txKey ))
149
+ for k := range tx .State .ActiveKeys {
150
+ if _ , ok := tx .Cache .Get (k ); ok {
151
+ tx .Cache .DeleteReal (k )
152
+ } else {
153
+ return ErrorTxWriteKeyNotAtCache
173
154
}
174
- removeKeys = append (removeKeys , txKey )
175
- iter .Next ()
176
- }
177
-
178
- for _ , key := range removeKeys {
179
- tx .Engine .Delete (key )
180
155
}
181
-
182
156
activeKey , err := encodeTxActiveKey (tx .State .TxID )
183
157
if err != nil {
184
158
return errors .Wrap (err , fmt .Sprintf ("getTxActiveKey with %d" , tx .State .TxID ))
185
159
}
186
- tx .Engine . Delete (activeKey )
160
+ tx .Cache . DeleteReal (activeKey )
187
161
return nil
188
162
}
189
163
@@ -198,7 +172,12 @@ func (tx *TX) Get(key string) ([]byte, error) {
198
172
}
199
173
start := internal .NewBound (startTxKey , internal .Include )
200
174
end := internal .NewBound (endTxKey , internal .Include )
201
- iter := tx .Engine .Iter (start , end )
175
+ cacheIter := tx .Cache .Iter (start , end )
176
+ engineIter := tx .Engine .Iter (start , end )
177
+ iter , err := internal .NewTwoMergeIterstor (cacheIter , engineIter )
178
+ if err != nil {
179
+ return nil , errors .Wrap (err , "NewTwoMergeIterstor" )
180
+ }
202
181
for iter .IsValid () {
203
182
check_id , _ , err := decodeTxKey (iter .Key ())
204
183
if err != nil {
@@ -228,13 +207,18 @@ func (tx *TX) Iter(start, end string) (iface.Iterator, error) {
228
207
229
208
endEngineKey := internal .NewBound (endTxKey , internal .Include )
230
209
210
+ cacheIter := tx .Cache .Iter (startEngineKey , endEngineKey )
231
211
engineIter := tx .Engine .Iter (startEngineKey , endEngineKey )
212
+ iter , err := internal .NewTwoMergeIterstor (cacheIter , engineIter )
213
+ if err != nil {
214
+ return nil , errors .Wrap (err , "NewTwoMergeIterstor" )
215
+ }
232
216
233
217
ret := & TXIterator {
234
218
State : tx .State ,
235
219
Start : start ,
236
220
End : end ,
237
- EngineIterator : engineIter ,
221
+ EngineIterator : iter ,
238
222
}
239
223
err = ret .Next ()
240
224
if err != nil {
0 commit comments