首页>代码>kafka模拟生产者消费者(集群模式)实例>/kafka-demo/src/main/java/com/kafka/consumer/SimpleKafkaConsumer.java
package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {
    private static KafkaConsumer<String, String> consumer;
    private final static String TOPIC = "test";
    public SimpleKafkaConsumer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.110:9092,192.168.1.111:9092,192.168.1.112:9092");
        //每个消费者分配独立的组号
        props.put("group.id", "test");
        //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "true");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "30000");
        //自动重置offset
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
    }

    public void consume(){
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }

    public static void main(String[] args) {
        new SimpleKafkaConsumer().consume();
    }
}
最近下载更多
lironggang  LV38 2023年3月18日
1145304128  LV12 2023年2月13日
gaoxin222  LV14 2022年12月10日
heweimin  LV13 2022年10月5日
503382513  LV10 2022年8月31日
sevenliu  LV9 2021年11月1日
hello_jugg  LV7 2021年10月22日
ewan007  LV30 2021年9月27日
sunlzh888888  LV29 2021年6月29日
数据库1  LV12 2020年12月27日
最近浏览更多
guviva  LV6 2023年6月26日
zhaoka 2023年5月30日
暂无贡献等级
starmomom  LV10 2023年3月14日
gaoxin222  LV14 2022年12月10日
heweimin  LV13 2022年10月5日
zdm1231  LV2 2022年7月30日
itcaizhe  LV9 2022年5月24日
泡芙1234  LV8 2022年4月21日
yych007  LV5 2022年2月11日
1145304128  LV12 2022年2月10日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友