跳转至

第10章 网络编程与并发处理

10.1 网络套接字的概念

  • 套接字(Socket)
  • 是一种用于网络通信的数据结构,是对 TCP 协议或 UDP 协议的封装,用于 描述计算机网络上的一个通信终端
  • 客户机端套接字和服务器端套接字
  • 套接字源于伯克利版本的 Unix 系统(即 BSD Unix)
  • 最初被用于作为进程间通信 (Inter Process Communication, IPC)的一种方式
  • 也称为伯克利套接字或 BSD 套接字

10.1.1 套接字的类型

  • 基于文件的套接字
  • 用于在同一计算机的不同进程之间通信时基于文件实现
  • 地址簇(Address Family)名称AF_UNIX
  • 基于网络的套接字
  • 用于不同计算机之间通 信
  • 地址簇名称为AF_INET

  • 面向连接的套接字

  • 基于 TCP 协 议实现
  • SOCK_STREAM
  • 面向无连接的套接字
  • 基于 UDP 协议实现
  • SOCK_DGRAM

10.1.2 基于套接字的网络通信过程

  • 面向连接的套接字(TCP)

  • 面向无连接的套接字(UDP)

10.2 套接字编程

10.2.1 socket模块

  • socket函数
  • 用于创建一个套接字对象
  • 参数

    • family:地址簇,取值可为AF_INET(默认)、AF_INET6AF_UNIXAF_CAN, AF_PACKETAF_RDS
    • type:套接字类型,取值可为SOCK_STREAM(默认)、SOCK_DGRAMSOCK_RAW或者 其他名称以SOCK_开头的常量之一
    • proto:协议号,通常取值为 0,当协议簇为AF_CAN时取值为CAN_RAWCAN_BCMCAN_ISOTPCAN_J1939之一;
    • fileno:套接字文件描述符,如果指定了fileno,那么将从中自动检测familytypeproto的值,更重要的是在不同程序中使用相同的套接字文件描述符能够获取同 一个套接字对象
  • 套接字对象常用方法和属性

方法/属性 功能
bind(address) 用于服务器端,将套接字绑定到地址 addressaddress 是由 IP 字符串和端口号组成的元组。
listen(backlog) 用于服务器端,启动监听准备接受连接请求。backlog 用于指 定等待建立连接的最大客户端请求数量,未指定时根据系统状 态自动设定合理取值。
accept() 用于服务器端,接受一个连接请求,返回一个用于向客户端发 送数据的套接字对象以及客户端地址。
connect(address) 用于客户端,向服务器端发起连接请求,当使用 AF_INET 地 址簇时,address 是服务器端 IP 字符串和端口号组成的元组。
recv(bufsize) 从 TCP 套接字接中收数据,返回值一个字节串,bufsize 用 于指定一次接收的最大数据量。
recv_into(buffer, nbytes) 从 TCP 套接字接收至多 nbytes 个字节,将其写入缓冲区而 不是创建新的字节串,nbytes 默认为 0,表示接受的最大数据 量取决于缓冲区大小。
send(bytes) 通过 TCP 套接字发送数据,bytes 为字节串。
sendall(bytes) 持续发送 TCP 数据 bytes,直到发送完毕或发生错误,成功 返回 None
recv_from(bufsize) 从 UDP 套接字接中收数据,返回值字节串和发送端地址, bufsize 用于指定一次接收的最大数据量。
recv_from_into 从 UDP 套接字接收至多 nbytes 个字节,将其写入缓冲区而 不是创建新的字节串,nbytes 默认为 0,表示接受的最大数据 量取决于缓冲区大小,返回值字节串和发送端地址。
sendto(bytes, address) 通过 UDP 套接字发送数据 bytesaddress
getpeername() 返回 TCP 套接字连接到的远程地址。
getsockname() 返回当前套接字的地址。
shutdown(how) 半关闭或全关闭 TCP 连接,how 取值为 SHUT_RDSHUT_WRSHUT_RDWR,表示关闭后不再允许接收、发送或者都不允许。
close() 关闭套接字。
setsockopt(level,opt,value) 设置套接字选项取值。

10.2.2 面向连接的套接字编程

  • TCP 服务器端
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from socket import socket
from datetime import datetime

server_socket = socket()                              # 创建套接字对象
server_socket.bind(('127.0.0.1', 9000))               # 绑定地址
server_socket.listen()                                # 监听
print('TCP服务器启动,监听之中... ...')

