Last active
October 27, 2021 22:42
-
-
Save wallyqs/c259c7c77c74880c487ff63a3268d12f to your computer and use it in GitHub Desktop.
NATS::Client and NATS::JetStream (nats-pure.rb v2.0.0)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'nats' | |
# Connect to server that has JetStream support, e.g. | |
# | |
# nats-server -js | |
# | |
# To include in Gemfile: | |
# | |
# source "https://rubygems.org" | |
# | |
# gem 'nats-pure', '2.0.0.pre.alpha' | |
# | |
nc = NATS.connect("localhost") | |
# Create JetStream context. | |
js = nc.jetstream | |
# Create Stream that will persist messages from foo subject. | |
begin | |
info = js.add_stream(name: "sample-stream", subjects: ["foo"]) | |
rescue => e | |
puts "Error: #{e}" | |
end | |
# Send 10 messages and wait to get an ack that they have been persisted. | |
10.times do |i| | |
ack = js.publish("foo", "hello world: #{i}", timeout: 2) | |
puts "Published: #{ack.seq}" | |
end | |
# Create pull based consumer. | |
psub = js.pull_subscribe("foo", "psub") | |
# Fetch 3 messages from consumer. | |
msgs = psub.fetch(3) | |
msgs.each do |msg| | |
puts " ACK: Stream Seq: #{msg.metadata.sequence.stream} || Consumer Seq: #{msg.metadata.sequence.consumer}" | |
msg.ack | |
end | |
# Get latest consumer info. | |
cinfo = psub.consumer_info | |
puts "Consumer '#{cinfo.name}' Pending Messages: #{cinfo.num_pending}" | |
# Subscribe is now dispatched a NATS::Msg that may include headers | |
nc.subscribe("hello") do |msg| | |
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}" | |
msg.respond("OK") if msg.reply | |
end | |
sub = nc.subscribe("hello") | |
# Can use publish to send a message with headers. | |
nc.publish("hello", header: { 'quux': 'quuz'}) | |
nc.publish_msg(NATS::Msg.new(subject: "hello", data: "world", header:{ 'foo': 'bar'})) | |
# Request also supports publishing with headers. | |
msg = nc.request("hello", header: { 'a': 'b'}) | |
puts "Response #{msg.data}" | |
msg = nc.request_msg(NATS::Msg.new(subject: "hello", data: "world!!!", header:{ 'foo': 'bar'})) | |
puts "Response #{msg.data}" | |
# Can also use iterator style to consume messages now. | |
msg = sub.next_msg | |
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}" | |
msg = sub.next_msg(timeout: 2) | |
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}" | |
begin | |
sub.next_msg(timeout: 1) | |
rescue NATS::Timeout => e | |
# puts "Timeout since no new messages yet: #{e}" | |
end | |
nc.flush | |
nc.close |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment