[go: nahoru, domu]

Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Adjust broker ingress parameters to support target load #1656

Merged
merged 4 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import (
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"

"cloud.google.com/go/pubsub"
"go.uber.org/zap"
)

type envConfig struct {
PodName string `envconfig:"POD_NAME" required:"true"`
Port int `envconfig:"PORT" default:"8080"`
ProjectID string `envconfig:"PROJECT_ID"`

// Default 300Mi.
PublishBufferedByteLimit int `envconfig:"PUBLISH_BUFFERED_BYTES_LIMIT" default:"314572800"`
}

const (
Expand Down Expand Up @@ -63,6 +67,7 @@ func main() {
clients.ProjectID(projectID),
metrics.PodName(env.PodName),
metrics.ContainerName(component),
publishSetting(env),
)
if err != nil {
logger.Desugar().Fatal("Unable to create ingress handler: ", zap.Error(err))
Expand All @@ -73,3 +78,11 @@ func main() {
logger.Desugar().Fatal("failed to start ingress: ", zap.Error(err))
}
}

func publishSetting(env envConfig) pubsub.PublishSettings {
s := pubsub.DefaultPublishSettings
if env.PublishBufferedByteLimit > 0 {
yolocs marked this conversation as resolved.
Show resolved Hide resolved
s.BufferedByteLimit = env.PublishBufferedByteLimit
}
return s
}
2 changes: 2 additions & 0 deletions cmd/broker/ingress/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"

"cloud.google.com/go/pubsub"
"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/ingress"
"github.com/google/knative-gcp/pkg/metrics"
Expand All @@ -34,6 +35,7 @@ func InitializeHandler(
projectID clients.ProjectID,
podName metrics.PodName,
containerName metrics.ContainerName,
publishSettings pubsub.PublishSettings,
) (*ingress.Handler, error) {
panic(wire.Build(
ingress.HandlerSet,
Expand Down
5 changes: 3 additions & 2 deletions cmd/broker/ingress/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/apis/intevents/v1alpha1/brokercell_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ const (
// usage, HPA could have enough time to kick in.
// See: https://github.com/google/knative-gcp/issues/1265
avgMemoryUsageFanout string = "1500Mi"
avgMemoryUsageIngress string = "700Mi"
avgMemoryUsageIngress string = "1500Mi"
avgMemoryUsageRetry string = "1500Mi"
cpuRequestFanout string = "1500m"
cpuRequestIngress string = "1000m"
cpuRequestIngress string = "2000m"
cpuRequestRetry string = "1000m"
cpuLimitFanout string = ""
cpuLimitIngress string = ""
cpuLimitRetry string = ""
memoryRequestFanout string = "500Mi"
memoryRequestIngress string = "500Mi"
memoryRequestIngress string = "2000Mi"
memoryRequestRetry string = "500Mi"
memoryLimitFanout string = "3000Mi"
memoryLimitIngress string = "1000Mi"
memoryLimitIngress string = "2000Mi"
memoryLimitRetry string = "3000Mi"
minReplicas int32 = 1
maxReplicas int32 = 10
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func runIngressHandlerBenchmark(b *testing.B, eventSize int) {
defer psSrv.Close()

psClient := createPubsubClient(ctx, b, psSrv)
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), psClient)
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), psClient, pubsub.DefaultPublishSettings)
statsReporter, err := metrics.NewIngressReporter(metrics.PodName(pod), metrics.ContainerName(container))
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -405,7 +405,7 @@ func setupTestReceiver(ctx context.Context, t testing.TB, psSrv *pstest.Server)

// createAndStartIngress creates an ingress and calls its Start() method in a goroutine.
func createAndStartIngress(ctx context.Context, t testing.TB, psSrv *pstest.Server) string {
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), createPubsubClient(ctx, t, psSrv))
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), createPubsubClient(ctx, t, psSrv), pubsub.DefaultPublishSettings)

