This commit is contained in:
Martin Ptáček
2023-06-07 21:16:49 +02:00
parent 74ebf06202
commit 1bfa8779e2
7 changed files with 194 additions and 80 deletions

View File

@ -2,12 +2,14 @@ package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.work
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlin.math.min
import kotlin.math.pow
class ExponentialBackoff {
companion object {
private const val multiplier: Double = 2.0
private const val initialDelay = 3.0 // seconds
private const val maxDelay = 61.0 // seconds
suspend fun runWithBackoff(function: suspend () -> Unit, onException: () -> Unit) {
var successfull: Boolean = false
@ -34,6 +36,7 @@ class ExponentialBackoff {
// calculate new delay
currentExpIndex++
currentDelay = initialDelay + multiplier.pow(currentExpIndex)
currentDelay = min(currentDelay, maxDelay)
}
delay(currentDelay.toLong())

View File

@ -12,8 +12,11 @@ import com.birdthedeveloper.prometheus.android.prometheus.android.exporter.R
import com.birdthedeveloper.prometheus.android.prometheus.android.exporter.compose.PromConfiguration
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.common.TextFormat
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import java.io.StringWriter
private const val TAG = "Worker"
@ -45,55 +48,57 @@ class PromWorker(
androidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry)
}
private fun countSuccessfulScrape(){
private suspend fun countSuccessfulScrape(){
remoteWriteSender?.countSuccessfulScrape()
}
private suspend fun startServices(config: PromConfiguration) {
@OptIn(DelicateCoroutinesApi::class)
private suspend fun startServicesInOneThread(config: PromConfiguration){
val threadContext = newSingleThreadContext("PromWorkerThreadContext")
var deferred = coroutineScope {
withContext(threadContext){
if (config.remoteWriteEnabled) {
remoteWriteSender = RemoteWriteSender(
RemoteWriteConfiguration(
config.remoteWriteScrapeInterval,
config.remoteWriteEndpoint,
::performScrape,
if (config.remoteWriteEnabled) {
remoteWriteSender = RemoteWriteSender(
RemoteWriteConfiguration(
config.remoteWriteScrapeInterval,
config.remoteWriteEndpoint,
collectorRegistry,
)
)
)
launch {
remoteWriteSender?.start()
launch {
remoteWriteSender?.start()
}
}
if (config.prometheusServerEnabled) {
launch {
PrometheusServer.start(
PrometheusServerConfig(
config.prometheusServerPort,
::performScrape,
::countSuccessfulScrape,
),
)
}
}
if (config.pushproxEnabled) {
pushProxClient = PushProxClient(
PushProxConfig(
pushProxUrl = config.pushproxProxyUrl,
performScrape = ::performScrape,
pushProxFqdn = config.pushproxFqdn,
registry = collectorRegistry,
countSuccessfulScrape = ::countSuccessfulScrape,
)
)
launch {
pushProxClient.start()
}
}
}
if (config.prometheusServerEnabled) {
launch {
PrometheusServer.start(
PrometheusServerConfig(
config.prometheusServerPort,
::performScrape,
::countSuccessfulScrape,
),
)
}
}
if (config.pushproxEnabled) {
pushProxClient = PushProxClient(
PushProxConfig(
pushProxUrl = config.pushproxProxyUrl,
performScrape = ::performScrape,
pushProxFqdn = config.pushproxFqdn,
registry = collectorRegistry,
countSuccessfulScrape = ::countSuccessfulScrape,
)
)
launch {
pushProxClient.start()
}
}
}
}
@ -104,7 +109,7 @@ class PromWorker(
//setForeground(createForegroundInfo())
initializeWork(inputConfiguration)
startServices(inputConfiguration)
startServicesInOneThread(inputConfiguration)
return Result.success()
}

View File

@ -21,7 +21,7 @@ private const val TAG = "PROMETHEUS_SERVER"
data class PrometheusServerConfig(
val port: Int,
val performScrape: suspend () -> String,
val countSuccessfulScrape: () -> Unit,
val countSuccessfulScrape: suspend () -> Unit,
)

View File

@ -21,7 +21,7 @@ data class PushProxConfig(
val pushProxFqdn: String,
val registry: CollectorRegistry,
val performScrape: () -> String,
val countSuccessfulScrape: () -> Unit,
val countSuccessfulScrape: suspend () -> Unit,
)
/**

View File

@ -8,58 +8,54 @@ import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.http.HttpHeaders
import io.prometheus.client.CollectorRegistry
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.iq80.snappy.Snappy
import remote.write.RemoteWrite
import remote.write.RemoteWrite.Label
import remote.write.RemoteWrite.TimeSeries
import remote.write.RemoteWrite.WriteRequest
import java.util.Timer
import kotlin.concurrent.schedule
private const val TAG: String = "REMOTE_WRITE_SENDER"
private class LastTimeMutex{
private val mutex = Mutex()
private var lastTime : Long = 0
suspend fun setLastTime(time : Long){
mutex.withLock {
lastTime = time
}
}
fun getLastTime() : Long { return lastTime }
}
private enum class RemoteWriteSenderState {
REMOTE_WRITE,
PUSHPROX_OR_PROMETHEUS_SERVER,
}
private class RemoteWriteSenderStateMutex {
private val mutex = Mutex()
private var remoteWriteSenderState = RemoteWriteSenderState.PUSHPROX_OR_PROMETHEUS_SERVER
suspend fun setRemoteWriteSenderState(state : RemoteWriteSenderState){
mutex.withLock {
remoteWriteSenderState = state
}
private class LastTimeRingBuffer {
//TODO implement this with ring array
private fun checkScrapeDidNotHappenInTime() : Boolean {
return lastTimeMutex.getLastTime() < System.currentTimeMillis() - 3 * config.scrape_interval
}
fun getRemoteWriteSenderState() : RemoteWriteSenderState {
return remoteWriteSenderState
private fun checkScrapeDidNotHappenHysteresis() : Boolean {
return false //TODO implement this with ring buffer in lastTimeMutex
}
}
data class RemoteWriteConfiguration(
val scrape_interval: Int,
val remote_write_endpoint: String,
val performScrape: () -> String, //TODO this class needs it structured in objects
val collectorRegistry: CollectorRegistry,
)
//TODO implement this thing
class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private val lastTimeMutex = LastTimeMutex()
// TODO ring buffer for last time
// TODO last time into it's own object with boolean functions
// private val lastTimeMutex = LastTimeMutex()
private var alreadyStoredSampleLength : Int = 0
private val storage : RemoteWriteSenderStorage = RemoteWriteSenderMemoryStorage()
private val scrapesAreBeingSentMutex = Mutex()
private fun getRequestBody(): ByteArray {
val label1: Label = Label.newBuilder()
@ -104,25 +100,112 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
return request.toByteArray()
}
suspend fun start(){
private fun performScrape() : MetricsScrape {
for ( item in config.collectorRegistry.metricFamilySamples() ){
val name : String = item.name
for (sample in item.samples){
val timestamp : Long = sample.timestampMs
val labelValueIterator : Iterator<String> = sample.labelValues.iterator()
for(labelName in sample.labelNames){
val protobufLabel : Label = Label.newBuilder()
.setName(labelName)
.setValue(labelValueIterator.next())
.build()
}
val sampleValue : Double = sample.value
}
}
}
//TODO channel je k hovnu
//TODO v remotewriteseender storage musi byt mutex
private suspend fun scraper(channel : Channel<Unit>){
val checkDelay = 1000L
while (true){
if (checkScrapeDidNotHappenInTime()){
delay(config.scrape_interval * 1000L)
storage.writeScrapedSample(performScrape())
while(checkScrapeDidNotHappenHysteresis()){
delay(config.scrape_interval * 1000L)
storage.writeScrapedSample(performScrape())
}
}
delay(checkDelay)
}
}
// sending metric scrapes to remote write endpoint will not be parallel
private suspend fun sendAll(){
scrapesAreBeingSentMutex.withLock {
// Take all metric samples and send them in batches of (max_samples_per_send)
// one by one batch
}
}
private suspend fun senderManager(channel : Channel<Unit>){
val alreadyStoredMetricScrapes : Int = storage.getLength()
runBlocking {
//TODO from here
if (alreadyStoredMetricScrapes > 0){
launch { // fire and forget
sendAll()
}
}
channel.receive()
// suspended on channel.receive
// when there are enough to send:
// start a sender
// send with these conditions:
//
}
}
fun countSuccessfulScrape(){
Log.v(TAG, "countSuccesful scrape")
//TODO implement this "last time" and mutex
suspend fun start(){
val channel = Channel<Unit>()
try {
runBlocking {
launch {
scraper(channel)
}
launch {
senderManager(channel)
}
}
} finally {
withContext(NonCancellable){
channel.close()
Log.v(TAG, "Canceling Remote Write Sender")
}
}
}
scheduleCheckScrapesHappened()
suspend fun countSuccessfulScrape(){
Log.v(TAG, "Counting successful scrape")
lastTimeMutex.setLastTime(System.currentTimeMillis())
}
private fun encodeWithSnappy(data: ByteArray): ByteArray {
return Snappy.compress(data)
}
suspend fun sendTestRequest() {
private suspend fun sendTestRequest() {
Log.v(TAG, "sending to prometheus now")
val client = HttpClient()
val response = client.post(config.remote_write_endpoint) {
@ -141,4 +224,3 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
client.close()
}
}

View File

@ -1,16 +1,29 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
typealias MetricsScrape = String
import java.sql.Timestamp
import java.util.LinkedList
import java.util.Queue
// define the contract for Remote Write Sender storage
//TODO toto je na houby cele, musi byt structured misto byte array
class MetricsScrape(
val compressedMetrics : ByteArray,
val timestamp: Long,
)
// No need for locks as all operations are run on a single thread, defined in PromWorker
// This class defines contract for RemoteWriteSender storage
abstract class RemoteWriteSenderStorage {
abstract fun writeScrapedSample(metricsScrape: MetricsScrape)
abstract fun getNumberOfScrapedSamples(number: Int): List<MetricsScrape>
abstract fun removeNumberOfScrapedSamples(number: Int)
abstract fun isEmpty(): Boolean
abstract fun getLength(): Int
}
class RemoteWriteSenderMemoryStorage : RemoteWriteSenderStorage() {
// writeRequests are stored in protobuf already compressed
private val data : Queue<MetricsScrape> = LinkedList<MetricsScrape>()
override fun getNumberOfScrapedSamples(number: Int): List<MetricsScrape> {
TODO("Not yet implemented")
}
@ -26,6 +39,10 @@ class RemoteWriteSenderMemoryStorage : RemoteWriteSenderStorage() {
override fun isEmpty(): Boolean {
TODO("Not yet implemented")
}
override fun getLength(): Int {
TODO("Not yet implemented")
}
}
class RemoteWriteSenterDatabaseStorage : RemoteWriteSenderStorage() {
@ -44,4 +61,8 @@ class RemoteWriteSenterDatabaseStorage : RemoteWriteSenderStorage() {
override fun isEmpty(): Boolean {
TODO("Not yet implemented")
}
override fun getLength(): Int {
TODO("Not yet implemented")
}
}

View File

@ -31,4 +31,7 @@ remote_write:
# where to post metrics
# data type: string, no default value provided
# example: http://localhost:9090/
remote_write_endpoint:
remote_write_endpoint:
# default value is 5 scrapes
max_samples_per_send: 5