Compare commits

..

30 Commits

Author SHA1 Message Date
Koen J e0b5e7b808 Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-06 16:55:51 +02:00
Koen J ac3a8da002 Various fixes for android to android pairing. 2025-05-06 16:54:58 +02:00
Kelvin 1aa45c2156 Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-06 13:23:10 +02:00
Kelvin 3cf8abd409 Fix racecondition watchlater adds 2025-05-06 13:23:00 +02:00
Koen J db8426779c Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-06 13:04:53 +02:00
Koen J b419e033f3 Casting device more clearly communicates when not ready. Implemented backoffs for SyncServer. 2025-05-06 13:04:42 +02:00
Kelvin d686fa327b Incorrect gzip compression 2025-05-06 12:40:24 +02:00
Kelvin a1ce5eda43 Fix synced ImageVariables showing black images 2025-05-06 11:53:30 +02:00
Koen J 1e790d1aa9 Added toggle to be able to disable local functionality for sync. Sync now automatically closes when pairing is successful. Pairing in progress layouts now properly show again. 2025-05-05 13:34:52 +02:00
Koen J d1d304b758 Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-05 12:00:15 +02:00
Koen J e12b500144 Sync disabled by default. 2025-05-05 12:00:04 +02:00
Kelvin bd77651a1e Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-05 11:18:57 +02:00
Kelvin 35dc186395 Login edgecase fix 2025-05-05 11:18:46 +02:00
Koen J 07e78e0d12 Fixed sending packets without data in sync protocol. 2025-05-05 10:41:21 +02:00
Koen J 5b8905c1d2 Made ipv6 casting URL fix. 2025-05-04 00:50:12 +02:00
Koen J 158a27cbae Casting fixes. 2025-05-03 21:20:19 +02:00
Koen J 5769b39d78 Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-03 21:05:23 +02:00
Koen J 5c96262c75 Crashfix. 2025-05-03 21:05:12 +02:00
Koen J 766f57dc9d Crashfix. 2025-05-03 21:04:39 +02:00
Kelvin 9986078582 Fix sync crash and responsiveness for subs sync 2025-05-03 19:30:47 +02:00
Koen J e047ab5684 Crashfixes. 2025-05-03 17:35:17 +02:00
Koen J a100785ad7 Gzip only for data packets. 2025-05-03 17:09:34 +02:00
Koen J 156eb4d15e Implemented sync protocol gzip. 2025-05-03 15:00:17 +02:00
Koen J dabcfd965f Fixed send on wrong thread. 2025-05-03 12:11:08 +02:00
Koen J d44a71f3be Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-03 11:22:06 +02:00
Koen J f8edd6cf3d Possibel performance improvements to sync under high lat conditions. 2025-05-03 11:21:57 +02:00
Kelvin 2baf53c5a4 Merge branch 'master' of gitlab.futo.org:videostreaming/grayjay 2025-05-02 12:10:58 +02:00
Kelvin c26e9c281f Easier search type switching on results page, fix search result click channels 2025-05-02 12:10:47 +02:00
Koen J 9f78e9b7dd Crashfix in nsdmanager. StateSync reconnects less often. Channels are closed when sending fails in sync. 2025-05-01 21:53:48 +02:00
Kelvin fdaf41b605 BuildPlatform property 2025-05-01 16:31:51 +02:00
29 changed files with 657 additions and 294 deletions
@@ -399,9 +399,11 @@ fun String.matchesDomain(queryDomain: String): Boolean {
fun String.getSubdomainWildcardQuery(): String {
val domainParts = this.split(".");
val sldParts = "." + domainParts[domainParts.size - 2].lowercase() + "." + domainParts[domainParts.size - 1].lowercase();
if(slds.contains(sldParts))
return "." + domainParts.drop(domainParts.size - 3).joinToString(".");
var wildcardDomain = if(domainParts.size > 2)
"." + domainParts.drop(1).joinToString(".")
else
return "." + domainParts.drop(domainParts.size - 2).joinToString(".");
"." + domainParts.joinToString(".");
if(slds.contains(wildcardDomain.lowercase()))
"." + domainParts.joinToString(".");
return wildcardDomain;
}
@@ -7,6 +7,9 @@ import java.net.InetAddress
import java.net.URI
import java.net.URISyntaxException
import java.net.URLEncoder
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
//Syntax sugaring
inline fun <reified T> Any.assume(): T?{
@@ -33,13 +36,37 @@ fun Boolean?.toYesNo(): String {
fun InetAddress?.toUrlAddress(): String {
return when (this) {
is Inet6Address -> {
"[${hostAddress}]"
val hostAddr = this.hostAddress ?: throw Exception("Invalid address: hostAddress is null")
val index = hostAddr.indexOf('%')
if (index != -1) {
val addrPart = hostAddr.substring(0, index)
val scopeId = hostAddr.substring(index + 1)
"[${addrPart}%25${scopeId}]" // %25 is URL-encoded '%'
} else {
"[$hostAddr]"
}
}
is Inet4Address -> {
hostAddress
this.hostAddress ?: throw Exception("Invalid address: hostAddress is null")
}
else -> {
throw Exception("Invalid address type")
}
}
}
fun Long?.sToOffsetDateTimeUTC(): OffsetDateTime {
if (this == null || this < 0)
return OffsetDateTime.MIN
if(this > 4070912400)
return OffsetDateTime.MAX;
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(this), ZoneOffset.UTC)
}
fun Long?.msToOffsetDateTimeUTC(): OffsetDateTime {
if (this == null || this < 0)
return OffsetDateTime.MIN
if(this > 4070912400)
return OffsetDateTime.MAX;
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(this), ZoneOffset.UTC)
}
@@ -590,7 +590,7 @@ class Settings : FragmentedStorageFileJson() {
@FormField(R.string.allow_ipv6, FieldForm.TOGGLE, R.string.allow_ipv6_description, 4)
@Serializable(with = FlexibleBooleanSerializer::class)
var allowIpv6: Boolean = false;
var allowIpv6: Boolean = true;
/*TODO: Should we have a different casting quality?
@FormField("Preferred Casting Quality", FieldForm.DROPDOWN, "", 3)
@@ -926,7 +926,7 @@ class Settings : FragmentedStorageFileJson() {
@Serializable
class Synchronization {
@FormField(R.string.enabled, FieldForm.TOGGLE, R.string.enabled_description, 1)
var enabled: Boolean = true;
var enabled: Boolean = false;
@FormField(R.string.broadcast, FieldForm.TOGGLE, R.string.broadcast_description, 1)
var broadcast: Boolean = false;
@@ -945,6 +945,12 @@ class Settings : FragmentedStorageFileJson() {
@FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3)
var connectThroughRelay: Boolean = true;
@FormField(R.string.connect_local_direct_through_relay, FieldForm.TOGGLE, R.string.connect_local_direct_through_relay_description, 3)
var connectLocalDirectThroughRelay: Boolean = true;
@FormField(R.string.local_connections, FieldForm.TOGGLE, R.string.local_connections_description, 3)
var localConnections: Boolean = true;
}
@FormField(R.string.info, FieldForm.GROUP, -1, 21)
@@ -279,7 +279,7 @@ fun <T> findNewIndex(originalArr: List<T>, newArr: List<T>, item: T): Int{
}
}
if(newIndex < 0)
return originalArr.size;
return newArr.size;
else
return newIndex;
}
@@ -89,6 +89,14 @@ class SyncHomeActivity : AppCompatActivity() {
updateEmptyVisibility()
}
}
StateSync.instance.confirmStarted(this, {
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
}, {
finish()
}, {
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
})
}
override fun onDestroy() {
@@ -83,6 +83,7 @@ class SyncPairActivity : AppCompatActivity() {
_layoutPairingSuccess.setOnClickListener {
_layoutPairingSuccess.visibility = View.GONE
finish()
}
_layoutPairingError.setOnClickListener {
_layoutPairingError.visibility = View.GONE
@@ -111,9 +112,15 @@ class SyncPairActivity : AppCompatActivity() {
try {
StateSync.instance.connect(deviceInfo) { complete, message ->
lifecycleScope.launch(Dispatchers.Main) {
if (complete != null && complete) {
_layoutPairingSuccess.visibility = View.VISIBLE
_layoutPairing.visibility = View.GONE
if (complete != null) {
if (complete) {
_layoutPairingSuccess.visibility = View.VISIBLE
_layoutPairing.visibility = View.GONE
} else {
_textError.text = message
_layoutPairingError.visibility = View.VISIBLE
_layoutPairing.visibility = View.GONE
}
} else {
_textPairingStatus.text = message
}
@@ -137,8 +144,6 @@ class SyncPairActivity : AppCompatActivity() {
_textError.text = e.message
_layoutPairing.visibility = View.GONE
Logger.e(TAG, "Failed to pair", e)
} finally {
_layoutPairing.visibility = View.GONE
}
}
@@ -149,6 +149,7 @@ class AirPlayCastingDevice : CastingDevice {
break;
} catch (e: Throwable) {
Logger.w(TAG, "Failed to get setup initial connection to AirPlay device.", e)
delay(1000);
}
}
@@ -322,6 +322,7 @@ class ChromecastCastingDevice : CastingDevice {
break;
} catch (e: Throwable) {
Logger.w(TAG, "Failed to get setup initial connection to ChromeCast device.", e)
Thread.sleep(1000);
}
}
@@ -25,6 +25,7 @@ import com.futo.platformplayer.toInetAddress
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
@@ -289,6 +290,7 @@ class FCastCastingDevice : CastingDevice {
break;
} catch (e: Throwable) {
Logger.w(TAG, "Failed to get setup initial connection to FastCast device.", e)
Thread.sleep(1000);
}
}
@@ -45,6 +45,8 @@ import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.CastingDeviceInfo
import com.futo.platformplayer.parsers.HLS
import com.futo.platformplayer.states.StateApp
import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.states.StateSync.Companion
import com.futo.platformplayer.stores.CastingDeviceInfoStorage
import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.toUrlAddress
@@ -176,7 +178,11 @@ class StateCasting {
fun stopDiscovering() {
_nsdManager?.apply {
_discoveryListeners.forEach {
stopServiceDiscovery(it.value)
try {
stopServiceDiscovery(it.value)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to stop service discovery", e)
}
}
}
}
@@ -224,12 +230,20 @@ class StateCasting {
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
_nsdManager?.stopServiceDiscovery(this)
try {
_nsdManager?.stopServiceDiscovery(this)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to stop service discovery", e)
}
}
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
_nsdManager?.stopServiceDiscovery(this)
try {
_nsdManager?.stopServiceDiscovery(this)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to stop service discovery", e)
}
}
override fun onServiceFound(service: NsdServiceInfo) {
@@ -72,6 +72,10 @@ class PackageBridge : V8Package {
fun buildSpecVersion(): Int {
return JSClientConstants.PLUGIN_SPEC_VERSION;
}
@V8Property
fun buildPlatform(): String {
return "android";
}
@V8Function
fun dispose(value: V8Value) {
@@ -2,9 +2,11 @@ package com.futo.platformplayer.fragment.mainactivity.main
import android.annotation.SuppressLint
import android.os.Bundle
import android.util.AttributeSet
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import androidx.core.view.allViews
import androidx.lifecycle.lifecycleScope
import com.futo.platformplayer.Settings
import com.futo.platformplayer.UIDialogs
@@ -23,6 +25,8 @@ import com.futo.platformplayer.models.SearchType
import com.futo.platformplayer.states.StateMeta
import com.futo.platformplayer.states.StatePlatform
import com.futo.platformplayer.views.FeedStyle
import com.futo.platformplayer.views.ToggleBar
import com.futo.platformplayer.views.others.RadioGroupView
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
@@ -114,6 +118,25 @@ class ContentSearchResultsFragment : MainFragment() {
}
setPreviewsEnabled(Settings.instance.search.previewFeedItems);
initializeToolbar();
}
fun initializeToolbar(){
if(_toolbarContentView.allViews.any { it is RadioGroupView })
_toolbarContentView.removeView(_toolbarContentView.allViews.find { it is RadioGroupView });
val radioGroup = RadioGroupView(context);
radioGroup.onSelectedChange.subscribe {
if (it.size != 1)
setSearchType(SearchType.VIDEO);
else
setSearchType((it[0] ?: SearchType.VIDEO) as SearchType);
}
radioGroup?.setOptions(listOf(Pair("Media", SearchType.VIDEO), Pair("Creators", SearchType.CREATOR), Pair("Playlists", SearchType.PLAYLIST)), listOf(_searchType), false, true)
_toolbarContentView.addView(radioGroup);
}
override fun cleanup() {
@@ -29,7 +29,7 @@ data class ImageVariable(
Glide.with(imageView)
.load(bitmap)
.into(imageView)
} else if(resId != null) {
} else if(resId != null && resId > 0) {
Glide.with(imageView)
.load(resId)
.into(imageView)
@@ -113,7 +113,7 @@ class LoginWebViewClient : WebViewClient {
//val domainParts = domain!!.split(".");
//val cookieDomain = "." + domainParts.drop(domainParts.size - 2).joinToString(".");
val cookieDomain = domain!!.getSubdomainWildcardQuery();
if(_pluginConfig == null || _pluginConfig.allowUrls.any { it == "everywhere" || it.lowercase().matchesDomain(cookieDomain) })
if(_pluginConfig == null || _pluginConfig.allowUrls.any { it == "everywhere" || domain.matchesDomain(it) })
_authConfig.cookiesToFind?.let { cookiesToFind ->
val cookies = cookieString.split(";");
for(cookieStr in cookies) {
@@ -67,7 +67,7 @@ class WebViewRequirementExtractor {
if(cookieString != null) {
//val domainParts = domain!!.split(".");
val cookieDomain = domain!!.getSubdomainWildcardQuery()//"." + domainParts.drop(domainParts.size - 2).joinToString(".");
if(allowedUrls.any { it == "everywhere" || it.lowercase().matchesDomain(cookieDomain) })
if(allowedUrls.any { it == "everywhere" || domain.matchesDomain(it) })
cookiesToFind?.let { cookiesToFind ->
val cookies = cookieString.split(";");
for(cookieStr in cookies) {
@@ -29,6 +29,7 @@ import com.futo.platformplayer.activities.CaptchaActivity
import com.futo.platformplayer.activities.IWithResultLauncher
import com.futo.platformplayer.activities.MainActivity
import com.futo.platformplayer.activities.SettingsActivity
import com.futo.platformplayer.activities.SettingsActivity.Companion.settingsActivityClosed
import com.futo.platformplayer.api.media.platforms.js.DevJSClient
import com.futo.platformplayer.api.media.platforms.js.JSClient
import com.futo.platformplayer.background.BackgroundWorker
@@ -411,7 +412,27 @@ class StateApp {
}
if (Settings.instance.synchronization.enabled) {
StateSync.instance.start(context)
StateSync.instance.start(context, {
try {
UIDialogs.toast("Failed to start sync, port in use")
} catch (e: Throwable) {
//Ignored
}
})
}
settingsActivityClosed.subscribe {
if (Settings.instance.synchronization.enabled) {
StateSync.instance.start(context, {
try {
UIDialogs.toast("Failed to start sync, port in use")
} catch (e: Throwable) {
//Ignored
}
})
} else {
StateSync.instance.stop()
}
}
Logger.onLogSubmitted.subscribe {
@@ -707,6 +728,7 @@ class StateApp {
StatePlayer.instance.closeMediaSession();
StateCasting.instance.stop();
StateSync.instance.stop();
StatePlayer.dispose();
Companion.dispose();
_fileLogConsumer?.close();
@@ -19,6 +19,7 @@ import com.futo.platformplayer.exceptions.ReconstructionException
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.models.ImportCache
import com.futo.platformplayer.models.Playlist
import com.futo.platformplayer.sToOffsetDateTimeUTC
import com.futo.platformplayer.smartMerge
import com.futo.platformplayer.states.StateSubscriptionGroups.Companion
import com.futo.platformplayer.stores.FragmentedStorage
@@ -85,7 +86,7 @@ class StatePlaylists {
if(value.isEmpty())
return OffsetDateTime.MIN;
val tryParse = value.toLongOrNull() ?: 0;
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(tryParse), ZoneOffset.UTC);
return tryParse.sToOffsetDateTimeUTC();
}
private fun setWatchLaterReorderTime() {
val now = OffsetDateTime.now(ZoneOffset.UTC);
@@ -7,6 +7,7 @@ import android.os.Build
import android.util.Log
import com.futo.platformplayer.LittleEndianDataInputStream
import com.futo.platformplayer.LittleEndianDataOutputStream
import com.futo.platformplayer.R
import com.futo.platformplayer.Settings
import com.futo.platformplayer.UIDialogs
import com.futo.platformplayer.activities.MainActivity
@@ -24,6 +25,7 @@ import com.futo.platformplayer.models.HistoryVideo
import com.futo.platformplayer.models.Subscription
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.Noise
import com.futo.platformplayer.sToOffsetDateTimeUTC
import com.futo.platformplayer.smartMerge
import com.futo.platformplayer.stores.FragmentedStorage
import com.futo.platformplayer.stores.StringStringMapStorage
@@ -65,6 +67,7 @@ import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.Base64
import java.util.Locale
import kotlin.math.min
import kotlin.system.measureTimeMillis
class StateSync {
@@ -77,10 +80,11 @@ class StateSync {
private var _serverSocket: ServerSocket? = null
private var _thread: Thread? = null
private var _connectThread: Thread? = null
private var _started = false
@Volatile private var _started = false
private val _sessions: MutableMap<String, SyncSession> = mutableMapOf()
private val _lastConnectTimesMdns: MutableMap<String, Long> = mutableMapOf()
private val _lastConnectTimesIp: MutableMap<String, Long> = mutableMapOf()
private var _serverStarted = false
//TODO: Should sync mdns and casting mdns be merged?
//TODO: Decrease interval that devices are updated
//TODO: Send less data
@@ -91,6 +95,117 @@ class StateSync {
private var _threadRelay: Thread? = null
private val _remotePendingStatusUpdate = mutableMapOf<String, (complete: Boolean?, message: String) -> Unit>()
private var _nsdManager: NsdManager? = null
private var _discoveryListener: NsdManager.DiscoveryListener = object : NsdManager.DiscoveryListener {
override fun onDiscoveryStarted(regType: String) {
Log.d(TAG, "Service discovery started for $regType")
}
override fun onDiscoveryStopped(serviceType: String) {
Log.i(TAG, "Discovery stopped: $serviceType")
}
override fun onServiceLost(service: NsdServiceInfo) {
Log.e(TAG, "service lost: $service")
// TODO: Handle service lost, e.g., remove device
}
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
try {
_nsdManager?.stopServiceDiscovery(this)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to stop service discovery", e)
}
}
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
try {
_nsdManager?.stopServiceDiscovery(this)
} catch (e: Throwable) {
Logger.w(TAG, "Failed to stop service discovery", e)
}
}
fun addOrUpdate(name: String, adrs: Array<InetAddress>, port: Int, attributes: Map<String, ByteArray>) {
if (!Settings.instance.synchronization.connectDiscovered) {
return
}
val urlSafePkey = attributes.get("pk")?.decodeToString() ?: return
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
val syncDeviceInfo = SyncDeviceInfo(pkey, adrs.map { it.hostAddress }.toTypedArray(), port, null)
val authorized = isAuthorized(pkey)
if (authorized && !isConnected(pkey)) {
val now = System.currentTimeMillis()
val lastConnectTime = synchronized(_lastConnectTimesMdns) {
_lastConnectTimesMdns[pkey] ?: 0
}
//Connect once every 30 seconds, max
if (now - lastConnectTime > 30000) {
synchronized(_lastConnectTimesMdns) {
_lastConnectTimesMdns[pkey] = now
}
Logger.i(TAG, "Found device authorized device '${name}' with pkey=$pkey, attempting to connect")
try {
connect(syncDeviceInfo)
} catch (e: Throwable) {
Logger.i(TAG, "Failed to connect to $pkey", e)
}
}
}
}
override fun onServiceFound(service: NsdServiceInfo) {
Log.v(TAG, "Service discovery success for ${service.serviceType}: $service")
addOrUpdate(service.serviceName, if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
service.hostAddresses.toTypedArray()
} else {
if(service.host != null)
arrayOf(service.host);
else
arrayOf();
}, service.port, service.attributes)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
_nsdManager?.registerServiceInfoCallback(service, { it.run() }, object : NsdManager.ServiceInfoCallback {
override fun onServiceUpdated(serviceInfo: NsdServiceInfo) {
Log.v(TAG, "onServiceUpdated: $serviceInfo")
addOrUpdate(serviceInfo.serviceName, serviceInfo.hostAddresses.toTypedArray(), serviceInfo.port, serviceInfo.attributes)
}
override fun onServiceLost() {
Log.v(TAG, "onServiceLost: $service")
// TODO: Handle service lost
}
override fun onServiceInfoCallbackRegistrationFailed(errorCode: Int) {
Log.v(TAG, "onServiceInfoCallbackRegistrationFailed: $errorCode")
}
override fun onServiceInfoCallbackUnregistered() {
Log.v(TAG, "onServiceInfoCallbackUnregistered")
}
})
} else {
_nsdManager?.resolveService(service, object : NsdManager.ResolveListener {
override fun onResolveFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
Log.v(TAG, "Resolve failed: $errorCode")
}
override fun onServiceResolved(serviceInfo: NsdServiceInfo) {
Log.v(TAG, "Resolve Succeeded: $serviceInfo")
addOrUpdate(serviceInfo.serviceName, arrayOf(serviceInfo.host), serviceInfo.port, serviceInfo.attributes)
}
})
}
}
}
private val _registrationListener = object : NsdManager.RegistrationListener {
override fun onServiceRegistered(serviceInfo: NsdServiceInfo) {
Log.v(TAG, "onServiceRegistered: ${serviceInfo.serviceName}")
@@ -122,7 +237,7 @@ class StateSync {
}
}
fun start(context: Context) {
fun start(context: Context, onServerBindFail: () -> Unit) {
if (_started) {
Logger.i(TAG, "Already started.")
return
@@ -132,108 +247,7 @@ class StateSync {
if (Settings.instance.synchronization.connectDiscovered) {
_nsdManager?.apply {
discoverServices("_gsync._tcp", NsdManager.PROTOCOL_DNS_SD, object : NsdManager.DiscoveryListener {
override fun onDiscoveryStarted(regType: String) {
Log.d(TAG, "Service discovery started for $regType")
}
override fun onDiscoveryStopped(serviceType: String) {
Log.i(TAG, "Discovery stopped: $serviceType")
}
override fun onServiceLost(service: NsdServiceInfo) {
Log.e(TAG, "service lost: $service")
// TODO: Handle service lost, e.g., remove device
}
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
_nsdManager?.stopServiceDiscovery(this)
}
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
_nsdManager?.stopServiceDiscovery(this)
}
fun addOrUpdate(name: String, adrs: Array<InetAddress>, port: Int, attributes: Map<String, ByteArray>) {
if (!Settings.instance.synchronization.connectDiscovered) {
return
}
val urlSafePkey = attributes.get("pk")?.decodeToString() ?: return
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
val syncDeviceInfo = SyncDeviceInfo(pkey, adrs.map { it.hostAddress }.toTypedArray(), port, null)
val authorized = isAuthorized(pkey)
if (authorized && !isConnected(pkey)) {
val now = System.currentTimeMillis()
val lastConnectTime = synchronized(_lastConnectTimesMdns) {
_lastConnectTimesMdns[pkey] ?: 0
}
//Connect once every 30 seconds, max
if (now - lastConnectTime > 30000) {
synchronized(_lastConnectTimesMdns) {
_lastConnectTimesMdns[pkey] = now
}
Logger.i(TAG, "Found device authorized device '${name}' with pkey=$pkey, attempting to connect")
try {
connect(syncDeviceInfo)
} catch (e: Throwable) {
Logger.i(TAG, "Failed to connect to $pkey", e)
}
}
}
}
override fun onServiceFound(service: NsdServiceInfo) {
Log.v(TAG, "Service discovery success for ${service.serviceType}: $service")
addOrUpdate(service.serviceName, if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
service.hostAddresses.toTypedArray()
} else {
if(service.host != null)
arrayOf(service.host);
else
arrayOf();
}, service.port, service.attributes)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
_nsdManager?.registerServiceInfoCallback(service, { it.run() }, object : NsdManager.ServiceInfoCallback {
override fun onServiceUpdated(serviceInfo: NsdServiceInfo) {
Log.v(TAG, "onServiceUpdated: $serviceInfo")
addOrUpdate(serviceInfo.serviceName, serviceInfo.hostAddresses.toTypedArray(), serviceInfo.port, serviceInfo.attributes)
}
override fun onServiceLost() {
Log.v(TAG, "onServiceLost: $service")
// TODO: Handle service lost
}
override fun onServiceInfoCallbackRegistrationFailed(errorCode: Int) {
Log.v(TAG, "onServiceInfoCallbackRegistrationFailed: $errorCode")
}
override fun onServiceInfoCallbackUnregistered() {
Log.v(TAG, "onServiceInfoCallbackUnregistered")
}
})
} else {
_nsdManager?.resolveService(service, object : NsdManager.ResolveListener {
override fun onResolveFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
Log.v(TAG, "Resolve failed: $errorCode")
}
override fun onServiceResolved(serviceInfo: NsdServiceInfo) {
Log.v(TAG, "Resolve Succeeded: $serviceInfo")
addOrUpdate(serviceInfo.serviceName, arrayOf(serviceInfo.host), serviceInfo.port, serviceInfo.attributes)
}
})
}
}
})
discoverServices("_gsync._tcp", NsdManager.PROTOCOL_DNS_SD, _discoveryListener)
}
}
@@ -284,23 +298,31 @@ class StateSync {
Logger.i(TAG, "Sync key pair initialized (public key = ${publicKey})")
_thread = Thread {
try {
val serverSocket = ServerSocket(PORT)
_serverSocket = serverSocket
if (Settings.instance.synchronization.localConnections) {
_serverStarted = true
_thread = Thread {
try {
val serverSocket = ServerSocket(PORT)
_serverSocket = serverSocket
Log.i(TAG, "Running on port ${PORT} (TCP)")
Log.i(TAG, "Running on port ${PORT} (TCP)")
while (_started) {
val socket = serverSocket.accept()
val session = createSocketSession(socket, true)
session.startAsResponder()
while (_started) {
val socket = serverSocket.accept()
val session = createSocketSession(socket, true)
session.startAsResponder()
}
} catch (e: Throwable) {
_serverStarted = false
Logger.e(TAG, "Failed to bind server socket to port ${PORT}", e)
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
onServerBindFail.invoke()
}
} finally {
_serverStarted = false
}
} catch (e: Throwable) {
Logger.e(TAG, "Failed to bind server socket to port ${PORT}", e)
UIDialogs.toast("Failed to start sync, port in use")
}
}.apply { start() }
}.apply { start() }
}
if (Settings.instance.synchronization.connectLast) {
_connectThread = Thread {
@@ -352,6 +374,9 @@ class StateSync {
if (Settings.instance.synchronization.discoverThroughRelay) {
_threadRelay = Thread {
var backoffs: Array<Long> = arrayOf(1000, 5000, 10000, 20000)
var backoffIndex = 0;
while (_started) {
try {
Log.i(TAG, "Starting relay session...")
@@ -361,8 +386,7 @@ class StateSync {
_relaySession = SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
onNewChannel = { _, c ->
val remotePublicKey = c.remotePublicKey
@@ -398,6 +422,8 @@ class StateSync {
},
onClose = { socketClosed = true },
onHandshakeComplete = { relaySession ->
backoffIndex = 0
Thread {
try {
while (_started && !socketClosed) {
@@ -407,12 +433,14 @@ class StateSync {
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay)
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
for ((targetKey, connectionInfo) in connectionInfos) {
val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses)
.filter { it != connectionInfo.remoteIp }
if (connectionInfo.allowLocalDirect) {
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
Thread {
try {
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
@@ -433,10 +461,10 @@ class StateSync {
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
try {
Log.v(TAG, "Attempting relayed connection with '$targetKey'.")
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
runBlocking { relaySession.startRelayedChannel(targetKey, APP_ID, null) }
} catch (e: Throwable) {
Log.e(TAG, "Failed to start relayed channel with $targetKey.", e)
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
}
}
}
@@ -444,7 +472,7 @@ class StateSync {
Thread.sleep(15000)
}
} catch (e: Throwable) {
Log.e(TAG, "Unhandled exception in relay session.", e)
Logger.e(TAG, "Unhandled exception in relay session.", e)
relaySession.stop()
}
}.start()
@@ -460,16 +488,39 @@ class StateSync {
Log.i(TAG, "Started relay session.")
} catch (e: Throwable) {
Log.e(TAG, "Relay session failed.", e)
Thread.sleep(5000)
} finally {
_relaySession?.stop()
_relaySession = null
Thread.sleep(backoffs[min(backoffs.size - 1, backoffIndex++)])
}
}
}.apply { start() }
}
}
fun showFailedToBindDialogIfNecessary(context: Context) {
if (!_serverStarted && Settings.instance.synchronization.localConnections) {
try {
UIDialogs.showDialogOk(context, R.drawable.ic_warning, "Local discovery unavailable, port was in use")
} catch (e: Throwable) {
//Ignored
}
}
}
fun confirmStarted(context: Context, onStarted: () -> Unit, onNotStarted: () -> Unit, onServerBindFail: () -> Unit) {
if (!_started) {
UIDialogs.showConfirmationDialog(context, "Sync has not been enabled yet, would you like to enable sync?", {
Settings.instance.synchronization.enabled = true
StateSync.instance.start(context, onServerBindFail)
Settings.instance.save()
onStarted.invoke()
}, {
onNotStarted.invoke()
})
} else {
onStarted.invoke()
}
}
private fun getDeviceName(): String {
@@ -547,7 +598,7 @@ class StateSync {
added.map { it.channel.name }.joinToString("\n"));
if(pack.subscriptions.isNotEmpty()) {
if(pack.subscriptionRemovals.isNotEmpty()) {
for (subRemoved in pack.subscriptionRemovals) {
val removed = StateSubscriptions.instance.applySubscriptionRemovals(pack.subscriptionRemovals);
if(removed.size > 3) {
@@ -585,16 +636,33 @@ class StateSync {
Logger.i(TAG, "Received SyncSessionData from $remotePublicKey");
val subscriptionPackageString = StateSubscriptions.instance.getSyncSubscriptionsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptions b (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, subscriptionPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptions (size: ${subscriptionPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
val subscriptionGroupPackageString = StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups b (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, subscriptionGroupPackageString);
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups (size: ${subscriptionGroupPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
val syncPlaylistPackageString = StatePlaylists.instance.getSyncPlaylistsPackageString()
Logger.i(TAG, "syncStateExchange syncPlaylists b (size: ${syncPlaylistPackageString.length})")
session.sendData(GJSyncOpcodes.syncPlaylists, syncPlaylistPackageString)
Logger.i(TAG, "syncStateExchange syncPlaylists (size: ${syncPlaylistPackageString.length})")
val watchLaterPackageString = Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false))
Logger.i(TAG, "syncStateExchange syncWatchLater b (size: ${watchLaterPackageString.length})")
session.sendData(GJSyncOpcodes.syncWatchLater, watchLaterPackageString);
Logger.i(TAG, "syncStateExchange syncWatchLater (size: ${watchLaterPackageString.length})")
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
Logger.i(TAG, "syncStateExchange syncHistory b (size: ${recentHistory.size})")
if(recentHistory.isNotEmpty())
session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
Logger.i(TAG, "syncStateExchange syncHistory (size: ${recentHistory.size})")
}
GJSyncOpcodes.syncExport -> {
@@ -627,12 +695,14 @@ class StateSync {
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
handleSyncSubscriptionPackage(session, subPackage);
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
if(subPackage.subscriptions.size > 0) {
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
val sesData = getSyncSessionData(remotePublicKey);
if(newestSub > sesData.lastSubscription) {
sesData.lastSubscription = newestSub;
saveSyncSessionData(sesData);
val sesData = getSyncSessionData(remotePublicKey);
if (newestSub > sesData.lastSubscription) {
sesData.lastSubscription = newestSub;
saveSyncSessionData(sesData);
}
}
}
@@ -662,7 +732,7 @@ class StateSync {
}
for(removal in pack.groupRemovals) {
val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
val removalTime = removal.value.sToOffsetDateTimeUTC();
if(creation != null && creation.creationTime < removalTime)
StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false);
}
@@ -690,7 +760,7 @@ class StateSync {
}
for(removal in pack.playlistRemovals) {
val creation = StatePlaylists.instance.getPlaylist(removal.key);
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
val removalTime = removal.value.sToOffsetDateTimeUTC();
if(creation != null && creation.dateCreation < removalTime)
StatePlaylists.instance.removePlaylist(creation, false);
@@ -708,9 +778,9 @@ class StateSync {
val allExisting = StatePlaylists.instance.getWatchLater();
for(video in pack.videos) {
val existing = allExisting.firstOrNull { it.url == video.url };
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN;
if(existing == null) {
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) (pack.videoAdds[video.url] ?: 0).sToOffsetDateTimeUTC() else OffsetDateTime.MIN;
val removalTime = StatePlaylists.instance.getWatchLaterRemovalTime(video.url) ?: OffsetDateTime.MIN;
if(existing == null && time > removalTime) {
StatePlaylists.instance.addToWatchLater(video, false);
if(time > OffsetDateTime.MIN)
StatePlaylists.instance.setWatchLaterAddTime(video.url, time);
@@ -719,12 +789,12 @@ class StateSync {
for(removal in pack.videoRemovals) {
val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue;
val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN;
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC);
val removalTime = removal.value.sToOffsetDateTimeUTC()
if(creation < removalTime)
StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime);
}
val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC);
val packReorderTime = pack.reorderTime.sToOffsetDateTimeUTC()
val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime();
if(localReorderTime < packReorderTime && pack.ordering != null) {
StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true);
@@ -737,6 +807,9 @@ class StateSync {
val json = String(dataBody, Charsets.UTF_8);
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
if (history.size == 1) {
Logger.i(TAG, "SyncHistory received update video '${history[0].video.name}' (url: ${history[0].video.url}) at timestamp ${history[0].position}");
}
var lastHistory = OffsetDateTime.MIN;
for(video in history){
@@ -758,22 +831,15 @@ class StateSync {
}
}
private fun onAuthorized(remotePublicKey: String) {
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
}
}
private fun onUnuthorized(remotePublicKey: String) {
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
}
}
private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?): SyncSession {
private fun createNewSyncSession(rpk: String, remoteDeviceName: String?): SyncSession {
val remotePublicKey = rpk.base64ToByteArray().toBase64()
return SyncSession(
remotePublicKey,
onAuthorized = { it, isNewlyAuthorized, isNewSession ->
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
}
if (!isNewSession) {
return@SyncSession
}
@@ -785,7 +851,6 @@ class StateSync {
}
Logger.i(TAG, "$remotePublicKey authorized (name: ${it.displayName})")
onAuthorized(remotePublicKey)
_authorizedDevices.addDistinct(remotePublicKey)
_authorizedDevices.save()
deviceUpdatedOrAdded.emit(it.remotePublicKey, it)
@@ -793,10 +858,12 @@ class StateSync {
checkForSync(it);
},
onUnauthorized = {
unauthorize(remotePublicKey)
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
}
unauthorize(remotePublicKey)
Logger.i(TAG, "$remotePublicKey unauthorized (name: ${it.displayName})")
onUnuthorized(remotePublicKey)
synchronized(_sessions) {
it.close()
@@ -822,7 +889,17 @@ class StateSync {
}
},
dataHandler = { it, opcode, subOpcode, data ->
handleData(it, opcode, subOpcode, data)
val dataCopy = ByteArray(data.remaining())
data.get(dataCopy)
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
try {
handleData(it, opcode, subOpcode, ByteBuffer.wrap(dataCopy))
} catch (e: Throwable) {
Logger.e(TAG, "Exception occurred while handling data, closing session", e)
it.close()
}
}
},
remoteDeviceName
)
@@ -857,8 +934,7 @@ class StateSync {
return SyncSocketSession(
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
keyPair!!,
LittleEndianDataInputStream(socket.getInputStream()),
LittleEndianDataOutputStream(socket.getOutputStream()),
socket,
onClose = { s ->
if (channelSocket != null)
session?.removeChannel(channelSocket!!)
@@ -996,19 +1072,31 @@ class StateSync {
fun stop() {
_started = false
_nsdManager?.unregisterService(_registrationListener)
try {
_nsdManager?.stopServiceDiscovery(_discoveryListener)
} catch (e: Throwable) {
Logger.e(TAG, "Failed to stop discovery listener", e)
}
try {
_nsdManager?.unregisterService(_registrationListener)
} catch (e: Throwable) {
Logger.e(TAG, "Failed to unregister service", e)
}
_relaySession?.stop()
_serverSocket?.close()
_serverSocket = null
_thread?.interrupt()
_thread = null
_connectThread?.interrupt()
_connectThread = null
_threadRelay?.interrupt()
_threadRelay = null
synchronized(_sessions) {
_sessions.values.forEach { it.close() }
_sessions.clear()
}
_relaySession?.stop()
_thread = null
_connectThread = null
_threadRelay = null
_relaySession = null
}
@@ -1024,7 +1112,7 @@ class StateSync {
runBlocking {
if (onStatusUpdate != null) {
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate[deviceInfo.publicKey] = onStatusUpdate
_remotePendingStatusUpdate[deviceInfo.publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
}
}
relaySession.startRelayedChannel(deviceInfo.publicKey, APP_ID, deviceInfo.pairingCode)
@@ -1043,7 +1131,7 @@ class StateSync {
val session = createSocketSession(socket, false)
if (onStatusUpdate != null) {
synchronized(_remotePendingStatusUpdate) {
_remotePendingStatusUpdate[publicKey] = onStatusUpdate
_remotePendingStatusUpdate[publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
}
}
@@ -5,9 +5,11 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
import java.util.zip.GZIPOutputStream
interface IChannel : AutoCloseable {
val remotePublicKey: String?
@@ -15,7 +17,7 @@ interface IChannel : AutoCloseable {
var authorizable: IAuthorizable?
var syncSession: SyncSession?
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null)
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
val linkType: LinkType
}
@@ -49,9 +51,9 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel {
onData?.invoke(session, this, opcode, subOpcode, data)
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) {
if (data != null) {
session.send(opcode, subOpcode, data)
session.send(opcode, subOpcode, data, contentEncoding)
} else {
session.send(opcode, subOpcode)
}
@@ -183,51 +185,70 @@ class ChannelRelayed(
}
}
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, ce: ContentEncoding?) {
throwIfDisposed()
val actualCount = data?.remaining() ?: 0
var contentEncoding: ContentEncoding? = ce
var processedData = data
if (data != null && contentEncoding == ContentEncoding.Gzip) {
val isGzipSupported = opcode == Opcode.DATA.value
if (isGzipSupported) {
val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
}
}
val ENCRYPTION_OVERHEAD = 16
val CONNECTION_ID_SIZE = 8
val HEADER_SIZE = 6
val HEADER_SIZE = 7
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.size: ${processedData?.remaining()})")
if (processedData != null && processedData.remaining() > MAX_DATA_PER_PACKET) {
val streamId = session.generateStreamId()
val totalSize = actualCount
var sendOffset = 0
while (sendOffset < totalSize) {
val bytesRemaining = totalSize - sendOffset
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining)
while (sendOffset < processedData.remaining()) {
val bytesRemaining = processedData.remaining() - sendOffset
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - HEADER_SIZE + 4, bytesRemaining)
val streamData: ByteArray
val streamOpcode: StreamOpcode
if (sendOffset == 0) {
streamOpcode = StreamOpcode.START
streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend)
streamData = ByteArray(4 + HEADER_SIZE + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(totalSize)
putInt(processedData.remaining())
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position() + sendOffset, bytesToSend)
put(contentEncoding?.value?.toByte() ?: 0.toByte())
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
} else {
streamData = ByteArray(4 + 4 + bytesToSend)
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamId)
putInt(sendOffset)
put(data.array(), data.position() + sendOffset, bytesToSend)
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
}
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(streamData.size + 2)
putInt(streamData.size + HEADER_SIZE - 4)
put(Opcode.STREAM.value.toByte())
put(streamOpcode.value.toByte())
put(ContentEncoding.Raw.value.toByte())
put(streamData)
}
@@ -235,12 +256,13 @@ class ChannelRelayed(
sendOffset += bytesToSend
}
} else {
val packet = ByteArray(HEADER_SIZE + actualCount)
val packet = ByteArray(HEADER_SIZE + (processedData?.remaining() ?: 0))
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(actualCount + 2)
putInt((processedData?.remaining() ?: 0) + HEADER_SIZE - 4)
put(opcode.toByte())
put(subOpcode.toByte())
if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount)
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
if (processedData != null && processedData.remaining() > 0) put(processedData.array(), processedData.position(), processedData.remaining())
}
sendPacket(packet)
}
@@ -333,4 +355,8 @@ class ChannelRelayed(
completeHandshake(remoteVersion, transport)
}
}
companion object {
private val TAG = "Channel"
}
}
@@ -0,0 +1,6 @@
package com.futo.platformplayer.sync.internal
enum class ContentEncoding(val value: UByte) {
Raw(0u),
Gzip(1u)
}
@@ -129,9 +129,9 @@ class SyncSession : IAuthorizable {
fun close() {
synchronized(_channels) {
_channels.forEach { it.close() }
_channels.clear()
}
_channels.toTypedArray()
}.forEach { it.close() }
_onClose(this)
}
@@ -196,14 +196,14 @@ class SyncSession : IAuthorizable {
}
fun sendData(subOpcode: UByte, data: String) {
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
}
fun send(opcode: UByte, subOpcode: UByte, data: String) {
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
}
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) {
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) {
val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() }
if (channels.isEmpty()) {
//TODO: Should this throw?
@@ -214,11 +214,13 @@ class SyncSession : IAuthorizable {
var sent = false
for (channel in channels) {
try {
channel.send(opcode, subOpcode, data)
channel.send(opcode, subOpcode, data, contentEncoding)
sent = true
break
} catch (e: Throwable) {
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode)", e)
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode), closing channel", e)
channel.close()
removeChannel(channel)
}
}
@@ -3,32 +3,46 @@ package com.futo.platformplayer.sync.internal
import android.os.Build
import com.futo.platformplayer.LittleEndianDataInputStream
import com.futo.platformplayer.LittleEndianDataOutputStream
import com.futo.platformplayer.copyToOutputStream
import com.futo.platformplayer.ensureNotMainThread
import com.futo.platformplayer.logging.Logger
import com.futo.platformplayer.noise.protocol.CipherStatePair
import com.futo.platformplayer.noise.protocol.DHState
import com.futo.platformplayer.noise.protocol.HandshakeState
import com.futo.platformplayer.states.StateSync
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
import com.futo.polycentric.core.base64ToByteArray
import com.futo.polycentric.core.toBase64
import kotlinx.coroutines.CompletableDeferred
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.net.NetworkInterface
import java.net.Socket
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.Base64
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import kotlin.math.min
import kotlin.system.measureTimeMillis
import kotlin.time.measureTime
class SyncSocketSession {
private val _inputStream: LittleEndianDataInputStream
private val _outputStream: LittleEndianDataOutputStream
private val _socket: Socket
private val _inputStream: InputStream
private val _outputStream: OutputStream
private val _sendLockObject = Object()
private val _buffer = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _bufferDecrypted = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBuffer = ByteArray(MAXIMUM_PACKET_SIZE)
private val _sendBufferEncrypted = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _sendBufferEncrypted = ByteArray(4 + MAXIMUM_PACKET_SIZE_ENCRYPTED)
private val _syncStreams = hashMapOf<Int, SyncStream>()
private var _streamIdGenerator = 0
private val _streamIdGeneratorLock = Object()
@@ -81,8 +95,7 @@ class SyncSocketSession {
constructor(
remoteAddress: String,
localKeyPair: DHState,
inputStream: LittleEndianDataInputStream,
outputStream: LittleEndianDataOutputStream,
socket: Socket,
onClose: ((session: SyncSocketSession) -> Unit)? = null,
onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null,
onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null,
@@ -90,8 +103,12 @@ class SyncSocketSession {
onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null,
isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?, appId: UInt) -> Boolean)? = null
) {
_inputStream = inputStream
_outputStream = outputStream
_socket = socket
_socket.receiveBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.sendBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
_socket.tcpNoDelay = true
_inputStream = _socket.getInputStream()
_outputStream = _socket.getOutputStream()
_onClose = onClose
_onHandshakeComplete = onHandshakeComplete
_localKeyPair = localKeyPair
@@ -150,30 +167,45 @@ class SyncSocketSession {
}.apply { start() }
}
private fun readExact(buffer: ByteArray, offset: Int, size: Int) {
var totalBytesReceived: Int = 0
while (totalBytesReceived < size) {
val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived)
if (bytesReceived <= 0)
throw Exception("Socket disconnected")
totalBytesReceived += bytesReceived
}
}
private fun receiveLoop() {
while (_started) {
try {
val messageSize = _inputStream.readInt()
//Logger.v(TAG, "Waiting for message size...")
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
//Logger.v(TAG, "Read message size ${messageSize}.")
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
//Logger.i(TAG, "Receiving message (size = ${messageSize})")
var bytesRead = 0
while (bytesRead < messageSize) {
val read = _inputStream.read(_buffer, bytesRead, messageSize - bytesRead)
if (read == -1)
throw Exception("Stream closed")
bytesRead += read
}
readExact(_buffer, 0, messageSize)
//Logger.v(TAG, "Read ${messageSize}.")
//Logger.v(TAG, "Decrypting ${messageSize} bytes.")
val plen: Int = _cipherStatePair!!.receiver.decryptWithAd(null, _buffer, 0, _bufferDecrypted, 0, messageSize)
//Logger.i(TAG, "Decrypted message (size = ${plen})")
//Logger.v(TAG, "Decrypted ${messageSize} bytes.")
handleData(_bufferDecrypted, plen, null)
//Logger.v(TAG, "Handled data ${messageSize} bytes.")
} catch (e: Throwable) {
Logger.e(TAG, "Exception while receiving data", e)
Logger.e(TAG, "Exception while receiving data, closing socket session", e)
stop()
break
}
}
@@ -203,8 +235,7 @@ class SyncSocketSession {
_channels.values.forEach { it.close() }
_channels.clear()
_onClose?.invoke(this)
_inputStream.close()
_outputStream.close()
_socket.close()
_thread = null
_cipherStatePair?.sender?.destroy()
_cipherStatePair?.receiver?.destroy()
@@ -237,25 +268,32 @@ class SyncSocketSession {
val mainBuffer = ByteArray(512)
val mainLength = initiator.writeMessage(mainBuffer, 0, null, 0, 0)
val messageData = ByteBuffer.allocate(4 + 4 + pairingMessageLength + mainLength).order(ByteOrder.LITTLE_ENDIAN)
val messageSize = 4 + 4 + pairingMessageLength + mainLength
val messageData = ByteBuffer.allocate(4 + messageSize).order(ByteOrder.LITTLE_ENDIAN)
messageData.putInt(messageSize)
messageData.putInt(appId.toInt())
messageData.putInt(pairingMessageLength)
if (pairingMessageLength > 0) messageData.put(pairingMessage)
messageData.put(mainBuffer, 0, mainLength)
val messageDataArray = messageData.array()
_outputStream.writeInt(messageDataArray.size)
_outputStream.write(messageDataArray)
_outputStream.write(messageDataArray, 0, 4 + messageSize)
readExact(_buffer, 0, 4)
val responseSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (responseSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val responseSize = _inputStream.readInt()
val responseMessage = ByteArray(responseSize)
_inputStream.readFully(responseMessage)
readExact(responseMessage, 0, responseSize)
val plaintext = ByteArray(512) // Buffer for any payload (none expected here)
initiator.readMessage(responseMessage, 0, responseSize, plaintext, 0)
_cipherStatePair = initiator.split()
val remoteKeyBytes = ByteArray(initiator.remotePublicKey.publicKeyLength)
initiator.remotePublicKey.getPublicKey(remoteKeyBytes, 0)
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes)
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes).base64ToByteArray().toBase64()
}
private fun handshakeAsResponder(): Boolean {
@@ -265,11 +303,16 @@ class SyncSocketSession {
responder.localKeyPair.copyFrom(_localKeyPair)
responder.start()
val messageSize = _inputStream.readInt()
val message = ByteArray(messageSize)
_inputStream.readFully(message)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
readExact(_buffer, 0, 4)
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
}
val message = ByteArray(messageSize)
readExact(message, 0, messageSize)
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
val appId = messageBuffer.int.toUInt()
val pairingMessageLength = messageBuffer.int
val pairingMessage = if (pairingMessageLength > 0) ByteArray(pairingMessageLength).also { messageBuffer.get(it) } else byteArrayOf()
@@ -298,21 +341,26 @@ class SyncSocketSession {
return false
}
val responseBuffer = ByteArray(512)
val responseLength = responder.writeMessage(responseBuffer, 0, null, 0, 0)
_outputStream.writeInt(responseLength)
_outputStream.write(responseBuffer, 0, responseLength)
val responseBuffer = ByteArray(4 + 512)
val responseLength = responder.writeMessage(responseBuffer, 4, null, 0, 0)
ByteBuffer.wrap(responseBuffer).order(ByteOrder.LITTLE_ENDIAN).putInt(responseLength)
_outputStream.write(responseBuffer, 0, 4 + responseLength)
_cipherStatePair = responder.split()
_remotePublicKey = remotePublicKey
_remotePublicKey = remotePublicKey.base64ToByteArray().toBase64()
return true
}
private fun performVersionCheck() {
val CURRENT_VERSION = 4
val MINIMUM_VERSION = 4
_outputStream.writeInt(CURRENT_VERSION)
remoteVersion = _inputStream.readInt()
val versionBytes = ByteArray(4)
ByteBuffer.wrap(versionBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(CURRENT_VERSION)
_outputStream.write(versionBytes, 0, 4)
readExact(versionBytes, 0, 4)
remoteVersion = ByteBuffer.wrap(versionBytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
Logger.i(TAG, "performVersionCheck (version = $remoteVersion)")
if (remoteVersion < MINIMUM_VERSION)
throw Exception("Invalid version")
@@ -321,25 +369,44 @@ class SyncSocketSession {
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, ce: ContentEncoding? = null) {
ensureNotMainThread()
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
var contentEncoding: ContentEncoding? = ce
var processedData = data
if (contentEncoding == ContentEncoding.Gzip) {
val isGzipSupported = opcode == Opcode.DATA.value
if (isGzipSupported) {
val compressedStream = ByteArrayOutputStream()
GZIPOutputStream(compressedStream).use { gzipStream ->
gzipStream.write(data.array(), data.position(), data.remaining())
gzipStream.finish()
}
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
} else {
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
contentEncoding = ContentEncoding.Raw
}
}
if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
val segmentData = ByteArray(segmentSize)
var sendOffset = 0
val id = generateStreamId()
while (sendOffset < data.remaining()) {
val bytesRemaining = data.remaining() - sendOffset
while (sendOffset < processedData.remaining()) {
val bytesRemaining = processedData.remaining() - sendOffset
var bytesToSend: Int
var segmentPacketSize: Int
val streamOp: StreamOpcode
if (sendOffset == 0) {
streamOp = StreamOpcode.START
bytesToSend = segmentSize - 4 - 4 - 1 - 1
segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1
bytesToSend = segmentSize - 4 - HEADER_SIZE
segmentPacketSize = bytesToSend + 4 + HEADER_SIZE
} else {
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA
@@ -348,12 +415,13 @@ class SyncSocketSession {
ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(id)
putInt(if (streamOp == StreamOpcode.START) data.remaining() else sendOffset)
putInt(if (streamOp == StreamOpcode.START) processedData.remaining() else sendOffset)
if (streamOp == StreamOpcode.START) {
put(opcode.toByte())
put(subOpcode.toByte())
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
}
put(data.array(), data.position() + sendOffset, bytesToSend)
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
}
send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
@@ -362,17 +430,19 @@ class SyncSocketSession {
} else {
synchronized(_sendLockObject) {
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
putInt(data.remaining() + 2)
putInt(processedData.remaining() + HEADER_SIZE - 4)
put(opcode.toByte())
put(subOpcode.toByte())
put(data.array(), data.position(), data.remaining())
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
put(processedData.array(), processedData.position(), processedData.remaining())
}
//Logger.i(TAG, "Encrypting message (size = ${data.size + HEADER_SIZE})")
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, data.remaining() + HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, processedData.remaining() + HEADER_SIZE)
val sendDuration = measureTimeMillis {
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
}
}
}
@@ -382,17 +452,18 @@ class SyncSocketSession {
ensureNotMainThread()
synchronized(_sendLockObject) {
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(HEADER_SIZE - 4)
_sendBuffer.asUByteArray()[4] = opcode
_sendBuffer.asUByteArray()[5] = subOpcode
_sendBuffer.asUByteArray()[6] = ContentEncoding.Raw.value
//Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})")
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, HEADER_SIZE)
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, HEADER_SIZE)
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
_outputStream.writeInt(len)
_outputStream.write(_sendBufferEncrypted, 0, len)
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
}
}
@@ -403,7 +474,7 @@ class SyncSocketSession {
private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) {
val length = data.remaining()
if (length < HEADER_SIZE)
throw Exception("Packet must be at least 6 bytes (header size)")
throw Exception("Packet must be at least ${HEADER_SIZE} bytes (header size)")
val size = data.int
if (size != length - 4)
@@ -411,7 +482,10 @@ class SyncSocketSession {
val opcode = data.get().toUByte()
val subOpcode = data.get().toUByte()
handlePacket(opcode, subOpcode, data, sourceChannel)
val contentEncoding = data.get().toUByte()
//Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel)
}
private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
@@ -759,9 +833,27 @@ class SyncSocketSession {
}
}
private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
private fun handlePacket(opcode: UByte, subOpcode: UByte, d: ByteBuffer, contentEncoding: UByte, sourceChannel: ChannelRelayed?) {
Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})")
var data = d
if (contentEncoding == ContentEncoding.Gzip.value) {
val isGzipSupported = opcode == Opcode.DATA.value
if (!isGzipSupported)
throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).")
val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining())
val outputStream = ByteArrayOutputStream()
GZIPInputStream(compressedStream).use { gzipStream ->
val buffer = ByteArray(8192) // 8KB buffer
var bytesRead: Int
while (gzipStream.read(buffer).also { bytesRead = it } != -1) {
outputStream.write(buffer, 0, bytesRead)
}
}
data = ByteBuffer.wrap(outputStream.toByteArray())
}
when (opcode) {
Opcode.PING.value -> {
if (sourceChannel != null)
@@ -799,8 +891,9 @@ class SyncSocketSession {
val expectedSize = data.int
val op = data.get().toUByte()
val subOp = data.get().toUByte()
val ce = data.get().toUByte()
val syncStream = SyncStream(expectedSize, op, subOp)
val syncStream = SyncStream(expectedSize, op, subOp, ce)
if (data.remaining() > 0) {
syncStream.add(data.array(), data.position(), data.remaining())
}
@@ -845,7 +938,7 @@ class SyncSocketSession {
throw Exception("After sync stream end, the stream must be complete")
}
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, sourceChannel)
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, syncStream.contentEncoding, sourceChannel)
}
}
Opcode.DATA.value -> {
@@ -1025,7 +1118,7 @@ class SyncSocketSession {
send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes)
}
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray): Boolean {
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray, contentEncoding: ContentEncoding? = null): Boolean {
val keyBytes = key.toByteArray(Charsets.UTF_8)
if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes")
if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required")
@@ -1080,7 +1173,7 @@ class SyncSocketSession {
}
}
packet.rewind()
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet)
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, ce = contentEncoding)
} catch (e: Exception) {
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
throw e
@@ -1200,6 +1293,6 @@ class SyncSocketSession {
private const val TAG = "SyncSocketSession"
const val MAXIMUM_PACKET_SIZE = 65535 - 16
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
const val HEADER_SIZE = 6
const val HEADER_SIZE = 7
}
}
@@ -1,6 +1,6 @@
package com.futo.platformplayer.sync.internal
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) {
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte, val contentEncoding: UByte) {
companion object {
const val MAXIMUM_SIZE = 10_000_000
}
@@ -45,6 +45,10 @@ open class ChannelView : LinearLayout {
_buttonSubscribe = findViewById(R.id.button_subscribe);
_platformIndicator = findViewById(R.id.platform_indicator);
//_textName.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
//_creatorThumbnail.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
//_textMetadata.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
if (_tiny) {
_buttonSubscribe.visibility = View.GONE;
_textMetadata.visibility = View.GONE;
@@ -66,8 +70,11 @@ open class ChannelView : LinearLayout {
open fun bind(content: IPlatformContent) {
isClickable = true;
if(content !is IPlatformChannelContent)
return
if(content !is IPlatformChannelContent) {
currentChannel = null;
return;
}
currentChannel = content;
_creatorThumbnail.setThumbnail(content.thumbnail, false);
_textName.text = content.name;
@@ -18,6 +18,7 @@ import com.futo.platformplayer.casting.StateCasting
import com.futo.platformplayer.constructs.Event1
import com.futo.platformplayer.constructs.Event2
import androidx.core.view.isVisible
import com.futo.platformplayer.UIDialogs
class DeviceViewHolder : ViewHolder {
private val _layoutDevice: FrameLayout;
@@ -55,9 +56,17 @@ class DeviceViewHolder : ViewHolder {
val connect = {
device?.let { dev ->
StateCasting.instance.activeDevice?.stopCasting();
StateCasting.instance.connectDevice(dev);
onConnect.emit(dev);
if (dev.isReady) {
StateCasting.instance.activeDevice?.stopCasting()
StateCasting.instance.connectDevice(dev)
onConnect.emit(dev)
} else {
try {
view.context?.let { UIDialogs.toast(it, "Device not ready, may be offline") }
} catch (e: Throwable) {
//Ignored
}
}
}
}
@@ -84,7 +93,7 @@ class DeviceViewHolder : ViewHolder {
}
_textName.text = d.name;
_imageOnline.visibility = if (isOnlineDevice) View.VISIBLE else View.GONE
_imageOnline.visibility = if (isOnlineDevice && d.isReady) View.VISIBLE else View.GONE
if (!d.isReady) {
_imageLoader.visibility = View.GONE;
@@ -37,9 +37,10 @@ class SubscriptionAdapter : RecyclerView.Adapter<SubscriptionViewHolder> {
_onDatasetChanged = onDatasetChanged;
StateSubscriptions.instance.onSubscriptionsChanged.subscribe { _, _ -> if(Looper.myLooper() != Looper.getMainLooper())
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { updateDataset() }
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) { updateDataset() }
else
updateDataset(); }
updateDataset();
}
updateDataset();
}
@@ -13,6 +13,17 @@ class RadioGroupView : FlexboxLayout {
val selectedOptions = arrayListOf<Any?>();
val onSelectedChange = Event1<List<Any?>>();
constructor(context: Context) : super(context) {
flexWrap = FlexWrap.WRAP;
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
if (isInEditMode) {
setOptions(listOf("Example 1" to 1, "Example 2" to 2, "Example 3" to 3, "Example 4" to 4, "Example 5" to 5), listOf("Example 1", "Example 2"),
multiSelect = true,
atLeastOne = false
);
}
}
constructor(context: Context, attrs: AttributeSet) : super(context, attrs) {
flexWrap = FlexWrap.WRAP;
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
+3 -3
View File
@@ -57,15 +57,15 @@
<ImageView
android:id="@+id/image_clear"
android:layout_width="16dp"
android:layout_height="16dp"
android:layout_width="36dp"
android:layout_height="36dp"
app:srcCompat="@drawable/ic_clear_16dp"
app:layout_constraintTop_toTopOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintBottom_toBottomOf="parent"
android:layout_marginEnd="6dp"
android:layout_marginStart="6dp"
android:padding="2dp" />
android:padding="12dp" />
<TextView
android:id="@+id/text_name"
+4
View File
@@ -382,6 +382,10 @@
<string name="pair_through_relay_description">Allow devices to be paired through the relay</string>
<string name="connect_through_relay">Connection through relay</string>
<string name="connect_through_relay_description">Allow devices to be connected to through the relay</string>
<string name="connect_local_direct_through_relay">Connect direct through relay</string>
<string name="connect_local_direct_through_relay_description">Allow devices to be directly locally connected to through information discovered from the relay</string>
<string name="local_connections">Local connections</string>
<string name="local_connections_description">Allow device to be directly locally connected</string>
<string name="gesture_controls">Gesture controls</string>
<string name="volume_slider">Volume slider</string>
<string name="volume_slider_descr">Enable slide gesture to change volume</string>