Skip to content

Commit

Permalink
Move crontab job to temporal workflow (#202)
Browse files Browse the repository at this point in the history
Co-authored-by: 泽华 <[email protected]>
  • Loading branch information
pulltheflower and 泽华 authored Dec 11, 2024
1 parent 2d54ab0 commit 9e84a56
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 71 deletions.
19 changes: 19 additions & 0 deletions api/workflow/activity/calc_recom_score.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package activity

import (
"context"
"log/slog"

"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/component"
)

func CalcRecomScore(ctx context.Context, config *config.Config) error {
c, err := component.NewRecomComponent(config)
if err != nil {
slog.Error("failed to create recom component", "err", err)
return err
}
c.CalculateRecomScore(context.Background())
return nil
}
28 changes: 28 additions & 0 deletions api/workflow/activity/sync_as_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package activity

import (
"context"
"log/slog"

"opencsg.com/csghub-server/builder/multisync"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/component"
)

func SyncAsClient(ctx context.Context, config *config.Config) error {
c, err := component.NewMultiSyncComponent(config)
if err != nil {
slog.Error("failed to create multi sync component", "err", err)
return err
}
syncClientSettingStore := database.NewSyncClientSettingStore()
setting, err := syncClientSettingStore.First(ctx)
if err != nil {
slog.Error("failed to find sync client setting", "error", err)
return err
}
apiDomain := config.MultiSync.SaasAPIDomain
sc := multisync.FromOpenCSG(apiDomain, setting.Token)
return c.SyncAsClient(ctx, sc)
}
32 changes: 32 additions & 0 deletions api/workflow/cron_calc_recom_score.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package workflow

import (
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"opencsg.com/csghub-server/api/workflow/activity"
"opencsg.com/csghub-server/common/config"
)

func CalcRecomScoreWorkflow(ctx workflow.Context, config *config.Config) error {
logger := workflow.GetLogger(ctx)
logger.Info("calc recom score workflow started")

retryPolicy := &temporal.RetryPolicy{
MaximumAttempts: 3,
}

options := workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 1,
RetryPolicy: retryPolicy,
}

ctx = workflow.WithActivityOptions(ctx, options)
err := workflow.ExecuteActivity(ctx, activity.CalcRecomScore, config).Get(ctx, nil)
if err != nil {
logger.Error("failed to calc recom score", "error", err)
return err
}
return nil
}
32 changes: 32 additions & 0 deletions api/workflow/cron_sync_as_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package workflow

import (
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"opencsg.com/csghub-server/api/workflow/activity"
"opencsg.com/csghub-server/common/config"
)

func SyncAsClientWorkflow(ctx workflow.Context, config *config.Config) error {
logger := workflow.GetLogger(ctx)
logger.Info("sync as client workflow started")

retryPolicy := &temporal.RetryPolicy{
MaximumAttempts: 3,
}

options := workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 1,
RetryPolicy: retryPolicy,
}

ctx = workflow.WithActivityOptions(ctx, options)
err := workflow.ExecuteActivity(ctx, activity.SyncAsClient, config).Get(ctx, nil)
if err != nil {
logger.Error("failed to sync as client", "error", err)
return err
}
return nil
}
88 changes: 88 additions & 0 deletions api/workflow/cron_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package workflow

import (
"context"
"fmt"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"opencsg.com/csghub-server/api/workflow/activity"
"opencsg.com/csghub-server/common/config"
)

const (
AlreadyScheduledMessage = "schedule with this ID is already registered"
CronJobQueueName = "workflow_cron_queue"
)

func RegisterCronJobs(config *config.Config) error {
var err error
if wfClient == nil {
wfClient, err = client.Dial(client.Options{
HostPort: config.WorkFLow.Endpoint,
})
if err != nil {
return fmt.Errorf("unable to create workflow client, error:%w", err)
}
}

if !config.Saas {
_, err = wfClient.ScheduleClient().Create(context.Background(), client.ScheduleOptions{
ID: "sync-as-client-schedule",
Spec: client.ScheduleSpec{
CronExpressions: []string{config.CronJob.SyncAsClientCronExpression},
},
Overlap: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP,
Action: &client.ScheduleWorkflowAction{
ID: "sync-as-client-workflow",
TaskQueue: CronJobQueueName,
Workflow: SyncAsClientWorkflow,
Args: []interface{}{config},
},
})
if err != nil && err.Error() != AlreadyScheduledMessage {
return fmt.Errorf("unable to create schedule, error:%w", err)
}
}

_, err = wfClient.ScheduleClient().Create(context.Background(), client.ScheduleOptions{
ID: "calc-recom-score-schedule",
Spec: client.ScheduleSpec{
CronExpressions: []string{config.CronJob.CalcRecomScoreCronExpression},
},
Overlap: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP,
Action: &client.ScheduleWorkflowAction{
ID: "calc-recom-score-workflow",
TaskQueue: CronJobQueueName,
Workflow: CalcRecomScoreWorkflow,
Args: []interface{}{config},
},
})
if err != nil && err.Error() != AlreadyScheduledMessage {
return fmt.Errorf("unable to create schedule, error:%w", err)
}

return nil
}

