Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/replace on spot termination events #475

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions core/instance_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ func (i *instance) handleInstanceStates() (bool, error) {
return false, nil
}

// launchSpotReplacement requests a replacement for this instance using the
// "spot" lifecycle. It is a convenience wrapper around launchReplacement and
// hands back whatever instance ID or error that call produces.
func (i *instance) launchSpotReplacement() (*string, error) {
	replacementID, err := i.launchReplacement("spot")
	return replacementID, err
}

func (i *instance) launchReplacement(replacementLifecycle string) (*string, error) {

ltData, err := i.createLaunchTemplateData()
ltData, err := i.createLaunchTemplateData(replacementLifecycle)

if err != nil {
log.Println("failed to create LaunchTemplate data,", err.Error())
Expand All @@ -61,7 +64,7 @@ func (i *instance) launchSpotReplacement() (*string, error) {
return nil, err
}

cfi := i.createFleetInput(lt, instanceTypes)
cfi := i.createFleetInput(lt, instanceTypes, replacementLifecycle)

resp, err := i.region.services.ec2.CreateFleet(cfi)

Expand All @@ -78,7 +81,8 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)

odInstance, err := i.getSwapCandidate()
if err != nil {
log.Printf("Couldn't find suitable OnDemand swap candidate: %s", err.Error())
log.Printf("Couldn't find suitable OnDemand swap candidate, terminating Spot instance: %s", *i.InstanceId)
i.terminate()
return nil, err
}

Expand All @@ -95,7 +99,7 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
defer asg.setAutoScalingMaxSize(maxSize)
}

log.Printf("Attaching spot instance %s to the group %s",
log.Printf("Attaching %s instance %s to the group %s", *i.InstanceLifecycle,
*i.InstanceId, asg.name)
err = asg.attachSpotInstance(*i.InstanceId, true)

Expand All @@ -106,12 +110,12 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
return nil, fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId)
}

