
GET /consumers/.../records is neither idempotent nor safe #582

Open · TrueWill opened this issue Aug 13, 2019 · 1 comment

https://docs.confluent.io/current/kafka-rest/api.html#get--consumers-(string-group_name)-instances-(string-instance)-records appears to have the potential for data loss. If a GET response is lost (e.g. to a network failure or client-side timeout), the consumer cannot retry the request without risking lost messages: the consumer's position has already advanced past the records returned in the lost response.

The only workaround I've found is to immediately create a new consumer instance whenever a fetch times out; the new instance resumes from the last committed offset.

Proof-of-concept:
(Run against https://github.com/confluentinc/cp-docker-images/tree/5.3.0-post/examples/cp-all-in-one with the appropriate topic created)

#!/bin/sh
# Note: the echo "\n..." escapes below rely on an sh whose echo interprets
# backslash escapes (e.g. dash); under bash, use printf or echo -e instead.
host="http://localhost:8082"
contenttype="application/vnd.kafka.v2+json"
messagecontenttype="application/vnd.kafka.json.v2+json"
group="group1000"

echo 'Pre-created compacted topic with 1 partition, 1 replica, and 0 messages.'

echo "\nProducing 4 messages (followed by sleep)..."
curl -i -X POST -H "Content-Type: $messagecontenttype" \
    --data '{"records":[{"key": "A","value":"Value A"},{"key": "B","value":"Value B"},{"key": "C","value":"Value C"},{"key": "D","value":"Value D"}]}' \
    "$host/topics/topic-x1"

sleep 4

echo "\n\nCreating initial consumer (auto.commit.enable is false, auto.offset.reset is earliest)..."
curl -i -X POST -H "Content-Type: $contenttype" \
    -H "Accept: $contenttype" \
    --data '{"name":"instance1000","format":"json","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
    "$host/consumers/$group"

echo "\n\nSubscribing to topic..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"topics":["topic-x1"]}' \
    "$host/consumers/$group/instances/instance1000/subscription"

echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance1000/records"

echo "\n\n** OOPS - GET RESPONSE LOST! **"

echo "\nProducing 1 more message (followed by sleep)..."
curl -i -X POST -H "Content-Type: $messagecontenttype" \
    --data '{"records":[{"key": "E","value":"Value E"}]}' \
    "$host/topics/topic-x1"

sleep 4

echo "\n\nRetrying (_exact_ same GET request)..."
echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance1000/records"

echo "\n\nCommitting offset 4 (the only one we know about)..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"offsets":[{"topic":"topic-x1","partition":0,"offset":4}]}' \
    "$host/consumers/$group/instances/instance1000/offsets"

echo "\n\nUnsubscribing..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance1000/subscription"

echo "Closing initial consumer..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance1000"

echo "********\n"

echo 'Creating new consumer in same group (it will ignore auto.offset.reset since committed offsets exist)...'
curl -i -X POST -H "Content-Type: $contenttype" \
    -H "Accept: $contenttype" \
    --data '{"name":"instance10001","format":"json","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
    "$host/consumers/$group"

echo "\n\nSubscribing to topic..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"topics":["topic-x1"]}' \
    "$host/consumers/$group/instances/instance10001/subscription"

echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance10001/records"

echo "\n\n** NOTE - NO RECORDS FOUND! **"

echo "\n\nUnsubscribing..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance10001/subscription"

echo "Closing consumer..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance10001"

echo "Done."
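The recreate-on-timeout workaround described above could be sketched like this. This is a minimal sketch only: the `fetch_with_recovery` and `recreate_consumer` helper names are mine, and the endpoints assume the same proxy, group, and consumer settings as the proof-of-concept script.

```shell
#!/bin/sh
# Sketch of the recreate-on-timeout workaround (hypothetical helpers).
host="http://localhost:8082"
contenttype="application/vnd.kafka.v2+json"
group="group1000"
instance="instance1000"

# Close the (possibly wedged) consumer instance and create a fresh one in
# the same group; the new instance resumes from the last committed offsets.
recreate_consumer() {
    curl -s -X DELETE -H "Accept: $contenttype" \
        "$host/consumers/$group/instances/$instance"
    curl -s -X POST -H "Content-Type: $contenttype" -H "Accept: $contenttype" \
        --data "{\"name\":\"$instance\",\"format\":\"json\",\"auto.offset.reset\":\"earliest\",\"auto.commit.enable\":\"false\"}" \
        "$host/consumers/$group"
    curl -s -X POST -H "Content-Type: $contenttype" \
        --data '{"topics":["topic-x1"]}' \
        "$host/consumers/$group/instances/$instance/subscription"
}

# Run the fetch command given as arguments; if it fails (e.g. curl's
# --max-time expired before a response arrived), recreate the consumer and
# retry once from the last committed offset.
fetch_with_recovery() {
    "$@" && return 0
    recreate_consumer
    "$@"
}
```

For example: `fetch_with_recovery curl -sf --max-time 10 -H "Accept: application/vnd.kafka.json.v2+json" "$host/consumers/$group/instances/$instance/records"`. Records consumed by the lost response but not committed are re-read, so the client must tolerate duplicates; that trades the data loss shown above for at-least-once behavior.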
@TrueWill (Author) commented:

One possible way to handle this is discussed in https://www.databasesandlife.com/idempotency/ under "Alternative approaches". The client would include a random request ID with each call. The server would keep a short-lived cache keyed on consumer instance ID + request ID. When retrying, the client would send the same request ID; the server would get a cache hit and respond with the cached contents instead of talking to Kafka again.
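That caching idea might look something like the sketch below, with the actual Kafka fetch stubbed behind a command argument. Everything here is hypothetical: the `serve_records` helper, the file-per-key cache layout, and the request-ID parameter are illustrations, not anything Confluent REST Proxy implements.

```shell
#!/bin/sh
# Sketch of the suggested server-side idempotency cache (hypothetical).
# Responses are cached per consumer-instance + request ID, so a retry with
# the same request ID replays the cached body instead of consuming again.
cache_dir="$(mktemp -d)"

# serve_records INSTANCE REQUEST_ID FETCH_CMD
serve_records() {
    key="$cache_dir/$1-$2"
    if [ -f "$key" ]; then
        cat "$key"            # cache hit: replay the earlier response
    else
        "$3" | tee "$key"     # cache miss: consume from Kafka, remember body
    fi
}
```

A real implementation would also expire entries (the linked article notes items can expire relatively quickly) and would have to decide when a cached response counts as delivered so the offsets can safely advance.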
