Skip to content

Commit 92f40a9

Browse files
committed
feat(server/memory,client/orb_transport/memory): Implement streaming, closes #48
1 parent 5d5548f commit 92f40a9

File tree

7 files changed

+400
-287
lines changed

7 files changed

+400
-287
lines changed

client/orb/memory_transport.go

-65
This file was deleted.

client/orb_transport/memory/go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/go-orb/plugins/client/orb_transport/memory
2+
3+
go 1.23.6

client/orb_transport/memory/memory.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Package memory implements a go-orb/plugins/client/orb compatible memory transport.
2+
package memory
3+
4+
import (
5+
"context"
6+
"maps"
7+
8+
"github.com/go-orb/go-orb/client"
9+
"github.com/go-orb/go-orb/log"
10+
"github.com/go-orb/go-orb/util/metadata"
11+
"github.com/go-orb/go-orb/util/orberrors"
12+
13+
"github.com/go-orb/plugins/client/orb"
14+
)
15+
16+
// Name is the name of this transport.
17+
const Name = "memory"
18+
19+
func init() {
20+
orb.RegisterTransport(Name, NewTransport)
21+
}
22+
23+
// Transport is a go-orb/plugins/client/orb compatible memory transport.
24+
type Transport struct {
25+
logger log.Logger
26+
}
27+
28+
// Start starts the transport.
29+
func (t *Transport) Start() error {
30+
return nil
31+
}
32+
33+
// Stop stop the transport.
34+
func (t *Transport) Stop(_ context.Context) error {
35+
return nil
36+
}
37+
38+
// Name returns the name of this transport.
39+
func (t *Transport) Name() string {
40+
return Name
41+
}
42+
43+
// Request does the actual rpc request to the server.
44+
func (t *Transport) Request(ctx context.Context, infos client.RequestInfos, req any, result any, opts *client.CallOptions) error {
45+
server, err := client.ResolveMemoryServer(infos.Service)
46+
if err != nil {
47+
return orberrors.ErrInternalServerError.Wrap(err)
48+
}
49+
50+
md := opts.Metadata
51+
if md == nil {
52+
md = map[string]string{}
53+
}
54+
55+
ctx, outMd := metadata.WithOutgoing(ctx)
56+
ctx, inMd := metadata.WithIncoming(ctx)
57+
58+
maps.Copy(inMd, md)
59+
inMd["Content-Type"] = opts.ContentType
60+
61+
err = server.Request(ctx, infos, req, result, opts)
62+
63+
// Retrieve metadata from memory.
64+
if opts.ResponseMetadata != nil {
65+
for k, v := range outMd {
66+
opts.ResponseMetadata[k] = v
67+
}
68+
}
69+
70+
return err
71+
}
72+
73+
// Stream opens a bidirectional stream to the memory server.
74+
func (t *Transport) Stream(
75+
ctx context.Context, infos client.RequestInfos,
76+
opts *client.CallOptions,
77+
) (client.StreamIface[any, any], error) {
78+
server, err := client.ResolveMemoryServer(infos.Service)
79+
if err != nil {
80+
return nil, orberrors.ErrInternalServerError.Wrap(err)
81+
}
82+
83+
md := opts.Metadata
84+
if md == nil {
85+
md = map[string]string{}
86+
}
87+
88+
ctx, outMd := metadata.WithOutgoing(ctx)
89+
ctx, inMd := metadata.WithIncoming(ctx)
90+
91+
maps.Copy(inMd, md)
92+
inMd["Content-Type"] = opts.ContentType
93+
94+
stream, err := server.Stream(ctx, infos, opts)
95+
96+
// Retrieve metadata from memory.
97+
if opts.ResponseMetadata != nil {
98+
for k, v := range outMd {
99+
opts.ResponseMetadata[k] = v
100+
}
101+
}
102+
103+
return stream, err
104+
}
105+
106+
// NewTransport creates a Transport.
107+
func NewTransport(logger log.Logger, _ *orb.Config) (orb.TransportType, error) {
108+
return orb.TransportType{Transport: &Transport{
109+
logger: logger,
110+
}}, nil
111+
}
+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package memory
2+
3+
import (
4+
"context"
5+
"os"
6+
"testing"
7+
8+
"github.com/go-orb/go-orb/log"
9+
"github.com/go-orb/go-orb/registry"
10+
"github.com/go-orb/go-orb/server"
11+
"github.com/go-orb/go-orb/types"
12+
"github.com/go-orb/plugins/client/tests"
13+
"github.com/stretchr/testify/suite"
14+
15+
"github.com/go-orb/plugins/server/memory"
16+
17+
echohandler "github.com/go-orb/plugins/client/tests/handler/echo"
18+
echoproto "github.com/go-orb/plugins/client/tests/proto/echo"
19+
20+
filehandler "github.com/go-orb/plugins/client/tests/handler/file"
21+
fileproto "github.com/go-orb/plugins/client/tests/proto/file"
22+
23+
// Blank imports here are fine.
24+
_ "github.com/go-orb/plugins/codecs/json"
25+
_ "github.com/go-orb/plugins/codecs/proto"
26+
_ "github.com/go-orb/plugins/codecs/yaml"
27+
_ "github.com/go-orb/plugins/log/slog"
28+
_ "github.com/go-orb/plugins/registry/mdns"
29+
)
30+
31+
func setupServer(sn string) (*tests.SetupData, error) {
32+
ctx, cancel := context.WithCancel(context.Background())
33+
34+
setupData := &tests.SetupData{}
35+
36+
logger, err := log.New(log.WithLevel(log.LevelTrace))
37+
if err != nil {
38+
cancel()
39+
40+
return nil, err
41+
}
42+
43+
reg, err := registry.New(nil, &types.Components{}, logger)
44+
if err != nil {
45+
cancel()
46+
47+
return nil, err
48+
}
49+
50+
echoHInstance := new(echohandler.Handler)
51+
echoHRegister := echoproto.RegisterStreamsHandler(echoHInstance)
52+
53+
fileHInstance := new(filehandler.Handler)
54+
fileHRegister := fileproto.RegisterFileServiceHandler(fileHInstance)
55+
56+
options := []server.Option{
57+
memory.WithHandlers(echoHRegister, fileHRegister),
58+
}
59+
60+
ep, err := memory.New(sn, "", memory.NewConfig(options...), logger, reg)
61+
if err != nil {
62+
cancel()
63+
64+
return nil, err
65+
}
66+
67+
setupData.Logger = logger
68+
setupData.Registry = reg
69+
setupData.Entrypoints = []server.Entrypoint{ep}
70+
setupData.Ctx = ctx
71+
setupData.Stop = cancel
72+
73+
return setupData, nil
74+
}
75+
76+
func newSuite() *tests.TestSuite {
77+
s := tests.NewSuite(setupServer, []string{Name})
78+
// s.Debug = true
79+
return s
80+
}
81+
82+
func TestSuite(t *testing.T) {
83+
if os.Getenv("CI") != "" {
84+
t.Skip("Skipping testing in CI environment")
85+
}
86+
87+
// Run the tests.
88+
suite.Run(t, newSuite())
89+
}

