-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Storage of historical extraction state. Includes state store for litedb and raw. State stores have shared interface and an use should be largely agnostic of which one is active (raw is likely to be a bit slower, and of course requires access to CDF). Each state store includes a generic interface for storing arbitrary data, using arbitrary extraction state classes. This is represented through the IExtractionState interface, which includes just an "last time updated" property, and a string "id". Objects stored must inherit from BaseStorableState, which contains just a string Id. Alternatively, a more specialized implementation is also included, which adds BaseExtractionState as a generic implementation of extraction state for historical data/events, and BaseExtractionStatePoco as the stored object, containing just Id and first/last timestamp. An even more complete version, HistoryExtractionState, is also included, which adds further methods for synchronization of frontfill/backfill.
- Loading branch information
Showing
34 changed files
with
2,109 additions
and
184 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
ExtractorUtils/Utils/CogniteTime.cs → Cognite.Common/CogniteTime.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
using System; | ||
|
||
namespace Cognite.Extractor.Common | ||
{ | ||
/// <summary> | ||
/// Represents a range of time from First to Last. First and Last are both considered | ||
/// to be included in the range. | ||
/// The legal range of values is from the unix epoch to DateTime.MaxValue. | ||
/// </summary> | ||
public sealed class TimeRange | ||
{ | ||
/// <summary> | ||
/// An empty range where first > last, and both are at the extreme range. | ||
/// Extending an empty range will return the given range | ||
/// Contracting an empty range will return the empty range | ||
/// </summary> | ||
public static readonly TimeRange Empty = new TimeRange(DateTime.MaxValue, CogniteTime.DateTimeEpoch); | ||
/// <summary> | ||
/// The full range of legal values. | ||
/// </summary> | ||
public static readonly TimeRange Complete = new TimeRange(CogniteTime.DateTimeEpoch, DateTime.MaxValue); | ||
|
||
/// <summary> | ||
/// Initialize a TimeRange object | ||
/// </summary> | ||
/// <param name="first">First point in the time range</param> | ||
/// <param name="last">Last point in the time range</param> | ||
public TimeRange(DateTime first, DateTime last) | ||
{ | ||
if (first < CogniteTime.DateTimeEpoch) | ||
first = CogniteTime.DateTimeEpoch; | ||
First = first; | ||
Last = last; | ||
} | ||
/// <summary> | ||
/// First point in the range | ||
/// </summary> | ||
public DateTime First { get; } | ||
/// <summary> | ||
/// Last point in the range | ||
/// </summary> | ||
public DateTime Last { get; } | ||
|
||
/// <summary> | ||
/// True if there are no points in the range at all (first > last). | ||
/// </summary> | ||
public bool IsEmpty | ||
{ | ||
get | ||
{ | ||
return First > Last; | ||
} | ||
} | ||
/// <summary> | ||
/// True if given datetime is inside the range | ||
/// </summary> | ||
/// <param name="t">DateTime to test</param> | ||
/// <returns></returns> | ||
public bool Contains(DateTime t) | ||
{ | ||
return t >= First && t <= Last; | ||
} | ||
/// <summary> | ||
/// True if given datetime is ahead of the range. | ||
/// </summary> | ||
/// <param name="t">Datetime to test</param> | ||
/// <returns></returns> | ||
public bool Before(DateTime t) | ||
{ | ||
return t < First; | ||
} | ||
/// <summary> | ||
/// True if given datetime is after the range | ||
/// </summary> | ||
/// <param name="t">Datetime to test</param> | ||
/// <returns></returns> | ||
public bool After(DateTime t) | ||
{ | ||
return t > Last; | ||
} | ||
/// <summary> | ||
/// Return a new TimeRange extended by the given timestamps. | ||
/// New TimeRange is the earliest start point and the latest end point. | ||
/// Letting first or last be null will keep the existing value for that end of the range. | ||
/// </summary> | ||
/// <param name="first">First point in extending range</param> | ||
/// <param name="last">Last point in extending range</param> | ||
/// <returns></returns> | ||
public TimeRange Extend(DateTime? first, DateTime? last) | ||
{ | ||
if (!first.HasValue || first >= First) | ||
first = First; | ||
if (!last.HasValue || last <= Last) | ||
last = Last; | ||
if (first != First || last != Last) | ||
return new TimeRange(first.Value, last.Value); | ||
return this; | ||
} | ||
/// <summary> | ||
/// Returns a new TimeRange extended by the given TimeRange | ||
/// New TimeRange is the earliest start point and the latest end point. | ||
/// </summary> | ||
/// <param name="newRange">Extending range</param> | ||
/// <returns></returns> | ||
public TimeRange Extend(TimeRange newRange) | ||
{ | ||
return Extend(newRange.First, newRange.Last); | ||
} | ||
/// <summary> | ||
/// Return a new TimeRange contracted by the given timestamps. | ||
/// New TimeRange is the latest start point and earliest end point. | ||
/// Letting first or last be null will keep the existing value for that end of the range. | ||
/// </summary> | ||
/// <param name="first">First point in contracting range</param> | ||
/// <param name="last">Last point in contracting range</param> | ||
/// <returns></returns> | ||
public TimeRange Contract(DateTime? first, DateTime? last) | ||
{ | ||
if (!first.HasValue || first <= First) | ||
first = First; | ||
if (!last.HasValue || last >= Last) | ||
last = Last; | ||
if (first != First || last != Last) | ||
return new TimeRange(first.Value, last.Value); | ||
return this; | ||
} | ||
/// <summary> | ||
/// Returns a new TimeRange contracted by the given TimeRange | ||
/// New TimeRange is the latest start point and the earliest end point. | ||
/// </summary> | ||
/// <param name="newRange">Extending range</param> | ||
/// <returns></returns> | ||
public TimeRange Contract(TimeRange newRange) | ||
{ | ||
return Contract(newRange.First, newRange.Last); | ||
} | ||
/// <summary> | ||
/// Returns a string representation of the TimeRange. | ||
/// </summary> | ||
/// <returns></returns> | ||
public override string ToString() | ||
{ | ||
return $"({First.ToISOString()}, {Last.ToISOString()})"; | ||
} | ||
/// <summary> | ||
/// Compares this time range with the provided object and returns | ||
/// true if they are equal | ||
/// </summary> | ||
/// <param name="obj">Object to compare</param> | ||
/// <returns>true, if equal. false otherwise</returns> | ||
public override bool Equals(object obj) | ||
{ | ||
if (ReferenceEquals(obj, this)) return true; | ||
if (obj == null) return false; | ||
return obj is TimeRange && this == (TimeRange)obj; | ||
} | ||
/// <summary> | ||
/// Returns the computed hash code for this time range using its (First, Last) tuple | ||
/// </summary> | ||
/// <returns></returns> | ||
public override int GetHashCode() | ||
{ | ||
var tuple = (First, Last); | ||
return tuple.GetHashCode(); | ||
} | ||
|
||
/// <summary> | ||
/// Returns true if the provided time ranges are equal | ||
/// </summary> | ||
/// <param name="x">time range</param> | ||
/// <param name="y"> time range</param> | ||
/// <returns>true, if equal. false otherwise</returns> | ||
public static bool operator ==(TimeRange x, TimeRange y) | ||
{ | ||
if (x is null) | ||
{ | ||
return y is null; | ||
} | ||
if (y is null) return false; | ||
return x.First == y.First && x.Last == y.Last; | ||
} | ||
|
||
/// <summary> | ||
/// Returns true if the provided time ranges are different | ||
/// </summary> | ||
/// <param name="x">time range</param> | ||
/// <param name="y"> time range</param> | ||
/// <returns>true, if different. false otherwise</returns> | ||
public static bool operator !=(TimeRange x, TimeRange y) | ||
{ | ||
return !(x == y); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
using Cognite.Extractor.Common; | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Text; | ||
|
||
namespace Cognite.Extractor.StateStorage | ||
{ | ||
/// <summary> | ||
/// Represents a minimal extraction state implementation | ||
/// </summary> | ||
public class BaseExtractionState : IExtractionState | ||
{ | ||
/// <summary> | ||
/// Mutex used for safely modifying ranges | ||
/// </summary> | ||
protected readonly object _mutex = new object(); | ||
/// <summary> | ||
/// True if this state has been properly initialized. | ||
/// If false, methods to update from source system will fail. | ||
/// </summary> | ||
public bool Initialized { get; protected set; } | ||
/// <summary> | ||
/// Unique id for extracted object | ||
/// </summary> | ||
public string Id { get; } | ||
/// <summary> | ||
/// Range of data pushed to destination(s) | ||
/// </summary> | ||
public TimeRange DestinationExtractedRange | ||
{ | ||
get => _destinationExtractedRange; | ||
protected set | ||
{ | ||
if (value == _destinationExtractedRange) return; | ||
LastTimeModified = DateTime.UtcNow; | ||
_destinationExtractedRange = value; | ||
} | ||
} | ||
private TimeRange _destinationExtractedRange; | ||
/// <summary> | ||
/// Last time the destination range was modified. | ||
/// </summary> | ||
public DateTime? LastTimeModified { get; protected set; } | ||
|
||
/// <summary> | ||
/// Constructor | ||
/// </summary> | ||
/// <param name="id">Id of state</param> | ||
public BaseExtractionState(string id) | ||
{ | ||
Id = id; | ||
DestinationExtractedRange = TimeRange.Empty; | ||
} | ||
|
||
/// <summary> | ||
/// Called when initializing extracted range from destinations and state storage. | ||
/// This will always contract the believed range. | ||
/// </summary> | ||
/// <param name="first"></param> | ||
/// <param name="last"></param> | ||
public virtual void InitExtractedRange(DateTime first, DateTime last) | ||
{ | ||
if (Initialized) throw new InvalidOperationException("Extracted state is already initialized"); | ||
lock (_mutex) | ||
{ | ||
DestinationExtractedRange = new TimeRange(first, last); | ||
Initialized = true; | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Update the state with first and last points successfully pushed to destination(s). | ||
/// </summary> | ||
/// <param name="first">Earliest timestamp in successful push to destination(s)</param> | ||
/// <param name="last">Latest timestamp in successful push to destination(s)</param> | ||
public virtual void UpdateDestinationRange(DateTime first, DateTime last) | ||
{ | ||
if (!Initialized) throw new InvalidOperationException("Extracted state not initialized"); | ||
lock (_mutex) | ||
{ | ||
DestinationExtractedRange = DestinationExtractedRange.Extend(first, last); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
using System; | ||
|
||
namespace Cognite.Extractor.StateStorage | ||
{ | ||
/// <summary> | ||
/// Represents an historical object in the destination system | ||
/// </summary> | ||
public class BaseExtractionStatePoco : BaseStorableState | ||
{ | ||
/// <summary> | ||
/// Earliest known extracted timestamp | ||
/// </summary> | ||
[StateStoreProperty("first")] | ||
public DateTime FirstTimestamp { get; set; } | ||
|
||
/// <summary> | ||
/// Last known extracted timestamp | ||
/// </summary> | ||
[StateStoreProperty("last")] | ||
public DateTime LastTimestamp { get; set; } | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
using LiteDB; | ||
|
||
namespace Cognite.Extractor.StateStorage | ||
{ | ||
/// <summary> | ||
/// Minimal state that may be stored to a state-storage system. | ||
/// </summary> | ||
public class BaseStorableState | ||
{ | ||
/// <summary> | ||
/// Unique identifier for the state in the destination storage | ||
/// </summary> | ||
[BsonId] | ||
[StateStoreProperty("id")] | ||
public string Id { get; set; } | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>netstandard2.0</TargetFramework> | ||
<GenerateDocumentationFile>true</GenerateDocumentationFile> | ||
<PackageId>Cognite.Extractor.StateStorage</PackageId> | ||
<Author>Cognite AS</Author> | ||
<Company>Cognite AS</Company> | ||
<Copyright>Cognite AS</Copyright> | ||
<PackageLicenseFile>LICENSE</PackageLicenseFile> | ||
<RootNamespace>Cognite.Extractor.StateStorage</RootNamespace> | ||
<Description> | ||
A library containing state storage utilities for Cognite extractors | ||
</Description> | ||
</PropertyGroup> | ||
<ItemGroup> | ||
<None Include="..\LICENSE" Pack="true" Visible="false" PackagePath="" /> | ||
</ItemGroup> | ||
<ItemGroup> | ||
<PackageReference Include="LiteDB" Version="5.0.8" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\Cognite.Logging\Cognite.Logging.csproj" /> | ||
<ProjectReference Include="..\Cognite.Common\Cognite.Common.csproj" /> | ||
<ProjectReference Include="..\Cognite.Metrics\Cognite.Metrics.csproj" /> | ||
</ItemGroup> | ||
</Project> |
Oops, something went wrong.