Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reloader): allow passing http Header in the http reloader request #8042

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options
- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls.
- [#8042](https://github.com/thanos-io/thanos/pull/8042) Reloader: introduce optional header option for http reloader.

### Changed

Expand Down
27 changes: 20 additions & 7 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
// - Optionally, specify different output file for watched `cfgFile` (`cfgOutputFile`).
// This will also try decompress the `cfgFile` if needed and substitute ALL the envvars using Kubernetes substitution format: (`$(var)`)
// - Watch on changes against certain directories (`watchedDirs`).
// - Optionally, specify map of header for HTTPReloader
//
// Once any of those two changes, Prometheus on given `reloadURL` will be notified, causing Prometheus to reload configuration and rules.
//
// This and below for reloader:
//
// u, _ := url.Parse("http://localhost:9090")
// h := &http.Header{
// "Authorization": {fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte("user:password")))},
// }
// rl := reloader.New(nil, nil, &reloader.Options{
// ReloadURL: reloader.ReloadURLFromBase(u),
// ReloadHeader: h,
// CfgFile: "/path/to/cfg",
// CfgOutputFile: "/path/to/cfg.out",
// WatchedDirs: []string{"/path/to/dirs"},
Expand Down Expand Up @@ -141,6 +146,9 @@ type Options struct {
// ReloadURL is the Prometheus URL to trigger reloads.
ReloadURL *url.URL

// ReloadHeader is the map of headers transmits in http reload request
ReloadHeader *http.Header

// HTTP client used to connect to the web server.
HTTPClient http.Client

Expand Down Expand Up @@ -254,7 +262,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
r.tr = NewPIDReloader(r.logger, o.ProcessName, o.RuntimeInfoURL, o.HTTPClient)
r.reloaderInfo.WithLabelValues("signal").Set(1)
} else {
r.tr = NewHTTPReloader(r.logger, o.ReloadURL, o.HTTPClient)
r.tr = NewHTTPReloader(r.logger, o.ReloadURL, o.ReloadHeader, o.HTTPClient)
r.reloaderInfo.WithLabelValues("http").Set(1)
}

Expand Down Expand Up @@ -656,22 +664,27 @@ var _ = TriggerReloader(&PIDReloader{})
type HTTPReloader struct {
logger log.Logger

u *url.URL
c http.Client
u *url.URL
headers *http.Header
c http.Client
}

var _ = TriggerReloader(&HTTPReloader{})

func NewHTTPReloader(logger log.Logger, u *url.URL, c http.Client) *HTTPReloader {
func NewHTTPReloader(logger log.Logger, u *url.URL, headers *http.Header, c http.Client) *HTTPReloader {
return &HTTPReloader{
logger: logger,
u: u,
c: c,
logger: logger,
u: u,
headers: headers,
c: c,
}
}

func (hr *HTTPReloader) TriggerReload(ctx context.Context) error {
req, err := http.NewRequest("POST", hr.u.String(), nil)
if hr.headers != nil {
req.Header = *hr.headers
}
if err != nil {
return errors.Wrap(err, "create request")
}
Expand Down
92 changes: 92 additions & 0 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reloader

import (
"context"
"encoding/base64"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -200,6 +201,97 @@ config:
testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2"))
}

func TestReloader_ConfigApplyWithHttpHeader(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

reloads := &atomic.Value{}
reloads.Store(0)
i := 0
srv := &http.Server{}
srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
i++
if i%2 == 0 {
// Every second request, fail to ensure that retry works.
resp.WriteHeader(http.StatusServiceUnavailable)
return
}
// return http invalid request if basic authorization header is missing
if r.Header.Get("Authorization") != "Basic dXNlcjpwYXNzd29yZA==" {
resp.WriteHeader(http.StatusBadRequest)
} else {
reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
}
})
go func() { _ = srv.Serve(l) }()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

dir := t.TempDir()

testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

var (
input = filepath.Join(dir, "in", "cfg.yaml.tmpl")
output = filepath.Join(dir, "out", "cfg.yaml")
)
reloader := New(nil, nil, &Options{
ReloadURL: reloadURL,
ReloadHeader: &http.Header{
"Authorization": {fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte("user:password")))},
},
CfgFile: input,
CfgOutputFile: output,
CfgDirs: nil,
WatchedDirs: nil,
WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only.
RetryInterval: 100 * time.Millisecond,
DelayInterval: 1 * time.Millisecond,
})

testutil.Ok(t, os.WriteFile(input, []byte(`
config:
a: 1
b: 2
c: 3
`), os.ModePerm))

rctx, cancel2 := context.WithCancel(ctx)
g := sync.WaitGroup{}
g.Add(1)
go func() {
defer g.Done()
testutil.Ok(t, reloader.Watch(rctx))
}()

Outer:
for {
select {
case <-ctx.Done():
break Outer
case <-time.After(300 * time.Millisecond):
}

rel := reloads.Load().(int)
if rel == 1 {
testutil.Equals(t, 1, rel)
break
}
}

cancel2()
g.Wait()
}

func TestReloader_ConfigRollback(t *testing.T) {
t.Parallel()

Expand Down
Loading