Building a Data Pipeline on Google Cloud
This guide provides a practical, step-by-step example of how to build a serverless data pipeline on Google Cloud Platform (GCP). We will create a pipeline that automates the process of ingesting raw data, transforming it, and loading it into a data warehouse for analysis.
The Scenario
Our goal is to process raw, CSV-formatted sales data that is uploaded to a Google Cloud Storage (GCS) bucket. The pipeline will calculate the total revenue generated by each product and load these aggregated results into a BigQuery table, making them available for business intelligence and reporting.
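Here is the target computation in plain Python, run on a few made-up rows. It is a sketch only. The column layout (order_id, product_id, price) is an assumption: the Step 1 pipeline reads the product ID from the second column and the price from the third, and the sample values are illustrative.

```python
import csv
import io
from collections import defaultdict

# Hypothetical sample file: a header row plus one sale per line.
# The column layout (order_id, product_id, price) is assumed for illustration.
SAMPLE_CSV = """order_id,product_id,price
1001,P-100,19.99
1002,P-200,5.00
1003,P-100,20.01
"""


def revenue_per_product(csv_text: str) -> dict:
    """Sum the price column per product, skipping the header row."""
    totals = defaultdict(float)
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header, like skip_header_lines=1 in the Beam pipeline
    for fields in reader:
        totals[fields[1]] += float(fields[2])
    return dict(totals)


if __name__ == "__main__":
    print(revenue_per_product(SAMPLE_CSV))
```

Each product ends up as a single row holding its summed revenue. The Dataflow job produces the same result at scale with CombinePerKey.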
The GCP Toolkit
The pipeline combines four managed services:
- Google Cloud Storage (GCS): A scalable and durable object store. We'll use it to store our raw input CSV files.
- Cloud Dataflow: A fully managed, serverless service for running Apache Beam pipelines. This will be our transformation engine (the "T" in our ETL process).
- BigQuery: A serverless, highly scalable data warehouse. This will be our destination for the clean, aggregated data (the "L" in our ETL process).
- Cloud Composer: A managed Apache Airflow service. We will use it to schedule and orchestrate our pipeline.
Step 1: The Dataflow Pipeline (The Transformation Logic)
First, we write the core logic of our pipeline using the Apache Beam SDK in Python. This script reads from GCS, performs the transformation, and writes to BigQuery. Save this file as product_revenue_pipeline.py.
```python
import csv
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class ProductRevenuePipelineOptions(PipelineOptions):
    """Custom options for our pipeline."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--input",
            required=True,
            help="GCS path for the input CSV files (e.g., gs://bucket/input/*.csv)",
        )
        parser.add_argument(
            "--output",
            required=True,
            help="BigQuery table to write results to (e.g., project:dataset.table)",
        )


def run(argv=None):
    """Defines and runs the pipeline."""
    # Parses --input/--output plus standard flags such as --runner and --project.
    options = PipelineOptions(argv)
    # Ship module-level imports (such as csv) to the Dataflow workers.
    options.view_as(SetupOptions).save_main_session = True
    pipeline_options = options.view_as(ProductRevenuePipelineOptions)

    with beam.Pipeline(options=options) as p:
        (
            p
            # skip_header_lines=1 drops the header row of each input file.
            | "ReadSalesData"
            >> beam.io.ReadFromText(pipeline_options.input, skip_header_lines=1)
            # csv.reader handles quoted fields that contain commas.
            | "ParseCSV" >> beam.Map(lambda line: next(csv.reader([line])))
            # Assumes column 1 is product_id and column 2 is the sale price.
            | "ExtractProductAndPrice"
            >> beam.Map(lambda fields: (fields[1], float(fields[2])))
            | "SumPerProduct" >> beam.CombinePerKey(sum)
            | "FormatForBigQuery"
            >> beam.Map(lambda kv: {"product_id": kv[0], "total_revenue": kv[1]})
            | "WriteToBigQuery"
            >> beam.io.WriteToBigQuery(
                pipeline_options.output,
                schema="product_id:STRING, total_revenue:FLOAT",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Overwrite the table on each run; totals are recomputed from all inputs.
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
```
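With the inline lambdas, one malformed row (a missing column or a non-numeric price) raises an exception and fails the whole job. If you would rather drop such rows than fail, you can move the parsing into a named function that yields zero or one (product_id, price) pair and apply it with beam.FlatMap. The sketch below shows this. The name parse_sale is hypothetical, and it keeps the same column positions the pipeline assumes. Because it is plain Python, you can unit-test it without running Beam.

```python
import csv
import logging
from typing import Iterator, Tuple


def parse_sale(line: str) -> Iterator[Tuple[str, float]]:
    """Yield (product_id, price) for a valid CSV line; yield nothing otherwise.

    Assumes product_id is the second column and price the third, matching
    the column indices used in the pipeline.
    """
    try:
        fields = next(csv.reader([line]))
        product_id, price = fields[1], float(fields[2])
    except (IndexError, ValueError, csv.Error) as err:
        # Drop the bad row but leave a trace of it in the worker logs.
        logging.warning("Skipping malformed row %r: %s", line, err)
        return
    yield product_id, price


if __name__ == "__main__":
    for raw in ["1001,P-100,19.99", "1002,P-200", '1004,"P,400",7.5']:
        print(raw, "->", list(parse_sale(raw)))
```

In the pipeline, the "ParseCSV" and "ExtractProductAndPrice" steps would then collapse into a single step such as | "ParseSales" >> beam.FlatMap(parse_sale). For production, a common alternative is to route bad rows to a separate output for inspection rather than only logging them.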
Step 2: The BigQuery Table (The Destination)
The pipeline creates this table automatically if it does not exist (CREATE_IF_NEEDED). It does not create the dataset, though, so create the sales_data dataset in BigQuery beforehand. The destination table has the following schema:
- product_id: STRING
- total_revenue: FLOAT
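WriteToBigQuery also accepts the schema as a dictionary in the {"fields": [...]} shape of a BigQuery JSON schema. The string "product_id:STRING, total_revenue:FLOAT" is convenient for two columns. The dictionary form is easier to extend later with column modes or descriptions. The sketch below shows the two columns in that form, both NULLABLE, which is what the string form implies. The constant name is hypothetical.

```python
import json

# Same two columns as "product_id:STRING, total_revenue:FLOAT", in the
# {"fields": [...]} dictionary form that beam.io.WriteToBigQuery accepts.
PRODUCT_REVENUE_SCHEMA = {
    "fields": [
        {"name": "product_id", "type": "STRING", "mode": "NULLABLE"},
        {"name": "total_revenue", "type": "FLOAT", "mode": "NULLABLE"},
    ]
}

if __name__ == "__main__":
    # The "fields" list is also the layout of a JSON schema file for the bq CLI.
    print(json.dumps(PRODUCT_REVENUE_SCHEMA["fields"], indent=2))
```

To use it, pass schema=PRODUCT_REVENUE_SCHEMA instead of the string in the WriteToBigQuery step.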
Step 3: The Composer DAG (The Orchestrator)
Next, we create an Airflow DAG to schedule and run our Dataflow job. This Python script defines the workflow. Save this file as product_revenue_dag.py.
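```python
import pendulum
from airflow import DAG

# Requires the apache-airflow-providers-apache-beam package.
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# --- CONFIGURATION ---
GCP_PROJECT_ID = "your-gcp-project-id"
GCS_BUCKET = "your-gcs-bucket-name"  # holds the input data and Dataflow temp files
COMPOSER_BUCKET = "your-composer-bucket-name"  # the Composer environment's bucket
REGION = "us-central1"
GCS_INPUT_FILES = f"gs://{GCS_BUCKET}/input/sales_*.csv"
BIGQUERY_OUTPUT_TABLE = f"{GCP_PROJECT_ID}:sales_data.product_revenue"
# The Beam script must be in GCS; here it sits next to the DAG in the Composer bucket.
DATAFLOW_PYTHON_FILE = f"gs://{COMPOSER_BUCKET}/dags/product_revenue_pipeline.py"

default_args = {
    "owner": "airflow",
    "retries": 1,
}

with DAG(
    dag_id="product_revenue_pipeline_dag",
    default_args=default_args,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),  # any fixed past date
    schedule="@daily",  # Run once a day (use schedule_interval on Airflow < 2.4)
    catchup=False,
) as dag:
    run_product_revenue_pipeline = BeamRunPythonPipelineOperator(
        task_id="run_product_revenue_pipeline",
        runner="DataflowRunner",
        py_file=DATAFLOW_PYTHON_FILE,
        # Passed to the script as --project, --region and --temp_location.
        default_pipeline_options={
            "project": GCP_PROJECT_ID,
            "region": REGION,
            "temp_location": f"gs://{GCS_BUCKET}/temp",
        },
        # Passed to the script as --input and --output.
        pipeline_options={
            "input": GCS_INPUT_FILES,
            "output": BIGQUERY_OUTPUT_TABLE,
        },
        dataflow_config=DataflowConfiguration(
            job_name="product-revenue-pipeline",  # a unique suffix is appended by default
            project_id=GCP_PROJECT_ID,
            location=REGION,
        ),
    )
```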
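The input and output values the DAG hands to the Dataflow job reach the Beam script as command-line flags. Their names must therefore match the arguments declared in ProductRevenuePipelineOptions. The sketch below approximates that hand-off with the standard argparse module. options_to_flags is a hypothetical helper that imitates a --key=value format and is not the operator's actual code. The bucket and table values are the placeholders from the DAG.

```python
import argparse


def options_to_flags(options: dict) -> list:
    """Render {"input": "gs://..."} as ["--input=gs://..."] (an approximation)."""
    return [f"--{key}={value}" for key, value in options.items()]


def parse_pipeline_args(argv: list) -> argparse.Namespace:
    """Declare the same two arguments as ProductRevenuePipelineOptions."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    # Flags this parser does not declare (such as --region) are simply ignored here.
    known, _unknown = parser.parse_known_args(argv)
    return known


if __name__ == "__main__":
    flags = options_to_flags({
        "input": "gs://your-gcs-bucket-name/input/sales_*.csv",
        "output": "your-gcp-project-id:sales_data.product_revenue",
        "region": "us-central1",
    })
    print(parse_pipeline_args(flags))
```

If a key in the DAG is misspelled (for example "inputs"), the required --input flag is missing and argument parsing fails before any Dataflow job starts.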
Step 4: Running the Pipeline
- Upload files:
  - Upload your raw CSV data (e.g., sales_20251010.csv) to the input/ folder in your GCS bucket.
  - Upload the Beam script (product_revenue_pipeline.py) and the DAG script (product_revenue_dag.py) to the dags/ folder in your Cloud Composer environment's GCS bucket.
- Trigger the DAG:
  - Airflow detects the new DAG automatically, usually within a few minutes of the upload.
  - In the Airflow web UI for your Cloud Composer environment, find product_revenue_pipeline_dag. Either wait for its daily schedule or trigger it manually.
- Monitor and verify:
  - Follow the job's progress on the Dataflow page of the Google Cloud console.
  - Once the job succeeds, query the product_revenue table in BigQuery to see the aggregated results.
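You can also check the results from Python with the google-cloud-bigquery client library. One detail to watch: the table string the pipeline uses (project:dataset.table) has a legacy colon separator, but GoogleSQL expects project.dataset.table inside backticks. The sketch below assumes the client library is installed and application default credentials are configured. to_sql_table, top_products_query and run_query are hypothetical helpers.

```python
def to_sql_table(beam_table: str) -> str:
    """Convert "project:dataset.table" to a backticked GoogleSQL identifier."""
    project, dataset_table = beam_table.split(":", 1)
    return f"`{project}.{dataset_table}`"


def top_products_query(beam_table: str, limit: int = 10) -> str:
    """Build a query that lists the highest-revenue products."""
    return (
        "SELECT product_id, total_revenue "
        f"FROM {to_sql_table(beam_table)} "
        "ORDER BY total_revenue DESC "
        f"LIMIT {int(limit)}"
    )


def run_query(sql: str) -> list:
    """Execute the query; needs google-cloud-bigquery and GCP credentials."""
    from google.cloud import bigquery  # imported lazily so the helpers above work offline

    client = bigquery.Client()
    return [dict(row.items()) for row in client.query(sql).result()]


if __name__ == "__main__":
    print(top_products_query("your-gcp-project-id:sales_data.product_revenue"))
```

Building the SQL separately from executing it lets you test the table-name conversion without touching BigQuery.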
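This example shows a common batch pattern on Google Cloud: raw files land in Cloud Storage, Dataflow transforms them, BigQuery stores the results, and Cloud Composer schedules each run. Every component is a managed service, so there are no servers to provision or scale.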