//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
//using static System.Windows.Forms.VisualStyles.VisualStyleElement.TrackBar;
namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;
/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}
/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}
/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}
#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}
/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS
private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}
/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}
/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}
/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}
/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}
/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}
/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}
/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{
m_OnError += value;
}
remove
{
m_OnError -= value;
}
}
/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}
/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}
#endif
/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}
private int HandleReceivedData(int bytes, byte[] buff, string sender)
{
BytesReceived += (uint)bytes;
if (this.IsSynchronous)
{
TraceEventArgs arg;
lock (syncBase.receivedSync)
{
int index = syncBase.receivedSize;
syncBase.AppendData(buff, 0, bytes);
if (bytes != 0 && Trace == TraceLevel.Verbose && m_OnTrace != null)
{
arg = new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null);
m_OnTrace(this, arg);
}
if (bytes != 0 && Eop != null) //Search Eop if given.
{
if (Eop is Array)
{
foreach (object eop in (Array)Eop)
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(eop), index, syncBase.receivedSize);
if (bytes != -1)
{
break;
}
}
}
else
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(Eop), index, syncBase.receivedSize);
}
}
if (bytes != -1)
{
syncBase.receivedEvent.Set();
}
}
}
else
{
if (m_OnReceived != null)
{
syncBase.receivedSize = 0;
byte[] data = new byte[bytes];
Array.Copy(buff, data, bytes);
if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, data, null));
}
m_OnReceived(this, new ReceiveEventArgs(data, sender));
}
else if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null));
}
}
return bytes;
}
/// <inheritdoc />
public void Open()
{
Close();
mqttClient = factory.CreateMqttClient();
if (string.IsNullOrEmpty(userClientId))
{
clientId = Guid.NewGuid().ToString();
}
else
{
clientId = userClientId;
}
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverAddress).WithClientId(clientId).WithCredentials("admin","admin")
.Build();
mqttClient.ApplicationMessageReceivedAsync += async t =>
{
string str = ASCIIEncoding.ASCII.GetString(t.ApplicationMessage.Payload);
GXMessage msg;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
msg = parser.Deserialize<GXMessage>(str);
#else
msg = System.Text.Json.JsonSerializer.Deserialize<GXMessage>(str);
#endif
//if (msg.id == messageId || (MessageType)msg.type == MessageType.Close || (MessageType)msg.type == MessageType.Exception)
{
switch ((MessageType)msg.type)
{
case MessageType.Open:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Open));
replyReceivedEvent.Set();
break;
case MessageType.Send:
break;
case MessageType.Receive:
byte[] bytes = Gurux.Common.GXCommon.HexToBytes(msg.frame);
replyReceivedEvent.Set();
if (bytes.Length != 0)
{
HandleReceivedData(bytes.Length, bytes, t.ClientId);
}
break;
case MessageType.Close:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
break;
case MessageType.Exception:
lastException = msg.exception;
replyReceivedEvent.Set();
break;
}
}
//else
{
m_OnTrace?.Invoke(this, new TraceEventArgs(TraceTypes.Info, Resources.UnknownReply + msg, msg.sender));
}
};
mqttClient.ConnectedAsync += async t =>
{
// Subscribe to a topic
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();
//m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
//GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
//await PublishMessageAsync(msg);
var mqttSubscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("PublisherTest.2");
})
.Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions);
//if (trace > TraceLevel.Warning)
//{
// Console.WriteLine("--- {0} Subscribed. ", it.Name);
//}
};
//mqttClient.ConnectAsync(options);
GXMessage msg1 = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
PublishMessageAsync(msg1);
mqttClient.DisconnectedAsync += async t =>
{
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
};
try
{
replyReceivedEvent.Reset();
if (AsyncWaitTime == 0)
{
mqttClient.ConnectAsync(options).Wait();
}
else
{
mqttClient.ConnectAsync(options).Wait((int)AsyncWaitTime * 1000);
}
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();
}
catch (AggregateException ex)
{
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait(10000);
}
mqttClient = null;
throw ex.InnerException;
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
if (!replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000))
{
throw new TimeoutException(Resources.InvalidTopic + "'" + topic + "'.");
}
}
if (lastException != null)
{
throw new Exception(lastException);
}
}
/// <inheritdoc />
public bool Receive<T>(ReceiveParameters<T> args)
{
if (!IsOpen)
{
throw new InvalidOperationException(Resources.MediaIsClosed);
}
return syncBase.Receive(args);
}
/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}
void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}
/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}
/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}
public uint ReceiveDelay
{
get;
set;
}
}
}
I am using this class. In my broker I have created one queue with two topics: PublisherTest.1 for publishing messages and PublisherTest.2 for subscribing. In GXDLMSDirector I enter the broker IP address and port, and PublisherTest.1 as the topic. When I click Connect, data like {"id":2,"sender":"15a20ae0-ff51-4a18-a7ea-721751dedf29","type":3} is published to the topic, but Director reports an invalid topic. Please tell me what I am doing wrong.
GXDLMSDirector expects a specific topic structure, and I believe that is causing the issue. The Bridge example does what you want to do. The first media is for the serial port and the next one is for the TCP/IP port. You can create as many medias as you want. Check connections.json.
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/1
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/2
I am using two topics, one for publishing (request topic) and one for subscribing (response topic). The example you gave uses only one topic for both publishing and subscribing.
You can modify the example so there is one topic for publishing and another for subscribing if you want to, but the current implementation doesn't support it.
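For context on the "invalid topic" message: the Open() method posted in this thread finishes by waiting on replyReceivedEvent, and when AsyncWaitTime elapses it throws a TimeoutException built from Resources.InvalidTopic and the topic name. So the error may simply mean that no Open reply arrived on the subscribed topic in time, not that the topic string itself was rejected. The sketch below reproduces only that wait pattern. OpenWaitSketch and WaitForOpenReply are hypothetical names, and the message text is a placeholder for the real resource string.

```csharp
using System;
using System.Threading;

// Minimal sketch of the wait at the end of the posted Open() method.
// OpenWaitSketch and WaitForOpenReply are illustrative names only.
public static class OpenWaitSketch
{
    // Blocks until replyReceived is signalled. With a non-zero timeout
    // (in seconds) it throws TimeoutException when nothing arrives in time.
    public static void WaitForOpenReply(AutoResetEvent replyReceived,
                                        uint asyncWaitTimeSeconds,
                                        string topic)
    {
        if (asyncWaitTimeSeconds == 0)
        {
            // No timeout configured: wait forever, as the posted code does.
            replyReceived.WaitOne();
        }
        else if (!replyReceived.WaitOne((int)asyncWaitTimeSeconds * 1000))
        {
            // Placeholder text; the posted code uses Resources.InvalidTopic here.
            throw new TimeoutException("Invalid topic '" + topic + "'.");
        }
    }
}
```

If this reading is right, whatever sits on the response topic has to publish a message that the receive handler treats as an Open reply before the timeout, otherwise the same exception appears regardless of which topic names are used.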
//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;
/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}
/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}
/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}
#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}
/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS
private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}
/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}
/// <summary>
/// Topic to subscribe to.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets the topic to subscribe to.")]
public string SubscribeTopic
{
get
{
return subscribetopic;
}
set
{
if (subscribetopic != value)
{
subscribetopic = value;
NotifyPropertyChanged("SubscribeTopic");
}
}
}
/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}
/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}
/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}
/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}
/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}
/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{
m_OnError += value;
}
remove
{
m_OnError -= value;
}
}
/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}
/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}
#endif
/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}
/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}
void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}
/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}
/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}
public uint ReceiveDelay
{
get;
set;
}
}
}
This is my code, modified to publish to one topic and subscribe to another. However, the AARQ is published to the publish topic only after an Open message has been published to the subscribed topic. I want to publish the AARQ first, without any acknowledgement. Please help me with this implementation.
Can you please provide an example of communicating with a DLMS meter over MQTT using two topics, one for publishing and another for subscribing? I have tried many approaches, but I am stuck: I need to publish the AARQ without waiting for an acknowledgement, and it is not working.
Hi,
What kind of MQTT messages is your meter using? Can you share the structure of the envelope or one example MQTT message?
BR,
Mikko
My meter supports the DLMS protocol, and I am trying to connect to it with Gurux Director (GXDLMSDirector). I select MQTT as the media and enter my RabbitMQ (RMQ) broker IP and topic. However, I want to publish my requests to one topic and subscribe to a different topic, on which the meter sends its responses.
Gurux Director sends frames like these:
{"id":1,"sender":"PublisherTest.Request"}
{"frame":"00 01 00 10 00 01 00 1F 60 1D A1 09 06 07 60 85 74 05 08 01 01 BE 10 04 0E 01 00 00 00 06 5F 1F 04 00 62 1E 5D FF FF","id":2,"sender":"PublisherTest.Request","type":1}
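To make the envelope above easier to inspect, here is a minimal sketch that parses one of these JSON messages and turns the space-separated hex string in "frame" into bytes. EnvelopeSketch is a hypothetical stand-in for the GXMessage class, modelling only the fields visible in the posted JSON (id, type, sender, frame). It assumes System.Text.Json is available and is not the Gurux implementation.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical stand-in for Gurux.MQTT.Message.GXMessage. Property names
// are lowercase so they match the JSON keys in the posted messages.
public class EnvelopeSketch
{
    public int id { get; set; }
    public int type { get; set; }
    public string sender { get; set; }
    public string frame { get; set; }

    // Parses one JSON envelope; keys missing from the JSON keep default values.
    public static EnvelopeSketch Parse(string json)
    {
        return JsonSerializer.Deserialize<EnvelopeSketch>(json);
    }

    // Converts the space-separated hex string in "frame" to raw bytes.
    // Returns an empty array when the message carries no frame.
    public byte[] FrameBytes()
    {
        if (string.IsNullOrWhiteSpace(frame))
        {
            return new byte[0];
        }
        return frame
            .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(h => Convert.ToByte(h, 16))
            .ToArray();
    }
}
```

Calling EnvelopeSketch.Parse on the second message and then FrameBytes() gives the raw frame bytes that the meter side would need to receive, which can help when comparing what Director publishes with what the meter expects.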
Hi,
Check the Gurux Bridge and Broker. That will do what you want to do.
https://github.com/Gurux/Gurux.MQTT
You can replace Broker with RabbitMQ.
BR,
Mikko
Hi
Thank you for your reply.
I am using the GXMqtt class. Publishing and subscribing on the same topic works for me, but when I publish to one topic and subscribe to another, it reports an invalid topic.
Hi,
Check the bridge. It will do what you want to do. OnReceived sends the data received from the meter to the broker.
https://github.com/Gurux/Gurux.MQTT/blob/62f10a9dc17f7977013fc619823cf5…
BR,
Mikko
I have tried it, but it is not working. It does not accept two topics.
Hi,
What is your meter responding with? What kind of topic are you trying to send to?
BR,
Mikko
I have created two topics in my RabbitMQ broker under one queue. I am trying to publish data to the request topic and subscribe to the response topic, but it does not work: when I give two topics, one for publishing and one for subscribing, I get an "invalid request topic" error.
Shall I share my GXMqtt file here, where I set the publish topic and the subscribe topic?
Hi,
Your MQTT messages must be in the same format that the Bridge uses.
BR,
Mikko
Yes, we are using the same format, but when connecting we only get the invalid topic exception.
Hi
This is my code in the GXMqtt class file:
mqttClient.ConnectedAsync += async t =>
{
    // Subscribe to a topic
    mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW*").WithAtMostOnceQoS().Build()).Wait();
    m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
    GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
    await PublishMessageAsync(msg,"gw-request.Eficaa_GW*");
};
This connect method is not working. I am using two topics, but it throws an exception saying that "gw-request.Eficaa_GW*" is an invalid topic. Please give me a solution for this.
Is the code I am using correct or not?
Hi,
Can you try to subscribe in the same way as the example bridge or use it? This is now tested and it seems to work.
https://github.com/Gurux/Gurux.MQTT/blob/62f10a9dc17f7977013fc619823cf5…
Start the Broker first and then the Bridge.
When you start the bridge you should see available medias like this:
Connecting to the Broker in address: localhost:1883
Bridge topic: 60b740fb-fd16-408b-b05f-686278b2782d
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/1
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/2
Press Esc to close application or delete clear the console.
Check that your serial port and TCP/IP settings are correct in the connections.json file.
In GXDLMSDirector check that broker address and port are correct. Then add the topic.
The example uses 60b740fb-fd16-408b-b05f-686278b2782d/1 for the TCP/IP and
60b740fb-fd16-408b-b05f-686278b2782d/2 for the serial port.
BR,
Mikko
Hi
I am using only a TCP/IP connection. I am using my RabbitMQ broker with a queue that has two topics. When I run the bridge, only a dynamic queue is created in the broker, with two topics in it: one for the TCP/IP connection and another for the serial port connection.
In my case, however, I want to publish to one topic; the meter will publish to another topic, which I have to subscribe to, and then I have to publish again. This is the scenario I want to get working.
//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
//using static System.Windows.Forms.VisualStyles.VisualStyleElement.TrackBar;
namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;
//Events
PropertyChangedEventHandler m_OnPropertyChanged;
MediaStateChangeEventHandler m_OnMediaStateChange;
TraceEventHandler m_OnTrace;
ClientConnectedEventHandler m_OnClientConnected;
ClientDisconnectedEventHandler m_OnClientDisconnected;
internal ErrorEventHandler m_OnError;
internal ReceivedEventHandler m_OnReceived;
internal AutoResetEvent replyReceivedEvent = new AutoResetEvent(false);
/// <summary>
/// Last exception.
/// </summary>
private string lastException;
/// <summary>
/// Unique Message ID.
/// </summary>
private UInt16 messageId;
private UInt16 MessageId
{
get
{
return ++messageId;
}
}
/// <summary>
/// Used topic.
/// </summary>
string topic;
/// <summary>
/// Used client ID.
/// </summary>
string clientId;
/// <summary>
/// Client ID that the user wants to use.
/// </summary>
string userClientId;
/// <summary>
/// Server address.
/// </summary>
string serverAddress;
/// <summary>
/// Host port.
/// </summary>
int port = 1883;
static readonly MqttFactory factory = new MqttFactory();
IMqttClient mqttClient;
string IGXMedia.Name => "MQTT";
/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}
/// <inheritdoc cref="IGXMedia.IsOpen"/>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Close">Close</seealso>
[Browsable(false)]
public bool IsOpen => mqttClient != null && mqttClient.IsConnected;
string IGXMedia.MediaType => "MQTT";
bool IGXMedia.Enabled => true;
/// <summary>
/// Constructor.
/// </summary>
public GXMqtt()
{
syncBase = new GXSynchronousMediaBase(1024);
ConfigurableSettings = AvailableMediaSettings.All;
}
string IGXMedia.Settings
{
get
{
string tmp = "";
if (!string.IsNullOrEmpty(serverAddress))
{
tmp += "<IP>" + serverAddress + "</IP>" + Environment.NewLine;
}
if (port != 0)
{
tmp += "<Port>" + port + "</Port>" + Environment.NewLine;
}
tmp += "<Topic>" + topic + "</Topic>" + Environment.NewLine;
//if (!string.IsNullOrEmpty(ClientId))
//{
// tmp += "<ClientId>" + ClientId + "</ClientId>" + Environment.NewLine;
//}
return tmp;
}
set
{
if (!string.IsNullOrEmpty(value))
{
XmlReaderSettings settings = new XmlReaderSettings();
settings.ConformanceLevel = ConformanceLevel.Fragment;
using (XmlReader xmlReader = XmlReader.Create(new System.IO.StringReader(value), settings))
{
while (!xmlReader.EOF)
{
if (xmlReader.IsStartElement())
{
switch (xmlReader.Name)
{
case "Topic":
topic = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "Port":
port = (int)(xmlReader.ReadElementContentAs(typeof(int), null));
break;
case "IP":
serverAddress = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "ClientId":
ClientId = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
}
}
else
{
xmlReader.Read();
}
}
}
}
}
}
/// <inheritdoc cref="IGXMedia.Synchronous"/>
public object Synchronous { get; } = new object();
/// <inheritdoc cref="IGXMedia.IsSynchronous"/>
public bool IsSynchronous
{
get
{
bool reserved = System.Threading.Monitor.TryEnter(Synchronous, 0);
if (reserved)
{
System.Threading.Monitor.Exit(Synchronous);
}
return !reserved;
}
}
/// <inheritdoc cref="IGXMedia.ResetSynchronousBuffer"/>
public void ResetSynchronousBuffer()
{
lock (syncBase.receivedSync)
{
syncBase.receivedSize = 0;
}
}
/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (!string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (!string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}
/// <summary>
/// Sent byte count.
/// </summary>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesSent
{
get;
private set;
}
/// <summary>
/// Received byte count.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesReceived
{
get;
private set;
}
/// <inheritdoc cref="IGXMedia.Eop"/>
public object Eop
{
get;
set;
}
/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
public AvailableMediaSettings ConfigurableSettings
{
get
{
return (AvailableMediaSettings)((IGXMedia)this).ConfigurableSettings;
}
set
{
((IGXMedia)this).ConfigurableSettings = (int)value;
}
}
/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
int IGXMedia.ConfigurableSettings
{
get;
set;
}
/// <inheritdoc />
object IGXMedia.Tag { get; set; }
IGXMediaContainer IGXMedia.MediaContainer { get; set; }
/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}
#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}
/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS
private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}
/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}
/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}
/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}
/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}
/// <inheritdoc />
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append(serverAddress);
sb.Append(':');
sb.Append(port);
sb.Append(' ');
sb.Append(topic);
return sb.ToString();
}
/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}
/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}
/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{
m_OnError += value;
}
remove
{
m_OnError -= value;
}
}
/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}
/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}
/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}
#endif
/// <summary>
/// Publish message.
/// </summary>
/// <param name="msg"></param>
private async Task PublishMessageAsync(GXMessage msg)
{
string str;
//string subscribeTopic = "gw-response.Eficaa_GW*";
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
str = parser.Serialize(msg);
#else
str = System.Text.Json.JsonSerializer.Serialize(msg);
#endif
MqttApplicationMessage message = new MqttApplicationMessageBuilder()
.WithTopic("PublisherTest.1")
.WithPayload(str)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
}
/// <inheritdoc />
public void Close()
{
lastException = null;
if (mqttClient != null && mqttClient.IsConnected)
{
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Close, sender = clientId };
try
{
PublishMessageAsync(msg).Wait();
}
catch (Exception)
{
replyReceivedEvent.Set();
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000);
}
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait();
}
mqttClient = null;
if (lastException != null)
{
throw new Exception(lastException);
}
}
}
void IGXMedia.Copy(object target)
{
GXMqtt tmp = (GXMqtt)target;
port = tmp.port;
serverAddress = tmp.serverAddress;
topic = tmp.topic;
}
private int HandleReceivedData(int bytes, byte[] buff, string sender)
{
BytesReceived += (uint)bytes;
if (this.IsSynchronous)
{
TraceEventArgs arg;
lock (syncBase.receivedSync)
{
int index = syncBase.receivedSize;
syncBase.AppendData(buff, 0, bytes);
if (bytes != 0 && Trace == TraceLevel.Verbose && m_OnTrace != null)
{
arg = new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null);
m_OnTrace(this, arg);
}
if (bytes != 0 && Eop != null) //Search Eop if given.
{
if (Eop is Array)
{
foreach (object eop in (Array)Eop)
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(eop), index, syncBase.receivedSize);
if (bytes != -1)
{
break;
}
}
}
else
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(Eop), index, syncBase.receivedSize);
}
}
if (bytes != -1)
{
syncBase.receivedEvent.Set();
}
}
}
else
{
if (m_OnReceived != null)
{
syncBase.receivedSize = 0;
byte[] data = new byte[bytes];
Array.Copy(buff, data, bytes);
if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, data, null));
}
m_OnReceived(this, new ReceiveEventArgs(data, sender));
}
else if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null));
}
}
return bytes;
}
/// <inheritdoc />
public void Open()
{
Close();
mqttClient = factory.CreateMqttClient();
if (string.IsNullOrEmpty(userClientId))
{
clientId = Guid.NewGuid().ToString();
}
else
{
clientId = userClientId;
}
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverAddress).WithClientId(clientId).WithCredentials("admin","admin")
.Build();
mqttClient.ApplicationMessageReceivedAsync += async t =>
{
string str = ASCIIEncoding.ASCII.GetString(t.ApplicationMessage.Payload);
GXMessage msg;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
msg = parser.Deserialize<GXMessage>(str);
#else
msg = System.Text.Json.JsonSerializer.Deserialize<GXMessage>(str);
#endif
//if (msg.id == messageId || (MessageType)msg.type == MessageType.Close || (MessageType)msg.type == MessageType.Exception)
{
switch ((MessageType)msg.type)
{
case MessageType.Open:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Open));
replyReceivedEvent.Set();
break;
case MessageType.Send:
break;
case MessageType.Receive:
byte[] bytes = Gurux.Common.GXCommon.HexToBytes(msg.frame);
replyReceivedEvent.Set();
if (bytes.Length != 0)
{
HandleReceivedData(bytes.Length, bytes, t.ClientId);
}
break;
case MessageType.Close:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
break;
case MessageType.Exception:
lastException = msg.exception;
replyReceivedEvent.Set();
break;
}
}
//else
{
m_OnTrace?.Invoke(this, new TraceEventArgs(TraceTypes.Info, Resources.UnknownReply + msg, msg.sender));
}
};
mqttClient.ConnectedAsync += async t =>
{
// Subscribe to a topic
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();
//m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
//GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
//await PublishMessageAsync(msg);
var mqttSubscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("PublisherTest.2");
})
.Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions);
//if (trace > TraceLevel.Warning)
//{
// Console.WriteLine("--- {0} Subscribed. ", it.Name);
//}
};
//mqttClient.ConnectAsync(options);
GXMessage msg1 = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
PublishMessageAsync(msg1);
mqttClient.DisconnectedAsync += async t =>
{
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
};
try
{
replyReceivedEvent.Reset();
if (AsyncWaitTime == 0)
{
mqttClient.ConnectAsync(options).Wait();
}
else
{
mqttClient.ConnectAsync(options).Wait((int)AsyncWaitTime * 1000);
}
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();
}
catch (AggregateException ex)
{
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait(10000);
}
mqttClient = null;
throw ex.InnerException;
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
if (!replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000))
{
throw new TimeoutException(Resources.InvalidTopic + "'" + topic + "'.");
}
}
if (lastException != null)
{
throw new Exception(lastException);
}
}
/// <inheritdoc />
public bool Receive<T>(ReceiveParameters<T> args)
{
if (!IsOpen)
{
throw new InvalidOperationException(Resources.MediaIsClosed);
}
return syncBase.Receive(args);
}
/// <summary>
/// Resets BytesReceived and BytesSent counters.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
public void ResetByteCounters()
{
BytesSent = BytesReceived = 0;
}
/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}
void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}
/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}
/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}
public uint ReceiveDelay
{
get;
set;
}
}
}
I am using this class. In my broker I have created one queue with two topics: PublisherTest.1 for publishing messages and PublisherTest.2 for subscribing. In GXDLMSDirector I enter the broker IP address, the port, and PublisherTest.1 as the topic. When I click Connect, data like {"id":2,"sender":"15a20ae0-ff51-4a18-a7ea-721751dedf29","type":3} is published to the topic, but Director reports an invalid topic. Please tell me what I am doing wrong and how to correct it.
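A note on where the "invalid topic" text comes from, judging only from the Open() method posted above: the TimeoutException that carries Resources.InvalidTopic is raised when replyReceivedEvent is not signalled within AsyncWaitTime seconds. It may therefore mean that no Open reply arrived on the subscribed topic, rather than that the topic string itself is malformed. The sketch below isolates that wait-for-reply pattern. ReplyWaitSketch and WaitForReply are hypothetical names used for illustration and are not part of Gurux.MQTT.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of Gurux.MQTT.
// It isolates the wait that the posted Open() performs after connecting.
public static class ReplyWaitSketch
{
    /// <summary>
    /// Waits until the reply event is signalled.
    /// Returns false when waitSeconds elapses first; 0 waits without a timeout.
    /// </summary>
    public static bool WaitForReply(AutoResetEvent replyReceived, uint waitSeconds)
    {
        if (waitSeconds == 0)
        {
            // Same convention as the posted code: zero means wait indefinitely.
            replyReceived.WaitOne();
            return true;
        }
        // A false result is the point where the posted Open() throws TimeoutException.
        return replyReceived.WaitOne((int)waitSeconds * 1000);
    }
}
```

If nothing answers on the subscribed topic, WaitForReply returns false and the posted Open() would report the topic as invalid. It is also worth checking the order of calls in the posted Open(): PublishMessageAsync(msg1) runs before ConnectAsync and is not awaited, so the Open request may never reach the broker.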
Hi Yamini,
GXDLMSDirector expects a specific topic structure, and I believe that is causing the issue. The Bridge example does what you want to do. The first media is for the serial port and the next one is for the TCP/IP port. You can create as many media as you want. Check connections.json.
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/1
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/2
BR,
Mikko
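The topic structure mentioned above can be sketched as follows. This assumes, based only on the Bridge output quoted in this thread, that each media topic is the bridge topic followed by a slash and a 1-based media index. MediaTopicSketch and MediaTopic are hypothetical names, not part of the Gurux API.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: the "<bridge topic>/<index>" layout is inferred
// from the Bridge console output quoted in this thread.
public static class MediaTopicSketch
{
    public static string MediaTopic(string bridgeTopic, int mediaIndex)
    {
        if (string.IsNullOrEmpty(bridgeTopic))
        {
            throw new ArgumentException("Bridge topic is required.", nameof(bridgeTopic));
        }
        if (mediaIndex < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(mediaIndex), "Media index starts from 1.");
        }
        // Join the bridge topic and the media index with a single topic level separator.
        return bridgeTopic + "/" + mediaIndex.ToString(CultureInfo.InvariantCulture);
    }
}
```

The resulting string is what would be entered as the topic in GXDLMSDirector for that media.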
Hi,
I am using two topics: one for publishing (Requesttopic) and one for subscribing (Responsetopic). The example you gave uses only one topic for both publishing and subscribing.
Hi,
Is there a reason why you can't use the current implementation? You can modify it as much as you like, but giving support is very complicated.
BR,
Mikko
Hi
Is it possible to use one topic for publishing and another for subscribing, or not?
Hi,
You can modify the example so there is one topic for publishing and another for subscribing if you want to, but the current implementation doesn't support it.
BR,
Mikko
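A minimal sketch of one way to start such a modification, assuming the two topics are kept as separate, validated values. TopicPair is a hypothetical type and is not part of Gurux.MQTT. The only rule it enforces that comes from MQTT itself is that the '+' and '#' wildcards are allowed in subscription filters but not in the topic a message is published to.

```csharp
using System;

// Hypothetical type, not part of Gurux.MQTT.
// Keeps the request (publish) topic and the response (subscribe) filter apart.
public sealed class TopicPair
{
    public string PublishTopic { get; }
    public string SubscribeFilter { get; }

    public TopicPair(string publishTopic, string subscribeFilter)
    {
        if (string.IsNullOrEmpty(publishTopic))
        {
            throw new ArgumentException("Publish topic is required.", nameof(publishTopic));
        }
        // MQTT allows the '+' and '#' wildcards only in subscription filters.
        if (publishTopic.IndexOfAny(new[] { '+', '#' }) != -1)
        {
            throw new ArgumentException("Publish topic must not contain '+' or '#'.", nameof(publishTopic));
        }
        if (string.IsNullOrEmpty(subscribeFilter))
        {
            throw new ArgumentException("Subscribe filter is required.", nameof(subscribeFilter));
        }
        PublishTopic = publishTopic;
        SubscribeFilter = subscribeFilter;
    }
}
```

In a class like the one posted in this thread, PublishTopic would be the value passed to WithTopic(...) when building the outgoing message, and SubscribeFilter the value passed to WithTopic(...) when subscribing after the connection is established. Whatever answers the requests then has to publish its replies to the subscribe topic; otherwise the client waits for an Open reply that never arrives.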
Can you please provide the implementation for it? I am trying, but it is not working.
//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;
//Events
PropertyChangedEventHandler m_OnPropertyChanged;
MediaStateChangeEventHandler m_OnMediaStateChange;
TraceEventHandler m_OnTrace;
ClientConnectedEventHandler m_OnClientConnected;
ClientDisconnectedEventHandler m_OnClientDisconnected;
internal ErrorEventHandler m_OnError;
internal ReceivedEventHandler m_OnReceived;
internal AutoResetEvent replyReceivedEvent = new AutoResetEvent(false);
/// <summary>
/// Last exception.
/// </summary>
private string lastException;
/// <summary>
/// Unique Message ID.
/// </summary>
private UInt16 messageId;
private UInt16 MessageId
{
get
{
return ++messageId;
}
}
/// <summary>
/// Used topic.
/// </summary>
string topic;
/// <summary>
/// Topic used for subscribing.
/// </summary>
string subscribetopic;
/// <summary>
/// Used client ID.
/// </summary>
string clientId;
/// <summary>
/// Client ID that the user wants to use.
/// </summary>
string userClientId;
/// <summary>
/// Server address.
/// </summary>
string serverAddress;
/// <summary>
/// Host port.
/// </summary>
int port = 1883;
static readonly MqttFactory factory = new MqttFactory();
IMqttClient mqttClient;
string IGXMedia.Name => "MQTT";
/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}
/// <inheritdoc cref="IGXMedia.IsOpen"/>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Close">Close</seealso>
[Browsable(false)]
public bool IsOpen => mqttClient != null && mqttClient.IsConnected;
string IGXMedia.MediaType => "MQTT";
bool IGXMedia.Enabled => true;
/// <summary>
/// Constructor.
/// </summary>
public GXMqtt()
{
syncBase = new GXSynchronousMediaBase(1024);
ConfigurableSettings = AvailableMediaSettings.All;
}
string IGXMedia.Settings
{
get
{
string tmp = "";
if (!string.IsNullOrEmpty(serverAddress))
{
tmp += "<IP>" + serverAddress + "</IP>" + Environment.NewLine;
}
if (port != 0)
{
tmp += "<Port>" + port + "</Port>" + Environment.NewLine;
}
tmp += "<Topic>" + topic + "</Topic>" + Environment.NewLine;
tmp += "<Topic>" + subscribetopic + "</Topic>" + Environment.NewLine;
//if (!string.IsNullOrEmpty(ClientId))
//{
// tmp += "<ClientId>" + ClientId + "</ClientId>" + Environment.NewLine;
//}
return tmp;
}
set
{
if (!string.IsNullOrEmpty(value))
{
XmlReaderSettings settings = new XmlReaderSettings();
settings.ConformanceLevel = ConformanceLevel.Fragment;
using (XmlReader xmlReader = XmlReader.Create(new System.IO.StringReader(value), settings))
{
while (!xmlReader.EOF)
{
if (xmlReader.IsStartElement())
{
switch (xmlReader.Name)
{
case "Topic":
topic = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "Port":
port = (int)(xmlReader.ReadElementContentAs(typeof(int), null));
break;
case "IP":
serverAddress = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "ClientId":
ClientId = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
}
}
else
{
xmlReader.Read();
}
}
}
}
}
}
/// <inheritdoc cref="IGXMedia.Synchronous"/>
public object Synchronous { get; } = new object();
/// <inheritdoc cref="IGXMedia.IsSynchronous"/>
public bool IsSynchronous
{
get
{
bool reserved = System.Threading.Monitor.TryEnter(Synchronous, 0);
if (reserved)
{
System.Threading.Monitor.Exit(Synchronous);
}
return !reserved;
}
}
/// <inheritdoc cref="IGXMedia.ResetSynchronousBuffer"/>
public void ResetSynchronousBuffer()
{
lock (syncBase.receivedSync)
{
syncBase.receivedSize = 0;
}
}
/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (!string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (!string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}
/// <summary>
/// Sent byte count.
/// </summary>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesSent
{
get;
private set;
}
/// <summary>
/// Received byte count.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesReceived
{
get;
private set;
}
/// <inheritdoc cref="IGXMedia.Eop"/>
public object Eop
{
get;
set;
}
/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
public AvailableMediaSettings ConfigurableSettings
{
get
{
return (AvailableMediaSettings)((IGXMedia)this).ConfigurableSettings;
}
set
{
((IGXMedia)this).ConfigurableSettings = (int)value;
}
}
/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
int IGXMedia.ConfigurableSettings
{
get;
set;
}
/// <inheritdoc />
object IGXMedia.Tag { get; set; }
IGXMediaContainer IGXMedia.MediaContainer { get; set; }
/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}
#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}
/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS
private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}
/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}
/// <summary>
/// Topic used for subscribing.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string SubscribeTopic
{
get
{
return subscribetopic;
}
set
{
if (subscribetopic != value)
{
subscribetopic = value;
NotifyPropertyChanged("subscribetopic");
}
}
}
/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}
/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}
/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}
/// <inheritdoc />
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append(serverAddress);
sb.Append(':');
sb.Append(port);
sb.Append(' ');
sb.Append(topic);
sb.Append(subscribetopic);
return sb.ToString();
}
/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}
/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}
/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{
m_OnError += value;
}
remove
{
m_OnError -= value;
}
}
/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}
/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}
/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}
#endif
/// <summary>
/// Publish message.
/// </summary>
/// <param name="msg"></param>
private async Task PublishMessageAsync(GXMessage msg)
{
string str;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
str = parser.Serialize(msg);
#else
str = System.Text.Json.JsonSerializer.Serialize(msg);
#endif
MqttApplicationMessage message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(str)
//.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build();
await mqttClient.PublishAsync(message);
}
/// <inheritdoc />
public void Close()
{
lastException = null;
if (mqttClient != null && mqttClient.IsConnected)
{
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Close, sender = clientId };
try
{
PublishMessageAsync(msg).Wait();
}
catch (Exception)
{
replyReceivedEvent.Set();
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000);
}
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait();
}
mqttClient = null;
if (lastException != null)
{
throw new Exception(lastException);
}
}
}
void IGXMedia.Copy(object target)
{
GXMqtt tmp = (GXMqtt)target;
port = tmp.port;
serverAddress = tmp.serverAddress;
topic = tmp.topic;
}
private int HandleReceivedData(int bytes, byte[] buff, string sender)
{
BytesReceived += (uint)bytes;
if (this.IsSynchronous)
{
TraceEventArgs arg;
lock (syncBase.receivedSync)
{
int index = syncBase.receivedSize;
syncBase.AppendData(buff, 0, bytes);
if (bytes != 0 && Trace == TraceLevel.Verbose && m_OnTrace != null)
{
arg = new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null);
m_OnTrace(this, arg);
}
if (bytes != 0 && Eop != null) //Search Eop if given.
{
if (Eop is Array)
{
foreach (object eop in (Array)Eop)
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(eop), index, syncBase.receivedSize);
if (bytes != -1)
{
break;
}
}
}
else
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(Eop), index, syncBase.receivedSize);
}
}
if (bytes != -1)
{
syncBase.receivedEvent.Set();
}
}
}
else
{
if (m_OnReceived != null)
{
syncBase.receivedSize = 0;
byte[] data = new byte[bytes];
Array.Copy(buff, data, bytes);
if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, data, null));
}
m_OnReceived(this, new ReceiveEventArgs(data, sender));
}
else if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null));
}
}
return bytes;
}
/// <inheritdoc />
public void Open()
{
Close();
mqttClient = factory.CreateMqttClient();
if (string.IsNullOrEmpty(userClientId))
{
clientId = Guid.NewGuid().ToString();
}
else
{
clientId = userClientId;
}
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverAddress, port).WithClientId(clientId)
.WithCredentials("admin", "admin")
.Build();
mqttClient.ApplicationMessageReceivedAsync += async t =>
{
string str = ASCIIEncoding.ASCII.GetString(t.ApplicationMessage.Payload);
GXMessage msg;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
msg = parser.Deserialize<GXMessage>(str);
#else
msg = System.Text.Json.JsonSerializer.Deserialize<GXMessage>(str);
#endif
//if (msg.id == messageId || (MessageType)msg.type == MessageType.Close || (MessageType)msg.type == MessageType.Exception)
{
switch ((MessageType)msg.type)
{
//case MessageType.Open:
// m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Open));
// replyReceivedEvent.Set();
// break;
case MessageType.Send:
break;
case MessageType.Receive:
byte[] bytes = Gurux.Common.GXCommon.HexToBytes(msg.frame);
replyReceivedEvent.Set();
if (bytes.Length != 0)
{
HandleReceivedData(bytes.Length, bytes, t.ClientId);
}
break;
case MessageType.Close:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
break;
case MessageType.Exception:
lastException = msg.exception;
replyReceivedEvent.Set();
break;
}
}
//else
//{
// m_OnTrace?.Invoke(this, new TraceEventArgs(TraceTypes.Info, Resources.UnknownReply + msg, msg.sender));
//}
};
mqttClient.ConnectedAsync += async t =>
{
// Subscribe to a topic
// Await the subscription instead of blocking inside the async handler.
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(subscribetopic)
//.WithExactlyOnceQoS()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
//.WithCleanSession()
.Build());
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId };
replyReceivedEvent.Reset();
//await PublishMessageAsync(msg);
//Send
};
mqttClient.DisconnectedAsync += async t =>
{
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
};
try
{
replyReceivedEvent.Reset();
if (AsyncWaitTime == 0)
{
mqttClient.ConnectAsync(options).Wait();
}
else
{
mqttClient.ConnectAsync(options).Wait((int)AsyncWaitTime * 1000);
}
}
catch (AggregateException ex)
{
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait(10000);
}
mqttClient = null;
throw ex.InnerException;
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
if (!replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000))
{
throw new TimeoutException(Resources.InvalidTopic + "'" + topic + "'.");
}
}
if (lastException != null)
{
throw new Exception(lastException);
}
}
/// <inheritdoc />
public bool Receive<T>(ReceiveParameters<T> args)
{
if (!IsOpen)
{
throw new InvalidOperationException(Resources.MediaIsClosed);
}
return syncBase.Receive(args);
}
/// <summary>
/// Resets BytesReceived and BytesSent counters.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
public void ResetByteCounters()
{
BytesSent = BytesReceived = 0;
}
/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}
void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}
/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}
/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}
public uint ReceiveDelay
{
get;
set;
}
}
}
This is my code, modified to publish to one topic and subscribe to another. However, the AARQ is published to the publish topic only after an open message has arrived on the subscribed topic. I want to publish the AARQ first, without waiting for any acknowledgement. Please help me with this implementation.
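For reference, here is a minimal sketch of the wait that the pasted Open() performs. In the code above, replyReceivedEvent is reset and then waited on, and it is set only when a Receive, Close or Exception message arrives or when the client disconnects. Since the line that publishes the Open message is commented out, nothing answers, so the wait runs into the timeout and the TimeoutException carrying the "invalid topic" text is thrown. The names OpenWaitSketch, TryOpen, waitForAck and brokerSendsAck are hypothetical and are not part of Gurux.MQTT; no MQTT library is used, only the event logic is modelled.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the media class; only the wait logic of Open() is modelled.
public static class OpenWaitSketch
{
    // Returns true when Open() would complete and false when it would time out.
    // waitForAck is an assumed new setting, not an existing Gurux.MQTT property.
    public static bool TryOpen(bool waitForAck, bool brokerSendsAck, int timeoutMs)
    {
        using (var replyReceivedEvent = new ManualResetEvent(false))
        {
            if (brokerSendsAck)
            {
                // Simulates a reply arriving on the subscribed topic.
                replyReceivedEvent.Set();
            }
            if (!waitForAck)
            {
                // No acknowledgement expected: the caller may send the AARQ right away.
                return true;
            }
            // Same pattern as replyReceivedEvent.WaitOne(AsyncWaitTime * 1000) in Open().
            return replyReceivedEvent.WaitOne(timeoutMs);
        }
    }

    public static void Main()
    {
        Console.WriteLine(TryOpen(true, false, 100));  // prints "False": no reply, Open() would throw
        Console.WriteLine(TryOpen(false, false, 100)); // prints "True": wait skipped
        Console.WriteLine(TryOpen(true, true, 100));   // prints "True": reply received in time
    }
}
```

If the meter side never acknowledges the open request, one possible change is to skip the wait at the end of Open() in your modified copy, so that the first Send() publishes the AARQ directly. Whether the rest of the class still behaves correctly after that change is something I have not verified.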
Hi,
You need to make modifications like that yourself. You need to change the code in the Development folder so that it can receive your topics.
BR,
Mikko
Hi,
I am using the development code only, but it is not working.
I want to communicate with Wirepas from .NET. Is there any Wirepas .NET code available?
Hi Mikko,
Can you please provide an example of communicating with a DLMS meter over MQTT using two topics, one for publishing and another for subscribing? I have tried many approaches, but I am stuck: I need to publish the AARQ without an acknowledgement, and it is not working.
Hi Yamini,
I can't say when an example for two topics will be added. One will be added if
our customers ask for it.
BR,
Mikko
wirepas mqtt example is there
Hi Mikko,
Is there a Wirepas example in C#?
Hi Yamini,
There is no Wirepas example for C#. The Wirepas example is for the NIC, and it doesn't use MQTT.
BR,
Mikko
Hi Mikko,
Thank you for your support.