如何获取kafka消费者重新连接后的最新数据?
1.如果消费者比分区多,那就是浪费。因为kafka是设计成分区的,不允许并发,所以消费者的数量不能大于分区的数量。
2.如果消费者的数量少于分区的数量,一个消费者将对应于多个分区。这里主要是合理分配消费者的数量和分区的数量,否则分区中的数据会被不均衡的取用。
最佳分区数是用户数的整数倍,因此分区数非常重要。比如取24,就很容易设置消费者的数量。
3.如果使用者从多个分区读取数据,则不能保证数据是有序的。kafka只保证数据在一个分区上是有序的,但是多个分区会根据你读取的顺序不同。
4.增加或减少消费者、代理和分区会导致重新平衡,因此重新平衡后消费者对应的分区会发生变化。
5.当数据不可用时,高级接口将被阻塞。
简单版,
简单的坑,如果测试过程是先产生一些数据,然后和消费者一起读,记得加第一句设置。
因为初始偏移默认是非法的,然后这个设置的意思是,当偏移非法的时候,如何修正偏移,默认是最大的,也就是最新的,所以没有这个配置,你就无法读取你之前产生的数据,这个时候你添加最小的配置也没用,因为这个时候偏移是合法的,不会再修正,所以你需要手动或者用工具重新设置偏移。
Properties props = new Properties();
props.put("auto.offset.reset "," minimum ");//必须添加,如果要读取旧数据的话。
props.put("zookeeper.connect "," localhost:2181 ");
props.put("group.id "," PV ");
props . put(" zookeeper . session . time out . ms "," 400 ");
props . put(" zookeeper . sync . time . ms "," 200 ");
props . put(" auto . commit . interval . ms "," 1000 ");
consumer config conf = new consumer config(props);
consumer connector consumer = Kafka . consumer . consumer . createjavaconsumeconnector(conf);
String topic = " page _ visits
地图& ltString,Integer & gttopicCountMap = new HashMap & ltString,Integer & gt();
topicCountMap.put(topic,new Integer(1));
地图& lt字符串,列表& ltKafkaStream & ltbyte[],byte[]& gt;& gt& gtconsumer map = consumer . create message streams(topicCountMap);
列表& ltKafkaStream & ltbyte[],byte[]& gt;& gtstreams = consumer map . get(topic);
KafkaStream & ltbyte[],byte[]& gt;stream = streams . get(0);
消费畸胎& ltbyte[],byte[]& gt;it = stream . iterator();
while (it.hasNext()){
system . out . println(" message:"+new String(it . next()。message())));
}
如果(消费者!= null) consumer.shutdown()。//实际上不能执行,因为上面的hasNext会阻塞。
当使用高级消费者时,两个强大的工具,
1.bin/Kafka-run-class . sh Kafka . tools . consumer offset checker-group PV
你可以看到组偏移量的当前状态,比如这里pv的状态,3分区。
群组主题Pid偏移日志大小滞后所有者
PV page _ visits 0 21 21 0无
PV page _ visits 1 19 19 0无
pv page_visits 2 20 20 0无
关键是偏移量、对数大小和滞后
我之前在这里看完了,所以offset=logSize,Lag=0。
2.bin/Kafka-run-class . sh Kafka . tools . updateoffsetsinzk earliest config/consumer . properties page _ visits
三个参数,
[最早|最晚],指示放置偏移的位置。
这里是配置文件的路径。
话题,话题名,这里是page_visits。
在我们对上面的pv组执行此操作后,我们将转到检查组偏移状态,结果如下。
群组主题Pid偏移日志大小滞后所有者
PV page _ visits 0 0 21 21无
PV page _ visits 1 0 19 19无
pv page_visits 2 0 20 20无
可以看到offset已经被清除为0,Lag=logSize。
下面给出了原文中多线程消费者的完整代码。
导入Kafka . consumer . consumer config;
导入Kafka . consumer . kafkastream;
导入Kafka . javaapi . consumer . consumer connector;
导入Java . util . hashmap;
导入Java . util . list;
导入Java . util . map;
导入Java . util . properties;
导入Java . util . concurrent . executorservice;
导入Java . util . concurrent . executors;
公共类消费者组示例{
私人最终消费者连接器消费者;
私有最终字符串主题;
私人遗嘱执行人;
public ConsumerGroupExample(String a _ zookeeper,String a_groupId,String a_topic) {
消费者=卡夫卡。消费者。消费者。createjavaconsumeconnector(//创建连接器,并注意conf的如下配置。
createConsumerConfig(a _ zookeeper,a _ groupId));
this.topic = a _ topic
}
公共void shutdown() {
如果(消费者!= null) consumer.shutdown()。
如果(遗嘱执行人!= null)executor . shut down();
}
public void run(int a _ num threads){//创建并发使用者。
地图& ltString,Integer & gttopicCountMap = new HashMap & ltString,Integer & gt();
topicCountMap.put(topic,new Integer(a _ numThreads));//描述要读取的主题,以及需要多少个线程来读取它。
地图& lt字符串,列表& ltKafkaStream & ltbyte[],byte[]& gt;& gt& gtconsumer map = consumer . create message streams(topicCountMap);//创建流
列表& ltKafkaStream & ltbyte[],byte[]& gt;& gtstreams = consumer map . get(topic);//每个线程对应一个KafkaStream。
//现在启动所有线程
//
executor = executors . newfixedthreadpool(a _ numThreads);
//现在创建一个对象来使用消息
//
int thread number = 0;
for(最终KafkaStream流:streams) {
executor . submit(new consumer test(stream,thread number));//启动消费者线程。
thread number++;
}
}
私有静态consumer config createConsumerConfig(String a _ zookeeper,String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect ",a _ zookeeper);
props.put("group.id ",a _ groupId);
props . put(" zookeeper . session . time out . ms "," 400 ");
props . put(" zookeeper . sync . time . ms "," 200 ");
props . put(" auto . commit . interval . ms "," 1000 ");
返回新的ConsumerConfig(道具);
}
公共静态void main(String[] args) {
string zooKeeper = args[0];
string groupId = args[1];
string topic = args[2];
int threads = integer . parse int(args[3]);
ConsumerGroupExample示例= new ConsumerGroupExample(zooKeeper,groupId,topic);
example.run(线程);
尝试{
线程.睡眠(10000);
} catch(interrupted exception ie){
}
example . shut down();
}
}
简单消费者
另一个是SimpleConsumer,以其命名,以为是简单的接口,其实是低级的消费者,更复杂的接口。
参考,https://c wiki.apache.org/conflict/display/Kafka/0.8.0+simple消费者+示例。
什么时候使用这个界面?
多次阅读一封邮件
在一个流程中,仅使用一个主题中分区的子集
管理事务以确保一条消息只被处理一次
当然,使用这个接口是有代价的,就是分区、broker、offset对你来说不再透明,需要你自己管理,还需要切换handle broker leader,很麻烦。
所以不必用,最好不用。
您必须跟踪应用程序中的偏移量,以了解您停止消耗的位置。
您必须确定哪个代理是主题和分区的主要代理
你必须处理经纪人领导的变化
要使用简单消费者:
找到一个活跃的经纪人,并找出哪个经纪人是你的主题和分区的领导者
确定谁是您的主题和分区的副本代理
构建定义您感兴趣的数据的请求
获取数据
识别领导变更并从中恢复
首先,你必须知道读哪个题目,哪个分区。
然后,找到负责分区的经纪人负责人,从而找到拥有分区副本的经纪人。
此外,自己编写请求并获取数据。
最后,注意需要识别和处理经纪人领导的变化。