Skip to content

Commit

Permalink
Implement merge behavior to the ngsild bridge and update Scorpio Version
Browse files Browse the repository at this point in the history
Update operation of ngsild bridge now exclusively uses batch processed
merge patch operation. Tests are adapted to reflect behavioral change.
New tests are added to verify new behavior. Scorpio version upgraded
to v5.0.5.

Signed-off-by: Meric Feyzullahoglu <[email protected]>
  • Loading branch information
MericFeyz committed Dec 13, 2024
1 parent 128617c commit 3096624
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 110 deletions.
20 changes: 20 additions & 0 deletions KafkaBridge/lib/ngsild.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,26 @@ function fiwareApi (conf) {
return rest.postBody({ options, body: data });
};

/**
* Run batch merge operation on the entities
* @param {array[Object]} entities - Array of JSON patches to merge
* @param {array[Object]} headers - additional headers
*/
this.batchMerge = function (entities, { headers }) {
headers = headers || {};
headers['Content-Type'] = 'application/ld+json';

const options = {
hostname: config.ngsildServer.hostname,
protocol: config.ngsildServer.protocol,
port: config.ngsildServer.port,
path: '/ngsi-ld/v1/entityOperations/merge',
headers: headers,
method: 'POST'
};
return rest.postBody({ options, body: entities });
};

/**
* Helpers
*/
Expand Down
22 changes: 9 additions & 13 deletions KafkaBridge/lib/ngsildUpdates.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,16 @@ module.exports = function NgsildUpdates (conf) {
try {
// update the entity - do not create it
if (op === 'update') {
// NOTE: The batch update API of Scorpio does not yet support noOverwrite options. For the time being
// the batch processing will be done sequentially - until this is fixed in Scorpio
for (const entity of entities) { // olet i = 0; i < entities.length; i ++) {
// basic health check of entity
if (entity.id === undefined || entity.id == null) {
logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity));
} else {
logger.debug('Updating: ' + JSON.stringify(entities));
result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers });
if (result.statusCode !== 204 && result.statusCode !== 207) {
logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
}
// Only batch merge is run
if (entities === undefined || entities == null) {
logger.error('Unhealthy entities - ignoring it:' + JSON.stringify(entities));
} else {
logger.debug('Updating: ' + JSON.stringify(entities));
result = await ngsild.batchMerge(entities, { headers });
if (result.statusCode !== 204 && result.statusCode !== 207) {
logger.error('Entity cannot run merge:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
}
};
}
} else if (op === 'upsert') {
// in this case, entity will be created if not existing
logger.debug('Upserting: ' + JSON.stringify(entities));
Expand Down
41 changes: 41 additions & 0 deletions KafkaBridge/test/testLibNgsild.js
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,44 @@ describe('Test updateEntities', function () {
revert();
});
});
describe('Test batchMerge', function () {
it('Should use correct options and headers', async function () {
const Logger = function () {
return logger;
};
const Rest = function () {
return rest;
};
const headers = { Authorization: 'Bearer token' };
const expectedOptions = {
hostname: 'hostname',
protocol: 'http:',
port: 1234,
method: 'POST',
path: '/ngsi-ld/v1/entityOperations/merge',
headers: {
'Content-Type': 'application/ld+json',
Authorization: 'Bearer token'
}
};
const rest = {
postBody: function (obj) {
assert.deepEqual(obj.options, expectedOptions);
assert.deepEqual(obj.body, entities);
return Promise.resolve('merged');
}
};

const entities = [
{ id: 'id1', type: 'type1', attr1: 'value1' },
{ id: 'id2', type: 'type2', attr2: 'value2' }
];

const revert = ToTest.__set__('Logger', Logger);
ToTest.__set__('Rest', Rest);
const ngsild = new ToTest(config);
const result = await ngsild.batchMerge(entities, { headers });
result.should.equal('merged');
revert();
});
});
86 changes: 46 additions & 40 deletions KafkaBridge/test/testLibNgsildUpdates.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const logger = {
const addSyncOnAttribute = function () {};

describe('Test libNgsildUpdates', function () {
it('Should post body with correct path and token for nonOverwrite update', async function () {
let updatePropertiesCalled = false;
it('Should post entities with correct path and token for nonOverwrite update using batchMerge', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -66,19 +66,20 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type' });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
assert.deepEqual(entities, body.entities);
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
statusCode: 204
});
});
},
// Stub updateProperties if needed
updateProperties: function () {},
replaceEntities: function () {

}
};
};
Expand Down Expand Up @@ -108,11 +109,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and filter out datasetId === "@none"', async function () {
let updatePropertiesCalled = false;
it('Should post entities and filter out datasetId === "@none"', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -151,19 +152,24 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { value: 'value' } });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
// Check top-level properties
assert.equal(entity.id, 'id');
assert.equal(entity.type, 'type');

// Check attribute properties
assert.isUndefined(entity.attribute.datasetId, 'datasetId should be filtered out');
assert.property(entity.attribute, 'value');
assert.equal(entity.attribute.value, 'value');
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
statusCode: 204
});
});
},
replaceEntities: function () {
}
};
};
Expand Down Expand Up @@ -192,11 +198,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and filter out datasetId === "@none" from attribute array', async function () {
let updatePropertiesCalled = false;
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -241,11 +247,12 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
assert.deepEqual(entity.id, 'id');
assert.deepEqual(entity, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -282,11 +289,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and not filter out datasetId !== "@none"', async function () {
let updatePropertiesCalled = false;
it('Should post entities and not filter out datasetId !== "@none"', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -325,11 +332,9 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { datasetId: 'https://example.com/source1', value: 'value' } });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
assert.deepEqual(entities, body.entities);
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -366,7 +371,7 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body with correct path and token for nonOverwrite upsert', async function () {
Expand Down Expand Up @@ -446,7 +451,7 @@ describe('Test libNgsildUpdates', function () {
revert();
});
it('Should post body with string entity', async function () {
let updatePropertiesCalled = false;
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -481,11 +486,12 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type' });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
assert.deepEqual(entity.id, 'id');
assert.deepEqual(entity, { id: 'id', type: 'type' });
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -524,7 +530,7 @@ describe('Test libNgsildUpdates', function () {
const ngsildUpdates = new ToTest(config);
body.entities = JSON.stringify(body.entities);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
});
Expand Down
Loading

0 comments on commit 3096624

Please sign in to comment.