diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 18f35ff91b1..f551a650d3b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,32 @@ +#### 1.3.7 April 17 2018 #### +**Maintenance Release for Akka.NET 1.3** + +Akka.NET v1.3.7 is a minor patch consisting mostly of bug fixes. + +**DotNetty stabilization** +We've had a number of issues related to DotNetty issues over recent weeks, and we've resolved those in this patch by doing the following: + +* [Locking down the version of DotNetty to v0.4.6 until further notice](https://github.com/akkadotnet/akka.net/pull/3410) +* [Resolving memory leaks introduced with DotNetty in v1.3.6](https://github.com/akkadotnet/akka.net/pull/3436) + +We will be upgrading to DotNetty v0.4.8 in a near future release, but in the meantime this patch fixes critical issues introduced in v1.3.6. + +**Bugfixes** +1. [Akka.Persistence.Sql: Slow reading of big snapshots](https://github.com/akkadotnet/akka.net/issues/3422) - this will require a recompilation of all Akka.Persistence.Sql-type Akka.Persistence plugins. +2. [Akka.Fsharp: spawning an actor results in Exception in 1.3.6 release](https://github.com/akkadotnet/akka.net/issues/3402) + +See [the full list of fixes for Akka.NET v1.3.7 here](https://github.com/akkadotnet/akka.net/milestone/25). + +| COMMITS | LOC+ | LOC- | AUTHOR | +| --- | --- | --- | --- | +| 5 | 130 | 180 | Aaron Stannard | +| 3 | 7 | 1 | chrisjhoare | +| 2 | 3 | 1 | ivog | +| 1 | 70 | 17 | TietoOliverKurowski | +| 1 | 41 | 4 | Bart de Boer | +| 1 | 11 | 3 | Oleksandr Bogomaz | +| 1 | 1 | 1 | Vasily Kirichenko | + #### 1.3.6 April 17 2018 #### **Maintenance Release for Akka.NET 1.3** diff --git a/build.fsx b/build.fsx index b17ebf09ee1..7df38d416f0 100644 --- a/build.fsx +++ b/build.fsx @@ -282,9 +282,17 @@ Target "NBench" <| fun _ -> info.FileName <- nbenchTestPath info.WorkingDirectory <- (Path.GetDirectoryName (FullName nbenchTestPath)) info.Arguments <- args) (System.TimeSpan.FromMinutes 45.0) (* Reasonably long-running task. *) - if result <> 0 then failwithf "NBench.Runner failed. %s %s" nbenchTestPath args - - nbenchTestAssemblies |> Seq.iter runNBench + if result <> 0 then failwithf "%s %s \nexited with code %i" nbenchTestPath args result + + let failedRuns = + nbenchTestAssemblies + |> Seq.map (fun asm -> try runNBench asm; None with e -> Some(e.ToString())) + |> Seq.filter Option.isSome + |> Seq.map Option.get + |> Seq.mapi (fun i s -> sprintf "%i: \"%s\"" (i + 1) s) + |> Seq.toArray + if failedRuns.Length > 0 then + failwithf "NBench.Runner failed for %i run(s):\n%s\n\n" failedRuns.Length (String.concat "\n\n" failedRuns) //-------------------------------------------------------------------------------- // Nuget targets diff --git a/buildIncremental.fsx b/buildIncremental.fsx index 8aa5040c406..c28db9b3d15 100644 --- a/buildIncremental.fsx +++ b/buildIncremental.fsx @@ -42,6 +42,7 @@ module IncrementalTests = let getUnitTestProjects runtime = let allTestProjects = !! "./**/core/**/*.Tests.csproj" + ++ "./**/core/**/*.Tests.fsproj" ++ "./**/contrib/**/*.Tests.csproj" -- "./**/serializers/**/*Wire*.csproj" allTestProjects diff --git a/src/common.props b/src/common.props index 49222c45ff9..9a5db7a9b63 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2017 Akka.NET Team Akka.NET Team - 1.3.6 + 1.3.7 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE @@ -18,37 +18,24 @@ Maintenance Release for Akka.NET 1.3** -Akka.NET v1.3.6 is a minor patch consisting mostly of bug fixes. -Akka.FSharp on .NET Standard** -The biggest change in this release is [the availability of Akka.FSharp on .NET Standard and .NET Core](https://github.com/akkadotnet/akka.net/issues/2826)! -Akka.FSharp runs on .NET Standard 2.0 as of 1.3.6 (it doesn't support .NET Standard 1.6 like the rest of Akka.NET due to FSharp-specific, downstream dependencies.) -Updates and Additions** -1. [Akka.Streams: Port 4 "streams contrib" stages - AccumulateWhileUnchanged, LastElement, PartitionWith, Sample](https://github.com/akkadotnet/akka.net/pull/3375) -2. [Akka.Remote: Add `public-port` setting to allow for port aliasing inside environments like Docker, PCF](https://github.com/akkadotnet/akka.net/issues/3357) +Akka.NET v1.3.7 is a minor patch consisting mostly of bug fixes. +DotNetty stabilization** +We've had a number of issues related to DotNetty issues over recent weeks, and we've resolved those in this patch by doing the following: +[Locking down the version of DotNetty to v0.4.6 until further notice](https://github.com/akkadotnet/akka.net/pull/3410) +[Resolving memory leaks introduced with DotNetty in v1.3.6](https://github.com/akkadotnet/akka.net/pull/3436) +We will be upgrading to DotNetty v0.4.8 in a near future release, but in the meantime this patch fixes critical issues introduced in v1.3.6. Bugfixes** -1. [Akka.Cluster.Sharding: Removing string.GetHashCode usage from distributed classes](https://github.com/akkadotnet/akka.net/pull/3363) -2. [Akka.Cluster.Sharding: HashCodeMessageExtractor can create inconsistent ShardId's](https://github.com/akkadotnet/akka.net/issues/3361) -3. [Akka.Remote: -Error while decoding incoming Akka PDU Exception when communicating between Remote Actors with a large number of messages on Linux](https://github.com/akkadotnet/akka.net/issues/3370) -4. [Akka.Cluster.Sharding: DData: Cannot create a shard proxy on a cluster node that is not in the same role as the proxied shard entity](https://github.com/akkadotnet/akka.net/issues/3352) -5. [Akka.Streams: Fix GroupedWithin allocation of new buffer after emit.](https://github.com/akkadotnet/akka.net/pull/3382) -6. [Akka.Persistence: Add missing ReturnRecoveryPermit](https://github.com/akkadotnet/akka.net/pull/3372) -You can see [the full set of changes for Akka.NET v1.3.6 here](hhttps://github.com/akkadotnet/akka.net/milestone/24). +1. [Akka.Persistence.Sql: Slow reading of big snapshots](https://github.com/akkadotnet/akka.net/issues/3422) - this will require a recompilation of all Akka.Persistence.Sql-type Akka.Persistence plugins. +2. [Akka.Fsharp: spawning an actor results in Exception in 1.3.6 release](https://github.com/akkadotnet/akka.net/issues/3402) +See [the full list of fixes for Akka.NET v1.3.7 here](https://github.com/akkadotnet/akka.net/milestone/25). | COMMITS | LOC+ | LOC- | AUTHOR | | --- | --- | --- | --- | -| 7 | 261 | 38 | Aaron Stannard | -| 6 | 28 | 28 | cimryan | -| 5 | 53 | 20 | Tomasz Jaskula | -| 2 | 7 | 4 | Ondrej Pialek | -| 2 | 20 | 10 | Ismael Hamed | -| 1 | 739 | 0 | Oleksandr Bogomaz | -| 1 | 64 | 6 | Robert | -| 1 | 23 | 29 | nathvi | -| 1 | 2 | 1 | Sebastien Bacquet | -| 1 | 1 | 2 | Ondřej Piálek | -| 1 | 1 | 1 | Steffen Skov | -| 1 | 1 | 1 | Sean Gilliam | -| 1 | 1 | 1 | Matthew Herman | -| 1 | 1 | 1 | Jan Pluskal | +| 5 | 130 | 180 | Aaron Stannard | +| 3 | 7 | 1 | chrisjhoare | +| 2 | 3 | 1 | ivog | +| 1 | 70 | 17 | TietoOliverKurowski | +| 1 | 41 | 4 | Bart de Boer | +| 1 | 11 | 3 | Oleksandr Bogomaz | +| 1 | 1 | 1 | Vasily Kirichenko | \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs index df06421954e..4b6fa1ab258 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs @@ -38,8 +38,8 @@ public interface IJournalQueryExecutor Task> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken); /// - /// Asynchronously replays a on all selected events for provided - /// , within boundaries of + /// Asynchronously replays a on all selected events for provided + /// , within boundaries of /// and up to number of events. /// /// TBD @@ -53,8 +53,8 @@ public interface IJournalQueryExecutor Task SelectByPersistenceIdAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action callback); /// - /// Asynchronously replays on all selected events, which have been tagged using - /// provided , within boundaries of and + /// Asynchronously replays on all selected events, which have been tagged using + /// provided , within boundaries of and /// , up to number of elements. /// Returns highest sequence number from selected events. /// @@ -171,6 +171,11 @@ public class QueryConfiguration /// public string DefaultSerializer { get; } + /// + /// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS. + /// + public bool UseSequentialAccess { get; } + /// /// TBD /// @@ -188,6 +193,7 @@ public class QueryConfiguration /// TBD /// TBD /// The default serializer used when not type override matching is found + /// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS. public QueryConfiguration( string schemaName, string journalEventsTableName, @@ -202,7 +208,8 @@ public QueryConfiguration( string orderingColumnName, string serializerIdColumnName, TimeSpan timeout, - string defaultSerializer) + string defaultSerializer, + bool useSequentialAccess) { SchemaName = schemaName; JournalEventsTableName = journalEventsTableName; @@ -218,6 +225,7 @@ public QueryConfiguration( OrderingColumnName = orderingColumnName; DefaultSerializer = defaultSerializer; SerializerIdColumnName = serializerIdColumnName; + UseSequentialAccess = useSequentialAccess; } /// @@ -235,7 +243,7 @@ public QueryConfiguration( /// public abstract class AbstractQueryExecutor : IJournalQueryExecutor { - // indexes of particular fields returned from all events queries + // indexes of particular fields returned from all events queries // they must match `allEventColumnNames` order /// /// TBD @@ -461,7 +469,18 @@ public virtual async Task SelectByPersistenceIdAsync(DbConnection connection, Ca AddParameter(command, "@FromSequenceNr", DbType.Int64, fromSequenceNr); AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr); - using (var reader = await command.ExecuteReaderAsync(cancellationToken)) + CommandBehavior commandBehavior; + + if (Configuration.UseSequentialAccess) + { + commandBehavior = CommandBehavior.SequentialAccess; + } + else + { + commandBehavior = CommandBehavior.Default; + } + + using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) { var i = 0L; while ((i++) < max && await reader.ReadAsync(cancellationToken)) @@ -494,7 +513,18 @@ public virtual async Task SelectByTagAsync(DbConnection connection, Cancel AddParameter(command, "@Ordering", DbType.Int64, fromOffset); AddParameter(command, "@Take", DbType.Int64, take); - using (var reader = await command.ExecuteReaderAsync(cancellationToken)) + CommandBehavior commandBehavior; + + if (Configuration.UseSequentialAccess) + { + commandBehavior = CommandBehavior.SequentialAccess; + } + else + { + commandBehavior = CommandBehavior.Default; + } + + using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) { var maxSequenceNr = 0L; while (await reader.ReadAsync(cancellationToken)) diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs index 2a1f2bb4d08..d64ec8c1f9f 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs @@ -16,7 +16,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot { /// - /// Flattened and serialized snapshot object used as intermediate representation + /// Flattened and serialized snapshot object used as intermediate representation /// before saving snapshot with metadata inside SQL Server database. /// public class SnapshotEntry @@ -112,6 +112,11 @@ public class QueryConfiguration /// public readonly string DefaultSerializer; + /// + /// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS. + /// + public bool UseSequentialAccess { get; } + /// /// TBD /// @@ -125,6 +130,7 @@ public class QueryConfiguration /// TBD /// TBD /// The default serializer used when not type override matching is found + /// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS. public QueryConfiguration( string schemaName, string snapshotTableName, @@ -134,8 +140,9 @@ public QueryConfiguration( string manifestColumnName, string timestampColumnName, string serializerIdColumnName, - TimeSpan timeout, - string defaultSerializer) + TimeSpan timeout, + string defaultSerializer, + bool useSequentialAccess) { SchemaName = schemaName; SnapshotTableName = snapshotTableName; @@ -147,6 +154,7 @@ public QueryConfiguration( SerializerIdColumnName = serializerIdColumnName; Timeout = timeout; DefaultSerializer = defaultSerializer; + UseSequentialAccess = useSequentialAccess; } /// @@ -166,7 +174,7 @@ public interface ISnapshotQueryExecutor QueryConfiguration Configuration { get; } /// - /// Deletes a single snapshot identified by it's persistent actor's , + /// Deletes a single snapshot identified by it's persistent actor's , /// and . /// /// TBD @@ -178,7 +186,7 @@ public interface ISnapshotQueryExecutor Task DeleteAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long sequenceNr, DateTime? timestamp); /// - /// Deletes all snapshot matching persistent actor's as well as + /// Deletes all snapshot matching persistent actor's as well as /// upper (inclusive) bounds of the both and . /// /// TBD @@ -466,7 +474,19 @@ public virtual async Task SelectSnapshotAsync(DbConnection con SetPersistenceIdParameter(persistenceId, command); SetSequenceNrParameter(maxSequenceNr, command); SetTimestampParameter(maxTimestamp, command); - using (var reader = await command.ExecuteReaderAsync(cancellationToken)) + + CommandBehavior commandBehavior; + + if (Configuration.UseSequentialAccess) + { + commandBehavior = CommandBehavior.SequentialAccess; + } + else + { + commandBehavior = CommandBehavior.Default; + } + + using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) { if (await reader.ReadAsync(cancellationToken)) { diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/BatchingSqliteJournal.cs b/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/BatchingSqliteJournal.cs index 6e83dd328f0..3fd3080d8f9 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/BatchingSqliteJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/BatchingSqliteJournal.cs @@ -40,7 +40,8 @@ public sealed class BatchingSqliteJournalSetup : BatchingSqlJournalSetup orderingColumnName: "ordering", serializerIdColumnName: "serializer_id", timeout: config.GetTimeSpan("connection-timeout"), - defaultSerializer: config.GetString("serializer"))) + defaultSerializer: config.GetString("serializer"), + useSequentialAccess: config.GetBoolean("use-sequential-access"))) { } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/SqliteJournal.cs b/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/SqliteJournal.cs index ff2b9746a01..80ec331ae9c 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/SqliteJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite/Journal/SqliteJournal.cs @@ -43,7 +43,8 @@ public SqliteJournal(Config journalConfig) : base(journalConfig.WithFallback(Ext orderingColumnName: "ordering", serializerIdColumnName: "serializer_id", timeout: config.GetTimeSpan("connection-timeout"), - defaultSerializer: config.GetString("serializer")), + defaultSerializer: config.GetString("serializer"), + useSequentialAccess: config.GetBoolean("use-sequential-access")), Context.System.Serialization, GetTimestampProvider(config.GetString("timestamp-provider"))); } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite/Snapshot/SqliteSnapshotStore.cs b/src/contrib/persistence/Akka.Persistence.Sqlite/Snapshot/SqliteSnapshotStore.cs index a9a705936fb..e817cf1c92b 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite/Snapshot/SqliteSnapshotStore.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite/Snapshot/SqliteSnapshotStore.cs @@ -121,7 +121,8 @@ public SqliteSnapshotStore(Config snapshotConfig) : base(snapshotConfig) timestampColumnName: "created_at", serializerIdColumnName: "serializer_id", timeout: config.GetTimeSpan("connection-timeout"), - defaultSerializer: config.GetString("serializer")), + defaultSerializer: config.GetString("serializer"), + useSequentialAccess: config.GetBoolean("use-sequential-access")), Context.System.Serialization); } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index c23a80dddb0..3409ea26761 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -883,7 +883,8 @@ namespace Akka.Actor public static System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, object message, System.Nullable timeout, System.Threading.CancellationToken cancellationToken) { } public static System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, object message, System.Nullable timeout = null) { } public static System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, object message, System.Threading.CancellationToken cancellationToken) { } - public static async System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, object message, System.Nullable timeout, System.Threading.CancellationToken cancellationToken) { } + public static System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, object message, System.Nullable timeout, System.Threading.CancellationToken cancellationToken) { } + public static async System.Threading.Tasks.Task Ask(this Akka.Actor.ICanTell self, System.Func messageFactory, System.Nullable timeout, System.Threading.CancellationToken cancellationToken) { } } public class static GracefulStopSupport { diff --git a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj index df7df36a6f3..40983dac09a 100644 --- a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj +++ b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj @@ -5,6 +5,7 @@ Akka.FSharp.Tests net452;netstandard2.0 false + @@ -23,6 +24,7 @@ + diff --git a/src/core/Akka.FSharp/Akka.FSharp.fsproj b/src/core/Akka.FSharp/Akka.FSharp.fsproj index a7da616afac..4e6a63bbda5 100644 --- a/src/core/Akka.FSharp/Akka.FSharp.fsproj +++ b/src/core/Akka.FSharp/Akka.FSharp.fsproj @@ -7,6 +7,7 @@ net45;netstandard2.0 $(AkkaPackageTags);F#;fsharp true + @@ -19,8 +20,9 @@ - + + diff --git a/src/core/Akka.FSharp/FsApi.fs b/src/core/Akka.FSharp/FsApi.fs index e40f24e3731..4829142dc19 100644 --- a/src/core/Akka.FSharp/FsApi.fs +++ b/src/core/Akka.FSharp/FsApi.fs @@ -380,7 +380,7 @@ module Linq = | _ -> failwith "Doesn't match" type Expression = - static member ToExpression(f : System.Linq.Expressions.Expression>>) = toExpression> f + static member ToExpression(f : System.Linq.Expressions.Expression>>) = f static member ToExpression<'Actor>(f : Quotations.Expr<(unit -> 'Actor)>) = toExpression<'Actor> (QuotationEvaluator.ToLinqExpression f) [] diff --git a/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj b/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj index 35647a0d79a..93c77474a61 100644 --- a/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj +++ b/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj @@ -18,6 +18,9 @@ + + + $(DefineConstants);RELEASE diff --git a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs index 58f2f40b139..c418a4ff9ac 100644 --- a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs +++ b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs @@ -69,13 +69,12 @@ protected override bool ReceiveRecover(object message) protected override bool ReceiveCommand(object message) { - if (message is Cmd) + if (message is Cmd cmd) { - var cmd = (Cmd) message; Persist(cmd.Payload, _ => SaveSnapshot(cmd.Payload)); } - else if (message is SaveSnapshotSuccess) - _probe.Tell(((SaveSnapshotSuccess)message).Metadata.SequenceNr); + else if (message is SaveSnapshotSuccess success) + _probe.Tell(success.Metadata.SequenceNr); else _probe.Tell(message); return true; diff --git a/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs index 84f6ff715aa..e394c5200c0 100644 --- a/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs @@ -18,8 +18,12 @@ namespace Akka.Persistence.Snapshot { /// - /// TBD + /// Local file-based implementation. /// + /// + /// This is the default `akka.peristence.snapshot-store` implementation, when no others are + /// explicitly set via HOCON configuration. + /// public class LocalSnapshotStore : SnapshotStore { private static readonly Regex FilenameRegex = new Regex(@"^snapshot-(.+)-(\d+)-(\d+)", RegexOptions.Compiled); @@ -31,10 +35,10 @@ public class LocalSnapshotStore : SnapshotStore private readonly Akka.Serialization.Serialization _serialization; - private string _defaultSerializer; + private readonly string _defaultSerializer; /// - /// TBD + /// Creates a new instance. /// public LocalSnapshotStore() { @@ -51,14 +55,9 @@ public LocalSnapshotStore() _log = Context.GetLogger(); } - private ILoggingAdapter _log; + private readonly ILoggingAdapter _log; - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD + /// protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { // @@ -72,12 +71,7 @@ protected override Task LoadAsync(string persistenceId, Snapsh return RunWithStreamDispatcher(() => Load(metadata)); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD + /// protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) { _saving.Add(metadata); @@ -88,11 +82,7 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) }); } - /// - /// TBD - /// - /// TBD - /// TBD + /// protected override Task DeleteAsync(SnapshotMetadata metadata) { _saving.Remove(metadata); @@ -109,12 +99,7 @@ protected override Task DeleteAsync(SnapshotMetadata metadata) }); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD + /// protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { foreach (var metadata in GetSnapshotMetadata(persistenceId, criteria)) @@ -123,16 +108,12 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio } } - /// - /// TBD - /// - /// TBD - /// TBD + /// protected override bool ReceivePluginInternal(object message) { - if (message is SaveSnapshotSuccess) + if (message is SaveSnapshotSuccess success) { - _saving.Remove(((SaveSnapshotSuccess) message).Metadata); + _saving.Remove(success.Metadata); } else if (message is SaveSnapshotFailure) { diff --git a/src/core/Akka.Persistence/SnapshotProtocol.cs b/src/core/Akka.Persistence/SnapshotProtocol.cs index c73aff46163..68e654ec03a 100644 --- a/src/core/Akka.Persistence/SnapshotProtocol.cs +++ b/src/core/Akka.Persistence/SnapshotProtocol.cs @@ -27,7 +27,7 @@ public interface ISnapshotRequest : ISnapshotMessage { } public interface ISnapshotResponse : ISnapshotMessage { } /// - /// TBD + /// Metadata for all persisted snapshot records. /// [Serializable] public sealed class SnapshotMetadata : IEquatable diff --git a/src/core/Akka.Remote/Akka.Remote.csproj b/src/core/Akka.Remote/Akka.Remote.csproj index c1c8654088d..1ba1278e192 100644 --- a/src/core/Akka.Remote/Akka.Remote.csproj +++ b/src/core/Akka.Remote/Akka.Remote.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 1cdc39d944b..129a5e675ad 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -125,8 +125,8 @@ internal abstract class DotNettyTransport : Transport protected volatile Address LocalAddress; protected internal volatile IChannel ServerChannel; - private readonly IEventLoopGroup serverEventLoopGroup; - private readonly IEventLoopGroup clientEventLoopGroup; + private readonly IEventLoopGroup _serverEventLoopGroup; + private readonly IEventLoopGroup _clientEventLoopGroup; protected DotNettyTransport(ActorSystem system, Config config) { @@ -141,8 +141,8 @@ protected DotNettyTransport(ActorSystem system, Config config) Settings = DotNettyTransportSettings.Create(config); Log = Logging.GetLogger(System, GetType()); - serverEventLoopGroup = new MultithreadEventLoopGroup(Settings.ServerSocketWorkerPoolSize); - clientEventLoopGroup = new MultithreadEventLoopGroup(Settings.ClientSocketWorkerPoolSize); + _serverEventLoopGroup = new MultithreadEventLoopGroup(Settings.ServerSocketWorkerPoolSize); + _clientEventLoopGroup = new MultithreadEventLoopGroup(Settings.ClientSocketWorkerPoolSize); ConnectionGroup = new ConcurrentSet(); AssociationListenerPromise = new TaskCompletionSource(); @@ -161,8 +161,7 @@ protected async Task NewServer(EndPoint listenAddress) if (InternalTransport != TransportMode.Tcp) throw new NotImplementedException("Haven't implemented UDP transport at this time"); - var dns = listenAddress as DnsEndPoint; - if (dns != null) + if (listenAddress is DnsEndPoint dns) { listenAddress = await DnsToIPEndpoint(dns).ConfigureAwait(false); } @@ -256,8 +255,8 @@ public override async Task Shutdown() // free all of the connection objects we were holding onto ConnectionGroup.Clear(); #pragma warning disable 4014 // shutting down the worker groups can take up to 10 seconds each. Let that happen asnychronously. - clientEventLoopGroup.ShutdownGracefullyAsync(); - serverEventLoopGroup.ShutdownGracefullyAsync(); + _clientEventLoopGroup.ShutdownGracefullyAsync(); + _serverEventLoopGroup.ShutdownGracefullyAsync(); #pragma warning restore 4014 } } @@ -270,13 +269,13 @@ protected Bootstrap ClientFactory(Address remoteAddress) var addressFamily = Settings.DnsUseIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork; var client = new Bootstrap() - .Group(clientEventLoopGroup) + .Group(_clientEventLoopGroup) .Option(ChannelOption.SoReuseaddr, Settings.TcpReuseAddr) .Option(ChannelOption.SoKeepalive, Settings.TcpKeepAlive) .Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay) .Option(ChannelOption.ConnectTimeout, Settings.ConnectTimeout) .Option(ChannelOption.AutoRead, false) - .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)new PooledByteBufferAllocator() : new UnpooledByteBufferAllocator()) + .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default) .ChannelFactory(() => Settings.EnforceIpFamily ? new TcpSocketChannel(addressFamily) : new TcpSocketChannel()) @@ -379,13 +378,13 @@ private ServerBootstrap ServerFactory() var addressFamily = Settings.DnsUseIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork; var server = new ServerBootstrap() - .Group(serverEventLoopGroup) + .Group(_serverEventLoopGroup) .Option(ChannelOption.SoReuseaddr, Settings.TcpReuseAddr) .Option(ChannelOption.SoKeepalive, Settings.TcpKeepAlive) .Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay) .Option(ChannelOption.AutoRead, false) .Option(ChannelOption.SoBacklog, Settings.Backlog) - .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)new PooledByteBufferAllocator() : new UnpooledByteBufferAllocator()) + .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default) .ChannelFactory(() => Settings.EnforceIpFamily ? new TcpServerSocketChannel(addressFamily) : new TcpServerSocketChannel()) diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index 8a4257b0ad4..581ace3c468 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -11,6 +11,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Akka.Util.Internal; using Nito.AsyncEx; namespace Akka.Tests.Actor @@ -71,6 +72,23 @@ protected override void OnReceive(object message) } } + public class ReplyToActor : UntypedActor + { + protected override void OnReceive(object message) + { + var requester = message.AsInstanceOf(); + requester.Tell("i_hear_ya"); + } + } + + [Fact] + public async Task Can_Ask_Response_actor() + { + var actor = Sys.ActorOf(); + var res = await actor.Ask( sender => sender, null, CancellationToken.None); + res.ShouldBe("i_hear_ya"); + } + [Fact] public async Task Can_Ask_actor() { diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index d57172d3415..81d8002a301 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -100,7 +100,24 @@ public static Task Ask(this ICanTell self, object message, CancellationTok /// This exception is thrown if the system can't resolve the target provider. /// /// TBD - public static async Task Ask(this ICanTell self, object message, TimeSpan? timeout, CancellationToken cancellationToken) + public static Task Ask(this ICanTell self, object message, TimeSpan? timeout, CancellationToken cancellationToken) + { + return Ask(self, _ => message, timeout, cancellationToken); + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// Factory method that creates a message that can encapsulate the 'Sender' IActorRef + /// TBD + /// TBD + /// + /// This exception is thrown if the system can't resolve the target provider. + /// + /// TBD + public static async Task Ask(this ICanTell self, Func messageFactory, TimeSpan? timeout, CancellationToken cancellationToken) { await SynchronizationContextManager.RemoveContext; @@ -108,7 +125,7 @@ public static async Task Ask(this ICanTell self, object message, TimeSpan? if (provider == null) throw new ArgumentException("Unable to resolve the target Provider", nameof(self)); - return (T) await Ask(self, message, provider, timeout, cancellationToken); + return (T)await Ask(self, messageFactory, provider, timeout, cancellationToken); } /// @@ -134,7 +151,7 @@ internal static IActorRefProvider ResolveProvider(ICanTell self) private static readonly bool isRunContinuationsAsynchronouslyAvailable = Enum.IsDefined(typeof(TaskCreationOptions), RunContinuationsAsynchronously); - private static async Task Ask(ICanTell self, object message, IActorRefProvider provider, + private static async Task Ask(ICanTell self, Func messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken) { TaskCompletionSource result; @@ -174,6 +191,7 @@ private static async Task Ask(ICanTell self, object message, IActorRefPr var future = new FutureActorRef(result, () => { }, path, isRunContinuationsAsynchronouslyAvailable); //The future actor needs to be registered in the temp container provider.RegisterTempActor(future, path); + var message = messageFactory(future); self.Tell(message, future); try