Files
PacketBreeze/pkg/packetAnalyzer/packetAnalyzer.go
Unic-X 3f83936e6e Init
2023-06-18 14:03:27 +05:30

263 lines
16 KiB
Go

package packetAnalyzer
import (
"os"
"strings"
"time"
"github.com/deepfence/FlowMeter/pkg/common"
"github.com/deepfence/FlowMeter/pkg/constants"
"github.com/deepfence/FlowMeter/pkg/fileProcess"
"github.com/deepfence/FlowMeter/pkg/ml"
"github.com/google/gopacket"
"github.com/sirupsen/logrus"
)
// FlowMeter reads packets from ch, keeps running per-flow statistics keyed by
// the flow five-tuple, and periodically writes the flows that meet the minimum
// length and duration thresholds to
// constants.FlowOutputFolder/<fname>_flow_stats.
//
// It is meant to run as a goroutine. It returns when ch is closed, or after
// maxNumPackets packets have been read, in which case it first sends one value
// on done. localIP is only consulted when ifLocalIPKnown is true; it is then
// used to decide whether a packet travels in the forward or backward direction
// of its flow.
//
// The only error returned is a failure to create constants.FlowOutputFolder.
func FlowMeter(ch chan gopacket.Packet, done chan struct{}, maxNumPackets int, localIP string, ifLocalIPKnown bool, fname string) error {
	flowDict := make(map[string][]interface{})
	flowSave := make(map[string][]interface{})

	// Import model parameters (weight, scaling - mean, standard deviations).
	wt, intercept, meanScale, stdScale := ml.ModelParameters()

	if err := os.MkdirAll(constants.FlowOutputFolder, 0777); err != nil {
		logrus.Error(err)
		return err
	}

	numPackets := 0
	for packet := range ch {
		numPackets++

		if constants.Verbose {
			logrus.Info("Num packets: ", numPackets)
			logrus.Info(" ")
		}

		packet5Tuple, reverseTuple, packetSize, packetTime := PacketInfo(packet)

		// Packets whose reverse tuple is the literal string "nil" are not
		// tracked (presumably PacketInfo's marker for a packet without a
		// usable five-tuple; confirm against PacketInfo).
		if reverseTuple != "nil" {
			recordPacket(flowDict, packet5Tuple, reverseTuple, packetSize, packetTime, localIP, ifLocalIPKnown)
		}

		// Save only when the running packet count is one of the configured
		// save points and there is at least one tracked flow.
		if _, ifSave := common.IfPresentInSlice(constants.SaveIntervals, numPackets); ifSave && len(flowDict) > 0 {
			for flow5Tuple, values := range flowDict {
				if values[constants.MapKeys["flowLength"]].(int) < constants.MinPacketsPerFlow ||
					values[constants.MapKeys["flowDuration"]].(time.Duration) < constants.MinTimeDuration {
					continue
				}

				// Populate flowSave map with flows for which number of packets
				// and duration are beyond the configured thresholds.
				flowSave[flow5Tuple] = values

				// Create feature struct with float64 datatypes for features.
				// NOTE(review): the result is only needed in the verbose branch;
				// the call stays unconditional to keep the original call
				// pattern. Move it if FlowData is side-effect free.
				flow := common.FlowData(values)

				if constants.IfFlowStatsVerbose {
					// Feature array for machine learning (ML) analysis.
					features := []float64{
						flow.FlowDuration, flow.FlowLength, flow.FwdFlowLength, flow.BwdFlowLength,
						flow.PacketSizeTotal, flow.PacketSizeMean, flow.PacketSizeStd, flow.PacketSizeMin, flow.PacketSizeMax,
						flow.FwdPacketSizeTotal, flow.BwdPacketSizeTotal,
						flow.FwdPacketSizeMean, flow.BwdPacketSizeMean,
						flow.FwdPacketSizeStd, flow.BwdPacketSizeStd,
						flow.FwdPacketSizeMin, flow.BwdPacketSizeMin,
						flow.FwdPacketSizeMax, flow.BwdPacketSizeMax,
						flow.IATMean, flow.IATStd, flow.IATMin, flow.IATMax,
						flow.FwdIATTotal, flow.BwdIATTotal,
						flow.FwdIATMean, flow.BwdIATMean,
						flow.FwdIATStd, flow.BwdIATStd,
						flow.FwdIATMin, flow.BwdIATMin,
						flow.FwdIATMax, flow.BwdIATMax,
						flow.FlowLengthPerTime, flow.FwdFlowLengthPerTime, flow.BwdFlowLengthPerTime,
						flow.PacketSizeTotalPerTime, flow.FwdPacketSizeTotalPerTime, flow.BwdPacketSizeTotalPerTime,
					}

					// Scaling of array and ML prediction. The sigmoid output is
					// computed once and reused for both the category and the log.
					scaledFeature := ml.StdScaler(features, meanScale, stdScale)
					probability := ml.Sigmoid(ml.NetInput(wt, intercept, scaledFeature))
					yPred := ml.GetCategory(ml.BinaryClassifier(probability))

					logrus.Info(flow5Tuple, ": ", yPred, " ", probability)
					logrus.Info(" ")

					logFlowStats(values)
				}
			}

			fileProcess.FileSave(flowSave, constants.MapKeys, constants.FlowOutputFolder+"/"+fname+"_flow_stats")
		}

		if numPackets == maxNumPackets {
			logrus.Info("Target number packets reached.")
			done <- struct{}{}
			return nil
		}
	}

	return nil
}

