Files
wails-epic/internal/capture/capture.go

349 lines
7.8 KiB
Go

package capture
import (
"errors"
"fmt"
"io"
"log"
"sort"
"sync"
"sync/atomic"
"time"
"equipment-analyzer/internal/model"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
)
// PacketCapture drives live packet capture across one or more pcap handles
// and funnels TCP payloads into a TCPProcessor.
type PacketCapture struct {
	handles      []*pcap.Handle // one open handle per captured interface; closed in Stop
	isCapturing  bool           // true between Start and Stop; guarded by mutex
	stopChan     chan struct{}  // closed by Stop to end captureLoop goroutines; nil once stopped
	mutex        sync.RWMutex   // guards isCapturing, handles, stopChan, portCounts, ipCounts
	tcpProcessor *TCPProcessor  // downstream TCP payload processing pipeline
	dataChan     chan *model.TCPData // buffered in NewPacketCapture; no senders/receivers visible in this file — confirm usage elsewhere
	errorChan    chan error          // buffered in NewPacketCapture; no senders/receivers visible in this file — confirm usage elsewhere
	totalPackets uint64 // every packet read from pcap; updated atomically
	tcpPackets   uint64 // packets carrying a TCP layer; updated atomically
	payloads     uint64 // TCP packets with a non-empty payload; updated atomically
	portCounts   map[uint16]uint64 // payload packets per src/dst TCP port
	ipCounts     map[string]uint64 // payload packets per IPv4 src/dst address
}
// CaptureStats is a point-in-time snapshot of capture counters combined with
// the TCP processor's statistics. Built by GetStats.
type CaptureStats struct {
	TotalPackets   uint64 // every packet read from pcap
	TCPPackets     uint64 // packets carrying a TCP layer
	PayloadPackets uint64 // TCP packets with a non-empty payload
	AckGroups      int    // copied from TCPProcessor.Stats().AckGroups
	UniquePayloads int    // copied from TCPProcessor.Stats().UniquePayloads
	RawSegments    int    // copied from TCPProcessor.Stats().RawSegmentCount
	FinalBuffers   int    // copied from TCPProcessor.Stats().FinalBufferSize
}
// Config controls how Start opens its capture handles.
type Config struct {
	InterfaceName string        // single interface to capture; empty means every discoverable interface
	Filter        string        // optional BPF filter; empty skips SetBPFFilter
	Timeout       time.Duration // pcap read timeout handed to OpenLive
	BufferSize    int           // NOTE(review): passed to OpenLive as the snapshot length (bytes kept per packet), not a buffer size — confirm intent
}
// NewPacketCapture builds a PacketCapture with its processing pipeline,
// signalling channels, and stat maps ready for a call to Start.
func NewPacketCapture() *PacketCapture {
	pc := &PacketCapture{
		stopChan:     make(chan struct{}),
		tcpProcessor: NewTCPProcessor(),
		portCounts:   make(map[uint16]uint64),
		ipCounts:     make(map[string]uint64),
	}
	pc.dataChan = make(chan *model.TCPData, 1000)
	pc.errorChan = make(chan error, 100)
	return pc
}
// Start opens a live capture on the configured interface (or on every
// discoverable interface when config.InterfaceName is empty), applies the
// optional BPF filter, and launches one capture goroutine per opened handle.
//
// Interfaces that fail to open or to accept the filter are logged and
// skipped; Start only fails if a capture is already running or if no
// interface could be opened at all. Per-run counters are reset on entry.
func (pc *PacketCapture) Start(config Config) error {
	pc.mutex.Lock()
	defer pc.mutex.Unlock()

	if pc.isCapturing {
		return fmt.Errorf("capture already running")
	}

	// Fresh counters and stat maps for this run.
	atomic.StoreUint64(&pc.totalPackets, 0)
	atomic.StoreUint64(&pc.tcpPackets, 0)
	atomic.StoreUint64(&pc.payloads, 0)
	pc.resetStats()

	// Stop nils the channel after closing it, so recreate it when needed.
	if pc.stopChan == nil {
		pc.stopChan = make(chan struct{})
	}

	var interfaces []string
	if config.InterfaceName != "" {
		interfaces = []string{config.InterfaceName}
	} else {
		interfaces = findWorkingInterfaces()
	}

	log.Printf("[capture] start: interfaces=%v filter=%q timeout=%s buffer_size=%d",
		interfaces, config.Filter, config.Timeout, config.BufferSize)

	var opened []*pcap.Handle
	for _, iface := range interfaces {
		// NOTE(review): BufferSize is used as the pcap snapshot length here;
		// a value of 0 would truncate every packet to zero bytes — confirm
		// callers always pass a sane (e.g. >= 65535) value.
		handle, err := pcap.OpenLive(
			iface,
			int32(config.BufferSize),
			true, // promiscuous mode
			config.Timeout,
		)
		if err != nil {
			log.Printf("[capture] OpenLive failed: interface=%s err=%v", iface, err)
			continue
		}
		if config.Filter != "" {
			if err := handle.SetBPFFilter(config.Filter); err != nil {
				handle.Close()
				log.Printf("[capture] SetBPFFilter failed: interface=%s filter=%q err=%v", iface, config.Filter, err)
				continue
			}
			log.Printf("[capture] SetBPFFilter applied: interface=%s filter=%q", iface, config.Filter)
		} else {
			log.Printf("[capture] SetBPFFilter skipped: interface=%s filter=empty", iface)
		}
		opened = append(opened, handle)
	}
	if len(opened) == 0 {
		return fmt.Errorf("failed to open any interfaces")
	}

	pc.handles = opened
	pc.isCapturing = true
	for _, h := range pc.handles {
		go pc.captureLoop(h)
	}
	return nil
}
// Stop ends an active capture: it marks the session stopped, signals every
// captureLoop goroutine by closing the stop channel, closes all pcap handles,
// and logs final statistics. Calling Stop when no capture is running is a
// no-op.
func (pc *PacketCapture) Stop() {
	pc.mutex.Lock()
	if !pc.isCapturing {
		pc.mutex.Unlock()
		return
	}
	pc.isCapturing = false
	// Close-and-nil so a subsequent Start knows to create a fresh channel.
	if pc.stopChan != nil {
		close(pc.stopChan)
		pc.stopChan = nil
	}
	// GetStats reads atomics and the processor only — it does not take
	// pc.mutex — so calling it while holding the lock cannot deadlock.
	stats := pc.GetStats()
	portsSnapshot, ipsSnapshot := pc.snapshotStatsLocked()
	for _, h := range pc.handles {
		if h != nil {
			h.Close()
		}
	}
	pc.handles = nil
	pc.mutex.Unlock()
	// Log after releasing the lock; the snapshots are already detached copies.
	log.Printf("[capture] stop: total=%d tcp=%d payload=%d ack_groups=%d unique_payloads=%d raw_segments=%d final_buffers=%d",
		stats.TotalPackets, stats.TCPPackets, stats.PayloadPackets, stats.AckGroups, stats.UniquePayloads, stats.RawSegments, stats.FinalBuffers)
	pc.logTopStatsFromSnapshots(portsSnapshot, ipsSnapshot)
}
// IsCapturing reports whether a capture session is currently active.
func (pc *PacketCapture) IsCapturing() bool {
	pc.mutex.RLock()
	active := pc.isCapturing
	pc.mutex.RUnlock()
	return active
}
// captureLoop reads packets from a single pcap handle until the capture is
// stopped or the source reports EOF. It runs as one goroutine per handle.
func (pc *PacketCapture) captureLoop(handle *pcap.Handle) {
	// Snapshot the stop channel once under the lock. Stop() nils pc.stopChan
	// (under pc.mutex) after closing it, so reading the field unsynchronized
	// on every iteration was a data race — and selecting on the nil field
	// would block that case forever. Start holds pc.mutex while launching
	// this goroutine, so the RLock also delays the loop until Start's
	// initialization is complete.
	pc.mutex.RLock()
	stop := pc.stopChan
	pc.mutex.RUnlock()
	if stop == nil {
		return // capture already stopped before the loop began
	}

	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
	log.Println("[capture] packet loop started")
	for {
		select {
		case <-stop:
			return
		default:
			packet, err := packetSource.NextPacket()
			if err != nil {
				if errors.Is(err, io.EOF) || err.Error() == "EOF" {
					return
				}
				// pcap signals a read timeout with a sentinel error; poll again.
				// (String fallback kept in case the error is not the sentinel value.)
				if errors.Is(err, pcap.NextErrorTimeoutExpired) || err.Error() == "Timeout Expired" {
					continue
				}
				log.Printf("[capture] NextPacket error: %v", err)
				continue
			}
			atomic.AddUint64(&pc.totalPackets, 1)
			pc.processTCPPacket(packet)
		}
	}
}
// findWorkingInterfaces enumerates every capture device pcap can see, logs
// each one, and returns their de-duplicated names. Returns nil when device
// enumeration fails.
func findWorkingInterfaces() []string {
	devs, err := pcap.FindAllDevs()
	if err != nil {
		log.Printf("[capture] FindAllDevs failed: %v", err)
		return nil
	}
	names := make([]string, 0, len(devs))
	for _, d := range devs {
		log.Printf("[capture] iface: name=%s desc=%q addrs=%v", d.Name, d.Description, d.Addresses)
		names = append(names, d.Name)
	}
	return uniqueStrings(names)
}
// uniqueStrings returns the input with duplicates removed, preserving the
// order of first occurrence.
func uniqueStrings(in []string) []string {
	out := make([]string, 0, len(in))
	seen := make(map[string]struct{}, len(in))
	for _, s := range in {
		if _, dup := seen[s]; !dup {
			seen[s] = struct{}{}
			out = append(out, s)
		}
	}
	return out
}
// processTCPPacket counts a TCP packet, records per-port/per-IP statistics
// for payload-bearing segments, and hands the payload to the TCP processor.
// Packets without a TCP layer or without payload are dropped here.
func (pc *PacketCapture) processTCPPacket(packet gopacket.Packet) {
	tcpLayer := packet.Layer(layers.LayerTypeTCP)
	if tcpLayer == nil {
		return
	}
	atomic.AddUint64(&pc.tcpPackets, 1)

	tcp, ok := tcpLayer.(*layers.TCP)
	if !ok {
		return
	}
	if len(tcp.Payload) == 0 {
		return
	}
	atomic.AddUint64(&pc.payloads, 1)

	// Only IPv4 is inspected for address stats; the port stats above still
	// cover TCP over other network layers.
	pc.recordTCPStats(tcp, packet.Layer(layers.LayerTypeIPv4))

	segment := &model.TCPData{
		Payload: tcp.Payload,
		Seq:     uint32(tcp.Seq),
		Ack:     uint32(tcp.Ack),
		SrcPort: uint16(tcp.SrcPort),
		DstPort: uint16(tcp.DstPort),
	}
	pc.tcpProcessor.ProcessPacket(segment)
}
// resetStats swaps in fresh per-run port/IP counter maps. It is called from
// Start while pc.mutex is held.
func (pc *PacketCapture) resetStats() {
	pc.portCounts, pc.ipCounts = make(map[uint16]uint64), make(map[string]uint64)
}
// recordTCPStats bumps the counters for both TCP endpoints of a segment, and
// for both IPv4 addresses when an IPv4 layer is provided (ipLayer may be nil
// or a non-IPv4 layer; both are ignored).
func (pc *PacketCapture) recordTCPStats(tcp *layers.TCP, ipLayer gopacket.Layer) {
	pc.mutex.Lock()
	defer pc.mutex.Unlock()

	pc.portCounts[uint16(tcp.SrcPort)]++
	pc.portCounts[uint16(tcp.DstPort)]++

	// Comma-ok assertion is false for a nil interface, matching the original
	// nil guard.
	if ip, ok := ipLayer.(*layers.IPv4); ok {
		pc.ipCounts[ip.SrcIP.String()]++
		pc.ipCounts[ip.DstIP.String()]++
	}
}
// kvU16 pairs a TCP port with its packet count, used for sorting and logging
// top-port statistics.
type kvU16 struct {
	Key   uint16
	Value uint64
}

