{"id":16318,"date":"2025-08-07T00:00:00","date_gmt":"2025-08-07T07:00:00","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/ise\/?p=16318"},"modified":"2025-08-07T07:44:41","modified_gmt":"2025-08-07T14:44:41","slug":"learnings-from-external-data-handling-learnings","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/ise\/learnings-from-external-data-handling-learnings\/","title":{"rendered":"Learnings from External Data Handling"},"content":{"rendered":"<h1>Intro<\/h1>\n<p>Our <code>ISE<\/code> team at <code>Microsoft<\/code> recently completed an engagement with a large industrial customer. The system we developed was distributed between the edge and the cloud and made use of technologies including <code>Python<\/code>, <code>Azure IoT<\/code>, <code>Kubernetes<\/code>, <code>Redis<\/code> and <code>Dapr<\/code>. The system was built prior to this engagement and our goal for this phase was to make the system production ready and handle a greater scale of messages.<\/p>\n<p>In this system, messages originating from the edge would be sent to the cloud where they would be augmented with additional data from the company\u2019s various internal APIs and document storage. Augmented messages would then be processed further in the system; however, this blog post will focus on features surrounding the augmenting process.<\/p>\n<h2>Problems<\/h2>\n<p>From initial investigations a couple of problems were observed within the message augmentation process.<\/p>\n<h3>Speed of processing<\/h3>\n<p>A single message could take up to as much as a minute to go through the full augmentation process. 
Given our goal of production readiness at greater scale, this was too slow.<\/p>\n<p>Influencing factors:<\/p>\n<ul>\n<li>In some cases the data returned from the APIs was very large.<\/li>\n<li>Only basic caching was in place, and many messages required the same data, resulting in repeated retrieval and processing of that data.<\/li>\n<li>Some of the company\u2019s internal APIs had \u2018rate limiting\u2019 in place, meaning that once a rate limit was hit, we had to wait a set length of time before calling again.<\/li>\n<li>Each message was processed sequentially, and each step of the augmentation was performed sequentially as well.<\/li>\n<\/ul>\n<h3>Out of memory exceptions<\/h3>\n<p>The container performing the augmentation process frequently terminated with exit code <code>137<\/code> \/ <code>OOMKilled<\/code>. This was a significant problem, as it caused the message being processed at the time of the error to be dropped.<\/p>\n<p>The major contributor was the way external documents were read from the Storage Accounts using the <code>.readall()<\/code> method of the <code>Azure storage client library<\/code>. 
With large documents, reading the entire content into memory made it easy to hit the container memory limit and have the container shut down.<\/p>\n<p>The example below, taken from <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/storage\/blobs\/storage-blob-download-python#download-to-a-string\">Download a blob with Python &#8211; Azure Storage | Microsoft Learn<\/a>, downloads the complete contents of the <code>sample-blob.txt<\/code> file using this method and prints them.<\/p>\n<pre><code class=\"language-python\">def download_blob_to_string(self, blob_service_client: BlobServiceClient, container_name):\r\n    blob_client = blob_service_client.get_blob_client(container=container_name, blob=\"sample-blob.txt\")\r\n    # encoding param is necessary for readall() to return str, otherwise it returns bytes\r\n    downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')\r\n    blob_text = downloader.readall()\r\n    print(f\"Blob contents: {blob_text}\")<\/code><\/pre>\n<h3>No queuing<\/h3>\n<p>Every message sent to the cloud was processed immediately; no queuing system was in place. Given anticipated growth in the system and the number of messages it may process, this was deemed a problem.<\/p>\n<h2>Solutions<\/h2>\n<p>This section describes the actions taken to resolve the problems identified above.<\/p>\n<h3>Dedicated Caching Process<\/h3>\n<p>We introduced a separate process with the sole purpose of building a cache of data frequently used between messages. 
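<\/p>\n<p>A minimal sketch of such a cache builder follows. All names here are hypothetical, and <code>fetch_reference_data<\/code> stands in for calls to the customer\u2019s internal APIs and document storage; records are written with an expiry so stale entries age out between refreshes.<\/p>

```python
import json


def fetch_reference_data() -> dict:
    # Hypothetical stand-in for the internal API and document storage lookups.
    return {'site-a': {'region': 'emea'}, 'site-b': {'region': 'apac'}}


def build_cache(kv_set, ttl_seconds: int = 3600) -> int:
    # kv_set is any `set(key, value, ex=ttl)` callable, e.g. redis.Redis().set.
    # Each record goes under a namespaced key with an expiry so that stale
    # entries age out between scheduled refreshes.
    records = fetch_reference_data()
    for key, record in records.items():
        kv_set(f'cache:{key}', json.dumps(record), ex=ttl_seconds)
    return len(records)
```

<p>Pointed at a real <code>redis.Redis<\/code> client this becomes <code>build_cache(redis.Redis(host='localhost').set)<\/code>, invoked on each scheduled refresh.<\/p>\n<p>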
This ran in its own container on a <a href=\"https:\/\/crontab.guru\/\"><code>CRON<\/code><\/a> schedule, allowing easy control of how frequently the cache is updated.<\/p>\n<p>This separation of concerns brought several advantages:<\/p>\n<ul>\n<li>It reduced the need to look up and filter commonly used data from APIs or data storage, resulting in faster message processing.<\/li>\n<li>A failure in accessing data sources no longer results in a failure to process a message.<\/li>\n<li>Memory usage during message processing dropped significantly. Higher memory usage is now only observed while building the cache, which is known and manageable: memory and CPU can be added to the caching container to speed it up, while the lower limits elsewhere no longer affect message processing.<\/li>\n<li>When we scale to multiple instances of the augmentation process, they can all share the same cached data.<\/li>\n<\/ul>\n<h3><code>Redis<\/code> Queue \/ <code>Dapr<\/code><\/h3>\n<p>A cloud-side queue was implemented to ingest edge messages, with the augmentation process consuming messages from the queue for processing.<\/p>\n<p>This was done with a <a href=\"https:\/\/hub.docker.com\/_\/redis\/\"><code>Redis<\/code> container<\/a> for the queue, accessed via <a href=\"https:\/\/dapr.io\"><code>Dapr<\/code><\/a>.<\/p>\n<p><code>Dapr<\/code> was chosen because it provides a single interface over a variety of backend implementations for queuing, storage and more. The team considered other queuing implementations, such as <code>Azure Event Grid<\/code>; with <code>Dapr<\/code> in place, the customer development team can easily rework this part of the architecture.<\/p>\n<p>The code sample below demonstrates publishing events to the queue with the <code>DaprClient<\/code> class. As can be seen, there is no reference to the underlying implementation. 
Example adapted from <a href=\"https:\/\/docs.dapr.io\/developing-applications\/building-blocks\/pubsub\/howto-publish-subscribe\/#publish-a-message\">How to: Publish a message and subscribe to a topic | Dapr Docs<\/a>.<\/p>\n<pre><code class=\"language-python\">import json\r\n\r\nfrom dapr.clients import DaprClient\r\n\r\nwith DaprClient() as client:\r\n    result = client.publish_event(\r\n        pubsub_name=PUBSUB_NAME,\r\n        topic_name=TOPIC_NAME,\r\n        data=json.dumps(my_data),\r\n        data_content_type='application\/json',\r\n    )<\/code><\/pre>\n<p>The next sample shows subscribing to a topic. Sample adapted from <a href=\"https:\/\/docs.dapr.io\/developing-applications\/building-blocks\/pubsub\/subscription-methods\/#declarative-subscriptions\">Declarative, streaming, and programmatic subscription types | Dapr Docs<\/a>.<\/p>\n<pre><code class=\"language-python\">import json\r\nimport logging\r\n\r\nfrom cloudevents.sdk.event import v1\r\nfrom flask import Flask\r\n\r\napp = Flask(__name__)\r\n\r\n@app.route('\/orders', methods=['POST'])\r\ndef process_order(event: v1.Event) -&gt; None:\r\n    data = json.loads(event.Data())\r\n    logging.info('Subscriber received: ' + str(data))<\/code><\/pre>\n<p>The reference to the <code>Redis<\/code> implementation is defined within a <code>Dapr<\/code> <code>Component<\/code> configuration file.<\/p>\n<p>An example <code>Redis<\/code> pub\/sub component definition follows <a href=\"https:\/\/docs.dapr.io\/reference\/components-reference\/supported-pubsub\/setup-redis-pubsub\/\">Redis Streams | Dapr Docs<\/a>.<\/p>\n<pre><code class=\"language-yaml\">apiVersion: dapr.io\/v1alpha1\r\nkind: Component\r\nmetadata:\r\n  name: redis-pubsub\r\nspec:\r\n  type: pubsub.redis\r\n  version: v1\r\n  metadata:\r\n  - name: redisHost\r\n    value: localhost:6379\r\n  - name: redisPassword\r\n    value: \"&lt;Password&gt;\"\r\n  - name: consumerID\r\n    value: \"channel1\"\r\n  - name: useEntraID\r\n    value: \"true\"\r\n  - name: enableTLS\r\n    value: \"false\"<\/code><\/pre>\n<p>The complementary subscription definition is as 
follows <a href=\"https:\/\/docs.dapr.io\/developing-applications\/building-blocks\/pubsub\/howto-publish-subscribe\/#subscribe-to-topics\">How to: Publish a message and subscribe to a topic | Dapr Docs<\/a>.<\/p>\n<pre><code class=\"language-yaml\">apiVersion: dapr.io\/v2alpha1\r\nkind: Subscription\r\nmetadata:\r\n  name: order-pub-sub\r\nspec:\r\n  topic: orders\r\n  routes:\r\n    default: \/orders\r\n  pubsubname: redis-pubsub\r\nscopes:\r\n- orderprocessing<\/code><\/pre>\n<h3>Read docs as a stream<\/h3>\n<p>As previously mentioned, when reading documents from blob storage the system used the <code>.readall()<\/code> method, loading the complete document into memory. It had not been foreseen that documents would be large enough for this to be a problem.<\/p>\n<p>The method of reading was changed to consume documents in chunks as a stream.<\/p>\n<blockquote><p>Note:\nThis approach is best suited to data that can be consumed in chunks or lines,\ne.g. <code>.csv<\/code> or other textual data.<\/p><\/blockquote>\n<p>The example below shows the equivalent of the <a href=\"#out-of-memory-exceptions\">previous sample<\/a>, now iterating over each chunk of the document.<\/p>\n<pre><code class=\"language-python\">def download_blob_chunks(self, blob_service_client: BlobServiceClient, container_name):\r\n    blob_client = blob_service_client.get_blob_client(container=container_name, blob=\"sample-blob.txt\")\r\n    # This returns a StorageStreamDownloader\r\n    stream = blob_client.download_blob()\r\n    chunk_list = []\r\n\r\n    # Read data in chunks to avoid loading all into memory at once\r\n    for chunk in stream.chunks():\r\n        # Process your data (anything can be done here - 'chunk' is a byte array)\r\n        chunk_list.append(chunk)<\/code><\/pre>\n<p>Source: <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/storage\/blobs\/storage-blob-download-python#download-a-blob-in-chunks\">Download a blob with Python &#8211; Azure Storage | Microsoft Learn<\/a>.<\/p>\n<h2>Load 
testing<\/h2>\n<p>Load testing was performed at the beginning of the project and again after the changes discussed above were implemented.<\/p>\n<p>The graph below demonstrates the change in performance. Here we measure the time taken to process a number of messages.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2025\/08\/load-testing-graph.png\" alt=\"load-testing-graph\" \/><\/p>\n<p>As the graph shows, a significant performance improvement was made. These changes contributed to the goal of making the system production ready and able to handle a greater scale of messages.<\/p>\n<h2>Future Improvements<\/h2>\n<h3>Storage with <code>Dapr<\/code><\/h3>\n<p><code>Dapr<\/code> can abstract state storage in a similar fashion to the queue mechanism previously described. A quickstart demo is available <a href=\"https:\/\/docs.dapr.io\/getting-started\/quickstarts\/statemanagement-quickstart\/\">here<\/a>.<\/p>\n<pre><code class=\"language-python\">with DaprClient() as client:\r\n\r\n    # Save state into the state store\r\n    client.save_state(DAPR_STORE_NAME, orderId, str(order))\r\n    logging.info('Saving Order: %s', order)\r\n\r\n    # Get state from the state store\r\n    result = client.get_state(DAPR_STORE_NAME, orderId)\r\n    logging.info('Result after get: ' + str(result.data))\r\n\r\n    # Delete state from the state store\r\n    client.delete_state(store_name=DAPR_STORE_NAME, key=orderId)\r\n    logging.info('Deleting Order: %s', order)<\/code><\/pre>\n<p>Here we use the <code>DaprClient<\/code> class to interface with the underlying state storage implementation. This requires an appropriate state storage component file to be defined. 
The following example is for <a href=\"https:\/\/docs.dapr.io\/reference\/components-reference\/supported-state-stores\/setup-azure-blobstorage\/#component-format\"><code>Azure Blob Storage<\/code><\/a>.<\/p>\n<pre><code class=\"language-yaml\">apiVersion: dapr.io\/v1alpha1\r\nkind: Component\r\nmetadata:\r\n  name: &lt;NAME&gt;\r\nspec:\r\n  type: state.azure.blobstorage\r\n  version: v2\r\n  metadata:\r\n  - name: accountName\r\n    value: \"[your_account_name]\"\r\n  - name: accountKey\r\n    value: \"[your_account_key]\"\r\n  - name: containerName\r\n    value: \"[your_container_name]\"<\/code><\/pre>\n<p>As with the queuing system, this provides the flexibility to change the storage backend via configuration rather than code.<\/p>\n<h3>Scaling<\/h3>\n<h4>Current Scaling<\/h4>\n<p>The augmentation process container is currently scaled using the <code>Kubernetes<\/code> built-in <code>Horizontal Pod Autoscaler<\/code> (<code>HPA<\/code>) to increase the number of instances. 
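<\/p>\n<p>For reference, the <code>HPA<\/code> computes its desired replica count as <code>ceil(currentReplicas * currentMetricValue \/ desiredMetricValue)<\/code>; the short sketch below only illustrates that arithmetic.<\/p>

```python
import math


def desired_replicas(current_replicas: int, current_utilization: float, target_utilization: float) -> int:
    # The HPA formula: ceil(currentReplicas * currentMetricValue / desiredMetricValue)
    return math.ceil(current_replicas * current_utilization / target_utilization)


print(desired_replicas(2, 150, 75))  # scales out from 2 to 4 replicas
```

<p>So two replicas averaging 150% of their requested CPU against a 75% target would be scaled out to four.<\/p>\n<p>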
Scaling is based on CPU percentage, on the assumption that high CPU usage indicates a high number of messages being processed.<\/p>\n<p>An example <code>HPA<\/code> manifest from our <code>Helm<\/code> chart is as follows:<\/p>\n<pre><code class=\"language-yaml\">apiVersion: autoscaling\/v2\r\nkind: HorizontalPodAutoscaler\r\nmetadata:\r\n  name: contextualizer\r\n  namespace: default\r\nspec:\r\n  scaleTargetRef:\r\n    apiVersion: apps\/v1\r\n    kind: Deployment\r\n    name: contextualizer\r\n  minReplicas: 1\r\n  maxReplicas: 5\r\n  metrics:\r\n    - type: Resource\r\n      resource:\r\n        name: cpu\r\n        target:\r\n          type: Utilization\r\n          averageUtilization: 75<\/code><\/pre>\n<p>The value <code>averageUtilization: 75<\/code> means that the average CPU utilization is calculated across the current set of pods; if it exceeds 75%, the system scales out.\nFurther details on <code>HPA<\/code> can be found <a href=\"https:\/\/kubernetes.io\/docs\/tasks\/run-application\/horizontal-pod-autoscale\">here<\/a>.<\/p>\n<h3><code>KEDA<\/code><\/h3>\n<p>An alternative to <code>HPA<\/code> is <code>Kubernetes Event-driven Autoscaling<\/code> (<code>KEDA<\/code>). As the name suggests, this scaling framework acts on events and metrics observed elsewhere in the system. Further information can be found <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/aks\/keda-about\">here<\/a>.<\/p>\n<p>In our situation we could use the message queue length metric to determine when to increase or decrease the number of instances. This would ensure the incoming queue is consumed at the desired rate.<\/p>\n<p>Scaling is achieved by defining a <code>ScaledObject<\/code>. 
An example with <code>Azure EventHub<\/code> is shown below.<\/p>\n<pre><code class=\"language-yaml\">apiVersion: keda.sh\/v1alpha1\r\nkind: ScaledObject\r\nmetadata:\r\n  name: azure-eventhub-scaled-object\r\n  namespace: default\r\nspec:\r\n  scaleTargetRef:\r\n    name: azure-eventhub-function\r\n  minReplicaCount: 0 # Change to define how many minimum replicas you want\r\n  maxReplicaCount: 10\r\n  # The period to wait after the last trigger reported active before scaling the resource back to 0.\r\n  # By default it\u2019s 5 minutes (300 seconds).\r\n  cooldownPeriod: 5\r\n  triggers:\r\n    - type: azure-eventhub\r\n      metadata:\r\n        # Required\r\n        storageConnectionFromEnv: AzureWebJobsStorage\r\n        # Required if not using Pod Identity\r\n        connectionFromEnv: EventHub\r\n        # Required if using Pod Identity\r\n        eventHubNamespace: AzureEventHubNameSpace\r\n        eventHubName: NameOfTheEventHub\r\n        # Optional\r\n        consumerGroup: $Default # default: $Default\r\n        unprocessedEventThreshold: \"64\" # default 64 events.\r\n        blobContainer: container<\/code><\/pre>\n<p>The property <code>unprocessedEventThreshold<\/code> defines the threshold at which the system begins scaling, while <code>minReplicaCount<\/code> and <code>maxReplicaCount<\/code> bound the number of replicas.<\/p>\n<h2>Key Take-aways<\/h2>\n<h3>Testing<\/h3>\n<p>Where performance is a concern, having a quantitative measure of what you are attempting to improve is vital. The means of testing should also be easily repeatable, so that results can be compared before and after any changes.<\/p>\n<h3>Assumptions on data<\/h3>\n<p>Take care to analyse and not underestimate the size of the data to be consumed. 
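<\/p>\n<p>One cheap guard, sketched below with an arbitrary illustrative threshold, is to decide between <code>.readall()<\/code> and chunked streaming up front based on a document\u2019s reported size (for blobs, <code>blob_client.get_blob_properties().size<\/code> returns this without downloading the content).<\/p>

```python
def read_strategy(blob_size_bytes: int, memory_budget_bytes: int = 50 * 1024 * 1024) -> str:
    # blob_size_bytes would come from blob_client.get_blob_properties().size;
    # the 50 MiB budget here is an arbitrary illustrative figure.
    return 'readall' if blob_size_bytes <= memory_budget_bytes else 'stream'


print(read_strategy(1024))               # 'readall'
print(read_strategy(500 * 1024 * 1024))  # 'stream'
```

<p>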
The size of the data used by the augmentation process was a significant contributor to the out of memory exceptions and the slow processing.\nHad this been identified sooner, the initial system design could have been changed to accommodate it.<\/p>\n<p>Additionally, be aware of functions such as the <code>.readall()<\/code> method of the <code>Azure storage client library<\/code> and their effect on memory usage.<\/p>\n<h3>Queuing<\/h3>\n<p>When working with a system that processes many messages, where the number of messages can fluctuate, it is advisable to use a queuing mechanism. This allows you to take advantage of common queue features such as retry logic, dead-lettering and scaling based on queue metrics.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>This blog post discusses the challenges and solutions encountered by the ISE team at Microsoft while making a distributed system production-ready. It focuses on issues including slow processing speeds and out-of-memory exceptions, and provides insights into the methods used to address these problems.<\/p>\n","protected":false},"author":181523,"featured_media":16332,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1,3451],"tags":[60,3443,3430,300,3608,3596,3609],"class_list":["post-16318","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-cse","category-ise","tag-azure","tag-dapr","tag-performance","tag-python","tag-queuing","tag-redis","tag-scaling"],"acf":[],"blog_post_summary":"<p>This blog post discusses the challenges and solutions encountered by the ISE team at Microsoft while making a distributed system production-ready. 
It focuses on issues including slow processing speeds and out-of-memory exceptions, and provides insights into the methods used to address these problems.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/16318","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/users\/181523"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/comments?post=16318"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/16318\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media\/16332"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media?parent=16318"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/categories?post=16318"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/tags?post=16318"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}