1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import io
import re
import socket
import ssl
import string
import urllib.parse
import fsm
def htmlescape(text: str) -> str:
    """Return *text* with HTML-significant characters replaced by entities.

    Escapes ``&``, ``<`` and ``>`` so that arbitrary text can be embedded in
    HTML element content without being interpreted as markup.

    Args:
        text: Plain text to embed in an HTML document.

    Returns:
        The escaped text.
    """
    # '&' must be handled first; otherwise the ampersands introduced by the
    # '<' and '>' replacements would themselves be escaped again.
    return (
        text.replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
    )
def _read_text(path: str) -> str:
    """Return the full contents of the UTF-8 text file at *path*."""
    with open(path, encoding='utf-8') as fp:
        return fp.read()


def gem2html(gem: dict) -> str:
    """Render a Gemini response dict as an HTML page.

    The stylesheet ``style.css`` and one of ``page_template.html`` (status
    2x) or ``error_template.html`` (any other status) are read from the
    current working directory. The chosen template is filled in with
    ``string.Template.substitute``; the result is written to ``latest.html``
    and also returned.

    Args:
        gem: Response dict with a ``'status'`` string and, for a 2x status,
            a ``'body'`` string; for any other status, a ``'meta'`` string.

    Returns:
        The rendered HTML document.

    Raises:
        OSError: If the stylesheet or template cannot be read, or
            ``latest.html`` cannot be written.
        KeyError: If the template uses a placeholder that is not supplied.
    """
    status = gem['status']
    params = {
        'charset': 'utf-8',
        'lang': 'en',
        'css': _read_text('style.css'),
    }
    # startswith() instead of status[0] so an empty status falls through to
    # the "unhandled" error page instead of raising IndexError.
    if status.startswith('2'):
        template = string.Template(_read_text('page_template.html'))
        body = io.StringIO()
        # The parser writes its HTML output into the ``body`` buffer.
        fsm.Parser(gem['body'].split('\n'), body).parse()
        params['body'] = body.getvalue()
    else:
        template = string.Template(_read_text('error_template.html'))
        if status == '00':
            # '00' is the status this client uses for locally generated errors.
            params['status'] = 'CLIENT ERROR'
        elif status.startswith('4'):
            params['status'] = status + ' TEMPORARY FAILURE'
        elif status.startswith('5'):
            params['status'] = status + ' PERMANENT FAILURE'
        else:
            params['status'] = 'UNHANDLED STATUS {}'.format(status)
        params['meta'] = gem['meta']
    html = template.substitute(params)
    # Written as UTF-8 to match the charset handed to the template.
    with open('latest.html', 'w', encoding='utf-8') as fp:
        fp.write(html)
    return html
def urljoin(base: str, url: str) -> str:
    """Resolve *url* against *base*, with support for ``gemini:`` URLs.

    ``urllib.parse.urljoin`` only resolves relative references for schemes
    it knows about, so ``gemini:`` is temporarily swapped for ``http:``
    while joining and swapped back afterwards.

    Args:
        base: URL of the current document, or ``None`` when there is none.
        url: Absolute or relative reference to resolve.

    Returns:
        The resolved URL. When *base* is ``None``, *url* is returned as is.
    """
    if base is None:
        return url
    # The result takes its scheme from ``url`` when that carries one, and
    # from ``base`` otherwise. Only swap back to ``gemini:`` when that source
    # really was a gemini URL, so genuine http:// links stay http://.
    scheme_source = url if urllib.parse.urlsplit(url).scheme else base
    result_is_gemini = scheme_source.startswith('gemini:')
    joined = urllib.parse.urljoin(
        re.sub(r'^gemini:', 'http:', base),
        re.sub(r'^gemini:', 'http:', url),
    )
    if result_is_gemini:
        return re.sub(r'^http:', 'gemini:', joined)
    return joined
def get(url: str, follow_redirects: bool = True) -> dict:
    """Fetch *url*, optionally following 3x redirects.

    Args:
        url: URL to request.
        follow_redirects: When ``True``, redirect responses (status 3x) are
            followed, up to 20 in a row.

    Returns:
        The final response dict produced by ``_get`` (keys ``'status'`` and
        ``'meta'``, plus ``'body'`` for 2x responses). If more than 20
        redirects are encountered, a ``'00'`` status dict with the meta
        ``'Too many redirects'`` is returned instead.
    """
    response = _get(url)
    if follow_redirects is not True:
        return response

    current_url = url
    redirects = 0
    while response['status'].startswith('3'):
        redirects += 1
        if redirects > 20:
            return {'status': '00', 'meta': 'Too many redirects'}
        print('{status} {meta}'.format(**response))
        # A redirect target may be a relative reference; resolve it against
        # the URL that produced the redirect before requesting it.
        current_url = urljoin(current_url, response['meta'])
        response = _get(current_url)
    return response
def _parse_meta(meta: str) -> dict:
mime, _, params_text = meta.lower().strip().partition(';')
params = {}
if params_text.strip():
for param in params_text.split(';'):
k, val = param.split('=')
params[k.strip()] = val.strip()
params['mime'] = mime.strip()
return params
def _get(url: str) -> dict:
url_parts = urllib.parse.urlsplit(url)
if len(url_parts.path) == 0:
return {
'status': '32',
'meta': urllib.parse.urlunsplit((
url_parts.scheme,
url_parts.netloc,
'/',
url_parts.query,
url_parts.fragment,
))
}
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
port = 1965 if url_parts.port is None else url_parts.port
with socket.create_connection((url_parts.hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
fp = ssock.makefile(mode='rb')
header = fp.readline(1027)
parts = header.decode('utf8').split(None, 1)
status = parts[0]
if len(parts) == 1:
meta = ''
else:
meta = parts[1]
if status[0] != '2':
return {
'status': status,
'meta': meta.strip(),
}
meta_params = _parse_meta(meta)
body = fp.read()
return {
'status': status,
'meta': meta.strip(),
'body': body.decode(meta_params.get('charset', 'utf8')),
}
except Exception as ex:
return {
'status': '00',
'meta': '{}'.format(ex),
}
|