Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add connection to optional remote ers #1601

Draft
wants to merge 1 commit into
base: separate-keycloak-ers-implementation
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions opentdf-example-remote-ers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
mode: all
modeoptions:
remoteERSUrl: http://localhost:8181
Comment on lines +2 to +3
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are your thoughts if we moved this configuration underneath SDKConfig. Trying to think how we could layer in other options for our internal products as well.

type SDKConfig struct {
// Endpoint is the URL of the Core Platform endpoint.
Endpoint string `mapstructure:"endpoint" json:"endpoint"`
// Plaintext specifies whether the SDK should use plaintext communication.
Plaintext bool `mapstructure:"plaintext" json:"plaintext" default:"false" validate:"boolean"`
// ClientID is the client ID used for client credentials grant.
// It is required together with ClientSecret.
ClientID string `mapstructure:"client_id" json:"client_id" validate:"required_with=ClientSecret"`
// ClientSecret is the client secret used for client credentials grant.
// It is required together with ClientID.
ClientSecret string `mapstructure:"client_secret" json:"client_secret" validate:"required_with=ClientID"`

logger:
level: debug
type: text
output: stdout
# DB and Server configurations are defaulted for local development
db:
host: opentdfdb
# port: 5432
# user: postgres
# password: changeme
# mode: all
services:
kas:
eccertid: e1
rsacertid: r1
entityresolution:
url: http://keycloak:8888/auth
clientid: 'tdf-entity-resolution'
clientsecret: 'secret'
realm: 'opentdf'
legacykeycloak: true
inferid:
from:
email: true
username: true
server:
auth:
enabled: true
enforceDPoP: false
public_client_id: 'opentdf-public'
audience: 'http://localhost:8080'
issuer: http://keycloak:8888/auth/realms/opentdf
policy:
## Default policy for all requests
default: #"role:standard"
## Dot notation is used to access nested claims (i.e. realm_access.roles)
claim: # realm_access.roles
## Maps the external role to the opentdf role
## Note: left side is used in the policy, right side is the external role
map:
# standard: opentdf-standard
# admin: opentdf-admin
# org-admin: opentdf-org-admin

## Custom policy (see examples https://github.com/casbin/casbin/tree/master/examples)
csv: #|
# p, role:org-admin, policy:attributes, *, *, allow
# p, role:org-admin, policy:subject-mappings, *, *, allow
# p, role:org-admin, policy:resource-mappings, *, *, allow
# p, role:org-admin, policy:kas-registry, *, *, allow
# p, role:org-admin, policy:unsafe, *, *, allow
## Custom model (see https://casbin.org/docs/syntax-for-models/)
model: #|
# [request_definition]
# r = sub, res, act, obj
#
# [policy_definition]
# p = sub, res, act, obj, eft
#
# [role_definition]
# g = _, _
#
# [policy_effect]
# e = some(where (p.eft == allow)) && !some(where (p.eft == deny))
#
# [matchers]
# m = g(r.sub, p.sub) && globOrRegexMatch(r.res, p.res) && globOrRegexMatch(r.act, p.act) && globOrRegexMatch(r.obj, p.obj)
cors:
enabled: false
# "*" to allow any origin or a specific domain like "https://yourdomain.com"
allowedorigins:
- '*'
# List of methods. Examples: "GET,POST,PUT"
allowedmethods:
- GET
- POST
- PATCH
- PUT
- DELETE
- OPTIONS
# List of headers that are allowed in a request
allowedheaders:
- ACCEPT
- Authorization
- Content-Type
- X-CSRF-Token
# List of response headers that browsers are allowed to access
exposedheaders:
- Link
# Sets whether credentials are included in the CORS request
allowcredentials: true
# Sets the maximum age (in seconds) of a specific CORS preflight request
maxage: 3600
grpc:
reflectionEnabled: true # Default is false
cryptoProvider:
type: standard
standard:
keys:
- kid: r1
alg: rsa:2048
private: /keys/kas-private.pem
cert: /keys/kas-cert.pem
- kid: e1
alg: ec:secp256r1
private: /keys/kas-ec-private.pem
cert: /keys/kas-ec-cert.pem
port: 8080
4 changes: 2 additions & 2 deletions sdk/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type config struct {
customAccessTokenSource auth.AccessTokenSource
oauthAccessTokenSource oauth2.TokenSource
coreConn *grpc.ClientConn
entityResolutionConn *grpc.ClientConn
}

// Options specific to TDF protocol features
Expand Down Expand Up @@ -127,10 +128,9 @@ func WithCustomAuthorizationConnection(conn *grpc.ClientConn) Option {
}
}

