Skip to content

Commit

Permalink
feat(all): retry and clean queue system (#77)
Browse files Browse the repository at this point in the history
- fix: bug first call http scale 0 to 1
- feature: add clean queue system
- refactor: upgrade dependencies
- feature: retry pattern now configurable
  • Loading branch information
guillaume-chervet authored Dec 31, 2024
1 parent f9716ef commit fda13cd
Show file tree
Hide file tree
Showing 48 changed files with 1,502 additions and 458 deletions.
39 changes: 32 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

Why use SlimFaas?
- Scale to 0 after a period of inactivity (work with deployment and statefulset)
- Scale up : compatible with HPA (Horizontal Auto Scaler) and Keda (SlimFaas integrated autonomous Scale Up coming soon)
- Synchronous HTTP calls
- Asynchronous HTTP calls
- Allows you to limit the number of parallel HTTP requests for each underlying function
- Retry: 3 times with graduation: 2 seconds, 4 seconds, 8 seconds
- Retry Pattern configurable
- Private and Public functions
- Private functions can be accessed only by internal namespace http call from pods
- Synchronous Publish/Subscribe internal events via HTTP calls to every replicas via HTTP without any use of specific drivers/libraries (**Couple you application with SlimFaas**)
- Synchronous Publish/Subscribe internal events via HTTP calls to every replicas via HTTP without any use of specific drivers/libraries (**Couple your application with SlimFaas**)
- Mind Changer: REST API that show the status of your functions and allow to wake up your infrastructure (**Couple your application with Slimfaas**)
- Very useful to inform end users that your infrastructure is starting
- Plug and Play: just deploy a standard pod
Expand All @@ -27,7 +28,8 @@ To test SlimFaas on your local machine by using kubernetes with Docker Desktop,

```bash
git clone https://github.com/AxaFrance/slimfaas.git
cd slimfaas/demo
cd slimfaas
cd demo
# Create slimfaas service account and pods
kubectl apply -f deployment-slimfaas.yml
# Expose SlimFaaS service as NodePort or Ingress
Expand All @@ -40,7 +42,8 @@ kubectl apply -f deployment-functions.yml
# Install MySql
kubectl apply -f deployment-mysql.yml
# to run Single Page webapp demo (optional) on http://localhost:8000
docker run -p 8000:8000 --rm axaguildev/fibonacci-webapp:latest
docker run -d -p 8000:8000 --rm axaguildev/fibonacci-webapp:latest
kubectl port-forward svc/slimfaas-nodeport 30021:5000 -n slimfaas-demo
```

Now, you can access your pod via SlimFaas proxy:
Expand Down Expand Up @@ -372,6 +375,28 @@ spec:
- **SlimFaas/ExcludeDeploymentsFromVisibilityPrivate** : ""
- Comma separated list of deployment names or statefulset names
- Message from that pods will be considered as public. It is useful if you want to exclude some pods from the private visibility, for example for a backend for frontend.
- **SlimFaas/Configuration** : json configuration default values displayed below
- Allows you to define a configuration for your functions. For example, you can define a timeout for HTTP calls, a retry pattern for timeouts and HTTP status codes.

````bash
{
"DefaultSync":{
"HttpTimeout": 120, # Timeout in seconds
"TimeoutRetries": [2,4,8] # Retry pattern in seconds
"HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes
}
"DefaultAsync":{
"HttpTimeout": 120, # Timeout in seconds
"TimeoutRetries": [2,4,8] # Retry pattern in seconds
"HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes
},
"DefaultPublish":{
"HttpTimeout": 120, # Timeout in seconds
"TimeoutRetries": [2,4,8] # Retry pattern in seconds
"HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes
}
}
````
- **SlimFaas/Schedule** : json configuration
- Allows you to define a schedule for your functions. If you want to wake up your infrastructure at 07:00 or for example scale down after 60 seconds of inactivity after 07:00 and scale down after 10 seconds of inactivity after 21:00. Time zones are defined as IANA time zones. The full list is available [here](https://nodatime.org/TimeZones)

Expand All @@ -380,10 +405,10 @@ spec:
{
"TimeZoneID":"Europe/Paris", # Time Zone ID can be found here: https://nodatime.org/TimeZones
"Default":{
"WakeUp":["07:00"], // Wake up your infrastructure at 07:00
"WakeUp":["07:00"], # Wake up your infrastructure at 07:00
"ScaleDownTimeout":[
{"Time":"07:00","Value":20}, // Scale down after 20 seconds of inactivity after 07:00
{"Time":"21:00","Value":10} // Scale down after 10 seconds of inactivity after 21:00
{"Time":"07:00","Value":20}, # Scale down after 20 seconds of inactivity after 07:00
{"Time":"21:00","Value":10} # Scale down after 10 seconds of inactivity after 21:00
]
}
}
Expand Down
46 changes: 42 additions & 4 deletions demo/deployment-functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@ spec:
automountServiceAccountToken: false
containers:
- name: fibonacci1
image: axaguildev/fibonacci:0.24.18
image: axaguildev/fibonacci:latest
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 5
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 3
periodSeconds: 1
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 3
env:
- name: ASPNETCORE_URLS
value: http://+:5000
- name: Logging__LogLevel__Default
value: Debug
resources:
limits:
memory: "96Mi"
Expand Down Expand Up @@ -65,14 +76,23 @@ spec:
automountServiceAccountToken: false
containers:
- name: fibonacci2
image: axaguildev/fibonacci:0.24.18
image: axaguildev/fibonacci:latest
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 5
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 3
periodSeconds: 1
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 3
env:
- name: ASPNETCORE_URLS
value: http://+:5000
Expand Down Expand Up @@ -109,14 +129,23 @@ spec:
automountServiceAccountToken: false
containers:
- name: fibonacci3
image: axaguildev/fibonacci:0.24.18
image: axaguildev/fibonacci:latest
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 5
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 3
periodSeconds: 1
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 3
env:
- name: ASPNETCORE_URLS
value: http://+:5000
Expand Down Expand Up @@ -158,14 +187,23 @@ spec:
automountServiceAccountToken: false
containers:
- name: fibonacci4
image: axaguildev/fibonacci:0.24.18
image: axaguildev/fibonacci:latest
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 5
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 3
periodSeconds: 1
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 3
env:
- name: ASPNETCORE_URLS
value: http://+:5000
Expand Down
30 changes: 29 additions & 1 deletion demo/deployment-slimfaas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ roleRef:
name: deployment-statefulset-manager
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: endpoints-viewer
namespace: slimfaas-demo
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: slimfaas-endpoints-viewer
namespace: slimfaas-demo
subjects:
- kind: ServiceAccount
name: slimfaas
namespace: slimfaas-demo
roleRef:
kind: Role
name: endpoints-viewer
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
Expand All @@ -84,7 +109,7 @@ spec:
serviceAccountName: slimfaas
containers:
- name: slimfaas
image: axaguildev/slimfaas:0.24.18
image: axaguildev/slimfaas:latest
livenessProbe:
httpGet:
path: /health
Expand Down Expand Up @@ -114,6 +139,9 @@ spec:
value: "Debug"
#- name: SLIMDATA_CONFIGURATION
# value: |
# {"coldStart":"true"}
#- name: SLIMDATA_CONFIGURATION
# value: |
# {"lowerElectionTimeout":"500","upperElectionTimeout":"1000","requestTimeout":"00:01:20.0000000","rpcTimeout":"00:00:40.0000000","heartbeatThreshold":"0.5"}
#- name: SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT
# value: "500"
Expand Down
8 changes: 7 additions & 1 deletion src/Fibonacci/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@

app.MapGet("/health", () => "OK");

app.MapGet("/error", async () =>
{
await Task.Delay(100);
throw new Exception("Error");
});

app.MapGet("/hello/{name}", ([FromServices] ILogger<Fibonacci> logger, string name) =>
{
logger.LogInformation("Hello Called with name: {Name}", name);
Expand All @@ -46,7 +52,7 @@
[FromServices] Fibonacci fibonacci,
FibonacciInput input) =>
{
logger.LogInformation("Fibonacci Called");
logger.LogInformation("Fibonacci Called with input: {Input}", input.Input);
var output = new FibonacciOutput();
output.Result = fibonacci.Run(input.Input);
logger.LogInformation("Fibonacci output: {Output}", output.Result);
Expand Down
2 changes: 1 addition & 1 deletion src/SlimData/Commands/AddKeyValueCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public struct AddKeyValueCommand : ISerializable<AddKeyValueCommand>
public string Key { get; set; }
public ReadOnlyMemory<byte> Value { get; set; }

long? IDataTransferObject.Length => sizeof(int) + Value.Length;
long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Key) + Value.Length;

public async ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token)
where TWriter : notnull, IAsyncBinaryWriter
Expand Down
45 changes: 45 additions & 0 deletions src/SlimData/Commands/ListCallbackCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Text;
using DotNext.IO;
using DotNext.Runtime.Serialization;
using DotNext.Text;

namespace SlimData.Commands;

public struct ListCallbackCommand : ISerializable<ListCallbackCommand>
{
public const int Id = 15;

public string Identifier { get; set; }
public string Key { get; set; }

public int HttpCode { get; set; }

public long NowTicks { get; set; }

public async ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token) where TWriter : notnull, IAsyncBinaryWriter
{
var command = this;
await writer.EncodeAsync(command.Identifier.AsMemory(), new EncodingContext(Encoding.UTF8, false),
LengthFormat.LittleEndian, token).ConfigureAwait(false);
await writer.EncodeAsync(command.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false),
LengthFormat.LittleEndian, token).ConfigureAwait(false);
await writer.WriteLittleEndianAsync(HttpCode, token).ConfigureAwait(false);
await writer.WriteLittleEndianAsync(NowTicks, token).ConfigureAwait(false);
}

long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Identifier) + sizeof(int) + Encoding.UTF8.GetByteCount(Key) + sizeof(long);