// recordPacket folds one packet into flowDict: it creates the flow record if
// the packet starts a new flow, otherwise it updates the existing record.
func recordPacket(flowDict map[string][]interface{}, packet5Tuple, reverseTuple string, packetSize int, packetTime time.Time, localIP string, ifLocalIPKnown bool) {
	key, direction, known := flowKeyAndDirection(flowDict, packet5Tuple, reverseTuple, localIP, ifLocalIPKnown)

	if known {
		updateFlowRecord(flowDict[key], direction, packetSize, packetTime)
		return
	}

	record, ok := newFlowRecord(key, direction, packetSize, packetTime)
	if !ok {
		// Previously this case panicked with an index-out-of-range error.
		logrus.Warn("Skipping packet with malformed flow five-tuple: ", key)
		return
	}
	flowDict[key] = record
}

// flowKeyAndDirection picks the flowDict key for a packet and ascertains its
// directionality ("fwd" or "bwd"). known reports whether a flow record already
// exists under the returned key.
func flowKeyAndDirection(flowDict map[string][]interface{}, packet5Tuple, reverseTuple, localIP string, ifLocalIPKnown bool) (key, direction string, known bool) {
	if ifLocalIPKnown {
		// The first "--"-separated field of the tuple decides the direction.
		if strings.SplitN(packet5Tuple, "--", 2)[0] == localIP {
			_, known = flowDict[packet5Tuple]
			return packet5Tuple, "fwd", known
		}
		_, known = flowDict[reverseTuple]
		return reverseTuple, "bwd", known
	}

	// Without a known local IP, whichever orientation was seen first defines
	// the forward direction of the flow.
	_, hasTuple := flowDict[packet5Tuple]
	_, hasReverse := flowDict[reverseTuple]
	switch {
	case hasTuple && hasReverse:
		// Direction is left empty, so only the direction-independent
		// statistics of the flow are updated for this packet.
		logrus.Info("We shouldn't have keys for flow-five-tuple and reverse-five-tuple. Error in code.")
		return packet5Tuple, "", true
	case hasReverse:
		return reverseTuple, "bwd", true
	default:
		return packet5Tuple, "fwd", hasTuple
	}
}

// newFlowRecord builds the statistics record for the first packet of a flow.
// It reports false when key does not contain at least five "--"-separated
// fields. The positions in the record must line up with constants.MapKeys,
// which is defined outside this file.
func newFlowRecord(key, direction string, packetSize int, packetTime time.Time) ([]interface{}, bool) {
	fields := strings.Split(key, "--")
	if len(fields) < 5 {
		return nil, false
	}

	// NOTE(review): field 0 is named "dst" here while flowKeyAndDirection
	// compares field 0 with localIP; the naming and the stored order below are
	// kept exactly as before. Confirm against the tuple layout in PacketInfo.
	dstIPFlow, srcIPFlow, protocolFlow, dstPortFlow, srcPortFlow := fields[0], fields[1], fields[2], fields[3], fields[4]

	var noDuration time.Duration

	record := []interface{}{
		srcIPFlow, dstIPFlow, protocolFlow, srcPortFlow, dstPortFlow,
		noDuration,
		1, 0, 0,
		packetSize, float64(packetSize), 0.0, packetSize, packetSize,
		0, 0,
		0.0, 0.0, 0.0, 0.0,
		0, 0, 0, 0,
		noDuration, noDuration, noDuration, noDuration, noDuration,
		noDuration, noDuration, noDuration, noDuration, noDuration,
		noDuration, noDuration, noDuration, noDuration, noDuration,
		packetTime, packetTime, packetTime, packetTime,
		false,
		[]int{}, []int{},
		[]time.Duration{}, []time.Duration{}, []time.Duration{},
		[]int{packetSize},
	}

	// Anything that is not "fwd" is accounted as backward, as before. The
	// direction string doubles as the MapKeys prefix ("fwd"+"FlowLength" is
	// the key "fwdFlowLength").
	dir := "bwd"
	if direction == "fwd" {
		dir = "fwd"
	}
	keys := constants.MapKeys
	record[keys[dir+"PacketSizeArr"]] = append(record[keys[dir+"PacketSizeArr"]].([]int), packetSize)
	record[keys[dir+"PacketSizeTotal"]] = packetSize
	record[keys[dir+"PacketSizeMean"]] = float64(packetSize)
	record[keys[dir+"FlowLength"]] = 1

	return record, true
}

