Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiplexing #101

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/meta_protocol_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ envoy_cc_library(
],
deps = [
":app_exception_lib",
":config_interface_lib",
":decoder_events_lib",
":decoder_lib",
":heartbeat_response_lib",
":stats_lib",
":upstream_handler_impl_lib",
"//api/meta_protocol_proxy/v1alpha:pkg_cc_proto",
"//src/meta_protocol_proxy/route:rds_interface",
"//src/meta_protocol_proxy/route:route_interface",
Expand All @@ -70,6 +72,7 @@ envoy_cc_library(
"@envoy//envoy/network:filter_interface",
"@envoy//envoy/stats:stats_interface",
"@envoy//envoy/stats:timespan_interface",
"@envoy//envoy/upstream:cluster_manager_interface",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/buffer:watermark_buffer_lib",
"@envoy//source/common/common:assert_lib",
Expand Down Expand Up @@ -150,4 +153,65 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "config_interface_lib",
repository = "@envoy",
hdrs = ["config_interface.h"],
deps = [
":stats_lib",
"//api/meta_protocol_proxy/v1alpha:pkg_cc_proto",
"//src/meta_protocol_proxy/codec:codec_interface",
"//src/meta_protocol_proxy/filters:filter_interface",
"//src/meta_protocol_proxy/request_id:request_id_interface",
"//src/meta_protocol_proxy/route:rds_interface",
"//src/meta_protocol_proxy/route:route_interface",
"//src/meta_protocol_proxy/tracing:tracer_interface",
],
)

envoy_cc_library(
name = "upstream_handler_lib",
repository = "@envoy",
srcs = ["upstream_handler.cc"],
hdrs = ["upstream_handler.h"],
deps = [
"@envoy//source/common/buffer:buffer_lib",
"@envoy//envoy/upstream:thread_local_cluster_interface",
"@envoy//envoy/upstream:load_balancer_interface",
"@envoy//envoy/tcp:conn_pool_interface",
"//src/meta_protocol_proxy/filters:filter_define_lib",
"//src/meta_protocol_proxy/codec:codec_interface",
],
)

envoy_cc_library(
name = "upstream_handler_impl_lib",
repository = "@envoy",
srcs = ["upstream_handler_impl.cc"],
hdrs = ["upstream_handler_impl.h"],
deps = [
":upstream_response_lib",
":upstream_handler_lib",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//envoy/upstream:thread_local_cluster_interface",
"@envoy//envoy/upstream:load_balancer_interface",
"@envoy//envoy/tcp:conn_pool_interface",
],
)


envoy_cc_library(
name = "upstream_response_lib",
repository = "@envoy",
srcs = ["upstream_response.cc"],
hdrs = ["upstream_response.h"],
deps = [
":decoder_lib",
":decoder_events_lib",
":config_interface_lib",
"//src/meta_protocol_proxy/codec:codec_interface",
"//src/meta_protocol_proxy/filters:filter_interface",
],
)


21 changes: 21 additions & 0 deletions src/meta_protocol_proxy/active_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,18 @@ const std::vector<AccessLog::InstanceSharedPtr>& ActiveMessageDecoderFilter::acc
return activeMessage_.accessLogs();
}

UpstreamHandlerSharedPtr
ActiveMessageDecoderFilter::getUpstreamHandler(const std::string& cluster_name,
Upstream::LoadBalancerContext& context) {
return activeMessage_.getUpstreamHandler(cluster_name, context);
}

bool ActiveMessageDecoderFilter::multiplexing() { return activeMessage_.multiplexing(); }

void ActiveMessageDecoderFilter::onUpstreamResponse() {
return activeMessage_.onUpstreamResponse();
}

