Fix metrics not being properly collected for RW

This commit is contained in:
Martin Ptáček
2023-06-15 07:42:22 +02:00
parent 766297a741
commit de8a00bda5
3 changed files with 68 additions and 69 deletions

View File

@ -19,9 +19,9 @@ class ExponentialBackoff {
onException: () -> Unit, infinite: Boolean = true
) {
var successfull: Boolean = false
var successfull = false
var currentDelay = initialDelay
var currentDelay: Double
var currentExpIndex = -1
while (!successfull) {

View File

@ -86,11 +86,13 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private suspend fun performScrapeAndSaveIt(channel: Channel<Unit>) {
Log.d(TAG, "performScrapeAndSaveIt start")
val scrapedMetrics = config.collectorRegistry.metricFamilySamples()
val metricsScrape : MetricsScrape = MetricsScrape.fromMfs(scrapedMetrics)
storage.writeScrapedSample(metricsScrape)
channel.send(Unit)
Log.d(TAG, "performScrapeAndSaveIt end")
}

View File

@ -18,22 +18,60 @@ private const val TAG: String = "REMOTE_WRITE_SENDER_STORAGE"
// No need for locks as all operations are run on a single thread, defined in PromWorker
// This class defines contract for RemoteWriteSender storage
// the same structure as MetricFamilySamples
// data classes, the same structure as MetricFamilySamples
data class MetricsScrape(
val timeSeriesList : List<TimeSeries>
val timeSeriesList : List<StorageTimeSeries>
){
companion object {
fun fromMfs(input : Enumeration<MetricFamilySamples>) : MetricsScrape{
fun fromMfs(input : Enumeration<MetricFamilySamples>) : MetricsScrape {
val timeSeriesList : MutableList<StorageTimeSeries> = mutableListOf()
for (family in input){
for (sample in family.samples){
val labels : MutableList<TimeSeriesLabel> = mutableListOf()
// name label
val sampleName : String = sample.name
val sampleNameLabel = TimeSeriesLabel(
name = "__name__",
value = sampleName
)
labels.add(sampleNameLabel)
// labels are stored in parallel lists -> iterate over two lists at a time
val labelNamesIterator = sample.labelNames.iterator()
val labelValuesIterator = sample.labelValues.iterator()
while (labelNamesIterator.hasNext() && labelValuesIterator.hasNext()) {
val labelName: String = labelNamesIterator.next()
val labelValue: String = labelValuesIterator.next()
val label = TimeSeriesLabel(
name = labelName,
value = labelValue,
)
labels.add(label)
}
val timeSeries = StorageTimeSeries(
labels = labels.toList(),
sample = TimeSeriesSample(
value = sample.value,
timeStampMs = sample.timestampMs ?: System.currentTimeMillis(),
)
)
timeSeriesList.add(timeSeries)
}
}
return MetricsScrape(
timeSeriesList = timeSeriesList
)
}
}
}
data class TimeSeries(
data class StorageTimeSeries(
val sample : TimeSeriesSample,
val labels : List<TimeSeriesLabel>,
)
@ -65,7 +103,7 @@ data class TimeSeriesSample(
// HashMap<List of labels including name, List of TimeSeries samples to this TimeSeries>
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
//TODO rewrite these classes
abstract class RemoteWriteSenderStorage {
private val remoteWriteLabel: TimeSeriesLabel = TimeSeriesLabel(
name = "backfill",
@ -76,65 +114,6 @@ abstract class RemoteWriteSenderStorage {
return Snappy.compress(data)
}
private fun processLabels(
sample: MetricFamilySamples.Sample,
sampleName: String
): List<TimeSeriesLabel> {
val result: MutableList<TimeSeriesLabel> = mutableListOf()
// labels are stored in parallel lists -> iterate over two lists at a time
val sampleLabelNamesIterator = sample.labelNames.iterator()
val sampleLabelValuesIterator = sample.labelNames.iterator()
while (sampleLabelNamesIterator.hasNext() && sampleLabelValuesIterator.hasNext()) {
val labelName: String = sampleLabelNamesIterator.next()
val labelValue: String = sampleLabelValuesIterator.next()
val label = TimeSeriesLabel(
name = labelName,
value = labelValue,
)
result.add(label)
}
// add name and remoteWrite mark
val nameLabel = TimeSeriesLabel(name = "__name__", value = sampleName)
result.add(nameLabel)
result.add(remoteWriteLabel)
return result.toList()
}
private fun getTimeSeriesSample(sample: MetricFamilySamples.Sample): TimeSeriesSample {
val timestampMs: Long = sample.timestampMs ?: System.currentTimeMillis()
return TimeSeriesSample(
value = sample.value,
timeStampMs = timestampMs,
)
}
private fun processTimeSeries(
hashMap: ConverterHashMap, familySample: MetricFamilySamples
) {
for (sample in familySample.samples) {
val sampleName: String = sample.name
val labels: List<TimeSeriesLabel> = processLabels(sample, sampleName)
val timeSeriesSample: TimeSeriesSample = getTimeSeriesSample(sample)
if (hashMap[labels] == null) {
// this time series does not yet exist
hashMap[labels] = mutableListOf(timeSeriesSample)
} else {
// this time series already exists
hashMap[labels]!!.add(timeSeriesSample)
}
}
}
private fun hashMapEntryToProtobufTimeSeries(
labels: List<TimeSeriesLabel>, samples: MutableList<TimeSeriesSample>
): TimeSeries {
@ -163,6 +142,24 @@ abstract class RemoteWriteSenderStorage {
return writeRequestBuilder.build()
}
private fun processStorageTimeSeries(hashMap: ConverterHashMap, timeSeries: StorageTimeSeries){
// add remote write label to labels
// this label ensures timeseries uniqueness among those scraped by pushprox or promserver
// and those scraped by Remote Write
val labels: MutableList<TimeSeriesLabel> = timeSeries.labels.toMutableList()
labels.add(remoteWriteLabel)
val immutableLabels : List<TimeSeriesLabel> = labels.toList()
if (hashMap[immutableLabels] == null) {
// this time series does not yet exist
hashMap[immutableLabels] = mutableListOf(timeSeries.sample)
} else {
// this time series already exists
hashMap[immutableLabels]!!.add(timeSeries.sample)
}
}
protected fun metricsScrapeListToProtobuf(input: List<MetricsScrape>): WriteRequest {
if (input.isEmpty()) {
throw Exception("Input is empty!")
@ -171,8 +168,8 @@ abstract class RemoteWriteSenderStorage {
val hashmap: ConverterHashMap = HashMap()
for (metricsScrape in input) {
for (familySample in metricsScrape) {
processTimeSeries(hashmap, familySample)
for (timeSeries in metricsScrape.timeSeriesList){
processStorageTimeSeries(hashmap, timeSeries)
}
}