Consumer max_bytes param does not return any messages if too small #111

Open · kjvalencik opened this issue Sep 21, 2015 · 2 comments

@kjvalencik

If max_bytes is set smaller than the size of a single message, the consumer returns zero messages. Example:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
      --data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/test"

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"}' \
      http://localhost:8082/consumers/my_binary_consumer

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
      http://localhost:8082/consumers/my_binary_consumer/instances/my_instance/topics/test?max_bytes=1

Expected: the final GET should return the single message.
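For illustration, the read above comes back as an empty array:

    []

whereas the expected response would contain the single record, something like this (per the v1 binary format; the partition and offset values here are just examples):

    [{"key":null,"value":"S2Fma2E=","partition":0,"offset":0}]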

@gawth commented Nov 26, 2015

I have also been looking at this functionality to support long polling.

I think the behaviour you describe is as per the "spec", in the sense that returning a message larger than max_bytes would exceed the limit you specified, so nothing is returned. The tests certainly seem to back that up. It would be great if one of the core maintainers could comment on this...

Having said that, it's rather inconvenient, as I want the same behaviour you describe: max_bytes acting as a sort of byte threshold rather than a hard maximum.
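To illustrate, that threshold behaviour would only need the size check in the read loop (quoted in full below) to always let the first message through. A rough, untested sketch reusing the same names as that loop:

    while (iter.hasNext()) {
      MessageAndMetadata<KafkaK, KafkaV> msg = iter.peek();
      ConsumerRecordAndSize<ClientK, ClientV> recordAndSize = parent.createConsumerRecord(msg);
      long roughMsgSize = recordAndSize.getSize();
      // Threshold semantics: deliver at least one message even if it alone
      // exceeds maxResponseBytes; only stop once something has been consumed.
      if (!messages.isEmpty() && bytesConsumed + roughMsgSize > maxResponseBytes) {
        break;
      }
      iter.next();
      messages.add(recordAndSize.getRecord());
      bytesConsumed += roughMsgSize;
    }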

Finally, while investigating this I discovered a bug. The following code is the loop that handles reading messages from Kafka; it correctly breaks when the next message would push the total past the byte limit.

        while (iter.hasNext()) {
          MessageAndMetadata<KafkaK, KafkaV> msg = iter.peek();
          ConsumerRecordAndSize<ClientK, ClientV> recordAndSize = parent.createConsumerRecord(msg);
          long roughMsgSize = recordAndSize.getSize();
          if (bytesConsumed + roughMsgSize > maxResponseBytes) {
            break; // Loop breaks here due to max bytes being exceeded
          }

          iter.next();
          messages.add(recordAndSize.getRecord());
          bytesConsumed += roughMsgSize;
          // Updating the consumed offsets isn't done until we're actually going to return the
          // data since we may encounter an error during a subsequent read, in which case we'll
          // have to defer returning the data so we can return an HTTP error instead
        }

But then later on, the check that determines whether message processing has finished fails to take roughMsgSize into account, and so finish() is never called. The only way finish() can be called is for the timeout to elapse. So in a long-poll example with a 30s timeout, the messages won't be picked up until that 30s has elapsed...

    if (elapsed >= requestTimeoutMs || bytesConsumed >= maxResponseBytes) {
      finish();
    }
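The fix, roughly, is for that completion check to also learn that the read loop already stopped because of the byte limit. One possible shape, as a sketch (the flag name is hypothetical, not the actual patch):

    // Hypothetical field on the read task, set where the loop above breaks:
    private boolean exceededMaxResponseBytes = false;

    // In the read loop:
    if (bytesConsumed + roughMsgSize > maxResponseBytes) {
      exceededMaxResponseBytes = true;
      break;
    }

    // In the completion check:
    if (elapsed >= requestTimeoutMs || exceededMaxResponseBytes) {
      finish();
    }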

I'll be submitting a patch to fix this particular issue in the next few days.

Thanks,
Alan

@kjvalencik (Author)

#155
