如何获取kafka消费者重新连接后的最新数据?

但也要注意一些注意事项。对于多个分区和多个消费者,

1.如果消费者比分区多,那就是浪费。因为kafka是设计成分区的,不允许并发,所以消费者的数量不能大于分区的数量。

2.如果消费者的数量少于分区的数量,一个消费者将对应于多个分区。这里主要是合理分配消费者的数量和分区的数量,否则分区中的数据会被不均衡的取用。

最佳分区数是用户数的整数倍,因此分区数非常重要。比如取24,就很容易设置消费者的数量。

3.如果使用者从多个分区读取数据,则不能保证数据是有序的。kafka只保证数据在一个分区上是有序的,但是多个分区会根据你读取的顺序不同。

4.增加或减少消费者、代理和分区会导致重新平衡,因此重新平衡后消费者对应的分区会发生变化。

5.当数据不可用时,高级接口将被阻塞。

简单版,

简单的坑,如果测试过程是先产生一些数据,然后和消费者一起读,记得加第一句设置。

因为初始偏移默认是非法的,然后这个设置的意思是,当偏移非法的时候,如何修正偏移,默认是最大的,也就是最新的,所以没有这个配置,你就无法读取你之前产生的数据,这个时候你添加最小的配置也没用,因为这个时候偏移是合法的,不会再修正,所以你需要手动或者用工具重新设置偏移。

Properties props = new Properties();

props.put("auto.offset.reset "," minimum ");//必须添加,如果要读取旧数据的话。

props.put("zookeeper.connect "," localhost:2181 ");

props.put("group.id "," PV ");

props . put(" zookeeper . session . time out . ms "," 400 ");

props . put(" zookeeper . sync . time . ms "," 200 ");

props . put(" auto . commit . interval . ms "," 1000 ");

consumer config conf = new consumer config(props);

consumer connector consumer = Kafka . consumer . consumer . createjavaconsumeconnector(conf);

String topic = " page _ visits

地图& ltString,Integer & gttopicCountMap = new HashMap & ltString,Integer & gt();

topicCountMap.put(topic,new Integer(1));

地图& lt字符串,列表& ltKafkaStream & ltbyte[],byte[]& gt;& gt& gtconsumer map = consumer . create message streams(topicCountMap);

列表& ltKafkaStream & ltbyte[],byte[]& gt;& gtstreams = consumer map . get(topic);

KafkaStream & ltbyte[],byte[]& gt;stream = streams . get(0);

消费畸胎& ltbyte[],byte[]& gt;it = stream . iterator();

while (it.hasNext()){

system . out . println(" message:"+new String(it . next()。message())));

}

如果(消费者!= null) consumer.shutdown()。//实际上不能执行,因为上面的hasNext会阻塞。

当使用高级消费者时,两个强大的工具,

1.bin/Kafka-run-class . sh Kafka . tools . consumer offset checker-group PV

你可以看到组偏移量的当前状态,比如这里pv的状态,3分区。

群组主题Pid偏移日志大小滞后所有者

PV page _ visits 0 21 21 0无

PV page _ visits 1 19 19 0无

pv page_visits 2 20 20 0无

关键是偏移量、对数大小和滞后

我之前在这里看完了,所以offset=logSize,Lag=0。

2.bin/Kafka-run-class . sh Kafka . tools . updateoffsetsinzk earliest config/consumer . properties page _ visits

三个参数,

[最早|最晚],指示放置偏移的位置。

这里是配置文件的路径。

话题,话题名,这里是page_visits。

在我们对上面的pv组执行此操作后,我们将转到检查组偏移状态,结果如下。

群组主题Pid偏移日志大小滞后所有者

PV page _ visits 0 0 21 21无

PV page _ visits 1 0 19 19无

pv page_visits 2 0 20 20无

可以看到offset已经被清除为0,Lag=logSize。

下面给出了原文中多线程消费者的完整代码。

