Messages can be uploaded multiple times to different files #600

Closed
glasser opened this issue Feb 28, 2019 · 7 comments · Fixed by #1351

Comments

@glasser
Contributor
glasser commented Feb 28, 2019

I think there's a fundamental problem with the Secor upload algorithm that can lead to messages appearing multiple times in backups.

Let's say you have a consumer running on a single topic/partition, dividing up messages into hourly files. The current committed offset is 11111111111111111111. The messages it finds span an hour break. It has created these two files:

y=2019/m=02/d=28/H=10/1_1_11111111111111111111.gz
y=2019/m=02/d=28/H=11/1_1_11111111111111111111.gz

It decides to upload, takes out the Zookeeper lock, uploads both files, but crashes before updating the Zookeeper committed offset. (In practice this could be "uploads the first few of a large number of files", especially if you're using something like SplitByFieldMessageParser that produces many files not just on hour boundaries.)

A new consumer takes over this topic/partition and starts writing messages locally. It will choose the same filenames for its files. But for whatever reason, this consumer decides it's time to upload faster than the previous iteration did, before it even gets to the second hour's worth of messages. It uploads only this file:

y=2019/m=02/d=28/H=10/1_1_11111111111111111111.gz

It successfully updates the ZK offset to 11111111111111199999, and continues reading messages. Now it creates these two files locally and uploads them:

y=2019/m=02/d=28/H=10/1_1_11111111111111199999.gz
y=2019/m=02/d=28/H=11/1_1_11111111111111199999.gz

Now these files all exist in storage:

y=2019/m=02/d=28/H=10/1_1_11111111111111111111.gz (overwritten by the second attempt)
y=2019/m=02/d=28/H=10/1_1_11111111111111199999.gz
y=2019/m=02/d=28/H=11/1_1_11111111111111111111.gz (uploaded by the first attempt)
y=2019/m=02/d=28/H=11/1_1_11111111111111199999.gz

But messages in the third file also appear in the fourth file!

I haven't tried making this happen in practice; this is based on reasoning about the code.

My assumption has been that once Secor has made it well past a timestamp range you can assume that every Kafka message has been uploaded exactly once, but this example seems to break that.

One thought for fixing this: Each topic/partition gets a "current upload" node, which is a list of files in storage. The upload logic (Uploader.uploadFiles) becomes (see the sketch after the list):

  • Take out lock
  • Read current offset from ZK and stop if it doesn't match what we think it should be (no change from present)
  • Read "current upload" file if it exists and delete all files named there from storage.
  • Write "current upload" file with the list of files we're going to upload to storage.
  • Upload files to storage.
  • Transactionally update the offset in ZK and delete "current upload" file.
  • (Also update local state like we do now.)
  • Drop lock
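
Roughly, in code (a sketch only; ZkState, Storage, and UploaderSketch below are hypothetical stand-ins, not real Secor classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helpers standing in for Secor's ZooKeeper and cloud-storage plumbing.
interface ZkState {
    long readCommittedOffset(String topicPartition);
    List<String> readCurrentUploadList(String topicPartition);          // null if no marker node exists
    void writeCurrentUploadList(String topicPartition, List<String> files);
    void commitOffsetAndClearUploadList(String topicPartition, long offset); // the "transactional" step
}

interface Storage {
    void delete(String remotePath);
    void upload(String localPath, String remotePath);
}

class UploaderSketch {
    /** Assumes the per-topic/partition ZK lock is taken before and dropped after this call. */
    void uploadFiles(String tp, long expectedOffset, long newOffset,
                     Map<String, String> localToRemote, ZkState zk, Storage storage) {
        // Stop if the committed offset doesn't match what we think it should be (no change from present).
        if (zk.readCommittedOffset(tp) != expectedOffset) {
            return;
        }
        // A previous attempt may have died mid-upload: delete every file it recorded.
        List<String> stale = zk.readCurrentUploadList(tp);
        if (stale != null) {
            for (String path : stale) {
                storage.delete(path);
            }
        }
        // Record what we are about to upload, then upload it.
        zk.writeCurrentUploadList(tp, new ArrayList<>(localToRemote.values()));
        for (Map.Entry<String, String> e : localToRemote.entrySet()) {
            storage.upload(e.getKey(), e.getValue());
        }
        // Advance the offset and delete the marker together; also update local state as today.
        zk.commitOffsetAndClearUploadList(tp, newOffset);
    }
}
```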

Thoughts?

@glasser
Contributor Author
glasser commented Feb 28, 2019

Another fix is to ensure that the upload policy is always deterministic. (I think this is what Kafka Connect S3/GCS does.) So no wallclock-time policies, no upload on shutdown, and maybe no "size on disk after compression" policies. Just things like "number of messages", "size of messages read", maybe "skew in Kafka timestamp from the first one read".
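
For illustration, such a policy can be computed entirely from the consumed messages, so any consumer replaying from the same committed offset makes the same cut decisions. A minimal sketch (the class and threshold names here are made up, not Secor's):

```java
// Purely input-driven upload policy: no wallclock time, no file sizes on disk.
class DeterministicUploadPolicy {
    private final long maxMessages;              // "number of messages"
    private final long maxPayloadBytes;          // "size of messages read"
    private final long maxTimestampRangeMillis;  // "skew in Kafka timestamp from the first one read"

    private long messageCount;
    private long payloadBytes;
    private long firstTimestampMillis = -1;

    DeterministicUploadPolicy(long maxMessages, long maxPayloadBytes, long maxTimestampRangeMillis) {
        this.maxMessages = maxMessages;
        this.maxPayloadBytes = maxPayloadBytes;
        this.maxTimestampRangeMillis = maxTimestampRangeMillis;
    }

    /** Called once per consumed message; depends only on the message itself. */
    boolean shouldUploadAfter(long kafkaTimestampMillis, int payloadSizeBytes) {
        if (firstTimestampMillis < 0) {
            firstTimestampMillis = kafkaTimestampMillis;
        }
        messageCount++;
        payloadBytes += payloadSizeBytes;
        return messageCount >= maxMessages
                || payloadBytes >= maxPayloadBytes
                || kafkaTimestampMillis - firstTimestampMillis >= maxTimestampRangeMillis;
    }

    /** Reset after each successful upload, so every replay makes identical decisions. */
    void reset() {
        messageCount = 0;
        payloadBytes = 0;
        firstTimestampMillis = -1;
    }
}
```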

@glasser
Contributor Author
glasser commented Feb 28, 2019

Hmm, yeah, I'm not feeling great about the "current upload" file solution either. That's because the ZooKeeper distributed lock isn't magic. And in fact, as noted in twitter-archive/commons#5, once it gets the lock it never tells the app code if the lock is lost.

If a process's connection to ZK gets flaky while it's uploading files to storage, it can lose the lock but still continue happily uploading. It will presumably fail to do the ZK write at the end because of the broken connection (maybe?), but the new process can acquire the lock while the old one is still uploading, so deleting the files might not be effective. So yeah, determinism it is!

@HenryCaiHaiying
Contributor
HenryCaiHaiying commented Mar 1, 2019 via email

@HenryCaiHaiying
Contributor
HenryCaiHaiying commented Mar 1, 2019 via email

glasser added a commit to glasser/secor that referenced this issue Mar 1, 2019
This is to avoid pinterest#600.

In this mode, decisions about whether to upload files are *only* based on
properties of the input messages themselves: timestamps and input message
payload size.  We don't care about real-world time, disk file timestamps, or log
file size; we don't support upload on shutdown; and we check for uploads after
every message.

Configuration:

- set secor.upload.deterministic=true
- Configure at least one of secor.max.file.timestamp.range.millis and
  secor.max.input.payload.size.bytes.
- If you've configured secor.max.file.timestamp.range.millis, you must
  set kafka.useTimestamp=true and ensure that your FileReader/FileWriter
  supports timestamps.
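
For reference, a minimal secor.properties fragment for this mode might look like the following; the numeric values are purely illustrative:

```properties
# Illustrative values only; the option names come from the commit message above.
secor.upload.deterministic=true
secor.max.file.timestamp.range.millis=3600000
secor.max.input.payload.size.bytes=134217728
kafka.useTimestamp=true
```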
@HenryCaiHaiying
Contributor

This problem is also solvable if, when we persist Secor's committed offset in ZK, we also encode the associated partitions along with it. So instead of just persisting 11111111 into ZK, we persist 11111111_d=28/h=10_d=28/h=11 into the ZNode.

@HenryCaiHaiying
Contributor

Actually the approach of encoding file names in the ZK offset node won't work, because committing the offset to ZK is the last step of the upload. So if the upload fails, that information (the list of file names) is never persisted.

I will go back to @glasser's first suggestion but modify it a little bit: persist the in-memory lastSeenOffset onto a new ZK node path, lastSeenOffsetPath, and add steps 1.1 and 1.2 (sketched in code after the list):

  1. acquire ZK lock for the current topic/partition
    --- new ---> 1.1 check whether the in-memory lastSeenOffset >= ZK's lastSeenOffset, if not skip the rest;
    --- new ---> 1.2 persist the in-memory lastSeenOffset onto ZK lastSeenOffsetPath
  2. upload a list of files for the current topic/partition
  3. persist the current offset to ZK committedOffset path
  4. release ZK lock
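
Roughly, in code (a sketch only; ZkOffsets and GatedUploaderSketch below are hypothetical stand-ins, not real Secor APIs):

```java
// Hypothetical ZK helper; lastSeenOffset lives on a new node path (lastSeenOffsetPath).
interface ZkOffsets {
    long readLastSeenOffset(String topicPartition);   // e.g. -1 if the node does not exist yet
    void writeLastSeenOffset(String topicPartition, long offset);
    void writeCommittedOffset(String topicPartition, long offset);
}

class GatedUploaderSketch {
    /** Assumes the ZK lock for this topic/partition is acquired before and released after. */
    void maybeUpload(String tp, long inMemoryLastSeenOffset, long offsetToCommit,
                     ZkOffsets zk, Runnable uploadLocalFiles) {
        // 1.1 Skip if a previous consumer already saw further into the partition; uploading now
        //     would produce a shorter, conflicting set of files for the same starting offset.
        if (inMemoryLastSeenOffset < zk.readLastSeenOffset(tp)) {
            return;
        }
        // 1.2 Persist how far we have read before touching storage.
        zk.writeLastSeenOffset(tp, inMemoryLastSeenOffset);
        // 2. Upload the list of files for the current topic/partition.
        uploadLocalFiles.run();
        // 3. Persist the current offset to the ZK committedOffset path (last step, as before).
        zk.writeCommittedOffset(tp, offsetToCommit);
    }
}
```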

HenryCaiHaiying pushed a commit to HenryCaiHaiying/secor that referenced this issue May 26, 2020
…stic upload problem mentioned in pinterest#600

When there was a failed upload attempt for this topic partition, we might end up on S3 with:

    s3n://topic/partition/day/hour=0/offset1
    s3n://topic/partition/day/hour=1/offset1

If that attempt eventually failed and we resume processing on another node, the new node may have fewer files to upload when its upload is triggered too early by a time-based upload policy, e.g. only:

    localfs://topic/partition/day/hour=0/offset1

If we continue uploading, we will upload this file:

    s3n://topic/partition/day/hour=0/offset1

But the next file to be uploaded will become:

    s3n://topic/partition/day/hour=1/offset2

So we will end up with 2 different files for hour=1/.

We should wait a bit longer, until we have caught up to at least the same offset as ZK's.

Added config property secor.upload.last.seen.offset to support the following upload sequence:

1. acquire ZK lock for the current topic/partition
--- new ---> 1.1 check whether the in-memory lastSeenOffset >= ZK's lastSeenOffset, if not skip the rest;
--- new ---> 1.2 persist the in-memory lastSeenOffset onto ZK lastSeenOffsetPath
2. upload a list of files for the current topic/partition
3. persist the current offset to ZK committedOffset path
4. release ZK lock
@HenryCaiHaiying
Contributor

See PR: #1351
