Connecting to RabbitMQ












6














I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.



package models

import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties

import anorm.SQL
import anorm.sqlToSimple
import anorm.toParameterValue
import play.api.Play.current

object RabbitMQConnection {
private var connection: Connection = null
def getConnection(ss:Connection): Connection = {
println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
connection=ss
connection match {
case null => {
val factory = new ConnectionFactory()
println("waiting for new connection")
factory.setHost("172.22.22.222")
println("host setted")
connection = factory.newConnection()
println("connection created")
connection
}
case _ =>{
println("connection is not null")
connection
}
}
}
}

object RMQ {
var connection = RabbitMQConnection.getConnection(null)

def setQ(qName: String, message: String) = {
println("ping received")
try {
println(connection)
if (connection != null) {
if (connection.isOpen()) {
println("connection is open")
} else {
connection = RabbitMQConnection.getConnection(null)
println("connection is new "+connection)
}
println("connetion is ready to use")
val channel = connection.createChannel()
channel.queueDeclare(qName, true, false, false, null) //suggestion
channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
println("status" + channel.close())
println("setQ complete executed for " + qName)
Map("result" -> "success")

} else {
println("connection can't established to rabbit mq for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" -> "error")
}
} catch {
case e: Exception =>
println(e.printStackTrace())
println("Rabbit Mq Server is Down for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" ->"error")
}
}

}









