Pages

搜尋此網誌

2014年1月9日 星期四

rabbit MQ 與 Spring boot 整合,以 RPC mode 為例

rabbit MQ 與 Spring boot 整合

最近在開發的專案剛好有使用到 rabbit MQ,也花了一點時間了解運作原理,關於使用 MQ 的好處,與各種應用情形,網路上有很多,有需要可以搜尋看看,本篇要說明的是 rabbit MQ 與 java 的串接,在實作的過程中花了一些時間,希望透過這篇的說明可以幫助到有需要的人。

在開始說明前,先簡單說明一下 RPC 模式的運作:

在 rabbit MQ 的官網關於 RPC 的介紹,模型示意圖如下:

enter image description here

可以看到在 RPC 模式下分為 client 與 server,request 與 Reply 分別有各自的隊列負責,本範例使用 spring AmqpTemplate 作為 client 進行操作,在上圖中總共有四個重要的成員,分為 client 與 server 跟大家介紹。

一開始要先跟大家說明的是,在整個 java 與 rabbit MQ 整合有個很重要的物件為 ConnectionFactory connectionFactory,該物件存放了存取 rabbit MQ server 的相關資訊,包括登入帳號與密碼,因此在此範例中不管 client、server 或是去跟回的隊列都會跟他有關係,因此一開始我們需要先建立 ConnectionFactory。

因為是用 spring boot 開發,預設只要有使用到 amqp 套件,spring 在啟動時就算你沒有特別設定,就會產生好有預設參數的 bean 等待你取用,本範例沒有用到特殊參數,因此透過下列程式就可以取得 ConnectionFactory 實體:

@Autowired
ConnectionFactory connectionFactory;

所謂的預設值分別是 ip、port、帳號、密碼等,client 與 server 用的會是一個物件,接著我們就可以來看 client 的定義。

client

client 是訊息的發起方,一開始我們需要先定義 client 的隊列,RPC 模式有去有回,對於 client 而言,他需要消化的隊列是 reply,因此我們需要先將 client 與 reply 進行綁定,首先需要先定義 reply 隊列:

final static String queueName = "reply";

@Bean
public Queue responseQueue() {
    return new Queue(queueName);
}

一旦隊列建立完成,再來要設定 amqpTemplete,還記得剛剛說的 connectionFactory 儲存了存取 MQ server 的相關資訊,因此在建立時需要傳入,建立以後綁定 reply 隊列,如下:

@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setReplyQueue(responseQueue());
    return rabbitTemplate;
}

接著我們需要設定 SimpleMessageListenerContainer,首先為了讓 SimpleMessageListenerContainer 可以存取隊列訊息,所以他也需要 ConnectionFactory,一旦 amqpTemplate 送出訊息後,將透過 SimpleMessageListenerContainer 去監看 replay 的隊列有沒有新的訊息近來並且要將訊息傳給 amqpTemplate,因此 SimpleMessageListenerContainer 需要綁定 amqpTemplate 與 replay 隊列,設定的程式碼如下:

@Bean
public SimpleMessageListenerContainer clientMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(responseQueue());
    container.setMessageListener(amqpTemplate());
    return container;
}

如此就完成 client 的設定,完整程式碼如下:

package goinfo.test;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqClientConfig {


    @Autowired
    ConnectionFactory connectionFactory;

    final static String queueName = "reply";

    @Bean
    public Queue responseQueue() {
        return new Queue(queueName);
    }

    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReplyQueue(responseQueue());
        return rabbitTemplate;
    }
    @Bean
    public SimpleMessageListenerContainer clientMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(responseQueue());
        container.setMessageListener(amqpTemplate());

        return container;
    }


}

server

server 的部分同樣需要 ConnectionFactory connectionFactory,跟 client 一樣,這邊不多說明,除了 connectionFactory,第一步要定義的是 server 所要消化的對列,也就是上圖中的 rpc_queue,我們將隊列命名為 spring-boot 如下:

@Autowired
ConnectionFactory connectionFactory;

final static String queueName = "spring-boot";

@Bean
Queue queue() {
    return new Queue(queueName, false);
}

我們需要一個類別來處理由 rpc_queue 傳入的訊息,並且定義一旦接收到 message 之後,要呼叫哪個函式,透過 MessageListenerAdapter 來幫我們完成:

