Skip to content

Commit

Permalink
Support easegress filters and resilience in k8s gateway via extension…
Browse files Browse the repository at this point in the history
…Ref (#1140)

* add custom resource for filter spec

* add extensionref for all filters and resiliences

* update doc

* update doc

* add resilience to proxy

* add log

* add resilience policy to proxy filter
  • Loading branch information
suchen-sci authored Nov 22, 2023
1 parent 84ad751 commit 54350cb
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 5 deletions.
129 changes: 129 additions & 0 deletions docs/04.Cloud-Native/4.2.Gateway-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [Deploy Gateway API Objects](#deploy-gateway-api-objects)
- [Expose The Listener Port](#expose-the-listener-port)
- [Verification](#verification)
- [Extensions](#extensions)


The `GatewayController` is an implementation of
Expand Down Expand Up @@ -48,6 +49,10 @@ rules:
- apiGroups: ["gateway.networking.k8s.io"]
resources: ["gatewayclasses/status", "gateways/status"]
verbs: ["update"]
# used for extensions
- apiGroups: ["easegress.megaease.com"]
resources: ["filterspecs"]
verbs: ["get", "watch", "list"]

---
apiVersion: v1
Expand Down Expand Up @@ -355,3 +360,127 @@ Hello, world!
Version: 2.0.0
Hostname: hello-deployment-7855bc9747-n4qsx
```

### Extensions

To leverage additional Easegress features (such as more filters and resilience policies) with the Kubernetes Gateway API, you can use `extensionRef`s in `HTTPRoute`.


First, you need to create custom resources:

```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: filterspecs.easegress.megaease.com
spec:
group: easegress.megaease.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
name:
type: string
kind:
type: string
spec:
type: string
scope: Namespaced
names:
plural: filterspecs
singular: filterspec
kind: FilterSpec
```

Next, define some custom resources:

```yaml
apiVersion: easegress.megaease.com/v1
kind: FilterSpec
metadata:
name: rate-limiter
spec:
name: rate-limiter
kind: RateLimiter
spec: |
policies:
- name: policy
limitRefreshPeriod: 1000ms
limitForPeriod: 1
defaultPolicyRef: policy
urls:
- url:
prefix: /
policyRef: policy
---
apiVersion: easegress.megaease.com/v1
kind: FilterSpec
metadata:
name: circuit-breaker
spec:
name: circuit-breaker
kind: CircuitBreaker
spec: |
slidingWindowType: COUNT_BASED
failureRateThreshold: 50
slidingWindowSize: 100
slowCallRateThreshold: 60
slowCallDurationThreshold: 30s
minimumNumberOfCalls: 10
waitDurationInOpenState: 2m
maxWaitDurationInHalfOpenState: 1m
permittedNumberOfCallsInHalfOpenState: 10
```

Here, we define a `RateLimiter` filter and a `CircuitBreaker` resilience policy.

These can be referenced in an `HTTPRoute`:

```yaml
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: example-route-3
spec:
parentRefs:
- kind: Gateway
name: example-gateway
sectionName: example-listener
hostnames:
- megaease.com
rules:
- matches:
- path:
value: /3
filters:
- type: ExtensionRef
extensionRef:
group: "easegress.megaease.com"
kind: "FilterSpec"
name: "rate-limiter"
- type: ExtensionRef
extensionRef:
group: "easegress.megaease.com"
kind: "FilterSpec"
name: "circuit-breaker"
backendRefs:
- name: hello-service
port: 60002
```

With this setup, the `rate-limiter` and `circuit-breaker` from `FilterSpec` in the `easegress.megaease.com` group are utilized in the `HTTPRoute`. In fact, all Easegress filters and resilience policies can be extended in this manner.

The execution order of these filters is as follows:

- Filters in `extensionRef`, except for `ResponseAdaptor` and `ResponseBuilder`, in the order of their definition.
- Kubernetes built-in filters, such as `RequestHeaderModifier`.
- `ResponseAdaptor` and `ResponseBuilder`, if they exist.
4 changes: 2 additions & 2 deletions docs/07.Reference/7.02.Filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,9 @@ urls:

| Name | Type | Description | Required |
| ---------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------- |
| policies | [][urlrule.URLRule](#urlruleURLRule) | Policy definitions | Yes |
| policies | [][ratelimiter.Policy](#ratelimiter.Policy) | Policy definitions | Yes |
| defaultPolicyRef | string | The default policy, if no `policyRef` is configured in one of the `urls`, it uses this policy | No |
| urls | [][resilience.URLRule](#resilienceURLRule) | An array of request match criteria and policy to apply on matched requests. Note that a standalone RateLimiter instance is created for each item of the array, even two or more items can refer to the same policy | Yes |
| urls | [][urlrule.URLRule](#urlrule.URLRule) | An array of request match criteria and policy to apply on matched requests. Note that a standalone RateLimiter instance is created for each item of the array, even two or more items can refer to the same policy | Yes |

### Results

Expand Down
50 changes: 50 additions & 0 deletions pkg/object/gatewaycontroller/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/megaease/easegress/v2/pkg/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
corev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -123,6 +125,8 @@ type k8sClient struct {
gwcs *gwclientset.Clientset
gwFactory gwinformers.SharedInformerFactory

dc *dynamic.DynamicClient

eventCh chan interface{}
}

Expand Down Expand Up @@ -176,6 +180,12 @@ func newK8sClient(masterURL string, kubeConfig string) (*k8sClient, error) {
return nil, err
}

dc, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Errorf("error building dynamic clientset: %s", err.Error())
return nil, err
}

err = checkKubernetesVersion(cfg)
if err != nil {
logger.Errorf("error checking kubernetes version: %s", err.Error())
Expand All @@ -185,6 +195,7 @@ func newK8sClient(masterURL string, kubeConfig string) (*k8sClient, error) {
return &k8sClient{
kcs: kcs,
gwcs: gwcs,
dc: dc,
eventCh: make(chan interface{}, 1),
}, nil
}
Expand Down Expand Up @@ -467,3 +478,42 @@ func (c *k8sClient) isNamespaceWatched(ns string) bool {
}
return false
}

// FilterSpecFromCR is filter spec from kubernetes custom resource.
type FilterSpecFromCR struct {
Name string
Kind string
Spec string
}

var FilterSpecGVR = schema.GroupVersionResource{
Group: "easegress.megaease.com",
Version: "v1",
Resource: "filterspecs",
}

// GetFilterSpecFromCustomResource get filter spec from kubernetes custom resource.
func (c *k8sClient) GetFilterSpecFromCustomResource(namespace string, objName string) (*FilterSpecFromCR, error) {
cr, err := c.dc.Resource(FilterSpecGVR).Namespace(namespace).Get(context.Background(), objName, metav1.GetOptions{})
if err != nil {
return nil, err
}
crSpec := cr.Object["spec"].(map[string]interface{})
name, ok := crSpec["name"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string name field", FilterSpecGVR, objName)
}
kind, ok := crSpec["kind"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string kind field", FilterSpecGVR, objName)
}
spec, ok := crSpec["spec"].(string)
if !ok {
return nil, fmt.Errorf("custom resource %v/%s does not have string spec field", FilterSpecGVR, objName)
}
return &FilterSpecFromCR{
Name: name,
Kind: kind,
Spec: spec,
}, nil
}
80 changes: 77 additions & 3 deletions pkg/object/gatewaycontroller/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"

"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/filters/builder"
"github.com/megaease/easegress/v2/pkg/filters/proxies"
"github.com/megaease/easegress/v2/pkg/filters/proxies/httpproxy"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/megaease/easegress/v2/pkg/object/httpserver/routers"
"github.com/megaease/easegress/v2/pkg/object/pipeline"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot/httpheader"
"github.com/megaease/easegress/v2/pkg/resilience"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/codectool"
"github.com/megaease/easegress/v2/pkg/util/pathadaptor"
Expand All @@ -53,6 +55,8 @@ type pipelineSpecBuilder struct {
Name string `json:"name"`
pipeline.Spec `json:",inline"`

filters []map[string]interface{} `json:"-"`
resilience []map[string]interface{} `json:"-"`
reqAdaptor *builder.RequestAdaptorSpec `json:"-"`
redirector *redirector.Spec `json:"-"`
respAdaptor *builder.ResponseAdaptorSpec `json:"-"`
Expand All @@ -67,13 +71,26 @@ type httpServerSpecBuilder struct {

func newPipelineSpecBuilder(name string) *pipelineSpecBuilder {
return &pipelineSpecBuilder{
Kind: pipeline.Kind,
Name: name,
Spec: pipeline.Spec{},
Kind: pipeline.Kind,
Name: name,
Spec: pipeline.Spec{},
filters: make([]map[string]interface{}, 0),
resilience: []map[string]interface{}{},
}
}

func (b *pipelineSpecBuilder) jsonConfig() string {
if len(b.filters) > 0 {
for _, f := range b.filters {
kind := f["kind"]
if kind == builder.ResponseAdaptorKind || kind == builder.ResponseBuilderKind {
continue
}
b.Filters = append(b.Filters, f)
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: f["name"].(string)})
}
}

if b.reqAdaptor != nil {
b.reqAdaptor.BaseSpec.MetaSpec.Name = "requestAdaptor"
b.reqAdaptor.BaseSpec.MetaSpec.Kind = builder.RequestAdaptorKind
Expand All @@ -97,6 +114,15 @@ func (b *pipelineSpecBuilder) jsonConfig() string {
if b.proxy != nil {
b.proxy.BaseSpec.MetaSpec.Name = "proxy"
b.proxy.BaseSpec.MetaSpec.Kind = httpproxy.Kind
for i := range b.proxy.Pools {
for _, r := range b.resilience {
if r["kind"] == resilience.CircuitBreakerKind.Name {
b.proxy.Pools[i].CircuitBreakerPolicy = r["name"].(string)
} else if r["kind"] == resilience.RetryKind.Name {
b.proxy.Pools[i].RetryPolicy = r["name"].(string)
}
}
}
buf, _ := codectool.MarshalJSON(b.proxy)
m := map[string]any{}
codectool.UnmarshalJSON(buf, &m)
Expand All @@ -114,13 +140,48 @@ func (b *pipelineSpecBuilder) jsonConfig() string {
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: b.respAdaptor.Name()})
}

if len(b.filters) > 0 {
for _, f := range b.filters {
kind := f["kind"]
if kind == builder.ResponseAdaptorKind || kind == builder.ResponseBuilderKind {
b.Filters = append(b.Filters, f)
b.Flow = append(b.Flow, pipeline.FlowNode{FilterName: f["name"].(string)})
}
}
}

b.Resilience = b.resilience

buf, err := codectool.MarshalJSON(b)
if err != nil {
logger.Errorf("BUG: marshal %#v to json failed: %v", b, err)
}
return string(buf)
}

func (b *pipelineSpecBuilder) addExtensionRef(spec *FilterSpecFromCR) {
data := map[string]interface{}{}
err := codectool.UnmarshalYAML([]byte(spec.Spec), &data)
if err != nil {
logger.Errorf("unmarshal filter spec %v failed: %v", spec, err)
return
}
data["name"] = spec.Name
data["kind"] = spec.Kind

if spec.Kind == "CircuitBreaker" || spec.Kind == "Retry" {
b.resilience = append(b.resilience, data)
return
}

kind := filters.GetKind(spec.Kind)
if kind == nil {
logger.Errorf("unknown filter kind %s in extensionRef", spec.Kind)
return
}
b.filters = append(b.filters, data)
}

func (b *pipelineSpecBuilder) addRequestHeaderModifier(f *gwapis.HTTPHeaderFilter) {
if b.reqAdaptor == nil {
b.reqAdaptor = &builder.RequestAdaptorSpec{}
Expand Down Expand Up @@ -273,6 +334,7 @@ func (st *specTranslator) pipelineSpecs() map[string]*supervisor.Spec {

for _, sb := range st.pipelines {
cfg := sb.jsonConfig()
logger.Debugf("pipeline spec: %s", cfg)
spec, err := supervisor.NewSpec(cfg)
if err != nil {
logger.Errorf("failed to build pipeline spec: %v", err)
Expand Down Expand Up @@ -496,6 +558,18 @@ func (st *specTranslator) translatePipeline(ns, name string, r *gwapis.HTTPRoute
}
case gwapis.HTTPRouteFilterResponseHeaderModifier:
sb.addResponseHeaderModifier(f.ResponseHeaderModifier)
case gwapis.HTTPRouteFilterExtensionRef:
g := string(f.ExtensionRef.Group)
if g != FilterSpecGVR.Group {
logger.Errorf("extension group %s is not supported, only support %s", g, FilterSpecGVR.Group)
continue
}
filterSpec, err := st.k8sClient.GetFilterSpecFromCustomResource(ns, string(f.ExtensionRef.Name))
if err != nil {
logger.Errorf("failed to get filter spec %s/%s/%s, %v", ns, FilterSpecGVR.Group, f.ExtensionRef.Name, err)
continue
}
sb.addExtensionRef(filterSpec)
}
}

Expand Down

0 comments on commit 54350cb

Please sign in to comment.