Skip to content

Commit

Permalink
Hystrix tests updates (#98)
Browse files Browse the repository at this point in the history
* Reset more things in more places when performing resets in Hystrix
* Limit dotnet test to a single CPU, un-skip the "FlakyOnHostedAgents" test category
* Make tests less sensitive to available CPU resources
  • Loading branch information
dtillman authored and Tim Hess committed Aug 21, 2019
1 parent 07d519e commit 19e3a82
Show file tree
Hide file tree
Showing 61 changed files with 2,454 additions and 2,233 deletions.
9 changes: 5 additions & 4 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
inputs:
command: test
projects: '**/*.Test/*.csproj'
arguments: '-c $(buildConfiguration) --filter "Category!=SkipOnLinux&Category!=FlakyOnHostedAgents"'
arguments: '-c $(buildConfiguration) -maxcpucount:1 --filter "Category!=SkipOnLinux"'
- job: MacOS_Build_and_Test
pool:
vmImage: 'macOS-10.14'
Expand Down Expand Up @@ -76,7 +76,7 @@ jobs:
inputs:
command: test
projects: '**/*.Test/*.csproj'
arguments: '-c $(buildConfiguration) --filter "Category!=SkipOnMacOS&Category!=FlakyOnHostedAgents"'
arguments: '-c $(buildConfiguration) -maxcpucount:1 --filter "Category!=SkipOnMacOS"'
- job: Windows_Build_Test_and_Package
pool:
vmImage: 'windows-2019'
Expand Down Expand Up @@ -130,7 +130,8 @@ jobs:
inputs:
command: test
projects: 'src/Steeltoe.All.sln'
arguments: '-c $(buildConfiguration) /p:TreatWarningsAsErrors=True /p:CopyLocalLockFileAssemblies=true --filter "Category!=Integration&Category!=FlakyOnHostedAgents" /p:CollectCoverage=true /p:CoverletOutputFormat="opencover" /p:Include="[Steeltoe.*]*" /p:Exclude="[*.Test]*"'
# arguments: '-c $(buildConfiguration) /p:TreatWarningsAsErrors=True /p:CopyLocalLockFileAssemblies=true --filter "Category!=Integration&Category!=FlakyOnHostedAgents" /p:CollectCoverage=true /p:CoverletOutputFormat="opencover" /p:Include="[Steeltoe.*]*" /p:Exclude="[*.Test]*"'
arguments: '-c $(buildConfiguration) -maxcpucount:1 /p:TreatWarningsAsErrors=True /p:CopyLocalLockFileAssemblies=true --filter "Category!=Integration" /p:CollectCoverage=true /p:CoverletOutputFormat="opencover" /p:Include="[Steeltoe.*]*" /p:Exclude="[*.Test]*"'
# Generate the report using ReportGenerator (https://github.com/danielpalme/ReportGenerator)
# First install the tool on the machine, then run it
- pwsh: |
Expand Down Expand Up @@ -174,4 +175,4 @@ jobs:
PathtoPublish: $(Build.ArtifactStagingDirectory)
ArtifactName: Packages
publishLocation: Container
condition: always()
condition: always()
3 changes: 3 additions & 0 deletions src/CircuitBreaker/src/HystrixBase/HystrixCollapserMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public static ICollection<HystrixCollapserMetrics> GetInstances()

internal static void Reset()
{
RollingCollapserEventCounterStream.Reset();
CumulativeCollapserEventCounterStream.Reset();
RollingCollapserBatchSizeDistributionStream.Reset();
Metrics.Clear();
}

Expand Down
8 changes: 8 additions & 0 deletions src/CircuitBreaker/src/HystrixBase/HystrixCommandMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ internal static void Reset()
metricsInstance.UnsubscribeAll();
}

RollingCommandEventCounterStream.Reset();
CumulativeCommandEventCounterStream.Reset();
RollingCommandLatencyDistributionStream.Reset();
RollingCommandUserLatencyDistributionStream.Reset();
RollingCommandMaxConcurrencyStream.Reset();
HystrixThreadEventStream.Reset();
HealthCountsStream.Reset();

