Communicating meters from Gurux Director

By Yamini, 9 January, 2024
Forums
Gurux.MQTT

Hi

I have one meter connected to my RabbitMQ broker, and I am trying to connect to it from Gurux Director, but it is not connecting. How can I resolve this?

Kurumi

2 years 1 month ago

Hi,

What kind of MQTT messages is your meter using? Can you share the structure of the envelope or one example MQTT message?

BR,
Mikko

Yamini

2 years 1 month ago

My meter supports the DLMS protocol, and I am trying to connect to it using Gurux Director. I select MQTT as the media and enter my RabbitMQ (RMQ) broker IP and topic. However, I want to publish my requests to one topic and subscribe to another topic, where the meter sends its responses.

Yamini

2 years 1 month ago

Gurux Director is sending frames like these:

{"id":1,"sender":"PublisherTest.Request"}
{"frame":"00 01 00 10 00 01 00 1F 60 1D A1 09 06 07 60 85 74 05 08 01 01 BE 10 04 0E 01 00 00 00 06 5F 1F 04 00 62 1E 5D FF FF","id":2,"sender":"PublisherTest.Request","type":1}
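For readers following along, the messages above can be unpacked as shown here. This is a minimal sketch, not Gurux code: the `Envelope` and `EnvelopeReader` classes are hypothetical names that mirror only the JSON fields visible in the post (`id`, `sender`, `type`, `frame`). Treating the first eight bytes of the frame as a DLMS WRAPPER header (version, source port, destination port, payload length) is an assumption based on the byte layout.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical mirror of the JSON fields seen in the thread; not the Gurux GXMessage class.
public sealed class Envelope
{
    public int id { get; set; }
    public string sender { get; set; } = "";
    public int type { get; set; }
    public string frame { get; set; } = "";
}

public static class EnvelopeReader
{
    // Parses one JSON envelope as it is published to the broker.
    public static Envelope Parse(string json)
    {
        Envelope result = JsonSerializer.Deserialize<Envelope>(json);
        if (result == null)
        {
            throw new FormatException("Empty envelope.");
        }
        return result;
    }

    // Converts a space-separated hex string ("00 01 00 10 ...") to bytes.
    public static byte[] FrameBytes(string frame)
    {
        return frame.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .Select(h => Convert.ToByte(h, 16))
                    .ToArray();
    }

    // Reads the big-endian 16-bit length at offset 6, assuming a DLMS WRAPPER header.
    public static int WrapperPayloadLength(byte[] bytes)
    {
        if (bytes.Length < 8)
        {
            throw new FormatException("Frame is shorter than the 8-byte header.");
        }
        return (bytes[6] << 8) | bytes[7];
    }
}
```

For the second message in the post, the frame decodes to 39 bytes and the length field reads 0x001F (31), which matches the 31 bytes that follow the 8-byte header.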

Kurumi

2 years 1 month ago

Hi,

Check the Gurux Bridge and Broker. That will do what you want to do.

https://github.com/Gurux/Gurux.MQTT

You can replace Broker with RabbitMQ.

BR,
Mikko

Yamini

2 years 1 month ago

Hi
Thank you for your reply.

I am using the GXMqtt class. Publishing and subscribing on the same topic works for me, but when I try to publish to one topic and subscribe to another, it reports an invalid topic.

Kurumi

2 years 1 month ago

Hi,

Check the bridge. It will do what you want. OnReceived sends the data received from the meter to the broker.

https://github.com/Gurux/Gurux.MQTT/blob/62f10a9dc17f7977013fc619823cf5…

BR,
Mikko

Yamini

2 years 1 month ago

I have tried it, but it is not working. It does not accept two topics.

Kurumi

2 years 1 month ago

Hi,

What is your meter responding? What kind of topic are you trying to send to?

BR,
Mikko

Yamini

2 years 1 month ago

I have created two topics in my RabbitMQ broker under one queue. I am trying to publish data to the request topic and subscribe to the response topic, but it is not working. When I give two topics, one for publishing and the other for subscribing, I get an invalid request topic error.

Yamini

2 years 1 month ago

Shall I share my GXMqtt file here, where I set the publish topic and the subscribe topic?

Kurumi

2 years 1 month ago

Hi,

Your MQTT messages must use the same format that the Bridge uses.

BR,

Mikko

Yamini

2 years 1 month ago

Yes, we are using the same format, but when connecting we only get an invalid topic exception.

Yamini

2 years 1 month ago

Hi,
This is my code in the Gurux MQTT class file:

mqttClient.ConnectedAsync += async t =>
{

// Subscribe to a topic
mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW*").WithAtMostOnceQoS().Build()).Wait();
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
await PublishMessageAsync(msg,"gw-request.Eficaa_GW*");
};

This connect method is not working. I am using two topics, but I get an exception saying that "gw-request.Eficaa_GW*" is an invalid topic. Please suggest a solution.
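One thing that may be worth checking in the snippet above is the trailing `*`. In MQTT, the wildcard characters are `+` (one level) and `#` (all remaining levels), levels are separated by `/`, and wildcards are allowed only in subscription filters, never in the topic a message is published to. `*` is a wildcard in AMQP topic exchanges, so in an MQTT topic it is an ordinary literal character. How a particular broker maps it is best verified against that broker's documentation. The sketch below is a hypothetical helper (`TopicRules` is not part of Gurux.MQTT or MQTTnet) that checks the MQTT rules before connecting.

```csharp
using System;

// Hypothetical helper that applies the topic rules of the MQTT 3.1.1 specification.
public static class TopicRules
{
    // A publish topic must be non-empty and must not contain wildcards or a null character.
    public static bool IsValidPublishTopic(string topic)
    {
        return !string.IsNullOrEmpty(topic)
            && topic.IndexOfAny(new[] { '+', '#', '\0' }) < 0;
    }

    // A subscription filter may use '+' as a whole level and '#' as the last level only.
    public static bool IsValidSubscribeFilter(string filter)
    {
        if (string.IsNullOrEmpty(filter) || filter.IndexOf('\0') >= 0)
        {
            return false;
        }
        string[] levels = filter.Split('/');
        for (int i = 0; i < levels.Length; i++)
        {
            string level = levels[i];
            // '#' must stand alone and must be the final level.
            if (level.Contains("#") && (level != "#" || i != levels.Length - 1))
            {
                return false;
            }
            // '+' must occupy an entire level.
            if (level.Contains("+") && level != "+")
            {
                return false;
            }
        }
        return true;
    }
}
```

Under these rules `gw-request.Eficaa_GW*` passes as a publish topic, but only as a literal name that includes the asterisk. A subscription to `gw-response.Eficaa_GW*` would therefore not be expected to match a response published to, say, `gw-response.Eficaa_GW01` (a made-up name for illustration).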

Yamini

2 years 1 month ago

Is the code I am using correct or not?

Kurumi

2 years 1 month ago

Hi,

Can you try to subscribe in the same way as the example bridge or use it? This is now tested and it seems to work.

https://github.com/Gurux/Gurux.MQTT/blob/62f10a9dc17f7977013fc619823cf5…

Start the Broker first and then the Bridge.
When you start the bridge you should see available medias like this:

Connecting to the Broker in address: localhost:1883
Bridge topic: 60b740fb-fd16-408b-b05f-686278b2782d
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/1
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/2
Press Esc to close application or delete clear the console.

Check that your serial port and TCP/IP settings are correct in the connections.json file.

In GXDLMSDirector check that broker address and port are correct. Then add the topic.
The example uses 60b740fb-fd16-408b-b05f-686278b2782d/1 for the TCP/IP and
60b740fb-fd16-408b-b05f-686278b2782d/2 for the serial port.

BR,
Mikko
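The console output above suggests that each media topic is the bridge topic followed by `/` and a 1-based index. A small sketch of that naming, assuming the pattern holds for any number of medias (`BridgeTopics` is a hypothetical name, not Gurux code):

```csharp
using System;
using System.Collections.Generic;

public static class BridgeTopics
{
    // Builds "<bridgeTopic>/1" ... "<bridgeTopic>/<mediaCount>", following the Bridge console output.
    public static IReadOnlyList<string> MediaTopics(string bridgeTopic, int mediaCount)
    {
        if (string.IsNullOrEmpty(bridgeTopic))
        {
            throw new ArgumentException("Bridge topic is required.", nameof(bridgeTopic));
        }
        if (mediaCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(mediaCount));
        }
        var topics = new List<string>(mediaCount);
        for (int i = 1; i <= mediaCount; i++)
        {
            topics.Add(bridgeTopic + "/" + i);
        }
        return topics;
    }
}
```

With the bridge topic from the output and two medias, this yields the two media topics listed there, one of which is then entered as the topic in GXDLMSDirector.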

Yamini

2 years 1 month ago

Hi
I am using only a TCP/IP connection. I am using my RMQ broker with a queue that has two topics. When I run the bridge, only a dynamic queue is created in the broker, with two topics: one for the TCP/IP connection and another for the serial port connection. In my case, I want to publish to one topic, and the meter will publish to another topic, which I have to subscribe to before publishing again. This is the scenario I want to get working.

Yamini

2 years 1 month ago

//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
//using static System.Windows.Forms.VisualStyles.VisualStyleElement.TrackBar;

namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;

//Events
PropertyChangedEventHandler m_OnPropertyChanged;
MediaStateChangeEventHandler m_OnMediaStateChange;
TraceEventHandler m_OnTrace;
ClientConnectedEventHandler m_OnClientConnected;
ClientDisconnectedEventHandler m_OnClientDisconnected;
internal ErrorEventHandler m_OnError;
internal ReceivedEventHandler m_OnReceived;
internal AutoResetEvent replyReceivedEvent = new AutoResetEvent(false);

/// <summary>
/// Last exception.
/// </summary>
private string lastException;

/// <summary>
/// Unique Message ID.
/// </summary>
private UInt16 messageId;

private UInt16 MessageId
{
get
{
return ++messageId;
}
}

/// <summary>
/// Used topic.
/// </summary>
string topic;

/// <summary>
/// Used client ID.
/// </summary>
string clientId;

/// <summary>
/// Client ID that user want's to use.
/// </summary>
string userClientId;

/// <summary>
/// Server address.
/// </summary>
string serverAddress;
/// <summary>
/// Host port.
/// </summary>
int port = 1883;

static readonly MqttFactory factory = new MqttFactory();
IMqttClient mqttClient;
string IGXMedia.Name => "MQTT";

/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}

/// <inheritdoc cref="IGXMedia.IsOpen"/>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Close">Close</seealso>
[Browsable(false)]
public bool IsOpen => mqttClient != null && mqttClient.IsConnected;

string IGXMedia.MediaType => "MQTT";

bool IGXMedia.Enabled => true;

/// <summary>
/// Constructor.
/// </summary>
public GXMqtt()
{
syncBase = new GXSynchronousMediaBase(1024);
ConfigurableSettings = AvailableMediaSettings.All;
}

string IGXMedia.Settings
{
get
{
string tmp = "";
if (!string.IsNullOrEmpty(serverAddress))
{
tmp += "<IP>" + serverAddress + "</IP>" + Environment.NewLine;
}
if (port != 0)
{
tmp += "<Port>" + port + "</Port>" + Environment.NewLine;
}
tmp += "<Topic>" + topic + "</Topic>" + Environment.NewLine;
//if (!string.IsNullOrEmpty(ClientId))
//{
// tmp += "<ClientId>" + ClientId + "</ClientId>" + Environment.NewLine;
//}
return tmp;
}
set
{
if (!string.IsNullOrEmpty(value))
{
XmlReaderSettings settings = new XmlReaderSettings(); ;
settings.ConformanceLevel = ConformanceLevel.Fragment;
using (XmlReader xmlReader = XmlReader.Create(new System.IO.StringReader(value), settings))
{
while (!xmlReader.EOF)
{
if (xmlReader.IsStartElement())
{
switch (xmlReader.Name)
{
case "Topic":
topic = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "Port":
port = (int)(xmlReader.ReadElementContentAs(typeof(int), null));
break;
case "IP":
serverAddress = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "ClientId":
ClientId = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
}
}
else
{
xmlReader.Read();
}
}
}
}
}
}

/// <inheritdoc cref="IGXMedia.Synchronous"/>
public object Synchronous { get; } = new object();

/// <inheritdoc cref="IGXMedia.IsSynchronous"/>
public bool IsSynchronous
{
get
{
bool reserved = System.Threading.Monitor.TryEnter(Synchronous, 0);
if (reserved)
{
System.Threading.Monitor.Exit(Synchronous);
}
return !reserved;
}
}

/// <inheritdoc cref="IGXMedia.ResetSynchronousBuffer"/>
public void ResetSynchronousBuffer()
{
lock (syncBase.receivedSync)
{
syncBase.receivedSize = 0;
}
}

/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (!string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (!string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}

/// <summary>
/// Sent byte count.
/// </summary>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesSent
{
get;
private set;
}

/// <summary>
/// Received byte count.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesReceived
{
get;
private set;
}

/// <inheritdoc cref="IGXMedia.Eop"/>
public object Eop
{
get;
set;
}

/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
public AvailableMediaSettings ConfigurableSettings
{
get
{
return (AvailableMediaSettings)((IGXMedia)this).ConfigurableSettings;
}
set
{
((IGXMedia)this).ConfigurableSettings = (int)value;
}
}

/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
int IGXMedia.ConfigurableSettings
{
get;
set;
}

/// <inheritdoc />
object IGXMedia.Tag { get; set; }

IGXMediaContainer IGXMedia.MediaContainer { get; set; }

/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}

#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}

/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS

private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}

/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">Protocol</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}

/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}

/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}

/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}

/// <inheritdoc />
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append(serverAddress);
sb.Append(':');
sb.Append(port);
sb.Append(' ');
sb.Append(topic);
return sb.ToString();
}

/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}

/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}

/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{

m_OnError += value;
}
remove
{
m_OnError -= value;
}
}

/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}

event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}

/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}

/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}

#endif
/// <summary>
/// Publish message.
/// </summary>
/// <param name="msg"></param>
private async Task PublishMessageAsync(GXMessage msg)
{
string str;
//string subscribeTopic = "gw-response.Eficaa_GW*";
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
str = parser.Serialize(msg);
#else
str = System.Text.Json.JsonSerializer.Serialize(msg);
#endif
MqttApplicationMessage message = new MqttApplicationMessageBuilder()
.WithTopic("PublisherTest.1")
.WithPayload(str)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
}

/// <inheritdoc />
public void Close()
{
lastException = null;
if (mqttClient != null && mqttClient.IsConnected)
{
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Close, sender = clientId };
try
{
PublishMessageAsync(msg).Wait();
}
catch (Exception)
{
replyReceivedEvent.Set();
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000);
}
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait();
}
mqttClient = null;
if (lastException != null)
{
throw new Exception(lastException);
}
}
}

void IGXMedia.Copy(object target)
{
GXMqtt tmp = (GXMqtt)target;
port = tmp.port;
serverAddress = tmp.serverAddress;
topic = tmp.topic;
}

private int HandleReceivedData(int bytes, byte[] buff, string sender)
{
BytesReceived += (uint)bytes;
if (this.IsSynchronous)
{
TraceEventArgs arg;
lock (syncBase.receivedSync)
{
int index = syncBase.receivedSize;
syncBase.AppendData(buff, 0, bytes);
if (bytes != 0 && Trace == TraceLevel.Verbose && m_OnTrace != null)
{
arg = new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null);
m_OnTrace(this, arg);
}
if (bytes != 0 && Eop != null) //Search Eop if given.
{
if (Eop is Array)
{
foreach (object eop in (Array)Eop)
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(eop), index, syncBase.receivedSize);
if (bytes != -1)
{
break;
}
}
}
else
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(Eop), index, syncBase.receivedSize);
}
}
if (bytes != -1)
{
syncBase.receivedEvent.Set();
}
}
}
else
{
if (m_OnReceived != null)
{
syncBase.receivedSize = 0;
byte[] data = new byte[bytes];
Array.Copy(buff, data, bytes);
if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, data, null));
}
m_OnReceived(this, new ReceiveEventArgs(data, sender));
}
else if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null));
}
}
return bytes;
}

/// <inheritdoc />
public void Open()
{
Close();
mqttClient = factory.CreateMqttClient();
if (string.IsNullOrEmpty(userClientId))
{
clientId = Guid.NewGuid().ToString();
}
else
{
clientId = userClientId;
}
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverAddress).WithClientId(clientId).WithCredentials("admin","admin")
.Build();

mqttClient.ApplicationMessageReceivedAsync += async t =>
{
string str = ASCIIEncoding.ASCII.GetString(t.ApplicationMessage.Payload);
GXMessage msg;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
msg = parser.Deserialize<GXMessage>(str);
#else
msg = System.Text.Json.JsonSerializer.Deserialize<GXMessage>(str);
#endif
//if (msg.id == messageId || (MessageType)msg.type == MessageType.Close || (MessageType)msg.type == MessageType.Exception)
{
switch ((MessageType)msg.type)
{
case MessageType.Open:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Open));
replyReceivedEvent.Set();
break;
case MessageType.Send:
break;
case MessageType.Receive:
byte[] bytes = Gurux.Common.GXCommon.HexToBytes(msg.frame);
replyReceivedEvent.Set();
if (bytes.Length != 0)
{
HandleReceivedData(bytes.Length, bytes, t.ClientId);
}
break;
case MessageType.Close:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
break;
case MessageType.Exception:
lastException = msg.exception;
replyReceivedEvent.Set();
break;
}
}
//else
{
m_OnTrace?.Invoke(this, new TraceEventArgs(TraceTypes.Info, Resources.UnknownReply + msg, msg.sender));
}
};
mqttClient.ConnectedAsync += async t =>
{

// Subscribe to a topic
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();
//m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
//GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
//await PublishMessageAsync(msg);

var mqttSubscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("PublisherTest.2");
})
.Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions);
//if (trace > TraceLevel.Warning)
//{
// Console.WriteLine("--- {0} Subscribed. ", it.Name);
//}
};
//mqttClient.ConnectAsync(options);
GXMessage msg1 = new GXMessage() { id = MessageId, type = (int)MessageType.Open, sender = clientId };
PublishMessageAsync(msg1);
mqttClient.DisconnectedAsync += async t =>
{
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
};
try
{
replyReceivedEvent.Reset();
if (AsyncWaitTime == 0)
{
mqttClient.ConnectAsync(options).Wait();
}
else
{
mqttClient.ConnectAsync(options).Wait((int)AsyncWaitTime * 1000);
}
//mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("gw-response.Eficaa_GW").WithAtMostOnceQoS().Build()).Wait();

}
catch (AggregateException ex)
{
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait(10000);
}
mqttClient = null;
throw ex.InnerException;
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
if (!replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000))
{
throw new TimeoutException(Resources.InvalidTopic + "'" + topic + "'.");
}
}
if (lastException != null)
{
throw new Exception(lastException);
}
}

/// <inheritdoc />
public bool Receive<T>(ReceiveParameters<T> args)
{
if (!IsOpen)
{
throw new InvalidOperationException(Resources.MediaIsClosed);
}
return syncBase.Receive(args);
}

/// <summary>
/// Resets BytesReceived and BytesSent counters.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
public void ResetByteCounters()
{
BytesSent = BytesReceived = 0;
}

/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}

void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}

/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}

/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}

public uint ReceiveDelay
{
get;
set;
}
}
}

I am using this class. In my broker I have created one queue with two topics: PublisherTest.1 for publishing messages and PublisherTest.2 for subscribing. In Gurux Director I enter the broker IP address and port, and PublisherTest.1 as the topic. When I click Connect, data like {"id":2,"sender":"15a20ae0-ff51-4a18-a7ea-721751dedf29","type":3} is published to the topic, but Director says invalid topic. Please tell me what I am doing wrong.
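Reading the posted `Open()` method, two things may explain the symptom, though neither is confirmed in this thread. First, `PublishMessageAsync(msg1)` is called before `ConnectAsync`, so the Open message is published on a client that is not yet connected, and the returned task is not awaited. Second, the "invalid topic" text is attached to the `TimeoutException` thrown when `replyReceivedEvent` is not signalled within `AsyncWaitTime`, so the message may simply mean that no Open reply arrived on the subscribed topic. The sketch below isolates that wait pattern without any MQTT dependency. `OpenHandshake` and its parameters are hypothetical names.

```csharp
using System;
using System.Threading;

public static class OpenHandshake
{
    // Publishes the Open request, then blocks until a reply signals the event
    // or the timeout elapses. Returns false on timeout, which is the case the
    // posted code reports with the "invalid topic" text.
    public static bool SendOpenAndWait(Action publishOpen, AutoResetEvent replyReceived, TimeSpan timeout)
    {
        replyReceived.Reset();   // discard any stale signal from a previous exchange
        publishOpen();           // should run only after the client is connected and subscribed
        return replyReceived.WaitOne(timeout);
    }
}
```

In a real client, `publishOpen` would publish to the request topic, and the message-received handler for the response topic would call `Set()` on the event. If nothing publishes an Open reply to the response topic, the call returns false regardless of whether the topic names are valid.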

Kurumi

2 years ago

Hi Yamini,

GXDLMSDirector expects a specific topic structure and I believe that is causing the issue. The Bridge example does what you want to do. The first media is for the serial port and the next one is for TCP/IP port. You can create as many medias as you want to. Check connections.json

Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/1
Media topic: 60b740fb-fd16-408b-b05f-686278b2782d/2

BR,
Mikko

Yamini

2 years ago

Hi,

I am using two topics, one for publishing (Requesttopic) and one for subscribing (Responsetopic). The example you gave uses only one topic for both publishing and subscribing.

Kurumi

2 years ago

Hi,

Is there a reason why you can't use the current implementation? You can modify it as much as you like, but giving support is very complicated.

BR,
Mikko

Yamini

2 years ago

Hi

Is it possible to use one topic for publishing and another for subscribing, or not?

Kurumi

2 years ago

Hi,

You can modify the example so there is one topic for publishing and another for subscribing if you want to, but the current implementation doesn't support it.

BR,
Mikko
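If the class is modified to use separate publish and subscribe topics, the two values also need to be stored separately in the media settings. The following is a minimal sketch under that assumption. `TopicSettings` and the `SubscribeTopic` element name are hypothetical and not part of Gurux.MQTT. It follows the XML fragment style (`<IP>`, `<Port>`, `<Topic>`) of the `Settings` property posted earlier in the thread. If both topics were written under the same element name, a reader that switches on the element name would keep only the last one.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

// Hypothetical settings holder with distinct element names for the two topics.
public sealed class TopicSettings
{
    public string PublishTopic { get; set; } = "";
    public string SubscribeTopic { get; set; } = "";

    // Serializes both topics as an XML fragment.
    public string ToXml()
    {
        var sb = new StringBuilder();
        var ws = new XmlWriterSettings { ConformanceLevel = ConformanceLevel.Fragment };
        using (XmlWriter w = XmlWriter.Create(sb, ws))
        {
            w.WriteElementString("Topic", PublishTopic);
            w.WriteElementString("SubscribeTopic", SubscribeTopic);
        }
        return sb.ToString();
    }

    // Parses the fragment back; unknown elements are skipped so the loop always advances.
    public static TopicSettings FromXml(string xml)
    {
        var result = new TopicSettings();
        var rs = new XmlReaderSettings { ConformanceLevel = ConformanceLevel.Fragment };
        using (XmlReader r = XmlReader.Create(new StringReader(xml), rs))
        {
            while (!r.EOF)
            {
                if (!r.IsStartElement())
                {
                    r.Read();
                    continue;
                }
                switch (r.Name)
                {
                    case "Topic":
                        result.PublishTopic = r.ReadElementContentAsString();
                        break;
                    case "SubscribeTopic":
                        result.SubscribeTopic = r.ReadElementContentAsString();
                        break;
                    default:
                        r.Skip();
                        break;
                }
            }
        }
        return result;
    }
}
```

The publish path would then use `PublishTopic` and the subscribe call would use `SubscribeTopic`. Whether GXDLMSDirector's settings dialog can expose a second topic field is a separate question that this sketch does not address.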

Yamini

2 years ago

Can you please provide the implementation for it? I am trying, but it is not working.

Yamini

2 years ago

//
// --------------------------------------------------------------------------
// Gurux Ltd
//
//
//
// Filename: $HeadURL$
//
// Version: $Revision$,
// $Date$
// $Author$
//
// Copyright (c) Gurux Ltd
//
//---------------------------------------------------------------------------
//
// DESCRIPTION
//
// This file is a part of Gurux Device Framework.
//
// Gurux Device Framework is Open Source software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; version 2 of the License.
// Gurux Device Framework is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// This code is licensed under the GNU General Public License v2.
// Full text may be retrieved at http://www.gnu.org/licenses/gpl-2.0.txt
//---------------------------------------------------------------------------
using Gurux.Common;
using Gurux.MQTT.Message;
using Gurux.MQTT.Properties;
using Gurux.Shared;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Gurux.MQTT
{
/// <summary>
/// GXMqtt implements MQTT client that sends bytes to the MQTT broker.
/// </summary>
public class GXMqtt : IGXMedia2
{
/// <summary>
/// Sync object.
/// </summary>
private object sync = new object();
internal GXSynchronousMediaBase syncBase;

//Events
PropertyChangedEventHandler m_OnPropertyChanged;
MediaStateChangeEventHandler m_OnMediaStateChange;
TraceEventHandler m_OnTrace;
ClientConnectedEventHandler m_OnClientConnected;
ClientDisconnectedEventHandler m_OnClientDisconnected;
internal ErrorEventHandler m_OnError;
internal ReceivedEventHandler m_OnReceived;
internal AutoResetEvent replyReceivedEvent = new AutoResetEvent(false);

/// <summary>
/// Last exception.
/// </summary>
private string lastException;

/// <summary>
/// Unique Message ID.
/// </summary>
private UInt16 messageId;

private UInt16 MessageId
{
get
{
return ++messageId;
}
}

/// <summary>
/// Used topic.
/// </summary>
string topic;

/// <summary>
/// Used subscribetopic.
/// </summary>
string subscribetopic;

/// <summary>
/// Used client ID.
/// </summary>
string clientId;

/// <summary>
/// Client ID that user want's to use.
/// </summary>
string userClientId;

/// <summary>
/// Server address.
/// </summary>
string serverAddress;
/// <summary>
/// Host port.
/// </summary>
int port = 1883;

static readonly MqttFactory factory = new MqttFactory();
IMqttClient mqttClient;
string IGXMedia.Name => "MQTT";

/// <summary>
/// What level of tracing is used.
/// </summary>
public TraceLevel Trace
{
get
{
return syncBase.Trace;
}
set
{
syncBase.Trace = value;
}
}

/// <inheritdoc cref="IGXMedia.IsOpen"/>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Close">Close</seealso>
[Browsable(false)]
public bool IsOpen => mqttClient != null && mqttClient.IsConnected;

string IGXMedia.MediaType => "MQTT";

bool IGXMedia.Enabled => true;

/// <summary>
/// Constructor.
/// </summary>
public GXMqtt()
{
syncBase = new GXSynchronousMediaBase(1024);
ConfigurableSettings = AvailableMediaSettings.All;
}

string IGXMedia.Settings
{
get
{
string tmp = "";
if (!string.IsNullOrEmpty(serverAddress))
{
tmp += "<IP>" + serverAddress + "</IP>" + Environment.NewLine;
}
if (port != 0)
{
tmp += "<Port>" + port + "</Port>" + Environment.NewLine;
}
tmp += "<Topic>" + topic + "</Topic>" + Environment.NewLine;
tmp += "<Topic>" + subscribetopic + "</Topic>" + Environment.NewLine;
//if (!string.IsNullOrEmpty(ClientId))
//{
// tmp += "<ClientId>" + ClientId + "</ClientId>" + Environment.NewLine;
//}
return tmp;
}
set
{
if (!string.IsNullOrEmpty(value))
{
XmlReaderSettings settings = new XmlReaderSettings(); ;
settings.ConformanceLevel = ConformanceLevel.Fragment;
using (XmlReader xmlReader = XmlReader.Create(new System.IO.StringReader(value), settings))
{
while (!xmlReader.EOF)
{
if (xmlReader.IsStartElement())
{
switch (xmlReader.Name)
{
case "Topic":
topic = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "Port":
port = (int)(xmlReader.ReadElementContentAs(typeof(int), null));
break;
case "IP":
serverAddress = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
case "ClientId":
ClientId = (string)xmlReader.ReadElementContentAs(typeof(string), null);
break;
}
}
else
{
xmlReader.Read();
}
}
}
}
}
}

/// <inheritdoc cref="IGXMedia.Synchronous"/>
public object Synchronous { get; } = new object();

/// <inheritdoc cref="IGXMedia.IsSynchronous"/>
public bool IsSynchronous
{
get
{
bool reserved = System.Threading.Monitor.TryEnter(Synchronous, 0);
if (reserved)
{
System.Threading.Monitor.Exit(Synchronous);
}
return !reserved;
}
}

/// <inheritdoc cref="IGXMedia.ResetSynchronousBuffer"/>
public void ResetSynchronousBuffer()
{
lock (syncBase.receivedSync)
{
syncBase.receivedSize = 0;
}
}

/// <inheritdoc cref="IGXMedia.Validate"/>
public void Validate()
{
if (port == 0)
{
throw new Exception(Resources.InvalidBrokerPort);
}
if (string.IsNullOrEmpty(serverAddress))
{
throw new Exception(Resources.InvalidBrokerName);
}
if (string.IsNullOrEmpty(topic))
{
throw new Exception(Resources.InvalidTopic);
}
}

/// <summary>
/// Sent byte count.
/// </summary>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesSent
{
get;
private set;
}

/// <summary>
/// Received byte count.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="ResetByteCounters">ResetByteCounters</seealso>
[Browsable(false)]
public UInt64 BytesReceived
{
get;
private set;
}

/// <inheritdoc cref="IGXMedia.Eop"/>
public object Eop
{
get;
set;
}

/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
public AvailableMediaSettings ConfigurableSettings
{
get
{
return (AvailableMediaSettings)((IGXMedia)this).ConfigurableSettings;
}
set
{
((IGXMedia)this).ConfigurableSettings = (int)value;
}
}

/// <inheritdoc cref="IGXMedia.ConfigurableSettings"/>
int IGXMedia.ConfigurableSettings
{
get;
set;
}

/// <inheritdoc />
object IGXMedia.Tag { get; set; }

IGXMediaContainer IGXMedia.MediaContainer { get; set; }

/// <inheritdoc />
[Browsable(false), ReadOnly(true)]
public object SyncRoot
{
get
{
//In some special cases when binary serialization is used this might be null
//after deserialize. Just set it.
if (sync == null)
{
sync = new object();
}
return sync;
}
}

#if NET462_OR_GREATER || WINDOWS
/// <summary>
/// Shows MQTT Properties dialog.
/// </summary>
/// <param name="parent">Owner window of the Properties dialog.</param>
/// <returns>True, if the user has accepted the changes.</returns>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ServerAddress">HostName</seealso>
/// <seealso href="PropertiesDialog.html">Properties Dialog</seealso>
public bool Properties(System.Windows.Forms.Form parent)
{
return new PropertiesForm(PropertiesForm, Resources.SettingsTxt, IsOpen, Resources.OK, Resources.Cancel,
"https://www.gurux.fi/GXMQTTProperties").ShowDialog(parent) == System.Windows.Forms.DialogResult.OK;
}

/// <inheritdoc cref="IGXMedia.PropertiesForm"/>
public System.Windows.Forms.Form PropertiesForm
{
get
{
return new Settings(this);
}
}
#endif //NET462_OR_GREATER || WINDOWS

private void NotifyPropertyChanged(string info)
{
if (m_OnPropertyChanged != null)
{
m_OnPropertyChanged(this, new PropertyChangedEventArgs(info));
}
}

/// <summary>
/// Used topic.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="ClientId">ClientId</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used topic.")]
public string Topic
{
get
{
return topic;
}
set
{
if (topic != value)
{
topic = value;
NotifyPropertyChanged("Topic");
}
}
}

/// <summary>
/// Topic to subscribe to for meter responses.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets the topic to subscribe to.")]
public string SubscribeTopic
{
get
{
return subscribetopic;
}
set
{
if (subscribetopic != value)
{
subscribetopic = value;
NotifyPropertyChanged("SubscribeTopic");
}
}
}

/// <summary>
/// Retrieves or sets used client ID.
/// </summary>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
/// <seealso cref="Topic">Topic</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("Retrieves or sets used client ID.")]
public string ClientId
{
get
{
return userClientId;
}
set
{
if (userClientId != value)
{
userClientId = value;
NotifyPropertyChanged("ClientId");
}
}
}

/// <summary>
/// MQTT server address.
/// </summary>
/// <value>MQTT server address.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="Port">Port</seealso>
[DefaultValue("")]
[Category("Communication")]
[Description("MQTT server address.")]
public string ServerAddress
{
get
{
return serverAddress;
}
set
{
if (serverAddress != value)
{
serverAddress = value;
NotifyPropertyChanged("ServerAddress");
}
}
}

/// <summary>
/// MQTT server port number.
/// </summary>
/// <value>MQTT server port number.</value>
/// <seealso cref="Open">Open</seealso>
/// <seealso cref="ServerAddress">ServerAddress</seealso>
[DefaultValue(1883)]
[Category("Communication")]
[Description("MQTT server port number.")]
public int Port
{
get
{
return port;
}
set
{
if (port != value)
{
port = value;
NotifyPropertyChanged("Port");
}
}
}

/// <inheritdoc />
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append(serverAddress);
sb.Append(':');
sb.Append(port);
sb.Append(' ');
sb.Append(topic);
sb.Append(' ');
sb.Append(subscribetopic);
return sb.ToString();
}

/// <summary>
/// Occurs when a property value changes.
/// </summary>
public event PropertyChangedEventHandler PropertyChanged
{
add
{
m_OnPropertyChanged += value;
}
remove
{
m_OnPropertyChanged -= value;
}
}

/// <summary>
/// GXNet component sends received data through this method.
/// </summary>
[Description("GXNet component sends received data through this method.")]
public event ReceivedEventHandler OnReceived
{
add
{
m_OnReceived += value;
}
remove
{
m_OnReceived -= value;
}
}

/// <summary>
/// Errors that occur after the connection is established, are sent through this method.
/// </summary>
[Description("Errors that occur after the connection is established, are sent through this method.")]
public event ErrorEventHandler OnError
{
add
{

m_OnError += value;
}
remove
{
m_OnError -= value;
}
}

/// <summary>
/// Media component sends notification, when its state changes.
/// </summary>
[Description("Media component sends notification, when its state changes.")]
public event MediaStateChangeEventHandler OnMediaStateChange
{
add
{
m_OnMediaStateChange += value;
}
remove
{
m_OnMediaStateChange -= value;
}
}
#if WINDOWS_PHONE
event ClientConnectedEventHandler IGXMedia.OnClientConnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}

event ClientDisconnectedEventHandler IGXMedia.OnClientDisconnected
{
add {
throw new NotImplementedException();
}
remove {
throw new NotImplementedException();
}
}
#else
/// <summary>
/// Called when the client is establishing a connection with a Net Server.
/// </summary>
[Description("Called when the client is establishing a connection with a Net Server.")]
public event ClientConnectedEventHandler OnClientConnected
{
add
{
m_OnClientConnected += value;
}
remove
{
m_OnClientConnected -= value;
}
}

/// <summary>
/// Called when the client has been disconnected from the network server.
/// </summary>
[Description("Called when the client has been disconnected from the network server.")]
public event ClientDisconnectedEventHandler OnClientDisconnected
{
add
{
m_OnClientDisconnected += value;
}
remove
{
m_OnClientDisconnected -= value;
}
}

/// <inheritdoc />
[Description("Called when the Media is sending or receiving data.")]
public event TraceEventHandler OnTrace
{
add
{
m_OnTrace += value;
}
remove
{
m_OnTrace -= value;
}
}

#endif
/// <summary>
/// Publish message.
/// </summary>
/// <param name="msg"></param>
private async Task PublishMessageAsync(GXMessage msg)
{
string str;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
str = parser.Serialize(msg);
#else
str = System.Text.Json.JsonSerializer.Serialize(msg);
#endif
MqttApplicationMessage message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(str)
//.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build();
await mqttClient.PublishAsync(message);
}

/// <inheritdoc />
public void Close()
{
lastException = null;
if (mqttClient != null && mqttClient.IsConnected)
{
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Close, sender = clientId };
try
{
PublishMessageAsync(msg).Wait();
}
catch (Exception)
{
replyReceivedEvent.Set();
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000);
}
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait();
}
mqttClient = null;
if (lastException != null)
{
throw new Exception(lastException);
}
}
}

void IGXMedia.Copy(object target)
{
GXMqtt tmp = (GXMqtt)target;
port = tmp.port;
serverAddress = tmp.serverAddress;
topic = tmp.topic;
subscribetopic = tmp.subscribetopic;
}

private int HandleReceivedData(int bytes, byte[] buff, string sender)
{
BytesReceived += (uint)bytes;
if (this.IsSynchronous)
{
TraceEventArgs arg;
lock (syncBase.receivedSync)
{
int index = syncBase.receivedSize;
syncBase.AppendData(buff, 0, bytes);
if (bytes != 0 && Trace == TraceLevel.Verbose && m_OnTrace != null)
{
arg = new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null);
m_OnTrace(this, arg);
}
if (bytes != 0 && Eop != null) //Search Eop if given.
{
if (Eop is Array)
{
foreach (object eop in (Array)Eop)
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(eop), index, syncBase.receivedSize);
if (bytes != -1)
{
break;
}
}
}
else
{
bytes = Gurux.Common.GXCommon.IndexOf(syncBase.m_Received, Gurux.Common.GXCommon.GetAsByteArray(Eop), index, syncBase.receivedSize);
}
}
if (bytes != -1)
{
syncBase.receivedEvent.Set();
}
}
}
else
{
if (m_OnReceived != null)
{
syncBase.receivedSize = 0;
byte[] data = new byte[bytes];
Array.Copy(buff, data, bytes);
if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, data, null));
}
m_OnReceived(this, new ReceiveEventArgs(data, sender));
}
else if (Trace == TraceLevel.Verbose && m_OnTrace != null)
{
m_OnTrace(this, new TraceEventArgs(TraceTypes.Received, buff, 0, bytes, null));
}
}
return bytes;
}

/// <inheritdoc />
public void Open()
{
Close();
mqttClient = factory.CreateMqttClient();
if (string.IsNullOrEmpty(userClientId))
{
clientId = Guid.NewGuid().ToString();
}
else
{
clientId = userClientId;
}
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverAddress, port).WithClientId(clientId)
.WithCredentials("admin", "admin")
.Build();
mqttClient.ApplicationMessageReceivedAsync += async t =>
{
string str = ASCIIEncoding.ASCII.GetString(t.ApplicationMessage.Payload);
GXMessage msg;
#if NET462
var parser = new Gurux.Common.JSon.GXJsonParser();
msg = parser.Deserialize<GXMessage>(str);
#else
msg = System.Text.Json.JsonSerializer.Deserialize<GXMessage>(str);
#endif
//if (msg.id == messageId || (MessageType)msg.type == MessageType.Close || (MessageType)msg.type == MessageType.Exception)
{
switch ((MessageType)msg.type)
{
//case MessageType.Open:
// m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Open));
// replyReceivedEvent.Set();
// break;
case MessageType.Send:
break;
case MessageType.Receive:
byte[] bytes = Gurux.Common.GXCommon.HexToBytes(msg.frame);
replyReceivedEvent.Set();
if (bytes.Length != 0)
{
HandleReceivedData(bytes.Length, bytes, t.ClientId);
}
break;
case MessageType.Close:
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
break;
case MessageType.Exception:
lastException = msg.exception;
replyReceivedEvent.Set();
break;
}
}
//else
//{
// m_OnTrace?.Invoke(this, new TraceEventArgs(TraceTypes.Info, Resources.UnknownReply + msg, msg.sender));
//}
};
mqttClient.ConnectedAsync += async t =>
{
// Subscribe to the response topic. Await instead of blocking with Wait() inside the async handler.
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(subscribetopic)
//.WithExactlyOnceQoS()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
//.WithCleanSession()
.Build());
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Opening));
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId };
replyReceivedEvent.Reset();

//await PublishMessageAsync(msg);
//Send
};

mqttClient.DisconnectedAsync += async t =>
{
m_OnMediaStateChange?.Invoke(this, new MediaStateEventArgs(MediaState.Closed));
replyReceivedEvent.Set();
};
try
{
replyReceivedEvent.Reset();
if (AsyncWaitTime == 0)
{
mqttClient.ConnectAsync(options).Wait();
}
else
{
mqttClient.ConnectAsync(options).Wait((int)AsyncWaitTime * 1000);
}
}
catch (AggregateException ex)
{
if (mqttClient != null)
{
mqttClient.DisconnectAsync().Wait(10000);
}
mqttClient = null;
throw ex.InnerException;
}
if (AsyncWaitTime == 0)
{
replyReceivedEvent.WaitOne();
}
else
{
if (!replyReceivedEvent.WaitOne((int)AsyncWaitTime * 1000))
{
// The reply is expected on the subscribed topic, so report that one.
throw new TimeoutException(Resources.InvalidTopic + "'" + subscribetopic + "'.");
}
}
if (lastException != null)
{
throw new Exception(lastException);
}
}

/// <inheritdoc />
public bool Receive<T>(ReceiveParameters<T> args)
{
if (!IsOpen)
{
throw new InvalidOperationException(Resources.MediaIsClosed);
}
return syncBase.Receive(args);
}

/// <summary>
/// Resets BytesReceived and BytesSent counters.
/// </summary>
/// <seealso cref="BytesSent">BytesSent</seealso>
/// <seealso cref="BytesReceived">BytesReceived</seealso>
public void ResetByteCounters()
{
BytesSent = BytesReceived = 0;
}

/// <inheritdoc />
public void Send(object data)
{
(this as IGXMedia).Send(data, null);
}

void IGXMedia.Send(object data, string receiver)
{
byte[] tmp = (byte[])data;
if (tmp != null)
{
BytesSent += (ulong)tmp.Length;
GXMessage msg = new GXMessage() { id = MessageId, type = (int)MessageType.Send, sender = clientId, frame = Common.GXCommon.ToHex(tmp) };
PublishMessageAsync(msg).Wait();
}
}

/// <inheritdoc />
public uint AsyncWaitTime
{
get;
set;
}

/// <inheritdoc />
public EventWaitHandle AsyncWaitHandle
{
get
{
return replyReceivedEvent;
}
}

public uint ReceiveDelay
{
get;
set;
}

}
}
This is my code, modified to publish to one topic and subscribe to another. However, the AARQ is published to the publish topic only after an open message has been published to the subscribed topic. I want to publish the AARQ first, without waiting for any acknowledgement. Please help me with this implementation.
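
When a media class stores two topics in its settings string, each topic needs its own element name. If both are written under the same name, the second value overwrites the first when the string is read back. The following is a minimal sketch of such a round-trip, assuming the same fragment-style XML as the `Settings` property. The `<SubscribeTopic>` element name, the `TwoTopicSettingsSketch` class and the example topic strings are assumptions made for illustration and are not part of Gurux.MQTT.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical helper: only models how two topics could be stored and restored.
static class TwoTopicSettingsSketch
{
    // Write each topic under its own element name (names are assumptions).
    public static string Write(string publishTopic, string subscribeTopic)
    {
        return "<Topic>" + publishTopic + "</Topic>" + Environment.NewLine +
               "<SubscribeTopic>" + subscribeTopic + "</SubscribeTopic>" + Environment.NewLine;
    }

    public static void Read(string xml, out string publishTopic, out string subscribeTopic)
    {
        publishTopic = null;
        subscribeTopic = null;
        var settings = new XmlReaderSettings { ConformanceLevel = ConformanceLevel.Fragment };
        using (XmlReader reader = XmlReader.Create(new StringReader(xml), settings))
        {
            while (!reader.EOF)
            {
                if (reader.IsStartElement())
                {
                    switch (reader.Name)
                    {
                        case "Topic":
                            publishTopic = reader.ReadElementContentAsString();
                            break;
                        case "SubscribeTopic":
                            subscribeTopic = reader.ReadElementContentAsString();
                            break;
                        default:
                            // Unknown element: skip it so the loop always advances.
                            reader.Skip();
                            break;
                    }
                }
                else
                {
                    reader.Read();
                }
            }
        }
    }

    static void Main()
    {
        string xml = Write("meter/request", "meter/response");
        Read(xml, out string pub, out string sub);
        Console.WriteLine(pub + " | " + sub); // prints: meter/request | meter/response
    }
}
```

The `default` branch matters as much as the extra element: without it, an element the switch does not recognise is never consumed and the `while` loop does not terminate.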

Kurumi

2 years ago

Hi,

You need to make modifications like that yourself. Make the changes in the Development folder so that it can receive your topics.

BR,
Mikko

Yamini

2 years ago

Hi

I am using only the development code, but it is not working.

Yamini

2 years ago

How can I communicate with Wirepas from .NET? Is there any Wirepas .NET code available?

Yamini

2 years ago

Hi Mikko,

Can you please provide an example of communicating with a DLMS meter over MQTT using two topics, one for publishing and another for subscribing? I have tried many approaches, but I am stuck: I need to publish the AARQ without waiting for an acknowledgement, and it is not working.
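
The behaviour described here is consistent with `Open()` blocking on `replyReceivedEvent`. In the code posted earlier, the connected handler resets the handle, and nothing signals it until the message handler sees a reply on the subscribed topic. One possible way around this, assuming the meter needs no Open handshake, is to signal the handle once the subscription is in place. Below is a small self-contained model of only that wait-handle logic. `OpenWaitSketch` and its members are hypothetical stand-ins, the use of `ManualResetEvent` is an assumption, and no MQTT library is involved.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the media class; only the wait-handle logic is modelled.
class OpenWaitSketch
{
    private readonly ManualResetEvent replyReceivedEvent = new ManualResetEvent(false);

    // signalWhenSubscribed = false mimics a handler that only resets the handle;
    // true mimics signalling the handle as soon as the subscription is in place.
    public bool Open(bool signalWhenSubscribed, int timeoutMs)
    {
        replyReceivedEvent.Reset();
        OnConnected(signalWhenSubscribed);            // stands in for the ConnectedAsync handler
        return replyReceivedEvent.WaitOne(timeoutMs); // false means the wait timed out
    }

    private void OnConnected(bool signalWhenSubscribed)
    {
        // (the subscription to the response topic would happen here)
        if (signalWhenSubscribed)
        {
            replyReceivedEvent.Set();
        }
    }

    static void Main()
    {
        var media = new OpenWaitSketch();
        Console.WriteLine(media.Open(false, 200)); // False: Open() times out waiting for a reply
        Console.WriteLine(media.Open(true, 200));  // True: Open() returns and the AARQ can be sent
    }
}
```

In the posted class this would roughly correspond to calling `replyReceivedEvent.Set()` instead of `Reset()` at the end of the `ConnectedAsync` handler. Whether that suits a particular meter is untested here.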

Kurumi

2 years ago

Hi Yamini,

I can't say when an example for two topics will be added. It will be added if our customers ask for it.

BR,
Mikko

Yamini

2 years ago

Is there a Wirepas MQTT example?

Yamini

2 years ago

Hi Mikko,

Is there a Wirepas example in C#?

Kurumi

2 years ago

Hi Yamini,

There is no Wirepas example for C#. The Wirepas example is for the NIC, and it doesn't use MQTT.

BR,
Mikko

Yamini

2 years ago

Hi Mikko,

Thank you for your support.
