
Create a generic stream to populate CirrusSearch weighted_tags
Open, In Progress, High, Public

Description

It would be useful to have a stream that supports adding and deleting CirrusSearch weighted_tags.
Such a stream would let users who want to tag or un-tag pages in the search index simply emit events to it.
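For illustration, an event on such a stream might look like the following sketch; the field names, schema path, and stream name are hypothetical and not the final schema:

```json
{
  "$schema": "/cirrussearch/weighted_tags/1.0.0",
  "meta": {
    "stream": "cirrussearch.weighted_tags",
    "dt": "2024-07-18T12:00:00Z"
  },
  "wiki_id": "enwiki",
  "page_id": 12345,
  "set": {
    "classification.ores.articletopic": [
      { "tag": "history", "score": 0.8 }
    ]
  },
  "clear": [
    "recommendation.link"
  ]
}
```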

There might be 2 different use-cases to support:

  • realtime processes bound to the lifecycle of the page
  • batch processes possibly sending a large number of modifications

We might consider exposing 2 different streams, giving us the opportunity to route or throttle the events accordingly:

  • events bound to the lifecycle of the page might enter the merge window of the SUP producer so that they get a chance to be joined with other events related to the same edit
  • events produced in batch might skip that window and possibly be throttled (if deemed necessary) to limit the impact on latencies of the realtime events.
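The routing decision above could be sketched as follows; this is a minimal stand-in, assuming each event carries a flag distinguishing batch from realtime producers (the event shape and the `isBatch` field are assumptions, not part of any agreed schema):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: routing weighted_tags events into two logical streams.
public class WeightedTagsRouter {
    // Minimal stand-in for a weighted_tags event; field names are illustrative.
    public record TagEvent(long pageId, String tag, boolean isBatch) {}

    // Events bound to the page lifecycle; candidates for the SUP merge window.
    public final List<TagEvent> realtime = new ArrayList<>();
    // Bulk events; may skip the merge window and be throttled if necessary.
    public final List<TagEvent> batch = new ArrayList<>();

    public void route(TagEvent e) {
        if (e.isBatch()) {
            batch.add(e);
        } else {
            realtime.add(e);
        }
    }
}
```

In a real deployment the two lists would instead be two Kafka topics (or one topic with routing metadata), so that the producer pipeline can apply different windowing and throttling per path.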

AC:

Event Timeline

Restricted Application added a subscriber: Aklapper.

From a SUP perspective this would replace all sources of weighted tags (config option: stream name):

  • article-topic-stream: mediawiki.page_outlink_topic_prediction_change.v1
  • draft-topic-stream: mediawiki.revision_score_drafttopic
  • recommendation-create-stream: mediawiki.revision-recommendation-create

Regarding rate limiting: is there a way to rate-limit a Kafka source itself? The only option I see would be to wrap a KafkaSource (flink-connector-kafka) in a custom RateLimitedKafkaSource (to be created) that forwards all calls; only the createReader method's response would be wrapped in a RateLimitedSourceReader (flink-core). But I don't know how that affects back pressure. At the very least, it would be confusing if the back-pressure metric goes up due to the rate limiting.

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

// Wraps a KafkaSource, delegating everything except createReader,
// whose result is wrapped in a RateLimitedSourceReader.
public class RateLimitedKafkaSource<O> implements Source<O, KafkaPartitionSplit, KafkaSourceEnumState> {
    private final KafkaSource<O> delegate;
    private final RateLimiterStrategy rateLimiterStrategy;

    RateLimitedKafkaSource(KafkaSource<O> delegate, RateLimiterStrategy rateLimiterStrategy) {
        this.delegate = delegate;
        this.rateLimiterStrategy = rateLimiterStrategy;
    }

    @Override
    @Internal
    public SourceReader<O, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        // The strategy splits the configured total rate across parallel subtasks.
        return new RateLimitedSourceReader<>(
                delegate.createReader(readerContext),
                rateLimiterStrategy.createRateLimiter(readerContext.currentParallelism()));
    }

    /* … remaining delegate methods … */
}
env.fromSource(new RateLimitedKafkaSource<>(source, RateLimiterStrategy.perSecond(10)), /* … */)

> From a SUP perspective this would replace all sources of weighted tags (config option: stream name):
>
>   • article-topic-stream: mediawiki.page_outlink_topic_prediction_change.v1
>   • draft-topic-stream: mediawiki.revision_score_drafttopic
>   • recommendation-create-stream: mediawiki.revision-recommendation-create

Ideally yes, but it is unclear if these producers will be willing to support our new stream, so we might need to keep those.

> Regarding rate limiting: is there a way to rate-limit a Kafka source itself? The only option I see would be to wrap a KafkaSource (flink-connector-kafka) in a custom RateLimitedKafkaSource (to be created) that forwards all calls […] But I don't know how that affects back pressure.

I'm not clear on how rate limiting could be applied, but I believe you're right that it must happen at the source level; otherwise it'll most certainly affect back-pressure.
It is also unclear whether rate limiting will be required at all; we might have to test how the pipeline behaves under such conditions and see whether our SLOs related to update lag are affected during bulk loads. Flink used to have rate limiting built into its old Kafka connector, but it was dropped in the new Kafka source API, which seems to suggest that they no longer consider rate limiting a valid use case for Flink.
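To make the source-level throttling concrete: a rate limiter at the reader effectively spaces out record emission rather than rejecting records. The sketch below is a minimal, deterministic pacing calculation under that assumption; it is not Flink's implementation (Flink's RateLimiterStrategy additionally divides the total budget by the source parallelism):

```java
// Hypothetical sketch of source-level pacing: each call reserves the next
// emission slot and reports how long the caller should wait before emitting.
public class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between records
    private long nextFreeAtNanos = 0; // earliest time the next record may go out

    public SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    // Returns the wait (in ns) before the caller may emit, given the current time.
    public synchronized long reserveNanos(long nowNanos) {
        long waitFor = Math.max(0L, nextFreeAtNanos - nowNanos);
        nextFreeAtNanos = Math.max(nowNanos, nextFreeAtNanos) + intervalNanos;
        return waitFor;
    }
}
```

Because the reader waits instead of dropping records, Kafka offsets simply advance more slowly; the consumer lag grows, which is exactly the back-pressure-like signal discussed above.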

Gehel triaged this task as High priority. · Jun 10 2024, 3:39 PM
Gehel moved this task from needs triage to ML & Data Pipeline on the Discovery-Search board.
pfischer changed the task status from Open to In Progress. · Thu, Jul 18, 12:19 PM
pfischer claimed this task.

Change #1055226 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[schemas/event/primary@master] Introducing cirrussearch/weighted_tags

https://gerrit.wikimedia.org/r/1055226