log.Printf("Terminating on-demand instance %s from the group %s",
log.Printf("Terminating instance %s from the group %s",
*odInstance.InstanceId, asg.name)
if err := asg.terminateInstanceInAutoScalingGroup(odInstance.Instance.InstanceId, true, true); err != nil {
log.Printf("On-demand instance %s couldn't be terminated, re-trying...",
log.Printf("instance %s couldn't be terminated, re-trying...",
*odInstance.InstanceId)
return nil, fmt.Errorf("couldn't terminate on-demand instance %s",
return nil, fmt.Errorf("couldn't terminate instance %s",
*odInstance.InstanceId)
}

Expand All @@ -121,23 +125,23 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
func (i *instance) getSwapCandidate() (*instance, error) {
odInstanceID := i.getReplacementTargetInstanceID()
if odInstanceID == nil {
log.Println("Couldn't find target on-demand instance of", *i.InstanceId)
log.Println("Couldn't find target instance of", *i.InstanceId)
return nil, fmt.Errorf("couldn't find target instance for %s", *i.InstanceId)
}

if err := i.region.scanInstance(odInstanceID); err != nil {
log.Printf("Couldn't describe the target on-demand instance %s", *odInstanceID)
log.Printf("Couldn't describe the target instance %s", *odInstanceID)
return nil, fmt.Errorf("target instance %s couldn't be described", *odInstanceID)
}

odInstance := i.region.instances.get(*odInstanceID)
if odInstance == nil {
log.Printf("Target on-demand instance %s couldn't be found", *odInstanceID)
log.Printf("Target instance %s couldn't be found", *odInstanceID)
return nil, fmt.Errorf("target instance %s is missing", *odInstanceID)
}

if !odInstance.shouldBeReplacedWithSpot() {
log.Printf("Target on-demand instance %s shouldn't be replaced", *odInstanceID)
log.Printf("Target instance %s shouldn't be replaced", *odInstanceID)
i.terminate()
return nil, fmt.Errorf("target instance %s should not be replaced with spot",
*odInstanceID)
Expand Down
33 changes: 25 additions & 8 deletions core/instance_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,13 @@ func (i *instance) processImageBlockDevices(rii *ec2.RequestLaunchTemplateData)
rii.BlockDeviceMappings = i.convertImageBlockDeviceMappings(resp.Images[0].BlockDeviceMappings)
}

func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, error) {
i.price = i.typeInfo.pricing.onDemand * i.asg.config.OnDemandPriceMultiplier
func (i *instance) createLaunchTemplateData(instanceLifecycle string) (*ec2.RequestLaunchTemplateData, error) {
odPrice := i.typeInfo.pricing.onDemand
mp := 1.0
if i.asg != nil && i.asg.config.OnDemandPriceMultiplier != 0 {
mp = i.asg.config.OnDemandPriceMultiplier
}
i.price = odPrice * mp

placement := ec2.LaunchTemplatePlacementRequest(*i.Placement)

Expand All @@ -430,11 +435,14 @@ func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, e

ltData.EbsOptimized = i.EbsOptimized

ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
MarketType: aws.String(Spot),
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
},
if instanceLifecycle == "spot" {

ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
MarketType: aws.String(Spot),
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
},
}
}

ltData.Placement = &placement
Expand Down Expand Up @@ -467,7 +475,7 @@ func (i *instance) createFleetLaunchTemplate(ltData *ec2.RequestLaunchTemplateDa
return &ltName, err
}

func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec2.CreateFleetInput {
func (i *instance) createFleetInput(ltName *string, instanceTypes []*string, lifeCycle string) *ec2.CreateFleetInput {

var overrides []*ec2.FleetLaunchTemplateOverridesRequest

Expand All @@ -494,13 +502,22 @@ func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String(i.asg.config.SpotAllocationStrategy),
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
Type: aws.String("instant"),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
SpotTargetCapacity: aws.Int64(1),
TotalTargetCapacity: aws.Int64(1),
DefaultTargetCapacityType: aws.String("spot"),
},
}
if lifeCycle != "spot" {
log.Printf("Overriding default capacity type to ondemand\n")
retval.TargetCapacitySpecification.DefaultTargetCapacityType = aws.String("on-demand")
retval.TargetCapacitySpecification.SpotTargetCapacity = aws.Int64(0)
retval.TargetCapacitySpecification.OnDemandTargetCapacity = aws.Int64(1)
}
return retval
}

Expand Down
10 changes: 8 additions & 2 deletions core/instance_conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func Test_instance_createLaunchTemplateData(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got, _ := tt.inst.createLaunchTemplateData()
got, _ := tt.inst.createLaunchTemplateData("spot")

// make sure the lists of tags are sorted, otherwise the comparison fails
sort.Slice(got.TagSpecifications[0].Tags, func(i, j int) bool {
Expand Down Expand Up @@ -1586,6 +1586,9 @@ func Test_instance_createFleetInput(t *testing.T) {
},
},
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String("capacity-optimized-prioritized"),
},
Expand Down Expand Up @@ -1634,6 +1637,9 @@ func Test_instance_createFleetInput(t *testing.T) {
},
},
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String("capacity-optimized"),
},
Expand All @@ -1650,7 +1656,7 @@ func Test_instance_createFleetInput(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes)
got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes, "spot")

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("instance.createFleetInput() = %v, want %v", got, tt.want)
Expand Down
5 changes: 2 additions & 3 deletions core/instance_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ func (i *instance) canTerminate() bool {
func (i *instance) shouldBeReplacedWithSpot() bool {
protT, _ := i.isProtectedFromTermination()
return i.belongsToEnabledASG() &&
i.asgNeedsReplacement() &&
!i.isSpot() &&
(i.isSpot() || i.asgNeedsReplacement()) &&
!i.isProtectedFromScaleIn() &&
!protT
}
Expand All @@ -121,7 +120,7 @@ func (i *instance) belongsToEnabledASG() bool {
asg.loadLaunchTemplate()
i.asg = &asg
i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier
log.Printf("%s instace %s belongs to enabled ASG %s", i.region.name,
log.Printf("%s instance %s belongs to enabled ASG %s", i.region.name,
*i.InstanceId, i.asg.name)
return true
}
Expand Down
93 changes: 89 additions & 4 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,15 @@ func (a *AutoSpotting) processEventInstance(eventType string, region string, ins
spotTermination := newSpotTermination(region)

if spotTermination.IsInAutoSpottingASG(instanceID, a.config.TagFilteringMode, a.config.FilterByTags) {
err := spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
newInstance, err := a.replaceTerminatingSpotInstance(*instanceID, region)
if err == nil {
log.Printf("Launched replacement instance for instance %s: %s\n", *instanceID, *newInstance)
return nil
}

log.Printf("Error launching replacement instance for instance %s: %s, continued to handle its Spot termination\n", *instanceID, err.Error())

err = spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
if err != nil {
log.Printf("Error executing spot termination/rebalance action: %s\n", err.Error())
return err
Expand Down Expand Up @@ -438,10 +446,13 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str
}

// Try OnDemand
if err := a.handleNewOnDemandInstanceLaunch(r, i); err != nil {
if err := a.handleNewOnDemandInstanceLaunch(r, i); !i.isSpot() && err != nil {
log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)
return err
}

log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)

// Try Spot
// in case we're not triggered by SQS event we do nothing, onDemand event already manage launched spot instance
if len(a.config.sqsReceiptHandle) > 0 {
Expand Down Expand Up @@ -511,9 +522,9 @@ func (a *AutoSpotting) handleNewOnDemandInstanceLaunch(r *region, i *instance) e
}

} else {
log.Printf("%s skipping instance %s: either doesn't belong to an "+
log.Printf("%s skipping %s instance %s: either doesn't belong to an "+
"enabled ASG or should not be replaced with spot, ",
i.region.name, *i.InstanceId)
i.region.name, *i.InstanceLifecycle, *i.InstanceId)
debug.Printf("%#v", i)
}
return nil
Expand Down Expand Up @@ -551,3 +562,77 @@ func (a *AutoSpotting) handleNewSpotInstanceLaunch(r *region, i *instance) error
}
return nil
}