// Deprecated: Use WithCustomCoreConnection instead
func WithCustomEntityResolutionConnection(conn *grpc.ClientConn) Option {
return func(c *config) {
c.coreConn = conn
c.entityResolutionConn = conn
}
}

Expand Down
8 changes: 7 additions & 1 deletion sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
func New(platformEndpoint string, opts ...Option) (*SDK, error) {
var (
platformConn *grpc.ClientConn // Connection to the platform
ersConn *grpc.ClientConn // Connection to ERS (possibly remote)
err error
)

Expand Down Expand Up @@ -160,11 +161,16 @@
if cfg.coreConn != nil {
platformConn = cfg.coreConn
} else {
platformConn, err = grpc.Dial(platformEndpoint, dialOptions...)

Check failure on line 164 in sdk/sdk.go

View workflow job for this annotation

GitHub Actions / go (sdk)

SA1019: grpc.Dial is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
if err != nil {
return nil, errors.Join(ErrGrpcDialFailed, err)
}
}
if cfg.entityResolutionConn != nil {
ersConn = cfg.entityResolutionConn
} else {
ersConn = platformConn
}

return &SDK{
config: *cfg,
Expand All @@ -179,7 +185,7 @@
Unsafe: unsafe.NewUnsafeServiceClient(platformConn),
KeyAccessServerRegistry: kasregistry.NewKeyAccessServerRegistryServiceClient(platformConn),
Authorization: authorization.NewAuthorizationServiceClient(platformConn),
EntityResoution: entityresolution.NewEntityResolutionServiceClient(platformConn),
EntityResoution: entityresolution.NewEntityResolutionServiceClient(ersConn),
wellknownConfiguration: wellknownconfiguration.NewWellKnownServiceClient(platformConn),
}, nil
}
Expand Down Expand Up @@ -357,7 +363,7 @@
}

