fix exception no such element

This commit is contained in:
Martin Ptáček
2023-06-15 16:56:39 +02:00
parent 143c244af1
commit b9645a9c0a
3 changed files with 64 additions and 58 deletions

View File

@ -16,7 +16,7 @@ class ExponentialBackoff {
suspend fun runWithBackoff(
function: suspend () -> Unit,
onException: () -> Unit,
onException: (Exception) -> Unit,
debugLabel: String,
infinite: Boolean = true,
) {
@ -40,10 +40,7 @@ class ExponentialBackoff {
}
}
Log.d(TAG, "Debug label: $debugLabel Exception caught")
Log.d(TAG, "Exception: $e")
onException()
onException(e)
// calculate new delay
currentExpIndex++
@ -55,7 +52,7 @@ class ExponentialBackoff {
break
}
Log.d(TAG, "Label :$debugLabel, Backoff with delay: $currentDelay seconds")
Log.d(TAG, "$debugLabel: backoff with delay: $currentDelay seconds")
delay(currentDelay.toLong() * 1000)
}

View File

@ -4,7 +4,6 @@ import android.content.Context
import android.net.ConnectivityManager
import android.net.NetworkCapabilities
import android.util.Log
import androidx.work.impl.utils.getActiveNetworkCompat
import io.ktor.client.HttpClient
import io.ktor.client.request.header
import io.ktor.client.request.headers
@ -12,7 +11,6 @@ import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpStatusCode
import io.prometheus.client.Collector.MetricFamilySamples
import io.prometheus.client.CollectorRegistry
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.BufferOverflow
@ -20,10 +18,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.lang.IndexOutOfBoundsException
import kotlin.math.abs
private const val TAG: String = "REMOTE_WRITE_SENDER"
@ -31,41 +27,45 @@ private const val TAG: String = "REMOTE_WRITE_SENDER"
// for purposes of scraping metrics on device and back-filling them later using remote write
//
// Only timestamps of successful scrapes are stored
internal class LastTimeRingBuffer(private val scrapeIntervalMs: Int) {
private val buffer: Array<Long> = Array(hysteresisThreshold) { 0 }
internal class LastTimeRingBuffer(private val scrapeInterval: Int) {
private val buffer: Array<Long> = Array(hysteresisMemory) { 0 }
private var firstIndex: Int = -1
companion object {
private const val hysteresisThreshold: Int = 3
private const val hysteresisMemory: Int = 3
private const val hysteresisCoefficient : Double = 1.2
private const val scrapeTimeCoefficient : Double = 2.2
}
fun setLastTime(timestamp: Long) {
firstIndex = (++firstIndex) % hysteresisThreshold
firstIndex = (++firstIndex) % hysteresisMemory
buffer[firstIndex] = timestamp
System.out.println("${buffer[0]} ${buffer[1]} ${buffer[2]}")
System.out.flush()
}
fun getTimeByIndex(index: Int): Long {
if (index > hysteresisThreshold - 1) {
if (index > hysteresisMemory - 1) {
throw IndexOutOfBoundsException("index cannot be bigger than hysteresisThreshold")
}
val bufferIndex: Int = (firstIndex - index)
return if (bufferIndex < 0){
buffer[hysteresisThreshold + bufferIndex]
buffer[hysteresisMemory + bufferIndex]
}else{
buffer[bufferIndex]
}
}
fun checkScrapeDidNotHappenInTime(): Boolean {
return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs
val now : Long = System.currentTimeMillis()
return getTimeByIndex(0) < now - scrapeTimeCoefficient * scrapeInterval * 1000
}
fun checkScrapeDidNotHappenHysteresis(): Boolean {
val scrapeOccurredAfterThis: Long = System.currentTimeMillis() - 5 * scrapeIntervalMs
for (i in 0 until hysteresisThreshold) {
val diff = (hysteresisMemory * hysteresisCoefficient) * (scrapeInterval * 1000).toDouble()
val scrapeOccurredAfterThis: Long = System.currentTimeMillis() - diff.toLong()
// if any recorded time is lower: return true
for (i in 0 until hysteresisMemory) {
if (getTimeByIndex(i) < scrapeOccurredAfterThis) {
return true
}
@ -84,7 +84,7 @@ data class RemoteWriteConfiguration(
)
class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrapeInterval * 1000)
private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrapeInterval)
private val storage: RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage()
private var scrapesAreBeingSent: Boolean = false
private lateinit var client: HttpClient
@ -118,10 +118,13 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
Log.d(TAG, "Turning remote write on")
performScrapeAndSaveIt(channel)
delay(config.scrapeInterval * 1000L)
while (lastTimeRingBuffer.checkScrapeDidNotHappenHysteresis()) {
delay(config.scrapeInterval * 1000L)
Log.d(TAG, "Hysteresis loop start")
performScrapeAndSaveIt(channel)
delay(config.scrapeInterval * 1000L)
Log.d(TAG, "Hysteresis loop end")
}
Log.d(TAG, "Turning remote write off")
@ -170,7 +173,9 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
Log.d(TAG, "Exponential backoff to export remote write started")
ExponentialBackoff.runWithBackoff({
sendRequestToRemoteWrite(body, config.maxSamplesPerExport)
}, {}, "Remote Write", false)
}, {
Log.d(TAG, "exportToRemoteWriteEndpointException, ${it.message}, ${it}, ${it.stackTraceToString()}")
}, "Remote Write", false)
Log.d(TAG, "Exponential backoff to export remote write finish")
}
lastTimeRemoteWriteSent = System.currentTimeMillis()

View File

@ -28,39 +28,41 @@ data class MetricsScrape(
for (family in input){
for (sample in family.samples){
val labels : MutableList<TimeSeriesLabel> = mutableListOf()
if (sample != null){
val labels : MutableList<TimeSeriesLabel> = mutableListOf()
// name label
val sampleName : String = sample.name
val sampleNameLabel = TimeSeriesLabel(
name = "__name__",
value = sampleName
)
labels.add(sampleNameLabel)
// labels are stored in parallel lists -> iterate over two lists at a time
val labelNamesIterator = sample.labelNames.iterator()
val labelValuesIterator = sample.labelValues.iterator()
while (labelNamesIterator.hasNext() && labelValuesIterator.hasNext()) {
val labelName: String = labelNamesIterator.next()
val labelValue: String = labelValuesIterator.next()
val label = TimeSeriesLabel(
name = labelName,
value = labelValue,
// name label
val sampleName : String = sample.name
val sampleNameLabel = TimeSeriesLabel(
name = "__name__",
value = sampleName
)
labels.add(label)
labels.add(sampleNameLabel)
// labels are stored in parallel lists -> iterate over two lists at a time
val labelNamesIterator = sample.labelNames.iterator()
val labelValuesIterator = sample.labelValues.iterator()
while (labelNamesIterator.hasNext() && labelValuesIterator.hasNext()) {
val labelName: String = labelNamesIterator.next()
val labelValue: String = labelValuesIterator.next()
val label = TimeSeriesLabel(
name = labelName,
value = labelValue,
)
labels.add(label)
}
val timeSeries = StorageTimeSeries(
labels = labels.toList(),
sample = TimeSeriesSample(
value = sample.value,
timeStampMs = System.currentTimeMillis(),
)
)
timeSeriesList.add(timeSeries)
}
val timeSeries = StorageTimeSeries(
labels = labels.toList(),
sample = TimeSeriesSample(
value = sample.value,
timeStampMs = sample.timestampMs ?: System.currentTimeMillis(),
)
)
timeSeriesList.add(timeSeries)
}
}
@ -103,7 +105,6 @@ data class TimeSeriesSample(
// HashMap<List of labels including name, List of TimeSeries samples to this TimeSeries>
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
//TODO rewrite these classes
abstract class RemoteWriteSenderStorage {
private val remoteWriteLabel: TimeSeriesLabel = TimeSeriesLabel(
name = "backfill",
@ -189,7 +190,6 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
private val data: Queue<MetricsScrape> = LinkedList()
override fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray {
Log.d(TAG, "Getting scraped samples: $howMany samples")
if (howMany < 1) {
throw IllegalArgumentException("howMany must be bigger than zero")
}
@ -203,6 +203,7 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
scrapedMetrics.add(oneMetric)
}
}
Log.d(TAG, "Getting scraped samples: ${scrapedMetrics.size} samples")
val writeRequest: WriteRequest = this.metricsScrapeListToProtobuf(scrapedMetrics.toList())
val bytes: ByteArray = writeRequest.toByteArray()
@ -211,10 +212,13 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
//TODO use this thing
override fun removeNumberOfScrapedSamples(number: Int) {
Log.d(TAG, "Removing number of scraped samples: $number samples")
if (number > 0) {
for (i in 1..number) {
data.remove()
if(data.isEmpty()){
break;
}else{
data.remove()
}
}
} else {
throw IllegalArgumentException("number must by higher than 0")