This package contains a basic implementation of a gRPC server and client.
Usage of the package is described through an example that implements a time service.
To import the module in your code, write the following line:
import "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
First of all, you have to write a proto file for your protocol. For example:
syntax = "proto3";
package proto;
option go_package = "github.com/dipdup-net/indexer-sdk/examples/grpc/pb";
import "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/proto/general.proto";
// TimeService is the example service: clients subscribe to a stream of
// Response messages and can later unsubscribe.
service TimeService {
// SubscribeOnTime takes an empty Request and returns a server-side stream of Response messages.
rpc SubscribeOnTime(Request) returns (stream Response);
// UnsubscribeFromTime is a unary call taking an UnsubscribeRequest and returning an UnsubscribeResponse.
rpc UnsubscribeFromTime(UnsubscribeRequest) returns (UnsubscribeResponse);
}
// Request is the argument of SubscribeOnTime; it declares no fields.
message Request {}
// Response is the message streamed back by SubscribeOnTime.
message Response{
// time is carried as a string.
string time = 1;
}
After that, generate the pb
files from your proto file:
protoc -I=. -I=${GOPATH}/src --go-grpc_out=${GOPATH}/src --go_out=${GOPATH}/src ${GOPATH}/src/github.com/dipdup-net/indexer-sdk/examples/grpc/proto/*.proto
Embed the Server
module to implement a custom gRPC server module. It implements the default Module
interface. You also have to embed the unimplemented server generated from your proto file. For example:
// Server is the custom gRPC server module of the time service example.
// It embeds the base grpc.Server module and the unimplemented time service
// server generated from the proto file.
type Server struct {
	*grpc.Server
	pb.UnimplementedTimeServiceServer

	// subscriptions is the set of stream subscriptions, parameterized by the
	// time.Time model and the *pb.Response stream message.
	subscriptions *grpc.Subscriptions[time.Time, *pb.Response]
	// wg is incremented in Start right before the listen goroutine is launched.
	wg *sync.WaitGroup
}
Subscriptions is a structure declared as Subscriptions[T any, P any].
It's a generic struct containing a map with all subscriptions for the stream. It filters models of type T
according to the subscription pattern and sends them to the stream if the message matches the pattern. You have to implement the Subscription
interface to provide your custom message filtering and notification. An example is here.
After creating the server, you have to register the generated server in your Start
function. For example:
// Start registers the generated time service on the underlying gRPC server,
// starts the embedded server module and launches the listen goroutine.
func (server *Server) Start(ctx context.Context) {
	grpcServer := server.Server.Server()
	pb.RegisterTimeServiceServer(grpcServer, server)

	// The embedded module is addressed explicitly; a plain server.Start
	// would resolve to this very method.
	server.Server.Start(ctx)

	// Account for the goroutine before it is started.
	server.wg.Add(1)
	go server.listen(ctx)
}
listen
is the function that waits for events about which subscribers must be notified. In the example, it sends the time to subscribers every 5 seconds.
The next step is to define the SubscribeOnXXX
and UnsubscribeFromXXX
methods, which implement your custom stream by calling the default implementations of these functions. For example:
// SubscribeOnTime opens a time subscription on the given stream by delegating
// to grpc.DefaultSubscribeOn with the server's subscriptions and a new time
// subscription. It returns whatever grpc.DefaultSubscribeOn returns.
func (server *Server) SubscribeOnTime(req *pb.Request, stream pb.TimeService_SubscribeOnTimeServer) error {
	return grpc.DefaultSubscribeOn[time.Time, *pb.Response](
		stream,
		server.subscriptions,
		NewTimeSubscription(),
		// DefaultSubscribeOn takes a handler as its fourth argument. The time
		// service has nothing to synchronize, so the handler is a no-op.
		func(id uint64) error { return nil },
	)
}
// UnsubscribeFromTime passes the subscription id from the request, together
// with the server's subscriptions, to grpc.DefaultUnsubscribe and returns its
// result unchanged.
func (server *Server) UnsubscribeFromTime(ctx context.Context, req *generalPB.UnsubscribeRequest) (*generalPB.UnsubscribeResponse, error) {
	subscriptionID := req.Id
	return grpc.DefaultUnsubscribe(ctx, server.subscriptions, subscriptionID)
}
DefaultSubscribeOn
has the following signature:
func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Subscriptions[T, P], subscription Subscription[T, P], handler func(id uint64) error) error
handler
is the function which is called after the subscription response is sent and before the real-time subscription begins. You can implement a synchronization process in the handler.
DefaultUnsubscribe
has the following signature:
func DefaultUnsubscribe[T any, P any](ctx context.Context, subscriptions *Subscriptions[T, P], subscriptionID uint64) (*pb.UnsubscribeResponse, error)
Embed the Client
module and the generated client to implement a custom gRPC client module. You should also create an output for this module so that it can be added to the workflow.
// Client is the custom gRPC client module of the time service example.
// It embeds the base grpc.Client module.
type Client struct {
	*grpc.Client

	// output is the module output returned by Output for the name "time".
	output *modules.Output
	// client is the generated time service client; it is assigned in Start.
	client pb.TimeServiceClient
	// wg is handed to grpc.Subscribe when subscribing on time.
	wg *sync.WaitGroup
}
After that, implement the Start
method, where the generated client is initialized. The initialization of the generated client has to be in Start
because it requires an open connection to the server, which is opened by calling Connect.
// Start creates the generated time service client on top of the module's
// connection and stores it in the client field.
func (client *Client) Start(ctx context.Context) {
	conn := client.Connection()
	client.client = pb.NewTimeServiceClient(conn)
}
Then implement the SubscribeOnXXX
and UnsubscribeFromXXX
methods. The code below shows how to connect to the server and notify all of the client's subscribers about a new event.
// SubscribeOnTime opens the time stream on the server and passes it to
// grpc.Subscribe with handleTime as the message handler. If the stream cannot
// be opened it returns a zero id and the error; otherwise it returns the
// result of grpc.Subscribe.
func (client *Client) SubscribeOnTime(ctx context.Context) (uint64, error) {
	timeStream, err := client.client.SubscribeOnTime(ctx, &pb.Request{})
	if err != nil {
		return 0, err
	}

	return grpc.Subscribe[*pb.Response](timeStream, client.handleTime, client.wg)
}
// handleTime is the handler passed to grpc.Subscribe in SubscribeOnTime. It
// pushes the received message to the module's output and always returns nil.
//
// The method is declared once: a second declaration with the same name on
// the same receiver does not compile. The id parameter is a uint64 to match
// the subscription ids used by SubscribeOnTime and UnsubscribeFromTime.
func (client *Client) handleTime(ctx context.Context, data *pb.Response, id uint64) error {
	client.output.Push(data)
	return nil
}
// UnsubscribeFromTime sends an unsubscribe request carrying the given
// subscription id to the server. The response is discarded; only the error
// of the call, if any, is returned.
func (client *Client) UnsubscribeFromTime(ctx context.Context, id uint64) error {
	request := &generalPB.UnsubscribeRequest{Id: id}
	_, err := client.client.UnsubscribeFromTime(ctx, request)
	return err
}
handleTime
is a handler which is called on receiving a new event from the server.
You also need to implement the Module
interface. It's described here. For example:
// Input always fails: for every name it returns a nil input and
// modules.ErrUnknownInput wrapped with the requested name.
func (client *Client) Input(name string) (*modules.Input, error) {
	err := errors.Wrap(modules.ErrUnknownInput, name)
	return nil, err
}
// Output returns the client's output for the name "time". Any other name
// yields modules.ErrUnknownOutput wrapped with the requested name.
func (client *Client) Output(name string) (*modules.Output, error) {
	if name == "time" {
		return client.output, nil
	}
	return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}
// AttachTo looks up the output with the given name and attaches input to it.
// If the lookup fails, its error is returned and nothing is attached.
func (client *Client) AttachTo(name string, input *modules.Input) error {
	target, err := client.Output(name)
	if err == nil {
		target.Attach(input)
	}
	return err
}
The inputs and outputs of your server and client modules must be defined by the developer, because the data structure and the notification or processing logic differ from project to project.
The full code of the main file is below.
package main
import (
"context"
"os"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
)
// main runs the time service example end to end: it configures console
// logging, starts the server module, connects and starts the client module,
// subscribes to time, connects a custom module to the client's "time"
// output, waits one minute, unsubscribes and closes all modules.
func main() {
	log.Logger = log.Output(zerolog.ConsoleWriter{
		Out:        os.Stdout,
		TimeFormat: "2006-01-02 15:04:05",
	})

	bind := "127.0.0.1:8889"
	serverCfg := grpc.ServerConfig{
		Bind: bind,
	}

	// create server module
	server, err := NewServer(&serverCfg)
	if err != nil {
		log.Panic().Err(err).Msg("creating server error")
		return
	}

	// creating client module
	client := NewClient(bind)

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, including the early returns
	// below. The explicit cancel() before closing the modules is kept;
	// calling a CancelFunc more than once is safe.
	defer cancel()

	// starting all modules
	server.Start(ctx)

	if err := client.Connect(ctx); err != nil {
		log.Panic().Err(err).Msg("connecting to server error")
		return
	}
	client.Start(ctx)

	// subscribing to time
	subscriptionID, err := client.SubscribeOnTime(ctx)
	if err != nil {
		log.Panic().Err(err).Msg("subscribing error")
		return
	}
	log.Info().Uint64("subscription_id", subscriptionID).Msg("subscribed")

	// creating custom module which receives notification from client and log it to console.
	module := NewCustomModule()

	if err := modules.Connect(client, module, "time", "input"); err != nil {
		log.Panic().Err(err).Msg("module connection error")
		return
	}

	module.Start(ctx)

	// Let the subscription run for a minute before tearing everything down.
	time.Sleep(time.Minute)

	if err := client.UnsubscribeFromTime(ctx, subscriptionID); err != nil {
		log.Panic().Err(err).Msg("unsubscribing error")
		return
	}
	log.Info().Msg("unsubscribed")

	cancel()

	// closing all modules
	if err := client.Close(); err != nil {
		log.Panic().Err(err).Msg("closing client error")
	}
	if err := module.Close(); err != nil {
		log.Panic().Err(err).Msg("closing custom module error")
	}
	if err := server.Close(); err != nil {
		log.Panic().Err(err).Msg("closing server error")
	}
}