Initial kafka-cron solution [WIP] - do not merge #207

Open · wants to merge 1 commit into main
Binary file added kompose
Binary file not shown.
Empty file.
13 changes: 13 additions & 0 deletions projects/kafka-cron/alertmanager.conf
@@ -0,0 +1,13 @@
# Minimal Alertmanager config that doesn't send alerts anywhere.
# The UI is visible at http://localhost:9093/#/alerts
global:
  resolve_timeout: 5m
route:
  receiver: "null"
  group_by:
    - job
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 12h
receivers:
  - name: "null"
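For these settings to matter, a Prometheus server has to be configured to send alerts here. A minimal prometheus.yml stanza, assuming Alertmanager is reachable on localhost:9093 as the comment above suggests:

alerting:
  alertmanagers:
    - static_configs:
        - targets: ["localhost:9093"]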
151 changes: 151 additions & 0 deletions projects/kafka-cron/cmds/consumer/consumer.go
@@ -0,0 +1,151 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"kafka-cron/configs"
"kafka-cron/message"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
var bootstrapServers, clusterName string
var retry bool
flag.StringVar(&bootstrapServers, "bootstrap.servers", "127.0.0.1:9092", "bootstrap servers")
flag.StringVar(&clusterName, "cluster", "cluster-a", "cluster name")
flag.BoolVar(&retry, "retry", false, "retry worker")
flag.Parse()

// All workers share one consumer group, so Kafka balances each topic's partitions across them
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
"group.id": "foo",
})
if err != nil {
fmt.Printf("Failed to create consumer: %s", err)
os.Exit(1)
}

topic := configs.GetTopicName(clusterName)
if retry {
topic = configs.GetRetryTopicName(clusterName)
}

err = c.Subscribe(topic, nil)
if err != nil {
fmt.Printf("Failed to subscribe to topic: %s %v\n", topic, err)
os.Exit(1)
} else {
fmt.Printf("Subscribed to topic: %s\n", topic)
}

// Create Producer instance for retrying
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create producer: %s", err)
os.Exit(1)
}

// Set up a channel for handling Ctrl-C, etc
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

// Process messages
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
msg, err := c.ReadMessage(100 * time.Millisecond)
if err != nil {
// Timeouts and transient errors surface here; the client retries internally, so just poll again
continue
}
recordKey := string(msg.Key)
recordValue := msg.Value
data := message.CronjobMessage{}
err = json.Unmarshal(recordValue, &data)
if err != nil {
fmt.Printf("Failed to decode JSON at offset %d: %v", msg.TopicPartition.Offset, err)
continue
}
fmt.Printf("Consumed record with key %s and value %s\n", recordKey, recordValue)

args := strings.Split(data.Command, " ")
cmd := exec.Command(args[0], args[1:]...)
err = cmd.Run()

if err != nil {
fmt.Printf("Failed to run command %s - re-enqueuing on retry queue: %v\n", data.Command, err)
if data.Retries > 1 {
queueRetryJob(p, data, msg.Key, clusterName)
} else {
// todo dlq
fmt.Printf("To the dead letter queue\n")
queueDLQ(p, data, msg.Key)
}
} else {
fmt.Printf("Command ran OK %s\n", data.Command)
}
}
}

fmt.Printf("Closing consumer\n")
c.Close()
}

func queueRetryJob(p *kafka.Producer, msg message.CronjobMessage, key []byte, cluster string) {
msg.Retries = msg.Retries - 1

jmsg, err := json.Marshal(msg)
if err != nil {
fmt.Println(err)
return
}

topic := configs.GetRetryTopicName(cluster)
fmt.Printf("queuing job: %s on topic %s\n", string(jmsg), topic)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: key,
Value: jmsg},
nil,
)
if err != nil {
fmt.Println(err)
return
}
}

func queueDLQ(p *kafka.Producer, msg message.CronjobMessage, key []byte) {
msg.Retries = 0

jmsg, err := json.Marshal(msg)
if err != nil {
fmt.Println(err)
return
}

topic := configs.GetDLQName()
fmt.Printf("queuing job: %s on topic %s\n", string(jmsg), topic)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: key,
Value: jmsg},
nil,
)
if err != nil {
fmt.Println(err)
return
}
}
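The kafka-cron/message package is not included in this diff. A minimal sketch of what it must contain, inferred from how producer.go and consumer.go use it (the json tags are guesses; the code works equally well with the default exported field names):

package message

// CronjobMessage is the payload exchanged on the cronjob, retry, and DLQ topics.
type CronjobMessage struct {
	Command  string `json:"command"`  // shell command the consumer executes
	Exectime string `json:"exectime"` // RFC3339 timestamp set when the job is enqueued
	Name     string `json:"name"`     // human-readable job name
	Retries  int    `json:"retries"`  // remaining retry budget, decremented on each failure
}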
178 changes: 178 additions & 0 deletions projects/kafka-cron/cmds/producer/producer.go
@@ -0,0 +1,178 @@
package main

import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"kafka-cron/configs"
"kafka-cron/message"
"log"
"os"
"os/signal"
"strconv"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
"github.com/robfig/cron/v3"
)

type cronjob struct {
crontab string
command string
name string
cluster string
retries int
}

func main() {
var bootstrapServers, configPath string
var partitions int

flag.StringVar(&bootstrapServers, "bootstrap.servers", "127.0.0.1:9092", "bootstrap servers")
flag.StringVar(&configPath, "config", "./data/cronjobs.txt", "path to cronjob spec file")
flag.IntVar(&partitions, "partitions", 1, "number of partitions per topic")
flag.Parse()

// Create Producer instance
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create producer: %s", err)
os.Exit(1)
}

// Create topics if needed
for _, cluster := range configs.GetClusters() {
CreateTopic(p, configs.GetTopicName(cluster), partitions)
CreateTopic(p, configs.GetRetryTopicName(cluster), partitions)
}
CreateTopic(p, configs.GetDLQName(), partitions)

// Parse cronjobs and create schedule
cronjobs := parseCronjobs(configPath)
fmt.Printf("cronjobs: %v\n", cronjobs)

var secondParser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor)
c := cron.New(cron.WithParser(secondParser), cron.WithChain())
for _, j := range cronjobs {
j := j // for closure
_, err := c.AddFunc(j.crontab, func() {
queueJob(p, j.command, j.name, j.cluster, j.retries)
})
if err != nil {
log.Fatalf("failed to schedule cronjob %+v: %v", j, err)
}
fmt.Printf("cronjobs: started cron for %+v\n", j)
}
// cron.Start launches the scheduler in its own goroutine, so no extra goroutine is needed
c.Start()

fmt.Printf("cronjobs: cron entries are %+v\n", c.Entries())

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
<-signalChan
}

func queueJob(p *kafka.Producer, command string, name string, cluster string, retries int) {
msg := message.CronjobMessage{
Command: strings.TrimSpace(command),
Exectime: time.Now().Format(time.RFC3339),
Name: strings.TrimSpace(name),
Retries: retries,
}

jmsg, err := json.Marshal(msg)
if err != nil {
fmt.Println(err)
return
}

key, _ := uuid.New().MarshalText()
topic := configs.GetTopicName(cluster)
fmt.Printf("queuing job: %s on topic %s\n", string(jmsg), topic)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: key,
Value: jmsg},
nil,
)
if err != nil {
fmt.Println(err)
return
}
}

func parseCronjobs(path string) []cronjob {
result := make([]cronjob, 0)

f, err := os.Open(path)

if err != nil {
log.Fatal(err)
}

defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
val := scanner.Text()
fields := strings.Split(val, ",")

if len(fields) != 5 {
log.Fatal(fmt.Errorf("wrong format %s %d fields, expect 5", val, len(fields)))
}

retries, err := strconv.Atoi(strings.TrimSpace(fields[4]))
if err != nil {
log.Fatal(fmt.Errorf("wrong format could not parse %s as int (num retries)", fields[4]))
}

cron := cronjob{fields[0], fields[1], fields[2], strings.TrimSpace(fields[3]), retries}
result = append(result, cron)
}
return result
}

// CreateTopic creates a topic using the Admin Client API
func CreateTopic(p *kafka.Producer, topic string, partitions int) {

a, err := kafka.NewAdminClientFromProducer(p)
if err != nil {
fmt.Printf("Failed to create new admin client from producer: %s", err)
os.Exit(1)
}
// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create topics on cluster.
// Set Admin options to wait up to 60s for the operation to finish on the remote cluster
maxDur, err := time.ParseDuration("60s")
if err != nil {
fmt.Printf("ParseDuration(60s): %s", err)
os.Exit(1)
}
results, err := a.CreateTopics(
ctx,
// Multiple topics can be created simultaneously
// by providing more TopicSpecification structs here.
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: partitions,
ReplicationFactor: 1}},
// Admin options
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
fmt.Printf("Admin Client request error: %v\n", err)
os.Exit(1)
}
for _, result := range results {
if result.Error.Code() != kafka.ErrNoError && result.Error.Code() != kafka.ErrTopicAlreadyExists {
fmt.Printf("Failed to create topic: %v\n", result.Error)
os.Exit(1)
}
fmt.Printf("%v\n", result)
}
a.Close()

}
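Produce is asynchronous in confluent-kafka-go, and neither binary drains the producer's delivery reports, so broker-side failures go unnoticed. A sketch of a drain goroutine that could be started right after creating the producer (purely illustrative, not part of this PR):

go func() {
	for e := range p.Events() {
		if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
			fmt.Printf("delivery failed for key %s: %v\n", string(m.Key), m.TopicPartition.Error)
		}
	}
}()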
24 changes: 24 additions & 0 deletions projects/kafka-cron/configs/cluster.go
@@ -0,0 +1,24 @@
package configs

// Hardcoded, for demo purposes.

import (
"fmt"
)

func GetClusters() []string {
clusters := []string{"cluster-a", "cluster-b"}
return clusters
}

func GetTopicName(cluster string) string {
return fmt.Sprintf("%s-cronjobs", cluster)
}

func GetRetryTopicName(cluster string) string {
return fmt.Sprintf("%s-cronjobs-retry", cluster)
}

func GetDLQName() string {
return "dead-letter-queue"
}
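For the two hardcoded clusters, these helpers expand to:

// cluster-a -> cluster-a-cronjobs, cluster-a-cronjobs-retry
// cluster-b -> cluster-b-cronjobs, cluster-b-cronjobs-retry
// shared    -> dead-letter-queue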
3 changes: 3 additions & 0 deletions projects/kafka-cron/data/cronjobs copy.txt
@@ -0,0 +1,3 @@
* * * * * *, /sbin/ping -c 1 www.microsoft.com, ms-pinger, cluster-a, 3
* * * * * *, /sbin/ping -c 1 www.google.com, google-pinger, cluster-a, 3
* * * * * *, badcmd -c 1 www.google.com, badjob, cluster-a, 3
1 change: 1 addition & 0 deletions projects/kafka-cron/data/cronjobs.txt
@@ -0,0 +1 @@
* * * * *, badcmd -c 1 www.google.com, badjob, cluster-a, 3
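Each line is comma-separated: crontab spec, command, job name, cluster, retry count. Note that the producer's parser is seconds-aware (second, minute, hour, day-of-month, month, optional day-of-week), so the five-field spec above fires every second, not every minute. A hypothetical entry that runs every 30 seconds on cluster-b with 2 retries:

*/30 * * * * *, echo hello, hello-job, cluster-b, 2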