Skip to content

Commit

Permalink
Implement upsert support
Browse files Browse the repository at this point in the history
Closes #52
  • Loading branch information
roji committed Mar 12, 2024
1 parent 053fa31 commit 2b7f9ac
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 14 deletions.
50 changes: 50 additions & 0 deletions Milvus.Client.Tests/DataTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,56 @@ public async Task Insert_Drop()
Assert.Empty(result.Data);
}

[Fact]
public async Task Upsert()
{
    // Arrange: seed the collection with a single row (id 1) that the upsert below targets again.
    IReadOnlyList<FieldData> seedRows =
    [
        FieldData.Create("id", new[] { 1L }),
        FieldData.CreateFloatVector("float_vector", new ReadOnlyMemory<float>[] { new[] { 20f, 30f } })
    ];
    await Collection.InsertAsync(seedRows);

    // Act: upsert two rows - id 1 already exists in the collection, id 2 does not.
    IReadOnlyList<FieldData> upsertRows =
    [
        FieldData.Create("id", new[] { 1L, 2L }),
        FieldData.CreateFloatVector(
            "float_vector",
            new ReadOnlyMemory<float>[] { new[] { 1f, 2f }, new[] { 3f, 4f } })
    ];
    MutationResult mutation = await Collection.UpsertAsync(upsertRows);

    // Assert: both ids are reported back, in the order they were sent.
    Assert.Collection(
        mutation.Ids.LongIds!,
        id => Assert.Equal(1, id),
        id => Assert.Equal(2, id));

    // TODO: The three counters below were all observed to be 2, although one row was expected to be
    // inserted and one updated; the assertions stay disabled until that is understood.
    // Assert.Equal(0, mutation.DeleteCount);
    // Assert.Equal(1, mutation.InsertCount);
    // Assert.Equal(1, mutation.UpsertCount);

    // Read both rows back with strong consistency so the upserted values are what gets checked.
    IReadOnlyList<FieldData> queried = await Collection.QueryAsync(
        "id in [1,2]",
        new()
        {
            OutputFields = { "float_vector" },
            ConsistencyLevel = ConsistencyLevel.Strong
        });

    // Sort the returned fields by name so the inspectors below can rely on a fixed order.
    Assert.Collection(
        queried.OrderBy(field => field.FieldName),
        vectorField =>
        {
            Assert.Equal("float_vector", vectorField.FieldName);

            var vectors = ((FloatVectorFieldData)vectorField).Data;
            Assert.Collection(
                vectors,
                first => Assert.Equal(new[] { 1f, 2f }, first),
                second => Assert.Equal(new[] { 3f, 4f }, second));
        },
        idField =>
        {
            Assert.Equal("id", idField.FieldName);
            Assert.Equivalent(new[] { 1L, 2L }, ((FieldData<long>)idField).Data);
        });
}

[Fact]
public async Task Timestamp_conversion()
{
Expand Down
68 changes: 54 additions & 14 deletions Milvus.Client/MilvusCollection.Entity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,60 @@ public async Task<MutationResult> InsertAsync(
request.PartitionName = partitionName;
}

PopulateData(data, request.FieldsData);

request.NumRows = (uint)data[0].RowCount;

Grpc.MutationResult response =
await _client.InvokeAsync(_client.GrpcClient.InsertAsync, request, static r => r.Status, cancellationToken)
.ConfigureAwait(false);

_client.CollectionLastMutationTimestamps[Name] = response.Timestamp;

return new MutationResult(response);
}

/// <summary>
/// Sends the given rows to this collection through the server's upsert operation.
/// </summary>
/// <param name="data">
/// The fields to upsert. Each entry carries the values of one field for all rows; the row count of the
/// first field is sent as the number of rows in the request.
/// </param>
/// <param name="partitionName">
/// The partition to upsert into, or <c>null</c> to leave the partition unset on the request.
/// </param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
/// </param>
/// <returns>A <see cref="MutationResult" /> wrapping the server's response.</returns>
public async Task<MutationResult> UpsertAsync(
    IReadOnlyList<FieldData> data,
    string? partitionName = null,
    CancellationToken cancellationToken = default)
{
    Verify.NotNull(data);

    UpsertRequest upsertRequest = new() { CollectionName = Name };

    // The partition is only set when the caller supplied one.
    if (partitionName is { } targetPartition)
    {
        upsertRequest.PartitionName = targetPartition;
    }

    // Fill the request's field data first; the row count is taken from the first field afterwards.
    PopulateData(data, upsertRequest.FieldsData);
    upsertRequest.NumRows = (uint)data[0].RowCount;

    Grpc.MutationResult grpcResult = await _client
        .InvokeAsync(_client.GrpcClient.UpsertAsync, upsertRequest, static reply => reply.Status, cancellationToken)
        .ConfigureAwait(false);

    // Record the timestamp of this mutation for the collection on the owning client.
    _client.CollectionLastMutationTimestamps[Name] = grpcResult.Timestamp;

    return new MutationResult(grpcResult);
}

private static void PopulateData(IReadOnlyList<FieldData> fieldsData, RepeatedField<Grpc.FieldData> grpcFieldsData)
{
Dictionary<string, object?>?[]? dynamicFieldsData = null;

long count = data[0].RowCount;
foreach (FieldData field in data)
long count = fieldsData[0].RowCount;
foreach (FieldData field in fieldsData)
{
if (field.RowCount != count)
{
Expand All @@ -60,7 +110,7 @@ public async Task<MutationResult> InsertAsync(
}
else
{
request.FieldsData.Add(field.ToGrpcFieldData());
grpcFieldsData.Add(field.ToGrpcFieldData());
}
}

Expand All @@ -73,18 +123,8 @@ public async Task<MutationResult> InsertAsync(
}

FieldData metaFieldData = new FieldData<string>(encodedJsonStrings, MilvusDataType.Json, isDynamic: true);
request.FieldsData.Add(metaFieldData.ToGrpcFieldData());
grpcFieldsData.Add(metaFieldData.ToGrpcFieldData());
}

request.NumRows = (uint)count;

Grpc.MutationResult response =
await _client.InvokeAsync(_client.GrpcClient.InsertAsync, request, static r => r.Status, cancellationToken)
.ConfigureAwait(false);

_client.CollectionLastMutationTimestamps[Name] = response.Timestamp;

return new MutationResult(response);
}

/// <summary>
Expand Down

0 comments on commit 2b7f9ac

Please sign in to comment.