while True:                                           # 服务循环
    conn, client_addr = server_socket.accept()        # 接受连接请求
    print(f'客户端{client_addr}连接成功,等待输入 ...')

    while True:                                       # 通信循环
        data_recv = conn.recv(1024).decode('utf-8')   # 接收数据

        now = datetime.now().strftime("%H:%M:%S")
        print(f'{now} 接收到数据:{data_recv}')
        send_data = f'接收到长度为 {len(data_recv)} 的数据'
        conn.send(send_data.encode('utf-8'))          # 发送数据

        if not data_recv:
            conn.close()                              # 关闭连接
            break
    print(f'客户端{client_addr}连接结束!')
  • TCP 客户端
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from socket import socket
from datetime import datetime

tcp_socket = socket()                                 # 创建套接字
tcp_socket.connect(('127.0.0.1', 9000))               # 发起连接请求
while True:
    data_send = input('> ')
    tcp_socket.send(data_send.encode('utf-8'))        # 发送数据
    if not data_send:
        break
    data_recv = tcp_socket.recv(1024)                 # 接收数据
    now = datetime.now().strftime("%H:%M:%S")
    print(f'{now} 服务器回复:{data_recv.decode("utf-8")}')
tcp_socket.close()
  • 注意:需要将上述服务器端和客户端保存为脚本,在两个终端分别运行。

10.2.3 面向无连接套接字编程

  • UDP 服务器端
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from socket import socket 
from datetime import datetime

server_socket = socket()                                 # 创建套接字
server_socket.bind(('127.0.0.1', 9000))                  # 绑定地址
print('UDP服务器启动,等待客户端数据 ...')

while True:
    data_recv, address = server_socket.recvfrom(1024)    # 接收数据
    now = datetime.now().strftime("%H:%M:%S")
    data_recv = data_recv.decode("utf-8")
    print(f'{now}接收到来自{address[0]}的数据:{data_recv}')
    data_send = f'接收到长度为 {len(data_recv)} 的数据'.encode('utf-8')
    server_socket.sendto(data_send, address)             # 发送数据
server_socket.close()
  • UDP客户端
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from socket import socket
import datetime

client_socket = socket()                                 # 创建套接字
while True:
    data_send = input('> ').encode("utf-8")
    if not data_send:
        break
    client_socket.sendto(data_send, ('127.0.0.1', 9000)) # 发送数据
    data_recv, addr = client_socket.recvfrom(1024)       # 接收数据
    now = datetime.datetime.now().strftime("%H:%M:%S")
    data_recv = data_recv.decode(encoding="utf-8")
    print(f'{now} 服务器回复:{data_recv}')
client_socket.close()
  • 在终端中运行

10.2.4 并发问题

  • 并发
  • 在一个很短的时间段内多个客户端同时对服务器发起连接请求
  • 对于基于面向连接的应用来说,并发带来的问题尤其严重
  • 并发处理
  • 多进程编程
  • 多线程编程
  • 异步编程
  • 多路复用

10.3 多进程编程

10.3.1 进程的创建与运行

  • 多进程编程相关的模块
  • multiprocessing
  • subprocess
  • multiprocessing.Process
  • 实现多进程编程
  • 初始化参数

    • group: 取值总是为None,其存在只是为了与多线程编程的 API 相兼容;
    • target: 进程的调用目标,是一个可调用对象,表示该进程要执行的任务;
    • name: 进程的名称;
    • args: 传递给调用目标的位置参数元组;
    • kwargs: 传递给调用目标的关键字参数字典;
    • daemon: 新的进程是否守护进程,取值为TrueFalse。守护进程是一直在后台运行不受终端控制的特殊子进程,主要用于为其他子进程提供服务。父进程在退出时会终止所有守护子进程。
  • Process 类的常用方法和属性

方法/属性 作用
run() 进程所执行的方法,该方法会调用 target 属性或者在子类中被重写
start() 启动进程
join(timeout) 由父进程调用以阻塞自己直到子进程结束,timeout 默认取值为 None
is_alive() 进程是否仍处理活动状态
terminate() 终止进程
daemon 是否是守护进程,如果要设置该属性的值,必须在 start 方法调用之前
pid 进程号
  • Process类的使用方法有两种
  • 创建Process类对象时指定target参数
  • 继承Process

  • 通过指定target创建子进程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import os
import time
from multiprocessing import Process

def target_fun(name_):
    print(f'这里是子进程{name_}')
    time.sleep(20)