public static async ValueTask<ListCallbackCommand> ReadFromAsync<TReader>(TReader reader, CancellationToken token) where TReader : notnull, IAsyncBinaryReader
{
var identifier = await reader.DecodeAsync( new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false);
var key = await reader.DecodeAsync( new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false);

return new ListCallbackCommand
{
Identifier = identifier.ToString(),
Key = key.ToString(),
HttpCode = await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false),
NowTicks = await reader.ReadLittleEndianAsync<Int64>(token).ConfigureAwait(false)
};
}
}
50 changes: 48 additions & 2 deletions src/SlimData/Commands/ListLeftPushCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,42 @@ public struct ListLeftPushCommand : ISerializable<ListLeftPushCommand>
public const int Id = 13;

public string Key { get; set; }

public string Identifier { get; set; }
public long NowTicks { get; set; }

public int RetryTimeout { get; set; }

public List<int> Retries { get; set; }

public List<int> HttpStatusCodesWorthRetrying { get; set; }

public ReadOnlyMemory<byte> Value { get; set; }

long? IDataTransferObject.Length => sizeof(int) + Value.Length;
long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Key) + Value.Length + Encoding.UTF8.GetByteCount(Identifier) + sizeof(long) + sizeof(int) + Retries.Count * sizeof(int) + sizeof(int) + sizeof(int) + HttpStatusCodesWorthRetrying.Count * sizeof(int);

