三门峡市文章资讯

基于Canal实现MySQL到Elasticsearch的数据同步

2026-05-06 13:29:02 浏览次数:2
详细信息

一、方案选择

1. Canal Server + Adapter(推荐)

2. Canal Server + 自定义Client

3. Canal Server + Kafka + Consumer

二、方案一:Canal Adapter 实现

1. 环境准备

# docker-compose.yml 示例
version: '3'
services:
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: 123456
      MYSQL_DATABASE: test_db
    ports:
      - "3306:3306"

  elasticsearch:
    image: elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"

  canal-server:
    image: canal/canal-server:v1.1.6
    environment:
      canal.instance.master.address: mysql:3306
      canal.instance.dbUsername: root
      canal.instance.dbPassword: 123456
    depends_on:
      - mysql

  canal-adapter:
    image: slpcat/canal-adapter:latest
    environment:
      canal.conf.mode: http
    depends_on:
      - canal-server
      - elasticsearch

2. Canal Server 配置

# canal.properties
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*

# 启用MQ模式(可选)
canal.serverMode = kafka
canal.mq.servers = localhost:9092

3. Canal Adapter 配置

# application.yml
canal.conf:
  mode: tcp  # kafka, rocketMQ, rabbitMQ
  canalServerHost: 127.0.0.1:11111

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test_db?useSSL=false
      username: root
      password: 123456

  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        hosts: 127.0.0.1:9200
        properties:
          mode: rest
          # security.auth: test:123456
          cluster.name: elasticsearch

4. 表映射配置

# es7/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _id: _id
  sql: "SELECT id AS _id, name, email, age, created_at FROM user"
  etlCondition: "where created_at>={}"
  commitBatch: 3000

三、方案二:自定义Client实现

// 自定义Canal客户端示例
public class ESDataSyncClient {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111),
            "example", "", "");

        int batchSize = 1000;
        connector.connect();
        connector.subscribe(".*\\..*");

        while (true) {
            Message message = connector.getWithoutAck(batchSize);
            long batchId = message.getId();

            if (batchId != -1) {
                List<CanalEntry.Entry> entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        processRowData(entry);
                    }
                }
                connector.ack(batchId);
            }
        }
    }

    private static void processRowData(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(
                entry.getStoreValue());

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                switch (rowChange.getEventType()) {
                    case INSERT:
                        handleInsert(rowData, entry.getHeader().getTableName());
                        break;
                    case UPDATE:
                        handleUpdate(rowData, entry.getHeader().getTableName());
                        break;
                    case DELETE:
                        handleDelete(rowData, entry.getHeader().getTableName());
                        break;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static void handleInsert(CanalEntry.RowData rowData, String tableName) {
        Map<String, Object> source = new HashMap<>();
        List<CanalEntry.Column> columns = rowData.getAfterColumnsList();

        for (CanalEntry.Column column : columns) {
            source.put(column.getName(), column.getValue());
        }

        // 同步到Elasticsearch
        IndexRequest request = new IndexRequest("mytest_user")
            .id(source.get("id").toString())
            .source(source);
        // 执行ES操作
    }
}

四、高级配置

1. 批量处理优化

// 批量写入ES提高性能
BulkRequest bulkRequest = new BulkRequest();
for (Document doc : documents) {
    IndexRequest request = new IndexRequest("index")
        .id(doc.getId())
        .source(doc.getSource());
    bulkRequest.add(request);
}
BulkResponse response = client.bulk(bulkRequest);

2. 数据转换处理

// 自定义数据转换
public class DataTransformer {
    public static Map<String, Object> transform(Map<String, Object> source) {
        Map<String, Object> target = new HashMap<>();
        // 字段映射
        target.put("userName", source.get("name"));
        target.put("userAge", Integer.parseInt(source.get("age").toString()));
        // 时间格式转换
        target.put("createTime", formatDate(source.get("created_at")));
        return target;
    }
}

3. 错误处理与重试

# adapter重试配置
canalAdapters:
  - instance: example
    retries: 3
    timeout: 5000
    errorHandler:
      type: log_and_continue
      deadLetterQueue: dlq_es_sync

五、监控与管理

1. 健康检查

# Canal Server状态检查
curl http://localhost:11111/health

# ES索引状态
curl http://localhost:9200/_cat/indices?v

2. 监控指标

# Prometheus监控配置示例
- job_name: 'canal'
  static_configs:
    - targets: ['localhost:11111']

- job_name: 'elasticsearch'
  static_configs:
    - targets: ['localhost:9200']

六、最佳实践建议

索引设计

同步策略

容错处理

性能优化

七、常见问题

数据不一致

同步延迟

内存溢出

这种架构既能保证数据同步的实时性,又能通过合理的配置确保系统的稳定性和可维护性。

相关推荐