From 660f63741cfe50934f2753229f5422871655240f Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 9 Jan 2022 15:09:21 -0500 Subject: [PATCH 1/8] index creation app example --- .../Akka.Linq2Db.Sandbox.csproj | 15 ++ ....Persistence.Linq2Db.IndexHelperApp.csproj | 25 ++++ .../Options.cs | 22 +++ .../Program.cs | 141 ++++++++++++++++++ .../example.hocon | 15 ++ ....Persistence.Linq2Db.IndexHelperLib.csproj | 15 ++ .../Class1.cs | 50 +++++++ Akka.Persistence.Linq2Db.sln | 18 +++ 8 files changed, 301 insertions(+) create mode 100644 Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj create mode 100644 Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj create mode 100644 Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs create mode 100644 Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs create mode 100644 Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon create mode 100644 Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj create mode 100644 Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs diff --git a/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj b/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj new file mode 100644 index 00000000..8019ffa3 --- /dev/null +++ b/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj @@ -0,0 +1,15 @@ + + + + netcoreapp3.1 + enable + enable + + + + + + + + + diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj b/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj new file mode 100644 index 00000000..caf07276 --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj @@ -0,0 +1,25 @@ + + + + Exe + netcoreapp3.1 + + + + + + + + + + + + + + + + Always + + + + diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs b/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs new file mode 100644 index 00000000..b8ebaa58 --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs @@ -0,0 +1,22 @@ +using CommandLine; + +namespace Akka.Persistence.Linq2Db.IndexHelperApp +{ + public class Options + { + [Option('f',"file", Required=true, HelpText = "Specify the HOCON file to use")] + public string File { get; set; } + + [Option('p',"path", Required = true, HelpText = "The Path to the Akka.Persistence.Linq2Db Config in the HOCON.")] + public string HoconPath { get; set; } + + [Option("OrderingIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an Ordering index")] + public bool GenerateOrdering { get; set; } + + [Option("PidSeqNoIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an index on PersistenceID and SeqNo")] + public bool GeneratePidSeqNo { get; set; } + + [Option("TimeStampIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for a Timestamp Index")] + public bool GenerateTimestamp { get; set; } + } +} \ No newline at end of file diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs b/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs new file mode 100644 index 00000000..14a386b5 --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs @@ -0,0 +1,141 @@ +using System; +using System.IO; +using Akka.Configuration; +using Akka.Persistence.Linq2Db.IndexHelperLib; +using Akka.Persistence.Sql.Linq2Db.Config; +using Akka.Persistence.Sql.Linq2Db.Tests; +using CommandLine; +using FluentMigrator.Expressions; +using FluentMigrator.Runner.Generators; +using FluentMigrator.Runner.Generators.Generic; +using FluentMigrator.Runner.Generators.MySql; +using FluentMigrator.Runner.Generators.Oracle; +using FluentMigrator.Runner.Generators.Postgres; +using FluentMigrator.Runner.Generators.Postgres92; +using FluentMigrator.Runner.Generators.SQLite; +using FluentMigrator.Runner.Generators.SqlServer; +using FluentMigrator.Runner.Processors.Postgres; +using LinqToDB; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Akka.Persistence.Linq2Db.IndexHelperApp +{ + class Program + { + + static void Main(string[] args) + { + Parser.Default.ParseArguments(args) + .WithParsed(opts => + { + //var str = Linq2DbJournalDefaultSpecConfig.customConfig("testGen", + // "journalTbl", "metadataTbl", ProviderName.SqlServer, + // "connStr"); + var conf = + ConfigurationFactory.ParseString( + File.ReadAllText(opts.File)); + + var journalConf = + new Akka.Persistence.Sql.Linq2Db.Config.JournalConfig( + conf.GetConfig( + opts.HoconPath + //"akka.persistence.journal.linq2db.testGen" + ) + .WithFallback(Akka.Persistence.Sql.Linq2Db + .Journal + .Linq2DbWriteJournal.DefaultConfiguration)); + var generator = getGenerator(journalConf.ProviderName); + var helper = new JournalIndexHelper(); + CreateIndexExpression expr = null; + GeneratePerOptions(opts, helper, journalConf, generator); + }); + } + + private static void GeneratePerOptions(Options opts, JournalIndexHelper helper, + JournalConfig journalConf, GenericGenerator generator) + { + CreateIndexExpression expr; + if (opts.GeneratePidSeqNo) + { + expr = new CreateIndexExpression() + { + Index = helper.JournalOrdering(journalConf.TableConfig.TableName, + journalConf.TableConfig.ColumnNames.Ordering, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, expr, "Ordering"); + } + + if (opts.GeneratePidSeqNo) + { + expr = new CreateIndexExpression() + { + Index = helper.DefaultJournalIndex( + journalConf.TableConfig.TableName, + journalConf.TableConfig.ColumnNames.PersistenceId, + journalConf.TableConfig.ColumnNames.SequenceNumber, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, expr, "PidAndSequenceNo"); + } + + if (opts.GenerateTimestamp) + { + expr = new CreateIndexExpression() + { + Index = helper.JournalTimestamp(journalConf.TableConfig.TableName, + journalConf.TableConfig.ColumnNames.Created, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, expr, "Timestamp"); + } + } + + private static void GenerateWithHeaderAndFooter(GenericGenerator generator, + CreateIndexExpression expr, string indexType) + { + Console.WriteLine("-------"); + Console.WriteLine($"----{indexType} Index Create Below"); + Console.WriteLine(generator.Generate(expr)); + Console.WriteLine($"----{indexType} Index Create Above"); + Console.WriteLine("-------"); + } + + static GenericGenerator getGenerator(string dbArg) + { + if (dbArg.StartsWith("sqlserver", + StringComparison.InvariantCultureIgnoreCase)) + { + return new SqlServer2008Generator(); + } + else if (dbArg.Contains("sqlite", + StringComparison.InvariantCultureIgnoreCase)) + { + return new SQLiteGenerator(); + } + else if (dbArg.Contains("postgres", + StringComparison.InvariantCultureIgnoreCase)) + { + return new Postgres92Generator( + new PostgresQuoter(new PostgresOptions()), + new OptionsWrapper( + new GeneratorOptions())); + } + else if (dbArg.Contains("mysql", + StringComparison.InvariantCultureIgnoreCase)) + { + return new MySql5Generator(); + } + else if (dbArg.Contains("oracle", + StringComparison.InvariantCultureIgnoreCase)) + { + return new OracleGenerator(); + } + else + { + throw new Exception("IDK what to do with this!"); + } + } + } +} \ No newline at end of file diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon b/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon new file mode 100644 index 00000000..a382c24c --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon @@ -0,0 +1,15 @@ +akka.persistence.journal.linq2db{ + testGen { + class = "Akka.Persistence.Sql.Linq2Db.Journal.Linq2DbWriteJournal, Akka.Persistence.Sql.Linq2Db" + provider-name = "SqlServer" + connection-string = "connStr" + tables{ + journal{ + auto-init = true + warn-on-auto-init-fail = false + table-name = "journalTbl" + metadata-table-name = "metadataTbl" + } + } +} +} \ No newline at end of file diff --git a/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj b/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj new file mode 100644 index 00000000..49c19454 --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj @@ -0,0 +1,15 @@ + + + + netstandard2.0 + + + + + + + + + + + diff --git a/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs b/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs new file mode 100644 index 00000000..65f5b08e --- /dev/null +++ b/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs @@ -0,0 +1,50 @@ +using System; +using FluentMigrator.Model; + +namespace Akka.Persistence.Linq2Db.IndexHelperLib +{ + public class JournalIndexHelper + { + public IndexDefinition DefaultJournalIndex(string tableName, string persistenceIdCol, string sequenceNoCol, string schemaName = null) + { + var idx = beginCreateIndex(tableName, schemaName, $"UX_{tableName}_PID_SEQNO"); + //short name for easy compat with all dbs. (*cough* oracle *cough*) + idx.Columns.Add(new IndexColumnDefinition(){ Name = persistenceIdCol }); + idx.Columns.Add(new IndexColumnDefinition(){Name = sequenceNoCol, Direction = Direction.Ascending}); + idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalOrdering(string tableName, + string orderingCol, string schemaName = null) + { + var idx = beginCreateIndex(tableName, schemaName,$"IX_{tableName}_Ordering"); + idx.Columns.Add(new IndexColumnDefinition(){Name = orderingCol}); + //Should it be? + //idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalTimestamp(string tableName, + string timestampCol, string schemaName = null) + { + var idx = beginCreateIndex(tableName, schemaName, + $"IX_{tableName}_TimeStamp"); + idx.Columns.Add(new IndexColumnDefinition(){Name = timestampCol}); + //Not unique by any stretch. + return idx; + } + + private static IndexDefinition beginCreateIndex(string tableName, string schemaName, string indexName) + { + var idx = new IndexDefinition(); + if (string.IsNullOrWhiteSpace(schemaName) == false) + { + idx.SchemaName = schemaName; + } + idx.TableName = tableName; + idx.Name = indexName; + return idx; + } + } +} \ No newline at end of file diff --git a/Akka.Persistence.Linq2Db.sln b/Akka.Persistence.Linq2Db.sln index f84ac72e..81a30e5c 100644 --- a/Akka.Persistence.Linq2Db.sln +++ b/Akka.Persistence.Linq2Db.sln @@ -36,6 +36,12 @@ EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests", "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests\Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj", "{170698FA-DA1E-40BC-896D-AFA67976C0EB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperLib", "Akka.Persistence.Linq2Db.IndexHelperLib\Akka.Persistence.Linq2Db.IndexHelperLib.csproj", "{AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperApp", "Akka.Persistence.Linq2Db.IndexHelperApp\Akka.Persistence.Linq2Db.IndexHelperApp.csproj", "{D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Linq2Db.Sandbox", "Akka.Linq2Db.Sandbox\Akka.Linq2Db.Sandbox.csproj", "{697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -78,6 +84,18 @@ Global {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.Build.0 = Release|Any CPU + {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Release|Any CPU.Build.0 = Release|Any CPU + {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Release|Any CPU.Build.0 = Release|Any CPU + {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE From fec6cf57c3e44ccc7d670269feeaa0c96a1d0b3f Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 19 Mar 2022 16:52:30 -0400 Subject: [PATCH 2/8] WIP Tag Table with backwards compatibility --- .../Config/BaseByteArrayJournalDaoConfig.cs | 3 +- .../Config/JournalConfig.cs | 4 +- .../Config/JournalTableConfig.cs | 39 +++- .../Config/ReadJournalPluginConfig.cs | 29 ++- .../AkkaPersistenceDataConnectionFactory.cs | 7 +- .../Journal/DAO/BaseByteArrayJournalDao.cs | 147 ++++++++++++-- .../DAO/BaseJournalDaoWithReadMessages.cs | 29 ++- .../Journal/DAO/ByteArrayJournalSerializer.cs | 77 ++++++-- .../Journal/Types/JournalRow.cs | 7 + .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 185 ++++++++++++++++-- .../Serialization/PersistentReprSerializer.cs | 92 ++++++--- 11 files changed, 527 insertions(+), 92 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs index 0410df3d..5e6499a1 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs @@ -10,6 +10,7 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) BufferSize = config.GetInt("buffer-size", 5000); BatchSize = config.GetInt("batch-size", 100); DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000); + DbRoundTripTagBatchSize = config.GetInt("db-round-trip-max-tag-batch-size", 1000); PreferParametersOnMultiRowInsert = config.GetBoolean("prefer-parameters-on-multirow-insert", false); @@ -43,6 +44,6 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) public int BufferSize { get; protected set; } public bool SqlCommonCompatibilityMode { get; protected set; } - + public int DbRoundTripTagBatchSize { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs index ac40d71c..52708066 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs @@ -21,7 +21,6 @@ public JournalConfig(Configuration.Config config) UseSharedDb = string.IsNullOrWhiteSpace(dbConf) ? null : dbConf; UseCloneConnection = config.GetBoolean("use-clone-connection", false); - } public string MaterializerDispatcher { get; protected set; } @@ -44,6 +43,7 @@ public IDaoConfig IDaoConfig public string ProviderName { get; } public string ConnectionString { get; } public bool UseCloneConnection { get; set; } + } public interface IProviderConfig @@ -61,4 +61,6 @@ public interface IDaoConfig bool SqlCommonCompatibilityMode { get; } int Parallelism { get; } } + + } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs index 4d680076..f5b532d8 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs @@ -3,6 +3,13 @@ namespace Akka.Persistence.Sql.Linq2Db.Config { + [Flags] + public enum TagWriteMode + { + CommaSeparatedArray = 1, + TagTable = 2, + CommaSeparatedArrayAndTagTable = 3, + } public class JournalTableConfig { @@ -13,6 +20,8 @@ public class JournalTableConfig public string MetadataTableName { get; protected set; } public MetadataTableColumnNames MetadataColumnNames { get; protected set; } public bool WarnOnAutoInitializeFail { get; } + + public TagWriteMode TagWriteMode { get; } public JournalTableConfig(Configuration.Config config) { @@ -28,6 +37,31 @@ public JournalTableConfig(Configuration.Config config) AutoInitialize = localcfg.GetBoolean("auto-init", false); WarnOnAutoInitializeFail = localcfg.GetBoolean("warn-on-auto-init-fail", true); + + var s = config.GetString("tag-write-mode", "default"); + if (Enum.TryParse(s, true, out TagWriteMode res)) + { + + } + else if (s.Equals("default", StringComparison.InvariantCultureIgnoreCase)) + { + res = TagWriteMode.CommaSeparatedArray; + } + else if (s.Equals("migration", + StringComparison.InvariantCultureIgnoreCase)) + { + res = TagWriteMode.CommaSeparatedArrayAndTagTable; + } + else if (s.Equals("tagtableonly", + StringComparison.InvariantCultureIgnoreCase)) + { + res = TagWriteMode.TagTable; + } + else + { + res = TagWriteMode.CommaSeparatedArray; + } + TagWriteMode = res; } @@ -40,7 +74,8 @@ protected bool Equals(JournalTableConfig other) AutoInitialize == other.AutoInitialize && MetadataTableName == other.MetadataTableName && WarnOnAutoInitializeFail == other.WarnOnAutoInitializeFail && - Equals(MetadataColumnNames, other.MetadataColumnNames); + Equals(MetadataColumnNames, other.MetadataColumnNames) && + TagWriteMode== other.TagWriteMode; } public override bool Equals(object obj) @@ -54,7 +89,7 @@ public override bool Equals(object obj) public override int GetHashCode() { return HashCode.Combine(ColumnNames, TableName, SchemaName, - AutoInitialize, MetadataTableName, MetadataColumnNames); + AutoInitialize, MetadataTableName, MetadataColumnNames, TagWriteMode); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs index 691d3e9d..10394f2c 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs @@ -1,4 +1,6 @@ -namespace Akka.Persistence.Sql.Linq2Db.Config +using System; + +namespace Akka.Persistence.Sql.Linq2Db.Config { public class ReadJournalPluginConfig { @@ -7,11 +9,34 @@ public ReadJournalPluginConfig(Configuration.Config config) TagSeparator = config.GetString("tag-separator", ","); Dao = config.GetString("dao", "akka.persistence.sql.linq2db.dao.bytea.readjournal.bytearrayreadjournaldao"); - + var tagReadStr = config.GetString("tag-read-mode", "defaultconcatvarchar"); + if (Enum.TryParse(tagReadStr,true,out TagReadMode tgr)) + { + + } + else if (tagReadStr.Equals("default", StringComparison.InvariantCultureIgnoreCase)) + { + tgr = TagReadMode.DefaultConcatVarchar; + } + else if (tagReadStr.Equals("migrate", StringComparison.InvariantCultureIgnoreCase)) + { + tgr = TagReadMode.MigrateToTagTable; + } + + TagReadMode = tgr; } public string Dao { get; set; } public string TagSeparator { get; set; } + public TagReadMode TagReadMode { get; set; } + } + + [Flags] + public enum TagReadMode + { + DefaultConcatVarchar = 1, + MigrateToTagTable = 3, + TagTableOnly = 2 } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs index 5a63fef1..c29a33c3 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs @@ -135,7 +135,12 @@ private static void MapJournalRow(IProviderConfig config, .HasColumnName(tableConfig.ColumnNames.SequenceNumber) .Member(r => r.Timestamp) .HasColumnName(tableConfig.ColumnNames.Created); - + //We can skip writing tags the old way by ignoring the column in mapping. + if ((tableConfig.TagWriteMode & + TagWriteMode.CommaSeparatedArray) == 0) + { + journalRowBuilder.Member(r => r.tags).IsNotColumn(); + } if (config.ProviderName.ToLower().Contains("sqlite")) { journalRowBuilder.Member(r => r.ordering).IsPrimaryKey().HasDbType("INTEGER") diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index 2c749fa4..1cc8a141 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; using System.Data; @@ -23,6 +24,18 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO { + public static class EnumerableExtensions + { + public static IEnumerable SelectStateful( + this IEnumerable enumerable, TState state, + Func selector) + { + foreach (var item in enumerable) + { + yield return selector(item,state); + } + } + } public abstract class BaseByteArrayJournalDao : BaseJournalDaoWithReadMessages, IJournalDaoWithUpdates @@ -149,15 +162,75 @@ private async Task WriteJournalRows(Seq xs) private async Task InsertSingle(Seq xs) { - //If we are writing a single row, - //we don't need to worry about transactions. - using (var db = _connectionFactory.GetConnection()) + if ((_journalConfig.TableConfig.TagWriteMode & TagWriteMode.TagTable)!=0 && xs.Head.tagArr.Length>0) + { + //Lazy fallback; do the InsertMultiple call here and leave it at that. + await InsertMultiple(xs); + } + else + { + //If we are writing a single row, + //we don't need to worry about transactions. + using (var db = _connectionFactory.GetConnection()) + { + await db.InsertAsync(xs.Head); + } + } + + + } + + private async Task BulkInsertWithTagTableTags(DataConnection dc, + Seq xs) + { + var tagsToInsert = new List(xs.Count); + foreach (var journalRow in xs) { - await db.InsertAsync(xs.Head); + var dbid = await dc.InsertWithInt64IdentityAsync(journalRow); + tagsToInsert.AddRange(journalRow.tagArr.SelectStateful(dbid, + (tag, id) => new JournalTagRow() + { JournalOrderingId = id, TagValue = tag })); } + + await dc.GetTable() + .BulkCopyAsync(new BulkCopyOptions() + { + BulkCopyType = BulkCopyType.MultipleRows, + UseParameters = _journalConfig.DaoConfig + .PreferParametersOnMultiRowInsert, + MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripTagBatchSize + }, tagsToInsert); + } + private async Task BulkInsertNoTagTableTags(DataConnection dc, Seq xs) + { + await dc.GetTable() + .BulkCopyAsync( + new BulkCopyOptions() + { + BulkCopyType = + xs.Count > _journalConfig.DaoConfig + .MaxRowByRowSize + ? BulkCopyType.Default + : BulkCopyType.MultipleRows, + UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert, + MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize + }, xs); } private async Task InsertMultiple(Seq xs) + { + if ((_journalConfig.TableConfig.TagWriteMode & TagWriteMode.TagTable) !=0) + { + await HandleTagTableInsert(xs); + } + else + { + await HandleDefaultInsert(xs); + } + + } + + private async Task HandleDefaultInsert(Seq xs) { using (var db = _connectionFactory.GetConnection()) { @@ -165,18 +238,7 @@ private async Task InsertMultiple(Seq xs) { await db.BeginTransactionAsync(IsolationLevel .ReadCommitted); - await db.GetTable() - .BulkCopyAsync( - new BulkCopyOptions() - { - BulkCopyType = - xs.Count > _journalConfig.DaoConfig - .MaxRowByRowSize - ? BulkCopyType.Default - : BulkCopyType.MultipleRows, - UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert, - MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize - }, xs); + await BulkInsertNoTagTableTags(db, xs); await db.CommitTransactionAsync(); } catch (Exception e) @@ -195,9 +257,58 @@ await db.GetTable() } } + private async Task HandleTagTableInsert(Seq xs) + { + using (var db = _connectionFactory.GetConnection()) + { + try + { + await db.BeginTransactionAsync(IsolationLevel + .ReadCommitted); + await consumeSequenceForTagInsert(xs, db); + await db.CommitTransactionAsync(); + } + catch (Exception ex) + { + try + { + await db.RollbackTransactionAsync(); + } + catch (Exception exception) + { + throw ex; + } + + throw; + } + } + } + + private async Task consumeSequenceForTagInsert(Seq xs, DataConnection db) + { + Seq tail = xs; + while (tail.Count > 0) + { + Seq noTags; + Seq hasTags; + (noTags, tail) = + tail.Span(r => r.tagArr.Length == 0); + if (noTags.Count > 0) + { + await BulkInsertNoTagTableTags(db, noTags); + } + + (hasTags, tail) = + tail.Span(r => r.tagArr.Length > 0); + if (hasTags.Count > 0) + { + await BulkInsertWithTagTableTags(db, hasTags); + } + } + } //By using a custom flatten here, we avoid an Enumerable/LINQ allocation //And are able to have a little more control over default capacity of array. - static List FlattenListOfListsToList(List>> source) { + static List FlattenListOfListsToList(List> source) { //List ResultSet( // Akka.Util.Try> item) @@ -236,7 +347,7 @@ public async Task> AsyncWriteMessages( } protected static ImmutableList BuildWriteRejections( - List>> serializedTries) + List> serializedTries) { Exception[] builderEx = new Exception[serializedTries.Count]; diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs index b201b7ef..0f6f8310 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs @@ -40,21 +40,30 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, FlowControlEnum.Continue), async opt => { - async Task>)>> - RetrieveNextBatch() + async Task>> BatchFromDb(string s, long l, int i, + long fromSeqNo) { - Seq< - Util.Try> msg; + Seq> msg; using (var conn = _connectionFactory.GetConnection()) { - msg = await Messages(conn, persistenceId, - opt.Item1, - toSequenceNr, batchSize) + msg = await Messages(conn, s, + fromSeqNo, + l, i) .RunWith( - ExtSeq.Seq>(), mat); + ExtSeq.Seq>(), mat); } + return msg; + } + + async Task>)>> + RetrieveNextBatch(long fromSeq) + { + Seq< + Util.Try> msg; + msg = await BatchFromDb(persistenceId, toSequenceNr, batchSize, fromSeq); + var hasMoreEvents = msg.Count == batchSize; //var lastMsg = msg.IsEmpty.LastOrDefault(); Util.Option lastSeq = Util.Option.None; @@ -99,9 +108,9 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec, case FlowControlEnum.Stop: return Util.Option<((long, FlowControlEnum), Seq>)>.None; case FlowControlEnum.Continue: - return await RetrieveNextBatch(); + return await RetrieveNextBatch(opt.Item1); case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue: - return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch); + return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2,()=> RetrieveNextBatch(opt.Item1)); default: return InvalidFlowThrowHelper(opt); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index 21ec4de9..3c476932 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -20,6 +20,7 @@ public class ByteArrayJournalSerializer : FlowPersistentReprSerializer _journalConfig; private readonly string[] _separatorArray; + private readonly TagWriteMode _tagWriteMode; public ByteArrayJournalSerializer(IProviderConfig journalConfig, Akka.Serialization.Serialization serializer, string separator) { @@ -27,6 +28,7 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon _serializer = serializer; _separator = separator; _separatorArray = new[] {_separator}; + _tagWriteMode = journalConfig.TableConfig.TagWriteMode; } /// @@ -47,6 +49,43 @@ private static string StringSep(IImmutableSet tags, tl + separator + tr); } + private JournalRow CreateJournalRow( + IImmutableSet tags, IPersistentRepresentation _persistentRepr, long ts) + { + switch (_tagWriteMode) + { + case TagWriteMode.CommaSeparatedArray: + return new JournalRow() + { + tags = StringSep(tags, _separator), + Timestamp = _persistentRepr.Timestamp == 0 + ? ts + : _persistentRepr.Timestamp + }; + case TagWriteMode.TagTable: + return new JournalRow() + { + tags = "", + tagArr = tags.ToArray(), + Timestamp = _persistentRepr.Timestamp == 0 + ? ts + : _persistentRepr.Timestamp + }; + case TagWriteMode.CommaSeparatedArrayAndTagTable: + return new JournalRow() + { + tags = StringSep(tags, _separator), + tagArr = tags.ToArray(), + Timestamp = _persistentRepr.Timestamp == 0 + ? ts + : _persistentRepr.Timestamp + }; + default: + throw new Exception("Invalid Tag Write Mode!"); + } + } + + protected override Try Serialize(IPersistentRepresentation persistentRepr, IImmutableSet tTags, long timeStamp = 0) { try @@ -55,39 +94,37 @@ protected override Try Serialize(IPersistentRepresentation persisten return Akka.Serialization.Serialization.WithTransport( _serializer.System, (persistentRepr , _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer), - StringSep(tTags,_separator), - timeStamp + CreateJournalRow(tTags,persistentRepr,timeStamp) ), state => { - var (_persistentRepr, serializer,tags,ts) = state; - string thisManifest = ""; + var (_persistentRepr, serializer,row) = state; + if (serializer is SerializerWithStringManifest withStringManifest) { - thisManifest = + row.manifest = withStringManifest.Manifest(_persistentRepr.Payload); } else { if (serializer.IncludeManifest) { - thisManifest = _persistentRepr.Payload + row.manifest = _persistentRepr.Payload .GetType().TypeQualifiedName(); } } - return new Try(new JournalRow() + { - message = - serializer.ToBinary(_persistentRepr.Payload), - manifest = thisManifest, - persistenceId = _persistentRepr.PersistenceId, - tags = tags, - Identifier = serializer.Identifier, - sequenceNumber = _persistentRepr.SequenceNr, - Timestamp = _persistentRepr.Timestamp == 0 - ? ts - : _persistentRepr.Timestamp - }); + row.message = + serializer.ToBinary(_persistentRepr + .Payload); + row.persistenceId = + _persistentRepr.PersistenceId; + row.Identifier = serializer.Identifier; + row.sequenceNumber = _persistentRepr.SequenceNr; + } + return new Try(row + ); }); } catch (Exception e) @@ -121,7 +158,7 @@ protected override Try Serialize(IPersistentRepresentation persisten t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), t.tags?.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + .ToImmutableHashSet() ?? t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, t.ordering)); } else @@ -134,7 +171,7 @@ protected override Try Serialize(IPersistentRepresentation persisten t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), t.tags?.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + .ToImmutableHashSet() ??t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, t.ordering)); // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index 4574e9d6..e4311d83 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -3,6 +3,12 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { + + public sealed class JournalTagRow + { + public long JournalOrderingId { get; set; } + public string TagValue { get; set; } + } public sealed class JournalRow { public JournalRow() @@ -22,5 +28,6 @@ public JournalRow() public string tags { get; set; } public string manifest { get; set; } public int? Identifier { get; set; } + public string[] tagArr { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 9d1ff11f..afdbe0e5 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -15,6 +15,7 @@ using Akka.Util; using LinqToDB; using LinqToDB.Data; +using LinqToDB.Tools; namespace Akka.Persistence.Sql.Linq2Db.Query.Dao { @@ -104,19 +105,43 @@ public Source< { using (var conn = input._connectionFactory.GetConnection()) { - return await conn.GetTable() + var evts = await conn.GetTable() .OrderBy(r => r.ordering) .Where(r => r.ordering > input.offset && r.ordering <= input.maxOffset) .Take(input.maxTake).ToListAsync(); + return await AddTagDataIfNeeded(evts, conn); } } ).Via(deserializeFlow); } - + + public async Task> AddTagDataIfNeeded(List toAdd, DataConnection context) + { + if (_readJournalConfig.PluginConfig.TagReadMode == + TagReadMode.TagTableOnly) + { + var tagRows = await context.GetTable() + .Where(r => + r.JournalOrderingId.In(toAdd.Select(r => r.ordering))) + .OrderBy(r=>r.JournalOrderingId) + .ToListAsync(); + foreach (var journalRow in toAdd) + { + journalRow.tagArr = + tagRows.Where(r => + r.JournalOrderingId == + journalRow.ordering) + .Select(r => r.TagValue) + .ToArray(); + } + } + + return toAdd; + } public Source< Akka.Util.Try<(IPersistentRepresentation, IImmutableSet, long)>, NotUsed> EventsByTag(string tag, long offset, long maxOffset, @@ -124,29 +149,159 @@ public Source< { var separator = _readJournalConfig.PluginConfig.TagSeparator; var maxTake = MaxTake(max); - return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,_connectionFactory}, - async(input)=> - { - using (var conn = input._connectionFactory.GetConnection()) + switch (_readJournalConfig.PluginConfig.TagReadMode) + { + case TagReadMode.DefaultConcatVarchar: + return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,_connectionFactory}, + async(input)=> + { + using (var conn = input._connectionFactory.GetConnection()) + { + return await conn.GetTable() + .Where(r => r.tags.Contains(input.tag)) + .OrderBy(r => r.ordering) + .Where(r => + r.ordering > input.offset && r.ordering <= input.maxOffset) + .Take(input.maxTake).ToListAsync(); + } + }).Via(perfectlyMatchTag(tag, separator)) + .Via(deserializeFlow); + case TagReadMode.MigrateToTagTable: + return eventByTagMigration(tag, offset, maxOffset, separator, maxTake); + case TagReadMode.TagTableOnly: + return eventByTagTableOnly(tag, offset, maxOffset, separator, maxTake); + default: + throw new ArgumentOutOfRangeException(); + } + + + + } + + private Source< + Try<(IPersistentRepresentation, IImmutableSet, long)>, + NotUsed> eventByTagTableOnly(string tag, long offset, + long maxOffset, + string separator, int maxTake) + { + return AsyncSource.FromEnumerable( + new { - return await conn.GetTable() - .Where(r => r.tags.Contains(input.tag)) - .OrderBy(r => r.ordering) - .Where(r => - r.ordering > input.offset && r.ordering <= input.maxOffset) - .Take(input.maxTake).ToListAsync(); - } - }).Via(perfectlyMatchTag(tag, separator)) + separator, tag, offset, maxOffset, maxTake, _connectionFactory + }, + async (input) => + { + //TODO: Optimize Flow + using (var conn = input._connectionFactory.GetConnection()) + { + //First, Get eligible rows. + var mainRows = await + conn.GetTable() + .LeftJoin( + conn.GetTable< + JournalTagRow>(), + (jr, jtr) => + jr.ordering == + jtr.JournalOrderingId, + (jr, jtr) => + new { jr, jtr }) + .Where(r => + r.jtr.TagValue == input.tag) + .Select(r=>r.jr) + .Where(r => + r.ordering > input.offset && + r.ordering <= input.maxOffset) + .Take(input.maxTake).ToListAsync(); + //Then, Get the tags for the rows. + var tagRows = await conn.GetTable() + .Where(r => + r.JournalOrderingId.In( + mainRows.Select(r => + r.ordering))) + .ToListAsync(); + foreach (var journalRow in mainRows) + { + journalRow.tagArr = + tagRows.Where(r => + r.JournalOrderingId == + journalRow.ordering) + .Select(r => r.TagValue) + .ToArray(); + } + + return mainRows; + } + }).Via(perfectlyMatchTag(tag, separator)) .Via(deserializeFlow); + } + private Source, long)>, NotUsed> eventByTagMigration(string tag, long offset, long maxOffset, + string separator, int maxTake) + { + return AsyncSource.FromEnumerable( + new + { + separator, tag, offset, maxOffset, maxTake, _connectionFactory + }, + async (input) => + { + //NOTE: This flow is probably not performant, + //It is meant to allow for safe migration + //And is not necessarily intended for long term use + using (var conn = input._connectionFactory.GetConnection()) + { + //First, fin + var mainRows = await conn.GetTable() + .Where(r => r.ordering.In( + conn.GetTable() + .LeftJoin( + conn.GetTable< + JournalTagRow>(), + (jr, jtr) => + jr.ordering == + jtr.JournalOrderingId, + (jr, jtr) => + new { jr, jtr }) + .Where(r => + r.jr.tags.Contains( + input.tag) || + r.jtr.TagValue == input.tag) + .Select(r => r.jr.ordering) + .OrderBy(r => r) + .Where(r => + r > input.offset && + r <= input.maxOffset) + .Take(input.maxTake))).ToListAsync(); + var tagRows = await conn.GetTable() + .Where(r => + r.JournalOrderingId.In( + mainRows.Select(r => + r.ordering))) + .ToListAsync(); + foreach (var journalRow in mainRows) + { + journalRow.tagArr = + tagRows.Where(r => + r.JournalOrderingId == + journalRow.ordering) + .Select(r => r.TagValue) + .ToArray(); + } + + return mainRows; + } + }).Via(perfectlyMatchTag(tag, separator)) + .Via(deserializeFlow); } private Flow perfectlyMatchTag( string tag, string separator) { - + //Do the tagArr check first here + //Since the logic is simpler. return Flow.Create().Where(r => + r.tagArr?.Contains(tag)?? (r.tags ?? "") .Split(new[] {separator}, StringSplitOptions.RemoveEmptyEntries) .Any(t => t.Contains(tag))); diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs index c2b1e347..fb3f9cac 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs @@ -11,7 +11,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Serialization { public abstract class PersistentReprSerializer { - public List>> Serialize( + public List> Serialize( IEnumerable messages, long timeStamp = 0) { return messages.Select(aw => @@ -23,58 +23,106 @@ public abstract class PersistentReprSerializer //Also, if we are only persisting a single event //We will only enumerate if we have more than one element. var payloads = - (aw.Payload as IImmutableList - ); + (aw.Payload as IImmutableList + ); if (payloads is null) { - return new Util.Try>( + return new Util.Try( new ArgumentNullException( $"{aw.PersistenceId} received empty payload for sequenceNr range " + $"{aw.LowestSequenceNr} - {aw.HighestSequenceNr}")); } + //Preallocate our list; In the common case //This saves a tiny bit of garbage - var retList = new List(payloads.Count); + var retList = new T[payloads.Count]; if (payloads.Count == 1) + { + // If there's only one payload + // Don't allocate the enumerable. + var ser = Serialize(payloads[0], timeStamp); + var opt = ser.Success; + if (opt.HasValue) { - // If there's only one payload - // Don't allocate the enumerable. - var ser = Serialize(payloads[0], timeStamp); - var opt = ser.Success; - if (opt.HasValue) - { - retList.Add(opt.Value); - return new Util.Try>(retList); - } - else - { - return new Util.Try>(ser.Failure.Value); - } + retList[0] = opt.Value; + return new Util.Try(retList); } + else + { + return new Util.Try(ser.Failure.Value); + } + } else { + int idx = 0; foreach (var p in payloads) { var ser = Serialize(p, timeStamp); var opt = ser.Success; if (opt.HasValue) { - retList.Add(opt.Value); + retList[idx] = opt.Value; + idx = idx + 1; } else { - return new Util.Try>(ser.Failure.Value); + return new Util.Try(ser.Failure.Value); } } - return new Util.Try>(retList); + return new Util.Try(retList); } //return new Util.Try>(retList); - }).ToList(); } + private List> HandleSerializeList(long timeStamp, AtomicWrite[] msgArr) + { + List> fullSet = + new List>(msgArr.Length); + for (int i = 0; i < msgArr.Length; i++) + { + var payloads = + (msgArr[i].Payload as + IImmutableList + ); + if (payloads is null) + { + fullSet.Add(new Util.Try( + new ArgumentNullException( + $"{msgArr[i].PersistenceId} received empty payload for sequenceNr range " + + $"{msgArr[i].LowestSequenceNr} - {msgArr[i].HighestSequenceNr}"))); + } + else + { + fullSet.Add(serializerItem(timeStamp, payloads)); + } + } + + return fullSet; + } + + private Util.Try serializerItem(long timeStamp, IImmutableList payloads) + { + var retList = new T[payloads.Count]; + for (int j = 0; j < payloads.Count; j++) + { + var ser = Serialize(payloads[j], timeStamp); + var opt = ser.Success; + if (opt.HasValue) + { + retList[j] = opt.Value; + } + else + { + return new Util.Try(ser.Failure.Value); + } + } + + return new Util.Try(retList); + } + public Akka.Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0) { From 8b657246ca156e5381dd9ae96b0b67da002d0178 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 19 Mar 2022 17:47:35 -0400 Subject: [PATCH 3/8] fix incomplete rename refactor, re-add case to guarantee manifest is written even if empty --- .../Config/ReadJournalPluginConfig.cs | 12 ++++++------ .../Db/AkkaPersistenceDataConnectionFactory.cs | 1 + .../Journal/DAO/ByteArrayJournalSerializer.cs | 11 ++++++----- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 10 +++++----- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs index 10394f2c..bcd8da16 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs @@ -9,18 +9,18 @@ public ReadJournalPluginConfig(Configuration.Config config) TagSeparator = config.GetString("tag-separator", ","); Dao = config.GetString("dao", "akka.persistence.sql.linq2db.dao.bytea.readjournal.bytearrayreadjournaldao"); - var tagReadStr = config.GetString("tag-read-mode", "defaultconcatvarchar"); + var tagReadStr = config.GetString("tag-read-mode", "default"); if (Enum.TryParse(tagReadStr,true,out TagReadMode tgr)) { } else if (tagReadStr.Equals("default", StringComparison.InvariantCultureIgnoreCase)) { - tgr = TagReadMode.DefaultConcatVarchar; + tgr = TagReadMode.CommaSeparatedArray; } else if (tagReadStr.Equals("migrate", StringComparison.InvariantCultureIgnoreCase)) { - tgr = TagReadMode.MigrateToTagTable; + tgr = TagReadMode.CommaSeparatedArrayAndTagTable; } TagReadMode = tgr; @@ -35,8 +35,8 @@ public ReadJournalPluginConfig(Configuration.Config config) [Flags] public enum TagReadMode { - DefaultConcatVarchar = 1, - MigrateToTagTable = 3, - TagTableOnly = 2 + CommaSeparatedArray = 1, + TagTable = 2, + CommaSeparatedArrayAndTagTable = 3 } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs index c29a33c3..74d44b86 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs @@ -136,6 +136,7 @@ private static void MapJournalRow(IProviderConfig config, .Member(r => r.Timestamp) .HasColumnName(tableConfig.ColumnNames.Created); //We can skip writing tags the old way by ignoring the column in mapping. + journalRowBuilder.Member(r => r.tagArr).IsNotColumn(); if ((tableConfig.TagWriteMode & TagWriteMode.CommaSeparatedArray) == 0) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index 3c476932..d9a7a7f3 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -105,13 +105,14 @@ protected override Try Serialize(IPersistentRepresentation persisten row.manifest = withStringManifest.Manifest(_persistentRepr.Payload); } - else + else if (serializer.IncludeManifest) { - if (serializer.IncludeManifest) - { - row.manifest = _persistentRepr.Payload + row.manifest = _persistentRepr.Payload .GetType().TypeQualifiedName(); - } + } + else + { + row.manifest = ""; } { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index afdbe0e5..04703218 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -121,8 +121,8 @@ public Source< public async Task> AddTagDataIfNeeded(List toAdd, DataConnection context) { - if (_readJournalConfig.PluginConfig.TagReadMode == - TagReadMode.TagTableOnly) + if ((_readJournalConfig.PluginConfig.TagReadMode & + TagReadMode.TagTable) != 0) { var tagRows = await context.GetTable() .Where(r => @@ -151,7 +151,7 @@ public Source< var maxTake = MaxTake(max); switch (_readJournalConfig.PluginConfig.TagReadMode) { - case TagReadMode.DefaultConcatVarchar: + case TagReadMode.CommaSeparatedArray: return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,_connectionFactory}, async(input)=> { @@ -166,9 +166,9 @@ public Source< } }).Via(perfectlyMatchTag(tag, separator)) .Via(deserializeFlow); - case TagReadMode.MigrateToTagTable: + case TagReadMode.CommaSeparatedArrayAndTagTable: return eventByTagMigration(tag, offset, maxOffset, separator, maxTake); - case TagReadMode.TagTableOnly: + case TagReadMode.TagTable: return eventByTagTableOnly(tag, offset, maxOffset, separator, maxTake); default: throw new ArgumentOutOfRangeException(); From adc8ce49b0cfc67d10e550b1e3212b73a4da184d Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 20 Mar 2022 15:38:47 -0400 Subject: [PATCH 4/8] fix: code for journal queries was not always respecting isdeleted --- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 04703218..c7616a3c 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -101,11 +101,11 @@ public Source< var maxTake = MaxTake(max); - return AsyncSource.FromEnumerable(new {_connectionFactory,maxTake,maxOffset,offset},async(input)=> + return AsyncSource.FromEnumerable(new {t=this,maxTake,maxOffset,offset},async(input)=> { - using (var conn = input._connectionFactory.GetConnection()) + using (var conn = input.t._connectionFactory.GetConnection()) { - var evts = await conn.GetTable() + var evts = await input.t.baseQuery(conn) .OrderBy(r => r.ordering) .Where(r => r.ordering > input.offset && @@ -152,12 +152,12 @@ public Source< switch (_readJournalConfig.PluginConfig.TagReadMode) { case TagReadMode.CommaSeparatedArray: - return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,_connectionFactory}, + return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,t=this}, async(input)=> { - using (var conn = input._connectionFactory.GetConnection()) + using (var conn = input.t._connectionFactory.GetConnection()) { - return await conn.GetTable() + return await input.t.baseQuery(conn) .Where(r => r.tags.Contains(input.tag)) .OrderBy(r => r.ordering) .Where(r => @@ -187,16 +187,16 @@ private Source< return AsyncSource.FromEnumerable( new { - separator, tag, offset, maxOffset, maxTake, _connectionFactory + separator, tag, offset, maxOffset, maxTake, t=this }, async (input) => { //TODO: Optimize Flow - using (var conn = input._connectionFactory.GetConnection()) + using (var conn = input.t._connectionFactory.GetConnection()) { //First, Get eligible rows. var mainRows = await - conn.GetTable() + input.t.baseQuery(conn) .LeftJoin( conn.GetTable< JournalTagRow>(), @@ -241,37 +241,26 @@ private Source< return AsyncSource.FromEnumerable( new { - separator, tag, offset, maxOffset, maxTake, _connectionFactory + separator, tag, offset, maxOffset, maxTake, t =this }, async (input) => { //NOTE: This flow is probably not performant, //It is meant to allow for safe migration //And is not necessarily intended for long term use - using (var conn = input._connectionFactory.GetConnection()) + using (var conn = input.t._connectionFactory.GetConnection()) { //First, fin - var mainRows = await conn.GetTable() + var mainRows = await input.t.baseQuery(conn) .Where(r => r.ordering.In( - conn.GetTable() - .LeftJoin( - conn.GetTable< - JournalTagRow>(), - (jr, jtr) => - jr.ordering == - jtr.JournalOrderingId, - (jr, jtr) => - new { jr, jtr }) - .Where(r => - r.jr.tags.Contains( - input.tag) || - r.jtr.TagValue == input.tag) - .Select(r => r.jr.ordering) - .OrderBy(r => r) + conn.GetTable().Where(r=>r.TagValue==input.tag).Select(r=>r.JournalOrderingId)) + || r.tags.Contains(input.tag) + ) + .OrderBy(r => r.ordering) .Where(r => - r > input.offset && - r <= input.maxOffset) - .Take(input.maxTake))).ToListAsync(); + r.ordering > input.offset && + r.ordering <= input.maxOffset) + .Take(input.maxTake).ToListAsync(); var tagRows = await conn.GetTable() .Where(r => r.JournalOrderingId.In( @@ -355,6 +344,7 @@ public Source JournalSequence(long offset, long limit) using (var conn = input._connectionFactory.GetConnection()) { + //persistence-jdbc does not filter deleted here. return await conn.GetTable() .Where(r => r.ordering > input.offset) .Select(r => r.ordering) @@ -367,6 +357,7 @@ public async Task MaxJournalSequenceAsync() { using (var db = _connectionFactory.GetConnection()) { + //persistence-jdbc does not filter deleted here. return await db.GetTable() .Select(r => r.ordering) .FirstOrDefaultAsync(); From 6936c1e861b84e6cfa6b94c4ef689358868b44ce Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 17 Apr 2022 13:38:34 -0400 Subject: [PATCH 5/8] Added WIP Tag-join-via UUID Support for users who may want faster writes with tags at expense of size and read speed. Cleaned up ReadJournal Queries --- .../Config/JournalTableConfig.cs | 9 + .../Config/ReadJournalPluginConfig.cs | 1 + .../AkkaPersistenceDataConnectionFactory.cs | 37 +++- .../Journal/DAO/BaseByteArrayJournalDao.cs | 116 ++++++++-- .../Journal/Types/JournalRow.cs | 5 +- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 198 ++++++++++++------ .../Query/JournalSequenceActor.cs | 6 +- src/Akka.Persistence.Sql.Linq2Db/Readme.MD | 21 +- 8 files changed, 303 insertions(+), 90 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs index f5b532d8..c4c6c046 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs @@ -10,6 +10,12 @@ public enum TagWriteMode TagTable = 2, CommaSeparatedArrayAndTagTable = 3, } + + public enum TagTableMode + { + OrderingId, + SequentialUUID + } public class JournalTableConfig { @@ -22,6 +28,9 @@ public class JournalTableConfig public bool WarnOnAutoInitializeFail { get; } public TagWriteMode TagWriteMode { get; } + public TagTableMode TagTableMode { get; } + public string? TagTableName { get; } + public JournalTableConfig(Configuration.Config config) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs index bcd8da16..c27d5806 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs @@ -30,6 +30,7 @@ public ReadJournalPluginConfig(Configuration.Config config) public string TagSeparator { get; set; } public TagReadMode TagReadMode { get; set; } + public TagTableMode TagTableMode { get; } } [Flags] diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs index 74d44b86..c82c1c09 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs @@ -6,6 +6,7 @@ using Akka.Persistence.Sql.Linq2Db.Journal; using Akka.Persistence.Sql.Linq2Db.Journal.Types; using Akka.Persistence.Sql.Linq2Db.Snapshot; +using Akka.Streams.Dsl; using Akka.Util; using LinqToDB; using LinqToDB.Configuration; @@ -41,7 +42,6 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig opts = new LinqToDbConnectionOptionsBuilder() .UseConnectionString(providerName, connString) .UseMappingSchema(mappingSchema).Build(); - if (providerName.ToLower().StartsWith("sqlserver")) { policy = new SqlServerRetryPolicy(); @@ -154,6 +154,41 @@ private static void MapJournalRow(IProviderConfig config, .Member(r=>r.sequenceNumber).IsPrimaryKey(); } + void SetJoinCol(PropertyMappingBuilder builder, + PropertyMappingBuilder propertyMappingBuilder) + { + if (config.TableConfig.TagTableMode == + TagTableMode.SequentialUUID) + { + builder.Member(r => r.JournalOrderingId) + .IsNotColumn() + .Member(r => r.WriteUUID) + .IsColumn().IsPrimaryKey(); + journalRowBuilder.Member(r => r.WriteUUID) + .IsColumn(); + } + else + { + builder.Member(r => r.WriteUUID) + .IsNotColumn() + .Member(r => r.JournalOrderingId) + .IsColumn().IsPrimaryKey(); + journalRowBuilder.Member(r => r.WriteUUID) + .IsNotColumn(); + } + } + if ((config.TableConfig.TagWriteMode & TagWriteMode.TagTable) != 0) + { + var tagTableBuilder = fmb.Entity() + .HasTableName(tableConfig.TagTableName) + .HasSchemaName(tableConfig.SchemaName) + .Member(r => r.TagValue) + .IsColumn().IsNullable(false) + .HasLength(64) + .IsPrimaryKey(); + SetJoinCol(tagTableBuilder, journalRowBuilder); + } + //Probably overkill, but we only set Metadata Mapping if specified //That we are in delete compatibility mode. if (config.IDaoConfig.SqlCommonCompatibilityMode) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index 1cc8a141..ed0f14e7 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -24,16 +24,35 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO { - public static class EnumerableExtensions + public class SequentialUUIDGenerator { - public static IEnumerable SelectStateful( - this IEnumerable enumerable, TState state, - Func selector) + private long _counter = DateTime.UtcNow.Ticks; + + /// + /// Gets a value to be assigned to a property. + /// + /// The change tracking entry of the entity for which the value is being generated. + /// The value to be assigned to a property. + public Guid Next() { - foreach (var item in enumerable) + var guidBytes = Guid.NewGuid().ToByteArray(); + var counterBytes = BitConverter.GetBytes(Interlocked.Increment(ref _counter)); + + if (!BitConverter.IsLittleEndian) { - yield return selector(item,state); + System.Array.Reverse(counterBytes); } + + guidBytes[08] = counterBytes[1]; + guidBytes[09] = counterBytes[0]; + guidBytes[10] = counterBytes[7]; + guidBytes[11] = counterBytes[6]; + guidBytes[12] = counterBytes[5]; + guidBytes[13] = counterBytes[4]; + guidBytes[14] = counterBytes[3]; + guidBytes[15] = counterBytes[2]; + + return new Guid(guidBytes); } } public abstract class BaseByteArrayJournalDao : @@ -66,6 +85,7 @@ protected BaseByteArrayJournalDao(IAdvancedScheduler sched, Serializer = serializer; deserializeFlow = Serializer.DeserializeFlow(); deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper()); + _uuidGen = new SequentialUUIDGenerator(); //Due to C# rules we have to initialize WriteQueue here //Keeping it here vs init function prevents accidental moving of init //to where variables aren't set yet. @@ -119,6 +139,7 @@ private async Task QueueWriteJournalRows(Seq xs) new TaskCompletionSource( TaskCreationOptions.RunContinuationsAsynchronously ); + //Send promise and rows into queue. If the Queue takes it, //It will write the Promise state when finished writing (or failing) var result = @@ -150,6 +171,7 @@ private async Task QueueWriteJournalRows(Seq xs) private async Task WriteJournalRows(Seq xs) { { + //hot path: //If we only have one row, penalty for BulkCopy //Isn't worth it due to insert caching/transaction/etc. @@ -180,18 +202,18 @@ private async Task InsertSingle(Seq xs) } - private async Task BulkInsertWithTagTableTags(DataConnection dc, + private async Task InsertWithOrderingAndBulkInsertTags(DataConnection dc, Seq xs) { var tagsToInsert = new List(xs.Count); foreach (var journalRow in xs) { var dbid = await dc.InsertWithInt64IdentityAsync(journalRow); - tagsToInsert.AddRange(journalRow.tagArr.SelectStateful(dbid, - (tag, id) => new JournalTagRow() - { JournalOrderingId = id, TagValue = tag })); + foreach (var s1 in journalRow.tagArr) + { + tagsToInsert.Add(new JournalTagRow(){JournalOrderingId = dbid, TagValue = s1}); + } } - await dc.GetTable() .BulkCopyAsync(new BulkCopyOptions() { @@ -216,12 +238,19 @@ await dc.GetTable() MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize }, xs); } - private async Task InsertMultiple(Seq xs) { if ((_journalConfig.TableConfig.TagWriteMode & TagWriteMode.TagTable) !=0) { - await HandleTagTableInsert(xs); + if (_journalConfig.TableConfig.TagTableMode == + TagTableMode.OrderingId) + { + await HandleTagTableInsert(xs); + } + else + { + await HandleTagTableUUIDInsert(xs); + } } else { @@ -230,6 +259,61 @@ private async Task InsertMultiple(Seq xs) } + private async Task HandleTagTableUUIDInsert(Seq xs) + { + var tagWrites = new List(); + foreach (var journalRow in xs) + { + if (journalRow.tagArr?.Length > 0) + { + var uid = NextUUID(); + journalRow.WriteUUID = uid; + foreach (var s1 in journalRow.tagArr) + { + tagWrites.Add(new JournalTagRow(){WriteUUID=uid, TagValue = s1}); + } + } + } + + using (var ctx = _connectionFactory.GetConnection()) + { + using (var tx = await ctx.BeginTransactionAsync()) + { + try + { + await ctx.BulkCopyAsync(new BulkCopyOptions() + { + TableName = _journalConfig.TableConfig.TableName, + MaxBatchSize = _journalConfig.DaoConfig + .DbRoundTripBatchSize + },xs); + if (tagWrites.Count > 0) + { + await ctx.BulkCopyAsync(new BulkCopyOptions() + { + TableName = _journalConfig.TableConfig.TagTableName, + MaxBatchSize = _journalConfig.DaoConfig + .DbRoundTripTagBatchSize, + UseParameters = _journalConfig.DaoConfig + .PreferParametersOnMultiRowInsert + }, tagWrites); + } + await ctx.CommitTransactionAsync(); + } + catch (Exception e) + { + await ctx.RollbackTransactionAsync(); + throw; + } + } + } + } + + private Guid NextUUID() + { + return _uuidGen.Next(); + } + private async Task HandleDefaultInsert(Seq xs) { using (var db = _connectionFactory.GetConnection()) @@ -302,7 +386,7 @@ private async Task consumeSequenceForTagInsert(Seq xs, DataConnectio tail.Span(r => r.tagArr.Length > 0); if (hasTags.Count > 0) { - await BulkInsertWithTagTableTags(db, hasTags); + await InsertWithOrderingAndBulkInsertTags(db, hasTags); } } } @@ -560,7 +644,9 @@ private static readonly Expression> sequenceNumberSelector = r => r.SequenceNumber; - + + private readonly SequentialUUIDGenerator _uuidGen; + public async Task Update(string persistenceId, long sequenceNr, object payload) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index e4311d83..a7ee3e9d 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -1,4 +1,5 @@ -using LinqToDB; +using System; +using LinqToDB; using LinqToDB.Mapping; namespace Akka.Persistence.Sql.Linq2Db.Journal.Types @@ -8,6 +9,7 @@ public sealed class JournalTagRow { public long JournalOrderingId { get; set; } public string TagValue { get; set; } + public Guid WriteUUID { get; set; } } public sealed class JournalRow { @@ -29,5 +31,6 @@ public JournalRow() public string manifest { get; set; } public int? Identifier { get; set; } public string[] tagArr { get; set; } + public Guid? WriteUUID { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index c7616a3c..4298a034 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Linq.Expressions; using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Sql.Linq2Db.Config; @@ -45,6 +46,7 @@ protected BaseByteReadArrayJournalDAO(IAdvancedScheduler ec, protected IQueryable baseQuery(DataConnection connection) { + return connection.GetTable() .Where(jr => includeDeleted == false || (jr.deleted == false)); @@ -124,23 +126,83 @@ public async Task> AddTagDataIfNeeded(List toAdd, D if ((_readJournalConfig.PluginConfig.TagReadMode & TagReadMode.TagTable) != 0) { - var tagRows = await context.GetTable() - .Where(r => - r.JournalOrderingId.In(toAdd.Select(r => r.ordering))) - .OrderBy(r=>r.JournalOrderingId) - .ToListAsync(); + await addTagDataFromTagTable(toAdd, context); + } + return toAdd; + } + + private async Task addTagDataFromTagTable(List toAdd, DataConnection context) + { + var pred = TagCheckPredicate(toAdd); + var tagRows = pred.HasValue + ? await context.GetTable() + .Where(pred.Value) + .ToListAsync() + : new List(); + if (_readJournalConfig.TableConfig.TagTableMode == + TagTableMode.OrderingId) + { + foreach (var journalRow in toAdd) + { + journalRow.tagArr = + tagRows.Where(r => + r.JournalOrderingId == + journalRow.ordering) + .Select(r => r.TagValue) + .ToArray(); + } + } + else + { foreach (var journalRow in toAdd) { journalRow.tagArr = tagRows.Where(r => - r.JournalOrderingId == - journalRow.ordering) - .Select(r => r.TagValue) - .ToArray(); + r.WriteUUID == + journalRow.WriteUUID) + .Select(r => r.TagValue) + .ToArray(); } } + } - return toAdd; + public Option>> TagCheckPredicate( + List toAdd) + { + if (_readJournalConfig.PluginConfig.TagTableMode == + TagTableMode.SequentialUUID) + { + //Check whether we have anything to query for two reasons: + //1: Linq2Db may choke on an empty 'in' set. + //2: Don't wanna make a useless round trip to the DB, + // if we know nothing is tagged. + var set = toAdd.Where(r => r.WriteUUID.HasValue) + .Select(r => r.WriteUUID.Value).ToList(); + if (set.Count == 0) + { + return Option>>.None; + } + else + { + return new Option>>(r => + r.WriteUUID.In(set)); + } + } + else + { + //We can just check the count here. + //Alas, we won't know if there are tags + //Until we actually query on this one. + if (toAdd.Count == 0) + { + return Option>>.None; + } + else + { + return new Option>>( r => + r.JournalOrderingId.In(toAdd.Select(r => r.ordering))); + } + } } public Source< Akka.Util.Try<(IPersistentRepresentation, IImmutableSet, long)>, @@ -195,43 +257,35 @@ private Source< using (var conn = input.t._connectionFactory.GetConnection()) { //First, Get eligible rows. - var mainRows = await - input.t.baseQuery(conn) - .LeftJoin( - conn.GetTable< - JournalTagRow>(), - (jr, jtr) => + var mainRows = await + input.t.baseQuery(conn) + .LeftJoin( + conn.GetTable< + JournalTagRow>(), + _readJournalConfig.TableConfig + .TagTableMode == + TagTableMode.OrderingId + ? (jr, jtr) => jr.ordering == - jtr.JournalOrderingId, - (jr, jtr) => - new { jr, jtr }) - .Where(r => - r.jtr.TagValue == input.tag) - .Select(r=>r.jr) - .Where(r => - r.ordering > input.offset && - r.ordering <= input.maxOffset) - .Take(input.maxTake).ToListAsync(); - //Then, Get the tags for the rows. - var tagRows = await conn.GetTable() - .Where(r => - r.JournalOrderingId.In( - mainRows.Select(r => - r.ordering))) - .ToListAsync(); - foreach (var journalRow in mainRows) - { - journalRow.tagArr = - tagRows.Where(r => - r.JournalOrderingId == - journalRow.ordering) - .Select(r => r.TagValue) - .ToArray(); - } - + jtr.JournalOrderingId + : (jr, jtr) => + jr.WriteUUID == jtr.WriteUUID, + (jr, jtr) => + new { jr, jtr }) + .Where(r => + r.jtr.TagValue == input.tag) + .Select(r => r.jr) + .Where(r => + r.ordering > input.offset && + r.ordering <= input.maxOffset) + .Take(input.maxTake).ToListAsync(); + await addTagDataFromTagTable(mainRows, conn); return mainRows; } - }).Via(perfectlyMatchTag(tag, separator)) + }) + //We still PerfectlyMatchTag here + //Because DB Collation :) + .Via(perfectlyMatchTag(tag, separator)) .Via(deserializeFlow); } @@ -250,33 +304,40 @@ private Source< //And is not necessarily intended for long term use using (var conn = input.t._connectionFactory.GetConnection()) { - //First, fin + //First, find the rows. + //We use IN here instead of leftjoin + //because it's safer from a + //'avoid duplicate rows tripping things up later' + //standpoint. var mainRows = await input.t.baseQuery(conn) - .Where(r => r.ordering.In( - conn.GetTable().Where(r=>r.TagValue==input.tag).Select(r=>r.JournalOrderingId)) - || r.tags.Contains(input.tag) - ) + .Where( + _readJournalConfig.TableConfig + .TagTableMode == TagTableMode.OrderingId + ? r => r.ordering.In( + conn.GetTable< + JournalTagRow>() + .Where(r => + r.TagValue == + input.tag) + .Select(r => + r.JournalOrderingId)) + || r.tags.Contains(input.tag) + : r => r.WriteUUID.Value.In( + conn.GetTable< + JournalTagRow>() + .Where(r => + r.TagValue == + input.tag) + .Select(r => + r.WriteUUID)) + || r.tags.Contains(input.tag) + ) .OrderBy(r => r.ordering) - .Where(r => - r.ordering > input.offset && - r.ordering <= input.maxOffset) - .Take(input.maxTake).ToListAsync(); - var tagRows = await conn.GetTable() .Where(r => - r.JournalOrderingId.In( - mainRows.Select(r => - r.ordering))) - .ToListAsync(); - foreach (var journalRow in mainRows) - { - journalRow.tagArr = - tagRows.Where(r => - r.JournalOrderingId == - journalRow.ordering) - .Select(r => r.TagValue) - .ToArray(); - } - + r.ordering > input.offset && + r.ordering <= input.maxOffset) + .Take(input.maxTake).ToListAsync(); + await addTagDataFromTagTable(mainRows, conn); return mainRows; } }).Via(perfectlyMatchTag(tag, separator)) @@ -360,6 +421,7 @@ public async Task MaxJournalSequenceAsync() //persistence-jdbc does not filter deleted here. return await db.GetTable() .Select(r => r.ordering) + .OrderByDescending(r=>r) .FirstOrDefaultAsync(); } } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs index b71cd6f0..efab6533 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs @@ -72,7 +72,7 @@ protected bool _receive(object message, long currentMaxOrdering, .RunWith(Sink.Seq(), _mat) .PipeTo(Self, sender: Self, success: res => - new NewOrderingIds(currentMaxOrdering, res)); + new NewOrderingIds(currentMaxOrdering, res),f=> new Status.Failure(f)); } else if (message is NewOrderingIds nids) { @@ -186,9 +186,7 @@ protected override void PreStart() self.Tell(new QueryOrderingIds()); try { - - - _readJournalDao.MaxJournalSequenceAsync().ContinueWith(t => + _readJournalDao.MaxJournalSequenceAsync().ContinueWith(t => { if (t.IsFaulted) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD index fe826cff..9a29c031 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD +++ b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD @@ -57,7 +57,26 @@ Please read the documentation carefully. Some features may be specific to use ca - Classes used in place of ValueTuples in certain areas - We don't have separate Query classes at this time. This can definitely be improved in future - A couple of places around `WriteMessagesAsync` have had their logic moved to facilitate performance (i.e. use of `await` instead of `ContinueWith`) - - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapsho stores. + - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapsho stores. + + - Tag Table Support (Alpha): + - Allows the writing of tags to a separate table to allow for different performance strategies when working with tags. + - Supports Two Tag Table Modes: + - WriteUUID: The tag table and join uses a 'sequential-uuid' type field that will have lower page splits while allowing for good row locality on insert. + - This option is intended for those who want maximum write performance, at the expense of database storage and load. + - OrderingId: Uses the Journal Row's 'ordering' sequential Int64 for the tag table and join. + - This option is intended for those who want more efficient use of the DB's space + - This will result in slower writes, but faster/more efficient reads. + - Provides multiple modes of operation for reads and writes, note that there are separate switches for both read and write! + - CommaSeparatedOnly: The old behavior, where the comma separated tags are held in a column + - CommaSeparatedAndTagTable: Will Read/Write from both the Comma Separated column as well as the Tag Table + - TagTableOnly: will only use the tag table for Read/Write + - 'Live' Migration should be possible via the following flow: + 1. Run Migration scripts to create new columns/tables. + 2. Rolling Deploy your system with Reads and Writes in 'CommaSeparatedAndTagTable' mode. + 3. Rolling deploy your system (again), with Writes now in 'TagTableOnly' mode. + 4. Run Migration App/Script to move existing tags into tag table. + 5. Rolling deploy your system (last one!) with Reads now in 'TagTableOnly' mode. ## Currently Implemented: From 627d159f42b0d9aff5679888c5a69461108ff40e Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 17 Apr 2022 13:54:51 -0400 Subject: [PATCH 6/8] Try refactoring to make older compiler happy about expressions. --- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 4298a034..742a12f8 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -262,14 +262,7 @@ private Source< .LeftJoin( conn.GetTable< JournalTagRow>(), - _readJournalConfig.TableConfig - .TagTableMode == - TagTableMode.OrderingId - ? (jr, jtr) => - jr.ordering == - jtr.JournalOrderingId - : (jr, jtr) => - jr.WriteUUID == jtr.WriteUUID, + EventsByTagOnlyJoinPredicate, (jr, jtr) => new { jr, jtr }) .Where(r => @@ -289,6 +282,16 @@ private Source< .Via(deserializeFlow); } + private Expression> EventsByTagOnlyJoinPredicate => + _readJournalConfig.TableConfig + .TagTableMode == + TagTableMode.OrderingId + ? (jr, jtr) => + jr.ordering == + jtr.JournalOrderingId + : (jr, jtr) => + jr.WriteUUID == jtr.WriteUUID; + private Source, long)>, NotUsed> eventByTagMigration(string tag, long offset, long maxOffset, string separator, int maxTake) { @@ -311,26 +314,7 @@ private Source< //standpoint. var mainRows = await input.t.baseQuery(conn) .Where( - _readJournalConfig.TableConfig - .TagTableMode == TagTableMode.OrderingId - ? r => r.ordering.In( - conn.GetTable< - JournalTagRow>() - .Where(r => - r.TagValue == - input.tag) - .Select(r => - r.JournalOrderingId)) - || r.tags.Contains(input.tag) - : r => r.WriteUUID.Value.In( - conn.GetTable< - JournalTagRow>() - .Where(r => - r.TagValue == - input.tag) - .Select(r => - r.WriteUUID)) - || r.tags.Contains(input.tag) + eventsByTagMigrationPredicate(conn, input.tag) ) .OrderBy(r => r.ordering) .Where(r => @@ -344,6 +328,28 @@ private Source< .Via(deserializeFlow); } + private Expression> eventsByTagMigrationPredicate(DataConnection conn, string tagVal) + { + return _readJournalConfig.TableConfig + .TagTableMode == TagTableMode.OrderingId + ? r => r.ordering.In( + Queryable.Where(conn.GetTable< + JournalTagRow>(), r => + r.TagValue == + tagVal) + .Select(r => + r.JournalOrderingId)) + || r.tags.Contains(tagVal) + : r => r.WriteUUID.Value.In( + Queryable.Where(conn.GetTable< + JournalTagRow>(), r => + r.TagValue == + tagVal) + .Select(r => + r.WriteUUID)) + || r.tags.Contains(tagVal); + } + private Flow perfectlyMatchTag( string tag, string separator) From bc5257925c518e97841cf3942185394e1eb3d028 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 17 Apr 2022 14:01:58 -0400 Subject: [PATCH 7/8] Try Unwinding ternarys to make compiler happy. --- .../Query/Dao/BaseByteReadArrayJournalDAO.cs | 64 +++++++++++-------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index 742a12f8..eae422ab 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -282,15 +282,21 @@ private Source< .Via(deserializeFlow); } - private Expression> EventsByTagOnlyJoinPredicate => - _readJournalConfig.TableConfig - .TagTableMode == - TagTableMode.OrderingId - ? (jr, jtr) => - jr.ordering == - jtr.JournalOrderingId - : (jr, jtr) => - jr.WriteUUID == jtr.WriteUUID; + private Expression> EventsByTagOnlyJoinPredicate + { + get + { + if (_readJournalConfig.TableConfig + .TagTableMode == + TagTableMode.OrderingId) + return (jr, jtr) => + jr.ordering == + jtr.JournalOrderingId; + else + return (jr, jtr) => + jr.WriteUUID == jtr.WriteUUID; + } + } private Source, long)>, NotUsed> eventByTagMigration(string tag, long offset, long maxOffset, string separator, int maxTake) @@ -330,24 +336,28 @@ private Source< private Expression> eventsByTagMigrationPredicate(DataConnection conn, string tagVal) { - return _readJournalConfig.TableConfig - .TagTableMode == TagTableMode.OrderingId - ? r => r.ordering.In( - Queryable.Where(conn.GetTable< - JournalTagRow>(), r => - r.TagValue == - tagVal) - .Select(r => - r.JournalOrderingId)) - || r.tags.Contains(tagVal) - : r => r.WriteUUID.Value.In( - Queryable.Where(conn.GetTable< - JournalTagRow>(), r => - r.TagValue == - tagVal) - .Select(r => - r.WriteUUID)) - || r.tags.Contains(tagVal); + if (_readJournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId) + { + return (JournalRow r) => r.ordering.In( + conn.GetTable< + JournalTagRow>().Where(r => + r.TagValue == + tagVal) + .Select(r => + r.JournalOrderingId)) + || r.tags.Contains(tagVal); + } + else + { + return (JournalRow r) => r.WriteUUID.Value.In( + conn.GetTable< + JournalTagRow>().Where(r => + r.TagValue == + tagVal) + .Select(r => + r.WriteUUID)) + || r.tags.Contains(tagVal); + } } private Flow perfectlyMatchTag( From 37780993f9b4ce6a716516b173c1a22f9c41477d Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 17 Apr 2022 14:21:47 -0400 Subject: [PATCH 8/8] WIP Allow proper handling of EventManifest --- .../Config/JournalTableConfig.cs | 1 + .../Db/AkkaPersistenceDataConnectionFactory.cs | 11 +++++++++++ .../Journal/DAO/ByteArrayJournalSerializer.cs | 5 +++-- .../Journal/Types/JournalRow.cs | 1 + 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs index c4c6c046..bdd78d96 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs @@ -30,6 +30,7 @@ public class JournalTableConfig public TagWriteMode TagWriteMode { get; } public TagTableMode TagTableMode { get; } public string? TagTableName { get; } + public bool UseEventManifestColumn { get; } public JournalTableConfig(Configuration.Config config) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs index c82c1c09..589a8580 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs @@ -177,6 +177,17 @@ void SetJoinCol(PropertyMappingBuilder builder, .IsNotColumn(); } } + + if (config.TableConfig.UseEventManifestColumn) + { + journalRowBuilder.Member(r => r.eventManifest) + .IsColumn().HasLength(64); + } + else + { + journalRowBuilder.Member(r => r.eventManifest) + .IsNotColumn(); + } if ((config.TableConfig.TagWriteMode & TagWriteMode.TagTable) != 0) { var tagTableBuilder = fmb.Entity() diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index d9a7a7f3..b070bd9e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -123,6 +123,7 @@ protected override Try Serialize(IPersistentRepresentation persisten _persistentRepr.PersistenceId; row.Identifier = serializer.Identifier; row.sequenceNumber = _persistentRepr.SequenceNr; + row.eventManifest = _persistentRepr.Manifest; } return new Try(row ); @@ -156,7 +157,7 @@ protected override Try Serialize(IPersistentRepresentation persisten state.message, state.type); }), t.sequenceNumber, t.persistenceId, - t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.eventManifest??t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), t.tags?.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) .ToImmutableHashSet() ?? t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, @@ -169,7 +170,7 @@ protected override Try Serialize(IPersistentRepresentation persisten new Persistent(_serializer.Deserialize(t.message, identifierMaybe.Value,t.manifest), t.sequenceNumber, t.persistenceId, - t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), + t.eventManifest?? t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp), t.tags?.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) .ToImmutableHashSet() ??t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index a7ee3e9d..9573af07 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -32,5 +32,6 @@ public JournalRow() public int? Identifier { get; set; } public string[] tagArr { get; set; } public Guid? WriteUUID { get; set; } + public string eventManifest { get; set; } } } \ No newline at end of file