Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/all ports #15

Open
wants to merge 21 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,37 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/akto-api-security/mirroring-api-logging/db"
"github.com/akto-api-security/mirroring-api-logging/utils"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/akto-api-security/mirroring-api-logging/db"
"github.com/akto-api-security/mirroring-api-logging/utils"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"

"net"

"github.com/akto-api-security/gomiddleware"
"github.com/segmentio/kafka-go"
)

var isGcp = false
var printCounter = 500
var bytesInThreshold = 200 * 1024 * 1024
var bytesInSleepDuration = time.Second * 120
var assemblerMap = make(map[int]*tcpassembly.Assembler)
var incomingCountMap = make(map[string]utils.IncomingCounter)
var outgoingCountMap = make(map[string]utils.OutgoingCounter)

var ignoreCloudMetadataCalls = false
var ignoreIpTraffic = false
var (
handle *pcap.Handle
err error
Expand Down Expand Up @@ -166,6 +170,14 @@ func (s *myStream) ReassemblyComplete() {
s.bidi.maybeFinish()
}

func checkIfIp(host string) bool {
if len(host) == 0 {
return true
}
chunks := strings.Split(host, ":")
return net.ParseIP(chunks[0]) != nil
}

func tryReadFromBD(bd *bidi, isPending bool) {
reader := bufio.NewReader(bytes.NewReader(bd.a.bytes))
i := 0
Expand Down Expand Up @@ -241,6 +253,16 @@ func tryReadFromBD(bd *bidi, isPending bool) {

reqHeader["host"] = req.Host

if ignoreIpTraffic && checkIfIp(req.Host) {
i++
continue
}

if ignoreCloudMetadataCalls && req.Host == "169.254.169.254" {
i++
continue
}

respHeader := make(map[string]string)
for name, values := range resp.Header {
// Loop over all values for the name.
Expand Down Expand Up @@ -385,11 +407,9 @@ func run(handle *pcap.Handle, apiCollectionId int, source string) {
// handle, err = pcap.OpenOffline("/Users/ankushjain/Downloads/dump2.pcap")
// if err != nil { }

if !isGcp {
if err := handle.SetBPFFilter("udp and port 4789"); err != nil { // optional
log.Fatal(err)
return
}
if err := handle.SetBPFFilter("not (port 9092 or port 22)"); err != nil { // optional
log.Fatal(err)
return
}

log.Println("reading in packets")
Expand All @@ -400,26 +420,10 @@ func run(handle *pcap.Handle, apiCollectionId int, source string) {
for packet := range packetSource.Packets() {
innerPacket := packet
vxlanID := apiCollectionId
if apiCollectionId <= 0 && !isGcp {

if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeUDP {
continue
}

udpContent := packet.TransportLayer().(*layers.UDP)

vxlanIDbyteArr := udpContent.Payload[4:7]
vxlanID = int(vxlanIDbyteArr[2]) + (int(vxlanIDbyteArr[1]) * 256) + (int(vxlanIDbyteArr[0]) * 256 * 256)
innerPacket = gopacket.NewPacket(udpContent.Payload[8:], layers.LayerTypeEthernet, gopacket.Default)
// log.Println("%v", innerPacket)
}
if innerPacket.NetworkLayer() == nil || innerPacket.TransportLayer() == nil || innerPacket.TransportLayer().LayerType() != layers.LayerTypeTCP {
// log.Println("not a tcp payload")
continue
} else {
if isGcp {
vxlanID = 0
}
tcp := innerPacket.TransportLayer().(*layers.TCP)

payloadLength := len(tcp.Payload)
Expand Down Expand Up @@ -473,6 +477,22 @@ func main() {
// Handle error
}

ignoreIpTrafficVar := os.Getenv("AKTO_IGNORE_IP_TRAFFIC")
if len(ignoreIpTrafficVar) > 0 {
ignoreIpTraffic = strings.ToLower(ignoreIpTrafficVar) == "true"
log.Println("ignoreIpTraffic: ", ignoreIpTraffic)
} else {
log.Println("ignoreIpTraffic: missing. defaulting to false")
}

ignoreCloudMetadataCallsVar := os.Getenv("AKTO_IGNORE_CLOUD_METADATA_CALLS")
if len(ignoreCloudMetadataCallsVar) > 0 {
ignoreCloudMetadataCalls = strings.ToLower(ignoreCloudMetadataCallsVar) == "true"
log.Println("ignoreCloudMetadataCalls: ", ignoreCloudMetadataCalls)
} else {
log.Println("ignoreCloudMetadataCalls: missing. defaulting to false")
}

// Set up a ticker to run every 2 minutes
ticker := time.NewTicker(2 * time.Minute)

Expand All @@ -484,17 +504,7 @@ func main() {
}
}()

infra_mirroring_mode_input := os.Getenv("AKTO_INFRA_MIRRORING_MODE")

if len(infra_mirroring_mode_input) > 0 {
isGcp = (infra_mirroring_mode_input == "gcp")
}

interfaceName := "eth0"

if isGcp {
interfaceName = "ens4"
}
interfaceName := "any"

if handle, err := pcap.OpenLive(interfaceName, 128*1024, true, pcap.BlockForever); err != nil {
log.Fatal(err)
Expand Down