most of the changes
KollaAdithya committed Nov 30, 2023
1 parent 3b212ee commit 6365868
Showing 4 changed files with 201 additions and 31 deletions.
17 changes: 17 additions & 0 deletions internal/pkg/docker/dockerengine/dockerengine.go
@@ -9,6 +9,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@@ -29,6 +30,10 @@ type Cmd interface {
RunWithContext(ctx context.Context, name string, args []string, opts ...exec.CmdOption) error
}

type exitCodeError interface {
ExitCode() int
}

// Operating systems and architectures supported by docker.
const (
OSLinux = "linux"
@@ -342,6 +347,13 @@ func (c DockerCmdClient) Run(ctx context.Context, options *RunOptions) error {
exec.Stdout(stdout),
exec.Stderr(stderr),
exec.NewProcessGroup()); err != nil {
var ec exitCodeError
if errors.As(err, &ec) {
return &ErrContainerExited{
name: options.ContainerName,
exitcode: ec.ExitCode(),
}
}
return fmt.Errorf("running container: %w", err)
}
return nil
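
The unexported exitCodeError interface is what lets errors.As match any error in the chain that exposes ExitCode() int — such as *exec.ExitError from os/exec — without this package naming the concrete type. A minimal standalone sketch of that matching rule, with a hypothetical fakeExitErr standing in for a real process error:

	package main

	import (
		"errors"
		"fmt"
	)

	type exitCodeError interface {
		ExitCode() int
	}

	type fakeExitErr struct{ code int }

	func (e *fakeExitErr) Error() string { return fmt.Sprintf("exit status %d", e.code) }
	func (e *fakeExitErr) ExitCode() int { return e.code }

	func main() {
		// Wrap the error the same way Run does, then unwrap by capability.
		err := fmt.Errorf("running container: %w", &fakeExitErr{code: 137})
		var ec exitCodeError
		if errors.As(err, &ec) {
			fmt.Println(ec.ExitCode()) // 137, found through the wrapped chain
		}
	}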
Expand Down Expand Up @@ -446,6 +458,11 @@ type ErrContainerExited struct {
exitcode int
}

// ExitCode returns the OS exit code configured for this error.
func (e *ErrContainerExited) ExitCode() int {
return e.exitcode
}

// Error implements the error interface for ErrContainerExited.
func (e *ErrContainerExited) Error() string {
return fmt.Sprintf("container %q exited with code %d", e.name, e.exitcode)
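
With ExitCode exported, a caller that gets an error back from Run can recover the container's code. A sketch, assuming err came from DockerCmdClient.Run:

	var exited *dockerengine.ErrContainerExited
	if errors.As(err, &exited) {
		// e.g. propagate the container's exit status to the CLI process.
		os.Exit(exited.ExitCode())
	}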
24 changes: 24 additions & 0 deletions internal/pkg/docker/dockerengine/dockerengine_test.go
@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"os"
osexec "os/exec"
"path/filepath"
"strings"
@@ -686,6 +687,29 @@ func TestDockerCommand_Run(t *testing.T) {
},
wantedError: fmt.Errorf("running container: %w", mockError),
},

"should return error when container exits": {
containerName: mockPauseContainer,
command: mockCommand,
uri: mockImageURI,
setupMocks: func(controller *gomock.Controller) {
mockCmd = NewMockCmd(controller)

mockCmd.EXPECT().RunWithContext(gomock.Any(), "docker", []string{"run",
"--name", mockPauseContainer,
"mockImageUri",
"sleep", "infinity"}, gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, name string, args []string, opts ...exec.CmdOption) error {
// Simulate a zero exit code.
return &osexec.ExitError{ProcessState: &os.ProcessState{}}
})
},
wantedError: &ErrContainerExited{
name: mockPauseContainer,
exitcode: 0,
},
},

"success with run options for pause container": {
containerName: mockPauseContainer,
ports: mockContainerPorts,
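A zero-value os.ProcessState reports ExitCode() == 0, which is why the fake *osexec.ExitError above surfaces as exitcode: 0 — ProcessState has no exported fields, so a test can't set the code directly. A test that needs a nonzero code would have to run a real process; a sketch, assuming a Unix shell on PATH:

	cmd := osexec.Command("sh", "-c", "exit 3")
	err := cmd.Run()
	var exitErr *osexec.ExitError
	if errors.As(err, &exitErr) {
		fmt.Println(exitErr.ExitCode()) // 3
	}
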
140 changes: 128 additions & 12 deletions internal/pkg/docker/orchestrator/orchestrator.go
@@ -20,6 +20,9 @@ import (
"time"

"github.com/aws/copilot-cli/internal/pkg/docker/dockerengine"
"github.com/aws/copilot-cli/internal/pkg/graph"
"github.com/aws/copilot-cli/internal/pkg/term/log"
"golang.org/x/sync/errgroup"
)

// Orchestrator manages running a Task. Only a single Task
@@ -49,6 +52,8 @@ type logOptionsFunc func(name string, ctr ContainerDefinition) dockerengine.RunLogOptions
type DockerEngine interface {
Run(context.Context, *dockerengine.RunOptions) error
IsContainerRunning(context.Context, string) (bool, error)
IsContainerCompleteOrSuccess(ctx context.Context, containerName string) (bool, int, error)
IsContainerHealthy(ctx context.Context, containerName string) (bool, error)
Stop(context.Context, string) error
Build(ctx context.Context, args *dockerengine.BuildArguments, w io.Writer) error
Exec(ctx context.Context, container string, out io.Writer, cmd string, args ...string) error
@@ -68,6 +73,18 @@ const (
proxyPortStart = uint16(50000)
)

const (
ctrStateUnknown = "unknown"
ctrStateRunningOrExited = "RunningOrExited"
)

const (
ctrStateHealthy = "healthy"
ctrStateComplete = "complete"
ctrStateSuccess = "success"
ctrStateStart = "start"
)

//go:embed Pause-Dockerfile
var pauseDockerfile string

@@ -194,8 +211,8 @@ func (a *runTaskAction) Do(o *Orchestrator) error {

// start the pause container
opts := o.pauseRunOptions(a.task)
o.run(pauseCtrTaskID, opts)
if err := o.waitForContainerToStart(ctx, opts.ContainerName); err != nil {
o.run(pauseCtrTaskID, opts, true, cancel)
if err := o.waitForContainerToStart(ctx, opts.ContainerName, true); err != nil {
return fmt.Errorf("wait for pause container to start: %w", err)
}

@@ -222,16 +239,43 @@
// TODO(Aiden): Implement a container ID system or use `docker ps` to ensure containers are stopped
time.Sleep(1 * time.Second)
}

for name, ctr := range a.task.Containers {
name, ctr := name, ctr
o.run(taskID, o.containerRunOptions(name, ctr))
}

o.curTask = a.task
depGraph := buildDependencyGraph(a.task.Containers)
err := depGraph.UpwardTraversal(ctx, func(ctx context.Context, containerName string) error {
if len(a.task.Containers[containerName].DependsOn) > 0 {
if err := o.waitForContainerDependencies(ctx, containerName, a); err != nil {
return fmt.Errorf("wait for container %s dependencies: %w", containerName, err)
}
}
o.run(taskID, o.containerRunOptions(containerName, a.task.Containers[containerName]), a.task.Containers[containerName].IsEssential, cancel)
return o.waitForContainerToStart(ctx, o.containerID(containerName), a.task.Containers[containerName].IsEssential)
}, ctrStateUnknown, ctrStateRunningOrExited)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("upward traversal: %w", err)
}
return nil
}

func buildDependencyGraph(containers map[string]ContainerDefinition) *graph.Graph[string] {
var vertices []string
for vertex := range containers {
vertices = append(vertices, vertex)
}
dependencyGraph := graph.New(vertices, graph.WithStatus[string](ctrStateUnknown))
for containerName, container := range containers {
for depCtr := range container.DependsOn {
dependencyGraph.Add(graph.Edge[string]{
From: containerName,
To: depCtr,
})
}
}
return dependencyGraph
}
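
Concretely, each edge points from a container to one of its dependencies, and the upward traversal then visits dependencies before dependents. A sketch with a hypothetical two-container task:

	// api depends on db being started; api is essential, db is not.
	task := Task{
		Containers: map[string]ContainerDefinition{
			"db":  {IsEssential: false},
			"api": {IsEssential: true, DependsOn: map[string]string{"db": ctrStateStart}},
		},
	}
	// buildDependencyGraph(task.Containers) yields the single edge api -> db,
	// so UpwardTraversal runs "db" first and only then starts "api".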

// setupProxyConnections creates proxy connections to a.hosts in pauseContainer.
// It assumes that pauseContainer is already running. A unique proxy connection
// is created for each host (in parallel) using AWS SSM Port Forwarding through
@@ -416,16 +460,19 @@ func (o *Orchestrator) stopTask(ctx context.Context, task Task) error {
}

// waitForContainerToStart blocks until the container specified by id starts.
func (o *Orchestrator) waitForContainerToStart(ctx context.Context, id string) error {
func (o *Orchestrator) waitForContainerToStart(ctx context.Context, id string, isEssential bool) error {
for {
isRunning, err := o.docker.IsContainerRunning(ctx, id)
switch {
case err != nil:
var errContainerExited *dockerengine.ErrContainerExited
if errors.As(err, &errContainerExited) && !isEssential {
return nil
}
return fmt.Errorf("check if %q is running: %w", id, err)
case isRunning:
return nil
}

select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
@@ -434,6 +481,66 @@ func (o *Orchestrator) waitForContainerToStart(ctx context.Context, id string) error {
}
}

func (o *Orchestrator) waitForContainerDependencies(ctx context.Context, containerName string, a *runTaskAction) error {
fmt.Printf("Waiting for container %q dependencies...\n", containerName)
eg, ctx := errgroup.WithContext(ctx)
for name, state := range a.task.Containers[containerName].DependsOn {
name, state := name, state
eg.Go(func() error {
ctrId := o.containerID(name)
isEssential := a.task.Containers[name].IsEssential
ticker := time.NewTicker(700 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
switch state {
case ctrStateStart:
err := o.waitForContainerToStart(ctx, ctrId, isEssential)
if err != nil {
return err
}
if !isEssential {
fmt.Printf("non-essential container %q started successfully\n", ctrId)
}
log.Successf("Successfully container %s is running\n", ctrId)
return nil
case ctrStateHealthy:
healthy, err := o.docker.IsContainerHealthy(ctx, ctrId)
if err != nil {
if !isEssential {
fmt.Printf("non-essential container %q failed to be %q: %v\n", ctrId, state, err)
return nil
}
return fmt.Errorf("essential container %q failed to be %q: %w", ctrId, state, err)
}
if healthy {
log.Successf("Successfully dependency container %q reached %q\n", ctrId, state)
return nil
}
case ctrStateComplete, ctrStateSuccess:
exited, exitCode, err := o.docker.IsContainerCompleteOrSuccess(ctx, ctrId)
if err != nil {
if !isEssential {
fmt.Printf("non-essential container %q failed to be %q: %v\n", ctrId, state, err)
return nil
}
return fmt.Errorf("dependency container %q failed to be %q: %w", ctrId, state, err)
}
if exited && ((state == "complete" && exitCode != 0) || (state == "success" && exitCode == 0)) {
log.Successf("Successfully dependency container %q exited with code: %d\n", name, exitCode)
return nil
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
}
return eg.Wait()
}
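
The four states handled in the switch mirror the dependency conditions a container can declare on its siblings; a hypothetical DependsOn map covering each one:

	dependsOn := map[string]string{
		"db":      ctrStateStart,    // block until db is running (or exited, if db is non-essential)
		"proxy":   ctrStateHealthy,  // block until docker reports proxy healthy
		"seed":    ctrStateComplete, // block until seed exits, with any code
		"migrate": ctrStateSuccess,  // block until migrate exits with code 0
	}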

// containerID returns the full ID for a container with name run by s.
func (o *Orchestrator) containerID(name string) string {
return o.idPrefix + name
@@ -495,7 +602,10 @@ func (o *Orchestrator) containerRunOptions(name string, ctr ContainerDefinition)
// run calls `docker run` using opts. Errors are only returned
// to the main Orchestrator routine if the taskID the container was run with
// matches the current taskID the Orchestrator is running.
func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) {
func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions, isEssential bool, cancelFn context.CancelFunc) {
o.wg.Add(1)
go func() {
defer o.wg.Done()
@@ -514,7 +624,13 @@ func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) {
if err == nil {
err = errors.New("container stopped unexpectedly")
}
o.runErrs <- fmt.Errorf("run %q: %w", opts.ContainerName, err)
var errContainerExited *dockerengine.ErrContainerExited
if errors.As(err, &errContainerExited) && !isEssential {
return
}
// Cancel the context to signal all the other goroutines spawned by `graph.UpwardTraversal`.
cancelFn()
o.runErrs <- fmt.Errorf("run essential %q: %w", opts.ContainerName, err)
}
}()
}
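
The cancel function passed into run is the other half of the traversal's shutdown path. A sketch of the flow, assuming ctx and cancel come from the context.WithCancel used by runTaskAction.Do (not shown in this diff):

	ctx, cancel := context.WithCancel(context.Background())
	o.run(taskID, o.containerRunOptions(name, ctr), true /* essential */, cancel)
	// If the essential container exits, run() calls cancel(), every goroutine
	// blocked in waitForContainerToStart or UpwardTraversal sees ctx.Done(),
	// and Do returns after observing context.Canceled.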
51 changes: 32 additions & 19 deletions internal/pkg/graph/graph.go
@@ -7,8 +7,6 @@ package graph
import (
"context"
"sync"

"golang.org/x/sync/errgroup"
)

// vertexStatus denotes the visiting status of a vertex when running DFS in a graph.
@@ -370,53 +368,68 @@ type graphTraversal[V comparable] struct {
}

func (t *graphTraversal[V]) execute(ctx context.Context, graph *Graph[V]) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

vertexCount := len(graph.vertices)
if vertexCount == 0 {
return nil
}
eg, ctx := errgroup.WithContext(ctx)

var wg sync.WaitGroup
vertexCh := make(chan V, vertexCount)
defer close(vertexCh)
errCh := make(chan error, vertexCount) // Channel for errors

processVertices := func(ctx context.Context, graph *Graph[V], eg *errgroup.Group, vertices []V, vertexCh chan V) {
processVertices := func(ctx context.Context, graph *Graph[V], wg *sync.WaitGroup, vertices []V, vertexCh chan V) {
for _, vertex := range vertices {
vertex := vertex
// Delay processing this vertex if any of its dependent vertices are yet to be processed.
if len(t.filterVerticesByStatus(graph, vertex, t.adjacentVertexSkipStatus)) != 0 {
continue
}
if !t.markAsSeen(vertex) {
// Skip this vertex if it's already been processed by another routine.
continue
}
eg.Go(func() error {
wg.Add(1)
go func() {
defer wg.Done()
if err := t.processVertex(ctx, vertex); err != nil {
return err
errCh <- err
return
}
// Assign new status to the vertex upon successful processing.
graph.updateStatus(vertex, t.requiredVertexStatus)
vertexCh <- vertex
return nil
})
}()
}
}

eg.Go(func() error {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return nil
return
case vertex := <-vertexCh:
vertexCount--
if vertexCount == 0 {
return nil
return
}
processVertices(ctx, graph, eg, t.findAdjacentVertices(graph, vertex), vertexCh)
processVertices(ctx, graph, &wg, t.findAdjacentVertices(graph, vertex), vertexCh)
}
}
})
processVertices(ctx, graph, eg, t.findBoundaryVertices(graph), vertexCh)
return eg.Wait()
}()
processVertices(ctx, graph, &wg, t.findBoundaryVertices(graph), vertexCh)
wg.Wait() // Wait for all goroutines to finish
close(errCh) // Close error channel

// Check if there were any errors
for err := range errCh {
if err != nil {
return err
}
}
return nil
}

func (t *graphTraversal[V]) markAsSeen(vertex V) bool {
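The rewrite swaps errgroup for a WaitGroup plus a buffered error channel: workers report failures without cancelling each other, and errors are drained only after every worker has finished. The same pattern in miniature, with a hypothetical jobs []func() error:

	func runAll(jobs []func() error) error {
		var wg sync.WaitGroup
		errCh := make(chan error, len(jobs)) // buffered so no sender ever blocks
		for _, job := range jobs {
			job := job
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := job(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait() // all workers done; nothing can send anymore
		close(errCh)
		for err := range errCh {
			return err // surface the first recorded error
		}
		return nil
	}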
