Compare commits

...

9 Commits

13 changed files with 302 additions and 100 deletions
@@ -945,6 +945,9 @@ class Settings : FragmentedStorageFileJson() {
@FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3)
var connectThroughRelay: Boolean = true;
@FormField(R.string.connect_local_direct_through_relay, FieldForm.TOGGLE, R.string.connect_local_direct_through_relay_description, 3)
var connectLocalDirectThroughRelay: Boolean = true;
}
@FormField(R.string.info, FieldForm.GROUP, -1, 21)
@@ -176,7 +176,11 @@ class StateCasting {
fun stopDiscovering() {
_nsdManager?.apply {
_discoveryListeners.forEach {
stopServiceDiscovery(it.value)
try {
stopServiceDiscovery(it.value)
} catch (e: Throwable) {
//Ignored
}
}
}
}
@@ -72,6 +72,10 @@ class PackageBridge : V8Package {
fun buildSpecVersion(): Int {
return JSClientConstants.PLUGIN_SPEC_VERSION;
}
@V8Property
fun buildPlatform(): String {
return "android";
}
@V8Function
fun dispose(value: V8Value) {
@@ -2,9 +2,11 @@ package com.futo.platformplayer.fragment.mainactivity.main
import android.annotation.SuppressLint
import android.os.Bundle
import android.util.AttributeSet
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import androidx.core.view.allViews
import androidx.lifecycle.lifecycleScope
import com.futo.platformplayer.Settings
import com.futo.platformplayer.UIDialogs
@@ -23,6 +25,8 @@ import com.futo.platformplayer.models.SearchType
import com.futo.platformplayer.states.StateMeta
import com.futo.platformplayer.states.StatePlatform
import com.futo.platformplayer.views.FeedStyle
import com.futo.platformplayer.views.ToggleBar
import com.futo.platformplayer.views.others.RadioGroupView
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
@@ -114,6 +118,25 @@ class ContentSearchResultsFragment : MainFragment() {
}
setPreviewsEnabled(Settings.instance.search.previewFeedItems);
initializeToolbar();
}
fun initializeToolbar(){
if(_toolbarContentView.allViews.any { it is RadioGroupView })
_toolbarContentView.removeView(_toolbarContentView.allViews.find { it is RadioGroupView });
val radioGroup = RadioGroupView(context);
radioGroup.onSelectedChange.subscribe {
if (it.size != 1)
setSearchType(SearchType.VIDEO);
else
setSearchType((it[0] ?: SearchType.VIDEO) as SearchType);
}
radioGroup?.setOptions(listOf(Pair("Media", SearchType.VIDEO), Pair("Creators", SearchType.CREATOR), Pair("Playlists", SearchType.PLAYLIST)), listOf(_searchType), false, true)
_toolbarContentView.addView(radioGroup);
}
override fun cleanup() {
@@ -361,8 +361,7 @@ class StateSync {
_relaySession = SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
onNewChannel = { _, c ->
val remotePublicKey = c.remotePublicKey
@@ -407,12 +406,14 @@ class StateSync {
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay)
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
for ((targetKey, connectionInfo) in connectionInfos) {
val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses)
.filter { it != connectionInfo.remoteIp }
if (connectionInfo.allowLocalDirect) {
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
Thread {
try {
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
@@ -433,10 +434,10 @@ class StateSync {
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
try {
Log.v(TAG, "Attempting relayed connection with '$targetKey'.")
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
runBlocking { relaySession.startRelayedChannel(targetKey, APP_ID, null) }
} catch (e: Throwable) {
Log.e(TAG, "Failed to start relayed channel with $targetKey.", e)
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
}
}
}
@@ -444,7 +445,7 @@ class StateSync {
Thread.sleep(15000)
}
} catch (e: Throwable) {
Log.e(TAG, "Unhandled exception in relay session.", e)
Logger.e(TAG, "Unhandled exception in relay session.", e)
relaySession.stop()
}
}.start()
@@ -460,10 +461,10 @@ class StateSync {
Log.i(TAG, "Started relay session.")
} catch (e: Throwable) {
Log.e(TAG, "Relay session failed.", e)
Thread.sleep(5000)
} finally {
_relaySession?.stop()
_relaySession = null
Thread.sleep(5000)
}
}
}.apply { start() }
@@ -585,16 +586,33 @@ class StateSync {
Logger.i(TAG, "Received SyncSessionData from $remotePublicKey");
val subscriptionPackageString = StateSubscriptions.instance.getSyncSubscriptionsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptions b (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, subscriptionPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptions (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
val subscriptionGroupPackageString = StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups b (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, subscriptionGroupPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
val syncPlaylistPackageString = StatePlaylists.instance.getSyncPlaylistsPackageString()
Logger.i(TAG, "syncStateExchange syncPlaylists b (size: ${syncPlaylistPackageString.length})")
session.sendData(GJSyncOpcodes.syncPlaylists, syncPlaylistPackageString)
Logger.i(TAG, "syncStateExchange syncPlaylists (size: ${syncPlaylistPackageString.length})")
val watchLaterPackageString = Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false))
Logger.i(TAG, "syncStateExchange syncWatchLater b (size: ${watchLaterPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, watchLaterPackageString);
Logger.i(TAG, "syncStateExchange syncWatchLater (size: ${watchLaterPackageString.length})")
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
Logger.i(TAG, "syncStateExchange syncHistory b (size: ${recentHistory.size})")
if(recentHistory.isNotEmpty())
session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
Logger.i(TAG, "syncStateExchange syncHistory (size: ${recentHistory.size})")
}
GJSyncOpcodes.syncExport -> {
@@ -737,6 +755,9 @@ class StateSync {
val json = String(dataBody, Charsets.UTF_8);
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
if (history.size == 1) {
Logger.i(TAG, "SyncHistory received update video '${history[0].video.name}' (url: ${history[0].video.url}) at timestamp ${history[0].position}");
}
var lastHistory = OffsetDateTime.MIN;
for(video in history){
@@ -822,7 +843,17 @@ class StateSync {
}
},
dataHandler = { it, opcode, subOpcode, data ->
handleData(it, opcode, subOpcode, data)
val dataCopy = ByteArray(data.remaining())
data.get(dataCopy)
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
try {
handleData(it, opcode, subOpcode, ByteBuffer.wrap(dataCopy))
} catch (e: Throwable) {
Logger.e(TAG, "Exception occurred while handling data, closing session", e)
it.close()
}
}
},
remoteDeviceName
)
@@ -857,8 +888,7 @@ class StateSync {
return SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
onClose = { s ->
if (channelSocket != null)
session?.removeChannel(channelSocket!!)
@@ -5,9 +5,11 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
import java.util.zip.GZIPOutputStream
interface IChannel : AutoCloseable {
val remotePublicKey: String?
@@ -15,7 +17,7 @@ interface IChannel : AutoCloseable {
var authorizable: IAuthorizable?
var syncSession: SyncSession?
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null)
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
val linkType: LinkType
}
@@ -49,9 +51,9 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel {
onData?.invoke(session, this, opcode, subOpcode, data)
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) {
if (data != null) {
session.send(opcode, subOpcode, data)
session.send(opcode, subOpcode, data, contentEncoding)
} else {
session.send(opcode, subOpcode)
}
@@ -183,51 +185,70 @@ class ChannelRelayed(
}
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, ce: ContentEncoding?) {
throwIfDisposed()
val actualCount = data?.remaining() ?: 0
var contentEncoding: ContentEncoding? = ce
var processedData = data
if (data != null && contentEncoding == ContentEncoding.Gzip) {
val isGzipSupported = opcode == Opcode.DATA.value
if (isGzipSupported) {
val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
}
}
val ENCRYPTION_OVERHEAD = 16
val CONNECTION_ID_SIZE = 8
val HEADER_SIZE = 6
val HEADER_SIZE = 7
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.size: ${processedData?.remaining()})")
if (processedData != null && processedData.remaining() > MAX_DATA_PER_PACKET) {
val streamId = session.generateStreamId()
val totalSize = actualCount
var sendOffset = 0
while (sendOffset < totalSize) {
val bytesRemaining = totalSize - sendOffset
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining)
while (sendOffset < processedData.remaining()) {
val bytesRemaining = processedData.remaining() - sendOffset
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - HEADER_SIZE + 4, bytesRemaining)
val streamData: ByteArray
val streamOpcode: StreamOpcode
if (sendOffset == 0) {
streamOpcode = StreamOpcode.START
streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend)
streamData = ByteArray(4 + HEADER_SIZE + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(totalSize)
putInt(processedData.remaining())
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position() + sendOffset, bytesToSend)
put(contentEncoding?.value?.toByte() ?: 0.toByte())
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
} else {
streamData = ByteArray(4 + 4 + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(sendOffset)
put(data.array(), data.position() + sendOffset, bytesToSend)
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
}
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamData.size + 2)
putInt(streamData.size + HEADER_SIZE - 4)
put(Opcode.STREAM.value.toByte())
put(streamOpcode.value.toByte())
put(ContentEncoding.Raw.value.toByte())
put(streamData)
}
@@ -235,12 +256,13 @@ class ChannelRelayed(
sendOffset += bytesToSend
}
} else {
val packet = ByteArray(HEADER_SIZE + actualCount)
val packet = ByteArray(HEADER_SIZE + (processedData?.remaining() ?: 0))
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(actualCount + 2)
putInt((processedData?.remaining() ?: 0) + HEADER_SIZE - 4)
put(opcode.toByte())
put(subOpcode.toByte())
if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount)
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
if (processedData != null && processedData.remaining() > 0) put(processedData.array(), processedData.position(), processedData.remaining())
}
sendPacket(packet)
}
@@ -333,4 +355,8 @@ class ChannelRelayed(
completeHandshake(remoteVersion, transport)
}
}
companion object {
private val TAG = "Channel"
}
}
@@ -0,0 +1,6 @@
package com.futo.platformplayer.sync.internal
enum class ContentEncoding(val value: UByte) {
Raw(0u),
Gzip(1u)
}
@@ -196,14 +196,14 @@ class SyncSession : IAuthorizable {
}
fun sendData(subOpcode: UByte, data: String) {
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
}
fun send(opcode: UByte, subOpcode: UByte, data: String) {
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) {
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) {
val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() }
if (channels.isEmpty()) {
//TODO: Should this throw?
@@ -214,11 +214,13 @@ class SyncSession : IAuthorizable {
var sent = false
for (channel in channels) {
try {
channel.send(opcode, subOpcode, data)
channel.send(opcode, subOpcode, data, contentEncoding)
sent = true
break
} catch (e: Throwable) {
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode)", e)
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode), closing channel", e)
channel.close()
removeChannel(channel)
}
}
@@ -9,26 +9,35 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
import kotlinx.coroutines.CompletableDeferred
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.net.NetworkInterface
import java.net.Socket
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.GZIPOutputStream
import kotlin.math.min
import kotlin.system.measureTimeMillis
import kotlin.time.measureTime
class SyncSocketSession {
private val _inputStream: LittleEndianDataInputStream
private val _outputStream: LittleEndianDataOutputStream
private val _socket: Socket
private val _inputStream: InputStream
private val _outputStream: OutputStream
private val _sendLockObject = Object()
private val _buffer = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _bufferDecrypted = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBuffer = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBufferEncrypted = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _sendBufferEncrypted = ByteArray(4 + MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _syncStreams = hashMapOf<Int, SyncStream>()
private var _streamIdGenerator = 0
private val _streamIdGeneratorLock = Object()
@@ -81,8 +90,7 @@ class SyncSocketSession {
constructor(
remoteAddress: String,
localKeyPair: DHState,
inputStream: LittleEndianDataInputStream,
outputStream: LittleEndianDataOutputStream,
socket: Socket,
onClose: ((session: SyncSocketSession) -> Unit)? = null,
onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null,
onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null,
@@ -90,8 +98,12 @@ class SyncSocketSession {
onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null,
isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?, appId: UInt) -> Boolean)? = null
) {
_inputStream = inputStream
_outputStream = outputStream
_socket = socket
_socket.receiveBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.sendBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.tcpNoDelay = true
_inputStream = _socket.getInputStream()
_outputStream = _socket.getOutputStream()
_onClose = onClose
_onHandshakeComplete = onHandshakeComplete
_localKeyPair = localKeyPair
@@ -150,30 +162,45 @@ class SyncSocketSession {
}.apply { start() }
}
private fun readExact(buffer: ByteArray, offset: Int, size: Int) {
var totalBytesReceived: Int = 0
while (totalBytesReceived < size) {
val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived)
if (bytesReceived == 0)
throw Exception("Socket disconnected")
totalBytesReceived += bytesReceived
}
}
private fun receiveLoop() {
while (_started) {
try {
val messageSize = _inputStream.readInt()
//Logger.v(TAG, "Waiting for message size...")
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
//Logger.v(TAG, "Read message size ${messageSize}.")
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
//Logger.i(TAG, "Receiving message (size = ${messageSize})")
var bytesRead = 0
while (bytesRead < messageSize) {
val read = _inputStream.read(_buffer, bytesRead, messageSize - bytesRead)
if (read == -1)
throw Exception("Stream closed")
bytesRead += read
}
readExact(_buffer, 0, messageSize)
//Logger.v(TAG, "Read ${messageSize}.")
//Logger.v(TAG, "Decrypting ${messageSize} bytes.")
val plen: Int = _cipherStatePair!!.receiver.decryptWithAd(null, _buffer, 0, _bufferDecrypted, 0, messageSize)
//Logger.i(TAG, "Decrypted message (size = ${plen})")
//Logger.v(TAG, "Decrypted ${messageSize} bytes.")
handleData(_bufferDecrypted, plen, null)
//Logger.v(TAG, "Handled data ${messageSize} bytes.")
} catch (e: Throwable) {
Logger.e(TAG, "Exception while receiving data", e)
Logger.e(TAG, "Exception while receiving data, closing socket session", e)
stop()
break
}
}
@@ -203,8 +230,7 @@ class SyncSocketSession {
_channels.values.forEach { it.close() }
_channels.clear()
_onClose?.invoke(this)
_inputStream.close()
_outputStream.close()
_socket.close()
_thread = null
_cipherStatePair?.sender?.destroy()
_cipherStatePair?.receiver?.destroy()
@@ -237,18 +263,25 @@ class SyncSocketSession {
val mainBuffer = ByteArray(512)
val mainLength = initiator.writeMessage(mainBuffer, 0, null, 0, 0)
val messageData = ByteBuffer.allocate(4 + 4 + pairingMessageLength + mainLength).order(ByteOrder.LITTLE_ENDIAN)
val messageSize = 4 + 4 + pairingMessageLength + mainLength
val messageData = ByteBuffer.allocate(4 + messageSize).order(ByteOrder.LITTLE_ENDIAN)
messageData.putInt(messageSize)
messageData.putInt(appId.toInt())
messageData.putInt(pairingMessageLength)
if (pairingMessageLength > 0) messageData.put(pairingMessage)
messageData.put(mainBuffer, 0, mainLength)
val messageDataArray = messageData.array()
_outputStream.writeInt(messageDataArray.size)
_outputStream.write(messageDataArray)
_outputStream.write(messageDataArray, 0, 4 + messageSize)
readExact(_buffer, 0, 4)
val responseSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (responseSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val responseSize = _inputStream.readInt()
val responseMessage = ByteArray(responseSize)
_inputStream.readFully(responseMessage)
readExact(responseMessage, 0, responseSize)
val plaintext = ByteArray(512) // Buffer for any payload (none expected here)
initiator.readMessage(responseMessage, 0, responseSize, plaintext, 0)
@@ -265,11 +298,16 @@ class SyncSocketSession {
responder.localKeyPair.copyFrom(_localKeyPair)
responder.start()
val messageSize = _inputStream.readInt()
val message = ByteArray(messageSize)
_inputStream.readFully(message)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val message = ByteArray(messageSize)
readExact(message, 0, messageSize)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
val appId = messageBuffer.int.toUInt()
val pairingMessageLength = messageBuffer.int
val pairingMessage = if (pairingMessageLength > 0) ByteArray(pairingMessageLength).also { messageBuffer.get(it) } else byteArrayOf()
@@ -298,10 +336,10 @@ class SyncSocketSession {
return false
}
val responseBuffer = ByteArray(512)
val responseLength = responder.writeMessage(responseBuffer, 0, null, 0, 0)
_outputStream.writeInt(responseLength)
_outputStream.write(responseBuffer, 0, responseLength)
val responseBuffer = ByteArray(4 + 512)
val responseLength = responder.writeMessage(responseBuffer, 4, null, 0, 0)
ByteBuffer.wrap(responseBuffer).order(ByteOrder.LITTLE_ENDIAN).putInt(responseLength)
_outputStream.write(responseBuffer, 0, 4 + responseLength)
_cipherStatePair = responder.split()
_remotePublicKey = remotePublicKey
@@ -311,8 +349,13 @@ class SyncSocketSession {
private fun performVersionCheck() {
val CURRENT_VERSION = 4
val MINIMUM_VERSION = 4
_outputStream.writeInt(CURRENT_VERSION)
remoteVersion = _inputStream.readInt()
val versionBytes = ByteArray(4)
ByteBuffer.wrap(versionBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(CURRENT_VERSION)
_outputStream.write(versionBytes, 0, 4)
readExact(versionBytes, 0, 4)
remoteVersion = ByteBuffer.wrap(versionBytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
Logger.i(TAG, "performVersionCheck (version = $remoteVersion)")
if (remoteVersion < MINIMUM_VERSION)
throw Exception("Invalid version")
@@ -321,25 +364,44 @@ class SyncSocketSession {
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, ce: ContentEncoding? = null) {
ensureNotMainThread()
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
var contentEncoding: ContentEncoding? = ce
var processedData = data
if (contentEncoding == ContentEncoding.Gzip) {
val isGzipSupported = opcode == Opcode.DATA.value
if (isGzipSupported) {
val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
}
}
if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
val segmentData = ByteArray(segmentSize)
var sendOffset = 0
val id = generateStreamId()
while (sendOffset < data.remaining()) {
val bytesRemaining = data.remaining() - sendOffset
while (sendOffset < processedData.remaining()) {
val bytesRemaining = processedData.remaining() - sendOffset
var bytesToSend: Int
var segmentPacketSize: Int
val streamOp: StreamOpcode
if (sendOffset == 0) {
streamOp = StreamOpcode.START
bytesToSend = segmentSize - 4 - 4 - 1 - 1
segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1
bytesToSend = segmentSize - 4 - HEADER_SIZE
segmentPacketSize = bytesToSend + 4 + HEADER_SIZE
} else {
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA
@@ -348,12 +410,13 @@ class SyncSocketSession {
ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(id)
putInt(if (streamOp == StreamOpcode.START) data.remaining() else sendOffset)
putInt(if (streamOp == StreamOpcode.START) processedData.remaining() else sendOffset)
if (streamOp == StreamOpcode.START) {
put(opcode.toByte())
put(subOpcode.toByte())
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
}
put(data.array(), data.position() + sendOffset, bytesToSend)
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
@@ -362,17 +425,19 @@ class SyncSocketSession {
} else {
synchronized(_sendLockObject) {
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(data.remaining() + 2)
putInt(processedData.remaining() + HEADER_SIZE - 4)
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position(), data.remaining())
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
put(processedData.array(), processedData.position(), processedData.remaining())
}
//Logger.i(TAG, "Encrypting message (size = ${data.size + HEADER_SIZE})")
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, data.remaining() + HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, processedData.remaining() + HEADER_SIZE)
val sendDuration = measureTimeMillis {
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
//Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
}
}
}
@@ -385,14 +450,15 @@ class SyncSocketSession {
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
_sendBuffer.asUByteArray()[4] = opcode
_sendBuffer.asUByteArray()[5] = subOpcode
_sendBuffer.asUByteArray()[6] = ContentEncoding.Raw.value
//Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})")
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
}
@@ -403,7 +469,7 @@ class SyncSocketSession {
private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) {
val length = data.remaining()
if (length < HEADER_SIZE)
throw Exception("Packet must be at least 6 bytes (header size)")
throw Exception("Packet must be at least ${HEADER_SIZE} bytes (header size)")
val size = data.int
if (size != length - 4)
@@ -411,7 +477,10 @@ class SyncSocketSession {
val opcode = data.get().toUByte()
val subOpcode = data.get().toUByte()
handlePacket(opcode, subOpcode, data, sourceChannel)
val contentEncoding = data.get().toUByte()
//Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel)
}
private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
@@ -759,9 +828,23 @@ class SyncSocketSession {
}
}
private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
private fun handlePacket(opcode: UByte, subOpcode: UByte, d: ByteBuffer, contentEncoding: UByte, sourceChannel: ChannelRelayed?) {
Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})")
var data = d
if (contentEncoding == ContentEncoding.Gzip.value) {
val isGzipSupported = opcode == Opcode.DATA.value
if (!isGzipSupported)
throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).")
val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
data = ByteBuffer.wrap(compressedStream.toByteArray())
}
when (opcode) {
Opcode.PING.value -> {
if (sourceChannel != null)
@@ -799,8 +882,9 @@ class SyncSocketSession {
val expectedSize = data.int
val op = data.get().toUByte()
val subOp = data.get().toUByte()
val ce = data.get().toUByte()
val syncStream = SyncStream(expectedSize, op, subOp)
val syncStream = SyncStream(expectedSize, op, subOp, ce)
if (data.remaining() > 0) {
syncStream.add(data.array(), data.position(), data.remaining())
}
@@ -845,7 +929,7 @@ class SyncSocketSession {
throw Exception("After sync stream end, the stream must be complete")
}
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, sourceChannel)
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, contentEncoding, sourceChannel)
}
}
Opcode.DATA.value -> {
@@ -1025,7 +1109,7 @@ class SyncSocketSession {
send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes)
}
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray): Boolean {
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray, contentEncoding: ContentEncoding? = null): Boolean {
val keyBytes = key.toByteArray(Charsets.UTF_8)
if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes")
if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required")
@@ -1080,7 +1164,7 @@ class SyncSocketSession {
}
}
packet.rewind()
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet)
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, ce = contentEncoding)
} catch (e: Exception) {
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
throw e
@@ -1200,6 +1284,6 @@ class SyncSocketSession {
private const val TAG = "SyncSocketSession"
const val MAXIMUM_PACKET_SIZE = 65535 - 16
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
const val HEADER_SIZE = 6
const val HEADER_SIZE = 7
}
}
@@ -1,6 +1,6 @@
package com.futo.platformplayer.sync.internal
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) {
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte, val contentEncoding: UByte) {
companion object {
const val MAXIMUM_SIZE = 10_000_000
}
@@ -45,6 +45,10 @@ open class ChannelView : LinearLayout {
_buttonSubscribe = findViewById(R.id.button_subscribe);
_platformIndicator = findViewById(R.id.platform_indicator);
//_textName.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
//_creatorThumbnail.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
//_textMetadata.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
if (_tiny) {
_buttonSubscribe.visibility = View.GONE;
_textMetadata.visibility = View.GONE;
@@ -66,8 +70,11 @@ open class ChannelView : LinearLayout {
open fun bind(content: IPlatformContent) {
isClickable = true;
if(content !is IPlatformChannelContent)
return
if(content !is IPlatformChannelContent) {
currentChannel = null;
return;
}
currentChannel = content;
_creatorThumbnail.setThumbnail(content.thumbnail, false);
_textName.text = content.name;
@@ -13,6 +13,17 @@ class RadioGroupView : FlexboxLayout {
val selectedOptions = arrayListOf<Any?>();
val onSelectedChange = Event1<List<Any?>>();
constructor(context: Context) : super(context) {
flexWrap = FlexWrap.WRAP;
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
if (isInEditMode) {
setOptions(listOf("Example 1" to 1, "Example 2" to 2, "Example 3" to 3, "Example 4" to 4, "Example 5" to 5), listOf("Example 1", "Example 2"),
multiSelect = true,
atLeastOne = false
);
}
}
constructor(context: Context, attrs: AttributeSet) : super(context, attrs) {
flexWrap = FlexWrap.WRAP;
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
+2
View File
@@ -382,6 +382,8 @@
<string name="pair_through_relay_description">Allow devices to be paired through the relay</string>
<string name="connect_through_relay">Connection through relay</string>
<string name="connect_through_relay_description">Allow devices to be connected to through the relay</string>
<string name="connect_local_direct_through_relay">Connect direct through relay</string>
<string name="connect_local_direct_through_relay_description">Allow devices to be directly locally connected to through information discovered from the relay</string>
<string name="gesture_controls">Gesture controls</string>
<string name="volume_slider">Volume slider</string>
<string name="volume_slider_descr">Enable slide gesture to change volume</string>