diff --git a/cmd/provider-services/cmd/grpc.go b/cmd/provider-services/cmd/grpc.go new file mode 100644 index 000000000..146e2a2ad --- /dev/null +++ b/cmd/provider-services/cmd/grpc.go @@ -0,0 +1,23 @@ +package cmd + +import ( + "fmt" + "net" + "net/url" +) + +const gRPCDefaultPort = "8444" + +func grpcURI(hostURI string) (string, error) { + u, err := url.Parse(hostURI) + if err != nil { + return "", fmt.Errorf("url parse: %w", err) + } + + h, _, err := net.SplitHostPort(u.Host) + if err != nil { + return "", fmt.Errorf("split host port: %w", err) + } + + return net.JoinHostPort(h, gRPCDefaultPort), nil +} diff --git a/cmd/provider-services/cmd/leaseLogs.go b/cmd/provider-services/cmd/leaseLogs.go index f6fe1f920..0dd262f61 100644 --- a/cmd/provider-services/cmd/leaseLogs.go +++ b/cmd/provider-services/cmd/leaseLogs.go @@ -1,21 +1,29 @@ package cmd import ( + "context" "crypto/tls" "fmt" + "strings" "sync" + "time" sdkclient "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/pkg/errors" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/akash-network/akash-api/go/node/client/v1beta2" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" + ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" aclient "github.com/akash-network/provider/client" + gwgrpc "github.com/akash-network/provider/gateway/grpc" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -81,7 +89,7 @@ func doLeaseLogs(cmd *cobra.Command) error { } if outputFormat != outputText && outputFormat != outputJSON { - return errors.Errorf("invalid output format %s. expected text|json", outputFormat) + return fmt.Errorf("invalid output format %s. expected text|json", outputFormat) } follow, err := cmd.Flags().GetBool(flagFollow) @@ -95,9 +103,127 @@ func doLeaseLogs(cmd *cobra.Command) error { } if tailLines < -1 { - return errors.Errorf("tail flag supplied with invalid value. must be >= -1") + return fmt.Errorf("tail flag supplied with invalid value. must be >= -1") } + g := leaseLogGetter{ + cert: cert, + cl: cl, + svcs: svcs, + follow: follow, + tailLines: tailLines, + printer: printer{ + cctx: cctx, + fmt: outputFormat, + }, + } + + if err = g.run(ctx, leases); err != nil { + return fmt.Errorf("getting logs: %w", err) + } + + return nil +} + +type leaseLogGetter struct { + cert tls.Certificate + cl v1beta2.QueryClient + svcs string + follow bool + tailLines int64 + printer printer +} + +func (g leaseLogGetter) run(ctx context.Context, leases []mtypes.LeaseID) error { + var ( + restLeases = make([]mtypes.LeaseID, 0, len(leases)) + grpcLeases = make(map[mtypes.LeaseID]*gwgrpc.Client, len(leases)) + ) + + for _, lid := range leases { + provAddr, _ := sdk.AccAddressFromBech32(lid.Provider) + prov, err := g.cl.Provider(ctx, &ptypes.QueryProviderRequest{Owner: provAddr.String()}) + if err != nil { + return fmt.Errorf("query client provider: %w", err) + } + + hostURIgRPC, err := grpcURI(prov.GetProvider().HostURI) + if err != nil { + return fmt.Errorf("grpc uri: %w", err) + } + + ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + client, err := gwgrpc.NewClient(ctxDial, hostURIgRPC, g.cert, g.cl) + if err == nil { + grpcLeases[lid] = client + } else { + restLeases = append(restLeases, lid) + } + } + + g.grpc(ctx, grpcLeases) + g.rest(ctx, restLeases) + + return nil +} + +func (g leaseLogGetter) grpc(ctx context.Context, leases map[mtypes.LeaseID]*gwgrpc.Client) { + var wg sync.WaitGroup + wg.Add(len(leases)) + + for lid, cc := range leases { + go func(c *gwgrpc.Client, id mtypes.LeaseID) { + defer wg.Done() + + req := leasev1.ServiceLogsRequest{ + Services: strings.Split(g.svcs, " "), + LeaseId: id, + } + + logErr := func(err error) { + fmt.Printf("[%s]: %v", id, err) + } + + s, err := c.StreamServiceLogs(ctx, &req) + if err != nil { + logErr(fmt.Errorf("stream service logs: %w", err)) + return + } + + for { + select { + case <-ctx.Done(): + return + default: + r, err := s.Recv() + if err != nil { + if e, ok := status.FromError(err); ok { + if e.Code() != codes.Canceled { + logErr(fmt.Errorf("recv: %w", err)) + } + } + + return + } + + for _, s := range r.Services { + g.printer.write(logEntry{ + Name: s.GetName(), + Message: string(s.GetLogs()), + Lid: id, + }) + } + } + } + }(cc, lid) + } + + wg.Wait() +} + +func (g leaseLogGetter) rest(ctx context.Context, leases []mtypes.LeaseID) { type result struct { lid mtypes.LeaseID error error @@ -109,9 +235,9 @@ func doLeaseLogs(cmd *cobra.Command) error { for _, lid := range leases { stream := result{lid: lid} prov, _ := sdk.AccAddressFromBech32(lid.Provider) - gclient, err := gwrest.NewClient(ctx, cl, prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(ctx, g.cl, prov, []tls.Certificate{g.cert}) if err == nil { - stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, svcs, follow, tailLines) + stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, g.svcs, g.follow, g.tailLines) } else { stream.error = err } @@ -121,26 +247,11 @@ func doLeaseLogs(cmd *cobra.Command) error { var wgStreams sync.WaitGroup - type logEntry struct { - gwrest.ServiceLogMessage `json:",inline"` - Lid mtypes.LeaseID `json:"lease_id"` - } - outch := make(chan logEntry) - printFn := func(evt logEntry) { - fmt.Printf("[%s][%s] %s\n", evt.Lid, evt.Name, evt.Message) - } - - if outputFormat == "json" { - printFn = func(evt logEntry) { - _ = cmdcommon.PrintJSON(cctx, evt) - } - } - go func() { for evt := range outch { - printFn(evt) + g.printer.write(evt) } }() @@ -155,8 +266,9 @@ func doLeaseLogs(cmd *cobra.Command) error { for res := range stream.stream.Stream { outch <- logEntry{ - ServiceLogMessage: res, - Lid: stream.lid, + Name: res.Name, + Message: res.Message, + Lid: stream.lid, } } }(stream) @@ -164,6 +276,23 @@ func doLeaseLogs(cmd *cobra.Command) error { wgStreams.Wait() close(outch) +} - return nil +type logEntry struct { + Name string `json:"name"` + Message string `json:"message"` + Lid mtypes.LeaseID `json:"lease_id"` +} + +type printer struct { + fmt string + cctx sdkclient.Context +} + +func (p printer) write(e logEntry) { + if p.fmt == "json" { + cmdcommon.PrintJSON(p.cctx, e) + } else { + fmt.Printf("[%s][%s] %s\n", e.Lid, e.Name, e.Message) + } } diff --git a/cmd/provider-services/cmd/leaseStatus.go b/cmd/provider-services/cmd/leaseStatus.go index 42e5e9313..2006595af 100644 --- a/cmd/provider-services/cmd/leaseStatus.go +++ b/cmd/provider-services/cmd/leaseStatus.go @@ -1,17 +1,26 @@ package cmd import ( + "context" "crypto/tls" + "fmt" + "time" sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/spf13/cobra" + "github.com/akash-network/akash-api/go/manifest/v2beta2" + ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" dcli "github.com/akash-network/node/x/deployment/client/cli" mcli "github.com/akash-network/node/x/market/client/cli" + cltypes "github.com/akash-network/provider/cluster/types/v1beta3" + aclient "github.com/akash-network/provider/client" + gwgrpc "github.com/akash-network/provider/gateway/grpc" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -44,7 +53,7 @@ func doLeaseStatus(cmd *cobra.Command) error { return err } - prov, err := providerFromFlags(cmd.Flags()) + provAddr, err := providerFromFlags(cmd.Flags()) if err != nil { return err } @@ -59,15 +68,90 @@ func doLeaseStatus(cmd *cobra.Command) error { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(ctx, cl, prov, []tls.Certificate{cert}) + prov, err := cl.Provider(ctx, &ptypes.QueryProviderRequest{Owner: provAddr.String()}) if err != nil { - return err + return fmt.Errorf("query client provider: %w", err) } - result, err := gclient.LeaseStatus(cmd.Context(), bid.LeaseID()) + hostURIgRPC, err := grpcURI(prov.GetProvider().HostURI) if err != nil { - return showErrorToUser(err) + return fmt.Errorf("grpc uri: %w", err) + } + + ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var leaseStatus gwrest.LeaseStatus + + client, err := gwgrpc.NewClient(ctxDial, hostURIgRPC, cert, cl) + if err == nil { + res, err := client.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{ + LeaseId: bid.LeaseID(), + }) + if err != nil { + return fmt.Errorf("service status: %w", err) + } + + leaseStatus = toLeaseStatus(res) + } else { + gclient, err := gwrest.NewClient(ctx, cl, provAddr, []tls.Certificate{cert}) + if err != nil { + return err + } + + leaseStatus, err = gclient.LeaseStatus(cmd.Context(), bid.LeaseID()) + if err != nil { + return showErrorToUser(err) + } + + } + + return cmdcommon.PrintJSON(cctx, leaseStatus) +} + +func toLeaseStatus(r *leasev1.ServiceStatusResponse) gwrest.LeaseStatus { + s := gwrest.LeaseStatus{ + Services: make(map[string]*cltypes.ServiceStatus), + } + + for _, svc := range r.Services { + s.Services[svc.Name] = &cltypes.ServiceStatus{ + Name: svc.Name, + Available: svc.Status.Available, + Total: svc.Status.Total, + URIs: svc.Status.Uris, + ObservedGeneration: svc.Status.ObservedGeneration, + Replicas: svc.Status.Replicas, + UpdatedReplicas: svc.Status.UpdatedReplicas, + ReadyReplicas: svc.Status.ReadyReplicas, + AvailableReplicas: svc.Status.AvailableReplicas, + } + + if len(svc.Ports) > 0 { + s.ForwardedPorts = make(map[string][]cltypes.ForwardedPortStatus) + } + for _, fp := range svc.Ports { + s.ForwardedPorts[svc.Name] = append(s.ForwardedPorts[svc.Name], cltypes.ForwardedPortStatus{ + Host: fp.Host, + Port: uint16(fp.Port), + ExternalPort: uint16(fp.ExternalPort), + Proto: v2beta2.ServiceProtocol(fp.Proto), + Name: fp.Name, + }) + } + + if len(svc.Ips) > 0 { + s.IPs = make(map[string][]gwrest.LeasedIPStatus) + } + for _, ip := range svc.Ips { + s.IPs[svc.Name] = append(s.IPs[svc.Name], gwrest.LeasedIPStatus{ + Port: ip.Port, + ExternalPort: ip.Port, + Protocol: ip.Protocol, + IP: ip.Ip, + }) + } } - return cmdcommon.PrintJSON(cctx, result) + return s } diff --git a/cmd/provider-services/cmd/manifest.go b/cmd/provider-services/cmd/manifest.go index 79147623b..06447e376 100644 --- a/cmd/provider-services/cmd/manifest.go +++ b/cmd/provider-services/cmd/manifest.go @@ -21,9 +21,7 @@ import ( gwrest "github.com/akash-network/provider/gateway/rest" ) -var ( - errSubmitManifestFailed = errors.New("submit manifest to some providers has been failed") -) +var errSubmitManifestFailed = errors.New("submit manifest to some providers has been failed") // SendManifestCmd looks up the Providers blockchain information, // and POSTs the SDL file to the Gateway address. @@ -146,7 +144,6 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error { } _, err = fmt.Fprint(cmd.OutOrStdout(), buf.String()) - if err != nil { return err } @@ -157,3 +154,4 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error { return nil } + diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index 5eeec75e8..a30d0c3a9 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -109,9 +109,7 @@ const ( serviceHostnameOperator = "hostname-operator" ) -var ( - errInvalidConfig = errors.New("Invalid configuration") -) +var errInvalidConfig = errors.New("Invalid configuration") // RunCmd launches the Akash Provider service func RunCmd() *cobra.Command { @@ -201,12 +199,12 @@ func RunCmd() *cobra.Command { panic(err) } - cmd.Flags().String(FlagGatewayListenAddress, "0.0.0.0:8443", "Gateway listen address") + cmd.Flags().String(FlagGatewayListenAddress, "0.0.0.0:8443", "REST gateway listen address") if err := viper.BindPFlag(FlagGatewayListenAddress, cmd.Flags().Lookup(FlagGatewayListenAddress)); err != nil { panic(err) } - cmd.Flags().String(FlagGatewayGRPCListenAddress, "0.0.0.0:8444", "Gateway listen address") + cmd.Flags().String(FlagGatewayGRPCListenAddress, "0.0.0.0:8444", "gRPC gateway listen address") if err := viper.BindPFlag(FlagGatewayGRPCListenAddress, cmd.Flags().Lookup(FlagGatewayGRPCListenAddress)); err != nil { panic(err) } @@ -420,9 +418,11 @@ var allowedBidPricingStrategies = [...]string{ bidPricingStrategyShellScript, } -var errNoSuchBidPricingStrategy = fmt.Errorf("No such bid pricing strategy. Allowed: %v", allowedBidPricingStrategies) -var errInvalidValueForBidPrice = errors.New("not a valid bid price") -var errBidPriceNegative = errors.New("Bid price cannot be a negative number") +var ( + errNoSuchBidPricingStrategy = fmt.Errorf("No such bid pricing strategy. Allowed: %v", allowedBidPricingStrategies) + errInvalidValueForBidPrice = errors.New("not a valid bid price") + errBidPriceNegative = errors.New("Bid price cannot be a negative number") +) func strToBidPriceScale(val string) (decimal.Decimal, error) { v, err := decimal.NewFromString(val) @@ -746,8 +746,17 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { return err } - err = gwgrpc.NewServer(ctx, grpcaddr, []tls.Certificate{tlsCert}, service) - if err != nil { + ctx = gwgrpc.ContextWithQueryClient(ctx, cl.Query()) + + gs := gwgrpc.NewServer(ctx, + gwgrpc.WithCerts([]tls.Certificate{tlsCert}), + gwgrpc.WithProviderClient(service), + gwgrpc.WithClusterClient(service.Cluster()), + gwgrpc.WithClusterSettings(clusterSettings), + gwgrpc.WithIPClient(ipOperatorClient), + ) + + if err = gs.ServeOn(ctx, grpcaddr); err != nil { return err } diff --git a/gateway/grpc/client.go b/gateway/grpc/client.go new file mode 100644 index 000000000..659a68cb9 --- /dev/null +++ b/gateway/grpc/client.go @@ -0,0 +1,76 @@ +package grpc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3" + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" + providerv1 "github.com/akash-network/akash-api/go/provider/v1" + atls "github.com/akash-network/akash-api/go/util/tls" +) + +type Client struct { + providerv1.ProviderRPCClient + leasev1.LeaseRPCClient + + conn *grpc.ClientConn +} + +func (c *Client) Close() error { + return c.conn.Close() +} + +func NewClient(ctx context.Context, addr string, cert tls.Certificate, cquery ctypes.QueryClient) (*Client, error) { + tlsConfig := tls.Config{ + InsecureSkipVerify: true, + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS13, + VerifyPeerCertificate: func(chain [][]byte, _ [][]*x509.Certificate) error { + if len(chain) == 0 { + return errors.New("tls: empty chain") + } + + if len(chain) > 1 { + return errors.New("tls: invalid certificate chain") + } + + cert, err := x509.ParseCertificate(chain[0]) + if err != nil { + return fmt.Errorf("x509 parse certificate: %w", err) + } + + _, _, err = atls.ValidatePeerCertificates( + ctx, + cquery, + []*x509.Certificate{cert}, + []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}) + if err != nil { + return fmt.Errorf("validate peer certificates: %w", err) + } + + return nil + }, + } + + conn, err := grpc.DialContext(ctx, addr, + grpc.WithBlock(), + grpc.WithTransportCredentials(credentials.NewTLS(&tlsConfig)), + ) + if err != nil { + return nil, fmt.Errorf("grpc dial context %s: %w", addr, err) + } + + return &Client{ + ProviderRPCClient: providerv1.NewProviderRPCClient(conn), + LeaseRPCClient: leasev1.NewLeaseRPCClient(conn), + + conn: conn, + }, nil +} diff --git a/gateway/grpc/context.go b/gateway/grpc/context.go new file mode 100644 index 000000000..f697c818e --- /dev/null +++ b/gateway/grpc/context.go @@ -0,0 +1,41 @@ +package grpc + +import ( + "context" + + ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +type ContextKey string + +const ( + ContextKeyQueryClient = ContextKey("query-client") + ContextKeyOwner = ContextKey("owner") +) + +func ContextWithQueryClient(ctx context.Context, c ctypes.QueryClient) context.Context { + return context.WithValue(ctx, ContextKeyQueryClient, c) +} + +func MustQueryClientFromCtx(ctx context.Context) ctypes.QueryClient { + val := ctx.Value(ContextKeyQueryClient) + if val == nil { + panic("context does not have query client set") + } + + return val.(ctypes.QueryClient) +} + +func ContextWithOwner(ctx context.Context, address sdk.Address) context.Context { + return context.WithValue(ctx, ContextKeyOwner, address) +} + +func OwnerFromCtx(ctx context.Context) sdk.Address { + val := ctx.Value(ContextKeyOwner) + if val == nil { + return sdk.AccAddress{} + } + + return val.(sdk.Address) +} diff --git a/gateway/grpc/lease.go b/gateway/grpc/lease.go new file mode 100644 index 000000000..833ec7a10 --- /dev/null +++ b/gateway/grpc/lease.go @@ -0,0 +1,301 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + kubeErrors "k8s.io/apimachinery/pkg/api/errors" + + kubeclienterrors "github.com/akash-network/provider/cluster/kube/errors" + ctypes "github.com/akash-network/provider/cluster/types/v1beta3" + "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip" + crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2" + "github.com/akash-network/provider/tools/fromctx" +) + +var ErrNoRunningPods = errors.New("no running pods") + +func (*server) SendManifest(context.Context, *leasev1.SendManifestRequest) (*leasev1.SendManifestResponse, error) { + panic("unimplemented") +} + +func (s *server) ServiceLogs(ctx context.Context, r *leasev1.ServiceLogsRequest) (*leasev1.ServiceLogsResponse, error) { + var ( + id = r.GetLeaseId() + services = strings.Join(r.GetServices(), " ") + lines = int64(0) + ) + + logs, err := s.cc.LeaseLogs(ctx, id, services, true, &lines) + if err != nil { + return nil, fmt.Errorf("lease logs: %w", err) + } + + if len(logs) == 0 { + return nil, ErrNoRunningPods + } + + var resp leasev1.ServiceLogsResponse + for _, l := range logs { + defer l.Stream.Close() + + for l.Scanner.Scan() { + resp.Services = append(resp.Services, + newLeaseV1ServiceLogs(l.Name, l.Scanner.Bytes())) + } + + if err := l.Scanner.Err(); err != nil { + return nil, fmt.Errorf("%s scanner: %w", l.Name, err) + } + } + + return &resp, nil +} + +func (s *server) ServiceStatus(ctx context.Context, r *leasev1.ServiceStatusRequest) (*leasev1.ServiceStatusResponse, error) { + ctx = fromctx.ApplyToContext(ctx, s.clusterSettings) + + id := r.GetLeaseId() + found, m, err := s.cc.GetManifestGroup(ctx, id) + if err != nil { + return nil, fmt.Errorf("get manifest groups: %w", err) + } + + if !found { + return nil, status.Error(codes.NotFound, "lease does not exist") + } + + i := getInfo(m) + + ips := make(map[string][]leasev1.LeaseIPStatus) + if s.ip != nil && i.hasLeasedIPs { + var statuses []ip.LeaseIPStatus + if statuses, err = s.ip.GetIPAddressStatus(ctx, id.OrderID()); err != nil { + return nil, fmt.Errorf("get ip address status: %w", err) + } + + for _, stat := range statuses { + ips[stat.ServiceName] = append(ips[stat.ServiceName], leasev1.LeaseIPStatus{ + Port: stat.Port, + ExternalPort: stat.ExternalPort, + Protocol: stat.Protocol, + Ip: stat.IP, + }) + } + } + + ports := make(map[string][]leasev1.ForwarderPortStatus) + if i.hasForwardedPorts { + var allStatuses map[string][]ctypes.ForwardedPortStatus + if allStatuses, err = s.cc.ForwardedPortStatus(ctx, id); err != nil { + return nil, fmt.Errorf("forward port status: %w", err) + } + + for name, statuses := range allStatuses { + for _, stat := range statuses { + ports[name] = append(ports[name], leasev1.ForwarderPortStatus{ + Name: stat.Name, + Host: stat.Host, + Port: uint32(stat.Port), + ExternalPort: uint32(stat.ExternalPort), + Proto: string(stat.Proto), + }) + } + } + } + + serviceStatus, err := s.cc.LeaseStatus(ctx, id) + switch { + case errors.Is(err, kubeclienterrors.ErrNoDeploymentForLease): + return nil, status.Error(codes.NotFound, "no deployment for lease") + case errors.Is(err, kubeclienterrors.ErrLeaseNotFound): + return nil, status.Error(codes.NotFound, "lease does not exist") + case kubeErrors.IsNotFound(err): + return nil, status.Error(codes.NotFound, "not found") + case err != nil: + return nil, fmt.Errorf("lease status: %w", err) + } + + var resp leasev1.ServiceStatusResponse + for _, svc := range m.Services { + name := svc.Name + ss, ok := serviceStatus[name] + if !ok { + continue + } + + resp.Services = append(resp.Services, leasev1.ServiceStatus{ + Name: name, + Ports: ports[name], + Ips: ips[name], + Status: leasev1.LeaseServiceStatus{ + Available: ss.Available, + Total: ss.Total, + Uris: ss.URIs, + ObservedGeneration: ss.ObservedGeneration, + Replicas: ss.Replicas, + UpdatedReplicas: ss.UpdatedReplicas, + ReadyReplicas: ss.ReadyReplicas, + AvailableReplicas: ss.AvailableReplicas, + }, + }) + } + + return &resp, nil +} + +type manifestGroupInfo struct { + hasLeasedIPs bool + hasForwardedPorts bool +} + +func getInfo(m crd.ManifestGroup) manifestGroupInfo { + var i manifestGroupInfo + + for _, s := range m.Services { + for _, e := range s.Expose { + if len(e.IP) > 0 { + i.hasLeasedIPs = true + } + if e.Global && e.ExternalPort != 80 { + i.hasForwardedPorts = true + } + } + } + + return i +} + +func (s *server) StreamServiceLogs(r *leasev1.ServiceLogsRequest, strm leasev1.LeaseRPC_StreamServiceLogsServer) error { + var ( + ctx = strm.Context() + id = r.GetLeaseId() + services = strings.Join(r.GetServices(), " ") + lines = int64(0) + ) + + logs, err := s.cc.LeaseLogs(ctx, id, services, true, &lines) + if err != nil { + return fmt.Errorf("lease logs: %w", err) + } + + if len(logs) == 0 { + return ErrNoRunningPods + } + + var ( + errs = make(chan error, len(logs)) + stream = &safeLogStream{s: strm} + ) + + for _, ll := range logs { + go func(l *ctypes.ServiceLog) { + defer l.Stream.Close() + defer func() { errs <- nil }() + + for l.Scanner.Scan() { + select { + case <-ctx.Done(): + return + default: + resp := leasev1.ServiceLogsResponse{ + Services: []*leasev1.ServiceLogs{ + newLeaseV1ServiceLogs(l.Name, l.Scanner.Bytes()), + }, + } + + if sendErr := stream.Send(&resp); sendErr != nil { + errs <- fmt.Errorf("stream send: %w", sendErr) + return + } + } + + if scanErr := l.Scanner.Err(); scanErr != nil { + errs <- fmt.Errorf("scanner err: %w", err) + return + } + } + }(ll) + } + + var allErr error + for i := 0; i < len(logs); i++ { + allErr = errors.Join(err, <-errs) + } + if allErr != nil { + return fmt.Errorf("scan all logs: %w", err) + } + + return nil +} + +func newLeaseV1ServiceLogs(name string, buf []byte) *leasev1.ServiceLogs { + logs := make([]byte, len(buf)) + copy(logs, buf) + + return &leasev1.ServiceLogs{ + Name: name, + Logs: buf, + } +} + +type safeLogStream struct { + s leasev1.LeaseRPC_StreamServiceLogsServer + mu sync.Mutex +} + +func (s *safeLogStream) Send(r *leasev1.ServiceLogsResponse) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.s.Send(r); err != nil { + return fmt.Errorf("safe log stream send: %w", err) + } + + return nil +} + +const defaultServiceStatusInterval = 5 * time.Second + +func (s *server) StreamServiceStatus(r *leasev1.ServiceStatusRequest, strm leasev1.LeaseRPC_StreamServiceStatusServer) error { + var ( + ctx = strm.Context() + id = r.GetLeaseId() + ) + + interval := defaultServiceStatusInterval + + if md, ok := metadata.FromIncomingContext(ctx); ok { + i := md.Get("interval")[0] + + var err error + if interval, err = time.ParseDuration(i); err != nil { + return fmt.Errorf("parse duration %s: %w", i, err) + } + } + + t := time.NewTicker(interval) + + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + res, err := s.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{LeaseId: id}) + if err != nil { + return fmt.Errorf("service status: %w", err) + } + + strm.Send(res) + } + } +} diff --git a/gateway/grpc/provider.go b/gateway/grpc/provider.go new file mode 100644 index 000000000..1e6941e20 --- /dev/null +++ b/gateway/grpc/provider.go @@ -0,0 +1,105 @@ +package grpc + +import ( + "fmt" + "runtime/debug" + + providerv1 "github.com/akash-network/akash-api/go/provider/v1" + "golang.org/x/net/context" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/akash-network/provider/tools/fromctx" + ptypes "github.com/akash-network/provider/types" + "github.com/akash-network/provider/version" +) + +func (s *server) GetStatus(ctx context.Context, _ *emptypb.Empty) (*providerv1.Status, error) { + return s.pc.StatusV1(ctx) +} + +func (s *server) StreamStatus(_ *emptypb.Empty, stream providerv1.ProviderRPC_StreamStatusServer) error { + bus, err := fromctx.PubSubFromCtx(s.ctx) + if err != nil { + return err + } + + events := bus.Sub(ptypes.PubSubTopicProviderStatus) + + for { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case <-stream.Context().Done(): + return stream.Context().Err() + case evt := <-events: + val := evt.(providerv1.Status) + if err := stream.Send(&val); err != nil { + return err + } + } + } +} + +func (s *server) GetVersion(ctx context.Context, _ *emptypb.Empty) (*providerv1.GetVersionResponse, error) { + v := version.NewInfo() + + k, err := s.cc.KubeVersion() + if err != nil { + return nil, fmt.Errorf("kube version: %w", err) + } + + bd := make([]*providerv1.BuildDep, 0, len(v.BuildDeps)) + for _, b := range v.BuildDeps { + bd = append(bd, toProviderv1BuildDep(b.Module)) + } + + return &providerv1.GetVersionResponse{ + Akash: &providerv1.AkashInfo{ + Name: v.Name, + AppName: v.AppName, + Version: v.Version, + GitCommit: v.GitCommit, + BuildTags: v.BuildTags, + GoVersion: v.GoVersion, + BuildDeps: bd, + }, + Kube: &providerv1.KubeInfo{ + Major: k.Major, + Minor: k.Minor, + GitVersion: k.GitCommit, + GitTreeState: k.GitTreeState, + BuildDate: k.BuildDate, + GoVersion: k.GoVersion, + Compiler: k.Compiler, + Platform: k.Platform, + }, + }, nil +} + +func toProviderv1BuildDep(m *debug.Module) *providerv1.BuildDep { + if m == nil { + return nil + } + + return &providerv1.BuildDep{ + Path: m.Path, + Version: m.Version, + Sum: m.Sum, + Replace: toProviderv1BuildDep(m.Replace), + } +} + +func (s *server) Validate(ctx context.Context, r *providerv1.ValidateRequest) (*providerv1.ValidateResponse, error) { + v, err := s.pc.Validate(ctx, OwnerFromCtx(ctx), r.GetGroup()) + if err != nil { + return nil, fmt.Errorf("validate: %w", err) + } + + return &providerv1.ValidateResponse{ + MinBidPrice: v.MinBidPrice, + }, nil +} + +func (s *server) WIBOY(ctx context.Context, r *providerv1.ValidateRequest) (*providerv1.ValidateResponse, error) { + return s.Validate(ctx, r) +} diff --git a/gateway/grpc/server.go b/gateway/grpc/server.go index 4dfb8cd08..c3d4c0dd5 100644 --- a/gateway/grpc/server.go +++ b/gateway/grpc/server.go @@ -1,6 +1,7 @@ package grpc import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -8,70 +9,48 @@ import ( "time" atls "github.com/akash-network/akash-api/go/util/tls" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/peer" - "google.golang.org/protobuf/types/known/emptypb" - - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/akash-network/akash-api/go/grpc/gogoreflection" ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3" + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" providerv1 "github.com/akash-network/akash-api/go/provider/v1" "github.com/akash-network/provider" + "github.com/akash-network/provider/cluster" + "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip" "github.com/akash-network/provider/tools/fromctx" - ptypes "github.com/akash-network/provider/types" ) -type ContextKey string - -const ( - ContextKeyQueryClient = ContextKey("query-client") - ContextKeyOwner = ContextKey("owner") +var ( + _ providerv1.ProviderRPCServer = (*server)(nil) + _ leasev1.LeaseRPCServer = (*server)(nil) + _ Server = (*grpcServer)(nil) ) -type grpcProviderV1 struct { - ctx context.Context - client provider.StatusClient +type Server interface { + ServeOn(context.Context, string) error } -var _ providerv1.ProviderRPCServer = (*grpcProviderV1)(nil) - -func QueryClientFromCtx(ctx context.Context) ctypes.QueryClient { - val := ctx.Value(ContextKeyQueryClient) - if val == nil { - panic("context does not have pubsub set") - } - - return val.(ctypes.QueryClient) +type grpcServer struct { + *grpc.Server } -func ContextWithOwner(ctx context.Context, address sdk.Address) context.Context { - return context.WithValue(ctx, ContextKeyOwner, address) -} +type server struct { + ctx context.Context + pc provider.Client + cc cluster.Client -func OwnerFromCtx(ctx context.Context) sdk.Address { - val := ctx.Value(ContextKeyOwner) - if val == nil { - return sdk.AccAddress{} - } + certs []tls.Certificate + ip ip.Client - return val.(sdk.Address) + clusterSettings map[any]any } -func NewServer(ctx context.Context, endpoint string, certs []tls.Certificate, client provider.StatusClient) error { - // InsecureSkipVerify is set to true due to inability to use normal TLS verification - // certificate validation and authentication performed later in mtlsHandler - tlsConfig := &tls.Config{ - Certificates: certs, - ClientAuth: tls.RequestClientCert, - InsecureSkipVerify: true, // nolint: gosec - MinVersion: tls.VersionTLS13, - } - +func (s *grpcServer) ServeOn(ctx context.Context, addr string) error { group, err := fromctx.ErrGroupFromCtx(ctx) if err != nil { return err @@ -79,34 +58,21 @@ func NewServer(ctx context.Context, endpoint string, certs []tls.Certificate, cl log := fromctx.LogcFromCtx(ctx) - grpcSrv := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: false, - }), grpc.ChainUnaryInterceptor(mtlsInterceptor())) - - pRPC := &grpcProviderV1{ - ctx: ctx, - client: client, - } - - providerv1.RegisterProviderRPCServer(grpcSrv, pRPC) - gogoreflection.Register(grpcSrv) - group.Go(func() error { - grpcLis, err := net.Listen("tcp", endpoint) + grpcLis, err := net.Listen("tcp", addr) if err != nil { return err } - log.Info(fmt.Sprintf("grpc listening on \"%s\"", endpoint)) + log.Info(fmt.Sprintf("grpc listening on \"%s\"", addr)) - return grpcSrv.Serve(grpcLis) + return s.Serve(grpcLis) }) group.Go(func() error { <-ctx.Done() - grpcSrv.GracefulStop() + s.GracefulStop() return ctx.Err() }) @@ -114,52 +80,98 @@ func NewServer(ctx context.Context, endpoint string, certs []tls.Certificate, cl return nil } -func mtlsInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - if p, ok := peer.FromContext(ctx); ok { - if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok { - certificates := mtls.State.PeerCertificates +type serverOpts struct { + certs []tls.Certificate + providerClient provider.Client + clusterClient cluster.Client + clusterSettings map[any]any + ipClient ip.Client +} - if len(certificates) > 0 { - cquery := QueryClientFromCtx(ctx) +type opt func(*serverOpts) - owner, _, err := atls.ValidatePeerCertificates(ctx, cquery, certificates, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}) - if err != nil { - return nil, err - } +func WithCerts(c []tls.Certificate) opt { + return func(so *serverOpts) { so.certs = c } +} - ctx = ContextWithOwner(ctx, owner) - } - } - } +func WithProviderClient(c provider.Client) opt { + return func(so *serverOpts) { so.providerClient = c } +} - return handler(ctx, req) - } +func WithClusterClient(c cluster.Client) opt { + return func(so *serverOpts) { so.clusterClient = c } } -func (gm *grpcProviderV1) GetStatus(ctx context.Context, _ *emptypb.Empty) (*providerv1.Status, error) { - return gm.client.StatusV1(ctx) +func WithClusterSettings(s map[any]any) opt { + return func(so *serverOpts) { so.clusterSettings = s } } -func (gm *grpcProviderV1) StreamStatus(_ *emptypb.Empty, stream providerv1.ProviderRPC_StreamStatusServer) error { - bus, err := fromctx.PubSubFromCtx(gm.ctx) - if err != nil { - return err +func WithIPClient(c ip.Client) opt { + return func(so *serverOpts) { so.ipClient = c } +} + +func NewServer(ctx context.Context, opts ...opt) Server { + var o serverOpts + for _, opt := range opts { + opt(&o) + } + + // InsecureSkipVerify is set to true due to inability to use normal TLS verification + // certificate validation and authentication performed later in mtlsHandler + tlsConfig := &tls.Config{ + Certificates: o.certs, + ClientAuth: tls.RequestClientCert, + InsecureSkipVerify: true, // nolint: gosec + MinVersion: tls.VersionTLS13, + } + + cquery := MustQueryClientFromCtx(ctx) + + g := grpc.NewServer( + grpc.Creds( + credentials.NewTLS(tlsConfig), + ), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 30 * time.Second, + PermitWithoutStream: false, + }), + grpc.ChainUnaryInterceptor( + mtlsInterceptor(cquery), + ), + ) + + s := &server{ + ctx: ctx, + pc: o.providerClient, + cc: o.clusterClient, + clusterSettings: o.clusterSettings, + ip: o.ipClient, } - events := bus.Sub(ptypes.PubSubTopicProviderStatus) - - for { - select { - case <-gm.ctx.Done(): - return gm.ctx.Err() - case <-stream.Context().Done(): - return stream.Context().Err() - case evt := <-events: - val := evt.(providerv1.Status) - if err := stream.Send(&val); err != nil { - return err + providerv1.RegisterProviderRPCServer(g, s) + leasev1.RegisterLeaseRPCServer(g, s) + gogoreflection.Register(g) + + return &grpcServer{Server: g} +} + +func mtlsInterceptor(cquery ctypes.QueryClient) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if p, ok := peer.FromContext(ctx); ok { + if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok { + owner, _, err := atls.ValidatePeerCertificates( + ctx, + cquery, + mtls.State.PeerCertificates, + []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}) + if err != nil { + return nil, fmt.Errorf("validate peer certificates: %w", err) + } + + ctx = ContextWithOwner(ctx, owner) } } + + return handler(ctx, req) } } diff --git a/gateway/grpc/server_test.go b/gateway/grpc/server_test.go new file mode 100644 index 000000000..8ea1439a8 --- /dev/null +++ b/gateway/grpc/server_test.go @@ -0,0 +1,802 @@ +package grpc + +import ( + "bufio" + "context" + "crypto/tls" + "errors" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + + types "github.com/akash-network/akash-api/go/node/cert/v1beta3" + qmock "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" + leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1" + v1 "github.com/akash-network/akash-api/go/provider/lease/v1" + providerv1 "github.com/akash-network/akash-api/go/provider/v1" + "github.com/akash-network/node/testutil" + + "github.com/akash-network/provider" + + sdk "github.com/cosmos/cosmos-sdk/types" + kubeversion "k8s.io/apimachinery/pkg/version" + + cmocks "github.com/akash-network/provider/cluster/mocks" + "github.com/akash-network/provider/cluster/types/v1beta3" + ctypes "github.com/akash-network/provider/cluster/types/v1beta3" + "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip" + ipmocks "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip/mocks" + pmocks "github.com/akash-network/provider/mocks" + "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2" +) + +type client struct { + p providerv1.ProviderRPCClient + l leasev1.LeaseRPCClient +} + +func TestRPCs(t *testing.T) { + var ( + qclient = &qmock.QueryClient{} + com = testutil.CertificateOptionMocks(qclient) + cod = testutil.CertificateOptionDomains([]string{"localhost", "127.0.0.1"}) + ) + + var ( + crt1 = testutil.Certificate(t, testutil.AccAddress(t), com, cod) + crt2 = testutil.Certificate(t, testutil.AccAddress(t), com, cod) + ) + + qclient.EXPECT().Certificates(mock.Anything, mock.Anything).Return(&types.QueryCertificatesResponse{ + Certificates: types.CertificatesResponse{ + types.CertificateResponse{ + Certificate: types.Certificate{ + State: types.CertificateValid, + Cert: crt2.PEM.Cert, + Pubkey: crt2.PEM.Pub, + }, + Serial: crt2.Serial.String(), + }, + }, + }, nil) + + cases := []struct { + desc string + providerClient func(*testing.T) *pmocks.Client + clusterClient func(*testing.T) *cmocks.Client + ipClient func(*testing.T) *ipmocks.Client + run func(context.Context, *testing.T, client) + }{ + // GetStatus + { + desc: "GetStatus", + providerClient: func(t *testing.T) *pmocks.Client { + var m pmocks.Client + m.EXPECT().StatusV1(mock.Anything).Return(&providerv1.Status{}, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.p.GetStatus(ctx, &emptypb.Empty{}) + assert.NoError(t, err) + }, + }, + + // GetVersion + { + desc: "GetVersion", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + m.EXPECT().KubeVersion().Return(&kubeversion.Info{ + Major: "1", + Minor: "2", + GitVersion: "3", + GitTreeState: "4", + BuildDate: "5", + GoVersion: "6", + Compiler: "7", + Platform: "8", + }, nil) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + r, err := c.p.GetVersion(ctx, &emptypb.Empty{}) + require.NoError(t, err) + + assert.NotEmpty(t, r.Kube) + assert.NotEmpty(t, r.Akash) + }, + }, + + // Validate + { + desc: "Validate error", + providerClient: func(t *testing.T) *pmocks.Client { + var m pmocks.Client + + m.EXPECT().Validate(mock.Anything, mock.Anything, mock.Anything). + Return(provider.ValidateGroupSpecResult{}, errors.New("boom")) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.p.Validate(ctx, &providerv1.ValidateRequest{}) + assert.ErrorContains(t, err, "boom") + }, + }, + { + desc: "Validate", + providerClient: func(t *testing.T) *pmocks.Client { + var m pmocks.Client + + m.EXPECT().Validate(mock.Anything, mock.Anything, mock.Anything). + Return(provider.ValidateGroupSpecResult{ + MinBidPrice: sdk.DecCoin{ + Denom: t.Name(), + Amount: sdk.NewDec(111), + }, + }, nil) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + res, err := c.p.Validate(ctx, &providerv1.ValidateRequest{}) + require.NoError(t, err) + + assert.Equal(t, t.Name(), res.MinBidPrice.Denom) + assert.Equal(t, sdk.NewDec(111), res.MinBidPrice.Amount) + }, + }, + + // ServiceLogs + { + desc: "ServiceLogs none", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]*v1beta3.ServiceLog{}, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.l.ServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + assert.ErrorContains(t, err, ErrNoRunningPods.Error()) + }, + }, + { + desc: "ServiceLogs LeaseLogs err", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("boom")) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.l.ServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + assert.ErrorContains(t, err, "boom") + }, + }, + { + desc: "ServiceLogs", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + var ( + stream1 = io.NopCloser(strings.NewReader("1_1\n1_2\n1_3")) + stream2 = io.NopCloser(strings.NewReader("2_1\n2_2\n2_3")) + stream3 = io.NopCloser(strings.NewReader("3_1\n3_2\n3_3")) + ) + + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]*v1beta3.ServiceLog{ + { + Name: "one", + Stream: stream1, + Scanner: bufio.NewScanner(stream1), + }, + { + Name: "two", + Stream: stream2, + Scanner: bufio.NewScanner(stream2), + }, + { + Name: "three", + Stream: stream3, + Scanner: bufio.NewScanner(stream3), + }, + }, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + resp, err := c.l.ServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + require.NoError(t, err) + + expected := leasev1.ServiceLogsResponse{ + Services: []*leasev1.ServiceLogs{ + {Name: "one", Logs: []byte("1_1")}, + {Name: "one", Logs: []byte("1_2")}, + {Name: "one", Logs: []byte("1_3")}, + {Name: "two", Logs: []byte("2_1")}, + {Name: "two", Logs: []byte("2_2")}, + {Name: "two", Logs: []byte("2_3")}, + {Name: "three", Logs: []byte("3_1")}, + {Name: "three", Logs: []byte("3_2")}, + {Name: "three", Logs: []byte("3_3")}, + }, + } + + assert.EqualValues(t, &expected, resp) + }, + }, + + // ServiceStatus + { + desc: "ServiceStatus get manifest error", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + m.EXPECT().GetManifestGroup(mock.Anything, mock.Anything). + Return(false, v2beta2.ManifestGroup{}, errors.New("boom")) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.l.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{}) + assert.ErrorContains(t, err, "boom") + }, + }, + { + desc: "ServiceStatus no manifest group", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + m.EXPECT().GetManifestGroup(mock.Anything, mock.Anything). + Return(false, v2beta2.ManifestGroup{}, nil) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + _, err := c.l.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{}) + assert.ErrorContains(t, err, "lease does not exist") + }, + }, + { + desc: "ServiceStatus", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + mgi := newManifestGroup(t) + mgi.Services[0].Expose[0].IP = "1.2.3.4" + mgi.Services[0].Expose[0].Global = true + mgi.Services[0].Expose[0].ExternalPort = 8111 + + m.EXPECT().GetManifestGroup(mock.Anything, mock.Anything). + Return(true, mgi, nil) + + m.EXPECT().ForwardedPortStatus(mock.Anything, mock.Anything). + Return(map[string][]ctypes.ForwardedPortStatus{ + serviceName(t): { + { + Host: "host", + Port: 1111, + ExternalPort: 1112, + Proto: "test", + Name: serviceName(t), + }, + }, + }, nil) + + m.EXPECT().LeaseStatus(mock.Anything, mock.Anything). + Return(map[string]*ctypes.ServiceStatus{ + serviceName(t): { + Name: serviceName(t), + Available: 111, + Total: 222, + URIs: []string{"1", "2", "3"}, + ObservedGeneration: 1, + Replicas: 2, + UpdatedReplicas: 3, + ReadyReplicas: 4, + AvailableReplicas: 4, + }, + }, nil) + + return &m + }, + ipClient: func(t *testing.T) *ipmocks.Client { + var m ipmocks.Client + + m.EXPECT().GetIPAddressStatus(mock.Anything, mock.Anything). + Return([]ip.LeaseIPStatus{ + { + Port: 3333, + ExternalPort: 3334, + ServiceName: serviceName(t), + IP: "4.5.6.7", + Protocol: "test", + }, + }, nil) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + s, err := c.l.ServiceStatus(ctx, &leasev1.ServiceStatusRequest{ + Services: []string{serviceName(t)}, + }) + require.NoError(t, err) + + expected := v1.ServiceStatusResponse{ + Services: []v1.ServiceStatus{ + { + Name: serviceName(t), + Status: v1.LeaseServiceStatus{ + Available: 111, + Total: 222, + Uris: []string{"1", "2", "3"}, + ObservedGeneration: 1, + Replicas: 2, + UpdatedReplicas: 3, + ReadyReplicas: 4, + AvailableReplicas: 4, + }, + Ports: []v1.ForwarderPortStatus{ + { + Host: "host", + Port: 1111, + ExternalPort: 1112, + Proto: "test", + Name: serviceName(t), + }, + }, + Ips: []v1.LeaseIPStatus{ + { + Port: 3333, + ExternalPort: 3334, + Protocol: "test", + Ip: "4.5.6.7", + }, + }, + }, + }, + } + + assert.Equal(t, &expected, s) + }, + }, + + // StreamServiceLogs + { + desc: "StreamServiceLogs none", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]*v1beta3.ServiceLog{}, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + s, err := c.l.StreamServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + require.NoError(t, err) + + _, err = s.Recv() + assert.ErrorContains(t, err, ErrNoRunningPods.Error()) + }, + }, + { + desc: "StreamServiceLogs LeaseLogs err", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("boom")) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + s, err := c.l.StreamServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + require.NoError(t, err) + + _, err = s.Recv() + assert.ErrorContains(t, err, "boom") + }, + }, + { + desc: "StreamServiceLogs one", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + stream := io.NopCloser(strings.NewReader("1\n2\n3")) + + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]*v1beta3.ServiceLog{ + { + Name: "one", + Stream: stream, + Scanner: bufio.NewScanner(stream), + }, + }, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + s, err := c.l.StreamServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + require.NoError(t, err) + + var ( + expected = []string{"1", "2", "3"} + actual = make([]string, 0, len(expected)) + ) + + for { + r, err := s.Recv() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + actual = append(actual, string(r.Services[0].Logs)) + } + + assert.Equal(t, expected, actual) + }, + }, + { + desc: "StreamServiceLogs multiple", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + var ( + stream1 = io.NopCloser(strings.NewReader("1_1\n1_2\n1_3")) + stream2 = io.NopCloser(strings.NewReader("2_1\n2_2\n2_3")) + stream3 = io.NopCloser(strings.NewReader("3_1\n3_2\n3_3")) + ) + + m.EXPECT().LeaseLogs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]*v1beta3.ServiceLog{ + { + Name: "one", + Stream: stream1, + Scanner: bufio.NewScanner(stream1), + }, + { + Name: "two", + Stream: stream2, + Scanner: bufio.NewScanner(stream2), + }, + { + Name: "three", + Stream: stream3, + Scanner: bufio.NewScanner(stream3), + }, + }, nil) + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + s, err := c.l.StreamServiceLogs(ctx, &leasev1.ServiceLogsRequest{}) + require.NoError(t, err) + + var ( + expected = map[string][]string{ + "one": {"1_1", "1_2", "1_3"}, + "two": {"2_1", "2_2", "2_3"}, + "three": {"3_1", "3_2", "3_3"}, + } + actual = make(map[string][]string) + ) + + for { + r, err := s.Recv() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, s := range r.Services { + actual[s.Name] = append(actual[s.Name], string(s.Logs)) + } + } + + assert.Equal(t, expected, actual) + }, + }, + + // StreamServiceStatus + { + desc: "StreamServiceStatus", + clusterClient: func(t *testing.T) *cmocks.Client { + var m cmocks.Client + + mgi := newManifestGroup(t) + + m.EXPECT().GetManifestGroup(mock.Anything, mock.Anything). + Return(true, mgi, nil) + + m.EXPECT().LeaseStatus(mock.Anything, mock.Anything). + Return(nil, nil) + + return &m + }, + run: func(ctx context.Context, t *testing.T, c client) { + interval := 500 * time.Millisecond + + ctx = metadata.AppendToOutgoingContext(ctx, "interval", interval.String()) + s, err := c.l.StreamServiceStatus(ctx, &leasev1.ServiceStatusRequest{}) + require.NoError(t, err) + + var ( + iterations = 3 + after = time.After(interval * time.Duration(iterations)) + hits int + ) + + for { + select { + case <-after: + assert.Equal(t, iterations, hits) + return + default: + _, err = s.Recv() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + hits++ + } + } + }, + }, + } + + for _, c := range cases { + c := c + + t.Run(c.desc, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = ContextWithQueryClient(ctx, qclient) + + var ( + pc *pmocks.Client + cc *cmocks.Client + ip *ipmocks.Client + ) + + if c.providerClient != nil { + pc = c.providerClient(t) + defer pc.AssertExpectations(t) + } + if c.clusterClient != nil { + cc = c.clusterClient(t) + defer cc.AssertExpectations(t) + } + if c.ipClient != nil { + ip = c.ipClient(t) + defer ip.AssertExpectations(t) + } + + s := NewServer(ctx, + WithCerts(crt1.Cert), + WithProviderClient(pc), + WithClusterClient(cc), + WithIPClient(ip), + ).(*grpcServer) + + defer s.Stop() + + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + go func() { + require.NoError(t, s.Serve(l)) + }() + + tlsConfig := tls.Config{ + InsecureSkipVerify: true, + Certificates: crt2.Cert, + } + + conn, err := grpc.DialContext(ctx, l.Addr().String(), + grpc.WithTransportCredentials(credentials.NewTLS(&tlsConfig))) + require.NoError(t, err) + + defer conn.Close() + + c.run(ctx, t, client{ + p: providerv1.NewProviderRPCClient(conn), + l: leasev1.NewLeaseRPCClient(conn), + }) + }) + } +} + +func Test_getInfo(t *testing.T) { + cases := []struct { + desc string + manifest func(t *testing.T) v2beta2.ManifestGroup + mgi manifestGroupInfo + }{ + { + desc: "has leased ips", + manifest: func(t *testing.T) v2beta2.ManifestGroup { + mgi := newManifestGroup(t) + + mgi.Services[0].Expose[0].IP = "1.2.3.4" + + return mgi + }, + mgi: manifestGroupInfo{ + hasLeasedIPs: true, + }, + }, + { + desc: "has forwarded ports", + manifest: func(t *testing.T) v2beta2.ManifestGroup { + mgi := newManifestGroup(t) + + mgi.Services[0].Expose[0].Global = true + mgi.Services[0].Expose[0].ExternalPort = 8111 + + return mgi + }, + mgi: manifestGroupInfo{ + hasForwardedPorts: true, + }, + }, + } + + for _, c := range cases { + c := c + + t.Run(c.desc, func(t *testing.T) { + mgi := getInfo(c.manifest(t)) + assert.Equal(t, c.mgi, mgi) + }) + } +} + +func newManifestGroup(t *testing.T) v2beta2.ManifestGroup { + t.Helper() + + return v2beta2.ManifestGroup{ + Name: serviceName(t), + Services: []v2beta2.ManifestService{{ + Name: serviceName(t), + Image: t.Name() + "_image", + Args: nil, + Env: nil, + Resources: v2beta2.Resources{ + CPU: v2beta2.ResourceCPU{ + Units: 1000, + }, + Memory: v2beta2.ResourceMemory{ + Size: "3333", + }, + Storage: v2beta2.ResourceStorage{ + { + Name: "default", + Size: "4444", + }, + }, + }, + Count: 1, + Expose: []v2beta2.ManifestServiceExpose{{ + Port: 8080, + ExternalPort: 80, + Proto: "TCP", + Service: serviceName(t), + Global: true, + Hosts: []string{"hello.localhost"}, + HTTPOptions: v2beta2.ManifestServiceExposeHTTPOptions{ + MaxBodySize: 1, + ReadTimeout: 2, + SendTimeout: 3, + NextTries: 4, + NextTimeout: 5, + NextCases: nil, + }, + IP: "", + EndpointSequenceNumber: 1, + }}, + Params: nil, + }}, + } +} + +func serviceName(t *testing.T) string { + return t.Name() + "_service" +} + +func TestMTLS(t *testing.T) { + var ( + qclient = &qmock.QueryClient{} + com = testutil.CertificateOptionMocks(qclient) + cod = testutil.CertificateOptionDomains([]string{"localhost", "127.0.0.1"}) + ) + + crt := testutil.Certificate(t, testutil.AccAddress(t), com, cod) + + qclient.EXPECT().Certificates(mock.Anything, mock.Anything).Return(&types.QueryCertificatesResponse{ + Certificates: types.CertificatesResponse{ + types.CertificateResponse{ + Certificate: types.Certificate{ + State: types.CertificateValid, + Cert: crt.PEM.Cert, + Pubkey: crt.PEM.Pub, + }, + Serial: crt.Serial.String(), + }, + }, + }, nil) + + cases := []struct { + desc string + certs func() []tls.Certificate + errContains string + }{ + { + desc: "good cert", + certs: func() []tls.Certificate { + return crt.Cert + }, + }, + { + desc: "empty chain", + certs: func() []tls.Certificate { + return nil + }, + errContains: "too many peer certificates", + }, + } + + for _, c := range cases { + c := c + + t.Run(c.desc, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = ContextWithQueryClient(ctx, qclient) + + var m pmocks.Client + m.EXPECT().StatusV1(mock.Anything).Return(&providerv1.Status{}, nil) + + s := NewServer(ctx, + WithCerts(crt.Cert), + WithProviderClient(&m), + ).(*grpcServer) + + defer s.Stop() + + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + go func() { + require.NoError(t, s.Serve(l)) + }() + + tlsConfig := tls.Config{ + InsecureSkipVerify: true, + Certificates: c.certs(), + } + + conn, err := grpc.DialContext(ctx, l.Addr().String(), + grpc.WithTransportCredentials(credentials.NewTLS(&tlsConfig))) + require.NoError(t, err) + + defer conn.Close() + + _, err = providerv1.NewProviderRPCClient(conn).GetStatus(ctx, &emptypb.Empty{}) + if c.errContains != "" { + assert.ErrorContains(t, err, c.errContains) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/go.mod b/go.mod index 1d548b894..9579bcbb2 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/akash-network/provider go 1.21 require ( - github.com/akash-network/akash-api v0.0.66 - github.com/akash-network/node v0.34.0 + github.com/akash-network/akash-api v0.0.67 + github.com/akash-network/node v0.34.1 github.com/avast/retry-go/v4 v4.5.0 github.com/blang/semver/v4 v4.0.0 github.com/boz/go-lifecycle v0.1.1 @@ -39,7 +39,7 @@ require ( golang.org/x/net v0.24.0 golang.org/x/sync v0.7.0 google.golang.org/grpc v1.63.2 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.26.1 k8s.io/apimachinery v0.26.1 @@ -53,6 +53,8 @@ replace ( // use cosmos fork of keyring github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0 + github.com/akash-network/akash-api => github.com/akash-network/akash-api v0.0.68-0.20240505172348-f082c5cdbb68 + github.com/cosmos/ledger-cosmos-go => github.com/akash-network/ledger-go/cosmos v0.14.4 // Fix upstream GHSA-h395-qcrw-5vmq vulnerability. @@ -73,7 +75,7 @@ require ( cosmossdk.io/depinject v1.0.0-alpha.3 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect - github.com/99designs/keyring v1.2.1 // indirect + github.com/99designs/keyring v1.2.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/BurntSushi/toml v1.2.1 // indirect github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d // indirect @@ -87,6 +89,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect + github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -109,14 +112,14 @@ require ( github.com/cosmos/ibc-go/v4 v4.6.0 // indirect github.com/cosmos/ledger-cosmos-go v0.12.2 // indirect github.com/creachadair/taskgroup v0.3.2 // indirect - github.com/danieljoos/wincred v1.1.2 // indirect + github.com/danieljoos/wincred v1.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/dgraph-io/badger/v2 v2.2007.4 // indirect github.com/dgraph-io/ristretto v0.0.3 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/dvsekhvalnov/jose2go v1.7.0 // indirect github.com/edwingeng/deque/v2 v2.1.1 // indirect github.com/emicklei/go-restful/v3 v3.10.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect @@ -175,7 +178,7 @@ require ( github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect github.com/huandu/xstrings v1.4.0 // indirect github.com/iancoleman/strcase v0.2.0 // indirect - github.com/imdario/mergo v0.3.13 // indirect + github.com/imdario/mergo v0.3.16 // indirect github.com/improbable-eng/grpc-web v0.14.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jaypipes/pcidb v1.0.0 // indirect @@ -258,9 +261,9 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.17.0 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect - google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect diff --git a/go.sum b/go.sum index 0b2f91444..3219e6e27 100644 --- a/go.sum +++ b/go.sum @@ -22,7 +22,7 @@ cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmW cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECHg= cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8= cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0= -cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= +cloud.google.com/go v0.112.2 h1:ZaGT6LiG7dBzi6zNOvVZwacaXlmf3lRqnC4DQzqyRQw= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -30,19 +30,19 @@ cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUM cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= cloud.google.com/go/bigtable v1.2.0/go.mod h1:JcVAOl45lrTmQfLj7T6TxyMzIN/3FGGcFm+2xVAli2o= -cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= -cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= +cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU= +cloud.google.com/go/compute v1.25.1/go.mod h1:oopOIR53ly6viBYxaDhBfJwzUAxf1zE//uf3IB011ls= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= -cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= -cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= +cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM= +cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA= cloud.google.com/go/kms v1.15.8 h1:szIeDCowID8th2i8XE4uRev5PMxQFqW+JjwYxL9h6xs= cloud.google.com/go/kms v1.15.8/go.mod h1:WoUHcDjD9pluCg7pNds131awnH429QGvRM3N/4MyoVs= -cloud.google.com/go/monitoring v1.18.0 h1:NfkDLQDG2UR3WYZVQE8kwSbUIEyIqJUPl+aOQdFH1T4= -cloud.google.com/go/monitoring v1.18.0/go.mod h1:c92vVBCeq/OB4Ioyo+NbN2U7tlg5ZH41PZcdvfc+Lcg= +cloud.google.com/go/monitoring v1.18.2 h1:nIQdZdf1/9M0cEmXSlLB2jMq/k3CRh9p3oUzS06VDG8= +cloud.google.com/go/monitoring v1.18.2/go.mod h1:MuL95M6d9HtXQOaWP9JxhFZJKP+fdTF0Gt5xl4IDsew= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -197,16 +197,16 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/akash-network/akash-api v0.0.66 h1:HGYbjLmnKj7hNIO2V7f6CiHJfZJzeOBCIV45gRo/AbY= -github.com/akash-network/akash-api v0.0.66/go.mod h1:PdOQGTCX3kLBoKHdbPF9pe5+vSLANaMJbgA04UE+OqY= +github.com/akash-network/akash-api v0.0.68-0.20240505172348-f082c5cdbb68 h1:wlTZfjQRVsRKWObgpTiBuUOqxPoQ2bKxVvmMuSDuYqA= +github.com/akash-network/akash-api v0.0.68-0.20240505172348-f082c5cdbb68/go.mod h1:VETP3ilOwLIRP40WPy806ygL/IzgGEuMoxXaCUZpRPI= github.com/akash-network/cometbft v0.34.27-akash h1:V1dApDOr8Ee7BJzYyQ7Z9VBtrAul4+baMeA6C49dje0= github.com/akash-network/cometbft v0.34.27-akash/go.mod h1:BcCbhKv7ieM0KEddnYXvQZR+pZykTKReJJYf7YC7qhw= github.com/akash-network/ledger-go v0.14.3 h1:LCEFkTfgGA2xFMN2CtiKvXKE7dh0QSM77PJHCpSkaAo= github.com/akash-network/ledger-go v0.14.3/go.mod h1:NfsjfFvno9Kaq6mfpsKz4sqjnAVVEsVsnBJfKB4ueAs= github.com/akash-network/ledger-go/cosmos v0.14.4 h1:h3WiXmoKKs9wkj1LHcJ12cLjXXg6nG1fp+UQ5+wu/+o= github.com/akash-network/ledger-go/cosmos v0.14.4/go.mod h1:SjAfheQTE4rWk0ir+wjbOWxwj8nc8E4AZ08NdsvYG24= -github.com/akash-network/node v0.34.0 h1:qBLEJlMDs7hSt1skomdPAtTXIXiAAmezYob8V+gG5Ks= -github.com/akash-network/node v0.34.0/go.mod h1:EnqNTPmvkKK0CHO1SqyF5ozAPJXpgmyFpBGak+KcPDY= +github.com/akash-network/node v0.34.1 h1:5ky3Q1dgXgGkcZA0y0AjEshi3fL7bk76OqCeh5ecMTs= +github.com/akash-network/node v0.34.1/go.mod h1:lPxn9dDCAXXflq9o1bqRH4DsLOiQaXIwCnU0l/nOLUs= github.com/alecthomas/participle/v2 v2.0.0-alpha7 h1:cK4vjj0VSgb3lN1nuKA5F7dw+1s1pWBe5bx7nNCnN+c= github.com/alecthomas/participle/v2 v2.0.0-alpha7/go.mod h1:NumScqsC42o9x+dGj8/YqsIfhrIQjFEOFovxotbBirA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -306,8 +306,9 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY github.com/btcsuite/btcd/btcutil v1.1.2 h1:XLMbX8JQEiwMcYft2EGi8zPUkoa0abKIU6/BJSRsjzQ= github.com/btcsuite/btcd/btcutil v1.1.2/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ= +github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= @@ -454,8 +455,8 @@ github.com/cucumber/common/gherkin/go/v22 v22.0.0/go.mod h1:3mJT10B2GGn3MvVPd3Fw github.com/cucumber/common/messages/go/v17 v17.1.1 h1:RNqopvIFyLWnKv0LfATh34SWBhXeoFTJnSrgm9cT/Ts= github.com/cucumber/common/messages/go/v17 v17.1.1/go.mod h1:bpGxb57tDE385Rb2EohgUadLkAbhoC4IyCFi89u/JQI= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= -github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= -github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= +github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8GQs= +github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -520,8 +521,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= -github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/dvsekhvalnov/jose2go v1.7.0 h1:bnQc8+GMnidJZA8zc6lLEAb4xNrIqHwO+9TzqvtQZPo= +github.com/dvsekhvalnov/jose2go v1.7.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= @@ -1190,8 +1191,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= -github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= +github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/improbable-eng/grpc-web v0.14.1 h1:NrN4PY71A6tAz2sKDvC5JCauENWp0ykG8Oq1H3cpFvw= github.com/improbable-eng/grpc-web v0.14.1/go.mod h1:zEjGHa8DAlkoOXmswrNvhUGEYQA9UI7DhrGeHR1DMGU= @@ -2311,7 +2312,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -2549,12 +2549,12 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= -google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= -google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 h1:rIo7ocm2roD9DcFIX67Ym8icoGCKSARAiPljFhh5suQ= -google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6 h1:MTmrc2F5TZKDKXigcZetYkH04YwqtOPEQJwh4PPOgfk= +google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6/go.mod h1:2ROWwqCIx97Y7CSyp11xB8fori0wzvD6+gbacaf5c8I= +google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 h1:DTJM0R8LECCgFeUwApvcEJHz85HLagW8uRENYxHh1ww= +google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6/go.mod h1:10yRODfgim2/T8csjQsMPgZOMvtytXKTDRzH6HRGzRw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 h1:DujSIu+2tC9Ht0aPNA7jgj23Iq8Ewi5sgkQ++wdvonE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -2606,8 +2606,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.0 h1:Qo/qEd2RZPCf2nKuorzksSknv0d3ERwp1vFG38gSmH4= +google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -2665,7 +2665,6 @@ gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= diff --git a/integration/migrate_hostname_test.go b/integration/migrate_hostname_test.go index b8fb48988..d47a969a0 100644 --- a/integration/migrate_hostname_test.go +++ b/integration/migrate_hostname_test.go @@ -263,7 +263,7 @@ func (s *E2EMigrateHostname) TestE2EMigrateHostname() { fmt.Sprintf("--%s=%s", flags.FlagHome, s.validator.ClientCtx.HomeDir), ) s.Require().Error(err) - s.Require().Contains(err.Error(), "remote server returned 404") + s.Require().Contains(err.Error(), "lease does not exist") s.Require().NotNil(cmdResult) // confirm hostname still reachable, on the hostname it was migrated to