// Package capture opens live pcap handles on one or more network interfaces,
// counts the packets it sees and hands TCP payloads to a TCPProcessor.
package capture

import (
	"errors"
	"fmt"
	"io"
	"log"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"equipment-analyzer/internal/model"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
)

// PacketCapture owns a set of live pcap handles and the per-capture
// statistics gathered while reading from them.
type PacketCapture struct {
	// Counters accessed through sync/atomic. They are kept at the very start
	// of the struct because sync/atomic only guarantees 64-bit alignment for
	// the first word of an allocated struct on 32-bit platforms.
	totalPackets uint64
	tcpPackets   uint64
	payloads     uint64

	// mutex guards handles, isCapturing, stopChan, portCounts and ipCounts.
	mutex       sync.RWMutex
	handles     []*pcap.Handle
	isCapturing bool
	stopChan    chan struct{}

	tcpProcessor *TCPProcessor

	// dataChan and errorChan are allocated by NewPacketCapture but are not
	// read or written anywhere in this file.
	dataChan  chan *model.TCPData
	errorChan chan error

	portCounts map[uint16]uint64
	ipCounts   map[string]uint64
}

// CaptureStats is a point-in-time summary combining the capture counters
// with the figures reported by the TCP processor.
type CaptureStats struct {
	TotalPackets   uint64
	TCPPackets     uint64
	PayloadPackets uint64
	AckGroups      int
	UniquePayloads int
	RawSegments    int
	FinalBuffers   int
}

// Config describes how Start opens the capture handles.
type Config struct {
	// InterfaceName selects a single interface. When empty, every device
	// reported by pcap.FindAllDevs is tried.
	InterfaceName string
	// Filter is a BPF expression applied to each handle; empty means none.
	Filter string
	// Timeout is passed to pcap.OpenLive as the read timeout.
	Timeout time.Duration
	// BufferSize is passed to pcap.OpenLive as its snapshot-length argument.
	BufferSize int
}

// NewPacketCapture returns an idle PacketCapture with its channels, counter
// maps and TCP processor initialised.
func NewPacketCapture() *PacketCapture {
	return &PacketCapture{
		stopChan:     make(chan struct{}),
		tcpProcessor: NewTCPProcessor(),
		dataChan:     make(chan *model.TCPData, 1000),
		errorChan:    make(chan error, 100),
		portCounts:   make(map[uint16]uint64),
		ipCounts:     make(map[string]uint64),
	}
}

// Start resets the counters, opens a promiscuous live handle on every
// selected interface and launches one capture goroutine per opened handle.
//
// Interfaces that fail to open, or that reject the BPF filter, are logged
// and skipped. An error is returned when a capture is already running or
// when no interface could be opened at all.
func (pc *PacketCapture) Start(config Config) error {
	pc.mutex.Lock()
	defer pc.mutex.Unlock()

	if pc.isCapturing {
		return errors.New("capture already running")
	}

	atomic.StoreUint64(&pc.totalPackets, 0)
	atomic.StoreUint64(&pc.tcpPackets, 0)
	atomic.StoreUint64(&pc.payloads, 0)
	pc.resetStats()

	// Stop closes the channel and sets the field to nil, so a restart needs
	// a fresh one.
	if pc.stopChan == nil {
		pc.stopChan = make(chan struct{})
	}

	var interfaces []string
	if config.InterfaceName != "" {
		interfaces = []string{config.InterfaceName}
	} else {
		interfaces = findWorkingInterfaces()
	}
	log.Printf("[capture] start: interfaces=%v filter=%q timeout=%s buffer_size=%d",
		interfaces, config.Filter, config.Timeout, config.BufferSize)

	var opened []*pcap.Handle
	for _, iface := range interfaces {
		handle, err := pcap.OpenLive(iface, int32(config.BufferSize), true, config.Timeout)
		if err != nil {
			log.Printf("[capture] OpenLive failed: interface=%s err=%v", iface, err)
			continue
		}

		if config.Filter == "" {
			log.Printf("[capture] SetBPFFilter skipped: interface=%s filter=empty", iface)
		} else {
			if err := handle.SetBPFFilter(config.Filter); err != nil {
				// Do not leak a handle we are not going to read from.
				handle.Close()
				log.Printf("[capture] SetBPFFilter failed: interface=%s filter=%q err=%v", iface, config.Filter, err)
				continue
			}
			log.Printf("[capture] SetBPFFilter applied: interface=%s filter=%q", iface, config.Filter)
		}

		opened = append(opened, handle)
	}

	if len(opened) == 0 {
		return errors.New("failed to open any interfaces")
	}

	pc.handles = opened
	pc.isCapturing = true

	// Hand every goroutine the channel value itself. Reading pc.stopChan
	// from inside the loop would race with Stop, which closes the channel
	// and then nils the field.
	stop := pc.stopChan
	for _, h := range pc.handles {
		go pc.runCaptureLoop(h, stop)
	}
	return nil
}

