Skip to content

Commit

Permalink
Merge pull request #128 from blocklessnetwork/multiple-topics
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum authored Jan 12, 2024
2 parents b05c576 + ce3927b commit 7ed1220
Show file tree
Hide file tree
Showing 25 changed files with 288 additions and 112 deletions.
7 changes: 5 additions & 2 deletions api/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
)

// ExecuteRequest describes the payload for the REST API request for function execution.
type ExecuteRequest execute.Request
type ExecuteRequest struct {
execute.Request
Subgroup string `json:"subgroup,omitempty"`
}

// ExecuteResponse describes the REST API response for function execution.
type ExecuteResponse struct {
Expand Down Expand Up @@ -44,7 +47,7 @@ func (a *API) Execute(ctx echo.Context) error {
}

// Get the execution result.
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req.Request), req.Subgroup)
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}
Expand Down
4 changes: 2 additions & 2 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestAPI_Execute(t *testing.T) {
expectedCode := codes.OK

node := mocks.BaselineNode(t)
node.ExecuteFunctionFunc = func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) {
expectedCode := codes.Error

node := mocks.BaselineNode(t)
node.ExecuteFunctionFunc = func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
Expand Down
7 changes: 4 additions & 3 deletions api/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const (

// InstallFunctionRequest describes the payload for the REST API request for function install.
type InstallFunctionRequest struct {
CID string `json:"cid"`
URI string `json:"uri"`
CID string `json:"cid"`
URI string `json:"uri"`
Subgroup string `json:"subgroup"`
}

// InstallFunctionResponse describes the REST API response for the function install.
Expand Down Expand Up @@ -46,7 +47,7 @@ func (a *API) Install(ctx echo.Context) error {
// Start function install in a separate goroutine and signal when it's done.
fnErr := make(chan error)
go func() {
err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID)
err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID, req.Subgroup)
fnErr <- err
}()

Expand Down
4 changes: 2 additions & 2 deletions api/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
)

