From c0cdae89ec2f816ec43bfb968e083b7f901913c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Pt=C3=A1=C4=8Dek?= Date: Thu, 1 Jun 2023 18:59:11 +0200 Subject: [PATCH] pushprox refactored --- .../android/exporter/compose/PromViewModel.kt | 63 +++++++++------- .../exporter/worker/ExponentialBackoff.kt | 44 +++++++++++ .../android/exporter/worker/PromWorker.kt | 17 +++-- .../android/exporter/worker/PushProxClient.kt | 75 +++++++------------ 4 files changed, 120 insertions(+), 79 deletions(-) create mode 100644 client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/compose/PromViewModel.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/compose/PromViewModel.kt index 88bdfb8..569a758 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/compose/PromViewModel.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/compose/PromViewModel.kt @@ -99,15 +99,9 @@ class PromViewModel(): ViewModel() { when(_uiState.value.exporterState) { ExporterState.Running -> { stopWorker() - _uiState.update { current -> - current.copy(exporterState = ExporterState.NotRunning) - } } ExporterState.NotRunning -> { startWorker() - _uiState.update { current -> - current.copy(exporterState = ExporterState.Running) - } } } } @@ -125,36 +119,53 @@ class PromViewModel(): ViewModel() { } } + private fun validatePromConfiguration() : Boolean{ + //TODO implement this, on missing fields, e.g. not valid, display alert dialog in UI + return true + } + private fun startWorker(){ - Log.v(TAG, "Enqueuing work") - val workManagerInstance = WorkManager.getInstance(getContext()) + if (validatePromConfiguration()){ + Log.v(TAG, "Enqueuing work") + val workManagerInstance = WorkManager.getInstance(getContext()) - // worker configuration - val inputData : Data = _uiState.value.promConfig.toWorkData() + // worker configuration + val inputData : Data = _uiState.value.promConfig.toWorkData() - // constraints - val constraints = Constraints.Builder() - .setRequiredNetworkType(NetworkType.NOT_REQUIRED) - .build() + // constraints + val constraints = Constraints.Builder() + .setRequiredNetworkType(NetworkType.NOT_REQUIRED) + .build() - // setup worker request - val workerRequest = OneTimeWorkRequestBuilder() - .setInputData(inputData) - .setConstraints(constraints) - .setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST) - .build() + // setup worker request + val workerRequest = OneTimeWorkRequestBuilder() + .setInputData(inputData) + .setConstraints(constraints) + .setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST) + .build() - // enqueue - workManagerInstance.beginUniqueWork( - PROM_UNIQUE_WORK, - ExistingWorkPolicy.KEEP, - workerRequest, - ).enqueue() + // enqueue + workManagerInstance.beginUniqueWork( + PROM_UNIQUE_WORK, + ExistingWorkPolicy.KEEP, + workerRequest, + ).enqueue() + + // set UI state + _uiState.update { current -> + current.copy(exporterState = ExporterState.Running) + } + } } private fun stopWorker(){ val workerManagerInstance = WorkManager.getInstance(getContext()) workerManagerInstance.cancelUniqueWork(PROM_UNIQUE_WORK) + + // update UI state + _uiState.update { current -> + current.copy(exporterState = ExporterState.NotRunning) + } } fun updatePromConfig(part : UpdatePromConfig, value : Any){ diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt new file mode 100644 index 0000000..894cb9e --- /dev/null +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt @@ -0,0 +1,44 @@ +package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.delay +import kotlin.math.pow + +class ExponentialBackoff{ + companion object{ + private const val multiplier : Double = 2.0 + private const val initialDelay = 3.0 // seconds + + suspend fun runWithBackoff(function : suspend () -> Unit, onException: () -> Unit){ + var successfull : Boolean = false + + var currentDelay = initialDelay + var currentExpIndex = -1 + + while(!successfull){ + try { + function() + successfull = true + }catch (e : CancellationException){ + successfull = true + } + catch (e : Exception){ + // check for suppressed exceptions + for(exception in e.suppressed){ + if(exception is CancellationException){ + successfull = true + } + } + + onException() + + // calculate new delay + currentExpIndex++ + currentDelay = initialDelay + multiplier.pow(currentExpIndex) + } + + delay(currentDelay.toLong()) + } + } + } +} diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt index 322d1d3..ab18a08 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt @@ -27,9 +27,10 @@ class PromWorker( parameters : WorkerParameters, ) : CoroutineWorker(context, parameters) { + private val collectorRegistry = CollectorRegistry() private val metricsEngine : MetricsEngine = MetricsEngine(context) - private val pushProxClient = PushProxClient(::performScrape) private lateinit var androidCustomExporter : AndroidCustomExporter + private lateinit var pushProxClient : PushProxClient //TODO foreground notification private val notificationManager = @@ -44,7 +45,7 @@ class PromWorker( private fun initializeWork(config : PromConfiguration){ // initialize metrics - androidCustomExporter = AndroidCustomExporter(metricsEngine).register() + androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry) } private suspend fun startServices(config : PromConfiguration){ @@ -54,13 +55,20 @@ class PromWorker( PrometheusServer.start( PrometheusServerConfig(config.prometheusServerPort, ::performScrape), ) - Log.v(TAG, "Prometheus server started.") } } if(config.pushproxEnabled){ + pushProxClient = PushProxClient( + PushProxConfig( + pushProxUrl = config.pushproxProxyUrl, + performScrape = ::performScrape, + pushProxFqdn = config.pushproxFqdn, + registry = collectorRegistry, + ) + ) launch { - + pushProxClient.start() } } @@ -81,7 +89,6 @@ class PromWorker( initializeWork(inputConfiguration) startServices(inputConfiguration) - //TODO implement this asap return Result.success() } diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt index 41a8e3f..a6aa0ac 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt @@ -20,6 +20,7 @@ data class PushProxConfig( val pushProxUrl : String, val pushProxFqdn : String, val registry : CollectorRegistry, + val performScrape : () -> String, ) /** @@ -43,10 +44,10 @@ private class PushProxCounter(registry: CollectorRegistry) { fun scrapeError(){ scrapeErrorCounter.inc()} fun pushError(){ pushErrorCounter.inc() } - fun pollError(){ pollErrorCounter.inc() } //TODO use this thing + fun pollError(){ pollErrorCounter.inc() } } -// Error in parsing HTTP header "Id" from HTTP request from Prometheus //TODO wtf +// Error in parsing HTTP header "Id" from HTTP request from Prometheus class PushProxIdParseException(message: String) : Exception(message) // Context object for pushprox internal functions to avoid global variables @@ -58,17 +59,17 @@ data class PushProxContext( ) // This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client -class PushProxClient(config: PushProxConfig) { - private val counters : PushProxCounter = PushProxCounter(config.registry) +class PushProxClient(private val pushProxConfig: PushProxConfig) { + private val counters : PushProxCounter = PushProxCounter(pushProxConfig.registry) // Use this function to start exporting metrics to pushprox in the background - suspend fun start(config: PushProxConfig) { + suspend fun start() { Log.v(TAG, "Starting pushprox client") var client : HttpClient? = null try { client = HttpClient() - val context : PushProxContext = getPushProxContext(client, config) + val context : PushProxContext = getPushProxContext(client) loop(context) }finally { withContext(NonCancellable){ @@ -78,9 +79,8 @@ class PushProxClient(config: PushProxConfig) { } } - - private fun getPushProxContext(client : HttpClient, config : PushProxConfig) : PushProxContext { - var modifiedProxyURL = config.pushProxUrl.trim('/') + private fun getPushProxContext(client : HttpClient) : PushProxContext { + var modifiedProxyURL = pushProxConfig.pushProxUrl.trim('/') if( !modifiedProxyURL.startsWith("http://") && @@ -96,23 +96,19 @@ class PushProxClient(config: PushProxConfig) { client, pollURL, pushURL, - config.pushProxFqdn, + pushProxConfig.pushProxFqdn, ) } - //TODO refactor this function - // Continuous poll from android phone to pushprox gateway + // Continuously poll from pushprox gateway private suspend fun doPoll(context : PushProxContext){ - log("poll", "polling now") val response : HttpResponse = context.client.post(context.pollUrl){ setBody(context.fqdn) } - log("here", "here") val responseBody: String = response.body() doPush(context, responseBody) } - //TODO refactor this function // get value of HTTP header "Id" from response body private fun getIdFromResponseBody(responseBody: String) : String { @@ -124,7 +120,6 @@ class PushProxClient(config: PushProxConfig) { return id } - //TODO refactor this function private fun composeRequestBody(scrapedMetrics: String, id : String) : String { val httpHeaders = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" + @@ -137,55 +132,39 @@ class PushProxClient(config: PushProxConfig) { // Parameter responseBody: response body of /poll request private suspend fun doPush(context : PushProxContext, pollResponseBody : String) { // perform scrape - lateinit var scrapedMetrics : String - try { - scrapedMetrics = performScrape() - }catch(e : Exception){ + val scrapedMetrics : String = try { + pushProxConfig.performScrape() + }catch (e : Exception){ + Log.v(TAG, "Scrape exception ${e.toString()}") counters.scrapeError() - log("scrape exception", e.toString()) - return + "" } + // push metrics to pushprox try{ val scrapeId : String = getIdFromResponseBody(pollResponseBody) - val pushResponseBody: String = composeRequestBody(scrapedMetrics, scrapeId) + val pushRequestBody: String = composeRequestBody(scrapedMetrics, scrapeId) - val response : HttpResponse = context.client.request(context.pushUrl) { + context.client.request(context.pushUrl) { method = HttpMethod.Post - setBody(pushResponseBody) + setBody(pushRequestBody) } - }catch(e : Exception){ counters.pushError() - log("push exception", e.toString()) + Log.v(TAG,"Push exception ${e.toString()}") return } } - //TODO migrate to work manager private suspend fun loop(context : PushProxContext) { while (true) { Log.v(TAG, "PushProxClient main loop start") - // register poll error using try-catch block - //TODO backoff strategy - //TODO asap -// var result = context.backoff.withRetries { -// try { -// doPoll(context) -// }catch(e : CancellationException){ -// shouldContinue = false -// } -// catch (e: Exception) { -// for(exception in e.suppressed){ -// if(exception is CancellationException){ -// shouldContinue = false -// } -// } -// log("exception encountered!", e.toString()) -// counters.pollError() -// throw e -// } -// } + + ExponentialBackoff.runWithBackoff( + function = { doPoll(context) }, + onException = { counters.pollError() } + ) + Log.v(TAG,"PushProxClient main loop end") } }