Compare commits

...

2 Commits

@@ -89,6 +89,8 @@ class FCastCastingDevice : CastingDevice {
private var _version: Long = 1; private var _version: Long = 1;
private var _thread: Thread? = null private var _thread: Thread? = null
private var _pingThread: Thread? = null private var _pingThread: Thread? = null
private var _lastPongTime = -1L
private var _outputStreamLock = Object()
constructor(name: String, addresses: Array<InetAddress>, port: Int) : super() { constructor(name: String, addresses: Array<InetAddress>, port: Int) : super() {
this.name = name; this.name = name;
@@ -208,7 +210,13 @@ class FCastCastingDevice : CastingDevice {
private fun invokeInIOScopeIfRequired(action: () -> Unit): Boolean { private fun invokeInIOScopeIfRequired(action: () -> Unit): Boolean {
if(Looper.getMainLooper().thread == Thread.currentThread()) { if(Looper.getMainLooper().thread == Thread.currentThread()) {
_scopeIO?.launch { action(); } _scopeIO?.launch {
try {
action();
} catch (e: Throwable) {
Logger.e(TAG, "Failed to invoke in IO scope.", e)
}
}
return true; return true;
} }
@@ -313,12 +321,12 @@ class FCastCastingDevice : CastingDevice {
localAddress = _socket?.localAddress; localAddress = _socket?.localAddress;
connectionState = CastConnectionState.CONNECTED; connectionState = CastConnectionState.CONNECTED;
_lastPongTime = -1L
val buffer = ByteArray(4096); val buffer = ByteArray(4096);
Logger.i(TAG, "Started receiving."); Logger.i(TAG, "Started receiving.");
var exceptionOccurred = false; while (_scopeIO?.isActive == true) {
while (_scopeIO?.isActive == true && !exceptionOccurred) {
try { try {
val inputStream = _inputStream ?: break; val inputStream = _inputStream ?: break;
Log.d(TAG, "Receiving next packet..."); Log.d(TAG, "Receiving next packet...");
@@ -330,9 +338,8 @@ class FCastCastingDevice : CastingDevice {
val size = ((buffer[3].toLong() shl 24) or (buffer[2].toLong() shl 16) or (buffer[1].toLong() shl 8) or buffer[0].toLong()).toInt(); val size = ((buffer[3].toLong() shl 24) or (buffer[2].toLong() shl 16) or (buffer[1].toLong() shl 8) or buffer[0].toLong()).toInt();
if (size > buffer.size) { if (size > buffer.size) {
Logger.w(TAG, "Skipping packet that is too large $size bytes.") Logger.w(TAG, "Packets larger than $size bytes are not supported.")
inputStream.skip(size.toLong()); break
continue;
} }
Log.d(TAG, "Received header indicating $size bytes. Waiting for message."); Log.d(TAG, "Received header indicating $size bytes. Waiting for message.");
@@ -353,19 +360,21 @@ class FCastCastingDevice : CastingDevice {
try { try {
handleMessage(Opcode.find(opcode), json); handleMessage(Opcode.find(opcode), json);
} catch (e: Throwable) { } catch (e: Throwable) {
Logger.w(TAG, "Failed to handle message.", e); Logger.w(TAG, "Failed to handle message.", e)
break
} }
} catch (e: java.net.SocketException) { } catch (e: java.net.SocketException) {
Logger.e(TAG, "Socket exception while receiving.", e); Logger.e(TAG, "Socket exception while receiving.", e);
exceptionOccurred = true; break
} catch (e: Throwable) { } catch (e: Throwable) {
Logger.e(TAG, "Exception while receiving.", e); Logger.e(TAG, "Exception while receiving.", e);
exceptionOccurred = true; break
} }
} }
try { try {
_socket?.close(); _socket?.close()
_socket = null
Logger.i(TAG, "Socket disconnected."); Logger.i(TAG, "Socket disconnected.");
} catch (e: Throwable) { } catch (e: Throwable) {
Logger.e(TAG, "Failed to close socket.", e) Logger.e(TAG, "Failed to close socket.", e)
@@ -386,10 +395,26 @@ class FCastCastingDevice : CastingDevice {
try { try {
send(Opcode.Ping) send(Opcode.Ping)
} catch (e: Throwable) { } catch (e: Throwable) {
Log.w(TAG, "Failed to send ping."); Log.w(TAG, "Failed to send ping.")
try {
_socket?.close()
} catch (e: Throwable) {
Log.w(TAG, "Failed to close socket.", e)
}
} }
Thread.sleep(5000); if (_lastPongTime != -1L && System.currentTimeMillis() - _lastPongTime > 6000) {
Logger.w(TAG, "Closing socket due to last pong time being larger than 6 seconds.")
try {
_socket?.close()
} catch (e: Throwable) {
Log.w(TAG, "Failed to close socket.", e)
}
}
Thread.sleep(2000)
} }
Logger.i(TAG, "Stopped ping loop."); Logger.i(TAG, "Stopped ping loop.");
@@ -446,39 +471,42 @@ class FCastCastingDevice : CastingDevice {
Logger.i(TAG, "Remote version received: $version") Logger.i(TAG, "Remote version received: $version")
} }
Opcode.Ping -> send(Opcode.Pong) Opcode.Ping -> send(Opcode.Pong)
Opcode.Pong -> _lastPongTime = System.currentTimeMillis()
else -> { } else -> { }
} }
} }
private fun send(opcode: Opcode, message: String? = null) { private fun send(opcode: Opcode, message: String? = null) {
try { synchronized (_outputStreamLock) {
val data: ByteArray = message?.encodeToByteArray() ?: ByteArray(0) try {
val size = 1 + data.size val data: ByteArray = message?.encodeToByteArray() ?: ByteArray(0)
val outputStream = _outputStream val size = 1 + data.size
if (outputStream == null) { val outputStream = _outputStream
Log.w(TAG, "Failed to send $size bytes, output stream is null.") if (outputStream == null) {
return Log.w(TAG, "Failed to send $size bytes, output stream is null.")
return
}
val serializedSizeLE = ByteArray(4)
serializedSizeLE[0] = (size and 0xff).toByte()
serializedSizeLE[1] = (size shr 8 and 0xff).toByte()
serializedSizeLE[2] = (size shr 16 and 0xff).toByte()
serializedSizeLE[3] = (size shr 24 and 0xff).toByte()
outputStream.write(serializedSizeLE)
val opcodeBytes = ByteArray(1)
opcodeBytes[0] = opcode.value
outputStream.write(opcodeBytes)
if (data.isNotEmpty()) {
outputStream.write(data)
}
Log.d(TAG, "Sent $size bytes: (opcode: $opcode, body: $message).")
} catch (e: Throwable) {
Log.i(TAG, "Failed to send message.", e)
throw e
} }
val serializedSizeLE = ByteArray(4)
serializedSizeLE[0] = (size and 0xff).toByte()
serializedSizeLE[1] = (size shr 8 and 0xff).toByte()
serializedSizeLE[2] = (size shr 16 and 0xff).toByte()
serializedSizeLE[3] = (size shr 24 and 0xff).toByte()
outputStream.write(serializedSizeLE)
val opcodeBytes = ByteArray(1)
opcodeBytes[0] = opcode.value
outputStream.write(opcodeBytes)
if (data.isNotEmpty()) {
outputStream.write(data)
}
Log.d(TAG, "Sent $size bytes: (opcode: $opcode, body: $message).")
} catch (e: Throwable) {
Log.i(TAG, "Failed to send message.", e)
throw e
} }
} }