实时数据处理是现代数据架构的核心能力。无论是用户行为分析、IoT设备监控,还是金融交易系统,都需要低延迟、高吞吐的数据管道。这篇文章介绍如何用Python构建一个从Kafka到ClickHouse的完整实时数据管道。
我们的数据管道包含以下组件:产生日志的应用(数据生产者)、Kafka消息队列、Python消费者和ClickHouse数据库。
整个流程:应用产生日志 -> Kafka暂存 -> Python消费者拉取处理 -> 批量写入ClickHouse -> 可供实时查询分析。
Kafka:业界标准的分布式消息队列,高吞吐、可持久化、支持消费者组实现水平扩展。
ClickHouse:列式存储的OLAP数据库,单节点每秒可处理数亿行数据,特别适合时序数据和日志分析。
Python:生态丰富,开发效率高,有成熟的Kafka客户端(aiokafka)和ClickHouse驱动。
使用Docker Compose快速搭建测试环境:
# docker-compose.yml
# Local test stack: ZooKeeper + a single Kafka broker + ClickHouse.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised as localhost, so Kafka clients are expected to run on the Docker host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Only one broker exists, so the internal offsets topic uses replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  clickhouse:
    image: clickhouse/clickhouse-server:24.3
    ports:
      - "8123:8123"  # HTTP interface used by the Python client
      - "9000:9000"
    volumes:
      - clickhouse_data:/var/lib/clickhouse

volumes:
  clickhouse_data:
启动服务:
docker-compose up -d
"""
实时数据管道:Kafka -> Python处理 -> ClickHouse
支持批量写入、错误重试、监控指标
"""
import asyncio
import hashlib
import json
import logging
import random
import signal
import statistics
import time
from collections import deque
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Optional, Callable

import aiohttp
import aiokafka
import asyncpg
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
# Configure logging: INFO level with a timestamp/name/level prefix, then create
# the module-level logger used throughout this file.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class UserEvent:
    """A single user-behaviour event as it travels through the pipeline."""

    user_id: str
    event_type: str
    page_url: str
    timestamp: datetime
    session_id: str
    device_type: str
    ip_address: str
    duration_ms: int
    metadata: Dict

    def to_clickhouse_row(self) -> tuple:
        """Return the event as a tuple in field-declaration order.

        Every field is passed through unchanged except ``metadata``, which is
        serialised to a JSON string and placed last.
        """
        metadata_json = json.dumps(self.metadata)
        passthrough_columns = (
            "user_id",
            "event_type",
            "page_url",
            "timestamp",
            "session_id",
            "device_type",
            "ip_address",
            "duration_ms",
        )
        leading_values = tuple(getattr(self, column) for column in passthrough_columns)
        return leading_values + (metadata_json,)
