diff --git a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt index fa321bb..8c6460d 100644 --- a/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt +++ b/client/app/src/main/java/com/birdthedeveloper/prometheus/android/prometheus/android/exporter/worker/RemoteWriteSender.kt @@ -14,7 +14,6 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import remote.write.RemoteWrite @@ -24,8 +23,6 @@ import remote.write.RemoteWrite.WriteRequest private const val TAG: String = "REMOTE_WRITE_SENDER" -//TODO this thing - // This class stores information about scrapes to PROM_SERVER and PUSHPROX // for purposes of scraping metrics on device and back-filling them later using remote write // @@ -51,11 +48,11 @@ private class LastTimeRingBuffer(private val scrapeIntervalMs: Int) { return buffer[bufferIndex] } - private fun checkScrapeDidNotHappenInTime() : Boolean { + fun checkScrapeDidNotHappenInTime() : Boolean { return getTimeByIndex(0) < System.currentTimeMillis() - 3 * scrapeIntervalMs } - private fun checkScrapeDidNotHappenHysteresis() : Boolean { + fun checkScrapeDidNotHappenHysteresis() : Boolean { val scrapeOccuredAfterThis : Long = System.currentTimeMillis() - 5 * scrapeIntervalMs for (i in 0 until hysteresisThreshold) { if (getTimeByIndex(i) < scrapeOccuredAfterThis){ @@ -74,13 +71,13 @@ data class RemoteWriteConfiguration( ) class RemoteWriteSender(private val config: RemoteWriteConfiguration) { - // TODO ring buffer for last time - // TODO last time into it's own object with boolean functions - // private val lastTimeMutex = LastTimeMutex() + private val lastTimeRingBuffer = LastTimeRingBuffer(config.scrape_interval * 1000) private var alreadyStoredSampleLength : Int = 0 private val storage : RemoteWriteSenderStorage = RemoteWriteSenderSimpleMemoryStorage() + private var scrapesAreBeingSent : Boolean = false + private lateinit var client : HttpClient - private fun getRequestBody(): ByteArray { + private fun testGetRequestBody(): ByteArray { val label1: Label = Label.newBuilder() .setName("code") .setValue("200").build() @@ -128,10 +125,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { storage.writeScrapedSample(scrapedMetrics) } - //TODO channel je bad - //TODO v remotewriteseender storage musi byt mutex - - + //TODO refactor this thing private suspend fun scraper(channel : Channel){ val checkDelay = 1000L while (true){ @@ -148,8 +142,10 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { } } + //TODO refactor this ting // sending metric scrapes to remote write endpoint will not be parallel - private suspend fun sendAll(){ + private suspend fun sender(){ + //TODO exponential backoff scrapesAreBeingSentMutex.withLock { // Take all metric samples and send them in batches of (max_samples_per_send) // one by one batch @@ -158,6 +154,7 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { } } + //TODO refactor this thing private suspend fun senderManager(channel : Channel){ val alreadyStoredMetricScrapes : Int = storage.getLength() @@ -182,35 +179,39 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { } + // entrypoint suspend fun start(){ val channel = Channel() + client = HttpClient() try { runBlocking { launch { + // check for outage in scrapes, save scrapes to storage scraper(channel) } launch { + // send saved scrapes to remote write endpoint senderManager(channel) } } } finally { withContext(NonCancellable){ channel.close() + client.close() Log.v(TAG, "Canceling Remote Write Sender") } } } - suspend fun countSuccessfulScrape(){ + fun countSuccessfulScrape(){ Log.v(TAG, "Counting successful scrape") - lastTimeMutex.setLastTime(System.currentTimeMillis()) + lastTimeRingBuffer.setLastTime(System.currentTimeMillis()) } - private suspend fun sendTestRequest() { - Log.v(TAG, "sending to prometheus now") - val client = HttpClient() + private suspend fun sendRequestToRemoteWrite(body : ByteArray){ + Log.v(TAG, "sending to prometheus remote write now") val response = client.post(config.remote_write_endpoint) { - setBody(encodeWithSnappy(getRequestBody())) + setBody(body) headers { append(HttpHeaders.ContentEncoding, "snappy") append(HttpHeaders.ContentType, "application/protobuf") @@ -221,7 +222,5 @@ class RemoteWriteSender(private val config: RemoteWriteConfiguration) { Log.v(TAG, "Response status: ${response.status.toString()}") Log.v(TAG, "body: ${response.body()}") - - client.close() } }