Improve local sync

This commit is contained in:
Ash 2022-03-23 18:50:13 +08:00
parent b2fe0674c8
commit ada579377b
2 changed files with 131 additions and 117 deletions

View File

@ -7,11 +7,13 @@ import androidx.paging.PagingSource
import androidx.work.* import androidx.work.*
import dagger.assisted.Assisted import dagger.assisted.Assisted
import dagger.assisted.AssistedInject import dagger.assisted.AssistedInject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update import kotlinx.coroutines.flow.update
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.withContext
import me.ash.reader.DataStoreKeys import me.ash.reader.DataStoreKeys
import me.ash.reader.data.account.AccountDao import me.ash.reader.data.account.AccountDao
import me.ash.reader.data.article.Article import me.ash.reader.data.article.Article
@ -109,16 +111,17 @@ abstract class AbstractRssRepository constructor(
} }
} }
fun pullImportant( suspend fun pullImportant(
isStarred: Boolean = false, isStarred: Boolean = false,
isUnread: Boolean = false, isUnread: Boolean = false,
): Flow<List<ImportantCount>> { ): Flow<List<ImportantCount>> {
return withContext(Dispatchers.IO) {
val accountId = context.dataStore.get(DataStoreKeys.CurrentAccountId)!! val accountId = context.dataStore.get(DataStoreKeys.CurrentAccountId)!!
Log.i( Log.i(
"RLog", "RLog",
"pullImportant: accountId: ${accountId}, isStarred: ${isStarred}, isUnread: ${isUnread}" "pullImportant: accountId: ${accountId}, isStarred: ${isStarred}, isUnread: ${isUnread}"
) )
return when { when {
isStarred -> articleDao isStarred -> articleDao
.queryImportantCountWhenIsStarred(accountId, isStarred) .queryImportantCountWhenIsStarred(accountId, isStarred)
isUnread -> articleDao isUnread -> articleDao
@ -126,6 +129,7 @@ abstract class AbstractRssRepository constructor(
else -> articleDao.queryImportantCountWhenIsAll(accountId) else -> articleDao.queryImportantCountWhenIsAll(accountId)
} }
} }
}
suspend fun findArticleById(id: String): ArticleWithFeed? { suspend fun findArticleById(id: String): ArticleWithFeed? {
return articleDao.queryById(id) return articleDao.queryById(id)

View File

@ -12,8 +12,11 @@ import androidx.core.app.NotificationCompat
import androidx.core.content.ContextCompat.getSystemService import androidx.core.content.ContextCompat.getSystemService
import androidx.work.WorkManager import androidx.work.WorkManager
import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.flow.* import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import me.ash.reader.* import me.ash.reader.*
import me.ash.reader.data.account.AccountDao import me.ash.reader.data.account.AccountDao
import me.ash.reader.data.article.Article import me.ash.reader.data.article.Article
@ -42,6 +45,22 @@ class LocalRssRepository @Inject constructor(
context, accountDao, articleDao, groupDao, context, accountDao, articleDao, groupDao,
feedDao, rssNetworkDataSource, workManager, feedDao, rssNetworkDataSource, workManager,
) { ) {
private val notificationManager: NotificationManager =
(getSystemService(
context,
NotificationManager::class.java
) as NotificationManager).also {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
it.createNotificationChannel(
NotificationChannel(
NotificationGroupName.ARTICLE_UPDATE,
NotificationGroupName.ARTICLE_UPDATE,
NotificationManager.IMPORTANCE_DEFAULT
)
)
}
}
override suspend fun updateArticleInfo(article: Article) { override suspend fun updateArticleInfo(article: Article) {
articleDao.update(article) articleDao.update(article)
} }
@ -68,25 +87,53 @@ class LocalRssRepository @Inject constructor(
override suspend fun sync() { override suspend fun sync() {
mutex.withLock { mutex.withLock {
val accountId = context.dataStore.get(DataStoreKeys.CurrentAccountId)
?: return
val feeds = feedDao.queryAll(accountId)
val feedNotificationMap = mutableMapOf<String, Boolean>()
feeds.forEach { feed ->
feedNotificationMap[feed.id] = feed.isNotification
}
val preTime = System.currentTimeMillis() val preTime = System.currentTimeMillis()
val chunked = feeds.chunked(6) withContext(Dispatchers.IO) {
chunked.forEachIndexed { index, item -> val accountId = context.dataStore.get(DataStoreKeys.CurrentAccountId)
item.forEach { ?: return@withContext
Log.i("RlOG", "chunked $index: ${it.name}") val feeds = async { feedDao.queryAll(accountId) }
val articles = feeds.await().also { feed ->
updateSyncState {
it.copy(
feedCount = feed.size,
)
}
}.map { feed ->
async {
val articles = syncFeed(accountId, feed)
articles
} }
} }
val flows = mutableListOf<Flow<List<Article>>>()
repeat(chunked.size) { articles.awaitAll().sumOf { it.size }.let { count ->
flows.add(flow { Log.i(
"RlOG",
"[${count}] onCompletion: ${System.currentTimeMillis() - preTime}"
)
accountDao.queryById(accountId)?.let { account ->
accountDao.update(
account.apply {
updateAt = Date()
}
)
}
updateSyncState {
it.copy(
feedCount = 0,
syncedCount = 0,
currentFeedName = ""
)
}
}
}
}
}
private suspend fun syncFeed(
accountId: Int,
feed: Feed
): MutableList<Article> {
val articles = mutableListOf<Article>() val articles = mutableListOf<Article>()
chunked[it].forEach { feed ->
val latest = articleDao.queryLatestByFeedId(accountId, feed.id) val latest = articleDao.queryLatestByFeedId(accountId, feed.id)
articles.addAll( articles.addAll(
rssHelper.queryRssXml( rssHelper.queryRssXml(
@ -102,37 +149,21 @@ class LocalRssRepository @Inject constructor(
) )
updateSyncState { updateSyncState {
it.copy( it.copy(
feedCount = feeds.size,
syncedCount = it.syncedCount + 1, syncedCount = it.syncedCount + 1,
currentFeedName = feed.name currentFeedName = feed.name
) )
} }
articleDao.insertList(articles)
if (feed.isNotification) {
notify(articles)
} }
emit(articles) return articles
})
} }
combine(
flows private fun notify(
articles: MutableList<Article>,
) { ) {
val notificationManager: NotificationManager = articles.forEach { article ->
getSystemService(
context,
NotificationManager::class.java
) as NotificationManager
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
notificationManager.createNotificationChannel(
NotificationChannel(
NotificationGroupName.ARTICLE_UPDATE,
"文章更新",
NotificationManager.IMPORTANCE_DEFAULT
)
)
}
it.forEach { articleList ->
val ids = articleDao.insertList(articleList)
articleList.forEachIndexed { index, article ->
Log.i("RlOG", "combine ${article.feedId}: ${article.title}")
if (feedNotificationMap[article.feedId] == true) {
val builder = NotificationCompat.Builder( val builder = NotificationCompat.Builder(
context, context,
NotificationGroupName.ARTICLE_UPDATE NotificationGroupName.ARTICLE_UPDATE
@ -144,20 +175,20 @@ class LocalRssRepository @Inject constructor(
.setContentIntent( .setContentIntent(
PendingIntent.getActivity( PendingIntent.getActivity(
context, context,
ids[index].toInt(), Random().nextInt() + article.id.hashCode(),
Intent(context, MainActivity::class.java).apply { Intent(context, MainActivity::class.java).apply {
flags = Intent.FLAG_ACTIVITY_NEW_TASK or flags = Intent.FLAG_ACTIVITY_NEW_TASK or
Intent.FLAG_ACTIVITY_CLEAR_TASK Intent.FLAG_ACTIVITY_CLEAR_TASK
putExtra( putExtra(
ExtraName.ARTICLE_ID, ExtraName.ARTICLE_ID,
ids[index] article.id
) )
}, },
PendingIntent.FLAG_IMMUTABLE or PendingIntent.FLAG_UPDATE_CURRENT PendingIntent.FLAG_IMMUTABLE or PendingIntent.FLAG_UPDATE_CURRENT
) )
) )
notificationManager.notify( notificationManager.notify(
ids[index].toInt(), Random().nextInt() + article.id.hashCode(),
builder.build().apply { builder.build().apply {
flags = Notification.FLAG_AUTO_CANCEL flags = Notification.FLAG_AUTO_CANCEL
} }
@ -165,24 +196,3 @@ class LocalRssRepository @Inject constructor(
} }
} }
} }
}.buffer().onCompletion {
val afterTime = System.currentTimeMillis()
Log.i("RlOG", "onCompletion: ${afterTime - preTime}")
accountDao.queryById(accountId)?.let { account ->
accountDao.update(
account.apply {
updateAt = Date()
}
)
}
updateSyncState {
it.copy(
feedCount = 0,
syncedCount = 0,
currentFeedName = ""
)
}
}.collect()
}
}
}