MOBI BOOT CAMP CORP. logoLearning Buddy
  • SIGN IN
  • Foundations
  • The Hadoop Ecosystem: Batch at Scale
  • The Spark Ecosystem: In-Memory Processing
  • Data Pipelines and Transport
    • Apache Kafka
    • Data Pipeline Concepts
    • Apache Beam: A Unified Model
    • Building a GCP Data Pipeline
    • Slides
  • Search & Information Retrieval
  • The Modern Data Stack
  • Glossary

Apache Beam: A Unified Model for Data Processing

In the big data ecosystem, developers historically had to write and maintain separate codebases for batch processing (e.g., using Spark or MapReduce) and stream processing (e.g., using Spark Streaming or Flink). Apache Beam was created to solve this problem.

Beam provides a unified, portable programming model for defining both batch and streaming data processing pipelines. The name itself stands for Batch + Stream.

The core principle of Beam is to separate the logical definition of a data pipeline from its physical execution. You write your pipeline logic once using the Beam SDK (available in Python, Java, and Go), and then you can execute it on any supported distributed processing engine.

The Beam Model: Pipelines, PCollections, and PTransforms

The Beam programming model consists of three key abstractions:

  1. Pipeline: This is the entire data processing workflow, from start to finish. It represents a Directed Acyclic Graph (DAG) of all the steps involved in your job.
  2. PCollection: A PCollection (Parallel Collection) is the distributed dataset that your Beam pipeline operates on. It can be a bounded dataset (from a batch source like a file) or an unbounded dataset (from a streaming source like Kafka). This single abstraction for both batch and streaming data is a core part of the unified model.
  3. PTransform: A PTransform (Parallel Transform) is a data processing step in your pipeline. It takes one or more PCollections as input, performs a processing function, and produces one or more PCollections as output. Beam provides a library of core transforms like Map, Filter, GroupByKey, and Combine.

The Magic of Runners: Write Once, Run Anywhere

The component that makes Beam portable is the Runner. A Runner adapts a Beam pipeline to run on a specific distributed processing engine. When you run your pipeline, you choose a Runner for your target backend.

The Apache Beam Model: Write Once, Run on Any Engine

This means you can:

  • Write a pipeline and test it locally using the DirectRunner.
  • Run the exact same code at scale on an Apache Spark cluster using the SparkRunner.
  • Run it on an Apache Flink cluster using the FlinkRunner.
  • Run it as a fully managed service on Google Cloud Dataflow.

This portability prevents vendor lock-in and allows you to choose the best execution engine for your specific needs without rewriting your core business logic.

Example: Calculating Customer Spending

Let's use a more practical example than word count. Imagine we have a set of purchase records and we want to calculate the total amount spent by each customer.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sample data: a list of purchase events (dictionaries)
purchase_data = [
    {"customer_id": "c1", "product": "Laptop", "price": 1200},
    {"customer_id": "c2", "product": "Mouse", "price": 25},
    {"customer_id": "c1", "product": "Keyboard", "price": 75},
    {"customer_id": "c3", "product": "Monitor", "price": 300},
    {"customer_id": "c2", "product": "Webcam", "price": 50},
    {"customer_id": "c1", "product": "Webcam", "price": 50},
]

# 1. Create a Pipeline object
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    # 2. Create a PCollection from our in-memory list
    purchases = pipeline | "CreatePurchases" >> beam.Create(purchase_data)

    # 3. Apply a series of PTransforms
    customer_spending = (
        purchases
        # Map each purchase dictionary to a (customer_id, price) tuple
        | "ExtractCustomerAndPrice"
        >> beam.Map(lambda p: (p["customer_id"], p["price"]))
        # Group by the key (customer_id) and sum the values (prices)
        | "SumPerCustomer" >> beam.CombinePerKey(sum)
    )

    # A final transform to format the output for printing
    formatted_output = customer_spending | "FormatOutput" >> beam.Map(
        lambda kv: f"Customer: {kv[0]}, Total Spent: ${kv[1]}"
    )

    # Print the results to the console
    formatted_output | "PrintResults" >> beam.Map(print)

# The pipeline runs automatically when the 'with' block is exited.
# Expected Output:
# Customer: c1, Total Spent: 325
# Customer: c2, Total Spent: $75
# Customer: c3, Total Spent: $300

This example demonstrates a common ETL pattern: extracting relevant fields, grouping by a key, and performing an aggregation, all within a clean, portable Beam pipeline.

Machine Learning with RunInference

Apache Beam provides a built-in transform called RunInference specifically designed for running machine learning models within a pipeline. While Beam supports many frameworks out-of-the-box (like PyTorch, TensorFlow, and Scikit-learn), you can also "bring your own model" by implementing a custom ModelHandler.

How to Add Your Own Custom ML Model

To use a custom model with RunInference, you need to subclass the apache_beam.ml.inference.base.ModelHandler and implement two critical methods:

  1. load_model: This method is called once per worker process when the pipeline starts. It should return your loaded model object. This is where you would initialize your weights or load a serialized model file.
  2. run_inference: This method defines how to process a batch of data. It takes a list of inputs, passes them through your model, and returns a list of PredictionResult objects.

Example Implementation Pattern

from apache_beam.ml.inference.base import ModelHandler, PredictionResult, RunInference


class MyCustomModelHandler(ModelHandler):
    def load_model(self):
        # Load your model (e.g., from a file or API)
        model = my_custom_library.load("model_path")
        return model

    def run_inference(self, batch, model, inference_args=None):
        # Process a batch of inputs
        predictions = model.predict(batch)
        # Return results as PredictionResult objects
        return [PredictionResult(x, p) for x, p in zip(batch, predictions)]


# Using it in a pipeline
with beam.Pipeline() as p:
    predictions = (
        p
        | "CreateInputs" >> beam.Create([data1, data2])
        | "RunInference" >> RunInference(MyCustomModelHandler())
    )

By abstracting the model loading and execution, RunInference handles complex production concerns for you, such as batching requests for GPU efficiency and model sharing across threads.


For more details, see the official Google Cloud documentation: Bring your own ML model to Beam RunInference

  • Google Colab Notebook: Run Custom Inference
  • GitHub Source: run_custom_inference.ipynb
Privacy Policy | Terms & Conditions