Skip to content

Instantly share code, notes, and snippets.

@markpollack
Last active August 29, 2015 14:11
Show Gist options
  • Save markpollack/f6f98e1ec75396053ab0 to your computer and use it in GitHub Desktop.
Save markpollack/f6f98e1ec75396053ab0 to your computer and use it in GitHub Desktop.
Time window average with some anemic domain objects
/**
 * Turns a stream of {@link SensorData} readings into a stream of
 * {@link SensorSummary} averages, one summary per buffered window.
 *
 * <p>Two variants are kept side by side: {@link #process(Stream)} reads the
 * averaged property reflectively through a SpEL expression, while
 * {@link #processOld(Stream)} calls {@code SensorData.getValue()} directly.
 */
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> {

    /**
     * Shared SpEL parser, reused across calls instead of constructing a new
     * parser for every buffered list (Spring documents parser instances as
     * reusable and thread-safe).
     */
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    /**
     * Windows the input, buffers each window into a list and reduces that list
     * to a single summary holding the average of the {@code "value"} property
     * and the wall-clock time at which the summary was created.
     *
     * @param inputStream the incoming sensor readings
     * @return a stream of per-window summaries
     */
    @Override
    public Stream<SensorSummary> process(Stream<SensorData> inputStream) {
        return inputStream
                // NOTE(review): the meaning of (5, 20) depends on which window(...) overload
                // the stream library resolves here -- confirm against the library version in use.
                .window(5, 20, TimeUnit.SECONDS)
                //would be better to convert to stream of double 'values' and then have generic avg for type safety.
                .flatMap(sensorDataStream ->
                        sensorDataStream.buffer().reduce(tuple2 ->
                                new SensorSummary(avg(tuple2.getT1(), "value"), System.currentTimeMillis())
                        ));
    }

    /**
     * Computes the arithmetic mean of one property across a list of objects.
     *
     * @param objectList   the objects to read the property from
     * @param propertyName SpEL expression evaluated against each element and
     *                     converted to {@code Double}
     * @return the mean of the evaluated values, or {@code NaN} when
     *         {@code objectList} is empty
     * @throws NullPointerException if the expression evaluates to {@code null}
     *                              for any element
     */
    private double avg(List<?> objectList, String propertyName) {
        // Parse before the emptiness check so an invalid expression is still reported
        // even when there is nothing to average.
        Expression exp = PARSER.parseExpression(propertyName);
        if (objectList.isEmpty()) {
            // Same result the unguarded 0.0 / 0.0 division produces, made explicit.
            return Double.NaN;
        }
        double sum = 0;
        for (Object obj : objectList) {
            Double value = exp.getValue(obj, Double.class);
            if (value == null) {
                // Fail with context rather than an anonymous NPE from auto-unboxing.
                throw new NullPointerException(
                        "Property '" + propertyName + "' evaluated to null on " + obj);
            }
            sum += value;
        }
        return sum / objectList.size();
    }

    //older stuff with specific domain objects used in calculation....

    /**
     * Older variant of {@link #process(Stream)} that averages through
     * {@code SensorData.getValue()} instead of a SpEL expression.
     *
     * @param inputStream the incoming sensor readings
     * @return a stream of per-window summaries
     */
    public Stream<SensorSummary> processOld(Stream<SensorData> inputStream) {
        return inputStream
                .window(5, 20, TimeUnit.SECONDS)
                .flatMap(sensorDataStream -> sensorDataStream.buffer().reduce(getAverage()));
    }

    /**
     * Builds the reduce function used by {@link #processOld(Stream)}: it takes
     * the list held in the tuple's first slot and returns a summary of its
     * average value. The tuple's second slot is not read.
     *
     * @return a function mapping a (readings, summary) tuple to a new summary
     * @throws ArithmeticException (from the returned function) if the list of
     *                             readings is empty
     */
    private Function<Tuple2<List<SensorData>, SensorSummary>, SensorSummary> getAverage() {
        return listSensorSummaryTuple2 -> {
            final List<SensorData> sensorDataList = listSensorSummaryTuple2.getT1();
            if (sensorDataList.isEmpty()) {
                // Same exception type as the bare "/ by zero", but says what went wrong.
                throw new ArithmeticException("Cannot average an empty list of sensor readings");
            }
            // NOTE(review): the sum and the division are done in long arithmetic, so the
            // average is truncated toward zero; if getValue() is floating-point, fractions
            // are also dropped on each addition. Left as-is because the parameter type of
            // the single-argument SensorSummary constructor is not visible here -- confirm.
            long sum = 0;
            for (SensorData sensorData : sensorDataList) {
                sum += sensorData.getValue();
            }
            return new SensorSummary(sum / sensorDataList.size());
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment