pubsub: Significant redelivery while existing messages are still being "processed" #10437

Open
jameshartig opened this issue Jun 25, 2024 · 7 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@jameshartig
jameshartig commented Jun 25, 2024

Client

PubSub v1.39.0

Environment

CentOS on GCE

Go Environment

N/A

Code

I'll try to include the relevant code below as it pertains to this issue.

e.g.

pubsub.NewClientWithConfig(ctx, project, &pubsub.ClientConfig{}, 
	 option.WithGRPCConnectionPool(4),
)
...
sub.ReceiveSettings.MaxOutstandingMessages = 400000
sub.ReceiveSettings.MaxOutstandingBytes = -1
sub.ReceiveSettings.MaxExtension = 2*time.Hour + 30*time.Minute + 5*time.Minute
sub.ReceiveSettings.MinExtensionPeriod = 10*time.Minute
sub.ReceiveSettings.Synchronous = false
sub.ReceiveSettings.NumGoroutines = 1
...
var waitingIDs sync.Map
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	var qbm queuedExitMessage
	if err := json.Unmarshal(msg.Data, &qbm); err != nil {
		// nack
		return
	}

	if qbm.Time.Add(minTime).Before(time.Now()) {
		// check to see if we already have this message ID in memory and if
		// we do then we need to cancel the old one
		waitingCancelledCh := make(chan struct{})
		olderMsgCancelChI, ok := waitingIDs.Swap(msg.ID, waitingCancelledCh)
		// clean it up once we're done, as long as it's still the same channel
		defer waitingIDs.CompareAndDelete(msg.ID, waitingCancelledCh)
		// if there was already a channel in there then close it to cause it
		// to stop waiting
		if ok {
			close(olderMsgCancelChI.(chan struct{}))
		}
		select {
		case <-ctx.Done():
			// nack
			return
		case <-time.After(time.Duration(srimErr)):
			// continue to handle message
		case <-waitingCancelledCh:
			// nack since another goroutine got this message
			return
		}
	}

	// ... process message
})

Expected behavior

I expect minimal redeliveries while a message is still being handled, since the client should keep modacking it for as long as the handler holds it.

Actual behavior

We see messages being redelivered in spikes frequently. Since we only keep track of "already processing" messages on a per VM basis we can't accurately track when we get redeliveries to different VMs.
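
For reference, the per-VM bookkeeping can be sketched in isolation. This is a minimal sketch that mirrors the sync.Map swap pattern from the handler above; the names tracker, begin, and duplicates are hypothetical, and it only detects redeliveries that land on the same process, which is the limitation described here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker records which message IDs are currently held by a handler in
// this process. All names are hypothetical, for illustration only.
type tracker struct {
	waiting    sync.Map     // message ID -> chan struct{} owned by the current holder
	duplicates atomic.Int64 // times a newer delivery displaced an older one
}

// begin registers a new delivery of id. The returned channel is closed if a
// later delivery of the same id arrives in this process. The done func must
// be called when the handler finishes.
func (t *tracker) begin(id string) (cancelled <-chan struct{}, done func()) {
	ch := make(chan struct{})
	if old, loaded := t.waiting.Swap(id, ch); loaded {
		// An older handler still holds this ID: count it and wake it up.
		t.duplicates.Add(1)
		close(old.(chan struct{}))
	}
	// Only delete the entry if it still belongs to this delivery.
	return ch, func() { t.waiting.CompareAndDelete(id, ch) }
}

func main() {
	var t tracker
	first, doneFirst := t.begin("msg-1")
	defer doneFirst()
	_, doneSecond := t.begin("msg-1") // same ID delivered again
	defer doneSecond()

	select {
	case <-first:
		fmt.Println("first delivery cancelled")
	default:
		fmt.Println("first delivery still waiting")
	}
	fmt.Println("duplicates:", t.duplicates.Load())
}
```

A redelivery routed to a different VM never reaches this map, so the counter is a lower bound on the real number of duplicates.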

Screenshots

There are around 400k-450k unacked messages at any given time.
image

That said, the messages are each pretty small and only total 50MB-60MB:
image

We let messages get as old as ~2 hours before we start to actually "process" them:
image

We are generally "processing" the message in less than 10 seconds at the 90th percentile:
image

This is a chart of all of the times we had to cancel an existing "pending" message because we received another one.
image
You'll notice it spikes as high as 300k which means we are getting redeliveries of most of the outstanding messages.

This chart shows the total messages being held in memory per datacenter:
image
You'll notice that it's pretty normal until 6/22/2024 at 20:00 UTC where it then started to increase dramatically. Then it returned to normal on 6/24 but then started to be erratic again on 6/24 at 20:00 UTC. Another thing to notice is that it spikes above 400k which means we have significantly more messages being held in-memory than are even outstanding in the subscription.

Even worse is if you look at the messages being held by the pub/sub client itself (this comes from the OutstandingMessagesView) (green is sum (across 2-3 servers) and yellow is average):
image
When things were normal the client was holding onto 400k-500k messages but when things go wrong the client is holding onto 1 million+ messages (presumably in flow control) which seems insane since there shouldn't be more than 400k messages in the subscription.

We set MinExtensionPeriod to 10 minutes since we expect to hold onto the messages for a long time and don't need to modAck all of the time. This seems to be working since we see a spike in modAck's every 5 minutes:
image

What's odd though is that around when we submit the modAck's we see the spike in new pulled messages and spike in duplicate messages:
image

Additional context

NumGoroutines was previously set to 2 but after the issues on 6/22, we lowered it to 1 without much of a difference. WithGRPCConnectionPool was previously set to 2 (default) but we raised it to 4 at the same time as lowering NumGoroutines and again, there wasn't much difference. We also tried lowering the number of VMs supporting this workload from 3 -> 2 but that seemed to have made things worse.

The Acknowledgement deadline on the subscription is set to the default (10 seconds). We haven't changed this but could if you think that would help.

We were previously "abusing" the backoff delay and kept nack'ing messages until the ~2 hours was up but after talking to a Pub/Sub PM we were told that the backoff delay isn't a guarantee and that only a certain number of messages will respect that backoff delay. This approach (holding the messages in memory in our handler) was suggested as a better approach. Things were working pretty normally until recently so I'm not sure if there was some change on the Pub/Sub side. We didn't make any changes to our code on 6/22. It's also a bit interesting that things were okay on 6/24 until 20:00 UTC again like on 6/22.

I'm wondering if there's some memory/message limit on the forwarders that we might be hitting which causes them to crash and then another forwarder redelivers the messages. My assumption is based on us not having issues when the total number of undelivered messages stays under ~420k, because on 6/22, 6/23, and 6/24 we hit new peaks of ~430k, ~451k, and ~445k outstanding messages, respectively. If that's the case then would actually increasing the number of goroutines help so that messages could be potentially spread across more forwarders? We could also try to decrease the size of the messages but they're already pretty small.

Finally, related to above, I limited this to only europe-west1 because we have the most traffic there and the other regions seem relatively unaffected.

@hongalex I apologize in advance for the complicated issues.

@jameshartig jameshartig added the triage me I really want to be triaged. label Jun 25, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jun 25, 2024
@hongalex hongalex added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. and removed triage me I really want to be triaged. labels Jun 26, 2024
@hongalex
Member

Thanks as always for the detailed logs.

> What's odd though is that around when we submit the modAck's we see the spike in new pulled messages and spike in duplicate messages:

I think this might be the other way around, where pulled messages = more modacks issued at the time they are pulled in (to acknowledge receipt of these messages).

> If that's the case then would actually increasing the number of goroutines help so that messages could be potentially spread across more forwarders? We could also try to decrease the size of the messages but they're already pretty small.

There's no guarantee that increasing the number of streams means you'll hit another server. On top of that, this level of processing shouldn't be OOMing the servers so that doesn't seem right either. I'll check to see if there are any changes around then that might've resulted in this.

Just to confirm, was this region processing the data at the same speed (around 400k messages per client outstanding) without issue prior to 6/20?

@jameshartig
Author

> I think this might be the other way around, where pulled messages = more modacks issued at the time they are pulled in (to acknowledge receipt of these messages).

Good point, yes that's probably what it is.

> Just to confirm, this region was processing the data at the same speed (around 400k messages per client outstanding) without issue prior to 6/20?

Looking back we did have a bit of an issue on 6/18/24 starting at 7:00:00 UTC but it wasn't as bad.

Also, we seem to be doing well today (6/26) so far:
image

Other than a small blip when a VM was preempted and restarted, things seem to be pretty even and working well. I'll update if anything changes later in the day.

@jameshartig
Author
jameshartig commented Jun 26, 2024

Update:

Around 20:17:00 UTC there was a jump in redeliveries:
image

I don't see any significant increase in the number of unacked messages around that time:
image

We also didn't have any preemptions or any service restarts but it does seem like at that same time there was a new streaming pull connection opened:
image
(The title of this graph says "by subscription" but it's actually "by hostname")
But that seems to happen pretty frequently so I'm not sure if that's related.

Let me know what else I can gather on my end.

@hongalex
Member

So it seems like the issue is because of messages expiring. Specifically, messages are being pulled into the client library but not being processed (because concurrency control, MaxOutstandingMessages, is reached). This is supported by the fact that your end to end latency is very high around the time of these expirations (>1 hour). I think server side flow control is working most of the time, but when the stream is torn down and recreated, the server is erroneously delivering more messages. My current hypothesis goes something like this:

  1. Client reads 400k messages from stream
  2. Client acquires flow control resources for this
  3. Client tries to pull more messages from stream, but seeing that flow control limits are reached, server doesn't deliver any more messages
  4. Stream is restarted and connects to a different server, which forgets about the previous flow control limits for this stream, and delivers another 400k messages, while the messages from step 1 are still being processed

These steps happen randomly and eventually you build up a large number of outstanding messages in your client that can't be processed in time. Since your MaxExtension is set to 2 hours, the client could keep these messages around for a while, while accumulating more messages and resulting in a larger in-memory backlog it won't be able to process. Eventually, these expire and are redelivered.
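
To make the hypothesis concrete, here is a toy model of the four steps. It is a sketch under stated assumptions, not the real service or client logic: the stream and simulate names are hypothetical, the server is assumed to track its flow control limit per connection only, and the subscription is assumed to always have enough backlog to deliver.

```go
package main

import "fmt"

// stream models one streaming pull connection. The server only remembers
// what it sent on this connection. Hypothetical model, not real Pub/Sub code.
type stream struct {
	limit int // flow control limit advertised by the client
	sent  int // messages sent on this connection and not yet acked
}

// deliver sends as many messages as this connection's limit still allows.
func (s *stream) deliver(backlog int) int {
	n := s.limit - s.sent
	if n > backlog {
		n = backlog
	}
	if n < 0 {
		n = 0
	}
	s.sent += n
	return n
}

// simulate runs the steps with the given number of stream restarts and
// returns how many messages the client holds in memory and how many of
// those are blocked because the client-side limit is already used up.
func simulate(limit, backlog, restarts int) (held, blocked int) {
	s := &stream{limit: limit}
	for i := 0; i <= restarts; i++ {
		n := s.deliver(backlog) // steps 1 and 2: client reads up to the limit
		backlog -= n
		held += n

		n = s.deliver(backlog) // step 3: same connection, limit reached, nothing sent
		backlog -= n
		held += n

		s = &stream{limit: limit} // step 4: new connection with no memory of the old one
	}
	if held > limit {
		blocked = held - limit
	}
	return held, blocked
}

func main() {
	held, blocked := simulate(400000, 1000000, 0)
	fmt.Printf("no restart:  held=%d blocked=%d\n", held, blocked)
	held, blocked = simulate(400000, 1000000, 1)
	fmt.Printf("one restart: held=%d blocked=%d\n", held, blocked)
}
```

In this model a single restart doubles what the client holds (800000 held, 400000 of them blocked), and only the first 400000 can make progress until callbacks complete.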

I'll try to craft a version of the library that includes a separate setting for #callbacks, as promised in #9727, which should let you set num_callbacks to > 400k and thus your application won't get bogged down by messages received but not being processed in these off cases. The long term solution is to fix the behavior in step 4, but that requires a lot more planning.

@jameshartig
Author
> 4. Stream is restarted and connects to a different server, which forgets about the previous flow control limits for this stream, and delivers another 400k messages, while the messages from step 1 are still being processed

I was thinking something like this was happening but I shrugged it off initially because I assumed the new server would not have any messages to deliver because all of the existing messages in the subscription were already leased by a client (and being modack'd). Or does the stream being killed/restarted invalidate all of the leases being held by the outstanding messages delivered on that stream?

> These steps happen randomly and eventually you build up a large number of outstanding messages in your client that can't be processed in time.

I'm also a bit confused why it's working for several hours of the day then suddenly decides to redeliver a bunch of messages despite the stream (seemingly) being restarted constantly throughout the day. Could it be that it's reconnecting to the same forwarder most of the time but when it connects to a different one (in step 4) that causes the issue?

I'm also trying to see if it's somehow related to MaxOutstandingMessages=400000, where we start to have problems once the number of unacked messages grows higher than 400k. I'm not sure what that might have to do with it given your explanation, but it seems too coincidental that things are fine until the number of unacked messages in the subscription grows over 400k. If I raise MaxOutstandingMessages to 500k, could the problem be delayed further?

@hongalex
Member

> because I assumed the new server would not have any messages to deliver because all of the existing messages in the subscription were already leased by a client (and being modack'd).

I was assuming you have a constant flow of messages coming into your subscription. Is your publish throughput less sustained and done more in bursts?

> Could it be that it's reconnecting to the same forwarder most of the time but when it connects to a different one (in step 4) that causes the issue?

Yeah that's what I'm thinking is happening.

I'm not sure if increasing MaxOutstandingMessages will help here. I think it might be best to try the new version of the library that tweaks callbacks independently (maybe setting that to 500k) and seeing if this issue is better. I'll update here once that's ready.

@hongalex
Member
hongalex commented Jun 28, 2024

@jameshartig When you get the chance, can you pull in this version of the library:
go get cloud.google.com/go/pubsub@pubsub-max-callbacks

This introduces a new setting to ReceiveSettings called MaxCallbacks which configures how many callbacks will be invoked at any given time (superseding the previous way of configuring this via MaxOutstandingMessages). MaxCallbacks defaults to 2 * MaxOutstandingMessages, but that is subject to change.

For your case, I recommend keeping MaxOutstandingMessages at 400k, setting MaxCallbacks to 500k, and seeing if that helps.
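
As a rough illustration of why a separate callback limit might help, the sketch below counts how many delivered messages can start a callback immediately under a given limit. It is a simplified stand-in, not the library's implementation: admit is a hypothetical function, and the 450k delivery figure is only an example of the server handing out more than the 400k flow control limit.

```go
package main

import "fmt"

// admit models how many of the delivered messages can start a callback
// right away when at most maxCallbacks callbacks may run at once. The rest
// wait in client memory. Hypothetical helper, for illustration only.
func admit(delivered, maxCallbacks int) (running, waiting int) {
	sem := make(chan struct{}, maxCallbacks) // counting semaphore
	for i := 0; i < delivered; i++ {
		select {
		case sem <- struct{}{}:
			running++ // a slot was free, the callback starts
		default:
			waiting++ // no slot, the message sits unprocessed
		}
	}
	return running, waiting
}

func main() {
	// Callbacks capped by MaxOutstandingMessages (previous behavior).
	running, waiting := admit(450000, 400000)
	fmt.Printf("limit 400k: running=%d waiting=%d\n", running, waiting)

	// Callbacks capped separately at 500k.
	running, waiting = admit(450000, 500000)
	fmt.Printf("limit 500k: running=%d waiting=%d\n", running, waiting)
}
```

With the higher callback limit, the extra 50000 messages in this example start their callbacks instead of waiting in memory, which is the effect the new setting is meant to have.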
