Skip to content

Commit

Permalink
[XdsClient] move ADS encoding and decoding directly into XdsClient (g…
Browse files Browse the repository at this point in the history
…rpc#38356)

There are no functional changes here; this is solely a code cleanup.

Closes grpc#38356

COPYBARA_INTEGRATE_REVIEW=grpc#38356 from markdroth:xds_api_integration 0d7733b
PiperOrigin-RevId: 710128607
  • Loading branch information
markdroth authored and copybara-github committed Dec 27, 2024
1 parent b691026 commit bbefef3
Show file tree
Hide file tree
Showing 4 changed files with 614 additions and 749 deletions.
223 changes: 0 additions & 223 deletions src/core/xds/xds_client/xds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,16 @@

#include "src/core/xds/xds_client/xds_api.h"

#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <stdint.h>
#include <stdlib.h>

#include <set>
#include <string>
#include <vector>

#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/strip.h"
#include "envoy/config/core/v3/base.upb.h"
#include "envoy/config/endpoint/v3/load_report.upb.h"
#include "envoy/service/discovery/v3/discovery.upb.h"
#include "envoy/service/discovery/v3/discovery.upbdefs.h"
#include "envoy/service/load_stats/v3/lrs.upb.h"
#include "envoy/service/load_stats/v3/lrs.upbdefs.h"
#include "envoy/service/status/v3/csds.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/timestamp.upb.h"
#include "google/rpc/status.upb.h"
#include "src/core/util/json/json.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/xds_client/xds_client.h"
#include "upb/base/string_view.h"
#include "upb/mem/arena.hpp"
#include "upb/reflection/def.h"
#include "upb/text/encode.h"

// IWYU pragma: no_include "upb/msg_internal.h"

namespace grpc_core {

XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer,
const XdsBootstrap::Node* node, upb::DefPool* def_pool,
std::string user_agent_name, std::string user_agent_version)
: client_(client),
tracer_(tracer),
node_(node),
def_pool_(def_pool),
user_agent_name_(std::move(user_agent_name)),
user_agent_version_(std::move(user_agent_version)) {}

namespace {

struct XdsApiContext {
XdsClient* client;
TraceFlag* tracer;
upb_DefPool* def_pool;
upb_Arena* arena;
};

void PopulateMetadataValue(google_protobuf_Value* value_pb, const Json& value,
upb_Arena* arena);

Expand Down Expand Up @@ -125,37 +79,8 @@ void PopulateMetadataValue(google_protobuf_Value* value_pb, const Json& value,
}
}

void MaybeLogDiscoveryRequest(
const XdsApiContext& context,
const envoy_service_discovery_v3_DiscoveryRequest* request) {
if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
const upb_MessageDef* msg_type =
envoy_service_discovery_v3_DiscoveryRequest_getmsgdef(context.def_pool);
char buf[10240];
upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
nullptr, 0, buf, sizeof(buf));
VLOG(2) << "[xds_client " << context.client
<< "] constructed ADS request: " << buf;
}
}

std::string SerializeDiscoveryRequest(
const XdsApiContext& context,
envoy_service_discovery_v3_DiscoveryRequest* request) {
size_t output_length;
char* output = envoy_service_discovery_v3_DiscoveryRequest_serialize(
request, context.arena, &output_length);
return std::string(output, output_length);
}

} // namespace

void XdsApi::PopulateNode(envoy_config_core_v3_Node* node_msg,
upb_Arena* arena) {
PopulateXdsNode(node_, user_agent_name_, user_agent_version_, node_msg,
arena);
}

void PopulateXdsNode(const XdsBootstrap::Node* node,
absl::string_view user_agent_name,
absl::string_view user_agent_version,
Expand Down Expand Up @@ -202,152 +127,4 @@ void PopulateXdsNode(const XdsBootstrap::Node* node,
arena);
}

