feat(database): add CRUD operations for parsed sessions and update session name functionality
This commit is contained in:
@@ -3,6 +3,7 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -17,54 +18,110 @@ type CaptureService struct {
|
||||
logger *utils.Logger
|
||||
packetCapture *capture.PacketCapture
|
||||
processor *capture.TCPProcessor
|
||||
parser *ParserService
|
||||
beforeRemote func()
|
||||
mutex sync.RWMutex
|
||||
isCapturing bool
|
||||
isStarting bool
|
||||
pendingStop bool
|
||||
dataChan chan *model.CaptureResult
|
||||
errorChan chan error
|
||||
}
|
||||
|
||||
func NewCaptureService(cfg *config.Config, logger *utils.Logger) *CaptureService {
|
||||
func NewCaptureService(cfg *config.Config, logger *utils.Logger, parser *ParserService) *CaptureService {
|
||||
return &CaptureService{
|
||||
config: cfg,
|
||||
logger: logger,
|
||||
packetCapture: capture.NewPacketCapture(),
|
||||
processor: capture.NewTCPProcessor(),
|
||||
parser: parser,
|
||||
dataChan: make(chan *model.CaptureResult, 100),
|
||||
errorChan: make(chan error, 100),
|
||||
}
|
||||
}
|
||||
|
||||
// StartCapture 开始抓包
|
||||
func (cs *CaptureService) StartCapture(ctx context.Context, config capture.Config) error {
|
||||
func (cs *CaptureService) SetBeforeRemote(fn func()) {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
cs.beforeRemote = fn
|
||||
}
|
||||
|
||||
func (cs *CaptureService) StartCaptureAsync(ctx context.Context, config capture.Config, onStarted func(), onError func(error)) error {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
|
||||
if cs.isCapturing {
|
||||
if cs.isCapturing || cs.isStarting {
|
||||
return fmt.Errorf("capture already running")
|
||||
}
|
||||
|
||||
if err := cs.packetCapture.Start(config); err != nil {
|
||||
return fmt.Errorf("failed to start capture: %w", err)
|
||||
}
|
||||
cs.isStarting = true
|
||||
cs.pendingStop = false
|
||||
|
||||
cs.isCapturing = true
|
||||
cs.logger.Info("Packet capture started", "interface", config.InterfaceName)
|
||||
cs.logger.Info("StartCapture requested",
|
||||
"interface", config.InterfaceName,
|
||||
"filter", config.Filter,
|
||||
"timeout_ms", config.Timeout.Milliseconds(),
|
||||
"buffer_size", config.BufferSize,
|
||||
)
|
||||
|
||||
go func() {
|
||||
if err := cs.packetCapture.Start(config); err != nil {
|
||||
cs.logger.Error("Packet capture start failed", "error", err)
|
||||
cs.mutex.Lock()
|
||||
cs.isStarting = false
|
||||
cs.mutex.Unlock()
|
||||
if onError != nil {
|
||||
onError(fmt.Errorf("failed to start capture: %w", err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var shouldStop bool
|
||||
cs.mutex.Lock()
|
||||
cs.isStarting = false
|
||||
cs.isCapturing = true
|
||||
shouldStop = cs.pendingStop
|
||||
cs.pendingStop = false
|
||||
cs.mutex.Unlock()
|
||||
|
||||
cs.logger.Info("Packet capture started", "interface", config.InterfaceName)
|
||||
if onStarted != nil {
|
||||
onStarted()
|
||||
}
|
||||
if shouldStop {
|
||||
_, _ = cs.StopAndParseCapture()
|
||||
}
|
||||
}()
|
||||
|
||||
// 启动数据处理协程
|
||||
go cs.processData(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopCapture 停止抓包
|
||||
func (cs *CaptureService) StopCapture() error {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
|
||||
if cs.isStarting {
|
||||
cs.pendingStop = true
|
||||
cs.logger.Info("StopCapture queued while starting")
|
||||
return nil
|
||||
}
|
||||
|
||||
if !cs.isCapturing {
|
||||
return fmt.Errorf("capture not running")
|
||||
}
|
||||
|
||||
beforeStats := cs.packetCapture.GetStats()
|
||||
cs.logger.Info("StopCapture requested",
|
||||
"total_packets", beforeStats.TotalPackets,
|
||||
"tcp_packets", beforeStats.TCPPackets,
|
||||
"payload_packets", beforeStats.PayloadPackets,
|
||||
"ack_groups", beforeStats.AckGroups,
|
||||
"unique_payloads", beforeStats.UniquePayloads,
|
||||
"raw_segments", beforeStats.RawSegments,
|
||||
"final_buffers", beforeStats.FinalBuffers,
|
||||
)
|
||||
|
||||
cs.packetCapture.Stop()
|
||||
cs.isCapturing = false
|
||||
cs.logger.Info("Packet capture stopped")
|
||||
@@ -72,62 +129,139 @@ func (cs *CaptureService) StopCapture() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetCapturedData 获取抓包数据
|
||||
func (cs *CaptureService) GetCapturedData() []string {
|
||||
return cs.packetCapture.GetCapturedData()
|
||||
}
|
||||
|
||||
// ProcessAllData 处理所有数据
|
||||
func (cs *CaptureService) ProcessAllData() {
|
||||
cs.packetCapture.ProcessAllData()
|
||||
}
|
||||
|
||||
// IsCapturing 检查是否正在抓包
|
||||
func (cs *CaptureService) IsCapturing() bool {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
return cs.isCapturing
|
||||
}
|
||||
|
||||
// processData 处理抓包数据
|
||||
func (cs *CaptureService) IsStarting() bool {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
return cs.isStarting
|
||||
}
|
||||
|
||||
func (cs *CaptureService) processData(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
// 这里可以添加实时数据处理逻辑
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
case <-ticker.C:
|
||||
if !cs.IsCapturing() {
|
||||
continue
|
||||
}
|
||||
stats := cs.packetCapture.GetStats()
|
||||
cs.logger.Info("Capture heartbeat",
|
||||
"total_packets", stats.TotalPackets,
|
||||
"tcp_packets", stats.TCPPackets,
|
||||
"payload_packets", stats.PayloadPackets,
|
||||
"ack_groups", stats.AckGroups,
|
||||
"unique_payloads", stats.UniquePayloads,
|
||||
"raw_segments", stats.RawSegments,
|
||||
"final_buffers", stats.FinalBuffers,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StopAndParseCapture 停止抓包并解析数据
|
||||
func (cs *CaptureService) StopAndParseCapture(parser *ParserService) (*model.ParsedResult, error) {
|
||||
func (cs *CaptureService) StopAndParseCapture() (*model.ParsedResult, error) {
|
||||
log.Printf("[service] StopAndParseCapture enter")
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
|
||||
if cs.isStarting {
|
||||
cs.pendingStop = true
|
||||
cs.mutex.Unlock()
|
||||
log.Printf("[service] StopAndParseCapture queued while starting")
|
||||
return nil, fmt.Errorf("capture starting")
|
||||
}
|
||||
if !cs.isCapturing {
|
||||
cs.mutex.Unlock()
|
||||
log.Printf("[service] StopAndParseCapture exit: not running")
|
||||
return nil, fmt.Errorf("capture not running")
|
||||
}
|
||||
cs.mutex.Unlock()
|
||||
|
||||
beforeStop := cs.packetCapture.GetStats()
|
||||
cs.logger.Info("StopAndParseCapture requested",
|
||||
"total_packets", beforeStop.TotalPackets,
|
||||
"tcp_packets", beforeStop.TCPPackets,
|
||||
"payload_packets", beforeStop.PayloadPackets,
|
||||
"ack_groups", beforeStop.AckGroups,
|
||||
"unique_payloads", beforeStop.UniquePayloads,
|
||||
"raw_segments", beforeStop.RawSegments,
|
||||
"final_buffers", beforeStop.FinalBuffers,
|
||||
)
|
||||
|
||||
log.Printf("[service] StopAndParseCapture stopping packet capture")
|
||||
cs.packetCapture.Stop()
|
||||
log.Printf("[service] StopAndParseCapture packet capture stopped")
|
||||
cs.isCapturing = false
|
||||
cs.logger.Info("Packet capture stopped (StopAndParseCapture)")
|
||||
|
||||
// 处理所有收集的数据
|
||||
cs.packetCapture.ProcessAllData()
|
||||
afterProcess := cs.packetCapture.GetStats()
|
||||
cs.logger.Info("ProcessAllData finished",
|
||||
"total_packets", afterProcess.TotalPackets,
|
||||
"tcp_packets", afterProcess.TCPPackets,
|
||||
"payload_packets", afterProcess.PayloadPackets,
|
||||
"ack_groups", afterProcess.AckGroups,
|
||||
"unique_payloads", afterProcess.UniquePayloads,
|
||||
"raw_segments", afterProcess.RawSegments,
|
||||
"final_buffers", afterProcess.FinalBuffers,
|
||||
)
|
||||
|
||||
// 获取抓包数据
|
||||
rawData := cs.packetCapture.GetCapturedData()
|
||||
if len(rawData) == 0 {
|
||||
port5222 := cs.packetCapture.GetPortCount(5222)
|
||||
port3333 := cs.packetCapture.GetPortCount(3333)
|
||||
if port5222 == 0 && port3333 == 0 {
|
||||
cs.logger.Warn("No target port data after processing",
|
||||
"port_5222", port5222,
|
||||
"port_3333", port3333,
|
||||
)
|
||||
return nil, fmt.Errorf("no captured data")
|
||||
}
|
||||
|
||||
// 解析数据
|
||||
result, _, err := parser.ParseHexData(rawData)
|
||||
if afterProcess.FinalBuffers == 0 {
|
||||
cs.logger.Warn("No captured data after processing", "hint", "check interface and bpf filter")
|
||||
return nil, fmt.Errorf("no captured data")
|
||||
}
|
||||
|
||||
rawData := cs.packetCapture.GetCapturedData()
|
||||
cs.logger.Info("Captured raw data snapshot", "hex_chunks", len(rawData))
|
||||
if len(rawData) == 0 {
|
||||
cs.logger.Warn("No captured data after stop", "hint", "check interface and bpf filter")
|
||||
return nil, fmt.Errorf("no captured data")
|
||||
}
|
||||
|
||||
parseStart := time.Now()
|
||||
cs.logger.Info("ParseHexData starting",
|
||||
"port_5222", port5222,
|
||||
"port_3333", port3333,
|
||||
"final_buffers", afterProcess.FinalBuffers,
|
||||
"hex_chunks", len(rawData),
|
||||
)
|
||||
cs.mutex.RLock()
|
||||
beforeRemote := cs.beforeRemote
|
||||
cs.mutex.RUnlock()
|
||||
if beforeRemote != nil {
|
||||
beforeRemote()
|
||||
}
|
||||
result, _, err := cs.parser.ParseHexData(rawData)
|
||||
cs.logger.Info("ParseHexData finished", "duration_ms", time.Since(parseStart).Milliseconds())
|
||||
if err != nil {
|
||||
cs.logger.Error("ParseHexData failed", "error", err, "hex_chunks", len(rawData))
|
||||
return nil, fmt.Errorf("解析数据失败: %v", err)
|
||||
}
|
||||
|
||||
cs.logger.Info("ParseHexData succeeded", "items", len(result.Items), "heroes", len(result.Heroes))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user