Skip to content

Commit

Permalink
Fix various issues with custom fields
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Feb 9, 2025
1 parent 3576775 commit d534cfe
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ private ICustomFieldDefinitionRepository CreateCustomFieldDefinitionRepository()
return new CustomFieldDefinitionRepository(_customFieldDefinitionIndex, _lockProvider);
}

public void AddCustomFieldIndex(string name = "customfield", int replicas = 1)
public CustomFieldDefinitionIndex AddCustomFieldIndex(string name = "customfield", int replicas = 1)
{
_customFieldDefinitionIndex = new CustomFieldDefinitionIndex(this, name, replicas);
AddIndex(_customFieldDefinitionIndex);
return _customFieldDefinitionIndex;
}

public void AddIndex(IIndex index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public async Task<IDictionary<string, CustomFieldDefinition>> GetFieldMappingAsy
fieldMapping[customField.Name] = customField;
} while (await fields.NextPageAsync().AnyContext());

fieldMapping = fieldMapping.OrderBy(f => f.Value.ProcessOrder).ToDictionary(kvp => kvp.Key, kvp => kvp.Value, StringComparer.OrdinalIgnoreCase);

if (fieldMapping.Count > 0)
await _cache.AddAsync(cacheKey, fieldMapping, TimeSpan.FromMinutes(15)).AnyContext();

Expand Down Expand Up @@ -99,7 +101,8 @@ public Task<CustomFieldDefinition> AddFieldAsync(string entityType, string tenan

public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents, ICommandOptions options = null)
{
var fieldScopes = documents.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType)).ToArray();
var documentArray = documents as CustomFieldDefinition[] ?? documents.ToArray();
var fieldScopes = documentArray.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType)).ToArray();
string[] lockKeys = fieldScopes.Select(f => GetLockName(f.Key.EntityType, f.Key.TenantKey, f.Key.IndexType)).ToArray();
await using var lck = await _lockProvider.AcquireAsync(lockKeys, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)).AnyContext();
if (lck is null)
Expand All @@ -108,7 +111,7 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
foreach (var fieldScope in fieldScopes)
{
string slotFieldScopeKey = GetSlotFieldScopeCacheKey(fieldScope.Key.EntityType, fieldScope.Key.TenantKey, fieldScope.Key.IndexType);
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(fieldScope.Key.EntityType, fieldScope.Key.TenantKey, fieldScope.Key.IndexType);
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(fieldScope.Key.EntityType, fieldScope.Key.TenantKey);

var usedNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var availableSlots = new Queue<int>();
Expand All @@ -131,15 +134,15 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
var existingFields = await FindAsync(q => q
.FieldEquals(cf => cf.EntityType, fieldScope.Key.EntityType)
.FieldEquals(cf => cf.TenantKey, fieldScope.Key.TenantKey)
.FieldEquals(cf => cf.IndexType, fieldScope.Key.IndexType)
.Include(cf => cf.IndexType)
.Include(cf => cf.IndexSlot)
.Include(cf => cf.Name)
.Include(cf => cf.IsDeleted),
o => o.IncludeSoftDeletes().PageLimit(1000).QueryLogLevel(Microsoft.Extensions.Logging.LogLevel.Information)).AnyContext();

do
{
usedSlots.AddRange(existingFields.Documents.Select(d => d.IndexSlot));
usedSlots.AddRange(existingFields.Documents.Where(f => f.IndexType == fieldScope.Key.IndexType).Select(d => d.IndexSlot));
usedNames.AddRange(existingFields.Documents.Where(d => !d.IsDeleted).Select(d => d.Name));
} while (await existingFields.NextPageAsync().AnyContext());

Expand Down Expand Up @@ -175,7 +178,7 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
}
}

await base.AddAsync(documents, options).AnyContext();
await base.AddAsync(documentArray, options).AnyContext();
}

protected override Task ValidateAndThrowAsync(CustomFieldDefinition document)
Expand Down Expand Up @@ -210,7 +213,7 @@ private async Task OnDocumentsChanged(object source, DocumentsChangeEventArgs<Cu

if (doc.Value.IsDeleted)
{
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey);
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name }).AnyContext();
}
}
Expand All @@ -220,9 +223,9 @@ private async Task OnDocumentsChanged(object source, DocumentsChangeEventArgs<Cu
foreach (var doc in args.Documents)
{
string slotFieldScopeKey = GetSlotFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
await _cache.ListAddAsync(slotFieldScopeKey, new[] { doc.Value.IndexSlot }).AnyContext();
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name }).AnyContext();
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey);
await _cache.ListAddAsync(slotFieldScopeKey, [doc.Value.IndexSlot]).AnyContext();
await _cache.ListRemoveAsync(namesFieldScopeKey, [doc.Value.Name]).AnyContext();
}
}
}
Expand All @@ -242,9 +245,9 @@ private string GetSlotFieldScopeCacheKey(string entityType, string tenantKey, st
return $"customfield:{entityType}:{tenantKey}:{indexType}:slots";
}