if __name__ == '__main__':
    subp_A = Process(target=target_fun, args='A')
    subp_B = Process(target=target_fun, args='B')
    subp_A.start()
    subp_B.start()
    print(f'这里是父进程,进程ID为 {os.getpid()}')
    print(f'子进程A的ID为 {subp_A.pid}')
    print(f'子进程B的ID为 {subp_B.pid}')
    subp_A.join()
    subp_B.join()
  • 运行结果

    1
    2
    3
    4
    5
    这里是父进程,进程ID为 2042
    子进程A的ID为 2043
    子进程B的ID为 2044
    这里是子进程B
    这里是子进程A
    

  • 进程树

  • 通过继承Process类创建子进程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import time
from multiprocessing import Process

class SubProcess(Process):
    def __init__(self, name_):
        self.__name = name_
        super().__init__()

    def run(self):
        print(f'这里是子进程{self.__name}')
        time.sleep(20)

if __name__ == '__main__':
    subp_A = SubProcess('A')
    subp_B = SubProcess('B')
    subp_A.start()
    subp_B.start()
    print(f'这里是父进程,进程ID为 {os.getpid()}')
    print(f'子进程A的ID为 {subp_A.pid}')
    print(f'子进程B的ID为 {subp_B.pid}')
    subp_A.join()
    subp_B.join()

10.3.2 利用多进程处理网络并发

  • 每当服务器端接受一个客户端发起的连接请求时,就新建一个进程来为客户端提供服务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from socket import socket
from datetime import datetime
from multiprocessing import Process

class ServiceProcess(Process):
    def __init__(self, socket, addr):
        self.conn = socket
        self.addr = addr
        super().__init__()

    def run(self):
        print(f'客户端{self.addr}连接成功,等待输入 ...')
        while True:                                       # 通信循环
            data_recv = self.conn.recv(1024)              # 接收数据
            data_recv = data_recv.decode('utf-8')
            now = datetime.now().strftime("%H:%M:%S")
            print(f'{now} 接收到来自{self.addr}的数据:{data_recv}')
            send_data = f'接收到长度为 {len(data_recv)} 的数据'
            self.conn.send(send_data.encode('utf-8'))     # 发送数据
            if not data_recv:
                self.conn.close()
                break
        print(f'客户端{self.addr}连接结束!')

def main():
    server_socket = socket()                              # 创建套接字对象
    server_socket.bind(('127.0.0.1', 9000))               # 绑定地址
    server_socket.listen()                                # 监听
    print('TCP服务器启动,监听之中... ...')
    while True:
        conn_socket, addr = server_socket.accept()        # 接受连接请求
        subp = ServiceProcess(conn_socket, addr)          # 创建子进程
        subp.start()                                      # 启动子进程

if __name__ == '__main__':
    main()

10.3.3 利用进程池处理网络并发

  • 进程池
  • 进程池是一种维护进程的数据类型
    • 它在启动的时候创建一定量的空闲进程,每当需要的时候就取出一个进程来执行任务
    • 任务结束之后进程并不会退出,而是放回进程池中重新进入空闲状态等待下次调用
  • 避免频繁创建和销毁进程带来的开销,能够大大降低系统的压力
  • 进程池的实现
  • Python中进程池由multiprocessingpool子模块中的Pool类实现
  • 为了与线程池的API相统一,concurrent.futures中的ProcessPoolExecutor类对Pool进行了包装

  • 利用进程池实现并发处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from socket import socket
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor

def service_task(conn, addr):
    print(f'客户端{addr}连接成功,等待输入 ...')
    while True:                                           # 通信循环
        data_recv = conn.recv(1024).decode('utf-8')       # 接收数据
        now = datetime.now().strftime("%H:%M:%S")
        print(f'{now} 接收到来自{addr}的数据:{data_recv}')
        send_data = f'接收到长度为 {len(data_recv)} 的数据'
        conn.send(send_data.encode('utf-8'))              # 发送数据
        if not data_recv:
            conn.close()
            break
    print(f'客户端{addr}连接结束!')

def main():
    server_socket = socket()                              # 创建套接字对象
    server_socket.bind(('127.0.0.1', 9000))               # 绑定地址
    server_socket.listen()
    print('TCP服务器启动,监听之中... ...')
    pool = ProcessPoolExecutor(10)                        # 进程池
    while True:
        conn, addr = server_socket.accept()               # 接受连接请求
        pool.submit(service_task, conn, addr)

if __name__ == '__main__':
    main()

