跳转至

第12章 Python Web 开发技术

12.1 通用网关接口(CGI)

12.1.1 CGI 的概念

  • 直接基于Socket的Web应用的问题
  • 要求熟悉HTTP协议和Socket编程
  • 程序的可扩展性很差
  • 难以支撑复杂的业务逻辑
  • 通用网关接口(Common Gateway Interface,CGI)
  • 是一种Web服务器和Web应用程序之间交互的编程规范,定义了Web服务器如何向Web应用程序发送消息,在收到Web应用程序的信息后如何处理等细节
  • 遵循CGI规范的应用程序独立于Web服务器,并能够被Web服务器调用
  • Web应用程序的开发者只需要关注业务逻辑代码的编写,无需关注HTTP协议或Socket等的底层细节
  • 大大降低了Web应用程序的开发难度
  • 通过标准输入输出进行通信,因而可以使用任意编程语言实现

  • CGI编程的不足之处
  • 在每一次HTTP请求中,Web服务器都需要执行CGI脚本,难以满足较大量并发请求的需要;
  • CGI编程依旧比较复杂,相当多的交互依旧发生在HTTP协议的层面之上;
  • CGI程序的安全性较差,容易受到攻击

12.1.2 Python CGI 编程

  • Web应用程序目录
1
2
3
www ─┬── server.py
     ├─  index.html
     └── cgi-bin ──── first_cgi.py
  • 服务器(两种方式)
  • server.py

1
2
3
4
5
6
7
8
# 例12-1
from http.server import HTTPServer, CGIHTTPRequestHandler

port = 9000
server = HTTPServer(('127.0.0.1', port), CGIHTTPRequestHandler)
print('服务器启动 http://{}:{}'.format(*server.server_address))

server.serve_forever()
- 终端命令 python -m http.server --cgi 9000

  • CGI脚本
  • 必须具有可执行权限,在Linux系统中可使用命令chmod a+x cgi.py添加执行权限;
  • 脚本中首注释中给出Python解释器的路径,例如#!/usr/bin/python
  • first_cgi.py
1
# 例 12-2
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
#!/usr/bin/python
# 该路径需根据Python环境进行设置

print("Content-type: text/html")                     # 响应头
print("\r\n")                                        # 空行

print('''
<html>
<head>
    <title>第一个CGI程序</title>
</head>
<body>
    <center><h1>第一个CGI程序</h1></center>
</body>
</html>
''')


在 CGI 脚本中获取请求参数

  • 代码
  • index.html中表单的action属性改为/cgi-bin/post.py
  • post.py
1
# 例 12-3
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#!/path/to/envs/env_name/bin/python
# 该路径需根据Python环境进行设置
import cgi

print("Content-type: text/html")

print('\r\n')

fields = cgi.FieldStorage()
username = fields.getvalue('usertag')
code = fields.getvalue('code')
print('<html><head><title>作业提交</title></head>')

if username is None and code is None:
    print("<center><h1>请求失败,参数错误!</h1></center>")
else:
    with open('./index.html') as f:
        html = f.read()
        html = html.replace('<span id="info"/>', f'{username} 提交成功! <br>{code}')
        print(html)
print('</html>')


使用Cookie

  • cookie的发送
  • 通过http头部来实现
  • Set-cookie:name=value;expires=date;path=path;domain=domain;secure
  • name=value:cookie变量的名称和取,有多个cookie变量时用 ; 分隔,如name1=value1;name2=value2
  • expires=date:cookie的有效期限,date的格式为Wdy,DD-Mon-YYYY HH:MM:SS
  • path=path:cookie的支持路径,cookie对该路径中所有文件及子目录生效,例如path=/cgi-bin
  • domain=domain:cookie生效的域名,例如:domain="www.example.com"
  • secure:表示cookie只能通过安全套接字协议(SSL)来传递。

  • cookie的接收

  • 通过环境变量HTTP_COOKIE
1
# 例 12-4
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/path/to/envs/env_name/bin/python
# 该路径需根据Python环境进行设置
import cgi
import os
import http.cookies

form = cgi.FieldStorage()
username = form.getvalue('usertag')
code = form.getvalue('code')

print("Content-type: text/html")
print(f"Set-Cookie: code={code}")                 # 设置Cookie项
print('\r\n')

