init
This commit is contained in:
96
internal/capture/processor.go
Normal file
96
internal/capture/processor.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package capture
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"equipment-analyzer/internal/model"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type TCPProcessor struct {
|
||||
mutex sync.RWMutex
|
||||
ackData map[uint32][]*model.TCPData
|
||||
finalBuffer []string
|
||||
loads map[string]bool // 去重用
|
||||
}
|
||||
|
||||
func NewTCPProcessor() *TCPProcessor {
|
||||
return &TCPProcessor{
|
||||
ackData: make(map[uint32][]*model.TCPData),
|
||||
loads: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *TCPProcessor) ProcessPacket(tcpData *model.TCPData) {
|
||||
tp.mutex.Lock()
|
||||
defer tp.mutex.Unlock()
|
||||
|
||||
// 生成数据哈希用于去重
|
||||
hash := tp.generateHash(tcpData.Payload)
|
||||
if tp.loads[hash] {
|
||||
return // 跳过重复数据
|
||||
}
|
||||
tp.loads[hash] = true
|
||||
|
||||
// 按ACK号分组存储
|
||||
ack := uint32(tcpData.Ack)
|
||||
if tp.ackData[ack] == nil {
|
||||
tp.ackData[ack] = make([]*model.TCPData, 0)
|
||||
}
|
||||
tp.ackData[ack] = append(tp.ackData[ack], tcpData)
|
||||
}
|
||||
|
||||
func (tp *TCPProcessor) generateHash(data []byte) string {
|
||||
hash := md5.Sum(data)
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
func (tp *TCPProcessor) ProcessAllData() {
|
||||
tp.mutex.Lock()
|
||||
defer tp.mutex.Unlock()
|
||||
|
||||
tp.finalBuffer = make([]string, 0)
|
||||
|
||||
for ack, dataList := range tp.ackData {
|
||||
tp.tryBuffer(ack, dataList)
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *TCPProcessor) tryBuffer(ack uint32, dataList []*model.TCPData) {
|
||||
// 按序列号排序
|
||||
sort.Slice(dataList, func(i, j int) bool {
|
||||
return dataList[i].Seq < dataList[j].Seq
|
||||
})
|
||||
|
||||
// 合并所有分片数据
|
||||
var buffer bytes.Buffer
|
||||
for _, data := range dataList {
|
||||
buffer.Write(data.Payload)
|
||||
}
|
||||
|
||||
// 转换为十六进制字符串
|
||||
hexStr := hex.EncodeToString(buffer.Bytes())
|
||||
tp.finalBuffer = append(tp.finalBuffer, hexStr)
|
||||
}
|
||||
|
||||
func (tp *TCPProcessor) GetFinalBuffer() []string {
|
||||
tp.mutex.RLock()
|
||||
defer tp.mutex.RUnlock()
|
||||
|
||||
result := make([]string, len(tp.finalBuffer))
|
||||
copy(result, tp.finalBuffer)
|
||||
//// 输出每条finalBuffer的16进制字符串到控制台
|
||||
//fmt.Println("抓取16进制字符串")
|
||||
//fmt.Println(result)
|
||||
return result
|
||||
}
|
||||
func (tp *TCPProcessor) Clear() {
|
||||
tp.mutex.Lock()
|
||||
defer tp.mutex.Unlock()
|
||||
|
||||
tp.ackData = make(map[uint32][]*model.TCPData)
|
||||
tp.finalBuffer = make([]string, 0)
|
||||
tp.loads = make(map[string]bool)
|
||||
}
|
||||
Reference in New Issue
Block a user