July 28th, 2020

Custom transport in Python SDK: an HTTPX experiment

lmazuel
Principal developer

One of the most important design concepts of the new Azure SDK for Python is that the SDK is modular. This allows us to provide highly customizable HTTP pipelines that users can configure to their own needs. In this article, I will briefly describe the main HTTP pipeline for the Azure SDK for Python, then focus on the HTTP transport and the abstraction behind it. Finally, I will show you how to implement a custom HTTP transport based on the HTTPX library and use it to communicate with Azure Storage.

HTTPX is one of the rising stars of the Python ecosystem. This Python HTTP library is close to the sync API of requests, making migration easy. It also supports async programming, allowing you to use an async programming library like asyncio.

HTTP pipeline architecture

The HTTP pipeline is split into two concepts:

  • The policies: These act on a request before being sent and/or on a response before being returned to the user.
  • The transport: These convert a request prepared by the policies into actual wire calls.

For example, let’s say we want to send a request with a specific UserAgent, using the transport that wraps requests:

import json

from azure.core.pipeline.transport import RequestsTransport, HttpRequest
from azure.core.pipeline.policies import UserAgentPolicy
from azure.core.pipeline import Pipeline

policies = [
    UserAgentPolicy("myapplication/1.0")
]
transport = RequestsTransport()

with Pipeline(transport, policies) as pipeline:
    response = pipeline.run(HttpRequest("GET", "https://httpbin.org/get"))

assert "myapplication/1.0" in json.loads(response.http_response.text())["headers"]["User-Agent"]

The azure-core package supports transports for requests and aiohttp, and provides some standard policies to enable scenarios like retry, proxy, and authentication. The full list of available policies and developer documentation about azure-core can be found on our developer guide for azure-core.

HTTP transport contract

In Python, a commonly used HTTP transport layer is requests. By default all our SDKs will use the requests library to connect to an HTTP server. However, since we have policies, we are not using some of the builtin capabilities of requests. Retry, for instance, has been reimplemented in a generic way, and we disable retry in our transport module. This ensures that the retry behavior is consistent independent of the transport.

The Python world has also recently introduced async programming. We use aiohttp as default async HTTP provider. The benefits of the Azure SDK modular architecture become more obvious here. aiohttp does not, by default, provide any retry mechanism. Since we provide a generic retry policy implementation, not only will users have retry applied to async requests automatically, but this retry strategy is also consistent in behavior with the sync pipeline.

Transport abstraction

The Azure SDK defines a generic HTTPRequest object. This is NOT transport specific and is intended to cover all characteristics of an HTTP request. You could liken this object with the prepared request object from requests. The HTTPRequest stub is as follow:

class HttpRequest:
    def __init__(self, method, url, headers=None, files=None, data=None)

    def set_text_body(self, data): # helper to set text/plain body
    def set_xml_body(self, data): # helper to set application/xml body
    # More helpers for all standard content-type

Example:

req = HttpRequest("POST", "http.//example.org")
req.set_json_body({'creation': True})

HTTP requests are not specific to any given HTTP transport implementation since they only expose HTTP specification concepts. HTTP responses, on the other hand, are implementation-specific because they wrap an actual response. It’s particularly important to wrap and delegate, instead of consuming content for a streaming scenario, for instance. The HTTP response protocol is predictable:

class HttpResponseBase:
    def __init__(self, request, internal_response, block_size=None):
        # type: (HttpRequest, Any, Optional[int]) -> None
        self.request = request
        self.internal_response = internal_response
        self.status_code = None  # type: Optional[int]
        self.headers = {}  # type: Dict[str, str]
        self.reason = None  # type: Optional[str]
        self.content_type = None  # type: Optional[str]
        self.block_size = block_size or 4096  # Default to same as Requests

    def body(self):
        # type: () -> bytes
        """Return the whole body as bytes in memory.
        """
        raise NotImplementedError()

Because the content can be streamed synchronously or asynchronously, we have defined two implementations of this:

class HttpResponse(HttpResponseBase):
    def stream_download(self, pipeline):
        # type: (PipelineType) -> Iterator[bytes]
        """Generator for streaming request body data.
        """

class AsyncHttpResponse(HttpResponseBase):
    def stream_download(self, pipeline) -> AsyncIteratorType[bytes]:
        """Generator for streaming request body data.
        """

Now that we have defined our basic input and output HTTP types, we can understand the HTTP transport abstract class1:

