Created November 5, 2019 22:25
Save rionmonster/c5ec6ae7dc79c61aad268e02a9c4cad0 to your computer and use it in GitHub Desktop.
Examples of Custom KStream-KTable Joins to Handle Slowly Loading KTables
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streams.common.kafka.transformers | |
import org.apache.kafka.streams.KeyValue | |
import org.apache.kafka.streams.kstream.Transformer | |
import org.apache.kafka.streams.state.ValueAndTimestamp | |
import org.apache.kafka.streams.processor.PunctuationType | |
import org.apache.kafka.streams.state.KeyValueStore | |
import org.apache.kafka.streams.processor.ProcessorContext | |
import org.apache.kafka.streams.kstream.TransformerSupplier | |
import java.time.Duration | |
import java.time.Instant | |
/**
 * Supplies the stream-side half of a "guaranteed" KStream–KTable join.
 *
 * For every incoming stream record:
 *  - any record still buffered for the same key is force-forwarded as `(bufferedStreamValue, null)`;
 *  - the table's state store ([tableStoreName]) is consulted; if it holds a value for the key, the joined pair
 *    `(streamValue, tableValue)` is returned immediately;
 *  - otherwise the stream value is parked in the buffer store ([streamBufferStoreName]) together with its record
 *    timestamp, so the table-side transformer can complete the join once table data arrives.
 *
 * A stream-time punctuator runs every [frequencyToCheckForExpiredWaitTimes]. It forwards `(streamValue, null)` for
 * buffered records that have waited longer than [approxMaxWaitTimePerRecordForTableData], and removes them.
 *
 * Both store names are resolved with `ProcessorContext.getStateStore` in `init`, so both stores must be available
 * to the processor this supplier is attached to.
 *
 * NOTE(review): the declared output type `KeyValue<K, KeyValue<K, V>>` does not describe what is emitted. The
 * inner pair carries the stream *value* (type V) in its first slot, not a key. Unchecked casts keep the declared
 * interface intact; the generic signature should eventually be corrected.
 */
