Spark Streaming: Processing Data in Real-Time
Modern data doesn't always arrive in neat batches; it often flows continuously as a stream. Spark Streaming is Spark's module for processing these unbounded streams of data in a scalable, high-throughput, and fault-tolerant way.
Common sources of streaming data include:
- Log files being generated by web servers
- Messages from queuing systems like Apache Kafka or RabbitMQ
- Data from IoT sensors
- Social media feeds
The Modern Approach: Structured Streaming
While early versions of Spark used an RDD-based abstraction called DStreams, the modern and recommended approach is Structured Streaming.
Structured Streaming is a stream processing engine built on the Spark SQL engine and the DataFrame API. The core idea is to treat a live data stream as a table that is being continuously appended. This allows you to apply the same powerful DataFrame operations—selections, filters, aggregations, and joins—to streaming data just as you would to static, batch data.
This model is simpler and more expressive than the older DStream API. Spark manages fault tolerance and state through checkpointing, and it can provide end-to-end exactly-once guarantees when the source is replayable (such as Kafka or files) and the sink is idempotent.

Common Streaming Sources
Spark can ingest data from a variety of sources, which can be broadly categorized by how they handle data.
1. File Source (Processing Data-at-Rest as a Stream)
This source treats data-at-rest as a stream. Spark monitors a directory for new files, processing them as they arrive. This is simple to set up, but has an important limitation: files must be immutable. Spark's file stream source works by discovering new files added to the directory. It cannot detect updates to existing files. Therefore, the upstream process must always write new, uniquely named files, and each file should appear in the target directory atomically (for example, by writing it elsewhere and then moving it in) so that Spark never reads a half-written file.
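One way an upstream process might satisfy this is to stage each file in a separate directory and then move it into the monitored directory. The sketch below uses only the Python standard library; the helper name drop_readings and the directory names are hypothetical, and it assumes both directories are on the same local filesystem, where os.replace is an atomic rename on POSIX systems. Object stores and network filesystems may behave differently.

```python
import json
import os
import tempfile
import uuid


def drop_readings(staging_dir: str, target_dir: str, readings: list) -> str:
    """Write readings as JSON Lines to a new file, then move it into target_dir."""
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(target_dir, exist_ok=True)

    # A random suffix keeps every file name unique, so no file is ever rewritten.
    file_name = f"data-{uuid.uuid4().hex}.json"
    staged_path = os.path.join(staging_dir, file_name)

    # Write the complete file outside the monitored directory first.
    with open(staged_path, "w", encoding="utf-8") as handle:
        for reading in readings:
            handle.write(json.dumps(reading) + "\n")

    # Move the finished file into place in a single rename.
    final_path = os.path.join(target_dir, file_name)
    os.replace(staged_path, final_path)
    return final_path


# Demonstration in a throwaway directory; a real job would target its own paths.
base_dir = tempfile.mkdtemp()
path = drop_readings(
    os.path.join(base_dir, "staging"),
    os.path.join(base_dir, "iot_stream"),
    [{"deviceId": "device-B", "timestamp": "2025-10-10T12:01:00Z", "temperature": 102.1}],
)
print(path)
```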
2. Apache Kafka Source (Processing Data-in-Motion)
For most real-world scenarios, this is the ideal source. It processes true data-in-motion. Apache Kafka is a durable, high-throughput message bus that decouples your data producers from your Spark consumer.
The model is:
- Producers: Your applications (e.g., web servers, IoT devices) send messages (records) to a Kafka topic.
- Kafka Cluster: Acts as the intermediary, storing the stream of records reliably.
- Spark Consumer: Your Spark Streaming application subscribes to the topic and processes the records as they arrive.
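Because producers and consumers are decoupled, each side can fail, restart, and scale independently. That makes this a robust and scalable architecture for production streaming pipelines.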
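To make the producer side of this model concrete, here is a rough sketch of how an application might serialize a sensor reading and send it to a topic. The function names encode_reading and send_readings are hypothetical. The send step assumes the third-party kafka-python package is installed and that a broker is reachable at localhost:9092; neither is required to run the serialization part.

```python
import json


def encode_reading(device_id: str, timestamp: str, temperature: float) -> bytes:
    """Serialize one sensor reading as UTF-8 encoded JSON (hypothetical helper)."""
    record = {
        "deviceId": device_id,
        "timestamp": timestamp,
        "temperature": temperature,
    }
    return json.dumps(record).encode("utf-8")


def send_readings(readings, topic="iot_data", servers="localhost:9092"):
    """Send (device_id, timestamp, temperature) tuples to a Kafka topic.

    Assumes kafka-python is installed and a broker is running at `servers`.
    """
    from kafka import KafkaProducer  # third-party: pip install kafka-python

    producer = KafkaProducer(bootstrap_servers=servers)
    for device_id, timestamp, temperature in readings:
        producer.send(topic, value=encode_reading(device_id, timestamp, temperature))
    producer.flush()  # block until buffered records have been sent
    producer.close()


# The serialization step can be checked without a broker.
payload = encode_reading("device-B", "2025-10-10T12:01:00Z", 102.1)
print(payload)
```

Kafka stores message values as raw bytes, which is why the reading is encoded before sending and why the Spark consumer has to decode and parse it again on the other side.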
Example 1: Monitoring IoT Data from a File Source
A common use case for streaming is monitoring real-time data from a network of IoT devices. Imagine we have sensors sending temperature readings as JSON data to a storage location. Our goal is to create a live dashboard of alerts for any sensor reporting a temperature above a critical threshold.
In this scenario, we'll read a stream of JSON files from a directory. This is a common pattern where an upstream process continuously drops new data files into a shared location.
The Data (Sample JSON files):
You would create files like data-1.json, data-2.json in a directory (e.g., /tmp/iot_stream/).
File 1 (data-1.json):
{"deviceId": "device-A", "timestamp": "2025-10-10T12:01:00Z", "temperature": 65.5}
{"deviceId": "device-B", "timestamp": "2025-10-10T12:01:00Z", "temperature": 102.1}
File 2 (data-2.json):
{"deviceId": "device-A", "timestamp": "2025-10-10T12:02:00Z", "temperature": 68.0}
{"deviceId": "device-C", "timestamp": "2025-10-10T12:02:00Z", "temperature": 105.7}
The PySpark Code:
This code reads the JSON stream, filters for high temperatures, and prints the resulting alerts to the console.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
)

# 1. Create a SparkSession
spark = SparkSession.builder.appName("IoTStreamMonitoring").getOrCreate()
# Hide INFO logs for better readability of the output
spark.sparkContext.setLogLevel("WARN")

# 2. Define the schema for the incoming JSON data
schema = StructType(
    [
        StructField("deviceId", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("temperature", DoubleType(), True),
    ]
)

# 3. Create a streaming DataFrame from a directory of JSON files
# Spark will monitor this directory for any new files being added.
# Make sure this directory exists before starting the query.
streaming_df = spark.readStream.schema(schema).json("/tmp/iot_stream/")

# 4. Define the transformation: filter for high-temperature alerts
alerts_df = streaming_df.filter(col("temperature") > 100.0).select(
    "deviceId", "temperature", "timestamp"
)

# 5. Define the output sink and start the query
# This query will print the alerts to the console as they are detected.
query = (
    alerts_df.writeStream.outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# 6. Wait for the stream to terminate
query.awaitTermination()
How to Run It:
- Create the input directory: mkdir -p /tmp/iot_stream/
- Run the PySpark script. It will start and wait for data.
- In a separate terminal, copy or move the sample JSON files one by one into the /tmp/iot_stream/ directory.
- Observe the console output of your Spark application. You will see a table with the high-temperature alerts appearing as Spark processes each new file.
Example 2: Processing IoT Data from a Kafka Topic
This example achieves the same goal as the first one, but uses a more robust Kafka source.
The PySpark Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
)

# 1. Create a SparkSession
# Note: The Kafka source ships as a separate package. You would typically run this
# with: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 my_script.py
# Adjust the Scala suffix (2.12) and the version (3.2.0) to match your Spark installation.
spark = SparkSession.builder.appName("IoTKafkaExample").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# 2. Define the schema for the JSON data contained in the Kafka messages
schema = StructType(
    [
        StructField("deviceId", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("temperature", DoubleType(), True),
    ]
)

# 3. Create a streaming DataFrame that reads from a Kafka topic
# This subscribes to the "iot_data" topic from a Kafka cluster.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "iot_data")
    .load()
)

# 4. Parse the Kafka messages
# Kafka messages have a 'value' column containing the payload.
# We cast it to a STRING and then use 'from_json' to parse it.
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# 5. Define the transformation (same as before)
alerts_df = parsed_df.filter(col("temperature") > 100.0)

# 6. Define the output sink and start the query
query = (
    alerts_df.writeStream.outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# 7. Wait for the stream to terminate
query.awaitTermination()
Code Explanation:
- .format("kafka"): Tells Spark to use the Kafka source, which is provided by the spark-sql-kafka package mentioned in step 1.
- .option("kafka.bootstrap.servers", ...): Provides the address of the Kafka brokers.
- .option("subscribe", "iot_data"): Specifies the topic to read from.
- from_json(...): The Kafka source delivers the payload (value) as binary, so we first cast it to a string. from_json then parses that JSON string into a struct column using the schema, and select("data.*") expands the struct's fields into our final top-level columns.
Controlling the Stream: Triggers
By default, Spark tries to process streaming data as quickly as possible. As soon as one micro-batch is finished, it immediately starts the next. However, you have fine-grained control over how often Spark checks the source for new data using the .trigger() method. This is crucial for managing computational resources and controlling costs.
There are several types of triggers:
| Trigger Type | Configuration | Description |
|---|---|---|
| Default | (No .trigger() call) | As fast as possible. As soon as one micro-batch finishes, the next one starts. Gives the lowest latency available in micro-batch mode. |
| Processing Time | .trigger(processingTime='30 seconds') | Fixed-interval polling. Spark starts a micro-batch at a fixed interval (e.g., every 30 seconds). This is the most common way to control a stream. |
| Once | .trigger(once=True) | Single batch. Spark processes all available data in a single batch and then stops the query. This is well suited to converting a streaming job into a scheduled, incremental batch job. Deprecated since Spark 3.4 in favor of .trigger(availableNow=True), which also stops when caught up but may split the work across several batches. |
| Continuous | .trigger(continuous='1 second') | (Advanced) Low latency. An experimental mode that targets end-to-end latencies of around 1 millisecond. The interval sets how often progress is checkpointed, not the latency. It supports only a limited set of operations and sources and provides at-least-once guarantees. |
Example with a Trigger:
This modified query will poll the source for new data every 30 seconds.
query = (
    alerts_df.writeStream.outputMode("append")
    .format("console")
    .trigger(processingTime="30 seconds")  # Poll for new data every 30 seconds
    .start()
)
query.awaitTermination()