izzylib-http-async/izzylib_http_async/utils.py

643 lines
12 KiB
Python

import asyncio
import json
import traceback

from datetime import datetime, timedelta, timezone
from io import BytesIO

from izzylib import (
    DateString,
    DotDict,
    Url,
    logging
)

try:
    from .signatures import sign_headers, verify_headers

except ImportError:
    sign_headers = None
    verify_headers = None
# Default TCP port for each URL scheme handled by this module.
ports = dict(http=80, https=443, ws=80, wss=443)
# HTTP request methods accepted by this module, in alphabetical order.
methods = 'CONNECT DELETE GET HEAD OPTIONS PATCH POST PUT TRACE'.split()
class Headers(DotDict):
    """Header mapping whose keys are normalised with ``str.title()``.

    Each stored value is a ``HeaderItem``; assigning to an existing key
    appends to it. Attribute access swaps underscores for dashes, so
    ``headers.content_type`` addresses the ``Content-Type`` entry.
    """

    def __getattr__(self, key):
        header_name = key.replace('_', '-')
        return self[header_name]

    def __setattr__(self, key, value):
        header_name = key.replace('_', '-')
        self[header_name] = value

    def __getitem__(self, key):
        normalised = key.title()
        return super().__getitem__(normalised)

    def __setitem__(self, key, value):
        """Append ``value`` to the header, creating its ``HeaderItem`` if needed."""
        key = key.title()

        # Cookie headers are refused here: a warning is logged and nothing is stored.
        if key in ('Cookie', 'Set-Cookie'):
            logging.warning('Do not set the "Cookie" or "Set-Cookie" headers')
            return

        if key == 'Date':
            value = DateString(value, 'http')

        try:
            existing = self[key]
        except KeyError:
            super().__setitem__(key, HeaderItem(key, value))
        else:
            existing.append(value)

    def get(self, key, default=None):
        """Return the ``HeaderItem`` stored for ``key`` or ``default``."""
        normalised = key.title()
        return super().get(normalised, default)

    def as_dict(self):
        """Return a plain dict mapping each header name to ``str()`` of its item."""
        return {name: str(item) for name, item in self.items()}

    def getone(self, key, default=None):
        """Return the first value of ``key``, or ``default`` if it is missing or empty."""
        try:
            item = self[key]
            return item.one()
        except (KeyError, IndexError):
            return default

    def setall(self, key, value):
        """Replace every value of ``key`` with ``value`` (creating the header if absent)."""
        try:
            item = self[key]
        except KeyError:
            self[key] = value
        else:
            item.set(value)
class Cookies(DotDict):
    """Mapping of cookie names to ``CookieItem`` objects."""

    def __setitem__(self, key, value):
        # Anything that is not exactly a CookieItem gets wrapped in one.
        item = value if type(value) is CookieItem else CookieItem(key, value)
        super().__setitem__(key, item)
class HeaderItem(list):
    """List of the values recorded for a single header name.

    ``str()`` of the item is its values joined with commas.
    """

    def __init__(self, key, values):
        super().__init__()
        # ``values`` is stored as ONE entry; it is not unpacked.
        self.update(values)
        self.key = key

    def __str__(self):
        return ','.join(map(str, self))

    def set(self, *values):
        """Discard the current values and store ``values`` instead."""
        self.clear()
        self.extend(values)

    def one(self):
        """Return the first value; raises ``IndexError`` when empty."""
        return self[0]

    def update(self, *items):
        """Append every positional argument to the list."""
        self.extend(items)
