Skip to content

Commit

Permalink
more debug output + new webhook delivery tool
Browse files Browse the repository at this point in the history
  • Loading branch information
NorseGaud committed Nov 25, 2024
1 parent 9471436 commit a16b71d
Show file tree
Hide file tree
Showing 14 changed files with 467 additions and 163 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: 't1-failure-no-tag'
name: 't1-cancelled-failure-no-tag'
on:
workflow_dispatch:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: 't1-failure-no-template'
name: 't1-cancelled-failure-no-template'
on:
workflow_dispatch:

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/t2-dual-without-tag.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
echo "123"
testJob2:
runs-on: [
"anka-template:6abae54f-025d-4a27-b5eb-b985d5eddac9",
"anka-template:fa990c7f-d540-4c5b-bf72-b886c4692c3a",
]
steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ You can control the location plugins are stored on the host by setting the `plug

### Metrics

Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=json` or `http://127.0.0.1:8080/metrics?format=prometheus`. These instructions apply to handler and receiver plugins, but receivers can differ slightly in what metrics are available. Be sure to check the specific plugin documentation for more information and examples.
Metrics for monitoring are available at `http://127.0.0.1:8080/metrics?format=json` or `http://127.0.0.1:8080/metrics?format=prometheus`. This applies to both handler and receiver plugins, but receivers can differ slightly in what metrics are available. Be sure to check the specific plugin documentation for more information and examples.

Note: If port 8080 is already in use, Anklet will automatically increment the port by 1 until it finds an open port.

- You can change the port in the `config.yml` under `metrics`, like so:

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.2
0.10.0
3 changes: 2 additions & 1 deletion internal/anka/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ func (cli *Cli) AnkaCopy(pluginCtx context.Context, filesToCopyIn ...string) err
if copyOutput.Status != "OK" {
return fmt.Errorf("error copying into vm: %s", copyOutput.Message)
}
logger.DebugContext(pluginCtx, "successfully copied file into vm", "file", hostLevelFile)
logger.DebugContext(pluginCtx, "copy output", "std", copyOutput)
logger.InfoContext(pluginCtx, "successfully copied file into vm", "file", hostLevelFile, "stdout", copyOutput.Message)
}