// Stop signals the capture goroutines, closes every handle and logs a
// summary of the capture. It is a no-op when no capture is running.
func (pc *PacketCapture) Stop() {
	pc.mutex.Lock()
	if !pc.isCapturing {
		pc.mutex.Unlock()
		return
	}
	pc.isCapturing = false

	if pc.stopChan != nil {
		close(pc.stopChan)
		pc.stopChan = nil
	}

	// Take the figures while still holding the lock; the logging itself is
	// done after the lock has been released.
	stats := pc.GetStats()
	portsSnapshot, ipsSnapshot := pc.snapshotStatsLocked()

	for _, h := range pc.handles {
		if h != nil {
			h.Close()
		}
	}
	pc.handles = nil
	pc.mutex.Unlock()

	log.Printf("[capture] stop: total=%d tcp=%d payload=%d ack_groups=%d unique_payloads=%d raw_segments=%d final_buffers=%d",
		stats.TotalPackets, stats.TCPPackets, stats.PayloadPackets,
		stats.AckGroups, stats.UniquePayloads, stats.RawSegments, stats.FinalBuffers)
	pc.logTopStatsFromSnapshots(portsSnapshot, ipsSnapshot)
}

// IsCapturing reports whether a capture is currently running.
func (pc *PacketCapture) IsCapturing() bool {
	pc.mutex.RLock()
	defer pc.mutex.RUnlock()
	return pc.isCapturing
}

// captureLoop reads packets from handle until the capture is stopped. It
// takes a snapshot of the current stop channel under the read lock and then
// runs the loop against that snapshot.
func (pc *PacketCapture) captureLoop(handle *pcap.Handle) {
	pc.mutex.RLock()
	stop := pc.stopChan
	pc.mutex.RUnlock()

	pc.runCaptureLoop(handle, stop)
}

// runCaptureLoop reads packets from handle and forwards each one to
// processTCPPacket until stop is closed or the packet source reports io.EOF.
// Read timeouts are ignored silently; any other read error is logged and
// the loop carries on.
func (pc *PacketCapture) runCaptureLoop(handle *pcap.Handle, stop <-chan struct{}) {
	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
	log.Println("[capture] packet loop started")

	for {
		select {
		case <-stop:
			return
		default:
		}

		packet, err := packetSource.NextPacket()
		switch {
		case err == nil:
			atomic.AddUint64(&pc.totalPackets, 1)
			pc.processTCPPacket(packet)
		case errors.Is(err, io.EOF):
			return
		case errors.Is(err, pcap.NextErrorTimeoutExpired):
			// The read timeout elapsed without a packet; poll stop again.
		default:
			log.Printf("[capture] NextPacket error: %v", err)
		}
	}
}

// findWorkingInterfaces logs every device reported by pcap.FindAllDevs and
// returns their names with duplicates removed. Despite its name it does not
// probe or filter the devices. It returns nil when enumeration fails.
func findWorkingInterfaces() []string {
	devs, err := pcap.FindAllDevs()
	if err != nil {
		log.Printf("[capture] FindAllDevs failed: %v", err)
		return nil
	}

	names := make([]string, 0, len(devs))
	for _, dev := range devs {
		log.Printf("[capture] iface: name=%s desc=%q addrs=%v", dev.Name, dev.Description, dev.Addresses)
		names = append(names, dev.Name)
	}
	return uniqueStrings(names)
}

// uniqueStrings returns the elements of in with duplicates dropped, keeping
// the first occurrence of each value in its original position.
func uniqueStrings(in []string) []string {
	seen := make(map[string]struct{}, len(in))
	out := make([]string, 0, len(in))
	for _, v := range in {
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		out = append(out, v)
	}
	return out
}

// processTCPPacket updates the TCP counters for packet and, when the TCP
// segment carries a payload, records port/IP statistics and forwards the
// segment to the TCP processor. Packets without a TCP layer are ignored.
func (pc *PacketCapture) processTCPPacket(packet gopacket.Packet) {
	tcpLayer := packet.Layer(layers.LayerTypeTCP)
	if tcpLayer == nil {
		return
	}
	atomic.AddUint64(&pc.tcpPackets, 1)

	tcp, ok := tcpLayer.(*layers.TCP)
	if !ok || len(tcp.Payload) == 0 {
		return
	}
	atomic.AddUint64(&pc.payloads, 1)

	// Only the IPv4 layer is looked up, so non-IPv4 traffic contributes to
	// the port counts but not to the IP counts.
	pc.recordTCPStats(tcp, packet.Layer(layers.LayerTypeIPv4))

	pc.tcpProcessor.ProcessPacket(&model.TCPData{
		Payload: tcp.Payload,
		Seq:     uint32(tcp.Seq),
		Ack:     uint32(tcp.Ack),
		SrcPort: uint16(tcp.SrcPort),
		DstPort: uint16(tcp.DstPort),
	})
}

