From 4e48058f5edf294de44f1011da85345bb5d26076 Mon Sep 17 00:00:00 2001 From: negbie Date: Thu, 21 Sep 2017 17:01:37 +0200 Subject: [PATCH 1/6] Readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ed0f4b4..6764405 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ heplify is captagents little brother. While it offers a compareable performance It's a single binary which you can place on your linux or windows machine. Just run it to capture packets and send them to Homer. Right now heplify is able to send SIP, DNS, LOG or TLS handshakes into homer. It's able to handle fragmented and duplicate packets out of the box. - + ### Requirements * libpcap - + On Debian/Ubuntu: sudo apt-get install libpcap-dev On CentOS/RHEL: yum install libpcap-devel On Windows: install WinPcap From e8003ff3d59e0d0d0df62c3b04b6095feded5a2a Mon Sep 17 00:00:00 2001 From: negbie Date: Thu, 19 Oct 2017 14:51:59 +0200 Subject: [PATCH 2/6] Add RTCP skeleton --- build_static.sh | 17 +++ decoder/decoder.go | 7 +- main.go | 2 +- protos/rtcp.go | 294 +++++++++++++++++++++++++++++++++++++++++++++ sniffer/sniffer.go | 8 +- 5 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 build_static.sh create mode 100644 protos/rtcp.go diff --git a/build_static.sh b/build_static.sh new file mode 100644 index 0000000..8f24c24 --- /dev/null +++ b/build_static.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +set -ex + +apk update +apk add linux-headers musl-dev gcc go libpcap-dev ca-certificates git + +mkdir /go +export GOPATH=/go +mkdir -p /go/src/github.com/negbie +mkdir -p /mnt/out +cp -a /mnt /go/src/github.com/negbie/heplify +cd /go/src/github.com/negbie/heplify +rm -f heplify* +go get -v ./ ./ +go build --ldflags '-linkmode external -extldflags "-static -s -w"' -v ./ +cp ./heplify /mnt/out/ diff --git a/decoder/decoder.go b/decoder/decoder.go index 4dc255f..2b29dc8 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -109,8 +109,8 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error ip4New, err := d.defragger.DefragIPv4(ip4) if err != nil { - logp.Err("Error while de-fragmenting", err) - return nil, err + logp.Warn("Error while de-fragmenting", err) + return nil, nil } else if ip4New == nil { d.fragCount++ return nil, nil @@ -148,6 +148,9 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(udp.SrcPort) pkt.Dport = uint16(udp.DstPort) pkt.Payload = udp.Payload + if (udp.Payload[0]&0xc0)>>6 == 2 && udp.Payload[1] == 200 || udp.Payload[1] == 201 { + pkt.Payload, _ = protos.ParseRTCP(udp.Payload) + } } else if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { tcp, ok := tcpLayer.(*layers.TCP) diff --git a/main.go b/main.go index e134cf2..e0a51b9 100644 --- a/main.go +++ b/main.go @@ -38,7 +38,7 @@ func parseFlags() { flag.StringVar(&fileRotator.Path, "p", "./", "Log filepath") flag.StringVar(&fileRotator.Name, "n", "heplify.log", "Log filename") flag.Uint64Var(&rotateEveryKB, "r", 16384, "Log filesize (KB)") - flag.StringVar(&config.Cfg.Mode, "m", "SIP", "Capture modes [DNS, LOG, SIP, TLS]") + flag.StringVar(&config.Cfg.Mode, "m", "SIP", "Capture modes [DNS, LOG, SIP, RTCP, TLS]") flag.BoolVar(&config.Cfg.Dedup, "dd", true, "Deduplicate packets") flag.StringVar(&config.Cfg.Filter, "fi", "", "Filter out interesting packets like SIP INVITES, Handshakes ...") flag.StringVar(&config.Cfg.Discard, "di", "", "Discard uninteresting packets like SIP OPTIONS, HTTP Requests ...") diff --git a/protos/rtcp.go b/protos/rtcp.go new file mode 100644 index 0000000..3c76229 --- /dev/null +++ b/protos/rtcp.go @@ -0,0 +1,294 @@ +package protos + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + + "github.com/negbie/heplify/logp" +) + +/* RTCP header + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +header |V=2|P| RC | PT(200-204) | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +SR: + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + header |V=2|P| RC | PT=SR=200 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of sender | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + sender | NTP timestamp,most significant word | + info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp,least significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | RTP timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | sender's packet count | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | sender's octet count | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + report | SSRC_1 (SSRC of first source) | + block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1 | fraction lost | cumulative number of packets lost | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | extended highest sequence number received | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | interarrival jitter | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | last SR (LSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | delay since last SR (DLSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + report | SSRC_2 (SSRC of second source) | + block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 2 : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | profile-specific extensions | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +RR: + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + header |V=2|P| RC | PT=PR=201 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of sender | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + report | SSRC_1 (SSRC of first source) | + block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1 | fraction lost | cumulative number of packets lost | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | extended highest sequence number received | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | interarrival jitter | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | last SR (LSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | delay since last SR (DLSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + report | SSRC_2 (SSRC of second source) | + block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 2 : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | profile-specific extensions | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +SDES: + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + header |V=2|P| SC | PT=SR=202 | length | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | SSRC/CSRC_1 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SDES items | + | ... | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | SSRC/CSRC_2 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SDES items | + | ... | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + +BYE: + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + header |V=2|P| SC | PT=SR=203 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC/CSRC | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + (opt) | length | reason for leaving ... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + APP: + 0 1 2 3 4 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + header |V=2|P| SC | PT=SR=204 | length | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | SSRC/CSRC | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | name (ASCII) | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | application-dependent data ... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ + +const ( + TYPE_RTCP_SR = 200 + TYPE_RTCP_RR = 201 + TYPE_RTCP_SDES = 202 + TYPE_RTCP_BYE = 203 + TYPE_RTCP_APP = 204 +) + +type RTCP_header struct { + Version uint8 `json:"version"` // 2 bit + Padding uint8 `json:"padding"` // 1 bit + ReceptionReportCount uint8 `json:"report_count"` // 5 bit + PacketType uint8 `json:"type"` // 16 bit + Length uint16 `json:"length"` // 16 bit +} + +type RTCP_report_block struct { + Ssrc uint32 `json:"source_ssrc"` // 32 bit + Fraction_lost uint8 `json:"fraction_lost"` // 8 bit + Cumulative_lost uint32 `json:"packets_lost"` // 24 bit + Highest_seq_no uint32 `json:"highest_seq_no"` // 32 bit + Jitter uint32 `json:"ia_jitter"` // 32 bit + LastSR uint32 `json:"lsr"` // 32 bit + Delay_last_SR uint32 `json:"dlsr"` // 32 bit +} + +type RTCP_SR struct { + Ssrc uint32 `json:"ssrc"` // 32 bit + Ntp_timestamp_MSW uint32 `json:"ntp_timestamp_sec"` // 32 bit + Ntp_timestamp_LSW uint32 `json:"ntp_timestamp_usec"` // 32 bit + Rtp_timestamp uint32 `json:"rtp_timestamp"` // 32 bit + Pkt_count uint32 `json:"packets"` // 32 bit + Octet_count uint32 `json:"octets"` // 32 bit + ReportBlocks []RTCP_report_block `json:"report_blocks"` +} + +type RTCP_RR struct { + ReportBlocks []RTCP_report_block `json:"report_blocks"` +} + +type RTCP_SDES struct { +} + +type RTCP_BYE struct { +} + +type RTCP_APP struct { +} + +type RTCP_Packet struct { + Header RTCP_header `json:"header"` + Sender_information interface{} `json:"sender_information"` +} + +//read data by bit +type BitReader struct { + buf []byte + curBit int +} + +func (br *BitReader) Init(data []byte) { + br.curBit = 0 + br.buf = make([]byte, len(data)) + copy(br.buf, data) +} + +func (br *BitReader) ReadBit() int { + if br.curBit > (len(br.buf) << 3) { + return -1 + } + idx := (br.curBit >> 3) + offset := br.curBit%8 + 1 + br.curBit++ + return int(br.buf[idx]>>uint(8-offset)) & 0x01 +} + +func (br *BitReader) ReadBits(num int) int { + r := 0 + for i := 0; i < num; i++ { + r |= (br.ReadBit() << uint(num-i-1)) + } + return r +} + +func (br *BitReader) Read32Bits() uint32 { + idx := (br.curBit >> 3) + var r uint32 + binary.Read(bytes.NewReader(br.buf[idx:]), binary.BigEndian, &r) + br.curBit += 32 + return r +} + +func ParseRTCP(data []byte) ([]byte, error) { + pkt := &RTCP_Packet{} + if len(data) < 4 { + return nil, errors.New("Not enough packets inside RTCP header") + } + + pkt.Header.Version = (data[0] & 0xc0) >> 6 + pkt.Header.Padding = (data[0] & 0x20) >> 5 + pkt.Header.ReceptionReportCount = data[0] & 0x1f + pkt.Header.PacketType = data[1] + pkt.Header.Length = binary.BigEndian.Uint16(data[2:]) + + if pkt.Header.ReceptionReportCount < 0 { + return nil, errors.New("Negative reception report count") + } + reader := &BitReader{} + reader.Init(data[4:]) + switch pkt.Header.PacketType { + case TYPE_RTCP_SR: + sr := &RTCP_SR{} + pkt.Sender_information = sr + sr.Ssrc = uint32(reader.Read32Bits()) + sr.Ntp_timestamp_MSW = uint32(reader.Read32Bits()) + sr.Ntp_timestamp_LSW = uint32(reader.Read32Bits()) + sr.Rtp_timestamp = uint32(reader.Read32Bits()) + sr.Pkt_count = uint32(reader.Read32Bits()) + sr.Octet_count = uint32(reader.Read32Bits()) + + if pkt.Header.ReceptionReportCount > 0 { + sr.ReportBlocks = make([]RTCP_report_block, pkt.Header.ReceptionReportCount) + for i := 0; i < int(pkt.Header.ReceptionReportCount); i++ { + sr.ReportBlocks[i].Ssrc = uint32(reader.Read32Bits()) + sr.ReportBlocks[i].Fraction_lost = byte(reader.ReadBits(8)) + sr.ReportBlocks[i].Cumulative_lost = uint32(reader.ReadBits(24)) + sr.ReportBlocks[i].Highest_seq_no = uint32(reader.Read32Bits()) + sr.ReportBlocks[i].Jitter = uint32(reader.Read32Bits()) + sr.ReportBlocks[i].LastSR = uint32(reader.Read32Bits()) + sr.ReportBlocks[i].Delay_last_SR = uint32(reader.Read32Bits()) + } + } + + case TYPE_RTCP_RR: + rr := &RTCP_RR{} + if pkt.Header.ReceptionReportCount > 0 { + rr.ReportBlocks = make([]RTCP_report_block, pkt.Header.ReceptionReportCount) + for i := 0; i < int(pkt.Header.ReceptionReportCount); i++ { + rr.ReportBlocks[i].Ssrc = uint32(reader.Read32Bits()) + rr.ReportBlocks[i].Fraction_lost = byte(reader.ReadBits(8)) + rr.ReportBlocks[i].Cumulative_lost = uint32(reader.ReadBits(24)) + rr.ReportBlocks[i].Highest_seq_no = uint32(reader.Read32Bits()) + rr.ReportBlocks[i].Jitter = uint32(reader.Read32Bits()) + rr.ReportBlocks[i].LastSR = uint32(reader.Read32Bits()) + rr.ReportBlocks[i].Delay_last_SR = uint32(reader.Read32Bits()) + } + } + case TYPE_RTCP_SDES: + logp.Info("Discard RTCP_SDES packet type: %d", pkt.Header.PacketType) + case TYPE_RTCP_APP: + logp.Info("Discard RTCP_APP packet type: %d", pkt.Header.PacketType) + case TYPE_RTCP_BYE: + logp.Info("Discard RTCP_BYE packet type: %d", pkt.Header.PacketType) + default: + logp.Info("Discard unsupported packet type: %d", pkt.Header.PacketType) + } + rtcpPkt, err := json.Marshal(pkt) + + if err != nil { + fmt.Println("json error!") + logp.Err("RTCP marshal", err) + return nil, err + } + + fmt.Println(string(rtcpPkt)) + return nil, nil +} diff --git a/sniffer/sniffer.go b/sniffer/sniffer.go index 3cae13b..2246e7b 100644 --- a/sniffer/sniffer.go +++ b/sniffer/sniffer.go @@ -96,6 +96,10 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { switch sniffer.mode { case "SIP": sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (vlan and (greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0))" + case "RTCP": + sniffer.filter = "(ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)" + case "SIPRTCP": + sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc))" case "LOG": sniffer.filter = "greater 128 and port 514" case "DNS": @@ -107,7 +111,7 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (vlan and (greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0))" } - logp.Debug("sniffer", "Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) + logp.Info("sniffer", "Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) switch sniffer.config.Type { case "file": @@ -167,7 +171,7 @@ func (sniffer *SnifferSetup) Init(testMode bool, mode string, factory WorkerFact var err error sniffer.mode = mode - if interfaces.Device == "" { + if interfaces.Device == "" && interfaces.ReadFile == "" { fmt.Printf("\nPlease use one of the following devices:\n\n") _, err := ListDeviceNames(false, false) if err != nil { From b603899b36172a64c9669253fd58408c11d1d72e Mon Sep 17 00:00:00 2001 From: Eugen Biegler Date: Sun, 5 Nov 2017 19:44:45 +0100 Subject: [PATCH 3/6] Clean up ParseRTCP --- decoder/decoder.go | 7 +- protos/rtcp.go | 223 +++++++++++++++++++++------------------------ sniffer/sniffer.go | 57 +++++------- 3 files changed, 130 insertions(+), 157 deletions(-) diff --git a/decoder/decoder.go b/decoder/decoder.go index 2b29dc8..0287546 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -148,8 +148,11 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(udp.SrcPort) pkt.Dport = uint16(udp.DstPort) pkt.Payload = udp.Payload - if (udp.Payload[0]&0xc0)>>6 == 2 && udp.Payload[1] == 200 || udp.Payload[1] == 201 { - pkt.Payload, _ = protos.ParseRTCP(udp.Payload) + if (udp.Payload[0]&0xc0)>>6 == 2 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) { + pkt.Payload, err = protos.ParseRTCP(udp.Payload) + if err != nil { + logp.Warn(err) + return nil, nil } } else if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { diff --git a/protos/rtcp.go b/protos/rtcp.go index 3c76229..5d558d0 100644 --- a/protos/rtcp.go +++ b/protos/rtcp.go @@ -1,11 +1,9 @@ package protos import ( - "bytes" "encoding/binary" "encoding/json" "errors" - "fmt" "github.com/negbie/heplify/logp" ) @@ -131,16 +129,29 @@ const ( TYPE_RTCP_SDES = 202 TYPE_RTCP_BYE = 203 TYPE_RTCP_APP = 204 + TYPE_RTCP_XR = 207 ) type RTCP_header struct { Version uint8 `json:"version"` // 2 bit Padding uint8 `json:"padding"` // 1 bit ReceptionReportCount uint8 `json:"report_count"` // 5 bit - PacketType uint8 `json:"type"` // 16 bit + PacketType uint8 `json:"type"` // 8 bit Length uint16 `json:"length"` // 16 bit } +type RTCP_Packet struct { + SenderInformation struct { + Ssrc uint32 `json:"ssrc"` // 32 bit + Ntp_timestamp_MSW uint32 `json:"ntp_timestamp_sec"` // 32 bit + Ntp_timestamp_LSW uint32 `json:"ntp_timestamp_usec"` // 32 bit + Rtp_timestamp uint32 `json:"rtp_timestamp"` // 32 bit + Pkt_count uint32 `json:"packets"` // 32 bit + Octet_count uint32 `json:"octets"` // 32 bit + } `json:"sender_information"` + ReportBlocks []RTCP_report_block `json:"report_blocks"` +} + type RTCP_report_block struct { Ssrc uint32 `json:"source_ssrc"` // 32 bit Fraction_lost uint8 `json:"fraction_lost"` // 8 bit @@ -149,146 +160,116 @@ type RTCP_report_block struct { Jitter uint32 `json:"ia_jitter"` // 32 bit LastSR uint32 `json:"lsr"` // 32 bit Delay_last_SR uint32 `json:"dlsr"` // 32 bit + ReportCount uint8 `json:"report_count"` // 8 bit + PacketType uint8 `json:"type"` // 8 bit } -type RTCP_SR struct { - Ssrc uint32 `json:"ssrc"` // 32 bit - Ntp_timestamp_MSW uint32 `json:"ntp_timestamp_sec"` // 32 bit - Ntp_timestamp_LSW uint32 `json:"ntp_timestamp_usec"` // 32 bit - Rtp_timestamp uint32 `json:"rtp_timestamp"` // 32 bit - Pkt_count uint32 `json:"packets"` // 32 bit - Octet_count uint32 `json:"octets"` // 32 bit - ReportBlocks []RTCP_report_block `json:"report_blocks"` +func (rp *RTCP_Packet) AddReportBlock(rb RTCP_report_block) []RTCP_report_block { + rp.ReportBlocks = append(rp.ReportBlocks, rb) + return rp.ReportBlocks } -type RTCP_RR struct { - ReportBlocks []RTCP_report_block `json:"report_blocks"` +func (rp *RTCP_Packet) MarshalJSON() ([]byte, error) { + bytes, err := json.Marshal(*rp) + return bytes, err } -type RTCP_SDES struct { -} +func ParseRTCP(data []byte) ([]byte, error) { + if len(data) < 28 { + return nil, errors.New("No usefull data inside RTCP packet") + } + pkt := &RTCP_Packet{} + var offset int -type RTCP_BYE struct { -} + for pktLen := len(data); pktLen > 0; { + if pktLen%4 != 0 || pktLen < 4 { + return nil, errors.New("Fishy RTCP packet") + } -type RTCP_APP struct { -} + //version := (data[offset] & 0xc0) >> 6 + //padding := (data[offset] & 0x20) >> 5 + receptionReportCount := data[offset] & 0x1f + packetType := data[offset+1] + //packetLength := binary.BigEndian.Uint16(data[offset+2:]) -type RTCP_Packet struct { - Header RTCP_header `json:"header"` - Sender_information interface{} `json:"sender_information"` -} + offset = offset + 4 -//read data by bit -type BitReader struct { - buf []byte - curBit int -} + if receptionReportCount < 0 || pktLen < 28 { + return nil, errors.New("Negative reception report or to small RTCP packet") + } -func (br *BitReader) Init(data []byte) { - br.curBit = 0 - br.buf = make([]byte, len(data)) - copy(br.buf, data) -} + switch packetType { + case TYPE_RTCP_SR: -func (br *BitReader) ReadBit() int { - if br.curBit > (len(br.buf) << 3) { - return -1 - } - idx := (br.curBit >> 3) - offset := br.curBit%8 + 1 - br.curBit++ - return int(br.buf[idx]>>uint(8-offset)) & 0x01 -} + pkt.SenderInformation.Ssrc = binary.BigEndian.Uint32(data[offset:]) + pkt.SenderInformation.Ntp_timestamp_MSW = binary.BigEndian.Uint32(data[offset+4:]) + pkt.SenderInformation.Ntp_timestamp_LSW = binary.BigEndian.Uint32(data[offset+8:]) + pkt.SenderInformation.Rtp_timestamp = binary.BigEndian.Uint32(data[offset+12:]) + pkt.SenderInformation.Pkt_count = binary.BigEndian.Uint32(data[offset+16:]) + pkt.SenderInformation.Octet_count = binary.BigEndian.Uint32(data[offset+20:]) + offset = offset + 24 -func (br *BitReader) ReadBits(num int) int { - r := 0 - for i := 0; i < num; i++ { - r |= (br.ReadBit() << uint(num-i-1)) - } - return r -} + if receptionReportCount > 0 { + tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) + for i := 0; i < int(receptionReportCount); i++ { + tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].Fraction_lost = data[offset+4] + var cumBuf [4]byte + copy(cumBuf[1:], data[offset+5:offset+8]) + tmpReportBlocks[i].Cumulative_lost = binary.BigEndian.Uint32(cumBuf[:]) + tmpReportBlocks[i].Highest_seq_no = binary.BigEndian.Uint32(data[offset+8:]) + tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:]) + tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) + tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) + tmpReportBlocks[i].ReportCount = receptionReportCount + tmpReportBlocks[i].PacketType = packetType + offset = offset + 24 + pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) + } + } -func (br *BitReader) Read32Bits() uint32 { - idx := (br.curBit >> 3) - var r uint32 - binary.Read(bytes.NewReader(br.buf[idx:]), binary.BigEndian, &r) - br.curBit += 32 - return r -} + case TYPE_RTCP_RR: + if receptionReportCount > 0 { + tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) + for i := 0; i < int(receptionReportCount); i++ { + tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].Fraction_lost = data[offset+4] + var cumBuf [4]byte + copy(cumBuf[1:], data[offset+5:offset+8]) + tmpReportBlocks[i].Cumulative_lost = binary.BigEndian.Uint32(cumBuf[:]) + tmpReportBlocks[i].Highest_seq_no = binary.BigEndian.Uint32(data[offset+8:]) + tmpReportBlocks[i].Jitter = binary.BigEndian.Uint32(data[offset+12:]) + tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) + tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) + tmpReportBlocks[i].ReportCount = receptionReportCount + tmpReportBlocks[i].PacketType = packetType + offset = offset + 24 + pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) + } + } -func ParseRTCP(data []byte) ([]byte, error) { - pkt := &RTCP_Packet{} - if len(data) < 4 { - return nil, errors.New("Not enough packets inside RTCP header") - } + case TYPE_RTCP_SDES: + logp.Info("Discard RTCP_SDES packet type: %d", packetType) + case TYPE_RTCP_APP: + logp.Info("Discard RTCP_APP packet type: %d", packetType) + case TYPE_RTCP_BYE: + logp.Info("Discard RTCP_BYE packet type: %d", packetType) + case TYPE_RTCP_XR: + logp.Info("Discard RTCP_XR packet type: %d", packetType) + default: + logp.Info("Discard unsupported packet type: %d", packetType) + } - pkt.Header.Version = (data[0] & 0xc0) >> 6 - pkt.Header.Padding = (data[0] & 0x20) >> 5 - pkt.Header.ReceptionReportCount = data[0] & 0x1f - pkt.Header.PacketType = data[1] - pkt.Header.Length = binary.BigEndian.Uint16(data[2:]) + pktLen -= offset - if pkt.Header.ReceptionReportCount < 0 { - return nil, errors.New("Negative reception report count") } - reader := &BitReader{} - reader.Init(data[4:]) - switch pkt.Header.PacketType { - case TYPE_RTCP_SR: - sr := &RTCP_SR{} - pkt.Sender_information = sr - sr.Ssrc = uint32(reader.Read32Bits()) - sr.Ntp_timestamp_MSW = uint32(reader.Read32Bits()) - sr.Ntp_timestamp_LSW = uint32(reader.Read32Bits()) - sr.Rtp_timestamp = uint32(reader.Read32Bits()) - sr.Pkt_count = uint32(reader.Read32Bits()) - sr.Octet_count = uint32(reader.Read32Bits()) - - if pkt.Header.ReceptionReportCount > 0 { - sr.ReportBlocks = make([]RTCP_report_block, pkt.Header.ReceptionReportCount) - for i := 0; i < int(pkt.Header.ReceptionReportCount); i++ { - sr.ReportBlocks[i].Ssrc = uint32(reader.Read32Bits()) - sr.ReportBlocks[i].Fraction_lost = byte(reader.ReadBits(8)) - sr.ReportBlocks[i].Cumulative_lost = uint32(reader.ReadBits(24)) - sr.ReportBlocks[i].Highest_seq_no = uint32(reader.Read32Bits()) - sr.ReportBlocks[i].Jitter = uint32(reader.Read32Bits()) - sr.ReportBlocks[i].LastSR = uint32(reader.Read32Bits()) - sr.ReportBlocks[i].Delay_last_SR = uint32(reader.Read32Bits()) - } - } - case TYPE_RTCP_RR: - rr := &RTCP_RR{} - if pkt.Header.ReceptionReportCount > 0 { - rr.ReportBlocks = make([]RTCP_report_block, pkt.Header.ReceptionReportCount) - for i := 0; i < int(pkt.Header.ReceptionReportCount); i++ { - rr.ReportBlocks[i].Ssrc = uint32(reader.Read32Bits()) - rr.ReportBlocks[i].Fraction_lost = byte(reader.ReadBits(8)) - rr.ReportBlocks[i].Cumulative_lost = uint32(reader.ReadBits(24)) - rr.ReportBlocks[i].Highest_seq_no = uint32(reader.Read32Bits()) - rr.ReportBlocks[i].Jitter = uint32(reader.Read32Bits()) - rr.ReportBlocks[i].LastSR = uint32(reader.Read32Bits()) - rr.ReportBlocks[i].Delay_last_SR = uint32(reader.Read32Bits()) - } - } - case TYPE_RTCP_SDES: - logp.Info("Discard RTCP_SDES packet type: %d", pkt.Header.PacketType) - case TYPE_RTCP_APP: - logp.Info("Discard RTCP_APP packet type: %d", pkt.Header.PacketType) - case TYPE_RTCP_BYE: - logp.Info("Discard RTCP_BYE packet type: %d", pkt.Header.PacketType) - default: - logp.Info("Discard unsupported packet type: %d", pkt.Header.PacketType) - } - rtcpPkt, err := json.Marshal(pkt) + rtcpPkt, err := pkt.MarshalJSON() if err != nil { - fmt.Println("json error!") - logp.Err("RTCP marshal", err) return nil, err } - fmt.Println(string(rtcpPkt)) - return nil, nil + logp.Debug("rtcp", "RTCP json payload:\n%v\n", string(rtcpPkt)) + return rtcpPkt, nil } diff --git a/sniffer/sniffer.go b/sniffer/sniffer.go index 2246e7b..42ff714 100644 --- a/sniffer/sniffer.go +++ b/sniffer/sniffer.go @@ -89,7 +89,7 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { sniffer.config.Snaplen = 65535 } - if sniffer.config.Type != "file" && sniffer.config.Type != "af_packet" { + if sniffer.config.Type != "af_packet" { sniffer.config.Type = "pcap" } @@ -111,29 +111,24 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (vlan and (greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0))" } - logp.Info("sniffer", "Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) + logp.Info("Sniffer type: [%s] device: [%s] mode: [%s]", sniffer.config.Type, sniffer.config.Device, sniffer.mode) switch sniffer.config.Type { - case "file": - sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) - if err != nil { - return fmt.Errorf("couldn't open file %v %v", sniffer.config.ReadFile, err) - } - err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) - if err != nil { - return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) - } - - sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle) - case "pcap": - sniffer.pcapHandle, err = pcap.OpenLive(sniffer.config.Device, int32(sniffer.config.Snaplen), true, pcap.BlockForever) - if err != nil { - return fmt.Errorf("setting pcap live mode: %v", err) - } - err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) - if err != nil { - return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) + if sniffer.config.ReadFile != "" { + sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) + if err != nil { + return fmt.Errorf("couldn't open file %v! %v", sniffer.config.ReadFile, err) + } + } else { + sniffer.pcapHandle, err = pcap.OpenLive(sniffer.config.Device, int32(sniffer.config.Snaplen), true, pcap.BlockForever) + if err != nil { + return fmt.Errorf("setting pcap live mode: %v", err) + } + err = sniffer.pcapHandle.SetBPFFilter(sniffer.filter) + if err != nil { + return fmt.Errorf("SetBPFFilter '%s' for pcap: %v", sniffer.filter, err) + } } sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle) @@ -298,7 +293,7 @@ func (sniffer *SnifferSetup) Run() error { } else if sniffer.config.WriteFile != "" { err := sniffer.dumper.WritePacket(ci, data) if err != nil { - return fmt.Errorf("couldn't write to file %v %v", sniffer.config.WriteFile, err) + return fmt.Errorf("couldn't write to file %v! %v", sniffer.config.WriteFile, err) } } @@ -310,8 +305,6 @@ func (sniffer *SnifferSetup) Run() error { func (sniffer *SnifferSetup) Close() error { switch sniffer.config.Type { - case "file": - sniffer.pcapHandle.Close() case "pcap": sniffer.pcapHandle.Close() case "af_packet": @@ -323,8 +316,8 @@ func (sniffer *SnifferSetup) Close() error { func (sniffer *SnifferSetup) Reopen() error { var err error - if sniffer.config.Type != "file" { - return fmt.Errorf("Reopen is only possible for files") + if sniffer.config.Type != "pcap" || sniffer.config.ReadFile == "" { + return fmt.Errorf("Reopen is only possible for files and in pcap mode") } sniffer.pcapHandle.Close() @@ -356,18 +349,14 @@ func (sniffer *SnifferSetup) IsAlive() bool { func (sniffer *SnifferSetup) printStats() { var err error + if sniffer.config.ReadFile != "" { + logp.Info("Read in pcap file. Stats won't be generated.") + return + } for { <-time.After(1 * time.Minute) go func() { switch sniffer.config.Type { - case "file": - sniffer.pcapStats, err = sniffer.pcapHandle.Stats() - if err != nil { - logp.Warn("Stats err: %v", err) - } - logp.Info("Packets overall received: %d, dropped by OS: %d, dropped by interface: %d", - sniffer.pcapStats.PacketsReceived, sniffer.pcapStats.PacketsDropped, sniffer.pcapStats.PacketsIfDropped) - case "pcap": sniffer.pcapStats, err = sniffer.pcapHandle.Stats() if err != nil { From e25f909dbc00edb26a6b9355c2febf00f22d806f Mon Sep 17 00:00:00 2001 From: Eugen Biegler Date: Tue, 7 Nov 2017 00:59:41 +0100 Subject: [PATCH 4/6] Initial SIP<->RTCP correlation --- decoder/decoder.go | 132 ++++++++++++++++++++++++++++++++++++++++----- protos/rtcp.go | 18 ++++--- 2 files changed, 129 insertions(+), 21 deletions(-) diff --git a/decoder/decoder.go b/decoder/decoder.go index 0287546..db3514a 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -1,9 +1,15 @@ package decoder import ( + "bytes" + "errors" + "fmt" "hash" "os" + "strconv" + "time" + "github.com/allegro/bigcache" "github.com/cespare/xxhash" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -24,7 +30,10 @@ type Decoder struct { tcpCount int dnsCount int unknownCount int + IPFlow gopacket.Flow + UDPFlow gopacket.Flow lru *lru.ARCCache + bigcache *bigcache.BigCache hash hash.Hash64 } @@ -43,16 +52,44 @@ type Packet struct { } func NewDecoder() *Decoder { - host, err := os.Hostname() if err != nil { host = "sniffer" } - l, err := lru.NewARC(8192) + + la, err := lru.NewARC(8192) if err != nil { logp.Err("lru %v", err) } - h := xxhash.New() + + xh := xxhash.New() + + bConf := bigcache.Config{ + // number of shards (must be a power of 2) + Shards: 1024, + // time after which entry can be evicted + LifeWindow: 10 * time.Minute, + // rps * lifeWindow, used only in initial memory allocation + MaxEntriesInWindow: 1000 * 180 * 60, + // max entry size in bytes, used only in initial memory allocation + MaxEntrySize: 384, + // prints information about additional memory allocation + Verbose: true, + // cache will not allocate more memory than this limit, value in MB + // if value is reached then the oldest entries can be overridden for the new ones + // 0 value means no size limit + HardMaxCacheSize: 1024, + // callback fired when the oldest entry is removed because of its + // expiration time or no space left for the new entry. Default value is nil which + // means no callback and it prevents from unwrapping the oldest entry. + OnRemove: nil, + } + + bc, err := bigcache.NewBigCache(bConf) + if err != nil { + logp.Err("bigcache %v", err) + } + d := &Decoder{ Host: host, defragger: ip4defrag.NewIPv4Defragmenter(), @@ -63,8 +100,9 @@ func NewDecoder() *Decoder { tcpCount: 0, dnsCount: 0, unknownCount: 0, - lru: l, - hash: h, + lru: la, + hash: xh, + bigcache: bc, } go d.flushFrag() go d.printStats() @@ -101,7 +139,9 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error } } + d.IPFlow = ip4.NetworkFlow() d.ip4Count++ + pkt.Version = ip4.Version pkt.Protocol = uint8(ip4.Protocol) pkt.Srcip = ip2int(ip4.SrcIP) @@ -133,10 +173,6 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error nextDecoder := ip4New.NextLayerType() nextDecoder.Decode(ip4New.Payload, pb) } - // TODO: generate a more meaningful CorrelationID - if config.Cfg.Mode == "DNS" || config.Cfg.Mode == "LOG" || config.Cfg.Mode == "TLS" { - pkt.CorrelationID = []byte(config.Cfg.Mode) - } } if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil { @@ -144,15 +180,19 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error if !ok { return nil, nil } + + d.UDPFlow = udp.TransportFlow() d.udpCount++ + pkt.Sport = uint16(udp.SrcPort) pkt.Dport = uint16(udp.DstPort) pkt.Payload = udp.Payload - if (udp.Payload[0]&0xc0)>>6 == 2 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) { - pkt.Payload, err = protos.ParseRTCP(udp.Payload) - if err != nil { - logp.Warn(err) - return nil, nil + + if config.Cfg.Mode == "SIPRTCP" { + d.cacheSDPIPPort(udp.Payload) + if (udp.Payload[0]&0xc0)>>6 == 2 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) { + pkt.Payload, pkt.CorrelationID = d.correlateRTCP(udp.Payload) + } } } else if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { @@ -164,6 +204,10 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(tcp.SrcPort) pkt.Dport = uint16(tcp.DstPort) pkt.Payload = tcp.Payload + + if config.Cfg.Mode == "SIPRTCP" { + d.cacheSDPIPPort(tcp.Payload) + } } if dnsLayer := packet.Layer(layers.LayerTypeDNS); dnsLayer != nil { @@ -192,3 +236,63 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error d.unknownCount++ return nil, nil } + +func (d *Decoder) cacheSDPIPPort(payload []byte) error { + var SDPIP, RTCPPort string + var callID []byte + + if posSDPIP, posSDPPort := bytes.Index(payload, []byte("c=IN IP4 ")), bytes.Index(payload, []byte("m=audio ")); posSDPIP >= 0 && posSDPPort >= 0 { + restIP := payload[posSDPIP:] + if posRestIP := bytes.Index(restIP, []byte("\r\n")); posRestIP >= 0 { + SDPIP = string(restIP[len("c=IN IP4 "):bytes.Index(restIP, []byte("\r\n"))]) + } else { + return errors.New("Couldn't find end of SDP IP") + } + + restPort := payload[posSDPPort:] + if posRestPort := bytes.Index(restIP, []byte(" RTP")); posRestPort >= 0 { + SDPPort, err := strconv.Atoi(string(restPort[len("m=audio "):bytes.Index(restPort, []byte(" RTP"))])) + if err != nil { + return err + } + RTCPPort = strconv.Itoa(SDPPort + 1) + } else { + return errors.New("Couldn't find end of SDP Port") + } + + if posCallID := bytes.Index(payload, []byte("Call-ID: ")); posCallID >= 0 { + restCallID := payload[posCallID:] + if posRestCallID := bytes.Index(restIP, []byte("\r\n")); posRestCallID >= 0 { + callID = restCallID[len("Call-ID: "):bytes.Index(restCallID, []byte("\r\n"))] + } else { + return errors.New("Couldn't find end of Call-ID") + } + } else if posID := bytes.Index(payload, []byte("i: ")); posID >= 0 { + restID := payload[posID:] + if posRestID := bytes.Index(restIP, []byte("\r\n")); posRestID >= 0 { + callID = restID[len("i: "):bytes.Index(restID, []byte("\r\n"))] + } else { + return errors.New("Couldn't find end of Call-ID") + } + } + d.bigcache.Set(SDPIP+RTCPPort, callID) + } + return nil +} + +func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte) { + jsonRTCP, err := protos.ParseRTCP(payload) + if err != nil { + logp.Warn("%v", err) + return nil, nil + } + + corrID, err := d.bigcache.Get(d.IPFlow.Src().String() + d.UDPFlow.Src().String()) + if err != nil { + logp.Warn("%v", err) + return nil, nil + } + + fmt.Println(string(jsonRTCP)) + return jsonRTCP, corrID +} diff --git a/protos/rtcp.go b/protos/rtcp.go index 5d558d0..891e4b1 100644 --- a/protos/rtcp.go +++ b/protos/rtcp.go @@ -190,7 +190,7 @@ func ParseRTCP(data []byte) ([]byte, error) { //padding := (data[offset] & 0x20) >> 5 receptionReportCount := data[offset] & 0x1f packetType := data[offset+1] - //packetLength := binary.BigEndian.Uint16(data[offset+2:]) + packetLength := binary.BigEndian.Uint16(data[offset+2:]) offset = offset + 4 @@ -249,15 +249,20 @@ func ParseRTCP(data []byte) ([]byte, error) { } case TYPE_RTCP_SDES: - logp.Info("Discard RTCP_SDES packet type: %d", packetType) + logp.Debug("rtcp", "Discard RTCP_SDES packet type: %d", packetType) + pktLen -= int(packetLength) * 4 case TYPE_RTCP_APP: - logp.Info("Discard RTCP_APP packet type: %d", packetType) + logp.Debug("rtcp", "Discard RTCP_APP packet type: %d", packetType) + pktLen -= int(packetLength) * 4 case TYPE_RTCP_BYE: - logp.Info("Discard RTCP_BYE packet type: %d", packetType) + logp.Debug("rtcp", "Discard RTCP_BYE packet type: %d", packetType) + pktLen -= int(packetLength) * 4 case TYPE_RTCP_XR: - logp.Info("Discard RTCP_XR packet type: %d", packetType) + logp.Debug("rtcp", "Discard RTCP_XR packet type: %d", packetType) + pktLen -= int(packetLength) * 4 default: - logp.Info("Discard unsupported packet type: %d", packetType) + logp.Debug("rtcp", "Discard unsupported packet type: %d", packetType) + pktLen -= int(packetLength) * 4 } pktLen -= offset @@ -265,7 +270,6 @@ func ParseRTCP(data []byte) ([]byte, error) { } rtcpPkt, err := pkt.MarshalJSON() - if err != nil { return nil, err } From 311de5771a86f82715f9b3ebc24ec40c58c7438b Mon Sep 17 00:00:00 2001 From: negbie Date: Tue, 7 Nov 2017 18:29:47 +0100 Subject: [PATCH 5/6] Better RTCP error handling --- decoder/decoder.go | 16 +++++++------ protos/rtcp.go | 57 ++++++++++++++++++++++++++-------------------- publish/hep.go | 13 +++++++---- sniffer/sniffer.go | 2 +- 4 files changed, 51 insertions(+), 37 deletions(-) diff --git a/decoder/decoder.go b/decoder/decoder.go index db3514a..a66a80f 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -3,7 +3,6 @@ package decoder import ( "bytes" "errors" - "fmt" "hash" "os" "strconv" @@ -49,6 +48,7 @@ type Packet struct { Dport uint16 CorrelationID []byte Payload []byte + Type byte } func NewDecoder() *Decoder { @@ -187,11 +187,12 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(udp.SrcPort) pkt.Dport = uint16(udp.DstPort) pkt.Payload = udp.Payload + pkt.Type = 1 if config.Cfg.Mode == "SIPRTCP" { d.cacheSDPIPPort(udp.Payload) if (udp.Payload[0]&0xc0)>>6 == 2 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) { - pkt.Payload, pkt.CorrelationID = d.correlateRTCP(udp.Payload) + pkt.Payload, pkt.CorrelationID, pkt.Type = d.correlateRTCP(udp.Payload) } } @@ -204,6 +205,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error pkt.Sport = uint16(tcp.SrcPort) pkt.Dport = uint16(tcp.DstPort) pkt.Payload = tcp.Payload + pkt.Type = 1 if config.Cfg.Mode == "SIPRTCP" { d.cacheSDPIPPort(tcp.Payload) @@ -280,19 +282,19 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) error { return nil } -func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte) { +func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) { jsonRTCP, err := protos.ParseRTCP(payload) if err != nil { logp.Warn("%v", err) - return nil, nil + return nil, nil, 0 } corrID, err := d.bigcache.Get(d.IPFlow.Src().String() + d.UDPFlow.Src().String()) if err != nil { logp.Warn("%v", err) - return nil, nil + return nil, nil, 0 } - fmt.Println(string(jsonRTCP)) - return jsonRTCP, corrID + //fmt.Println(string(jsonRTCP)) + return jsonRTCP, corrID, 5 } diff --git a/protos/rtcp.go b/protos/rtcp.go index 891e4b1..bc22541 100644 --- a/protos/rtcp.go +++ b/protos/rtcp.go @@ -3,7 +3,7 @@ package protos import ( "encoding/binary" "encoding/json" - "errors" + "fmt" "github.com/negbie/heplify/logp" ) @@ -136,7 +136,7 @@ type RTCP_header struct { Version uint8 `json:"version"` // 2 bit Padding uint8 `json:"padding"` // 1 bit ReceptionReportCount uint8 `json:"report_count"` // 5 bit - PacketType uint8 `json:"type"` // 8 bit + RTCPType uint8 `json:"type"` // 8 bit Length uint16 `json:"length"` // 16 bit } @@ -161,7 +161,7 @@ type RTCP_report_block struct { LastSR uint32 `json:"lsr"` // 32 bit Delay_last_SR uint32 `json:"dlsr"` // 32 bit ReportCount uint8 `json:"report_count"` // 8 bit - PacketType uint8 `json:"type"` // 8 bit + RTCPType uint8 `json:"type"` // 8 bit } func (rp *RTCP_Packet) AddReportBlock(rb RTCP_report_block) []RTCP_report_block { @@ -176,30 +176,33 @@ func (rp *RTCP_Packet) MarshalJSON() ([]byte, error) { func ParseRTCP(data []byte) ([]byte, error) { if len(data) < 28 { - return nil, errors.New("No usefull data inside RTCP packet") + return nil, fmt.Errorf("No usefull data inside RTCP packet='%s' length=%d", string(data), len(data)) } pkt := &RTCP_Packet{} - var offset int + offset := 0 for pktLen := len(data); pktLen > 0; { if pktLen%4 != 0 || pktLen < 4 { - return nil, errors.New("Fishy RTCP packet") + return nil, fmt.Errorf("Fishy RTCP packet=%v length=%d", data, pktLen) } //version := (data[offset] & 0xc0) >> 6 //padding := (data[offset] & 0x20) >> 5 receptionReportCount := data[offset] & 0x1f - packetType := data[offset+1] - packetLength := binary.BigEndian.Uint16(data[offset+2:]) + RTCPType := data[offset+1] + RTCPLength := binary.BigEndian.Uint16(data[offset+2:]) * 4 offset = offset + 4 - if receptionReportCount < 0 || pktLen < 28 { - return nil, errors.New("Negative reception report or to small RTCP packet") + if receptionReportCount < 0 || receptionReportCount > 4 { + return nil, fmt.Errorf("Fishy RTCP receptionReportCount=%d", receptionReportCount) } - switch packetType { + switch RTCPType { case TYPE_RTCP_SR: + if RTCPLength < 24 { + return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) + } pkt.SenderInformation.Ssrc = binary.BigEndian.Uint32(data[offset:]) pkt.SenderInformation.Ntp_timestamp_MSW = binary.BigEndian.Uint32(data[offset+4:]) @@ -209,7 +212,7 @@ func ParseRTCP(data []byte) ([]byte, error) { pkt.SenderInformation.Octet_count = binary.BigEndian.Uint32(data[offset+20:]) offset = offset + 24 - if receptionReportCount > 0 { + if receptionReportCount > 0 && RTCPLength >= 24 { tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) for i := 0; i < int(receptionReportCount); i++ { tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) @@ -222,14 +225,18 @@ func ParseRTCP(data []byte) ([]byte, error) { tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) tmpReportBlocks[i].ReportCount = receptionReportCount - tmpReportBlocks[i].PacketType = packetType + tmpReportBlocks[i].RTCPType = RTCPType offset = offset + 24 pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) } } case TYPE_RTCP_RR: - if receptionReportCount > 0 { + if RTCPLength < 24 { + return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) + } + + if receptionReportCount > 0 && RTCPLength >= 24 { tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) for i := 0; i < int(receptionReportCount); i++ { tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) @@ -242,27 +249,27 @@ func ParseRTCP(data []byte) ([]byte, error) { tmpReportBlocks[i].LastSR = binary.BigEndian.Uint32(data[offset+16:]) tmpReportBlocks[i].Delay_last_SR = binary.BigEndian.Uint32(data[offset+20:]) tmpReportBlocks[i].ReportCount = receptionReportCount - tmpReportBlocks[i].PacketType = packetType + tmpReportBlocks[i].RTCPType = RTCPType offset = offset + 24 pkt.ReportBlocks = pkt.AddReportBlock(tmpReportBlocks[i]) } } case TYPE_RTCP_SDES: - logp.Debug("rtcp", "Discard RTCP_SDES packet type: %d", packetType) - pktLen -= int(packetLength) * 4 + logp.Debug("rtcp", "Discard RTCP_SDES packet type: %d", RTCPType) + offset = offset + int(RTCPLength) case TYPE_RTCP_APP: - logp.Debug("rtcp", "Discard RTCP_APP packet type: %d", packetType) - pktLen -= int(packetLength) * 4 + logp.Debug("rtcp", "Discard RTCP_APP packet type: %d", RTCPType) + offset = offset + int(RTCPLength) case TYPE_RTCP_BYE: - logp.Debug("rtcp", "Discard RTCP_BYE packet type: %d", packetType) - pktLen -= int(packetLength) * 4 + logp.Debug("rtcp", "Discard RTCP_BYE packet type: %d", RTCPType) + offset = offset + int(RTCPLength) case TYPE_RTCP_XR: - logp.Debug("rtcp", "Discard RTCP_XR packet type: %d", packetType) - pktLen -= int(packetLength) * 4 + logp.Debug("rtcp", "Discard RTCP_XR packet type: %d", RTCPType) + offset = offset + int(RTCPLength) default: - logp.Debug("rtcp", "Discard unsupported packet type: %d", packetType) - pktLen -= int(packetLength) * 4 + logp.Debug("rtcp", "Discard unsupported packet type: %d", RTCPType) + offset = offset + int(RTCPLength) } pktLen -= offset diff --git a/publish/hep.go b/publish/hep.go index 2560b5b..ce2d1f1 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "net" - "github.com/negbie/heplify/config" "github.com/negbie/heplify/decoder" "github.com/negbie/heplify/logp" ) @@ -180,10 +179,16 @@ func makeChunck(chunckVen uint16, chunckType uint16, h *decoder.Packet) []byte { // Chunk protocol type (SIP/H323/RTP/MGCP/M2UA) case 0x000b: chunck = make([]byte, 6+1) - if config.Cfg.Mode == "SIP" { + switch h.Type { + case 1: chunck[6] = 1 // SIP - } else { + case 5: + chunck[6] = 5 // RTCP + case 100: chunck[6] = 100 // LOG + default: + chunck[6] = 66 // Unknown + } // Chunk capture agent ID @@ -240,7 +245,7 @@ func newHEPChuncks(h *decoder.Packet) []byte { buf.Write(makeChunck(0x0000, 0x000c, h)) buf.Write(makeChunck(0x0000, 0x000e, h)) buf.Write(makeChunck(0x0000, 0x000f, h)) - if config.Cfg.Mode != "SIP" { + if h.CorrelationID != nil { buf.Write(makeChunck(0x0000, 0x0011, h)) } return buf.Bytes() diff --git a/sniffer/sniffer.go b/sniffer/sniffer.go index 42ff714..fc6e4c8 100644 --- a/sniffer/sniffer.go +++ b/sniffer/sniffer.go @@ -99,7 +99,7 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error { case "RTCP": sniffer.filter = "(ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)" case "SIPRTCP": - sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc))" + sniffer.filter = "(greater 256 and portrange 5060-5090 or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)" case "LOG": sniffer.filter = "greater 128 and port 514" case "DNS": From 24cece9be70153f1dea597233adcfac084c3b57b Mon Sep 17 00:00:00 2001 From: Eugen Biegler Date: Tue, 7 Nov 2017 23:45:20 +0100 Subject: [PATCH 6/6] Read sender ssrc for RTCP RR, set ReadFile timeout --- config/config.go | 2 +- decoder/decoder.go | 26 +++-- main.go | 4 +- protos/rtcp.go | 240 +++++++++++++++++++++++---------------------- sniffer/sniffer.go | 11 ++- 5 files changed, 143 insertions(+), 140 deletions(-) diff --git a/config/config.go b/config/config.go index 6f7ef3c..4eff8f3 100644 --- a/config/config.go +++ b/config/config.go @@ -23,7 +23,7 @@ type InterfacesConfig struct { WriteFile string `config:"write_file"` Snaplen int `config:"snaplen"` BufferSizeMb int `config:"buffer_size_mb"` - TopSpeed bool `config:"top_speed"` + ReadSpeed bool `config:"top_speed"` OneAtATime bool `config:"one_at_a_time"` Loop int `config:"loop"` } diff --git a/decoder/decoder.go b/decoder/decoder.go index a66a80f..3b44682 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -2,7 +2,6 @@ package decoder import ( "bytes" - "errors" "hash" "os" "strconv" @@ -31,7 +30,7 @@ type Decoder struct { unknownCount int IPFlow gopacket.Flow UDPFlow gopacket.Flow - lru *lru.ARCCache + lru *lru.Cache bigcache *bigcache.BigCache hash hash.Hash64 } @@ -57,7 +56,7 @@ func NewDecoder() *Decoder { host = "sniffer" } - la, err := lru.NewARC(8192) + la, err := lru.New(8000) if err != nil { logp.Err("lru %v", err) } @@ -68,17 +67,17 @@ func NewDecoder() *Decoder { // number of shards (must be a power of 2) Shards: 1024, // time after which entry can be evicted - LifeWindow: 10 * time.Minute, + LifeWindow: 180 * time.Minute, // rps * lifeWindow, used only in initial memory allocation MaxEntriesInWindow: 1000 * 180 * 60, // max entry size in bytes, used only in initial memory allocation - MaxEntrySize: 384, + MaxEntrySize: 300, // prints information about additional memory allocation Verbose: true, // cache will not allocate more memory than this limit, value in MB // if value is reached then the oldest entries can be overridden for the new ones // 0 value means no size limit - HardMaxCacheSize: 1024, + HardMaxCacheSize: 512, // callback fired when the oldest entry is removed because of its // expiration time or no space left for the new entry. Default value is nil which // means no callback and it prevents from unwrapping the oldest entry. @@ -239,7 +238,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error return nil, nil } -func (d *Decoder) cacheSDPIPPort(payload []byte) error { +func (d *Decoder) cacheSDPIPPort(payload []byte) { var SDPIP, RTCPPort string var callID []byte @@ -248,18 +247,18 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) error { if posRestIP := bytes.Index(restIP, []byte("\r\n")); posRestIP >= 0 { SDPIP = string(restIP[len("c=IN IP4 "):bytes.Index(restIP, []byte("\r\n"))]) } else { - return errors.New("Couldn't find end of SDP IP") + logp.Warn("Couldn't find end of SDP IP in '%s'", string(restIP)) } restPort := payload[posSDPPort:] if posRestPort := bytes.Index(restIP, []byte(" RTP")); posRestPort >= 0 { SDPPort, err := strconv.Atoi(string(restPort[len("m=audio "):bytes.Index(restPort, []byte(" RTP"))])) if err != nil { - return err + logp.Warn("%v", err) } RTCPPort = strconv.Itoa(SDPPort + 1) } else { - return errors.New("Couldn't find end of SDP Port") + logp.Warn("Couldn't find end of SDP Port in '%s'", string(restPort)) } if posCallID := bytes.Index(payload, []byte("Call-ID: ")); posCallID >= 0 { @@ -267,19 +266,18 @@ func (d *Decoder) cacheSDPIPPort(payload []byte) error { if posRestCallID := bytes.Index(restIP, []byte("\r\n")); posRestCallID >= 0 { callID = restCallID[len("Call-ID: "):bytes.Index(restCallID, []byte("\r\n"))] } else { - return errors.New("Couldn't find end of Call-ID") + logp.Warn("Couldn't find end of Call-ID in '%s'", string(restCallID)) } } else if posID := bytes.Index(payload, []byte("i: ")); posID >= 0 { restID := payload[posID:] if posRestID := bytes.Index(restIP, []byte("\r\n")); posRestID >= 0 { callID = restID[len("i: "):bytes.Index(restID, []byte("\r\n"))] } else { - return errors.New("Couldn't find end of Call-ID") + logp.Warn("Couldn't find end of Call-ID in '%s'", string(restID)) } } d.bigcache.Set(SDPIP+RTCPPort, callID) } - return nil } func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) { @@ -295,6 +293,6 @@ func (d *Decoder) correlateRTCP(payload []byte) ([]byte, []byte, byte) { return nil, nil, 0 } - //fmt.Println(string(jsonRTCP)) + logp.Debug("decoder", "RTCP JSON payload: %s", string(jsonRTCP)) return jsonRTCP, corrID, 5 } diff --git a/main.go b/main.go index e0a51b9..e64f3bb 100644 --- a/main.go +++ b/main.go @@ -28,8 +28,8 @@ func parseFlags() { flag.StringVar(&ifaceConfig.Type, "t", "pcap", "Capture types are [af_packet, pcap, file]") flag.StringVar(&ifaceConfig.ReadFile, "rf", "", "Read packets from file. Please use -t file") flag.StringVar(&ifaceConfig.WriteFile, "wf", "", "Write packets to file") - flag.IntVar(&ifaceConfig.Loop, "lp", 0, "Loop count over ReadFile") - flag.BoolVar(&ifaceConfig.TopSpeed, "ts", false, "Topspeed uses timestamps from packets") + flag.IntVar(&ifaceConfig.Loop, "lp", 1, "Loop count over ReadFile") + flag.BoolVar(&ifaceConfig.ReadSpeed, "rs", false, "Maximum read speed. Doesn't use packet timestamps") flag.IntVar(&ifaceConfig.Snaplen, "s", 32768, "Snap length") flag.IntVar(&ifaceConfig.BufferSizeMb, "b", 64, "Interface buffersize (MB)") flag.IntVar(&keepLogFiles, "kl", 4, "Rotate the number of log files") diff --git a/protos/rtcp.go b/protos/rtcp.go index bc22541..f41f0a4 100644 --- a/protos/rtcp.go +++ b/protos/rtcp.go @@ -9,118 +9,118 @@ import ( ) /* RTCP header - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -header |V=2|P| RC | PT(200-204) | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT(200-204) | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ SR: - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - header |V=2|P| RC | PT=SR=200 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of sender | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - sender | NTP timestamp,most significant word | - info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NTP timestamp,least significant word | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | RTP timestamp | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | sender's packet count | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | sender's octet count | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - report | SSRC_1 (SSRC of first source) | - block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 1 | fraction lost | cumulative number of packets lost | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | extended highest sequence number received | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | interarrival jitter | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | last SR (LSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | delay since last SR (DLSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - report | SSRC_2 (SSRC of second source) | - block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 2 : ... : - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | profile-specific extensions | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT=SR=200 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC of sender | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| NTP timestamp,most significant word | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| NTP timestamp,least significant word | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| RTP timestamp | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| sender's packet count | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| sender's octet count | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_1 (SSRC of first source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| fraction lost | cumulative number of packets lost | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| extended highest sequence number received | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| interarrival jitter | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| last SR (LSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| delay since last SR (DLSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_2 (SSRC of second source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| profile-specific extensions | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -RR: - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - header |V=2|P| RC | PT=PR=201 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC of sender | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - report | SSRC_1 (SSRC of first source) | - block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 1 | fraction lost | cumulative number of packets lost | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | extended highest sequence number received | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | interarrival jitter | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | last SR (LSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | delay since last SR (DLSR) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - report | SSRC_2 (SSRC of second source) | - block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 2 : ... : - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | profile-specific extensions | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +RR: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| RC | PT=PR=201 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC of sender | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC_1 (SSRC of first source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| fraction lost | cumulative number of packets lost | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| extended highest sequence number received | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| interarrival jitter | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| last SR (LSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| delay since last SR (DLSR) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC_2 (SSRC of second source) | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +: ... : ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| profile-specific extensions | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ SDES: - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - header |V=2|P| SC | PT=SR=202 | length | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | SSRC/CSRC_1 | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SDES items | - | ... | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | SSRC/CSRC_2 | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SDES items | - | ... | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=202 | length | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC_1 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SDES items | +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC_2 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SDES items | +| ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -BYE: - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - header |V=2|P| SC | PT=SR=203 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC/CSRC | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - : ... : - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - (opt) | length | reason for leaving ... - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +BYE: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=203 | length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| SSRC/CSRC | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +: ... | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| length | reason for leaving | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - APP: - 0 1 2 3 4 - 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - header |V=2|P| SC | PT=SR=204 | length | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | SSRC/CSRC | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | name (ASCII) | - +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ - | application-dependent data ... - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +APP: +0 1 2 3 4 +0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|V=2|P| SC | PT=SR=204 | length | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| SSRC/CSRC | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| name (ASCII) | ++=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +| application-dependent data | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ const ( @@ -142,18 +142,18 @@ type RTCP_header struct { type RTCP_Packet struct { SenderInformation struct { - Ssrc uint32 `json:"ssrc"` // 32 bit Ntp_timestamp_MSW uint32 `json:"ntp_timestamp_sec"` // 32 bit Ntp_timestamp_LSW uint32 `json:"ntp_timestamp_usec"` // 32 bit Rtp_timestamp uint32 `json:"rtp_timestamp"` // 32 bit Pkt_count uint32 `json:"packets"` // 32 bit Octet_count uint32 `json:"octets"` // 32 bit } `json:"sender_information"` + Ssrc uint32 `json:"ssrc"` // 32 bit ReportBlocks []RTCP_report_block `json:"report_blocks"` } type RTCP_report_block struct { - Ssrc uint32 `json:"source_ssrc"` // 32 bit + SourceSsrc uint32 `json:"source_ssrc"` // 32 bit Fraction_lost uint8 `json:"fraction_lost"` // 8 bit Cumulative_lost uint32 `json:"packets_lost"` // 24 bit Highest_seq_no uint32 `json:"highest_seq_no"` // 32 bit @@ -175,15 +175,16 @@ func (rp *RTCP_Packet) MarshalJSON() ([]byte, error) { } func ParseRTCP(data []byte) ([]byte, error) { - if len(data) < 28 { - return nil, fmt.Errorf("No usefull data inside RTCP packet='%s' length=%d", string(data), len(data)) + dataLen := len(data) + if dataLen < 28 { + return nil, fmt.Errorf("Useless data inside RTCP packet='%s' length=%d", string(data), len(data)) } pkt := &RTCP_Packet{} offset := 0 - for pktLen := len(data); pktLen > 0; { - if pktLen%4 != 0 || pktLen < 4 { - return nil, fmt.Errorf("Fishy RTCP packet=%v length=%d", data, pktLen) + for dataLen > 0 { + if dataLen%4 != 0 || dataLen < 4 { + return nil, fmt.Errorf("Fishy RTCP packet=%v length=%d", data, dataLen) } //version := (data[offset] & 0xc0) >> 6 @@ -204,7 +205,7 @@ func ParseRTCP(data []byte) ([]byte, error) { return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) } - pkt.SenderInformation.Ssrc = binary.BigEndian.Uint32(data[offset:]) + pkt.Ssrc = binary.BigEndian.Uint32(data[offset:]) pkt.SenderInformation.Ntp_timestamp_MSW = binary.BigEndian.Uint32(data[offset+4:]) pkt.SenderInformation.Ntp_timestamp_LSW = binary.BigEndian.Uint32(data[offset+8:]) pkt.SenderInformation.Rtp_timestamp = binary.BigEndian.Uint32(data[offset+12:]) @@ -215,7 +216,7 @@ func ParseRTCP(data []byte) ([]byte, error) { if receptionReportCount > 0 && RTCPLength >= 24 { tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) for i := 0; i < int(receptionReportCount); i++ { - tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:]) tmpReportBlocks[i].Fraction_lost = data[offset+4] var cumBuf [4]byte copy(cumBuf[1:], data[offset+5:offset+8]) @@ -232,14 +233,17 @@ func ParseRTCP(data []byte) ([]byte, error) { } case TYPE_RTCP_RR: - if RTCPLength < 24 { + if RTCPLength < 28 { return nil, fmt.Errorf("To small RTCP packet=%v length=%d type=%d", data[offset:RTCPLength], RTCPLength, RTCPType) } - if receptionReportCount > 0 && RTCPLength >= 24 { + pkt.Ssrc = binary.BigEndian.Uint32(data[offset:]) + offset = offset + 4 + + if receptionReportCount > 0 && RTCPLength >= 28 { tmpReportBlocks := make([]RTCP_report_block, receptionReportCount) for i := 0; i < int(receptionReportCount); i++ { - tmpReportBlocks[i].Ssrc = binary.BigEndian.Uint32(data[offset:]) + tmpReportBlocks[i].SourceSsrc = binary.BigEndian.Uint32(data[offset:]) tmpReportBlocks[i].Fraction_lost = data[offset+4] var cumBuf [4]byte copy(cumBuf[1:], data[offset+5:offset+8]) @@ -272,7 +276,7 @@ func ParseRTCP(data []byte) ([]byte, error) { offset = offset + int(RTCPLength) } - pktLen -= offset + dataLen -= offset } diff --git a/sniffer/sniffer.go b/sniffer/sniffer.go index fc6e4c8..781d88e 100644 --- a/sniffer/sniffer.go +++ b/sniffer/sniffer.go @@ -246,8 +246,8 @@ func (sniffer *SnifferSetup) Run() error { logp.Debug("sniffer", "End of file") loopCount++ if sniffer.config.Loop > 0 && loopCount > sniffer.config.Loop { - // time for the publish goroutine to flush - time.Sleep(300 * time.Millisecond) + // Give the publish goroutine 200 ms to flush + time.Sleep(200 * time.Millisecond) sniffer.isAlive = false continue } @@ -276,7 +276,7 @@ func (sniffer *SnifferSetup) Run() error { } if sniffer.config.ReadFile != "" { - if lastPktTime != nil && !sniffer.config.TopSpeed { + if lastPktTime != nil && !sniffer.config.ReadSpeed { sleep := ci.Timestamp.Sub(*lastPktTime) if sleep > 0 { time.Sleep(sleep) @@ -286,7 +286,7 @@ func (sniffer *SnifferSetup) Run() error { } _lastPktTime := ci.Timestamp lastPktTime = &_lastPktTime - if !sniffer.config.TopSpeed { + if !sniffer.config.ReadSpeed { // Overwrite what we get from the pcap ci.Timestamp = time.Now() } @@ -315,12 +315,13 @@ func (sniffer *SnifferSetup) Close() error { func (sniffer *SnifferSetup) Reopen() error { var err error + time.Sleep(100 * time.Millisecond) if sniffer.config.Type != "pcap" || sniffer.config.ReadFile == "" { return fmt.Errorf("Reopen is only possible for files and in pcap mode") } - sniffer.pcapHandle.Close() + sniffer.Close() sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.ReadFile) if err != nil { return err