This repository was archived by the owner on May 14, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog.js
105 lines (95 loc) · 2.69 KB
/
log.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// SPDX-FileCopyrightText: 2021 Anders Rune Jensen
//
// SPDX-License-Identifier: LGPL-3.0-only
const OffsetLog = require('async-append-only-log')
const bipf = require('bipf')
const TooHot = require('too-hot')
const { BLOCK_SIZE, newLogPath, tooHotOpts } = require('./defaults')
module.exports = function (dir, config, privateIndex) {
config = config || {}
config.db2 = config.db2 || {}
const log = OffsetLog(newLogPath(dir), {
blockSize: BLOCK_SIZE,
validateRecord: (d) => {
try {
bipf.decode(d, 0)
return true
} catch (ex) {
return false
}
},
})
log.add = function (key, value, cb) {
const kvt = {
key,
value,
timestamp: Date.now(),
}
const buf = Buffer.alloc(bipf.encodingLength(kvt))
bipf.encode(kvt, buf, 0)
log.append(buf, (err) => {
if (err) cb(err)
else cb(null, kvt)
})
}
log.addTransaction = function (keys, values, cb) {
let buffers = []
let kvts = []
for (let i = 0; i < keys.length; ++i) {
const kvt = {
key: keys[i],
value: values[i],
timestamp: Date.now(),
}
const buf = Buffer.alloc(bipf.encodingLength(kvt))
bipf.encode(kvt, buf, 0)
buffers.push(buf)
kvts.push(kvt)
}
log.appendTransaction(buffers, (err) => {
if (err) cb(err)
else cb(null, kvts)
})
}
// monkey-patch log.get to decrypt the msg
const originalGet = log.get
log.get = function (offset, cb) {
originalGet(offset, (err, buffer) => {
if (err) return cb(err)
else {
const record = { offset, value: buffer }
cb(null, privateIndex.decrypt(record, false).value)
}
})
}
// monkey-patch log.stream to temporarily pause when the CPU is too busy,
// and to decrypt the msg
const originalStream = log.stream
log.stream = function (opts) {
const shouldDecrypt = opts.decrypt === false ? false : true
const tooHot = config.db2.maxCpu ? TooHot(tooHotOpts(config)) : () => false
const s = originalStream(opts)
const originalPipe = s.pipe.bind(s)
s.pipe = function pipe(o) {
let originalWrite = o.write.bind(o)
o.write = (record) => {
const hot = tooHot()
if (hot && !s.sink.paused) {
s.sink.paused = true
hot.then(() => {
if (shouldDecrypt) originalWrite(privateIndex.decrypt(record, true))
else originalWrite(record)
s.sink.paused = false
s.resume()
})
} else {
if (shouldDecrypt) originalWrite(privateIndex.decrypt(record, true))
else originalWrite(record)
}
}
return originalPipe(o)
}
return s
}
return log
}