func fetchPlatformConfiguration(platformEndpoint string, dialOptions []grpc.DialOption) (PlatformConfiguration, error) {
conn, err := grpc.Dial(platformEndpoint, dialOptions...)

Check failure on line 366 in sdk/sdk.go

View workflow job for this annotation

GitHub Actions / go (sdk)

SA1019: grpc.Dial is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
if err != nil {
return nil, errors.Join(ErrGrpcDialFailed, err)
}
Expand Down
7 changes: 7 additions & 0 deletions service/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ type Config struct {
// By default, it runs all services.
Mode []string `mapstructure:"mode" json:"mode" default:"[\"all\"]"`

// Remote services
ModeOptions ModeOptions `mapstructure:"modeoptions" json:"modeoptions"`

// SDKConfig represents the configuration settings for the SDK.
SDKConfig SDKConfig `mapstructure:"sdk_config" json:"sdk_config"`

// Services represents the configuration settings for the services.
Services map[string]serviceregistry.ServiceConfig `mapstructure:"services"`
}

type ModeOptions struct {
RemoteERSUrl string `mapstructure:"remoteERSUrl" json:"remoteERSUrl"`
}

// SDKConfig represents the configuration for the SDK.
type SDKConfig struct {
// Endpoint is the URL of the Core Platform endpoint.
Expand Down
40 changes: 26 additions & 14 deletions service/pkg/server/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ const (
serviceAuthorization = "authorization"
)

var (
allIndividualServices = []string{serviceAuthorization, serviceEntityResolution, serviceKAS, serviceWellKnown}
coreIndividualServices = []string{serviceAuthorization, serviceEntityResolution, serviceWellKnown}
registrationFunctions = map[string](func() serviceregistry.Registration){
serviceKAS: kas.NewRegistration,
serviceEntityResolution: entityresolution.NewRegistration,
serviceWellKnown: wellknown.NewRegistration,
serviceAuthorization: authorization.NewRegistration,
}
)

// registerEssentialServices registers the essential services to the given service registry.
// It takes a serviceregistry.Registry as input and returns an error if registration fails.
func registerEssentialServices(reg serviceregistry.Registry) error {
Expand All @@ -53,7 +64,7 @@ func registerEssentialServices(reg serviceregistry.Registry) error {

// registerCoreServices registers the core services based on the provided mode.
// It returns the list of registered services and any error encountered during registration.
func registerCoreServices(reg serviceregistry.Registry, mode []string) ([]string, error) {
func registerCoreServices(reg serviceregistry.Registry, mode []string, modeoptions config.ModeOptions) ([]string, error) {
var (
services []serviceregistry.Registration
registeredServices []string
Expand All @@ -62,21 +73,22 @@ func registerCoreServices(reg serviceregistry.Registry, mode []string) ([]string
for _, m := range mode {
switch m {
case "all":
registeredServices = append(registeredServices, []string{servicePolicy, serviceAuthorization, serviceKAS, serviceWellKnown, serviceEntityResolution}...)
services = append(services, []serviceregistry.Registration{
authorization.NewRegistration(),
kas.NewRegistration(),
wellknown.NewRegistration(),
entityresolution.NewRegistration(),
}...)
for _, service := range allIndividualServices {
if service == serviceEntityResolution && modeoptions.RemoteERSUrl != "" {
continue
}
registeredServices = append(registeredServices, service)
services = append(services, registrationFunctions[service]())
}
services = append(services, policy.NewRegistrations()...)
case "core":
registeredServices = append(registeredServices, []string{servicePolicy, serviceAuthorization, serviceWellKnown}...)
services = append(services, []serviceregistry.Registration{
entityresolution.NewRegistration(),
authorization.NewRegistration(),
wellknown.NewRegistration(),
}...)
for _, service := range coreIndividualServices {
if service == serviceEntityResolution && modeoptions.RemoteERSUrl != "" {
continue
}
registeredServices = append(registeredServices, service)
services = append(services, registrationFunctions[service]())
}
services = append(services, policy.NewRegistrations()...)
case "kas":
// If the mode is "kas", register only the KAS service
Expand Down
37 changes: 34 additions & 3 deletions service/pkg/server/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (suite *ServiceTestSuite) TestRegisterEssentialServiceRegistrationIsSuccess

func (suite *ServiceTestSuite) Test_RegisterCoreServices_In_Mode_ALL_Expect_All_Services_Registered() {
registry := serviceregistry.NewServiceRegistry()
_, err := registerCoreServices(registry, []string{modeALL})
_, err := registerCoreServices(registry, []string{modeALL}, config.ModeOptions{})
suite.Require().NoError(err)

authz, err := registry.GetNamespace(serviceAuthorization)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (suite *ServiceTestSuite) Test_RegisterCoreServices_In_Mode_ALL_Expect_All_
// Every service except kas is registered
func (suite *ServiceTestSuite) Test_RegisterCoreServices_In_Mode_Core_Expect_Core_Services_Registered() {
registry := serviceregistry.NewServiceRegistry()
_, err := registerCoreServices(registry, []string{modeCore})
_, err := registerCoreServices(registry, []string{modeCore}, config.ModeOptions{})
suite.Require().NoError(err)

authz, err := registry.GetNamespace(serviceAuthorization)
Expand Down Expand Up @@ -153,7 +153,7 @@ func (suite *ServiceTestSuite) Test_RegisterCoreServices_In_Mode_Core_Expect_Cor
// Register core and kas services
func (suite *ServiceTestSuite) Test_RegisterServices_In_Mode_Core_Plus_Kas_Expect_Core_And_Kas_Services_Registered() {
registry := serviceregistry.NewServiceRegistry()
_, err := registerCoreServices(registry, []string{modeCore, modeKAS})
_, err := registerCoreServices(registry, []string{modeCore, modeKAS}, config.ModeOptions{})
suite.Require().NoError(err)

authz, err := registry.GetNamespace(serviceAuthorization)
Expand Down Expand Up @@ -182,6 +182,37 @@ func (suite *ServiceTestSuite) Test_RegisterServices_In_Mode_Core_Plus_Kas_Expec
suite.Equal(modeCore, ers.Mode)
}

// Register all mode with remote ERS mode option
func (suite *ServiceTestSuite) Test_RegisterCoreServices_In_Mode_ALL_Remote_ERS_Expect_All_Except_ERS_Registered() {
registry := serviceregistry.NewServiceRegistry()
_, err := registerCoreServices(registry, []string{modeALL}, config.ModeOptions{RemoteERSUrl: "remoteurl"})
suite.Require().NoError(err)

authz, err := registry.GetNamespace(serviceAuthorization)
suite.Require().NoError(err)
suite.Len(authz.Services, 1)
suite.Equal(modeCore, authz.Mode)

kas, err := registry.GetNamespace(serviceKAS)
suite.Require().NoError(err)
suite.Len(kas.Services, 1)
suite.Equal(modeCore, kas.Mode)

policy, err := registry.GetNamespace(servicePolicy)
suite.Require().NoError(err)
suite.Len(policy.Services, 6)
suite.Equal(modeCore, policy.Mode)

wellKnown, err := registry.GetNamespace(serviceWellKnown)
suite.Require().NoError(err)
suite.Len(wellKnown.Services, 1)
suite.Equal(modeCore, wellKnown.Mode)

_, err = registry.GetNamespace(serviceEntityResolution)
suite.Require().Error(err)
suite.Require().ErrorContains(err, "namespace not found")
}

func (suite *ServiceTestSuite) TestStartServicesWithVariousCases() {
ctx := context.Background()

Expand Down
23 changes: 22 additions & 1 deletion service/pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/opentdf/platform/service/pkg/serviceregistry"
wellknown "github.com/opentdf/platform/service/wellknownconfiguration"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const devModeMessage = `
Expand Down Expand Up @@ -104,7 +106,7 @@ func Start(f ...StartOptions) error {

var registeredCoreServices []string

registeredCoreServices, err = registerCoreServices(svcRegistry, cfg.Mode)
registeredCoreServices, err = registerCoreServices(svcRegistry, cfg.Mode, cfg.ModeOptions)
if err != nil {
logger.Error("could not register core services", slog.String("error", err.Error()))
return fmt.Errorf("could not register core services: %w", err)
Expand Down Expand Up @@ -157,6 +159,14 @@ func Start(f ...StartOptions) error {
// Use IPC for the SDK client
sdkOptions = append(sdkOptions, sdk.WithIPC())
sdkOptions = append(sdkOptions, sdk.WithCustomCoreConnection(otdf.GRPCInProcess.Conn()))
if cfg.ModeOptions.RemoteERSUrl != "" {
conn, err := getNewGRPCConn(cfg.ModeOptions.RemoteERSUrl)
if err != nil {
logger.Error("issue connecting to remote ers", slog.String("error", err.Error()))
return fmt.Errorf("issue connecting to remote ers: %w", err)
}
sdkOptions = append(sdkOptions, sdk.WithCustomEntityResolutionConnection(conn))
}

client, err = sdk.New("", sdkOptions...)
if err != nil {
Expand Down Expand Up @@ -197,6 +207,17 @@ func Start(f ...StartOptions) error {
return nil
}

func getNewGRPCConn(url string) (*grpc.ClientConn, error) {
defaultOptions := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: support dial options in the yaml config

conn, err := grpc.NewClient(url, defaultOptions...)
if err != nil {
return nil, fmt.Errorf("failed to dial grpc server: %w", err)
}
return conn, nil
}

// waitForShutdownSignal blocks until a SIGINT or SIGTERM is received.
func waitForShutdownSignal() {
sigs := make(chan os.Signal, 1)
Expand Down
Loading