-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgkv.go
144 lines (111 loc) · 3.04 KB
/
gkv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gkv
import (
"bytes"
"database/sql"
"encoding/binary"
"fmt"
)
// KeyValueStore is the storage contract of this package: string keys mapped
// to either string values or unsigned 64-bit integers. *SQLKeyValueStore in
// this file provides all six methods.
type KeyValueStore interface {
// Get looks up the string value stored under key. The SQL implementation in
// this file reports a missing key as "" with a nil error.
Get(key string) (string, error)
// Set stores value under key. The SQL implementation in this file overwrites
// an existing value (INSERT ... ON DUPLICATE KEY UPDATE).
Set(key string, value string) error
// GetUInt64 looks up the unsigned 64-bit integer stored under key. The SQL
// implementation in this file reports a missing key as 0 with a nil error.
GetUInt64(key string) (uint64, error)
// SetUInt64 stores value under key. The SQL implementation in this file
// writes it as 8 little-endian bytes.
SetUInt64(key string, value uint64) error
// Del removes the entry stored under key.
Del(key string) error
// Close releases what the store holds; the SQL implementation in this file
// closes its prepared statements.
Close() error
}
// SQLKeyValueStoreConfig names the table and columns a SQLKeyValueStore reads
// and writes, and carries the size limits checked by Set. The yaml struct
// tags give the key associated with each field.
//
// TableName, KeyColumn and ValueColumn are formatted directly into the SQL
// text by NewMySQLKeyValueStore (wrapped in backticks, not escaped), so they
// must come from a trusted source.
type SQLKeyValueStoreConfig struct {
// database table details
TableName string `yaml:"table_name"`
KeyColumn string `yaml:"key_column"`
ValueColumn string `yaml:"value_column"`
// limits: Set rejects a key or value whose len() (a byte count) exceeds the
// corresponding limit
MaxKeyLen int `yaml:"max_key_len"`
MaxValueLen int `yaml:"max_value_len"`
}
// SQLKeyValueStore is a key-value store kept in one SQL table and accessed
// through three prepared statements. Its pointer type provides every method of
// KeyValueStore. NewMySQLKeyValueStore is the constructor in this file.
type SQLKeyValueStore struct {
// config is the configuration the store was constructed with.
config *SQLKeyValueStoreConfig
// maxKeyLen and maxValueLen are copied from config by the constructor; Set
// compares them against len(key) and len(value).
maxKeyLen int
maxValueLen int
// Prepared statements: select a value by key, insert-or-update a pair, and
// delete a row by key.
stmtGet *sql.Stmt
stmtSet *sql.Stmt
stmtDel *sql.Stmt
}
// closable is satisfied by any value that has a Close method returning an
// error.
// NOTE(review): nothing in the visible part of this file refers to this type;
// confirm whether it is still needed.
type closable interface {
Close() error
}
// NewMySQLKeyValueStore builds a SQLKeyValueStore on top of connection by
// preparing the three statements the store uses against the table and columns
// named in config:
//
//   - a SELECT of the value column that matches the key with "= BINARY ?",
//   - an INSERT ... ON DUPLICATE KEY UPDATE that creates or overwrites a pair,
//   - a DELETE by key.
//
// config.MaxKeyLen and config.MaxValueLen are copied into the store. The table
// and column names are formatted straight into the SQL text (wrapped in
// backticks, not escaped), so they must not come from untrusted input.
//
// An error is returned when connection or config is nil, or when a statement
// fails to prepare. In the latter case every statement prepared earlier in the
// call is closed before returning, and the error from Prepare is returned
// unchanged.
//
// NOTE(review): only the SELECT uses a BINARY comparison; the DELETE compares
// the key without it. Confirm that this asymmetry is intended.
func NewMySQLKeyValueStore(connection *sql.DB, config *SQLKeyValueStoreConfig) (*SQLKeyValueStore, error) {
	if connection == nil {
		return nil, fmt.Errorf("gkv: database connection is nil")
	}
	if config == nil {
		return nil, fmt.Errorf("gkv: store config is nil")
	}

	keyValueStore := &SQLKeyValueStore{
		config:      config,
		maxKeyLen:   config.MaxKeyLen,
		maxValueLen: config.MaxValueLen,
	}

	// Each query is paired with the field it fills so that a failure part way
	// through can release whatever was already prepared.
	statements := []struct {
		target **sql.Stmt
		query  string
	}{
		{&keyValueStore.stmtGet, fmt.Sprintf("SELECT `%s` FROM `%s` WHERE `%s` = BINARY ?", config.ValueColumn, config.TableName, config.KeyColumn)},
		{&keyValueStore.stmtSet, fmt.Sprintf("INSERT INTO `%s`(`%s`, `%s`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `%s` = VALUES(`%s`)", config.TableName, config.KeyColumn, config.ValueColumn, config.ValueColumn, config.ValueColumn)},
		{&keyValueStore.stmtDel, fmt.Sprintf("DELETE FROM `%s` WHERE `%s` = ?", config.TableName, config.KeyColumn)},
	}

	for i, st := range statements {
		prepared, err := connection.Prepare(st.query)
		if err != nil {
			for _, done := range statements[:i] {
				// Best-effort cleanup: the Prepare failure is the error the
				// caller needs to see, so a Close error is dropped here.
				_ = (*done.target).Close()
			}
			return nil, err
		}
		*st.target = prepared
	}

	return keyValueStore, nil
}
// Get runs the prepared SELECT for key and returns the value column as a
// string.
//
// When the query yields no row, Get returns "" with a nil error, so a missing
// key cannot be told apart from a stored empty string. Any other error from
// the query or from scanning the row is returned unchanged together with "".
func (s *SQLKeyValueStore) Get(key string) (string, error) {
	var stored string
	switch err := s.stmtGet.QueryRow(key).Scan(&stored); err {
	case nil:
		return stored, nil
	case sql.ErrNoRows:
		// A missing key is deliberately not an error.
		return "", nil
	default:
		return "", err
	}
}
// Set stores value under key by executing the prepared insert-or-update
// statement.
//
// Before touching the database it compares the byte lengths of key and value
// with the store's limits. The key is checked first; an over-long key yields
// the error "Key is too long" and an over-long value "Value is too long", and
// in both cases nothing is executed. Otherwise the error from Exec, if any, is
// returned unchanged.
func (s *SQLKeyValueStore) Set(key string, value string) error {
	switch {
	case s.maxKeyLen < len(key):
		return fmt.Errorf("Key is too long")
	case s.maxValueLen < len(value):
		return fmt.Errorf("Value is too long")
	}

	_, execErr := s.stmtSet.Exec(key, value)
	return execErr
}
// GetUInt64 runs the prepared SELECT for key and decodes the value column as
// a little-endian unsigned 64-bit integer (the layout SetUInt64 writes).
//
// When the query yields no row it returns 0 with a nil error, so a missing key
// cannot be told apart from a stored zero. Errors from the query or the scan
// are returned unchanged. The first 8 bytes of the column are decoded; if
// fewer than 8 bytes are available the error from binary.Read is returned.
func (s *SQLKeyValueStore) GetUInt64(key string) (uint64, error) {
	// Scan replaces the slice with the column's bytes.
	var raw []byte
	if err := s.stmtGet.QueryRow(key).Scan(&raw); err != nil {
		if err == sql.ErrNoRows {
			// A missing key is deliberately not an error.
			return 0, nil
		}
		return 0, err
	}

	var decoded uint64
	if err := binary.Read(bytes.NewReader(raw), binary.LittleEndian, &decoded); err != nil {
		return 0, err
	}
	return decoded, nil
}
// SetUInt64 stores value under key as 8 little-endian bytes by executing the
// prepared insert-or-update statement. GetUInt64 decodes the same layout.
//
// The key is held to the same byte-length limit that Set enforces: a key
// longer than the store's maximum key length is rejected with the error
// "Key is too long" and nothing is executed. Otherwise the error from Exec,
// if any, is returned unchanged.
func (s *SQLKeyValueStore) SetUInt64(key string, value uint64) error {
	// Without this check an over-long key could reach the database through
	// SetUInt64 even though Set refuses it.
	if len(key) > s.maxKeyLen {
		return fmt.Errorf("Key is too long")
	}

	data := make([]byte, 8)
	binary.LittleEndian.PutUint64(data, value)

	_, err := s.stmtSet.Exec(key, data)
	return err
}
// Del executes the prepared DELETE with key as its only argument and returns
// the error from Exec, if any. The statement's result is discarded, so Del
// does not report whether a row was actually removed.
func (s *SQLKeyValueStore) Del(key string) error {
	if _, err := s.stmtDel.Exec(key); err != nil {
		return err
	}
	return nil
}
// Close closes the store's prepared statements in the order set, get, delete.
//
// Every statement is closed even when an earlier Close fails, so one error
// does not leave the remaining statements open; the first error encountered
// is the one returned. Statements that are nil (for example on a zero-value
// store) are skipped. Close does not close the underlying *sql.DB.
func (s *SQLKeyValueStore) Close() error {
	var firstErr error
	for _, stmt := range []*sql.Stmt{s.stmtSet, s.stmtGet, s.stmtDel} {
		if stmt == nil {
			continue
		}
		if err := stmt.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}