// resetStats replaces the port and IP counter maps with empty ones. It does
// no locking of its own; Start calls it while holding pc.mutex.
func (pc *PacketCapture) resetStats() {
	pc.portCounts = make(map[uint16]uint64)
	pc.ipCounts = make(map[string]uint64)
}

// recordTCPStats increments the counters for the segment's source and
// destination ports and, when ipLayer is an IPv4 layer, for its source and
// destination addresses.
func (pc *PacketCapture) recordTCPStats(tcp *layers.TCP, ipLayer gopacket.Layer) {
	pc.mutex.Lock()
	defer pc.mutex.Unlock()

	pc.portCounts[uint16(tcp.SrcPort)]++
	pc.portCounts[uint16(tcp.DstPort)]++

	if ipLayer == nil {
		return
	}
	if ip, ok := ipLayer.(*layers.IPv4); ok {
		pc.ipCounts[ip.SrcIP.String()]++
		pc.ipCounts[ip.DstIP.String()]++
	}
}

// kvU16 is one (port, count) entry of a statistics snapshot.
type kvU16 struct {
	Key   uint16
	Value uint64
}

// kvStr is one (IP address, count) entry of a statistics snapshot.
type kvStr struct {
	Key   string
	Value uint64
}

// snapshotStatsLocked copies the port and IP counters into slices, in no
// particular order. It does no locking of its own; the caller must hold
// pc.mutex.
func (pc *PacketCapture) snapshotStatsLocked() ([]kvU16, []kvStr) {
	ports := make([]kvU16, 0, len(pc.portCounts))
	for k, v := range pc.portCounts {
		ports = append(ports, kvU16{Key: k, Value: v})
	}

	ips := make([]kvStr, 0, len(pc.ipCounts))
	for k, v := range pc.ipCounts {
		ips = append(ips, kvStr{Key: k, Value: v})
	}
	return ports, ips
}

// logTopStatsFromSnapshots logs up to ten ports and ten IP addresses with
// the highest counts. Both slices are sorted in place, by descending count
// with ties broken by ascending key so the output is deterministic.
func (pc *PacketCapture) logTopStatsFromSnapshots(ports []kvU16, ips []kvStr) {
	const topN = 10

	sort.Slice(ports, func(i, j int) bool {
		if ports[i].Value != ports[j].Value {
			return ports[i].Value > ports[j].Value
		}
		return ports[i].Key < ports[j].Key
	})
	sort.Slice(ips, func(i, j int) bool {
		if ips[i].Value != ips[j].Value {
			return ips[i].Value > ips[j].Value
		}
		return ips[i].Key < ips[j].Key
	})

	maxPorts := topN
	if len(ports) < maxPorts {
		maxPorts = len(ports)
	}
	maxIPs := topN
	if len(ips) < maxIPs {
		maxIPs = len(ips)
	}

	log.Printf("[capture] top tcp ports: %v", ports[:maxPorts])
	log.Printf("[capture] top tcp ips: %v", ips[:maxIPs])
}

// GetCapturedData returns the TCP processor's final buffer.
func (pc *PacketCapture) GetCapturedData() []string {
	return pc.tcpProcessor.GetFinalBuffer()
}

// GetPortCount returns how many payload-carrying TCP segments have been
// recorded with port as their source or destination port.
func (pc *PacketCapture) GetPortCount(port uint16) uint64 {
	pc.mutex.RLock()
	defer pc.mutex.RUnlock()
	return pc.portCounts[port]
}

// ProcessAllData runs the TCP processor's ProcessAllData and logs how long
// it took.
func (pc *PacketCapture) ProcessAllData() {
	start := time.Now()
	log.Printf("[capture] ProcessAllData started")
	pc.tcpProcessor.ProcessAllData()
	log.Printf("[capture] ProcessAllData finished in %s", time.Since(start))
}

// Clear clears the TCP processor. The packet counters and the port/IP
// statistics kept by PacketCapture are left untouched.
func (pc *PacketCapture) Clear() {
	pc.tcpProcessor.Clear()
}

// GetStats returns the current packet counters together with the figures
// reported by the TCP processor. It does not take pc.mutex.
func (pc *PacketCapture) GetStats() CaptureStats {
	processorStats := pc.tcpProcessor.Stats()
	return CaptureStats{
		TotalPackets:   atomic.LoadUint64(&pc.totalPackets),
		TCPPackets:     atomic.LoadUint64(&pc.tcpPackets),
		PayloadPackets: atomic.LoadUint64(&pc.payloads),
		AckGroups:      processorStats.AckGroups,
		UniquePayloads: processorStats.UniquePayloads,
		RawSegments:    processorStats.RawSegmentCount,
		FinalBuffers:   processorStats.FinalBufferSize,
	}
}