)>}]
شركة التطبيقات المتكاملة لتصميم وبرمجة البرمجيات الخاصة ش.ش.و.
Integrated Applications Programming Company
Home » Code Library » RabbitMq (Ia.Cl.Models)

Public general-use code classes and XML files that we've compiled and used over the years:

RabbitMQ Messaging and Streaming Broker Support Class.

    1: using System;
    2: using System.Collections.Generic;
    3: using System.Text;
    4: using System.Threading.Tasks;
    5: using Newtonsoft.Json;
    6: using RabbitMQ.Client;
    7: using RabbitMQ.Client.Events;
    8:  
    9: namespace Ia.Cl.Models
   10: {
   11:     ////////////////////////////////////////////////////////////////////////////
   12:  
   13:     /// <summary publish="true">
   14:     /// RabbitMQ Messaging and Streaming Broker Support Class.
   15:     /// </summary>
   16:     /// <remarks> 
   17:     /// Copyright © 2015-2025 Jasem Y. Al-Shamlan (info@ia.com.kw), Integrated Applications - Kuwait. All Rights Reserved.
   18:     ///
   19:     /// This library is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by
   20:     /// the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
   21:     ///
   22:     /// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
   23:     /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
   24:     /// 
   25:     /// You should have received a copy of the GNU General Public License along with this library. If not, see http://www.gnu.org/licenses.
   26:     /// 
   27:     /// Copyright notice: This notice may not be removed or altered from any source distribution.
   28:     /// </remarks> 
   29:  
   30:     public class RabbitMq
   31:     {
   32:         private bool haveReadOnce = false;
   33:  
   34:         // An instant can not use the same channel/queueName for both publishing and consumption.
   35:         private string hostName;
   36:  
   37:         private List<string> sendQueueNameList = new List<string>();
   38:         private List<string> receiveQueueNameList = new List<string>();
   39:  
   40:         private Dictionary<string, Queue<string>> queueNameToQueueDictionary = new Dictionary<string, Queue<string>>();
   41:  
   42:         private ConnectionFactory connectionFactory;
   43:         private IConnection connection;
   44:         private IChannel channel;
   45:         private AsyncEventingBasicConsumer asyncEventingBasicConsumer;
   46:  
   47:         ////////////////////////////////////////////////////////////////////////////
   48:  
   49:         /// <summary>
   50:         ///
   51:         /// </summary>
   52:         public RabbitMq()
   53:         {
   54:             hostName = "localhost";
   55:  
   56:             Initialize();
   57:         }
   58:  
   59:         ////////////////////////////////////////////////////////////////////////////
   60:  
   61:         /// <summary>
   62:         ///
   63:         /// </summary>
   64:         public RabbitMq(string _hostName)
   65:         {
   66:             hostName = _hostName;
   67:  
   68:             Initialize();
   69:         }
   70:  
   71:         ////////////////////////////////////////////////////////////////////////////
   72:  
   73:         /// <summary>
   74:         ///
   75:         /// </summary>
   76:         public void Initialize()
   77:         {
   78:             connectionFactory = new ConnectionFactory { HostName = hostName }; // If we wanted to connect to a node on a different machine we'd simply specify its hostname or IP address here.
   79:                                                                                // "guest"/"guest" by default, limited to localhost connections
   80:                                                                                // factory.UserName = user;
   81:                                                                                // factory.Password = pass;
   82:                                                                                // factory.VirtualHost = vhost;
   83:                                                                                // factory.HostName = hostName;
   84:  
   85:             connection = connectionFactory.CreateConnectionAsync().Result;
   86:             channel = connection.CreateChannelAsync().Result;
   87:  
   88:             asyncEventingBasicConsumer = new AsyncEventingBasicConsumer(channel);
   89:  
   90:             asyncEventingBasicConsumer.ReceivedAsync += (model, ea) =>
   91:             {
   92:                 var routingKey = ea.RoutingKey;
   93:                 var body = ea.Body.ToArray();
   94:                 var json = Encoding.UTF8.GetString(body);
   95:  
   96:                 if (!queueNameToQueueDictionary.ContainsKey(routingKey)) queueNameToQueueDictionary[routingKey] = new Queue<string>();
   97:  
   98:                 queueNameToQueueDictionary[routingKey].Enqueue(json);
   99:  
  100:                 //channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  101:  
  102:                 return Task.CompletedTask;
  103:             };
  104:         }
  105:  
  106:         ////////////////////////////////////////////////////////////////////////////
  107:  
  108:         /// <summary>
  109:         ///
  110:         /// </summary>
  111:         ~RabbitMq()
  112:         {
  113:             channel.CloseAsync();
  114:             connection.CloseAsync();
  115:         }
  116:  
  117:         ////////////////////////////////////////////////////////////////////////////
  118:  
  119:         /// <summary>
  120:         ///
  121:         /// </summary>
  122:         public async void Send<T>(string queueName, T content)
  123:         {
  124:             if (receiveQueueNameList.Contains(queueName))
  125:             {
  126:                 throw new ArgumentOutOfRangeException("An instant can not use the same channel/queueName for both publishing and consumption.");
  127:             }
  128:             else
  129:             {
  130:                 if (!sendQueueNameList.Contains(queueName)) sendQueueNameList.Add(queueName);
  131:  
  132:                 await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent 
  133:  
  134:                 var json = JsonConvert.SerializeObject(content);
  135:                 var body = Encoding.UTF8.GetBytes(json);
  136:  
  137:                 await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
  138:             }
  139:         }
  140:  
  141:         ////////////////////////////////////////////////////////////////////////////
  142:  
  143:         /// <summary>
  144:         ///
  145:         /// </summary>
  146:         public async Task<T> ReceiveAsync<T>(string queueName)
  147:         {
  148:             if (sendQueueNameList.Contains(queueName))
  149:             {
  150:                 throw new ArgumentOutOfRangeException("An instant can not use the same channel/queueName for both publishing and consumption.");
  151:             }
  152:             else
  153:             {
  154:                 if (!receiveQueueNameList.Contains(queueName)) receiveQueueNameList.Add(queueName);
  155:  
  156:                 await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent
  157:  
  158:                 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  159:  
  160:                 //int messageCount = Convert.ToInt16(channel.MessageCount(queueName));
  161:  
  162:                 await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: asyncEventingBasicConsumer);
  163:             }
  164:  
  165:             return Dequeue<T>(queueName);
  166:         }
  167:  
  168:         ////////////////////////////////////////////////////////////////////////////
  169:  
  170:         /// <summary>
  171:         ///
  172:         /// </summary>
  173:         private T Dequeue<T>(string queueName)
  174:         {
  175:             T content;
  176:  
  177:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  178:             {
  179:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  180:                 {
  181:                     var json = queueNameToQueueDictionary[queueName].Dequeue();
  182:  
  183:                     content = JsonConvert.DeserializeObject<T>(json);
  184:                 }
  185:                 else content = default;
  186:             }
  187:             else content = default;
  188:  
  189:             return content;
  190:         }
  191:  
  192:         ////////////////////////////////////////////////////////////////////////////
  193:  
  194:         /// <summary>
  195:         ///
  196:         /// </summary>
  197:         public T Peek<T>(string queueName)
  198:         {
  199:             T content;
  200:  
  201:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  202:             {
  203:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  204:                 {
  205:                     var json = queueNameToQueueDictionary[queueName].Peek();
  206:  
  207:                     content = JsonConvert.DeserializeObject<T>(json);
  208:                 }
  209:                 else content = default;
  210:             }
  211:             else content = default;
  212:  
  213:             return content;
  214:         }
  215:  
  216:         ////////////////////////////////////////////////////////////////////////////
  217:  
  218:         /// <summary>
  219:         ///
  220:         /// </summary>
  221:         public List<T> PeekList<T>(string queueName)
  222:         {
  223:             var list = new List<T>();
  224:  
  225:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  226:             {
  227:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  228:                 {
  229:                     var list0 = new List<string>(queueNameToQueueDictionary[queueName]);
  230:  
  231:                     foreach (var l in list0)
  232:                     {
  233:                         list.Add(JsonConvert.DeserializeObject<T>(l));
  234:                     }
  235:                 }
  236:             }
  237:  
  238:             return list;
  239:         }
  240:  
  241:         ////////////////////////////////////////////////////////////////////////////
  242:  
  243:         /// <summary>
  244:         ///
  245:         /// </summary>
  246:         public async Task<int> CountAsync(string queueName)
  247:         {
  248:             int count;
  249:  
  250:             if (queueNameToQueueDictionary.TryGetValue(queueName, out Queue<string> value))
  251:             {
  252:                 count = value.Count;
  253:  
  254:                 if (count == 0)
  255:                 {
  256:                     try
  257:                     {
  258:                         await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent
  259:  
  260:                         var c = await channel.MessageCountAsync(queueName);
  261:  
  262:                         count = Convert.ToInt16(c);
  263:                     }
  264:                     catch (System.TimeoutException)
  265:                     {
  266:                         count = 0;
  267:                     }
  268:                     catch (System.NotSupportedException)
  269:                     {
  270:                         count = 0;
  271:                     }
  272:                 }
  273:             }
  274:             else
  275:             {
  276:                 try
  277:                 {
  278:                     await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent
  279:  
  280:                     var c = await channel.MessageCountAsync(queueName);
  281:  
  282:                     count = Convert.ToInt16(c);
  283:                 }
  284:                 catch (System.TimeoutException)
  285:                 {
  286:                     count = 0;
  287:                 }
  288:                 catch (System.NotSupportedException)
  289:                 {
  290:                     count = 0;
  291:                 }
  292:             }
  293:  
  294:             return count;
  295:         }
  296:  
  297:         /*
  298:         ////////////////////////////////////////////////////////////////////////////
  299: 
  300:         /// <summary>
  301:         ///
  302:         /// </summary>
  303:         public static System.Messaging.Message[] ReceivePrivateMessageList(string queueName)
  304:         {
  305:             return ReceiveOrPeekPrivateMessageList(queueName, true);
  306:         }
  307: 
  308:         ////////////////////////////////////////////////////////////////////////////
  309: 
  310:         /// <summary>
  311:         ///
  312:         /// </summary>
  313:         public static List<string> RecievePrivateList(string queueName)
  314:         {
  315:             var messageList = ReceiveOrPeekPrivateMessageList(queueName, true);
  316:             var list = new List<string>();
  317: 
  318:             if (messageList != null && messageList.Length > 0)
  319:             {
  320:                 foreach (var m in messageList)
  321:                 {
  322:                     list.Add(m.Body.ToString());
  323:                 }
  324:             }
  325:             else
  326:             {
  327: 
  328:             }
  329: 
  330:             return list;
  331:         }
  332: 
  333:         ////////////////////////////////////////////////////////////////////////////
  334: 
  335:         /// <summary>
  336:         ///
  337:         /// </summary>
  338:         private static System.Messaging.Message[] ReceiveOrPeekPrivateMessageList(string queueName, bool purgeList)
  339:         {
  340:             System.Messaging.Message[] list;
  341: 
  342:             var path = @".\private$\" + queueName;
  343: 
  344:             using (MessageQueue messageQueue = new MessageQueue())
  345:             {
  346:                 try
  347:                 {
  348:                     if (MessageQueue.Exists(path))
  349:                     {
  350:                         messageQueue.Path = path;
  351: 
  352:                         messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
  353: 
  354:                         list = messageQueue.GetAllMessages();
  355: 
  356:                         if (purgeList) messageQueue.Purge();
  357:                     }
  358:                     else
  359:                     {
  360:                         list = null;
  361:                     }
  362:                 }
  363:                 catch (MessageQueueException)
  364:                 {
  365:                     list = null;
  366:                 }
  367:                 finally
  368:                 {
  369:                     messageQueue.Dispose();
  370:                 }
  371:             }
  372: 
  373:             return list;
  374:         }
  375: 
  376:         ////////////////////////////////////////////////////////////////////////////
  377: 
  378:         /// <summary>
  379:         ///
  380:         /// </summary>
  381:         public static void Purge(string queueName)
  382:         {
  383:             var path = @".\private$\" + queueName;
  384: 
  385:             using (MessageQueue messageQueue = new MessageQueue())
  386:             {
  387:                 try
  388:                 {
  389:                     if (MessageQueue.Exists(path))
  390:                     {
  391:                         messageQueue.Path = path;
  392: 
  393:                         messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
  394: 
  395:                         messageQueue.Purge();
  396:                     }
  397:                     else
  398:                     {
  399:                     }
  400:                 }
  401:                 catch (MessageQueueException)
  402:                 {
  403:                 }
  404:                 finally
  405:                 {
  406:                     messageQueue.Dispose();
  407:                 }
  408:             }
  409:         }
  410:         */
  411:  
  412:         ////////////////////////////////////////////////////////////////////////////
  413:         ////////////////////////////////////////////////////////////////////////////
  414:     }
  415:  
  416:     ////////////////////////////////////////////////////////////////////////////
  417:     ////////////////////////////////////////////////////////////////////////////
  418: }
  419: