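In real-time data analytics, combining Apache Kafka with ClickHouse has become a standard industry architecture: Kafka handles high-throughput message collection and buffering, while ClickHouse delivers sub-second OLAP queries. In 2026, with the official ClickHouse Kafka Connect Sink Connector now mature (including native exactly-once semantics), building a production-grade real-time pipeline is simpler and more reliable than ever.
This article covers the complete architecture design, deployment with Docker Compose, Connector configuration, and performance tuning strategies for production environments.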
```text
┌──────────┐    ┌──────────────┐    ┌─────────────────┐    ┌──────────────┐
│   Data   │───▶│ Apache Kafka │───▶│  Kafka Connect  │───▶│  ClickHouse  │
│ Sources  │    │   (Broker)   │    │ (Sink Connector)│    │ (OLAP Store) │
└──────────┘    └──────────────┘    └─────────────────┘    └──────────────┘
                       │                                          │
                Schema Registry                       KeeperMap (Exactly-Once)
```
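Data flow path: Data Sources → Apache Kafka (broker) → Kafka Connect (ClickHouse Sink Connector) → ClickHouse, with Schema Registry as an optional schema service and a KeeperMap table backing exactly-once delivery.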
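The following configuration starts a complete development environment with ZooKeeper, Kafka, Kafka Connect, and ClickHouse. The ClickHouse Sink plugin is not preinstalled in the Kafka Connect image; it is installed after startup.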
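```yaml
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - pipeline
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Containers reach the broker at kafka:29092; the host uses localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker: internal topics cannot keep the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 6
    networks:
      - pipeline
  clickhouse:
    image: clickhouse/clickhouse-server:24.8
    container_name: clickhouse
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      # Custom server config; exactly-once mode also needs Keeper and
      # keeper_map_path_prefix configured here for KeeperMap tables
      - ./clickhouse-config.xml:/etc/clickhouse-server/config.d/custom.xml
    networks:
      - pipeline
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    container_name: kafka-connect
    depends_on:
      - kafka
      - clickhouse
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: clickhouse-sink-group
      # Internal topics for connector configs, offsets and status (required by this image)
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
    networks:
      - pipeline
networks:
  pipeline:
    driver: bridge
```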
After startup, install the ClickHouse Sink Connector:

```bash
# Install the plugin inside the Connect container
docker exec -it kafka-connect \
  confluent-hub install --no-prompt clickhouse/clickhouse-kafka-connect:latest
# Restart Connect so that the plugin is loaded
docker restart kafka-connect
```
Choosing the right table engine and sort key is the foundation of good performance:

```sql
-- Create the events table
CREATE TABLE IF NOT EXISTS events
(
    event_id   String,
    event_type LowCardinality(String),
    user_id    UInt64,
    payload    String,
    timestamp  DateTime64(3, 'UTC'),
    created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (event_type, timestamp, event_id)
PARTITION BY toYYYYMM(timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY
-- max_insert_threads is a query-level setting, not a MergeTree table setting,
-- so it is passed per INSERT rather than declared here
SETTINGS
    merge_with_ttl_timeout = 3600;
```
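To make the `PARTITION BY toYYYYMM(timestamp)` and 90-day `TTL` clauses above concrete, here is a minimal Python sketch that mirrors both expressions for a single event timestamp. It only models the arithmetic: ClickHouse actually removes expired rows during background merges (with `merge_with_ttl_timeout` limiting how often TTL merges repeat), not at the exact expiry instant. The function names are hypothetical helpers for illustration.

```python
from datetime import datetime, timedelta, timezone

TTL_DAYS = 90  # matches "TTL toDateTime(timestamp) + INTERVAL 90 DAY"


def partition_id(ts: datetime) -> int:
    """Mirror toYYYYMM(timestamp): the monthly partition a row lands in (UTC)."""
    ts = ts.astimezone(timezone.utc)
    return ts.year * 100 + ts.month


def expires_at(ts: datetime) -> datetime:
    """Mirror toDateTime(timestamp) + INTERVAL 90 DAY (toDateTime drops sub-seconds)."""
    return ts.replace(microsecond=0) + timedelta(days=TTL_DAYS)


def is_expired(ts: datetime, now: datetime) -> bool:
    """True once the row is eligible for removal by a TTL merge."""
    return now >= expires_at(ts)


if __name__ == "__main__":
    ts = datetime(2026, 1, 15, 8, 30, 0, 123000, tzinfo=timezone.utc)
    print(partition_id(ts))            # 202601
    print(expires_at(ts).isoformat())  # 2026-04-15T08:30:00+00:00
```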
Design notes:

- `ORDER BY` follows the query pattern: `event_type` comes first for efficient filtering by type, and `timestamp` supports time-range queries.
- `PARTITION BY toYYYYMM` partitions data by month, which simplifies data management and expiry.
- `TTL` automatically deletes data older than 90 days.
- `LowCardinality` can significantly reduce storage and speed up queries for low-cardinality string columns such as `event_type`.

Basic Sink Connector configuration, submitted to the Kafka Connect REST API (port 8083):

```json
{
  "name": "clickhouse-events-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "default",
    "username": "default",
    "password": "",
    "ssl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "events-dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
```
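Kafka Connect registers a connector when a definition like the one above is sent to `POST /connectors` on its REST port (8083 in the Compose file). As a minimal sketch using only the Python standard library, the hypothetical helper `build_create_request` below builds that request without sending it; the `http://localhost:8083` default is an assumption taken from the Compose port mapping.

```python
import json
import urllib.request


def build_create_request(connector: dict,
                         connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a connector definition."""
    if "name" not in connector or "config" not in connector:
        raise ValueError("connector definition needs 'name' and 'config'")
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    sink = {
        "name": "clickhouse-events-sink",
        "config": {
            "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
            "tasks.max": "3",
            "topics": "events",
            "hostname": "clickhouse",
            "port": "8123",
            "ssl": "false",
        },
    }
    req = build_create_request(sink)
    print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors
    # To actually submit it: urllib.request.urlopen(req)
```

Keeping request construction separate from sending makes the payload easy to inspect or log before it reaches a live Connect cluster.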
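The ClickHouse Sink Connector implements native exactly-once semantics through a built-in KeeperMap state table, so no component outside ClickHouse is required. KeeperMap does, however, need ClickHouse Keeper (or ZooKeeper) and `keeper_map_path_prefix` configured on the ClickHouse server. As in the basic configuration, `ssl` is set to `false` because the connector defaults to SSL while port 8123 here serves plain HTTP. Deploy this connector instead of the basic one, not alongside it: both read `events` under different consumer groups and would insert every record twice.

```json
{
  "name": "clickhouse-events-sink-eos",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "default",
    "username": "default",
    "password": "",
    "ssl": "false",
    "exactlyOnce": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "consumer.override.max.poll.records": "10000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
```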
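Key parameters:

| Parameter | Description | Recommended value |
|---|---|---|
| `exactlyOnce` | Enables exactly-once semantics | `true` in production |
| `tasks.max` | Number of parallel tasks | Equal to the topic's partition count (tasks beyond that stay idle) |
| `max.poll.records` | Records fetched per poll | 5000-10000 |
| `async_insert=1` | Enables ClickHouse asynchronous inserts | Recommended for high-concurrency workloads |
| `wait_for_async_insert=1` | Acknowledges only after buffered data is written to the table | Ensures reliable delivery |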
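The idea behind offset-based exactly-once delivery can be illustrated with a deliberately simplified model: persist, per topic-partition, the highest offset already inserted, and skip anything at or below it when Kafka redelivers a batch after a crash or rebalance. The Python sketch below uses an in-memory dict where the connector uses a KeeperMap table; `OffsetStateStore` and `insert_once` are hypothetical names, and this is not the connector's actual implementation.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetStateStore:
    """Stand-in for the KeeperMap state table: (topic, partition) -> last inserted offset."""
    committed: dict = field(default_factory=dict)

    def last(self, topic: str, partition: int) -> int:
        return self.committed.get((topic, partition), -1)

    def record(self, topic: str, partition: int, offset: int) -> None:
        self.committed[(topic, partition)] = offset


def insert_once(batch, store: OffsetStateStore, table: list) -> int:
    """Insert records above the stored watermark; return the number of rows written.

    `batch` is a list of (topic, partition, offset, row) tuples sorted by offset.
    """
    written = 0
    for topic, partition, offset, row in batch:
        if offset <= store.last(topic, partition):
            continue  # already inserted before the redelivery: skip it
        table.append(row)
        store.record(topic, partition, offset)
        written += 1
    return written


if __name__ == "__main__":
    store, table = OffsetStateStore(), []
    batch = [("events", 0, o, {"event_id": f"evt_{o}"}) for o in range(3)]
    print(insert_once(batch, store, table))  # 3
    print(insert_once(batch, store, table))  # 0 (redelivered batch is skipped)
    print(len(table))                        # 3
```

A real implementation must also cope with a crash between writing rows and updating the state, which this model ignores.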
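ClickHouse is inherently optimized for large batch inserts, so the core principle is to make batches as large as practical:

```properties
# Consumer settings (Kafka Connect worker properties; in a connector config,
# use the consumer.override. prefix instead)
consumer.max.poll.records=10000
consumer.max.partition.fetch.bytes=5242880
consumer.fetch.min.bytes=1048576
consumer.fetch.max.wait.ms=300
```

Goal: each poll returns up to 10,000 records, so that a single ClickHouse INSERT handles a large batch, producing fewer parts and reducing merge pressure.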
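A back-of-the-envelope calculation shows why batch size matters: without async inserts, each synchronous INSERT into a MergeTree table creates at least one new part per partition it touches, so the event rate divided by the batch size bounds how fast parts appear. The sketch below assumes full batches, one INSERT per batch, and a hypothetical rate of 100,000 events per second.

```python
import math


def inserts_per_second(events_per_second: float, batch_rows: int) -> float:
    """Number of INSERT statements per second if every batch is full."""
    if batch_rows <= 0:
        raise ValueError("batch_rows must be positive")
    return events_per_second / batch_rows


def parts_per_hour(events_per_second: float, batch_rows: int, partitions_touched: int = 1) -> int:
    """New parts created per hour before merges (one part per partition per INSERT)."""
    return math.ceil(inserts_per_second(events_per_second, batch_rows) * 3600 * partitions_touched)


if __name__ == "__main__":
    for batch in (500, 10_000):
        print(batch, inserts_per_second(100_000, batch), parts_per_hour(100_000, batch))
    # 500 200.0 720000
    # 10000 10.0 36000
```

Going from 500-row to 10,000-row batches cuts the part creation rate twentyfold, which is the merge pressure the settings above aim to reduce.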
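When the Connector sends small batches (fewer than 1,000 rows) or many tasks write to the same table concurrently, enabling asynchronous inserts makes a significant difference:

```json
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=10485760,async_insert_busy_timeout_ms=1000"
```

ClickHouse buffers multiple INSERT requests in memory and flushes them in one go once the buffer reaches 10 MiB or the busy timeout (here at most 1 second) expires, whichever comes first, which greatly reduces the number of parts.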
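The flush rule can be modeled as a buffer that is written out when either threshold is hit. The Python sketch below is a simplified, single-threaded model with a simulated millisecond clock; `AsyncInsertBuffer` is a hypothetical class, and real ClickHouse keeps separate buffers per query shape and settings and may adapt the busy timeout, so treat the numbers as illustrative.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AsyncInsertBuffer:
    """Toy model of async_insert: buffer small INSERTs, flush on size or timeout."""
    max_data_size: int = 10_485_760  # async_insert_max_data_size (bytes)
    busy_timeout_ms: int = 1_000     # async_insert_busy_timeout_ms
    pending_bytes: int = 0
    first_insert_ms: Optional[int] = None
    flushes: list = field(default_factory=list)  # bytes written per flush (one part each)

    def insert(self, size_bytes: int, now_ms: int) -> None:
        self.tick(now_ms)  # flush first if the timeout already expired
        if self.first_insert_ms is None:
            self.first_insert_ms = now_ms
        self.pending_bytes += size_bytes
        if self.pending_bytes >= self.max_data_size:
            self._flush()

    def tick(self, now_ms: int) -> None:
        """Flush if the oldest buffered data has waited at least busy_timeout_ms."""
        if self.first_insert_ms is not None and now_ms - self.first_insert_ms >= self.busy_timeout_ms:
            self._flush()

    def _flush(self) -> None:
        self.flushes.append(self.pending_bytes)
        self.pending_bytes = 0
        self.first_insert_ms = None


if __name__ == "__main__":
    buf = AsyncInsertBuffer()
    for t in range(0, 2500, 10):  # 250 small 4 KiB INSERTs, one every 10 ms
        buf.insert(4096, now_ms=t)
    buf.tick(now_ms=3500)
    print(len(buf.flushes), buf.flushes)  # 3 [409600, 409600, 204800]
```

In this model, 250 tiny INSERTs become 3 parts instead of 250.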
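If one Connector consumes several high-traffic Topics, the current implementation writes to their tables serially. There are two options:

- Option 1: deploy a separate Connector for each Topic.
- Option 2: use a single Connector with a `topic2TableMap` mapping, accepting the serial-write bottleneck:

```json
"topic2TableMap": "events_orders=orders,events_users=users"
```

Recommendation: for high-traffic workloads, deploy a separate Connector per Topic so that writes proceed in parallel.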
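Option 1 is easy to script. The minimal Python sketch below parses a `topic2TableMap`-style string into a dict and clones a base connector definition into one connector per topic. Both helpers and the `<base-name>-<topic>` naming scheme are hypothetical, not part of the connector.

```python
import copy


def parse_topic2table_map(spec: str) -> dict:
    """Parse 'topicA=tableA,topicB=tableB' into {'topicA': 'tableA', ...}."""
    mapping = {}
    for pair in filter(None, (p.strip() for p in spec.split(","))):
        topic, sep, table = pair.partition("=")
        if not sep or not topic.strip() or not table.strip():
            raise ValueError(f"invalid topic2TableMap entry: {pair!r}")
        mapping[topic.strip()] = table.strip()
    return mapping


def per_topic_connectors(base: dict, topic_to_table: dict) -> list:
    """Clone one connector definition per topic so that each table is written in parallel."""
    connectors = []
    for topic, table in topic_to_table.items():
        c = copy.deepcopy(base)  # never mutate the shared base definition
        c["name"] = f"{base['name']}-{topic}"
        c["config"]["topics"] = topic
        c["config"]["topic2TableMap"] = f"{topic}={table}"
        connectors.append(c)
    return connectors


if __name__ == "__main__":
    base = {"name": "clickhouse-sink",
            "config": {"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
                       "tasks.max": "3"}}
    mapping = parse_topic2table_map("events_orders=orders,events_users=users")
    for c in per_topic_connectors(base, mapping):
        print(c["name"], c["config"]["topics"], c["config"]["topic2TableMap"])
    # clickhouse-sink-events_orders events_orders events_orders=orders
    # clickhouse-sink-events_users events_users events_users=users
```

Each generated definition can then be submitted to the Kafka Connect REST API as its own connector.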
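A dead letter queue (DLQ) is mandatory in production, so that a single malformed record cannot block the entire pipeline:

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "events-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
```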
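With `errors.tolerance=all`, a record that fails conversion is diverted to the DLQ topic instead of stopping the task, and with context headers enabled the failure details travel with it. The Python sketch below models that behavior on plain lists, with `json.loads` standing in for `JsonConverter`; the `route` helper is hypothetical, and the header keys follow Kafka Connect's `__connect.errors.*` naming but are only a subset of what it writes.

```python
import json


def route(records, topic: str = "events"):
    """Split raw (offset, bytes) records into decoded rows and DLQ entries.

    Models errors.tolerance=all: a record that cannot be deserialized is sent
    to the DLQ with context headers instead of failing the whole task.
    """
    good, dlq = [], []
    for offset, raw in records:
        try:
            good.append(json.loads(raw))  # stands in for JsonConverter deserialization
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError subclass ValueError
            dlq.append({
                "value": raw,
                "headers": {
                    "__connect.errors.topic": topic,
                    "__connect.errors.offset": str(offset),
                    "__connect.errors.exception.message": str(exc),
                },
            })
    return good, dlq


if __name__ == "__main__":
    records = [(0, b'{"event_id": "evt_1"}'), (1, b'{not json'), (2, b'{"event_id": "evt_2"}')]
    good, dlq = route(records)
    print(len(good), len(dlq), dlq[0]["headers"]["__connect.errors.offset"])  # 2 1 1
```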
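Key metrics to monitor:

| Metric | Description | Alert threshold |
|---|---|---|
| Consumer Lag | How far the Connector's consumers are behind | Keeps growing for > 5 minutes |
| Insert QPS | Write throughput | Drops by > 50% |
| Part count | Number of parts in the ClickHouse table | > 1000 |
| Merge throughput | Background merge speed | Persistently below the insert rate |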
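The thresholds in the table can be turned into a simple check. The Python sketch below evaluates one snapshot of metric values and returns the alerts that fire; collecting the values (for example from consumer-group lag or ClickHouse's `system.parts` table) is left out. The `Snapshot` fields, the one-sample-per-minute lag history, and the alert names are assumptions, and a production alert would also require the merge condition to hold over time.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    lag_samples: list          # consumer lag, one sample per minute, oldest first
    insert_qps: float          # current insert rate
    baseline_qps: float        # normal insert rate to compare against
    active_parts: int          # active parts in the target table
    merge_rows_per_s: float    # background merge throughput
    insert_rows_per_s: float   # incoming insert throughput


def alerts(s: Snapshot, lag_window_min: int = 5, max_parts: int = 1000) -> list:
    fired = []
    window = s.lag_samples[-(lag_window_min + 1):]
    # Lag alert: lag grew at every sample across the last lag_window_min minutes
    if len(window) == lag_window_min + 1 and all(b > a for a, b in zip(window, window[1:])):
        fired.append("consumer_lag_growing")
    if s.baseline_qps > 0 and s.insert_qps < 0.5 * s.baseline_qps:
        fired.append("insert_qps_drop")
    if s.active_parts > max_parts:
        fired.append("too_many_parts")
    if s.merge_rows_per_s < s.insert_rows_per_s:
        fired.append("merges_falling_behind")
    return fired


if __name__ == "__main__":
    s = Snapshot(lag_samples=[100, 200, 300, 400, 500, 600], insert_qps=40.0, baseline_qps=100.0,
                 active_parts=1200, merge_rows_per_s=5e5, insert_rows_per_s=2e5)
    print(alerts(s))  # ['consumer_lag_growing', 'insert_qps_drop', 'too_many_parts']
```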
```bash
# Create the Topic (no-op if it already exists)
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --if-not-exists --topic events --partitions 6 --replication-factor 1

# Send test events
for i in $(seq 1 1000); do
  echo "{\"event_id\":\"evt_$i\",\"event_type\":\"page_view\",\"user_id\":$((RANDOM % 10000)),\"payload\":\"{\\\"url\\\":\\\"/api/data\\\"}\",\"timestamp\":\"$(date -u +%Y-%m-%dT%H:%M:%S.000Z)\"}"
done | docker exec -i kafka kafka-console-producer \
  --bootstrap-server localhost:9092 --topic events

# Verify the data (the query goes in the POST body, avoiding unencoded spaces in the URL)
curl "http://localhost:8123/" --data-binary \
  "SELECT count(), event_type FROM events GROUP BY event_type"
```
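The shell loop above hand-escapes the nested JSON in `payload`, which is easy to get wrong. As an alternative sketch, the Python script below builds the same events with `json.dumps` (so `payload` is escaped automatically) and prints one JSON document per line, ready to pipe into `kafka-console-producer`. The script name `gen_events.py` and the `make_event` helper are hypothetical.

```python
import json
import random
from datetime import datetime, timezone


def make_event(i: int, user_id: int, ts: datetime) -> str:
    """Build one event matching the `events` table columns as a single JSON line."""
    return json.dumps({
        "event_id": f"evt_{i}",
        "event_type": "page_view",
        "user_id": user_id,
        # payload is itself JSON, stored in a String column
        "payload": json.dumps({"url": "/api/data"}),
        # Millisecond precision with a UTC 'Z' suffix, like the shell loop
        "timestamp": ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z",
    })


def main(n: int = 1000) -> None:
    now = datetime.now(timezone.utc)
    for i in range(1, n + 1):
        print(make_event(i, random.randrange(10_000), now))


if __name__ == "__main__":
    main()
```

Usage: `python gen_events.py | docker exec -i kafka kafka-console-producer --bootstrap-server localhost:9092 --topic events`.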
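In summary, the 2026 best practices for building a real-time Kafka-to-ClickHouse pipeline come down to:

- A sensible `ORDER BY`, partitioning strategy, and TTL
- `max.poll.records` set to 5000-10000, combined with asynchronous inserts

This approach has been validated in several large-scale production environments and can reliably sustain real-time ingestion and querying of hundreds of thousands of events per second.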