As the diagram shows, the cluster synchronizes data across all nodes, and the supervisor manages the lifecycle of every kind of object:
- System Controller: Every instance of EG runs exactly one instance of each kind.
- Business Controller: A component that does its own task and does not handle traffic directly.
- Traffic Gate: It receives traffic of different protocols and dispatches it to pipelines.
- Pipeline: It is a filter chain that handles traffic from the traffic gate.
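To make the division of labor concrete, here is a minimal sketch of a supervisor that tracks running objects by name and category. It is not EG's implementation: `miniSupervisor`, `object`, `apply`, `remove`, and `names` are hypothetical, and the `HTTPServer` kind is only an illustrative traffic gate.

```go
package main

import "sort"

// ObjectCategory mirrors the four categories listed above (illustrative names).
type ObjectCategory string

const (
	CategorySystemController   ObjectCategory = "SystemController"
	CategoryBusinessController ObjectCategory = "BusinessController"
	CategoryTrafficGate        ObjectCategory = "TrafficGate"
	CategoryPipeline           ObjectCategory = "Pipeline"
)

// object is a reduced, hypothetical view of what a supervisor manages.
type object struct {
	name     string
	kind     string
	category ObjectCategory
}

// miniSupervisor tracks running objects by name.
type miniSupervisor struct {
	running map[string]object
}

func newMiniSupervisor() *miniSupervisor {
	return &miniSupervisor{running: make(map[string]object)}
}

// apply creates the object, or replaces the one already running under that name.
func (s *miniSupervisor) apply(o object) { s.running[o.name] = o }

// remove stops tracking the object with the given name.
func (s *miniSupervisor) remove(name string) { delete(s.running, name) }

// names lists the running objects of one category in sorted order.
func (s *miniSupervisor) names(c ObjectCategory) []string {
	var out []string
	for name, o := range s.running {
		if o.category == c {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}

// exampleCategories manages one traffic gate and two pipelines, removes one
// pipeline, and returns the names of the pipelines that are still running.
func exampleCategories() []string {
	s := newMiniSupervisor()
	s.apply(object{name: "http-server", kind: "HTTPServer", category: CategoryTrafficGate})
	s.apply(object{name: "pipeline-a", kind: "HTTPPipeline", category: CategoryPipeline})
	s.apply(object{name: "pipeline-b", kind: "HTTPPipeline", category: CategoryPipeline})
	s.remove("pipeline-a")
	return s.names(CategoryPipeline)
}
```

Calling `exampleCategories()` leaves only `pipeline-b` in the pipeline category, while the traffic gate stays untouched.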
We try to follow the Go project layout standard. The important directories are described below:
.
├── bin // executable binary
├── cmd // command source code
├── doc // documents
├── pkg // importable golang packages
│ ├── api // restful api layer
│ ├── cluster // cluster component
│ ├── common // some common utilities
│ ├── context // context for traffic gate and pipeline
│ ├── env // preparation for running environment
│ ├── filter // filters bucket
│ ├── graceupdate // graceful update
│ ├── logger // logger utilities
│ ├── object // controllers bucket
│ ├── option // startup arguments utilities
│ ├── pidfile // handle file to record pid
│ ├── profile // dedicated pprof
│ ├── protocol // decoupling for protocol
│ ├── registry // registry for all dynamic registering component
│ ├── storage // distributed storage wrapper
│ ├── supervisor // the supervisor to manage controllers
│ ├── tracing // distributed tracing
│ ├── util // all kinds of utilities
│ ├── v // validation tool
│ └── version // release version
├── test // scripts of integration testing
The first job is to choose which category the object belongs to. The architecture part has described the categories well enough. In most cases, creating a new business controller is the best way to extend the ability of EG at the object level, so we will develop a lightweight business controller to show the details. For example, suppose we want a controller that dumps the status of all objects to a local file. Let's name the kind of the controller StatusInLocalController, so the config of the controller would be:
kind: StatusInLocalController
name: statusInLocal
path: ./running_status.yaml
We put the controller package in pkg/object/statusinlocalcontroller. We could implement the main logic in pkg/object/statusinlocalcontroller/statusinlocalcontroller.go.
The supervisor holds references to all running objects, so we need to invoke the supervisor to get their status. The main business code would be:
type (
	// StatusInLocalController dumps the status of all objects to a local file.
	StatusInLocalController struct {
		super     *supervisor.Supervisor
		superSpec *supervisor.Spec
		spec      *Spec
		done      chan struct{}
	}

	// Spec describes StatusInLocalController.
	Spec struct {
		Path string `yaml:"path" jsonschema:"required"`
	}

	// Entry is the structure of the status file.
	Entry struct {
		Statuses     map[string]interface{}
		UnixTimestmp int64
	}
)

func (c *StatusInLocalController) syncStatus() {
	// Step 1: Use entry to record the status of all running objects.
	entry := &Entry{
		UnixTimestmp: time.Now().Unix(),
		Statuses:     make(map[string]interface{}),
	}
	walkFn := func(runningObject *supervisor.RunningObject) bool {
		// Recover so that a panic in one object's Status does not crash the controller.
		defer func() {
			if err := recover(); err != nil {
				logger.Errorf("recover from syncStatus, err: %v, stack trace:\n%s\n",
					err, debug.Stack())
			}
		}()
		name := runningObject.Spec().Name()
		entry.Statuses[name] = runningObject.Instance().Status().ObjectStatus
		return true
	}
	c.super.WalkRunningObjects(walkFn, supervisor.CategoryAll)

	// Step 2: Write the status to the local file.
	buff, err := yaml.Marshal(entry)
	if err != nil {
		logger.Errorf("BUG: marshal %#v to yaml failed: %v",
			entry, err)
		return
	}
	// Do not drop the write error silently: a wrong path or a full disk should be visible in the log.
	if err := ioutil.WriteFile(c.spec.Path, buff, 0644); err != nil {
		logger.Errorf("write status to %s failed: %v", c.spec.Path, err)
	}
}
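Writing the status file in place means a reader can observe a half-written file if it opens the path during a write. One common way around this is to write to a temporary file in the same directory and rename it over the target. The sketch below assumes Go 1.16 or later (for `os.CreateTemp`, `os.MkdirTemp`, and `os.ReadFile`) and a POSIX-like filesystem where rename replaces the target atomically; `writeFileAtomic` and `exampleAtomicWrite` are hypothetical helpers, not part of EG.

```go
package main

import (
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temporary file next to path and then
// renames it over path, so readers see either the old or the new content.
func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	// Best-effort cleanup; after a successful rename the temp name no longer
	// exists and the error from Remove is ignored.
	defer os.Remove(tmpName)

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Chmod(perm); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmpName, path)
}

// exampleAtomicWrite writes two generations of a status file into a scratch
// directory and returns the content that is finally on disk.
func exampleAtomicWrite() (string, error) {
	dir, err := os.MkdirTemp("", "status-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "running_status.yaml")
	for _, content := range []string{"generation: 1\n", "generation: 2\n"} {
		if err := writeFileAtomic(path, []byte(content), 0o644); err != nil {
			return "", err
		}
	}
	got, err := os.ReadFile(path)
	return string(got), err
}
```

In the controller, `writeFileAtomic(c.spec.Path, buff, 0644)` could stand in for the direct write if consumers of the file read it while EG is running.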
All objects must satisfy the interface Object in pkg/supervisor/registry.go, so the package also needs the following non-business code:
package statusinlocalcontroller

import (
	"io/ioutil"
	"runtime/debug"
	"time"

	"github.com/megaease/easegress/pkg/logger"
	"github.com/megaease/easegress/pkg/supervisor"
	"gopkg.in/yaml.v2"
)

const (
	// Kind is the kind of StatusInLocalController.
	Kind = "StatusInLocalController"
)

type (
	// StatusInLocalController dumps the status of all objects to a local file.
	StatusInLocalController struct {
		super     *supervisor.Supervisor
		superSpec *supervisor.Spec
		spec      *Spec
		done      chan struct{}
	}

	// Spec describes StatusInLocalController.
	Spec struct {
		Path string `yaml:"path" jsonschema:"required"`
	}

	// Entry is the structure of the status file.
	Entry struct {
		Statuses     map[string]interface{}
		UnixTimestmp int64
	}
)

// init registers itself to supervisor registry.
func init() { supervisor.Register(&StatusInLocalController{}) }

// Category returns the category of StatusInLocalController.
func (c *StatusInLocalController) Category() supervisor.ObjectCategory {
	return supervisor.CategoryBusinessController
}

// Kind returns the kind of StatusInLocalController.
func (c *StatusInLocalController) Kind() string { return Kind }

// DefaultSpec returns the default spec of StatusInLocalController.
func (c *StatusInLocalController) DefaultSpec() interface{} { return &Spec{} }

// Init initializes StatusInLocalController.
func (c *StatusInLocalController) Init(superSpec *supervisor.Spec, super *supervisor.Supervisor) {
	c.superSpec, c.spec, c.super = superSpec, superSpec.ObjectSpec().(*Spec), super
	c.reload()
}

// Inherit inherits previous generation of StatusInLocalController.
func (c *StatusInLocalController) Inherit(spec *supervisor.Spec,
	previousGeneration supervisor.Object, super *supervisor.Supervisor) {
	previousGeneration.Close()
	c.Init(spec, super)
}

// reload starts the background goroutine that syncs the status periodically.
func (c *StatusInLocalController) reload() {
	c.done = make(chan struct{})
	go c.run()
}

// run dumps the status every 5 seconds until the controller is closed.
func (c *StatusInLocalController) run() {
	for {
		select {
		case <-time.After(5 * time.Second):
			c.syncStatus()
		case <-c.done:
			return
		}
	}
}

// Status returns the status of StatusInLocalController.
func (c *StatusInLocalController) Status() *supervisor.Status {
	return &supervisor.Status{
		ObjectStatus: struct{}{},
	}
}

// Close closes StatusInLocalController.
func (c *StatusInLocalController) Close() {
	close(c.done)
}
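The run/Close pair above is the usual "periodic task with a done channel" pattern. Two details are worth knowing: closing an already closed channel panics in Go, so a second call to Close would crash, and Close returns before the goroutine has necessarily exited. The sketch below shows one way to harden the pattern with `time.Ticker` and `sync.Once`; `periodicRunner` and its methods are hypothetical names for illustration, not EG code.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// periodicRunner calls fn once per interval until Close is called.
type periodicRunner struct {
	interval time.Duration
	fn       func()
	done     chan struct{} // closed by Close to ask the loop to stop
	stopped  chan struct{} // closed by run when the loop has exited
	once     sync.Once
}

func newPeriodicRunner(interval time.Duration, fn func()) *periodicRunner {
	r := &periodicRunner{
		interval: interval,
		fn:       fn,
		done:     make(chan struct{}),
		stopped:  make(chan struct{}),
	}
	go r.run()
	return r
}

func (r *periodicRunner) run() {
	defer close(r.stopped)
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.fn()
		case <-r.done:
			return
		}
	}
}

// Close stops the loop and waits for it to exit. It is safe to call twice.
func (r *periodicRunner) Close() {
	r.once.Do(func() { close(r.done) })
	<-r.stopped
}

// exampleRunner reports whether fn ran before Close and whether it ran after.
func exampleRunner() (ranBeforeClose, ranAfterClose bool) {
	var calls int64
	r := newPeriodicRunner(5*time.Millisecond, func() { atomic.AddInt64(&calls, 1) })
	time.Sleep(100 * time.Millisecond)
	r.Close()
	r.Close() // the second call is a no-op instead of a panic

	before := atomic.LoadInt64(&calls)
	time.Sleep(30 * time.Millisecond)
	after := atomic.LoadInt64(&calls)
	return before > 0, after != before
}
```

Because Close waits on `stopped`, a new generation created in Inherit would never overlap with a write from the previous one.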
In most traffic-handling scenarios, developing a new filter is the right choice, since its scheduling is covered by the flexible pipeline. A filter only does its own business. For example, suppose we want a filter that counts the number of requests that carry the specified headers. Let's name the kind of the filter HeaderCounter, so the config of the filter in the pipeline spec would be:
filters:
- kind: HeaderCounter
  name: headerCounter
  headers: ['Cookie', 'Authorization']
We put the filter package in pkg/filter/headercounter. We could implement the main logic of counting the headers in pkg/filter/headercounter/headercounter.go:
type (
	// HeaderCounter counts the requests that carry the configured headers.
	HeaderCounter struct {
		super    *supervisor.Supervisor   // The supervisor runtime.
		pipeSpec *httppipeline.FilterSpec // The filter spec at the pipeline level, which has two more fields: kind and name.
		spec     *Spec                    // The filter spec at its own level.

		// Reads and writes of count must be locked, because Handle is called concurrently.
		countMutex sync.Mutex
		count      map[string]int64
	}

	// Spec describes HeaderCounter.
	Spec struct {
		Headers []string `yaml:"headers"`
	}
)

// Handle counts the configured headers of the request.
func (m *HeaderCounter) Handle(ctx context.HTTPContext) (result string) {
	for _, key := range m.spec.Headers {
		value := ctx.Request().Header().Get(key)
		if value != "" {
			m.countMutex.Lock()
			m.count[key]++
			m.countMutex.Unlock()
		}
	}
	// NOTE: The filter must call the next handler to satisfy the Chain of Responsibility Pattern.
	return ctx.CallNextHandler("")
}
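To see why the mutex matters, the counting logic can be exercised outside EG with many goroutines at once. This is a standalone sketch that uses the standard library's `http.Header` in place of EG's request context; `headerCounter`, `observe`, and `snapshot` are hypothetical names.

```go
package main

import (
	"net/http"
	"sync"
)

// headerCounter counts how many requests carried each configured header.
type headerCounter struct {
	headers []string

	mu    sync.Mutex
	count map[string]int64
}

func newHeaderCounter(headers []string) *headerCounter {
	return &headerCounter{headers: headers, count: make(map[string]int64)}
}

// observe increments the counter of every configured header present in h.
func (c *headerCounter) observe(h http.Header) {
	for _, key := range c.headers {
		if h.Get(key) == "" {
			continue
		}
		c.mu.Lock()
		c.count[key]++
		c.mu.Unlock()
	}
}

// snapshot returns a copy of the counters, so callers never share the map
// that observe keeps writing to.
func (c *headerCounter) snapshot() map[string]int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]int64, len(c.count))
	for k, v := range c.count {
		out[k] = v
	}
	return out
}

// exampleCount simulates 100 concurrent requests: all of them carry Cookie
// and every second one also carries Authorization.
func exampleCount() map[string]int64 {
	c := newHeaderCounter([]string{"Cookie", "Authorization"})
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h := http.Header{}
			h.Set("Cookie", "id=1")
			if i%2 == 0 {
				h.Set("Authorization", "Bearer token")
			}
			c.observe(h)
		}(i)
	}
	wg.Wait()
	return c.snapshot()
}
```

With the lock in place, `exampleCount()` yields 100 for Cookie and 50 for Authorization. Without the lock, concurrent map writes are a data race, and the Go runtime may abort the program.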
Our core logic is very simple. Now let's add some non-business code to make the new filter conform to the requirements of the pipeline framework. All filters must satisfy the interface Filter in pkg/object/httppipeline/registry.go.
The names and comments of its methods are self-explanatory. The only one we need to emphasize is Inherit: it is called when the pipeline is updated but a filter with the same name and kind still exists. It is the filter's own responsibility to do the hot update in Inherit, such as transferring meaningful consecutive data.
// init registers itself to pipeline registry.
func init() { httppipeline.Register(&HeaderCounter{}) }

// Kind returns the kind of HeaderCounter.
func (hc *HeaderCounter) Kind() string { return "HeaderCounter" }

// DefaultSpec returns default spec of HeaderCounter.
func (hc *HeaderCounter) DefaultSpec() interface{} { return &Spec{} }

// Description returns the description of HeaderCounter.
func (hc *HeaderCounter) Description() string {
	return "HeaderCounter counts the number of requests which contain the specified header."
}

// Results returns the results of HeaderCounter.
func (hc *HeaderCounter) Results() []string { return nil }

// Init initializes HeaderCounter.
func (hc *HeaderCounter) Init(pipeSpec *httppipeline.FilterSpec, super *supervisor.Supervisor) {
	hc.pipeSpec, hc.spec, hc.super = pipeSpec, pipeSpec.FilterSpec().(*Spec), super
	hc.reload()
}

// Inherit inherits previous generation of HeaderCounter.
func (hc *HeaderCounter) Inherit(pipeSpec *httppipeline.FilterSpec,
	previousGeneration httppipeline.Filter, super *supervisor.Supervisor) {
	previousGeneration.Close()
	hc.Init(pipeSpec, super)
}

// reload resets the counters.
func (m *HeaderCounter) reload() {
	m.count = make(map[string]int64)
}

// Status returns status.
func (m *HeaderCounter) Status() interface{} {
	m.countMutex.Lock()
	defer m.countMutex.Unlock()
	// Return a copy so that callers never read the map while Handle is writing to it.
	count := make(map[string]int64, len(m.count))
	for key, value := range m.count {
		count[key] = value
	}
	return count
}

// Close closes HeaderCounter.
func (m *HeaderCounter) Close() {}
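Note that the Inherit above calls Init, which calls reload, so every pipeline update starts again from empty counters. If the counts should survive an update, Inherit is the place to carry them over. The following is a minimal sketch of that idea with simplified, hypothetical types (`counterFilter`, `inherit`); EG's real Inherit signature is the one shown above.

```go
package main

import "sync"

// counterFilter is a reduced stand-in for HeaderCounter.
type counterFilter struct {
	headers []string

	mu    sync.Mutex
	count map[string]int64
}

func newCounterFilter(headers []string) *counterFilter {
	return &counterFilter{headers: headers, count: make(map[string]int64)}
}

// add increments the counter of one header.
func (f *counterFilter) add(key string) {
	f.mu.Lock()
	f.count[key]++
	f.mu.Unlock()
}

// inherit builds the next generation and carries over the counts of the
// headers that are still configured. Counts of removed headers are dropped.
func inherit(previous *counterFilter, headers []string) *counterFilter {
	next := newCounterFilter(headers)

	previous.mu.Lock()
	defer previous.mu.Unlock()
	for _, key := range headers {
		if n, ok := previous.count[key]; ok {
			next.count[key] = n
		}
	}
	return next
}

// exampleInherit updates the header list from (Cookie, Authorization) to
// (Cookie, X-Request-Id) and returns the counters of the new generation.
// It runs on a single goroutine, so reading next.count directly is safe here.
func exampleInherit() map[string]int64 {
	old := newCounterFilter([]string{"Cookie", "Authorization"})
	old.add("Cookie")
	old.add("Cookie")
	old.add("Authorization")

	next := inherit(old, []string{"Cookie", "X-Request-Id"})
	next.add("X-Request-Id")
	return next.count
}
```

In `exampleInherit`, the new generation keeps the two Cookie hits, starts X-Request-Id at one, and no longer reports Authorization. Whether to keep or drop counts of removed headers is a design choice of this sketch.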
Then we need to add the import line in pkg/registry/registry.go:
import (
	_ "github.com/megaease/easegress/pkg/filter/headercounter"
)
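The blank import works because Go runs a package's init functions when the package is linked into the program, and the filter's init calls Register. A self-contained sketch of that registration pattern follows. The `Filter` interface here is reduced to one method, and the decision to panic on an empty or duplicate kind is an assumption of the sketch, not a statement about EG's registry.

```go
package main

import (
	"fmt"
	"sort"
)

// Filter is a reduced, hypothetical version of a filter interface.
type Filter interface {
	Kind() string
}

// registry maps a kind to its prototype filter.
var registry = map[string]Filter{}

// Register adds a filter prototype. It panics on an empty or duplicate kind
// so that a wiring mistake is caught at startup.
func Register(f Filter) {
	kind := f.Kind()
	if kind == "" {
		panic("filter kind must not be empty")
	}
	if _, exists := registry[kind]; exists {
		panic(fmt.Sprintf("filter kind %q is already registered", kind))
	}
	registry[kind] = f
}

// Kinds returns all registered kinds in sorted order.
func Kinds() []string {
	kinds := make([]string, 0, len(registry))
	for kind := range registry {
		kinds = append(kinds, kind)
	}
	sort.Strings(kinds)
	return kinds
}

// demoFilter plays the role of HeaderCounter in this sketch.
type demoFilter struct{}

func (demoFilter) Kind() string { return "HeaderCounter" }

// init runs when the package is loaded. In a real project the filter lives
// in its own package, and the blank import is what causes this to run.
func init() { Register(demoFilter{}) }
```

If the import line is forgotten, nothing fails at compile time. The filter's init simply never runs, so its kind is never registered.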
As we described in Get Started, the pipeline below uses the result of validator:
name: pipeline-demo
kind: HTTPPipeline
flow:
- filter: validator
  jumpIf: { invalid: END }
- filter: requestAdaptor
- filter: proxy
That jumpIf means the request will jump to the end without going through requestAdaptor and proxy if the validator returns the result invalid. So the method Results registers all possible results of the filter. In the example of HeaderCounter, the empty results mean that Handle only returns the empty result. So if we want to prevent requests that have none of the counted headers from going forward to the next filters, we could change it to:
const resultInvalidHeader = "invalidHeader"

// Results returns the results of HeaderCounter.
func (hc *HeaderCounter) Results() []string {
	return []string{resultInvalidHeader} // New code
}

// Handle counts the header of the requests.
func (m *HeaderCounter) Handle(ctx context.HTTPContext) (result string) {
	counted := false // New code
	for _, key := range m.spec.Headers {
		value := ctx.Request().Header().Get(key)
		if value != "" {
			m.countMutex.Lock()
			counted = true // New code
			m.count[key]++
			m.countMutex.Unlock()
		}
	}
	if !counted { // New code
		return resultInvalidHeader // New code
	} // New code
	// As in the first version of Handle, the filter must still call the next handler.
	return ctx.CallNextHandler("")
}
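The effect of a result combined with jumpIf can be pictured with a small model of a flow. This is a deliberately simplified sketch, not how EG schedules filters (EG chains handlers through CallNextHandler). `step`, `handler`, and `run` are hypothetical, and the model treats a jump to an unknown filter as the end of the flow.

```go
package main

// end is the special jump target that finishes the flow.
const end = "END"

// step is one entry of the flow: a filter name plus its optional jumpIf table,
// which maps a result to the name of the filter to jump to (or END).
type step struct {
	filter string
	jumpIf map[string]string
}

// handler runs one filter and returns its result; "" means no special result.
type handler func() string

// run executes the flow and returns the names of the filters that ran.
func run(flow []step, handlers map[string]handler) []string {
	var visited []string
	for i := 0; i < len(flow); {
		s := flow[i]
		visited = append(visited, s.filter)

		result := handlers[s.filter]()
		target, jump := s.jumpIf[result]
		if result == "" || !jump {
			i++ // no matching jumpIf entry: continue with the next filter
			continue
		}
		if target == end {
			break
		}
		// Jump forward to the named filter; stop if it does not exist.
		next := -1
		for j := i + 1; j < len(flow); j++ {
			if flow[j].filter == target {
				next = j
				break
			}
		}
		if next < 0 {
			break
		}
		i = next
	}
	return visited
}

// exampleJumpIf models the pipeline-demo flow with a configurable validator result.
func exampleJumpIf(validatorResult string) []string {
	flow := []step{
		{filter: "validator", jumpIf: map[string]string{"invalid": end}},
		{filter: "requestAdaptor"},
		{filter: "proxy"},
	}
	handlers := map[string]handler{
		"validator":      func() string { return validatorResult },
		"requestAdaptor": func() string { return "" },
		"proxy":          func() string { return "" },
	}
	return run(flow, handlers)
}
```

In this model, `exampleJumpIf("invalid")` visits only validator, while `exampleJumpIf("")` visits validator, requestAdaptor, and proxy in order.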