diff --git a/_run/common-commands.mk b/_run/common-commands.mk index 0e27631b7f..11ff4c7273 100644 --- a/_run/common-commands.mk +++ b/_run/common-commands.mk @@ -15,8 +15,9 @@ PRICE ?= 10uakt CERT_HOSTNAME ?= localhost LEASE_SERVICES ?= web -JWT_AUTH_HOSTNAME ?= localhost -JWT_AUTH_HOST ?= $(JWT_AUTH_HOSTNAME):8444 +JWT_AUTH_HOSTNAME ?= localhost +JWT_AUTH_HOST ?= $(JWT_AUTH_HOSTNAME):8444 +RESOURCE_SERVER_HOST ?= localhost:8445 .PHONY: multisig-send multisig-send: @@ -70,6 +71,13 @@ auth-server: --from "$(PROVIDER_KEY_NAME)" \ --jwt-auth-listen-address "$(JWT_AUTH_HOST)" \ +.PHONY: run-resource-server +run-resource-server: + $(AKASH) provider run-resource-server \ + --from "$(PROVIDER_KEY_NAME)" \ + --resource-server-listen-address "$(RESOURCE_SERVER_HOST)" \ + --loki-gateway-listen-address localhost:3100 \ + .PHONY: send-manifest send-manifest: $(AKASH) provider send-manifest "$(SDL_PATH)" \ diff --git a/_run/common-kind.mk b/_run/common-kind.mk index ca2d5e9df5..dbdcf2d798 100644 --- a/_run/common-kind.mk +++ b/_run/common-kind.mk @@ -76,6 +76,7 @@ kind-cluster-create: $(KIND) --config "$(KIND_CONFIG)" \ --name "$(KIND_NAME)" \ --image "$(KIND_IMG)" + chmod 600 $$HOME/.kube/config kubectl label nodes $(KIND_NAME)-control-plane akash.network/role=ingress kubectl apply -f "$(INGRESS_CONFIG_PATH)" kubectl apply -f "$(INGRESS_CLASS_CONFIG_PATH)" @@ -83,20 +84,38 @@ kind-cluster-create: $(KIND) kubectl apply -f "$(METALLB_IP_CONFIG_PATH)" kubectl apply -f "$(METALLB_SERVICE_PATH)" "$(AKASH_ROOT)/script/setup-kind.sh" + helm repo add grafana https://grafana.github.io/helm-charts + helm repo update + helm upgrade --install loki grafana/loki \ + --version 2.9.1 \ + --create-namespace \ + --namespace loki-stack \ + --set config.table_manager.retention_period=48h,config.table_manager.retention_deletes_enabled=true,persistence.enabled=true,persistence.size=10Gi,persistence.storageClassName=standard,config.auth_enabled=true + helm upgrade --install promtail grafana/promtail \ + --version 3.11.0 \ + --namespace loki-stack \ + -f ../promtail-values.yaml + -# Create a kubernetes cluster with loki & grafana integrated for logging. +# Create a kubernetes cluster with multi-tenant loki, promtail and grafana integrated for logging. # See: https://www.scaleway.com/en/docs/tutorials/manage-k8s-logging-loki/ for more info. .PHONY: kind-cluster-loki-create kind-cluster-loki-create: kind-cluster-create helm repo add grafana https://grafana.github.io/helm-charts helm repo update - helm install loki-stack grafana/loki-stack \ + helm upgrade --install loki grafana/loki \ + --version 2.9.1 \ --create-namespace \ --namespace loki-stack \ - --set promtail.enabled=true,loki.persistence.enabled=true,loki.persistence.size=10Gi - helm install loki-grafana grafana/grafana \ - --set persistence.enabled=true,persistence.type=pvc,persistence.size=10Gi \ - --namespace=loki-stack + --set persistence.enabled=true,persistence.size=10Gi,config.auth_enabled=true + helm upgrade --install promtail grafana/promtail \ + --version 3.11.0 \ + --namespace loki-stack \ + -f ../promtail-values.yaml + helm upgrade --install grafana grafana/grafana \ + --version 6.21.2 \ + --namespace loki-stack \ + --set persistence.enabled=true,persistence.type=pvc,persistence.size=10Gi .PHONY: kind-cluster-calico-create kind-cluster-calico-create: $(KIND) diff --git a/_run/kube/Makefile b/_run/kube/Makefile index 54cb7a4258..db1a2a920f 100644 --- a/_run/kube/Makefile +++ b/_run/kube/Makefile @@ -19,7 +19,8 @@ provider-run: --cluster-node-port-quantity 100 \ --cluster-public-hostname "$(GATEWAY_HOSTNAME)" \ --bid-price-strategy "randomRange" \ - --deployment-runtime-class "none" + --deployment-runtime-class "none" \ + --hostname-operator-endpoint "localhost:8085" .PHONY: provider-lease-status provider-lease-status: diff --git a/_run/kube/deployment.yaml b/_run/kube/deployment.yaml index e820432430..f4547df189 100644 --- a/_run/kube/deployment.yaml +++ b/_run/kube/deployment.yaml @@ -15,7 +15,6 @@ services: - hello.localhost to: - global: true - ip: "foobario" bew: image: quay.io/ovrclk/demo-app expose: @@ -54,6 +53,3 @@ deployment: westcoast: profile: web count: 1 -endpoints: - foobario: - kind: ip diff --git a/_run/promtail-values.yaml b/_run/promtail-values.yaml new file mode 100644 index 0000000000..2e3ce716c3 --- /dev/null +++ b/_run/promtail-values.yaml @@ -0,0 +1,7 @@ +config: + lokiAddress: http://loki.loki-stack:3100/loki/api/v1/push + snippets: + pipelineStages: + - cri: {} + - tenant: + source: namespace \ No newline at end of file diff --git a/provider/cluster/kube/loki/loki.go b/provider/cluster/kube/loki/loki.go new file mode 100644 index 0000000000..cae5745c3f --- /dev/null +++ b/provider/cluster/kube/loki/loki.go @@ -0,0 +1,675 @@ +package loki + +/** +Some code in this file is from the Loki project v1.6.1. It uses the Apache License version 2.0 which is the +same as this project. + */ + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + clusterutil "github.com/ovrclk/akash/provider/cluster/util" + mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" + "github.com/tendermint/tendermint/libs/log" + "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "net" + "net/http" + "net/url" + "path" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +const ( + lokiNamespace = "loki-stack" + lokiServiceName = "loki-headless" + lokiPortName = "http-metrics" + + queryPath = "/loki/api/v1/query" + queryRangePath = "/loki/api/v1/query_range" + labelsPath = "/loki/api/v1/labels" + labelValuesPath = "/loki/api/v1/label/%s/values" + seriesPath = "/loki/api/v1/series" + tailPath = "/loki/api/v1/tail" + filenameLabel = "filename" + podLabel = "pod" + lokiOrgIdHeader = "X-Scope-OrgID" +) + +type lokiDirection string + +const ( + FORWARD lokiDirection = "FORWARD" + BACKWARD lokiDirection = "BACKWARD" +) + +var ( + ErrLoki = errors.New("error querying loki") +) + +type Client interface { + FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) + GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool, limit uint) (LogResult, error) + TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime time.Time, + eachLogLine func(at time.Time, line string)error, + onFirstDroppedLog func() error) error + + Stop() +} + +func NewClient(logger log.Logger, kubeConfig *rest.Config, port *net.SRV) (Client, error) { + kubeClient, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + return nil, err + } + sda, err := clusterutil.NewServiceDiscoveryAgent(logger, kubeConfig, lokiPortName, lokiServiceName, lokiNamespace, port) + if err != nil { + return nil, err + } + return &client{ + sda:sda, + lock: &sync.Mutex{}, + kc: kubeClient, + }, nil +} + +type client struct { + sda clusterutil.ServiceDiscoveryAgent + kc kubernetes.Interface + client clusterutil.ServiceClient + websocketClient clusterutil.WebsocketServiceClient + lock sync.Locker +} + +func (c *client) Stop() { + c.sda.Stop() +} + +type lokiLogQueryResult struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +type lokiLogQueryData struct { + ResultType string `json:"resultType"` + Result []lokiLogQueryResult `json:"result"` +} + +type lokiLogQueryResponse struct { + Status string `json:"status"` + Data lokiLogQueryData `json:"data"` +} + +type LogResultLine struct { + RunIndex uint + At time.Time + Line string +} + +type LogResult struct { + ServiceName string + ReplicaIndex uint + Entries []LogResultLine + +} + +func (c *client) discoverFilename(ctx context.Context, leaseID mtypes.LeaseID, startTime, endTime time.Time, podName string) (string,error) { + // Query using the pod name to get the result + lc, err := c.getLokiClient(ctx) + if err != nil { + return "", err + } + + httpQueryString := url.Values{} + httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) + httpQueryString.Set("limit", "1") + lokiQuery := fmt.Sprintf("{%s=%s}", podLabel, safeQuote(podName)) // Note this is not JSON + httpQueryString.Set("query", lokiQuery) + httpQueryString.Set("direction", string(BACKWARD)) + + request, err := lc.CreateRequest(ctx, http.MethodGet, queryRangePath, nil ) + if err != nil { + return "", err + } + request.URL.RawQuery = httpQueryString.Encode() + + request.Header.Add(lokiOrgIdHeader, clusterutil.LeaseIDToNamespace(leaseID)) + + resp, err := lc.DoRequest(request) + if err != nil { + c.clearClient() + return "" , fmt.Errorf("loki filename discovery log query for pod %q failed: %w", podName, err) + } + + if resp.StatusCode != 200 { + buf := &bytes.Buffer{} + _, _ = io.Copy(buf, resp.Body) + msg := strings.Trim(buf.String(), "\n\t\r") + return "", fmt.Errorf("%w: loki filename discovery log query failed for pod %q, got status code %d; %s", ErrLoki, podName, resp.StatusCode, msg) + } + + decoder := json.NewDecoder(resp.Body) + lokiResult := lokiLogQueryResponse{} + err = decoder.Decode(&lokiResult) + if err != nil { + return "",fmt.Errorf("loki log query response for pod %q could not be decoded: %w", podName, err) + } + + if len(lokiResult.Data.Result) == 0 { + return "", fmt.Errorf("%w: loki filename discovery for pod %q returned no results", ErrLoki, podName) + } + + filename, exists:= lokiResult.Data.Result[0].Stream[filenameLabel] + if !exists { + return "", fmt.Errorf("%w: loki filename discovery for pod %q had no label %q", ErrLoki, podName, filenameLabel) + } + + return filename, nil +} + +// DroppedStream represents a dropped stream in tail call +type droppedStream struct { + Timestamp time.Time + Labels map[string]string +} + +//Entry represents a log entry. It includes a log message and the time it occurred at. +type entry struct { + Timestamp time.Time + Line string +} + +//Stream represents a log stream. It includes a set of log entries and their labels. +type stream struct { + Labels map[string]string `json:"stream"` + Entries [][]string `json:"values"` +} + +// tailResponse represents the http json response to a tail query from loki +type tailResponse struct { + Streams []stream `json:"streams,omitempty"` + DroppedStreams []droppedStream `json:"dropped_entries,omitempty"` +} + +func (c *client) TailLogsByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime time.Time, + eachLogLine func(at time.Time, line string)error, + onFirstDroppedLog func() error) error { + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + if err != nil { + return err + } + + datamap := serviceNameAndReplicaIndexToPodName(possiblePods) + podName, exists := datamap[serviceNameAndReplicaIndex{ + replicaIndex: int(replicaIndex), + serviceName: serviceName, + }] + + if !exists { + return fmt.Errorf("%w: no entry for service %q and replica %d", ErrLoki, serviceName, replicaIndex) + } + + httpQueryString := url.Values{} + httpQueryString.Set("from", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable? + httpQueryString.Set("delay_for", "0") + + var filenameLabelFilter string + specifyRunIndex := runIndex >= 0 + if specifyRunIndex { + var err error + filenameLabelFilter, err = c.getFilename(ctx, leaseID, startTime, time.Now(), runIndex, podName) + if err != nil { + return err + } + } + + lokiQueryBuf := &bytes.Buffer{} + // Note this is not JSON + _, _ = fmt.Fprint(lokiQueryBuf, "{") + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%s", podLabel, safeQuote(podName)) + // specify filename label here if runIndex >= 0 + if specifyRunIndex { + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%s", filenameLabel, safeQuote(filenameLabelFilter)) + } + _, _ = fmt.Fprint(lokiQueryBuf, "}") + + httpQueryString.Set("query", lokiQueryBuf.String()) + + wslc, err := c.getLokiWebsocketClient(ctx) + if err != nil { + return err + } + + headers := http.Header{ + lokiOrgIdHeader: []string{clusterutil.LeaseIDToNamespace(leaseID)}, + } + conn, err := wslc.DialWebsocket(ctx, tailPath + "?" + httpQueryString.Encode(), headers) + if err != nil { + c.clearClient() + return err + } + + dropCalled := false + for { + if ctx.Err() != nil { + return ctx.Err() + } + var logs tailResponse + err := conn.ReadJSON(&logs) + if err != nil { + return fmt.Errorf("error parsing JSON from loki log tail: %w", err) + } + + for _, stream := range logs.Streams { + for _, entry := range stream.Entries { + at := time.Time{} // TODO + line := entry[1] + err = eachLogLine(at, line) + if err != nil { + return err + } + } + } + + if !dropCalled && len(logs.DroppedStreams) != 0{ + // caused by the client being too slow + dropCalled = true + err = onFirstDroppedLog() + if err != nil { + return err + } + } + + } + +} + +func (c *client) getFilename(ctx context.Context, leaseID mtypes.LeaseID, startTime, endTime time.Time, runIndex int, podName string) (string, error){ + filename, err := c.discoverFilename(ctx, leaseID, startTime, endTime, podName) + if err != nil { + return "", err + } + + head, tail := path.Split(filename) + parts := strings.SplitN(tail, ".", 2) + if len(parts) != 2 { + return "", fmt.Errorf("%w: while constructing filename filter cannot make sense of filepath %q", ErrLoki, filename) + } + + return fmt.Sprintf("%s/%d.%s", head, runIndex, parts[1]), nil +} + + +func safeQuote(input string) string { + input = strings.ReplaceAll(input, `"`,"") + return fmt.Sprintf("%q", input) +} + +func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool, limit uint) (LogResult, error) { + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + // get a list of possible logs for this service + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + if err != nil { + return LogResult{}, err + } + + datamap := serviceNameAndReplicaIndexToPodName(possiblePods) + + podName, exists := datamap[serviceNameAndReplicaIndex{ + replicaIndex: int(replicaIndex), + serviceName: serviceName, + }] + + if !exists { + return LogResult{},fmt.Errorf("%w: no entry for service %q and replica %d", ErrLoki, serviceName, replicaIndex) + } + + // Query using the pod name to get the result + lc, err := c.getLokiClient(ctx) + if err != nil { + return LogResult{},err + } + + // if runIndex >= 0 then launch a query to get a single log line back from the backend + // then use that log line to determine the correct label to query on. Will need to parse it out and then + // modify it to be the expected value + specifyRunIndex := runIndex >= 0 + filenameLabelFilter := "" + if specifyRunIndex { + filenameLabelFilter, err = c.getFilename(ctx, leaseID, startTime, endTime, runIndex, podName) + if err != nil { + return LogResult{}, err + } + } + + httpQueryString := url.Values{} + httpQueryString.Set("start", fmt.Sprintf("%d", startTime.UnixNano())) + httpQueryString.Set("end", fmt.Sprintf("%d", endTime.UnixNano())) + httpQueryString.Set("limit", fmt.Sprintf("%d",limit)) + + lokiQueryBuf := &bytes.Buffer{} + // Note this is not JSON + _, _ = fmt.Fprint(lokiQueryBuf, "{") + _, _ = fmt.Fprintf(lokiQueryBuf, "%s=%s", podLabel, safeQuote(podName)) + // specify filename label here if runIndex >= 0 + if specifyRunIndex { + _, _ = fmt.Fprintf(lokiQueryBuf,",%s=%s", filenameLabel, safeQuote(filenameLabelFilter)) + } + _, _ = fmt.Fprint(lokiQueryBuf, "}") + + httpQueryString.Set("query", lokiQueryBuf.String()) + + direction := FORWARD + if !forward { + direction = BACKWARD + } + httpQueryString.Set("direction", string(direction)) + + request, err := lc.CreateRequest(ctx, http.MethodGet, queryRangePath, nil ) + if err != nil { + return LogResult{},err + } + request.URL.RawQuery = httpQueryString.Encode() + + request.Header.Add(lokiOrgIdHeader, lidNS) + + resp, err := lc.DoRequest(request) + if err != nil { + c.clearClient() + return LogResult{},fmt.Errorf("loki log request failed: %w", err) + } + + if resp.StatusCode != 200 { + buf := &bytes.Buffer{} + _, _ = io.Copy(buf, resp.Body) + msg := strings.Trim(buf.String(), "\n\t\r") + return LogResult{},fmt.Errorf("%w: fetching logs from loki failed, got status code %d; %s", ErrLoki, resp.StatusCode, msg) + } + + // Parse the response & grab the values we care about + decoder := json.NewDecoder(resp.Body) + lokiResult := lokiLogQueryResponse{} + err = decoder.Decode(&lokiResult) + if err != nil { + return LogResult{},fmt.Errorf("loki log query response could not be decoded: %w", err) + } + + result := LogResult{ + ServiceName: serviceName, + ReplicaIndex: replicaIndex, + Entries: nil, + } + for _, resultSet := range lokiResult.Data.Result { + filepath, exists := resultSet.Stream[filenameLabel] + if !exists { + return LogResult{}, fmt.Errorf("%w: expected loki log result set to have label %q but it does not", ErrLoki, filenameLabel) + } + + _, filename := path.Split(filepath) + filenameParts := strings.SplitN(filename, ".", 2) + + runIndex, err := strconv.ParseUint(filenameParts[0], 0, 31) + if err != nil { + return LogResult{}, fmt.Errorf("expected to parse filename %q as integer for kubernetes run index: %w", filename, err) + + } + for _, logEntry := range resultSet.Values { + if len(logEntry) != 2 { + return LogResult{},fmt.Errorf("%w: expected log entry to have 2 values, not %d", ErrLoki, len(logEntry)) + } + timeStampStr := logEntry[0] + + timestamp, err := strconv.ParseInt(timeStampStr,0, 64) + if err != nil { + return LogResult{},fmt.Errorf("could not parse log entry timestamp %q: %w", timeStampStr, err) + } + + at := time.Unix(0, timestamp) + logLine := logEntry[1] + + result.Entries = append(result.Entries, LogResultLine{ + RunIndex: uint(runIndex), + At: at, + Line: logLine, + }) + + } + } + + return result, nil +} + +func (c *client) clearClient(){ + c.lock.Lock() + defer c.lock.Unlock() + + c.websocketClient = nil + c.client = nil +} + +func (c *client) getLokiWebsocketClient(ctx context.Context) (clusterutil.WebsocketServiceClient, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.websocketClient == nil { + var err error + c.websocketClient, err = c.sda.GetWebsocketClient(ctx, false, false) + if err != nil { + return nil, err + } + } + return c.websocketClient, nil +} + +func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.client == nil { + var err error + c.client, err = c.sda.GetClient(ctx, false, false) + if err != nil { + return nil, err + } + } + return c.client, nil +} + +type lokiLabelValuesResponse struct { + Status string `json:"status"` + Data []string `json:"data"` +} + +// TODO - should this be an interface ? +type LogStatus struct { + ServiceName string + ReplicaIndex int + Present bool + Restarts uint +} +func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error) { + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + lc, err := c.getLokiClient(ctx) + if err != nil { + return nil, err + } + + // get a list of possible logs for this service + possiblePods, err := c.detectRunsForLease(ctx, leaseID) + + // Query Loki for labels of the log filename. The label has a value like + // /var/log/pods/7ev5rjb3nfl9niulmnpaft53g27rck4fhm6v3n4nb5jvu_web-bb84fdfcf-brx2z_f192c48e-ccb3-441e-9778-58de019259d6/web/0.log + // Where the last part of the filename is the associated restart number. The label value only appears if + // the container actually logged something before it terminates. So, we can check the labels and parse out + // that number to determine what containers logged anything before they died + if err != nil { + return nil, err + } + + req, err := lc.CreateRequest(ctx, + http.MethodGet, + fmt.Sprintf(labelValuesPath, podLabel), + nil) + + if err != nil { + return nil, err + } + + req.Header.Add(lokiOrgIdHeader, lidNS) + resp, err := lc.DoRequest(req) + if err != nil { + c.clearClient() + return nil, fmt.Errorf("loki label values request failed: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("%w: fetching loki label values failed, got status code %d", ErrLoki, resp.StatusCode) + } + + decoder := json.NewDecoder(resp.Body) + result := lokiLabelValuesResponse{} + err = decoder.Decode(&result) + if err != nil { + return nil, fmt.Errorf("decoding loki label values failed: %w", err) + } + + podNameToPodData := podNamesToServiceNameAndReplicaIndex(possiblePods) + returnValue := make([]LogStatus, 0, len(podNameToPodData)) + positionData := make(map[serviceNameAndReplicaIndex]int) + + // By default nothing is found + for possiblePodName, entry := range possiblePods { + podData := podNameToPodData[possiblePodName] + positionData[serviceNameAndReplicaIndex{ + replicaIndex: podData.replicaIndex, + serviceName: podData.serviceName, + }] = len(returnValue) + returnValue = append(returnValue, LogStatus{ + ServiceName: entry.serviceName, + ReplicaIndex: podData.replicaIndex, + Restarts: entry.restarts, + }) + } + + // Mark each pod that is found with logs + for _, podName := range result.Data { + podData, exists := podNameToPodData[podName] + if ! exists { + continue + } + i := positionData[podData] + returnValue[i].Present = true + } + + + return returnValue, nil +} + +type serviceNameAndReplicaIndex struct{ + replicaIndex int + serviceName string +} + +func serviceNameAndReplicaIndexToPodName(input map[string]runEntry) map[serviceNameAndReplicaIndex]string { + result := make(map[serviceNameAndReplicaIndex]string) + + withPartitionedPods(input, func(podName string, serviceName string, replicaIndex int){ + result[serviceNameAndReplicaIndex{ + replicaIndex: replicaIndex, + serviceName: serviceName, + }] = podName + }) + + return result +} + +func withPartitionedPods(input map[string]runEntry, fn func(podName string, serviceName string, replicaIndex int)) { + + // Assign pods a replica index by their name, so it is consistent. This is done elsewhere as well + partitionedPods := make(map[string][]string) + for podName, entry := range input { + listForService := partitionedPods[entry.serviceName] + + listForService = append(listForService, podName) + partitionedPods[entry.serviceName] = listForService + } + + for serviceName, podNames := range partitionedPods { + sort.Strings(podNames) + + for i, podName := range podNames { + fn(podName, serviceName, i) + } + } +} + +func podNamesToServiceNameAndReplicaIndex(input map[string]runEntry) map[string]serviceNameAndReplicaIndex { + podNameToReplicaIndex := make(map[string]serviceNameAndReplicaIndex) + + withPartitionedPods(input, func(podName string, serviceName string, replicaIndex int){ + podNameToReplicaIndex[podName] = serviceNameAndReplicaIndex{ + replicaIndex: replicaIndex, + serviceName: serviceName, + } + }) + + return podNameToReplicaIndex +} + +type runEntry struct { + restarts uint + serviceName string +} + +func (c *client) detectRunsForLease(ctx context.Context, leaseID mtypes.LeaseID) (map[string]runEntry, error) { + // Containers can run more than once (i.e. a pod restarts containers when configured to do so) + // so this code picks up on that by looking into the labels + lidNS := clusterutil.LeaseIDToNamespace(leaseID) + // TODO - paginate + podsResult, err := c.kc.CoreV1().Pods(lidNS).List(ctx, metav1.ListOptions{ + // TODO - use a constant here + LabelSelector: "akash.network=true", + }) + + if err != nil { + return nil, err + } + + if len(podsResult.Items) == 0 { + return nil, fmt.Errorf("%w: lease %q has no pods in kubernetes", ErrLoki, leaseID.String()) + } + + // Build up a mapping of pod names to the expected number of log files + result := make(map[string]runEntry) + for _, pod := range podsResult.Items { + // We only define pods with a single container at this time + if len(pod.Status.ContainerStatuses) != 1 { + return nil, fmt.Errorf("%w: pod %q has %d containers, expected 1", ErrLoki, pod.Name, len(pod.Status.ContainerStatuses)) + } + status := pod.Status.ContainerStatuses[0] + + // TODO - use a constant + serviceName, ok := pod.ObjectMeta.Labels["akash.network/manifest-service"] + if !ok { + return nil, fmt.Errorf("%w: pod %q has no akash service name label", ErrLoki, pod.Name) + } + + result[pod.Name] = runEntry{ + restarts: uint(status.RestartCount), + serviceName: serviceName, + } + } + + return result, nil +} diff --git a/provider/cluster/util/service_client.go b/provider/cluster/util/service_client.go index 049f9a39b5..b0aaefc086 100644 --- a/provider/cluster/util/service_client.go +++ b/provider/cluster/util/service_client.go @@ -4,12 +4,16 @@ import ( "context" "crypto/tls" "fmt" + "github.com/gorilla/websocket" "io" + "io/ioutil" "net" "net/http" + "strings" ) func (hwsc *httpWrapperServiceClient) CreateRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) { + path = strings.TrimLeft(path, "/") serviceURL := fmt.Sprintf("%s/%s", hwsc.url, path) req, err := http.NewRequestWithContext(ctx, method, serviceURL, body) if err != nil { @@ -24,6 +28,7 @@ func (hwsc *httpWrapperServiceClient) DoRequest(req *http.Request) (*http.Respon } func newHTTPWrapperServiceClient(isHTTPS, secure bool, baseURL string) *httpWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") netDialer := &net.Dialer{} // By default, block both things @@ -57,6 +62,7 @@ func newHTTPWrapperServiceClient(isHTTPS, secure bool, baseURL string) *httpWrap } func newHTTPWrapperServiceClientWithTransport(transport http.RoundTripper, baseURL string) *httpWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") return &httpWrapperServiceClient{ url: baseURL, httpClient: &http.Client{ @@ -64,3 +70,38 @@ func newHTTPWrapperServiceClientWithTransport(transport http.RoundTripper, baseU }, } } + + +func newWebsocketWrapperServiceClientFromDialer(dialer websocket.Dialer, baseURL string) *websocketWrapperServiceClient { + baseURL = strings.TrimRight(baseURL, "/") + return &websocketWrapperServiceClient{ + url: baseURL, + dialer: &dialer, + } +} + +func (wwsc *websocketWrapperServiceClient) DialWebsocket(ctx context.Context, path string, requestHeaders http.Header) (*websocket.Conn, error) { + path = strings.TrimLeft(path, "/") + dialUrl := fmt.Sprintf("%s/%s", wwsc.url, path) + + if strings.HasPrefix(dialUrl, "https") { + dialUrl = strings.Replace(dialUrl, "https", "wss", 1) + } else if strings.HasPrefix(dialUrl, "http") { + dialUrl = strings.Replace(dialUrl, "http", "ws", 1) + } + + conn, resp, err := wwsc.dialer.DialContext(ctx, dialUrl, requestHeaders) + if err != nil { + if resp == nil { + return nil, err + } + + buf, _ := ioutil.ReadAll(resp.Body) // nolint + bufStr := strings.ReplaceAll(string(buf), "\n", " ") + return nil, fmt.Errorf("%w: error response from server when dialing websocket %q; status %v; response: %s", err, dialUrl, resp.StatusCode, + bufStr) + } + + return conn, err + +} \ No newline at end of file diff --git a/provider/cluster/util/service_discovery_agent.go b/provider/cluster/util/service_discovery_agent.go index dbe00f67f1..97bb34896b 100644 --- a/provider/cluster/util/service_discovery_agent.go +++ b/provider/cluster/util/service_discovery_agent.go @@ -2,10 +2,12 @@ package util import ( "context" + "crypto/tls" "errors" "fmt" "github.com/boz/go-lifecycle" "github.com/desertbit/timer" + "github.com/gorilla/websocket" "github.com/ovrclk/akash/util/runner" "github.com/tendermint/tendermint/libs/log" corev1 "k8s.io/api/core/v1" @@ -14,6 +16,8 @@ import ( "k8s.io/client-go/rest" "math/rand" "net" + "net/http" + "net/url" "time" ) @@ -29,6 +33,7 @@ func NewServiceDiscoveryAgent(logger log.Logger, kubeConfig *rest.Config, portNa return staticServiceDiscoveryAgent(*endpoint), nil } + // TODO - only assign this if that discovery mode is selected kc, err := kubernetes.NewForConfig(kubeConfig) if err != nil { return nil, err @@ -133,7 +138,7 @@ func (sda *serviceDiscoveryAgent) setResult(factory clientFactory, err error) { } } -func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) { +func (sda *serviceDiscoveryAgent) getClientFactory(ctx context.Context) (clientFactory, error){ errCh := make(chan error, 1) resultCh := make(chan clientFactory, 1) req := serviceDiscoveryRequest{ @@ -151,12 +156,33 @@ func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure select { case result := <-resultCh: - return result(isHTTPS, secure), nil + return result, nil case err := <-errCh: return nil, err case <-ctx.Done(): return nil, ctx.Err() } + +} + +func (sda *serviceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) { + cf, err := sda.getClientFactory(ctx) + + if err != nil { + return nil, err + } + + return cf.MakeServiceClient(isHTTPS, secure), nil +} + +func (sda *serviceDiscoveryAgent) GetWebsocketClient(ctx context.Context, isHTTPS, secure bool) (WebsocketServiceClient, error) { + cf, err := sda.getClientFactory(ctx) + + if err != nil { + return nil, err + } + + return cf.MakeWebsocketServiceClient(isHTTPS, secure), nil } func (sda *serviceDiscoveryAgent) discover() (clientFactory, error) { @@ -172,6 +198,57 @@ func (sda *serviceDiscoveryAgent) discover() (clientFactory, error) { return sda.discoverKube() } +type kubeClientFactory struct { + httpTransport http.RoundTripper + kubeHost string + kubeNamespace string + serviceName string + portName string + tlsConfig *tls.Config + netDialContext func(context.Context, string, string) (net.Conn, error) + proxy func(*http.Request) (*url.URL, error) + handshakeTimeout time.Duration +} + +func (kcf kubeClientFactory) MakeServiceClient(isHTTPS, secure bool) ServiceClient { + serviceName := kcf.serviceName + if isHTTPS { + serviceName = fmt.Sprintf("https:%s", kcf.serviceName) + } + /** + Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ + + The structure is + http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy + */ + proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kcf.kubeHost, kcf.kubeNamespace, serviceName, kcf.portName) + + return newHTTPWrapperServiceClientWithTransport(kcf.httpTransport, proxyURL) +} + +func (kcf kubeClientFactory) MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient { + dialer := websocket.Dialer{ + NetDialContext: kcf.netDialContext, + Proxy: kcf.proxy, + TLSClientConfig: kcf.tlsConfig, + HandshakeTimeout: kcf.handshakeTimeout, + } + + serviceName := kcf.serviceName + if isHTTPS { + serviceName = fmt.Sprintf("https:%s", kcf.serviceName) + } + /** + Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ + + The structure is + http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy + */ + proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kcf.kubeHost, kcf.kubeNamespace, serviceName, kcf.portName) + + return newWebsocketWrapperServiceClientFromDialer(dialer, proxyURL) +} + func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { // This code is retried forever, but don't wait on a result for a very long time. Just poll ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -188,8 +265,8 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { ports := service.Spec.Ports selectedPort := -1 - for i, port := range ports { - if port.Name == sda.portName && corev1.ProtocolTCP == port.Protocol { + for i, kPort := range ports { + if kPort.Name == sda.portName && corev1.ProtocolTCP == kPort.Protocol { selectedPort = i break } @@ -198,7 +275,7 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { if selectedPort == -1 { return nil, fmt.Errorf("%w: service %q exists but has no port %q (TCP)", errServiceDiscovery, sda.serviceName, sda.portName) } - port := ports[selectedPort] + kPort := ports[selectedPort] // Get the host for the kube cluster kubeHost := sda.kubeConfig.Host @@ -209,23 +286,62 @@ func (sda *serviceDiscoveryAgent) discoverKube() (clientFactory, error) { return nil, err } - return func(isHTTPS, _ bool) ServiceClient { - serviceName := service.Name - if isHTTPS { - serviceName = fmt.Sprintf("https:%s", service.Name) - } - /** - Documentation here: https://kubernetes.io/docs/tasks/administer-cluster/access-cluster-services/ - The structure is - http://kubernetes_master_address/api/v1/namespaces/namespace_name/services/[https:]service_name[:port_name]/proxy - */ - proxyURL := fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:%s/proxy", kubeHost, service.Namespace, serviceName, port.Name) + tlsConfig, err := rest.TLSConfigFor(sda.kubeConfig) + if err != nil { + return nil, err + } - return newHTTPWrapperServiceClientWithTransport(httpTransport, proxyURL) + return kubeClientFactory{ + httpTransport: httpTransport, + kubeHost: kubeHost, + kubeNamespace: service.Namespace, + serviceName: service.Name, + portName: kPort.Name, + tlsConfig: tlsConfig, + netDialContext: sda.kubeConfig.Dial, + proxy: sda.kubeConfig.Proxy, + handshakeTimeout: sda.kubeConfig.Timeout, }, nil } +type dnsClientFactory struct { + target string + port uint16 + handshakeTimeout time.Duration + +} + +func (dcf dnsClientFactory) MakeServiceClient(isHTTPS, secure bool) ServiceClient { + proto := "http" + if isHTTPS { + proto = "https" + } + discoveredURL := fmt.Sprintf("%s://%v:%v", proto, dcf.target, dcf.port) + + return newHTTPWrapperServiceClient(isHTTPS, secure, discoveredURL) +} + +func (dcf dnsClientFactory) MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient { + proto := "ws" + if isHTTPS { + proto = "wss" + } + discoveredURL := fmt.Sprintf("%s://%v:%v", proto, dcf.target, dcf.port) + + tlsConfig := &tls.Config{ + InsecureSkipVerify: !secure, + } + + dialer := websocket.Dialer{ + TLSClientConfig: tlsConfig, + HandshakeTimeout: dcf.handshakeTimeout, + } + + return newWebsocketWrapperServiceClientFromDialer(dialer, discoveredURL) +} + + func (sda *serviceDiscoveryAgent) discoverDNS() (clientFactory, error) { // FUTURE - try and find a 3rd party API that allows timeouts to be put on this request _, addrs, err := net.LookupSRV(sda.portName, "TCP", fmt.Sprintf("%s.%s.svc.cluster.local", sda.serviceName, sda.namespace)) @@ -244,13 +360,10 @@ func (sda *serviceDiscoveryAgent) discoverDNS() (clientFactory, error) { // nolint:gosec addrI := rand.Int31n(int32(len(addrs))) choice := result[addrI] - return func(isHTTPS, secure bool) ServiceClient { - proto := "http" - if isHTTPS { - proto = "https" - } - discoveredURL := fmt.Sprintf("%s://%v:%v", proto, choice.Target, choice.Port) - return newHTTPWrapperServiceClient(isHTTPS, secure, discoveredURL) + return dnsClientFactory{ + target: choice.Target, + port: choice.Port, + handshakeTimeout: sda.kubeConfig.Timeout, }, nil } diff --git a/provider/cluster/util/service_discovery_agent_static.go b/provider/cluster/util/service_discovery_agent_static.go index 6f529a8039..7134839717 100644 --- a/provider/cluster/util/service_discovery_agent_static.go +++ b/provider/cluster/util/service_discovery_agent_static.go @@ -19,3 +19,8 @@ func (ssda staticServiceDiscoveryAgent) GetClient(ctx context.Context, isHTTPS, url := fmt.Sprintf("%s://%v:%v", proto, ssda.Target, ssda.Port) return newHTTPWrapperServiceClient(isHTTPS, secure, url), nil } + +func (ssda staticServiceDiscoveryAgent) GetWebsocketClient(ctx context.Context, isHTTP, secure bool) (WebsocketServiceClient, error) { + // TODO + panic("not implemented") +} \ No newline at end of file diff --git a/provider/cluster/util/service_discovery_agent_types.go b/provider/cluster/util/service_discovery_agent_types.go index 138cf3ad60..7a7556baf7 100644 --- a/provider/cluster/util/service_discovery_agent_types.go +++ b/provider/cluster/util/service_discovery_agent_types.go @@ -3,6 +3,7 @@ package util import ( "context" "github.com/boz/go-lifecycle" + "github.com/gorilla/websocket" "github.com/tendermint/tendermint/libs/log" "io" "k8s.io/client-go/kubernetes" @@ -13,6 +14,7 @@ import ( type ServiceDiscoveryAgent interface { Stop() GetClient(ctx context.Context, isHTTPS, secure bool) (ServiceClient, error) + GetWebsocketClient(ctx context.Context, isHTTPS, secure bool) (WebsocketServiceClient, error) DiscoverNow() } @@ -21,6 +23,10 @@ type ServiceClient interface { DoRequest(req *http.Request) (*http.Response, error) } +type WebsocketServiceClient interface { + DialWebsocket(ctx context.Context, path string, requestHeader http.Header) (*websocket.Conn, error) +} + type serviceDiscoveryAgent struct { serviceName string namespace string @@ -43,9 +49,18 @@ type serviceDiscoveryRequest struct { resultCh chan<- clientFactory } -type clientFactory func(isHttps, secure bool) ServiceClient +type clientFactory interface { + MakeServiceClient(isHTTPS, secure bool) ServiceClient + MakeWebsocketServiceClient(isHTTPS, secure bool) WebsocketServiceClient +} type httpWrapperServiceClient struct { httpClient *http.Client url string } + +type websocketWrapperServiceClient struct { + url string + dialer *websocket.Dialer +} + diff --git a/provider/cmd/leaseLogs.go b/provider/cmd/leaseLogs.go index e38e1807df..af70ed7f10 100644 --- a/provider/cmd/leaseLogs.go +++ b/provider/cmd/leaseLogs.go @@ -1,9 +1,14 @@ package cmd import ( + "bytes" + "context" "crypto/tls" + "encoding/json" "fmt" + "strconv" "sync" + "time" sdkclient "github.com/cosmos/cosmos-sdk/client" sdk "github.com/cosmos/cosmos-sdk/types" @@ -18,13 +23,22 @@ import ( cutils "github.com/ovrclk/akash/x/cert/utils" ) +const( + FlagStartTime = "start-time" + FlagEndTime = "end-time" + FlagForward = "forward" + FlagLimit = "limit" + FlagRunIndex = "run-index" +) + func leaseLogsCmd() *cobra.Command { cmd := &cobra.Command{ Use: "lease-logs", Short: "get lease logs", SilenceUsage: true, + Args: cobra.MinimumNArgs(2), RunE: func(cmd *cobra.Command, args []string) error { - return doLeaseLogs(cmd) + return doLeaseLogsV2(cmd, args) }, } @@ -33,11 +47,327 @@ func leaseLogsCmd() *cobra.Command { cmd.Flags().BoolP(flagFollow, "f", false, "Specify if the logs should be streamed. Defaults to false") cmd.Flags().Int64P(flagTail, "t", -1, "The number of lines from the end of the logs to show. Defaults to -1") cmd.Flags().StringP(flagOutput, "o", outputText, "Output format text|json. Defaults to text") + cmd.Flags().Bool(FlagForward, true, "") + cmd.Flags().String(FlagEndTime, "", "") + cmd.Flags().String(FlagStartTime, "", "") + cmd.Flags().Uint(FlagLimit, 100, "") + cmd.Flags().Int(FlagRunIndex, -1, "") + + return cmd +} + +func leaseLogStatusCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "lease-log-status", + Short: "get lease log status", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return doLeaseLogsStatus(cmd) + }, + } + + addServiceFlags(cmd) return cmd } +func withLeasesForDeployment(ctx context.Context, cctx sdkclient.Context, deploymentID dtypes.DeploymentID, provider string,gseq, oseq uint32, fn func(leaseID mtypes.LeaseID) error) error { + filter := mtypes.LeaseFilters{ + Owner: deploymentID.Owner, + DSeq: deploymentID.DSeq, + State: mtypes.Lease_State_name[int32(mtypes.LeaseActive)], + } + + if len(provider) != 0 { + filter.Provider = provider + } + + if gseq > 0 { + filter.GSeq = gseq + } + + if oseq > 0 { + filter.OSeq = oseq + } + + cclient := akashclient.NewQueryClientFromCtx(cctx) + resp, err := cclient.Leases(ctx, &mtypes.QueryLeasesRequest{ + Filters: filter, + }) + if err != nil { + return err + } + + if len(resp.Leases) == 0 { + msg := &bytes.Buffer{} + _, _ = fmt.Fprintf(msg, "dseq=%v", filter.DSeq) + if len(provider) > 0 { + _, _ = fmt.Fprintf(msg, " provider=%v", filter.Provider) + } + if filter.GSeq > 0 { + _, _ = fmt.Fprintf(msg, " gseq=%v", filter.GSeq) + } + if filter.OSeq > 0 { + _, _ = fmt.Fprintf(msg, " oseq=%v", filter.OSeq) + } + return fmt.Errorf("%w: %s",errNoActiveLease, msg.String()) + } + + for _, lease := range resp.Leases { + err = fn(lease.Lease.LeaseID) + if err != nil { + return err + } + } + return nil +} + +func doLeaseLogsStatus(cmd *cobra.Command) error { + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } + + dseq, err := dseqFromFlags(cmd.Flags()) + if err != nil { + return err + } + + deploymentID := dtypes.DeploymentID{ + Owner: cctx.GetFromAddress().String(), + DSeq: dseq, + } + + provider, err := cmd.Flags().GetString(FlagProvider) + if err != nil { + return err + } + gseq, err := cmd.Flags().GetUint32(FlagGSeq) + if err != nil { + return err + } + oseq, err := cmd.Flags().GetUint32(FlagOSeq) + if err != nil { + return err + } + + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if err != nil { + return markRPCServerError(err) + } + + return withLeasesForDeployment(cmd.Context(), + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + // TODO - use client directory object here? + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } + status, err := gclient.LeaseLogsStatus(cmd.Context(), leaseID) + if err != nil { + return err + } + + _ = status // TODO - print status + buf := &bytes.Buffer{} + encoder := json.NewEncoder(buf) + err = encoder.Encode(status) + if err != nil { + return err + } + return cctx.PrintBytes(buf.Bytes()) + }) +} + +func streamLeaseLogs(ctx context.Context, + cert tls.Certificate, +cctx sdkclient.Context, +deploymentID dtypes.DeploymentID, +provider string, +gseq uint32, +oseq uint32, +serviceName string, +startTime time.Time, +replicaIndex int, +runIndex int) error { + /** TODO - since service name is specified does this need to actually search for all the providers? + it should only be running on 1 provider with our architecture + */ + return withLeasesForDeployment(ctx, + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } + + return gclient.LeaseLogsV2Follow(ctx, + leaseID, + serviceName, + uint(replicaIndex), + runIndex, + startTime, func(at time.Time, line string) error { + return cctx.PrintString(fmt.Sprintf("%v %s\n", at, line)) + }) + }) +} + + +func doLeaseLogsV2(cmd *cobra.Command, args []string) error { + serviceName := args[0] + replicaIndexStr := args[1] + + replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) + if err != nil { + // TODO - better error here + return err + } + + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } + + dseq, err := dseqFromFlags(cmd.Flags()) + if err != nil { + return err + } + + deploymentID := dtypes.DeploymentID{ + Owner: cctx.GetFromAddress().String(), + DSeq: dseq, + } + + provider, err := cmd.Flags().GetString(FlagProvider) + if err != nil { + return err + } + gseq, err := cmd.Flags().GetUint32(FlagGSeq) + if err != nil { + return err + } + oseq, err := cmd.Flags().GetUint32(FlagOSeq) + if err != nil { + return err + } + + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, cctx.Keyring) + if err != nil { + return markRPCServerError(err) + } + + limit, err := cmd.Flags().GetUint(FlagLimit) + if err != nil { + return err + } + startTimeStr, err := cmd.Flags().GetString(FlagStartTime) + if err != nil { + return err + } + endTimeStr,err := cmd.Flags().GetString(FlagEndTime) + if err != nil { + return err + } + forward, err := cmd.Flags().GetBool(FlagForward) + if err != nil { + return err + } + runIndex, err := cmd.Flags().GetInt(FlagRunIndex) + + startTime := time.Time{} + if len(startTimeStr) != 0 { + startTime, err = time.Parse(time.RFC3339, startTimeStr) + if err != nil { + return err + } + } + endTime := time.Time{} + if len(endTimeStr) != 0 { + endTime, err = time.Parse(time.RFC3339, endTimeStr) + if err != nil { + return err + } + } + + follow, err := cmd.Flags().GetBool(flagFollow) + if err != nil { + return err + } + if follow { + return streamLeaseLogs(cmd.Context(), + cert, + cctx, + deploymentID, + provider, + gseq, + oseq, + serviceName, + startTime, + int(replicaIndex), + runIndex) + } + + /** TODO - since service name is specified does this need to actually search for all the providers? + it should only be running on 1 provider with our architecture + */ + return withLeasesForDeployment(cmd.Context(), + cctx, + deploymentID, + provider, + gseq, + oseq, + func(leaseID mtypes.LeaseID) error { + // TODO - use client directory object here? + prov, _ := sdk.AccAddressFromBech32(leaseID.GetProvider()) + gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + if err != nil { + return err + } + + result, err := gclient.LeaseLogsV2(cmd.Context(), + leaseID, + serviceName, + uint(replicaIndex), + runIndex, + startTime, + endTime, + forward, + limit) + + if err != nil { + clientError, ok := err.(gwrest.ClientResponseError) + if ok{ + _ = cctx.PrintString(clientError.ClientError()) + } + return err + } + + // TODO - check output flag for format + buf := &bytes.Buffer{} + encoder := json.NewEncoder(buf) + err = encoder.Encode(result) + if err != nil { + return err + } + + return cctx.PrintBytes(buf.Bytes()) + }) +} + func doLeaseLogs(cmd *cobra.Command) error { + // TODO - clean up most of this into some sort of simple "withEachLease" function that calls a + // function for each lease found cctx, err := sdkclient.GetClientTxContext(cmd) if err != nil { return err diff --git a/provider/cmd/root.go b/provider/cmd/root.go index 31e5a911a7..58b03b24c8 100644 --- a/provider/cmd/root.go +++ b/provider/cmd/root.go @@ -36,6 +36,8 @@ func RootCmd() *cobra.Command { cmd.AddCommand(AuthenticateCmd()) cmd.AddCommand(clusterNSCmd()) cmd.AddCommand(migrate()) + cmd.AddCommand(RunResourceServerCmd()) + cmd.AddCommand(leaseLogStatusCmd()) return cmd } diff --git a/provider/cmd/run.go b/provider/cmd/run.go index ed6e7165ba..ecb00d2253 100644 --- a/provider/cmd/run.go +++ b/provider/cmd/run.go @@ -8,6 +8,7 @@ import ( "encoding/pem" "fmt" "github.com/ovrclk/akash/provider/cluster/kube/clientcommon" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "github.com/ovrclk/akash/provider/operator/waiter" "io" @@ -455,6 +456,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge) rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout) enableIPOperator := viper.GetBool(FlagEnableIPOperator) + enableLogging := true // TODO - configurable via flag pricing, err := createBidPricingStrategy(strategy) if err != nil { @@ -656,6 +658,14 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { } } + var lokiClient loki.Client + if enableLogging { + lokiClient, err = loki.NewClient(logger, kubeConfig, nil) // TODO - pass hostname configuration from flags + if err != nil { + return err + } + } + endpoint, err := provider_flags.GetServiceEndpointFlagValue(logger, serviceHostnameOperator) if err != nil { return err @@ -685,6 +695,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { service, cquery, ipOperatorClient, + lokiClient, gwaddr, cctx.FromAddress, []tls.Certificate{cert}, @@ -733,6 +744,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { if ipOperatorClient != nil { ipOperatorClient.Stop() } + if lokiClient != nil { + lokiClient.Stop() + } hostnameOperatorClient.Stop() if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { return err diff --git a/provider/cmd/run_resource_server.go b/provider/cmd/run_resource_server.go new file mode 100644 index 0000000000..8ad34892ad --- /dev/null +++ b/provider/cmd/run_resource_server.go @@ -0,0 +1,120 @@ +package cmd + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/x509" + "encoding/pem" + "errors" + "io" + "net/http" + + "golang.org/x/sync/errgroup" + + sdkclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/ovrclk/akash/cmd/common" + gwrest "github.com/ovrclk/akash/provider/gateway/rest" + cutils "github.com/ovrclk/akash/x/cert/utils" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const ( + FlagResourceServerListenAddress = "resource-server-listen-address" + FlagLokiGatewayListenAddress = "loki-gateway-listen-address" +) + +var ( + ErrEcdsaPubkeyExpected = errors.New("expected a ecdsa public key") +) + +func RunResourceServerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "run-resource-server", + Short: "Run the resource server which authenticates tenants based on JWT before" + + " providing access to resources", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return common.RunForeverWithContext(cmd.Context(), func(ctx context.Context) error { + return doRunResourceServer(ctx, cmd, args) + }) + }, + } + flags.AddTxFlagsToCmd(cmd) + + cmd.Flags().String(FlagResourceServerListenAddress, "0.0.0.0:8445", + "`host:port` for the resource server to listen on") + if err := viper.BindPFlag(FlagResourceServerListenAddress, cmd.Flags().Lookup(FlagResourceServerListenAddress)); err != nil { + return nil + } + + cmd.Flags().String(FlagLokiGatewayListenAddress, "localhost:3100", + "`host:port` where the loki instance is exposed on provider's network") + if err := viper.BindPFlag(FlagLokiGatewayListenAddress, cmd.Flags().Lookup(FlagLokiGatewayListenAddress)); err != nil { + return nil + } + + cmd.Flags().String(FlagAuthPem, "", "") + + return cmd +} + +func doRunResourceServer(ctx context.Context, cmd *cobra.Command, _ []string) error { + gwAddr := viper.GetString(FlagResourceServerListenAddress) + lokiGwAddr := viper.GetString(FlagLokiGatewayListenAddress) + + cctx, err := sdkclient.GetClientTxContext(cmd) + if err != nil { + return err + } + + txFactory := tx.NewFactoryCLI(cctx, cmd.Flags()).WithTxConfig(cctx.TxConfig).WithAccountRetriever(cctx.AccountRetriever) + + var certFromFlag io.Reader + if val := cmd.Flag(FlagAuthPem).Value.String(); val != "" { + certFromFlag = bytes.NewBufferString(val) + } + + cpem, err := cutils.LoadPEMForAccount(cctx, txFactory.Keybase(), cutils.PEMFromReader(certFromFlag)) + if err != nil { + return err + } + + blk, _ := pem.Decode(cpem.Cert) + x509cert, err := x509.ParseCertificate(blk.Bytes) + if err != nil { + return err + } + + pubkey, ok := x509cert.PublicKey.(*ecdsa.PublicKey) + if !ok { + return ErrEcdsaPubkeyExpected + } + + group, ctx := errgroup.WithContext(ctx) + log := openLogger() + + resourceServer, err := gwrest.NewResourceServer(ctx, log, gwAddr, cctx.FromAddress, pubkey, lokiGwAddr) + if err != nil { + return err + } + + group.Go(func() error { + return resourceServer.ListenAndServe() + }) + + group.Go(func() error { + <-ctx.Done() + return resourceServer.Close() + }) + + err = group.Wait() + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil +} diff --git a/provider/gateway/rest/client.go b/provider/gateway/rest/client.go index 66b2cb1060..6ff20c647b 100644 --- a/provider/gateway/rest/client.go +++ b/provider/gateway/rest/client.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "io" "net/http" "net/url" @@ -55,6 +56,25 @@ type Client interface { tty bool, tsq <-chan remotecommand.TerminalSize) error MigrateHostnames(ctx context.Context, hostnames []string, dseq uint64, gseq uint32) error + + // TODO - this should be part of an optional client interface only avaiable if the provider supports it + LeaseLogsStatus(ctx context.Context, leaseID mtypes.LeaseID) ([]loki.LogStatus, error) + LeaseLogsV2(ctx context.Context, + leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + endTime time.Time, + forward bool, + limit uint) (loki.LogResult, error) + LeaseLogsV2Follow(ctx context.Context, + leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + outputFn func(at time.Time, line string)error) (error) } type JwtClient interface { @@ -657,14 +677,17 @@ func (c *client) getStatus(ctx context.Context, uri string, obj interface{}) err return dec.Decode(obj) } -func createClientResponseErrorIfNotOK(resp *http.Response, responseBuf *bytes.Buffer) error { +func createClientResponseErrorIfNotOK(resp *http.Response, responseBuf io.Reader) error { if resp.StatusCode == http.StatusOK { return nil } + errString := &bytes.Buffer{} + _, _ = io.Copy(errString, responseBuf) + return ClientResponseError{ Status: resp.StatusCode, - Message: responseBuf.String(), + Message: errString.String(), } } @@ -775,6 +798,183 @@ func (c *client) LeaseLogs(ctx context.Context, return logs, nil } +func (c *client) LeaseLogsStatus(ctx context.Context, leaseID mtypes.LeaseID) ([]loki.LogStatus, error) { + path := c.host.String() + "/" + leaseLogsStatusPath(leaseID) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentTypeJSON) + + resp, err := c.hclient.Do(req) + if err != nil { + return nil, err + } + responseBuf := &bytes.Buffer{} + _, err = io.Copy(responseBuf, resp.Body) + defer func() { + _ = resp.Body.Close() + }() + if err != nil { + return nil, err + } + + err = createClientResponseErrorIfNotOK(resp, bytes.NewReader(responseBuf.Bytes())) + if err != nil { + return nil, err + } + + var obj []loki.LogStatus + decoder := json.NewDecoder(responseBuf) + err = decoder.Decode(&obj) + if err != nil { + return nil, err + } + + return obj, nil +} + +func (c *client) LeaseLogsV2(ctx context.Context, leaseID mtypes.LeaseID, +serviceName string, +replicaIndex uint, +runIndex int, +startTime time.Time, +endTime time.Time, +forward bool, +limit uint) (loki.LogResult, error) { + + query := url.Values{} + if forward { + query.Set(forwardKey, "1") + } else { + query.Set(forwardKey, "0") + } + + if limit > 0 { + query.Set(limitKey, fmt.Sprintf("%d", limit)) + } + + if runIndex > 0 { + query.Set(runIndexKey, fmt.Sprintf("%d", runIndex)) + } + + if !startTime.Equal(time.Time{}) { + query.Set(startTimeKey, fmt.Sprintf("%d", startTime.Unix())) + } + + if !endTime.Equal(time.Time{}) { + query.Set(endTimeKey, fmt.Sprintf("%d", endTime.Unix())) + } + + path := c.host.String() + "/" + leaseLogsQuery(leaseID, serviceName, replicaIndex) + "?" + query.Encode() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + return loki.LogResult{}, err + } + req.Header.Set("Content-Type", contentTypeJSON) + + resp, err := c.hclient.Do(req) + if err != nil { + return loki.LogResult{}, err + } + responseBuf := &bytes.Buffer{} + _, err = io.Copy(responseBuf, resp.Body) + defer func() { + _ = resp.Body.Close() + }() + if err != nil { + return loki.LogResult{}, err + } + + err = createClientResponseErrorIfNotOK(resp, bytes.NewReader(responseBuf.Bytes())) + if err != nil { + return loki.LogResult{}, err + } + + var obj loki.LogResult + decoder := json.NewDecoder(responseBuf) + err = decoder.Decode(&obj) + if err != nil { + return loki.LogResult{}, err + } + + return obj, nil +} + +var errBadLogLine = errors.New("bad log line") + +func (c *client) LeaseLogsV2Follow(ctx context.Context, leaseID mtypes.LeaseID, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + outputFn func(at time.Time, line string) error) (error) { + query := url.Values{} + query.Set("follow", "1") + + if runIndex > 0 { + query.Set(runIndexKey, fmt.Sprintf("%d", runIndex)) + } + + if !startTime.Equal(time.Time{}) { + query.Set(startTimeKey, fmt.Sprintf("%d", startTime.Unix())) + } + + secure := c.host.Scheme == "https" + hostCopy := c.host + if secure { + hostCopy.Scheme = "wss" + } else { + hostCopy.Scheme = "ws" + } + path := c.host.String() + "/" + leaseLogsQuery(leaseID, serviceName, replicaIndex) + "?" + query.Encode() + ws, resp, err := c.wsclient.DialContext(ctx, path, nil ) + if err != nil { + if resp == nil { + return err + } + if clientErr := createClientResponseErrorIfNotOK(resp, resp.Body); clientErr != nil { + return clientErr + } + return err + } + + for { + msg := []interface{}{nil, nil} + _, r, err := ws.NextReader() + if err != nil { + return err + } + dec := json.NewDecoder(r) + dec.UseNumber() + err = dec.Decode(&msg) + if err != nil { + return err + } + + atNum, ok := msg[0].(json.Number) + if !ok { + return fmt.Errorf("%w: expected number at index 0 of log line, got %T", errBadLogLine, msg[0]) + } + atInt, err := atNum.Int64() + if err != nil { + return fmt.Errorf("%w: expected number that is int64 at index 0 of log line, got %v", errBadLogLine, atNum.String()) + } + + logLine ,ok := msg[1].(string) + if !ok { + return fmt.Errorf("%w: expected string at index 1 of log line got %T", errBadLogLine, msg[1]) + } + + at := time.Unix(0, atInt) + + err = outputFn(at, logLine) + if err != nil { + return err + } + } +} + // parseCloseMessage extract close reason from websocket close message // "websocket: [error code]: [client reason]" func parseCloseMessage(msg string) string { diff --git a/provider/gateway/rest/middleware.go b/provider/gateway/rest/middleware.go index 40df76c635..b9f01a1d74 100644 --- a/provider/gateway/rest/middleware.go +++ b/provider/gateway/rest/middleware.go @@ -1,12 +1,17 @@ package rest import ( + "crypto/ecdsa" "net/http" "strconv" "strings" + "github.com/golang-jwt/jwt/v4" + "github.com/tendermint/tendermint/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gorilla/context" + gcontext "github.com/gorilla/context" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -207,3 +212,40 @@ func requestStreamParams() mux.MiddlewareFunc { }) } } + +func resourceServerAuth(log log.Logger, providerAddr sdk.Address, publicKey *ecdsa.PublicKey) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // verify the provided JWT + token, err := jwt.ParseWithClaims(r.Header.Get("Authorization"), &ClientCustomClaims{}, func(token *jwt.Token) (interface{}, error) { + // return the public key to be used for JWT verification + return publicKey, nil + }) + if err != nil { + log.Error("falied to parse JWT", "error", err) + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + // delete the Authorization header as it is no more needed + r.Header.Del("Authorization") + + // store the owner & provider address in request context to be used in later handlers + customClaims, ok := token.Claims.(*ClientCustomClaims) + if !ok { + log.Error("failed to parse JWT claims") + http.Error(w, "Invalid JWT", http.StatusUnauthorized) + return + } + ownerAddress, err := sdk.AccAddressFromBech32(customClaims.Subject) + if err != nil { + log.Error("failed parsing owner address", "error", err, "address", customClaims.Subject) + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + gcontext.Set(r, ownerContextKey, ownerAddress) + gcontext.Set(r, providerContextKey, providerAddr) + + next.ServeHTTP(w, r) + }) + } +} diff --git a/provider/gateway/rest/path.go b/provider/gateway/rest/path.go index 61a841abca..b7819b0cfe 100644 --- a/provider/gateway/rest/path.go +++ b/provider/gateway/rest/path.go @@ -52,3 +52,11 @@ func serviceStatusPath(id mtypes.LeaseID, service string) string { func serviceLogsPath(id mtypes.LeaseID) string { return fmt.Sprintf("%s/logs", leasePath(id)) } + +func leaseLogsStatusPath(leaseID mtypes.LeaseID) string { + return fmt.Sprintf("%s/logs/status", leasePath(leaseID)) +} + +func leaseLogsQuery(leaseID mtypes.LeaseID, serviceName string, replicaIndex uint) string { + return fmt.Sprintf("%s/logs/query/%s/%d", leasePath(leaseID), serviceName, replicaIndex) +} \ No newline at end of file diff --git a/provider/gateway/rest/router.go b/provider/gateway/rest/router.go index 464316fb56..b7bc414f7f 100644 --- a/provider/gateway/rest/router.go +++ b/provider/gateway/rest/router.go @@ -3,18 +3,24 @@ package rest import ( "bufio" "context" + "crypto/ecdsa" "encoding/json" "fmt" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "github.com/ovrclk/akash/provider/gateway/utils" ipoptypes "github.com/ovrclk/akash/provider/operator/ipoperator/types" "io" "net/http" + "net/http/httputil" + "net/url" "strconv" + "strings" "sync" "time" "github.com/golang-jwt/jwt/v4" + "github.com/ovrclk/akash/provider/cluster/kube/builder" "k8s.io/client-go/tools/remotecommand" @@ -41,6 +47,7 @@ import ( manifestValidation "github.com/ovrclk/akash/validation" dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2" mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" + providermanifest "github.com/ovrclk/akash/provider/manifest" ) type CtxAuthKey string @@ -64,6 +71,15 @@ const ( websocketInternalServerErrorCode = 4000 websocketLeaseNotFound = 4001 manifestSubmitTimeout = 120 * time.Second + + serviceNameKey = "serviceName" + replicaIndexKey = "replicaIndex" + limitKey = "limit" + runIndexKey = "runIndex" + startTimeKey = "startTime" + endTimeKey = "endTime" + forwardKey = "forward" + followKey = "follow" ) type wsStreamConfig struct { @@ -75,7 +91,7 @@ type wsStreamConfig struct { client cluster.ReadClient } -func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopclient operatorclients.IPOperatorClient, ctxConfig map[interface{}]interface{}) *mux.Router { +func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopclient operatorclients.IPOperatorClient, lokiClient loki.Client, ctxConfig map[interface{}]interface{}) *mux.Router { router := mux.NewRouter() @@ -158,6 +174,12 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl leaseLogsHandler(log, pclient.Cluster())). Methods("GET") + logRouter.HandleFunc("/status", + logStatusHandler(log, pclient.Manifest(), lokiClient)).Methods(http.MethodGet) + + logRouter.HandleFunc("/query/{serviceName}/{replicaIndex}", + logQueryHandler(log, pclient.Manifest(), lokiClient)).Methods(http.MethodGet) + srouter := lrouter.PathPrefix("/service/{serviceName}").Subrouter() srouter.Use( requireService(), @@ -175,6 +197,279 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl return router } +func logQueryFollowHandler(logger log.Logger, + lokiClient loki.Client, + leaseID mtypes.LeaseID, + rw http.ResponseWriter, + req *http.Request, + serviceName string, + replicaIndex uint, + runIndex int, + startTime time.Time, + ) { + + upgrader := websocket.Upgrader{ + ReadBufferSize: 0, + WriteBufferSize: 65535, + } + + ws, err := upgrader.Upgrade(rw, req, nil) + if err != nil { + // At this point the connection either has a response sent already + // or it has been closed + logger.Error("failed handshake", "err", err) + return + } + + // TODO - figoure out how to flush this at a regular cadence to avoid buffering ? + ws.EnableWriteCompression(true) + sendLogLine := func(at time.Time, line string) error { + msg := []interface{}{at.UnixNano(), line} + err := ws.SetWriteDeadline(time.Now().Add(time.Second * 10)) // TODO - configurable ??? + if err != nil { + return err + } + return ws.WriteJSON(msg) + } + + onDropped := func() error { + return nil // Ignore for now + } + + // TODO - put a hard time limit on this to prevent forever running goroutines? + err = lokiClient.TailLogsByService(req.Context(), + leaseID, + serviceName, + replicaIndex, + runIndex, + startTime, + sendLogLine, + onDropped) + + if err != nil { + if ! errors.Is(err, context.Canceled) { + logger.Error("failed during tail of logs", "err", err) + } + } + + err = ws.WriteControl(websocket.CloseMessage,[]byte{}, time.Now().Add(time.Second * 30)) + if err != nil { + logger.Error("could not send websocket close message", "err", err) + } + err = ws.Close() + if err != nil { + logger.Error("could not close websocket", "err", err) + } +} + +func logQueryHandler(logger log.Logger, manifestClient providermanifest.Client, lokiClient loki.Client) http.HandlerFunc{ + return func(rw http.ResponseWriter, req *http.Request) { + leaseID := requestLeaseID(req) + + // Make sure lease is active + active, err := manifestClient.IsActive(req.Context(), leaseID.DeploymentID()) + if err != nil { + logger.Error("could not check if lease is active", "lease-id", leaseID, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if !active { + rw.WriteHeader(http.StatusNotFound) + return + } + + serviceName := mux.Vars(req)[serviceNameKey] // TODO validate this exists + replicaIndexStr := mux.Vars(req)[replicaIndexKey] + + replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31) + if err != nil { + logger.Error("could not parse path component for replica index", "err", err, "replicaIndex", replicaIndexStr) + http.Error(rw, fmt.Sprintf("could not parse replica index %q - %v", replicaIndexStr, err), http.StatusNotFound) + return + } + + // TODO - validate replica index points to a valid replica + + query := req.URL.Query() + + limit := uint(1000) + limitStr := query.Get(limitKey) + if len(limitStr) != 0 { + limit64, err := strconv.ParseUint(limitStr, 10, 31) + if err != nil { + logger.Error("could not parse limit", limitKey, limitStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse limit %q - %v", limitStr, err), http.StatusBadRequest) + return + } + limit = uint(limit64) + } + + // Parse runIndex from query, if present + runIndex := -1 + runIndexStr := query.Get(runIndexKey) + if len(runIndexStr) != 0 { + runIndex64, err := strconv.ParseUint(runIndexStr, 10, 31) + if err != nil { + logger.Error("could not parse run index", runIndexKey, runIndexStr, "err", err) + http.Error(rw, fmt.Sprintf("could not run index %q - %v", runIndexStr, err), http.StatusBadRequest) + return + } + runIndex = int(runIndex64) + } + + // Parse start time from query, if present + startTime := time.Now().Add(-1 * time.Hour) + // start time + startTimeStr := query.Get(startTimeKey) + if len(startTimeStr) != 0 { + startTimeInt, err := strconv.ParseInt(startTimeStr, 10, 64) + if err != nil { + logger.Error("could not parse startTime", startTimeKey, startTimeStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse start time %q - %v", startTimeStr, err), http.StatusBadRequest) + return + } + + startTime = time.Unix(startTimeInt,0) + } + + // Parse end time from query, if present + endTime := time.Now() + endTimeStr := query.Get(endTimeKey) + if len(endTimeStr) != 0 { + endTimeInt, err := strconv.ParseInt(endTimeStr, 10, 64) + if err != nil { + logger.Error("could not parse endTime", endTimeKey, endTimeStr, "err", err) + http.Error(rw, fmt.Sprintf("could not parse end time %q - %v", endTimeStr, err), http.StatusBadRequest) + return + } + + endTime = time.Unix(endTimeInt,0) + } + + const timeLimit = time.Hour * 72 // TODO - configurable by the provider + duration := endTime.Sub(startTime) + if duration > timeLimit || duration < 0 { + logger.Error("duration of time range queried is not allowed", startTimeKey, startTime, endTimeKey, endTime) + http.Error(rw, fmt.Sprintf("time range of %v too large, maximum is %v", duration, timeLimit), http.StatusBadRequest) + return + } + + forward := true + // Parse direction from query, if present + forwardStr := query.Get(forwardKey) + if len(forwardStr) != 0 { + switch(forwardStr){ + case "0": + forward = false + case "1": + forward = true + default: + logger.Error("unknown value for forward", forwardKey, forwardStr) + http.Error(rw, fmt.Sprintf("unacceptable value for forward %q", forwardStr), http.StatusBadRequest) + return + } + } + + follow := false + followStr := query.Get(followKey) + if len(followStr) != 0 { + switch(followStr) { + case "0": + follow = false + case "1": + follow = true + + default: + logger.Error("unknown value for follow", followKey, forwardStr) + http.Error(rw, fmt.Sprintf("unacceptable value for follow %q", followStr), http.StatusBadRequest) + return + } + } + + if follow { + const msg = "cannot specify end time, reverse order, or limit if logs are tailed" + if len(endTimeStr) != 0 { + logger.Error("client requested log follow with endtime") + http.Error(rw, msg, http.StatusBadRequest) + return + } + + if !forward { + logger.Error("client requested log follow with reverse direction") + http.Error(rw, msg, http.StatusBadRequest) + return + } + + if len(limitStr) != 0 { + logger.Error("client request log follow with limit") + http.Error(rw, msg, http.StatusBadRequest) + return + } + // Upgrade to a websocket + logQueryFollowHandler(logger, + lokiClient, + leaseID, + rw, + req, + serviceName, + uint(replicaIndex), + runIndex, + startTime) + return + } + + logs, err := lokiClient.GetLogByService(req.Context(),leaseID, serviceName, uint(replicaIndex), + runIndex, + startTime, + endTime, forward, limit) + if err != nil { + logger.Error("could not get logs for lease", "lease", leaseID, "err", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusOK) + enc := json.NewEncoder(rw) + err = enc.Encode(logs) + if err != nil { + logger.Error("could not write lease log status", "lease", leaseID, "err", err) + } + } +} + +func logStatusHandler(logger log.Logger, manifestClient providermanifest.Client, lokiClient loki.Client) http.HandlerFunc{ + return func(rw http.ResponseWriter, req *http.Request) { + leaseID := requestLeaseID(req) + // Make sure lease is active + active, err := manifestClient.IsActive(req.Context(), leaseID.DeploymentID()) + if err != nil { + logger.Error("could not check if lease is active", "lease-id", leaseID, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if !active { + rw.WriteHeader(http.StatusNotFound) + return + } + + result, err := lokiClient.FindLogsByLease(req.Context(), leaseID) + if err != nil { + logger.Error("could not get logs for lease", "lease", leaseID, "err", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusOK) + enc := json.NewEncoder(rw) + err = enc.Encode(result) + if err != nil { + logger.Error("could not write lease log status", "lease", leaseID, "err", err) + } + } +} + func newJwtServerRouter(addr sdk.Address, privateKey interface{}, jwtExpiresAfter time.Duration, certSerialNumber string) *mux.Router { router := mux.NewRouter() @@ -185,6 +480,59 @@ func newJwtServerRouter(addr sdk.Address, privateKey interface{}, jwtExpiresAfte return router } +func newResourceServerRouter(log log.Logger, providerAddr sdk.Address, publicKey *ecdsa.PublicKey, lokiGwAddr string) *mux.Router { + router := mux.NewRouter() + + // add a middleware to verify the JWT provided in Authorization header + router.Use(resourceServerAuth(log, providerAddr, publicKey)) + + lrouter := router.PathPrefix(leasePathPrefix).Subrouter() + lrouter.Use(requireLeaseID()) + + lokiServiceRouter := lrouter.PathPrefix("/loki-service").Subrouter() + lokiServiceRouter.NewRoute().Handler(lokiServiceHandler(log, lokiGwAddr)) + + return router +} + +// lokiServiceHandler forwards all requests to the loki instance running in provider's cluster. +// Example: +// Incoming Request: http://localhost:8445/lease/1/1/1/loki-service/loki/api/v1/query?query={app=".+"} +// Outgoing Request: http://{lokiGwAddr}/loki/api/v1/query?query={app=".+"} +func lokiServiceHandler(log log.Logger, lokiGwAddr string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // set the X-Scope-OrgID header for fetching logs for the right tenant + r.Header.Set("X-Scope-OrgID", builder.LidNS(requestLeaseID(r))) + + // build target url for the reverse proxy + scheme := "http" // for http & https + if strings.HasPrefix(r.URL.Scheme, "ws") { + scheme = "ws" // for ws & wss + } + lokiURL, err := url.Parse(fmt.Sprintf("%s://%s", scheme, lokiGwAddr)) + if err != nil { + log.Error(err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + reverseProxy := httputil.NewSingleHostReverseProxy(lokiURL) + + // remove the "/lease/{dseq}/{gseq}/{oseq}/loki-service" path prefix from the request url + // before it is sent to the reverse proxy. + pathSplits := strings.SplitN(r.URL.Path, "/", 7) + if len(pathSplits) < 7 || pathSplits[6] == "" { + log.Error("loki api not provided in url") + http.Error(w, "loki api not provided in url", http.StatusBadRequest) + return + } + r.URL.Path = pathSplits[6] + + // serve the request using the reverse proxy + log.Info("Forwarding request to loki", "HTTP_API", pathSplits[6]) + reverseProxy.ServeHTTP(w, r) + } +} + func jwtServiceHandler(paddr sdk.Address, privateKey interface{}, jwtExpiresAfter time.Duration, certSerialNumber string) http.HandlerFunc { return func(writer http.ResponseWriter, request *http.Request) { now := time.Now() diff --git a/provider/gateway/rest/server.go b/provider/gateway/rest/server.go index 27b4466f21..7da80e174c 100644 --- a/provider/gateway/rest/server.go +++ b/provider/gateway/rest/server.go @@ -2,7 +2,9 @@ package rest import ( "context" + "crypto/ecdsa" "crypto/tls" + "github.com/ovrclk/akash/provider/cluster/kube/loki" "github.com/ovrclk/akash/provider/cluster/operatorclients" "net" "net/http" @@ -22,6 +24,7 @@ func NewServer( pclient provider.Client, cquery ctypes.QueryClient, ipopclient operatorclients.IPOperatorClient, + lokiClient loki.Client, address string, pid sdk.Address, certs []tls.Certificate, @@ -29,7 +32,7 @@ func NewServer( srv := &http.Server{ Addr: address, - Handler: newRouter(log, pid, pclient, ipopclient, clusterConfig), + Handler: newRouter(log, pid, pclient, ipopclient, lokiClient, clusterConfig), BaseContext: func(_ net.Listener) context.Context { return ctx }, @@ -69,3 +72,19 @@ func NewJwtServer(ctx context.Context, return srv, nil } + +func NewResourceServer(ctx context.Context, + log log.Logger, + serverAddr string, + providerAddr sdk.Address, + pubkey *ecdsa.PublicKey, + lokiGwAddr string, +) (*http.Server, error) { + srv := &http.Server{ + Addr: serverAddr, + Handler: newResourceServerRouter(log, providerAddr, pubkey, lokiGwAddr), + BaseContext: func(_ net.Listener) context.Context { return ctx }, + } + + return srv, nil +}