B
Beacon Globe News

azure.core.pipeline.transport._base_async — Azure SDK for Python 2.0.0 documentation

Author

Matthew Sanders

Published Feb 16, 2026

# --------------------------------------------------------------------------## Copyright (c) Microsoft Corporation. All rights reserved.## The MIT License (MIT)## Permission is hereby granted, free of charge, to any person obtaining a copy# of this software and associated documentation files (the ""Software""), to# deal in the Software without restriction, including without limitation the# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or# sell copies of the Software, and to permit persons to whom the Software is# furnished to do so, subject to the following conditions:## The above copyright notice and this permission notice shall be included in# all copies or substantial portions of the Software.## THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS# IN THE SOFTWARE.## --------------------------------------------------------------------------import asyncioimport abcfrom collections.abc import AsyncIteratorfrom typing import AsyncIterator as AsyncIteratorType, Generic, TypeVarfrom ._base import ( _HttpResponseBase, _HttpClientTransportResponse, PipelineContext, PipelineRequest, PipelineResponse,)from .._base_async import _await_resulttry: from contextlib import AbstractAsyncContextManager # type: ignoreexcept ImportError: # Python <= 3.7 class AbstractAsyncContextManager(object): # type: ignore async def __aenter__(self): """Return `self` upon entering the runtime context.""" return self @abc.abstractmethod async def __aexit__(self, exc_type, exc_value, traceback): """Raise any exception triggered within the runtime context.""" return NoneAsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType")HTTPResponseType = TypeVar("HTTPResponseType")HTTPRequestType = TypeVar("HTTPRequestType")class _ResponseStopIteration(Exception): passdef _iterate_response_content(iterator): """"To avoid: TypeError: StopIteration interacts badly with generators and cannot be raised into a Future """ try: return next(iterator) except StopIteration: raise _ResponseStopIteration()class _PartGenerator(AsyncIterator): """Until parts is a real async iterator, wrap the sync call. :param parts: An iterable of parts """ def __init__(self, response: "AsyncHttpResponse") -> None: self._response = response self._parts = None async def _parse_response(self): responses = self._response._get_raw_parts( # pylint: disable=protected-access http_response_type=AsyncHttpClientTransportResponse ) if self._response.request.multipart_mixed_info: policies = self._response.request.multipart_mixed_info[ 1 ] # type: List[SansIOHTTPPolicy] async def parse_responses(response): http_request = response.request context = PipelineContext(None) pipeline_request = PipelineRequest(http_request, context) pipeline_response = PipelineResponse( http_request, response, context=context ) for policy in policies: await _await_result( policy.on_response, pipeline_request, pipeline_response ) # Not happy to make this code asyncio specific, but that's multipart only for now # If we need trio and multipart, let's reinvesitgate that later await asyncio.gather(*[parse_responses(res) for res in responses]) return responses async def __anext__(self): if not self._parts: self._parts = iter(await self._parse_response()) try: return next(self._parts) except StopIteration: raise StopAsyncIteration()[docs]class AsyncHttpResponse(_HttpResponseBase): # pylint: disable=abstract-method """An AsyncHttpResponse ABC. Allows for the asynchronous streaming of data from the response. """[docs] def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: """Generator for streaming response body data. Should be implemented by sub-classes if streaming download is supported. Will return an asynchronous generator. :param pipeline: The pipeline object :type pipeline: azure.core.pipeline """[docs] def parts(self) -> AsyncIterator: """Assuming the content-type is multipart/mixed, will return the parts as an async iterator. :rtype: AsyncIterator :raises ValueError: If the content is not multipart/mixed """ if not self.content_type or not self.content_type.startswith("multipart/mixed"): raise ValueError( "You can't get parts if the response is not multipart/mixed" ) return _PartGenerator(self)class AsyncHttpClientTransportResponse(_HttpClientTransportResponse, AsyncHttpResponse): """Create a HTTPResponse from an http.client response. Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. :param HttpRequest request: The request. :param httpclient_response: The object returned from an HTTP(S)Connection from http.client """[docs]class AsyncHttpTransport( AbstractAsyncContextManager, abc.ABC, Generic[HTTPRequestType, AsyncHTTPResponseType],): """An http sender ABC. """[docs] @abc.abstractmethod async def send(self, request, **kwargs): """Send the request using this HTTP sender. """[docs] @abc.abstractmethod async def open(self): """Assign new session if one does not already exist."""[docs] @abc.abstractmethod async def close(self): """Close the session if it is not externally owned."""[docs] async def sleep(self, duration): await asyncio.sleep(duration) def __enter__(self): raise TypeError("Use async with instead") def __exit__(self, exc_type, exc_val, exc_tb): # __exit__ should exist in pair with __enter__ but never executed pass # pragma: no cover