Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry)_: replace telemetry with prometheus metrics #6256

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/geth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/status-im/status-go/internal/sentry"
"github.com/status-im/status-go/internal/version"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/metrics"
"github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/multiaccounts/accounts"
multiacccommon "github.com/status-im/status-go/multiaccounts/common"
Expand Down Expand Up @@ -103,6 +104,7 @@ type GethStatusBackend struct {
allowAllRPC bool // used only for tests, disables api method restrictions
LocalPairingStateManager *statecontrol.ProcessStateManager
centralizedMetrics *centralizedmetrics.MetricService
prometheusMetrics *metrics.Server
sentryDSN string

logger *zap.Logger
Expand Down Expand Up @@ -267,6 +269,15 @@ func (b *GethStatusBackend) AcceptTerms() error {
return b.multiaccountsDB.UpdateHasAcceptedTerms(accounts[0].KeyUID, true)
}

func (b *GethStatusBackend) StartPrometheusMetricsServer(address string) error {
if b.prometheusMetrics != nil {
return nil
}
b.prometheusMetrics = metrics.NewMetricsServer(address, nil)
go b.prometheusMetrics.Listen()
return nil
}

func (b *GethStatusBackend) getAccountByKeyUID(keyUID string) (*multiaccounts.Account, error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down
14 changes: 6 additions & 8 deletions cmd/status-cli/util.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -12,11 +11,11 @@ import (

"github.com/status-im/status-go/api"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/metrics/wakumetrics"
"github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/telemetry"

"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -74,14 +73,12 @@ func start(p StartParams, logger *zap.SugaredLogger) (*StatusCLI, error) {
}

if p.TelemetryURL != "" {
telemetryLogger, err := getLogger(true)
waku := backend.StatusNode().WakuV2Service()
telemetryClient, err := wakumetrics.NewClient(wakumetrics.WithPeerID(waku.PeerID().String()))
if err != nil {
return nil, err
}
waku := backend.StatusNode().WakuV2Service()
telemetryClient := telemetry.NewClient(telemetryLogger, p.TelemetryURL, backend.SelectedAccountKeyID(), p.Name, "cli", telemetry.WithPeerID(waku.PeerID().String()))
telemetryClient.Start(context.Background())
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
backend.StatusNode().WakuV2Service().SetMetricsHandler(telemetryClient)
}
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)

Expand Down Expand Up @@ -152,7 +149,8 @@ func createAccountAndLogin(b *api.GethStatusBackend, rootDataDir, password strin
HTTPHost: "127.0.0.1",
HTTPPort: p.Port,
},
TelemetryServerURL: p.TelemetryURL,
TelemetryServerURL: p.TelemetryURL,
WakuV2EnableMissingMessageVerification: true,
}
return b.CreateAccountAndLogin(req,
params.WithFleet(p.Fleet),
Expand Down
3 changes: 2 additions & 1 deletion cmd/statusd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -313,7 +314,7 @@ func main() {
if *metricsEnabled || gethmetrics.Enabled {
go startCollectingNodeMetrics(interruptCh, backend.StatusNode())
go gethmetrics.CollectProcessMetrics(3 * time.Second)
go metrics.NewMetricsServer(*metricsPort, gethmetrics.DefaultRegistry).Listen()
go metrics.NewMetricsServer("localhost:"+strconv.Itoa(*metricsPort), gethmetrics.DefaultRegistry).Listen()
}

// Check if profiling shall be enabled.
Expand Down
23 changes: 17 additions & 6 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package metrics

import (
"fmt"
"net/http"
"time"

Expand All @@ -22,13 +21,13 @@ type Server struct {
server *http.Server
}

func NewMetricsServer(port int, r metrics.Registry) *Server {
func NewMetricsServer(address string, r metrics.Registry) *Server {
mux := http.NewServeMux()
mux.Handle("/health", healthHandler())
mux.Handle("/metrics", Handler(r))
p := Server{
server: &http.Server{
Addr: fmt.Sprintf(":%d", port),
Addr: address,
ReadHeaderTimeout: 5 * time.Second,
Handler: mux,
},
Expand All @@ -48,9 +47,13 @@ func healthHandler() http.Handler {
func Handler(reg metrics.Registry) http.Handler {
// we disable compression because geth doesn't support it
opts := promhttp.HandlerOpts{DisableCompression: true}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can enable compression, if geth metrics are not exposed?

// we are combining handlers to avoid having 2 endpoints
statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts) // our metrics
gethMetrics := gethprom.Handler(reg) // geth metrics
// we are using only our own metrics
statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts)
if reg == nil {
return statusMetrics
}
// if registry is provided, combine handlers
gethMetrics := gethprom.Handler(reg)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
statusMetrics.ServeHTTP(w, r)
gethMetrics.ServeHTTP(w, r)
Expand All @@ -62,3 +65,11 @@ func (p *Server) Listen() {
defer common.LogOnPanic()
logutils.ZapLogger().Info("metrics server stopped", zap.Error(p.server.ListenAndServe()))
}

// Stop gracefully shuts down the metrics server
func (p *Server) Stop() error {
if p.server != nil {
return p.server.Close()
}
return nil
}
183 changes: 183 additions & 0 deletions metrics/wakumetrics/client.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here, is it actually wakumetrics? Not just any metrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metrics in this file are specifically related to waku

Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package wakumetrics

import (
"fmt"
"strconv"

"github.com/status-im/status-go/protocol/transport"
wakutypes "github.com/status-im/status-go/waku/types"
"github.com/status-im/status-go/wakuv2"

v1protocol "github.com/status-im/status-go/protocol/v1"
v2common "github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type ReceivedMessages struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose prometheus marshals these types to json?
Then we should explicitly define the json tags for each of these structs fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just used for the function arguments in PushReceivedMetrics, the data pushed to prometheus is a numerical value and string labels. Each metric defines its own keys for the labels.

Filter transport.Filter
SSHMessage *wakutypes.Message
Messages []*v1protocol.StatusMessage
}

type Client struct {
peerId string
deviceType string
version string
lastPeerConnFailures map[string]int
}

type TelemetryClientOption func(*Client)

func WithPeerID(peerId string) TelemetryClientOption {
return func(c *Client) {
c.peerId = peerId
nodePeerId.WithLabelValues(peerId).Set(1)
}
}

func WithDeviceType(deviceType string) TelemetryClientOption {
return func(c *Client) {
c.deviceType = deviceType
}
}

func WithVersion(version string) TelemetryClientOption {
return func(c *Client) {
c.version = version
}
}

func NewClient(opts ...TelemetryClientOption) (*Client, error) {
client := &Client{
lastPeerConnFailures: make(map[string]int),
}

for _, opt := range opts {
opt(client)
}

return client, nil
}

// RegisterWithRegistry registers all metrics with the provided registry
func (c *Client) RegisterWithRegistry() error {
if err := RegisterMetrics(); err != nil {
return fmt.Errorf("failed to register metrics: %v", err)
}
return nil
}

func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}

func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) {
messagesReceivedTotal.WithLabelValues(
receivedMessages.Filter.PubsubTopic,
receivedMessages.Filter.ContentTopic.String(),
receivedMessages.Filter.ChatID,
).Add(float64(len(receivedMessages.Messages)))
}

func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) {
EnvelopeSentTotal.WithLabelValues(
sentEnvelope.Envelope.PubsubTopic(),
sentEnvelope.Envelope.Message().ContentTopic,
sentEnvelope.PublishMethod.String(),
).Inc()
}

func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) {
envelopeSentErrors.WithLabelValues(
errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
).Inc()
}

func (c *Client) PushPeerCount(peerCount int) {
connectedPeers.Set(float64(peerCount))
}

func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
peerConnectionFailures.Add(float64(failures))
}
}

