From f3ebd93fbf9f3b2d7d5d24c91571ebd9eaab6bd7 Mon Sep 17 00:00:00 2001 From: dmicanzerofox Date: Wed, 23 May 2018 12:15:35 -0400 Subject: [PATCH] Postgres Surfacer w/ tests! refs #130 Removed Redundant Postgres Surfacer Event log Postgres: Factors out EventMetric param from distribution functions` --- examples/external/redis_probe.go | 2 +- surfacers/postgres/postgres.go | 295 ++++++++++++++++++++ surfacers/postgres/postgres_test.go | 99 +++++++ surfacers/postgres/proto/config.pb.go | 88 ++++++ surfacers/postgres/proto/config.proto | 8 + surfacers/postgres/tests/cloudprober.sh | 14 + surfacers/postgres/tests/docker-compose.yml | 11 + surfacers/postgres/tests/postgres.cfg | 17 ++ surfacers/proto/config.pb.go | 216 ++++++++------ surfacers/proto/config.proto | 3 + surfacers/surfacers.go | 3 + 11 files changed, 673 insertions(+), 83 deletions(-) create mode 100644 surfacers/postgres/postgres.go create mode 100644 surfacers/postgres/postgres_test.go create mode 100644 surfacers/postgres/proto/config.pb.go create mode 100644 surfacers/postgres/proto/config.proto create mode 100644 surfacers/postgres/tests/cloudprober.sh create mode 100644 surfacers/postgres/tests/docker-compose.yml create mode 100644 surfacers/postgres/tests/postgres.cfg diff --git a/examples/external/redis_probe.go b/examples/external/redis_probe.go index e85cb0cc..8e8ccdd6 100644 --- a/examples/external/redis_probe.go +++ b/examples/external/redis_probe.go @@ -48,8 +48,8 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/google/cloudprober/probes/external/serverutils" epb "github.com/google/cloudprober/probes/external/proto" + "github.com/google/cloudprober/probes/external/serverutils" "github.com/hoisie/redis" ) diff --git a/surfacers/postgres/postgres.go b/surfacers/postgres/postgres.go new file mode 100644 index 00000000..8b6ae075 --- /dev/null +++ b/surfacers/postgres/postgres.go @@ -0,0 +1,295 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this postgres except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package postgres implements "postgres" surfacer. This surfacer type is in +// experimental phase right now. +package postgres + +import ( + "context" + "fmt" + "os" + + "github.com/google/cloudprober/logger" + "github.com/google/cloudprober/metrics" + + "database/sql" + "encoding/json" + configpb "github.com/google/cloudprober/surfacers/postgres/proto" + "github.com/lib/pq" + "strconv" + "time" +) + +type label struct { + key string + value string +} + +type pgMetric struct { + time time.Time + metricName string + value string + labels []label +} + +// labelsJSON takes the label array and formats it for insertion into +// postgres jsonb labels column, storing each label as k,v json object +func (c pgMetric) labelsJSON() (string, error) { + m := make(map[string]string) + for _, l := range c.labels { + m[l.key] = l.value + } + + bs, err := json.Marshal(m) + if err != nil { + return "", err + } + + return string(bs), nil +} + +func newPGMetric(t time.Time, metricName, val string, labels []label) pgMetric { + return pgMetric{ + time: t, + metricName: metricName, + value: val, + labels: labels, + } +} + +type pgDistribution struct { + *metrics.DistributionData + + metricName string + labels []label + timestamp time.Time +} + +// sumMetric creates the correctly named metric +// representing sum of the distribution +func (d pgDistribution) sumMetric() pgMetric { + return newPGMetric( + d.timestamp, + d.metricName+"_sum", + strconv.FormatFloat(d.Sum, 'f', -1, 64), + d.labels, + ) +} + +// countMetric creates the correctly named metric representing +// the count of the distribution +func (d pgDistribution) countMetric() pgMetric { + return newPGMetric( + d.timestamp, + d.metricName+"_count", + strconv.FormatInt(d.Count, 10), + d.labels, + ) +} + +// bucketMetrics creates and formats all metrics for each bucket in this distribution. +// each bucket is assigned a metric name suffixed with "_bucket" and labeled with the +// corresponding bucket as "le: {bucket}" +func (d pgDistribution) bucketMetrics() []pgMetric { + var val int64 + ms := []pgMetric{} + + for i := range d.LowerBounds { + val += d.BucketCounts[i] + var lb string + if i == len(d.LowerBounds)-1 { + lb = "+Inf" + } else { + lb = strconv.FormatFloat(d.LowerBounds[i+1], 'f', -1, 64) + } + labelsWithBucket := append(d.labels, label{"le", lb}) + ms = append(ms, newPGMetric(d.timestamp, d.metricName+"_bucket", strconv.FormatInt(val, 10), labelsWithBucket)) + } + + return ms +} + +// metricRows extracts all metrics to be insterted into postgres +// corresponding to the EventMEtric +func metricRows(em *metrics.EventMetrics) []pgMetric { + fmt.Printf("%+v\n", em) + cs := []pgMetric{} + + labels := []label{} + + for _, k := range em.LabelsKeys() { + labels = append(labels, label{k, em.Label(k)}) + } + + for _, metricName := range em.MetricsKeys() { + val := em.Metric(metricName) + + if mapVal, ok := val.(*metrics.Map); ok { + for _, k := range mapVal.Keys() { + labelsWithMap := append(labels, label{mapVal.MapName, k}) + cs = append(cs, newPGMetric(em.Timestamp, metricName, mapVal.GetKey(k).String(), labelsWithMap)) + } + continue + } + + if distVal, ok := val.(*metrics.Distribution); ok { + d := distVal.Data() + pgD := pgDistribution{d, metricName, labels, em.Timestamp} + + cs = append(cs, + pgD.sumMetric(), + pgD.countMetric(), + ) + + cs = append(cs, pgD.bucketMetrics()...) + + continue + } + + if _, ok := val.(metrics.String); ok { + newLabels := append(labels, label{"val", val.String()}) + cs = append(cs, newPGMetric(em.Timestamp, metricName, "1", newLabels)) + continue + } + + cs = append(cs, newPGMetric(em.Timestamp, metricName, val.String(), labels)) + } + return cs +} + +// PostgresSurfacer structures for writing to postgres. +type PostgresSurfacer struct { + // Configuration + c *configpb.SurfacerConf + + // Channel for incoming data. + writeChan chan *metrics.EventMetrics + + // Cloud logger + l *logger.Logger + + openDB func(connectionString string) (*sql.DB, error) + db *sql.DB +} + +// New initializes a PostgresSurfacer for inserting probe results into postgres +func New(config *configpb.SurfacerConf, l *logger.Logger) (*PostgresSurfacer, error) { + s := &PostgresSurfacer{ + c: config, + l: l, + openDB: func(cs string) (*sql.DB, error) { + return sql.Open("postgres", cs) + }, + } + return s, s.init() +} + +// writeMetrics parses events metrics into postgres rows, starts a transaction +// and inserts all discreet metric rows represented by the EventMetrics +func (s *PostgresSurfacer) writeMetrics(em *metrics.EventMetrics) error { + rows := metricRows(em) + + txn, err := s.db.Begin() + if err != nil { + return err + } + + stmt, err := txn.Prepare( + pq.CopyIn( + s.c.GetMetricsTableName(), "time", "metric_name", "value", "labels", + ), + ) + + if err != nil { + return err + } + + for _, r := range rows { + s, err := r.labelsJSON() + if err != nil { + return err + } + _, err = stmt.Exec(r.time, r.metricName, r.value, s) + if err != nil { + return err + } + } + + _, err = stmt.Exec() + if err != nil { + return err + } + + err = stmt.Close() + if err != nil { + return err + } + + err = txn.Commit() + if err != nil { + return err + } + + return nil +} + +// init connects to postgres +func (s *PostgresSurfacer) init() error { + var err error + fmt.Fprintf(os.Stdout, "%s\n", s.c.GetConnectionString()) + + s.db, err = s.openDB(s.c.GetConnectionString()) + if err != nil { + return err + } + + err = s.db.Ping() + if err != nil { + return err + } + + s.writeChan = make(chan *metrics.EventMetrics, 1000) + + // Start a goroutine to run forever, polling on the writeChan. Allows + // for the surfacer to write asynchronously to the serial port. + go func() { + defer s.db.Close() + + for { + em := <-s.writeChan + + // batch all metrics into a sql statement + if em.Kind != metrics.CUMULATIVE && em.Kind != metrics.GAUGE { + continue + } + + err := s.writeMetrics(em) + if err != nil { + fmt.Fprintf(os.Stdout, "%+v\n", err) + } + } + }() + + return nil +} + +// Write takes the data to be written +func (s *PostgresSurfacer) Write(ctx context.Context, em *metrics.EventMetrics) { + select { + case s.writeChan <- em: + default: + s.l.Warningf("PostgresSurfacer's write channel is full, dropping new data.") + } +} diff --git a/surfacers/postgres/postgres_test.go b/surfacers/postgres/postgres_test.go new file mode 100644 index 00000000..fd5a5d69 --- /dev/null +++ b/surfacers/postgres/postgres_test.go @@ -0,0 +1,99 @@ +package postgres + +import ( + "github.com/google/cloudprober/metrics" + "testing" + "time" +) + +func Test_metricRows_No_Distribution(t *testing.T) { + respCodesVal := metrics.NewMap("code", metrics.NewInt(0)) + respCodesVal.IncKeyBy("200", metrics.NewInt(19)) + ts := time.Now() + em := metrics.NewEventMetrics(ts). + AddMetric("sent", metrics.NewInt(32)). + AddMetric("rcvd", metrics.NewInt(22)). + AddMetric("latency", metrics.NewFloat(10.11111)). + AddMetric("resp_code", respCodesVal). + AddLabel("ptype", "http") + + rows := metricRows(em) + + if len(rows) != 4 { + t.Errorf("Expected %d rows, received: %d\n", 4, len(rows)) + } + + if !isRowExpected(rows[0], ts, "sent", "32", []label{{"ptype", "http"}}) { + t.Errorf("Incorrect Row found %+v", rows[0]) + } + + if !isRowExpected(rows[1], ts, "rcvd", "22", []label{{"ptype", "http"}}) { + t.Errorf("Incorrect Row found %+v", rows[1]) + } + + if !isRowExpected(rows[2], ts, "latency", "10.111", []label{{"ptype", "http"}}) { + t.Errorf("Incorrect Row found %+v", rows[2]) + } + + if !isRowExpected(rows[3], ts, "resp_code", "19", []label{{"ptype", "http"}, label{"code", "200"}}) { + t.Errorf("Incorrect Row found %+v", rows[3]) + } +} + +func Test_metricRows_With_Distribution(t *testing.T) { + respCodesVal := metrics.NewMap("code", metrics.NewInt(0)) + respCodesVal.IncKeyBy("200", metrics.NewInt(19)) + latencyVal := metrics.NewDistribution([]float64{1, 4}) + latencyVal.AddSample(0.5) + latencyVal.AddSample(5) + ts := time.Now() + em := metrics.NewEventMetrics(ts). + AddMetric("latency", latencyVal). + AddLabel("ptype", "http") + + rows := metricRows(em) + + if len(rows) != 5 { + t.Errorf("Expected %d rows, received: %d\n", 5, len(rows)) + } + + if !isRowExpected(rows[0], ts, "latency_sum", "5.5", []label{{"ptype", "http"}}) { + t.Errorf("Incorrect Row found %+v", rows[0]) + } + + if !isRowExpected(rows[1], ts, "latency_count", "2", []label{{"ptype", "http"}}) { + t.Errorf("Incorrect Row found %+v", rows[1]) + } + + if !isRowExpected(rows[2], ts, "latency_bucket", "1", []label{{"ptype", "http"}, label{"le", "1"}}) { + t.Errorf("Incorrect Row found %+v", rows[2]) + } + + if !isRowExpected(rows[3], ts, "latency_bucket", "1", []label{{"ptype", "http"}, label{"le", "4"}}) { + t.Errorf("Incorrect Row found %+v", rows[3]) + } + + if !isRowExpected(rows[4], ts, "latency_bucket", "2", []label{{"ptype", "http"}, label{"le", "+Inf"}}) { + t.Errorf("Incorrect Row found %+v", rows[4]) + } + +} + +func isRowExpected(row pgMetric, t time.Time, metricName string, value string, labels []label) bool { + if row.time != t { + return false + } + if row.metricName != metricName { + return false + } + if row.value != value { + return false + } + for i, l := range row.labels { + if l != labels[i] { + return false + } + } + + return true +} diff --git a/surfacers/postgres/proto/config.pb.go b/surfacers/postgres/proto/config.pb.go new file mode 100644 index 00000000..588ad346 --- /dev/null +++ b/surfacers/postgres/proto/config.pb.go @@ -0,0 +1,88 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: github.com/google/cloudprober/surfacers/postgres/proto/config.proto + +package cloudprober_surfacer_postgres + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type SurfacerConf struct { + ConnectionString *string `protobuf:"bytes,1,req,name=connection_string,json=connectionString" json:"connection_string,omitempty"` + MetricsTableName *string `protobuf:"bytes,2,req,name=metrics_table_name,json=metricsTableName" json:"metrics_table_name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SurfacerConf) Reset() { *m = SurfacerConf{} } +func (m *SurfacerConf) String() string { return proto.CompactTextString(m) } +func (*SurfacerConf) ProtoMessage() {} +func (*SurfacerConf) Descriptor() ([]byte, []int) { + return fileDescriptor_config_ca5f6a08327971c1, []int{0} +} +func (m *SurfacerConf) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SurfacerConf.Unmarshal(m, b) +} +func (m *SurfacerConf) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SurfacerConf.Marshal(b, m, deterministic) +} +func (dst *SurfacerConf) XXX_Merge(src proto.Message) { + xxx_messageInfo_SurfacerConf.Merge(dst, src) +} +func (m *SurfacerConf) XXX_Size() int { + return xxx_messageInfo_SurfacerConf.Size(m) +} +func (m *SurfacerConf) XXX_DiscardUnknown() { + xxx_messageInfo_SurfacerConf.DiscardUnknown(m) +} + +var xxx_messageInfo_SurfacerConf proto.InternalMessageInfo + +func (m *SurfacerConf) GetConnectionString() string { + if m != nil && m.ConnectionString != nil { + return *m.ConnectionString + } + return "" +} + +func (m *SurfacerConf) GetMetricsTableName() string { + if m != nil && m.MetricsTableName != nil { + return *m.MetricsTableName + } + return "" +} + +func init() { + proto.RegisterType((*SurfacerConf)(nil), "cloudprober.surfacer.postgres.SurfacerConf") +} + +func init() { + proto.RegisterFile("github.com/google/cloudprober/surfacers/postgres/proto/config.proto", fileDescriptor_config_ca5f6a08327971c1) +} + +var fileDescriptor_config_ca5f6a08327971c1 = []byte{ + // 169 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8b, 0xb1, 0x8e, 0xc2, 0x30, + 0x10, 0x05, 0x75, 0xe9, 0xce, 0xba, 0xe2, 0xce, 0x55, 0x9a, 0x93, 0x10, 0x15, 0x12, 0xc8, 0xfe, + 0x88, 0xf4, 0x14, 0x84, 0x3e, 0x72, 0x96, 0x8d, 0xb1, 0x14, 0xef, 0x46, 0xeb, 0xcd, 0xff, 0x23, + 0x42, 0x22, 0x28, 0xdf, 0xcc, 0x1b, 0xd3, 0xc4, 0xa4, 0xf7, 0xb9, 0x77, 0xc0, 0xd9, 0x47, 0xe6, + 0x38, 0xa2, 0x87, 0x91, 0xe7, 0xdb, 0x24, 0xdc, 0xa3, 0xf8, 0x32, 0xcb, 0x10, 0x00, 0xa5, 0xf8, + 0x89, 0x8b, 0x46, 0xc1, 0xe2, 0x27, 0x61, 0x65, 0x0f, 0x4c, 0x43, 0x8a, 0x6e, 0x19, 0xf6, 0xff, + 0x23, 0x71, 0x5b, 0xe2, 0xb6, 0x62, 0x9f, 0xcc, 0x4f, 0xbb, 0xc2, 0x86, 0x69, 0xb0, 0x47, 0xf3, + 0x07, 0x4c, 0x84, 0xa0, 0x89, 0xa9, 0x2b, 0x2a, 0x89, 0x62, 0xfd, 0xb5, 0xab, 0x0e, 0xdf, 0x97, + 0xdf, 0xb7, 0x68, 0x17, 0x6e, 0x4f, 0xc6, 0x66, 0x54, 0x49, 0x50, 0x3a, 0x0d, 0xfd, 0x88, 0x1d, + 0x85, 0x8c, 0x75, 0xf5, 0x7a, 0xaf, 0xe6, 0xfa, 0x14, 0xe7, 0x90, 0xf1, 0x11, 0x00, 0x00, 0xff, + 0xff, 0xae, 0xe9, 0x01, 0x19, 0xcf, 0x00, 0x00, 0x00, +} diff --git a/surfacers/postgres/proto/config.proto b/surfacers/postgres/proto/config.proto new file mode 100644 index 00000000..a6740608 --- /dev/null +++ b/surfacers/postgres/proto/config.proto @@ -0,0 +1,8 @@ +syntax = "proto2"; + +package cloudprober.surfacer.postgres; + +message SurfacerConf { + required string connection_string = 1; + required string metrics_table_name = 2; +} diff --git a/surfacers/postgres/tests/cloudprober.sh b/surfacers/postgres/tests/cloudprober.sh new file mode 100644 index 00000000..274af523 --- /dev/null +++ b/surfacers/postgres/tests/cloudprober.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env sh + + +psql -c "create database cloudprober" + +psql -c "create schema cloudprober" + +psql cloudprober -c "CREATE TABLE metrics ( + time TIMESTAMP WITH TIME ZONE, + metric_name text NOT NULL, + value DOUBLE PRECISION , + labels jsonb, + PRIMARY KEY (time, metric_name, labels) +)" \ No newline at end of file diff --git a/surfacers/postgres/tests/docker-compose.yml b/surfacers/postgres/tests/docker-compose.yml new file mode 100644 index 00000000..c61e357d --- /dev/null +++ b/surfacers/postgres/tests/docker-compose.yml @@ -0,0 +1,11 @@ +version: '3' +services: + postgres: + image: postgres:9.6 + ports: + - 5432:5432 + environment: + POSTGRES_USER: root + POSTGRES_PASSWORD: root + volumes: + - .:/docker-entrypoint-initdb.d \ No newline at end of file diff --git a/surfacers/postgres/tests/postgres.cfg b/surfacers/postgres/tests/postgres.cfg new file mode 100644 index 00000000..1e2bf4da --- /dev/null +++ b/surfacers/postgres/tests/postgres.cfg @@ -0,0 +1,17 @@ +probe { + name: "google_homepage" + type: HTTP + targets { + host_names: "www.google.com" + } + interval_msec: 5000 # 5s + timeout_msec: 1000 # 1s +} + +surfacer { + type: POSTGRES + postgres_surfacer { + connection_string: "postgresql://root:root@localhost/cloudprober?sslmode=disable" + metrics_table_name: "metrics" + } +} \ No newline at end of file diff --git a/surfacers/proto/config.pb.go b/surfacers/proto/config.pb.go index f62f99ed..b629eb94 100644 --- a/surfacers/proto/config.pb.go +++ b/surfacers/proto/config.pb.go @@ -1,26 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: github.com/google/cloudprober/surfacers/proto/config.proto -/* -Package proto is a generated protocol buffer package. +package cloudprober_surfacer -It is generated from these files: - github.com/google/cloudprober/surfacers/proto/config.proto - -It has these top-level messages: - SurfacerDef -*/ -package proto - -import proto1 "github.com/golang/protobuf/proto" +import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" -import cloudprober_surfacer_prometheus "github.com/google/cloudprober/surfacers/prometheus/proto" -import cloudprober_surfacer_stackdriver "github.com/google/cloudprober/surfacers/stackdriver/proto" -import cloudprober_surfacer_file "github.com/google/cloudprober/surfacers/file/proto" +import proto3 "github.com/google/cloudprober/surfacers/file/proto" +import proto4 "github.com/google/cloudprober/surfacers/postgres/proto" +import proto1 "github.com/google/cloudprober/surfacers/prometheus/proto" +import proto2 "github.com/google/cloudprober/surfacers/stackdriver/proto" // Reference imports to suppress errors if they are not otherwise used. -var _ = proto1.Marshal +var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf @@ -28,7 +20,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // Enumeration for each type of surfacer we can parse and create type Type int32 @@ -38,6 +30,7 @@ const ( Type_PROMETHEUS Type = 1 Type_STACKDRIVER Type = 2 Type_FILE Type = 3 + Type_POSTGRES Type = 4 Type_USER_DEFINED Type = 99 ) @@ -46,6 +39,7 @@ var Type_name = map[int32]string{ 1: "PROMETHEUS", 2: "STACKDRIVER", 3: "FILE", + 4: "POSTGRES", 99: "USER_DEFINED", } var Type_value = map[string]int32{ @@ -53,6 +47,7 @@ var Type_value = map[string]int32{ "PROMETHEUS": 1, "STACKDRIVER": 2, "FILE": 3, + "POSTGRES": 4, "USER_DEFINED": 99, } @@ -62,17 +57,19 @@ func (x Type) Enum() *Type { return p } func (x Type) String() string { - return proto1.EnumName(Type_name, int32(x)) + return proto.EnumName(Type_name, int32(x)) } func (x *Type) UnmarshalJSON(data []byte) error { - value, err := proto1.UnmarshalJSONEnum(Type_value, data, "Type") + value, err := proto.UnmarshalJSONEnum(Type_value, data, "Type") if err != nil { return err } *x = Type(value) return nil } -func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_config_dc94760276a55873, []int{0} +} type SurfacerDef struct { // This name is used for logging. If not defined, it's derived from the type. @@ -88,32 +85,58 @@ type SurfacerDef struct { // *SurfacerDef_PrometheusSurfacer // *SurfacerDef_StackdriverSurfacer // *SurfacerDef_FileSurfacer - Surfacer isSurfacerDef_Surfacer `protobuf_oneof:"surfacer"` - XXX_unrecognized []byte `json:"-"` + // *SurfacerDef_PostgresSurfacer + Surfacer isSurfacerDef_Surfacer `protobuf_oneof:"surfacer"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *SurfacerDef) Reset() { *m = SurfacerDef{} } -func (m *SurfacerDef) String() string { return proto1.CompactTextString(m) } -func (*SurfacerDef) ProtoMessage() {} -func (*SurfacerDef) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *SurfacerDef) Reset() { *m = SurfacerDef{} } +func (m *SurfacerDef) String() string { return proto.CompactTextString(m) } +func (*SurfacerDef) ProtoMessage() {} +func (*SurfacerDef) Descriptor() ([]byte, []int) { + return fileDescriptor_config_dc94760276a55873, []int{0} +} +func (m *SurfacerDef) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SurfacerDef.Unmarshal(m, b) +} +func (m *SurfacerDef) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SurfacerDef.Marshal(b, m, deterministic) +} +func (dst *SurfacerDef) XXX_Merge(src proto.Message) { + xxx_messageInfo_SurfacerDef.Merge(dst, src) +} +func (m *SurfacerDef) XXX_Size() int { + return xxx_messageInfo_SurfacerDef.Size(m) +} +func (m *SurfacerDef) XXX_DiscardUnknown() { + xxx_messageInfo_SurfacerDef.DiscardUnknown(m) +} + +var xxx_messageInfo_SurfacerDef proto.InternalMessageInfo type isSurfacerDef_Surfacer interface { isSurfacerDef_Surfacer() } type SurfacerDef_PrometheusSurfacer struct { - PrometheusSurfacer *cloudprober_surfacer_prometheus.SurfacerConf `protobuf:"bytes,10,opt,name=prometheus_surfacer,json=prometheusSurfacer,oneof"` + PrometheusSurfacer *proto1.SurfacerConf `protobuf:"bytes,10,opt,name=prometheus_surfacer,json=prometheusSurfacer,oneof"` } type SurfacerDef_StackdriverSurfacer struct { - StackdriverSurfacer *cloudprober_surfacer_stackdriver.SurfacerConf `protobuf:"bytes,11,opt,name=stackdriver_surfacer,json=stackdriverSurfacer,oneof"` + StackdriverSurfacer *proto2.SurfacerConf `protobuf:"bytes,11,opt,name=stackdriver_surfacer,json=stackdriverSurfacer,oneof"` } type SurfacerDef_FileSurfacer struct { - FileSurfacer *cloudprober_surfacer_file.SurfacerConf `protobuf:"bytes,12,opt,name=file_surfacer,json=fileSurfacer,oneof"` + FileSurfacer *proto3.SurfacerConf `protobuf:"bytes,12,opt,name=file_surfacer,json=fileSurfacer,oneof"` +} +type SurfacerDef_PostgresSurfacer struct { + PostgresSurfacer *proto4.SurfacerConf `protobuf:"bytes,13,opt,name=postgres_surfacer,json=postgresSurfacer,oneof"` } func (*SurfacerDef_PrometheusSurfacer) isSurfacerDef_Surfacer() {} func (*SurfacerDef_StackdriverSurfacer) isSurfacerDef_Surfacer() {} func (*SurfacerDef_FileSurfacer) isSurfacerDef_Surfacer() {} +func (*SurfacerDef_PostgresSurfacer) isSurfacerDef_Surfacer() {} func (m *SurfacerDef) GetSurfacer() isSurfacerDef_Surfacer { if m != nil { @@ -136,55 +159,68 @@ func (m *SurfacerDef) GetType() Type { return Type_NONE } -func (m *SurfacerDef) GetPrometheusSurfacer() *cloudprober_surfacer_prometheus.SurfacerConf { +func (m *SurfacerDef) GetPrometheusSurfacer() *proto1.SurfacerConf { if x, ok := m.GetSurfacer().(*SurfacerDef_PrometheusSurfacer); ok { return x.PrometheusSurfacer } return nil } -func (m *SurfacerDef) GetStackdriverSurfacer() *cloudprober_surfacer_stackdriver.SurfacerConf { +func (m *SurfacerDef) GetStackdriverSurfacer() *proto2.SurfacerConf { if x, ok := m.GetSurfacer().(*SurfacerDef_StackdriverSurfacer); ok { return x.StackdriverSurfacer } return nil } -func (m *SurfacerDef) GetFileSurfacer() *cloudprober_surfacer_file.SurfacerConf { +func (m *SurfacerDef) GetFileSurfacer() *proto3.SurfacerConf { if x, ok := m.GetSurfacer().(*SurfacerDef_FileSurfacer); ok { return x.FileSurfacer } return nil } +func (m *SurfacerDef) GetPostgresSurfacer() *proto4.SurfacerConf { + if x, ok := m.GetSurfacer().(*SurfacerDef_PostgresSurfacer); ok { + return x.PostgresSurfacer + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. -func (*SurfacerDef) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffer) error, func(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error), func(msg proto1.Message) (n int), []interface{}) { +func (*SurfacerDef) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _SurfacerDef_OneofMarshaler, _SurfacerDef_OneofUnmarshaler, _SurfacerDef_OneofSizer, []interface{}{ (*SurfacerDef_PrometheusSurfacer)(nil), (*SurfacerDef_StackdriverSurfacer)(nil), (*SurfacerDef_FileSurfacer)(nil), + (*SurfacerDef_PostgresSurfacer)(nil), } } -func _SurfacerDef_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { +func _SurfacerDef_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { m := msg.(*SurfacerDef) // surfacer switch x := m.Surfacer.(type) { case *SurfacerDef_PrometheusSurfacer: - b.EncodeVarint(10<<3 | proto1.WireBytes) + b.EncodeVarint(10<<3 | proto.WireBytes) if err := b.EncodeMessage(x.PrometheusSurfacer); err != nil { return err } case *SurfacerDef_StackdriverSurfacer: - b.EncodeVarint(11<<3 | proto1.WireBytes) + b.EncodeVarint(11<<3 | proto.WireBytes) if err := b.EncodeMessage(x.StackdriverSurfacer); err != nil { return err } case *SurfacerDef_FileSurfacer: - b.EncodeVarint(12<<3 | proto1.WireBytes) + b.EncodeVarint(12<<3 | proto.WireBytes) if err := b.EncodeMessage(x.FileSurfacer); err != nil { return err } + case *SurfacerDef_PostgresSurfacer: + b.EncodeVarint(13<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.PostgresSurfacer); err != nil { + return err + } case nil: default: return fmt.Errorf("SurfacerDef.Surfacer has unexpected type %T", x) @@ -192,56 +228,69 @@ func _SurfacerDef_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { return nil } -func _SurfacerDef_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error) { +func _SurfacerDef_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*SurfacerDef) switch tag { case 10: // surfacer.prometheus_surfacer - if wire != proto1.WireBytes { - return true, proto1.ErrInternalBadWireType + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType } - msg := new(cloudprober_surfacer_prometheus.SurfacerConf) + msg := new(proto1.SurfacerConf) err := b.DecodeMessage(msg) m.Surfacer = &SurfacerDef_PrometheusSurfacer{msg} return true, err case 11: // surfacer.stackdriver_surfacer - if wire != proto1.WireBytes { - return true, proto1.ErrInternalBadWireType + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType } - msg := new(cloudprober_surfacer_stackdriver.SurfacerConf) + msg := new(proto2.SurfacerConf) err := b.DecodeMessage(msg) m.Surfacer = &SurfacerDef_StackdriverSurfacer{msg} return true, err case 12: // surfacer.file_surfacer - if wire != proto1.WireBytes { - return true, proto1.ErrInternalBadWireType + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType } - msg := new(cloudprober_surfacer_file.SurfacerConf) + msg := new(proto3.SurfacerConf) err := b.DecodeMessage(msg) m.Surfacer = &SurfacerDef_FileSurfacer{msg} return true, err + case 13: // surfacer.postgres_surfacer + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(proto4.SurfacerConf) + err := b.DecodeMessage(msg) + m.Surfacer = &SurfacerDef_PostgresSurfacer{msg} + return true, err default: return false, nil } } -func _SurfacerDef_OneofSizer(msg proto1.Message) (n int) { +func _SurfacerDef_OneofSizer(msg proto.Message) (n int) { m := msg.(*SurfacerDef) // surfacer switch x := m.Surfacer.(type) { case *SurfacerDef_PrometheusSurfacer: - s := proto1.Size(x.PrometheusSurfacer) - n += proto1.SizeVarint(10<<3 | proto1.WireBytes) - n += proto1.SizeVarint(uint64(s)) + s := proto.Size(x.PrometheusSurfacer) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) n += s case *SurfacerDef_StackdriverSurfacer: - s := proto1.Size(x.StackdriverSurfacer) - n += proto1.SizeVarint(11<<3 | proto1.WireBytes) - n += proto1.SizeVarint(uint64(s)) + s := proto.Size(x.StackdriverSurfacer) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) n += s case *SurfacerDef_FileSurfacer: - s := proto1.Size(x.FileSurfacer) - n += proto1.SizeVarint(12<<3 | proto1.WireBytes) - n += proto1.SizeVarint(uint64(s)) + s := proto.Size(x.FileSurfacer) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *SurfacerDef_PostgresSurfacer: + s := proto.Size(x.PostgresSurfacer) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) n += s case nil: default: @@ -251,35 +300,38 @@ func _SurfacerDef_OneofSizer(msg proto1.Message) (n int) { } func init() { - proto1.RegisterType((*SurfacerDef)(nil), "cloudprober.surfacer.SurfacerDef") - proto1.RegisterEnum("cloudprober.surfacer.Type", Type_name, Type_value) + proto.RegisterType((*SurfacerDef)(nil), "cloudprober.surfacer.SurfacerDef") + proto.RegisterEnum("cloudprober.surfacer.Type", Type_name, Type_value) } func init() { - proto1.RegisterFile("github.com/google/cloudprober/surfacers/proto/config.proto", fileDescriptor0) + proto.RegisterFile("github.com/google/cloudprober/surfacers/proto/config.proto", fileDescriptor_config_dc94760276a55873) } -var fileDescriptor0 = []byte{ - // 335 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0x41, 0x4f, 0xc2, 0x30, - 0x14, 0xc7, 0x19, 0xee, 0x80, 0x6f, 0x88, 0x4b, 0xe1, 0x40, 0x38, 0x11, 0x2f, 0x12, 0x13, 0xbb, - 0x84, 0xa3, 0x17, 0xa3, 0xac, 0x04, 0xa2, 0x0c, 0xd3, 0x81, 0x57, 0x84, 0xd2, 0x8d, 0x45, 0xa0, - 0x4b, 0xd9, 0x4c, 0xf8, 0xde, 0x7e, 0x00, 0xb3, 0xc5, 0xba, 0x85, 0xf4, 0x80, 0xb7, 0xed, 0xbd, - 0xdf, 0xfb, 0xfd, 0xb7, 0x3f, 0x3c, 0x84, 0x51, 0xb2, 0x49, 0x57, 0x98, 0x89, 0x9d, 0x13, 0x0a, - 0x11, 0x6e, 0xb9, 0xc3, 0xb6, 0x22, 0x5d, 0xc7, 0x52, 0xac, 0xb8, 0x74, 0x0e, 0xa9, 0x0c, 0x96, - 0x8c, 0xcb, 0x83, 0x13, 0x4b, 0x91, 0x08, 0x87, 0x89, 0x7d, 0x10, 0x85, 0x38, 0x7f, 0x41, 0xad, - 0x12, 0x89, 0x15, 0xd9, 0x21, 0xff, 0x30, 0xee, 0x78, 0xb2, 0xe1, 0xa9, 0x4e, 0xde, 0x19, 0x9e, - 0xab, 0x39, 0x24, 0x4b, 0xf6, 0xb9, 0x96, 0xd1, 0x17, 0x97, 0x3a, 0xcf, 0xe3, 0xb9, 0x9e, 0x20, - 0xda, 0x72, 0x8d, 0xe0, 0xe6, 0xbb, 0x0a, 0x96, 0xff, 0x4b, 0xb9, 0x3c, 0x40, 0x08, 0xcc, 0xfd, - 0x72, 0xc7, 0xdb, 0x46, 0xd7, 0xe8, 0x5d, 0xd2, 0xfc, 0x19, 0x61, 0x30, 0x93, 0x63, 0xcc, 0xdb, - 0xd5, 0xae, 0xd1, 0x6b, 0xf4, 0x3b, 0x58, 0x57, 0x0c, 0x9e, 0x1d, 0x63, 0x4e, 0x73, 0x0e, 0x7d, - 0x40, 0xb3, 0xf8, 0xfb, 0x85, 0x22, 0xda, 0xd0, 0x35, 0x7a, 0x56, 0xff, 0x5e, 0x7f, 0x5e, 0x1c, - 0x60, 0xf5, 0x39, 0x03, 0xb1, 0x0f, 0x46, 0x15, 0x8a, 0x8a, 0x95, 0xda, 0x20, 0x06, 0xad, 0x52, - 0x31, 0x45, 0x84, 0x95, 0x47, 0x60, 0x7d, 0x44, 0xe9, 0xe2, 0x34, 0xa3, 0x59, 0xda, 0xfd, 0x85, - 0x78, 0x70, 0x95, 0xb5, 0x56, 0xd8, 0xeb, 0xb9, 0xfd, 0x56, 0x6f, 0xcf, 0xd0, 0x53, 0x6d, 0x3d, - 0x1b, 0xaa, 0xd9, 0x33, 0x40, 0x4d, 0xd1, 0x77, 0x13, 0x30, 0xb3, 0xc2, 0x50, 0x0d, 0x4c, 0x6f, - 0xea, 0x11, 0xbb, 0x82, 0x1a, 0x00, 0x6f, 0x74, 0x3a, 0x21, 0xb3, 0x11, 0x99, 0xfb, 0xb6, 0x81, - 0xae, 0xc1, 0xf2, 0x67, 0x4f, 0x83, 0x17, 0x97, 0x8e, 0xdf, 0x09, 0xb5, 0xab, 0x19, 0x3a, 0x1c, - 0xbf, 0x12, 0xfb, 0x02, 0xd9, 0x50, 0x9f, 0xfb, 0x84, 0x2e, 0x5c, 0x32, 0x1c, 0x7b, 0xc4, 0xb5, - 0xd9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3b, 0xb9, 0x5b, 0x96, 0xe8, 0x02, 0x00, 0x00, +var fileDescriptor_config_dc94760276a55873 = []byte{ + // 381 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xc1, 0xcf, 0xd2, 0x30, + 0x18, 0xc6, 0xbf, 0x7d, 0xdf, 0x0e, 0xf8, 0x6e, 0x60, 0x2d, 0x1c, 0x08, 0x27, 0xe2, 0x45, 0xa2, + 0xb1, 0x4b, 0x38, 0x7a, 0x31, 0xca, 0x8a, 0x10, 0x75, 0x90, 0x6e, 0x78, 0xd0, 0x03, 0xc2, 0xe8, + 0xc6, 0x22, 0xd0, 0xa5, 0xdb, 0x4c, 0xf8, 0x2f, 0xfd, 0x93, 0xcc, 0x16, 0xca, 0x16, 0x52, 0x13, + 0xbc, 0x6d, 0x7d, 0x9f, 0xe7, 0xf7, 0xb4, 0xcf, 0x0b, 0xef, 0xe2, 0x24, 0xdf, 0x17, 0x5b, 0x12, + 0x8a, 0xa3, 0x13, 0x0b, 0x11, 0x1f, 0xb8, 0x13, 0x1e, 0x44, 0xb1, 0x4b, 0xa5, 0xd8, 0x72, 0xe9, + 0x64, 0x85, 0x8c, 0x36, 0x21, 0x97, 0x99, 0x93, 0x4a, 0x91, 0x0b, 0x27, 0x14, 0xa7, 0x28, 0x89, + 0x49, 0xf5, 0x83, 0x7b, 0x0d, 0x25, 0x51, 0xca, 0x01, 0xfd, 0x0f, 0xe2, 0x91, 0xe7, 0x7b, 0x5e, + 0xe8, 0xe0, 0x83, 0xe9, 0xbd, 0x98, 0x2c, 0xdf, 0x84, 0xbf, 0x76, 0x32, 0xf9, 0xcd, 0xa5, 0x8e, + 0xf3, 0xfe, 0x5e, 0x4e, 0x94, 0x1c, 0xb8, 0x0e, 0x30, 0xb9, 0xfb, 0x3d, 0x22, 0xcb, 0x63, 0xc9, + 0x75, 0xaf, 0x79, 0xf9, 0xe7, 0x09, 0x2c, 0xff, 0xa2, 0x74, 0x79, 0x84, 0x31, 0x98, 0xa7, 0xcd, + 0x91, 0xf7, 0x8d, 0xa1, 0x31, 0x7a, 0xc6, 0xaa, 0x6f, 0x4c, 0xc0, 0xcc, 0xcf, 0x29, 0xef, 0x3f, + 0x0e, 0x8d, 0x51, 0x67, 0x3c, 0x20, 0xba, 0x76, 0x49, 0x70, 0x4e, 0x39, 0xab, 0x74, 0xf8, 0x27, + 0x74, 0xeb, 0x0a, 0xd7, 0x4a, 0xd1, 0x87, 0xa1, 0x31, 0xb2, 0xc6, 0x6f, 0xf5, 0xf6, 0xda, 0x40, + 0xd4, 0x75, 0x26, 0xe2, 0x14, 0xcd, 0x1e, 0x18, 0xae, 0x47, 0x6a, 0x82, 0x43, 0xe8, 0x35, 0xda, + 0xad, 0x23, 0xac, 0x2a, 0x82, 0xe8, 0x23, 0x1a, 0x8e, 0xdb, 0x8c, 0x6e, 0x63, 0x76, 0x0d, 0xf1, + 0xa0, 0x5d, 0x56, 0x5f, 0xd3, 0xed, 0x8a, 0xfe, 0x4a, 0x4f, 0x2f, 0xa5, 0xb7, 0x58, 0xbb, 0x3c, + 0xbc, 0xf2, 0xbe, 0xc3, 0x0b, 0xb5, 0x89, 0x9a, 0xd9, 0xae, 0x98, 0x6f, 0xfe, 0x51, 0xca, 0x45, + 0x7e, 0xcb, 0x45, 0x6a, 0xa0, 0xce, 0x3f, 0x02, 0xb4, 0x94, 0xeb, 0xf5, 0x0f, 0x30, 0xcb, 0x65, + 0xe0, 0x16, 0x98, 0xde, 0xc2, 0xa3, 0xe8, 0x01, 0x77, 0x00, 0x96, 0x6c, 0xf1, 0x95, 0x06, 0x33, + 0xba, 0xf2, 0x91, 0x81, 0x9f, 0x83, 0xe5, 0x07, 0x1f, 0x26, 0x9f, 0x5d, 0x36, 0xff, 0x46, 0x19, + 0x7a, 0x2c, 0xa5, 0xd3, 0xf9, 0x17, 0x8a, 0x9e, 0xb0, 0x0d, 0xad, 0xe5, 0xc2, 0x0f, 0x3e, 0x31, + 0xea, 0x23, 0x13, 0x23, 0xb0, 0x57, 0x3e, 0x65, 0x6b, 0x97, 0x4e, 0xe7, 0x1e, 0x75, 0x51, 0xf8, + 0x37, 0x00, 0x00, 0xff, 0xff, 0x2f, 0xee, 0xd1, 0x34, 0x97, 0x03, 0x00, 0x00, } diff --git a/surfacers/proto/config.proto b/surfacers/proto/config.proto index 6e89d911..83aaf4df 100644 --- a/surfacers/proto/config.proto +++ b/surfacers/proto/config.proto @@ -3,6 +3,7 @@ syntax = "proto2"; import "github.com/google/cloudprober/surfacers/prometheus/proto/config.proto"; import "github.com/google/cloudprober/surfacers/stackdriver/proto/config.proto"; import "github.com/google/cloudprober/surfacers/file/proto/config.proto"; +import "github.com/google/cloudprober/surfacers/postgres/proto/config.proto"; package cloudprober.surfacer; @@ -12,6 +13,7 @@ enum Type { PROMETHEUS = 1; STACKDRIVER = 2; FILE = 3; + POSTGRES = 4; USER_DEFINED = 99; } @@ -30,5 +32,6 @@ message SurfacerDef { prometheus.SurfacerConf prometheus_surfacer = 10; stackdriver.SurfacerConf stackdriver_surfacer = 11; file.SurfacerConf file_surfacer = 12; + postgres.SurfacerConf postgres_surfacer = 13; } } diff --git a/surfacers/surfacers.go b/surfacers/surfacers.go index 89c18c3f..894b75f7 100644 --- a/surfacers/surfacers.go +++ b/surfacers/surfacers.go @@ -32,6 +32,7 @@ import ( "github.com/google/cloudprober/logger" "github.com/google/cloudprober/metrics" "github.com/google/cloudprober/surfacers/file" + "github.com/google/cloudprober/surfacers/postgres" "github.com/google/cloudprober/surfacers/prometheus" "github.com/google/cloudprober/surfacers/stackdriver" @@ -74,6 +75,8 @@ func initSurfacer(s *surfacerpb.SurfacerDef) (Surfacer, error) { return stackdriver.New(s.GetStackdriverSurfacer(), l) case surfacerpb.Type_FILE: return file.New(s.GetFileSurfacer(), l) + case surfacerpb.Type_POSTGRES: + return postgres.New(s.GetPostgresSurfacer(), l) case surfacerpb.Type_USER_DEFINED: userDefinedSurfacersMu.Lock() defer userDefinedSurfacersMu.Unlock()