MyFamilyTracker
Real-time family location sharing — Firebase Realtime DB for sub-second propagation, WorkManager + ForegroundService for OS-compliant background collection, geofencing via Google Maps API.
Coroutines and Flow add async complexity that makes testing tricky. Here's how to use TestCoroutineDispatcher, runTest, and Turbine to write fast, deterministic tests for all your coroutine-based code.
On this page
Coroutines make async code cleaner to write. They make it harder to test if you don't know the tools.
The good news: the Kotlin coroutines testing library and Turbine give you everything you need to test coroutines and Flow deterministically, without real delays.
runTestkotlinx-coroutines-testrunBlockingTestCoroutineScheduler// build.gradle.kts
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0")
@Test
fun `coroutine test example`() = runTest {
val result = someCoroutineFunction()
assertEquals(expected, result)
}Key behavior of
runTestdelay()Straightforward — just use
runTestclass TaskRepositoryTest {
private val repository = TaskRepositoryImpl(FakeTaskDao())
@Test
fun `getTask returns null for nonexistent id`() = runTest {
val result = repository.getTask("nonexistent-id")
assertNull(result)
}
@Test
fun `createTask then getTask returns the task`() = runTest {
val task = Task("1", "Buy milk", false)
repository.createTask(task)
val result = repository.getTask("1")
assertEquals("Buy milk", result?.title)
}
}When your code uses
delay()runTestclass RetryServiceImpl : RetryService {
override suspend fun fetchWithRetry(url: String): Result<String> {
repeat(3) { attempt ->
try {
return Result.success(httpClient.get(url))
} catch (e: Exception) {
if (attempt < 2) delay(1000L * (attempt + 1)) // Exponential backoff
}
}
return Result.failure(Exception("Max retries exceeded"))
}
}
@Test
fun `fetchWithRetry succeeds on third attempt`() = runTest {
val failTwiceThenSucceed = FakeHttpClient(
responses = listOf(Failure, Failure, Success("data"))
)
val service = RetryServiceImpl(failTwiceThenSucceed)
val result = service.fetchWithRetry("https://api.example.com")
assertTrue(result.isSuccess) // Completes instantly despite 3-second total delay
assertEquals("data", result.getOrNull())
}Turbine makes testing Flow clean and intuitive:
// build.gradle.kts
testImplementation("app.cash.turbine:turbine:1.1.0")
@Test
fun `task list flow emits tasks as they are added`() = runTest {
val repository = FakeTaskRepository()
repository.getAllTasksFlow().test {
// First emission — empty list
assertEquals(emptyList(), awaitItem())
// Add a task
repository.addTask(Task("1", "Buy milk", false))
// Flow emits updated list
val updated = awaitItem()
assertEquals(1, updated.size)
assertEquals("Buy milk", updated.first().title)
cancelAndIgnoreRemainingEvents()
}
}Turbine methods:
awaitItem()awaitComplete()awaitError()cancelAndIgnoreRemainingEvents()class TaskViewModelTest {
@get:Rule
val mainDispatcherRule = MainDispatcherRule() // See below
@Test
fun `completing a task updates uiState`() = runTest {
val repository = FakeTaskRepository().apply {
addTask(Task("1", "Buy milk", completed = false))
}
val viewModel = TaskViewModel(repository)
// Assert initial state
val initial = viewModel.uiState.value
assertFalse(initial.tasks.first().completed)
// Action
viewModel.completeTask("1")
// Assert updated state
val updated = viewModel.uiState.value
assertTrue(updated.tasks.first { it.id == "1" }.completed)
}
@Test
fun `loading state shown while fetching tasks`() = runTest {
val slowRepository = SlowFakeTaskRepository()
val viewModel = TaskViewModel(slowRepository)
viewModel.uiState.test {
// Initial loading state
assertTrue(awaitItem().isLoading)
// Loading completes
val loaded = awaitItem()
assertFalse(loaded.isLoading)
assertFalse(loaded.tasks.isEmpty())
cancelAndIgnoreRemainingEvents()
}
}
}ViewModels use
Dispatchers.MainDispatchers.MainMainDispatcherRuleUnconfinedTestDispatcherclass MainDispatcherRule(
private val dispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
Dispatchers.setMain(dispatcher)
}
override fun finished(description: Description) {
Dispatchers.resetMain()
}
}Apply it as a JUnit rule:
@get:Rule
val mainDispatcherRule = MainDispatcherRule()Every ViewModel test needs this rule.
@Test
fun `network error shows error state`() = runTest {
val repository = FakeTaskRepository(throwOnFetch = NetworkException("No internet"))
val viewModel = TaskViewModel(repository)
viewModel.uiState.test {
skipItems(1) // Skip loading state
val errorState = awaitItem()
assertTrue(errorState.hasError)
assertEquals("No internet", errorState.errorMessage)
cancelAndIgnoreRemainingEvents()
}
}When testing code that uses
withContext(Dispatchers.IO)// In production
class TaskRepository(private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO) {
suspend fun loadTasks(): List<Task> = withContext(ioDispatcher) {
dao.getAllTasks()
}
}
// In tests — inject TestDispatcher
@Test
fun `loadTasks returns all tasks`() = runTest {
val repository = TaskRepository(ioDispatcher = UnconfinedTestDispatcher(testScheduler))
val tasks = repository.loadTasks()
assertEquals(expectedTasks, tasks)
}[!TIP] Always inject dispatchers rather than using
directly. This makes your code testable without any special setup.codeDispatchers.IO
runTestrunBlockingrunTestdelay()awaitItem()MainDispatcherRuleDispatchers.IOSudarshan Chaudhari
AI Systems Builder / Product Engineer
Bangkok, Thailand
Solo Android developer with 13+ years in QA, building Android apps, AI automation systems, and developer tools at SudarshanTechLabs.
Related Posts
Related Apps
Real-time family location sharing — Firebase Realtime DB for sub-second propagation, WorkManager + ForegroundService for OS-compliant background collection, geofencing via Google Maps API.
Building something? Available for Android dev and QA consulting.
Work with meComments — powered by Giscus
Real-time family location sharing — Firebase Realtime DB for sub-second propagation, WorkManager + ForegroundService for OS-compliant background collection, geofencing via Google Maps API.
ReadPrivate dream journal — structured entry capture, pattern tagging, and optional Claude-powered insight generation. All data stays on-device by default.
ReadWorkout tracker — exercise logging with set/rep/weight history, goal progression, and local Room DB persistence. No account, no cloud sync required.
Read