点击上方 Java后端,选择 设为星标
优质文章,及时送达
作者 | 卓庆森
如何去写一手好SQL ?
-
使用消息队列场景和好处
-
使用消息队列会带来什么问题,有什么解决方案
-
如何使用MQ(以ActiveMQ为例的简单例子)
1.消息队列的应用场景和好处
-
异步-流量削峰
响应速度(用户体验感)得到大幅改善。
-
异步-系统解耦
2.使用消息队列会带来什么问题
-
可用性降低:在加入MQ之前,你不用考虑MQ服务器挂掉的情况,引入MQ之后你就需要去考虑了,可用性降低。
-
复杂性提高:加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等问题。因此需要考虑的东西更多,系统复杂性增大。
-
数据一致性:消息队列带来的异步确实可以提高系统响应速度,但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了。
-
对于可用性问题
为了用而用(显得技术复杂一下,好忽悠多收点钱),对于这个问题,需要对MQ集群技术有比较深刻的理解,各种消息中间件的集群方式不同,下面以ActiveMq的集群为例(Zookeeper+ActiveMq),先看图
以达到“总是访问主服务器”的目的。
当“主”服务器发生故障,zookeeper从指定目录下删除对应的临时节点,同时可以通知关心这一变化的所有客户端,高效且迅速的传播这一信息。当下个请求来的时候,还是连接zookeeper,但是此时其实是访问备用的MQ。
-
对于复杂性问题
-
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
-
如果你拿到这个消息做redis的set的操作,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
-
如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
-
rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
-
kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可
-
Push模型实时性好,但是因为状态维护等问题,难以应用到消息中间件的实践中,因为
-
在Broker端需要维护Consumer的状态,不好适用于Broker去支持大量的Consumer的场景
Consumer的消费速度是不一致的,Broker进行推送难以处理不同的Consumer的状况
Broker难以应对Consumer无法消费消息的情况,因为不知道Consumer的宕机是短暂的还是永久的)
另外推送消息(量可能会很大)也会加重Consumer的负载或者压垮Consumer。
如果对应只有1个Consumer,用push比pull好。
-
Pull模式实现起来会相对简单一些,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。
3.如何使用MQ(以ActiveQM为例)
public class JMSProducer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
private static final int SENDNUM=10; // 发送的消息数量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1"); // 创建消息队列
destination=session.createTopic("FirstTopic1");
messageProducer=session.createProducer(destination); // 创建消息生产者
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
messageProducer.send(message);
}
}
}
/**
* 消息监听-订阅者一
* @author Administrator
*
*/
public class Listener implements MessageListener{
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class JMSConsumer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
destination=session.createTopic("FirstTopic1");
messageConsumer=session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new Listener()); // 注册消息监听
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我认为一个优秀的分布式消息队列,应该具备以下的能力:高吞吐、低时延(因场景而异),传输透明,伸缩性强,有冗灾能力,一致性顺序投递,同步+异步的发送方式,完善的运维和监控工具和开源
作者 | 卓庆森
链接 | www.cnblogs.com/zhuoqingsen/p/MQ.html
编辑 | Java架构师之路(id:Java-Road)
荐
阅
读
在看
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。