Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC Endpoints #232

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Add StreamServiceStatus
Andrew Hare committed May 1, 2024
commit 9640ab9c5c8b6b904a8311bf488a22805d8c4bb5
38 changes: 36 additions & 2 deletions gateway/grpc/lease.go
Original file line number Diff line number Diff line change
@@ -6,9 +6,11 @@ import (
"fmt"
"strings"
"sync"
"time"

leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

kubeErrors "k8s.io/apimachinery/pkg/api/errors"
@@ -237,6 +239,38 @@ func (s *safeLogStream) Send(r *leasev1.ServiceLogsResponse) error {
return nil
}

func (*server) StreamServiceStatus(*leasev1.ServiceStatusRequest, leasev1.LeaseRPC_StreamServiceStatusServer) error {
panic("unimplemented")
const defaultServiceStatusInterval = 5 * time.Second

func (s *server) StreamServiceStatus(r *leasev1.ServiceStatusRequest, strm leasev1.LeaseRPC_StreamServiceStatusServer) error {
var (
ctx = strm.Context()
id = r.GetLeaseId()
)

interval := defaultServiceStatusInterval

if md, ok := metadata.FromIncomingContext(ctx); ok {
i := md.Get("interval")[0]

var err error
if interval, err = time.ParseDuration(i); err != nil {
return fmt.Errorf("parse duration %s: %w", i, err)
}
}

t := time.NewTicker(interval)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
res, err := s.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{LeaseId: id})
if err != nil {
return fmt.Errorf("service status: %w", err)
}

strm.Send(res)
}
}
}
48 changes: 48 additions & 0 deletions gateway/grpc/server_test.go
Original file line number Diff line number Diff line change
@@ -9,12 +9,14 @@
"net"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"

types "github.com/akash-network/akash-api/go/node/cert/v1beta3"
@@ -349,6 +351,52 @@
assert.Equal(t, expected, actual)
},
},

// StreamServiceStatus
{
desc: "StreamServiceStatus",
readClient: func(t *testing.T) *cmocks.ReadClient {
var m cmocks.ReadClient

mgi := newManifestGroup(t)

m.EXPECT().GetManifestGroup(mock.Anything, mock.Anything).
Return(true, mgi, nil)

m.EXPECT().LeaseStatus(mock.Anything, mock.Anything).
Return(nil, nil)

return &m
},
run: func(ctx context.Context, t *testing.T, c client) {
interval := 500 * time.Millisecond

ctx = metadata.AppendToOutgoingContext(ctx, "interval", interval.String())
s, err := c.l.StreamServiceStatus(ctx, &leasev1.ServiceStatusRequest{})
require.NoError(t, err)

var (
iterations = 3
after = time.After(interval * time.Duration(iterations))
hits int
)

for {
select {
case <-after:
assert.Equal(t, iterations, hits)
return
default:
_, err = s.Recv()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
hits++
}
}
},
},
}

for _, c := range cases {
@@ -388,7 +436,7 @@

defer s.Stop()

l, err := net.Listen("tcp", ":0")

Check failure on line 439 in gateway/grpc/server_test.go

GitHub Actions / lint

G102: Binds to all network interfaces (gosec)
require.NoError(t, err)

go func() {
@@ -396,7 +444,7 @@
}()

tlsConfig := tls.Config{
InsecureSkipVerify: true,

Check failure on line 447 in gateway/grpc/server_test.go

GitHub Actions / lint

G402: TLS InsecureSkipVerify set true. (gosec)
Certificates: crt2.Cert,
}

@@ -572,7 +620,7 @@

defer s.Stop()

l, err := net.Listen("tcp", ":0")

Check failure on line 623 in gateway/grpc/server_test.go

GitHub Actions / lint

G102: Binds to all network interfaces (gosec)
require.NoError(t, err)

go func() {
@@ -580,7 +628,7 @@
}()

tlsConfig := tls.Config{
InsecureSkipVerify: true,

Check failure on line 631 in gateway/grpc/server_test.go

GitHub Actions / lint

G402: TLS InsecureSkipVerify set true. (gosec)
Certificates: []tls.Certificate{c.cert(t)},
}

Loading