client/tests/tests.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func (s *TestSuite) TestFileUpload() {
333333
// Open a stream to the service
334334
stream, err := fileClient.UploadFile(ctx, ServiceName, client.WithPreferredTransports(t))
335335
if errors.Is(err, orberrors.ErrNotImplemented) {
336-
// Transport does not support streaming.
336+
s.T().Skip("Transport does not support streaming.")
337337
return
338338
}
339339

@@ -377,6 +377,8 @@ func (s *TestSuite) TestFileUpload() {
377377
}
378378

379379
// TestAuthorizedFileUpload tests the authorized client streaming functionality for file uploads.
380+
//
381+
//nolint:funlen
380382
func (s *TestSuite) TestAuthorizedFileUpload() {
381383
// Create a file service client
382384
fileClient := file.NewFileServiceClient(s.client)
@@ -389,7 +391,7 @@ func (s *TestSuite) TestAuthorizedFileUpload() {
389391
// Open a stream to the service
390392
stream, err := fileClient.AuthorizedUploadFile(ctx, ServiceName, client.WithPreferredTransports(t))
391393
if errors.Is(err, orberrors.ErrNotImplemented) {
392-
// Transport does not support streaming.
394+
s.T().Skip("Transport does not support streaming.")
393395
return
394396
}
395397

@@ -436,7 +438,7 @@ func (s *TestSuite) TestAuthorizedFileUpload() {
436438
client.WithResponseMetadata(responseMd),
437439
)
438440
if errors.Is(err, orberrors.ErrNotImplemented) {
439-
// Transport does not support streaming.
441+
s.T().Skip("Transport does not support streaming.")
440442
return
441443
}
442444

0 commit comments

Comments
 (0)