From 415a53d3e0615ce6e208f626bef58bb1db3139c6 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 18 Jan 2022 20:52:59 +0530 Subject: [PATCH 01/13] feat: Add resource server to serve loki logs --- _run/common-commands.mk | 12 ++- _run/common-kind.mk | 31 +++++-- _run/promtail-values.yaml | 7 ++ provider/cmd/root.go | 1 + provider/cmd/run_resource_server.go | 120 ++++++++++++++++++++++++++++ provider/gateway/rest/middleware.go | 42 ++++++++++ provider/gateway/rest/router.go | 60 ++++++++++++++ provider/gateway/rest/server.go | 17 ++++ 8 files changed, 282 insertions(+), 8 deletions(-) create mode 100644 _run/promtail-values.yaml create mode 100644 provider/cmd/run_resource_server.go diff --git a/_run/common-commands.mk b/_run/common-commands.mk index 0e27631b7f..11ff4c7273 100644 --- a/_run/common-commands.mk +++ b/_run/common-commands.mk @@ -15,8 +15,9 @@ PRICE ?= 10uakt CERT_HOSTNAME ?= localhost LEASE_SERVICES ?= web -JWT_AUTH_HOSTNAME ?= localhost -JWT_AUTH_HOST ?= $(JWT_AUTH_HOSTNAME):8444 +JWT_AUTH_HOSTNAME ?= localhost +JWT_AUTH_HOST ?= $(JWT_AUTH_HOSTNAME):8444 +RESOURCE_SERVER_HOST ?= localhost:8445 .PHONY: multisig-send multisig-send: @@ -70,6 +71,13 @@ auth-server: --from "$(PROVIDER_KEY_NAME)" \ --jwt-auth-listen-address "$(JWT_AUTH_HOST)" \ +.PHONY: run-resource-server +run-resource-server: + $(AKASH) provider run-resource-server \ + --from "$(PROVIDER_KEY_NAME)" \ + --resource-server-listen-address "$(RESOURCE_SERVER_HOST)" \ + --loki-gateway-listen-address localhost:3100 \ + .PHONY: send-manifest send-manifest: $(AKASH) provider send-manifest "$(SDL_PATH)" \ diff --git a/_run/common-kind.mk b/_run/common-kind.mk index ca2d5e9df5..dbdcf2d798 100644 --- a/_run/common-kind.mk +++ b/_run/common-kind.mk @@ -76,6 +76,7 @@ kind-cluster-create: $(KIND) --config "$(KIND_CONFIG)" \ --name "$(KIND_NAME)" \ --image "$(KIND_IMG)" + chmod 600 $$HOME/.kube/config kubectl label nodes $(KIND_NAME)-control-plane akash.network/role=ingress kubectl apply -f "$(INGRESS_CONFIG_PATH)" kubectl apply -f "$(INGRESS_CLASS_CONFIG_PATH)" @@ -83,20 +84,38 @@ kind-cluster-create: $(KIND) kubectl apply -f "$(METALLB_IP_CONFIG_PATH)" kubectl apply -f "$(METALLB_SERVICE_PATH)" "$(AKASH_ROOT)/script/setup-kind.sh" + helm repo add grafana https://grafana.github.io/helm-charts + helm repo update + helm upgrade --install loki grafana/loki \ + --version 2.9.1 \ + --create-namespace \ + --namespace loki-stack \ + --set config.table_manager.retention_period=48h,config.table_manager.retention_deletes_enabled=true,persistence.enabled=true,persistence.size=10Gi,persistence.storageClassName=standard,config.auth_enabled=true + helm upgrade --install promtail grafana/promtail \ + --version 3.11.0 \ + --namespace loki-stack \ + -f ../promtail-values.yaml + -# Create a kubernetes cluster with loki & grafana integrated for logging. +# Create a kubernetes cluster with multi-tenant loki, promtail and grafana integrated for logging. # See: https://www.scaleway.com/en/docs/tutorials/manage-k8s-logging-loki/ for more info. .PHONY: kind-cluster-loki-create kind-cluster-loki-create: kind-cluster-create helm repo add grafana https://grafana.github.io/helm-charts helm repo update - helm install loki-stack grafana/loki-stack \ + helm upgrade --install loki grafana/loki \ + --version 2.9.1 \ --create-namespace \ --namespace loki-stack \ - --set promtail.enabled=true,loki.persistence.enabled=true,loki.persistence.size=10Gi - helm install loki-grafana grafana/grafana \ - --set persistence.enabled=true,persistence.type=pvc,persistence.size=10Gi \ - --namespace=loki-stack + --set persistence.enabled=true,persistence.size=10Gi,config.auth_enabled=true + helm upgrade --install promtail grafana/promtail \ + --version 3.11.0 \ + --namespace loki-stack \ + -f ../promtail-values.yaml + helm upgrade --install grafana grafana/grafana \ + --version 6.21.2 \ + --namespace loki-stack \ + --set persistence.enabled=true,persistence.type=pvc,persistence.size=10Gi .PHONY: kind-cluster-calico-create kind-cluster-calico-create: $(KIND) diff --git a/_run/promtail-values.yaml b/_run/promtail-values.yaml new file mode 100644 index 0000000000..2e3ce716c3 --- /dev/null +++ b/_run/promtail-values.yaml @@ -0,0 +1,7 @@ +config: + lokiAddress: http://loki.loki-stack:3100/loki/api/v1/push + snippets: + pipelineStages: + - cri: {} + - tenant: + source: namespace \ No newline at end of file diff --git a/provider/cmd/root.go b/provider/cmd/root.go index 31e5a911a7..7c96443442 100644 --- a/provider/cmd/root.go +++ b/provider/cmd/root.go @@ -36,6 +36,7 @@ func RootCmd() *cobra.Command { cmd.AddCommand(AuthenticateCmd()) cmd.AddCommand(clusterNSCmd()) cmd.AddCommand(migrate()) + cmd.AddCommand(RunResourceServerCmd()) return cmd } diff --git a/provider/cmd/run_resource_server.go b/provider/cmd/run_resource_server.go new file mode 100644 index 0000000000..8ad34892ad --- /dev/null +++ b/provider/cmd/run_resource_server.go @@ -0,0 +1,120 @@ +package cmd + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/x509" + "encoding/pem" + "errors" + "io" + "net/http" + + "golang.org/x/sync/errgroup" + + sdkclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/ovrclk/akash/cmd/common" + gwrest "github.com/ovrclk/akash/provider/gateway/rest" + cutils "github.com/ovrclk/akash/x/cert/utils" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const ( + FlagResourceServerListenAddress = "resource-server-listen-address" + FlagLokiGatewayListenAddress = "loki-gateway-listen-address" +) + +var ( + ErrEcdsaPubkeyExpected = errors.New("expected a ecdsa public key") +) + +func RunResourceServerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "run-resource-server", + Short: "Run the resource server which authenticates tenants based on JWT before" + + " providing access to resources", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return common.RunForeverWithContext(cmd.Context(), func(ctx context.Context) error { + return doRunResourceServer(ctx, cmd, args) + }) + }, + } + flags.AddTxFlagsToCmd(cmd) + + cmd.Flags().String(FlagResourceServerListenAddress, "0.0.0.0:8445", + "`host:port` for the resource server to listen on") + if err := viper.BindPFlag(FlagResourceServerListenAddress, cmd.Flags().Lookup(FlagResourceServerListenAddress)); err != nil { + return nil + } + + cmd.Flags().String(FlagLokiGatewayListenAddress, "localhost:3100", + "`host:port` where the loki instance is exposed on provider's network") + if err := viper.BindPFlag(FlagLokiGatewayListenAddress, cmd.Flags().Lookup(FlagLokiGatewayListenAddress)); err != nil { + return nil + } + + cmd.Flags().String(FlagAuthPem, "", "") + + return cmd +} + +func doRunResourceServer(ctx context.Context, cmd *cobra.Command, _ []string) error { + gwAddr := viper.GetString(FlagResourceServerListenAddress) + lokiGwAddr := viper.GetString(FlagLokiGatewayListenAddress) + + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } + + txFactory := tx.NewFactoryCLI(cctx, cmd.Flags()).WithTxConfig(cctx.TxConfig).WithAccountRetriever(cctx.AccountRetriever) + + var certFromFlag io.Reader + if val := cmd.Flag(FlagAuthPem).Value.String(); val != "" { + certFromFlag = bytes.NewBufferString(val) + } + + cpem, err := cutils.LoadPEMForAccount(cctx, txFactory.Keybase(), cutils.PEMFromReader(certFromFlag)) + if err != nil { + return err + } + + blk, _ := pem.Decode(cpem.Cert) + x509cert, err := x509.ParseCertificate(blk.Bytes) + if err != nil { + return err + } + + pubkey, ok := x509cert.PublicKey.(*ecdsa.PublicKey) + if !ok { + return ErrEcdsaPubkeyExpected + } + + group, ctx := errgroup.WithContext(ctx) + log := openLogger() + + resourceServer, err := gwrest.NewResourceServer(ctx, log, gwAddr, cctx.FromAddress, pubkey, lokiGwAddr) + if err != nil { + return err + } + + group.Go(func() error { + return resourceServer.ListenAndServe() + }) + + group.Go(func() error { + <-ctx.Done() + return resourceServer.Close() + }) + + err = group.Wait() + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil +} diff --git a/provider/gateway/rest/middleware.go b/provider/gateway/rest/middleware.go index 40df76c635..b9f01a1d74 100644 --- a/provider/gateway/rest/middleware.go +++ b/provider/gateway/rest/middleware.go @@ -1,12 +1,17 @@ package rest import ( + "crypto/ecdsa" "net/http" "strconv" "strings" + "github.com/golang-jwt/jwt/v4" + "github.com/tendermint/tendermint/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gorilla/context" + gcontext "github.com/gorilla/context" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -207,3 +212,40 @@ func requestStreamParams() mux.MiddlewareFunc { }) } } + +func resourceServerAuth(log log.Logger, providerAddr sdk.Address, publicKey *ecdsa.PublicKey) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // verify the provided JWT + token, err := jwt.ParseWithClaims(r.Header.Get("Authorization"), &ClientCustomClaims{}, func(token *jwt.Token) (interface{}, error) { + // return the public key to be used for JWT verification + return publicKey, nil + }) + if err != nil { + log.Error("falied to parse JWT", "error", err) + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + // delete the Authorization header as it is no more needed + r.Header.Del("Authorization") + + // store the owner & provider address in request context to be used in later handlers + customClaims, ok := token.Claims.(*ClientCustomClaims) + if !ok { + log.Error("failed to parse JWT claims") + http.Error(w, "Invalid JWT", http.StatusUnauthorized) + return + } + ownerAddress, err := sdk.AccAddressFromBech32(customClaims.Subject) + if err != nil { + log.Error("failed parsing owner address", "error", err, "address", customClaims.Subject) + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + gcontext.Set(r, ownerContextKey, ownerAddress) + gcontext.Set(r, providerContextKey, providerAddr) + + next.ServeHTTP(w, r) + }) + } +} diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 464316fb56..eb97f520bc 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -3,6 +3,7 @@ package rest import ( "bufio" "context" + "crypto/ecdsa" "encoding/json" "fmt" "github.com/ovrclk/akash/provider/cluster/operatorclients" @@ -10,11 +11,17 @@ import ( ipoptypes "github.com/ovrclk/akash/provider/operator/ipoperator/types" "io" "net/http" + "net/http/httputil" + "net/url" "strconv" + "strings" "sync" "time" "github.com/golang-jwt/jwt/v4" + "github.com/ovrclk/akash/provider/cluster/kube/builder" + + "github.com/cosmos/cosmos-sdk/version" "k8s.io/client-go/tools/remotecommand" @@ -185,6 +192,59 @@ func newJwtServerRouter(addr sdk.Address, privateKey interface{}, jwtExpiresAfte return router } +func newResourceServerRouter(log log.Logger, providerAddr sdk.Address, publicKey *ecdsa.PublicKey, lokiGwAddr string) *mux.Router { + router := mux.NewRouter() + + // add a middleware to verify the JWT provided in Authorization header + router.Use(resourceServerAuth(log, providerAddr, publicKey)) + + lrouter := router.PathPrefix(leasePathPrefix).Subrouter() + lrouter.Use(requireLeaseID()) + + lokiServiceRouter := lrouter.PathPrefix("/loki-service").Subrouter() + lokiServiceRouter.NewRoute().Handler(lokiServiceHandler(log, lokiGwAddr)) + + return router +} + +// lokiServiceHandler forwards all requests to the loki instance running in provider's cluster. +// Example: +// Incoming Request: http://localhost:8445/lease/1/1/1/loki-service/loki/api/v1/query?query={app=".+"} +// Outgoing Request: http://{lokiGwAddr}/loki/api/v1/query?query={app=".+"} +func lokiServiceHandler(log log.Logger, lokiGwAddr string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // set the X-Scope-OrgID header for fetching logs for the right tenant + r.Header.Set("X-Scope-OrgID", builder.LidNS(requestLeaseID(r))) + + // build target url for the reverse proxy + scheme := "http" // for http & https + if strings.HasPrefix(r.URL.Scheme, "ws") { + scheme = "ws" // for ws & wss + } + lokiURL, err := url.Parse(fmt.Sprintf("%s://%s", scheme, lokiGwAddr)) + if err != nil { + log.Error(err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + reverseProxy := httputil.NewSingleHostReverseProxy(lokiURL) + + // remove the "/lease/{dseq}/{gseq}/{oseq}/loki-service" path prefix from the request url + // before it is sent to the reverse proxy. + pathSplits := strings.SplitN(r.URL.Path, "/", 7) + if len(pathSplits) < 7 || pathSplits[6] == "" { + log.Error("loki api not provided in url") + http.Error(w, "loki api not provided in url", http.StatusBadRequest) + return + } + r.URL.Path = pathSplits[6] + + // serve the request using the reverse proxy + log.Info("Forwarding request to loki", "HTTP_API", pathSplits[6]) + reverseProxy.ServeHTTP(w, r) + } +} + func jwtServiceHandler(paddr sdk.Address, privateKey interface{}, jwtExpiresAfter time.Duration, certSerialNumber string) http.HandlerFunc { return func(writer http.ResponseWriter, request *http.Request) { now := time.Now() diff --git a/provider/gateway/rest/server.go b/provider/gateway/rest/server.go index 27b4466f21..2fa4199e6e 100644 --- a/provider/gateway/rest/server.go +++ b/provider/gateway/rest/server.go @@ -2,6 +2,7 @@ package rest import ( "context" + "crypto/ecdsa" "crypto/tls" "github.com/ovrclk/akash/provider/cluster/operatorclients" "net" @@ -69,3 +70,19 @@ func NewJwtServer(ctx context.Context, return srv, nil } + +func NewResourceServer(ctx context.Context, + log log.Logger, + serverAddr string, + providerAddr sdk.Address, + pubkey *ecdsa.PublicKey, + lokiGwAddr string, +) (*http.Server, error) { + srv := &http.Server{ + Addr: serverAddr, + Handler: newResourceServerRouter(log, providerAddr, pubkey, lokiGwAddr), + BaseContext: func(_ net.Listener) context.Context { return ctx }, + } + + return srv, nil +} From 1aab09b652918c6d148995f972f9aad4c61a2cd6 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Wed, 16 Feb 2022 14:19:07 -0600 Subject: [PATCH 02/13] add hostname operator config --- _run/kube/Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/_run/kube/Makefile b/_run/kube/Makefile index 54cb7a4258..db1a2a920f 100644 --- a/_run/kube/Makefile +++ b/_run/kube/Makefile @@ -19,7 +19,8 @@ provider-run: --cluster-node-port-quantity 100 \ --cluster-public-hostname "$(GATEWAY_HOSTNAME)" \ --bid-price-strategy "randomRange" \ - --deployment-runtime-class "none" + --deployment-runtime-class "none" \ + --hostname-operator-endpoint "localhost:8085" .PHONY: provider-lease-status provider-lease-status: From 92f6fc8a643e3876cf2389ce08c99ff9f27f3c69 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Thu, 24 Feb 2022 12:54:02 -0600 Subject: [PATCH 03/13] wip --- _run/kube/deployment.yaml | 4 ---- provider/cluster/kube/loki/loki.go | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 provider/cluster/kube/loki/loki.go diff --git a/_run/kube/deployment.yaml b/_run/kube/deployment.yaml index e820432430..f4547df189 100644 --- a/_run/kube/deployment.yaml +++ b/_run/kube/deployment.yaml @@ -15,7 +15,6 @@ services: - hello.localhost to: - global: true - ip: "foobario" bew: image: quay.io/ovrclk/demo-app expose: @@ -54,6 +53,3 @@ deployment: westcoast: profile: web count: 1 -endpoints: - foobario: - kind: ip diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go new file mode 100644 index 0000000000..443723b078 --- /dev/null +++ b/provider/cluster/kube/loki/loki.go @@ -0,0 +1,15 @@ +package loki + +/* +import ( + loki "github.com/grafana/loki/pkg/logcli/client" +) + +type client struct { + lc loki.Client +} + +func (c *client) GetLogsByService(serviceName string) { + +} +*/ \ No newline at end of file From 29c899e3c3281afef1a6974d7b99cccade8a1c76 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Mon, 28 Feb 2022 15:18:06 -0600 Subject: [PATCH 04/13] wip, query loki for stuff --- provider/cluster/kube/loki/loki.go | 227 +++++++++++++++++- .../cluster/util/service_discovery_agent.go | 1 + provider/cmd/leaseLogs.go | 123 ++++++++++ provider/cmd/run.go | 14 ++ provider/gateway/rest/router.go | 27 ++- provider/gateway/rest/server.go | 4 +- 6 files changed, 387 insertions(+), 9 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index 443723b078..f6d53a8510 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -1,15 +1,232 @@ package loki -/* +/** +Some code in this file is from the Loki project v1.6.1. It uses the Apache License version 2.0 which is the +same as this project. + */ + import ( - loki "github.com/grafana/loki/pkg/logcli/client" + "context" + "encoding/json" + "errors" + "fmt" + clusterutil "github.com/ovrclk/akash/provider/cluster/util" + mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" + "github.com/tendermint/tendermint/libs/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "net" + "net/http" + "sort" + "sync" + "time" +) + +const ( + lokiNamespace = "loki-stack" + lokiServiceName = "loki-headless" + lokiPortName = "http-metrics" + + queryPath = "/loki/api/v1/query" + queryRangePath = "/loki/api/v1/query_range" + labelsPath = "/loki/api/v1/labels" + labelValuesPath = "/loki/api/v1/label/%s/values" + seriesPath = "/loki/api/v1/series" + tailPath = "/loki/api/v1/tail" + filenameLabel = "filename" + podLabel = "pod" + lokiOrgIdHeader = "X-Scope-OrgID" +) + +var ( + ErrLoki = errors.New("error querying loki") ) +type Client interface { + FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) + Stop() +} + +func NewClient(logger log.Logger, kubeConfig *rest.Config, port *net.SRV) (Client, error) { + kubeClient, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + return nil, err + } + sda, err := clusterutil.NewServiceDiscoveryAgent(logger, kubeConfig, lokiPortName, lokiServiceName, lokiNamespace, port) + if err != nil { + return nil, err + } + return &client{ + sda:sda, + lock: &sync.Mutex{}, + kc: kubeClient, + }, nil +} + type client struct { - lc loki.Client + sda clusterutil.ServiceDiscoveryAgent + kc kubernetes.Interface + client clusterutil.ServiceClient + lock sync.Locker } -func (c *client) GetLogsByService(serviceName string) { +func (c *client) Stop() { + c.sda.Stop() +} + +func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex uint, startTime, endTime time.Time) { + +} + +func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.client == nil { + var err error + c.client, err = c.sda.GetClient(ctx, false, false) + if err != nil { + return nil, err + } + } + return c.client, nil +} + +type lokiLabelValuesResponse struct { + Status string `json:"status"` + Data []string `json:"data"` +} + +type LogStatus struct { + ServiceName string + ReplicaIndex int + Present bool +} +func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) { + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + lc, err := c.getLokiClient(ctx) + if err != nil { + return nil, err + } + + // get a list of possible logs for this service + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + + // Query Loki for labels of the log filename. The label has a value like + // /var/log/pods/7ev5rjb3nfl9niulmnpaft53g27rck4fhm6v3n4nb5jvu_web-bb84fdfcf-brx2z_f192c48e-ccb3-441e-9778-58de019259d6/web/0.log + // Where the last part of the filename is the associated restart number. The label value only appears if + // the container actually logged something before it terminates. So, we can check the labels and parse out + // that number to determine what containers logged anything before they died + if err != nil { + return nil, err + } + + req, err := lc.CreateRequest(ctx, + http.MethodGet, + fmt.Sprintf(labelValuesPath, podLabel), + nil) + + if err != nil { + return nil, err + } + + // TODO - on the query set end=1646072848813679310&start=1646065648813679310 or similar query args + // Based on the retention set on the provider + req.Header.Add(lokiOrgIdHeader, lidNS) + resp, err := lc.DoRequest(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("%w: fetching loki label values failed, got status code %d", ErrLoki, resp.StatusCode) + } + + decoder := json.NewDecoder(resp.Body) + result := lokiLabelValuesResponse{} + err = decoder.Decode(&result) + if err != nil { + return nil, fmt.Errorf("decoding loki label values failed: %w", err) + } + + // Assign pods a replica index by their name, so it is consistent. This is done elsewhere as well + podNames := make([]string, 0, len(possiblePods)) + for podName := range possiblePods { + podNames = append(podNames, podName) + } + sort.Strings(podNames) + podNameToReplicaIndex := make(map[string]int) + for i, podName := range podNames { + podNameToReplicaIndex[podName] = i + } + + returnValue := make([]LogStatus, len(podNameToReplicaIndex)) + // By default nothing is found + for possiblePodName, entry := range possiblePods { + replicaIndex := podNameToReplicaIndex[possiblePodName] + returnValue[replicaIndex] = LogStatus{ + ServiceName: entry.serviceName, + ReplicaIndex: replicaIndex, + } + } + + // Mark each pod that is found by name + for _, podName := range result.Data { + i, exists := podNameToReplicaIndex[podName] + if !exists { + continue + } + returnValue[i].Present = true + } + + // TODO - query by pod name and then figure out how many restarts actually logged something? + + return returnValue, nil +} + +type runEntry struct { + restarts uint + serviceName string +} +func (c *client) detectRunsForLease(ctx context.Context, leaseID mtypes.LeaseID) (map[string]runEntry, error) { + // Containers can run more than once (i.e. a pod restarts containers when configured to do so) + // so this code picks up on that by looking into the labels + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + // TODO - paginate + podsResult, err := c.kc.CoreV1().Pods(lidNS).List(ctx, metav1.ListOptions{ + // TODO - use a constant here + LabelSelector: "akash.network=true", + }) + + if err != nil { + return nil, err + } + + if len(podsResult.Items) == 0 { + return nil, fmt.Errorf("%w: lease %q has no pods in kubernetes", ErrLoki, leaseID.String()) + } + + // Build up a mapping of pod names to the expected number of log files + result := make(map[string]runEntry) + for _, pod := range podsResult.Items { + // We only define pods with a single container at this time + if len(pod.Status.ContainerStatuses) != 1 { + return nil, fmt.Errorf("%w: pod %q has %d containers, expected 1", ErrLoki, pod.Name, len(pod.Status.ContainerStatuses)) + } + status := pod.Status.ContainerStatuses[0] + + // TODO - use a constant + serviceName, ok := pod.ObjectMeta.Labels["akash.network/manifest-service"] + if !ok { + return nil, fmt.Errorf("%w: pod %q has no akash service name label", ErrLoki, pod.Name) + } + + result[pod.Name] = runEntry{ + restarts: uint(status.RestartCount), + serviceName: serviceName, + } + } + return result, nil } -*/ \ No newline at end of file diff --git a/provider/cluster/util/service_discovery_agent.go b/provider/cluster/util/service_discovery_agent.go index dbe00f67f1..78249122e6 100644 --- a/provider/cluster/util/service_discovery_agent.go +++ b/provider/cluster/util/service_discovery_agent.go @@ -29,6 +29,7 @@ func NewServiceDiscoveryAgent(logger log.Logger, kubeConfig *rest.Config, portNa return staticServiceDiscoveryAgent(*endpoint), nil } + // TODO - only assign this if that discovery mode is selected kc, err := kubernetes.NewForConfig(kubeConfig) if err != nil { return nil, err diff --git a/provider/cmd/leaseLogs.go b/provider/cmd/leaseLogs.go index e38e1807df..34e53f4a91 100644 --- a/provider/cmd/leaseLogs.go +++ b/provider/cmd/leaseLogs.go @@ -37,7 +37,130 @@ func leaseLogsCmd() *cobra.Command { return cmd } +func leaseLogStatusCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "lease-log-status", + Short: "get lease log status", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return doLeaseLogsStatus(cmd) + }, + } + + addServiceFlags(cmd) + + return cmd +} + +func doLeaseLogsStatus(cmd *cobra.Command) error { + /** + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } + + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if err != nil { + return markRPCServerError(err) + } + + dseq, err := dseqFromFlags(cmd.Flags()) + if err != nil { + return err + } + + leases, err := leasesForDeployment(cmd.Context(), cctx, cmd.Flags(), dtypes.DeploymentID{ + Owner: cctx.GetFromAddress().String(), + DSeq: dseq, + }) + if err != nil { + return markRPCServerError(err) + } + + svcs, err := cmd.Flags().GetString(FlagService) + if err != nil { + return err + } + + type result struct { + lid mtypes.LeaseID + error error + stream *gwrest.ServiceLogs + } + + streams := make([]result, 0, len(leases)) + + ctx := cmd.Context() + + for _, lid := range leases { + stream := result{lid: lid} + prov, _ := sdk.AccAddressFromBech32(lid.Provider) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err == nil { + stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, svcs, follow, tailLines) + } else { + stream.error = err + } + + streams = append(streams, stream) + } + + var wgStreams sync.WaitGroup + + type logEntry struct { + gwrest.ServiceLogMessage `json:",inline"` + Lid mtypes.LeaseID `json:"lease_id"` + } + + outch := make(chan logEntry) + + printFn := func(evt logEntry) { + fmt.Printf("[%s][%s] %s\n", evt.Lid, evt.Name, evt.Message) + } + + if outputFormat == "json" { + printFn = func(evt logEntry) { + _ = cmdcommon.PrintJSON(cctx, evt) + } + } + + go func() { + for evt := range outch { + printFn(evt) + } + }() + + for _, stream := range streams { + if stream.error != nil { + continue + } + + wgStreams.Add(1) + go func(stream result) { + defer wgStreams.Done() + + for res := range stream.stream.Stream { + outch <- logEntry{ + ServiceLogMessage: res, + Lid: stream.lid, + } + } + }(stream) + } + + wgStreams.Wait() + close(outch) + + return nil + + */ + return nil +} + func doLeaseLogs(cmd *cobra.Command) error { + + // TODO - clean up most of this into some sort of simple "withEachLease" function that calls a + // function for each lease found cctx, err := sdkclient.GetClientTxContext(cmd) if err != nil { return err diff --git a/provider/cmd/run.go b/provider/cmd/run.go index ed6e7165ba..ecb00d2253 100644 --- a/provider/cmd/run.go +++ b/provider/cmd/run.go @@ -8,6 +8,7 @@ import ( "encoding/pem" "fmt" "github.com/ovrclk/akash/provider/cluster/kube/clientcommon" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "github.com/ovrclk/akash/provider/operator/waiter" "io" @@ -455,6 +456,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge) rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout) enableIPOperator := viper.GetBool(FlagEnableIPOperator) + enableLogging := true // TODO - configurable via flag pricing, err := createBidPricingStrategy(strategy) if err != nil { @@ -656,6 +658,14 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { } } + var lokiClient loki.Client + if enableLogging { + lokiClient, err = loki.NewClient(logger, kubeConfig, nil) // TODO - pass hostname configuration from flags + if err != nil { + return err + } + } + endpoint, err := provider_flags.GetServiceEndpointFlagValue(logger, serviceHostnameOperator) if err != nil { return err @@ -685,6 +695,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { service, cquery, ipOperatorClient, + lokiClient, gwaddr, cctx.FromAddress, []tls.Certificate{cert}, @@ -733,6 +744,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { if ipOperatorClient != nil { ipOperatorClient.Stop() } + if lokiClient != nil { + lokiClient.Stop() + } hostnameOperatorClient.Stop() if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { return err diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index eb97f520bc..ef2bb1d18a 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "encoding/json" "fmt" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "github.com/ovrclk/akash/provider/gateway/utils" ipoptypes "github.com/ovrclk/akash/provider/operator/ipoperator/types" @@ -21,8 +22,6 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/ovrclk/akash/provider/cluster/kube/builder" - "github.com/cosmos/cosmos-sdk/version" - "k8s.io/client-go/tools/remotecommand" manifest "github.com/ovrclk/akash/manifest/v2beta1" @@ -82,7 +81,7 @@ type wsStreamConfig struct { client cluster.ReadClient } -func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopclient operatorclients.IPOperatorClient, ctxConfig map[interface{}]interface{}) *mux.Router { +func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopclient operatorclients.IPOperatorClient, lokiClient loki.Client, ctxConfig map[interface{}]interface{}) *mux.Router { router := mux.NewRouter() @@ -165,6 +164,9 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl leaseLogsHandler(log, pclient.Cluster())). Methods("GET") + logRouter.HandleFunc("/status", + logStatusHandler(log, lokiClient)).Methods(http.MethodGet) + srouter := lrouter.PathPrefix("/service/{serviceName}").Subrouter() srouter.Use( requireService(), @@ -182,6 +184,25 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl return router } +func logStatusHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ + return func(rw http.ResponseWriter, req *http.Request) { + leaseID := requestLeaseID(req) + result, err := lokiClient.FindLogsByLease(req.Context(), leaseID) + if err != nil { + logger.Error("could not get logs for lease", "lease", leaseID, "err", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusOK) + enc := json.NewEncoder(rw) + err = enc.Encode(result) + if err != nil { + logger.Error("could not write lease log status", "lease", leaseID, "err", err) + } + } +} + func newJwtServerRouter(addr sdk.Address, privateKey interface{}, jwtExpiresAfter time.Duration, certSerialNumber string) *mux.Router { router := mux.NewRouter() diff --git a/provider/gateway/rest/server.go b/provider/gateway/rest/server.go index 2fa4199e6e..7da80e174c 100644 --- a/provider/gateway/rest/server.go +++ b/provider/gateway/rest/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "crypto/tls" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "net" "net/http" @@ -23,6 +24,7 @@ func NewServer( pclient provider.Client, cquery ctypes.QueryClient, ipopclient operatorclients.IPOperatorClient, + lokiClient loki.Client, address string, pid sdk.Address, certs []tls.Certificate, @@ -30,7 +32,7 @@ func NewServer( srv := &http.Server{ Addr: address, - Handler: newRouter(log, pid, pclient, ipopclient, clusterConfig), + Handler: newRouter(log, pid, pclient, ipopclient, lokiClient, clusterConfig), BaseContext: func(_ net.Listener) context.Context { return ctx }, From 20d80698aa284c515bb498af7c97b33ba38d67cf Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Wed, 2 Mar 2022 16:24:55 -0600 Subject: [PATCH 05/13] wip, query loki for logs --- provider/cluster/kube/loki/loki.go | 243 ++++++++++++++++++++++++++--- provider/gateway/rest/router.go | 39 +++++ 2 files changed, 262 insertions(+), 20 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index f6d53a8510..526ea99d6c 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -6,6 +6,7 @@ same as this project. */ import ( + "bytes" "context" "encoding/json" "errors" @@ -13,12 +14,17 @@ import ( clusterutil "github.com/ovrclk/akash/provider/cluster/util" mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" "github.com/tendermint/tendermint/libs/log" + "io" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "net" "net/http" + "net/url" + "path" "sort" + "strconv" + "strings" "sync" "time" ) @@ -39,12 +45,20 @@ const ( lokiOrgIdHeader = "X-Scope-OrgID" ) +type lokiDirection string + +const ( + FORWARD lokiDirection = "FORWARD" + BACKWARD lokiDirection = "BACKWARD" +) + var ( ErrLoki = errors.New("error querying loki") ) type Client interface { FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) + GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) Stop() } @@ -75,14 +89,153 @@ func (c *client) Stop() { c.sda.Stop() } -func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex uint, startTime, endTime time.Time) { +type lokiLogQueryResult struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +type lokiLogQueryData struct { + ResultType string `json:"resultType"` + Result []lokiLogQueryResult `json:"result"` +} + +type lokiLogQueryResponse struct { + Status string `json:"status"` + Data lokiLogQueryData `json:"data"` +} +type LogResultLine struct { + RunIndex uint + At time.Time + Line string +} + +type LogResult struct { + ServiceName string + ReplicaIndex uint + Entries []LogResultLine + +} + +func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) { + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + // get a list of possible logs for this service + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + if err != nil { + return LogResult{}, err + } + + datamap := serviceNameAndReplicaIndexToPodName(possiblePods) + + podName, exists := datamap[serviceNameAndReplicaIndex{ + replicaIndex: int(replicaIndex), + serviceName: serviceName, + }] + + if !exists { + return LogResult{},fmt.Errorf("%w: no entry for service %q and replica %d", ErrLoki, serviceName, replicaIndex) + } + + // Query using the pod name to get the result + lc, err := c.getLokiClient(ctx) + if err != nil { + return LogResult{},err + } + + httpQueryString := url.Values{} + httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) + httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? + // TODO - guard against injection here even though it is unlikely + lokiQuery := fmt.Sprintf("{pod=%q}", podName) // Note this is not JSON + // TODO - query by run index as well ? do I even need this + httpQueryString.Set("query", lokiQuery) + + direction := FORWARD + if !forward { + direction = BACKWARD + } + httpQueryString.Set("direction", string(direction)) + + request, err := lc.CreateRequest(ctx, http.MethodGet, queryRangePath, nil ) + if err != nil { + return LogResult{},err + } + request.URL.RawQuery = httpQueryString.Encode() + + request.Header.Add(lokiOrgIdHeader, lidNS) + + resp, err := lc.DoRequest(request) + if err != nil { + return LogResult{},fmt.Errorf("loki log request failed: %w", err) + } + + if resp.StatusCode != 200 { + buf := &bytes.Buffer{} + _, _ = io.Copy(buf, resp.Body) + msg := strings.Trim(buf.String(), "\n\t\r") + return LogResult{},fmt.Errorf("%w: fetching logs from loki failed, got status code %d; %s", ErrLoki, resp.StatusCode, msg) + } + + // TODO - parse response + + decoder := json.NewDecoder(resp.Body) + lokiResult := lokiLogQueryResponse{} + err = decoder.Decode(&lokiResult) + if err != nil { + return LogResult{},fmt.Errorf("loki log request could not be decoded: %w", err) + } + + result := LogResult{ + ServiceName: serviceName, + ReplicaIndex: replicaIndex, + Entries: nil, + } + for _, resultSet := range lokiResult.Data.Result { + filepath, exists := resultSet.Stream[filenameLabel] + if !exists { + return LogResult{},fmt.Errorf("%w: expected loki log result set to have label %q but it does not", ErrLoki, filenameLabel) + } + + _, filename := path.Split(filepath) + filenameParts := strings.SplitN(filename, ".", 2) + + runIndex, err := strconv.ParseUint(filenameParts[0], 0, 31) + if err != nil { + return LogResult{},fmt.Errorf("expected to parse filename %q as integer for kubernetes run index: %w", filename, err) + + } + for _, logEntry := range resultSet.Values { + if len(logEntry) != 2 { + return LogResult{},fmt.Errorf("%w: expected log entry to have 2 values, not %d", ErrLoki, len(logEntry)) + } + timeStampStr := logEntry[0] + + timestamp, err := strconv.ParseInt(timeStampStr,0, 64) + if err != nil { + return LogResult{},fmt.Errorf("could not parse log entry timestamp %q: %w", timeStampStr, err) + } + + at := time.Unix(0, timestamp) + logLine := logEntry[1] + + result.Entries = append(result.Entries, LogResultLine{ + RunIndex: uint(runIndex), + At: at, + Line: logLine, + }) + + } + } + + return result, nil } func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, error) { c.lock.Lock() defer c.lock.Unlock() + // TODO - reset this client on error from Loki if c.client == nil { var err error c.client, err = c.sda.GetClient(ctx, false, false) @@ -98,6 +251,7 @@ type lokiLabelValuesResponse struct { Data []string `json:"data"` } +// TODO - should this be an interface ? type LogStatus struct { ServiceName string ReplicaIndex int @@ -136,7 +290,7 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ req.Header.Add(lokiOrgIdHeader, lidNS) resp, err := lc.DoRequest(req) if err != nil { - return nil, err + return nil, fmt.Errorf("loki label values request failed: %w", err) } if resp.StatusCode != 200 { @@ -150,33 +304,30 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ return nil, fmt.Errorf("decoding loki label values failed: %w", err) } - // Assign pods a replica index by their name, so it is consistent. This is done elsewhere as well - podNames := make([]string, 0, len(possiblePods)) - for podName := range possiblePods { - podNames = append(podNames, podName) - } - sort.Strings(podNames) - podNameToReplicaIndex := make(map[string]int) - for i, podName := range podNames { - podNameToReplicaIndex[podName] = i - } + podNameToPodData := podNamesToServiceNameAndReplicaIndex(possiblePods) + returnValue := make([]LogStatus, 0, len(podNameToPodData)) + positionData := make(map[serviceNameAndReplicaIndex]int) - returnValue := make([]LogStatus, len(podNameToReplicaIndex)) // By default nothing is found for possiblePodName, entry := range possiblePods { - replicaIndex := podNameToReplicaIndex[possiblePodName] - returnValue[replicaIndex] = LogStatus{ + podData := podNameToPodData[possiblePodName] + positionData[serviceNameAndReplicaIndex{ + replicaIndex: podData.replicaIndex, + serviceName: podData.serviceName, + }] = len(returnValue) + returnValue = append(returnValue, LogStatus{ ServiceName: entry.serviceName, - ReplicaIndex: replicaIndex, - } + ReplicaIndex: podData.replicaIndex, + }) } - // Mark each pod that is found by name + // Mark each pod that is found with logs for _, podName := range result.Data { - i, exists := podNameToReplicaIndex[podName] - if !exists { + podData, exists := podNameToPodData[podName] + if ! exists { continue } + i := positionData[podData] returnValue[i].Present = true } @@ -185,10 +336,62 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ return returnValue, nil } +type serviceNameAndReplicaIndex struct{ + replicaIndex int + serviceName string +} + +func serviceNameAndReplicaIndexToPodName(input map[string]runEntry) map[serviceNameAndReplicaIndex]string { + result := make(map[serviceNameAndReplicaIndex]string) + + withPartitionedPods(input, func(podName string, serviceName string, replicaIndex int){ + result[serviceNameAndReplicaIndex{ + replicaIndex: replicaIndex, + serviceName: serviceName, + }] = podName + }) + + return result +} + +func withPartitionedPods(input map[string]runEntry, fn func(podName string, serviceName string, replicaIndex int)) { + + // Assign pods a replica index by their name, so it is consistent. This is done elsewhere as well + partitionedPods := make(map[string][]string) + for podName, entry := range input { + listForService := partitionedPods[entry.serviceName] + + listForService = append(listForService, podName) + partitionedPods[entry.serviceName] = listForService + } + + for serviceName, podNames := range partitionedPods { + sort.Strings(podNames) + + for i, podName := range podNames { + fn(podName, serviceName, i) + } + } +} + +func podNamesToServiceNameAndReplicaIndex(input map[string]runEntry) map[string]serviceNameAndReplicaIndex { + podNameToReplicaIndex := make(map[string]serviceNameAndReplicaIndex) + + withPartitionedPods(input, func(podName string, serviceName string, replicaIndex int){ + podNameToReplicaIndex[podName] = serviceNameAndReplicaIndex{ + replicaIndex: replicaIndex, + serviceName: serviceName, + } + }) + + return podNameToReplicaIndex +} + type runEntry struct { restarts uint serviceName string } + func (c *client) detectRunsForLease(ctx context.Context, leaseID mtypes.LeaseID) (map[string]runEntry, error) { // Containers can run more than once (i.e. a pod restarts containers when configured to do so) // so this code picks up on that by looking into the labels diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index ef2bb1d18a..efb9c25744 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -167,6 +167,9 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl logRouter.HandleFunc("/status", logStatusHandler(log, lokiClient)).Methods(http.MethodGet) + logRouter.HandleFunc("/query/{serviceName}/{replicaIndex}", + logQueryHandler(log, lokiClient)).Methods(http.MethodGet) + srouter := lrouter.PathPrefix("/service/{serviceName}").Subrouter() srouter.Use( requireService(), @@ -184,9 +187,45 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl return router } +func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ + return func(rw http.ResponseWriter, req *http.Request) { + leaseID := requestLeaseID(req) + // TODO - make sure the lease actually exists first + + serviceName := mux.Vars(req)["serviceName"] // TODO validate this exists + replicaIndexStr := mux.Vars(req)["replicaIndex"] + + replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) + if err != nil { + logger.Error("could not parse path compeonent for replica index", "err", err, "replicaIndex", replicaIndexStr) + rw.WriteHeader(http.StatusNotFound) + return + } + + logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), + -1, + time.Now().Add(time.Hour * 72 * -1), // TODO - configurable + time.Now(), false) + + if err != nil { + logger.Error("could not get logs for lease", "lease", leaseID, "err", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusOK) + enc := json.NewEncoder(rw) + err = enc.Encode(logs) + if err != nil { + logger.Error("could not write lease log status", "lease", leaseID, "err", err) + } + } +} + func logStatusHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ return func(rw http.ResponseWriter, req *http.Request) { leaseID := requestLeaseID(req) + // TODO - make sure the lease actually exists first result, err := lokiClient.FindLogsByLease(req.Context(), leaseID) if err != nil { logger.Error("could not get logs for lease", "lease", leaseID, "err", err) From a57b7e1420a61554f988d7723982f34dbcd883d0 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Thu, 3 Mar 2022 10:01:18 -0600 Subject: [PATCH 06/13] add TODOs --- provider/cluster/kube/loki/loki.go | 5 +++++ provider/gateway/rest/router.go | 11 +++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index 526ea99d6c..fb1ffe7844 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -142,11 +142,16 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se return LogResult{},err } + // TODO - if runIndex >= 0 then launch a query to get a single log line back from the backend + // then use that log line to determine the correct label to query on. Will need to parse it out and then + // modify it to be the expected value + httpQueryString := url.Values{} httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? // TODO - guard against injection here even though it is unlikely + // TODO - specify filename label here if runIndex >= 0 lokiQuery := fmt.Sprintf("{pod=%q}", podName) // Note this is not JSON // TODO - query by run index as well ? do I even need this httpQueryString.Set("query", lokiQuery) diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index efb9c25744..0fe9f64de7 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -197,14 +197,21 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) if err != nil { - logger.Error("could not parse path compeonent for replica index", "err", err, "replicaIndex", replicaIndexStr) + logger.Error("could not parse path component for replica index", "err", err, "replicaIndex", replicaIndexStr) rw.WriteHeader(http.StatusNotFound) return } + // TODO - validate replica index + + // TODO - get the following from the query, with defaults + // runIndex + // start time + // end time + logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), -1, - time.Now().Add(time.Hour * 72 * -1), // TODO - configurable + time.Now().Add(time.Hour * 72 * -1), // TODO - configurable. Loki apparently has a default time limit of 720hrs here time.Now(), false) if err != nil { From f9fb9ad0aa1bdec696b05dcca2decfca2cebaabf Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Thu, 3 Mar 2022 16:03:19 -0600 Subject: [PATCH 07/13] work on mroe features --- provider/cluster/kube/loki/loki.go | 91 ++++++++++++++++++++++++++++-- provider/gateway/rest/router.go | 75 ++++++++++++++++++++++-- 2 files changed, 154 insertions(+), 12 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index fb1ffe7844..f10bdaa430 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -117,6 +117,61 @@ type LogResult struct { } +func (c *client) discoverFilename(ctx context.Context, leaseID mtypes.LeaseID, startTime, endTime time.Time, podName string) (string,error) { + // Query using the pod name to get the result + lc, err := c.getLokiClient(ctx) + if err != nil { + return "", err + } + + httpQueryString := url.Values{} + httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) + httpQueryString.Set("limit", "1") + // TODO - guard against injection + lokiQuery := fmt.Sprintf("{%s=%q}", podLabel, podName) // Note this is not JSON + httpQueryString.Set("query", lokiQuery) + httpQueryString.Set("direction", string(BACKWARD)) + + request, err := lc.CreateRequest(ctx, http.MethodGet, queryRangePath, nil ) + if err != nil { + return "", err + } + request.URL.RawQuery = httpQueryString.Encode() + + request.Header.Add(lokiOrgIdHeader, clusterutil.LeaseIDToNamespace(leaseID)) + + resp, err := lc.DoRequest(request) + if err != nil { + return "" , fmt.Errorf("loki filename discovery log query for pod %q failed: %w", podName, err) + } + + if resp.StatusCode != 200 { + buf := &bytes.Buffer{} + _, _ = io.Copy(buf, resp.Body) + msg := strings.Trim(buf.String(), "\n\t\r") + return "", fmt.Errorf("%w: loki filename discovery log query failed for pod %q, got status code %d; %s", ErrLoki, podName, resp.StatusCode, msg) + } + + decoder := json.NewDecoder(resp.Body) + lokiResult := lokiLogQueryResponse{} + err = decoder.Decode(&lokiResult) + if err != nil { + return "",fmt.Errorf("loki log query response for pod %q could not be decoded: %w", podName, err) + } + + if len(lokiResult.Data.Result) == 0 { + return "", fmt.Errorf("%w: loki filename discovery for pod %q returned no results", ErrLoki, podName) + } + + filename, exists:= lokiResult.Data.Result[0].Stream[filenameLabel] + if !exists { + return "", fmt.Errorf("%w: loki filename discovery for pod %q had no label %q", ErrLoki, podName, filenameLabel) + } + + return filename, nil +} + func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) { lidNS := clusterutil.LeaseIDToNamespace(leaseID) // get a list of possible logs for this service @@ -142,19 +197,43 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se return LogResult{},err } - // TODO - if runIndex >= 0 then launch a query to get a single log line back from the backend + // if runIndex >= 0 then launch a query to get a single log line back from the backend // then use that log line to determine the correct label to query on. Will need to parse it out and then // modify it to be the expected value + specifyRunIndex := runIndex >= 0 + filenameLabelFilter := "" + if specifyRunIndex { + filename, err := c.discoverFilename(ctx, leaseID, startTime, endTime, podName) + if err != nil { + return LogResult{}, err + } + + head, tail := path.Split(filename) + parts := strings.SplitN(tail, ".", 2) + if len(parts) != 2 { + return LogResult{}, fmt.Errorf("%w: while constructing fielname filter cannot make sense of filepath %q", ErrLoki, filename) + } + + filenameLabelFilter = fmt.Sprintf("%s/%d.%s", head, runIndex, parts[1]) + } httpQueryString := url.Values{} httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? + + lokiQueryBuf := &bytes.Buffer{} + // Note this is not JSON // TODO - guard against injection here even though it is unlikely - // TODO - specify filename label here if runIndex >= 0 - lokiQuery := fmt.Sprintf("{pod=%q}", podName) // Note this is not JSON - // TODO - query by run index as well ? do I even need this - httpQueryString.Set("query", lokiQuery) + _, _ = fmt.Fprint(lokiQueryBuf, "{") + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%q", podLabel, podName) + // specify filename label here if runIndex >= 0 + if specifyRunIndex { + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%q", filenameLabel, filenameLabelFilter) + } + _, _ = fmt.Fprint(lokiQueryBuf, "}") + + httpQueryString.Set("query", lokiQueryBuf.String()) direction := FORWARD if !forward { @@ -188,7 +267,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se lokiResult := lokiLogQueryResponse{} err = decoder.Decode(&lokiResult) if err != nil { - return LogResult{},fmt.Errorf("loki log request could not be decoded: %w", err) + return LogResult{},fmt.Errorf("loki log query response could not be decoded: %w", err) } result := LogResult{ diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 0fe9f64de7..e2f84ed7fc 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -205,15 +205,78 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc // TODO - validate replica index // TODO - get the following from the query, with defaults - // runIndex + query := req.URL.Query() + + // Parse runIndex from query, if present + runIndex := -1 + runIndexStr := query.Get("runIndex") + if len(runIndexStr) != 0 { + runIndex64, err := strconv.ParseUint(runIndexStr, 10, 31) + if err != nil { + logger.Error("could not parse run index", "runIndex", runIndexStr, "err", err) + rw.WriteHeader(http.StatusBadRequest) + return + } + runIndex = int(runIndex64) + } + + // Parse start time from query, if present + startTime := time.Now().Add(-1 * time.Hour) // start time - // end time + startTimeStr := query.Get("startTime") + if len(startTimeStr) != 0 { + startTimeInt, err := strconv.ParseInt(startTimeStr, 10, 64) + if err != nil { + logger.Error("could not parse startTime", "startTime", startTimeStr, "err", err) + rw.WriteHeader(http.StatusBadRequest) + return + } - logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), - -1, - time.Now().Add(time.Hour * 72 * -1), // TODO - configurable. Loki apparently has a default time limit of 720hrs here - time.Now(), false) + startTime = time.Unix(startTimeInt,0) + } + + // Parse end time from query, if present + endTime := time.Now() + endTimeStr := query.Get("endTime") + if len(startTimeStr) != 0 { + endTimeInt, err := strconv.ParseInt(endTimeStr, 10, 64) + if err != nil { + logger.Error("could not parse endTime", "endTime", endTimeStr, "err", err) + rw.WriteHeader(http.StatusBadRequest) + return + } + + endTime = time.Unix(endTimeInt,0) + } + const timeLimit = time.Hour * 72 // TODO - configurable + duration := endTime.Sub(startTime) + if duration > timeLimit || duration < 0 { + logger.Error("duration of time range queried is not allowed", "startTime", startTime, "endTime", endTime) + rw.WriteHeader(http.StatusBadRequest) + return + } + + forward := true + // Parse direction from query, if present + forwardStr := query.Get("forward") + if len(forwardStr) != 0 { + switch(forwardStr){ + case "0": + forward = false + case "1": + forward = true + default: + logger.Error("unknown value for forward", "forward", forwardStr) + rw.WriteHeader(http.StatusBadRequest) + return + } + } + + logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), + runIndex, + startTime, + endTime, forward) if err != nil { logger.Error("could not get logs for lease", "lease", leaseID, "err", err) rw.WriteHeader(http.StatusInternalServerError) From 4658e616ac47e594c5b30ed78021b74e50c360ee Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Mon, 7 Mar 2022 16:06:20 -0600 Subject: [PATCH 08/13] wip on streaming logs back --- provider/cluster/kube/loki/loki.go | 167 ++++++++++++++++-- provider/cluster/util/service_client.go | 36 ++++ .../cluster/util/service_discovery_agent.go | 144 ++++++++++++--- .../util/service_discovery_agent_static.go | 5 + .../util/service_discovery_agent_types.go | 17 +- provider/gateway/rest/router.go | 88 ++++++++- 6 files changed, 419 insertions(+), 38 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index f10bdaa430..12fa911fa3 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -59,6 +59,10 @@ var ( type Client interface { FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) + TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime time.Time, + eachLogLine func(at time.Time, line string)error, + onFirstDroppedLog func() error) error + Stop() } @@ -82,6 +86,7 @@ type client struct { sda clusterutil.ServiceDiscoveryAgent kc kubernetes.Interface client clusterutil.ServiceClient + websocketClient clusterutil.WebsocketServiceClient lock sync.Locker } @@ -172,6 +177,138 @@ func (c *client) discoverFilename(ctx context.Context, leaseID mtypes.LeaseID, s return filename, nil } +// DroppedStream represents a dropped stream in tail call +type droppedStream struct { + Timestamp time.Time + Labels map[string]string +} + +//Entry represents a log entry. It includes a log message and the time it occurred at. +type entry struct { + Timestamp time.Time + Line string +} + +//Stream represents a log stream. It includes a set of log entries and their labels. +type stream struct { + Labels map[string]string `json:"stream"` + Entries []entry `json:"values"` +} + +// tailResponse represents the http json response to a tail query from loki +type tailResponse struct { + Streams []stream `json:"streams,omitempty"` + DroppedStreams []droppedStream `json:"dropped_entries,omitempty"` +} + +func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime time.Time, + eachLogLine func(at time.Time, line string)error, + onFirstDroppedLog func() error) error { + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + if err != nil { + return err + } + + datamap := serviceNameAndReplicaIndexToPodName(possiblePods) + podName, exists := datamap[serviceNameAndReplicaIndex{ + replicaIndex: int(replicaIndex), + serviceName: serviceName, + }] + + if !exists { + return fmt.Errorf("%w: no entry for service %q and replica %d", ErrLoki, serviceName, replicaIndex) + } + + httpQueryString := url.Values{} + httpQueryString.Set("from", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? + httpQueryString.Set("delay_for", "3") // TODO - what does this do + + var filenameLabelFilter string + specifyRunIndex := runIndex >= 0 + if specifyRunIndex { + var err error + filenameLabelFilter, err = c.getFilename(ctx, leaseID, startTime, time.Now(), runIndex, podName) + if err != nil { + return err + } + } + + lokiQueryBuf := &bytes.Buffer{} + // Note this is not JSON + // TODO - guard against injection here even though it is unlikely + _, _ = fmt.Fprint(lokiQueryBuf, "{") + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%q", podLabel, podName) + // specify filename label here if runIndex >= 0 + if specifyRunIndex { + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%q", filenameLabel, filenameLabelFilter) + } + _, _ = fmt.Fprint(lokiQueryBuf, "}") + + httpQueryString.Set("query", lokiQueryBuf.String()) + + wslc, err := c.getLokiWebsocketClient(ctx) + if err != nil { + return err + } + + headers := http.Header{ + lokiOrgIdHeader: []string{clusterutil.LeaseIDToNamespace(leaseID)}, + } + conn, err := wslc.DialWebsocket(ctx, tailPath + "?" + httpQueryString.Encode(), headers) + if err != nil { + return err + } + + dropCalled := false + for { + if ctx.Err() != nil { + return ctx.Err() + } + var logs tailResponse + err := conn.ReadJSON(&logs) + if err != nil { + return fmt.Errorf("error parsing JSON from loki log tail: %w", err) + } + + for _, stream := range logs.Streams { + for _, entry := range stream.Entries { + err = eachLogLine(entry.Timestamp, entry.Line) + if err != nil { + return err + } + } + } + + if !dropCalled && len(logs.DroppedStreams) != 0{ + // caused by the client being too slow + dropCalled = true + err = onFirstDroppedLog() + if err != nil { + return err + } + } + + } + +} + +func (c *client) getFilename(ctx context.Context, leaseID mtypes.LeaseID, startTime, endTime time.Time, runIndex int, podName string) (string, error){ + filename, err := c.discoverFilename(ctx, leaseID, startTime, endTime, podName) + if err != nil { + return "", err + } + + head, tail := path.Split(filename) + parts := strings.SplitN(tail, ".", 2) + if len(parts) != 2 { + return "", fmt.Errorf("%w: while constructing fielname filter cannot make sense of filepath %q", ErrLoki, filename) + } + + return fmt.Sprintf("%s/%d.%s", head, runIndex, parts[1]), nil +} + + func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) { lidNS := clusterutil.LeaseIDToNamespace(leaseID) // get a list of possible logs for this service @@ -203,18 +340,10 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se specifyRunIndex := runIndex >= 0 filenameLabelFilter := "" if specifyRunIndex { - filename, err := c.discoverFilename(ctx, leaseID, startTime, endTime, podName) + filenameLabelFilter, err = c.getFilename(ctx, leaseID, startTime, endTime, runIndex, podName) if err != nil { return LogResult{}, err } - - head, tail := path.Split(filename) - parts := strings.SplitN(tail, ".", 2) - if len(parts) != 2 { - return LogResult{}, fmt.Errorf("%w: while constructing fielname filter cannot make sense of filepath %q", ErrLoki, filename) - } - - filenameLabelFilter = fmt.Sprintf("%s/%d.%s", head, runIndex, parts[1]) } httpQueryString := url.Values{} @@ -261,8 +390,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se return LogResult{},fmt.Errorf("%w: fetching logs from loki failed, got status code %d; %s", ErrLoki, resp.StatusCode, msg) } - // TODO - parse response - + // Parse the response & grab the values we care about decoder := json.NewDecoder(resp.Body) lokiResult := lokiLogQueryResponse{} err = decoder.Decode(&lokiResult) @@ -315,11 +443,26 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se return result, nil } +func (c *client) getLokiWebsocketClient(ctx context.Context) (clusterutil.WebsocketServiceClient, error) { + c.lock.Lock() + defer c.lock.Unlock() + + // TODO - reset this client on error after HTTP request + if c.websocketClient == nil { + var err error + c.websocketClient, err = c.sda.GetWebsocketClient(ctx, false, false) + if err != nil { + return nil, err + } + } + return c.websocketClient, nil +} + func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, error) { c.lock.Lock() defer c.lock.Unlock() - // TODO - reset this client on error from Loki + // TODO - reset this client on error after HTTP request if c.client == nil { var err error c.client, err = c.sda.GetClient(ctx, false, false) diff --git a/provider/cluster/util/service_client.go b/provider/cluster/util/service_client.go index 049f9a39b5..ae3f66b11d 100644 --- a/provider/cluster/util/service_client.go +++ b/provider/cluster/util/service_client.go @@ -4,9 +4,12 @@ import ( "context" "crypto/tls" "fmt" + "github.com/gorilla/websocket" "io" + "io/ioutil" "net" "net/http" + "strings" ) func (hwsc *httpWrapperServiceClient) CreateRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) { @@ -23,6 +26,7 @@ func (hwsc *httpWrapperServiceClient) DoRequest(req *http.Request) (*http.Respon return hwsc.httpClient.Do(req) } + func newHTTPWrapperServiceClient(isHTTPS, secure bool, baseURL string) *httpWrapperServiceClient { netDialer := &net.Dialer{} @@ -64,3 +68,35 @@ func newHTTPWrapperServiceClientWithTransport(transport http.RoundTripper, baseU }, } } + + +func newWebsocketWrapperServiceClientFromDialer(dialer websocket.Dialer, baseURL string) *websocketWrapperServiceClient { + return &websocketWrapperServiceClient{ + url: baseURL, + dialer: &dialer, + } +} + +func (wwsc *websocketWrapperServiceClient) DialWebsocket(ctx context.Context, path string, requestHeaders http.Header) (*websocket.Conn, error) { + dialUrl := fmt.Sprintf("%s/%s", wwsc.url, path) + + if strings.HasPrefix(dialUrl, "https") { + dialUrl = strings.Replace(dialUrl, "https", "wss", 1) + } else if strings.HasPrefix(dialUrl, "http") { + dialUrl = strings.Replace(dialUrl, "http", "ws", 1) + } + + conn, resp, err := wwsc.dialer.DialContext(ctx, dialUrl, requestHeaders) + if err != nil { + if resp == nil { + return nil, err + } + + buf, _ := ioutil.ReadAll(resp.Body) // nolint + return nil, fmt.Errorf("%w: error response from server when dialing websocket; status %v; response: %s", err, resp.StatusCode, + string(buf)) + } + + return conn, err + +} \ No newline at end of file diff --git a/provider/cluster/util/service_discovery_agent.go b/provider/cluster/util/service_discovery_agent.go index 78249122e6..3320fbbb20 100644 --- a/provider/cluster/util/service_discovery_agent.go +++ b/provider/cluster/util/service_discovery_agent.go @@ -2,10 +2,12 @@ package util import ( "context" + "crypto/tls" "errors" "fmt" "github.com/boz/go-lifecycle" "github.com/desertbit/timer" + "github.com/gorilla/websocket" "github.com/ovrclk/akash/util/runner" "github.com/tendermint/tendermint/libs/log" corev1 "k8s.io/api/core/v1" @@ -14,6 +16,8 @@ import ( "k8s.io/client-go/rest" "math/rand" "net" + "net/http" + "net/url" "time" ) @@ -134,7 +138,7 @@ func (sda *serviceDiscoveryAgent) setResult(factory clientFactory, err error) { } } -func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) { +func (sda *serviceDiscoveryAgent) getClientFactory(ctx context.Context) (clientFactory, error){ errCh := make(chan error, 1) resultCh := make(chan clientFactory, 1) req := serviceDiscoveryRequest{ @@ -152,12 +156,33 @@ func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure select { case result := <-resultCh: - return result(isHTTPS, secure), nil + return result, nil case err := <-errCh: return nil, err case <-ctx.Done(): return nil, ctx.Err() } + +} + +func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) { + cf, err := sda.getClientFactory(ctx) + + if err != nil { + return nil, err + } + + return cf.MakeServiceClient(isHTTPS, secure), nil +} + +func (sda *serviceDiscoveryAgent) GetWebsocketClient(ctx context.Context, isHTTPS, secure bool) (WebsocketServiceClient, error) { + cf, err := sda.getClientFactory(ctx) + + if err != nil { + return nil, err + } + + return cf.MakeWebsocketServiceClient(isHTTPS, secure), nil } func (sda *serviceDiscoveryAgent) discover() (clientFactory, error) { @@ -173,6 +198,57 @@ func (sda *serviceDiscoveryAgent) discover() (clientFactory, error) { return sda.discoverKube() } +type kubeClientFactory struct { + httpTransport http.RoundTripper + kubeHost string + kubeNamespace string + serviceName string + portName string + tlsConfig *tls.Config + netDialContext func(context.Context, string, string) (net.Conn, error) + proxy func(*http.Request) (*url.URL, error) + handshakeTimeout time.Duration +} + +func (kcf kubeClientFactory) MakeServiceClient(isHTTPS, secure bool) ServiceClient { + serviceName := kcf.serviceName + if isHTTPS { + serviceName = fmt.Sprintf("https:%s", kcf.serviceName) + } + /** + Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ + + The structure is + http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy + */ + proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kcf.kubeHost, kcf.kubeNamespace, serviceName, kcf.portName) + + return newHTTPWrapperServiceClientWithTransport(kcf.httpTransport, proxyURL) +} + +func (kcf kubeClientFactory) MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient { + dialer := websocket.Dialer{ + NetDialContext: kcf.netDialContext, + Proxy: kcf.proxy, + TLSClientConfig: kcf.tlsConfig, + HandshakeTimeout: kcf.handshakeTimeout, + } + + serviceName := kcf.serviceName + if isHTTPS { + serviceName = fmt.Sprintf("https:%s", kcf.serviceName) + } + /** + Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ + + The structure is + http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy + */ + proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kcf.kubeHost, kcf.kubeNamespace, serviceName, kcf.portName) + + return newWebsocketWrapperServiceClientFromDialer(dialer, proxyURL) +} + func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { // This code is retried forever, but don't wait on a result for a very long time. Just poll ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -189,8 +265,8 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { ports := service.Spec.Ports selectedPort := -1 - for i, port := range ports { - if port.Name == sda.portName && corev1.ProtocolTCP == port.Protocol { + for i, kPort := range ports { + if kPort.Name == sda.portName && corev1.ProtocolTCP == kPort.Protocol { selectedPort = i break } @@ -199,7 +275,7 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { if selectedPort == -1 { return nil, fmt.Errorf("%w: service %q exists but has no port %q (TCP)", errServiceDiscovery, sda.serviceName, sda.portName) } - port := ports[selectedPort] + kPort := ports[selectedPort] // Get the host for the kube cluster kubeHost := sda.kubeConfig.Host @@ -210,23 +286,47 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { return nil, err } - return func(isHTTPS, _ bool) ServiceClient { - serviceName := service.Name - if isHTTPS { - serviceName = fmt.Sprintf("https:%s", service.Name) - } - /** - Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ - The structure is - http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy - */ - proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kubeHost, service.Namespace, serviceName, port.Name) + tlsConfig, err := rest.TLSConfigFor(sda.kubeConfig) + if err != nil { + return nil, err + } - return newHTTPWrapperServiceClientWithTransport(httpTransport, proxyURL) + return kubeClientFactory{ + httpTransport: httpTransport, + kubeHost: kubeHost, + kubeNamespace: service.Namespace, + serviceName: service.Name, + portName: kPort.Name, + tlsConfig: tlsConfig, + netDialContext: sda.kubeConfig.Dial, + proxy: sda.kubeConfig.Proxy, + handshakeTimeout: sda.kubeConfig.Timeout, }, nil } +type dnsClientFactory struct { + target string + port uint16 + +} + +func (dcf dnsClientFactory) MakeServiceClient(isHTTPS, secure bool) ServiceClient { + proto := "http" + if isHTTPS { + proto = "https" + } + discoveredURL := fmt.Sprintf("%s://%v:%v", proto, dcf.target, dcf.port) + + return newHTTPWrapperServiceClient(isHTTPS, secure, discoveredURL) +} + +func (dcf dnsClientFactory) MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient { + // TODO + panic("not implemented") +} + + func (sda *serviceDiscoveryAgent) discoverDNS() (clientFactory, error) { // FUTURE - try and find a 3rd party API that allows timeouts to be put on this request _, addrs, err := net.LookupSRV(sda.portName, "TCP", fmt.Sprintf("%s.%s.svc.cluster.local", sda.serviceName, sda.namespace)) @@ -245,13 +345,9 @@ func (sda *serviceDiscoveryAgent) discoverDNS() (clientFactory, error) { // nolint:gosec addrI := rand.Int31n(int32(len(addrs))) choice := result[addrI] - return func(isHTTPS, secure bool) ServiceClient { - proto := "http" - if isHTTPS { - proto = "https" - } - discoveredURL := fmt.Sprintf("%s://%v:%v", proto, choice.Target, choice.Port) - return newHTTPWrapperServiceClient(isHTTPS, secure, discoveredURL) + return dnsClientFactory{ + target: choice.Target, + port: choice.Port, }, nil } diff --git a/provider/cluster/util/service_discovery_agent_static.go b/provider/cluster/util/service_discovery_agent_static.go index 6f529a8039..7134839717 100644 --- a/provider/cluster/util/service_discovery_agent_static.go +++ b/provider/cluster/util/service_discovery_agent_static.go @@ -19,3 +19,8 @@ func (ssda staticServiceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, url := fmt.Sprintf("%s://%v:%v", proto, ssda.Target, ssda.Port) return newHTTPWrapperServiceClient(isHTTPS, secure, url), nil } + +func (ssda staticServiceDiscoveryAgent) GetWebsocketClient(ctx context.Context, isHTTP, secure bool) (WebsocketServiceClient, error) { + // TODO + panic("not implemented") +} \ No newline at end of file diff --git a/provider/cluster/util/service_discovery_agent_types.go b/provider/cluster/util/service_discovery_agent_types.go index 138cf3ad60..7a7556baf7 100644 --- a/provider/cluster/util/service_discovery_agent_types.go +++ b/provider/cluster/util/service_discovery_agent_types.go @@ -3,6 +3,7 @@ package util import ( "context" "github.com/boz/go-lifecycle" + "github.com/gorilla/websocket" "github.com/tendermint/tendermint/libs/log" "io" "k8s.io/client-go/kubernetes" @@ -13,6 +14,7 @@ import ( type ServiceDiscoveryAgent interface { Stop() GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) + GetWebsocketClient(ctx context.Context, isHTTPS, secure bool) (WebsocketServiceClient, error) DiscoverNow() } @@ -21,6 +23,10 @@ type ServiceClient interface { DoRequest(req *http.Request) (*http.Response, error) } +type WebsocketServiceClient interface { + DialWebsocket(ctx context.Context, path string, requestHeader http.Header) (*websocket.Conn, error) +} + type serviceDiscoveryAgent struct { serviceName string namespace string @@ -43,9 +49,18 @@ type serviceDiscoveryRequest struct { resultCh chan<- clientFactory } -type clientFactory func(isHttps, secure bool) ServiceClient +type clientFactory interface { + MakeServiceClient(isHTTPS, secure bool) ServiceClient + MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient +} type httpWrapperServiceClient struct { httpClient *http.Client url string } + +type websocketWrapperServiceClient struct { + url string + dialer *websocket.Dialer +} + diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index e2f84ed7fc..8985564235 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -187,6 +187,60 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl return router } +func logQueryFollowHandler(logger log.Logger, + lokiClient loki.Client, + leaseID mtypes.LeaseID, + rw http.ResponseWriter, + req *http.Request, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + ) { + + upgrader := websocket.Upgrader{ + ReadBufferSize: 0, + WriteBufferSize: 65535, + } + + ws, err := upgrader.Upgrade(rw, req, nil) + if err != nil { + // At this point the connection either has a response sent already + // or it has been closed + logger.Error("failed handshake", "err", err) + return + } + + ws.EnableWriteCompression(true) + sendLogLine := func(at time.Time, line string) error { + err := ws.SetWriteDeadline(time.Now().Add(time.Second * 10)) // TODO - configurable ??? + if err != nil { + return err + } + return ws.WriteMessage(websocket.TextMessage, []byte(line)) + } + + onDropped := func() error { + return nil // Ignore for now + } + + err = lokiClient.TailLogsByService(req.Context(), + leaseID, + serviceName, + replicaIndex, + runIndex, + startTime, + sendLogLine, + onDropped) + + if err != nil { + if ! errors.Is(err, context.Canceled) { + logger.Error("failed during tail of logs", "err", err) + } + } + +} + func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ return func(rw http.ResponseWriter, req *http.Request) { leaseID := requestLeaseID(req) @@ -202,7 +256,7 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc return } - // TODO - validate replica index + // TODO - validate replica index points to a valid replica // TODO - get the following from the query, with defaults query := req.URL.Query() @@ -273,6 +327,38 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc } } + follow := false + followStr := query.Get("follow") + // TODO - if follow is true and forward is not true then error + // TODO - if endTime is specified and follow is true then error + if len(followStr) != 0 { + switch(followStr) { + case "0": + follow = false + case "1": + follow = true + + default: + logger.Error("unknown value for follow", "follow", forwardStr) + rw.WriteHeader(http.StatusBadRequest) + return + } + } + + if follow { + // Upgrade to a websocket + logQueryFollowHandler(logger, + lokiClient, + leaseID, + rw, + req, + serviceName, + uint(replicaIndex), + runIndex, + startTime) + return + } + logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), runIndex, startTime, From c53147c64483691da2198fdc8af3cd225f0d2af9 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Tue, 8 Mar 2022 09:48:38 -0600 Subject: [PATCH 09/13] actually works for streaming logs now --- provider/cluster/kube/loki/loki.go | 6 ++++-- provider/cluster/util/service_client.go | 11 +++++++--- .../cluster/util/service_discovery_agent.go | 20 +++++++++++++++++-- provider/gateway/rest/router.go | 9 +++++++++ 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index 12fa911fa3..5f38ac95fd 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -192,7 +192,7 @@ type entry struct { //Stream represents a log stream. It includes a set of log entries and their labels. type stream struct { Labels map[string]string `json:"stream"` - Entries []entry `json:"values"` + Entries [][]string `json:"values"` } // tailResponse represents the http json response to a tail query from loki @@ -273,7 +273,9 @@ func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, for _, stream := range logs.Streams { for _, entry := range stream.Entries { - err = eachLogLine(entry.Timestamp, entry.Line) + at := time.Time{} // TODO + line := entry[1] + err = eachLogLine(at, line) if err != nil { return err } diff --git a/provider/cluster/util/service_client.go b/provider/cluster/util/service_client.go index ae3f66b11d..b0aaefc086 100644 --- a/provider/cluster/util/service_client.go +++ b/provider/cluster/util/service_client.go @@ -13,6 +13,7 @@ import ( ) func (hwsc *httpWrapperServiceClient) CreateRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) { + path = strings.TrimLeft(path, "/") serviceURL := fmt.Sprintf("%s/%s", hwsc.url, path) req, err := http.NewRequestWithContext(ctx, method, serviceURL, body) if err != nil { @@ -26,8 +27,8 @@ func (hwsc *httpWrapperServiceClient) DoRequest(req *http.Request) (*http.Respon return hwsc.httpClient.Do(req) } - func newHTTPWrapperServiceClient(isHTTPS, secure bool, baseURL string) *httpWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") netDialer := &net.Dialer{} // By default, block both things @@ -61,6 +62,7 @@ func newHTTPWrapperServiceClient(isHTTPS, secure bool, baseURL string) *httpWrap } func newHTTPWrapperServiceClientWithTransport(transport http.RoundTripper, baseURL string) *httpWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") return &httpWrapperServiceClient{ url: baseURL, httpClient: &http.Client{ @@ -71,6 +73,7 @@ func newHTTPWrapperServiceClientWithTransport(transport http.RoundTripper, baseU func newWebsocketWrapperServiceClientFromDialer(dialer websocket.Dialer, baseURL string) *websocketWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") return &websocketWrapperServiceClient{ url: baseURL, dialer: &dialer, @@ -78,6 +81,7 @@ func newWebsocketWrapperServiceClientFromDialer(dialer websocket.Dialer, baseURL } func (wwsc *websocketWrapperServiceClient) DialWebsocket(ctx context.Context, path string, requestHeaders http.Header) (*websocket.Conn, error) { + path = strings.TrimLeft(path, "/") dialUrl := fmt.Sprintf("%s/%s", wwsc.url, path) if strings.HasPrefix(dialUrl, "https") { @@ -93,8 +97,9 @@ func (wwsc *websocketWrapperServiceClient) DialWebsocket(ctx context.Context, pa } buf, _ := ioutil.ReadAll(resp.Body) // nolint - return nil, fmt.Errorf("%w: error response from server when dialing websocket; status %v; response: %s", err, resp.StatusCode, - string(buf)) + bufStr := strings.ReplaceAll(string(buf), "\n", " ") + return nil, fmt.Errorf("%w: error response from server when dialing websocket %q; status %v; response: %s", err, dialUrl, resp.StatusCode, + bufStr) } return conn, err diff --git a/provider/cluster/util/service_discovery_agent.go b/provider/cluster/util/service_discovery_agent.go index 3320fbbb20..97bb34896b 100644 --- a/provider/cluster/util/service_discovery_agent.go +++ b/provider/cluster/util/service_discovery_agent.go @@ -308,6 +308,7 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { type dnsClientFactory struct { target string port uint16 + handshakeTimeout time.Duration } @@ -322,8 +323,22 @@ func (dcf dnsClientFactory) MakeServiceClient(isHTTPS, secure bool) ServiceClien } func (dcf dnsClientFactory) MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient { - // TODO - panic("not implemented") + proto := "ws" + if isHTTPS { + proto = "wss" + } + discoveredURL := fmt.Sprintf("%s://%v:%v", proto, dcf.target, dcf.port) + + tlsConfig := &tls.Config{ + InsecureSkipVerify: !secure, + } + + dialer := websocket.Dialer{ + TLSClientConfig: tlsConfig, + HandshakeTimeout: dcf.handshakeTimeout, + } + + return newWebsocketWrapperServiceClientFromDialer(dialer, discoveredURL) } @@ -349,5 +364,6 @@ func (sda *serviceDiscoveryAgent) discoverDNS() (clientFactory, error) { return dnsClientFactory{ target: choice.Target, port: choice.Port, + handshakeTimeout: sda.kubeConfig.Timeout, }, nil } diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 8985564235..f20aa5c9f6 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -211,6 +211,7 @@ func logQueryFollowHandler(logger log.Logger, return } + // TODO - figoure out how to flush this at a regular cadence to avoid buffering ? ws.EnableWriteCompression(true) sendLogLine := func(at time.Time, line string) error { err := ws.SetWriteDeadline(time.Now().Add(time.Second * 10)) // TODO - configurable ??? @@ -239,6 +240,14 @@ func logQueryFollowHandler(logger log.Logger, } } + err = ws.WriteControl(websocket.CloseMessage,[]byte{}, time.Now().Add(time.Second * 30)) + if err != nil { + logger.Error("could not send websocket close message", "err", err) + } + err = ws.Close() + if err != nil { + logger.Error("could not close websocket", "err", err) + } } func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ From 42c59acac7f27e3bd77330255cd9da8c52530687 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Tue, 8 Mar 2022 12:50:23 -0600 Subject: [PATCH 10/13] proof of concept work --- provider/cluster/kube/loki/loki.go | 45 +++++++++++------- provider/gateway/rest/router.go | 73 +++++++++++++++++++++++++----- 2 files changed, 90 insertions(+), 28 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index 5f38ac95fd..465a32698f 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -58,7 +58,7 @@ var ( type Client interface { FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) - GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) + GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool, limit uint) (LogResult, error) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime time.Time, eachLogLine func(at time.Time, line string)error, onFirstDroppedLog func() error) error @@ -133,8 +133,7 @@ func (c *client) discoverFilename(ctx context.Context, leaseID mtypes.LeaseID, s httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) httpQueryString.Set("limit", "1") - // TODO - guard against injection - lokiQuery := fmt.Sprintf("{%s=%q}", podLabel, podName) // Note this is not JSON + lokiQuery := fmt.Sprintf("{%s=%s}", podLabel, safeQuote(podName)) // Note this is not JSON httpQueryString.Set("query", lokiQuery) httpQueryString.Set("direction", string(BACKWARD)) @@ -148,6 +147,7 @@ func (c *client) discoverFilename(ctx context.Context, leaseID mtypes.LeaseID, s resp, err := lc.DoRequest(request) if err != nil { + c.clearClient() return "" , fmt.Errorf("loki filename discovery log query for pod %q failed: %w", podName, err) } @@ -222,7 +222,7 @@ func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, httpQueryString := url.Values{} httpQueryString.Set("from", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? - httpQueryString.Set("delay_for", "3") // TODO - what does this do + httpQueryString.Set("delay_for", "0") var filenameLabelFilter string specifyRunIndex := runIndex >= 0 @@ -236,12 +236,11 @@ func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, lokiQueryBuf := &bytes.Buffer{} // Note this is not JSON - // TODO - guard against injection here even though it is unlikely _, _ = fmt.Fprint(lokiQueryBuf, "{") - _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%q", podLabel, podName) + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%s", podLabel, safeQuote(podName)) // specify filename label here if runIndex >= 0 if specifyRunIndex { - _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%q", filenameLabel, filenameLabelFilter) + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%s", filenameLabel, safeQuote(filenameLabelFilter)) } _, _ = fmt.Fprint(lokiQueryBuf, "}") @@ -257,6 +256,7 @@ func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, } conn, err := wslc.DialWebsocket(ctx, tailPath + "?" + httpQueryString.Encode(), headers) if err != nil { + c.clearClient() return err } @@ -304,14 +304,19 @@ func (c *client) getFilename(ctx context.Context, leaseID mtypes.LeaseID, startT head, tail := path.Split(filename) parts := strings.SplitN(tail, ".", 2) if len(parts) != 2 { - return "", fmt.Errorf("%w: while constructing fielname filter cannot make sense of filepath %q", ErrLoki, filename) + return "", fmt.Errorf("%w: while constructing filename filter cannot make sense of filepath %q", ErrLoki, filename) } return fmt.Sprintf("%s/%d.%s", head, runIndex, parts[1]), nil } -func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) { +func safeQuote(input string) string { + input = strings.ReplaceAll(input, `"`,"") + return fmt.Sprintf("%q", input) +} + +func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool, limit uint) (LogResult, error) { lidNS := clusterutil.LeaseIDToNamespace(leaseID) // get a list of possible logs for this service possiblePods, err := c.detectRunsForLease(ctx, leaseID) @@ -351,16 +356,15 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se httpQueryString := url.Values{} httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) - httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? + httpQueryString.Set("limit", fmt.Sprintf("%d","1000")) lokiQueryBuf := &bytes.Buffer{} // Note this is not JSON - // TODO - guard against injection here even though it is unlikely _, _ = fmt.Fprint(lokiQueryBuf, "{") - _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%q", podLabel, podName) + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%s", podLabel, safeQuote(podName)) // specify filename label here if runIndex >= 0 if specifyRunIndex { - _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%q", filenameLabel, filenameLabelFilter) + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%s", filenameLabel, safeQuote(filenameLabelFilter)) } _, _ = fmt.Fprint(lokiQueryBuf, "}") @@ -382,6 +386,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se resp, err := lc.DoRequest(request) if err != nil { + c.clearClient() return LogResult{},fmt.Errorf("loki log request failed: %w", err) } @@ -445,11 +450,19 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se return result, nil } +func (c *client) clearClient(){ + c.lock.Lock() + defer c.lock.Unlock() + + c.websocketClient = nil + c.client = nil + +} + func (c *client) getLokiWebsocketClient(ctx context.Context) (clusterutil.WebsocketServiceClient, error) { c.lock.Lock() defer c.lock.Unlock() - // TODO - reset this client on error after HTTP request if c.websocketClient == nil { var err error c.websocketClient, err = c.sda.GetWebsocketClient(ctx, false, false) @@ -464,7 +477,6 @@ func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, c.lock.Lock() defer c.lock.Unlock() - // TODO - reset this client on error after HTTP request if c.client == nil { var err error c.client, err = c.sda.GetClient(ctx, false, false) @@ -519,6 +531,7 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ req.Header.Add(lokiOrgIdHeader, lidNS) resp, err := lc.DoRequest(req) if err != nil { + c.clearClient() return nil, fmt.Errorf("loki label values request failed: %w", err) } @@ -560,8 +573,6 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ returnValue[i].Present = true } - // TODO - query by pod name and then figure out how many restarts actually logged something? - return returnValue, nil } diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index f20aa5c9f6..123fe33a08 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -47,6 +47,7 @@ import ( manifestValidation "github.com/ovrclk/akash/validation" dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2" mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" + providermanifest "github.com/ovrclk/akash/provider/manifest" ) type CtxAuthKey string @@ -165,10 +166,10 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl Methods("GET") logRouter.HandleFunc("/status", - logStatusHandler(log, lokiClient)).Methods(http.MethodGet) + logStatusHandler(log, pclient.Manifest(), lokiClient)).Methods(http.MethodGet) logRouter.HandleFunc("/query/{serviceName}/{replicaIndex}", - logQueryHandler(log, lokiClient)).Methods(http.MethodGet) + logQueryHandler(log, pclient.Manifest(), lokiClient)).Methods(http.MethodGet) srouter := lrouter.PathPrefix("/service/{serviceName}").Subrouter() srouter.Use( @@ -250,10 +251,22 @@ func logQueryFollowHandler(logger log.Logger, } } -func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ +func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, lokiClient loki.Client) http.HandlerFunc{ return func(rw http.ResponseWriter, req *http.Request) { leaseID := requestLeaseID(req) - // TODO - make sure the lease actually exists first + + // Make sure lease is active + active, err := manifestClient.IsActive(req.Context(), leaseID.DeploymentID()) + if err != nil { + logger.Error("could not check if lease is active", "lease-id", leaseID, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if !active { + rw.WriteHeader(http.StatusNotFound) + return + } serviceName := mux.Vars(req)["serviceName"] // TODO validate this exists replicaIndexStr := mux.Vars(req)["replicaIndex"] @@ -267,9 +280,20 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc // TODO - validate replica index points to a valid replica - // TODO - get the following from the query, with defaults query := req.URL.Query() + limit := uint(1000) + limitStr := query.Get("limit") + if len(limitStr) != 0 { + limit64, err := strconv.ParseUint(limitStr, 10, 31) + if err != nil { + logger.Error("could not parse limit", "limit", limitStr, "err", err) + rw.WriteHeader(http.StatusBadRequest) + return + } + limit = uint(limit64) + } + // Parse runIndex from query, if present runIndex := -1 runIndexStr := query.Get("runIndex") @@ -312,7 +336,7 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc endTime = time.Unix(endTimeInt,0) } - const timeLimit = time.Hour * 72 // TODO - configurable + const timeLimit = time.Hour * 72 // TODO - configurable by the provider duration := endTime.Sub(startTime) if duration > timeLimit || duration < 0 { logger.Error("duration of time range queried is not allowed", "startTime", startTime, "endTime", endTime) @@ -338,8 +362,6 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc follow := false followStr := query.Get("follow") - // TODO - if follow is true and forward is not true then error - // TODO - if endTime is specified and follow is true then error if len(followStr) != 0 { switch(followStr) { case "0": @@ -355,6 +377,23 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc } if follow { + if len(endTimeStr) != 0 { + logger.Error("client requested log follow with endtime") + rw.WriteHeader(http.StatusBadRequest) + return + } + + if !forward { + logger.Error("client requested log follow with reverse direction") + rw.WriteHeader(http.StatusBadRequest) + return + } + + if len(limitStr) != 0 { + logger.Error("client request log follow with limit") + rw.WriteHeader(http.StatusBadRequest) + return + } // Upgrade to a websocket logQueryFollowHandler(logger, lokiClient, @@ -371,7 +410,7 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), runIndex, startTime, - endTime, forward) + endTime, forward, limit) if err != nil { logger.Error("could not get logs for lease", "lease", leaseID, "err", err) rw.WriteHeader(http.StatusInternalServerError) @@ -387,10 +426,22 @@ func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc } } -func logStatusHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{ +func logStatusHandler(logger log.Logger, manifestClient providermanifest.Client, lokiClient loki.Client) http.HandlerFunc{ return func(rw http.ResponseWriter, req *http.Request) { leaseID := requestLeaseID(req) - // TODO - make sure the lease actually exists first + // Make sure lease is active + active, err := manifestClient.IsActive(req.Context(), leaseID.DeploymentID()) + if err != nil { + logger.Error("could not check if lease is active", "lease-id", leaseID, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if !active { + rw.WriteHeader(http.StatusNotFound) + return + } + result, err := lokiClient.FindLogsByLease(req.Context(), leaseID) if err != nil { logger.Error("could not get logs for lease", "lease", leaseID, "err", err) From c9c745decc0487fa9fcfa43c09893c8f31f7db02 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Wed, 9 Mar 2022 15:06:11 -0600 Subject: [PATCH 11/13] actually retrieve logs via CLI --- provider/cluster/kube/loki/loki.go | 12 +- provider/cmd/leaseLogs.go | 273 ++++++++++++++++++++++------- provider/cmd/root.go | 1 + provider/gateway/rest/client.go | 123 ++++++++++++- provider/gateway/rest/path.go | 8 + provider/gateway/rest/router.go | 65 ++++--- 6 files changed, 385 insertions(+), 97 deletions(-) diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go index 465a32698f..cae5745c3f 100644 --- a/provider/cluster/kube/loki/loki.go +++ b/provider/cluster/kube/loki/loki.go @@ -356,7 +356,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se httpQueryString := url.Values{} httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) - httpQueryString.Set("limit", fmt.Sprintf("%d","1000")) + httpQueryString.Set("limit", fmt.Sprintf("%d",limit)) lokiQueryBuf := &bytes.Buffer{} // Note this is not JSON @@ -413,7 +413,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se for _, resultSet := range lokiResult.Data.Result { filepath, exists := resultSet.Stream[filenameLabel] if !exists { - return LogResult{},fmt.Errorf("%w: expected loki log result set to have label %q but it does not", ErrLoki, filenameLabel) + return LogResult{}, fmt.Errorf("%w: expected loki log result set to have label %q but it does not", ErrLoki, filenameLabel) } _, filename := path.Split(filepath) @@ -421,7 +421,7 @@ func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, se runIndex, err := strconv.ParseUint(filenameParts[0], 0, 31) if err != nil { - return LogResult{},fmt.Errorf("expected to parse filename %q as integer for kubernetes run index: %w", filename, err) + return LogResult{}, fmt.Errorf("expected to parse filename %q as integer for kubernetes run index: %w", filename, err) } for _, logEntry := range resultSet.Values { @@ -456,7 +456,6 @@ func (c *client) clearClient(){ c.websocketClient = nil c.client = nil - } func (c *client) getLokiWebsocketClient(ctx context.Context) (clusterutil.WebsocketServiceClient, error) { @@ -497,6 +496,7 @@ type LogStatus struct { ServiceName string ReplicaIndex int Present bool + Restarts uint } func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) { lidNS := clusterutil.LeaseIDToNamespace(leaseID) @@ -526,8 +526,6 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ return nil, err } - // TODO - on the query set end=1646072848813679310&start=1646065648813679310 or similar query args - // Based on the retention set on the provider req.Header.Add(lokiOrgIdHeader, lidNS) resp, err := lc.DoRequest(req) if err != nil { @@ -560,6 +558,7 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ returnValue = append(returnValue, LogStatus{ ServiceName: entry.serviceName, ReplicaIndex: podData.replicaIndex, + Restarts: entry.restarts, }) } @@ -573,6 +572,7 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([ returnValue[i].Present = true } + return returnValue, nil } diff --git a/provider/cmd/leaseLogs.go b/provider/cmd/leaseLogs.go index 34e53f4a91..764175abe0 100644 --- a/provider/cmd/leaseLogs.go +++ b/provider/cmd/leaseLogs.go @@ -1,9 +1,14 @@ package cmd import ( + "bytes" + "context" "crypto/tls" + "encoding/json" "fmt" + "strconv" "sync" + "time" sdkclient "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" @@ -18,13 +23,22 @@ import ( cutils "github.com/ovrclk/akash/x/cert/utils" ) +const( + FlagStartTime = "start-time" + FlagEndTime = "end-time" + FlagForward = "forward" + FlagLimit = "limit" + FlagRunIndex = "run-index" +) + func leaseLogsCmd() *cobra.Command { cmd := &cobra.Command{ Use: "lease-logs", Short: "get lease logs", SilenceUsage: true, + Args: cobra.MinimumNArgs(2), RunE: func(cmd *cobra.Command, args []string) error { - return doLeaseLogs(cmd) + return doLeaseLogsV2(cmd, args) }, } @@ -33,6 +47,11 @@ func leaseLogsCmd() *cobra.Command { cmd.Flags().BoolP(flagFollow, "f", false, "Specify if the logs should be streamed. Defaults to false") cmd.Flags().Int64P(flagTail, "t", -1, "The number of lines from the end of the logs to show. Defaults to -1") cmd.Flags().StringP(flagOutput, "o", outputText, "Output format text|json. Defaults to text") + cmd.Flags().Bool(FlagForward, true, "") + cmd.Flags().String(FlagEndTime, "", "") + cmd.Flags().String(FlagStartTime, "", "") + cmd.Flags().Uint(FlagLimit, 100, "") + cmd.Flags().Int(FlagRunIndex, -1, "") return cmd } @@ -52,16 +71,61 @@ func leaseLogStatusCmd() *cobra.Command { return cmd } -func doLeaseLogsStatus(cmd *cobra.Command) error { - /** - cctx, err := sdkclient.GetClientTxContext(cmd) +func withLeasesForDeployment(ctx context.Context, cctx sdkclient.Context, deploymentID dtypes.DeploymentID, provider string,gseq, oseq uint32, fn func(leaseID mtypes.LeaseID) error) error { + filter := mtypes.LeaseFilters{ + Owner: deploymentID.Owner, + DSeq: deploymentID.DSeq, + State: mtypes.Lease_State_name[int32(mtypes.LeaseActive)], + } + + if len(provider) != 0 { + filter.Provider = provider + } + + if gseq > 0 { + filter.GSeq = gseq + } + + if oseq > 0 { + filter.OSeq = oseq + } + + cclient := akashclient.NewQueryClientFromCtx(cctx) + resp, err := cclient.Leases(ctx, &mtypes.QueryLeasesRequest{ + Filters: filter, + }) if err != nil { return err } - cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if len(resp.Leases) == 0 { + msg := &bytes.Buffer{} + _, _ = fmt.Fprintf(msg, "dseq=%v", filter.DSeq) + if len(provider) > 0 { + _, _ = fmt.Fprintf(msg, " provider=%v", filter.Provider) + } + if filter.GSeq > 0 { + _, _ = fmt.Fprintf(msg, " gseq=%v", filter.GSeq) + } + if filter.OSeq > 0 { + _, _ = fmt.Fprintf(msg, " oseq=%v", filter.OSeq) + } + return fmt.Errorf("%w: %s",errNoActiveLease, msg.String()) + } + + for _, lease := range resp.Leases { + err = fn(lease.Lease.LeaseID) + if err != nil { + return err + } + } + return nil +} + +func doLeaseLogsStatus(cmd *cobra.Command) error { + cctx, err := sdkclient.GetClientTxContext(cmd) if err != nil { - return markRPCServerError(err) + return err } dseq, err := dseqFromFlags(cmd.Flags()) @@ -69,96 +133,181 @@ func doLeaseLogsStatus(cmd *cobra.Command) error { return err } - leases, err := leasesForDeployment(cmd.Context(), cctx, cmd.Flags(), dtypes.DeploymentID{ + deploymentID := dtypes.DeploymentID{ Owner: cctx.GetFromAddress().String(), DSeq: dseq, - }) - if err != nil { - return markRPCServerError(err) } - svcs, err := cmd.Flags().GetString(FlagService) + provider, err := cmd.Flags().GetString(FlagProvider) + if err != nil { + return err + } + gseq, err := cmd.Flags().GetUint32(FlagGSeq) + if err != nil { + return err + } + oseq, err := cmd.Flags().GetUint32(FlagOSeq) if err != nil { return err } - type result struct { - lid mtypes.LeaseID - error error - stream *gwrest.ServiceLogs + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if err != nil { + return markRPCServerError(err) } - streams := make([]result, 0, len(leases)) + return withLeasesForDeployment(cmd.Context(), + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + // TODO - use client directory object here? + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } + status, err := gclient.LeaseLogsStatus(cmd.Context(), leaseID) + if err != nil { + return err + } - ctx := cmd.Context() + _ = status // TODO - print status + buf := &bytes.Buffer{} + encoder := json.NewEncoder(buf) + err = encoder.Encode(status) + if err != nil { + return err + } + return cctx.PrintBytes(buf.Bytes()) + }) +} - for _, lid := range leases { - stream := result{lid: lid} - prov, _ := sdk.AccAddressFromBech32(lid.Provider) - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) - if err == nil { - stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, svcs, follow, tailLines) - } else { - stream.error = err - } - streams = append(streams, stream) + +func doLeaseLogsV2(cmd *cobra.Command, args []string) error { + serviceName := args[0] + replicaIndexStr := args[1] + + replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) + if err != nil { + // TODO - better error here + return err } - var wgStreams sync.WaitGroup + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } - type logEntry struct { - gwrest.ServiceLogMessage `json:",inline"` - Lid mtypes.LeaseID `json:"lease_id"` + dseq, err := dseqFromFlags(cmd.Flags()) + if err != nil { + return err } - outch := make(chan logEntry) + deploymentID := dtypes.DeploymentID{ + Owner: cctx.GetFromAddress().String(), + DSeq: dseq, + } - printFn := func(evt logEntry) { - fmt.Printf("[%s][%s] %s\n", evt.Lid, evt.Name, evt.Message) + provider, err := cmd.Flags().GetString(FlagProvider) + if err != nil { + return err + } + gseq, err := cmd.Flags().GetUint32(FlagGSeq) + if err != nil { + return err + } + oseq, err := cmd.Flags().GetUint32(FlagOSeq) + if err != nil { + return err } - if outputFormat == "json" { - printFn = func(evt logEntry) { - _ = cmdcommon.PrintJSON(cctx, evt) - } + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if err != nil { + return markRPCServerError(err) } - go func() { - for evt := range outch { - printFn(evt) - } - }() + limit, err := cmd.Flags().GetUint(FlagLimit) + if err != nil { + return err + } + startTimeStr, err := cmd.Flags().GetString(FlagStartTime) + if err != nil { + return err + } + endTimeStr,err := cmd.Flags().GetString(FlagEndTime) + if err != nil { + return err + } + forward, err := cmd.Flags().GetBool(FlagForward) + if err != nil { + return err + } + runIndex, err := cmd.Flags().GetInt(FlagRunIndex) - for _, stream := range streams { - if stream.error != nil { - continue + startTime := time.Time{} + if len(startTimeStr) != 0 { + startTime, err = time.Parse(time.RFC3339, startTimeStr) + if err != nil { + return err + } + } + endTime := time.Time{} + if len(endTimeStr) != 0 { + endTime, err = time.Parse(time.RFC3339, endTimeStr) + if err != nil { + return err } + } - wgStreams.Add(1) - go func(stream result) { - defer wgStreams.Done() + return withLeasesForDeployment(cmd.Context(), + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + // TODO - use client directory object here? + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } - for res := range stream.stream.Stream { - outch <- logEntry{ - ServiceLogMessage: res, - Lid: stream.lid, + result, err := gclient.LeaseLogsV2(cmd.Context(), + leaseID, + serviceName, + uint(replicaIndex), + runIndex, + startTime, + endTime, + forward, + limit) + + if err != nil { + clientError, ok := err.(gwrest.ClientResponseError) + if ok{ + _ = cctx.PrintString(clientError.ClientError()) } + return err } - }(stream) - } - - wgStreams.Wait() - close(outch) - return nil + // TODO - check output flag for format + buf := &bytes.Buffer{} + encoder := json.NewEncoder(buf) + err = encoder.Encode(result) + if err != nil { + return err + } - */ - return nil + return cctx.PrintBytes(buf.Bytes()) + }) } func doLeaseLogs(cmd *cobra.Command) error { - // TODO - clean up most of this into some sort of simple "withEachLease" function that calls a // function for each lease found cctx, err := sdkclient.GetClientTxContext(cmd) diff --git a/provider/cmd/root.go b/provider/cmd/root.go index 7c96443442..58b03b24c8 100644 --- a/provider/cmd/root.go +++ b/provider/cmd/root.go @@ -37,6 +37,7 @@ func RootCmd() *cobra.Command { cmd.AddCommand(clusterNSCmd()) cmd.AddCommand(migrate()) cmd.AddCommand(RunResourceServerCmd()) + cmd.AddCommand(leaseLogStatusCmd()) return cmd } diff --git a/provider/gateway/rest/client.go b/provider/gateway/rest/client.go index 66b2cb1060..351bff7cda 100644 --- a/provider/gateway/rest/client.go +++ b/provider/gateway/rest/client.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "io" "net/http" "net/url" @@ -55,6 +56,18 @@ type Client interface { tty bool, tsq <-chan remotecommand.TerminalSize) error MigrateHostnames(ctx context.Context, hostnames []string, dseq uint64, gseq uint32) error + + // TODO - this should be part of an optional client interface only avaiable if the provider supports it + LeaseLogsStatus(ctx context.Context, leaseID mtypes.LeaseID) ([]loki.LogStatus, error) + LeaseLogsV2(ctx context.Context, + leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + endTime time.Time, + forward bool, + limit uint) (loki.LogResult, error) } type JwtClient interface { @@ -657,14 +670,17 @@ func (c *client) getStatus(ctx context.Context, uri string, obj interface{}) err return dec.Decode(obj) } -func createClientResponseErrorIfNotOK(resp *http.Response, responseBuf *bytes.Buffer) error { +func createClientResponseErrorIfNotOK(resp *http.Response, responseBuf io.Reader) error { if resp.StatusCode == http.StatusOK { return nil } + errString := &bytes.Buffer{} + _, _ = io.Copy(errString, responseBuf) + return ClientResponseError{ Status: resp.StatusCode, - Message: responseBuf.String(), + Message: errString.String(), } } @@ -775,6 +791,109 @@ func (c *client) LeaseLogs(ctx context.Context, return logs, nil } +func (c *client) LeaseLogsStatus(ctx context.Context, leaseID mtypes.LeaseID) ([]loki.LogStatus, error) { + path := c.host.String() + "/" + leaseLogsStatusPath(leaseID) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentTypeJSON) + + resp, err := c.hclient.Do(req) + if err != nil { + return nil, err + } + responseBuf := &bytes.Buffer{} + _, err = io.Copy(responseBuf, resp.Body) + defer func() { + _ = resp.Body.Close() + }() + if err != nil { + return nil, err + } + + err = createClientResponseErrorIfNotOK(resp, bytes.NewReader(responseBuf.Bytes())) + if err != nil { + return nil, err + } + + var obj []loki.LogStatus + decoder := json.NewDecoder(responseBuf) + err = decoder.Decode(&obj) + if err != nil { + return nil, err + } + + return obj, nil +} + +func (c *client) LeaseLogsV2(ctx context.Context, leaseID mtypes.LeaseID, +serviceName string, +replicaIndex uint, +runIndex int, +startTime time.Time, +endTime time.Time, +forward bool, +limit uint) (loki.LogResult, error) { + + query := url.Values{} + if forward { + query.Set(forwardKey, "1") + } else { + query.Set(forwardKey, "0") + } + + if limit > 0 { + query.Set(limitKey, fmt.Sprintf("%d", limit)) + } + + if runIndex > 0 { + query.Set(runIndexKey, fmt.Sprintf("%d", runIndex)) + } + + if !startTime.Equal(time.Time{}) { + query.Set(startTimeKey, fmt.Sprintf("%d", startTime.Unix())) + } + + if !endTime.Equal(time.Time{}) { + query.Set(endTimeKey, fmt.Sprintf("%d", endTime.Unix())) + } + + path := c.host.String() + "/" + leaseLogsQuery(leaseID, serviceName, replicaIndex) + "?" + query.Encode() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + return loki.LogResult{}, err + } + req.Header.Set("Content-Type", contentTypeJSON) + + resp, err := c.hclient.Do(req) + if err != nil { + return loki.LogResult{}, err + } + responseBuf := &bytes.Buffer{} + _, err = io.Copy(responseBuf, resp.Body) + defer func() { + _ = resp.Body.Close() + }() + if err != nil { + return loki.LogResult{}, err + } + + err = createClientResponseErrorIfNotOK(resp, bytes.NewReader(responseBuf.Bytes())) + if err != nil { + return loki.LogResult{}, err + } + + var obj loki.LogResult + decoder := json.NewDecoder(responseBuf) + err = decoder.Decode(&obj) + if err != nil { + return loki.LogResult{}, err + } + + return obj, nil +} + // parseCloseMessage extract close reason from websocket close message // "websocket: [error code]: [client reason]" func parseCloseMessage(msg string) string { diff --git a/provider/gateway/rest/path.go b/provider/gateway/rest/path.go index 61a841abca..b7819b0cfe 100644 --- a/provider/gateway/rest/path.go +++ b/provider/gateway/rest/path.go @@ -52,3 +52,11 @@ func serviceStatusPath(id mtypes.LeaseID, service string) string { func serviceLogsPath(id mtypes.LeaseID) string { return fmt.Sprintf("%s/logs", leasePath(id)) } + +func leaseLogsStatusPath(leaseID mtypes.LeaseID) string { + return fmt.Sprintf("%s/logs/status", leasePath(leaseID)) +} + +func leaseLogsQuery(leaseID mtypes.LeaseID, serviceName string, replicaIndex uint) string { + return fmt.Sprintf("%s/logs/query/%s/%d", leasePath(leaseID), serviceName, replicaIndex) +} \ No newline at end of file diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 123fe33a08..3c794eeb8b 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -71,6 +71,15 @@ const ( websocketInternalServerErrorCode = 4000 websocketLeaseNotFound = 4001 manifestSubmitTimeout = 120 * time.Second + + serviceNameKey = "serviceName" + replicaIndexKey = "replicaIndex" + limitKey = "limit" + runIndexKey = "runIndex" + startTimeKey = "startTime" + endTimeKey = "endTime" + forwardKey = "forward" + followKey = "follow" ) type wsStreamConfig struct { @@ -226,6 +235,7 @@ func logQueryFollowHandler(logger log.Logger, return nil // Ignore for now } + // TODO - put a hard time limit on this to prevent forever running goroutines? err = lokiClient.TailLogsByService(req.Context(), leaseID, serviceName, @@ -268,13 +278,13 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, return } - serviceName := mux.Vars(req)["serviceName"] // TODO validate this exists - replicaIndexStr := mux.Vars(req)["replicaIndex"] + serviceName := mux.Vars(req)[serviceNameKey] // TODO validate this exists + replicaIndexStr := mux.Vars(req)[replicaIndexKey] replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) if err != nil { logger.Error("could not parse path component for replica index", "err", err, "replicaIndex", replicaIndexStr) - rw.WriteHeader(http.StatusNotFound) + http.Error(rw, fmt.Sprintf("could not parse replica index %q - %v", replicaIndexStr, err), http.StatusNotFound) return } @@ -283,12 +293,12 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, query := req.URL.Query() limit := uint(1000) - limitStr := query.Get("limit") + limitStr := query.Get(limitKey) if len(limitStr) != 0 { limit64, err := strconv.ParseUint(limitStr, 10, 31) if err != nil { - logger.Error("could not parse limit", "limit", limitStr, "err", err) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("could not parse limit", limitKey, limitStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse limit %q - %v", limitStr, err), http.StatusBadRequest) return } limit = uint(limit64) @@ -296,12 +306,12 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, // Parse runIndex from query, if present runIndex := -1 - runIndexStr := query.Get("runIndex") + runIndexStr := query.Get(runIndexKey) if len(runIndexStr) != 0 { runIndex64, err := strconv.ParseUint(runIndexStr, 10, 31) if err != nil { - logger.Error("could not parse run index", "runIndex", runIndexStr, "err", err) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("could not parse run index", runIndexKey, runIndexStr, "err", err) + http.Error(rw, fmt.Sprintf("could not run index %q - %v", runIndexStr, err), http.StatusBadRequest) return } runIndex = int(runIndex64) @@ -310,12 +320,12 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, // Parse start time from query, if present startTime := time.Now().Add(-1 * time.Hour) // start time - startTimeStr := query.Get("startTime") + startTimeStr := query.Get(startTimeKey) if len(startTimeStr) != 0 { startTimeInt, err := strconv.ParseInt(startTimeStr, 10, 64) if err != nil { - logger.Error("could not parse startTime", "startTime", startTimeStr, "err", err) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("could not parse startTime", startTimeKey, startTimeStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse start time %q - %v", startTimeStr, err), http.StatusBadRequest) return } @@ -324,12 +334,12 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, // Parse end time from query, if present endTime := time.Now() - endTimeStr := query.Get("endTime") - if len(startTimeStr) != 0 { + endTimeStr := query.Get(endTimeKey) + if len(endTimeStr) != 0 { endTimeInt, err := strconv.ParseInt(endTimeStr, 10, 64) if err != nil { - logger.Error("could not parse endTime", "endTime", endTimeStr, "err", err) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("could not parse endTime", endTimeKey, endTimeStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse end time %q - %v", endTimeStr, err), http.StatusBadRequest) return } @@ -339,14 +349,14 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, const timeLimit = time.Hour * 72 // TODO - configurable by the provider duration := endTime.Sub(startTime) if duration > timeLimit || duration < 0 { - logger.Error("duration of time range queried is not allowed", "startTime", startTime, "endTime", endTime) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("duration of time range queried is not allowed", startTimeKey, startTime, endTimeKey, endTime) + http.Error(rw, fmt.Sprintf("time range of %v too large, maximum is %v", duration, timeLimit), http.StatusBadRequest) return } forward := true // Parse direction from query, if present - forwardStr := query.Get("forward") + forwardStr := query.Get(forwardKey) if len(forwardStr) != 0 { switch(forwardStr){ case "0": @@ -354,14 +364,14 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, case "1": forward = true default: - logger.Error("unknown value for forward", "forward", forwardStr) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("unknown value for forward", forwardKey, forwardStr) + http.Error(rw, fmt.Sprintf("unacceptable value for forward %q", forwardStr), http.StatusBadRequest) return } } follow := false - followStr := query.Get("follow") + followStr := query.Get(followKey) if len(followStr) != 0 { switch(followStr) { case "0": @@ -370,28 +380,29 @@ func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, follow = true default: - logger.Error("unknown value for follow", "follow", forwardStr) - rw.WriteHeader(http.StatusBadRequest) + logger.Error("unknown value for follow", followKey, forwardStr) + http.Error(rw, fmt.Sprintf("unacceptable value for follow %q", followStr), http.StatusBadRequest) return } } if follow { + const msg = "cannot specify end time, reverse order, or limit if logs are tailed" if len(endTimeStr) != 0 { logger.Error("client requested log follow with endtime") - rw.WriteHeader(http.StatusBadRequest) + http.Error(rw, msg, http.StatusBadRequest) return } if !forward { logger.Error("client requested log follow with reverse direction") - rw.WriteHeader(http.StatusBadRequest) + http.Error(rw, msg, http.StatusBadRequest) return } if len(limitStr) != 0 { logger.Error("client request log follow with limit") - rw.WriteHeader(http.StatusBadRequest) + http.Error(rw, msg, http.StatusBadRequest) return } // Upgrade to a websocket From 9d8f23482e76620b2027848013b7e2787678f481 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Fri, 11 Mar 2022 10:25:33 -0600 Subject: [PATCH 12/13] wip --- provider/gateway/rest/client.go | 75 +++++++++++++++++++++++++++++++++ provider/gateway/rest/router.go | 3 +- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/provider/gateway/rest/client.go b/provider/gateway/rest/client.go index 351bff7cda..41d4bb9dbd 100644 --- a/provider/gateway/rest/client.go +++ b/provider/gateway/rest/client.go @@ -68,6 +68,13 @@ type Client interface { endTime time.Time, forward bool, limit uint) (loki.LogResult, error) + LeaseLogsV2Follow(ctx context.Context, + leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + outputFn func(at time.Time, line string)error) (error) } type JwtClient interface { @@ -894,6 +901,74 @@ limit uint) (loki.LogResult, error) { return obj, nil } +var errBadLogLine = errors.New("bad log line") + +func (c *client) LeaseLogsV2Follow(ctx context.Context, leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + outputFn func(at time.Time, line string) error) (error) { + query := url.Values{} + query.Set("follow", "1") + + if runIndex > 0 { + query.Set(runIndexKey, fmt.Sprintf("%d", runIndex)) + } + + if !startTime.Equal(time.Time{}) { + query.Set(startTimeKey, fmt.Sprintf("%d", startTime.Unix())) + } + + secure := c.host.Scheme == "https" + hostCopy := c.host + if secure { + hostCopy.Scheme = "wss" + } else { + hostCopy.Scheme = "ws" + } + path := c.host.String() + "/" + leaseLogsQuery(leaseID, serviceName, replicaIndex) + "?" + query.Encode() + ws, resp, err := c.wsclient.DialContext(ctx, path, nil ) + if err != nil { + if resp == nil { + return err + } + if clientErr := createClientResponseErrorIfNotOK(resp, resp.Body); clientErr != nil { + return clientErr + } + return err + } + + for { + msg := []interface{}{nil, nil} + err := ws.ReadJSON(&msg) + if err != nil { + return err + } + + atNum, ok := msg[0].(*json.Number) + if !ok { + return fmt.Errorf("%w: expected number at index 0 of log line, got %T", errBadLogLine, msg[0]) + } + atInt, err := atNum.Int64() + if err != nil { + return fmt.Errorf("%w: expected number that is int64 at index 0 of log line, got %v", errBadLogLine, atNum.String()) + } + + logLine ,ok := msg[1].(string) + if !ok { + return fmt.Errorf("%w: expected string at index 1 of log line got %T", errBadLogLine, msg[1]) + } + + at := time.Unix(0, atInt) + + err = outputFn(at, logLine) + if err != nil { + return err + } + } +} + // parseCloseMessage extract close reason from websocket close message // "websocket: [error code]: [client reason]" func parseCloseMessage(msg string) string { diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 3c794eeb8b..b7bc414f7f 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -224,11 +224,12 @@ func logQueryFollowHandler(logger log.Logger, // TODO - figoure out how to flush this at a regular cadence to avoid buffering ? ws.EnableWriteCompression(true) sendLogLine := func(at time.Time, line string) error { + msg := []interface{}{at.UnixNano(), line} err := ws.SetWriteDeadline(time.Now().Add(time.Second * 10)) // TODO - configurable ??? if err != nil { return err } - return ws.WriteMessage(websocket.TextMessage, []byte(line)) + return ws.WriteJSON(msg) } onDropped := func() error { From c21e52aca72fdcfa4ccaae4e2b4a2fe34f6a5da2 Mon Sep 17 00:00:00 2001 From: Eric Urban Date: Fri, 11 Mar 2022 19:51:33 -0600 Subject: [PATCH 13/13] poc for loki streaming of logs --- provider/cmd/leaseLogs.go | 58 +++++++++++++++++++++++++++++++++ provider/gateway/rest/client.go | 10 ++++-- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/provider/cmd/leaseLogs.go b/provider/cmd/leaseLogs.go index 764175abe0..af70ed7f10 100644 --- a/provider/cmd/leaseLogs.go +++ b/provider/cmd/leaseLogs.go @@ -185,6 +185,43 @@ func doLeaseLogsStatus(cmd *cobra.Command) error { }) } +func streamLeaseLogs(ctx context.Context, + cert tls.Certificate, +cctx sdkclient.Context, +deploymentID dtypes.DeploymentID, +provider string, +gseq uint32, +oseq uint32, +serviceName string, +startTime time.Time, +replicaIndex int, +runIndex int) error { + /** TODO - since service name is specified does this need to actually search for all the providers? + it should only be running on 1 provider with our architecture + */ + return withLeasesForDeployment(ctx, + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } + + return gclient.LeaseLogsV2Follow(ctx, + leaseID, + serviceName, + uint(replicaIndex), + runIndex, + startTime, func(at time.Time, line string) error { + return cctx.PrintString(fmt.Sprintf("%v %s\n", at, line)) + }) + }) +} func doLeaseLogsV2(cmd *cobra.Command, args []string) error { @@ -263,6 +300,27 @@ func doLeaseLogsV2(cmd *cobra.Command, args []string) error { } } + follow, err := cmd.Flags().GetBool(flagFollow) + if err != nil { + return err + } + if follow { + return streamLeaseLogs(cmd.Context(), + cert, + cctx, + deploymentID, + provider, + gseq, + oseq, + serviceName, + startTime, + int(replicaIndex), + runIndex) + } + + /** TODO - since service name is specified does this need to actually search for all the providers? + it should only be running on 1 provider with our architecture + */ return withLeasesForDeployment(cmd.Context(), cctx, deploymentID, diff --git a/provider/gateway/rest/client.go b/provider/gateway/rest/client.go index 41d4bb9dbd..6ff20c647b 100644 --- a/provider/gateway/rest/client.go +++ b/provider/gateway/rest/client.go @@ -941,12 +941,18 @@ func (c *client) LeaseLogsV2Follow(ctx context.Context, leaseID mtypes.LeaseID, for { msg := []interface{}{nil, nil} - err := ws.ReadJSON(&msg) + _, r, err := ws.NextReader() + if err != nil { + return err + } + dec := json.NewDecoder(r) + dec.UseNumber() + err = dec.Decode(&msg) if err != nil { return err } - atNum, ok := msg[0].(*json.Number) + atNum, ok := msg[0].(json.Number) if !ok { return fmt.Errorf("%w: expected number at index 0 of log line, got %T", errBadLogLine, msg[0]) }