导入Kafka . consumer . consumer config;

导入Kafka . consumer . kafkastream;

导入Kafka . javaapi . consumer . consumer connector;

导入Java . util . hashmap;

导入Java . util . list;

导入Java . util . map;

导入Java . util . properties;

导入Java . util . concurrent . executorservice;

导入Java . util . concurrent . executors;

公共类消费者组示例{

私人最终消费者连接器消费者;

私有最终字符串主题;

私人遗嘱执行人;

public ConsumerGroupExample(String a _ zookeeper,String a_groupId,String a_topic) {

消费者=卡夫卡。消费者。消费者。createjavaconsumeconnector(//创建连接器,并注意conf的如下配置。

createConsumerConfig(a _ zookeeper,a _ groupId));

this.topic = a _ topic

}

公共void shutdown() {

如果(消费者!= null) consumer.shutdown()。

如果(遗嘱执行人!= null)executor . shut down();

}

public void run(int a _ num threads){//创建并发使用者。

地图& ltString,Integer & gttopicCountMap = new HashMap & ltString,Integer & gt();

topicCountMap.put(topic,new Integer(a _ numThreads));//描述要读取的主题,以及需要多少个线程来读取它。

地图& lt字符串,列表& ltKafkaStream & ltbyte[],byte[]& gt;& gt& gtconsumer map = consumer . create message streams(topicCountMap);//创建流

列表& ltKafkaStream & ltbyte[],byte[]& gt;& gtstreams = consumer map . get(topic);//每个线程对应一个KafkaStream。

//现在启动所有线程

//

executor = executors . newfixedthreadpool(a _ numThreads);

//现在创建一个对象来使用消息

//

int thread number = 0;

for(最终KafkaStream流:streams) {

executor . submit(new consumer test(stream,thread number));//启动消费者线程。

thread number++;

}

}

私有静态consumer config createConsumerConfig(String a _ zookeeper,String a_groupId) {

Properties props = new Properties();

props.put("zookeeper.connect ",a _ zookeeper);

props.put("group.id ",a _ groupId);

props . put(" zookeeper . session . time out . ms "," 400 ");

props . put(" zookeeper . sync . time . ms "," 200 ");

props . put(" auto . commit . interval . ms "," 1000 ");

返回新的ConsumerConfig(道具);

}

公共静态void main(String[] args) {

string zooKeeper = args[0];

string groupId = args[1];

string topic = args[2];

int threads = integer . parse int(args[3]);

ConsumerGroupExample示例= new ConsumerGroupExample(zooKeeper,groupId,topic);

example.run(线程);

尝试{

线程.睡眠(10000);

} catch(interrupted exception ie){

}

example . shut down();

}

}

简单消费者

另一个是SimpleConsumer,以其命名,以为是简单的接口,其实是低级的消费者,更复杂的接口。

参考,https://c wiki.apache.org/conflict/display/Kafka/0.8.0+simple消费者+示例。

什么时候使用这个界面?

多次阅读一封邮件

在一个流程中,仅使用一个主题中分区的子集

管理事务以确保一条消息只被处理一次

当然,使用这个接口是有代价的,就是分区、broker、offset对你来说不再透明,需要你自己管理,还需要切换handle broker leader,很麻烦。

所以不必用,最好不用。

您必须跟踪应用程序中的偏移量,以了解您停止消耗的位置。

您必须确定哪个代理是主题和分区的主要代理

你必须处理经纪人领导的变化

要使用简单消费者:

找到一个活跃的经纪人,并找出哪个经纪人是你的主题和分区的领导者

确定谁是您的主题和分区的副本代理

构建定义您感兴趣的数据的请求

获取数据

识别领导变更并从中恢复

首先,你必须知道读哪个题目,哪个分区。

然后,找到负责分区的经纪人负责人,从而找到拥有分区副本的经纪人。

此外,自己编写请求并获取数据。

最后,注意需要识别和处理经纪人领导的变化。