Skip to content

Commit

Permalink
Parse OPCUA LocalizedText and pass 'lang' value to iff agent & mqtt
Browse files Browse the repository at this point in the history
The OPCUA Connector does forward Localized Tex to the value which provides
an inconsistent data view. In this PR, the Localized text is decomposed into
a string a and language vaue which is compatible with Semantic Web and NGSI-LD.

Signed-off-by: marcel <[email protected]>
  • Loading branch information
wagmarcel authored and abhijith-hr committed Nov 11, 2024
1 parent 8e7513c commit 5794a32
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 10 deletions.
11 changes: 11 additions & 0 deletions NgsildAgent/lib/DataSubmission.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ class DataSubmission {
msg.properties.keys = ['datasetId'];
}
}
if (msg.l !== undefined || msg.l === null) {
const lang = msg.l;
delete msg.l;
if (msg.properties === undefined) {
msg.properties = {};
msg.properties.values = [];
msg.properties.keys = [];
}
msg.properties.values.push(lang);
msg.properties.keys.push('lang');
}
if (msg.on === undefined || msg.on === null) {
msg.on = new Date().getTime();
}
Expand Down
3 changes: 3 additions & 0 deletions NgsildAgent/lib/schemas/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
},
"i": {
"type": "string"
},
"l": {
"type": "string"
}
},
"required": [
Expand Down
79 changes: 75 additions & 4 deletions NgsildAgent/test/DataSubmission.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,13 @@ describe('DataSubmission', function () {
validateIriStub.validateIri.withArgs('dataset1').returns(false);

// Mock validator to return null (valid)
const processedMsg = {
const processedMsg = [{
n: 'Property/metric1',
v: 100,
on: 1627891234567,
properties: { values: ['dataset1'], keys: ['datasetId'] }
};
validatorFunction.withArgs(processedMsg).returns(null);
}];
validatorFunction.returns(null);

// Act
await submissionCallback(msgs);
Expand All @@ -294,10 +294,81 @@ describe('DataSubmission', function () {

// acknowledge should be called with msgKeys
expect(dbManagerMock.acknowledge).to.have.been.calledOnce;
expect(connectorStub.dataSubmit.firstCall.args[0]).to.deep.equal(processedMsg);

});

it('should handle messages with lang', async function () {
// Arrange
const msgs = [
{
n: 'metric1',
v: "test",
l: 'lang',
on: 1627891234667,
},
];

// Mock validator to return null (valid)
const processedMsg = [{
n: 'Property/metric1',
v: "test",
on: 1627891234667,
properties: { values: ['lang'], keys: ['lang'] }
}];
validatorFunction.returns(null);

// Act
await submissionCallback(msgs);

// Assert
// preInsert should be called with status 1
expect(dbManagerMock.preInsert).to.have.been.calledOnce;

// connector.dataSubmit should be called with modified message
expect(connectorStub.dataSubmit).to.have.been.calledOnce;

expect(dbManagerMock.acknowledge).to.have.been.calledWith([]);
// acknowledge should be called with msgKeys
expect(dbManagerMock.acknowledge).to.have.been.calledOnce;
expect(connectorStub.dataSubmit.firstCall.args[0]).to.deep.equal(processedMsg);
});

it('should handle messages with lang and datasetId', async function () {
// Arrange
const msgs = [
{
n: 'metric1',
v: "test",
l: 'lang',
d: 'dataset2',
on: 1627891234667,
},
];

// Mock validator to return null (valid)
const processedMsg = [{
n: 'Property/metric1',
v: "test",
on: 1627891234667,
properties: { values: ['dataset2', 'lang'], keys: ['datasetId', 'lang'] }
}];
validatorFunction.returns(null);

// Act
await submissionCallback(msgs);

// Assert
// preInsert should be called with status 1
expect(dbManagerMock.preInsert).to.have.been.calledOnce;

// connector.dataSubmit should be called with modified message
expect(connectorStub.dataSubmit).to.have.been.calledOnce;

// acknowledge should be called with msgKeys
expect(dbManagerMock.acknowledge).to.have.been.calledOnce;
expect(connectorStub.dataSubmit.firstCall.args[0]).to.deep.equal(processedMsg);
});

it('should handle validation failures', async function () {
// Arrange
const msgs = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
try:
asyncua = importlib.import_module('asyncua')
asyncua_client = asyncua.Client
ua = asyncua.ua
except ImportError:
asyncua_client = None
print("The 'asyncua' library is not installed. Please install it separately to use this functionality.")
Expand Down Expand Up @@ -73,6 +74,11 @@ async def subscribe(map, firmware, sleeptime=5):
print(f"Warning Could not retrieve data for nodeid {map['connectorAttribute']}.")
value = None
print(f"Value {value} received")
map['value'] = Literal(value)
if isinstance(value, ua.LocalizedText):
map['value'] = Literal(value.Text)
map['lang'] = value.Locale
else:
map['value'] = Literal(value)
map['lang'] = None
map['updated'] = True
await asyncio.sleep(sleeptime)
12 changes: 9 additions & 3 deletions semantic-model/dataservice/startDataservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,22 @@ def send(results, attribute, entityId, dryrun, port):
result = results[datasetId]
value = result['value']
type = result['type']
lang = None
if 'lang' in result:
lang = result['lang']
datasetId = result
prefix = "Property"
if type == prefixes['ngsi-ld'].Relationship:
prefix = "Relationship"
elif isinstance(value, URIRef):
prefix = "PropertyIri"
# Send over mqtt/device-agent

payload.append(f'{{ "n": "{attribute}",\
"v": "{value.toPython()}", "t": "{prefix}", "i": "{entityId}"}}')
payload_build = f'{{ "n": "{attribute}",\
"v": "{value.toPython()}", "t": "{prefix}", "i": "{entityId}"'
if lang is not None:
payload_build += f', "l": "{lang}"'
payload_build += '}'
payload.append(payload_build)
payloads = f'[{",".join(payload)}]'
if not dryrun:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
9 changes: 7 additions & 2 deletions semantic-model/dataservice/tests/server/binding_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Copyright (c) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -26,11 +25,13 @@

ua = None
Server = None
LocalizedText = None

try:
asyncua = importlib.import_module('asyncua')
ua = asyncua.ua
Server = asyncua.Server
LocalizedText = ua.LocalizedText
except ImportError:
asyncua_client = None
print("The 'asyncua' library is not installed. Please install it separately to use this functionality.")
Expand Down Expand Up @@ -61,6 +62,8 @@ async def update_values(variables):
value = random.randint(0, 100)
elif datatype == str:
value = generate_random_string(12) # Generate a 12 character random string
elif datatype == LocalizedText:
value = LocalizedText(generate_random_string(12), "en") # Random string with English locale
else:
value = "UnknownType"

Expand Down Expand Up @@ -120,6 +123,8 @@ async def setup_opcua_server(mapping_data):
value = random.randint(0, 100)
elif data['datatype'] == str:
value = generate_random_string(12)
elif data['datatype'] == LocalizedText:
value = LocalizedText(generate_random_string(12), "en")
else:
value = "UnknownType"

Expand Down Expand Up @@ -180,7 +185,7 @@ def parse_rdf_to_mapping(rdf_file, base_ns, binding_ns=None, uaentity_ns=None):
elif datatype_uri == opcua_ns.String:
datatype = str
elif datatype_uri == opcua_ns.LocalizedText:
datatype = str
datatype = LocalizedText
else:
print(f"Warning, could not determine python type for {datatype_uri}. Using default: str.")
datatype = str # Default to string if not recognized
Expand Down

0 comments on commit 5794a32

Please sign in to comment.