summaryrefslogtreecommitdiff
path: root/gemini.py
diff options
context:
space:
mode:
authorMatt Singleton <matt@xcolour.net>2022-01-19 18:58:02 -0600
committerMatt Singleton <matt@xcolour.net>2022-01-19 18:58:02 -0600
commit0abf3511646446ee64f21d551dd0d2214fd624b7 (patch)
tree84276e8a1288cb378e93a18d406d05d362c7ee53 /gemini.py
parent704be7d2e7db33635fc9de684c8b3916cd798c68 (diff)
organizing the repo a bit
Diffstat (limited to 'gemini.py')
-rw-r--r--gemini.py148
1 files changed, 0 insertions, 148 deletions
diff --git a/gemini.py b/gemini.py
deleted file mode 100644
index 7bedda5..0000000
--- a/gemini.py
+++ /dev/null
@@ -1,148 +0,0 @@
-import io
-import re
-import socket
-import ssl
-import string
-import urllib.parse
-
-import fsm
-
-
-def htmlescape(text: str) -> str:
- return text.replace('<', '&lt;').replace('>', '&gt;')
-
-
-def gem2html(gem: dict) -> str:
- params = {
- 'charset': 'utf-8',
- 'lang': 'en',
- 'css': open('style.css').read()
- }
- if gem['status'][0] == '2':
- template = string.Template(open('page_template.html').read())
- body = io.StringIO()
- parser = fsm.Parser(gem['body'].split('\n'), body)
- parser.parse()
- params['body'] = body.getvalue()
- elif gem['status'][0] == '1':
- template = string.Template(open('input_template.html').read())
- params['meta'] = gem['meta']
- else:
- template = string.Template(open('error_template.html').read())
- if gem['status'] == '00':
- params['status'] = 'CLIENT ERROR'
- elif gem['status'][0] == '4':
- params['status'] = gem['status'] + ' TEMPORARY FAILURE'
- elif gem['status'][0] == '5':
- params['status'] = gem['status'] + ' PERMANENT FAILURE'
- else:
- params['status'] = 'UNHANDLED STATUS {}'.format(gem['status'])
- params['meta'] = gem['meta']
-
- html = template.substitute(params)
- with open('latest.html', 'w') as fp:
- fp.write(html)
- return html
-
-
-def urljoin(base: str, url: str) -> str:
- if base is None:
- return url
- base = re.sub('^gemini:', 'http:', base)
- url = re.sub('^gemini:', 'http:', url)
- return re.sub('^http:', 'gemini:', urllib.parse.urljoin(base, url))
-
-
-def get(url: str, follow_redirects: bool = True) -> dict:
- response = _get(url)
- if follow_redirects is True:
- count = 0
- while response['status'][0] == '3':
- count += 1
- if count > 20:
- return {'status': '00', 'meta': 'Too many redirects'}
- print('{status} {meta}'.format(**response))
- response = _get(response['meta'])
- return response
-
-
-def hack_url(url: str) -> str:
- """
- An ugly hack to reformat input queries the way gemini wants them:
- ?<query>
- Rather than the default way an html get form renders them:
- ?<inputname>=<query>
- I don't think this ever *should* break but I guess it *could*.
- """
- url_parts = urllib.parse.urlsplit(url)
- query = urllib.parse.parse_qs(url_parts.query)
- if len(query) == 1 and '__client_internal_input' in query and len(query['__client_internal_input']) == 1:
- query = str(query['__client_internal_input'][0])
- url = urllib.parse.urlunsplit((
- url_parts.scheme,
- url_parts.netloc,
- url_parts.path,
- query,
- url_parts.fragment,
- ))
- url_parts = urllib.parse.urlsplit(url)
- return url
-
-
-def _parse_meta(meta: str) -> dict:
- mime, _, params_text = meta.lower().strip().partition(';')
- params = {}
- if params_text.strip():
- for param in params_text.split(';'):
- k, val = param.split('=')
- params[k.strip()] = val.strip()
- params['mime'] = mime.strip()
- return params
-
-
-def _get(url: str) -> dict:
- url_parts = urllib.parse.urlsplit(url)
- if len(url_parts.path) == 0:
- return {
- 'status': '32',
- 'meta': urllib.parse.urlunsplit((
- url_parts.scheme,
- url_parts.netloc,
- '/',
- url_parts.query,
- url_parts.fragment,
- ))
- }
- try:
- context = ssl.create_default_context()
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
- port = 1965 if url_parts.port is None else url_parts.port
- with socket.create_connection((url_parts.hostname, port)) as sock:
- with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
- ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
- fp = ssock.makefile(mode='rb')
- header = fp.readline(1027)
- parts = header.decode('utf8').split(None, 1)
- status = parts[0]
- if len(parts) == 1:
- meta = ''
- else:
- meta = parts[1]
- if status[0] != '2':
- return {
- 'status': status,
- 'meta': meta.strip(),
- }
- meta_params = _parse_meta(meta)
- body = fp.read()
- return {
- 'status': status,
- 'meta': meta.strip(),
- 'body': body.decode(meta_params.get('charset', 'utf8')),
- }
- except Exception as ex:
- return {
- 'status': '00',
- 'meta': '{}'.format(ex),
- }