[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened #10440

Open
pawanrawal opened this issue Jun 26, 2024 · 5 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. status: investigating The issue is under investigation, which is determined to be non-trivial.

Comments

@pawanrawal
Copy link
pawanrawal commented Jun 26, 2024

Client

PubSub v1.38.0

Environment

GKE

Go Environment

NA

Code

We are using Google Pub/Sub with exactly-once and ordered delivery enabled on a topic. For acknowledging messages, we use "Ack" instead of "AckWithResult". I've observed that whenever a new stream is opened due to the Pub/Sub server closing an old stream (part of the iterator), messages that have been Ack'ed stop being received mostly until the ack deadline is reached or sometimes even longer (say 8 or 12 mins) when the deadline is 2m.

It seems that after a stream is closed because the Pub/Sub server sent a GOAWAY, the library fails to acknowledge the messages that were Ack'ed using ACK until the server's ack deadline is reached, and then they are resent for processing. Could it be that when streams are closed, the library discards the messages that were accepted for ACKing or doesn't retry for certain gRPC error codes when it should?

NumGoRoutines: 10
MaxOutstandingMessages: 4000
MinExtensionTimePeriod: 2m
MaxExtensionTimePeriod: 2m

Expected behavior

I would have expected the messages to show up immediately as they were acked by the application.

Actual behavior

The messages for a given key do not show up for a couple of couple of mins ("MaxExtensionPeriod" is 2 mins) and then they show up.

Screenshots

  • logs showing 2 min gap between a message being acked and then being received again
Screenshot 2024-06-26 at 20 57 01
  • Expired ack deadlines graph from the pubsub server
Screenshot 2024-06-26 at 20 58 52
  • Graph showing increase in open_stream_count around 20:25 when we saw that the message was delayed by a couple of mins
Screenshot 2024-06-26 at 21 09 07
@pawanrawal pawanrawal added the triage me I really want to be triaged. label Jun 26, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jun 26, 2024
@pawanrawal pawanrawal changed the title pubsub: Messages are not acked when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to MaxExtensionPeriod when an old stream within the iterator closes and a new stream is opened Jun 26, 2024
@pawanrawal pawanrawal changed the title pubsub: Messages for a key are not received up to MaxExtensionPeriod when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to an indefinite team when an old stream within the iterator closes and a new stream is opened Jun 26, 2024
@hongalex hongalex added status: investigating The issue is under investigation, which is determined to be non-trivial. and removed triage me I really want to be triaged. labels Jun 26, 2024
@hongalex
Copy link
Member
hongalex commented Jun 26, 2024

For acknowledging messages, we use "Ack" instead of "AckWithResult".

This isn't recommended for exactly once delivery, since you need to check the AckResult to make sure the acks actually go through. Is there a reason why you're using the former?

Could it be that when streams are closed, the library discards the messages that were accepted for ACKing or doesn't retry for certain gRPC error codes when it should?

To my knowledge, once a message gets into the library sections, there is always a graceful shutdown (it doesn't discard messages), even when the streams are recreated.

I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream (sub.ReceiveSettings.NumGoroutines=1) if your processing is under 10 MB/s. If higher, I recommend horizontal scaling (creating more subscriber applications). Could you try lowering that to see if this helps?

@pawanrawal
Copy link
Author

So we have made the change to use "AckResult"instead of "Ack"

To my knowledge, once a message gets into the library sections, there is always a graceful shutdown (it doesn't discard messages), even when the streams are recreated.

That's what I was expecting, but it seems that some messages associated with an ordering key are sometimes delayed by up to 10 minutes. This typically occurs after a stream is recreated.

I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream (sub.ReceiveSettings.NumGoroutines=1) if your processing is under 10 MB/s. If higher, I recommend horizontal scaling (creating more subscriber applications). Could you try lowering that to see if this helps?

We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem?

We have also created a Google support ticket that you can look into: https://console.cloud.google.com/support/cases/detail/v2/51980112

Thanks!

@pawanrawal pawanrawal changed the title pubsub: Messages for a key are not received up to an indefinite team when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened Jun 27, 2024
@hongalex
Copy link
Member

Thanks, currently following up on the support ticket but thought I'd update you here as well:

We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem?

The problem is that there's an unintended behavior where MaxOutstandingMessages isn't being respected when NumGoroutines is > 1. This setting configures how many streaming pull connections we open, where each stream currently tells the server "I want you to give me 4,000 messages" at a time, using your numbers. The client is still limited by 4,000 messages total, but if you have 10 streams, the server will try to deliver 40,000 messages at once. The client will start processing 4,000, and pause on the remaining 36,000 while the others are being processed. This is likely what is contributing to message expirations.

As mentioned before, each stream can handle 10 MB/s, and looking at your processing speed, it doesn't look like you are hitting that.

@pawanrawal
Copy link
Author

Thanks for your response, @hongalex. Your explanation provides helpful context. However, I'm still unclear about how this relates to the issue we're facing. Our workers' outstanding messages are significantly below the MaxOutstandingMessages threshold. Are you observing any server-side metrics indicating that the server is pausing to send metrics to a client due to backpressure? Additionally, could you share which metrics you are using to determine that we are processing less than 10MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message.

Another detail here is that within the Receive function, when we process a pubsub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?

@hongalex
Copy link
Member
hongalex commented Jun 28, 2024

Are you observing any server-side metrics indicating that the server is pausing to send metrics to a client due to backpressure?

Yes, I can see that your streams are being flow controlled, which unfortunately aren't public. Strangely, the expirations aren't tied to when the flow control is happening. It might be more correlated to when streams are reopened (as you originally suggested) but it's hard for me to see since I'm basing this on subscription/open_streaming_pulls which fluctuates as subscriber clients come up and spin down.

Additionally, could you share which metrics you are using to determine that we are processing less than 10MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message.

These are server side metrics, but I'm seeing it per subscription, rather than per client. You can also estimate this if you know the average size of the message.

Another detail here is that within the Receive function, when we process a pubsub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?

I don't think that's a problem, since the publish and subscribe side is fairly decoupled.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. status: investigating The issue is under investigation, which is determined to be non-trivial.
Projects
None yet
Development

No branches or pull requests

2 participants