Process kafka requests in a separate scheduling group #24973

Open · wants to merge 4 commits into dev
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
@@ -841,6 +841,12 @@ configuration::configuration()
       "Use a separate scheduler group for kafka produce requests processing.",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       true)
+  , use_kafka_handler_scheduler_group(
+      *this,
+      "use_kafka_handler_scheduler_group",
+      "Use separate scheduler group to handle parsing Kafka protocol requests",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      true)
   , metadata_status_wait_timeout_ms(
       *this,
       "metadata_status_wait_timeout_ms",
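
The tunable above only gates whether the dedicated group is consulted at runtime (see server.cc below); the group itself is created once at startup and threaded through server_app::init (see app.cc below). A minimal sketch of creating such a group; the name and share count are illustrative, not taken from this PR:

#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>

namespace ss = seastar;

// Create a dedicated scheduling group for Kafka request handling.
// Shares set the group's CPU weight relative to other groups; the
// value 100 here is arbitrary, for illustration only.
ss::future<ss::scheduling_group> make_kafka_handler_sg() {
    return ss::create_scheduling_group("kafka_handler", 100);
}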
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -233,6 +233,7 @@ struct configuration final : public config_store {
     property<size_t> fetch_max_bytes;
     property<bool> use_fetch_scheduler_group;
     property<bool> use_produce_scheduler_group;
+    property<bool> use_kafka_handler_scheduler_group;
     property<std::chrono::milliseconds> metadata_status_wait_timeout_ms;
     property<std::chrono::seconds> kafka_tcp_keepalive_idle_timeout_seconds;
     property<std::chrono::seconds> kafka_tcp_keepalive_probe_interval_seconds;
2 changes: 2 additions & 0 deletions src/v/kafka/server/app.cc
@@ -21,6 +21,7 @@ seastar::future<> server_app::init(
   seastar::smp_service_group smp,
   seastar::scheduling_group fetch_sched,
   seastar::scheduling_group prod_sched,
+  seastar::scheduling_group handler_sched,
   seastar::sharded<cluster::metadata_cache>& mdc,
   seastar::sharded<cluster::topics_frontend>& tf,
   seastar::sharded<cluster::config_frontend>& cf,
@@ -49,6 +50,7 @@
     smp,
     fetch_sched,
     prod_sched,
+    handler_sched,
     std::ref(mdc),
     std::ref(tf),
     std::ref(cf),
1 change: 1 addition & 0 deletions src/v/kafka/server/app.h
@@ -82,6 +82,7 @@ class server_app {
       seastar::smp_service_group,
       seastar::scheduling_group,
       seastar::scheduling_group,
+      seastar::scheduling_group,
       seastar::sharded<cluster::metadata_cache>&,
       seastar::sharded<cluster::topics_frontend>&,
       seastar::sharded<cluster::config_frontend>&,
6 changes: 6 additions & 0 deletions src/v/kafka/server/connection_context.cc
@@ -42,6 +42,7 @@
 #include <seastar/core/temporary_buffer.hh>
 #include <seastar/core/with_timeout.hh>
 #include <seastar/coroutine/as_future.hh>
+#include <seastar/coroutine/switch_to.hh>
 
 #include <chrono>
 #include <cstdint>
@@ -355,6 +356,7 @@ ss::future<> connection_context::revoke_credentials(std::string_view name) {
 }
 
 ss::future<> connection_context::process() {
+    co_await ss::coroutine::switch_to(_server.get_request_handler_sg());
     while (true) {
         if (is_finished_parsing()) {
             break;
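
ss::coroutine::switch_to re-schedules the remainder of the calling coroutine under the given scheduling group, so the whole request-parsing loop in process() is now accounted to the handler group rather than the connection's original group. A minimal usage sketch, assuming only the Seastar APIs visible in this diff:

#include <seastar/core/future.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/coroutine/switch_to.hh>

namespace ss = seastar;

// Everything after the co_await runs under `sg`. If the coroutine is
// already running in `sg`, the awaiter completes immediately and no
// task switch takes place.
ss::future<> handle_in_group(ss::scheduling_group sg) {
    co_await ss::coroutine::switch_to(sg);
    // ... parse and dispatch requests, now billed to `sg` ...
}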
@@ -682,6 +684,10 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
           ? std::make_optional<ss::sstring>(*hdr.client_id)
           : std::nullopt,
     };
+
+    co_await ss::coroutine::switch_to(
+      _server.get_request_scheduling_group(hdr.key));
+
     auto sres_in = co_await throttle_request(std::move(r_data), size);
     if (abort_requested()) {
         // protect against shutdown behavior

Review thread on the switch_to call above:

Member: @travisdowns related to the discussion from yesterday, does this even allocate the coroutine frame if await_ready returns true (because we are already in the right group - ignoring for a second whether we actually are here or not)? I thought not.

Member: Which coroutine do you refer to? In any case, there shouldn't be a coroutine frame allocated per switch_to, whether the switch happens or not. What await_ready returning true does is avoid an unnecessary task switch, which is probably more expensive even than a coro frame in "micro" costs, and certainly in macro costs, as it breaks out of the currently executing request, etc.
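
To make the await_ready point concrete, here is a toy, standard-C++20 awaiter (explicitly not Seastar's implementation) showing how returning true from await_ready skips suspension, and with it the task switch, when the caller is already in the target group:

#include <coroutine>
#include <iostream>

// Toy stand-in for a scheduling group: an integer tag tracked in a
// thread-local, so the awaiter can ask "which group am I running in?".
thread_local int current_group = 0;

struct switch_to {
    int target;
    // Returning true means the coroutine never suspends: the compiler
    // skips await_suspend entirely and no task switch happens.
    bool await_ready() const noexcept { return current_group == target; }
    void await_suspend(std::coroutine_handle<> h) const {
        // A real executor would enqueue `h` on the target group's queue;
        // the toy version just retags and resumes inline.
        current_group = target;
        h.resume();
    }
    void await_resume() const noexcept {}
};

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task run() {
    co_await switch_to{1}; // group differs: await_suspend runs
    co_await switch_to{1}; // already in group 1: no suspension at all
}

int main() {
    run();
    std::cout << "finished in group " << current_group << "\n";
}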
45 changes: 20 additions & 25 deletions src/v/kafka/server/handlers/fetch.cc
@@ -1543,32 +1543,27 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) {
     return ss::do_with(
       std::make_unique<op_context>(std::move(rctx), ssg),
       [](std::unique_ptr<op_context>& octx_ptr) {
-          auto sg
-            = octx_ptr->rctx.connection()->server().fetch_scheduling_group();
-          return ss::with_scheduling_group(sg, [&octx_ptr] {
-              auto& octx = *octx_ptr;
-
-              log_request(octx.rctx.header(), octx.request);
-              // top-level error is used for session-level errors
-              if (octx.session_ctx.has_error()) {
-                  octx.response.data.error_code = octx.session_ctx.error();
-                  return std::move(octx).send_response();
+          auto& octx = *octx_ptr;
+          log_request(octx.rctx.header(), octx.request);
+          // top-level error is used for session-level errors
+          if (octx.session_ctx.has_error()) {
+              octx.response.data.error_code = octx.session_ctx.error();
+              return std::move(octx).send_response();
+          }
+          if (unlikely(octx.rctx.recovery_mode_enabled())) {
+              octx.response.data.error_code = error_code::policy_violation;
+              return std::move(octx).send_response();
+          }
+          octx.response.data.error_code = error_code::none;
+          return do_fetch(octx).then([&octx] {
+              // NOTE: Audit call doesn't happen until _after_ the fetch
+              // is done. This was done for the sake of simplicity and
+              // because fetch doesn't alter the state of the broker
+              if (!octx.rctx.audit()) {
+                  return std::move(octx).send_error_response(
+                    error_code::broker_not_available);
               }
-              if (unlikely(octx.rctx.recovery_mode_enabled())) {
-                  octx.response.data.error_code = error_code::policy_violation;
-                  return std::move(octx).send_response();
-              }
-              octx.response.data.error_code = error_code::none;
-              return do_fetch(octx).then([&octx] {
-                  // NOTE: Audit call doesn't happen until _after_ the fetch
-                  // is done. This was done for the sake of simplicity and
-                  // because fetch doesn't alter the state of the broker
-                  if (!octx.rctx.audit()) {
-                      return std::move(octx).send_error_response(
-                        error_code::broker_not_available);
-                  }
-                  return std::move(octx).send_response();
-              });
+              return std::move(octx).send_response();
           });
       });
 }
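
The effect of the hunk above: the fetch handler no longer wraps its body in ss::with_scheduling_group, because dispatch_method_once() (connection_context.cc above) has already switched the coroutine into the right group before the handler runs. A simplified contrast, with handler_body as a hypothetical stand-in for the handler logic and the umbrella include chosen as an assumption:

#include <seastar/core/future-util.hh> // umbrella header; brings in with_scheduling_group
#include <seastar/core/scheduling.hh>

namespace ss = seastar;

ss::future<> handler_body() { return ss::make_ready_future<>(); }

// Before this PR: each handler re-entered its scheduling group explicitly.
ss::future<> old_style(ss::scheduling_group sg) {
    return ss::with_scheduling_group(sg, [] { return handler_body(); });
}

// After: the dispatch path has already co_awaited
// ss::coroutine::switch_to(get_request_scheduling_group(key)), so the
// handler body simply runs in the right group.
ss::future<> new_style() {
    return handler_body();
}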
215 changes: 98 additions & 117 deletions src/v/kafka/server/handlers/produce.cc
@@ -756,126 +756,107 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) {
         error_code::invalid_topic_exception,
         resp);
     request.data.topics.erase_to_end(migrated_it);
-    auto sg = ctx.server().local().produce_scheduling_group();
     ss::promise<> dispatched_promise;
     auto dispatched_f = dispatched_promise.get_future();
-    auto produced_f = ss::with_scheduling_group(
-      sg,
-      [ctx = std::move(ctx),
-       resp = std::move(resp),
-       ssg,
-       request = std::move(request),
-       dispatched_promise = std::move(dispatched_promise)]() mutable {
-          return ss::do_with(
-            produce_ctx(
-              std::move(ctx), std::move(request), std::move(resp), ssg),
-            [dispatched_promise = std::move(dispatched_promise)](
-              produce_ctx& octx) mutable {
-                // dispatch produce requests for each topic
-                auto stages = produce_topics(octx);
-                std::vector<ss::future<>> dispatched;
-                std::vector<ss::future<produce_response::topic>> produced;
-                dispatched.reserve(stages.size());
-                produced.reserve(stages.size());
-
-                for (auto& s : stages) {
-                    dispatched.push_back(std::move(s.dispatched));
-                    produced.push_back(std::move(s.produced));
-                }
+    auto produced_f = ss::do_with(
+      produce_ctx(std::move(ctx), std::move(request), std::move(resp), ssg),
+      [dispatched_promise = std::move(dispatched_promise)](
+        produce_ctx& octx) mutable {
+          // dispatch produce requests for each topic
+          auto stages = produce_topics(octx);
+          std::vector<ss::future<>> dispatched;
+          std::vector<ss::future<produce_response::topic>> produced;
+          dispatched.reserve(stages.size());
+          produced.reserve(stages.size());
+
+          for (auto& s : stages) {
+              dispatched.push_back(std::move(s.dispatched));
+              produced.push_back(std::move(s.produced));
+          }
+          return seastar::when_all_succeed(dispatched.begin(), dispatched.end())
+            .then_wrapped([&octx,
+                           dispatched_promise = std::move(dispatched_promise),
+                           produced = std::move(produced)](
+                            ss::future<> f) mutable {
+                try {
+                    f.get();
+                    dispatched_promise.set_value();
+                    // collect topic responses
+                    return when_all_succeed(produced.begin(), produced.end())
+                      .then(
+                        [&octx](std::vector<produce_response::topic> topics) {
+                            std::move(
+                              topics.begin(),
+                              topics.end(),
+                              std::back_inserter(octx.response.data.responses));
+                        })
+                      .then([&octx] {
+                          // send response immediately
+                          if (octx.request.data.acks != 0) {
+                              return octx.rctx.respond(
+                                std::move(octx.response));
+                          }
+
+                          // acks = 0 is handled separately. first, check
+                          // for errors
+                          bool has_error = false;
+                          for (const auto& topic :
+                               octx.response.data.responses) {
+                              for (const auto& p : topic.partitions) {
+                                  if (p.error_code != error_code::none) {
+                                      has_error = true;
+                                      break;
+                                  }
+                              }
+                          }
+
+                          // in the absence of errors, acks = 0 results in
+                          // the response being dropped, as the client
+                          // does not expect a response. here we mark the
+                          // response as noop, but let it flow back so
+                          // that it can be accounted for in quota and
+                          // stats tracking. it is dropped later during
+                          // processing.
+                          if (!has_error) {
+                              return octx.rctx.respond(std::move(octx.response))
+                                .then([](response_ptr resp) {
+                                    resp->mark_noop();
+                                    return resp;
+                                });
+                          }
+
+                          // errors in a response from an acks=0 produce
+                          // request result in the connection being
+                          // dropped to signal an issue to the client
+                          return ss::make_exception_future<response_ptr>(
+                            std::runtime_error(fmt::format(
+                              "Closing connection due to error in produce "
+                              "response: {}",
+                              octx.response)));
+                      });
+                } catch (...) {
+                    /*
+                     * if the first stage failed then we cannot resolve
+                     * the current future (do_with holding octx)
+                     * immediately, otherwise octx will be destroyed and
+                     * all of the second stage futures (which have a
+                     * reference to octx) will be backgrounded. logging
+                     * about the second stage return value is handled in
+                     * connection_context handler.
+                     */
+                    dispatched_promise.set_exception(std::current_exception());
+                    return when_all_succeed(produced.begin(), produced.end())
+                      .discard_result()
+                      .then([] {
+                          return ss::make_exception_future<response_ptr>(
+                            std::runtime_error("First stage produce failed but "
+                                               "second stage succeeded."));
+                      })
+                      .handle_exception([](std::exception_ptr e) {
+                          return ss::make_exception_future<response_ptr>(e);
+                      });
+                }
-                return seastar::when_all_succeed(
-                  dispatched.begin(), dispatched.end())
-                  .then_wrapped([&octx,
-                                 dispatched_promise = std::move(
-                                   dispatched_promise),
-                                 produced = std::move(produced)](
-                                  ss::future<> f) mutable {
-                      try {
-                          f.get();
-                          dispatched_promise.set_value();
-                          // collect topic responses
-                          return when_all_succeed(
-                                   produced.begin(), produced.end())
-                            .then(
-                              [&octx](
-                                std::vector<produce_response::topic> topics) {
-                                  std::move(
-                                    topics.begin(),
-                                    topics.end(),
-                                    std::back_inserter(
-                                      octx.response.data.responses));
-                              })
-                            .then([&octx] {
-                                // send response immediately
-                                if (octx.request.data.acks != 0) {
-                                    return octx.rctx.respond(
-                                      std::move(octx.response));
-                                }
-
-                                // acks = 0 is handled separately. first, check
-                                // for errors
-                                bool has_error = false;
-                                for (const auto& topic :
-                                     octx.response.data.responses) {
-                                    for (const auto& p : topic.partitions) {
-                                        if (p.error_code != error_code::none) {
-                                            has_error = true;
-                                            break;
-                                        }
-                                    }
-                                }
-
-                                // in the absence of errors, acks = 0 results in
-                                // the response being dropped, as the client
-                                // does not expect a response. here we mark the
-                                // response as noop, but let it flow back so
-                                // that it can be accounted for in quota and
-                                // stats tracking. it is dropped later during
-                                // processing.
-                                if (!has_error) {
-                                    return octx.rctx
-                                      .respond(std::move(octx.response))
-                                      .then([](response_ptr resp) {
-                                          resp->mark_noop();
-                                          return resp;
-                                      });
-                                }
-
-                                // errors in a response from an acks=0 produce
-                                // request result in the connection being
-                                // dropped to signal an issue to the client
-                                return ss::make_exception_future<
-                                  response_ptr>(std::runtime_error(fmt::format(
-                                  "Closing connection due to error in produce "
-                                  "response: {}",
-                                  octx.response)));
-                            });
-                      } catch (...) {
-                          /*
-                           * if the first stage failed then we cannot resolve
-                           * the current future (do_with holding octx)
-                           * immediately, otherwise octx will be destroyed and
-                           * all of the second stage futures (which have a
-                           * reference to octx) will be backgrounded. logging
-                           * about the second stage return value is handled in
-                           * connection_context handler.
-                           */
-                          dispatched_promise.set_exception(
-                            std::current_exception());
-                          return when_all_succeed(
-                                   produced.begin(), produced.end())
-                            .discard_result()
-                            .then([] {
-                                return ss::make_exception_future<response_ptr>(
-                                  std::runtime_error(
-                                    "First stage produce failed but "
-                                    "second stage succeeded."));
-                            })
-                            .handle_exception([](std::exception_ptr e) {
-                                return ss::make_exception_future<response_ptr>(
-                                  e);
-                            });
-                      }
-                  });
             });
       });
 
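Most of this hunk is re-indentation: the ss::with_scheduling_group wrapper (and the sg local) disappears, while the two-stage dispatch/produce logic is unchanged. The shape of that two-stage pattern, distilled into a self-contained sketch (types simplified; int stands in for produce_response::topic):

#include <seastar/core/future.hh>
#include <seastar/core/when_all.hh>

#include <vector>

namespace ss = seastar;

// Stage one ("dispatched") resolves once a partition has accepted the
// batch and ordering is fixed; stage two ("produced") resolves with the
// result. The real code must keep octx alive until both stages finish,
// which is why its catch branch still waits on `produced` after failing
// the first stage.
struct stage {
    ss::future<> dispatched;
    ss::future<int> produced;
};

ss::future<std::vector<int>> run_stages(std::vector<stage> stages) {
    std::vector<ss::future<>> dispatched;
    std::vector<ss::future<int>> produced;
    dispatched.reserve(stages.size());
    produced.reserve(stages.size());
    for (auto& s : stages) {
        dispatched.push_back(std::move(s.dispatched));
        produced.push_back(std::move(s.produced));
    }
    // Wait for every dispatch to succeed, then gather all results.
    return ss::when_all_succeed(dispatched.begin(), dispatched.end())
      .then([produced = std::move(produced)]() mutable {
          return ss::when_all_succeed(produced.begin(), produced.end());
      });
}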
18 changes: 18 additions & 0 deletions src/v/kafka/server/server.cc
@@ -120,6 +120,7 @@ server::server(
   ss::smp_service_group smp,
   ss::scheduling_group fetch_sg,
   ss::scheduling_group produce_sg,
+  ss::scheduling_group handler_sg,
   ss::sharded<cluster::metadata_cache>& meta,
   ss::sharded<cluster::topics_frontend>& tf,
   ss::sharded<cluster::config_frontend>& cf,
@@ -147,6 +148,7 @@
   , _smp_group(smp)
   , _fetch_scheduling_group(fetch_sg)
   , _produce_scheduling_group(produce_sg)
+  , _request_handler_scheduling_group(handler_sg)
   , _topics_frontend(tf)
   , _config_frontend(cf)
   , _feature_table(ft)
@@ -236,10 +238,26 @@ ss::scheduling_group server::produce_scheduling_group() const {
              : ss::default_scheduling_group();
 }
 
+ss::scheduling_group server::get_request_handler_sg() const {
+    return config::shard_local_cfg().use_kafka_handler_scheduler_group()
+             ? _request_handler_scheduling_group
+             : ss::default_scheduling_group();
+}
+
 coordinator_ntp_mapper& server::coordinator_mapper() {
     return _group_router.local().coordinator_mapper().local();
 }
 
+ss::scheduling_group server::get_request_scheduling_group(api_key key) const {
+    if (key == produce_request::api_type::key) {
+        return produce_scheduling_group();
+    }
+    if (key == fetch_request::api_type::key) {
+        return fetch_scheduling_group();
+    }
+    return get_request_handler_sg();
+}
+
 config::broker_authn_method get_authn_method(const net::connection& conn) {
     // If authn_method is set on the endpoint
     // Use it

Review thread on get_request_scheduling_group:

Member: For this kind of stuff we usually want to add a handler method, I think, rather than having the kafka server know anything about the different keys. E.g., a get_request_sg() method on the generic handler which returns the group (or none/default) to use?

Is the problem that the handlers shouldn't access config?

Member (author): Good point, I will change that.
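
For reference, a sketch of the direction the reviewer suggests (hypothetical names, not code from this PR or the codebase): each handler reports its own scheduling group, so the server stops switching on api_key:

namespace ss = seastar;

// Hypothetical handler-side hook: the dispatch path would ask the
// handler itself, h->scheduling_group(server), instead of calling
// server::get_request_scheduling_group(api_key).
struct handler_sg_hook {
    virtual ~handler_sg_hook() = default;
    // Default for most handlers: the generic kafka handler group.
    virtual ss::scheduling_group scheduling_group(const server& s) const {
        return s.get_request_handler_sg();
    }
};

// Produce (and analogously fetch) overrides with its dedicated group.
struct produce_sg_hook : handler_sg_hook {
    ss::scheduling_group scheduling_group(const server& s) const override {
        return s.produce_scheduling_group();
    }
};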