exponential backoff bug fix

This commit is contained in:
Martin Ptáček
2023-06-14 09:49:17 +02:00
parent 89884172f7
commit 20e2804c5e
4 changed files with 29 additions and 10 deletions

View File

@ -1,10 +1,13 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
import android.util.Log
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.math.min import kotlin.math.min
import kotlin.math.pow import kotlin.math.pow
private const val TAG : String = "EXPONENTIAL_BACKOFF"
class ExponentialBackoff { class ExponentialBackoff {
companion object { companion object {
private const val multiplier: Double = 2.0 private const val multiplier: Double = 2.0
@ -26,12 +29,12 @@ class ExponentialBackoff {
function() function()
successfull = true successfull = true
} catch (e: CancellationException) { } catch (e: CancellationException) {
break throw e
} catch (e: Exception) { } catch (e: Exception) {
// check for suppressed exceptions // check for suppressed exceptions
for (exception in e.suppressed) { for (exception in e.suppressed) {
if (exception is CancellationException) { if (exception is CancellationException) {
break throw exception
} }
} }
@ -47,6 +50,8 @@ class ExponentialBackoff {
break break
} }
Log.d(TAG, "Backoff with delay: $currentDelay seconds")
delay(currentDelay.toLong() * 1000) delay(currentDelay.toLong() * 1000)
} }
} }

View File

@ -48,7 +48,7 @@ class PromWorker(
return writer.toString() return writer.toString()
} }
private suspend fun countSuccessfulScrape() { private fun countSuccessfulScrape() {
Log.d(TAG, "Counting successful scrape") Log.d(TAG, "Counting successful scrape")
remoteWriteSender?.countSuccessfulScrape() remoteWriteSender?.countSuccessfulScrape()
} }

View File

@ -112,11 +112,13 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
// Continuously poll from pushprox gateway // Continuously poll from pushprox gateway
private suspend fun doPoll(context: PushProxContext) { private suspend fun doPoll(context: PushProxContext) {
Log.d(TAG, "Polling now")
val response: HttpResponse = context.client.post(context.pollUrl) { val response: HttpResponse = context.client.post(context.pollUrl) {
setBody(context.fqdn) setBody(context.fqdn)
} }
val responseBody: String = response.body<String>() val responseBody: String = response.body<String>()
doPush(context, responseBody) doPush(context, responseBody)
Log.d(TAG, "Polling finished")
} }
// get value of HTTP header "Id" from response body // get value of HTTP header "Id" from response body

View File

@ -6,12 +6,12 @@ import android.net.NetworkCapabilities
import android.util.Log import android.util.Log
import androidx.work.impl.utils.getActiveNetworkCompat import androidx.work.impl.utils.getActiveNetworkCompat
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.header import io.ktor.client.request.header
import io.ktor.client.request.headers import io.ktor.client.request.headers
import io.ktor.client.request.post import io.ktor.client.request.post
import io.ktor.client.request.setBody import io.ktor.client.request.setBody
import io.ktor.http.HttpHeaders import io.ktor.http.HttpHeaders
import io.ktor.http.HttpStatusCode
import io.prometheus.client.CollectorRegistry import io.prometheus.client.CollectorRegistry
import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.BufferOverflow
@ -109,12 +109,19 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private suspend fun sendAll() { private suspend fun sendAll() {
Log.d(TAG, "sendAll") Log.d(TAG, "sendAll")
scrapesAreBeingSent = true if (!scrapesAreBeingSent) {
while (!storage.isEmpty()) { scrapesAreBeingSent = true
val body = storage.getScrapedSamplesCompressedProtobuf(config.maxSamplesPerExport)
ExponentialBackoff.runWithBackoff({ sendRequestToRemoteWrite(body) }, {}, false) while (!storage.isEmpty()) {
val body = storage.getScrapedSamplesCompressedProtobuf(config.maxSamplesPerExport)
Log.d(TAG, "Exponential backoff to export remote write started")
ExponentialBackoff.runWithBackoff({
sendRequestToRemoteWrite(body, config.maxSamplesPerExport)
}, {}, false)
Log.d(TAG, "Exponential backoff to export remote write finish")
}
lastTimeRemoteWriteSent = System.currentTimeMillis()
} }
lastTimeRemoteWriteSent = System.currentTimeMillis()
} }
private fun deviceHasInternet(): Boolean { private fun deviceHasInternet(): Boolean {
@ -193,7 +200,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
lastTimeRingBuffer.setLastTime(System.currentTimeMillis()) lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
} }
private suspend fun sendRequestToRemoteWrite(body: ByteArray) { private suspend fun sendRequestToRemoteWrite(body: ByteArray, numOfMetricScrapes : Int) {
Log.d(TAG, "Exporting remote write to prometheus now") Log.d(TAG, "Exporting remote write to prometheus now")
val response = client.post(config.remoteWriteEndpoint) { val response = client.post(config.remoteWriteEndpoint) {
setBody(body) setBody(body)
@ -206,5 +213,10 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
Log.d(TAG, "Response status: ${response.status}") Log.d(TAG, "Response status: ${response.status}")
if (response.status == HttpStatusCode.NoContent){
// this export was successful
storage.removeNumberOfScrapedSamples(numOfMetricScrapes)
}
} }
} }