From 90e3673f1d358c7e0844f0c7f1cadf2ce660a75a Mon Sep 17 00:00:00 2001 From: Einar Date: Wed, 8 May 2024 07:22:17 +0200 Subject: [PATCH] Promote status codes to GA (#413) * Promote status codes to GA * Update verify * Fix tests --- Cognite.Extensions/Cognite.Extensions.csproj | 2 +- Cognite.Extensions/CogniteUtils.cs | 2 - .../TimeSeries/Beta/DataPointExtensions.cs | 694 ------------------ .../TimeSeries/DataPointExtensions.cs | 14 +- .../TimeSeries/DataPointSanitation.cs | 5 +- .../{Beta => }/StatusCodeHelpers.cs | 7 +- .../integration/RawHighAvailabilityTest.cs | 4 +- .../integration/RedisHighAvailabilityTest.cs | 4 +- ExtractorUtils.Test/unit/BinaryBufferTest.cs | 5 +- ExtractorUtils.Test/unit/StatusCodeTests.cs | 2 +- ExtractorUtils/Cognite/CogniteDestination.cs | 78 -- version | 2 +- 12 files changed, 25 insertions(+), 794 deletions(-) delete mode 100644 Cognite.Extensions/TimeSeries/Beta/DataPointExtensions.cs rename Cognite.Extensions/TimeSeries/{Beta => }/StatusCodeHelpers.cs (99%) diff --git a/Cognite.Extensions/Cognite.Extensions.csproj b/Cognite.Extensions/Cognite.Extensions.csproj index a78e8591..3da77f8d 100644 --- a/Cognite.Extensions/Cognite.Extensions.csproj +++ b/Cognite.Extensions/Cognite.Extensions.csproj @@ -28,7 +28,7 @@ - + diff --git a/Cognite.Extensions/CogniteUtils.cs b/Cognite.Extensions/CogniteUtils.cs index 93e5cb91..47991ca3 100644 --- a/Cognite.Extensions/CogniteUtils.cs +++ b/Cognite.Extensions/CogniteUtils.cs @@ -14,8 +14,6 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; -using Com.Cognite.V1.Timeseries.Proto.Beta; -using Cognite.Extensions.Beta; namespace Cognite.Extensions { diff --git a/Cognite.Extensions/TimeSeries/Beta/DataPointExtensions.cs b/Cognite.Extensions/TimeSeries/Beta/DataPointExtensions.cs deleted file mode 100644 index 9038d427..00000000 --- a/Cognite.Extensions/TimeSeries/Beta/DataPointExtensions.cs +++ /dev/null @@ -1,694 +0,0 @@ -using Cognite.Extractor.Common; -using CogniteSdk; -using CogniteSdk.Resources; -using Com.Cognite.V1.Timeseries.Proto.Beta; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Prometheus; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Data.Common; -using System.IO.Compression; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using TimeRange = Cognite.Extractor.Common.TimeRange; - -namespace Cognite.Extensions.Beta -{ - /// - /// Extensions to datapoints - /// - public static class DataPointExtensions - { - private const int _maxNumOfVerifyRequests = 10; - private static ILogger _logger = new NullLogger(); - - internal static void SetLogger(ILogger logger) - { - _logger = logger; - } - - /// - /// Create a protobuf insertion request from dictionary - /// - /// Datapoints to insert - /// Converted request - public static DataPointInsertionRequest ToInsertRequest(this IDictionary> dps) - { - if (dps == null) throw new ArgumentNullException(nameof(dps)); - var request = new DataPointInsertionRequest(); - var dataPointCount = 0; - foreach (var kvp in dps) - { - var item = new DataPointInsertionItem(); - if (kvp.Key.Id.HasValue) - { - item.Id = kvp.Key.Id.Value; - } - else - { - item.ExternalId = kvp.Key.ExternalId.ToString(); - } - if (!kvp.Value.Any()) - { - continue; - } - var stringPoints = kvp.Value - .Where(dp => dp.IsString) - .Select(dp => new StringDatapoint - { - Timestamp = 
dp.Timestamp, - Value = dp.StringValue, - NullValue = dp.StringValue is null, - Status = new Status - { - Code = (long)dp.Status.Code - } - }); - var numericPoints = kvp.Value - .Where(dp => !dp.IsString) - .Select(dp => new NumericDatapoint - { - Timestamp = dp.Timestamp, - Value = dp.NumericValue ?? 0, - NullValue = dp.NumericValue is null, - Status = new Status - { - Code = (long)dp.Status.Code - } - }); - if (stringPoints.Any()) - { - var stringData = new StringDatapoints(); - stringData.Datapoints.AddRange(stringPoints); - if (stringData.Datapoints.Count > 0) - { - item.StringDatapoints = stringData; - request.Items.Add(item); - dataPointCount += stringData.Datapoints.Count; - } - } - else - { - var doubleData = new NumericDatapoints(); - doubleData.Datapoints.AddRange(numericPoints); - if (doubleData.Datapoints.Count > 0) - { - item.NumericDatapoints = doubleData; - request.Items.Add(item); - dataPointCount += doubleData.Datapoints.Count; - } - } - } - return request; - } - - /// - /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, - /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, - /// they are created before the points are inserted again. - /// - /// Cognite client - /// Datapoints to insert - /// Maximum number of timeseries per chunk - /// Maximum number of datapoints per timeseries - /// Maximum number of parallel request - /// Maximum number of timeseries to retrieve per request - /// Maximum number of parallel requests to retrieve timeseries - /// Number of datapoints total before using gzip compression. - /// How to sanitize datapoints - /// How to handle retries - /// Optional replacement for NaN double values - /// Optional data set id - /// Cancellation token - /// Results with a list of errors. If TimeSeriesResult is null, no timeseries were attempted created. - public static async Task<(CogniteResult DataPointResult, CogniteResult? TimeSeriesResult)> InsertAsyncCreateMissing( - Client client, - IDictionary> points, - int keyChunkSize, - int valueChunkSize, - int throttleSize, - int timeseriesChunkSize, - int timeseriesThrottleSize, - int gzipCountLimit, - SanitationMode sanitationMode, - RetryMode retryMode, - double? nanReplacement, - long? dataSetId, - CancellationToken token) - { - if (client == null) throw new ArgumentNullException(nameof(client)); - if (points == null) throw new ArgumentNullException(nameof(points)); - - var result = await InsertAsync(client, points, keyChunkSize, valueChunkSize, throttleSize, - timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, sanitationMode, - RetryMode.OnError, nanReplacement, token).ConfigureAwait(false); - - if (result.Errors?.Any(err => err.Type == ErrorType.FatalFailure) ?? false) return (result, null); - - var missingIds = new HashSet((result.Errors ?? Enumerable.Empty()) - .Where(err => err.Type == ErrorType.ItemMissing) - .SelectMany(err => err.Values ?? 
Enumerable.Empty()) - .Where(idt => idt.ExternalId != null)); - - if (!missingIds.Any()) return (result, null); - - _logger.LogInformation("Creating {Count} missing timeseries", missingIds.Count); - - var toCreate = new List(); - foreach (var id in missingIds) - { - var dp = points[id].FirstOrDefault(); - if (dp == null) continue; - - bool isString = dp.NumericValue == null; - - toCreate.Add(new TimeSeriesCreate - { - ExternalId = id.ExternalId, - IsString = isString, - DataSetId = dataSetId - }); - } - - var tsResult = await client.TimeSeries.EnsureTimeSeriesExistsAsync( - toCreate, - timeseriesChunkSize, - timeseriesThrottleSize, - retryMode, - sanitationMode, - token).ConfigureAwait(false); - - if (tsResult.Errors?.Any(err => err.Type != ErrorType.ItemExists) ?? false) return (result, tsResult); - - var pointsToInsert = points.Where(kvp => missingIds.Contains(kvp.Key)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value); - - var result2 = await InsertAsync(client, pointsToInsert, keyChunkSize, valueChunkSize, throttleSize, - timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, sanitationMode, - RetryMode.OnError, nanReplacement, token).ConfigureAwait(false); - - return (result.Merge(result2), tsResult); - } - - - /// - /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, - /// and can optionally handle errors. - /// - /// Cognite client - /// Datapoints to insert - /// Maximum number of timeseries per chunk - /// Maximum number of datapoints per timeseries - /// Maximum number of parallel request - /// Maximum number of timeseries to retrieve per request - /// Maximum number of parallel requests to retrieve timeseries - /// Number of datapoints total before using gzip compression. - /// How to sanitize datapoints - /// How to handle retries - /// Optional replacement for NaN double values - /// Cancellation token - /// Result with a list of errors - public static async Task> InsertAsync( - Client client, - IDictionary> points, - int keyChunkSize, - int valueChunkSize, - int throttleSize, - int timeseriesChunkSize, - int timeseriesThrottleSize, - int gzipCountLimit, - SanitationMode sanitationMode, - RetryMode retryMode, - double? nanReplacement, - CancellationToken token) - { - IEnumerable> errors; - (points, errors) = Sanitation.CleanDataPointsRequest(points, sanitationMode, nanReplacement); - - var chunks = points - .Select(p => (p.Key, p.Value)) - .ChunkBy(valueChunkSize, keyChunkSize) - .Select(chunk => chunk.ToDictionary(pair => pair.Key, pair => pair.Values)) - .ToList(); - - int size = chunks.Count + (errors.Any() ? 1 : 0); - var results = new CogniteResult[size]; - - if (errors.Any()) - { - results[size - 1] = new CogniteResult(errors); - if (size == 1) return results[size - 1]; - } - if (size == 0) return new CogniteResult(null); - - _logger.LogDebug("Inserting timeseries datapoints. Number of timeseries: {Number}. 
Number of chunks: {Chunks}", points.Count, chunks.Count); - var generators = chunks - .Select>, Func>( - (chunk, idx) => async () => - { - var result = await - InsertDataPointsHandleErrors(client, chunk, timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, retryMode, token) - .ConfigureAwait(false); - results[idx] = result; - }); - - int taskNum = 0; - await generators.RunThrottled( - throttleSize, - (_) => - { - if (chunks.Count > 1) - _logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks", - nameof(InsertAsync), ++taskNum, chunks.Count); - }, - token).ConfigureAwait(false); - - return CogniteResult.Merge(results); - } - - private static async Task> InsertDataPointsHandleErrors( - Client client, - IDictionary> points, - int timeseriesChunkSize, - int timeseriesThrottleSize, - int gzipCountLimit, - RetryMode retryMode, - CancellationToken token) - { - var errors = new List>(); - while (points != null && points.Any() && !token.IsCancellationRequested) - { - var request = points.ToInsertRequest(); - try - { - bool useGzip = false; - int count = points.Sum(kvp => kvp.Value.Count()); - if (gzipCountLimit >= 0 && count >= gzipCountLimit) - { - useGzip = true; - } - - if (useGzip) - { - using (CdfMetrics.Datapoints.WithLabels("create")) - { - await client.Beta.DataPoints.CreateAsync(request, CompressionLevel.Fastest, token).ConfigureAwait(false); - } - } - else - { - using (CdfMetrics.Datapoints.WithLabels("create")) - { - await client.Beta.DataPoints.CreateAsync(request, token).ConfigureAwait(false); - } - } - - CdfMetrics.NumberDatapoints.Inc(count); - - _logger.LogDebug("Created {cnt} datapoints for {ts} timeseries in CDF", count, points.Count); - return new CogniteResult(errors); - } - catch (Exception ex) - { - _logger.LogDebug("Failed to create datapoints for {seq} timeseries: {msg}", points.Count, ex.Message); - var error = ResultHandlers.ParseException(ex, RequestType.CreateDatapoints); - - if (error.Type == ErrorType.FatalFailure - && (retryMode == RetryMode.OnFatal - || retryMode == RetryMode.OnFatalKeepDuplicates)) - { - await Task.Delay(1000, token).ConfigureAwait(false); - } - else if (retryMode == RetryMode.None) - { - errors.Add(error); - break; - } - else - { - if (!error.Complete) - { - (error, points) = await ResultHandlers - .VerifyDatapointsFromCDF(client.TimeSeries, error, - points, timeseriesChunkSize, timeseriesThrottleSize, token) - .ConfigureAwait(false); - errors.Add(error); - } - else - { - errors.Add(error); - } - points = ResultHandlers.CleanFromError(error, points); - } - } - } - - return new CogniteResult(errors); - } - - /// - /// Deletes ranges of data points in CDF. The parameter contains the first (inclusive) - /// and last (inclusive) timestamps for the range. After the delete request is sent to CDF, attempt to confirm that - /// the data points were deleted by querying the time range. 
Deletes in CDF are eventually consistent, failing to - /// confirm the deletion does not mean that the operation failed in CDF - /// - /// Cognite datapoints resource - /// Ranges to delete - /// Chunk size for delete operations - /// Chunk size for list operations - /// Throttle size for delete operations - /// Throttle size for list operations - /// Cancelation token - /// A object with any missing ids or ids with unconfirmed deletes - public static async Task DeleteIgnoreErrorsAsync( - this BetaDataPointsResource dataPoints, - IDictionary> ranges, - int deleteChunkSize, - int listChunkSize, - int deleteThrottleSize, - int listThrottleSize, - CancellationToken token) - { - if (ranges == null) - { - throw new ArgumentNullException(nameof(ranges)); - } - var toDelete = new List(); - foreach (var kvp in ranges) - { - _logger.LogTrace("Deleting data points from time series {Name}. Ranges: {Ranges}", - kvp.Key.ToString(), string.Join(", ", kvp.Value.Select(v => v.ToString()))); - toDelete.AddRange(kvp.Value.Select(r => - new IdentityWithRange - { - ExternalId = kvp.Key.ExternalId, - Id = kvp.Key.Id, - InclusiveBegin = r.First.ToUnixTimeMilliseconds(), - ExclusiveEnd = r.Last.ToUnixTimeMilliseconds() + 1 // exclusive - }) - ); - } - - var chunks = toDelete - .ChunkBy(deleteChunkSize) - .ToList(); // Maximum number of items in the /timeseries/data/delete endpoint. - - var missing = new HashSet(); - var mutex = new object(); - - var generators = chunks - .Select, Func>( - c => async () => - { - var errors = await DeleteDataPointsIgnoreErrorsChunk(dataPoints, c, token).ConfigureAwait(false); - lock (mutex) - { - missing.UnionWith(errors); - } - }); - - var taskNum = 0; - await generators.RunThrottled( - deleteThrottleSize, - (_) => - { - if (chunks.Count > 1) - _logger.LogDebug("{MethodName} completed {NumDone}/{TotalNum} tasks", - nameof(DeleteIgnoreErrorsAsync), ++taskNum, chunks.Count); - }, - token).ConfigureAwait(false); - - return new DeleteError(missing, Enumerable.Empty()); - } - - private static async Task> DeleteDataPointsIgnoreErrorsChunk( - BetaDataPointsResource dataPoints, - IEnumerable chunks, - CancellationToken token) - { - var missing = new HashSet(); - if (!chunks.Any()) return missing; - - var deleteQuery = new DataPointsDelete() - { - Items = chunks - }; - try - { - using (CdfMetrics.Datapoints.WithLabels("delete").NewTimer()) - { - await dataPoints.DeleteAsync(deleteQuery, token).ConfigureAwait(false); - } - } - catch (ResponseException e) when (e.Code == 400 && e.Missing != null && e.Missing.Any()) - { - CogniteUtils.ExtractMissingFromResponseException(missing, e); - var remaining = chunks.Where(i => !missing.Contains(i.Id.HasValue ? new Identity(i.Id.Value) : new Identity(i.ExternalId))); - var errors = await DeleteDataPointsIgnoreErrorsChunk(dataPoints, remaining, token).ConfigureAwait(false); - missing.UnionWith(errors); - } - return missing; - } - - /// - /// Get the last timestamp for each time series given in before each given timestamp. - /// Ignores timeseries not in CDF. The return dictionary contains only ids that exist in CDF. - /// Note that end limits closer to actual endpoints in CDF is considerably faster. - /// - /// DataPointsResource to use - /// ExternalIds and last timestamp. Let last timestamp be DateTime.MaxValue to use default ("now"). 
- /// Number of timeseries per request - /// Maximum number of parallel requests - /// Cancellation token - /// Dictionary from externalId to last timestamp, only contains existing timeseries - public static async Task> GetLatestTimestamps( - this BetaDataPointsResource dataPoints, - IEnumerable<(Identity id, DateTime before)> ids, - int chunkSize, - int throttleSize, - CancellationToken token) - { - var ret = new ConcurrentDictionary(); - var idSet = new HashSet(ids.Select(id => id.id)); - - var chunks = ids - .Select((pair) => - { - var id = pair.id; - IdentityWithBefore idt = id.ExternalId == null ? IdentityWithBefore.Create(id.Id!.Value) : IdentityWithBefore.Create(id.ExternalId); - if (pair.before != DateTime.MaxValue) - { - idt.Before = pair.before.ToUnixTimeMilliseconds().ToString(); - } - return idt; - }) - .ChunkBy(chunkSize) - .ToList(); - - var generators = chunks.Select, Func>( - chunk => async () => - { - IEnumerable> dps; - using (CdfMetrics.Datapoints.WithLabels("latest").NewTimer()) - { - dps = await dataPoints.LatestAsync( - new DataPointsLatestQuery - { - IgnoreUnknownIds = true, - Items = chunk - }, token).ConfigureAwait(false); - } - - foreach (var dp in dps) - { - if (dp.DataPoints.Any()) - { - Identity id; - if (dp.ExternalId != null) - { - id = new Identity(dp.ExternalId); - if (!idSet.Contains(id)) - { - id = new Identity(dp.Id); - } - } - else - { - id = new Identity(dp.Id); - } - ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.DataPoints.First().Timestamp); - } - } - }); - int numTasks = 0; - await generators - .RunThrottled(throttleSize, (_) => - _logger.LogDebug("Last timestamp from CDF: {num}/{total}", ++numTasks, chunks.Count), token) - .ConfigureAwait(false); - return ret; - } - - /// - /// Get the first timestamp for each time series given in after each given timestamp. - /// Ignores timeseries not in CDF. The return dictionary contains only ids that exist in CDF. - /// - /// DataPointsResource to use - /// ExternalIds and last timestamp. Let last timestamp be Epoch to use default. 
- /// Number of timeseries per request - /// Maximum number of parallel requests - /// Cancellation token - /// Dictionary from externalId to first timestamp, only contains existing timeseries - public static async Task> GetEarliestTimestamps( - this BetaDataPointsResource dataPoints, - IEnumerable<(Identity id, DateTime after)> ids, - int chunkSize, - int throttleSize, - CancellationToken token) - { - var ret = new ConcurrentDictionary(); - var idSet = new HashSet(ids.Select(id => id.id)); - - var chunks = ids - .Select((pair) => - { - var query = new DataPointsQueryItem(); - if (pair.id.Id.HasValue) - { - query.Id = pair.id.Id.Value; - } - else - { - query.ExternalId = pair.id.ExternalId; - } - if (pair.after > CogniteTime.DateTimeEpoch) - { - query.Start = pair.after.ToUnixTimeMilliseconds().ToString(); - } - return query; - }) - .ChunkBy(chunkSize) - .ToList(); - - var generators = chunks.Select, Func>( - chunk => async () => - { - DataPointListResponse dps; - using (CdfMetrics.Datapoints.WithLabels("first").NewTimer()) - { - dps = await dataPoints.ListAsync( - new DataPointsQuery - { - IgnoreUnknownIds = true, - Items = chunk, - Limit = 1 - }, token).ConfigureAwait(false); - } - - foreach (var dp in dps.Items) - { - Identity id; - if (dp.ExternalId != null) - { - id = new Identity(dp.ExternalId); - if (!idSet.Contains(id)) - { - id = new Identity(dp.Id); - } - } - else - { - id = new Identity(dp.Id); - } - if (dp.DatapointTypeCase == DataPointListItem.DatapointTypeOneofCase.NumericDatapoints - && dp.NumericDatapoints.Datapoints.Any()) - { - ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.NumericDatapoints.Datapoints.First().Timestamp); - } - else if (dp.DatapointTypeCase == DataPointListItem.DatapointTypeOneofCase.StringDatapoints - && dp.StringDatapoints.Datapoints.Any()) - { - ret[id] = CogniteTime.FromUnixTimeMilliseconds(dp.StringDatapoints.Datapoints.First().Timestamp); - } - } - }); - int numTasks = 0; - await generators - .RunThrottled(throttleSize, (_) => - _logger.LogDebug("First timestamp from CDF: {num}/{total}", ++numTasks, chunks.Count), token) - .ConfigureAwait(false); - return ret; - } - /// - /// Fetches the range of datapoints present in CDF. Limited by given ranges for each id. - /// Note that end limits closer to actual endpoints in CDF is considerably faster. - /// - /// DataPointsResource to use - /// ExternalIds and start/end of region to look for datapoints. - /// Use TimeRange.Complete for first after epoch, and last before now. - /// Number of timeseries to read for each earliest request - /// Number of timeseries to read for each latest request - /// Max number of parallel requests - /// If true, fetch latest timestamps - /// If true, fetch earliest timestamps - /// Cancellation token - /// - public static async Task> GetExtractedRanges( - this BetaDataPointsResource dataPoints, - IEnumerable<(Identity id, TimeRange limit)> ids, - int chunkSizeEarliest, - int chunkSizeLatest, - int throttleSize, - bool latest, - bool earliest, - CancellationToken token) - { - if (ids == null) - { - throw new ArgumentNullException(nameof(ids)); - } - _logger.LogDebug("Getting extracted ranges for {num} timeseries", ids.Count()); - - if (latest && earliest) throttleSize = Math.Max(1, throttleSize / 2); - - var ranges = ids.ToDictionary(pair => pair.id, pair => TimeRange.Empty); - var tasks = new List>>(); - if (latest) - { - tasks.Add(dataPoints.GetLatestTimestamps(ids.Select(pair => (pair.id, pair.limit?.Last ?? 
DateTime.MaxValue)), - chunkSizeLatest, throttleSize, token)); - } - if (earliest) - { - tasks.Add(dataPoints.GetEarliestTimestamps(ids.Select(pair => (pair.id, pair.limit?.First ?? CogniteTime.DateTimeEpoch)), - chunkSizeEarliest, throttleSize, token)); - } - var results = await Task.WhenAll(tasks).ConfigureAwait(false); - if (latest) - { - var latestResult = results[0]; - foreach (var id in ids) - { - if (latestResult.TryGetValue(id.id, out DateTime ts)) - { - ranges[id.id] = new TimeRange(CogniteTime.DateTimeEpoch, ts); - } - } - } - if (earliest) - { - var earliestResult = results[latest ? 1 : 0]; - foreach (var id in ids) - { - if (earliestResult.TryGetValue(id.id, out DateTime ts)) - { - ranges[id.id] = new TimeRange(ts, ranges[id.id].Last); - } - } - } - return ranges; - } - } -} diff --git a/Cognite.Extensions/TimeSeries/DataPointExtensions.cs b/Cognite.Extensions/TimeSeries/DataPointExtensions.cs index 352eb10e..282e113f 100644 --- a/Cognite.Extensions/TimeSeries/DataPointExtensions.cs +++ b/Cognite.Extensions/TimeSeries/DataPointExtensions.cs @@ -60,14 +60,24 @@ public static DataPointInsertionRequest ToInsertRequest(this IDictionary new StringDatapoint { Timestamp = dp.Timestamp, - Value = dp.StringValue + Value = dp.StringValue, + NullValue = dp.StringValue is null, + Status = new Status + { + Code = (long)dp.Status.Code + } }); var numericPoints = kvp.Value .Where(dp => !dp.IsString && dp.Status.IsGood) .Select(dp => new NumericDatapoint { Timestamp = dp.Timestamp, - Value = dp.NumericValue!.Value + Value = dp.NumericValue!.Value, + NullValue = dp.NumericValue is null, + Status = new Status + { + Code = (long)dp.Status.Code + } }); if (stringPoints.Any()) { diff --git a/Cognite.Extensions/TimeSeries/DataPointSanitation.cs b/Cognite.Extensions/TimeSeries/DataPointSanitation.cs index 6f73de05..a7fa6f45 100644 --- a/Cognite.Extensions/TimeSeries/DataPointSanitation.cs +++ b/Cognite.Extensions/TimeSeries/DataPointSanitation.cs @@ -58,12 +58,13 @@ public static Datapoint Sanitize(this Datapoint point, double? nanReplacement) { if (point.IsString) { - if (point.StringValue == null || point.StringValue.Length > CogniteUtils.StringLengthMax) + if (point.StringValue == null && point.Status.IsGood + || (point.StringValue?.Length ?? 0) > CogniteUtils.StringLengthMax) { return ResourceType.DataPointValue; } } - else + else if (point.Status.IsGood) { double value = point.NumericValue!.Value; if (double.IsNaN(value) diff --git a/Cognite.Extensions/TimeSeries/Beta/StatusCodeHelpers.cs b/Cognite.Extensions/TimeSeries/StatusCodeHelpers.cs similarity index 99% rename from Cognite.Extensions/TimeSeries/Beta/StatusCodeHelpers.cs rename to Cognite.Extensions/TimeSeries/StatusCodeHelpers.cs index 965b8012..b853f90b 100644 --- a/Cognite.Extensions/TimeSeries/Beta/StatusCodeHelpers.cs +++ b/Cognite.Extensions/TimeSeries/StatusCodeHelpers.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; -namespace Cognite.Extensions.Beta +namespace Cognite.Extensions { // NOTE: This entire section is largely taken from the time series backend, // it more or less replicates the behavior of the API when it comes to parsing and handling status codes. 
@@ -239,11 +239,6 @@ public static StatusCode Parse(string symbol) { return "Calculated and Interpolated flags are mutually exclusive"; } - - if (infoBits == 0) - { - return "When info type is 01, info bits must not be 0"; - } } else if (infoBits != 0) { diff --git a/ExtractorUtils.Test/integration/RawHighAvailabilityTest.cs b/ExtractorUtils.Test/integration/RawHighAvailabilityTest.cs index 028f34d5..15c1b2d6 100644 --- a/ExtractorUtils.Test/integration/RawHighAvailabilityTest.cs +++ b/ExtractorUtils.Test/integration/RawHighAvailabilityTest.cs @@ -66,7 +66,7 @@ public RawHighAvailabilityTest(ITestOutputHelper output) } [Fact(Timeout = 30000)] - public async void TestExtractorManagerRun() + public async Task TestExtractorManagerRun() { // Creating configs for two different extractors. string configPath_0 = SetupConfig(index: 0); @@ -101,7 +101,7 @@ public async void TestExtractorManagerRun() } [Fact(Timeout = 45000)] - public async void TestRestartExtractor() + public async Task TestRestartExtractor() { // Creating config for two extractors. string configPath_0 = SetupConfig(index: 2); diff --git a/ExtractorUtils.Test/integration/RedisHighAvailabilityTest.cs b/ExtractorUtils.Test/integration/RedisHighAvailabilityTest.cs index b1974bf8..bb63c6d2 100644 --- a/ExtractorUtils.Test/integration/RedisHighAvailabilityTest.cs +++ b/ExtractorUtils.Test/integration/RedisHighAvailabilityTest.cs @@ -23,7 +23,7 @@ public RedisHighAvailabilityTest(ITestOutputHelper output) } [Fact(Timeout = 30000)] - public async void TestRedisExtractorManagerRun() + public async Task TestRedisExtractorManagerRun() { Assert.True(_redis.IsConnected); @@ -58,7 +58,7 @@ public async void TestRedisExtractorManagerRun() } [Fact(Timeout = 45000)] - public async void TestRedisRestartExtractor() + public async Task TestRedisRestartExtractor() { Assert.True(_redis.IsConnected); diff --git a/ExtractorUtils.Test/unit/BinaryBufferTest.cs b/ExtractorUtils.Test/unit/BinaryBufferTest.cs index 2f19fad1..d2f0d535 100644 --- a/ExtractorUtils.Test/unit/BinaryBufferTest.cs +++ b/ExtractorUtils.Test/unit/BinaryBufferTest.cs @@ -1,5 +1,4 @@ using Cognite.Extensions; -using Cognite.Extensions.Beta; using Cognite.Extractor.Utils; using CogniteSdk; using System; @@ -30,8 +29,8 @@ public async Task TestBinaryBufferData() { Identity.Create(123), new [] {new Datapoint(t1, "321"), new Datapoint(t2, 321.321) } }, { Identity.Create("empty"), Array.Empty() }, { Identity.Create("status"), new[] { - new Datapoint(t1, 123.123, StatusCode.FromCategory(StatusCodeCategory.GoodCascade)), - new Datapoint(t2, 321.321, StatusCode.FromCategory(StatusCodeCategory.BadNoCommunication)) + new Datapoint(t1, 123.123, Cognite.Extensions.StatusCode.FromCategory(StatusCodeCategory.GoodCascade)), + new Datapoint(t2, 321.321, Cognite.Extensions.StatusCode.FromCategory(StatusCodeCategory.BadNoCommunication)) } } }; var dps2 = new Dictionary>() { diff --git a/ExtractorUtils.Test/unit/StatusCodeTests.cs b/ExtractorUtils.Test/unit/StatusCodeTests.cs index 0e41117c..61b80b46 100644 --- a/ExtractorUtils.Test/unit/StatusCodeTests.cs +++ b/ExtractorUtils.Test/unit/StatusCodeTests.cs @@ -1,4 +1,4 @@ -using Cognite.Extensions.Beta; +using Cognite.Extensions; using Xunit; namespace ExtractorUtils.Test.Unit diff --git a/ExtractorUtils/Cognite/CogniteDestination.cs b/ExtractorUtils/Cognite/CogniteDestination.cs index f361294d..7be8bad6 100644 --- a/ExtractorUtils/Cognite/CogniteDestination.cs +++ b/ExtractorUtils/Cognite/CogniteDestination.cs @@ -413,46 +413,6 @@ public async Task> 
InsertDataPointsAsync( token).ConfigureAwait(false); } - /// - /// BETA: Insert the provided data points into CDF. The data points are chunked - /// according to and trimmed according to the - /// CDF limits. - /// The dictionary keys are time series identities (Id or ExternalId) and the values are numeric or string data points - /// - /// On error, the offending timeseries/datapoints can optionally be removed. - /// - /// This version includes beta support for status codes. - /// - /// Data points - /// - /// - /// Cancellation token - public async Task> BetaInsertDataPointsAsync( - IDictionary>? points, - SanitationMode sanitationMode, - RetryMode retryMode, - CancellationToken token) - { - if (points == null || !points.Any()) return new CogniteResult(null); - - _logger.LogDebug("Uploading {Number} data points to CDF for {NumberTs} time series", - points.Values.Select(dp => dp.Count()).Sum(), - points.Keys.Count); - return await Extensions.Beta.DataPointExtensions.InsertAsync( - _client, - points, - _config.CdfChunking.DataPointTimeSeries, - _config.CdfChunking.DataPoints, - _config.CdfThrottling.DataPoints, - _config.CdfChunking.TimeSeries, - _config.CdfThrottling.TimeSeries, - _config.CdfChunking.DataPointsGzipLimit, - sanitationMode, - retryMode, - _config.NanReplacement, - token).ConfigureAwait(false); - } - /// /// Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, @@ -489,44 +449,6 @@ public async Task> BetaInsertDataPointsAsync token).ConfigureAwait(false); } - /// - /// BETA: Insert datapoints to timeseries. Insertions are chunked and cleaned according to configuration, - /// and can optionally handle errors. If any timeseries missing from the result and inserted by externalId, - /// they are created before the points are inserted again. - /// - /// This version includes beta support for status codes. - /// - /// Datapoints to insert - /// How to sanitize datapoints - /// How to handle retries - /// Optional data set id - /// Cancellation token - /// Results with a list of errors. If TimeSeriesResult is null, no timeseries were attempted created. - public async Task<(CogniteResult DataPointResult, CogniteResult? TimeSeriesResult)> BetaInsertDataPointsCreateMissingAsync( - IDictionary>? points, - SanitationMode sanitationMode, - RetryMode retryMode, - long? dataSetId, - CancellationToken token) - { - if (points == null || !points.Any()) return (new CogniteResult(null), null); - - return await Extensions.Beta.DataPointExtensions.InsertAsyncCreateMissing( - _client, - points, - _config.CdfChunking.DataPointTimeSeries, - _config.CdfChunking.DataPoints, - _config.CdfThrottling.DataPoints, - _config.CdfChunking.TimeSeries, - _config.CdfThrottling.TimeSeries, - _config.CdfChunking.DataPointsGzipLimit, - sanitationMode, - retryMode, - _config.NanReplacement, - dataSetId, - token).ConfigureAwait(false); - } - /// /// Deletes ranges of data points in CDF. The parameter contains the first (inclusive) /// and last (inclusive) timestamps for the range. After the delete request is sent to CDF, attempt to confirm that diff --git a/version b/version index 6fee2fed..a6c2798a 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.22.2 +1.23.0
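
Usage sketch: with status codes promoted to GA by this patch, datapoints carrying OPC UA status codes go through the regular Cognite.Extensions / CogniteDestination APIs instead of the removed Cognite.Extensions.Beta ones. The sketch below is illustrative only: the external id "example-ts" is a placeholder, SanitationMode.Clean is assumed to be a valid sanitation mode, and the GA InsertDataPointsAsync is assumed to keep the (points, sanitationMode, retryMode, token) shape of the removed beta overload. The Datapoint constructors and the fully qualified Cognite.Extensions.StatusCode.FromCategory calls follow the updated BinaryBufferTest above.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Cognite.Extensions;
    using Cognite.Extractor.Utils;
    using CogniteSdk;

    public static class StatusCodeUsageSketch
    {
        public static async Task InsertWithStatusCodesAsync(
            CogniteDestination destination, CancellationToken token)
        {
            var start = DateTime.UtcNow;
            var points = new Dictionary<Identity, IEnumerable<Datapoint>>
            {
                // "example-ts" is a placeholder external id.
                [Identity.Create("example-ts")] = new[]
                {
                    // Implicit Good status, as before this change.
                    new Datapoint(start, 123.123),
                    // Explicit good-quality status code, now accepted by the GA endpoint.
                    new Datapoint(start.AddSeconds(1), 123.456,
                        Cognite.Extensions.StatusCode.FromCategory(StatusCodeCategory.GoodCascade)),
                    // Bad-quality point; the status code travels with the value.
                    new Datapoint(start.AddSeconds(2), 321.321,
                        Cognite.Extensions.StatusCode.FromCategory(StatusCodeCategory.BadNoCommunication)),
                }
            };

            var result = await destination.InsertDataPointsAsync(
                points, SanitationMode.Clean, RetryMode.OnError, token).ConfigureAwait(false);

            // Error handling mirrors the pattern used inside the extensions themselves.
            if (result.Errors?.Any(err => err.Type == ErrorType.FatalFailure) ?? false)
            {
                throw new InvalidOperationException("Fatal error while inserting datapoints");
            }
        }
    }

If the target time series may not exist yet, the GA create-missing variant kept in CogniteDestination.cs follows the same pattern and additionally takes an optional data set id for the time series it creates.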