Kafka
Kafka 是一种高吞吐量的分布式发布订阅消息系统。
Cloud Insight 通过 JMX 来获取数据监控 Kafka 客户端生产端消息请求数,处理时间等数据来可视化性能。
性能指标
Cloud Insight 采集 Kafka 以下性能指标:
指标 | 单位 | 具体含义 |
---|---|---|
kafka.broker_offset | offsets | broker 上当前消息的偏移量(offset) |
kafka.consumer.bytes_in | bytes/second | consumer 字节率(bytes in rate) |
kafka.consumer.delayed_requests | requests | 延迟的 consumer 请求数 |
kafka.consumer.expires_per_second | evictions/second | 延迟 consumer 的请求到期(expiration)速率 |
kafka.consumer.fetch_rate | requests | consumer 向 broker 发送提取请求(fetch requests)的最低速率 |
kafka.consumer.kafka_commits | writes/second | 面向 Kafka 的 offset commits 速率 |
kafka.consumer.max_lag | offsets | 最大消费滞后(consumer lag) |
kafka.consumer.messages_in | messages/second | consumer 消息消费(consumption)的速率 |
kafka.consumer.zookeeper_commits | writes/second | 面向 ZooKeeper 的 offset commits 速率 |
kafka.consumer_lag | offsets | consumer 和 broker 之间的消息滞后(lag) |
kafka.consumer_offset | offsets | consumer 的当前消息偏移量(current message offset) |
kafka.expires_sec | evictions/second | 延迟生产者(delayed producer)的请求到期(request expiration)速率 |
kafka.follower.expires_per_second | evictions/second | 关注者(followers)的请求到期(request expiration)速率 |
kafka.log.flush_rate | flushes/second | 日志刷新速率 |
kafka.messages_in | messages | 传入(incoming)信息速率 |
kafka.net.bytes_in | bytes/second | 传入(incoming)字节速率 |
kafka.net.bytes_out | bytes/second | 传出(outgoing)字节速率 |
kafka.net.bytes_rejected | bytes/second | 被拒绝(rejected)的字节速率 |
kafka.producer.bytes_out | bytes/second | producer 字节输出速率 |
kafka.producer.delayed_requests | requests | 延迟的 producer 请求数 |
kafka.producer.expires_per_seconds | evictions/second | producer 请求到期率 |
kafka.producer.io_wait | nanoseconds | Producer I/O 等待时间 |
kafka.producer.message_rate | messages/second | Producer 消息速率 |
kafka.producer.request_latency_avg | milliseconds | Producer 平均请求延迟 |
kafka.producer.request_rate | requests/second | producer 每秒钟的请求数 |
kafka.producer.response_rate | responses/second | producer 每秒钟的响应数 |
kafka.replication.isr_expands | nodes/second | 副本加入 ISR 池的速率 |
kafka.replication.isr_shrinks | nodes/second | 副本离开 ISR 池的速率 |
kafka.replication.leader_elections | events/second | 领导选举(Leader election)频率 |
kafka.replication.unclean_leader_elections | events/second | Unclean 的领导选举(Leader election)频率 |
kafka.replication.under_replicated_partitions | 未使用的分区数 | |
kafka.request.fetch.failed | requests | 客户端获取请求(fetch request)失败次数 |
kafka.request.fetch.failed_per_second | requests/second | 每秒钟的客户端获取请求(fetch request)失败率 |
kafka.request.fetch.time.99percentile | requests/second | 获取请求(fetch request)时间的第 99 百分位的值 |
kafka.request.fetch.time.avg | requests/second | 获取请求(fetch request)时间的平均值 |
kafka.request.handler.avg.idle.pct | fractions | 请求处理程序线程(request handler threads)的平均空闲时间占比 |
kafka.request.metadata.time.99percentile | milliseconds | 元数据(metadata)请求时间的第 99 百分位的值 |
kafka.request.metadata.time.avg | milliseconds | 元数据(metadata)请求时间的的平均值 |
kafka.request.offsets.time.99percentile | milliseconds | offset 请求时间的第 99 百分位的值 |
kafka.request.offsets.time.avg | milliseconds | offset 请求时间的平均值 |
kafka.request.produce.failed | requests | 失败的产品请求(produce requests)数 |
kafka.request.produce.failed_per_second | requests/second | 每秒钟的产品请求(produce requests)失败率 |
kafka.request.produce.time.99percentile | requests/second | 产品请求(produce requests)时间的第 99 百分位的值 |
kafka.request.produce.time.avg | requests/second | 产品请求(produce requests)平均时间 |
kafka.request.update_metadata.time.99percentile | milliseconds | 更新元数据请求(update metadata requests)时间的第 99 百分位的值 |
kafka.request.update_metadata.time.avg | milliseconds | 更新元数据请求(update metadata requests)时间的平均值 |
配置 Kafka 监控
JMX
开启 JMX,只需设置环境变量 JMX_PORT=9999
,修改启动脚本,添加 jmx 远程调试模式:
修改 Kafka 的 kafka-run-class.sh :
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
fi
需要修改为:
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
然后重启 Kafka 即可。
Cloud Insight Agent 通过 JMX 获取 Kafka 中的性能指标。
由于每个实体最多可以监控 350 个性能指标,所以您需要按照下方的配置方法,修改配置文件来确定自己需要哪些指标。
有关 JMX 采集方法,请查阅 JMX 远程监控。
编辑配置文件
编辑配置文件 conf.d/kafka.yaml,使 Cloud Insight Agent 可以与 Kafka 通信。
##########
# WARNING
# This sample works only for Kafka >= 0.8.2.
instances:
# - host: localhost
# port: 9999
# name: jmx_instance
# user: username
# password: password
# #java_bin_path: /path/to/java #Optional, should be set if the agent cannot find your java executable
# #trust_store_path: /path/to/trustStore.jks # Optional, should be set if ssl is enabled
# #trust_store_password: password
init_config:
is_jmx: true
# Metrics collected by this check. You should not have to modify this.
conf:
#
# Aggregate cluster stats
#
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec'
attribute:
MeanRate:
metric_type: counter
alias: kafka.net.bytes_out
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec'
attribute:
MeanRate:
metric_type: counter
alias: kafka.net.bytes_in
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.messages_in
#
# Request timings
#
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsFailedFetchRequestsPerSec'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.request.fetch.failed
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsFailedProduceRequestsPerSec'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.request.produce.failed
- include:
domain: 'kafka.network'
bean: 'kafka.network:type=RequestMetrics,name=Produce-TotalTimeMs'
attribute:
Mean:
metric_type: counter
alias: kafka.request.produce.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.produce.time.99percentile
- include:
domain: 'kafka.network'
bean: 'kafka.network:type=RequestMetrics,name=Fetch-TotalTimeMs'
attribute:
Mean:
metric_type: counter
alias: kafka.request.fetch.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.fetch.time.99percentile
- include:
domain: 'kafka.network'
bean: 'kafka.network:type=RequestMetrics,name=UpdateMetadata-TotalTimeMs'
attribute:
Mean:
metric_type: counter
alias: kafka.request.update_metadata.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.update_metadata.time.99percentile
- include:
domain: 'kafka.network'
bean: 'kafka.network:type=RequestMetrics,name=Metadata-TotalTimeMs'
attribute:
Mean:
metric_type: counter
alias: kafka.request.metadata.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.metadata.time.99percentile
- include:
domain: 'kafka.network'
bean: 'kafka.network:type=RequestMetrics,name=Offsets-TotalTimeMs'
attribute:
Mean:
metric_type: counter
alias: kafka.request.offsets.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.offsets.time.99percentile
#
# Replication stats
#
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=ReplicaManager,name=ISRShrinksPerSec'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.isr_shrinks
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=ReplicaManager,name=ISRExpandsPerSec'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.isr_expands
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.leader_elections
- include:
domain: 'kafka.server'
bean: 'kafka.server:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.unclean_leader_elections
#
# Log flush stats
#
- include:
domain: 'kafka.log'
bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
attribute:
MeanRate:
metric_type: counter
alias: kafka.log.flush_rate
编辑 Consumer 配置文件
编辑 Consumer 配置文件 conf.d/kafka_consumer.yaml
。
init_config:
instances:
# - kafka_connect_str: localhost:19092
# zk_connect_str: localhost:2181
# zk_prefix: /0.8
# consumer_groups:
# my_consumer:
# my_topic: [0, 1, 4, 12]
重启 Agent
重启 Cloud Insight Agent,使配置生效。
您也可以通过查看 Agent Info 信息,来验证配置是否成功。当出现以下信息,则代表安装成功。
Checks
======
[...]
kafka-localhost-9999
--------------------
- instance #0 [OK]
- Collected 8 metrics & 0 events
有关 Agent Info 信息的查看,请访问帮助中心,查看 Cloud Insight Agent 常用操作。
默认标签
Cloud Insight 采集 Kafka 以下默认主机标签:
标签种类 | 标签含义 |
---|---|
instance | 实例名称(例如 "kafka_demo") |
jmx_domain | JMX域(例如 "kafka.server") |
type | 指标类型(ThreadPool, GlobalRequestProcessor, Servlet, Cache, JspMonitor等) |
name | 指标名称(例如 "BytesInPerSec") |
常见问题
- Cloudn't launch JMXTerm. Is Java in your PATH ?
- Kafka: Failed to retrieve RMIServer stub
- 若要在同一个服务器上监控多个相同的平台服务,参考如何监控多个平台服务。
- 有任何关于产品的使用疑惑,参考常见问题。