This commit is contained in:
Martin Ptáček
2023-06-12 22:40:05 +02:00
parent 0da06a543c
commit dc07c3add3

View File

@ -14,7 +14,6 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import remote.write.RemoteWrite import remote.write.RemoteWrite
@ -24,8 +23,6 @@ import remote.write.RemoteWrite.WriteRequest
private const val TAG: String = "REMOTE_WRITE_SENDER" private const val TAG: String = "REMOTE_WRITE_SENDER"
//TODO this thing
// This class stores information about scrapes to PROM_SERVER and PUSHPROX // This class stores information about scrapes to PROM_SERVER and PUSHPROX
// for purposes of scraping metrics on device and back-filling them later using remote write // for purposes of scraping metrics on device and back-filling them later using remote write
// //
@ -51,11 +48,11 @@ private class LastTimeRingBuffer(private val scrapeIntervalMs: Int) {
return buffer[bufferIndex] return buffer[bufferIndex]
} }
private fun checkScrapeDidNotHappenInTime() : Boolean { fun checkScrapeDidNotHappenInTime() : Boolean {
return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs
} }
private fun checkScrapeDidNotHappenHysteresis() : Boolean { fun checkScrapeDidNotHappenHysteresis() : Boolean {
val scrapeOccuredAfterThis : Long = System.currentTimeMillis() - 5 * scrapeIntervalMs val scrapeOccuredAfterThis : Long = System.currentTimeMillis() - 5 * scrapeIntervalMs
for (i in 0 until hysteresisThreshold) { for (i in 0 until hysteresisThreshold) {
if (getTimeByIndex(i) < scrapeOccuredAfterThis){ if (getTimeByIndex(i) < scrapeOccuredAfterThis){
@ -74,13 +71,13 @@ data class RemoteWriteConfiguration(
) )
class RemoteWriteSender(private val config: RemoteWriteConfiguration) { class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
// TODO ring buffer for last time private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrape_interval * 1000)
// TODO last time into it's own object with boolean functions
// private val lastTimeMutex = LastTimeMutex()
private var alreadyStoredSampleLength : Int = 0 private var alreadyStoredSampleLength : Int = 0
private val storage : RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage() private val storage : RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage()
private var scrapesAreBeingSent : Boolean = false
private lateinit var client : HttpClient
private fun getRequestBody(): ByteArray { private fun testGetRequestBody(): ByteArray {
val label1: Label = Label.newBuilder() val label1: Label = Label.newBuilder()
.setName("code") .setName("code")
.setValue("200").build() .setValue("200").build()
@ -128,10 +125,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
storage.writeScrapedSample(scrapedMetrics) storage.writeScrapedSample(scrapedMetrics)
} }
//TODO channel je bad //TODO refactor this thing
//TODO v remotewriteseender storage musi byt mutex
private suspend fun scraper(channel : Channel<Unit>){ private suspend fun scraper(channel : Channel<Unit>){
val checkDelay = 1000L val checkDelay = 1000L
while (true){ while (true){
@ -148,8 +142,10 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
} }
//TODO refactor this ting
// sending metric scrapes to remote write endpoint will not be parallel // sending metric scrapes to remote write endpoint will not be parallel
private suspend fun sendAll(){ private suspend fun sender(){
//TODO exponential backoff
scrapesAreBeingSentMutex.withLock { scrapesAreBeingSentMutex.withLock {
// Take all metric samples and send them in batches of (max_samples_per_send) // Take all metric samples and send them in batches of (max_samples_per_send)
// one by one batch // one by one batch
@ -158,6 +154,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
} }
//TODO refactor this thing
private suspend fun senderManager(channel : Channel<Unit>){ private suspend fun senderManager(channel : Channel<Unit>){
val alreadyStoredMetricScrapes : Int = storage.getLength() val alreadyStoredMetricScrapes : Int = storage.getLength()
@ -182,35 +179,39 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
// entrypoint
suspend fun start(){ suspend fun start(){
val channel = Channel<Unit>() val channel = Channel<Unit>()
client = HttpClient()
try { try {
runBlocking { runBlocking {
launch { launch {
// check for outage in scrapes, save scrapes to storage
scraper(channel) scraper(channel)
} }
launch { launch {
// send saved scrapes to remote write endpoint
senderManager(channel) senderManager(channel)
} }
} }
} finally { } finally {
withContext(NonCancellable){ withContext(NonCancellable){
channel.close() channel.close()
client.close()
Log.v(TAG, "Canceling Remote Write Sender") Log.v(TAG, "Canceling Remote Write Sender")
} }
} }
} }
suspend fun countSuccessfulScrape(){ fun countSuccessfulScrape(){
Log.v(TAG, "Counting successful scrape") Log.v(TAG, "Counting successful scrape")
lastTimeMutex.setLastTime(System.currentTimeMillis()) lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
} }
private suspend fun sendTestRequest() { private suspend fun sendRequestToRemoteWrite(body : ByteArray){
Log.v(TAG, "sending to prometheus now") Log.v(TAG, "sending to prometheus remote write now")
val client = HttpClient()
val response = client.post(config.remote_write_endpoint) { val response = client.post(config.remote_write_endpoint) {
setBody(encodeWithSnappy(getRequestBody())) setBody(body)
headers { headers {
append(HttpHeaders.ContentEncoding, "snappy") append(HttpHeaders.ContentEncoding, "snappy")
append(HttpHeaders.ContentType, "application/protobuf") append(HttpHeaders.ContentType, "application/protobuf")
@ -221,7 +222,5 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
Log.v(TAG, "Response status: ${response.status.toString()}") Log.v(TAG, "Response status: ${response.status.toString()}")
Log.v(TAG, "body: ${response.body<String>()}") Log.v(TAG, "body: ${response.body<String>()}")
client.close()
} }
} }