Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeOperator proposal #70

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions RocksDbSharp/ColumnFamilyOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,16 +298,16 @@ private static MergeOperator GetMergeOperatorFromPtr(IntPtr getMergeOperatorPtr)
return getMergeOperator();
}

private unsafe static IntPtr MergeOperator_PartialMerge(IntPtr state, IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength)
private unsafe static IntPtr MergeOperator_PartialMerge(IntPtr state, IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength)
{
var mergeOperator = GetMergeOperatorFromPtr((*((MergeOperatorState*)state)).GetMergeOperatorPtr);
return mergeOperator.PartialMerge(key, keyLength, operandsList, operandsListLength, numOperands, success, newValueLength);
return mergeOperator.PartialMerge(key, keyLength, operandsList, operandsListLength, numOperands, out success, out newValueLength);
}

private unsafe static IntPtr MergeOperator_FullMerge(IntPtr state, IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength)
private unsafe static IntPtr MergeOperator_FullMerge(IntPtr state, IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength)
{
var mergeOperator = GetMergeOperatorFromPtr((*((MergeOperatorState*)state)).GetMergeOperatorPtr);
return mergeOperator.FullMerge(key, keyLength, existingValue, existingValueLength, operandsList, operandsListLength, numOperands, success, newValueLength);
return mergeOperator.FullMerge(key, keyLength, existingValue, existingValueLength, operandsList, operandsListLength, numOperands, out success, out newValueLength);
}

private unsafe static void MergeOperator_DeleteValue(IntPtr state, IntPtr value, UIntPtr valueLength)
Expand Down
108 changes: 79 additions & 29 deletions RocksDbSharp/MergeOperator.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Text;

namespace RocksDbSharp
{
public interface MergeOperator
{
string Name { get; }
IntPtr PartialMerge(IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength);
IntPtr FullMerge(IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength);
IntPtr PartialMerge(IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength);
IntPtr FullMerge(IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength);
void DeleteValue(IntPtr value, UIntPtr valueLength);
}

Expand All @@ -22,63 +23,112 @@ public static class MergeOperators
/// This is called to combine two-merge operands (if possible)
/// </summary>
/// <param name="key">The key that's associated with this merge operation</param>
/// <param name="keyLength"></param>
/// <param name="operandsList">the sequence of merge operations to apply, front() first</param>
/// <param name="operandsListLength"></param>
/// <param name="numOperands"></param>
/// <param name="operands">the sequence of merge operations to apply, front() first</param>
/// <param name="success">Client is responsible for filling the merge result here</param>
/// <param name="newValueLength"></param>
/// <returns></returns>
public delegate IntPtr PartialMergeFunc(IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength);
public delegate byte[] PartialMergeFunc(byte[] key, byte[][] operands, out bool success);

/// <summary>
/// Gives the client a way to express the read -> modify -> write semantics.
/// Called when a Put/Delete is the *existing_value (or nullptr)
/// </summary>
/// <param name="key">The key that's associated with this merge operation.</param>
/// <param name="keyLength"></param>
/// <param name="existingValue">null indicates that the key does not exist before this op</param>
/// <param name="existingValueLength"></param>
/// <param name="operandsList">the sequence of merge operations to apply, front() first.</param>
/// <param name="operandsListLength"></param>
/// <param name="numOperands"></param>
/// <param name="operands">the sequence of merge operations to apply, front() first.</param>
/// <param name="success">Client is responsible for filling the merge result here</param>
/// <param name="newValueLength"></param>
/// <returns></returns>
public delegate IntPtr FullMergeFunc(IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength);
public delegate void DeleteValueFunc(IntPtr value, UIntPtr valueLength);
public delegate byte[] FullMergeFunc(byte[] key, byte[] existingValue, byte[][] operands, out bool success);

public static MergeOperator Create(
string name,
PartialMergeFunc partialMerge,
FullMergeFunc fullMerge,
DeleteValueFunc deleteValue)
FullMergeFunc fullMerge)
{
return new MergeOperatorImpl(name, partialMerge, fullMerge, deleteValue);
return new MergeOperatorImpl(name, partialMerge, fullMerge);
}

