Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Proof of concept using Loki to store and retrieve logs #1540

Open
wants to merge 13 commits into
base: edu/metal-lb
Choose a base branch
from
Prev Previous commit
Next Next commit
wip, query loki for logs
hydrogen18 committed Mar 2, 2022
commit 20d80698aa284c515bb498af7c97b33ba38d67cf
243 changes: 223 additions & 20 deletions provider/cluster/kube/loki/loki.go
Original file line number Diff line number Diff line change
@@ -6,19 +6,25 @@ same as this project.
*/

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
clusterutil "github.com/ovrclk/akash/provider/cluster/util"
mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
"github.com/tendermint/tendermint/libs/log"
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"net"
"net/http"
"net/url"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
)
@@ -39,12 +45,20 @@ const (
lokiOrgIdHeader = "X-Scope-OrgID"
)

// lokiDirection is the string sent as the "direction" query parameter of a
// Loki log query built by GetLogByService.
type lokiDirection string

const (
// FORWARD is the direction sent when the caller asks for a forward query.
FORWARD lokiDirection = "FORWARD"
// BACKWARD is the direction sent when the caller does not ask for a forward
// query.
BACKWARD lokiDirection = "BACKWARD"
)

var (
// ErrLoki is the sentinel error wrapped (via %w) by failures this package
// detects itself while querying Loki, such as an unexpected HTTP status or a
// malformed result set. Callers can test for it with errors.Is.
ErrLoki = errors.New("error querying loki")
)

// Client is the interface through which lease logs stored in Loki are
// discovered and retrieved.
type Client interface {
// FindLogsByLease returns the log status entries for the given lease.
FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([]LogStatus, error)
// GetLogByService returns the log lines of one replica (replicaIndex) of
// serviceName within the lease, restricted to the startTime..endTime window.
// forward selects the FORWARD query direction, otherwise BACKWARD is used.
// NOTE(review): the implementation in this file does not read runIndex yet.
GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error)
// Stop shuts the client down; the implementation in this file stops its
// sda field.
Stop()
}

@@ -75,14 +89,153 @@ func (c *client) Stop() {
c.sda.Stop()
}

