Skip to content

Commit

Permalink
Get HybridCache.Benchmark running (still not correctly configured, an…
Browse files Browse the repository at this point in the history
…d should have a direct-to-iblobcache mode).
  • Loading branch information
lilith committed May 24, 2024
1 parent 1d82686 commit 0ff63c2
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 197 deletions.
120 changes: 120 additions & 0 deletions src/Imazen.Routing/Promises/Pipelines/BlobCachingTestHarness.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using Imazen.Abstractions.BlobCache;
using Imazen.Abstractions.Blobs;
using Imazen.Abstractions.Logging;
using Imazen.Abstractions.Resulting;
using Imazen.Common.Concurrency.BoundedTaskCollection;
using Imazen.Routing.Caching;
using Imazen.Routing.Health;
using Imazen.Routing.Requests;
using Microsoft.Extensions.Hosting;

namespace Imazen.Routing.Promises.Pipelines;

public record BlobCachingTestHarnessOptions(
long? MaxUploadQueueBytes,
MemoryCacheOptions? MemoryCacheOptions,
bool DelayRequestUntilUploadsComplete,
List<List<IBlobCache>> SeriesOfCacheGroups,
List<IBlobCache> SaveToCaches,
Func<IRequestSnapshot, CancellationToken, ValueTask<CodeResult<IBlobWrapper>>> BlobProvider,
LatencyTrackingZone BlobProviderLatencyZone,
IReLogger Logger,
bool LockByUniqueRequest,
bool ShutdownServices)
{
public static BlobCachingTestHarnessOptions TestSingleCacheSync(IBlobCache cache, Func<IRequestSnapshot, CancellationToken, ValueTask<CodeResult<IBlobWrapper>>> blobProvider, IReLogger logger)
{
return new BlobCachingTestHarnessOptions(
null,
null,
true,
new List<List<IBlobCache>> { new List<IBlobCache> { cache } },
new List<IBlobCache> { cache },
blobProvider,
new LatencyTrackingZone("TestBlobProvider", 10000,true),
logger,
false,
true
);
}
}


public class BlobCachingTestHarness: IHostedService
{
BlobCachingTestHarnessOptions options;
BlobPipelineHarness blobPipelineHarness;
BoundedTaskCollection<BlobTaskItem>? uploadQueue;
CacheHealthTracker cacheHealthTracker;
CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
public BlobCachingTestHarness(BlobCachingTestHarnessOptions options)
{
this.options = options;
if (options.MaxUploadQueueBytes != null)
{
uploadQueue = new BoundedTaskCollection<BlobTaskItem>(options.MaxUploadQueueBytes.Value, CancellationTokenSource);
// Now ensure caches wait for uploads to write before shutting down.
foreach(var c in options.SaveToCaches)
c.Initialize(new BlobCacheSupportData(() => uploadQueue!.AwaitAllCurrentTasks()));
}
cacheHealthTracker = new CacheHealthTracker(options.Logger);
var cacheEngineOptions = new CacheEngineOptions
{
HealthTracker = cacheHealthTracker,
SeriesOfCacheGroups = options.SeriesOfCacheGroups,
SaveToCaches = options.SaveToCaches,
Logger = options.Logger,
UploadQueue = uploadQueue,
DelayRequestUntilUploadsComplete = options.DelayRequestUntilUploadsComplete,
LockByUniqueRequest = options.LockByUniqueRequest,
BlobFactory = new SimpleReusableBlobFactory()
};
var cacheEngine = new CacheEngine(null, cacheEngineOptions);
blobPipelineHarness = new BlobPipelineHarness(new BlobPipelineHarnessOptions(
cacheEngine,
options.BlobProvider,
options.Logger,
options.BlobProviderLatencyZone));

}

public async ValueTask<CodeResult<IBlobWrapper>> RequestBlobWrapper(string path, string query = "",
CancellationToken cancellationToken = default)
{
return await blobPipelineHarness.RequestBlobWrapper(path, query, cancellationToken);
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (uploadQueue != null)
{
await uploadQueue.StopAsync(cancellationToken);
}

await cacheHealthTracker.StopAsync(cancellationToken);
if (options.ShutdownServices)
{
var allCaches = options.SeriesOfCacheGroups.SelectMany(x => x).Concat(options.SaveToCaches);
foreach (var cache in allCaches)
{
if (cache is IHostedService service)
{
await service.StopAsync(cancellationToken);
}
}
}
}

public async Task AwaitEnqueuedTasks()
{
if (uploadQueue != null)
{
await uploadQueue.AwaitAllCurrentTasks();
}
}
}
89 changes: 89 additions & 0 deletions src/Imazen.Routing/Promises/Pipelines/BlobPipelineHarness.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using Imazen.Abstractions.Blobs;
using Imazen.Abstractions.Logging;
using Imazen.Abstractions.Resulting;
using Imazen.Routing.Engine;
using Imazen.Routing.HttpAbstractions;
using Imazen.Routing.Layers;
using Imazen.Routing.Requests;
using Microsoft.Extensions.Logging;

