Skip to content

Commit

Permalink
Reconnect to server on lost connection
Browse files Browse the repository at this point in the history
  • Loading branch information
toasterofbread committed Dec 4, 2023
1 parent 515eadd commit 601fb82
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ actual class PlatformPlayerService: ZmqSpMsPlayerService(), PlayerService {
if (value == _repeat_mode) {
return
}
// _repeat_mode = value
// onEvent { it.onRepeatModeChanged(_repeat_mode) }
sendRequest("setRepeatMode", value.ordinal)
}
actual override var volume: Float
Expand All @@ -63,129 +61,36 @@ actual class PlatformPlayerService: ZmqSpMsPlayerService(), PlayerService {
if (value == _volume) {
return
}
// _volume = value
// onEvent { it.onVolumeChanged(_volume) }
sendRequest("setVolume", value)
}

actual override fun isPlayingOverLatentDevice(): Boolean = false // TODO

private fun onEvent(action: (PlayerListener) -> Unit) {
for (listener in listeners) {
action(listener)
listener.onEvents()
}
}

actual override fun play() {
// if (playlist.isEmpty() || _is_playing) {
// return
// }
//
// _is_playing = true
// updateCurrentSongPosition(current_song_time)
//
// onEvent { it.onPlayingChanged(_is_playing) }

sendRequest("play")
}

actual override fun pause() {
// if (playlist.isEmpty() || !_is_playing) {
// return
// }
//
// _is_playing = false
// updateCurrentSongPosition(System.currentTimeMillis() - current_song_time)
//
// onEvent { it.onPlayingChanged(_is_playing) }

sendRequest("pause")
}

actual override fun playPause() {
// if (playlist.isEmpty()) {
// return
// }
//
// val pos_ms = current_position_ms
// _is_playing = !_is_playing
// updateCurrentSongPosition(pos_ms)
//
// onEvent { it.onPlayingChanged(_is_playing) }

sendRequest("playPause")
}

actual override fun seekTo(position_ms: Long) {
// if (playlist.isEmpty()) {
// return
// }
//
// updateCurrentSongPosition(position_ms)
//
// onEvent { it.onSeeked(position_ms) }
sendRequest("seekToTime", position_ms)
}

actual override fun seekToSong(index: Int) {
// require(index in playlist.indices) { "$index | ${playlist.toList()}" }
//
// _current_song_index = index
// _duration_ms = 0
// updateCurrentSongPosition(0)
//
// onEvent {
// it.onSongTransition(playlist[index], true)
// it.onEvents()
// }

sendRequest("seekToItem", index)
}

actual override fun seekToNext() {
// if (playlist.isEmpty()) {
// return
// }
//
// val target_index = when (_repeat_mode) {
// MediaPlayerRepeatMode.NONE -> if (_current_song_index + 1 == playlist.size) return else _current_song_index + 1
// MediaPlayerRepeatMode.ONE -> _current_song_index
// MediaPlayerRepeatMode.ALL -> if (_current_song_index + 1 == playlist.size) 0 else _current_song_index + 1
// }
//
// _current_song_index = target_index
// current_song_time = 0
// _duration_ms = -1
//
// onEvent {
// it.onSongTransition(playlist[_current_song_index], true)
// it.onEvents()
// }

sendRequest("seekToNext")
}

actual override fun seekToPrevious() {
// if (playlist.isEmpty()) {
// return
// }
//
// val target_index = when (_repeat_mode) {
// MediaPlayerRepeatMode.NONE -> if (_current_song_index == 0) return else _current_song_index - 1
// MediaPlayerRepeatMode.ONE -> _current_song_index
// MediaPlayerRepeatMode.ALL -> if (_current_song_index == 0) playlist.size - 1 else _current_song_index - 1
// }
//
// _current_song_index = target_index
// current_song_time = 0
// _duration_ms = -1
//
// onEvent {
// it.onSongTransition(playlist[_current_song_index], true)
// it.onEvents()
// }

sendRequest("seekToPrevious")
}

