mirror of
https://gitlab.futo.org/videostreaming/grayjay.git
synced 2026-05-16 13:02:39 +02:00
Compare commits
30 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e0b5e7b808 | |||
| ac3a8da002 | |||
| 1aa45c2156 | |||
| 3cf8abd409 | |||
| db8426779c | |||
| b419e033f3 | |||
| d686fa327b | |||
| a1ce5eda43 | |||
| 1e790d1aa9 | |||
| d1d304b758 | |||
| e12b500144 | |||
| bd77651a1e | |||
| 35dc186395 | |||
| 07e78e0d12 | |||
| 5b8905c1d2 | |||
| 158a27cbae | |||
| 5769b39d78 | |||
| 5c96262c75 | |||
| 766f57dc9d | |||
| 9986078582 | |||
| e047ab5684 | |||
| a100785ad7 | |||
| 156eb4d15e | |||
| dabcfd965f | |||
| d44a71f3be | |||
| f8edd6cf3d | |||
| 2baf53c5a4 | |||
| c26e9c281f | |||
| 9f78e9b7dd | |||
| fdaf41b605 |
@@ -399,9 +399,11 @@ fun String.matchesDomain(queryDomain: String): Boolean {
|
||||
|
||||
fun String.getSubdomainWildcardQuery(): String {
|
||||
val domainParts = this.split(".");
|
||||
val sldParts = "." + domainParts[domainParts.size - 2].lowercase() + "." + domainParts[domainParts.size - 1].lowercase();
|
||||
if(slds.contains(sldParts))
|
||||
return "." + domainParts.drop(domainParts.size - 3).joinToString(".");
|
||||
var wildcardDomain = if(domainParts.size > 2)
|
||||
"." + domainParts.drop(1).joinToString(".")
|
||||
else
|
||||
return "." + domainParts.drop(domainParts.size - 2).joinToString(".");
|
||||
"." + domainParts.joinToString(".");
|
||||
if(slds.contains(wildcardDomain.lowercase()))
|
||||
"." + domainParts.joinToString(".");
|
||||
return wildcardDomain;
|
||||
}
|
||||
@@ -7,6 +7,9 @@ import java.net.InetAddress
|
||||
import java.net.URI
|
||||
import java.net.URISyntaxException
|
||||
import java.net.URLEncoder
|
||||
import java.time.Instant
|
||||
import java.time.OffsetDateTime
|
||||
import java.time.ZoneOffset
|
||||
|
||||
//Syntax sugaring
|
||||
inline fun <reified T> Any.assume(): T?{
|
||||
@@ -33,13 +36,37 @@ fun Boolean?.toYesNo(): String {
|
||||
fun InetAddress?.toUrlAddress(): String {
|
||||
return when (this) {
|
||||
is Inet6Address -> {
|
||||
"[${hostAddress}]"
|
||||
val hostAddr = this.hostAddress ?: throw Exception("Invalid address: hostAddress is null")
|
||||
val index = hostAddr.indexOf('%')
|
||||
if (index != -1) {
|
||||
val addrPart = hostAddr.substring(0, index)
|
||||
val scopeId = hostAddr.substring(index + 1)
|
||||
"[${addrPart}%25${scopeId}]" // %25 is URL-encoded '%'
|
||||
} else {
|
||||
"[$hostAddr]"
|
||||
}
|
||||
}
|
||||
is Inet4Address -> {
|
||||
hostAddress
|
||||
this.hostAddress ?: throw Exception("Invalid address: hostAddress is null")
|
||||
}
|
||||
else -> {
|
||||
throw Exception("Invalid address type")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun Long?.sToOffsetDateTimeUTC(): OffsetDateTime {
|
||||
if (this == null || this < 0)
|
||||
return OffsetDateTime.MIN
|
||||
if(this > 4070912400)
|
||||
return OffsetDateTime.MAX;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(this), ZoneOffset.UTC)
|
||||
}
|
||||
|
||||
fun Long?.msToOffsetDateTimeUTC(): OffsetDateTime {
|
||||
if (this == null || this < 0)
|
||||
return OffsetDateTime.MIN
|
||||
if(this > 4070912400)
|
||||
return OffsetDateTime.MAX;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(this), ZoneOffset.UTC)
|
||||
}
|
||||
@@ -590,7 +590,7 @@ class Settings : FragmentedStorageFileJson() {
|
||||
|
||||
@FormField(R.string.allow_ipv6, FieldForm.TOGGLE, R.string.allow_ipv6_description, 4)
|
||||
@Serializable(with = FlexibleBooleanSerializer::class)
|
||||
var allowIpv6: Boolean = false;
|
||||
var allowIpv6: Boolean = true;
|
||||
|
||||
/*TODO: Should we have a different casting quality?
|
||||
@FormField("Preferred Casting Quality", FieldForm.DROPDOWN, "", 3)
|
||||
@@ -926,7 +926,7 @@ class Settings : FragmentedStorageFileJson() {
|
||||
@Serializable
|
||||
class Synchronization {
|
||||
@FormField(R.string.enabled, FieldForm.TOGGLE, R.string.enabled_description, 1)
|
||||
var enabled: Boolean = true;
|
||||
var enabled: Boolean = false;
|
||||
|
||||
@FormField(R.string.broadcast, FieldForm.TOGGLE, R.string.broadcast_description, 1)
|
||||
var broadcast: Boolean = false;
|
||||
@@ -945,6 +945,12 @@ class Settings : FragmentedStorageFileJson() {
|
||||
|
||||
@FormField(R.string.connect_through_relay, FieldForm.TOGGLE, R.string.connect_through_relay_description, 3)
|
||||
var connectThroughRelay: Boolean = true;
|
||||
|
||||
@FormField(R.string.connect_local_direct_through_relay, FieldForm.TOGGLE, R.string.connect_local_direct_through_relay_description, 3)
|
||||
var connectLocalDirectThroughRelay: Boolean = true;
|
||||
|
||||
@FormField(R.string.local_connections, FieldForm.TOGGLE, R.string.local_connections_description, 3)
|
||||
var localConnections: Boolean = true;
|
||||
}
|
||||
|
||||
@FormField(R.string.info, FieldForm.GROUP, -1, 21)
|
||||
|
||||
@@ -279,7 +279,7 @@ fun <T> findNewIndex(originalArr: List<T>, newArr: List<T>, item: T): Int{
|
||||
}
|
||||
}
|
||||
if(newIndex < 0)
|
||||
return originalArr.size;
|
||||
return newArr.size;
|
||||
else
|
||||
return newIndex;
|
||||
}
|
||||
|
||||
@@ -89,6 +89,14 @@ class SyncHomeActivity : AppCompatActivity() {
|
||||
updateEmptyVisibility()
|
||||
}
|
||||
}
|
||||
|
||||
StateSync.instance.confirmStarted(this, {
|
||||
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
|
||||
}, {
|
||||
finish()
|
||||
}, {
|
||||
StateSync.instance.showFailedToBindDialogIfNecessary(this@SyncHomeActivity)
|
||||
})
|
||||
}
|
||||
|
||||
override fun onDestroy() {
|
||||
|
||||
@@ -83,6 +83,7 @@ class SyncPairActivity : AppCompatActivity() {
|
||||
|
||||
_layoutPairingSuccess.setOnClickListener {
|
||||
_layoutPairingSuccess.visibility = View.GONE
|
||||
finish()
|
||||
}
|
||||
_layoutPairingError.setOnClickListener {
|
||||
_layoutPairingError.visibility = View.GONE
|
||||
@@ -111,9 +112,15 @@ class SyncPairActivity : AppCompatActivity() {
|
||||
try {
|
||||
StateSync.instance.connect(deviceInfo) { complete, message ->
|
||||
lifecycleScope.launch(Dispatchers.Main) {
|
||||
if (complete != null && complete) {
|
||||
_layoutPairingSuccess.visibility = View.VISIBLE
|
||||
_layoutPairing.visibility = View.GONE
|
||||
if (complete != null) {
|
||||
if (complete) {
|
||||
_layoutPairingSuccess.visibility = View.VISIBLE
|
||||
_layoutPairing.visibility = View.GONE
|
||||
} else {
|
||||
_textError.text = message
|
||||
_layoutPairingError.visibility = View.VISIBLE
|
||||
_layoutPairing.visibility = View.GONE
|
||||
}
|
||||
} else {
|
||||
_textPairingStatus.text = message
|
||||
}
|
||||
@@ -137,8 +144,6 @@ class SyncPairActivity : AppCompatActivity() {
|
||||
_textError.text = e.message
|
||||
_layoutPairing.visibility = View.GONE
|
||||
Logger.e(TAG, "Failed to pair", e)
|
||||
} finally {
|
||||
_layoutPairing.visibility = View.GONE
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ class AirPlayCastingDevice : CastingDevice {
|
||||
break;
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to get setup initial connection to AirPlay device.", e)
|
||||
delay(1000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -322,6 +322,7 @@ class ChromecastCastingDevice : CastingDevice {
|
||||
break;
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to get setup initial connection to ChromeCast device.", e)
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import com.futo.platformplayer.toInetAddress
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.encodeToString
|
||||
@@ -289,6 +290,7 @@ class FCastCastingDevice : CastingDevice {
|
||||
break;
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to get setup initial connection to FastCast device.", e)
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,8 @@ import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.models.CastingDeviceInfo
|
||||
import com.futo.platformplayer.parsers.HLS
|
||||
import com.futo.platformplayer.states.StateApp
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.platformplayer.states.StateSync.Companion
|
||||
import com.futo.platformplayer.stores.CastingDeviceInfoStorage
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
import com.futo.platformplayer.toUrlAddress
|
||||
@@ -176,7 +178,11 @@ class StateCasting {
|
||||
fun stopDiscovering() {
|
||||
_nsdManager?.apply {
|
||||
_discoveryListeners.forEach {
|
||||
stopServiceDiscovery(it.value)
|
||||
try {
|
||||
stopServiceDiscovery(it.value)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -224,12 +230,20 @@ class StateCasting {
|
||||
|
||||
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onServiceFound(service: NsdServiceInfo) {
|
||||
|
||||
@@ -72,6 +72,10 @@ class PackageBridge : V8Package {
|
||||
fun buildSpecVersion(): Int {
|
||||
return JSClientConstants.PLUGIN_SPEC_VERSION;
|
||||
}
|
||||
@V8Property
|
||||
fun buildPlatform(): String {
|
||||
return "android";
|
||||
}
|
||||
|
||||
@V8Function
|
||||
fun dispose(value: V8Value) {
|
||||
|
||||
+23
@@ -2,9 +2,11 @@ package com.futo.platformplayer.fragment.mainactivity.main
|
||||
|
||||
import android.annotation.SuppressLint
|
||||
import android.os.Bundle
|
||||
import android.util.AttributeSet
|
||||
import android.view.LayoutInflater
|
||||
import android.view.View
|
||||
import android.view.ViewGroup
|
||||
import androidx.core.view.allViews
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import com.futo.platformplayer.Settings
|
||||
import com.futo.platformplayer.UIDialogs
|
||||
@@ -23,6 +25,8 @@ import com.futo.platformplayer.models.SearchType
|
||||
import com.futo.platformplayer.states.StateMeta
|
||||
import com.futo.platformplayer.states.StatePlatform
|
||||
import com.futo.platformplayer.views.FeedStyle
|
||||
import com.futo.platformplayer.views.ToggleBar
|
||||
import com.futo.platformplayer.views.others.RadioGroupView
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
@@ -114,6 +118,25 @@ class ContentSearchResultsFragment : MainFragment() {
|
||||
}
|
||||
|
||||
setPreviewsEnabled(Settings.instance.search.previewFeedItems);
|
||||
|
||||
initializeToolbar();
|
||||
}
|
||||
|
||||
fun initializeToolbar(){
|
||||
if(_toolbarContentView.allViews.any { it is RadioGroupView })
|
||||
_toolbarContentView.removeView(_toolbarContentView.allViews.find { it is RadioGroupView });
|
||||
|
||||
val radioGroup = RadioGroupView(context);
|
||||
radioGroup.onSelectedChange.subscribe {
|
||||
|
||||
if (it.size != 1)
|
||||
setSearchType(SearchType.VIDEO);
|
||||
else
|
||||
setSearchType((it[0] ?: SearchType.VIDEO) as SearchType);
|
||||
}
|
||||
radioGroup?.setOptions(listOf(Pair("Media", SearchType.VIDEO), Pair("Creators", SearchType.CREATOR), Pair("Playlists", SearchType.PLAYLIST)), listOf(_searchType), false, true)
|
||||
|
||||
_toolbarContentView.addView(radioGroup);
|
||||
}
|
||||
|
||||
override fun cleanup() {
|
||||
|
||||
@@ -29,7 +29,7 @@ data class ImageVariable(
|
||||
Glide.with(imageView)
|
||||
.load(bitmap)
|
||||
.into(imageView)
|
||||
} else if(resId != null) {
|
||||
} else if(resId != null && resId > 0) {
|
||||
Glide.with(imageView)
|
||||
.load(resId)
|
||||
.into(imageView)
|
||||
|
||||
@@ -113,7 +113,7 @@ class LoginWebViewClient : WebViewClient {
|
||||
//val domainParts = domain!!.split(".");
|
||||
//val cookieDomain = "." + domainParts.drop(domainParts.size - 2).joinToString(".");
|
||||
val cookieDomain = domain!!.getSubdomainWildcardQuery();
|
||||
if(_pluginConfig == null || _pluginConfig.allowUrls.any { it == "everywhere" || it.lowercase().matchesDomain(cookieDomain) })
|
||||
if(_pluginConfig == null || _pluginConfig.allowUrls.any { it == "everywhere" || domain.matchesDomain(it) })
|
||||
_authConfig.cookiesToFind?.let { cookiesToFind ->
|
||||
val cookies = cookieString.split(";");
|
||||
for(cookieStr in cookies) {
|
||||
|
||||
@@ -67,7 +67,7 @@ class WebViewRequirementExtractor {
|
||||
if(cookieString != null) {
|
||||
//val domainParts = domain!!.split(".");
|
||||
val cookieDomain = domain!!.getSubdomainWildcardQuery()//"." + domainParts.drop(domainParts.size - 2).joinToString(".");
|
||||
if(allowedUrls.any { it == "everywhere" || it.lowercase().matchesDomain(cookieDomain) })
|
||||
if(allowedUrls.any { it == "everywhere" || domain.matchesDomain(it) })
|
||||
cookiesToFind?.let { cookiesToFind ->
|
||||
val cookies = cookieString.split(";");
|
||||
for(cookieStr in cookies) {
|
||||
|
||||
@@ -29,6 +29,7 @@ import com.futo.platformplayer.activities.CaptchaActivity
|
||||
import com.futo.platformplayer.activities.IWithResultLauncher
|
||||
import com.futo.platformplayer.activities.MainActivity
|
||||
import com.futo.platformplayer.activities.SettingsActivity
|
||||
import com.futo.platformplayer.activities.SettingsActivity.Companion.settingsActivityClosed
|
||||
import com.futo.platformplayer.api.media.platforms.js.DevJSClient
|
||||
import com.futo.platformplayer.api.media.platforms.js.JSClient
|
||||
import com.futo.platformplayer.background.BackgroundWorker
|
||||
@@ -411,7 +412,27 @@ class StateApp {
|
||||
}
|
||||
|
||||
if (Settings.instance.synchronization.enabled) {
|
||||
StateSync.instance.start(context)
|
||||
StateSync.instance.start(context, {
|
||||
try {
|
||||
UIDialogs.toast("Failed to start sync, port in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
settingsActivityClosed.subscribe {
|
||||
if (Settings.instance.synchronization.enabled) {
|
||||
StateSync.instance.start(context, {
|
||||
try {
|
||||
UIDialogs.toast("Failed to start sync, port in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
})
|
||||
} else {
|
||||
StateSync.instance.stop()
|
||||
}
|
||||
}
|
||||
|
||||
Logger.onLogSubmitted.subscribe {
|
||||
@@ -707,6 +728,7 @@ class StateApp {
|
||||
|
||||
StatePlayer.instance.closeMediaSession();
|
||||
StateCasting.instance.stop();
|
||||
StateSync.instance.stop();
|
||||
StatePlayer.dispose();
|
||||
Companion.dispose();
|
||||
_fileLogConsumer?.close();
|
||||
|
||||
@@ -19,6 +19,7 @@ import com.futo.platformplayer.exceptions.ReconstructionException
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.models.ImportCache
|
||||
import com.futo.platformplayer.models.Playlist
|
||||
import com.futo.platformplayer.sToOffsetDateTimeUTC
|
||||
import com.futo.platformplayer.smartMerge
|
||||
import com.futo.platformplayer.states.StateSubscriptionGroups.Companion
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
@@ -85,7 +86,7 @@ class StatePlaylists {
|
||||
if(value.isEmpty())
|
||||
return OffsetDateTime.MIN;
|
||||
val tryParse = value.toLongOrNull() ?: 0;
|
||||
return OffsetDateTime.ofInstant(Instant.ofEpochSecond(tryParse), ZoneOffset.UTC);
|
||||
return tryParse.sToOffsetDateTimeUTC();
|
||||
}
|
||||
private fun setWatchLaterReorderTime() {
|
||||
val now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
|
||||
@@ -7,6 +7,7 @@ import android.os.Build
|
||||
import android.util.Log
|
||||
import com.futo.platformplayer.LittleEndianDataInputStream
|
||||
import com.futo.platformplayer.LittleEndianDataOutputStream
|
||||
import com.futo.platformplayer.R
|
||||
import com.futo.platformplayer.Settings
|
||||
import com.futo.platformplayer.UIDialogs
|
||||
import com.futo.platformplayer.activities.MainActivity
|
||||
@@ -24,6 +25,7 @@ import com.futo.platformplayer.models.HistoryVideo
|
||||
import com.futo.platformplayer.models.Subscription
|
||||
import com.futo.platformplayer.noise.protocol.DHState
|
||||
import com.futo.platformplayer.noise.protocol.Noise
|
||||
import com.futo.platformplayer.sToOffsetDateTimeUTC
|
||||
import com.futo.platformplayer.smartMerge
|
||||
import com.futo.platformplayer.stores.FragmentedStorage
|
||||
import com.futo.platformplayer.stores.StringStringMapStorage
|
||||
@@ -65,6 +67,7 @@ import java.time.OffsetDateTime
|
||||
import java.time.ZoneOffset
|
||||
import java.util.Base64
|
||||
import java.util.Locale
|
||||
import kotlin.math.min
|
||||
import kotlin.system.measureTimeMillis
|
||||
|
||||
class StateSync {
|
||||
@@ -77,10 +80,11 @@ class StateSync {
|
||||
private var _serverSocket: ServerSocket? = null
|
||||
private var _thread: Thread? = null
|
||||
private var _connectThread: Thread? = null
|
||||
private var _started = false
|
||||
@Volatile private var _started = false
|
||||
private val _sessions: MutableMap<String, SyncSession> = mutableMapOf()
|
||||
private val _lastConnectTimesMdns: MutableMap<String, Long> = mutableMapOf()
|
||||
private val _lastConnectTimesIp: MutableMap<String, Long> = mutableMapOf()
|
||||
private var _serverStarted = false
|
||||
//TODO: Should sync mdns and casting mdns be merged?
|
||||
//TODO: Decrease interval that devices are updated
|
||||
//TODO: Send less data
|
||||
@@ -91,6 +95,117 @@ class StateSync {
|
||||
private var _threadRelay: Thread? = null
|
||||
private val _remotePendingStatusUpdate = mutableMapOf<String, (complete: Boolean?, message: String) -> Unit>()
|
||||
private var _nsdManager: NsdManager? = null
|
||||
private var _discoveryListener: NsdManager.DiscoveryListener = object : NsdManager.DiscoveryListener {
|
||||
override fun onDiscoveryStarted(regType: String) {
|
||||
Log.d(TAG, "Service discovery started for $regType")
|
||||
}
|
||||
|
||||
override fun onDiscoveryStopped(serviceType: String) {
|
||||
Log.i(TAG, "Discovery stopped: $serviceType")
|
||||
}
|
||||
|
||||
override fun onServiceLost(service: NsdServiceInfo) {
|
||||
Log.e(TAG, "service lost: $service")
|
||||
// TODO: Handle service lost, e.g., remove device
|
||||
}
|
||||
|
||||
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Failed to stop service discovery", e)
|
||||
}
|
||||
}
|
||||
|
||||
fun addOrUpdate(name: String, adrs: Array<InetAddress>, port: Int, attributes: Map<String, ByteArray>) {
|
||||
if (!Settings.instance.synchronization.connectDiscovered) {
|
||||
return
|
||||
}
|
||||
|
||||
val urlSafePkey = attributes.get("pk")?.decodeToString() ?: return
|
||||
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
|
||||
val syncDeviceInfo = SyncDeviceInfo(pkey, adrs.map { it.hostAddress }.toTypedArray(), port, null)
|
||||
val authorized = isAuthorized(pkey)
|
||||
|
||||
if (authorized && !isConnected(pkey)) {
|
||||
val now = System.currentTimeMillis()
|
||||
val lastConnectTime = synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] ?: 0
|
||||
}
|
||||
|
||||
//Connect once every 30 seconds, max
|
||||
if (now - lastConnectTime > 30000) {
|
||||
synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] = now
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Found device authorized device '${name}' with pkey=$pkey, attempting to connect")
|
||||
|
||||
try {
|
||||
connect(syncDeviceInfo)
|
||||
} catch (e: Throwable) {
|
||||
Logger.i(TAG, "Failed to connect to $pkey", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onServiceFound(service: NsdServiceInfo) {
|
||||
Log.v(TAG, "Service discovery success for ${service.serviceType}: $service")
|
||||
addOrUpdate(service.serviceName, if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
service.hostAddresses.toTypedArray()
|
||||
} else {
|
||||
if(service.host != null)
|
||||
arrayOf(service.host);
|
||||
else
|
||||
arrayOf();
|
||||
}, service.port, service.attributes)
|
||||
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
_nsdManager?.registerServiceInfoCallback(service, { it.run() }, object : NsdManager.ServiceInfoCallback {
|
||||
override fun onServiceUpdated(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceUpdated: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, serviceInfo.hostAddresses.toTypedArray(), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
|
||||
override fun onServiceLost() {
|
||||
Log.v(TAG, "onServiceLost: $service")
|
||||
// TODO: Handle service lost
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackRegistrationFailed(errorCode: Int) {
|
||||
Log.v(TAG, "onServiceInfoCallbackRegistrationFailed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackUnregistered() {
|
||||
Log.v(TAG, "onServiceInfoCallbackUnregistered")
|
||||
}
|
||||
})
|
||||
} else {
|
||||
_nsdManager?.resolveService(service, object : NsdManager.ResolveListener {
|
||||
override fun onResolveFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
|
||||
Log.v(TAG, "Resolve failed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceResolved(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "Resolve Succeeded: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, arrayOf(serviceInfo.host), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val _registrationListener = object : NsdManager.RegistrationListener {
|
||||
override fun onServiceRegistered(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceRegistered: ${serviceInfo.serviceName}")
|
||||
@@ -122,7 +237,7 @@ class StateSync {
|
||||
}
|
||||
}
|
||||
|
||||
fun start(context: Context) {
|
||||
fun start(context: Context, onServerBindFail: () -> Unit) {
|
||||
if (_started) {
|
||||
Logger.i(TAG, "Already started.")
|
||||
return
|
||||
@@ -132,108 +247,7 @@ class StateSync {
|
||||
|
||||
if (Settings.instance.synchronization.connectDiscovered) {
|
||||
_nsdManager?.apply {
|
||||
discoverServices("_gsync._tcp", NsdManager.PROTOCOL_DNS_SD, object : NsdManager.DiscoveryListener {
|
||||
override fun onDiscoveryStarted(regType: String) {
|
||||
Log.d(TAG, "Service discovery started for $regType")
|
||||
}
|
||||
|
||||
override fun onDiscoveryStopped(serviceType: String) {
|
||||
Log.i(TAG, "Discovery stopped: $serviceType")
|
||||
}
|
||||
|
||||
override fun onServiceLost(service: NsdServiceInfo) {
|
||||
Log.e(TAG, "service lost: $service")
|
||||
// TODO: Handle service lost, e.g., remove device
|
||||
}
|
||||
|
||||
override fun onStartDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Discovery failed for $serviceType: Error code:$errorCode")
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
}
|
||||
|
||||
override fun onStopDiscoveryFailed(serviceType: String, errorCode: Int) {
|
||||
Log.e(TAG, "Stop discovery failed for $serviceType: Error code:$errorCode")
|
||||
_nsdManager?.stopServiceDiscovery(this)
|
||||
}
|
||||
|
||||
fun addOrUpdate(name: String, adrs: Array<InetAddress>, port: Int, attributes: Map<String, ByteArray>) {
|
||||
if (!Settings.instance.synchronization.connectDiscovered) {
|
||||
return
|
||||
}
|
||||
|
||||
val urlSafePkey = attributes.get("pk")?.decodeToString() ?: return
|
||||
val pkey = Base64.getEncoder().encodeToString(Base64.getDecoder().decode(urlSafePkey.replace('-', '+').replace('_', '/')))
|
||||
val syncDeviceInfo = SyncDeviceInfo(pkey, adrs.map { it.hostAddress }.toTypedArray(), port, null)
|
||||
val authorized = isAuthorized(pkey)
|
||||
|
||||
if (authorized && !isConnected(pkey)) {
|
||||
val now = System.currentTimeMillis()
|
||||
val lastConnectTime = synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] ?: 0
|
||||
}
|
||||
|
||||
//Connect once every 30 seconds, max
|
||||
if (now - lastConnectTime > 30000) {
|
||||
synchronized(_lastConnectTimesMdns) {
|
||||
_lastConnectTimesMdns[pkey] = now
|
||||
}
|
||||
|
||||
Logger.i(TAG, "Found device authorized device '${name}' with pkey=$pkey, attempting to connect")
|
||||
|
||||
try {
|
||||
connect(syncDeviceInfo)
|
||||
} catch (e: Throwable) {
|
||||
Logger.i(TAG, "Failed to connect to $pkey", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onServiceFound(service: NsdServiceInfo) {
|
||||
Log.v(TAG, "Service discovery success for ${service.serviceType}: $service")
|
||||
addOrUpdate(service.serviceName, if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
service.hostAddresses.toTypedArray()
|
||||
} else {
|
||||
if(service.host != null)
|
||||
arrayOf(service.host);
|
||||
else
|
||||
arrayOf();
|
||||
}, service.port, service.attributes)
|
||||
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
|
||||
_nsdManager?.registerServiceInfoCallback(service, { it.run() }, object : NsdManager.ServiceInfoCallback {
|
||||
override fun onServiceUpdated(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "onServiceUpdated: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, serviceInfo.hostAddresses.toTypedArray(), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
|
||||
override fun onServiceLost() {
|
||||
Log.v(TAG, "onServiceLost: $service")
|
||||
// TODO: Handle service lost
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackRegistrationFailed(errorCode: Int) {
|
||||
Log.v(TAG, "onServiceInfoCallbackRegistrationFailed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceInfoCallbackUnregistered() {
|
||||
Log.v(TAG, "onServiceInfoCallbackUnregistered")
|
||||
}
|
||||
})
|
||||
} else {
|
||||
_nsdManager?.resolveService(service, object : NsdManager.ResolveListener {
|
||||
override fun onResolveFailed(serviceInfo: NsdServiceInfo, errorCode: Int) {
|
||||
Log.v(TAG, "Resolve failed: $errorCode")
|
||||
}
|
||||
|
||||
override fun onServiceResolved(serviceInfo: NsdServiceInfo) {
|
||||
Log.v(TAG, "Resolve Succeeded: $serviceInfo")
|
||||
addOrUpdate(serviceInfo.serviceName, arrayOf(serviceInfo.host), serviceInfo.port, serviceInfo.attributes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
discoverServices("_gsync._tcp", NsdManager.PROTOCOL_DNS_SD, _discoveryListener)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -284,23 +298,31 @@ class StateSync {
|
||||
|
||||
Logger.i(TAG, "Sync key pair initialized (public key = ${publicKey})")
|
||||
|
||||
_thread = Thread {
|
||||
try {
|
||||
val serverSocket = ServerSocket(PORT)
|
||||
_serverSocket = serverSocket
|
||||
if (Settings.instance.synchronization.localConnections) {
|
||||
_serverStarted = true
|
||||
_thread = Thread {
|
||||
try {
|
||||
val serverSocket = ServerSocket(PORT)
|
||||
_serverSocket = serverSocket
|
||||
|
||||
Log.i(TAG, "Running on port ${PORT} (TCP)")
|
||||
Log.i(TAG, "Running on port ${PORT} (TCP)")
|
||||
|
||||
while (_started) {
|
||||
val socket = serverSocket.accept()
|
||||
val session = createSocketSession(socket, true)
|
||||
session.startAsResponder()
|
||||
while (_started) {
|
||||
val socket = serverSocket.accept()
|
||||
val session = createSocketSession(socket, true)
|
||||
session.startAsResponder()
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
_serverStarted = false
|
||||
Logger.e(TAG, "Failed to bind server socket to port ${PORT}", e)
|
||||
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) {
|
||||
onServerBindFail.invoke()
|
||||
}
|
||||
} finally {
|
||||
_serverStarted = false
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to bind server socket to port ${PORT}", e)
|
||||
UIDialogs.toast("Failed to start sync, port in use")
|
||||
}
|
||||
}.apply { start() }
|
||||
}.apply { start() }
|
||||
}
|
||||
|
||||
if (Settings.instance.synchronization.connectLast) {
|
||||
_connectThread = Thread {
|
||||
@@ -352,6 +374,9 @@ class StateSync {
|
||||
|
||||
if (Settings.instance.synchronization.discoverThroughRelay) {
|
||||
_threadRelay = Thread {
|
||||
var backoffs: Array<Long> = arrayOf(1000, 5000, 10000, 20000)
|
||||
var backoffIndex = 0;
|
||||
|
||||
while (_started) {
|
||||
try {
|
||||
Log.i(TAG, "Starting relay session...")
|
||||
@@ -361,8 +386,7 @@ class StateSync {
|
||||
_relaySession = SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
LittleEndianDataInputStream(socket.getInputStream()),
|
||||
LittleEndianDataOutputStream(socket.getOutputStream()),
|
||||
socket,
|
||||
isHandshakeAllowed = { linkType, syncSocketSession, publicKey, pairingCode, appId -> isHandshakeAllowed(linkType, syncSocketSession, publicKey, pairingCode, appId) },
|
||||
onNewChannel = { _, c ->
|
||||
val remotePublicKey = c.remotePublicKey
|
||||
@@ -398,6 +422,8 @@ class StateSync {
|
||||
},
|
||||
onClose = { socketClosed = true },
|
||||
onHandshakeComplete = { relaySession ->
|
||||
backoffIndex = 0
|
||||
|
||||
Thread {
|
||||
try {
|
||||
while (_started && !socketClosed) {
|
||||
@@ -407,12 +433,14 @@ class StateSync {
|
||||
|
||||
relaySession.publishConnectionInformation(unconnectedAuthorizedDevices, PORT, Settings.instance.synchronization.discoverThroughRelay, false, false, Settings.instance.synchronization.discoverThroughRelay && Settings.instance.synchronization.connectThroughRelay)
|
||||
|
||||
Logger.v(TAG, "Requesting ${unconnectedAuthorizedDevices.size} devices connection information")
|
||||
val connectionInfos = runBlocking { relaySession.requestBulkConnectionInfo(unconnectedAuthorizedDevices) }
|
||||
Logger.v(TAG, "Received ${connectionInfos.size} devices connection information")
|
||||
|
||||
for ((targetKey, connectionInfo) in connectionInfos) {
|
||||
val potentialLocalAddresses = connectionInfo.ipv4Addresses.union(connectionInfo.ipv6Addresses)
|
||||
.filter { it != connectionInfo.remoteIp }
|
||||
if (connectionInfo.allowLocalDirect) {
|
||||
if (connectionInfo.allowLocalDirect && Settings.instance.synchronization.connectLocalDirectThroughRelay) {
|
||||
Thread {
|
||||
try {
|
||||
Log.v(TAG, "Attempting to connect directly, locally to '$targetKey'.")
|
||||
@@ -433,10 +461,10 @@ class StateSync {
|
||||
|
||||
if (connectionInfo.allowRemoteRelayed && Settings.instance.synchronization.connectThroughRelay) {
|
||||
try {
|
||||
Log.v(TAG, "Attempting relayed connection with '$targetKey'.")
|
||||
Logger.v(TAG, "Attempting relayed connection with '$targetKey'.")
|
||||
runBlocking { relaySession.startRelayedChannel(targetKey, APP_ID, null) }
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Failed to start relayed channel with $targetKey.", e)
|
||||
Logger.e(TAG, "Failed to start relayed channel with $targetKey.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -444,7 +472,7 @@ class StateSync {
|
||||
Thread.sleep(15000)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Unhandled exception in relay session.", e)
|
||||
Logger.e(TAG, "Unhandled exception in relay session.", e)
|
||||
relaySession.stop()
|
||||
}
|
||||
}.start()
|
||||
@@ -460,16 +488,39 @@ class StateSync {
|
||||
Log.i(TAG, "Started relay session.")
|
||||
} catch (e: Throwable) {
|
||||
Log.e(TAG, "Relay session failed.", e)
|
||||
Thread.sleep(5000)
|
||||
} finally {
|
||||
_relaySession?.stop()
|
||||
_relaySession = null
|
||||
Thread.sleep(backoffs[min(backoffs.size - 1, backoffIndex++)])
|
||||
}
|
||||
}
|
||||
}.apply { start() }
|
||||
}
|
||||
}
|
||||
|
||||
fun showFailedToBindDialogIfNecessary(context: Context) {
|
||||
if (!_serverStarted && Settings.instance.synchronization.localConnections) {
|
||||
try {
|
||||
UIDialogs.showDialogOk(context, R.drawable.ic_warning, "Local discovery unavailable, port was in use")
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun confirmStarted(context: Context, onStarted: () -> Unit, onNotStarted: () -> Unit, onServerBindFail: () -> Unit) {
|
||||
if (!_started) {
|
||||
UIDialogs.showConfirmationDialog(context, "Sync has not been enabled yet, would you like to enable sync?", {
|
||||
Settings.instance.synchronization.enabled = true
|
||||
StateSync.instance.start(context, onServerBindFail)
|
||||
Settings.instance.save()
|
||||
onStarted.invoke()
|
||||
}, {
|
||||
onNotStarted.invoke()
|
||||
})
|
||||
} else {
|
||||
onStarted.invoke()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getDeviceName(): String {
|
||||
@@ -547,7 +598,7 @@ class StateSync {
|
||||
added.map { it.channel.name }.joinToString("\n"));
|
||||
|
||||
|
||||
if(pack.subscriptions.isNotEmpty()) {
|
||||
if(pack.subscriptionRemovals.isNotEmpty()) {
|
||||
for (subRemoved in pack.subscriptionRemovals) {
|
||||
val removed = StateSubscriptions.instance.applySubscriptionRemovals(pack.subscriptionRemovals);
|
||||
if(removed.size > 3) {
|
||||
@@ -585,16 +636,33 @@ class StateSync {
|
||||
|
||||
Logger.i(TAG, "Received SyncSessionData from $remotePublicKey");
|
||||
|
||||
val subscriptionPackageString = StateSubscriptions.instance.getSyncSubscriptionsPackageString()
|
||||
Logger.i(TAG, "syncStateExchange syncSubscriptions b (size: ${subscriptionPackageString.length})")
|
||||
session.sendData(GJSyncOpcodes.syncSubscriptions, subscriptionPackageString);
|
||||
Logger.i(TAG, "syncStateExchange syncSubscriptions (size: ${subscriptionPackageString.length})")
|
||||
|
||||
session.sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString());
|
||||
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString());
|
||||
session.sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString())
|
||||
val subscriptionGroupPackageString = StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()
|
||||
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups b (size: ${subscriptionGroupPackageString.length})")
|
||||
session.sendData(GJSyncOpcodes.syncSubscriptionGroups, subscriptionGroupPackageString);
|
||||
Logger.i(TAG, "syncStateExchange syncSubscriptionGroups (size: ${subscriptionGroupPackageString.length})")
|
||||
|
||||
session.sendData(GJSyncOpcodes.syncWatchLater, Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false)));
|
||||
val syncPlaylistPackageString = StatePlaylists.instance.getSyncPlaylistsPackageString()
|
||||
Logger.i(TAG, "syncStateExchange syncPlaylists b (size: ${syncPlaylistPackageString.length})")
|
||||
session.sendData(GJSyncOpcodes.syncPlaylists, syncPlaylistPackageString)
|
||||
Logger.i(TAG, "syncStateExchange syncPlaylists (size: ${syncPlaylistPackageString.length})")
|
||||
|
||||
val watchLaterPackageString = Json.encodeToString(StatePlaylists.instance.getWatchLaterSyncPacket(false))
|
||||
Logger.i(TAG, "syncStateExchange syncWatchLater b (size: ${watchLaterPackageString.length})")
|
||||
session.sendData(GJSyncOpcodes.syncWatchLater, watchLaterPackageString);
|
||||
Logger.i(TAG, "syncStateExchange syncWatchLater (size: ${watchLaterPackageString.length})")
|
||||
|
||||
val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory);
|
||||
|
||||
Logger.i(TAG, "syncStateExchange syncHistory b (size: ${recentHistory.size})")
|
||||
if(recentHistory.isNotEmpty())
|
||||
session.sendJsonData(GJSyncOpcodes.syncHistory, recentHistory);
|
||||
|
||||
Logger.i(TAG, "syncStateExchange syncHistory (size: ${recentHistory.size})")
|
||||
}
|
||||
|
||||
GJSyncOpcodes.syncExport -> {
|
||||
@@ -627,12 +695,14 @@ class StateSync {
|
||||
val subPackage = Serializer.json.decodeFromString<SyncSubscriptionsPackage>(json);
|
||||
handleSyncSubscriptionPackage(session, subPackage);
|
||||
|
||||
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
|
||||
if(subPackage.subscriptions.size > 0) {
|
||||
val newestSub = subPackage.subscriptions.maxOf { it.creationTime };
|
||||
|
||||
val sesData = getSyncSessionData(remotePublicKey);
|
||||
if(newestSub > sesData.lastSubscription) {
|
||||
sesData.lastSubscription = newestSub;
|
||||
saveSyncSessionData(sesData);
|
||||
val sesData = getSyncSessionData(remotePublicKey);
|
||||
if (newestSub > sesData.lastSubscription) {
|
||||
sesData.lastSubscription = newestSub;
|
||||
saveSyncSessionData(sesData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -662,7 +732,7 @@ class StateSync {
|
||||
}
|
||||
for(removal in pack.groupRemovals) {
|
||||
val creation = StateSubscriptionGroups.instance.getSubscriptionGroup(removal.key);
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC();
|
||||
if(creation != null && creation.creationTime < removalTime)
|
||||
StateSubscriptionGroups.instance.deleteSubscriptionGroup(removal.key, false);
|
||||
}
|
||||
@@ -690,7 +760,7 @@ class StateSync {
|
||||
}
|
||||
for(removal in pack.playlistRemovals) {
|
||||
val creation = StatePlaylists.instance.getPlaylist(removal.key);
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value, 0), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC();
|
||||
if(creation != null && creation.dateCreation < removalTime)
|
||||
StatePlaylists.instance.removePlaylist(creation, false);
|
||||
|
||||
@@ -708,9 +778,9 @@ class StateSync {
|
||||
val allExisting = StatePlaylists.instance.getWatchLater();
|
||||
for(video in pack.videos) {
|
||||
val existing = allExisting.firstOrNull { it.url == video.url };
|
||||
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.videoAdds[video.url] ?: 0), ZoneOffset.UTC) else OffsetDateTime.MIN;
|
||||
|
||||
if(existing == null) {
|
||||
val time = if(pack.videoAdds != null && pack.videoAdds.containsKey(video.url)) (pack.videoAdds[video.url] ?: 0).sToOffsetDateTimeUTC() else OffsetDateTime.MIN;
|
||||
val removalTime = StatePlaylists.instance.getWatchLaterRemovalTime(video.url) ?: OffsetDateTime.MIN;
|
||||
if(existing == null && time > removalTime) {
|
||||
StatePlaylists.instance.addToWatchLater(video, false);
|
||||
if(time > OffsetDateTime.MIN)
|
||||
StatePlaylists.instance.setWatchLaterAddTime(video.url, time);
|
||||
@@ -719,12 +789,12 @@ class StateSync {
|
||||
for(removal in pack.videoRemovals) {
|
||||
val watchLater = allExisting.firstOrNull { it.url == removal.key } ?: continue;
|
||||
val creation = StatePlaylists.instance.getWatchLaterRemovalTime(watchLater.url) ?: OffsetDateTime.MIN;
|
||||
val removalTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(removal.value), ZoneOffset.UTC);
|
||||
val removalTime = removal.value.sToOffsetDateTimeUTC()
|
||||
if(creation < removalTime)
|
||||
StatePlaylists.instance.removeFromWatchLater(watchLater, false, removalTime);
|
||||
}
|
||||
|
||||
val packReorderTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(pack.reorderTime), ZoneOffset.UTC);
|
||||
val packReorderTime = pack.reorderTime.sToOffsetDateTimeUTC()
|
||||
val localReorderTime = StatePlaylists.instance.getWatchLaterLastReorderTime();
|
||||
if(localReorderTime < packReorderTime && pack.ordering != null) {
|
||||
StatePlaylists.instance.updateWatchLaterOrdering(smartMerge(pack.ordering!!, StatePlaylists.instance.getWatchLaterOrdering()), true);
|
||||
@@ -737,6 +807,9 @@ class StateSync {
|
||||
val json = String(dataBody, Charsets.UTF_8);
|
||||
val history = Serializer.json.decodeFromString<List<HistoryVideo>>(json);
|
||||
Logger.i(TAG, "SyncHistory received ${history.size} videos from ${remotePublicKey}");
|
||||
if (history.size == 1) {
|
||||
Logger.i(TAG, "SyncHistory received update video '${history[0].video.name}' (url: ${history[0].video.url}) at timestamp ${history[0].position}");
|
||||
}
|
||||
|
||||
var lastHistory = OffsetDateTime.MIN;
|
||||
for(video in history){
|
||||
@@ -758,22 +831,15 @@ class StateSync {
|
||||
}
|
||||
}
|
||||
|
||||
private fun onAuthorized(remotePublicKey: String) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
|
||||
}
|
||||
}
|
||||
|
||||
private fun onUnuthorized(remotePublicKey: String) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
|
||||
}
|
||||
}
|
||||
|
||||
private fun createNewSyncSession(remotePublicKey: String, remoteDeviceName: String?): SyncSession {
|
||||
private fun createNewSyncSession(rpk: String, remoteDeviceName: String?): SyncSession {
|
||||
val remotePublicKey = rpk.base64ToByteArray().toBase64()
|
||||
return SyncSession(
|
||||
remotePublicKey,
|
||||
onAuthorized = { it, isNewlyAuthorized, isNewSession ->
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(true, "Authorized")
|
||||
}
|
||||
|
||||
if (!isNewSession) {
|
||||
return@SyncSession
|
||||
}
|
||||
@@ -785,7 +851,6 @@ class StateSync {
|
||||
}
|
||||
|
||||
Logger.i(TAG, "$remotePublicKey authorized (name: ${it.displayName})")
|
||||
onAuthorized(remotePublicKey)
|
||||
_authorizedDevices.addDistinct(remotePublicKey)
|
||||
_authorizedDevices.save()
|
||||
deviceUpdatedOrAdded.emit(it.remotePublicKey, it)
|
||||
@@ -793,10 +858,12 @@ class StateSync {
|
||||
checkForSync(it);
|
||||
},
|
||||
onUnauthorized = {
|
||||
unauthorize(remotePublicKey)
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate.remove(remotePublicKey)?.invoke(false, "Unauthorized")
|
||||
}
|
||||
|
||||
unauthorize(remotePublicKey)
|
||||
Logger.i(TAG, "$remotePublicKey unauthorized (name: ${it.displayName})")
|
||||
onUnuthorized(remotePublicKey)
|
||||
|
||||
synchronized(_sessions) {
|
||||
it.close()
|
||||
@@ -822,7 +889,17 @@ class StateSync {
|
||||
}
|
||||
},
|
||||
dataHandler = { it, opcode, subOpcode, data ->
|
||||
handleData(it, opcode, subOpcode, data)
|
||||
val dataCopy = ByteArray(data.remaining())
|
||||
data.get(dataCopy)
|
||||
|
||||
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) {
|
||||
try {
|
||||
handleData(it, opcode, subOpcode, ByteBuffer.wrap(dataCopy))
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Exception occurred while handling data, closing session", e)
|
||||
it.close()
|
||||
}
|
||||
}
|
||||
},
|
||||
remoteDeviceName
|
||||
)
|
||||
@@ -857,8 +934,7 @@ class StateSync {
|
||||
return SyncSocketSession(
|
||||
(socket.remoteSocketAddress as InetSocketAddress).address.hostAddress!!,
|
||||
keyPair!!,
|
||||
LittleEndianDataInputStream(socket.getInputStream()),
|
||||
LittleEndianDataOutputStream(socket.getOutputStream()),
|
||||
socket,
|
||||
onClose = { s ->
|
||||
if (channelSocket != null)
|
||||
session?.removeChannel(channelSocket!!)
|
||||
@@ -996,19 +1072,31 @@ class StateSync {
|
||||
|
||||
fun stop() {
|
||||
_started = false
|
||||
_nsdManager?.unregisterService(_registrationListener)
|
||||
|
||||
try {
|
||||
_nsdManager?.stopServiceDiscovery(_discoveryListener)
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to stop discovery listener", e)
|
||||
}
|
||||
|
||||
try {
|
||||
_nsdManager?.unregisterService(_registrationListener)
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Failed to unregister service", e)
|
||||
}
|
||||
|
||||
_relaySession?.stop()
|
||||
_serverSocket?.close()
|
||||
_serverSocket = null
|
||||
|
||||
_thread?.interrupt()
|
||||
_thread = null
|
||||
_connectThread?.interrupt()
|
||||
_connectThread = null
|
||||
_threadRelay?.interrupt()
|
||||
_threadRelay = null
|
||||
synchronized(_sessions) {
|
||||
_sessions.values.forEach { it.close() }
|
||||
_sessions.clear()
|
||||
}
|
||||
|
||||
_relaySession?.stop()
|
||||
_thread = null
|
||||
_connectThread = null
|
||||
_threadRelay = null
|
||||
_relaySession = null
|
||||
}
|
||||
|
||||
@@ -1024,7 +1112,7 @@ class StateSync {
|
||||
runBlocking {
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[deviceInfo.publicKey] = onStatusUpdate
|
||||
_remotePendingStatusUpdate[deviceInfo.publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
relaySession.startRelayedChannel(deviceInfo.publicKey, APP_ID, deviceInfo.pairingCode)
|
||||
@@ -1043,7 +1131,7 @@ class StateSync {
|
||||
val session = createSocketSession(socket, false)
|
||||
if (onStatusUpdate != null) {
|
||||
synchronized(_remotePendingStatusUpdate) {
|
||||
_remotePendingStatusUpdate[publicKey] = onStatusUpdate
|
||||
_remotePendingStatusUpdate[publicKey.base64ToByteArray().toBase64()] = onStatusUpdate
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@ import com.futo.platformplayer.noise.protocol.CipherStatePair
|
||||
import com.futo.platformplayer.noise.protocol.DHState
|
||||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.ByteOrder
|
||||
import java.util.Base64
|
||||
import java.util.zip.GZIPOutputStream
|
||||
|
||||
interface IChannel : AutoCloseable {
|
||||
val remotePublicKey: String?
|
||||
@@ -15,7 +17,7 @@ interface IChannel : AutoCloseable {
|
||||
var authorizable: IAuthorizable?
|
||||
var syncSession: SyncSession?
|
||||
fun setDataHandler(onData: ((SyncSocketSession, IChannel, UByte, UByte, ByteBuffer) -> Unit)?)
|
||||
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null)
|
||||
fun send(opcode: UByte, subOpcode: UByte = 0u, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null)
|
||||
fun setCloseHandler(onClose: ((IChannel) -> Unit)?)
|
||||
val linkType: LinkType
|
||||
}
|
||||
@@ -49,9 +51,9 @@ class ChannelSocket(private val session: SyncSocketSession) : IChannel {
|
||||
onData?.invoke(session, this, opcode, subOpcode, data)
|
||||
}
|
||||
|
||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
|
||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, contentEncoding: ContentEncoding?) {
|
||||
if (data != null) {
|
||||
session.send(opcode, subOpcode, data)
|
||||
session.send(opcode, subOpcode, data, contentEncoding)
|
||||
} else {
|
||||
session.send(opcode, subOpcode)
|
||||
}
|
||||
@@ -183,51 +185,70 @@ class ChannelRelayed(
|
||||
}
|
||||
}
|
||||
|
||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?) {
|
||||
override fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer?, ce: ContentEncoding?) {
|
||||
throwIfDisposed()
|
||||
|
||||
val actualCount = data?.remaining() ?: 0
|
||||
var contentEncoding: ContentEncoding? = ce
|
||||
var processedData = data
|
||||
if (data != null && contentEncoding == ContentEncoding.Gzip) {
|
||||
val isGzipSupported = opcode == Opcode.DATA.value
|
||||
if (isGzipSupported) {
|
||||
val compressedStream = ByteArrayOutputStream()
|
||||
GZIPOutputStream(compressedStream).use { gzipStream ->
|
||||
gzipStream.write(data.array(), data.position(), data.remaining())
|
||||
gzipStream.finish()
|
||||
}
|
||||
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
|
||||
} else {
|
||||
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
|
||||
contentEncoding = ContentEncoding.Raw
|
||||
}
|
||||
}
|
||||
|
||||
val ENCRYPTION_OVERHEAD = 16
|
||||
val CONNECTION_ID_SIZE = 8
|
||||
val HEADER_SIZE = 6
|
||||
val HEADER_SIZE = 7
|
||||
val MAX_DATA_PER_PACKET = SyncSocketSession.MAXIMUM_PACKET_SIZE - HEADER_SIZE - CONNECTION_ID_SIZE - ENCRYPTION_OVERHEAD - 16
|
||||
|
||||
if (actualCount > MAX_DATA_PER_PACKET && data != null) {
|
||||
Logger.v(TAG, "Send (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.size: ${processedData?.remaining()})")
|
||||
|
||||
if (processedData != null && processedData.remaining() > MAX_DATA_PER_PACKET) {
|
||||
val streamId = session.generateStreamId()
|
||||
val totalSize = actualCount
|
||||
var sendOffset = 0
|
||||
|
||||
while (sendOffset < totalSize) {
|
||||
val bytesRemaining = totalSize - sendOffset
|
||||
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - 2, bytesRemaining)
|
||||
while (sendOffset < processedData.remaining()) {
|
||||
val bytesRemaining = processedData.remaining() - sendOffset
|
||||
val bytesToSend = minOf(MAX_DATA_PER_PACKET - 8 - HEADER_SIZE + 4, bytesRemaining)
|
||||
|
||||
val streamData: ByteArray
|
||||
val streamOpcode: StreamOpcode
|
||||
if (sendOffset == 0) {
|
||||
streamOpcode = StreamOpcode.START
|
||||
streamData = ByteArray(4 + 4 + 1 + 1 + bytesToSend)
|
||||
streamData = ByteArray(4 + HEADER_SIZE + bytesToSend)
|
||||
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(streamId)
|
||||
putInt(totalSize)
|
||||
putInt(processedData.remaining())
|
||||
put(opcode.toByte())
|
||||
put(subOpcode.toByte())
|
||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
||||
put(contentEncoding?.value?.toByte() ?: 0.toByte())
|
||||
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||
}
|
||||
} else {
|
||||
streamData = ByteArray(4 + 4 + bytesToSend)
|
||||
ByteBuffer.wrap(streamData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(streamId)
|
||||
putInt(sendOffset)
|
||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
||||
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||
}
|
||||
streamOpcode = if (bytesToSend < bytesRemaining) StreamOpcode.DATA else StreamOpcode.END
|
||||
}
|
||||
|
||||
val fullPacket = ByteArray(HEADER_SIZE + streamData.size)
|
||||
ByteBuffer.wrap(fullPacket).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(streamData.size + 2)
|
||||
putInt(streamData.size + HEADER_SIZE - 4)
|
||||
put(Opcode.STREAM.value.toByte())
|
||||
put(streamOpcode.value.toByte())
|
||||
put(ContentEncoding.Raw.value.toByte())
|
||||
put(streamData)
|
||||
}
|
||||
|
||||
@@ -235,12 +256,13 @@ class ChannelRelayed(
|
||||
sendOffset += bytesToSend
|
||||
}
|
||||
} else {
|
||||
val packet = ByteArray(HEADER_SIZE + actualCount)
|
||||
val packet = ByteArray(HEADER_SIZE + (processedData?.remaining() ?: 0))
|
||||
ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(actualCount + 2)
|
||||
putInt((processedData?.remaining() ?: 0) + HEADER_SIZE - 4)
|
||||
put(opcode.toByte())
|
||||
put(subOpcode.toByte())
|
||||
if (actualCount > 0 && data != null) put(data.array(), data.position(), actualCount)
|
||||
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||
if (processedData != null && processedData.remaining() > 0) put(processedData.array(), processedData.position(), processedData.remaining())
|
||||
}
|
||||
sendPacket(packet)
|
||||
}
|
||||
@@ -333,4 +355,8 @@ class ChannelRelayed(
|
||||
completeHandshake(remoteVersion, transport)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val TAG = "Channel"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.futo.platformplayer.sync.internal
|
||||
|
||||
enum class ContentEncoding(val value: UByte) {
|
||||
Raw(0u),
|
||||
Gzip(1u)
|
||||
}
|
||||
@@ -129,9 +129,9 @@ class SyncSession : IAuthorizable {
|
||||
|
||||
fun close() {
|
||||
synchronized(_channels) {
|
||||
_channels.forEach { it.close() }
|
||||
_channels.clear()
|
||||
}
|
||||
_channels.toTypedArray()
|
||||
}.forEach { it.close() }
|
||||
|
||||
_onClose(this)
|
||||
}
|
||||
|
||||
@@ -196,14 +196,14 @@ class SyncSession : IAuthorizable {
|
||||
}
|
||||
|
||||
fun sendData(subOpcode: UByte, data: String) {
|
||||
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
|
||||
send(Opcode.DATA.value, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
|
||||
}
|
||||
|
||||
fun send(opcode: UByte, subOpcode: UByte, data: String) {
|
||||
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)))
|
||||
send(opcode, subOpcode, ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)), ContentEncoding.Gzip)
|
||||
}
|
||||
|
||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null) {
|
||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer? = null, contentEncoding: ContentEncoding? = null) {
|
||||
val channels = synchronized(_channels) { _channels.sortedBy { it.linkType.ordinal }.toList() }
|
||||
if (channels.isEmpty()) {
|
||||
//TODO: Should this throw?
|
||||
@@ -214,11 +214,13 @@ class SyncSession : IAuthorizable {
|
||||
var sent = false
|
||||
for (channel in channels) {
|
||||
try {
|
||||
channel.send(opcode, subOpcode, data)
|
||||
channel.send(opcode, subOpcode, data, contentEncoding)
|
||||
sent = true
|
||||
break
|
||||
} catch (e: Throwable) {
|
||||
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode)", e)
|
||||
Logger.w(TAG, "Packet failed to send (opcode = $opcode, subOpcode = $subOpcode), closing channel", e)
|
||||
channel.close()
|
||||
removeChannel(channel)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,32 +3,46 @@ package com.futo.platformplayer.sync.internal
|
||||
import android.os.Build
|
||||
import com.futo.platformplayer.LittleEndianDataInputStream
|
||||
import com.futo.platformplayer.LittleEndianDataOutputStream
|
||||
import com.futo.platformplayer.copyToOutputStream
|
||||
import com.futo.platformplayer.ensureNotMainThread
|
||||
import com.futo.platformplayer.logging.Logger
|
||||
import com.futo.platformplayer.noise.protocol.CipherStatePair
|
||||
import com.futo.platformplayer.noise.protocol.DHState
|
||||
import com.futo.platformplayer.noise.protocol.HandshakeState
|
||||
import com.futo.platformplayer.states.StateSync
|
||||
import com.futo.platformplayer.sync.internal.ChannelRelayed.Companion
|
||||
import com.futo.polycentric.core.base64ToByteArray
|
||||
import com.futo.polycentric.core.toBase64
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
import java.net.Inet4Address
|
||||
import java.net.Inet6Address
|
||||
import java.net.InetAddress
|
||||
import java.net.NetworkInterface
|
||||
import java.net.Socket
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.ByteOrder
|
||||
import java.util.Base64
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.zip.GZIPInputStream
|
||||
import java.util.zip.GZIPOutputStream
|
||||
import kotlin.math.min
|
||||
import kotlin.system.measureTimeMillis
|
||||
import kotlin.time.measureTime
|
||||
|
||||
class SyncSocketSession {
|
||||
private val _inputStream: LittleEndianDataInputStream
|
||||
private val _outputStream: LittleEndianDataOutputStream
|
||||
private val _socket: Socket
|
||||
private val _inputStream: InputStream
|
||||
private val _outputStream: OutputStream
|
||||
private val _sendLockObject = Object()
|
||||
private val _buffer = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
|
||||
private val _bufferDecrypted = ByteArray(MAXIMUM_PACKET_SIZE)
|
||||
private val _sendBuffer = ByteArray(MAXIMUM_PACKET_SIZE)
|
||||
private val _sendBufferEncrypted = ByteArray(MAXIMUM_PACKET_SIZE_ENCRYPTED)
|
||||
private val _sendBufferEncrypted = ByteArray(4 + MAXIMUM_PACKET_SIZE_ENCRYPTED)
|
||||
private val _syncStreams = hashMapOf<Int, SyncStream>()
|
||||
private var _streamIdGenerator = 0
|
||||
private val _streamIdGeneratorLock = Object()
|
||||
@@ -81,8 +95,7 @@ class SyncSocketSession {
|
||||
constructor(
|
||||
remoteAddress: String,
|
||||
localKeyPair: DHState,
|
||||
inputStream: LittleEndianDataInputStream,
|
||||
outputStream: LittleEndianDataOutputStream,
|
||||
socket: Socket,
|
||||
onClose: ((session: SyncSocketSession) -> Unit)? = null,
|
||||
onHandshakeComplete: ((session: SyncSocketSession) -> Unit)? = null,
|
||||
onData: ((session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit)? = null,
|
||||
@@ -90,8 +103,12 @@ class SyncSocketSession {
|
||||
onChannelEstablished: ((session: SyncSocketSession, channel: ChannelRelayed, isResponder: Boolean) -> Unit)? = null,
|
||||
isHandshakeAllowed: ((linkType: LinkType, session: SyncSocketSession, remotePublicKey: String, pairingCode: String?, appId: UInt) -> Boolean)? = null
|
||||
) {
|
||||
_inputStream = inputStream
|
||||
_outputStream = outputStream
|
||||
_socket = socket
|
||||
_socket.receiveBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
|
||||
_socket.sendBufferSize = MAXIMUM_PACKET_SIZE_ENCRYPTED
|
||||
_socket.tcpNoDelay = true
|
||||
_inputStream = _socket.getInputStream()
|
||||
_outputStream = _socket.getOutputStream()
|
||||
_onClose = onClose
|
||||
_onHandshakeComplete = onHandshakeComplete
|
||||
_localKeyPair = localKeyPair
|
||||
@@ -150,30 +167,45 @@ class SyncSocketSession {
|
||||
}.apply { start() }
|
||||
}
|
||||
|
||||
private fun readExact(buffer: ByteArray, offset: Int, size: Int) {
|
||||
var totalBytesReceived: Int = 0
|
||||
while (totalBytesReceived < size) {
|
||||
val bytesReceived = _inputStream.read(buffer, offset + totalBytesReceived, size - totalBytesReceived)
|
||||
if (bytesReceived <= 0)
|
||||
throw Exception("Socket disconnected")
|
||||
totalBytesReceived += bytesReceived
|
||||
}
|
||||
}
|
||||
|
||||
private fun receiveLoop() {
|
||||
while (_started) {
|
||||
try {
|
||||
val messageSize = _inputStream.readInt()
|
||||
//Logger.v(TAG, "Waiting for message size...")
|
||||
|
||||
readExact(_buffer, 0, 4)
|
||||
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
|
||||
|
||||
//Logger.v(TAG, "Read message size ${messageSize}.")
|
||||
|
||||
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
|
||||
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
|
||||
}
|
||||
|
||||
//Logger.i(TAG, "Receiving message (size = ${messageSize})")
|
||||
|
||||
var bytesRead = 0
|
||||
while (bytesRead < messageSize) {
|
||||
val read = _inputStream.read(_buffer, bytesRead, messageSize - bytesRead)
|
||||
if (read == -1)
|
||||
throw Exception("Stream closed")
|
||||
bytesRead += read
|
||||
}
|
||||
readExact(_buffer, 0, messageSize)
|
||||
//Logger.v(TAG, "Read ${messageSize}.")
|
||||
|
||||
//Logger.v(TAG, "Decrypting ${messageSize} bytes.")
|
||||
val plen: Int = _cipherStatePair!!.receiver.decryptWithAd(null, _buffer, 0, _bufferDecrypted, 0, messageSize)
|
||||
//Logger.i(TAG, "Decrypted message (size = ${plen})")
|
||||
|
||||
//Logger.v(TAG, "Decrypted ${messageSize} bytes.")
|
||||
handleData(_bufferDecrypted, plen, null)
|
||||
//Logger.v(TAG, "Handled data ${messageSize} bytes.")
|
||||
} catch (e: Throwable) {
|
||||
Logger.e(TAG, "Exception while receiving data", e)
|
||||
Logger.e(TAG, "Exception while receiving data, closing socket session", e)
|
||||
stop()
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -203,8 +235,7 @@ class SyncSocketSession {
|
||||
_channels.values.forEach { it.close() }
|
||||
_channels.clear()
|
||||
_onClose?.invoke(this)
|
||||
_inputStream.close()
|
||||
_outputStream.close()
|
||||
_socket.close()
|
||||
_thread = null
|
||||
_cipherStatePair?.sender?.destroy()
|
||||
_cipherStatePair?.receiver?.destroy()
|
||||
@@ -237,25 +268,32 @@ class SyncSocketSession {
|
||||
val mainBuffer = ByteArray(512)
|
||||
val mainLength = initiator.writeMessage(mainBuffer, 0, null, 0, 0)
|
||||
|
||||
val messageData = ByteBuffer.allocate(4 + 4 + pairingMessageLength + mainLength).order(ByteOrder.LITTLE_ENDIAN)
|
||||
val messageSize = 4 + 4 + pairingMessageLength + mainLength
|
||||
val messageData = ByteBuffer.allocate(4 + messageSize).order(ByteOrder.LITTLE_ENDIAN)
|
||||
messageData.putInt(messageSize)
|
||||
messageData.putInt(appId.toInt())
|
||||
messageData.putInt(pairingMessageLength)
|
||||
if (pairingMessageLength > 0) messageData.put(pairingMessage)
|
||||
messageData.put(mainBuffer, 0, mainLength)
|
||||
val messageDataArray = messageData.array()
|
||||
_outputStream.writeInt(messageDataArray.size)
|
||||
_outputStream.write(messageDataArray)
|
||||
_outputStream.write(messageDataArray, 0, 4 + messageSize)
|
||||
|
||||
readExact(_buffer, 0, 4)
|
||||
val responseSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
|
||||
if (responseSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
|
||||
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
|
||||
}
|
||||
|
||||
val responseSize = _inputStream.readInt()
|
||||
val responseMessage = ByteArray(responseSize)
|
||||
_inputStream.readFully(responseMessage)
|
||||
readExact(responseMessage, 0, responseSize)
|
||||
|
||||
val plaintext = ByteArray(512) // Buffer for any payload (none expected here)
|
||||
initiator.readMessage(responseMessage, 0, responseSize, plaintext, 0)
|
||||
|
||||
_cipherStatePair = initiator.split()
|
||||
val remoteKeyBytes = ByteArray(initiator.remotePublicKey.publicKeyLength)
|
||||
initiator.remotePublicKey.getPublicKey(remoteKeyBytes, 0)
|
||||
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes)
|
||||
_remotePublicKey = Base64.getEncoder().encodeToString(remoteKeyBytes).base64ToByteArray().toBase64()
|
||||
}
|
||||
|
||||
private fun handshakeAsResponder(): Boolean {
|
||||
@@ -265,11 +303,16 @@ class SyncSocketSession {
|
||||
responder.localKeyPair.copyFrom(_localKeyPair)
|
||||
responder.start()
|
||||
|
||||
val messageSize = _inputStream.readInt()
|
||||
val message = ByteArray(messageSize)
|
||||
_inputStream.readFully(message)
|
||||
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
|
||||
readExact(_buffer, 0, 4)
|
||||
val messageSize = ByteBuffer.wrap(_buffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
|
||||
if (messageSize > MAXIMUM_PACKET_SIZE_ENCRYPTED) {
|
||||
throw Exception("Message size (${messageSize}) cannot exceed MAXIMUM_PACKET_SIZE ($MAXIMUM_PACKET_SIZE_ENCRYPTED)")
|
||||
}
|
||||
|
||||
val message = ByteArray(messageSize)
|
||||
readExact(message, 0, messageSize)
|
||||
|
||||
val messageBuffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN)
|
||||
val appId = messageBuffer.int.toUInt()
|
||||
val pairingMessageLength = messageBuffer.int
|
||||
val pairingMessage = if (pairingMessageLength > 0) ByteArray(pairingMessageLength).also { messageBuffer.get(it) } else byteArrayOf()
|
||||
@@ -298,21 +341,26 @@ class SyncSocketSession {
|
||||
return false
|
||||
}
|
||||
|
||||
val responseBuffer = ByteArray(512)
|
||||
val responseLength = responder.writeMessage(responseBuffer, 0, null, 0, 0)
|
||||
_outputStream.writeInt(responseLength)
|
||||
_outputStream.write(responseBuffer, 0, responseLength)
|
||||
val responseBuffer = ByteArray(4 + 512)
|
||||
val responseLength = responder.writeMessage(responseBuffer, 4, null, 0, 0)
|
||||
ByteBuffer.wrap(responseBuffer).order(ByteOrder.LITTLE_ENDIAN).putInt(responseLength)
|
||||
_outputStream.write(responseBuffer, 0, 4 + responseLength)
|
||||
|
||||
_cipherStatePair = responder.split()
|
||||
_remotePublicKey = remotePublicKey
|
||||
_remotePublicKey = remotePublicKey.base64ToByteArray().toBase64()
|
||||
return true
|
||||
}
|
||||
|
||||
private fun performVersionCheck() {
|
||||
val CURRENT_VERSION = 4
|
||||
val MINIMUM_VERSION = 4
|
||||
_outputStream.writeInt(CURRENT_VERSION)
|
||||
remoteVersion = _inputStream.readInt()
|
||||
|
||||
val versionBytes = ByteArray(4)
|
||||
ByteBuffer.wrap(versionBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(CURRENT_VERSION)
|
||||
_outputStream.write(versionBytes, 0, 4)
|
||||
|
||||
readExact(versionBytes, 0, 4)
|
||||
remoteVersion = ByteBuffer.wrap(versionBytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int
|
||||
Logger.i(TAG, "performVersionCheck (version = $remoteVersion)")
|
||||
if (remoteVersion < MINIMUM_VERSION)
|
||||
throw Exception("Invalid version")
|
||||
@@ -321,25 +369,44 @@ class SyncSocketSession {
|
||||
fun generateStreamId(): Int = synchronized(_streamIdGeneratorLock) { _streamIdGenerator++ }
|
||||
private fun generateRequestId(): Int = synchronized(_requestIdGeneratorLock) { _requestIdGenerator++ }
|
||||
|
||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) {
|
||||
fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer, ce: ContentEncoding? = null) {
|
||||
ensureNotMainThread()
|
||||
|
||||
if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
|
||||
Logger.v(TAG, "send (opcode: ${opcode}, subOpcode: ${subOpcode}, data.remaining(): ${data.remaining()})")
|
||||
|
||||
var contentEncoding: ContentEncoding? = ce
|
||||
var processedData = data
|
||||
if (contentEncoding == ContentEncoding.Gzip) {
|
||||
val isGzipSupported = opcode == Opcode.DATA.value
|
||||
if (isGzipSupported) {
|
||||
val compressedStream = ByteArrayOutputStream()
|
||||
GZIPOutputStream(compressedStream).use { gzipStream ->
|
||||
gzipStream.write(data.array(), data.position(), data.remaining())
|
||||
gzipStream.finish()
|
||||
}
|
||||
processedData = ByteBuffer.wrap(compressedStream.toByteArray())
|
||||
} else {
|
||||
Logger.w(TAG, "Gzip requested but not supported on this (opcode = ${opcode}, subOpcode = ${subOpcode}), falling back.")
|
||||
contentEncoding = ContentEncoding.Raw
|
||||
}
|
||||
}
|
||||
|
||||
if (processedData.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) {
|
||||
val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE
|
||||
val segmentData = ByteArray(segmentSize)
|
||||
var sendOffset = 0
|
||||
val id = generateStreamId()
|
||||
|
||||
while (sendOffset < data.remaining()) {
|
||||
val bytesRemaining = data.remaining() - sendOffset
|
||||
while (sendOffset < processedData.remaining()) {
|
||||
val bytesRemaining = processedData.remaining() - sendOffset
|
||||
var bytesToSend: Int
|
||||
var segmentPacketSize: Int
|
||||
val streamOp: StreamOpcode
|
||||
|
||||
if (sendOffset == 0) {
|
||||
streamOp = StreamOpcode.START
|
||||
bytesToSend = segmentSize - 4 - 4 - 1 - 1
|
||||
segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1
|
||||
bytesToSend = segmentSize - 4 - HEADER_SIZE
|
||||
segmentPacketSize = bytesToSend + 4 + HEADER_SIZE
|
||||
} else {
|
||||
bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining)
|
||||
streamOp = if (bytesToSend >= bytesRemaining) StreamOpcode.END else StreamOpcode.DATA
|
||||
@@ -348,12 +415,13 @@ class SyncSocketSession {
|
||||
|
||||
ByteBuffer.wrap(segmentData).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(id)
|
||||
putInt(if (streamOp == StreamOpcode.START) data.remaining() else sendOffset)
|
||||
putInt(if (streamOp == StreamOpcode.START) processedData.remaining() else sendOffset)
|
||||
if (streamOp == StreamOpcode.START) {
|
||||
put(opcode.toByte())
|
||||
put(subOpcode.toByte())
|
||||
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||
}
|
||||
put(data.array(), data.position() + sendOffset, bytesToSend)
|
||||
put(processedData.array(), processedData.position() + sendOffset, bytesToSend)
|
||||
}
|
||||
|
||||
send(Opcode.STREAM.value, streamOp.value, ByteBuffer.wrap(segmentData, 0, segmentPacketSize))
|
||||
@@ -362,17 +430,19 @@ class SyncSocketSession {
|
||||
} else {
|
||||
synchronized(_sendLockObject) {
|
||||
ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply {
|
||||
putInt(data.remaining() + 2)
|
||||
putInt(processedData.remaining() + HEADER_SIZE - 4)
|
||||
put(opcode.toByte())
|
||||
put(subOpcode.toByte())
|
||||
put(data.array(), data.position(), data.remaining())
|
||||
put(contentEncoding?.value?.toByte() ?: ContentEncoding.Raw.value.toByte())
|
||||
put(processedData.array(), processedData.position(), processedData.remaining())
|
||||
}
|
||||
|
||||
//Logger.i(TAG, "Encrypting message (size = ${data.size + HEADER_SIZE})")
|
||||
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, data.remaining() + HEADER_SIZE)
|
||||
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
|
||||
_outputStream.writeInt(len)
|
||||
_outputStream.write(_sendBufferEncrypted, 0, len)
|
||||
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, processedData.remaining() + HEADER_SIZE)
|
||||
val sendDuration = measureTimeMillis {
|
||||
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
|
||||
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
|
||||
}
|
||||
Logger.v(TAG, "_outputStream.write (opcode: ${opcode}, subOpcode: ${subOpcode}, processedData.remaining(): ${processedData.remaining()}, sendDuration: ${sendDuration})")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -382,17 +452,18 @@ class SyncSocketSession {
|
||||
ensureNotMainThread()
|
||||
|
||||
synchronized(_sendLockObject) {
|
||||
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2)
|
||||
ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(HEADER_SIZE - 4)
|
||||
_sendBuffer.asUByteArray()[4] = opcode
|
||||
_sendBuffer.asUByteArray()[5] = subOpcode
|
||||
_sendBuffer.asUByteArray()[6] = ContentEncoding.Raw.value
|
||||
|
||||
//Logger.i(TAG, "Encrypting message (opcode = ${opcode}, subOpcode = ${subOpcode}, size = ${HEADER_SIZE})")
|
||||
|
||||
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 0, HEADER_SIZE)
|
||||
val len = _cipherStatePair!!.sender.encryptWithAd(null, _sendBuffer, 0, _sendBufferEncrypted, 4, HEADER_SIZE)
|
||||
//Logger.i(TAG, "Sending encrypted message (size = ${len})")
|
||||
|
||||
_outputStream.writeInt(len)
|
||||
_outputStream.write(_sendBufferEncrypted, 0, len)
|
||||
ByteBuffer.wrap(_sendBufferEncrypted, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(len)
|
||||
_outputStream.write(_sendBufferEncrypted, 0, 4 + len)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -403,7 +474,7 @@ class SyncSocketSession {
|
||||
private fun handleData(data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
||||
val length = data.remaining()
|
||||
if (length < HEADER_SIZE)
|
||||
throw Exception("Packet must be at least 6 bytes (header size)")
|
||||
throw Exception("Packet must be at least ${HEADER_SIZE} bytes (header size)")
|
||||
|
||||
val size = data.int
|
||||
if (size != length - 4)
|
||||
@@ -411,7 +482,10 @@ class SyncSocketSession {
|
||||
|
||||
val opcode = data.get().toUByte()
|
||||
val subOpcode = data.get().toUByte()
|
||||
handlePacket(opcode, subOpcode, data, sourceChannel)
|
||||
val contentEncoding = data.get().toUByte()
|
||||
|
||||
//Logger.v(TAG, "handleData (opcode: ${opcode}, subOpcode: ${subOpcode}, data.size: ${data.remaining()}, sourceChannel.connectionId: ${sourceChannel?.connectionId})")
|
||||
handlePacket(opcode, subOpcode, data, contentEncoding, sourceChannel)
|
||||
}
|
||||
|
||||
private fun handleRequest(subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
||||
@@ -759,9 +833,27 @@ class SyncSocketSession {
|
||||
}
|
||||
}
|
||||
|
||||
private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer, sourceChannel: ChannelRelayed?) {
|
||||
private fun handlePacket(opcode: UByte, subOpcode: UByte, d: ByteBuffer, contentEncoding: UByte, sourceChannel: ChannelRelayed?) {
|
||||
Logger.i(TAG, "Handle packet (opcode = ${opcode}, subOpcode = ${subOpcode})")
|
||||
|
||||
var data = d
|
||||
if (contentEncoding == ContentEncoding.Gzip.value) {
|
||||
val isGzipSupported = opcode == Opcode.DATA.value
|
||||
if (!isGzipSupported)
|
||||
throw Exception("Failed to handle packet, gzip is not supported for this opcode (opcode = ${opcode}, subOpcode = ${subOpcode}, data.length = ${data.remaining()}).")
|
||||
|
||||
val compressedStream = ByteArrayInputStream(data.array(), data.position(), data.remaining())
|
||||
val outputStream = ByteArrayOutputStream()
|
||||
GZIPInputStream(compressedStream).use { gzipStream ->
|
||||
val buffer = ByteArray(8192) // 8KB buffer
|
||||
var bytesRead: Int
|
||||
while (gzipStream.read(buffer).also { bytesRead = it } != -1) {
|
||||
outputStream.write(buffer, 0, bytesRead)
|
||||
}
|
||||
}
|
||||
data = ByteBuffer.wrap(outputStream.toByteArray())
|
||||
}
|
||||
|
||||
when (opcode) {
|
||||
Opcode.PING.value -> {
|
||||
if (sourceChannel != null)
|
||||
@@ -799,8 +891,9 @@ class SyncSocketSession {
|
||||
val expectedSize = data.int
|
||||
val op = data.get().toUByte()
|
||||
val subOp = data.get().toUByte()
|
||||
val ce = data.get().toUByte()
|
||||
|
||||
val syncStream = SyncStream(expectedSize, op, subOp)
|
||||
val syncStream = SyncStream(expectedSize, op, subOp, ce)
|
||||
if (data.remaining() > 0) {
|
||||
syncStream.add(data.array(), data.position(), data.remaining())
|
||||
}
|
||||
@@ -845,7 +938,7 @@ class SyncSocketSession {
|
||||
throw Exception("After sync stream end, the stream must be complete")
|
||||
}
|
||||
|
||||
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, sourceChannel)
|
||||
handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }, syncStream.contentEncoding, sourceChannel)
|
||||
}
|
||||
}
|
||||
Opcode.DATA.value -> {
|
||||
@@ -1025,7 +1118,7 @@ class SyncSocketSession {
|
||||
send(Opcode.NOTIFY.value, NotifyOpcode.CONNECTION_INFO.value, publishBytes)
|
||||
}
|
||||
|
||||
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray): Boolean {
|
||||
suspend fun publishRecords(consumerPublicKeys: List<String>, key: String, data: ByteArray, contentEncoding: ContentEncoding? = null): Boolean {
|
||||
val keyBytes = key.toByteArray(Charsets.UTF_8)
|
||||
if (key.isEmpty() || keyBytes.size > 32) throw IllegalArgumentException("Key must be 1-32 bytes")
|
||||
if (consumerPublicKeys.isEmpty()) throw IllegalArgumentException("At least one consumer required")
|
||||
@@ -1080,7 +1173,7 @@ class SyncSocketSession {
|
||||
}
|
||||
}
|
||||
packet.rewind()
|
||||
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet)
|
||||
send(Opcode.REQUEST.value, RequestOpcode.BULK_PUBLISH_RECORD.value, packet, ce = contentEncoding)
|
||||
} catch (e: Exception) {
|
||||
_pendingPublishRequests.remove(requestId)?.completeExceptionally(e)
|
||||
throw e
|
||||
@@ -1200,6 +1293,6 @@ class SyncSocketSession {
|
||||
private const val TAG = "SyncSocketSession"
|
||||
const val MAXIMUM_PACKET_SIZE = 65535 - 16
|
||||
const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16
|
||||
const val HEADER_SIZE = 6
|
||||
const val HEADER_SIZE = 7
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.futo.platformplayer.sync.internal
|
||||
|
||||
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) {
|
||||
class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte, val contentEncoding: UByte) {
|
||||
companion object {
|
||||
const val MAXIMUM_SIZE = 10_000_000
|
||||
}
|
||||
|
||||
@@ -45,6 +45,10 @@ open class ChannelView : LinearLayout {
|
||||
_buttonSubscribe = findViewById(R.id.button_subscribe);
|
||||
_platformIndicator = findViewById(R.id.platform_indicator);
|
||||
|
||||
//_textName.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
|
||||
//_creatorThumbnail.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
|
||||
//_textMetadata.setOnClickListener { currentChannel?.let { onClick.emit(it) }; }
|
||||
|
||||
if (_tiny) {
|
||||
_buttonSubscribe.visibility = View.GONE;
|
||||
_textMetadata.visibility = View.GONE;
|
||||
@@ -66,8 +70,11 @@ open class ChannelView : LinearLayout {
|
||||
open fun bind(content: IPlatformContent) {
|
||||
isClickable = true;
|
||||
|
||||
if(content !is IPlatformChannelContent)
|
||||
return
|
||||
if(content !is IPlatformChannelContent) {
|
||||
currentChannel = null;
|
||||
return;
|
||||
}
|
||||
currentChannel = content;
|
||||
|
||||
_creatorThumbnail.setThumbnail(content.thumbnail, false);
|
||||
_textName.text = content.name;
|
||||
|
||||
@@ -18,6 +18,7 @@ import com.futo.platformplayer.casting.StateCasting
|
||||
import com.futo.platformplayer.constructs.Event1
|
||||
import com.futo.platformplayer.constructs.Event2
|
||||
import androidx.core.view.isVisible
|
||||
import com.futo.platformplayer.UIDialogs
|
||||
|
||||
class DeviceViewHolder : ViewHolder {
|
||||
private val _layoutDevice: FrameLayout;
|
||||
@@ -55,9 +56,17 @@ class DeviceViewHolder : ViewHolder {
|
||||
|
||||
val connect = {
|
||||
device?.let { dev ->
|
||||
StateCasting.instance.activeDevice?.stopCasting();
|
||||
StateCasting.instance.connectDevice(dev);
|
||||
onConnect.emit(dev);
|
||||
if (dev.isReady) {
|
||||
StateCasting.instance.activeDevice?.stopCasting()
|
||||
StateCasting.instance.connectDevice(dev)
|
||||
onConnect.emit(dev)
|
||||
} else {
|
||||
try {
|
||||
view.context?.let { UIDialogs.toast(it, "Device not ready, may be offline") }
|
||||
} catch (e: Throwable) {
|
||||
//Ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +93,7 @@ class DeviceViewHolder : ViewHolder {
|
||||
}
|
||||
|
||||
_textName.text = d.name;
|
||||
_imageOnline.visibility = if (isOnlineDevice) View.VISIBLE else View.GONE
|
||||
_imageOnline.visibility = if (isOnlineDevice && d.isReady) View.VISIBLE else View.GONE
|
||||
|
||||
if (!d.isReady) {
|
||||
_imageLoader.visibility = View.GONE;
|
||||
|
||||
@@ -37,9 +37,10 @@ class SubscriptionAdapter : RecyclerView.Adapter<SubscriptionViewHolder> {
|
||||
_onDatasetChanged = onDatasetChanged;
|
||||
|
||||
StateSubscriptions.instance.onSubscriptionsChanged.subscribe { _, _ -> if(Looper.myLooper() != Looper.getMainLooper())
|
||||
StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { updateDataset() }
|
||||
StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) { updateDataset() }
|
||||
else
|
||||
updateDataset(); }
|
||||
updateDataset();
|
||||
}
|
||||
updateDataset();
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,17 @@ class RadioGroupView : FlexboxLayout {
|
||||
|
||||
val selectedOptions = arrayListOf<Any?>();
|
||||
val onSelectedChange = Event1<List<Any?>>();
|
||||
constructor(context: Context) : super(context) {
|
||||
flexWrap = FlexWrap.WRAP;
|
||||
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
|
||||
|
||||
if (isInEditMode) {
|
||||
setOptions(listOf("Example 1" to 1, "Example 2" to 2, "Example 3" to 3, "Example 4" to 4, "Example 5" to 5), listOf("Example 1", "Example 2"),
|
||||
multiSelect = true,
|
||||
atLeastOne = false
|
||||
);
|
||||
}
|
||||
}
|
||||
constructor(context: Context, attrs: AttributeSet) : super(context, attrs) {
|
||||
flexWrap = FlexWrap.WRAP;
|
||||
_padding_px = TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, _padding_dp, context.resources.displayMetrics).toInt();
|
||||
|
||||
@@ -57,15 +57,15 @@
|
||||
|
||||
<ImageView
|
||||
android:id="@+id/image_clear"
|
||||
android:layout_width="16dp"
|
||||
android:layout_height="16dp"
|
||||
android:layout_width="36dp"
|
||||
android:layout_height="36dp"
|
||||
app:srcCompat="@drawable/ic_clear_16dp"
|
||||
app:layout_constraintTop_toTopOf="parent"
|
||||
app:layout_constraintRight_toRightOf="parent"
|
||||
app:layout_constraintBottom_toBottomOf="parent"
|
||||
android:layout_marginEnd="6dp"
|
||||
android:layout_marginStart="6dp"
|
||||
android:padding="2dp" />
|
||||
android:padding="12dp" />
|
||||
|
||||
<TextView
|
||||
android:id="@+id/text_name"
|
||||
|
||||
@@ -382,6 +382,10 @@
|
||||
<string name="pair_through_relay_description">Allow devices to be paired through the relay</string>
|
||||
<string name="connect_through_relay">Connection through relay</string>
|
||||
<string name="connect_through_relay_description">Allow devices to be connected to through the relay</string>
|
||||
<string name="connect_local_direct_through_relay">Connect direct through relay</string>
|
||||
<string name="connect_local_direct_through_relay_description">Allow devices to be directly locally connected to through information discovered from the relay</string>
|
||||
<string name="local_connections">Local connections</string>
|
||||
<string name="local_connections_description">Allow device to be directly locally connected</string>
|
||||
<string name="gesture_controls">Gesture controls</string>
|
||||
<string name="volume_slider">Volume slider</string>
|
||||
<string name="volume_slider_descr">Enable slide gesture to change volume</string>
|
||||
|
||||
Reference in New Issue
Block a user