diff --git a/plugin/runner.go b/plugin/runner.go new file mode 100644 index 00000000..6e02b101 --- /dev/null +++ b/plugin/runner.go @@ -0,0 +1,161 @@ +package plugin + +import ( + "context" + "errors" + "fmt" + "io/fs" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + pluginapp "github.com/grafana/grafana-plugin-sdk-go/backend/app" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "k8s.io/client-go/rest" + + "github.com/grafana/grafana-app-sdk/app" + "github.com/grafana/grafana-app-sdk/metrics" + "github.com/grafana/grafana-app-sdk/plugin/runner" +) + +var ( + _ backend.CheckHealthHandler = (*Runner)(nil) + _ backend.CallResourceHandler = (*Runner)(nil) + _ backend.QueryDataHandler = (*Runner)(nil) + _ backend.AdmissionHandler = (*Runner)(nil) + _ backend.ConversionHandler = (*Runner)(nil) +) + +type RunnerConfig struct { + // MetricsConfig contains the configuration for exposing prometheus metrics, if desired + MetricsConfig RunnerMetricsConfig + // KubeConfig is the kubernetes rest.Config to use when communicating with the API server + KubeConfig rest.Config + // Filesystem is an fs.FS that can be used in lieu of the OS filesystem. + // if empty, it defaults to os.DirFS(".") + Filesystem fs.FS +} + +// RunnerMetricsConfig contains configuration information for exposing prometheus metrics +type RunnerMetricsConfig struct { + metrics.ExporterConfig + Enabled bool + Namespace string +} + +// Runner runs an app.App as a Grafana Plugin, capable of exposing admission (validation, mutation) +// and conversion as webhooks, and running a main control loop with reconcilers and watchers. +// It relies on the Kinds managed by the app.App already existing in the API server it talks to, either as CRD's +// or another type. It does not support certain advanced app.App functionality which is not natively supported by +// CRDs, such as arbitrary subresources (app.App.CallSubresource). It should be instantiated with NewRunner. +type Runner struct { + config RunnerConfig + pluginRunner *runner.PluginRunner + metricsServer *runner.MetricsServerRunner +} + +// NewRunner creates a new, properly-initialized instance of a Runner +func NewRunner(cfg RunnerConfig) *Runner { + op := Runner{ + config: cfg, + } + + if cfg.MetricsConfig.Enabled { + exporter := metrics.NewExporter(cfg.MetricsConfig.ExporterConfig) + op.metricsServer = runner.NewMetricsServerRunner(exporter) + } + return &op +} + +// Run runs the Runner for the app built from the provided app.AppProvider, until the provided context.Context is closed, +// or an unrecoverable error occurs. If an app.App cannot be instantiated from the app.AppProvider, an error will be returned. +func (r *Runner) Run(ctx context.Context, provider app.Provider) error { + if provider == nil { + return errors.New("provider cannot be nil") + } + + // only embedded manifests are supported for now + manifest := provider.Manifest() + if manifest.ManifestData == nil { + return fmt.Errorf("missing embeded app manifest data") + } + appConfig := app.Config{ + KubeConfig: r.config.KubeConfig, + ManifestData: *manifest.ManifestData, + SpecificConfig: provider.SpecificConfig(), + } + + // Create the app + a, err := provider.NewApp(appConfig) + if err != nil { + return err + } + + r.pluginRunner = runner.NewPluginRunner(a) + + // Build the operator + runner := app.NewMultiRunner() + + // Main loop + mainRunner := a.Runner() + if mainRunner != nil { + runner.AddRunnable(mainRunner) + } + + // Metrics + if r.metricsServer != nil { + err = r.metricsServer.RegisterCollectors(runner.PrometheusCollectors()...) + if err != nil { + return err + } + runner.AddRunnable(r.metricsServer) + } + + return runner.Run(ctx) +} + +func (r *Runner) GetInstanceFactoryFunc() pluginapp.InstanceFactoryFunc { + return func(_ context.Context, _ backend.AppInstanceSettings) (instancemgmt.Instance, error) { + return r, nil + } +} + +func (r *Runner) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + if r.pluginRunner == nil { + return nil, errors.New("pluginRunner not initialized") + } + return r.pluginRunner.QueryData(ctx, req) +} + +func (r *Runner) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if r.pluginRunner == nil { + return nil, errors.New("pluginRunner not initialized") + } + return r.pluginRunner.CheckHealth(ctx, req) +} + +func (r *Runner) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if r.pluginRunner == nil { + return errors.New("pluginRunner not initialized") + } + return r.pluginRunner.CallResource(ctx, req, sender) +} + +func (r *Runner) MutateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.MutationResponse, error) { + if r.pluginRunner == nil { + return nil, errors.New("pluginRunner not initialized") + } + return r.pluginRunner.MutateAdmission(ctx, req) +} + +func (r *Runner) ValidateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + if r.pluginRunner == nil { + return nil, errors.New("pluginRunner not initialized") + } + return r.pluginRunner.ValidateAdmission(ctx, req) +} + +func (r *Runner) ConvertObjects(ctx context.Context, req *backend.ConversionRequest) (*backend.ConversionResponse, error) { + if r.pluginRunner == nil { + return nil, errors.New("pluginRunner not initialized") + } + return r.pluginRunner.ConvertObjects(ctx, req) +} diff --git a/plugin/runner/metrics.go b/plugin/runner/metrics.go new file mode 100644 index 00000000..840b6437 --- /dev/null +++ b/plugin/runner/metrics.go @@ -0,0 +1,43 @@ +package runner + +import ( + "context" + + "github.com/grafana/grafana-app-sdk/app" + "github.com/grafana/grafana-app-sdk/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +func NewMetricsServerRunner(exporter *metrics.Exporter) *MetricsServerRunner { + return &MetricsServerRunner{ + server: exporter, + runner: app.NewSingletonRunner(&k8sRunnable{ + runner: exporter, + }, false), + } +} + +type MetricsServerRunner struct { + runner *app.SingletonRunner + server *metrics.Exporter +} + +func (m *MetricsServerRunner) Run(ctx context.Context) error { + return m.runner.Run(ctx) +} + +func (m *MetricsServerRunner) RegisterCollectors(collectors ...prometheus.Collector) error { + return m.server.RegisterCollectors(collectors...) +} + +type k8sRunner interface { + Run(<-chan struct{}) error +} + +type k8sRunnable struct { + runner k8sRunner +} + +func (k *k8sRunnable) Run(ctx context.Context) error { + return k.runner.Run(ctx.Done()) +} diff --git a/plugin/runner/plugin.go b/plugin/runner/plugin.go new file mode 100644 index 00000000..005cc4cf --- /dev/null +++ b/plugin/runner/plugin.go @@ -0,0 +1,177 @@ +package runner + +import ( + "bytes" + "context" + "errors" + "net/http" + + "github.com/grafana/grafana-app-sdk/app" + "github.com/grafana/grafana-app-sdk/resource" + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +var ( + _ backend.ConversionHandler = (*PluginRunner)(nil) + _ backend.AdmissionHandler = (*PluginRunner)(nil) + _ backend.QueryDataHandler = (*PluginRunner)(nil) + _ backend.CheckHealthHandler = (*PluginRunner)(nil) + _ backend.CallResourceHandler = (*PluginRunner)(nil) +) + +func NewPluginRunner(app app.App) *PluginRunner { + return &PluginRunner{ + app: app, + codec: resource.NewJSONCodec(), + } +} + +type PluginRunner struct { + app app.App + codec *resource.JSONCodec +} + +func (r *PluginRunner) Run(ctx context.Context) error { + <-ctx.Done() + if ctx.Err() == context.Canceled { + return nil + } + return ctx.Err() +} + +func (r *PluginRunner) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + return nil, errors.New("not implemented") +} + +func (r *PluginRunner) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusOk, + }, nil +} + +func (r *PluginRunner) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + customReq := &app.ResourceCustomRouteRequest{ + // TODO: why is this needed? + // ResourceIdentifier: resource.FullIdentifier{}, + SubresourcePath: req.Path, + Method: req.Method, + Headers: req.Headers, + Body: req.Body, + } + + res, err := r.app.CallResourceCustomRoute(ctx, customReq) + if err != nil { + return err + } + + return sender.Send(&backend.CallResourceResponse{ + Status: res.StatusCode, + Headers: res.Headers, + Body: res.Body, + }) +} + +func (r *PluginRunner) MutateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.MutationResponse, error) { + res := &backend.MutationResponse{ + Allowed: false, + Result: &backend.StatusResult{ + Status: "Failure", + Message: "", + Reason: "", + Code: http.StatusBadRequest, + }, + Warnings: []string{}, + ObjectBytes: []byte{}, + } + admissionReq, err := r.translateAdmissionRequest(req) + if err != nil { + res.Result.Message = err.Error() + return res, nil + } + + mutatingResponse, err := r.app.Mutate(ctx, admissionReq) + if err != nil { + res.Result.Message = err.Error() + return res, nil + } + + raw := bytes.NewBuffer([]byte{}) + if err := r.codec.Write(raw, mutatingResponse.UpdatedObject); err != nil { + res.Result.Message = err.Error() + return res, nil + } + + res.Allowed = true + res.Result.Status = "Success" + res.Result.Code = http.StatusOK + res.ObjectBytes = raw.Bytes() + return res, nil +} + +func (r *PluginRunner) ValidateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + admissionReq, err := r.translateAdmissionRequest(req) + if err != nil { + return nil, err + } + + err = r.app.Validate(ctx, admissionReq) + code := http.StatusBadRequest + statusMessage := "Failure" + errorMessage := "" + if err == nil { + statusMessage = "Success" + code = http.StatusOK + } else { + errorMessage = err.Error() + } + + status := backend.StatusResult{ + Status: statusMessage, + Reason: errorMessage, + Code: int32(code), + } + + return &backend.ValidationResponse{ + Allowed: err == nil, + Result: &status, + }, nil +} + +func (r *PluginRunner) ConvertObjects(ctx context.Context, req *backend.ConversionRequest) (*backend.ConversionResponse, error) { + return nil, errors.New("not implemented") +} + +func (r *PluginRunner) translateAdmissionRequest(req *backend.AdmissionRequest) (*app.AdmissionRequest, error) { + var action resource.AdmissionAction + + switch req.Operation { + case backend.AdmissionRequestCreate: + action = resource.AdmissionActionCreate + case backend.AdmissionRequestUpdate: + action = resource.AdmissionActionUpdate + case backend.AdmissionRequestDelete: + action = resource.AdmissionActionDelete + } + + var newObj resource.Object + var oldObj resource.Object + + if err := r.codec.Read(bytes.NewReader(req.ObjectBytes), newObj); err != nil { + return nil, err + } + + if req.OldObjectBytes != nil { + if err := r.codec.Read(bytes.NewReader(req.OldObjectBytes), oldObj); err != nil { + return nil, err + } + } + + return &app.AdmissionRequest{ + Action: action, + Object: newObj, + OldObject: oldObj, + Kind: req.Kind.Kind, + Group: req.Kind.Group, + Version: req.Kind.Version, + }, nil +}