last time ring buffer

This commit is contained in:
Martin Ptáček
2023-06-12 22:25:47 +02:00
parent 0bef5d006a
commit 0da06a543c

View File

@ -17,7 +17,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import org.iq80.snappy.Snappy
import remote.write.RemoteWrite import remote.write.RemoteWrite
import remote.write.RemoteWrite.Label import remote.write.RemoteWrite.Label
import remote.write.RemoteWrite.TimeSeries import remote.write.RemoteWrite.TimeSeries
@ -25,24 +24,45 @@ import remote.write.RemoteWrite.WriteRequest
private const val TAG: String = "REMOTE_WRITE_SENDER" private const val TAG: String = "REMOTE_WRITE_SENDER"
private enum class RemoteWriteSenderState { //TODO this thing
REMOTE_WRITE,
PUSHPROX_OR_PROMETHEUS_SERVER,
}
private class LastTimeRingBuffer { // This class stores information about scrapes to PROM_SERVER and PUSHPROX
//TODO implement this with ring array // for purposes of scraping metrics on device and back-filling them later using remote write
//
// Only timestamps of succesfull scrapes are stored
private class LastTimeRingBuffer(private val scrapeIntervalMs: Int) {
private val buffer : Array<Long> = Array(hysteresisThreshold) { _ -> 0 }
private var firstIndex : Int = 0
companion object{
private const val hysteresisThreshold : Int = 3
}
fun setLastTime(timestamp : Long) { fun setLastTime(timestamp : Long) {
//TODO implement this firstIndex = firstIndex++ % hysteresisThreshold
buffer[firstIndex] = timestamp
}
private fun getTimeByIndex(index : Int) : Long {
if(index > hysteresisThreshold - 1){
throw IllegalArgumentException("index cannot be bigger than hysteresisThreshold")
}
val bufferIndex : Int = firstIndex + index % hysteresisThreshold
return buffer[bufferIndex]
} }
private fun checkScrapeDidNotHappenInTime() : Boolean { private fun checkScrapeDidNotHappenInTime() : Boolean {
return lastTimeMutex.getLastTime() < System.currentTimeMillis() - 3 * config.scrape_interval return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs
} }
private fun checkScrapeDidNotHappenHysteresis() : Boolean { private fun checkScrapeDidNotHappenHysteresis() : Boolean {
return false //TODO implement this with ring buffer in lastTimeMutex val scrapeOccuredAfterThis : Long = System.currentTimeMillis() - 5 * scrapeIntervalMs
for (i in 0 until hysteresisThreshold) {
if (getTimeByIndex(i) < scrapeOccuredAfterThis){
return true
}
}
return false
} }
} }
@ -58,8 +78,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
// TODO last time into it's own object with boolean functions // TODO last time into it's own object with boolean functions
// private val lastTimeMutex = LastTimeMutex() // private val lastTimeMutex = LastTimeMutex()
private var alreadyStoredSampleLength : Int = 0 private var alreadyStoredSampleLength : Int = 0
private val storage : RemoteWriteSenderStorage = RemoteWriteSenderMemoryStorage() private val storage : RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage()
private val scrapesAreBeingSentMutex = Mutex()
private fun getRequestBody(): ByteArray { private fun getRequestBody(): ByteArray {
val label1: Label = Label.newBuilder() val label1: Label = Label.newBuilder()