// replaceTerminatingSpotInstance launches a replacement for the instance
// instanceID in region regionName and swaps it into the instance's
// AutoScaling group.
//
// A Spot launch is attempted first; if it fails, an on-demand launch is
// attempted as a fallback. On success the ID of the replacement instance is
// returned. A non-nil error is returned whenever the replacement could not be
// launched, located, swapped into the group, or did not reach the "InService"
// status, so that the caller can fall back to its regular termination
// handling instead of assuming the instance was replaced.
func (a *AutoSpotting) replaceTerminatingSpotInstance(instanceID, regionName string) (*string, error) {
	r := &region{name: regionName, conf: a.config, services: connections{}}

	if !r.enabled() {
		return nil, fmt.Errorf("region %s is not enabled", regionName)
	}

	r.services.connect(regionName, a.config.MainRegion)
	r.setupAsgFilters()
	r.scanForEnabledAutoScalingGroups()

	log.Println("Scanning full instance information in", r.name)
	r.determineInstanceTypeInformation(r.conf)

	if err := r.scanInstance(aws.String(instanceID)); err != nil {
		log.Printf("%s Couldn't scan instance %s: %s", regionName,
			instanceID, err.Error())
		return nil, err
	}

	i := r.instances.get(instanceID)
	if i == nil {
		log.Printf("%s Instance %s is missing, skipping...",
			regionName, instanceID)
		return nil, errors.New("instance missing")
	}
	log.Printf("%s Found instance %s in state %s",
		i.region.name, *i.InstanceId, *i.State.Name)

	if *i.State.Name != "running" {
		log.Printf("%s Instance %s is not in the running state",
			i.region.name, *i.InstanceId)
		return nil, errors.New("instance not in running state")
	}

	asgName := i.getReplacementTargetASGName()
	if asgName == nil {
		log.Printf("Missing the ASG name tag\n")
		return nil, errors.New("missing ASG name tag")
	}

	// NOTE(review): findEnabledASGByName is assumed to return nil when no
	// enabled group matches; without this guard the calls below would
	// dereference a nil group.
	i.asg = i.region.findEnabledASGByName(*asgName)
	if i.asg == nil {
		log.Printf("%s ASG %s of instance %s is not enabled or couldn't be found",
			regionName, *asgName, instanceID)
		return nil, fmt.Errorf("ASG %s is not enabled or could not be found", *asgName)
	}
	i.asg.scanInstances()
	i.asg.loadDefaultConfig()
	i.asg.loadConfigFromTags()
	i.asg.loadLaunchConfiguration()
	i.asg.loadLaunchTemplate()

	newInstanceID, err := i.launchSpotReplacement()
	if err != nil {
		log.Printf("Spot Instance launch failed while replacing %s, error: %s, falling back to on-demand\n", *i.InstanceId, err.Error())

		newInstanceID, err = i.launchReplacement("on-demand")
		if err != nil {
			log.Printf("Instance launch failed while replacing %s, error: %s\n", *i.InstanceId, err.Error())
			return nil, err
		}
	}
	if newInstanceID == nil {
		return nil, fmt.Errorf("launching a replacement for %s returned no instance ID", instanceID)
	}

	// Re-scan so the freshly launched instance shows up in the region's
	// instance collection before it is looked up.
	i.region.scanInstances()
	newInstance := i.region.instances.get(*newInstanceID)
	if newInstance == nil {
		log.Printf("%s Replacement instance %s is missing after scanning the region",
			regionName, *newInstanceID)
		return nil, fmt.Errorf("replacement instance %s is missing", *newInstanceID)
	}

	// A failed swap must be reported: otherwise the caller would treat the
	// instance as replaced and skip its own termination handling.
	if _, err = newInstance.swapWithGroupMember(i.asg); err != nil {
		log.Printf("%s Couldn't swap replacement instance %s into the group %s: %s",
			regionName, *newInstanceID, i.asg.name, err.Error())
		return nil, err
	}

	if err = i.asg.waitForInstanceStatus(newInstanceID, "InService", 5); err != nil {
		log.Printf("Instance %s is still not InService, trying to terminate it.",
			*newInstanceID)
		newInstance.terminate()
		// The replacement has just been terminated, so this is not a success.
		return nil, err
	}

	return newInstanceID, nil
}