Skip to content

Commit

Permalink
TUN-8914: Add a new configuration to locally override the max-active-…
Browse files Browse the repository at this point in the history
…flows

## Summary

This commit introduces a new command line flag, `--max-active-flows`, which allows overriding the remote configuration for the maximum number of active flows.

The flag can be used with the `run` command, like `cloudflared tunnel --no-autoupdate run --token <TUNNEL_TOKEN> --max-active-flows 50000`, or set via an environment variable `TUNNEL_MAX_ACTIVE_FLOWS`.

Note that locally-set values always take precedence over remote settings, even if the tunnel is remotely managed.

Closes TUN-8914
  • Loading branch information
jcsf committed Feb 3, 2025
1 parent 2feccd7 commit b187879
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 39 deletions.
3 changes: 2 additions & 1 deletion cmd/cloudflared/tunnel/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ var (
routeFailMsg = fmt.Sprintf("failed to provision routing, please create it manually via Cloudflare dashboard or UI; "+
"most likely you already have a conflicting record there. You can also rerun this command with --%s to overwrite "+
"any existing DNS records for this hostname.", overwriteDNSFlag)
errDeprecatedClassicTunnel = fmt.Errorf("Classic tunnels have been deprecated, please use Named Tunnels. (https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup/tunnel-guide/)")
errDeprecatedClassicTunnel = errors.New("Classic tunnels have been deprecated, please use Named Tunnels. (https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup/tunnel-guide/)")
// TODO: TUN-8756 the list below denotes the flags that do not possess any kind of sensitive information
// however this approach is not maintainble in the long-term.
nonSecretFlagsList = []string{
Expand Down Expand Up @@ -214,6 +214,7 @@ var (
"protocol",
"overwrite-dns",
"help",
"max-active-flows",
}
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cloudflared/tunnel/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
var (
secretFlags = [2]*altsrc.StringFlag{credentialsContentsFlag, tunnelTokenFlag}

configFlags = []string{"autoupdate-freq", "no-autoupdate", "retries", "protocol", "loglevel", "transport-loglevel", "origincert", "metrics", "metrics-update-freq", "edge-ip-version", "edge-bind-address"}
configFlags = []string{"autoupdate-freq", "no-autoupdate", "retries", "protocol", "loglevel", "transport-loglevel", "origincert", "metrics", "metrics-update-freq", "edge-ip-version", "edge-bind-address", "max-active-flows"}
)

func logClientOptions(c *cli.Context, log *zerolog.Logger) {
Expand Down
6 changes: 6 additions & 0 deletions cmd/cloudflared/tunnel/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ var (
Usage: "Network diagnostics won't be performed",
Value: false,
}
maxActiveFlowsFlag = &cli.Uint64Flag{
Name: "max-active-flows",
Usage: "Overrides the remote configuration for max active private network flows (TCP/UDP) that this cloudflared instance supports",
EnvVars: []string{"TUNNEL_MAX_ACTIVE_FLOWS"},
}
)

func buildCreateCommand() *cli.Command {
Expand Down Expand Up @@ -705,6 +710,7 @@ func buildRunCommand() *cli.Command {
tunnelTokenFlag,
icmpv4SrcFlag,
icmpv6SrcFlag,
maxActiveFlowsFlag,
}
flags = append(flags, configureProxyFlags(false)...)
return &cli.Command{
Expand Down
37 changes: 33 additions & 4 deletions orchestration/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"sync/atomic"

"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/rs/zerolog"

cfdflow "github.com/cloudflare/cloudflared/flow"

"github.com/cloudflare/cloudflared/config"
"github.com/cloudflare/cloudflared/connection"
cfdflow "github.com/cloudflare/cloudflared/flow"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/proxy"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
Expand Down Expand Up @@ -117,6 +117,30 @@ func (o *Orchestrator) UpdateConfig(version int32, config []byte) *pogs.UpdateCo
}
}

// overrideRemoteWarpRoutingWithLocalValues overrides the ingress.WarpRoutingConfig that comes from the remote with
// the local values if there is any.
func (o *Orchestrator) overrideRemoteWarpRoutingWithLocalValues(remoteWarpRouting *ingress.WarpRoutingConfig) error {
return o.overrideMaxActiveFlows(o.config.ConfigurationFlags["max-active-flows"], remoteWarpRouting)
}

// overrideMaxActiveFlows checks the local configuration flags, and if a value is found for the flags.MaxActiveFlows
// overrides the value that comes on the remote ingress.WarpRoutingConfig with the local value.
func (o *Orchestrator) overrideMaxActiveFlows(maxActiveFlowsLocalConfig string, remoteWarpRouting *ingress.WarpRoutingConfig) error {
// If max active flows isn't defined locally just use the remote value
if maxActiveFlowsLocalConfig == "" {
return nil
}

maxActiveFlowsLocalOverride, err := strconv.ParseUint(maxActiveFlowsLocalConfig, 10, 64)
if err != nil {
return pkgerrors.Wrapf(err, "failed to parse %s", "max-active-flows")
}

// Override the value that comes from the remote with the local value
remoteWarpRouting.MaxActiveFlows = maxActiveFlowsLocalOverride
return nil
}

// The caller is responsible to make sure there is no concurrent access
func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting ingress.WarpRoutingConfig) error {
select {
Expand All @@ -125,6 +149,11 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
default:
}

// Overrides the local values, onto the remote values of the warp routing configuration
if err := o.overrideRemoteWarpRoutingWithLocalValues(&warpRouting); err != nil {
return pkgerrors.Wrap(err, "failed to merge local overrides into warp routing configuration")
}

// Assign the internal ingress rules to the parsed ingress
ingressRules.InternalRules = o.internalRules

Expand All @@ -139,7 +168,7 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
// The downside is minimized because none of the ingress.OriginService implementation have that requirement
proxyShutdownC := make(chan struct{})
if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil {
return errors.Wrap(err, "failed to start origin")
return pkgerrors.Wrap(err, "failed to start origin")
}

// Update the flow limit since the configuration might have changed
Expand Down
Loading

0 comments on commit b187879

Please sign in to comment.