func StartCronWorker(config *config.Config) error {
var err error
if wfClient == nil {
wfClient, err = client.Dial(client.Options{
HostPort: config.WorkFLow.Endpoint,
})
if err != nil {
return fmt.Errorf("unable to create workflow client, error:%w", err)
}
}
wfWorker = worker.New(wfClient, CronJobQueueName, worker.Options{})
if !config.Saas {
wfWorker.RegisterWorkflow(SyncAsClientWorkflow)
wfWorker.RegisterActivity(activity.SyncAsClient)
}
wfWorker.RegisterWorkflow(CalcRecomScoreWorkflow)
wfWorker.RegisterActivity(activity.CalcRecomScore)

return wfWorker.Start()
}
6 changes: 4 additions & 2 deletions api/workflow/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

const HandlePushQueueName = "workflow_handle_push_queue"

var wfWorker worker.Worker
var wfClient client.Client
var (
wfWorker worker.Worker
wfClient client.Client
)

func StartWorker(config *config.Config) error {
var err error
Expand Down
11 changes: 11 additions & 0 deletions cmd/csghub-server/cmd/start/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ var serverCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("failed to start worker: %w", err)
}

err = workflow.RegisterCronJobs(cfg)
if err != nil {
return fmt.Errorf("failed to register cron jobs: %w", err)
}

err = workflow.StartCronWorker(cfg)
if err != nil {
return fmt.Errorf("failed to start cron worker: %w", err)
}

server := httpbase.NewGracefulServer(
httpbase.GraceServerOpt{
Port: cfg.APIServer.Port,
Expand Down
5 changes: 5 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ type Config struct {
// S3PublicBucket is used to store public files, should set bucket same with portal
S3PublicBucket string `env:"STARHUB_SERVER_ARGO_S3_PUBLIC_BUCKET"`
}

CronJob struct {
SyncAsClientCronExpression string `env:"STARHUB_SERVER_CRON_JOB_SYNC_AS_CLIENT_CRON_EXPRESSION, default=0 * * * *"`
CalcRecomScoreCronExpression string `env:"STARHUB_SERVER_CRON_JOB_CLAC_RECOM_SCORE_CRON_EXPRESSION, default=0 1 * * *"`
}
}

func SetConfigFile(file string) {
Expand Down
4 changes: 4 additions & 0 deletions common/config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,7 @@ encoded_sensitive_words = "5Lmg6L+R5bmzLHhpamlucGluZw=="

[workflow]
endpoint = "localhost:7233"

[cron_job]
sync_as_client_cron_expression = "0 * * * *"
calc_recom_score_cron_expression = "0 1 * * *"
12 changes: 6 additions & 6 deletions component/multi_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (c *multiSyncComponentImpl) createLocalDataset(ctx context.Context, m *type
}

err = c.repo.DeleteAllTags(ctx, newDBRepo.ID)
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to delete database tag", slog.Any("error", err))
}

Expand All @@ -253,14 +253,14 @@ func (c *multiSyncComponentImpl) createLocalDataset(ctx context.Context, m *type
}

err = c.repo.DeleteAllFiles(ctx, newDBRepo.ID)
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to delete database files", slog.Any("error", err))
}

ctxGetFileList, cancel := context.WithTimeout(ctx, 5*time.Second)
files, err := sc.FileList(ctxGetFileList, s)
cancel()
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to get all files of repo", slog.Any("sync_version", s), slog.Any("error", err))
}
if len(files) > 0 {
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *multiSyncComponentImpl) createLocalModel(ctx context.Context, m *types.
})
}
err = c.repo.DeleteAllTags(ctx, newDBRepo.ID)
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to delete database tag", slog.Any("error", err))
}
err = c.repo.BatchCreateRepoTags(ctx, repoTags)
Expand All @@ -377,14 +377,14 @@ func (c *multiSyncComponentImpl) createLocalModel(ctx context.Context, m *types.
}

err = c.repo.DeleteAllFiles(ctx, newDBRepo.ID)
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to delete all files for repo", slog.Any("error", err))
}

