Connecting to RabbitMQ
I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.
package models
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import anorm.SQL
import anorm.sqlToSimple
import anorm.toParameterValue
import play.api.Play.current
object RabbitMQConnection {
private var connection: Connection = null
def getConnection(ss:Connection): Connection = {
println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
connection=ss
connection match {
case null => {
val factory = new ConnectionFactory()
println("waiting for new connection")
factory.setHost("172.22.22.222")
println("host setted")
connection = factory.newConnection()
println("connection created")
connection
}
case _ =>{
println("connection is not null")
connection
}
}
}
}
object RMQ {
var connection = RabbitMQConnection.getConnection(null)
def setQ(qName: String, message: String) = {
println("ping received")
try {
println(connection)
if (connection != null) {
if (connection.isOpen()) {
println("connection is open")
} else {
connection = RabbitMQConnection.getConnection(null)
println("connection is new "+connection)
}
println("connetion is ready to use")
val channel = connection.createChannel()
channel.queueDeclare(qName, true, false, false, null) //suggestion
channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
println("status" + channel.close())
println("setQ complete executed for " + qName)
Map("result" -> "success")
} else {
println("connection can't established to rabbit mq for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" -> "error")
}
} catch {
case e: Exception =>
println(e.printStackTrace())
println("Rabbit Mq Server is Down for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" ->"error")
}
}
}
scala connection-pool rabbitmq
add a comment |
I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.
package models
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import anorm.SQL
import anorm.sqlToSimple
import anorm.toParameterValue
import play.api.Play.current
object RabbitMQConnection {
private var connection: Connection = null
def getConnection(ss:Connection): Connection = {
println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
connection=ss
connection match {
case null => {
val factory = new ConnectionFactory()
println("waiting for new connection")
factory.setHost("172.22.22.222")
println("host setted")
connection = factory.newConnection()
println("connection created")
connection
}
case _ =>{
println("connection is not null")
connection
}
}
}
}
object RMQ {
var connection = RabbitMQConnection.getConnection(null)
def setQ(qName: String, message: String) = {
println("ping received")
try {
println(connection)
if (connection != null) {
if (connection.isOpen()) {
println("connection is open")
} else {
connection = RabbitMQConnection.getConnection(null)
println("connection is new "+connection)
}
println("connetion is ready to use")
val channel = connection.createChannel()
channel.queueDeclare(qName, true, false, false, null) //suggestion
channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
println("status" + channel.close())
println("setQ complete executed for " + qName)
Map("result" -> "success")
} else {
println("connection can't established to rabbit mq for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" -> "error")
}
} catch {
case e: Exception =>
println(e.printStackTrace())
println("Rabbit Mq Server is Down for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" ->"error")
}
}
}
scala connection-pool rabbitmq
add a comment |
I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.
package models
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import anorm.SQL
import anorm.sqlToSimple
import anorm.toParameterValue
import play.api.Play.current
object RabbitMQConnection {
private var connection: Connection = null
def getConnection(ss:Connection): Connection = {
println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
connection=ss
connection match {
case null => {
val factory = new ConnectionFactory()
println("waiting for new connection")
factory.setHost("172.22.22.222")
println("host setted")
connection = factory.newConnection()
println("connection created")
connection
}
case _ =>{
println("connection is not null")
connection
}
}
}
}
object RMQ {
var connection = RabbitMQConnection.getConnection(null)
def setQ(qName: String, message: String) = {
println("ping received")
try {
println(connection)
if (connection != null) {
if (connection.isOpen()) {
println("connection is open")
} else {
connection = RabbitMQConnection.getConnection(null)
println("connection is new "+connection)
}
println("connetion is ready to use")
val channel = connection.createChannel()
channel.queueDeclare(qName, true, false, false, null) //suggestion
channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
println("status" + channel.close())
println("setQ complete executed for " + qName)
Map("result" -> "success")
} else {
println("connection can't established to rabbit mq for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" -> "error")
}
} catch {
case e: Exception =>
println(e.printStackTrace())
println("Rabbit Mq Server is Down for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" ->"error")
}
}
}
scala connection-pool rabbitmq
I would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever.
package models
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import anorm.SQL
import anorm.sqlToSimple
import anorm.toParameterValue
import play.api.Play.current
object RabbitMQConnection {
private var connection: Connection = null
def getConnection(ss:Connection): Connection = {
println(ss+" connection <<<<<<<<<<<<<<<<<<<<<<<<")
connection=ss
connection match {
case null => {
val factory = new ConnectionFactory()
println("waiting for new connection")
factory.setHost("172.22.22.222")
println("host setted")
connection = factory.newConnection()
println("connection created")
connection
}
case _ =>{
println("connection is not null")
connection
}
}
}
}
object RMQ {
var connection = RabbitMQConnection.getConnection(null)
def setQ(qName: String, message: String) = {
println("ping received")
try {
println(connection)
if (connection != null) {
if (connection.isOpen()) {
println("connection is open")
} else {
connection = RabbitMQConnection.getConnection(null)
println("connection is new "+connection)
}
println("connetion is ready to use")
val channel = connection.createChannel()
channel.queueDeclare(qName, true, false, false, null) //suggestion
channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())
println("status" + channel.close())
println("setQ complete executed for " + qName)
Map("result" -> "success")
} else {
println("connection can't established to rabbit mq for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" -> "error")
}
} catch {
case e: Exception =>
println(e.printStackTrace())
println("Rabbit Mq Server is Down for =>" + qName)
LogFile.QLogs(qName, message)
Map("result" ->"error")
}
}
}
scala connection-pool rabbitmq
scala connection-pool rabbitmq
edited Jun 27 '14 at 16:45
rolfl♦
90.7k13190394
90.7k13190394
asked Jun 26 '14 at 10:17
Govind Singh Nagarkoti
457322
457322
add a comment |
add a comment |
3 Answers
3
active
oldest
votes
Well, the first thing I would recommend is replacing all of those println
statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.
I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.
The function name setQ
doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage
would make more sense, I think.
All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection
- the two if/else blocks are making it hard to see the code that is actually doing the work.
I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.
add a comment |
Don't use
null
, but insteadOption[Connection]
.null
is frowned upon in Scala.It looks quite odd that
getConnection(ss:Connection)
takes in aConnection
. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.You can get rid of the whole object
RabbitMQConnection
and replace your lazy initialization of the connection with the Scalalazy
keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala'slazy
.)Since
setQ
only needs theconnection
to create aChannel
, I would just have a method to fetch theChannel
. That method would check if theConnection
is open and restart it if needed. (Separation of concerns.)I hope all the
println
are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.I would declare the return type of
setQ
.About that return type: instead of
Map("result" -> "success")
andMap("result" ->"error")
use Scala'sTry
.Try
is actually a monad so you'll be able to callmap
andflatMap
on the result ofsetQ
.
add a comment |
To connect with RabbitMQ using C# code kindly follow below steps.
1. You should have valid Host Name that you may get it from rabbitmq URL
2. Virtual Domain name
3. User name and Password if you are connecting with remote rebbitMQ server
Please see below code,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.IO;
using System.Reflection;
namespace RMQConnect
{
public class RabbitMQQueueManager
{
private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
private const ushort PREFETCH_SIZE = 50;
public IConnection GetRabbitMqConnection()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
VirtualHost = "TRV_ANDD",
UserName = "ol_sqlmaint_trv",
Password = "P@ssw0rd",
};
return factory.CreateConnection();
}
public List<Message> GetMessagesFromQueueNoAck(string queueName, int
messageCount = -1)
{
QueueingBasicConsumer consumer = null;
var responseMessages = new List<Message>();
BasicDeliverEventArgs result = null;
using (var rmqConnection = GetRabbitMqConnection())
{
using (var channel = rmqConnection.CreateModel())
{
try
{
var queueMessageCount = (int)channel.MessageCount(queueName);
var count = messageCount > -1 ? messageCount <=
queueMessageCount ? messageCount : queueMessageCount :
queueMessageCount;
var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
channel.BasicQos(0, (ushort)pfCount, false);
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
for (int i = 0; i < pfCount; i++)
{
if (!channel.IsOpen)
{
throw new ApplicationException("Channel is closed");
}
result = consumer.Queue.Dequeue();
try
{
string messageData =
System.Text.Encoding.UTF8.GetString(result.Body);
var rMessage = new Message(messageData);
rMessage.Header = new MessageHeader();
RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
channel.BasicNack(result.DeliveryTag, false, true);
responseMessages.Add(rMessage);
}
catch (Exception ex)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
////Nack the message back to queue in case of exception
if (result != null)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
throw;
}
}
}
return responseMessages;
}
}
public static class RmqHeaderHandler
{
private const byte NonPersistentDeliveryMode = 1;
private const byte PersistentDeliveryMode = 2;
private const string SecurityTokenKey = "SecurityToken";
private const string Properties = "properties";
private const string MessageNameKey = "MessageName";
private const string SystemPropertiesKey = "SystemProperties";
private const string ApplicationPropertiesKey = "ApplicationProperties";
#region Public Methods
public static void ReadDynamicMessageProperties(dynamic messageProperties,
Message message)
{
try
{
message.Header.AppId = messageProperties.appId;
message.Header.MessageId = messageProperties.messageId;
message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
message.Header.ExpirationInMilliseconds =
messageProperties.expirationInMilliseconds;
message.Header.IsPersistent = messageProperties.isPersistent;
message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
message.Header.Move = Convert.ToBoolean(messageProperties.move);
if (messageProperties.ContainsKey("messageName"))
{
message.Header.MessageName = messageProperties.messageName;
}
if (messageProperties.ContainsKey("properties"))
{
var customProperties =
Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
(Convert.ToString(messageProperties.properties));
foreach (var propPair in customProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
catch (Exception)
{
throw;
}
}
public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
Message message)
{
//message.Header.AppId = messageProperties.AppId;
message.Header.MessageId = messageProperties.MessageId;
message.Header.GeneratedAtUtc = new
DateTime(messageProperties.Timestamp.UnixTime);
message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
message.Header.IsPersistent = messageProperties.DeliveryMode ==
PersistentDeliveryMode;
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(SystemPropertiesKey))
{
var systemProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[SystemPropertiesKey]);
if (systemProperties.ContainsKey(MessageNameKey))
{
message.Header.MessageName = systemProperties[MessageNameKey];
}
}
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
{
var applicationProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[ApplicationPropertiesKey]);
foreach (var propPair in applicationProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
#endregion
#region private methods
private static Dictionary<string, string> DeserializeMessageProperties(byte
properties)
{
//var serializer = new .JsonMessageSerializer();
var serializedText = JsonConvert.SerializeObject(properties);
return JsonConvert.DeserializeObject<Dictionary<string, string>>
(serializedText);
}
#endregion
}
public class QueueInfoModel
{
public string Environment { get; set; }
public string ApplicationGroup { get; set; }
public string ApplicationName { get; set; }
public string ErrorQueueName { get; set; }
public string OriginalQueueName { get; set; }
public int MessageCount { get; set; }
}
[Serializable]
public class MessageHeader
{
public MessageHeader()
{
this.MessageId = Guid.NewGuid().ToString();
this.Properties = new Dictionary<string, string>();
this.IsPersistent = true;
}
public string AppId { get; set; }
public string MessageId { get; set; }
public string MessageName { get; set; }
public DateTime GeneratedAtUtc { get; set; }
public string ExpirationInMilliseconds { get; set; }
public bool IsPersistent { get; set; }
public bool Delete { get; set; }
public bool Move { get; set; }
public IDictionary<string, string> Properties { get; private set; }
}
[Serializable]
public class Message
{
private readonly string serializableBody;
public Message(string serializableBody)
{
this.serializableBody = serializableBody;
}
public MessageHeader Header
{
get;
set;
}
public string MessageBody
{
get
{
return this.serializableBody;
}
}
public byte GetBody()
{
return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
}
}
}
New contributor
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
return StackExchange.using("mathjaxEditing", function () {
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
});
});
}, "mathjax-editing");
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "196"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f55312%2fconnecting-to-rabbitmq%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
Well, the first thing I would recommend is replacing all of those println
statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.
I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.
The function name setQ
doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage
would make more sense, I think.
All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection
- the two if/else blocks are making it hard to see the code that is actually doing the work.
I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.
add a comment |
Well, the first thing I would recommend is replacing all of those println
statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.
I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.
The function name setQ
doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage
would make more sense, I think.
All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection
- the two if/else blocks are making it hard to see the code that is actually doing the work.
I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.
add a comment |
Well, the first thing I would recommend is replacing all of those println
statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.
I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.
The function name setQ
doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage
would make more sense, I think.
All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection
- the two if/else blocks are making it hard to see the code that is actually doing the work.
I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.
Well, the first thing I would recommend is replacing all of those println
statements with some kind of logging. In any kind of production environment, logging is something that needs more thought and care so that it is useful, without consuming resources unnecessarily. Or, if they're just for the original developer during development, maybe delete them.
I find the structure of this code to be rather odd. For all the trappings of OO programming, what you essentially have is two global public functions and one global public variable.
The function name setQ
doesn't seem to say what you're actually doing - which appears to be creating a channel and publishing a message to it. publishMessage
would make more sense, I think.
All of the code that checks the usability of the connection should be extracted into a single function, something along the lines of haveUsableConnection
- the two if/else blocks are making it hard to see the code that is actually doing the work.
I question the hard coded IP address. Magic numbers and magic strings are frequently a bad idea.
edited Jun 26 '14 at 17:28
answered Jun 26 '14 at 17:23
Donald.McLean
4,2622148
4,2622148
add a comment |
add a comment |
Don't use
null
, but insteadOption[Connection]
.null
is frowned upon in Scala.It looks quite odd that
getConnection(ss:Connection)
takes in aConnection
. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.You can get rid of the whole object
RabbitMQConnection
and replace your lazy initialization of the connection with the Scalalazy
keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala'slazy
.)Since
setQ
only needs theconnection
to create aChannel
, I would just have a method to fetch theChannel
. That method would check if theConnection
is open and restart it if needed. (Separation of concerns.)I hope all the
println
are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.I would declare the return type of
setQ
.About that return type: instead of
Map("result" -> "success")
andMap("result" ->"error")
use Scala'sTry
.Try
is actually a monad so you'll be able to callmap
andflatMap
on the result ofsetQ
.
add a comment |
Don't use
null
, but insteadOption[Connection]
.null
is frowned upon in Scala.It looks quite odd that
getConnection(ss:Connection)
takes in aConnection
. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.You can get rid of the whole object
RabbitMQConnection
and replace your lazy initialization of the connection with the Scalalazy
keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala'slazy
.)Since
setQ
only needs theconnection
to create aChannel
, I would just have a method to fetch theChannel
. That method would check if theConnection
is open and restart it if needed. (Separation of concerns.)I hope all the
println
are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.I would declare the return type of
setQ
.About that return type: instead of
Map("result" -> "success")
andMap("result" ->"error")
use Scala'sTry
.Try
is actually a monad so you'll be able to callmap
andflatMap
on the result ofsetQ
.
add a comment |
Don't use
null
, but insteadOption[Connection]
.null
is frowned upon in Scala.It looks quite odd that
getConnection(ss:Connection)
takes in aConnection
. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.You can get rid of the whole object
RabbitMQConnection
and replace your lazy initialization of the connection with the Scalalazy
keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala'slazy
.)Since
setQ
only needs theconnection
to create aChannel
, I would just have a method to fetch theChannel
. That method would check if theConnection
is open and restart it if needed. (Separation of concerns.)I hope all the
println
are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.I would declare the return type of
setQ
.About that return type: instead of
Map("result" -> "success")
andMap("result" ->"error")
use Scala'sTry
.Try
is actually a monad so you'll be able to callmap
andflatMap
on the result ofsetQ
.
Don't use
null
, but insteadOption[Connection]
.null
is frowned upon in Scala.It looks quite odd that
getConnection(ss:Connection)
takes in aConnection
. That method either sets the connection (opposite of get!), or initialize and returns the connection if the argument is null. Create two different methods. It does not make much sense as it is... it looks like you just wanted to use pattern matching somewhere.You can get rid of the whole object
RabbitMQConnection
and replace your lazy initialization of the connection with the Scalalazy
keyword. (EDIT: actually, it is not clear what exactly you should do with the connection, since you also have to always check if it is still open. I'll let you figure this out, but you should be aware of Scala'slazy
.)Since
setQ
only needs theconnection
to create aChannel
, I would just have a method to fetch theChannel
. That method would check if theConnection
is open and restart it if needed. (Separation of concerns.)I hope all the
println
are just for debugging and will be removed. If you keep those, use some logging service instead, even if it just prints to console in the end. More generally, you should strive for separation of concerns and find a way to separate the code that actually does something and the logging code.I would declare the return type of
setQ
.About that return type: instead of
Map("result" -> "success")
andMap("result" ->"error")
use Scala'sTry
.Try
is actually a monad so you'll be able to callmap
andflatMap
on the result ofsetQ
.
edited Jun 26 '14 at 17:55
answered Jun 26 '14 at 17:24
toto2
5,1771019
5,1771019
add a comment |
add a comment |
To connect with RabbitMQ using C# code kindly follow below steps.
1. You should have valid Host Name that you may get it from rabbitmq URL
2. Virtual Domain name
3. User name and Password if you are connecting with remote rebbitMQ server
Please see below code,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.IO;
using System.Reflection;
namespace RMQConnect
{
public class RabbitMQQueueManager
{
private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
private const ushort PREFETCH_SIZE = 50;
public IConnection GetRabbitMqConnection()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
VirtualHost = "TRV_ANDD",
UserName = "ol_sqlmaint_trv",
Password = "P@ssw0rd",
};
return factory.CreateConnection();
}
public List<Message> GetMessagesFromQueueNoAck(string queueName, int
messageCount = -1)
{
QueueingBasicConsumer consumer = null;
var responseMessages = new List<Message>();
BasicDeliverEventArgs result = null;
using (var rmqConnection = GetRabbitMqConnection())
{
using (var channel = rmqConnection.CreateModel())
{
try
{
var queueMessageCount = (int)channel.MessageCount(queueName);
var count = messageCount > -1 ? messageCount <=
queueMessageCount ? messageCount : queueMessageCount :
queueMessageCount;
var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
channel.BasicQos(0, (ushort)pfCount, false);
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
for (int i = 0; i < pfCount; i++)
{
if (!channel.IsOpen)
{
throw new ApplicationException("Channel is closed");
}
result = consumer.Queue.Dequeue();
try
{
string messageData =
System.Text.Encoding.UTF8.GetString(result.Body);
var rMessage = new Message(messageData);
rMessage.Header = new MessageHeader();
RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
channel.BasicNack(result.DeliveryTag, false, true);
responseMessages.Add(rMessage);
}
catch (Exception ex)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
////Nack the message back to queue in case of exception
if (result != null)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
throw;
}
}
}
return responseMessages;
}
}
public static class RmqHeaderHandler
{
private const byte NonPersistentDeliveryMode = 1;
private const byte PersistentDeliveryMode = 2;
private const string SecurityTokenKey = "SecurityToken";
private const string Properties = "properties";
private const string MessageNameKey = "MessageName";
private const string SystemPropertiesKey = "SystemProperties";
private const string ApplicationPropertiesKey = "ApplicationProperties";
#region Public Methods
public static void ReadDynamicMessageProperties(dynamic messageProperties,
Message message)
{
try
{
message.Header.AppId = messageProperties.appId;
message.Header.MessageId = messageProperties.messageId;
message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
message.Header.ExpirationInMilliseconds =
messageProperties.expirationInMilliseconds;
message.Header.IsPersistent = messageProperties.isPersistent;
message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
message.Header.Move = Convert.ToBoolean(messageProperties.move);
if (messageProperties.ContainsKey("messageName"))
{
message.Header.MessageName = messageProperties.messageName;
}
if (messageProperties.ContainsKey("properties"))
{
var customProperties =
Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
(Convert.ToString(messageProperties.properties));
foreach (var propPair in customProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
catch (Exception)
{
throw;
}
}
public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
Message message)
{
//message.Header.AppId = messageProperties.AppId;
message.Header.MessageId = messageProperties.MessageId;
message.Header.GeneratedAtUtc = new
DateTime(messageProperties.Timestamp.UnixTime);
message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
message.Header.IsPersistent = messageProperties.DeliveryMode ==
PersistentDeliveryMode;
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(SystemPropertiesKey))
{
var systemProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[SystemPropertiesKey]);
if (systemProperties.ContainsKey(MessageNameKey))
{
message.Header.MessageName = systemProperties[MessageNameKey];
}
}
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
{
var applicationProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[ApplicationPropertiesKey]);
foreach (var propPair in applicationProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
#endregion
#region private methods
private static Dictionary<string, string> DeserializeMessageProperties(byte
properties)
{
//var serializer = new .JsonMessageSerializer();
var serializedText = JsonConvert.SerializeObject(properties);
return JsonConvert.DeserializeObject<Dictionary<string, string>>
(serializedText);
}
#endregion
}
public class QueueInfoModel
{
public string Environment { get; set; }
public string ApplicationGroup { get; set; }
public string ApplicationName { get; set; }
public string ErrorQueueName { get; set; }
public string OriginalQueueName { get; set; }
public int MessageCount { get; set; }
}
[Serializable]
public class MessageHeader
{
public MessageHeader()
{
this.MessageId = Guid.NewGuid().ToString();
this.Properties = new Dictionary<string, string>();
this.IsPersistent = true;
}
public string AppId { get; set; }
public string MessageId { get; set; }
public string MessageName { get; set; }
public DateTime GeneratedAtUtc { get; set; }
public string ExpirationInMilliseconds { get; set; }
public bool IsPersistent { get; set; }
public bool Delete { get; set; }
public bool Move { get; set; }
public IDictionary<string, string> Properties { get; private set; }
}
[Serializable]
public class Message
{
private readonly string serializableBody;
public Message(string serializableBody)
{
this.serializableBody = serializableBody;
}
public MessageHeader Header
{
get;
set;
}
public string MessageBody
{
get
{
return this.serializableBody;
}
}
public byte GetBody()
{
return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
}
}
}
New contributor
add a comment |
To connect with RabbitMQ using C# code kindly follow below steps.
1. You should have valid Host Name that you may get it from rabbitmq URL
2. Virtual Domain name
3. User name and Password if you are connecting with remote rebbitMQ server
Please see below code,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.IO;
using System.Reflection;
namespace RMQConnect
{
public class RabbitMQQueueManager
{
private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
private const ushort PREFETCH_SIZE = 50;
public IConnection GetRabbitMqConnection()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
VirtualHost = "TRV_ANDD",
UserName = "ol_sqlmaint_trv",
Password = "P@ssw0rd",
};
return factory.CreateConnection();
}
public List<Message> GetMessagesFromQueueNoAck(string queueName, int
messageCount = -1)
{
QueueingBasicConsumer consumer = null;
var responseMessages = new List<Message>();
BasicDeliverEventArgs result = null;
using (var rmqConnection = GetRabbitMqConnection())
{
using (var channel = rmqConnection.CreateModel())
{
try
{
var queueMessageCount = (int)channel.MessageCount(queueName);
var count = messageCount > -1 ? messageCount <=
queueMessageCount ? messageCount : queueMessageCount :
queueMessageCount;
var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
channel.BasicQos(0, (ushort)pfCount, false);
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
for (int i = 0; i < pfCount; i++)
{
if (!channel.IsOpen)
{
throw new ApplicationException("Channel is closed");
}
result = consumer.Queue.Dequeue();
try
{
string messageData =
System.Text.Encoding.UTF8.GetString(result.Body);
var rMessage = new Message(messageData);
rMessage.Header = new MessageHeader();
RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
channel.BasicNack(result.DeliveryTag, false, true);
responseMessages.Add(rMessage);
}
catch (Exception ex)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
////Nack the message back to queue in case of exception
if (result != null)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
throw;
}
}
}
return responseMessages;
}
}
public static class RmqHeaderHandler
{
private const byte NonPersistentDeliveryMode = 1;
private const byte PersistentDeliveryMode = 2;
private const string SecurityTokenKey = "SecurityToken";
private const string Properties = "properties";
private const string MessageNameKey = "MessageName";
private const string SystemPropertiesKey = "SystemProperties";
private const string ApplicationPropertiesKey = "ApplicationProperties";
#region Public Methods
public static void ReadDynamicMessageProperties(dynamic messageProperties,
Message message)
{
try
{
message.Header.AppId = messageProperties.appId;
message.Header.MessageId = messageProperties.messageId;
message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
message.Header.ExpirationInMilliseconds =
messageProperties.expirationInMilliseconds;
message.Header.IsPersistent = messageProperties.isPersistent;
message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
message.Header.Move = Convert.ToBoolean(messageProperties.move);
if (messageProperties.ContainsKey("messageName"))
{
message.Header.MessageName = messageProperties.messageName;
}
if (messageProperties.ContainsKey("properties"))
{
var customProperties =
Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
(Convert.ToString(messageProperties.properties));
foreach (var propPair in customProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
catch (Exception)
{
throw;
}
}
public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
Message message)
{
//message.Header.AppId = messageProperties.AppId;
message.Header.MessageId = messageProperties.MessageId;
message.Header.GeneratedAtUtc = new
DateTime(messageProperties.Timestamp.UnixTime);
message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
message.Header.IsPersistent = messageProperties.DeliveryMode ==
PersistentDeliveryMode;
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(SystemPropertiesKey))
{
var systemProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[SystemPropertiesKey]);
if (systemProperties.ContainsKey(MessageNameKey))
{
message.Header.MessageName = systemProperties[MessageNameKey];
}
}
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
{
var applicationProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[ApplicationPropertiesKey]);
foreach (var propPair in applicationProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
#endregion
#region private methods
private static Dictionary<string, string> DeserializeMessageProperties(byte
properties)
{
//var serializer = new .JsonMessageSerializer();
var serializedText = JsonConvert.SerializeObject(properties);
return JsonConvert.DeserializeObject<Dictionary<string, string>>
(serializedText);
}
#endregion
}
public class QueueInfoModel
{
public string Environment { get; set; }
public string ApplicationGroup { get; set; }
public string ApplicationName { get; set; }
public string ErrorQueueName { get; set; }
public string OriginalQueueName { get; set; }
public int MessageCount { get; set; }
}
[Serializable]
public class MessageHeader
{
public MessageHeader()
{
this.MessageId = Guid.NewGuid().ToString();
this.Properties = new Dictionary<string, string>();
this.IsPersistent = true;
}
public string AppId { get; set; }
public string MessageId { get; set; }
public string MessageName { get; set; }
public DateTime GeneratedAtUtc { get; set; }
public string ExpirationInMilliseconds { get; set; }
public bool IsPersistent { get; set; }
public bool Delete { get; set; }
public bool Move { get; set; }
public IDictionary<string, string> Properties { get; private set; }
}
[Serializable]
public class Message
{
private readonly string serializableBody;
public Message(string serializableBody)
{
this.serializableBody = serializableBody;
}
public MessageHeader Header
{
get;
set;
}
public string MessageBody
{
get
{
return this.serializableBody;
}
}
public byte GetBody()
{
return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
}
}
}
New contributor
add a comment |
To connect with RabbitMQ using C# code kindly follow below steps.
1. You should have valid Host Name that you may get it from rabbitmq URL
2. Virtual Domain name
3. User name and Password if you are connecting with remote rebbitMQ server
Please see below code,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.IO;
using System.Reflection;
namespace RMQConnect
{
public class RabbitMQQueueManager
{
private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
private const ushort PREFETCH_SIZE = 50;
public IConnection GetRabbitMqConnection()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
VirtualHost = "TRV_ANDD",
UserName = "ol_sqlmaint_trv",
Password = "P@ssw0rd",
};
return factory.CreateConnection();
}
public List<Message> GetMessagesFromQueueNoAck(string queueName, int
messageCount = -1)
{
QueueingBasicConsumer consumer = null;
var responseMessages = new List<Message>();
BasicDeliverEventArgs result = null;
using (var rmqConnection = GetRabbitMqConnection())
{
using (var channel = rmqConnection.CreateModel())
{
try
{
var queueMessageCount = (int)channel.MessageCount(queueName);
var count = messageCount > -1 ? messageCount <=
queueMessageCount ? messageCount : queueMessageCount :
queueMessageCount;
var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
channel.BasicQos(0, (ushort)pfCount, false);
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
for (int i = 0; i < pfCount; i++)
{
if (!channel.IsOpen)
{
throw new ApplicationException("Channel is closed");
}
result = consumer.Queue.Dequeue();
try
{
string messageData =
System.Text.Encoding.UTF8.GetString(result.Body);
var rMessage = new Message(messageData);
rMessage.Header = new MessageHeader();
RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
channel.BasicNack(result.DeliveryTag, false, true);
responseMessages.Add(rMessage);
}
catch (Exception ex)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
////Nack the message back to queue in case of exception
if (result != null)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
throw;
}
}
}
return responseMessages;
}
}
public static class RmqHeaderHandler
{
private const byte NonPersistentDeliveryMode = 1;
private const byte PersistentDeliveryMode = 2;
private const string SecurityTokenKey = "SecurityToken";
private const string Properties = "properties";
private const string MessageNameKey = "MessageName";
private const string SystemPropertiesKey = "SystemProperties";
private const string ApplicationPropertiesKey = "ApplicationProperties";
#region Public Methods
public static void ReadDynamicMessageProperties(dynamic messageProperties,
Message message)
{
try
{
message.Header.AppId = messageProperties.appId;
message.Header.MessageId = messageProperties.messageId;
message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
message.Header.ExpirationInMilliseconds =
messageProperties.expirationInMilliseconds;
message.Header.IsPersistent = messageProperties.isPersistent;
message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
message.Header.Move = Convert.ToBoolean(messageProperties.move);
if (messageProperties.ContainsKey("messageName"))
{
message.Header.MessageName = messageProperties.messageName;
}
if (messageProperties.ContainsKey("properties"))
{
var customProperties =
Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
(Convert.ToString(messageProperties.properties));
foreach (var propPair in customProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
catch (Exception)
{
throw;
}
}
public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
Message message)
{
//message.Header.AppId = messageProperties.AppId;
message.Header.MessageId = messageProperties.MessageId;
message.Header.GeneratedAtUtc = new
DateTime(messageProperties.Timestamp.UnixTime);
message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
message.Header.IsPersistent = messageProperties.DeliveryMode ==
PersistentDeliveryMode;
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(SystemPropertiesKey))
{
var systemProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[SystemPropertiesKey]);
if (systemProperties.ContainsKey(MessageNameKey))
{
message.Header.MessageName = systemProperties[MessageNameKey];
}
}
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
{
var applicationProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[ApplicationPropertiesKey]);
foreach (var propPair in applicationProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
#endregion
#region private methods
private static Dictionary<string, string> DeserializeMessageProperties(byte
properties)
{
//var serializer = new .JsonMessageSerializer();
var serializedText = JsonConvert.SerializeObject(properties);
return JsonConvert.DeserializeObject<Dictionary<string, string>>
(serializedText);
}
#endregion
}
public class QueueInfoModel
{
public string Environment { get; set; }
public string ApplicationGroup { get; set; }
public string ApplicationName { get; set; }
public string ErrorQueueName { get; set; }
public string OriginalQueueName { get; set; }
public int MessageCount { get; set; }
}
[Serializable]
public class MessageHeader
{
public MessageHeader()
{
this.MessageId = Guid.NewGuid().ToString();
this.Properties = new Dictionary<string, string>();
this.IsPersistent = true;
}
public string AppId { get; set; }
public string MessageId { get; set; }
public string MessageName { get; set; }
public DateTime GeneratedAtUtc { get; set; }
public string ExpirationInMilliseconds { get; set; }
public bool IsPersistent { get; set; }
public bool Delete { get; set; }
public bool Move { get; set; }
public IDictionary<string, string> Properties { get; private set; }
}
[Serializable]
public class Message
{
private readonly string serializableBody;
public Message(string serializableBody)
{
this.serializableBody = serializableBody;
}
public MessageHeader Header
{
get;
set;
}
public string MessageBody
{
get
{
return this.serializableBody;
}
}
public byte GetBody()
{
return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
}
}
}
New contributor
To connect with RabbitMQ using C# code kindly follow below steps.
1. You should have valid Host Name that you may get it from rabbitmq URL
2. Virtual Domain name
3. User name and Password if you are connecting with remote rebbitMQ server
Please see below code,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.IO;
using System.Reflection;
namespace RMQConnect
{
public class RabbitMQQueueManager
{
private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
private const ushort PREFETCH_SIZE = 50;
public IConnection GetRabbitMqConnection()
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = "d1vmrmqtrv01.oldev.arol.shell.com",
VirtualHost = "TRV_ANDD",
UserName = "ol_sqlmaint_trv",
Password = "P@ssw0rd",
};
return factory.CreateConnection();
}
public List<Message> GetMessagesFromQueueNoAck(string queueName, int
messageCount = -1)
{
QueueingBasicConsumer consumer = null;
var responseMessages = new List<Message>();
BasicDeliverEventArgs result = null;
using (var rmqConnection = GetRabbitMqConnection())
{
using (var channel = rmqConnection.CreateModel())
{
try
{
var queueMessageCount = (int)channel.MessageCount(queueName);
var count = messageCount > -1 ? messageCount <=
queueMessageCount ? messageCount : queueMessageCount :
queueMessageCount;
var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
channel.BasicQos(0, (ushort)pfCount, false);
consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
for (int i = 0; i < pfCount; i++)
{
if (!channel.IsOpen)
{
throw new ApplicationException("Channel is closed");
}
result = consumer.Queue.Dequeue();
try
{
string messageData =
System.Text.Encoding.UTF8.GetString(result.Body);
var rMessage = new Message(messageData);
rMessage.Header = new MessageHeader();
RmqHeaderHandler.ReadRmqMessageProperties(result.BasicProperties, rMessage);
channel.BasicNack(result.DeliveryTag, false, true);
responseMessages.Add(rMessage);
}
catch (Exception ex)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
////Nack the message back to queue in case of exception
if (result != null)
{
channel.BasicNack(result.DeliveryTag, false, true);
}
throw;
}
}
}
return responseMessages;
}
}
public static class RmqHeaderHandler
{
private const byte NonPersistentDeliveryMode = 1;
private const byte PersistentDeliveryMode = 2;
private const string SecurityTokenKey = "SecurityToken";
private const string Properties = "properties";
private const string MessageNameKey = "MessageName";
private const string SystemPropertiesKey = "SystemProperties";
private const string ApplicationPropertiesKey = "ApplicationProperties";
#region Public Methods
public static void ReadDynamicMessageProperties(dynamic messageProperties,
Message message)
{
try
{
message.Header.AppId = messageProperties.appId;
message.Header.MessageId = messageProperties.messageId;
message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
message.Header.ExpirationInMilliseconds =
messageProperties.expirationInMilliseconds;
message.Header.IsPersistent = messageProperties.isPersistent;
message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
message.Header.Move = Convert.ToBoolean(messageProperties.move);
if (messageProperties.ContainsKey("messageName"))
{
message.Header.MessageName = messageProperties.messageName;
}
if (messageProperties.ContainsKey("properties"))
{
var customProperties =
Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>
(Convert.ToString(messageProperties.properties));
foreach (var propPair in customProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
catch (Exception)
{
throw;
}
}
public static void ReadRmqMessageProperties(IBasicProperties messageProperties,
Message message)
{
//message.Header.AppId = messageProperties.AppId;
message.Header.MessageId = messageProperties.MessageId;
message.Header.GeneratedAtUtc = new
DateTime(messageProperties.Timestamp.UnixTime);
message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
message.Header.IsPersistent = messageProperties.DeliveryMode ==
PersistentDeliveryMode;
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(SystemPropertiesKey))
{
var systemProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[SystemPropertiesKey]);
if (systemProperties.ContainsKey(MessageNameKey))
{
message.Header.MessageName = systemProperties[MessageNameKey];
}
}
if (messageProperties.Headers!=null &&
messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
{
var applicationProperties =
DeserializeMessageProperties((byte)
messageProperties.Headers[ApplicationPropertiesKey]);
foreach (var propPair in applicationProperties)
{
message.Header.Properties.Add(propPair.Key, propPair.Value);
}
}
}
#endregion
#region private methods
private static Dictionary<string, string> DeserializeMessageProperties(byte
properties)
{
//var serializer = new .JsonMessageSerializer();
var serializedText = JsonConvert.SerializeObject(properties);
return JsonConvert.DeserializeObject<Dictionary<string, string>>
(serializedText);
}
#endregion
}
public class QueueInfoModel
{
public string Environment { get; set; }
public string ApplicationGroup { get; set; }
public string ApplicationName { get; set; }
public string ErrorQueueName { get; set; }
public string OriginalQueueName { get; set; }
public int MessageCount { get; set; }
}
[Serializable]
public class MessageHeader
{
public MessageHeader()
{
this.MessageId = Guid.NewGuid().ToString();
this.Properties = new Dictionary<string, string>();
this.IsPersistent = true;
}
public string AppId { get; set; }
public string MessageId { get; set; }
public string MessageName { get; set; }
public DateTime GeneratedAtUtc { get; set; }
public string ExpirationInMilliseconds { get; set; }
public bool IsPersistent { get; set; }
public bool Delete { get; set; }
public bool Move { get; set; }
public IDictionary<string, string> Properties { get; private set; }
}
[Serializable]
public class Message
{
private readonly string serializableBody;
public Message(string serializableBody)
{
this.serializableBody = serializableBody;
}
public MessageHeader Header
{
get;
set;
}
public string MessageBody
{
get
{
return this.serializableBody;
}
}
public byte GetBody()
{
return (byte)((object)Encoding.ASCII.GetBytes(this.MessageBody));
}
}
}
New contributor
New contributor
answered 4 mins ago
Punit Pandya
1
1
New contributor
New contributor
add a comment |
add a comment |
Thanks for contributing an answer to Code Review Stack Exchange!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
Use MathJax to format equations. MathJax reference.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f55312%2fconnecting-to-rabbitmq%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown