{"id":16078,"date":"2025-02-20T00:00:00","date_gmt":"2025-02-20T08:00:00","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/ise\/?p=16078"},"modified":"2025-02-20T06:39:16","modified_gmt":"2025-02-20T14:39:16","slug":"spark_job_otel","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/ise\/spark_job_otel\/","title":{"rendered":"Instrumenting Apache Spark Structured Streaming jobs using OpenTelemetry"},"content":{"rendered":"<h2>Introduction<\/h2>\n<p>Monitoring <a href=\"https:\/\/spark.apache.org\/docs\/latest\/structured-streaming-programming-guide.html\">Apache Spark structured streaming<\/a> data workloads is challenging because the data is continuously processed as it arrives. Because of this always-on nature of stream processing, it is harder to troubleshoot problems during development and production without real-time metrics, alerting, and dashboards.\nTraces complement metrics, and since Spark doesn\u2019t include them by default, we integrate them using OpenTelemetry.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2025\/02\/spark_flow.png\" alt=\"Spark structured streaming flow with OpenTelemetry\" \/><\/p>\n<h2>Problem statement<\/h2>\n<p>In a traditional enterprise API job, <a href=\"https:\/\/opentelemetry.io\/docs\/what-is-opentelemetry\/\">OpenTelemetry<\/a> handles the start and end tracing for simple cases. However, in a Spark streaming job that runs for an extended period, potentially days, and processes millions of messages where calls might fail, it\u2019s harder to keep track of failing batches.<\/p>\n<h2>Solution<\/h2>\n<p><a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/reference\/pyspark.ss\/api\/pyspark.sql.streaming.StreamingQueryListener.html\">StreamingQueryListener<\/a> is an interface well suited to instrumenting Spark jobs with OpenTelemetry. 
It provides callbacks for batch operations that roughly equate to API requests, making it particularly useful for real-time monitoring of streaming data workloads.<\/p>\n<p>In PySpark, the StreamingQueryListener listens for events related to StreamingQuery, providing methods such as onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated to manage different stages of a query\u2019s lifecycle.<\/p>\n<p>Additionally, in Structured Streaming, you can define <a href=\"https:\/\/docs.databricks.com\/en\/structured-streaming\/stream-monitoring.html#defining-observable-metrics-in-structured-streaming\">observable metrics<\/a>, which are essentially arbitrary aggregate functions applied to a query (DataFrame). These metrics can be monitored by attaching a listener to the Spark session. For further examples, such as the Kafka-to-Kafka StreamingQueryListener event, see <a href=\"https:\/\/docs.databricks.com\/en\/structured-streaming\/stream-monitoring.html#example-kafka-to-kafka-streamingquerylistener-event\">here<\/a>.<\/p>\n<p>The sample code below overrides the Spark StreamingQueryListener methods onQueryStarted, onQueryTerminated, and onQueryProgress.<\/p>\n<h3>Use cases<\/h3>\n<ul>\n<li><strong>Alerting on issues:<\/strong> Receive notifications when a streaming query encounters an error or experiences significant performance degradation.<\/li>\n<li><strong>Monitoring data ingestion rate:<\/strong> Track how fast data is being ingested into the streaming query.<\/li>\n<li><strong>Performance analysis:<\/strong> Gather metrics about query execution time and resource usage.<\/li>\n<li><strong>Logging and auditing:<\/strong> Log important events related to streaming queries for debugging and troubleshooting.<\/li>\n<\/ul>\n<h3>Sample code<\/h3>\n<p>The following code shows how we utilized StreamingQueryListener to track the latency and performance of batch events.<\/p>\n<pre><code class=\"language-python\">import datetime\r\n\r\nfrom 
opentelemetry import trace\r\nfrom pyspark.sql.streaming import StreamingQueryListener\r\n\r\ndef add_stream_progress_tracing(spark):\r\n    \"\"\"Attach the OpenTelemetry listener to a SparkSession.\"\"\"\r\n    listener = OpenTelemetryListener()\r\n    spark.streams.addListener(listener)\r\n\r\nclass OpenTelemetryListener(StreamingQueryListener):\r\n    def __init__(self):\r\n        self.tracer = trace.get_tracer(__name__)\r\n\r\n    def onQueryProgress(self, event):\r\n        \"\"\"\r\n        Called when there is some status update (ingestion rate updated, etc.).\r\n\r\n        Parameters\r\n        ----------\r\n        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`\r\n            The properties are the same as in the Scala API.\r\n\r\n        Notes\r\n        -----\r\n        This method is asynchronous. The status in\r\n        :class:`pyspark.sql.streaming.StreamingQuery` always reflects the\r\n        most recent progress, regardless of when this method is called, so it\r\n        may change before or while you process the event.\r\n        For example, you may find that :class:`StreamingQuery`\r\n        terminates while you are processing a `QueryProgressEvent`.\r\n        \"\"\"\r\n        progress = event.progress\r\n        # progress.timestamp is ISO-8601 with a trailing \"Z\"; normalize it so\r\n        # fromisoformat also accepts it on Python versions before 3.11, then\r\n        # convert the epoch seconds to nanoseconds\r\n        start_time = int(\r\n            datetime.datetime.fromisoformat(\r\n                progress.timestamp.replace(\"Z\", \"+00:00\")\r\n            ).timestamp()\r\n            * 1_000_000_000\r\n        )\r\n        batchDuration = progress.batchDuration\r\n        # otel times are specified in nanos; start the span manually (no\r\n        # ``with`` block) because we end it with an explicit end_time below,\r\n        # and a context manager would try to end the span a second time\r\n        span = self.tracer.start_span(\r\n            name=\"process batch\",\r\n            start_time=start_time,\r\n            kind=trace.SpanKind.SERVER,\r\n        )\r\n        if progress.name:\r\n            span.set_attribute(\"spark.streaming.query_name\", progress.name)\r\n        span.set_attribute(\r\n            \"spark.streaming.query_progress_id\", str(progress.id)\r\n        )\r\n        span.set_attribute(\"spark.streaming.batch_id\", progress.batchId)\r\n        span.set_attribute(\"spark.streaming.num_input_rows\", progress.numInputRows)\r\n        span.set_attribute(\r\n            \"spark.streaming.input_rows_per_second\",\r\n            progress.inputRowsPerSecond,\r\n        )\r\n        span.set_attribute(\r\n            \"spark.streaming.processed_rows_per_second\",\r\n            progress.processedRowsPerSecond,\r\n        )\r\n        # The time it takes to plan and execute the micro-batch.\r\n        span.set_attribute(\r\n            \"spark.streaming.trigger_execution_time\",\r\n            progress.durationMs[\"triggerExecution\"],\r\n        )\r\n\r\n        # End the span at batch start plus batchDuration;\r\n        # batchDuration is in milliseconds, so multiply by 1_000_000 for nanos\r\n        end_time = start_time + (batchDuration * 1_000_000)\r\n        span.end(end_time=end_time)\r\n\r\n    def onQueryStarted(self, event):\r\n        \"\"\"\r\n        Called when a query is started.\r\n\r\n        Parameters\r\n        ----------\r\n        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`\r\n            The properties are the same as in the Scala API.\r\n\r\n        Notes\r\n        -----\r\n        This is called synchronously with\r\n        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,\r\n        that is, ``onQueryStarted`` will be called on all listeners before\r\n        ``DataStreamWriter.start()`` returns the corresponding\r\n        :class:`pyspark.sql.streaming.StreamingQuery`.\r\n        Do not block in this method as it will block your query.\r\n        \"\"\"\r\n        pass\r\n\r\n    def onQueryTerminated(self, event):\r\n        \"\"\"\r\n        Called when a query is stopped, with or without error.\r\n\r\n        Parameters\r\n        ----------\r\n        event: 
:class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`\r\n            Use ``event.exception`` to determine whether the query ended with an error and to retrieve the error details.\r\n            The properties are the same as in the Scala API.\r\n        \"\"\"\r\n        with self.tracer.start_as_current_span(\"QueryTerminated\") as span:\r\n            # Record the query ID as an attribute\r\n            span.set_attribute(\"query.id\", str(event.id))\r\n            if event.exception is not None:\r\n                # Mark the span with error details if an exception occurred\r\n                span.set_attribute(\"query.error\", True)\r\n                span.set_attribute(\"query.error.message\", str(event.exception))\r\n                print(f\"Query terminated with an error: {event.exception}\")\r\n            else:\r\n                span.set_attribute(\"query.error\", False)\r\n                print(\"Query terminated successfully.\")<\/code><\/pre>\n<h2>Summary<\/h2>\n<p>In conclusion, the StreamingQueryListener class can be used effectively in PySpark Structured Streaming pipelines. 
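The span-timing arithmetic in onQueryProgress can be sanity-checked without a Spark cluster. The sketch below (plain Python; the helper name batch_span_times is ours, not part of any API) mirrors how an ISO-8601 micro-batch timestamp and a millisecond batchDuration become OpenTelemetry start and end times in nanoseconds:

```python
import datetime

def batch_span_times(timestamp_iso: str, batch_duration_ms: int):
    """Span start/end in nanoseconds, mirroring the listener's arithmetic.

    timestamp_iso: the micro-batch start time (ISO-8601, timezone-aware).
    batch_duration_ms: Spark's reported batchDuration, in milliseconds.
    """
    # Seconds since the epoch -> nanoseconds (OpenTelemetry's span-time unit).
    start_ns = int(
        datetime.datetime.fromisoformat(timestamp_iso).timestamp() * 1_000_000_000
    )
    # batchDuration is milliseconds, so scale by 1_000_000 to get nanos.
    end_ns = start_ns + batch_duration_ms * 1_000_000
    return start_ns, end_ns

# A 1500 ms micro-batch starting exactly at the Unix epoch:
start, end = batch_span_times("1970-01-01T00:00:00+00:00", 1500)
# start == 0, end == 1_500_000_000 (i.e. the span closes 1.5 s later)
```

Feeding these values to start_span(start_time=...) and span.end(end_time=...) gives the span the exact duration Spark reported for the batch, rather than the wall-clock time at which the listener happened to run.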
This approach also applies to other Scala\/Java libraries that PySpark supports.<\/p>\n<p>Overall, the flexibility and adaptability of the StreamingQueryListener class make it a valuable tool for monitoring and optimizing streaming data workloads across various platforms and use cases.<\/p>\n<p>We referenced this open-source code; you can access it <a href=\"https:\/\/github.com\/Azure-Samples\/databricks-observability\">here<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Apache Spark monitoring using OpenTelemetry<\/p>\n","protected":false},"author":181546,"featured_media":16081,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[3400],"class_list":["post-16078","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-cse","tag-ise"],"acf":[],"blog_post_summary":"<p>Apache Spark monitoring using OpenTelemetry<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/16078","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/users\/181546"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/comments?post=16078"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/16078\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media\/16081"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media?parent=16078"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/c
ategories?post=16078"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/tags?post=16078"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}