"""Minimal Gemini protocol client and gemtext-to-HTML page renderer.

Reconstructed from the patch that created ``browser/gemini.py``
(commit 0abf3511646446ee64f21d551dd0d2214fd624b7, "organizing the repo a
bit", Matt Singleton, 2022-01-19).

Responses are passed around as plain dicts holding a ``status`` string, the
header ``meta`` string and, for successful responses, the decoded ``body``.
Status ``'00'`` is used by this module to report client-side failures, and
``'32'`` for the internal "add a trailing slash" redirect.
"""

import html
import io
import re
import socket
import ssl
import string
import urllib.parse


def htmlescape(text: str) -> str:
    """Return *text* with ``&``, ``<`` and ``>`` replaced by HTML entities.

    ``&`` is escaped first so that text which already looks like an entity
    (``&lt;``) is displayed literally instead of being interpreted.
    """
    return html.escape(text, quote=False)


def _read_text(path: str) -> str:
    """Return the contents of the UTF-8 text file at *path*."""
    with open(path, encoding='utf-8') as fp:
        return fp.read()


def gem2html(gem: dict) -> str:
    """Render a response dict as an HTML page and return the markup.

    The template is chosen from the first digit of ``gem['status']``:
    ``2`` renders ``gem['body']`` through ``fsm.Parser`` into
    ``page_template.html``, ``1`` fills ``input_template.html`` with the
    prompt from ``gem['meta']``, and everything else fills
    ``error_template.html``.  Templates and ``style.css`` are read from the
    current working directory.

    Side effect: the rendered page is also written to ``latest.html``.

    Raises:
        KeyError: if the template references a placeholder that was not
            filled in (``string.Template.substitute`` semantics).
        OSError: if a template, the stylesheet or ``latest.html`` cannot be
            opened.
    """
    status = gem['status']
    params = {
        'charset': 'utf-8',
        'lang': 'en',
        'css': _read_text('style.css'),
    }
    if status.startswith('2'):
        # Imported here because only the success branch needs the gemtext
        # parser; the URL and network helpers stay importable without it.
        import fsm

        template = string.Template(_read_text('page_template.html'))
        body = io.StringIO()
        parser = fsm.Parser(gem['body'].split('\n'), body)
        parser.parse()
        params['body'] = body.getvalue()
    elif status.startswith('1'):
        template = string.Template(_read_text('input_template.html'))
        # meta comes straight from the server: escape it before it reaches
        # the markup.
        params['meta'] = html.escape(gem['meta'])
    else:
        template = string.Template(_read_text('error_template.html'))
        safe_status = html.escape(status)
        if status == '00':
            params['status'] = 'CLIENT ERROR'
        elif status.startswith('4'):
            params['status'] = safe_status + ' TEMPORARY FAILURE'
        elif status.startswith('5'):
            params['status'] = safe_status + ' PERMANENT FAILURE'
        else:
            params['status'] = 'UNHANDLED STATUS {}'.format(safe_status)
        params['meta'] = html.escape(gem['meta'])

    page = template.substitute(params)
    # The page declares charset=utf-8, so write it with that encoding rather
    # than whatever the platform default happens to be.
    with open('latest.html', 'w', encoding='utf-8') as fp:
        fp.write(page)
    return page


def urljoin(base: str, url: str) -> str:
    """Resolve *url* against *base*, with support for ``gemini:`` URLs.

    ``urllib.parse.urljoin`` only resolves relative references for schemes
    it knows about, so ``gemini:`` is temporarily rewritten to ``http:`` for
    the join and restored afterwards.

    When *base* is ``None``, *url* is returned untouched.  An absolute *url*
    in another scheme (``http:``, ``https:``, ``mailto:`` ...) is also
    returned untouched, so a real web link is not rewritten into a
    ``gemini:`` URL by the scheme swap.
    """
    if base is None:
        return url
    scheme = urllib.parse.urlsplit(url).scheme
    if scheme and scheme != 'gemini':
        return url
    base = re.sub('^gemini:', 'http:', base)
    url = re.sub('^gemini:', 'http:', url)
    return re.sub('^http:', 'gemini:', urllib.parse.urljoin(base, url))