return nil
Expand Down
11 changes: 1 addition & 10 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"net/http"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -66,7 +65,7 @@ type Plugin struct {
Database Database `yaml:"database"`
RegistryURL string `yaml:"registry_url"`
PrivateKey string `yaml:"private_key"`
AppID int `yaml:"app_id"`
AppID int64 `yaml:"app_id"`
InstallationID int64 `yaml:"installation_id"`
Workflows Workflow `yaml:"workflows"`
Port string `yaml:"port"`
Expand Down Expand Up @@ -215,14 +214,6 @@ func GetGlobalsFromContext(ctx context.Context) Globals {
return globals
}

func GetHttpTransportFromContext(ctx context.Context) *http.Transport {
httpTransport, ok := ctx.Value(ContextKey("httpTransport")).(*http.Transport)
if !ok {
panic("GetHttpTransportFromContext failed")
}
return httpTransport
}

func GetLoadedConfigFromContext(ctx context.Context) *Config {
config, ok := ctx.Value(ContextKey("config")).(*Config)
if !ok {
Expand Down
108 changes: 107 additions & 1 deletion internal/github/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package github

import (
"context"
"log/slog"
"net/http"
"os"
"strings"
"time"

"github.com/bradleyfalzon/ghinstallation/v2"
"github.com/gofri/go-github-ratelimit/github_ratelimit"
"github.com/google/go-github/v66/github"
"github.com/veertuinc/anklet/internal/config"
"github.com/veertuinc/anklet/internal/logging"
"github.com/veertuinc/anklet/internal/metrics"
)

type GitHubClientWrapper struct {
Expand All @@ -28,8 +36,106 @@ func GetGitHubClientFromContext(ctx context.Context) *github.Client {

func GetRateLimitWaiterClientFromContext(ctx context.Context) *http.Client {
rateLimiter, ok := ctx.Value(config.ContextKey("rateLimiter")).(*http.Client)
if !ok {
if rateLimiter != nil && !ok {
panic("GetRateLimitWaiterClientFromContext failed")
}
return rateLimiter
}

func GetHttpTransportFromContext(ctx context.Context) *http.Transport {
httpTransport, ok := ctx.Value(config.ContextKey("httpTransport")).(*http.Transport)
if httpTransport != nil && !ok {
panic("GetHttpTransportFromContext failed")
}
return httpTransport
}

func AuthenticateAndReturnGitHubClient(
ctx context.Context,
logger *slog.Logger,
privateKey string,
appID int64,
installationID int64,
token string,
) (*github.Client, error) {

var client *github.Client
var err error
var rateLimiter *http.Client
rateLimiter = GetRateLimitWaiterClientFromContext(ctx)
var httpTransport *http.Transport
httpTransport = GetHttpTransportFromContext(ctx)
if httpTransport == nil {
httpTransport = http.DefaultTransport.(*http.Transport)
}
if rateLimiter == nil {
rateLimiter, err = github_ratelimit.NewRateLimitWaiterClient(httpTransport)
if err != nil {
logger.ErrorContext(ctx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err)
return nil, err
}
}
if privateKey != "" {
// support private key in a file or as text
var privateKeyBytes []byte
privateKeyBytes, err = os.ReadFile(privateKey)
if err != nil {
privateKeyBytes = []byte(privateKey)
}
itr, err := ghinstallation.New(httpTransport, appID, installationID, privateKeyBytes)
if err != nil {
if strings.Contains(err.Error(), "invalid key") {
panic("error creating github app installation token: " + err.Error() + " (does the key exist on the filesystem?)")
} else {
panic("error creating github app installation token: " + err.Error())
}
}
rateLimiter.Transport = itr
client = github.NewClient(rateLimiter)
} else {
client = github.NewClient(rateLimiter).WithAuthToken(token)
}
return client, nil

}

// https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves.
func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) {
innerPluginCtx, cancel := context.WithCancel(pluginCtx) // Inherit from parent context
defer cancel()
result, response, err := executeFunc()
if response != nil {
innerPluginCtx = logging.AppendCtx(innerPluginCtx, slog.Int("api_limit_remaining", response.Rate.Remaining))
innerPluginCtx = logging.AppendCtx(innerPluginCtx, slog.String("api_limit_reset_time", response.Rate.Reset.Time.Format(time.RFC3339)))
innerPluginCtx = logging.AppendCtx(innerPluginCtx, slog.Int("api_limit", response.Rate.Limit))
if response.Rate.Remaining <= 10 { // handle primary rate limiting
sleepDuration := time.Until(response.Rate.Reset.Time) + time.Second // Adding a second to ensure we're past the reset time
logger.WarnContext(innerPluginCtx, "GitHub API rate limit exceeded, sleeping until reset")
metricsData := metrics.GetMetricsDataFromContext(pluginCtx)
ctxPlugin := config.GetPluginFromContext(pluginCtx)
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "limit_paused",
})
select {
case <-time.After(sleepDuration):
metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{
Name: ctxPlugin.Name,
Status: "running",
})
return ExecuteGitHubClientFunction(pluginCtx, logger, executeFunc) // Retry the function after waiting
case <-pluginCtx.Done():
return pluginCtx, nil, nil, pluginCtx.Err()
}
}
}
if err != nil {
if err.Error() != "context canceled" {
if !strings.Contains(err.Error(), "try again later") {
logger.Error("error executing GitHub client function: " + err.Error())
}
}
return pluginCtx, nil, nil, err
}
return pluginCtx, result, response, nil
}
16 changes: 14 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -266,11 +267,22 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.
metricsPort := "8080"
if loadedConfig.Metrics.Port != "" {
metricsPort = loadedConfig.Metrics.Port
} else {
for {
ln, err := net.Listen("tcp", ":"+metricsPort)
if err == nil {
ln.Close()
break
}
port, _ := strconv.Atoi(metricsPort)
port++
metricsPort = strconv.Itoa(port)
}
}
ln, err := net.Listen("tcp", ":"+metricsPort)
if err != nil {
logger.ErrorContext(workerCtx, "port already in use", "port", metricsPort, "error", err)
panic(fmt.Sprintf("port %s is already in use", metricsPort))
logger.ErrorContext(workerCtx, "metrics port already in use", "port", metricsPort, "error", err)
panic(fmt.Sprintf("metrics port %s is already in use", metricsPort))
}
ln.Close()
metricsService := metrics.NewServer(metricsPort)
Expand Down
Loading

0 comments on commit a16b71d

Please sign in to comment.