// class ActiveMessageEncoderFilter
ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(ActiveMessage& parent,
EncoderFilterSharedPtr filter,
Expand Down Expand Up @@ -415,6 +427,15 @@ const std::vector<AccessLog::InstanceSharedPtr>& ActiveMessage::accessLogs() {
return connection_manager_.accessLogs();
}

UpstreamHandlerSharedPtr ActiveMessage::getUpstreamHandler(const std::string& cluster_name,
Upstream::LoadBalancerContext& context) {
return connection_manager_.getUpstreamHandler(cluster_name, context);
}

bool ActiveMessage::multiplexing() { return connection_manager_.config().multiplexing(); }

void ActiveMessage::onUpstreamResponse() { connection_manager_.deferredDeleteMessage(*this); }

void ActiveMessage::maybeDeferredDeleteMessage() {
pending_stream_decoded_ = false;
connection_manager_.stats().request_.inc();
Expand Down
8 changes: 8 additions & 0 deletions src/meta_protocol_proxy/active_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class ActiveMessageDecoderFilter : public DecoderFilterCallbacks,
Tracing::TracingConfig* tracingConfig() override;
RequestIDExtensionSharedPtr requestIDExtension() override;
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() override;
UpstreamHandlerSharedPtr getUpstreamHandler(const std::string& cluster_name,
Upstream::LoadBalancerContext& context) override;
bool multiplexing() override;
void onUpstreamResponse() override;

DecoderFilterSharedPtr handler() { return handle_; }

Expand Down Expand Up @@ -190,6 +194,10 @@ class ActiveMessage : public LinkedObject<ActiveMessage>,
Tracing::TracingConfig* tracingConfig() override;
RequestIDExtensionSharedPtr requestIDExtension() override;
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() override;
UpstreamHandlerSharedPtr getUpstreamHandler(const std::string& cluster_name,
Upstream::LoadBalancerContext& context) override;
bool multiplexing() override;
void onUpstreamResponse() override;

void createFilterChain();
FilterStatus applyDecoderFilters(ActiveMessageDecoderFilter* filter,
Expand Down
24 changes: 16 additions & 8 deletions src/meta_protocol_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ Network::FilterFactoryCb MetaProtocolProxyFilterConfigFactory::createFilterFacto
// Keep in mind the lambda capture list **doesn't** determine the destruction order, but it's fine
// as these captured objects are also global singletons.
return [singletons, filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(
std::make_shared<ConnectionManager>(*filter_config, context.api().randomGenerator(),
context.mainThreadDispatcher().timeSource()));
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.api().randomGenerator(),
context.mainThreadDispatcher().timeSource(), context.clusterManager()));
};
}

