kombu 的 serializer 选择 json 和 yaml 有什么区别?

2年前 (2022) 程序员胖胖胖虎阿
251 0 0

前言

kombu 投递 message 的时候,支持多种序列化方式:

  • json
  • yaml
  • pickle

今天的主题就是,看看一个 dict 分别被这三种序列化方式序列化后,message 会长成什么样子(以 rabbitmq 管理界面的样子为准)

实验

先准备一个 dict ,value 的 type 有 str、int、datetime、sub dict 等多种类型

data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}

我们先来看看不指定 serializer 是一个什么结果:

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)

        producer = Producer(
            channel,
            exchange=imdb_exchange
        )

        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')

可以看到,因为 message 的 body 是 dict,所以 kombu 即便在 serializer 缺失的情况下,也选择按照 json 进行序列化

如何得出 『按照 json 进行序列化』的结论的?因为看 message header 里面的 『content_type』属性:content_type:application/json

kombu 的 serializer 选择 json 和 yaml 有什么区别?

那如果我们选择 json 呢?

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)

        producer = Producer(
            channel,
            exchange=imdb_exchange
        )

        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='json'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')

可以看到,和上面的没有区别,都是 content_type: application/json
kombu 的 serializer 选择 json 和 yaml 有什么区别?

那如果我们选择 yaml 呢?

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)

        producer = Producer(
            channel,
            exchange=imdb_exchange
        )

        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='yaml'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')

可以看到,我们传递的 dict 对象,变成了 yaml 文本

kombu 的 serializer 选择 json 和 yaml 有什么区别?

再来看看 pickle:

可以看到,此时,我们看不出 body 了,因为 pickle 是一种二进制序列化方式

kombu 的 serializer 选择 json 和 yaml 有什么区别?

完整代码:

from kombu import Exchange, Queue
from kombu import Connection
from kombu.messaging import Producer
from kombu.transport.base import Message

from kombu import Exchange, Queue
from loguru import logger
import time
from datetime import datetime, timedelta, timezone


def get_min_utc_timestamp() -> datetime:
    return (datetime(year=1970, month=1, day=1) + timedelta(seconds=1)).replace(tzinfo=timezone.utc)


def get_utc_now_timestamp() -> datetime:
    """ https://blog.csdn.net/ball4022/article/details/101670024 """
    return datetime.utcnow().replace(tzinfo=timezone.utc)


amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'


def declare_exchange(exchange: Exchange):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            exchange.declare(channel=channel)


def declare_queue(queue: Queue):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            queue.declare(channel=channel)


imdb_exchange = Exchange('imdb', type='fanout')
declare_exchange(exchange=imdb_exchange)

imdb_queue = Queue('imdb_refresh', imdb_exchange,
                   routing_key='to_refresh', durable=True)
declare_queue(queue=imdb_queue)


data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)

        producer = Producer(
            channel,
            exchange=imdb_exchange
        )

        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='pickle'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')

参考资料:
kombu doc: serialization

相关文章

暂无评论

暂无评论...