remote sender storage simple memory done

This commit is contained in:
Martin Ptáček
2023-06-12 20:56:02 +02:00
parent 1bfa8779e2
commit 0bef5d006a
2 changed files with 212 additions and 61 deletions

View File

@ -33,6 +33,10 @@ private enum class RemoteWriteSenderState {
private class LastTimeRingBuffer { private class LastTimeRingBuffer {
//TODO implement this with ring array //TODO implement this with ring array
fun setLastTime(timestamp : Long) {
//TODO implement this
}
private fun checkScrapeDidNotHappenInTime() : Boolean { private fun checkScrapeDidNotHappenInTime() : Boolean {
return lastTimeMutex.getLastTime() < System.currentTimeMillis() - 3 * config.scrape_interval return lastTimeMutex.getLastTime() < System.currentTimeMillis() - 3 * config.scrape_interval
} }
@ -100,30 +104,12 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
return request.toByteArray() return request.toByteArray()
} }
private fun performScrape() : MetricsScrape { private fun performScrapeAndSaveIt() {
val scrapedMetrics = config.collectorRegistry.metricFamilySamples()
storage.writeScrapedSample(scrapedMetrics)
for ( item in config.collectorRegistry.metricFamilySamples() ){
val name : String = item.name
for (sample in item.samples){
val timestamp : Long = sample.timestampMs
val labelValueIterator : Iterator<String> = sample.labelValues.iterator()
for(labelName in sample.labelNames){
val protobufLabel : Label = Label.newBuilder()
.setName(labelName)
.setValue(labelValueIterator.next())
.build()
} }
val sampleValue : Double = sample.value //TODO channel je bad
}
}
}
//TODO channel je k hovnu
//TODO v remotewriteseender storage musi byt mutex //TODO v remotewriteseender storage musi byt mutex
@ -132,11 +118,11 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
while (true){ while (true){
if (checkScrapeDidNotHappenInTime()){ if (checkScrapeDidNotHappenInTime()){
delay(config.scrape_interval * 1000L) delay(config.scrape_interval * 1000L)
storage.writeScrapedSample(performScrape())
while(checkScrapeDidNotHappenHysteresis()){ while(checkScrapeDidNotHappenHysteresis()){
delay(config.scrape_interval * 1000L) delay(config.scrape_interval * 1000L)
storage.writeScrapedSample(performScrape()) performScrapeAndSaveIt
} }
} }
delay(checkDelay) delay(checkDelay)
@ -201,10 +187,6 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
lastTimeMutex.setLastTime(System.currentTimeMillis()) lastTimeMutex.setLastTime(System.currentTimeMillis())
} }
private fun encodeWithSnappy(data: ByteArray): ByteArray {
return Snappy.compress(data)
}
private suspend fun sendTestRequest() { private suspend fun sendTestRequest() {
Log.v(TAG, "sending to prometheus now") Log.v(TAG, "sending to prometheus now")
val client = HttpClient() val client = HttpClient()

View File

@ -1,52 +1,221 @@
package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker package com.birdthedeveloper.prometheus.android.prometheus.android.exporter.worker
import java.sql.Timestamp import android.util.Log
import com.google.protobuf.value
import io.prometheus.client.Collector
import io.prometheus.client.Collector.MetricFamilySamples
import org.xerial.snappy.Snappy
import remote.write.RemoteWrite.Label
import remote.write.RemoteWrite.Sample
import remote.write.RemoteWrite.TimeSeries
import remote.write.RemoteWrite.WriteRequest
import java.util.Enumeration
import java.util.LinkedList import java.util.LinkedList
import java.util.Queue import java.util.Queue
//TODO toto je na houby cele, musi byt structured misto byte array private const val TAG : String = "REMOTE_WRITE_SENDER_STORAGE"
class MetricsScrape(
val compressedMetrics : ByteArray,
val timestamp: Long,
)
// This is a very primitive implementation, may require some optimization later
//
// No need for locks as all operations are run on a single thread, defined in PromWorker // No need for locks as all operations are run on a single thread, defined in PromWorker
// This class defines contract for RemoteWriteSender storage // This class defines contract for RemoteWriteSender storage
typealias MetricsScrape = Enumeration<Collector.MetricFamilySamples>
// HashMap<List of labels including name, List of TimeSeries samples to this TimeSeries>
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
private data class TimeSeriesLabel(
val name : String,
val value : String,
){
fun toProtobufLabel() : Label{
return Label.newBuilder()
.setName(this.name)
.setValue(this.value)
.build()
}
}
private data class TimeSeriesSample(
val timeStampMs : Long,
val value : Double,
){
fun toProtobufSample() : Sample{
return Sample.newBuilder()
.setTimestamp(this.timeStampMs)
.setValue(this.value)
.build()
}
}
abstract class RemoteWriteSenderStorage { abstract class RemoteWriteSenderStorage {
private val remoteWriteLabel : TimeSeriesLabel = TimeSeriesLabel(
name = "backfill",
value = "true",
)
protected fun encodeWithSnappy(data: ByteArray): ByteArray {
return Snappy.compress(data)
}
private fun processLabels(sample : Collector.MetricFamilySamples.Sample,
familySampleName: String) : List<TimeSeriesLabel>{
val result : MutableList<TimeSeriesLabel> = mutableListOf()
// labels are stored in parallel lists -> iterate over two lists at a time
val sampleLabelNamesIterator = sample.labelNames.iterator()
val sampleLabelValuesIterator = sample.labelNames.iterator()
while (sampleLabelNamesIterator.hasNext() && sampleLabelValuesIterator.hasNext()) {
val labelName : String = sampleLabelNamesIterator.next()
val labelValue : String = sampleLabelValuesIterator.next()
val label : TimeSeriesLabel = TimeSeriesLabel(
name = labelName,
value = labelValue,
)
result.add(label)
}
// add name and remoteWrite mark
val nameLabel = TimeSeriesLabel(name = "__name__", value = familySampleName)
result.add(nameLabel)
result.add(remoteWriteLabel)
return result.toList()
}
private fun getTimeSeriesSample(sample : Collector.MetricFamilySamples.Sample) : TimeSeriesSample{
return TimeSeriesSample(
value = sample.value,
timeStampMs = sample.timestampMs,
)
}
private fun processTimeSeries(
hashMap: ConverterHashMap, familySample : Collector.MetricFamilySamples){
val familySampleName : String = familySample.name
Log.v(TAG, "FamilySampleName: $familySampleName")
for ( sample in familySample.samples ) {
val labels : List<TimeSeriesLabel> = processLabels(sample, familySampleName)
// TODO this may be useful in the future
val sampleName : String = sample.name
Log.v(TAG, "sampleName: $sampleName")
val timeSeriesSample : TimeSeriesSample = getTimeSeriesSample(sample)
if (hashMap[labels] == null){
// this time series does not yet exist
hashMap[labels] = mutableListOf(timeSeriesSample)
}else{
// this time series already exists
hashMap[labels]!!.add(timeSeriesSample)
}
}
}
private fun hashMapEntryToProtobufTimeSeries(
labels : List<TimeSeriesLabel>, samples : MutableList<TimeSeriesSample>) : TimeSeries {
val timeSeriesBuilder : TimeSeries.Builder = TimeSeries.newBuilder()
timeSeriesBuilder.addAllLabels(labels.map{
it.toProtobufLabel()
})
timeSeriesBuilder.addAllSamples(samples.map{
it.toProtobufSample()
})
return timeSeriesBuilder.build()
}
private fun hashmapToProtobufWriteRequest(hashMap: ConverterHashMap) : WriteRequest{
val writeRequestBuilder : WriteRequest.Builder = WriteRequest.newBuilder()
for (entry in hashMap){
val timeSeries = hashMapEntryToProtobufTimeSeries(entry.key, entry.value)
writeRequestBuilder.addTimeseries(timeSeries)
}
return writeRequestBuilder.build()
}
protected fun metricsScrapeListToProtobuf(input: List<MetricsScrape>) : WriteRequest {
if(input.isEmpty()){
throw Exception("Input is empty!")
}
val hashmap : ConverterHashMap = HashMap()
for ( metricsScrape in input ){
for ( familySample in metricsScrape ) {
processTimeSeries(hashmap, familySample)
}
}
return hashmapToProtobufWriteRequest(hashmap)
}
abstract fun writeScrapedSample(metricsScrape: MetricsScrape) abstract fun writeScrapedSample(metricsScrape: MetricsScrape)
abstract fun getNumberOfScrapedSamples(number: Int): List<MetricsScrape> abstract fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray
abstract fun removeNumberOfScrapedSamples(number: Int) abstract fun removeNumberOfScrapedSamples(number: Int)
abstract fun isEmpty(): Boolean abstract fun isEmpty(): Boolean
abstract fun getLength(): Int abstract fun getLength(): Int
} }
class RemoteWriteSenderMemoryStorage : RemoteWriteSenderStorage() { class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
// writeRequests are stored in protobuf already compressed private val data : Queue<MetricsScrape> = LinkedList()
private val data : Queue<MetricsScrape> = LinkedList<MetricsScrape>()
override fun getNumberOfScrapedSamples(number: Int): List<MetricsScrape> { override fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray {
TODO("Not yet implemented") if (howMany < 1){
} throw IllegalArgumentException("howMany must be bigger than zero")
}
override fun removeNumberOfScrapedSamples(number: Int) {
TODO("Not yet implemented") val scrapedMetrics : MutableList<MetricsScrape> = mutableListOf()
} for (i in 1..howMany){
val oneMetric : MetricsScrape? = data.poll()
override fun writeScrapedSample(metricsScrape: MetricsScrape) { if(oneMetric == null){
TODO("Not yet implemented") break
} }else{
scrapedMetrics.add(oneMetric)
override fun isEmpty(): Boolean { }
TODO("Not yet implemented") }
}
val writeRequest : WriteRequest = this.metricsScrapeListToProtobuf(scrapedMetrics.toList())
override fun getLength(): Int { val bytes : ByteArray = writeRequest.toByteArray()
TODO("Not yet implemented") return this.encodeWithSnappy(bytes)
} }
}
override fun removeNumberOfScrapedSamples(number: Int) {
class RemoteWriteSenterDatabaseStorage : RemoteWriteSenderStorage() { if (number > 0){
override fun getNumberOfScrapedSamples(number: Int): List<MetricsScrape> { for (i in 1..number){
data.remove()
}
}else{
throw IllegalArgumentException("number must by higher than 0")
}
}
override fun writeScrapedSample(metricsScrape: MetricsScrape) {
data.add(metricsScrape)
}
override fun isEmpty(): Boolean {
return data.isEmpty()
}
override fun getLength(): Int {
return data.count()
}
}
class RemoteWriteSenderDatabaseStorage : RemoteWriteSenderStorage() {
override fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray {
TODO("Not yet implemented") TODO("Not yet implemented")
} }