import re
import socket
import ssl
import urllib.parse
import pyphen
# Shared US-English hyphenation dictionary; gem2html calls dic.inserted()
# on the words of ordinary paragraph lines.
dic = pyphen.Pyphen(lang='en_US')
def htmlescape(text: str) -> str:
    """Escape the characters that are special in HTML text content.

    Replaces ``&``, ``<`` and ``>`` with their entity references. Quotes are
    left untouched, so the result is safe for element content but not, on its
    own, for attribute values.

    Args:
        text: Raw text to embed in an HTML document.

    Returns:
        The escaped text.
    """
    # '&' must be handled first, otherwise the ampersands introduced by the
    # other two replacements would be escaped a second time.
    return (
        text.replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
    )
def gem2html(gem: str) -> str:
    """Convert a gemtext document into a minimal HTML page.

    The converter is a small line-oriented state machine with four states:
    ``text`` (paragraphs and headings), ``links`` (``=>`` lines), ``list``
    (``* `` lines) and ``pre`` (between ``` fences). Runs of link or list
    lines are wrapped in ``<ul>``; preformatted runs are wrapped in ``<pre>``.

    Args:
        gem: The gemtext source, with lines separated by ``\\n``.

    Returns:
        The HTML document as a single string.
    """
    openers = {'links': '<ul>', 'list': '<ul>', 'pre': '<pre>'}
    closers = {'links': '</ul>', 'list': '</ul>', 'pre': '</pre>'}

    out = ['<html>\n<body>\n']
    state = 'text'
    blanklines = 0

    for line in gem.split('\n'):
        is_fence = line.startswith('```')

        # Decide which state this line belongs to.
        if is_fence:
            if state == 'pre':
                newstate = 'text'
                blanklines = 0
            else:
                newstate = 'pre'
        elif state == 'pre':
            newstate = 'pre'
        elif line.startswith('=>'):
            newstate = 'links'
        elif line.startswith('* '):
            newstate = 'list'
        else:
            newstate = 'text'

        # Outside preformatted blocks, a single blank line is swallowed and
        # only the second and later consecutive ones produce a visible gap.
        # Blank lines never end the current list.
        if state != 'pre':
            if not line.strip():
                blanklines += 1
                if blanklines > 1:
                    out.append('<br/>')
                continue
            blanklines = 0

        # Close the old wrapper element and open the new one on a transition.
        if state != newstate:
            if state in closers:
                out.append(closers[state])
            if newstate in openers:
                out.append(openers[newstate])
            state = newstate

        if is_fence:
            # Fence lines only toggle the state; they produce no output.
            continue

        if state == 'links':
            # Whitespace between "=>" and the URL is optional, so split what
            # follows the marker rather than the whole line.
            parts = line[2:].split(None, 1)
            if not parts:
                continue  # "=>" with no URL: nothing to link to
            url = parts[0]
            label = parts[1].strip() if len(parts) > 1 else url
            try:
                scheme = urllib.parse.urlsplit(url).scheme
            except ValueError:
                # Malformed URL; do not let one bad link abort the page.
                scheme = ''
            external = ' class="external"' if scheme not in ('gemini', '') else ''
            out.append('<li><a href="{url}"{external}>{text}</a></li>'.format(
                # The URL lands inside a double-quoted attribute, so quotes
                # need escaping in addition to the usual text escapes.
                url=htmlescape(url).replace('"', '&quot;'),
                text=htmlescape(label),
                external=external,
            ))
        elif state == 'list':
            out.append('<li>{}</li>'.format(htmlescape(line[2:])))
        elif state == 'pre':
            out.append(htmlescape(line))
        elif line.startswith('###'):
            out.append('<h3>{}</h3>'.format(htmlescape(line[3:].lstrip())))
        elif line.startswith('##'):
            out.append('<h2>{}</h2>'.format(htmlescape(line[2:].lstrip())))
        elif line.startswith('#'):
            out.append('<h1>{}</h1>'.format(htmlescape(line[1:].lstrip())))
        else:
            # Insert soft hyphens so browsers can break long words; the
            # result must be kept, then escaped for HTML.
            hyphenated = ' '.join(
                dic.inserted(word, '\u00AD') for word in line.split()
            )
            out.append('<p>{}</p>'.format(htmlescape(hyphenated)))

    # A document may end while a list or preformatted block is still open.
    if state in closers:
        out.append(closers[state])

    out.append('</body>\n</html>\n')
    return '\n'.join(out)
