One of the most important design concepts of the new Azure SDK for Python is that the SDK is modular. This allows us to provide highly customizable HTTP pipelines that users can configure to their own needs. In this article, I will briefly describe the main HTTP pipeline for the Azure SDK for Python, then focus on the HTTP transport and the abstraction behind it. Finally, I will show you how to implement a custom HTTP transport based on the HTTPX library and use it to communicate with Azure Storage.
HTTPX is one of the rising stars of the Python ecosystem. This Python HTTP library is close to the sync API of requests, making migration easy. It also supports async programming, allowing you to use an async programming library like asyncio.
HTTP pipeline architecture
The HTTP pipeline is split into two concepts:
- The policies: These act on a request before it is sent and/or on a response before it is returned to the user.
- The transport: This converts a request prepared by the policies into actual wire calls.
For example, let’s say we want to send a request with a specific User-Agent, using the transport that wraps requests:
import json

from azure.core.pipeline.transport import RequestsTransport, HttpRequest
from azure.core.pipeline.policies import UserAgentPolicy
from azure.core.pipeline import Pipeline

policies = [
    UserAgentPolicy("myapplication/1.0")
]
transport = RequestsTransport()

with Pipeline(transport, policies) as pipeline:
    response = pipeline.run(HttpRequest("GET", "https://httpbin.org/get"))
    assert "myapplication/1.0" in json.loads(response.http_response.text())["headers"]["User-Agent"]
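To make the policy/transport split concrete without installing anything, here is a toy model written with the standard library only. Every name in it (ToyRequest, ToyUserAgentPolicy, ToyEchoTransport, ToyPipeline) is hypothetical and invented for illustration; it is a simplified sketch of the idea, not how azure-core is implemented.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyRequest:
    """Hypothetical stand-in for a transport-agnostic request."""
    method: str
    url: str
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class ToyResponse:
    """Hypothetical response that remembers the request that produced it."""
    status_code: int
    request: ToyRequest


class ToyUserAgentPolicy:
    """Policy: edits the request before it reaches the transport."""

    def __init__(self, user_agent: str) -> None:
        self.user_agent = user_agent

    def on_request(self, request: ToyRequest) -> None:
        request.headers["User-Agent"] = self.user_agent


class ToyEchoTransport:
    """Transport: the only piece that would touch the network.

    This fake one never opens a socket and always answers 200.
    """

    def send(self, request: ToyRequest) -> ToyResponse:
        return ToyResponse(status_code=200, request=request)


class ToyPipeline:
    """Runs every policy in order, then hands the request to the transport."""

    def __init__(self, transport: ToyEchoTransport, policies: List[ToyUserAgentPolicy]) -> None:
        self.transport = transport
        self.policies = policies

    def run(self, request: ToyRequest) -> ToyResponse:
        for policy in self.policies:
            policy.on_request(request)
        return self.transport.send(request)


toy_pipeline = ToyPipeline(ToyEchoTransport(), [ToyUserAgentPolicy("myapplication/1.0")])
toy_response = toy_pipeline.run(ToyRequest("GET", "https://httpbin.org/get"))
print(toy_response.request.headers)
```

Because the policy never talks to the transport directly, swapping ToyEchoTransport for any other object with a send method leaves the policy code untouched, which is the property the real pipeline relies on.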
The azure-core package supports transports for requests and aiohttp, and provides some standard policies to enable scenarios like retry, proxy, and authentication. The full list of available policies and developer documentation about azure-core can be found on our developer guide for azure-core.
HTTP transport contract
In Python, a commonly used HTTP transport layer is requests. By default, all our SDKs use the requests library to connect to an HTTP server. However, since we have policies, we are not using some of the built-in capabilities of requests. Retry, for instance, has been reimplemented in a generic way, and we disable retry in our transport module. This ensures that the retry behavior is consistent independent of the transport.
The Python world has also recently introduced async programming. We use aiohttp as the default async HTTP provider. The benefits of the Azure SDK modular architecture become more obvious here. aiohttp does not, by default, provide any retry mechanism. Since we provide a generic retry policy implementation, not only will users have retry applied to async requests automatically, but this retry strategy is also consistent in behavior with the sync pipeline.
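The idea of a retry that lives outside the transport can be sketched with the standard library alone. The names below (ToyTransportError, FlakyTransport, send_with_retry) are hypothetical, and the exponential backoff is an assumption made for the example rather than a description of the actual azure-core retry policy.

```python
import time


class ToyTransportError(Exception):
    """Hypothetical error raised by a transport when a call fails."""


class FlakyTransport:
    """Fake transport that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def send(self, request: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ToyTransportError("connection dropped")
        return f"200 OK for {request}"


def send_with_retry(transport, request, max_retries=3, backoff_seconds=0.0):
    """Retry logic that only relies on transport.send().

    Any transport exposing send() gets the same retry behavior,
    whether or not the underlying HTTP library has retries of its own.
    """
    for attempt in range(max_retries + 1):
        try:
            return transport.send(request)
        except ToyTransportError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            time.sleep(backoff_seconds * (2 ** attempt))


flaky = FlakyTransport(failures=2)
result = send_with_retry(flaky, "GET /")
print(result, "after", flaky.calls, "calls")
```

The retry function knows nothing about how FlakyTransport works internally, so the same loop would behave identically on top of a transport wrapping requests, aiohttp, or anything else.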
Transport abstraction
The Azure SDK defines a generic HttpRequest object. This is NOT transport specific and is intended to cover all characteristics of an HTTP request. You could liken this object to the prepared request object from requests. The HttpRequest stub is as follows:
class HttpRequest:
    def __init__(self, method, url, headers=None, files=None, data=None): ...
    def set_text_body(self, data): ...  # helper to set text/plain body
    def set_xml_body(self, data): ...  # helper to set application/xml body
    # More helpers for all standard content-type
Example:
req = HttpRequest("POST", "http://example.org")
req.set_json_body({'creation': True})
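As a rough illustration of what such a body helper does, the sketch below serializes the payload and sets the matching headers on a toy request class. ToyHttpRequest is hypothetical, and the exact headers set here are an assumption; the real HttpRequest helpers may differ in detail.

```python
import json


class ToyHttpRequest:
    """Hypothetical, simplified request: only HTTP concepts, no transport."""

    def __init__(self, method, url, headers=None, data=None):
        self.method = method
        self.url = url
        self.headers = dict(headers or {})
        self.data = data

    def set_json_body(self, data):
        """Serialize data to JSON and set the headers describing the body."""
        self.data = json.dumps(data)
        self.headers["Content-Type"] = "application/json"
        self.headers["Content-Length"] = str(len(self.data.encode("utf-8")))


toy_req = ToyHttpRequest("POST", "http://example.org")
toy_req.set_json_body({"creation": True})
print(toy_req.data, toy_req.headers)
```

Nothing in this object refers to a socket or a session, which is what lets any transport pick it up and translate it into its own call.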
HTTP requests are not specific to any given HTTP transport implementation since they only expose HTTP specification concepts. HTTP responses, on the other hand, are implementation-specific because they wrap an actual response. Wrapping and delegating, rather than eagerly consuming the content, matters most in streaming scenarios. The HTTP response protocol is predictable:
class HttpResponseBase:
    def __init__(self, request, internal_response, block_size=None):
        # type: (HttpRequest, Any, Optional[int]) -> None
        self.request = request
        self.internal_response = internal_response
        self.status_code = None  # type: Optional[int]
        self.headers = {}  # type: Dict[str, str]
        self.reason = None  # type: Optional[str]
        self.content_type = None  # type: Optional[str]
        self.block_size = block_size or 4096  # Default to same as Requests

    def body(self):
        # type: () -> bytes
        """Return the whole body as bytes in memory.
        """
        raise NotImplementedError()
Because the content can be streamed synchronously or asynchronously, we have defined two implementations of this:
class HttpResponse(HttpResponseBase):
    def stream_download(self, pipeline):
        # type: (PipelineType) -> Iterator[bytes]
        """Generator for streaming response body data.
        """

class AsyncHttpResponse(HttpResponseBase):
    def stream_download(self, pipeline) -> AsyncIteratorType[bytes]:
        """Generator for streaming response body data.
        """
Now that we have defined our basic input and output HTTP types, we can understand the HTTP transport abstract class [1]:
class HttpTransport(
    AbstractContextManager, ABC, Generic[HTTPRequestType, HTTPResponseType]
):
    @abc.abstractmethod
    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Send the request using this HTTP sender.

        :param request: The pipeline request object
        :type request: ~azure.core.pipeline.transport.HttpRequest
        :return: The pipeline response object.
        :rtype: ~azure.core.pipeline.transport.HttpResponse
        """

    @abc.abstractmethod
    def open(self):
        """Assign new session if one does not already exist."""

    @abc.abstractmethod
    def close(self):
        """Close the session if it is not externally owned."""
Building a simple HTTPX transport layer
As an example of building a custom transport, we will create a transport for HTTPX and plug it into an existing SDK like azure-storage-blob. Looking at the HTTPX documentation, the simplest possible call is as follows:
>>> import httpx
>>> r = httpx.get('https://www.example.org/')
>>> r.text
'<!doctype html>\n<html>\n<head>\n<title>Example Domain</title>...'
In order to define our own transport, we need to define both an HTTPX response and an HTTPX transport mapping. Luckily, this is mostly a one-to-one mapping:
from typing import ContextManager, Iterator, Optional

import httpx
from azure.core.pipeline.transport import HttpRequest, HttpResponse, HttpTransport


class HttpXTransportResponse(HttpResponse):
    def __init__(self,
            request: HttpRequest,
            httpx_response: httpx.Response,
            stream_contextmanager: Optional[ContextManager] = None,
        ):
        super().__init__(request, httpx_response)
        self.status_code = httpx_response.status_code
        self.headers = httpx_response.headers
        self.reason = httpx_response.reason_phrase
        self.content_type = httpx_response.headers.get('content-type')
        self.stream_contextmanager = stream_contextmanager

    def body(self):
        return self.internal_response.content

    def stream_download(self, _) -> Iterator[bytes]:
        return HttpxStreamDownloadGenerator(_, self)


class HttpxStreamDownloadGenerator(object):
    def __init__(self, _, response):
        self.response = response
        self.iter_bytes_func = self.response.internal_response.iter_bytes()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.iter_bytes_func)
        except StopIteration:
            # Body fully consumed: release the streaming connection, if any
            if self.response.stream_contextmanager is not None:
                self.response.stream_contextmanager.__exit__(None, None, None)
            raise


class HttpXTransport(HttpTransport):
    def __init__(self):
        self.client = None

    def open(self):
        self.client = httpx.Client()

    def close(self):
        # Close the underlying client so its connections are released
        if self.client is not None:
            self.client.close()
        self.client = None

    def __enter__(self) -> "HttpXTransport":
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        print(f"I was told to send a {request.method} request to {request.url}")

        # Our HTTP transport contract expects a "stream" kwarg for a streamable request
        stream_response = kwargs.pop("stream", False)

        parameters = {
            "method": request.method,
            "url": request.url,
            "headers": request.headers.items(),
            "data": request.data,
            "files": request.files,
            **kwargs
        }

        stream_ctx = None  # type: Optional[ContextManager]
        if stream_response:
            # Enter the stream context manually; it is exited once the body is consumed
            stream_ctx = self.client.stream(**parameters)
            response = stream_ctx.__enter__()
        else:
            response = self.client.request(**parameters)

        return HttpXTransportResponse(
            request,
            response,
            stream_contextmanager=stream_ctx,
        )
Using a custom transport in a released Python storage blob SDK
To use this custom transport with a client library within the Azure SDK, simply pass the transport kwarg to the constructor. Taking the Python storage blob SDK, we will start with one call to check the metadata of a public blob, and a second call to download the blob and print the contents to the console.
For the sake of the exercise, we will also inject an HTTP response callback, in order to verify from the inside that the call is indeed done with httpx. The raw_response_hook keyword argument is available on all recent versions of the SDK, with the same API and same abilities. In other words, the same callback can be widely used in Storage, KeyVault, Identity, AppConfiguration, etc.
For this sample to work, just pip install azure-storage-blob and httpx.
from azure.storage.blob import BlobClient

def raw_response_hook(pipeline_response):
    print("Checking that I can ask the HTTPX response if I want to: ")
    print(type(pipeline_response.http_response.internal_response))

blob_client = BlobClient(
    'https://lmazuelblog.blob.core.windows.net/',
    'demo',
    'blog.txt',
    transport=HttpXTransport()
)

with blob_client:
    # A non-stream query
    blob = blob_client.get_blob_properties(
        raw_response_hook=raw_response_hook
    )
    print(f"The blob name is {blob.name}\n")

    data = blob_client.download_blob(
        raw_response_hook=raw_response_hook
    )
    print(f"The blob content is {data.content_as_text()}")
Executing the preceding code will output the following text:
> python blog.py
I was told to send a HEAD request to https://lmazuelblog.blob.core.windows.net/demo/blog.txt
Checking that I can ask the HTTPX response if I want to:
<class 'httpx._models.Response'>
The blob name is blog.txt
I was told to send a GET request to https://lmazuelblog.blob.core.windows.net/demo/blog.txt
Checking that I can ask the HTTPX response if I want to:
<class 'httpx._models.Response'>
The blob content is https://devblogs.microsoft.com/azure-sdk/
The full demo, with executable code from this blog post, can be found in a GitHub gist.
I want httpx transport out of the box now!
We are working on bringing an HTTPX transport as part of our standard offering in the near future. If you’re excited about it and want this to happen sooner rather than later, feel free to poke us in the comment section or open an issue on the Azure SDK for Python GitHub repository!
Conclusion
In this blog post, we described:
- The contract of our custom transport layer, and how it is applied to requests and aiohttp
- A simple HTTPX transport implementation
- How to use this custom transport in any of our SDKs (using storage blob as an example)
1: For the sake of simplicity, we show here a simplified version of the sync API using direct type annotations, but the actual definition is compatible with Python 2.7. We also have an async version of it.
What happens if you want a transport feature that HTTPX does not support, such as something related to SSL or a custom HTTP protocol?
Can you still implement it, or are you basically limited to what HTTPX can offer?