pushprox works

This commit is contained in:
Martin Ptáček
2023-05-02 11:44:14 +02:00
parent ef749b4e46
commit d9e8cbf5b9
2 changed files with 33 additions and 55 deletions

View File

@ -1,42 +1,28 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter
import android.preference.PreferenceActivity.Header
import android.util.Log
import io.github.reugn.kotlin.backoff.StrategyBackoff
import io.github.reugn.kotlin.backoff.strategy.ExponentialStrategy
import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.engine.cio.CIO
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.post
import io.ktor.client.request.request
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.http.URLBuilder
import io.ktor.http.Url
import io.ktor.http.maxAge
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.Counter
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.exitProcess
import kotlin.time.Duration
// Counters for monitoring the pushprox itself, compatible with the reference implementation in go.
// Counters for monitoring the pushprox itself, compatible with the reference
// implementation in golang, source: https://github.dev/prometheus-community/PushProx
private class Counters(collectorRegistry: CollectorRegistry?) {
private val collectorRegistry : CollectorRegistry?
private lateinit var scrapeErrorCounter : Counter
private lateinit var pushErrorCounter : Counter
private lateinit var pollErrorCounter : Counter
private lateinit var pollSuccessCounter : Counter
private var enabled : Boolean = false
init {
@ -57,12 +43,6 @@ private class Counters(collectorRegistry: CollectorRegistry?) {
.name("pushprox_client_poll_errors_total")
.help("Number of poll errors")
.register(collectorRegistry)
// custom
pollSuccessCounter = Counter.build()
.name("pushprox_client_poll_total")
.help("Number of succesfull polls")
.register(collectorRegistry)
}
}
@ -71,8 +51,6 @@ private class Counters(collectorRegistry: CollectorRegistry?) {
fun pushError(){ if (enabled) pushErrorCounter.inc() }
fun pollError(){ if (enabled) pollErrorCounter.inc() }
fun pollSuccess(){ if (enabled) pollSuccessCounter.inc() }
}
// Configuration object for this pushprox client
@ -85,18 +63,22 @@ data class PushProxConfig(
val performScrape: suspend () -> String,
)
// error in parsing HTTP header "Id" from HTTP request from Prometheus
// Error in parsing HTTP header "Id" from HTTP request from Prometheus
class PushProxIdParseException(message: String) : Exception(message)
// This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client
class PushProxClient(config: PushProxConfig) {
//TODO dispose this thing - delete http client
//TODO dispose this thing - delete http client - something like the bellow stuff
//val status = HttpClient().use { client ->
// // ...
//}
private val config: PushProxConfig
private val pollURL: String
private val pushURL: String
private lateinit var client: HttpClient
private lateinit var counters : Counters
private var running : Boolean = false
init {
this.config = config
@ -109,19 +91,20 @@ class PushProxClient(config: PushProxConfig) {
pushURL = "$modifiedProxyURL/push"
}
// initialize resource - heavier objects
// Initialize resources - heavier objects
private fun setup(){
// init counters if they are enabled
counters = Counters(config.collectorRegistry)
client = HttpClient(CIO)
}
// use this function to start exporting metrics to pushprox in the background
// Use this function to start exporting metrics to pushprox in the background
public fun startBackground() {
setup()
loop(newBackoffFromFlags())
}
// Continuous poll from android phone to pushprox gateway
private suspend fun doPoll(){
log("poll", "polling now")
log(pollURL, pollURL)
@ -131,12 +114,6 @@ class PushProxClient(config: PushProxConfig) {
log("here", "here")
val responseBody: String = response.body<String>()
doPush(responseBody)
// response body is not needed
log("responseBody in poll", responseBody)
log("got scrape request", responseBody)
//TODO asap
}
// get value of HTTP header "Id" from response body
@ -150,10 +127,19 @@ class PushProxClient(config: PushProxConfig) {
return id
}
// responseBody: response body of /poll request
private suspend fun doPush(responseBody : String) {
//TODO implement
private fun composeRequestBody(scrapedMetrics: String, id : String) : String {
val httpHeaders = "HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" +
"Id: $id\r\n" +
"X-Prometheus-Scrape-Timeout: 9.5\r\n"
val result : String = httpHeaders + "\r\n" + scrapedMetrics
log("result", result)
return result
}
// Parameter responseBody: response body of /poll request
private suspend fun doPush(pollResponseBody : String) {
// perform scrape
lateinit var scrapedMetrics : String
try {
@ -165,15 +151,12 @@ class PushProxClient(config: PushProxConfig) {
}
try{
log("scraped metrics in doPush", scrapedMetrics)
val scrapeId : String = getIdFromResponseBody(responseBody)
log("scrapeId", scrapeId)
val response : HttpResponse = client.request(pushURL) {
header("id", scrapeId)
//header("X-prometheus-scrape-timeout", "4") //TODO this is dummy for now
method = HttpMethod.Post
val scrapeId : String = getIdFromResponseBody(pollResponseBody)
val pushResponseBody: String = composeRequestBody(scrapedMetrics, scrapeId)
setBody(scrapedMetrics)
val response : HttpResponse = client.request(pushURL) {
method = HttpMethod.Post
setBody(pushResponseBody)
}
}catch(e : Exception){
@ -184,10 +167,6 @@ class PushProxClient(config: PushProxConfig) {
}
private fun handleErr(){
//TODO implement
}
private fun newBackoffFromFlags() : StrategyBackoff<Unit> {
return StrategyBackoff<Unit>(
strategy = ExponentialStrategy(
@ -198,8 +177,7 @@ class PushProxClient(config: PushProxConfig) {
)
}
//TODO migrate to work manager
private fun loop(backoff: StrategyBackoff<Unit>) {
// fire and forget a new coroutine
GlobalScope.launch {
@ -217,9 +195,8 @@ class PushProxClient(config: PushProxConfig) {
throw e
}
}
log("pushprox main loop", "loop end")
}
job.join()
job.join() // wait for the job to finish
}
}
}

View File

@ -10,8 +10,9 @@ scrape_configs:
- job_name: "android phones"
proxy_url: "http://pushprox:8080" #TODO add mobile phones here
tls_config:
insecure_skip_verify: true
static_configs:
- targets: [
"test.example.com"
]