namespace Imazen.Routing.Promises.Pipelines;

public record BlobPipelineHarnessOptions(
IBlobPromisePipeline Pipeline,
Func<IRequestSnapshot, CancellationToken, ValueTask<CodeResult<IBlobWrapper>>> BlobProvider,
IReLogger Logger, LatencyTrackingZone BlobOriginLatencyZone);

public class BlobPipelineHarness
{
readonly RoutingEngine router;
IBlobPromisePipeline pipeline;
IReLogger logger;

public BlobPipelineHarness(RoutingEngine router, IBlobPromisePipeline pipeline, IReLogger logger)
{
this.router = router;
this.pipeline = pipeline;
this.logger = logger;
}
public BlobPipelineHarness(BlobPipelineHarnessOptions options)
{
var routingBuilder = new RoutingBuilder().AddEndpointLayer(
new SimpleLayer("BlobEndpoint", req =>
{
var endpoint =
new PromiseWrappingEndpoint(
new CacheableBlobPromise(req, options.BlobOriginLatencyZone, options.BlobProvider));
return CodeResult<IRoutingEndpoint>.Ok(endpoint);
}, null));
router = routingBuilder.Build(options.Logger);
pipeline = options.Pipeline;
logger = options.Logger;

}

public async ValueTask<CodeResult<IBlobWrapper>> RequestBlobWrapper(string path, string query = "",
CancellationToken cancellationToken = default)
{
var request = new EmptyHttpRequest(path, query);
var mutableRequest = MutableRequest.OriginalRequest(request);
return await Request(mutableRequest, cancellationToken);
}

public async ValueTask<CodeResult<IBlobWrapper>> Request(MutableRequest mutableRequest, CancellationToken cancellationToken = default)
{
var result = await router.RouteToPromiseAsync(mutableRequest, cancellationToken);
if (result == null)
{
return CodeResult<IBlobWrapper>.Err((404, "No route found"));
}
if (result.IsError)
{
return CodeResult<IBlobWrapper>.Err(result.Error);
}

var outerRequest = mutableRequest.OriginatingRequest ?? new EmptyHttpRequest(mutableRequest);

var pipelineResult = await pipeline.GetFinalPromiseAsync(
result.Unwrap(),router, pipeline, outerRequest,cancellationToken);

var finalPromise = pipelineResult.Unwrap();

if (finalPromise.HasDependencies)
{
var dependencyResult = await finalPromise.RouteDependenciesAsync(router, cancellationToken);
if (dependencyResult.IsError)
{
return CodeResult<IBlobWrapper>.Err(dependencyResult.Error);
}
}
var blobResult =
await finalPromise.TryGetBlobAsync(mutableRequest, router, pipeline, cancellationToken);
if (blobResult.IsError)
{
return CodeResult<IBlobWrapper>.Err(blobResult.Error);
}
return CodeResult<IBlobWrapper>.Ok(blobResult.Unwrap());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.*" />
</ItemGroup>

<ItemGroup>
<Compile Remove="BenchHarness.cs" />
</ItemGroup>

</Project>
Loading

0 comments on commit 0ff63c2

Please sign in to comment.