Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Starter code for raft-otel project #214

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions projects/raft-otel/raft/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.22.3

WORKDIR /

COPY go.mod go.sum ./
RUN go mod download

COPY *.go ./
COPY main/*.go ./main/
COPY raft_proto/*.go ./raft_proto/


EXPOSE 7600

# Bit lazy not to build properly but that's not the main point of this exercise
CMD ["go", "run", "main/main.go"]
16 changes: 16 additions & 0 deletions projects/raft-otel/raft/Dockerfile.client
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.22.3

WORKDIR /

COPY go.mod go.sum ./
RUN go mod download

COPY *.go ./
COPY client/*.go ./client/
COPY raft_proto/*.go ./raft_proto/


EXPOSE 7600

# Bit lazy not to build properly but that's not the main point of this exercise
CMD ["go", "run", "client/client.go"]
23 changes: 23 additions & 0 deletions projects/raft-otel/raft/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Sample Raft implementation

This is based on Eli Bendersky's [https://eli.thegreenplace.net] RAFT demo code.

I've modified it in a few ways:
* Adds a main.go so you can run the RAFT code as docker containers (or Kube) - peers are found via DNS lookup
* Changed from integer based peer IDs to use of IP addresses, so that the instances can come up without coordination with each other
* Changed from standard RPC to gRPC (as we've been using throughout this course)
* Adds Dockerfile and docker-compose.yml
* Added structure to Command (to simplify gRPCing)
* Removed one test (TestCrashAfterSubmit) as could not make the timing work to reliably crash leader before it had a chance to commit a change (would be easier to do this if code were restructured to inject time)
* Added endpoint for doing some sets/gets of data, and a simple client that calls this - to demo what's usually done with RAFT, also added a client that exercises it

## Building and running this project

If you change the raft.proto protocol buffer definitions, you must regenerate the bindings by:

```
protoc --proto_path=. --go_out=. --go-grpc_out=. raft.proto
```

To run this under docker-compose, use `docker-compose up --build -d` or your preferred variant.
Docker must be installed and running.
60 changes: 60 additions & 0 deletions projects/raft-otel/raft/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"raft/raft_proto"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const port = 7600

func main() {
addr := flag.String("dns", "raft", "dns address for raft cluster")

if addr == nil || *addr == "" {
fmt.Printf("Must supply dns address of cluster\n")
os.Exit(1)
}

time.Sleep(time.Second * 5) // wait for raft servers to come up

ips, err := net.LookupIP(*addr)
if err != nil {
fmt.Printf("Could not get IPs: %v\n", err)
os.Exit(1)
}

clients := make([]raft_proto.RaftKVServiceClient, 0)

for _, ip := range ips {
fmt.Printf("Connecting to %s\n", ip.String())
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip.String(), port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("%v", err)
}
client := raft_proto.NewRaftKVServiceClient(conn)
clients = append(clients, client)
}

for {
for _, c := range clients {
n := time.Now().Second()
res, err := c.Set(context.TODO(), &raft_proto.SetRequest{Keyname: "cursec", Value: fmt.Sprintf("%d", n)})
fmt.Printf("Called set cursec %d, got %v, %v\n", n, res, err)

time.Sleep(1 * time.Second) // allow consensus to happen

getres, err := c.Get(context.TODO(), &raft_proto.GetRequest{Keyname: "cursec"})
fmt.Printf("Called get cursec, got %v, %v\n", getres, err)
}
time.Sleep(5 * time.Second)
}
}
12 changes: 12 additions & 0 deletions projects/raft-otel/raft/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
services:
raft:
build: .
deploy:
mode: replicated
replicas: 3
client:
build:
dockerfile: ./Dockerfile.client
deploy:
mode: replicated
replicas: 1
16 changes: 16 additions & 0 deletions projects/raft-otel/raft/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module raft

go 1.19

require (
github.com/fortytw2/leaktest v1.3.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
)
15 changes: 15 additions & 0 deletions projects/raft-otel/raft/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
113 changes: 113 additions & 0 deletions projects/raft-otel/raft/main/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"flag"
"fmt"
"net"
"os"
"os/signal"
"raft"
"strings"
"syscall"
"time"
)

const port = 7600

func main() {
addr := flag.String("dns", "raft", "dns address for raft cluster")
if_addr := flag.String("if", "eth0", "use IPV4 address of this interface") // eth0 works on docker, may vary for other platforms

if addr == nil || *addr == "" {
fmt.Printf("Must supply dns address of cluster\n")
os.Exit(1)
}

id := getOwnAddr(*if_addr)
fmt.Printf("My address/node ID is %s\n", id)

ready := make(chan interface{})
storage := raft.NewMapStorage()
commitChan := make(chan raft.CommitEntry)
server := raft.NewServer(id, id, storage, ready, commitChan, port)
server.Serve(raft.NewKV())

ips, err := net.LookupIP(*addr)
if err != nil {
fmt.Printf("Could not get IPs: %v\n", err)
os.Exit(1)
}

// Connect to all peers with appropriate waits
// TODO: we only do this once, on startup - we really should periodically check to see if the DNS listing for peers has changed
for _, ip := range ips {
// if not own IP
if !ownAddr(ip, id) {
peerAddr := fmt.Sprintf("%s:%d", ip.String(), port)

connected := false
for rt := 0; rt <= 3 && !connected; rt++ {
fmt.Printf("Connecting to peer %s\n", peerAddr)
err = server.ConnectToPeer(peerAddr, peerAddr)
if err == nil {
connected = true
} else { // probably just not started up yet, retry
fmt.Printf("Error connecting to peer: %+v", err)
time.Sleep(time.Duration(rt+1) * time.Second)
}
}
if err != nil {
fmt.Printf("Exhausted retries connecting to peer %s", peerAddr)
os.Exit(1)
}
}
}

close(ready) // start raft server, peers are connected

gracefulShutdown := make(chan os.Signal, 1)
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
<-gracefulShutdown
server.DisconnectAll()
server.Shutdown()
}

func getOwnAddr(intf string) string {
ifs, err := net.Interfaces()
if err != nil {
fmt.Printf("Could not get intf: %v\n", err)
os.Exit(1)
}

for _, cif := range ifs {
if cif.Name == intf {
ads, _ := cif.Addrs()
for _, addr := range ads {
if isIPV4(addr.String()) {
ip := getIP(addr.String())
return ip.String()
}

}
}
}

fmt.Printf("Could not find intf: %s\n", intf)
os.Exit(1)
return ""
}

func isIPV4(addr string) bool {
parts := strings.Split(addr, "::")
return len(parts) == 1
}

func getIP(addr string) net.IP {
parts := strings.Split(addr, "/")
return net.ParseIP(parts[0])
}

func ownAddr(ip net.IP, myAddr string) bool {
res := ip.String() == myAddr
return res
}
Loading
Loading