Skip to content

Commit

Permalink
API: Sandbox routing: Use Redis for client-proxy DNS instead of local…
Browse files Browse the repository at this point in the history
… map.
  • Loading branch information
jaytaylor committed Jan 17, 2025
1 parent 8d1235f commit 5dda45e
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 41 deletions.
14 changes: 12 additions & 2 deletions .terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 39 additions & 19 deletions packages/api/internal/dns/server.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,65 @@
package dns

import (
"context"
"fmt"
"log"
"net"
"os"
"strings"
"sync"

redis "github.com/go-redis/redis/v8"
resolver "github.com/miekg/dns"

"github.com/e2b-dev/infra/packages/shared/pkg/smap"
)

type FallbackResolverFn = func(sandboxID string) (string, bool)

const ttl = 0

const defaultRoutingIP = "127.0.0.1"

var ctx = context.Background()

type DNS struct {
mu sync.Mutex
records *smap.Map[string]
rdb *redis.Client
fallbackResolverFn FallbackResolverFn
}

func New() *DNS {
func New(rdbOpts *redis.Options, fallbackResolverFn FallbackResolverFn) *DNS {
return &DNS{
records: smap.New[string](),
rdb: redis.NewClient(rdbOpts),
fallbackResolverFn: fallbackResolverFn,
}
}

func (d *DNS) Add(sandboxID, ip string) {
d.records.Insert(d.hostname(sandboxID), ip)
func (d *DNS) Add(sandboxID, ip string) error {
if err := d.rdb.Set(ctx, d.dnsKeyFor(sandboxID), ip, 86400).Err(); err != nil {
return err
}
return nil
}

func (d *DNS) Remove(sandboxID, ip string) {
d.records.RemoveCb(d.hostname(sandboxID), func(key string, v string, exists bool) bool {
return v == ip
})
func (d *DNS) Remove(sandboxID, ip string) error {
if err := d.rdb.Del(ctx, d.dnsKeyFor(sandboxID)).Err(); err != nil {
return err
}
return nil
}

func (d *DNS) get(hostname string) (string, bool) {
return d.records.Get(hostname)
func (d *DNS) get(sandboxID string) (string, bool) {
if res, err := d.rdb.Get(ctx, d.dnsKeyFor(sandboxID)).Result(); err == nil {
return res, true
} else {
fmt.Fprintf(os.Stderr, "WARN: redis error getting key for sandbox '%s': %s :: trying fallback resolver..\n", sandboxID, err)

rec, ok := d.fallbackResolverFn(sandboxID)
fmt.Fprintf(os.Stderr, "Fallback result for sandbox '%s': %s / ok=%v\n", sandboxID, rec, ok)
return rec, ok
}
}

func (*DNS) hostname(sandboxID string) string {
return fmt.Sprintf("%s.", sandboxID)
func (d *DNS) dnsKeyFor(sandboxID string) string {
return fmt.Sprintf("dns.%s", sandboxID)
}

func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
Expand All @@ -63,8 +80,11 @@ func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
}

sandboxID := strings.Split(q.Name, "-")[0]
ip, found := d.get(sandboxID)
if found {
// Trim trailing period to facilitate key consistency.
if strings.HasSuffix(sandboxID, ".") {
sandboxID = sandboxID[0 : len(sandboxID)-1]
}
if ip, found := d.get(sandboxID); found {
a.A = net.ParseIP(ip).To4()
} else {
a.A = net.ParseIP(defaultRoutingIP).To4()
Expand Down
8 changes: 6 additions & 2 deletions packages/api/internal/orchestrator/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (o *Orchestrator) getDeleteInstanceFunction(ctx context.Context, posthogCli
node.CPUUsage.Add(-info.VCpu)
node.RamUsage.Add(-info.RamMB)

o.dns.Remove(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Remove(info.Instance.SandboxID, node.Info.IPAddress); err != nil {
return err
}
}

req := &orchestrator.SandboxDeleteRequest{SandboxId: info.Instance.SandboxID}
Expand Down Expand Up @@ -179,7 +181,9 @@ func (o *Orchestrator) getInsertInstanceFunction(ctx context.Context, logger *za
node.CPUUsage.Add(info.VCpu)
node.RamUsage.Add(info.RamMB)

o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress); err != nil {
return err
}
}

_, err := o.analytics.Client.InstanceStarted(ctx, &analyticscollector.InstanceStartedEvent{
Expand Down
47 changes: 29 additions & 18 deletions packages/api/internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package orchestrator
import (
"context"
"errors"
"fmt"
"log"

redis "github.com/go-redis/redis/v8"
nomadapi "github.com/hashicorp/nomad/api"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,28 +39,12 @@ func New(
logger.Errorf("Error initializing Analytics client\n: %v", err)
}

dnsServer := dns.New()

if env.IsLocal() {
fmt.Printf("Running locally, skipping starting DNS server\n")
} else {
go func() {
fmt.Printf("Starting DNS server\n")

dnsErr := dnsServer.Start("127.0.0.4", 53)
if dnsErr != nil {
log.Fatalf("Failed running DNS server: %v\n", dnsErr)
}
}()
}

o := Orchestrator{
analytics: analyticsInstance,
nomadClient: nomadClient,
logger: logger,
tracer: tracer,
nodes: smap.New[*Node](),
dns: dnsServer,
}

cache := instance.NewCache(
Expand All @@ -72,9 +56,36 @@ func New(

o.instanceCache = cache

rdbOpts := &redis.Options{Addr: "127.0.0.1:6379"}

fallbackResolverFn := func(sandboxID string) (string, bool) {
for _, apiNode := range o.GetNodes() {
detail := o.GetNodeDetail(apiNode.NodeID)
for _, sb := range detail.Sandboxes {
if sandboxID == sb.SandboxID {
if node := o.GetNode(apiNode.NodeID); node != nil {
return node.Info.IPAddress, true
}
}
}
}
return "", false
}

o.dns = dns.New(rdbOpts, fallbackResolverFn)

if env.IsLocal() {
logger.Info("Skipping syncing sandboxes, running locally")
logger.Info("Running locally, skipping starting DNS server")
logger.Info("Running locally, skipping syncing sandboxes")
} else {
go func() {
logger.Info("Starting DNS server")

if err := o.dns.Start("127.0.0.4", 53); err != nil {
log.Fatalf("Failed running DNS server: %v\n", err)
}
}()

go o.keepInSync(cache)
}

Expand Down
10 changes: 10 additions & 0 deletions packages/nomad/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ resource "nomad_job" "api" {
}
}

resource "nomad_job" "redis" {
jobspec = file("${path.module}/redis.hcl")

hcl2 {
vars = {
gcp_zone = var.gcp_zone
}
}
}

resource "nomad_job" "docker_reverse_proxy" {
jobspec = file("${path.module}/docker-reverse-proxy.hcl")

Expand Down
64 changes: 64 additions & 0 deletions packages/nomad/redis.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
variable "gcp_zone" {
type = string
default = "us-central1-a"
}

variable "image_name" {
type = string
default = "redis:7.4.2-alpine"
}

variable "redis_port_number" {
type = number
default = 6379
}

variable "redis_port_name" {
type = string
default = "redis"
}

job "redis" {
datacenters = [var.gcp_zone]
node_pool = "api"
priority = 95

group "api-service" {
network {
port "redis" {
static = var.redis_port_number
}
}

service {
name = "redis"
port = var.redis_port_name

check {
type = "tcp"
name = "health"
interval = "10s"
timeout = "2s"
port = var.redis_port_number
}
}

task "start" {
driver = "docker"

resources {
memory_max = 4096
memory = 2048
cpu = 1024
}

config {
network_mode = "host"
image = var.image_name
ports = [var.redis_port_name]
args = [
]
}
}
}
}

0 comments on commit 5dda45e

Please sign in to comment.