func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex uint, startTime, endTime time.Time) {
// lokiLogQueryResult is one element of the "result" array in a Loki log query
// response.
type lokiLogQueryResult struct {
// Stream holds the stream's labels; GetLogByService requires the
// filenameLabel entry to be present.
Stream map[string]string `json:"stream"`
// Values holds the log entries. GetLogByService expects every entry to have
// exactly two elements: a nanosecond Unix timestamp encoded as a decimal
// string, followed by the log line.
Values [][]string `json:"values"`
}

// lokiLogQueryData is the "data" object of a Loki log query response.
type lokiLogQueryData struct {
// ResultType is decoded from "resultType"; GetLogByService does not inspect it.
ResultType string `json:"resultType"`
// Result is the list of result sets that GetLogByService iterates over.
Result []lokiLogQueryResult `json:"result"`
}

// lokiLogQueryResponse is the top-level JSON document decoded from the body of
// a Loki log query response.
type lokiLogQueryResponse struct {
// Status is decoded from "status"; GetLogByService does not inspect it.
Status string `json:"status"`
// Data carries the result sets of the query.
Data lokiLogQueryData `json:"data"`
}

// LogResultLine is a single log line returned by GetLogByService.
type LogResultLine struct {
// RunIndex is the integer parsed from the part of the log file name that
// precedes the first ".".
RunIndex uint
// At is the entry's timestamp, built from the nanosecond Unix time reported
// by Loki.
At time.Time
// Line is the log text exactly as returned by Loki.
Line string
}

// LogResult is the value returned by GetLogByService: the log lines found for
// one replica of one service.
type LogResult struct {
// ServiceName echoes the service name that was queried.
ServiceName string
// ReplicaIndex echoes the replica index that was queried.
ReplicaIndex uint
// Entries holds the log lines in the order Loki returned them; it is nil
// when the query produced no lines.
Entries []LogResultLine

}

// GetLogByService fetches log lines from Loki for a single replica of a
// service that belongs to the given lease.
//
// The pod backing (serviceName, replicaIndex) is resolved from the runs
// detected for the lease. Loki's range query endpoint is then asked for the
// streams labelled with that pod name between startTime and endTime, using the
// lease namespace as the Loki organisation ID. At most 1000 entries are
// requested. forward selects the FORWARD query direction, otherwise BACKWARD
// is used.
//
// runIndex is accepted but not yet used to filter the query; every returned
// line carries the run index parsed from its own log file name instead.
//
// On failure the zero LogResult is returned. Errors detected by this function
// itself (unknown service/replica, non-200 status, malformed result set) wrap
// ErrLoki.
func (c *client) GetLogByService(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, replicaIndex uint, runIndex int, startTime, endTime time.Time, forward bool) (LogResult, error) {
	lidNS := clusterutil.LeaseIDToNamespace(leaseID)

	// Get the pods that may have produced logs for this lease.
	possiblePods, err := c.detectRunsForLease(ctx, leaseID)
	if err != nil {
		return LogResult{}, err
	}

	podName, exists := serviceNameAndReplicaIndexToPodName(possiblePods)[serviceNameAndReplicaIndex{
		replicaIndex: int(replicaIndex),
		serviceName:  serviceName,
	}]
	if !exists {
		return LogResult{}, fmt.Errorf("%w: no entry for service %q and replica %d", ErrLoki, serviceName, replicaIndex)
	}

	// Query using the pod name to get the result.
	lc, err := c.getLokiClient(ctx)
	if err != nil {
		return LogResult{}, err
	}

	direction := FORWARD
	if !forward {
		direction = BACKWARD
	}

	httpQueryString := url.Values{}
	httpQueryString.Set("start", strconv.FormatInt(startTime.UnixNano(), 10))
	httpQueryString.Set("end", strconv.FormatInt(endTime.UnixNano(), 10))
	httpQueryString.Set("limit", "1000") // TODO - configurable or something? Maybe user requestable?
	// TODO - guard against injection here even though it is unlikely
	// TODO - query by run index as well ? do I even need this
	httpQueryString.Set("query", fmt.Sprintf("{pod=%q}", podName)) // Note this is not JSON
	httpQueryString.Set("direction", string(direction))

	request, err := lc.CreateRequest(ctx, http.MethodGet, queryRangePath, nil)
	if err != nil {
		return LogResult{}, err
	}
	request.URL.RawQuery = httpQueryString.Encode()
	request.Header.Add(lokiOrgIdHeader, lidNS)

	resp, err := lc.DoRequest(request)
	if err != nil {
		return LogResult{}, fmt.Errorf("loki log request failed: %w", err)
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: include whatever Loki sent back in the error message.
		buf := &bytes.Buffer{}
		_, _ = io.Copy(buf, resp.Body)
		msg := strings.Trim(buf.String(), "\n\t\r")
		return LogResult{}, fmt.Errorf("%w: fetching logs from loki failed, got status code %d; %s", ErrLoki, resp.StatusCode, msg)
	}

	lokiResult := lokiLogQueryResponse{}
	if err = json.NewDecoder(resp.Body).Decode(&lokiResult); err != nil {
		return LogResult{}, fmt.Errorf("loki log request could not be decoded: %w", err)
	}

	return parseLokiLogStreams(lokiResult.Data.Result, serviceName, replicaIndex)
}

// parseLokiLogStreams flattens the result sets of a Loki log query into a
// LogResult for the given service and replica. Every result set must carry the
// filenameLabel label, whose file name must start with a decimal run index,
// and every entry must be a [timestamp, line] pair with a decimal nanosecond
// timestamp.
func parseLokiLogStreams(streams []lokiLogQueryResult, serviceName string, replicaIndex uint) (LogResult, error) {
	result := LogResult{
		ServiceName:  serviceName,
		ReplicaIndex: replicaIndex,
		Entries:      nil,
	}

	for _, resultSet := range streams {
		logPath, exists := resultSet.Stream[filenameLabel]
		if !exists {
			return LogResult{}, fmt.Errorf("%w: expected loki log result set to have label %q but it does not", ErrLoki, filenameLabel)
		}

		_, filename := path.Split(logPath)
		filenameParts := strings.SplitN(filename, ".", 2)

		// Base 10 is explicit so a leading "0" or "0x" is never read as an
		// octal or hexadecimal prefix. 31 bits keeps the value within a
		// non-negative 32-bit signed integer.
		entryRunIndex, err := strconv.ParseUint(filenameParts[0], 10, 31)
		if err != nil {
			return LogResult{}, fmt.Errorf("expected to parse filename %q as integer for kubernetes run index: %w", filename, err)
		}

		for _, logEntry := range resultSet.Values {
			if len(logEntry) != 2 {
				return LogResult{}, fmt.Errorf("%w: expected log entry to have 2 values, not %d", ErrLoki, len(logEntry))
			}
			timeStampStr := logEntry[0]

			timestamp, err := strconv.ParseInt(timeStampStr, 10, 64)
			if err != nil {
				return LogResult{}, fmt.Errorf("could not parse log entry timestamp %q: %w", timeStampStr, err)
			}

			result.Entries = append(result.Entries, LogResultLine{
				RunIndex: uint(entryRunIndex),
				At:       time.Unix(0, timestamp),
				Line:     logEntry[1],
			})
		}
	}

	return result, nil
}

func (c *client) getLokiClient(ctx context.Context) (clusterutil.ServiceClient, error) {
c.lock.Lock()
defer c.lock.Unlock()

// TODO - reset this client on error from Loki
if c.client == nil {
var err error
c.client, err = c.sda.GetClient(ctx, false, false)
@@ -98,6 +251,7 @@ type lokiLabelValuesResponse struct {
Data []string `json:"data"`
}

// TODO - should this be an interface ?
type LogStatus struct {
ServiceName string
ReplicaIndex int
@@ -136,7 +290,7 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([
req.Header.Add(lokiOrgIdHeader, lidNS)
resp, err := lc.DoRequest(req)
if err != nil {
return nil, err
return nil, fmt.Errorf("loki label values request failed: %w", err)
}

if resp.StatusCode != 200 {
@@ -150,33 +304,30 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([
return nil, fmt.Errorf("decoding loki label values failed: %w", err)
}

// Assign pods a replica index by their name, so it is consistent. This is done elsewhere as well
podNames := make([]string, 0, len(possiblePods))
for podName := range possiblePods {
podNames = append(podNames, podName)
}
sort.Strings(podNames)
podNameToReplicaIndex := make(map[string]int)
for i, podName := range podNames {
podNameToReplicaIndex[podName] = i
}
podNameToPodData := podNamesToServiceNameAndReplicaIndex(possiblePods)
returnValue := make([]LogStatus, 0, len(podNameToPodData))
positionData := make(map[serviceNameAndReplicaIndex]int)

returnValue := make([]LogStatus, len(podNameToReplicaIndex))
// By default nothing is found
for possiblePodName, entry := range possiblePods {
replicaIndex := podNameToReplicaIndex[possiblePodName]
returnValue[replicaIndex] = LogStatus{
podData := podNameToPodData[possiblePodName]
positionData[serviceNameAndReplicaIndex{
replicaIndex: podData.replicaIndex,
serviceName: podData.serviceName,
}] = len(returnValue)
returnValue = append(returnValue, LogStatus{
ServiceName: entry.serviceName,
ReplicaIndex: replicaIndex,
}
ReplicaIndex: podData.replicaIndex,
})
}

// Mark each pod that is found by name
// Mark each pod that is found with logs
for _, podName := range result.Data {
i, exists := podNameToReplicaIndex[podName]
if !exists {
podData, exists := podNameToPodData[podName]
if ! exists {
continue
}
i := positionData[podData]
returnValue[i].Present = true
}

@@ -185,10 +336,62 @@ func (c *client) FindLogsByLease(ctx context.Context, leaseID mtypes.LeaseID) ([
return returnValue, nil
}

// serviceNameAndReplicaIndex identifies one replica of one service. It is a
// comparable struct and is used as a map key by the helpers in this file.
type serviceNameAndReplicaIndex struct{
// replicaIndex is the zero-based position of the pod among the
// name-sorted pods of the same service (see withPartitionedPods).
replicaIndex int
// serviceName is the name of the service the pod belongs to.
serviceName string
}

// serviceNameAndReplicaIndexToPodName builds a lookup table from
// (service name, replica index) to pod name. Replica indices are the ones
// assigned by withPartitionedPods.
func serviceNameAndReplicaIndexToPodName(input map[string]runEntry) map[serviceNameAndReplicaIndex]string {
	podByKey := map[serviceNameAndReplicaIndex]string{}

	record := func(podName string, serviceName string, replicaIndex int) {
		key := serviceNameAndReplicaIndex{
			serviceName:  serviceName,
			replicaIndex: replicaIndex,
		}
		podByKey[key] = podName
	}
	withPartitionedPods(input, record)

	return podByKey
}

// withPartitionedPods groups the pods in input by service name, sorts each
// group by pod name and invokes fn once per pod with the pod's position inside
// its sorted group as the replica index. Sorting by name keeps the index
// assignment consistent between calls. The order in which services are
// visited is unspecified.
func withPartitionedPods(input map[string]runEntry, fn func(podName string, serviceName string, replicaIndex int)) {
	byService := map[string][]string{}
	for name, run := range input {
		byService[run.serviceName] = append(byService[run.serviceName], name)
	}

	for service := range byService {
		names := byService[service]
		sort.Strings(names)

		for idx := 0; idx < len(names); idx++ {
			fn(names[idx], service, idx)
		}
	}
}

// podNamesToServiceNameAndReplicaIndex builds a lookup table from pod name to
// the (service name, replica index) pair assigned to that pod by
// withPartitionedPods.
func podNamesToServiceNameAndReplicaIndex(input map[string]runEntry) map[string]serviceNameAndReplicaIndex {
	keyByPod := map[string]serviceNameAndReplicaIndex{}

	record := func(podName string, serviceName string, replicaIndex int) {
		keyByPod[podName] = serviceNameAndReplicaIndex{
			serviceName:  serviceName,
			replicaIndex: replicaIndex,
		}
	}
	withPartitionedPods(input, record)

	return keyByPod
}

// runEntry describes one pod detected for a lease; detectRunsForLease returns
// these keyed by pod name.
type runEntry struct {
// restarts is presumably the number of container restarts seen for the pod
// — TODO confirm against detectRunsForLease.
restarts uint
// serviceName is the service the pod belongs to; withPartitionedPods groups
// pods by this value.
serviceName string
}

func (c *client) detectRunsForLease(ctx context.Context, leaseID mtypes.LeaseID) (map[string]runEntry, error) {
// Containers can run more than once (i.e. a pod restarts containers when configured to do so)
// so this code picks up on that by looking into the labels
39 changes: 39 additions & 0 deletions provider/gateway/rest/router.go
Original file line number Diff line number Diff line change
@@ -167,6 +167,9 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl
logRouter.HandleFunc("/status",
logStatusHandler(log, lokiClient)).Methods(http.MethodGet)

logRouter.HandleFunc("/query/{serviceName}/{replicaIndex}",
logQueryHandler(log, lokiClient)).Methods(http.MethodGet)

srouter := lrouter.PathPrefix("/service/{serviceName}").Subrouter()
srouter.Use(
requireService(),
@@ -184,9 +187,45 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ipopcl
return router
}

// logQueryHandler returns the HTTP handler that serves the logs of one
// service replica of a lease as JSON.
//
// The service name and replica index are taken from the "serviceName" and
// "replicaIndex" route variables. A replica index that is not a valid
// non-negative decimal integer yields 404; a failure to fetch the logs yields
// 500. On success the logs covering the last 72 hours are written with status
// 200, queried in the BACKWARD direction and without a run index filter.
func logQueryHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc {
	return func(rw http.ResponseWriter, req *http.Request) {
		leaseID := requestLeaseID(req)
		// TODO - make sure the lease actually exists first

		vars := mux.Vars(req)
		serviceName := vars["serviceName"] // TODO validate this exists
		replicaIndexStr := vars["replicaIndex"]

		// bitSize 31 keeps the value within a non-negative 32-bit signed int.
		replicaIndex, err := strconv.ParseUint(replicaIndexStr, 10, 31)
		if err != nil {
			logger.Error("could not parse path compeonent for replica index", "err", err, "replicaIndex", replicaIndexStr)
			rw.WriteHeader(http.StatusNotFound)
			return
		}

		// Take a single reading of the clock so both ends of the window are
		// derived from the same instant.
		now := time.Now()
		logs, err := lokiClient.GetLogByService(req.Context(), leaseID, serviceName, uint(replicaIndex),
			-1,
			now.Add(time.Hour*72*-1), // TODO - configurable
			now, false)
		if err != nil {
			logger.Error("could not get logs for lease", "lease", leaseID, "err", err)
			rw.WriteHeader(http.StatusInternalServerError)
			return
		}

		// Headers must be set before WriteHeader; without an explicit type the
		// body would be served with a sniffed, non-JSON content type.
		rw.Header().Set("Content-Type", "application/json")
		rw.WriteHeader(http.StatusOK)
		if err = json.NewEncoder(rw).Encode(logs); err != nil {
			logger.Error("could not write lease log status", "lease", leaseID, "err", err)
		}
	}
}

func logStatusHandler(logger log.Logger, lokiClient loki.Client) http.HandlerFunc{
return func(rw http.ResponseWriter, req *http.Request) {
leaseID := requestLeaseID(req)
// TODO - make sure the lease actually exists first
result, err := lokiClient.FindLogsByLease(req.Context(), leaseID)
if err != nil {
logger.Error("could not get logs for lease", "lease", leaseID, "err", err)