亚洲精品亚洲人成在线观看麻豆,在线欧美视频一区,亚洲国产精品一区二区动图,色综合久久丁香婷婷

              當(dāng)前位置:首頁 > IT技術(shù) > Web編程 > 正文

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解
              2021-10-22 09:57:55

              ?

              一、RabbitMQ簡介

              是一個開源的消息代理和隊列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang(高并發(fā)語言)語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。

              1.1 AMQP協(xié)議

              Advanced Message Queuing Protocol(高級消息隊列協(xié)議)

              1.2 AMQP專業(yè)術(shù)語:(多路復(fù)用->在同一個線程中開啟多個通道進行操作)


              • Server:又稱broker,接受客戶端的鏈接,實現(xiàn)AMQP實體服務(wù)
              • Connection:連接,應(yīng)用程序與broker的網(wǎng)絡(luò)連接
              • Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在channel中進行,Channel是進行消息讀寫的通道??蛻舳丝梢越⒍鄠€channel,每個channel代表一個會話任務(wù)。
              • Message:消息,服務(wù)器與應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成.Properties可以對消息進行修飾,必須消息的優(yōu)先級、延遲等高級特性;Body則是消息體內(nèi)容。
              • virtualhost: 虛擬地址,用于進行邏輯隔離,最上層的消息路由。一個virtual host里面可以有若干個Exchange和Queue,同一個Virtual Host 里面不能有相同名稱的Exchange 或 Queue。
              • Exchange:交換機,接收消息,根據(jù)路由鍵轉(zhuǎn)單消息到綁定隊列
              • Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
              • Routing key: 一個路由規(guī)則,虛擬機可用它來確定如何路由一個特定消息。(如負載均衡)

              1.3 RabbitMQ整體架構(gòu)

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_.NET5

              ?

              ?

              ClientA(生產(chǎn)者)發(fā)送消息到Exchange1(交換機),同時帶上RouteKey(路由Key),Exchange1找到綁定交換機為它和綁定傳入的RouteKey的隊列,把消息轉(zhuǎn)發(fā)到對應(yīng)的隊列,消費者Client1,Client2,Client3只需要指定對應(yīng)的隊列名即可以消費隊列數(shù)據(jù)。

              交換機和隊列多對多關(guān)系,實際開發(fā)中一般是一個交換機對多個隊列,防止設(shè)計復(fù)雜化。

              ?

              二、安裝RabbitMQ

              安裝方式不影響下面的使用,這里用Docker安裝



              #15672端口為web管理端的端口,5672為RabbitMQ服務(wù)的端口
              docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management


              輸入:ip:5672訪問驗證。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_02

              建一個名為develop的Virtual host(虛擬主機)使用,項目中一般是一個項目建一個Virtual host用,能夠隔離隊列。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_03

              切換Virtual host

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_04

              三、RabbitMQ六種隊列模式在.NetCore中使用

              (1)簡單隊列

              最簡單的工作隊列,其中一個消息生產(chǎn)者,一個消息消費者,一個隊列。也稱為點對點模式

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_05

              ?

              ?

              ?

              描述:一個生產(chǎn)者 P 發(fā)送消息到隊列 Q,一個消費者 C 接收

              建一個RabbitMQHelper.cs類



              /// <summary>
              /// RabbitMQ幫助類
              /// </summary>
              public class RabbitMQHelper
              {
              private static ConnectionFactory factory;
              private static object lockObj = new object();
              /// <summary>
              /// 獲取單個RabbitMQ連接
              /// </summary>
              /// <returns></returns>
              public static IConnection GetConnection()
              {
              if (factory == null)
              {
              lock (lockObj)
              {
              if (factory == null)
              {
              factory = new ConnectionFactory
              {
              HostName = "172.16.2.84",//ip
              Port = 5672,//端口
              UserName = "admin",//賬號
              Password = "123456",//密碼
              VirtualHost = "develop" //虛擬主機
              };
              }
              }
              }
              return factory.CreateConnection();
              }
              }


              生產(chǎn)者代碼:

              新建發(fā)送類Send.cs

              ?



              public static void SimpleSendMsg()
              {
              string queueName = "simple_order";//隊列名
              //創(chuàng)建連接
              using (var connection = RabbitMQHelper.GetConnection())
              {
              //創(chuàng)建信道
              using (var channel = connection.CreateModel())
              {//創(chuàng)建隊列
              channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
              for (var i = 0; i < 10; i++)
              {
              string message = $"Hello RabbitMQ MessageHello,{i + 1}";
              var body = Encoding.UTF8.GetBytes(message);//發(fā)送消息
              channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
              Console.WriteLine($"發(fā)送消息到隊列:{queueName},內(nèi)容:{message}");
              }
              }
              }
              }


              創(chuàng)建隊列參數(shù)解析:

              durable:是否持久化。

              exclusive:排他隊列,只有創(chuàng)建它的連接(connection)能連,創(chuàng)建它的連接關(guān)閉,會自動刪除隊列。

              autoDelete:被消費后,消費者數(shù)量都斷開時自動刪除隊列。

              arguments:創(chuàng)建隊列的參數(shù)。

              發(fā)送消息參數(shù)解析:

              exchange:交換機,為什么能傳空呢,因為RabbitMQ內(nèi)置有一個默認的交換機,如果傳空時,就會用默認交換機。

              routingKey:路由名稱,這里用隊列名稱做路由key。

              mandatory:true告訴服務(wù)器至少將消息route到一個隊列種,否則就將消息return給發(fā)送者;false:沒有找到路由則消息丟棄。

              執(zhí)行效果:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_06

              ?

              ?

              隊列產(chǎn)生10條消息。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_07

              ?

              ?消費者代碼:

              新建Recevie.cs類



              public static void SimpleConsumer()
              {
              string queueName = "simple_order";
              var connection = RabbitMQHelper.GetConnection();
              {
              //創(chuàng)建信道
              var channel = connection.CreateModel();
              {
              //創(chuàng)建隊列
              channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
              var consumer = new EventingBasicConsumer(channel);
              int i = 0;
              consumer.Received += (model, ea) =>
              {
              //消費者業(yè)務(wù)處理
              var message = Encoding.UTF8.GetString(ea.Body.ToArray());
              Console.WriteLine($"{i},隊列{queueName}消費消息長度:{message.Length}");
              i++;
              };
              channel.BasicConsume(queueName, true, consumer);
              }
              }
              }


              消費者只需要知道隊列名就可以消費了,不需要Exchange和routingKey。

              注:消費者這里有一個創(chuàng)建隊列,它本身不需要,是預(yù)防消費端程序先執(zhí)行,沒有隊列會報錯。

              執(zhí)行效果:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_08

              ?

              ?

              ?

              ?

              ?RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_09

              消息已經(jīng)被消費完。

              (2)工作隊列模式

              一個消息生產(chǎn)者,一個交換器,一個消息隊列,多個消費者。同樣也稱為點對點模式

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_10

              ?

              生產(chǎn)者P發(fā)送消息到隊列,多個消費者C消費隊列的數(shù)據(jù)。

              工作隊列也稱為公平性隊列模式,循環(huán)分發(fā),RabbitMQ?將按順序?qū)⒚織l消息發(fā)送給下一個消費者,每個消費者將獲得相同數(shù)量的消息。

              生產(chǎn)者:

              Send.cs代碼:



              /// <summary>
              /// 工作隊列模式
              /// </summary>
              public static void WorkerSendMsg()
              {
              string queueName = "worker_order";//隊列名
              //創(chuàng)建連接
              using (var connection = RabbitMQHelper.GetConnection())
              {
              //創(chuàng)建信道
              using (var channel = connection.CreateModel())
              {
              //創(chuàng)建隊列
              channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                           var properties = channel.CreateBasicProperties();
                           properties.Persistent = true; //消息持久化
              for ( var i=0;i<10;i++)
              {
              string message = $"Hello RabbitMQ MessageHello,{i+1}";
              var body = Encoding.UTF8.GetBytes(message);
              //發(fā)送消息到rabbitmq
              channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
              Console.WriteLine($"發(fā)送消息到隊列:{queueName},內(nèi)容:{message}");
              }
              }
              }
              }


              ?

              參數(shù)durable:true,需要持久化,實際項目中肯定需要持久化的,不然重啟RabbitMQ數(shù)據(jù)就會丟失了。

              執(zhí)行效果:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_11

              ?

              ?

              ?

              ?寫入10條數(shù)據(jù),有持久化標識D。RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_12

              ?

              ?

              ?消費端:

              Recevie代碼:



              public static void WorkerConsumer()
              {
              string queueName = "worker_order";
              var connection = RabbitMQHelper.GetConnection();
              {
              //創(chuàng)建信道
              var channel = connection.CreateModel();
              {
              //創(chuàng)建隊列
              channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
              var consumer = new EventingBasicConsumer(channel);
              //prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個消息,也確保了消費速度和性能
              channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
              int i = 1;
              int index = new Random().Next(10);
              consumer.Received += (model, ea) =>
              {
              //處理業(yè)務(wù)
              var message = Encoding.UTF8.GetString(ea.Body.ToArray());
              Console.WriteLine($"{i},消費者:{index},隊列{queueName}消費消息長度:{message.Length}");
              channel.BasicAck(ea.DeliveryTag, false); //消息ack確認,告訴mq這條隊列處理完,可以從mq刪除了
              Thread.Sleep(1000);
              i++;
              };
              channel.BasicConsume(queueName,autoAck:false, consumer);
              }
              }
              }


              BasicQos參數(shù)解析:

              prefetchSize:每條消息大小,一般設(shè)為0,表示不限制。

              prefetchCount:1,作用限流,告訴RabbitMQ不要同時給一個消費者推送多于N個消息,消費者會把N條消息緩存到本地一條條消費,如果不設(shè),RabbitMQ會進可能快的把消息推到客戶端,導(dǎo)致客戶端內(nèi)存升高。設(shè)置合理可以不用頻繁從RabbitMQ 獲取能提升消費速度和性能,設(shè)的太多的話則會增大本地內(nèi)存,需要根據(jù)機器性能合理設(shè)置,官方建議設(shè)為30。

              global:是否為全局設(shè)置。

              這些限流設(shè)置針對消費者autoAck:false時才有效,如果是自動Ack的,限流不生效。

              ?

              執(zhí)行兩個消費者,效果:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_13

              ?

              可以看到消費者號的標識,8,2,8,2是平均的,一個消費者5個,RabbitMQ上也能看到有2個消費者,Unacked數(shù)是2,因為每個客戶端的限流數(shù)是1。

              ?

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_14

              工作隊列模式也是很常用的隊列模式。

              (3)發(fā)布訂閱模式?

              Pulish/Subscribe,無選擇接收消息,一個消息生產(chǎn)者,一個交換機(交換機類型為fanout),多個消息隊列,多個消費者。稱為發(fā)布/訂閱模式

              在應(yīng)用中,只需要簡單的將隊列綁定到交換機上。一個發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_15

              ?

              ?

              ?

              生產(chǎn)者P只需把消息發(fā)送到交換機X,綁定這個交換機的隊列都會獲得一份一樣的數(shù)據(jù)。

              ?

              應(yīng)用場景:適合于用同一份數(shù)據(jù)源做不同的業(yè)務(wù)。

              生產(chǎn)者代碼:



              /// <summary>
              /// 發(fā)布訂閱, 扇形隊列
              /// </summary>
              public static void SendMessageFanout()
              {
              //創(chuàng)建連接
              using (var connection = RabbitMQHelper.GetConnection())
              {
              //創(chuàng)建信道
              using (var channel = connection.CreateModel())
              {
              string exchangeName = "fanout_exchange";
              //創(chuàng)建交換機,fanout類型
              channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
              string queueName1 = "fanout_queue1";
              string queueName2 = "fanout_queue2";
              string queueName3 = "fanout_queue3";
              //創(chuàng)建隊列
              channel.QueueDeclare(queueName1, false, false, false);
              channel.QueueDeclare(queueName2, false, false, false);
              channel.QueueDeclare(queueName3, false, false, false);

              //把創(chuàng)建的隊列綁定交換機,routingKey不用給值,給了也沒意義的
              channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
              channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
              channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
                           var properties = channel.CreateBasicProperties();
                           properties.Persistent = true; //消息持久化
              //向交換機寫10條消息
              for (int i = 0; i < 10; i++)
              {
              string message = $"RabbitMQ Fanout {i + 1} Message";
              var body = Encoding.UTF8.GetBytes(message);
              channel.BasicPublish(exchangeName, routingKey: "", null, body);
              Console.WriteLine($"發(fā)送Fanout消息:{message}");
              }
              }
              }
              }


              ?

              執(zhí)行代碼:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_16

              ?

              ?

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_17

              ?

              ?向交換機發(fā)送10條消息,則綁定這個交換機的3個隊列都會有10條消息。

              消費端的代碼和工作隊列的一樣,只需知道隊列名即可消費,聲明時要和生產(chǎn)者的聲明一樣。

              (4)路由模式(推薦使用)

              在發(fā)布/訂閱模式的基礎(chǔ)上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_18

              ?

              ?

              ?

              ?上圖是一個結(jié)合日志消費級別的配圖,在路由模式它會把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的???direct???模式。

              ?生產(chǎn)者P發(fā)送數(shù)據(jù)是要指定交換機(X)和routing發(fā)送消息 ,指定的routingKey=error,則隊列Q1和隊列Q2都會有一份數(shù)據(jù),如果指定routingKey=into,或=warning,交換機(X)只會把消息發(fā)到Q2隊列。

              ?生產(chǎn)者代碼:



              /// <summary>
              /// 路由模式,點到點直連隊列
              /// </summary>
              public static void SendMessageDirect()
              {
              //創(chuàng)建連接
              using (var connection = RabbitMQHelper.GetConnection())
              {
              //創(chuàng)建信道
              using (var channel = connection.CreateModel())
              {
              //聲明交換機對象,fanout類型
              string exchangeName = "direct_exchange";
              channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
              //創(chuàng)建隊列
              string queueName1 = "direct_errorlog";
              string queueName2 = "direct_alllog";
              channel.QueueDeclare(queueName1, true, false, false);
              channel.QueueDeclare(queueName2, true, false, false);

              //把創(chuàng)建的隊列綁定交換機,direct_errorlog隊列只綁定routingKey:error
              channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
              //direct_alllog隊列綁定routingKey:error,info
              channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
              channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
                           var properties = channel.CreateBasicProperties();
                           properties.Persistent = true; //消息持久化
              //向交換機寫10條錯誤日志和10條Info日志
              for (int i = 0; i < 10; i++)
              {
              string message = $"RabbitMQ Direct {i + 1} error Message";
              var body = Encoding.UTF8.GetBytes(message);
              channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
              Console.WriteLine($"發(fā)送Direct消息error:{message}");

              string message2 = $"RabbitMQ Direct {i + 1} info Message";
              var body2 = Encoding.UTF8.GetBytes(message);
              channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
              Console.WriteLine($"info:{message2}");

              }
              }
              }
              }


              ?

              這里創(chuàng)建一個direct類型的交換機,兩個路由key,一個error,一個info,兩個隊列,一個隊列只綁定error,一個隊列綁定error和info,向error和info各發(fā)10條消息。

              執(zhí)行代碼:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_19

              ?

              ?

              ?查看RabbitMQ管理界面,direct_errorlog隊列10條,而direct_alllog有20條,因為direct_alllog隊列兩個routingKey的消息都進去了。

              ?

              ?

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_20

              ?

              ?

              ?

              ?點進去看下兩個隊列綁定的交換機和routingKey

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_.NET5_21

              ?

              ?

              ?

              ?

              ?

              ?

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_發(fā)送消息_22

              ?

              ?

              ?消費者代碼:

              消費者和工作隊列一樣,只需根據(jù)隊列名消費即可,這里只消費direct_errorlog隊列作示例



              public static void DirectConsumer()
              {
              string queueName = "direct_errorlog";
              var connection = RabbitMQHelper.GetConnection();
              {
              //創(chuàng)建信道
              var channel = connection.CreateModel();
              {
              //創(chuàng)建隊列
              channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
              var consumer = new EventingBasicConsumer(channel);
              ///prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個消息,也確保了消費速度和性能
              ///global:是否設(shè)為全局的
              ///prefetchSize:單條消息大小,通常設(shè)0,表示不做限制
              //是autoAck=false才會有效
              channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
              int i = 1;
              consumer.Received += (model, ea) =>
              {
              //處理業(yè)務(wù)
              var message = Encoding.UTF8.GetString(ea.Body.ToArray());
              Console.WriteLine($"{i},隊列{queueName}消費消息長度:{message.Length}");
              channel.BasicAck(ea.DeliveryTag, false); //消息ack確認,告訴mq這條隊列處理完,可以從mq刪除了
              i++;
              };
              channel.BasicConsume(queueName, autoAck: false, consumer);
              }
              }
              }


              ?

              普通場景中推薦使用路由模式,因為路由模式有交換機,有路由key,能夠更好的拓展各種應(yīng)用場景。

              (5)主題模式

              topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似于SQL中 = 和 like 的關(guān)系。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_23

              ?

              P 表示為生產(chǎn)者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

              ?

              ?topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字符串(我們將被句點號“.”分隔開的每一段獨立的字符串稱為一個單詞),比如 "lazy.orange.a"。topics routingKey 中可以存在兩種特殊字符"*"與“#”,用于做模糊匹配,其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)。

              以上圖為例:

              如果發(fā)送消息的routingKey設(shè)置為:

              aaa.orange.rabbit,那么消息會路由到Q1與Q2,

              routingKey=aaa.orange.bb的消息會路由到Q1,

              routingKey=lazy.aa.bb.cc的消息會路由到Q2;

              routingKey=lazy.aa.rabbit的消息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配);

              沒匹配routingKey的消息將會被丟棄。

              生產(chǎn)者代碼:

              ?



              public static void SendMessageTopic()
              {
              //創(chuàng)建連接
              using (var connection = RabbitMQHelper.GetConnection())
              {
              //創(chuàng)建信道
              using (var channel = connection.CreateModel())
              {
              //聲明交換機對象,fanout類型
              string exchangeName = "topic_exchange";
              channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
              //隊列名
              string queueName1 = "topic_queue1";
              string queueName2 = "topic_queue2";
              //路由名
              string routingKey1 = "*.orange.*";
              string routingKey2 = "*.*.rabbit";
              string routingKey3 = "lazy.#";
              channel.QueueDeclare(queueName1, true, false, false);
              channel.QueueDeclare(queueName2, true, false, false);

              //把創(chuàng)建的隊列綁定交換機,routingKey指定routingKey
              channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
              channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
              channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
              //向交換機寫10條消息
              for (int i = 0; i < 10; i++)
              {
              string message = $"RabbitMQ Direct {i + 1} Message";
              var body = Encoding.UTF8.GetBytes(message);
              channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
              channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
              Console.WriteLine($"發(fā)送Topic消息:{message}");
              }
              }
              }
              }


              這里演示了 routingKey為aaa.orange.rabbit,和lazy.aa.rabbit的情況,第一個匹配到Q1和Q2,第二個匹配到Q2,所以應(yīng)該Q1是10條,Q2有20條,

              執(zhí)行后看rabbitMQ界面:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_RabbitMQ_24

              ?

              (6)RPC模式

              與上面其他5種所不同之處,該模式是擁有請求/回復(fù)的。也就是有響應(yīng)的,上面5種都沒有。

              RPC是指遠程過程調(diào)用,也就是說兩臺服務(wù)器A,B,一個應(yīng)用部署在A服務(wù)器上,想要調(diào)用B服務(wù)器上應(yīng)用提供的處理業(yè)務(wù),處理完后然后在A服務(wù)器繼續(xù)執(zhí)行下去,把異步的消息以同步的方式執(zhí)行。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_25

              ?

              ?客戶端(C)聲明一個排他隊列自己訂閱,然后發(fā)送消息到RPC隊列同時也把這個排他隊列名也在消息里傳進去,服務(wù)端監(jiān)聽RPC隊列,處理完業(yè)務(wù)后把處理結(jié)果發(fā)送到這個排他隊列,然后客戶端收到結(jié)果,繼續(xù)處理自己的邏輯。

              RPC的處理流程:


              • 當(dāng)客戶端啟動時,創(chuàng)建一個匿名的回調(diào)隊列。
              • 客戶端為RPC請求設(shè)置2個屬性:replyTo:設(shè)置回調(diào)隊列名字;correlationId:標記request。
              • 請求被發(fā)送到rpc_queue隊列中。
              • RPC服務(wù)器端監(jiān)聽rpc_queue隊列中的請求,當(dāng)請求到來時,服務(wù)器端會處理并且把帶有結(jié)果的消息發(fā)送給客戶端。接收的隊列就是replyTo設(shè)定的回調(diào)隊列。
              • 客戶端監(jiān)聽回調(diào)隊列,當(dāng)有消息時,檢查correlationId屬性,如果與request中匹配,那就是結(jié)果了。

              服務(wù)端代碼:



              public class RPCServer
              {
              public static void RpcHandle()
              {

              var connection = RabbitMQHelper.GetConnection();
              {
              var channel = connection.CreateModel();
              {
              string queueName = "rpc_queue";
              channel.QueueDeclare(queue: queueName, durable: false,
              exclusive: false, autoDelete: false, arguments: null);
              channel.BasicQos(0, 1, false);
              var consumer = new EventingBasicConsumer(channel);
              channel.BasicConsume(queue: queueName,
              autoAck: false, consumer: consumer);
              Console.WriteLine("【服務(wù)端】等待RPC請求...");

              consumer.Received += (model, ea) =>
              {
              string response = null;

              var body = ea.Body.ToArray();
              var props = ea.BasicProperties;
              var replyProps = channel.CreateBasicProperties();
              replyProps.CorrelationId = props.CorrelationId;

              try
              {
              var message = Encoding.UTF8.GetString(body);
              Console.WriteLine($"【服務(wù)端】接收到數(shù)據(jù):{ message},開始處理");
              response = $"消息:{message},處理完成";
              }
              catch (Exception e)
              {
              Console.WriteLine("錯誤:" + e.Message);
              response = "";
              }
              finally
              {
              var responseBytes = Encoding.UTF8.GetBytes(response);
              channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
              basicProperties: replyProps, body: responseBytes);
              channel.BasicAck(deliveryTag: ea.DeliveryTag,
              multiple: false);
              }
              };
              }
              }
              }

              }


              ?

              客戶端:



              public class RPCClient
              {
              private readonly IConnection connection;
              private readonly IModel channel;
              private readonly string replyQueueName;
              private readonly EventingBasicConsumer consumer;
              private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
              private readonly IBasicProperties props;

              public RPCClient()
              {
              connection = RabbitMQHelper.GetConnection();

              channel = connection.CreateModel();
              replyQueueName = channel.QueueDeclare().QueueName;
              consumer = new EventingBasicConsumer(channel);

              props = channel.CreateBasicProperties();
              var correlationId = Guid.NewGuid().ToString();
              props.CorrelationId = correlationId; //給消息id
              props.ReplyTo = replyQueueName;//回調(diào)的隊列名,Client關(guān)閉后會自動刪除

              consumer.Received += (model, ea) =>
              {
              var body = ea.Body.ToArray();
              var response = Encoding.UTF8.GetString(body);
              //監(jiān)聽的消息Id和定義的消息Id相同代表這條消息服務(wù)端處理完成
              if (ea.BasicProperties.CorrelationId == correlationId)
              {
              respQueue.Add(response);
              }
              };

              channel.BasicConsume(
              consumer: consumer,
              queue: replyQueueName,
              autoAck: true);
              }

              public string Call(string message)
              {
              var messageBytes = Encoding.UTF8.GetBytes(message);
              //發(fā)送消息
              channel.BasicPublish(
              exchange: "",
              routingKey: "rpc_queue",
              basicProperties: props,
              body: messageBytes);
              //等待回復(fù)
              return respQueue.Take();
              }

              public void Close()
              {
              connection.Close();
              }
              }


              ?

              執(zhí)行代碼:



              static void Main(string[] args)
              {
              Console.WriteLine("Hello World!");
              //啟動服務(wù)端,正常邏輯是在另一個程序
              RPCServer.RpcHandle();
              //實例化客戶端
              var rpcClient = new RPCClient();
              string message = $"消息id:{new Random().Next(1, 1000)}";
              Console.WriteLine($"【客服端】RPC請求中,{message}");
              //向服務(wù)端發(fā)送消息,等待回復(fù)
              var response = rpcClient.Call(message);
              Console.WriteLine("【客服端】收到回復(fù)響應(yīng):{0}", response);
              rpcClient.Close();
              Console.ReadKey();
              }


              ?

              測試效果:

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_客戶端_26

              ?

              ?z執(zhí)行完,客服端close后,可以接著自己的下一步業(yè)務(wù)處理。

              ?

              ?

              總結(jié):

              以上便是RabbitMQ的6中模式在.net core中實際使用,其中(1)簡單隊列,(2)工作隊列,(4)路由模式,(6)RPC模式的交換機類型都是direct,(3)發(fā)布訂閱的交換機是fanout,(5)topics的交換機是topic。正常場景用的是direct,默認交換機也是direct類型的,推薦用(4)路由模式,因為指定交換機名比起默認的交換機會容易擴展場景,其他的交換機看業(yè)務(wù)場景所需使用。

              下面位置可以看到交換機類型,amq.開頭那幾個是內(nèi)置的,避免交換機過多可以直接使用。

              RabbitMQ從零到集群高可用.NetCore(.NET5) - RabbitMQ簡介和六種工作模式詳解_工作隊列_27

              ?

              本文摘自 :https://blog.51cto.com/u

              開通會員,享受整站包年服務(wù)立即開通 >