old_code = None
sc = http.cookies.SimpleCookie()                  # Cookie解析器
sc.load(os.environ.get('HTTP_COOKIE'))
old_code = sc.get('code', None)
if old_code: old_code = old_code.value            # 获取Cookie项code的值

if username is None or code is None:
    print("<center><h1>请求失败,参数错误!</h1></center>")
else:
    with open('./index.html') as f:
        html = f.read()
        info = f'{username} 提交成功! <br>{code}'
        if old_code:
            info = info + f'<br>上次提交的代码为:<br>{old_code}'
        html = html.replace('<span id="info"/>', info)
        print(html)

12.2 Web 服务器网关接口(WSGI)

12.2.1 WSGI 的概念

  • CGI编程的根本问题
  • 每次请求都需要调用CGI脚本创建新的进程,并且响应完毕之后即抛弃
  • 解决办法
  • 第一种称为服务器集成,其思路是利用Web服务器提供的API开发Web应用,然后将Web应用作为模块插入到Web服务器中,Web服务器通过函数调用的方式来执行Web应用程序
    • 要求Web应用与Web服务器使用相同的编程语言,而且Web服务器的稳定性直接受到Web应用程序的影响
  • 第二种思路是外部进程,即令Web应用程序独立于Web服务器,形成独立的外部应用程序
    • 需要制定新的专用于Web服务器和外部进程之间的通信协议,例如Java语言中的Servelet和Python语言中的WSGI(Web Server Gateway Interface)

12.2.2 WSGI 应用

  • 满足如下条件的程序就是一个合法的WSGI应用
  • 是一个可调用对象;
  • 能够接收两个参数
    • 第一个是包含了Web服务器环境变量的字典
    • 第二个是一个用于设置HTTP响应状态码和响应头的函数
  • 返回一个用于构造HTTP响应体的可迭代对象
1
# 例 12-5
1
2
3
4
5
6
7
def wsgi_app(env, start_response):
    content = '<h1>第一个WSGI应用</h1>'.encode('utf-8')
    status = '200 OK'
    headers = [('Content-Type', 'text/html'),
               ('Content-Length', str(len(content)))]
    start_response(status, headers)
    return [content]
  • WSGI应用的运行
  • WSGI应用由WSGI服务器(或称为WSGI容器)调用,它就是所谓的外部进程,负责Web服务器和WSGI应用之间的通信交互

  • WSGI的运行过程

12.2.3 WSGI 服务器

  • 简易WSGI服务器
1
# 例 12-6
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 文件 wsgi_server.py
from socket import socket
from httplib import Parser
import threading, sys, io

class WSGIServer:
    def __init__(self, app, host='127.0.0.1', port=9000):
        self.app = app
        self.host = host
        self.port = port
        self.resp_status = None
        self.resp_headers = None

    def start(self):
        server_socket = socket()
        print(f"启动服务器,http://{self.host}:{self.port}")
        server_socket.bind((self.host, self.port))
        server_socket.listen(10)
        while True:                                        # 服务循环
            conn, addr = server_socket.accept()
            try:
                conn.settimeout(60)
                print(f"客户端:{addr}")
                threading.Thread(target=self.server_thread,# 服务线程
                                 args=(conn,)).start()
            except Exception:
                print('服务器发生错误!')

    def start_response(self, status, headers):
        self.resp_status, self.resp_headers = status, headers

    def server_thread(self, conn):
        parser = Parser()
        while True:                                        # 通信循环
            parser.append(conn.recv(1024))
            if parser.is_ok():
                break
        env = self.make_env(parser)                        # 构造环境变量
        app_contents = self.app(env, self.start_response)  # 调用WSGI应用
        resp_text = self.make_head(self.resp_status, self.resp_headers)
        response = resp_text.encode('utf-8')
        response += b' '.join(app_contents)
        conn.send(response)                                # HTTP响应
        conn.close()

    def make_head(self, status, headers):                  # 构造响应头
        head = f'HTTP/1.1 {status}\r\n'
        for k, v in headers:
            head += f'{k}: {v}\r\n'
        head += '\r\n'
        return head

    def make_env(self, parser):                            # 环境变量字典
        return {
            'wsgi.version': (1, 0),
            'wsgi.url_scheme': 'http',
            'wsgi.input': io.BytesIO(parser.body),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': True,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'REQUEST_METHOD': parser.req_method,
            'SCRIPT_NAME': '',
            'PATH_INFO': parser.req_url,
            'CONTENT_TYPE': parser.content_type,
            'CONTENT_LENGTH': parser.content_length,
            'SERVER_NAME': self.host,
            'SERVER_PORT': self.port,
            'QUERY_STRING': parser.query_string,
            'SERVER_PROTOCOL': 'HTTP 1.1',
            'HTTP_COOKIE': parser.head_dict.get('cookie', ''),
            'params': parser.req_params()
        }