Expand All @@ -72,11 +72,11 @@ ConfigImpl::ConfigImpl(const MetaProtocolProxyConfig& config,
Server::Configuration::FactoryContext& context,
Route::RouteConfigProviderManager& route_config_provider_manager,
MetaProtocolProxy::Tracing::MetaProtocolTracerManager& tracer_manager)
: context_(context),
: context_(context), application_protocol_(config.application_protocol()),
codecConfig_(config.codec()), application_protocol_config_(config.protocol()),
stats_prefix_(
fmt::format("meta_protocol.{}.{}.", config.application_protocol(), config.stat_prefix())),
fmt::format("meta_protocol.{}.{}.", applicationProtocol(), config.stat_prefix())),
stats_(MetaProtocolProxyStats::generateStats(stats_prefix_, context_.scope())),
application_protocol_(config.application_protocol()), codecConfig_(config.codec()),
route_config_provider_manager_(route_config_provider_manager) {
ENVOY_LOG(trace, "********** MetaProtocolProxy ConfigImpl constructor ***********");
// check idle_timer config
Expand Down Expand Up @@ -205,9 +205,9 @@ Route::RouteConstSharedPtr ConfigImpl::route(const Metadata& metadata,

CodecPtr ConfigImpl::createCodec() {
auto& factory = Envoy::Config::Utility::getAndCheckFactoryByName<NamedCodecConfigFactory>(
codecConfig_.name());
getCodecConfig().name());
ProtobufTypes::MessagePtr message = factory.createEmptyConfigProto();
Envoy::Config::Utility::translateOpaqueConfig(codecConfig_.config(),
Envoy::Config::Utility::translateOpaqueConfig(getCodecConfig().config(),
context_.messageValidationVisitor(), *message);
return factory.createCodec(*message);
}
Expand All @@ -231,6 +231,14 @@ void ConfigImpl::registerFilter(const MetaProtocolFilterConfig& proto_config) {
filter_factories_.push_back(callback);
}

const ConfigImpl::CodecConfig& ConfigImpl::getCodecConfig() {
if (application_protocol_config_.has_codec() &&
(!application_protocol_config_.codec().name().empty())) {
return application_protocol_config_.codec();
}
return codecConfig_;
}

} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
14 changes: 11 additions & 3 deletions src/meta_protocol_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ConfigImpl : public Config,
using MetaProtocolProxyConfig = aeraki::meta_protocol_proxy::v1alpha::MetaProtocolProxy;
using MetaProtocolFilterConfig = aeraki::meta_protocol_proxy::v1alpha::MetaProtocolFilter;
using CodecConfig = aeraki::meta_protocol_proxy::v1alpha::Codec;
using ApplicationProtocolConfig = aeraki::meta_protocol_proxy::v1alpha::ApplicationProtocol;

ConfigImpl(const MetaProtocolProxyConfig& config, Server::Configuration::FactoryContext& context,
Route::RouteConfigProviderManager& route_config_provider_manager,
Expand All @@ -90,14 +91,18 @@ class ConfigImpl : public Config,
FilterChainFactory& filterFactory() override { return *this; }
Route::Config& routerConfig() override { return *this; }
CodecPtr createCodec() override;
std::string applicationProtocol() override { return application_protocol_; };
std::string applicationProtocol() override {
return application_protocol_config_.name().empty() ? application_protocol_
: application_protocol_config_.name();
};
absl::optional<std::chrono::milliseconds> idleTimeout() override { return idle_timeout_; };
Tracing::MetaProtocolTracerSharedPtr tracer() override { return tracer_; };
Tracing::TracingConfig* tracingConfig() override { return tracing_config_.get(); };
RequestIDExtensionSharedPtr requestIDExtension() override { return request_id_extension_; };
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const override {
return access_logs_;
}
bool multiplexing() override { return application_protocol_config_.multiplexing(); }

private:
void registerFilter(const MetaProtocolFilterConfig& proto_config);
Expand All @@ -108,12 +113,15 @@ class ConfigImpl : public Config,
const envoy::config::trace::v3::Tracing_Http*
getPerFilterTracerConfig(const MetaProtocolProxyConfig& config);

const CodecConfig& getCodecConfig();

Server::Configuration::FactoryContext& context_;
const std::string stats_prefix_;
MetaProtocolProxyStats stats_;
// Router::RouteMatcherPtr route_matcher_;
std::string application_protocol_;
CodecConfig codecConfig_;
ApplicationProtocolConfig application_protocol_config_;
const std::string stats_prefix_;
MetaProtocolProxyStats stats_;
std::list<FilterFactoryCb> filter_factories_;
Route::RouteConfigProviderSharedPtr route_config_provider_;
Route::RouteConfigProviderManager& route_config_provider_manager_;
Expand Down
46 changes: 46 additions & 0 deletions src/meta_protocol_proxy/config_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

#include "src/meta_protocol_proxy/filters/filter.h"
#include "src/meta_protocol_proxy/stats.h"
#include "src/meta_protocol_proxy/codec/codec.h"
#include "src/meta_protocol_proxy/route/route.h"
#include "src/meta_protocol_proxy/route/rds.h"
#include "src/meta_protocol_proxy/tracing/tracer.h"
#include "src/meta_protocol_proxy/request_id/request_id_extension.h"


namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace MetaProtocolProxy {

/**
* Config is a configuration interface for ConnectionManager.
*/
class Config {
public:
virtual ~Config() = default;

virtual FilterChainFactory& filterFactory() PURE;
virtual MetaProtocolProxyStats& stats() PURE;
virtual CodecPtr createCodec() PURE;
virtual Route::Config& routerConfig() PURE;
virtual std::string applicationProtocol() PURE;
virtual absl::optional<std::chrono::milliseconds> idleTimeout() PURE;
/**
* @return Route::RouteConfigProvider* the configuration provider used to acquire a route
* config for each request flow. Pointer ownership is _not_ transferred to the caller of
* this function.
*/
virtual Route::RouteConfigProvider* routeConfigProvider() PURE;
virtual Tracing::MetaProtocolTracerSharedPtr tracer() PURE;
virtual Tracing::TracingConfig* tracingConfig() PURE;
virtual RequestIDExtensionSharedPtr requestIDExtension() PURE;
virtual const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const PURE;
virtual bool multiplexing() PURE;
};

} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
49 changes: 47 additions & 2 deletions src/meta_protocol_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "src/meta_protocol_proxy/app_exception.h"
#include "src/meta_protocol_proxy/heartbeat_response.h"
#include "src/meta_protocol_proxy/codec_impl.h"
#include "src/meta_protocol_proxy/upstream_handler_impl.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -17,10 +18,12 @@ namespace MetaProtocolProxy {
constexpr uint32_t BufferLimit = UINT32_MAX;

ConnectionManager::ConnectionManager(Config& config, Random::RandomGenerator& random_generator,
TimeSource& time_system)
TimeSource& time_system,
Upstream::ClusterManager& cluster_manager)
: config_(config), time_system_(time_system), stats_(config_.stats()),
random_generator_(random_generator), codec_(config.createCodec()),
decoder_(std::make_unique<RequestDecoder>(*codec_, *this)) {}
decoder_(std::make_unique<RequestDecoder>(*codec_, *this)),
cluster_manager_(cluster_manager) {}

Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug, "meta protocol: read {} bytes", data.length());
Expand Down Expand Up @@ -57,12 +60,15 @@ void ConnectionManager::initializeReadFilterCallbacks(Network::ReadFilterCallbac
}

