Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define a TelemetryBlob type to hold JSON blobs #42

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,22 @@ func (tc *TelemetryClient) Register() (err error) {
return nil
}

func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content []byte, tags types.Tags) error {
// Enforce size limits
tdl := telemetrylib.NewTelemetryDataLimits()
err := tdl.CheckLimits(content)
if err != nil {
func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content *types.TelemetryBlob, tags types.Tags) error {
// Enforce valid versioned JSON object
if err := content.Valid(); err != nil {
slog.Debug(
"Supplied content is not a versioned JSON object",
slog.String("error", err.Error()),
)
return err
}

// Enforce content size limits
if err := content.CheckLimits(); err != nil {
slog.Debug(
"Supplied JSON blob failed limits check",
slog.String("error", err.Error()),
)
return err
}

Expand All @@ -589,7 +600,7 @@ func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content []byt
"Generated Telemetry",
slog.String("name", telemetry.String()),
slog.String("tags", tags.String()),
slog.String("content", string(content)),
slog.String("content", content.String()),
)

return tc.processor.AddData(telemetry, content, tags)
Expand Down
26 changes: 9 additions & 17 deletions pkg/lib/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package telemetrylib

import (
"database/sql"
"fmt"
"encoding/json"
"log/slog"
"strings"

Expand All @@ -13,17 +13,12 @@ import (

type TelemetryDataItem struct {
Header TelemetryDataItemHeader `json:"header" validate:"required"`
TelemetryData map[string]interface{} `json:"telemetryData" validate:"required,dive"`
TelemetryData json.RawMessage `json:"telemetryData" validate:"required,dive"`
Footer TelemetryDataItemFooter `json:"footer" validate:"required"`
}

// func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, data map[string]interface{}) *TelemetryDataItem {
func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, marshaledData []byte) (*TelemetryDataItem, error) {
data, err := utils.DeserializeMap(string(marshaledData))
if err != nil {
return nil, fmt.Errorf("unable to unmarshal JSON: %s", err.Error())
}

func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, content *types.TelemetryBlob) *TelemetryDataItem {
tdi := new(TelemetryDataItem)

// fill in header fields
Expand All @@ -35,12 +30,12 @@ func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, marsha
}

// fill in body
tdi.TelemetryData = data
tdi.TelemetryData = content.Bytes()

// fill in footer
tdi.Footer.Checksum = "ichecksum" // TODO

return tdi, nil
return tdi
}

type TelemetryDataItemHeader struct {
Expand Down Expand Up @@ -84,22 +79,19 @@ type TelemetryDataItemRow struct {
BundleId sql.NullInt64
}

func NewTelemetryDataItemRow(telemetry types.TelemetryType, tags types.Tags, marshaledData []byte) (*TelemetryDataItemRow, error) {
func NewTelemetryDataItemRow(telemetry types.TelemetryType, tags types.Tags, content *types.TelemetryBlob) *TelemetryDataItemRow {

item, err := NewTelemetryDataItem(telemetry, tags, marshaledData)
if err != nil {
return nil, fmt.Errorf("unable to create a new telemetry data item: %s", err.Error())
}
item := NewTelemetryDataItem(telemetry, tags, content)

dataItemRow := new(TelemetryDataItemRow)
dataItemRow.ItemId = item.Header.TelemetryId
dataItemRow.ItemType = item.Header.TelemetryType
dataItemRow.ItemTimestamp = item.Header.TelemetryTimeStamp
dataItemRow.ItemAnnotations = strings.Join(item.Header.TelemetryAnnotations, ",")
dataItemRow.ItemData = marshaledData
dataItemRow.ItemData = content.Bytes()
dataItemRow.ItemChecksum = item.Footer.Checksum

return dataItemRow, nil
return dataItemRow

}

Expand Down
17 changes: 6 additions & 11 deletions pkg/lib/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/SUSE/telemetry/pkg/config"
"github.com/SUSE/telemetry/pkg/types"
"github.com/SUSE/telemetry/pkg/utils"
)

type TelemetryProcessor interface {
Expand All @@ -16,7 +15,7 @@ type TelemetryProcessor interface {
// Add telemetry data - a method to process jsonData as a byte[]
AddData(
telemetry types.TelemetryType,
content []byte,
content *types.TelemetryBlob,
tags types.Tags,
) (err error)

Expand Down Expand Up @@ -106,14 +105,10 @@ func NewTelemetryProcessor(cfg *config.DBConfig) (TelemetryProcessor, error) {
return &p, err
}

func (p *TelemetryProcessorImpl) AddData(telemetry types.TelemetryType, marshaledData []byte, tags types.Tags) (err error) {
dataItemRow, err := NewTelemetryDataItemRow(telemetry, tags, marshaledData)
if err != nil {
return fmt.Errorf("unable to create telemetry data: %s", err.Error())
}
func (p *TelemetryProcessorImpl) AddData(telemetry types.TelemetryType, marshaledData *types.TelemetryBlob, tags types.Tags) (err error) {
dataItemRow := NewTelemetryDataItemRow(telemetry, tags, marshaledData)

err = dataItemRow.Insert(p.t.storer.Conn)
return
return dataItemRow.Insert(p.t.storer.Conn)
}

func (p *TelemetryProcessorImpl) GenerateBundle(clientId int64, customerId string, tags types.Tags) (bundleRow *TelemetryBundleRow, err error) {
Expand Down Expand Up @@ -250,11 +245,11 @@ func (p *TelemetryProcessorImpl) ToItem(itemRow *TelemetryDataItemRow) (item Tel
Checksum: itemRow.ItemChecksum,
}

data, err := utils.DeserializeMap(string(itemRow.ItemData))
//data, err := utils.DeserializeMap(string(itemRow.ItemData))

item = TelemetryDataItem{
Header: itemHeader,
TelemetryData: data,
TelemetryData: itemRow.ItemData,
Footer: itemFooter,
}

Expand Down
65 changes: 17 additions & 48 deletions pkg/lib/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package telemetrylib
import (
"fmt"
"log/slog"
"strings"
"testing"

"github.com/SUSE/telemetry/pkg/config"
Expand Down Expand Up @@ -75,18 +74,16 @@ func (t *TelemetryProcessorTestSuite) AfterTest() {
func (t *TelemetryProcessorTestSuite) TestAddTelemetryDataItem() {
telemetryType := types.TelemetryType("SLE-SERVER-Test")
tags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}
payload := `
{
"ItemA": 1,
"ItemB": "b",
"ItemC": "c"
}
`
payload := types.NewTelemetryBlob([]byte(`{
"ItemA": 1,
"ItemB": "b",
"ItemC": "c"
}`))

// test the fileEnv.yaml based datastores
processor := t.defaultEnv.telemetryprocessor

err := processor.AddData(telemetryType, []byte(payload), tags)
err := processor.AddData(telemetryType, payload, tags)
if err != nil {
t.Fail("Test failed to add telemetry data item to datastore")
}
Expand All @@ -104,14 +101,12 @@ func (t *TelemetryProcessorTestSuite) TestCreateBundle() {

tags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}

payload := `
{
"field1": "example_data",
"field2": null,
"field3": [1, 2, 3]
}
`
err := telemetryprocessor.AddData(telemetryType, []byte(payload), tags)
payload := types.NewTelemetryBlob([]byte(`{
"field1": "example_data",
"field2": null,
"field3": [1, 2, 3]
}`))
err := telemetryprocessor.AddData(telemetryType, payload, tags)

if err != nil {
t.Fail("Test failed to add telemetry data item")
Expand All @@ -121,14 +116,12 @@ func (t *TelemetryProcessorTestSuite) TestCreateBundle() {
telemetryType = types.TelemetryType("SLE-SERVER-Pkg")
newtags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}

payload = `
{
payload = types.NewTelemetryBlob([]byte(`{
"ItemA": 1,
"ItemB": "b"
}
`
}`))

err = telemetryprocessor.AddData(telemetryType, []byte(payload), newtags)
err = telemetryprocessor.AddData(telemetryType, payload, newtags)

if err != nil {
t.Fail("Test failed to add telemetry data item")
Expand Down Expand Up @@ -331,30 +324,6 @@ func (t *TelemetryProcessorTestSuite) TestReport() {

}

func (t *TelemetryProcessorTestSuite) TestAddTelemetryDataItemInvalidPayload() {

payload := `
{
"field1": "example_data",
"field2": null
"field3": [1, 2, 3]
}
`
telemetryType := types.TelemetryType("SLE-SERVER-Pkg")
var tags types.Tags

processor := t.defaultEnv.telemetryprocessor
err := processor.AddData(telemetryType, []byte(payload), tags)

expectedmsg := "unable to unmarshal JSON"

// Check if the string contains the substring
if !strings.Contains(err.Error(), expectedmsg) {
t.T().Errorf("String '%s' does not contain substring '%s'", err.Error(), expectedmsg)
}

}

func addDataItems(totalItems int, processor TelemetryProcessor) error {

telemetryType := types.TelemetryType("SLE-SERVER-Test")
Expand All @@ -372,8 +341,8 @@ func addDataItems(totalItems int, processor TelemetryProcessor) error {
`
numItems := 1
for numItems <= totalItems {
formattedJSON := fmt.Sprintf(payload, utils.GenerateRandomString(3))
err := processor.AddData(telemetryType, []byte(formattedJSON), tags)
formattedJSON := types.NewTelemetryBlob([]byte(fmt.Sprintf(payload, utils.GenerateRandomString(3))))
err := processor.AddData(telemetryType, formattedJSON, tags)
if err != nil {
slog.Error(
"Failed to add the item",
Expand Down
2 changes: 1 addition & 1 deletion pkg/lib/limits.go → pkg/limits/limits.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package telemetrylib
package limits

import (
"errors"
Expand Down
70 changes: 70 additions & 0 deletions pkg/types/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package types

import (
"encoding/json"
"fmt"
"log/slog"

"github.com/SUSE/telemetry/pkg/limits"
)

type TelemetryBlob struct {
bytes []byte
}

func NewTelemetryBlob(jsonBlob []byte) *TelemetryBlob {
return &TelemetryBlob{bytes: jsonBlob}
}

func (tb *TelemetryBlob) String() string {
return string(tb.bytes)
}

func (tb *TelemetryBlob) Bytes() []byte {
return tb.bytes
}

func (tb *TelemetryBlob) errNotValidJson() error {
return fmt.Errorf("not valid JSON blob")
}

func (tb *TelemetryBlob) errNotJsonObject(err error) error {
return fmt.Errorf("not a JSON object: %s", err.Error())
}

func (tb *TelemetryBlob) errNotVersionedObject() error {
return fmt.Errorf("missing 'version' field in JSON object")
}

func (tb *TelemetryBlob) validJson() bool {
return json.Valid(tb.Bytes())
}

func (tb *TelemetryBlob) Valid() error {
var data map[string]any

if !tb.validJson() {
return tb.errNotValidJson()
}

if err := json.Unmarshal(tb.Bytes(), &data); err != nil {
slog.Debug(
"Not a valid JSON object",
slog.String("blob", tb.String()),
slog.String("error", err.Error()),
)
newErr := tb.errNotJsonObject(err)
return newErr
}

if _, found := data["version"]; !found {
slog.Debug("Not a valid JSON object", slog.String("blob", tb.String()))
return tb.errNotVersionedObject()
}

return nil
}

func (tb *TelemetryBlob) CheckLimits() error {
return limits.NewTelemetryDataLimits().CheckLimits(tb.Bytes())
}
5 changes: 4 additions & 1 deletion telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func Generate(telemetry types.TelemetryType, class TelemetryClass, content []byt
return err
}

// check that the telemetry content is valid
blob := types.NewTelemetryBlob(content)

// attempt to load the default config file
cfg, err := config.NewConfig(client.CONFIG_PATH)
if err != nil {
Expand Down Expand Up @@ -113,7 +116,7 @@ func Generate(telemetry types.TelemetryType, class TelemetryClass, content []byt
}

// generate the telemetry, storing it in the local data store
err = tc.Generate(telemetry, content, tags)
err = tc.Generate(telemetry, blob, tags)
if err != nil {
slog.Warn(
"Failed to generate telemetry",
Expand Down