diff --git a/Cognite.Extensions/AssetExtensions.cs b/Cognite.Extensions/AssetExtensions.cs index 5d864931..2f8763f2 100644 --- a/Cognite.Extensions/AssetExtensions.cs +++ b/Cognite.Extensions/AssetExtensions.cs @@ -37,6 +37,7 @@ internal static void SetLogger(ILogger logger) /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created and found assets public static Task> GetOrCreateAsync( @@ -46,13 +47,14 @@ public static Task> GetOrCreateAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { Task> asyncBuildAssets(IEnumerable ids) { return Task.FromResult(buildAssets(ids)); } - return assets.GetOrCreateAsync(externalIds, asyncBuildAssets, chunkSize, throttleSize, retryMode, token); + return assets.GetOrCreateAsync(externalIds, asyncBuildAssets, chunkSize, throttleSize, retryMode, sanitationMode, token); } /// /// Get or create the assets with the provided exist in CDF. @@ -68,6 +70,7 @@ Task> asyncBuildAssets(IEnumerable ids) /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created and found assets public static async Task> GetOrCreateAsync( @@ -77,6 +80,7 @@ public static async Task> GetOrCreateAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { var chunks = externalIds @@ -90,7 +94,7 @@ public static async Task> GetOrCreateAsync( var generators = chunks .Select, Func>( (chunk, idx) => async () => { - var result = await GetOrCreateAssetsChunk(assets, chunk, buildAssets, 0, retryMode, token); + var result = await GetOrCreateAssetsChunk(assets, chunk, buildAssets, 0, retryMode, sanitationMode, token); results[idx] = result; }); @@ -120,6 +124,7 @@ await generators.RunThrottled( /// Throttle size /// How to do retries. Keeping duplicates is not valid for /// this method. + /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created assets public static async Task> EnsureExistsAsync( @@ -128,20 +133,21 @@ public static async Task> EnsureExistsAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { - CogniteError prePushError; - (assetsToEnsure, prePushError) = Sanitation.CleanAssetRequest(assetsToEnsure); + IEnumerable errors; + (assetsToEnsure, errors) = Sanitation.CleanAssetRequest(assetsToEnsure, sanitationMode); var chunks = assetsToEnsure .ChunkBy(chunkSize) .ToList(); _logger.LogDebug("Ensuring assets. Number of assets: {Number}. Number of chunks: {Chunks}", assetsToEnsure.Count(), chunks.Count()); - int size = chunks.Count + (prePushError != null ? 1 : 0); + int size = chunks.Count + (errors.Any() ? 
1 : 0); var results = new CogniteResult[size]; - if (prePushError != null) + if (errors.Any()) { - results[size - 1] = new CogniteResult(new[] { prePushError }, null); + results[size - 1] = new CogniteResult(errors, null); if (size == 1) return results[size - 1]; } if (size == 0) return new CogniteResult(null, null); @@ -172,6 +178,7 @@ private static async Task> GetOrCreateAssetsChunk( Func, Task>> buildAssets, int backoff, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { IEnumerable found; @@ -191,15 +198,15 @@ private static async Task> GetOrCreateAssetsChunk( _logger.LogDebug("Could not fetch {Missing} out of {Found} assets. Attempting to create the missing ones", missing.Count, externalIds.Count()); var toCreate = await buildAssets(missing); - CogniteError prePushError; - (toCreate, prePushError) = Sanitation.CleanAssetRequest(toCreate); + IEnumerable errors; + (toCreate, errors) = Sanitation.CleanAssetRequest(toCreate, sanitationMode); var result = await CreateAssetsHandleErrors(assets, toCreate, retryMode, token); result.Results = result.Results == null ? found : result.Results.Concat(found); - if (prePushError != null) + if (errors.Any()) { - result.Errors = result.Errors == null ? new[] { prePushError } : result.Errors.Append(prePushError); + result.Errors = result.Errors == null ? errors : result.Errors.Concat(errors); } if (!result.Errors?.Any() ?? false @@ -225,7 +232,7 @@ private static async Task> GetOrCreateAssetsChunk( _logger.LogDebug("Found {cnt} duplicated assets, retrying", duplicatedIds.Count); await Task.Delay(TimeSpan.FromSeconds(0.1 * Math.Pow(2, backoff))); - var nextResult = await GetOrCreateAssetsChunk(assets, duplicatedIds, buildAssets, backoff + 1, retryMode, token); + var nextResult = await GetOrCreateAssetsChunk(assets, duplicatedIds, buildAssets, backoff + 1, retryMode, sanitationMode, token); result = result.Merge(nextResult); return result; diff --git a/Cognite.Extensions/CogniteResult.cs b/Cognite.Extensions/CogniteResult.cs index 8584da27..c124bbff 100644 --- a/Cognite.Extensions/CogniteResult.cs +++ b/Cognite.Extensions/CogniteResult.cs @@ -583,6 +583,10 @@ public enum ErrorType /// ItemDuplicated, /// + /// Item does not satisfy CDF field limits + /// + SanitationFailed, + /// /// Something else happened that caused the request to fail /// FatalFailure = -1 @@ -621,6 +625,42 @@ public enum ResourceType /// LegacyName, /// + /// Name on an asset or timeseries + /// + Name, + /// + /// Type of event + /// + Type, + /// + /// SubType of event + /// + SubType, + /// + /// Source on event or asset + /// + Source, + /// + /// Metadata on event, asset or timeseries + /// + Metadata, + /// + /// Labels on an asset + /// + Labels, + /// + /// Description on event, asset or timeseries + /// + Description, + /// + /// Start and end time on an event + /// + TimeRange, + /// + /// Unit on a timeseries + /// + Unit, + /// /// None or unknown /// None = -1 @@ -678,4 +718,23 @@ public enum RetryMode /// OnFatalKeepDuplicates = 7 } + /// + /// How to do sanitation of objects before creating the request + /// + public enum SanitationMode + { + /// + /// Don't do any sanitation. If you use this, you should make sure that objects are sanitized + /// some other way. + /// + None, + /// + /// Clean objects before requesting. This modifies the passed request. + /// + Clean, + /// + /// Remove any offending objects and report them in the result. 
+ /// + Remove + } } diff --git a/Cognite.Extensions/EventExtensions.cs b/Cognite.Extensions/EventExtensions.cs index 6c9ac22f..4fc2acaf 100644 --- a/Cognite.Extensions/EventExtensions.cs +++ b/Cognite.Extensions/EventExtensions.cs @@ -38,6 +38,7 @@ internal static void SetLogger(ILogger logger) /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created and found events public static Task> GetOrCreateAsync( @@ -47,13 +48,14 @@ public static Task> GetOrCreateAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { Task> asyncBuildEvents(IEnumerable ids) { return Task.FromResult(buildEvents(ids)); } - return resource.GetOrCreateAsync(externalIds, asyncBuildEvents, chunkSize, throttleSize, retryMode, token); + return resource.GetOrCreateAsync(externalIds, asyncBuildEvents, chunkSize, throttleSize, retryMode, sanitationMode, token); } /// /// Get or create the events with the provided exist in CDF. @@ -69,6 +71,7 @@ Task> asyncBuildEvents(IEnumerable ids) /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created and found events public static async Task> GetOrCreateAsync( @@ -78,6 +81,7 @@ public static async Task> GetOrCreateAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { var chunks = externalIds @@ -91,7 +95,7 @@ public static async Task> GetOrCreateAsync( var generators = chunks .Select, Func>( (chunk, idx) => async () => { - var result = await GetOrCreateEventsChunk(resource, chunk, buildEvents, 0, retryMode, token); + var result = await GetOrCreateEventsChunk(resource, chunk, buildEvents, 0, retryMode, sanitationMode, token); results[idx] = result; }); @@ -122,6 +126,7 @@ await generators.RunThrottled( /// Throttle size /// How to do retries. Keeping duplicates is not valid for /// this method. + /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created events public static async Task> EnsureExistsAsync( @@ -130,10 +135,11 @@ public static async Task> EnsureExistsAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { - CogniteError prePushError; - (events, prePushError) = Sanitation.CleanEventRequest(events); + IEnumerable errors; + (events, errors) = Sanitation.CleanEventRequest(events, sanitationMode); var chunks = events .ChunkBy(chunkSize) @@ -141,11 +147,11 @@ public static async Task> EnsureExistsAsync( _logger.LogDebug("Ensuring events. Number of events: {Number}. Number of chunks: {Chunks}", events.Count(), chunks.Count()); - int size = chunks.Count + (prePushError != null ? 1 : 0); + int size = chunks.Count + (errors.Any() ? 
1 : 0); var results = new CogniteResult[size]; - if (prePushError != null) + if (errors.Any()) { - results[size - 1] = new CogniteResult(new[] { prePushError }, null); + results[size - 1] = new CogniteResult(errors, null); if (size == 1) return results[size - 1]; } if (!results.Any()) return new CogniteResult(null, null); @@ -176,6 +182,7 @@ private static async Task> GetOrCreateEventsChunk( Func, Task>> buildEvents, int backoff, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { IEnumerable found; @@ -195,15 +202,15 @@ private static async Task> GetOrCreateEventsChunk( _logger.LogDebug("Could not fetch {Missing} out of {Found} events. Attempting to create the missing ones", missing.Count, externalIds.Count()); var toCreate = await buildEvents(missing); - CogniteError prePushError; - (toCreate, prePushError) = Sanitation.CleanEventRequest(toCreate); + IEnumerable errors; + (toCreate, errors) = Sanitation.CleanEventRequest(toCreate, sanitationMode); var result = await CreateEventsHandleErrors(resource, toCreate, retryMode, token); result.Results = result.Results == null ? found : result.Results.Concat(found); - if (prePushError != null) + if (errors.Any()) { - result.Errors = result.Errors == null ? new[] { prePushError } : result.Errors.Append(prePushError); + result.Errors = result.Errors == null ? errors : result.Errors.Concat(errors); } if (!result.Errors?.Any() ?? false @@ -229,7 +236,7 @@ private static async Task> GetOrCreateEventsChunk( _logger.LogDebug("Found {cnt} duplicated events, retrying", duplicatedIds.Count); await Task.Delay(TimeSpan.FromSeconds(0.1 * Math.Pow(2, backoff))); - var nextResult = await GetOrCreateEventsChunk(resource, duplicatedIds, buildEvents, backoff + 1, retryMode, token); + var nextResult = await GetOrCreateEventsChunk(resource, duplicatedIds, buildEvents, backoff + 1, retryMode, sanitationMode, token); result = result.Merge(nextResult); return result; diff --git a/Cognite.Extensions/Sanitation.cs b/Cognite.Extensions/Sanitation.cs index 038999fc..9e9c7437 100644 --- a/Cognite.Extensions/Sanitation.cs +++ b/Cognite.Extensions/Sanitation.cs @@ -17,6 +17,7 @@ public static class Sanitation public const int AssetMetadataMaxPerValue = 10240; public const int AssetMetadataMaxPairs = 256; public const int AssetSourceMax = 128; + public const int AssetLabelsMax = 10; public const int TimeSeriesNameMax = 255; public const int TimeSeriesDescriptionMax = 1000; @@ -32,7 +33,7 @@ public static class Sanitation public const int EventMetadataMaxPerKey = 128; public const int EventMetadataMaxPerValue = 128_000; public const int EventMetadataMaxPairs = 256; - public const int EventmetadataMaxBytes = 200_000; + public const int EventMetadataMaxBytes = 200_000; public const int EventAssetIdsMax = 10_000; /// /// Reduce the length of given string to maxLength, if it is longer. @@ -46,6 +47,11 @@ public static string Truncate(this string str, int maxLength) return str.Substring(0, maxLength); } + private static bool CheckLength(this string str, int maxLength) + { + return string.IsNullOrEmpty(str) || str.Length <= maxLength; + } + /// /// Reduce the length of given CogniteExternalId to maxLength, if it is longer. 
/// @@ -139,6 +145,28 @@ public static Dictionary SanitizeMetadata(this Dictionary pair.Item1, pair => pair.Item2); } + public static bool VerifyMetadata(this Dictionary data, + int maxPerKey, + int maxKeys, + int maxPerValue, + int maxBytes) + { + if (data == null || !data.Any()) return true; + int count = 0; + int byteCount = 0; + foreach (var kvp in data) + { + var valueByteCount = string.IsNullOrEmpty(kvp.Value) ? 0 : Encoding.UTF8.GetByteCount(kvp.Value); + if (valueByteCount > maxPerValue) return false; + var keyByteCount = Encoding.UTF8.GetByteCount(kvp.Key); + if (keyByteCount > maxPerKey) return false; + byteCount += valueByteCount + keyByteCount; + count++; + if (byteCount > maxBytes || count > maxKeys) return false; + } + return true; + } + /// /// Sanitize an AssetCreate so that it can be safely sent to CDF. /// Requests may still fail due to conflicts or missing ids. @@ -147,11 +175,11 @@ public static Dictionary SanitizeMetadata(this Dictionary + /// Check that given AssetCreate satisfies CDF limits. + /// + /// Asset to check + /// Failed resourceType or null if nothing failed + public static ResourceType? Verify(this AssetCreate asset) + { + if (asset == null) throw new ArgumentNullException(nameof(asset)); + if (!asset.ExternalId.CheckLength(ExternalIdMax)) return ResourceType.ExternalId; + if (!asset.Name.CheckLength(AssetNameMax)) return ResourceType.Name; + if (asset.ParentId != null && asset.ParentId < 1) return ResourceType.ParentId; + if (!asset.ParentExternalId.CheckLength(ExternalIdMax)) return ResourceType.ParentExternalId; + if (!asset.Description.CheckLength(AssetDescriptionMax)) return ResourceType.Description; + if (asset.DataSetId != null && asset.DataSetId < 1) return ResourceType.DataSetId; + if (!asset.Metadata.VerifyMetadata(AssetMetadataMaxPerKey, AssetMetadataMaxPairs, AssetMetadataMaxPerValue, AssetMetadataMaxBytes)) + return ResourceType.Metadata; + if (!asset.Source.CheckLength(AssetSourceMax)) return ResourceType.Source; + if (asset.Labels != null && (asset.Labels.Count() > AssetLabelsMax || asset.Labels.Any(label => !label.ExternalId.CheckLength(ExternalIdMax)))) + return ResourceType.Labels; + return null; + } + /// /// Sanitize a TimeSeriesCreate object so that it can be safely sent to CDF. /// Requests may still fail due to conflicts or missing ids. @@ -169,15 +219,35 @@ public static void Sanitize(this AssetCreate asset) public static void Sanitize(this TimeSeriesCreate ts) { if (ts == null) throw new ArgumentNullException(nameof(ts)); - ts.ExternalId = ts.ExternalId?.Truncate(ExternalIdMax); - ts.Name = ts.Name?.Truncate(TimeSeriesNameMax); + ts.ExternalId = ts.ExternalId.Truncate(ExternalIdMax); + ts.Name = ts.Name.Truncate(TimeSeriesNameMax); if (ts.AssetId < 1) ts.AssetId = null; - ts.Description = ts.Description?.Truncate(TimeSeriesDescriptionMax); + ts.Description = ts.Description.Truncate(TimeSeriesDescriptionMax); if (ts.DataSetId < 1) ts.DataSetId = null; - ts.Metadata = ts.Metadata?.SanitizeMetadata(TimeSeriesMetadataMaxPerKey, TimeSeriesMetadataMaxPairs, + ts.Metadata = ts.Metadata.SanitizeMetadata(TimeSeriesMetadataMaxPerKey, TimeSeriesMetadataMaxPairs, TimeSeriesMetadataMaxPerValue, TimeSeriesMetadataMaxBytes); - ts.Unit = ts.Unit?.Truncate(TimeSeriesUnitMax); - ts.LegacyName = ts.LegacyName?.Truncate(ExternalIdMax); + ts.Unit = ts.Unit.Truncate(TimeSeriesUnitMax); + ts.LegacyName = ts.LegacyName.Truncate(ExternalIdMax); + } + + /// + /// Check that given TimeSeriesCreate satisfies CDF limits. 
+ /// + /// Timeseries to check + /// Failed resourceType or null if nothing failed + public static ResourceType? Verify(this TimeSeriesCreate ts) + { + if (ts == null) throw new ArgumentNullException(nameof(ts)); + if (!ts.ExternalId.CheckLength(ExternalIdMax)) return ResourceType.ExternalId; + if (!ts.Name.CheckLength(TimeSeriesNameMax)) return ResourceType.Name; + if (ts.AssetId != null && ts.AssetId < 1) return ResourceType.AssetId; + if (!ts.Description.CheckLength(TimeSeriesDescriptionMax)) return ResourceType.Description; + if (ts.DataSetId != null && ts.DataSetId < 1) return ResourceType.DataSetId; + if (!ts.Metadata.VerifyMetadata(TimeSeriesMetadataMaxPerKey, TimeSeriesMetadataMaxPairs, + TimeSeriesMetadataMaxPerValue, TimeSeriesMetadataMaxBytes)) return ResourceType.Metadata; + if (!ts.Unit.CheckLength(TimeSeriesUnitMax)) return ResourceType.Unit; + if (!ts.LegacyName.CheckLength(ExternalIdMax)) return ResourceType.LegacyName; + return null; } /// @@ -188,11 +258,11 @@ public static void Sanitize(this TimeSeriesCreate ts) public static void Sanitize(this EventCreate evt) { if (evt == null) throw new ArgumentNullException(nameof(evt)); - evt.ExternalId = evt.ExternalId?.Truncate(ExternalIdMax); - evt.Type = evt.Type?.Truncate(EventTypeMax); - evt.Subtype = evt.Subtype?.Truncate(EventTypeMax); - evt.Source = evt.Source?.Truncate(EventSourceMax); - evt.Description = evt.Description?.Truncate(EventDescriptionMax); + evt.ExternalId = evt.ExternalId.Truncate(ExternalIdMax); + evt.Type = evt.Type.Truncate(EventTypeMax); + evt.Subtype = evt.Subtype.Truncate(EventTypeMax); + evt.Source = evt.Source.Truncate(EventSourceMax); + evt.Description = evt.Description.Truncate(EventDescriptionMax); evt.AssetIds = evt.AssetIds? .Where(id => id > 0) .Take(EventAssetIdsMax); @@ -200,7 +270,29 @@ public static void Sanitize(this EventCreate evt) if (evt.EndTime < 0) evt.EndTime = 0; if (evt.StartTime > evt.EndTime) evt.EndTime = evt.StartTime; if (evt.DataSetId < 1) evt.DataSetId = null; - evt.Metadata = evt.Metadata?.SanitizeMetadata(EventMetadataMaxPerKey, EventMetadataMaxPairs, EventMetadataMaxPerValue, EventmetadataMaxBytes); + evt.Metadata = evt.Metadata.SanitizeMetadata(EventMetadataMaxPerKey, EventMetadataMaxPairs, EventMetadataMaxPerValue, EventMetadataMaxBytes); + } + + /// + /// Check that given EventCreate satisfies CDF limits. + /// + /// Event to check + /// Failed resourceType or null if nothing failed + public static ResourceType?
Verify(this EventCreate evt) + { + if (evt == null) throw new ArgumentNullException(nameof(evt)); + if (!evt.ExternalId.CheckLength(ExternalIdMax)) return ResourceType.ExternalId; + if (!evt.Type.CheckLength(EventTypeMax)) return ResourceType.Type; + if (!evt.Subtype.CheckLength(EventTypeMax)) return ResourceType.SubType; + if (!evt.Source.CheckLength(EventSourceMax)) return ResourceType.Source; + if (evt.AssetIds != null && (evt.AssetIds.Count() > EventAssetIdsMax || evt.AssetIds.Any(id => id < 1))) return ResourceType.AssetId; + if (evt.StartTime != null && evt.StartTime < 1 + || evt.EndTime != null && evt.EndTime < 1 + || evt.StartTime != null && evt.EndTime != null && evt.StartTime > evt.EndTime) return ResourceType.TimeRange; + if (evt.DataSetId != null && evt.DataSetId < 1) return ResourceType.DataSetId; + if (!evt.Metadata.VerifyMetadata(EventMetadataMaxPerKey, EventMetadataMaxPairs, EventMetadataMaxPerValue, EventMetadataMaxBytes)) + return ResourceType.Metadata; + return null; } /// @@ -208,40 +300,68 @@ public static void Sanitize(this EventCreate evt) /// The first encountered duplicate is kept. /// /// AssetCreate request to clean + /// The type of sanitation to apply /// Cleaned create request and an optional error if any ids were duplicated - public static (IEnumerable, CogniteError) CleanAssetRequest(IEnumerable assets) + public static (IEnumerable, IEnumerable) CleanAssetRequest(IEnumerable assets, SanitationMode mode) { var result = new List(); + var errors = new List(); var ids = new HashSet(); var duplicated = new HashSet(); + var bad = new List<(ResourceType, AssetCreate)>(); foreach (var asset in assets) { - asset.Sanitize(); + bool toAdd = true; + if (mode == SanitationMode.Remove) + { + var failedField = asset.Verify(); + if (failedField.HasValue) + { + bad.Add((failedField.Value, asset)); + toAdd = false; + } + } + else if (mode == SanitationMode.Clean) + { + asset.Sanitize(); + } if (asset.ExternalId != null) { if (!ids.Add(asset.ExternalId)) { duplicated.Add(asset.ExternalId); - continue; + toAdd = false; } } - result.Add(asset); + if (toAdd) + { + result.Add(asset); + } } - CogniteError error = null; if (duplicated.Any()) { - error = new CogniteError + errors.Add(new CogniteError { Status = 409, Message = "Duplicate external ids", Resource = ResourceType.ExternalId, Type = ErrorType.ItemDuplicated, Values = duplicated.Select(item => Identity.Create(item)).ToArray() - }; + }); + } + if (bad.Any()) + { + errors.AddRange(bad.GroupBy(pair => pair.Item1).Select(group => new CogniteError + { + Skipped = group.Select(pair => pair.Item2).ToList(), + Resource = group.Key, + Type = ErrorType.SanitationFailed, + Status = 400 + })); } - return (result, error); + return (result, errors); } /// @@ -249,10 +369,13 @@ public static (IEnumerable, CogniteError) CleanAssetRequest(IEnumer /// The first encountered duplicate is kept. 
/// /// TimeSeriesCreate request to clean + /// The type of sanitation to apply /// Cleaned create request and optional errors for duplicated ids and legacyNames - public static (IEnumerable, CogniteError idError, CogniteError nameError) CleanTimeSeriesRequest(IEnumerable timeseries) + public static (IEnumerable, IEnumerable) CleanTimeSeriesRequest(IEnumerable timeseries, + SanitationMode mode) { var result = new List(); + var errors = new List(); var ids = new HashSet(); var duplicatedIds = new HashSet(); @@ -260,15 +383,31 @@ public static (IEnumerable, CogniteError idError, CogniteError var names = new HashSet(); var duplicatedNames = new HashSet(); + var bad = new List<(ResourceType, TimeSeriesCreate)>(); + + foreach (var ts in timeseries) { - ts.Sanitize(); + bool toAdd = true; + if (mode == SanitationMode.Remove) + { + var failedField = ts.Verify(); + if (failedField.HasValue) + { + bad.Add((failedField.Value, ts)); + toAdd = false; + } + } + else if (mode == SanitationMode.Clean) + { + ts.Sanitize(); + } if (ts.ExternalId != null) { if (!ids.Add(ts.ExternalId)) { duplicatedIds.Add(ts.ExternalId); - continue; + toAdd = false; } } if (ts.LegacyName != null) @@ -276,36 +415,47 @@ public static (IEnumerable, CogniteError idError, CogniteError if (!names.Add(ts.LegacyName)) { duplicatedNames.Add(ts.LegacyName); - continue; + toAdd = false; } } - result.Add(ts); + if (toAdd) + { + result.Add(ts); + } } - CogniteError idError = null; if (duplicatedIds.Any()) { - idError = new CogniteError + errors.Add(new CogniteError { Status = 409, Message = "Conflicting identifiers", Resource = ResourceType.ExternalId, Type = ErrorType.ItemDuplicated, Values = duplicatedIds.Select(Identity.Create).ToArray() - }; + }); } - CogniteError nameError = null; if (duplicatedNames.Any()) { - nameError = new CogniteError + errors.Add(new CogniteError { Status = 409, Message = "Duplicated metric names in request", Resource = ResourceType.LegacyName, Type = ErrorType.ItemDuplicated, Values = duplicatedNames.Select(Identity.Create).ToArray() - }; + }); + } + if (bad.Any()) + { + errors.AddRange(bad.GroupBy(pair => pair.Item1).Select(group => new CogniteError + { + Skipped = group.Select(pair => pair.Item2).ToList(), + Resource = group.Key, + Type = ErrorType.SanitationFailed, + Status = 400 + })); } - return (result, idError, nameError); + return (result, errors); } /// @@ -313,40 +463,69 @@ public static (IEnumerable, CogniteError idError, CogniteError /// The first encountered duplicate is kept. 
/// /// EventCreate request to clean + /// The type of sanitation to apply /// Cleaned request and optional error if any ids were duplicated - public static (IEnumerable, CogniteError) CleanEventRequest(IEnumerable events) + public static (IEnumerable, IEnumerable) CleanEventRequest(IEnumerable events, SanitationMode mode) { var result = new List(); + var errors = new List(); var ids = new HashSet(); var duplicated = new HashSet(); + var bad = new List<(ResourceType, EventCreate)>(); + foreach (var evt in events) { - evt.Sanitize(); + bool toAdd = true; + if (mode == SanitationMode.Remove) + { + var failedField = evt.Verify(); + if (failedField.HasValue) + { + bad.Add((failedField.Value, evt)); + toAdd = false; + } + } + else if (mode == SanitationMode.Clean) + { + evt.Sanitize(); + } if (evt.ExternalId != null) { if (!ids.Add(evt.ExternalId)) { duplicated.Add(evt.ExternalId); - continue; + toAdd = false; } } - result.Add(evt); + if (toAdd) + { + result.Add(evt); + } } - CogniteError err = null; if (duplicated.Any()) { - err = new CogniteError + errors.Add(new CogniteError { Status = 409, Message = "ExternalIds duplicated", Resource = ResourceType.ExternalId, Type = ErrorType.ItemDuplicated, Values = duplicated.Select(Identity.Create).ToArray() - }; + }); + } + if (bad.Any()) + { + errors.AddRange(bad.GroupBy(pair => pair.Item1).Select(group => new CogniteError + { + Skipped = group.Select(pair => pair.Item2).ToList(), + Resource = group.Key, + Type = ErrorType.SanitationFailed, + Status = 400 + })); } - return (result, err); + return (result, errors); } } } diff --git a/Cognite.Extensions/TimeSeriesExtensions.cs b/Cognite.Extensions/TimeSeriesExtensions.cs index c1bbd085..4a838b26 100644 --- a/Cognite.Extensions/TimeSeriesExtensions.cs +++ b/Cognite.Extensions/TimeSeriesExtensions.cs @@ -38,6 +38,7 @@ internal static void SetLogger(ILogger logger) /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created and found timeseries public static Task> GetOrCreateTimeSeriesAsync( @@ -47,6 +48,7 @@ public static Task> GetOrCreateTimeSeriesAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { Task> asyncBuildTimeSeries(IEnumerable ids) @@ -54,7 +56,7 @@ Task> asyncBuildTimeSeries(IEnumerable ids return Task.FromResult(buildTimeSeries(ids)); } return timeSeries.GetOrCreateTimeSeriesAsync(externalIds, asyncBuildTimeSeries, - chunkSize, throttleSize, retryMode, token); + chunkSize, throttleSize, retryMode, sanitationMode, token); } /// @@ -71,6 +73,7 @@ Task> asyncBuildTimeSeries(IEnumerable ids /// Chunk size /// Throttle size /// How to handle failed requests + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created and found timeseries public static async Task> GetOrCreateTimeSeriesAsync( @@ -80,6 +83,7 @@ public static async Task> GetOrCreateTimeSeriesAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { var chunks = externalIds @@ -94,7 +98,7 @@ public static async Task> GetOrCreateTimeSeriesAsync( .Select, Func>( (chunk, idx) => async () => { var result = await GetOrCreateTimeSeriesChunk(timeSeries, chunk, - buildTimeSeries, 0, retryMode, token); + buildTimeSeries, 0, retryMode, sanitationMode, token); 
results[idx] = result; }); @@ -124,6 +128,7 @@ await generators.RunThrottled( /// Throttle size /// How to do retries. Keeping duplicates is not valid for /// this method. + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created timeseries public static async Task> EnsureTimeSeriesExistsAsync( @@ -132,23 +137,21 @@ public static async Task> EnsureTimeSeriesExistsAsync( int chunkSize, int throttleSize, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { - CogniteError idError, nameError; - (timeSeriesToEnsure, idError, nameError) = Sanitation.CleanTimeSeriesRequest(timeSeriesToEnsure); + IEnumerable errors; + (timeSeriesToEnsure, errors) = Sanitation.CleanTimeSeriesRequest(timeSeriesToEnsure, sanitationMode); var chunks = timeSeriesToEnsure .ChunkBy(chunkSize) .ToList(); - int size = chunks.Count + (idError != null || nameError != null ? 1 : 0); + int size = chunks.Count + (errors.Any() ? 1 : 0); var results = new CogniteResult[size]; - if (idError != null || nameError != null) + if (errors.Any()) { - var errors = new List(); - if (idError != null) errors.Add(idError); - if (nameError != null) errors.Add(nameError); results[size - 1] = new CogniteResult(errors, null); if (size == 1) return results[size - 1]; } @@ -225,6 +228,7 @@ private static async Task> GetOrCreateTimeSeriesChunk( Func, Task>> buildTimeSeries, int backoff, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { IEnumerable found; @@ -244,19 +248,15 @@ private static async Task> GetOrCreateTimeSeriesChunk( _logger.LogDebug("Could not fetch {Missing} out of {Found} time series. Attempting to create the missing ones", missing.Count, externalIds.Count()); var toCreate = await buildTimeSeries(missing); - CogniteError idError, nameError; - (toCreate, idError, nameError) = Sanitation.CleanTimeSeriesRequest(toCreate); + IEnumerable errors; + (toCreate, errors) = Sanitation.CleanTimeSeriesRequest(toCreate, sanitationMode); var result = await CreateTimeSeriesHandleErrors(client, toCreate, retryMode, token); result.Results = result.Results == null ? found : result.Results.Concat(found); - if (idError != null || nameError != null) + if (errors.Any()) { - var errors = new List(); - if (result.Errors != null) errors.AddRange(result.Errors); - if (idError != null) errors.Add(idError); - if (nameError != null) errors.Add(nameError); - result.Errors = errors; + result.Errors = result.Errors == null ? errors : result.Errors.Concat(errors); } if (!result.Errors?.Any() ?? 
false @@ -283,7 +283,7 @@ private static async Task> GetOrCreateTimeSeriesChunk( await Task.Delay(TimeSpan.FromSeconds(0.1 * Math.Pow(2, backoff))); var nextResult = await GetOrCreateTimeSeriesChunk(client, duplicatedIds, - buildTimeSeries, backoff + 1, retryMode, token); + buildTimeSeries, backoff + 1, retryMode, sanitationMode, token); result = result.Merge(nextResult); return result; diff --git a/ExtractorUtils.Test/CogniteTest.cs b/ExtractorUtils.Test/CogniteTest.cs index 3fc3d217..f4458e97 100644 --- a/ExtractorUtils.Test/CogniteTest.cs +++ b/ExtractorUtils.Test/CogniteTest.cs @@ -307,6 +307,7 @@ public async Task TestEnsureTimeSeries(params string[] ids) ids, createFunction, RetryMode.OnErrorKeepDuplicates, + SanitationMode.Remove, CancellationToken.None ); Assert.Equal(ids.Count(), ts.Results.Where(t => ids.Contains(t.ExternalId)).Count()); @@ -319,7 +320,7 @@ public async Task TestEnsureTimeSeries(params string[] ids) using (var source = new CancellationTokenSource(5_000)) { // a timeout would fail the test - await cogniteDestination.EnsureTimeSeriesExistsAsync(newTs, RetryMode.OnFatal, source.Token); + await cogniteDestination.EnsureTimeSeriesExistsAsync(newTs, RetryMode.OnFatal, SanitationMode.Remove, source.Token); } Assert.Equal(ids.Count(), _ensuredTimeSeries .Where(kvp => ids.Contains(kvp.Key)).Count()); @@ -393,6 +394,7 @@ public async Task TestEnsureAssets(params string[] ids) ids, createFunction, RetryMode.OnErrorKeepDuplicates, + SanitationMode.Remove, CancellationToken.None ); Assert.Equal(ids.Count(), ts.Results.Where(t => ids.Contains(t.ExternalId)).Count()); @@ -405,7 +407,7 @@ public async Task TestEnsureAssets(params string[] ids) using (var source = new CancellationTokenSource(5_000)) { // a timeout would fail the test - await cogniteDestination.EnsureAssetsExistsAsync(newAssets, RetryMode.OnFatal, source.Token); + await cogniteDestination.EnsureAssetsExistsAsync(newAssets, RetryMode.OnFatal, SanitationMode.Remove, source.Token); } Assert.Equal(ids.Count(), _ensuredAssets .Where(kvp => ids.Contains(kvp.Key)).Count()); diff --git a/ExtractorUtils.Test/EventTest.cs b/ExtractorUtils.Test/EventTest.cs index 4d6e8a36..ffeeb231 100644 --- a/ExtractorUtils.Test/EventTest.cs +++ b/ExtractorUtils.Test/EventTest.cs @@ -95,6 +95,7 @@ public async Task TestEnsureEvents(params string[] ids) ids, createFunction, RetryMode.OnErrorKeepDuplicates, + SanitationMode.Remove, CancellationToken.None ); Assert.Equal(ids.Count(), ts.Results.Where(t => ids.Contains(t.ExternalId)).Count()); @@ -107,7 +108,7 @@ public async Task TestEnsureEvents(params string[] ids) using (var source = new CancellationTokenSource(5_000)) { // a timeout would fail the test - await cogniteDestination.EnsureEventsExistsAsync(newEvents, RetryMode.OnFatal, source.Token); + await cogniteDestination.EnsureEventsExistsAsync(newEvents, RetryMode.OnFatal, SanitationMode.Remove, source.Token); } Assert.Equal(ids.Count(), _ensuredEvents .Where(kvp => ids.Contains(kvp.Key)).Count()); diff --git a/ExtractorUtils.Test/SanitationTest.cs b/ExtractorUtils.Test/SanitationTest.cs index 0ce2e362..06623105 100644 --- a/ExtractorUtils.Test/SanitationTest.cs +++ b/ExtractorUtils.Test/SanitationTest.cs @@ -45,6 +45,47 @@ public void TestSanitizeAsset() Assert.Equal(new string('æ', 128), asset.Source); } [Fact] + public void TestVerifyAsset() + { + var removeFields = new List + { + ResourceType.ExternalId, ResourceType.Name, ResourceType.ParentId, + ResourceType.ParentExternalId, ResourceType.Description, ResourceType.DataSetId, + 
ResourceType.Metadata, ResourceType.Source, ResourceType.Labels + }; + var asset = new AssetCreate + { + ExternalId = new string('æ', 300), + Description = new string('æ', 1000), + DataSetId = -2502, + Labels = new CogniteExternalId[] { null, new CogniteExternalId(null) }.Concat(Enumerable.Range(0, 100).Select(i => new CogniteExternalId(new string('æ', 300)))), + Metadata = Enumerable.Range(0, 100) + .ToDictionary(i => $"key{i.ToString("000")}{new string('æ', 100)}", i => new string('æ', 200)), + Name = new string('ø', 1000), + ParentExternalId = new string('æ', 300), + ParentId = -1234, + Source = new string('æ', 12345) + }; + foreach (var field in removeFields) + { + var errType = asset.Verify(); + Assert.Equal(field, errType); + switch (field) + { + case ResourceType.ExternalId: asset.ExternalId = null; break; + case ResourceType.Name: asset.Name = null; break; + case ResourceType.ParentId: asset.ParentId = null; break; + case ResourceType.ParentExternalId: asset.ParentExternalId = null; break; + case ResourceType.Description: asset.Description = null; break; + case ResourceType.DataSetId: asset.DataSetId = null; break; + case ResourceType.Metadata: asset.Metadata = null; break; + case ResourceType.Source: asset.Source = null; break; + case ResourceType.Labels: asset.Labels = null; break; + } + } + Assert.Null(asset.Verify()); + } + [Fact] public void TestSanitizeTimeSeries() { var ts = new TimeSeriesCreate @@ -75,6 +116,45 @@ public void TestSanitizeTimeSeries() Assert.Equal(new string('æ', 32), ts.Unit); } [Fact] + public void TestVerifyTimeSeries() + { + var removeFields = new List + { + ResourceType.ExternalId, ResourceType.Name, ResourceType.AssetId, + ResourceType.Description, ResourceType.DataSetId, ResourceType.Metadata, + ResourceType.Unit, ResourceType.LegacyName + }; + var ts = new TimeSeriesCreate + { + ExternalId = new string('æ', 300), + Description = new string('æ', 2000), + DataSetId = -2952, + AssetId = -1239, + LegacyName = new string('æ', 300), + Metadata = Enumerable.Range(0, 100) + .ToDictionary(i => $"key{i.ToString("000")}{new string('æ', 100)}", i => new string('æ', 200)), + Name = new string('æ', 300), + Unit = new string('æ', 200) + }; + foreach (var field in removeFields) + { + var errType = ts.Verify(); + Assert.Equal(field, errType); + switch (field) + { + case ResourceType.ExternalId: ts.ExternalId = null; break; + case ResourceType.Name: ts.Name = null; break; + case ResourceType.AssetId: ts.AssetId = null; break; + case ResourceType.Description: ts.Description = null; break; + case ResourceType.DataSetId: ts.DataSetId = null; break; + case ResourceType.Metadata: ts.Metadata = null; break; + case ResourceType.Unit: ts.Unit = null; break; + case ResourceType.LegacyName: ts.LegacyName = null; break; + } + } + Assert.Null(ts.Verify()); + } + [Fact] public void TestSanitizeEvent() { var evt = new EventCreate @@ -106,6 +186,50 @@ public void TestSanitizeEvent() Assert.Equal(new string('æ', 64), evt.Type); Assert.Equal(new string('æ', 64), evt.Subtype); } + [Fact] + public void TestVerifyEvent() + { + var removeFields = new List + { + ResourceType.ExternalId, ResourceType.Type, ResourceType.SubType, + ResourceType.Source, ResourceType.AssetId, ResourceType.TimeRange, + ResourceType.DataSetId, ResourceType.Metadata + }; + var evt = new EventCreate + { + AssetIds = Enumerable.Range(-100, 100000).Select(i => (long)i), + DataSetId = -125, + Description = new string('æ', 1000), + EndTime = -12345, + StartTime = -12345, + ExternalId = new string('æ', 300), + Metadata 
= Enumerable.Range(0, 200) + .ToDictionary(i => $"key{i.ToString("000")}{new string('æ', 100)}", i => new string('æ', 600)), + Source = new string('æ', 200), + Subtype = new string('æ', 300), + Type = new string('æ', 300) + }; + foreach (var field in removeFields) + { + var errType = evt.Verify(); + Assert.Equal(field, errType); + switch (field) + { + case ResourceType.ExternalId: evt.ExternalId = null; break; + case ResourceType.Type: evt.Type = null; break; + case ResourceType.SubType: evt.Subtype = null; break; + case ResourceType.Source: evt.Source = null; break; + case ResourceType.AssetId: evt.AssetIds = null; break; + case ResourceType.TimeRange: + evt.StartTime = 100; + evt.EndTime = 1000; + break; + case ResourceType.DataSetId: evt.DataSetId = null; break; + case ResourceType.Metadata: evt.Metadata = null; break; + } + } + Assert.Null(evt.Verify()); + } [Theory] [InlineData("æææææ", 4)] [InlineData("123412341234", 9)] @@ -125,7 +249,8 @@ public void TestSanitizeEventRequest() new EventCreate { ExternalId = "test2" }, new EventCreate { ExternalId = "test3" } }; - var (result, err) = Sanitation.CleanEventRequest(events); + var (result, errors) = Sanitation.CleanEventRequest(events, SanitationMode.Clean); + var err = errors.First(); Assert.Equal(3, result.Count()); Assert.Equal(2, err.Values.Count()); Assert.Equal(ErrorType.ItemDuplicated, err.Type); @@ -143,7 +268,8 @@ public void TestSanitizeAssetRequest() new AssetCreate { ExternalId = "test2" }, new AssetCreate { ExternalId = "test3" } }; - var (result, err) = Sanitation.CleanAssetRequest(assets); + var (result, errors) = Sanitation.CleanAssetRequest(assets, SanitationMode.Clean); + var err = errors.First(); Assert.Equal(3, result.Count()); Assert.Equal(2, err.Values.Count()); Assert.Equal(ErrorType.ItemDuplicated, err.Type); @@ -166,7 +292,9 @@ public void TestSanitizeTimeSeriesRequest() new TimeSeriesCreate { LegacyName = "test2", ExternalId = "test7" }, new TimeSeriesCreate { LegacyName = "test3", ExternalId = "test8" } }; - var (result, err, err2) = Sanitation.CleanTimeSeriesRequest(timeseries); + var (result, errors) = Sanitation.CleanTimeSeriesRequest(timeseries, SanitationMode.Clean); + var err = errors.First(); + var err2 = errors.ElementAt(1); Assert.Equal(6, result.Count()); Assert.Equal(2, err.Values.Count()); Assert.Equal(ErrorType.ItemDuplicated, err.Type); diff --git a/ExtractorUtils/Cognite/CogniteDestination.cs b/ExtractorUtils/Cognite/CogniteDestination.cs index 09d4afdc..28bca27a 100644 --- a/ExtractorUtils/Cognite/CogniteDestination.cs +++ b/ExtractorUtils/Cognite/CogniteDestination.cs @@ -67,12 +67,14 @@ public async Task TestCogniteConfig(CancellationToken token) /// External Ids /// Function that builds CogniteSdk TimeSeriesCreate objects /// How to handle failed requests + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created and found timeseries public async Task> GetOrCreateTimeSeriesAsync( IEnumerable externalIds, Func, IEnumerable> buildTimeSeries, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} time series in CDF", externalIds.Count()); @@ -82,6 +84,7 @@ public async Task> GetOrCreateTimeSeriesAsync( _config.CdfChunking.TimeSeries, _config.CdfThrottling.TimeSeries, retryMode, + sanitationMode, token); } /// @@ -97,12 +100,14 @@ public async Task> GetOrCreateTimeSeriesAsync( /// External Ids /// Async function that builds 
CogniteSdk TimeSeriesCreate objects /// How to handle failed requests + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created and found timeseries public async Task> GetOrCreateTimeSeriesAsync( IEnumerable externalIds, Func, Task>> buildTimeSeries, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} time series in CDF", externalIds.Count()); @@ -112,6 +117,7 @@ public async Task> GetOrCreateTimeSeriesAsync( _config.CdfChunking.TimeSeries, _config.CdfThrottling.TimeSeries, retryMode, + sanitationMode, token); } @@ -127,11 +133,13 @@ public async Task> GetOrCreateTimeSeriesAsync( /// List of CogniteSdk TimeSeriesCreate objects /// How to do retries. Keeping duplicates is not valid for /// this method. + /// The type of sanitation to apply to timeseries before creating /// Cancellation token /// A containing errors that occured and a list of the created timeseries public async Task EnsureTimeSeriesExistsAsync( IEnumerable timeSeries, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Ensuring that {Number} time series exist in CDF", timeSeries.Count()); @@ -140,6 +148,7 @@ public async Task EnsureTimeSeriesExistsAsync( _config.CdfChunking.TimeSeries, _config.CdfThrottling.TimeSeries, retryMode, + sanitationMode, token); } #endregion @@ -157,12 +166,14 @@ public async Task EnsureTimeSeriesExistsAsync( /// External Ids /// Function that builds CogniteSdk AssetCreate objects /// How to handle failed requests + /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created and found assets public async Task> GetOrCreateAssetsAsync( IEnumerable externalIds, Func, IEnumerable> buildAssets, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} assets in CDF", externalIds.Count()); @@ -172,6 +183,7 @@ public async Task> GetOrCreateAssetsAsync( _config.CdfChunking.Assets, _config.CdfThrottling.Assets, retryMode, + sanitationMode, token); } /// @@ -186,12 +198,14 @@ public async Task> GetOrCreateAssetsAsync( /// External Ids /// Async function that builds CogniteSdk AssetCreate objects /// How to handle failed requests + /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created and found assets public async Task> GetOrCreateAssetsAsync( IEnumerable externalIds, Func, Task>> buildAssets, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} assets in CDF", externalIds.Count()); @@ -201,6 +215,7 @@ public async Task> GetOrCreateAssetsAsync( _config.CdfChunking.Assets, _config.CdfThrottling.Assets, retryMode, + sanitationMode, token); } @@ -215,11 +230,13 @@ public async Task> GetOrCreateAssetsAsync( /// List of CogniteSdk AssetCreate objects /// How to do retries. Keeping duplicates is not valid for /// this method. 
+ /// The type of sanitation to apply to assets before creating /// Cancellation token /// A containing errors that occured and a list of the created assets public async Task EnsureAssetsExistsAsync( IEnumerable assets, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Ensuring that {Number} assets exist in CDF", assets.Count()); @@ -228,6 +245,7 @@ public async Task EnsureAssetsExistsAsync( _config.CdfChunking.Assets, _config.CdfThrottling.Assets, retryMode, + sanitationMode, token); } #endregion @@ -544,12 +562,14 @@ public Task> GetExtractedRanges( /// External Ids /// Function that builds CogniteSdk EventCreate objects /// How to handle failed requests + /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created and found events public async Task> GetOrCreateEventsAsync( IEnumerable externalIds, Func, IEnumerable> buildEvents, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} events in CDF", externalIds.Count()); @@ -559,6 +579,7 @@ public async Task> GetOrCreateEventsAsync( _config.CdfChunking.Events, _config.CdfThrottling.Events, retryMode, + sanitationMode, token); } /// @@ -573,12 +594,14 @@ public async Task> GetOrCreateEventsAsync( /// External Ids /// Async function that builds CogniteSdk EventCreate objects /// How to handle failed requests + /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created and found events public async Task> GetOrCreateEventsAsync( IEnumerable externalIds, Func, Task>> buildEvents, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Getting or creating {Number} events in CDF", externalIds.Count()); @@ -588,6 +611,7 @@ public async Task> GetOrCreateEventsAsync( _config.CdfChunking.Events, _config.CdfThrottling.Events, retryMode, + sanitationMode, token); } @@ -602,11 +626,13 @@ public async Task> GetOrCreateEventsAsync( /// List of CogniteSdk EventCreate objects /// How to do retries. Keeping duplicates is not valid for /// this method. 
+ /// The type of sanitation to apply to events before creating /// Cancellation token /// A containing errors that occured and a list of the created events public async Task EnsureEventsExistsAsync( IEnumerable events, RetryMode retryMode, + SanitationMode sanitationMode, CancellationToken token) { _logger.LogInformation("Ensuring that {Number} events exist in CDF", events.Count()); @@ -615,6 +641,7 @@ public async Task EnsureEventsExistsAsync( _config.CdfChunking.Events, _config.CdfThrottling.Events, retryMode, + sanitationMode, token); } #endregion diff --git a/ExtractorUtils/queues/EventUploadQueue.cs b/ExtractorUtils/queues/EventUploadQueue.cs index 0b44c877..01e5e301 100644 --- a/ExtractorUtils/queues/EventUploadQueue.cs +++ b/ExtractorUtils/queues/EventUploadQueue.cs @@ -96,7 +96,7 @@ private async Task ReadFromBuffer(CancellationToken token) events = await CogniteUtils.ReadEventsAsync(stream, token, 10_000); if (events.Any()) { - var result = await _destination.EnsureEventsExistsAsync(events, RetryMode.OnError, token); + var result = await _destination.EnsureEventsExistsAsync(events, RetryMode.OnError, SanitationMode.Clean, token); var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure); if (fatalError != null) @@ -155,7 +155,7 @@ protected override async Task> UploadEntries(IEnu _logger.LogTrace("Dequeued {Number} events to upload to CDF", items.Count()); - var result = await _destination.EnsureEventsExistsAsync(items, RetryMode.OnError, token); + var result = await _destination.EnsureEventsExistsAsync(items, RetryMode.OnError, SanitationMode.Clean, token); var fatalError = result.Errors?.FirstOrDefault(err => err.Type == ErrorType.FatalFailure); if (fatalError != null)
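Usage sketch (illustrative, not part of the diff): the snippet below shows how a caller might pass the new SanitationMode argument to the CogniteDestination overloads added in this change and inspect the resulting SanitationFailed errors. The destination and events names, the PushEventsAsync wrapper, and the using directives are assumptions for the example only.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extensions; // assumed namespace for SanitationMode, RetryMode, ErrorType, CogniteError
using CogniteSdk;         // EventCreate
// plus a using for the ExtractorUtils namespace providing CogniteDestination (name assumed)

public static class SanitationModeUsage
{
    public static async Task PushEventsAsync(
        CogniteDestination destination,
        IEnumerable<EventCreate> events,
        CancellationToken token)
    {
        // SanitationMode.Clean truncates offending fields in place (the previous behaviour),
        // SanitationMode.Remove drops offending objects and reports them as errors,
        // SanitationMode.None skips pre-push sanitation entirely.
        var result = await destination.EnsureEventsExistsAsync(
            events, RetryMode.OnError, SanitationMode.Remove, token);

        foreach (var error in result.Errors ?? Enumerable.Empty<CogniteError>())
        {
            if (error.Type == ErrorType.SanitationFailed)
            {
                // error.Resource names the field that violated CDF limits; the
                // dropped objects are reported on error.Skipped.
                Console.WriteLine($"Events skipped due to invalid {error.Resource}");
            }
            else if (error.Type == ErrorType.FatalFailure)
            {
                Console.WriteLine($"Fatal error: {error.Message}");
            }
        }
    }
}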
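A second sketch, also illustrative: pre-validating a request directly with the new Verify extension and the reworked CleanAssetRequest, and distinguishing ItemDuplicated from SanitationFailed errors. The asset values are invented for the example, and the using directives are assumed as above.

using System;
using System.Linq;
using Cognite.Extensions;
using CogniteSdk;

public static class VerifyExample
{
    public static void Demo()
    {
        var assets = new[]
        {
            new AssetCreate { ExternalId = "pump-01", Name = "Pump 01" },
            new AssetCreate { ExternalId = "pump-01", Name = "Duplicate id" },
            new AssetCreate { ExternalId = new string('x', 300), Name = "Too long id" }
        };

        // Verify reports the first offending field, or null if the object is within limits.
        // Here the 300-character external id exceeds ExternalIdMax.
        ResourceType? failed = assets[2].Verify();
        Console.WriteLine(failed); // ExternalId

        // With SanitationMode.Remove, offending objects are dropped and reported,
        // while duplicates beyond the first are removed as before.
        var (cleaned, errors) = Sanitation.CleanAssetRequest(assets, SanitationMode.Remove);

        foreach (var error in errors)
        {
            // ErrorType.ItemDuplicated (409) for duplicated external ids,
            // ErrorType.SanitationFailed (400) for objects exceeding CDF field limits.
            Console.WriteLine($"{error.Type}: {error.Resource}");
        }
    }
}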