title: Python 操作redis消息队列 多进程消费
date: 2022-02-17 09:59:10
categories:
- IT技术
- 编程语言
- python
tags:
- IT技术
- 编程语言
- python
摘要:生产端 import json import redis # 以下代码是向redis 发命令 QUEUE = "code" # 队列名称key redisPool = redis.ConnectionPool(host=config.get_redis_host(), port=6379, db=config.g…
Python 操作redis消息队列 多进程消费
生产端
import json
import redis
# Producer side: the code below pushes commands onto a Redis list.
QUEUE = "code"  # key of the Redis list used as the queue
# Alternative wiring that reads host/db from a project config module:
# redisPool = redis.ConnectionPool(host=config.get_redis_host(), port=6379, db=config.get_redis_db())
redisPool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=8,
)
client = redis.Redis(connection_pool=redisPool)
def send_cmd(seaweed):
    """Serialize *seaweed* to JSON and append it to the tail of the queue.

    Args:
        seaweed: A JSON-serializable object describing one command.

    ``ensure_ascii=False`` keeps non-ASCII characters in the payload as-is
    instead of escaping them.
    """
    json_cmd = json.dumps(seaweed, ensure_ascii=False)
    # RPUSH appends at the tail of the list stored under QUEUE.
    client.rpush(QUEUE, json_cmd)
# Labels of the demo commands to enqueue: 0..99.
ll = list(range(100))

if __name__ == "__main__":
    # Enqueue one command per label; the two date-like values are fixed
    # sample data carried along with every command.
    for k in ll:
        send_cmd({"label": k, "timd": 20160503, "timm": 20170430})
消费端多进程消费
import chardet
import json
import multiprocessing
import redis
# Consumer side: Redis connection settings shared by the worker processes.
QUEUE = "code"
# Alternative wiring that reads host/db from a project config module:
# redisPool = redis.ConnectionPool(host=config.get_redis_host(), port=6379, db=config.get_redis_db())
redisPool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=8,
)
client = redis.Redis(connection_pool=redisPool)
# The code below pops commands from Redis and processes them with
# multiple processes.
def func(a, b, c):
    """Demo task run for each command: print the first two arguments.

    ``c`` is accepted so the call signature matches the three fields taken
    from a command, but it is not used here.
    """
    print(a, b)
def worker(pname):
    """Pop commands from the queue and run ``func`` on each until it is empty.

    Args:
        pname: Label of this worker, used as a prefix in failure messages.

    Returns when ``LPOP`` yields ``None`` (the queue is drained). A message
    that cannot be decoded or parsed, or whose processing raises, is reported
    and skipped so one bad message does not stop the worker. Redis errors are
    not swallowed; they propagate to the caller.
    """
    client = redis.Redis(connection_pool=redisPool)
    while True:
        raw = client.lpop(QUEUE)
        if raw is None:
            # Empty queue: nothing left for this worker to do.
            return

        try:
            # Payloads are JSON text; decode as UTF-8 rather than guessing
            # the encoding, which is unreliable on short messages.
            text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
            cmd = format_cmd(text)
        except ValueError as ex:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            print(ex)
            print(pname + ":", raw, "解析失败")
            continue

        try:
            func(cmd["label"], cmd['timd'], cmd['timm'])
        except Exception as ex:
            # Best-effort processing: report the failure and keep consuming.
            print(ex)
            print(pname + ":", cmd, "计算失败")
def format_cmd(cmd):
    """Parse a JSON-encoded command and return the resulting Python object.

    Args:
        cmd: JSON document as ``str`` (``bytes`` is also accepted by
            ``json.loads``).

    Raises:
        json.JSONDecodeError: If *cmd* is not valid JSON.
    """
    return json.loads(cmd)
if __name__ == "__main__":
    # Multi-process consumption: one worker task per pool process.
    pro_num = 5
    pool = multiprocessing.Pool(processes=pro_num)
    pending = []
    # range(1, pro_num + 1) yields PROC001..PROC005; stopping at pro_num
    # would start one worker fewer than the pool has processes.
    for pid in range(1, pro_num + 1):
        pname = "PROC" + str(pid).zfill(3)
        pending.append((pname, pool.apply_async(worker, (pname,))))
    pool.close()
    pool.join()
    # apply_async keeps a worker's exception inside its result object;
    # fetch each result so failures are reported instead of lost.
    for pname, result in pending:
        try:
            result.get()
        except Exception as ex:
            print(pname + ":", ex)