Skip to content

Commit

Permalink
Add eBPF files for Tigger.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbutrovich committed Mar 9, 2023
1 parent d0265cd commit d94a0f4
Show file tree
Hide file tree
Showing 12 changed files with 988 additions and 0 deletions.
21 changes: 21 additions & 0 deletions bpf/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2022 Matt Butrovich

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
39 changes: 39 additions & 0 deletions bpf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Build rules for the eBPF objects, their bpftool-generated skeleton headers,
# and the mp_bouncer userspace binary.
#
# Most targets below are convenience names: the files they produce are called
# <name>.bpf.o or <name>.skel.h, never <name> itself. They are therefore listed
# in .PHONY at the bottom so that a stray file with the same name cannot
# silently disable a rule.

CC = clang
CFLAGS = -O2 -Wall -Wextra -Werror -Wno-missing-braces -g

all: mp_bouncer tc_test

clean:
	rm -f *.bpf.o *.skel.h vmlinux.h mp_bouncer tc_test

# Kernel type definitions dumped from the running kernel's BTF.
vmlinux.h:
	bpftool btf dump file /sys/kernel/btf/vmlinux format c > $@

mp_client: client.bpf.c vmlinux.h mp_bouncer.bpf.h mp_common.h
	$(CC) $(CFLAGS) -target bpf -c client.bpf.c -o mp_client.bpf.o

mp_server: server.bpf.c vmlinux.h mp_bouncer.bpf.h mp_common.h
	$(CC) $(CFLAGS) -target bpf -c server.bpf.c -o mp_server.bpf.o

mp_mirror_udp: mirror_udp.bpf.c vmlinux.h mp_common.h
	$(CC) $(CFLAGS) -target bpf -c mirror_udp.bpf.c -o mp_mirror_udp.bpf.o

mp_mirror_tcp: mirror_tcp.bpf.c vmlinux.h mp_common.h
	$(CC) $(CFLAGS) -target bpf -c mirror_tcp.bpf.c -o mp_mirror_tcp.bpf.o

tc_test: tc_test.bpf.c vmlinux.h mp_common.h
	$(CC) $(CFLAGS) -target bpf -c tc_test.bpf.c -o tc_test.bpf.o

# Generate one skeleton header per compiled BPF object.
skeletons: mp_client mp_server mp_mirror_udp mp_mirror_tcp
	bpftool gen skeleton ./mp_client.bpf.o > ./client.skel.h
	bpftool gen skeleton ./mp_server.bpf.o > ./server.skel.h
	bpftool gen skeleton ./mp_mirror_udp.bpf.o > ./mirror_udp.skel.h
	bpftool gen skeleton ./mp_mirror_tcp.bpf.o > ./mirror_tcp.skel.h

mp_bouncer: mp_bouncer.c skeletons mp_common.h tc_test
	$(CC) $(CFLAGS) mp_bouncer.c -o mp_bouncer -lbpf

# None of these targets produces a file named after itself.
.PHONY: all clean mp_client mp_server mp_mirror_udp mp_mirror_tcp tc_test skeletons

.DELETE_ON_ERROR:
.SECONDARY:
208 changes: 208 additions & 0 deletions bpf/client.bpf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#include <bpf/bpf_endian.h>

#include "mp_bouncer.bpf.h"

// Pooling mode selector, defaulting to SESSION_POOLING. Nothing in this file reads it.
// NOTE(review): a const volatile global in a BPF object is presumably meant to be overridden by the userspace
// loader before the program is loaded -- confirm against mp_bouncer.c.
const volatile int pooling_mode = SESSION_POOLING;

