Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support easegress filters and resilience in k8s gateway via extensionRef #1140

Merged
merged 7 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions docs/04.Cloud-Native/4.2.Gateway-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [Deploy Gateway API Objects](#deploy-gateway-api-objects)
- [Expose The Listener Port](#expose-the-listener-port)
- [Verification](#verification)
- [Extensions](#extensions)


The `GatewayController` is an implementation of
Expand Down Expand Up @@ -48,6 +49,10 @@ rules:
- apiGroups: ["gateway.networking.k8s.io"]
resources: ["gatewayclasses/status", "gateways/status"]
verbs: ["update"]
# used for extensions
- apiGroups: ["easegress.megaease.com"]
resources: ["filterspecs"]
verbs: ["get", "watch", "list"]

---
apiVersion: v1
Expand Down Expand Up @@ -355,3 +360,127 @@ Hello, world!
Version: 2.0.0
Hostname: hello-deployment-7855bc9747-n4qsx
```

### Extensions

To leverage additional Easegress features (such as more filters and resilience policies) with the Kubernetes Gateway API, you can use `extensionRef`s in `HTTPRoute`.


First, you need to create custom resources:

```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: filterspecs.easegress.megaease.com
spec:
group: easegress.megaease.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
name:
type: string
kind:
type: string
spec:
type: string
scope: Namespaced
names:
plural: filterspecs
singular: filterspec
kind: FilterSpec
```

Next, define some custom resources:

```yaml
apiVersion: easegress.megaease.com/v1
kind: FilterSpec
metadata:
name: rate-limiter
spec:
name: rate-limiter
kind: RateLimiter
spec: |
policies:
- name: policy
limitRefreshPeriod: 1000ms
limitForPeriod: 1
defaultPolicyRef: policy
urls:
- url:
prefix: /
policyRef: policy

---

apiVersion: easegress.megaease.com/v1
kind: FilterSpec
metadata:
name: circuit-breaker
spec:
name: circuit-breaker
kind: CircuitBreaker
spec: |
slidingWindowType: COUNT_BASED
failureRateThreshold: 50
slidingWindowSize: 100
slowCallRateThreshold: 60
slowCallDurationThreshold: 30s
minimumNumberOfCalls: 10
waitDurationInOpenState: 2m
maxWaitDurationInHalfOpenState: 1m
permittedNumberOfCallsInHalfOpenState: 10
```

Here, we define a `RateLimiter` filter and a `CircuitBreaker` resilience policy.

These can be referenced in an `HTTPRoute`:

```yaml
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: example-route-3
spec:
parentRefs:
- kind: Gateway
name: example-gateway
sectionName: example-listener
hostnames:
- megaease.com
rules:
- matches:
- path:
value: /3
filters:
- type: ExtensionRef
extensionRef:
group: "easegress.megaease.com"
kind: "FilterSpec"
name: "rate-limiter"
- type: ExtensionRef
extensionRef:
group: "easegress.megaease.com"
kind: "FilterSpec"
name: "circuit-breaker"
backendRefs:
- name: hello-service
port: 60002
```

With this setup, the `rate-limiter` and `circuit-breaker` from `FilterSpec` in the `easegress.megaease.com` group are utilized in the `HTTPRoute`. In fact, all Easegress filters and resilience policies can be extended in this manner.

The execution order of these filters is as follows:

- Filters in `extensionRef`, except for `ResponseAdaptor` and `ResponseBuilder`, in the order of their definition.
- Kubernetes built-in filters, such as `RequestHeaderModifier`.
- `ResponseAdaptor` and `ResponseBuilder`, if they exist.
4 changes: 2 additions & 2 deletions docs/07.Reference/7.02.Filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,9 @@ urls:

| Name | Type | Description | Required |
| ---------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------- |
| policies | [][urlrule.URLRule](#urlruleURLRule) | Policy definitions | Yes |
| policies | [][ratelimiter.Policy](#ratelimiter.Policy) | Policy definitions | Yes |
| defaultPolicyRef | string | The default policy, if no `policyRef` is configured in one of the `urls`, it uses this policy | No |
| urls | [][resilience.URLRule](#resilienceURLRule) | An array of request match criteria and policy to apply on matched requests. Note that a standalone RateLimiter instance is created for each item of the array, even two or more items can refer to the same policy | Yes |
| urls | [][urlrule.URLRule](#urlrule.URLRule) | An array of request match criteria and policy to apply on matched requests. Note that a standalone RateLimiter instance is created for each item of the array, even two or more items can refer to the same policy | Yes |

### Results

Expand Down
50 changes: 50 additions & 0 deletions pkg/object/gatewaycontroller/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/megaease/easegress/v2/pkg/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
corev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -123,6 +125,8 @@ type k8sClient struct {
gwcs *gwclientset.Clientset
gwFactory gwinformers.SharedInformerFactory

dc *dynamic.DynamicClient

eventCh chan interface{}
}

Expand Down Expand Up @@ -176,6 +180,12 @@ func newK8sClient(masterURL string, kubeConfig string) (*k8sClient, error) {
return nil, err
}

dc, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Errorf("error building dynamic clientset: %s", err.Error())
return nil, err
}

err = checkKubernetesVersion(cfg)
if err != nil {
logger.Errorf("error checking kubernetes version: %s", err.Error())
Expand All @@ -185,6 +195,7 @@ func newK8sClient(masterURL string, kubeConfig string) (*k8sClient, error) {
return &k8sClient{
kcs: kcs,
gwcs: gwcs,
dc: dc,
eventCh: make(chan interface{}, 1),
}, nil
}
Expand Down Expand Up @@ -467,3 +478,42 @@ func (c *k8sClient) isNamespaceWatched(ns string) bool {
}
return false
}

// FilterSpecFromCR is filter spec from kubernetes custom resource.
type FilterSpecFromCR struct {
Name string
Kind string
Spec string
}

var FilterSpecGVR = schema.GroupVersionResource{
Group: "easegress.megaease.com",
Version: "v1",
Resource: "filterspecs",
}

// GetFilterSpecFromCustomResource get filter spec from kubernetes custom resource.
func (c *k8sClient) GetFilterSpecFromCustomResource(namespace string, objName string) (*FilterSpecFromCR, error) {
cr, err := c.dc.Resource(FilterSpecGVR).Namespace(namespace).Get(context.Background(), objName, metav1.GetOptions{})
if err != nil {
return nil, err
}
crSpec := cr.Object["spec"].(map[string]interface{})
name, ok := crSpec["name"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string name field", FilterSpecGVR, objName)
}
kind, ok := crSpec["kind"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string kind field", FilterSpecGVR, objName)
}
spec, ok := crSpec["spec"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string spec field", FilterSpecGVR, objName)
}
return &FilterSpecFromCR{
Name: name,
Kind: kind,
Spec: spec,
}, nil
}
80 changes: 77 additions & 3 deletions pkg/object/gatewaycontroller/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"

"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/filters/builder"
"github.com/megaease/easegress/v2/pkg/filters/proxies"
"github.com/megaease/easegress/v2/pkg/filters/proxies/httpproxy"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/megaease/easegress/v2/pkg/object/httpserver/routers"
"github.com/megaease/easegress/v2/pkg/object/pipeline"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/v2/pkg/resilience"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/codectool"
"github.com/megaease/easegress/v2/pkg/util/pathadaptor"
Expand All @@ -53,6 +55,8 @@ type pipelineSpecBuilder struct {
Name string `json:"name"`
pipeline.Spec `json:",inline"`

filters []map[string]interface{} `json:"-"`
resilience []map[string]interface{} `json:"-"`
reqAdaptor *builder.RequestAdaptorSpec `json:"-"`
redirector *redirector.Spec `json:"-"`
respAdaptor *builder.ResponseAdaptorSpec `json:"-"`
Expand All @@ -67,13 +71,26 @@ type httpServerSpecBuilder struct {

func newPipelineSpecBuilder(name string) *pipelineSpecBuilder {
return &pipelineSpecBuilder{
Kind: pipeline.Kind,
Name: name,
Spec: pipeline.Spec{},
Kind: pipeline.Kind,
Name: name,
Spec: pipeline.Spec{},
filters: make([]map[string]interface{}, 0),
resilience: []map[string]interface{}{},
}
}

func (b *pipelineSpecBuilder) jsonConfig() string {
if len(b.filters) > 0 {
for _, f := range b.filters {
kind := f["kind"]
if kind == builder.ResponseAdaptorKind || kind == builder.ResponseBuilderKind {
continue
}
b.Filters = append(b.Filters, f)
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: f["name"].(string)})
}
}

if b.reqAdaptor != nil {
b.reqAdaptor.BaseSpec.MetaSpec.Name = "requestAdaptor"
b.reqAdaptor.BaseSpec.MetaSpec.Kind = builder.RequestAdaptorKind
Expand All @@ -97,6 +114,15 @@ func (b *pipelineSpecBuilder) jsonConfig() string {
if b.proxy != nil {
b.proxy.BaseSpec.MetaSpec.Name = "proxy"
b.proxy.BaseSpec.MetaSpec.Kind = httpproxy.Kind
for i := range b.proxy.Pools {
for _, r := range b.resilience {
if r["kind"] == resilience.CircuitBreakerKind.Name {
b.proxy.Pools[i].CircuitBreakerPolicy = r["name"].(string)
} else if r["kind"] == resilience.RetryKind.Name {
b.proxy.Pools[i].RetryPolicy = r["name"].(string)
}
}
}
buf, _ := codectool.MarshalJSON(b.proxy)
m := map[string]any{}
codectool.UnmarshalJSON(buf, &m)
Expand All @@ -114,13 +140,48 @@ func (b *pipelineSpecBuilder) jsonConfig() string {
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: b.respAdaptor.Name()})
}

if len(b.filters) > 0 {
for _, f := range b.filters {
kind := f["kind"]
if kind == builder.ResponseAdaptorKind || kind == builder.ResponseBuilderKind {
b.Filters = append(b.Filters, f)
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: f["name"].(string)})
}
}
}

b.Resilience = b.resilience

buf, err := codectool.MarshalJSON(b)
if err != nil {
logger.Errorf("BUG: marshal %#v to json failed: %v", b, err)
}
return string(buf)
}

func (b *pipelineSpecBuilder) addExtensionRef(spec *FilterSpecFromCR) {
data := map[string]interface{}{}
err := codectool.UnmarshalYAML([]byte(spec.Spec), &data)
if err != nil {
logger.Errorf("unmarshal filter spec %v failed: %v", spec, err)
return
}
data["name"] = spec.Name
data["kind"] = spec.Kind

if spec.Kind == "CircuitBreaker" || spec.Kind == "Retry" {
b.resilience = append(b.resilience, data)
return
}

kind := filters.GetKind(spec.Kind)
if kind == nil {
logger.Errorf("unknown filter kind %s in extensionRef", spec.Kind)
return
}
b.filters = append(b.filters, data)
}

func (b *pipelineSpecBuilder) addRequestHeaderModifier(f *gwapis.HTTPHeaderFilter) {
if b.reqAdaptor == nil {
b.reqAdaptor = &builder.RequestAdaptorSpec{}
Expand Down Expand Up @@ -273,6 +334,7 @@ func (st *specTranslator) pipelineSpecs() map[string]*supervisor.Spec {

for _, sb := range st.pipelines {
cfg := sb.jsonConfig()
logger.Debugf("pipeline spec: %s", cfg)
spec, err := supervisor.NewSpec(cfg)
if err != nil {
logger.Errorf("failed to build pipeline spec: %v", err)
Expand Down Expand Up @@ -496,6 +558,18 @@ func (st *specTranslator) translatePipeline(ns, name string, r *gwapis.HTTPRoute
}
case gwapis.HTTPRouteFilterResponseHeaderModifier:
sb.addResponseHeaderModifier(f.ResponseHeaderModifier)
case gwapis.HTTPRouteFilterExtensionRef:
g := string(f.ExtensionRef.Group)
if g != FilterSpecGVR.Group {
logger.Errorf("extension group %s is not supported, only support %s", g, FilterSpecGVR.Group)
continue
}
filterSpec, err := st.k8sClient.GetFilterSpecFromCustomResource(ns, string(f.ExtensionRef.Name))
if err != nil {
logger.Errorf("failed to get filter spec %s/%s/%s, %v", ns, FilterSpecGVR.Group, f.ExtensionRef.Name, err)
continue
}
sb.addExtensionRef(filterSpec)
}
}

Expand Down
Loading