pushprox refactored

This commit is contained in:
Martin Ptáček
2023-06-01 18:59:11 +02:00
parent 27c62e0731
commit c0cdae89ec
4 changed files with 120 additions and 79 deletions

View File

@ -99,15 +99,9 @@ class PromViewModel(): ViewModel() {
when(_uiState.value.exporterState) {
ExporterState.Running -> {
stopWorker()
_uiState.update { current ->
current.copy(exporterState = ExporterState.NotRunning)
}
}
ExporterState.NotRunning -> {
startWorker()
_uiState.update { current ->
current.copy(exporterState = ExporterState.Running)
}
}
}
}
@ -125,36 +119,53 @@ class PromViewModel(): ViewModel() {
}
}
private fun validatePromConfiguration() : Boolean{
//TODO implement this, on missing fields, e.g. not valid, display alert dialog in UI
return true
}
private fun startWorker(){
Log.v(TAG, "Enqueuing work")
val workManagerInstance = WorkManager.getInstance(getContext())
if (validatePromConfiguration()){
Log.v(TAG, "Enqueuing work")
val workManagerInstance = WorkManager.getInstance(getContext())
// worker configuration
val inputData : Data = _uiState.value.promConfig.toWorkData()
// worker configuration
val inputData : Data = _uiState.value.promConfig.toWorkData()
// constraints
val constraints = Constraints.Builder()
.setRequiredNetworkType(NetworkType.NOT_REQUIRED)
.build()
// constraints
val constraints = Constraints.Builder()
.setRequiredNetworkType(NetworkType.NOT_REQUIRED)
.build()
// setup worker request
val workerRequest = OneTimeWorkRequestBuilder<PromWorker>()
.setInputData(inputData)
.setConstraints(constraints)
.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
.build()
// setup worker request
val workerRequest = OneTimeWorkRequestBuilder<PromWorker>()
.setInputData(inputData)
.setConstraints(constraints)
.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
.build()
// enqueue
workManagerInstance.beginUniqueWork(
PROM_UNIQUE_WORK,
ExistingWorkPolicy.KEEP,
workerRequest,
).enqueue()
// enqueue
workManagerInstance.beginUniqueWork(
PROM_UNIQUE_WORK,
ExistingWorkPolicy.KEEP,
workerRequest,
).enqueue()
// set UI state
_uiState.update { current ->
current.copy(exporterState = ExporterState.Running)
}
}
}
private fun stopWorker(){
val workerManagerInstance = WorkManager.getInstance(getContext())
workerManagerInstance.cancelUniqueWork(PROM_UNIQUE_WORK)
// update UI state
_uiState.update { current ->
current.copy(exporterState = ExporterState.NotRunning)
}
}
fun updatePromConfig(part : UpdatePromConfig, value : Any){

View File

@ -0,0 +1,44 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlin.math.pow
class ExponentialBackoff{
companion object{
private const val multiplier : Double = 2.0
private const val initialDelay = 3.0 // seconds
suspend fun runWithBackoff(function : suspend () -> Unit, onException: () -> Unit){
var successfull : Boolean = false
var currentDelay = initialDelay
var currentExpIndex = -1
while(!successfull){
try {
function()
successfull = true
}catch (e : CancellationException){
successfull = true
}
catch (e : Exception){
// check for suppressed exceptions
for(exception in e.suppressed){
if(exception is CancellationException){
successfull = true
}
}
onException()
// calculate new delay
currentExpIndex++
currentDelay = initialDelay + multiplier.pow(currentExpIndex)
}
delay(currentDelay.toLong())
}
}
}
}

View File

@ -27,9 +27,10 @@ class PromWorker(
parameters : WorkerParameters,
) : CoroutineWorker(context, parameters) {
private val collectorRegistry = CollectorRegistry()
private val metricsEngine : MetricsEngine = MetricsEngine(context)
private val pushProxClient = PushProxClient(::performScrape)
private lateinit var androidCustomExporter : AndroidCustomExporter
private lateinit var pushProxClient : PushProxClient
//TODO foreground notification
private val notificationManager =
@ -44,7 +45,7 @@ class PromWorker(
private fun initializeWork(config : PromConfiguration){
// initialize metrics
androidCustomExporter = AndroidCustomExporter(metricsEngine).register()
androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry)
}
private suspend fun startServices(config : PromConfiguration){
@ -54,13 +55,20 @@ class PromWorker(
PrometheusServer.start(
PrometheusServerConfig(config.prometheusServerPort, ::performScrape),
)
Log.v(TAG, "Prometheus server started.")
}
}
if(config.pushproxEnabled){
pushProxClient = PushProxClient(
PushProxConfig(
pushProxUrl = config.pushproxProxyUrl,
performScrape = ::performScrape,
pushProxFqdn = config.pushproxFqdn,
registry = collectorRegistry,
)
)
launch {
pushProxClient.start()
}
}
@ -81,7 +89,6 @@ class PromWorker(
initializeWork(inputConfiguration)
startServices(inputConfiguration)
//TODO implement this asap
return Result.success()
}

View File

@ -20,6 +20,7 @@ data class PushProxConfig(
val pushProxUrl : String,
val pushProxFqdn : String,
val registry : CollectorRegistry,
val performScrape : () -> String,
)
/**
@ -43,10 +44,10 @@ private class PushProxCounter(registry: CollectorRegistry) {
fun scrapeError(){ scrapeErrorCounter.inc()}
fun pushError(){ pushErrorCounter.inc() }
fun pollError(){ pollErrorCounter.inc() } //TODO use this thing
fun pollError(){ pollErrorCounter.inc() }
}
// Error in parsing HTTP header "Id" from HTTP request from Prometheus //TODO wtf
// Error in parsing HTTP header "Id" from HTTP request from Prometheus
class PushProxIdParseException(message: String) : Exception(message)
// Context object for pushprox internal functions to avoid global variables
@ -58,17 +59,17 @@ data class PushProxContext(
)
// This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client
class PushProxClient(config: PushProxConfig) {
private val counters : PushProxCounter = PushProxCounter(config.registry)
class PushProxClient(private val pushProxConfig: PushProxConfig) {
private val counters : PushProxCounter = PushProxCounter(pushProxConfig.registry)
// Use this function to start exporting metrics to pushprox in the background
suspend fun start(config: PushProxConfig) {
suspend fun start() {
Log.v(TAG, "Starting pushprox client")
var client : HttpClient? = null
try {
client = HttpClient()
val context : PushProxContext = getPushProxContext(client, config)
val context : PushProxContext = getPushProxContext(client)
loop(context)
}finally {
withContext(NonCancellable){
@ -78,9 +79,8 @@ class PushProxClient(config: PushProxConfig) {
}
}
private fun getPushProxContext(client : HttpClient, config : PushProxConfig) : PushProxContext {
var modifiedProxyURL = config.pushProxUrl.trim('/')
private fun getPushProxContext(client : HttpClient) : PushProxContext {
var modifiedProxyURL = pushProxConfig.pushProxUrl.trim('/')
if(
!modifiedProxyURL.startsWith("http://") &&
@ -96,23 +96,19 @@ class PushProxClient(config: PushProxConfig) {
client,
pollURL,
pushURL,
config.pushProxFqdn,
pushProxConfig.pushProxFqdn,
)
}
//TODO refactor this function
// Continuous poll from android phone to pushprox gateway
// Continuously poll from pushprox gateway
private suspend fun doPoll(context : PushProxContext){
log("poll", "polling now")
val response : HttpResponse = context.client.post(context.pollUrl){
setBody(context.fqdn)
}
log("here", "here")
val responseBody: String = response.body<String>()
doPush(context, responseBody)
}
//TODO refactor this function
// get value of HTTP header "Id" from response body
private fun getIdFromResponseBody(responseBody: String) : String {
@ -124,7 +120,6 @@ class PushProxClient(config: PushProxConfig) {
return id
}
//TODO refactor this function
private fun composeRequestBody(scrapedMetrics: String, id : String) : String {
val httpHeaders = "HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" +
@ -137,55 +132,39 @@ class PushProxClient(config: PushProxConfig) {
// Parameter responseBody: response body of /poll request
private suspend fun doPush(context : PushProxContext, pollResponseBody : String) {
// perform scrape
lateinit var scrapedMetrics : String
try {
scrapedMetrics = performScrape()
}catch(e : Exception){
val scrapedMetrics : String = try {
pushProxConfig.performScrape()
}catch (e : Exception){
Log.v(TAG, "Scrape exception ${e.toString()}")
counters.scrapeError()
log("scrape exception", e.toString())
return
""
}
// push metrics to pushprox
try{
val scrapeId : String = getIdFromResponseBody(pollResponseBody)
val pushResponseBody: String = composeRequestBody(scrapedMetrics, scrapeId)
val pushRequestBody: String = composeRequestBody(scrapedMetrics, scrapeId)
val response : HttpResponse = context.client.request(context.pushUrl) {
context.client.request(context.pushUrl) {
method = HttpMethod.Post
setBody(pushResponseBody)
setBody(pushRequestBody)
}
}catch(e : Exception){
counters.pushError()
log("push exception", e.toString())
Log.v(TAG,"Push exception ${e.toString()}")
return
}
}
//TODO migrate to work manager
private suspend fun loop(context : PushProxContext) {
while (true) {
Log.v(TAG, "PushProxClient main loop start")
// register poll error using try-catch block
//TODO backoff strategy
//TODO asap
// var result = context.backoff.withRetries {
// try {
// doPoll(context)
// }catch(e : CancellationException){
// shouldContinue = false
// }
// catch (e: Exception) {
// for(exception in e.suppressed){
// if(exception is CancellationException){
// shouldContinue = false
// }
// }
// log("exception encountered!", e.toString())
// counters.pollError()
// throw e
// }
// }
ExponentialBackoff.runWithBackoff(
function = { doPoll(context) },
onException = { counters.pollError() }
)
Log.v(TAG,"PushProxClient main loop end")
}
}