10.4 多线程编程

10.4.1 线程的概念与特点

  • 线程
  • 又称为轻量级进程,运行开销远小于进程,是并发编程的一种有效方法
  • 线程是为了进一步提高程序并发处理能力而在进程中引入的一种执行单元
  • 多线程编程
  • 当进程中的线程数量多于一个时,就称为多线程编程
  • 线程的优势
  • 消耗的 CPU 和内存资源更少,隶属于同一进程的线程共享着进程的绝大多数资源
  • 线程的切换开销也远小于进程切换
  • 能够利用多个或多核 CPU(Python中由于全局锁的存在而不具备这样的能力)
  • 实现
  • threading模块
  • Thread
  • Python 统一了多线程编程和多进程编程的 API,因此Thread类和Process类使用方法基本相同

10.4.2 网络并发处理的多线程方法

  • 多线程
  • 将多进程并发处理中的Process类直接替换为Thread类即可
  • 线程池
  • 将进程池并发正理中的ProcessPoolExecutor类直接替换为ThreadPoolExecutor类即可

10.5 异步编程 *

  • 异步编程是一种新型的并发处理技术,能够在单线程中以极高的效率处理输入/输出 密集型并发任务

10.5.1 异步编程概念

  • 计算机程序的类型
  • 计算密集型
  • 输入/输出密集型
    • 网络程序就是一种典型的输入/输出密集型任务,套接字对象的acceptsendrecv等方法都会导致服务器端程序阻塞
  • 同步运行
  • 程序要执行的多项操作按时间顺序进行
    • 当遇到输入/输出时即阻塞等待,直到输入/输出结束后继续执行
  • 异步运行
  • 程序在运行过程中遇到输入/输出时不发生阻塞,而是跳转到能够继续执行的其他位置继续运行
  • 等输入/输出结束之后,在合适的时机会跳转回来继续执行

  • 程序的同步运行与异步运行


  • 异步编程的优势
  • 仅利用一个进程或线程就能够实现输入/输出密集型任务的并发处理
  • 业务之间发生切换时,没有导致进程或线程的切换,节省了大量的计算机资源开销,从而能够以极高的效率处理并发问题
  • 程序运行于同一线程或进程之中,不需要使用复杂的锁来实现同步

10.5.2 基于生成器的协程

  • 生成器的一个重要的特征是惰性计算,yield 语句返回一个值之后函数的运行被暂停,这就为程序的异步运行提供了可能性

  • 与普通生成器相比,生成器协程的特点有

  • yield 能够在表达式中使用,即可用于发送数据也能用于接收数据
  • 主调函数可以利用send方法向生成器发送数据
  • 可使用close方法和throw方法终止生成器或向生成器抛出异常

  • 利用yield语句接收数据

1
2
3
4
def test_coroutine():
    while True:
        x = yield
        print(f"传入的值为{x}")
1
2
3
4
tc1 = test_coroutine()
tc2 = test_coroutine()
next(tc1)    # 预激活协程tc1
next(tc2)    # 预激活协程tc2
1
tc1.send(0)
1
传入的值为0
1
tc2.send(1)
1
传入的值为1
1
2
tc1.close()
tc2.close()
  • 利用 yield 同时接收并返回数据
1
2
3
4
5
6
7
8
9
def cumulative_average():
    n = 0
    total = 0
    avg = 0
    while True:
        x = yield avg
        total += x
        n += 1
        avg = total/n
1
ca = cumulative_average()
1
next(ca)
1
0
1
ca.send(5)
1
5.0
1
ca.send(15)
1
10.0
1
ca.send(10)
1
10.0
1
ca.close()
  • 生成器协程的状态
  • GEN_CREATED: 生成器刚刚创建出来,等待激活执行
  • GEN_RUNNING: 生成器正在执行中
  • GEN_SUSPENDED: 生成器执行至 yield 语句处暂停
  • GEN_CLOSED: 生成器被终止。
1
2
3
from inspect import getgeneratorstate 
tc = test_coroutine()
getgeneratorstate(tc)
1
'GEN_CREATED'
1
2
next(tc)
getgeneratorstate(tc)
1
'GEN_SUSPENDED'
1
2
tc.close()
getgeneratorstate(tc)
1
'GEN_CLOSED'

