May 1st, 2018

Runtime Configuration of Spark Streaming Jobs

Kevin Hartman
Senior Software Engineer

Background

Project Fortis is a social data ingestion, analysis, and visualization platform. Originally developed by Microsoft in collaboration with the United Nations Office for the Coordination of Humanitarian Affairs (UN OCHA), Fortis provides planners and scientists with tools to gain insight from social media, public websites and custom data sources. We’ve explored previous work on Project Fortis in other code stories on this blog, such as Project Fortis: Accelerating UN Humanitarian Aid Planning with GraphQL and Building a Custom Spark Connector for Near Real-Time Speech-to-Text Transcription.

Introduction

In Project Fortis, we use Spark Streaming to consume and process real-time news and events from a variety of sources, including social media outlets such as Twitter and Facebook. As part of this processing, we perform sentiment and geolocation analysis as well as entity extraction, ultimately aggregating this information into a Cassandra database, which powers a web service used to visualize it. This article assumes familiarity with Apache Spark Streaming application development.

One notable project requirement for Fortis is that many aspects of the processing done by Spark must be user-tunable; for example, our Spark application makes use of both a whitelist and a blacklist to filter out incoming events that are uninteresting to the user. These lists, among other settings, are maintained through an admin dashboard from within our web service.  It’s important that the user is able to update these lists at any time.

To satisfy this requirement of our work on Project Fortis, we needed to be able to communicate with Spark to reconfigure our data processing logic at runtime in response to configuration changes from an external service. In this code story, we’ll explore how we were able to achieve this goal using Spark transformations and Azure Service Bus.

Runtime Reconfiguration

One of Spark’s limitations is that it’s not possible to change an application’s DStream graph after the streaming context has been started. From an API perspective, this graph is comprised of all DStream transformation lineages which lead from an output operation (i.e., print, foreachRDD, etc.) back to a source input stream (receiver, direct input stream, etc.). In other words, the hierarchy of the DStream graph defines the configuration and ordering of the stages which comprise an application’s data-processing pipeline(s).

For example, myDStream.transform(...).map(...).filter(...).print() defines a pipeline in which all RDDs arriving on the stream “myDStream” will be transformed by some function, mapped by another, filtered by yet another, and finally printed.
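
Written out with placeholder logic, the same pipeline might look like this (a minimal sketch, assuming myDStream is a DStream[String]; the functions are arbitrary):

// Each stage below is registered once, when the DStream graph is built;
// the functions passed to transform, map, and filter run later, for every batch.
myDStream
  .transform(rdd => rdd.distinct())   // arbitrary RDD-to-RDD transformation
  .map(line => line.toUpperCase)      // element-wise mapping
  .filter(line => line.nonEmpty)      // element-wise filtering
  .print()                            // output operation closing the lineage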

While we cannot change pipeline stages or their ordering, we can use generic transformations and output actions that operate at the RDD level (namely transform and foreachRDD, respectively), which give us a chance to run arbitrary code for each incoming RDD (batch).

Functions (callbacks) we provide to foreachRDD and transform are executed within the driver process. They’re designed to build an execution plan for each RDD; they set up the RDD-level transformations that will occur for each individual RDD, and scheduling is done from the Spark driver. We were able to make use of this to satisfy our goal; before we process each RDD, we poll an external service for configuration changes and adjust the logic applied to the current and future RDDs.
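
As an illustration of this driver-side hook (a hedged sketch, not the Fortis code; fetchLatestSettings and its blacklist field are hypothetical stand-ins for a fast, driver-local lookup):

// The body of foreachRDD runs on the driver once per batch (RDD).
myDStream.foreachRDD { rdd =>
  val settings = fetchLatestSettings()                                // hypothetical driver-side lookup
  val kept = rdd.filter(event => !settings.blacklist.contains(event)) // this closure runs on the executors
  kept.take(10).foreach(println)                                      // pull a small sample back to the driver
}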

Designing our Approach

For a deeper understanding of the approach covered here, check out our supplemental blog post, Spark Streaming Transformations: A Deep-dive, which captures some non-obvious behaviors and limitations of Spark that we uncovered while designing it.

By writing some setup code to run on the driver for each incoming RDD, we can reference an external service and reactively change how we process data (i.e., in response to another service). Useful external services for this purpose include databases, web services, and messaging queue services like Azure Service Bus (more on this option below), but the options are of course limitless.

For Project Fortis’s requirements, it made the most sense to place this code within a transform callback (a transformation). We chose this over foreachRDD (an output operation) since it allowed us to modify our configuration as far upstream as possible within our data pipeline. We were then able to use the resulting DStream to supply data to completely separate downstream transformation sequences (data pipelines). For simpler applications with a single output operation, using foreachRDD should suffice.

When using transform, however, work done in the callback must not be long-running (a Spark thread will invoke the callback). This limitation rules out network communication, making it more difficult to communicate with external services. To solve this, we used a worker/background thread to fetch and prepare data from our external service, passing it off to the Spark driver thread(s) only once ready.

However, this approach added complexity of its own due to serialization concerns around Spark checkpointing. We needed a way to share mutable state between the Spark thread executing our transform callback and our background thread, which we’d started prior to the streaming context, near our application’s entry point. The problem is non-trivial because Spark serializes the state of the transform function during checkpointing; upon recovery/deserialization, no direct references would exist between this code and any objects we had control over in the old JVM process.

We found that the most elegant solution to the above was to declare the mutable state through which these threads communicate as static memory. In this way, the Spark thread(s) always refer to the current copy, as does the startup code. This works because static fields are not serialized and instead follow the lifetime of the process; our transform callback holds a code reference to the shared state. The implication is that we needed to ensure the state was initialized properly prior to starting the Spark streaming context, even after recovery from checkpointing. In practice, this just meant adding initialization code at the entry point of our application, both before running the background thread and (transitively) before telling the streaming context to start.

Azure Service Bus

When designing Project Fortis, we chose Azure Service Bus as the external service from which we’d receive runtime configuration changes. Azure Service Bus is a highly available, reliable message queue service that can be used to achieve exactly-once semantics for messages, which suited our use cases. Furthermore, its API supports long-polling, allowing our Spark application to listen on a socket under the hood and begin processing configuration changes immediately.

To best illustrate the concepts of the approach we took with Project Fortis, we’ve created a bare-bones sample project. In the sections below, we’ll explore its design. While the sample is based on Azure Service Bus, the approach it illustrates can be easily extended for use with other external services.

Sample Project

The sample project consists of a simple Spark application (with checkpointing enabled) which processes a self-generated sample stream of string RDDs. The application prefixes each string in the stream with data from a configuration object, which is updated in real time using Azure Service Bus. For each RDD (batch) in the stream, the contents are printed to the console (the batch interval is 5 seconds). Sending new configuration objects to the Service Bus queue on which the application listens changes the output of subsequent batches.

The video below shows a demonstration of the semantics achievable using the sample project.

Video Steps

  1. Start the Spark application.
  2. Observe the console output.
  3. Send an updated config JSON object using Service Bus Explorer to change the computation of future batches.
  4. Observe the console output has changed in response.
  5. Repeat a few times with different data.

Project Structure

build.sbt

In order to integrate our Spark Streaming application with Azure Service Bus, we first needed to import its client library by adding a project reference to the latest version (at the time of writing, 1.1.0) in our build.sbt:

// https://mvnrepository.com/artifact/com.microsoft.azure/azure-servicebus
libraryDependencies += "com.microsoft.azure" % "azure-servicebus" % "1.1.0"
Main.scala

Next, let’s look at the entry-point to the application: the Main object. It’s here that we need to set up the client to listen for new messages on our Azure Service Bus queue, configure our data stream, and start the streaming context.

The first interesting section initializes ConfigManagerSingleton with a configuration object. This is the code mentioned above which will initialize our static shared mutable state on program start.

// Initialize ConfigManagerSingleton, which is used to pass updated configs from Service Bus to Spark
ConfigManagerSingleton.init(
  initialConfig = Config("Hello ")
)

The ConfigManagerSingleton will be covered in more detail in a section below. For now, it’s important to know that the above Config will be available to our Spark logic until it is replaced by a new one arriving on our Service Bus queue. In a production application, you may want to consider storing initial configuration values in a database, loading them as above (we did this in Project Fortis).
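
For reference, the Config type in this sample only needs to carry the prefix applied by the stream logic shown below; a minimal definition might be (a sketch; the repo’s version may differ):

case class Config(prefix: String)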

Next, we configure the Service Bus client library (QueueClient) to listen for messages.

// Initialize connection to Service Bus
val queueClient = new QueueClient(
  new ConnectionStringBuilder(busConnStr, busQueueName),
  ReceiveMode.PEEKLOCK
)
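
The busConnStr and busQueueName values identify the Service Bus namespace connection string and queue name. In a sketch like this one they might simply come from environment variables (the variable names here are hypothetical):

// Hypothetical: read the Service Bus connection string and queue name from the environment.
val busConnStr = sys.env("SERVICEBUS_CONNECTION_STRING")
val busQueueName = sys.env("SERVICEBUS_QUEUE_NAME")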

We use the PEEKLOCK receive mode, which avoids removing incoming messages from the queue until we’ve explicitly acknowledged them as processed. This guarantees that we have a chance to handle each message at least once. Next, we will:

  • register a callback that will be invoked for each incoming message on a Service Bus client library worker thread
  • register a callback invoked by Spark to set up our stream (createStreamingContext)
  • start the streaming context
try {
  // Register config manager as handler
  queueClient.registerMessageHandler(ConfigManagerSingleton,
    new MessageHandlerOptions(
      1, // Max concurrent calls: do not invoke our callback concurrently
      true, // Automatically complete messages when the callback returns without throwing
      Duration.ofMinutes(5) // Max auto renew duration (the default, required by this constructor)
    )
  )

  // Get StreamingContext from checkpoint data or create a new one
  val context = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

  // Start the context
  context.start()
  context.awaitTermination()
} finally {
  queueClient.close()
}

Note the values passed to the MessageHandlerOptions constructor. The first specifies to the client library that we do not want our callback to be invoked concurrently. The second indicates that we want the client library to automatically mark messages as handled if the callback executes fully. The last indicates that the max auto renew duration is 5 minutes (which is the default, but we need to pass it to satisfy this constructor’s argument list).

The createStreamingContext callback is executed by Spark on startup if a checkpoint does not exist:

private def createStreamingContext(): StreamingContext = {
  ...

  // Create a static stream with a few hundred RDDs, each containing a set of test strings.
  val stream = ExampleStream(ssc)

  val transformedStream = stream.transform(rdd => {
    // The body of transform will execute on the driver for each RDD.
    // It's here that we can fetch the latest config from the Service Bus thread.
    val config = ConfigManagerSingleton.get()

    rdd.map(value => config.prefix + value)
  })

  ...
}

The ExampleStream class creates a DStream of sample data. We won’t cover it here, but its implementation can be found in the project’s repo.
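
For orientation, a minimal sketch of what ExampleStream might look like is below (the repo’s version differs; as noted in the caveats later, it is backed by Spark’s queueStream, which does not support checkpointing):

import scala.collection.mutable
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object ExampleStream {
  def apply(ssc: StreamingContext): DStream[String] = {
    // Pre-generate a few hundred small RDDs of test strings; queueStream emits one per batch interval.
    val rdds = (1 to 300).map(i => ssc.sparkContext.parallelize(Seq(s"world #$i", s"stream #$i")))
    ssc.queueStream(mutable.Queue(rdds: _*))
  }
}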

We use the transform function of DStream to register a callback which is invoked for each RDD arriving on the stream. At the start of this callback, we get the latest Config instance from the Service Bus thread (more on this in the ConfigManagerSingleton section), and then define an RDD transformation for the current RDD which prefixes each of its elements with the value of config.prefix. Recall that the map function applied to the RDD will be executed in parallel on the executors, while ConfigManagerSingleton.get() will be executed on the driver.

ConfigManagerSingleton.scala

The static ConfigManagerSingleton object encapsulates the shared mutable state which we’ll use to pass new configuration changes (instances of Config sent over Service Bus) to our Spark code. It consists of:

  • fields for mutable state
  • a state initialization function
  • an update function called by a Service Bus thread when a configuration change occurs
  • a state accessor function called by our Spark driver-side code

Our state includes:

  • nextConfig — Used to synchronously hand off new Config instances from our Service Bus worker thread to Spark threads. The Service Bus worker thread will synchronously offer a new config to the Spark threads, blocking until one of them takes the config.
  • config — Holds the current Config, which will be updated by a Spark thread upon a successful handoff. Its initial value must be set manually from within the startup code, using the init function shown below. Note that this field is volatile since it declares a variable reference which will be updated and read by different threads.

Here it is seen in the code:

private val nextConfig: SynchronousQueue[Config] = new SynchronousQueue[Config]()

@volatile private var config: Config = _
def init(initialConfig: Config): Unit = {
  config = initialConfig
}

When a new Config arrives on the Service Bus queue, a Service Bus thread will invoke the following update callback:

override def onMessageAsync(message: IMessage): CompletableFuture[Void] = {
  implicit val _ = net.liftweb.json.DefaultFormats

  val messageStr = new String(message.getBody, "UTF-8")
  val json = parse(messageStr)
  val config = json.extractOpt[Config]

  if (config.isDefined) {
    // Block for up to two minutes for a Spark thread to acknowledge the updated
    // state.
    if (!nextConfig.offer(config.get, 2, TimeUnit.MINUTES)) {
      throw new Exception("No Spark thread acknowledged the update message within the timeout.")
    }
  }

  CompletableFuture.completedFuture(null)
}

The body of the Service Bus message (JSON) is parsed for a Config instance.
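
For example, sending a message whose body is the following JSON (assuming Config carries the single prefix field used in this sample) changes the prefix applied to subsequent batches:

{ "prefix": "Hi there, " }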

The Config is passed to a Spark thread by offering it on our SynchronousQueue. The offer call will block until either a Spark thread has received the Config or until the timeout has been exceeded, at which point an exception is thrown to let the Service Bus client library know that the message was not acknowledged. If the offer succeeds, a Spark thread has received the new Config.

The success of the offer call only guarantees that a consuming Spark thread got possession of the Config. This should be sufficient for our purposes (it’s unimportant if the corresponding batch is processed successfully using this exact config, assuming we initialize ConfigManagerSingleton with the latest config on (re)startup).

On arrival of each batch, Spark threads invoke our transform callback, which accesses ConfigManagerSingleton‘s shared state via the following function:

def get(): Config = this.synchronized {
  // Grab the next config from the service bus client thread if one is ready. Else, return the current config.
  val value = Option(nextConfig.poll(0, TimeUnit.SECONDS))

  if (value.isDefined) {
    config = value.get
  }

  config
}

This function runs quickly since it does not wait for the Service Bus client thread to offer a Config; if a new Config is not available right now, then the old one is returned instead, which satisfies the requirement that our transform function’s callback must not be long-running.

Note that the function itself is a critical section to avoid interleavings of calling (Spark) threads which could otherwise cause the value of ConfigManagerSingleton.config to become stale. This is only relevant when spark.streaming.concurrentJobs is set greater than 1, but even then it shouldn’t cause any Spark thread to wait long since the critical section itself does not block.

Notes and Caveats

For many applications (Fortis included), more work may be required to prepare configuration data for use by Spark threads. In that case, this work should be done on the (Service Bus) worker thread.

In Fortis, the filter word lists we send along with our configuration data are large. To avoid serializing them with each Spark task, we broadcasted them somewhere in our equivalent of ConfigManagerSingleton.get (and hence on a Spark driver thread). Broadcasting is not especially expensive since it’s implemented lazily (Spark executors reach back to the driver when referencing a broadcast), so this should be fine to do within transform. While it may be tempting to do this from the (Service Bus) worker thread, Spark threads appear to have a custom thread-local state which I expect may be required for a successful broadcast.
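
As a hypothetical sketch of that idea (not the Fortis code; FilterConfig, filterWords, and the broadcast field are illustrative), the accessor could broadcast the word list whenever a new config is taken:

import java.util.concurrent.{SynchronousQueue, TimeUnit}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Illustrative config carrying a large word list alongside smaller settings.
case class FilterConfig(prefix: String, filterWords: Set[String])

object FilterConfigManager {
  private val nextConfig = new SynchronousQueue[FilterConfig]()
  @volatile private var config: FilterConfig = _
  @volatile private var words: Broadcast[Set[String]] = _

  def init(sc: SparkContext, initial: FilterConfig): Unit = {
    config = initial
    words = sc.broadcast(initial.filterWords)
  }

  // Called from the driver (e.g., within transform); broadcasts only when the config changes.
  def get(sc: SparkContext): (FilterConfig, Broadcast[Set[String]]) = this.synchronized {
    val next = Option(nextConfig.poll(0, TimeUnit.SECONDS))
    if (next.isDefined) {
      config = next.get
      words = sc.broadcast(config.filterWords) // executors fetch this lazily on first use
    }
    (config, words)
  }
}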

For complex applications, it may be necessary to think carefully about the interplay between configuration changes and data checkpointing and persistence. For example, if an RDD processed with configuration A were persisted, a new configuration B applied at a later point in time would not affect that data. As another example, consider a pipeline defined as follows, where upstream and downstream configurations control separate data processing settings: myDStream.transform(<upstream config>).checkpoint().transform(<downstream config>). If recovering from checkpoint, the effects of the upstream config may be much older than those of the downstream config, since its resulting RDD will have been recovered from disk rather than recomputed.

Sample Project

The sample project’s ExampleStream is backed by Spark’s QueueStream, which does not support checkpointing. Consequently, recovering the sample application from a checkpoint will fail. Checkpointing is included to demonstrate how the approach taken here can be correctly integrated into a production scenario in which checkpointing is enabled. Before running the sample, ensure the specified checkpoint folder is emptied.

Since the Service Bus thread synchronously and serially passes Configs to Spark, each Config in the Service Bus queue will be used for the configuration of at least one RDD, even if a newer one exists subsequently in the queue. For some applications, it may be desirable to skip older Configs which are waiting in the queue until no newer ones are present. In this case, it should be possible to manually poll additional Configs from the queue within the onMessageAsync callback until no more can be found, passing only the latest one to Spark.

Both the sample application and Fortis use the Service Bus client library’s QueueClient to take advantage of the library’s built-in message pump loop/thread pool. However, using this client does not necessarily guarantee that messages will be delivered in the order in which they were submitted to the Service Bus queue (though for our human/UI-triggered use cases, ordering appeared to be temporally sequential), since it does not support message sessions. For applications which require ordering guarantees, enable Message Sessions for the queue. The drawback to this approach is, of course, that the QueueClient message loop cannot be used, and no equivalent exists today within the Service Bus Java client library (though an issue on GitHub captures this feature request).

Epilogue

Throughout this article, we’ve covered the approach we took to enable runtime reconfiguration of the Spark Streaming data pipeline used in Project Fortis. Using Azure Service Bus, we were able to communicate with our Spark application from Fortis’s admin UI, allowing administrators of Fortis deployments to control settings such as information filtering, geofences, and more without code modification and without application restarts (zero downtime). Developers should be able to extend the principles employed by the provided sample project and technical deep-dive to work with other external services for applications which cannot feasibly use Azure Service Bus.

Feel free to reach out in the comments section below with questions and feedback!