Metrics.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ private static bool HasExecutedCommandsOnThread(HystrixThreadPoolMetrics threadP

internal static void Reset()
{
RollingThreadPoolEventCounterStream.Reset();
CumulativeThreadPoolEventCounterStream.Reset();
RollingThreadPoolMaxConcurrencyStream.Reset();

Metrics.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,10 @@ public long GetTotalCount()
{
return totalCount;
}

public override string ToString()
{
return "[Mean: " + GetMean() + "/Total: " + GetTotalCount() + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Steeltoe.CircuitBreaker.Hystrix.Metric.Consumer
Expand Down Expand Up @@ -50,7 +51,7 @@ protected BucketedCounterStream(IHystrixEventStream<Event> inputEventStream, int
{
return inputEventStream
.Observe()
.Window(TimeSpan.FromMilliseconds(bucketSizeInMs)) // bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
.Window(TimeSpan.FromMilliseconds(bucketSizeInMs), NewThreadScheduler.Default) // bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
.SelectMany((b) =>
{
return reduceBucketToSummary(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public Output Latest
{
get
{
StartCachingStreamValuesIfUnstarted();
if (counterSubject.TryGetValue(out Output v))
{
return v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public Output Latest
{
get
{
StartCachingStreamValuesIfUnstarted();
if (counterSubject.TryGetValue(out Output v))
{
return v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,22 @@ public static CumulativeCollapserEventCounterStream GetInstance(IHystrixCollapse

public static CumulativeCollapserEventCounterStream GetInstance(IHystrixCollapserKey collapserKey, int numBuckets, int bucketSizeInMs)
{
return Streams.GetOrAddEx(collapserKey.Name, (k) => new CumulativeCollapserEventCounterStream(collapserKey, numBuckets, bucketSizeInMs, HystrixCollapserMetrics.AppendEventToBucket, HystrixCollapserMetrics.BucketAggregator));
return Streams.GetOrAddEx(collapserKey.Name, (k) =>
{
var stream = new CumulativeCollapserEventCounterStream(collapserKey, numBuckets, bucketSizeInMs, HystrixCollapserMetrics.AppendEventToBucket, HystrixCollapserMetrics.BucketAggregator);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCollapserEventStream.Reset();
Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ public static CumulativeCommandEventCounterStream GetInstance(IHystrixCommandKey

public static CumulativeCommandEventCounterStream GetInstance(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(commandKey.Name, (k) => new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs, HystrixCommandMetrics.AppendEventToBucket, HystrixCommandMetrics.BucketAggregator));
var result = Streams.GetOrAddEx(commandKey.Name, (k) =>
{
var stream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs, HystrixCommandMetrics.AppendEventToBucket, HystrixCommandMetrics.BucketAggregator);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandCompletionStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,23 @@ public static CumulativeThreadPoolEventCounterStream GetInstance(IHystrixThreadP

public static CumulativeThreadPoolEventCounterStream GetInstance(IHystrixThreadPoolKey threadPoolKey, int numBuckets, int bucketSizeInMs)
{
return Streams.GetOrAddEx(threadPoolKey.Name, (k) => new CumulativeThreadPoolEventCounterStream(threadPoolKey, numBuckets, bucketSizeInMs, HystrixThreadPoolMetrics.AppendEventToBucket, HystrixThreadPoolMetrics.CounterAggregator));
return Streams.GetOrAddEx(threadPoolKey.Name, (k) =>
{
var stream = new CumulativeThreadPoolEventCounterStream(threadPoolKey, numBuckets, bucketSizeInMs, HystrixThreadPoolMetrics.AppendEventToBucket, HystrixThreadPoolMetrics.CounterAggregator);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixThreadPoolCompletionStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public static HealthCountsStream GetInstance(IHystrixCommandKey commandKey, int

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandCompletionStream.Reset();

Streams.Clear();
}

Expand All @@ -64,6 +71,12 @@ public static void RemoveByKey(IHystrixCommandKey key)
Streams.TryRemove(key.Name, out HealthCountsStream old);
}

internal static HealthCountsStream GetInstance(string commandKey)
{
Streams.TryGetValue(commandKey, out HealthCountsStream result);
return result;
}

private HealthCountsStream(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs, Func<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion)
: base(HystrixCommandCompletionStream.GetInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, HealthCheckAccumulator)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@ public static RollingCollapserBatchSizeDistributionStream GetInstance(IHystrixCo

public static RollingCollapserBatchSizeDistributionStream GetInstance(IHystrixCollapserKey collapserKey, int numBuckets, int bucketSizeInMs)
{
return Streams.GetOrAddEx(collapserKey.Name, (k) => new RollingCollapserBatchSizeDistributionStream(collapserKey, numBuckets, bucketSizeInMs));
return Streams.GetOrAddEx(collapserKey.Name, (k) =>
{
var stream = new RollingCollapserBatchSizeDistributionStream(collapserKey, numBuckets, bucketSizeInMs);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCollapserEventStream.Reset();
Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ public static RollingCollapserEventCounterStream GetInstance(IHystrixCollapserKe

public static RollingCollapserEventCounterStream GetInstance(IHystrixCollapserKey collapserKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(collapserKey.Name, (k) => new RollingCollapserEventCounterStream(collapserKey, numBuckets, bucketSizeInMs, HystrixCollapserMetrics.AppendEventToBucket, HystrixCollapserMetrics.BucketAggregator));
var result = Streams.GetOrAddEx(collapserKey.Name, (k) =>
{
var stream = new RollingCollapserEventCounterStream(collapserKey, numBuckets, bucketSizeInMs, HystrixCollapserMetrics.AppendEventToBucket, HystrixCollapserMetrics.BucketAggregator);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCollapserEventStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,23 @@ public static RollingCommandEventCounterStream GetInstance(IHystrixCommandKey co

public static RollingCommandEventCounterStream GetInstance(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(commandKey.Name, (k) => new RollingCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs, HystrixCommandMetrics.AppendEventToBucket, HystrixCommandMetrics.BucketAggregator));
var result = Streams.GetOrAddEx(commandKey.Name, (k) =>
{
var stream = new RollingCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs, HystrixCommandMetrics.AppendEventToBucket, HystrixCommandMetrics.BucketAggregator);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandCompletionStream.Reset();
Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,24 @@ public static RollingCommandLatencyDistributionStream GetInstance(IHystrixComman

public static RollingCommandLatencyDistributionStream GetInstance(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(commandKey.Name, (k) => new RollingCommandLatencyDistributionStream(commandKey, numBuckets, bucketSizeInMs));
var result = Streams.GetOrAddEx(commandKey.Name, (k) =>
{
var stream = new RollingCommandLatencyDistributionStream(commandKey, numBuckets, bucketSizeInMs);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandCompletionStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,24 @@ public static RollingCommandMaxConcurrencyStream GetInstance(IHystrixCommandKey

public static RollingCommandMaxConcurrencyStream GetInstance(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(commandKey.Name, (k) => new RollingCommandMaxConcurrencyStream(commandKey, numBuckets, bucketSizeInMs));
var result = Streams.GetOrAddEx(commandKey.Name, (k) =>
{
var stream = new RollingCommandMaxConcurrencyStream(commandKey, numBuckets, bucketSizeInMs);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandStartStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,24 @@ public static RollingCommandUserLatencyDistributionStream GetInstance(IHystrixCo

public static RollingCommandUserLatencyDistributionStream GetInstance(IHystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs)
{
var result = Streams.GetOrAddEx(commandKey.Name, (k) => new RollingCommandUserLatencyDistributionStream(commandKey, numBuckets, bucketSizeInMs));
var result = Streams.GetOrAddEx(commandKey.Name, (k) =>
{
var stream = new RollingCommandUserLatencyDistributionStream(commandKey, numBuckets, bucketSizeInMs);
stream.StartCachingStreamValuesIfUnstarted();
return stream;
});
return result;
}

public static void Reset()
{
foreach (var stream in Streams.Values)
{
stream.Unsubscribe();
}

HystrixCommandCompletionStream.Reset();

Streams.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Observable.Aliases;
using System.Reactive.Subjects;
Expand Down Expand Up @@ -54,7 +55,7 @@ protected RollingConcurrencyStream(IHystrixEventStream<HystrixCommandExecutionSt
rollingMaxStream = inputEventStream
.Observe()
.Map((arg) => GetConcurrencyCountFromEvent(arg))
.Window(TimeSpan.FromMilliseconds(bucketSizeInMs))
.Window(TimeSpan.FromMilliseconds(bucketSizeInMs), NewThreadScheduler.Default)
.SelectMany((arg) => ReduceStreamToMax(arg))
.StartWith(emptyRollingMaxBuckets)
.Window(numBuckets, 1)
Expand Down Expand Up @@ -84,7 +85,6 @@ public long LatestRollingMax
{
get
{
StartCachingStreamValuesIfUnstarted();
rollingMax.TryGetValue(out int value);
return value;
}
Expand Down
Loading

0 comments on commit 19e3a82

Please sign in to comment.