|
- # -*- coding: utf-8 -*-
- # Copyright (c) 2015 Ian Stapleton Cordasco
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Module containing the urlparse compatibility logic."""
- from collections import namedtuple
-
- from . import compat
- from . import exceptions
- from . import misc
- from . import normalizers
- from . import uri
-
- __all__ = ('ParseResult', 'ParseResultBytes')
-
# Ordered field names used for the ParseResult and ParseResultBytes
# namedtuples defined in this module.
PARSED_COMPONENTS = (
    'scheme',
    'userinfo',
    'host',
    'port',
    'path',
    'query',
    'fragment',
)
-
-
class ParseResultMixin(object):
    """Behaviour shared by the text and bytes parse-result classes.

    The class mixing this in is expected to expose ``userinfo``, ``host``,
    ``port``, ``query``, ``authority`` and ``encoding`` attributes as well as
    an ``unsplit()`` method; everything below is written in terms of those.
    """

    def _generate_authority(self, attributes):
        """Return the authority that matches the given components.

        :param dict attributes: Mapping that holds at least the ``userinfo``,
            ``host`` and ``port`` entries of the result being built.
        :returns: ``self.authority`` when none of the three entries differs
            from the value currently stored on ``self``; otherwise the result
            of ``normalizers.normalize_authority`` for the new components.
        """
        new_userinfo = attributes['userinfo']
        new_host = attributes['host']
        new_port = attributes['port']

        changed = (self.userinfo != new_userinfo or
                   self.host != new_host or
                   self.port != new_port)
        if not changed:
            # Nothing relevant was replaced, so the existing authority
            # can be reused as-is.
            return self.authority

        # Only truthy ports are rendered to text; falsy values (None, 0,
        # empty) are handed to the normalizer untouched.
        port_text = '{0}'.format(new_port) if new_port else new_port
        authority_parts = (
            compat.to_str(new_userinfo, self.encoding),
            compat.to_str(new_host, self.encoding),
            port_text,
        )
        return normalizers.normalize_authority(authority_parts)

    def geturl(self):
        """Return ``self.unsplit()``; mirrors the standard library name."""
        return self.unsplit()

    @property
    def hostname(self):
        """Alias of ``host``; mirrors the standard library name."""
        return self.host

    @property
    def netloc(self):
        """Alias of ``authority``; mirrors the standard library name."""
        return self.authority

    @property
    def params(self):
        """Alias of ``query``; mirrors the standard library name."""
        return self.query
-
-
class ParseResult(namedtuple('ParseResult', PARSED_COMPONENTS),
                  ParseResultMixin):
    """Implementation of urlparse compatibility class.

    This uses the URIReference logic to handle compatibility with the
    urlparse.ParseResult class.

    Besides the tuple fields named in ``PARSED_COMPONENTS``, ``__new__``
    attaches two extra attributes to every instance: ``encoding`` and
    ``reference`` (the ``uri_ref`` object the result was built from).
    """

    # NOTE(review): this is a plain class attribute, not ``__slots__``.
    # Instances therefore keep a ``__dict__``, which ``__new__`` relies on
    # when it attaches ``encoding`` and ``reference``.
    slots = ()

    def __new__(cls, scheme, userinfo, host, port, path, query, fragment,
                uri_ref, encoding='utf-8'):
        """Create a new ParseResult.

        Falsy ``scheme``, ``userinfo``, ``port`` and ``path`` values are
        stored as ``None``; ``host``, ``query`` and ``fragment`` are stored
        exactly as given.

        :param uri_ref: Reference object stored as ``self.reference``.
        :param str encoding: Encoding stored as ``self.encoding``.
        """
        parse_result = super(ParseResult, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment)
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        return parse_result

    @classmethod
    def from_parts(cls, scheme=None, userinfo=None, host=None, port=None,
                   path=None, query=None, fragment=None, encoding='utf-8'):
        """Create a ParseResult instance from its parts.

        The authority is assembled as ``userinfo@host:port`` from whichever
        of the three pieces is not ``None``. The parts are then wrapped in a
        ``uri.URIReference`` which is normalized, and the components of the
        returned instance are read back from that normalized reference.

        :returns: Instance of ``cls``.
        :raises: Whatever ``authority_from(..., strict=True)`` raises for the
            assembled authority.
        """
        authority = ''
        if userinfo is not None:
            authority += userinfo + '@'
        if host is not None:
            authority += host
        if port is not None:
            authority += ':{0}'.format(port)
        uri_ref = uri.URIReference(scheme=scheme,
                                   authority=authority,
                                   path=path,
                                   query=query,
                                   fragment=fragment,
                                   encoding=encoding).normalize()
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(scheme=uri_ref.scheme,
                   userinfo=userinfo,
                   host=host,
                   port=port,
                   path=uri_ref.path,
                   query=uri_ref.query,
                   fragment=uri_ref.fragment,
                   uri_ref=uri_ref,
                   encoding=encoding)

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8', strict=True,
                    lazy_normalize=True):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted. When True (the
            default) the reference is used as parsed.
        :returns: :class:`ParseResult` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        return cls(scheme=reference.scheme,
                   userinfo=userinfo,
                   host=host,
                   port=port,
                   path=reference.path,
                   query=reference.query,
                   fragment=reference.fragment,
                   uri_ref=reference,
                   encoding=encoding)

    @property
    def authority(self):
        """Return the authority of the underlying reference."""
        return self.reference.authority

    def copy_with(self, scheme=misc.UseExisting, userinfo=misc.UseExisting,
                  host=misc.UseExisting, port=misc.UseExisting,
                  path=misc.UseExisting, query=misc.UseExisting,
                  fragment=misc.UseExisting):
        """Create a copy of this instance replacing with specified parts.

        Every argument left at ``misc.UseExisting`` keeps the value currently
        stored on this instance.

        :returns: New instance of the same class as ``self``, carrying a
            matching copy of the underlying reference.
        """
        requested = zip(PARSED_COMPONENTS,
                        (scheme, userinfo, host, port, path, query, fragment))
        attrs_dict = {}
        for name, value in requested:
            if value is misc.UseExisting:
                value = getattr(self, name)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        ref = self.reference.copy_with(scheme=attrs_dict['scheme'],
                                       authority=authority,
                                       path=attrs_dict['path'],
                                       query=attrs_dict['query'],
                                       fragment=attrs_dict['fragment'])
        # Build through the instance's own class so that subclasses are
        # preserved, matching the ``cls(...)`` construction used by
        # ``from_parts`` and ``from_string``.
        return type(self)(uri_ref=ref, encoding=self.encoding, **attrs_dict)

    def encode(self, encoding=None):
        """Convert to an instance of ParseResultBytes.

        :param str encoding: Encoding to use; falls back to ``self.encoding``
            when omitted or falsy.

        Components that have an ``encode`` method are encoded; all others
        (for example ``None`` or an integer port) are passed through as-is.
        """
        encoding = encoding or self.encoding
        attrs = dict(
            zip(PARSED_COMPONENTS,
                (attr.encode(encoding) if hasattr(attr, 'encode') else attr
                 for attr in self)))
        return ParseResultBytes(
            uri_ref=self.reference,
            encoding=encoding,
            **attrs
        )

    def unsplit(self, use_idna=False):
        """Create a URI string from the components.

        :param bool use_idna: When True and a host is present, the host is
            encoded with the ``idna`` codec (and decoded again using
            ``self.encoding``) before the URI is reconstituted.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        parse_result = self
        if use_idna and self.host:
            hostbytes = self.host.encode('idna')
            host = hostbytes.decode(self.encoding)
            parse_result = self.copy_with(host=host)
        return parse_result.reference.unsplit()
