MOBI BOOT CAMP CORP. Learning Buddy

Building a Data Pipeline on Google Cloud

This guide provides a practical, step-by-step example of how to build a serverless data pipeline on Google Cloud Platform (GCP). We will create a pipeline that automates the process of ingesting raw data, transforming it, and loading it into a data warehouse for analysis.

The Scenario

Our goal is to process raw, CSV-formatted sales data that is uploaded to a Google Cloud Storage (GCS) bucket. The pipeline will calculate the total revenue generated by each product and load these aggregated results into a BigQuery table, making them available for business intelligence and reporting.
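Before involving any cloud service, it helps to pin down exactly what the pipeline computes. The sketch below performs the same per-product sum in plain Python on a small in-memory CSV. It assumes the column layout that the Beam code in Step 1 reads (product_id in the second column, the sale amount in the third); the function name, column headers and sample rows are hypothetical.

```python
import csv
import io
from collections import defaultdict


def total_revenue_by_product(csv_text: str) -> list:
    """Sum the sale amount per product_id, mirroring the Beam pipeline.

    Assumes a header row, product_id in column 1 and the sale amount in
    column 2 (0-based), the same positions the Beam code reads.
    """
    totals = defaultdict(float)
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row, like skip_header_lines=1
    for fields in reader:
        # Equivalent of ExtractProductAndPrice followed by SumPerProduct.
        totals[fields[1]] += float(fields[2])
    # Shape each result like a BigQuery row: product_id, total_revenue.
    return [
        {"product_id": product_id, "total_revenue": revenue}
        for product_id, revenue in sorted(totals.items())
    ]


# Hypothetical sample file with columns order_id, product_id, price.
SAMPLE_CSV = """order_id,product_id,price
1001,P-1,12.50
1002,P-2,5.00
1003,P-1,7.25
"""

print(total_revenue_by_product(SAMPLE_CSV))
# [{'product_id': 'P-1', 'total_revenue': 19.75}, {'product_id': 'P-2', 'total_revenue': 5.0}]
```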

The GCP Toolkit

We will build the pipeline from four managed services:

  • Google Cloud Storage (GCS): A scalable and durable object store. We'll use it to store our raw input CSV files.
  • Cloud Dataflow: A fully managed, serverless service for running Apache Beam pipelines. This will be our transformation engine (the "T" in our ETL process).
  • BigQuery: A serverless, highly scalable data warehouse. This will be our destination for the clean, aggregated data (the "L" in our ETL process).
  • Cloud Composer: A managed Apache Airflow service. We will use it to schedule and orchestrate our pipeline.

Step 1: The Dataflow Pipeline (The Transformation Logic)

First, we write the core logic of our pipeline using the Apache Beam SDK in Python. This script reads from GCS, performs the transformation, and writes to BigQuery. Save this file as product_revenue_pipeline.py.

import csv
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class ProductRevenuePipelineOptions(PipelineOptions):
    """Custom options for our pipeline."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--input",
            required=True,
            help="GCS path for the input CSV files (e.g., gs://bucket/input/*.csv)",
        )
        parser.add_argument(
            "--output",
            required=True,
            help="BigQuery table to write results to (e.g., project:dataset.table)",
        )


def run():
    """Defines and runs the pipeline."""
    # Reads --runner, --project, --region, --temp_location, --input, --output, etc.
    # from the command line; batch mode is the default.
    options = PipelineOptions()
    # Ship the main module's globals (such as the csv import) to Dataflow workers,
    # which need them to run the lambdas below.
    options.view_as(SetupOptions).save_main_session = True
    pipeline_options = options.view_as(ProductRevenuePipelineOptions)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadSalesData"
            >> beam.io.ReadFromText(pipeline_options.input, skip_header_lines=1)
            | "ParseCSV" >> beam.Map(lambda line: next(csv.reader([line])))
            # Assumes column 1 holds product_id and column 2 the sale amount (0-based).
            | "ExtractProductAndPrice"
            >> beam.Map(lambda fields: (fields[1], float(fields[2])))
            | "SumPerProduct" >> beam.CombinePerKey(sum)
            | "FormatForBigQuery"
            >> beam.Map(lambda kv: {"product_id": kv[0], "total_revenue": kv[1]})
            | "WriteToBigQuery"
            >> beam.io.WriteToBigQuery(
                pipeline_options.output,
                schema="product_id:STRING, total_revenue:FLOAT",
                # Creates the table if missing; the dataset must already exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Overwrites the table's contents on every run.
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
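The ParseCSV step wraps each line in `csv.reader` rather than calling `line.split(",")`. The difference appears as soon as a field contains a quoted comma. A small stdlib comparison, using a hypothetical row whose product_id is quoted:

```python
import csv


def parse_with_split(line: str) -> list:
    # Naive parsing: breaks on every comma, including ones inside quotes.
    return line.split(",")


def parse_with_csv(line: str) -> list:
    # Same expression as the pipeline's ParseCSV step: csv.reader honours quoting.
    return next(csv.reader([line]))


# Hypothetical row whose product_id contains a quoted comma.
row = '1001,"P-1, large",12.50'

print(parse_with_split(row))  # ['1001', '"P-1', ' large"', '12.50']
print(parse_with_csv(row))    # ['1001', 'P-1, large', '12.50']

fields = parse_with_csv(row)
print((fields[1], float(fields[2])))  # ('P-1, large', 12.5)
```

With the naive split, `fields[2]` would be `' large"'` and `float()` would raise. One limitation remains: `ReadFromText` splits its input on newlines, so a quoted field that itself contains a line break would still arrive as two separate elements.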

Step 2: The BigQuery Table (The Destination)

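The pipeline creates this table automatically if it doesn't exist (CREATE_IF_NEEDED), but it does not create the dataset, so the sales_data dataset must already exist in BigQuery. Because the write disposition is WRITE_TRUNCATE, each run re-reads every matching input file and replaces the table's contents with fresh totals. The destination table has the following schema:

  • product_id: STRING
  • total_revenue: FLOAT (FLOAT64 in GoogleSQL)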
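The `schema` argument in Step 1 uses Beam's compact `name:TYPE` string. BigQuery's table resource describes the same schema as a list of fields with `name`, `type` and `mode`. The helpers below are hypothetical (not part of Beam or the BigQuery client): they convert one form into the other and check that an output row carries exactly the schema's columns. Every field is treated as NULLABLE, which is assumed here to match how Beam interprets string schemas.

```python
def schema_string_to_fields(schema: str) -> list:
    """Convert 'name:TYPE, name:TYPE' into BigQuery-style field dicts.

    Hypothetical helper for illustration; every field is assumed NULLABLE.
    """
    fields = []
    for part in schema.split(","):
        name, field_type = part.strip().split(":")
        fields.append({"name": name, "type": field_type.upper(), "mode": "NULLABLE"})
    return fields


def row_matches_schema(row: dict, fields: list) -> bool:
    """Check that a row has exactly the schema's columns with plausible Python types."""
    # Minimal mapping for this table; unknown types accept any value.
    python_types = {"STRING": str, "FLOAT": float, "FLOAT64": float}
    if set(row) != {f["name"] for f in fields}:
        return False
    return all(
        row[f["name"]] is None
        or isinstance(row[f["name"]], python_types.get(f["type"], object))
        for f in fields
    )


SCHEMA = "product_id:STRING, total_revenue:FLOAT"
table_fields = schema_string_to_fields(SCHEMA)
print(table_fields)
print(row_matches_schema({"product_id": "P-1", "total_revenue": 19.75}, table_fields))  # True
print(row_matches_schema({"product_id": "P-1", "revenue": 19.75}, table_fields))        # False
```

Wrapped as `{"fields": table_fields}`, the list has the dictionary shape that `WriteToBigQuery` should also accept for its `schema` argument, which can be easier to maintain than a long string once the table grows.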

Step 3: The Composer DAG (The Orchestrator)

Next, we create an Airflow DAG to schedule and run our Dataflow job. This Python script defines the workflow. Save this file as product_revenue_dag.py.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreatePythonJobOperator,
)

# --- CONFIGURATION ---
GCP_PROJECT_ID = "your-gcp-project-id"
GCS_BUCKET = "your-gcs-bucket-name"  # holds the input data and Dataflow temp files
COMPOSER_BUCKET = "your-composer-bucket-name"  # created with the Composer environment
GCS_INPUT_FILES = f"gs://{GCS_BUCKET}/input/sales_*.csv"
BIGQUERY_OUTPUT_TABLE = f"{GCP_PROJECT_ID}:sales_data.product_revenue"
# The Beam script must be in GCS; here it sits next to this DAG in the Composer bucket.
DATAFLOW_PYTHON_FILE = f"gs://{COMPOSER_BUCKET}/dags/product_revenue_pipeline.py"

default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 1, 1),  # any fixed past date; catchup=False skips backfill
    "retries": 1,
    # Applied to the Dataflow operator below as default pipeline options.
    "dataflow_default_options": {
        "project": GCP_PROJECT_ID,
        "region": "us-central1",
        "temp_location": f"gs://{GCS_BUCKET}/temp",
    },
}

with DAG(
    dag_id="product_revenue_pipeline_dag",
    default_args=default_args,
    schedule="@daily",  # Run the pipeline once a day (`schedule` requires Airflow 2.4+)
    catchup=False,
) as dag:
    # Deprecated in newer Google provider releases in favor of
    # BeamRunPythonPipelineOperator from the apache-beam provider.
    run_product_revenue_pipeline = DataflowCreatePythonJobOperator(
        task_id="run_product_revenue_pipeline",
        py_file=DATAFLOW_PYTHON_FILE,
        job_name="product-revenue-pipeline-{{ ds_nodash }}",
        options={
            "input": GCS_INPUT_FILES,
            "output": BIGQUERY_OUTPUT_TABLE,
        },
    )
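The `job_name` in the DAG is a Jinja template: at run time Airflow replaces `{{ ds_nodash }}` with the run's logical date in `YYYYMMDD` form. Dataflow job names are restricted to lowercase letters, digits and hyphens and must start with a letter. The sketch below imitates the rendering with a plain `str.replace` (not Jinja) and checks the result against the pattern `[a-z]([-a-z0-9]*[a-z0-9])?`, which is assumed here to capture Dataflow's naming rule; the helper names are hypothetical.

```python
import re
from datetime import date

# Assumed Dataflow naming rule: lowercase letter first, then lowercase
# letters, digits or hyphens, not ending in a hyphen.
JOB_NAME_PATTERN = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")

JOB_NAME_TEMPLATE = "product-revenue-pipeline-{{ ds_nodash }}"


def render_job_name(template: str, logical_date: date) -> str:
    """Stand-in for Airflow's Jinja rendering of the {{ ds_nodash }} macro."""
    return template.replace("{{ ds_nodash }}", logical_date.strftime("%Y%m%d"))


def is_valid_job_name(name: str) -> bool:
    return JOB_NAME_PATTERN.fullmatch(name) is not None


name = render_job_name(JOB_NAME_TEMPLATE, date(2025, 10, 10))
print(name, is_valid_job_name(name))  # product-revenue-pipeline-20251010 True

# Underscores and uppercase letters fail the check.
print(is_valid_job_name("product_revenue_pipeline"))  # False
print(is_valid_job_name("Product-Revenue"))           # False
```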

Step 4: Running the Pipeline

  1. Upload Files:
    • Upload your raw CSV data (e.g., sales_20251010.csv) to the input/ folder in your GCS bucket.
    • Upload the Beam script (product_revenue_pipeline.py) and the DAG script (product_revenue_dag.py) to the dags/ folder in your Cloud Composer environment's GCS bucket.
  2. Trigger the DAG:
    • Airflow detects the new DAG automatically; it can take a few minutes to appear after the upload.
    • Open the Airflow UI for your Cloud Composer environment, find product_revenue_pipeline_dag, and either wait for its daily schedule or trigger it manually.
  3. Monitor and Verify:
    • Monitor the Dataflow job's progress on the Dataflow Jobs page of the Google Cloud console.
    • Once the job succeeds, query the product_revenue table in BigQuery to see the aggregated results.
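A file whose path does not match the DAG's input pattern is simply not read. Object paths can be checked locally before uploading; the sketch below uses Python's `fnmatchcase` as a stand-in for the wildcard matching Beam performs against GCS (the two are not guaranteed to agree in every case, for example in how `*` treats `/`), and the object paths are hypothetical.

```python
from fnmatch import fnmatchcase

# The DAG's input pattern, with the placeholder bucket name left in.
INPUT_PATTERN = "gs://your-gcs-bucket-name/input/sales_*.csv"


def matches_input_pattern(path: str) -> bool:
    """Approximate check: fnmatch-style wildcards, case-sensitive like GCS names."""
    return fnmatchcase(path, INPUT_PATTERN)


# Hypothetical object paths someone might upload.
candidates = [
    "gs://your-gcs-bucket-name/input/sales_20251010.csv",     # matches
    "gs://your-gcs-bucket-name/input/Sales_20251011.csv",     # wrong case
    "gs://your-gcs-bucket-name/input/sales_20251012.csv.gz",  # extra suffix
    "gs://your-gcs-bucket-name/sales_20251013.csv",           # not in input/
]

for path in candidates:
    print(path, matches_input_pattern(path))
```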

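This example shows a common batch pattern on Google Cloud: raw files land in GCS, Dataflow transforms them, BigQuery stores the results, and Cloud Composer runs the job on a schedule, all on managed services with no clusters to provision by hand.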
