消息队列
有两种类型的消息队列 - System V 和 POSIX。它们之间有很多相似之处,也有细微的差别。本文重点介绍 System V,因为它是受更广泛支持的类型。
简单来说,消息队列是消息的链接列表。操作系统可以维护多个已发送消息的列表,每个列表都由唯一的整数标识符引用。消息通过附加到列表来发送,并通过从列表头部弹出来接收。该列表由操作系统内核管理并存储在内存中。列表的内存存储允许异步通信。这意味着通信进程不需要同时与同一队列交互。
在创建或访问消息队列之前,需要确定性地生成唯一密钥。它需要是唯一的,以避免创建队列时出现错误。所有应用程序进程必须使用相同的密钥才能通过同一队列进行通信。生成密钥的推荐方法是调用该ftok
函数。该函数接受文件路径和整数。文件路径必须是现有文件,否则将返回错误。推荐的文件路径可以是应用程序配置文件。ftok
只要文件未被删除并重新创建,该函数将始终返回相同的结果。碰撞可能会发生,但发生的可能性很小。
访问或创建消息队列是使用该msgget
函数完成的。该函数接受一个键和一个标志参数。创建消息队列是通过在标志中指定IPC_CREAT来完成的。消息队列权限在flag参数中定义。该权限与文件权限的格式相同。例如,创建一个消息队列,仅授予有效用户写入权限,而向其他用户授予读取权限,则通过执行msgget(key, IPC_CREAT | 0644)
. 这将创建一个消息队列,其中用户 ID 未设置为队列所有者 ID 的进程可以接收消息但无法发送消息。该msgget
函数返回消息队列标识符,在从队列发送和接收消息时使用该标识符。
您可以通过运行来查看操作系统中的所有消息队列ipcs -q
。
msgsnd
从根本上讲,消息是使用和msgrcv
函数以字节形式发送和接收的。该msgsnd
函数接受函数返回的消息队列 ID msgget
、指向消息结构的指针、消息的大小和整数标志。消息结构需要包含整数消息类型。该msgsnd
函数将消息追加到由第一个参数中指定的 id 标识的队列中。
消息队列有操作系统指定的最大大小,可由用户配置。这意味着msgsnd
如果队列已满,将会阻塞。可以通过IPC_NOWAIT
在标志参数中指定来防止阻塞,这将返回错误。
接收消息是使用该msgrcv
函数完成的。这些函数参数和参数的区别msgsnd
在于消息类型。此类型确定进程是否要读取任何消息 (0)、特定消息类型(正整数)或特定消息组(负整数)。将类型设置为负整数将读取类型小于或等于指定类型的绝对值的任何消息。此函数从队列中删除消息并将其复制到提供的消息缓冲区参数。
msgrcv
如果队列为空/没有指定类型的消息,该函数将默认阻塞。可以通过IPC_NOWAIT
在标志参数中指定来防止这种情况,这会使函数返回错误。在 Linux 中,如果没有该类型的消息,则可以读取队列中的第一条消息(如果MSG_EXCEPT
在标志参数中指定)。
消息队列可以通过该函数删除和配置msgctl
。该函数还允许读取消息队列元数据。
显示代码
此示例将演示使用 Python 中的消息队列进行通信的两个进程。不幸的是,Python 不提供开箱即用的消息队列支持。相反,我使用了优秀的sysv-ipc库。您可以在这里找到 pip 包。
这是客户端代码:
import os
import sysv_ipc
ROUNDS = 100
def run():
path = '/tmp/example'
fd = os.open(path, flags=os.O_CREAT)
os.close(fd)
key = sysv_ipc.ftok(path, 42)
mq = sysv_ipc.MessageQueue(key, flags=sysv_ipc.IPC_CREAT, mode=0o644)
msg_type = 10
i = 0
while i != ROUNDS:
mq.send(b"ping", type=msg_type)
print("Client: Sent ping")
data, _ = mq.receive(type=(msg_type+1))
data = data.decode()
print(f"Client: Received {data}")
i += 1
mq.send(b"end", type=msg_type)
run()
(可选)创建临时文件以确保其存在。使用文件路径和整数调用 ftok 函数。(可选)创建并访问消息队列。这将返回一个消息队列对象。运行一个循环,其中发送包含字节字符串且其类型设置为msg_type 的消息。然后接收到不同类型的消息。必须指定不同的消息类型以防止进程接收其发送的消息。接收到的消息被解码并打印,然后循环继续。当发送ROUNDS消息时,循环结束。之后,发送结束消息以表示客户端已完成。
发送和接收的消息是字节字符串,而不是常规字符串,因此必须进行相应的编码和解码。
这是服务器代码
import os
import sysv_ipc
def run():
path = '/tmp/example'
fd = os.open(path, flags=os.O_CREAT) # create file
os.close(fd)
key = sysv_ipc.ftok(path, 42)
mq = sysv_ipc.MessageQueue(key, flags=sysv_ipc.IPC_CREAT, mode=0o644)
msg_type = 10
data, _ = mq.receive(type=msg_type)
data = data.decode()
while data != 'end':
print(f"Server: Received {data}")
mq.send(b"pong", type=(msg_type+1))
print(f"Server: Sent pong")
data, _ = mq.receive(type=msg_type)
data = data.decode()
os.unlink(path)
mq.remove()
run()
服务器代码与客户端代码类似地设置消息队列。由于ftok
调用时使用的参数与客户端代码中的参数完全相同,因此生成的键值将相同。从消息队列接收到msg_type类型的消息。运行一个循环,检查接收到的消息是否不等于end。在此循环内,打印接收到的消息,并将不同类型的消息发送到队列。然后消息被接收并解码。
一旦收到结束消息,循环就会停止。生成密钥的文件和消息队列被删除。然后程序退出。
表现
消息队列非常快。IPC-Bench在运行 Ubuntu 20.04.1 LTS 的 Intel(R) Core(TM) i5-4590S CPU @ 3.00GHz 上进行了每秒 213,796 条 1KB 消息的基准测试。这对于大多数进程间通信需求来说已经足够快了。
演示代码
您可以在GitHub上的 UDS 上找到我的代码。
结论
消息队列是一种熟悉的通信模式。使用它们来满足您的单向或双向进程间通信需求是一个不错的选择。
下一篇文章将介绍一种称为共享内存的超快 IPC 机制。在那之前,照顾好自己并保持水分!✌
评论已关闭