Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support pilot-agent connection #3293

Open
wants to merge 3 commits into
base: 2.2.x-ospp2023
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.cloud.governance.istio;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.alibaba.cloud.governance.istio.constant.IstioConstants;
import com.alibaba.cloud.governance.istio.protocol.AbstractXdsProtocol;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author musi
* @author <a href="[email protected]"></a>
* @since 2.2.10-RC1 AggregateDiscoveryService is used to send xds request and handle xds
* response.
*/
public class AggregateDiscoveryService {

private static final Logger log = LoggerFactory
.getLogger(AggregateDiscoveryService.class);

private final Map<String, AbstractXdsProtocol> protocolMap = new HashMap<>();

private final Map<String, Set<String>> requestResource = new ConcurrentHashMap<>();

private StreamObserver<DiscoveryRequest> observer;

private final XdsConfigProperties xdsConfigProperties;

private final XdsChannel xdsChannel;

private static final ScheduledExecutorService retry = Executors
.newSingleThreadScheduledExecutor();

public AggregateDiscoveryService(XdsChannel xdsChannel,
XdsConfigProperties xdsConfigProperties) {
this.xdsChannel = xdsChannel;
this.xdsConfigProperties = xdsConfigProperties;
this.observer = xdsChannel.createDiscoveryRequest(new XdsObserver());
}

public void addProtocol(AbstractXdsProtocol abstractXdsProtocol) {
protocolMap.put(abstractXdsProtocol.getTypeUrl(), abstractXdsProtocol);
}

public void sendXdsRequest(String typeUrl, Set<String> resourceNames) {
requestResource.put(typeUrl, resourceNames);
DiscoveryRequest request = DiscoveryRequest.newBuilder()
.setNode(xdsChannel.getNode()).setTypeUrl(typeUrl)
.addAllResourceNames(resourceNames).build();
observer.onNext(request);
}

private void sendAckRequest(DiscoveryResponse response) {
Set<String> ackResource = requestResource.getOrDefault(response.getTypeUrl(),
new HashSet<>());
DiscoveryRequest request = DiscoveryRequest.newBuilder()
.setVersionInfo(response.getVersionInfo()).setNode(xdsChannel.getNode())
.addAllResourceNames(ackResource).setTypeUrl(response.getTypeUrl())
.setResponseNonce(response.getNonce()).build();
observer.onNext(request);
}

private class XdsObserver implements StreamObserver<DiscoveryResponse> {

@Override
public void onNext(DiscoveryResponse discoveryResponse) {
String typeUrl = discoveryResponse.getTypeUrl();
if (xdsConfigProperties.isLogXds()) {
log.info("Receive notification from xds server, type: {}, size: {}",
typeUrl, discoveryResponse.getResourcesCount());
}
AbstractXdsProtocol protocol = protocolMap.get(typeUrl);
if (protocol == null) {
throw new UnsupportedOperationException("No protocol of type " + typeUrl);
}
List<?> responses = protocol.decodeXdsResponse(discoveryResponse);
sendAckRequest(discoveryResponse);
protocol.onResponseDecoded(responses);
}

@Override
public void onError(Throwable throwable) {
if (xdsConfigProperties.isLogXds()) {
log.error("Connect to xds server failed, reconnect after 3 seconds",
throwable);
}

requestResource.clear();
// refresh token again
if (!xdsConfigProperties.getUseAgent() && xdsConfigProperties
.getPort() == IstioConstants.ISTIOD_SECURE_PORT) {
xdsChannel.refreshIstiodToken();
}
retry.schedule(() -> {
observer = xdsChannel.createDiscoveryRequest(this);
sendXdsRequest(IstioConstants.CDS_URL, new HashSet<>());
log.info("Reconnecting to istio control plane!");
}, 3, TimeUnit.SECONDS);
}

@Override
public void onCompleted() {
log.info("Xds connect completed");
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,18 @@ public final class NodeBuilder {
private static Node NODE;

private NodeBuilder() {

}

public static Node getNode() {
public static Node getNode(XdsConfigProperties xdsConfigProperties) {
try {
if (NODE != null) {
return NODE;
}
String podName = System.getenv(IstioConstants.POD_NAME);
String podName = xdsConfigProperties.getPodName();
if (podName == null) {
podName = IstioConstants.DEFAULT_POD_NAME;
}
String podNamespace = System.getenv(IstioConstants.NAMESPACE_NAME);
String podNamespace = xdsConfigProperties.getNamespaceName();
if (podNamespace == null) {
podNamespace = IstioConstants.DEFAULT_NAMESPACE;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
* @author musi
Expand Down Expand Up @@ -75,11 +76,6 @@ public XdsChannel xdsChannel() {
return new XdsChannel(xdsConfigProperties);
}

@Bean
public XdsScheduledThreadPool xdsScheduledThreadPool() {
return new XdsScheduledThreadPool(xdsConfigProperties);
}

@Bean
public XdsResolveFilter<List<Listener>> authXdsResolveFilter() {
return new AuthXdsResolveFilter();
Expand All @@ -91,37 +87,48 @@ public XdsResolveFilter<List<RouteConfiguration>> routingXdsResolveFilter() {
}

@Bean
public PilotExchanger pilotExchanger(LdsProtocol ldsProtocol, CdsProtocol cdsProtocol,
EdsProtocol edsProtocol, RdsProtocol rdsProtocol) {
return new PilotExchanger(ldsProtocol, cdsProtocol, edsProtocol, rdsProtocol);
@Lazy
public AggregateDiscoveryService aggregateDiscoveryService(XdsChannel xdsChannel) {
return new AggregateDiscoveryService(xdsChannel, xdsConfigProperties);
}

@Bean
public LdsProtocol ldsProtocol(XdsChannel xdsChannel,
XdsScheduledThreadPool xdsScheduledThreadPool,
List<XdsResolveFilter<List<Listener>>> filters) {
return new LdsProtocol(xdsChannel, xdsScheduledThreadPool, xdsConfigProperties,
filters);
public LdsProtocol ldsProtocol(List<XdsResolveFilter<List<Listener>>> filters,
RdsProtocol rdsProtocol,
AggregateDiscoveryService aggregateDiscoveryService) {
LdsProtocol ldsProtocol = new LdsProtocol(xdsConfigProperties, filters,
rdsProtocol, aggregateDiscoveryService);
aggregateDiscoveryService.addProtocol(ldsProtocol);
return ldsProtocol;
}

@Bean
public CdsProtocol cdsProtocol(XdsChannel xdsChannel,
XdsScheduledThreadPool xdsScheduledThreadPool) {
return new CdsProtocol(xdsChannel, xdsScheduledThreadPool, xdsConfigProperties);
public CdsProtocol cdsProtocol(EdsProtocol edsProtocol, LdsProtocol ldsProtocol,
AggregateDiscoveryService aggregateDiscoveryService) {
CdsProtocol cdsProtocol = new CdsProtocol(xdsConfigProperties, edsProtocol,
ldsProtocol, aggregateDiscoveryService);
aggregateDiscoveryService.addProtocol(cdsProtocol);
cdsProtocol.observeResource();
return cdsProtocol;
}

@Bean
public EdsProtocol edsProtocol(XdsChannel xdsChannel,
XdsScheduledThreadPool xdsScheduledThreadPool) {
return new EdsProtocol(xdsChannel, xdsScheduledThreadPool, xdsConfigProperties);
public EdsProtocol edsProtocol(LdsProtocol ldsProtocol,
AggregateDiscoveryService aggregateDiscoveryService) {
EdsProtocol edsProtocol = new EdsProtocol(xdsConfigProperties, ldsProtocol,
aggregateDiscoveryService);
aggregateDiscoveryService.addProtocol(edsProtocol);
return edsProtocol;
}

@Bean
public RdsProtocol rdsProtocol(XdsChannel xdsChannel,
XdsScheduledThreadPool xdsScheduledThreadPool,
List<XdsResolveFilter<List<RouteConfiguration>>> filters) {
return new RdsProtocol(xdsChannel, xdsScheduledThreadPool, xdsConfigProperties,
filters);
public RdsProtocol rdsProtocol(
List<XdsResolveFilter<List<RouteConfiguration>>> filters,
AggregateDiscoveryService aggregateDiscoveryService) {
RdsProtocol rdsProtocol = new RdsProtocol(xdsConfigProperties, filters,
aggregateDiscoveryService);
aggregateDiscoveryService.addProtocol(rdsProtocol);
return rdsProtocol;
}

/**
Expand Down
Loading