pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened #10440
Comments
This isn't recommended for exactly-once delivery, since you need to check the AckResult to make sure the acks actually go through. Is there a reason why you're using Ack instead?
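As a minimal sketch of what checking the ack result looks like in the Go client (assuming a subscription with exactly-once delivery enabled; the project and subscription names are placeholders):

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project" and "my-sub" are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("my-sub")
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// ... process msg.Data ...

		// With exactly-once delivery, check the result of the ack
		// instead of calling msg.Ack() and assuming it succeeded.
		status, err := msg.AckWithResult().Get(ctx)
		if err != nil {
			log.Printf("ack failed: %v", err)
			return
		}
		if status != pubsub.AcknowledgeStatusSuccess {
			log.Printf("ack was not successful: %v", status)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
```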
To my knowledge, once a message has been handed to the client library, shutdown is always graceful (messages aren't discarded), even when the streams are recreated. I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream (NumGoroutines = 1).
So we have made the change to use "AckWithResult" instead of "Ack".
That's what I was expecting, but it seems that some messages associated with an ordering key are sometimes delayed by up to 10 minutes. This typically occurs after a stream is recreated.
We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem? We have also created a Google support ticket that you can look into: https://console.cloud.google.com/support/cases/detail/v2/51980112 Thanks!
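A rough sketch of estimating processed throughput in the application itself, assuming the Receive callback is the only consumer; the one-minute logging interval and resource names are arbitrary placeholders, and counting len(msg.Data) ignores attributes, so it is only an approximation:

```go
package main

import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project" and "my-sub" are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	var bytesProcessed, msgsProcessed atomic.Int64

	// Log an estimate of processed messages and MB/s once a minute.
	go func() {
		for range time.Tick(time.Minute) {
			b := bytesProcessed.Swap(0)
			n := msgsProcessed.Swap(0)
			log.Printf("last minute: %d msgs, %.2f MB/s", n, float64(b)/1e6/60)
		}
	}()

	sub := client.Subscription("my-sub")
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		bytesProcessed.Add(int64(len(msg.Data)))
		msgsProcessed.Add(1)

		// ... process the message, then ack with a result check ...
		if status, err := msg.AckWithResult().Get(ctx); err != nil || status != pubsub.AcknowledgeStatusSuccess {
			log.Printf("ack not successful: status=%v err=%v", status, err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
```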
Thanks, currently following up on the support ticket but thought I'd update you here as well:
The problem is that there's an unintended behavior here. As mentioned before, each stream can handle 10 MB/s, and looking at your processing speed, it doesn't look like you are hitting that.
Thanks for your response, @hongalex. Your explanation provides helpful context. However, I'm still unclear about how this relates to the issue we're facing. Our workers' outstanding messages are significantly below the MaxOutstandingMessages threshold. Are you observing any server-side metrics indicating that the server is pausing to send messages to a client due to backpressure? Additionally, could you share which metrics you are using to determine that we are processing less than 10 MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message.

Another detail here is that within the Receive function, when we process a Pub/Sub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?
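A minimal sketch of that publish-then-ack pattern, assuming the follow-up message goes to a topic with message ordering enabled; the project, topic, and subscription names are placeholders, and logging len(msg.Data) is just one way to capture payload size:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project", "my-topic", and "my-sub" are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic := client.Topic("my-topic")
	topic.EnableMessageOrdering = true // required to publish with an ordering key
	defer topic.Stop()

	sub := client.Subscription("my-sub")
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		log.Printf("payload: %d bytes, ordering key: %q", len(msg.Data), msg.OrderingKey)

		// Publish a follow-up message for the same ordering key
		// before acking the current message.
		res := topic.Publish(ctx, &pubsub.Message{
			Data:        []byte("follow-up"),
			OrderingKey: msg.OrderingKey,
		})
		if _, err := res.Get(ctx); err != nil {
			log.Printf("publish failed: %v", err)
			return
		}

		// Ack only after the publish has been confirmed.
		if status, err := msg.AckWithResult().Get(ctx); err != nil || status != pubsub.AcknowledgeStatusSuccess {
			log.Printf("ack not successful: status=%v err=%v", status, err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
```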
Yes, I can see that your streams are being flow controlled, though those metrics unfortunately aren't public. Strangely, the expirations aren't tied to when the flow control is happening. It might be more correlated to when streams are reopened (as you originally suggested), but it's hard for me to see, since I'm basing this on internal metrics.
These are server-side metrics, but I'm seeing them per subscription rather than per client. You can also estimate this yourself if you know the average size of a message.
I don't think that's a problem, since the publish and subscribe sides are fairly decoupled.
Client
PubSub v1.38.0
Environment
GKE
Go Environment
NA
Code
We are using Google Pub/Sub with exactly-once and ordered delivery enabled on a topic. For acknowledging messages, we use "Ack" instead of "AckWithResult". I've observed that whenever a new stream is opened because the Pub/Sub server closed an old stream (part of the iterator), messages for a key stop being received, mostly until the ack deadline is reached and sometimes even longer (say 8 or 12 minutes), even though the deadline is 2 minutes.
It seems that after a stream is closed because the Pub/Sub server sent a GOAWAY, the library fails to acknowledge messages that the application already Ack'ed until the server's ack deadline is reached, and then they are resent for processing. Could it be that when streams are closed, the library discards the messages that were accepted for acking, or doesn't retry for certain gRPC error codes when it should?
NumGoroutines: 10
MaxOutstandingMessages: 4000
MinExtensionPeriod: 2m
MaxExtensionPeriod: 2m
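For reference, a minimal sketch (assumed, not the reporter's actual code) of how the settings above map onto the Go client's ReceiveSettings; it assumes the subscription was created with exactly-once delivery and message ordering enabled, and the project and subscription names are placeholders:

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project" and "my-sub" are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("my-sub")
	// The subscription itself is assumed to have been created with
	// EnableExactlyOnceDelivery and EnableMessageOrdering set to true.

	// Settings matching the values in this report.
	sub.ReceiveSettings.NumGoroutines = 10
	sub.ReceiveSettings.MaxOutstandingMessages = 4000
	sub.ReceiveSettings.MinExtensionPeriod = 2 * time.Minute
	sub.ReceiveSettings.MaxExtensionPeriod = 2 * time.Minute

	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// ... process the message, then ack with a result check ...
		if status, err := msg.AckWithResult().Get(ctx); err != nil || status != pubsub.AcknowledgeStatusSuccess {
			log.Printf("ack not successful: status=%v err=%v", status, err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
```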
Expected behavior
I would have expected the messages to show up immediately, since they were acked by the application.
Actual behavior
The messages for a given key do not show up for a couple of minutes ("MaxExtensionPeriod" is 2 minutes), and then they show up.