// updateFlowRecord folds one more packet into an existing flow record. Flows
// whose length already exceeds constants.MaxPacketsPerFlow are left untouched.
// flow is modified in place, so the change is visible through the map that
// holds it.
func updateFlowRecord(flow []interface{}, direction string, packetSize int, packetTime time.Time) {
	keys := constants.MapKeys

	if flow[keys["flowLength"]].(int) > constants.MaxPacketsPerFlow {
		return
	}

	currIAT := packetTime.Sub(flow[keys["flowPrevTime"]].(time.Time))
	flow[keys["IATArr"]] = append(flow[keys["IATArr"]].([]time.Duration), currIAT)

	// Only one of the two is non-zero; both stay zero when the direction
	// could not be determined.
	fwdPacketSize, bwdPacketSize := 0, 0
	switch direction {
	case "fwd":
		fwdPacketSize = packetSize
		updateDirectionalStats(flow, direction, packetSize, packetTime)
	case "bwd":
		bwdPacketSize = packetSize
		updateDirectionalStats(flow, direction, packetSize, packetTime)
	}

	flow[keys["flowDuration"]] = packetTime.Sub(flow[keys["flowStartTime"]].(time.Time))
	flow[keys["flowLength"]] = flow[keys["flowLength"]].(int) + 1

	// Flow-level inter-arrival statistics are derived from the forward and
	// backward inter-arrival times. The combined slice is a fresh allocation
	// so that appending never writes into the stored forward slice.
	fwdIATs := flow[keys["fwdIATArr"]].([]time.Duration)
	bwdIATs := flow[keys["bwdIATArr"]].([]time.Duration)
	allIATs := make([]time.Duration, 0, len(fwdIATs)+len(bwdIATs))
	allIATs = append(allIATs, fwdIATs...)
	allIATs = append(allIATs, bwdIATs...)

	flow[keys["IATTotal"]] = flow[keys["fwdIATTotal"]].(time.Duration) + flow[keys["bwdIATTotal"]].(time.Duration)
	iatMin, iatMax := common.MinMaxTimeDuration(allIATs)
	flow[keys["IATMin"]] = iatMin
	flow[keys["IATMax"]] = iatMax
	flow[keys["IATMean"]] = common.MeanTimeDuration(allIATs)
	if len(allIATs) > 1 {
		flow[keys["IATStd"]] = common.StdDevTimeDuration(allIATs)
	}
	flow[keys["flowPrevTime"]] = packetTime

	// Per-direction packet size statistics.
	fwdSizes := flow[keys["fwdPacketSizeArr"]].([]int)
	bwdSizes := flow[keys["bwdPacketSizeArr"]].([]int)
	fwdSizeMin, fwdSizeMax := common.MinMax(fwdSizes)
	bwdSizeMin, bwdSizeMax := common.MinMax(bwdSizes)
	flow[keys["fwdPacketSizeTotal"]] = flow[keys["fwdPacketSizeTotal"]].(int) + fwdPacketSize
	flow[keys["bwdPacketSizeTotal"]] = flow[keys["bwdPacketSizeTotal"]].(int) + bwdPacketSize
	flow[keys["fwdPacketSizeMean"]] = common.Mean(fwdSizes)
	flow[keys["bwdPacketSizeMean"]] = common.Mean(bwdSizes)
	flow[keys["fwdPacketSizeStd"]] = common.StdDev(fwdSizes)
	flow[keys["bwdPacketSizeStd"]] = common.StdDev(bwdSizes)
	flow[keys["fwdPacketSizeMin"]] = fwdSizeMin
	flow[keys["bwdPacketSizeMin"]] = bwdSizeMin
	flow[keys["fwdPacketSizeMax"]] = fwdSizeMax
	flow[keys["bwdPacketSizeMax"]] = bwdSizeMax

	// Whole-flow packet size statistics.
	flow[keys["packetSizeTotal"]] = flow[keys["fwdPacketSizeTotal"]].(int) + flow[keys["bwdPacketSizeTotal"]].(int)
	allSizes := flow[keys["packetSizeArr"]].([]int)
	sizeMin, sizeMax := common.MinMax(allSizes)
	flow[keys["packetSizeMin"]] = sizeMin
	flow[keys["packetSizeMax"]] = sizeMax
	flow[keys["packetSizeMean"]] = common.Mean(allSizes)
	flow[keys["packetSizeStd"]] = common.StdDev(allSizes)

	if flow[keys["flowLength"]].(int) >= constants.MinPacketsPerFlow {
		flow[keys["minPacketsBool"]] = true
	}
}

