一、声明一个交换机
- com.rabbitmq.client.Channel#exchangeDeclare()
/** * Declare an exchange. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange 交换机名称 * @param type 交换机类型 * @param durable 持久化,消息代理重启后交换机依然存在 * @param autoDelete 在服务器不再使用这个交换机(没有队列和与其绑定时)删除 * @param arguments 其他属性 * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; 复制代码
二、声明一个队列
- com.rabbitmq.client.Channel#queueDeclare()
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue 队列名称 * @param durable 持久化,当消息代理重启后队列依然存在 * @param exclusive 独占队列,只能被声明这个队列的Connection使用, * 当声明这个队列的Connection关闭或者丢失的时候这个队列将会被删除 * @param autoDelete 当最后一个消费者断开连接之后队列是否自动被删除 * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; 复制代码
- 声明一个临时队列
/** * 例如日志系统,当消费每次连接到MQ的时候都希望接收到最新的消息 * 每次连接RabbitMQ的时候都会创建一个新的、空的、 * 非持久化、专有的(exclusive)、自动删除的、名字随机生成的队列。 */ String queueName = channel.queueDeclare().getQueue(); 复制代码
三、绑定队列
- com.rabbitmq.client.Channel#queueBind()
/** * Bind a queue to an exchange. * @see com.rabbitmq.client.AMQP.Queue.Bind * @see com.rabbitmq.client.AMQP.Queue.BindOk * @param queue 队列名称 * @param exchange 交换机名称 * @param routingKey 路由关键字 * @param arguments other properties (binding parameters) * @return a binding-confirm method if the binding was successfully created * @throws java.io.IOException if an error is encountered */ Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; 复制代码
四、发送消息
-
com.rabbitmq.client.Channel#basicPublish()
/** * Publish a message. * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a> * @param exchange 要投递消息得交换机 * @param routingKey 路由关键字 * @param mandatory 强制的 * @param immediate 立即的,RabbitMQ不支持此标志 * set. Note that the RabbitMQ server does not support this flag. * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; 复制代码
mandatory
:当mandatory标志位设置为true时,如果消息不能被路由,例如,exchange根据自身类型和消息routingKey无法找到一个符合条件的queue,那么broker会通过一个AMQP.Basic.Return
命令将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
五、接收消息
- com.rabbitmq.client.Channel#basicConsume()
/** * @param queue 队列名称 * @param autoAck 是否自动确认 * @param arguments 参数 * @param deliverCallback 消息到达时的回调函数 * @param cancelCallback 消费者被取消的回调 * @param shutdownSignalCallback 当 channel/connection 关闭时回调 * @return 返回一个消息代理生成的consumerTag * @throws IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicAck * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) * @since 5.0 */ String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; 复制代码
六、消息回调(DeliverCallback)
@FunctionalInterface
public interface DeliverCallback {
void handle(String consumerTag, Delivery message) throws IOException;
}
复制代码
七、Return模式
-
com.rabbitmq.client.Channel#addReturnListener(ReturnCallback)
/** * 实现此接口,以便在调用basicPublish时, * 当设置了"mandatory"或"immediate"标志时,通知失败的交付。 * 要获得更简单的、面向lambda的语法,请首选此接口而不是{@link ReturnListener}。 * @see Channel#basicPublish * @see ReturnListener * @see Return */ @FunctionalInterface public interface ReturnCallback { void handle(Return returnMessage); } 复制代码
八、Confirm模式
- com.rabbitmq.client.Channel#confirmSelect()
- com.rabbitmq.client.Channel#addConfirmListener()
复制代码
九、限流控制
/**
* Request specific "quality of service" settings.
*
*这些设置限制了服务器在需要确认之前向消费者交付的数据量。
* 因此,它们提供了一种由消费者发起的流控制方法。
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize 服务器将交付的最大内容量(以八进制为单位度量),如果没有限制,则为0
* @param prefetchCount 告诉RabbitMQ不要同时给一个消费者推送多于N个消息,
* 即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
* @param global 如果设置应用于整个channel而不是每个消费者,则为 true
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize,
int prefetchCount,
boolean global) throws IOException;
复制代码