Skip to content

Commit

Permalink
Changing autoscaling lifecycle agent to publish to sns instead of sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Feb 1, 2017
1 parent 4234f5d commit df845af
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
class InstanceTerminationConfigurationProperties {
String accountName
String queueARN
String sourceARN
String topicARN

int maxMessagesPerCycle = 1000
int visibilityTimeout = 30
Expand All @@ -34,13 +34,13 @@ class InstanceTerminationConfigurationProperties {

InstanceTerminationConfigurationProperties(String accountName,
String queueARN,
String sourceARN,
String topicARN,
int maxMessagesPerCycle,
int visibilityTimeout,
int waitTimeSeconds) {
this.accountName = accountName
this.queueARN = queueARN
this.sourceARN = sourceARN
this.topicARN = topicARN
this.maxMessagesPerCycle = maxMessagesPerCycle
this.visibilityTimeout = visibilityTimeout
this.waitTimeSeconds = waitTimeSeconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.amazonaws.auth.policy.Principal;
import com.amazonaws.auth.policy.Resource;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.SNSActions;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.SetTopicAttributesRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
Expand Down Expand Up @@ -68,6 +71,7 @@ public class InstanceTerminationLifecycleAgent implements RunnableAgent, CustomS
Provider<AwsEurekaSupport> discoverySupport;

private final ARN queueARN;
private final ARN topicARN;

private String queueId = null;

Expand All @@ -84,6 +88,7 @@ public InstanceTerminationLifecycleAgent(ObjectMapper objectMapper,

Set<? extends AccountCredentials> accountCredentials = accountCredentialsProvider.getAll();
this.queueARN = new ARN(accountCredentials, properties.getQueueARN());
this.topicARN = new ARN(accountCredentials, properties.getTopicARN());
}

@Override
Expand All @@ -109,14 +114,16 @@ public long getTimeoutMillis() {
@Override
public void run() {
AmazonSQS amazonSQS = amazonClientProvider.getAmazonSQS(queueARN.account, queueARN.region);
AmazonSNS amazonSNS = amazonClientProvider.getAmazonSNS(topicARN.account, topicARN.region);

Set<String> allAccountIds = accountCredentialsProvider.getAll()
List<String> allAccountIds = accountCredentialsProvider.getAll()
.stream()
.map(AccountCredentials::getAccountId)
.filter(a -> a != null)
.collect(Collectors.toSet());
.collect(Collectors.toList());

this.queueId = ensureQueueExists(amazonSQS, queueARN, properties.getSourceARN(), allAccountIds);
this.queueId = ensureQueueExists(amazonSQS, queueARN, topicARN);
ensureTopicExists(amazonSNS, topicARN, allAccountIds, queueARN);

AtomicInteger messagesProcessed = new AtomicInteger(0);
while (messagesProcessed.get() < properties.getMaxMessagesPerCycle()) {
Expand Down Expand Up @@ -191,24 +198,49 @@ private NetflixAmazonCredentials getAccountCredentialsById(String accountId) {
});
}

private static String ensureQueueExists(AmazonSQS amazonSQS, ARN queueARN, String sourceARN, Set<String> allAccountIds) {
private static String ensureTopicExists(AmazonSNS amazonSNS,
ARN topicARN,
List<String> allAccountIds,
ARN queueARN) {
topicARN.arn = amazonSNS.createTopic(topicARN.name).getTopicArn();

amazonSNS.setTopicAttributes(
new SetTopicAttributesRequest()
.withTopicArn(topicARN.arn)
.withAttributeName("Policy")
.withAttributeValue(buildSNSPolicy(topicARN, allAccountIds).toJson())
);

amazonSNS.subscribe(topicARN.arn, "sqs", queueARN.arn);

return topicARN.arn;
}

private static Policy buildSNSPolicy(ARN topicARN, List<String> allAccountIds) {
Statement statement = new Statement(Statement.Effect.Allow).withActions(SNSActions.Publish);
statement.setPrincipals(allAccountIds.stream().map(Principal::new).collect(Collectors.toList()));
statement.setResources(Collections.singletonList(new Resource(topicARN.arn)));

return new Policy("allow-remote-account-send", Collections.singletonList(statement));
}

private static String ensureQueueExists(AmazonSQS amazonSQS, ARN queueARN, ARN topicARN) {
String queueUrl = amazonSQS.createQueue(queueARN.name).getQueueUrl();
amazonSQS.setQueueAttributes(
queueUrl, Collections.singletonMap("Policy", buildSQSPolicy(queueARN, sourceARN, allAccountIds).toJson())
queueUrl, Collections.singletonMap("Policy", buildSQSPolicy(queueARN, topicARN).toJson())
);

return queueUrl;
}

private static Policy buildSQSPolicy(ARN queue, String sourceARN, Set<String> allAccountIds) {
private static Policy buildSQSPolicy(ARN queue, ARN topic) {
Statement statement = new Statement(Statement.Effect.Allow).withActions(SQSActions.SendMessage);
statement.setPrincipals(allAccountIds.stream().map(Principal::new).collect(Collectors.toList()));
statement.setPrincipals(Principal.All);
statement.setResources(Collections.singletonList(new Resource(queue.arn)));

statement.setConditions(Collections.singletonList(
new Condition().withType("ArnLike").withConditionKey("aws:SourceArn").withValues(sourceARN)
new Condition().withType("ArnEquals").withConditionKey("aws:SourceArn").withValues(topic.arn)
));

return new Policy("allow-remote-account-send", Collections.singletonList(statement));
return new Policy("allow-sns-topic-send", Collections.singletonList(statement));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Collection<Agent> agents() {
.getQueueARN()
.replaceAll(REGION_TEMPLATE_PATTERN, region.getName())
.replaceAll(ACCOUNT_ID_TEMPLATE_PATTERN, credentials.getAccountId()),
properties.getSourceARN()
properties.getTopicARN()
.replaceAll(REGION_TEMPLATE_PATTERN, region.getName())
.replaceAll(ACCOUNT_ID_TEMPLATE_PATTERN, credentials.getAccountId()),
properties.getMaxMessagesPerCycle(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class InstanceTerminationLifecycleAgentProviderSpec extends Specification {

InstanceTerminationConfigurationProperties properties = new InstanceTerminationConfigurationProperties(
'mgmt',
'arn:aws:sqs:{{region}}:{{accountId}}:{{environment}}-queueName',
'arn:aws:iam::*:sourceArn',
'arn:aws:sqs:{{region}}:{{accountId}}:queueName',
'arn:aws:sqs:{{region}}:{{accountId}}:topicName',
-1,
-1,
-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.spinnaker.clouddriver.aws.lifecycle

import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model.CreateTopicResult
import com.amazonaws.services.sns.model.SetTopicAttributesRequest
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.model.CreateQueueResult
import com.fasterxml.jackson.databind.ObjectMapper
Expand Down Expand Up @@ -43,13 +46,15 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {
}

AmazonSQS amazonSQS = Mock()
AmazonSNS amazonSNS = Mock()
AccountCredentialsProvider accountCredentialsProvider = Mock() {
getAll() >>[mgmtCredentials, testCredentials]
}
Provider<AwsEurekaSupport> awsEurekaSupportProvider = Mock()
AwsEurekaSupport awsEurekaSupport = Mock()

def queueARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sqs:us-west-2:100:queueName")
def topicARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sns:us-west-2:100:topicName")

@Subject
def subject = new InstanceTerminationLifecycleAgent(
Expand All @@ -59,25 +64,45 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {
new InstanceTerminationConfigurationProperties(
'mgmt',
queueARN.arn,
'aws:arn:iam::*:role/sourceArn',
topicARN.arn,
-1,
-1,
-1
),
awsEurekaSupportProvider
)

def "should create topic if it does not exist"() {
when:
def topicId = LaunchFailureNotificationAgent.ensureTopicExists(amazonSNS, topicARN, ['100', '200'], queueARN)

then:
topicId == topicARN.arn

1 * amazonSNS.createTopic(topicARN.name) >> { new CreateTopicResult().withTopicArn(topicARN.arn) }

// should attach a policy granting SendMessage rights to the source topic
1 * amazonSNS.setTopicAttributes(new SetTopicAttributesRequest()
.withTopicArn(topicARN.arn)
.withAttributeName("Policy")
.withAttributeValue(LaunchFailureNotificationAgent.buildSNSPolicy(topicARN, ['100', '200']).toJson()))

// should subscribe the queue to this topic
1 * amazonSNS.subscribe(topicARN.arn, "sqs", queueARN.arn)
0 * _
}

def 'should create queue if it does not exist'() {
when:
def queueId = InstanceTerminationLifecycleAgent.ensureQueueExists(amazonSQS, queueARN, 'sourceArn', ['100', '200'] as Set)
def queueId = InstanceTerminationLifecycleAgent.ensureQueueExists(amazonSQS, queueARN, topicARN)

then:
queueId == "my-queue-url"

1 * amazonSQS.createQueue(queueARN.name) >> { new CreateQueueResult().withQueueUrl("my-queue-url") }

1 * amazonSQS.setQueueAttributes("my-queue-url", [
"Policy": InstanceTerminationLifecycleAgent.buildSQSPolicy(queueARN, 'sourceArn', ['100', '200'] as Set).toJson()
"Policy": InstanceTerminationLifecycleAgent.buildSQSPolicy(queueARN, topicARN).toJson()
])
0 * _
}
Expand Down

0 comments on commit df845af

Please sign in to comment.