// Walks the message headers contained in one client buffer and reports whether the buffer may be redirected.
//
// skb:                 the buffer handed to the sk_skb program; it is only read here, through bpf_skb_load_bytes().
// client_socket_state: state carried from one buffer of this client to the next.
//                        offset_ > 0: number of bytes at the start of this buffer that still belong to a message
//                                     whose header was seen in an earlier buffer.
//                        offset_ < 0: minus the number of header bytes already saved in split_header_.
//
// Returns false only when a complete header with type 'X' is read from this buffer (the caller then unlinks the
// client instead of redirecting). Returns true in every other case, including when the MAX_MESSAGES loop bound is hit.
static bool process_client(struct __sk_buff *const skb, SocketState *const client_socket_state) {
// bpf_printk("\n process_client");
const uint32_t skb_length = skb->len;
uint32_t offset = 0;

// Stack copy of one message header: 1 type byte followed by 4 length bytes, viewed through PostgresMessageHeader.
uint8_t header_buffer[5] = {0, 0, 0, 0, 0};
PostgresMessageHeader *const header = (PostgresMessageHeader *const)header_buffer;

// Check if we have a leftover offset from the last buffer.
if (client_socket_state->offset_ > 0) {
// We have a leftover offset from the last buffer, so start our message processing there.
offset = client_socket_state->offset_;
// bpf_printk("retrieved offset: %u", offset);
// We "consumed" this offset so reset it to 0.
client_socket_state->offset_ = 0;
} else if (client_socket_state->offset_ < 0) {
// We have a partial header. Read it into stack header.
const int64_t partial_offset = client_socket_state->offset_;
// bpf_printk("finish partial header read with partial_offset: %d", partial_offset);

// Rebuild the 5-byte header: the first -partial_offset bytes come from split_header_, the rest from the start of
// this buffer.
// NOTE(review): the return value of bpf_skb_load_bytes() is ignored in every case below, and there is no default
// case, so an unexpected offset_ value leaves header_buffer all zero.
switch (partial_offset) {
case -4:
// [ X X X X ] in header
__builtin_memcpy(header_buffer, &(client_socket_state->split_header_), 4);
bpf_skb_load_bytes(skb, 0, &(header_buffer[4]), 1); // TODO(Matt): In theory this could fail with a tiny skb.
break;
case -3:
// [ X X X - ] in header
__builtin_memcpy(header_buffer, &(client_socket_state->split_header_), 3);
bpf_skb_load_bytes(skb, 0, &(header_buffer[3]), 2); // TODO(Matt): In theory this could fail with a tiny skb.
break;
case -2:
// [ X X - - ] in header
__builtin_memcpy(header_buffer, &(client_socket_state->split_header_), 2);
bpf_skb_load_bytes(skb, 0, &(header_buffer[2]), 3); // TODO(Matt): In theory this could fail with a tiny skb.
break;
case -1:
// [ X - - - ] in header
__builtin_memcpy(header_buffer, &(client_socket_state->split_header_), 1);
bpf_skb_load_bytes(skb, 0, &(header_buffer[1]), 4); // TODO(Matt): In theory this could fail with a tiny skb.
break;
}
// Reset the SocketState.
// NOTE(review): this clears only the first 8 bytes of *client_socket_state -- presumably offset_ and split_header_,
// leaving sink_ intact. Confirm against the SocketState layout in mp_common.h.
__builtin_memset(client_socket_state, 0, 8);

// char query_type[2] = {header->type_, 0};

// bpf_printk("split header query type: %s", query_type);
// bpf_printk("split header query length: %u", bpf_ntohl(header->length_));

// A disconnect message whose header was split across buffers is only logged; unlike the full-header case in the
// loop below, it does not make this function return false.
if (header->type_ == 'X') {
bpf_printk("Retrieved a split header for a disconnect message.");
// TODO(Matt): Check if type is 'X', if yes -> extend front of packet, prepend header, and don't redirect.
}

// Move our offset to look for the next message.
// The message spans one type byte plus length_ bytes; partial_offset is negative, so adding it subtracts the header
// bytes that already arrived in the previous buffer.
offset += sizeof(char) + bpf_ntohl(header->length_) + partial_offset;

// bpf_printk("offset after split header: %u", offset);
}

uint16_t messages = 0;
do {
// bpf_printk("offset: %u", offset);
// bpf_printk("skb_length: %u", skb_length);

if (offset > skb_length) {
// The offset is past this buffer's length. Stash the difference as the starting point for the next buffer.
client_socket_state->offset_ = offset - skb_length;
// bpf_printk("stashing offset_map_val: %d", client_socket_state->offset_);
return true;
}

if (offset == skb_length) {
// This buffer is complete.
// bpf_printk("done with this buffer");
return true;
}

// offset < skb_length

// Check for a header split across buffers.
if (skb_length - offset < sizeof(PostgresMessageHeader)) {
// We can't do a full header read.
const uint8_t remaining_bytes = skb_length - offset;
// bpf_printk("start partial header read with remaining bytes: %u", remaining_bytes);

// Save the header bytes that did arrive and record how many as a negative offset_ for the next buffer.
// NOTE(review): the bpf_skb_load_bytes() return values are ignored here as well.
switch (remaining_bytes) {
case 4: {
// Put [ X X X X ] in temp buffer.
bpf_skb_load_bytes(skb, offset, &(client_socket_state->split_header_), 4);
client_socket_state->offset_ = -4;
break;
}
case 3: {
// Put [ X X X - ] in temp buffer.
bpf_skb_load_bytes(skb, offset, &(client_socket_state->split_header_), 3);
client_socket_state->offset_ = -3;
break;
}
case 2: {
// Put [ X X - - ] in temp buffer.
// NOTE(review): no truncation of the skb actually happens here; the bytes are only copied.
bpf_skb_load_bytes(skb, offset, &(client_socket_state->split_header_), 2);
client_socket_state->offset_ = -2;
break;
}
case 1: {
// Put [ X - - - ] in temp buffer.
// NOTE(review): no truncation of the skb actually happens here; the bytes are only copied.
bpf_skb_load_bytes(skb, offset, &(client_socket_state->split_header_), 1);
client_socket_state->offset_ = -1;
break;
}
}
// The type byte is always the first saved byte, so a split disconnect can be detected here; it is only logged.
if (client_socket_state->split_header_[0] == 'X') {
bpf_printk("Got a split header for a disconnect message.");
// TODO(Matt): Check if type is 'X', if yes -> truncate and forward.
}
return true;
}

// offset + sizeof(PostgresMessageHeader) <= skb_length
// We can do a full header read from this buffer. This is the common case.
bpf_skb_load_bytes(skb, offset, header, sizeof(PostgresMessageHeader));

// char query_type[2] = {header->type_, 0};

// bpf_printk("query type: %s", query_type);
// bpf_printk("query length: %u", bpf_ntohl(header->length_));

if (header->type_ == 'X') {
// Client wants to disconnect. Don't redirect this buffer.
return false;
}
// Move our offset to look for the next message.
// One type byte plus length_ bytes (length_ is converted from network byte order).
offset += sizeof(char) + bpf_ntohl(header->length_);
messages++;
} while (messages < MAX_MESSAGES); // Need to bound the loop to some constant to please the verifier.
// Loop bound reached without finishing the buffer: still report "redirect".
return true;
}