class MetricsCollector:
    """Collect throughput counters and a sliding window of processing times.

    Counter updates go through coroutine methods that serialise access with an
    ``asyncio.Lock``. ``get_stats`` reads the counters without taking the lock.
    """

    def __init__(self, window_size: int = 1000):
        """Create an empty collector.

        Args:
            window_size: Maximum number of processing-time samples retained;
                older samples are discarded by the bounded deque.
        """
        self.window_size = window_size
        self.processing_times: deque = deque(maxlen=window_size)
        self.messages_consumed = 0
        self.messages_produced = 0
        self.messages_inserted = 0
        self.errors_count = 0
        self.start_time = time.time()
        self._lock = asyncio.Lock()

    async def record_processing_time(self, duration: float):
        """Append one processing duration (in seconds) to the sliding window."""
        async with self._lock:
            self.processing_times.append(duration)

    async def increment_consumed(self, count: int = 1):
        """Add ``count`` to the consumed-message counter."""
        async with self._lock:
            self.messages_consumed += count

    async def increment_produced(self, count: int = 1):
        """Add ``count`` to the produced-message counter."""
        async with self._lock:
            self.messages_produced += count

    async def increment_inserted(self, count: int = 1):
        """Add ``count`` to the inserted-row counter."""
        async with self._lock:
            self.messages_inserted += count

    async def increment_errors(self, count: int = 1):
        """Add ``count`` to the error counter."""
        async with self._lock:
            self.errors_count += count

    @staticmethod
    def _percentile(ordered: list, percent: int) -> float:
        """Return the nearest-rank percentile of a non-empty ascending list.

        The rank is ``ceil(len * percent / 100)``, computed with integer
        arithmetic, so e.g. the 95th percentile of 20 samples is the 19th
        smallest value rather than the maximum.
        """
        rank = (len(ordered) * percent + 99) // 100
        return ordered[max(rank, 1) - 1]

    def get_stats(self) -> Dict:
        """Return a snapshot of counters, rates and latency statistics.

        Rates are per second since the collector was created. The
        ``avg/p95/p99_processing_ms`` keys are present only when at least one
        processing time has been recorded.
        """
        elapsed = time.time() - self.start_time
        stats = {
            'uptime_seconds': int(elapsed),
            'messages_consumed': self.messages_consumed,
            'messages_inserted': self.messages_inserted,
            'errors': self.errors_count,
            'consume_rate': round(self.messages_consumed / elapsed, 2) if elapsed > 0 else 0,
            'insert_rate': round(self.messages_inserted / elapsed, 2) if elapsed > 0 else 0,
        }
        if self.processing_times:
            # Sort once and reuse the ordered window for both percentiles.
            ordered = sorted(self.processing_times)
            stats['avg_processing_ms'] = round(statistics.mean(ordered) * 1000, 2)
            stats['p95_processing_ms'] = round(self._percentile(ordered, 95) * 1000, 2)
            stats['p99_processing_ms'] = round(self._percentile(ordered, 99) * 1000, 2)
        return stats

    def log_stats(self):
        """Log the current statistics as a single JSON line at INFO level."""
        stats = self.get_stats()
        logger.info(f"📊 指标统计: {json.dumps(stats, ensure_ascii=False)}")
class ClickHouseClient:
    """Minimal asynchronous ClickHouse client that talks to the HTTP interface."""

    # TabSeparated treats tab and newline as delimiters and backslash as the
    # escape character, so these must be escaped inside a field value.
    _TSV_ESCAPES = str.maketrans({
        "\\": "\\\\",
        "\t": "\\t",
        "\n": "\\n",
        "\r": "\\r",
    })

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8123,
        database: str = "default",
        username: str = "default",
        password: str = ""
    ):
        """Store connection settings; no network I/O happens until ``connect``."""
        self.host = host
        self.port = port
        self.database = database
        self.username = username
        self.password = password
        self.base_url = f"http://{host}:{port}"
        self.session: Optional[aiohttp.ClientSession] = None

    async def connect(self):
        """Open the HTTP session and make sure the target table exists.

        If table creation fails, the freshly opened session is closed before
        the error is re-raised so that it does not leak.
        """
        self.session = aiohttp.ClientSession()
        try:
            await self._ensure_table_exists()
        except BaseException:
            await self.close()
            raise

    async def close(self):
        """Close the HTTP session if one is open. Safe to call repeatedly."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _execute(self, query: str, data: Optional[str] = None) -> str:
        """Run ``query`` via HTTP POST and return the response body.

        The query, database and credentials are sent as URL query parameters;
        ``data`` (if any) is sent as the request body.

        Raises:
            RuntimeError: If the client is not connected, or ClickHouse
                answers with a non-200 status.
        """
        if self.session is None:
            raise RuntimeError("ClickHouse client is not connected; call connect() first")
        url = f"{self.base_url}/"
        params = {
            'query': query,
            'database': self.database,
            'user': self.username,
            'password': self.password
        }
        async with self.session.post(url, params=params, data=data) as response:
            body = await response.text()
            if response.status != 200:
                raise RuntimeError(f"ClickHouse error: {body}")
            return body

    async def _ensure_table_exists(self):
        """Create the ``user_events`` table if it does not exist yet."""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS user_events (
            user_id String,
            event_type String,
            page_url String,
            timestamp DateTime64(3),
            session_id String,
            device_type String,
            ip_address String,
            duration_ms Int32,
            metadata String
        ) ENGINE = MergeTree()
        PARTITION BY toYYYYMMDD(timestamp)
        ORDER BY (timestamp, event_type)
        TTL timestamp + INTERVAL 90 DAY
        SETTINGS index_granularity = 8192
        """
        await self._execute(create_table_query)
        logger.info("ClickHouse表检查/创建完成")

    @classmethod
    def _tsv_field(cls, value) -> str:
        """Render one value as an escaped TabSeparated field."""
        return str(value).translate(cls._TSV_ESCAPES)

    async def insert_batch(self, events: List["UserEvent"]) -> int:
        """Insert ``events`` with a single ``INSERT ... FORMAT TabSeparated``.

        Each event's ``to_clickhouse_row()`` values are stringified, escaped and
        joined with tabs; rows are joined with newlines.

        Returns:
            The number of events sent (0 for an empty batch, in which case no
            request is made).
        """
        if not events:
            return 0
        lines = [
            '\t'.join(self._tsv_field(field) for field in event.to_clickhouse_row())
            for event in events
        ]
        query = "INSERT INTO user_events FORMAT TabSeparated"
        await self._execute(query, '\n'.join(lines))
        return len(events)
