Created January 29, 2020 05:34
-
-
Save rionmonster/8a216817c101e5cd1eb55572eaddfb30 to your computer and use it in GitHub Desktop.
Users Preprocessing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point of the final event-enrichment stage.
 *
 * Reads raw events from [Topics.RAW_EVENTS] and left-joins them against the per-event arrays of enriched users
 * ([Topics.PROCESSING_USERS]) and enriched IPs ([Topics.PROCESSING_IPS]). Records that survive both joins are
 * re-keyed by [EventProcessor.mapToFinalEventAndRekey] and written to [Topics.FINAL_EVENTS].
 */
object EventProcessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        // Merges the enrichment arrays into an event and re-keys the finished event
        val eventProcessor = EventProcessor()

        // The raw events that originated everything
        val events = streamsManager.createKStream<RawEvent>(Topics.RAW_EVENTS)

        // Arrays of enriched entities produced by the upstream processing stages
        val users = streamsManager.createArrayKTable<ProcessingUser>(Topics.PROCESSING_USERS)
        val ips = streamsManager.createArrayKTable<ProcessingIP>(Topics.PROCESSING_IPS)

        // Append every enrichment created by the pipeline to the raw event.
        // NOTE(review): `events` carries RawEvent values, but the first joiner is declared as
        // ValueJoiner<FinalEvent, ...>. This only type-checks if RawEvent is a subtype of FinalEvent — confirm.
        val enrichedEvents = events
            .leftJoin(users, ValueJoiner<FinalEvent, ArrayList<ProcessingUser>, FinalEvent>(eventProcessor::enrichWithUsers))
            // Drop records the joiner returned as null. The intent is to keep incomplete joins from
            // proceeding; the joiner presumably returns null when enrichment is missing (confirm in EventProcessor).
            .filter { _, event -> event != null }
            .leftJoin(ips, ValueJoiner<FinalEvent, ArrayList<ProcessingIP>, FinalEvent>(eventProcessor::enrichWithIps))
            .filter { _, event -> event != null }

        // Anything that made it through all of the enrichments is re-keyed and pushed to the final topic.
        // Preferably this would only be hit once per event (if possible).
        enrichedEvents
            .filter { key, event -> key != null && event != null }
            .map { _, event -> eventProcessor.mapToFinalEventAndRekey(event) }
            .to(Topics.FINAL_EVENTS, Produced.with(Serdes.String, Serdes.getSerde<FinalEvent>()))

        streamsManager.execute()
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point of the user pre-processing stage.
 *
 * Extracts 0..N candidate users from every raw event via [UserPreprocessor.extractUsers] and routes each
 * resulting array by whether it is empty:
 *  - non-empty arrays go to [Topics.PREPROCESSING_USERS], where the user enrichment stage picks them up;
 *  - empty arrays go directly to [Topics.PROCESSING_USERS], so event enrichment still receives a user array.
 */
object UserPreprocessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        // Business logic that identifies / extracts the users referenced by an event
        val preprocessor = UserPreprocessor()

        // One array of user candidates per raw event, keyed the same way as the event
        val candidatesByEvent = streamsManager
            .createKStream<RawEvent>(Topics.RAW_EVENTS)
            .mapValues(preprocessor::extractUsers)

        // Serialization config for both sinks: String keys, array-of-ProcessingUser values
        fun userArrayOutput() = Produced.with(Serdes.String, Serdes.getArraySerde<ProcessingUser>())

        // At least one candidate found: hand the array to the user enrichment stage
        candidatesByEvent
            .filterNot { _, candidates -> candidates.isEmpty() }
            .to(Topics.PREPROCESSING_USERS, userArrayOutput())

        // Nothing found: skip user enrichment and go straight to the topic used for event enrichment
        candidatesByEvent
            .filter { _, candidates -> candidates.isEmpty() }
            .to(Topics.PROCESSING_USERS, userArrayOutput())

        streamsManager.execute()
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point of the user processing (enrichment) stage.
 *
 * Splits each per-event array of user candidates from [Topics.PREPROCESSING_USERS] and enriches every candidate
 * individually against the known users in the [Topics.FINAL_USERS] global table. The enriched candidates are then:
 *  - re-assembled per event key and published as an array to [Topics.PROCESSING_USERS] for event enrichment;
 *  - converted and re-keyed via [UserProcessor.mapToUserAndRekey] and published to [Topics.FINAL_USERS], which
 *    updates the global table and any sink connectors reading that topic.
 */
object UserProcessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        // GlobalKTable of all known users, used to enrich candidates (new users arrive via FINAL_USERS below)
        val users = streamsManager.createGlobalKTable<User>(Topics.FINAL_USERS)
        // Per-event arrays of user "candidates" extracted from raw events, awaiting creation/enrichment
        val processingUsers = streamsManager.createArrayKStream<ProcessingUser>(Topics.PREPROCESSING_USERS)
        // Business logic for looking up, enriching and re-keying users
        val processor = UserProcessor()

        // Flatten each array (keeping the event key) so every candidate is enriched on its own
        val enrichedUsers = processingUsers
            .flatMapValues { _, candidates -> candidates }
            .leftJoin(
                users,
                // Global-table lookup key derived from the candidate's user data
                KeyValueMapper { _, candidate -> processor.extractKey(candidate.user) },
                ValueJoiner<ProcessingUser, User, ProcessingUser>(processor::enrichWithExistingUser)
            )

        // Re-assemble the enriched users per event key and send the array to processing_users for event enrichment.
        // NOTE(review): aggregate() forwards an updated array per input record (subject to record caching), so
        // downstream consumers may observe a partially assembled array.
        enrichedUsers
            .groupByKey(Grouped.with(Serdes.String, Serdes.getSerde<ProcessingUser>()))
            .aggregate(
                { ArrayList() },
                { _, user, aggregatedUsers ->
                    // Keep each enriched user at most once (equality as defined by ProcessingUser.equals)
                    if (user !in aggregatedUsers) {
                        aggregatedUsers.add(user)
                    }
                    aggregatedUsers
                },
                streamsManager.materializeTo("enriched_users", Serdes.getArraySerde<ProcessingUser>())
            )
            .toStream()
            .to(Topics.PROCESSING_USERS, Produced.with(Serdes.String, Serdes.getArraySerde<ProcessingUser>()))

        // Send the latest version of each enriched entity to final_users. That topic feeds the global table above
        // and any associated sink connectors.
        enrichedUsers
            .map { _, value -> processor.mapToUserAndRekey(value) }
            .filter { _, user -> user != null }
            .to(Topics.FINAL_USERS, Produced.with(Serdes.String, Serdes.getSerde<User>()))

        streamsManager.execute()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.