今天看啥  ›  专栏  ›  克里斯朵夫李维

Rabbitmq常用api

克里斯朵夫李维  · 掘金  ·  · 2019-07-03 15:23

文章预览

阅读 9

Rabbitmq常用api

一、声明一个交换机

  • com.rabbitmq.client.Channel#exchangeDeclare()
    /**
       * Declare an exchange.
       * @see com.rabbitmq.client.AMQP.Exchange.Declare
       * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
       * @param exchange 交换机名称
       * @param type 交换机类型
       * @param durable 持久化,消息代理重启后交换机依然存在
       * @param autoDelete 在服务器不再使用这个交换机(没有队列和与其绑定时)删除
       * @param arguments 其他属性
       * @return a declaration-confirm method to indicate the exchange was successfully declared
       * @throws java.io.IOException if an error is encountered
       */
      Exchange.DeclareOk exchangeDeclare(String exchange,
                                         BuiltinExchangeType type,
                                         boolean durable,
                                         boolean autoDelete,
                                         Map<String, Object> arguments) throws IOException;
    复制代码

二、声明一个队列

  • com.rabbitmq.client.Channel#queueDeclare()
    /**
       * Declare a queue
       * @see com.rabbitmq.client.AMQP.Queue.Declare
       * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
       * @param queue 队列名称
       * @param durable 持久化,当消息代理重启后队列依然存在
       * @param exclusive 独占队列,只能被声明这个队列的Connection使用,
       * 当声明这个队列的Connection关闭或者丢失的时候这个队列将会被删除
       * @param autoDelete 当最后一个消费者断开连接之后队列是否自动被删除
       * @param arguments other properties (construction arguments) for the queue
       * @return a declaration-confirm method to indicate the queue was successfully declared
       * @throws java.io.IOException if an error is encountered
       */
      Queue.DeclareOk queueDeclare(String queue, 
                                   boolean durable, 
                                   boolean exclusive, 
                                   boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
    复制代码
  • 声明一个临时队列
    /**
     * 例如日志系统,当消费每次连接到MQ的时候都希望接收到最新的消息
     * 每次连接RabbitMQ的时候都会创建一个新的、空的、
     * 非持久化、专有的(exclusive)、自动删除的、名字随机生成的队列。
     */
    String queueName = channel.queueDeclare().getQueue();
    复制代码

三、绑定队列

  • com.rabbitmq.client.Channel#queueBind()
      /**
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue 队列名称
     * @param exchange 交换机名称
     * @param routingKey 路由关键字
     * @param arguments other properties (binding parameters)
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue,
                           String exchange,
                           String routingKey,
                           Map<String, Object> arguments) throws IOException;
    复制代码

四、发送消息

  • com.rabbitmq.client.Channel#basicPublish()

    /**
       * Publish a message.
       * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
       * @param exchange 要投递消息得交换机
       * @param routingKey 路由关键字
       * @param mandatory 强制的
       * @param immediate 立即的,RabbitMQ不支持此标志
       * set. Note that the RabbitMQ server does not support this flag.
       * @param props other properties for the message - routing headers etc
       * @param body the message body
       * @throws java.io.IOException if an error is encountered
       */
      void basicPublish(String exchange,
                        String routingKey,
                        boolean mandatory,
                        boolean immediate,
                        BasicProperties props, byte[] body)
              throws IOException;
    复制代码

    mandatory:当mandatory标志位设置为true时,如果消息不能被路由,例如,exchange根据自身类型和消息routingKey无法找到一个符合条件的queue,那么broker会通过一个AMQP.Basic.Return命令将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

五、接收消息

  • com.rabbitmq.client.Channel#basicConsume()
    /**
      * @param queue 队列名称
      * @param autoAck 是否自动确认
      * @param arguments 参数
      * @param deliverCallback 消息到达时的回调函数
      * @param cancelCallback 消费者被取消的回调
      * @param shutdownSignalCallback 当 channel/connection 关闭时回调
      * @return 返回一个消息代理生成的consumerTag
      * @throws IOException if an error is encountered
      * @see com.rabbitmq.client.AMQP.Basic.Consume
      * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
      * @see #basicAck
      * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
      * @since 5.0
      */
     String basicConsume(String queue,
                         boolean autoAck,
                         Map<String, Object> arguments,
                         DeliverCallback deliverCallback,
                         CancelCallback cancelCallback,
                         ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
    复制代码

六、消息回调(DeliverCallback)

 @FunctionalInterface
public interface DeliverCallback {

   void handle(String consumerTag, Delivery message) throws IOException;

}
复制代码

七、Return模式

  • com.rabbitmq.client.Channel#addReturnListener(ReturnCallback)

    /**
     * 实现此接口,以便在调用basicPublish时,
     * 当设置了"mandatory""immediate"标志时,通知失败的交付。
     * 要获得更简单的、面向lambda的语法,请首选此接口而不是{@link ReturnListener}。
     * @see Channel#basicPublish
     * @see ReturnListener
     * @see Return
     */
     @FunctionalInterface
     public interface ReturnCallback {
         void handle(Return returnMessage);
    }
    复制代码

八、Confirm模式

  • com.rabbitmq.client.Channel#confirmSelect()
  • com.rabbitmq.client.Channel#addConfirmListener()
    复制代码

九、限流控制

/**
     * Request specific "quality of service" settings.
     *
     *这些设置限制了服务器在需要确认之前向消费者交付的数据量。
     * 因此,它们提供了一种由消费者发起的流控制方法。
     * @see com.rabbitmq.client.AMQP.Basic.Qos
     * @param prefetchSize 服务器将交付的最大内容量(以八进制为单位度量),如果没有限制,则为0
     * @param prefetchCount 告诉RabbitMQ不要同时给一个消费者推送多于N个消息,
     *                      即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
     * @param global 如果设置应用于整个channel而不是每个消费者,则为 true
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize,
                  int prefetchCount,
                  boolean global) throws IOException;
复制代码
………………………………

原文地址:访问原文地址
快照地址: 访问文章快照
总结与预览地址:访问总结与预览