Serverless architectures have transformed how teams build data pipelines. By eliminating the need to provision and manage servers, AWS Lambda and Amazon S3 allow developers to focus on business logic rather than infrastructure. In this step-by-step guide, we will build an event-driven data pipeline that processes files uploaded to S3, transforms the data using Lambda, and orchestrates multi-step workflows with AWS Step Functions.
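The pipeline we will build follows this flow:

1. A client uploads a CSV file to the uploads/ prefix of an input S3 bucket.
2. S3 emits an ObjectCreated event notification that invokes a Lambda function.
3. The function validates and transforms the rows and writes the cleaned file under processed/ in an output bucket.
4. For multi-step processing, Step Functions coordinates the stages and sends success or failure notifications.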
This pattern scales well. Lambda scales automatically with the number of incoming events, up to your account's concurrency limit, and S3 provides virtually unlimited storage. You pay only for the invocations and compute time you consume and for the storage and requests you use.
Create two S3 buckets: one for raw input and one for processed output. Enable versioning on both buckets to protect against accidental overwrites and to maintain an audit trail.
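```bash
# Bucket names are globally unique; replace these with your own
aws s3 mb s3://my-data-pipeline-input
aws s3 mb s3://my-data-pipeline-output

# Enable versioning on both buckets
for bucket in my-data-pipeline-input my-data-pipeline-output; do
  aws s3api put-bucket-versioning \
    --bucket "$bucket" \
    --versioning-configuration Status=Enabled
done
```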
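If you prefer to script bucket setup from Python, here is a minimal sketch built on the boto3 S3 client's create_bucket and put_bucket_versioning calls. The helper name create_versioned_bucket is hypothetical. The client is passed in so you can exercise the function without AWS credentials. One detail it handles: in us-east-1, create_bucket must be called without a CreateBucketConfiguration, while other regions require a LocationConstraint.

```python
def create_versioned_bucket(s3_client, bucket_name, region):
    """Create a bucket and enable versioning on it (hypothetical helper).

    s3_client is expected to be a boto3 S3 client,
    e.g. boto3.client('s3', region_name=region).
    """
    if region == 'us-east-1':
        # us-east-1 is the default location and rejects an explicit LocationConstraint
        s3_client.create_bucket(Bucket=bucket_name)
    else:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region},
        )
    # Versioning keeps prior object versions, guarding against accidental overwrites
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={'Status': 'Enabled'},
    )


# Usage (requires AWS credentials):
# import boto3
# client = boto3.client('s3', region_name='us-east-1')
# for name in ('my-data-pipeline-input', 'my-data-pipeline-output'):
#     create_versioned_bucket(client, name, 'us-east-1')
```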
Organize your input bucket with prefixes such as uploads/ and errors/ to simplify lifecycle policies and access controls.
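As an example of what those prefixes make easy, a lifecycle rule can expire failed files under errors/ automatically. This is a sketch. The 30-day retention, the rule ID format, and the helper name build_error_expiry_rule are assumptions. The returned dictionary is meant to be passed as LifecycleConfiguration to the boto3 S3 client's put_bucket_lifecycle_configuration.

```python
def build_error_expiry_rule(prefix='errors/', days=30):
    """Return a lifecycle configuration that expires objects under `prefix`.

    The 30-day default is an illustrative assumption, not a recommendation.
    """
    return {
        'Rules': [
            {
                'ID': f"expire-{prefix.rstrip('/')}",
                'Filter': {'Prefix': prefix},  # scope the rule to one prefix
                'Status': 'Enabled',
                'Expiration': {'Days': days},
                # On a versioned bucket, expiration only adds a delete marker;
                # this permanently removes versions once they have been
                # noncurrent for the same number of days.
                'NoncurrentVersionExpiration': {'NoncurrentDays': days},
            }
        ]
    }


config = build_error_expiry_rule()
# With AWS credentials:
# import boto3
# boto3.client('s3').put_bucket_lifecycle_configuration(
#     Bucket='my-data-pipeline-input', LifecycleConfiguration=config)
```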
The Lambda function is the heart of the pipeline. It will be triggered by S3 ObjectCreated events and will process each uploaded file.
Here is a Python implementation that reads a CSV file, validates its contents, and writes a cleaned version to the output bucket.
```python
import csv
import io
import json
import urllib.parse

import boto3

s3 = boto3.client('s3')

TARGET_BUCKET = 'my-data-pipeline-output'


def lambda_handler(event, context):
    target_keys = []
    # A notification may contain several records, so process each one
    for record in event['Records']:
        source_bucket = record['s3']['bucket']['name']
        # S3 URL-encodes object keys in event notifications
        source_key = urllib.parse.unquote_plus(
            record['s3']['object']['key'], encoding='utf-8'
        )
        target_key = f"processed/{source_key.split('/')[-1]}"

        try:
            response = s3.get_object(Bucket=source_bucket, Key=source_key)
            text = response['Body'].read().decode('utf-8')
            reader = csv.DictReader(io.StringIO(text))

            # Keep only rows that pass validation, normalized by transform_row
            valid_rows = [transform_row(row) for row in reader if validate_row(row)]
            if not valid_rows:
                raise ValueError("No valid rows found in file")

            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=list(valid_rows[0].keys()))
            writer.writeheader()
            writer.writerows(valid_rows)

            s3.put_object(
                Bucket=TARGET_BUCKET,
                Key=target_key,
                Body=output.getvalue().encode('utf-8'),
                ContentType='text/csv'
            )
            target_keys.append(target_key)
        except Exception as e:
            print(f"Error processing {source_key}: {e}")
            # Re-raise so the invocation fails and Lambda's async retries apply
            raise

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'File processed successfully', 'targetKeys': target_keys})
    }


def validate_row(row):
    required_fields = ['id', 'name', 'email']
    # DictReader sets missing trailing columns to None, so guard before strip()
    return all((row.get(field) or '').strip() for field in required_fields)


def transform_row(row):
    return {
        'id': row['id'].strip(),
        'name': row['name'].strip().title(),
        'email': row['email'].strip().lower()
    }
```
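The handler mixes S3 I/O with the CSV logic, which makes it awkward to unit test. One option is to move the validation and transformation into a pure function that takes CSV text and returns CSV text. The handler would then call that function between get_object and put_object. The sketch below shows this approach. The name clean_csv is hypothetical, and its rules mirror validate_row and transform_row above.

```python
import csv
import io

REQUIRED_FIELDS = ['id', 'name', 'email']


def clean_csv(text):
    """Validate and normalize CSV text; return (cleaned_csv_text, valid_row_count)."""
    reader = csv.DictReader(io.StringIO(text))
    valid_rows = []
    for row in reader:
        # DictReader uses None for missing trailing columns, so guard before strip()
        if all((row.get(f) or '').strip() for f in REQUIRED_FIELDS):
            valid_rows.append({
                'id': row['id'].strip(),
                'name': row['name'].strip().title(),
                'email': row['email'].strip().lower(),
            })
    if not valid_rows:
        raise ValueError("No valid rows found in file")
    output = io.StringIO()
    # '\n' line endings keep the test output readable; csv defaults to '\r\n'
    writer = csv.DictWriter(output, fieldnames=REQUIRED_FIELDS, lineterminator='\n')
    writer.writeheader()
    writer.writerows(valid_rows)
    return output.getvalue(), len(valid_rows)


sample = "id,name,email\n1, ada lovelace ,ADA@Example.com\n2,,missing@example.com\n"
cleaned, count = clean_csv(sample)
print(count)  # 1
print(cleaned)
```

With the logic isolated this way, you can cover edge cases such as blank names or short rows with ordinary unit tests. The S3 calls in the handler then become thin glue.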
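Deploy this function using the AWS CLI, AWS SAM, or Terraform. Ensure the Lambda execution role can read from the input bucket and write to the output bucket, as in the policy below. The role also needs permission to write to CloudWatch Logs, which the AWS managed policy AWSLambdaBasicExecutionRole provides.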
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::my-data-pipeline-input/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::my-data-pipeline-output/*"
    }
  ]
}
```
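If you generate IAM policies from code, a builder for the policy above might look like the following sketch. It adds the CloudWatch Logs actions that the AWS managed policy AWSLambdaBasicExecutionRole grants, so a single document covers both S3 access and logging. The function name is hypothetical. The logs resource is left broad here, and you may want to narrow it to your function's log group.

```python
import json


def build_pipeline_policy(input_bucket, output_bucket):
    """Return an IAM policy document for the processor Lambda (hypothetical helper)."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {   # Read raw uploads
                'Effect': 'Allow',
                'Action': ['s3:GetObject'],
                'Resource': f'arn:aws:s3:::{input_bucket}/*',
            },
            {   # Write cleaned output
                'Effect': 'Allow',
                'Action': ['s3:PutObject'],
                'Resource': f'arn:aws:s3:::{output_bucket}/*',
            },
            {   # Write function logs to CloudWatch Logs (broad resource; narrow as needed)
                'Effect': 'Allow',
                'Action': [
                    'logs:CreateLogGroup',
                    'logs:CreateLogStream',
                    'logs:PutLogEvents',
                ],
                'Resource': 'arn:aws:logs:*:*:*',
            },
        ],
    }


print(json.dumps(build_pipeline_policy('my-data-pipeline-input', 'my-data-pipeline-output'), indent=2))
```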
With the Lambda function deployed, configure the S3 event notification to invoke it automatically whenever a new object is created in the input bucket.
```bash
# Allow S3 to invoke the function, scoped to the input bucket and your account
aws lambda add-permission \
  --function-name data-processor \
  --statement-id s3invoke \
  --action "lambda:InvokeFunction" \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::my-data-pipeline-input \
  --source-account "$(aws sts get-caller-identity --query Account --output text)"

# Replace ACCOUNT_ID (and the region, if needed) in the function ARN below
aws s3api put-bucket-notification-configuration \
  --bucket my-data-pipeline-input \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [{
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:data-processor",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            {"Name": "prefix", "Value": "uploads/"}
          ]
        }
      }
    }]
  }'
```
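The prefix filter ensures that the Lambda function runs only for objects placed under the uploads/ prefix. This prevents an infinite loop if the function ever writes back to the same bucket, as long as it writes outside uploads/. Note that put-bucket-notification-configuration replaces the bucket's entire notification configuration, so include any existing notifications in the same document.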
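To invoke the function locally or from the Lambda console before you wire up the bucket, you can pass it a test event. The sketch below builds one that contains only the fields lambda_handler reads; a real S3 notification carries many more. S3 URL-encodes object keys in notifications, which is why the handler calls unquote_plus. The helper name make_s3_event is hypothetical.

```python
import json
import urllib.parse


def make_s3_event(bucket, key):
    """Build a minimal ObjectCreated-style S3 event with a URL-encoded key."""
    return {
        'Records': [
            {
                'eventName': 'ObjectCreated:Put',
                's3': {
                    'bucket': {'name': bucket},
                    # Keys arrive encoded in notifications; spaces become '+'
                    'object': {'key': urllib.parse.quote_plus(key, safe='/')},
                },
            }
        ]
    }


event = make_s3_event('my-data-pipeline-input', 'uploads/customer list.csv')
print(json.dumps(event, indent=2))
# The handler recovers the original key:
print(urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']))
```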
For pipelines that require multiple stages, AWS Step Functions provides a visual workflow engine. You can define state machines in Amazon States Language to coordinate Lambda functions, handle errors, and implement branching logic.
Here is a simple state machine that invokes our data processor and sends a notification on success or failure. The ResultPath on the Catch clause places the error details under $.error instead of replacing the state's input, so the failure notifier still sees which file failed.

```json
{
  "Comment": "Data Pipeline State Machine",
  "StartAt": "ProcessFile",
  "States": {
    "ProcessFile": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:data-processor",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "SendFailureNotification"
        }
      ],
      "Next": "SendSuccessNotification"
    },
    "SendSuccessNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:notify-success",
      "End": true
    },
    "SendFailureNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:notify-failure",
      "End": true
    }
  }
}
```
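Before deploying a definition like this, a quick local check can catch broken transitions. The sketch below verifies that StartAt and every Next target, including those inside Catch clauses and Choice branches, name an existing state. It also checks that each non-terminal state has a Next or End. It covers only transitions, not full Amazon States Language validation. The function name check_transitions is hypothetical.

```python
TERMINAL_TYPES = {'Succeed', 'Fail'}


def check_transitions(definition):
    """Return a list of transition problems found in a state machine definition dict."""
    states = definition.get('States', {})
    problems = []
    if definition.get('StartAt') not in states:
        problems.append(f"StartAt points to unknown state {definition.get('StartAt')!r}")
    for name, state in states.items():
        targets = []
        if 'Next' in state:
            targets.append(state['Next'])
        targets += [c['Next'] for c in state.get('Catch', [])]
        if state.get('Type') == 'Choice':
            # Choice states branch via Choices[].Next and an optional Default
            targets += [c['Next'] for c in state.get('Choices', [])]
            if 'Default' in state:
                targets.append(state['Default'])
        elif (state.get('Type') not in TERMINAL_TYPES
              and 'Next' not in state and not state.get('End')):
            problems.append(f"{name} has neither Next nor End")
        for target in targets:
            if target not in states:
                problems.append(f"{name} transitions to unknown state {target!r}")
    return problems


# A trimmed copy of the definition above with the failure state missing
definition = {
    'StartAt': 'ProcessFile',
    'States': {
        'ProcessFile': {
            'Type': 'Task',
            'Catch': [{'ErrorEquals': ['States.ALL'], 'Next': 'SendFailureNotification'}],
            'Next': 'SendSuccessNotification',
        },
        'SendSuccessNotification': {'Type': 'Task', 'End': True},
    },
}
print(check_transitions(definition))
# ["ProcessFile transitions to unknown state 'SendFailureNotification'"]
```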
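Instead of invoking the processor Lambda directly from S3, you can start the Step Functions state machine for each upload. S3 event notifications cannot target a state machine directly. Instead, enable Amazon EventBridge notifications on the input bucket and create an EventBridge rule that targets the state machine. EventBridge delivers a different event shape than a direct S3 notification (the bucket and key sit under detail), so the processor must read them accordingly. This approach decouples the event source from the workflow logic and makes the pipeline easier to extend.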
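Here is a sketch of the EventBridge route. It assumes that EventBridge notifications are enabled on the input bucket and that a rule with the pattern below targets the state machine. The pattern fields follow the S3 "Object Created" event that EventBridge delivers. The helper extract_object_refs is hypothetical and lets the processor accept either event shape. It does not URL-decode EventBridge keys, so check a sample event from your bucket before relying on that.

```python
import json
import urllib.parse

# EventBridge rule pattern: new objects under uploads/ in the input bucket
EVENT_PATTERN = {
    'source': ['aws.s3'],
    'detail-type': ['Object Created'],
    'detail': {
        'bucket': {'name': ['my-data-pipeline-input']},
        'object': {'key': [{'prefix': 'uploads/'}]},
    },
}


def extract_object_refs(event):
    """Return (bucket, key) pairs from an S3 notification or an EventBridge S3 event."""
    if 'Records' in event:
        # Direct S3 notification: keys are URL-encoded
        return [
            (r['s3']['bucket']['name'],
             urllib.parse.unquote_plus(r['s3']['object']['key']))
            for r in event['Records']
        ]
    if event.get('source') == 'aws.s3':
        # EventBridge event: one object per event, under 'detail'.
        # Assumption: key used as-is; verify decoding against a real event.
        detail = event['detail']
        return [(detail['bucket']['name'], detail['object']['key'])]
    raise ValueError('Unrecognized event shape')


# The pattern is supplied as a JSON string, e.g. EventPattern= on the EventBridge put_rule call
print(json.dumps(EVENT_PATTERN))
```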
No pipeline is complete without robust error handling and observability.
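Dead Letter Queues: S3 invokes Lambda asynchronously, and by default Lambda retries a failed asynchronous invocation twice. Configure a dead letter queue (DLQ), either an SQS queue or an SNS topic, to capture events that still fail after those retries. The execution role needs permission to send to the DLQ (sqs:SendMessage or sns:Publish).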
```bash
aws lambda update-function-configuration \
  --function-name data-processor \
  --dead-letter-config TargetArn=arn:aws:sns:us-east-1:ACCOUNT_ID:data-pipeline-dlq
```
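Lambda also lets you tune the asynchronous retry policy that runs before an event reaches the DLQ. The sketch below uses the boto3 Lambda client's put_function_event_invoke_config. The values (one retry and a one-hour maximum event age) are illustrative assumptions. The hypothetical helper checks them against the ranges the API accepts: 0 to 2 retries and 60 to 21600 seconds.

```python
def configure_async_retries(lambda_client, function_name,
                            max_retries=1, max_event_age_s=3600):
    """Set async retry count and maximum event age (hypothetical helper).

    lambda_client is expected to be a boto3 Lambda client, e.g. boto3.client('lambda').
    """
    if not 0 <= max_retries <= 2:
        raise ValueError('MaximumRetryAttempts must be between 0 and 2')
    if not 60 <= max_event_age_s <= 21600:
        raise ValueError('MaximumEventAgeInSeconds must be between 60 and 21600')
    return lambda_client.put_function_event_invoke_config(
        FunctionName=function_name,
        MaximumRetryAttempts=max_retries,
        MaximumEventAgeInSeconds=max_event_age_s,
    )


# With AWS credentials:
# import boto3
# configure_async_retries(boto3.client('lambda'), 'data-processor')
```

Fewer retries push bad files to the DLQ sooner, while a shorter event age limits how long a backlog of stale events can keep retrying.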
CloudWatch Alarms: Set alarms on Lambda error rates, duration thresholds, and S3 bucket size growth. An unexpected spike in errors or processing time often indicates a data quality issue or a downstream dependency failure.
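For example, you can express an alarm on the function's Errors metric as keyword arguments for the boto3 CloudWatch client's put_metric_alarm. The alarm name, the threshold of one error per five-minute window, and the SNS topic used for notifications are assumptions that you should adapt.

```python
def build_error_alarm(function_name, topic_arn, threshold=1, period_s=300):
    """Return put_metric_alarm kwargs that fire when the function reports errors."""
    return {
        'AlarmName': f'{function_name}-errors',
        'Namespace': 'AWS/Lambda',
        'MetricName': 'Errors',
        'Dimensions': [{'Name': 'FunctionName', 'Value': function_name}],
        'Statistic': 'Sum',
        'Period': period_s,                  # seconds per evaluation window
        'EvaluationPeriods': 1,
        'Threshold': float(threshold),
        'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
        'TreatMissingData': 'notBreaching',  # no data points means no errors reported
        'AlarmActions': [topic_arn],
    }


# With AWS credentials (the topic name is a hypothetical placeholder):
# import boto3
# boto3.client('cloudwatch').put_metric_alarm(
#     **build_error_alarm('data-processor',
#                         'arn:aws:sns:us-east-1:ACCOUNT_ID:pipeline-alerts'))
```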
Structured Logging: Use structured JSON logging within your Lambda function to make log queries in CloudWatch Logs Insights more effective.
```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    logger.info(json.dumps({
        'message': 'Processing started',
        'bucket': event['Records'][0]['s3']['bucket']['name'],
        'key': event['Records'][0]['s3']['object']['key'],
        'requestId': context.aws_request_id
    }))
    # processing logic
```
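Calling json.dumps at every log site works. An alternative is a logging.Formatter that serializes each record, so call sites pass fields through the standard extra argument. The sketch below shows this approach. The JsonFormatter class and the fields it emits are assumptions and are not part of the Lambda runtime.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON object."""

    # Fields passed via `extra=` that are copied into the JSON output
    EXTRA_FIELDS = ('bucket', 'key', 'requestId')

    def format(self, record):
        payload = {
            'level': record.levelname,
            'message': record.getMessage(),
            'logger': record.name,
        }
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)
        return json.dumps(payload)


logger = logging.getLogger('data-processor')
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info('Processing started',
            extra={'bucket': 'my-data-pipeline-input', 'key': 'uploads/a.csv'})
# stderr: {"level": "INFO", "message": "Processing started", "logger": "data-processor", "bucket": "my-data-pipeline-input", "key": "uploads/a.csv"}
```

In Lambda, the Python runtime typically attaches its own handler to the root logger. There, you would set this formatter on that existing handler rather than add a second one, which would duplicate every line.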
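Idempotency: S3 event notifications are delivered at least once, so the same object can trigger the function more than once. If your processing logic has side effects, such as writing to a database, implement idempotency by using the object key combined with its version ID (or its ETag, if versioning is disabled) as a deduplication key.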
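The following minimal sketch shows the deduplication key and check. It builds the key from the bucket, the object key, and the versionId, falling back to the eTag when versioning is off. S3 includes both fields in the event's object record. The in-memory set stands in for a durable store. In production, a DynamoDB conditional write is a common choice, because it survives across invocations and avoids the check-then-act race shown here. The names dedup_key and process_once are hypothetical.

```python
def dedup_key(record):
    """Derive a deduplication key from one S3 event record."""
    bucket = record['s3']['bucket']['name']
    obj = record['s3']['object']
    # versionId is present when bucket versioning is enabled; fall back to eTag otherwise
    version = obj.get('versionId') or obj.get('eTag', '')
    return f"{bucket}/{obj['key']}#{version}"


_seen = set()  # stand-in for a durable store such as a DynamoDB table


def process_once(record, handler):
    """Run handler(record) only if this object version has not been processed yet."""
    key = dedup_key(record)
    if key in _seen:
        return False  # duplicate delivery: skip side effects
    handler(record)
    _seen.add(key)  # mark done only after the side effects succeed
    return True
```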
Serverless does not mean costless. Lambda charges per invocation and per gigabyte-second of compute. S3 charges for storage, requests, and data transfer. For high-volume pipelines, consider these optimizations:
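Whatever optimizations you pursue, it helps to estimate the compute side first. Lambda's duration charge is based on GB-seconds: allocated memory in GB multiplied by execution time in seconds, summed over all invocations. The sketch below does that arithmetic. The prices are parameters you supply from the current AWS Lambda pricing for your region and architecture. The sketch ignores the free tier and billing-granularity rounding.

```python
def lambda_gb_seconds(invocations, avg_duration_s, memory_mb):
    """Total GB-seconds: memory in GB x duration in seconds x invocations."""
    return invocations * avg_duration_s * (memory_mb / 1024)


def lambda_cost(invocations, avg_duration_s, memory_mb,
                price_per_gb_s, price_per_million_requests):
    """Duration charge plus request charge; prices are inputs, free tier ignored."""
    compute = lambda_gb_seconds(invocations, avg_duration_s, memory_mb) * price_per_gb_s
    requests = invocations / 1_000_000 * price_per_million_requests
    return compute + requests


# 1M files a month, 0.5 s each at 512 MB
print(lambda_gb_seconds(1_000_000, 0.5, 512))  # 250000.0
```

Because the charge scales with memory times duration, doubling memory is cost-neutral only if it halves the duration. It is worth measuring rather than assuming.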
AWS Lambda and S3 provide a solid foundation for building scalable, event-driven data pipelines without managing servers. By combining S3 event notifications for triggering, Lambda for processing, and Step Functions for orchestration, you can construct workflows that grow with your data volume. Layer in proper error handling, monitoring, and cost controls, and you have a production-ready pipeline that lets your team focus on extracting value from data rather than maintaining infrastructure.