// updateDirectionalStats updates the statistics of one direction of a flow.
// dir must be "fwd" or "bwd"; it is used as the prefix of the MapKeys names
// (for example dir+"IATArr" is "fwdIATArr" or "bwdIATArr").
func updateDirectionalStats(flow []interface{}, dir string, packetSize int, packetTime time.Time) {
	keys := constants.MapKeys

	flow[keys[dir+"PacketSizeArr"]] = append(flow[keys[dir+"PacketSizeArr"]].([]int), packetSize)
	flow[keys["packetSizeArr"]] = append(flow[keys["packetSizeArr"]].([]int), packetSize)

	dirLength := flow[keys[dir+"FlowLength"]].(int) + 1
	flow[keys[dir+"FlowLength"]] = dirLength

	// The first packet in this direction has no predecessor, so it only
	// records its timestamp and contributes no inter-arrival time.
	if dirLength == 1 {
		flow[keys[dir+"FlowPrevTime"]] = packetTime
		return
	}

	currIAT := packetTime.Sub(flow[keys[dir+"FlowPrevTime"]].(time.Time))
	flow[keys[dir+"IATTotal"]] = flow[keys[dir+"IATTotal"]].(time.Duration) + currIAT

	iats := append(flow[keys[dir+"IATArr"]].([]time.Duration), currIAT)
	flow[keys[dir+"IATArr"]] = iats

	iatMin, iatMax := common.MinMaxTimeDuration(iats)
	flow[keys[dir+"IATMean"]] = common.MeanTimeDuration(iats)
	flow[keys[dir+"IATStd"]] = common.StdDevTimeDuration(iats)
	flow[keys[dir+"IATMin"]] = iatMin
	flow[keys[dir+"IATMax"]] = iatMax
	flow[keys[dir+"FlowPrevTime"]] = packetTime
}

// logFlowStats prints the labelled statistics of one flow followed by its
// per-millisecond rates.
func logFlowStats(values []interface{}) {
	// Number of leading record entries that are logged with a label from
	// constants.MapLabels.
	const numLoggedStats = 41

	keys := constants.MapKeys

	// Print flow statistics.
	for j := 0; j < numLoggedStats; j++ {
		logrus.Info(constants.MapLabels[j], ": ", values[j])
	}

	// The duration is truncated to whole milliseconds before the division, so
	// flows shorter than 1ms divide by zero in floating point.
	durationMs := float64(values[keys["flowDuration"]].(time.Duration) / time.Millisecond)

	logrus.Info("Flow Length Per Time(ms) : ", float64(values[keys["flowLength"]].(int))/durationMs)
	logrus.Info("Forward Flow Length Per Time(ms) : ", float64(values[keys["fwdFlowLength"]].(int))/durationMs)
	logrus.Info("Backward Flow Length Per Time(ms) : ", float64(values[keys["bwdFlowLength"]].(int))/durationMs)
	logrus.Info("Packet Size Per Time(ms) : ", float64(values[keys["packetSizeTotal"]].(int))/durationMs)
	logrus.Info("Forward Packet Size Per Time(ms) : ", float64(values[keys["fwdPacketSizeTotal"]].(int))/durationMs)
	logrus.Info("Backward Packet Size Per Time(ms) : ", float64(values[keys["bwdPacketSizeTotal"]].(int))/durationMs)
	logrus.Info(" ")
	logrus.Info(" ")
}