Introduction

Nakadi Event Broker

The goal of Nakadi (ნაკადი means “stream” in Georgian) is to provide an event broker infrastructure to:

  • Abstract event delivery via a secured RESTful API.

    This allows microservices teams to maintain service boundaries, and not directly depend on any specific message broker technology. Access can be managed individually for every Event Type and secured using OAuth and custom authorization plugins.

  • Enable convenient development of event-driven applications and asynchronous microservices.

    Event types can be defined with schemas and managed via a registry. All events are validated against the event type's schema before they are published, which helps guarantee data quality and data consistency for data consumers.

  • Efficient low latency event delivery.

    Once a publisher sends an event using a simple HTTP POST, it is pushed to consumers via a streaming HTTP connection, allowing near real-time event processing. The consumer connection has keepalive controls and supports managing stream offsets using subscriptions.

Read more to understand the big picture: Architecture for data integration

Watch the talk Data Integration in the World of Microservices

Development status

Nakadi is ready for high-load production use. Zalando uses Nakadi as its central Event Bus Service. Nakadi reliably handles traffic from thousands of event types with a throughput of hundreds of gigabytes per second. The project is in active development. See the changelog

Features

  • Stream:
    • REST abstraction over Kafka-like queues.
    • CRUD for event types.
    • Event batch publishing.
    • Low-level interface.
      • manual client side partition management is needed
      • no support for commits
    • High-level interface (Subscription API).
      • automatic redistribution of partitions between consuming clients
      • commits should be issued to move server-side cursors
  • Schema:
    • Schema registry.
    • Several event type categories (Undefined, Business, Data Change).
    • Several partitioning strategies (Random, Hash, User defined).
    • Event enrichment strategies.
    • Schema evolution.
    • Events validation using an event type schema.
  • Security:
    • OAuth2 authentication.
    • Per-event type authorization.
    • Blacklist of users and applications.
  • Operations:
    • STUPS platform compatible.
    • ZMON monitoring compatible.
    • SLO monitoring.
    • Timelines.
      • This allows production and consumption to be switched transparently to a different cluster (tier, region, AZ) without moving the actual data and without any service degradation.
      • Opens the possibility of implementing other streaming technologies and engines besides Kafka (such as AWS Kinesis, Google Pub/Sub, etc.)

Read more about the latest developments in our Changelog

Additional features that we plan to cover in the future are:

  • Support for different streaming technologies and engines. Nakadi currently uses Apache Kafka as its broker, but other providers (such as Kinesis) will be possible.
  • Filtering of events for subscribing consumers.
  • Store old published events forever using a transparent fallback to backup storage such as AWS S3.
  • Separate the internal schema registry into a standalone service.
  • Use additional schema formats and protocols like Avro, protobuf and others.

The zalando-nakadi organisation contains many useful related projects like

  • Client libraries
  • SDK
  • GUI
  • DevOps tools and more

Examples

Creating Event Types

An event type can be created by posting to the event-types resource.

curl -v -XPOST https://localhost:8080/event-types -H "Content-type: application/json" -d '{
  "name": "order.ORDER_RECEIVED",
  "owning_application": "order-service",
  "category": "undefined",
  "partition_strategy": "random",
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'

Publishing Events

Events for an event type can be published by posting to its “events” collection:

curl -v -XPOST https://localhost:8080/event-types/order.ORDER_RECEIVED/events -H "Content-type: application/json" -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'


HTTP/1.1 200 OK  

Consuming Events

You can open a stream for an Event Type via the events sub-resource:

curl -v https://localhost:8080/event-types/order.ORDER_RECEIVED/events 
    

HTTP/1.1 200 OK

{"cursor":{"partition":"0","offset":"4"},"events":[{"order_number": "ORDER_001", "metadata": {"eid": "4ae5011e-eb01-11e5-8b4a-1c6f65464fc6", "occurred_at": "2016-03-15T23:56:11+01:00"}}]}
{"cursor":{"partition":"0","offset":"5"},"events":[{"order_number": "ORDER_002", "metadata": {"eid": "4bea74a4-eb01-11e5-9efa-1c6f65464fc6", "occurred_at": "2016-03-15T23:57:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}

Nakadi community

There is a large ecosystem of projects around Nakadi. Check them out in the zalando-nakadi organisation.

Getting Started

In this section we’ll walk through running a Nakadi service on your machine. Once you have the service up and running, you can jump to Using Nakadi to see how to produce and consume messages.

Quickstart

You can run a Nakadi service locally using Docker. If you don’t have Docker installed, there are great instructions available on the Docker website.

Running a Server

From the project’s home directory you can install and start a Nakadi container via the gradlew command:

./gradlew startNakadi

This will start a docker container for the Nakadi server and another container with its PostgreSQL, Kafka and Zookeeper dependencies. You can read more about the gradlew script in the Building and Developing section

Stopping a Server

To stop the running Nakadi:

./gradlew stopNakadi

Notes

If you’re having trouble getting started, you might find an answer in the Frequently Asked Questions (FAQ) section of the documentation.

Ports

Some ports need to be available to run the service:

  • 8080 for the API server
  • 5432 for PostgreSQL
  • 9092 for Kafka
  • 2181 for Zookeeper

They allow the services to communicate with each other and should not be used by other applications.

Mac OS Docker Settings

Since Docker for Mac OS runs inside VirtualBox, you will want to expose some ports first to allow Nakadi to access its dependencies:

docker-machine ssh default \
-L 9092:localhost:9092 \
-L 8080:localhost:8080 \
-L 5432:localhost:5432 \
-L 2181:localhost:2181

Alternatively you can set up port forwarding on the “default” machine through its network settings in the VirtualBox UI.

(Screenshot: VirtualBox port forwarding settings)

If you get the message “Is the docker daemon running on this host?” but you know Docker and VirtualBox are running, you might want to run this command:

eval "$(docker-machine env default)"

Note: Docker for Mac OS (previously in beta) version 1.12 (1.12.0 or 1.12.1) is currently not supported due to a bug in the networking host configuration.

Nakadi Concepts

Nakadi Concepts

The Nakadi API allows the publishing and consuming of events over HTTP.

A good way to think of events is that they are like messages in a stream processing or queuing system, but have a defined structure that can be understood and validated. The object containing the information describing an event is called an event type.

To publish and consume events, an owning application must first register a new event type with Nakadi. The event type contains information such as its name, the aforementioned owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi supports an event type registry API that lists all the available event types.

Once the event type is created, a resource called a stream becomes available for that event type. The stream will accept events for the type from a producer and can be read from by one or more consumers. Nakadi can validate each event that is sent to the stream.

An event type’s stream can be divided into one or more partitions. Each event is placed into exactly one partition. Each partition represents an ordered log - once an event is added to a partition its position is never changed, but there is no global ordering across partitions [1].

Consumers can read events and track their position in the stream using a cursor that is given to each partition. Consumers can also use a cursor to read from a stream at a particular position. Multiple consumers can read from the same stream, allowing different applications to read the stream simultaneously.

In summary, applications using Nakadi can be grouped as follows:

  • Event Type Owners: Event type owners interact with Nakadi via the event type registry to define event types based on a schema and create event streams.

  • Event Producers: Producers publish events that conform to the event type’s schema to the event type’s stream.

  • Event Consumers: Consumers read events from the event stream. Multiple consumers can read from the same stream.


[1] For more detail on partitions and the design of streams see “The Log” by Jay Kreps.

Cursors, Offsets and Partitions

By default the events resource will consume from all partitions of an event type and from the end (or “tail”) of the stream. To select only particular partitions and a position where in the stream to start, you can supply an X-Nakadi-Cursors header in the request:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
  -H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'

The header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from. Note that events within the same partition maintain their overall order.

The offset value of the cursor allows you to select where in the stream you want to consume from. This can be any known offset value, or the dedicated value BEGIN which will start the stream from the beginning. For example, to read from partition 0 from the beginning:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
  -H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"BEGIN"}]'

The details of the partitions and their offsets for an event type are available via its partitions resource.
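
For example, the partition details for the event type can be fetched directly. This is a hedged sketch; the exact response fields may vary between Nakadi versions:

# the response fields shown below are illustrative
curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/partitions


HTTP/1.1 200 OK

[
  {"partition": "0", "oldest_available_offset": "BEGIN", "newest_available_offset": "4"}
]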

Event Stream Keepalives

If there are no events to be delivered Nakadi will keep a streaming connection open by periodically sending a batch with no events but which contains a cursor pointing to the current offset. For example:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events 
      

HTTP/1.1 200 OK

{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}

This can be treated as a keep-alive control for some load balancers.

Event Types

Event Types

The object containing the information describing an event is called an event type.

To publish events an event type must exist in Nakadi. The event type contains information such as its name, a category, the owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi has an event type registry API that lists all the available event types.

Event Types and Categories

There are three main categories of event type defined by Nakadi -

  • Business Event: An event that is part of, or drives a business process, such as a state transition in a customer order.

  • Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.

  • Undefined Event: A free form category suitable for events that are entirely custom to the producer.

Each event category enables different capabilities for an event type, notably their schema and validation rules, which we’ll describe next.

Event Type Schema and Effective Schema

The events for the ‘business’ and ‘data’ categories have their own pre-defined schema structures, based on JSON Schema, as well as a custom schema that is defined for the event type when it is created. The pre-defined structures describe common fields for an event; the custom schema describes the fields specific to the event type.

The schema for an event type is submitted as a JSON Schema and will only declare the custom part of the event. This means the pre-defined schemas for the ‘business’ and ‘data’ categories don’t need to be declared (and should not be declared). The ‘undefined’ category has no pre-defined schema.

When an event for one of these categories is posted to the server, it is expected to conform to the combination of the pre-defined schema and to the custom schema defined for the event type, and not just the custom part of the event. This combination is called the effective schema and is validated by Nakadi for the ‘business’ and ‘data’ types.

The ‘undefined’ category behaves slightly differently to the other categories. Its effective schema is exactly the same as the one created with its event type definition (it has no extra structure), but it is not validated by Nakadi. Instead an ‘undefined’ event type’s schema is simply made available in the event type registry for consumers to use if they wish.

The custom schema for an event type can be as simple as { "additionalProperties": true } to allow arbitrary JSON, but will usually have a more specific definition :)
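
To make the effective schema concrete, here is a hedged sketch of an event for a ‘business’ event type whose custom schema declares only an order_number field; the metadata object comes from the category’s pre-defined structure, not from the custom schema:

{
  "order_number": "24873243241",
  "metadata": {
    "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
    "occurred_at": "2016-03-15T23:47:15+01:00"
  }
}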

Compatibility modes

Compatibility modes are used to control schema changes. Each mode solves a specific problem and thus presents different constraints.

Nakadi supports different compatibility modes for event types, used to control valid schema changes and to ensure schema consistency of published event data. Compatibility modes define restrictions for event schema evolution, i.e. the set of allowed schema changes, and thereby provide different compatibility guarantees for event consumers.

The default compatibility mode is forward compatible, but usage of full compatibility is definitely encouraged, and the default mode will change in the near future.

Fully compatible

The compatible mode is the safest compatibility mode: it is both forward compatible and backward compatible. This means that:

  1. Consumers using older schemas can safely read events validated by newer schemas.
  2. Consumers using newer schemas can safely read events validated by older schemas.

It guarantees high data quality, which is crucial for a variety of applications. At Zalando, this compatibility mode is required for data that is processed by business intelligence or kept in long-term event storage by the data lake.

Supported changes:

  1. changes to meta attributes: titles and descriptions.
  2. addition of new optional attributes.

The following json-schema attributes are not supported:

  • additionalProperties
  • additionalItems
  • not
  • patternProperties

Removing the support for these attributes is necessary to avoid the introduction of incompatible changes.

Under this compatibility mode, it’s necessary to fully specify event properties in order for validation to succeed; events containing properties that are not declared in the schema will be rejected. For this reason, producers should first update their schemas with new attributes and only after that start producing events with such attributes.
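
As an illustration, adding a new optional attribute is an allowed change under this mode. Assuming the order schema from the examples in this document, and a hypothetical optional customer_id attribute, the evolution would look like this:

Old schema:  { "properties": { "order_number": { "type": "string" } } }
New schema:  { "properties": { "order_number": { "type": "string" }, "customer_id": { "type": "string" } } }

Removing order_number or marking customer_id as required would, by contrast, be rejected in compatible mode.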

Forward compatible

The forward mode has been designed to allow event type owners to expand their schemas without breaking existing consumers.

It’s called forward because consumers using older schemas can safely read events generated by newer schemas. However, it’s not backward compatible, in the sense that reading older events using newer schemas is not safe.

Supported changes:

  1. changes to meta attributes: titles and descriptions.
  2. addition of new attributes, both optional and required.
  3. marking attributes as required.
  4. change additionalProperties from true to object.

Under this mode event validation accepts events with fields not declared in the schema. In other words, it’s not necessary to specify additionalProperties true in the schema, since this is the default behaviour of the json-schema validator.

We discourage the usage of additionalProperties entirely. The more complete a schema definition the more stable and valuable the events. The ability to not specify every attribute is there only for some very specific situations when it’s not possible to define them all.

Consumers reading events from forward compatible event types SHOULD ignore event attributes not declared in the event type schema. This is aligned with API guidelines for compatibility.

Incompatible

Under the none compatibility mode, schemas can be changed arbitrarily. This mode is not recommended unless there is a very good reason not to provide any compatibility guarantee.

Changing compatibility modes

It’s possible to change the compatibility mode from none to forward and from forward to compatible, i.e. it’s possible to make the schema validation more strict but never more relaxed.

It’s not possible to upgrade directly from none to compatible. It’s necessary to go first through forward for later upgrading to compatible.

Users should be aware of changes in validation behaviour when upgrading to compatible. Please be sure to read the section on compatible mode above.
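
A hedged sketch of such an upgrade, assuming the compatibility mode is carried in the event type's compatibility_mode field and that the full event type definition is resubmitted, as in the update examples later in this document (check the API reference for the exact field name in your Nakadi version):

# assumes the mode is set via the compatibility_mode field
curl -v -XPUT -H "Content-Type: application/json" https://localhost:8080/event-types/order_received -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "compatibility_mode": "forward",
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'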

Creating an Event Type

An event type can be created by posting to the /event-types resource.

This example shows a business category event type called order_received:

curl -v -XPOST -H "Content-Type: application/json" https://localhost:8080/event-types -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,	
    "message_size":	5,
    "read_parallelism":	1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'

The event type has a simple JSON Schema submitted as an escaped JSON string describing the order number and thus only declares the custom part of the schema. The partition_strategy says events will be allocated to partitions according to the hash of the order_number field, defined in partition_key_fields, and the owner’s name is "acme-order-service". The enrichment_strategies array says to apply metadata_enrichment to submitted events (common metadata is a feature of some categories).

The event type has an optional default_statistic object, which controls the number of partitions. Nakadi will use a sensible default if no value is provided. The values provided here cannot be changed later, so choose them wisely.

A successful request will result in a 201 Created response.

Once an event type is created, it is added to the event type registry and its details are visible from its URI in the registry. Events can then be posted to its stream and consumed by multiple clients.

The exact required fields depend on the event type’s category, but name, owning_application and schema are always expected. The “API Reference” contains more details on event types.

Partitions

An event type’s stream is divided into one or more partitions and each event is placed into exactly one partition. Partitions preserve the order of events - once an event is added to a partition its position relative to other events in the partition is never changed. The details of the partitions and their offsets for an event type are available via its /partitions resource.

Partition Ordering

Each partition is a fully ordered log, and there is no global ordering across partitions. Clients can consume a stream’s partitions independently and track their position across the stream.

/img/partitions.png

Dividing a stream this way allows the overall system to be scaled and provide good throughput for producers and consumers. It’s similar to how systems such as Apache Kafka and AWS Kinesis work.

Partition Strategies

The assignment of events to a partition is controllable by the producer. The partition_strategy field determines how events are mapped to partitions. Nakadi offers the following strategies:

  • random: the partition is selected randomly and events will be evenly distributed across partitions. Random is the default option used by Nakadi.

  • hash: the partition is selected by hashing the value of the fields defined in the event type’s partition_key_fields. In practice this means events that are about the same logical entity and which have the same values for the partition key will be sent to the same partition.

  • user_defined: the partition is set by the producer when sending an event. This option is only available for the ‘business’ and ‘data’ categories.

Which option to use depends on your requirements. When order matters, hash is usually the right choice. For very high volume streams where order doesn’t matter, random can be a good choice as it load balances data well. The user defined option is a power tool; unless you know you need it, use hash or random. Hash is the preferred strategy, as it ensures that duplicated events will end up in the same partition.
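
For example, with the user_defined strategy the producer names the target partition for each event. For the ‘business’ and ‘data’ categories this is done through the event metadata; a hedged sketch, assuming an event type configured with the user_defined strategy:

# assumes this event type was created with "partition_strategy": "user_defined"
curl -v -XPOST -H "Content-Type: application/json" https://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00",
      "partition": "0"
    }
  }]'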

Authorization

Per-resource authorization

Nakadi allows users to restrict access to resources they own - currently, event types are the only resources supported, but we plan to extend this feature to subscriptions in the near future.

The authorization model is simple: policies can be attached to resources. A policy P defines which subjects can perform which operations on a resource R. To do so, the policy contains, for each operation, a list of attributes that represent subjects who are authorized to perform the operation on the resource. For a subject to be authorized, it needs to match at least one of these attributes (not necessarily all of them).

There are three kinds of operation: admin, to update the resource and delete it; read, to read events from the resource; and write, to write events to the resource.

An authorization request is represented by the tuple

R(subject, operation, resource)

The request will be approved iff the resource policy has at least one attribute for operation that matches the subject.

Protecting an event type

Protecting an event type can be done either during the creation of the event type, or later, as an update to the event type. Users simply need to add an authorization section to their event type description, which looks like this:

  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }

In this section, the admins list includes the attributes that authorize a subject to perform the admin operation; the readers list includes the attributes that authorize a subject to perform the read operation; and the writers list includes the attributes that authorize a subject to perform the write operation.

Whenever an event type is created, or its authorization section is updated, all attributes are validated. The exact nature of the validation depends on the plugin implementation.

Creating an event type

Here is a sample request with an authorization section. It gives read, write, and admin access to a single attribute of type user:

curl -v -XPOST -H "Content-Type: application/json" https://localhost:8080/event-types -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,    
    "message_size":    5,
    "read_parallelism":    1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  },
  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }
}'

Updating an event type

Updating an event type is similar to creating one. Here is a sample request that gives read, write, and admin access to the same attribute:

curl -v -XPUT -H "Content-Type: application/json" https://localhost:8080/event-types/order_received -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,    
    "message_size":    5,
    "read_parallelism":    1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  },
  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }
}'

When updating an event type, users should keep in mind the following caveats:

  • If the event type already has an authorization section, then it cannot be removed in an update;
  • If the update changes the list of readers, then all consumers will be disconnected. It is expected that they will try to reconnect, which will only work for those that are still authorized.

WARNING: this also applies to consumers using subscriptions; if a subscription includes multiple event types, and as a result of the update, a consumer loses read access to one of them, then the consumer will not be able to consume from the subscription anymore.

Producing Events

Producing Events

Posting an Event

One or more events can be published by posting to an event type’s stream.

The URI for a stream is a nested resource and based on the event type’s name - for example the “widgets” event type will have a relative resource path called /event-types/widgets/events.

This example posts two events to the order_received stream:

curl -v -XPOST -H "Content-Type: application/json" https://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'


HTTP/1.1 200 OK  

As shown above, the event stream accepts an array of events.

Validation Strategies and Effective Schema

Each event sent to the stream will be validated relative to the effective schema for the event type’s category.

The validation behavior and the effective schema vary based on the event type’s category. For example, because the example above is a ‘business’ category event type, the events must contain, in addition to the fields defined in the event type’s original schema, a metadata object with eid and occurred_at fields in order to conform to the standard structure for that category.

Once the event is validated, it is placed into a partition and made available to consumers. If the event is invalid, it is rejected by Nakadi.
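
A rejected batch is reported back to the producer with a per-event batch item response indicating the step at which publishing stopped. A hedged sketch of what such a response can look like (the status code, field names and detail text may vary by situation and Nakadi version):

HTTP/1.1 422 Unprocessable Entity

[
  {
    "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
    "publishing_status": "failed",
    "step": "validating",
    "detail": "schema validation failed"
  }
]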

Enrichment Strategies

@@@TODO

Event Ordering

The order of events in the posted array will be the order they are published onto the event stream and seen by consumers. They are not re-ordered based on any values or properties of the data.

Applications that need to order events for a particular entity or based on an identifiable key in the data should configure their event type with the hash partitioning strategy and name the fields that can be used to construct the key. This allows partial ordering for a given entity.

Total ordering is not generally achievable with Nakadi (or Kafka) unless the event type is configured with a single partition. In most cases, total ordering is not needed, and in many cases it is not desirable as it can severely limit system scalability and result in cluster hot spotting.

Delivery versus Arrival Order

Nakadi preserves the order of events sent to it (the “arrival order”), but has no control over the network between it and the producer. In some cases it may be possible for events to leave the producer but arrive at Nakadi in a different order (the “delivery order”).

Not all events need ordering guarantees but producers that do need end to end ordering have a few options they can take:

  • Wait for a response from the Nakadi server before posting the next event. This trades off overall producer throughput for ordering.

  • Use the parent_eids field in the ‘business’ and ‘data’ categories. This acts as a causality mechanism by allowing events to have “parent” events (a sketch follows after this list). Note the parent_eids option is not available in the ‘undefined’ category.

  • Define and document the ordering semantics as part of the event type’s schema definition such that a consumer could use the information to sequence events at their end.
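
A hedged sketch of the parent_eids approach from the list above: the second event carries the eid of the event that caused it, letting consumers reconstruct causality even if delivery order differs from arrival order:

{
  "order_number": "24873243242",
  "metadata": {
    "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
    "occurred_at": "2016-03-15T23:47:16+01:00",
    "parent_eids": ["d765de34-09c0-4bbb-8b1e-7160a33a0791"]
  }
}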

Low-level API

Consuming Events with the Low-level API

Connecting to a Stream

A consumer can open the stream for an Event Type via the /events sub-resource. For example to connect to the order_received stream send a GET request to its stream as follows:

curl -v https://localhost:8080/event-types/order_received/events 

The stream accepts various parameters from the consumer, which you can read about in the “API Reference”. In this section we’ll just describe the response format, along with how cursors and keepalives work.

HTTP Event Stream

The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):

curl -v https://localhost:8080/event-types/order_received/events 
    

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n

Nakadi groups events into batch responses (see the next section, “Batch Responses” for some more details). Batches are separated by a newline and each available batch will be emitted on a single line. If there are no new batches the server will occasionally emit an empty batch (see the section “Event Stream Keepalives” further down).

Technically, while each batch is a JSON document, the overall response is not valid JSON. For this reason it is served as the media type application/x-json-stream rather than application/json. Consumers can use the single line delimited structure to frame data for JSON parsing.
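
Because every batch is a complete JSON document on its own line, a consumer can frame the stream simply by reading line by line. A minimal sketch using curl and jq (jq is assumed to be installed; error handling and reconnection are omitted):

curl -sN https://localhost:8080/event-types/order_received/events \
  | while read -r batch; do
      # each line is one batch; print its cursor offset and the number of events it contains (requires jq)
      echo "$batch" | jq -r '[.cursor.offset, (.events // [] | length)] | @tsv'
    done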

Batch Response Formats

A pretty-printed batch object looks like this -

{
  "cursor": {
    "partition": "0",
    "offset": "4"
  },
  "events": [...]
} 

Each batch belongs to a single partition. The cursor object describes the partition and the offset for this batch of events. The cursor allows clients to checkpoint their position in the stream’s partition. Note that individual events in the stream don’t have cursors; cursors live at the level of a batch.

The events array contains a list of events that were published in the order they arrived from the producer. Note that while the producer can also send batches of events, there is no strict correlation between the batches the consumer is given and the ones the producer sends. Nakadi will regroup events sent by the producer and distribute them across partitions as needed.

Cursors and Offsets

By default the /events resource will return data from all partitions of an event type stream and will do so from the end (or “tail”) of the stream. To select only particular partitions and a position in the stream to start, you can supply an X-Nakadi-Cursors header in the request:

curl -v https://localhost:8080/event-types/order_received/events \
  -H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'

The X-Nakadi-Cursors header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from.

The offset value of the cursor allows you to select where in the stream partition you want to consume from. This can be any known offset value, or the dedicated value begin which will start the stream from the beginning. For example, to read from partition 0 from the beginning:

curl -v https://localhost:8080/event-types/order_received/events \
  -H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"begin"}]'

Event Stream Keepalives

If there are no events to be delivered the server will keep a streaming connection open by periodically sending a batch with no events but which contains a cursor pointing to the current offset. For example:

curl -v https://localhost:8080/event-types/order_received/events 
      

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n

This can be treated as a keep-alive control.

Subscriptions

Subscriptions

Subscriptions allow clients to consume events, with the Nakadi server storing offsets and automatically managing the rebalancing of partitions across consumer clients. This allows clients to avoid managing stream state locally.

The typical workflow when using subscriptions is:

  1. Create a Subscription specifying the event-types you want to read.

  2. Start reading batches of events from the subscription.

  3. Commit the cursors found in the event batches back to Nakadi, which will store the offsets.

If the connection is closed and later restarted, clients will get events from the point of their last cursor commit. If you need more than one client for your subscription to distribute the load, you can read the subscription with multiple clients and Nakadi will balance the load across them.

The following sections provide more detail on the Subscription API and basic examples of Subscription API creation and usage:

For a more detailed description and advanced configuration options please take a look at the Nakadi swagger file.

Creating Subscriptions

A Subscription can be created by posting to the /subscriptions collection resource:

curl -v -XPOST "http://localhost:8080/subscriptions" -H "Content-type: application/json" -d '{
    "owning_application": "order-service",
    "event_types": ["order.ORDER_RECEIVED"]
  }'    

The response returns the whole Subscription object that was created, including the server generated id field:

HTTP/1.1 201 Created
Content-Type: application/json;charset=UTF-8

{
  "owning_application": "order-service",
  "event_types": [
    "order.ORDER_RECEIVED"
  ],
  "consumer_group": "default",
  "read_from": "end",
  "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
  "created_at": "2016-09-23T16:35:13.273Z"
}

Consuming Events from a Subscription

Consuming events is done by sending a GET request to the Subscription’s events resource (/subscriptions/{subscription-id}/events):

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events"

The response is a stream that groups events into JSON batches separated by a newline (\n) character. The output looks like this:

HTTP/1.1 200 OK
X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f
Transfer-Encoding: chunked

{"cursor":{"partition":"5","offset":"543","event_type":"order.ORDER_RECEIVED","cursor_token":"b75c3102-98a4-4385-a5fd-b96f1d7872f2"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.525Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"},"info":{"debug":"Stream started"}]}
{"cursor":{"partition":"5","offset":"544","event_type":"order.ORDER_RECEIVED","cursor_token":"a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"5","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"a241c147-c186-49ad-a96e-f1e8566de738"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"0","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"bf6ee7a9-0fe5-4946-b6d6-30895baf0599"}}
{"cursor":{"partition":"1","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"9ed8058a-95be-4611-a33d-f862d6dc4af5"}}

Each batch contains the following fields:

  • cursor: The cursor of the batch which should be used for committing the batch.

  • events: The array of events of this batch.

  • info: An optional field that can hold useful information (e.g. the reason why the stream was closed by Nakadi).

Please also note that when a stream is started, the client receives an X-Nakadi-StreamId header, which must be used when committing cursors.

To see a full list of parameters that can be used to control a stream of events, please see the API specification in the swagger file.

Client Rebalancing

If you need more than one client for your subscription to distribute load or increase throughput - you can read the subscription with multiple clients and Nakadi will automatically balance the load across them.

The balancing unit is the partition, so the number of clients of your subscription can’t be higher than the total number of all partitions of the event-types of your subscription.

For example, suppose you had a subscription for two event-types A and B, with 2 and 4 partitions respectively. If you start reading events with a single client, then the client will get events from all 6 partitions. If a second client connects, then 3 partitions will be transferred from the first client to the second, resulting in each client consuming 3 partitions. In this case, the maximum possible number of clients for the subscription is 6, where each client will be allocated 1 partition to consume.

The Subscription API provides a guarantee of at-least-once delivery. In practice this means clients can see a duplicate event in the case where there are errors committing events. However the events which were successfully committed will not be resent.

A useful technique to detect and handle duplicate events on the consumer side is to be idempotent and to check the eid field of the event metadata. Note: eid checking is not possible with the “undefined” category, as the eid is only supplied in the “business” and “data” categories.

Subscription Cursors

The cursors in the Subscription API have the following structure:

{
  "partition": "5",
  "offset": "543",
  "event_type": "order.ORDER_RECEIVED",
  "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
}

The fields are:

  • partition: The partition this batch belongs to. A batch can only have one partition.

  • offset: The offset of this batch. The offset is server defined and opaque to the client - clients should not try to infer or assume a structure.

  • event_type: Specifies the event-type of the cursor (as in one stream there can be events of different event-types);

  • cursor_token: The cursor token generated by Nakadi.

Committing Cursors

Cursors can be committed by posting to Subscription’s cursor resource (/subscriptions/{subscriptionId}/cursors), for example:

curl -v -XPOST "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"\
  -H "X-Nakadi-StreamId: ae1e39c3-219d-49a9-b444-777b4b03e84c" \
  -H "Content-type: application/json" \
  -d '{
    "items": [
      {
        "partition": "0",
        "offset": "543",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
      },
      {
        "partition": "1",
        "offset": "923",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"
      }
    ]
  }'

Please be aware that the X-Nakadi-StreamId header is required when doing a commit. The value should be the same as the one you get in the X-Nakadi-StreamId header when opening a stream of events. Also, each client can commit only the batches that were sent to it.

The possible successful responses for a commit are:

  • 204: cursors were successfully committed and offset was increased.

  • 200: cursors were committed, but at least one of the cursors didn’t increase the offset because it was less than or equal to an already committed one. In this case the response body contains a JSON list of the cursors and the results of their commits (a sketch follows below).
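
A hedged sketch of such a 200 response body (the exact result values are defined in the API reference):

HTTP/1.1 200 OK

{
  "items": [
    {
      "cursor": {
        "partition": "0",
        "offset": "543",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
      },
      "result": "outdated"
    }
  ]
}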

The commit timeout is 60 seconds. If you open the stream, read data and don’t commit anything for 60 seconds, the stream connection will be closed from the Nakadi side. Please note that if there are no events available to send and you get only empty batches, there is no need to commit; Nakadi will close the connection only if there is some uncommitted data and no commits have happened for 60 seconds.

If the connection is closed for some reason, the client still has 60 seconds from the moment the events were sent to commit the events it received. After that the session is considered closed and it will no longer be possible to commit with that X-Nakadi-StreamId. If the commit was not done, then the next time you start reading from the subscription you will get data from the last commit point, and you will again receive the events you haven’t committed.

When a rebalance happens and a partition is transferred to another client, the 60-second commit timeout applies again. The first client has 60 seconds to commit for that partition; after that, the partition starts streaming to the new client. So if the commit wasn’t done within 60 seconds, streaming will resume from the point of the last successful commit. Otherwise, if the commit was done by the first client, the data from this partition will be streamed to the second client immediately (because there is no uncommitted data left and there is no need to wait any longer).

It is not necessary to commit each batch. When the cursor is committed, all events that are before this cursor in the partition will also be considered committed. For example suppose the offset was at e0 in the stream below,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^

and the stream sent back three batches to the client, where the client committed batch 3 but not batch 1 or batch 2,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^       
                |--- batch1 ---|--- batch2 ---|--- batch3 ---|
                        |             |               |
                        v             |               | 
                [ e1 | e2 | e3 ]      |               |
                                      v               |
                               [ e4 | e5 | e6 ]       |
                                                      v
                                              [ e7 | e8 | e9 ]
                                                    
client: cursor commit --> |--- batch3 ---|

then the offset will be moved all the way up to e9 implicitly committing all the events that were in the previous batches 1 and 2,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
                                                          ^-- offset

Checking Current Position

You can also check the current position of your subscription:

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"

The response will be a list of current cursors that reflect the last committed offsets:

HTTP/1.1 200 OK
{
  "items": [
    {
      "partition": "0",
      "offset": "8361",
      "event_type": "order.ORDER_RECEIVED",
      "cursor_token": "35e7480a-ecd3-488a-8973-3aecd3b678ad"
    },
    {
      "partition": "1",
      "offset": "6214",
      "event_type": "order.ORDER_RECEIVED",
      "cursor_token": "d1e5d85e-1d8d-4a22-815d-1be1c8c65c84"
    }
  ]
}

Subscription Statistics

The API also provides statistics on your subscription:

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/stats"

The output will contain the statistics for all partitions of the stream:

HTTP/1.1 200 OK
{
  "items": [
    {
      "event_type": "order.ORDER_RECEIVED",
      "partitions": [
        {
          "partition": "0",
          "state": "reassigning",
          "unconsumed_events": 2115,
          "stream_id": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
        },
        {
          "partition": "1",
          "state": "assigned",
          "unconsumed_events": 1029,
          "stream_id": "ae1e39c3-219d-49a9-b444-777b4b03e84c"
        }
      ]
    }
  ]
}

Deleting a Subscription

To delete a Subscription, send a DELETE request to the Subscription resource using its id field (/subscriptions/{id}):

curl -v -X DELETE "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"

Successful response:

HTTP/1.1 204 No Content

Getting and Listing Subscriptions

To view a Subscription send a GET request to the Subscription resource using its id field (/subscriptions/{id}):

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"

Successful response:

HTTP/1.1 200 OK
{
  "owning_application": "order-service",
  "event_types": [
    "order.ORDER_RECEIVED"
  ],
  "consumer_group": "default",
  "read_from": "end",
  "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
  "created_at": "2016-09-23T16:35:13.273Z"
}

To get a list of subscriptions send a GET request to the Subscription collection resource:

curl -v -XGET "http://localhost:8080/subscriptions"

Example answer:

HTTP/1.1 200 OK
{
  "items": [
    {
      "owning_application": "order-service",
      "event_types": [
        "order.ORDER_RECEIVED"
      ],
      "consumer_group": "default",
      "read_from": "end",
      "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
      "created_at": "2016-09-23T16:35:13.273Z"
    }
  ],
  "_links": {
    "next": {
      "href": "/subscriptions?offset=20&limit=20"
    }
  }
}

It’s possible to filter the list with the following parameters: event_type, owning_application.
Also, the following pagination parameters are available: offset, limit.
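
For example, to list only the subscriptions of one application for one event type, a page at a time:

curl -v -XGET "http://localhost:8080/subscriptions?owning_application=order-service&event_type=order.ORDER_RECEIVED&offset=0&limit=20"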

Clients

Clients

Nakadi does not ship with a client, but there are some open source clients available that you can try:

Name Language/Framework GitHub
Nakadi Java Java https://github.com/dehora/nakadi-java
Nakadi Klients Scala & Java https://github.com/kanuku/nakadi-klients
Reactive Nakadi Scala/Akka https://github.com/zalando-nakadi/reactive-nakadi
Fahrschein Java https://github.com/zalando-nakadi/fahrschein
Nakadion Rust https://github.com/chridou/nakadion
nakadi-client Haskell https://hackage.haskell.org/package/nakadi-client
go-nakadi Go https://github.com/stoewer/go-nakadi
nakacli CLI https://github.com/amrhassan/nakacli

More Nakadi related projects can be found here https://github.com/zalando-nakadi

We’ll add more clients to this section as they appear. Nakadi doesn’t support these clients; issues and pull requests should be filed with the client project.

Comparison

Comparison to Other Systems

In this section, we’ll look at how Nakadi fits in with the stream broker/processing ecosystems. Notably we’ll compare it to Apache Kafka, as that’s a common question, but also look briefly at some of the main cloud offerings in this area.

Apache Kafka (version 0.9)

Relative to Apache Kafka, Nakadi provides a number of benefits while still leveraging the raw power of Kafka as its internal broker.

  • Nakadi has some characteristics in common with Kafka, which is to be expected as the Kafka community has done an excellent job in defining the space. The logical model is basically the same - streams have partitions, messages in a partition maintain their order, and there’s no order across partitions. One producer can send an event to be read by multiple consumers and consumers have access to offset data that they can checkpoint. There are also some differences. For example Nakadi doesn’t expose Topics as a concept in its API. Instead there are Event Types that define structure and ownership details as well as the stream. Also consumers receive messages in batches and each batch is checkpointed rather than an individual message.

  • Nakadi uses HTTP for communications. This lets microservices maintain their boundaries and avoids forcing a shared technology dependency on producers and consumers - if you can speak HTTP you can use Nakadi and communicate with other services. This is a fairly subtle point, but Nakadi is optimised for general microservices integration and message passing, and not just handing off data to analytics subsystems. This means it needs to be available to as many different runtimes and stacks as possible, hence HTTP becomes the de-facto choice.

  • Nakadi is designed to support autonomous service teams. In Zalando, where Nakadi originated, each team has autonomy and control of their microservices stack to let them move quickly and take ownership. When running on AWS, this extends all the way down - every team has their own account structure, and to ensure a level of security and compliance teams run standard AMIs and constrain how they interact to HTTPS using OAuth2 access controls. This means we tend to want to run any shared infrastructure as a service with a HTTP based interface. Granted, not everyone has this need - many shops on AWS won’t have per-team account structures and will tend to use a smaller number of shared environments, but we’ve found it valuable to be able to leverage the power of systems like Kafka in a way that fits in with this service architecture.

  • An event type registry with schema validation. Producers can define event types using JSON Schema. Having events validated against a published schema allows consumers to know the data they receive conforms to that schema. There are projects in the Kafka ecosystem from Confluent that provide similar features such as the rest-proxy and the schema-registry, but they’re slightly optimised for analytics, and not quite ideal for microservices where it’s more common to use regular JSON rather than Avro. The schema registry in particular is dependent on Avro. Also the consumer connection model for the rest-proxy requires clients to be pinned to servers, which complicates clients - the hope for Nakadi is that its managed subscription API, when that’s available, will not require session affinity in this way.

  • Inbuilt event types. Nakadi also has optional support for events that describe business processes and data changes. These provide common primitives for event identity, timestamps, causality, operations on data and header propagation. Teams could define their own structures, but there’s value in having some basic things that consumers and producers can coordinate on independent of the payload, and which are checked before being propagated to multiple consumers.

  • Operations is also a factor in Nakadi’s design. Managing upgrades to systems like Kafka becomes easier when technology sits behind an API and isn’t a shared dependency between microservices. Asynchronous event delivery can be a simpler overall option for a microservice architecture than synchronous, deep call paths that have to be mitigated with caches, bulkheads and circuit breakers.

In short, Nakadi is best seen as a complement to Kafka. It allows teams to use Kafka within their own boundaries but not be forced into sharing it as a global dependency.

Google Pub/Sub

Like Nakadi, Pub/Sub has a HTTP API which hides details from producers and consumers and makes it suitable for use as a microservices backplane. There are some differences worth noting:

  • Pub/Sub lets you acknowledge every message individually rather than checkpointing a position in a logical log. This approach makes its model fairly different to the other systems mentioned here. While it implies that there are no inbuilt ordering assurances it does allow consumers to be very precise about what they have received.

  • Pub/Sub requires a subscription to be set up before messages can be consumed, which can then be used to manage delivery state for messages. In that sense it’s not unlike a traditional queuing system where the server (or “broker”) manages state for the consumer, with the slight twist that messages have a sort of random access for acknowledgements instead of competing for work at the top of queue. Nakadi may offer a similar subscription option in the future via a managed API, but today consumers are expected to manage their own offsets.

  • Pub/Sub uses a polling model for consumers. Consumers grab a page of messages to process and acknowledge, and then make a new HTTP request to grab another page. Nakadi maintains a streaming connection to consumers, and will push events as they arrive.

  • Pub/Sub uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that.

AWS Kinesis

Like Nakadi and Pub/Sub, AWS Kinesis has a HTTP API to hide its details. Kinesis and Nakadi are more similar to each other than Pub/Sub, but there are some differences.

  • Kinesis exposes shards (partitions) for a stream and supplies enough information to support per message checkpointing with semantics much like Kafka and Nakadi. Nakadi only supplies checkpointing information per batch of messages. Kinesis allows setting the partition hash key directly, whereas Nakadi computes the key based on the data.

  • Kinesis uses a polling model for consumers, whereas Nakadi maintains a streaming connection. Kinesis consumers use a “shard iterator” to grab pages of messages, and then make a new HTTP request to grab another page. Kinesis limits the rate at which this can be done across all consumers (typically 5 transactions per second per open shard), which places an upper bound on consumer throughput. Kinesis has a broad range of choices for resuming from a position in the stream, whereas Nakadi allows access only from the beginning and a named offset.

  • Kinesis uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that. Payload data is submitted as an opaque base64 blob.

  • AWS restricts the number of streams available to an account to quite a low starting number, and messages can be stored for a maximum of 7 days, whereas Nakadi can support a large number of event types and the expiration of events is configurable.

  • Kinesis supports resizing the number of shards in a stream, whereas partition counts in Nakadi are fixed once set for an event type.

AWS Simple Queue Service (SQS)

The basic abstraction in SQS is a queue, which is quite different from a Nakadi / Kafka stream.

  • SQS queues are durable and highly available. A queue can hold an unlimited number of messages, with a maximum message retention of 2 weeks. Each message carries an opaque text payload (max. 256KB). In addition to that, messages can have up to 10 message attributes, which can be read without inspecting the payload.

  • Each message in an SQS queue can only be consumed once. In the case of multiple consumers, each one would typically use a dedicated SQS queue, which are all hooked up to a shared Amazon SNS topic that provides the fanout. When a new consumer is later added to this setup, its queue will initially be empty. An SQS queue does not have any history, and cannot be “replayed” again like a Kafka stream.

  • SQS has “work queue” semantics. This means that delivered messages have to be removed from the queue explicitly by a separate call. If this call is not received within a configured timeframe, the message is delivered again (“automatic retry”). After a configurable number of unsuccessful deliveries, the message is moved to a dead letter queue.

  • In contrast to moving a single cursor in the data stream (as in Nakadi, Kinesis or Kafka), the SQS semantics of confirming individual messages has advantages when a single message is unprocessable (e.g. its format cannot be parsed). In SQS only the problematic message is delayed. With cursor semantics, the client has to decide: either stop all further message processing until the problem is fixed, or skip the message and move the cursor.

Allegro Hermes

Hermes, like Nakadi, is an API-based broker built on Apache Kafka. There are some differences worth noting:

  • Hermes uses webhooks to deliver messages to consumers. Consumers register a subscription with a callback URL and a subscription policy that defines behaviors such as retries and delivery rates. Nakadi maintains a streaming connection to consumers, and will push events as they arrive. Whether messages are delivered in order to consumers does not appear to be a defined behaviour in the Hermes API. Similar to Kafka, Nakadi will deliver messages to consumers in arrival order for each partition. Hermes does not appear to support partitioning in its API. Hermes has good support for tracking delivered and undelivered messages to subscribers.

  • Hermes supports JSON Schema and Avro validation in its schema registry. Nakadi’s registry currently only supports JSON Schema, but may support Avro in the future. Hermes does not provide inbuilt event types, whereas Nakadi defines optional types to support data change and business process events, with some uniform fields producers and consumers can coordinate on.

  • Hermes allows topics (event types in Nakadi) to be collated into groups that are administered by a single publisher. Consumers access data at a per-topic level, the same as Nakadi currently does; Nakadi may support multi-topic subscriptions in the future via a subscription API.

  • The Hermes project supports a Java client driver for publishing messages. Nakadi does not ship with a client.

  • Hermes claims resilience when it comes to issues with its internal Kafka broker, such that it will continue to accept messages when Kafka is down. It does this by buffering messages in memory, with an optional means to spill to local disk; this will help with crashing brokers or Hermes nodes, but not with the loss of an instance (e.g. an EC2 instance). Nakadi does not accept messages if its Kafka brokers are down or unavailable.

Recipes

This section features patterns on how to use Nakadi and event stream processing in general.

OverPartitioning

Problem

Nakadi throughput scales with the number of partitions in an event type. The number of partitions in an event type is fixed — it can only be configured on create. Scaling throughput by creating a new event type can be tricky though, because the switch-over has to be coordinated between producers and consumers.

You expect a significant increase in throughput over time. How many partitions should you create?

Solution

Create more partitions than you currently need. Each consumer initially reads from multiple partitions. Increase the number of consumers as throughput increases, until the number of consumers equals the number of partitions.

To distribute the workload evenly, make sure that each consumer reads from the same number of partitions. This strategy works best if the number of partitions is a product of small primes, so that it offers many possible consumer counts (see the sketch after the examples below):

  • with 6 (= 2 * 3) partitions, you can use 1, 2, 3 or 6 consumers
  • with 8 (= 2 * 2 * 2) partitions, you can use 1, 2, 4 or 8 consumers
  • with 12 (= 2 * 2 * 3) partitions, you can use 1, 2, 3, 4, 6 or 12 consumers
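
As a rough illustration (this is not a Nakadi API, just a generic Python sketch with hypothetical consumer names), an even assignment of partitions to consumers can be computed like this:

from typing import Dict, List

def assign_partitions(partitions: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    # Round-robin assignment; the workload is only even when the consumer count divides the partition count.
    if len(partitions) % len(consumers) != 0:
        raise ValueError("consumer count should divide the partition count for an even workload")
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

# 12 partitions can be spread evenly over 1, 2, 3, 4, 6 or 12 consumers.
print(assign_partitions([str(p) for p in range(12)], ["consumer-a", "consumer-b", "consumer-c"]))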

Discussion

The total number of partitions in a Nakadi cluster is limited. Start with a single partition, and employ this pattern only once you are forced to use multiple partitions. Don’t overdo the overpartitioning; use the lowest sensible number that works. You can always fall back on creating a new event type with more partitions later, if necessary.

ProcessingPipeline

Problem

You want to process all events in a given event type, but you have to preserve local (per-partition) ordering of the events.

Solution

Create a processing pipeline with multiple stages. Each stage consists of a single worker thread and an inbox (a small, bounded in-memory queue).

Each stage reads events one by one from its inbox, processes them, and puts them in the inbox of the next stage. The first stage reads events from Nakadi instead.

If you want to publish the events to Nakadi after processing, then the last stage can collect them in an internal buffer and post them in batches.

To keep track of Nakadi cursors, you can push them as pseudo-events through the pipeline. Once the cursor has reached the last stage, all events in the batch must have been processed, so the cursor can be saved.
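
A minimal Python sketch of such a pipeline, under the assumption that events and cursors are plain dictionaries and that the first and last stages are wired to Nakadi elsewhere; the stage wiring and the save_cursor hook are hypothetical:

import queue, threading, time

INBOX_SIZE = 100  # small bounded inboxes create backpressure between stages

def stage(inbox, outbox, process):
    # Single worker thread: read items one by one, preserving their order.
    while True:
        item = inbox.get()
        if "cursor" in item:
            outbox.put(item)                # forward the cursor pseudo-event unchanged
        else:
            outbox.put(process(item))

def last_stage(inbox, save_cursor):
    while True:
        item = inbox.get()
        if "cursor" in item:
            save_cursor(item["cursor"])     # everything before this cursor has been processed
        # else: buffer processed events here and post them to Nakadi in batches

inbox_a, inbox_b = queue.Queue(INBOX_SIZE), queue.Queue(INBOX_SIZE)
threading.Thread(target=stage, args=(inbox_a, inbox_b, lambda e: e), daemon=True).start()
threading.Thread(target=last_stage, args=(inbox_b, print), daemon=True).start()

# The first stage would read from Nakadi; here we feed it by hand for the demo.
inbox_a.put({"payload": "event 1"})
inbox_a.put({"payload": "event 2"})
inbox_a.put({"cursor": {"partition": "0", "offset": "2"}})
time.sleep(0.2)                             # demo only: give the worker threads a moment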

Discussion

Using bounded inboxes decouples the stages from each other, creates backpressure between them, and puts an upper limit on the total amount of work-in-progress in the pipeline.

Overall throughput of the pipeline is limited by the stage with the largest average processing time per event. By optimizing this bottleneck, you can optimize the overall throughput. Example: if the slowest stage needs 20ms to process each event, throughput cannot exceed 50 events per second.

Each pipeline can consume events from one or more partitions. This setup can be scaled by increasing the number of pipelines running in parallel, up to the number of partitions in the event type.

More ideas

  • OrderedProducer: strong and weak ordering techniques
  • StatefulConsumer: managing offsets locally
  • UsingPartitions: when to use which partition options and selecting partition keys
  • HighThroughputEventing: approaches to high volume events
  • ConsumerLeaseStealing: redistributing partition workloads in a cluster
  • CausalEventing: events that have causal relationships (happens-before)
  • EventTypeVersioning: approaches to versioning event types
  • SendAnything: approaches to send arbitrary content through Nakadi (incl Avro and Protobufs)
  • ProcessMonitor: a microservice that coordinates/watches other event streams (cf @gregyoung)

Developing

Building and Developing

Getting the Code

Nakadi is hosted on GitHub at zalando/nakadi, and you can clone or fork it from there.

Building

The project is built with Gradle.

The gradlew wrapper script is available in the project’s root and will bootstrap the right Gradle version if it’s not already installed.

The Gradle setup is fairly standard; the main dev tasks are:

  • ./gradlew build: run a build and test
  • ./gradlew clean: clean down the build

Pull requests and master are built using Travis CI and you can see the build history here.

Running Tests

There are a few build commands for testing -

  • ./gradlew build: will run a build along with the unit tests
  • ./gradlew acceptanceTest: will run the acceptance tests
  • ./gradlew fullAcceptanceTest: will run the ATs in the context of Docker

Running Containers

There are a few build commands for running Docker -

  • ./gradlew startDockerContainer: start the docker containers and download images if needed.
  • ./gradlew stopAndRemoveDockerContainer: shutdown the docker processes
  • ./gradlew startStoragesInDocker: start the storage container that runs Kafka and PostgreSQL. This is handy for running Nakadi directly or in your IDE.

IDE Setup

For working with an IDE, the ./gradlew eclipse task is available for Eclipse, and you can import the build.gradle into IntelliJ IDEA directly.


F.A.Q

Frequently Asked Questions

How long will events be persisted for?

The default retention time in the project is set by the retentionMs value in application.yml, which is currently 2 days.

The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.

How do I define how long will events be persisted for?

At the moment, retention can’t be defined via the API per event type. It may be added as an option in the future. The best option for now would be to configure the underlying Kafka topic directly.

If you want to change the default for a server installation, you can set the retentionMs value in application.yml to a new value.

How many partitions will an event type be given?

The default partition count in the project is set by the partitionNum value in application.yml, which is currently 1.

The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.

How do I configure the number of partitions?

At the moment, partition size can’t be defined via the API per event type. It may be added as an option in the future. The best option for now would be to configure the underlying Kafka topic directly.

If you want to change the default for a server installation, you can set the partitionNum value in application.yml to a new value.

Which partitioning strategy should I use?

See the section “Partition Strategies”, which goes into more detail on the available options and what they’re good for.

How can I keep track of a position in a stream?

Clients can track offset information sent in the Cursor on a per-partition basis - each batch of events sent to a consumer will contain such a Cursor that will detail the partition id and an offset (see “Cursors and Offsets” for more information). This allows a client to track how far in the partition they have consumed events, and also allows them to submit a cursor with an appropriate value as described in the “Cursors and Offsets” section. One approach would be to use local storage (e.g. a datastore like PostgreSQL or DynamoDB) to record the position to date outside the client application, making it available in the event of restarts.
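
As an example, a low-level consumer might persist the cursor of every processed batch in a local SQLite database and resume from it on restart via the X-nakadi-cursors header; the host name, event type and storage choice below are placeholders:

import json, sqlite3, requests

db = sqlite3.connect("cursors.db")
db.execute("CREATE TABLE IF NOT EXISTS cursors (partition_id TEXT PRIMARY KEY, event_offset TEXT)")

def save_cursor(cursor):
    db.execute("INSERT OR REPLACE INTO cursors VALUES (?, ?)",
               (cursor["partition"], cursor["offset"]))
    db.commit()

# Resume from previously saved cursors, if any, via the X-nakadi-cursors header.
saved = [{"partition": p, "offset": o}
         for p, o in db.execute("SELECT partition_id, event_offset FROM cursors")]
headers = {"X-nakadi-cursors": json.dumps(saved)} if saved else {}

resp = requests.get("https://nakadi.example.org/event-types/order_received/events",
                    headers=headers, stream=True, timeout=70)
for line in resp.iter_lines():
    if not line:
        continue
    batch = json.loads(line)        # an EventStreamBatch: {"cursor": {...}, "events": [...]}
    for event in batch.get("events", []):
        pass                        # process the event here
    save_cursor(batch["cursor"])    # persist the position so a restart can resume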

Note that a managed API is being developed which will support storing offsets for consumer clients in the future.

What’s an effective schema?

The effective schema is the combination of the schema structure defined for a particular category, such as ‘business’ or ‘data’, and the custom schema submitted when creating an event type. When an event is posted to Nakadi, the effective schema is used to validate the event, not the separate category-level and custom-level schemas.

You can read more in the section “Effective Schema”.

Nakadi isn’t validating metadata and/or event identifiers, what’s going on?

It’s possible you are working with an ‘undefined’ event type. The ‘undefined’ category doesn’t support metadata validation or enrichment. In more technical terms, the effective schema for an undefined event is exactly the same as the schema that was submitted when the event type was created.

What clients are available?

The project doesn’t ship with a client, but there are a number of open source clients described in the “Clients” section.

If you have an open source client not listed there, we’d love to hear from you :) Let us know via GitHub and we’ll add it to the list.

How do I disable OAuth for local development?

The default behavior when running the docker containers locally will be for OAuth to be disabled.

If you are running a Nakadi server locally outside docker, you can disable token checks by setting the environment variable NAKADI_OAUTH2_MODE to OFF before starting the server.

Note that, even if OAuth is disabled using the NAKADI_OAUTH2_MODE environment variable, the current behavior is still to check a token if one is sent by a client, so you might need to configure the client to not send tokens either.

I want to send arbitrary JSON, how do I avoid defining a JSON Schema?

The standard workaround is to define an event type with the following category and schema:

  • category: undefined
  • schema: {"additionalProperties": true}

Note that sending a schema of {} means nothing will validate, not that anything will be allowed.
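
A sketch of the corresponding event type creation request in Python; the event type name, owning application and host are placeholders, and the schema type is given as json_schema in the request payload:

import requests

event_type = {
    "name": "arbitrary-json-events",            # placeholder name
    "owning_application": "example-app",        # placeholder owner
    "category": "undefined",
    "partition_strategy": "random",
    "schema": {
        "type": "json_schema",
        "schema": "{\"additionalProperties\": true}"
    }
}
resp = requests.post("https://nakadi.example.org/event-types", json=event_type)
resp.raise_for_status()   # expect 201 Created on success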

Can I post something other than JSON as an event?

It’s not a configuration the project directly supports or is designed for. But if you are willing to use JSON as a wrapper, one option is to define a JSON Schema with a property whose type is a string, and send the non-JSON content as a Base64-encoded value for that string. It’s worth pointing out that this is entirely opaque to Nakadi, and you won’t get the benefits of schema checking (or even a check that the submitted string is properly encoded Base64). Note that if you try this, you’ll need to be careful to use URL/file-safe Base64 to avoid issues with the line-delimited stream format Nakadi uses to send messages to consumers. As mentioned, this is an option the server doesn’t directly support.
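
A sketch of this wrapper approach; the event type and the data_blob property are hypothetical, and URL-safe Base64 is used as suggested above:

import base64, requests

payload = b"\x00\x01 any non-JSON bytes"
event = {
    # "data_blob" is a hypothetical string property defined in the event type's JSON Schema
    "data_blob": base64.urlsafe_b64encode(payload).decode("ascii")
}
resp = requests.post("https://nakadi.example.org/event-types/opaque-data/events", json=[event])
resp.raise_for_status()

# A consumer reverses the encoding after reading the event:
original = base64.urlsafe_b64decode(event["data_blob"])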

I get the message “Is the docker daemon running on this host?” - Help!

If you get the message “Is the docker daemon running on this host?” first check that Docker and VirtualBox are running. If you know they are running, you might want to run this command -

eval "$(docker-machine env default)"

What’s the reason for newest available offset being smaller than oldest offset?

When there are no events available for an event-type because they’ve expired, then newest_available_offset will be smaller than oldest_available_offset. Because Nakadi has exclusive offset handling, it shows the offset of the last message in newest_available_offset.

Is there a way to make publishing batches of events atomic?

Not at the moment. If the events are for different event types, or the events will be distributed across different partitions for a single event type, then there’s no way to achieve atomicity in the sense of “all events or no events will be published” in the general case. Even if the events belong to the same partition, the server does not have compensating behavior to ensure they will all be written.

Producers that need atomicity will want to create an event type structure that allows all the needed information to be contained within an event. This is a general distributed systems observation around message queuing and stream broker systems rather than anything specific to Nakadi.

Does Nakadi support compression?

The server will accept gzip encoded events when posted. On the consumer side, if the client asks for compression the server will honor the request.
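
For example, a producer can compress the request body itself and a consumer can ask for a compressed response; the headers below are ordinary HTTP content-negotiation headers rather than Nakadi-specific parameters, and the host and event type are placeholders:

import gzip, json, requests

events = [{"order_number": "24873243241"}]     # placeholder payload
body = gzip.compress(json.dumps(events).encode("utf-8"))
requests.post("https://nakadi.example.org/event-types/order_received/events",
              data=body,
              headers={"Content-Type": "application/json",
                       "Content-Encoding": "gzip"})

# Consumers ask for a compressed stream via standard content negotiation;
# the requests library decompresses gzip responses transparently.
requests.get("https://nakadi.example.org/event-types/order_received/events",
             headers={"Accept-Encoding": "gzip"}, stream=True)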

How do I contribute to the project?

Nakadi accepts contributions from the open-source community. Please see CONTRIBUTE.md.

Timelines

This document covers Timelines internals. It’s meant to explain how timelines work, to help you understand the code and what each part of it contributes to the overall picture.

Timeline creation

Timeline creation is coordinated through a series of locks and barriers using ZooKeeper. Below we depict an example of what the ZK data structure looks like at each step.

Initial state

Every time a Nakadi application is launched, it tries to create the following ZK structure:

timelines:
  lock: -                 # lock for synchronizing timeline versions
  version: {version}      # monotonically incremented long value (version of the timelines configuration)
  locked_et: -
  nodes:                  # Nakadi nodes
    node1: {version}      # each Nakadi node exposes the version it currently uses
    node2: {version}

To avoid overwriting this initial structure when several instances start concurrently, each instance needs to take the lock /nakadi/timelines/lock before executing.

Start timeline creation for et_1

When a new timeline creation is initiated, the first step is to acquire a lock to update timelines for et_1 by creating an ephemeral node at /timelines/locked_et/et_1.

timelines:
  lock: -
  version: 0
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0

Notify all Nakadi nodes about change: the version barrier

Next, the instance coordinating the timeline creation bumps the version node, which all Nakadi instances watch, so they are notified when something changes.

timelines:
  lock: -
  version: 1       # this is incremented by 1
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0

Wait for all nodes to react to the new version

Each Nakadi instance watches the value of the /nakadi/timelines/version/ node. When it changes, each instance checks all locked event types and reacts accordingly, by either releasing or blocking publishers locally.

Once each instance has updated its local list of locked event types, it bumps its own version, to let the initiator of the timeline creation know that it can proceed.

timelines:
  lock: -
  version: 1 
  locked_et:
     et_1: -
  nodes:
    node1: 1       # each instance updates its own version
    node2: 1

Proceed with timeline creation

Once all instances have reacted, the creation proceeds: the initiator inserts the necessary database entries in the timelines table and snapshots the latest available offset of the existing storage. It also creates a topic in the new storage. Be aware that if a timeline partition has never been used, the stored offset is -1; if it has a single event, the offset is zero, and so on.

Remove lock and notify all instances again

Following the same logic as when initiating the timeline creation, the lock is deleted and the version is bumped. All Nakadi instances react by removing their local locks and switching timelines if necessary.

timelines:
  lock: -
  version: 2 
  locked_et:     
  nodes:
    node1: 1
    node2: 1

After every instance has reacted, the structure should look like:

timelines:
  lock: -
  version: 2 
  locked_et:
  nodes:
    node1: 2       # each instance updates its own version
    node2: 2

Done

All done here: a new timeline has been created successfully. All operations are logged, so in case you need to debug things, just take a look at the INFO-level logs.
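
For illustration only: Nakadi implements this coordination in Java, but the following Python/kazoo sketch mirrors the first steps described above, using the ZK paths from the examples; everything else (hosts, error handling, the waiting loop) is simplified:

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
BASE = "/nakadi/timelines"

# Take the global lock, mark et_1 as locked, and bump the version barrier so that
# every Nakadi node re-reads the locked event types and blocks its publishers.
with zk.Lock(BASE + "/lock"):
    zk.create(BASE + "/locked_et/et_1", ephemeral=True)
    current = int(zk.get(BASE + "/version")[0] or b"0")
    zk.set(BASE + "/version", str(current + 1).encode())

# The initiator would now wait until every child of /nakadi/timelines/nodes reports
# the new version, create the timeline database entries and the new topic, snapshot
# offsets, then delete the lock node and bump the version once more (omitted here).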

Nakadi Event Bus API Definition

version 0.9.1


Nakadi at its core aims at being a generic and content-agnostic event broker with a convenient
API. In doing this, Nakadi abstracts away, as much as possible, details of the backing
messaging infrastructure. The single currently supported messaging infrastructure is Kafka
(Kinesis is planned for the future).

In Nakadi every Event has an EventType, and a stream of Events is exposed for each
registered EventType.

An EventType defines properties relevant for the operation of its associated stream, namely:

  • The schema of the Event of this EventType. The schema defines the accepted format of
    Events of an EventType and will be, if so desired, enforced by Nakadi. Usually Nakadi will
    respect the schema for the EventTypes in accordance with how an owning Application defines them.

  • The expected validation and enrichment procedures upon reception of an Event.
    Validation defines conditions for the acceptance of the incoming Event and is strictly enforced
    by Nakadi. Usually the validation will enforce compliance of the payload (or part of it) with
    the defined schema of its EventType. Enrichment specifies properties that are added to the payload
    (body) of the Event before persisting it. Usually enrichment affects the metadata of an Event,
    but it is not limited to that.

  • The ordering expectations of Events in this stream. Each EventType will have its Events
    stored in an underlying logical stream (the Topic) that is physically organized in disjoint
    collections of strictly ordered Events (the Partitions). The EventType defines the field that
    acts as the evaluator of the ordering (that is, its partition key); this ordering is guaranteed by
    persisting Events whose partition key resolves to the same Partition (usually via a hash function
    on its value) in strict order within that Partition. In practice this means that all Events
    within a Partition have their relative order guaranteed: Events of the same EventType that are
    about the same data entity (that is, have the same value for the partition key) always reach the
    same Partition, so their relative ordering is secured. This mechanism implies that no
    statements can be made about the relative ordering of Events that are in different Partitions.

  • The authorization rules for the event type. The rules define who can produce events for
    the event type (write), who can consume events from the event type (read), and who can update
    the event type properties (admin). If the authorization rules are absent, then the event type
    is open to all authenticated users.

    Except for defined enrichment rules, Nakadi will never manipulate the content of any Event.

    Clients of Nakadi can be grouped into 2 categories: EventType owners and Clients (clients
    in turn are both Producers and Consumers of Events). EventType owners interact with
    Nakadi via the Schema Registry API for the definition of EventTypes, while Clients use the
    streaming API for submission and reception of Events.

    A low level Unmanaged API is available, providing full control and responsibility of
    position tracking and partition resolution (and therefore ordering) to the Clients.

    In the High Level API the consumption of Events proceeds via the establishment
    of a named Subscription to an EventType. Subscriptions are persistent relationships between an
    Application (which might have several instances) and the stream of one or more EventTypes;
    their consumption tracking is managed by Nakadi, freeing Consumers from any responsibility for
    tracking the current position on a Stream.


    Scope and status of the API
    ———————————

    In this document, you’ll find:

  • The Schema Registry API, including configuration possibilities for the Schema, Validation,
    Enrichment and Partitioning of Events, and their effects on reception of Events.

  • The existing event format (see definition of Event, BusinessEvent and DataChangeEvent)
    (Note: in the future this is planned to be configurable and not an inherent part of this API).

  • High Level API.

    Other aspects of the Event Bus are at this moment still to be defined and specified, and are not
    included in this version of the specification.

name: Team Aruha @ Zalando
email: team-aruha+nakadi-maintainers@zalando.de

Methods

/event-types

Returns a list of all registered EventTypes

Name Located in Description
X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

Array of EventType

Ok

Problem

Client is not authenticated

/event-types

Creates a new EventType.

The fields enrichment-strategies and partition-resolution-strategy
both have an effect on the incoming Events of this EventType. For their impact on the reception
of events please consult the Event submission API methods.

  • Validation strategies define an array of validation strategies to be evaluated on reception
    of an Event of this EventType. Details of usage can be found in this external document

    • http://zalando.github.io/nakadi-manual/

  • Enrichment strategy. (todo: define this part of the API).

  • The schema of an EventType is defined as an EventTypeSchema. Currently only
    the value json-schema is supported, representing JSON Schema draft 04.

    The following conditions are enforced. Not meeting them will fail the request with the indicated
    status (details are provided in the Problem object):

  • The EventType name must be unique on creation (this also applies when attempting to update an
    EventType with this method); otherwise the request is rejected with status 409 Conflict.

  • Using an EventTypeSchema.type other than json-schema or passing an EventTypeSchema.schema
    that is invalid with respect to the schema’s type. Rejects with 422 Unprocessable Entity.

  • Referring to any Enrichment or Partition strategies that do not exist or
    whose parametrization is deemed invalid. Rejects with 422 Unprocessable Entity.

    Nakadi MIGHT impose necessary schema, validation and enrichment minimal configurations that
    MUST be followed by all EventTypes (examples include: validation rules to match the schema;
    enriching every Event with the reception date-time; adhering to a set of schema fields that
    are mandatory for all EventTypes). The mechanism to set and inspect such rules is not
    defined at this time and might not be exposed in the API.

Name Located in Description
event-type body EventType

EventType to be created

-

Created

Problem

Client is not authenticated

Problem

Conflict, for example on creation of EventType with already existing name.

Problem

Unprocessable Entity

/event-types/{name}

Returns the EventType identified by its name.

Name Located in Description
name path String

Name of the EventType to load.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

Problem

Client is not authenticated

/event-types/{name}

Updates the EventType identified by its name. Behaviour is the same as creation of
EventType (See POST /event-type) except where noted below.

The name field cannot be changed. Attempting to do so will result in a 422 failure.

Modifications to the schema are constrained by the specified compatibility_mode.

Updating the EventType is only allowed for clients that satisfy the authorization admin requirements,
if it exists.

Name Located in Description
name path String

Name of the EventType to update.

event-type body EventType

EventType to be updated.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

-

Ok

Problem

Client is not authenticated

Problem

Unprocessable Entity

Problem

Access forbidden

/event-types/{name}

Deletes an EventType identified by its name. All events in the EventType’s stream will
also be removed. Note: deletion happens asynchronously, which has the following
consequences:

  • Creation of an equally named EventType before the underlying topic deletion is complete
    might not succeed (failure is a 409 Conflict).

  • Events in the stream may be visible for a short period of time before being removed.

    Depending on the Nakadi configuration, the oauth resource owner username may have to be equal to the
    ‘nakadi.oauth2.adminClientId’ property to be able to access this endpoint.

    Deleting the EventType is only allowed for clients that satisfy the authorization admin requirements,
    if they exist.
Name Located in Description
name path String

Name of the EventType to delete.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

-

EventType is successfully removed

Problem

Client is not authenticated

Problem

Access forbidden

/event-types/{name}/cursor-distances

GET with payload.

Calculate the distance between two offsets. This is useful for performing checks for data completeness, when
a client has read some batches and wants to be sure that all delivered events have been correctly processed.

If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

Name of the EventType

cursors-distances-query body

List of pairs of cursors: initial_cursor and final_cursor. Used as input by Nakadi to
calculate how many events there are between two cursors. The distance doesn’t include the initial_cursor
but includes the final_cursor. So if they are equal the result is zero.

An array of CursorDistanceResults with the distance between two offsets.

OK

Problem

Unprocessable Entity

Problem

Access forbidden because of missing scope or EventType authorization failure.
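
A sketch of calling this endpoint from Python; note the GET request with a body, and that the host, event type and offsets are placeholders:

import requests

query = [{
    "initial_cursor": {"partition": "0", "offset": "4"},
    "final_cursor":   {"partition": "0", "offset": "6"}
}]
resp = requests.request(
    "GET",                 # GET with a request body, as described above
    "https://nakadi.example.org/event-types/order_received/cursor-distances",
    json=query)
print(resp.json())          # an array of CursorDistanceResults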

/event-types/{name}/cursors-lag

GET with payload.

This endpoint is mostly interesting for monitoring purposes. Used when a consumer wants to know how far behind
in the stream its application is lagging.

It provides the number of unconsumed events for each cursor’s partition.

If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors body
Array of Cursor

Each cursor indicates the partition and offset consumed by the client. When a cursor is provided,
Nakadi calculates the consumer lag, i.e. the number of events between the provided offset and the most
recently published event. This lag will be present in the response as the unconsumed_events property.

It’s not mandatory to specify cursors for every partition.

The lag calculation is non-inclusive, i.e. if the provided offset is the offset of the latest published
event, the number of unconsumed_events will be zero.

All provided cursors must be valid, i.e. each must be a non-expired cursor of an event in the stream.

Array of Partition

List of partitions with the number of unconsumed events.

OK

Problem

Unprocessable Entity

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/events

Starts a stream delivery for the specified partitions of the given EventType.

The event stream is formatted as a sequence of EventStreamBatches separated by \n. Each
EventStreamBatch contains a chunk of Events and a Cursor pointing to the end of the
chunk (i.e. last delivered Event). The cursor might specify the offset with the symbolic
value BEGIN, which will open the stream starting from the oldest available offset in the
partition.

Currently the application/x-json-stream format is the only one supported by the system,
but in the future other media types may be supported.

If streaming for several distinct partitions, each one is an independent EventStreamBatch.

The initialization of a stream can be parameterized in terms of size of each chunk, timeout
for flushing each chunk, total amount of delivered Events and total time for the duration of
the stream.

Nakadi will keep a streaming connection open even if there are no events to be delivered. In
this case the timeout for the flushing of each chunk will still apply and the
EventStreamBatch will contain only the Cursor pointing to the same offset. This can be
treated as a keep-alive control for some load balancers.

The tracking of the current offset in the partitions, and of which partitions are being read,
is the responsibility of the client. No commits are needed.

The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):

curl -v https://localhost:8080/event-types/order_received/events 
    

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n
Name Located in Description
name path String

EventType name to get events about

X-nakadi-cursors header
Array of String
format: #/definitions/Cursor

Cursors indicating the partitions to read from and respective starting offsets.

Assumes the offset on each cursor is not inclusive (i.e., first delivered Event is the
first one after the one pointed to in the cursor).

If the header is not present, the stream for all partitions defined for the EventType
will start from the newest event available in the system at the moment of making this
call.

Note: we are not using query parameters for passing the cursors only because of the
length limitations on the HTTP query. Another way to initiate this call would be the
POST method with cursors passed in the method body. This approach may be implemented in
future versions of this API.

batch_limit query Integer
format: int32
default: 1

Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit query Integer
format: int32
default: 0

Maximum number of Events in this stream (over all partitions being streamed in this
connection).

  • If 0 or undefined, will stream batches indefinitely.

  • Stream initialization will fail if stream_limit is lower than batch_limit.
batch_flush_timeout query Number
format: int32
default: 30

Maximum time in seconds to wait for the flushing of each chunk (per partition).

  • If the amount of buffered Events reaches batch_limit before this batch_flush_timeout
    is reached, the messages are immediately flushed to the client and batch flush timer is reset.

  • If 0 or undefined, will assume 30 seconds.
stream_timeout query Number
format: int32
default: 0
minimum: 0
maximum: 4200

Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream for 1h ±10min.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed
to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.
If the stream_timeout is greater than max value (4200 seconds) - Nakadi will treat this as not
specifying stream_timeout (this is done due to backwards compatibility).

stream_keep_alive_limit query Integer
format: int32
default: 0

Maximum number of empty keep alive batches to get in a row before closing the connection.

If 0 or undefined will send keep alive messages indefinitely.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

EventStreamBatch

Starts streaming to the client.
Stream format is a continuous series of EventStreamBatches separated by \n

Problem

Not authenticated

Problem

Unprocessable entity

Problem

Too Many Requests. The client reached the maximum amount of simultaneous connections to a single partition

Problem

Access is forbidden for the client or event type

/event-types/{name}/events

Publishes a batch of Events of this EventType. All items must be of the EventType
identified by name.

Reception of Events will always respect the configuration of its EventType with respect to
validation, enrichment and partition. The steps performed on reception of incoming message
are:

  1. Every validation rule specified for the EventType will be checked in order against the
    incoming Events. Validation rules are evaluated in the order they are defined and the Event
    is rejected in the first case of failure. If the offending validation rule provides
    information about the violation it will be included in the BatchItemResponse. If the
    EventType defines schema validation it will be performed at this moment. The size of each
    Event will also be validated. The maximum size per Event is 999,000 bytes. We use the batch
    input to measure the size of events, so unnecessary spaces, tabs, and carriage returns will
    count towards the event size.

  2. Once validation has succeeded, the content of the Event is updated according to the
    enrichment rules, in the order the rules are defined in the EventType. No pre-existing
    value may be changed (even if added by an enrichment rule). Violations of this will force
    the immediate rejection of the Event. The invalid overwrite attempt will be included in
    the item’s BatchItemResponse object.

  3. The incoming Event’s relative ordering is evaluated according to the rule on the
    EventType. Failure to evaluate the rule will reject the Event.

    Given the batched nature of this operation, any violation on validation or failures on
    enrichment or partitioning will cause the whole batch to be rejected, i.e. none of its
    elements are pushed to the underlying broker.

    Failures on writing of specific partitions to the broker might influence other
    partitions. Failures at this stage will fail only the affected partitions.
Name Located in Description
name path String

Name of the EventType

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

event body
Array of Event

The Event being published

-

All events in the batch have been successfully published.

At least one event has failed to be submitted. The batch might be partially submitted.

Problem

Client is not authenticated

At least one event failed to be validated, enriched or partitioned. None were submitted.

Problem

Access is forbidden for the client or event type
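
A sketch of publishing a batch and inspecting per-item results, assuming the conventional 200 / 207 / 422 status codes for the three outcomes described above; the host, event type and payloads are placeholders:

import requests

events = [
    {"order_number": "24873243241"},   # placeholder payload matching the event type's schema
    {"order_number": "24873243242"}
]
resp = requests.post("https://nakadi.example.org/event-types/order_received/events", json=events)

if resp.status_code == 200:
    print("all events published")
elif resp.status_code in (207, 422):
    # The body is an array of BatchItemResponse objects; check each publishing_status.
    for item in resp.json():
        if item.get("publishing_status") != "submitted":
            print("not submitted:", item)
else:
    resp.raise_for_status()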

/event-types/{name}/partitions

Lists the Partitions for the given event-type.

This endpoint is mostly interesting for monitoring purposes or in cases when a consumer wants
to start consuming older messages.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors query
Array of String
format: #/definitions/Cursor

Each cursor indicates the partition and offset consumed by the client. When this parameter is provided,
Nakadi calculates the consumer lag, i.e. the number of events between the provided offset and the most
recently published event. This lag will be present in the response as the unconsumed_events property.

It’s not mandatory to specify cursors for every partition.

The lag calculation is non-inclusive, i.e. if the provided offset is the offset of the latest published
event, the number of unconsumed_events will be zero.

The value of this parameter must be a URL-encoded JSON array.

Array of Partition

An array of Partitions

OK

Problem

Client is not authenticated

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/partitions/{partition}

Returns the given Partition of this EventType. If per-EventType authorization is enabled, the caller must
be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

partition path String

Partition id

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

consumed_offset query String

Offset to query for unconsumed events. Depends on partition parameter. When present adds the property
unconsumed_events to the response partition object.

Problem

Client is not authenticated

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/schemas

List of schemas ordered from most recent to oldest.

Name Located in Description
name path String

EventType name

limit query Integer
format: int64
default: 20
minimum: 1
maximum: 1000

maximum number of schemas returned in one page

offset query Integer
format: int64
default: 0
minimum: 0

page offset

Object
Object properties:
_links : PaginationLinks
items :
Array of EventTypeSchema

list of schemas.

OK

/event-types/{name}/schemas/{version}

Retrieves a given schema version. A special {version} key named ‘latest’ is provided for
convenience.

Name Located in Description
name path String

EventType name

version path String

EventType schema version

EventTypeSchema

Schema object.

/event-types/{name}/shifted-cursors

GET with payload.

Transforms a list of Cursors with shift into a list without shifts. This is useful when there is a need
to randomly access events in the stream.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors body
Array of ShiftedCursor

Cursors indicating the positions to be calculated. Given an initial cursor, with partition and offset, it’s
possible to obtain another cursor that is relatively forward or backward of it. When shift is positive,
Nakadi will respond with a cursor that is shift positions forward of the initial partition and
offset. When shift is negative, Nakadi will move the cursor backward.

Note: It’s not currently possible to shift cursors based on time. It’s only possible to shift cursors
based on the number of events forward or backward given by shift.

Array of Cursor

An array of Cursors with shift applied to the offset. The response should contain cursors in the same
order as the request.

OK

Problem

It’s only possible to navigate from a valid cursor to another valid cursor, i.e. partitions and
offsets must exist and not be expired. Any combination of parameters that might break this rule will
result in 422. For example:

  • if the initial partition and offset are expired.
  • if the shift provided leads to an already expired cursor.
  • if the shift provided leads to a cursor that does not yet exist, i.e. it points to
    some cursor yet to be generated in the future.
Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/timelines

List timelines for a given event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
name path String

Name of the EventType to list timelines for.

Array of Object
Object properties:
id : String
format: uuid
event_type : String
order : Integer
storage_id : String
topic : String
created_at : String
format: date-time
switched_at : String
format: date-time
cleaned_up_at : String
format: date-time
latest_position : Object

list of timelines.

OK

Problem

No such event type

Problem

Access forbidden

/event-types/{name}/timelines

Creates a new timeline for an event type and makes it active.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
name path String

Name of the EventType

timeline_request body Object
Object properties:
storage_id : String

Storage id to be used for timeline creation

-

New timeline is created and in use

Problem

No such event type

Problem

Unprocessable entity due to non existing storage

Problem

Access forbidden

/metrics

Get monitoring metrics

Name Located in Description
Metrics

Ok

Problem

Client is not authenticated

/registry/enrichment-strategies

Lists all of the enrichment strategies supported by this Nakadi installation. Special or
custom strategies besides the defaults will be listed here.

Name Located in Description
Array of String

Returns a list of all enrichment strategies known to Nakadi

Problem

Client is not authenticated

/registry/partition-strategies

Lists all of the partition resolution strategies supported by this installation of Nakadi.
Special or custom strategies besides the defaults will be listed here.

Nakadi currently offers these inbuilt strategies:

  • random: Resolution of the target partition happens randomly (events are evenly
    distributed on the topic’s partitions).

  • user_defined: Target partition is defined by the client. As long as the indicated
    partition exists, Event assignment will respect this value. Correctness of the relative
    ordering of events is the responsibility of the Producer. Requires that the client
    provides the target partition on metadata.partition (see EventMetadata). Failure to do
    so will cause the publishing of the Event to be rejected.

  • hash: Resolution of the partition follows the computation of a hash from the value of
    the fields indicated in the EventType’s partition_key_fields, guaranteeing that Events
    with the same values on those fields end up in the same partition. If the event type’s
    category is DataChangeEvent, the field path is considered relative to “data”.
Name Located in Description
Array of String

Returns a list of all partitioning strategies known to Nakadi

Problem

Client is not authenticated

/settings/admins

Lists all administrator permissions. This endpoint is restricted to administrators with the ‘read’ permission.

Name Located in Description
AdminAuthorization

List all administrator permissions.

Problem

Client is not authenticated

Problem

Access forbidden

/settings/admins

Updates the list of administrators. This endpoint is restricted to administrators with the ‘admin’ permission.

Name Located in Description
authorization body AdminAuthorization

Lists of administrators

AdminAuthorization

List all administrator permissions.

Problem

Client is not authenticated

Problem

Access forbidden

Problem

Unprocessable entity due to not enough administrators in a list

/settings/blacklist

Lists all blocked producers/consumers divided by app and event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
Object
Object properties:
producers : Object
Object properties:
event_types :
Array of String

a list of all blocked event types for publishing events.

apps :
Array of String

a list of all blocked apps for publishing events.

a list of all blocked producers.

consumers : Object
Object properties:
event_types :
Array of String

a list of all blocked event types for consuming events.

apps :
Array of String

a list of all blocked apps for consuming events.

a list of all blocked consumers.

Lists all blocked producers/consumers.

/settings/blacklist/{blacklist_type}/{name}

Blocks publication/consumption for particular app or event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
blacklist_type path String

Type of the blacklist to put client into.

List of available types:

  • ‘CONSUMER_APP’: consumer application.
  • ‘CONSUMER_ET’: consumer event type.
  • ‘PRODUCER_APP’: producer application.
  • ‘PRODUCER_ET’: producer event type.
name path String

Name of the client to block.

-

Client or event type was successfully blocked.

/settings/blacklist/{blacklist_type}/{name}

Unblocks publication/consumption for particular app or event type.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
blacklist_type path String

Type of the blacklist to put client into.

List of available types:

  • ‘CONSUMER_APP’: consumer application.
  • ‘CONSUMER_ET’: consumer event type.
  • ‘PRODUCER_APP’: producer application.
  • ‘PRODUCER_ET’: producer event type.
name path String

Name of the client to unblock.

-

Client was successfully unblocked.

/settings/features

Lists all available features.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
Object
Object properties:
items :
Array of Feature

list of features.

A list of all available features.

/settings/features

Enables or disables a feature depending on the payload.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
feature body Feature
-

Feature was successfully accepted.

/storages

Lists all available storage backends.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
Array of Storage

list of storage backends.

A list of all available storage backends.

Problem

Access forbidden

/storages

Creates a new storage backend.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
storage body Storage

Storage description

Storage

Storage backend was successfully registered. Returns storage object that was created.

Problem

A storage with the same ID already exists

Problem

Access forbidden

/storages/default/{id}

Sets default storage to use in Nakadi.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
id path String

storage backend ID

Storage

OK

Problem

Storage backend not found

Problem

Access forbidden

/storages/{id}

Retrieves a storage backend by its ID.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
id path String

storage backend ID

Storage

OK

Problem

Storage backend not found

Problem

Access forbidden

/storages/{id}

Deletes a storage backend by its ID, if it is not in use.
The oauth resource owner username has to be equal to ‘nakadi.oauth2.adminClientId’ property
to be able to access this endpoint.

Name Located in Description
id path String

storage backend ID

-

Storage backend was deleted

Problem

Storage backend not found

Problem

Storage backend could not be deleted

Problem

Access forbidden

/subscriptions

Lists all subscriptions that exist in a system. List is ordered by creation date/time descending (newest
subscriptions come first).

Name Located in Description
owning_application query String

Parameter to filter subscriptions list by owning application. If not specified - the result list will
contain subscriptions of all owning applications.

event_type query
Array of String
collectionFormat: multi

Parameter to filter the subscriptions list by event types. If not specified, the result list will contain
subscriptions for all event types. It’s possible to provide multiple values like
event_type=et1&event_type=et2, in which case it will show subscriptions having both et1 and et2.

limit query Integer
format: int64
default: 20
minimum: 1
maximum: 1000

maximum number of subscriptions returned in one page

offset query Integer
format: int64
default: 0
minimum: 0

page offset

Object
Object properties:
_links : PaginationLinks
items :
Array of Subscription

list of subscriptions

OK

Problem

Bad Request

/subscriptions

This endpoint creates a subscription for EventTypes. The subscription is needed to be able to
consume events from EventTypes in a high-level way, where Nakadi stores the offsets and manages the
rebalancing of consuming clients.
The subscription is identified by its key parameters (owning_application, event_types, consumer_group). If
this endpoint is invoked several times with the same key subscription properties in the body (the order of
event_types is not important), the subscription will be created only once, and all other calls will just
return the subscription that was already created.

Name Located in Description
subscription body Subscription

Subscription to create

Subscription
headers:
Location:
description: The relative URI for this subscription resource.
type: string

Subscription for such parameters already exists. Returns subscription object that already
existed.

Subscription
headers:
Location:
description: The relative URI for the created resource.
type: string
Content-Location:
description: If the Content-Location header is present and the same as the Location header the client can assume it has an up to date representation of the Subscription and a corresponding GET request is not needed.
type: string

Subscription was successfully created. Returns the subscription object that was created.

Problem

Bad Request

Problem

Unprocessable Entity
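
A sketch of creating (or idempotently re-fetching) a subscription in Python; the host, owning application, event type and consumer group are placeholders, and the id field of the returned Subscription is assumed:

import requests

subscription = {
    "owning_application": "example-app",     # placeholder values
    "event_types": ["order_received"],
    "consumer_group": "example-group"
}
resp = requests.post("https://nakadi.example.org/subscriptions", json=subscription)
resp.raise_for_status()
# A repeated call with the same key parameters returns the already existing subscription.
subscription_id = resp.json()["id"]          # assumed field name of the Subscription id
print(subscription_id, resp.headers.get("Location"))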

/subscriptions/{subscription_id}

Returns a subscription identified by id.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

Problem

Subscription not found

/subscriptions/{subscription_id}

Deletes a subscription.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

-

Subscription was deleted

Problem

Subscription not found

/subscriptions/{subscription_id}/cursors

Exposes the currently committed offsets of a subscription.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

Object
Object properties:
items :

list of cursors for subscription

Ok

Problem

Subscription not found

/subscriptions/{subscription_id}/cursors

Endpoint for committing offsets of the subscription. If there is uncommitted data, and no commits happen
for 60 seconds, then Nakadi will consider the client to be gone, and will close the connection. As long
as no events are sent, the client does not need to commit.

If the connection is closed, the client has 60 seconds to commit the events it received, from the moment
they were sent. After that, the connection will be considered closed, and it will not be possible to
commit with that X-Nakadi-StreamId anymore.

Committing a batch also automatically commits all previous batches that were
sent in the stream for this partition.

Name Located in Description
subscription_id path String

Id of subscription

X-Nakadi-StreamId header String

Id of the stream which the client uses to read events. It is not possible to make a commit for a terminated or
non-existing stream. Also, the client can’t commit cursors that were not sent to its stream.

cursors body Object
Object properties:
items :

List of cursors that the consumer acknowledges to have successfully processed.

-

Offsets were committed

Object
Object properties:
items :

list of items which describe commit result for each cursor

At least one of the cursors submitted for commit is older than or equal to an already committed one. An array
of commit results is returned for this status code.

Problem

Subscription not found

Problem

Unprocessable Entity

/subscriptions/{subscription_id}/cursors

Resets subscription offsets to the specified values.
Clients connecting after this operation will get events starting from the next offset position.
During this operation the subscription’s consumers will be disconnected. The request can hang for up to
the subscription commit timeout. During that time, requests to the subscription streaming endpoint
will be rejected with 409. Clients should reconnect once the request finishes with 204.

Name Located in Description
subscription_id path String

Id of subscription

cursors body Object
Object properties:
items :

List of cursors to reset subscription to.

-

Offsets were reset

Problem

Subscription not found

Problem

Cursors reset is already in progress for provided subscription

Problem

Unprocessable Entity

/subscriptions/{subscription_id}/events

Starts a new stream for reading events from this subscription. The data will be automatically rebalanced
between streams of one subscription. The minimal consumption unit is a partition, so it is possible to start as
many streams as the total number of partitions in the event types of this subscription. The rebalance currently
only operates on the number of partitions, so the amount of data in event types/partitions is not considered
during auto-rebalancing.
The position of the consumption is managed by Nakadi. The client is required to commit the cursors it gets in
the stream.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

max_uncommitted_events query Integer
format: int32
default: 10

The amount of uncommitted events Nakadi will stream before pausing the stream. When the stream is in the
paused state and a commit arrives, the stream will resume. The minimal value is 1.

batch_limit query Integer
format: int32
default: 1

Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit query Integer
format: int32
default: 0

Maximum number of Events in this stream (over all partitions being streamed in this
connection).

  • If 0 or undefined, will stream batches indefinitely.

  • Stream initialization will fail if stream_limit is lower than batch_limit.
batch_flush_timeout query Number
format: int32
default: 30

Maximum time in seconds to wait for the flushing of each chunk (per partition).

  • If the amount of buffered Events reaches batch_limit before this batch_flush_timeout
    is reached, the messages are immediately flushed to the client and batch flush timer is reset.

  • If 0 or undefined, will assume 30 seconds.
stream_timeout query Number
format: int32
default: 0
minimum: 0
maximum: 4200

Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream for 1h ±10min.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed
to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.
If stream_timeout is greater than the maximum value (4200 seconds), Nakadi treats this as not
specifying stream_timeout (this is done for backwards compatibility).

stream_keep_alive_limit query Integer
format: int32
default: 0

Maximum number of empty keep-alive batches received in a row before the connection is closed.

If 0 or undefined, keep-alive messages are sent indefinitely.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

SubscriptionEventStreamBatch
headers:
X-Nakadi-StreamId:
description: the id of this stream generated by Nakadi. Must be used for committing events that were read by the client from this stream.
type: string

Ok. Stream started.
The stream format is a continuous series of SubscriptionEventStreamBatch objects separated by \n

Problem

Bad Request

Problem

Subscription not found.

Problem

Conflict. There are no free slots for this subscription. The number of consumers for this subscription
already equals the maximum value, which is the total number of partitions of this subscription.

This status code is also returned when a cursor reset request for this subscription is still in
progress.

Problem

Access is forbidden for the client or event type
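A minimal sketch of opening a subscription stream with curl; the base URL, the ids and the token are placeholders, and the Authorization header is only needed if the deployment enforces OAuth:

  # -N disables output buffering so batches are printed as they arrive; -D - also prints the
  # response headers, which include the X-Nakadi-StreamId needed for committing cursors.
  curl -N -D - \
    "http://localhost:8080/subscriptions/$SUBSCRIPTION_ID/events?batch_limit=10&batch_flush_timeout=5&max_uncommitted_events=100" \
    -H "Authorization: Bearer $TOKEN" \
    -H "X-Flow-Id: my-flow-id"

Each line of the response body is one SubscriptionEventStreamBatch; its cursors must be committed via the cursors endpoint of this subscription to keep the stream from pausing once max_uncommitted_events is reached.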

/subscriptions/{subscription_id}/stats

Exposes statistics of the specified subscription. An example request is sketched after the response descriptions below.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

Object
Object properties:
items :

statistics list for the specified subscription

Ok

Problem

Subscription not found
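A minimal sketch of fetching the statistics with curl (placeholder base URL and id):

  # Returns, per event type, a list of partition statistics (state, unconsumed_events, stream_id);
  # see SubscriptionEventTypeStats in the Definitions section.
  curl "http://localhost:8080/subscriptions/$SUBSCRIPTION_ID/stats"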

Definitions

AdminAuthorization

Authorization section for admin operations. This section defines three access control lists: one for writing to admin endpoints and producing events (‘writers’), one for reading from admin endpoints and consuming events (‘readers’), and one for updating the list of administrators (‘admins’).

Name Description
admins
minItems: 1

An array of subject attributes that are required for updating the list of administrators. Any one of the
attributes defined in this array is sufficient to be authorized.

readers
minItems: 1

An array of subject attributes that are required for reading from admin endpoints. Any one of the
attributes defined in this array is sufficient to be authorized.

writers
minItems: 1

An array of subject attributes that are required for writing to admin endpoints. Any one of the
attributes defined in this array is sufficient to be authorized.

AuthorizationAttribute

An attribute for authorization. This object includes a data type, which represents the type of the attribute (which data types are allowed depends on which authorization plugin is deployed, and how it is configured), and a value. A wildcard can be represented with data type ‘*’ and value ‘*’. It means that all authenticated users are allowed to perform an operation.

Name Description
data_type String

the type of attribute (e.g., ‘team’, or ‘permission’, depending on the Nakadi configuration)

value String

the value of the attribute

BatchItemResponse

A status corresponding to one individual Event’s publishing attempt.

Name Description
eid String
format: uuid

eid of the corresponding item. Will be absent if missing on the incoming Event.

publishing_status String
enum: submitted, failed, aborted

Indicator of the submission of the Event within a Batch.

  • “submitted” indicates successful submission, including the commit on the underlying broker.

  • “failed” indicates the message submission was not possible and can be resubmitted if so
    desired.

  • “aborted” indicates that the submission of this item was not attempted any further due
    to a failure on another item in the batch.
step String
enum: none, validating, partitioning, enriching, publishing

Indicator of the step in the publishing process this Event reached.

For Items that “failed”, this is the step at which the failure occurred.

  • “none” indicates that nothing was yet attempted for the publishing of this Event. Should
    be present only in the case of aborting the publishing during the validation of another
    (previous) Event.

  • “validating”, “partitioning”, “enriching” and “publishing” indicate all the
    corresponding steps of the publishing process.
detail String

Human readable information about the failure on this item. Items that are not “submitted”
should have a description.
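An illustrative response for a two-event batch in which the first item failed validation and the second was therefore aborted; the detail texts are hypothetical:

  [
    {
      "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
      "publishing_status": "failed",
      "step": "validating",
      "detail": "#/metadata/occurred_at: string is not a valid date-time"
    },
    {
      "publishing_status": "aborted",
      "step": "none",
      "detail": "publishing aborted due to the failure of a previous event in the batch"
    }
  ]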

BusinessEvent

A Business Event.

Usually represents a status transition in a Business process.

Mixin:
  1. Event
  2. Object
    Object properties:
    metadata : EventMetadata

Cursor

Name Description
partition String

Id of the partition pointed to by this cursor.

offset String

Offset of the event being pointed to.

Note that if you want a stream to begin with the event at offset N,
you should specify offset N-1.
This applies when you create a new subscription or reset subscription offsets.
Also for stream start offsets one can use two special values:

  • begin - read from the oldest available event.
  • end - read from the most recent offset.
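For illustration, a Cursor pointing at the oldest available event of partition 0, using the special begin value:

  {
    "partition": "0",
    "offset": "begin"
  }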

CursorCommitResult

The result of a single cursor commit. Holds the cursor itself and a result value.

Name Description
cursor SubscriptionCursor
result String

The result of cursor commit.

  • committed: cursor was successfully committed
  • outdated: a more recent (or the same) cursor was already committed, so the current one was not
    committed as it is outdated

CursorDistanceQuery

Name Description
initial_cursor Cursor
final_cursor Cursor

CursorDistanceResult

Mixin:
  1. CursorDistanceQuery
  2. Object
    Object properties:
    distance : Number
    format: int64

    Number of events between two offsets. Initial offset is exclusive. It’s only zero when both provided offsets
    are equal.

DataChangeEvent

A Data change Event.

Represents a change on a resource. Also contains indicators for the data type and the type of operation performed.

Mixin:
  1. Event
  2. Object
    Object properties:
    data_type : String
    example: pennybags:order
    data_op : String
    enum: C, U, D, S

    The type of operation executed on the entity.

    • C: Creation
    • U: Update
    • D: Deletion
    • S: Snapshot
    metadata : EventMetadata
    data : Object

    The payload of the type
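An illustrative DataChangeEvent, reusing the examples from the field descriptions above; the contents of data are hypothetical and are validated against the EventTypeSchema of the event type:

  {
    "metadata": {
      "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
      "occurred_at": "1996-12-19T16:39:57-08:00"
    },
    "data_type": "pennybags:order",
    "data_op": "U",
    "data": {
      "order_number": "ORD-123",
      "status": "CANCELLED"
    }
  }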

Event

Note The Event definition will be externalized in future versions of this document.

A basic payload of an Event. The actual schema is dependent on the information configured for the EventType, as is its enforcement (see POST /event-types). The setting of metadata properties is likewise dependent on the configured enrichment.

For explanation on default configurations of validation and enrichment, see documentation of EventType.category.

For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and DataChangeEvent below.

EventMetadata

Metadata for this Event.

Contains common fields for both Business and DataChange Events. Most are enriched by Nakadi upon reception, but in general they MIGHT be set by the client.

Name Description
eid String
format: uuid
example: 105a76d8-db49-4144-ace7-e683e8f4ba46

Identifier of this Event.

Clients MUST generate this value and it SHOULD be guaranteed to be unique from the
perspective of the producer. Consumers MIGHT use this value to assert uniqueness of
reception of the Event.

event_type String
example: pennybags.payment-business-event

The EventType of this Event. This is enriched by Nakadi on reception of the Event
based on the endpoint to which the Producer sent the Event.

If provided, it MUST match the endpoint. Failure to do so will cause rejection of the
Event.

occurred_at String
format: date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of creation of the Event generated by the producer.

received_at String
readOnly: true
format: date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of the reception of the Event by Nakadi. This is enriched upon reception of
the Event.
If set by the producer, the Event will be rejected.

version String
readOnly: true

Version of the schema used for validating this event. This is enriched upon reception.
This string uses semantic versioning, which is better defined in the EventTypeSchema object.

parent_eids
Array of String
format: uuid
example: 105a76d8-db49-4144-ace7-e683e8f4ba46

Event identifiers of the Events that caused the generation of this Event.
Set by the producer.

flow_id String
example: JAh6xH4OQhCJ9PutIV_RYw

The flow-id of the producer of this Event. As this is usually an HTTP header, it is
enriched from the header into the metadata by Nakadi to avoid clients having to
copy it explicitly.

partition String
example: 0

Indicates the partition assigned to this Event.

Required to be set by the client if partition strategy of the EventType is
‘user_defined’.

EventStreamBatch

One chunk of events in a stream. A batch consists of an array of Events plus a Cursor pointing to the offset of the last Event in the stream.

The size of the array of Events is limited by the parameters used to initialize a Stream.

If acting as a keep alive message (see GET /event-type/{name}/events) the events array will be omitted.

Sequential batches might present repeated cursors if no new events have arrived.

Name Description
cursor Cursor
info StreamInfo
events
Array of Event

EventType

An event type defines the schema and its runtime properties.

Name Description
name String
pattern: [a-zA-Z][-0-9a-zA-Z_]*(\.[0-9a-zA-Z][-0-9a-zA-Z_]*)*
example: order.order_cancelled, acme-platform.users

Name of this EventType. The name is constrained by a regular expression.

Note: the name can encode the owner/responsible for this EventType and ideally should
follow a common pattern that makes it easy to read and understand, but this level of
structure is not enforced. For example a team name and data type can be used such as
‘acme-team.price-change’.

owning_application String
example: price-service

Indicator of the (Stups) Application owning this EventType.

category String
enum: undefined, data, business

Defines the category of this EventType.

The value set will influence, if not set otherwise, the default set of
validations, enrichment-strategies, and the effective schema for validation in
the following way:

  • undefined: No predefined changes apply. The effective schema for the validation is
    exactly the same as the EventTypeSchema.

  • data: Events of this category will be DataChangeEvents. The effective schema during
    the validation contains metadata, and adds fields data_op and data_type. The
    passed EventTypeSchema defines the schema of data.

  • business: Events of this category will be BusinessEvents. The effective schema for
    validation contains metadata and any additionally defined properties passed in the
    EventTypeSchema directly on top level of the Event. If name conflicts arise, creation
    of this EventType will be rejected.
enrichment_strategies
Array of String
enum: metadata_enrichment

Determines the enrichment to be performed on an Event upon reception. Enrichment is
performed once upon reception (and after validation) of an Event and is only possible on
fields that are not defined on the incoming Event.

For event types in the categories ‘business’ or ‘data’ it’s mandatory to use the
metadata_enrichment strategy. For ‘undefined’ event types it’s not possible to use this
strategy, since the metadata field is not required.

See documentation for the write operation for details on behaviour in case of unsuccessful
enrichment.

partition_strategy String
default: random

Determines how the assignment of the event to a partition should be handled.

For details of possible values, see GET /registry/partition-strategies.

compatibility_mode String
default: forward

Compatibility mode provides a means for event owners to evolve their schema, provided that changes respect the
semantics defined by this field.

It’s designed to be flexible enough so that producers can evolve their schemas while not
inadvertently breaking existent consumers.

Once defined, the compatibility mode is fixed, since otherwise it would break a predefined contract,
declared by the producer.

List of compatibility modes:

  • ‘compatible’: Consumers can reliably parse events produced under different versions. Every event published
    since the first version is still valid based on the newest schema. When in compatible mode, it’s allowed to
    add new optional properties and definitions to an existing schema, but no other changes are allowed.
    Under this mode, the following json-schema attributes are not supported: not, patternProperties,
    additionalProperties and additionalItems. When validating events, additionalProperties is treated as false.

  • ‘forward’: Compatible schema changes are allowed. It’s possible to use the full json schema specification
    for defining schemas. Consumers of forward compatible event types can safely read events tagged with the
    latest schema version as long as they follow the robustness principle.

  • ‘none’: Any schema modification is accepted, even if it might break existing producers or consumers. When
    validating events, no additional properties are accepted unless explicitly stated in the schema.
schema EventTypeSchema
partition_key_fields
Array of String

Required when ‘partition_strategy’ is set to ‘hash’. Must be absent otherwise.
Indicates the fields used for evaluating the partition of Events of this type.

If this is set, it MUST be a valid required field as defined in the schema.

default_statistic EventTypeStatistics
options EventTypeOptions
authorization EventTypeAuthorization
created_at String
pattern: date-time

Date and time when this event type was created.

updated_at String
pattern: date-time

Date and time when this event type was last updated.
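An illustrative EventType definition for a ‘data’ category type; the partition key and the properties inside the schema string are hypothetical, and the schema itself is a JSON Schema document serialized as a string:

  {
    "name": "acme-team.price-change",
    "owning_application": "price-service",
    "category": "data",
    "enrichment_strategies": ["metadata_enrichment"],
    "partition_strategy": "hash",
    "partition_key_fields": ["price_id"],
    "compatibility_mode": "forward",
    "schema": {
      "type": "json_schema",
      "schema": "{\"properties\": {\"price_id\": {\"type\": \"string\"}, \"amount\": {\"type\": \"number\"}}, \"required\": [\"price_id\"]}"
    }
  }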

EventTypeAuthorization

Authorization section for an event type. This section defines three access control lists: one for producing events (‘writers’), one for consuming events (‘readers’), and one for administering an event type (‘admins’). Regardless of the values of the authorization properties, administrator accounts will always be authorized.

Name Description
admins
minItems: 1

An array of subject attributes that are required for updating the event type. Any one of the attributes
defined in this array is sufficient to be authorized. The wildcard item takes precedence over all others,
i.e. if it is present, all users are authorized.

readers
minItems: 1

An array of subject attributes that are required for reading events from the event type. Any one of the
attributes defined in this array is sufficient to be authorized. The wildcard item takes precedence over
all others, i.e., if it is present, all users are authorized.

writers
minItems: 1

An array of subject attributes that are required for writing events to the event type. Any one of the
attributes defined in this array is sufficient to be authorized.
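An illustrative authorization section; which data_type values are valid depends on the deployed authorization plugin (see AuthorizationAttribute), so the concrete attributes below are hypothetical, and the readers entry uses the wildcard to allow all authenticated users to read:

  {
    "admins": [ { "data_type": "team", "value": "acme-team" } ],
    "readers": [ { "data_type": "*", "value": "*" } ],
    "writers": [ { "data_type": "service", "value": "price-service" } ]
  }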

EventTypeOptions

Additional parameters for tuning internal behavior of Nakadi.

Name Description
retention_time Integer
format: int64
default: 345600000

Number of milliseconds that Nakadi stores events published to this event type.

EventTypeSchema

The most recent schema for this EventType. Submitted events will be validated against it.

Name Description
version String
readOnly: true
default: 1.0.0

This field is automatically generated by Nakadi. Values are based on semantic versioning. Changes to title
or description are considered PATCH level changes. Adding new optional fields is considered a MINOR level
change. All other changes are considered MAJOR level.

created_at String
readOnly: true
format: date-time
example: 1996-12-19T16:39:57-08:00

Creation timestamp of the schema. This is generated by Nakadi. It should not be
specified when updating a schema and sending it may result in a client error.

type String
enum: json_schema

The type of schema definition. Currently only json_schema (JSON Schema v04) is supported, but in the
future there could be others.

schema String

The schema as string in the syntax defined in the field type. Failure to respect the
syntax will fail any operation on an EventType.

EventTypeStatistics

Operational statistics for an EventType. This data may be provided by users on Event Type creation. Nakadi uses this object in order to provide an optimal number of partitions from a throughput perspective.

Name Description
messages_per_minute Integer

Write rate for events of this EventType. This rate encompasses all producers of this
EventType for a Nakadi cluster.

Measured in event count per minute.

message_size Integer

Average message size for each Event of this EventType. The count includes the whole serialized
form of the event, including metadata.
Measured in bytes.

read_parallelism Integer

Number of parallel readers (consumers) of this EventType.

write_parallelism Integer

Number of parallel writers (producers) of this EventType.

Feature

Feature of Nakadi to be enabled or disabled

Name Description
feature String
enabled Boolean

Metrics

Object containing application metrics.

PaginationLink

URI identifying another page of items

Name Description
href String
format: uri
example: /subscriptions?offset=20&limit=10

PaginationLinks

contains links to previous and next pages of items

Name Description
prev PaginationLink
next PaginationLink

Partition

Partition information. Can be helpful when trying to start a stream using an unmanaged API.

This information is not related to the state of the consumer clients.

Name Description
partition String
oldest_available_offset String

The offset of the oldest available Event in that partition. This value changes
upon removal of Events from the partition by the background archiving/cleanup mechanism.

newest_available_offset String

The offset of the newest available Event in that partition. This value changes
upon reception of new events for this partition by Nakadi.

This value can be used to construct a cursor when opening streams (see
GET /event-type/{name}/events for details).

Might assume the special name BEGIN, meaning a pointer to the offset of the oldest
available event in the partition.

unconsumed_events Number
format: int64

Approximate number of events unconsumed by the client. This is also known as consumer lag and is used for
monitoring purposes by consumers interested in keeping an eye on the number of unconsumed events.

Problem

Name Description
type String
format: uri
example: http://httpstatus.es/503

An absolute URI that identifies the problem type. When dereferenced, it SHOULD provide
human-readable API documentation for the problem type (e.g., using HTML). This Problem
object is the same as provided by https://github.com/zalando/problem

title String
example: Service Unavailable

A short summary of the problem type. Written in English and readable by engineers
(usually not suited for non-technical stakeholders and not localized)

status Integer
format: int32
example: 503

The HTTP status code generated by the origin server for this occurrence of the problem.

detail String
example: Connection to database timed out

A human readable explanation specific to this occurrence of the problem.

instance String
format: uri

An absolute URI that identifies the specific occurrence of the problem.
It may or may not yield further information if dereferenced.

ShiftedCursor

Mixin:
  1. Cursor
  2. Object
    Object properties:
    shift : Number
    format: int64

    This number is a modifier for the offset. It moves the cursor forwards or backwards by the number of events
    provided.
    For example, suppose a user wants to read events starting 100 positions before offset
    “001-000D-0000000000000009A8”; it’s possible to specify a shift of -100 and Nakadi will make the
    necessary calculations to move the cursor backwards relative to the given offset.

    Users should favor the cursors provided in batches when streaming from Nakadi; navigating the stream
    using shifts is intended for debugging purposes only.
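The example from the description above expressed as a concrete ShiftedCursor, moving 100 events backwards from the given offset:

  {
    "partition": "0",
    "offset": "001-000D-0000000000000009A8",
    "shift": -100
  }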

Storage

A storage backend.

Name Description
id String
maxLength: 36

The ID of the storage backend.

storage_type String

the type of storage. Possible values: [‘kafka’]

kafka_configuration Object
Object properties:
exhibitor_address : String

the Zookeeper address

exhibitor_port : String

the Exhibitor port

zk_address : String

the Zookeeper address

zk_path : String

the Zookeeper path

configuration settings for kafka storage. Only necessary if the storage type is ‘kafka’

StreamInfo

This object contains general information about the stream. Used only for debugging purposes. We recommend logging this object in order to solve connection issues. Clients should not parse this structure.

Subscription

Subscription is a high-level consumption unit. Subscriptions allow applications to easily scale the number of clients by managing consumed event offsets and distributing load between instances. The key properties that identify a subscription are ‘owning_application’, ‘event_types’ and ‘consumer_group’. It’s not possible to have two different subscriptions with the same values for these properties.

Name Description
id String
readOnly: true

Id of the subscription that was created. It is generated by Nakadi and should not be specified when creating
a subscription.

owning_application String
example: gizig
minLength: 1

The id of application owning the subscription.

event_types
Array of String

EventTypes to subscribe to.
The order is not important. Subscriptions that differ only by the order of EventTypes will be
considered the same and will have the same id. The size of the event_types list is limited by the total
number of partitions within these event types. The default limit for the partition count is 100.

consumer_group String
example: read-product-updates
minLength: 1
default: default

The value describing the use case of this subscription.
In general this is an additional identifier used to distinguish subscriptions having the same
owning_application and event_types.

created_at String
readOnly: true
format: date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of creation of the subscription. This is generated by Nakadi. It should not be
specified when creating subscription and sending it may result in a client error.

read_from String
default: end

Position to start reading events from. Currently supported values:

  • begin - read from the oldest available event.
  • end - read from the most recent offset.
  • cursors - read from cursors provided in initial_cursors property.
    Applied when the client starts reading from a subscription.
initial_cursors

List of cursors to start reading from. This property is required when read_from = cursors.
The initial cursors should cover all partitions of the subscription.
Clients will get events starting from the next offset positions.
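An illustrative request body for creating a Subscription, reusing the field examples above (the event type name is hypothetical); id and created_at are read-only and are filled in by Nakadi:

  {
    "owning_application": "gizig",
    "event_types": ["acme-team.price-change"],
    "consumer_group": "read-product-updates",
    "read_from": "end"
  }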

SubscriptionCursor

Mixin:
  1. Cursor
  2. Object
    Object properties:
    event_type : String

    The name of the event type this partition’s events belong to.

    cursor_token : String

    An opaque value defined by the server.

SubscriptionCursorWithoutToken

Mixin:
  1. Cursor
  2. Object
    Object properties:
    event_type : String

    The name of the event type this partition’s events belong to.

SubscriptionEventStreamBatch

Analogous to EventStreamBatch, but used for high-level streaming. It includes specific cursors for committing in the high-level API.

Name Description
cursor SubscriptionCursor
info StreamInfo
events
Array of Event
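An illustrative batch as it appears on a subscription stream (on the wire each batch is a single line terminated by \n); the cursor_token is an opaque server-defined value and the event payload is hypothetical:

  {
    "cursor": {
      "partition": "0",
      "offset": "001-000D-0000000000000009A8",
      "event_type": "acme-team.price-change",
      "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
    },
    "events": [
      {
        "metadata": {
          "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
          "occurred_at": "1996-12-19T16:39:57-08:00"
        },
        "data_type": "acme:price",
        "data_op": "U",
        "data": { "price_id": "p-1", "amount": 42.0 }
      }
    ]
  }

As with EventStreamBatch, keep-alive batches omit the events array and carry only the cursor.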

SubscriptionEventTypeStats

statistics of one event-type within the context of a subscription

Name Description
event_type String

event-type name

partitions
Array of Object
Object properties:
partition : String
state : String

The state of this partition in the current subscription. Currently the following values are possible:

  • unassigned: the partition is currently not assigned to any client;
  • reassigning: the partition is currently being reassigned from one client to another;
  • assigned: the partition is assigned to a client.
unconsumed_events : Number

The number of events in this partition that are not yet consumed within this subscription.
The property may be absent when no events have yet been consumed from this partition in this
subscription (in case read_from is BEGIN or END).

stream_id : String

the id of the stream that consumes data from this partition

statistics of partition within a subscription context

statistics of partitions of this event-type