Python Publisher and Observer Pattern Example

Posted by 韩同学的笔记本 on March 20, 2020

Python 课的作业,大致是实现一个基于 socket 的“观察-发布者模型”。主要目的是练习 Python 的套接字编程。

选做题2 实现设计模式中的观察者模式的网络通信版本

关于观察者模式说明:

本题要是考察网络编程知识,所以跟上周作业相比简化了publish和reader的功能,大家仔细看题干

观察者模式也被称为发布-订阅(Publish/Subscribe)模式。当这个主题对象状态变化时,会通知所有观察者对象并作出相应处理逻辑。

在本问题中,可以形象地理解成,Publisher为疫情消息发布中心,Reader为订阅疫情消息的人。只需要实现订阅功能,Publisher可以发布每日新增死亡、出院这两种消息,需要推送给所有订阅者。

Publisher(发布者)

  • 根据订阅者发送过来的订阅信息维护订阅者列表(注意去重)
  • 按照日期顺序发布每日新增两类病人消息(死亡、出院)给所有已订阅的读者

Reader(订阅者)

  • 订阅发布者
  • 接受发布者发来的消息,每接受到一条消息,输出自订阅起接收到的两类病人的总数。输出格式为:自订阅以来,死亡患者共x人,出院患者共y人。

关于网络通信说明:

在本地利用不同端口模拟通信,通信的消息格式可以自己设置,只要达到目的即可。按照模版完成代码测试没问题后将代码补充到模版相应位置,最后只需要提交本份作业文件即可

利用 C/S架构 实现网络通信,在本问题中发布者和订阅者互为客户端和服务端。具体而言:

Publisher(发布者) - 订阅功能时作为服务端,等待接收订阅者订阅请求,将订阅者信息保存在本地文档中,并返回订阅成功的确认信息 - 发送消息时作为客户端,从本地文档读取订阅者,根据订阅者信息发送每日消息(假设读取订阅者信息的瞬间不会发生新的订阅请求)。

Reader(订阅者) - 订阅功能时作为客户端,将自身接受消息服务的ip+port信息发送给发布者 - 发布消息时作为服务端,接受到发布者的信息

完成代码后请进行下述模拟流程进行测试:

  • 发布者:
    • 实例化一个发布者P1对象开启订阅服务,p1始终保持运行
    • 实例化一个发布者P2,读取订阅者消息,作为客户端,向各订阅者发送消息(注意订阅者是在动态变化的)
  • 订阅者:
    • 实例化订阅者R1作为发布订阅请求,在收到订阅成功的消息后后立即开启接受信息服务,监听来自发布者的消息
    • 在发布者P2发布若干消息后,实例化订阅者R1作为发布订阅请求,在收到订阅成功的消息后后立即开启接受信息服务,监听来自发布者的消息

让我们来看一下 DEMO:

DEMO

以下是我的实现。

实现

Publisher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import socket
import time
from threading import Thread
import pickle
import os

STARTPORT = 16666

# 发布者
class Publisher(object):
    PORT = STARTPORT

    def __init__(self, file_path, host, *args,**kwargs):
        port = Publisher.PORT
        Publisher.PORT += 1
        self.file_path = file_path
        if os.path.exists(self.file_path):
            with open(self.file_path, 'rb') as f:
                self.clients = pickle.load(f)
            #self.clients = set()
        else:
            self.clients = set()
            with open(self.file_path, 'wb') as f:
                pickle.dump(self.clients, f)
        print('初始化服务器主机信息')
        self.address = (host, port)
        print("初始化服务器主机套接字对象......")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        print('绑定的主机信息......')
        self.socket.bind(self.address)
        print(self.address)
        self.socket.listen()
        self.tot = [0,0]
        pass

    def subscribe_server(self, conn, addr, *args,**kwargs):
        op, send_to = pickle.loads(conn.recv(128))
        if op == 'subscribe':
            self.clients.add(send_to)
            print(send_to, 'subscribed!')
        elif op == 'unsubscribe':
            self.clients.remove(send_to)
            print(send_to, 'unsubscribed!')
        with open(self.file_path, 'wb') as f:
            pickle.dump(self.clients, f)
        conn.close()

    def notify(self, *args,**kwargs):
        # Publisher向订阅者发布消息
        print('start notifying ...')
        print(self.clients)
        for addr in self.clients:
            s = socket.socket()
            s.connect(addr)
            s.send(pickle.dumps((self.tot)))
            print('notified', addr)
            s.close()
        print('end notifying.')
    
    def publisher_wait(self):
        while True:
            conn, addr = self.socket.accept()
            self.subscribe_server(conn, addr)
        
    def add_number(self, typ, num):
        self.tot[typ] += num


if __name__ == '__main__':
    publisher = Publisher('server.dat', '127.0.0.1')

    p = Thread(target=publisher.publisher_wait)
    p.start()
    while True:
        lst = input('(a)dd <type> <number>/ (n)otify / (q)uit: ').split()
        if len(lst) == 0:
            continue
        if lst[0] == 'a':
            typ = int(lst[1])
            num = int(lst[2])
            publisher.add_number(typ, num)
        elif lst[0] == 'n':
            publisher.notify()
        elif lst[0] == 'q':
            break
    publisher.socket.close()
    p.join(timeout=5)
    print('QUIT')

    #publisher.publisher_wait()
    #publisher.add_number(1,12)
    #publisher.notify()
    #publisher.socket.close()

Reader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import socket
import time
from threading import Thread
import pickle
import os
import time

STARTPORT = 17777

# 读者基类
class Reader(object):
    PORT = STARTPORT

    def __init__(self, host, *args,**kwargs):
        # Reader 的初始化方法
        port = Reader.PORT
        Reader.PORT += 1
        self.addr = (host,port)
        print("初始化服务器主机套接字对象......")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        print('绑定的主机信息......')
        print(self.addr)
        self.socket.bind(self.addr)
        self.socket.listen()
        pass
        
    def subscribePublisher(self,host,port):
        s = socket.socket()
        addr = (host, port)
        print('connecting to', addr)
        s.connect(addr)
        s.send(pickle.dumps(('subscribe', self.addr)))
        s.close()
        print('end subscribePublisher')

    def unsubscribePublisher(self, host, port):
        addr = (host, port)
        s = socket.socket()
        s.connect(addr)
        s.send(pickle.dumps(('unsubscribe', self.addr)))
        s.close()

    def receive_server(self, conn, addr):
        tot = pickle.loads(conn.recv(128))
        conn.close()
        print("自订阅以来,死亡患者共{sw}人,出院患者共{cy}人。".format(sw=tot[0], cy=tot[1]))
        pass
    
    def reader_wait(self):
        #print('waiting')
        while True:
            conn, addr = self.socket.accept()
            print('got information')
            self.receive_server(conn, addr)

if __name__ == '__main__':
    reader = Reader('127.0.0.1')
    p = Thread(target=reader.reader_wait)
    p.start()
    while True:
        lst = input('(s)ubscribe <host> <port> / (u)nsubscribe <host> <port> / (q)uit: ').split()
        if len(lst) == 0:
            continue
        if lst[0] == 's':
            print('subscribing...')
            host = lst[1]
            port = int(lst[2])
            reader.subscribePublisher(host, port)
        elif lst[0] == 'u':
            host = lst[1]
            port = int(lst[2])
            reader.unsubscribePublisher(host, port)
        elif lst[0] == 'q':
            break
    reader.socket.close()
    p.join(timeout=5)
    print('QUIT')

    #reader.subscribePublisher('127.0.0.1', 16666)
    #reader.reader_wait()
    #reader.socket.close()