class CookieItem:
    """A single HTTP cookie: a key/value pair plus optional attributes.

    Attributes are stored in ``self.args`` under their header spelling
    (``Expires``, ``Max-Age``, ``Domain``, ``Path``, ``SameSite``,
    ``Secure``, ``HttpOnly``) and exposed through validating properties.

    NOTE(review): ``cookie_params``, ``Path`` and ``boolean`` are used here
    but are not defined or imported in the visible part of this module --
    confirm they are in scope.
    """

    def __init__(self, key, value, **kwargs):
        """Create a cookie.

        Args:
            key: cookie name.
            value: cookie value.
            **kwargs: attribute values keyed by property name; every name
                must be one of the values of ``cookie_params``.

        Raises:
            AttributeError: if a keyword is not a known cookie parameter.
        """
        self.key = key
        self.value = value
        self.args = DotDict()

        for k, v in kwargs.items():
            if k not in cookie_params.values():
                # Report the offending parameter, not the cookie's own key.
                raise AttributeError(f'Not a valid cookie parameter: {k}')

            # Goes through the property setters below, so values are validated.
            setattr(self, k, v)

    def __str__(self):
        """Render the cookie as ``key=value`` followed by its ``; Attr`` parts."""
        parts = [f'{self.key}={self.value}']

        if self.expires:
            parts.append(f'Expires={self.expires.strftime("%a, %d %b %Y %H:%M:%S GMT")}')

        # Max-Age=0 is meaningful (delete the cookie), so only skip None.
        if self.maxage is not None:
            parts.append(f'Max-Age={self.maxage}')

        if self.domain:
            parts.append(f'Domain={self.domain}')

        if self.path:
            parts.append(f'Path={self.path}')

        if self.samesite:
            parts.append(f'SameSite={self.samesite}')

        if self.secure:
            parts.append('Secure')

        if self.httponly:
            parts.append('HttpOnly')

        return '; '.join(parts)

    @classmethod
    def from_string(cls, data):
        """Build a cookie from a ``key=value; Attr=value; Flag`` string.

        The first ``;``-separated segment supplies the cookie key and value.
        Later segments are looked up in ``cookie_params``; unknown ones are
        logged and ignored. A segment without ``=`` gets the value ``True``.
        """
        kwargs = {}

        for idx, pair in enumerate(data.split(';')):
            try:
                k, v = pair.split('=', 1)
            except ValueError:
                # No '=' in the segment: treat it as a bare flag.
                k, v = pair, True
            else:
                v = v.strip()

            k = k.replace(' ', '')

            if isinstance(v, str) and v.startswith('"') and v.endswith('"'):
                v = v[1:-1]

            if idx == 0:
                key = k
                value = v

            elif not k:
                # Empty segment, e.g. from a trailing ';'.
                continue

            elif k in cookie_params:
                kwargs[cookie_params[k]] = v

            else:
                logging.info(f'Invalid key/value pair for cookie: {k} = {v}')

        return cls(key, value, **kwargs)

    @property
    def expires(self):
        return self.args.get('Expires')

    @expires.setter
    def expires(self, data):
        """Accept an http date string, a timestamp, a datetime or a timedelta."""
        if isinstance(data, str):
            data = DateString(data, 'http')

        elif isinstance(data, (int, float)):
            # Convert straight to UTC; relabelling local time as UTC would
            # shift the instant by the local offset.
            data = datetime.fromtimestamp(data, timezone.utc)

        elif isinstance(data, datetime):
            if not data.tzinfo:
                data = data.replace(tzinfo=timezone.utc)

        elif isinstance(data, timedelta):
            data = datetime.now(timezone.utc) + data

        else:
            raise TypeError(f'Expires must be a http date string, timestamp, or datetime object, not {data.__class__.__name__}')

        self.args['Expires'] = data

    @property
    def maxage(self):
        return self.args.get('Max-Age')

    @maxage.setter
    def maxage(self, data):
        """Accept seconds (int or numeric string), a timedelta, or a target datetime."""
        if isinstance(data, int):
            pass

        elif isinstance(data, str):
            # from_string() hands attribute values over as text.
            try:
                data = int(data)
            except ValueError:
                raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}') from None

        elif isinstance(data, timedelta):
            # total_seconds() keeps whole days, unlike the .seconds field.
            data = int(data.total_seconds())

        elif isinstance(data, datetime):
            # Seconds from now until the target; compare aware with aware
            # and naive with naive so the subtraction is always valid.
            now = datetime.now(data.tzinfo) if data.tzinfo else datetime.now()
            data = int((data - now).total_seconds())

        else:
            raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}')

        self.args['Max-Age'] = data

    @property
    def domain(self):
        return self.args.get('Domain')

    @domain.setter
    def domain(self, data):
        if not isinstance(data, str):
            raise ValueError(f'Domain must be a string, not {data.__class__.__name__}')

        self.args['Domain'] = data

    @property
    def path(self):
        return self.args.get('Path')

    @path.setter
    def path(self, data):
        if not isinstance(data, str):
            raise ValueError(f'Path must be a string or izzylib.Path object, not {data.__class__.__name__}')

        self.args['Path'] = Path(data)

    @property
    def secure(self):
        return self.args.get('Secure')

    @secure.setter
    def secure(self, data):
        self.args['Secure'] = boolean(data)

    @property
    def httponly(self):
        return self.args.get('HttpOnly')

    @httponly.setter
    def httponly(self, data):
        self.args['HttpOnly'] = boolean(data)

    @property
    def samesite(self):
        return self.args.get('SameSite')

    @samesite.setter
    def samesite(self, data):
        """Accept a bool (True -> Strict, False -> None) or Strict/Lax/None in any case."""
        if isinstance(data, bool):
            data = 'Strict' if data else 'None'

        elif isinstance(data, str) and data.title() in ['Strict', 'Lax', 'None']:
            data = data.title()

        else:
            raise TypeError(f'SameSite must be a boolean or one of Strict, Lax, or None, not {data.__class__.__name__}')

        self.args['SameSite'] = data
        # Setting SameSite always turns the Secure flag on as well.
        self.args['Secure'] = True

    def as_dict(self):
        """Return a DotDict holding ``{key: value}`` merged with the attributes."""
        data = DotDict({self.key: self.value})
        data.update(self.args)
        return data

    def set_defaults(self):
        """Remove every attribute from ``self.args``."""
        self.args.clear()

    def set_delete(self):
        """Drop ``Expires``, set ``Max-Age`` to 0 and return ``self``."""
        self.args.pop('Expires', None)
        self.maxage = 0
        return self
