How to Create Scalable Data Pipelines with Python


The Universe is not static, nor is the data it generates. As your business produces more data points, you need to be prepared to ingest and process them, and then load the results into a data lake that keeps them safe and ready to be analyzed.

In this article, you will learn how to build scalable data pipelines using only Python code. Despite its simplicity, the pipeline you build will be able to scale to large amounts of data with some degree of flexibility.

ETL-based Data Pipelines

The classic Extract, Transform and Load (ETL) paradigm is still a handy way to model data pipelines. The heterogeneity of data sources (structured data, unstructured data points, events, server logs, database transaction information, etc.) demands an architecture flexible enough to ingest data from big data solutions (such as Apache Kafka-based data streams) as well as from simpler data streams. We’re going to use the standard Pub/Sub pattern in order to achieve this flexibility.

In our test case, we’re going to process the Wikimedia Foundation’s (WMF) RecentChange stream, a web service that provides access to messages generated by changes to Wikipedia content. Because the stream is not delivered as standard JSON messages, we’ll first need to parse it before we can process the actual payload. The definition of the message structure is available online, but here’s a sample message:

event: message
id: [{"topic":"eqiad.mediawiki.recentchange","partition":0,"timestamp":1532031066001},{"topic":"codfw.mediawiki.recentchange","partition":0,"offset":-1}]data: {"event": "data", "is": "here"}

Server-Sent Events (SSE) are defined by the World Wide Web Consortium (W3C) as part of the HTML5 specification. They allow clients to receive streams over the HTTP protocol. In this particular case, the WMF EventStreams web service is backed by an Apache Kafka server. Our architecture should be able to process both types of connections:

  • SSE events, and
  • Subscriptions to more sophisticated services

Once we receive the messages, we’re going to process them in batches of 100 elements with the help of Python’s Pandas library, and then load our results into a data lake. The pipeline is made up of four components, each with a specific role to play:

  • SSE Consumer – This component will receive the events from the WMF server, extract the JSON payload, and forward it to our second component.
  • Message Queue – This component should be a massively scalable, durable and managed service that will queue up messages until they can be processed.
  • Stream Processor – This component will process messages from the queue in batches, and then publish the results into our data lake.
  • Data Lake – This long-term storage service will store our processed messages as a series of Comma-Separated Values (CSV) files.

In this post, we’ll show how to code the SSE Consumer and Stream Processor, but we’ll use managed services for the Message Queue and Data Lake.

Getting Started with Data Pipelines

To follow along with the code in this tutorial, you’ll need to have a recent version of Python installed. When starting a new project, it’s always best to begin with a clean implementation in a virtual environment. You have two choices:

  • Download the pre-built Data Pipeline runtime environment (including Python 3.6) for Linux or macOS and install it using the State Tool into a virtual environment, or
  • Follow the instructions provided in my Python Data Pipeline Github repository to run the code in a containerized instance of JupyterLab.

All set? Let’s dive into the details.

How to Mock AWS SQS and S3

To run our data pipelines, we’re going to use the Moto Python library, which mocks the Amazon Web Services (AWS) infrastructure on a local server. The two AWS managed services that we’ll use are:

  • Simple Queue Service (SQS) – this is the component that will queue up the incoming messages for us
  • Simple Storage Service (S3) – this is the data lake component, which will store our output CSVs

Other major cloud providers (Google Cloud Platform, Microsoft Azure, etc.) have their own implementations of these components, but the principles are the same.
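If you’re not using the pre-built runtime described above, you can install the libraries used in the rest of this post with pip (the package names below are an assumption based on the imports and commands that follow; moto’s server extra provides the moto_server command):

pip install moto[server] boto3 sseclient pandas awscli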

Once you’ve installed the Moto server library and the AWS CLI client, you have to create a credentials file at ~/.aws/credentials with the following content in order to authenticate to the AWS services (Moto doesn’t validate the values, but Boto3 and the AWS CLI require them to be present):

[default]
AWS_ACCESS_KEY_ID = foo
AWS_SECRET_ACCESS_KEY = bar

You can then launch the SQS mock server from your terminal with the following command:

moto_server sqs -p 4576 -H localhost

If everything is OK, you can create a queue in another terminal using the following command:

aws --endpoint-url=http://localhost:4576 sqs create-queue --queue-name sse_queue --region us-east-1
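To confirm the queue was created, you can list the queues registered with the mock endpoint:

aws --endpoint-url=http://localhost:4576 sqs list-queues --region us-east-1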

The create-queue command returns the URL of the queue that we’ll use in our SSE Consumer component. Now it’s time to launch the data lake and create a folder (or ‘bucket’ in AWS jargon) to store our results. Use the following snippet to launch a mock S3 service in a terminal:

moto_server s3 -p 4572 -H localhost

To create a bucket called sse-bucket in the US East region, use the following command:

aws --endpoint-url=http://localhost:4572 s3 mb s3://sse-bucket --region us-east-1
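You can verify that the bucket exists by listing all of the buckets on the mock S3 endpoint:

aws --endpoint-url=http://localhost:4572 s3 ls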

Consuming Events with Python

Our SSE Consumer will ingest the entire RecentChange web service message, but we’re only interested in the JSON payload. To extract just the JSON, we’ll use the SSEClient Python library to iterate over the message stream, pull out the JSON payload, and then place it into the recently created Message Queue using the AWS Boto3 Python library:

import boto3
import json
from sseclient import SSEClient as EventSource

# SQS client library
sqs = boto3.client('sqs',
                   endpoint_url="http://localhost:4576",  # only for test purposes
                   use_ssl=False,                         # only for test purposes
                   region_name='us-east-1')
queue_url = 'http://localhost:4576/queue/sse_queue'

def catch_events():
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    for event in EventSource(url):
        if event.event == 'message':
            try:
                message = json.loads(event.data)
            except ValueError:
                pass
            else:
                enqueue_message(json.dumps(message))

def enqueue_message(message):
    response = sqs.send_message(
        QueueUrl=queue_url,
        DelaySeconds=1,
        MessageBody=message
    )
    print('\rMessage %s enqueued' % response['MessageId'], end='', flush=True)

if __name__ == "__main__":
    catch_events()

This component will run indefinitely, consuming the SSE events and printing the id of each message queued. Now it’s time to process those messages.

Processing Data Streams with Python

In order to explore the data from the stream, we’ll consume it in batches of 100 messages. To make sure that the payload of each message is what we expect, we’re going to process the messages before adding them to the Pandas DataFrame. Let’s start reading the messages from the queue:

import boto3
import json
import time
import pandas as pd

def read_batch():
    while True:
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10  # max batch size per call
            )
            process_batch(response['Messages'])
        except KeyError:
            print('No messages available, retrying in 5 seconds...')
            time.sleep(5)

This short function receives up to 10 messages at a time and tries to process them. If there are no messages in the queue (the response contains no 'Messages' key), it waits five seconds before trying again.

Next, the process_batch function will clean each message’s body and enrich it with its ReceiptHandle, an attribute from the Message Queue that uniquely identifies the message:

def process_batch(messages):
    global list_msgs
    for message in messages:
        d = json.loads(message['Body'])
        # This just cleans the message's body of non-desired data
        clean_dict = {key: (d[key] if key in d else None) for key in map_keys}
        # We enrich the record with the message's receipt handle in order to remove it from the queue later
        clean_dict['ReceiptHandle'] = message['ReceiptHandle']
        list_msgs.append(clean_dict)

    if len(list_msgs) >= 100:
        print('Batch ready to be exported to the Data Lake')
        to_data_lake(list_msgs)
        list_msgs = []

This function is deliberately simple. It creates a clean dictionary containing only the keys that we’re interested in (defined in map_keys), and sets the value to None if the original message body does not contain one of those keys. Also, after processing each message, our function appends the clean dictionary to a global list.
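As a quick illustration of the cleaning step, here’s what the dictionary comprehension does with some made-up keys and values (not the real RecentChange schema):

map_keys = ['id', 'type', 'title']
d = {'id': 12345, 'type': 'edit', 'comment': 'not wanted'}
clean_dict = {key: (d[key] if key in d else None) for key in map_keys}
# clean_dict is now {'id': 12345, 'type': 'edit', 'title': None}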

Finally, if the list contains the desired batch size (i.e., 100 messages), our processing function will persist the list into the data lake, and then restart the batch:

def to_data_lake(batch):
    batch_df = pd.DataFrame(batch)
    csv = batch_df.to_csv(index=False)
    filename = 'batch-%s.csv' % batch[0]['id']
    # Write the CSV into the S3 bucket
    s3.Bucket('sse-bucket').put_object(Key=filename, Body=csv, ACL='public-read')
    print('\r%s saved into the Data Lake' % filename, end='', flush=True)
    remove_messages(batch_df)

The to_data_lake function transforms the list into a Pandas DataFrame in order to create a simple CSV file that is put into the S3 service, using the id of the first message in the batch to build a unique filename.

We then proceed to clean all the messages from the queue using the remove_messages function:

def remove_messages(df):
    for receipt_handle in df['ReceiptHandle'].values:
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
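The Stream Processor snippets above rely on a few module-level objects that are set up in the accompanying notebook: the sqs client and queue_url (configured just like the SSE Consumer’s), an s3 resource pointed at the mock S3 server, the map_keys list of payload fields to keep, and the list_msgs batch buffer. A minimal sketch of that setup, placed before the functions above, might look like this (the exact map_keys values are an assumption for illustration; check the repository for the real list):

# Clients pointed at the local Moto mock servers (test purposes only)
sqs = boto3.client('sqs',
                   endpoint_url="http://localhost:4576",
                   use_ssl=False,
                   region_name='us-east-1')
queue_url = 'http://localhost:4576/queue/sse_queue'

s3 = boto3.resource('s3',
                    endpoint_url="http://localhost:4572",
                    use_ssl=False,
                    region_name='us-east-1')

# Payload fields to keep -- assumed here for illustration
map_keys = ['id', 'type', 'namespace', 'title', 'comment', 'timestamp', 'user', 'bot', 'wiki']

# Buffer that accumulates cleaned messages until the batch size is reached
list_msgs = []

With that in place, running read_batch() starts the processing loop.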

If we want to check whether there are files in our bucket, we can use the AWS CLI to list all the objects in the bucket:

aws --endpoint-url=http://localhost:4572 s3 ls sse-bucket

Putting It All Together

The complete source code of this example is available in my Github repository. There, you’ll find:

  • A set of JupyterLab notebooks.
  • A run.sh file; once you execute it, you can point your browser at http://localhost:8888 and follow along in the notebooks.

I’ve left some exercises to the reader to fill in, such as improving the sample SSE Consumer and Stream Processor by adding exception handling and more interesting data processing capabilities. Python has a number of different connectors you can implement to access a wide range of Event Sources (check out Faust, Smartalert or Streamz for more information).

When it comes to scaling, a good recommendation is to deploy both services as auto-scalable instances using AWS Fargate or a similar service from your cloud provider. Finally, our entire example could be improved using standard data engineering tools such as Kedro or Dagster.

Next Steps – Create Scalable Data Pipelines with Python

  • Check out the source code on Github.
  • Download and install the Data Pipeline build, which contains a version of Python and all the tools listed in this post so you can test them out for yourself:
    • Install the State Tool on Windows using Powershell:
      IEX(New-Object Net.WebClient).downloadString('https://platform.activestate.com/dl/cli/install.ps1')
    • Or install the State Tool on Linux / macOS:
      sh <(curl -q https://platform.activestate.com/dl/cli/install.sh)
    • Run the following command to download the build and automatically install it into a virtual environment for your OS:
      state activate Pizza-Team/Data-Pipeline
      state activate Pizza-Team/Data-Pipeline-Mac
      state activate Pizza-Team/Data-Pipeline-Win

