
topicmappr marshal error on kafka-logging cluster
Closed, ResolvedPublic

Description

As part of T369256 I'm rebalancing the mediawiki.httpd.accesslog topic on the kafka-logging cluster (both eqiad and codfw, though focusing on eqiad since that's where logs are written all the time; codfw is idle).

I first wanted to test topicmappr to equalize throughput, following the instructions at https://wikitech.wikimedia.org/wiki/Kafka/Administration#Rebalance_topic_partitions_to_equalize_broker_storage

kafka-logging1001:~$ prometheus-metricsfetcher 
time="2024-07-16T09:13:21.242Z" level=info msg="Getting broker storage stats from Prometheus"
time="2024-07-16T09:13:21.243Z" level=info msg="Connected to [2620:0:861:101:10:64:0:207]:2181" logger=zk
time="2024-07-16T09:13:21.245Z" level=info msg="authenticated: id=5983579338673291539, timeout=20000" logger=zk
time="2024-07-16T09:13:21.245Z" level=info msg="re-submitting `0` credentials after reconnect" logger=zk
time="2024-07-16T09:13:21.253Z" level=info msg="Broker ID Map: map[kafka-logging1001:9100:1001 kafka-logging1002:9100:1002 kafka-logging1003:9100:1006 kafka-logging1004:9100:1004 kafka-logging1005:9100:1005]"
time="2024-07-16T09:13:21.253Z" level=warning msg="Returned metrics not equal to broker ID override map"
time="2024-07-16T09:13:21.253Z" level=info msg="Getting partition sizes from Prometheus"
time="2024-07-16T09:13:21.299Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/partitionmeta"
time="2024-07-16T09:13:21.305Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/brokermetrics"
time="2024-07-16T09:13:21.309Z" level=info msg="recv loop terminated: err=EOF" logger=zk
kafka-logging1001:~$ topicmappr rebalance --brokers -2 --topics mediawiki.httpd.accesslog --out-file cluster-rebalance
Error unmarshalling broker metrics: json: cannot unmarshal number logging-hd1001:9100 into Go value of type int

And I'm not quite sure whether we've seen this before or where to go next. I'm also confused by logging-hd1001:9100 popping up, since that is an OpenSearch storage host. Maybe @brouberol you have seen this before and/or can assist? Thank you!

Event Timeline

I ran

brouberol@kafka-logging1005:~$ prometheus-metricsfetcher -compress=false
time="2024-07-19T12:46:16.346Z" level=info msg="Getting broker storage stats from Prometheus"
time="2024-07-19T12:46:16.347Z" level=info msg="Connected to [2620:0:861:101:10:64:0:207]:2181" logger=zk
time="2024-07-19T12:46:16.349Z" level=info msg="authenticated: id=5983579338673291548, timeout=20000" logger=zk
time="2024-07-19T12:46:16.349Z" level=info msg="re-submitting `0` credentials after reconnect" logger=zk
time="2024-07-19T12:46:16.563Z" level=info msg="Broker ID Map: map[kafka-logging1001:9100:1001 kafka-logging1002:9100:1002 kafka-logging1003:9100:1006 kafka-logging1004:9100:1004 kafka-logging1005:9100:1005]"
time="2024-07-19T12:46:16.563Z" level=warning msg="Returned metrics not equal to broker ID override map"
time="2024-07-19T12:46:16.563Z" level=info msg="Getting partition sizes from Prometheus"
time="2024-07-19T12:46:16.607Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/partitionmeta"
time="2024-07-19T12:46:16.611Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/brokermetrics"
time="2024-07-19T12:46:16.614Z" level=info msg="recv loop terminated: err=EOF" logger=zk

After which, I was able to inspect the data written to ZooKeeper:

>>> import os
>>> from kazoo.client import KazooClient as ZK
>>> zk = ZK(hosts=os.environ['TOPICMAPPR_ZK_ADDR'])
>>> zk.start()
>>> zk.get('/kafka/logging-eqiad/topicmappr/brokermetrics')[0]
b'{"1001":{"StorageFree":1956792328192},"1002":{"StorageFree":2216329617408},"1004":{"StorageFree":1526096535552},"1005":{"StorageFree":1771090173952},"1006":{"StorageFree":2029153849344},"logging-hd1001:9100":{"StorageFree":1591497449472},"logging-hd1002:9100":{"StorageFree":1606957555712},"logging-hd1003:9100":{"StorageFree":1606618304512},"logstash1026:9100":{"StorageFree":1837025230848},"logstash1027:9100":{"StorageFree":966301868032},"logstash1028:9100":{"StorageFree":1215237312512},"logstash1029:9100":{"StorageFree":1849835425792},"logstash1033:9100":{"StorageFree":2642813374464},"logstash1034:9100":{"StorageFree":2732499357696},"logstash1035:9100":{"StorageFree":3271194734592},"logstash1036:9100":{"StorageFree":1878232752128},"logstash1037:9100":{"StorageFree":2854542331904}}'

You can see that Prometheus answers with data keyed both by pure broker IDs (which can be cast to int) and by values such as logstash1034:9100 or logging-hd1003:9100, which look like a hostname plus a Prometheus node-exporter port.
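topicmappr unmarshals these JSON keys into Go integer broker IDs, which is why the mixed keys break it. A minimal Python sketch over a trimmed sample of the same payload (a few representative keys only, not the full blob) shows which keys are integer-castable broker IDs and which are stray node-exporter instances:

```python
import json

# Trimmed sample of the brokermetrics payload stored in ZooKeeper.
payload = json.dumps({
    "1001": {"StorageFree": 1956792328192},
    "1002": {"StorageFree": 2216329617408},
    "logging-hd1001:9100": {"StorageFree": 1591497449472},
    "logstash1034:9100": {"StorageFree": 2732499357696},
})

broker_ids, strays = [], []
for key in json.loads(payload):
    try:
        broker_ids.append(int(key))  # topicmappr expects an int broker ID here
    except ValueError:
        strays.append(key)           # keys like these trigger the unmarshal error

print(broker_ids)  # [1001, 1002]
print(strays)      # ['logging-hd1001:9100', 'logstash1034:9100']
```

Any key in the second bucket is what Go's encoding/json chokes on when the target type is an int-keyed map, producing exactly the "cannot unmarshal number ... into Go value of type int" error above.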

prometheus-metricsfetcher sends the query defined in the KAFKA_KIT_METRICSFETCHER_BROKER_STORAGE_QUERY env var, which is

KAFKA_KIT_METRICSFETCHER_BROKER_STORAGE_QUERY=node_filesystem_avail_bytes{cluster="logstash", mountpoint="/srv"}

and then maps these instance names to broker IDs via

KAFKA_KIT_METRICSFETCHER_BROKER_ID_MAP=kafka-logging1001:9100=1001,kafka-logging1002:9100=1002,kafka-logging1003:9100=1006,kafka-logging1004:9100=1004,kafka-logging1005:9100=1005
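Assuming the override map follows the instance=id,instance=id format shown above, the mapping step can be sketched like this (a hypothetical helper for illustration, not metricsfetcher's actual code):

```python
# Parse a KAFKA_KIT_METRICSFETCHER_BROKER_ID_MAP-style string into a dict.
# Hypothetical sketch of the mapping step, not metricsfetcher's implementation.
def parse_broker_id_map(raw: str) -> dict:
    return {
        instance: int(broker_id)
        for instance, broker_id in (pair.split("=") for pair in raw.split(","))
    }

id_map = parse_broker_id_map(
    "kafka-logging1001:9100=1001,kafka-logging1002:9100=1002,"
    "kafka-logging1003:9100=1006"
)
print(id_map["kafka-logging1003:9100"])  # 1006

# A series whose instance label is absent from the map (e.g. logstash1034:9100)
# cannot be rewritten to a broker ID -- hence the
# "Returned metrics not equal to broker ID override map" warning in the logs.
print("logstash1034:9100" in id_map)  # False
```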

The issue here is that logstash1034:9100 and the like are not mapped to a broker ID. I think the easiest way out is to tweak the query to exclude these timeseries, like so:

brouberol@kafka-logging1005:~$ export KAFKA_KIT_METRICSFETCHER_BROKER_STORAGE_QUERY='node_filesystem_avail_bytes{cluster="logstash", mountpoint="/srv", instance=~"^kafka-.+"}'
brouberol@kafka-logging1005:~$ prometheus-metricsfetcher -compress=false
time="2024-07-19T12:55:34.899Z" level=info msg="Getting broker storage stats from Prometheus"
time="2024-07-19T12:55:34.899Z" level=info msg="Connected to [2620:0:861:101:10:64:0:207]:2181" logger=zk
time="2024-07-19T12:55:34.901Z" level=info msg="authenticated: id=5983579338673291552, timeout=20000" logger=zk
time="2024-07-19T12:55:34.901Z" level=info msg="re-submitting `0` credentials after reconnect" logger=zk
time="2024-07-19T12:55:34.909Z" level=info msg="Broker ID Map: map[kafka-logging1001:9100:1001 kafka-logging1002:9100:1002 kafka-logging1003:9100:1006 kafka-logging1004:9100:1004 kafka-logging1005:9100:1005]"
time="2024-07-19T12:55:34.909Z" level=info msg="Getting partition sizes from Prometheus"
time="2024-07-19T12:55:34.949Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/partitionmeta"
time="2024-07-19T12:55:34.954Z" level=info msg="writing data to /kafka/logging-eqiad/topicmappr/brokermetrics"
time="2024-07-19T12:55:34.956Z" level=info msg="recv loop terminated: err=EOF" logger=zk
time="2024-07-19T12:55:34.956Z" level=info msg="send loop terminated: err=<nil>" logger=zk
brouberol@kafka-logging1005:~$ topicmappr rebalance --brokers -2 --topics mediawiki.httpd.accesslog --out-file cluster-rebalance

Topics:
  mediawiki.httpd.accesslog
  mediawiki.httpd.accesslog-sampled

Validating broker list:

Brokers targeted for partition offloading (>= 20.00% threshold below hmean):

Reassignment parameters:
  Ignoring partitions smaller than 512MB
  Free storage mean, harmonic mean: 1773.00GB, 1744.40GB
  Broker free storage limits (with a 61.00% tolerance from mean):
    Sources limited to <= 2854.52GB
    Destinations limited to >= 691.47GB
  -
  Total relocation volume: 0.00GB

Partition map changes:
  mediawiki.httpd.accesslog p0: [1001 1002 1005] -> [1001 1002 1005] no-op
  mediawiki.httpd.accesslog p1: [1004 1001 1002] -> [1004 1001 1002] no-op
  mediawiki.httpd.accesslog p2: [1001 1002 1004] -> [1001 1002 1004] no-op
  mediawiki.httpd.accesslog p3: [1006 1002 1001] -> [1006 1002 1001] no-op
  mediawiki.httpd.accesslog p4: [1001 1006 1002] -> [1001 1006 1002] no-op
  mediawiki.httpd.accesslog p5: [1005 1002 1001] -> [1005 1002 1001] no-op
  mediawiki.httpd.accesslog p6: [1002 1006 1004] -> [1002 1006 1004] no-op
  mediawiki.httpd.accesslog p7: [1006 1004 1005] -> [1006 1004 1005] no-op
  mediawiki.httpd.accesslog p8: [1004 1005 1001] -> [1004 1005 1001] no-op
  mediawiki.httpd.accesslog p9: [1005 1001 1002] -> [1005 1001 1002] no-op
  mediawiki.httpd.accesslog p10: [1001 1006 1004] -> [1001 1006 1004] no-op
  mediawiki.httpd.accesslog p11: [1002 1004 1005] -> [1002 1004 1005] no-op
  mediawiki.httpd.accesslog-sampled p0: [1006 1001 1005] -> [1006 1001 1005] no-op
  mediawiki.httpd.accesslog-sampled p1: [1004 1005 1001] -> [1004 1005 1001] no-op
  mediawiki.httpd.accesslog-sampled p2: [1005 1001 1002] -> [1005 1001 1002] no-op
  mediawiki.httpd.accesslog-sampled p3: [1001 1002 1006] -> [1001 1002 1006] no-op
  mediawiki.httpd.accesslog-sampled p4: [1002 1006 1001] -> [1002 1006 1001] no-op
  mediawiki.httpd.accesslog-sampled p5: [1006 1002 1001] -> [1006 1002 1001] no-op

Broker distribution:
  degree [min/max/avg]: 4/4/4.00 -> 4/4/4.00
  -
  Broker 1001 - leader: 5, follower: 10, total: 15
  Broker 1002 - leader: 3, follower: 10, total: 13
  Broker 1004 - leader: 3, follower: 5, total: 8
  Broker 1005 - leader: 3, follower: 6, total: 9
  Broker 1006 - leader: 4, follower: 5, total: 9

Storage free change estimations:
  range: 642.72GB -> 642.72GB
  range spread: 45.16% -> 45.16%
  std. deviation: 219.01GB -> 219.01GB
  min-max: 1423.35GB, 2066.07GB -> 1423.35GB, 2066.07GB
  -
  Broker 1001: 1826.89 -> 1826.89 (+0.00GB, 0.00%)
  Broker 1002: 2066.07 -> 2066.07 (+0.00GB, 0.00%)
  Broker 1004: 1423.35 -> 1423.35 (+0.00GB, 0.00%)
  Broker 1005: 1654.40 -> 1654.40 (+0.00GB, 0.00%)
  Broker 1006: 1894.28 -> 1894.28 (+0.00GB, 0.00%)

WARN:
  [none]

No partition reassignments, skipping map generation

Note: the -compress=false flag is only used here so the data in ZooKeeper can be inspected visually.

Note #2: the rebalance command uses storage data to perform a rebalance. If you only aim to move partition leadership between brokers, you can do that with

brouberol@kafka-logging1005:~$ topicmappr rebuild --optimize-leadership --topics 'mediawiki.httpd.accesslog.*' --brokers -1

Topics:
  mediawiki.httpd.accesslog
  mediawiki.httpd.accesslog-sampled

Broker change summary:
  Replacing 0, added 0, missing 0, total count changed by 0

Action:
  Optimizing leader/follower ratios

Partition map changes:
  mediawiki.httpd.accesslog p0: [1001 1002 1005] -> [1002 1001 1005] preferred leader
  mediawiki.httpd.accesslog p1: [1004 1001 1002] -> [1001 1004 1002] preferred leader
  mediawiki.httpd.accesslog p2: [1001 1002 1004] -> [1004 1001 1002] preferred leader
  mediawiki.httpd.accesslog p3: [1006 1002 1001] -> [1002 1001 1006] preferred leader
  mediawiki.httpd.accesslog p4: [1001 1006 1002] -> [1001 1006 1002] no-op
  mediawiki.httpd.accesslog p5: [1005 1002 1001] -> [1001 1005 1002] preferred leader
  mediawiki.httpd.accesslog p6: [1002 1006 1004] -> [1002 1006 1004] no-op
  mediawiki.httpd.accesslog p7: [1006 1004 1005] -> [1004 1006 1005] preferred leader
  mediawiki.httpd.accesslog p8: [1004 1005 1001] -> [1005 1001 1004] preferred leader
  mediawiki.httpd.accesslog p9: [1005 1001 1002] -> [1005 1001 1002] no-op
  mediawiki.httpd.accesslog p10: [1001 1006 1004] -> [1006 1001 1004] preferred leader
  mediawiki.httpd.accesslog p11: [1002 1004 1005] -> [1005 1004 1002] preferred leader
  mediawiki.httpd.accesslog-sampled p0: [1006 1001 1005] -> [1001 1006 1005] preferred leader
  mediawiki.httpd.accesslog-sampled p1: [1004 1005 1001] -> [1004 1001 1005] preferred leader
  mediawiki.httpd.accesslog-sampled p2: [1005 1001 1002] -> [1001 1005 1002] preferred leader
  mediawiki.httpd.accesslog-sampled p3: [1001 1002 1006] -> [1002 1006 1001] preferred leader
  mediawiki.httpd.accesslog-sampled p4: [1002 1006 1001] -> [1002 1006 1001] no-op
  mediawiki.httpd.accesslog-sampled p5: [1006 1002 1001] -> [1006 1001 1002] preferred leader

Broker distribution:
  degree [min/max/avg]: 4/4/4.00 -> 4/4/4.00
  -
  Broker 1001 - leader: 5, follower: 10, total: 15
  Broker 1002 - leader: 5, follower: 8, total: 13
  Broker 1004 - leader: 3, follower: 5, total: 8
  Broker 1005 - leader: 3, follower: 6, total: 9
  Broker 1006 - leader: 2, follower: 7, total: 9

WARN:
  [none]

New partition maps:
  mediawiki.httpd.accesslog.json
  mediawiki.httpd.accesslog-sampled.json

Note: --brokers -1 means "all brokers currently assigned partitions of the provided topics pattern".

By the way, the previous command will not necessarily produce a balanced result; it produces the most balanced result possible without data movement, which is instantaneous.
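The leadership optimization above can be sanity-checked by tallying the first replica (the preferred leader) of each partition in the new map. Using the post-rebuild assignments from the --optimize-leadership run:

```python
from collections import Counter

# Replica lists from the --optimize-leadership run above; the first broker
# in each list is the preferred leader of that partition.
new_map = [
    # mediawiki.httpd.accesslog p0..p11
    [1002, 1001, 1005], [1001, 1004, 1002], [1004, 1001, 1002],
    [1002, 1001, 1006], [1001, 1006, 1002], [1001, 1005, 1002],
    [1002, 1006, 1004], [1004, 1006, 1005], [1005, 1001, 1004],
    [1005, 1001, 1002], [1006, 1001, 1004], [1005, 1004, 1002],
    # mediawiki.httpd.accesslog-sampled p0..p5
    [1001, 1006, 1005], [1004, 1001, 1005], [1001, 1005, 1002],
    [1002, 1006, 1001], [1002, 1006, 1001], [1006, 1001, 1002],
]

leaders = Counter(replicas[0] for replicas in new_map)
followers = Counter(b for replicas in new_map for b in replicas[1:])

for broker in sorted(set(leaders) | set(followers)):
    print(f"Broker {broker} - leader: {leaders[broker]}, "
          f"follower: {followers[broker]}, "
          f"total: {leaders[broker] + followers[broker]}")
```

The tallies match the "Broker distribution" block printed by topicmappr (e.g. broker 1001: leader 5, follower 10, total 15).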

If you're keen on having the best possible balance, then you might have to move data around with the --force-rebuild flag:

brouberol@kafka-logging1005:~$ topicmappr rebuild --optimize-leadership --topics 'mediawiki.httpd.accesslog.*' --brokers -1 --force-rebuild

Topics:
  mediawiki.httpd.accesslog
  mediawiki.httpd.accesslog-sampled

Broker change summary:
  Replacing 0, added 0, missing 0, total count changed by 0

Action:
  Force rebuilding map
  Optimizing leader/follower ratios

Partition map changes:
  mediawiki.httpd.accesslog p0: [1001 1002 1005] -> [1004 1005 1001] replaced broker
  mediawiki.httpd.accesslog p1: [1004 1001 1002] -> [1001 1002 1006] replaced broker
  mediawiki.httpd.accesslog p2: [1001 1002 1004] -> [1002 1001 1004] preferred leader
  mediawiki.httpd.accesslog p3: [1006 1002 1001] -> [1005 1006 1004] replaced broker
  mediawiki.httpd.accesslog p4: [1001 1006 1002] -> [1006 1002 1005] replaced broker
  mediawiki.httpd.accesslog p5: [1005 1002 1001] -> [1004 1002 1006] replaced broker
  mediawiki.httpd.accesslog p6: [1002 1006 1004] -> [1001 1005 1006] replaced broker
  mediawiki.httpd.accesslog p7: [1006 1004 1005] -> [1002 1001 1004] replaced broker
  mediawiki.httpd.accesslog p8: [1004 1005 1001] -> [1005 1001 1004] preferred leader
  mediawiki.httpd.accesslog p9: [1005 1001 1002] -> [1006 1001 1002] replaced broker
  mediawiki.httpd.accesslog p10: [1001 1006 1004] -> [1002 1004 1006] replaced broker
  mediawiki.httpd.accesslog p11: [1002 1004 1005] -> [1004 1001 1005] replaced broker
  mediawiki.httpd.accesslog-sampled p0: [1006 1001 1005] -> [1005 1006 1002] replaced broker
  mediawiki.httpd.accesslog-sampled p1: [1004 1005 1001] -> [1004 1002 1005] replaced broker
  mediawiki.httpd.accesslog-sampled p2: [1005 1001 1002] -> [1001 1006 1005] replaced broker
  mediawiki.httpd.accesslog-sampled p3: [1001 1002 1006] -> [1006 1001 1004] replaced broker
  mediawiki.httpd.accesslog-sampled p4: [1002 1006 1001] -> [1001 1002 1004] replaced broker
  mediawiki.httpd.accesslog-sampled p5: [1006 1002 1001] -> [1002 1005 1006] replaced broker

Broker distribution:
  degree [min/max/avg]: 4/4/4.00 -> 4/4/4.00
  -
  Broker 1001 - leader: 4, follower: 7, total: 11
  Broker 1002 - leader: 4, follower: 7, total: 11
  Broker 1004 - leader: 4, follower: 7, total: 11
  Broker 1005 - leader: 3, follower: 7, total: 10
  Broker 1006 - leader: 3, follower: 8, total: 11

WARN:
  [none]

New partition maps:
  mediawiki.httpd.accesslog.json
  mediawiki.httpd.accesslog-sampled.json

Thank you @brouberol, that is super helpful! I'm indeed interested in throughput more than space in this case; I'll try what you suggested next week.

Mentioned in SAL (#wikimedia-operations) [2024-07-22T08:56:01Z] <godog> rebalance mediawiki.httpd.accesslog partitions across brokers - T370129

Change #1055922 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] kafka: use instance-based selection for kafka-kit storage metrics

https://gerrit.wikimedia.org/r/1055922

Change #1055922 merged by Filippo Giunchedi:

[operations/puppet@production] kafka: use instance-based selection for kafka-kit storage metrics

https://gerrit.wikimedia.org/r/1055922

fgiunchedi claimed this task.

With the patch above we're now using the broker instance label to select available disk space, so I'm calling this resolved. Thank you @brouberol!