code formatting

This commit is contained in:
Martin Ptáček
2023-06-13 23:39:10 +02:00
parent de217e9503
commit 91e16d5c43
8 changed files with 159 additions and 147 deletions

View File

@@ -14,8 +14,8 @@ private const val TAG: String = "CONFIGURATION"
//TODO register within prometheus foundation //TODO register within prometheus foundation
private const val defaultPrometheusServerPort: Int = 10101 private const val defaultPrometheusServerPort: Int = 10101
private const val defaultRemoteWriteScrapeInterval: Int = 30 // seconds private const val defaultRemoteWriteScrapeInterval: Int = 30 // seconds
private const val defaultRemoteWriteMaxSamplesPerExport : Int = 60 // seconds private const val defaultRemoteWriteMaxSamplesPerExport: Int = 60 // seconds
private const val defaultRemoteWriteExportInterval : Int = 120 // seconds private const val defaultRemoteWriteExportInterval: Int = 120 // seconds
// serialization classes for parsing YAML configuration file // serialization classes for parsing YAML configuration file
@Serializable @Serializable
@@ -27,15 +27,19 @@ data class PromConfigFile(
fun toPromConfiguration(): PromConfiguration { fun toPromConfiguration(): PromConfiguration {
return PromConfiguration( return PromConfiguration(
pushproxProxyUrl = this.pushprox?.proxy_url ?: "", pushproxProxyUrl = this.pushprox?.proxy_url ?: "",
remoteWriteScrapeInterval = (this.remote_write?.scrape_interval ?: defaultRemoteWriteScrapeInterval).toString(), remoteWriteScrapeInterval = (this.remote_write?.scrape_interval
?: defaultRemoteWriteScrapeInterval).toString(),
pushproxEnabled = this.pushprox?.enabled ?: false, pushproxEnabled = this.pushprox?.enabled ?: false,
pushproxFqdn = this.pushprox?.fqdn ?: "", pushproxFqdn = this.pushprox?.fqdn ?: "",
remoteWriteEnabled = this.remote_write?.enabled ?: false, remoteWriteEnabled = this.remote_write?.enabled ?: false,
remoteWriteEndpoint = this.remote_write?.remote_write_endpoint ?: "", remoteWriteEndpoint = this.remote_write?.remote_write_endpoint ?: "",
prometheusServerEnabled = this.prometheus_server?.enabled ?: true, prometheusServerEnabled = this.prometheus_server?.enabled ?: true,
prometheusServerPort = (this.prometheus_server?.port ?: defaultPrometheusServerPort).toString(), prometheusServerPort = (this.prometheus_server?.port
remoteWriteMaxSamplesPerExport = (this.remote_write?.max_samples_per_export ?: defaultRemoteWriteMaxSamplesPerExport).toString(), ?: defaultPrometheusServerPort).toString(),
remoteWriteExportInterval = (this.remote_write?.export_interval ?: defaultRemoteWriteExportInterval).toString(), remoteWriteMaxSamplesPerExport = (this.remote_write?.max_samples_per_export
?: defaultRemoteWriteMaxSamplesPerExport).toString(),
remoteWriteExportInterval = (this.remote_write?.export_interval
?: defaultRemoteWriteExportInterval).toString(),
) )
} }
} }
@@ -74,8 +78,8 @@ data class PromConfiguration(
val remoteWriteEnabled: Boolean = false, val remoteWriteEnabled: Boolean = false,
val remoteWriteScrapeInterval: String = defaultRemoteWriteScrapeInterval.toString(), val remoteWriteScrapeInterval: String = defaultRemoteWriteScrapeInterval.toString(),
val remoteWriteEndpoint: String = "", val remoteWriteEndpoint: String = "",
val remoteWriteExportInterval : String = defaultRemoteWriteExportInterval.toString(), val remoteWriteExportInterval: String = defaultRemoteWriteExportInterval.toString(),
val remoteWriteMaxSamplesPerExport : String = defaultRemoteWriteMaxSamplesPerExport.toString(), val remoteWriteMaxSamplesPerExport: String = defaultRemoteWriteMaxSamplesPerExport.toString(),
) { ) {
fun toStructuredText(): String { fun toStructuredText(): String {

View File

@@ -27,11 +27,8 @@ import androidx.compose.material.TopAppBar
import androidx.compose.material.icons.Icons import androidx.compose.material.icons.Icons
import androidx.compose.material.icons.outlined.Settings import androidx.compose.material.icons.outlined.Settings
import androidx.compose.runtime.Composable import androidx.compose.runtime.Composable
import androidx.compose.runtime.MutableState
import androidx.compose.runtime.collectAsState import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.ui.Alignment import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color import androidx.compose.ui.graphics.Color
@@ -301,7 +298,7 @@ private fun RemoteWritePage(
horizontalAlignment = Alignment.CenterHorizontally, horizontalAlignment = Alignment.CenterHorizontally,
) { ) {
Text("Remote write configuration:") Text("Remote write configuration:")
Spacer(modifier = Modifier.padding(bottom = 12.dp)) Spacer(modifier = Modifier.padding(bottom = 12.dp))
TextField( TextField(

View File

@@ -65,7 +65,7 @@ class PromViewModel : ViewModel() {
private lateinit var getContext: () -> Context private lateinit var getContext: () -> Context
private var workerLiveData: LiveData<List<WorkInfo>>? = null private var workerLiveData: LiveData<List<WorkInfo>>? = null
private val workerLiveDataObserver : Observer<List<WorkInfo>> = Observer { private val workerLiveDataObserver: Observer<List<WorkInfo>> = Observer {
if (it.isEmpty()) { if (it.isEmpty()) {
updateExporterStateWith(ExporterState.NotRunning) updateExporterStateWith(ExporterState.NotRunning)
} else { } else {
@@ -83,6 +83,7 @@ class PromViewModel : ViewModel() {
companion object { companion object {
private const val PROM_UNIQUE_WORK: String = "prom_unique_job" private const val PROM_UNIQUE_WORK: String = "prom_unique_job"
} }
private fun loadConfigurationFile() { private fun loadConfigurationFile() {
Log.v(TAG, "Checking for configuration file") Log.v(TAG, "Checking for configuration file")
@@ -139,18 +140,18 @@ class PromViewModel : ViewModel() {
} }
} }
override fun onCleared(){ override fun onCleared() {
super.onCleared() super.onCleared()
workerLiveData?.removeObserver(workerLiveDataObserver) workerLiveData?.removeObserver(workerLiveDataObserver)
} }
private fun updateExporterStateWith(exporterState: ExporterState){ private fun updateExporterStateWith(exporterState: ExporterState) {
_uiState.update { _uiState.update {
it.copy(exporterState = exporterState) it.copy(exporterState = exporterState)
} }
} }
private fun startMonitoringWorker(){ private fun startMonitoringWorker() {
val workManagerInstance = WorkManager.getInstance(getContext()) val workManagerInstance = WorkManager.getInstance(getContext())
workerLiveData = workManagerInstance.getWorkInfosLiveData( workerLiveData = workManagerInstance.getWorkInfosLiveData(
WorkQuery.fromUniqueWorkNames( WorkQuery.fromUniqueWorkNames(
@@ -185,15 +186,15 @@ class PromViewModel : ViewModel() {
return false return false
} }
private fun somePushProxVariableUnset(config : PromConfiguration) : Boolean { private fun somePushProxVariableUnset(config: PromConfiguration): Boolean {
return config.pushproxFqdn.isBlank() || config.pushproxProxyUrl.isBlank() return config.pushproxFqdn.isBlank() || config.pushproxProxyUrl.isBlank()
} }
private fun somePrometheusServerVariableUnset(config : PromConfiguration) : Boolean { private fun somePrometheusServerVariableUnset(config: PromConfiguration): Boolean {
return config.prometheusServerPort.isBlank() return config.prometheusServerPort.isBlank()
} }
private fun someRemoteWriteVariableUnset(config : PromConfiguration) : Boolean { private fun someRemoteWriteVariableUnset(config: PromConfiguration): Boolean {
return config.remoteWriteEndpoint.isBlank() return config.remoteWriteEndpoint.isBlank()
|| config.remoteWriteScrapeInterval.isBlank() || config.remoteWriteScrapeInterval.isBlank()
|| config.remoteWriteExportInterval.isBlank() || config.remoteWriteExportInterval.isBlank()
@@ -209,22 +210,22 @@ class PromViewModel : ViewModel() {
} }
// check for empty configuration // check for empty configuration
if(config.pushproxEnabled && somePushProxVariableUnset(config)){ if (config.pushproxEnabled && somePushProxVariableUnset(config)) {
return displayConfigValidationDialog("Please set all PushProx configuration settings!") return displayConfigValidationDialog("Please set all PushProx configuration settings!")
} }
if(config.prometheusServerEnabled && somePrometheusServerVariableUnset(config)){ if (config.prometheusServerEnabled && somePrometheusServerVariableUnset(config)) {
return displayConfigValidationDialog("Set all Prometheus Server config settings!") return displayConfigValidationDialog("Set all Prometheus Server config settings!")
} }
if(config.remoteWriteEnabled && someRemoteWriteVariableUnset(config)){ if (config.remoteWriteEnabled && someRemoteWriteVariableUnset(config)) {
return displayConfigValidationDialog("Set all Remote Write configuration settings!") return displayConfigValidationDialog("Set all Remote Write configuration settings!")
} }
// validate settings for remote write // validate settings for remote write
if(config.remoteWriteEnabled){ if (config.remoteWriteEnabled) {
// check scrape interval boundaries // check scrape interval boundaries
val minScrapeInterval = 1 val minScrapeInterval = 1
val maxScrapeInterval = 3600 / 4 val maxScrapeInterval = 3600 / 4
val scrapeInterval : Int = config.remoteWriteScrapeInterval.toIntOrNull() val scrapeInterval: Int = config.remoteWriteScrapeInterval.toIntOrNull()
?: return displayConfigValidationDialog("Scrape interval must be a number!") ?: return displayConfigValidationDialog("Scrape interval must be a number!")
if (scrapeInterval > maxScrapeInterval || scrapeInterval < minScrapeInterval) { if (scrapeInterval > maxScrapeInterval || scrapeInterval < minScrapeInterval) {
@@ -236,9 +237,9 @@ class PromViewModel : ViewModel() {
?: return displayConfigValidationDialog("Max Samples Per Export must be a number!") ?: return displayConfigValidationDialog("Max Samples Per Export must be a number!")
// check export interval // check export interval
val exportInterval : Int = config.remoteWriteExportInterval.toIntOrNull() val exportInterval: Int = config.remoteWriteExportInterval.toIntOrNull()
?: return displayConfigValidationDialog("Export interval must be a number!") ?: return displayConfigValidationDialog("Export interval must be a number!")
if (scrapeInterval > exportInterval){ if (scrapeInterval > exportInterval) {
return displayConfigValidationDialog( return displayConfigValidationDialog(
"Scrape interval must be smaller than Export interval!" "Scrape interval must be smaller than Export interval!"
) )
@@ -247,11 +248,11 @@ class PromViewModel : ViewModel() {
} }
// validate settings for prometheus server // validate settings for prometheus server
if(config.prometheusServerEnabled){ if (config.prometheusServerEnabled) {
// check port boundaries // check port boundaries
val minPort = 1024 val minPort = 1024
val maxPort = 65535 val maxPort = 65535
val prometheusServerPort : Int = config.prometheusServerPort.toIntOrNull() val prometheusServerPort: Int = config.prometheusServerPort.toIntOrNull()
?: return displayConfigValidationDialog("Prometheus Server Port must be a number!") ?: return displayConfigValidationDialog("Prometheus Server Port must be a number!")
if (prometheusServerPort < minPort || prometheusServerPort > maxPort) { if (prometheusServerPort < minPort || prometheusServerPort > maxPort) {
return displayConfigValidationDialog("Prometheus exporter port out of bounds!") return displayConfigValidationDialog("Prometheus exporter port out of bounds!")

View File

@@ -11,8 +11,10 @@ class ExponentialBackoff {
private const val initialDelay = 3.0 // seconds private const val initialDelay = 3.0 // seconds
private const val maxDelay = 40.0 // seconds private const val maxDelay = 40.0 // seconds
suspend fun runWithBackoff(function: suspend () -> Unit, suspend fun runWithBackoff(
onException: () -> Unit, infinite : Boolean = true) { function: suspend () -> Unit,
onException: () -> Unit, infinite: Boolean = true
) {
var successfull: Boolean = false var successfull: Boolean = false
@@ -41,7 +43,7 @@ class ExponentialBackoff {
currentDelay = min(currentDelay, maxDelay) currentDelay = min(currentDelay, maxDelay)
// finite vs infinite exponential backoff // finite vs infinite exponential backoff
if(currentDelay == maxDelay && !infinite){ if (currentDelay == maxDelay && !infinite) {
break break
} }

View File

@@ -27,7 +27,8 @@ class PromWorker(
private val collectorRegistry = CollectorRegistry() private val collectorRegistry = CollectorRegistry()
private val metricsEngine: MetricsEngine = MetricsEngine(context) private val metricsEngine: MetricsEngine = MetricsEngine(context)
private val androidCustomExporter : AndroidCustomExporter = AndroidCustomExporter(metricsEngine).register(collectorRegistry) private val androidCustomExporter: AndroidCustomExporter =
AndroidCustomExporter(metricsEngine).register(collectorRegistry)
private lateinit var pushProxClient: PushProxClient private lateinit var pushProxClient: PushProxClient
private var remoteWriteSender: RemoteWriteSender? = null private var remoteWriteSender: RemoteWriteSender? = null
@@ -42,16 +43,16 @@ class PromWorker(
return writer.toString() return writer.toString()
} }
private suspend fun countSuccessfulScrape(){ private suspend fun countSuccessfulScrape() {
remoteWriteSender?.countSuccessfulScrape() remoteWriteSender?.countSuccessfulScrape()
} }
@OptIn(DelicateCoroutinesApi::class) @OptIn(DelicateCoroutinesApi::class)
private suspend fun startServicesInOneThread(config: PromConfiguration){ private suspend fun startServicesInOneThread(config: PromConfiguration) {
val threadContext = newSingleThreadContext("PromWorkerThreadContext") val threadContext = newSingleThreadContext("PromWorkerThreadContext")
coroutineScope { coroutineScope {
withContext(threadContext){ withContext(threadContext) {
if (config.remoteWriteEnabled) { if (config.remoteWriteEnabled) {
remoteWriteSender = RemoteWriteSender( remoteWriteSender = RemoteWriteSender(

View File

@@ -28,34 +28,35 @@ private const val TAG: String = "REMOTE_WRITE_SENDER"
// //
// Only timestamps of succesfull scrapes are stored // Only timestamps of succesfull scrapes are stored
private class LastTimeRingBuffer(private val scrapeIntervalMs: Int) { private class LastTimeRingBuffer(private val scrapeIntervalMs: Int) {
private val buffer : Array<Long> = Array(hysteresisThreshold) { 0 } private val buffer: Array<Long> = Array(hysteresisThreshold) { 0 }
private var firstIndex : Int = 0 private var firstIndex: Int = 0
companion object{
private const val hysteresisThreshold : Int = 3 companion object {
private const val hysteresisThreshold: Int = 3
} }
fun setLastTime(timestamp : Long) { fun setLastTime(timestamp: Long) {
firstIndex = firstIndex++ % hysteresisThreshold firstIndex = firstIndex++ % hysteresisThreshold
buffer[firstIndex] = timestamp buffer[firstIndex] = timestamp
} }
private fun getTimeByIndex(index : Int) : Long { private fun getTimeByIndex(index: Int): Long {
if(index > hysteresisThreshold - 1){ if (index > hysteresisThreshold - 1) {
throw IllegalArgumentException("index cannot be bigger than hysteresisThreshold") throw IllegalArgumentException("index cannot be bigger than hysteresisThreshold")
} }
val bufferIndex : Int = firstIndex + index % hysteresisThreshold val bufferIndex: Int = firstIndex + index % hysteresisThreshold
return buffer[bufferIndex] return buffer[bufferIndex]
} }
fun checkScrapeDidNotHappenInTime() : Boolean { fun checkScrapeDidNotHappenInTime(): Boolean {
return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs
} }
fun checkScrapeDidNotHappenHysteresis() : Boolean { fun checkScrapeDidNotHappenHysteresis(): Boolean {
val scrapeOccurredAfterThis : Long = System.currentTimeMillis() - 5 * scrapeIntervalMs val scrapeOccurredAfterThis: Long = System.currentTimeMillis() - 5 * scrapeIntervalMs
for (i in 0 until hysteresisThreshold) { for (i in 0 until hysteresisThreshold) {
if (getTimeByIndex(i) < scrapeOccurredAfterThis){ if (getTimeByIndex(i) < scrapeOccurredAfterThis) {
return true return true
} }
} }
@@ -69,33 +70,33 @@ data class RemoteWriteConfiguration(
val remoteWriteEndpoint: String, val remoteWriteEndpoint: String,
val collectorRegistry: CollectorRegistry, val collectorRegistry: CollectorRegistry,
val maxSamplesPerExport: Int, val maxSamplesPerExport: Int,
val exportInterval : Int, val exportInterval: Int,
val getContext : () -> Context, val getContext: () -> Context,
) )
class RemoteWriteSender(private val config: RemoteWriteConfiguration) { class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrapeInterval * 1000) private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrapeInterval * 1000)
private val storage : RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage() private val storage: RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage()
private var scrapesAreBeingSent : Boolean = false private var scrapesAreBeingSent: Boolean = false
private lateinit var client : HttpClient private lateinit var client: HttpClient
private var lastTimeRemoteWriteSent : Long = 0 private var lastTimeRemoteWriteSent: Long = 0
private var remoteWriteOn : Boolean = false private var remoteWriteOn: Boolean = false
private suspend fun performScrapeAndSaveIt(channel : Channel<Unit>) { private suspend fun performScrapeAndSaveIt(channel: Channel<Unit>) {
val scrapedMetrics = config.collectorRegistry.metricFamilySamples() val scrapedMetrics = config.collectorRegistry.metricFamilySamples()
storage.writeScrapedSample(scrapedMetrics) storage.writeScrapedSample(scrapedMetrics)
channel.send(Unit) channel.send(Unit)
} }
private suspend fun scraper(channel : Channel<Unit>){ private suspend fun scraper(channel: Channel<Unit>) {
val checkDelay = 1000L val checkDelay = 1000L
while (true){ while (true) {
if (lastTimeRingBuffer.checkScrapeDidNotHappenInTime()){ if (lastTimeRingBuffer.checkScrapeDidNotHappenInTime()) {
remoteWriteOn = true remoteWriteOn = true
performScrapeAndSaveIt(channel) performScrapeAndSaveIt(channel)
delay(config.scrapeInterval * 1000L) delay(config.scrapeInterval * 1000L)
while(lastTimeRingBuffer.checkScrapeDidNotHappenHysteresis()){ while (lastTimeRingBuffer.checkScrapeDidNotHappenHysteresis()) {
delay(config.scrapeInterval * 1000L) delay(config.scrapeInterval * 1000L)
performScrapeAndSaveIt(channel) performScrapeAndSaveIt(channel)
} }
@@ -104,44 +105,44 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
} }
private suspend fun sendAll(){ private suspend fun sendAll() {
scrapesAreBeingSent = true scrapesAreBeingSent = true
while (!storage.isEmpty()){ while (!storage.isEmpty()) {
val body = storage.getScrapedSamplesCompressedProtobuf(config.maxSamplesPerExport) val body = storage.getScrapedSamplesCompressedProtobuf(config.maxSamplesPerExport)
ExponentialBackoff.runWithBackoff( {sendRequestToRemoteWrite(body)}, {}, false) ExponentialBackoff.runWithBackoff({ sendRequestToRemoteWrite(body) }, {}, false)
} }
lastTimeRemoteWriteSent = System.currentTimeMillis() lastTimeRemoteWriteSent = System.currentTimeMillis()
} }
private fun deviceHasInternet() : Boolean { private fun deviceHasInternet(): Boolean {
val connectivityManager = config.getContext() val connectivityManager = config.getContext()
.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager? .getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager?
if (connectivityManager != null){ if (connectivityManager != null) {
val network = connectivityManager.getActiveNetworkCompat() val network = connectivityManager.getActiveNetworkCompat()
val cap = connectivityManager.getNetworkCapabilities(network) val cap = connectivityManager.getNetworkCapabilities(network)
if (cap != null && cap.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)){ if (cap != null && cap.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)) {
return true return true
} }
} }
return false return false
} }
private fun timeHasPassed() : Boolean { private fun timeHasPassed(): Boolean {
return lastTimeRemoteWriteSent < System.currentTimeMillis() - config.exportInterval * 1000 return lastTimeRemoteWriteSent < System.currentTimeMillis() - config.exportInterval * 1000
} }
private fun conditionsForRemoteWrite() : Boolean { private fun conditionsForRemoteWrite(): Boolean {
return deviceHasInternet() && ( timeHasPassed() || enoughSamplesScraped() ) return deviceHasInternet() && (timeHasPassed() || enoughSamplesScraped())
} }
private fun enoughSamplesScraped() : Boolean { private fun enoughSamplesScraped(): Boolean {
return storage.getLength() > config.maxSamplesPerExport return storage.getLength() > config.maxSamplesPerExport
} }
private suspend fun senderManager(channel : Channel<Unit>){ private suspend fun senderManager(channel: Channel<Unit>) {
while (true) { while (true) {
if (storage.isEmpty()){ if (storage.isEmpty()) {
// channel is conflated, one receive is enough // channel is conflated, one receive is enough
// suspend here until sending remote write is needed // suspend here until sending remote write is needed
channel.receive() channel.receive()
@@ -157,37 +158,37 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
} }
// entrypoint // entrypoint
suspend fun start(){ suspend fun start() {
// conflated channel // conflated channel
val channel = Channel<Unit>(1, onBufferOverflow = BufferOverflow.DROP_OLDEST) val channel = Channel<Unit>(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
client = HttpClient() client = HttpClient()
try { try {
runBlocking { runBlocking {
launch { launch {
// check for outage in scrapes, save scrapes to storage // check for outage in scrapes, save scrapes to storage
scraper(channel) scraper(channel)
} }
launch { launch {
// send saved scrapes to remote write endpoint // send saved scrapes to remote write endpoint
senderManager(channel) senderManager(channel)
} }
} }
} finally { } finally {
withContext(NonCancellable){ withContext(NonCancellable) {
channel.close() channel.close()
client.close() client.close()
Log.v(TAG, "Canceling Remote Write Sender") Log.v(TAG, "Canceling Remote Write Sender")
} }
} }
} }
fun countSuccessfulScrape(){ fun countSuccessfulScrape() {
Log.v(TAG, "Counting successful scrape") Log.v(TAG, "Counting successful scrape")
lastTimeRingBuffer.setLastTime(System.currentTimeMillis()) lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
} }
private suspend fun sendRequestToRemoteWrite(body : ByteArray){ private suspend fun sendRequestToRemoteWrite(body: ByteArray) {
Log.v(TAG, "sending to prometheus remote write now") Log.v(TAG, "sending to prometheus remote write now")
val response = client.post(config.remoteWriteEndpoint) { val response = client.post(config.remoteWriteEndpoint) {
setBody(body) setBody(body)

View File

@@ -11,7 +11,7 @@ import java.util.Enumeration
import java.util.LinkedList import java.util.LinkedList
import java.util.Queue import java.util.Queue
private const val TAG : String = "REMOTE_WRITE_SENDER_STORAGE" private const val TAG: String = "REMOTE_WRITE_SENDER_STORAGE"
// This is a very primitive implementation, may require some optimization later // This is a very primitive implementation, may require some optimization later
// //
@@ -24,21 +24,22 @@ typealias MetricsScrape = Enumeration<MetricFamilySamples>
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>> private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
private data class TimeSeriesLabel( private data class TimeSeriesLabel(
val name : String, val name: String,
val value : String, val value: String,
){ ) {
fun toProtobufLabel() : Label{ fun toProtobufLabel(): Label {
return Label.newBuilder() return Label.newBuilder()
.setName(this.name) .setName(this.name)
.setValue(this.value) .setValue(this.value)
.build() .build()
} }
} }
private data class TimeSeriesSample( private data class TimeSeriesSample(
val timeStampMs : Long, val timeStampMs: Long,
val value : Double, val value: Double,
){ ) {
fun toProtobufSample() : Sample{ fun toProtobufSample(): Sample {
return Sample.newBuilder() return Sample.newBuilder()
.setTimestamp(this.timeStampMs) .setTimestamp(this.timeStampMs)
.setValue(this.value) .setValue(this.value)
@@ -47,26 +48,29 @@ private data class TimeSeriesSample(
} }
abstract class RemoteWriteSenderStorage { abstract class RemoteWriteSenderStorage {
private val remoteWriteLabel : TimeSeriesLabel = TimeSeriesLabel( private val remoteWriteLabel: TimeSeriesLabel = TimeSeriesLabel(
name = "backfill", name = "backfill",
value = "true", value = "true",
) )
protected fun encodeWithSnappy(data: ByteArray): ByteArray { protected fun encodeWithSnappy(data: ByteArray): ByteArray {
return Snappy.compress(data) return Snappy.compress(data)
} }
private fun processLabels(sample : MetricFamilySamples.Sample, private fun processLabels(
sampleName: String) : List<TimeSeriesLabel>{ sample: MetricFamilySamples.Sample,
sampleName: String
): List<TimeSeriesLabel> {
val result : MutableList<TimeSeriesLabel> = mutableListOf() val result: MutableList<TimeSeriesLabel> = mutableListOf()
// labels are stored in parallel lists -> iterate over two lists at a time // labels are stored in parallel lists -> iterate over two lists at a time
val sampleLabelNamesIterator = sample.labelNames.iterator() val sampleLabelNamesIterator = sample.labelNames.iterator()
val sampleLabelValuesIterator = sample.labelNames.iterator() val sampleLabelValuesIterator = sample.labelNames.iterator()
while (sampleLabelNamesIterator.hasNext() && sampleLabelValuesIterator.hasNext()) { while (sampleLabelNamesIterator.hasNext() && sampleLabelValuesIterator.hasNext()) {
val labelName : String = sampleLabelNamesIterator.next() val labelName: String = sampleLabelNamesIterator.next()
val labelValue : String = sampleLabelValuesIterator.next() val labelValue: String = sampleLabelValuesIterator.next()
val label = TimeSeriesLabel( val label = TimeSeriesLabel(
name = labelName, name = labelName,
@@ -83,8 +87,8 @@ abstract class RemoteWriteSenderStorage {
return result.toList() return result.toList()
} }
private fun getTimeSeriesSample(sample : MetricFamilySamples.Sample) : TimeSeriesSample{ private fun getTimeSeriesSample(sample: MetricFamilySamples.Sample): TimeSeriesSample {
val timestampMs : Long = sample.timestampMs ?: System.currentTimeMillis() val timestampMs: Long = sample.timestampMs ?: System.currentTimeMillis()
return TimeSeriesSample( return TimeSeriesSample(
value = sample.value, value = sample.value,
@@ -93,18 +97,19 @@ abstract class RemoteWriteSenderStorage {
} }
private fun processTimeSeries( private fun processTimeSeries(
hashMap: ConverterHashMap, familySample : MetricFamilySamples){ hashMap: ConverterHashMap, familySample: MetricFamilySamples
) {
for ( sample in familySample.samples ) { for (sample in familySample.samples) {
val sampleName : String = sample.name val sampleName: String = sample.name
val labels : List<TimeSeriesLabel> = processLabels(sample, sampleName) val labels: List<TimeSeriesLabel> = processLabels(sample, sampleName)
val timeSeriesSample : TimeSeriesSample = getTimeSeriesSample(sample) val timeSeriesSample: TimeSeriesSample = getTimeSeriesSample(sample)
if (hashMap[labels] == null){ if (hashMap[labels] == null) {
// this time series does not yet exist // this time series does not yet exist
hashMap[labels] = mutableListOf(timeSeriesSample) hashMap[labels] = mutableListOf(timeSeriesSample)
}else{ } else {
// this time series already exists // this time series already exists
hashMap[labels]!!.add(timeSeriesSample) hashMap[labels]!!.add(timeSeriesSample)
} }
@@ -112,25 +117,26 @@ abstract class RemoteWriteSenderStorage {
} }
private fun hashMapEntryToProtobufTimeSeries( private fun hashMapEntryToProtobufTimeSeries(
labels : List<TimeSeriesLabel>, samples : MutableList<TimeSeriesSample>) : TimeSeries { labels: List<TimeSeriesLabel>, samples: MutableList<TimeSeriesSample>
): TimeSeries {
val timeSeriesBuilder : TimeSeries.Builder = TimeSeries.newBuilder() val timeSeriesBuilder: TimeSeries.Builder = TimeSeries.newBuilder()
timeSeriesBuilder.addAllLabels(labels.map{ timeSeriesBuilder.addAllLabels(labels.map {
it.toProtobufLabel() it.toProtobufLabel()
}) })
timeSeriesBuilder.addAllSamples(samples.map{ timeSeriesBuilder.addAllSamples(samples.map {
it.toProtobufSample() it.toProtobufSample()
}) })
return timeSeriesBuilder.build() return timeSeriesBuilder.build()
} }
private fun hashmapToProtobufWriteRequest(hashMap: ConverterHashMap) : WriteRequest{ private fun hashmapToProtobufWriteRequest(hashMap: ConverterHashMap): WriteRequest {
val writeRequestBuilder : WriteRequest.Builder = WriteRequest.newBuilder() val writeRequestBuilder: WriteRequest.Builder = WriteRequest.newBuilder()
for (entry in hashMap){ for (entry in hashMap) {
val timeSeries = hashMapEntryToProtobufTimeSeries(entry.key, entry.value) val timeSeries = hashMapEntryToProtobufTimeSeries(entry.key, entry.value)
writeRequestBuilder.addTimeseries(timeSeries) writeRequestBuilder.addTimeseries(timeSeries)
} }
@@ -138,20 +144,20 @@ abstract class RemoteWriteSenderStorage {
return writeRequestBuilder.build() return writeRequestBuilder.build()
} }
protected fun metricsScrapeListToProtobuf(input: List<MetricsScrape>) : WriteRequest { protected fun metricsScrapeListToProtobuf(input: List<MetricsScrape>): WriteRequest {
if(input.isEmpty()){ if (input.isEmpty()) {
throw Exception("Input is empty!") throw Exception("Input is empty!")
} }
val hashmap : ConverterHashMap = HashMap() val hashmap: ConverterHashMap = HashMap()
for ( metricsScrape in input ){ for (metricsScrape in input) {
for ( familySample in metricsScrape ) { for (familySample in metricsScrape) {
processTimeSeries(hashmap, familySample) processTimeSeries(hashmap, familySample)
} }
} }
val result : WriteRequest = hashmapToProtobufWriteRequest(hashmap) val result: WriteRequest = hashmapToProtobufWriteRequest(hashmap)
Log.v(TAG, result.timeseriesList.toString() + "<- protobuf") Log.v(TAG, result.timeseriesList.toString() + "<- protobuf")
@@ -166,34 +172,34 @@ abstract class RemoteWriteSenderStorage {
} }
class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() { class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
private val data : Queue<MetricsScrape> = LinkedList() private val data: Queue<MetricsScrape> = LinkedList()
override fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray { override fun getScrapedSamplesCompressedProtobuf(howMany: Int): ByteArray {
if (howMany < 1){ if (howMany < 1) {
throw IllegalArgumentException("howMany must be bigger than zero") throw IllegalArgumentException("howMany must be bigger than zero")
} }
val scrapedMetrics : MutableList<MetricsScrape> = mutableListOf() val scrapedMetrics: MutableList<MetricsScrape> = mutableListOf()
for (i in 1..howMany){ for (i in 1..howMany) {
val oneMetric : MetricsScrape? = data.poll() val oneMetric: MetricsScrape? = data.poll()
if(oneMetric == null){ if (oneMetric == null) {
break break
}else{ } else {
scrapedMetrics.add(oneMetric) scrapedMetrics.add(oneMetric)
} }
} }
val writeRequest : WriteRequest = this.metricsScrapeListToProtobuf(scrapedMetrics.toList()) val writeRequest: WriteRequest = this.metricsScrapeListToProtobuf(scrapedMetrics.toList())
val bytes : ByteArray = writeRequest.toByteArray() val bytes: ByteArray = writeRequest.toByteArray()
return this.encodeWithSnappy(bytes) return this.encodeWithSnappy(bytes)
} }
override fun removeNumberOfScrapedSamples(number: Int) { override fun removeNumberOfScrapedSamples(number: Int) {
if (number > 0){ if (number > 0) {
for (i in 1..number){ for (i in 1..number) {
data.remove() data.remove()
} }
}else{ } else {
throw IllegalArgumentException("number must by higher than 0") throw IllegalArgumentException("number must by higher than 0")
} }
} }

View File

@@ -4,18 +4,18 @@ private const val TAG = "SCRAPE_RECORDER"
//TODO implement this thing //TODO implement this thing
// mutex with last scraped time // mutex with last scraped time
class ScrapeRecorder{ class ScrapeRecorder {
//TODO mutex variable if mode is {pushprox / prometheus server} or {remote write} //TODO mutex variable if mode is {pushprox / prometheus server} or {remote write}
//TODO go back to mode {pushprox / prometheus server} only after N succesfull scrapes and no failures //TODO go back to mode {pushprox / prometheus server} only after N succesfull scrapes and no failures
fun countSuccesfullScrape(){ fun countSuccesfullScrape() {
//TODO implement this thing //TODO implement this thing
// write to mutex that scrape has happend at this current time // write to mutex that scrape has happend at this current time
// set timer to 2 x remote_write_scrape_interval seconds to check if next scrape has happened // set timer to 2 x remote_write_scrape_interval seconds to check if next scrape has happened
} }
private fun onTimerTick(){ private fun onTimerTick() {
//TODO implement this //TODO implement this
// check if other scrape has happened // check if other scrape has happened
// if no scrape happened, go to mode {remote write} // if no scrape happened, go to mode {remote write}