Skip to content

Commit

Permalink
No sanitation (#67)
Browse files Browse the repository at this point in the history
Configurable sanitation. Either skip it entirely, use the old behavior of modifying existing objects, or remove them entirely and report the first field that caused it to be removed in a CogniteError.
  • Loading branch information
einarmo authored Aug 31, 2020
1 parent a0a83e4 commit 9a6cfa0
Show file tree
Hide file tree
Showing 10 changed files with 505 additions and 95 deletions.
31 changes: 19 additions & 12 deletions Cognite.Extensions/AssetExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal static void SetLogger(ILogger logger)
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created and found assets</returns>
public static Task<CogniteResult<Asset>> GetOrCreateAsync(
Expand All @@ -46,13 +47,14 @@ public static Task<CogniteResult<Asset>> GetOrCreateAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
Task<IEnumerable<AssetCreate>> asyncBuildAssets(IEnumerable<string> ids)
{
return Task.FromResult(buildAssets(ids));
}
return assets.GetOrCreateAsync(externalIds, asyncBuildAssets, chunkSize, throttleSize, retryMode, token);
return assets.GetOrCreateAsync(externalIds, asyncBuildAssets, chunkSize, throttleSize, retryMode, sanitationMode, token);
}
/// <summary>
/// Get or create the assets with the provided <paramref name="externalIds"/> exist in CDF.
Expand All @@ -68,6 +70,7 @@ Task<IEnumerable<AssetCreate>> asyncBuildAssets(IEnumerable<string> ids)
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created and found assets</returns>
public static async Task<CogniteResult<Asset>> GetOrCreateAsync(
Expand All @@ -77,6 +80,7 @@ public static async Task<CogniteResult<Asset>> GetOrCreateAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
var chunks = externalIds
Expand All @@ -90,7 +94,7 @@ public static async Task<CogniteResult<Asset>> GetOrCreateAsync(
var generators = chunks
.Select<IEnumerable<string>, Func<Task>>(
(chunk, idx) => async () => {
var result = await GetOrCreateAssetsChunk(assets, chunk, buildAssets, 0, retryMode, token);
var result = await GetOrCreateAssetsChunk(assets, chunk, buildAssets, 0, retryMode, sanitationMode, token);
results[idx] = result;
});

Expand Down Expand Up @@ -120,6 +124,7 @@ await generators.RunThrottled(
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to do retries. Keeping duplicates is not valid for
/// this method.</param>
/// <param name="sanitationMode">The type of sanitation to apply to assets before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created assets</returns>
public static async Task<CogniteResult<Asset>> EnsureExistsAsync(
Expand All @@ -128,20 +133,21 @@ public static async Task<CogniteResult<Asset>> EnsureExistsAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
CogniteError prePushError;
(assetsToEnsure, prePushError) = Sanitation.CleanAssetRequest(assetsToEnsure);
IEnumerable<CogniteError> errors;
(assetsToEnsure, errors) = Sanitation.CleanAssetRequest(assetsToEnsure, sanitationMode);

var chunks = assetsToEnsure
.ChunkBy(chunkSize)
.ToList();
_logger.LogDebug("Ensuring assets. Number of assets: {Number}. Number of chunks: {Chunks}", assetsToEnsure.Count(), chunks.Count());
int size = chunks.Count + (prePushError != null ? 1 : 0);
int size = chunks.Count + (errors.Any() ? 1 : 0);
var results = new CogniteResult<Asset>[size];
if (prePushError != null)
if (errors.Any())
{
results[size - 1] = new CogniteResult<Asset>(new[] { prePushError }, null);
results[size - 1] = new CogniteResult<Asset>(errors, null);
if (size == 1) return results[size - 1];
}
if (size == 0) return new CogniteResult<Asset>(null, null);
Expand Down Expand Up @@ -172,6 +178,7 @@ private static async Task<CogniteResult<Asset>> GetOrCreateAssetsChunk(
Func<IEnumerable<string>, Task<IEnumerable<AssetCreate>>> buildAssets,
int backoff,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
IEnumerable<Asset> found;
Expand All @@ -191,15 +198,15 @@ private static async Task<CogniteResult<Asset>> GetOrCreateAssetsChunk(
_logger.LogDebug("Could not fetch {Missing} out of {Found} assets. Attempting to create the missing ones", missing.Count, externalIds.Count());
var toCreate = await buildAssets(missing);

CogniteError prePushError;
(toCreate, prePushError) = Sanitation.CleanAssetRequest(toCreate);
IEnumerable<CogniteError> errors;
(toCreate, errors) = Sanitation.CleanAssetRequest(toCreate, sanitationMode);

var result = await CreateAssetsHandleErrors(assets, toCreate, retryMode, token);
result.Results = result.Results == null ? found : result.Results.Concat(found);

if (prePushError != null)
if (errors.Any())
{
result.Errors = result.Errors == null ? new[] { prePushError } : result.Errors.Append(prePushError);
result.Errors = result.Errors == null ? errors : result.Errors.Concat(errors);
}

if (!result.Errors?.Any() ?? false
Expand All @@ -225,7 +232,7 @@ private static async Task<CogniteResult<Asset>> GetOrCreateAssetsChunk(
_logger.LogDebug("Found {cnt} duplicated assets, retrying", duplicatedIds.Count);

await Task.Delay(TimeSpan.FromSeconds(0.1 * Math.Pow(2, backoff)));
var nextResult = await GetOrCreateAssetsChunk(assets, duplicatedIds, buildAssets, backoff + 1, retryMode, token);
var nextResult = await GetOrCreateAssetsChunk(assets, duplicatedIds, buildAssets, backoff + 1, retryMode, sanitationMode, token);
result = result.Merge(nextResult);

return result;
Expand Down
59 changes: 59 additions & 0 deletions Cognite.Extensions/CogniteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ public enum ErrorType
/// </summary>
ItemDuplicated,
/// <summary>
/// Item does not satisfy CDF field limits
/// </summary>
SanitationFailed,
/// <summary>
/// Something else happened that caused the request to fail
/// </summary>
FatalFailure = -1
Expand Down Expand Up @@ -621,6 +625,42 @@ public enum ResourceType
/// </summary>
LegacyName,
/// <summary>
/// Name on an asset or timeseries
/// </summary>
Name,
/// <summary>
/// Type of event
/// </summary>
Type,
/// <summary>
/// SubType of event
/// </summary>
SubType,
/// <summary>
/// Source on event or asset
/// </summary>
Source,
/// <summary>
/// Metadata on event, asset or timeseries
/// </summary>
Metadata,
/// <summary>
/// Labels on an asset
/// </summary>
Labels,
/// <summary>
/// Description on event, asset or timeseries
/// </summary>
Description,
/// <summary>
/// Start and end time on an event
/// </summary>
TimeRange,
/// <summary>
/// Unit on a timeseries
/// </summary>
Unit,
/// <summary>
/// None or unknown
/// </summary>
None = -1
Expand Down Expand Up @@ -678,4 +718,23 @@ public enum RetryMode
/// </summary>
OnFatalKeepDuplicates = 7
}
/// <summary>
/// How to do sanitation of objects before creating the request
/// </summary>
public enum SanitationMode
{
/// <summary>
/// Don't do any sanitation. If you use this, you should make sure that objects are sanitized
/// some other way.
/// </summary>
None,
/// <summary>
/// Clean objects before requesting. This modifies the passed request.
/// </summary>
Clean,
/// <summary>
/// Remove any offending objects and report them in the result.
/// </summary>
Remove
}
}
31 changes: 19 additions & 12 deletions Cognite.Extensions/EventExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ internal static void SetLogger(ILogger logger)
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to events before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created and found events</returns>
public static Task<CogniteResult<Event>> GetOrCreateAsync(
Expand All @@ -47,13 +48,14 @@ public static Task<CogniteResult<Event>> GetOrCreateAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
Task<IEnumerable<EventCreate>> asyncBuildEvents(IEnumerable<string> ids)
{
return Task.FromResult(buildEvents(ids));
}
return resource.GetOrCreateAsync(externalIds, asyncBuildEvents, chunkSize, throttleSize, retryMode, token);
return resource.GetOrCreateAsync(externalIds, asyncBuildEvents, chunkSize, throttleSize, retryMode, sanitationMode, token);
}
/// <summary>
/// Get or create the events with the provided <paramref name="externalIds"/> exist in CDF.
Expand All @@ -69,6 +71,7 @@ Task<IEnumerable<EventCreate>> asyncBuildEvents(IEnumerable<string> ids)
/// <param name="chunkSize">Chunk size</param>
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to handle failed requests</param>
/// <param name="sanitationMode">The type of sanitation to apply to events before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created and found events</returns>
public static async Task<CogniteResult<Event>> GetOrCreateAsync(
Expand All @@ -78,6 +81,7 @@ public static async Task<CogniteResult<Event>> GetOrCreateAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
var chunks = externalIds
Expand All @@ -91,7 +95,7 @@ public static async Task<CogniteResult<Event>> GetOrCreateAsync(
var generators = chunks
.Select<IEnumerable<string>, Func<Task>>(
(chunk, idx) => async () => {
var result = await GetOrCreateEventsChunk(resource, chunk, buildEvents, 0, retryMode, token);
var result = await GetOrCreateEventsChunk(resource, chunk, buildEvents, 0, retryMode, sanitationMode, token);
results[idx] = result;
});

Expand Down Expand Up @@ -122,6 +126,7 @@ await generators.RunThrottled(
/// <param name="throttleSize">Throttle size</param>
/// <param name="retryMode">How to do retries. Keeping duplicates is not valid for
/// this method.</param>
/// <param name="sanitationMode">The type of sanitation to apply to events before creating</param>
/// <param name="token">Cancellation token</param>
/// <returns>A <see cref="CogniteResult"/> containing errors that occured and a list of the created events</returns>
public static async Task<CogniteResult<Event>> EnsureExistsAsync(
Expand All @@ -130,22 +135,23 @@ public static async Task<CogniteResult<Event>> EnsureExistsAsync(
int chunkSize,
int throttleSize,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
CogniteError prePushError;
(events, prePushError) = Sanitation.CleanEventRequest(events);
IEnumerable<CogniteError> errors;
(events, errors) = Sanitation.CleanEventRequest(events, sanitationMode);

var chunks = events
.ChunkBy(chunkSize)
.ToList();

_logger.LogDebug("Ensuring events. Number of events: {Number}. Number of chunks: {Chunks}", events.Count(), chunks.Count());

int size = chunks.Count + (prePushError != null ? 1 : 0);
int size = chunks.Count + (errors.Any() ? 1 : 0);
var results = new CogniteResult<Event>[size];
if (prePushError != null)
if (errors.Any())
{
results[size - 1] = new CogniteResult<Event>(new[] { prePushError }, null);
results[size - 1] = new CogniteResult<Event>(errors, null);
if (size == 1) return results[size - 1];
}
if (!results.Any()) return new CogniteResult<Event>(null, null);
Expand Down Expand Up @@ -176,6 +182,7 @@ private static async Task<CogniteResult<Event>> GetOrCreateEventsChunk(
Func<IEnumerable<string>, Task<IEnumerable<EventCreate>>> buildEvents,
int backoff,
RetryMode retryMode,
SanitationMode sanitationMode,
CancellationToken token)
{
IEnumerable<Event> found;
Expand All @@ -195,15 +202,15 @@ private static async Task<CogniteResult<Event>> GetOrCreateEventsChunk(
_logger.LogDebug("Could not fetch {Missing} out of {Found} events. Attempting to create the missing ones", missing.Count, externalIds.Count());
var toCreate = await buildEvents(missing);

CogniteError prePushError;
(toCreate, prePushError) = Sanitation.CleanEventRequest(toCreate);
IEnumerable<CogniteError> errors;
(toCreate, errors) = Sanitation.CleanEventRequest(toCreate, sanitationMode);

var result = await CreateEventsHandleErrors(resource, toCreate, retryMode, token);
result.Results = result.Results == null ? found : result.Results.Concat(found);

if (prePushError != null)
if (errors.Any())
{
result.Errors = result.Errors == null ? new[] { prePushError } : result.Errors.Append(prePushError);
result.Errors = result.Errors == null ? errors : result.Errors.Concat(errors);
}

if (!result.Errors?.Any() ?? false
Expand All @@ -229,7 +236,7 @@ private static async Task<CogniteResult<Event>> GetOrCreateEventsChunk(
_logger.LogDebug("Found {cnt} duplicated events, retrying", duplicatedIds.Count);

await Task.Delay(TimeSpan.FromSeconds(0.1 * Math.Pow(2, backoff)));
var nextResult = await GetOrCreateEventsChunk(resource, duplicatedIds, buildEvents, backoff + 1, retryMode, token);
var nextResult = await GetOrCreateEventsChunk(resource, duplicatedIds, buildEvents, backoff + 1, retryMode, sanitationMode, token);
result = result.Merge(nextResult);

return result;
Expand Down
Loading

0 comments on commit 9a6cfa0

Please sign in to comment.