
Getting started with Kafka

Article ID: KB000927

What is Kafka?

Kafka is a distributed append-only log; in a simplistic view it is like a file on a filesystem. Producers can append data (echo 'data' >> file.dat), and consumers subscribe to a certain file (tail -f file.dat). Kafka, like a POSIX filesystem, makes sure that consumers receive the data (via tail -f) in the same order it was written (via echo). In addition, Kafka provides an ever-increasing counter (the offset) and a timestamp for each consumed message.

You do not have to go far to break the analogy, though.

Consider multiple consumers subscribing to a given file (in Kafka, such a file is called a topic). If they consume independently, that is the equivalent of running tail -f file.dat several times, which works fine. But what if no line may be processed twice across those consumers? At this point the POSIX analogy breaks down. It gets even worse when multiple producers write to the same topic. Furthermore, scaling the system out to process more data means spreading the load across multiple nodes; how do you coordinate writes and reads then?

Zookeeper

Kafka uses Zookeeper (simplified: a solid, reliable, transactional key/value store) to keep track of the state of brokers, topics, and consumers. How this works is explained below.

To spread the load of a topic across multiple nodes, the topic is chunked up into multiple partitions. Zookeeper keeps track of these partitions and of replication among the brokers, so that messages within a partition stay ordered and failures are handled gracefully.

A producer connects to one of the partitions and publishes its data. Consumers join a consumer group, which keeps track of which messages have already been processed. A consumer group can consist of multiple consumers, and no member of the group processes a given message twice. If the topic has more partitions than the group has consumers, individual consumers simply read from multiple partitions (a single consumer reads from all of them).
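
As a quick illustration (this is a sketch only: it assumes a running broker reachable at tasks.broker:9092, the Kafka CLI tools from the broker image used below, and an arbitrary group name demo-group), a console consumer joins a group like this:

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server tasks.broker:9092 \
      --topic test --group demo-group

Starting this command a second time adds a second member to demo-group; the topic's partitions are then split between the two consumers, so each message is delivered to only one of them (with a single-partition topic, the second member would simply sit idle).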

That is actually all you need to know to get started.

Example Usage

Note: The compose file at the end of this article can be used to demonstrate this example.

First, create an overlay network for the Kafka services. It is made attachable so that non-service containers can connect to it as well.

$ docker network create -d overlay --attachable kafka-net
zhd4d9ecqvnnz1ebev4wzwqb5

The first services are a single Zookeeper container plus a Zookeeper web UI to discover what is going on.

$ docker service create --network kafka-net --name=zookeeper \
          --publish 2181:2181 qnib/plain-zookeeper:2018-04-25
z4of88rgbei8n4n7i8bdyqei3
overall progress: 1 out of 1 tasks
1/1: running   [==================================================>]
verify: Service converged
$ docker service create --network kafka-net --name=zkui \
          --publish 9090:9090 \
          qnib/plain-zkui@sha256:30c4aa1236ee90e4274a9059a5fa87de2ee778d9bfa3cb48c4c9aafe7cfa1a13
s8xydg5kw9n83fswj0o5fo703
overall progress: 1 out of 1 tasks
1/1: running   [==================================================>]
verify: Service converged
$ docker service ls --format 'table {{.Name}}\t{{.Replicas}}\t{{.Ports}}'
NAME                REPLICAS            PORTS
zkui                1/1                 *:9090->9090/tcp
zookeeper           1/1                 *:2181->2181/tcp

Next, open localhost:9090 in a web browser to see what Zookeeper keeps. The login is admin/manager.

zkui_empty.png

Spin up Brokers

To populate Zookeeper, bring up at least one broker using these command line options:

  • --hostname uses Docker service templates to derive the hostname from the scheduler's placement decision, which makes the containers identifiable.
  • KAFKA_BROKER_ID pins the broker's identifier to its slot ID. If a container goes down, it is replaced, and because the ID is tied to the slot ID, the new container takes the place of the old one. Without this setting, the broker ID would simply increase with each new broker.
  • ZK_SERVERS simply points Kafka to the Zookeeper service.
$ docker service create --network kafka-net --name broker \
         --hostname="{{.Service.Name}}.{{.Task.Slot}}.{{.Task.ID}}" \
         -e KAFKA_BROKER_ID={{.Task.Slot}} -e ZK_SERVERS=tasks.zookeeper \
         qnib/plain-kafka:2018-04-25_1.1.0
f9qcjupwr923mojm1rs2hvls9
overall progress: 1 out of 1 tasks
1/1: running   [==================================================>]
verify: Service converged

Now the Zookeeper interface displays the initial Kafka data.

kafka_init.png

Create Topics

Next, use the Kafka tooling within the broker image to create a first topic:

$ docker exec -t -e JMX_PORT="" \
   $(docker ps -q --filter 'label=com.docker.swarm.service.name=broker'|head -n1) \
     /opt/kafka/bin/kafka-topics.sh --zookeeper tasks.zookeeper:2181 \
     --partitions=1 --replication-factor=1 --create --topic test
Created topic "test".

Now zkui is able to list the topic.

zkui_topic_test.png

Kafka Manager

While you can view the Kafka configuration data within the Zookeeper management interface, this is not tailored to provide easy-to-use management of Kafka. To help you manage the cluster more easily, provision Yahoo's Kafka Manager interface:

$ docker service create --network kafka-net --name manager \
          -e ZOOKEEPER_HOSTS=tasks.zookeeper --publish=9000:9000 \
          qnib/plain-kafka-manager:2018-04-25

Open localhost:9000 in a web browser and notice an empty cluster list.

kafka_manager_landing.png

Next, add a cluster with the default config.

kafka_manager_add.png

The Kafka Manager allows you to control the Kafka cluster from a single WebUI.

kafka_manager_topics.png

Populate Kafka

To populate Kafka, run a Golang-based producer container, which sends a handful of messages (five in this example).

$ docker run -t --rm --network kafka-net qnib/golang-kafka-producer:2018-05-01.5 5
Delivered message to test[0]@0
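
To read the messages back, you can use the console consumer shipped with the broker image; this assumes the broker's own listener is reachable at localhost:9092 inside the container, and --max-messages stops the consumer after the five messages produced above:

$ docker exec -t -e JMX_PORT="" \
   $(docker ps -q --filter 'label=com.docker.swarm.service.name=broker'|head -n1) \
     /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic test --from-beginning --max-messages 5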

Within the Kafka Manager interface, you can now see changes at clusters/kafka/topics/test.

kafka_manager_test.png

Scale Out Brokers

To run more than one broker, scale the broker service up.

$ docker service update --replicas=3 broker
broker
overall progress: 3 out of 3 tasks
1/3: running   [==================================================>]
2/3: running   [==================================================>]
3/3: running   [==================================================>]
verify: Service converged

Now Kafka Manager shows three brokers.

kafka_manager_scaled.png

By creating a topic with two partitions and the replication factor set to 2, you can leverage all three brokers.

$ docker exec -t -e JMX_PORT="" \
   $(docker ps -q --filter 'label=com.docker.swarm.service.name=broker'|head -n1) \
     /opt/kafka/bin/kafka-topics.sh --zookeeper tasks.zookeeper:2181 \
     --partitions=2 --replication-factor=2 --create --topic scaled
Created topic "scaled".

Next, create a service with multiple producer replicas that publish to all partitions.

$ docker service create --network kafka-net --name producer \
          --replicas=3 -e KAFKA_BROKER=tasks.broker \
          -e KAFKA_TOPIC=scaled \
          qnib/golang-kafka-producer:2018-05-01.5
$ sleep 10 && docker service logs producer |head
producer.1.vvh28cp9tndh@linuxkit-025000000001    | Start producer without msg limit. Delay between mgs '1s'
producer.2.p60w3gyfwz5e@linuxkit-025000000001    | Start producer without msg limit. Delay between mgs '1s'
producer.3.pxc2jmdii53j@linuxkit-025000000001    | Start producer without msg limit. Delay between mgs '1s'
producer.2.p60w3gyfwz5e@linuxkit-025000000001    | Delivered message to scaled[0]@0
producer.2.p60w3gyfwz5e@linuxkit-025000000001    | Delivered message to scaled[1]@1
producer.1.vvh28cp9tndh@linuxkit-025000000001    | Delivered message to scaled[0]@1
producer.3.pxc2jmdii53j@linuxkit-025000000001    | Delivered message to scaled[1]@0
producer.1.vvh28cp9tndh@linuxkit-025000000001    | Delivered message to scaled[1]@3
producer.2.p60w3gyfwz5e@linuxkit-025000000001    | Delivered message to scaled[0]@2
producer.3.pxc2jmdii53j@linuxkit-025000000001    | Delivered message to scaled[1]@2

The number in brackets (scaled[0]) shows which partition was used. Now the Kafka Manager shows activity on all brokers.

kafka_producer_scaled.png

Compose File

To recreate the stack in one step, use the following compose file:

version: '3'
services:
  zookeeper:
    image: qnib/plain-zookeeper:2018-04-25
    ports:
      - "2181:2181"
  zkui:
    image: qnib/plain-zkui@sha256:30c4aa1236ee90e4274a9059a5fa87de2ee778d9bfa3cb48c4c9aafe7cfa1a13
    ports:
      - "9090:9090"
  broker:
    image: qnib/plain-kafka:2018-04-25_1.1.0
    hostname: "{{.Service.Name}}.{{.Task.Slot}}.{{.Task.ID}}"
    ports:
      - "9092:9092"
    deploy:
      replicas: 3
    environment:
      - KAFKA_BROKER_ID={{.Task.Slot}}
      - ZK_SERVERS=tasks.zookeeper
  manager:
    image: qnib/plain-kafka-manager:2018-04-25@sha256:8a30ced91432b9b3ef21bc66c4b5635690e1af823684fe6e29ab866d839d10ca
    ports:
      - "9000:9000"
    environment:
      ZOOKEEPER_HOSTS: "tasks.zookeeper:2181"
  producer:
    image: qnib/golang-kafka-producer:2018-05-01.12
    deploy:
      replicas: 3
    depends_on:
      - broker
    environment:
      - KAFKA_BROKER=tasks.broker
      - KAFKA_CREATE_TOPICS=test:1:1,scaled:2:2
      - KAFKA_TOPIC=scaled
      - MSG_COUNT=-1
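
Assuming the compose file is saved as kafka-stack.yml, the whole stack can be deployed (and torn down again) in one step:

$ docker stack deploy -c kafka-stack.yml kafka
$ docker stack rm kafka

Keep in mind that a stack deploy prefixes service names with the stack name (e.g. kafka_broker), so the docker exec filters shown above would have to match that label value instead.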

Limitations

The variable to predefine topics (KAFKA_CREATE_TOPICS) does not work for now (version incompatibility). However, the producer will create the topic it is publishing to, although with only a single partition and no replication. You can use the Kafka Manager to change the settings.
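
Alternatively, the partition count of such an auto-created topic can also be raised with the bundled CLI (the replication factor cannot be changed this way; that requires a partition reassignment or the Kafka Manager):

$ docker exec -t -e JMX_PORT="" \
   $(docker ps -q --filter 'label=com.docker.swarm.service.name=broker'|head -n1) \
     /opt/kafka/bin/kafka-topics.sh --zookeeper tasks.zookeeper:2181 \
     --alter --topic scaled --partitions 2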

Please keep in mind that this setup does not delete broker IDs from Zookeeper. Scaling down a service will therefore leave orphaned broker entries behind. Confluent dealt with this by creating a service that keeps the state clean. For further information, please read the blog post Introducing the Confluent Operator: Apache Kafka® on Kubernetes Made Simple.