Last time I looked at creating my first Apache Kafka consumer in Go, which used the now-deprecated channel-based consumer. Whilst idiomatic for Go, it has some issues which mean that the function-based consumer is recommended for use instead. So let’s go and use it!
Instead of reading from the Events()
channel of the consumer, we read events using the Poll()
function with a timeout. The way we handle events (a switch
based on their type
) is the same:
switch ev.(type) {
case *kafka.Message:
// It's a message
case kafka.PartitionEOF:
// We've finished reading messages on this partition so let's wrap up
}
We also remove the Go routine and its slightly more complex execution logic in which channels were used to indicate when to terminate processing, and instead just use a for
loop:
doTerm := false
for !doTerm {
// do polling until we're done
}
Just like in the previous example, when we receive a PartitionEOF
we then go to exit (since we make the BIG assumption that we’re only consuming from one partition)
The full code looks like this:
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
topic := "ratings"
// --
// Create Consumer instance
// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewConsumer
// Store the config
cm := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "rmoff_learning_go_foo",
"enable.partition.eof": true}
// Variable p holds the new Consumer instance.
c, e := kafka.NewConsumer(&cm)
// Check for errors in creating the Consumer
if e != nil {
if ke, ok := e.(kafka.Error); ok == true {
switch ec := ke.Code(); ec {
case kafka.ErrInvalidArg:
fmt.Printf("😢 Can't create the Consumer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
default:
fmt.Printf("😢 Can't create the Consumer (Kafka error code %d)\n\tError: %v\n", ec, e)
}
} else {
// It's not a kafka.Error
fmt.Printf("😢 Oh noes, there's a generic error creating the Consumer! %v", e.Error())
}
} else {
// Subscribe to the topic
if e := c.Subscribe(topic, nil); e != nil {
fmt.Printf("☠️ Uh oh, there was an error subscribing to the topic :\n\t%v\n", e)
} else {
doTerm := false
for !doTerm {
if ev := c.Poll(1000); ev == nil {
// the Poll timed out and we got nothin'
fmt.Printf("……\n")
continue
} else {
// The poll pulled an event, let's now
// look at the type of Event we've received
switch ev.(type) {
case *kafka.Message:
// It's a message
km := ev.(*kafka.Message)
fmt.Printf("✅ Message '%v' received from topic '%v' (partition %d at offset %d)\n",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Partition,
km.TopicPartition.Offset)
case kafka.PartitionEOF:
// We've finished reading messages on this partition so let's wrap up
// n.b. this is a BIG assumption that we are only consuming from one partition
pe := ev.(kafka.PartitionEOF)
fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
pe.Partition,
string(*pe.Topic),
pe.Offset)
doTerm = true
case kafka.OffsetsCommitted:
continue
case kafka.Error:
// It's an error
em := ev.(kafka.Error)
fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
default:
// It's not anything we were expecting
fmt.Printf("Got an event that's not a Message, Error, or PartitionEOF 👻\n\t%v\n", ev)
}
}
}
fmt.Printf("👋 … and we're done. Closing the consumer and exiting.\n")
// Now we can exit
c.Close()
}
}
}
I run it using a Docker Compose which also runs a data generator in Kafka Connect populating a topic for the consumer to read from. When I shut down Kafka Connect the data generator stops, the consumer reads to the end of the topic, and exits:
……
……
……
✅ Message 'Struct{ip=122.249.79.233,userid=20,remote_user=-,time=81,_time=81,request=GET /site/login.html HTTP/1.1,status=405,bytes=1289,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2522)
✅ Message 'Struct{ip=222.245.174.248,userid=14,remote_user=-,time=91,_time=91,request=GET /index.html HTTP/1.1,status=404,bytes=278,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2523)
🌆 Got to the end of partition 0 on topic ratings at offset 2524
👋 … and we're done. Closing the consumer and exiting.