Skip to content

Commit

Permalink
Implement all repository functions and move away from mongoClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiep committed Aug 11, 2020
1 parent 8195778 commit a03d116
Show file tree
Hide file tree
Showing 11 changed files with 1,522 additions and 469 deletions.
131 changes: 66 additions & 65 deletions odin-engine/api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,124 +2,125 @@ package api

import (
"bytes"
"fmt"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/theycallmemac/odin/odin-engine/pkg/resources"
"github.com/valyala/fasthttp"

"go.mongodb.org/mongo-driver/bson"
"github.com/valyala/fasthttp"
)

// AddJob is used to create a new job
func AddJob(ctx *fasthttp.RequestCtx) {
body := ctx.PostBody()
func AddJob(client *jobs.Client, ctx *fasthttp.RequestCtx) {
body := ctx.PostBody()
path := jobs.SetupEnvironment(body)
client, err := jobs.SetupClient()
id, err := client.Repo.CreateJob(ctx, body, path, "")
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprintf(ctx, "[FAILED] Job failed to deploy: %v\n", err)
} else {
status := jobs.InsertIntoMongo(client, body, path, "")
fmt.Fprintf(ctx, status)
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) deployed successfully\n", id)
}
}

// DeleteJob is used to delete a job
func DeleteJob(ctx *fasthttp.RequestCtx) {
func DeleteJob(client *jobs.Client, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), " ")
id, uid := args[0], args[1]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
os.RemoveAll("/etc/odin/jobs/" + id)
os.RemoveAll("/etc/odin/logs/" + id)
if err := client.Repo.DeleteJob(ctx, id, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to remove job (%s): %v\n", id, err)
} else {
os.RemoveAll("/etc/odin/jobs/" + id)
os.RemoveAll("/etc/odin/logs/" + id)
if jobs.DeleteJobByValue(client, bson.M{"id": id}, uid) {
fmt.Fprintf(ctx, "Job removed!\n")
} else {
fmt.Fprintf(ctx, "Job with that ID does not exist!\n")
}
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) removed\n", id)
}
}

// UpdateJob is used to update a job
func UpdateJob(ctx *fasthttp.RequestCtx) {
func UpdateJob(client *jobs.Client, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), "_")
id, name, description, schedule, uid := args[0], args[1], args[2], args[3], args[4]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
job := &repository.Job{
ID: id,
UID: uid,
}
if resources.NotEmpty(name) {
job.Name = name
}
if resources.NotEmpty(description) {
job.Description = description
}
if resources.NotEmpty(schedule) {
ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654)
resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml")))
os.Remove(".tmp.yml")
job.Schedule = resp
}
if err := client.Repo.UpdateJob(ctx, job); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
if resources.NotEmpty(name) {
job.Name = name
}
if resources.NotEmpty(description) {
job.Description = description
}
if resources.NotEmpty(schedule) {
ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654)
resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml")))
os.Remove(".tmp.yml")
job.Schedule = resp
}
_ = jobs.UpdateJobByValue(client, job)
fmt.Fprintf(ctx, "Updated job " + id + " successfully\n")
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id)
}
}

// GetJobDescription is used to show a job's description
func GetJobDescription(ctx *fasthttp.RequestCtx) {
func GetJobDescription(client *jobs.Client, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), "_")
id, uid := args[0], args[1]
client, err := jobs.SetupClient()
job, err := client.Repo.GetJobById(ctx, id, uid)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprintf(ctx, "[FAILED] Failed to get job (%s): %v", id, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
fmt.Fprintf(ctx, job.Name + " - " + job.Description + "\n")
fmt.Fprintf(ctx, job.Name+" - "+job.Description+"\n")
}
}

// UpdateJobRuns is used to update a job's run number
func UpdateJobRuns(ctx *fasthttp.RequestCtx) {
// TODO: Get and update job should be done as a transaction
func UpdateJobRuns(client *jobs.Client, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), " ")
id, runs, uid := args[0], args[1], args[2]
client, err := jobs.SetupClient()
job, err := client.Repo.GetJobById(ctx, id, uid)
if err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", id, err)
return
}
inc, err := strconv.Atoi(runs)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
fmt.Fprint(ctx, "Invalid run")
return
}
job.Runs = job.Runs + inc
if err := client.Repo.UpdateJob(ctx, job); err != nil {
fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err)
} else {
job := jobs.GetJobByValue(client, bson.M{"id": id}, uid)
inc, _ := strconv.Atoi(runs)
job.Runs = job.Runs + inc
_ = jobs.UpdateJobByValue(client, job)
fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id)
}
}