private class MergeOperatorImpl : MergeOperator
{
public string Name { get; }
private PartialMergeFunc PartialMerge { get; }
private FullMergeFunc FullMerge { get; }
private DeleteValueFunc DeleteValue { get; }

public MergeOperatorImpl(string name, PartialMergeFunc partialMerge, FullMergeFunc fullMerge, DeleteValueFunc deleteValue)
public MergeOperatorImpl(string name, PartialMergeFunc partialMerge, FullMergeFunc fullMerge)
{
Name = name;
PartialMerge = partialMerge;
FullMerge = fullMerge;
DeleteValue = deleteValue;
}

IntPtr MergeOperator.PartialMerge(IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength)
=> PartialMerge(key, keyLength, operandsList, operandsListLength, numOperands, success, newValueLength);
IntPtr MergeOperator.PartialMerge(IntPtr key, UIntPtr keyLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength)
{
var _key = new byte[(uint)keyLength];
Marshal.Copy(key, _key, 0, _key.Length);

var _operandsList = new IntPtr[numOperands];
Marshal.Copy(operandsList, _operandsList, 0, _operandsList.Length);

var _operandsListLength = new long[numOperands];
Marshal.Copy(operandsListLength, _operandsListLength, 0, _operandsListLength.Length);

var operands = new byte[numOperands][];
for (int i = 0; i < numOperands; i++)
{
var operand = new byte[_operandsListLength[i]];
Marshal.Copy(_operandsList[i], operand, 0, operand.Length);
operands[i] = operand;
}

var value = PartialMerge(_key, operands, out var _success);

IntPtr MergeOperator.FullMerge(IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, IntPtr success, IntPtr newValueLength)
=> FullMerge(key, keyLength, existingValue, existingValueLength, operandsList, operandsListLength, numOperands, success, newValueLength);
var ret = Marshal.AllocHGlobal(value.Length);
Marshal.Copy(value, 0, ret, value.Length);
newValueLength = (IntPtr)value.Length;

success = (IntPtr)Convert.ToInt32(_success);

return ret;
}

IntPtr MergeOperator.FullMerge(IntPtr key, UIntPtr keyLength, IntPtr existingValue, UIntPtr existingValueLength, IntPtr operandsList, IntPtr operandsListLength, int numOperands, out IntPtr success, out IntPtr newValueLength)
{
var _key = new byte[(uint)keyLength];
Marshal.Copy(key, _key, 0, _key.Length);

byte[] _existingValue = null;
if (existingValue != IntPtr.Zero)
{
_existingValue = new byte[(uint)existingValueLength];
Marshal.Copy(existingValue, _existingValue, 0, _existingValue.Length);
}

var _operandsList = new IntPtr[numOperands];
Marshal.Copy(operandsList, _operandsList, 0, _operandsList.Length);

var _operandsListLength = new long[numOperands];
Marshal.Copy(operandsListLength, _operandsListLength, 0, _operandsListLength.Length);

var operands = new byte[numOperands][];

for (int i = 0; i < numOperands; i++)
{
var operand = new byte[_operandsListLength[i]];
Marshal.Copy(_operandsList[i], operand, 0, operand.Length);
operands[i] = operand;
}

var value = FullMerge(_key, _existingValue, operands, out var _success);

var ret = Marshal.AllocHGlobal(value.Length);
Marshal.Copy(value, 0, ret, value.Length);
newValueLength = (IntPtr)value.Length;

success = (IntPtr)Convert.ToInt32(_success);

return ret;
}

void MergeOperator.DeleteValue(IntPtr value, UIntPtr valueLength)
=> DeleteValue(value, valueLength);
void MergeOperator.DeleteValue(IntPtr value, UIntPtr valueLength) => Marshal.FreeHGlobal(value);
}
}
}
}
4 changes: 2 additions & 2 deletions RocksDbSharp/Native.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ namespace RocksDbSharp
public delegate char_ptr CreateFilterDelegate(void_ptr p0, const_char_ptr_const_ptr key_array, const_size_t_ptr key_length_array, int num_keys, size_t_ptr filter_length);
public delegate char KeyMayMatchDelegate(void_ptr p0, const_char_ptr key, size_t length, const_char_ptr filter, size_t filter_length);
public delegate void DeleteFilterDelegate(void_ptr p0, const_char_ptr filter, size_t filter_length);
public delegate char_ptr FullMergeDelegate(void_ptr p0, const_char_ptr key, size_t key_length, const_char_ptr existing_value, size_t existing_value_length, const_char_ptr_const_ptr operands_list, const_size_t_ptr operands_list_length, int num_operands, unsigned_char_ptr success, size_t_ptr new_value_length);
public delegate char_ptr PartialMergeDelegate(void_ptr p0, const_char_ptr key, size_t key_length, const_char_ptr_const_ptr operands_list, const_size_t_ptr operands_list_length, int num_operands, unsigned_char_ptr success, size_t_ptr new_value_length);
public delegate char_ptr FullMergeDelegate(void_ptr p0, const_char_ptr key, size_t key_length, const_char_ptr existing_value, size_t existing_value_length, const_char_ptr_const_ptr operands_list, const_size_t_ptr operands_list_length, int num_operands, out unsigned_char_ptr success, out size_t_ptr new_value_length);
public delegate char_ptr PartialMergeDelegate(void_ptr p0, const_char_ptr key, size_t key_length, const_char_ptr_const_ptr operands_list, const_size_t_ptr operands_list_length, int num_operands, out unsigned_char_ptr success, out size_t_ptr new_value_length);
public delegate void DeleteValueDelegate(void_ptr p0, const_char_ptr value, size_t value_length);
public delegate char_ptr TransformDelegate(void_ptr p0, const_char_ptr key, size_t length, size_t_ptr dst_length);
public delegate char InDomainDelegate(void_ptr p0, const_char_ptr key, size_t length);
Expand Down
19 changes: 16 additions & 3 deletions tests/RocksDbSharpTest/FunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,8 @@ public void FunctionalTest()
.SetCreateIfMissing(true)
.SetMergeOperator(MergeOperators.Create(
name: "test-merge-operator",
partialMerge: (key, keyLength, operandsList, operandsListLength, numOperands, success, newValueLength) => IntPtr.Zero,
fullMerge: (key, keyLength, existingValue, existingValueLength, operandsList, operandsListLength, numOperands, success, newValueLength) => IntPtr.Zero,
deleteValue: (value, valueLength) => { }
partialMerge: PartialMerge,
fullMerge: FullMerge
));
GC.Collect();
using (var db = RocksDbSharp.RocksDb.Open(optsTest, dbname))
Expand Down Expand Up @@ -474,5 +473,19 @@ class IntegerStringComparator : StringComparatorBase
public override int Compare(string a, string b)
=> Comparer(long.TryParse(a, out long avalue) ? avalue : 0, long.TryParse(b, out long bvalue) ? bvalue : 0);
}

private static byte[] PartialMerge(byte[] key, byte[][] operands, out bool success)
{
success = true;

return operands[operands.Length - 1];
}

private static byte[] FullMerge(byte[] key, byte[] existingValue, byte[][] operands, out bool success)
{
success = true;

return operands[operands.Length - 1];
}
}
}