class AsyncTransport:
    """Pairs a stream reader and writer and applies a default timeout to reads."""

    def __init__(self, reader, writer, timeout=60):
        self.reader = reader
        self.writer = writer
        self.timeout = timeout

    def _peername(self):
        # Whatever the writer reports under the 'peername' extra-info key.
        return self.writer.get_extra_info('peername')

    @property
    def client_address(self):
        return self._peername()[0]

    @property
    def client_port(self):
        return self._peername()[1]

    @property
    def closed(self):
        return self.writer.is_closing()

    def eof(self):
        return self.reader.at_eof()

    async def read(self, length=None, timeout=None):
        """Read up to ``length`` bytes (2048 when falsy).

        ``timeout=False`` waits without a deadline; any other falsy value
        falls back to ``self.timeout``.
        """
        pending = self.reader.read(length or 2048)

        if timeout == False:
            return await pending

        deadline = timeout or self.timeout
        return await asyncio.wait_for(pending, deadline)

    async def read_until(self, bytes, timeout=None):
        """Read through the ``bytes`` separator, bounded by the timeout."""
        deadline = timeout or self.timeout
        return await asyncio.wait_for(self.reader.readuntil(bytes), deadline)

    async def read_headers(self, request=True, timeout=None):
        """Read up to the blank line ending the header section and parse it."""
        raw_headers = await self.read_until(b'\r\n\r\n', timeout=timeout)
        text = raw_headers.decode('utf-8')
        return parse_headers(text, request)

    async def write(self, data):
        """Convert ``data`` to bytes, write it and wait for the buffer to drain."""
        payload = convert_to_bytes(data)
        self.writer.write(payload)
        await self.writer.drain()

    async def close(self):
        """Close the writer unless it is already closing."""
        if not self.closed:
            self.writer.close()
            await self.writer.wait_closed()
