Created
March 21, 2018 17:03
-
-
Save mhowlett/2cf6adeeb43f222a1d90e1e486dd00a1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Text; | |
using System.Collections.Generic; | |
using Confluent.Kafka; | |
using Confluent.Kafka.Serialization; | |
namespace Issue470
{
    /// <summary>
    /// Console program that subscribes a Kafka consumer to a set of topics with
    /// auto-commit disabled and logs every consumer event to the console.
    /// </summary>
    class Program
    {
        // Fallback connection settings used when no command-line arguments are given.
        private const string DefaultBrokerList = "10.200.7.144:9092";
        private const string DefaultTopic = "mytttt";

        /// <summary>
        /// Creates a consumer with auto-commit disabled, wires up logging handlers for
        /// its events, subscribes to <paramref name="topics"/> and polls until Ctrl-C
        /// is pressed.
        /// </summary>
        /// <param name="brokerList">Value passed as the "bootstrap.servers" setting.</param>
        /// <param name="topics">Topics to subscribe to.</param>
        public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
        {
            using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
            {
                // Note: All event handlers are called on the main thread.
                consumer.OnMessage += (_, msg) =>
                {
                    Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

                    // Manual commit is intentionally left disabled here; re-enable the
                    // lines below to commit each consumed message explicitly.
                    //Console.WriteLine($"Committing offset");
                    //var committedOffsets = consumer.CommitAsync(msg).Result;
                    //Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
                };

                consumer.OnPartitionEOF += (_, end) =>
                    Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

                // Raised on critical errors, e.g. connection failures or all brokers down.
                consumer.OnError += (_, error) =>
                    Console.WriteLine($"Error: {error}");

                // Raised on deserialization errors or when a consumed message has an error != NoError.
                consumer.OnConsumeError += (_, msg) =>
                    Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

                // Author's observation: this handler is NOT called when auto-commit is disabled.
                consumer.OnOffsetsCommitted += (_, commit) =>
                {
                    Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");

                    if (commit.Error)
                    {
                        Console.WriteLine($"Failed to commit offsets: {commit.Error}");
                        // Stop here so a failed commit is not also reported as a success.
                        return;
                    }

                    Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
                };

                // Once these handlers are registered, the assignment is applied explicitly.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
                    consumer.Assign(partitions);
                };

                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
                    consumer.Unassign();
                };

                //consumer.OnStatistics += (_, json)
                //    => Console.WriteLine($"Statistics: {json}");

                // The subscribe() method controls which topics will be fetched in poll.
                consumer.Subscribe(topics);
                Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                Console.WriteLine("Ctrl-C to exit.");
                while (!cancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Builds the consumer configuration dictionary.
        /// </summary>
        /// <param name="brokerList">Value for the "bootstrap.servers" setting.</param>
        /// <param name="enableAutoCommit">Value for the "enable.auto.commit" setting.</param>
        /// <returns>A new dictionary holding the consumer settings, including a nested "default.topic.config" dictionary.</returns>
        private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
            new Dictionary<string, object>
            {
                { "group.id", "advanced-csharp-consumer" },
                { "enable.auto.commit", enableAutoCommit },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "bootstrap.servers", brokerList },
                { "default.topic.config", new Dictionary<string, object>()
                    {
                        { "auto.offset.reset", "smallest" }
                    }
                }
            };

        /// <summary>
        /// Entry point. Usage: <c>[brokerList [topic ...]]</c>. The first argument, if
        /// present, is the broker list; any further arguments are the topics to
        /// subscribe to. Missing values fall back to the built-in defaults.
        /// </summary>
        /// <param name="args">Command-line arguments.</param>
        static void Main(string[] args)
        {
            var brokerList = args.Length > 0 ? args[0] : DefaultBrokerList;

            // Everything after the broker list is treated as a topic name.
            var topics = args.Length > 1
                ? new List<string>(args).GetRange(1, args.Length - 1)
                : new List<string> { DefaultTopic };

            Run_PollWithManualCommit(brokerList, topics);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment