Flink-Kafka-Avro
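This gist contains four Java sources showing how an Apache Beam pipeline on the Flink runner can consume and produce Avro-encoded messages on Kafka (via the FlinkKafkaConsumer08/FlinkKafkaProducer08 connectors), using hand-rolled DeserializationSchema/SerializationSchema wrappers around Avro's SpecificDatumReader/SpecificDatumWriter. The BeamKafkaOptions class the code references is not included in the gist; a hypothetical sketch is given after the consumer below. Assuming PipelineOptionsFactory's usual camelCase mapping of option names (an assumption, since BeamKafkaOptions is missing), a run might look like:

java com.xyz.topology.netflow.beam.BeamKafkaFlinkAvroConsumerTest \
    --broker=localhost:9092 --zookeeper=localhost:2181 --group=myGroup --windowSize=10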
BeamKafkaFlinkAvroConsumerTest.java
package com.xyz.topology.netflow.beam;

import java.util.Properties;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.joda.time.Duration;

import com.xyz.schemas.Test;
public class BeamKafkaFlinkAvroConsumerTest {

    private static final String TOPIC = "topic3";
    private static BeamKafkaOptions options;
    private static Properties props = new Properties();

    public static void setup(String[] args) {
        PipelineOptionsFactory.register(BeamKafkaOptions.class);
        options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        // options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
        // options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
        options.setKafkaTopic(TOPIC);
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);
        System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
                + options.getGroup());
        props.setProperty("zookeeper.connect", options.getZookeeper());
        props.setProperty("bootstrap.servers", options.getBroker());
        props.setProperty("group.id", options.getGroup());
    }
    public static UnboundedSource<Test, CheckpointMark> consumeMessages() {
        // Decode raw Avro binary payloads into Test records.
        AvroDeserializationSchema<Test> schema = new AvroDeserializationSchema<>(Test.class);
        // Alternative: Flink's type-information based schema.
        // TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
        // TypeInformationSerializationSchema<Test> schema =
        //         new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
        FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);
        return UnboundedFlinkSource.of(kafkaConsumer);
    }
    public static void main(String[] args) {
        setup(args);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<Test> users = pipeline.apply(
                Read.named("StreamingWordCount").from(consumeMessages()))
                .apply(Window.<Test> into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
                        .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes());
        // users.apply(ConsoleIO.Write.create());
        PCollection<Long> counts = users.apply(Count.globally());
        // .apply(ConsoleIO.Write.create());
        // .apply(TextIO.Write.to("outputKafka.txt"));
        // Note: this prints the PCollection reference, not its elements.
        System.out.println("***************** " + counts);
        PipelineResult result = pipeline.run();
        System.out.println("***************** " + result.toString());
    }
}
/**
 * Flink DeserializationSchema that decodes Avro-encoded bytes into T, using a
 * SpecificDatumReader for generated Avro classes and a ReflectDatumReader otherwise.
 */
class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    // The reader is transient, so rebuild it lazily after the schema object
    // itself has been shipped to and deserialized on the task managers.
    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}
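The BeamKafkaOptions interface used above is not part of the gist. A minimal sketch of what it plausibly looks like, inferred from the getters and setters the two classes call; the option names, defaults, and descriptions here are guesses, not from the source (setStreaming, setCheckpointingInterval, setNumberOfExecutionRetries, and setExecutionRetryDelay come from FlinkPipelineOptions):

// Hypothetical sketch; the gist does not include this file.
package com.xyz.topology.netflow.beam;

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

public interface BeamKafkaOptions extends FlinkPipelineOptions {

    @Description("Kafka topic to read from / write to")
    @Default.String("topic3")
    String getKafkaTopic();
    void setKafkaTopic(String value);

    @Description("Zookeeper connect string")
    @Default.String("localhost:2181")
    String getZookeeper();
    void setZookeeper(String value);

    @Description("Kafka broker list")
    @Default.String("localhost:9092")
    String getBroker();
    void setBroker(String value);

    @Description("Kafka consumer group id")
    @Default.String("myGroup")
    String getGroup();
    void setGroup(String value);

    @Description("Fixed window size in seconds")
    @Default.Integer(10)
    Integer getWindowSize();
    void setWindowSize(Integer value);
}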
BeamKafkaFlinkAvroProducerTest.java
package com.xyz.topology.netflow.beam;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

import com.xyz.schemas.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * @author kaniska
 */
public class BeamKafkaFlinkAvroProducerTest {

    private static final String TOPIC = "topic3";
    private static BeamKafkaOptions options;
    private static Properties props = new Properties();

    public static void setup(String[] args) {
        PipelineOptionsFactory.register(BeamKafkaOptions.class);
        options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        // options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
        // options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
        options.setKafkaTopic(TOPIC);
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);
        System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
                + options.getGroup());
        props.setProperty("zookeeper.connect", options.getZookeeper());
        props.setProperty("bootstrap.servers", options.getBroker());
        props.setProperty("group.id", options.getGroup());
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
    }
    // Unused alternative that publishes hand-written User objects. Note that the
    // User class below is only a stub, so this path cannot serialize successfully as-is.
    private static void produceData1() {
        FlinkKafkaProducer08<User> kafkaSink =
                new FlinkKafkaProducer08<>(TOPIC, new AvroSerializationSchema<>(User.class), props);
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of(
                        new User("Joe", 3, "red"),
                        new User("Mary", 4, "blue"),
                        new User("Mark", 1, "green"),
                        new User("Julia", 5, "purple")))
                // .withCoder(AvroCoder.of(User.class)))
                .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
        pipeline.run();
    }
    private static void produceAvroData2() {
        // Alternative: Flink's type-information based schema.
        // TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
        // TypeInformationSerializationSchema<Test> schema =
        //         new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
        AvroSerializationSchema<Test> schema = new AvroSerializationSchema<>(Test.class);
        FlinkKafkaProducer08<Test> kafkaSink =
                new FlinkKafkaProducer08<>(TOPIC, schema, props);
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of(
                        new Test("Joe", 6))
                        .withCoder(AvroCoder.of(Test.class)))
                .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
        pipeline.run();
    }
    private static void produceSimpleData() throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> kafkaProducer = new Producer<String, byte[]>(config);
        Test test = new Test("Don", 32);
        // Serialize the record in Avro binary format.
        // DatumWriter<NetFlowPkt> datumWriter = new SpecificDatumWriter<NetFlowPkt>(NetFlowPkt.class);
        DatumWriter<Test> datumWriter = new SpecificDatumWriter<Test>(Test.class);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        datumWriter.write(test, encoder);
        encoder.flush();
        byte[] serializedBytes = out.toByteArray();
        out.close();
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("topic3",
                serializedBytes);
        kafkaProducer.send(message);
        kafkaProducer.close();
    }
    public static void main(String[] args) {
        setup(args);
        try {
            // produceSimpleData();
            produceAvroData2();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
    public void testDeSerialization() {
        try {
            TypeInformation<User> info = TypeExtractor.getForClass(User.class);
            TypeInformationSerializationSchema<User> schema =
                    new TypeInformationSerializationSchema<User>(info, new ExecutionConfig());
            User[] types = {
                new User(72, new Date(763784523L), new Date(88234L)),
                new User(-1, new Date(11111111111111L)),
                new User(42),
                new User(17, new Date(222763784523L))
            };
            for (User val : types) {
                byte[] serialized = schema.serialize(val);
                User deser = schema.deserialize(serialized);
                assertEquals(val, deser);
            }
        } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }
    **/
    /**
     * Flink SerializationSchema that encodes T to Avro binary, mirroring
     * AvroDeserializationSchema on the consumer side.
     */
    private static class AvroSerializationSchema<T> implements SerializationSchema<T> {

        private final Class<T> avroType;
        private transient DatumWriter<T> writer;
        private transient BinaryEncoder encoder;

        public AvroSerializationSchema(Class<T> avroType) {
            this.avroType = avroType;
        }

        @Override
        public byte[] serialize(T element) {
            ensureInitialized();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            encoder = EncoderFactory.get().binaryEncoder(out, null);
            byte[] serializedBytes = null;
            try {
                writer.write(element, encoder);
                encoder.flush();
                serializedBytes = out.toByteArray();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return serializedBytes;
        }

        // The writer is transient, so rebuild it lazily after the schema object
        // itself has been shipped to and deserialized on the task managers.
        private void ensureInitialized() {
            if (writer == null) {
                if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                    writer = new SpecificDatumWriter<T>(avroType);
                    // For GenericRecord payloads one would instead use
                    // new GenericDatumWriter(((GenericRecord) element).getSchema()).
                } else {
                    writer = new ReflectDatumWriter<T>(avroType);
                }
            }
        }
    }
}
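Since AvroSerializationSchema and AvroDeserializationSchema are mirror images, a quick local round-trip check is possible. A sketch (hypothetical, and it assumes AvroSerializationSchema is made accessible rather than kept as a private nested class):

// Hypothetical round-trip test; not part of the gist.
Test original = Test.newBuilder().setUname("Don").setId(32).build();
byte[] bytes = new AvroSerializationSchema<>(Test.class).serialize(original);
Test decoded = new AvroDeserializationSchema<>(Test.class).deserialize(bytes);
System.out.println(decoded.getUname() + " / " + decoded.getId()); // Don / 32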
Test.java
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package com.xyz.schemas;

import java.io.Serializable;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Test extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord, Serializable {
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.xyz.schemas\",\"fields\":[{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}");

    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

    @Deprecated public java.lang.CharSequence uname;
    @Deprecated public int id;

    /**
     * Default constructor. Note that this does not initialize fields
     * to their default values from the schema. If that is desired then
     * one should use <code>newBuilder()</code>.
     */
    public Test() {}

    /**
     * All-args constructor.
     */
    public Test(java.lang.CharSequence uname, java.lang.Integer id) {
        this.uname = uname;
        this.id = id;
    }

    public org.apache.avro.Schema getSchema() { return SCHEMA$; }

    // Used by DatumWriter. Applications should not call.
    public java.lang.Object get(int field$) {
        switch (field$) {
            case 0: return uname;
            case 1: return id;
            default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
    }

    // Used by DatumReader. Applications should not call.
    @SuppressWarnings(value="unchecked")
    public void put(int field$, java.lang.Object value$) {
        switch (field$) {
            case 0: uname = (java.lang.CharSequence)value$; break;
            case 1: id = (java.lang.Integer)value$; break;
            default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
    }

    /**
     * Gets the value of the 'uname' field.
     */
    public java.lang.CharSequence getUname() {
        return uname;
    }

    /**
     * Sets the value of the 'uname' field.
     * @param value the value to set.
     */
    public void setUname(java.lang.CharSequence value) {
        this.uname = value;
    }

    /**
     * Gets the value of the 'id' field.
     */
    public java.lang.Integer getId() {
        return id;
    }

    /**
     * Sets the value of the 'id' field.
     * @param value the value to set.
     */
    public void setId(java.lang.Integer value) {
        this.id = value;
    }

    /** Creates a new Test RecordBuilder */
    public static com.xyz.schemas.Test.Builder newBuilder() {
        return new com.xyz.schemas.Test.Builder();
    }

    /** Creates a new Test RecordBuilder by copying an existing Builder */
    public static com.xyz.schemas.Test.Builder newBuilder(com.xyz.schemas.Test.Builder other) {
        return new com.xyz.schemas.Test.Builder(other);
    }

    /** Creates a new Test RecordBuilder by copying an existing Test instance */
    public static com.xyz.schemas.Test.Builder newBuilder(com.xyz.schemas.Test other) {
        return new com.xyz.schemas.Test.Builder(other);
    }

    /**
     * RecordBuilder for Test instances.
     */
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Test>
            implements org.apache.avro.data.RecordBuilder<Test> {

        private java.lang.CharSequence uname;
        private int id;

        /** Creates a new Builder */
        private Builder() {
            super(com.xyz.schemas.Test.SCHEMA$);
        }

        /** Creates a Builder by copying an existing Builder */
        private Builder(com.xyz.schemas.Test.Builder other) {
            super(other);
            if (isValidValue(fields()[0], other.uname)) {
                this.uname = data().deepCopy(fields()[0].schema(), other.uname);
                fieldSetFlags()[0] = true;
            }
            if (isValidValue(fields()[1], other.id)) {
                this.id = data().deepCopy(fields()[1].schema(), other.id);
                fieldSetFlags()[1] = true;
            }
        }

        /** Creates a Builder by copying an existing Test instance */
        private Builder(com.xyz.schemas.Test other) {
            super(com.xyz.schemas.Test.SCHEMA$);
            if (isValidValue(fields()[0], other.uname)) {
                this.uname = data().deepCopy(fields()[0].schema(), other.uname);
                fieldSetFlags()[0] = true;
            }
            if (isValidValue(fields()[1], other.id)) {
                this.id = data().deepCopy(fields()[1].schema(), other.id);
                fieldSetFlags()[1] = true;
            }
        }

        /** Gets the value of the 'uname' field */
        public java.lang.CharSequence getUname() {
            return uname;
        }

        /** Sets the value of the 'uname' field */
        public com.xyz.schemas.Test.Builder setUname(java.lang.CharSequence value) {
            validate(fields()[0], value);
            this.uname = value;
            fieldSetFlags()[0] = true;
            return this;
        }

        /** Checks whether the 'uname' field has been set */
        public boolean hasUname() {
            return fieldSetFlags()[0];
        }

        /** Clears the value of the 'uname' field */
        public com.xyz.schemas.Test.Builder clearUname() {
            uname = null;
            fieldSetFlags()[0] = false;
            return this;
        }

        /** Gets the value of the 'id' field */
        public java.lang.Integer getId() {
            return id;
        }

        /** Sets the value of the 'id' field */
        public com.xyz.schemas.Test.Builder setId(int value) {
            validate(fields()[1], value);
            this.id = value;
            fieldSetFlags()[1] = true;
            return this;
        }

        /** Checks whether the 'id' field has been set */
        public boolean hasId() {
            return fieldSetFlags()[1];
        }

        /** Clears the value of the 'id' field */
        public com.xyz.schemas.Test.Builder clearId() {
            fieldSetFlags()[1] = false;
            return this;
        }

        @Override
        public Test build() {
            try {
                Test record = new Test();
                record.uname = fieldSetFlags()[0] ? this.uname : (java.lang.CharSequence) defaultValue(fields()[0]);
                record.id = fieldSetFlags()[1] ? this.id : (java.lang.Integer) defaultValue(fields()[1]);
                return record;
            } catch (Exception e) {
                throw new org.apache.avro.AvroRuntimeException(e);
            }
        }
    }
}
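For reference, the schema embedded in SCHEMA$ above, pretty-printed as the Test.avsc that the Avro compiler would have turned into this class:

{
  "type": "record",
  "name": "Test",
  "namespace": "com.xyz.schemas",
  "fields": [
    {"name": "uname", "type": "string"},
    {"name": "id", "type": "int"}
  ]
}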
User.java
package com.xyz.topology.netflow.beam;

import org.apache.avro.Schema;

/**
 * Hand-written placeholder for an Avro record. Note: getSchema() returns null and
 * get()/put() are no-ops, so Avro writers cannot actually serialize this class;
 * the pipeline's main path uses the generated Test class instead.
 */
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

    private String name;
    private int favoriteNumber;
    private String favoriteColor;

    public User() {
    }

    public User(String name, int favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
        this.favoriteColor = favoriteColor;
    }

    public String getName() {
        return name;
    }

    public String getFavoriteColor() {
        return favoriteColor;
    }

    public int getFavoriteNumber() {
        return favoriteNumber;
    }

    @Override
    public Schema getSchema() {
        // Stub: a real implementation must return the record's Avro schema.
        return null;
    }

    @Override
    public Object get(int field) {
        // Stub: a real implementation must return the field at this index.
        return null;
    }

    @Override
    public void put(int field, Object value) {
        // Stub: a real implementation must set the field at this index.
    }
}
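For User to round-trip through produceData1's AvroSerializationSchema, the three stub methods above would need real implementations. A minimal hand-written version for illustration (hypothetical, not from the gist; the inline schema string is an assumption matching the three fields):

// Hypothetical replacements for the stub methods; not part of the gist.
public static final Schema SCHEMA$ = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.xyz.topology.netflow.beam\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"favoriteNumber\",\"type\":\"int\"},"
        + "{\"name\":\"favoriteColor\",\"type\":\"string\"}]}");

@Override
public Schema getSchema() { return SCHEMA$; }

@Override
public Object get(int field) {
    switch (field) {
        case 0: return name;
        case 1: return favoriteNumber;
        case 2: return favoriteColor;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
}

@Override
public void put(int field, Object value) {
    switch (field) {
        case 0: name = value.toString(); break;
        case 1: favoriteNumber = (Integer) value; break;
        case 2: favoriteColor = value == null ? null : value.toString(); break;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
}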