diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index f2028d4f5269..a0a364564a27 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -123,6 +123,7 @@ local metadata_schema = { -- in lua-resty-kafka, cluster_name is defined as number -- see https://github.com/doujiang24/lua-resty-kafka#new-1 cluster_name = {type = "integer", minimum = 1, default = 1}, + meta_refresh_interval = {type = "integer", minimum = 1, default = 30}, }, required = {"brokers", "kafka_topic"}, }, @@ -370,6 +371,7 @@ local function send_to_kafka(log_message) broker_config["request_timeout"] = config.timeout * 1000 broker_config["producer_type"] = config.kafka.producer_type broker_config["required_acks"] = config.kafka.required_acks + broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000 -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex, diff --git a/docs/en/latest/plugins/error-log-logger.md b/docs/en/latest/plugins/error-log-logger.md index 6e4db909331d..a774a74c689a 100644 --- a/docs/en/latest/plugins/error-log-logger.md +++ b/docs/en/latest/plugins/error-log-logger.md @@ -60,6 +60,7 @@ It might take some time to receive the log data. It will be automatically sent a | kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | | kafka.key | string | False | | | Key used for allocating partitions for messages. | | kafka.cluster_name | integer | False | 1 | [0,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. | +| kafka.meta_refresh_interval | integer | False | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds.| | timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. | | keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. | | level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. | diff --git a/docs/zh/latest/plugins/error-log-logger.md b/docs/zh/latest/plugins/error-log-logger.md index d0e5af184526..50b9cc370042 100644 --- a/docs/zh/latest/plugins/error-log-logger.md +++ b/docs/zh/latest/plugins/error-log-logger.md @@ -59,6 +59,7 @@ description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX | kafka.required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 | | kafka.key | string | 否 | | | 用于消息分区而分配的密钥。 | | kafka.cluster_name | integer | 否 | 1 | [0,...] | Kafka 集群的名称,当有两个及以上 Kafka 集群时使用。只有当 `producer_type` 设为 `async` 模式时才可以使用该属性。| +| kafka.meta_refresh_interval | integer | 否 | 30 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 | | timeout | integer | 否 | 3 | [1,...] | 连接和发送数据超时间,以秒为单位。 | | keepalive | integer | 否 | 30 | [1,...] | 复用连接时,连接保持的时间,以秒为单位。 | | level | string | 否 | WARN | | 进行错误日志筛选的级别,默认为 `WARN`,取值 ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"],其中 `ERR` 与 `ERROR` 级别一致。 | diff --git a/t/plugin/error-log-logger-kafka.t b/t/plugin/error-log-logger-kafka.t index bb20d17c3204..afae2a5589c2 100644 --- a/t/plugin/error-log-logger-kafka.t +++ b/t/plugin/error-log-logger-kafka.t @@ -74,6 +74,14 @@ done === TEST 2: put plugin metadata and log an error level message - no auth kafka +--- extra_init_by_lua + local core = require("apisix.core") + local producer = require("resty.kafka.producer") + local old_producer_new = producer.new + producer.new = function(self, broker_list, producer_config, cluster_name) + core.log.info("broker_config is: ", core.json.delay_encode(producer_config)) + return old_producer_new(self, broker_list, producer_config, cluster_name) + end --- config location /t { content_by_lua_block { @@ -87,7 +95,8 @@ done "host": "127.0.0.1", "port": 9092 }], - "kafka_topic": "test2" + "kafka_topic": "test2", + "meta_refresh_interval": 1 }, "level": "ERROR", "inactive_timeout": 1 @@ -99,7 +108,9 @@ done } --- error_log eval [qr/this is a error message for test2/, -qr/send data to kafka: .*test2/] +qr/send data to kafka: .*test2/, +qr/broker_config is: \{.*"refresh_interval":1000/, +] --- wait: 3