package com.liuyaofeng.kafka.service; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; @SuppressWarnings("all") public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); // zookeeper 配置 props.put("zookeeper.connect", "127.0.0.1:2181"); // group 代表一个消费组 props.put("group.id", "liugroup"); // zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("rebalance.max.retries", "5"); props.put("rebalance.backoff.ms", "1200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafkaProducer.TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println("******" + it.next().message() + "******"); } public static void main(String[] args) { new KafkaConsumer().consume(); } }
最近下载更多
lironggang LV38
2023年3月18日
503382513 LV10
2022年8月31日
zdm1231 LV2
2022年8月1日
chenhuahao LV18
2019年9月11日
2663811356 LV1
2019年9月5日
倪卟懂 LV18
2019年7月22日
TwinkleQin LV6
2019年6月26日
无上英雄 LV8
2019年6月22日
cqm0609 LV13
2019年4月30日
夕阳2266 LV10
2019年4月2日
最近浏览更多
youwuzuichen LV10
1月4日
xiexiaoming05 LV14
2023年6月29日
zhaoka
2023年5月30日
暂无贡献等级
starmomom LV10
2023年3月14日
cc900118 LV17
2022年12月3日
zdm1231 LV2
2022年8月1日
itcaizhe LV9
2022年5月24日
泡芙1234 LV8
2022年4月21日
tangjj7260 LV18
2022年4月6日
yych007 LV5
2022年2月11日