Canal-php 客户端使用

学习笔记 2020/08/21 Canal, PHP

    canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端提供了不同语言实现不同的消费逻辑的源码样例:

    canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

    canal c# 客户端: https://github.com/dotnetcore/CanalSharp

    canal go客户端: https://github.com/CanalClient/canal-go

    canal Python客户端: https://github.com/haozi3156666/canal-python

    canal PHP客户端: https://github.com/xingwenge/canal-php

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力


工作原理

    canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。


工作流程

1、Canal连接到mysql数据库,模拟slave

2、canal-php 与 Canal 建立连接

3、数据库发生变更写入到binlog

4、Canal向数据库发送dump请求,获取binlog并解析

5、canal-php 向 Canal 请求数据库变更

6、Canal 发送解析后的数据给canal-php

7、canal-php收到数据,消费成功,发送回执。(可选)

8、Canal记录消费位置。


构建canal php客户端

$ composer require xingwenge/canal_php

or

$ git clone https://github.com/xingwenge/canal-php.git
$ cd canal-php
$ composer update


建立与Canal的连接

try {
    #客户端连接方式
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); 
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
    $client->connect("127.0.0.1", 11111); # 对应 canal.properties的配置
    $client->checkValid();
    $client->subscribe("1001", "example", ".*\\\\..*"); #此处1001不需要修改
    # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤,多个数据库用逗号隔开

    while (true) { 
        $message = $client->get(100); 
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) { 
                #此处可以处理逻辑
                Fmt::println($entry);
            }
        } 
        sleep(1);
    }
    $client->disConnect();
} catch (\\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}


运行

php canal-php/src/sample/client.php



本文地址:https://www.stayed.cn/item/248

转载请注明出处。

本站部分内容来源于网络,如侵犯到您的权益,请 联系我

我的博客

人生若只如初见,何事秋风悲画扇。