class Request:
    """An outgoing HTTP(S)/WebSocket request: URL, method, headers, cookies, body."""

    def __init__(self, url, body=None, headers=None, cookies=None, method='GET'):
        """Build a request.

        Args:
            url: target URL; its scheme must be http, https, ws or wss.
            body: optional payload, converted with ``convert_to_bytes``.
            headers: optional mapping used to initialise ``Headers``.
            cookies: optional mapping used to initialise ``Cookies``.
            method: HTTP method name (case-insensitive).

        Raises:
            ValueError: for an unknown method or unsupported URL scheme.
        """
        method = method.upper()

        # ``methods`` is the module-level list of accepted verbs.
        if method not in methods:
            raise ValueError(f'Invalid HTTP method: {method}')

        self._body = b''
        self.version = 1.1
        self.url = Url(url)
        # None defaults avoid sharing one mutable dict between calls.
        self.headers = Headers(headers if headers is not None else {})
        self.cookies = Cookies(cookies if cookies is not None else {})
        self.method = method

        if self.url.proto not in ['http', 'https', 'ws', 'wss']:
            raise ValueError(f'Invalid protocol in url: {self.url.proto}')

        if not self.headers.get('host'):
            self.headers.host = self.host

        # Store the payload that was passed in (also sets Content-Length).
        if body is not None:
            self.body = body

    @property
    def body(self):
        return self._body

    @body.setter
    def body(self, data):
        self._body = convert_to_bytes(data)
        self.headers.setall('Content-Length', str(len(self._body)))

    @property
    def host(self):
        return self.url.host

    @property
    def length(self):
        """Size of the body in bytes."""
        # _body is a bytes object, so len() is the byte count.
        return len(self._body)

    @property
    def port(self):
        """Explicit port from the URL, else the default for its scheme."""
        return self.url.port or ports[self.url.proto]

    @property
    def secure(self):
        return self.url.proto in ['https', 'wss']

    def set_header(self, key, value):
        self.headers.setall(key, value)

    def unset_header(self, key):
        # Headers stores keys in Title-Case, so normalise before popping.
        self.headers.pop(key.title(), None)

    def sign_headers(self, privkey, keyid):
        # NOTE(review): ``sign_request`` is not defined or imported in the
        # visible part of this module (the import guard at the top provides
        # ``sign_headers`` / ``verify_headers``) -- confirm the intended name.
        if not sign_request:
            raise ImportError(f'Could not import HTTP signatures. Header signing disabled')

        return sign_request(self, privkey, keyid)

    def verify_headers(self, headhash, pubkey):
        # Not implemented.
        pass

    def compile(self):
        """Serialise the request line, headers, cookies and body to bytes."""
        # NOTE(review): ``self.path`` is not defined on this class in the
        # visible source -- presumably the URL's path; confirm.
        first = bytes(f'{self.method} {self.path} HTTP/{self.version}', 'utf-8')
        self.set_header('Content-Length', self.length)
        return create_message(first, self.headers, self.cookies, self.body)
class Response:
    """A received HTTP response whose body is accumulated in a ``BytesIO``.

    ``body`` is a method (``body(encoding=None)``), not a property.
    """

    def __init__(self, status, message, version, headers, cookies):
        """Store the status-line fields and wrap headers/cookies if needed."""
        self._body = BytesIO()
        self.version = version
        self.status = status
        self.message = message
        self.headers = headers if isinstance(headers, Headers) else Headers(headers)
        self.cookies = cookies if isinstance(cookies, Cookies) else Cookies(cookies)

    # NOTE(review): ``host``, ``port`` and ``secure`` read ``self.url``, which
    # __init__ does not set -- confirm that callers attach it.
    @property
    def host(self):
        return self.url.host

    @property
    def port(self):
        """Explicit port from the URL, else the default for its scheme."""
        return self.url.port or ports[self.url.proto]

    @property
    def secure(self):
        return self.url.proto in ['https', 'wss']

    def set_header(self, key, value):
        self.headers.setall(key, value)

    def unset_header(self, key):
        # Headers stores keys in Title-Case, so normalise before popping.
        self.headers.pop(key.title(), None)

    async def read(self, length=None):
        """Read a chunk from the transport, buffer it and return it."""
        # NOTE(review): ``self.transport`` and ``self.length`` are not set
        # anywhere in the visible source -- confirm where they come from.
        data = await self.transport.read(length or self.length or 8192)

        if data:
            self._body.write(data)

        return data

    def body(self, encoding=None):
        """Return everything buffered so far, decoded when ``encoding`` is given."""
        # getvalue() returns the whole buffer regardless of stream position.
        data = self._body.getvalue()
        return data.decode(encoding) if encoding else data

    def text(self, encoding='utf-8'):
        """Return the buffered body decoded as text."""
        return self.body(encoding)

    def dict(self):
        """Return the body text wrapped in a ``DotDict``."""
        return DotDict(self.text())

    def close(self):
        """Close the body buffer."""
        self._body.close()
