Step 2: Use Vector to Read Data from Kafka
Deploy the Vector Aggregator
On the cluster where Kafka is deployed, run the following command to create the vector namespace and deploy vector-aggregator in it.
helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian
Required images:
docker.io/timberio/vector:0.36.0-debian
docker.io/kubesphere/kubectl:v1.26.13
docker.io/kubesphere/vector-config:v0.2.1
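After the installation completes, verify that the aggregator Pod is running (a minimal check; exact Pod names will vary):

kubectl -n vector get pods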
Obtain the Certificates
On a node of the cluster where Kafka is deployed, run the following commands.
Note: <kafka cluster> is the name of the Kafka cluster, <kafka namespace> is the namespace where Kafka is deployed, and <kafka user> is the Kafka user created earlier.
export kafka_cluster=<kafka cluster>
export kafka_namespace=<kafka namespace>
export kafka_user=<kafka user>

echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml
echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \
  -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml
echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
echo "---" >> kafka-aggregator-ca.yaml
echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml
echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} \
  -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml
echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
These commands generate a kafka-aggregator-ca.yaml file containing two Secrets, kafka-aggregator-cluster-ca and kafka-aggregator-user-ca, which hold the ca.crt and user.p12 data from the previous step, respectively. Example:
apiVersion: v1
data:
  ca.crt: xxx
kind: Secret
metadata:
  name: kafka-aggregator-cluster-ca
  labels:
    logging.whizard.io/certification: 'true'
    logging.whizard.io/vector-role: Aggregator
  namespace: vector
type: Opaque
---
apiVersion: v1
data:
  user.p12: xxx
kind: Secret
metadata:
  name: kafka-aggregator-user-ca
  labels:
    logging.whizard.io/certification: 'true'
    logging.whizard.io/vector-role: Aggregator
  namespace: vector
type: Opaque
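The generated Secrets target the vector namespace created above. Assuming vector-aggregator runs on this same cluster, a sketch of applying the manifest and confirming the Secrets exist:

kubectl apply -f kafka-aggregator-ca.yaml
kubectl -n vector get secret kafka-aggregator-cluster-ca kafka-aggregator-user-ca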
Configure vector-aggregator to Send Messages to OpenSearch
Create the Vector configuration. Set bootstrap_servers to the address of your Kafka cluster, and set endpoints under sinks.kafka_to_opensearch to the address of your OpenSearch cluster.
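The pipeline below also references ssl.keystore.password, the password protecting user.p12. Assuming a Strimzi-managed Kafka user, that password is stored in the user.password field of the same KafkaUser Secret as user.p12 and can be read as follows (reusing the variables exported earlier):

kubectl get secret -n $kafka_namespace ${kafka_user} -o jsonpath='{.data.user\.password}' | base64 -d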
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vector-aggregator-opensearch
  namespace: vector
  labels:
    logging.whizard.io/vector-role: Aggregator
    logging.whizard.io/enable: "true"
stringData:
  kafka-pipeline.yaml: >-
    sources:
      kafka_source:
        type: "kafka"
        group_id: "ks"
        topics: [ "^(vector)-.+" ]
        bootstrap_servers: "172.31.53.102:32476"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
          max.poll.interval.ms: "600000"
          partition.assignment.strategy: roundrobin
        decoding:
          codec: json
        session_timeout_ms: 20000
        socket_timeout_ms: 90000
    transforms:
      kafka_remapped:
        inputs:
          - kafka_source
        source: |-
          .event.original = encode_json(.)
          ts = parse_timestamp!(.timestamp, format: "%+")
          .timestamp = format_timestamp!(ts, format: "%+", timezone: "local")
          .topictime = to_unix_timestamp(ts, unit: "milliseconds")
          .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds")
          .logdate = .timestamp
          .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local")
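          # Illustrative example of the two lines below: a topic named
          # "vector-app-logs" splits into ["vector", "app", "logs"]; dropping
          # the first element and re-joining with "-" yields .index = "app-logs".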
          tmp = split!(.topic, "-")
          .index = join!(remove!(tmp, [0]), "-")
        type: remap
    sinks:
      kafka_to_opensearch:
        api_version: v8
        auth:
          password: admin
          strategy: basic
          user: admin
        batch:
          timeout_secs: 5
        buffer:
          max_events: 10000
        endpoints:
          - https://<opensearch-url>:<port>
        tls:
          verify_certificate: false
        type: elasticsearch
        inputs:
          - kafka_remapped
        bulk:
          index: "{{ .index }}-%Y.%m.%d"
        request:
          timeout_secs: 180
type: Opaque
EOF
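Once vector-aggregator has picked up the new configuration, you can confirm that messages are reaching OpenSearch by listing the dated indices created by the sink. This check is a sketch: it reuses the admin/admin credentials from the sink above, you must substitute your actual OpenSearch address, and -k mirrors the verify_certificate: false setting:

curl -k -u admin:admin "https://<opensearch-url>:<port>/_cat/indices?v"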