-
Notifications
You must be signed in to change notification settings - Fork 0
/
elkeid_test.go
84 lines (80 loc) · 1.94 KB
/
elkeid_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package natsmq
import (
"errors"
"fmt"
"log"
"testing"
"time"
agentproto "github.com/bytedance/Elkeid/agent/proto"
"github.com/bytedance/Elkeid/server/agent_center/common/ylog"
pb "github.com/bytedance/Elkeid/server/agent_center/grpctrans/proto"
"github.com/gogo/protobuf/proto"
"github.com/nats-io/nats.go"
)
func Testreceive(t *testing.T) {
logLevel := 1
logPath := "/tmp/a.log"
logger := ylog.NewYLog(
ylog.WithLogFile(logPath),
ylog.WithMaxAge(3),
ylog.WithMaxSize(10),
ylog.WithMaxBackups(3),
ylog.WithLevel(logLevel),
)
ylog.InitLogger(logger)
InitNats()
sub, err := Js.PullSubscribe(SubjectRawData, DurableName)
if err != nil {
log.Fatal(err)
}
//for {
msgs, err := sub.Fetch(10, nats.MaxWait(30*time.Second))
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
}
ylog.Infof("nats", "fetch error: %v\n", err.Error())
}
for _, msg := range msgs {
fmt.Println("new msg >>>")
m := MQMsgPool.Get().(*pb.MQData)
err := proto.Unmarshal(msg.Data, m)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Msg type: %+v\n", m.DataType)
t := PayloadPool.Get().(*agentproto.Payload)
switch m.DataType {
case 1000:
fmt.Printf("Agent Stat\n")
case 1001:
fmt.Printf("Plgin Stat\n")
case 1010:
fmt.Printf("Plugin loger\n")
case 5001:
fmt.Printf("Plugin : Collector ===> Socket\n")
case 5002:
fmt.Printf("Plugin : Collector ===> User\n")
case 5003:
fmt.Printf("Plugin : Collector ===> Cron\n")
case 5004:
fmt.Printf("Plugin : Collector ===> Deb\n")
case 5005:
fmt.Printf("Plugin : Collector ===> rpm\n")
case 5006:
fmt.Printf("Plugin : Collector ===> pypi\n")
case 5010:
fmt.Printf("Plugin Collector ===> Systemd Unit\n")
case 5011:
fmt.Printf("Plugin Collector ===> Jar\n")
default:
fmt.Printf("Msg: %+v\n", m)
}
err = proto.Unmarshal(m.Body, t)
fmt.Printf("Payload : %v\n", t.Fields)
fmt.Printf("end ----\n")
msg.Ack()
PayloadPool.Put(t)
MQMsgPool.Put(m)
}
//}
}