Skip to content

Commit

Permalink
Tasks history. Fixes #17.
Browse files Browse the repository at this point in the history
  • Loading branch information
mlowicki committed Oct 30, 2018
1 parent 8bfd25d commit 9713e06
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 102 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ Options:
* digest (optional and used only if `scheme` is set to `"digest"`)
* user (optional)
* password (optional)
* taskttl (optional) - number of milliseconds record of runned task should be kept (`7 days` by default).

Example:
```javascript
Expand Down
26 changes: 26 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"sort"
"time"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -44,6 +45,7 @@ type storage interface {
GetJob(group string, project string, id string) (*model.Job, error)
SaveJob(j *model.Job) error
DeleteJob(group string, project string, id string) error
GetTasks(group string, project string, id string) ([]*model.Task, error)
}

type handler struct {
Expand Down Expand Up @@ -104,6 +106,29 @@ func getJobs(a authorizer, s storage, w http.ResponseWriter, r *http.Request) er
return nil
}

func getTasks(a authorizer, s storage, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
group := vars["group"]
project := vars["project"]
lvl, err := a.GetProjectAccessLevel(r, group, project)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return err
}
if lvl == auth.NoAccess {
w.WriteHeader(http.StatusForbidden)
return errForbidden
}
tasks, err := s.GetTasks(group, project, vars["id"])
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return err
}
sort.Slice(tasks, func(i, j int) bool { return tasks[i].End.Before(tasks[j].End) })
encoder(w).Encode(tasks)
return nil
}

func getGroupJobs(a authorizer, s storage, w http.ResponseWriter, r *http.Request) error {
jobs, err := s.GetGroupJobs(mux.Vars(r)["group"])
if err != nil {
Expand Down Expand Up @@ -450,6 +475,7 @@ func New(c *conf.API, s storage, state State) {
v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, getJob}).Methods("GET")
v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, deleteJob}).Methods("DELETE")
v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, updateJob}).Methods("PUT")
v1.Handle("/jobs/{group}/{project}/{id}/tasks", &handler{a, s, getTasks}).Methods("GET")
v1.Handle("/metrics", promhttp.Handler())
srv := &http.Server{
Handler: r,
Expand Down
40 changes: 30 additions & 10 deletions conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conf
import (
"encoding/json"
"io/ioutil"
"reflect"
"time"
)

Expand Down Expand Up @@ -67,6 +68,7 @@ type StorageZK struct {
Addrs []string
Timeout time.Duration
Auth ZKAuth
TaskTTL time.Duration
}

const CoordinatorBackendZK = "zookeeper"
Expand All @@ -77,10 +79,11 @@ type Coordinator struct {
}

type CoordinatorZK struct {
Dir string
Addrs []string
Timeout time.Duration
Auth ZKAuth
Dir string
Addrs []string
Timeout time.Duration
Auth ZKAuth
ElectionDir string
}

const (
Expand Down Expand Up @@ -167,6 +170,22 @@ type LoggingSentry struct {
Tags map[string]string
}

func millisecondFieldsToDuration(v reflect.Value) {
for i := 0; i < v.NumField(); i++ {
if v.Field(i).Kind() == reflect.Struct {
millisecondFieldsToDuration(v.Field(i))
} else {
if v.Field(i).Type() == reflect.TypeOf(time.Second) {
if v.Field(i).CanSet() {
d := v.Field(i).Interface().(time.Duration)
d *= time.Millisecond
v.Field(i).Set(reflect.ValueOf(d))
}
}
}
}
}

func New(path string) (*Conf, error) {
file, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -193,6 +212,7 @@ func New(path string) (*Conf, error) {
Auth: ZKAuth{
Scheme: ZKAuthSchemeWorld,
},
TaskTTL: 1000 * 3600 * 24, // 24h
},
},
Coordinator: Coordinator{
Expand All @@ -214,7 +234,7 @@ func New(path string) (*Conf, error) {
},
},
Mesos: Mesos{
FailoverTimeout: 1000 * 3600 * 24 * 7, // 7 days
FailoverTimeout: 1000 * 3600 * 24 * 7, // 7d
Roles: []string{"*"},
Auth: MesosAuth{
Type: MesosAuthTypeNone,
Expand All @@ -229,10 +249,10 @@ func New(path string) (*Conf, error) {
if err != nil {
return nil, err
}
conf.Mesos.FailoverTimeout *= time.Millisecond
conf.Secrets.Vault.Timeout *= time.Millisecond
conf.Storage.ZooKeeper.Timeout *= time.Millisecond
conf.Coordinator.ZooKeeper.Timeout *= time.Millisecond
conf.API.Auth.LDAP.Timeout *= time.Millisecond
conf.Coordinator.ZooKeeper.ElectionDir = "election/mesos_scheduler"
// All time.Duration fields from Conf should be in milliseconds so
// conversion to time elapsed in nanoseconds (represented by time.Duration)
// is needed.
millisecondFieldsToDuration(reflect.ValueOf(conf).Elem())
return conf, nil
}
2 changes: 1 addition & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type coordinator interface {
WaitUntilLeader() (context.Context, error)
WaitUntilLeader() context.Context
}

func New(c *conf.Coordinator) coordinator {
Expand Down
79 changes: 40 additions & 39 deletions coordinator/zk/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,50 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/mlowicki/rhythm/conf"
"github.com/mlowicki/rhythm/zkutil"
"github.com/samuel/go-zookeeper/zk"
log "github.com/sirupsen/logrus"
)

const electionDir = "election"

type Coordinator struct {
dir string
conn *zk.Conn
acl func(perms int32) []zk.ACL
ticket string
eventChan <-chan zk.Event
cancel context.CancelFunc
dir string
electionDir string
conn *zk.Conn
acl func(perms int32) []zk.ACL
ticket string
eventChan <-chan zk.Event
cancel context.CancelFunc
sync.Mutex
}

func (coord *Coordinator) WaitUntilLeader() (context.Context, error) {
isLeader, ch, err := coord.isLeader()
if err != nil {
return nil, err
}
if !isLeader {
for {
log.Println("Not elected as leader. Waiting...")
<-ch
isLeader, ch, err = coord.isLeader()
if err != nil {
return nil, err
} else if isLeader {
break
}
func (coord *Coordinator) WaitUntilLeader() context.Context {
for {
isLeader, ch, err := coord.isLeader()
if err != nil {
log.Errorf("Failed checking if elected as leader: %s", err)
<-time.After(time.Second)
continue
}
if isLeader {
break
}
log.Debug("Not elected as leader. Waiting...")
<-ch
continue
}
log.Println("Elected as leader")
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
coord.Lock()
coord.cancel = cancel
coord.Unlock()
return ctx, nil
return ctx
}

func (coord *Coordinator) register() error {
name, err := coord.conn.Create(coord.dir+"/"+electionDir+"/", []byte{}, zk.FlagEphemeral|zk.FlagSequence, coord.acl(zk.PermAll))
name, err := coord.conn.Create(coord.dir+"/"+coord.electionDir+"/", []byte{}, zk.FlagEphemeral|zk.FlagSequence, coord.acl(zk.PermAll))
if err != nil {
return err
}
Expand All @@ -73,7 +70,7 @@ func (coord *Coordinator) isLeader() (bool, <-chan zk.Event, error) {
return false, nil, fmt.Errorf("Registration failed: %s", err)
}
}
tickets, _, eventChan, err := coord.conn.ChildrenW(coord.dir + "/" + electionDir)
tickets, _, eventChan, err := coord.conn.ChildrenW(coord.dir + "/" + coord.electionDir)
if err != nil {
return false, nil, fmt.Errorf("Failed getting registration tickets: %s", err)
}
Expand Down Expand Up @@ -108,15 +105,18 @@ func (coord *Coordinator) initZK() error {
return err
}
}
path := coord.dir + "/" + electionDir
exists, _, err = coord.conn.Exists(path)
if err != nil {
return fmt.Errorf("Failed checking if election directory exists: %s", err)
}
if !exists {
_, err = coord.conn.Create(path, []byte{}, 0, coord.acl(zk.PermAll))
path := coord.dir
for _, part := range strings.Split(coord.electionDir, "/") {
path += "/" + part
exists, _, err = coord.conn.Exists(path)
if err != nil {
return fmt.Errorf("Failed creating election directory: %s", err)
return fmt.Errorf("Failed checking if election directory exists: %s", err)
}
if !exists {
_, err = coord.conn.Create(path, []byte{}, 0, coord.acl(zk.PermAll))
if err != nil {
return fmt.Errorf("Failed creating election directory: %s", err)
}
}
}
return nil
Expand All @@ -132,10 +132,11 @@ func New(c *conf.CoordinatorZK) (*Coordinator, error) {
return nil, err
}
coord := Coordinator{
conn: conn,
acl: acl,
dir: "/" + c.Dir,
eventChan: eventChan,
conn: conn,
acl: acl,
dir: "/" + c.Dir,
eventChan: eventChan,
electionDir: c.ElectionDir,
}
err = coord.initZK()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions dev/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"storage": {
"backend": "zookeeper",
"zookeeper": {
"taskttl": 300000,
"dir": "rhythm",
"timeout": 10000,
"auth": {
Expand Down
Loading

0 comments on commit 9713e06

Please sign in to comment.