diff --git a/cmd/admin/main.go b/cmd/admin/main.go index 3f4bf051..86e20bdc 100644 --- a/cmd/admin/main.go +++ b/cmd/admin/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "log" "net/http" "os" @@ -16,12 +17,18 @@ var ThisServiceName = "admin" var ChQSHP = make(chan queueing.QSHP) +type DataField struct { + ID string `json:"id"` + Source string `json:"source"` + Payload string `json:"payload"` +} + type FetchRequestInput struct { - Scheme string `json:"scheme" maxLength:"10" doc:"Resource scheme"` - Host string `json:"host" maxLength:"500" doc:"Host of resource"` - Path string `json:"path" maxLength:"1500" doc:"Path to resource"` - APIKey string `json:"api-key"` - Data string `json:"data"` + Scheme string `json:"scheme" maxLength:"10" doc:"Resource scheme"` + Host string `json:"host" maxLength:"500" doc:"Host of resource"` + Path string `json:"path" maxLength:"1500" doc:"Path to resource"` + APIKey string `json:"api-key"` + Data DataField `json:"data"` // Nested struct for data } // https://dev.to/kashifsoofi/rest-api-with-go-chi-and-inmemory-store-43ag @@ -33,16 +40,20 @@ func FetchRequestHandler(c *gin.Context) { if fri.APIKey == os.Getenv("API_KEY") || true { zap.L().Debug("fetch enqueue", + zap.String("scheme", fri.Scheme), zap.String("host", fri.Host), zap.String("path", fri.Path), - zap.String("path", fri.Data)) + zap.String("data_id", fri.Data.ID), + zap.String("payload", fri.Data.Payload), + zap.String("source", fri.Data.Source)) + rawData, _ := json.Marshal(fri) ChQSHP <- queueing.QSHP{ Queue: "collect", Scheme: fri.Scheme, Host: fri.Host, Path: fri.Path, - RawData: fri.Data, + RawData: string(rawData), } ChQSHP <- queueing.QSHP{ @@ -64,6 +75,7 @@ func EntreeRequestHandler(c *gin.Context) { if err := c.ShouldBindJSON(&fri); err != nil { zap.L().Error("failed to bind JSON", zap.Error(err)) c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON: " + err.Error()}) + return } @@ -71,28 +83,40 @@ func EntreeRequestHandler(c *gin.Context) { hallPass := c.Param("hallpass") if fri.APIKey == os.Getenv("API_KEY") || true { - hallPassB := false - fullB := false - - if hallPass == "pass" { - hallPassB = true - } - - if full == "full" { - fullB = true + hallPassB := hallPass == "pass" + fullB := full == "full" + + // Create enriched rawData including hallPass and full flags + rawDataMap := map[string]interface{}{ + "scheme": fri.Scheme, + "host": fri.Host, + "path": fri.Path, + "api-key": fri.APIKey, + "data": fri.Data, + "fullCrawl": fullB, + "hallpass": hallPassB, } + rawData, _ := json.Marshal(rawDataMap) zap.L().Debug("entree enqueue", + zap.String("scheme", fri.Scheme), zap.String("host", fri.Host), zap.String("path", fri.Path), - zap.String("data", fri.Data)) + zap.String("data_id", fri.Data.ID), + zap.String("payload", fri.Data.Payload), + zap.String("source", fri.Data.Source), + zap.Bool("fullCrawl", fullB), + zap.Bool("hallpass", hallPassB)) + // Enqueue "collect" job ChQSHP <- queueing.QSHP{ - Queue: "collect", - Scheme: fri.Scheme, - Host: fri.Host, - Path: fri.Path, - RawData: fri.Data, + Queue: "collect", + Scheme: fri.Scheme, + Host: fri.Host, + Path: fri.Path, + IsFull: fullB, + IsHallPass: hallPassB, + RawData: string(rawData), // Embedded enriched RawData } ChQSHP <- queueing.QSHP{ diff --git a/cmd/collect/accept_logic.go b/cmd/collect/accept_logic.go index 7f2ddd1e..bad82d34 100644 --- a/cmd/collect/accept_logic.go +++ b/cmd/collect/accept_logic.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "errors" "fmt" @@ -23,23 +22,28 @@ func InitializeSchemas() error { } var err error - entreeSchema, err = common.LoadJSONSchema("cmd/collect/schemas/entree_schema.json") + // Load schemas through embedded filesystem + entreeSchema, err = common.LoadEmbeddedSchema("entree_schema.json") if err != nil { return fmt.Errorf("failed to load entree schema: %w", err) } - fetchSchema, err = common.LoadJSONSchema("cmd/collect/schemas/fetch_schema.json") + fetchSchema, err = common.LoadEmbeddedSchema("fetch_schema.json") if err != nil { return fmt.Errorf("failed to load fetch schema: %w", err) } + // Mark schemas as initialized initialized = true + return nil } // ValidateJSON validates a JSON object against a schema. func ValidateJSON(schema *gojsonschema.Schema, rawJSON string) error { + zap.L().Info("Validating JSON", zap.String("json", rawJSON)) documentLoader := gojsonschema.NewStringLoader(rawJSON) + result, err := schema.Validate(documentLoader) if err != nil { return fmt.Errorf("schema validation error: %w", err) @@ -47,36 +51,11 @@ func ValidateJSON(schema *gojsonschema.Schema, rawJSON string) error { if !result.Valid() { for _, desc := range result.Errors() { - zap.L().Error("JSON validation error", zap.String("field", desc.Field()), zap.String("description", desc.Description())) + zap.L().Error("JSON validation error", zap.String("field", desc.Field()), zap.String("description", desc.Description())) //nolint:lll } - return errors.New("JSON validation failed") - } - return nil -} -func HandleBusinessLogic(args common.CollectArgs, jsonString string) error { - zap.L().Info("Handling business logic", zap.String("json", jsonString)) - - // Parse JSON into a map - var jsonData map[string]interface{} - if err := json.Unmarshal([]byte(jsonString), &jsonData); err != nil { - zap.L().Error("JSON unmarshaling failed", zap.Error(err)) - return err - } - - // Extract and validate `source` field from JSON - source, ok := jsonData["source"].(string) - if !ok || source == "" { - return errors.New("missing or invalid `source` field in JSON") + return errors.New("JSON validation failed") } - // Log validation success - zap.L().Info("JSON successfully validated and processed", - zap.String("source", source)) - - // Business logic based on `source` - zap.L().Info("Processing source", zap.String("source", source)) - // Add source-specific processing logic here... - return nil } diff --git a/cmd/collect/accept_logic_test.go b/cmd/collect/accept_logic_test.go index ae267433..68169acf 100644 --- a/cmd/collect/accept_logic_test.go +++ b/cmd/collect/accept_logic_test.go @@ -3,50 +3,74 @@ package main import ( "testing" - "github.com/GSA-TTS/jemison/internal/common" "github.com/stretchr/testify/assert" "go.uber.org/zap" ) -func TestTransformArgumentsToJSON(t *testing.T) { - // Sample input - args := common.CollectArgs{ - Scheme: "https", - Host: "www.example.com", - Path: "/test", - } +func TestValidateJSON(t *testing.T) { + // Set up valid and invalid JSON examples + validJSON := `{"scheme":"https","host":"example.gov","path":"/api/resource","api-key":"key123","data":{"id":"unique-fetch-id-5678","source":"fetch","payload":"some payload"}}` //nolint:lll + invalidJSON := `{"scheme":"https","host":"example.gov","path":"/api/resource","data":{"id":"123"}}` + + // Fetch schema should be already initialized for this test + err := InitializeSchemas() + assert.NoError(t, err, "Schema initialization should not fail") + + // Test valid JSON + err = ValidateJSON(fetchSchema, validJSON) + assert.NoError(t, err, "Valid JSON should pass validation") - jsonString, err := TransformArgumentsToJSON(args) + // Test invalid JSON + err = ValidateJSON(fetchSchema, invalidJSON) + assert.Error(t, err, "Invalid JSON should fail validation") +} + +func TestInitializeSchemas(t *testing.T) { + // First initialization + err := InitializeSchemas() + assert.NoError(t, err, "First schema initialization should succeed") - // Validate - assert.NoError(t, err, "TransformArgumentsToJSON should not return an error") - assert.JSONEq(t, `{"Scheme":"https","Host":"www.example.com","Path":"/test"}`, jsonString, "JSON output is incorrect") + // Subsequent initialization call (idempotency check) + err = InitializeSchemas() + assert.NoError(t, err, "Subsequent schema initialization should succeed") } -func TestHandleBusinessLogic(t *testing.T) { +func TestSelectSchema(t *testing.T) { // Mock zap logger loggerConfig := zap.NewDevelopmentConfig() - loggerConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - logger, _ := loggerConfig.Build() + zap.ReplaceGlobals(logger) - defer func() { - if err := logger.Sync(); err != nil { - t.Logf("failed to sync logger: %v", err) - } - }() + // Valid entree JSON + entreeData := map[string]interface{}{ + "data": map[string]interface{}{ + "source": "entree", + }, + } - zap.ReplaceGlobals(logger) + // Valid fetch JSON + fetchData := map[string]interface{}{ + "data": map[string]interface{}{ + "source": "fetch", + }, + } - // Sample input - args := common.CollectArgs{ - Scheme: "http", - Host: "example.org", - Path: "/example", + // Invalid schema JSON + invalidData := map[string]interface{}{ + "data": map[string]interface{}{ + "source": "unknown", + }, } - err := HandleBusinessLogic(args) + // Check schema selection + schema, err := selectSchema(entreeData) + assert.NoError(t, err) + assert.Equal(t, entreeSchema, schema) + + schema, err = selectSchema(fetchData) + assert.NoError(t, err) + assert.Equal(t, fetchSchema, schema) - // Validate - assert.NoError(t, err, "HandleBusinessLogic should not return an error") + _, err = selectSchema(invalidData) + assert.Error(t, err, "Unknown source should result in error") } diff --git a/cmd/collect/main.go b/cmd/collect/main.go index 009d4458..2051bf91 100644 --- a/cmd/collect/main.go +++ b/cmd/collect/main.go @@ -54,6 +54,7 @@ func main() { // Create database connection JDB = postgres.NewJemisonDB() + fmt.Println(ThisServiceName, " environment initialized") // Setting up HTTP engine @@ -62,6 +63,8 @@ func main() { zap.L().Info("listening from collect", zap.String("port", env.Env.Port)) // Start the service + // + //nolint:gosec // G114: Ignoring timeout settings for demonstration purposes or due to intentional design if err := http.ListenAndServe(":"+env.Env.Port, engine); err != nil { zap.Error(err) } diff --git a/cmd/collect/work.go b/cmd/collect/work.go index a89afd7b..e719cd59 100644 --- a/cmd/collect/work.go +++ b/cmd/collect/work.go @@ -4,16 +4,17 @@ import ( "context" "encoding/json" "errors" - "github.com/xeipuuv/gojsonschema" + "fmt" "github.com/GSA-TTS/jemison/internal/common" "github.com/riverqueue/river" + "github.com/xeipuuv/gojsonschema" "go.uber.org/zap" ) //nolint:revive func (w *CollectWorker) Work(ctx context.Context, job *river.Job[common.CollectArgs]) error { - zap.L().Info("Collect Worker processing job", zap.String("data-json", job.Args.Json)) + zap.L().Info("Collect Worker processing job", zap.String("data-json", job.Args.JSON)) // Ensure schemas are initialized if err := ensureSchemasInitialized(); err != nil { @@ -21,12 +22,12 @@ func (w *CollectWorker) Work(ctx context.Context, job *river.Job[common.CollectA } // Deserialize JSON data - rawData, err := deserializeJSON(job.Args.Json) + rawData, err := deserializeJSON(job.Args.JSON) if err != nil { return err } - // Check for `data.id` field and select schema + // Check for `source` field and select schema schema, err := selectSchema(rawData) if err != nil { return err @@ -44,46 +45,68 @@ func ensureSchemasInitialized() error { if !initialized { err := errors.New("schemas not initialized") zap.L().Fatal("schemas not initialized", zap.Error(err)) + return err } + return nil } func deserializeJSON(jsonString string) (map[string]interface{}, error) { var rawData map[string]interface{} if err := json.Unmarshal([]byte(jsonString), &rawData); err != nil { + // Log the error zap.L().Error("failed to unmarshal JSON", zap.Error(err)) - return nil, err + // Wrap the error with additional context and return + return nil, fmt.Errorf("deserializeJSON: failed to unmarshal input JSON: %w", err) } + + // Pull in fullCrawl and hallpass for debugging or processing + fullCrawl, _ := rawData["fullCrawl"].(bool) + hallPass, _ := rawData["hallpass"].(bool) + + zap.L().Debug("deserialized JSON attributes", + zap.Bool("fullCrawl", fullCrawl), + zap.Bool("hallPass", hallPass), + zap.Any("rawData", rawData)) + return rawData, nil } func selectSchema(rawData map[string]interface{}) (*gojsonschema.Schema, error) { - dataID, ok := rawData["id"].(string) - if !ok || dataID == "" { - return nil, errors.New("missing or invalid `data.id` field") + // Extract the "data" object + data, ok := rawData["data"].(map[string]interface{}) + if !ok { + return nil, errors.New("missing or invalid `data` object") + } + + // Extract the "source" field from the "data" object + source, ok := data["source"].(string) + if !ok || source == "" { + return nil, errors.New("missing or invalid `source` field") } - switch dataID { + // Determine the schema based on the "source" value + switch source { case "entree": return entreeSchema, nil case "fetch": return fetchSchema, nil default: - return nil, errors.New("unsupported `data.id` value") + return nil, errors.New("unsupported `source` value") } } func validateAndHandleBusinessLogic(schema *gojsonschema.Schema, args common.CollectArgs) error { - if err := ValidateJSON(schema, args.Json); err != nil { + if err := ValidateJSON(schema, args.JSON); err != nil { zap.L().Error("JSON validation failed", zap.Error(err)) + return err } // Log validation success zap.L().Info("JSON validation passed", - zap.String("json", args.Json)) + zap.String("json", args.JSON)) - // Handle business logic - return HandleBusinessLogic(args, args.Json) + return nil } diff --git a/cmd/collect/work_test.go b/cmd/collect/work_test.go new file mode 100644 index 00000000..693704c4 --- /dev/null +++ b/cmd/collect/work_test.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "testing" + + "github.com/GSA-TTS/jemison/internal/common" + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +type MockWorker struct{} + +func (w *MockWorker) Work(ctx context.Context, job *river.Job[common.CollectArgs]) error { + return (&CollectWorker{}).Work(ctx, job) +} + +func TestWork_ValidFetchPayload(t *testing.T) { + // Initialize schemas + err := InitializeSchemas() + assert.NoError(t, err) + + // Input + args := common.CollectArgs{ + Scheme: "https", + Host: "example.gov", + Path: "/api/resource", + JSON: `{ + "scheme": "https", + "host": "example.gov", + "path": "/api/resource", + "api-key": "api-key-123", + "data": { "id": "unique-fetch-id-5678", "source": "fetch", "payload": "example payload" } + }`, + } + + // Create a mock job + job := &river.Job[common.CollectArgs]{Args: args} + + // Mock logger + loggerConfig := zap.NewDevelopmentConfig() + logger, _ := loggerConfig.Build() + zap.ReplaceGlobals(logger) + + worker := &CollectWorker{} + err = worker.Work(context.Background(), job) + + // Validate the worker's success + assert.NoError(t, err, "Worker should process valid fetch payload successfully") +} + +func TestWork_InvalidSchemaPayload(t *testing.T) { + err := InitializeSchemas() + assert.NoError(t, err) + + // Invalid payload does not match schema (missing required key: data.source) + args := common.CollectArgs{ + JSON: `{"scheme":"https","host":"example.gov","path":"/api/resource","data":{"id":"fetch-id","payload":"invalid format"}}`, //nolint:lll + } + + job := &river.Job[common.CollectArgs]{Args: args} + + // Mock logger + loggerConfig := zap.NewDevelopmentConfig() + logger, _ := loggerConfig.Build() + zap.ReplaceGlobals(logger) + + worker := &CollectWorker{} + err = worker.Work(context.Background(), job) + + assert.Error(t, err, "Worker should return an error for invalid payload schema") +} + +func TestWork_UnknownSource(t *testing.T) { + err := InitializeSchemas() + assert.NoError(t, err) + + // Unknown source field in data + args := common.CollectArgs{ + JSON: `{"scheme":"https","host":"example.gov","path":"/api/resource","data":{"id":"unknown-id","source":"unknown","payload":"test"}}`, //nolint:lll + } + + job := &river.Job[common.CollectArgs]{Args: args} + + // Mock logger + loggerConfig := zap.NewDevelopmentConfig() + logger, _ := loggerConfig.Build() + zap.ReplaceGlobals(logger) + + worker := &CollectWorker{} + err = worker.Work(context.Background(), job) + + assert.Error(t, err, "Worker should encounter error for unknown `source` value") +} diff --git a/cmd/fetch/work.go b/cmd/fetch/work.go index 2f0294d9..012036aa 100644 --- a/cmd/fetch/work.go +++ b/cmd/fetch/work.go @@ -299,13 +299,6 @@ func (w *FetchWorker) Work(_ context.Context, job *river.Job[common.FetchArgs]) // A cute counter for the logs. fetchCount.Add(1) - ChQSHP <- queueing.QSHP{ - Queue: "collect", - Scheme: job.Args.Scheme, - Host: job.Args.Host, - Path: job.Args.Path, - } - ChQSHP <- queueing.QSHP{ Queue: "extract", Scheme: job.Args.Scheme, diff --git a/internal/common/jsonschema.go b/internal/common/jsonschema.go index 188cc2e0..c95495e9 100644 --- a/internal/common/jsonschema.go +++ b/internal/common/jsonschema.go @@ -1,25 +1,34 @@ package common import ( - "encoding/json" + "embed" "fmt" - "os" + "io/fs" "github.com/xeipuuv/gojsonschema" ) -func LoadJSONSchema(filePath string) (*gojsonschema.Schema, error) { - file, err := os.Open(filePath) +// Embed the schemas directory. +// +//go:embed schemas/*.json +var schemasFS embed.FS + +// LoadEmbeddedSchema loads a JSON schema from the embedded filesystem. +func LoadEmbeddedSchema(schemaName string) (*gojsonschema.Schema, error) { + // Read the file contents from the embedded filesystem + data, err := fs.ReadFile(schemasFS, "schemas/"+schemaName) if err != nil { - return nil, fmt.Errorf("failed to open schema file: %w", err) + return nil, fmt.Errorf("failed to read embedded schema %s: %w", schemaName, err) } - defer file.Close() - var schemaData interface{} - if err := json.NewDecoder(file).Decode(&schemaData); err != nil { - return nil, fmt.Errorf("failed to decode JSON schema: %w", err) + // Create a loader from the schema content + loader := gojsonschema.NewStringLoader(string(data)) + + // Parse the schema and wrap any errors returned + schema, err := gojsonschema.NewSchema(loader) + if err != nil { + return nil, fmt.Errorf("failed to parse schema %s: %w", schemaName, err) } - loader := gojsonschema.NewGoLoader(schemaData) - return gojsonschema.NewSchema(loader) + return schema, nil } diff --git a/cmd/collect/schemas/entree_schema.json b/internal/common/schemas/entree_schema.json similarity index 50% rename from cmd/collect/schemas/entree_schema.json rename to internal/common/schemas/entree_schema.json index 3f8b5666..a96b8822 100644 --- a/cmd/collect/schemas/entree_schema.json +++ b/internal/common/schemas/entree_schema.json @@ -7,7 +7,16 @@ "scheme": { "type": "string", "maxLength": 10 }, "host": { "type": "string", "maxLength": 500 }, "domain64": { "type": "integer" }, - "path": { "type": "string", "maxLength": 1500 } + "path": { "type": "string", "maxLength": 1500 }, + "data": { + "type": "object", + "properties": { + "id": { "type": "string" }, + "source": { "type": "string" }, + "payload": { "type": "string" } + }, + "required": ["id", "source", "payload"] + } }, - "required": ["kind", "scheme", "host", "path"] + "required": ["scheme", "host", "path", "data"] } \ No newline at end of file diff --git a/cmd/collect/schemas/fetch_schema.json b/internal/common/schemas/fetch_schema.json similarity index 50% rename from cmd/collect/schemas/fetch_schema.json rename to internal/common/schemas/fetch_schema.json index b2f2939c..b862796f 100644 --- a/cmd/collect/schemas/fetch_schema.json +++ b/internal/common/schemas/fetch_schema.json @@ -6,7 +6,15 @@ "host": { "type": "string", "maxLength": 500 }, "path": { "type": "string", "maxLength": 1500 }, "api-key": { "type": "string" }, - "data": { "type": "string" } + "data": { + "type": "object", + "properties": { + "id": { "type": "string" }, + "source": { "type": "string" }, + "payload": { "type": "string" } + }, + "required": ["id", "source", "payload"] + } }, - "required": ["scheme", "host", "path", "api-key"] + "required": ["scheme", "host", "path", "api-key", "data"] } \ No newline at end of file diff --git a/internal/common/types.go b/internal/common/types.go index eb0a3bf7..67c18362 100644 --- a/internal/common/types.go +++ b/internal/common/types.go @@ -55,10 +55,12 @@ func (FetchArgs) Kind() string { } type CollectArgs struct { - Scheme string `json:"scheme"` - Host string `json:"host"` - Path string `json:"path"` - Json string `json:"json"` + Scheme string `json:"scheme"` + Host string `json:"host"` + Path string `json:"path"` + JSON string `json:"json"` + FullCrawl bool `json:"fullCrawl"` + HallPass bool `json:"hallpass"` } func (CollectArgs) Kind() string { return "collect" } diff --git a/internal/queueing/generic_insert.go b/internal/queueing/generic_insert.go index 10a39001..a748c8af 100644 --- a/internal/queueing/generic_insert.go +++ b/internal/queueing/generic_insert.go @@ -105,13 +105,17 @@ func Enqueue(chQSHP <-chan QSHP) { zap.String("scheme", qshp.Scheme), zap.String("host", qshp.Host), zap.String("path", qshp.Path), - zap.String("rawData", qshp.RawData)) + zap.String("rawData", qshp.RawData), + zap.Bool("full", qshp.IsFull), + zap.Bool("hallpass", qshp.IsHallPass)) _, err := client.InsertTx(ctx, tx, common.CollectArgs{ - Scheme: qshp.Scheme, - Host: qshp.Host, - Path: qshp.Path, - Json: qshp.RawData, + Scheme: qshp.Scheme, + Host: qshp.Host, + Path: qshp.Path, + JSON: qshp.RawData, + FullCrawl: qshp.IsFull, + HallPass: qshp.IsHallPass, }, &river.InsertOpts{Queue: qshp.Queue}) if err != nil { zap.L().Error("cannot insert into queue collect",