Skip to content

Commit

Permalink
Refactor websocket code to $oh namespace & Pass access token throug…
Browse files Browse the repository at this point in the history
…h header (#2907)

Depens on openhab/openhab-core#4515.

This refactors the WebSocket connection code from #2884 to the `$oh`
namespace, same as it is for the SSE logic.
It also passes the access token as WebSocket subprotocol so it is sent
with the `Sec-WebSocket-Protocol` header.

---------

Signed-off-by: Florian Hotze <[email protected]>
  • Loading branch information
florian-h05 authored Dec 31, 2024
1 parent f424e2b commit 85616e1
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 53 deletions.
2 changes: 2 additions & 0 deletions bundles/org.openhab.ui/web/src/js/openhab/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import api from './api'
import auth from './auth'
import sse from './sse'
import ws from './ws'
import media from './media'
import speech from './speech'
import utils from './utils'
Expand All @@ -9,6 +10,7 @@ export default {
api,
auth,
sse,
ws,
media,
speech,
utils
Expand Down
105 changes: 105 additions & 0 deletions bundles/org.openhab.ui/web/src/js/openhab/ws.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { getAccessToken } from './auth'

const HEARTBEAT_MESSAGE = `{
"type": "WebSocketEvent",
"topic": "openhab/websocket/heartbeat",
"payload": "PING",
"source": "WebSocketTestInstance"
}`

const openWSConnections = []

function newWSConnection (path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval) {
// Create a new WebSocket connection
const socket = new WebSocket(path, [`org.openhab.ws.accessToken.base64.${btoa(getAccessToken())}`, 'org.openhab.ws.protocol.default'])

// Handle WebSocket connection opened
socket.onopen = (event) => {
socket.setKeepalive(heartbeatInterval)
if (readyCallback) {
readyCallback(event)
}
}

// Handle WebSocket message received
socket.onmessage = (event) => {
let evt = event.data
try {
evt = JSON.parse(event.data)
} catch (e) {
console.error('Error while parsing message', e)
}
messageCallback(evt)
}

// Handle WebSocket error
socket.onerror = (event) => {
console.error('WebSocket error', event)
if (errorCallback) {
errorCallback(event)
}
}

// WebSocket keep alive
socket.setKeepalive = (seconds = 5) => {
console.debug('Setting keepalive interval seconds', seconds)
socket.clearKeepalive()
socket.keepaliveTimer = setInterval(() => {
if (heartbeatCallback) {
heartbeatCallback()
} else {
socket.send(HEARTBEAT_MESSAGE)
}
}, seconds * 1000)
}

socket.clearKeepalive = () => {
if (socket.keepaliveTimer) clearInterval(socket.keepaliveTimer)
delete socket.keepaliveTimer
}

// Add the new WebSocket connection to the list
openWSConnections.push(socket)
console.debug(`new WS connection: ${socket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)

return socket
}

export default {
/**
* Connect to the websocket at the given path.
*
* @param {string} path path to connect to, e.g. `/ws`
* @param {fn} messageCallback
* @param {fn} [readyCallback=null]
* @param {fn} [errorCallback=null]
* @param {fn} [heartbeatCallback=null] heartbeat callback to use instead of the default PING/PONG
* @param {number} [heartbeatInterval=5] heartbeat interval in seconds
* @return {WebSocket}
*/
connect (path, messageCallback, readyCallback = null, errorCallback = null, heartbeatCallback = null, heartbeatInterval = 5) {
return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval)
},
/**
* Close the given websocket connection.
*
* @param {WebSocket} socket
* @param {fn} [callback=null] callback to execute on connection close
*/
close (socket, callback = null) {
if (!socket) return
if (openWSConnections.indexOf(socket) >= 0) {
openWSConnections.splice(openWSConnections.indexOf(socket), 1)
}
console.debug(`WS connection closed: ${socket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)
socket.onclose = (event) => {
if (callback) {
callback(event)
}
}
socket.close()
socket.clearKeepalive()
}
}
68 changes: 15 additions & 53 deletions bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue
Original file line number Diff line number Diff line change
Expand Up @@ -312,24 +312,14 @@
</style>

<script lang="ts">
import Vue from 'vue'
import auth from '@/components/auth-mixin.js'
import { getAccessToken, getTokenInCustomHeader, getBasicCredentials } from '@/js/openhab/auth.js'
export default {
mixins: [auth],
components: {
},
data () {
return {
stateConnecting: false,
stateConnected: false,
stateProcessing: true,
scrollTime: 0,
autoScroll: true,
socket: {},
keepAliveTimer: null,
socket: null,
defaultLogLevel: 'WARN',
logPackageInputText: '',
highlightFilters: [],
Expand Down Expand Up @@ -425,44 +415,21 @@ export default {
this.loggerPackages = this.loggerPackages.filter(loggerPackage => loggerPackage.loggerName !== logger.loggerName)
},
socketConnect () {
this.stateConnecting = true
// Create a new WebSocket connection
const wsUrl = '/ws/logs?accessToken=' + getAccessToken()
this.socket = new WebSocket(wsUrl)
const me = this
// Event handler when the WebSocket connection is opened
this.socket.onopen = function () {
me.stateConnected = true
me.stateConnecting = false
me.stateProcessing = true
me.$nextTick(() => me.scrollToBottom())
const readyCallback = () => {
this.stateConnected = true
this.stateProcessing = true
this.$nextTick(() => this.scrollToBottom())
}
// Event handler when a message is received from OpenHAB
this.socket.onmessage = function (event) {
try {
const data = JSON.parse(event.data)
me.addLogEntry(data)
} catch (e) {
console.error('Error parsing event data:', e)
}
const messageCallback = (event) => {
this.addLogEntry(event)
}
// Event handler for WebSocket errors
this.socket.onerror = function (error) {
console.error('WebSocket error:', error)
}
// Event handler when the WebSocket connection is closed
this.socket.onclose = function () {
me.stateConnected = false
me.stateConnecting = false
const keepaliveCallback = () => {
this.socket.send('[]')
}
this.keepAliveTimer = setTimeout(this.keepAlive, 9000)
this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, readyCallback, null, keepaliveCallback, 9)
// TEMP
// for (let i = 0; i < 1980; i++) {
Expand All @@ -474,15 +441,10 @@ export default {
// })
// }
},
keepAlive () {
if (this.socket && this.stateConnected) {
this.socket.send('[]')
this.keepAliveTimer = setTimeout(this.keepAlive, 9000)
} else {
if (this.keepAliveTimer) {
clearTimeout(this.keepAliveTimer)
}
}
socketClose () {
this.$oh.ws.close(this.socket, () => {
this.stateConnected = false
})
},
renderEntry (entity) {
let tr = document.createElement('tr')
Expand Down Expand Up @@ -574,7 +536,7 @@ export default {
},
loggingStop () {
this.stateConnected = false
this.socket.close()
this.socketClose()
},
clearLog () {
this.tableData.length = 0
Expand Down

0 comments on commit 85616e1

Please sign in to comment.