// kvStr pairs an IP address string with its packet count, used for sorting
// and logging top-IP statistics.
type kvStr struct {
	Key   string
	Value uint64
}
// snapshotStatsLocked copies the port and IP counter maps into detached
// key/value slices suitable for sorting. Per its name, the caller (Stop)
// holds pc.mutex while calling it.
func (pc *PacketCapture) snapshotStatsLocked() ([]kvU16, []kvStr) {
	portPairs := make([]kvU16, 0, len(pc.portCounts))
	for port, n := range pc.portCounts {
		portPairs = append(portPairs, kvU16{Key: port, Value: n})
	}
	ipPairs := make([]kvStr, 0, len(pc.ipCounts))
	for addr, n := range pc.ipCounts {
		ipPairs = append(ipPairs, kvStr{Key: addr, Value: n})
	}
	return portPairs, ipPairs
}
// logTopStatsFromSnapshots sorts both snapshots by descending count and logs
// up to the ten busiest TCP ports and IPv4 addresses.
func (pc *PacketCapture) logTopStatsFromSnapshots(ports []kvU16, ips []kvStr) {
	sort.Slice(ports, func(a, b int) bool { return ports[a].Value > ports[b].Value })
	sort.Slice(ips, func(a, b int) bool { return ips[a].Value > ips[b].Value })

	topPorts := len(ports)
	if topPorts > 10 {
		topPorts = 10
	}
	topIPs := len(ips)
	if topIPs > 10 {
		topIPs = 10
	}
	log.Printf("[capture] top tcp ports: %v", ports[:topPorts])
	log.Printf("[capture] top tcp ips: %v", ips[:topIPs])
}
// GetCapturedData returns the TCP processor's final reassembled buffers.
func (pc *PacketCapture) GetCapturedData() []string {
	return pc.tcpProcessor.GetFinalBuffer()
}
// GetPortCount returns how many payload-bearing TCP packets have used the
// given port (as source or destination) in the current run.
func (pc *PacketCapture) GetPortCount(port uint16) uint64 {
	pc.mutex.RLock()
	n := pc.portCounts[port]
	pc.mutex.RUnlock()
	return n
}
// ProcessAllData runs the TCP processor over everything captured so far and
// logs how long the pass took.
func (pc *PacketCapture) ProcessAllData() {
	began := time.Now()
	log.Printf("[capture] ProcessAllData started")
	pc.tcpProcessor.ProcessAllData()
	log.Printf("[capture] ProcessAllData finished in %s", time.Since(began))
}
// Clear discards all state accumulated by the TCP processor.
func (pc *PacketCapture) Clear() {
	pc.tcpProcessor.Clear()
}
// GetStats assembles a point-in-time view combining this capture's atomic
// packet counters with the TCP processor's statistics. It does not take
// pc.mutex, so it is safe to call while the mutex is held (see Stop).
func (pc *PacketCapture) GetStats() CaptureStats {
	ps := pc.tcpProcessor.Stats()
	stats := CaptureStats{
		AckGroups:      ps.AckGroups,
		UniquePayloads: ps.UniquePayloads,
		RawSegments:    ps.RawSegmentCount,
		FinalBuffers:   ps.FinalBufferSize,
	}
	stats.TotalPackets = atomic.LoadUint64(&pc.totalPackets)
	stats.TCPPackets = atomic.LoadUint64(&pc.tcpPackets)
	stats.PayloadPackets = atomic.LoadUint64(&pc.payloads)
	return stats
}