// ListJobs is used to list the current jobs running
func ListJobs(ctx *fasthttp.RequestCtx) {
client, err := jobs.SetupClient()
func ListJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) {
uid := string(ctx.PostBody())
jobList, err := client.Repo.GetUserJobs(ctx, uid)
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
} else {
jobList := jobs.GetUserJobs(client, string(ctx.PostBody()))
fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE"))
for _, job := range jobList {
linkLen := len(job.Links) - 1
if linkLen < 0 {
linkLen = 0
}
fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1]))
fmt.Fprintf(ctx, "[FAILED] Failed to get jobs for user %s\n", uid)
return
}
fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE"))
for _, job := range jobList {
linkLen := len(job.Links) - 1
if linkLen < 0 {
linkLen = 0
}
fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1]))
}
}

// GetJobLogs is used to retrieve the logs for a job
func GetJobLogs(ctx *fasthttp.RequestCtx) {
log, _ := ioutil.ReadFile("/etc/odin/logs/" + string(ctx.PostBody()))
fmt.Fprintf(ctx, "\n" + string(log) + "\n")
fmt.Fprintf(ctx, "\n"+string(log)+"\n")
}
24 changes: 10 additions & 14 deletions odin-engine/api/links.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
package api

import (
"fmt"
"fmt"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/valyala/fasthttp"
"github.com/valyala/fasthttp"
)

