概述

在实时数据分析领域,Apache Kafka + ClickHouse 的组合已经成为业界标准方案。Kafka 负责高吞吐的消息收集与缓冲,ClickHouse 提供亚秒级的 OLAP 查询能力。2026年,随着 ClickHouse 官方 Kafka Connect Sink Connector 的成熟(已支持原生 Exactly-Once 语义),构建生产级实时管道比以往更加简单可靠。

本文将覆盖完整架构设计、Docker Compose 部署、Connector 配置、以及生产环境性能调优策略。

整体架构

┌──────────┐    ┌──────────────┐    ┌─────────────────┐    ┌──────────────┐
│  Data     │───▶│  Apache Kafka │───▶│ Kafka Connect   │───▶│ ClickHouse   │
│  Sources  │    │  (Broker)     │    │ (Sink Connector)│    │ (OLAP Store) │
└──────────┘    └──────────────┘    └─────────────────┘    └──────────────┘
                     │                       │
                Schema Registry        KeeperMap (Exactly-Once)

数据流路径:

  1. 各类数据源(应用日志、CDC、IoT 传感器)将事件写入 Kafka Topic
  2. Kafka Connect Sink Connector 从 Topic 消费数据
  3. 通过 ClickHouse HTTP 接口批量写入目标表
  4. ClickHouse 自动完成后台合并(Merge),提供实时查询能力

快速部署:Docker Compose

以下配置启动一个完整的开发环境,包含 Zookeeper、Kafka、Kafka Connect(预装 ClickHouse Sink 插件)和 ClickHouse。

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - pipeline

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 6
    networks:
      - pipeline

  clickhouse:
    image: clickhouse/clickhouse-server:24.8
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - ./clickhouse-config.xml:/etc/clickhouse-server/config.d/custom.xml
    networks:
      - pipeline

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    depends_on:
      - kafka
      - clickhouse
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: clickhouse-sink-group
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
    networks:
      - pipeline

networks:
  pipeline:
    driver: bridge

启动后安装 ClickHouse Sink Connector:

# 进入 Connect 容器安装插件
docker exec -it kafka-connect \
  confluent-hub install --no-prompt clickhouse/clickhouse-kafka-connect:latest

# 重启 Connect 使插件生效
docker restart kafka-connect

ClickHouse 表设计

合理的表引擎和排序键设计是性能的基石:

-- 创建事件表
CREATE TABLE IF NOT EXISTS events
(
    event_id    String,
    event_type  LowCardinality(String),
    user_id     UInt64,
    payload     String,
    timestamp   DateTime64(3, 'UTC'),
    created_at  DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (event_type, timestamp, event_id)
PARTITION BY toYYYYMM(timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY
SETTINGS
    max_insert_threads = 4,
    merge_with_ttl_timeout = 3600;

设计要点:

Kafka Connect Sink 配置

基础配置(At-Least-Once)

{
  "name": "clickhouse-events-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "default",
    "username": "default",
    "password": "",
    "ssl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "events-dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}

Exactly-Once 配置(推荐生产使用)

ClickHouse Sink Connector 通过内置的 KeeperMap 实现原生 Exactly-Once 语义,无需额外依赖:

{
  "name": "clickhouse-events-sink-eos",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "default",
    "username": "default",
    "password": "",
    "exactlyOnce": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "consumer.override.max.poll.records": "10000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}

关键参数说明:

参数 说明 推荐值
exactlyOnce 启用 Exactly-Once 语义 生产环境 true
tasks.max 并行任务数 等于 Topic 分区数
max.poll.records 单次拉取记录数 5000-10000
async_insert=1 启用 ClickHouse 异步写入 高并发场景推荐
wait_for_async_insert=1 等待数据落盘再确认 保证投递可靠性

性能调优实战

1. 批量大小调优

ClickHouse 天然优化大批量写入。核心原则是尽量增大批次

# 消费者端配置
consumer.max.poll.records=10000
consumer.max.partition.fetch.bytes=5242880
consumer.fetch.min.bytes=1048576
consumer.fetch.max.wait.ms=300

目标:每次 poll 返回 10,000 条记录,ClickHouse 单次 INSERT 处理大批量数据,减少 part 数量,降低 merge 压力。

2. 异步写入(Async Insert)

当 Connector 发送的批次较小(< 1000 行)或多任务并发写入同一张表时,启用异步写入效果显著:

"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=10485760,async_insert_busy_timeout_ms=1000"

ClickHouse 会在内存中缓冲多个 INSERT 请求,达到 10MB 或 1 秒超时后一次性刷盘,大幅减少 part 数量。

3. 多 Topic 场景

如果一个 Connector 消费多个高流量 Topic,当前实现会串行向各表写入。解决方案:

// 方案一:每个 Topic 独立 Connector
// 方案二:使用 topic2TableMap 映射,但注意串行瓶颈
"topic2TableMap": "events_orders=orders,events_users=users"

推荐:高流量场景下为每个 Topic 部署独立 Connector,实现并行写入。

4. 死信队列(DLQ)

生产环境必须配置 DLQ,防止单条异常数据阻塞整个管道:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "events-dlq",
"errors.deadletterqueue.context.headers.enable": "true"

5. 关键监控指标

指标 说明 告警阈值
Consumer Lag 消费延迟 持续增长 > 5 分钟
Insert QPS 写入吞吐量 突降 > 50%
Part 数量 ClickHouse 表 part 数 > 1000
Merge 吞吐 后台合并速度 持续低于 Insert 速率

发送测试数据

# 创建 Topic
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --topic events --partitions 6 --replication-factor 1

# 发送测试事件
for i in $(seq 1 1000); do
  echo "{\"event_id\":\"evt_$i\",\"event_type\":\"page_view\",\"user_id\":$((RANDOM % 10000)),\"payload\":\"{\\\"url\\\":\\\"/api/data\\\"}\",\"timestamp\":\"$(date -u +%Y-%m-%dT%H:%M:%S.000Z)\"}"
done | docker exec -i kafka kafka-console-producer \
  --bootstrap-server localhost:9092 --topic events

# 验证数据
curl "http://localhost:8123/?query=SELECT count(), event_type FROM events GROUP BY event_type"

总结

构建 Kafka 到 ClickHouse 的实时数据管道,2026 年的最佳实践可以归纳为:

  1. 使用官方 ClickHouse Kafka Connect Sink,获得原生 Exactly-Once 支持
  2. 合理设计表结构:选择合适的 ORDER BY、分区策略和 TTL
  3. 调大批量写入max.poll.records 设为 5000-10000,配合异步写入
  4. 高流量场景拆分 Connector:每个 Topic 独立部署,避免串行瓶颈
  5. 配置 DLQ 和监控:确保管道韧性和可观测性

这套方案已在多个大规模生产环境中验证,可稳定支撑每秒数十万条事件的实时写入与查询。