std::string XdsApi::CreateAdsRequest(
absl::string_view type_url, absl::string_view version,
absl::string_view nonce, const std::vector<std::string>& resource_names,
absl::Status status, bool populate_node) {
upb::Arena arena;
const XdsApiContext context = {client_, tracer_, def_pool_->ptr(),
arena.ptr()};
// Create a request.
envoy_service_discovery_v3_DiscoveryRequest* request =
envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr());
// Set type_url.
std::string type_url_str = absl::StrCat("type.googleapis.com/", type_url);
envoy_service_discovery_v3_DiscoveryRequest_set_type_url(
request, StdStringToUpbString(type_url_str));
// Set version_info.
if (!version.empty()) {
envoy_service_discovery_v3_DiscoveryRequest_set_version_info(
request, StdStringToUpbString(version));
}
// Set nonce.
if (!nonce.empty()) {
envoy_service_discovery_v3_DiscoveryRequest_set_response_nonce(
request, StdStringToUpbString(nonce));
}
// Set error_detail if it's a NACK.
std::string error_string_storage;
if (!status.ok()) {
google_rpc_Status* error_detail =
envoy_service_discovery_v3_DiscoveryRequest_mutable_error_detail(
request, arena.ptr());
// Hard-code INVALID_ARGUMENT as the status code.
// TODO(roth): If at some point we decide we care about this value,
// we could attach a status code to the individual errors where we
// generate them in the parsing code, and then use that here.
google_rpc_Status_set_code(error_detail, GRPC_STATUS_INVALID_ARGUMENT);
// Error description comes from the status that was passed in.
error_string_storage = std::string(status.message());
upb_StringView error_description =
StdStringToUpbString(error_string_storage);
google_rpc_Status_set_message(error_detail, error_description);
}
// Populate node.
if (populate_node) {
envoy_config_core_v3_Node* node_msg =
envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request,
arena.ptr());
PopulateNode(node_msg, arena.ptr());
envoy_config_core_v3_Node_add_client_features(
node_msg, upb_StringView_FromString("xds.config.resource-in-sotw"),
context.arena);
}
// Add resource_names.
for (const std::string& resource_name : resource_names) {
envoy_service_discovery_v3_DiscoveryRequest_add_resource_names(
request, StdStringToUpbString(resource_name), arena.ptr());
}
MaybeLogDiscoveryRequest(context, request);
return SerializeDiscoveryRequest(context, request);
}

namespace {

void MaybeLogDiscoveryResponse(
const XdsApiContext& context,
const envoy_service_discovery_v3_DiscoveryResponse* response) {
if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
const upb_MessageDef* msg_type =
envoy_service_discovery_v3_DiscoveryResponse_getmsgdef(
context.def_pool);
char buf[10240];
upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
nullptr, 0, buf, sizeof(buf));
VLOG(2) << "[xds_client " << context.client
<< "] received response: " << buf;
}
}

} // namespace

absl::Status XdsApi::ParseAdsResponse(absl::string_view encoded_response,
AdsResponseParserInterface* parser) {
upb::Arena arena;
const XdsApiContext context = {client_, tracer_, def_pool_->ptr(),
arena.ptr()};
// Decode the response.
const envoy_service_discovery_v3_DiscoveryResponse* response =
envoy_service_discovery_v3_DiscoveryResponse_parse(
encoded_response.data(), encoded_response.size(), arena.ptr());
// If decoding fails, report a fatal error and return.
if (response == nullptr) {
return absl::InvalidArgumentError("Can't decode DiscoveryResponse.");
}
MaybeLogDiscoveryResponse(context, response);
// Report the type_url, version, nonce, and number of resources to the parser.
AdsResponseParserInterface::AdsResponseFields fields;
fields.type_url = std::string(absl::StripPrefix(
UpbStringToAbsl(
envoy_service_discovery_v3_DiscoveryResponse_type_url(response)),
"type.googleapis.com/"));
fields.version = UpbStringToStdString(
envoy_service_discovery_v3_DiscoveryResponse_version_info(response));
fields.nonce = UpbStringToStdString(
envoy_service_discovery_v3_DiscoveryResponse_nonce(response));
size_t num_resources;
const google_protobuf_Any* const* resources =
envoy_service_discovery_v3_DiscoveryResponse_resources(response,
&num_resources);
fields.num_resources = num_resources;
absl::Status status = parser->ProcessAdsResponseFields(std::move(fields));
if (!status.ok()) return status;
// Process each resource.
for (size_t i = 0; i < num_resources; ++i) {
absl::string_view type_url = absl::StripPrefix(
UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])),
"type.googleapis.com/");
absl::string_view serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
// Unwrap Resource messages, if so wrapped.
absl::string_view resource_name;
if (type_url == "envoy.service.discovery.v3.Resource") {
const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse(
serialized_resource.data(), serialized_resource.size(), arena.ptr());
if (resource_wrapper == nullptr) {
parser->ResourceWrapperParsingFailed(
i, "Can't decode Resource proto wrapper");
continue;
}
const auto* resource =
envoy_service_discovery_v3_Resource_resource(resource_wrapper);
if (resource == nullptr) {
parser->ResourceWrapperParsingFailed(
i, "No resource present in Resource proto wrapper");
continue;
}
type_url = absl::StripPrefix(
UpbStringToAbsl(google_protobuf_Any_type_url(resource)),
"type.googleapis.com/");
serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resource));
resource_name = UpbStringToAbsl(
envoy_service_discovery_v3_Resource_name(resource_wrapper));
}
parser->ParseResource(context.arena, i, type_url, resource_name,
serialized_resource);
}
return absl::OkStatus();
}

} // namespace grpc_core
Loading

0 comments on commit bbefef3

Please sign in to comment.