- 项目地址: https://github.com/apecloud/mysql-binlog-connector-rust
- crate 地址: https://crates.io/crates/mysql-binlog-connector-rust
binlog 解析
对于 mysql,数据同步是最常用的基础功能之一,尽管 mysql 自身具有主从复制能力,但原生功能很难直接满足诸多日常使用场景,如:数据迁移,数据集成,增量数据订阅,数据同步等。这部分能力往往由第三方工具进行补充。
各大云厂商的数据库平台均提供数据同步服务,如 aws 的 datasync,阿里云和腾讯云的 dts。同时,一些开源产品也提供了强大的数据同步能力,如 debezium ,canal,chunjun 等。
mysql 数据同步的基础是 binlog 解析,尽管 mysql 支持 3 种 binlog 格式(STATEMENT,ROW,MIXED),但从方便性和数据准确性来讲,ROW 类型是最优解,也是行业的通用方案。
在 ROW 格式的 binlog 中,每个操作以行的形式记录,它提供了详细的更改信息,而不仅仅是每个操作的 sql 语句。具体来说,对于每个修改操作(insert、update 或 delete),binlog 会记录受影响的行的完整内容,包括修改前的旧值和修改后的新值。目前所有主流的 binlog 解析库均是针对 ROW 类型。
github 上面可以找到一些优秀的开源 binlog 解析库,如:mysql-binlog-connector-java (java),canal (java),python-mysql-replication (python),go-mysql (go)
而在本文中,我们提供了一款新的 rust 版本的 binlog 解析器。
mysql-binlog-connector-rust 概览
- 一句话概括:mysql-binlog-connector-rust 是一款通过 rust 实现的,使用异步 IO,解析 mysql 的 Row Based Replication Events 的 lib。
- 项目地址
支持的 mysql 版本
- mysql 5.6 (测试版本: mysql:5.6.51)
- mysql 5.7 (测试版本: mysql:5.7.40)
- mysql 8.0 (测试版本: mysql:8.0.31)
支持的事件类型
-
FORMAT_DESCRIPTION_EVENT
-
ROTATE_EVENT
-
PREVIOUS_GTIDS_LOG_EVENT
-
GTID_LOG_EVENT
-
QUERY_EVENT
-
XID_EVENT
-
XA_PREPARE_LOG_EVENT
-
TRANSACTION_PAYLOAD_EVENT
-
ROWS_QUERY_LOG_EVENT
-
TABLE_MAP_EVENT
-
WRITE_ROWS_EVENT_V1
-
WRITE_ROWS_EVENT
-
UPDATE_ROWS_EVENT_V1
-
UPDATE_ROWS_EVENT
-
DELETE_ROWS_EVENT_V1
-
DELETE_ROWS_EVENT
-
对于暂不支持的事件类型,mysql-binlog-connector-rust 会直接忽略,并不影响对其他事件的解析。
-
关于 mysql binlog 中所有事件详情,可参考
mysql 数据类型和 rust 数据类型映射
mysql 列类型 | binlog 原始列类型 | rust binlog 类型 | rust 数据类型 |
---|---|---|---|
BIT | MYSQL_TYPE_BIT = 16 | ColumnType::Bit | ColumnValue::Bit(u64) |
TINYINT [UNSIGNED] | MYSQL_TYPE_TINY = 1 | ColumnType::Tiny | ColumnValue::Tiny(i8) |
SMALLINT [UNSIGNED] | MYSQL_TYPE_SHORT = 2 | ColumnType::Short | ColumnValue::Short(i16) |
MEDIUMINT [UNSIGNED] | MYSQL_TYPE_INT24 = 9 | ColumnType::Int24 | ColumnValue::Long(i32) |
INT [UNSIGNED] | MYSQL_TYPE_LONG = 3 | ColumnType::Long | ColumnValue::Long(i32) |
BIGINT [UNSIGNED] | MYSQL_TYPE_LONGLONG = 8 | ColumnType::LongLong | ColumnValue::LongLong(i64) |
FLOAT | MYSQL_TYPE_FLOAT = 4 | ColumnType::Float | ColumnValue::Float(f32) |
DOUBLE | MYSQL_TYPE_DOUBLE = 5 | ColumnType::Double | ColumnValue::Double(f64) |
DECIMAL | MYSQL_TYPE_NEWDECIMAL = 246 | ColumnType::NewDecimal | ColumnValue::Decimal(String) |
DATE | MYSQL_TYPE_DATE = 10 | ColumnType::Date | ColumnValue::Date(String) |
TIME | MYSQL_TYPE_TIME2 = 19 | ColumnType::Time2 | ColumnValue::Time(String) |
TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 = 17 | ColumnType::TimeStamp2 | ColumnValue::Timestamp(i64) |
DATETIME | MYSQL_TYPE_DATETIME2 = 18 | ColumnType::DateTime2 | ColumnValue::DateTime(String) |
YEAR | MYSQL_TYPE_YEAR = 13 | ColumnType::Year | ColumnValue::Year(u16) |
CHAR | MYSQL_TYPE_STRING = 254 | ColumnType::String | ColumnValue::String(Vec) |
VARCHAR | MYSQL_TYPE_VARCHAR = 15 | ColumnType::VarChar | ColumnValue::String(Vec) |
BINARY | MYSQL_TYPE_STRING = 254 | ColumnType::String | ColumnValue::String(Vec) |
VARBINARY | MYSQL_TYPE_VARCHAR = 15 | ColumnType::VarChar | ColumnValue::String(Vec) |
ENUM | MYSQL_TYPE_STRING = 254 | ColumnType::Enum | ColumnValue::Enum(u32) |
SET | MYSQL_TYPE_STRING = 254 | ColumnType::Set | ColumnValue::Set(u64) |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT TINYBLOB BLOB MEDIUMBLOB LONGBLOB | MYSQL_TYPE_BLOB = 252 | ColumnType::Blob | ColumnValue::Blob(Vec) |
GEOMETRY | MYSQL_TYPE_GEOMETRY = 255 | ColumnType::Geometry | ColumnValue::Blob(Vec) |
JSON | MYSQL_TYPE_JSON = 245 | ColumnType::Json | ColumnValue::Json(Vec) |
- 对于 CHAR / VARCHAR 列,由于 binlog 不包含字符集信息,我们只获取二进制数据并存储在 ColumnValue::String(Vec) 对象中,用户需根据列的元数据进行转换。
- 对于 UNSIGNED 数字列,由于 binlog 不包含符号标志,我们只将其解析为有符号数字,用户需根据列的元数据进行转换。
- 对于 JSON 列,我们只获取二进制数据并将其存储在 ColumnValue::Json(Vec) 对象中,同时我们还提供一个的默认解析器 JsonBinary 将其解析为字符串,本文后续有相应示例。
快速开始
运行测试用例
- docker 启动 mysql 5.7,开启 binlog,关闭 binlog-transaction-compression
docker run -d --name mysql57 \
--platform linux/x86_64 \
-it --restart=always \
-p 3307:3306 \
-e MYSQL_ROOT_PASSWORD="123456" \
mysql:5.7.40 \
--lower_case_table_names=1 \
--character-set-server=utf8 \
--collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW
- docker 启动 mysql 8.0,开启 binlog,打开 binlog-transaction-compression
docker run -d --name mysql80 \
--platform linux/x86_64 \
-it --restart=always \
-p 3308:3306 -e MYSQL_ROOT_PASSWORD="123456" \
mysql:8.0.31 \
--lower_case_table_names=1 \
--character-set-server=utf8 \
--collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW \
--binlog-transaction-compression \
--binlog_rows_query_log_events=ON \
--default_authentication_plugin=mysql_native_password \
--default_time_zone="+08:00"
- 更新 tests/.env 中的配置
db_url=mysql://root:123456@127.0.0.1:3307
server_id=200
default_db="db_test"
default_tb="tb_test"
binlog_parse_millis=100
- 运行测试
cargo test --package mysql-binlog-connector-rust --test integration_test
- 每个测试用例会:
- 执行 sql 并生成 binlog
- 获取 binlog 并解析
- 等待 binlog_parse_millis 以将所有 binlog 解析完毕
- 对于大事务,可能需要增大 binlog_parse_millis
用例
fn main() {
let env_path = env::current_dir().unwrap().join("example/src/.env");
dotenv::from_path(env_path).unwrap();
let db_url = env::var("db_url").unwrap();
let server_id: u64 = env::var("server_id").unwrap().parse().unwrap();
let binlog_filename = env::var("binlog_filename").unwrap();
let binlog_position: u32 = env::var("binlog_position").unwrap().parse().unwrap();
block_on(start_client(
db_url,
server_id,
binlog_filename,
binlog_position,
));
}
async fn start_client(url: String, server_id: u64, binlog_filename: String, binlog_position: u32) {
let mut client = BinlogClient {
url,
binlog_filename,
binlog_position,
server_id,
};
let mut stream = client.connect().await.unwrap();
loop {
let (header, data) = stream.read().await.unwrap();
println!("header: {:?}", header);
println!("data: {:?}", data);
println!("");
}
}
用例 1: 解析关闭 binlog-transaction-compression 的事务
- 执行 sql 语句
flush logs;
SET autocommit=0;
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE test_tb(id INT, value INT);
INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4);
UPDATE test_tb SET value=3 WHERE id in(1,2);
DELETE FROM test_tb WHERE id in (1,2);
TRUNCATE TABLE test_tb;
DROP TABLE test_tb;
commit;
- 查看 binlog 事件
mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000050 | 1255 |
mysql> show binlog events in 'mysql-bin.000050';
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
| mysql-bin.000050 | 4 | Format_desc | 1 | 123 | Server ver: 5.7.40-log, Binlog ver: 4 |
| mysql-bin.000050 | 123 | Previous_gtids | 1 | 194 | 50dc6874-13d3-11ee-a17a-0242ac110002:1-176027 |
| mysql-bin.000050 | 194 | Gtid | 1 | 259 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176028' |
| mysql-bin.000050 | 259 | Query | 1 | 378 | use `test_db`; CREATE TABLE test_tb(id INT, value INT) |
| mysql-bin.000050 | 378 | Gtid | 1 | 443 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176029' |
| mysql-bin.000050 | 443 | Query | 1 | 518 | BEGIN |
| mysql-bin.000050 | 518 | Table_map | 1 | 572 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 572 | Write_rows | 1 | 643 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 643 | Table_map | 1 | 697 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 697 | Update_rows | 1 | 769 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 769 | Table_map | 1 | 823 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 823 | Delete_rows | 1 | 876 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 876 | Xid | 1 | 907 | COMMIT /* xid=13739 */ |
| mysql-bin.000050 | 907 | Gtid | 1 | 972 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176030' |
| mysql-bin.000050 | 972 | Query | 1 | 1064 | use `test_db`; TRUNCATE TABLE test_tb |
| mysql-bin.000050 | 1064 | Gtid | 1 | 1129 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176031' |
| mysql-bin.000050 | 1129 | Query | 1 | 1255 | use `test_db`; DROP TABLE `test_tb` /* generated by server */ |
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
- 解析出的 binlog 信息
header: EventHeader { timestamp: 0, event_type: 4, server_id: 1, event_length: 47, next_event_position: 0, event_flags: 32 }
data: Rotate(RotateEvent { binlog_filename: "mysql-bin.000050", binlog_position: 194 })
header: EventHeader { timestamp: 1704443761, event_type: 15, server_id: 1, event_length: 119, next_event_position: 0, event_flags: 0 }
data: FormatDescription(FormatDescriptionEvent { binlog_version: 4, server_version: "5.7.40-log\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", create_timestamp: 0, header_length: 19, checksum_type: CRC32 })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 259, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176028" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 119, next_event_position: 378, event_flags: 0 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE TABLE test_tb(id INT, value INT)" })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 443, event_flags: 0 }
data: Gtid(GtidEvent { flags: 0, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176029" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 75, next_event_position: 518, event_flags: 8 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "BEGIN" })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 572, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 30, server_id: 1, event_length: 71, next_event_position: 643, event_flags: 0 }
data: WriteRows(WriteRowsEvent { table_id: 12832, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(3), Long(3)] }, RowEvent { column_values: [Long(4), Long(4)] }] })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 697, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 31, server_id: 1, event_length: 72, next_event_position: 769, event_flags: 0 }
data: UpdateRows(UpdateRowsEvent { table_id: 12832, included_columns_before: [true, true], included_columns_after: [true, true], rows: [(RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(1), Long(3)] }), (RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(2), Long(3)] })] })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 823, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 32, server_id: 1, event_length: 53, next_event_position: 876, event_flags: 0 }
data: DeleteRows(DeleteRowsEvent { table_id: 12832, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(3)] }, RowEvent { column_values: [Long(2), Long(3)] }] })
header: EventHeader { timestamp: 1704443769, event_type: 16, server_id: 1, event_length: 31, next_event_position: 907, event_flags: 0 }
data: Xid(XidEvent { xid: 13739 })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 972, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176030" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 92, next_event_position: 1064, event_flags: 0 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "TRUNCATE TABLE test_tb" })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 1129, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176031" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 126, next_event_position: 1255, event_flags: 4 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "DROP TABLE `test_tb` /* generated by server */" })
用例 2: 解析开启 binlog-transaction-compression 的 binlog
- 执行 sql 语句
flush logs;
SET autocommit=0;
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE test_tb(id INT, value INT);
INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4);
UPDATE test_tb SET value=3 WHERE id in(1,2);
DELETE FROM test_tb WHERE id in (1,2);
TRUNCATE TABLE test_tb;
DROP TABLE test_tb;
commit;
- 查看 binlog 事件
mysql> show binary logs;
+------------------+-----------+-----------+
| Log_name | File_size | Encrypted |
+------------------+-----------+-----------+
| mysql-bin.000033 | 1429 | No |
mysql> show binlog events in 'mysql-bin.000033';
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
| mysql-bin.000033 | 4 | Format_desc | 1 | 126 | Server ver: 8.0.31, Binlog ver: 4 |
| mysql-bin.000033 | 126 | Previous_gtids | 1 | 197 | 36682cf3-a048-11ed-b4b3-0242ac110004:1-125753 |
| mysql-bin.000033 | 197 | Gtid | 1 | 274 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125754' |
| mysql-bin.000033 | 274 | Query | 1 | 391 | CREATE DATABASE test_db /* xid=5 */ |
| mysql-bin.000033 | 391 | Gtid | 1 | 468 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125755' |
| mysql-bin.000033 | 468 | Query | 1 | 601 | use `test_db`; CREATE TABLE test_tb(id INT, value INT) /* xid=10 */ |
| mysql-bin.000033 | 601 | Gtid | 1 | 680 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125756' |
| mysql-bin.000033 | 680 | Transaction_payload | 1 | 1033 | compression='ZSTD', decompressed_size=633 bytes |
| mysql-bin.000033 | 1033 | Query | 1 | 1033 | BEGIN |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Write_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # UPDATE test_tb SET value=3 WHERE id in(1,2) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Update_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # DELETE FROM test_tb WHERE id in (1,2) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Delete_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Xid | 1 | 1033 | COMMIT /* xid=11 */ |
| mysql-bin.000033 | 1033 | Gtid | 1 | 1110 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125757' |
| mysql-bin.000033 | 1110 | Query | 1 | 1214 | use `test_db`; TRUNCATE TABLE test_tb /* xid=14 */ |
| mysql-bin.000033 | 1214 | Gtid | 1 | 1291 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125758' |
| mysql-bin.000033 | 1291 | Query | 1 | 1429 | use `test_db`; DROP TABLE `test_tb` /* generated by server */ /* xid=15 */ |
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
- 解析出的 binlog 信息
header: EventHeader { timestamp: 1704445709, event_type: 15, server_id: 1, event_length: 122, next_event_position: 126, event_flags: 0 }
data: FormatDescription(FormatDescriptionEvent { binlog_version: 4, server_version: "8.0.31\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", create_timestamp: 0, header_length: 19, checksum_type: CRC32 })
header: EventHeader { timestamp: 1704445709, event_type: 35, server_id: 1, event_length: 71, next_event_position: 197, event_flags: 128 }
data: PreviousGtids(PreviousGtidsEvent { gtid_set: "36682cf3-a048-11ed-b4b3-0242ac110004:1-125753" })
header: EventHeader { timestamp: 1704445716, event_type: 33, server_id: 1, event_length: 77, next_event_position: 274, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125754" })
header: EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 117, next_event_position: 391, event_flags: 8 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE DATABASE test_db" })
header: EventHeader { timestamp: 1704445716, event_type: 33, server_id: 1, event_length: 77, next_event_position: 468, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125755" })
header: EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 133, next_event_position: 601, event_flags: 0 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE TABLE test_tb(id INT, value INT)" })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 79, next_event_position: 680, event_flags: 0 }
data: Gtid(GtidEvent { flags: 0, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125756" })
header: EventHeader { timestamp: 1704445717, event_type: 40, server_id: 1, event_length: 353, next_event_position: 1033, event_flags: 0 }
data: TransactionPayload(TransactionPayloadEvent { uncompressed_size: 633, uncompressed_events: [(EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 74, next_event_position: 0, event_flags: 8 }, Query(QueryEvent { thread_id: 8, exec_time: 1, error_code: 0, schema: "test_db", query: "BEGIN" })), (EventHeader { timestamp: 1704445716, event_type: 29, server_id: 1, event_length: 69, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4)" })), (EventHeader { timestamp: 1704445716, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445716, event_type: 30, server_id: 1, event_length: 67, next_event_position: 0, event_flags: 0 }, WriteRows(WriteRowsEvent { table_id: 90, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(3), Long(3)] }, RowEvent { column_values: [Long(4), Long(4)] }] })), (EventHeader { timestamp: 1704445717, event_type: 29, server_id: 1, event_length: 63, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "UPDATE test_tb SET value=3 WHERE id in(1,2)" })), (EventHeader { timestamp: 1704445717, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445717, event_type: 31, server_id: 1, event_length: 68, next_event_position: 0, event_flags: 0 }, UpdateRows(UpdateRowsEvent { table_id: 90, included_columns_before: [true, true], included_columns_after: [true, true], rows: [(RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(1), Long(3)] }), (RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(2), Long(3)] })] })), (EventHeader { timestamp: 1704445717, event_type: 29, server_id: 1, event_length: 57, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "DELETE FROM test_tb WHERE id in (1,2)" })), (EventHeader { timestamp: 1704445717, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445717, event_type: 32, server_id: 1, event_length: 49, next_event_position: 0, event_flags: 0 }, DeleteRows(DeleteRowsEvent { table_id: 90, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(3)] }, RowEvent { column_values: [Long(2), Long(3)] }] })), (EventHeader { timestamp: 1704445717, event_type: 16, server_id: 1, event_length: 27, next_event_position: 0, event_flags: 0 }, Xid(XidEvent { xid: 11 }))] })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 77, next_event_position: 1110, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125757" })
header: EventHeader { timestamp: 1704445717, event_type: 2, server_id: 1, event_length: 104, next_event_position: 1214, event_flags: 0 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "TRUNCATE TABLE test_tb" })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 77, next_event_position: 1291, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125758" })
header: EventHeader { timestamp: 1704445717, event_type: 2, server_id: 1, event_length: 138, next_event_position: 1429, event_flags: 4 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "DROP TABLE `test_tb` /* generated by server */" })
用例 3: 将 json 字段解析成 string
fn parse_json_columns(data: EventData) {
let parse_row = |row: RowEvent| {
for column_value in row.column_values {
if let ColumnValue::Json(bytes) = column_value {
println!(
"json column: {}",
JsonBinary::parse_as_string(&bytes).unwrap()
)
}
}
};
match data {
EventData::WriteRows(event) => {
for row in event.rows {
parse_row(row)
}
}
EventData::DeleteRows(event) => {
for row in event.rows {
parse_row(row)
}
}
EventData::UpdateRows(event) => {
for (before, after) in event.rows {
parse_row(before);
parse_row(after);
}
}
_ => {}
}
}
- 执行 sql 语句
CREATE TABLE test_db_1.json_test(id INT AUTO_INCREMENT, json_col JSON, PRIMARY KEY(id));
SET autocommit=0;
INSERT INTO test_db_1.json_test VALUES (NULL, '{"k.1":1,"k.0":0,"k.-1":-1,"k.true":true,"k.false":false,"k.null":null,"k.string":"string","k.true_false":[true,false],"k.32767":32767,"k.32768":32768,"k.-32768":-32768,"k.-32769":-32769,"k.2147483647":2147483647,"k.2147483648":2147483648,"k.-2147483648":-2147483648,"k.-2147483649":-2147483649,"k.18446744073709551615":18446744073709551615,"k.18446744073709551616":18446744073709551616,"k.3.14":3.14,"k.{}":{},"k.[]":[]}');
INSERT INTO test_db_1.json_test VALUES (NULL, '{"中文":"😀"}');
commit;
- 解析出的 json 字段
json column: {"k.0":0,"k.1":1,"k.-1":-1,"k.[]":[],"k.{}":{},"k.3.14":3.14,"k.null":null,"k.true":true,"k.32767":32767,"k.32768":32768,"k.false":false,"k.-32768":-32768,"k.-32769":-32769,"k.string":"string","k.2147483647":2147483647,"k.2147483648":2147483648,"k.true_false":[true,false],"k.-2147483648":-2147483648,"k.-2147483649":-2147483649,"k.18446744073709551615":18446744073709551615,"k.18446744073709551616":18446744073709552000}
json column: {"中文":"😀"}
评论区
写评论解析了binlog,怎么让数据在另外一台机器恢复数据?需要转换成sql语句吗?
已经发布了。
--
👇
lsk569937453: 这个crates是不是没发布啊,我在crates-io上没搜到https://crates.io/search?q=mysql-binlog-connector-rust
测试用例很多哦,可以从测试用例看。
我们公司线上的数据同步工具就是基于这个做的,代码是完全同步的,没有任何藏私哦。
这周末就会发布
--
👇
lsk569937453: 这个crates是不是没发布啊,我在crates-io上没搜到https://crates.io/search?q=mysql-binlog-connector-rust
已经给github仓库star了。
这个crates是不是没发布啊,我在crates-io上没搜到https://crates.io/search?q=mysql-binlog-connector-rust