void ConnectionManager::onEvent(Network::ConnectionEvent event) {
ENVOY_LOG(debug, "ConnectionManager onEvent {}", static_cast<int>(event));
if (event == Network::ConnectionEvent::LocalClose) {
disableIdleTimer();
resetAllMessages(true);
resetUpstreamHandlerManager();
} else if (event == Network::ConnectionEvent::RemoteClose) {
disableIdleTimer();
resetAllMessages(false);
resetUpstreamHandlerManager();
}
}

Expand Down Expand Up @@ -231,6 +237,45 @@ void ConnectionManager::disableIdleTimer() {
}
}

void ConnectionManager::resetUpstreamHandlerManager() { upstream_handler_manager_.clear(); }

UpstreamHandlerSharedPtr
ConnectionManager::getUpstreamHandler(const std::string& cluster_name,
Upstream::LoadBalancerContext& context) {
auto* cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
if (cluster == nullptr) {
ENVOY_LOG(error, "unknown cluster '{}'", cluster_name);
return nullptr;
}

auto tcp_pool_data = UpstreamHandler::createTcpPoolData(*cluster, context);
if (!tcp_pool_data) {
ENVOY_LOG(error, "no conn pool for {}", cluster_name);
return nullptr;
}
std::string key = cluster_name + "_" + tcp_pool_data.value().host()->address()->asString();

// get exist upstream handler
auto upstream_handler = upstream_handler_manager_.get(key);
if (upstream_handler) {
ENVOY_LOG(debug, "use exist upstream handler, key:{}", key);
return upstream_handler;
}

// create upstream handler
ENVOY_LOG(debug, "create upstream handler: key={}, hostname={}, address={}", key,
tcp_pool_data.value().host()->hostname(),
tcp_pool_data.value().host()->address()->asString());

auto new_upstream_handler = std::make_shared<UpstreamHandlerImpl>(
key, read_callbacks_->connection(), config_,
[this](const std::string& key) { this->upstream_handler_manager_.del(key); });
upstream_handler_manager_.add(key, new_upstream_handler);

new_upstream_handler->start(*tcp_pool_data);
return new_upstream_handler;
}

} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
Loading