Skip to content

Commit

Permalink
Improved message broker error messages; added '_directory' function t…
Browse files Browse the repository at this point in the history
…hat allows clients to query the service directory through an EPS server.
  • Loading branch information
adewes committed Aug 17, 2021
1 parent f17950c commit ed6aaa3
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 14 deletions.
2 changes: 1 addition & 1 deletion channels/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (c *GRPCClientChannel) DeliverRequest(request *eps.Request) (*eps.Response,
}

if entry == nil {
return nil, fmt.Errorf("cannot deliver gRPC request")
return nil, fmt.Errorf("cannot deliver gRPC request: recipient does not have a gRPC server")
}

if len(entry.Channels) == 0 {
Expand Down
5 changes: 2 additions & 3 deletions channels/jsonrpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,14 @@ func (c *JSONRPCServerChannel) handler(context *jsonrpc.Context) *jsonrpc.Respon
}

if response, err := c.MessageBroker().DeliverRequest(request, clientInfo); err != nil {
eps.Log.Error(err)
return context.Error(1, "cannot deliver JSON-RPC request", err)
return context.Error(1, err.Error(), err)
} else {
if response == nil {
return context.Result(map[string]interface{}{"message": "submitted"})
}
jsonrpcResponse := jsonrpc.FromEPSResponse(response)
if jsonrpcResponse.Error != nil {
return context.Error(2, jsonrpcResponse.Error.Message, jsonrpcResponse.Error.Data)
return context.Error(jsonrpcResponse.Error.Code, jsonrpcResponse.Error.Message, jsonrpcResponse.Error.Data)
} else {
return context.Result(jsonrpcResponse.Result)
}
Expand Down
17 changes: 15 additions & 2 deletions directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ func (d *DirectoryEntry) SettingsFor(service, operator string) *OperatorSettings
}

type DirectoryQuery struct {
Operator string
Channels []string
Group string `json:"group"`
Operator string `json:"operator"`
Channels []string `json:"channels"`
}

type DirectoryEntries []*DirectoryEntry
Expand Down Expand Up @@ -307,6 +308,18 @@ func FilterDirectoryEntriesByQuery(entries []*DirectoryEntry, query *DirectoryQu
if query.Operator != "" && entry.Name != query.Operator {
continue
}
if query.Group != "" {
found := false
for _, group := range entry.Groups {
if group == query.Group {
found = true
break
}
}
if !found {
continue
}
}
// we filter the entries by the specified channel types
found := false
if query.Channels != nil {
Expand Down
55 changes: 49 additions & 6 deletions message_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eps

import (
"fmt"
"github.com/kiprotect/go-helpers/forms"
"sync"
"time"
)
Expand Down Expand Up @@ -53,15 +54,51 @@ func (b *BasicMessageBroker) AddChannel(channel Channel) error {
return nil
}

var DirectoryQueryForm = forms.Form{
Fields: []forms.Field{
{
Name: "group",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsString{},
},
},
{
Name: "operator",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsString{},
},
},
{
Name: "channels",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsStringList{},
},
},
},
}

func (b *BasicMessageBroker) handleInternalRequest(address *Address, request *Request) (*Response, error) {
switch address.Method {
case "_ping":

if ownEntry, err := b.directory.OwnEntry(); err != nil {
return nil, fmt.Errorf("error retrieving own entry: %w", err)
} else {
return &Response{Result: map[string]interface{}{"timestamp": time.Now().Format(time.RFC3339Nano), "params": request.Params, "serverInfo": ownEntry}, Error: nil, ID: &address.ID}, nil
}
case "_directory":
query := &DirectoryQuery{}
if params, err := DirectoryQueryForm.Validate(request.Params); err != nil {
return nil, err
} else if err := DirectoryQueryForm.Coerce(query, params); err != nil {
return nil, err
} else if entries, err := b.directory.Entries(query); err != nil {
return nil, err
} else {
return &Response{Result: map[string]interface{}{"entries": entries}, ID: &address.ID}, nil
}
}
return nil, nil
}
Expand Down Expand Up @@ -95,7 +132,7 @@ func (b *BasicMessageBroker) DeliverRequest(request *Request, clientInfo *Client

// we always update the directory entry of the client info struct
if remoteEntry, err = b.directory.EntryFor(clientInfo.Name); err != nil {
return nil, fmt.Errorf("error retrieving directory entry: %w", err)
return nil, fmt.Errorf("error retrieving directory entry for client '%s': %w", clientInfo.Name, err)
} else {
clientInfo.Entry = remoteEntry
}
Expand All @@ -116,6 +153,10 @@ func (b *BasicMessageBroker) DeliverRequest(request *Request, clientInfo *Client

address, err := GetAddress(request.ID)

if _, err := b.directory.EntryFor(address.Operator); err != nil {
return nil, fmt.Errorf("error retrieving directory entry for recipient '%s': %w", address.Operator, err)
}

if err != nil {
return nil, fmt.Errorf("error parsing address: %w", err)
}
Expand All @@ -125,8 +166,9 @@ func (b *BasicMessageBroker) DeliverRequest(request *Request, clientInfo *Client
// this endpoint
if ownEntry.Name != remoteEntry.Name {
if !CanCall(remoteEntry, ownEntry, address.Method) {
Log.Warningf("Permission denied for method '%s' and client '%s'", address.Method, clientInfo.Name)
return PermissionDenied(&request.ID, nil), nil
msg := fmt.Sprintf("Permission denied for method '%s' and client '%s'", address.Method, clientInfo.Name)
Log.Warningf(msg)
return PermissionDenied(&request.ID, msg, nil), nil
}
}

Expand All @@ -148,8 +190,9 @@ func (b *BasicMessageBroker) DeliverRequest(request *Request, clientInfo *Client
}
Log.Debug("Trying to deliver message...")
if response, err := channel.DeliverRequest(request); err != nil {
Log.Errorf("Channel %d encountered an error delivering message", i)
continue
msg := fmt.Sprintf("Channel %d encountered an error delivering the message: %v", i, err)
Log.Errorf(msg)
return ChannelError(&request.ID, msg, nil), nil
} else {
return response, nil
}
Expand Down
15 changes: 13 additions & 2 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,23 @@ type Error struct {
Data map[string]interface{} `json:"data,omitempty"`
}

func PermissionDenied(id *string, data map[string]interface{}) *Response {
func ChannelError(id *string, message string, data map[string]interface{}) *Response {
return &Response{
ID: id,
Error: &Error{
Code: 500,
Message: message,
Data: data,
},
}
}

func PermissionDenied(id *string, message string, data map[string]interface{}) *Response {
return &Response{
ID: id,
Error: &Error{
Code: 403,
Message: "permission denied",
Message: message,
Data: data,
},
}
Expand Down

0 comments on commit ed6aaa3

Please sign in to comment.