node := mocks.BaselineNode(t)
node.PublishFunctionInstallFunc = func(context.Context, string, string) error {
node.PublishFunctionInstallFunc = func(context.Context, string, string, string) error {
time.Sleep(installDuration)
return nil
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
t.Parallel()

node := mocks.BaselineNode(t)
node.PublishFunctionInstallFunc = func(context.Context, string, string) error {
node.PublishFunctionInstallFunc = func(context.Context, string, string, string) error {
return mocks.GenericError
}

Expand Down
4 changes: 2 additions & 2 deletions api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type Node interface {
ExecuteFunction(context.Context, execute.Request) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error)
ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error)
ExecutionResult(id string) (execute.Result, bool)
PublishFunctionInstall(ctx context.Context, uri string, cid string) error
PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error
}
3 changes: 2 additions & 1 deletion cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func parseFlags() *config.Config {
pflag.StringVar(&cfg.API, "rest-api", "", "address where the head node REST API will listen on")
pflag.StringVar(&cfg.Workspace, "workspace", "./workspace", "directory that the node can use for file storage")
pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime CLI name (used by the worker node)")
pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS")
pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to")

// Host configuration.
pflag.StringVar(&cfg.Host.PrivateKey, "private-key", "", "private key that the b7s host will use")
Expand Down
7 changes: 6 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func run() int {
executor, err := executor.New(log, execOptions...)
if err != nil {
log.Error().
Err(err).
Err(err).
Str("workspace", cfg.Workspace).
Str("runtime_path", cfg.RuntimePath).
Msg("could not create an executor")
Expand All @@ -181,6 +181,11 @@ func run() int {
// Create function store.
fstore := fstore.New(log, functionStore, cfg.Workspace)

// If we have topics specified, use those.
if len(cfg.Topics) > 0 {
opts = append(opts, node.WithTopics(cfg.Topics))
}

// Instantiate node.
node, err := node.New(log, host, peerstore, fstore, opts...)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ type Config struct {
Role string
BootNodes []string
Concurrency uint
Topics []string

Host Host
API string
RuntimePath string
RuntimeCLI string
LoadAttributes bool
Host Host
API string
RuntimePath string
RuntimeCLI string
LoadAttributes bool

CPUPercentage float64
MemoryMaxKB int64
Expand Down
3 changes: 3 additions & 0 deletions host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/rs/zerolog"

"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -20,6 +21,8 @@ type Host struct {

log zerolog.Logger
cfg Config

pubsub *pubsub.PubSub
}

// New creates a new Host.
Expand Down
30 changes: 26 additions & 4 deletions host/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,44 @@ package host

import (
"context"
"errors"
"fmt"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// Subscribe will have the host start listening to a specified gossipsub topic.
func (h *Host) Subscribe(ctx context.Context, topic string) (*pubsub.Topic, *pubsub.Subscription, error) {
func (h *Host) InitPubSub(ctx context.Context) error {

// Get a new PubSub object with the default router.
pubsub, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
return nil, nil, fmt.Errorf("could not create new gossipsub: %w", err)
return fmt.Errorf("could not create new gossipsub: %w", err)
}
h.pubsub = pubsub

return nil
}

func (h *Host) JoinTopic(topic string) (*pubsub.Topic, error) {

if h.pubsub == nil {
return nil, errors.New("pubsub is not initialized")
}

// Join the specified topic.
th, err := h.pubsub.Join(topic)
if err != nil {
return nil, fmt.Errorf("could not join topic: %w", err)
}

return th, nil
}

// Subscribe will have the host start listening to a specified gossipsub topic.
func (h *Host) Subscribe(topic string) (*pubsub.Topic, *pubsub.Subscription, error) {

// Join the specified topic.
th, err := pubsub.Join(topic)
th, err := h.JoinTopic(topic)
if err != nil {
return nil, nil, fmt.Errorf("could not join topic: %w", err)
}
Expand Down
14 changes: 7 additions & 7 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Option func(*Config)
// DefaultConfig represents the default settings for the node.
var DefaultConfig = Config{
Role: blockless.WorkerNode,
Topic: DefaultTopic,
Topics: []string{DefaultTopic},
HealthInterval: DefaultHealthInterval,
RollCallTimeout: DefaultRollCallTimeout,
Concurrency: DefaultConcurrency,
Expand All @@ -28,7 +28,7 @@ var DefaultConfig = Config{
// Config represents the Node configuration.
type Config struct {
Role blockless.NodeRole // Node role.
Topic string // Topic to subscribe to.
Topics []string // Topics to subscribe to.
Execute blockless.Executor // Executor to use for running functions.
HealthInterval time.Duration // How often should we emit the health ping.
RollCallTimeout time.Duration // How long do we wait for roll call responses.
Expand All @@ -47,8 +47,8 @@ func (n *Node) ValidateConfig() error {
return errors.New("node role is not valid")
}

if n.cfg.Topic == "" {
return errors.New("topic cannot be empty")
if len(n.cfg.Topics) == 0 {
return errors.New("topics cannot be empty")
}

// Worker specific validation.
Expand Down Expand Up @@ -83,10 +83,10 @@ func WithRole(role blockless.NodeRole) Option {
}
}

// WithTopic specifies the p2p topic to which node should subscribe.
func WithTopic(topic string) Option {
// WithTopics specifies the p2p topics to which node should subscribe.
func WithTopics(topics []string) Option {
return func(cfg *Config) {
cfg.Topic = topic
cfg.Topics = topics
}
}

Expand Down
8 changes: 4 additions & 4 deletions node/config_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func TestConfig_NodeRole(t *testing.T) {

func TestConfig_Topic(t *testing.T) {

const topic = "super-secret-topic"
topics := []string{"super-secret-topic"}

cfg := Config{
Topic: "",
Topics: []string{},
}

WithTopic(topic)(&cfg)
require.Equal(t, topic, cfg.Topic)
WithTopics(topics)(&cfg)
require.Equal(t, topics, cfg.Topics)
}

func TestConfig_Executor(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions node/execute_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestNode_HeadExecute(t *testing.T) {
node := createNode(t, blockless.HeadNode)

ctx := context.Background()
_, err := node.subscribe(ctx)
err := node.subscribeToTopics(ctx)
require.NoError(t, err)

// Create a host that will receive the execution response.
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestNode_HeadExecute(t *testing.T) {
node.listenDirectMessages(ctx)

defer cancel()
_, err := node.subscribe(ctx)
err := node.subscribeToTopics(ctx)
require.NoError(t, err)

// Create a host that will simulate a worker.
Expand All @@ -340,7 +340,10 @@ func TestNode_HeadExecute(t *testing.T) {
mockWorker, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

_, subscription, err := mockWorker.Subscribe(ctx, topic)
err = mockWorker.InitPubSub(ctx)
require.NoError(t, err)

_, subscription, err := mockWorker.Subscribe(topic)
require.NoError(t, err)

hostAddNewPeer(t, node.host, mockWorker)
Expand Down
7 changes: 4 additions & 3 deletions node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/blocklessnetwork/b7s/models/response"
)

// NOTE: head node typically receives execution requests from the REST API. This message handling is not cognizant of subgroups.
func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []byte) error {

// Unpack the request.
Expand All @@ -34,7 +35,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b

log := n.log.With().Str("request", req.RequestID).Str("peer", from.String()).Str("function", req.FunctionID).Logger()

code, results, cluster, err := n.headExecute(ctx, requestID, req.Request)
code, results, cluster, err := n.headExecute(ctx, requestID, req.Request, "")
if err != nil {
log.Error().Err(err).Msg("execution failed")
}
Expand Down Expand Up @@ -66,7 +67,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b

// headExecute is called on the head node. The head node will publish a roll call and delegate an execution request to chosen nodes.
// The returned map contains execution results, mapped to the peer IDs of peers who reported them.
func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request) (codes.Code, execute.ResultMap, execute.Cluster, error) {
func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, subgroup string) (codes.Code, execute.ResultMap, execute.Cluster, error) {

nodeCount := 1
if req.Config.NodeCount > 1 {
Expand All @@ -89,7 +90,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
log.Info().Msg("processing execution request")

// Phase 1. - Issue roll call to nodes.
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, req.Config.Attributes)
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, subgroup, req.Config.Attributes)
if err != nil {
code := codes.Error
if errors.Is(err, blockless.ErrRollCallTimeout) {
Expand Down
9 changes: 6 additions & 3 deletions node/health_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestNode_Health(t *testing.T) {
nhost, err := host.New(logger, loopback, 0)
require.NoError(t, err)

node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopic(topic))
node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopics([]string{topic}))
require.NoError(t, err)

// Create a host that will listen on the the topic to verify health pings
Expand All @@ -55,11 +55,14 @@ func TestNode_Health(t *testing.T) {
err = node.host.Connect(ctx, *info)
require.NoError(t, err)

err = receiver.InitPubSub(ctx)
require.NoError(t, err)

// Have both client and node subscribe to the same topic.
_, subscription, err := receiver.Subscribe(ctx, topic)
_, subscription, err := receiver.Subscribe(topic)
require.NoError(t, err)

_, err = node.subscribe(ctx)
err = node.subscribeToTopics(ctx)
require.NoError(t, err)

go node.HealthPing(ctx)
Expand Down
Loading

0 comments on commit 7ed1220

Please sign in to comment.