// sk_skb program attached to client-facing sockets.
//
// For buffers arriving on local port 6432 it links the client to an idle server socket (if it has no link yet),
// lets process_client() inspect the buffer, and then either requests a redirect to the linked server socket or,
// when process_client() reports a disconnect, tears the link down and returns the server to the idle stack.
// Clients for which no idle server was available are marked with a sentinel sink and left to userspace.
// The program's own return value is always SK_PASS.
SEC("sk_skb/mp_client")
int32_t _mp_client(struct __sk_buff *const skb) {
  // sink_ value meaning "this client is handled by userspace".
  const uint32_t userspace_sink = 0xFFFFFFFF;

  // Only traffic arriving on the bouncer's listening port is handled here.
  if (skb->local_port != 6432) {
    return SK_PASS;
  }

  // Clients are keyed by their remote port.
  const uint32_t client_key = bpf_ntohl(skb->remote_port);
  SocketState *const client_state = bpf_map_lookup_elem(&socket_states, &client_key);
  if (!client_state) {
    // Not expected to happen, but the verifier requires the check.
    return SK_PASS;
  }

  if (client_state->sink_ == userspace_sink) {
    // Already on the slow path.
    return SK_PASS;
  }

  if (client_state->sink_ == 0) {
    // No link yet: try to take an idle server socket.
    uint32_t idle_server_key;
    if (bpf_map_pop_elem(&idle_server_sockets, &idle_server_key) < 0 || idle_server_key == 0) {
      // Nothing available. Park this client on the userspace path for the rest of the pooled operation.
      client_state->sink_ = userspace_sink;
      return SK_PASS;
    }

    client_state->sink_ = idle_server_key;

    // Record the reverse direction of the link on the server's state, if it has one.
    SocketState *const new_server_state = bpf_map_lookup_elem(&socket_states, &idle_server_key);
    if (new_server_state) {
      new_server_state->sink_ = client_key;
    }
  }

  const bool keep_link = process_client(skb, client_state);

  // Read the sink only after process_client() has run, since it writes to the state.
  const uint32_t linked_server_key = client_state->sink_;

  if (keep_link) {
    bpf_sk_redirect_map(skb, &server_sockets, linked_server_key, 0);
    return SK_PASS;
  }

  // Client is disconnecting: clear both directions of the link and hand the server back to the idle stack.
  SocketState *const linked_server_state = bpf_map_lookup_elem(&socket_states, &linked_server_key);
  if (linked_server_state) {
    linked_server_state->sink_ = 0;
  }
  client_state->sink_ = 0;
  bpf_map_push_elem(&idle_server_sockets, &linked_server_key, 0);

  return SK_PASS;
}

