Skip to content

Commit

Permalink
Moving init funciton to init package
Browse files Browse the repository at this point in the history
  • Loading branch information
sciclon2 committed Sep 2, 2024
1 parent 25ca44e commit 09217f8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 17 deletions.
12 changes: 1 addition & 11 deletions cmd/kafka-lag-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"os"
"time"

"github.com/sciclon2/kafka-lag-go/pkg/config"
maininit "github.com/sciclon2/kafka-lag-go/pkg/init"

"github.com/sciclon2/kafka-lag-go/pkg/heartbeat"
"github.com/sciclon2/kafka-lag-go/pkg/kafka"
"github.com/sciclon2/kafka-lag-go/pkg/metrics"
"github.com/sciclon2/kafka-lag-go/pkg/storage"
"github.com/sciclon2/kafka-lag-go/pkg/structs"

_ "net/http/pprof"
Expand Down Expand Up @@ -53,7 +50,7 @@ func main() {
defer store.GracefulStop()

// Initialize and start the ApplicationHeartbeat
initializeAndStartHeartbeat(admin, store, heartbeatInterval*time.Second, cfg)
maininit.InitializeAndStartHeartbeat(admin, store, heartbeatInterval*time.Second, cfg)

// Generate a unique ID for this node.
nodeID := generateNodeID()
Expand Down Expand Up @@ -172,10 +169,3 @@ func SleepToMaintainInterval(startTime time.Time, iterationInterval time.Duratio
totalElapsedTime := time.Since(startTime)
log.Printf("Total time elapsed for this iteration including wait time: %v", totalElapsedTime)
}

// initializeAndStartHeartbeat initializes the ApplicationHeartbeat and starts the health check routine.
func initializeAndStartHeartbeat(kafkaAdmin kafka.KafkaAdmin, store storage.Storage, interval time.Duration, cfg *config.Config) *heartbeat.ApplicationHeartbeat {
applicationHeartbeat := heartbeat.NewApplicationHeartbeat(kafkaAdmin, store, interval, cfg.App.HealthCheckPort, cfg.App.HealthCheckPath)
applicationHeartbeat.Start()
return applicationHeartbeat
}
15 changes: 9 additions & 6 deletions pkg/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/IBM/sarama"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sciclon2/kafka-lag-go/pkg/config"
"github.com/sciclon2/kafka-lag-go/pkg/heartbeat"
"github.com/sciclon2/kafka-lag-go/pkg/kafka"
"github.com/sciclon2/kafka-lag-go/pkg/storage"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -61,14 +63,15 @@ func InitializeMetricsServer(cfg *config.Config) {
}()
}

// GetConfigFilePath retrieves the configuration file path.
func GetConfigFilePath() string {
// Logic to get the configuration file path.
return config.GetConfigFilePath()
}

func InitializeSignalHandling() chan os.Signal {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGUSR1)
return sigChan
}

// initializeAndStartHeartbeat initializes the ApplicationHeartbeat and starts the health check routine.
func InitializeAndStartHeartbeat(kafkaAdmin kafka.KafkaAdmin, store storage.Storage, interval time.Duration, cfg *config.Config) *heartbeat.ApplicationHeartbeat {
applicationHeartbeat := heartbeat.NewApplicationHeartbeat(kafkaAdmin, store, interval, cfg.App.HealthCheckPort, cfg.App.HealthCheckPath)
applicationHeartbeat.Start()
return applicationHeartbeat
}

0 comments on commit 09217f8

Please sign in to comment.