[go: nahoru, domu]

Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Adjust broker ingress parameters to support target load (#1656)
Browse files Browse the repository at this point in the history
* Adjust broker ingress parameters to support target load

* fix unit test

* fix more unit

* log invalid env
  • Loading branch information
yolocs committed Sep 2, 2020
1 parent 46ac068 commit ad79cd4
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 30 deletions.
14 changes: 14 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import (
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"

"cloud.google.com/go/pubsub"
"go.uber.org/zap"
)

type envConfig struct {
PodName string `envconfig:"POD_NAME" required:"true"`
Port int `envconfig:"PORT" default:"8080"`
ProjectID string `envconfig:"PROJECT_ID"`

// Default 300Mi.
PublishBufferedByteLimit int `envconfig:"PUBLISH_BUFFERED_BYTES_LIMIT" default:"314572800"`
}

const (
Expand Down Expand Up @@ -63,6 +67,7 @@ func main() {
clients.ProjectID(projectID),
metrics.PodName(env.PodName),
metrics.ContainerName(component),
publishSetting(logger.Desugar(), env),
)
if err != nil {
logger.Desugar().Fatal("Unable to create ingress handler: ", zap.Error(err))
Expand All @@ -73,3 +78,12 @@ func main() {
logger.Desugar().Fatal("failed to start ingress: ", zap.Error(err))
}
}

func publishSetting(logger *zap.Logger, env envConfig) pubsub.PublishSettings {
s := pubsub.DefaultPublishSettings
if env.PublishBufferedByteLimit > 0 {
logger.Warn("PUBLISH_BUFFERED_BYTES_LIMIT is less or equal than 0; ignoring it", zap.Int("PublishBufferedByteLimit", env.PublishBufferedByteLimit))
s.BufferedByteLimit = env.PublishBufferedByteLimit
}
return s
}
2 changes: 2 additions & 0 deletions cmd/broker/ingress/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"

"cloud.google.com/go/pubsub"
"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/ingress"
"github.com/google/knative-gcp/pkg/metrics"
Expand All @@ -34,6 +35,7 @@ func InitializeHandler(
projectID clients.ProjectID,
podName metrics.PodName,
containerName metrics.ContainerName,
publishSettings pubsub.PublishSettings,
) (*ingress.Handler, error) {
panic(wire.Build(
ingress.HandlerSet,
Expand Down
5 changes: 3 additions & 2 deletions cmd/broker/ingress/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/apis/intevents/v1alpha1/brokercell_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ const (
// usage, HPA could have enough time to kick in.
// See: https://github.com/google/knative-gcp/issues/1265
avgMemoryUsageFanout string = "1500Mi"
avgMemoryUsageIngress string = "700Mi"
avgMemoryUsageIngress string = "1500Mi"
avgMemoryUsageRetry string = "1500Mi"
cpuRequestFanout string = "1500m"
cpuRequestIngress string = "1000m"
cpuRequestIngress string = "2000m"
cpuRequestRetry string = "1000m"
cpuLimitFanout string = ""
cpuLimitIngress string = ""
cpuLimitRetry string = ""
memoryRequestFanout string = "500Mi"
memoryRequestIngress string = "500Mi"
memoryRequestIngress string = "2000Mi"
memoryRequestRetry string = "500Mi"
memoryLimitFanout string = "3000Mi"
memoryLimitIngress string = "1000Mi"
memoryLimitIngress string = "2000Mi"
memoryLimitRetry string = "3000Mi"
minReplicas int32 = 1
maxReplicas int32 = 10
Expand Down
7 changes: 4 additions & 3 deletions pkg/apis/intevents/v1alpha1/brokercell_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func TestBrokerCell_Validate(t *testing.T) {
Spec: (func() BrokerCellSpec {
brokerCellWithInvalidMemoryRequest := MakeDefaultBrokerCellSpec()
testComponent := brokerCellWithInvalidMemoryRequest.Components.Ingress
testComponent.MemoryLimit = "1000Mi"
testComponent.AvgMemoryUsage = ptr.String("1001Mi")
testComponent.MemoryLimit = "2000Mi"
testComponent.MemoryRequest = "2000Mi"
testComponent.AvgMemoryUsage = ptr.String("2001Mi")
return brokerCellWithInvalidMemoryRequest
}()),
},
want: func() *apis.FieldError {
var fieldErrors *apis.FieldError
fe := apis.ErrInvalidValue("1001Mi", "spec.components.ingress.avgMemoryUsage")
fe := apis.ErrInvalidValue("2001Mi", "spec.components.ingress.avgMemoryUsage")
fe.Details = "avgMemoryUsage should not exceed the memory limit"
fieldErrors = fieldErrors.Also(fe)
return fieldErrors
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func runIngressHandlerBenchmark(b *testing.B, eventSize int) {
defer psSrv.Close()

psClient := createPubsubClient(ctx, b, psSrv)
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), psClient)
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), psClient, pubsub.DefaultPublishSettings)
statsReporter, err := metrics.NewIngressReporter(metrics.PodName(pod), metrics.ContainerName(container))
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -405,7 +405,7 @@ func setupTestReceiver(ctx context.Context, t testing.TB, psSrv *pstest.Server)

// createAndStartIngress creates an ingress and calls its Start() method in a goroutine.
func createAndStartIngress(ctx context.Context, t testing.TB, psSrv *pstest.Server) string {
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), createPubsubClient(ctx, t, psSrv))
decouple := NewMultiTopicDecoupleSink(ctx, memory.NewTargets(brokerConfig), createPubsubClient(ctx, t, psSrv), pubsub.DefaultPublishSettings)

receiver := &testHttpMessageReceiver{urlCh: make(chan string)}
statsReporter, err := metrics.NewIngressReporter(metrics.PodName(pod), metrics.ContainerName(container))
Expand Down
17 changes: 12 additions & 5 deletions pkg/broker/ingress/multi_topic_decouple_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ import (
const projectEnvKey = "PROJECT_ID"

// NewMultiTopicDecoupleSink creates a new multiTopicDecoupleSink.
func NewMultiTopicDecoupleSink(ctx context.Context, brokerConfig config.ReadonlyTargets, client *pubsub.Client) *multiTopicDecoupleSink {
func NewMultiTopicDecoupleSink(
ctx context.Context,
brokerConfig config.ReadonlyTargets,
client *pubsub.Client,
publishSettings pubsub.PublishSettings) *multiTopicDecoupleSink {

return &multiTopicDecoupleSink{
logger: logging.FromContext(ctx),
pubsub: client,
brokerConfig: brokerConfig,
logger: logging.FromContext(ctx),
pubsub: client,
publishSettings: publishSettings,
brokerConfig: brokerConfig,
// TODO(#1118): remove Topic when broker config is removed
topics: make(map[types.NamespacedName]*pubsub.Topic),
}
Expand All @@ -52,7 +58,8 @@ func NewMultiTopicDecoupleSink(ctx context.Context, brokerConfig config.Readonly
// to the broker to which the events are sent.
type multiTopicDecoupleSink struct {
// pubsub talks to pubsub.
pubsub *pubsub.Client
pubsub *pubsub.Client
publishSettings pubsub.PublishSettings
// map from brokers to topics
topics map[types.NamespacedName]*pubsub.Topic
topicsMut sync.RWMutex
Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/ingress/multi_topic_decouple_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestMultiTopicDecoupleSink(t *testing.T) {
t.Fatal(err)
}

sink := NewMultiTopicDecoupleSink(ctx, brokerConfig, psClient)
sink := NewMultiTopicDecoupleSink(ctx, brokerConfig, psClient, pubsub.DefaultPublishSettings)
// Send events
event := createTestEvent(uuid.New().String())
err = sink.Send(context.Background(), testCase.broker, *event)
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/brokercell/resources/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func MakeIngressDeployment(args IngressArgs) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
FailureThreshold: 3,
FailureThreshold: 5,
PeriodSeconds: 2,
SuccessThreshold: 1,
TimeoutSeconds: 5,
Expand All @@ -56,7 +56,7 @@ func MakeIngressDeployment(args IngressArgs) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
FailureThreshold: 3,
FailureThreshold: 5,
InitialDelaySeconds: 5,
PeriodSeconds: 2,
SuccessThreshold: 1,
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/brokercell/testingdata/ingress_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ spec:
- name: ingress
image: ingress
livenessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand All @@ -53,7 +53,7 @@ spec:
successThreshold: 1
timeoutSeconds: 5
readinessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand Down Expand Up @@ -85,10 +85,10 @@ spec:
mountPath: /var/secrets/google
resources:
limits:
memory: 1000Mi
memory: 2000Mi
requests:
cpu: 1000m
memory: 500Mi
cpu: 2000m
memory: 2000Mi
ports:
- name: metrics
containerPort: 9090
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ spec:
- name: ingress
image: ingress
livenessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand All @@ -54,7 +54,7 @@ spec:
successThreshold: 1
timeoutSeconds: 5
readinessProbe:
failureThreshold: 3
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
Expand Down Expand Up @@ -86,10 +86,10 @@ spec:
mountPath: /var/secrets/google
resources:
limits:
memory: 1000Mi
memory: 2000Mi
requests:
cpu: 1000m
memory: 500Mi
cpu: 2000m
memory: 2000Mi
ports:
- name: metrics
containerPort: 9090
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/brokercell/testingdata/ingress_hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ spec:
name: memory
target:
type: AverageValue
averageValue: 700Mi
averageValue: 1500Mi

0 comments on commit ad79cd4

Please sign in to comment.