国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 數(shù)據(jù)庫(kù) > Redis > 正文

使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼

2020-10-28 21:28:52
字體:
供稿:網(wǎng)友

在介紹了 Redis 流的基本功能之后, 現(xiàn)在是時(shí)候使用這些功能來構(gòu)建一些實(shí)際的應(yīng)用了。 消息隊(duì)列作為流的典型應(yīng)用之一, 具有非常好的示范性, 因此我們將使用 Redis 流的相關(guān)功能構(gòu)建一個(gè)消息隊(duì)列應(yīng)用, 這個(gè)消息隊(duì)列跟我們之前使用其他 Redis 數(shù)據(jù)結(jié)構(gòu)構(gòu)建的消息隊(duì)列具有相似的功能。

代碼清單 10-1 展示了一個(gè)具有基本功能的消息隊(duì)列實(shí)現(xiàn):

  • 代碼最開頭的是幾個(gè)轉(zhuǎn)換函數(shù), 它們負(fù)責(zé)對(duì)程序的相關(guān)輸入輸出進(jìn)行轉(zhuǎn)換和格式化;
  • MessageQueue 類用于實(shí)現(xiàn)消息隊(duì)列, 它的添加消息、移除消息以及返回消息數(shù)量三個(gè)方法分別使用了流的 XADD 命令、 XDEL 命令和 XLEN 命令;
  • 消息隊(duì)列的兩個(gè)獲取方法 get_message() 和 get_by_range() 分別以兩種形式調(diào)用了流的 XRANGE 命令;
  • 最后, 用于迭代消息的 iterate() 方法使用了 XREAD 命令對(duì)流進(jìn)行迭代。

代碼清單 10-1 使用 Redis 流實(shí)現(xiàn)的消息隊(duì)列: /stream/message_queue.py

def reconstruct_message_list(message_list):  """  為了讓多條消息能夠以更結(jié)構(gòu)化的方式返回給調(diào)用者,  將 Redis 返回的多條消息從原來的格式:  [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]  轉(zhuǎn)換成以下格式:  [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]  """  result = []  for id, kvs in message_list:    result.append({id: kvs})  return resultdef get_message_from_nested_list(lst):  """  從嵌套列表中取出消息本體。  """  return lst[0][1]class MessageQueue:  """  使用 Redis 流實(shí)現(xiàn)的消息隊(duì)列。  """  def __init__(self, client, stream_key):    self.client = client    self.stream = stream_key  def add_message(self, key_value_pairs):    """    將給定的鍵值對(duì)存入到消息里面,并返回相應(yīng)的消息 ID 。    """    return self.client.xadd(self.stream, key_value_pairs)  def get_message(self, message_id):    """    根據(jù)給定的消息 ID 返回相應(yīng)的消息,如果消息不存在則返回 None 。    """    reply = self.client.xrange(self.stream, message_id, message_id)    if len(reply) == 1:      return get_message_from_nested_list(reply)  def remove_message(self, message_id):    """    根據(jù)給定的消息 ID 刪除相應(yīng)的消息,如果消息不存在則忽略該動(dòng)作。    """    self.client.xdel(self.stream, message_id)  def len(self):    """    返回消息隊(duì)列的長(zhǎng)度。    """    return self.client.xlen(self.stream)  def get_by_range(self, start_id, end_id, max_item=10):    """    根據(jù)給定的 ID 區(qū)間范圍返回隊(duì)列中的消息。    """    reply = self.client.xrange(self.stream, start_id, end_id, max_item)    return reconstruct_message_list(reply)  def iterate(self, start_id=0, max_item=10):    """    對(duì)消息隊(duì)列進(jìn)行迭代,返回最多 N 條大于給定 ID 的消息。    """    reply = self.client.xread({self.stream: start_id}, max_item)    if len(reply) == 0:      return list()    else:      messages = get_message_from_nested_list(reply)      return reconstruct_message_list(messages)

對(duì)于這個(gè)消息隊(duì)列實(shí)現(xiàn), 我們可以通過執(zhí)行以下代碼, 創(chuàng)建出它的實(shí)例:

>>> from redis import Redis>>> from message_queue import MessageQueue>>> client = Redis(decode_responses=True)>>> mq = MessageQueue(client, "mq")

然后通過執(zhí)行以下代碼, 向隊(duì)列里面添加十條消息:

>>> for i in range(10):...  key = "key{0}".format(i)...  value = "value{0}".format(i)...  msg = {key:value}...  mq.add_message(msg)...'1554113926280-0''1554113926280-1''1554113926281-0''1554113926281-1''1554113926281-2''1554113926281-3''1554113926281-4''1554113926281-5''1554113926281-6''1554113926282-0'

還可以根據(jù) ID 獲取指定的消息, 又或者使用 get_by_range() 方法同時(shí)獲取多條消息:

>>> mq.get_message('1554113926280-0'){'key0': 'value0'}>>> mq.get_message('1554113926280-1'){'key1': 'value1'}>>> mq.get_by_range("-", "+", 3)[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]

又或者使用 iterate() 方法對(duì)消息隊(duì)列進(jìn)行迭代, 等等:

>>> mq.iterate(0, 3)[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]>>> mq.iterate('1554113926281-0', 3)[{'1554113926281-1': {'key3': 'value3'}}, {'1554113926281-2': {'key4': 'value4'}}, {'1554113926281-3': {'key5': 'value5'}}]

總結(jié)

以上所述是小編給大家介紹的使用 Redis 流實(shí)現(xiàn)消息隊(duì)列的代碼,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)武林網(wǎng)網(wǎng)站的支持!
如果你覺得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!

發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 嘉义市| 福清市| 平遥县| 全州县| 通州市| 象山县| 岳阳市| 通化县| 封开县| 汽车| 临夏县| 陵川县| 广西| 济南市| 集贤县| 固安县| 界首市| 琼中| 寻乌县| 体育| 黄石市| 特克斯县| 固镇县| 务川| 阳高县| 灌云县| 漳浦县| 汤原县| 仙居县| 定边县| 北辰区| 五寨县| 岳池县| 喀什市| 图木舒克市| 电白县| 竹溪县| 沁阳市| 思茅市| 馆陶县| 富裕县|