Skip to content

Commit

Permalink
Merge pull request #217 from TileDB-Inc/improve-serialization
Browse files Browse the repository at this point in the history
Add serialization helper func, create copy of bytes to prevent gc
  • Loading branch information
snagles authored Apr 20, 2022
2 parents 5687906 + c0e7346 commit bba32b4
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 70 deletions.
23 changes: 2 additions & 21 deletions array_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,11 @@ type ArraySchema struct {

// MarshalJSON marshal arraySchema struct to json using tiledb
func (a *ArraySchema) MarshalJSON() ([]byte, error) {
clientSide := false // Currently this parameter is unused in libtiledb
buffer, err := SerializeArraySchema(a, TILEDB_JSON, clientSide)
bs, err := SerializeArraySchema(a, TILEDB_JSON, false)
if err != nil {
return nil, fmt.Errorf("Error marshaling json for array schema: %s", a.context.LastError())
}

bytes, err := buffer.Data()
if err != nil {
return nil, fmt.Errorf("Error marshaling json for array schema: %s", buffer.context.LastError())
}

// Create a full copy of the byte slice, as the Buffer object owns the memory.
size := len(bytes)
cpy := make([]byte, size)

copy(cpy, bytes)

// Check if the last character is a null byte, if so remove it from the slice
if cpy[size-1] == 0 {
cpy = cpy[:size-1]
}

runtime.KeepAlive(buffer)
return cpy, nil
return bs, nil
}

// Context exposes the internal TileDB context used to initialize the array schema
Expand Down
9 changes: 7 additions & 2 deletions array_schema_evolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,16 @@ func TestArraySchemaEvolution(t *testing.T) {
// Remove atrribute a1
require.NoError(t, arraySchemaEvolution.DropAttribute("a1"))

buffer, err := SerializeArraySchemaEvolution(arraySchemaEvolution,
bs, err := SerializeArraySchemaEvolution(arraySchemaEvolution,
TILEDB_CAPNP, true)
require.NoError(t, err)

newArraySchemaEvolution, err := DeserializeArraySchemaEvolution(buffer,
buff, err := NewBuffer(context)

err = buff.SetBuffer(bs)
require.NoError(t, err)

newArraySchemaEvolution, err := DeserializeArraySchemaEvolution(buff,
TILEDB_CAPNP, true)
require.NoError(t, err)

Expand Down
61 changes: 48 additions & 13 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tiledb
*/
import "C"
import (
"bytes"
"fmt"
"runtime"
"unsafe"
Expand Down Expand Up @@ -77,25 +78,36 @@ func (b *Buffer) Type() (Datatype, error) {
return Datatype(bufferType), nil
}

// Data returns a byte slice backed by the underlying C memory region of the buffer
func (b *Buffer) Data() ([]byte, error) {
var cbuffer unsafe.Pointer
var csize C.uint64_t
func (b *Buffer) Serialize(serializationType SerializationType) ([]byte, error) {
switch serializationType {
case TILEDB_CAPNP:
return b.asCapnp()
case TILEDB_JSON:
return b.asJSON()
default:
return nil, fmt.Errorf("unsupported serialization type: %v", serializationType)
}
}

ret := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &cbuffer, &csize)
if ret != C.TILEDB_OK {
return nil, fmt.Errorf("Error getting tiledb buffer data: %s", b.context.LastError())
func (b *Buffer) asJSON() ([]byte, error) {
bs, err := b.bytes()
if err != nil {
return nil, err
}
// cstrings are null terminated. Go's are not, remove it
return bytes.TrimSuffix(bs, []byte("\u0000")), nil
}

if cbuffer == nil {
return nil, nil
} else {
size := uint64(csize)
return (*[1 << 46]uint8)(unsafe.Pointer(cbuffer))[:size:size], nil
func (b *Buffer) asCapnp() ([]byte, error) {
// Create a full copy of the byte slice, as the Buffer object owns the memory.
bs, err := b.bytes()
if err != nil {
return nil, err
}
return bs, nil
}

// SetType sets the data pointer and size on the Buffer to the given slice
// SetBuffer sets the data pointer and size on the Buffer to the given slice
func (b *Buffer) SetBuffer(buffer []byte) error {
bufferSize := len(buffer)

Expand All @@ -106,3 +118,26 @@ func (b *Buffer) SetBuffer(buffer []byte) error {

return nil
}

// bytes returns a byte slice backed by the underlying C memory region of the buffer
func (b *Buffer) bytes() ([]byte, error) {
var cbuffer unsafe.Pointer
var csize C.uint64_t

ret := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &cbuffer, &csize)
if ret != C.TILEDB_OK {
return nil, fmt.Errorf("Error getting tiledb buffer data: %s", b.context.LastError())
}

if cbuffer == nil {
return nil, nil
}

size := uint64(csize)
bs := (*[1 << 46]uint8)(unsafe.Pointer(cbuffer))[:size:size]

cpy := make([]byte, len(bs))
copy(cpy, bs)
runtime.KeepAlive(b)
return cpy, nil
}
4 changes: 2 additions & 2 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func ExampleNewBuffer() {
}

// Get data slice
bytes, err := buffer.Data()
bytes, err := buffer.bytes()
if err != nil {
// Handle error
return
Expand All @@ -42,7 +42,7 @@ func TestNewBuffer(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, buffer)

bytes, err := buffer.Data()
bytes, err := buffer.bytes()
require.NoError(t, err)
assert.Nil(t, bytes)

Expand Down
18 changes: 4 additions & 14 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package tiledb

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -62,7 +61,7 @@ func (g *Group) Deserialize(buffer *Buffer, serializationType SerializationType,
cClientSide = 0
}

b, err := buffer.Data()
b, err := buffer.bytes()
if err != nil {
return errors.New("failed to retrieve bytes from buffer")
}
Expand Down Expand Up @@ -541,7 +540,7 @@ func (g *Group) Dump(recurse bool) (string, error) {
}

// SerializeGroupMetadata gets and serializes the group metadata
func SerializeGroupMetadata(g *Group, serializationType SerializationType) (*Buffer, error) {
func SerializeGroupMetadata(g *Group, serializationType SerializationType) ([]byte, error) {
buffer := Buffer{context: g.context}
// Set finalizer for free C pointer on gc
runtime.SetFinalizer(&buffer, func(buffer *Buffer) {
Expand All @@ -553,21 +552,12 @@ func SerializeGroupMetadata(g *Group, serializationType SerializationType) (*Buf
return nil, fmt.Errorf("Error serializing group metadata: %s", g.context.LastError())
}

b, err := buffer.Data()
if err != nil {
return nil, errors.New("failed to retrieve bytes from buffer")
}
// cstrings are null terminated. Go's are not, remove the suffix if it exists
if err := buffer.SetBuffer(bytes.TrimSuffix(b, []byte("\u0000"))); err != nil {
return nil, errors.New("failed to remove null terminator from buffer")
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeGroupMetadata deserializes group metadata
func DeserializeGroupMetadata(g *Group, buffer *Buffer, serializationType SerializationType) error {
b, err := buffer.Data()
b, err := buffer.bytes()
if err != nil {
return errors.New("failed to retrieve bytes from buffer")
}
Expand Down
8 changes: 4 additions & 4 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ func TestDenseQueryWrite(t *testing.T) {
bufferA5 := "hello" + "world"
offsetBufferA5 := []uint64{0, 5}
// Second string array so we can compare reads
bufferA5Comparison := make([]byte, len(bufferA5)) //new(string, len(bufferA5))
bufferA5Comparison := make([]byte, len(bufferA5)) // new(string, len(bufferA5))
elementsCopied = copy(bufferA5Comparison, bufferA5)
assert.Equal(t, len(bufferA5), elementsCopied)
assert.EqualValues(t, bufferA5, bufferA5Comparison)
Expand Down Expand Up @@ -1469,7 +1469,7 @@ func TestDenseQueryWrite(t *testing.T) {
_, _, err = query.SetBufferVar("a4", readOffsetBufferA4, readBufferA4)
require.NoError(t, err)

readBufferA5 := make([]byte, 10) //make(string, 10)
readBufferA5 := make([]byte, 10) // make(string, 10)
readOffsetBufferA5 := make([]uint64, 2)
_, _, err = query.SetBufferVar("a5", readOffsetBufferA5, readBufferA5)
require.NoError(t, err)
Expand Down Expand Up @@ -1651,8 +1651,8 @@ func TestSparseQueryWrite(t *testing.T) {
assert.NotNil(t, query)

// Set read subarray to only data that was written
//err = query.SetSubArray(subArray)
//require.NoError(t, err)
// err = query.SetSubArray(subArray)
// require.NoError(t, err)

// Set coordinates, since test is 1d, this is subarray
_, err = query.SetBuffer("dim1", subArray)
Expand Down
24 changes: 12 additions & 12 deletions serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// SerializeArraySchema serializes an array schema
func SerializeArraySchema(schema *ArraySchema, serializationType SerializationType, clientSide bool) (*Buffer, error) {
func SerializeArraySchema(schema *ArraySchema, serializationType SerializationType, clientSide bool) ([]byte, error) {
var cClientSide C.int32_t
if clientSide {
cClientSide = 1
Expand All @@ -37,7 +37,7 @@ func SerializeArraySchema(schema *ArraySchema, serializationType SerializationTy
return nil, fmt.Errorf("Error serializing array schema: %s", schema.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeArraySchema deserializes a new array schema from the given buffer
Expand Down Expand Up @@ -68,7 +68,7 @@ func DeserializeArraySchema(buffer *Buffer, serializationType SerializationType,
}

// SerializeArrayNonEmptyDomain gets and serializes the array nonempty domain
func SerializeArrayNonEmptyDomain(a *Array, serializationType SerializationType) (*Buffer, error) {
func SerializeArrayNonEmptyDomain(a *Array, serializationType SerializationType) ([]byte, error) {
schema, err := a.Schema()
if err != nil {
return nil, err
Expand Down Expand Up @@ -106,7 +106,7 @@ func SerializeArrayNonEmptyDomain(a *Array, serializationType SerializationType)
return nil, fmt.Errorf("Error serializing array nonempty domain: %s", a.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeArrayNonEmptyDomain deserializes an array nonempty domain
Expand Down Expand Up @@ -217,7 +217,7 @@ func DeserializeArrayNonEmptyDomain(a *Array, buffer *Buffer, serializationType
}

// SerializeArrayNonEmptyDomainAllDimensions gets and serializes the array nonempty domain
func SerializeArrayNonEmptyDomainAllDimensions(a *Array, serializationType SerializationType) (*Buffer, error) {
func SerializeArrayNonEmptyDomainAllDimensions(a *Array, serializationType SerializationType) ([]byte, error) {

buffer := Buffer{context: a.context}
// Set finalizer for free C pointer on gc
Expand All @@ -231,7 +231,7 @@ func SerializeArrayNonEmptyDomainAllDimensions(a *Array, serializationType Seria
return nil, fmt.Errorf("Error serializing array nonempty domain: %s", a.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeArrayNonEmptyDomainAllDimensions deserializes an array nonempty domain
Expand All @@ -247,7 +247,7 @@ func DeserializeArrayNonEmptyDomainAllDimensions(a *Array, buffer *Buffer, seria
}

// SerializeArrayMaxBufferSizes gets and serializes the array max buffer sizes for the given subarray
func SerializeArrayMaxBufferSizes(a *Array, subarray interface{}, serializationType SerializationType) (*Buffer, error) {
func SerializeArrayMaxBufferSizes(a *Array, subarray interface{}, serializationType SerializationType) ([]byte, error) {
// Create subarray void*
var cSubarray unsafe.Pointer
if reflect.TypeOf(subarray).Kind() != reflect.Slice {
Expand Down Expand Up @@ -306,7 +306,7 @@ func SerializeArrayMaxBufferSizes(a *Array, subarray interface{}, serializationT
return nil, fmt.Errorf("error serializing array max buffer sizes: %s", a.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// SerializeQuery serializes a query
Expand Down Expand Up @@ -350,7 +350,7 @@ func DeserializeQuery(query *Query, buffer *Buffer, serializationType Serializat
}

// SerializeArrayMetadata gets and serializes the array metadata
func SerializeArrayMetadata(a *Array, serializationType SerializationType) (*Buffer, error) {
func SerializeArrayMetadata(a *Array, serializationType SerializationType) ([]byte, error) {
buffer := Buffer{context: a.context}
// Set finalizer for free C pointer on gc
runtime.SetFinalizer(&buffer, func(buffer *Buffer) {
Expand All @@ -362,7 +362,7 @@ func SerializeArrayMetadata(a *Array, serializationType SerializationType) (*Buf
return nil, fmt.Errorf("Error serializing array metadata: %s", a.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeArrayMetadata deserializes array metadata
Expand All @@ -375,7 +375,7 @@ func DeserializeArrayMetadata(a *Array, buffer *Buffer, serializationType Serial
}

// SerializeQueryEstResultSizes gets and serializes the query estimated result sizes
func SerializeQueryEstResultSizes(q *Query, serializationType SerializationType, clientSide bool) (*Buffer, error) {
func SerializeQueryEstResultSizes(q *Query, serializationType SerializationType, clientSide bool) ([]byte, error) {
var cClientSide C.int32_t
if clientSide {
cClientSide = 1
Expand All @@ -394,7 +394,7 @@ func SerializeQueryEstResultSizes(q *Query, serializationType SerializationType,
return nil, fmt.Errorf("Error serializing query est buffer sizes: %s", q.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeQueryEstResultSizes deserializes query estimated result sizes
Expand Down
4 changes: 2 additions & 2 deletions serialize_array_schema_evolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// SerializeArraySchemaEvolution serializes the given array schema evolution
func SerializeArraySchemaEvolution(arraySchemaEvolution *ArraySchemaEvolution, serializationType SerializationType, clientSide bool) (*Buffer, error) {
func SerializeArraySchemaEvolution(arraySchemaEvolution *ArraySchemaEvolution, serializationType SerializationType, clientSide bool) ([]byte, error) {
var cClientSide C.int32_t
if clientSide {
cClientSide = 1
Expand All @@ -46,7 +46,7 @@ func SerializeArraySchemaEvolution(arraySchemaEvolution *ArraySchemaEvolution, s
arraySchemaEvolution.context.LastError())
}

return &buffer, nil
return buffer.Serialize(serializationType)
}

// DeserializeArraySchemaEvolution deserializes a new array schema evolution object from the given buffer
Expand Down

0 comments on commit bba32b4

Please sign in to comment.