這篇文章將為大家詳細講解有關(guān)RabbitMq的5種開發(fā)方案是什么,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),云安企業(yè)網(wǎng)站建設(shè),云安品牌網(wǎng)站建設(shè),網(wǎng)站定制,云安網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,云安網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。
windows安裝rabbitmq
mac安裝rabbitmq 需要注意的是,mac安裝rabbitmq,啟動的時候命令前,需要加 sudo,不然會報錯誤
2.1 Producer(生產(chǎn)者)
2.2 Consumer(消費者)
2.3 Exchange(交換機)
2.4 Queue(隊列)
2.5 rountingKey(交換機與隊列之間的關(guān)系)
官網(wǎng)的6中模式,可以點開這個網(wǎng)址,顯示6中模式,第6中模式RPC遠程調(diào)用我們不需要用該模式,所以我們只要關(guān)注前五種就可以了。
接下來我們就直接簡單教學(xué)rabbitmq的簡單使用
/** * 聲明隊列,五個參數(shù)列表,如果直接使用默認(rèn)channel.queueDeclare("queue"),那么其他參數(shù)都會自動默認(rèn)設(shè)置屬性,所以一般我們幾乎都默認(rèn)它 * String queue 隊列名稱 * boolean durable 隊列是需要持久化,意思就是rabbitmq重啟的時候,如果不是持久化,那么該隊列就會消失,默認(rèn)true * boolean exclusive 如果你想創(chuàng)建一個只有自己可見的隊列,不允許其它用戶訪問RabbitMQ允許你將一個Queue聲明成為排他性的,只對首次聲明它的連接(Connection)可見,會在其連接斷開的時候自動刪除。所以我們開發(fā)中一般不需要此操作,默認(rèn)false * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重啟的時候,如果為true,那么關(guān)閉期間接受的消息就會自動消失,默認(rèn)false * Map<String, Object> arguments 這個參數(shù)我們只會在使用延遲隊列中才會用到,就是延遲隊列的相關(guān)配置等屬性 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null); */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** * 綁定隊列到交換機 * String queue 隊列名 * String exchange 交換機名 * String routingKey 綁定關(guān)系 如 大頭兒子,小頭爸爸 他們的rountingKey就是父子 */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
版本隨意,本博客任何版本高版本也行。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency>
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設(shè)置服務(wù)地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //設(shè)置賬號信息,用戶名、密碼、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 通過工程獲取連接 Connection connection = factory.newConnection(); return connection; } }
理解:該rabbit服務(wù)器只有一個單一的隊列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從連接中創(chuàng)建通道 Channel channel = connection.createChannel(); // 聲明(創(chuàng)建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內(nèi)容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關(guān)閉通道和連接 channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "mujiutian_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); //創(chuàng)建隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監(jiān)聽隊列,發(fā)送消息 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息,該線程一直進行 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取到消息"+message); } } }
先打開rece的main函數(shù),這樣就可以執(zhí)行send發(fā)送消息。
理解:rabbit服務(wù)器有一個exchange(交換機),該交換機下有兩個隊列,總共發(fā)送了100條消息,A隊列效率高可以搶到80條消息給消費者,B隊列只能搶到20條消息給發(fā)送者,他們的總和是100條,為工作模式。
先執(zhí)行,先執(zhí)行消費者main函數(shù),在看結(jié)果圖:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { // 消息內(nèi)容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(10); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("收到消息'" + message + "'"); // 返回確認(rèn)狀態(tài) channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成狀態(tài) channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
如圖:
// 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1);
跟這個有關(guān)系,這也就是我們所說的工作模式
理解:就是群發(fā),該交換機下的所有隊列,都會接受相同的所有交換機發(fā)來的消息,類似于qq群一樣
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內(nèi)容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:你是人,如果性別是女,請進女廁,是男性,請進男廁,如同,一個交換機下面的多個隊列,根據(jù)rountingKey判斷該接受的消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內(nèi)容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:路徑aas.# A隊列接受這種所有路徑,B隊列接受路徑下的所有隊列aab.#
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內(nèi)容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一時刻服務(wù)器只會發(fā)一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監(jiān)聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
關(guān)于RabbitMq的5種開發(fā)方案是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
分享標(biāo)題:RabbitMq的5種開發(fā)方案是什么
瀏覽路徑:http://chinadenli.net/article20/gehjjo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)、建站公司、品牌網(wǎng)站設(shè)計、做網(wǎng)站、網(wǎng)站內(nèi)鏈、小程序開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)