Skip to content

Instantly share code, notes, and snippets.

@pavelfomin
Last active February 27, 2025 22:43
Show Gist options
  • Save pavelfomin/56e2f53e8279cdbe8973cc22988074df to your computer and use it in GitHub Desktop.
Save pavelfomin/56e2f53e8279cdbe8973cc22988074df to your computer and use it in GitHub Desktop.
Use Multiple Avro Schemas in the same Kafka topic

Avro does not support inheritance but some of the reuse can still be accomplished using composition. For example, instead of using the same Event and keep adding mutually exclusive optional fields for BlueEvent and GreenEvent, composition can be used to create two separate events, BlueEvent and GreenEvent.

A consumer could then consume either one or both. To register multiple schemas in the schema registry for the same topic, TopicRecordNameStrategy subject name strategy should be used.

See events, consumer and relevant Kafka configuration attached.

spring:
kafka:
properties:
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
import com.droidablebee.avro.BlueEvent;
import com.droidablebee.avro.GreenEvent;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
@Service
public class EventConsumer {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@KafkaListener(topics = "${app.kafka.test-topic}")
public void consume(ConsumerRecord<String, SpecificRecord> consumerRecord, MessageHeaders headers) {
logger.debug("Received message: key: {} value: {}", consumerRecord.key(), consumerRecord.value());
if (consumerRecord.value() instanceof BlueEvent) {
BlueEvent blueEvent = (BlueEvent) consumerRecord.value();
} else if (consumerRecord.value() instanceof GreenEvent) {
GreenEvent greenEvent = (GreenEvent) consumerRecord.value();
} else {
handleUnknownMessage(consumerRecord);
}
}
protected void handleUnknownMessage(ConsumerRecord<String, SpecificRecord> consumerRecord) {
logger.warn("Ignoring received message with key: {} value class: {}", consumerRecord.key(), consumerRecord.value().getClass());
}
}
@namespace("com.droidablebee.avro")
protocol BlueEventProtocol {
import idl "Event.avdl";
record BlueEvent {
Event event;
string? blueSpecificField;
}
}
@namespace("com.droidablebee.avro")
protocol EventProtocol {
record Event {
uuid id;
string description;
}
}
@namespace("com.droidablebee.avro")
protocol GreenEventProtocol {
import idl "Event.avdl";
record GreenEvent {
Event event;
string? greenSpecificField;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment