summaryrefslogtreecommitdiff
path: root/gemini.py
blob: b940bb74c8750d5ef7dd016cce295002fc6b57ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import io
import re
import socket
import ssl
import urllib.parse

import fsm


def htmlescape(text: str) -> str:
    """Escape *text* so it can be embedded in HTML content or attribute values.

    Replaces ``&``, ``<``, ``>``, ``"`` and ``'`` with their HTML entities.

    Args:
        text: Raw text to escape.

    Returns:
        The escaped text.
    """
    # '&' must be handled first; otherwise the ampersands introduced by the
    # later replacements would themselves be escaped a second time.
    return (
        text.replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
        .replace('"', '&quot;')
        .replace("'", '&#x27;')
    )


def gem2html(gem: str) -> str:
    """Render a gemtext document as a complete HTML page.

    The page embeds the contents of ``style.css`` (opened relative to the
    current working directory) in a ``<style>`` element and wraps the output
    produced through ``fsm.Parser`` in ``<div id="root">``.

    Args:
        gem: The gemtext source; it is split on ``'\\n'`` before being handed
            to the parser.

    Returns:
        The full HTML document as a string. The same document is also
        printed to stdout.

    Raises:
        OSError: If ``style.css`` cannot be opened.
    """
    # Read the stylesheet through a context manager so the file handle is
    # closed deterministically instead of being left to the garbage collector.
    with open('style.css', encoding='utf8') as css_file:
        css = css_file.read()

    html = io.StringIO()
    # A single <html> opening tag: emitting it twice produced invalid markup.
    html.write('<!DOCTYPE html>\n<html lang="en">\n<head>\n<meta charset="utf-8"/>\n<style type="text/css">\n')
    html.write(css)
    html.write('</style>\n</head>\n<body>\n<div id="root">')
    # presumably the parser writes the rendered body into the buffer it is
    # given; verify against fsm.Parser
    parser = fsm.Parser(gem.split('\n'), html)
    parser.parse()
    html.write('</div>\n</body>\n</html>')

    page = html.getvalue()
    # NOTE(review): this looks like leftover debugging output; kept so that
    # anything consuming stdout keeps working.
    print(page)
    return page


def urljoin(base: str, url: str) -> str:
    """Resolve *url* against *base*, with support for the ``gemini:`` scheme.

    ``urllib.parse.urljoin`` only resolves relative references for schemes it
    knows about, so a ``gemini:`` base is temporarily disguised as ``http:``
    for the join and converted back afterwards.

    Args:
        base: The URL to resolve against. ``None`` is accepted and means
            "no base": *url* is returned unchanged.
        url: An absolute or relative URL reference.

    Returns:
        The resolved URL. References that carry their own non-gemini scheme
        (``http://``, ``https://``, ``mailto:`` ...) keep that scheme.
    """
    if base is None:
        return url

    base_is_gemini = base.startswith('gemini:')
    url_is_gemini = url.startswith('gemini:')
    url_scheme = urllib.parse.urlsplit(url).scheme

    # Without a gemini base, or when the reference is absolute in another
    # scheme, the standard join already gives the right answer. Running the
    # gemini<->http swap here would wrongly turn real http links into
    # gemini ones.
    if not base_is_gemini or (url_scheme and not url_is_gemini):
        return urllib.parse.urljoin(base, url)

    # Gemini base with a relative or gemini reference: join under the http
    # disguise, then restore the scheme.
    joined = urllib.parse.urljoin(
        re.sub('^gemini:', 'http:', base),
        re.sub('^gemini:', 'http:', url),
    )
    return re.sub('^http:', 'gemini:', joined)


def get(url: str, follow_redirects: bool = True) -> dict:
    """Fetch *url* over Gemini, optionally following redirects.

    Args:
        url: The URL to request.
        follow_redirects: When true, responses whose status starts with
            ``'3'`` are followed until a non-redirect response arrives.

    Returns:
        The response dict produced by ``_get`` for the final request.

    Raises:
        Exception: With message ``'Redirect loop'`` once more than 20
            redirects have been followed.
    """
    response = _get(url)
    if not follow_redirects:
        return response

    current_url = url
    count = 0
    while response['status'][0] == '3':
        count += 1
        if count > 20:
            raise Exception('Redirect loop')
        print('redirect: {}'.format(response['meta']))
        # A redirect target may be a relative reference; resolve it against
        # the URL that produced the redirect before requesting it.
        current_url = urljoin(current_url, response['meta'])
        response = _get(current_url)
    return response


def _parse_meta(meta: str) -> dict:
    mime, _, params_text = meta.lower().strip().partition(';')
    params = {}
    if params_text.strip():
        for param in params_text.split(';'):
            k, val = param.split('=')
            params[k.strip()] = val.strip()
    params['mime'] = mime.strip()
    return params


def _get(url: str) -> dict:
    url_parts = urllib.parse.urlsplit(url)
    if len(url_parts.path) == 0:
        return {
            'status': '32',
            'meta': urllib.parse.urlunsplit((
                url_parts.scheme,
                url_parts.netloc,
                '/',
                url_parts.query,
                url_parts.fragment,
            ))
        }
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    port = 1965 if url_parts.port is None else url_parts.port
    with socket.create_connection((url_parts.hostname, port)) as sock:
        with context.wrap_socket(sock, server_hostname=url_parts.hostname) as ssock:
            ssock.sendall('{url}\r\n'.format(url=url).encode('utf8'))
            fp = ssock.makefile(mode='rb')
            header = fp.readline(1027)
            parts = header.decode('utf8').split(None, 1)
            status = parts[0]
            if len(parts) == 1:
                meta = ''
            else:
                meta = parts[1]
            if status[0] != '2':
                return {
                    'status': status,
                    'meta': meta.strip(),
                }
            meta_params = _parse_meta(meta)
            body = fp.read()
    return {
        'status': status,
        'meta': meta.strip(),
        'body': body.decode(meta_params.get('charset', 'utf8')),
    }