10.5.3 协程

  • 生成器协程的使用比较复杂,并且其功能超出了生成器的范畴
  • Python 3.4 加入了asyncio模块,定义了corountine装饰器用于将函数变为协程,才终于使协程与生成器相分离,成为不同的数据类型
  • Python 3.5 中引入了asyncawait两个关键字,替代corountine装饰器和yield from关键字,简化了异步编程

  • 协程函数

  • 使用async关键字定义的函数称为协程函数
  • 调用协程函数返回一个协程对象
  • await
  • 用于使协程交出运行的控制权,只能在协程函数内部使用
  • 在交出控制权的同时等待输入/输出操作结束,并获取返回结果
  • 链式协程
  • 在一个协程中用await语句调用其他的 协程就形成了链式协程

  • 协程的定义和运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio

async def coroutine(name):
    print(f'协程 {name} 开始运行...')
    await asyncio.sleep(1)                  # 模拟输入/输出  
    print(f'协程 {name} 运行结束!')
    return f'协程{name}运行结果'

async def main():
    c_A = coroutine('A')                    # 协程A
    c_B = coroutine('B')                    # 协程B
    c_C = coroutine('C')                    # 协程C
    r = await asyncio.gather(c_A, c_B, c_C) # 创建并发任务
    return r

r = asyncio.run(main())
print(r)
输出结果:
1
2
3
4
5
6
7
协程 A 开始运行...
协程 B 开始运行...
协程 C 开始运行...
协程 A 运行结束!
协程 B 运行结束!
协程 C 运行结束!
['协程A运行结果', '协程B运行结果', '协程C运行结果']

10.5.4 Python 异步编程基础

  • 异步编程的关键概念和数据类型
  • 事件循环
    • 是异步程序运行的核心,负责不断收集各种异步编程事件(输入/输出开始和完成),并在一个事件发生之后分配执行权限
    • 使用asyncio.get_event_loop获取当前的事件循环对象
  • Future
    • 表示未完成的异步任务对象
    • 待执行或中断执行的协程对象会被包装成为Future对象
    • 事件循环通过Future对象来实现对协程异步调度
    • Future对象也可 以用于获取异步任务的执行结果
  • Task

    • Future的子类,协程对象通常会被包装为Task然后放入事件循环
    • 使用asyncio.create_task函数创建Task对象
  • 异步编程的步骤

  • 定义协程函数
    • 使用asyncawait关键字
  • 创建协程对象并包装为并发任务
    • 使用loop.create_task方法用协程对象创建任务(Task)并在事件循环中注册
    • 或者利用asyncio.gether函数将多个协程对象包装为并发任务
  • 获取事件循环对象
    • 使用asyncio.get_event_loop函数
  • 运行事件循环
    • 使用事件循环对象的run_until_complete方法
  • 关闭事件循环

10.5.5 利用异步编程处理网络并发

  • socket模块并不支持异步编程
  • 网络套接字对象的输入/输出操作(acceptsendrecv等方法)不能直接用于await语句
  • asyncio模块对socket进行了包装,提供了低级和高级两种类型的网络套接字操作方式

低级异步网络套接字编程

  • 通过事件循环对象的方法对已有的套接字对象进行操作,返回一个可 等待对象从而实现对异步编程的支持
  • 已有的套接字对象必须设置为非 阻塞模式
  • 事件循环对象常用的套接字操作方法
  • sock_connect(sock, address):套接字对象connect方法的异步版本
  • sock_accept(sock):套接字对象accept方法的异步版本
  • sock_recv(sock, nbytes):套接字对象recv方法的异步版本
  • sock_recv_into(sock, buf):套接字对象recv_into方法的异步版本
  • sock_sendall(sock, data):套接字对象sendall方法的异步版本
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from socket import socket
from datetime import datetime
import asyncio

async def service_task(conn, addr, loop):
    print(f'客户端{addr}连接成功,等待输入 ...')
    while True:                                           # 通信循环
        data_recv = (await loop.sock_recv(conn, 1024))    # 异步接收数据
        data_recv = data_recv.decode('utf-8')
        now = datetime.now().strftime("%H:%M:%S")
        print(f'{now} 接收到来自{addr}的数据:{data_recv}')
        send_data = f'接收到长度为 {len(data_recv)} 的数据'
        send_data = send_data.encode('utf-8')
        await loop.sock_sendall(conn, send_data)          # 异步发送数据
        if not data_recv:
            conn.close()
            break
    print(f'客户端{addr}连接结束!')

async def server_routine(server_socket, loop):
    while True:
        conn, addr = await loop.sock_accept(server_socket)# 异步接受连接
        loop.create_task(service_task(conn, addr, loop))

