Stream, Process, and Store Data Efficiently with Amazon Kinesis and S3

Ghita EL AMLAQUI

Introduction

In today’s data-driven world, businesses need to process and store streaming data efficiently. AWS provides a robust solution using S3, Kinesis Data Streams, and Kinesis Firehose to handle real-time data ingestion and storage.

This article walks you through building a scalable data pipeline to collect, process, and store streaming data in Amazon S3.

We’ll show you how to do this in the following steps:

  • Step 1: Create an S3 bucket
  • Step 2: Set up a Kinesis Data Stream
  • Step 3: Create a Kinesis Firehose Delivery Stream
  • Step 4: End-to-end testing

Architecture

The AWS solution we are building provides:

  • Scalability: Kinesis Data Streams handles high-throughput data ingestion.
  • Reliability: Kinesis Firehose ensures seamless data delivery with automatic retries.
  • Cost Efficiency: Amazon S3 provides durable, low-cost storage.

Here’s an architecture diagram of what we’ll implement:

Figure 1: Architecture diagram

Key concepts

Before we jump into the implementation, let’s review some key concepts of Kinesis:

  • Data Producer: An application or system that generates data and pushes it into a data stream. Examples include IoT devices, application logs, or real-time analytics systems.
  • Data Consumer: A system that reads and processes data from a data stream. This could be an analytics engine, a machine learning model, or a database.
  • Data Stream: A continuously flowing sequence of data records that are processed in real time. Kinesis Data Streams allow high-throughput data ingestion.
  • Shard: A unit of capacity in a Kinesis Data Stream that determines the throughput limits. More shards allow higher ingestion rates and parallel processing.

Understanding these concepts will make it easier to follow along in the next sections.
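For example, each shard supports up to 1 MB/s (or 1,000 records per second) of writes and 2 MB/s of reads, so the shard count directly bounds a stream’s throughput. Once a stream exists (we create one in Step 2), you can inspect its shards from the CLI:

# List the shards of an existing stream
aws kinesis list-shards --stream-name my-data-stream --region us-east-1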

Steps

Step 1: Create an S3 bucket

The first step is to create an S3 bucket to store the data delivered by Kinesis Firehose.

Note: Since S3 bucket names must be globally unique, a random suffix is added to ensure uniqueness.

BUCKET_NAME="my-data-pipeline-bucket-$(openssl rand -hex 4)"

aws s3api create-bucket --bucket "$BUCKET_NAME" --region us-east-1

> Output:

AWS CLI — create a new S3 bucket

If we check the AWS S3 console, we can confirm the bucket was created successfully.

AWS Console — S3 buckets
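Besides the console, you can also verify the bucket from the CLI; head-bucket returns silently when the bucket exists and is accessible to your credentials:

aws s3api head-bucket --bucket "$BUCKET_NAME" --region us-east-1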

Step 2: Set up a Kinesis Data Stream

Create a Kinesis Data Stream using the AWS CLI:

aws kinesis create-stream --stream-name my-data-stream --shard-count 1 --region us-east-1

Check the status of your stream:

aws kinesis describe-stream-summary --stream-name my-data-stream --region us-east-1

> Output:

AWS CLI — create a Kinesis data stream and check its status

From the AWS Console, we can see that a new data stream has been created.

AWS Console — Kinesis Data Stream
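The stream starts in the CREATING state and only accepts records once it becomes ACTIVE. Instead of re-running describe-stream-summary until the status changes, you can block with the CLI waiter:

# Returns once the stream reaches the ACTIVE state
aws kinesis wait stream-exists --stream-name my-data-stream --region us-east-1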

Step 3: Create a Kinesis Firehose Delivery Stream

Kinesis Data Firehose needs permissions to read from Kinesis Data Streams and write to an S3 bucket. To enable this, we’ll create an IAM role that:

  • Allows Kinesis Data Firehose to assume the role using sts:AssumeRole.
  • Grants read access to the Kinesis Data Stream.
  • Grants write access to the S3 bucket.

Create an IAM role for Firehose to access Kinesis and S3:

aws iam create-role --role-name FirehoseRole --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "firehose.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}'

> Output:

AWS CLI — create the IAM role: “FirehoseRole”

Attach the following permissions:

aws iam attach-role-policy --role-name FirehoseRole --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFullAccess

aws iam attach-role-policy --role-name FirehoseRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
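These managed policies are convenient for a walkthrough but far broader than the pipeline needs. In a production setup you would typically scope the role down to just this stream and bucket; here is a minimal sketch using an inline policy (the policy name is illustrative, and it assumes $ACCOUNT_ID and $BUCKET_NAME are set; the account ID is retrieved just below):

aws iam put-role-policy --role-name FirehoseRole --policy-name FirehosePipelineAccess --policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards"],
      "Resource": "arn:aws:kinesis:us-east-1:'$ACCOUNT_ID':stream/my-data-stream"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetBucketLocation", "s3:ListBucket", "s3:AbortMultipartUpload"],
      "Resource": ["arn:aws:s3:::'$BUCKET_NAME'", "arn:aws:s3:::'$BUCKET_NAME'/*"]
    }
  ]
}'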

Retrieve the current AWS account ID:

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

Create the Firehose delivery stream:

aws firehose create-delivery-stream \
  --delivery-stream-name my-firehose-stream \
  --delivery-stream-type KinesisStreamAsSource \
  --kinesis-stream-source-configuration KinesisStreamARN=arn:aws:kinesis:us-east-1:$ACCOUNT_ID:stream/my-data-stream,RoleARN=arn:aws:iam::$ACCOUNT_ID:role/FirehoseRole \
  --s3-destination-configuration BucketARN=arn:aws:s3:::$BUCKET_NAME,RoleARN=arn:aws:iam::$ACCOUNT_ID:role/FirehoseRole

> Output:

AWS CLI — create Kinesis Firehose delivery stream

From the AWS Console, we can see that a new Firehose stream has been created.

AWS Console — Kinesis Data Firehose delivery stream created successfully
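Like the data stream, the delivery stream takes a moment to provision. You can confirm it is ACTIVE from the CLI before sending any data:

aws firehose describe-delivery-stream --delivery-stream-name my-firehose-stream --query 'DeliveryStreamDescription.DeliveryStreamStatus' --output text --region us-east-1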

Step 4: End-to-end testing

In this final step, we’ll test the workflow we’ve built by sending records to Kinesis Data Streams and verifying in S3 that they were processed successfully.

Send test data to the Kinesis Data Stream:

aws kinesis put-record --stream-name my-data-stream --partition-key test-key --data "$(echo -n '{"data": "Hello, Kinesis!"}' | base64)" --region us-east-1

> Output:

AWS CLI — send data to Kinesis Data Stream
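To simulate a slightly busier producer, you can also send a small batch in a single call with put-records (a sketch; the payloads and partition keys are arbitrary test values):

aws kinesis put-records --stream-name my-data-stream --region us-east-1 --records \
  Data="$(echo -n '{"data": "record 1"}' | base64)",PartitionKey=key-1 \
  Data="$(echo -n '{"data": "record 2"}' | base64)",PartitionKey=key-2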

Monitor the Firehose Delivery Stream:

In the Firehose monitoring tab, we can confirm that our record was successfully streamed. Make sure to check the ‘Records Read from Kinesis Data Streams (Sum)’ and ‘Successful Delivery to Amazon S3’ metrics.

AWS Console — Kinesis Firehose Stream metrics

Verify data in S3:

A new object has been delivered to S3, under a key prefix based on the delivery timestamp. (Firehose buffers incoming records, by default for up to 300 seconds or 5 MiB, so the object can take a few minutes to appear.)

AWS Console — AWS S3 bucket

The file’s content:

AWS Console — viewing the file’s content with S3 Select
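If you prefer the CLI, you can list the delivered objects and print one to the terminal (assuming $BUCKET_NAME is still set in your shell; replace the placeholder key with one from your own listing):

# Firehose prefixes object keys with the delivery timestamp (YYYY/MM/DD/HH) by default
aws s3 ls "s3://$BUCKET_NAME" --recursive

# Stream an object's content to stdout
aws s3 cp "s3://$BUCKET_NAME/<key-from-the-listing>" -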

Conclusion

This real-time data pipeline is ideal for scenarios that require immediate processing and storage of streaming data. By provisioning AWS services such as Kinesis and S3 from the CLI, you get high availability, durability, and flexibility for your data workflows.
