mirror of
https://github.com/mii443/prometheus-android-exporter.git
synced 2025-08-22 15:15:35 +00:00
code format
This commit is contained in:
@ -115,7 +115,7 @@ data class PromConfiguration(
|
|||||||
fun fromWorkData(data: Data): PromConfiguration {
|
fun fromWorkData(data: Data): PromConfiguration {
|
||||||
val jsonString: String = data.getString("json")
|
val jsonString: String = data.getString("json")
|
||||||
?: throw Exception("PromConfiguration serialization not working correctly!")
|
?: throw Exception("PromConfiguration serialization not working correctly!")
|
||||||
return Json.decodeFromString<PromConfiguration>(jsonString)
|
return Json.decodeFromString(jsonString)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun loadFromConfigFile(context: Context): PromConfiguration {
|
fun loadFromConfigFile(context: Context): PromConfiguration {
|
||||||
|
@ -39,7 +39,7 @@ import androidx.compose.ui.text.style.TextAlign
|
|||||||
import androidx.compose.ui.unit.dp
|
import androidx.compose.ui.unit.dp
|
||||||
import androidx.navigation.NavHostController
|
import androidx.navigation.NavHostController
|
||||||
|
|
||||||
private const val TAG : String = "HOMEPAGE"
|
private const val TAG: String = "HOMEPAGE"
|
||||||
|
|
||||||
@Composable
|
@Composable
|
||||||
fun HomePage(
|
fun HomePage(
|
||||||
|
@ -43,7 +43,7 @@ class MainActivity : ComponentActivity() {
|
|||||||
navController: NavHostController = rememberNavController(),
|
navController: NavHostController = rememberNavController(),
|
||||||
promViewModel: PromViewModel
|
promViewModel: PromViewModel
|
||||||
) {
|
) {
|
||||||
val startDestination: String = "homepage"
|
val startDestination = "homepage"
|
||||||
NavHost(
|
NavHost(
|
||||||
navController = navController,
|
navController = navController,
|
||||||
startDestination = startDestination,
|
startDestination = startDestination,
|
||||||
|
@ -8,7 +8,7 @@ import kotlinx.coroutines.delay
|
|||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
import kotlin.math.pow
|
import kotlin.math.pow
|
||||||
|
|
||||||
private const val TAG : String = "EXPONENTIAL_BACKOFF"
|
private const val TAG: String = "EXPONENTIAL_BACKOFF"
|
||||||
|
|
||||||
class ExponentialBackoff {
|
class ExponentialBackoff {
|
||||||
companion object {
|
companion object {
|
||||||
|
@ -7,13 +7,14 @@ import android.content.Intent
|
|||||||
import android.content.IntentFilter
|
import android.content.IntentFilter
|
||||||
import android.hardware.SensorManager
|
import android.hardware.SensorManager
|
||||||
import android.os.BatteryManager
|
import android.os.BatteryManager
|
||||||
import androidx.core.content.ContextCompat.getSystemService
|
|
||||||
|
|
||||||
class MetricsEngine(private val context: Context) {
|
class MetricsEngine(private val context: Context) {
|
||||||
private lateinit var sensorManager : SensorManager;
|
private lateinit var sensorManager: SensorManager;
|
||||||
|
|
||||||
init {
|
init {
|
||||||
//sensorManager = getSystemService(Context.SENSOR_SERVICE) as SensorManager
|
//sensorManager = getSystemService(Context.SENSOR_SERVICE) as SensorManager
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun batteryChargeRatio(): Float {
|
public fun batteryChargeRatio(): Float {
|
||||||
val batteryStatus: Intent? = IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intFilter ->
|
val batteryStatus: Intent? = IntentFilter(Intent.ACTION_BATTERY_CHANGED).let { intFilter ->
|
||||||
context.registerReceiver(null, intFilter)
|
context.registerReceiver(null, intFilter)
|
||||||
@ -39,7 +40,6 @@ class MetricsEngine(private val context: Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//TODO
|
//TODO
|
||||||
///TYPE_ACCELEROMETER Yes Yes Yes Yes
|
///TYPE_ACCELEROMETER Yes Yes Yes Yes
|
||||||
//TYPE_AMBIENT_TEMPERATURE Yes n/a n/a n/a
|
//TYPE_AMBIENT_TEMPERATURE Yes n/a n/a n/a
|
||||||
|
@ -4,9 +4,7 @@ package com.birdthedeveloper.prometheus.android.exporter.worker
|
|||||||
|
|
||||||
import android.app.NotificationManager
|
import android.app.NotificationManager
|
||||||
import android.content.Context
|
import android.content.Context
|
||||||
import android.os.Build
|
|
||||||
import android.util.Log
|
import android.util.Log
|
||||||
import androidx.annotation.RequiresApi
|
|
||||||
import androidx.core.app.NotificationCompat
|
import androidx.core.app.NotificationCompat
|
||||||
import androidx.work.CoroutineWorker
|
import androidx.work.CoroutineWorker
|
||||||
import androidx.work.ForegroundInfo
|
import androidx.work.ForegroundInfo
|
||||||
@ -20,14 +18,12 @@ import io.prometheus.client.exporter.common.TextFormat
|
|||||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
import kotlinx.coroutines.NonCancellable
|
import kotlinx.coroutines.NonCancellable
|
||||||
import kotlinx.coroutines.coroutineScope
|
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.newFixedThreadPoolContext
|
import kotlinx.coroutines.newFixedThreadPoolContext
|
||||||
import kotlinx.coroutines.newSingleThreadContext
|
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import java.io.StringWriter
|
import java.io.StringWriter
|
||||||
|
|
||||||
private const val TAG : String = "PROM_WORKER"
|
private const val TAG: String = "PROM_WORKER"
|
||||||
|
|
||||||
class PromWorker(
|
class PromWorker(
|
||||||
private val context: Context,
|
private val context: Context,
|
||||||
@ -64,9 +60,9 @@ class PromWorker(
|
|||||||
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
|
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
|
||||||
private suspend fun startServicesInOneThread(config: PromConfiguration) {
|
private suspend fun startServicesInOneThread(config: PromConfiguration) {
|
||||||
val backgroundDispatcher = newFixedThreadPoolContext(1, "Prom worker")
|
val backgroundDispatcher = newFixedThreadPoolContext(1, "Prom worker")
|
||||||
val threadContext = backgroundDispatcher.limitedParallelism(1)
|
val threadContext = backgroundDispatcher.limitedParallelism(1)
|
||||||
|
|
||||||
try{
|
try {
|
||||||
withContext(threadContext) {
|
withContext(threadContext) {
|
||||||
|
|
||||||
if (config.remoteWriteEnabled) {
|
if (config.remoteWriteEnabled) {
|
||||||
@ -106,7 +102,7 @@ class PromWorker(
|
|||||||
pushProxFqdn = config.pushproxFqdn,
|
pushProxFqdn = config.pushproxFqdn,
|
||||||
registry = collectorRegistry,
|
registry = collectorRegistry,
|
||||||
countSuccessfulScrape = ::countSuccessfulScrape,
|
countSuccessfulScrape = ::countSuccessfulScrape,
|
||||||
getContext = {context}
|
getContext = { context }
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
Log.d(TAG, "PushProx launching now")
|
Log.d(TAG, "PushProx launching now")
|
||||||
@ -116,7 +112,7 @@ class PromWorker(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}finally {
|
} finally {
|
||||||
withContext(NonCancellable) {
|
withContext(NonCancellable) {
|
||||||
Log.v(TAG, "Canceling prom worker")
|
Log.v(TAG, "Canceling prom worker")
|
||||||
backgroundDispatcher.close()
|
backgroundDispatcher.close()
|
||||||
@ -132,6 +128,7 @@ class PromWorker(
|
|||||||
|
|
||||||
return Result.success()
|
return Result.success()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun getForegroundInfo(): ForegroundInfo {
|
override suspend fun getForegroundInfo(): ForegroundInfo {
|
||||||
return createForegroundInfo()
|
return createForegroundInfo()
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ data class PrometheusServerConfig(
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Expose metrics on given port using Ktor http server
|
// Expose metrics on given port using Ktor http server
|
||||||
class PrometheusServer() {
|
class PrometheusServer {
|
||||||
companion object {
|
companion object {
|
||||||
suspend fun start(config: PrometheusServerConfig) {
|
suspend fun start(config: PrometheusServerConfig) {
|
||||||
Log.d(TAG, "Starting prometheus server")
|
Log.d(TAG, "Starting prometheus server")
|
||||||
@ -87,7 +87,7 @@ class PrometheusServer() {
|
|||||||
call.respondText(getLandingPage())
|
call.respondText(getLandingPage())
|
||||||
}
|
}
|
||||||
get("/metrics") {
|
get("/metrics") {
|
||||||
val response : String = config.performScrape()
|
val response: String = config.performScrape()
|
||||||
call.respondText(response)
|
call.respondText(response)
|
||||||
config.countSuccessfulScrape()
|
config.countSuccessfulScrape()
|
||||||
Log.d(TAG, "Successful scrape")
|
Log.d(TAG, "Successful scrape")
|
||||||
|
@ -16,7 +16,6 @@ import io.prometheus.client.CollectorRegistry
|
|||||||
import io.prometheus.client.Counter
|
import io.prometheus.client.Counter
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
import kotlinx.coroutines.NonCancellable
|
import kotlinx.coroutines.NonCancellable
|
||||||
import kotlinx.coroutines.delay
|
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
|
|
||||||
private const val TAG = "PUSHPROX_CLIENT"
|
private const val TAG = "PUSHPROX_CLIENT"
|
||||||
@ -28,7 +27,7 @@ data class PushProxConfig(
|
|||||||
val registry: CollectorRegistry,
|
val registry: CollectorRegistry,
|
||||||
val performScrape: () -> String,
|
val performScrape: () -> String,
|
||||||
val countSuccessfulScrape: suspend () -> Unit,
|
val countSuccessfulScrape: suspend () -> Unit,
|
||||||
val getContext : () -> Context,
|
val getContext: () -> Context,
|
||||||
)
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -72,7 +71,7 @@ data class PushProxContext(
|
|||||||
val pollUrl: String,
|
val pollUrl: String,
|
||||||
val pushUrl: String,
|
val pushUrl: String,
|
||||||
val fqdn: String,
|
val fqdn: String,
|
||||||
val getContext : () -> Context,
|
val getContext: () -> Context,
|
||||||
)
|
)
|
||||||
|
|
||||||
// This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client
|
// This is a stripped down kotlin implementation of github.com/prometheus-community/PushProx client
|
||||||
@ -98,7 +97,7 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createClient() : HttpClient {
|
private fun createClient(): HttpClient {
|
||||||
Log.d(TAG, "Creating http client ktor")
|
Log.d(TAG, "Creating http client ktor")
|
||||||
return HttpClient(Android) {
|
return HttpClient(Android) {
|
||||||
engine {
|
engine {
|
||||||
@ -118,8 +117,8 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
|
|||||||
modifiedProxyURL = "http://$modifiedProxyURL"
|
modifiedProxyURL = "http://$modifiedProxyURL"
|
||||||
}
|
}
|
||||||
|
|
||||||
val pollURL: String = "$modifiedProxyURL/poll"
|
val pollURL = "$modifiedProxyURL/poll"
|
||||||
val pushURL: String = "$modifiedProxyURL/push"
|
val pushURL = "$modifiedProxyURL/push"
|
||||||
|
|
||||||
return PushProxContext(
|
return PushProxContext(
|
||||||
client,
|
client,
|
||||||
@ -132,7 +131,7 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
|
|||||||
|
|
||||||
// Continuously poll from pushprox gateway
|
// Continuously poll from pushprox gateway
|
||||||
private suspend fun doPoll(context: PushProxContext) {
|
private suspend fun doPoll(context: PushProxContext) {
|
||||||
if(Util.deviceIsConnectedToInternet(context.getContext())){
|
if (Util.deviceIsConnectedToInternet(context.getContext())) {
|
||||||
Log.d(TAG, "Polling now")
|
Log.d(TAG, "Polling now")
|
||||||
val response: HttpResponse = context.client.post(context.pollUrl) {
|
val response: HttpResponse = context.client.post(context.pollUrl) {
|
||||||
setBody(context.fqdn)
|
setBody(context.fqdn)
|
||||||
@ -140,7 +139,7 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
|
|||||||
val responseBody: String = response.body()
|
val responseBody: String = response.body()
|
||||||
doPush(context, responseBody)
|
doPush(context, responseBody)
|
||||||
Log.d(TAG, "Polling finished")
|
Log.d(TAG, "Polling finished")
|
||||||
}else{
|
} else {
|
||||||
Log.d(TAG, "Skipping poll because network not available")
|
Log.d(TAG, "Skipping poll because network not available")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,7 +188,7 @@ class PushProxClient(private val pushProxConfig: PushProxConfig) {
|
|||||||
|
|
||||||
pushProxConfig.countSuccessfulScrape()
|
pushProxConfig.countSuccessfulScrape()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
if (e is CancellationException){
|
if (e is CancellationException) {
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
counters.pushError()
|
counters.pushError()
|
||||||
|
@ -3,8 +3,6 @@
|
|||||||
package com.birdthedeveloper.prometheus.android.exporter.worker
|
package com.birdthedeveloper.prometheus.android.exporter.worker
|
||||||
|
|
||||||
import android.content.Context
|
import android.content.Context
|
||||||
import android.net.ConnectivityManager
|
|
||||||
import android.net.NetworkCapabilities
|
|
||||||
import android.util.Log
|
import android.util.Log
|
||||||
import io.ktor.client.HttpClient
|
import io.ktor.client.HttpClient
|
||||||
import io.ktor.client.request.header
|
import io.ktor.client.request.header
|
||||||
@ -21,7 +19,6 @@ import kotlinx.coroutines.coroutineScope
|
|||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import java.lang.IndexOutOfBoundsException
|
|
||||||
|
|
||||||
private const val TAG: String = "REMOTE_WRITE_SENDER"
|
private const val TAG: String = "REMOTE_WRITE_SENDER"
|
||||||
|
|
||||||
@ -35,8 +32,8 @@ internal class LastTimeRingBuffer(private val scrapeInterval: Int) {
|
|||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private const val hysteresisMemory: Int = 3
|
private const val hysteresisMemory: Int = 3
|
||||||
private const val hysteresisCoefficient : Double = 1.2
|
private const val hysteresisCoefficient: Double = 1.2
|
||||||
private const val scrapeTimeCoefficient : Double = 2.2
|
private const val scrapeTimeCoefficient: Double = 2.2
|
||||||
}
|
}
|
||||||
|
|
||||||
fun setLastTime(timestamp: Long) {
|
fun setLastTime(timestamp: Long) {
|
||||||
@ -50,15 +47,15 @@ internal class LastTimeRingBuffer(private val scrapeInterval: Int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val bufferIndex: Int = (firstIndex - index)
|
val bufferIndex: Int = (firstIndex - index)
|
||||||
return if (bufferIndex < 0){
|
return if (bufferIndex < 0) {
|
||||||
buffer[hysteresisMemory + bufferIndex]
|
buffer[hysteresisMemory + bufferIndex]
|
||||||
}else{
|
} else {
|
||||||
buffer[bufferIndex]
|
buffer[bufferIndex]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun checkScrapeDidNotHappenInTime(): Boolean {
|
fun checkScrapeDidNotHappenInTime(): Boolean {
|
||||||
val now : Long = System.currentTimeMillis()
|
val now: Long = System.currentTimeMillis()
|
||||||
return getTimeByIndex(0) < now - scrapeTimeCoefficient * scrapeInterval * 1000
|
return getTimeByIndex(0) < now - scrapeTimeCoefficient * scrapeInterval * 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +73,7 @@ internal class LastTimeRingBuffer(private val scrapeInterval: Int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TryExportMetricsAgainException(message : String) : Exception(message)
|
class TryExportMetricsAgainException(message: String) : Exception(message)
|
||||||
|
|
||||||
data class RemoteWriteConfiguration(
|
data class RemoteWriteConfiguration(
|
||||||
val scrapeInterval: Int,
|
val scrapeInterval: Int,
|
||||||
@ -99,7 +96,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
Log.d(TAG, "performScrapeAndSaveIt start")
|
Log.d(TAG, "performScrapeAndSaveIt start")
|
||||||
|
|
||||||
val scrapedMetrics = config.collectorRegistry.metricFamilySamples()
|
val scrapedMetrics = config.collectorRegistry.metricFamilySamples()
|
||||||
val metricsScrape : MetricsScrape = MetricsScrape.fromMfs(scrapedMetrics)
|
val metricsScrape: MetricsScrape = MetricsScrape.fromMfs(scrapedMetrics)
|
||||||
|
|
||||||
storage.writeScrapedSample(metricsScrape)
|
storage.writeScrapedSample(metricsScrape)
|
||||||
channel.send(Unit)
|
channel.send(Unit)
|
||||||
@ -107,12 +104,12 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
Log.d(TAG, "performScrapeAndSaveIt end")
|
Log.d(TAG, "performScrapeAndSaveIt end")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun insertInitialDummyScrape(){
|
private fun insertInitialDummyScrape() {
|
||||||
lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
|
lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun scraper(channel: Channel<Unit>) {
|
private suspend fun scraper(channel: Channel<Unit>) {
|
||||||
val checkDelay : Long = 1000L
|
val checkDelay = 1000L
|
||||||
|
|
||||||
insertInitialDummyScrape()
|
insertInitialDummyScrape()
|
||||||
|
|
||||||
@ -144,7 +141,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun conditionsForRemoteWrite(): Boolean {
|
private fun conditionsForRemoteWrite(): Boolean {
|
||||||
val ctx : Context = config.getContext()
|
val ctx: Context = config.getContext()
|
||||||
return Util.deviceIsConnectedToInternet(ctx) && (timeHasPassed() || enoughSamplesScraped())
|
return Util.deviceIsConnectedToInternet(ctx) && (timeHasPassed() || enoughSamplesScraped())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,7 +160,10 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
ExponentialBackoff.runWithBackoff({
|
ExponentialBackoff.runWithBackoff({
|
||||||
sendRequestToRemoteWrite(body, config.maxSamplesPerExport)
|
sendRequestToRemoteWrite(body, config.maxSamplesPerExport)
|
||||||
}, {
|
}, {
|
||||||
Log.d(TAG, "exportToRemoteWriteEndpointException, ${it.message}, ${it}, ${it.stackTraceToString()}")
|
Log.d(
|
||||||
|
TAG,
|
||||||
|
"exportToRemoteWriteEndpointException, ${it.message}, ${it}, ${it.stackTraceToString()}"
|
||||||
|
)
|
||||||
}, "Remote Write", false)
|
}, "Remote Write", false)
|
||||||
Log.d(TAG, "Exponential backoff to export remote write finish")
|
Log.d(TAG, "Exponential backoff to export remote write finish")
|
||||||
}
|
}
|
||||||
@ -226,7 +226,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
|
lastTimeRingBuffer.setLastTime(System.currentTimeMillis())
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun sendRequestToRemoteWrite(body: ByteArray, numOfMetricScrapes : Int) {
|
private suspend fun sendRequestToRemoteWrite(body: ByteArray, numOfMetricScrapes: Int) {
|
||||||
Log.d(TAG, "Exporting remote write to prometheus now")
|
Log.d(TAG, "Exporting remote write to prometheus now")
|
||||||
val response = client.post(config.remoteWriteEndpoint) {
|
val response = client.post(config.remoteWriteEndpoint) {
|
||||||
setBody(body)
|
setBody(body)
|
||||||
@ -245,11 +245,13 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) {
|
|||||||
// this export was successful
|
// this export was successful
|
||||||
storage.removeNumberOfScrapedSamples(numOfMetricScrapes)
|
storage.removeNumberOfScrapedSamples(numOfMetricScrapes)
|
||||||
}
|
}
|
||||||
|
|
||||||
HttpStatusCode.BadRequest -> {
|
HttpStatusCode.BadRequest -> {
|
||||||
// probably some error or race condition has occured
|
// probably some error or race condition has occured
|
||||||
// give up trying to send this data
|
// give up trying to send this data
|
||||||
storage.removeNumberOfScrapedSamples(numOfMetricScrapes)
|
storage.removeNumberOfScrapedSamples(numOfMetricScrapes)
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
throw TryExportMetricsAgainException("Status code: ${response.status.description}")
|
throw TryExportMetricsAgainException("Status code: ${response.status.description}")
|
||||||
}
|
}
|
||||||
|
@ -10,30 +10,30 @@ import java.util.Queue
|
|||||||
// HashMap<List of labels including name, List of TimeSeries samples to this TimeSeries>
|
// HashMap<List of labels including name, List of TimeSeries samples to this TimeSeries>
|
||||||
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
|
private typealias ConverterHashMap = HashMap<List<TimeSeriesLabel>, MutableList<TimeSeriesSample>>
|
||||||
|
|
||||||
private const val TAG : String = "REMOTE_WRITE_SENDER_MEMORY_SIMPLE_STORAGE";
|
private const val TAG: String = "REMOTE_WRITE_SENDER_MEMORY_SIMPLE_STORAGE"
|
||||||
|
|
||||||
class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
||||||
private val data: Queue<MetricsScrape> = LinkedList()
|
private val data: Queue<MetricsScrape> = LinkedList()
|
||||||
|
|
||||||
private fun filterExpiredMetrics(metrics : MutableList<MetricsScrape>){
|
private fun filterExpiredMetrics(metrics: MutableList<MetricsScrape>) {
|
||||||
val now : Long = System.currentTimeMillis()
|
val now: Long = System.currentTimeMillis()
|
||||||
val oldestMetricTimeMs : Long = now - maxMetricsAge * 1000
|
val oldestMetricTimeMs: Long = now - maxMetricsAge * 1000
|
||||||
var howManyMetricsRemove : Int = 0
|
var howManyMetricsRemove = 0
|
||||||
|
|
||||||
// count how many metrics to remove
|
// count how many metrics to remove
|
||||||
for (i in 0 until metrics.size){
|
for (i in 0 until metrics.size) {
|
||||||
val scrape : MetricsScrape = metrics[i]
|
val scrape: MetricsScrape = metrics[i]
|
||||||
if(scrape.timeSeriesList.isNotEmpty()){
|
if (scrape.timeSeriesList.isNotEmpty()) {
|
||||||
if(scrape.timeSeriesList.first().sample.timeStampMs < oldestMetricTimeMs){
|
if (scrape.timeSeriesList.first().sample.timeStampMs < oldestMetricTimeMs) {
|
||||||
howManyMetricsRemove++
|
howManyMetricsRemove++
|
||||||
}else{
|
} else {
|
||||||
break; // I suppose scrapes were performed one after another
|
break // I suppose scrapes were performed one after another
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove metrics
|
// remove metrics
|
||||||
for (i in 1..howManyMetricsRemove){
|
for (i in 1..howManyMetricsRemove) {
|
||||||
metrics.removeFirst()
|
metrics.removeFirst()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -56,7 +56,8 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun hashmapToProtobufWriteRequest(hashMap: ConverterHashMap): RemoteWrite.WriteRequest {
|
private fun hashmapToProtobufWriteRequest(hashMap: ConverterHashMap): RemoteWrite.WriteRequest {
|
||||||
val writeRequestBuilder: RemoteWrite.WriteRequest.Builder = RemoteWrite.WriteRequest.newBuilder()
|
val writeRequestBuilder: RemoteWrite.WriteRequest.Builder =
|
||||||
|
RemoteWrite.WriteRequest.newBuilder()
|
||||||
|
|
||||||
for (entry in hashMap) {
|
for (entry in hashMap) {
|
||||||
val timeSeries = hashMapEntryToProtobufTimeSeries(entry.key, entry.value)
|
val timeSeries = hashMapEntryToProtobufTimeSeries(entry.key, entry.value)
|
||||||
@ -84,9 +85,10 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
|||||||
|
|
||||||
filterExpiredMetrics(scrapedMetrics)
|
filterExpiredMetrics(scrapedMetrics)
|
||||||
|
|
||||||
val writeRequest: RemoteWrite.WriteRequest = metricsScrapeListToProtobuf(scrapedMetrics.toList())
|
val writeRequest: RemoteWrite.WriteRequest =
|
||||||
|
metricsScrapeListToProtobuf(scrapedMetrics.toList())
|
||||||
val bytes: ByteArray = writeRequest.toByteArray()
|
val bytes: ByteArray = writeRequest.toByteArray()
|
||||||
return RemoteWriteSenderStorage.encodeWithSnappy(bytes)
|
return encodeWithSnappy(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun metricsScrapeListToProtobuf(input: List<MetricsScrape>): RemoteWrite.WriteRequest {
|
private fun metricsScrapeListToProtobuf(input: List<MetricsScrape>): RemoteWrite.WriteRequest {
|
||||||
@ -97,24 +99,22 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
|||||||
val hashmap: ConverterHashMap = HashMap()
|
val hashmap: ConverterHashMap = HashMap()
|
||||||
|
|
||||||
for (metricsScrape in input) {
|
for (metricsScrape in input) {
|
||||||
for (timeSeries in metricsScrape.timeSeriesList){
|
for (timeSeries in metricsScrape.timeSeriesList) {
|
||||||
processStorageTimeSeries(hashmap, timeSeries)
|
processStorageTimeSeries(hashmap, timeSeries)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val result: RemoteWrite.WriteRequest = hashmapToProtobufWriteRequest(hashmap)
|
return hashmapToProtobufWriteRequest(hashmap)
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun processStorageTimeSeries(hashMap: ConverterHashMap, timeSeries: StorageTimeSeries){
|
private fun processStorageTimeSeries(hashMap: ConverterHashMap, timeSeries: StorageTimeSeries) {
|
||||||
|
|
||||||
// add remote write label to labels
|
// add remote write label to labels
|
||||||
// this label ensures timeseries uniqueness among those scraped by pushprox or promserver
|
// this label ensures timeseries uniqueness among those scraped by pushprox or promserver
|
||||||
// and those scraped by Remote Write
|
// and those scraped by Remote Write
|
||||||
val labels: MutableList<TimeSeriesLabel> = timeSeries.labels.toMutableList()
|
val labels: MutableList<TimeSeriesLabel> = timeSeries.labels.toMutableList()
|
||||||
labels.add(remoteWriteLabel)
|
labels.add(remoteWriteLabel)
|
||||||
val immutableLabels : List<TimeSeriesLabel> = labels.toList()
|
val immutableLabels: List<TimeSeriesLabel> = labels.toList()
|
||||||
|
|
||||||
if (hashMap[immutableLabels] == null) {
|
if (hashMap[immutableLabels] == null) {
|
||||||
// this time series does not yet exist
|
// this time series does not yet exist
|
||||||
@ -128,9 +128,9 @@ class RemoteWriteSenderSimpleMemoryStorage : RemoteWriteSenderStorage() {
|
|||||||
override fun removeNumberOfScrapedSamples(number: Int) {
|
override fun removeNumberOfScrapedSamples(number: Int) {
|
||||||
if (number > 0) {
|
if (number > 0) {
|
||||||
for (i in 1..number) {
|
for (i in 1..number) {
|
||||||
if(data.isEmpty()){
|
if (data.isEmpty()) {
|
||||||
break;
|
break
|
||||||
}else{
|
} else {
|
||||||
data.remove()
|
data.remove()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,17 +2,12 @@
|
|||||||
|
|
||||||
package com.birdthedeveloper.prometheus.android.exporter.worker
|
package com.birdthedeveloper.prometheus.android.exporter.worker
|
||||||
|
|
||||||
import android.util.Log
|
|
||||||
import io.prometheus.client.Collector.MetricFamilySamples
|
import io.prometheus.client.Collector.MetricFamilySamples
|
||||||
import kotlinx.serialization.Serializable
|
import kotlinx.serialization.Serializable
|
||||||
import org.iq80.snappy.Snappy
|
import org.iq80.snappy.Snappy
|
||||||
import remote.write.RemoteWrite.Label
|
import remote.write.RemoteWrite.Label
|
||||||
import remote.write.RemoteWrite.Sample
|
import remote.write.RemoteWrite.Sample
|
||||||
import remote.write.RemoteWrite.TimeSeries
|
|
||||||
import remote.write.RemoteWrite.WriteRequest
|
|
||||||
import java.util.Enumeration
|
import java.util.Enumeration
|
||||||
import java.util.LinkedList
|
|
||||||
import java.util.Queue
|
|
||||||
|
|
||||||
private const val TAG: String = "REMOTE_WRITE_SENDER_STORAGE"
|
private const val TAG: String = "REMOTE_WRITE_SENDER_STORAGE"
|
||||||
|
|
||||||
@ -23,19 +18,19 @@ private const val TAG: String = "REMOTE_WRITE_SENDER_STORAGE"
|
|||||||
|
|
||||||
// data classes, the same structure as MetricFamilySamples
|
// data classes, the same structure as MetricFamilySamples
|
||||||
data class MetricsScrape(
|
data class MetricsScrape(
|
||||||
val timeSeriesList : List<StorageTimeSeries>
|
val timeSeriesList: List<StorageTimeSeries>
|
||||||
){
|
) {
|
||||||
companion object {
|
companion object {
|
||||||
fun fromMfs(input : Enumeration<MetricFamilySamples>) : MetricsScrape {
|
fun fromMfs(input: Enumeration<MetricFamilySamples>): MetricsScrape {
|
||||||
val timeSeriesList : MutableList<StorageTimeSeries> = mutableListOf()
|
val timeSeriesList: MutableList<StorageTimeSeries> = mutableListOf()
|
||||||
|
|
||||||
for (family in input){
|
for (family in input) {
|
||||||
for (sample in family.samples){
|
for (sample in family.samples) {
|
||||||
if (sample != null){
|
if (sample != null) {
|
||||||
val labels : MutableList<TimeSeriesLabel> = mutableListOf()
|
val labels: MutableList<TimeSeriesLabel> = mutableListOf()
|
||||||
|
|
||||||
// name label
|
// name label
|
||||||
val sampleName : String = sample.name
|
val sampleName: String = sample.name
|
||||||
val sampleNameLabel = TimeSeriesLabel(
|
val sampleNameLabel = TimeSeriesLabel(
|
||||||
name = "__name__",
|
name = "__name__",
|
||||||
value = sampleName
|
value = sampleName
|
||||||
@ -77,8 +72,8 @@ data class MetricsScrape(
|
|||||||
}
|
}
|
||||||
|
|
||||||
data class StorageTimeSeries(
|
data class StorageTimeSeries(
|
||||||
val sample : TimeSeriesSample,
|
val sample: TimeSeriesSample,
|
||||||
val labels : List<TimeSeriesLabel>,
|
val labels: List<TimeSeriesLabel>,
|
||||||
)
|
)
|
||||||
|
|
||||||
@Serializable
|
@Serializable
|
||||||
@ -107,13 +102,14 @@ data class TimeSeriesSample(
|
|||||||
}
|
}
|
||||||
|
|
||||||
abstract class RemoteWriteSenderStorage {
|
abstract class RemoteWriteSenderStorage {
|
||||||
companion object{
|
companion object {
|
||||||
const val maxMetricsAge : Int = 58 * 60 // 58 minutes
|
const val maxMetricsAge: Int = 58 * 60 // 58 minutes
|
||||||
|
|
||||||
val remoteWriteLabel: TimeSeriesLabel = TimeSeriesLabel(
|
val remoteWriteLabel: TimeSeriesLabel = TimeSeriesLabel(
|
||||||
name = "backfill",
|
name = "backfill",
|
||||||
value = "true",
|
value = "true",
|
||||||
)
|
)
|
||||||
|
|
||||||
fun encodeWithSnappy(data: ByteArray): ByteArray {
|
fun encodeWithSnappy(data: ByteArray): ByteArray {
|
||||||
return Snappy.compress(data)
|
return Snappy.compress(data)
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,8 @@ import android.net.ConnectivityManager
|
|||||||
import android.net.NetworkCapabilities
|
import android.net.NetworkCapabilities
|
||||||
|
|
||||||
class Util {
|
class Util {
|
||||||
companion object{
|
companion object {
|
||||||
fun deviceIsConnectedToInternet(context : Context) : Boolean{
|
fun deviceIsConnectedToInternet(context: Context): Boolean {
|
||||||
val connectivityManager = context
|
val connectivityManager = context
|
||||||
.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager?
|
.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager?
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user