Expand All @@ -194,55 +99,14 @@ actual class PlatformPlayerService: ZmqSpMsPlayerService(), PlayerService {
actual override fun getSong(index: Int): Song? = playlist.getOrNull(index)

actual override fun addSong(song: Song, index: Int) {
// require(index in 0..song_count)

// playlist.add(index, song)
sendRequest("addItem", song.id, index)

// if (_current_song_index < 0) {
// _current_song_index = 0
// }
// service_player.session_started = true
//
// onEvent {
// it.onSongAdded(index, song)
// it.onEvents()
// }
}

actual override fun moveSong(from: Int, to: Int) {
// require(from in 0 until song_count)
// require(to in 0 until song_count)
//
// if (playlist.size < 2) {
// return
// }
//
// playlist.add(to, playlist.removeAt(from))
//
// if (from == _current_song_index) {
// _current_song_index = to
// }
// else if (to == _current_song_index) {
// if (from < _current_song_index) {
// _current_song_index--
// }
// else {
// _current_song_index++
// }
// }
//
// onEvent { it.onSongMoved(from, to) }
sendRequest("moveItem", from, to)
}

actual override fun removeSong(index: Int) {
// playlist.removeAt(index)
// if (_current_song_index == playlist.size) {
// _current_song_index--
// }
//
// onEvent { it.onSongRemoved(index) }
sendRequest("removeItem", index)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.zeromq.ZMsg
import java.net.InetAddress

private const val POLL_STATE_INTERVAL: Long = 100
private const val POLL_TIMEOUT_MS: Long = 10000

abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
abstract val listeners: List<PlayerListener>
Expand Down Expand Up @@ -67,7 +68,7 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
}

private val zmq: ZContext = ZContext()
private var socket: ZMQ.Socket? = null
private lateinit var socket: ZMQ.Socket
private val json: Json = Json { ignoreUnknownKeys = true }
private val queued_messages: MutableList<Pair<String, List<Any>>> = mutableListOf()
private val poll_coroutine_scope: CoroutineScope = CoroutineScope(Job())
Expand Down Expand Up @@ -119,25 +120,8 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
override fun onCreate() {
context.getPrefs().addListener(prefs_listener)

socket = zmq.createSocket(SocketType.DEALER).apply {
suspend fun tryConnection() {
val server_url: String = getServerURL()
check(connect(getServerURL()))

if (!connectToServer(server_url)) {
disconnect(server_url)
}
}

connect_coroutine_scope.launch {
do {
cancel_connection = false
restart_connection = false
tryConnection()
}
while (restart_connection)
}
}
socket = zmq.createSocket(SocketType.DEALER)
socket.connectToServer()
}

override fun onDestroy() {
Expand All @@ -147,25 +131,33 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
context.getPrefs().removeListener(prefs_listener)
}

private inline fun tryTransaction(transaction: () -> Unit) {
while (true) {
try {
transaction()
break
}
catch (e: Throwable) {
if (e.javaClass.name != "org.sqlite.SQLiteException") {
throw e
private fun onSocketConnectionLost(expired_timeout_ms: Long) {
println("Connection to server timed out after ${expired_timeout_ms}ms, reconnecting...")
socket.connectToServer()
}

private fun ZMQ.Socket.connectToServer() {
connect_coroutine_scope.launch {
do {
cancel_connection = false
restart_connection = false

val server_url: String = getServerURL()
check(connect(getServerURL()))

if (!tryConnectToServer(server_url)) {
disconnect(server_url)
}
}
while (restart_connection)
}
}

private suspend fun ZMQ.Socket.connectToServer(url: String): Boolean = withContext(Dispatchers.IO) {
private suspend fun ZMQ.Socket.tryConnectToServer(url: String): Boolean = withContext(Dispatchers.IO) {
val client_info: SpMsClientInfo = SpMsClientInfo(getClientName(), SpMsClientType.HEADLESS)
val handshake_message: ZMsg = ZMsg()
handshake_message.add(json.encodeToString(client_info))
handshake_message.send(this@connectToServer)
handshake_message.send(this@tryConnectToServer)

socket_load_state = PlayerServiceLoadState(
true,
Expand Down Expand Up @@ -273,9 +265,16 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
updateCurrentSongPosition(state.current_position_ms.toLong())

poll_coroutine_scope.launchSingle(Dispatchers.IO) {
val context: ZMQ.Context = ZMQ.context(1)
val poller: ZMQ.Poller = context.poller()
poller.register(this@tryConnectToServer, ZMQ.Poller.POLLIN)

while (true) {
delay(POLL_STATE_INTERVAL)
pollServerState()
if (!pollServerState(poller, POLL_TIMEOUT_MS)) {
onSocketConnectionLost(POLL_TIMEOUT_MS)
break
}
}
}

Expand All @@ -289,8 +288,14 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
return@withContext true
}

private fun ZMQ.Socket.pollServerState() {
val events: ZMsg = ZMsg.recvMsg(socket ?: return)
private fun ZMQ.Socket.pollServerState(poller: ZMQ.Poller, timeout: Long = -1): Boolean {
val events: ZMsg
if (poller.poll(timeout) > 0) {
events = ZMsg.recvMsg(this)
}
else {
return false
}

for (i in 0 until events.size) {
val event_str: String = events.pop().data.decodeToString().removeSuffix("\u0000")
Expand Down Expand Up @@ -423,9 +428,9 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
reply.add(Gson().toJson(message.second))
}
}
reply.send(socket!!)

queued_messages.clear()

return reply.send(socket!!)
}
}

Expand All @@ -435,4 +440,18 @@ abstract class ZmqSpMsPlayerService: PlatformServiceImpl(), PlayerService {
receiveTimeOut = -1
return msg
}

private inline fun tryTransaction(transaction: () -> Unit) {
while (true) {
try {
transaction()
break
}
catch (e: Throwable) {
if (e.javaClass.name != "org.sqlite.SQLiteException") {
throw e
}
}
}
}
}

0 comments on commit 601fb82

Please sign in to comment.