实时数据处理是现代数据架构的核心能力。无论是用户行为分析、IoT设备监控,还是金融交易系统,都需要低延迟、高吞吐的数据管道。这篇文章介绍如何用Python构建一个从Kafka到ClickHouse的完整实时数据管道。

架构概述

我们的数据管道包含以下组件:Kafka(消息队列,暂存应用产生的原始事件)、Python消费者(基于aiokafka异步拉取、清洗并批量写入)以及ClickHouse(列式OLAP数据库,负责存储与实时查询)。

整个流程:应用产生日志 -> Kafka暂存 -> Python消费者拉取处理 -> 批量写入ClickHouse -> 可供实时查询分析。

为什么选择这个技术栈?

Kafka:业界标准的分布式消息队列,高吞吐、可持久化、支持消费者组实现水平扩展。

ClickHouse:列式存储的OLAP数据库,单节点每秒可处理数亿行数据,特别适合时序数据和日志分析。

Python:生态丰富,开发效率高,有成熟的异步Kafka客户端(aiokafka);ClickHouse则可以直接通过aiohttp调用其HTTP接口(默认端口8123)写入,无需额外的驱动。

环境准备

使用Docker Compose快速搭建测试环境:

# docker-compose.yml
# Single-node local test environment: ZooKeeper + one Kafka broker + ClickHouse.
# NOTE: the top-level `version` key is obsolete in the Compose specification;
# Docker Compose v2 ignores it (with a warning). Kept for old docker-compose v1.
version: '3.8'

services:
  # ZooKeeper provides cluster coordination for the (ZooKeeper-mode) Kafka broker below.
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000  # base time unit in milliseconds

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    # NOTE: no volume is mounted, so Kafka data does not survive removing the container.
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Clients are told to reconnect to localhost:9092. That works for programs on the
      # Docker host (via the published port) and inside this container, but other
      # containers on the compose network would be pointed at their own localhost.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Only one broker exists, so the internal offsets topic can have only one replica.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  clickhouse:
    image: clickhouse/clickhouse-server:24.3
    ports:
      - "8123:8123"  # HTTP interface (used by the Python pipeline)
      - "9000:9000"  # native TCP protocol (clickhouse-client)
    volumes:
      - clickhouse_data:/var/lib/clickhouse  # persist table data across restarts

volumes:
  clickhouse_data:

启动服务:

docker-compose up -d

完整代码实现

"""
实时数据管道:Kafka -> Python处理 -> ClickHouse
支持批量写入、错误重试、监控指标
"""

import asyncio
import hashlib
import json
import logging
import random
import signal
import statistics
import time
from collections import deque
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Optional, Callable

import aiokafka
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import aiohttp
import asyncpg  # NOTE(review): appears unused in this module

# Logging setup: INFO level, one "time - logger - level - message" line per record.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)  # module-level logger used throughout this file


@dataclass
class UserEvent:
    """A single user-behaviour event.

    Field order matches the column order of the ``user_events`` ClickHouse table.
    """
    user_id: str
    event_type: str
    page_url: str
    timestamp: datetime
    session_id: str
    device_type: str
    ip_address: str
    duration_ms: int
    metadata: Dict  # JSON-serializable mapping, stored as a JSON string column

    def to_clickhouse_row(self) -> tuple:
        """Return this event as a row tuple for insertion into ClickHouse.

        All fields are passed through unchanged except ``metadata``, which is
        serialized with ``json.dumps`` to fit the ``String`` column.
        """
        scalar_fields = (
            self.user_id, self.event_type, self.page_url, self.timestamp,
            self.session_id, self.device_type, self.ip_address, self.duration_ms,
        )
        return scalar_fields + (json.dumps(self.metadata),)


class MetricsCollector:
    """Collects pipeline counters and a sliding window of processing times."""

    def __init__(self, window_size: int = 1000):
        """Create a collector keeping the last ``window_size`` processing times (seconds)."""
        self.window_size = window_size
        self.processing_times: deque = deque(maxlen=window_size)
        self.messages_consumed = 0
        self.messages_produced = 0
        self.messages_inserted = 0
        self.errors_count = 0
        self.start_time = time.time()
        self._lock = asyncio.Lock()

    async def record_processing_time(self, duration: float):
        """Append one processing duration (seconds) to the sliding window."""
        async with self._lock:
            self.processing_times.append(duration)

    async def increment_consumed(self, count: int = 1):
        async with self._lock:
            self.messages_consumed += count

    async def increment_inserted(self, count: int = 1):
        async with self._lock:
            self.messages_inserted += count

    async def increment_errors(self, count: int = 1):
        async with self._lock:
            self.errors_count += count

    @staticmethod
    def _percentile(sorted_values: list, fraction: float) -> float:
        """Return the value at index ``int(len * fraction)`` of an already-sorted list.

        The index is clamped to the last element so any fraction in [0, 1] is safe.
        """
        index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
        return sorted_values[index]

    def get_stats(self) -> Dict:
        """Return a snapshot of counters, average rates since start, and latency stats.

        Latency keys (``avg/p95/p99_processing_ms``) are present only when at
        least one processing time has been recorded.
        """
        elapsed = time.time() - self.start_time
        stats = {
            'uptime_seconds': int(elapsed),
            'messages_consumed': self.messages_consumed,
            'messages_inserted': self.messages_inserted,
            'errors': self.errors_count,
            'consume_rate': round(self.messages_consumed / elapsed, 2) if elapsed > 0 else 0,
            'insert_rate': round(self.messages_inserted / elapsed, 2) if elapsed > 0 else 0,
        }

        if self.processing_times:
            # Sort the window once and reuse it for both percentiles.
            times = sorted(self.processing_times)
            stats['avg_processing_ms'] = round(statistics.mean(times) * 1000, 2)
            stats['p95_processing_ms'] = round(self._percentile(times, 0.95) * 1000, 2)
            stats['p99_processing_ms'] = round(self._percentile(times, 0.99) * 1000, 2)

        return stats

    def log_stats(self):
        """Log the current stats snapshot as a single JSON line."""
        stats = self.get_stats()
        logger.info(f"📊 指标统计: {json.dumps(stats, ensure_ascii=False)}")


class ClickHouseClient:
    """Minimal async client for the ClickHouse HTTP interface, built on aiohttp."""

    # Escapes required by the TabSeparated input format. A raw tab, newline or
    # carriage return inside a value would otherwise be read as a column/row
    # separator, and a raw backslash would start an escape sequence (ClickHouse
    # turns e.g. the JSON escape `\"` into `"`, corrupting the metadata column).
    _TSV_ESCAPES = str.maketrans({
        "\\": "\\\\",
        "\t": "\\t",
        "\n": "\\n",
        "\r": "\\r",
    })

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8123,
        database: str = "default",
        username: str = "default",
        password: str = ""
    ):
        self.host = host
        self.port = port
        self.database = database
        self.username = username
        self.password = password
        self.base_url = f"http://{host}:{port}"
        self.session: Optional[aiohttp.ClientSession] = None

    async def connect(self):
        """Open the HTTP session and create the ``user_events`` table if missing."""
        self.session = aiohttp.ClientSession()
        await self._ensure_table_exists()

    async def close(self):
        """Close the HTTP session. Safe to call more than once."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _execute(self, query: str, data: Optional[str] = None) -> str:
        """POST ``query`` (with optional request body ``data``) and return the response text.

        Raises:
            RuntimeError: if :meth:`connect` has not been called.
            Exception: if ClickHouse answers with a non-200 status.
        """
        if self.session is None:
            raise RuntimeError("ClickHouseClient is not connected; call connect() first")

        params = {
            'query': query,
            'database': self.database,
        }
        # Credentials go in headers rather than URL parameters so the password
        # does not appear in request URLs (and thus in proxy/access logs).
        headers = {
            'X-ClickHouse-User': self.username,
            'X-ClickHouse-Key': self.password,
        }

        async with self.session.post(
            f"{self.base_url}/", params=params, data=data, headers=headers
        ) as response:
            text = await response.text()
            if response.status != 200:
                raise Exception(f"ClickHouse error: {text}")
            return text

    async def _ensure_table_exists(self):
        """Create the ``user_events`` MergeTree table if it does not exist."""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS user_events (
            user_id String,
            event_type String,
            page_url String,
            timestamp DateTime64(3),
            session_id String,
            device_type String,
            ip_address String,
            duration_ms Int32,
            metadata String
        ) ENGINE = MergeTree()
        PARTITION BY toYYYYMMDD(timestamp)
        ORDER BY (timestamp, event_type)
        TTL timestamp + INTERVAL 90 DAY
        SETTINGS index_granularity = 8192
        """
        await self._execute(create_table_query)
        logger.info("ClickHouse表检查/创建完成")

    @classmethod
    def _escape_tsv_field(cls, value) -> str:
        """Render one value as an escaped TabSeparated field (via ``str()``)."""
        return str(value).translate(cls._TSV_ESCAPES)

    async def insert_batch(self, events: List["UserEvent"]) -> int:
        """Insert ``events`` in one TabSeparated request; return the number of rows sent.

        Each event must provide ``to_clickhouse_row()``. An empty list is a no-op
        and returns 0 without contacting the server.
        """
        if not events:
            return 0

        # One line per event; fields separated by tabs and escaped so embedded
        # tabs/newlines/backslashes cannot break the row structure.
        data = '\n'.join(
            '\t'.join(self._escape_tsv_field(field) for field in event.to_clickhouse_row())
            for event in events
        )

        await self._execute("INSERT INTO user_events FORMAT TabSeparated", data)
        return len(events)


class DataTransformer:
    """Chain of event filters followed by event enrichers.

    An event is kept only if every registered filter returns a truthy value.
    Kept events are then passed through the enrichers in registration order,
    each enricher receiving the previous one's result.
    """

    def __init__(self):
        self.filters: List[Callable[[UserEvent], bool]] = []
        self.enrichers: List[Callable[[UserEvent], UserEvent]] = []

    def add_filter(self, filter_fn: Callable[[UserEvent], bool]):
        """Register a keep-predicate; returns ``self`` so registrations can be chained."""
        self.filters += [filter_fn]
        return self

    def add_enricher(self, enrich_fn: Callable[[UserEvent], UserEvent]):
        """Register an event -> event mapping; returns ``self`` for chaining."""
        self.enrichers += [enrich_fn]
        return self

    def transform(self, event: UserEvent) -> Optional[UserEvent]:
        """Return the enriched event, or ``None`` when any filter rejects it."""
        # all() stops at the first rejecting filter, so later filters are not called.
        if not all(keep(event) for keep in self.filters):
            return None

        result = event
        for enrich in self.enrichers:
            result = enrich(result)
        return result


class KafkaToClickHousePipeline:
    """Kafka -> ClickHouse data pipeline.

    Consumes JSON user events from ``topic``, runs them through a
    ``DataTransformer`` (filters + enrichers) and inserts them into ClickHouse
    in batches of ``batch_size`` events or every ``flush_interval`` seconds.

    Offset handling: auto-commit is disabled. After a batch is inserted, the
    exact per-partition offsets of the messages consumed for that batch are
    committed (at-least-once delivery). Messages that fail to parse or are
    filtered out are skipped, and their offsets are committed with the next
    flush. A batch that still fails after ``max_retries`` attempts is logged and
    dropped without committing its offsets; a later successful commit on the
    same partition will move past them.
    """

    def __init__(
        self,
        kafka_bootstrap: str = "localhost:9092",
        topic: str = "user-events",
        consumer_group: str = "clickhouse-sink",
        clickhouse_host: str = "localhost",
        batch_size: int = 1000,
        flush_interval: float = 5.0,
        max_retries: int = 3
    ):
        self.kafka_bootstrap = kafka_bootstrap
        self.topic = topic
        self.consumer_group = consumer_group
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries

        self.consumer: Optional[AIOKafkaConsumer] = None
        self.clickhouse = ClickHouseClient(host=clickhouse_host)
        self.transformer = DataTransformer()
        self.metrics = MetricsCollector()

        self.buffer: List[UserEvent] = []
        # Next offset to commit (last consumed offset + 1) per partition, for every
        # message consumed since the last flush (including skipped ones).
        self._pending_offsets: Dict[aiokafka.TopicPartition, int] = {}
        # Serializes flushes so batches are written and committed in consumption
        # order; an older commit can never overwrite a newer one.
        self._flush_lock = asyncio.Lock()
        self._running = False
        self._stopped = False
        self._flush_task: Optional[asyncio.Task] = None
        self._metrics_task: Optional[asyncio.Task] = None

        # Install the default transformation rules.
        self._setup_default_transforms()

    def _setup_default_transforms(self):
        """Register the default filters (invalid / test users) and device enricher."""
        # Drop events without user_id/event_type or with an overly long user_id.
        self.transformer.add_filter(
            lambda e: e.user_id and e.event_type and len(e.user_id) < 100
        )

        # Drop events produced by test users.
        self.transformer.add_filter(
            lambda e: not e.user_id.startswith("test_")
        )

        # Normalize device_type to a fixed set of labels.
        def enrich_device(event: UserEvent) -> UserEvent:
            device_mapping = {
                'mobile': 'Mobile',
                'desktop': 'Desktop',
                'tablet': 'Tablet'
            }
            event.device_type = device_mapping.get(
                event.device_type.lower(),
                'Unknown'
            )
            return event

        self.transformer.add_enricher(enrich_device)

    async def start(self):
        """Connect to ClickHouse and Kafka, then consume until stopped.

        :meth:`stop` is always called once the consume loop exits.
        """
        logger.info("正在启动数据管道...")

        try:
            await self.clickhouse.connect()
        except Exception:
            # Don't leak the HTTP session if the table check fails.
            await self.clickhouse.close()
            raise

        # Values are decoded inside the consume loop (no value_deserializer) so a
        # malformed message is logged and skipped instead of aborting iteration.
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.kafka_bootstrap,
            group_id=self.consumer_group,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # offsets are committed explicitly after each insert
            max_poll_records=self.batch_size,
        )
        try:
            await self.consumer.start()
        except Exception:
            await self.clickhouse.close()
            raise

        self._running = True

        # Keep references to the background tasks so they are not garbage-collected
        # and can be cancelled in stop().
        self._flush_task = asyncio.create_task(self._periodic_flush())
        self._metrics_task = asyncio.create_task(self._metrics_printer())

        logger.info("数据管道启动完成,开始消费...")

        try:
            await self._consume_loop()
        finally:
            await self.stop()

    @staticmethod
    def _parse_event(raw_value: bytes) -> UserEvent:
        """Decode a raw Kafka message value (UTF-8 JSON object) into a UserEvent.

        Raises on invalid UTF-8/JSON or missing required keys; the caller logs
        and counts the failure.
        """
        event_data = json.loads(raw_value.decode('utf-8'))
        return UserEvent(
            user_id=event_data['user_id'],
            event_type=event_data['event_type'],
            page_url=event_data['page_url'],
            timestamp=datetime.fromisoformat(event_data['timestamp']),
            session_id=event_data['session_id'],
            device_type=event_data['device_type'],
            ip_address=event_data['ip_address'],
            duration_ms=event_data['duration_ms'],
            metadata=event_data.get('metadata', {})
        )

    async def _consume_loop(self):
        """Consume messages until stopped, buffering transformed events."""
        async for msg in self.consumer:
            if not self._running:
                break

            start_time = time.time()

            # Record the offset before processing so malformed or filtered messages
            # are committed with the next flush instead of being redelivered forever.
            tp = aiokafka.TopicPartition(msg.topic, msg.partition)
            self._pending_offsets[tp] = msg.offset + 1

            try:
                event = self._parse_event(msg.value)
                transformed = self.transformer.transform(event)

                if transformed is not None:
                    self.buffer.append(transformed)

                await self.metrics.increment_consumed()

                # Flush as soon as a full batch has accumulated.
                if len(self.buffer) >= self.batch_size:
                    await self._flush_buffer()

                processing_time = time.time() - start_time
                await self.metrics.record_processing_time(processing_time)

            except Exception as e:
                logger.error(f"处理消息失败: {e}")
                await self.metrics.increment_errors()

    async def _periodic_flush(self):
        """Flush every ``flush_interval`` seconds so low traffic still gets written and committed."""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            try:
                await self._flush_buffer()
            except Exception as e:
                # Keep the timer alive; the next tick flushes whatever is buffered then.
                logger.error(f"定时刷新失败: {e}")

    async def _flush_buffer(self):
        """Write buffered events to ClickHouse, then commit their Kafka offsets."""
        async with self._flush_lock:
            if not self.buffer and not self._pending_offsets:
                return

            # Swap out buffer and offsets together (no await in between) so that
            # messages consumed while this batch is written go into the next batch.
            batch, self.buffer = self.buffer, []
            offsets, self._pending_offsets = self._pending_offsets, {}

            if batch and not await self._insert_with_retry(batch):
                # Batch dropped: its offsets are deliberately not committed here.
                return

            await self._commit_offsets(offsets)

    async def _insert_with_retry(self, batch: List[UserEvent]) -> bool:
        """Insert ``batch`` with exponential backoff; return True on success."""
        for attempt in range(self.max_retries):
            try:
                inserted = await self.clickhouse.insert_batch(batch)
            except Exception as e:
                logger.error(f"写入ClickHouse失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                await self.metrics.increment_errors()
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                continue

            await self.metrics.increment_inserted(inserted)
            logger.debug(f"成功插入 {inserted} 条记录")
            return True

        logger.error(f"丢弃 {len(batch)} 条记录,写入失败")
        return False

    async def _commit_offsets(self, offsets: Dict):
        """Commit explicit per-partition offsets.

        A failed commit is logged and counted, never retried as an insert; the
        affected messages may then be redelivered (at-least-once).
        """
        if not offsets:
            return
        try:
            await self.consumer.commit(offsets)
        except Exception as e:
            logger.error(f"提交Kafka offset失败: {e}")
            await self.metrics.increment_errors()

    async def _metrics_printer(self):
        """Log a metrics snapshot every 30 seconds while running."""
        while self._running:
            await asyncio.sleep(30)
            self.metrics.log_stats()

    async def stop(self):
        """Stop the pipeline: cancel background tasks, flush, commit and close connections.

        Safe to call more than once (e.g. from a signal handler and from the
        ``finally`` in :meth:`start`); only the first call does the work.
        """
        if self._stopped:
            return
        self._stopped = True

        logger.info("正在停止数据管道...")
        self._running = False

        background = [t for t in (self._flush_task, self._metrics_task) if t]
        # Cancel while holding the flush lock so the periodic task can never be
        # interrupted in the middle of writing a batch it has already taken.
        async with self._flush_lock:
            for task in background:
                task.cancel()
        for task in background:
            try:
                await task
            except asyncio.CancelledError:
                pass

        if self.consumer:
            # The final flush commits offsets, so it must run before consumer.stop().
            await self._flush_buffer()
            await self.consumer.stop()
        await self.clickhouse.close()

        # Print the final statistics.
        self.metrics.log_stats()
        logger.info("数据管道已停止")


class DataGenerator:
    """Produces synthetic user events to Kafka for exercising the pipeline."""

    def __init__(
        self,
        kafka_bootstrap: str = "localhost:9092",
        topic: str = "user-events"
    ):
        self.kafka_bootstrap = kafka_bootstrap
        self.topic = topic
        self.producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        """Create and start the Kafka producer (JSON values, gzip-compressed)."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip',
        )
        await self.producer.start()

    async def stop(self):
        """Stop the producer if it was started. Safe to call more than once."""
        if self.producer:
            await self.producer.stop()
            self.producer = None

    def generate_event(self) -> Dict:
        """Return one random event dict with the keys the pipeline expects.

        ``session_id`` is the first 16 hex chars of an MD5 over user id + current
        date, so a user keeps the same session id for a whole day (not security-relevant).
        """
        event_types = ['page_view', 'click', 'scroll', 'purchase', 'login', 'logout']
        device_types = ['mobile', 'desktop', 'tablet']
        pages = ['/home', '/products', '/cart', '/checkout', '/profile', '/search']

        user_id = f"user_{random.randint(1, 100000)}"
        session_id = hashlib.md5(f"{user_id}_{datetime.now().strftime('%Y%m%d')}".encode()).hexdigest()[:16]

        return {
            'user_id': user_id,
            'event_type': random.choice(event_types),
            'page_url': random.choice(pages),
            'timestamp': datetime.now().isoformat(),
            'session_id': session_id,
            'device_type': random.choice(device_types),
            'ip_address': f"192.168.{random.randint(0, 255)}.{random.randint(0, 255)}",
            'duration_ms': random.randint(10, 10000),
            'metadata': {
                'referrer': random.choice(['google', 'direct', 'social', 'email']),
                'ab_test_group': random.choice(['A', 'B'])
            }
        }

    async def generate_continuously(self, rate_per_second: int = 100):
        """Send events forever at roughly ``rate_per_second`` (until cancelled).

        Raises:
            ValueError: if ``rate_per_second`` is not positive.
            RuntimeError: if :meth:`start` has not been called.
        """
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        if self.producer is None:
            raise RuntimeError("DataGenerator.start() must be called before generating")

        logger.info(f"开始生成数据,速率: {rate_per_second}/秒")

        interval = 1.0 / rate_per_second
        loop = asyncio.get_running_loop()
        next_send = loop.time()
        count = 0

        while True:
            event = self.generate_event()
            await self.producer.send(self.topic, event)
            count += 1

            if count % 1000 == 0:
                logger.info(f"已生成 {count} 条事件")

            # Pace against a fixed timeline so per-send overhead does not push the
            # effective rate below the target.
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Behind schedule: yield to the loop, and don't burst to catch up.
                next_send = loop.time()
                await asyncio.sleep(0)


# Main program
async def main():
    """Run the demo: a synthetic event generator feeding the Kafka -> ClickHouse pipeline.

    SIGINT/SIGTERM stop the pipeline. Once the pipeline has stopped (or failed),
    the generator task is cancelled and its producer closed so the process exits.
    """
    generator = DataGenerator()
    await generator.start()

    pipeline = KafkaToClickHousePipeline(
        batch_size=500,
        flush_interval=3.0
    )

    loop = asyncio.get_running_loop()
    stop_tasks = []  # keep references so shutdown tasks are not garbage-collected

    def request_stop():
        logger.info("收到停止信号...")
        stop_tasks.append(loop.create_task(pipeline.stop()))

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Runs the callback inside the event loop, where creating tasks is safe.
            loop.add_signal_handler(sig, request_stop)
        except NotImplementedError:
            # Event loops without add_signal_handler (e.g. on Windows): hop onto
            # the loop thread from a plain signal handler.
            signal.signal(sig, lambda _sig, _frame: loop.call_soon_threadsafe(request_stop))

    producer_task = asyncio.create_task(
        generator.generate_continuously(rate_per_second=500)
    )
    try:
        await pipeline.start()
    finally:
        # The generator loops forever, so it must be cancelled explicitly.
        producer_task.cancel()
        try:
            await producer_task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"数据生成器异常退出: {e}")
        if stop_tasks:
            await asyncio.gather(*stop_tasks, return_exceptions=True)
        await generator.stop()


if __name__ == "__main__":
    asyncio.run(main())

运行步骤

1. 启动基础设施

docker-compose up -d

2. 创建Kafka Topic

docker exec -it <kafka-container> kafka-topics --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

3. 运行数据管道

pip install aiokafka aiohttp asyncpg
python data_pipeline.py

性能优化建议

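1. 批量大小调优:`batch_size`与`flush_interval`共同决定写入延迟和每次插入的行数。ClickHouse适合少量、大批次的插入,官方建议每批至少约1000行。批次过小会产生大量小数据分片(parts)。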

2. 并行消费:在同一消费者组(`consumer_group`)内启动多个管道实例,Kafka会把分区分配给不同实例。有效并行度的上限等于Topic的分区数(上文创建时为3)。

3. 异步写入:写入前先在客户端攒批,避免逐条插入。若客户端难以攒出大批次,可以开启ClickHouse的`async_insert`设置,由服务端缓冲并合并小批量插入。

4. 数据分区:示例表按`toYYYYMMDD(timestamp)`按天分区,并配合90天TTL自动清理旧数据。分区粒度不宜过细,否则分区和数据分片过多会拖慢合并与查询。

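5. 监控告警:管道内置的`MetricsCollector`每30秒输出一次消费/写入速率、错误数和处理耗时分位数。生产环境中应将这些指标接入监控系统,并对消费延迟(consumer lag)和写入失败设置告警。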

这套架构可以支撑每秒数万条事件的实时处理,适用于大多数互联网应用场景。根据实际数据量,可以水平扩展Kafka分区和消费者实例。