展开

Kafka简介

最后发布时间 : 2024-03-12 17:30:59 浏览量 :

学习资料

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group service-platform
GROUP            TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                      HOST            CLIENT-ID
service-platform ngs-analysis-result 0          56              56              0               consumer-service-platform-1-25786487-fef6-4bfd-ae83-98a64602305b /192.168.10.194 consumer-service-platform-1

使用KRaft启动Kafka

# Generate a Cluster UUID
 KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# Format Log Directories
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# Start the Kafka Server
bin/kafka-server-start.sh config/kraft/server.properties

创建一个topic存储events

Kafka是一个分布式事件流平台,允许您在多台机器上读取、写入、存储和处理事件。

示例事件包括支付交易、手机的地理位置更新、运输订单、物联网设备或医疗设备的传感器测量等。这些事件按主题进行组织和存储。非常简单地说,topic类似于文件系统中的文件夹,events是该文件夹中的文件。

创建topic

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

查看topic

bin/kafka-topics.sh --list  --bootstrap-server localhost:9092
quickstart-events

查看topic描述

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events	TopicId: 6QhIln_gSryc0bWusBPHhw	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: quickstart-events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

将事件写入主题

KAFKA客户端通过网络进行写作(或阅读)事件与Kafka brokers进行通信。收到后,brokers将在您需要的时间内以持久和耐受性的方式存储活动。

运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都将导致将单独的事件写入主题。

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl+C 停止producer

读取事件

打开另一个终端会话并运行控制台消费者客户端以阅读您刚创建的事件:

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl+C 停止consumer

CMAK

cmak.zkhosts="my.zookeeper.host.com:2181"

测试发送以逗号分割的fastq文件名

#!/bin/bash

topic=$1
shift
input_list=("$@")

# 将列表转换为逗号分隔的字符串
value=$(IFS=, ; echo "${input_list[*]}")

# 使用 kafka-console-producer.sh 发送消息
/home/wangyang/application/kafka_2.13-3.5.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic "$topic" --property "parse.key=true" --property "key.separator=:" <<EOF
key1:$value
EOF
./send.sh liukang-test-topic  fastq1,fastq2

kafka代理

listeners=PLAINTEXT://media01.dynabook.site:9092
## 这个地方填写的端口,必须和nginx代理的端口一致
advertised.listeners=PLAINTEXT://media01.dynabook.site:80