def urljoin(base: str, url: str) -> str:
    """Resolve ``url`` against ``base``, with support for ``gemini:`` URLs.

    ``urllib.parse.urljoin`` only performs relative resolution for schemes it
    knows about, so ``gemini:`` is temporarily swapped for ``http:`` while
    joining and swapped back afterwards. The swap back happens only when the
    result really derives from a gemini URL, so that a genuine ``http:`` link
    found on a gemini page is returned unchanged.

    Args:
        base: The URL of the current document, or ``None`` if there is none.
        url: An absolute or relative URL reference.

    Returns:
        The resolved URL; ``url`` itself when ``base`` is ``None``.
    """
    if base is None:
        return url

    gemini = 'gemini:'
    http = 'http:'

    base_is_gemini = base.startswith(gemini)
    url_is_gemini = url.startswith(gemini)
    # A reference without its own scheme inherits the scheme of the base.
    url_has_scheme = bool(urllib.parse.urlsplit(url).scheme)
    result_is_gemini = url_is_gemini or (base_is_gemini and not url_has_scheme)

    if base_is_gemini:
        base = http + base[len(gemini):]
    if url_is_gemini:
        url = http + url[len(gemini):]

    joined = urllib.parse.urljoin(base, url)
    if result_is_gemini and joined.startswith(http):
        return gemini + joined[len(http):]
    return joined
def get(url: str, follow_redirects: bool = True) -> dict:
    """Fetch ``url`` and, by default, follow redirect responses.

    Args:
        url: The URL to request.
        follow_redirects: When true, keep requesting the redirect target
            while the response status starts with ``'3'``.

    Returns:
        The response dict produced by ``_get`` for the final request
        (``status`` and ``meta``, plus ``body`` for success responses).

    Raises:
        Exception: If more than 20 redirects are followed in a row.
    """
    response = _get(url)
    if not follow_redirects:
        return response

    current = url
    hops = 0
    while response['status'][0] == '3':
        hops += 1
        if hops > 20:
            raise Exception('Redirect loop')
        print('redirect: {}'.format(response['meta']))
        # The redirect target may be a relative reference, so resolve it
        # against the URL that produced the redirect before requesting it.
        current = urljoin(current, response['meta'])
        response = _get(current)
    return response
def _parse_meta(meta: str) -> dict:
mime, _, params_text = meta.lower().strip().partition(';')
params = {}
if params_text.strip():
for param in params_text.split(';'):
k, val = param.split('=')
params[k.strip()] = val.strip()
params['mime'] = mime.strip()
return params
def _get(url: str) -> dict:
url_parts = urllib.parse.urlsplit(url)
if len(url_parts.path) == 0:
return {
'status': '32',
'meta': urllib.parse.urlunsplit((
url_parts.scheme,
url_parts.netloc,
'/',
url_parts.query,
url_parts.fragment,
))
}
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
port = 1965 if url_parts.port is None else url_parts.port
with socket.create_connection((url_parts.hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
fp = ssock.makefile(mode='rb')
header = fp.readline(1027)
status, meta = header.decode('utf8').split(None, 1)
if status[0] != '2':
return {
'status': status,
'meta': meta.strip(),
}
meta_params = _parse_meta(meta)
body = fp.read()
return {
'status': status,
'meta': meta.strip(),
'body': body.decode(meta_params.get('charset', 'utf8')),
}