def parse_headers(raw_headers, request=True):
    """Parse the start line and header lines of an HTTP message.

    Args:
        raw_headers: decoded header section, start line first.
        request: ``True`` to parse a request line (``METHOD PATH VERSION``),
            ``False`` to parse a status line (``VERSION STATUS [MESSAGE]``).

    Returns:
        ``(method, path, version, headers, cookies)`` for a request, or
        ``(status, message, version, headers, cookies)`` for a response.
        ``version`` is a float, ``status`` an int, and ``message`` is
        ``None`` when the status line has no reason phrase.

    Raises:
        ValueError: if ``raw_headers`` is empty or the request line does not
            have exactly three fields.
    """
    headers = Headers()
    cookies = Cookies()
    lines = raw_headers.splitlines()

    if not lines:
        raise ValueError('Cannot parse empty HTTP headers')

    start = lines[0].split()

    if request:
        method, path, version = start

    else:
        version = start[0]
        status = start[1]
        # Slicing never raises, so map an empty reason phrase to None here.
        message = ' '.join(start[2:]) or None

    for line in lines[1:]:
        key, separator, value = line.partition(':')

        # Blank lines and lines without a colon are not header fields.
        if not separator or not key:
            continue

        value = value.strip()
        lowered = key.lower()

        if lowered == 'cookie':
            # A request Cookie header carries several "name=value" pairs.
            cookie_strings = value.split(';')

        elif lowered == 'set-cookie':
            # A Set-Cookie header describes ONE cookie plus its attributes,
            # so it must reach from_string() unsplit.
            cookie_strings = [value]

        else:
            headers[key] = value
            continue

        for cookie in cookie_strings:
            if not cookie.strip():
                continue

            try:
                item = CookieItem.from_string(cookie)

            except Exception:
                # Best effort: report the bad cookie and keep parsing.
                traceback.print_exc()
                continue

            cookies[item.key] = item

    version = float(version.replace('HTTP/', ''))

    if request:
        return method, path, version, headers, cookies

    return int(status), message, version, headers, cookies
def convert_to_bytes(data):
    """Return ``data`` as ``bytes``.

    * ``bytes`` is returned unchanged.
    * ``str`` is encoded as UTF-8.
    * ``bytearray`` / ``memoryview`` are copied into ``bytes``.
    * ``dict``, ``list`` and ``tuple`` are serialised as JSON.
    * anything else is passed through ``str()`` and encoded as UTF-8.
    """
    if isinstance(data, bytes):
        return data

    if isinstance(data, str):
        return data.encode('utf-8')

    if isinstance(data, (bytearray, memoryview)):
        return bytes(data)

    # One isinstance call with a tuple checks all three container types.
    if isinstance(data, (dict, list, tuple)):
        return json.dumps(data).encode('utf-8')

    return str(data).encode('utf-8')
def create_message(data, headers, cookies=None, body=None):
    """Assemble a raw HTTP message.

    Args:
        data: the start line as bytes (no trailing CRLF).
        headers: mapping of header name to a value or an iterable of values;
            one header line is emitted per value.
        cookies: optional mapping; each value is emitted as a ``Set-Cookie``
            line.
        body: optional payload as bytes.

    Returns:
        The start line, header lines, a blank line and the body as bytes.
        ``Date`` and ``Content-Length`` are added when the mapping lacks
        them; ``Content-Type: text/plain`` is added when a body is present
        without one.
    """
    parts = [data]

    for name, item in headers.items():
        # A bare string (or other non-iterable) is a single value; iterating
        # it would emit one header line per character.
        if isinstance(item, str) or not hasattr(item, '__iter__'):
            item = (item,)

        parts.extend(bytes(f'\r\n{name}: {value}', 'utf-8') for value in item)

    if cookies:
        parts.extend(
            bytes(f'\r\nSet-Cookie: {cookie}', 'utf-8')
            for cookie in cookies.values()
        )

    if not headers.get('Date'):
        parts.append(bytes(f'\r\nDate: {DateString.now("http")}', 'utf-8'))

    if not headers.get('Content-Length'):
        # A missing body has length zero; len(None) would raise.
        length = len(body) if body else 0
        parts.append(bytes(f'\r\nContent-Length: {length}', 'utf-8'))

    if body and not headers.get('Content-Type'):
        parts.append(b'\r\nContent-Type: text/plain')

    parts.append(b'\r\n\r\n')

    if body:
        parts.append(body)

    return b''.join(parts)
def first_line(method=None, path=None, status=None, message=None, version='1.1'):
    """Build the first line of an HTTP message as UTF-8 bytes.

    A truthy ``method`` and ``path`` produce a request line; otherwise a
    truthy ``status`` produces a status line, with ``message`` appended when
    it is truthy.

    Raises:
        TypeError: if neither combination is supplied.
    """
    if method and path:
        line = f'{method} {path} HTTP/{version}'

    elif status:
        line = f'HTTP/{version} {status}'

        if message:
            line = f'HTTP/{version} {status} {message}'

    else:
        raise TypeError('Please only provide a method and path or a status and message')

    return line.encode('utf-8')