今天看啥  ›  专栏  ›  白衣少年丶

php的kafka踩坑(二)

白衣少年丶  · 掘金  ·  · 2020-04-10 02:52
阅读 10

php的kafka踩坑(二)

接上一篇文章,上次没有解决的一个问题就是在做一个队列的时候,存在多消费者消费到同一个消息的情况,今天终于解决了这个问题,问题的本质是因为运维给我创建的topic是有问题的,他创建的分区数量是0,我今天上容器看了一下,终于发现了,然后删了自己重新建了一个,具体容器操作kafka的topic教程可以看我另一个文档基于kafka容器操作topic

在这里,我们从头开始介绍一下topic(主题),partition(分区),group(分组),consumer(消费者),producer(生产者)的关系

  • producer,生产者,生产数据
  • consumer,消费者,消费数据
  • topic,简单点说就是一个队列,生产者生产数据和消费者消费数据都必须指定一个Topic,就是生产的数据要放到哪个队列去给消费者消费
  • partition和group,一个topic可以配置多个partition,consumer消费数据时是按照group来消费的,kafka确保每个partition只能由同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费,所以同一个group的消费者数量应当小于等于partition数量。Zookeerper中保存这每个topic下的每个partition在每个group中消费的offset。(此段介绍引用这篇文章
    • consumer读取时,会指定读取的group,同一个消息在同一个group下只会读取到一次,如果要重复消费数据,需要新建group
    • 如果group只有一个,并且有多个partition,一个consumer时,所有partition里的消息都会发往该consumer,如果consumer不止一个时,可能会存在有的consumer里面数据消费的多,有的消费的少,做多消费者的队列就是用这个特性,当partition数量=consuerm时,消息可以达到负载均衡。

下面上一下下代码,修改之后topic的partition是3个,依旧是基于 enqueue/rdkafka 这个包

生产者,不指定partition,kafka会自动分配

$connFactory = new RdKafkaConnectionFactory([
    'global' => [
        'metadata.broker.list' => '127.0.0.1:9092',
        'socket.timeout.ms' => '50'
    ]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
    $message = $context->createMessage('hello world!' . $i);
    $context->createProducer()->send($topic, $message);
}
复制代码

消费者

$config = [
    'global' => [
        'group.id' => date('Ymd'), // 指定一个分区,分区名自定义,做队列分区名必须一样
        'metadata.broker.list' => '127.0.0.1:9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'latest',
    ],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
    $message = $consumer->receive(30 * 1000);
    if (!is_object($message)) {
        continue;
    }
    var_dump($message->getBody());
    $consumer->acknowledge($message);
    $consumer->reject($message);
}
复制代码

启动三个消费者,运行一次生产者,可以得到以下结果

消费者1

string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
复制代码

消费者2

string(13) "hello world!2"
复制代码

消费者3

string(13) "hello world!0"
复制代码

到此就实现了kafka做队列的需求了,本文内容就到这里,相关kafka知识可以看这篇文章




原文地址:访问原文地址
快照地址: 访问文章快照