Introduction
Monitoring Apache Spark Structured Streaming workloads is challenging because the data is processed continuously as it arrives. Because of this always-on nature of stream processing, problems are harder to troubleshoot in development and production without real-time metrics, alerting, and dashboards. Traces complement metrics, and since Spark doesn't emit them by default, we integrate them using OpenTelemetry.
Problem statement
In a traditional enterprise API job, OpenTelemetry handles start-to-end tracing for simple cases. However, in a Spark streaming job that runs for an extended period, potentially days, and processes millions of messages where individual calls might fail, it's harder to keep track of failing batches.
Solution
StreamingQueryListener is a natural interface for instrumenting Spark jobs with OpenTelemetry. It provides callbacks for batch operations that roughly equate to API requests, making it particularly useful for real-time monitoring of streaming data workloads.
In PySpark, the StreamingQueryListener listens for events related to StreamingQuery, providing methods such as onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated to manage different stages of a query’s lifecycle.
Additionally, Structured Streaming lets you define observable metrics: arbitrary aggregate functions applied to a query (DataFrame). These metrics can be monitored by attaching a listener to the Spark session, as sketched below. For further examples, such as the Kafka-to-Kafka StreamingQueryListener event, please refer here.
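A minimal sketch of observable metrics, assuming an existing SparkSession `spark` and a streaming DataFrame `df` (the observation name `row_stats` is an arbitrary illustrative choice):

from pyspark.sql.functions import count, lit
from pyspark.sql.streaming import StreamingQueryListener

# Attach an arbitrary aggregate to the streaming DataFrame; Spark then
# reports its result with every micro-batch under the name "row_stats".
observed_df = df.observe("row_stats", count(lit(1)).alias("row_count"))


class ObservedMetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # observedMetrics maps each observation name to a Row of results.
        row = event.progress.observedMetrics.get("row_stats")
        if row is not None:
            print(f"rows seen this batch: {row.row_count}")

    def onQueryTerminated(self, event):
        pass


spark.streams.addListener(ObservedMetricsListener())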
The sample code below overrides the Spark StreamingQueryListener methods onQueryStarted, onQueryTerminated, and onQueryProgress.
Use cases
- Alerting on issues: Receive notifications when a streaming query encounters an error or experiences significant performance degradation (see the sketch after this list).
- Monitoring data ingestion rate: Track how fast data is being ingested into the streaming query.
- Performance analysis: Gather metrics about query execution time and resource usage.
- Logging and auditing: Log important events related to streaming queries for debugging and troubleshooting.
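As a minimal sketch of the alerting use case, one can compare the processed rate against the ingestion rate in onQueryProgress and flag batches that fall behind. The 0.8 threshold and the use of a log warning in place of a real alerting backend are illustrative assumptions, not part of Spark or OpenTelemetry:

import logging

logger = logging.getLogger("spark.streaming.alerts")


def check_for_lag(progress, lag_ratio_threshold=0.8):
    # lag_ratio_threshold is an arbitrary illustrative value: warn when a
    # micro-batch processes rows at less than 80% of their arrival rate.
    in_rate = progress.inputRowsPerSecond or 0.0
    out_rate = progress.processedRowsPerSecond or 0.0
    if in_rate > 0 and out_rate < in_rate * lag_ratio_threshold:
        logger.warning(
            "Query %s batch %s is falling behind: %.1f rows/s in, %.1f rows/s processed",
            progress.name,
            progress.batchId,
            in_rate,
            out_rate,
        )

Calling check_for_lag(event.progress) from onQueryProgress in the listener below would wire this check into the same callback that records spans.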
Sample code
The following code shows how we utilized StreamingQueryListener to track the latency and performance of batch events.
import datetime

from opentelemetry import trace
from pyspark.sql.streaming import StreamingQueryListener


def add_stream_progress_tracing(spark):
    # Register the OpenTelemetry listener so it receives lifecycle events
    # for every streaming query on this Spark session.
    listener = OpenTelemetryListener()
    spark.streams.addListener(listener)


class OpenTelemetryListener(StreamingQueryListener):
    def __init__(self):
        self.tracer = trace.get_tracer(__name__)
    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` always returns the
        most recent state, regardless of when this method is called, so the
        status of the query may change before or while you process the event.
        For example, you may find that the :class:`StreamingQuery`
        has terminated while you are processing a `QueryProgressEvent`.
        """
        progress = event.progress
        # progress.timestamp is an ISO-8601 string; OpenTelemetry expects
        # epoch nanoseconds, so convert seconds to nanos.
        start_time = int(
            datetime.datetime.fromisoformat(progress.timestamp).timestamp()
            * 1_000_000_000
        )
        batch_duration = progress.batchDuration
        # Start the span at the batch's actual start time and end it
        # explicitly below so its duration matches the batch duration.
        span = self.tracer.start_span(
            name="process batch",
            start_time=start_time,
            kind=trace.SpanKind.SERVER,
        )
        if progress.name:
            span.set_attribute("spark.streaming.query_name", progress.name)
        span.set_attribute("spark.streaming.query_progress_id", str(progress.id))
        span.set_attribute("spark.streaming.batch_id", progress.batchId)
        span.set_attribute("spark.streaming.num_input_rows", progress.numInputRows)
        span.set_attribute(
            "spark.streaming.input_rows_per_second",
            progress.inputRowsPerSecond,
        )
        span.set_attribute(
            "spark.streaming.processed_rows_per_second",
            progress.processedRowsPerSecond,
        )
        # The time it takes to plan and execute the micro-batch.
        span.set_attribute(
            "spark.streaming.trigger_execution_time",
            progress.durationMs["triggerExecution"],
        )
        # batchDuration is in milliseconds, so multiply by 1_000_000 to
        # convert to nanos before computing the span end.
        end_time = start_time + (batch_duration * 1_000_000)
        span.end(end_time=end_time)
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`;
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method, as it will block your query.
        """
        pass
    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API. Use the
            event's ``exception`` property to determine whether the query
            ended with an error and to retrieve the error details.
        """
        with self.tracer.start_as_current_span("QueryTerminated") as span:
            # Record the query ID as an attribute
            span.set_attribute("query.id", str(event.id))
            if event.exception is not None:
                # Mark the span with error details if an exception occurred
                span.set_attribute("query.error", True)
                span.set_attribute("query.error.message", str(event.exception))
                print(f"Query terminated with an error: {event.exception}")
            else:
                span.set_attribute("query.error", False)
                print("Query terminated successfully.")
Summary
In conclusion, the StreamingQueryListener class can be used effectively in PySpark streaming pipelines, and because it wraps the underlying Scala/Java listener API, the same approach applies to Spark jobs written in those languages as well.
Overall, the flexibility and adaptability of the StreamingQueryListener class make it a valuable tool for monitoring and optimizing streaming data workloads across various platforms and use cases.
We referenced this open-source code, and you can access it here.