Skip to content

Instantly share code, notes, and snippets.

@danmrichards
Last active March 1, 2017 17:31
Show Gist options
  • Save danmrichards/323c91143b04e0ef2870b89adc941db3 to your computer and use it in GitHub Desktop.
Save danmrichards/323c91143b04e0ef2870b89adc941db3 to your computer and use it in GitHub Desktop.
Google Pub/Sub experiment with Golang
package main
import (
"fmt"
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/api/iterator"
)
func main() {
ctx := context.Background()
// Get the project ID from envvar.
proj := os.Getenv("GOOGLE_CLOUD_PROJECT")
if proj == "" {
fmt.Fprintf(os.Stderr, "GOOGLE_CLOUD_PROJECT environment variable must be set.\n")
os.Exit(1)
}
// Create a pubsub client.
pwd, _ := os.Getwd()
client, err := pubsub.NewClient(ctx, proj, option.WithServiceAccountFile(pwd + "/sub-test/lush-global-soa-architecture-edbe7150a322.json"))
if err != nil {
log.Fatalf("Could not create pubsub Client: %v", err)
}
// Print all the subscriptions in the project.
fmt.Println("Listing all subscriptions from the project:")
subs, err := listSub(client)
if err != nil {
log.Fatal(err)
}
for _, sub := range subs {
fmt.Println(sub)
}
t := createTopicIfNotExists(client)
// Create a new subscription.
const sub = "example-subscription"
if err := createSub(client, sub, t); err != nil {
fmt.Printf("Could not create subscription: %v\n", err)
}
// Pull messages via the subscription.
if err := pullMsgs(client, sub, t); err != nil {
log.Fatal(err)
}
// Delete the subscription.
if err := delete(client, sub); err != nil {
log.Fatal(err)
}
}
// createTopicIfNotExists - Creates a new topic if it does not exist.
//
// Params:
// c *pubsub.Client - The pub sub client.
//
// Return:
// *pubsub.Topic - The topic.
func createTopicIfNotExists(c *pubsub.Client) *pubsub.Topic {
ctx := context.Background()
const topic = "epos.transaction.created"
// Check if the topic already exists.
t := c.Topic(topic)
ok, err := t.Exists(ctx)
if err != nil {
log.Fatal(err)
}
if ok {
return t
}
// Create a topic to subscribe to.
t, err = c.CreateTopic(ctx, topic)
if err != nil {
fmt.Printf("Could not create topic: %v\n", err)
}
return t
}
// createSub - Create a new pubsub subscription.
//
// Params:
// client *pubsub.Client - The pubsub client.
// name string - The name of the subscription to create.
// topic *pubsub.Topic - The topic to subscribe to.
//
// Return:
// error - An error if it occured or nil.
func createSub(client *pubsub.Client, name string, topic *pubsub.Topic) error {
ctx := context.Background()
sub, err := client.CreateSubscription(ctx, name, topic, 20*time.Second, nil)
if err != nil {
return err
}
fmt.Printf("Created subscription: %v\n", sub)
return nil
}
// listSub - Lists all available pubsub subcriptions for a client.
//
// Params
// client *pubsub.Client - The pub sub client.
//
// Return
// []*pubsub.Subscription - Array of subscriptions.
func listSub(client *pubsub.Client) ([]*pubsub.Subscription, error) {
ctx := context.Background()
var subs []*pubsub.Subscription
it := client.Subscriptions(ctx)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
subs = append(subs, s)
}
return subs, nil
}
// pullMsgs - Get 10 messages for a given pubsub topic.
//
// Params:
// client *pubsub.Client - The pubsub client.
// name string - The name of the subscription to use.
// topic *pubsub.Topic - The topic to get messages from.
//
// Return:
// error - An error if it occured or nil.
func pullMsgs(client *pubsub.Client, name string, topic *pubsub.Topic) error {
ctx := context.Background()
// Get the subscription.
sub := client.Subscription(name)
it, err := sub.Pull(ctx)
if err != nil {
return err
}
defer it.Stop()
// Consume messages.
for {
msg, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
fmt.Printf("Got message: %q\n", string(msg.Data))
msg.Done(true)
}
return nil
}
// delete - Delete a subcription.
//
// Params:
// client *pubsub.Client - The pubsub client.
// name string - The name of the subscription to delete.
//
// Return:
// error - An error if it occured or nil.
func delete(client *pubsub.Client, name string) error {
ctx := context.Background()
sub := client.Subscription(name)
if err := sub.Delete(ctx); err != nil {
return err
}
fmt.Println("Subscription deleted.")
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment