This commit is contained in:
Martin Ptáček
2023-05-31 23:58:20 +02:00
parent f7dcd87548
commit 27c62e0731
5 changed files with 63 additions and 82 deletions

View File

@ -1,29 +0,0 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
import androidx.work.Data
data class PushProxConfig(
val pushProxUrl : String,
val pushProxFqdn : String,
){
companion object{
fun fromData(data : Data) : PushProxConfig {
return PushProxConfig(
data.getString("0")!!,
data.getString("1")!!,
)
}
}
fun toData() : Data {
return Data.Builder()
.putString("0", pushProxUrl)
.putString("1", pushProxFqdn)
.build()
}
}
data class PromServerConfig(
//TODO implement this
val dummy : String,
)

View File

@ -27,9 +27,8 @@ class PromWorker(
parameters : WorkerParameters, parameters : WorkerParameters,
) : CoroutineWorker(context, parameters) { ) : CoroutineWorker(context, parameters) {
private val collectorRegistry: CollectorRegistry = CollectorRegistry()
private val metricsEngine : MetricsEngine = MetricsEngine(context) private val metricsEngine : MetricsEngine = MetricsEngine(context)
private val pushProxClient = PushProxClient(collectorRegistry, ::performScrape) private val pushProxClient = PushProxClient(::performScrape)
private lateinit var androidCustomExporter : AndroidCustomExporter private lateinit var androidCustomExporter : AndroidCustomExporter
//TODO foreground notification //TODO foreground notification
@ -39,13 +38,13 @@ class PromWorker(
private fun performScrape() : String{ private fun performScrape() : String{
val writer = StringWriter() val writer = StringWriter()
TextFormat.write004(writer, collectorRegistry.metricFamilySamples()) TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples())
return writer.toString() return writer.toString()
} }
private fun initializeWork(config : PromConfiguration){ private fun initializeWork(config : PromConfiguration){
// initialize metrics // initialize metrics
androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry) androidCustomExporter = AndroidCustomExporter(metricsEngine).register()
} }
private suspend fun startServices(config : PromConfiguration){ private suspend fun startServices(config : PromConfiguration){

View File

@ -40,9 +40,8 @@ class PrometheusServer() {
delay(Long.MAX_VALUE) delay(Long.MAX_VALUE)
}finally { }finally {
withContext(NonCancellable){ withContext(NonCancellable){
Log.v(TAG, "3") Log.v(TAG, "Canceling Prometheus server")
server.stop() server.stop()
Log.v(TAG, "4")
} }
} }
} }

View File

@ -10,45 +10,43 @@ import io.ktor.client.statement.HttpResponse
import io.ktor.http.HttpMethod import io.ktor.http.HttpMethod
import io.prometheus.client.CollectorRegistry import io.prometheus.client.CollectorRegistry
import io.prometheus.client.Counter import io.prometheus.client.Counter
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
private const val TAG = "PUSHPROX_CLIENT"
// configuration class for pushprox
data class PushProxConfig(
val pushProxUrl : String,
val pushProxFqdn : String,
val registry : CollectorRegistry,
)
/** /**
* Counters for monitoring the pushprox itself, compatible with the reference * Counters for monitoring the pushprox itself, compatible with the reference
* implementation in golang, source: https://github.dev/prometheus-community/PushProx * implementation in golang, source: https://github.dev/prometheus-community/PushProx
*/ */
private class Counters(collectorRegistry: CollectorRegistry) { private class PushProxCounter(registry: CollectorRegistry) {
private val collectorRegistry : CollectorRegistry private val pushErrorCounter : Counter = Counter.build()
private val scrapeErrorCounter : Counter
private val pushErrorCounter : Counter
private val pollErrorCounter : Counter
init {
this.collectorRegistry = collectorRegistry
// following 3 counters are compatible with reference implementation
scrapeErrorCounter = Counter.build()
.name("pushprox_client_scrape_errors_total")
.help("Number of scrape errors")
.register(collectorRegistry)
pushErrorCounter = Counter.build()
.name("pushprox_client_push_errors_total")
.help("Number of push errors")
.register(collectorRegistry)
pollErrorCounter = Counter.build()
.name("pushprox_client_poll_errors_total") .name("pushprox_client_poll_errors_total")
.help("Number of poll errors") .help("Number of poll errors")
.register(collectorRegistry) .register(registry)
private val scrapeErrorCounter : Counter = Counter.build()
.name("pushprox_client_scrape_errors_total")
.help("Number of scrape errors")
.register(registry)
} private val pollErrorCounter : Counter = Counter.build()
.name("pushprox_client_push_errors_total")
.help("Number of push errors")
.register(registry)
fun scrapeError(){ scrapeErrorCounter.inc()} fun scrapeError(){ scrapeErrorCounter.inc()}
fun pushError(){ pushErrorCounter.inc() } fun pushError(){ pushErrorCounter.inc() }
fun pollError(){ pollErrorCounter.inc() } //TODO use this thing fun pollError(){ pollErrorCounter.inc() } //TODO use this thing
} }
// Error in parsing HTTP header "Id" from HTTP request from Prometheus // Error in parsing HTTP header "Id" from HTTP request from Prometheus //TODO wtf
class PushProxIdParseException(message: String) : Exception(message) class PushProxIdParseException(message: String) : Exception(message)
// Context object for pushprox internal functions to avoid global variables // Context object for pushprox internal functions to avoid global variables
@ -60,22 +58,28 @@ data class PushProxContext(
) )
// This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client // This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client
class PushProxClient( class PushProxClient(config: PushProxConfig) {
collectorRegistry: CollectorRegistry, private val counters : PushProxCounter = PushProxCounter(config.registry)
private val performScrape: suspend () -> String
) {
private val counters : Counters = Counters(collectorRegistry)
private val retryInitialWaitSeconds : Int = 1
private val retryMaxWaitSeconds : Int = 5
// Use this function to start exporting metrics to pushprox in the background // Use this function to start exporting metrics to pushprox in the background
suspend fun startBackground(config: PushProxConfig) { suspend fun start(config: PushProxConfig) {
val client : HttpClient = HttpClient() //TODO close this thing Log.v(TAG, "Starting pushprox client")
val context : PushProxContext = processConfig(client, config)
var client : HttpClient? = null
try {
client = HttpClient()
val context : PushProxContext = getPushProxContext(client, config)
loop(context) loop(context)
}finally {
withContext(NonCancellable){
Log.v(TAG, "Canceling pushprox client")
client?.close()
}
}
} }
private fun processConfig(client : HttpClient, config : PushProxConfig) : PushProxContext {
private fun getPushProxContext(client : HttpClient, config : PushProxConfig) : PushProxContext {
var modifiedProxyURL = config.pushProxUrl.trim('/') var modifiedProxyURL = config.pushProxUrl.trim('/')
if( if(
@ -96,6 +100,7 @@ class PushProxClient(
) )
} }
//TODO refactor this function
// Continuous poll from android phone to pushprox gateway // Continuous poll from android phone to pushprox gateway
private suspend fun doPoll(context : PushProxContext){ private suspend fun doPoll(context : PushProxContext){
log("poll", "polling now") log("poll", "polling now")
@ -107,6 +112,7 @@ class PushProxClient(
doPush(context, responseBody) doPush(context, responseBody)
} }
//TODO refactor this function
// get value of HTTP header "Id" from response body // get value of HTTP header "Id" from response body
private fun getIdFromResponseBody(responseBody: String) : String { private fun getIdFromResponseBody(responseBody: String) : String {
@ -118,14 +124,14 @@ class PushProxClient(
return id return id
} }
//TODO refactor this function
private fun composeRequestBody(scrapedMetrics: String, id : String) : String { private fun composeRequestBody(scrapedMetrics: String, id : String) : String {
val httpHeaders = "HTTP/1.1 200 OK\r\n" + val httpHeaders = "HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" + "Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" +
"Id: $id\r\n" + "Id: $id\r\n" +
"X-Prometheus-Scrape-Timeout: 9.5\r\n" "X-Prometheus-Scrape-Timeout: 9.5\r\n"
val result : String = httpHeaders + "\r\n" + scrapedMetrics return httpHeaders + "\r\n" + scrapedMetrics
return result
} }
// Parameter responseBody: response body of /poll request // Parameter responseBody: response body of /poll request
@ -158,11 +164,11 @@ class PushProxClient(
//TODO migrate to work manager //TODO migrate to work manager
private suspend fun loop(context : PushProxContext) { private suspend fun loop(context : PushProxContext) {
var shouldContinue : Boolean = true while (true) {
while (shouldContinue) { Log.v(TAG, "PushProxClient main loop start")
log("pushprox main loop", "loop start")
// register poll error using try-catch block // register poll error using try-catch block
//TODO backoff strategy //TODO backoff strategy
//TODO asap
// var result = context.backoff.withRetries { // var result = context.backoff.withRetries {
// try { // try {
// doPoll(context) // doPoll(context)
@ -180,11 +186,7 @@ class PushProxClient(
// throw e // throw e
// } // }
// } // }
log("pushprox main loop", "loop end") Log.v(TAG,"PushProxClient main loop end")
} }
} }
private fun log(title: String, text: String) {
Log.v("PUSHPROXCLIENT", "$title: $text")
}
} }

View File

@ -0,0 +1,10 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
data class RemoteWriteConfiguration(
val scrape_interval : Int,
val remote_write_endpoint : String,
)
class RemoteWrite {
//TODO implement this thing
}