public async ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token)
where TWriter : notnull, IAsyncBinaryWriter
{
var command = this;
await writer.EncodeAsync(command.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false),
LengthFormat.LittleEndian, token).ConfigureAwait(false);
await writer.EncodeAsync(command.Identifier.AsMemory(), new EncodingContext(Encoding.UTF8, false),
LengthFormat.LittleEndian, token).ConfigureAwait(false);
await writer.WriteLittleEndianAsync(NowTicks, token).ConfigureAwait(false);
await writer.WriteLittleEndianAsync(RetryTimeout, token).ConfigureAwait(false);
await writer.WriteAsync(command.Value, LengthFormat.Compressed, token).ConfigureAwait(false);
await writer.WriteLittleEndianAsync(Retries.Count, token).ConfigureAwait(false);
foreach (var retry in Retries)
{
await writer.WriteLittleEndianAsync(retry, token).ConfigureAwait(false);
}
await writer.WriteLittleEndianAsync(HttpStatusCodesWorthRetrying.Count, token).ConfigureAwait(false);
foreach (var httpStatus in HttpStatusCodesWorthRetrying)
{
await writer.WriteLittleEndianAsync(httpStatus, token).ConfigureAwait(false);
}

}

#pragma warning disable CA2252
Expand All @@ -29,11 +54,32 @@ public static async ValueTask<ListLeftPushCommand> ReadFromAsync<TReader>(TReade
where TReader : notnull, IAsyncBinaryReader
{
var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false);
var identifier = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false);
var nowTicks = await reader.ReadLittleEndianAsync<Int64>(token).ConfigureAwait(false);
var timeout = await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false);
using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false);
var retriesCount = await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false);
var retries = new List<int>(retriesCount);
while (retriesCount-- > 0)
{
retries.Add(await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false));
}
var httpStatusCodesWorthRetryingCount = await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false);
var httpStatusCodesWorthRetrying = new List<int>(httpStatusCodesWorthRetryingCount);
while (httpStatusCodesWorthRetryingCount-- > 0)
{
httpStatusCodesWorthRetrying.Add(await reader.ReadLittleEndianAsync<Int32>(token).ConfigureAwait(false));
}

return new ListLeftPushCommand
{
Key = key.ToString(),
Value = value.Memory.ToArray()
Identifier = identifier.ToString(),
NowTicks = nowTicks,
RetryTimeout = timeout,
Retries = retries,
Value = value.Memory.ToArray(),
HttpStatusCodesWorthRetrying = httpStatusCodesWorthRetrying
};
}
}
Loading

0 comments on commit fda13cd

Please sign in to comment.