diff --git a/cmd/webhook/admission/admission_test.go b/cmd/webhook/admission/admission_test.go index d3869de555..9563c2a7d3 100644 --- a/cmd/webhook/admission/admission_test.go +++ b/cmd/webhook/admission/admission_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/zalando/skipper/dataclients/kubernetes/definitions" ) const ( @@ -122,6 +123,21 @@ func TestRouteGroupAdmitter(t *testing.T) { inputFile: "rg-with-invalid-eskip-filters-and-predicates.json", message: "parse failed after token status, last route id: , position 11: syntax error\\nparse failed after token Method, last route id: Method, position 6: syntax error", }, + { + name: "invalid routgroup multiple filters per json/yaml array item", + inputFile: "rg-with-multiple-filters.json", + message: `single filter expected at \"status(201) -> inlineContent(\\\"hi\\\")\"\nsingle filter expected at \" \"`, + }, + { + name: "invalid routgroup multiple predicates per json/yaml array item", + inputFile: "rg-with-multiple-predicates.json", + message: `single predicate expected at \"Method(\\\"GET\\\") && Path(\\\"/\\\")\"\nsingle predicate expected at \" \"`, + }, + { + name: "routegroup with invalid backends", + inputFile: "rg-with-invalid-backend-path.json", + message: `backend address \"http://example.com/foo\" contains path, query or missing scheme\nbackend address \"http://example.com/foo/bar\" contains path, query or missing scheme\nbackend address \"http://example.com/foo/\" contains path, query or missing scheme\nbackend address \"/foo\" contains path, query or missing scheme\nbackend address \"http://example.com/\" contains path, query or missing scheme\nbackend address \"example.com/\" contains path, query or missing scheme\nbackend address \"example.com/foo\" contains path, query or missing scheme\nbackend address \"http://example.com?foo=bar\" contains path, query or missing scheme\nbackend address \"example.com\" contains path, query 
or missing scheme`, + }, } { t.Run(tc.name, func(t *testing.T) { expectedResponse := responseAllowedFmt @@ -136,7 +152,7 @@ func TestRouteGroupAdmitter(t *testing.T) { req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - rgAdm := &RouteGroupAdmitter{} + rgAdm := &RouteGroupAdmitter{RouteGroupValidator: &definitions.RouteGroupValidator{}} h := Handler(rgAdm) h(w, req) @@ -200,7 +216,7 @@ func TestIngressAdmitter(t *testing.T) { req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - ingressAdm := &IngressAdmitter{} + ingressAdm := &IngressAdmitter{IngressValidator: &definitions.IngressV1Validator{}} h := Handler(ingressAdm) h(w, req) @@ -217,8 +233,8 @@ func TestIngressAdmitter(t *testing.T) { } func TestMalformedRequests(t *testing.T) { - routeGroupHandler := Handler(&RouteGroupAdmitter{}) - ingressHandler := Handler(&IngressAdmitter{}) + routeGroupHandler := Handler(&RouteGroupAdmitter{RouteGroupValidator: &definitions.RouteGroupValidator{}}) + ingressHandler := Handler(&IngressAdmitter{IngressValidator: &definitions.IngressV1Validator{}}) for _, tc := range []struct { name string diff --git a/cmd/webhook/admission/testdata/rg/rg-with-invalid-backend-path.json b/cmd/webhook/admission/testdata/rg/rg-with-invalid-backend-path.json new file mode 100644 index 0000000000..d604b97244 --- /dev/null +++ b/cmd/webhook/admission/testdata/rg/rg-with-invalid-backend-path.json @@ -0,0 +1,103 @@ +{ + "request": { + "uid": "req-uid", + "name": "req1", + "operation": "create", + "kind": { + "group": "zalando", + "version": "v1", + "kind": "RouteGroup" + }, + "namespace": "n1", + "object": { + "metadata": { + "name": "rg1", + "namespace": "n1" + }, + "spec": { + "backends": [ + { + "name": "valid-backend1", + "type": "shunt" + }, + { + "name": "valid-backend3", + "type": "network", + "address": "http://example.com" + }, + { + "name": "invalid-backend2", + "type": "network", + "address": "http://example.com/foo" + }, + { + 
"name": "invalid-backend3", + "type": "network", + "address": "http://example.com/foo/bar" + }, + { + "name": "invalid-backend4", + "type": "network", + "address": "http://example.com/foo/" + }, + { + "name": "invalid-backend5", + "type": "network", + "address": "/foo" + }, + { + "name": "valid-backend2", + "type": "network", + "address": "http://user:pass@example.com" + }, + { + "name": "invalid-backend6", + "type": "network", + "address": "http://example.com/" + }, + { + "name": "invalid-backend7", + "type": "network", + "address": "example.com/" + }, + { + "name" : "invalid-backend8", + "type" : "network", + "address" : "example.com/foo" + }, + { + "name" : "invalid-backend9", + "type" : "network", + "address" : "http://example.com?foo=bar" + }, + { + "name": "invalid-backend10", + "type": "network", + "address": "example.com" + } + ], + "defaultBackends": [ + { + "backendName": "valid-backend1" + } + ], + "routes": [ + { + "backends": [ + { + "backendName": "valid-backend1" + } + ], + "filters": [ + "status(201)" + ], + "path": "/", + "predicates": [ + "Method(\"GET\")" + ] + } + ] + } + } + } +} diff --git a/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json b/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json new file mode 100644 index 0000000000..02abcfd9a6 --- /dev/null +++ b/cmd/webhook/admission/testdata/rg/rg-with-multiple-filters.json @@ -0,0 +1,49 @@ +{ + "request": { + "uid": "req-uid", + "name": "req1", + "operation": "create", + "kind": { + "group": "zalando", + "version": "v1", + "kind": "RouteGroup" + }, + "namespace": "n1", + "object": { + "metadata": { + "name": "rg1", + "namespace": "n1" + }, + "spec": { + "backends": [ + { + "name": "backend", + "type": "shunt" + } + ], + "defaultBackends": [ + { + "backendName": "backend" + } + ], + "routes": [ + { + "backends": [ + { + "backendName": "backend" + } + ], + "filters": [ + "status(201) -> inlineContent(\"hi\")", + " " + ], + "path": "/", + "predicates": [ + 
"Method(\"GET\")" + ] + } + ] + } + } + } +} diff --git a/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json b/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json new file mode 100644 index 0000000000..47090f6a83 --- /dev/null +++ b/cmd/webhook/admission/testdata/rg/rg-with-multiple-predicates.json @@ -0,0 +1,49 @@ +{ + "request": { + "uid": "req-uid", + "name": "req1", + "operation": "create", + "kind": { + "group": "zalando", + "version": "v1", + "kind": "RouteGroup" + }, + "namespace": "n1", + "object": { + "metadata": { + "name": "rg1", + "namespace": "n1" + }, + "spec": { + "backends": [ + { + "name": "backend", + "type": "shunt" + } + ], + "defaultBackends": [ + { + "backendName": "backend" + } + ], + "routes": [ + { + "backends": [ + { + "backendName": "backend" + } + ], + "filters": [ + "status(201)" + ], + "path": "/", + "predicates": [ + "Method(\"GET\") && Path(\"/\")", + " " + ] + } + ] + } + } + } +} diff --git a/dataclients/kubernetes/definitions/definitions_test.go b/dataclients/kubernetes/definitions/definitions_test.go index 46d98586cd..5edab7fa4c 100644 --- a/dataclients/kubernetes/definitions/definitions_test.go +++ b/dataclients/kubernetes/definitions/definitions_test.go @@ -2,6 +2,7 @@ package definitions_test import ( "os" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -18,10 +19,15 @@ func TestValidateRouteGroups(t *testing.T) { data, err := os.ReadFile("testdata/errorwrapdata/routegroups.json") require.NoError(t, err) + logs, err := os.ReadFile("testdata/errorwrapdata/errors.log") + require.NoError(t, err) + + logsString := strings.TrimSuffix(string(logs), "\n") + rgl, err := definitions.ParseRouteGroupsJSON(data) require.NoError(t, err) err = definitions.ValidateRouteGroups(&rgl) - assert.EqualError(t, err, "route group without name\nerror in route group default/rg1: route group without backend") + assert.EqualError(t, err, logsString) } diff --git 
a/dataclients/kubernetes/definitions/routegroups.go b/dataclients/kubernetes/definitions/routegroups.go index 626880e7d5..922d363149 100644 --- a/dataclients/kubernetes/definitions/routegroups.go +++ b/dataclients/kubernetes/definitions/routegroups.go @@ -25,8 +25,6 @@ var ( errRouteGroupWithoutName = errors.New("route group without name") errRouteGroupWithoutSpec = errors.New("route group without spec") errInvalidRouteSpec = errors.New("invalid route spec") - errInvalidPredicate = errors.New("invalid predicate") - errInvalidFilter = errors.New("invalid filter") errInvalidMethod = errors.New("invalid method") errBothPathAndPathSubtree = errors.New("path and path subtree in the same route") errMissingBackendReference = errors.New("missing backend reference") diff --git a/dataclients/kubernetes/definitions/routegroupvalidator.go b/dataclients/kubernetes/definitions/routegroupvalidator.go index 9159ea6703..21fa31427b 100644 --- a/dataclients/kubernetes/definitions/routegroupvalidator.go +++ b/dataclients/kubernetes/definitions/routegroupvalidator.go @@ -1,11 +1,20 @@ package definitions import ( + "errors" + "fmt" + "net/url" + "github.com/zalando/skipper/eskip" ) type RouteGroupValidator struct{} +var ( + errSingleFilterExpected = errors.New("single filter expected") + errSinglePredicateExpected = errors.New("single predicate expected") +) + var defaultRouteGroupValidator = &RouteGroupValidator{} // ValidateRouteGroup validates a RouteGroupItem @@ -27,8 +36,9 @@ func (rgv *RouteGroupValidator) Validate(item *RouteGroupItem) error { return err } var errs []error - errs = append(errs, rgv.filtersValidation(item)) - errs = append(errs, rgv.predicatesValidation(item)) + errs = append(errs, rgv.validateFilters(item)) + errs = append(errs, rgv.validatePredicates(item)) + errs = append(errs, rgv.validateBackends(item)) return errorsJoin(errs...) 
} @@ -52,24 +62,47 @@ func (rgv *RouteGroupValidator) basicValidation(r *RouteGroupItem) error { return nil } -func (rgv *RouteGroupValidator) filtersValidation(item *RouteGroupItem) error { +func (rgv *RouteGroupValidator) validateFilters(item *RouteGroupItem) error { var errs []error for _, r := range item.Spec.Routes { for _, f := range r.Filters { - _, err := eskip.ParseFilters(f) - errs = append(errs, err) + filters, err := eskip.ParseFilters(f) + if err != nil { + errs = append(errs, err) + } else if len(filters) != 1 { + errs = append(errs, fmt.Errorf("%w at %q", errSingleFilterExpected, f)) + } } } return errorsJoin(errs...) } -func (rgv *RouteGroupValidator) predicatesValidation(item *RouteGroupItem) error { +func (rgv *RouteGroupValidator) validatePredicates(item *RouteGroupItem) error { var errs []error for _, r := range item.Spec.Routes { for _, p := range r.Predicates { - _, err := eskip.ParsePredicates(p) - errs = append(errs, err) + predicates, err := eskip.ParsePredicates(p) + if err != nil { + errs = append(errs, err) + } else if len(predicates) != 1 { + errs = append(errs, fmt.Errorf("%w at %q", errSinglePredicateExpected, p)) + } + } + } + return errorsJoin(errs...) +} + +func (rgv *RouteGroupValidator) validateBackends(item *RouteGroupItem) error { + var errs []error + for _, backend := range item.Spec.Backends { + if backend.Type == eskip.NetworkBackend { + address, err := url.Parse(backend.Address) + if err != nil { + errs = append(errs, fmt.Errorf("failed to parse backend address %q: %w", backend.Address, err)) + } else if address.Path != "" || address.RawQuery != "" || address.Scheme == "" { + errs = append(errs, fmt.Errorf("backend address %q contains path, query or missing scheme", backend.Address)) + } } } return errorsJoin(errs...) 
@@ -131,14 +164,6 @@ func (r *RouteSpec) validate(hasDefault bool, backends map[string]bool) error { return errBothPathAndPathSubtree } - if hasEmpty(r.Predicates) { - return errInvalidPredicate - } - - if hasEmpty(r.Filters) { - return errInvalidFilter - } - if hasEmpty(r.Methods) { return errInvalidMethod } diff --git a/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log new file mode 100644 index 0000000000..b5c5856864 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/errorwrapdata/errors.log @@ -0,0 +1,13 @@ +route group without name +error in route group default/rg1: route group without backend +single predicate expected at "Path(\"/foo\") && Method(\"GET\")" +single predicate expected at "" +single filter expected at "inlineContent(\"/foo\") -> status(200)" +single filter expected at " " +backend address "http://example.com/foo" contains path, query or missing scheme +backend address "http://example.com/foo/bar" contains path, query or missing scheme +backend address "http://example.com/foo/" contains path, query or missing scheme +backend address "/foo" contains path, query or missing scheme +backend address "example.com/" contains path, query or missing scheme +backend address "http://example.com?foo=bar" contains path, query or missing scheme +backend address "example.com" contains path, query or missing scheme diff --git a/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json b/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json index b2d7b9a315..c512a644bc 100644 --- a/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json +++ b/dataclients/kubernetes/definitions/testdata/errorwrapdata/routegroups.json @@ -45,10 +45,153 @@ "kind": "RouteGroup", "spec": { "backends": [ - { + { + "name": "shunt", + "type": "shunt" + } + ], + "hosts": [ + "test.example.com" + ], + "routes": [ + { + "backends": 
[ + { + "backendName": "shunt" + } + ], + "filters": [ + "inlineContent(\"/foo\")" + ], + "path": "/foo" + } + ] + } + }, + { + "apiVersion": "zalando.org/v1", + "metadata": { + "name": "rg1" + }, + "kind": "RouteGroup", + "spec": { + "backends": [ + { + "name": "shunt", + "type": "shunt" + } + ], + "hosts": [ + "test.example.com" + ], + "routes": [ + { + "backends": [ + { + "backendName": "shunt" + } + ], + "predicates": [ + "Path(\"/foo\") && Method(\"GET\")", + "" + ], + "filters": [ + "inlineContent(\"/foo\")" + ], + "path": "/foo" + } + ] + } + }, + { + "apiVersion": "zalando.org/v1", + "metadata": { + "name": "rg1" + }, + "kind": "RouteGroup", + "spec": { + "backends": [ + { + "name": "shunt", + "type": "shunt" + } + ], + "hosts": [ + "test.example.com" + ], + "routes": [ + { + "backends": [ + { + "backendName": "shunt" + } + ], + "filters": [ + "inlineContent(\"/foo\") -> status(200)", + " " + ], + "path": "/foo" + } + ] + } + }, + { + "apiVersion": "zalando.org/v1", + "metadata": { + "name": "rg1" + }, + "kind": "RouteGroup", + "spec": { + "backends": [ + { + "name": "backend1", + "type": "network", + "address": "http://example.com" + }, + { + "name": "backend2", + "type": "network", + "address": "http://example.com/foo" + }, + { + "name": "backend3", + "type": "network", + "address": "http://example.com/foo/bar" + }, + { + "name": "backend4", + "type": "network", + "address": "http://example.com/foo/" + }, + { + "name": "backend5", + "type": "network", + "address": "/foo" + }, + { + "name": "backend6", + "type": "network", + "address": "example.com/" + }, + { + "name": "backend7", + "type": "network", + "address": "http://user:pass@example.com" + }, + { + "name": "backend8", + "type": "network", + "address": "http://example.com?foo=bar" + }, + { + "name": "backend9", + "type": "network", + "address": "example.com" + }, + { "name": "shunt", "type": "shunt" - } + } ], "hosts": [ "test.example.com" diff --git 
a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log index 1ccd7d1fe1..6300163089 100644 --- a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-filter.log @@ -1,2 +1 @@ -test-route-group -invalid filter +single filter expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log index d222d6d555..6ec8ec4cbd 100644 --- a/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-empty-predicate.log @@ -1,2 +1 @@ -test-route-group -invalid predicate +single predicate expected diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.log new file mode 100644 index 0000000000..c6d33f774f --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.log @@ -0,0 +1,8 @@ +backend address \\\"http://example\.com/foo\\\" contains path, query or missing scheme +backend address \\\"http://example\.com/foo/bar\\\" contains path, query or missing scheme +backend address \\\"/foo\\\" contains path, query or missing scheme +backend address \\\"/foo/bar\\\" contains path, query or missing scheme +backend address \\\"example\.com/foo\\\" contains path, query or missing scheme +backend address \\\"http://example\.com/\\\" contains path, query or missing scheme +backend address \\\"http://example\.com\?foo=bar\\\" contains path, query or missing scheme +backend address \\\"example\.com\\\" contains path, query or missing scheme diff --git 
a/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.yaml b/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.yaml new file mode 100644 index 0000000000..dc7b64a935 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-invalid-backend-with-path.yaml @@ -0,0 +1,56 @@ +apiVersion: zalando.org/v1 +kind: RouteGroup +metadata: + name: test-route-group +spec: + hosts: + - example.org + backends: + - name: app + type: service + serviceName: app-svc + servicePort: 80 + - name: backend1 + type: network + address: http://example.com + - name: backend2 + type: network + address: http://example.com/foo + - name: backend3 + type: network + address: http://example.com/foo/bar + - name: backend4 + type: network + address: /foo + - name: backend5 + type: network + address: /foo/bar + - name: backend6 + type: network + address: example.com/foo + - name: backend7 + type: network + address: http://example.com/ + - name: backend8 + type: network + address: http://user:pass@example.com + - name: backend9 + type: network + address: http://example.com?foo=bar + - name: backend10 + type: network + address: example.com + defaultBackends: + - backendName: app + routes: + - path: / + methods: + - GET + - HEAD + predicates: + - Foo("X-Bar", "42") + filters: + - foo(42) + - bar(24) + backends: + - backendName: app diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log new file mode 100644 index 0000000000..1b78680a21 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.log @@ -0,0 +1 @@ +single filter expected at \\\"foo\(42\) -> bar\(24\)\\\" diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml 
b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml new file mode 100644 index 0000000000..c9fe72cdc2 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-filters-one-item.yaml @@ -0,0 +1,25 @@ +apiVersion: zalando.org/v1 +kind: RouteGroup +metadata: + name: test-route-group +spec: + hosts: + - example.org + backends: + - name: app + type: service + serviceName: app-svc + servicePort: 80 + defaultBackends: + - backendName: app + routes: + - path: / + methods: + - GET + - HEAD + predicates: + - Foo("X-Bar", "42") + filters: + - foo(42) -> bar(24) + backends: + - backendName: app diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log new file mode 100644 index 0000000000..ee42e11293 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.log @@ -0,0 +1 @@ +single predicate expected at \\\"Foo\(\\\\\\"X-Bar\\\\\\", \\\\\\"42\\\\\\"\) && Bar\(\\\\\\"X-Foo\\\\\\", \\\\\\"24\\\\\\"\)\\\" diff --git a/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml new file mode 100644 index 0000000000..3bdd72f2d0 --- /dev/null +++ b/dataclients/kubernetes/definitions/testdata/validation/route-with-multiple-predicates-one-item.yaml @@ -0,0 +1,26 @@ +apiVersion: zalando.org/v1 +kind: RouteGroup +metadata: + name: test-route-group +spec: + hosts: + - example.org + backends: + - name: app + type: service + serviceName: app-svc + servicePort: 80 + defaultBackends: + - backendName: app + routes: + - path: / + methods: + - GET + - HEAD + predicates: + - Foo("X-Bar", "42") && Bar("X-Foo", "24") + filters: + - foo(42) + - bar(24) + backends: + - 
backendName: app diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go index 148e78fee1..f9dd0d57e4 100644 --- a/dataclients/kubernetes/endpointslices.go +++ b/dataclients/kubernetes/endpointslices.go @@ -153,5 +153,6 @@ func (ep *EndpointSliceEndpoints) isReady() bool { return false } // defaults to ready, see also https://github.com/kubernetes/kubernetes/blob/91aca10d5984313c1c5858979d4946ff9446615f/pkg/proxy/endpointslicecache.go#L137C39-L139 - return ep.Conditions.Ready == nil || *ep.Conditions.Ready || ep.Conditions.Serving == nil || *ep.Conditions.Serving + // we ignore serving because of https://github.com/zalando/skipper/issues/2684 + return ep.Conditions.Ready == nil || *ep.Conditions.Ready } diff --git a/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.eskip b/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.eskip index dfcf6a1594..dd6a35f699 100644 --- a/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.eskip +++ b/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.eskip @@ -1,13 +1,13 @@ // default backend, target 1: kube_namespace1__ingress1______: * - -> ; + -> ; // path rule, target 1: kube_namespace1__ingress1__test_example_org___test1__service1: Host(/^(test[.]example[.]org[.]?(:[0-9]+)?)$/) && PathRegexp(/^(\/test1)/) - -> ; + -> ; // catch all: kube___catchall__test_example_org____: diff --git a/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.yaml b/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.yaml index a8bd86c55e..fab37e03c9 100644 --- 
a/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.yaml +++ b/dataclients/kubernetes/testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-serving.yaml @@ -85,6 +85,17 @@ endpoints: serving: true terminating: false zone: my-zone-2 + - addresses: + - 42.0.1.5 + conditions: + ready: false + serving: true + zone: my-zone-1 + - addresses: + - 42.0.1.6 + conditions: + ready: false + zone: my-zone-3 ports: - name: port1 port: 8080 diff --git a/docs/operation/operation.md b/docs/operation/operation.md index e40feee974..cf327d2fb5 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -213,6 +213,8 @@ To monitor skipper we recommend the following queries: - P99 response filter duration (depends on label selector): `histogram_quantile(0.99, sum(rate(skipper_filter_response_duration_seconds_bucket{application="skipper-ingress"}[1m])) by (le) )` - If you use Kubernetes limits or Linux cgroup CFS quotas (depends on label selector): `sum(rate(container_cpu_cfs_throttled_periods_total{container_name="skipper-ingress"}[1m]))` +You may add static metrics labels like `version` using Prometheus [relabeling feature](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). + ### Connection metrics This option will enable known loadbalancer connections metrics, like @@ -359,7 +361,9 @@ utilized applications (less than 100 requests per second): Metrics from the [go runtime memstats](https://golang.org/pkg/runtime/#MemStats) are exposed from skipper to the metrics endpoint, default listener -:9911, on path /metrics : +:9911, on path /metrics + +#### Go metrics - Codahale ```json "gauges": { @@ -464,6 +468,119 @@ are exposed from skipper to the metrics endpoint, default listener } ``` +#### Go metrics - Prometheus + +``` +# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles. 
+# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 4.7279e-05 +go_gc_duration_seconds{quantile="0.25"} 5.9291e-05 +go_gc_duration_seconds{quantile="0.5"} 7.4e-05 +go_gc_duration_seconds{quantile="0.75"} 9.55e-05 +go_gc_duration_seconds{quantile="1"} 0.000199667 +go_gc_duration_seconds_sum 0.001108339 +go_gc_duration_seconds_count 13 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 13 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.21.3"} 1 +# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 6.4856e+06 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 4.1797384e+07 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 1.462151e+06 +# HELP go_memstats_frees_total Total number of frees. +# TYPE go_memstats_frees_total counter +go_memstats_frees_total 507460 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 4.549296e+06 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 6.4856e+06 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 7.421952e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. +# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 8.372224e+06 +# HELP go_memstats_heap_objects Number of allocated objects. 
+# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 70159 +# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS. +# TYPE go_memstats_heap_released_bytes gauge +go_memstats_heap_released_bytes 6.47168e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. +# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 1.5794176e+07 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. +# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 1.6987664839728708e+09 +# HELP go_memstats_lookups_total Total number of pointer lookups. +# TYPE go_memstats_lookups_total counter +go_memstats_lookups_total 0 +# HELP go_memstats_mallocs_total Total number of mallocs. +# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 577619 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 19200 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. +# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 31200 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 302904 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 309624 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 8.206808e+06 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. 
+# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 2.402169e+06 +# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator. +# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 983040 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 983040 +# HELP go_memstats_sys_bytes Number of bytes obtained from system. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 2.5531656e+07 +# HELP go_threads Number of OS threads created. +# TYPE go_threads gauge +go_threads 22 +# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 0.42 +# HELP process_max_fds Maximum number of open file descriptors. +# TYPE process_max_fds gauge +process_max_fds 60000 +# HELP process_open_fds Number of open file descriptors. +# TYPE process_open_fds gauge +process_open_fds 10 +# HELP process_resident_memory_bytes Resident memory size in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 4.2811392e+07 +# HELP process_start_time_seconds Start time of the process since unix epoch in seconds. +# TYPE process_start_time_seconds gauge +process_start_time_seconds 1.69876646736e+09 +# HELP process_virtual_memory_bytes Virtual memory size in bytes. +# TYPE process_virtual_memory_bytes gauge +process_virtual_memory_bytes 2.823462912e+09 +# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes. 
+# TYPE process_virtual_memory_max_bytes gauge +process_virtual_memory_max_bytes 1.8446744073709552e+19 +``` + ### Redis - Rate limiting metrics System metrics exposed by the redisclient: @@ -491,7 +608,7 @@ See more details about rate limiting at [Rate limiting](../reference/filters.md# ### Open Policy Agent metrics -If Open Policy Agent filters are enabled, the following counters show up in the `/metrics` endpoint. The bundle-name is the first parameter of the filter so that for example increased error codes can be attributed to a specific source bundle / system. +If Open Policy Agent filters are enabled, the following counters show up in the `/metrics` endpoint. The bundle-name is the first parameter of the filter so that for example increased error codes can be attributed to a specific source bundle / system. - `skipper.opaAuthorizeRequest.custom.decision.allow.` - `skipper.opaAuthorizeRequest.custom.decision.deny.` @@ -505,6 +622,83 @@ The following timer metrics are exposed per used bundle-name: - `skipper.opaAuthorizeRequest.custom.eval_time.` - `skipper.opaServeResponse.custom.eval_time.` +### RouteSRV metrics + +RouteSRV metrics expose the following metrics in Prometheus format: + +``` +% curl http://127.0.0.1:9911/metrics +# 8< Go metrics >8 + +# HELP routesrv_backend_combined_duration_seconds Duration in seconds of a proxy backend combined. 
+# TYPE routesrv_backend_combined_duration_seconds histogram +routesrv_backend_combined_duration_seconds_bucket{le="0.005"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.01"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.025"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.05"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.1"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.25"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="0.5"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="1"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="2.5"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="5"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="10"} 5 +routesrv_backend_combined_duration_seconds_bucket{le="+Inf"} 5 +routesrv_backend_combined_duration_seconds_sum 0.001349441 +routesrv_backend_combined_duration_seconds_count 5 +# HELP routesrv_backend_duration_seconds Duration in seconds of a proxy backend. 
+# TYPE routesrv_backend_duration_seconds histogram +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.005"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.01"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.025"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.05"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.1"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.25"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="0.5"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="1"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="2.5"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="5"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="10"} 5 +routesrv_backend_duration_seconds_bucket{host="",route="routersv",le="+Inf"} 5 +routesrv_backend_duration_seconds_sum{host="",route="routersv"} 0.001349441 +routesrv_backend_duration_seconds_count{host="",route="routersv"} 5 +# HELP routesrv_custom_gauges Gauges number of custom metrics. +# TYPE routesrv_custom_gauges gauge +routesrv_custom_gauges{key="polling_started_timestamp"} 1.69876646881321e+09 +routesrv_custom_gauges{key="redis_endpoints"} 1 +routesrv_custom_gauges{key="routes.byte"} 91378 +routesrv_custom_gauges{key="routes.initialized_timestamp"} 1.6987664689696188e+09 +routesrv_custom_gauges{key="routes.total"} 258 +routesrv_custom_gauges{key="routes.updated_timestamp"} 1.698766468969631e+09 +# HELP routesrv_custom_total Total number of custom metrics. +# TYPE routesrv_custom_total counter +routesrv_custom_total{key="200"} 5 +``` + +Metrics explanation: + +- `routesrv_custom_total{key="200"} 5`: + 5 requests were responded with status code 200 by the current routesrv + version `v0.18.38`. 
+- `routesrv_custom_gauges{key="polling_started_timestamp"} 1.69876646881321e+09`:
+  routesrv started to poll at 1.69876646881321e+09 seconds since the Unix epoch
+  (2023-10-31 16:34:28 1705425/2097152 +0100).
+- `routesrv_custom_gauges{key="redis_endpoints"} 1`:
+  The endpoint `/swarm/redis/shards` was called 1 time.
+- `routesrv_custom_gauges{key="routes.byte"} 91378`:
+  The number of bytes that are served at `/routes` is 91378.
+- `routesrv_custom_gauges{key="routes.initialized_timestamp"} 1.6987664689696188e+09`:
+  routesrv initialized the routes at 1.6987664689696188e+09 seconds since the Unix epoch.
+  (2023-10-31 16:34:28 1016719/1048576 +0100)
+- `routesrv_custom_gauges{key="routes.total"} 258`:
+  The number of routes that are served at `/routes` is 258.
+- `routesrv_custom_gauges{key="routes.updated_timestamp"} 1.698766468969631e+09`:
+  The last update of routes by routesrv was at 1.698766468969631e+09.
+  (2023-10-31 16:34:28 4066927/4194304 +0100)
+
+
+If you want to read more about RouteSRV see [deploy RouteSRV](../kubernetes/ingress-controller.md#routesrv).
+
 ## OpenTracing
 
 Skipper has support for different [OpenTracing API](http://opentracing.io/) vendors, including
@@ -636,7 +830,7 @@ The following tags are added to the Span, labels are taken from the OPA configur
 - `opa.decision_id=`
 - `opa.labels.=`
 
-The labels can for example be used to link to a specific decision in the control plane if they contain URL fragments for the receiving entity.
+The labels can for example be used to link to a specific decision in the control plane if they contain URL fragments for the receiving entity. 
 
 ### Redis rate limiting spans
 
@@ -962,11 +1156,13 @@ predicates and filters involved in the route processing:
 }
 ```
 
-## Profiling skipper
+## Profiling
 
 Go profiling is explained in Go's [diagnostics](https://golang.org/doc/diagnostics.html)
 documentation.
 
+### Profiling skipper or RouteSRV
+
 To enable profiling in skipper you have to use `-enable-profile`. 
This will start a profiling route at `/debug/pprof/profile` on the support listener, which defaults to `:9911`. diff --git a/docs/reference/filters.md b/docs/reference/filters.md index abfd5aaa70..f14265ba03 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2819,6 +2819,31 @@ Example: fifo(100, 150, "10s") ``` +### fifoWithBody + +This Filter is similar to the [lifo](#lifo) filter in regards to +parameters and status codes. +Performance considerations are similar to [fifo](#fifo). + +The difference between fifo and fifoWithBody is that fifo will decrement +the concurrency as soon as the backend sent response headers and +fifoWithBody will decrement the concurrency if the response body was +served. Normally both are very similar, but if you have a fully async +component that serves multiple website fragments, this would decrement +concurrency too early. + +Parameters: + +* MaxConcurrency specifies how many goroutines are allowed to work on this queue (int) +* MaxQueueSize sets the queue size (int) +* Timeout sets the timeout to get request scheduled (time) + +Example: + +``` +fifoWithBody(100, 150, "10s") +``` + ### lifo This Filter changes skipper to handle the route with a bounded last in @@ -2932,31 +2957,54 @@ the -rfc-patch-path flag. See [URI standards interpretation](../operation/operation.md#uri-standards-interpretation). ## Egress -### bearerinjector -This filter injects `Bearer` tokens into `Authorization` headers read -from file providing the token as content. This is only for use cases -using skipper as sidecar to inject tokens for the application on the +### setRequestHeaderFromSecret + +This filter sets request header to the secret value with optional prefix and suffix. +This is only for use cases using skipper as sidecar to inject tokens for the application on the [**egress**](egress.md) path, if it's used in the **ingress** path you likely create a security issue for your application. 
This filter should be used as an [egress](egress.md) only feature. +Parameters: + +* header name (string) +* secret name (string) +* value prefix (string) - optional +* value suffix (string) - optional + Example: ``` -egress1: Method("POST") && Host("api.example.com") -> bearerinjector("/tmp/secrets/write-token") -> "https://api.example.com/shoes"; -egress2: Method("GET") && Host("api.example.com") -> bearerinjector("/tmp/secrets/read-token") -> "https://api.example.com/shoes"; +egress1: Method("GET") -> setRequestHeaderFromSecret("Authorization", "/tmp/secrets/get-token") -> "https://api.example.com"; +egress2: Method("POST") -> setRequestHeaderFromSecret("Authorization", "/tmp/secrets/post-token", "foo-") -> "https://api.example.com"; +egress3: Method("PUT") -> setRequestHeaderFromSecret("X-Secret", "/tmp/secrets/put-token", "bar-", "-baz") -> "https://api.example.com"; ``` -To integrate with the `bearerinjector` filter you need to run skipper -with `-credentials-paths=/tmp/secrets` and specify an update interval -`-credentials-update-interval=10s`. Files in the credentials path can -be a directory, which will be able to find all files within this -directory, but it won't walk subtrees. For the example case, there -have to be filenames `write-token` and `read-token` within the +To use `setRequestHeaderFromSecret` filter you need to run skipper +with `-credentials-paths=/tmp/secrets` and specify an update interval `-credentials-update-interval=10s`. +Files in the credentials path can be a directory, which will be able to find all files within this +directory, but it won't walk subtrees. +For the example case, there have to be `get-token`, `post-token` and `put-token` files within the specified credential paths `/tmp/secrets/`, resulting in -`/tmp/secrets/write-token` and `/tmp/secrets/read-token`. +`/tmp/secrets/get-token`, `/tmp/secrets/post-token` and `/tmp/secrets/put-token`. 
+ +### bearerinjector + +This filter injects `Bearer` tokens into `Authorization` headers read +from file providing the token as content. + +It is a special form of `setRequestHeaderFromSecret` with `"Authorization"` header name, +`"Bearer "` prefix and empty suffix. + +Example: + +``` +egress: * -> bearerinjector("/tmp/secrets/my-token") -> "https://api.example.com"; + +// equivalent to setRequestHeaderFromSecret("Authorization", "/tmp/secrets/my-token", "Bearer ") +``` ## Open Tracing ### tracingBaggageToTag diff --git a/filters/auth/secretheader.go b/filters/auth/secretheader.go new file mode 100644 index 0000000000..cf29b1fab0 --- /dev/null +++ b/filters/auth/secretheader.go @@ -0,0 +1,75 @@ +package auth + +import ( + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/secrets" +) + +type ( + secretHeaderSpec struct { + secretsReader secrets.SecretsReader + } + + secretHeaderFilter struct { + headerName string + secretName string + prefix string + suffix string + + secretsReader secrets.SecretsReader + } +) + +func NewSetRequestHeaderFromSecret(sr secrets.SecretsReader) filters.Spec { + return &secretHeaderSpec{secretsReader: sr} +} + +func (*secretHeaderSpec) Name() string { + return filters.SetRequestHeaderFromSecretName +} + +func (s *secretHeaderSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + if len(args) < 2 || len(args) > 4 { + return nil, filters.ErrInvalidFilterParameters + } + var ok bool + + f := &secretHeaderFilter{ + secretsReader: s.secretsReader, + } + + f.headerName, ok = args[0].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + + f.secretName, ok = args[1].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + + if len(args) > 2 { + f.prefix, ok = args[2].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + } + + if len(args) > 3 { + f.suffix, ok = args[3].(string) + if !ok { + return nil, filters.ErrInvalidFilterParameters + } + } + + return f, 
nil +} + +func (f *secretHeaderFilter) Request(ctx filters.FilterContext) { + value, ok := f.secretsReader.GetSecret(f.secretName) + if ok { + ctx.Request().Header.Set(f.headerName, f.prefix+string(value)+f.suffix) + } +} + +func (*secretHeaderFilter) Response(filters.FilterContext) {} diff --git a/filters/auth/secretheader_test.go b/filters/auth/secretheader_test.go new file mode 100644 index 0000000000..a40f348f87 --- /dev/null +++ b/filters/auth/secretheader_test.go @@ -0,0 +1,102 @@ +package auth_test + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/filters/filtertest" +) + +type testSecretsReader struct { + name string + secret string +} + +func (tsr *testSecretsReader) GetSecret(name string) ([]byte, bool) { + if name == tsr.name { + return []byte(tsr.secret), true + } + return nil, false +} + +func (*testSecretsReader) Close() {} + +func TestSetRequestHeaderFromSecretInvalidArgs(t *testing.T) { + spec := auth.NewSetRequestHeaderFromSecret(nil) + for _, def := range []string{ + `setRequestHeaderFromSecret()`, + `setRequestHeaderFromSecret("X-Secret")`, + `setRequestHeaderFromSecret("X-Secret", 1)`, + `setRequestHeaderFromSecret(1, "/my-secret")`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", 1)`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", "prefix", 1)`, + `setRequestHeaderFromSecret("X-Secret", "/my-secret", "prefix", "suffix", "garbage")`, + } { + t.Run(def, func(t *testing.T) { + ff := eskip.MustParseFilters(def) + require.Len(t, ff, 1) + + _, err := spec.CreateFilter(ff[0].Args) + assert.Error(t, err) + }) + } +} + +func TestSetRequestHeaderFromSecret(t *testing.T) { + spec := auth.NewSetRequestHeaderFromSecret(&testSecretsReader{ + name: "/my-secret", + secret: "secret-value", + }) + + assert.Equal(t, "setRequestHeaderFromSecret", spec.Name()) + + for _, 
tc := range []struct { + def, header, value string + }{ + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret")`, + header: "X-Secret", + value: "secret-value", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret", "foo-")`, + header: "X-Secret", + value: "foo-secret-value", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/my-secret", "foo-", "-bar")`, + header: "X-Secret", + value: "foo-secret-value-bar", + }, + { + def: `setRequestHeaderFromSecret("X-Secret", "/does-not-exist")`, + header: "X-Secret", + value: "", + }, + } { + t.Run(tc.def, func(t *testing.T) { + ff := eskip.MustParseFilters(tc.def) + require.Len(t, ff, 1) + + f, err := spec.CreateFilter(ff[0].Args) + assert.NoError(t, err) + + ctx := &filtertest.Context{ + FRequest: &http.Request{ + Header: http.Header{}, + }, + } + f.Request(ctx) + + if tc.value != "" { + assert.Equal(t, tc.value, ctx.FRequest.Header.Get(tc.header)) + } else { + assert.NotContains(t, ctx.FRequest.Header, tc.header) + } + }) + } +} diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 0609d87412..d4e1895bf4 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -216,6 +216,7 @@ func Filters() []filters.Spec { auth.NewForwardToken(), auth.NewForwardTokenField(), scheduler.NewFifo(), + scheduler.NewFifoWithBody(), scheduler.NewLIFO(), scheduler.NewLIFOGroup(), rfc.NewPath(), diff --git a/filters/diag/logheader_test.go b/filters/diag/logheader_test.go index 560a1f51c2..c1c356baf6 100644 --- a/filters/diag/logheader_test.go +++ b/filters/diag/logheader_test.go @@ -12,6 +12,25 @@ import ( "github.com/zalando/skipper/filters/filtertest" ) +func TestCreateFilterLogHeader(t *testing.T) { + lgh := logHeader{} + f, err := lgh.CreateFilter([]interface{}{"request", "response"}) + if err != nil { + t.Fatal(err) + } + lgh = f.(logHeader) + if !(lgh.request && lgh.response) { + t.Errorf("Failed to set members: %v %v", lgh.request, lgh.response) + } +} +func 
TestCreateFilterLogHeaderWrongInput(t *testing.T) { + lgh := logHeader{} + _, err := lgh.CreateFilter([]interface{}{5}) + if err == nil { + t.Fatal("Failed to get expected error 5 is no string") + } +} + func TestLogHeader(t *testing.T) { defer func() { log.SetOutput(os.Stderr) @@ -152,3 +171,109 @@ func TestLogHeaderRequestResponse(t *testing.T) { } } + +func TestLogHeaderAuthorizationRequestResponse(t *testing.T) { + defer func() { + log.SetOutput(os.Stderr) + }() + + req, err := http.NewRequest("GET", "https://example.org/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Authorization", "secret") + + // req.Header is a map so for non flaky tests we need to drop the default header from expected output + req.Header.Del("User-Agent") + + resp := &http.Response{ + Header: http.Header{ + "Authorization": []string{"secret"}, + }, + StatusCode: http.StatusOK, + Status: http.StatusText(200), + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Body: nil, + ContentLength: 0, + } + + ctx := &filtertest.Context{ + FRequest: req, + FResponse: resp, + } + + outputVerify := bytes.NewBuffer(nil) + req.Body = nil + if err := req.Write(outputVerify); err != nil { + t.Fatal(err) + } + + loggerVerify := bytes.NewBuffer(nil) + log.SetOutput(loggerVerify) + s := outputVerify.String() + s = strings.ReplaceAll(s, "secret", "TRUNCATED") + // req.Header is a map so for non flaky tests we need to drop the default header from expected output + s = strings.ReplaceAll(s, "User-Agent: Go-http-client/1.1\r\n", "") + log.Println(s) + + loggerTest := bytes.NewBuffer(nil) + log.SetOutput(loggerTest) + + req.Body = io.NopCloser(bytes.NewBufferString("foo bar baz")) + + lh := logHeader{ + request: true, + response: true, + } + + lh.Request(ctx) + if loggerTest.Len() == 0 || !bytes.Equal(loggerTest.Bytes(), loggerVerify.Bytes()) { + t.Error("failed to log the request header") + t.Log("expected:") + t.Log(loggerVerify.String()) + t.Log("got:") + t.Log(loggerTest.String()) + } + + // 
response + outputVerify = bytes.NewBuffer(nil) + resp.Body = nil + outputVerify.WriteString("Response for ") + outputVerify.WriteString(req.Method) + outputVerify.WriteString(" ") + outputVerify.WriteString(req.URL.Path) + outputVerify.WriteString(" ") + outputVerify.WriteString(req.Proto) + outputVerify.WriteString("\r\n") + outputVerify.WriteString(resp.Status) + outputVerify.WriteString("\r\n") + for k, v := range resp.Header { + if k == "Authorization" { + v = []string{"TRUNCATED"} + } + outputVerify.WriteString(k) + outputVerify.WriteString(": ") + outputVerify.WriteString(strings.Join(v, " ")) + outputVerify.WriteString("\r\n") + } + outputVerify.WriteString("\r\n") + + loggerVerify = bytes.NewBuffer(nil) + log.SetOutput(loggerVerify) + log.Println(outputVerify.String()) + + loggerTest = bytes.NewBuffer(nil) + log.SetOutput(loggerTest) + + lh.Response(ctx) + if loggerTest.Len() == 0 || !bytes.Equal(loggerTest.Bytes(), loggerVerify.Bytes()) { + t.Error("failed to log the response header") + t.Log("expected:") + t.Log(loggerVerify.String()) + t.Log("got:") + t.Log(loggerTest.String()) + } + +} diff --git a/filters/filters.go b/filters/filters.go index 6caaaa7a41..d91cc435b4 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -327,11 +327,13 @@ const ( SetDynamicBackendUrl = "setDynamicBackendUrl" ApiUsageMonitoringName = "apiUsageMonitoring" FifoName = "fifo" + FifoWithBodyName = "fifoWithBody" LifoName = "lifo" LifoGroupName = "lifoGroup" RfcPathName = "rfcPath" RfcHostName = "rfcHost" BearerInjectorName = "bearerinjector" + SetRequestHeaderFromSecretName = "setRequestHeaderFromSecret" TracingBaggageToTagName = "tracingBaggageToTag" StateBagToTagName = "stateBagToTag" TracingTagName = "tracingTag" diff --git a/filters/scheduler/cleanup_test.go b/filters/scheduler/cleanup_test.go new file mode 100644 index 0000000000..a479173561 --- /dev/null +++ b/filters/scheduler/cleanup_test.go @@ -0,0 +1,85 @@ +package scheduler + +import ( + "net/http" + 
"net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/routing" + "github.com/zalando/skipper/routing/testdataclient" + "github.com/zalando/skipper/scheduler" +) + +func TestCleanupOnBackendErrors(t *testing.T) { + doc := ` + aroute: * + -> lifo(1, 1, "100ms") + -> lifoGroup("foo", 1, 1, "100ms") + -> lifo(2, 2, "200ms") + -> lifoGroup("bar", 1, 1, "100ms") + -> fifo(1, 1, "200ms") + -> "http://test.invalid" + ` + + dc, err := testdataclient.NewDoc(doc) + require.NoError(t, err) + defer dc.Close() + + reg := scheduler.RegistryWith(scheduler.Options{}) + defer reg.Close() + + fr := make(filters.Registry) + fr.Register(NewLIFO()) + fr.Register(NewLIFOGroup()) + fr.Register(NewFifo()) + + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, + } + + rt := routing.New(ro) + defer rt.Close() + + <-rt.FirstLoad() + + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + }) + defer pr.Close() + + ts := httptest.NewServer(pr) + defer ts.Close() + + rsp, err := http.Get(ts.URL) + require.NoError(t, err) + rsp.Body.Close() + + var route *routing.Route + { + req, err := http.NewRequest("GET", ts.URL, nil) + require.NoError(t, err) + + route, _ = rt.Get().Do(req) + require.NotNil(t, route, "failed to lookup route") + } + + for _, f := range route.Filters { + if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.Queue }); ok { + status := qf.GetQueue().Status() + assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status) + } else if qf, ok := f.Filter.(interface{ GetQueue() *scheduler.FifoQueue }); ok { + status := qf.GetQueue().Status() + assert.Equal(t, scheduler.QueueStatus{ActiveRequests: 0, QueuedRequests: 0, Closed: false}, status) + } else { + t.Fatal("filter 
does not implement GetQueue()") + } + } +} diff --git a/filters/scheduler/fifo.go b/filters/scheduler/fifo.go index 040d92a3ad..50ceb95b65 100644 --- a/filters/scheduler/fifo.go +++ b/filters/scheduler/fifo.go @@ -11,24 +11,31 @@ import ( "github.com/zalando/skipper/scheduler" ) -const ( - fifoKey string = "fifo" -) - type ( - fifoSpec struct{} + fifoSpec struct { + typ string + } fifoFilter struct { config scheduler.Config queue *scheduler.FifoQueue + typ string } ) func NewFifo() filters.Spec { - return &fifoSpec{} + return &fifoSpec{ + typ: filters.FifoName, + } } -func (*fifoSpec) Name() string { - return filters.FifoName +func NewFifoWithBody() filters.Spec { + return &fifoSpec{ + typ: filters.FifoWithBodyName, + } +} + +func (s *fifoSpec) Name() string { + return s.typ } // CreateFilter creates a fifoFilter, that will use a semaphore based @@ -65,6 +72,7 @@ func (s *fifoSpec) CreateFilter(args []interface{}) (filters.Filter, error) { } return &fifoFilter{ + typ: s.typ, config: scheduler.Config{ MaxConcurrency: cc, MaxQueueSize: qs, @@ -132,21 +140,33 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) { } // ok - pending, _ := ctx.StateBag()[fifoKey].([]func()) - ctx.StateBag()[fifoKey] = append(pending, done) + pending, _ := ctx.StateBag()[f.typ].([]func()) + ctx.StateBag()[f.typ] = append(pending, done) } // Response will decrease the number of inflight requests to release // the concurrency reservation for the request. 
func (f *fifoFilter) Response(ctx filters.FilterContext) { - pending, ok := ctx.StateBag()[fifoKey].([]func()) - if !ok { - return - } - last := len(pending) - 1 - if last < 0 { - return + switch f.typ { + case filters.FifoName: + pending, ok := ctx.StateBag()[f.typ].([]func()) + if !ok { + return + } + last := len(pending) - 1 + if last < 0 { + return + } + pending[last]() + ctx.StateBag()[f.typ] = pending[:last] + + case filters.FifoWithBodyName: + // nothing to do here, handled in the proxy after copyStream() } - pending[last]() - ctx.StateBag()[fifoKey] = pending[:last] +} + +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (f *fifoFilter) HandleErrorResponse() bool { + return true } diff --git a/filters/scheduler/fifo_test.go b/filters/scheduler/fifo_test.go index b0ed9bda38..c9f0093f20 100644 --- a/filters/scheduler/fifo_test.go +++ b/filters/scheduler/fifo_test.go @@ -1,15 +1,20 @@ package scheduler import ( + "bytes" "fmt" "io" "net/http" stdlibhttptest "net/http/httptest" - "net/url" + "strings" "testing" + "testing/iotest" "time" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/sirupsen/logrus" + + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/metrics/metricstest" "github.com/zalando/skipper/net/httptest" @@ -19,11 +24,34 @@ import ( "github.com/zalando/skipper/scheduler" ) +func TestCreateFifoName(t *testing.T) { + for _, tt := range []struct { + name string + filterFunc func() filters.Spec + }{ + { + name: filters.FifoName, + filterFunc: NewFifo, + }, + { + name: filters.FifoWithBodyName, + filterFunc: NewFifoWithBody, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.filterFunc().Name() != tt.name { + t.Fatalf("got %q, want %q", tt.filterFunc().Name(), tt.name) + } + }) + } +} + func TestCreateFifoFilter(t *testing.T) { for _, tt := range []struct { name string args []interface{} 
wantParseErr bool + wantConfig scheduler.Config }{ { name: "fifo no args", @@ -51,6 +79,38 @@ func TestCreateFifoFilter(t *testing.T) { 5, "1s", }, + wantConfig: scheduler.Config{ + MaxConcurrency: 3, + MaxQueueSize: 5, + Timeout: 1 * time.Second, + }, + }, + { + name: "fifo negative value arg1", + args: []interface{}{ + -3, + 5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo negative value arg2", + args: []interface{}{ + 3, + -5, + "1s", + }, + wantParseErr: true, + }, + { + name: "fifo too small value arg3", + args: []interface{}{ + 3, + 5, + "1ns", + }, + wantParseErr: true, }, { name: "fifo wrong type arg1", @@ -100,18 +160,226 @@ func TestCreateFifoFilter(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spec := &fifoSpec{} - ff, err := spec.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) + for _, f := range []func() filters.Spec{NewFifo, NewFifoWithBody} { + spec := f() + ff, err := spec.CreateFilter(tt.args) + if err != nil && !tt.wantParseErr { + t.Fatalf("Failed to parse filter: %v", err) + } + if err == nil && tt.wantParseErr { + t.Fatal("Failed to get wanted error on create filter") + } + + if _, ok := ff.(*fifoFilter); !ok && err == nil { + t.Fatal("Failed to convert filter to *fifoFilter") + } + } + }) + } +} + +type flusher struct { + w http.ResponseWriter +} + +func (f *flusher) Flush() { + f.w.(http.Flusher).Flush() +} + +func (f *flusher) Unwrap() http.ResponseWriter { + return f.w +} + +func (f *flusher) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + if err == nil { + f.Flush() + } + return +} + +type slowReader struct { + r io.Reader + d time.Duration +} + +func (sr *slowReader) Read(p []byte) (int, error) { + logrus.Infof("slowReader: %d", len(p)) + if len(p) == 0 { + return 0, nil + } + time.Sleep(sr.d) + n, err := sr.r.Read(p) + logrus.Infof("slowReader: %d %v", n, err) + return n, err +} + +func TestFifoWithBody(t *testing.T) { + for _, tt := range 
[]struct { + name string + args []interface{} + backendTime time.Duration + responseSize int + wantErr bool + }{ + { + name: "fifoWithBody 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 1024, + }, + { + name: "fifoWithBody 1024B with 0 queue should fail", + args: []interface{}{1, 0, "10ms"}, + backendTime: 50 * time.Millisecond, + responseSize: 1024, + wantErr: true, + }, + { + name: "fifoWithBody 2x 1024B with 1 queue should be ok", + args: []interface{}{1, 1, "1s"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + }, + { + name: "fifoWithBody 2x 1024B with 0 queue should fail", + args: []interface{}{1, 0, "15ms"}, + backendTime: 10 * time.Millisecond, + responseSize: 2 * 1024, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + + backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("backend path: %s", r.URL.Path) + buf := bytes.NewBufferString(strings.Repeat("A", tt.responseSize)) + halfReader := iotest.HalfReader(buf) + sr := &slowReader{ + d: 100 * time.Millisecond, + r: halfReader, + } + + w.WriteHeader(http.StatusOK) + // sleep here to test the difference between streaming response and not + time.Sleep(tt.backendTime) + // TODO: maybe better to do slow body streaming? + b := make([]byte, 1024) + io.CopyBuffer(&flusher{w}, sr, b) + })) + defer backend.Close() + + // proxy + metrics := &metricstest.MockMetrics{} + defer metrics.Close() + reg := scheduler.RegistryWith(scheduler.Options{ + Metrics: metrics, + EnableRouteFIFOMetrics: true, + }) + defer reg.Close() + fr := make(filters.Registry) + fr.Register(NewFifoWithBody()) + args := append(tt.args, backend.URL) + doc := fmt.Sprintf(`r: * -> fifoWithBody(%v, %v, "%v") -> "%s"`, args...) 
+ t.Logf("%s", doc) + dc, err := testdataclient.NewDoc(doc) + if err != nil { + t.Fatalf("Failed to create testdataclient: %v", err) + } + defer dc.Close() + ro := routing.Options{ + SignalFirstLoad: true, + FilterRegistry: fr, + DataClients: []routing.DataClient{dc}, + PostProcessors: []routing.PostProcessor{reg}, + } + rt := routing.New(ro) + defer rt.Close() + <-rt.FirstLoad() + tracer := &testTracer{MockTracer: mocktracer.New()} + pr := proxy.WithParams(proxy.Params{ + Routing: rt, + OpenTracing: &proxy.OpenTracingParams{Tracer: tracer}, + }) + defer pr.Close() + ts := stdlibhttptest.NewServer(pr) + defer ts.Close() + + // simple test + rsp, err := ts.Client().Get(ts.URL + "/test") + if err != nil { + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } - if err == nil && tt.wantParseErr { - t.Fatal("Failed to get wanted error on create filter") + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) + } + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read response body from: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to read the size, got: %v, want: %v", len(b), tt.responseSize) + } + + t.Log("the streaming test") + // the streaming test + rspCH := make(chan *http.Response) + errCH := make(chan error) + defer func() { + close(rspCH) + close(errCH) + }() + waithCH := make(chan struct{}) + go func() { + rsp, err := ts.Client().Get(ts.URL + "/1") + t.Logf("rsp1: %s", rsp.Status) + close(waithCH) + if err != nil { + errCH <- err + } else { + rspCH <- rsp + } + }() + + <-waithCH + rsp2, err2 := ts.Client().Get(ts.URL + "/2") + t.Logf("rsp2: %s", rsp.Status) + if tt.wantErr { + n, err := io.Copy(io.Discard, rsp2.Body) + if n != 0 { + t.Fatalf("Failed to get error copied %d bytes, err: %v", n, err) + } + rsp2.Body.Close() + } else { + if err2 != nil { + t.Errorf("Failed to do 2nd request: %v", err2) + } else { + b, err2 := 
io.ReadAll(rsp2.Body) + if err2 != nil { + t.Errorf("Failed 2nd request to read body: %v", err2) + } + if len(b) != tt.responseSize { + t.Errorf("Failed 2nd request to get response size: %d, want: %d", len(b), tt.responseSize) + } + } } - if _, ok := ff.(*fifoFilter); !ok && err == nil { - t.Fatal("Failed to convert filter to *fifoFilter") + // read body from first request + select { + case err := <-errCH: + t.Fatalf("Failed to do request: %v", err) + case rsp := <-rspCH: + t.Logf("client1 got %s", rsp.Status) + b, err := io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + if len(b) != tt.responseSize { + t.Fatalf("Failed to get response size: %d, want: %d", len(b), tt.responseSize) + } } + }) } } @@ -119,111 +387,86 @@ func TestCreateFifoFilter(t *testing.T) { func TestFifo(t *testing.T) { for _, tt := range []struct { name string - args []interface{} + filter string freq int per time.Duration backendTime time.Duration clientTimeout time.Duration - wantConfig scheduler.Config - wantParseErr bool wantOkRate float64 - epsilon float64 }{ { - name: "fifo defaults", - args: []interface{}{}, - wantParseErr: true, + name: "fifo simple ok", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + wantOkRate: 1.0, }, { - name: "fifo simple ok", - args: []interface{}{ - 3, - 5, - "1s", - }, + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, freq: 20, per: 100 * time.Millisecond, backendTime: 1 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: time.Second, - }, - wantParseErr: false, - wantOkRate: 1.0, - epsilon: 1, + wantOkRate: 1.0, }, { - name: "fifo with reaching max concurrency and queue timeouts", - args: []interface{}{ - 3, - 5, - "10ms", - }, + name: "fifo simple client canceled", + filter: `fifo(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * 
time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifoWithBody simple client canceled", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Nanosecond, + wantOkRate: 0, + }, + { + name: "fifo with reaching max concurrency and queue timeouts", + filter: `fifo(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifoWithBody with reaching max concurrency and queue timeouts", + filter: `fifoWithBody(3, 5, "10ms")`, + freq: 20, + per: 10 * time.Millisecond, + backendTime: 11 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 0.005, + }, + { + name: "fifo with reaching max concurrency and queue full", + filter: `fifo(1, 1, "250ms")`, freq: 200, per: 100 * time.Millisecond, - backendTime: 10 * time.Millisecond, + backendTime: 100 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: 10 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.1, - epsilon: 1, + wantOkRate: 0.0008, }, { - name: "fifo with reaching max concurrency and queue full", - args: []interface{}{ - 1, - 1, - "250ms", - }, + name: "fifoWithBody with reaching max concurrency and queue full", + filter: `fifoWithBody(1, 1, "250ms")`, freq: 200, per: 100 * time.Millisecond, backendTime: 100 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 1, - MaxQueueSize: 1, - Timeout: 250 * time.Millisecond, - }, - wantParseErr: false, - wantOkRate: 0.0008, - epsilon: 1, + wantOkRate: 0.0008, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - - // no parse error - ff, err := fs.CreateFilter(tt.args) - if err != nil 
&& !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) - } - if err == nil && tt.wantParseErr { - t.Fatalf("want parse error but have no: %v", err) - } - if tt.wantParseErr { - return - } - - // validate config - if f, ok := ff.(*fifoFilter); ok { - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) - } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") - } - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -232,7 +475,8 @@ func TestFifo(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -241,22 +485,11 @@ func TestFifo(t *testing.T) { })) defer backend.Close() - var fmtStr string - switch len(tt.args) { - case 0: - fmtStr = `aroute: * -> fifo() -> "%s"` - case 1: - fmtStr = `aroute: * -> fifo(%v) -> "%s"` - case 2: - fmtStr = `aroute: * -> fifo(%v, %v) -> "%s"` - case 3: - fmtStr = `aroute: * -> fifo(%v, %v, "%v") -> "%s"` - default: - t.Fatalf("Test not possible %d >3", len(tt.args)) + if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 { + t.Fatalf("expected one filter, got %d", len(ff)) } - args := append(tt.args, backend.URL) - doc := fmt.Sprintf(fmtStr, args...) 
+ doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) t.Logf("%s", doc) dc, err := testdataclient.NewDoc(doc) @@ -285,14 +518,9 @@ func TestFifo(t *testing.T) { ts := stdlibhttptest.NewServer(pr) defer ts.Close() - reqURL, err := url.Parse(ts.URL) + rsp, err := ts.Client().Get(ts.URL) if err != nil { - t.Fatalf("Failed to parse url %s: %v", ts.URL, err) - } - - rsp, err := http.DefaultClient.Get(reqURL.String()) - if err != nil { - t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err) + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } defer rsp.Body.Close() @@ -300,16 +528,26 @@ func TestFifo(t *testing.T) { t.Fatalf("Failed to get valid response from endpoint: %d", rsp.StatusCode) } - va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) t.Logf("Success [0..1]: %0.2f", va.Success()) t.Logf("requests: %d", va.TotalRequests()) + count200, _ := va.CountStatus(200) + count499, _ := va.CountStatus(0) + count502, _ := va.CountStatus(502) + count503, _ := va.CountStatus(503) + t.Logf("status 200: %d", count200) + t.Logf("status 499: %d", count499) + t.Logf("status 502: %d", count502) + t.Logf("status 503: %d", count503) + got := va.TotalSuccess() want := tt.wantOkRate * float64(va.TotalRequests()) if got < want { t.Fatalf("OK rate too low got 0 { t.Fatal("no OK") @@ -327,71 +565,38 @@ func TestFifo(t *testing.T) { } } -func TestConstantRouteUpdatesFifo(t *testing.T) { +func TestFifoConstantRouteUpdates(t *testing.T) { for _, tt := range []struct { name string - args []interface{} + filter string freq int per time.Duration updateRate time.Duration backendTime time.Duration clientTimeout time.Duration - wantConfig scheduler.Config - wantParseErr bool wantOkRate float64 - epsilon float64 }{ { - name: "fifo simple ok", - args: []interface{}{ - 3, - 5, - "1s", - }, + name: "fifo 
simple ok", + filter: `fifo(3, 5, "1s")`, freq: 20, per: 100 * time.Millisecond, updateRate: 25 * time.Millisecond, backendTime: 1 * time.Millisecond, clientTimeout: time.Second, - wantConfig: scheduler.Config{ - MaxConcurrency: 3, - MaxQueueSize: 5, - Timeout: time.Second, - }, - wantParseErr: false, - wantOkRate: 1.0, - epsilon: 1, + wantOkRate: 1.0, + }, { + name: "fifoWithBody simple ok", + filter: `fifoWithBody(3, 5, "1s")`, + freq: 20, + per: 100 * time.Millisecond, + updateRate: 25 * time.Millisecond, + backendTime: 1 * time.Millisecond, + clientTimeout: time.Second, + wantOkRate: 1.0, }, } { t.Run(tt.name, func(t *testing.T) { - fs := NewFifo() - if fs.Name() != filters.FifoName { - t.Fatalf("Failed to get name got %s want %s", fs.Name(), filters.FifoName) - } - - // no parse error - ff, err := fs.CreateFilter(tt.args) - if err != nil && !tt.wantParseErr { - t.Fatalf("Failed to parse filter: %v", err) - } - if err == nil && tt.wantParseErr { - t.Fatalf("want parse error but have no: %v", err) - } - if tt.wantParseErr { - return - } - - // validate config - if f, ok := ff.(*fifoFilter); ok { - config := f.Config() - if config != tt.wantConfig { - t.Fatalf("Failed to get Config, got: %v, want: %v", config, tt.wantConfig) - } - if f.queue != f.GetQueue() { - t.Fatal("Failed to get expected queue") - } - } - metrics := &metricstest.MockMetrics{} reg := scheduler.RegistryWith(scheduler.Options{ Metrics: metrics, @@ -400,7 +605,8 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { defer reg.Close() fr := make(filters.Registry) - fr.Register(fs) + fr.Register(NewFifo()) + fr.Register(NewFifoWithBody()) backend := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(tt.backendTime) @@ -409,9 +615,11 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { })) defer backend.Close() - args := append(tt.args, backend.URL) - doc := fmt.Sprintf(`aroute: * -> fifo(%v, %v, "%v") -> "%s"`, args...) 
+ if ff := eskip.MustParseFilters(tt.filter); len(ff) != 1 { + t.Fatalf("expected one filter, got %d", len(ff)) + } + doc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) dc, err := testdataclient.NewDoc(doc) if err != nil { t.Fatalf("Failed to create testdataclient: %v", err) @@ -438,14 +646,9 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { ts := stdlibhttptest.NewServer(pr) defer ts.Close() - reqURL, err := url.Parse(ts.URL) - if err != nil { - t.Fatalf("Failed to parse url %s: %v", ts.URL, err) - } - - rsp, err := http.DefaultClient.Get(reqURL.String()) + rsp, err := ts.Client().Get(ts.URL) if err != nil { - t.Fatalf("Failed to get response from %s: %v", reqURL.String(), err) + t.Fatalf("Failed to get response from %s: %v", ts.URL, err) } defer rsp.Body.Close() @@ -455,7 +658,7 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { // run dataclient updates quit := make(chan struct{}) - newDoc := fmt.Sprintf(`aroute: * -> fifo(100, 200, "250ms") -> "%s"`, backend.URL) + newDoc := fmt.Sprintf(`aroute: * -> %s -> "%s"`, tt.filter, backend.URL) go func(q chan<- struct{}, updateRate time.Duration, doc1, doc2 string) { i := 0 for { @@ -475,7 +678,7 @@ func TestConstantRouteUpdatesFifo(t *testing.T) { }(quit, tt.updateRate, doc, newDoc) - va := httptest.NewVegetaAttacker(reqURL.String(), tt.freq, tt.per, tt.clientTimeout) + va := httptest.NewVegetaAttacker(ts.URL, tt.freq, tt.per, tt.clientTimeout) va.Attack(io.Discard, 1*time.Second, tt.name) quit <- struct{}{} diff --git a/filters/scheduler/lifo.go b/filters/scheduler/lifo.go index 60db279a58..717580bcd9 100644 --- a/filters/scheduler/lifo.go +++ b/filters/scheduler/lifo.go @@ -243,7 +243,7 @@ func (l *lifoFilter) Close() error { // increase the number of inflight requests and respond to the caller, // if the bounded queue returns an error. 
Status code by Error: // -// - 503 if jobqueue.ErrQueueFull +// - 503 if jobqueue.ErrStackFull // - 502 if jobqueue.ErrTimeout func (l *lifoFilter) Request(ctx filters.FilterContext) { request(l.GetQueue(), scheduler.LIFOKey, ctx) @@ -255,6 +255,12 @@ func (l *lifoFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoFilter) HandleErrorResponse() bool { + return true +} + func (l *lifoGroupFilter) Group() string { return l.name } @@ -300,6 +306,12 @@ func (l *lifoGroupFilter) Response(ctx filters.FilterContext) { response(scheduler.LIFOKey, ctx) } +// HandleErrorResponse is to opt-in for filters to get called +// Response(ctx) in case of errors via proxy. It has to return true to opt-in. +func (l *lifoGroupFilter) HandleErrorResponse() bool { + return true +} + func request(q *scheduler.Queue, key string, ctx filters.FilterContext) { if q == nil { ctx.Logger().Warnf("Unexpected scheduler.Queue is nil for key %s", key) diff --git a/net/httptest/client.go b/net/httptest/client.go index 9b6b44f06b..778913cc0c 100644 --- a/net/httptest/client.go +++ b/net/httptest/client.go @@ -2,7 +2,6 @@ package httptest import ( "io" - "log" "strconv" "time" @@ -64,13 +63,8 @@ func (atk *VegetaAttacker) Attack(w io.Writer, d time.Duration, name string) { continue } atk.metrics.Add(res) - //metrics.Latencies.Add(res.Latency) } atk.metrics.Close() - // logrus.Info("histogram reporter:") - // histReporter := vegeta.NewHistogramReporter(atk.metrics.Histogram) - // histReporter.Report(os.Stdout) - log.Print("text reporter:") reporter := vegeta.NewTextReporter(atk.metrics) reporter.Report(w) } diff --git a/proxy/context.go b/proxy/context.go index 418d76ed64..468dc4ac98 100644 --- a/proxy/context.go +++ b/proxy/context.go @@ -296,7 +296,9 @@ func (c *context) Split() (filters.FilterContext, error) { } func 
(c *context) Loopback() { - err := c.proxy.do(c) + loopSpan := c.Tracer().StartSpan(c.proxy.tracing.initialOperationName, opentracing.ChildOf(c.ParentSpan().Context())) + defer loopSpan.Finish() + err := c.proxy.do(c, loopSpan) if c.response != nil && c.response.Body != nil { if _, err := io.Copy(io.Discard, c.response.Body); err != nil { c.Logger().Errorf("context: error while discarding remainder response body: %v.", err) diff --git a/proxy/proxy.go b/proxy/proxy.go index 2d2ef831d8..7b6923cb2e 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -40,7 +40,6 @@ import ( "github.com/zalando/skipper/ratelimit" "github.com/zalando/skipper/rfc" "github.com/zalando/skipper/routing" - "github.com/zalando/skipper/scheduler" "github.com/zalando/skipper/tracing" ) @@ -1025,7 +1024,7 @@ func stack() []byte { } } -func (p *Proxy) do(ctx *context) (err error) { +func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { defer func() { if r := recover(); r != nil { p.onPanicSometimes.Do(func() { @@ -1046,29 +1045,6 @@ func (p *Proxy) do(ctx *context) (err error) { return errMaxLoopbacksReached } - // this can be deleted after fixing - // https://github.com/zalando/skipper/issues/1238 problem - // happens if we get proxy errors, for example connect errors, - // which would block responses until fifo() timeouts. 
- defer func() { - stateBag := ctx.StateBag() - - pendingFIFO, _ := stateBag[scheduler.FIFOKey].([]func()) - for _, done := range pendingFIFO { - done() - } - - pendingLIFO, _ := stateBag[scheduler.LIFOKey].([]func()) - for _, done := range pendingLIFO { - done() - } - - // Cleanup state bag to avoid double call of done() - // because do() could be called for loopback backend - delete(stateBag, scheduler.FIFOKey) - delete(stateBag, scheduler.LIFOKey) - }() - // proxy global setting if !ctx.wasExecuted() { if settings, retryAfter := p.limiters.Check(ctx.request); retryAfter > 0 { @@ -1092,6 +1068,7 @@ func (p *Proxy) do(ctx *context) (err error) { p.makeErrorResponse(ctx, errRouteLookupFailed) return errRouteLookupFailed } + parentSpan.SetTag(SkipperRouteIDTag, route.Id) ctx.applyRoute(route, params, p.flags.PreserveHost()) @@ -1110,7 +1087,16 @@ func (p *Proxy) do(ctx *context) (err error) { ctx.ensureDefaultResponse() } else if ctx.route.BackendType == eskip.LoopBackend { loopCTX := ctx.clone() - if err := p.do(loopCTX); err != nil { + loopSpan := tracing.CreateSpan("loopback", ctx.request.Context(), p.tracing.tracer) + p.tracing. + setTag(loopSpan, SpanKindTag, SpanKindServer). 
+ setTag(loopSpan, SkipperRouteIDTag, ctx.route.Id) + p.setCommonSpanInfo(ctx.Request().URL, ctx.Request(), loopSpan) + ctx.parentSpan = loopSpan + + defer loopSpan.Finish() + + if err := p.do(loopCTX, loopSpan); err != nil { // in case of error we have to copy the response in this recursion unwinding ctx.response = loopCTX.response if err != nil { @@ -1466,6 +1452,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx.startServe = time.Now() ctx.tracer = p.tracing.tracer ctx.initialSpan = span + ctx.parentSpan = span defer func() { if ctx.response != nil && ctx.response.Body != nil { @@ -1476,8 +1463,9 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } }() - err := p.do(ctx) + err := p.do(ctx, span) + // writeTimeout() filter if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok { e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d)) if e != nil { @@ -1485,12 +1473,22 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } + // stream response body to client if err != nil { p.errorResponse(ctx, err) } else { p.serveResponse(ctx) } + // fifoWtihBody() filter + if sbf, ok := ctx.StateBag()[filters.FifoWithBodyName]; ok { + if fs, ok := sbf.([]func()); ok { + for i := len(fs) - 1; i >= 0; i-- { + fs[i]() + } + } + } + if ctx.cancelBackendContext != nil { ctx.cancelBackendContext() } diff --git a/proxy/tracing_test.go b/proxy/tracing_test.go index 7962358f82..10c55b96eb 100644 --- a/proxy/tracing_test.go +++ b/proxy/tracing_test.go @@ -84,7 +84,8 @@ func TestTracingIngressSpan(t *testing.T) { }) defer s.Close() - doc := fmt.Sprintf(`hello: Path("/hello") -> setPath("/bye") -> setQuery("void") -> "%s"`, s.URL) + routeID := "ingressRoute" + doc := fmt.Sprintf(`%s: Path("/hello") -> setPath("/bye") -> setQuery("void") -> "%s"`, routeID, s.URL) tracer := mocktracer.New() params := Params{ @@ -126,6 +127,7 @@ func TestTracingIngressSpan(t *testing.T) { verifyTag(t, span, SpanKindTag, 
SpanKindServer) verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, routeID) // to save memory we dropped the URL tag from ingress span //verifyTag(t, span, HTTPUrlTag, "/hello?world") // For server requests there is no scheme://host:port, see https://golang.org/pkg/net/http/#Request verifyTag(t, span, HTTPMethodTag, "GET") @@ -137,6 +139,139 @@ func TestTracingIngressSpan(t *testing.T) { verifyHasTag(t, span, HTTPRemoteIPTag) } +func TestTracingIngressSpanShunt(t *testing.T) { + routeID := "ingressShuntRoute" + doc := fmt.Sprintf(`%s: Path("/hello") -> setPath("/bye") -> setQuery("void") -> status(205) -> `, routeID) + + tracer := mocktracer.New() + params := Params{ + OpenTracing: &OpenTracingParams{ + Tracer: tracer, + }, + Flags: FlagsNone, + } + + t.Setenv("HOSTNAME", "ingress-shunt.tracing.test") + + tp, err := newTestProxyWithParams(doc, params) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + req, err := http.NewRequest("GET", ps.URL+"/hello?world", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("X-Flow-Id", "test-flow-id") + + rsp, err := ps.Client().Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + + // client may get response before proxy finishes span + time.Sleep(10 * time.Millisecond) + + span, ok := findSpan(tracer, "ingress") + if !ok { + t.Fatal("ingress span not found") + } + + verifyTag(t, span, SpanKindTag, SpanKindServer) + verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, routeID) + // to save memory we dropped the URL tag from ingress span + //verifyTag(t, span, HTTPUrlTag, "/hello?world") // For server requests there is no scheme://host:port, see https://golang.org/pkg/net/http/#Request + verifyTag(t, span, HTTPMethodTag, "GET") + verifyTag(t, span, HostnameTag, "ingress-shunt.tracing.test") + verifyTag(t, span, HTTPPathTag, "/hello") + 
verifyTag(t, span, HTTPHostTag, ps.Listener.Addr().String()) + verifyTag(t, span, FlowIDTag, "test-flow-id") + verifyTag(t, span, HTTPStatusCodeTag, uint16(205)) + verifyHasTag(t, span, HTTPRemoteIPTag) +} + +func TestTracingIngressSpanLoopback(t *testing.T) { + shuntRouteID := "ingressShuntRoute" + loop1RouteID := "loop1Route" + loop2RouteID := "loop2Route" + routeIDs := []string{loop2RouteID, loop1RouteID, shuntRouteID} + paths := map[string]string{ + loop2RouteID: "/loop2", + loop1RouteID: "/loop1", + shuntRouteID: "/shunt", + } + + doc := fmt.Sprintf(` +%s: Path("/shunt") -> setPath("/bye") -> setQuery("void") -> status(204) -> ; +%s: Path("/loop1") -> setPath("/shunt") -> ; +%s: Path("/loop2") -> setPath("/loop1") -> ; +`, shuntRouteID, loop1RouteID, loop2RouteID) + + tracer := mocktracer.New() + params := Params{ + OpenTracing: &OpenTracingParams{ + Tracer: tracer, + }, + Flags: FlagsNone, + } + + t.Setenv("HOSTNAME", "ingress-loop.tracing.test") + + tp, err := newTestProxyWithParams(doc, params) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + req, err := http.NewRequest("GET", ps.URL+"/loop2", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("X-Flow-Id", "test-flow-id") + + rsp, err := ps.Client().Do(req) + if err != nil { + t.Fatal(err) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + t.Logf("got response %d", rsp.StatusCode) + + // client may get response before proxy finishes span + time.Sleep(10 * time.Millisecond) + + sp, ok := findSpanByRouteID(tracer, loop2RouteID) + if !ok { + t.Fatalf("span for route %q not found", loop2RouteID) + } + verifyTag(t, sp, HTTPStatusCodeTag, uint16(204)) + + for _, rid := range routeIDs { + span, ok := findSpanByRouteID(tracer, rid) + if !ok { + t.Fatalf("span for route %q not found", rid) + } + verifyTag(t, span, SpanKindTag, SpanKindServer) + verifyTag(t, span, ComponentTag, "skipper") + verifyTag(t, span, SkipperRouteIDTag, 
rid) + verifyTag(t, span, HTTPMethodTag, "GET") + verifyTag(t, span, HostnameTag, "ingress-loop.tracing.test") + verifyTag(t, span, HTTPPathTag, paths[rid]) + verifyTag(t, span, HTTPHostTag, ps.Listener.Addr().String()) + verifyTag(t, span, FlowIDTag, "test-flow-id") + } +} + func TestTracingSpanName(t *testing.T) { traceContent := fmt.Sprintf("%x", md5.New().Sum([]byte(time.Now().String()))) s := startTestServer(nil, 0, func(r *http.Request) { @@ -562,6 +697,15 @@ func findSpan(tracer *mocktracer.MockTracer, name string) (*mocktracer.MockSpan, return nil, false } +func findSpanByRouteID(tracer *mocktracer.MockTracer, routeID string) (*mocktracer.MockSpan, bool) { + for _, s := range tracer.FinishedSpans() { + if s.Tag(SkipperRouteIDTag) == routeID { + return s, true + } + } + return nil, false +} + func verifyTag(t *testing.T, span *mocktracer.MockSpan, name string, expected interface{}) { t.Helper() if got := span.Tag(name); got != expected { diff --git a/routesrv/eskipbytes.go b/routesrv/eskipbytes.go index 620b2415d7..da9f47c593 100644 --- a/routesrv/eskipbytes.go +++ b/routesrv/eskipbytes.go @@ -2,32 +2,69 @@ package routesrv import ( "bytes" + "compress/gzip" "crypto/sha256" "fmt" "net/http" "strconv" + "strings" "sync" "time" ot "github.com/opentracing/opentracing-go" "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/tracing" ) +type responseWriterInterceptor struct { + http.ResponseWriter + statusCode int + bytesWritten int +} + +func (w *responseWriterInterceptor) WriteHeader(statusCode int) { + w.statusCode = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +func (w *responseWriterInterceptor) Header() http.Header { + return w.ResponseWriter.Header() +} + +func (w *responseWriterInterceptor) Write(p []byte) (int, error) { + w.bytesWritten += len(p) + return w.ResponseWriter.Write(p) +} + +// Unwrap will be used by ResponseController, so if they will use that 
+// to get the ResponseWrite for some reason they can do it. +func (w *responseWriterInterceptor) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} + +var ( + _ http.ResponseWriter = &responseWriterInterceptor{} +) + // eskipBytes keeps eskip-formatted routes as a byte slice and // provides synchronized r/w access to them. Additionally it can // serve as an HTTP handler exposing its content. type eskipBytes struct { + mu sync.RWMutex data []byte - etag string + hash string lastModified time.Time initialized bool count int - mu sync.RWMutex - tracer ot.Tracer - now func() time.Time + zw *gzip.Writer + zdata []byte + + tracer ot.Tracer + metrics metrics.Metrics + now func() time.Time } // formatAndSet takes a slice of routes and stores them eskip-formatted @@ -45,18 +82,52 @@ func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initi if updated { e.lastModified = e.now() e.data = data - e.etag = fmt.Sprintf(`"%x"`, sha256.Sum256(e.data)) + e.zdata = e.compressLocked(data) + e.hash = fmt.Sprintf("%x", sha256.Sum256(e.data)) e.count = len(routes) } initialized = !e.initialized e.initialized = true - return len(e.data), e.etag, initialized, updated + return len(e.data), e.hash, initialized, updated } -func (e *eskipBytes) ServeHTTP(w http.ResponseWriter, r *http.Request) { +// compressLocked compresses the data with gzip and returns +// the compressed data or nil if compression fails. +// e.mu must be held. 
+func (e *eskipBytes) compressLocked(data []byte) []byte { + var buf bytes.Buffer + if e.zw == nil { + e.zw = gzip.NewWriter(&buf) + } else { + e.zw.Reset(&buf) + } + if _, err := e.zw.Write(data); err != nil { + return nil + } + if err := e.zw.Close(); err != nil { + return nil + } + return buf.Bytes() +} + +func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) { span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer) defer span.Finish() + start := time.Now() + defer e.metrics.MeasureBackend("routersv", start) + + w := &responseWriterInterceptor{ + ResponseWriter: rw, + statusCode: http.StatusOK, + } + + defer func() { + span.SetTag("status", w.statusCode) + span.SetTag("bytes", w.bytesWritten) + + e.metrics.IncCounter(strconv.Itoa(w.statusCode)) + }() if r.Method != "GET" && r.Method != "HEAD" { w.WriteHeader(http.StatusMethodNotAllowed) @@ -66,17 +137,24 @@ func (e *eskipBytes) ServeHTTP(w http.ResponseWriter, r *http.Request) { e.mu.RLock() count := e.count data := e.data - etag := e.etag + zdata := e.zdata + hash := e.hash lastModified := e.lastModified initialized := e.initialized e.mu.RUnlock() if initialized { - w.Header().Add("Etag", etag) - w.Header().Add("Content-Type", "text/plain; charset=utf-8") - w.Header().Add(routing.RoutesCountName, strconv.Itoa(count)) - - http.ServeContent(w, r, "", lastModified, bytes.NewReader(data)) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set(routing.RoutesCountName, strconv.Itoa(count)) + + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && len(zdata) > 0 { + w.Header().Set("Etag", `"`+hash+`+gzip"`) + w.Header().Set("Content-Encoding", "gzip") + http.ServeContent(w, r, "", lastModified, bytes.NewReader(zdata)) + } else { + w.Header().Set("Etag", `"`+hash+`"`) + http.ServeContent(w, r, "", lastModified, bytes.NewReader(data)) + } } else { w.WriteHeader(http.StatusNotFound) } diff --git a/routesrv/polling.go b/routesrv/polling.go index 
b262e5a4dd..28702e5da2 100644 --- a/routesrv/polling.go +++ b/routesrv/polling.go @@ -8,11 +8,10 @@ import ( "time" ot "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/tracing" ) @@ -26,24 +25,6 @@ const ( LogRoutesUpdated = "routes updated" ) -var ( - pollingStarted = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "polling_started_timestamp", - Help: "UNIX time when the routes polling has started", - }) - routesInitialized = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_initialized_timestamp", - Help: "UNIX time when the first routes were received and stored", - }) - routesUpdated = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_updated_timestamp", - Help: "UNIX time of the last routes update (initial load counts as well)", - }) -) - type poller struct { client routing.DataClient b *eskipBytes @@ -56,8 +37,9 @@ type poller struct { editRoute []*eskip.Editor cloneRoute []*eskip.Clone - // tracer - tracer ot.Tracer + // visibility + tracer ot.Tracer + metrics metrics.Metrics } func (p *poller) poll(wg *sync.WaitGroup) { @@ -66,21 +48,20 @@ func (p *poller) poll(wg *sync.WaitGroup) { log.WithField("timeout", p.timeout).Info(LogPollingStarted) ticker := time.NewTicker(p.timeout) defer ticker.Stop() - pollingStarted.SetToCurrentTime() + p.setGaugeToCurrentTime("polling_started_timestamp") var lastRoutesById map[string]string for { span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer) routes, err := p.client.LoadAll() - routes = p.process(routes) - routesCount := len(routes) switch { case err != nil: log.WithError(err).Error(LogRoutesFetchingFailed) + 
p.metrics.IncCounter("routes.fetch_errors") span.SetTag("error", true) span.LogKV( @@ -89,6 +70,7 @@ func (p *poller) poll(wg *sync.WaitGroup) { ) case routesCount == 0: log.Error(LogRoutesEmpty) + p.metrics.IncCounter("routes.empty") span.SetTag("error", true) span.LogKV( @@ -96,21 +78,23 @@ func (p *poller) poll(wg *sync.WaitGroup) { "message", LogRoutesEmpty, ) case routesCount > 0: - routesBytes, routesEtag, initialized, updated := p.b.formatAndSet(routes) - logger := log.WithFields(log.Fields{"count": routesCount, "bytes": routesBytes, "etag": routesEtag}) + routesBytes, routesHash, initialized, updated := p.b.formatAndSet(routes) + logger := log.WithFields(log.Fields{"count": routesCount, "bytes": routesBytes, "hash": routesHash}) if initialized { logger.Info(LogRoutesInitialized) span.SetTag("routes.initialized", true) - routesInitialized.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.initialized_timestamp") } if updated { logger.Info(LogRoutesUpdated) span.SetTag("routes.updated", true) - routesUpdated.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.updated_timestamp") + p.metrics.UpdateGauge("routes.total", float64(routesCount)) + p.metrics.UpdateGauge("routes.byte", float64(routesBytes)) } span.SetTag("routes.count", routesCount) span.SetTag("routes.bytes", routesBytes) - span.SetTag("routes.etag", routesEtag) + span.SetTag("routes.hash", routesHash) if updated && log.IsLevelEnabled(log.DebugLevel) { routesById := mapRoutes(routes) @@ -154,6 +138,10 @@ func (p *poller) process(routes []*eskip.Route) []*eskip.Route { return routes } +func (p *poller) setGaugeToCurrentTime(name string) { + p.metrics.UpdateGauge(name, (float64(time.Now().UnixNano()) / 1e9)) +} + func mapRoutes(routes []*eskip.Route) map[string]string { byId := make(map[string]string) for _, r := range routes { diff --git a/routesrv/redishandler.go b/routesrv/redishandler.go index 1a4fc10ea9..45ad66e715 100644 --- a/routesrv/redishandler.go +++ b/routesrv/redishandler.go @@ -8,6 
+8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/zalando/skipper/dataclients/kubernetes" + "github.com/zalando/skipper/metrics" ) type RedisHandler struct { @@ -37,10 +38,11 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(address) } -func getRedisAddresses(namespace, name string, kdc *kubernetes.Client) func() ([]byte, error) { +func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { return func() ([]byte, error) { a := kdc.GetEndpointAddresses(namespace, name) log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a) + m.UpdateGauge("redis_endpoints", float64(len(a))) result := RedisEndpoints{} for i := 0; i < len(a); i++ { diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go index d333411199..a4a6aeb1a7 100644 --- a/routesrv/routesrv.go +++ b/routesrv/routesrv.go @@ -9,21 +9,24 @@ import ( "syscall" "time" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/tracing" ) // RouteServer is used to serve eskip-formatted routes, // that originate from the polled data source. type RouteServer struct { - server *http.Server - poller *poller - wg *sync.WaitGroup + metrics metrics.Metrics + server *http.Server + supportServer *http.Server + poller *poller + wg *sync.WaitGroup } // New returns an initialized route server according to the passed options. @@ -31,7 +34,27 @@ type RouteServer struct { // will stay in an uninitialized state, till StartUpdates is called and // in effect data source is queried and routes initialized/updated. 
func New(opts skipper.Options) (*RouteServer, error) { - rs := &RouteServer{} + if opts.PrometheusRegistry == nil { + opts.PrometheusRegistry = prometheus.NewRegistry() + } + + mopt := metrics.Options{ + Format: metrics.PrometheusKind, + Prefix: "routesrv", + PrometheusRegistry: opts.PrometheusRegistry, + EnableDebugGcMetrics: true, + EnableRuntimeMetrics: true, + EnableProfile: opts.EnableProfile, + BlockProfileRate: opts.BlockProfileRate, + MutexProfileFraction: opts.MutexProfileFraction, + MemProfileRate: opts.MemProfileRate, + } + m := metrics.NewMetrics(mopt) + metricsHandler := metrics.NewHandler(mopt, m) + + rs := &RouteServer{ + metrics: m, + } opentracingOpts := opts.OpenTracing if len(opentracingOpts) == 0 { @@ -42,12 +65,25 @@ func New(opts skipper.Options) (*RouteServer, error) { return nil, err } - b := &eskipBytes{tracer: tracer, now: time.Now} - bs := &eskipBytesStatus{b: b} - handler := http.NewServeMux() - handler.Handle("/health", bs) - handler.Handle("/routes", b) - handler.Handle("/metrics", promhttp.Handler()) + b := &eskipBytes{ + tracer: tracer, + metrics: m, + now: time.Now, + } + bs := &eskipBytesStatus{ + b: b, + } + mux := http.NewServeMux() + mux.Handle("/health", bs) + mux.Handle("/routes", b) + supportHandler := http.NewServeMux() + supportHandler.Handle("/metrics", metricsHandler) + supportHandler.Handle("/metrics/", metricsHandler) + + if opts.EnableProfile { + supportHandler.Handle("/debug/pprof", metricsHandler) + supportHandler.Handle("/debug/pprof/", metricsHandler) + } dataclient, err := kubernetes.New(opts.KubernetesDataClientOptions()) if err != nil { @@ -68,13 +104,20 @@ func New(opts skipper.Options) (*RouteServer, error) { if err != nil { return nil, err } - rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient) - handler.Handle("/swarm/redis/shards", rh) + rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, 
dataclient, m) + mux.Handle("/swarm/redis/shards", rh) } rs.server = &http.Server{ Addr: opts.Address, - Handler: handler, + Handler: mux, + ReadTimeout: 1 * time.Minute, + ReadHeaderTimeout: 1 * time.Minute, + } + + rs.supportServer = &http.Server{ + Addr: opts.SupportListener, + Handler: supportHandler, ReadTimeout: 1 * time.Minute, ReadHeaderTimeout: 1 * time.Minute, } @@ -89,6 +132,7 @@ func New(opts skipper.Options) (*RouteServer, error) { cloneRoute: opts.CloneRoute, oauth2Config: oauthConfig, tracer: tracer, + metrics: m, } rs.wg = &sync.WaitGroup{} @@ -114,6 +158,15 @@ func (rs *RouteServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { rs.server.Handler.ServeHTTP(w, r) } +func (rs *RouteServer) startSupportListener() { + if rs.supportServer != nil { + err := rs.supportServer.ListenAndServe() + if err != nil { + log.Errorf("Failed support listener: %v", err) + } + } +} + func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { once := sync.Once{} rs.wg.Add(1) @@ -125,6 +178,12 @@ func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { log.Infof("shutting down the server in %s...", delay) time.Sleep(delay) + if rs.supportServer != nil { + if err := rs.supportServer.Shutdown(context.Background()); err != nil { + log.Error("unable to shut down the support server: ", err) + } + log.Info("supportServer shut down") + } if err := rs.server.Shutdown(context.Background()); err != nil { log.Error("unable to shut down the server: ", err) } @@ -133,21 +192,11 @@ func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { } } -// Run starts a route server set up according to the passed options. -// It is a blocking call designed to be used as a single call/entry point, -// when running the route server as a standalone binary. It returns, when -// the server is closed, which can happen due to server startup errors or -// gracefully handled SIGTERM signal. In case of a server startup error, -// the error is returned as is. 
-func Run(opts skipper.Options) error { - rs, err := New(opts) - if err != nil { - return err - } +func run(rs *RouteServer, opts skipper.Options, sigs chan os.Signal) error { + var err error shutdown := newShutdownFunc(rs) - sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) go func() { <-sigs @@ -156,6 +205,7 @@ func Run(opts skipper.Options) error { rs.StartUpdates() + go rs.startSupportListener() if err = rs.server.ListenAndServe(); err != http.ErrServerClosed { go shutdown(0) } else { @@ -166,3 +216,19 @@ func Run(opts skipper.Options) error { return err } + +// Run starts a route server set up according to the passed options. +// It is a blocking call designed to be used as a single call/entry point, +// when running the route server as a standalone binary. It returns, when +// the server is closed, which can happen due to server startup errors or +// gracefully handled SIGTERM signal. In case of a server startup error, +// the error is returned as is. +func Run(opts skipper.Options) error { + rs, err := New(opts) + if err != nil { + return err + } + sigs := make(chan os.Signal, 1) + return run(rs, opts, sigs) + +} diff --git a/routesrv/routesrv_test.go b/routesrv/routesrv_test.go index c92bb3ccf9..0326932519 100644 --- a/routesrv/routesrv_test.go +++ b/routesrv/routesrv_test.go @@ -2,6 +2,7 @@ package routesrv_test import ( "bytes" + "compress/gzip" "flag" "io" "net/http" @@ -16,6 +17,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" "github.com/zalando/skipper/eskip" @@ -109,6 +112,14 @@ func getRoutes(rs *routesrv.RouteServer) *httptest.ResponseRecorder { return w } +func getHealth(rs *routesrv.RouteServer) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/health", nil) + rs.ServeHTTP(w, r) + + 
return w +} + func getRedisURLs(rs *routesrv.RouteServer) *httptest.ResponseRecorder { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/swarm/redis/shards", nil) @@ -171,6 +182,9 @@ func TestNotInitializedRoutesAreNotServed(t *testing.T) { t.Error("uninitialized routes were served") } wantHTTPCode(t, w, http.StatusNotFound) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusServiceUnavailable) } func TestEmptyRoutesAreNotServed(t *testing.T) { @@ -218,6 +232,9 @@ func TestFetchedRoutesAreServedInEskipFormat(t *testing.T) { t.Errorf("served routes do not reflect kubernetes resources: %s", cmp.Diff(got, want)) } wantHTTPCode(t, w, http.StatusOK) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusNoContent) } func TestRedisEndpointSlices(t *testing.T) { @@ -709,3 +726,93 @@ func TestRoutesWithExplicitLBAlgorithm(t *testing.T) { } wantHTTPCode(t, responseRecorder, http.StatusOK) } + +func TestESkipBytesHandlerGzip(t *testing.T) { + defer tl.Reset() + ks, handler := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml")) + ks.Start() + defer ks.Close() + rs := newRouteServer(t, ks) + + rs.StartUpdates() + defer rs.StopUpdates() + + testGzipResponse := func(t *testing.T, count int) { + // Get plain response + plainResponse := getRoutes(rs) + plainEtag := plainResponse.Header().Get("Etag") + plainContent := plainResponse.Body.Bytes() + + // Get gzip response + gzipResponse := getRoutesWithRequestHeadersSetting(rs, map[string]string{"Accept-Encoding": "gzip"}) + assert.Equal(t, http.StatusOK, gzipResponse.Code) + assert.Equal(t, "text/plain; charset=utf-8", gzipResponse.Header().Get("Content-Type")) + assert.Equal(t, "gzip", gzipResponse.Header().Get("Content-Encoding")) + assert.Equal(t, strconv.Itoa(count), gzipResponse.Header().Get("X-Count")) + + gzipEtag := gzipResponse.Header().Get("Etag") + assert.NotEqual(t, plainEtag, gzipEtag, "gzip Etag should differ from plain Etag") + + zr, err := gzip.NewReader(gzipResponse.Body) + 
require.NoError(t, err) + defer zr.Close() + + gzipContent, err := io.ReadAll(zr) + require.NoError(t, err) + + assert.Equal(t, plainContent, gzipContent, "gzip content should be equal to plain content") + + // Get gzip response using Etag + gzipEtagResponse := getRoutesWithRequestHeadersSetting(rs, map[string]string{"If-None-Match": gzipEtag, "Accept-Encoding": "gzip"}) + + assert.Equal(t, http.StatusNotModified, gzipEtagResponse.Code) + // RFC 7232 section 4.1: + assert.Empty(t, gzipEtagResponse.Header().Get("Content-Type")) + assert.Empty(t, gzipEtagResponse.Header().Get("Content-Length")) + assert.Empty(t, gzipEtagResponse.Header().Get("Content-Encoding")) + assert.Equal(t, strconv.Itoa(count), gzipEtagResponse.Header().Get("X-Count")) + assert.Empty(t, gzipEtagResponse.Body.String()) + } + + require.NoError(t, tl.WaitFor(routesrv.LogRoutesInitialized, waitTimeout)) + testGzipResponse(t, 3) + + handler.set(newKubeAPI(t, loadKubeYAML(t, "testdata/lb-target-single.yaml"))) + require.NoError(t, tl.WaitForN(routesrv.LogRoutesUpdated, 2, waitTimeout)) + + testGzipResponse(t, 2) +} + +func TestESkipBytesHandlerGzipServedForDefaultClient(t *testing.T) { + defer tl.Reset() + ks, _ := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml")) + ks.Start() + defer ks.Close() + + rs, err := routesrv.New(skipper.Options{ + SourcePollTimeout: pollInterval, + KubernetesURL: ks.URL, + }) + require.NoError(t, err) + + rs.StartUpdates() + defer rs.StopUpdates() + + require.NoError(t, tl.WaitFor(routesrv.LogRoutesInitialized, waitTimeout)) + + ts := httptest.NewServer(rs) + defer ts.Close() + + resp, err := ts.Client().Get(ts.URL + "/routes") + require.NoError(t, err) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.True(t, resp.Uncompressed, "expected uncompressed body") + + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + routes, err := eskip.Parse(string(b)) + require.NoError(t, err) + assert.Len(t, routes, 3) +} diff --git 
a/routesrv/shutdown_test.go b/routesrv/shutdown_test.go new file mode 100644 index 0000000000..4ce164be6c --- /dev/null +++ b/routesrv/shutdown_test.go @@ -0,0 +1,187 @@ +package routesrv + +import ( + "bytes" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/zalando/skipper" + "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" +) + +type muxHandler struct { + handler http.Handler + mu sync.RWMutex +} + +func (m *muxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.mu.RLock() + defer m.mu.RUnlock() + + m.handler.ServeHTTP(w, r) +} + +func newKubeAPI(t *testing.T, specs ...io.Reader) http.Handler { + t.Helper() + api, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, specs...) + if err != nil { + t.Fatalf("cannot initialize kubernetes api: %s", err) + } + return api +} +func newKubeServer(t *testing.T, specs ...io.Reader) (*httptest.Server, *muxHandler) { + t.Helper() + handler := &muxHandler{handler: newKubeAPI(t, specs...)} + return httptest.NewUnstartedServer(handler), handler +} + +func loadKubeYAML(t *testing.T, path string) io.Reader { + t.Helper() + y, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to open kubernetes resources fixture %s: %v", path, err) + } + return bytes.NewBuffer(y) +} + +func findAddress() (string, error) { + l, err := net.ListenTCP("tcp6", &net.TCPAddr{}) + if err != nil { + return "", err + } + + defer l.Close() + return l.Addr().String(), nil +} + +func TestServerShutdownHTTP(t *testing.T) { + ks, _ := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml")) + defer ks.Close() + ks.Start() + + o := skipper.Options{ + KubernetesURL: "http://" + ks.Listener.Addr().String(), + SourcePollTimeout: 500 * time.Millisecond, + } + const shutdownDelay = 1 * time.Second + + address, err := findAddress() + if err != nil { + t.Fatalf("Failed to find address: %v", err) + } + supportAddress, err := 
findAddress() + if err != nil { + t.Fatalf("Failed to find supportAddress: %v", err) + } + + o.Address, o.SupportListener, o.WaitForHealthcheckInterval = address, supportAddress, shutdownDelay + baseURL := "http://" + address + supportBaseURL := "http://" + supportAddress + testEndpoints := []string{baseURL + "/routes", supportBaseURL + "/metrics"} + + t.Logf("kube endpoint: %q", o.KubernetesURL) + for _, u := range testEndpoints { + t.Logf("test endpoint: %q", u) + } + + rs, err := New(o) + if err != nil { + t.Fatalf("Failed to create a routesrv: %v", err) + } + + time.Sleep(o.SourcePollTimeout * 2) + + cli := http.Client{ + Timeout: time.Second, + } + rsp, err := cli.Get(o.KubernetesURL + "/api/v1/services") + if err != nil { + t.Fatalf("Failed to get %q: %v", o.KubernetesURL, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get status OK for %q: %d", o.KubernetesURL, rsp.StatusCode) + } + + sigs := make(chan os.Signal, 1) + + errCh := make(chan error) + go func() { + err := run(rs, o, sigs) + if err != nil { + errCh <- err + } + }() + + // make sure we started all listeners correctly + for i := 0; i < 5; i++ { + var ( + err error + rsp *http.Response + ) + + for _, u := range testEndpoints { + rsp, err = http.DefaultClient.Get(u) + if err != nil { + err = fmt.Errorf("failed to get %q: %v", u, err) + time.Sleep(10 * time.Millisecond) + continue + } + if rsp.StatusCode != 200 { + err = fmt.Errorf("failed to get expected status code 200 for %q, got: %d", u, rsp.StatusCode) + + time.Sleep(10 * time.Millisecond) + continue + } + err = nil + } + if i == 4 && err != nil { + t.Fatalf("Failed to get %v", err) + } + } + + // initiate shutdown + sigs <- syscall.SIGTERM + + // test that we can fetch even within termination + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + rsp, err := http.DefaultClient.Get(u) + if err != nil { + t.Fatalf("Failed to get %q after SIGTERM: %v", u, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get 
expected status code 200 for %q after SIGTERM, got: %d", u, rsp.StatusCode) + } + } + + // test that we get connection refused after shutdown + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + _, err = http.DefaultClient.Get(u) + switch err { + case nil: + t.Fatalf("Failed to get error as expected: %q", u) + default: + if e := err.Error(); !strings.Contains(e, "refused") { + t.Fatalf("Failed to get connection refused, got: %s", e) + } + } + } + + select { + case err := <-errCh: + t.Fatalf("Failed to shutdown: %v", err) + default: + } +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5da0dd803e..e2e7de35e4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -155,6 +155,20 @@ func (fq *fifoQueue) wait(ctx context.Context) (func(), error) { cnt := fq.counter fq.mu.RUnlock() + // check request context expired + // https://github.com/golang/go/issues/63615 + if err := ctx.Err(); err != nil { + switch err { + case context.DeadlineExceeded: + return nil, ErrQueueTimeout + case context.Canceled: + return nil, ErrClientCanceled + default: + // does not exist yet in Go stdlib as of Go1.18.4 + return nil, err + } + } + // handle queue all := cnt.Add(1) // queue full? diff --git a/skipper.go b/skipper.go index 97ff66990f..eb75610a60 100644 --- a/skipper.go +++ b/skipper.go @@ -1584,6 +1584,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { block.NewBlock(o.MaxMatcherBufferSize), block.NewBlockHex(o.MaxMatcherBufferSize), auth.NewBearerInjector(sp), + auth.NewSetRequestHeaderFromSecret(sp), auth.NewJwtValidationWithOptions(tio), auth.TokenintrospectionWithOptions(auth.NewOAuthTokenintrospectionAnyClaims, tio), auth.TokenintrospectionWithOptions(auth.NewOAuthTokenintrospectionAllClaims, tio),