// File: wails-epic/internal/service/capture_service.go
// (268 lines, 7.1 KiB, Go)

package service
import (
"context"
"fmt"
"log"
"sync"
"time"
"equipment-analyzer/internal/capture"
"equipment-analyzer/internal/config"
"equipment-analyzer/internal/model"
"equipment-analyzer/internal/utils"
)
// CaptureService wraps a capture.PacketCapture together with a ParserService
// and tracks the start/stop lifecycle of a capture session.
type CaptureService struct {
// config is the application configuration handed to NewCaptureService.
config *config.Config
// logger receives the structured Info/Warn/Error messages emitted by this service.
logger *utils.Logger
// packetCapture performs the actual capture; it is started, stopped and
// queried for stats and captured data by the methods of this type.
packetCapture *capture.PacketCapture
// processor is created by NewCaptureService; it is not referenced by the
// methods visible in this part of the file.
processor *capture.TCPProcessor
// parser turns the captured data into a model.ParsedResult in StopAndParseCapture.
parser *ParserService
// beforeRemote is an optional hook that StopAndParseCapture calls right
// before parsing. Set through SetBeforeRemote; guarded by mutex.
beforeRemote func()
// mutex guards beforeRemote, isCapturing, isStarting and pendingStop.
mutex sync.RWMutex
// isCapturing is true once packetCapture.Start has succeeded and until a stop completes.
isCapturing bool
// isStarting is true from a StartCaptureAsync request until packetCapture.Start returns.
isStarting bool
// pendingStop records a stop request that arrived while isStarting was true.
pendingStop bool
// dataChan and errorChan are buffered channels allocated by NewCaptureService.
// NOTE(review): no sender or receiver is visible in this part of the file — confirm they are still used.
dataChan chan *model.CaptureResult
errorChan chan error
}
// NewCaptureService returns a CaptureService wired to the given configuration,
// logger and parser. A fresh packet capture and TCP processor are created for
// it, and both internal channels are allocated with a buffer of 100 entries.
func NewCaptureService(cfg *config.Config, logger *utils.Logger, parser *ParserService) *CaptureService {
	svc := new(CaptureService)

	// Collaborators supplied by the caller.
	svc.config = cfg
	svc.logger = logger
	svc.parser = parser

	// Collaborators owned by the service.
	svc.packetCapture = capture.NewPacketCapture()
	svc.processor = capture.NewTCPProcessor()

	svc.dataChan = make(chan *model.CaptureResult, 100)
	svc.errorChan = make(chan error, 100)

	return svc
}
// SetBeforeRemote installs the hook that StopAndParseCapture runs just before
// it hands the captured data to the parser. Passing nil removes the hook.
func (cs *CaptureService) SetBeforeRemote(fn func()) {
	cs.mutex.Lock()
	cs.beforeRemote = fn
	cs.mutex.Unlock()
}
// StartCaptureAsync begins a packet capture in the background.
//
// The call itself only validates state and marks the service as "starting";
// packetCapture.Start runs on a separate goroutine. When that goroutine
// finishes, exactly one of the callbacks is invoked (if non-nil):
//   - onError, with the wrapped start error, when packetCapture.Start fails;
//   - onStarted, when the capture is running.
//
// If a stop was requested while the capture was still starting, onStarted is
// still invoked and the capture is then stopped immediately through
// StopAndParseCapture.
//
// ctx bounds the lifetime of the periodic heartbeat logger (processData).
//
// It returns an error, without starting anything, when a capture is already
// running or already starting.
func (cs *CaptureService) StartCaptureAsync(ctx context.Context, config capture.Config, onStarted func(), onError func(error)) error {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()

	if cs.isCapturing || cs.isStarting {
		return fmt.Errorf("capture already running")
	}
	cs.isStarting = true
	cs.pendingStop = false

	cs.logger.Info("StartCapture requested",
		"interface", config.InterfaceName,
		"filter", config.Filter,
		"timeout_ms", config.Timeout.Milliseconds(),
		"buffer_size", config.BufferSize,
	)

	go func() {
		if err := cs.packetCapture.Start(config); err != nil {
			cs.logger.Error("Packet capture start failed", "error", err)
			cs.mutex.Lock()
			cs.isStarting = false
			cs.mutex.Unlock()
			if onError != nil {
				onError(fmt.Errorf("failed to start capture: %w", err))
			}
			return
		}

		cs.mutex.Lock()
		cs.isStarting = false
		cs.isCapturing = true
		shouldStop := cs.pendingStop
		cs.pendingStop = false
		cs.mutex.Unlock()

		// Launch the heartbeat only once the capture is really running. It
		// used to be launched unconditionally, so a failed start (or a start
		// that is stopped right away) left a goroutine ticking until ctx was
		// cancelled.
		// NOTE(review): the heartbeat still lives until ctx is done, so
		// repeated start/stop cycles on a long-lived ctx accumulate one
		// goroutine each; tying it to a per-capture cancel needs new state on
		// CaptureService.
		if !shouldStop {
			go cs.processData(ctx)
		}

		cs.logger.Info("Packet capture started", "interface", config.InterfaceName)
		if onStarted != nil {
			onStarted()
		}

		if shouldStop {
			// The queued stop has no caller waiting for a result, and
			// StopAndParseCapture logs its own failures, so the return
			// values are intentionally discarded.
			_, _ = cs.StopAndParseCapture()
		}
	}()

	return nil
}
// StopCapture stops the running capture without parsing anything.
//
// While a start is still in progress the stop is only recorded (pendingStop)
// and nil is returned; the start goroutine acts on it afterwards. When no
// capture is running an error is returned. The service mutex is held for the
// whole call, including the call to packetCapture.Stop.
func (cs *CaptureService) StopCapture() error {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()

	switch {
	case cs.isStarting:
		cs.pendingStop = true
		cs.logger.Info("StopCapture queued while starting")
		return nil
	case !cs.isCapturing:
		return fmt.Errorf("capture not running")
	}

	// Snapshot the counters before stopping so the log shows the final totals.
	stats := cs.packetCapture.GetStats()
	cs.logger.Info("StopCapture requested",
		"total_packets", stats.TotalPackets,
		"tcp_packets", stats.TCPPackets,
		"payload_packets", stats.PayloadPackets,
		"ack_groups", stats.AckGroups,
		"unique_payloads", stats.UniquePayloads,
		"raw_segments", stats.RawSegments,
		"final_buffers", stats.FinalBuffers,
	)

	cs.packetCapture.Stop()
	cs.isCapturing = false
	cs.logger.Info("Packet capture stopped")

	return nil
}
// GetCapturedData returns the captured data currently held by the underlying
// packet capture, exactly as packetCapture.GetCapturedData reports it.
func (cs *CaptureService) GetCapturedData() []string {
	captured := cs.packetCapture.GetCapturedData()
	return captured
}
// ProcessAllData delegates to the underlying packet capture's ProcessAllData.
func (cs *CaptureService) ProcessAllData() {
	pc := cs.packetCapture
	pc.ProcessAllData()
}
// IsCapturing reports whether a capture is currently running. The flag is
// read under the service's read lock.
func (cs *CaptureService) IsCapturing() bool {
	cs.mutex.RLock()
	capturing := cs.isCapturing
	cs.mutex.RUnlock()
	return capturing
}
// IsStarting reports whether a capture start has been requested but has not
// yet completed. The flag is read under the service's read lock.
func (cs *CaptureService) IsStarting() bool {
	cs.mutex.RLock()
	starting := cs.isStarting
	cs.mutex.RUnlock()
	return starting
}
// processData logs a "Capture heartbeat" entry with the current capture
// statistics every three seconds for as long as a capture is running. Ticks
// that arrive while nothing is being captured are skipped. The loop exits
// when ctx is cancelled.
func (cs *CaptureService) processData(ctx context.Context) {
	const heartbeatInterval = 3 * time.Second

	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		// Block until either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-heartbeat.C:
		}

		if !cs.IsCapturing() {
			continue
		}

		snapshot := cs.packetCapture.GetStats()
		cs.logger.Info("Capture heartbeat",
			"total_packets", snapshot.TotalPackets,
			"tcp_packets", snapshot.TCPPackets,
			"payload_packets", snapshot.PayloadPackets,
			"ack_groups", snapshot.AckGroups,
			"unique_payloads", snapshot.UniquePayloads,
			"raw_segments", snapshot.RawSegments,
			"final_buffers", snapshot.FinalBuffers,
		)
	}
}
// StopAndParseCapture stops the running capture, processes everything that was
// captured and parses it into a model.ParsedResult.
//
// Steps, in order:
//  1. If a start is still in progress, record the stop request (pendingStop)
//     and return a "capture starting" error; the start goroutine performs the
//     stop once the capture is up.
//  2. If no capture is running, return a "capture not running" error.
//  3. Stop the packet capture and mark the service as no longer capturing.
//  4. Run packetCapture.ProcessAllData and check that there is data for port
//     5222 or 3333, at least one final buffer and at least one captured chunk;
//     otherwise return a "no captured data" error.
//  5. Invoke the beforeRemote hook, if one is set, then parse the data.
//
// It returns the parsed result, or a non-nil error (wrapping the parser's
// error when parsing fails).
func (cs *CaptureService) StopAndParseCapture() (*model.ParsedResult, error) {
	log.Printf("[service] StopAndParseCapture enter")

	cs.mutex.Lock()
	if cs.isStarting {
		cs.pendingStop = true
		cs.mutex.Unlock()
		log.Printf("[service] StopAndParseCapture queued while starting")
		return nil, fmt.Errorf("capture starting")
	}
	if !cs.isCapturing {
		cs.mutex.Unlock()
		log.Printf("[service] StopAndParseCapture exit: not running")
		return nil, fmt.Errorf("capture not running")
	}
	// The lock is released before stopping so that readers such as
	// IsCapturing are not blocked for the duration of packetCapture.Stop.
	// NOTE(review): this leaves a window in which two concurrent callers can
	// both reach Stop — confirm that packetCapture.Stop tolerates that.
	cs.mutex.Unlock()

	beforeStop := cs.packetCapture.GetStats()
	cs.logger.Info("StopAndParseCapture requested",
		"total_packets", beforeStop.TotalPackets,
		"tcp_packets", beforeStop.TCPPackets,
		"payload_packets", beforeStop.PayloadPackets,
		"ack_groups", beforeStop.AckGroups,
		"unique_payloads", beforeStop.UniquePayloads,
		"raw_segments", beforeStop.RawSegments,
		"final_buffers", beforeStop.FinalBuffers,
	)

	log.Printf("[service] StopAndParseCapture stopping packet capture")
	cs.packetCapture.Stop()
	log.Printf("[service] StopAndParseCapture packet capture stopped")

	// isCapturing is read under the mutex everywhere else (IsCapturing,
	// StartCaptureAsync, StopCapture), so the write must hold it too;
	// writing it unlocked is a data race.
	cs.mutex.Lock()
	cs.isCapturing = false
	cs.mutex.Unlock()
	cs.logger.Info("Packet capture stopped (StopAndParseCapture)")

	cs.packetCapture.ProcessAllData()
	afterProcess := cs.packetCapture.GetStats()
	cs.logger.Info("ProcessAllData finished",
		"total_packets", afterProcess.TotalPackets,
		"tcp_packets", afterProcess.TCPPackets,
		"payload_packets", afterProcess.PayloadPackets,
		"ack_groups", afterProcess.AckGroups,
		"unique_payloads", afterProcess.UniquePayloads,
		"raw_segments", afterProcess.RawSegments,
		"final_buffers", afterProcess.FinalBuffers,
	)

	// Nothing is parsed unless at least one of the two target ports saw data.
	port5222 := cs.packetCapture.GetPortCount(5222)
	port3333 := cs.packetCapture.GetPortCount(3333)
	if port5222 == 0 && port3333 == 0 {
		cs.logger.Warn("No target port data after processing",
			"port_5222", port5222,
			"port_3333", port3333,
		)
		return nil, fmt.Errorf("no captured data")
	}
	if afterProcess.FinalBuffers == 0 {
		cs.logger.Warn("No captured data after processing", "hint", "check interface and bpf filter")
		return nil, fmt.Errorf("no captured data")
	}

	rawData := cs.packetCapture.GetCapturedData()
	cs.logger.Info("Captured raw data snapshot", "hex_chunks", len(rawData))
	if len(rawData) == 0 {
		cs.logger.Warn("No captured data after stop", "hint", "check interface and bpf filter")
		return nil, fmt.Errorf("no captured data")
	}

	parseStart := time.Now()
	cs.logger.Info("ParseHexData starting",
		"port_5222", port5222,
		"port_3333", port3333,
		"final_buffers", afterProcess.FinalBuffers,
		"hex_chunks", len(rawData),
	)

	// Copy the hook out under the read lock and call it unlocked, so the hook
	// itself may call back into the service.
	cs.mutex.RLock()
	beforeRemote := cs.beforeRemote
	cs.mutex.RUnlock()
	if beforeRemote != nil {
		beforeRemote()
	}

	result, _, err := cs.parser.ParseHexData(rawData)
	cs.logger.Info("ParseHexData finished", "duration_ms", time.Since(parseStart).Milliseconds())
	if err != nil {
		cs.logger.Error("ParseHexData failed", "error", err, "hex_chunks", len(rawData))
		// %w keeps the message text unchanged while letting callers inspect
		// the cause with errors.Is / errors.As.
		return nil, fmt.Errorf("解析数据失败: %w", err)
	}

	cs.logger.Info("ParseHexData succeeded", "items", len(result.Items), "heroes", len(result.Heroes))
	return result, nil
}