@Suppress("UNCHECKED_CAST")
class GuaranteedStreamSideJoinTransformer<K, V>(
    private val streamBufferStoreName: String,
    private val tableStoreName: String
) : TransformerSupplier<K, V, KeyValue<K, KeyValue<K, V>>> {

    // A buffered stream record older than this (in stream time) is emitted without table data.
    private val approxMaxWaitTimePerRecordForTableData = Duration.ofMinutes(5)

    // How often (in stream time) the buffer is scanned for expired records.
    private val frequencyToCheckForExpiredWaitTimes = Duration.ofSeconds(5)

    override fun get(): Transformer<K, V, KeyValue<K, KeyValue<K, V>>> {
        return object : Transformer<K, V, KeyValue<K, KeyValue<K, V>>> {
            // All three are assigned in init(), which the framework invokes before transform() or any punctuation.
            private lateinit var streamBufferStore: KeyValueStore<K, KeyValue<V, Instant>>
            private lateinit var tableStore: KeyValueStore<K, ValueAndTimestamp<V>>
            private lateinit var context: ProcessorContext

            override fun init(context: ProcessorContext) {
                this.context = context
                streamBufferStore = context.getStateStore(streamBufferStoreName) as KeyValueStore<K, KeyValue<V, Instant>>
                tableStore = context.getStateStore(tableStoreName) as KeyValueStore<K, ValueAndTimestamp<V>>
                // NOTE(review): with STREAM_TIME the scan only runs as stream time advances (i.e. when new records
                // arrive); on a quiet input, buffered records are not flushed. Consider WALL_CLOCK_TIME if needed.
                context.schedule(frequencyToCheckForExpiredWaitTimes, PunctuationType.STREAM_TIME) { timestamp ->
                    sendAndPurgeAnyWaitingRecordsThatHaveExceededWaitTime(timestamp)
                }
            }

            override fun transform(key: K, value: V): KeyValue<K, KeyValue<K, V>>? {
                sendAnyWaitingRecordForKey(key)
                return sendFullJoinRecordOrWaitForTableSide(key, value, context.timestamp())
            }

            /**
             * Force-forwards the record already buffered for [key] (if any) as `(bufferedStreamValue, null)` and
             * removes it from the buffer. An alternative design would keep buffering such records until either
             * their wait time expires or a table-side record is received.
             */
            private fun sendAnyWaitingRecordForKey(key: K) {
                val waiting = streamBufferStore.get(key) ?: return
                // The table side removes a buffered record when it completes the join. A record still present here
                // either has no table data yet, or its table update arrived outside the allowed wait window.
                context.forward<K, Any>(key, KeyValue(waiting.key, null))
                streamBufferStore.delete(key)
            }

            /**
             * Returns `(key, (value, tableValue))` when the table already holds a value for [key]. Otherwise it
             * buffers [value] with [streamRecordTimestamp] and returns null, so nothing is emitted yet.
             */
            private fun sendFullJoinRecordOrWaitForTableSide(
                key: K,
                value: V,
                streamRecordTimestamp: Long
            ): KeyValue<K, KeyValue<K, V>>? {
                val tableValue = tableStore.get(key)
                if (tableValue == null) {
                    streamBufferStore.put(key, KeyValue(value, Instant.ofEpochMilli(streamRecordTimestamp)))
                    return null
                }
                // Emit (streamValue, tableValue): the same shape as every other output path of this join. The cast
                // is required only because the declared output type names K for the first slot.
                val joined = KeyValue(value, tableValue.value()) as KeyValue<K, V>
                return KeyValue.pair(key, joined)
            }

            /**
             * Forwards `(bufferedStreamValue, null)` for every buffered record whose wait time has expired at
             * [currentStreamTime], deleting each one from the buffer.
             */
            private fun sendAndPurgeAnyWaitingRecordsThatHaveExceededWaitTime(currentStreamTime: Long) {
                // NOTE: deleting while iterating relies on the store iterator tolerating concurrent modification,
                // which Kafka's ReadOnlyKeyValueStore#all contract requires.
                streamBufferStore.all().use { iterator ->
                    while (iterator.hasNext()) {
                        val record = iterator.next()
                        if (waitTimeExpired(record.value.value, currentStreamTime)) {
                            context.forward<K, Any>(record.key, KeyValue(record.value.key, null))
                            streamBufferStore.delete(record.key)
                        }
                    }
                }
            }

            /** True when strictly more than the max wait time separates [recordTimestamp] from [currentStreamTime]. */
            private fun waitTimeExpired(recordTimestamp: Instant, currentStreamTime: Long): Boolean =
                Duration.between(recordTimestamp, Instant.ofEpochMilli(currentStreamTime)) >
                    approxMaxWaitTimePerRecordForTableData

            override fun close() {}
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streams.common.kafka.transformers | |
import org.apache.kafka.streams.KeyValue | |
import org.apache.kafka.streams.state.KeyValueStore | |
import org.apache.kafka.streams.processor.ProcessorContext | |
import org.apache.kafka.streams.kstream.ValueTransformerWithKey | |
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier | |
import java.time.Duration | |
import java.time.Instant | |
/**
 * Supplies the table-side half of a "guaranteed" KStream–KTable join.
 *
 * When the table receives a non-null value for a key, the transformer looks in the shared buffer store
 * ([streamBufferStoreName]) for a stream record that was parked earlier under the same key. If one is waiting and
 * the table update is no more than [approxMaxWaitTimePerRecordForTableData] after it, the buffered entry is removed
 * and `(bufferedStreamValue, tableValue)` is returned. In every other case the result is `null`.
 */
@Suppress("UNCHECKED_CAST")
class GuaranteedTableSideJoinTransformer<K, V>(private val streamBufferStoreName: String) :
    ValueTransformerWithKeySupplier<K, V, KeyValue<K, V>> {

    // Largest gap (stream record -> later table update) for which the two are still joined.
    private val approxMaxWaitTimePerRecordForTableData = Duration.ofMinutes(5)

    override fun get(): ValueTransformerWithKey<K, V, KeyValue<K, V>> =
        object : ValueTransformerWithKey<K, V, KeyValue<K, V>> {
            // NOTE(review): declared as KeyValue<K, Instant>, but the first slot holds whatever the stream side
            // buffered (its value), so the K here is nominal.
            private var buffer: KeyValueStore<K, KeyValue<K, Instant>>? = null
            private var ctx: ProcessorContext? = null

            override fun init(context: ProcessorContext) {
                buffer = context.getStateStore(streamBufferStoreName) as KeyValueStore<K, KeyValue<K, Instant>>
                ctx = context
            }

            override fun transform(key: K, value: V?): KeyValue<K, V>? =
                joinWithWaitingStreamRecord(key, value, ctx!!.timestamp())

            /** Consumes the buffered stream record for [key] and returns the joined pair, or null if no join applies. */
            private fun joinWithWaitingStreamRecord(key: K, tableValue: V?, tableRecordTimestamp: Long): KeyValue<K, V>? {
                // A null table value never completes a join.
                if (tableValue == null) return null
                val waiting = buffer!!.get(key) ?: return null
                // Record timestamps take part in the join condition: the table update must be close enough in time.
                if (!isWithinWaitWindow(waiting.value, Instant.ofEpochMilli(tableRecordTimestamp))) return null
                buffer!!.delete(key)
                return KeyValue(waiting.key, tableValue)
            }

            private fun isWithinWaitWindow(streamRecordTimestamp: Instant, tableRecordTimestamp: Instant): Boolean =
                Duration.between(streamRecordTimestamp, tableRecordTimestamp) <= approxMaxWaitTimePerRecordForTableData

            override fun close() = Unit
        }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streams.common.kafka.extensions | |
import org.apache.avro.specific.SpecificRecord | |
import org.apache.kafka.streams.kstream.KStream | |
import org.apache.kafka.streams.kstream.KTable | |
import org.apache.kafka.streams.kstream.ValueJoiner | |
import streams.common.kafka.TopicManager | |
import streams.common.kafka.transformers.GuaranteedStreamSideJoinTransformer | |
import streams.common.kafka.transformers.GuaranteedTableSideJoinTransformer | |
/**
 * Joins this stream against [ktable] so that stream records arriving before the table has data for their key are
 * buffered rather than dropped (a "guaranteed" KStream–KTable join).
 *
 * Wiring, as written below:
 *  - a buffer store named `guaranteed_buffer_for_<table store name>` is requested from [topicManager];
 *  - the stream side runs [GuaranteedStreamSideJoinTransformer] connected to the buffer and the table's store;
 *  - the table side runs [GuaranteedTableSideJoinTransformer] connected to the buffer. Its null (non-join) results
 *    are filtered out after `toStream()`;
 *  - the two resulting streams are then merged.
 *
 * NOTE(review): this function is unfinished and does not compile as written:
 *  - `joined` is never returned, although the declared return type is `KStream<K, V>`;
 *  - `merge` fails to type-check: the stream side yields `KeyValue<K, V>` values, the table side `KeyValue<K, VO>`;
 *  - [joiner] is never used. Its type `ValueJoiner<K, V, K>` also looks off for a stream-table join, where one
 *    would expect something like `ValueJoiner<V, VO, VR>`. TODO confirm the intended signature.
 */
fun <K, V: SpecificRecord, VO> KStream<K, V>.guaranteedJoin(ktable: KTable<K, VO>, joiner: ValueJoiner<K, V, K>, topicManager: TopicManager): KStream<K, V> { | |
// Name the buffer store after the table's queryable store | |
// NOTE(review): KTable.queryableStoreName() may return null for a table that is not materialized/queryable,
// which would produce "guaranteed_buffer_for_null" here. Confirm ktable is always materialized.
val streamBufferName = "guaranteed_buffer_for_${ktable.queryableStoreName()}" | |
// Ensure that the backing store exists | |
// NOTE(review): the return value is discarded, so this presumably registers/creates the store as a side effect.
// Confirm the store is also added to the topology so transform/transformValues can connect to it by name.
topicManager.getStoreForGuaranteedJoins<V>(streamBufferName) | |
// Stream side: connected to both the buffer store and the table's own store.
val transformedStream = this.transform( | |
GuaranteedStreamSideJoinTransformer(streamBufferName, ktable.queryableStoreName()), | |
streamBufferName, | |
ktable.queryableStoreName()) | |
// Table side: connected to the buffer store only; null results (no join) are dropped after toStream().
val transformedTable = ktable.transformValues( | |
GuaranteedTableSideJoinTransformer(streamBufferName), streamBufferName) | |
.toStream() | |
.filter { _, value -> value != null } | |
val joined = transformedStream | |
// This line fails to compile with a type mismatch: merge() expects a KStream<K, KeyValue<K, V>>, | |
// but transformedTable is a KStream<K, KeyValue<K, VO>>. | |
.merge(transformedTable) | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.