php怎么连kafka,PHP使用kafka入门

news/2024/7/4 12:59:53 标签: php怎么连kafka

第一步,kafka安装使用入门

0:环境准备,Linux环境(CentOS为例),wget命令,git命令,java8

1:下载kafka

2:解压

tar -zxvf kafka_2.12-2.7.0.tgz

3:进入目录

cd kafka_2.12-2.7.0

4:启动ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

5:这时窗口会停留在监听状态,再新建一个窗口,同目录下启动kafka

bin/kafka-server-start.sh config/server.properties

6:再新建一个窗口,创建一个topic(topic用来存储数据)

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

7:在这个topic上写入一些内容,使用producer(生产者)

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

这时会出现输入光标,可以输入一些字符,然后按回车生产数据。

8:再新建一个窗口,使用consumer(消费者)来使用刚刚生产者生产的数据

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

这时可以比对7和8两个窗口,7窗口的输入,会在8窗口上输出。

这时,我们就完成了kafka的一个基本流程。包括使用zookeeper来管理kafka进程,新建topic和broker,使用producer生产数据以及使用consumer来消费数据。这些会话都可以用ctrl+c来终止。

下面,我们通过一个demo来看下php是如何与kafka交互的。

第二步,php和kafka交互入门

0:环境准备,php7,Linux环境,pecl命令

1:安装依赖

git clone https://github.com/edenhill/librdkafka.git

cd librdkafka

./configure

make && make install

2: 安装php扩展

pecl install rdkafka

安装完使用php -m |grep kafka,看下有没有添加成功

3:运行producer来生产数据

$conf = new RdKafka\Conf();

$conf->set('metadata.broker.list', 'localhost:9092');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("quickstart-events");

for ($i = 0; $i < 10; $i++) {

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");

$producer->poll(0);

}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {

$result = $producer->flush(10000);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {

break;

}

}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {

throw new \RuntimeException('Was unable to flush, messages might be lost!');

}

执行:

php producer.php

这时,上面第8条的窗口如果没有关闭,也能同步收到这边在topic写入的数据。

4:运行consumer来消费数据

vim consumer.php

$conf = new RdKafka\Conf();

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);

$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();

$topicConf->set('auto.commit.interval.ms', 100);

$topicConf->set('offset.store.method', 'broker');

$topicConf->set('auto.offset.reset', 'earliest');

$topic = $rk->newTopic("quickstart-events", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {

$message = $topic->consume(0, 120*10000);

switch ($message->err) {

case RD_KAFKA_RESP_ERR_NO_ERROR:

var_dump($message);

break;

case RD_KAFKA_RESP_ERR__PARTITION_EOF:

echo "No more messages; will wait for more\n";

break;

case RD_KAFKA_RESP_ERR__TIMED_OUT:

echo "Timed out\n";

break;

default:

throw new \Exception($message->errstr(), $message->err);

break;

}

}

执行:

php consumer.php

可以看到,进入了监听状态,这时,每执行一次producer.php,都会在consumer这里打印响应的数据。

END. Enjoy it!


http://www.niftyadmin.cn/n/1378237.html

相关文章

使用javacv,解码socket接收的H264码流(byte[]),转为yuv处理,最后再合成转为H264...

其实是一个用java实现录像的功能&#xff0c;还没有实现&#xff0c;但解码和转码已经可以。 1.maven环境&#xff0c;pom.xml配置 1 <properties>2 <javacpp.version>1.4.1</javacpp.version>3 <!-- ffmpeg版本 -->4 <ffm…

spark 资源参数调优

资源参数调优 了解完了Spark作业运行的基本原理之后&#xff0c;对资源相关的参数就容易理解了。所谓的Spark资源参数调优&#xff0c;其实主要就是对Spark运行过程中各个使用资源的地方&#xff0c;通过调节各种参数&#xff0c;来优化资源使用的效率&#xff0c;从而提升Spar…

CodeForces - 534B-Covered Path+思路

CodeForces - 534B 题意&#xff1a;给定初始和末尾的速度&#xff0c;和最大加速度和总时间&#xff0c;求出走的最长路程&#xff1b; 我一开始以为代码写起来会很繁琐。。。 #include <iostream> #include <cstring> #include <string> #include <alg…

Mac安装Dart的SDK

最近了解到谷歌推迟Flutter兼容开发iOS、Android移动应用的框架&#xff0c;该框架使用的语音是Dart。作为一个iOS开发者来说&#xff0c;不感兴趣就不正常了&#xff0c;于是开始从学习Dart开始&#xff0c;所有的开发语音其实都是大同小异的。 不如正题&#xff0c;本人是采用…

php遍历xml 数组,PHP从SimpleXMLElement数组获取值

我有这个&#xff1a;[1]>object(SimpleXMLElement)#6 (1) {["attributes"]>array(14) {["name"]>string(5) "MySQL"["acknowledged"]>string(1) "1"["comments"]>string(1) "1"["…

Python基础---容器元组Tuple

元组Tuple(有序集合) 定义&#xff1a;容器内的元素不可变&#xff0c;该容器为元组 使用 () 来表示一个元组 元组在初始化后&#xff0c;其中的元素不可修改&#xff0c;不可删除 创建元组&#xff1a; 如果元组中只有一个元素&#xff0c;需要在元素后加上 , &#xff0c;防止…

结合场景谈一谈微服务配置

作为 Nacos 5W1H 的系列文章&#xff0c;本文将围绕“Where”&#xff0c;讲述 Nacos 配置管理的三个典型的应用场景&#xff1a; 数据库连接信息限流阈值和降级开关流量的动态调度上一篇&#xff1a;Nacos帮我解决了什么问题&#xff1f;数据库连接信息 曾经有朋友跟我聊过一个…

php获取跳转前的地址,php如何获取跳转前的url

php获取跳转前的url方法&#xff1a;1、获取URL带QUESTRING参数的JAVASCRIPT客户端方法&#xff1b;2、正则分析法&#xff0c;设置或获取整个URL为字符串&#xff0c;代码为【alert(window.location.href)】。php获取跳转前的url方法&#xff1a;一&#xff1a;获取URL带QUEST…