)>}]
شركة التطبيقات المتكاملة لتصميم وبرمجة البرمجيات الخاصة ش.ش.و.
Integrated Applications Programming Company
Home » Code Library » RabbitMq (Ia.Cl.Models)

Public, general-use code classes and XML files that we have compiled and used over the years:

RabbitMQ Messaging and Streaming Broker Support Class.

    1: using System;
    2: using System.Collections.Generic;
    3: using System.Printing;
    4: using System.Text;
    5: using System.Windows.Markup;
    6: using Microsoft.Identity.Client;
    7: using Newtonsoft.Json;
    8: using RabbitMQ.Client;
    9: using RabbitMQ.Client.Events;
   10:  
   11: namespace Ia.Cl.Models
   12: {
   13:     ////////////////////////////////////////////////////////////////////////////
   14:  
   15:     /// <summary publish="true">
   16:     /// RabbitMQ Messaging and Streaming Broker Support Class.
   17:     /// </summary>
   18:     /// <remarks> 
   19:     /// Copyright © 2015-2024 Jasem Y. Al-Shamlan (info@ia.com.kw), Integrated Applications - Kuwait. All Rights Reserved.
   20:     ///
   21:     /// This library is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by
   22:     /// the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
   23:     ///
   24:     /// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
   25:     /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
   26:     /// 
   27:     /// You should have received a copy of the GNU General Public License along with this library. If not, see http://www.gnu.org/licenses.
   28:     /// 
   29:     /// Copyright notice: This notice may not be removed or altered from any source distribution.
   30:     /// </remarks> 
   31:  
   32:     public class RabbitMq
   33:     {
   34:         private bool haveReadOnce = false;
   35:  
   36:         // An instant can not use the same channel/queueName for both publishing and consumption.
   37:         private string hostName;
   38:  
   39:         private List<string> publishingQueueNameList = new List<string>();
   40:         private List<string> consumptionQueueNameList = new List<string>();
   41:  
   42:         private Dictionary<string, Queue<string>> queueNameToQueueDictionary = new Dictionary<string, Queue<string>>();
   43:  
   44:         private EventingBasicConsumer consumer;
   45:         private IModel channel;
   46:         private IConnection connection;
   47:         private ConnectionFactory factory;
   48:  
   49:         ////////////////////////////////////////////////////////////////////////////
   50:  
   51:         /// <summary>
   52:         ///
   53:         /// </summary>
   54:         public RabbitMq()
   55:         {
   56:             hostName = "localhost";
   57:  
   58:             Initialize();
   59:         }
   60:  
   61:         ////////////////////////////////////////////////////////////////////////////
   62:  
   63:         /// <summary>
   64:         ///
   65:         /// </summary>
   66:         public RabbitMq(string _hostName)
   67:         {
   68:             hostName = _hostName;
   69:  
   70:             Initialize();
   71:         }
   72:  
   73:         ////////////////////////////////////////////////////////////////////////////
   74:  
   75:         /// <summary>
   76:         ///
   77:         /// </summary>
   78:         public void Initialize()
   79:         {
   80:             factory = new ConnectionFactory { HostName = hostName }; // If we wanted to connect to a node on a different machine we'd simply specify its hostname or IP address here.
   81:             // "guest"/"guest" by default, limited to localhost connections
   82:             // factory.UserName = user;
   83:             // factory.Password = pass;
   84:             // factory.VirtualHost = vhost;
   85:             // factory.HostName = hostName;
   86:  
   87:             connection = factory.CreateConnection();
   88:  
   89:             channel = connection.CreateModel();
   90:  
   91:             consumer = new EventingBasicConsumer(channel);
   92:  
   93:             consumer.Received += (model, ea) =>
   94:             {
   95:                 var routingKey = ea.RoutingKey;
   96:                 var body = ea.Body.ToArray();
   97:                 var json = Encoding.UTF8.GetString(body);
   98:  
   99:                 if (!queueNameToQueueDictionary.ContainsKey(routingKey)) queueNameToQueueDictionary[routingKey] = new Queue<string>();
  100:  
  101:                 queueNameToQueueDictionary[routingKey].Enqueue(json);
  102:  
  103:                 //channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  104:             };
  105:  
  106:         }
  107:  
  108:         ////////////////////////////////////////////////////////////////////////////
  109:  
  110:         /// <summary>
  111:         ///
  112:         /// </summary>
  113:         ~RabbitMq()
  114:         {
  115:             channel.Close();
  116:             connection.Close();
  117:         }
  118:  
  119:         ////////////////////////////////////////////////////////////////////////////
  120:  
  121:         /// <summary>
  122:         ///
  123:         /// </summary>
  124:         public void Publish<T>(string queueName, T content)
  125:         {
  126:             if (consumptionQueueNameList.Contains(queueName))
  127:             {
  128:                 throw new ArgumentOutOfRangeException("An instant can not use the same channel/queueName for both publishing and consumption.");
  129:             }
  130:             else
  131:             {
  132:                 if (!publishingQueueNameList.Contains(queueName)) publishingQueueNameList.Add(queueName);
  133:  
  134:                 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent 
  135:  
  136:                 var json = JsonConvert.SerializeObject(content);
  137:                 var body = Encoding.UTF8.GetBytes(json);
  138:  
  139:                 //var properties = channel.CreateBasicProperties();
  140:                 //properties.Persistent = true; 
  141:  
  142:                 channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: null, body: body);
  143:             }
  144:         }
  145:  
  146:         ////////////////////////////////////////////////////////////////////////////
  147:  
  148:         /// <summary>
  149:         ///
  150:         /// </summary>
  151:         public T Consume<T>(string queueName)
  152:         {
  153:             if (publishingQueueNameList.Contains(queueName))
  154:             {
  155:                 throw new ArgumentOutOfRangeException("An instant can not use the same channel/queueName for both publishing and consumption.");
  156:             }
  157:             else
  158:             {
  159:                 if (!consumptionQueueNameList.Contains(queueName)) consumptionQueueNameList.Add(queueName);
  160:  
  161:                 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent 
  162:  
  163:                 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  164:  
  165:                 //int messageCount = Convert.ToInt16(channel.MessageCount(queueName));
  166:  
  167:                 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  168:             }
  169:  
  170:             return Dequeue<T>(queueName);
  171:         }
  172:  
  173:         ////////////////////////////////////////////////////////////////////////////
  174:  
  175:         /// <summary>
  176:         ///
  177:         /// </summary>
  178:         private T Dequeue<T>(string queueName)
  179:         {
  180:             T content;
  181:  
  182:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  183:             {
  184:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  185:                 {
  186:                     var json = queueNameToQueueDictionary[queueName].Dequeue();
  187:  
  188:                     content = JsonConvert.DeserializeObject<T>(json);
  189:                 }
  190:                 else content = default;
  191:             }
  192:             else content = default;
  193:  
  194:             return content;
  195:         }
  196:  
  197:         ////////////////////////////////////////////////////////////////////////////
  198:  
  199:         /// <summary>
  200:         ///
  201:         /// </summary>
  202:         public T Peek<T>(string queueName)
  203:         {
  204:             T content;
  205:  
  206:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  207:             {
  208:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  209:                 {
  210:                     var json = queueNameToQueueDictionary[queueName].Peek();
  211:  
  212:                     content = JsonConvert.DeserializeObject<T>(json);
  213:                 }
  214:                 else content = default;
  215:             }
  216:             else content = default;
  217:  
  218:             return content;
  219:         }
  220:  
  221:         ////////////////////////////////////////////////////////////////////////////
  222:  
  223:         /// <summary>
  224:         ///
  225:         /// </summary>
  226:         public List<T> PeekList<T>(string queueName)
  227:         {
  228:             var list = new List<T>();
  229:  
  230:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  231:             {
  232:                 if (queueNameToQueueDictionary[queueName].Count > 0)
  233:                 {
  234:                     var list0 = new List<string>(queueNameToQueueDictionary[queueName]);
  235:  
  236:                     foreach (var l in list0)
  237:                     {
  238:                         list.Add(JsonConvert.DeserializeObject<T>(l));
  239:                     }
  240:                 }
  241:             }
  242:  
  243:             return list;
  244:         }
  245:  
  246:         ////////////////////////////////////////////////////////////////////////////
  247:  
  248:         /// <summary>
  249:         ///
  250:         /// </summary>
  251:         public int Count(string queueName)
  252:         {
  253:             int count;
  254:  
  255:             if (queueNameToQueueDictionary.ContainsKey(queueName))
  256:             {
  257:                 count = queueNameToQueueDictionary[queueName].Count;
  258:  
  259:                 if (count == 0)
  260:                 {
  261:                     channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent 
  262:  
  263:                     count = Convert.ToInt16(channel.MessageCount(queueName));
  264:                 }
  265:             }
  266:             else
  267:             {
  268:                 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // idempotent 
  269:  
  270:                 count = Convert.ToInt16(channel.MessageCount(queueName));
  271:             }
  272:  
  273:             return count;
  274:         }
  275:  
  276:         /*
  277:         ////////////////////////////////////////////////////////////////////////////
  278: 
  279:         /// <summary>
  280:         ///
  281:         /// </summary>
  282:         public static System.Messaging.Message[] ReceivePrivateMessageList(string queueName)
  283:         {
  284:             return ReceiveOrPeekPrivateMessageList(queueName, true);
  285:         }
  286: 
  287:         ////////////////////////////////////////////////////////////////////////////
  288: 
  289:         /// <summary>
  290:         ///
  291:         /// </summary>
  292:         public static List<string> RecievePrivateList(string queueName)
  293:         {
  294:             var messageList = ReceiveOrPeekPrivateMessageList(queueName, true);
  295:             var list = new List<string>();
  296: 
  297:             if (messageList != null && messageList.Length > 0)
  298:             {
  299:                 foreach (var m in messageList)
  300:                 {
  301:                     list.Add(m.Body.ToString());
  302:                 }
  303:             }
  304:             else
  305:             {
  306: 
  307:             }
  308: 
  309:             return list;
  310:         }
  311: 
  312:         ////////////////////////////////////////////////////////////////////////////
  313: 
  314:         /// <summary>
  315:         ///
  316:         /// </summary>
  317:         private static System.Messaging.Message[] ReceiveOrPeekPrivateMessageList(string queueName, bool purgeList)
  318:         {
  319:             System.Messaging.Message[] list;
  320: 
  321:             var path = @".\private$\" + queueName;
  322: 
  323:             using (MessageQueue messageQueue = new MessageQueue())
  324:             {
  325:                 try
  326:                 {
  327:                     if (MessageQueue.Exists(path))
  328:                     {
  329:                         messageQueue.Path = path;
  330: 
  331:                         messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
  332: 
  333:                         list = messageQueue.GetAllMessages();
  334: 
  335:                         if (purgeList) messageQueue.Purge();
  336:                     }
  337:                     else
  338:                     {
  339:                         list = null;
  340:                     }
  341:                 }
  342:                 catch (MessageQueueException)
  343:                 {
  344:                     list = null;
  345:                 }
  346:                 finally
  347:                 {
  348:                     messageQueue.Dispose();
  349:                 }
  350:             }
  351: 
  352:             return list;
  353:         }
  354: 
  355:         ////////////////////////////////////////////////////////////////////////////
  356: 
  357:         /// <summary>
  358:         ///
  359:         /// </summary>
  360:         public static void Purge(string queueName)
  361:         {
  362:             var path = @".\private$\" + queueName;
  363: 
  364:             using (MessageQueue messageQueue = new MessageQueue())
  365:             {
  366:                 try
  367:                 {
  368:                     if (MessageQueue.Exists(path))
  369:                     {
  370:                         messageQueue.Path = path;
  371: 
  372:                         messageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
  373: 
  374:                         messageQueue.Purge();
  375:                     }
  376:                     else
  377:                     {
  378:                     }
  379:                 }
  380:                 catch (MessageQueueException)
  381:                 {
  382:                 }
  383:                 finally
  384:                 {
  385:                     messageQueue.Dispose();
  386:                 }
  387:             }
  388:         }
  389:         */
  390:  
  391:         ////////////////////////////////////////////////////////////////////////////
  392:         ////////////////////////////////////////////////////////////////////////////
  393:     }
  394:  
  395:     ////////////////////////////////////////////////////////////////////////////
  396:     ////////////////////////////////////////////////////////////////////////////
  397: }
  398: