This commit is contained in:
Martin Ptáček
2023-06-06 19:09:51 +02:00
parent 563aae330b
commit 74ebf06202
4 changed files with 82 additions and 21 deletions

View File

@@ -27,7 +27,7 @@ class PromWorker(
private val metricsEngine: MetricsEngine = MetricsEngine(context) private val metricsEngine: MetricsEngine = MetricsEngine(context)
private lateinit var androidCustomExporter: AndroidCustomExporter private lateinit var androidCustomExporter: AndroidCustomExporter
private lateinit var pushProxClient: PushProxClient private lateinit var pushProxClient: PushProxClient
private lateinit var remoteWriteSender: RemoteWriteSender private var remoteWriteSender: RemoteWriteSender? = null
//TODO foreground notification //TODO foreground notification
private val notificationManager = private val notificationManager =
@@ -45,18 +45,37 @@ class PromWorker(
androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry) androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry)
} }
private fun countSuccessfulScrape(){
remoteWriteSender?.countSuccessfulScrape()
}
private suspend fun startServices(config: PromConfiguration) { private suspend fun startServices(config: PromConfiguration) {
var deferred = coroutineScope { var deferred = coroutineScope {
Log.v(TAG, "before launched")
if (config.prometheusServerEnabled) { if (config.remoteWriteEnabled) {
launch { remoteWriteSender = RemoteWriteSender(
PrometheusServer.start( RemoteWriteConfiguration(
PrometheusServerConfig(config.prometheusServerPort, ::performScrape), config.remoteWriteScrapeInterval,
config.remoteWriteEndpoint,
::performScrape,
) )
)
launch {
remoteWriteSender?.start()
} }
} }
Log.v(TAG, "launched") if (config.prometheusServerEnabled) {
launch {
PrometheusServer.start(
PrometheusServerConfig(
config.prometheusServerPort,
::performScrape,
::countSuccessfulScrape,
),
)
}
}
if (config.pushproxEnabled) { if (config.pushproxEnabled) {
pushProxClient = PushProxClient( pushProxClient = PushProxClient(
@@ -65,6 +84,7 @@ class PromWorker(
performScrape = ::performScrape, performScrape = ::performScrape,
pushProxFqdn = config.pushproxFqdn, pushProxFqdn = config.pushproxFqdn,
registry = collectorRegistry, registry = collectorRegistry,
countSuccessfulScrape = ::countSuccessfulScrape,
) )
) )
launch { launch {
@@ -72,20 +92,7 @@ class PromWorker(
} }
} }
if (config.remoteWriteEnabled) {
//DO something
Log.v(TAG, "Remote write service started.")
remoteWriteSender = RemoteWriteSender(
RemoteWriteConfiguration(
config.remoteWriteScrapeInterval,
config.remoteWriteEndpoint,
::performScrape,
)
)
//TODO
remoteWriteSender.sendTestRequest()
}
} }
} }

View File

@@ -21,6 +21,7 @@ private const val TAG = "PROMETHEUS_SERVER"
data class PrometheusServerConfig( data class PrometheusServerConfig(
val port: Int, val port: Int,
val performScrape: suspend () -> String, val performScrape: suspend () -> String,
val countSuccessfulScrape: () -> Unit,
) )
@@ -87,6 +88,7 @@ class PrometheusServer() {
} }
get("/metrics") { get("/metrics") {
call.respondText(config.performScrape()) call.respondText(config.performScrape())
config.countSuccessfulScrape()
} }
} }
} }

View File

@@ -21,6 +21,7 @@ data class PushProxConfig(
val pushProxFqdn: String, val pushProxFqdn: String,
val registry: CollectorRegistry, val registry: CollectorRegistry,
val performScrape: () -> String, val performScrape: () -> String,
val countSuccessfulScrape: () -> Unit,
) )
/** /**
@@ -158,6 +159,8 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
method = HttpMethod.Post method = HttpMethod.Post
setBody(pushRequestBody) setBody(pushRequestBody)
} }
pushProxConfig.countSuccessfulScrape()
} catch (e: Exception) { } catch (e: Exception) {
counters.pushError() counters.pushError()
Log.v(TAG, "Push exception ${e.toString()}") Log.v(TAG, "Push exception ${e.toString()}")

View File

@@ -8,14 +8,49 @@ import io.ktor.client.request.headers
import io.ktor.client.request.post import io.ktor.client.request.post
import io.ktor.client.request.setBody import io.ktor.client.request.setBody
import io.ktor.http.HttpHeaders import io.ktor.http.HttpHeaders
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.iq80.snappy.Snappy import org.iq80.snappy.Snappy
import remote.write.RemoteWrite import remote.write.RemoteWrite
import remote.write.RemoteWrite.Label import remote.write.RemoteWrite.Label
import remote.write.RemoteWrite.TimeSeries import remote.write.RemoteWrite.TimeSeries
import remote.write.RemoteWrite.WriteRequest import remote.write.RemoteWrite.WriteRequest
import java.util.Timer
import kotlin.concurrent.schedule
private const val TAG: String = "REMOTE_WRITE_SENDER" private const val TAG: String = "REMOTE_WRITE_SENDER"
private class LastTimeMutex{
private val mutex = Mutex()
private var lastTime : Long = 0
suspend fun setLastTime(time : Long){
mutex.withLock {
lastTime = time
}
}
fun getLastTime() : Long { return lastTime }
}
private enum class RemoteWriteSenderState {
REMOTE_WRITE,
PUSHPROX_OR_PROMETHEUS_SERVER,
}
private class RemoteWriteSenderStateMutex {
private val mutex = Mutex()
private var remoteWriteSenderState = RemoteWriteSenderState.PUSHPROX_OR_PROMETHEUS_SERVER
suspend fun setRemoteWriteSenderState(state : RemoteWriteSenderState){
mutex.withLock {
remoteWriteSenderState = state
}
}
fun getRemoteWriteSenderState() : RemoteWriteSenderState {
return remoteWriteSenderState
}
}
data class RemoteWriteConfiguration( data class RemoteWriteConfiguration(
val scrape_interval: Int, val scrape_interval: Int,
val remote_write_endpoint: String, val remote_write_endpoint: String,
@@ -24,7 +59,7 @@ data class RemoteWriteConfiguration(
//TODO implement this thing //TODO implement this thing
class RemoteWriteSender(private val config: RemoteWriteConfiguration) { class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private val lastTimeMutex = LastTimeMutex()
private fun getRequestBody(): ByteArray { private fun getRequestBody(): ByteArray {
val label1: Label = Label.newBuilder() val label1: Label = Label.newBuilder()
@@ -69,6 +104,20 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
return request.toByteArray() return request.toByteArray()
} }
suspend fun start(){
runBlocking {
//TODO from here
}
}
fun countSuccessfulScrape(){
Log.v(TAG, "countSuccesful scrape")
//TODO implement this "last time" and mutex
scheduleCheckScrapesHappened()
}
private fun encodeWithSnappy(data: ByteArray): ByteArray { private fun encodeWithSnappy(data: ByteArray): ByteArray {
return Snappy.compress(data) return Snappy.compress(data)
} }