Kafka Architecture

LinkedIn engineering built Kafka to support real-time analytics. Kafka was designed to feed analytics systems that process streams in real time. LinkedIn developed Kafka as a unified platform for handling real-time streaming data feeds. The goal behind Kafka: build a high-throughput streaming data platform that supports high-volume event streams like log aggregation, user activity, etc.

To scale to meet LinkedIn's demands, Kafka is distributed and supports sharding and load balancing. Scaling needs inspired Kafka's partitioning and consumer model. Kafka scales writes and reads with partitioned, distributed commit logs. Kafka's sharding is called partitioning. (Kinesis, which is similar to Kafka, calls partitions shards.)

What is Kafka?

Kafka's growth is exploding: more than a third of all Fortune 500 companies use Kafka. These companies include the top ten travel companies, 7 of the top ten banks, 8 of the top ten insurance companies, 9 of the top ten telecom companies, and many more. LinkedIn, Microsoft, and Netflix process four-comma messages (1,000,000,000,000, i.e., over a trillion) a day with Kafka. Kafka is used for real-time streams of data: to collect big data, to do real-time analysis, or both. Kafka is used with in-memory microservices to provide durability, and it can be used to feed events to CEP (complex event processing) systems and IoT/IFTTT-style automation systems.


Why Kafka?

Kafka often gets used in real-time streaming data architectures to provide real-time analytics. Since Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system, Kafka is used in cases where JMS, RabbitMQ, and AMQP may not even be considered due to volume and responsiveness. Kafka has higher throughput, reliability, and replication characteristics, which make it applicable for things like tracking service calls (it tracks every call) or tracking IoT sensor data, where a traditional MOM might not be considered.

Kafka works with Flume/Flafka, Spark Streaming, Storm, HBase, Flink and Spark for real-time ingesting, analysis and processing of streaming data. Kafka is a data stream used to feed Hadoop big data lakes. Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark. Also, Kafka Streams (a subproject) can be used for real-time analytics.


Kafka use cases

In short, Kafka gets used for stream processing, website activity tracking, metrics collection and monitoring, log aggregation, real-time analytics, CEP, ingesting data into Spark, ingesting data into Hadoop, CQRS, replay messages, error recovery, and guaranteed distributed commit log for in-memory computing (microservices).


Who uses Kafka?

A lot of large companies that handle a lot of data use Kafka. LinkedIn, where it originated, uses it to track activity data and operational metrics. Twitter uses it as part of Storm to provide a stream processing infrastructure. Square uses Kafka as a bus to move all system events to various Square data centers (logs, custom events, metrics, and so on), outputs to Splunk and Graphite (dashboards), and implements Esper-like/CEP alerting systems with it. It gets used by other companies too, like Spotify, Uber, Tumblr, Goldman Sachs, PayPal, Box, Cisco, CloudFlare, Netflix, and many more.


Why is Kafka so popular?

Kafka has operational simplicity. Kafka is easy to set up and use, and it is easy to reason about how Kafka works. However, the main reason Kafka is very popular is its excellent performance. It has other characteristics as well, but so do other messaging systems. Kafka has great performance, it is stable, provides reliable durability, has a flexible publish-subscribe/queue that scales well with N number of consumer groups, has robust replication, provides producers with tunable consistency guarantees, and provides preserved ordering at the shard level (the Kafka topic partition). In addition, Kafka works well with systems that have data streams to process and enables those systems to aggregate, transform, and load into other stores. But none of those characteristics would matter if Kafka were slow; the most important reason Kafka is popular is its exceptional performance.


Why is Kafka so Fast?

Kafka relies heavily on the OS kernel to move data around quickly. It relies on the principles of zero copy. Kafka enables you to batch data records into chunks. These batches of data can be seen end to end from producer to file system (Kafka topic log) to consumer. Batching allows for more efficient data compression and reduces I/O latency. Kafka writes its immutable commit log to disk sequentially, thus avoiding random disk access and slow disk seeking. Kafka provides horizontal scale through sharding. It shards a topic log into hundreds, potentially thousands, of partitions spread across thousands of servers. This sharding allows Kafka to handle massive load.


Kafka: Streaming Architecture

Kafka gets used most often for real-time streaming of data into other systems. Kafka is a middle layer that decouples your real-time data pipelines. Kafka core is not good for direct computations such as data aggregation or CEP. Kafka Streams, which is part of the Kafka ecosystem, does provide the ability to do real-time analytics. Kafka can be used to feed fast-lane systems (real-time and operational data systems) like Storm, Flink, Spark Streaming, and your services and CEP systems. Kafka is also used to stream data for batch data analysis. Kafka feeds Hadoop. It streams data into your big data platform or into RDBMS, Cassandra, Spark, or even S3 for future data analysis. These data stores often support data analysis, reporting, data science crunching, compliance auditing, and backups.


Kafka Streaming Architecture Diagram


Now let’s truly answer the question.

Kafka is…

Kafka is a distributed streaming platform used to publish and subscribe to streams of records. Kafka gets used for fault-tolerant storage. Kafka replicates topic log partitions to multiple servers. Kafka is designed to allow your apps to process records as they occur. Kafka is fast and uses I/O efficiently by batching and compressing records. Kafka gets used for decoupling data streams. Kafka is used to stream data into data lakes, applications, and real-time stream analytics systems.


Kafka Decoupling Data Streams



Kafka is Polyglot

Kafka communication between clients and servers uses a wire protocol over TCP that is versioned and documented. Kafka promises to maintain backwards compatibility with older clients, and many languages are supported. There are clients in C#, Java, C, Python, Ruby and many more languages. The Kafka ecosystem also provides a REST proxy that allows easy integration via HTTP and JSON, which makes integration even easier. Kafka also supports Avro schemas via the Confluent Schema Registry for Kafka. Avro and the Schema Registry allow complex records to be produced and read by clients in many programming languages and allow records to evolve. Kafka is truly polyglot.


Kafka is useful - Kafka Usage

Kafka allows you to build real-time streaming data pipelines. Kafka enables in-memory microservices (actors, Akka, Baratine.io, QBit, reactors, reactive, Vert.x, RxJava, Spring Reactor). Kafka allows you to build real-time streaming applications that react to streams to do real-time data analytics, transform, react, aggregate, and join real-time data flows, and perform CEP (complex event processing).

You can use Kafka to aid in gathering metrics/KPIs, to aggregate statistics from many sources, and to implement event sourcing, and you can use it with microservices (in-memory) and actor systems to implement in-memory services (an external commit log for distributed systems).

You can use Kafka to replicate data between nodes, to re-sync nodes, and to restore state. While it is true that Kafka is used for real-time data analytics and stream processing, you can also use it for log aggregation, messaging, click-stream tracking, audit trails, and much more.

In a world where data science and analytics are a big deal, capturing data to feed into your data lakes and real-time analytics systems is a big deal, and since Kafka can hold up to these kinds of strenuous use cases, Kafka is a big deal.


Kafka is a scalable message storage

Kafka is a good storage system for records/messages. Kafka acts like a high-speed file system for commit log storage and replication. These characteristics make Kafka useful for all manner of applications. Records written to Kafka topics are persisted to disk and replicated to other servers for fault-tolerance. Since modern drives are fast and quite large, this fits well and is very useful. Kafka producers can wait on acknowledgment, so messages are durable: the producer's write is not complete until the message replicates. The Kafka disk structure scales well. Modern disk drives have very high throughput when writing in large streaming batches. Also, Kafka clients/consumers can control the read position (offset), which allows for use cases like replaying the log if there was a critical bug (fix the bug and then replay). And since offsets are tracked per consumer group, which we talk about in the Kafka Architecture article, the consumers can be quite flexible (e.g., replaying the log).
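As a rough sketch of that replay use case (not from the original article; the topic name "payments" and the surrounding consumer setup are assumptions), rewinding the read position with the Java consumer API looks roughly like this:

consumer.subscribe(Collections.singletonList("payments"), new ConsumerRebalanceListener() {
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Rewind to the start of each assigned partition to replay the log after a bug fix.
    consumer.seekToBeginning(partitions);
  }
});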


Kafka Record Retention

A Kafka cluster retains all published records, and if you don't set a limit, it will keep records until it runs out of disk space. You can set time-based limits (a configurable retention period), size-based limits (configurable based on size), or use compaction (which keeps the latest version of a record using its key). You can, for example, set a retention policy of three days, two weeks, or a month. The records in the topic log are available for consumption until discarded by time, size, or compaction. Consumption speed is not impacted by size, as Kafka always writes to the end of the topic log.
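As a hedged sketch (the topic name, the pre-built adminClient, and the values are illustrative, not from the article), these retention limits can be set per topic with the Java AdminClient at creation time:

NewTopic orders = new NewTopic("orders", 3, (short) 3)
  .configs(Map.of(
    "retention.ms", "259200000",       // time-based limit: keep records about three days
    "retention.bytes", "1073741824",   // size-based limit per partition: about 1 GB
    "cleanup.policy", "delete"));      // or "compact" to keep the latest record per key
adminClient.createTopics(Collections.singletonList(orders)).all().get();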


Let’s Review

Why is Kafka so fast?

Kafka is fast because it avoids copying buffers in-memory (Zero Copy), and streams data to immutable logs instead of using random access.

How fast is Kafka usage growing?

When you consider that Kafka is six years old and over a third of Fortune 500 companies use Kafka, then the only answer is fast, very fast.

How is Kafka getting used?

Kafka is used to feed data lakes like Hadoop, and to feed real-time analytics systems like Flink, Storm and Spark Streaming.

Where does Kafka fit in the Big Data Architecture?

Kafka is a data stream that fills up Big Data’s data lakes.

How does Kafka relate to real-time analytics?

Kafka feeds data to real-time analytics systems like Storm, Spark Streaming, Flink, and Kafka Streams.

Who uses Kafka?

The top ten travel companies, 7 of the top ten banks, 8 of the top ten insurance companies, 9 of the top ten telecom companies, LinkedIn, Microsoft, Netflix, and many more companies.

How does Kafka decouple streams of data?

It decouples streams of data by allowing multiple consumer groups that can each control where they are in the topic partition. The producers don't know about the consumers. Since the Kafka broker delegates the log partition offset (where the consumer is in the record stream) to the clients (consumers), message consumption is flexible. This allows you to feed your high-latency daily or hourly data analysis in Spark and Hadoop at the same time as you are feeding microservices real-time messages, sending events to your CEP system, and feeding data to your real-time analytics systems.

What are some common use cases for Kafka?

Kafka feeds data to real-time analytics systems like Storm, Spark Streaming, Flink, and Kafka Streams. It also gets used for log aggregation, feeding events to CEP systems, and as a commit log for in-memory microservices.

Kafka Architecture

Kafka consists of Records, Topics, Consumers, Producers, Brokers, Logs, Partitions, and Clusters. Records have a key (optional), a value, and a timestamp. Kafka records are immutable. A Kafka topic is a stream of records ("/orders", "/user-signups"). You can think of a topic as a feed name. A topic has a log, which is the topic's storage on disk. A topic log is broken up into partitions and segments. The Kafka Producer API is used to produce streams of data records. The Kafka Consumer API is used to consume a stream of records from Kafka. A broker is a Kafka server that runs in a Kafka cluster. Kafka brokers form a cluster. The Kafka cluster consists of many Kafka brokers on many servers. "Broker" sometimes refers to more of a logical system, or to Kafka as a whole.
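A minimal sketch of the record model (the topic names, keys, and values here are made up for illustration): a record carries an optional key, a value, and a timestamp, and is sent through the Producer API.

ProducerRecord<String, String> signup =
  new ProducerRecord<>("user-signups", "user-42", "{\"name\": \"...\"}");  // key and value; timestamp defaults to now

ProducerRecord<String, String> order =
  new ProducerRecord<>("orders", null, System.currentTimeMillis(), "order-7", "{...}");  // explicit timestamp; null partition lets the partitioner pick

producer.send(signup);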

Kafka Architecture: Topics, Producers and Consumers


Kafka uses ZooKeeper to manage the cluster. ZooKeeper is used to coordinate the brokers/cluster topology. ZooKeeper is a consistent file system for configuration information. ZooKeeper is used for leadership election of broker topic partition leaders.

Kafka Architecture: Core Kafka


Kafka needs ZooKeeper

Kafka uses ZooKeeper to do leadership election of Kafka broker and topic partition pairs. Kafka uses ZooKeeper to manage service discovery for the Kafka brokers that form the cluster. ZooKeeper sends changes in the topology to Kafka, so each node in the cluster knows when a new broker joins, a broker dies, a topic is removed, a topic is added, etc. ZooKeeper provides an in-sync view of the Kafka cluster configuration.

Kafka Producer, Consumer, Topic details

Kafka producers write to topics. Kafka consumers read from topics. A topic is associated with a log, which is a data structure on disk. Kafka appends records from producers to the end of a topic log. A topic log consists of many partitions that are spread over multiple files, which can be spread over multiple Kafka cluster nodes. Consumers read from Kafka topics at their own cadence and can pick where they are (the offset) in the topic log. Each consumer group tracks the offset from where it left off reading. Kafka distributes topic log partitions over different nodes in a cluster for high performance with horizontal scalability. Spreading partitions aids in writing data quickly. Topic log partitions are Kafka's way to shard reads and writes to the topic log. Also, partitions are needed to have multiple consumers in a consumer group work at the same time. Kafka replicates partitions to many nodes to provide failover.
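Here is a hedged sketch of a consumer reading at its own cadence (the broker addresses, topic, and group id are made up): the group.id determines which consumer group's offsets are tracked, and each polled record exposes the partition and offset it came from.

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "order-processors");  // offsets are tracked per consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(Collections.singletonList("orders"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
      System.out.printf("partition=%d offset=%d key=%s%n",
          record.partition(), record.offset(), record.key());
    }
  }
}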

Kafka Architecture: Topic Partition, Consumer group, Offset and Producers


Kafka Scale and Speed

How can Kafka scale if multiple producers and consumers read and write to the same Kafka topic log at the same time? First, Kafka is fast: Kafka writes to the filesystem sequentially, which is fast. On a modern fast drive, Kafka can easily write 700 MB of data or more per second. Kafka scales writes and reads by sharding topic logs into partitions. Recall that topic logs can be split into multiple partitions, which can be stored on multiple different servers, and those servers can use multiple disks. Multiple producers can write to different partitions of the same topic. Multiple consumers from multiple consumer groups can read from different partitions efficiently.


Kafka Brokers

A Kafka cluster is made up of multiple Kafka brokers. Each Kafka broker has a unique id (number). Kafka brokers contain topic log partitions. Connecting to one broker bootstraps a client to the entire Kafka cluster. For failover, you want to start with at least three to five brokers. A Kafka cluster can have 10, 100, or 1,000 brokers if needed.

Kafka Cluster, Failover, ISRs

Kafka supports replication to support failover. Recall that Kafka uses ZooKeeper to form Kafka brokers into a cluster, and each node in the Kafka cluster is called a Kafka broker. Topic partitions can be replicated across multiple nodes for failover. The topic should have a replication factor greater than 1 (2 or 3). For example, if you are running in AWS, you would want to be able to survive a single availability zone outage. If one Kafka broker goes down, then a Kafka broker that is an ISR (in-sync replica) can serve data.

Kafka Failover vs. Kafka Disaster Recovery

Kafka uses replication for failover. Replication of Kafka topic log partitions allows for failure of a rack or AWS availability zone (AZ). You need a replication factor of at least 3 to survive a single AZ failure. You need to use MirrorMaker, a Kafka utility that ships with Kafka core, for disaster recovery. MirrorMaker replicates a Kafka cluster to another datacenter or AWS region. What MirrorMaker does is called mirroring, so as not to be confused with replication.

Note there is no hard and fast rule on how you have to set up the Kafka cluster per se. You could, for example, set up the whole cluster in a single AZ so you can use AWS enhanced networking and placement groups for higher throughput, and then use Mirror Maker to mirror the cluster to another AZ in the same region as a hot-standby.

Kafka Architecture: Kafka Zookeeper Coordination


Kafka Topics Architecture

Please continue reading about Kafka Architecture. The next article covers Kafka Topics Architecture with a discussion of how partitions are used for fail-over and parallel processing.

Kafka Topic Architecture - Replication, Failover and Parallel Processing

This article covers some lower level details of Kafka topic architecture. It is a continuation of the Kafka Architecture article.

This article covers Kafka Topic’s Architecture with a discussion of how partitions are used for fail-over and parallel processing.

Kafka Topics, Logs, Partitions

Recall that a Kafka topic is a named stream of records. Kafka stores topics in logs. A topic log is broken up into partitions. Kafka spreads a log's partitions across multiple servers or disks. Think of a topic as a category, stream name, or feed.

Topics are inherently publish-subscribe style messaging. A topic can have zero or many subscribers, called consumer groups. Topics are broken up into partitions for speed, scalability, and size.

Kafka Topic Partitions

Kafka breaks topic logs up into partitions. A record is stored in a partition, usually by record key if the key is present, and round-robin if the key is missing (the default behavior). The record key, by default, determines which partition a producer sends the record to.

Kafka uses partitions to scale a topic across many servers for producer writes. Also, Kafka also uses partitions to facilitate parallel consumers. Consumers consume records in parallel up to the number of partitions.

Order is guaranteed only within a partition. If partitioning by key, then all records for a key will be on the same partition, which is useful if you ever have to replay the log. Kafka can replicate partitions to multiple brokers for failover.
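As an illustration only (the real Java default partitioner uses murmur2 hashing of the serialized key, plus round-robin behavior when the key is missing), key-based partition selection boils down to something like this:

int choosePartition(byte[] keyBytes, int numPartitions) {
  if (keyBytes == null) {
    return nextRoundRobinPartition(numPartitions);  // hypothetical round-robin helper for keyless records
  }
  // Hash the key and map it onto one of the topic's partitions.
  return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
}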

Kafka Topic Log Partition’s Ordering and Cardinality

Kafka maintains record order only within a single partition. A partition is an ordered, immutable record sequence. Kafka continually appends to partitions, using the partition as a structured commit log. Records in partitions are assigned a sequential id number called the offset. The offset identifies each record's location within the partition. Topic partitions allow a Kafka log to scale beyond a size that will fit on a single server. A topic partition must fit on the servers that host it, but a topic can span many partitions hosted on many servers. Also, topic partitions are a unit of parallelism: a partition can only be worked on by one consumer in a consumer group at a time. Consumers can run in their own process or their own thread. If a consumer stops, Kafka spreads its partitions across the remaining consumers in the same consumer group.

Kafka Architecture: Topic Partition Layout and Offsets


Kafka Topic Partition Replication

Kafka can replicate partitions across a configurable number of Kafka servers which is used for fault tolerance. Each partition has a leader server and zero or more follower servers. Leaders handle all read and write requests for a partition.

Followers replicate leaders and take over if the leader dies. Kafka also uses partitions for parallel consumer handling within a group. Kafka distributes topic log partitions over servers in the Kafka cluster. Each server handles its share of data and requests by sharing partition leadership.

Replication: Kafka Partition Leaders, Followers and ISRs.

Kafka chooses one broker's replica of a partition as the leader using ZooKeeper.

The broker that has the partition leader handles all reads and writes of records for the partition. Kafka replicates writes to the leader partition to followers (node/partition pair). A follower that is in-sync is called an ISR (in-sync replica). If a partition leader fails, Kafka chooses a new ISR as the new leader.

Kafka Architecture: Kafka Replication - Replicating to Partition 0

A record is considered "committed" when all ISRs for the partition have written it to their log. Only committed records are readable by consumers. Another partition can be owned by another leader on another Kafka broker.

Kafka Architecture: Kafka Replication - Replicating to Partition 1


Kafka Topic Architecture in Review

What is an ISR?

An ISR is an in-sync replica. If a leader fails, an ISR is picked to be a new leader.

How does Kafka scale consumers?

Kafka scales consumers by partition such that each consumer gets its share of partitions. A consumer can have more than one partition, but a partition can only be used by one consumer in a consumer group at a time. If you only have one partition, then you can only have one consumer.

What are leaders? Followers?

Leaders perform all reads and writes to a particular topic partition. Followers replicate leaders.

How does Kafka perform failover for consumers?

If a consumer in a consumer group dies, the partitions assigned to that consumer are divided up amongst the remaining consumers in that group.

How does Kafka perform failover for Brokers?

If a broker dies, then Kafka divides up leadership of its topic partitions to the remaining brokers in the cluster.

Kafka Producer Architecture

Please continue reading about Kafka Architecture. The next article covers Kafka Producer Architecture with a discussion of how partitions are picked for records.


Kafka Producers

Kafka producers send records to topics. The records are sometimes referred to as messages.

The producer picks which partition to send a record to per topic. The producer can send records round-robin. The producer could also implement a priority system, sending records to certain partitions based on the priority of the record.

Generally speaking, producers send records to a partition based on the record’s key. The default partitioner for Java uses a hash of the record’s key to choose the partition or uses a round-robin strategy if the record has no key.

The important concept here is that the producer picks the partition.
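For example, the priority idea above could be implemented with a custom partitioner. This is a hypothetical sketch (the class name, the "urgent-" key convention, and the routing rule are made up), wired in via the producer config partitioner.class:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class PriorityPartitioner implements Partitioner {

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionsForTopic(topic).size();
    if (key != null && key.toString().startsWith("urgent-")) {
      return 0;  // high-priority records always land on partition 0
    }
    // Everything else is hashed over the remaining partitions (assumes more than one partition).
    int hash = (key == null) ? 0 : (key.hashCode() & 0x7fffffff);
    return 1 + hash % (numPartitions - 1);
  }

  @Override
  public void configure(Map<String, ?> configs) { }

  @Override
  public void close() { }
}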

Kafka Architecture: Kafka Producers


Producers are writing at Offset 12 while at the same time Consumer Group A is Reading from Offset 9.


Kafka Producers write cadence and partitioning of records

Producers write at their own cadence, so the order of records cannot be guaranteed across partitions. The producers get to configure their consistency/durability level (acks=0, acks=1, acks=all), which we will cover later. Producers pick the partition such that records/messages go to a given partition based on the data. For example, you could have all the events of a certain 'employeeId' go to the same partition. If order within a partition is not needed, a 'round robin' partition strategy can be used, so records get evenly distributed across partitions.


Review of Producers

Can producers occasionally write faster than consumers?

Yes. A producer could have a burst of records, and a consumer does not have to be on the same page as the producer.

What is the default partition strategy for producers without using a key?

Round-Robin

What is the default partition strategy for Producers using a key?

Records with the same key get sent to the same partition.

What picks which partition a record is sent to?

The Producer picks which partition a record goes to.


Kafka Consumer Architecture

Please continue reading about Kafka Architecture. The next article covers Kafka Consumer Architecture with a discussion of how records are divided up among consumers in a consumer group, consumer failover, and consumer load balancing.

Kafka Consumer Groups

You group consumers into a consumer group by use case or function of the group. One consumer group might be responsible for delivering records to high-speed, in-memory microservices while another consumer group is streaming those same records to Hadoop. Consumer groups have names to distinguish them from other consumer groups.

A consumer group has a unique id. Each consumer group is a subscriber to one or more Kafka topics. Each consumer group maintains its offset per topic partition. If you need multiple subscribers, then you have multiple consumer groups. A record gets delivered to only one consumer in a consumer group.

Each consumer in a consumer group processes records, and only one consumer in that group will get a given record. Consumers in a consumer group load balance record processing.


Kafka Architecture: Kafka Consumer Groups

Consumers remember the offset where they left off reading. Consumer groups each have their own offset per partition.


Kafka Consumer Load Share

Kafka consumer consumption divides partitions over consumer instances within a consumer group. Each consumer in the consumer group is an exclusive consumer of a "fair share" of partitions. This is how Kafka does load balancing of consumers in a consumer group. Consumer membership within a consumer group is handled by the Kafka protocol dynamically. If a new consumer joins a consumer group, it gets a share of the partitions. If a consumer dies, its partitions are split among the remaining live consumers in the consumer group. This is how Kafka does failover of consumers in a consumer group.


Kafka Consumer Failover

Consumers notify the Kafka broker when they have successfully processed a record, which advances the offset.

If a consumer fails before sending commit offset to Kafka broker, then a different consumer can continue from the last committed offset.

If a consumer fails after processing a record but before sending the commit to the broker, then some Kafka records could be reprocessed. In this scenario, Kafka implements at-least-once behavior, and you should make sure message processing (record deliveries) is idempotent.
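A hedged sketch of that at-least-once pattern (the process() method is a hypothetical, idempotent processing step): auto-commit is turned off and the offset is committed only after the polled batch has been processed.

props.put("enable.auto.commit", "false");  // commit manually, only after processing

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
  for (ConsumerRecord<String, String> record : records) {
    process(record);       // hypothetical idempotent processing step
  }
  consumer.commitSync();   // a crash before this line means the batch is redelivered (at-least-once)
}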


Offset management

Kafka stores offset data in a topic called "__consumer_offsets". This topic uses log compaction, which means it only saves the most recent value per key.

When a consumer has processed data, it should commit offsets. If the consumer process dies, it will be able to start up and begin reading where it left off, based on the offset stored in "__consumer_offsets", or, as discussed, another consumer in the consumer group can take over.


What can Kafka consumers see?

What records can be consumed by a Kafka consumer? Consumers can't read un-replicated data. Kafka consumers can only consume messages up to the "High Watermark" offset of the partition. The "Log end offset" is the offset of the last record written to the log partition and where producers write next.

The "High Watermark" is the offset of the last record that was successfully replicated to all of the partition's followers. Consumers only read up to the "High Watermark".


Consumer to Partition Cardinality - Load sharing redux

Only a single consumer from the same consumer group can access a single partition. If the consumer count in a consumer group exceeds the partition count, then the extra consumers remain idle. Kafka can use the idle consumers for failover. If there are more partitions than consumers in a group, then some consumers will read from more than one partition.

Kafka Architecture: Consumer Group Consumers to Partitions


Notice that server 1 has topic partitions P2, P3, and P4, while server 2 has partitions P0, P1, and P5. Notice that consumer C0 from Consumer Group A is processing records from P0 and P2. Notice that no single partition is shared by more than one consumer from the same consumer group. Notice that each consumer gets its fair share of partitions for the topic.


Multi-threaded Kafka Consumers

You can run more than one Consumer in a JVM process by using threads.

Consumer with many threads

If processing a record takes a while, a single consumer can run multiple threads to process records, but it is harder to manage offsets for each thread/task. If one consumer runs multiple threads, then two messages on the same partition could be processed by two different threads, which makes it hard to guarantee record delivery order without complex thread coordination. This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

Thread per consumer

If you need to run multiple consumers, then run each consumer in its own thread. This way Kafka can deliver record batches to the consumer, and the consumer does not have to worry about offset ordering. A thread per consumer makes it easier to manage offsets. It is also simpler to manage failover (each process runs X number of consumer threads), as you can allow Kafka to do the brunt of the work.
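A hedged sketch of the thread-per-consumer pattern (consumerProps() and process() are hypothetical helpers, and the topic name is made up). Each thread owns its own KafkaConsumer instance, since KafkaConsumer is not thread-safe:

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
  pool.submit(() -> {
    // Each thread owns its own consumer; Kafka assigns each one a share of the partitions.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps())) {
      consumer.subscribe(Collections.singletonList("orders"));
      while (!Thread.currentThread().isInterrupted()) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
          process(record);  // hypothetical processing step; offsets stay simple per consumer
        }
      }
    }
  });
}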

Kafka Consumer Review

What is a consumer group?

A consumer group is a group of related consumers that perform a task, like putting data into Hadoop or sending messages to a service. Consumer groups each have unique offsets per partition. Different consumer groups can read from different locations in a partition.

Does each consumer group have its own offset?

Yes. Each consumer group has its own offset for every partition in the topic, independent of what other consumer groups have.

When can a consumer see a record?

A consumer can see a record after the record gets fully replicated to all followers.

What happens if there are more consumers than partitions?

The extra consumers remain idle until another consumer dies.

What happens if you run multiple consumers in many threads in the same JVM?

Each thread manages a share of partitions for that consumer group.


Kafka Architecture: Low-Level Design

This post really picks up from our series on Kafka architecture, which includes Kafka topics architecture, Kafka producer architecture, Kafka consumer architecture, and Kafka ecosystem architecture.

This article is heavily inspired by the Kafka section on design. You can think of it as the cliff notes.


Kafka Design Motivation

LinkedIn engineering built Kafka to support real-time analytics. Kafka was designed to feed analytics systems that process streams in real time. LinkedIn developed Kafka as a unified platform for handling real-time streaming data feeds. The goal behind Kafka: build a high-throughput streaming data platform that supports high-volume event streams like log aggregation, user activity, etc.

To scale to meet LinkedIn's demands, Kafka is distributed and supports sharding and load balancing. Scaling needs inspired Kafka's partitioning and consumer model. Kafka scales writes and reads with partitioned, distributed commit logs. Kafka's sharding is called partitioning. (Kinesis, which is similar to Kafka, calls partitions shards.)

A database shard is a horizontal partition of data in a database or search engine. Each individual partition is referred to as a shard or database shard. Each shard is held on a separate database server instance, to spread load.

Kafka was designed to handle periodic large data loads from offline systems as well as traditional low-latency messaging use cases.

MOM is message-oriented middleware; think IBM MQSeries, JMS, ActiveMQ, and RabbitMQ. Like many MOMs, Kafka is fault-tolerant of node failures through replication and leadership election. However, the design of Kafka is more like a distributed database transaction log than a traditional messaging system. Unlike many MOMs, Kafka replication was built into the low-level design and is not an afterthought.


Persistence: Embrace filesystem

Kafka relies on the filesystem for storing and caching records.

The disk performance of hard drives for sequential writes is fast (really fast). JBOD is just a bunch of disk drives. A JBOD configuration with six 7200rpm SATA drives in a RAID-5 array is about 600MB/sec. Like Cassandra tables, Kafka logs are write-only structures, meaning data gets appended to the end of the log. When using HDDs, sequential reads and writes are fast, predictable, and heavily optimized by operating systems. Using HDDs, sequential disk access can be faster than random memory access and SSD.

While JVM GC overhead can be high, Kafka leans on the OS a lot for caching, which is a big, fast, and rock-solid cache. Also, modern operating systems use all available main memory for disk caching. OS file caches are almost free and don't have the overhead of the JVM garbage collector. Implementing cache coherency is challenging to get right, but Kafka relies on the rock-solid OS for cache coherence. Using the OS for caching also reduces the number of buffer copies. Since Kafka disk usage tends toward sequential reads, the OS read-ahead cache is impressive.

Cassandra, Netty, and Varnish use similar techniques. All of this is explained well in the Kafka Documentation. And, there is a more entertaining explanation at the Varnish site.

Big fast HDDs and long sequential access

Kafka favors long sequential disk access for reads and writes. Like Cassandra, LevelDB, RocksDB, and others, Kafka uses a form of log-structured storage and compaction instead of an on-disk mutable BTree. Like Cassandra, Kafka uses tombstones instead of deleting records right away.

Since disks these days have somewhat unlimited space and are very fast, Kafka can provide features not usually found in a messaging system like holding on to old messages for a long time. This flexibility allows for interesting applications of Kafka.


Kafka Producer Load Balancing

The producer asks the Kafka broker for metadata about which broker has which topic partition leaders, so no routing layer is needed. This leadership data allows the producer to send records directly to the Kafka broker that is the partition leader.

The Producer client controls which partition it publishes messages to, and can pick a partition based on some application logic. Producers can partition records by key, round-robin or use a custom application-specific partitioner logic.


Kafka Producer Record Batching

Kafka producers support record batching. Batching can be configured by the size of the batch in bytes. Batches can be auto-flushed based on time.

Batching is good for network IO throughput.

Batching speeds up throughput drastically.

Buffering is configurable and lets you make a tradeoff between additional latency and better throughput. Or, in the case of a heavily used system, it can mean both better average throughput and reduced overall latency.

Batching allows the accumulation of more bytes to send, which equates to fewer, larger I/O operations on Kafka brokers and increases compression efficiency. For higher throughput, the Kafka producer configuration allows buffering based on time and size. The producer sends multiple records as a batch with fewer network requests than sending each record one by one.
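As a hedged sketch, the batching and buffering knobs described above map to a few producer configs (the broker address and values here are illustrative, not recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", "65536");        // maximum bytes per partition batch
props.put("linger.ms", "10");            // wait up to 10 ms to fill a batch (time-based flush)
props.put("buffer.memory", "67108864");  // total memory available for buffering unsent records
props.put("compression.type", "lz4");    // whole batches get compressed (see the compression section below)

Producer<String, String> producer = new KafkaProducer<>(props);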



Kafka Producer Batching




Kafka compression

In large streaming platforms, the bottleneck is not always CPU or disk but often network bandwidth. Network bandwidth issues are even more pronounced in cloud, containerized, and virtualized environments, as multiple services could be sharing a NIC. Also, network bandwidth issues can be problematic when talking datacenter to datacenter or over a WAN.

Batching is beneficial for efficient compression and network IO throughput.

Kafka provides end-to-end batch compression: instead of compressing one record at a time, Kafka efficiently compresses a whole batch of records. The same message batch can be compressed and sent to the Kafka broker/server in one go and written in compressed form to the log partition. You can even configure the compression so that no decompression happens until the Kafka broker delivers the compressed records to the consumer.

Kafka supports GZIP, Snappy and LZ4 compression protocols.


Pull vs. Push/Streams

With Kafka, consumers pull data from brokers. In other systems, brokers push data or stream data to consumers. Messaging is usually a pull-based system (SQS and most MOMs use pull). With a pull-based system, if a consumer falls behind, it catches up later when it can.

Since Kafka is pull-based, it implements aggressive batching of data. Kafka like many pull based systems implements a long poll (SQS, Kafka both do). A long poll keeps a connection open after a request for a period and waits for a response.

A pull-based system has to pull data and then process it, and there is always a pause between the pull and getting the data.

Push-based systems push data to consumers (Scribe, Flume, reactive streams, RxJava, Akka). Push-based or streaming systems have problems dealing with slow or dead consumers. It is possible for a push system consumer to get overwhelmed when its rate of consumption falls below the rate of production. Some push-based systems use a back-off protocol based on back pressure that allows a consumer to indicate it is overwhelmed (see reactive streams). These problems, not flooding a consumer and consumer recovery, are tricky to solve when trying to track message acknowledgments.

Push-based or streaming systems can send a request immediately or accumulate requests and send in batches (or a combination based on back pressure). Push-based systems are always pushing data. The consumer can accumulate messages while it is processing data already sent, which is an advantage for reducing the latency of message processing. However, if a consumer dies while it is behind on processing, how does the broker know where the consumer was, and when does data get sent again to another consumer? This is not an easy problem to solve. Kafka gets around these complexities by using a pull-based system.


Traditional MOM Consumer Message State Tracking

With most MOMs it is the broker's responsibility to keep track of which messages get marked as consumed. Message tracking is not an easy task. As a consumer consumes messages, the broker keeps track of the state.

The goal in most MOM systems is for the broker to delete data quickly after consumption. Remember most MOMs were written when disks were a lot smaller, less capable, and more expensive.

This message tracking is trickier than it sounds (the acknowledgment feature), as brokers must maintain a lot of state per message: sent, acknowledged, and when to delete or resend the message.

Kafka Consumer Message State Tracking

Remember that Kafka topics get divided into ordered partitions. Each message has an offset in this ordered partition. Each topic partition is consumed by exactly one consumer per consumer group at a time.

This partition layout means the broker does not track offset data per message like a MOM; it only needs to store the offset of each consumer group / partition pair. This offset tracking equates to a lot less data to track.

The consumer sends location data periodically (consumer group, partition offset pair) to the Kafka broker, and the broker stores this offset data into an offset topic.

This offset-style message acknowledgment is much cheaper compared to MOM. Also, consumers are more flexible and can rewind to an earlier offset (replay). If there was a bug, then fix the bug, rewind the consumer, and replay the topic. This rewind feature is a killer feature of Kafka, as Kafka can hold topic log data for a very long time.


Message Delivery Semantics

There are three message delivery semantics: at most once, at least once, and exactly once. At most once means messages may be lost but are never redelivered. At least once means messages are never lost but may be redelivered. Exactly once means each message is delivered once and only once. Exactly once is preferred but more expensive, and requires more bookkeeping for the producer and consumer.

Kafka Consumer and Message Delivery Semantics

Recall that all replicas have exactly the same log partitions with the same offsets, and each consumer group maintains its position in the log per topic partition.

To implement "at-most-once", the consumer reads a message, then saves its offset in the partition by sending it to the broker, and finally processes the message. The issue with "at-most-once" is that a consumer could die after saving its position but before processing the message. Then the consumer that takes over or gets restarted would pick up at the last saved position, and the message in question is never processed.

To implement "at-least-once", the consumer reads a message, processes the message, and finally saves the offset to the broker. The issue with "at-least-once" is that a consumer could crash after processing a message but before saving the last offset position. Then if the consumer is restarted or another consumer takes over, the consumer could receive a message that was already processed. "At-least-once" is the most common setup for messaging, and it is your responsibility to make the messages idempotent, which means getting the same message twice will not cause a problem (e.g., two debits).

To implement “exactly once” on the consumer side, the consumer would need a two-phase commit between storage for the consumer position, and storage of the consumer’s message process output. Or, the consumer could store the message process output in the same location as the last offset.

Kafka offers the first two, and it is up to you to implement the third from the consumer perspective.


Kafka Producer Durability and Acknowledgement

Kafka offers operationally predictable semantics for durability. When publishing a message, a message gets "committed" to the log, which means all ISRs accepted the message. This commit strategy works out well for durability as long as at least one replica lives.

The producer connection could go down in the middle of a send, and the producer may not be sure whether a message it sent went through, in which case the producer resends the message. This resend logic is why it is important to use message keys and idempotent messages (duplicates ok). Kafka did not make guarantees about messages not getting duplicated from producer retries until recently (June 2017).

The producer can resend a message until it receives confirmation, i.e., acknowledgment received.

A producer resending a message without knowing whether the message it sent made it or not negates "exactly once" and "at-most-once" message delivery semantics.

Producer Durability

The producer can specify durability level. The producer can wait on a message being committed. Waiting for commit ensures all replicas have a copy of the message.

The producer can send with no acknowledgments (acks=0). The producer can send and get just one acknowledgment from the partition leader (acks=1). The producer can send and wait on acknowledgments from all replicas (acks=-1/all), which is the default.

As of June 2017: the producer can ensure a message or group of messages was sent “exactly once”.
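A hedged sketch of these durability settings as producer configs (the values are illustrative, and props is assumed to be an existing producer Properties object):

props.put("acks", "all");                 // wait for all in-sync replicas (the strongest setting)
// props.put("acks", "1");                // leader only
// props.put("acks", "0");                // no acknowledgment, fire and forget
props.put("enable.idempotence", "true");  // broker de-duplicates retried sends using producer sequence ids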

Improved Producer (June 2017 release)

Kafka now supports "exactly once" delivery from the producer, performance improvements, and atomic writes across partitions. This is achieved by the producer sending a sequence id; the broker keeps track of whether the producer already sent this sequence, and if the producer tries to send it again, it gets an ack for the duplicate message, but nothing is saved to the log. This improvement requires no API change.

Kafka Producer Atomic Log Writes (June 2017 Release)

Another improvement to Kafka is Kafka producers having atomic writes across partitions. The atomic writes mean Kafka consumers can only see committed records (configurable). Kafka has a coordinator that writes a marker to the topic log to signify what has been successfully transacted. The transaction coordinator and transaction log maintain the state of the atomic writes.

Atomic writes do require a new producer API for transactions.

Here is an example of using the new producer API.

New Producer API for transactions

producer.initTransactions();

try {
  producer.beginTransaction();
  producer.send(debitAccountMessage);
  producer.send(creditOtherAccountMessage);
  producer.sendOffsetsToTransaction(...);
  producer.commitTransaction();
} catch (ProducerFencedException pfe) {
  // Another producer with the same transactional.id took over; this producer must close.
  ...
  producer.close();
} catch (KafkaException ke) {
  // Any other error: abort so none of the sends in this transaction become visible.
  ...
  producer.abortTransaction();
}


Kafka Replication

Kafka replicates each topic's partitions across a configurable number of Kafka brokers. Kafka's replication is there by default; it is not a bolt-on feature like in most MOMs, as Kafka was meant to work with partitions and multiple nodes from the start. Each topic partition has one leader and zero or more followers.

Leaders and followers are called replicas. A replication factor is the leader node plus all of the followers. Partition leadership is evenly shared among Kafka brokers. Consumers only read from the leader. Producers only write to the leaders.

The topic log partitions on followers are kept in sync with the leader's log; ISRs are an exact copy of the leader minus the in-flight records that are yet to be replicated. Followers pull records in batches from their leader, like a regular Kafka consumer.

Kafka Broker Failover

Kafka keeps track of which Kafka brokers are alive. To be alive, a Kafka broker must maintain a ZooKeeper session using ZooKeeper's heartbeat mechanism and must keep its replicas in sync with the leaders and not fall too far behind.

Both the ZooKeeper session and being in-sync are needed for broker liveness, which is referred to as being in-sync. An in-sync replica is called an ISR. Each leader keeps track of a set of "in-sync replicas".

If an ISR/follower dies or falls behind, then the leader removes the follower from the set of ISRs. Falling behind means a replica is not in-sync after the replica.lag.time.max.ms period.

A message is considered "committed" when all ISRs have applied the message to their log. Consumers only see committed messages. Kafka's guarantee: a committed message will not be lost, as long as there is at least one ISR.

Replicated Log Partitions

A Kafka partition is a replicated log. A replicated log is a distributed data system primitive. A replicated log is useful for implementing other distributed systems using state machines. A replicated log models “coming into consensus” on an ordered series of values.

While a leader stays alive, all followers just need to copy values and ordering from their leader. If the leader does die, Kafka chooses a new leader from its followers which are in-sync. If a producer is told a message is committed, and then the leader fails, then the newly elected leader must have that committed message.

The more ISRs you have, the more there are to elect from during a leadership failure.


Kafka and Quorum

Quorum is the number of acknowledgments required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, for availability. Most systems use a majority vote; Kafka does not use a simple majority vote, in order to improve availability.

In Kafka, leaders are selected based on having a complete log. If we have a replication factor of 3, then at least two ISRs must be in-sync before the leader declares a sent message committed. If a new leader needs to be elected then, with no more than one failure, the new leader is guaranteed to have all committed messages.

Among the remaining followers there must be at least one replica that contains all committed messages. The problem with majority-vote quorums is that it does not take many failures to have an inoperable cluster.

Kafka Quorum Majority of ISRs

Kafka maintains a set of ISRs per leader. Only members of this ISR set are eligible for leadership election. What the producer writes to a partition is not committed until all ISRs acknowledge the write. The ISR set is persisted to ZooKeeper whenever it changes.

This style of ISR quorum allows producers to keep working without a majority of all nodes, requiring only the ISR set. This style of ISR quorum also allows a replica to rejoin the ISR set and have its vote count, but it has to be fully re-synced before rejoining, even if the replica lost un-flushed data during its crash.

All nodes die at the same time. Now what?

Kafka’s guarantee about data loss is only valid if at least one replica is in-sync.

If all followers that are replicating a partition leader die at once, then Kafka's data-loss guarantee is no longer valid. If all replicas are down for a partition, Kafka, by default, chooses the first replica (not necessarily in the ISR set) that comes back alive as the leader (the config unclean.leader.election.enable=true is the default). This choice favors availability over consistency.

If consistency is more important than availability for your use case, then you can set the config unclean.leader.election.enable=false; then, if all replicas are down for a partition, Kafka waits for the first ISR member (not the first replica) that comes back alive to elect a new leader.


Producers pick Durability

Producers can choose durability by setting acks to none (0), leader only (1), or all replicas (-1).

The acks=all setting is the default. With acks=all, the ack happens after all current in-sync replicas (ISRs) have received the message.

You can make the trade-off between consistency and availability. If durability over availability is preferred, then disable unclean leader election and specify a minimum ISR size.

The higher the minimum ISR size, the better the guarantee of consistency. But a higher minimum ISR size reduces availability, since the partition will be unavailable for writes if the size of the ISR set is less than the minimum threshold.
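Putting that trade-off together, a consistency-over-availability setup might look roughly like this (a sketch with illustrative values; the topic-level map would be applied when creating or altering the topic, and producerProps is an assumed producer Properties object):

// Producer side: require acknowledgment from all in-sync replicas.
producerProps.put("acks", "all");

// Topic side (or broker defaults): prefer consistency over availability.
Map<String, String> topicConfigs = Map.of(
    "min.insync.replicas", "2",                // writes fail if fewer than two ISRs remain
    "unclean.leader.election.enable", "false"  // never elect an out-of-sync replica as leader
);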


Quotas

Kafka has quotas for consumers and producers to limit the bandwidth they are allowed to consume. These quotas prevent consumers or producers from hogging all the Kafka broker resources. The quota is by client id or user. The quota data is stored in ZooKeeper, so changes do not necessitate restarting Kafka brokers.


Kafka Low-Level Design and Architecture Review

How would you prevent a denial of service attack from a poorly written consumer?

Use Quotas to limit the consumer’s bandwidth.

What is the default producer durability (acks) level?

acks=all, which means all ISRs have to write the message to their log partition.

What happens by default if all of the Kafka nodes go down at once?

Kafka chooses the first replica (not necessarily in the ISR set) that comes back alive as the leader, as unclean.leader.election.enable=true is the default, to support availability.

Why is Kafka record batching important?

Batching optimizes IO throughput over the wire as well as to the disk. It also improves compression efficiency by compressing an entire batch.

What are some of the design goals for Kafka?

To be a high-throughput, scalable streaming data platform for real-time analytics of high-volume event streams like log aggregation, user activity, etc.

What are some of the new features in Kafka as of June 2017?

Producer atomic writes, performance improvements and producer not sending duplicate messages.

What are the different message delivery semantics?

There are three message delivery semantics: at most once, at least once and exactly once.


Kafka vs JMS, SQS, RabbitMQ Messaging

Is Kafka a queue or a publish and subscribe system? Yes. It can be both.

Kafka is like a queue for consumer groups, which we cover later. Basically, Kafka is a queue system per consumer group so it can do load balancing like JMS, RabbitMQ, etc.

Kafka is like topics in JMS, RabbitMQ, and other MOM systems for multiple consumer groups. Kafka has topics and producers publish to the topics and the subscribers (Consumer Groups) read from the topics.

Kafka offers consumer groups, which is a named group of consumers. A consumer group acts as a subscription. Kafka broadcasts to multiple consumer groups using a single topic log, as each consumer group tracks its own offset into the log. MOM is message-oriented middleware and includes all JMS implementors: HornetQ, ActiveMQ, RabbitMQ, IBM MQ Series, and Tibco.


When IBM MQ Series was first released, today's mobile phone would have been considered a supercomputer.


Kafka vs MOM

By design, Kafka is better suited for scale than traditional MOM systems due to its partitioned topic log. Kafka can divide records among consumers by partition and send those messages/records in batches. Kafka handles parallel consumers better than traditional MOM and can even handle failover for consumers in a consumer group. Failover for most MOM systems is an afterthought.

Also, by moving the location in the log (the partition offset) to the client/consumer side of the equation instead of the broker, there is less tracking required by the broker, and you have more flexible consumers.

Kafka was written with mechanical sympathy, modern hardware, and the cloud in mind: disk drives are faster and bigger, servers have tons of system memory, and it is easier to spin up servers for scale-out than when traditional MOM systems were first designed and built. Expect messaging systems to catch up and new messaging systems to come out.

To learn more about Kafka vs. MOM systems read our Kafka Architecture Low Level guide. We talk about the design goals of Kafka and compare it to traditional messaging systems (MOM).

Kafka Architecture: Log Compaction

This post really picks up from our series on Kafka architecture, which includes Kafka topics architecture, Kafka producer architecture, Kafka consumer architecture, and Kafka ecosystem architecture.

This article is heavily inspired by the Kafka section on design around log compaction. You can think of it as the cliff notes about Kafka design around log compaction.

Kafka can delete older records based on time or size of a log. Kafka also supports log compaction for record key compaction. Log compaction means that Kafka will keep the latest version of a record and delete the older versions during a log compaction.

Kafka Log Compaction

Log compaction retains at least the last known value for each record key for a single topic partition. Compacted logs are useful for restoring state after a crash or system failure.

They are useful for in-memory services, persistent data stores, reloading a cache, etc. An important use case for data streams is to log changes to keyed, mutable data: changes to a database table or changes to an object in an in-memory microservice.

Log compaction is a granular retention mechanism that retains the last update for each key. A log-compacted topic log contains a full snapshot of the final record values for every record key, not just the recently changed keys.

Kafka log compaction allows downstream consumers to restore their state from a log compacted topic.


Kafka Log Compaction Structure

With a compacted log, the log has a head and a tail. The head of the compacted log is identical to a traditional Kafka log. New records get appended to the end of the head.

All log compaction works at the tail of the log. Only the tail gets compacted. Records in the tail of the log retain their original offset even after being rewritten by compaction cleanup.

Kafka Log Compaction Structure





Kafka Log Compaction Basics

All compacted log offsets remain valid, even if the record at an offset has been compacted away, as a consumer will get the next highest offset.

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period. Log compaction periodically runs in the background by recopying log segments. Compaction does not block reads and can be throttled to avoid impacting I/O of producers and consumers.
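A tombstone is nothing more than a normal send with a null value; a minimal sketch (the topic and key are made up):

// A null value marks the key for deletion once compaction and tombstone retention run.
producer.send(new ProducerRecord<String, String>("user-profiles", "user-42", null));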

Kafka Log Compaction Process





Kafka Log Compaction Cleaning

If a Kafka consumer stays caught up to the head of the log, it sees every record that is written.

The topic config min.compaction.lag.ms is used to guarantee a minimum period that must pass before a message can be compacted. The consumer sees all tombstones as long as the consumer reaches the head of the log in a period less than the topic config delete.retention.ms (the default is 24 hours). Log compaction will never re-order messages, just remove some. The partition offset for a message never changes.

Any consumer reading from the start of the log sees at least the final state of all records in the order they were written.


Kafka Log Cleaner

Recall that a Kafka topic has a log. A topic log is broken up into partitions, and partitions are divided into segments, which contain records that have keys and values.

The Kafka Log Cleaner does log compaction. The Log Cleaner has a pool of background compaction threads. These threads recopy log segment files, removing older records whose keys reappear later in the log. Each compaction thread chooses the topic log that has the highest ratio of log head to log tail. Then the compaction thread recopies the log from start to end, removing records whose keys occur again later in the log.

As the Log Cleaner cleans log partition segments, the segments get swapped into the log partition, immediately replacing the older segments. This way compaction does not require double the space of the entire partition: the additional disk space required is just one extra log partition segment (divide and conquer).


Topic Config for Log Compaction

To turn on compaction for a topic, use the topic config cleanup.policy=compact (the broker-level default is log.cleanup.policy).

To set a delay before records become eligible for compaction after they are written, use the topic config min.compaction.lag.ms (the broker-level default is log.cleaner.min.compaction.lag.ms). Records won't get compacted until after this period. The setting gives consumers time to get every record.
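
As a sketch of these settings in code, assuming a kafka-clients version (0.11 or later) that includes the AdminClient API and a hypothetical topic named employee-state, you could create a compacted topic with a minimum compaction lag like this:

Creating a compacted topic with the AdminClient (sketch)

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {

    public static void main(String... args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");      // turn on log compaction for this topic
            configs.put("min.compaction.lag.ms", "60000"); // wait at least 1 minute before compacting a record

            NewTopic topic = new NewTopic("employee-state", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}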


Log Compaction Review

What are three ways Kafka can delete records?

Kafka can delete older records based on the time or size of a log, and it also supports log compaction, which deletes older versions of records by key.

What is log compaction good for?

Since log compaction retains the last known value, the compacted log is a full snapshot of the latest records, which is useful for restoring state after a crash or system failure for an in-memory service, a persistent data store, or for reloading a cache. It allows downstream consumers to restore their state.

What is the structure of a compacted log? Describe the structure.

With a compacted log, the log has a head and a tail. The head of the compacted log is identical to a traditional Kafka log. New records get appended to the end of the head. All log compaction works at the tail of the compacted log.

After compaction, do log record offsets change? No.

What is a partition segment?

Recall that a topic has a log. A topic log is broken up into partitions, and partitions are divided into segment files, which contain records that have keys and values. A segment file is part of a partition. Segment files allow for divide and conquer when it comes to log compaction. As the Log Cleaner cleans log partition segments, the segments get swapped into the log partition, immediately replacing the older segment files. This way compaction does not require double the space of the entire partition: the additional disk space required is just one extra log partition segment.

The Kafka Ecosystem - Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry

The core of Kafka consists of brokers, topics, logs, partitions, and the cluster, along with related tools like MirrorMaker. That is Kafka as it exists in Apache.

The Kafka ecosystem consists of Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry. Most of the additional pieces of the Kafka ecosystem come from Confluent and are not part of Apache Kafka.

Kafka Streams is the streams API used to transform, aggregate, and process records from a stream and produce derivative streams. Kafka Connect is the connector API used to create reusable producers and consumers (e.g., a stream of changes from DynamoDB). The Kafka REST Proxy provides access to producers and consumers over REST (HTTP). The Schema Registry manages Avro schemas for Kafka records. Kafka MirrorMaker is used to replicate cluster data to another cluster.



Kafka Ecosystem: Diagram of Connect Source, Connect Sink, and Kafka Streams


Kafka Connect Sources are sources of records. Kafka Connect Sinks are destinations for records.

Kafka Ecosystem: Kafka REST Proxy and Confluent Schema Registry


Kafka Streams - Kafka Streams for Stream Processing

The Kafka Streams API builds on core Kafka primitives and has a life of its own. Kafka Streams enables real-time processing of streams. Kafka Streams supports stream processors. A stream processor takes continual streams of records from input topics, performs some processing, transformation, or aggregation on the input, and produces one or more output streams. For example, a video player application might take an input stream of events for videos watched and videos paused, output a stream of user preferences, and then generate new video recommendations based on recent user activity or the aggregate activity of many users to see what new videos are hot. The Kafka Streams API solves hard problems with out-of-order records, aggregating across multiple streams, joining data from multiple streams, allowing for stateful computations, and more.
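
As a minimal sketch of such a stream processor (assuming kafka-streams 1.0 or later on the classpath and hypothetical topics named video-events and video-watch-counts), the following counts WATCHED events per video id:

Counting video watch events with Kafka Streams (sketch)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class VideoWatchCounts {

    public static void main(String... args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "video-watch-counts");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Input stream keyed by video id; the value is the event type ("WATCHED", "PAUSED", ...).
        KStream<String, String> events = builder.stream("video-events");

        // Count WATCHED events per video id and publish the changelog to an output topic.
        KTable<String, Long> watchCounts = events
                .filter((videoId, eventType) -> "WATCHED".equals(eventType))
                .groupByKey()
                .count();

        watchCounts.toStream().to("video-watch-counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}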


Kafka Ecosystem: Kafka Streams and Kafka Connect




Kafka Ecosystem Review

What is Kafka Streams?

Kafka Streams enables real-time processing of streams. It can aggregate across multiple streams, join data from multiple streams, allow for stateful computations, and more.

What is Kafka Connect?

Kafka Connect is the connector API used to create reusable producers and consumers (e.g., a stream of changes from DynamoDB). Kafka Connect Sources are sources of records. Kafka Connect Sinks are destinations for records.

What is the Schema Registry?

The Schema Registry manages Avro schemas for Kafka records.

What is Kafka Mirror Maker?

The Kafka MirrorMaker is used to replicate cluster data to another cluster.

When might you use Kafka REST Proxy?

The Kafka REST Proxy provides access to producers and consumers over REST (HTTP). You could use it for easy integration with existing code bases.


Avro Introduction for Big Data and Data Streaming Architectures

Apache Avro™ is a data serialization system. Avro provides data structures, a binary data format, a container file format for storing persistent data, and RPC capabilities. Avro does not require code generation to use, and it integrates well with JavaScript, Python, Ruby, C, C#, C++, and Java. Avro gets used in the Hadoop ecosystem as well as by Kafka.

Avro is similar to Thrift, Protocol Buffers, JSON, etc. Avro does not require code generation. Avro needs less encoding as part of the data since it stores field names and types in the schema, reducing duplication. Avro supports the evolution of schemas.

Why Avro for Kafka and Hadoop?

Avro supports direct mapping to JSON as well as a compact binary format. It is a very fast serialization format. Avro is widely used in the Hadoop ecosystem. Avro supports polyglot bindings to many programming languages and code generation for statically typed languages. For dynamically typed languages, code generation is not needed. Another key advantage of Avro is its support for evolutionary schemas, which enables compatibility checks and allows your data to evolve over time.
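
To illustrate the JSON mapping and the compact binary format, here is a minimal sketch that serializes the same record both ways with the standard Avro encoders; the tiny Greeting schema is made up for the example:

Comparing Avro JSON and binary encodings (sketch)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class JsonVsBinary {

    public static void main(String... args) throws IOException {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[" +
                "{\"name\":\"message\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("message", "hello");

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);

        // JSON encoding: human readable, larger on the wire.
        ByteArrayOutputStream jsonOut = new ByteArrayOutputStream();
        Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, jsonOut);
        writer.write(record, jsonEncoder);
        jsonEncoder.flush();

        // Binary encoding: compact; field names and types live in the schema, not in the payload.
        ByteArrayOutputStream binaryOut = new ByteArrayOutputStream();
        Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(binaryOut, null);
        writer.write(record, binaryEncoder);
        binaryEncoder.flush();

        System.out.println("JSON bytes:   " + jsonOut.size() + " -> " + jsonOut.toString("UTF-8"));
        System.out.println("Binary bytes: " + binaryOut.size());
    }
}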

Avro supports platforms like Kafka that have multiple Producers and Consumers that evolve over time. Avro schemas help keep your data clean and robust.

There was a trend towards schema-less data as part of the NoSQL movement, but that pendulum has swung back a bit. Cassandra, for example, has schemas, and REST/JSON was schema-less and IDL-less but is not anymore with Swagger, API gateways, and RAML. Now the trend is more towards schemas that can evolve, and Avro fits well in this space.

Avro Schema provides Future Proof Robustness

A streaming architecture like Kafka supports decoupling by sending data in streams to an unknown number of consumers. Streaming architecture is challenging because Consumers and Producers evolve on different timelines. Producers send a stream of records that zero to many Consumers read. Not only are there multiple consumers, but data might end up in Hadoop or some other store and be used for use cases you did not even imagine. Schemas help future-proof your data and make it more robust. Supporting future use cases (Big Data), past use cases (older Consumers), and current use cases is not easy without a schema. Avro schemas, with their support for evolution, are essential for making data robust for streaming architectures like Kafka, and with the metadata that a schema provides, you can reason about the data. Having a schema provides robustness by supplying metadata about the data stored in Avro records, which are self-documenting.

Avro provides future usability of data

Data record format compatibility is a hard problem to solve with streaming architecture and Big Data. Avro schemas are not a cure-all, but they are essential for documenting and modeling your data. Avro schema definitions capture a point-in-time view of what your data looked like when it was recorded, since the schema is saved with the data. Data will evolve. New fields are added. Since streams often get recorded in data lakes like Hadoop, and those records can represent historical data rather than operational data, it makes sense that data streams and data lakes have a less rigid, more evolving schema than the schema of the operational relational database or Cassandra cluster. It makes sense to have a rigid schema for operational data, but not for data that ends up in a data lake.

With a streaming platform, consumers and producers can change all of the time and evolve quite a bit. Producers can have Consumers that they never know about. You can't test a Consumer that you don't know about. For agility's sake, you don't want to update every Consumer every time a Producer adds a field to a Record. These types of updates are not feasible without schema support.

Avro Schema

The Avro data format (wire format and file format) is defined by Avro schemas. When deserializing data, the schema is used. Data is serialized based on the schema, and the schema is sent with the data or, in the case of files, stored with the data. Avro data plus its schema is a fully self-describing data format.

When Avro stores data in a file, it also stores the schema. Avro RPC is also based on schemas and IDL. The RPC protocol exchanges schemas as part of the handshake. Avro schemas and protocols are written in JSON.

Let’s take a look at an example Avro schema.

./src/main/avro/com/cloudurable/phonebook/Employee.avsc

Example schema for an Employee record

{"namespace": "com.cloudurable.phonebook",

  "type": "record",
  "name": "Employee",

    "fields": [

        {"name": "firstName", "type": "string"},

        {"name": "lastName", "type": "string"},

        {"name": "age",  "type": "int"},

        {"name": "phoneNumber",  "type": "string"}
  
    ]
}

The above defines an employee record with firstName, lastName, age and phoneNumber.

Avro schema generation tools

Avro comes with a set of tools for generating Java classes for the Avro types that you define in an Avro schema. There are plugins for Maven and Gradle to generate code based on Avro schemas.

The gradle-avro-plugin is a Gradle plugin that uses the Avro tools to do Java code generation for Apache Avro. The plugin supports Avro schema files (.avsc) and Avro RPC IDL (.avdl). For Kafka, you only need .avsc schema files.

build.gradle - example using gradle-avro-plugin

plugins {
    id "com.commercehub.gradle.plugin.avro" version "0.9.0"
}

group 'cloudurable'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8

dependencies {
    compile "org.apache.avro:avro:1.8.1"
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

repositories {
    jcenter()
    mavenCentral()
}

avro {
    createSetters = false
    fieldVisibility = "PRIVATE"
}

Notice that we did not generate setter methods, and we made the fields private. This makes the instances somewhat immutable.

Running gradle build will generate the Employee.java.

./build/generated-main-avro-java/com/cloudurable/phonebook/Employee.java

Generated Avro code


package com.cloudurable.phonebook;

import org.apache.avro.specific.SpecificData;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Employee extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -6112285611684054927L;
  public static final org.apache.avro.Schema SCHEMA$ = new    
                        org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Employee\"...");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  private java.lang.String firstName;
  private java.lang.String lastName;
  private int age;
  private java.lang.String phoneNumber;
  ...

The Gradle plugin calls the Avro utilities, which generate the files and put them under build/generated-main-avro-java.

Let’s use the generated class as follows to construct an Employee instance.

Using the new Employee class

Employee bob = Employee.newBuilder().setAge(35)
        .setFirstName("Bob")
        .setLastName("Jones")
        .setPhoneNumber("555-555-1212")
        .build();

assertEquals("Bob", bob.getFirstName());

The Employee class has a constructor and a builder. We can use the builder to build a new Employee instance.

Next we want to write the Employees to disk.

Writing a list of employees to an Avro file

final List<Employee> employeeList = ...
final DatumWriter<Employee> datumWriter = new SpecificDatumWriter<>(Employee.class);
final DataFileWriter<Employee> dataFileWriter = new DataFileWriter<>(datumWriter);

try {
    dataFileWriter.create(employeeList.get(0).getSchema(),
            new File("employees.avro"));
    employeeList.forEach(employee -> {
        try {
            dataFileWriter.append(employee);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    });
} finally {
    dataFileWriter.close();
}

The above shows serializing an Employee list to disk. In Kafka, we will not be writing to disk directly. We are just showing how, so you have a way to test Avro serialization, which is helpful when debugging schema incompatibilities. Note that we create a DatumWriter, which converts a Java instance into an in-memory serialized format. SpecificDatumWriter is used with generated classes like Employee. DataFileWriter writes the serialized records to the employees.avro file.

Now let’s demonstrate how to read data from an Avro file.

Reading a list of employees from an avro file

final File file = new File("employees.avro");
final List<Employee> employeeList = new ArrayList<>();
final DatumReader<Employee> empReader = new SpecificDatumReader<>(Employee.class);
final DataFileReader<Employee> dataFileReader = new DataFileReader<>(file, empReader);

while (dataFileReader.hasNext()) {
    employeeList.add(dataFileReader.next(new Employee()));
}

The above deserializes employees from the employees.avro file into a java.util.List of Employee instances. Deserializing is similar to serializing but in reverse. We create a SpecificDatumReader to convert in-memory serialized items into instances of our generated Employee class. The DataFileReader reads records from the file by calling next. Another way to read is using forEach as follows:

Reading a list of employees from an avro file using forEach

final DataFileReader<Employee> dataFileReader = new DataFileReader<>(file, empReader);

dataFileReader.forEach(employeeList::add);

You can use a GenericRecord instead of generating an Employee class as follows.

Using GenericRecord to create an Employee record

final String schemaLoc = "src/main/avro/com/cloudurable/phonebook/Employee.avsc";
final File schemaFile = new File(schemaLoc);
final Schema schema = new Schema.Parser().parse(schemaFile);

GenericRecord bob = new GenericData.Record(schema);
bob.put("firstName", "Bob");
bob.put("lastName", "Smith");
bob.put("age", 35);
assertEquals("Bob", bob.get("firstName"));

You can write to Avro files using GenericRecords as well.

Writing GenericRecords to an Avro file

final List<GenericRecord> employeeList = new ArrayList<>();


final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);

try {
    dataFileWriter.create(employeeList.get(0).getSchema(),
            new File("employees2.avro"));
    employeeList.forEach(employee -> {
        try {
            dataFileWriter.append(employee);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
} finally {
    dataFileWriter.close();
}

You can read from Avro files using GenericRecords as well.

Reading GenericRecords from an Avro file

final File file = new File("employees2.avro");
final List<GenericRecord> employeeList = new ArrayList<>();
final DatumReader<GenericRecord> empReader = new GenericDatumReader<>();
final DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, empReader);

while (dataFileReader.hasNext()) {
    employeeList.add(dataFileReader.next(null));
}

employeeList.forEach(System.out::println);

Avro will validate the data types when it serializes and deserializes the data.

Using the wrong type

GenericRecord employee = new GenericData.Record(schema);
employee.put("firstName", "Bob" + index);
employee.put("lastName", "Smith"+ index);
//employee.put("age", index % 35 + 25);
employee.put("age", "OLD");

Stack trace from above


org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException:
java.lang.String cannot be cast to java.lang.Number

    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:308)
    at com.cloudurable.phonebook.EmployeeTestNoGen.lambda$testWrite$1(EmployeeTestNoGen.java:71)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at com.cloudurable.phonebook.EmployeeTestNoGen.testWrite(EmployeeTestNoGen.java:69)
    ...
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:117)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:302)

If you left out a required field like firstName, then you would get this.

Stack trace from leaving out firstName

Caused by: java.lang.NullPointerException: null of string in field firstName of com.cloudurable.phonebook.Employee
    at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:132)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:126)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)

In an Avro schema, you can define records, arrays, enums, unions, and maps, and you can use primitive types like string, int, and boolean, as well as logical types like decimal, timestamp, and date.

The Avro schema and IDL specification document describes all of the supported types.

Let’s add to the Employee schema and show some of the different types that Avro supports.

Employee schema with more types

{"namespace": "com.cloudurable.phonebook",
  "type": "record",
  "name": "Employee",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "nickName", "type": ["null", "string"], "default" : null},
    {"name": "lastName", "type": "string"},
    {"name": "age",  "type": "int"},
    {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
    {"name": "phoneNumber",  "type":
      [ "null",
        { "type": "record",   "name": "PhoneNumber",
        "fields": [
          {"name": "areaCode", "type": "string"},
          {"name": "countryCode", "type": "string", "default" : ""},
          {"name": "prefix", "type": "string"},
          {"name": "number", "type": "string"}
        ]
        }
      ]
    },
    {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
              "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
    }
  ]
}

Avro record attributes are as follows:

  • name: name of the record (required)
  • namespace: equates to packages or modules
  • doc: documentation for future users of this schema
  • aliases: array of aliases (alternate names) for the record
  • fields: an array of fields

Avro field attributes are as follows:

  • name: name of the field (required)
  • doc: description of field (important for future usage)
  • type: JSON object defining a schema, or a JSON string naming a record definition (required)
  • default: Default value for this field
  • order: specifies the sort ordering of this field within the record (optional: ascending, descending, or ignore)
  • aliases: array of alternate names for this field

The doc attribute is imperative for future usage, as it documents what the fields and records are supposed to represent. Remember that this data can outlive the systems that produced it. A self-documenting schema is critical for a robust system.

The above has examples of default values, arrays, primitive types, Records within records, enums, and more.

PhoneNumber record


package com.cloudurable.phonebook;

import org.apache.avro.specific.SpecificData;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class PhoneNumber extends org.apache.avro.specific.SpecificRecordBase ...{
  private static final long serialVersionUID = -3138777939618426199L;
  public static final org.apache.avro.Schema SCHEMA$ =
                   new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":...
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
   private java.lang.String areaCode;
   private java.lang.String countryCode;
   private java.lang.String prefix;
   private java.lang.String number;

Status enum

package com.cloudurable.phonebook;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public enum Status {
  RETIRED, SALARY, HOURLY, PART_TIME  ;
  ...

Tips for using Avro with Kafka and Hadoop

Avoid advanced Avro features that are not supported by polyglot language mappings. Think simple data transfer objects or structs. Don't use magic strings; use enums instead, as they provide better validation.

Document all records and fields in the schema. Documentation is imperative for future usage; document what the fields and records represent. A self-documenting schema is critical for a robust streaming system and Big Data. Don't use complex union types. Use unions for nullable fields only, and avoid recursive types at all costs.

Use reasonable field names and use them consistently across records. For example, use employee_id instead of id, and then use employee_id in all other records that have a field referring to the employee_id from Employee.

Avro Conclusion

Avro provides fast, compact data serialization. It supports data structures like records, maps, arrays, and basic types. You can use it directly or with code generation. Avro brings schema support to Kafka, which we demonstrate in the next article.

Kafka Tutorial: Kafka, Avro Serialization and the Schema Registry

The Confluent Schema Registry stores Avro schemas for Kafka producers and consumers. The Schema Registry provides a RESTful interface for managing Avro schemas and allows the storage of a history of versioned schemas. The Confluent Schema Registry supports checking schema compatibility for Kafka. You can configure compatibility settings to support the evolution of schemas using Avro. The Kafka Avro serialization project provides the serializers. Kafka Producers and Consumers that use Kafka Avro serialization handle schema management and the serialization of records using Avro and the Schema Registry. When using the Confluent Schema Registry, Producers don't have to send the schema, just the unique schema id. The consumer uses the schema id to look up the full schema from the Confluent Schema Registry if it is not already cached. Since you don't have to send the schema with each set of records, this saves time. Not sending the schema with each record or batch of records speeds up the serialization, as only the id of the schema is sent.

If you have never used Avro before, please read Avro Introduction for Big Data and Data Streams.

This article covers what the Schema Registry is and why you want to use it with Kafka. We drill down into understanding Avro schema evolution and into setting up and using the Schema Registry with Kafka Avro Serializers. We show how to manage Avro schemas with the REST interface of the Schema Registry and then how to write Avro-Serializer-based Producers and Avro-Deserializer-based Consumers for Kafka.

The Kafka Producer creates a record/message, which is an Avro record. The record contains a schema id and data. With the Kafka Avro Serializer, the schema is registered if needed, and then the serializer writes the data along with the schema id. The Kafka Avro Serializer keeps a cache of registered schemas from the Schema Registry and their schema ids.

Consumers receive payloads and deserialize them with the Kafka Avro Deserializer, which uses the Confluent Schema Registry. The Deserializer looks up the full schema from its cache or from the Schema Registry based on the id.
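
Under the hood, the Confluent serializers frame each payload as a magic byte, a 4-byte schema id, and the Avro-encoded bytes. You normally never parse this yourself because the KafkaAvroDeserializer does it for you, but a minimal sketch of pulling that framing apart looks like this:

Inspecting the Confluent wire format (sketch)

import java.nio.ByteBuffer;

public class ConfluentWireFormat {

    // Splits a Confluent-framed Avro payload into its schema id and Avro bytes.
    // Shown for illustration only; KafkaAvroDeserializer handles this internally.
    public static void inspect(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);

        byte magic = buffer.get();       // magic byte identifying the wire format
        int schemaId = buffer.getInt();  // 4-byte schema id, used to look up the schema in the Schema Registry

        byte[] avroBytes = new byte[buffer.remaining()];
        buffer.get(avroBytes);           // the Avro binary-encoded record itself

        System.out.printf("magic=%d schemaId=%d avroBytes=%d%n", magic, schemaId, avroBytes.length);
    }
}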

Why Schema Registry?

The Consumer has its own schema, which could be different from the Producer's. The consumer schema is the schema the consumer expects the record/message to conform to. With the Schema Registry, a compatibility check is performed, and if the two schemas don't match but are compatible, then the payload transformation happens via Avro schema evolution. Kafka records can have a Key and a Value, and both can have a schema.

Schema Registry Operations

The Schema Registry can store schemas for the keys and values of Kafka records. It can also list schemas by subject. It can list all versions of a subject (schema). It can retrieve a schema by version or id. It can get the latest version of a schema. Importantly, the Schema Registry can check to see if a schema is compatible with a certain version. There is a compatibility level (BACKWARD, FORWARD, FULL, NONE) setting for the Schema Registry and for an individual subject. You can manage schemas via a REST API with the Schema Registry.

Schema Registry Schema Compatibility Settings

Backward compatibility means data written with an older schema is readable with a newer schema. Forward compatibility means data written with a newer schema is readable with older schemas. Full compatibility means a new version of a schema is both backward and forward compatible. None disables compatibility checking and is not recommended. If you set the level to none, then the Schema Registry just stores the schema, and the schema will not be validated for compatibility at all.

Schema Registry Config

Schema compatibility checks can be configured globally or per subject.

The compatibility checks value is one of the following:

  • NONE - don't check for schema compatibility
  • FORWARD - make sure the new schema is forward compatible with the latest schema (data written with the new schema is readable with the latest schema)
  • BACKWARD (default) - make sure the new schema is backward compatible with the latest schema (data written with the latest schema is readable with the new schema)
  • FULL - make sure the new schema is both forward and backward compatible with the latest schema

Schema Evolution

If an Avro schema is changed after data has been written to a store using an older version of that schema, then Avro might perform schema evolution when you try to read that data.

From the Kafka perspective, schema evolution happens only during deserialization at the Consumer (read). If the Consumer's schema is different from the Producer's schema, then the value or key is automatically modified during deserialization to conform to the Consumer's reader schema if possible.

Avro schema evolution is an automatic transformation of Avro records between the consumer's schema version and the schema the producer used to write to the Kafka log. When the Consumer schema is not identical to the Producer schema used to serialize the Kafka record, a data transformation is performed on the Kafka record's key or value. If the schemas match, then no transformation is needed.
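
Here is a minimal sketch of this resolution using plain Avro, independent of Kafka; the two inline Employee schemas are made up for the example. A record is written with a writer schema that has an age field and then read with a reader schema that does not, so the field is dropped during deserialization:

Avro schema resolution with a writer and a reader schema (sketch)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class SchemaResolution {

    public static void main(String... args) throws IOException {
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Employee\",\"fields\":[" +
                "{\"name\":\"firstName\",\"type\":\"string\"}," +
                "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Employee\",\"fields\":[" +
                "{\"name\":\"firstName\",\"type\":\"string\"}]}");

        // Producer side: serialize with the writer (newer) schema.
        GenericRecord written = new GenericData.Record(writerSchema);
        written.put("firstName", "Bob");
        written.put("age", 42);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
        encoder.flush();

        // Consumer side: deserialize with both schemas. Avro resolves the difference,
        // and the age field is dropped because the reader schema does not have it.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord read = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);
        System.out.println(read); // {"firstName": "Bob"}
    }
}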

Allowed Modification During Schema Evolution

You can add a field with a default to a schema. You can remove a field that had a default value. You can change a field's order attribute. You can change a field's default value to another value, or add a default value to a field that did not have one. You can remove or add a field alias (keep in mind that this could break some consumers that depend on the alias). You can change a type to a union that contains the original type. If you do any of the above, then your schema can use Avro's schema evolution when reading data written with an old schema.

Rules of the Road for modifying Schema

If you want to make your schema evolvable, then follow these guidelines. Provide a default value for fields in your schema, as this allows you to delete the field later. Never change a field's data type. When adding a new field to your schema, you have to provide a default value for the field. Don't rename an existing field (use aliases instead); you can add an alias.

Let’s use an example to talk about this. The following example is from our Avro tutorial.

Employee example Avro Schema

{"namespace": "com.cloudurable.phonebook",
  "type": "record",
  "name": "Employee",
  "doc" : "Represents an Employee at a company",
  "fields": [
    {"name": "firstName", "type": "string", "doc": "The persons given name"},
    {"name": "nickName", "type": ["null", "string"], "default" : null},
    {"name": "lastName", "type": "string"},
    {"name": "age",  "type": "int", "default": -1},
    {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
    {"name": "phoneNumber",  "type":
    [ "null",
      { "type": "record",   "name": "PhoneNumber",
        "fields": [
          {"name": "areaCode", "type": "string"},
          {"name": "countryCode", "type": "string", "default" : ""},
          {"name": "prefix", "type": "string"},
          {"name": "number", "type": "string"}
        ]
      }
    ]
    },
    {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
      "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
    }
  ]
}

Avro Schema Evolution Scenario

Let’s say our Employee record did not have an age in version 1 of the schema and then later we decided to add an age field with a default value of -1. Now let’s say we have a Producer using version 2 of the schema with age, and a Consumer using version 1 with no age.

The Producer uses version 2 of the Employee schema, creates a com.cloudurable.Employee record, sets the age field to 42, and then sends it to the Kafka topic new-employees. The Consumer consumes records from new-employees using version 1 of the Employee schema. Since the Consumer is using version 1 of the schema, the age field gets removed during deserialization.

The same Consumer modifies some records and then writes them to a NoSQL store. When the Consumer does this, the age field is missing from the records it writes to the NoSQL store. Another client, using version 2 of the schema (which has the age), reads a record from the NoSQL store. The age field is missing from the record because the Consumer wrote it with version 1, so the client reads the record and the age is set to the default value of -1.

If you added the age field and it was not optional, i.e., the age field did not have a default, then the Schema Registry could reject the schema, and the Producer could never add it to the Kafka log.

Using the Schema Registry REST API

Recall that the Schema Registry allows you to manage schemas using the following operations:

  • Store schemas for the keys and values of Kafka records
  • List schemas by subject
  • List all versions of a subject (schema)
  • Retrieve a schema by version
  • Retrieve a schema by id
  • Retrieve the latest version of a schema
  • Perform compatibility checks
  • Set the compatibility level globally
  • Set the compatibility level per subject

Recall that all of this is available via a REST API with the Schema Registry.

To post a new schema you could do the following:

Posting a new schema

curl -X POST -H "Content-Type:
application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": …}’ \
    http://localhost:8081/subjects/Employee/versions

To list all of the schemas

curl -X GET http://localhost:8081/subjects

If you have a good HTTP client, you can basically perform all of the above operations via the REST interface for the Schema Registry. I wrote a little example to do this so I could understand the Schema Registry a little better, using the OkHttp client from Square (com.squareup.okhttp3:okhttp:3.7.0+) as follows:

Using REST endpoints to try out all of the Schema Registry options

package com.cloudurable.kafka.schema;

import okhttp3.*;

import java.io.IOException;

public class SchemaMain {

    private final static MediaType SCHEMA_CONTENT =
            MediaType.parse("application/vnd.schemaregistry.v1+json");

    private final static String EMPLOYEE_SCHEMA = "{\n" +
            "  \"schema\": \"" +
            "  {" +
            "    \\\"namespace\\\": \\\"com.cloudurable.phonebook\\\"," +
            "    \\\"type\\\": \\\"record\\\"," +
            "    \\\"name\\\": \\\"Employee\\\"," +
            "    \\\"fields\\\": [" +
            "        {\\\"name\\\": \\\"fName\\\", \\\"type\\\": \\\"string\\\"}," +
            "        {\\\"name\\\": \\\"lName\\\", \\\"type\\\": \\\"string\\\"}," +
            "        {\\\"name\\\": \\\"age\\\",  \\\"type\\\": \\\"int\\\"}," +
            "        {\\\"name\\\": \\\"phoneNumber\\\",  \\\"type\\\": \\\"string\\\"}" +
            "    ]" +
            "  }\"" +
            "}";

    public static void main(String... args) throws IOException {

        System.out.println(EMPLOYEE_SCHEMA);

        final OkHttpClient client = new OkHttpClient();

        //POST A NEW SCHEMA
        Request request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/subjects/Employee/versions")
                .build();

        String output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //LIST ALL SCHEMAS
        request = new Request.Builder()
                .url("http://localhost:8081/subjects")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //SHOW ALL VERSIONS OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //SHOW VERSION 2 OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/2")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //SHOW THE SCHEMA WITH ID 3
        request = new Request.Builder()
                .url("http://localhost:8081/schemas/ids/3")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //SHOW THE LATEST VERSION OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/latest")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);



        //CHECK IF SCHEMA IS REGISTERED
        request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/subjects/Employee")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //TEST COMPATIBILITY
        request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/compatibility/subjects/Employee/versions/latest")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        // TOP LEVEL CONFIG
        request = new Request.Builder()
                .url("http://localhost:8081/config")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        // SET TOP LEVEL CONFIG
        // VALUES are none, backward, forward and full
        request = new Request.Builder()
                .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"none\"}"))
                .url("http://localhost:8081/config")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        // SET CONFIG FOR EMPLOYEE
        // VALUES are none, backward, forward and full
        request = new Request.Builder()
                .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"backward\"}"))
                .url("http://localhost:8081/config/Employee")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);



    }
}

I suggest running the example, trying to force incompatible schemas into the Schema Registry, and noting the behavior for the various compatibility settings.

Running Schema Registry

$ cat ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
debug=false

~/tools/confluent-3.2.1/bin/schema-registry-start  ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties

Writing Consumers and Producers that use Kafka Avro Serializers and the Schema Registry

Now let's cover writing consumers and producers that use Kafka Avro Serializers, which in turn use the Schema Registry and Avro.

We will need to start up the Schema Registry server pointing to our ZooKeeper cluster. Then we will need to import the Kafka Avro Serializer and Avro jars into our Gradle project. You will then need to configure the Producer to use the Schema Registry and the KafkaAvroSerializer. To write the consumer, you will need to configure it to use the Schema Registry and the KafkaAvroDeserializer.

Here is our build file which shows the Avro jar files and such that we need.

Gradle build file for Kafka Avro Serializer examples

plugins {
    id "com.commercehub.gradle.plugin.avro" version "0.9.0"
}

group 'cloudurable'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8

dependencies {
    compile "org.apache.avro:avro:1.8.1"
    compile 'com.squareup.okhttp3:okhttp:3.7.0'
    testCompile 'junit:junit:4.11'
    compile 'org.apache.kafka:kafka-clients:0.10.2.0'
    compile 'io.confluent:kafka-avro-serializer:3.2.1'
}
repositories {
    jcenter()
    mavenCentral()
    maven {
        url "http://packages.confluent.io/maven/"
    }
}
avro {
    createSetters = false
    fieldVisibility = "PRIVATE"
}

Notice that we include the Kafka Avro Serializer lib (io.confluent:kafka-avro-serializer:3.2.1) and the Avro lib (org.apache.avro:avro:1.8.1).

To learn more about the Gradle Avro plugin, please read this article on using Avro.

Writing a Producer

Next, let’s write the Producer as follows.

Producer that uses Kafka Avro Serialization and the Schema Registry

package com.cloudurable.kafka.schema;

import com.cloudurable.phonebook.Employee;
import com.cloudurable.phonebook.PhoneNumber;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Properties;
import java.util.stream.IntStream;

public class AvroProducer {

    private static Producer<Long, Employee> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());

        // Configure the KafkaAvroSerializer.
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class.getName());

        // Schema Registry location.
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081");

        return new KafkaProducer<>(props);
    }

    private final static String TOPIC = "new-employees";

    public static void main(String... args) {

        Producer<Long, Employee> producer = createProducer();

        Employee bob = Employee.newBuilder().setAge(35)
                .setFirstName("Bob")
                .setLastName("Jones")
                .setPhoneNumber(
                        PhoneNumber.newBuilder()
                                .setAreaCode("301")
                                .setCountryCode("1")
                                .setPrefix("555")
                                .setNumber("1234")
                                .build())
                .build();

        IntStream.range(1, 100).forEach(index->{
            producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));

        });

        producer.flush();
        producer.close();
    }

}

Notice that we configure the schema registry and the KafkaAvroSerializer as part of the Producer setup.

// Configure the KafkaAvroSerializer.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class.getName());

// Schema Registry location.
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

Then we use the Producer as expected.

Writing a Consumer

Next we have to write the Consumer.

Consumer that uses Kafka Avro Serialization and Schema Registry

package com.cloudurable.kafka.schema;

import com.cloudurable.phonebook.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC = "new-employees";

    private static Consumer<Long, Employee> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());

        //Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class.getName());  //<----------------------

        //Use Specific Record or else you get Avro GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");


        //Schema registry location.
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081"); //<----- Run Schema Registry on 8081


        return new KafkaConsumer<>(props);
    }





    public static void main(String... args) {

        final Consumer<Long, Employee> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));

        IntStream.range(1, 100).forEach(index -> {

            final ConsumerRecords<Long, Employee> records =
                    consumer.poll(100);

            if (records.count() == 0) {
                System.out.println("None found");
            } else records.forEach(record -> {

                Employee employeeRecord = record.value();

                System.out.printf("%s %d %d %s \n", record.topic(),
                        record.partition(), record.offset(), employeeRecord);
            });
        });
    }


}

Notice that, just like the producer, we have to tell the consumer where to find the Registry, and we have to configure the Kafka Avro Deserializer.

Configuring Schema Registry for Consumer

//Use Kafka Avro Deserializer.

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class.getName());  

//Use Specific Record or else you get Avro GenericRecord.
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

//Schema registry location.
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"); //<----- Run Schema Registry on 8081

An additional step is that we have to tell it to use the generated version of the Employee object. If we did not, then it would use an Avro GenericRecord instead of our generated Employee object, which is a SpecificRecord. To learn more about using GenericRecord and generating code from Avro, read the Avro Kafka tutorial, as it has examples of both.

To run the above example, you need to start up Kafka and ZooKeeper. To learn how to do this, if you have not done it before, see the Kafka Tutorial. Essentially, there is a startup script for Kafka and ZooKeeper, just like there was for the Schema Registry, along with a default configuration; you pass the default configuration to the startup scripts, and Kafka runs locally on your machine.

Running Zookeeper and Kafka

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &

kafka/bin/kafka-server-start.sh kafka/config/server.properties

Schema Registry Conclusion

Confluent provides the Schema Registry to manage Avro schemas for Kafka Consumers and Producers. Avro provides schema evolution, which is necessary for streaming and Big Data architectures. Confluent uses schema compatibility checks to see if the Producer's schema and the Consumer's schema are compatible and to perform schema evolution if needed. You use the KafkaAvroSerializer from the Producer and point it at the Schema Registry. You use the KafkaAvroDeserializer from the Consumer and point it at the Schema Registry.


About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, and Kafka support, and helps with setting up Kafka clusters in AWS.