// License string emitted into the object's "license" section.
// NOTE(review): presumably needs to stay GPL-compatible for helpers such as bpf_printk() -- confirm before changing.
char LICENSE[] SEC("license") = "Dual MIT/GPL";
19 changes: 19 additions & 0 deletions bpf/mirror_tcp.bpf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include <bpf/bpf_endian.h>

#include "mp_bouncer.bpf.h"
#include "mp_common.h"

// sk_skb program for the mirror TCP sockets. It does not inspect the buffer and unconditionally returns SK_DROP.
SEC("sk_skb/mp_mirror_tcp")
int32_t _mp_mirror_tcp(__attribute__((unused)) struct __sk_buff *const skb) {
// Debug tracing, disabled:
// bpf_printk("");
// bpf_printk("mirror_tcp");
// bpf_printk("len: %u", skb->len);
// bpf_printk("local_port: %u", skb->local_port);
// bpf_printk("remote_port: %u", bpf_ntohl(skb->remote_port));

// TODO(Matt): In the scenario where we want the first response back to be forwarded to the client, handle that here?

return SK_DROP;
}

// License string emitted into the object's "license" section.
char LICENSE[] SEC("license") = "Dual MIT/GPL";
31 changes: 31 additions & 0 deletions bpf/mirror_udp.bpf.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include <bpf/bpf_endian.h>

#include "mp_bouncer.bpf.h"
#include "mp_common.h"