receiver := &testHttpMessageReceiver{urlCh: make(chan string)}
statsReporter, err := metrics.NewIngressReporter(metrics.PodName(pod), metrics.ContainerName(container))
Expand Down
17 changes: 12 additions & 5 deletions pkg/broker/ingress/multi_topic_decouple_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ import (
const projectEnvKey = "PROJECT_ID"

// NewMultiTopicDecoupleSink creates a new multiTopicDecoupleSink.
func NewMultiTopicDecoupleSink(ctx context.Context, brokerConfig config.ReadonlyTargets, client *pubsub.Client) *multiTopicDecoupleSink {
func NewMultiTopicDecoupleSink(
ctx context.Context,
brokerConfig config.ReadonlyTargets,
client *pubsub.Client,
publishSettings pubsub.PublishSettings) *multiTopicDecoupleSink {

return &multiTopicDecoupleSink{
logger: logging.FromContext(ctx),
pubsub: client,
brokerConfig: brokerConfig,
logger: logging.FromContext(ctx),
pubsub: client,
publishSettings: publishSettings,
brokerConfig: brokerConfig,
// TODO(#1118): remove Topic when broker config is removed
topics: make(map[types.NamespacedName]*pubsub.Topic),
}
Expand All @@ -52,7 +58,8 @@ func NewMultiTopicDecoupleSink(ctx context.Context, brokerConfig config.Readonly
// to the broker to which the events are sent.
type multiTopicDecoupleSink struct {
// pubsub talks to pubsub.
pubsub *pubsub.Client
pubsub *pubsub.Client
publishSettings pubsub.PublishSettings
// map from brokers to topics
topics map[types.NamespacedName]*pubsub.Topic
topicsMut sync.RWMutex
Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/ingress/multi_topic_decouple_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestMultiTopicDecoupleSink(t *testing.T) {
t.Fatal(err)
}

sink := NewMultiTopicDecoupleSink(ctx, brokerConfig, psClient)
sink := NewMultiTopicDecoupleSink(ctx, brokerConfig, psClient, pubsub.DefaultPublishSettings)
// Send events
event := createTestEvent(uuid.New().String())
err = sink.Send(context.Background(), testCase.broker, *event)
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/brokercell/resources/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func MakeIngressDeployment(args IngressArgs) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
FailureThreshold: 3,
FailureThreshold: 5,
PeriodSeconds: 2,
SuccessThreshold: 1,
TimeoutSeconds: 5,
Expand All @@ -56,7 +56,7 @@ func MakeIngressDeployment(args IngressArgs) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
FailureThreshold: 3,
FailureThreshold: 5,
InitialDelaySeconds: 5,
PeriodSeconds: 2,
SuccessThreshold: 1,
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/brokercell/testingdata/ingress_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ spec:
- name: ingress
image: ingress
livenessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand All @@ -53,7 +53,7 @@ spec:
successThreshold: 1
timeoutSeconds: 5
readinessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand Down Expand Up @@ -85,10 +85,10 @@ spec:
mountPath: /var/secrets/google
resources:
limits:
memory: 1000Mi
memory: 2000Mi
requests:
cpu: 1000m
memory: 500Mi
cpu: 2000m
memory: 2000Mi
ports:
- name: metrics
containerPort: 9090
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ spec:
- name: ingress
image: ingress
livenessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand All @@ -54,7 +54,7 @@ spec:
successThreshold: 1
timeoutSeconds: 5
readinessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand Down Expand Up @@ -86,10 +86,10 @@ spec:
mountPath: /var/secrets/google
resources:
limits:
memory: 1000Mi
memory: 2000Mi
requests:
cpu: 1000m
memory: 500Mi
cpu: 2000m
memory: 2000Mi
ports:
- name: metrics
containerPort: 9090
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/brokercell/testingdata/ingress_hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ spec:
name: memory
target:
type: AverageValue
averageValue: 700Mi
averageValue: 1500Mi