// LinkJobs is used to link two jobs together
func LinkJobs(ctx *fasthttp.RequestCtx) {
func LinkJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) {
split := strings.Split(string(ctx.PostBody()), "_")
from, to, uid := split[0], split[1], split[2]
client, _ := jobs.SetupClient()
updated := jobs.AddJobLink(client, from, to, uid)
if updated == 1 {
fmt.Fprintf(ctx, "Job " + from + " linked to " + to + "!\n")
if err := client.Repo.AddJobLink(ctx, from, to, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Job %s could not be linked to %s: %v\n", from, to, err)
} else {
fmt.Fprintf(ctx, "Job " + from + " could not be linked to " + to + ".\n")
fmt.Fprintf(ctx, "[SUCCESS] Job %s linked to %s\n", from, to)
}
}

// UnlinkJobs is used to delete a job link
func UnlinkJobs(ctx *fasthttp.RequestCtx) {
func UnlinkJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) {
split := strings.Split(string(ctx.PostBody()), "_")
from, to, uid := split[0], split[1], split[2]
client, _ := jobs.SetupClient()
updated := jobs.DeleteJobLink(client, from, to, uid)
if updated == 1 {
fmt.Fprintf(ctx, "Job " + to + " unlinked from " + from + "!\n")
if err := client.Repo.DeleteJobLink(ctx, from, to, uid); err != nil {
fmt.Fprintf(ctx, "[FAILED] Link %s could not be unlinked from %s: %v\n", to, from, err)
} else {
fmt.Fprintf(ctx, "Job " + to + " has no links!\n")
fmt.Fprintf(ctx, "[SUCCESS] Link %s unlinked from %s\n", to, from)
}
}
32 changes: 16 additions & 16 deletions odin-engine/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ func SetOdinEnv(mongoURL string) {
// Service is a type to be used by the raft consensus protocol
// consists of a base http address and a store in the finite state machine
type Service struct {
addr string
store fsm.Store
repo repository.Repository
addr string
store fsm.Store
client *jobs.Client
}

// NewService is used to initialize a new service struct
// parameters: addr (a string of a http address), store (a store of node details)
// returns: *Service (a newly initialized service struct)
func NewService(addr string, store fsm.Store, repo repository.Repository) *Service {
return &Service{
addr: addr,
store: store,
repo: repo,
addr: addr,
store: store,
client: jobs.NewClient(repo),
}
}

Expand All @@ -60,29 +60,29 @@ func (service *Service) Start() {
case "/execute/yaml":
ExecuteYaml(ctx)
case "/jobs/add":
AddJob(ctx)
AddJob(service.client, ctx)
case "/jobs/delete":
DeleteJob(ctx)
DeleteJob(service.client, ctx)
case "/jobs/info/update":
UpdateJob(ctx)
UpdateJob(service.client, ctx)
case "/jobs/info/description":
GetJobDescription(ctx)
GetJobDescription(service.client, ctx)
case "/jobs/info/runs":
UpdateJobRuns(ctx)
UpdateJobRuns(service.client, ctx)
case "/jobs/list":
ListJobs(ctx)
ListJobs(service.client, ctx)
case "/jobs/logs":
GetJobLogs(ctx)
case "/links/add":
LinkJobs(ctx)
LinkJobs(service.client, ctx)
case "/links/delete":
UnlinkJobs(ctx)
UnlinkJobs(service.client, ctx)
case "/schedule":
GetJobSchedule(ctx)
case "/stats/add":
AddJobStats(ctx)
AddJobStats(service.client, ctx)
case "/stats/get":
GetJobStats(service.repo, ctx)
GetJobStats(service.client, ctx)
}
}

Expand Down
48 changes: 14 additions & 34 deletions odin-engine/api/stats.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package api

import (
"context"
"fmt"
"strings"

"github.com/theycallmemac/odin/odin-engine/pkg/jobs"
"github.com/theycallmemac/odin/odin-engine/pkg/repository"
"github.com/valyala/fasthttp"

"go.mongodb.org/mongo-driver/mongo"
)

// JobStats is a type to be used for accessing and storing job stats information
Expand All @@ -22,45 +19,28 @@ type JobStats struct {
}

// AddJobStats is used to parse collected metrics
func AddJobStats(ctx *fasthttp.RequestCtx) {
func AddJobStats(client *jobs.Client, ctx *fasthttp.RequestCtx) {
args := strings.Split(string(ctx.PostBody()), ",")
typeOfValue, desc, value, id, timestamp := args[0], args[1], args[2], args[3], args[4]
client, err := jobs.SetupClient()
if err != nil {
fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n")
} else {
if InsertIntoMongo(client, typeOfValue, desc, value, id, timestamp) {
fmt.Fprintf(ctx, "200")
} else {
fmt.Fprintf(ctx, "500")
}
js := &repository.JobStats{
ID: id,
Description: desc,
Type: typeOfValue,
Value: value,
Timestamp: timestamp,
}
}

// InsertIntoMongo is used to add collected metrics to the observability collection
// parameters: client (a *mongo.Client), typeOfValue (a string of the type of value being stored), desc (a string describing the value being stored), value (a string of the value being stored), id (a string of the associated Job ID), timestamp (a string of the unix time at which the operation took place)
// returns: bool (true is successful, false if otherwise)
func InsertIntoMongo(client *mongo.Client, typeOfValue string, desc string, value string, id string, timestamp string) bool {
var js JobStats
js.ID = id
js.Description = desc
js.Type = typeOfValue
js.Value = value
js.Timestamp = timestamp
collection := client.Database("odin").Collection("observability")
_, err := collection.InsertOne(context.TODO(), js)
client.Disconnect(context.TODO())
if err != nil {
return false
if err := client.Repo.CreateJobStats(ctx, js); err != nil {
fmt.Fprintf(ctx, "500")
} else {
fmt.Fprintf(ctx, "200")
}
return true
}

// GetJobStats is used to show stats collected by a specified job
func GetJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) {
statsList, err := repo.GetJobStats(ctx, string(ctx.PostBody()))
func GetJobStats(client *jobs.Client, ctx *fasthttp.RequestCtx) {
statsList, err := client.Repo.GetJobStats(ctx, string(ctx.PostBody()))
if err != nil {
fmt.Fprintf(ctx, "[ERROR] Cannot get job stats: %v", err)
fmt.Fprintf(ctx, "[FAILED] Cannot get job stats: %v\n", err)
} else {
for _, stat := range statsList {
fmt.Fprintf(ctx, jobs.Format(stat.ID, stat.Description, stat.Type, stat.Value))
Expand Down
2 changes: 2 additions & 0 deletions odin-engine/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/theycallmemac/odin/odin-engine

go 1.13

replace github.com/hidal-go/hidalgo => github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06

require (
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/hashicorp/raft v1.1.2
Expand Down
4 changes: 4 additions & 0 deletions odin-engine/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:hBE4LGxApbZiV/3YoEPv7uYlUMWOogG1hwtkpiU87zQ=
github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:PmwFwRyxsWbp6bGa+cNbfDdJm3cyz41ut83kblKc6o8=
github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 h1:ZKIptNmc3Mfb8pNDMz3kM2dZAWliF53qZFno8XcqbRw=
github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
Loading

0 comments on commit a03d116

Please sign in to comment.