if __name__ == '__main__':
    from wsgi_apps import first_wsgi_app
    server = WSGIServer(first_wsgi_app)
    server.start()
  • Python内置的WSGI参考服务器
1
# 例 12-7
1
2
3
4
5
6
7
8
from wsgiref.simple_server import make_server
from apps import wsgi_app

host = '127.0.0.1'
port = 9000
print(f"启动服务器,http://{host}:{port}\n")
server = make_server(host, port, wsgi_app)
server.serve_forever()

生产环境中常用的WSGI服务器

  • 常用的WSGI服务器有uWSGIGunicornCherryPyTornadoPasteWaitress

  • Waitress是一个纯Python的WSGI 服务器,安装使用非常方便,

  • pip install waitress
1
2
3
import waitress
from apps import wsgi_app
waitress.serve(wsgi_app, listen='127.0.0.1:9000')

12.2.4 示例

1
# 例 12-8
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from urllib.parse import parse_qs
import http.cookies

def wsgi_app(env, start_response):
    method = env['REQUEST_METHOD']                        # 请求方法
    query = env['QUERY_STRING']                           # 请求参数
    if not query and method == 'POST':
        content_len = int(env['CONTENT_LENGTH'])
        query = env['wsgi.input'].read(content_len).decode('utf-8')

    with open('index.html') as f:                         # 读取HTML文档
        content = f.read()

    headers = [('Content-Type', 'text/html')]             # 响应头
    info = ''
    if method == 'POST':                                  # 响应POST请求
        # 处理请求参数
        params = parse_qs(query)
        username = params.get('usertag')
        code = params.get('code', '')
        if username:
            username = username[0]
        if code:
            code = code[0]

        # 从cookie中读取上次提交的code,并将本次提交的code写入cookie
        cookie_str = env.get('HTTP_COOKIE', '')
        sc = http.cookies.SimpleCookie()
        sc.load(cookie_str)
        old_code = sc['code'].value if sc.get('code') else ''
        info = f'{username}提交成功!<br>{code}'
        if old_code:
            info = info+f'<br>上次提交的代码为:<br>{old_code}'
        headers.append(('Set-Cookie', f'code={code}'))    # 写入cookie

    content = content.replace('<span id="info"/>', info).encode('utf-8')
    headers.append(('Content-Length', str(len(content)))) # 请求正文长度
    start_response('200 OK', headers)
    return [content]


if __name__ == "__main__":
    # 1.-- WSGIServer
    from wsgi_server import WSGIServer
    server = WSGIServer(wsgi_app)
    server.start()

#     # 2.-- simple_server
#     from wsgiref.simple_server import make_server
#     print("启动服务器,http://127.0.0.1:9000")
#     server = make_server('127.0.0.1', 9000, wsgi_app)
#     server.serve_forever()

#     # 3.-- waitress
#     import waitress
#     waitress.serve(wsgi_app, listen='127.0.0.1:9000'

12.3 异步服务器网关接口(ASGI)*

  • 异步服务器网关接口(Asynchronous Server Gateway Interface, ASGI)是 WSGI 的 “精神继承者”
  • 提供了对异步编程的支持
  • 为新型协议或标准提供了扩展的可能性
  • 在一定程度上兼容 WSGI

12.3.1 ASGI 应用

  • 典型的ASGI应用

