{"id":194,"date":"2020-07-28T07:34:53","date_gmt":"2020-07-28T14:34:53","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/azure-sdk\/?p=194"},"modified":"2020-07-23T15:17:23","modified_gmt":"2020-07-23T22:17:23","slug":"custom-transport-in-python-sdk-an-httpx-experiment","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/azure-sdk\/custom-transport-in-python-sdk-an-httpx-experiment\/","title":{"rendered":"Custom transport in Python SDK: an HTTPX experiment"},"content":{"rendered":"<p>One of the most important design concepts of the new Azure SDK for Python is that the SDK is modular. This allows us to provide highly customizable HTTP pipelines that users can configure to their own needs. In this article, I will briefly describe the main HTTP pipeline for the Azure SDK for Python, then focus on the HTTP transport and the abstraction behind it. Finally, I will show you how to implement a custom HTTP transport based on the HTTPX library and use it to communicate with Azure Storage.<\/p>\n<p><a href=\"https:\/\/github.com\/encode\/httpx\">HTTPX<\/a> is one of the rising stars of the Python ecosystem. This Python HTTP library is close to the sync API of <code>requests<\/code>, making migration easy. 
It also supports async programming, so you can use it with an async library like <code>asyncio<\/code>.<\/p>\n<h2>HTTP pipeline architecture<\/h2>\n<p>The HTTP pipeline is split into two concepts:<\/p>\n<ul>\n<li>The policies: These act on a request before it is sent and\/or on a response before it is returned to the user.<\/li>\n<li>The transport: This converts a request prepared by the policies into actual wire calls.<\/li>\n<\/ul>\n<p>For example, let&#8217;s say we want to send a request with a specific User-Agent, using the transport that wraps <code>requests<\/code>:<\/p>\n<pre><code>import json\n\nfrom azure.core.pipeline.transport import RequestsTransport, HttpRequest\nfrom azure.core.pipeline.policies import UserAgentPolicy\nfrom azure.core.pipeline import Pipeline\n\npolicies = [\n    UserAgentPolicy(\"myapplication\/1.0\")\n]\ntransport = RequestsTransport()\n\nwith Pipeline(transport, policies) as pipeline:\n    response = pipeline.run(HttpRequest(\"GET\", \"https:\/\/httpbin.org\/get\"))\n\nassert \"myapplication\/1.0\" in json.loads(response.http_response.text())[\"headers\"][\"User-Agent\"]\n<\/code><\/pre>\n<p>The <code>azure-core<\/code> package supports transports for <code>requests<\/code> and <code>aiohttp<\/code>, and provides some standard policies to enable scenarios like retry, proxy, and authentication. The full list of available policies and developer documentation about azure-core can be found in our <a href=\"https:\/\/github.com\/Azure\/azure-sdk-for-python\/blob\/master\/sdk\/core\/azure-core\/CLIENT_LIBRARY_DEVELOPER.md\">developer guide for azure-core<\/a>.<\/p>\n<h2>HTTP transport contract<\/h2>\n<p>In Python, a commonly used HTTP transport layer is <code>requests<\/code>. By default, all our SDKs use the <code>requests<\/code> library to connect to an HTTP server. However, since we have policies, we are not using some of the built-in capabilities of <code>requests<\/code>. 
Retry, for instance, has been reimplemented in a generic way, and we disable retry in our transport module. This ensures that the retry behavior is consistent regardless of the transport.<\/p>\n<p>The Python world has also recently introduced async programming. We use <code>aiohttp<\/code> as the default async HTTP provider. The benefits of the Azure SDK modular architecture become more obvious here. <code>aiohttp<\/code> does not, by default, provide any retry mechanism. Since we provide a generic retry policy implementation, not only will users have retry applied to async requests automatically, but this retry strategy is also consistent in behavior with the sync pipeline.<\/p>\n<h2>Transport abstraction<\/h2>\n<p>The Azure SDK defines a generic <code>HttpRequest<\/code> object. This is NOT transport specific and is intended to cover all characteristics of an HTTP request. You could liken this object to the prepared request object from <code>requests<\/code>. The <code>HttpRequest<\/code> stub is as follows:<\/p>\n<pre><code>class HttpRequest:\n    def __init__(self, method, url, headers=None, files=None, data=None): ...\n\n    def set_text_body(self, data): ...  # helper to set text\/plain body\n    def set_xml_body(self, data): ...  # helper to set application\/xml body\n    # More helpers for all standard content types\n<\/code><\/pre>\n<p>Example:<\/p>\n<pre><code>from azure.core.pipeline.transport import HttpRequest\n\nreq = HttpRequest(\"POST\", \"http:\/\/example.org\")\nreq.set_json_body({'creation': True})\n<\/code><\/pre>\n<p>HTTP requests are not specific to any given HTTP transport implementation since they only expose HTTP specification concepts. HTTP responses, on the other hand, are implementation-specific because they wrap an actual response. Wrapping and delegating, rather than consuming the content up front, is particularly important for streaming scenarios. 
The HTTP response protocol is predictable:<\/p>\n<pre><code>class HttpResponseBase:\n    def __init__(self, request, internal_response, block_size=None):\n        # type: (HttpRequest, Any, Optional[int]) -&gt; None\n        self.request = request\n        self.internal_response = internal_response\n        self.status_code = None  # type: Optional[int]\n        self.headers = {}  # type: Dict[str, str]\n        self.reason = None  # type: Optional[str]\n        self.content_type = None  # type: Optional[str]\n        self.block_size = block_size or 4096  # Default to same as Requests\n\n    def body(self):\n        # type: () -&gt; bytes\n        \"\"\"Return the whole body as bytes in memory.\n        \"\"\"\n        raise NotImplementedError()\n<\/code><\/pre>\n<p>Because the content can be streamed synchronously or asynchronously, we have defined two implementations of this:<\/p>\n<pre><code>class HttpResponse(HttpResponseBase):\n    def stream_download(self, pipeline):\n        # type: (PipelineType) -&gt; Iterator[bytes]\n        \"\"\"Generator for streaming request body data.\n        \"\"\"\n\nclass AsyncHttpResponse(HttpResponseBase):\n    def stream_download(self, pipeline) -&gt; AsyncIteratorType[bytes]:\n        \"\"\"Generator for streaming request body data.\n        \"\"\"\n<\/code><\/pre>\n<p>Now that we have defined our basic input and output HTTP types, we can understand the HTTP transport abstract class<a href=\"#note1\">1<\/a>:<\/p>\n<pre><code>class HttpTransport(\n    AbstractContextManager, ABC, Generic[HTTPRequestType, HTTPResponseType]\n):\n\n    @abc.abstractmethod\n    def send(self, request: HttpRequest, **kwargs) -&gt; HttpResponse:\n        \"\"\"Send the request using this HTTP sender.\n\n        :param request: The pipeline request object\n        :type request: ~azure.core.transport.HTTPRequest\n        :return: The pipeline response object.\n        :rtype: ~azure.core.pipeline.transport.HttpResponse\n        \"\"\"\n\n    
@abc.abstractmethod\n    def open(self):\n        \"\"\"Assign new session if one does not already exist.\"\"\"\n\n    @abc.abstractmethod\n    def close(self):\n        \"\"\"Close the session if it is not externally owned.\"\"\"\n<\/code><\/pre>\n<h3>Building a simple HTTPX transport layer<\/h3>\n<p>As an example of building a custom transport, we will create a transport for HTTPX and plug it into an existing SDK like <a href=\"https:\/\/pypi.org\/project\/azure-storage-blob\/\">azure-storage-blob<\/a>. Looking at the <a href=\"https:\/\/www.python-httpx.org\/\">HTTPX documentation<\/a>, the simplest possible call is as follows:<\/p>\n<pre><code>&gt;&gt;&gt; import httpx\n&gt;&gt;&gt; r = httpx.get('https:\/\/www.example.org\/')\n&gt;&gt;&gt; r.text\n'&lt;!doctype html&gt;\\n&lt;html&gt;\\n&lt;head&gt;\\n&lt;title&gt;Example Domain&lt;\/title&gt;...'\n<\/code><\/pre>\n<p>To define our own transport, we need to define both an HTTPX response wrapper and an HTTPX transport. Luckily, this is mostly a one-to-one mapping:<\/p>\n<pre><code>from typing import ContextManager, Iterator, Optional\n\nimport httpx\nfrom azure.core.pipeline.transport import HttpRequest, HttpResponse, HttpTransport\n\nclass HttpXTransportResponse(HttpResponse):\n    def __init__(self,\n            request: HttpRequest,\n            httpx_response: httpx.Response,\n            stream_contextmanager: Optional[ContextManager]=None,\n        ):\n        super().__init__(request, httpx_response)\n        self.status_code = httpx_response.status_code\n        self.headers = httpx_response.headers\n        self.reason = httpx_response.reason_phrase\n        self.content_type = httpx_response.headers.get('content-type')\n        self.stream_contextmanager = stream_contextmanager\n\n    def body(self):\n        return self.internal_response.content\n\n    def stream_download(self, _) -&gt; Iterator[bytes]:\n        return HttpxStreamDownloadGenerator(_, self)\n\nclass HttpxStreamDownloadGenerator(object):\n    def __init__(self, _, response):\n        self.response = response\n        self.iter_bytes_func = self.response.internal_response.iter_bytes()\n\n    def __iter__(self):\n        return self\n\n    def __next__(self):\n        try:\n            return next(self.iter_bytes_func)\n        except StopIteration:\n            # Leave the httpx stream context once the body is fully consumed\n            if self.response.stream_contextmanager is not None:\n                self.response.stream_contextmanager.__exit__(None, None, None)\n            raise\n\nclass HttpXTransport(HttpTransport):\n    def __init__(self):\n        self.client = None\n\n    def open(self):\n        self.client = httpx.Client()\n\n    def close(self):\n        # Release the underlying connections before dropping the client\n        if self.client is not None:\n            self.client.close()\n            self.client = None\n\n    def __enter__(self) -&gt; \"HttpXTransport\":\n        self.open()\n        return self\n\n    def __exit__(self, *args):\n        self.close()\n\n    def send(self, request: HttpRequest, **kwargs) -&gt; HttpResponse:\n        print(f\"I was told to send a {request.method} request to {request.url}\")\n\n        # Our HTTP transport contract expects a \"stream\" kwarg for a streamable request\n        stream_response = kwargs.pop(\"stream\", False)\n        parameters = {\n            \"method\": request.method,\n            \"url\": request.url,\n            \"headers\": request.headers.items(),\n            \"data\": request.data,\n            \"files\": request.files,\n            **kwargs\n        }\n\n        stream_ctx = None  # type: Optional[ContextManager]\n        if stream_response:\n            stream_ctx = self.client.stream(**parameters)\n            response = stream_ctx.__enter__()\n        else:\n            response = self.client.request(**parameters)\n\n        return HttpXTransportResponse(\n            request,\n            response,\n            stream_contextmanager=stream_ctx,\n        )\n<\/code><\/pre>\n<h3>Using a custom transport in a released Python storage blob SDK<\/h3>\n<p>To use this custom transport with a client library within the Azure SDK, simply pass it as the <code>transport<\/code> keyword argument to the client constructor. 
Taking the Python storage blob SDK, we will start with one call to check the metadata of a public blob, and a second call to download the blob and print the contents to the console.<\/p>\n<p>For the sake of the exercise, we will also inject an HTTP response callback, in order to verify from the inside that the call is indeed made with <code>httpx<\/code>. The <code>raw_response_hook<\/code> keyword argument is available on all recent versions of the SDK, with the same API and same abilities. In other words, the same callback can be widely used in <a href=\"https:\/\/pypi.org\/project\/azure-storage-blob\/\">Storage<\/a>, <a href=\"https:\/\/pypi.org\/project\/azure-keyvault-secrets\/\">KeyVault<\/a>, <a href=\"https:\/\/pypi.org\/project\/azure-identity\/\">Identity<\/a>, <a href=\"https:\/\/pypi.org\/project\/azure-appconfiguration\/\">AppConfiguration<\/a>, etc.<\/p>\n<p>For this sample to work, just <code>pip install azure-storage-blob httpx<\/code>.<\/p>\n<pre><code>from azure.storage.blob import BlobClient\n\ndef raw_response_hook(pipeline_response):\n    print(\"Checking that I can ask the HTTPX response if I want to: \")\n    print(type(pipeline_response.http_response.internal_response))\n\nblob_client = BlobClient(\n    'https:\/\/lmazuelblog.blob.core.windows.net\/',\n    'demo',\n    'blog.txt',\n    transport=HttpXTransport()\n)\nwith blob_client:\n    # A non-stream query\n    blob = blob_client.get_blob_properties(\n        raw_response_hook=raw_response_hook\n    )\n    print(f\"The blob name is {blob.name}\\n\")\n\n    # A stream query\n    data = blob_client.download_blob(\n        raw_response_hook=raw_response_hook\n    )\n    print(f\"The blob content is {data.content_as_text()}\")\n<\/code><\/pre>\n<p>Executing the preceding code will output the following text:<\/p>\n<pre><code>&gt; python blog.py\nI was told to send a HEAD request to https:\/\/lmazuelblog.blob.core.windows.net\/demo\/blog.txt\nChecking that I can ask the HTTPX response if I want to:\n&lt;class 'httpx._models.Response'&gt;\nThe blob name is blog.txt\n\nI was told to send a GET request to https:\/\/lmazuelblog.blob.core.windows.net\/demo\/blog.txt\nChecking that I can ask the HTTPX response if I want to:\n&lt;class 'httpx._models.Response'&gt;\nThe blob content is https:\/\/devblogs.microsoft.com\/azure-sdk\/\n<\/code><\/pre>\n<p>The full demo, with executable code from this blog post, can be found in a GitHub gist <a href=\"https:\/\/gist.github.com\/lmazuel\/8ca3a8462a3167c5e5587afac9ada155\">here<\/a>.<\/p>\n<h3>I want httpx transport out of the box now!<\/h3>\n<p>We are working on bringing an HTTPX transport as part of our standard offering in the near future. If you&#8217;re excited about it and want this to happen sooner rather than later, feel free to poke us in the comment section or open an issue on the <a href=\"https:\/\/github.com\/Azure\/azure-sdk-for-python\/issues\">Azure SDK for Python GitHub repository<\/a>!<\/p>\n<h2>Conclusion<\/h2>\n<p>In this blog post, we described:<\/p>\n<ul>\n<li>The contract of our custom transport layer, and how it is applied to <code>requests<\/code> and <code>aiohttp<\/code><\/li>\n<li>A simple HTTPX transport implementation<\/li>\n<li>How to use this custom transport in any of our SDKs (using storage blob as an example)<\/li>\n<\/ul>\n<p><a name=\"note1\"><\/a><a href=\"https:\/\/www.python-httpx.org\/\">1<\/a>: For the sake of simplicity, we show here a simplified version of the sync API using direct type annotations, but the actual definition is compatible with Python 2.7. We also have an async version of it.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>One of the most important design concepts of the new Azure SDK for Python is that the SDK is modular. This allows us to provide highly customizable HTTP pipelines that users can configure to their own needs. 
In this article, I will briefly describe the main HTTP pipeline for the Azure SDK for Python, then focus on the HTTP transport and the ab<\/p>\n","protected":false},"author":812,"featured_media":96,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[683,686,685,684,162],"class_list":["post-194","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-azure-sdk","tag-http","tag-http-pipeline","tag-http-transport","tag-httpx","tag-python"],"acf":[],"blog_post_summary":"<p>One of the most important design concepts of the new Azure SDK for Python is that the SDK is modular. This allows us to provide highly customizable HTTP pipelines that users can configure to their own needs. In this article, I will briefly describe the main HTTP pipeline for the Azure SDK for Python, then focus on the HTTP transport and the ab<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts\/194","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/users\/812"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/comments?post=194"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts\/194\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/media\/96"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/media?parent=194"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp
-json\/wp\/v2\/categories?post=194"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/tags?post=194"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}