Last active
August 29, 2015 14:11
-
-
Save markpollack/f6f98e1ec75396053ab0 to your computer and use it in GitHub Desktop.
Time window average with some anemic domain objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> { | |
@Override | |
public Stream<SensorSummary> process(Stream<SensorData> inputStream) { | |
return inputStream | |
.window(5, 20, TimeUnit.SECONDS) | |
//would be better to convert to stream of double 'values' and then have generic avg for type safety. | |
.flatMap(sensorDataStream -> | |
sensorDataStream.buffer().reduce(tuple2 -> | |
new SensorSummary(avg(tuple2.getT1(), "value"), System.currentTimeMillis()) | |
)); | |
} | |
private double avg(List<?> objectList, String propertyName) { | |
ExpressionParser parser = new SpelExpressionParser(); | |
Expression exp = parser.parseExpression(propertyName); | |
double sum = 0; | |
double count = 0; | |
for(Object obj : objectList) { | |
count++; | |
sum += exp.getValue(obj, Double.class); | |
} | |
return sum/count; | |
} | |
//older stuff with specific domain objects used in calculation.... | |
public Stream<SensorSummary> processOld(Stream<SensorData> inputStream) { | |
return inputStream | |
.window(5, 20, TimeUnit.SECONDS) | |
.flatMap(sensorDataStream -> sensorDataStream.buffer().reduce(getAverage())); | |
} | |
private Function<Tuple2<List<SensorData>, SensorSummary>, SensorSummary> getAverage() { | |
return listSensorSummaryTuple2 -> { | |
final List<SensorData> sensorDataList = listSensorSummaryTuple2.getT1(); | |
long sum = 0; | |
long count = 0; | |
for (SensorData sensorData : sensorDataList) { | |
sum += sensorData.getValue(); | |
count++; | |
} | |
return new SensorSummary(sum / count); | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment