import html
import io
import re
import socket
import ssl
import string
import urllib.parse

import fsm
from util import resource_path


def htmlescape(text: str) -> str:
    """Return *text* with ``&``, ``<`` and ``>`` replaced by HTML entities.

    Quote characters are left untouched.
    """
    return html.escape(text, quote=False)


def _read_resource(relative_path: str) -> str:
    """Return the contents of a bundled resource file, closing it afterwards."""
    with open(resource_path(relative_path)) as handle:
        return handle.read()


def gem2html(gem: dict) -> str:
    """Render a response dict (as returned by :func:`get`) to an HTML page.

    The first digit of ``gem['status']`` selects the template:

    * ``2`` - page template; ``gem['body']`` is run through ``fsm.Parser``.
    * ``1`` - input template; ``gem['meta']`` is passed as ``meta``.
    * anything else - error template with a ``status`` line and ``meta``.

    ``meta`` and ``status`` originate from the remote server (or from an
    exception message), so they are escaped before being substituted into
    the HTML templates.
    """
    params = {
        'charset': 'utf-8',
        'lang': 'en',
        'css': _read_resource('resources/style.css'),
    }
    status = gem['status']
    if status[0] == '2':
        template = string.Template(_read_resource('resources/page_template.html'))
        body = io.StringIO()
        parser = fsm.Parser(gem['body'].split('\n'), body)
        parser.parse()
        params['body'] = body.getvalue()
    elif status[0] == '1':
        template = string.Template(_read_resource('resources/input_template.html'))
        params['meta'] = htmlescape(gem['meta'])
    else:
        template = string.Template(_read_resource('resources/error_template.html'))
        if status == '00':
            # '00' is the status this module itself uses for local failures.
            params['status'] = 'CLIENT ERROR'
        elif status[0] == '4':
            params['status'] = htmlescape(status) + ' TEMPORARY FAILURE'
        elif status[0] == '5':
            params['status'] = htmlescape(status) + ' PERMANENT FAILURE'
        else:
            params['status'] = 'UNHANDLED STATUS {}'.format(htmlescape(status))
        params['meta'] = htmlescape(gem['meta'])
    return template.substitute(params)


def urljoin(base: str, url: str) -> str:
    """Resolve *url* against *base*, with support for ``gemini:`` URLs.

    ``gemini:`` is temporarily rewritten to ``http:`` so that
    ``urllib.parse.urljoin`` resolves relative references, then rewritten
    back. The result is only turned back into ``gemini:`` when its scheme
    actually came from a gemini input, so a link to a genuine ``http:`` (or
    any other scheme) target keeps its own scheme.

    Returns *url* unchanged when *base* is ``None``.
    """
    if base is None:
        return url
    http_base, base_was_gemini = re.subn(r'^gemini:', 'http:', base)
    http_url, url_was_gemini = re.subn(r'^gemini:', 'http:', url)
    joined = urllib.parse.urljoin(http_base, http_url)
    # The result takes its scheme from *url* when *url* has one, otherwise
    # from *base*.
    if urllib.parse.urlsplit(url).scheme:
        result_is_gemini = bool(url_was_gemini)
    else:
        result_is_gemini = bool(base_was_gemini)
    if result_is_gemini:
        joined = re.sub(r'^http:', 'gemini:', joined)
    return joined


def get(url: str, follow_redirects: bool = True) -> dict:
    """Fetch *url* and return a dict with ``status``, ``meta`` and, for 2x
    responses, ``body``.

    When *follow_redirects* is ``True``, 3x responses are followed up to 20
    times; after that a ``'00'`` status dict with the message
    ``'Too many redirects'`` is returned. Each redirect target is resolved
    against the URL that produced it, so relative redirects work.
    """
    response = _get(url)
    if follow_redirects is True:
        count = 0
        current = url
        while response['status'][0] == '3':
            count += 1
            if count > 20:
                return {'status': '00', 'meta': 'Too many redirects'}
            print('{status} {meta}'.format(**response))
            current = urljoin(current, response['meta'])
            response = _get(current)
    return response


def hack_url(url: str) -> str:
    """
    An ugly hack to reformat input queries the way gemini wants them:
        <url>?<query>
    Rather than the default way an html get form renders them:
        <url>?__client_internal_input=<query>
    I don't think this ever *should* break but I guess it *could*.

    The URL is only rewritten when its query string consists of exactly one
    ``__client_internal_input`` value; any other URL is returned unchanged.
    The value is percent-encoded again because ``parse_qs`` decodes it, and
    the decoded text may contain spaces or line breaks that must not be
    written into the request line.
    """
    url_parts = urllib.parse.urlsplit(url)
    query = urllib.parse.parse_qs(url_parts.query)
    values = query.get('__client_internal_input', [])
    if len(query) == 1 and len(values) == 1:
        url = urllib.parse.urlunsplit((
            url_parts.scheme,
            url_parts.netloc,
            url_parts.path,
            urllib.parse.quote(values[0]),
            url_parts.fragment,
        ))
    return url


def _parse_meta(meta: str) -> dict:
    """Parse a ``mime/type; key=value; ...`` meta line into a dict.

    The whole line is lower-cased first. The returned dict maps each
    parameter name to its value and always contains a ``'mime'`` key.
    Parameters that are empty or lack ``=`` (for example after a trailing
    ``;``) are skipped instead of raising, and surrounding double quotes are
    removed from values.
    """
    mime, _, params_text = meta.lower().strip().partition(';')
    params = {}
    for param in params_text.split(';'):
        key, sep, val = param.partition('=')
        key = key.strip()
        if not sep or not key:
            continue
        params[key] = val.strip().strip('"')
    params['mime'] = mime.strip()
    return params


def _get(url: str) -> dict:
    """Perform a single request for *url* without following redirects.

    Returns a dict with ``status`` and ``meta``; 2x responses also carry the
    decoded ``body`` (decoded with the ``charset`` meta parameter, defaulting
    to utf8). A URL with an empty path yields a synthetic ``'32'`` redirect
    to the same URL with path ``/``. Any exception is reported as status
    ``'00'`` with the exception text as ``meta``.
    """
    url_parts = urllib.parse.urlsplit(url)
    if len(url_parts.path) == 0:
        return {
            'status': '32',
            'meta': urllib.parse.urlunsplit((
                url_parts.scheme,
                url_parts.netloc,
                '/',
                url_parts.query,
                url_parts.fragment,
            )),
        }
    try:
        context = ssl.create_default_context()
        # NOTE(review): certificate and hostname verification are switched
        # off here, so the peer is not authenticated. Presumably deliberate
        # (self-signed server certificates) - confirm before relying on it.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        port = 1965 if url_parts.port is None else url_parts.port
        with socket.create_connection((url_parts.hostname, port)) as sock:
            with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
                ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
                with ssock.makefile(mode='rb') as fp:
                    header = fp.readline(1027)
                    parts = header.decode('utf8').split(None, 1)
                    if not parts:
                        return {
                            'status': '00',
                            'meta': 'Empty response header',
                        }
                    status = parts[0]
                    meta = parts[1] if len(parts) > 1 else ''
                    if status[0] != '2':
                        return {
                            'status': status,
                            'meta': meta.strip(),
                        }
                    meta_params = _parse_meta(meta)
                    body = fp.read()
                    return {
                        'status': status,
                        'meta': meta.strip(),
                        'body': body.decode(meta_params.get('charset', 'utf8')),
                    }
    except Exception as ex:
        # Top-level boundary: every failure becomes a '00' response so the
        # caller can render it as an error page.
        return {
            'status': '00',
            'meta': '{}'.format(ex),
        }