diff --git a/ds.go b/ds.go index 1597f63..518ea21 100644 --- a/ds.go +++ b/ds.go @@ -1,12 +1,6 @@ package kail import ( - "context" - - logutil "github.com/boz/go-logutil" - "github.com/boz/kcache/filter" - "github.com/boz/kcache/join" - "github.com/boz/kcache/nsname" "github.com/boz/kcache/types/daemonset" "github.com/boz/kcache/types/deployment" "github.com/boz/kcache/types/node" @@ -14,25 +8,8 @@ import ( "github.com/boz/kcache/types/replicaset" "github.com/boz/kcache/types/replicationcontroller" "github.com/boz/kcache/types/service" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes" ) -type DSBuilder interface { - WithIgnore(selectors ...labels.Selector) DSBuilder - WithSelectors(selectors ...labels.Selector) DSBuilder - WithPods(id ...nsname.NSName) DSBuilder - WithNamespace(name ...string) DSBuilder - WithService(id ...nsname.NSName) DSBuilder - WithNode(name ...string) DSBuilder - WithRC(id ...nsname.NSName) DSBuilder - WithRS(id ...nsname.NSName) DSBuilder - WithDS(id ...nsname.NSName) DSBuilder - WithDeployment(id ...nsname.NSName) DSBuilder - - Create(ctx context.Context, cs kubernetes.Interface) (DS, error) -} - type DS interface { Pods() pod.Controller Ready() <-chan struct{} @@ -40,252 +17,6 @@ type DS interface { Shutdown() } -type dsBuilder struct { - ignore []labels.Selector - selectors []labels.Selector - pods []nsname.NSName - namespaces []string - services []nsname.NSName - nodes []string - rcs []nsname.NSName - rss []nsname.NSName - dss []nsname.NSName - deployments []nsname.NSName -} - -func NewDSBuilder() DSBuilder { - return &dsBuilder{} -} - -func (b *dsBuilder) WithIgnore(selector ...labels.Selector) DSBuilder { - b.ignore = append(b.ignore, selector...) - return b -} - -func (b *dsBuilder) WithSelectors(selectors ...labels.Selector) DSBuilder { - b.selectors = append(b.selectors, selectors...) - return b -} - -func (b *dsBuilder) WithPods(id ...nsname.NSName) DSBuilder { - b.pods = append(b.pods, id...) - return b -} - -func (b *dsBuilder) WithNamespace(name ...string) DSBuilder { - b.namespaces = append(b.namespaces, name...) - return b -} - -func (b *dsBuilder) WithService(id ...nsname.NSName) DSBuilder { - b.services = append(b.services, id...) - return b -} - -func (b *dsBuilder) WithNode(name ...string) DSBuilder { - b.nodes = append(b.nodes, name...) - return b -} - -func (b *dsBuilder) WithRC(id ...nsname.NSName) DSBuilder { - b.rcs = append(b.rcs, id...) - return b -} - -func (b *dsBuilder) WithRS(id ...nsname.NSName) DSBuilder { - b.rss = append(b.rss, id...) - return b -} - -func (b *dsBuilder) WithDS(id ...nsname.NSName) DSBuilder { - b.dss = append(b.dss, id...) - return b -} - -func (b *dsBuilder) WithDeployment(id ...nsname.NSName) DSBuilder { - b.deployments = append(b.deployments, id...) - return b -} - -func (b *dsBuilder) Create(ctx context.Context, cs kubernetes.Interface) (DS, error) { - log := logutil.FromContextOrDefault(ctx) - - ds := &datastore{ - readych: make(chan struct{}), - donech: make(chan struct{}), - } - - base, err := pod.NewController(ctx, log, cs, "") - if err != nil { - return nil, log.Err(err, "base pod controller") - } - - ds.podBase = base - ds.pods, err = base.CloneWithFilter(filter.Null()) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "null filter") - } - - if len(b.ignore) != 0 { - filters := make([]filter.Filter, 0, len(b.ignore)) - for _, selector := range b.ignore { - filters = append(filters, filter.Not(filter.Selector(selector))) - } - ds.pods, err = ds.pods.CloneWithFilter(filter.And(filters...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "labels filter") - } - } - - if len(b.selectors) != 0 { - filters := make([]filter.Filter, 0, len(b.selectors)) - for _, selector := range b.selectors { - filters = append(filters, filter.Selector(selector)) - } - ds.pods, err = ds.pods.CloneWithFilter(filter.And(filters...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "labels filter") - } - } - - if len(b.pods) != 0 { - ds.pods, err = ds.pods.CloneWithFilter(filter.NSName(b.pods...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "pods filter") - } - } - - if sz := len(b.namespaces); sz > 0 { - ids := make([]nsname.NSName, 0, sz) - for _, ns := range b.namespaces { - ids = append(ids, nsname.New(ns, "")) - } - - ds.pods, err = ds.pods.CloneWithFilter(filter.NSName(ids...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "namespace filter") - } - } - - if len(b.nodes) != 0 { - ds.pods, err = ds.pods.CloneWithFilter(pod.NodeFilter(b.nodes...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "node filter") - } - } - - if len(b.services) != 0 { - ds.servicesBase, err = service.NewController(ctx, log, cs, "") - if err != nil { - ds.closeAll() - return nil, log.Err(err, "service base controller") - } - - ds.services, err = ds.servicesBase.CloneWithFilter(filter.NSName(b.services...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "service controller") - } - - ds.pods, err = join.ServicePods(ctx, ds.services, ds.pods) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "service join") - } - } - - if len(b.rcs) != 0 { - ds.rcsBase, err = replicationcontroller.NewController(ctx, log, cs, "") - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rc base controller") - } - - ds.rcs, err = ds.rcsBase.CloneWithFilter(filter.NSName(b.rcs...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rc controller") - } - - ds.pods, err = join.RCPods(ctx, ds.rcs, ds.pods) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rc join") - } - } - - if len(b.rss) != 0 { - ds.rssBase, err = replicaset.NewController(ctx, log, cs, "") - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rs base controller") - } - - ds.rss, err = ds.rssBase.CloneWithFilter(filter.NSName(b.rss...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rs controller") - } - - ds.pods, err = join.RSPods(ctx, ds.rss, ds.pods) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "rs join") - } - } - - if len(b.dss) != 0 { - ds.dssBase, err = daemonset.NewController(ctx, log, cs, "") - if err != nil { - ds.closeAll() - return nil, log.Err(err, "ds base controller") - } - - ds.dss, err = ds.dssBase.CloneWithFilter(filter.NSName(b.dss...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "ds controller") - } - - ds.pods, err = join.DaemonSetPods(ctx, ds.dss, ds.pods) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "ds join") - } - } - - if len(b.deployments) != 0 { - ds.deploymentsBase, err = deployment.NewController(ctx, log, cs, "") - if err != nil { - ds.closeAll() - return nil, log.Err(err, "deployment base controller") - } - - ds.deployments, err = ds.deploymentsBase.CloneWithFilter(filter.NSName(b.deployments...)) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "deployment controller") - } - - ds.pods, err = join.DeploymentPods(ctx, ds.deployments, ds.pods) - if err != nil { - ds.closeAll() - return nil, log.Err(err, "deployment join") - } - } - - go ds.waitReadyAll() - go ds.waitDoneAll() - - return ds, nil -} - type datastore struct { podBase pod.Controller servicesBase service.Controller @@ -307,6 +38,12 @@ type datastore struct { donech chan struct{} } +type cacheController interface { + Close() + Done() <-chan struct{} + Ready() <-chan struct{} +} + func (ds *datastore) Pods() pod.Controller { return ds.pods } @@ -374,9 +111,3 @@ func (ds *datastore) controllers() []cacheController { } return existing } - -type cacheController interface { - Close() - Done() <-chan struct{} - Ready() <-chan struct{} -} diff --git a/ds_builder.go b/ds_builder.go new file mode 100644 index 0000000..a889bb5 --- /dev/null +++ b/ds_builder.go @@ -0,0 +1,279 @@ +package kail + +import ( + "context" + + logutil "github.com/boz/go-logutil" + "github.com/boz/kcache/filter" + "github.com/boz/kcache/join" + "github.com/boz/kcache/nsname" + "github.com/boz/kcache/types/daemonset" + "github.com/boz/kcache/types/deployment" + "github.com/boz/kcache/types/pod" + "github.com/boz/kcache/types/replicaset" + "github.com/boz/kcache/types/replicationcontroller" + "github.com/boz/kcache/types/service" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" +) + +type DSBuilder interface { + WithIgnore(selectors ...labels.Selector) DSBuilder + WithSelectors(selectors ...labels.Selector) DSBuilder + WithPods(id ...nsname.NSName) DSBuilder + WithNamespace(name ...string) DSBuilder + WithService(id ...nsname.NSName) DSBuilder + WithNode(name ...string) DSBuilder + WithRC(id ...nsname.NSName) DSBuilder + WithRS(id ...nsname.NSName) DSBuilder + WithDS(id ...nsname.NSName) DSBuilder + WithDeployment(id ...nsname.NSName) DSBuilder + + Create(ctx context.Context, cs kubernetes.Interface) (DS, error) +} + +func NewDSBuilder() DSBuilder { + return &dsBuilder{} +} + +type dsBuilder struct { + ignore []labels.Selector + selectors []labels.Selector + pods []nsname.NSName + namespaces []string + services []nsname.NSName + nodes []string + rcs []nsname.NSName + rss []nsname.NSName + dss []nsname.NSName + deployments []nsname.NSName +} + +func (b *dsBuilder) WithIgnore(selector ...labels.Selector) DSBuilder { + b.ignore = append(b.ignore, selector...) + return b +} + +func (b *dsBuilder) WithSelectors(selectors ...labels.Selector) DSBuilder { + b.selectors = append(b.selectors, selectors...) + return b +} + +func (b *dsBuilder) WithPods(id ...nsname.NSName) DSBuilder { + b.pods = append(b.pods, id...) + return b +} + +func (b *dsBuilder) WithNamespace(name ...string) DSBuilder { + b.namespaces = append(b.namespaces, name...) + return b +} + +func (b *dsBuilder) WithService(id ...nsname.NSName) DSBuilder { + b.services = append(b.services, id...) + return b +} + +func (b *dsBuilder) WithNode(name ...string) DSBuilder { + b.nodes = append(b.nodes, name...) + return b +} + +func (b *dsBuilder) WithRC(id ...nsname.NSName) DSBuilder { + b.rcs = append(b.rcs, id...) + return b +} + +func (b *dsBuilder) WithRS(id ...nsname.NSName) DSBuilder { + b.rss = append(b.rss, id...) + return b +} + +func (b *dsBuilder) WithDS(id ...nsname.NSName) DSBuilder { + b.dss = append(b.dss, id...) + return b +} + +func (b *dsBuilder) WithDeployment(id ...nsname.NSName) DSBuilder { + b.deployments = append(b.deployments, id...) + return b +} + +func (b *dsBuilder) Create(ctx context.Context, cs kubernetes.Interface) (DS, error) { + log := logutil.FromContextOrDefault(ctx) + + ds := &datastore{ + readych: make(chan struct{}), + donech: make(chan struct{}), + } + + base, err := pod.NewController(ctx, log, cs, "") + if err != nil { + return nil, log.Err(err, "base pod controller") + } + + ds.podBase = base + ds.pods, err = base.CloneWithFilter(filter.Null()) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "null filter") + } + + if len(b.ignore) != 0 { + filters := make([]filter.Filter, 0, len(b.ignore)) + for _, selector := range b.ignore { + filters = append(filters, filter.Not(filter.Selector(selector))) + } + ds.pods, err = ds.pods.CloneWithFilter(filter.And(filters...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "labels filter") + } + } + + if len(b.selectors) != 0 { + filters := make([]filter.Filter, 0, len(b.selectors)) + for _, selector := range b.selectors { + filters = append(filters, filter.Selector(selector)) + } + ds.pods, err = ds.pods.CloneWithFilter(filter.And(filters...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "labels filter") + } + } + + if len(b.pods) != 0 { + ds.pods, err = ds.pods.CloneWithFilter(filter.NSName(b.pods...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "pods filter") + } + } + + if sz := len(b.namespaces); sz > 0 { + ids := make([]nsname.NSName, 0, sz) + for _, ns := range b.namespaces { + ids = append(ids, nsname.New(ns, "")) + } + + ds.pods, err = ds.pods.CloneWithFilter(filter.NSName(ids...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "namespace filter") + } + } + + if len(b.nodes) != 0 { + ds.pods, err = ds.pods.CloneWithFilter(pod.NodeFilter(b.nodes...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "node filter") + } + } + + if len(b.services) != 0 { + ds.servicesBase, err = service.NewController(ctx, log, cs, "") + if err != nil { + ds.closeAll() + return nil, log.Err(err, "service base controller") + } + + ds.services, err = ds.servicesBase.CloneWithFilter(filter.NSName(b.services...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "service controller") + } + + ds.pods, err = join.ServicePods(ctx, ds.services, ds.pods) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "service join") + } + } + + if len(b.rcs) != 0 { + ds.rcsBase, err = replicationcontroller.NewController(ctx, log, cs, "") + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rc base controller") + } + + ds.rcs, err = ds.rcsBase.CloneWithFilter(filter.NSName(b.rcs...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rc controller") + } + + ds.pods, err = join.RCPods(ctx, ds.rcs, ds.pods) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rc join") + } + } + + if len(b.rss) != 0 { + ds.rssBase, err = replicaset.NewController(ctx, log, cs, "") + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rs base controller") + } + + ds.rss, err = ds.rssBase.CloneWithFilter(filter.NSName(b.rss...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rs controller") + } + + ds.pods, err = join.RSPods(ctx, ds.rss, ds.pods) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "rs join") + } + } + + if len(b.dss) != 0 { + ds.dssBase, err = daemonset.NewController(ctx, log, cs, "") + if err != nil { + ds.closeAll() + return nil, log.Err(err, "ds base controller") + } + + ds.dss, err = ds.dssBase.CloneWithFilter(filter.NSName(b.dss...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "ds controller") + } + + ds.pods, err = join.DaemonSetPods(ctx, ds.dss, ds.pods) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "ds join") + } + } + + if len(b.deployments) != 0 { + ds.deploymentsBase, err = deployment.NewController(ctx, log, cs, "") + if err != nil { + ds.closeAll() + return nil, log.Err(err, "deployment base controller") + } + + ds.deployments, err = ds.deploymentsBase.CloneWithFilter(filter.NSName(b.deployments...)) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "deployment controller") + } + + ds.pods, err = join.DeploymentPods(ctx, ds.deployments, ds.pods) + if err != nil { + ds.closeAll() + return nil, log.Err(err, "deployment join") + } + } + + go ds.waitReadyAll() + go ds.waitDoneAll() + + return ds, nil +} diff --git a/monitor.go b/monitor.go index 48507a5..838cb68 100644 --- a/monitor.go +++ b/monitor.go @@ -140,8 +140,7 @@ func (m *_monitor) readloop(ctx context.Context) error { return io.EOF } - logs := string(logbuf[0:nread]) - event := newEvent(m.source, logs) + event := newEvent(m.source, logbuf[0:nread]) select { case m.eventch <- event: diff --git a/util.go b/util.go index cc55fb1..e1cb672 100644 --- a/util.go +++ b/util.go @@ -42,22 +42,22 @@ func (es eventSource) String() string { type Event interface { Source() EventSource - Log() string + Log() []byte } -func newEvent(source EventSource, log string) Event { +func newEvent(source EventSource, log []byte) Event { return &event{source, log} } type event struct { source EventSource - log string + log []byte } func (e *event) Source() EventSource { return e.source } -func (e *event) Log() string { +func (e *event) Log() []byte { return e.log } diff --git a/writer.go b/writer.go index 4edecf6..1bfb07b 100644 --- a/writer.go +++ b/writer.go @@ -37,14 +37,14 @@ func (w *writer) Fprint(out io.Writer, ev Event) error { if _, err := prefixColor.Fprint(out, ": "); err != nil { return err } - if _, err := out.Write([]byte(ev.Log())); err != nil { + if _, err := out.Write(ev.Log()); err != nil { return err } return nil } func (w *writer) prefix(ev Event) string { - return fmt.Sprintf("%v/%v@%v", + return fmt.Sprintf("%v/%v[%v]", ev.Source().Namespace(), ev.Source().Name(), ev.Source().Container())