private string GetNamesFieldScopeCacheKey(string entityType, string tenantKey, string indexType)
private string GetNamesFieldScopeCacheKey(string entityType, string tenantKey)
{
return $"customfield:{entityType}:{tenantKey}:{indexType}:names";
return $"customfield:{entityType}:{tenantKey}:names";
}

protected override async Task InvalidateCacheByQueryAsync(IRepositoryQuery<CustomFieldDefinition> query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task<T> AddAsync(T document, ICommandOptions options = null)
if (document == null)
throw new ArgumentNullException(nameof(document));

await AddAsync(new[] { document }, options).AnyContext();
await AddAsync([document], options).AnyContext();
return document;
}

Expand Down Expand Up @@ -968,13 +968,11 @@ protected virtual async Task OnCustomFieldsBeforeQuery(object sender, BeforeQuer

protected virtual async Task OnCustomFieldsDocumentsChanging(object sender, DocumentsChangeEventArgs<T> args)
{
var tenantGroups = args.Documents.Select(d => d.Value).GroupBy(e => GetDocumentTenantKey(e)).Where(g => g.Key != null).ToList();
var tenantGroups = args.Documents.Select(d => d.Value).GroupBy(GetDocumentTenantKey).Where(g => g.Key != null).ToList();

foreach (var tenant in tenantGroups)
{
var fieldDefinitions = await ElasticIndex.Configuration.CustomFieldDefinitionRepository.GetFieldMappingAsync(EntityTypeName, tenant.Key);
var processOnValueFields = fieldDefinitions.Where(f => f.Value.ProcessMode == CustomFieldProcessMode.ProcessOnValue).OrderBy(f => f.Value.ProcessOrder).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
var alwaysProcessFields = fieldDefinitions.Where(f => f.Value.ProcessMode == CustomFieldProcessMode.AlwaysProcess).OrderBy(f => f.Value.ProcessOrder).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);

foreach (var doc in tenant)
{
Expand All @@ -990,11 +988,13 @@ protected virtual async Task OnCustomFieldsDocumentsChanging(object sender, Docu

foreach (var customField in customFields)
{
if (!processOnValueFields.TryGetValue(customField.Key, out var fieldDefinition))
if (!fieldDefinitions.TryGetValue(customField.Key, out var fieldDefinition))
{
fieldDefinition = await HandleUnmappedCustomField(doc, customField.Key, customField.Value);
fieldDefinition = await HandleUnmappedCustomField(doc, customField.Key, customField.Value, fieldDefinitions);
if (fieldDefinition == null)
continue;

fieldDefinitions[customField.Key] = fieldDefinition;
}

if (!ElasticIndex.CustomFieldTypes.TryGetValue(fieldDefinition.IndexType, out var fieldType))
Expand All @@ -1011,7 +1011,7 @@ protected virtual async Task OnCustomFieldsDocumentsChanging(object sender, Docu
await ElasticIndex.Configuration.CustomFieldDefinitionRepository.SaveAsync(fieldDefinition);
}

foreach (var alwaysProcessField in alwaysProcessFields.Values)
foreach (var alwaysProcessField in fieldDefinitions.Values.Where(f => f.ProcessMode == CustomFieldProcessMode.AlwaysProcess))
{
if (!ElasticIndex.CustomFieldTypes.TryGetValue(alwaysProcessField.IndexType, out var fieldType))
{
Expand All @@ -1031,7 +1031,7 @@ protected virtual async Task OnCustomFieldsDocumentsChanging(object sender, Docu
}
}

protected virtual async Task<CustomFieldDefinition> HandleUnmappedCustomField(T document, string name, object value)
protected virtual async Task<CustomFieldDefinition> HandleUnmappedCustomField(T document, string name, object value, IDictionary<string, CustomFieldDefinition> existingFields)
{
if (!AutoCreateCustomFields)
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,29 @@ public async Task CanAddNewFieldsAndReserveSlots()
Assert.Contains(mapping.Keys, c => c == "MyField3");
}

[Fact]
public async Task WontAllowSameFieldNameWithDifferentType()
{
var customField = await _customFieldDefinitionRepository.AddAsync(new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "MyField1",
IndexType = StringFieldType.IndexType
});
Assert.Equal(1, customField.IndexSlot);
var mapping = await _customFieldDefinitionRepository.GetFieldMappingAsync(nameof(EmployeeWithCustomFields), "1");
Assert.Contains(mapping.Keys, c => c == "MyField1");

await Assert.ThrowsAsync<DocumentValidationException>(() => _customFieldDefinitionRepository.AddAsync(new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "MyField1",
IndexType = IntegerFieldType.IndexType
}));
}

[Fact]
public async Task CanUseDeletedSlotAndName()
{
Expand Down Expand Up @@ -151,6 +174,9 @@ public async Task CanUseDeletedSlotAndName()
mapping = await _customFieldDefinitionRepository.GetFieldMappingAsync(nameof(EmployeeWithCustomFields), "1");
Assert.DoesNotContain(mapping.Keys, c => c == "MyField2");

var deletedFields = await _customFieldDefinitionRepository.FindAsync(q => q.FieldEquals(cf => cf.EntityType, nameof(EmployeeWithCustomFields)).FieldEquals(cf => cf.TenantKey, "1"), o => o.IncludeSoftDeletes().PageLimit(1000));
Assert.Contains(deletedFields.Documents, d => d.Name == "MyField2");

customField = await _customFieldDefinitionRepository.AddAsync(new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
Expand Down Expand Up @@ -284,24 +310,64 @@ public async Task CanSearchByCustomField()
[Fact]
public async Task CanAutoCreateUnmappedCustomField()
{
Log.DefaultMinimumLevel = LogLevel.Trace;

await _customFieldDefinitionRepository.AddAsync([
new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "Field1",
IndexType = IntegerFieldType.IndexType
},
new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "Field2",
IndexType = IntegerFieldType.IndexType
},
new CustomFieldDefinition
{
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "Calculated",
IndexType = IntegerFieldType.IndexType,
ProcessMode = CustomFieldProcessMode.AlwaysProcess,
Data = new Dictionary<string, object> { { "Expression", "source.Data.Field1 + source.Data.Field2" } }
}
]);

var fieldMapping = await _customFieldDefinitionRepository.GetFieldMappingAsync(nameof(EmployeeWithCustomFields), "1");
Assert.DoesNotContain(fieldMapping, m => m.Key == "MyField1");

var employee = EmployeeWithCustomFieldsGenerator.Generate(age: 19);
employee.CompanyId = "1";
employee.PhoneNumbers.Add(new PhoneInfo { Number = "214-222-2222" });
employee.Data["MyField1"] = "hey";
await _employeeRepository.AddAsync(employee, o => o.ImmediateConsistency());
var employee1 = EmployeeWithCustomFieldsGenerator.Generate(age: 19);
employee1.CompanyId = "1";
employee1.PhoneNumbers.Add(new PhoneInfo { Number = "214-222-2222" });
employee1.Data["MyField1"] = "hey1";
employee1.Data["Calculated"] = 1;
var employee2 = EmployeeWithCustomFieldsGenerator.Generate(age: 21);
employee2.CompanyId = "1";
employee2.PhoneNumbers.Add(new PhoneInfo { Number = "214-111-1111" });
employee2.Data["myfield1"] = "hey2";
await _employeeRepository.AddAsync([employee1, employee2], o => o.ImmediateConsistency());

fieldMapping = await _customFieldDefinitionRepository.GetFieldMappingAsync(nameof(EmployeeWithCustomFields), "1");
Assert.Contains(fieldMapping, m => m.Key == "MyField1");

var results = await _employeeRepository.FindAsync(q => q.Company("1").FilterExpression("myfield1:hey"), o => o.QueryLogLevel(LogLevel.Information));
var results = await _employeeRepository.FindAsync(q => q.Company("1").FilterExpression("myfield1:hey1"), o => o.QueryLogLevel(LogLevel.Information));
var employees = results.Documents.ToArray();
Assert.Single(employees);
Assert.Equal(19, employees[0].Age);
Assert.Single(employees[0].Data);
Assert.Equal("hey", employees[0].Data["MyField1"]);
Assert.Equal(2, employees[0].Data.Count);
Assert.Equal("hey1", employees[0].Data["MyField1"]);

results = await _employeeRepository.FindAsync(q => q.Company("1").FilterExpression("myfield1:hey2"), o => o.QueryLogLevel(LogLevel.Information));
employees = results.Documents.ToArray();
Assert.Single(employees);
Assert.Equal(21, employees[0].Age);
Assert.Equal(2, employees[0].Data.Count);
Assert.Equal("hey2", employees[0].Data["myfield1"]);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Foundatio.Messaging;
using Foundatio.Queues;
using Foundatio.Repositories.Elasticsearch.Configuration;
using Foundatio.Repositories.Elasticsearch.CustomFields;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes;
using Microsoft.Extensions.Logging;
using Nest;
Expand All @@ -29,7 +30,7 @@ public MyAppElasticConfiguration(IQueue<WorkItemData> workItemQueue, ICacheClien
AddIndex(ParentChild = new ParentChildIndex(this));
AddIndex(DailyFileAccessHistory = new DailyFileAccessHistoryIndex(this));
AddIndex(MonthlyFileAccessHistory = new MonthlyFileAccessHistoryIndex(this));
AddCustomFieldIndex(replicas: 0);
CustomFields = AddCustomFieldIndex(replicas: 0);
}

protected override IConnectionPool CreateConnectionPool()
Expand Down Expand Up @@ -98,4 +99,5 @@ protected override void ConfigureSettings(ConnectionSettings settings)
public ParentChildIndex ParentChild { get; }
public DailyFileAccessHistoryIndex DailyFileAccessHistory { get; }
public MonthlyFileAccessHistoryIndex MonthlyFileAccessHistory { get; }
public CustomFieldDefinitionIndex CustomFields { get; }
}

0 comments on commit d534cfe

Please sign in to comment.