步骤 1:使用 Vector 将数据写入 Kafka
部署 KSE 并安装日志相关组件
KubeSphere 企业版中需要安装的扩展组件:
-
RadonDB DMP
-
OpenSearch 分布式检索与分析引擎
-
WhizardTelemetry 平台服务
-
WhizardTelemetry 数据流水线
-
WhizardTelemetry 日志管理
-
WhizardTelemetry 审计管理
-
WhizardTelemetry 通知管理
-
WhizardTelemetry 事件管理
禁用 OpenSearch Sink
在安装部署 WhizardTelemetry 日志管理、WhizardTelemetry 审计管理、WhizardTelemetry 事件管理以及 WhizardTelemetry 通知管理前,需要禁用这些扩展组件配置中 opensearch 的 sink。
以安装 WhizardTelemetry 审计管理扩展组件为例,将 sinks.opensearch.enabled 设置为 false。
配置 Kafka
在 KubeSphere 企业版中,安装 RadonDB DMP 扩展组件后,点击顶部导航栏上的图标,然后点击 RadonDB DMP 进入数据库管理平台,创建 Kafka 集群以用于收集日志。
启用自动创建 topic
点击 Kafka 集群名称,进入参数管理页签,启用自动创建 topic 的功能。
说明 |
---|
在 Kafka 集群的详情页左侧可获取 Kafka 的读写地址。 |
创建 Kafka 用户
-
在 Kafka 集群的详情页面,进入 Kafka 用户页签,点击创建开始创建 Kafka 用户。
-
按下图所示设置用户权限。
获取证书
查看证书相关信息
为了与 Kafka 通信,需要配置相关的证书及文件,具体为 <cluster>-cluster-ca-cert,以及上一个步骤中创建的用户的 user.p12 字段及密码,详细信息可在 KubeSphere 企业版 Web 控制台界面上查询,如下所示。
-
点击页面上方的工作台 > 集群管理,进入 host 集群。
-
在左侧导航栏选择配置 > 保密字典。
-
在保密字典页面,搜索
cluster-ca-cert
,点击 Kafka 集群对应的保密字典进入详情页面,查看 ca-crt 字段的信息。 -
在保密字典页面,搜索已创建的 Kafka 用户的名称,点击其对应的保密字典进入详情页面,查看 user.p12 及 user.password 字段的信息。
生成证书
-
在 Kafka 所在集群的节点上,执行以下命令。
说明 kafka cluster 为 Kafka 集群的名称,kafka namespace 为 Kafka 所在的 namespace,kafka user 为之前创建的 Kafka 用户。
export kafka_cluster=< kafka cluster > export kafka_namespace=< kafka namespace > export kafka_user=< kafka user > echo -e "apiVersion: v1\ndata:" > kafka-ca.yaml echo " ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \ -o jsonpath='{.data.ca\.crt}')" >> kafka-ca.yaml echo -e "kind: Secret\nmetadata:\n name: kafka-agent-cluster-ca\n labels:\n logging.whizard.io/certification: 'true'\n logging.whizard.io/vector-role: Agent\n \ namespace: kubesphere-logging-system\ntype: Opaque" >> kafka-ca.yaml echo "---" >> kafka-ca.yaml echo -e "apiVersion: v1\ndata:" >> kafka-ca.yaml echo " user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} \ -o jsonpath='{.data.user\.p12}')" >> kafka-ca.yaml echo -e "kind: Secret\nmetadata:\n name: kafka-agent-user-ca\n labels:\n logging.whizard.io/certification: 'true'\n logging.whizard.io/vector-role: Agent\n \ namespace: kubesphere-logging-system\ntype: Opaque" >> kafka-ca.yaml
此命令会生成 kafka-ca.yaml 文件,包含 kafka-agent-cluster-ca 以及 kafka-agent-user-ca 两个 secret 文件,分别含有上一个步骤中的 ca.crt 以及 user.p12 信息。示例如下:
apiVersion: v1 data: ca.crt: xxx kind: Secret metadata: name: kafka-agent-cluster-ca labels: logging.whizard.io/certification: 'true' logging.whizard.io/vector-role: Agent namespace: kubesphere-logging-system type: Opaque --- apiVersion: v1 data: user.p12: xxxx kind: Secret metadata: name: kafka-agent-user-ca labels: logging.whizard.io/certification: 'true' logging.whizard.io/vector-role: Agent namespace: kubesphere-logging-system
-
将 kafka-ca.yaml 文件复制到需要收集日志数据的集群节点上,执行以下命令。
kubectl apply -f kafka-ca.yaml
此命令会在 kubesphere-logging-system 项目下创建 kafka-agent-cluster-ca 以及 kafka-agent-user-ca 两个 secret 文件。vector-config 会自动加载这两个 secret,并且在 vector 中配置相关证书。
创建 Kafka 日志接收器
cat <<EOF | kubectl apply -f -
kind: Secret
apiVersion: v1
metadata:
name: vector-agent-auditing-sink-kafka
namespace: kubesphere-logging-system
labels:
logging.whizard.io/component: auditing
logging.whizard.io/enable: 'true'
logging.whizard.io/vector-role: Agent
annotations:
kubesphere.io/creator: admin
stringData:
sink.yaml: >-
sinks:
kafka_auditing:
type: "kafka"
topic: "vector-{{ .cluster }}-auditing"
# 逗号分隔的 Kafka bootstrap servers 如:"10.14.22.123:9092,10.14.23.332:9092"
bootstrap_servers: "172.31.73.214:32239"
librdkafka_options:
security.protocol: "ssl"
ssl.endpoint.identification.algorithm: "none"
ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
encoding:
codec: "json"
inputs:
- auditing_remapped
batch:
max_events: 100
timeout_secs: 10
type: Opaque
---
kind: Secret
apiVersion: v1
metadata:
name: vector-agent-events-sink-kafka
namespace: kubesphere-logging-system
labels:
logging.whizard.io/component: events
logging.whizard.io/enable: 'true'
logging.whizard.io/vector-role: Agent
annotations:
kubesphere.io/creator: admin
stringData:
sink.yaml: >-
sinks:
kafka_events:
type: "kafka"
topic: "vector-{{ .cluster }}-events"
bootstrap_servers: "172.31.73.214:32239"
librdkafka_options:
security.protocol: "ssl"
ssl.endpoint.identification.algorithm: "none"
ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
encoding:
codec: "json"
inputs:
- kube_events_remapped
batch:
max_events: 100
timeout_secs: 10
type: Opaque
---
kind: Secret
apiVersion: v1
metadata:
name: vector-agent-logs-sink-kafka
namespace: kubesphere-logging-system
labels:
logging.whizard.io/component: logs
logging.whizard.io/enable: 'true'
logging.whizard.io/vector-role: Agent
annotations:
kubesphere.io/creator: admin
stringData:
sink.yaml: >-
sinks:
kafka_logs:
type: "kafka"
topic: "vector-{{ .cluster }}-logs"
bootstrap_servers: "172.31.73.214:32239"
librdkafka_options:
security.protocol: "ssl"
ssl.endpoint.identification.algorithm: "none"
ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
encoding:
codec: "json"
inputs:
- kube_logs_remapped
- systemd_logs_remapped
batch:
max_events: 100
timeout_secs: 10
type: Opaque
---
apiVersion: v1
kind: Secret
metadata:
name: vector-aggregator-notification-history-sink-kafka
namespace: kubesphere-logging-system
labels:
logging.whizard.io/component: "notification-history"
logging.whizard.io/vector-role: Aggregator
logging.whizard.io/enable: "true"
stringData:
sink.yaml: >-
sinks:
kafka_notification_history:
type: "kafka"
topic: "vector-{{ .cluster }}-notification-history"
bootstrap_servers: "172.31.73.214:32239"
librdkafka_options:
security.protocol: "ssl"
ssl.endpoint.identification.algorithm: "none"
ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
encoding:
codec: "json"
inputs:
- notification_history_remapped
batch:
max_events: 100
timeout_secs: 10
type: Opaque
EOF