-
-
class ParseResultBytes(namedtuple('ParseResultBytes', PARSED_COMPONENTS),
                       ParseResultMixin):
    """Compatibility shim for the urlparse.ParseResultBytes object.

    Besides the tuple fields named in ``PARSED_COMPONENTS``, ``__new__``
    attaches three extra attributes to every instance: ``encoding``,
    ``reference`` (the ``uri_ref`` object the result was built from) and
    ``lazy_normalize``.
    """

    def __new__(cls, scheme, userinfo, host, port, path, query, fragment,
                uri_ref, encoding='utf-8', lazy_normalize=True):
        """Create a new ParseResultBytes instance.

        Every falsy component except ``host`` is stored as ``None``;
        ``host`` is stored exactly as given.

        :param uri_ref: Reference object stored as ``self.reference``.
        :param str encoding: Encoding stored as ``self.encoding``.
        :param bool lazy_normalize: Flag stored as ``self.lazy_normalize``.
        """
        parse_result = super(ParseResultBytes, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None)
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(cls, scheme=None, userinfo=None, host=None, port=None,
                   path=None, query=None, fragment=None, encoding='utf-8',
                   lazy_normalize=True):
        """Create a ParseResultBytes instance from its parts.

        The authority is assembled as ``userinfo@host:port`` from whichever
        of the three pieces is not ``None`` (the port is passed through
        ``int``). The parts are wrapped in a ``uri.URIReference``, which is
        normalized only when ``lazy_normalize`` is False.

        :returns: Instance of ``cls`` whose components are the bytes form of
            the components of the stored reference.
        :raises ValueError: If ``port`` cannot be converted by ``int``.
        :raises: Whatever ``authority_from(..., strict=True)`` raises for the
            assembled authority.
        """
        authority = ''
        if userinfo is not None:
            authority += userinfo + '@'
        if host is not None:
            authority += host
        if port is not None:
            authority += ':{0}'.format(int(port))
        uri_ref = uri.URIReference(scheme=scheme,
                                   authority=authority,
                                   path=path,
                                   query=query,
                                   fragment=fragment,
                                   encoding=encoding)
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        userinfo, host, port = authority_from(uri_ref, strict=True)
        # Read every component back from the reference (as from_string
        # does) so that the tuple fields and ``self.reference`` agree even
        # when the reference was normalized above.
        return cls(scheme=to_bytes(uri_ref.scheme, encoding),
                   userinfo=to_bytes(userinfo, encoding),
                   host=to_bytes(host, encoding),
                   port=port,
                   path=to_bytes(uri_ref.path, encoding),
                   query=to_bytes(uri_ref.query, encoding),
                   fragment=to_bytes(uri_ref.fragment, encoding),
                   uri_ref=uri_ref,
                   encoding=encoding,
                   lazy_normalize=lazy_normalize)

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8', strict=True,
                    lazy_normalize=True):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted. The flag is also
            stored on the returned instance.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        to_bytes = compat.to_bytes
        return cls(scheme=to_bytes(reference.scheme, encoding),
                   userinfo=to_bytes(userinfo, encoding),
                   host=to_bytes(host, encoding),
                   port=port,
                   path=to_bytes(reference.path, encoding),
                   query=to_bytes(reference.query, encoding),
                   fragment=to_bytes(reference.fragment, encoding),
                   uri_ref=reference,
                   encoding=encoding,
                   lazy_normalize=lazy_normalize)

    @property
    def authority(self):
        """Return the authority of the underlying reference as bytes.

        Returns ``None`` when the reference has no authority.
        """
        authority = self.reference.authority
        # NOTE(review): presumably a reference without an authority
        # component reports None here (confirm against URIReference);
        # calling ``.encode`` on it would raise AttributeError and break
        # ``netloc``, ``copy_with`` and ``unsplit`` for such references.
        if authority is None:
            return None
        return authority.encode(self.encoding)

    def copy_with(self, scheme=misc.UseExisting, userinfo=misc.UseExisting,
                  host=misc.UseExisting, port=misc.UseExisting,
                  path=misc.UseExisting, query=misc.UseExisting,
                  fragment=misc.UseExisting, lazy_normalize=True):
        """Create a copy of this instance replacing with specified parts.

        Every component left at ``misc.UseExisting`` keeps the value
        currently stored on this instance. Replacement values that are not
        already bytes but have an ``encode`` method are encoded with
        ``self.encoding``.

        :param bool lazy_normalize: When False, the copied reference is
            normalized. The flag is also stored on the returned instance.
        :returns: New instance of the same class as ``self``.
        """
        requested = zip(PARSED_COMPONENTS,
                        (scheme, userinfo, host, port, path, query, fragment))
        attrs_dict = {}
        for name, value in requested:
            if value is misc.UseExisting:
                value = getattr(self, name)
            if not isinstance(value, bytes) and hasattr(value, 'encode'):
                value = value.encode(self.encoding)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        # The reference works on text, so every component is converted back
        # before it is handed over.
        to_str = compat.to_str
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict['scheme'], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict['path'], self.encoding),
            query=to_str(attrs_dict['query'], self.encoding),
            fragment=to_str(attrs_dict['fragment'], self.encoding)
        )
        if not lazy_normalize:
            ref = ref.normalize()
        # Build through the instance's own class so that subclasses are
        # preserved, matching the ``cls(...)`` construction used by
        # ``from_parts`` and ``from_string``.
        return type(self)(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict
        )

    def unsplit(self, use_idna=False):
        """Create a URI bytes object from the components.

        :param bool use_idna: When True and a host is present, the host is
            re-encoded with the ``idna`` codec before the URI is
            reconstituted.
        :returns: The parsed URI reconstituted as a string.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode('idna')
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred at construction time; apply it now
            # by copying with lazy_normalize switched off.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        # Local name chosen so it does not shadow the imported ``uri``
        # module inside this method.
        uri_text = parse_result.reference.unsplit()
        return uri_text.encode(self.encoding)
-
-
def split_authority(authority):
    """Leniently split an authority string into its components.

    The text after the last ``@`` (if any) is treated as host and port. A
    host that starts with ``[`` is taken up to and including the matching
    ``]``; whatever follows the first ``:`` in the remainder is the port.
    No validation is performed and the port is returned as text.

    :param str authority: Authority portion of a URI. ``None`` and the empty
        string yield three ``None`` values.
    :returns: ``(userinfo, host, port)``; each item is ``None`` when the
        corresponding part is absent.
    :rtype: tuple
    :raises ValueError: If the host starts with ``[`` but has no closing
        ``]``.
    """
    if not authority:
        return None, None, None

    userinfo = host = port = None
    # Set-up rest in case there is no userinfo portion
    rest = authority

    if '@' in authority:
        # Split on the last "@" so that userinfo may itself contain "@".
        userinfo, rest = authority.rsplit('@', 1)

    # Handle IPv6 host addresses
    if rest.startswith('['):
        literal, closing, rest = rest.partition(']')
        if not closing:
            # Previously this surfaced as an opaque tuple-unpacking
            # ValueError; keep the exception type but say what is wrong.
            raise ValueError(
                'Invalid IPv6 host: missing closing bracket in '
                '{0!r}'.format(authority)
            )
        host = literal + closing

    if ':' in rest:
        extra_host, port = rest.split(':', 1)
        # A bracketed host found above takes precedence over the text
        # before the colon.
        if extra_host and not host:
            host = extra_host
    elif not host and rest:
        host = rest

    return userinfo, host, port
-
-
def authority_from(reference, strict):
    """Extract ``(userinfo, host, port)`` from a reference's authority.

    :param reference: Object providing ``authority_info()`` and an
        ``authority`` attribute.
    :param bool strict: When True, an ``exceptions.InvalidAuthority`` raised
        by ``reference.authority_info()`` is propagated. When False, the
        authority is split with ``split_authority`` instead.
    :returns: Tuple ``(userinfo, host, port)``; a truthy port is converted
        to ``int``, a falsy one is returned unchanged.
    :raises exceptions.InvalidPort: If a truthy port cannot be converted to
        an integer.
    """
    try:
        parsed = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        components = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        components = tuple(parsed.get(key)
                           for key in ('userinfo', 'host', 'port'))

    userinfo, host, port = components
    if not port:
        # Nothing to convert: hand back the falsy port exactly as found.
        return userinfo, host, port

    try:
        return userinfo, host, int(port)
    except ValueError:
        raise exceptions.InvalidPort(port)
|