@Bean
AmqpController receiver() {
    return new AmqpController();
}
@Bean
MessageListenerAdapter listenerAdapter(AmqpController receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

AmqpController 是我們要處理 message 的類別,而 MessageListenerAdapter(receiver, "receiveMessage");這一句表示,一旦接收到訊息,要呼叫 receiver 物件中所定義的 receiveMessage 方法, AmqpController 程式碼如下:


import goinfo.service.ApiFecadeService;
import org.springframework.beans.factory.annotation.Autowired;

public class AmqpController {

    @Autowired
    private ApiFecadeService apiFecadeService;

    public String receiveMessage(String message) {
        return apiFecadeService.excute(apiFecadeService, message);
    }
}

可以看到我們定義了 receiveMessage method,所傳入的 message 將透過 MessageListenerAdapter 傳入,return 的部份就是要傳入 reply 隊列的內容。

這邊不得不補充一下,自己在研究時,一直想不透到底要怎麼將處理的結果回傳給 reply,因為 spring AMQP 官方的範例是最單純的有去沒回的模式,因此在宣告 receiveMessage 是 void 而不是 String,嘗試許多方法,靈機想說該不會直接 return 就會傳給 reply,果不其然…就是這麼簡單!

一旦 MessageListenerAdapter 設定好,我們同樣需要設定 SimpleMessageListenerContainer,該物件的建立同樣需要 ConnectionFactory 作為存取隊列的依據,與 client 類似,但不一樣的是隊列對象是 rpc_queue,而訊息的處理交由 MessageListenerAdapter 傳入 AmqpController,設定如下:

@Bean
SimpleMessageListenerContainer serverMessageListenerContainer(MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(queue());
    container.setMessageListener(listenerAdapter);
    return container;
}

完整的程式如下:

package goinfo.cfg;

import goinfo.web.AmqpController;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqServerConfig {

    @Autowired
    ConnectionFactory connectionFactory;

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    AmqpController receiver() {
        return new AmqpController();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(AmqpController receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer serverMessageListenerContainer(MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue());
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

如此就完成 client 與 server 端的設定,可以開始實際使用。

另外在 MQ 的運行中還有一個很重要的參與者名為 exchange,若有需要也可以透過下面的程式碼綁定:

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

沒有設定也沒有關係,spring boot 會幫你設定好預設為 server 的 queue 名稱加上 -exchange,此例來說就是 spring-boot-exchange。

範例訊息送出與接收

在這裡我們建立一個 TestController 進行訊息的組成在透過一開始在 client 說明中建立的 AmqpTemplete 將訊息送出,如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

@Controller
public class TestController {

    @Autowired
    RabbitTemplate amqpTemplate;
    public String sendMsg() {
        Object o = amqpTemplate.convertSendAndReceive("spring-boot", "{test: 'OK'}");
        return result;
    }
}

最後再整理一下整個執行的過程:

  1. 使用 amqpTemplate 所提供的 convertSendAndReceive,表示送出訊息後要等待接收 reply
  2. 透過該函式將訊息送到指定的隊列名稱為 spring-boot,透過 connectionFactory 將訊息傳給 rpc_quene
  3. Server 端的 serverMessageListenerContainer,一發現 rpc_quene 有新的訊息,就會把訊息傳給 listenerAdapter,交由 AmqpController 處理
  4. 一旦 AmqpController 的 receiveMessage 函式處理完成後,將處理結果 return
  5. 此時 serverMessageListenerContainer 根據從 amqpTemplate 送出訊息裡包含的 BasicProperties 知道要回傳到隊列 reply
  6. 一旦隊列 reply 有了新訊息,client 的 clientMessageListenerContainer 監控到 reply 新的訊息需要處理,根據 clientMessageListenerContainer 綁定的 MessageListener 對象為一開始發出訊息的 amqpTemplate,將處理結果傳入,回到原點,完成整個交易過程。

最後附上在 MQ server 控制頁的模型圖示:

enter image description here

若是以 rabbit MQ server 角度來看,當你送出訊息時,訊息會送到與 spring-boot 隊列成對的 exchange,spring-boot-exchange 綁定了 spring-boot 與 reply 隊列。

上圖中的藍色線就是當你送出 message 時會出現代表有新訊息傳入,在交由 exchange 去分派到對應的隊列。

寫了很多,希望可以幫助到需要的朋友。

張貼留言