ctxGetFileList, cancel := context.WithTimeout(ctx, 5*time.Second)
files, err := sc.FileList(ctxGetFileList, s)
cancel()
if err != nil {
if err != nil && err != sql.ErrNoRows {
slog.Error("failed to get all files of repo", slog.Any("sync_version", s), slog.Any("error", err))
}
if len(files) > 0 {
Expand Down
63 changes: 0 additions & 63 deletions scripts/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,69 +72,6 @@ if [ "$STARHUB_SERVER_GITSERVER_TYPE" = "gitea" ]; then
fi
fi


# Create cron job
cron=""
read_and_set_cron() {
env_variable=$1
default_value=$2

cron=${!env_variable}

if [[ -z $cron ]]; then
cron=$default_value
fi
}

current_cron_jobs=$(crontab -l 2>/dev/null)

if echo "$current_cron_jobs" | grep -qF "starhub logscan gitea"; then
echo "Gitea log scan job already exists"
else
echo "Creating cron job for gitea logscan..."
read_and_set_cron "STARHUB_SERVER_CRON_LOGSCAN" "0 23 * * *"
(crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN /starhub-bin/starhub logscan gitea --path /starhub-bin/logs/gitea.log >> /starhub-bin/cron.log 2>&1") | crontab -
fi

if echo "$current_cron_jobs" | grep -qF "calc-recom-score"; then
echo "Calculate score job already exists"
else
echo "Creating cron job for repository recommendation score calculation..."
read_and_set_cron "STARHUB_SERVER_CRON_CALC_RECOM_SCORE" "0 1 * * *"
(crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD /starhub-bin/starhub cron calc-recom-score >> /starhub-bin/cron-calc-recom-score.log 2>&1") | crontab -
fi

if echo "$current_cron_jobs" | grep -qF "create-push-mirror"; then
echo "Create push mirror job already exists"
else
echo "Creating cron job for push mirror creation..."
read_and_set_cron "STARHUB_SERVER_CRON_PUSH_MIRROR" "*/10 * * * *"
(crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD STARHUB_SERVER_MIRRORSERVER_HOST=$STARHUB_SERVER_MIRRORSERVER_HOST STARHUB_SERVER_MIRRORSERVER_USERNAME=$STARHUB_SERVER_MIRRORSERVER_USERNAME STARHUB_SERVER_MIRRORSERVER_PASSWORD=$STARHUB_SERVER_MIRRORSERVER_PASSWORD /starhub-bin/starhub cron create-push-mirror >> /starhub-bin/create-push-mirror.log 2>&1") | crontab -
fi

if echo "$current_cron_jobs" | grep -qF "check-mirror-progress"; then
echo "Check mirror progress job already exists"
else
echo "Creating cron job for update mirror status and progress..."
read_and_set_cron "STARHUB_SERVER_CRON_PUSH_MIRROR" "*/5 * * * *"
(crontab -l ;echo "$cronstarhub-bin/starhub mirror check-mirror-progress >> /starhub-bin/check-mirror-progress.log 2>&1") | crontab -
fi

if [ "$STARHUB_SERVER_SAAS" == "false" ]; then
if echo "$current_cron_jobs" | grep -qF "sync-as-client"; then
echo "Sync as client job already exists"
else
echo "Creating cron job for sync saas sync verions..."
read_and_set_cron "STARHUB_SERVER_CRON_SYNC_AS_CLIENT" "*/10 * * * *"
(crontab -l ;echo "$cron STARHUB_SERVER_REDIS_ENDPOINT=$STARHUB_SERVER_REDIS_ENDPOINT STARHUB_SERVER_REDIS_USER=$STARHUB_SERVER_REDIS_USER STARHUB_SERVER_REDIS_PASSWORD=$STARHUB_SERVER_REDIS_PASSWORD STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_TYPE=$STARHUB_SERVER_GITSERVER_TYPE STARHUB_SERVER_GITALY_TOKEN=$STARHUB_SERVER_GITALY_TOKEN STARHUB_SERVER_GITALY_SERVER_SOCKET=$STARHUB_SERVER_GITALY_SERVER_SOCKET STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD /starhub-bin/starhub sync sync-as-client >> /starhub-bin/cron-sync-as-client.log 2>&1") | crontab -
fi
else
echo "Saas does not need sync-as-client cron job"
fi
# Reload cron server
service cron restart
echo "Done."

echo "Database setup..."

echo "Migration init"
Expand Down

0 comments on commit 9e84a56

Please sign in to comment.