Created
June 9, 2019 09:14
-
-
Save gxercavins/46e938a358db3826be3d0bf53dc8726c to your computer and use it in GitHub Desktop.
StackOverflow question 56465103
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.beam.sdk.transforms.windowing; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Objects; | |
import javax.annotation.Nullable; | |
import org.apache.beam.sdk.coders.Coder; | |
import org.apache.beam.sdk.transforms.display.DisplayData; | |
import org.apache.beam.sdk.values.KV; | |
import org.joda.time.Duration; | |
/** | |
* A {@link WindowFn} that windows values into sessions separated by periods with no input for at | |
* least the duration specified by {@link #getGapDuration()} or until it fins a stopValue given by | |
* {@link #withStopValue()}. | |
* | |
* <p>For example, in order to window data into session with at least 10 minute gaps in between | |
* them and use 0 as the value where we want to close the window: | |
* | |
* <pre>{@code | |
* PCollection<Integer> pc = ...; | |
* PCollection<Integer> windowed_pc = pc.apply( | |
* Window.<Integer>into(StopSessions.withGapDuration(Duration.standardMinutes(10).withStopValue(0)))); | |
* }</pre> | |
*/ | |
public class StopSessions extends WindowFn<KV<String,Integer>, IntervalWindow> { | |
/** Duration of the gaps between sessions. */ | |
private final Duration gapDuration; | |
/** Value that closes the session. */ | |
private final Integer stopValue; | |
/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration. */ | |
public static StopSessions withGapDuration(Duration gapDuration) { | |
return new StopSessions(gapDuration, 0); | |
} | |
/** Creates a {@code StopSessions} {@link WindowFn} with the specified stop value. */ | |
public StopSessions withStopValue(Integer stopValue) { | |
return new StopSessions(gapDuration, stopValue); | |
} | |
/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration and stop value. */ | |
private StopSessions(Duration gapDuration, Integer stopValue) { | |
this.gapDuration = gapDuration; | |
this.stopValue = stopValue; | |
} | |
@Override | |
public Collection<IntervalWindow> assignWindows(AssignContext c) { | |
// Assign each element into a window from its timestamp until gapDuration in the | |
// future. Overlapping windows (representing elements within gapDuration of | |
// each other) will be merged. If a stop signal is encountered the element will | |
// be windowed into a minimum window of 1 ms | |
Duration newGap = c.element().getValue().equals(this.stopValue) ? new Duration(1) : gapDuration; | |
return Arrays.asList(new IntervalWindow(c.timestamp(), newGap)); | |
} | |
@Override | |
public Coder<IntervalWindow> windowCoder() { | |
return IntervalWindow.getCoder(); | |
} | |
@Override | |
public boolean isCompatible(WindowFn<?, ?> other) { | |
return other instanceof StopSessions; | |
} | |
@Override | |
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { | |
if (!this.isCompatible(other)) { | |
throw new IncompatibleWindowException( | |
other, | |
String.format( | |
"%s is only compatible with %s.", | |
StopSessions.class.getSimpleName(), StopSessions.class.getSimpleName())); | |
} | |
} | |
@Override | |
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { | |
throw new UnsupportedOperationException("StopSessions is not allowed in side inputs"); | |
} | |
public Duration getGapDuration() { | |
return gapDuration; | |
} | |
public Integer getStopValue() { | |
return stopValue; | |
} | |
@Override | |
public void populateDisplayData(DisplayData.Builder builder) { | |
super.populateDisplayData(builder); | |
builder.add(DisplayData.item("gapDuration", gapDuration).withLabel("Session Gap Duration")); | |
builder.add(DisplayData.item("stopValue", stopValue).withLabel("Stop Value that closes Session")); | |
} | |
@Override | |
public boolean equals(Object object) { | |
if (!(object instanceof StopSessions)) { | |
return false; | |
} | |
StopSessions other = (StopSessions) object; | |
return getGapDuration().equals(other.getGapDuration()); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(gapDuration); | |
} | |
/** Merge overlapping {@link IntervalWindow}s. */ | |
@Override | |
public void mergeWindows(MergeContext c) throws Exception { | |
// Merge any overlapping windows into a single window. | |
// Sort the list of existing windows so we only have to | |
// traverse the list once rather than considering all | |
// O(n^2) window pairs. | |
List<IntervalWindow> sortedWindows = new ArrayList<>(); | |
for (IntervalWindow window : c.windows()) { | |
sortedWindows.add(window); | |
} | |
Collections.sort(sortedWindows); | |
List<MergeCandidate> merges = new ArrayList<>(); | |
MergeCandidate current = new MergeCandidate(); | |
for (IntervalWindow window : sortedWindows) { | |
// get window duration and check if it's a stop session request | |
Long windowDuration = new Duration(window.start(), window.end()).getMillis(); | |
if (current.intersects(window) && !windowDuration.equals(1L)) { | |
current.add(window); | |
} else { | |
merges.add(current); | |
current = new MergeCandidate(window); | |
} | |
} | |
merges.add(current); | |
for (MergeCandidate merge : merges) { | |
merge.apply(c); | |
} | |
} | |
private static class MergeCandidate { | |
@Nullable private IntervalWindow union; | |
private final List<IntervalWindow> parts; | |
public MergeCandidate() { | |
union = null; | |
parts = new ArrayList<>(); | |
} | |
public MergeCandidate(IntervalWindow window) { | |
union = window; | |
parts = new ArrayList<>(Arrays.asList(window)); | |
} | |
public boolean intersects(IntervalWindow window) { | |
return union == null || union.intersects(window); | |
} | |
public void add(IntervalWindow window) { | |
union = union == null ? window : union.span(window); | |
parts.add(window); | |
} | |
public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception { | |
if (parts.size() > 1) { | |
c.merge(parts, union); | |
} | |
} | |
@Override | |
public String toString() { | |
return "MergeCandidate[union=" + union + ", parts=" + parts + "]"; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dataflow.samples; | |
import java.util.Arrays; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.options.Description; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.options.Validation.Required; | |
import org.apache.beam.sdk.transforms.Create; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.transforms.GroupByKey; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import org.apache.beam.sdk.transforms.Sum; | |
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; | |
import org.apache.beam.sdk.transforms.windowing.StopSessions; // custom one | |
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.TimestampedValue; | |
import org.joda.time.Duration; | |
import org.joda.time.Instant; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class StopSessionsExample { | |
private static final Logger LOG = LoggerFactory.getLogger(StopSessionsExample.class); | |
public static void main(String[] args) { | |
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); | |
Pipeline p = Pipeline.create(options); | |
p | |
.apply("Create data", Create.timestamped( | |
TimestampedValue.of(KV.of("k1", 0), new Instant()), // <t0, k1, 0> | |
TimestampedValue.of(KV.of("k1",98), new Instant().plus(1000)), // <t1, k1, 98> | |
TimestampedValue.of(KV.of("k1",145), new Instant().plus(2000)), // <t2, k1, 145> | |
TimestampedValue.of(KV.of("k1",0), new Instant().plus(4000)), // <t4, k1, 0> | |
TimestampedValue.of(KV.of("k1",350), new Instant().plus(3000)), // <t3, k1, 350> | |
TimestampedValue.of(KV.of("k1",40), new Instant().plus(5000)), // <t5, k1, 40> | |
TimestampedValue.of(KV.of("k1",65), new Instant().plus(6000)), // <t6, k1, 65> | |
TimestampedValue.of(KV.of("k1",120), new Instant().plus(7000)), // <t7, k1, 120> | |
TimestampedValue.of(KV.of("k1",240), new Instant().plus(8000)), // <t8, k1, 240> | |
TimestampedValue.of(KV.of("k1",352), new Instant().plus(9000)))) // <t9, k1, 352> | |
.apply("Window into StopSessions", Window.<KV<String, Integer>>into(StopSessions | |
.withGapDuration(Duration.standardSeconds(10)) | |
.withStopValue(0)) | |
.triggering(AfterWatermark.pastEndOfWindow()) | |
.withAllowedLateness(Duration.ZERO) | |
.discardingFiredPanes()) | |
.apply("Group By Key", GroupByKey.<String, Integer>create()) | |
.apply("Log results", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, Void>() { | |
@ProcessElement | |
public void processElement(ProcessContext c, BoundedWindow window) { | |
String user = c.element().getKey(); | |
Iterable<Integer> scores = c.element().getValue(); | |
StringBuilder values = new StringBuilder(); | |
scores.forEach(value->values.append(value + ",")); | |
LOG.info(String.format("user=%s, scores=%s, window=%s", user, "[" + values.substring(0, values.length() - 1) + "]", window.toString())); | |
} | |
})); | |
p.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment