August 7th, 2025

Learnings from External Data Handling

Intro

Our ISE team at Microsoft recently completed an engagement with a large industrial customer. The system we developed was distributed between the edge and the cloud and made use of technologies including Python, Azure IoT, Kubernetes, Redis and Dapr. The system had been built prior to this engagement, and our goal for this phase was to make it production-ready and able to handle a greater scale of messages.

In this system, messages originating from the edge were sent to the cloud, where they were augmented with additional data from the company’s various internal APIs and document storage. Augmented messages were then processed further in the system; however, this blog post focuses on the features surrounding the augmentation process.
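To make that flow concrete, the sketch below shows the general shape of such an augmentation step. It is purely illustrative: the message fields and the lookup_reference_data / fetch_document helpers are hypothetical stand-ins for the customer's internal APIs and document storage, not the actual implementation.

from dataclasses import dataclass, field
from typing import Any

@dataclass
class EdgeMessage:
    device_id: str
    payload: dict[str, Any]
    enrichment: dict[str, Any] = field(default_factory=dict)

def lookup_reference_data(device_id: str) -> dict[str, Any]:
    # Stub standing in for a call to one of the company's internal APIs
    return {"device_id": device_id, "site": "example-site"}

def fetch_document(doc_id: str) -> str:
    # Stub standing in for a read from document storage
    return f"contents of {doc_id}"

def augment_message(message: EdgeMessage) -> EdgeMessage:
    # Combine the edge payload with reference data and document contents
    # before the message is passed on for further processing
    message.enrichment["reference"] = lookup_reference_data(message.device_id)
    message.enrichment["document"] = fetch_document(message.payload["doc_id"])
    return message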

Problems

Initial investigation revealed several problems within the message augmentation process.

Speed of processing

A single message could take up to a minute to go through the full augmentation process. Given our goal of handling a greater scale of messages, this was deemed insufficient.

Influencing factors:

  • In some cases the data returned from the APIs was very large.
  • Only basic caching was in place, and many messages required the same data, resulting in repeated retrieval and processing of identical data.
  • Some of the company’s internal APIs enforced rate limiting, meaning that if the limit was hit we had to wait a set length of time before calling the API again.
  • Each message was processed sequentially, and each step of the augmentation was performed sequentially as well.

Out of memory exceptions

Exit code 137 (OOMKilled) was frequently observed on the container performing the augmentation process. This was a significant problem, as the message being processed at the time of the error would be dropped.

The major contributor was the way external documents were read from Storage Accounts using the .readall() method of the Azure Storage client library. Reading an entire large document into memory could easily exceed the container's memory limit and cause the container to be shut down.

The example below, taken from Download a blob with Python – Azure Storage | Microsoft Learn, downloads the complete contents of the sample-blob.txt file using this method and prints them.

from azure.storage.blob import BlobServiceClient

def download_blob_to_string(self, blob_service_client: BlobServiceClient, container_name):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    # encoding param is necessary for readall() to return str, otherwise it returns bytes
    downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')
    blob_text = downloader.readall()
    print(f"Blob contents: {blob_text}")

No queuing

Every message sent to the cloud was processed immediately on arrival; no queuing system was in place. With future growth in the number of messages the system may process in mind, this was deemed a problem.

Solutions

The following sections describe the actions taken to resolve the problems identified above.

Dedicated Caching Process

We introduced a separate process with the sole purpose of building a cache of data frequently reused between messages. This ran in its own container on a CRON schedule, allowing easy control of how frequently the cache is updated (a minimal sketch of such a cache-building job follows the list below).

This separation of concerns brought several advantages:

  • Commonly used data no longer needs to be looked up or filtered from APIs or data storage for every message, resulting in faster message processing.
  • A failure when accessing the data sources no longer causes the message currently being processed to fail.
  • Memory usage during message processing is significantly reduced. Higher memory usage is only observed while the cache is being built, which is known and manageable: memory and CPU can be added to the cache-building container to speed it up, while lower limits on the processing container no longer affect message processing.
  • When we scale out to multiple instances of the augmentation process, they can all share the same cached data.
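As an illustration of the cache-building job mentioned above, the sketch below stores frequently reused reference data in a Dapr state store so that every augmentation instance can read it. The component name cache-statestore and the fetch_reference_data helper are assumptions made for the sketch; in practice the job would be packaged in its own container and triggered on the CRON schedule (for example via a Kubernetes CronJob).

import json

from dapr.clients import DaprClient

STATE_STORE_NAME = 'cache-statestore'  # assumed Dapr state store component name

def fetch_reference_data() -> dict:
    # Hypothetical stand-in for the API / document storage lookups whose
    # results are reused across many messages
    return {"site-a": {"region": "north"}, "site-b": {"region": "south"}}

def build_cache() -> None:
    # Message processing only reads this cache, so the memory and CPU cost
    # of building it is isolated to this container
    reference_data = fetch_reference_data()
    with DaprClient() as client:
        for key, value in reference_data.items():
            client.save_state(STATE_STORE_NAME, key, json.dumps(value))

if __name__ == '__main__':
    build_cache()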

Redis Queue / Dapr

A cloud-side queue was implemented to ingest edge messages, with the augmentation process consuming messages from the queue for processing.

This was done with a Redis container for the queue which was then accessed via Dapr.

Dapr was chosen because it provides a single interface over a variety of backing implementations for queuing, storage and more. The team also considered other queuing implementations, such as Azure Event Grid; with Dapr in place, the customer's development team can easily swap the implementation if they later rework their architecture.

The code sample below demonstrates publishing events to the queue with the DaprClient class. As can be seen, there is no reference to the underlying implementation. Example source: How to: Publish a message and subscribe to a topic | Dapr Docs.

import json

from dapr.clients import DaprClient

# PUBSUB_NAME, TOPIC_NAME and my_data are defined earlier in the Dapr sample
with DaprClient() as client:
    result = client.publish_event(
        pubsub_name=PUBSUB_NAME,
        topic_name=TOPIC_NAME,
        data=json.dumps(my_data),
        data_content_type='application/json',
    )

The next sample shows subscribing to a topic over HTTP: the Dapr sidecar delivers published messages to a route on the application, wrapped in a CloudEvent envelope. Adapted from Declarative, streaming, and programmatic subscription types | Dapr Docs.

import logging

from flask import Flask, request

app = Flask(__name__)

# Dapr delivers messages published to the 'orders' topic to this route as a
# CloudEvent (see the Subscription definition below); the payload is under 'data'
@app.route('/orders', methods=['POST'])
def process_order():
    data = request.get_json()['data']
    logging.info('Subscriber received: ' + str(data))
    return {'success': True}

The reference to the Redis implementation is defined within a Dapr Component configuration file.

An example Redis pub/sub component definition, taken from Redis Streams | Dapr Docs, follows:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: "<Password>"
  - name: consumerID
    value: "channel1"
  - name: useEntraID
    value: "true"
  - name: enableTLS
    value: "false"

The complementing subscription definition, from How to: Publish a message and subscribe to a topic | Dapr Docs, is as follows:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: redis-pubsub
scopes:
- orderprocessing

Read docs as a stream

As previously mentioned, when reading documents from Blob Storage the system used the .readall() method, loading the complete document into memory. It had not been foreseen that documents would be large enough for this to become a problem.

The method of reading was changed to read documents in chunks, as a stream.

Note: This approach is best suited to data that can be consumed in chunks or lines, e.g. .csv or other textual data.

The example below shows the equivalent of the previous sample, now iterating over the document chunk by chunk.

from azure.storage.blob import BlobServiceClient

def download_blob_chunks(self, blob_service_client: BlobServiceClient, container_name):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    # This returns a StorageStreamDownloader
    stream = blob_client.download_blob()
    chunk_list = []

    # Read data in chunks to avoid loading all into memory at once
    for chunk in stream.chunks():
        # Process your data (anything can be done here - 'chunk' is a byte array)
        chunk_list.append(chunk)

Source: Download a blob with Python – Azure Storage | Microsoft Learn.
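For line-oriented data such as CSV (as noted above), the same stream can be consumed line by line instead of accumulating chunks. The sketch below is one way to do this; it assumes UTF-8 text and a hypothetical process_line function, and buffers only the partial line that spans a chunk boundary.

from azure.storage.blob import BlobServiceClient

def process_line(line: str) -> None:
    # Hypothetical per-line handler, e.g. parsing a CSV row
    print(line)

def download_blob_lines(blob_service_client: BlobServiceClient, container_name: str) -> None:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    stream = blob_client.download_blob()

    buffer = b""
    for chunk in stream.chunks():
        buffer += chunk
        lines = buffer.split(b"\n")
        buffer = lines.pop()  # keep any incomplete trailing line for the next chunk
        for line in lines:
            process_line(line.decode("utf-8"))

    if buffer:
        process_line(buffer.decode("utf-8"))  # final line without a trailing newline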

Load testing

Load testing was performed at the beginning of the project and after the discussed changes were implemented.

The graph below demonstrates the change in performance; here we measure the time taken to process a given number of messages.

[Figure: load-testing graph showing the time taken to process messages before and after the changes]

As the graph shows, a significant performance improvement was achieved. These changes contributed to the goal of making the system production-ready and able to handle a greater scale of messages.
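One way to keep such a measurement repeatable is to script it: publish a fixed batch of test messages and time how long the system takes to work through them. The sketch below illustrates the idea; it reuses the pub/sub names assumed in the earlier examples and a hypothetical wait_until_processed helper that would poll the queue length or a processed-message counter.

import json
import time

from dapr.clients import DaprClient

PUBSUB_NAME = 'redis-pubsub'  # assumed component name from the earlier examples
TOPIC_NAME = 'orders'         # assumed topic name from the earlier examples

def wait_until_processed(expected: int) -> None:
    # Hypothetical helper: poll until all test messages have been consumed
    time.sleep(1)

def run_load_test(message_count: int) -> float:
    start = time.monotonic()
    with DaprClient() as client:
        for i in range(message_count):
            client.publish_event(
                pubsub_name=PUBSUB_NAME,
                topic_name=TOPIC_NAME,
                data=json.dumps({"test_message": i}),
                data_content_type='application/json',
            )
    wait_until_processed(message_count)
    return time.monotonic() - start

if __name__ == '__main__':
    print(f"Processed 1000 messages in {run_load_test(1000):.1f}s")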

Future Improvements

Storage with Dapr

Dapr has the ability to abstract state storage in a similar fashion to the queue mechanism previously described. A quickstart demo is available here.

import logging

from dapr.clients import DaprClient

# DAPR_STORE_NAME, orderId and order are defined earlier in the quickstart sample
with DaprClient() as client:

    # Save state into the state store
    client.save_state(DAPR_STORE_NAME, orderId, str(order))
    logging.info('Saving Order: %s', order)

    # Get state from the state store
    result = client.get_state(DAPR_STORE_NAME, orderId)
    logging.info('Result after get: ' + str(result.data))

    # Delete state from the state store
    client.delete_state(store_name=DAPR_STORE_NAME, key=orderId)
    logging.info('Deleting Order: %s', order)

Here we use the DaprClient class to interface with the underlying state storage implementation.

This requires an appropriate state store component file to be defined, such as the following for Azure Blob Storage:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <NAME>
spec:
  type: state.azure.blobstorage
  version: v2
  metadata:
  - name: accountName
    value: "[your_account_name]"
  - name: accountKey
    value: "[your_account_key]"
  - name: containerName
    value: "[your_container_name]"

As with the queuing system, this provides the flexibility to change the storage backend through configuration rather than code.

Scaling

Current Scaling

The augmentation process container is currently scaled using the Kubernetes built-in Horizontal Pod Autoscaler (HPA) to increase the number of instances. Scaling is based on CPU %, with the assumption that when we have high CPU usage we are processing a high number of messages.

An example HPA definition from a Helm chart is as follows:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: contextualizer
  namespace: default
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: contextualizer
  minReplicas: 1
  maxReplicas: 5
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 75

The value averageUtilization: 75 means that the average CPU utilization is calculated across the current set of pods; if it exceeds 75%, the HPA scales out (roughly, desired replicas = ceil(current replicas × current utilization / 75), so two pods averaging 90% would scale to ceil(2 × 90 / 75) = 3 replicas). Further details on HPA can be found here.

KEDA

An alternative to the HPA is Kubernetes Event-driven Autoscaling (KEDA). As the name suggests, this scaling framework acts on events and metrics observed elsewhere in the system. Further information can be found here.

In our situation we could use the message queue length as the metric that determines when to increase or decrease the number of instances, ensuring the incoming queue is consumed at the desired rate.

Scaling is configured by defining a ScaledObject. An example with Azure Event Hubs is shown below.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: azure-eventhub-scaled-object
  namespace: default
spec:
  scaleTargetRef:
    name: azure-eventhub-function
  minReplicaCount: 0 # Change to define how many minimum replicas you want
  maxReplicaCount: 10
  # The period to wait after the last trigger reported active before scaling the resource back to 0.
  # By default it’s 5 minutes (300 seconds).
  cooldownPeriod: 5
  triggers:
    - type: azure-eventhub
      metadata:
        # Required
        storageConnectionFromEnv: AzureWebJobsStorage
        # Required if not using Pod Identity
        connectionFromEnv: EventHub
        # Required if using Pod Identity
        eventHubNamespace: AzureEventHubNameSpace
        eventHubName: NameOfTheEventHub
        # Optional
        consumerGroup: $Default # default: $Default
        unprocessedEventThreshold: "64" # default 64 events.
        blobContainer: container

The property unprocessedEventThreshold defines the number of unprocessed events at which scaling begins, while minReplicaCount and maxReplicaCount bound the number of replicas we can scale between.

Key Take-aways

Testing

If you are in a situation where performance is a concern, having a quantitative measure of what you are attempting to improve is vital. The means of testing should also be easily repeatable in order to have comparisons before and after any changes.

Assumptions on data

Take care to analyse, and not underestimate, the size of the data to be consumed. The size of the data used by the augmentation process was a significant contributor to the out of memory exceptions and slow processing described above. Had this been identified sooner, the initial system design could have accommodated it.

Additionally, be aware of functions such as the .readall() method of the Azure Storage client library and their effect on memory usage.

Queuing

When working with a system that must process many messages, and where the number of messages can fluctuate, it is advisable to use a queuing mechanism. This allows you to take advantage of common queue features such as retry logic, dead-lettering, and scaling based on queue metrics.