class DataTransformer:
    """Apply a chain of filters, then a chain of enrichers, to each event."""

    def __init__(self):
        self.filters: List[Callable[[UserEvent], bool]] = []
        self.enrichers: List[Callable[[UserEvent], UserEvent]] = []

    def add_filter(self, filter_fn: Callable[[UserEvent], bool]):
        """Register a predicate; returns ``self`` so calls can be chained."""
        self.filters.append(filter_fn)
        return self

    def add_enricher(self, enrich_fn: Callable[[UserEvent], UserEvent]):
        """Register an enricher; returns ``self`` so calls can be chained."""
        self.enrichers.append(enrich_fn)
        return self

    def transform(self, event: UserEvent) -> Optional[UserEvent]:
        """Run ``event`` through the filters and enrichers.

        Filters run in registration order and evaluation stops at the first
        one that returns a falsy value, in which case ``None`` is returned.
        Otherwise each enricher is applied in order to the previous result.
        """
        accepted = all(keep(event) for keep in self.filters)
        if not accepted:
            return None

        enriched = event
        for enrich in self.enrichers:
            enriched = enrich(enriched)
        return enriched
class KafkaToClickHousePipeline:
    """Consume user events from Kafka, transform them and batch-insert into ClickHouse.

    Events are buffered and flushed either when the buffer reaches
    ``batch_size`` or every ``flush_interval`` seconds. Kafka offsets are
    committed manually, and only for messages that were already handled when
    the flushed batch was cut, so events still waiting in the buffer are never
    covered by a commit.
    """

    def __init__(
        self,
        kafka_bootstrap: str = "localhost:9092",
        topic: str = "user-events",
        consumer_group: str = "clickhouse-sink",
        clickhouse_host: str = "localhost",
        batch_size: int = 1000,
        flush_interval: float = 5.0,
        max_retries: int = 3
    ):
        """Store settings and build the collaborators; no I/O happens here.

        Args:
            kafka_bootstrap: Kafka bootstrap server address.
            topic: Topic to consume.
            consumer_group: Kafka consumer group id.
            clickhouse_host: Host of the ClickHouse HTTP interface.
            batch_size: Buffer size that triggers a flush; also passed to the
                consumer as ``max_poll_records``.
            flush_interval: Seconds between timer-driven flushes.
            max_retries: Insert attempts per batch before it is dropped.
        """
        self.kafka_bootstrap = kafka_bootstrap
        self.topic = topic
        self.consumer_group = consumer_group
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.consumer: Optional[AIOKafkaConsumer] = None
        self.clickhouse = ClickHouseClient(host=clickhouse_host)
        self.transformer = DataTransformer()
        self.metrics = MetricsCollector()
        self.buffer: List[UserEvent] = []
        self._running = False
        self._stopped = False
        self._flush_task: Optional[asyncio.Task] = None
        self._metrics_task: Optional[asyncio.Task] = None
        # Next offset to commit per partition. It covers every message already
        # handled by the consume loop (buffered, filtered out, or failed).
        self._pending_offsets: Dict = {}
        # Serialises flushes: the consume loop and the timer can both trigger one.
        self._flush_lock = asyncio.Lock()
        # Serialises stop(): it is called from the signal handler and from start().
        self._stop_lock = asyncio.Lock()
        # Install the default transform rules.
        self._setup_default_transforms()

    def _setup_default_transforms(self):
        """Register the default filters and the device-type enricher."""
        # Drop invalid events (missing ids or an over-long user id).
        self.transformer.add_filter(
            lambda e: e.user_id and e.event_type and len(e.user_id) < 100
        )
        # Drop test users.
        self.transformer.add_filter(
            lambda e: not e.user_id.startswith("test_")
        )

        device_mapping = {
            'mobile': 'Mobile',
            'desktop': 'Desktop',
            'tablet': 'Tablet'
        }

        def enrich_device(event: UserEvent) -> UserEvent:
            # Normalise the device type; anything unrecognised becomes 'Unknown'.
            event.device_type = device_mapping.get(
                event.device_type.lower(),
                'Unknown'
            )
            return event

        self.transformer.add_enricher(enrich_device)

    async def start(self):
        """Connect, start the background tasks and consume until stopped.

        Returns after the consume loop ends; ``stop`` is always invoked on the
        way out. If the Kafka consumer fails to start, the ClickHouse session
        opened just before is closed again.
        """
        logger.info("正在启动数据管道...")
        self._stopped = False
        # Connect to ClickHouse.
        await self.clickhouse.connect()
        # Create the Kafka consumer.
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.kafka_bootstrap,
            group_id=self.consumer_group,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # offsets are committed manually after inserts
            max_poll_records=self.batch_size,
        )
        try:
            await self.consumer.start()
        except BaseException:
            await self.clickhouse.close()
            raise
        self._running = True
        # Keep references to the background tasks so they can be cancelled in stop().
        self._flush_task = asyncio.create_task(self._periodic_flush())
        self._metrics_task = asyncio.create_task(self._metrics_printer())
        logger.info("数据管道启动完成,开始消费...")
        # Main consume loop.
        try:
            await self._consume_loop()
        finally:
            await self.stop()

    @staticmethod
    def _parse_event(event_data: Dict) -> UserEvent:
        """Build a ``UserEvent`` from a decoded message.

        Raises ``KeyError`` for a missing required field and ``ValueError`` for
        a timestamp that is not ISO-8601.
        """
        return UserEvent(
            user_id=event_data['user_id'],
            event_type=event_data['event_type'],
            page_url=event_data['page_url'],
            timestamp=datetime.fromisoformat(event_data['timestamp']),
            session_id=event_data['session_id'],
            device_type=event_data['device_type'],
            ip_address=event_data['ip_address'],
            duration_ms=event_data['duration_ms'],
            metadata=event_data.get('metadata', {})
        )

    async def _consume_loop(self):
        """Consume messages, transform them and buffer the survivors.

        A message that fails to parse or transform is logged, counted as an
        error and skipped. The loop ends when the consumer is stopped or when
        ``_running`` is found to be false on the next message.
        """
        async for msg in self.consumer:
            if not self._running:
                # This message is neither buffered nor offset-tracked, so it is
                # not covered by a commit.
                break
            start_time = time.time()
            failure: Optional[Exception] = None
            transformed: Optional[UserEvent] = None
            try:
                transformed = self.transformer.transform(self._parse_event(msg.value))
            except Exception as e:
                failure = e

            # Buffering and offset tracking happen with no await in between, so
            # a concurrent flush always sees a consistent (buffer, offsets) pair.
            if transformed is not None:
                self.buffer.append(transformed)
            partition = aiokafka.TopicPartition(msg.topic, msg.partition)
            self._pending_offsets[partition] = msg.offset + 1

            if failure is not None:
                logger.error(f"处理消息失败: {failure}")
                await self.metrics.increment_errors()
                continue

            if transformed is not None:
                await self.metrics.increment_consumed()
            # Flush when the buffer is full.
            if len(self.buffer) >= self.batch_size:
                await self._flush_buffer()
            # Record the processing time (includes any flush triggered above).
            await self.metrics.record_processing_time(time.time() - start_time)

    async def _periodic_flush(self):
        """Flush the buffer every ``flush_interval`` seconds while running."""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            if self.buffer:
                await self._flush_buffer()

    async def _flush_buffer(self):
        """Write the buffered events to ClickHouse, then commit their offsets.

        Flushes are serialised by a lock. The batch and the offset snapshot are
        taken together, so the commit covers exactly the messages handled up to
        the moment the batch was cut. If every insert attempt fails, the batch
        is dropped and nothing is committed by this call.
        """
        async with self._flush_lock:
            if not self.buffer:
                return
            batch, self.buffer = self.buffer, []
            offsets = dict(self._pending_offsets)

            if not await self._insert_with_retry(batch):
                logger.error(f"丢弃 {len(batch)} 条记录,写入失败")
                return
            await self._commit_offsets(offsets)

    async def _insert_with_retry(self, batch: List[UserEvent]) -> bool:
        """Insert ``batch`` with exponential backoff; return True on success."""
        for attempt in range(self.max_retries):
            try:
                inserted = await self.clickhouse.insert_batch(batch)
            except Exception as e:
                logger.error(f"写入ClickHouse失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                await self.metrics.increment_errors()
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
            else:
                await self.metrics.increment_inserted(inserted)
                logger.debug(f"成功插入 {inserted} 条记录")
                return True
        return False

    async def _commit_offsets(self, offsets: Dict):
        """Commit ``offsets`` for the partitions that are still assigned.

        A commit failure is logged and counted but never triggers a re-insert,
        because the rows are already in ClickHouse.
        """
        if self.consumer is None:
            return
        assigned = self.consumer.assignment()
        committable = {}
        for partition, offset in offsets.items():
            if partition in assigned:
                committable[partition] = offset
            else:
                # Partition is no longer assigned to this consumer; forget it so
                # a stale offset is not committed later.
                self._pending_offsets.pop(partition, None)
        if not committable:
            return
        try:
            await self.consumer.commit(committable)
        except Exception as e:
            logger.error(f"提交Kafka offset失败: {e}")
            await self.metrics.increment_errors()

    async def _metrics_printer(self):
        """Log the metrics every 30 seconds while running."""
        while self._running:
            await asyncio.sleep(30)
            self.metrics.log_stats()

    async def stop(self):
        """Stop the pipeline: final flush, cancel timers, close connections.

        Idempotent. A second caller waits for the first ``stop`` to finish and
        then returns without repeating the shutdown.
        """
        async with self._stop_lock:
            if self._stopped:
                return
            logger.info("正在停止数据管道...")
            self._running = False
            try:
                # Final flush. This waits for any flush already in progress.
                if self.buffer:
                    await self._flush_buffer()
            finally:
                # Cancel the background tasks and wait for them to finish.
                background = [t for t in (self._flush_task, self._metrics_task) if t]
                for task in background:
                    task.cancel()
                if background:
                    await asyncio.gather(*background, return_exceptions=True)
                self._flush_task = None
                self._metrics_task = None
                # Close connections.
                if self.consumer:
                    await self.consumer.stop()
                await self.clickhouse.close()
                self._stopped = True
            # Print the final statistics.
            self.metrics.log_stats()
            logger.info("数据管道已停止")
class DataGenerator:
    """Produce random user events to a Kafka topic for testing the pipeline."""

    EVENT_TYPES = ('page_view', 'click', 'scroll', 'purchase', 'login', 'logout')
    DEVICE_TYPES = ('mobile', 'desktop', 'tablet')
    PAGES = ('/home', '/products', '/cart', '/checkout', '/profile', '/search')
    REFERRERS = ('google', 'direct', 'social', 'email')
    AB_GROUPS = ('A', 'B')

    def __init__(
        self,
        kafka_bootstrap: str = "localhost:9092",
        topic: str = "user-events"
    ):
        """Store the target broker and topic; the producer is created in ``start``."""
        self.kafka_bootstrap = kafka_bootstrap
        self.topic = topic
        self.producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        """Create and start the Kafka producer (JSON values, gzip compression)."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip',
        )
        await self.producer.start()

    async def stop(self):
        """Stop the producer if it was started."""
        if self.producer:
            await self.producer.stop()

    def generate_event(self) -> Dict:
        """Return one random event as a JSON-serialisable dict.

        The session id is the first 16 hex characters of the MD5 of
        ``"<user_id>_<YYYYMMDD>"``, so a given user keeps the same session id
        for the whole day.
        """
        # Read the clock once so the session date and the timestamp agree.
        now = datetime.now()
        user_id = f"user_{random.randint(1, 100000)}"
        session_seed = f"{user_id}_{now.strftime('%Y%m%d')}"
        session_id = hashlib.md5(session_seed.encode()).hexdigest()[:16]
        return {
            'user_id': user_id,
            'event_type': random.choice(self.EVENT_TYPES),
            'page_url': random.choice(self.PAGES),
            'timestamp': now.isoformat(),
            'session_id': session_id,
            'device_type': random.choice(self.DEVICE_TYPES),
            'ip_address': f"192.168.{random.randint(0, 255)}.{random.randint(0, 255)}",
            'duration_ms': random.randint(10, 10000),
            'metadata': {
                'referrer': random.choice(self.REFERRERS),
                'ab_test_group': random.choice(self.AB_GROUPS)
            }
        }

    async def generate_continuously(self, rate_per_second: int = 100):
        """Send random events forever, sleeping ``1 / rate_per_second`` between sends.

        Runs until the task is cancelled.

        Raises:
            ValueError: If ``rate_per_second`` is not positive.
            RuntimeError: If ``start`` has not been called.
        """
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        if self.producer is None:
            raise RuntimeError("DataGenerator is not started; call start() first")
        logger.info(f"开始生成数据,速率: {rate_per_second}/秒")
        interval = 1.0 / rate_per_second
        count = 0
        while True:
            await self.producer.send(self.topic, self.generate_event())
            count += 1
            if count % 1000 == 0:
                logger.info(f"已生成 {count} 条事件")
            await asyncio.sleep(interval)
# Main program
async def main():
    """Run the demo: a data generator feeding Kafka plus the Kafka->ClickHouse pipeline.

    SIGINT/SIGTERM request a pipeline shutdown. Once the pipeline has stopped,
    the generator task is cancelled and its producer is closed, so the process
    exits instead of waiting forever on the endless generator loop.
    """
    # Create the data generator.
    generator = DataGenerator()
    await generator.start()
    # Create the data pipeline.
    pipeline = KafkaToClickHousePipeline(
        batch_size=500,
        flush_interval=3.0
    )

    loop = asyncio.get_running_loop()
    shutdown_tasks = set()

    def request_shutdown():
        logger.info("收到停止信号...")
        # Keep a reference so the stop task is not garbage-collected mid-run.
        task = loop.create_task(pipeline.stop())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    # Install signal handlers.
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown)
        except NotImplementedError:
            # Event loops without add_signal_handler support (e.g. on Windows):
            # fall back to signal.signal and hop back onto the loop thread-safely.
            signal.signal(sig, lambda *_: loop.call_soon_threadsafe(request_shutdown))

    # Run the generator in the background; the pipeline drives the lifetime.
    producer_task = asyncio.create_task(
        generator.generate_continuously(rate_per_second=500)
    )
    try:
        await pipeline.start()
    finally:
        producer_task.cancel()
        (outcome,) = await asyncio.gather(producer_task, return_exceptions=True)
        if isinstance(outcome, Exception):
            logger.error(f"数据生成器异常退出: {outcome}")
        await generator.stop()
        if shutdown_tasks:
            # Let an in-flight stop() finish before the loop is torn down.
            await asyncio.gather(*shutdown_tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
1. 启动基础设施
docker-compose up -d
2. 创建Kafka Topic
docker exec -it <kafka-container> kafka-topics --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
3. 运行数据管道
pip install aiokafka aiohttp asyncpg
python data_pipeline.py
1. 批量大小调优
2. 并行消费
3. 异步写入
使用asyncio.gather并行写入多个批次
4. 数据分区
5. 监控告警
这套架构可以支撑每秒数万条事件的实时处理,适用于大多数互联网应用场景。根据实际数据量,可以水平扩展Kafka分区和消费者实例。