1
2
3
4
async def application(scope, receive, send):
    receive_event = await receive()
    ...
    await send(send_event
- 参数说明 - scope:是一个字典,其中包含了服务器与客户端之间连接的详细信息(键和取值的格式在子协议中规定),其作用与WSGI应用中的环境变量参数env相似 - receive:是一个异步可等待对象,用于接收来自客户端的消息,用await语句异步运行后返回一个事件(Event),其中包含了客户端的消息内容 - send:也是一个异步可等待对象,用于向客户端发送消息,用await语句异步运行时以一个事件作为参数,其中包含了需要发送至客户端的消息内容

  • 事件
  • 字典,其键和取值由不同的子协议来确定
  • HTTP 子协议
  • WebSocket 子协议
  • Lifespan 子协议

12.3.2 HTTP 子协议

scope字典

  • type"http"
  • asgi["version"]:ASGI 协议版本,当前最新版本为"3.0"
  • asgi["spec_version"]:HTTP 子协议版本,取值为"2.0""2.1",默认为"2.0"
  • http_version"1.0""1.1""2"
  • method:HTTP 请求方法(大写形式)
  • scheme:协议名,取值为 "http""https"
  • path:HTTP 请求路径字符串(不包括查询串部分)
  • raw_path:HTTP 请求头部路径部分的原始字节串,默认取值为 None
  • query_string:查询字符串
  • root_path:ASGI 应用的根路径,与 WSGI 中 SCRIPT_NAME 的含义相同,默认取 值为空字符串
  • headers:HTTP 请求头,取值为形如 [[header_name, header_value]] 的可迭 代对象,header_nameheader_value都是字节串,请求头的顺序与其在 HTTP 请 求中的出现顺序一致
  • client:客户端地址,取值为 [host, port],默认为 None
  • server:服务器地址,取值为[host, port]或者[sock, None](sock 为套接字对象),默认为 None

请求事件

  • 请求事件是receive的异步调用返回结果,在 ASGI 应用主要用于获取 HTTP 请求正文,常用的键和取值
  • type: 取值为"http.request"
  • body: HTTP 请求正文字节串,默认取值为b"",当请求正文较大时可以分块处理
  • more_body: 取值为TrueFalse,当取值为True时表示请求正文分块传输,并且还存在后续数据块,WSGI 应用应当等待所有数据块接收完毕后合并处理

响应开始事件

  • 与 WSGI 应用中的start_response相似,表示开始构建 HTTP 响应
  • type: 取值为"http.response.start"
  • status: HTTP 响应状态码(整数)
  • headers: HTTP 响应头,取值为形如[[header_name, header_value]]的可迭代对象,header_nameheader_value都是字节串,响应头的顺序与在 HTTP 响应 中的出现顺序一致

响应正文事件

  • 包含了 HTTP 响应正文信息
  • type: 取值为"http.response.body"
  • body: HTTP 响应正文字节串,默认取值为b"",响应正文也可以分块处理
  • more_body:取值为TrueFalse,当取值为True时表示还存在后续块

12.3.3 ASGI 服务器

1
# 例12-9
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 文件 asgi_server.py
import asyncio
from httplib import Parser

class ASGIServer:
    def __init__(self, app, host='127.0.0.1', port=9000):
        self.app = app
        self.host = host
        self.port = port

    async def __asgi_server(self, reader, writer):
        print(writer.get_extra_info('peername'))
        parser = Parser()
        msgs_receive = asyncio.Queue()           # HTTP请求事件消息队列
        while True:                              # 接收客户端HTTP请求
            data_recv = await reader.read(1024)
            parser.append(data_recv)
            if parser.is_ok(): break
        msg = self.make_msg(parser)              # 构造请求事件消息
        await msgs_receive.put(msg)
        scope = self.make_scope(parser)
        msgs_send = asyncio.Queue()              # HTTP响应事件消息队列
        await self.app(scope, msgs_receive.get,  # 异步调用ASGI应用
                       msgs_send.put)
        while True:                              # 处理HTTP响应事件
            msg = await msgs_send.get()
            if not self.send_msg(msg, writer): break
        await writer.drain()
        writer.close()

    def make_msg(self, parser):                   # 构造请求事件字典
        return { "type": "http.request",
                 "body": parser.body,
                 "more_body": False}

    def make_scope(self, parser):                 # 构造scope字典
        headers = [[name.encode('utf-8'), value.encode('utf-8')]
                   for name, value in parser.head_dict.items()]
        return { "type": "http",
                 "method": parser.req_method,
                 "scheme": "http",
                 "raw_path": parser.req_url.encode(),
                 "query_string": parser.query_string,
                 "path": parser.path,
                 "headers": headers}

    def send_msg(self, msg, writer):
        if msg["type"] == "http.response.start":  # 处理响应开始事件
            writer.write(b"HTTP/1.1 %d\r\n" % msg["status"])
            for header in msg["headers"]:
                writer.write(b"%s: %s\r\n" % (header))
            writer.write(b"\r\n")
        if msg["type"] == "http.response.body":  # 处理响应正文事件
            writer.write(msg["body"])
            return msg.get("more_body", False)
        return True

    def start(self):
        print(f"启动服务器,http://{self.host}:{self.port}")
        async def main():
            server = await asyncio.start_server(self.__asgi_server,
                                                self.host, self.port)
            await server.serve_forever()
        asyncio.run(main())

12.3.4 示例

1
# 例12-10
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from urllib.parse import parse_qs
from http import cookies

async def asgi_app(scope, receive, send):
    method = scope['method']                        # 请求方法
    code = ''
    info = ''
    if method == 'POST':
        event = await receive()                     # 接收请求事件
        # 处理请求参数
        query = event['body'].decode('utf-8')
        params = parse_qs(query)
        username = params.get('usertag')
        code = params.get('code', '')
        if username:
            username = username[0]
        if code:
            code = code[0]
        # 从cookie中读取上次提交的code,并将本次提交的code写入cookie
        cookie_str = dict(scope["headers"]).get('cookie', '')
        sc = cookies.SimpleCookie()
        sc.load(cookie_str)
        old_code = sc['code'].value if sc.get('code') else ''
        info = f'{username}提交成功!<br>{code}'
        if old_code:
            info = info+f'<br>上次提交的代码为:<br>{old_code}'

    with open('./index.html') as f:
        body = f.read()
    body = body.replace('<span id="info"/>', info).encode('utf-8')
    await send({                                    # 发送响应开始事件
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"Content-Length", b"%d" % len(body)),
            (b"Content-Type", b"text/html"),
            (b"Set-Cookie", f'code={code}'.encode('utf-8')),
        ],
    })
    await send({                                    # 发送响应正文事件
        "type": "http.response.body",
        "body": body,
    })

