268 lines
7.1 KiB
Go
268 lines
7.1 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"equipment-analyzer/internal/capture"
|
|
"equipment-analyzer/internal/config"
|
|
"equipment-analyzer/internal/model"
|
|
"equipment-analyzer/internal/utils"
|
|
)
|
|
|
|
type CaptureService struct {
|
|
config *config.Config
|
|
logger *utils.Logger
|
|
packetCapture *capture.PacketCapture
|
|
processor *capture.TCPProcessor
|
|
parser *ParserService
|
|
beforeRemote func()
|
|
mutex sync.RWMutex
|
|
isCapturing bool
|
|
isStarting bool
|
|
pendingStop bool
|
|
dataChan chan *model.CaptureResult
|
|
errorChan chan error
|
|
}
|
|
|
|
func NewCaptureService(cfg *config.Config, logger *utils.Logger, parser *ParserService) *CaptureService {
|
|
return &CaptureService{
|
|
config: cfg,
|
|
logger: logger,
|
|
packetCapture: capture.NewPacketCapture(),
|
|
processor: capture.NewTCPProcessor(),
|
|
parser: parser,
|
|
dataChan: make(chan *model.CaptureResult, 100),
|
|
errorChan: make(chan error, 100),
|
|
}
|
|
}
|
|
|
|
func (cs *CaptureService) SetBeforeRemote(fn func()) {
|
|
cs.mutex.Lock()
|
|
defer cs.mutex.Unlock()
|
|
cs.beforeRemote = fn
|
|
}
|
|
|
|
func (cs *CaptureService) StartCaptureAsync(ctx context.Context, config capture.Config, onStarted func(), onError func(error)) error {
|
|
cs.mutex.Lock()
|
|
defer cs.mutex.Unlock()
|
|
|
|
if cs.isCapturing || cs.isStarting {
|
|
return fmt.Errorf("capture already running")
|
|
}
|
|
|
|
cs.isStarting = true
|
|
cs.pendingStop = false
|
|
|
|
cs.logger.Info("StartCapture requested",
|
|
"interface", config.InterfaceName,
|
|
"filter", config.Filter,
|
|
"timeout_ms", config.Timeout.Milliseconds(),
|
|
"buffer_size", config.BufferSize,
|
|
)
|
|
|
|
go func() {
|
|
if err := cs.packetCapture.Start(config); err != nil {
|
|
cs.logger.Error("Packet capture start failed", "error", err)
|
|
cs.mutex.Lock()
|
|
cs.isStarting = false
|
|
cs.mutex.Unlock()
|
|
if onError != nil {
|
|
onError(fmt.Errorf("failed to start capture: %w", err))
|
|
}
|
|
return
|
|
}
|
|
|
|
var shouldStop bool
|
|
cs.mutex.Lock()
|
|
cs.isStarting = false
|
|
cs.isCapturing = true
|
|
shouldStop = cs.pendingStop
|
|
cs.pendingStop = false
|
|
cs.mutex.Unlock()
|
|
|
|
cs.logger.Info("Packet capture started", "interface", config.InterfaceName)
|
|
if onStarted != nil {
|
|
onStarted()
|
|
}
|
|
if shouldStop {
|
|
_, _ = cs.StopAndParseCapture()
|
|
}
|
|
}()
|
|
|
|
go cs.processData(ctx)
|
|
return nil
|
|
}
|
|
|
|
func (cs *CaptureService) StopCapture() error {
|
|
cs.mutex.Lock()
|
|
defer cs.mutex.Unlock()
|
|
|
|
if cs.isStarting {
|
|
cs.pendingStop = true
|
|
cs.logger.Info("StopCapture queued while starting")
|
|
return nil
|
|
}
|
|
|
|
if !cs.isCapturing {
|
|
return fmt.Errorf("capture not running")
|
|
}
|
|
|
|
beforeStats := cs.packetCapture.GetStats()
|
|
cs.logger.Info("StopCapture requested",
|
|
"total_packets", beforeStats.TotalPackets,
|
|
"tcp_packets", beforeStats.TCPPackets,
|
|
"payload_packets", beforeStats.PayloadPackets,
|
|
"ack_groups", beforeStats.AckGroups,
|
|
"unique_payloads", beforeStats.UniquePayloads,
|
|
"raw_segments", beforeStats.RawSegments,
|
|
"final_buffers", beforeStats.FinalBuffers,
|
|
)
|
|
|
|
cs.packetCapture.Stop()
|
|
cs.isCapturing = false
|
|
cs.logger.Info("Packet capture stopped")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cs *CaptureService) GetCapturedData() []string {
|
|
return cs.packetCapture.GetCapturedData()
|
|
}
|
|
|
|
func (cs *CaptureService) ProcessAllData() {
|
|
cs.packetCapture.ProcessAllData()
|
|
}
|
|
|
|
func (cs *CaptureService) IsCapturing() bool {
|
|
cs.mutex.RLock()
|
|
defer cs.mutex.RUnlock()
|
|
return cs.isCapturing
|
|
}
|
|
|
|
func (cs *CaptureService) IsStarting() bool {
|
|
cs.mutex.RLock()
|
|
defer cs.mutex.RUnlock()
|
|
return cs.isStarting
|
|
}
|
|
|
|
func (cs *CaptureService) processData(ctx context.Context) {
|
|
ticker := time.NewTicker(3 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if !cs.IsCapturing() {
|
|
continue
|
|
}
|
|
stats := cs.packetCapture.GetStats()
|
|
cs.logger.Info("Capture heartbeat",
|
|
"total_packets", stats.TotalPackets,
|
|
"tcp_packets", stats.TCPPackets,
|
|
"payload_packets", stats.PayloadPackets,
|
|
"ack_groups", stats.AckGroups,
|
|
"unique_payloads", stats.UniquePayloads,
|
|
"raw_segments", stats.RawSegments,
|
|
"final_buffers", stats.FinalBuffers,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cs *CaptureService) StopAndParseCapture() (*model.ParsedResult, error) {
|
|
log.Printf("[service] StopAndParseCapture enter")
|
|
cs.mutex.Lock()
|
|
if cs.isStarting {
|
|
cs.pendingStop = true
|
|
cs.mutex.Unlock()
|
|
log.Printf("[service] StopAndParseCapture queued while starting")
|
|
return nil, fmt.Errorf("capture starting")
|
|
}
|
|
if !cs.isCapturing {
|
|
cs.mutex.Unlock()
|
|
log.Printf("[service] StopAndParseCapture exit: not running")
|
|
return nil, fmt.Errorf("capture not running")
|
|
}
|
|
cs.mutex.Unlock()
|
|
|
|
beforeStop := cs.packetCapture.GetStats()
|
|
cs.logger.Info("StopAndParseCapture requested",
|
|
"total_packets", beforeStop.TotalPackets,
|
|
"tcp_packets", beforeStop.TCPPackets,
|
|
"payload_packets", beforeStop.PayloadPackets,
|
|
"ack_groups", beforeStop.AckGroups,
|
|
"unique_payloads", beforeStop.UniquePayloads,
|
|
"raw_segments", beforeStop.RawSegments,
|
|
"final_buffers", beforeStop.FinalBuffers,
|
|
)
|
|
|
|
log.Printf("[service] StopAndParseCapture stopping packet capture")
|
|
cs.packetCapture.Stop()
|
|
log.Printf("[service] StopAndParseCapture packet capture stopped")
|
|
cs.isCapturing = false
|
|
cs.logger.Info("Packet capture stopped (StopAndParseCapture)")
|
|
|
|
cs.packetCapture.ProcessAllData()
|
|
afterProcess := cs.packetCapture.GetStats()
|
|
cs.logger.Info("ProcessAllData finished",
|
|
"total_packets", afterProcess.TotalPackets,
|
|
"tcp_packets", afterProcess.TCPPackets,
|
|
"payload_packets", afterProcess.PayloadPackets,
|
|
"ack_groups", afterProcess.AckGroups,
|
|
"unique_payloads", afterProcess.UniquePayloads,
|
|
"raw_segments", afterProcess.RawSegments,
|
|
"final_buffers", afterProcess.FinalBuffers,
|
|
)
|
|
|
|
port5222 := cs.packetCapture.GetPortCount(5222)
|
|
port3333 := cs.packetCapture.GetPortCount(3333)
|
|
if port5222 == 0 && port3333 == 0 {
|
|
cs.logger.Warn("No target port data after processing",
|
|
"port_5222", port5222,
|
|
"port_3333", port3333,
|
|
)
|
|
return nil, fmt.Errorf("no captured data")
|
|
}
|
|
|
|
if afterProcess.FinalBuffers == 0 {
|
|
cs.logger.Warn("No captured data after processing", "hint", "check interface and bpf filter")
|
|
return nil, fmt.Errorf("no captured data")
|
|
}
|
|
|
|
rawData := cs.packetCapture.GetCapturedData()
|
|
cs.logger.Info("Captured raw data snapshot", "hex_chunks", len(rawData))
|
|
if len(rawData) == 0 {
|
|
cs.logger.Warn("No captured data after stop", "hint", "check interface and bpf filter")
|
|
return nil, fmt.Errorf("no captured data")
|
|
}
|
|
|
|
parseStart := time.Now()
|
|
cs.logger.Info("ParseHexData starting",
|
|
"port_5222", port5222,
|
|
"port_3333", port3333,
|
|
"final_buffers", afterProcess.FinalBuffers,
|
|
"hex_chunks", len(rawData),
|
|
)
|
|
cs.mutex.RLock()
|
|
beforeRemote := cs.beforeRemote
|
|
cs.mutex.RUnlock()
|
|
if beforeRemote != nil {
|
|
beforeRemote()
|
|
}
|
|
result, _, err := cs.parser.ParseHexData(rawData)
|
|
cs.logger.Info("ParseHexData finished", "duration_ms", time.Since(parseStart).Milliseconds())
|
|
if err != nil {
|
|
cs.logger.Error("ParseHexData failed", "error", err, "hex_chunks", len(rawData))
|
|
return nil, fmt.Errorf("解析数据失败: %v", err)
|
|
}
|
|
|
|
cs.logger.Info("ParseHexData succeeded", "items", len(result.Items), "heroes", len(result.Heroes))
|
|
return result, nil
|
|
}
|