一、安装jdk(已安装可以不用管)
Kafka需要依赖jdk,这里先安装jdk
【1】卸载jdk
有时候会因为各种原因我们需要卸载jdk,这里也记录一下
# 查看安装的jdk rpm -qa|grep -i jdk
![]()
# 卸载jdk(根据自己的包名来进行卸载) rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.332.b09-1.el7_9.x86_64 rpm -e --nodeps java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64 rpm -e --nodeps java-1.8.0-openjdk-devel-1.8.0.332.b09-1.el7_9.x86_64
# 卸载后查看jdk版本,发现找不到,卸载成功 java -version
【2】安装jdk
官网下载:Java Downloads | Oracle 找到 tar.gz 结尾的进行下载,上传到服务器进行安装配置
# 解压(我的解压目录为:/software/jdk/jdk1.8.0_333) tar -zxvf jdk-8u333-linux-x64.tar.gz # 打开配置文件进行配置 vim /etc/profile # 添加如下配置(根据自己的路径来) export JAVA_HOME=/software/jdk/jdk1.8.0_333 export JRE_HOME=/$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin # 使配置文件生效 source /etc/profile # 查看是否安装成功 java -version
二、下载Kafka
![]()
根据对应的版本,使用wget下载,如果下载速度太慢,可以在Windows上通过下载器下载,然后上传到服务器上
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz --no-check-certificate
上传后进行解压
tar -axvf kafka_2.13-3.2.0.tgz
注,解压路径不宜过长,否则后面启动的时候可能会报错,所以这里我将kafka_2.13-3.2.0重命名为kafka
mv kafka_2.13-3.2.0 kafka
解压后目录结构如下:
![]()
三、配置Kafka
这里只是单机kafka的配置,并不是集群的配置
【1】配置kafka的log路径
打开config目录下的server.properties文件修改配置
# 打开配置文件 vim config/server.properties # 配置日志存放位置(启动的时候会自己在相应的目录下创建kafka-logs文件夹) log.dirs=/software/kafka/kafka-logs
【2】配置外网访问
还是修改server.properties文件,找到下面配置进行修改,位置大概在34行左右
# 放开注解 listeners=PLAINTEXT://:9092 # 修改注解,改成自己服务器ip advertised.listeners=PLAINTEXT://42.104.249.49:9092
![]()
【3】配置zookeeper数据路径
先在解压目录下创建zookeeper文件夹用来存放数据,再打开config目录下的zookeeper.properties文件修改配置
# 打开配置文件 vim config/zookeeper.properties # 配置数据存放位置(启动的时候会自己在相应的目录下创建zookeeper文件夹) dataDir=/software/kafka/zookeeper
![]()
【4】放通端口
# 放通9092端口 firewall-cmd --zone=public --add-port=9092/tcp --permanent # 重启防火墙 firewall-cmd --reload
注意这里除了放通centos的端口,还要将安全组的端口也放通,比如我这里用的是腾讯云,就要上腾讯云的控制台开放相应的安全组端口
四、启动Kafka
# 启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动kafka bin/kafka-server-start.sh config/server.properties # 生产消息(创建名为testTopic的主题) bin/kafka-console-producer.sh --topic testTopic --bootstrap-server localhost:9092 # 监听消息(重开一个终端监听testTopic主题的消息,在生产消息的终端发消息,此终端收消息) bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server localhost:9092
在监听消息的终端能够收到消息,则说明安装配置成功
五、SpringBoot连接测试
目录结构如下:
![]()
【1】引入pom依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
【2】配置application.yml
bootstrap-servers配置为自己服务器的IP
spring: kafka: # 消费者 consumer: group-id: foo auto-offset-reset: earliest bootstrap-servers: 42.104.249.49:9092 # 生产者 producer: bootstrap-servers: 42.104.249.49:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
【3】生产消息
生产消息通过controller访问接口来进行测试
@RestController public class KafkaProducerController { @Autowired private KafkaTemplate kafkaTemplate; @GetMapping("/send") public void send() { // testTopic为主题,和上面终端测试的保持一致,方便测试,onestar为要发送的消息 kafkaTemplate.send("testTopic","onestar"); } }
【4】监听消费消息
@Component @Slf4j public class KafkaConsumer { /** * kafka的监听器,topic为"testTopic",消费者组默认为配置文件里面的 */ @KafkaListener(topics = {"testTopic"}) public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); log.info("message:{}", msg); } } }
【5】测试
为了方便查看,我们可以将上面的监听消息终端开着,然后运行代码,通过访问 http://localhost:8080/send 进行测试
从打印的日志可以看到消息已经消费了,并且在监听消息的终端也打印出消息
评论