if __name__ == "__main__":
    # 1. -- ASGIServer
    from asgi_server import ASGIServer
    server = ASGIServer(asgi_app)
    server.start()

#     # 2. -- Uvicorn
#     import uvicorn
#     uvicorn.run(asgi_app, host="127.0.0.1", port=9000)

12.4 Web 应用框架

12.4.1 Web 框架的基本概念

  • Web 框架
  • Web 框架对 Web 应用程序开发过程中的通用部分进行了抽象和封装,使得 Web 应用程序开发者不必关心底层的实现细节,大大降低了开发的难度和成本
  • Python 中的 Web 框架本质上就是一个 WSGI 应用或 ASGI 应用
  • Web 框架的其他功能
  • 安全问题
  • URL路由
  • Cookie使用
  • 会话管理
  • 模板引擎
  • 数据库访问
  • ... ...

  • 常见 WSGI 框架
  • Flask、Django、Tornado、Bottle等,
  • 常见ASGI 框架
  • FastAPI、Django Channels、Starlette

12.4.2 WSGI 框架

  • 工具(AGSI框架也会用到)
1
# 例12-11
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 文件 framwork_utils.py
from pathlib import Path

def router(path, method, handlers):            # URL路由装饰器
    def decorater(handler):
        handlers[method][path] = handler
        return handler
    return decorater

def load_static(static_path):                  # 加载静态文件
    statci_dict = dict()
    for f_type in ['html', 'js', 'css']:
        for file in Path(static_path).glob(f'*.{f_type}'):
            with open(file) as f:
                statci_dict[file.name] = f.read()
    return statci_dict

page404 = '''<html><head><title>404</title></head><body>
        <center><br><h1>404: page note found</h1></center>
        </body></html>'''
  • 简易WSGI框架
1
# 例12-12
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 文件wsgi_framework.py
from urllib.parse import parse_qs
from http.cookies import SimpleCookie
from framwork_utils import *