share|improve this question





























    6














    I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.



    package models

    import com.rabbitmq.client.Connection
    import com.rabbitmq.client.ConnectionFactory
    import com.rabbitmq.client.ConnectionFactory
    import com.rabbitmq.client.MessageProperties

    import anorm.SQL
    import anorm.sqlToSimple
    import anorm.toParameterValue
    import play.api.Play.current

    object RabbitMQConnection {
    private var connection: Connection = null
    def getConnection(ss:Connection): Connection = {
    println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
    connection=ss
    connection match {
    case null => {
    val factory = new ConnectionFactory()
    println("waiting for new connection")
    factory.setHost("172.22.22.222")
    println("host setted")
    connection = factory.newConnection()
    println("connection created")
    connection
    }
    case _ =>{
    println("connection is not null")
    connection
    }
    }
    }
    }

    object RMQ {
    var connection = RabbitMQConnection.getConnection(null)

    def setQ(qName: String, message: String) = {
    println("ping received")
    try {
    println(connection)
    if (connection != null) {
    if (connection.isOpen()) {
    println("connection is open")
    } else {
    connection = RabbitMQConnection.getConnection(null)
    println("connection is new "+connection)
    }
    println("connetion is ready to use")
    val channel = connection.createChannel()
    channel.queueDeclare(qName, true, false, false, null) //suggestion
    channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
    println("status" + channel.close())
    println("setQ complete executed for " + qName)
    Map("result" -> "success")

    } else {
    println("connection can't established to rabbit mq for =>" + qName)
    LogFile.QLogs(qName, message)
    Map("result" -> "error")
    }
    } catch {
    case e: Exception =>
    println(e.printStackTrace())
    println("Rabbit Mq Server is Down for =>" + qName)
    LogFile.QLogs(qName, message)
    Map("result" ->"error")
    }
    }

    }









    share|improve this question



























      6












      6








      6







      I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.



      package models

      import com.rabbitmq.client.Connection
      import com.rabbitmq.client.ConnectionFactory
      import com.rabbitmq.client.ConnectionFactory
      import com.rabbitmq.client.MessageProperties

      import anorm.SQL
      import anorm.sqlToSimple
      import anorm.toParameterValue
      import play.api.Play.current

      object RabbitMQConnection {
      private var connection: Connection = null
      def getConnection(ss:Connection): Connection = {
      println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
      connection=ss
      connection match {
      case null => {
      val factory = new ConnectionFactory()
      println("waiting for new connection")
      factory.setHost("172.22.22.222")
      println("host setted")
      connection = factory.newConnection()
      println("connection created")
      connection
      }
      case _ =>{
      println("connection is not null")
      connection
      }
      }
      }
      }

      object RMQ {
      var connection = RabbitMQConnection.getConnection(null)

      def setQ(qName: String, message: String) = {
      println("ping received")
      try {
      println(connection)
      if (connection != null) {
      if (connection.isOpen()) {
      println("connection is open")
      } else {
      connection = RabbitMQConnection.getConnection(null)
      println("connection is new "+connection)
      }
      println("connetion is ready to use")
      val channel = connection.createChannel()
      channel.queueDeclare(qName, true, false, false, null) //suggestion
      channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
      println("status" + channel.close())
      println("setQ complete executed for " + qName)
      Map("result" -> "success")

      } else {
      println("connection can't established to rabbit mq for =>" + qName)
      LogFile.QLogs(qName, message)
      Map("result" -> "error")
      }
      } catch {
      case e: Exception =>
      println(e.printStackTrace())
      println("Rabbit Mq Server is Down for =>" + qName)
      LogFile.QLogs(qName, message)
      Map("result" ->"error")
      }
      }

      }









      share|improve this question















      I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.



      package models

      import com.rabbitmq.client.Connection
      import com.rabbitmq.client.ConnectionFactory
      import com.rabbitmq.client.ConnectionFactory
      import com.rabbitmq.client.MessageProperties

      import anorm.SQL
      import anorm.sqlToSimple
      import anorm.toParameterValue
      import play.api.Play.current

      object RabbitMQConnection {
      private var connection: Connection = null
      def getConnection(ss:Connection): Connection = {
      println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
      connection=ss
      connection match {
      case null => {
      val factory = new ConnectionFactory()
      println("waiting for new connection")
      factory.setHost("172.22.22.222")
      println("host setted")
      connection = factory.newConnection()
      println("connection created")
      connection
      }
      case _ =>{
      println("connection is not null")
      connection
      }
      }
      }
      }

      object RMQ {
      var connection = RabbitMQConnection.getConnection(null)

      def setQ(qName: String, message: String) = {
      println("ping received")
      try {
      println(connection)
      if (connection != null) {
      if (connection.isOpen()) {
      println("connection is open")
      } else {
      connection = RabbitMQConnection.getConnection(null)
      println("connection is new "+connection)
      }
      println("connetion is ready to use")
      val channel = connection.createChannel()
      channel.queueDeclare(qName, true, false, false, null) //suggestion
      channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
      println("status" + channel.close())
      println("setQ complete executed for " + qName)
      Map("result" -> "success")

      } else {
      println("connection can't established to rabbit mq for =>" + qName)
      LogFile.QLogs(qName, message)
      Map("result" -> "error")
      }
      } catch {
      case e: Exception =>
      println(e.printStackTrace())
      println("Rabbit Mq Server is Down for =>" + qName)
      LogFile.QLogs(qName, message)
      Map("result" ->"error")
      }
      }

      }






      scala connection-pool rabbitmq






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Jun 27 '14 at 16:45









      rolfl

      90.7k13190394




      90.7k13190394










      asked Jun 26 '14 at 10:17









      Govind Singh Nagarkoti

      457322




      457322






















          3 Answers
          3






          active

          oldest

          votes


















          6














          Well, the first thing I would recommend is replacing all of those println statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.



          I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.



          The function name setQ doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage would make more sense, I think.



          All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection - the two if/else blocks are making it hard to see the code that is actually doing the work.



          I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.






          share|improve this answer































            4















            • Don't use null, but instead Option[Connection]. null is frowned upon in Scala.


            • It looks quite odd that getConnection(ss:Connection) takes in a Connection. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.


            • You can get rid of the whole object RabbitMQConnection and replace your lazy initialization of the connection with the Scala lazy keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala's lazy.)


            • Since setQ only needs the connection to create a Channel, I would just have a method to fetch the Channel. That method would check if the Connection is open and restart it if needed. (Separation of concerns.)


            • I hope all the println are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.


            • I would declare the return type of setQ.


            • About that return type: instead of Map("result" -> "success") and Map("result" ->"error") use Scala's Try. Try is actually a monad so you'll be able to call map and flatMap on the result of setQ.







            share|improve this answer































              0














                 To connect with RabbitMQ using C# code kindly follow below steps.

              1. You should have valid Host Name that you may get it from rabbitmq URL
              2. Virtual Domain name
              3. User name and Password if you are connecting with remote rebbitMQ server

              Please see below code,

              using RabbitMQ.Client;
              using RabbitMQ.Client.Events;
              using System;
              using System.Collections.Generic;
              using System.Linq;
              using System.Text;
              using System.Threading.Tasks;
              using Newtonsoft.Json;
              using System.IO;
              using System.Reflection;
              namespace RMQConnect
              {
              public class RabbitMQQueueManager
              {
              private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
              private const ushort PREFETCH_SIZE = 50;

              public IConnection GetRabbitMqConnection()
              {
              ConnectionFactory factory = new ConnectionFactory
              {
              HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
              VirtualHost = "TRV_ANDD",
              UserName = "ol_sqlmaint_trv",
              Password = "P@ssw0rd",

              };
              return factory.CreateConnection();
              }

              public List<Message> GetMessagesFromQueueNoAck(string queueName, int
              messageCount = -1)
              {
              QueueingBasicConsumer consumer = null;
              var responseMessages = new List<Message>();
              BasicDeliverEventArgs result = null;
              using (var rmqConnection = GetRabbitMqConnection())
              {
              using (var channel = rmqConnection.CreateModel())
              {
              try
              {
              var queueMessageCount = (int)channel.MessageCount(queueName);
              var count = messageCount > -1 ? messageCount <=
              queueMessageCount ? messageCount : queueMessageCount :
              queueMessageCount;
              var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
              channel.BasicQos(0, (ushort)pfCount, false);
              consumer = new QueueingBasicConsumer(channel);
              channel.BasicConsume(queueName, false, consumer);
              for (int i = 0; i < pfCount; i++)
              {
              if (!channel.IsOpen)
              {
              throw new ApplicationException("Channel is closed");
              }
              result = consumer.Queue.Dequeue();
              try
              {
              string messageData =
              System.Text.Encoding.UTF8.GetString(result.Body);
              var rMessage = new Message(messageData);
              rMessage.Header = new MessageHeader();



              RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
              channel.BasicNack(result.DeliveryTag, false, true);
              responseMessages.Add(rMessage);
              }
              catch (Exception ex)
              {
              channel.BasicNack(result.DeliveryTag, false, true);
              }
              }
              }
              catch (Exception)
              {
              ////Nack the message back to queue in case of exception
              if (result != null)
              {
              channel.BasicNack(result.DeliveryTag, false, true);
              }
              throw;
              }
              }
              }
              return responseMessages;
              }
              }
              public static class RmqHeaderHandler
              {
              private const byte NonPersistentDeliveryMode = 1;
              private const byte PersistentDeliveryMode = 2;
              private const string SecurityTokenKey = "SecurityToken";
              private const string Properties = "properties";
              private const string MessageNameKey = "MessageName";
              private const string SystemPropertiesKey = "SystemProperties";
              private const string ApplicationPropertiesKey = "ApplicationProperties";
              #region Public Methods
              public static void ReadDynamicMessageProperties(dynamic messageProperties,
              Message message)
              {
              try
              {
              message.Header.AppId = messageProperties.appId;
              message.Header.MessageId = messageProperties.messageId;
              message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
              message.Header.ExpirationInMilliseconds =
              messageProperties.expirationInMilliseconds;
              message.Header.IsPersistent = messageProperties.isPersistent;
              message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
              message.Header.Move = Convert.ToBoolean(messageProperties.move);
              if (messageProperties.ContainsKey("messageName"))
              {
              message.Header.MessageName = messageProperties.messageName;
              }
              if (messageProperties.ContainsKey("properties"))
              {
              var customProperties =
              Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
              (Convert.ToString(messageProperties.properties));
              foreach (var propPair in customProperties)
              {
              message.Header.Properties.Add(propPair.Key, propPair.Value);
              }
              }
              }
              catch (Exception)
              {
              throw;
              }
              }
              public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
              Message message)
              {
              //message.Header.AppId = messageProperties.AppId;
              message.Header.MessageId = messageProperties.MessageId;
              message.Header.GeneratedAtUtc = new
              DateTime(messageProperties.Timestamp.UnixTime);
              message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
              message.Header.IsPersistent = messageProperties.DeliveryMode ==
              PersistentDeliveryMode;
              if (messageProperties.Headers!=null &&
              messageProperties.Headers.ContainsKey(SystemPropertiesKey))
              {
              var systemProperties =
              DeserializeMessageProperties((byte)
              messageProperties.Headers[SystemPropertiesKey]);
              if (systemProperties.ContainsKey(MessageNameKey))
              {
              message.Header.MessageName = systemProperties[MessageNameKey];
              }
              }
              if (messageProperties.Headers!=null &&
              messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
              {
              var applicationProperties =
              DeserializeMessageProperties((byte)
              messageProperties.Headers[ApplicationPropertiesKey]);
              foreach (var propPair in applicationProperties)
              {
              message.Header.Properties.Add(propPair.Key, propPair.Value);
              }
              }
              }
              #endregion
              #region private methods
              private static Dictionary<string, string> DeserializeMessageProperties(byte
              properties)
              {
              //var serializer = new .JsonMessageSerializer();
              var serializedText = JsonConvert.SerializeObject(properties);
              return JsonConvert.DeserializeObject<Dictionary<string, string>>
              (serializedText);
              }
              #endregion
              }
              public class QueueInfoModel
              {
              public string Environment { get; set; }
              public string ApplicationGroup { get; set; }
              public string ApplicationName { get; set; }
              public string ErrorQueueName { get; set; }
              public string OriginalQueueName { get; set; }
              public int MessageCount { get; set; }
              }
              [Serializable]
              public class MessageHeader
              {
              public MessageHeader()
              {
              this.MessageId = Guid.NewGuid().ToString();
              this.Properties = new Dictionary<string, string>();
              this.IsPersistent = true;
              }
              public string AppId { get; set; }
              public string MessageId { get; set; }
              public string MessageName { get; set; }
              public DateTime GeneratedAtUtc { get; set; }
              public string ExpirationInMilliseconds { get; set; }
              public bool IsPersistent { get; set; }
              public bool Delete { get; set; }
              public bool Move { get; set; }
              public IDictionary<string, string> Properties { get; private set; }
              }
              [Serializable]
              public class Message
              {
              private readonly string serializableBody;
              public Message(string serializableBody)
              {
              this.serializableBody = serializableBody;
              }
              public MessageHeader Header
              {
              get;
              set;
              }
              public string MessageBody
              {
              get
              {
              return this.serializableBody;
              }
              }
              public byte GetBody()
              {
              return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
              }
              }

              }




              share








              New contributor




              Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
              Check out our Code of Conduct.


















                Your Answer





                StackExchange.ifUsing("editor", function () {
                return StackExchange.using("mathjaxEditing", function () {
                StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
                StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
                });
                });
                }, "mathjax-editing");

                StackExchange.ifUsing("editor", function () {
                StackExchange.using("externalEditor", function () {
                StackExchange.using("snippets", function () {
                StackExchange.snippets.init();
                });
                });
                }, "code-snippets");

                StackExchange.ready(function() {
                var channelOptions = {
                tags: "".split(" "),
                id: "196"
                };
                initTagRenderer("".split(" "), "".split(" "), channelOptions);

                StackExchange.using("externalEditor", function() {
                // Have to fire editor after snippets, if snippets enabled
                if (StackExchange.settings.snippets.snippetsEnabled) {
                StackExchange.using("snippets", function() {
                createEditor();
                });
                }
                else {
                createEditor();
                }
                });

                function createEditor() {
                StackExchange.prepareEditor({
                heartbeatType: 'answer',
                autoActivateHeartbeat: false,
                convertImagesToLinks: false,
                noModals: true,
                showLowRepImageUploadWarning: true,
                reputationToPostImages: null,
                bindNavPrevention: true,
                postfix: "",
                imageUploader: {
                brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
                contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
                allowUrls: true
                },
                onDemand: true,
                discardSelector: ".discard-answer"
                ,immediatelyShowMarkdownHelp:true
                });


                }
                });














                draft saved

                draft discarded


















                StackExchange.ready(
                function () {
                StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f55312%2fconnecting-to-rabbitmq%23new-answer', 'question_page');
                }
                );

                Post as a guest















                Required, but never shown

























                3 Answers
                3






                active

                oldest

                votes








                3 Answers
                3






                active

                oldest

                votes









                active

                oldest

                votes






                active

                oldest

                votes









                6














                Well, the first thing I would recommend is replacing all of those println statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.



                I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.



                The function name setQ doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage would make more sense, I think.



                All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection - the two if/else blocks are making it hard to see the code that is actually doing the work.



                I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.






                share|improve this answer




























                  6














                  Well, the first thing I would recommend is replacing all of those println statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.



                  I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.



                  The function name setQ doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage would make more sense, I think.



                  All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection - the two if/else blocks are making it hard to see the code that is actually doing the work.



                  I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.






                  share|improve this answer


























                    6












                    6








                    6






                    Well, the first thing I would recommend is replacing all of those println statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.



                    I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.



                    The function name setQ doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage would make more sense, I think.



                    All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection - the two if/else blocks are making it hard to see the code that is actually doing the work.



                    I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.






                    share|improve this answer














                    Well, the first thing I would recommend is replacing all of those println statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.



                    I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.



                    The function name setQ doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage would make more sense, I think.



                    All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection - the two if/else blocks are making it hard to see the code that is actually doing the work.



                    I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.







                    share|improve this answer














                    share|improve this answer



                    share|improve this answer








                    edited Jun 26 '14 at 17:28

























                    answered Jun 26 '14 at 17:23









                    Donald.McLean

                    4,2622148




                    4,2622148

























                        4















                        • Don't use null, but instead Option[Connection]. null is frowned upon in Scala.


                        • It looks quite odd that getConnection(ss:Connection) takes in a Connection. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.


                        • You can get rid of the whole object RabbitMQConnection and replace your lazy initialization of the connection with the Scala lazy keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala's lazy.)


                        • Since setQ only needs the connection to create a Channel, I would just have a method to fetch the Channel. That method would check if the Connection is open and restart it if needed. (Separation of concerns.)


                        • I hope all the println are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.


                        • I would declare the return type of setQ.


                        • About that return type: instead of Map("result" -> "success") and Map("result" ->"error") use Scala's Try. Try is actually a monad so you'll be able to call map and flatMap on the result of setQ.







                        share|improve this answer




























                          4















                          • Don't use null, but instead Option[Connection]. null is frowned upon in Scala.


                          • It looks quite odd that getConnection(ss:Connection) takes in a Connection. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.


                          • You can get rid of the whole object RabbitMQConnection and replace your lazy initialization of the connection with the Scala lazy keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala's lazy.)


                          • Since setQ only needs the connection to create a Channel, I would just have a method to fetch the Channel. That method would check if the Connection is open and restart it if needed. (Separation of concerns.)


                          • I hope all the println are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.


                          • I would declare the return type of setQ.


                          • About that return type: instead of Map("result" -> "success") and Map("result" ->"error") use Scala's Try. Try is actually a monad so you'll be able to call map and flatMap on the result of setQ.







                          share|improve this answer


























                            4












                            4








                            4







                            • Don't use null, but instead Option[Connection]. null is frowned upon in Scala.


                            • It looks quite odd that getConnection(ss:Connection) takes in a Connection. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.


                            • You can get rid of the whole object RabbitMQConnection and replace your lazy initialization of the connection with the Scala lazy keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala's lazy.)


                            • Since setQ only needs the connection to create a Channel, I would just have a method to fetch the Channel. That method would check if the Connection is open and restart it if needed. (Separation of concerns.)


                            • I hope all the println are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.


                            • I would declare the return type of setQ.


                            • About that return type: instead of Map("result" -> "success") and Map("result" ->"error") use Scala's Try. Try is actually a monad so you'll be able to call map and flatMap on the result of setQ.







                            share|improve this answer















                            • Don't use null, but instead Option[Connection]. null is frowned upon in Scala.


                            • It looks quite odd that getConnection(ss:Connection) takes in a Connection. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.


                            • You can get rid of the whole object RabbitMQConnection and replace your lazy initialization of the connection with the Scala lazy keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala's lazy.)


                            • Since setQ only needs the connection to create a Channel, I would just have a method to fetch the Channel. That method would check if the Connection is open and restart it if needed. (Separation of concerns.)


                            • I hope all the println are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.


                            • I would declare the return type of setQ.


                            • About that return type: instead of Map("result" -> "success") and Map("result" ->"error") use Scala's Try. Try is actually a monad so you'll be able to call map and flatMap on the result of setQ.








                            share|improve this answer














                            share|improve this answer



                            share|improve this answer








                            edited Jun 26 '14 at 17:55

























                            answered Jun 26 '14 at 17:24









                            toto2

                            5,1771019




                            5,1771019























                                0














                                   To connect with RabbitMQ using C# code kindly follow below steps.

                                1. You should have valid Host Name that you may get it from rabbitmq URL
                                2. Virtual Domain name
                                3. User name and Password if you are connecting with remote rebbitMQ server

                                Please see below code,

                                using RabbitMQ.Client;
                                using RabbitMQ.Client.Events;
                                using System;
                                using System.Collections.Generic;
                                using System.Linq;
                                using System.Text;
                                using System.Threading.Tasks;
                                using Newtonsoft.Json;
                                using System.IO;
                                using System.Reflection;
                                namespace RMQConnect
                                {
                                public class RabbitMQQueueManager
                                {
                                private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
                                private const ushort PREFETCH_SIZE = 50;

                                public IConnection GetRabbitMqConnection()
                                {
                                ConnectionFactory factory = new ConnectionFactory
                                {
                                HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
                                VirtualHost = "TRV_ANDD",
                                UserName = "ol_sqlmaint_trv",
                                Password = "P@ssw0rd",

                                };
                                return factory.CreateConnection();
                                }

                                public List<Message> GetMessagesFromQueueNoAck(string queueName, int
                                messageCount = -1)
                                {
                                QueueingBasicConsumer consumer = null;
                                var responseMessages = new List<Message>();
                                BasicDeliverEventArgs result = null;
                                using (var rmqConnection = GetRabbitMqConnection())
                                {
                                using (var channel = rmqConnection.CreateModel())
                                {
                                try
                                {
                                var queueMessageCount = (int)channel.MessageCount(queueName);
                                var count = messageCount > -1 ? messageCount <=
                                queueMessageCount ? messageCount : queueMessageCount :
                                queueMessageCount;
                                var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                                channel.BasicQos(0, (ushort)pfCount, false);
                                consumer = new QueueingBasicConsumer(channel);
                                channel.BasicConsume(queueName, false, consumer);
                                for (int i = 0; i < pfCount; i++)
                                {
                                if (!channel.IsOpen)
                                {
                                throw new ApplicationException("Channel is closed");
                                }
                                result = consumer.Queue.Dequeue();
                                try
                                {
                                string messageData =
                                System.Text.Encoding.UTF8.GetString(result.Body);
                                var rMessage = new Message(messageData);
                                rMessage.Header = new MessageHeader();



                                RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
                                channel.BasicNack(result.DeliveryTag, false, true);
                                responseMessages.Add(rMessage);
                                }
                                catch (Exception ex)
                                {
                                channel.BasicNack(result.DeliveryTag, false, true);
                                }
                                }
                                }
                                catch (Exception)
                                {
                                ////Nack the message back to queue in case of exception
                                if (result != null)
                                {
                                channel.BasicNack(result.DeliveryTag, false, true);
                                }
                                throw;
                                }
                                }
                                }
                                return responseMessages;
                                }
                                }
                                public static class RmqHeaderHandler
                                {
                                private const byte NonPersistentDeliveryMode = 1;
                                private const byte PersistentDeliveryMode = 2;
                                private const string SecurityTokenKey = "SecurityToken";
                                private const string Properties = "properties";
                                private const string MessageNameKey = "MessageName";
                                private const string SystemPropertiesKey = "SystemProperties";
                                private const string ApplicationPropertiesKey = "ApplicationProperties";
                                #region Public Methods
                                public static void ReadDynamicMessageProperties(dynamic messageProperties,
                                Message message)
                                {
                                try
                                {
                                message.Header.AppId = messageProperties.appId;
                                message.Header.MessageId = messageProperties.messageId;
                                message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                                message.Header.ExpirationInMilliseconds =
                                messageProperties.expirationInMilliseconds;
                                message.Header.IsPersistent = messageProperties.isPersistent;
                                message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                                message.Header.Move = Convert.ToBoolean(messageProperties.move);
                                if (messageProperties.ContainsKey("messageName"))
                                {
                                message.Header.MessageName = messageProperties.messageName;
                                }
                                if (messageProperties.ContainsKey("properties"))
                                {
                                var customProperties =
                                Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
                                (Convert.ToString(messageProperties.properties));
                                foreach (var propPair in customProperties)
                                {
                                message.Header.Properties.Add(propPair.Key, propPair.Value);
                                }
                                }
                                }
                                catch (Exception)
                                {
                                throw;
                                }
                                }
                                public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
                                Message message)
                                {
                                //message.Header.AppId = messageProperties.AppId;
                                message.Header.MessageId = messageProperties.MessageId;
                                message.Header.GeneratedAtUtc = new
                                DateTime(messageProperties.Timestamp.UnixTime);
                                message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
                                message.Header.IsPersistent = messageProperties.DeliveryMode ==
                                PersistentDeliveryMode;
                                if (messageProperties.Headers!=null &&
                                messageProperties.Headers.ContainsKey(SystemPropertiesKey))
                                {
                                var systemProperties =
                                DeserializeMessageProperties((byte)
                                messageProperties.Headers[SystemPropertiesKey]);
                                if (systemProperties.ContainsKey(MessageNameKey))
                                {
                                message.Header.MessageName = systemProperties[MessageNameKey];
                                }
                                }
                                if (messageProperties.Headers!=null &&
                                messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
                                {
                                var applicationProperties =
                                DeserializeMessageProperties((byte)
                                messageProperties.Headers[ApplicationPropertiesKey]);
                                foreach (var propPair in applicationProperties)
                                {
                                message.Header.Properties.Add(propPair.Key, propPair.Value);
                                }
                                }
                                }
                                #endregion
                                #region private methods
                                private static Dictionary<string, string> DeserializeMessageProperties(byte
                                properties)
                                {
                                //var serializer = new .JsonMessageSerializer();
                                var serializedText = JsonConvert.SerializeObject(properties);
                                return JsonConvert.DeserializeObject<Dictionary<string, string>>
                                (serializedText);
                                }
                                #endregion
                                }
                                public class QueueInfoModel
                                {
                                public string Environment { get; set; }
                                public string ApplicationGroup { get; set; }
                                public string ApplicationName { get; set; }
                                public string ErrorQueueName { get; set; }
                                public string OriginalQueueName { get; set; }
                                public int MessageCount { get; set; }
                                }
                                [Serializable]
                                public class MessageHeader
                                {
                                public MessageHeader()
                                {
                                this.MessageId = Guid.NewGuid().ToString();
                                this.Properties = new Dictionary<string, string>();
                                this.IsPersistent = true;
                                }
                                public string AppId { get; set; }
                                public string MessageId { get; set; }
                                public string MessageName { get; set; }
                                public DateTime GeneratedAtUtc { get; set; }
                                public string ExpirationInMilliseconds { get; set; }
                                public bool IsPersistent { get; set; }
                                public bool Delete { get; set; }
                                public bool Move { get; set; }
                                public IDictionary<string, string> Properties { get; private set; }
                                }
                                [Serializable]
                                public class Message
                                {
                                private readonly string serializableBody;
                                public Message(string serializableBody)
                                {
                                this.serializableBody = serializableBody;
                                }
                                public MessageHeader Header
                                {
                                get;
                                set;
                                }
                                public string MessageBody
                                {
                                get
                                {
                                return this.serializableBody;
                                }
                                }
                                public byte GetBody()
                                {
                                return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
                                }
                                }

                                }




                                share








                                New contributor




                                Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                Check out our Code of Conduct.























                                  0














                                     To connect with RabbitMQ using C# code kindly follow below steps.

                                  1. You should have valid Host Name that you may get it from rabbitmq URL
                                  2. Virtual Domain name
                                  3. User name and Password if you are connecting with remote rebbitMQ server

                                  Please see below code,

                                  using RabbitMQ.Client;
                                  using RabbitMQ.Client.Events;
                                  using System;
                                  using System.Collections.Generic;
                                  using System.Linq;
                                  using System.Text;
                                  using System.Threading.Tasks;
                                  using Newtonsoft.Json;
                                  using System.IO;
                                  using System.Reflection;
                                  namespace RMQConnect
                                  {
                                  public class RabbitMQQueueManager
                                  {
                                  private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
                                  private const ushort PREFETCH_SIZE = 50;

                                  public IConnection GetRabbitMqConnection()
                                  {
                                  ConnectionFactory factory = new ConnectionFactory
                                  {
                                  HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
                                  VirtualHost = "TRV_ANDD",
                                  UserName = "ol_sqlmaint_trv",
                                  Password = "P@ssw0rd",

                                  };
                                  return factory.CreateConnection();
                                  }

                                  public List<Message> GetMessagesFromQueueNoAck(string queueName, int
                                  messageCount = -1)
                                  {
                                  QueueingBasicConsumer consumer = null;
                                  var responseMessages = new List<Message>();
                                  BasicDeliverEventArgs result = null;
                                  using (var rmqConnection = GetRabbitMqConnection())
                                  {
                                  using (var channel = rmqConnection.CreateModel())
                                  {
                                  try
                                  {
                                  var queueMessageCount = (int)channel.MessageCount(queueName);
                                  var count = messageCount > -1 ? messageCount <=
                                  queueMessageCount ? messageCount : queueMessageCount :
                                  queueMessageCount;
                                  var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                                  channel.BasicQos(0, (ushort)pfCount, false);
                                  consumer = new QueueingBasicConsumer(channel);
                                  channel.BasicConsume(queueName, false, consumer);
                                  for (int i = 0; i < pfCount; i++)
                                  {
                                  if (!channel.IsOpen)
                                  {
                                  throw new ApplicationException("Channel is closed");
                                  }
                                  result = consumer.Queue.Dequeue();
                                  try
                                  {
                                  string messageData =
                                  System.Text.Encoding.UTF8.GetString(result.Body);
                                  var rMessage = new Message(messageData);
                                  rMessage.Header = new MessageHeader();



                                  RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
                                  channel.BasicNack(result.DeliveryTag, false, true);
                                  responseMessages.Add(rMessage);
                                  }
                                  catch (Exception ex)
                                  {
                                  channel.BasicNack(result.DeliveryTag, false, true);
                                  }
                                  }
                                  }
                                  catch (Exception)
                                  {
                                  ////Nack the message back to queue in case of exception
                                  if (result != null)
                                  {
                                  channel.BasicNack(result.DeliveryTag, false, true);
                                  }
                                  throw;
                                  }
                                  }
                                  }
                                  return responseMessages;
                                  }
                                  }
                                  public static class RmqHeaderHandler
                                  {
                                  private const byte NonPersistentDeliveryMode = 1;
                                  private const byte PersistentDeliveryMode = 2;
                                  private const string SecurityTokenKey = "SecurityToken";
                                  private const string Properties = "properties";
                                  private const string MessageNameKey = "MessageName";
                                  private const string SystemPropertiesKey = "SystemProperties";
                                  private const string ApplicationPropertiesKey = "ApplicationProperties";
                                  #region Public Methods
                                  public static void ReadDynamicMessageProperties(dynamic messageProperties,
                                  Message message)
                                  {
                                  try
                                  {
                                  message.Header.AppId = messageProperties.appId;
                                  message.Header.MessageId = messageProperties.messageId;
                                  message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                                  message.Header.ExpirationInMilliseconds =
                                  messageProperties.expirationInMilliseconds;
                                  message.Header.IsPersistent = messageProperties.isPersistent;
                                  message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                                  message.Header.Move = Convert.ToBoolean(messageProperties.move);
                                  if (messageProperties.ContainsKey("messageName"))
                                  {
                                  message.Header.MessageName = messageProperties.messageName;
                                  }
                                  if (messageProperties.ContainsKey("properties"))
                                  {
                                  var customProperties =
                                  Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
                                  (Convert.ToString(messageProperties.properties));
                                  foreach (var propPair in customProperties)
                                  {
                                  message.Header.Properties.Add(propPair.Key, propPair.Value);
                                  }
                                  }
                                  }
                                  catch (Exception)
                                  {
                                  throw;
                                  }
                                  }
                                  public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
                                  Message message)
                                  {
                                  //message.Header.AppId = messageProperties.AppId;
                                  message.Header.MessageId = messageProperties.MessageId;
                                  message.Header.GeneratedAtUtc = new
                                  DateTime(messageProperties.Timestamp.UnixTime);
                                  message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
                                  message.Header.IsPersistent = messageProperties.DeliveryMode ==
                                  PersistentDeliveryMode;
                                  if (messageProperties.Headers!=null &&
                                  messageProperties.Headers.ContainsKey(SystemPropertiesKey))
                                  {
                                  var systemProperties =
                                  DeserializeMessageProperties((byte)
                                  messageProperties.Headers[SystemPropertiesKey]);
                                  if (systemProperties.ContainsKey(MessageNameKey))
                                  {
                                  message.Header.MessageName = systemProperties[MessageNameKey];
                                  }
                                  }
                                  if (messageProperties.Headers!=null &&
                                  messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
                                  {
                                  var applicationProperties =
                                  DeserializeMessageProperties((byte)
                                  messageProperties.Headers[ApplicationPropertiesKey]);
                                  foreach (var propPair in applicationProperties)
                                  {
                                  message.Header.Properties.Add(propPair.Key, propPair.Value);
                                  }
                                  }
                                  }
                                  #endregion
                                  #region private methods
                                  private static Dictionary<string, string> DeserializeMessageProperties(byte
                                  properties)
                                  {
                                  //var serializer = new .JsonMessageSerializer();
                                  var serializedText = JsonConvert.SerializeObject(properties);
                                  return JsonConvert.DeserializeObject<Dictionary<string, string>>
                                  (serializedText);
                                  }
                                  #endregion
                                  }
                                  public class QueueInfoModel
                                  {
                                  public string Environment { get; set; }
                                  public string ApplicationGroup { get; set; }
                                  public string ApplicationName { get; set; }
                                  public string ErrorQueueName { get; set; }
                                  public string OriginalQueueName { get; set; }
                                  public int MessageCount { get; set; }
                                  }
                                  [Serializable]
                                  public class MessageHeader
                                  {
                                  public MessageHeader()
                                  {
                                  this.MessageId = Guid.NewGuid().ToString();
                                  this.Properties = new Dictionary<string, string>();
                                  this.IsPersistent = true;
                                  }
                                  public string AppId { get; set; }
                                  public string MessageId { get; set; }
                                  public string MessageName { get; set; }
                                  public DateTime GeneratedAtUtc { get; set; }
                                  public string ExpirationInMilliseconds { get; set; }
                                  public bool IsPersistent { get; set; }
                                  public bool Delete { get; set; }
                                  public bool Move { get; set; }
                                  public IDictionary<string, string> Properties { get; private set; }
                                  }
                                  [Serializable]
                                  public class Message
                                  {
                                  private readonly string serializableBody;
                                  public Message(string serializableBody)
                                  {
                                  this.serializableBody = serializableBody;
                                  }
                                  public MessageHeader Header
                                  {
                                  get;
                                  set;
                                  }
                                  public string MessageBody
                                  {
                                  get
                                  {
                                  return this.serializableBody;
                                  }
                                  }
                                  public byte GetBody()
                                  {
                                  return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
                                  }
                                  }

                                  }




                                  share








                                  New contributor




                                  Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                  Check out our Code of Conduct.





















                                    0












                                    0








                                    0






                                       To connect with RabbitMQ using C# code kindly follow below steps.

                                    1. You should have valid Host Name that you may get it from rabbitmq URL
                                    2. Virtual Domain name
                                    3. User name and Password if you are connecting with remote rebbitMQ server

                                    Please see below code,

                                    using RabbitMQ.Client;
                                    using RabbitMQ.Client.Events;
                                    using System;
                                    using System.Collections.Generic;
                                    using System.Linq;
                                    using System.Text;
                                    using System.Threading.Tasks;
                                    using Newtonsoft.Json;
                                    using System.IO;
                                    using System.Reflection;
                                    namespace RMQConnect
                                    {
                                    public class RabbitMQQueueManager
                                    {
                                    private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
                                    private const ushort PREFETCH_SIZE = 50;

                                    public IConnection GetRabbitMqConnection()
                                    {
                                    ConnectionFactory factory = new ConnectionFactory
                                    {
                                    HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
                                    VirtualHost = "TRV_ANDD",
                                    UserName = "ol_sqlmaint_trv",
                                    Password = "P@ssw0rd",

                                    };
                                    return factory.CreateConnection();
                                    }

                                    public List<Message> GetMessagesFromQueueNoAck(string queueName, int
                                    messageCount = -1)
                                    {
                                    QueueingBasicConsumer consumer = null;
                                    var responseMessages = new List<Message>();
                                    BasicDeliverEventArgs result = null;
                                    using (var rmqConnection = GetRabbitMqConnection())
                                    {
                                    using (var channel = rmqConnection.CreateModel())
                                    {
                                    try
                                    {
                                    var queueMessageCount = (int)channel.MessageCount(queueName);
                                    var count = messageCount > -1 ? messageCount <=
                                    queueMessageCount ? messageCount : queueMessageCount :
                                    queueMessageCount;
                                    var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                                    channel.BasicQos(0, (ushort)pfCount, false);
                                    consumer = new QueueingBasicConsumer(channel);
                                    channel.BasicConsume(queueName, false, consumer);
                                    for (int i = 0; i < pfCount; i++)
                                    {
                                    if (!channel.IsOpen)
                                    {
                                    throw new ApplicationException("Channel is closed");
                                    }
                                    result = consumer.Queue.Dequeue();
                                    try
                                    {
                                    string messageData =
                                    System.Text.Encoding.UTF8.GetString(result.Body);
                                    var rMessage = new Message(messageData);
                                    rMessage.Header = new MessageHeader();



                                    RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    responseMessages.Add(rMessage);
                                    }
                                    catch (Exception ex)
                                    {
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    }
                                    }
                                    }
                                    catch (Exception)
                                    {
                                    ////Nack the message back to queue in case of exception
                                    if (result != null)
                                    {
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    }
                                    throw;
                                    }
                                    }
                                    }
                                    return responseMessages;
                                    }
                                    }
                                    public static class RmqHeaderHandler
                                    {
                                    private const byte NonPersistentDeliveryMode = 1;
                                    private const byte PersistentDeliveryMode = 2;
                                    private const string SecurityTokenKey = "SecurityToken";
                                    private const string Properties = "properties";
                                    private const string MessageNameKey = "MessageName";
                                    private const string SystemPropertiesKey = "SystemProperties";
                                    private const string ApplicationPropertiesKey = "ApplicationProperties";
                                    #region Public Methods
                                    public static void ReadDynamicMessageProperties(dynamic messageProperties,
                                    Message message)
                                    {
                                    try
                                    {
                                    message.Header.AppId = messageProperties.appId;
                                    message.Header.MessageId = messageProperties.messageId;
                                    message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                                    message.Header.ExpirationInMilliseconds =
                                    messageProperties.expirationInMilliseconds;
                                    message.Header.IsPersistent = messageProperties.isPersistent;
                                    message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                                    message.Header.Move = Convert.ToBoolean(messageProperties.move);
                                    if (messageProperties.ContainsKey("messageName"))
                                    {
                                    message.Header.MessageName = messageProperties.messageName;
                                    }
                                    if (messageProperties.ContainsKey("properties"))
                                    {
                                    var customProperties =
                                    Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
                                    (Convert.ToString(messageProperties.properties));
                                    foreach (var propPair in customProperties)
                                    {
                                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                                    }
                                    }
                                    }
                                    catch (Exception)
                                    {
                                    throw;
                                    }
                                    }
                                    public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
                                    Message message)
                                    {
                                    //message.Header.AppId = messageProperties.AppId;
                                    message.Header.MessageId = messageProperties.MessageId;
                                    message.Header.GeneratedAtUtc = new
                                    DateTime(messageProperties.Timestamp.UnixTime);
                                    message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
                                    message.Header.IsPersistent = messageProperties.DeliveryMode ==
                                    PersistentDeliveryMode;
                                    if (messageProperties.Headers!=null &&
                                    messageProperties.Headers.ContainsKey(SystemPropertiesKey))
                                    {
                                    var systemProperties =
                                    DeserializeMessageProperties((byte)
                                    messageProperties.Headers[SystemPropertiesKey]);
                                    if (systemProperties.ContainsKey(MessageNameKey))
                                    {
                                    message.Header.MessageName = systemProperties[MessageNameKey];
                                    }
                                    }
                                    if (messageProperties.Headers!=null &&
                                    messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
                                    {
                                    var applicationProperties =
                                    DeserializeMessageProperties((byte)
                                    messageProperties.Headers[ApplicationPropertiesKey]);
                                    foreach (var propPair in applicationProperties)
                                    {
                                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                                    }
                                    }
                                    }
                                    #endregion
                                    #region private methods
                                    private static Dictionary<string, string> DeserializeMessageProperties(byte
                                    properties)
                                    {
                                    //var serializer = new .JsonMessageSerializer();
                                    var serializedText = JsonConvert.SerializeObject(properties);
                                    return JsonConvert.DeserializeObject<Dictionary<string, string>>
                                    (serializedText);
                                    }
                                    #endregion
                                    }
                                    public class QueueInfoModel
                                    {
                                    public string Environment { get; set; }
                                    public string ApplicationGroup { get; set; }
                                    public string ApplicationName { get; set; }
                                    public string ErrorQueueName { get; set; }
                                    public string OriginalQueueName { get; set; }
                                    public int MessageCount { get; set; }
                                    }
                                    [Serializable]
                                    public class MessageHeader
                                    {
                                    public MessageHeader()
                                    {
                                    this.MessageId = Guid.NewGuid().ToString();
                                    this.Properties = new Dictionary<string, string>();
                                    this.IsPersistent = true;
                                    }
                                    public string AppId { get; set; }
                                    public string MessageId { get; set; }
                                    public string MessageName { get; set; }
                                    public DateTime GeneratedAtUtc { get; set; }
                                    public string ExpirationInMilliseconds { get; set; }
                                    public bool IsPersistent { get; set; }
                                    public bool Delete { get; set; }
                                    public bool Move { get; set; }
                                    public IDictionary<string, string> Properties { get; private set; }
                                    }
                                    [Serializable]
                                    public class Message
                                    {
                                    private readonly string serializableBody;
                                    public Message(string serializableBody)
                                    {
                                    this.serializableBody = serializableBody;
                                    }
                                    public MessageHeader Header
                                    {
                                    get;
                                    set;
                                    }
                                    public string MessageBody
                                    {
                                    get
                                    {
                                    return this.serializableBody;
                                    }
                                    }
                                    public byte GetBody()
                                    {
                                    return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
                                    }
                                    }

                                    }




                                    share








                                    New contributor




                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.









                                       To connect with RabbitMQ using C# code kindly follow below steps.

                                    1. You should have valid Host Name that you may get it from rabbitmq URL
                                    2. Virtual Domain name
                                    3. User name and Password if you are connecting with remote rebbitMQ server

                                    Please see below code,

                                    using RabbitMQ.Client;
                                    using RabbitMQ.Client.Events;
                                    using System;
                                    using System.Collections.Generic;
                                    using System.Linq;
                                    using System.Text;
                                    using System.Threading.Tasks;
                                    using Newtonsoft.Json;
                                    using System.IO;
                                    using System.Reflection;
                                    namespace RMQConnect
                                    {
                                    public class RabbitMQQueueManager
                                    {
                                    private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
                                    private const ushort PREFETCH_SIZE = 50;

                                    public IConnection GetRabbitMqConnection()
                                    {
                                    ConnectionFactory factory = new ConnectionFactory
                                    {
                                    HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
                                    VirtualHost = "TRV_ANDD",
                                    UserName = "ol_sqlmaint_trv",
                                    Password = "P@ssw0rd",

                                    };
                                    return factory.CreateConnection();
                                    }

                                    public List<Message> GetMessagesFromQueueNoAck(string queueName, int
                                    messageCount = -1)
                                    {
                                    QueueingBasicConsumer consumer = null;
                                    var responseMessages = new List<Message>();
                                    BasicDeliverEventArgs result = null;
                                    using (var rmqConnection = GetRabbitMqConnection())
                                    {
                                    using (var channel = rmqConnection.CreateModel())
                                    {
                                    try
                                    {
                                    var queueMessageCount = (int)channel.MessageCount(queueName);
                                    var count = messageCount > -1 ? messageCount <=
                                    queueMessageCount ? messageCount : queueMessageCount :
                                    queueMessageCount;
                                    var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                                    channel.BasicQos(0, (ushort)pfCount, false);
                                    consumer = new QueueingBasicConsumer(channel);
                                    channel.BasicConsume(queueName, false, consumer);
                                    for (int i = 0; i < pfCount; i++)
                                    {
                                    if (!channel.IsOpen)
                                    {
                                    throw new ApplicationException("Channel is closed");
                                    }
                                    result = consumer.Queue.Dequeue();
                                    try
                                    {
                                    string messageData =
                                    System.Text.Encoding.UTF8.GetString(result.Body);
                                    var rMessage = new Message(messageData);
                                    rMessage.Header = new MessageHeader();



                                    RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    responseMessages.Add(rMessage);
                                    }
                                    catch (Exception ex)
                                    {
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    }
                                    }
                                    }
                                    catch (Exception)
                                    {
                                    ////Nack the message back to queue in case of exception
                                    if (result != null)
                                    {
                                    channel.BasicNack(result.DeliveryTag, false, true);
                                    }
                                    throw;
                                    }
                                    }
                                    }
                                    return responseMessages;
                                    }
                                    }
                                    public static class RmqHeaderHandler
                                    {
                                    private const byte NonPersistentDeliveryMode = 1;
                                    private const byte PersistentDeliveryMode = 2;
                                    private const string SecurityTokenKey = "SecurityToken";
                                    private const string Properties = "properties";
                                    private const string MessageNameKey = "MessageName";
                                    private const string SystemPropertiesKey = "SystemProperties";
                                    private const string ApplicationPropertiesKey = "ApplicationProperties";
                                    #region Public Methods
                                    public static void ReadDynamicMessageProperties(dynamic messageProperties,
                                    Message message)
                                    {
                                    try
                                    {
                                    message.Header.AppId = messageProperties.appId;
                                    message.Header.MessageId = messageProperties.messageId;
                                    message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                                    message.Header.ExpirationInMilliseconds =
                                    messageProperties.expirationInMilliseconds;
                                    message.Header.IsPersistent = messageProperties.isPersistent;
                                    message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                                    message.Header.Move = Convert.ToBoolean(messageProperties.move);
                                    if (messageProperties.ContainsKey("messageName"))
                                    {
                                    message.Header.MessageName = messageProperties.messageName;
                                    }
                                    if (messageProperties.ContainsKey("properties"))
                                    {
                                    var customProperties =
                                    Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
                                    (Convert.ToString(messageProperties.properties));
                                    foreach (var propPair in customProperties)
                                    {
                                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                                    }
                                    }
                                    }
                                    catch (Exception)
                                    {
                                    throw;
                                    }
                                    }
                                    public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
                                    Message message)
                                    {
                                    //message.Header.AppId = messageProperties.AppId;
                                    message.Header.MessageId = messageProperties.MessageId;
                                    message.Header.GeneratedAtUtc = new
                                    DateTime(messageProperties.Timestamp.UnixTime);
                                    message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
                                    message.Header.IsPersistent = messageProperties.DeliveryMode ==
                                    PersistentDeliveryMode;
                                    if (messageProperties.Headers!=null &&
                                    messageProperties.Headers.ContainsKey(SystemPropertiesKey))
                                    {
                                    var systemProperties =
                                    DeserializeMessageProperties((byte)
                                    messageProperties.Headers[SystemPropertiesKey]);
                                    if (systemProperties.ContainsKey(MessageNameKey))
                                    {
                                    message.Header.MessageName = systemProperties[MessageNameKey];
                                    }
                                    }
                                    if (messageProperties.Headers!=null &&
                                    messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
                                    {
                                    var applicationProperties =
                                    DeserializeMessageProperties((byte)
                                    messageProperties.Headers[ApplicationPropertiesKey]);
                                    foreach (var propPair in applicationProperties)
                                    {
                                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                                    }
                                    }
                                    }
                                    #endregion
                                    #region private methods
                                    private static Dictionary<string, string> DeserializeMessageProperties(byte
                                    properties)
                                    {
                                    //var serializer = new .JsonMessageSerializer();
                                    var serializedText = JsonConvert.SerializeObject(properties);
                                    return JsonConvert.DeserializeObject<Dictionary<string, string>>
                                    (serializedText);
                                    }
                                    #endregion
                                    }
                                    public class QueueInfoModel
                                    {
                                    public string Environment { get; set; }
                                    public string ApplicationGroup { get; set; }
                                    public string ApplicationName { get; set; }
                                    public string ErrorQueueName { get; set; }
                                    public string OriginalQueueName { get; set; }
                                    public int MessageCount { get; set; }
                                    }
                                    [Serializable]
                                    public class MessageHeader
                                    {
                                    public MessageHeader()
                                    {
                                    this.MessageId = Guid.NewGuid().ToString();
                                    this.Properties = new Dictionary<string, string>();
                                    this.IsPersistent = true;
                                    }
                                    public string AppId { get; set; }
                                    public string MessageId { get; set; }
                                    public string MessageName { get; set; }
                                    public DateTime GeneratedAtUtc { get; set; }
                                    public string ExpirationInMilliseconds { get; set; }
                                    public bool IsPersistent { get; set; }
                                    public bool Delete { get; set; }
                                    public bool Move { get; set; }
                                    public IDictionary<string, string> Properties { get; private set; }
                                    }
                                    [Serializable]
                                    public class Message
                                    {
                                    private readonly string serializableBody;
                                    public Message(string serializableBody)
                                    {
                                    this.serializableBody = serializableBody;
                                    }
                                    public MessageHeader Header
                                    {
                                    get;
                                    set;
                                    }
                                    public string MessageBody
                                    {
                                    get
                                    {
                                    return this.serializableBody;
                                    }
                                    }
                                    public byte GetBody()
                                    {
                                    return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
                                    }
                                    }

                                    }





                                    share








                                    New contributor




                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.








                                    share


                                    share






                                    New contributor




                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.









                                    answered 4 mins ago









                                    Punit Pandya

                                    1




                                    1




                                    New contributor




                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.





                                    New contributor





                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.






                                    Punit Pandya is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
                                    Check out our Code of Conduct.






























                                        draft saved

                                        draft discarded




















































                                        Thanks for contributing an answer to Code Review Stack Exchange!


                                        • Please be sure to answer the question. Provide details and share your research!

                                        But avoid



                                        • Asking for help, clarification, or responding to other answers.

                                        • Making statements based on opinion; back them up with references or personal experience.


                                        Use MathJax to format equations. MathJax reference.


                                        To learn more, see our tips on writing great answers.





                                        Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


                                        Please pay close attention to the following guidance:


                                        • Please be sure to answer the question. Provide details and share your research!

                                        But avoid



                                        • Asking for help, clarification, or responding to other answers.

                                        • Making statements based on opinion; back them up with references or personal experience.


                                        To learn more, see our tips on writing great answers.




                                        draft saved


                                        draft discarded














                                        StackExchange.ready(
                                        function () {
                                        StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f55312%2fconnecting-to-rabbitmq%23new-answer', 'question_page');
                                        }
                                        );

                                        Post as a guest















                                        Required, but never shown





















































                                        Required, but never shown














                                        Required, but never shown












                                        Required, but never shown







                                        Required, but never shown

































                                        Required, but never shown














                                        Required, but never shown












                                        Required, but never shown







                                        Required, but never shown







                                        Popular posts from this blog

                                        404 Error Contact Form 7 ajax form submitting

                                        How to know if a Active Directory user can login interactively

                                        TypeError: fit_transform() missing 1 required positional argument: 'X'