Skip to content

Commit

Permalink
Cancel Kubernetes Job when CancelScript is called (#742)
Browse files Browse the repository at this point in the history
  • Loading branch information
APErebus authored Dec 15, 2023
1 parent cd04022 commit 338a2e8
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 55 deletions.
50 changes: 18 additions & 32 deletions source/Octopus.Tentacle/Kubernetes/KubernetesJobService.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Autorest;
using k8s.Models;
using Octopus.Tentacle.Contracts;

namespace Octopus.Tentacle.Kubernetes
{
public interface IKubernetesJobService
{
Task<V1Job?> TryGet(ScriptTicket scriptTicket, CancellationToken cancellationToken);
string BuildJobName(ScriptTicket scriptTicket);
Task CreateJob(V1Job job, CancellationToken cancellationToken);
void Delete(ScriptTicket scriptTicket);
Task Delete(ScriptTicket scriptTicket, CancellationToken cancellationToken);
Task Watch(ScriptTicket scriptTicket, Func<V1Job, bool> onChange, Action<Exception> onError, CancellationToken cancellationToken);
Task SuspendJob(ScriptTicket scriptTicket, CancellationToken cancellationToken);
}

public class KubernetesJobService : KubernetesService, IKubernetesJobService
Expand All @@ -25,19 +23,19 @@ public KubernetesJobService(IKubernetesClientConfigProvider configProvider)
{
}

public async Task<V1Job?> TryGet(ScriptTicket scriptTicket, CancellationToken cancellationToken)
public async Task SuspendJob(ScriptTicket scriptTicket, CancellationToken cancellationToken)
{
var jobName = BuildJobName(scriptTicket);

try
var patchJob = new V1Job
{
return await Client.ReadNamespacedJobStatusAsync(jobName, KubernetesConfig.Namespace, cancellationToken: cancellationToken);
}
catch (HttpOperationException opException)
when (opException.Response.StatusCode == HttpStatusCode.NotFound)
{
return null;
}
Spec = new V1JobSpec
{
Suspend = true
}
};
var patchYaml = KubernetesJson.Serialize(patchJob);
await Client.PatchNamespacedJobAsync(new V1Patch(patchYaml, V1Patch.PatchType.MergePatch), jobName, KubernetesConfig.Namespace, cancellationToken: cancellationToken);
}

public async Task Watch(ScriptTicket scriptTicket, Func<V1Job, bool> onChange, Action<Exception> onError, CancellationToken cancellationToken)
Expand All @@ -54,33 +52,21 @@ public async Task Watch(ScriptTicket scriptTicket, Func<V1Job, bool> onChange, A

await foreach (var (type, job) in response.WatchAsync<V1Job, V1JobList>(onError, cancellationToken: cancellationToken))
{
//we are only watching for modifications
if (type != WatchEventType.Modified)
//watch for modifications and deletions
if (type is not (WatchEventType.Modified or WatchEventType.Deleted))
continue;

var stopWatching = onChange(job);
if (stopWatching)
//we stop watching when told to or if this is deleted
if (stopWatching || type is WatchEventType.Deleted)
break;
}
}

public string BuildJobName(ScriptTicket scriptTicket) => $"octopus-{scriptTicket.TaskId}".ToLowerInvariant();
public string BuildJobName(ScriptTicket scriptTicket) => $"octopus-job-{scriptTicket.TaskId}".ToLowerInvariant();

public async Task CreateJob(V1Job job, CancellationToken cancellationToken)
{
await Client.CreateNamespacedJobAsync(job, KubernetesConfig.Namespace, cancellationToken: cancellationToken);
}
public async Task CreateJob(V1Job job, CancellationToken cancellationToken) => await Client.CreateNamespacedJobAsync(job, KubernetesConfig.Namespace, cancellationToken: cancellationToken);

public void Delete(ScriptTicket scriptTicket)
{
try
{
Client.DeleteNamespacedJob(BuildJobName(scriptTicket), KubernetesConfig.Namespace);
}
catch
{
//we are comfortable silently consuming this as the jobs have a TTL that will clean it up anyway
}
}
public async Task Delete(ScriptTicket scriptTicket, CancellationToken cancellationToken) => await Client.DeleteNamespacedJobAsync(BuildJobName(scriptTicket), KubernetesConfig.Namespace, cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ public KubernetesJobScriptExecutor(IKubernetesJobService jobService, IKubernetes

public IRunningScript ExecuteOnBackgroundThread(StartScriptCommandV3Alpha command, IScriptWorkspace workspace, ScriptStateStore scriptStateStore, CancellationToken cancellationToken)
{
var runningScript = new RunningKubernetesJob(workspace, workspace.CreateLog(), command.ScriptTicket, command.TaskId, cancellationToken, log, scriptStateStore, jobService, containerResolver, appInstanceSelector);
var runningScript = new RunningKubernetesJob(workspace, workspace.CreateLog(), command.ScriptTicket, command.TaskId, log, scriptStateStore, jobService, containerResolver, appInstanceSelector, cancellationToken);

Task.Run(async () =>
{
await runningScript.Execute(cancellationToken);
}, cancellationToken);
Task.Run(() => runningScript.Execute(), cancellationToken);

return runningScript;
}
Expand Down
65 changes: 47 additions & 18 deletions source/Octopus.Tentacle/Kubernetes/Scripts/RunningKubernetesJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ public class RunningKubernetesJob : IRunningScript
readonly IScriptStateStore stateStore;
readonly IKubernetesJobService jobService;
readonly IKubernetesJobContainerResolver containerResolver;
readonly CancellationToken scriptCancellationToken;
CancellationToken scriptCancellationToken;
readonly string? instanceName;
readonly KubernetesJobOutputStreamWriter outputStreamWriter;
readonly string jobName;

public int ExitCode { get; private set; }
public ProcessState State { get; private set; }
Expand All @@ -46,12 +47,12 @@ public RunningKubernetesJob(
IScriptLog scriptLog,
ScriptTicket scriptTicket,
string taskId,
CancellationToken scriptCancellationToken,
ILog log,
IScriptStateStore stateStore,
IKubernetesJobService jobService,
IKubernetesJobContainerResolver containerResolver,
IApplicationInstanceSelector appInstanceSelector)
IApplicationInstanceSelector appInstanceSelector,
CancellationToken scriptCancellationToken)
{
this.workspace = workspace;
this.scriptTicket = scriptTicket;
Expand All @@ -65,35 +66,60 @@ public RunningKubernetesJob(
instanceName = appInstanceSelector.Current.InstanceName;

outputStreamWriter = new KubernetesJobOutputStreamWriter(workspace);

// this doesn't change, so build it once
jobName = jobService.BuildJobName(scriptTicket);
}

public async Task Execute(CancellationToken taskCancellationToken)
public async Task Execute()
{
var exitCode = -1;

var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(scriptCancellationToken, taskCancellationToken);
var cancellationToken = linkedCancellationTokenSource.Token;
try
{
using var writer = ScriptLog.CreateWriter();

//register a cancellation callback so that when the script is cancelled, we cancel the job
//we use a using to make sure this callback is deregistered
using var cancellationTokenRegistration = scriptCancellationToken.Register(() =>
{
//we spawn the job cancellation on a background thread (as this callback runs synchronously)
Task.Run(async () =>
{
try
{
writer.WriteVerbose($"Cancelling Kubernetes Job '{jobName}'.");
//first we suspend the job, which terminates the underlying pods
await jobService.SuspendJob(scriptTicket, CancellationToken.None);
//then we delete the job (because we no longer need it)
await jobService.Delete(scriptTicket, CancellationToken.None);
writer.WriteVerbose($"Cancelled Kubernetes Job '{jobName}'.");
}
catch (Exception e)
{
writer.WriteOutput(ProcessOutputSource.StdErr, $"Failed to cancel Kubernetes job {jobName}. {e}");
}
}, CancellationToken.None);
});

try
{
using (ScriptIsolationMutex.Acquire(workspace.IsolationLevel,
workspace.ScriptMutexAcquireTimeout,
workspace.ScriptMutexName ?? nameof(RunningKubernetesJob),
message => writer.WriteOutput(ProcessOutputSource.StdOut, message),
taskId,
cancellationToken,
scriptCancellationToken,
log))
{
//create the k8s job
await CreateJob(writer, cancellationToken);
await CreateJob(writer, scriptCancellationToken);

State = ProcessState.Running;
RecordScriptHasStarted(writer);

//we now need to monitor the resulting pod status
exitCode = await MonitorJobAndLogs(writer, cancellationToken);
exitCode = await MonitorJobAndLogs(writer);
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -125,11 +151,10 @@ public async Task Execute(CancellationToken taskCancellationToken)
}
}

async Task<int> MonitorJobAndLogs(IScriptLogWriter writer, CancellationToken cancellationToken)
async Task<int> MonitorJobAndLogs(IScriptLogWriter writer)
{
var jobCompletionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var checkJobTask = CheckIfJobHasCompleted(cancellationToken, jobCompletionCancellationTokenSource);
var jobCompletionCancellationTokenSource = new CancellationTokenSource();
var checkJobTask = CheckIfJobHasCompleted(jobCompletionCancellationTokenSource);

//we pass the job completion CTS here because its used to cancel the writing of the job stream
var monitorJobOutputTask = outputStreamWriter.StreamJobLogsToScriptLog(writer, jobCompletionCancellationTokenSource.Token);
Expand All @@ -143,7 +168,7 @@ async Task<int> MonitorJobAndLogs(IScriptLogWriter writer, CancellationToken can
return checkJobTask.Result;
}

async Task<int> CheckIfJobHasCompleted(CancellationToken cancellationToken, CancellationTokenSource jobCompletionCancellationTokenSource)
async Task<int> CheckIfJobHasCompleted(CancellationTokenSource jobCompletionCancellationTokenSource)
{
var resultStatusCode = 0;
await jobService.Watch(scriptTicket, job =>
Expand All @@ -165,7 +190,13 @@ await jobService.Watch(scriptTicket, job =>
{
log.Error(ex);
resultStatusCode = 0;
}, cancellationToken);
}, CancellationToken.None);

//if the job was killed by cancellation, then we need to change the exit code
if (scriptCancellationToken.IsCancellationRequested)
{
resultStatusCode = ScriptExitCodes.CanceledExitCode;
}

jobCompletionCancellationTokenSource.Cancel();

Expand All @@ -179,8 +210,6 @@ async Task CreateJob(IScriptLogWriter writer, CancellationToken cancellationToke

var scriptName = Path.GetFileName(workspace.BootstrapScriptFilePath);

var jobName = jobService.BuildJobName(scriptTicket);

//Deserialize the volume configuration from the environment configuration
var volumes = KubernetesYaml.Deserialize<List<V1Volume>>(KubernetesConfig.JobVolumeYaml);

Expand Down Expand Up @@ -247,7 +276,7 @@ async Task CreateJob(IScriptLogWriter writer, CancellationToken cancellationToke
}
};

writer.WriteVerbose($"Executing script in Kubernetes Job '{job.Name()}'");
writer.WriteVerbose($"Executing script in Kubernetes Job '{job.Name()}'.");

await jobService.CreateJob(job, cancellationToken);
}
Expand Down

0 comments on commit 338a2e8

Please sign in to comment.