summaryrefslogtreecommitdiff
path: root/browser/gemini.py
diff options
context:
space:
mode:
Diffstat (limited to 'browser/gemini.py')
-rw-r--r--browser/gemini.py148
1 files changed, 148 insertions, 0 deletions
diff --git a/browser/gemini.py b/browser/gemini.py
new file mode 100644
index 0000000..7bedda5
--- /dev/null
+++ b/browser/gemini.py
@@ -0,0 +1,148 @@
+import io
+import re
+import socket
+import ssl
+import string
+import urllib.parse
+
+import fsm
+
+
+def htmlescape(text: str) -> str:
+ return text.replace('<', '&lt;').replace('>', '&gt;')
+
+
+def gem2html(gem: dict) -> str:
+ params = {
+ 'charset': 'utf-8',
+ 'lang': 'en',
+ 'css': open('style.css').read()
+ }
+ if gem['status'][0] == '2':
+ template = string.Template(open('page_template.html').read())
+ body = io.StringIO()
+ parser = fsm.Parser(gem['body'].split('\n'), body)
+ parser.parse()
+ params['body'] = body.getvalue()
+ elif gem['status'][0] == '1':
+ template = string.Template(open('input_template.html').read())
+ params['meta'] = gem['meta']
+ else:
+ template = string.Template(open('error_template.html').read())
+ if gem['status'] == '00':
+ params['status'] = 'CLIENT ERROR'
+ elif gem['status'][0] == '4':
+ params['status'] = gem['status'] + ' TEMPORARY FAILURE'
+ elif gem['status'][0] == '5':
+ params['status'] = gem['status'] + ' PERMANENT FAILURE'
+ else:
+ params['status'] = 'UNHANDLED STATUS {}'.format(gem['status'])
+ params['meta'] = gem['meta']
+
+ html = template.substitute(params)
+ with open('latest.html', 'w') as fp:
+ fp.write(html)
+ return html
+
+
+def urljoin(base: str, url: str) -> str:
+ if base is None:
+ return url
+ base = re.sub('^gemini:', 'http:', base)
+ url = re.sub('^gemini:', 'http:', url)
+ return re.sub('^http:', 'gemini:', urllib.parse.urljoin(base, url))
+
+
+def get(url: str, follow_redirects: bool = True) -> dict:
+ response = _get(url)
+ if follow_redirects is True:
+ count = 0
+ while response['status'][0] == '3':
+ count += 1
+ if count > 20:
+ return {'status': '00', 'meta': 'Too many redirects'}
+ print('{status} {meta}'.format(**response))
+ response = _get(response['meta'])
+ return response
+
+
+def hack_url(url: str) -> str:
+ """
+ An ugly hack to reformat input queries the way gemini wants them:
+ ?<query>
+ Rather than the default way an html get form renders them:
+ ?<inputname>=<query>
+ I don't think this ever *should* break but I guess it *could*.
+ """
+ url_parts = urllib.parse.urlsplit(url)
+ query = urllib.parse.parse_qs(url_parts.query)
+ if len(query) == 1 and '__client_internal_input' in query and len(query['__client_internal_input']) == 1:
+ query = str(query['__client_internal_input'][0])
+ url = urllib.parse.urlunsplit((
+ url_parts.scheme,
+ url_parts.netloc,
+ url_parts.path,
+ query,
+ url_parts.fragment,
+ ))
+ url_parts = urllib.parse.urlsplit(url)
+ return url
+
+
+def _parse_meta(meta: str) -> dict:
+ mime, _, params_text = meta.lower().strip().partition(';')
+ params = {}
+ if params_text.strip():
+ for param in params_text.split(';'):
+ k, val = param.split('=')
+ params[k.strip()] = val.strip()
+ params['mime'] = mime.strip()
+ return params
+
+
+def _get(url: str) -> dict:
+ url_parts = urllib.parse.urlsplit(url)
+ if len(url_parts.path) == 0:
+ return {
+ 'status': '32',
+ 'meta': urllib.parse.urlunsplit((
+ url_parts.scheme,
+ url_parts.netloc,
+ '/',
+ url_parts.query,
+ url_parts.fragment,
+ ))
+ }
+ try:
+ context = ssl.create_default_context()
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
+ port = 1965 if url_parts.port is None else url_parts.port
+ with socket.create_connection((url_parts.hostname, port)) as sock:
+ with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
+ ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
+ fp = ssock.makefile(mode='rb')
+ header = fp.readline(1027)
+ parts = header.decode('utf8').split(None, 1)
+ status = parts[0]
+ if len(parts) == 1:
+ meta = ''
+ else:
+ meta = parts[1]
+ if status[0] != '2':
+ return {
+ 'status': status,
+ 'meta': meta.strip(),
+ }
+ meta_params = _parse_meta(meta)
+ body = fp.read()
+ return {
+ 'status': status,
+ 'meta': meta.strip(),
+ 'body': body.decode(meta_params.get('charset', 'utf8')),
+ }
+ except Exception as ex:
+ return {
+ 'status': '00',
+ 'meta': '{}'.format(ex),
+ }