class WSGIFramework:
    def __init__(self, static='.'):
        self.handlers = {'GET': dict(), 'POST': dict()}
        self.statics = load_static(static)

    def route(self, path, method):                      # URL路由装饰器
        assert method in ['GET', 'POST'], '方法必须是GET或POST'
        return router(path, method, self.handlers)

    def __call__(self, env, start_response):            # WSGI应用
        method = env['REQUEST_METHOD']                  # 请求方法
        path = env['PATH_INFO']                         # 请求路径
        cookie_str = env['HTTP_COOKIE']                 # Cookies
        query = env['QUERY_STRING']
        req_obj = None
        if method == 'POST':
            size = int(env.get('CONTENT_LENGTH', 0))
            query = env['wsgi.input'].read(size).decode('utf-8')
            req_obj = parse_qs(query)                   # 处理请求参数
            sc = SimpleCookie()
            sc.load(cookie_str)                         # 处理Cookie
            req_obj['cookies'] = {k:m.value for k, m in sc.items()}
        handler = self.handlers[method].get(path, False)# 获取handler
        ck_dict = None
        if not handler:
            status = '404 Not Found'
            content = page404.encode('utf-8')
        else:
            status = '200 OK'
            content = handler(req_obj)                  # 执行handler
            if isinstance(content, tuple):
                content, ck_dict = content
            content = content.encode('utf-8')
        headers = [('Content-Type', 'text/html'),
                   ('Content-Length', str(len(content)))]
        if ck_dict:
            set_ck = ';'.join([f'{k}={v}' for k, v in ck_dict.items()])
            headers.append(('Set-Cookie', set_ck))
        start_response(status, headers)
        return [content]
  • WSGI Web 框架的应用
1
# 例12-13
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from wsgi_framework import WSGIFramework
app = WSGIFramework()

@app.route('/', 'GET')
def index_get(_):
    content = app.statics['index.html']
    return content

@app.route('/', 'POST')
def index_post(request):
    content = app.statics['index.html']
    username = request.get('usertag')
    code = request.get('code')
    old_code = request.get('cookies').get('code', '')
    if username:
        username = username[0]
    if code:
        code = code[0]
    info = f'{username}提交成功!<br>{code}'
    if old_code:
        info = info+f'<br>上次提交的代码为:<br>{old_code}'
    content = content.replace(r'<span id="info"/>', info)
    return content, {'code':code}

if __name__ == '__main__':
    # 1. -- WSGIServer
    from wsgi_server import WSGIServer
    server = WSGIServer(app)
    server.start()

#     # 2. -- waitress
#     import waitress
#     waitress.serve(app, listen='127.0.0.1:9000')

Flask

Flask是一个著名的“微型”Python Web开发框架 - “微型”指的是Flask仅保持一个小而简单但是足够强大、可扩展性良好的核心 - 默认情况下Flask仅包含基本的Web请求响应以及URL路由等核心功能,不过利用丰富的扩展可以非常方便地添加数据库对象关系映射、模板引擎、表单验证等功能,从而打造与项目相适应的Web开发框架 - 安装 - pip install flask - Flask依赖于Werkzeug和 Jinja2 - Werkzeug是一个WSGI的工具包实现了WSGI的底层功能如请求/响应对象对象的构建、URL路由、环境配置等 - Jinja2是一个模板引擎

(1)基本应用
1
2
3
4
5
6
7
8
9
from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_web():
    return "Hello Web"

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=9000, debug=True)
(2)URL路由
  • 两种方式:
  • add_url_rule方法
  • route装饰器
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from flask import Flask
app = Flask(__name__)

def hello_web():
    return '''
    <html>
      <head><title>Flast Test</title></head>
      <body><center><h1>Hello Web</h1><center></body>
    </html>'''

app.add_url_rule('/', view_func=hello_web)

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=9000, debug=True)
  • URL变量处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from flask import Flask
app = Flask(__name__)

@app.route('/<user>')
def hello_web(user):
    return f'''
    <html>
      <head><title>Flast Test</title></head>
      <body><center><h1>Hello {user}</h1><center></body>
    </html>'''


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=9000, debug=True)
  • URL变量转换器
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from flask import Flask
app = Flask(__name__)

@app.route('/<user>/<int:uid>')
def hello_web(user, uid):
    return f'''
    <html>
      <head><title>Flast Test</title></head>
      <body><center><h1>Hello {user}, your id is {uid}</h1><center></body>
    </html>'''


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=9000, debug=True)
  • 常用变量转换器
  • int转换器 <int:param> :接收整数
  • float转换器 <float:param>: 接收浮点数
  • string转换器 <string:param>: 接收string类型(默认转换器)
  • path转换器 <path:param>:和默认的相似,但也接收斜线
  • 自定义转换器(继承werkzeug.routing.BaseConverter,添加到字典app.url_map.converters之中)
