-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
consumer_group_members_test.go
120 lines (107 loc) · 3.42 KB
/
consumer_group_members_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package sarama
import (
"bytes"
"reflect"
"testing"
)
var (
// groupMemberMetadataV0 is the byte sequence that
// TestConsumerGroupMemberMetadata expects when encoding version 0 member
// metadata with topics "one" and "two" and user data {1, 2, 3}.
groupMemberMetadataV0 = []byte{
0, 0, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
// groupMemberAssignmentV0 is the byte sequence that
// TestConsumerGroupMemberAssignment expects when encoding a version 0
// assignment of partitions 0, 2 and 4 of topic "one" with user data
// {1, 2, 3}.
groupMemberAssignmentV0 = []byte{
0, 0, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
// groupMemberMetadataV1Bad is marked as version 1 but stops after the user
// data, i.e. it lacks the trailing OwnedPartitions bytes present in
// groupMemberMetadataV1. It looks like the old third-party
// bsm/sarama-cluster client incorrectly set V1 in the member metadata of
// its JoinGroup requests, so decoding has to cope with a payload that is
// too short.
groupMemberMetadataV1Bad = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
// groupMemberMetadataV1 is a complete version 1 payload: the same fields
// as groupMemberMetadataV1Bad followed by the OwnedPartitions bytes.
groupMemberMetadataV1 = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}
// groupMemberMetadataV3NilOwned is a version 3 payload with a single topic
// whose OwnedPartitions bytes are all zero, followed by a generation ID
// and a rack ID.
groupMemberMetadataV3NilOwned = []byte{
0, 3, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
0, 0, 0, 64, // GenerationID
0, 4, 'r', 'a', 'c', 'k', // RackID
}
)
// TestConsumerGroupMemberMetadata round-trips version 0 member metadata:
// the encoded bytes must equal groupMemberMetadataV0, and decoding those
// bytes must yield a struct deeply equal to the one that was encoded.
func TestConsumerGroupMemberMetadata(t *testing.T) {
	meta := &ConsumerGroupMemberMetadata{
		Version:  0,
		Topics:   []string{"one", "two"},
		UserData: []byte{0x01, 0x02, 0x03},
	}

	buf, err := encode(meta, nil)
	if err != nil {
		// There is nothing meaningful to decode without encoded bytes, so
		// stop instead of reporting a misleading secondary failure below.
		t.Fatal("Failed to encode data", err)
	}
	if !bytes.Equal(groupMemberMetadataV0, buf) {
		// Keep going: buf is still a valid input for the decode half.
		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadataV0, buf)
	}

	meta2 := new(ConsumerGroupMemberMetadata)
	if err := decode(buf, meta2, nil); err != nil {
		t.Fatal("Failed to decode data", err)
	}
	if !reflect.DeepEqual(meta, meta2) {
		// This compares the decoded struct, so the message says "Decoded".
		t.Errorf("Decoded data does not match expectation\nexpected: %+v\nactual: %+v", meta, meta2)
	}
}
func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta, nil); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta, nil); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}
// TestConsumerGroupMemberMetadataV3Decode checks that the version 3 fixture
// groupMemberMetadataV3NilOwned decodes without returning an error.
func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) {
	var decoded ConsumerGroupMemberMetadata

	err := decode(groupMemberMetadataV3NilOwned, &decoded, nil)
	if err == nil {
		return
	}
	t.Error("Failed to decode V3 data", err)
}
// TestConsumerGroupMemberAssignment round-trips a version 0 member
// assignment: the encoded bytes must equal groupMemberAssignmentV0, and
// decoding those bytes must yield a struct deeply equal to the one that was
// encoded.
func TestConsumerGroupMemberAssignment(t *testing.T) {
	amt := &ConsumerGroupMemberAssignment{
		Version: 0,
		Topics: map[string][]int32{
			"one": {0, 2, 4},
		},
		UserData: []byte{0x01, 0x02, 0x03},
	}

	buf, err := encode(amt, nil)
	if err != nil {
		// There is nothing meaningful to decode without encoded bytes, so
		// stop instead of reporting a misleading secondary failure below.
		t.Fatal("Failed to encode data", err)
	}
	if !bytes.Equal(groupMemberAssignmentV0, buf) {
		// Keep going: buf is still a valid input for the decode half.
		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignmentV0, buf)
	}

	amt2 := new(ConsumerGroupMemberAssignment)
	if err := decode(buf, amt2, nil); err != nil {
		t.Fatal("Failed to decode data", err)
	}
	if !reflect.DeepEqual(amt, amt2) {
		// This compares the decoded struct, so the message says "Decoded".
		t.Errorf("Decoded data does not match expectation\nexpected: %+v\nactual: %+v", amt, amt2)
	}
}