Requesting permission to use kafka-main cluster to transport CirrusSearch updates
Closed, Resolved · Public

Description

Per today's Flink K8s sync, we are requesting permission to use the kafka-main cluster to transport CirrusSearch updates for the Search Update Pipeline redesign.

Quoting @dcausse from the Search Update Pipeline design doc: *

Kafka will hold most of the content required to perform the updates; this means that we will create (at the output of the preparation) events that may be relatively large (up to 4 MB uncompressed). Data shows that the uncompressed payload has a size:

  • average around 20 kB
  • p95 under 100 kB
  • p99 fluctuating between 1 MB and 4 MB

The kafka-main clusters are configured to accept a record batch size of up to 4 MiB and we do not plan to change this limit. Compression is likely to be snappy, so we could expect a 2:1 compression ratio. There are concerns about using kafka this way, but given the use case of this pipeline (read everything and rarely filter) this seems legitimate. Another alternative would be to have access to a generic content store, but there does not seem to be any consensus on this subject yet.
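
For illustration, a minimal producer-config sketch (Java, not the actual pipeline code) consistent with the constraints above; the broker address, topic name, and string key/value types are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UpdateProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-main1001.eqiad.wmnet:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");        // expected ~2:1 compression
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4 * 1024 * 1024); // stay within the 4 MiB limit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The value would be the (possibly multi-MB) prepared update event.
            producer.send(new ProducerRecord<>("cirrussearch.update_pipeline.update", "pageKey", "serializedUpdate"));
        }
    }
}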

@dcausse, @pfischer et al.: feel free to review this and add/change as needed before sending to Service Ops. - Reviewed and ready to send over.

*This content is currently private, but I don't think it holds any sensitive information. I'll ask for approval to make it public.

Event Timeline

bking renamed this task from Requesting permission to use kafka-main to transport CirrusSearch updates to Requesting permission to use kafka-main cluster to transport CirrusSearch updates.Jul 11 2023, 9:09 PM
bking updated the task description.
Gehel triaged this task as High priority.Jul 17 2023, 3:27 PM
Gehel moved this task from needs triage to Current work on the Discovery-Search board.
Gehel edited projects, added Discovery-Search (Current work); removed Discovery-Search.
Gehel moved this task from Incoming to Blocked/Waiting on the Discovery-Search (Current work) board.
Gehel moved this task from Incoming to Blocked / Waiting on the Data-Platform-SRE board.

Can someone please elaborate on what the "generic content store" would need to look like for this project?

Given how much load we have on kafka-main right now, I'd love to explore alternatives.

Could you please also add the expected events/s, requests/s towards the kafka-main cluster?
@elukey is currently in the process of rebalancing the partitions (T341558) on kafka-main - we probably can't give valid answers before that is completed. Also: are there plans on your side to deprecate topics you currently have on kafka-main with this (and if so, is there a timeframe)?

@JMeybohm, @dcausse and I put together a summary of topics that will eventually become obsolete, and those that will be added [1]. Is that sufficient for your estimates?

Regarding the time frame, I would expect the incremental rollout to take two quarters.

A two-quarter rollout would mean that we can't really wait for the removal of the old topics; this would be, for the span of 6 months, a straight addition to the load on kafka-main.

I think we need to wait for Luca to finish his rebalancing job before we give an answer about this; once he's done we'll know more about our ability to add more traffic to the cluster without excessive risks.

The stability of the kafka main clusters is now way better; they are not totally rebalanced yet, but in my opinion this use case can be evaluated.

Hi folks!

I had a chat with Joe and the plan looks good, +1 to proceed.

A few considerations to keep in mind:

  • Please use batching in the producer config; we discovered recently that not using it impacts the overall performance of the Kafka brokers. The bigger the batch the better, but of course this depends on your requirements. No strict number is requested of course :)
  • Since you are going to introduce a couple of new topics with a msg/s rate that belongs to the top 10 of what we have now, please check metrics like RequestHandlerAvgIdlePercent and Produce request time after enabling traffic, so that we can understand the impact on the brokers.
  • Please use 5 topic partitions for the new big topics, so that the traffic will be split evenly among all brokers. If you don't know how to do it I can help in creating the topics :) (a minimal sketch of both the batching config and the topic creation follows below)
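
A minimal sketch of both suggestions, assuming the standard Kafka Java clients; the values and names are illustrative placeholders, not recommendations:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingAndTopicSketch {
    public static void main(String[] args) throws Exception {
        // 1) Producer-side batching (merged into the producer Properties): accumulate
        //    records per partition before sending. Values here are purely illustrative.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 512 * 1024); // up to 512 KiB per batch
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 50);          // wait up to 50 ms to fill a batch

        // 2) Topic creation with 5 partitions and a replication factor of 3.
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-main1001.eqiad.wmnet:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(adminProps)) {
            NewTopic topic = new NewTopic("eqiad.cirrussearch.update_pipeline.update", 5, (short) 3); // placeholder name
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}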

@elukey, thank you for your work on rebalancing and optimising kafka! We're glad to hear that our infrastructure will be able to support our topics. However, we still have three options to choose from [1]:

# | description     | topic size (in GB, 7-day retention)
0 | all-fat events  | 11485.15
1 | some-fat events | 1810.17
2 | all-lean events | 661.35

The trade-off is between load and consistency between MW and ES. Option 0 is the most consistent as all page contents are fetched before writing to kafka. Option 2 is the least consistent as page content is fetched after kafka but right before writing to ES. What option would you be able (and willing) to support?

[1] https://docs.google.com/spreadsheets/d/1Fp44MdLxUVlxi03MBD_64m0zQErny-9jUD5C6RGf_bU/edit#gid=0

@pfischer it is not clear from the google spreadsheet what architecture you have in mind, so it is difficult to judge. Does option 0 mean that you'd fetch all contents, like page_content_change, and store them in the same topic (a sort of aggregator)? If so the option doesn't seem great; at first reading it seems more like an antipattern, but I don't have enough data to judge, it may be totally acceptable :)

On the raw numbers that you provided - option 0 seems to list 11.5 TB of total space required, that is around 2.3 TB for every broker if we split the topic into 5 partitions, way too much for the currently available space on our nodes:

elukey@kafka-main1001:~$ df -h
Filesystem           Size  Used Avail Use% Mounted on
...
/dev/mapper/vg0-srv  3.3T  1.2T  1.9T  39% /srv

Option two, with 5 partitions, would be around 360 GB for each node, which is still a lot but way more manageable.

Edit: One thing that I missed is that there is also the 3x replication factor (for each partition), so my estimates above are not correct; the requirements would be higher.

I am uneasy suggesting one of the types of events without a clearer picture of what changes in what case.

I can say from the numbers that while the first two options are clearly unfeasible as they'd make our storage in kafka grow too much, the third is also somewhat worrisome in terms of size in the queue, and it's not clear to me what the side effects would be. Can you elaborate on that?

In terms of retention, why did you choose 7 days? Do we have a corresponding SLO for the service?

@elukey, @Joe, thank you for your feedback! I revisited the size estimations, here are the updated numbers:

# | description     | raw topic size (in GB, 7-day retention) | per-broker topic size (in GB, 7-day retention, 5 partitions × 3 replicas)
0 | all-fat events  | 11485 | 6891
1 | some-fat events | 1810  | 1086
2 | all-lean events | 661   | 397

We do not have final SLOs for the search update pipeline yet. As of now, we only want to guarantee an update lag of less than 10 minutes. If we assume 99% availability of our application, that would allow ~4 days of outage per year. If those coincide with a long weekend, 7 days should give enough buffer to fix things and to catch up. 6 might be doable, too, but that would only save us 40GB for option 2. That said, the retention is only for our internal topic, which is used to decouple the two parts of our flink application. The producing side is based on already-existing topics (except for one). So if option 2 still requires too much storage, we can reduce the retention and instead replay all the incoming topics to recover after an outage.

Edit: One thing that I missed is that there is also the 3x replication factor (for each partition), so my estimates above are not correct; the requirements would be higher.

I added a column with the effectively required storage per broker.
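
For clarity, the per-broker column is derived by multiplying the raw topic size by the replication factor of 3 and spreading it evenly across the 5 partitions (one replica set per broker):

per-broker size ≈ raw topic size × 3 (replicas) / 5 (partitions)
option 0: 11485 GB × 3 / 5 = 6891 GB
option 1:  1810 GB × 3 / 5 = 1086 GB
option 2:   661 GB × 3 / 5 ≈  397 GB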

I am uneasy suggesting one of the types of events without a clearer picture of what changes in what case.

I can say from the numbers that while the first two options are clearly unfeasible as they'd make our storage in kafka grow too much, the third is also somewhat worrisome in terms of size in the queue, and it's not clear to me what the side effects would be. Can you elaborate on that?

The existing system that is being replaced currently does around 1k messages/sec and has a topic size of around 1.45TB; the suggested option 1 is ~650 messages/sec with a topic size of 1.8TB (per the spreadsheet, I haven't personally dug into those). Different, but not a monumental change. Option 2 is a reduction in kafka usage vs the existing deployment. Option 0 isn't really an option, but is included to understand the impact of options 1 and 2.

The main difference between these options is where, when, and how often the mediawiki api is invoked to fetch the document to index (example). There are two main options for where to perform these api requests: Option 0 is to perform the api calls in the central flink application and write the response into the intermediate messages, while Option 2 is to fetch in the per-cluster flink application that actually pushes the updates into elasticsearch. Option 1 is the middle ground where we do Option 0 for direct page edits, and Option 2 for cascaded updates/rerenders.
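
For context, a rough sketch (not the pipeline's actual code) of what such a fetch looks like as a plain MediaWiki API call; the wiki URL, revision id, and the assumption that prop=cirrusdoc accepts revids like other query props are all illustrative:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CirrusDocFetchSketch {
    public static void main(String[] args) throws Exception {
        long revisionId = 123456789L; // placeholder: taken from the update event
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                "https://en.wikipedia.org/w/api.php?action=query&prop=cirrusdoc&format=json&revids=" + revisionId))
            .GET()
            .build();
        // Option 0: this call happens once, centrally, before writing to kafka.
        // Option 2: it happens in each per-cluster consumer, i.e. 3x as often.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}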

Tradeoffs made:

  • While we request from mediawiki the cirrusdoc for a specific revision id, not all of the embedded content is strictly based on that revision (redirects, templated content, etc.). Fetching centrally will ensure all clusters have the same doc, while fetching per-cluster creates some ability for the clusters to have slightly different data. In theory this works itself out eventually as rerenders due to template or redirect changes work through the system, but it offers fewer guarantees and less consistency between clusters.
  • Fetching the cirrusdoc on a per-cluster basis will be 3x as many api calls. The expense of these calls varies; they need to process the parser output along with pulling a variety of metadata out of sql. Plausibly these calls could be made cacheable and we could fetch them through an http cache. The pre-existing system in mediawiki is a hybrid approach that is the equivalent of 2x api calls (eqiad and codfw share results, cloudelastic processes independently. They used to all be independent but changeprop was running into throughput problems so we now share).

Unknowns:

  • We have talked about running a copy of the main flink application per datacenter, with each datacenter operating independently. We've also talked about running a single copy of the main flink application and letting consumers in each datacenter read it through kafka replication (or kafka stretch). I think we are currently leaning towards a single central flink application, because per-datacenter would be twice as many messages. But I'm not certain.

In terms of retention, why did you choose 7 days? Do we have a corresponding SLO for the service?

We don't have a strict SLO defined here. At a very high level we are talking about starting at 0.99 over a year, which allows for 3.65 days of downtime. In general it would be nice if the system could run into problems late on a friday and not require attention over even a three day weekend. This suggests 4 days of retention would be too little. 7 is arbitrary but felt reasonable. Perhaps we could come up with a method by which the main application has its topic positions set back in time to let it regenerate the data that fell out of retention, but that is plausibly more tedious and error-prone, and would require spare kafka capacity to be available during recovery but not used in normal operations.

Retention plays into re-indexing. When recreating an index it can take more than a day to copy the contents of the old index into the new index on commonswiki and wikidata. Once the new index is ready we need to reprocess the updates that occurred while reindexing was running. This probably only sets a minimum retention of 2-ish days.

Retention similarly plays into debuggability. When a ticket is filed that some edit occurred but didn't make it into the search systems (ex: T342593), being able to look through the updates is helpful. For debugging it doesn't have to stay in kafka though; a copy in hdfs would be equally (perhaps more) viable.
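
For reference, assuming retention is expressed via the standard topic-level retention.ms setting, the figures discussed here translate to:

retention.ms for 7 days            = 7 × 24 × 3600 × 1000 = 604,800,000 ms
retention.ms for the ~2-day reindexing floor = 2 × 24 × 3600 × 1000 = 172,800,000 ms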

This comment was removed by bking.

I am uneasy suggesting one of the types of events without a clearer picture of what changes in what case.

I can say from the numbers that while the first two options are clearly unfeasible as they'd make our storage in kafka grow too much, the third is also somewhat worrisome in terms of size in the queue, and it's not clear to me what the side effects would be. Can you elaborate on that?

The existing system that is being replaced currently does around 1k messages/sec and has a topic size of around 1.45TB; the suggested option 1 is ~650 messages/sec with a topic size of 1.8TB (per the spreadsheet, I haven't personally dug into those). Different, but not a monumental change. Option 2 is a reduction in kafka usage vs the existing deployment. Option 0 isn't really an option, but is included to understand the impact of options 1 and 2.

I have different numbers from the grafana dashboard - the total storage space used by the cirrus search topics is usually well below 1 TB - which can be verified by evaluating the prometheus query sum (kafka_log_Size{kafka_cluster=~"main-eqiad", topic=~"eqiad\\.mediawiki\\.job\\.cirrus.*"}). So under that hypothesis you would be doubling your current occupancy and growing the total storage on the cluster by 40%.

The main difference between these options is where, when, and how often the mediawiki api is invoked to fetch the document to index (example). There are two main options for where to perform these api requests: Option 0 is to perform the api calls in the central flink application and write the response into the intermediate messages, while Option 2 is to fetch in the per-cluster flink application that actually pushes the updates into elasticsearch. Option 1 is the middle ground where we do Option 0 for direct page edits, and Option 2 for cascaded updates/rerenders.

The advantage of option 2 is that you're making read-only requests, so we can dispatch the api request to the local datacenter, which will reduce the amount of cross-dc traffic produced. The obvious advantage of option 0 is an overall smaller footprint.

I must also ask: does it make sense to re-index content when there is a cascaded update for a template change? This is a fundamental question and I think the answer is "generally no", but I guess you have done some research on the topic.
Keep in mind that at the moment we're not re-generating parsercache for re-renders because it was deemed too expensive, so every api call for a re-render risks being very expensive to us, or, worse, receiving outdated content. If at all possible, we should avoid updating search for re-renders / template transclusions.

If we can avoid re-rendering all the time for such changes, I would go with option 0 without doubt. So I would be ok with option 1 I guess?

Tradeoffs made:

  • While we request from mediawiki the cirrusdoc for a specific revision id, not all of the embedded content is strictly based on that revision (redirects, templated content, etc.). Fetching centrally will ensure all clusters have the same doc, while fetching per-cluster creates some ability for the clusters to have slightly different data. In theory this works itself out eventually as rerenders due to template or redirect changes work through the system, but it offers fewer guarantees and less consistency between clusters.

Is there a reason why more consistency is desirable? If not, coordinating things cross-dc has a non-negligible cost and should definitely be avoided as much as possible. I should also note that if you want consistency, you also need a process to check such consistency and a reconciliation method. Also: the data coming out of MediaWiki can itself be lacking. I don't think consistency across datacenters is really our major concern here, but even if it is, centralizing querying of the API is just one step in that direction. I think there are other reasons why centralization might be desirable.

  • Fetching the cirrusdoc on a per-cluster basis will be 3x as many api calls. The expense of these calls varies; they need to process the parser output along with pulling a variety of metadata out of sql. Plausibly these calls could be made cacheable and we could fetch them through an http cache. The pre-existing system in mediawiki is a hybrid approach that is the equivalent of 2x api calls (eqiad and codfw share results, cloudelastic processes independently. They used to all be independent but changeprop was running into throughput problems so we now share).

Unknowns:

  • We have talked about running a copy of the main flink application per datacenter, with each datacenter operating independently. We've also talked about running a single copy of the main flink application and letting consumers in each datacenter read it through kafka replication (or kafka stretch). I think we are currently leaning towards a single central flink application, because per-datacenter would be twice as many messages. But I'm not certain.

I'm not sure I fully grasp the model, do you have any diagram showing the two alternatives? Or a less succinct explanation? I kinda think this model doesn't fit well with multi-dc but I just might have completely misunderstood it.

So to boil down to my recommendations if you want to keep the events in flink:

  • While it might be ok to grow our kafka cluster's topic size by 40%, it's a very significant change, and without an expansion/refresh of the cluster I would recommend tuning the space occupancy with reduced retention, at least until we're able to dismiss the old jobs. Once that's possible, the growth would be "just" 20% compared to what we have now, and we can go back to 7 days of retention.
  • I would try to consider whether there's a way to reduce the number of updates we make to the search indexes based on template transclusions and re-renders.

@Joe, thank you for your feedback!

I'm not sure I fully grasp the model, do you have any diagram showing the two alternatives? Or a less succinct explanation? I kinda think this model doesn't fit well with multi-dc but I just might have completely misunderstood it.

[Attached diagram: Search Update Pipeline - Multi DC.png, 176 KB; also available as a Google Draw document]

I have different numbers from the grafana dashboard - the total storage space used by the cirrus search topics is usually well below 1 TB - which can be verified by evaluating the prometheus query sum (kafka_log_Size{kafka_cluster=~"main-eqiad", topic=~"eqiad\\.mediawiki\\.job\\.cirrus.*"}). So under that hypothesis you would be doubling your current occupancy and growing the total storage on the cluster by 40%.

This is missing eqiad.cpjobqueue.partitioned.mediawiki.job.cirrusSearchElasticaWrite which is another ~450GB in the current deployment. If I update the query to sum (kafka_log_Size{kafka_cluster=~"main-eqiad", topic=~"eqiad\\.(cpjobqueue\\.partitioned\\.)?mediawiki\\.job\\.cirrus.*"}) I get a current size of around 1.5TB.

The main difference between these options is where, when, and how often the mediawiki api is invoked to fetch the document to index (example). There are two main options for where to perform these api requests: Option 0 is to perform the api calls in the central flink application and write the response into the intermediate messages, while Option 2 is to fetch in the per-cluster flink application that actually pushes the updates into elasticsearch. Option 1 is the middle ground where we do Option 0 for direct page edits, and Option 2 for cascaded updates/rerenders.

The advantage of option 2 is that you're making read-only requests, so we can dispatch the api request to the local datacenter, which will reduce the amount of cross-dc traffic produced. The obvious advantage of option 0 is an overall smaller footprint.

I must also ask: does it make sense to re-index content when there is a cascaded update for a template change? This is a fundamental question and I think the answer is "generally no", but I guess you have done some research on the topic.
Keep in mind that at the moment we're not re-generating parsercache for re-renders because it was deemed too expensive, so every api call for a re-render risks being very expensive to us, or, worse, receiving outdated content. If at all possible, we should avoid updating search for re-renders / template transclusions.

If we can avoid re-rendering all the time for such changes, I would go with option 0 without doubt. So I would be ok with option 1 I guess?

I suppose we don't have hard data about how much the search index changes based on template transclusions, and how much editors depend (or not) on the template updates. At a high level some editors use search for generating lists of work and depend on the items leaving the search results (sometimes after waiting a day or more for the recursive jobs to process). Index updates have been attached to these as long as I can remember and we've never really considered disabling them. I imagine we'll have to evaluate this a bit; I can't immediately provide any specific data here.

Tradeoffs made:

  • While we request from mediawiki the cirrusdoc for a specific revision id, not all of the embedded content is strictly based on that revision (redirects, templated content, etc.). Fetching centrally will ensure all clusters have the same doc, while fetching per-cluster creates some ability for the clusters to have slightly different data. In theory this works itself out eventually as rerenders due to template or redirect changes work through the system, but it offers fewer guarantees and less consistency between clusters.

Is there a reason why more consistency is desirable? If not, coordinating things cross-dc has a non-negligible cost and should definitely be avoided as much as possible. I should also note that if you want consistency, you also need a process to check such consistency and a reconciliation method. Also: the data coming out of MediaWiki can itself be lacking. I don't think consistency across datacenters is really our major concern here, but even if it is, centralizing querying of the API is just one step in that direction. I think there are other reasons why centralization might be desirable.

The question of what level of consistency is desirable is a bit tough to answer. We don't have any particularly strict requirements that I'm aware of; like so many things, what we have is the expected behaviour that editors are used to. Indeed there isn't anything that strictly requires consistency, but as an update process for a secondary data source it seems an important part of the problem is ensuring the primary and secondary data sources contain the same data. If different clusters can have different data then it is certain that one of the two doesn't match the current state of the primary. I think this is a case where more consistency would be a nice property to have, but it is not required and can certainly give way to other needs.

Within the existing deployment we have a process called the Saneitizer (which keeps indices sane) that does consistency checking and issues jobs to perform reconciliation between the sql databases and the search clusters. Due to the total number of pages this is a slow process; pages are checked around twice a month. This is also relatively limited, ensuring pages are in the correct index with the expected revision id. We've talked a bit but we don't yet have a fleshed-out answer for what the complement to the Saneitizer is in the streaming updater world. We don't have plans to regularly compare consistency between search clusters afaik, although we do have a script that does it that we've used for debugging in the past.

  • Fetching the cirrusdoc on a per-cluster basis will be 3x as many api calls. The expense of these calls varies; they need to process the parser output along with pulling a variety of metadata out of sql. Plausibly these calls could be made cacheable and we could fetch them through an http cache. The pre-existing system in mediawiki is a hybrid approach that is the equivalent of 2x api calls (eqiad and codfw share results, cloudelastic processes independently. They used to all be independent but changeprop was running into throughput problems so we now share).

Unknowns:

  • We have talked about running a copy of the main flink application per datacenter, with each datacenter operating independently. We've also talked about running a single copy of the main flink application and letting consumers in each datacenter read it through kafka replication (or kafka stretch). I think we are currently leaning towards a single central flink application, because per-datacenter would be twice as many messages. But I'm not certain.

I'm not sure I fully grasp the model, do you have any diagram showing the two alternatives? Or a less succinct explanation? I kinda think this model doesn't fit well with multi-dc but I just might have completely misunderstood it.

I'm terrible at diagrams, but perhaps I can be more clear. Also, I think I misspoke above: there are twice as many messages produced by our application when running with an active producer per datacenter, but functionally it's the same as long as we make sure mirrormaker doesn't copy the topics between datacenters.

  • In both possible solutions:
    • Three consumer instances, one for each elasticsearch cluster, running in the same dc as the cluster (one in codfw, two in eqiad).
  • If we were to use kafka stretch or mirrormaker:
    • One producer instance running in each datacenter reading mediawiki events from its own datacenter. Functionally this means only one producer is generating updates, since only one datacenter is generating mediawiki events.
    • Infrastructure responsible for delivering events from the producer to consumers in other datacenters
    • Consumers read from topics, potentially mirrored over, initially produced by whichever datacenter is triggering the mediawiki events
  • If we were to avoid allowing our topics to be mirrored:
    • Infrastructure responsible for delivering events from mediawiki to the producers in each datacenter.
    • One producer instance running in each datacenter reading mediawiki events from all datacenters
    • Topics emitted from the producer are not mirrored between datacenters
    • Consumers read from topics initially produced in their own datacenter

So to boil down to my recommendations if you want to keep the events in flink:

  • While it might be ok to grow our kafka cluster's topic size by 40%, it's a very significant change, and without an expansion/refresh of the cluster I would recommend tuning the space occupancy with reduced retention, at least until we're able to dismiss the old jobs. Once that's possible, the growth would be "just" 20% compared to what we have now, and we can go back to 7 days of retention.
  • I would try to consider whether there's a way to reduce the number of updates we make to the search indexes based on template transclusions and re-renders.

This all sounds quite reasonable, thanks for the review.

DECISION (as discussed in synchronous meeting):

  • Reading bulk data is done from the consumer (at the end of the pipeline), increasing reads for each ES cluster (all-lean events)
  • We use kafka-main to store intermediate events
  • Each Flink enricher consumes only events from the local DC
  • Each Flink indexer (3 instances: eqiad / codfw / cloudelastic) consumes the replicated streams from both DCs and updates its Elasticsearch instance (a consumption sketch follows below)
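
A minimal sketch of the decided consumption pattern using Flink's KafkaSource; topic names, broker addresses, and group ids are placeholders, and the enrichment and Elasticsearch sink logic is omitted:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiDcConsumptionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enricher: consumes only events produced in the local DC (eqiad, as an example).
        KafkaSource<String> localOnly = KafkaSource.<String>builder()
            .setBootstrapServers("kafka-main1001.eqiad.wmnet:9092")      // placeholder
            .setTopics("eqiad.cirrussearch.update_pipeline.update")      // placeholder topic name
            .setGroupId("cirrus-update-enricher")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // Indexer: consumes the replicated streams from both DCs before writing to its ES cluster.
        KafkaSource<String> bothDcs = KafkaSource.<String>builder()
            .setBootstrapServers("kafka-main1001.eqiad.wmnet:9092")      // placeholder
            .setTopics("eqiad.cirrussearch.update_pipeline.update",
                       "codfw.cirrussearch.update_pipeline.update")      // placeholder topic names
            .setGroupId("cirrus-update-indexer-eqiad")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // Enrichment and the Elasticsearch sink are omitted; print() stands in for them.
        env.fromSource(localOnly, WatermarkStrategy.noWatermarks(), "local-updates").print();
        env.fromSource(bothDcs, WatermarkStrategy.noWatermarks(), "all-dc-updates").print();
        env.execute("multi-dc-consumption-sketch");
    }
}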