class HttpTransport(
    AbstractContextManager, ABC, Generic[HTTPRequestType, HTTPResponseType]
):

    @abc.abstractmethod
    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Send the request using this HTTP sender.

        :param request: The pipeline request object
        :type request: ~azure.core.transport.HTTPRequest
        :return: The pipeline response object.
        :rtype: ~azure.core.pipeline.transport.HttpResponse
        """

    @abc.abstractmethod
    def open(self):
        """Assign new session if one does not already exist."""

    @abc.abstractmethod
    def close(self):
        """Close the session if it is not externally owned."""

Building a simple HTTPX transport layer

As an example of building a custom transport, we will create a transport for HTTPX and plug it into an existing SDK like azure-storage-blob. Looking at the HTTPX documentation, the simplest possible call is as follow:

>>> import httpx
>>> r = httpx.get('https://www.example.org/')
>>> r.text
'<!doctype html>\n<html>\n<head>\n<title>Example Domain</title>...'

In order to define our own transport, we need to define both an HTTPX response and an HTTPX transport mapping. Luckily, this is mostly a one-to-one mapping:

import httpx
from azure.core.pipeline.transport import HttpResponse

class HttpXTransportResponseBase(HttpResponse):
    def __init__(self,
            request: HttpRequest,
            httpx_response: httpx.Response,
            stream_contextmanager: Optional[ContextManager]=None,
        ):
        super(_HttpXTransportResponseBase, self).__init__(request, httpx_response)
        self.status_code = httpx_response.status_code
        self.headers = httpx_response.headers
        self.reason = httpx_response.reason_phrase
        self.content_type = httpx_response.headers.get('content-type')
        self.stream_contextmanager = stream_contextmanager

def body(self):
    return self.internal_response.content    

def stream_download(self, _) -> Iterator[bytes]:
    return HttpxStreamDownloadGenerator(_, self)

class HttpxStreamDownloadGenerator(object):
    def __init__(self, _, response):
        self.response = response
        self.iter_bytes_func = self.response.internal_response.iter_bytes()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.iter_bytes_func)
        except StopIteration:
            self.response.stream_contextmanager.__exit__()
            raise

class HttpXTransport(HttpTransport): 
    def __init__(self):
        self.client = None

    def open(self):
        self.client = httpx.Client()

    def close(self):
        self.client = None

    def __enter__(self) -> "HttpXTransport":
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        print(f"I was told to send a {request.method} request to {request.url}")

        # Our HTTP transport contract expect "stream" kwarg for a streamable request
        stream_response = kwargs.pop("stream", False)
        parameters = {
            "method": request.method,
            "url": request.url,
            "headers": request.headers.items(),
            "data": request.data,
            "files": request.files,
            **kwargs
        }

        stream_ctx = None  # type: Optional[ContextManager]
        if stream_response:
            stream_ctx = self.client.stream(**parameters)
            response = stream_ctx.__enter__()
        else:
            response = self.client.request(**parameters)

        return HttpXTransportResponse(
            request,
            response,
            stream_contextmanager=stream_ctx,
        )

Using a custom transport in a released Python storage blob SDK

To use this custom transport with a client library within the Azure SDK, simply pass the transport kwarg to the constructor. Taking the Python storage blob SDK, we will start with one call to check the metadata of a public blob, and a second call to download the blob and print the contents to the console.

For the sake of the exercise, we will also inject an HTTP response callback, in order to verify from the inside that the call is indeed done with httpx. The raw_response_hook keyword argument is available on all recent versions of the SDK, with the same API and same abilities. In other words, the same callback can be widely used in Storage, KeyVault, Identity, AppConfiguration, etc.

For this sample to work, just pip install azure-storage-blob.

from azure.storage.blob import BlobClient

def raw_response_hook(pipeline_response):
    print("Checking that I can ask the HTTPX response if I want to: ")
    print(type(pipeline_response.http_response.internal_response))

blob_client = BlobClient(
    'https://lmazuelblog.blob.core.windows.net/',
    'demo',
    'blog.txt',
    transport=HttpXTransport()
)
with blob_client:
    # A non-stream query
    blob = blob_client.get_blob_properties(
        raw_response_hook=raw_response_hook
    )
    print(f"The blob name is {blob.name}\n")

    data = blob_client.download_blob(
        raw_response_hook=raw_response_hook
    )
    print(f"The blob content is {data.content_as_text()}")

Executing the preceding code will output the following text:

> python blog.py
I was told to send a HEAD request to https://lmazuelblog.blob.core.windows.net/demo/blog.txt
Checking that I can ask the HTTPX response if I want to:
<class 'httpx._models.Response'>
The blob name is blog.txt

I was told to send a GET request to https://lmazuelblog.blob.core.windows.net/demo/blog.txt
Checking that I can ask the HTTPX response if I want to:
<class 'httpx._models.Response'>
The blob content is https://devblogs.microsoft.com/azure-sdk/

The full demo, with executable code from this blog post, can be found in a Github gist here.

I want httpx transport out of the box now!

We are working on bringing an HTTPX transport as part of our standard offering in the near future. If you’re excited about it and want this to happen sooner rather than later, feel free to poke us in the comment section or open an issue on the Azure SDK for Python Github repository!

Conclusion

In this blog post, we described:

  • The contract of our custom transport layer, and how it is applied to requests and aiohttp
  • A simple HTTPX transport implementation
  • How to use this custom transport in any of our SDKs (using storage blob as an example)

1: For the sake of simplicity, we show here a simplified version of the sync API using direct type annotations, but the actual definition is compatible with Python 2.7. We also have an async version of it.

Author

lmazuel
Principal developer

Principal developer in the Azure SDK for Python team. Develop in Python since 2008, and the birth of Python 3!

1 comment

Discussion is closed. Login to edit/delete existing comments.

  • Victor VazquezMicrosoft employee

    What happen if you want to have a feature for the transport that is not supported by HTTPX.
    Like something related to SSL, or using a custom http protocol?
    Can you still implement it ? Or are we basically limited to what HTTPX can offer?