diff --git a/README.md b/README.md index ed0f4b4..6764405 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ heplify is captagents little brother. While it offers a compareable performance It's a single binary which you can place on your linux or windows machine. Just run it to capture packets and send them to Homer. Right now heplify is able to send SIP, DNS, LOG or TLS handshakes into homer. It's able to handle fragmented and duplicate packets out of the box. - + ### Requirements * libpcap - + On Debian/Ubuntu: sudo apt-get install libpcap-dev On CentOS/RHEL: yum install libpcap-devel On Windows: install WinPcap diff --git a/build_static.sh b/build_static.sh new file mode 100644 index 0000000..8f24c24 --- /dev/null +++ b/build_static.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +set -ex + +apk update +apk add linux-headers musl-dev gcc go libpcap-dev ca-certificates git + +mkdir /go +export GOPATH=/go +mkdir -p /go/src/github.com/negbie +mkdir -p /mnt/out +cp -a /mnt /go/src/github.com/negbie/heplify +cd /go/src/github.com/negbie/heplify +rm -f heplify* +go get -v ./ ./ +go build --ldflags '-linkmode external -extldflags "-static -s -w"' -v ./ +cp ./heplify /mnt/out/ diff --git a/config/config.go b/config/config.go index 6f7ef3c..4eff8f3 100644 --- a/config/config.go +++ b/config/config.go @@ -23,7 +23,7 @@ type InterfacesConfig struct { WriteFile string `config:"write_file"` Snaplen int `config:"snaplen"` BufferSizeMb int `config:"buffer_size_mb"` - TopSpeed bool `config:"top_speed"` + ReadSpeed bool `config:"top_speed"` OneAtATime bool `config:"one_at_a_time"` Loop int `config:"loop"` } diff --git a/decoder/decoder.go b/decoder/decoder.go index 4dc255f..3b44682 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -1,9 +1,13 @@ package decoder import ( + "bytes" "hash" "os" + "strconv" + "time" + "github.com/allegro/bigcache" "github.com/cespare/xxhash" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -24,7 +28,10 @@ type Decoder struct { tcpCount int dnsCount int unknownCount int - lru *lru.ARCCache + IPFlow gopacket.Flow + UDPFlow gopacket.Flow + lru *lru.Cache + bigcache *bigcache.BigCache hash hash.Hash64 } @@ -40,19 +47,48 @@ type Packet struct { Dport uint16 CorrelationID []byte Payload []byte + Type byte } func NewDecoder() *Decoder { - host, err := os.Hostname() if err != nil { host = "sniffer" } - l, err := lru.NewARC(8192) + + la, err := lru.New(8000) if err != nil { logp.Err("lru %v", err) } - h := xxhash.New() + + xh := xxhash.New() + + bConf := bigcache.Config{ + // number of shards (must be a power of 2) + Shards: 1024, + // time after which entry can be evicted + LifeWindow: 180 * time.Minute, + // rps * lifeWindow, used only in initial memory allocation + MaxEntriesInWindow: 1000 * 180 * 60, + // max entry size in bytes, used only in initial memory allocation + MaxEntrySize: 300, + // prints information about additional memory allocation + Verbose: true, + // cache will not allocate more memory than this limit, value in MB + // if value is reached then the oldest entries can be overridden for the new ones + // 0 value means no size limit + HardMaxCacheSize: 512, + // callback fired when the oldest entry is removed because of its + // expiration time or no space left for the new entry. Default value is nil which + // means no callback and it prevents from unwrapping the oldest entry. + OnRemove: nil, + } + + bc, err := bigcache.NewBigCache(bConf) + if err != nil { + logp.Err("bigcache %v", err) + } + d := &Decoder{ Host: host, defragger: ip4defrag.NewIPv4Defragmenter(), @@ -63,8 +99,9 @@ func NewDecoder() *Decoder { tcpCount: 0, dnsCount: 0, unknownCount: 0, - lru: l, - hash: h, + lru: la, + hash: xh, + bigcache: bc, } go d.flushFrag() go d.printStats() @@ -101,7 +138,9 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error } } + d.IPFlow = ip4.NetworkFlow() d.ip4Count++ + pkt.Version = ip4.Version pkt.Protocol = uint8(ip4.Protocol) pkt.Srcip = ip2int(ip4.SrcIP) @@ -109,8 +148,8 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error ip4New, err := d.defragger.DefragIPv4(ip4) if err != nil { - logp.Err("Error while de-fragmenting", err) - return nil, err + logp.Warn("Error while de-fragmenting", err) + return nil, nil } else if ip4New == nil { d.fragCount++ return nil, nil @@ -133,10 +172,6 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error nextDecoder := ip4New.NextLayerType() nextDecoder.Decode(ip4New.Payload, pb) } - // TODO: generate a more meaningful CorrelationID - if config.Cfg.Mode == "DNS" || config.Cfg.Mode == "LOG" || config.Cfg.Mode == "TLS" { - pkt.CorrelationID = []byte(config.Cfg.Mode) - } } if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil { @@ -144,10 +179,21 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error if !ok { return nil, nil } + + d.UDPFlow = udp.TransportFlow() d.udpCount++ + pkt.Sport = uint16(udp.SrcPort) pkt.Dport = uint16(udp.DstPort) pkt.Payload = udp.Payload + pkt.Type = 1 + + if config.Cfg.Mode == "SIPRTCP" { + d.cacheSDPIPPort(udp.Payload) + if (udp.Payload[0]&0xc0)>>6 == 2 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) { + pkt.Payload, pkt.CorrelationID, pkt.Type = d.correlateRTCP(udp.Payload) + } + } } else if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { tcp, ok := tcpLayer.(*layers.TCP) @@ -158,6 +204,11 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(tcp.SrcPort) pkt.Dport = uint16(tcp.DstPort) pkt.Payload = tcp.Payload + pkt.Type = 1 + + if config.Cfg.Mode == "SIPRTCP" { + d.cacheSDPIPPort(tcp.Payload) + } } if dnsLayer := packet.Layer(layers.LayerTypeDNS); dnsLayer != nil { @@ -186,3 +237,62 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error d.unknownCount++ return nil, nil } + +func (d *Decoder) cacheSDPIPPort(payload []byte) { + var SDPIP, RTCPPort string + var callID []byte + + if posSDPIP, posSDPPort := bytes.Index(payload, []byte("c=IN IP4 ")), bytes.Index(payload, []byte("m=audio ")); posSDPIP >= 0 && posSDPPort >= 0 { + restIP := payload[posSDPIP:] + if posRestIP := bytes.Index(restIP, []byte("\r\n")); posRestIP >= 0 { + SDPIP = string(restIP[len("c=IN IP4 "):bytes.Index(restIP, []byte("\r\n"))]) + } else { + logp.Warn("Couldn't find end of SDP IP in '%s'", string(restIP)) + } + + restPort := payload[posSDPPort:] + if posRestPort := bytes.Index(restIP, []byte(" RTP")); posRestPort >= 0 { + SDPPort, err := strconv.Atoi(string(restPort[len("m=audio "):bytes.Index(restPort, []byte(" RTP"))])) + if err != nil { + logp.Warn("%v", err) + } + RTCPPort = strconv.Itoa(SDPPort + 1) + } else { + logp.Warn("Couldn't find end of SDP Port in '%s'", string(restPort)) + } + + if posCallID := bytes.Index(payload, []byte("Call-ID: ")); posCallID >= 0 { + restCallID := payload[posCallID:] + if posRestCallID := bytes.Index(restIP, []byte("\r\n")); posRestCallID >= 0 { + callID = restCallID[len("Call-ID: "):bytes.Index(restCallID, []byte("\r\n"))] + } else { + logp.Warn("Couldn't find end of Call-ID in '%s'", string(restCallID)) + } + } else if posID := bytes.Index(payload, []byte("i: ")); posID >= 0 { + restID := payload[posID:] + if posRestID := bytes.Index(restIP, []byte("\r\n")); posRestID >= 0 { + callID = restID[len("i: "):bytes.Index(restID, []byte("\r\n"))] + } else { + logp.Warn("Couldn't find end of Call-ID in '%s'", string(restID)) + } + } + d.bigcache.Set(SDPIP+RTCPPort, callID) + } +} + +func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) { + jsonRTCP, err := protos.ParseRTCP(payload) + if err != nil { + logp.Warn("%v", err) + return nil, nil, 0 + } + + corrID, err := d.bigcache.Get(d.IPFlow.Src().String() + d.UDPFlow.Src().String()) + if err != nil { + logp.Warn("%v", err) + return nil, nil, 0 + } + + logp.Debug("decoder", "RTCP JSON payload: %s", string(jsonRTCP)) + return jsonRTCP, corrID, 5 +} diff --git a/main.go b/main.go index e134cf2..e64f3bb 100644 --- a/main.go +++ b/main.go @@ -28,8 +28,8 @@ func parseFlags() { flag.StringVar(&ifaceConfig.Type, "t", "pcap", "Capture types are [af_packet, pcap, file]") flag.StringVar(&ifaceConfig.ReadFile, "rf", "", "Read packets from file. Please use -t file") flag.StringVar(&ifaceConfig.WriteFile, "wf", "", "Write packets to file") - flag.IntVar(&ifaceConfig.Loop, "lp", 0, "Loop count over ReadFile") - flag.BoolVar(&ifaceConfig.TopSpeed, "ts", false, "Topspeed uses timestamps from packets") + flag.IntVar(&ifaceConfig.Loop, "lp", 1, "Loop count over ReadFile") + flag.BoolVar(&ifaceConfig.ReadSpeed, "rs", false, "Maximum read speed. Doesn't use packet timestamps") flag.IntVar(&ifaceConfig.Snaplen, "s", 32768, "Snap length") flag.IntVar(&ifaceConfig.BufferSizeMb, "b", 64, "Interface buffersize (MB)") flag.IntVar(&keepLogFiles, "kl", 4, "Rotate the number of log files") @@ -38,7 +38,7 @@ func parseFlags() { flag.StringVar(&fileRotator.Path, "p", "./", "Log filepath") flag.StringVar(&fileRotator.Name, "n", "heplify.log", "Log filename") flag.Uint64Var(&rotateEveryKB, "r", 16384, "Log filesize (KB)") - flag.StringVar(&config.Cfg.Mode, "m", "SIP", "Capture modes [DNS, LOG, SIP, TLS]") + flag.StringVar(&config.Cfg.Mode, "m", "SIP", "Capture modes [DNS, LOG, SIP, RTCP, TLS]") flag.BoolVar(&config.Cfg.Dedup, "dd", true, "Deduplicate packets") flag.StringVar(&config.Cfg.Filter, "fi", "", "Filter out interesting packets like SIP INVITES, Handshakes ...") flag.StringVar(&config.Cfg.Discard, "di", "", "Discard uninteresting packets like SIP OPTIONS, HTTP Requests ...") diff --git a/protos/rtcp.go b/protos/rtcp.go new file mode 100644 index 0000000..f41f0a4 --- /dev/null +++ b/protos/rtcp.go @@ -0,0 +1,290 @@ +package protos + +import ( + "encoding/binary" + "encoding/json" + "fmt" + + "github.com/negbie/heplify/logp" +) + +/* RTCP header +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT(200-204) | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +SR: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT=SR=200 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC of sender | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| NTP timestamp,most significant word | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| NTP timestamp,least significant word | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| RTP timestamp | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| sender's packet count | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| sender's octet count | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_1 (SSRC of first source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| fraction lost | cumulative number of packets lost | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| extended highest sequence number received | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| interarrival jitter | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| last SR (LSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| delay since last SR (DLSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_2 (SSRC of second source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| profile-specific extensions | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +RR: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT=PR=201 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC of sender | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC_1 (SSRC of first source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| fraction lost | cumulative number of packets lost | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| extended highest sequence number received | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| interarrival jitter | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| last SR (LSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| delay since last SR (DLSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_2 (SSRC of second source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +: ... : ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| profile-specific extensions | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +SDES: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=202 | length | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC_1 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SDES items | +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC_2 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SDES items | +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + +BYE: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=203 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC/CSRC | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +: ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| length | reason for leaving | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +APP: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=204 | length | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| name (ASCII) | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| application-dependent data | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ + +const ( + TYPE_RTCP_SR = 200 + TYPE_RTCP_RR = 201 + TYPE_RTCP_SDES = 202 + TYPE_RTCP_BYE = 203 + TYPE_RTCP_APP = 204 + TYPE_RTCP_XR = 207 +) + +type RTCP_header struct { + Version uint8 `json:"version"` // 2 bit + Padding uint8 `json:"padding"` // 1 bit + ReceptionReportCount uint8 `json:"report_count"` // 5 bit + RTCPType uint8 `json:"type"` // 8 bit + Length uint16 `json:"length"` // 16 bit +} + +type RTCP_Packet struct { + SenderInformation struct { + Ntp_timestamp_MSW uint32 `json:"ntp_timestamp_sec"` // 32 bit + Ntp_timestamp_LSW uint32 `json:"ntp_timestamp_usec"` // 32 bit + Rtp_timestamp uint32 `json:"rtp_timestamp"` // 32 bit + Pkt_count uint32 `json:"packets"` // 32 bit + Octet_count uint32 `json:"octets"` // 32 bit + } `json:"sender_information"` + Ssrc uint32 `json:"ssrc"` // 32 bit + ReportBlocks []RTCP_report_block `json:"report_blocks"` +} + +type RTCP_report_block struct { + SourceSsrc uint32 `json:"source_ssrc"` // 32 bit + Fraction_lost uint8 `json:"fraction_lost"` // 8 bit + Cumulative_lost uint32 `json:"packets_lost"` // 24 bit + Highest_seq_no uint32 `json:"highest_seq_no"` // 32 bit + Jitter uint32 `json:"ia_jitter"` // 32 bit + LastSR uint32 `json:"lsr"` // 32 bit + Delay_last_SR uint32 `json:"dlsr"` // 32 bit + ReportCount uint8 `json:"report_count"` // 8 bit + RTCPType uint8 `json:"type"` // 8 bit +} + +func (rp *RTCP_Packet) AddReportBlock(rb RTCP_report_block) []RTCP_report_block { + rp.ReportBlocks = append(rp.ReportBlocks, rb) + return rp.ReportBlocks +} + +func (rp *RTCP_Packet) MarshalJSON() ([]byte, error) { + bytes, err := json.Marshal(*rp) + return bytes, err +} + +func ParseRTCP(data []byte) ([]byte, error) { + dataLen := len(data) + if dataLen < 28 { + return nil, fmt.Errorf("Useless data inside RTCP packet='%s' length=%d", string(data), len(data)) + } + pkt := &RTCP_Packet{} + offset := 0 + + for dataLen > 0 { + if dataLen%4 != 0 || dataLen < 4 { + return nil, fmt.Errorf("Fishy RTCP packet=%v length=%d", data, dataLen) + } + + //version := (data[offset] & 0xc0) >> 6 + //padding := (data[offset] & 0x20) >> 5 + receptionReportCount := data[offset] & 0x1f + RTCPType := data[offset+1] + RTCPLength := binary.BigEndian.Uint16(data[offset+2:]) * 4 + + offset = offset + 4 + + if receptionReportCount < 0 || receptionReportCount > 4 { + return nil, fmt.Errorf("Fishy RTCP receptionReportCount=%d", receptionReportCount) + } + + switch RTCPType { + case TYPE_RTCP_SR: + if RTCPLength < 24 { + return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) + } + + pkt.Ssrc = binary.BigEndian.Uint32(data[offset:]) + pkt.SenderInformation.Ntp_timestamp_MSW = binary.BigEndian.Uint32(data[offset+4:]) + pkt.SenderInformation.Ntp_timestamp_LSW = binary.BigEndian.Uint32(data[offset+8:]) + pkt.SenderInformation.Rtp_timestamp = binary.BigEndian.Uint32(data[offset+12:]) + pkt.SenderInformation.Pkt_count = binary.BigEndian.Uint32(data[offset+16:]) + pkt.SenderInformation.Octet_count = binary.BigEndian.Uint32(data[offset+20:]) + offset = offset + 24 + + if receptionReportCount > 0 && RTCPLength >= 24 { + tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) + for i := 0; i < int(receptionReportCount); i++ { + tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].Fraction_lost = data[offset+4] + var cumBuf [4]byte + copy(cumBuf[1:], data[offset+5:offset+8]) + tmpReportBlocks[i].Cumulative_lost = binary.BigEndian.Uint32(cumBuf[:]) + tmpReportBlocks[i].Highest_seq_no = binary.BigEndian.Uint32(data[offset+8:]) + tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:]) + tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) + tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) + tmpReportBlocks[i].ReportCount = receptionReportCount + tmpReportBlocks[i].RTCPType = RTCPType + offset = offset + 24 + pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) + } + } + + case TYPE_RTCP_RR: + if RTCPLength < 28 { + return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) + } + + pkt.Ssrc = binary.BigEndian.Uint32(data[offset:]) + offset = offset + 4 + + if receptionReportCount > 0 && RTCPLength >= 28 { + tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) + for i := 0; i < int(receptionReportCount); i++ { + tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].Fraction_lost = data[offset+4] + var cumBuf [4]byte + copy(cumBuf[1:], data[offset+5:offset+8]) + tmpReportBlocks[i].Cumulative_lost = binary.BigEndian.Uint32(cumBuf[:]) + tmpReportBlocks[i].Highest_seq_no = binary.BigEndian.Uint32(data[offset+8:]) + tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:]) + tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) + tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) + tmpReportBlocks[i].ReportCount = receptionReportCount + tmpReportBlocks[i].RTCPType = RTCPType + offset = offset + 24 + pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) + } + } + + case TYPE_RTCP_SDES: + logp.Debug("rtcp", "Discard RTCP_SDES packet type: %d", RTCPType) + offset = offset + int(RTCPLength) + case TYPE_RTCP_APP: + logp.Debug("rtcp", "Discard RTCP_APP packet type: %d", RTCPType) + offset = offset + int(RTCPLength) + case TYPE_RTCP_BYE: + logp.Debug("rtcp", "Discard RTCP_BYE packet type: %d", RTCPType) + offset = offset + int(RTCPLength) + case TYPE_RTCP_XR: + logp.Debug("rtcp", "Discard RTCP_XR packet type: %d", RTCPType) + offset = offset + int(RTCPLength) + default: + logp.Debug("rtcp", "Discard unsupported packet type: %d", RTCPType) + offset = offset + int(RTCPLength) + } + + dataLen -= offset + + } + + rtcpPkt, err := pkt.MarshalJSON() + if err != nil { + return nil, err + } + + logp.Debug("rtcp", "RTCP json payload:\n%v\n", string(rtcpPkt)) + return rtcpPkt, nil +} diff --git a/publish/hep.go b/publish/hep.go index 2560b5b..ce2d1f1 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "net" - "github.com/negbie/heplify/config" "github.com/negbie/heplify/decoder" "github.com/negbie/heplify/logp" ) @@ -180,10 +179,16 @@ func makeChunck(chunckVen uint16, chunckType uint16, h *decoder.Packet) []byte { // Chunk protocol type (SIP/H323/RTP/MGCP/M2UA) case 0x000b: chunck = make([]byte, 6+1) - if config.Cfg.Mode == "SIP" { + switch h.Type { + case 1: chunck[6] = 1 // SIP - } else { + case 5: + chunck[6] = 5 // RTCP + case 100: chunck[6] = 100 // LOG + default: + chunck[6] = 66 // Unknown + } // Chunk capture agent ID @@ -240,7 +245,7 @@ func newHEPChuncks(h *decoder.Packet) []byte { buf.Write(makeChunck(0x0000, 0x000c, h)) buf.Write(makeChunck(0x0000, 0x000e, h)) buf.Write(makeChunck(0x0000, 0x000f, h)) - if config.Cfg.Mode != "SIP" { + if h.CorrelationID != nil { buf.Write(makeChunck(0x0000, 0x0011, h)) } return buf.Bytes() diff --git a/sniffer/sniffer.go b/sniffer/sniffer.go index 3cae13b..781d88e 100644 --- a/sniffer/sniffer.go +++ b/sniffer/sniffer.go @@ -89,13 +89,17 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { sniffer.config.Snaplen = 65535 } - if sniffer.config.Type != "file" && sniffer.config.Type != "af_packet" { + if sniffer.config.Type != "af_packet" { sniffer.config.Type = "pcap" } switch sniffer.mode { case "SIP": sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (vlan and (greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0))" + case "RTCP": + sniffer.filter = "(ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)" + case "SIPRTCP": + sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)" case "LOG": sniffer.filter = "greater 128 and port 514" case "DNS": @@ -107,29 +111,24 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (vlan and (greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0))" } - logp.Debug("sniffer", "Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) + logp.Info("Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) switch sniffer.config.Type { - case "file": - sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) - if err != nil { - return fmt.Errorf("couldn't open file %v %v", sniffer.config.ReadFile, err) - } - err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) - if err != nil { - return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) - } - - sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle) - case "pcap": - sniffer.pcapHandle, err = pcap.OpenLive(sniffer.config.Device, int32(sniffer.config.Snaplen), true, pcap.BlockForever) - if err != nil { - return fmt.Errorf("setting pcap live mode: %v", err) - } - err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) - if err != nil { - return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) + if sniffer.config.ReadFile != "" { + sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) + if err != nil { + return fmt.Errorf("couldn't open file %v! %v", sniffer.config.ReadFile, err) + } + } else { + sniffer.pcapHandle, err = pcap.OpenLive(sniffer.config.Device, int32(sniffer.config.Snaplen), true, pcap.BlockForever) + if err != nil { + return fmt.Errorf("setting pcap live mode: %v", err) + } + err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) + if err != nil { + return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) + } } sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle) @@ -167,7 +166,7 @@ func (sniffer *SnifferSetup) Init(testMode bool, mode string, factory WorkerFact var err error sniffer.mode = mode - if interfaces.Device == "" { + if interfaces.Device == "" && interfaces.ReadFile == "" { fmt.Printf("\nPlease use one of the following devices:\n\n") _, err := ListDeviceNames(false, false) if err != nil { @@ -247,8 +246,8 @@ func (sniffer *SnifferSetup) Run() error { logp.Debug("sniffer", "End of file") loopCount++ if sniffer.config.Loop > 0 && loopCount > sniffer.config.Loop { - // time for the publish goroutine to flush - time.Sleep(300 * time.Millisecond) + // Give the publish goroutine 200 ms to flush + time.Sleep(200 * time.Millisecond) sniffer.isAlive = false continue } @@ -277,7 +276,7 @@ func (sniffer *SnifferSetup) Run() error { } if sniffer.config.ReadFile != "" { - if lastPktTime != nil && !sniffer.config.TopSpeed { + if lastPktTime != nil && !sniffer.config.ReadSpeed { sleep := ci.Timestamp.Sub(*lastPktTime) if sleep > 0 { time.Sleep(sleep) @@ -287,14 +286,14 @@ func (sniffer *SnifferSetup) Run() error { } _lastPktTime := ci.Timestamp lastPktTime = &_lastPktTime - if !sniffer.config.TopSpeed { + if !sniffer.config.ReadSpeed { // Overwrite what we get from the pcap ci.Timestamp = time.Now() } } else if sniffer.config.WriteFile != "" { err := sniffer.dumper.WritePacket(ci, data) if err != nil { - return fmt.Errorf("couldn't write to file %v %v", sniffer.config.WriteFile, err) + return fmt.Errorf("couldn't write to file %v! %v", sniffer.config.WriteFile, err) } } @@ -306,8 +305,6 @@ func (sniffer *SnifferSetup) Run() error { func (sniffer *SnifferSetup) Close() error { switch sniffer.config.Type { - case "file": - sniffer.pcapHandle.Close() case "pcap": sniffer.pcapHandle.Close() case "af_packet": @@ -318,12 +315,13 @@ func (sniffer *SnifferSetup) Close() error { func (sniffer *SnifferSetup) Reopen() error { var err error + time.Sleep(100 * time.Millisecond) - if sniffer.config.Type != "file" { - return fmt.Errorf("Reopen is only possible for files") + if sniffer.config.Type != "pcap" || sniffer.config.ReadFile == "" { + return fmt.Errorf("Reopen is only possible for files and in pcap mode") } - sniffer.pcapHandle.Close() + sniffer.Close() sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) if err != nil { return err @@ -352,18 +350,14 @@ func (sniffer *SnifferSetup) IsAlive() bool { func (sniffer *SnifferSetup) printStats() { var err error + if sniffer.config.ReadFile != "" { + logp.Info("Read in pcap file. Stats won't be generated.") + return + } for { <-time.After(1 * time.Minute) go func() { switch sniffer.config.Type { - case "file": - sniffer.pcapStats, err = sniffer.pcapHandle.Stats() - if err != nil { - logp.Warn("Stats err: %v", err) - } - logp.Info("Packets overall received: %d, dropped by OS: %d, dropped by interface: %d", - sniffer.pcapStats.PacketsReceived, sniffer.pcapStats.PacketsDropped, sniffer.pcapStats.PacketsIfDropped) - case "pcap": sniffer.pcapStats, err = sniffer.pcapHandle.Stats() if err != nil {