def get(url: str, follow_redirects: bool = True) -> dict:
    """Fetch *url* and return the response dict.

    When *follow_redirects* is true, ``3x`` responses are followed (each hop
    is printed to stdout) until a non-redirect response arrives.  Redirect
    targets are resolved against the URL that produced them, so relative
    redirects work.  After more than 20 hops a ``'00'`` client error with
    meta ``'Too many redirects'`` is returned.
    """
    response = _get(url)
    if not follow_redirects:
        return response

    current = url
    count = 0
    while response['status'].startswith('3'):
        count += 1
        if count > 20:
            return {'status': '00', 'meta': 'Too many redirects'}
        print('{status} {meta}'.format(**response))
        try:
            current = urljoin(current, response['meta'])
        except ValueError as ex:
            # Unparseable redirect target: report it like any other
            # client-side failure instead of raising.
            return {'status': '00', 'meta': '{}'.format(ex)}
        response = _get(current)
    return response


def hack_url(url: str) -> str:
    """
    An ugly hack to reformat input queries the way gemini wants them:
        <url>?<input>
    Rather than the default way an html get form renders them:
        <url>?<name>=<input>
    Only URLs whose query consists of exactly one
    ``__client_internal_input`` field are rewritten; any other URL is
    returned unchanged.  The input is percent-encoded again because
    ``parse_qs`` hands back the decoded value.
    I don't think this ever *should* break but I guess it *could*.
    """
    url_parts = urllib.parse.urlsplit(url)
    # keep_blank_values so that submitting an empty input does not leak the
    # internal field name into the request.
    query = urllib.parse.parse_qs(url_parts.query, keep_blank_values=True)
    values = query.get('__client_internal_input', [])
    if len(query) != 1 or len(values) != 1:
        return url
    return urllib.parse.urlunsplit((
        url_parts.scheme,
        url_parts.netloc,
        url_parts.path,
        urllib.parse.quote(values[0], safe=''),
        url_parts.fragment,
    ))


def _parse_meta(meta: str) -> dict:
    """Split a MIME-style meta line into a dict of lower-cased parameters.

    The media type is stored under ``'mime'``; every ``key=value`` parameter
    after it is stored under its key with surrounding whitespace and double
    quotes removed.  Segments without ``=`` or without a key (for example
    the empty one left by a trailing ``;``) are ignored rather than raising.
    """
    mime, _, params_text = meta.lower().strip().partition(';')
    params = {}
    for param in params_text.split(';'):
        key, sep, val = param.partition('=')
        key = key.strip()
        if not sep or not key:
            continue
        params[key] = val.strip().strip('"')
    params['mime'] = mime.strip()
    return params


def _get(url: str) -> dict:
    """Perform a single request for *url* without following redirects.

    Returns a dict with ``status`` and ``meta`` and, for ``2x`` responses,
    ``body`` decoded with the charset named in the meta line (UTF-8 when
    none is given).  A URL with an empty path is answered locally with a
    ``'32'`` redirect to the same URL with path ``/``.  Any failure (bad
    URL, connection error, malformed response, unknown charset) is reported
    as status ``'00'`` with the error text in ``meta``; nothing is raised.
    """
    try:
        url_parts = urllib.parse.urlsplit(url)
        if not url_parts.path:
            return {
                'status': '32',
                'meta': urllib.parse.urlunsplit((
                    url_parts.scheme,
                    url_parts.netloc,
                    '/',
                    url_parts.query,
                    url_parts.fragment,
                )),
            }

        # Certificate verification is switched off on purpose here;
        # presumably because Gemini servers commonly use self-signed
        # certificates.  NOTE(review): no trust-on-first-use pinning is done.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        host = url_parts.hostname
        port = 1965 if url_parts.port is None else url_parts.port
        with socket.create_connection((host, port)) as sock:
            with context.wrap_socket(sock, server_hostname=host) as ssock:
                ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
                with ssock.makefile(mode='rb') as fp:
                    header = fp.readline(1027)
                    parts = header.decode('utf8').split(None, 1)
                    if not parts:
                        return {
                            'status': '00',
                            'meta': 'Empty response header',
                        }
                    status = parts[0]
                    meta = parts[1] if len(parts) > 1 else ''
                    if not status.startswith('2'):
                        return {
                            'status': status,
                            'meta': meta.strip(),
                        }
                    meta_params = _parse_meta(meta)
                    body = fp.read()
        return {
            'status': status,
            'meta': meta.strip(),
            'body': body.decode(meta_params.get('charset', 'utf8')),
        }
    except Exception as ex:
        # Deliberate catch-all boundary: callers expect a response dict for
        # every outcome, so failures become a '00' client error.
        return {
            'status': '00',
            'meta': '{}'.format(ex),
        }