canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端提供了不同语言实现不同的消费逻辑的源码样例:
canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
canal c# 客户端: https://github.com/dotnetcore/CanalSharp
canal go客户端: https://github.com/CanalClient/canal-go
canal Python客户端: https://github.com/haozi3156666/canal-python
canal PHP客户端: https://github.com/xingwenge/canal-php
canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力
工作原理
canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。
工作流程
1、Canal连接到mysql数据库,模拟slave
2、canal-php 与 Canal 建立连接
3、数据库发生变更写入到binlog
4、Canal向数据库发送dump请求,获取binlog并解析
5、canal-php 向 Canal 请求数据库变更
6、Canal 发送解析后的数据给canal-php
7、canal-php收到数据,消费成功,发送回执。(可选)
8、Canal记录消费位置。
构建canal php客户端
$ composer require xingwenge/canal_php or $ git clone https://github.com/xingwenge/canal-php.git $ cd canal-php $ composer update
建立与Canal的连接
try { #客户端连接方式 $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect("127.0.0.1", 11111); # 对应 canal.properties的配置 $client->checkValid(); $client->subscribe("1001", "example", ".*\\\\..*"); #此处1001不需要修改 # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤,多个数据库用逗号隔开 while (true) { $message = $client->get(100); if ($entries = $message->getEntries()) { foreach ($entries as $entry) { #此处可以处理逻辑 Fmt::println($entry); } } sleep(1); } $client->disConnect(); } catch (\\Exception $e) { echo $e->getMessage(), PHP_EOL; }
运行
php canal-php/src/sample/client.php
本文地址:https://www.stayed.cn/item/248
转载请注明出处。
本站部分内容来源于网络,如侵犯到您的权益,请 联系我