Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot add datasource timesync #733

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/core/datasource/DataSource.datasource.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class DataSource {
async onDisconnect(){}

reset() {}

}

export default DataSource;
31 changes: 19 additions & 12 deletions source/core/datasource/TimeSeries.datasource.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,17 @@ class TimeSeriesDatasource extends DataSource {
async initDataSource(properties) {
await super.initDataSource(properties);
return new Promise(async (resolve, reject) => {
const topics = {
data: this.getTopicId(),
time: this.getTimeTopicId()
};
if(this.dataSynchronizer) {
topics.sync = dataSynchronizer.getTimeTopicId()
}

this.postMessage({
message: 'topics',
topics: {
data: this.getTopicId(),
time: this.getTimeTopicId()
},
topics: topics,
}, async () => {
// listen for Events to callback to subscriptions
const datasourceBroadcastChannel = new BroadcastChannel(this.getTimeTopicId());
Expand Down Expand Up @@ -207,7 +212,7 @@ class TimeSeriesDatasource extends DataSource {
reconnect= false,
mode= this.getMode()) {


await this.checkInit();
let intersectStartTime = startTime;
let intersectEndTime = endTime;

Expand All @@ -227,13 +232,15 @@ class TimeSeriesDatasource extends DataSource {
intersectEndTime = (endDelta < 0) ? this.getMaxTime() : endTime;
}

return this.updateProperties({
startTime: intersectStartTime,
endTime: intersectEndTime,
replaySpeed: replaySpeed,
reconnect : reconnect,
mode: mode
});
if(intersectEndTime !== this.getMinTime() || intersectEndTime !== this.getMaxTime()) {
return this.updateProperties({
startTime: intersectStartTime,
endTime: intersectEndTime,
replaySpeed: replaySpeed,
reconnect: reconnect,
mode: mode
});
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions source/core/datasource/common/handler/TimeSeries.handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ class DelegateReplayHandler extends DelegateHandler {
this.timeBc = new BroadcastChannel(this.timeTopic);
this.timeBc.onmessage = async (event) => {
if (event.data.type === EventType.MASTER_TIME) {

masterTimestamp = event.data.timestamp;
if (masterTimestamp >= endTimestamp) {
await this.disconnect();
Expand All @@ -178,10 +177,12 @@ class DelegateReplayHandler extends DelegateHandler {
// less than 5 sec
if (dTimestamp <= prefetchBatchDuration) {
// request next batch
data = await this.context.nextBatch();
if (!this.status.cancel && data.length > 0) {
this.handleData(data);
lastTimestamp = data[data.length - 1].timestamp;
if (!this.status.cancel) {
data = await this.context.nextBatch();
if (data.length > 0) {
this.handleData(data);
lastTimestamp = data[data.length - 1].timestamp;
}
}
}
fetching = false;
Expand Down Expand Up @@ -229,6 +230,7 @@ class DelegateReplayHandler extends DelegateHandler {
this.context.disconnect();
if (isDefined(this.timeBc)) {
this.timeBc.close();
this.timeBc = undefined;
}
this.initialized = false;
} catch (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ class SweApiReplayContext extends SweApiContext {

console.warn(`fetching ${relativeStartTime} -> ` +
`${this.properties.endTime} for datasource ${this.properties.dataSourceId}`);
this.collection = await this.replayFunction(properties, relativeStartTime, this.properties.endTime);
// if disconnected, replay function is reset
if(this.replayFunction) {
this.collection = await this.replayFunction(properties, relativeStartTime, this.properties.endTime);
}
}

const fetchNext = async () => {
Expand Down
140 changes: 103 additions & 37 deletions source/core/timesync/DataSynchronizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class DataSynchronizer {
this.dataSources = properties.dataSources || [];
this.replaySpeed = properties.replaySpeed || 1;
this.timerResolution = properties.timerResolution || 5;
this.masterTimeRefreshRate = properties.masterTimeRefreshRate || 250,
this.mode = properties.mode || Mode.REPLAY;
this.masterTimeRefreshRate = properties.masterTimeRefreshRate || 250;
this.mode = properties.mode || Mode.REPLAY;
this.initialized = false;
this.properties = {};
this.properties.replaySpeed = this.replaySpeed;
Expand All @@ -63,7 +63,6 @@ class DataSynchronizer {
this.properties.startTime = 'now';
this.properties.endTime = '2055-01-01Z';
}

}

getTopicId() {
Expand Down Expand Up @@ -287,44 +286,81 @@ class DataSynchronizer {
* Adds a new DataSource object to the list of datasources to synchronize.
* note: don't forget to call reset() to be sure to re-init the synchronizer internal properties.
* @param {Datasource} dataSource - the new datasource to add
* @param [lazy=false] lazy - add to current running synchronizer
*/
async addDataSource(dataSource, lazy = false) {
dataSource.checkInit();
if(lazy) {
return new Promise(async resolve => {
async addDataSource(dataSource) {
return new Promise(async resolve => {
if (!this.initialized) {
console.log(`DataSynchronizer not initialized yet, add DataSource ${dataSource.id} as it`);
this.dataSources.push(dataSource);
this.onTimeChanged(this.getMinTime(), this.getMaxTime());
} else {
const dataSourceForWorker = await this.createDataSourceForWorker(dataSource);
const lastTimestamp = (await this.getCurrentTime()).data;
if (isDefined(lastTimestamp)) {
const minDsTimestamp = new Date(dataSource.getMinTime()).getTime();
const maxDsTimestamp = new Date(dataSource.getMaxTime()).getTime();
const current = lastTimestamp + 1000;
if(current > minDsTimestamp && current < maxDsTimestamp) {
await dataSource.setTimeRange(
new Date(lastTimestamp + 1000).toISOString(),
);
}
}
this.dataSources.push(dataSource);
await this.postMessage({
message: 'add',
dataSources: [dataSourceForWorker]
}, async () => {
if (this.dataSources.length === 1) {
await this.postMessage({
message: 'update-properties',
mode: this.mode,
replaySpeed: this.replaySpeed,
startTime: this.getMinTime(),
endTime: this.getMaxTime()
}, () => {
this.onTimeChanged(this.getMinTime(), this.getMaxTime());
this.onAddedDataSource(dataSource.id);
resolve();
});
} else {
this.onTimeChanged(this.getMinTime(), this.getMaxTime());
this.onAddedDataSource(dataSource.id);
resolve();
}
});
await dataSource.connect();
resolve();
});
} else {
this.dataSources.push(dataSource);
}
}
});
}

/**
* Removes a DataSource object from the list of datasources of the synchronizer.
* @param {DataSource} dataSource - the new datasource to add
* @param [lazy=false] lazy - remove from the current running synchronizer
*/
async removeDataSource(dataSource, lazy = false) {
if(lazy) {
return new Promise(async resolve => {
this.dataSources = this.dataSources.filter( elt => elt.id !== dataSource.getId());
await this.postMessage({
message: 'remove',
dataSources: [dataSource.getId()]
});
await dataSource.disconnect();
resolve();
});
} else {
async removeDataSource(dataSource) {
if(!this.initialized) {
this.dataSources = this.dataSources.filter( elt => elt.id !== dataSource.getId());
this.onTimeChanged(this.getMinTime(),this.getMaxTime());
} else {
return new Promise(async (resolve, reject) => {
try {
this.dataSources = this.dataSources.filter(elt => elt.id !== dataSource.getId());
await this.postMessage({
message: 'remove',
dataSourceIds: [dataSource.getId()]
});
await dataSource.disconnect();
if(this.dataSources.length > 0) {
this.onTimeChanged(this.getMinTime(), this.getMaxTime());
} else {
await this.reset();
}
this.onRemovedDataSource(dataSource.id);
resolve();
}catch (ex) {
reject(ex);
}
});
}
}

Expand All @@ -348,8 +384,12 @@ class DataSynchronizer {
* Connects all dataSources
*/
async connect() {
await this.checkInit();
await this.doConnect();
if((this.mode === Mode.REPLAY && this.dataSources.length === 0)) {
return;
} else {
await this.checkInit();
await this.doConnect();
}
}

async checkInit() {
Expand Down Expand Up @@ -444,8 +484,11 @@ class DataSynchronizer {
startTime: startTime,
endTime: endTime
}, () => {
for (let ds of this.dataSources) {
ds.setTimeRange(startTime, endTime, replaySpeed, reconnect, mode);
if(this.dataSources.length > 0 ) {
for (let ds of this.dataSources) {
ds.setTimeRange(startTime, endTime, replaySpeed, reconnect, mode);
}
this.onTimeChanged(this.getMinTime(),this.getMaxTime());
}
this.mode = mode;
resolve();
Expand All @@ -458,6 +501,18 @@ class DataSynchronizer {
ds.updateProperties(properties);
}
}

resetTimes() {
this.lastTime = {
start: undefined,
end: undefined
};
this.startTime = 0;
this.minTime = 0;
this.maxTime = 0;
this.endTime = 0;
this.lastTime = 0;
}
/**
* Resets reference time
*/
Expand All @@ -466,7 +521,9 @@ class DataSynchronizer {
await this.checkInit();
await this.postMessage({
message: 'reset'
}, resolve);
});
this.resetTimes();
resolve();
});
}

Expand All @@ -485,12 +542,16 @@ class DataSynchronizer {
* Connect the dataSource then the protocol will be opened as well.
*/
async isConnected() {
for (let ds of this.dataSources) {
if (!(await ds.isConnected())) {
return false;
}
if(this.dataSources.length === 0) {
return false;
} else {
await this.checkInit();
return new Promise(async resolve => {
await this.postMessage({
message: 'is-connected'
}, (message) => resolve(message.data));
});
}
return true;
}

async postMessage(props, Fn, checkInit = true) {
Expand All @@ -516,5 +577,10 @@ class DataSynchronizer {
}
}
}
onTimeChanged(start, min){}

onRemovedDataSource(dataSourceId){}

onAddedDataSource(dataSourceId){}
}
export default DataSynchronizer;
33 changes: 21 additions & 12 deletions source/core/timesync/DataSynchronizer.worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ async function handleMessage(event) {
} else if (event.data.message === 'connect') {
startMasterTimeInterval(masterTimeRefreshRate);
dataSynchronizerAlgo.checkStart();
} else if (event.data.message === 'remove' && event.data.dataSources) {
} else if(event.data.message === 'is-connected') {
console.log(isDefined(masterTimeInterval), isDefined(dataSynchronizerAlgo), isDefined(dataSynchronizerAlgo.interval))
data = {
message: 'is-connected',
data: isDefined(masterTimeInterval) && isDefined(dataSynchronizerAlgo) && isDefined(dataSynchronizerAlgo.interval)
};
} else if (event.data.message === 'remove' && event.data.dataSourceIds) {
console.log('Remove datasource from synchronizer..')
await removeDataSources(event.data.dataSources);
await removeDataSources(event.data.dataSourceIds);
} else if (event.data.message === 'current-time') {
data = {
message: 'current-time',
Expand Down Expand Up @@ -160,9 +166,11 @@ function initBroadcastChannel(dataTopic, timeTopic) {
} else if(event.data.type === EventType.STATUS) {
const dataSourceId = event.data.dataSourceId;
dataSynchronizerAlgo.setStatus(dataSourceId, event.data.status);
console.log(dataSources[dataSourceId].name + ": status=" + event.data.status);
// bubble the message
bcChannels[dataSourceId].postMessage(event.data);
if(dataSourceId in bcChannels) {
console.log(dataSources[dataSourceId].name + ": status=" + event.data.status);
bcChannels[dataSourceId].postMessage(event.data);
}
}
}

Expand Down Expand Up @@ -192,19 +200,20 @@ function addDataSource(dataSource) {

/**
*
* @param dataSources
* @param dataSourceIds
*/
async function removeDataSources(dataSources) {
for(let dataSource of dataSources) {
await removeDataSource(dataSource);
async function removeDataSources(dataSourceIds) {
for(let dataSourceId of dataSourceIds) {
await removeDataSource(dataSourceId);
}
}

async function removeDataSource(dataSource) {
await dataSynchronizerAlgo.removeDataSource(dataSource);
async function removeDataSource(dataSourceId) {
await dataSynchronizerAlgo.removeDataSource(dataSourceId);
// create a BC to push back the synchronized data into the DATA Stream.
delete bcChannels[dataSource.id];
delete dataSources[dataSource.id];
console.log('deleting BC for datasource '+dataSourceId);
delete bcChannels[dataSourceId];
delete dataSources[dataSourceId];
}

function checkMasterTime() {
Expand Down
2 changes: 1 addition & 1 deletion source/core/timesync/DataSynchronizerAlgo.realtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class DataSynchronizerAlgoRealtime extends DataSynchronizerAlgo {
}
}
const dClock = (performance.now() - refClockTime);
this.tsRun = tsRef + dClock;
// compute next data to return
for (let currentDsId in this.dataSourceMap) {
currentDs = this.dataSourceMap[currentDsId];
this.tsRun = tsRef + dClock;
if (currentDs.dataBuffer.length > 0) {
const dTs = (currentDs.dataBuffer[0].data.timestamp - tsRef);
const dClockAdj = dClock - maxLatency;
Expand Down
Loading