func (c *Client) PushMessageCheckSuccess() {
storeQuerySuccesses.Inc()
}

func (c *Client) PushMessageCheckFailure() {
storeQueryFailures.Inc()
}

func (c *Client) PushPeerCountByShard(peerCountByShard map[uint16]uint) {
for shard, count := range peerCountByShard {
peersByShard.WithLabelValues(strconv.FormatUint(uint64(shard), 10)).Set(float64(count))
}
}

func (c *Client) PushPeerCountByOrigin(peerCountByOrigin map[wps.Origin]uint) {
for origin, count := range peerCountByOrigin {
peersByOrigin.WithLabelValues(getOriginString(origin)).Set(float64(count))
}
}

func (c *Client) PushDialFailure(dialFailure v2common.DialError) {
peerDialFailures.WithLabelValues(
dialFailure.ErrType.String(),
dialFailure.Protocols,
).Inc()
}

func (c *Client) PushMissedMessage(envelope *v2protocol.Envelope) {
missedMessages.WithLabelValues(
envelope.PubsubTopic(),
envelope.Message().ContentTopic,
).Inc()
}

func (c *Client) PushMissedRelevantMessage(receivedMessage *v2common.ReceivedMessage) {
missedMessages.WithLabelValues(
receivedMessage.PubsubTopic,
receivedMessage.ContentTopic.String(),
).Inc()
}

func (c *Client) PushMessageDeliveryConfirmed() {
messageDeliveryConfirmations.Inc()
}

func (c *Client) PushSentMessageTotal(messageSize uint32, publishMethod string) {
wakuMessagesSizeBytes.WithLabelValues(publishMethod).Add(float64(messageSize))
messagesSentTotal.WithLabelValues(publishMethod).Inc()
}

func getOriginString(origin wps.Origin) string {
switch origin {
case wps.Unknown:
return "unknown"
case wps.Discv5:
return "discv5"
case wps.Static:
return "static"
case wps.PeerExchange:
return "peer_exchange"
case wps.DNSDiscovery:
return "dns_discovery"
case wps.Rendezvous:
return "rendezvous"
case wps.PeerManager:
return "peer_manager"
default:
return "unknown"
}
}
Loading
Loading