// sk_skb program for the mirror UDP sockets.
//
// Each incoming buffer starts with stashed metadata:
//   byte 0     : data offset, i.e. how many leading bytes to strip before forwarding
//   bytes 1..4 : key of the destination socket in mirror_tcp_sockets (the TCP destination port)
//
// The program strips the leading data-offset bytes and redirects what remains to that socket.
// Returns the verdict of bpf_sk_redirect_map(), or SK_DROP when the metadata cannot be read or the buffer cannot
// be trimmed. Dropping avoids forwarding untrimmed data or redirecting with zeroed metadata.
SEC("sk_skb/mp_mirror_udp")
int32_t _mp_mirror_udp(struct __sk_buff *const skb) {
  // bpf_printk("mirror_udp len: %u", skb->len);

  // Read the stashed data offset.
  uint8_t data_offset = 0;
  if (bpf_skb_load_bytes(skb, 0, &data_offset, sizeof(data_offset)) < 0) {
    return SK_DROP;
  }

  // Read the stashed TCP destination port, stored right after the data offset.
  uint32_t tcp_port = 0;
  if (bpf_skb_load_bytes(skb, sizeof(data_offset), &tcp_port, sizeof(tcp_port)) < 0) {
    return SK_DROP;
  }

  // Trim the buffer to remove any leftover bits of TCP header and the stashed metadata.
  // A negative length difference shrinks the buffer from the front.
  if (bpf_skb_adjust_room(skb, -(int32_t)data_offset, 0, 0) < 0) {
    return SK_DROP;
  }

  // Redirect the buffer to the correct TCP port.
  return bpf_sk_redirect_map(skb, &mirror_tcp_sockets, tcp_port, 0);
}

// License string emitted into the object's "license" section.
char LICENSE[] SEC("license") = "Dual MIT/GPL";
56 changes: 56 additions & 0 deletions bpf/mp_bouncer.bpf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

// clang-format off
#include "vmlinux.h" // Needs to be included before bpf_helpers.h
#include <bpf/bpf_helpers.h>
// clang-format on

#include "mp_common.h"

// Array map holding one SocketState per u32 key.
// client.bpf.c looks entries up both by a client's remote port and by a server socket key taken from
// idle_server_sockets.
// NOTE(review): 65536 entries presumably covers the whole 16-bit port range -- confirm.
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(SocketState));
__uint(max_entries, 65536);
} socket_states SEC(".maps");

// Sockmap with u32 keys and u32 values.
// NOTE(review): not referenced by client.bpf.c or the mirror programs; presumably the redirect target for
// server-to-client traffic -- confirm against server.bpf.c.
struct {
__uint(type, BPF_MAP_TYPE_SOCKMAP);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
__uint(max_entries, 65536);
} client_sockets SEC(".maps");

// Sockmap with u32 keys and u32 values.
// _mp_client redirects client buffers into this map with bpf_sk_redirect_map(), using the client's linked
// server socket key (SocketState sink_) as the index.
struct {
__uint(type, BPF_MAP_TYPE_SOCKMAP);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
__uint(max_entries, 65536);
} server_sockets SEC(".maps");

// Sockmap with u32 keys and u32 values.
// NOTE(review): not referenced by the BPF programs that include this header in client.bpf.c or the mirror
// programs; presumably populated from userspace with the mirror UDP sockets -- confirm against mp_bouncer.c.
struct {
__uint(type, BPF_MAP_TYPE_SOCKMAP);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
__uint(max_entries, 65536);
} mirror_udp_sockets SEC(".maps");

// Sockmap with u32 keys and u32 values.
// _mp_mirror_udp redirects trimmed buffers into this map, indexed by the TCP port it reads from the metadata
// stashed at the front of each buffer.
struct {
__uint(type, BPF_MAP_TYPE_SOCKMAP);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
__uint(max_entries, 65536);
} mirror_tcp_sockets SEC(".maps");

// Stack map (no key, u32 values) of server socket keys that are not currently linked to a client.
// _mp_client pops a key when a client needs a server and pushes the key back when the client disconnects.
// TODO(Matt): idle_server_sockets size should be the same as bpf_pool_size in pgbouncer.ini.
struct {
__uint(type, BPF_MAP_TYPE_STACK);
__uint(key_size, 0);
__uint(value_size, sizeof(u32));
__uint(max_entries, 65536);
} idle_server_sockets SEC(".maps");

// Upper bound on the number of message headers examined per buffer; used as the loop bound in process_client().
enum {
MAX_MESSAGES = 2048 // Arbitrary limit to make the verifier happy. I had to raise this from 255 to make pg_dump happy
// since it sends a lot of stuff back in a single buffer sometimes.
};
Loading

0 comments on commit d94a0f4

Please sign in to comment.