消息队列究竟是个什么鬼?
本文简单的介绍了一下RabbitMQ、应用场景以及使用Java和Python操作MQ的简单例子。
01 什么是RabbitMQ?
RabbitMQ 是一个基于AMQP协议,服务端用Erlang开发的消息队列,支持Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。所谓消息队列就是用来实现系统与系统之间,程序与程序之间进行通信的中间件。整体来看是一个异步的过程,由生产者(Publish)来生产消息,这个消息会被先放到一个容器中,当满足一定条件时,这个消息会被消费者(Subscribe )拿走去消费。这个容器就是队列。生产者和消费者之间遵守的协议就是AMQP协议。其次还可以对消费者设置一个优先级(Priority),以及对消费者的请求进行限流,对负载进行有效均衡。
02 应用场景有哪些?
1、用户注册时需要发送验证邮箱或者短信验证。使用消息队列之后,应用程序只需要关心注册成功即可。不需要等待邮件或者短信发送成功的响应。因为这个是由消费者去完成的。
2、电商系统,用户下单消费成功之后,对应的库存需要进行更新。我们可以调用库存系统提供的接口,但是这样如果库存系统出现故障就会导致库存不能准确的更新。而且耦合性非常高。我们可以通过消息队列进行解耦,而且消息队列具有持久化功能。保证数据的准确性
3、秒杀系统,可以通过消息队列过滤掉部分请求,缓解服务的压力。
4、注册、日志、监控系统等大部分只要求最终一致性的场景
03 消息队列常见关键词
AMQP的核心是Producer(消息生产者)、Broker(消息队列的服务器实体)、Consumer(消息消费者)
Producer/Consumer概念比较好理解,无非就是一个生产者创建一个信息去由消费者去进行相关的逻辑处理。
Broker消息队列的服务器,一个Broker可以包含多个VirtualHost(虚拟主机),主要起到了一个隔离的作用。 而一个VirtualHost又包括以下三部分
Exchange(交换机):由它按照某些规则 去决定消息最终路由到哪个队列。
Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。如果没有bind,消息会直接被丢掉。
Queue:存储消息的地方,每个消息都会被投入到一个或多个队列。。
整体流程如下图(源自网络)
04 Java操作RabbitMQ
1、创建链接工厂的工具类
importcom.rabbitmq.client.ConnectionFactory;
public class RabbitFactory { public ConnectionFactory getFactory(){
// 创建链接工厂 ConnectionFactory factory = newConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
returnfactory;
}
}2、生产者
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
/**
* 消息生产者
*/ public classProducer {
/**
* 声明队列名
*/ public final static String QUEUENAME = "rabbitMQ";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接工厂 RabbitFactory rabbitFactory = newRabbitFactory();
ConnectionFactory factory = rabbitFactory.getFactory();
// 创建一个新的链接Connection connection = factory.newConnection();
// 创建一个通道Channel channel = connection.createChannel();
// 声明一个队列 channel.queueDeclare(QUEUENAME, false, false, false, null);
//发送一个消息至队列中 channel.basicPublish("", QUEUENAME, null,"Hello World".getBytes());
System.out.println("Producer Send Message Over");
// 关闭通道和连接channel.close();
connection.close();
}
}3、消费者代码
importcom.rabbitmq.client.*;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
public class Customer{
/**
* 声明队列名
*/ public final static String QUEUENAME = "rabbitMQ";
public static void main(String[] args) throws IOException, TimeoutException{
RabbitFactory rabbitFactory = newRabbitFactory();
ConnectionFactory factory = rabbitFactory.getFactory();
// 创建一个新的链接Connection connection = factory.newConnection();
// 创建一个通道Channel channel = connection.createChannel();
// 声明一个队列 channel.queueDeclare(QUEUENAME, false, false, false, null);
//创建一个消费者 Consumer consumer = newDefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println("[x]Received : "+ message);
}
};
//channel绑定队列、消费者、autoAck为true表示一旦收到消息则自动回复确认消息 channel.basicConsume(QUEUENAME, true, consumer);
}
}05 Python操作RabbitMQ
1、定义一个生产者
importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=127.0.0.1))
# 创建频道channel = connection.channel()
# 声明一个消息队列channel.queue_declare(mq_test)
# 消息不能直接到达queue,需要经过exchange,exchange= 表示使用默认的exchangechannel.basic_publish(exchange=, routing_key=mq_test, body=hello world)
print(push end)
# 关闭发送消息的mq连接connection.close()2、定义一个消费者
importpika
# 连接mqconnection = pika.BlockingConnection(pika.ConnectionParameters(host=127.0.0.1, port=5672))
# 创建一个通道channel = connection.channel()
# 声明mq_test的队列,如果不存在 则自动创建channel.queue_declare(queue=mq_test)
# 定义一个回调函数,打印收到的信息def callback(ch, method, properties, body): print(receive msg: %s% body)
# 指明从哪个队列(queue)接收message# no_ack=True,表示不对消息进行确认channel.basic_consume(callback, queue=mq_test, no_ack=True)
print(waiting for msg)
channel.start_consuming()扫一扫,关注我们