(3)request对象
  • request对象的属性
  • method:当前请求方法;
  • Form:包含表单参数及其值的字典;
  • args:查询字符串中的参数;
  • Cookies:Cookie字典;
  • files:文件上传的相关数据。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from flask import Flask, request
app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def hello_web():
    f = open('./templates/index.html')
    html = f.read()
    f.close()
    if request.method == 'GET':
        return html.replace('{{info|safe}}', '')
    else:
        username = request.form.get('usertag')
        code = request.form.get('code')
        return html.replace('{{info|safe}}', f'提交成功<br>用户名:{username}<br>代码:<br>{code}')


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=9000, debug=True)
(4)模板
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from flask import Flask, request, render_template
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/static')


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')
    username = request.form['usertag']
    code = request.form['code']
    print(code)
    return render_template('index.html', info=f'提交成功<br>用户名:{username}<br>代码:<br>{code}')


app.run(debug=True, use_reloader=True, host='127.0.0.1', port=9000)
  • jinja2
  • 控制结构{% %}
  • 变量取值{{ }}
  • 注释{# #}
  • for循环

1
2
3
4
5
<ul>
{% for user in users %}
<li>{{user.username}}</li>
{% endfor %}
</ul>
- 过滤器 - safe:不对字符串进行转义; - capitialize:字符串首字母大写; - lower:字符串小写; - upper:字符串大写; - title:单词首字母大写; - trim:去除字符串两端空格; - striptags:去除字符串中所有HTML; - join:拼接多个值为字符串; - replace:字符串替换; - round:四舍五入; - int:字符串转整型。

(5)静态文件处理
  • 静态文件主要包括图片、视频、javascript、css等,其特点是数量较多、不会发生变化、不需要Python代码介入
1
2
from flask import Flask, request, render_template
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/static')
(6)Cookie与Session
  • Cookie
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from flask import Flask, request, render_template, make_response
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/static')

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('static.html')

    username = request.form['usertag']
    code = request.form['code']
    old_code = request.cookies.get('code')

    content = render_template('static.html',
                           info=f'提交成功<br>用户名:{username}<br>代码:<br>{code}<br>上一次提交的代码:<br>{old_code}'
                           )
    response = make_response(content)
    response.set_cookie("code", code)
    return response


app.run(debug=True, use_reloader=True, host='127.0.0.1', port=9000)
  • Session
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from flask import Flask, request, render_template, session
app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/static')
app.secret_key = 'keykey'

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('static.html')

    username = request.form['usertag']
    password = request.form['password']
    code = request.form['code']
    user = session.get('user')
    print(user)
    if user:
        info = f'提交成功<br>用户名:{user}<br>代码:<br>{code}'
    elif (username == 'admin' and password == '123456'):
        info = f'提交成功<br>用户名:{username}<br>代码:<br>{code}'
        session['user'] = username
    else:
        info = f'用户名和密码不正确!'

    return render_template('static.html', info=info)

@app.route('/logout')
def logout():
    if session.get('user'):
        del session['user']
    return render_template('static.html')

app.run(debug=True, use_reloader=True, host='127.0.0.1', port=9000)
(7) 拦截器
1
2
3
@app.before_request
def intercept():
    print(f"拦截 {request.path}, {request.method}")
(8) 部署
  • Nginx配置
1
2
3
4
5
6
7
8
9
        location / {
            proxy_pass http://127.0.0.1:9000;
        }
        location /static {
            alias /Users/liuchen/Desktop/Python编程及应用/code/web/server-flask/static;
            access_log off;
            expires 10d;
            break;
        }
  • WSGI服务器
  • 端口为9000

  • Ningx命令

  • nginx:启动ningx
  • nginx -s reload:重新加载nginx配置
  • nginx -s stop:关闭nginx

12.4.3 ASGI 框架 *

  • 简易 ASGI Web 框架
1
# 例 12-14
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 文件asgi_framework.py
from urllib.parse import parse_qs
from http.cookies import SimpleCookie
from framwork_utils import *

class ASGIFramework:
    def __init__(self, static='.'):
        self.handlers = {'GET': dict(), 'POST': dict()}
        self.statics = load_static(static)

    def route(self, path, method):                      # URL路由装饰器
        assert method in ['GET', 'POST'], '方法必须是GET或POST'
        return router(path, method, self.handlers)

    async def __call__(self, scope, receive, send):     # ASGI应用
        method = scope['method']                        # 请求方法
        path = scope['raw_path'].decode('utf-8')        # 请求路径
        cookie_str = dict(scope["headers"]).get(b'cookie', '') # Cookies
        req_obj = None
        if method == 'POST':
            event = await receive()                     # 接收请求事件
            query = event['body'].decode('utf-8')
            req_obj = parse_qs(query)                   # 处理请求参数
            sc = SimpleCookie()
            sc.load(cookie_str.decode('utf-8'))         # 处理Cookie
            req_obj['cookies'] = {k: m.value for k, m in sc.items()}
        handler = self.handlers[method].get(path, False)# 获取handler
        ck_dict = None
        if not handler:
            status = 404
            content = page404.encode('utf-8')
        else:
            status = 200
            content = await handler(req_obj)            # 执行handler
            if isinstance(content, tuple):
                content, ck_dict = content
            content = content.encode('utf-8')
        headers = [
            (b"Content-Length", b"%d" % len(content)),
            (b"Content-Type", b"text/html"),
        ]
        if ck_dict:
            set_ck = ';'.join([f'{k}={v}' for k, v in ck_dict.items()])
            headers.append((b'Set-Cookie', set_ck.encode('utf-8')))
        await send({                                    # 发送响应开始事件
            "type": "http.response.start",
            "status": status,
            "headers": headers
        })
        await send({                                    # 发送响应正文事件
            "type": "http.response.body",
            "body": content,
        })
  • ASGI Web 框架的应用
1
# 例12-15
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from asgi_framework import ASGIFramework
app = ASGIFramework()

@app.route('/', 'GET')
async def index_get(_):
    content = app.statics['index.html']
    return content

@app.route('/', 'POST')
async def index_post(request):
    content = app.statics['index.html']
    username = request.get('usertag')
    code = request.get('code')
    old_code = request.get('cookies').get('code', '')
    if username:
        username = username[0]
    if code:
        code = code[0]
    info = f'{username}提交成功!<br>{code}'
    if old_code:
        info = info+f'<br>上次提交的代码为:<br>{old_code}'
    content = content.replace(r'<span id="info"/>', info)
    return content, {'code':code}

if __name__ == '__main__':
    # 1. -- ASGIServer
    from asgi_server import ASGIServer
    server = ASGIServer(app)
    server.start()

#     # 2. -- Uvicorn
#     import uvicorn
#     uvicorn.run(app, host="127.0.0.1", port=9000)

12.5 Web 开发中的设计模式

12.5.1 MVC 模式

MVC是一种软件的开发模式,将软件的业务逻辑、数据与界面显示相分离,从而降低它们之间的耦合性,以便于代码的重用、重构与维护。

  • 模型(Model):负责封装应用的业务逻辑数据及相关操作方法,例如数据库的查询、更新等操作;
  • 视图(View):负责呈现业务逻辑的处理结果;
  • 控制器(Controller):负责控制应用的业务流程,对模型和视图进行组织、整合。

  • MVC的优势
  • 开发效率高
  • 可维护性好
  • 代码复用度高
  • 便于设计
  • 利于实现工具化和自动化

  • Web 开发中的 MVC 模式

  • HTTP 协议的请求/响应访问模式使得模型不能主动向视图发送消息

12.5.2 MVC 模式的变种

  • MTV(Model-Template-View)
  • 模型:也称为数据存取层,负责数据库的访问与数据验证等
  • 模板:也称为表现层,负责页面和数据的显示
  • 视图:也称为业务逻辑层,根据业务逻辑调用模型获取业务逻辑数据,然后将业务逻辑数据与模板结合生成响应信息

  • MVP(Model-View-Presenter)
  • 模型
  • 视图
  • Presenter:MVC 模式中的控制器,不过它主要负责业务逻辑处理

  • MVVM(Model-View-ViewModel)
  • Model
  • View
  • ViewModel
  • 视图与 ViewModel 之间的数据绑定