def main():
    server_socket = socket()                              # 创建套接字
    server_socket.bind(('127.0.0.1', 9000))               # 绑定地址
    server_socket.listen()
    print('TCP服务器启动,监听之中... ...')
    server_socket.setblocking(False)  # 设置为非阻塞套接字
    loop = asyncio.get_event_loop()
    routine = server_routine(server_socket, loop)
    loop.run_until_complete(routine)
    loop.close()

if __name__ == '__main__':
    main()

高级异步网络套接字编程

  • 低级套接字操作方式
  • 需要手动将客户端请求创建为任务并在事件循环中注册
  • 处理可能发生的各种异常、缓冲区管理等一系复杂的底层操作
  • 容易出错且运行效率低
  • 事件循环对象中允许直接创建服务器对象(asyncio.Server)
  • 是一个能够异步处理客户端请求的可等待对象
  • 接收到客户端连接请求之后,将读写操作包 装为具有异步通信功能的输入流(asyncio.StreamReader)对象和输出流(asyncio. StreamWriter)对象
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
from asyncio import start_server
from datetime import datetime

async def service_task(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f'客户端{addr}连接成功,等待输入 ...')
    while True:
        data_recv = await reader.read(1024)    # 异步接收数据
        data_recv = data_recv.decode('utf-8')
        now = datetime.now().strftime("%H:%M:%S")
        print(f'{now} 接收到来自{addr}的数据:{data_recv}')
        send_data = f'接收到长度为 {len(data_recv)} 的数据'
        writer.write(send_data.encode('utf-8'))
        await writer.drain()                   # 等待缓冲区刷新
        if data_recv == '':
            break
    writer.close()                             # 关闭输出流
    print(f'客户端{addr}连接结束!')

async def main():
    server = await start_server(service_task,  # 创建服务器对象
                                '127.0.0.1', 9000)
    print(f'TCP服务器启动,监听之中... ...')
    async with server:
        await server.serve_forever()           # 运行服务器

if __name__ == '__main__':
    asyncio.run(main())

10.6 套接字服务器

  • Python 标准库中的socketserver模块实现了一系列套接字服务器类,对多种套接 字的通信过程进行了封装,能够非常容易地实现套接字服务器端

10.6.1 socketserver模块简介

  • socketserver模块中的套接字服务器类

  • 套接字服务器类的实例化需要两个参数
  • server_address:服务器套接字的绑定地址
  • RequestHandlerClass:客户端请求处理器类
  • 套接字服务器常用的方法和属性
  • serve_forever方法:启动套接字服务器
  • shutdown方法:关闭套接字服务器
  • address_family属性:套接字的协议地址簇
  • server_address属性:套接字绑定地址
  • socket属性:套接字对象
  • allow_reuse_address属性:套接字对象是否允许端口重用,默认为False
  • socket_type属性:套接字类型(SOCK_STREAMSOCK_DGRAM)

  • 请求处理器

  • StreamRequestHandler:处理 TCP 请求
  • DatagramRequestHandler:处理 UDP 请求
  • 请求处理器的核心方法为handle
  • 实际应用中需继承请求处理器类,重写handle方法
  • handle方法中可以通过 如下属性获取服务器和客户端的信息
    • self.request:客户端请求的通信套接字对象(在 TCP 和 UDP 中有所差异)
    • self.client_address:客户端地址
    • self.server:套接字服务器对象

10.6.2 利用套接字服务器处理网络并发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from socketserver import ThreadingTCPServer, StreamRequestHandler
from datetime import datetime

class TCPHandler(StreamRequestHandler):                     # 请求处理器
    def handle(self):
        print(f'客户端{self.client_address}连接成功,等待输入 ...')
        while True:                                         # 通信循环
            data_recv = self.request.recv(1024)             # 接收数据
            data_recv = data_recv.decode('utf-8')
            now = datetime.now().strftime("%H:%M:%S")
            print(f'{now} 接收到来自{self.client_address}的数据:{data_recv}')
            send_data = f'接收到长度为 {len(data_recv)} 的数据'
            self.request.send(send_data.encode('utf-8'))    # 发送数据
            if not data_recv:
                break
        print(f'客户端{self.client_address}连接结束!')

if __name__ == '__main__':
    with ThreadingTCPServer(('127.0.0.1', 9000), TCPHandler) as server:
        print('TCP服务器启动,监听之中... ...')
        server.serve_forever()