From 1bfa8779e277b265bd6eaffe541e1aad961cf793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Pt=C3=A1=C4=8Dek?= Date: Wed, 7 Jun 2023 21:16:49 +0200 Subject: [PATCH] wip --- .../exporter/worker/ExponentialBackoff.kt | 3 + .../android/exporter/worker/PromWorker.kt | 89 ++++++----- .../exporter/worker/PrometheusServer.kt | 2 +- .../android/exporter/worker/PushProxClient.kt | 2 +- .../exporter/worker/RemoteWriteSender.kt | 148 ++++++++++++++---- .../worker/RemoteWriteSenderStorage.kt | 25 ++- config_file_structure.yaml | 5 +- 7 files changed, 194 insertions(+), 80 deletions(-) diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt index 69337f8..1d9dac1 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/ExponentialBackoff.kt @@ -2,12 +2,14 @@ package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.work import kotlinx.coroutines.CancellationException import kotlinx.coroutines.delay +import kotlin.math.min import kotlin.math.pow class ExponentialBackoff { companion object { private const val multiplier: Double = 2.0 private const val initialDelay = 3.0 // seconds + private const val maxDelay = 61.0 // seconds suspend fun runWithBackoff(function: suspend () -> Unit, onException: () -> Unit) { var successfull: Boolean = false @@ -34,6 +36,7 @@ class ExponentialBackoff { // calculate new delay currentExpIndex++ currentDelay = initialDelay + multiplier.pow(currentExpIndex) + currentDelay = min(currentDelay, maxDelay) } delay(currentDelay.toLong()) diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt index da85338..9eb88cd 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PromWorker.kt @@ -12,8 +12,11 @@ import com.birdthedeveloper.prometheus.android.prometheus.android.exporter.R import com.birdthedeveloper.prometheus.android.prometheus.android.exporter.compose.PromConfiguration import io.prometheus.client.CollectorRegistry import io.prometheus.client.exporter.common.TextFormat +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import java.io.StringWriter private const val TAG = "Worker" @@ -45,55 +48,57 @@ class PromWorker( androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry) } - private fun countSuccessfulScrape(){ + private suspend fun countSuccessfulScrape(){ remoteWriteSender?.countSuccessfulScrape() } - private suspend fun startServices(config: PromConfiguration) { + @OptIn(DelicateCoroutinesApi::class) + private suspend fun startServicesInOneThread(config: PromConfiguration){ + val threadContext = newSingleThreadContext("PromWorkerThreadContext") + var deferred = coroutineScope { + withContext(threadContext){ - if (config.remoteWriteEnabled) { - remoteWriteSender = RemoteWriteSender( - RemoteWriteConfiguration( - config.remoteWriteScrapeInterval, - config.remoteWriteEndpoint, - ::performScrape, + if (config.remoteWriteEnabled) { + remoteWriteSender = RemoteWriteSender( + RemoteWriteConfiguration( + config.remoteWriteScrapeInterval, + config.remoteWriteEndpoint, + collectorRegistry, + ) ) - ) - launch { - remoteWriteSender?.start() + launch { + remoteWriteSender?.start() + } + } + + if (config.prometheusServerEnabled) { + launch { + PrometheusServer.start( + PrometheusServerConfig( + config.prometheusServerPort, + ::performScrape, + ::countSuccessfulScrape, + ), + ) + } + } + + if (config.pushproxEnabled) { + pushProxClient = PushProxClient( + PushProxConfig( + pushProxUrl = config.pushproxProxyUrl, + performScrape = ::performScrape, + pushProxFqdn = config.pushproxFqdn, + registry = collectorRegistry, + countSuccessfulScrape = ::countSuccessfulScrape, + ) + ) + launch { + pushProxClient.start() + } } } - - if (config.prometheusServerEnabled) { - launch { - PrometheusServer.start( - PrometheusServerConfig( - config.prometheusServerPort, - ::performScrape, - ::countSuccessfulScrape, - ), - ) - } - } - - if (config.pushproxEnabled) { - pushProxClient = PushProxClient( - PushProxConfig( - pushProxUrl = config.pushproxProxyUrl, - performScrape = ::performScrape, - pushProxFqdn = config.pushproxFqdn, - registry = collectorRegistry, - countSuccessfulScrape = ::countSuccessfulScrape, - ) - ) - launch { - pushProxClient.start() - } - } - - - } } @@ -104,7 +109,7 @@ class PromWorker( //setForeground(createForegroundInfo()) initializeWork(inputConfiguration) - startServices(inputConfiguration) + startServicesInOneThread(inputConfiguration) return Result.success() } diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PrometheusServer.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PrometheusServer.kt index ac30e6e..1a77322 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PrometheusServer.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PrometheusServer.kt @@ -21,7 +21,7 @@ private const val TAG = "PROMETHEUS_SERVER" data class PrometheusServerConfig( val port: Int, val performScrape: suspend () -> String, - val countSuccessfulScrape: () -> Unit, + val countSuccessfulScrape: suspend () -> Unit, ) diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt index 191cf47..2a40c68 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/PushProxClient.kt @@ -21,7 +21,7 @@ data class PushProxConfig( val pushProxFqdn: String, val registry: CollectorRegistry, val performScrape: () -> String, - val countSuccessfulScrape: () -> Unit, + val countSuccessfulScrape: suspend () -> Unit, ) /** diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt index 8938bfb..77a6a82 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt @@ -8,58 +8,54 @@ import io.ktor.client.request.headers import io.ktor.client.request.post import io.ktor.client.request.setBody import io.ktor.http.HttpHeaders +import io.prometheus.client.CollectorRegistry +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext import org.iq80.snappy.Snappy import remote.write.RemoteWrite import remote.write.RemoteWrite.Label import remote.write.RemoteWrite.TimeSeries import remote.write.RemoteWrite.WriteRequest -import java.util.Timer -import kotlin.concurrent.schedule private const val TAG: String = "REMOTE_WRITE_SENDER" -private class LastTimeMutex{ - private val mutex = Mutex() - private var lastTime : Long = 0 - suspend fun setLastTime(time : Long){ - mutex.withLock { - lastTime = time - } - } - fun getLastTime() : Long { return lastTime } -} - private enum class RemoteWriteSenderState { REMOTE_WRITE, PUSHPROX_OR_PROMETHEUS_SERVER, } -private class RemoteWriteSenderStateMutex { - private val mutex = Mutex() - private var remoteWriteSenderState = RemoteWriteSenderState.PUSHPROX_OR_PROMETHEUS_SERVER - suspend fun setRemoteWriteSenderState(state : RemoteWriteSenderState){ - mutex.withLock { - remoteWriteSenderState = state - } +private class LastTimeRingBuffer { + //TODO implement this with ring array + + private fun checkScrapeDidNotHappenInTime() : Boolean { + return lastTimeMutex.getLastTime() < System.currentTimeMillis() - 3 * config.scrape_interval } - fun getRemoteWriteSenderState() : RemoteWriteSenderState { - return remoteWriteSenderState + private fun checkScrapeDidNotHappenHysteresis() : Boolean { + return false //TODO implement this with ring buffer in lastTimeMutex } + } data class RemoteWriteConfiguration( val scrape_interval: Int, val remote_write_endpoint: String, - val performScrape: () -> String, //TODO this class needs it structured in objects + val collectorRegistry: CollectorRegistry, ) -//TODO implement this thing class RemoteWriteSender(private val config: RemoteWriteConfiguration) { - private val lastTimeMutex = LastTimeMutex() + // TODO ring buffer for last time + // TODO last time into it's own object with boolean functions + // private val lastTimeMutex = LastTimeMutex() + private var alreadyStoredSampleLength : Int = 0 + private val storage : RemoteWriteSenderStorage = RemoteWriteSenderMemoryStorage() + private val scrapesAreBeingSentMutex = Mutex() private fun getRequestBody(): ByteArray { val label1: Label = Label.newBuilder() @@ -104,25 +100,112 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { return request.toByteArray() } - suspend fun start(){ + private fun performScrape() : MetricsScrape { + + + for ( item in config.collectorRegistry.metricFamilySamples() ){ + val name : String = item.name + for (sample in item.samples){ + val timestamp : Long = sample.timestampMs + + val labelValueIterator : Iterator = sample.labelValues.iterator() + for(labelName in sample.labelNames){ + val protobufLabel : Label = Label.newBuilder() + .setName(labelName) + .setValue(labelValueIterator.next()) + .build() + } + + val sampleValue : Double = sample.value + + + } + } + } + + //TODO channel je k hovnu + //TODO v remotewriteseender storage musi byt mutex + + + private suspend fun scraper(channel : Channel){ + val checkDelay = 1000L + while (true){ + if (checkScrapeDidNotHappenInTime()){ + delay(config.scrape_interval * 1000L) + storage.writeScrapedSample(performScrape()) + + while(checkScrapeDidNotHappenHysteresis()){ + delay(config.scrape_interval * 1000L) + storage.writeScrapedSample(performScrape()) + } + } + delay(checkDelay) + } + } + + // sending metric scrapes to remote write endpoint will not be parallel + private suspend fun sendAll(){ + scrapesAreBeingSentMutex.withLock { + // Take all metric samples and send them in batches of (max_samples_per_send) + // one by one batch + + + } + } + + private suspend fun senderManager(channel : Channel){ + val alreadyStoredMetricScrapes : Int = storage.getLength() + runBlocking { - //TODO from here + if (alreadyStoredMetricScrapes > 0){ + launch { // fire and forget + sendAll() + } + } + + channel.receive() + + + // suspended on channel.receive + + // when there are enough to send: + // start a sender + + // send with these conditions: + // } } - fun countSuccessfulScrape(){ - Log.v(TAG, "countSuccesful scrape") - //TODO implement this "last time" and mutex + suspend fun start(){ + val channel = Channel() + try { + runBlocking { + launch { + scraper(channel) + } + launch { + senderManager(channel) + } + } + } finally { + withContext(NonCancellable){ + channel.close() + Log.v(TAG, "Canceling Remote Write Sender") + } + } + } - scheduleCheckScrapesHappened() + suspend fun countSuccessfulScrape(){ + Log.v(TAG, "Counting successful scrape") + lastTimeMutex.setLastTime(System.currentTimeMillis()) } private fun encodeWithSnappy(data: ByteArray): ByteArray { return Snappy.compress(data) } - suspend fun sendTestRequest() { + private suspend fun sendTestRequest() { Log.v(TAG, "sending to prometheus now") val client = HttpClient() val response = client.post(config.remote_write_endpoint) { @@ -141,4 +224,3 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { client.close() } } - diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSenderStorage.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSenderStorage.kt index 3305026..b4206aa 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSenderStorage.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSenderStorage.kt @@ -1,16 +1,29 @@ package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker -typealias MetricsScrape = String +import java.sql.Timestamp +import java.util.LinkedList +import java.util.Queue -// define the contract for Remote Write Sender storage +//TODO toto je na houby cele, musi byt structured misto byte array +class MetricsScrape( + val compressedMetrics : ByteArray, + val timestamp: Long, +) + +// No need for locks as all operations are run on a single thread, defined in PromWorker +// This class defines contract for RemoteWriteSender storage abstract class RemoteWriteSenderStorage { abstract fun writeScrapedSample(metricsScrape: MetricsScrape) abstract fun getNumberOfScrapedSamples(number: Int): List abstract fun removeNumberOfScrapedSamples(number: Int) abstract fun isEmpty(): Boolean + abstract fun getLength(): Int } class RemoteWriteSenderMemoryStorage : RemoteWriteSenderStorage() { + // writeRequests are stored in protobuf already compressed + private val data : Queue = LinkedList() + override fun getNumberOfScrapedSamples(number: Int): List { TODO("Not yet implemented") } @@ -26,6 +39,10 @@ class RemoteWriteSenderMemoryStorage : RemoteWriteSenderStorage() { override fun isEmpty(): Boolean { TODO("Not yet implemented") } + + override fun getLength(): Int { + TODO("Not yet implemented") + } } class RemoteWriteSenterDatabaseStorage : RemoteWriteSenderStorage() { @@ -44,4 +61,8 @@ class RemoteWriteSenterDatabaseStorage : RemoteWriteSenderStorage() { override fun isEmpty(): Boolean { TODO("Not yet implemented") } + + override fun getLength(): Int { + TODO("Not yet implemented") + } } \ No newline at end of file diff --git a/config_file_structure.yaml b/config_file_structure.yaml index e8c24a4..4266677 100644 --- a/config_file_structure.yaml +++ b/config_file_structure.yaml @@ -31,4 +31,7 @@ remote_write: # where to post metrics # data type: string, no default value provided # example: http://localhost:9090/ - remote_write_endpoint: + remote_write_endpoint: + + # default value is 5 scrapes + max_samples_per_send: 5