Introduction

Nakadi Event Broker

The goal of Nakadi (ნაკადი means “stream” in Georgian) is to provide an event broker infrastructure to:

  • Abstract event delivery via a secured RESTful API.

    This allows microservice teams to maintain service boundaries without depending directly on any specific message broker technology. Access can be managed individually for every Event Type and secured using OAuth and custom authorization plugins.

  • Enable convenient development of event-driven applications and asynchronous microservices.

    Event types can be defined with Event Type schemas and managed via a registry. All events are validated against their event type’s schema before they are published. This helps guarantee data quality and data consistency for data consumers.

  • Deliver events efficiently with low latency.

    Once a publisher sends an event using a simple HTTP POST, events are pushed to consumers via a streaming HTTP connection, allowing near real-time event processing. The consumer connection has keepalive controls and supports managing stream offsets using subscriptions.

Read more to understand the big picture: Architecture for data integration.

Watch the talk Data Integration in the World of Microservices

Development status

Nakadi is high-load production ready. Zalando uses Nakadi as its central Event Bus Service. Nakadi reliably handles traffic from thousands of event types with a throughput of more than hundreds of gigabytes per second. The project is in active development.

Features

  • Stream:
    • REST abstraction over Kafka-like queues.
    • CRUD for event types.
    • Event batch publishing.
    • Low-level interface.
      • manual client-side partition management is needed
      • no support for commits
    • High-level interface (Subscription API).
      • automatic redistribution of partitions between consuming clients
      • commits should be issued to move server-side cursors
  • Schema:
    • Schema registry.
    • Several event type categories (Undefined, Business, Data Change).
    • Several partitioning strategies (Random, Hash, User defined).
    • Event enrichment strategies.
    • Schema evolution.
    • Events validation using an event type schema.
  • Security:
    • OAuth2 authentication.
    • Per-event type authorization.
    • Blacklist of users and applications.
  • Operations:
    • STUPS platform compatible.
    • ZMON monitoring compatible.
    • SLO monitoring.
    • Timelines.
      • This allows transparently switching production and consumption to a different cluster (tier, region, AZ) without moving the actual data and without any service degradation.
      • Opens the possibility of implementing other streaming technologies and engines besides Kafka (e.g. AWS Kinesis, Google Pub/Sub, etc.)

Additional features that we plan to cover in the future are:

  • Support for different streaming technologies and engines. Nakadi currently uses Apache Kafka as its broker, but other providers (such as Kinesis) will be possible.
  • Filtering of events for subscribing consumers.
  • Store old published events forever using transparent fallback to backup storage such as AWS S3.
  • Separate the internal schema registry into a standalone service.
  • Support additional schema formats and protocols such as Avro, Protocol Buffers and others.

The zalando-nakadi organisation contains many useful related projects, such as:

  • Client libraries
  • SDK
  • GUI
  • DevOps tools and more

Examples

Creating Event Types

An event type can be created by posting to the event-types resource.

curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
  "name": "order.ORDER_RECEIVED",
  "owning_application": "order-service",
  "category": "undefined",
  "partition_strategy": "random",
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'

Publishing Events

Events for an event type can be published by posting to its “events” collection:

curl -v -XPOST http://localhost:8080/event-types/order.ORDER_RECEIVED/events -H "Content-type: application/json" -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'


HTTP/1.1 200 OK  

Consuming Events

You can open a stream for an Event Type via the events sub-resource:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events 
    

HTTP/1.1 200 OK

{"cursor":{"partition":"0","offset":"4"},"events":[{"order_number": "ORDER_001", "metadata": {"eid": "4ae5011e-eb01-11e5-8b4a-1c6f65464fc6", "occurred_at": "2016-03-15T23:56:11+01:00"}}]}
{"cursor":{"partition":"0","offset":"5"},"events":[{"order_number": "ORDER_002", "metadata": {"eid": "4bea74a4-eb01-11e5-9efa-1c6f65464fc6", "occurred_at": "2016-03-15T23:57:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}

Nakadi community

There is a large ecosystem of projects around Nakadi. Check them out in the zalando-nakadi organisation.

Getting Started

In this section we’ll walk through running a Nakadi service on your machine. Once you have the service up and running, you can jump to Using Nakadi to see how to produce and consume messages.

Quickstart

You can run a Nakadi service locally using Docker. If you don’t have Docker installed, there are great instructions available on the Docker website.

Running a Server

From the project’s home directory you can install and start a Nakadi container via the gradlew command:

./gradlew startNakadi

This will start a Docker container for the Nakadi server and another container with its PostgreSQL, Kafka and Zookeeper dependencies. You can read more about the gradlew script in the Building and Developing section.
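Once the containers are up, a quick way to check that the API is reachable is to list the (initially empty) event type registry on the default local port:

curl -v http://localhost:8080/event-types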

Stopping a Server

To stop the running Nakadi:

./gradlew stopNakadi

Notes

If you’re having trouble getting started, you might find an answer in the Frequently Asked Questions (FAQ) section of the documentation.

Ports

Some ports need to be available to run the service:

  • 8080 for the API server
  • 5432 for PostgreSQL
  • 9092 and 29092 for Kafka
  • 2181 for Zookeeper

Even though users of the API interact with port 8080, the other ports are exposed in order to run integration tests.

If you are not running the tests, it’s safe to modify docker-compose.yaml by removing the port forwarding from dependencies.

Nakadi Concepts

The Nakadi API allows the publishing and consuming of events over HTTP.

A good way to think of events is that they are like messages in a stream processing or queuing system, but have a defined structure that can be understood and validated. The object containing the information describing an event is called an event type.

To publish and consume events, an owning application must first register a new event type with Nakadi. The event type contains information such as its name, the aforementioned owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi supports an event type registry API that lists all the available event types.

Once the event type is created, a resource called a stream becomes available for that event type. The stream will accept events for the type from a producer and can be read from by one or more consumers. Nakadi can validate each event that is sent to the stream.

An event type’s stream can be divided into one or more partitions. Each event is placed into exactly one partition. Each partition represents an ordered log - once an event is added to a partition its position is never changed, but there is no global ordering across partitions [1].

Consumers can read events and track their position in the stream using a cursor that is provided for each partition. Consumers can also use a cursor to read from a stream at a particular position. Multiple consumers can read from the same stream, allowing different applications to read the stream simultaneously.

In summary, applications using Nakadi can be grouped as follows:

  • Event Type Owners: Event type owners interact with Nakadi via the event type registry to define event types based on a schema and create event streams.

  • Event Producers: Producers publish events that conform to the event type’s schema to the event type’s stream.

  • Event Consumers: Consumers read events from the event stream. Multiple consumers can read from the same stream.


[1] For more detail on partitions and the design of streams see “The Log” by Jay Kreps.

Cursors, Offsets and Partitions

By default the events resource will consume from all partitions of an event type and from the end (or “tail”) of the stream. To select only particular partitions and a position where in the stream to start, you can supply an X-Nakadi-Cursors header in the request:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
  -H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'

The header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from. Note that events within the same partition maintain their overall order.

The offset value of the cursor allows you to select where in the stream you want to consume from. This can be any known offset value, or the dedicated value BEGIN which will start the stream from the beginning. For example, to read from partition 0 from the beginning:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events \
  -H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"BEGIN"}]'

The details of the partitions and their offsets for an event type are available via its partitions resource.
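For example, for the order.ORDER_RECEIVED event type used above:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/partitions

The response lists each partition of the event type together with its available offset range (see the “API Reference” for the exact fields).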

Event Stream Keepalives

If there are no events to be delivered Nakadi will keep a streaming connection open by periodically sending a batch with no events but which contains a cursor pointing to the current offset. For example:

curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events 
      

HTTP/1.1 200 OK

{"cursor":{"partition":"0","offset":"6"},"events":[{"order_number": "ORDER_003", "metadata": {"eid": "4cc6d2f0-eb01-11e5-b606-1c6f65464fc6", "occurred_at": "2016-03-15T23:58:15+01:00"}}]}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}
{"cursor":{"partition":"0","offset":"6"}}

This can be treated as a keep-alive control for some load balancers.

Event Types

The object containing the information describing an event is called an event type.

To publish events an event type must exist in Nakadi. The event type contains information such as its name, a category, the owning application, strategies for partitioning and enriching data, and a JSON Schema. Nakadi has an event type registry API that lists all the available event types.

Event Types and Categories

There are three main categories of event type defined by Nakadi -

  • Business Event: An event that is part of, or drives a business process, such as a state transition in a customer order.

  • Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.

  • Undefined Event: A free form category suitable for events that are entirely custom to the producer.

Each event category enables different capabilities for an event type, notably their schema and validation rules, which we’ll describe next.

Event Type Schema and Effective Schema

The events for the ‘business’ and ‘data’ categories have their own pre-defined schema structures, based on JSON Schema, as well as a custom schema that is defined for the event type when it is created. The pre-defined structures describe common fields for an event; the custom schema describes the fields specific to the event type.

The schema for an event type is submitted as a JSON Schema and will only declare the custom part of the event. This means the pre-defined schemas for the ‘business’ and ‘data’ categories don’t need to be declared (and should not be declared). The ‘undefined’ category has no pre-defined schema.

When an event for one of these categories is posted to the server, it is expected to conform to the combination of the pre-defined schema and to the custom schema defined for the event type, and not just the custom part of the event. This combination is called the effective schema and is validated by Nakadi for the ‘business’ and ‘data’ types.

The ‘undefined’ category behaves slightly differently from the other categories. Its effective schema is exactly the same as the one created with its event type definition (it has no extra structure), but it is not validated by Nakadi. Instead an ‘undefined’ event type’s schema is simply made available in the event type registry for consumers to use if they wish.

The custom schema for an event type can be as simple as { "additionalProperties": true } to allow arbitrary JSON, but will usually have a more specific definition :)
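For illustration, with the custom schema used throughout this guide (which declares only order_number), the effective schema for a ‘business’ event type also requires the standard metadata object, so a conforming event looks roughly like this (eid and occurred_at values are placeholders):

{
  "order_number": "24873243241",
  "metadata": {
    "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
    "occurred_at": "2016-03-15T23:47:15+01:00"
  }
}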

Compatibility modes

Compatibility modes are used to control schema changes. Each mode solves a specific problem and thus presents different constraints.

Nakadi supports different compatibility modes for event types, used to control valid schema changes and to ensure schema consistency of published event data. Compatibility modes define restrictions for event schema evolution, i.e. the set of allowed schema changes, and thereby different compatibility guarantees for event consumers.

The default compatibility mode is forward compatible, but using full compatibility is strongly encouraged, and the default mode will change in the near future.

Fully compatible

The compatible compatibility mode is the safest mode. It’s both forward compatible and backward compatible. It means that:

  1. Consumers using older schemas can safely read events validated by newer schemas.
  2. Consumers using newer schemas can safely read events validated by older schemas.

It guarantees high data quality, which is crucial for a variety of applications. At Zalando, this compatibility mode is required for data to be processed by business intelligence and stored long term in the data lake.

Supported changes:

  1. changes to meta attributes: titles and descriptions.
  2. addition of new optional attributes.

The following json-schema attributes are not supported:

  • additionalProperties
  • additionalItems
  • not
  • patternProperties

Removing the support for these attributes is necessary to avoid the introduction of incompatible changes.

Under this compatibility mode, it’s necessary to fully specify events properties in order for validation to succeed; events containing properties that are not declared in the schema will be rejected. For this reason, producers should first update their schemas with new attributes and only after that start producing events with such attributes.
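As a sketch of an allowed evolution under compatible mode, a new optional attribute can be added to the custom schema shown earlier (customer_id here is purely illustrative and not part of the examples above):

{ "properties": { "order_number": { "type": "string" } } }

can evolve to

{ "properties": { "order_number": { "type": "string" }, "customer_id": { "type": "string" } } }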

Forward compatible

The forward mode has been designed to allow event type owners to expand their schemas without breaking existing consumers.

It’s called forward because consumers using older schemas can safely read events generated by newer schemas. However, it’s not backward compatible, in the sense that reading older events using newer schemas is not safe.

Supported changes:

  1. changes to meta attributes: titles and descriptions.
  2. addition of new attributes, both optional and required.
  3. marking attributes as required.
  4. change additionalProperties from true to object.
  5. narrowing a schema type definition from empty to a specific type.

Under this mode event validation accepts events with fields not declared in the schema. In other words, it’s not necessary to specify additionalProperties: true in the schema, since this is the default behaviour of the json-schema validator.

We discourage the usage of additionalProperties entirely. The more complete a schema definition the more stable and valuable the events. The ability to not specify every attribute is there only for some very specific situations when it’s not possible to define them all.

Consumers reading events from forward compatible event types SHOULD ignore event attributes not declared in the event type schema. This is aligned with API guidelines for compatibility.
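Continuing the sketch above, under forward mode the same schema could additionally add a required attribute or mark existing attributes as required (customer_id again being purely illustrative):

{
  "properties": {
    "order_number": { "type": "string" },
    "customer_id": { "type": "string" }
  },
  "required": ["order_number"]
}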

Incompatible

Under compatibility mode none, schemas can be changed arbitrarily. This mode is not recommended unless there is a very good reason not to provide any compatibility guarantee.

Changing compatibility modes

Compatibility modes are designed to provide data consumers with guarantees about what can and cannot be changed with regards to the event schema. With that in mind, it’s highly recommended that schema compatibility be changed only from less restrictive modes to stricter ones, e.g. none to forward to compatible. That way consumers are protected against incompatible changes.

If an incompatible change to a schema is required, it’s possible for event type admins to change the compatibility mode freely, without any restrictions. This level of flexibility assumes that admins are sure that the changes to be made to a schema are not going to cause any disruption to downstream services.

Users should be aware of changes in validation behaviour when upgrading to compatible. Please be sure to read the section on compatible mode above.

Creating an Event Type

An event type can be created by posting to the /event-types resource.

This example shows a business category event type called order_received:

curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,	
    "message_size":	5,
    "read_parallelism":	1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  }
}'

The event type has a simple JSON Schema submitted as an escaped JSON string describing the order number, and thus declares only the custom part of the schema. The partition_strategy says events will be allocated to partitions according to the hash of the order_number field, defined in partition_key_fields, and the owner’s name is "acme-order-service". The enrichment_strategies array says to apply metadata_enrichment to submitted events (common metadata is a feature of some categories).

The event type has an optional default_statistic object, which controls the number of partitions. Nakadi will use a sensible default if no value is provided. The values provided here cannot be changed later, so choose them wisely.

A successful request will result in a 201 Created response.

Once an event type is created, it is added to the event type registry and its details are visible from its URI in the registry. Events can then be posted to its stream and consumed by multiple clients.

The exact required fields depend on the event type’s category, but name, owning_application and schema are always expected. The “API Reference” contains more details on event types.
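Once created, the event type’s details can be fetched back from the registry via its URI, for example:

curl -v http://localhost:8080/event-types/order_received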

Partitions

An event type’s stream is divided into one or more partitions and each event is placed into exactly one partition. Partitions preserve the order of events - once an event is added to a partition its position relative to other events in the partition is never changed. The details of the partitions and their offsets for an event type are available via its /partitions resource.

Partition Ordering

Each partition is a fully ordered log, and there is no global ordering across partitions. Clients can consume a stream’s partitions independently and track their position across the stream.

[Figure: /img/partitions.png - an event type’s stream divided into partitions]

Dividing a stream this way allows the overall system to be scaled and provide good throughput for producers and consumers. It’s similar to how systems such as Apache Kafka and AWS Kinesis work.

Partition Strategies

The assignment of events to a partition is controllable by the producer. The partition_strategy field determines how events are mapped to partitions. Nakadi offers the following strategies:

  • random: the partition is selected randomly and events will be evenly distributed across partitions. Random is the default option used by Nakadi.

  • hash: the partition is selected by hashing the value of the fields defined in the event type’s partition_key_fields. In practice this means events that are about the same logical entity and which have the same values for the partition key will be sent to the same partition.

  • user_defined: the partition is set by the producer when sending an event. This option is only available for the ‘business’ and ‘data’ categories.

Which option to use depends on your requirements. When order matters, hash is usually the right choice. For very high volume streams where order doesn’t matter, random can be a good choice as it load balances data well. The user defined option is a power tool; unless you know you need it, use hash or random. Hash is the preferred strategy, as it ensures that duplicated events will end up in the same partition.
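As a rough sketch of the user_defined strategy: for the ‘business’ and ‘data’ categories the producer indicates the target partition when publishing the event. The partition field inside metadata shown below is our assumption of where that goes; check the “API Reference” for the authoritative field name. Assuming an event type like order_received created with "partition_strategy": "user_defined" (the eid and order_number are placeholders):

curl -v -XPOST http://localhost:8080/event-types/order_received/events -H "Content-Type: application/json" -d '[
  {
    "order_number": "24873243243",
    "metadata": {
      "eid": "7a4f3a45-7d23-4d9e-9f4c-0c1e2b3d4e5f",
      "occurred_at": "2016-03-15T23:47:17+01:00",
      "partition": "0"
    }
  }]'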

Authorization

Per-resource authorization

Nakadi allows users to restrict access to resources they own - both for event types and subscriptions.

The authorization model is simple: policies can be attached to resources. A policy P defines which subjects can perform which operations on a resource R. To do so, the policy contains, for each operation, a list of attributes that represent subjects who are authorized to perform the operation on the resource. For a subject to be authorized, it needs to match at least one of these attributes (not necessarily all of them).

There are three kinds of operation: admin, to update the resource and delete it; read, to read events from the resource; and write, to write events to the resource.

An authorization request is represented by the tuple

R(subject, operation, resource)

The request will be approved iff the resource policy has at least one attribute for operation that matches the subject.

Protecting an event type or a subscription

Protecting an event type can be done either during the creation of the event type, or later, as an update to the event type. Users simply need to add an authorization section to their event type description, which looks like this:

  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }

In this section, the admins list includes the attributes that authorize a subject to perform the admin operation; the readers list includes the attributes that authorize a subject to perform the read operation; and the writers list includes the attributes that authorize a subject to perform the write operation.

Similarly, protecting a subscription can be done during its creation or as an update to the subscription. The authorization section for a subscription looks like the example below and contains only the admins and readers lists. The admins list specifies the attributes that can perform admin operations, such as deleting or updating the subscription, and the readers list specifies the attributes that can read from and commit cursors to that subscription.

  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
  }

Whenever an event type or subscription is created, or its authorization section is updated, all attributes are validated. The exact nature of the validation depends on the plugin implementation.

Creating an event type or a subscription with authorization

Here is a sample request with an authorization section. It gives read, write, and admin access to a single attribute, of type user:

curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,    
    "message_size":    5,
    "read_parallelism":    1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  },
  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }
}'

For creating a subscription, the sample request is as follows:

curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/subscriptions -d '{
  "owning_application": "acme-order-service",
  "consumer_group": "acme-orders",
  "event-types": ["orders_received"],
  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
  }
}'

Updating an event type

Updating an event type or a subscription is similar to creating one. Here is a sample request that gives read, write, and admin access to the same attribute (for an event type):

curl -v -XPUT -H "Content-Type: application/json" http://localhost:8080/event-types/order_received -d '{
  "name": "order_received",
  "owning_application": "acme-order-service",
  "category": "business",
  "partition_strategy": "hash",
  "partition_key_fields": ["order_number"],
  "enrichment_strategies": ["metadata_enrichment"],
  "default_statistic": {
    "messages_per_minute": 1000,    
    "message_size":    5,
    "read_parallelism":    1,
    "write_parallelism": 1
  },
  "schema": {
    "type": "json_schema",
    "schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
  },
  "authorization": {
    "admins": [{"data_type": "user", "value": "bfawlty"}],
    "readers": [{"data_type": "user", "value": "bfawlty"}],
    "writers": [{"data_type": "user", "value": "bfawlty"}]
  }
}'

When updating an event type or subscription, users should keep in mind the following caveats:

  • If the event type or subscription already has an authorization section, then it cannot be removed in an update;
  • If the update changes the list of readers (for event-types and subscriptions), then all consumers will be disconnected. It is expected that they will try to reconnect, which will only work for those that are still authorized.

WARNING: this also applies to consumers using subscriptions; if a subscription includes multiple event types, and as a result of the update, a consumer loses read access to one of them, then the consumer will not be able to consume from the subscription anymore.

Producing Events

Posting an Event

One or more events can be published by posting to an event type’s stream.

The URI for a stream is a nested resource and is based on the event type’s name - for example the “widgets” event type will have a relative resource path called /event-types/widgets/events.

This example posts two events to the order_received stream:

curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/events -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }, {
    "order_number": "24873243242",
    "metadata": {
      "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
      "occurred_at": "2016-03-15T23:47:16+01:00"
    }
  }]'


HTTP/1.1 200 OK  

As shown above, the event stream accepts an array of events.

Validation Strategies and Effective Schema

Each event sent to the stream will be validated relative to the effective schema for the event type’s category.

The validation behavior and the effective schema vary based on the event type’s category. For example, because the example above is a ‘business’ category event type, in addition to the fields defined in the event type’s original schema the events must also contain a metadata object with eid and occurred_at fields in order to conform to the standard structure for that category.

Once the event is validated, it is placed into a partition and made available to consumers. If the event is invalid, it is rejected by Nakadi.

Enrichment Strategies

@@@TODO

Event Ordering

The order of events in the posted array will be the order they are published onto the event stream and seen by consumers. They are not re-ordered based on any values or properties of the data.

Applications that need to order events for a particular entity or based on an identifiable key in the data should configure their event type with the hash partitioning strategy and name the fields that can be used to construct the key. This allows partial ordering for a given entity.

Total ordering is not generally achievable with Nakadi (or Kafka) unless the event type is configured with a single partition. In most cases, total ordering is not needed, and in many cases it is not desirable as it can severely limit system scalability and result in cluster hot spotting.

Delivery versus Arrival Order

Nakadi preserves the order of events sent to it (the “arrival order”), but has no control over the network between it and the producer. In some cases it may be possible for events to leave the producer but arrive at Nakadi in a different order (the “delivery order”).

Not all events need ordering guarantees, but producers that do need end-to-end ordering have a few options they can take:

  • Wait for a response from the Nakadi server before posting the next event. This trades off overall producer throughput for ordering.

  • Use the parent_eids field in the ‘business’ and ‘data’ categories. This acts as a causality mechanism by allowing events to have “parent” events. Note the parent_eids option is not available in the ‘undefined’ category.

  • Define and document the ordering semantics as part of the event type’s schema definition such that a consumer could use the information to sequence events at their end.

Low-level API

Consuming Events with the Low-level API

The Low-level API is deprecated, and will be removed from a future version of Nakadi. Please consider using the High-level API instead.

Connecting to a Stream

A consumer can open the stream for an Event Type via the /events sub-resource. For example to connect to the order_received stream send a GET request to its stream as follows:

curl -v http://localhost:8080/event-types/order_received/events 

The stream accepts various parameters from the consumer, which you can read about in the “API Reference”. In this section we’ll just describe the response format, along with how cursors and keepalives work.

HTTP Event Stream

The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):

curl -v http://localhost:8080/event-types/order_received/events 
    

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n

Nakadi groups events into batch responses (see the next section, “Batch Responses” for some more details). Batches are separated by a newline and each available batch will be emitted on a single line. If there are no new batches the server will occasionally emit an empty batch (see the section “Event Stream Keepalives” further down).

Technically, while each batch is a JSON document, the overall response is not valid JSON. For this reason it is served as the media type application/x-json-stream rather than application/json. Consumers can use the single-line delimited structure to frame data for JSON parsing.
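As a minimal framing sketch (assuming curl and jq are installed, and using the order_received stream from the examples above), a consumer can read the response line by line and parse each line as one JSON batch:

curl -sN http://localhost:8080/event-types/order_received/events \
  | while IFS= read -r batch; do
      # each line is a complete JSON batch: print its cursor and the number of events it carries
      echo "$batch" | jq -c '{cursor: .cursor, events: ((.events // []) | length)}'
    done

Empty keepalive batches simply show up here with an event count of 0.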

Batch Response Formats

A pretty-printed batch object looks like this -

{
  "cursor": {
    "partition": "0",
    "offset": "4"
  },
  "events": [...]
} 

Each batch belongs to a single partition. The cursor object describes the partition and the offset for this batch of events. The cursor allows clients to checkpoint their position in the stream’s partition. Note that individual events in the stream don’t have cursors; cursors live at the level of a batch.

The events array contains a list of events that were published in the order they arrived from the producer. Note that while the producer can also send batches of events, there is no strict correlation between the batches the consumer is given and the ones the producer sends. Nakadi will regroup events sent by the producer and distribute them across partitions as needed.

Cursors and Offsets

By default the /events resource will return data from all partitions of an event type stream and will do so from the end (or “tail”) of the stream. To select only particular partitions and a position in the stream to start, you can supply an X-Nakadi-Cursors header in the request:

curl -v http://localhost:8080/event-types/order_received/events \
  -H 'X-Nakadi-Cursors: [{"partition": "0", "offset":"12"}]'

The X-Nakadi-Cursors header value is a JSON array of cursors. Each cursor in the array describes its partition for the stream and an offset to stream from.

The offset value of the cursor allows you to select where in the stream partition you want to consume from. This can be any known offset value, or the dedicated value BEGIN which will start the stream from the beginning. For example, to read from partition 0 from the beginning:

curl -v http://localhost:8080/event-types/order_received/events \
  -H 'X-Nakadi-Cursors:[{"partition": "0", "offset":"BEGIN"}]'

Event Stream Keepalives

If there are no events to be delivered the server will keep a streaming connection open by periodically sending a batch with no events but which contains a cursor pointing to the current offset. For example:

curl -v http://localhost:8080/event-types/order_received/events 
      

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n
{"cursor":{"partition":"0","offset":"6"}}\n

This can be treated as a keep-alive control.

Subscriptions

Subscriptions allow clients to consume events, with the Nakadi server storing offsets and automatically managing the rebalancing of partitions across consumer clients. This allows clients to avoid managing stream state locally.

The typical workflow when using subscriptions is:

  1. Create a Subscription specifying the event-types you want to read.

  2. Start reading batches of events from the subscription.

  3. Commit the cursors found in the event batches back to Nakadi, which will store the offsets.

If the connection is closed and later restarted, clients will get events from the point of the last cursor commit. If you need more than one client for your subscription to distribute the load, you can read the subscription with multiple clients and Nakadi will balance the load across them.

The following sections provide more detail on the Subscription API and basic examples of its creation and usage.

For a more detailed description and advanced configuration options, please take a look at the Nakadi swagger file.

Creating Subscriptions

A Subscription can be created by posting to the /subscriptions collection resource:

curl -v -XPOST "http://localhost:8080/subscriptions" -H "Content-type: application/json" -d '{
    "owning_application": "order-service",
    "event_types": ["order.ORDER_RECEIVED"]
  }'    

The response returns the whole Subscription object that was created, including the server generated id field:

HTTP/1.1 201 Created
Content-Type: application/json;charset=UTF-8

{
  "owning_application": "order-service",
  "event_types": [
    "order.ORDER_RECEIVED"
  ],
  "consumer_group": "default",
  "read_from": "end",
  "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
  "created_at": "2016-09-23T16:35:13.273Z"
}

If there is already a subscription with the same owning_application, event_types and consumer_group, it is just returned (and not updated; all other parts of the request body are then ignored).

Consuming Events from a Subscription

Consuming events is done by sending a GET request to the Subscription’s events resource (/subscriptions/{subscription-id}/events):

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events"

The response is a stream that groups events into JSON batches separated by a newline (\n) character. The output looks like this:

HTTP/1.1 200 OK
X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f
Transfer-Encoding: chunked

{"cursor":{"partition":"5","offset":"543","event_type":"order.ORDER_RECEIVED","cursor_token":"b75c3102-98a4-4385-a5fd-b96f1d7872f2"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.525Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"},"info":{"debug":"Stream started"}]}
{"cursor":{"partition":"5","offset":"544","event_type":"order.ORDER_RECEIVED","cursor_token":"a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"5","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"a241c147-c186-49ad-a96e-f1e8566de738"},"events":[{"metadata":{"occurred_at":"1996-10-15T16:39:57+07:00","eid":"1f5a76d8-db49-4144-ace7-e683e8ff4ba4","event_type":"aruha-test-hila","partition":"5","received_at":"2016-09-30T09:19:00.741Z","flow_id":"blahbloh"},"data_op":"C","data":{"order_number":"abc","id":"111"},"data_type":"blah"}]}
{"cursor":{"partition":"0","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"bf6ee7a9-0fe5-4946-b6d6-30895baf0599"}}
{"cursor":{"partition":"1","offset":"545","event_type":"order.ORDER_RECEIVED","cursor_token":"9ed8058a-95be-4611-a33d-f862d6dc4af5"}}

Each batch contains the following fields:

  • cursor: The cursor of the batch which should be used for committing the batch.

  • events: The array of events of this batch.

  • info: An optional field that can hold useful information (e.g. the reason why the stream was closed by Nakadi).

Please also note that when the stream is started, the client receives an X-Nakadi-StreamId header which must be used when committing cursors.

To see a full list of parameters that can be used to control a stream of events, please see the API specification in the swagger file.

Client Rebalancing

If you need more than one client for your subscription to distribute load or increase throughput - you can read the subscription with multiple clients and Nakadi will automatically balance the load across them.

The balancing unit is the partition, so the number of clients of your subscription can’t be higher than the total number of all partitions of the event-types of your subscription.

For example, suppose you have a subscription for two event-types, A and B, with 2 and 4 partitions respectively. If you start reading events with a single client, that client will get events from all 6 partitions. If a second client connects, 3 partitions will be transferred from the first client to the second, resulting in each client consuming 3 partitions. In this case, the maximum possible number of clients for the subscription is 6, where each client will be allocated 1 partition to consume.

The Subscription API provides a guarantee of at-least-once delivery. In practice this means clients can see duplicate events in the case where there are errors committing cursors. However, events which were successfully committed will not be resent.

A useful technique to detect and handle duplicate events on the consumer side is to be idempotent and to check the eid field of the event metadata. Note: eid checking is not possible with the ‘undefined’ category, as metadata is only supplied in the ‘business’ and ‘data’ categories.

Subscription Cursors

The cursors in the Subscription API have the following structure:

{
  "partition": "5",
  "offset": "543",
  "event_type": "order.ORDER_RECEIVED",
  "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
}

The fields are:

  • partition: The partition this batch belongs to. A batch can only have one partition.

  • offset: The offset of this batch. The offset is server defined and opaque to the client - clients should not try to infer or assume a structure.

  • event_type: Specifies the event type of the cursor (as one stream can contain events of different event types).

  • cursor_token: The cursor token generated by Nakadi.

Committing Cursors

Cursors can be committed by posting to the Subscription’s cursors resource (/subscriptions/{subscriptionId}/cursors), for example:

curl -v -XPOST "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"\
  -H "X-Nakadi-StreamId: ae1e39c3-219d-49a9-b444-777b4b03e84c" \
  -H "Content-type: application/json" \
  -d '{
    "items": [
      {
        "partition": "0",
        "offset": "543",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
      },
      {
        "partition": "1",
        "offset": "923",
        "event_type": "order.ORDER_RECEIVED",
        "cursor_token": "a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"
      }
    ]
  }'

Please be aware that the X-Nakadi-StreamId header is required when doing a commit. The value should be the same as the one received in the X-Nakadi-StreamId header when opening the stream of events. Also, each client can commit only the batches that were sent to it.
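Here is a rough consume-and-commit sketch in shell (assuming curl and jq; error handling omitted). The response headers are dumped to a file so the X-Nakadi-StreamId can be echoed back with each commit; committing every batch is shown only for simplicity:

SUBSCRIPTION_ID="038fc871-1d2c-4e2e-aa29-1579e8f2e71f"
HEADERS=$(mktemp)

curl -sN -D "$HEADERS" "http://localhost:8080/subscriptions/$SUBSCRIPTION_ID/events" \
  | while IFS= read -r batch; do
      # the stream id is available once the response headers have arrived
      STREAM_ID=$(grep -i '^X-Nakadi-StreamId:' "$HEADERS" | awk '{print $2}' | tr -d '\r')
      CURSOR=$(echo "$batch" | jq -c '.cursor')
      # ... process the events of the batch here ...
      curl -s -XPOST "http://localhost:8080/subscriptions/$SUBSCRIPTION_ID/cursors" \
        -H "X-Nakadi-StreamId: $STREAM_ID" \
        -H "Content-type: application/json" \
        -d "{\"items\": [$CURSOR]}"
    done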

The possible successful responses for a commit are:

  • 204: cursors were successfully committed and the offset was increased.

  • 200: cursors were committed, but at least one of the cursors didn’t increase the offset because it was less than or equal to an already committed one. In this case the response body contains a JSON list of the cursors and the results of their commits.

The timeout for a commit is 60 seconds. If you open the stream, read data and don’t commit anything for 60 seconds, the stream connection will be closed by Nakadi. Note that if there are no events available to send and you receive only empty batches, there is no need to commit; Nakadi will close the connection only if there is uncommitted data and no commits have happened for 60 seconds.

If the connection is closed for some reason, the client still has 60 seconds from the moment the events were sent to commit the events it received. After that the session is considered closed and it is no longer possible to commit with that X-Nakadi-StreamId. If the commit was not done, then the next time you start reading from the subscription you will get data from the last point of your commit, and you will again receive the events you haven’t committed.

When a rebalance happens and a partition is transferred to another client, the commit timeout of 60 seconds applies again. The first client has 60 seconds to commit for that partition; after that the partition starts streaming to the new client. If the commit wasn’t done within 60 seconds, streaming to the new client will start from the point of the last successful commit. Otherwise, if the first client did commit, data from this partition will be streamed to the second client immediately (because there is no uncommitted data left and no need to wait any longer).

It is not necessary to commit each batch. When a cursor is committed, all events that are before this cursor in the partition will also be considered committed. For example, suppose the offset was at e0 in the stream below,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^

and the stream sent back three batches to the client, where the client committed batch 3 but not batch 1 or batch 2,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
     offset--^       
                |--- batch1 ---|--- batch2 ---|--- batch3 ---|
                        |             |               |
                        v             |               | 
                [ e1 | e2 | e3 ]      |               |
                                      v               |
                               [ e4 | e5 | e6 ]       |
                                                      v
                                              [ e7 | e8 | e9 ]
                                                    
client: cursor commit --> |--- batch3 ---|

then the offset will be moved all the way up to e9, implicitly committing all the events that were in the previous batches 1 and 2,

partition: [ e0 | e1 | e2 | e3 | e4 | e5 | e6 | e7 | e8 | e9 ]
                                                          ^-- offset

Checking Current Position

You can also check the current position of your subscription:

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors"

The response will be a list of current cursors that reflect the last committed offsets:

HTTP/1.1 200 OK
{
  "items": [
    {
      "partition": "0",
      "offset": "8361",
      "event_type": "order.ORDER_RECEIVED",
      "cursor_token": "35e7480a-ecd3-488a-8973-3aecd3b678ad"
    },
    {
      "partition": "1",
      "offset": "6214",
      "event_type": "order.ORDER_RECEIVED",
      "cursor_token": "d1e5d85e-1d8d-4a22-815d-1be1c8c65c84"
    }
  ]
}

Subscription Statistics

The API also provides statistics on your subscription:

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/stats"

The output will contain the statistics for all partitions of the stream:

HTTP/1.1 200 OK
{
  "items": [
    {
      "event_type": "order.ORDER_RECEIVED",
      "partitions": [
        {
          "partition": "0",
          "state": "reassigning",
          "unconsumed_events": 2115,
          "stream_id": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
        },
        {
          "partition": "1",
          "state": "assigned",
          "unconsumed_events": 1029,
          "stream_id": "ae1e39c3-219d-49a9-b444-777b4b03e84c"
        }
      ]
    }
  ]
}

Deleting a Subscription

To delete a Subscription, send a DELETE request to the Subscription resource using its id field (/subscriptions/{id}):

curl -v -X DELETE "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"

Successful response:

HTTP/1.1 204 No Content

Getting and Listing Subscriptions

To view a Subscription, send a GET request to the Subscription resource using its id field (/subscriptions/{id}):

curl -v -XGET "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f"

Successful response:

HTTP/1.1 200 OK
{
  "owning_application": "order-service",
  "event_types": [
    "order.ORDER_RECEIVED"
  ],
  "consumer_group": "default",
  "read_from": "end",
  "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
  "created_at": "2016-09-23T16:35:13.273Z"
}

To get a list of subscriptions send a GET request to the Subscription collection resource:

curl -v -XGET "http://localhost:8080/subscriptions"

Example response:

HTTP/1.1 200 OK
{
  "items": [
    {
      "owning_application": "order-service",
      "event_types": [
        "order.ORDER_RECEIVED"
      ],
      "consumer_group": "default",
      "read_from": "end",
      "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
      "created_at": "2016-09-23T16:35:13.273Z"
    }
  ],
  "_links": {
    "next": {
      "href": "/subscriptions?offset=20&limit=20"
    }
  }
}

It’s possible to filter the list with the following parameters: event_type, owning_application.
Also, the following pagination parameters are available: offset, limit.
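For example, to page through the subscriptions of a single owning application:

curl -v -XGET "http://localhost:8080/subscriptions?owning_application=order-service&limit=20&offset=0"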

Clients

Nakadi does not ship with a client, but there are some open source clients available that you can try:

Name Language/Framework GitHub
Nakadi Java Java https://github.com/dehora/nakadi-java
Fahrschein Java https://github.com/zalando-nakadi/fahrschein
Riptide: Stream Java/Spring https://github.com/zalando/riptide/tree/master/riptide-stream
Kanadi Scala https://github.com/zalando-incubator/kanadi
Nakadion Rust https://crates.io/crates/nakadion
nakadi-client Haskell https://nakadi-client.haskell.silverratio.net
go-nakadi Go https://github.com/stoewer/go-nakadi
nakacli CLI https://github.com/amrhassan/nakacli
pyNakadi Python https://github.com/eiunkar/pyNakadi
Clin CLI https://github.com/zalando-incubator/clin

More Nakadi-related projects can be found at https://github.com/zalando-nakadi.

We’ll add more clients to this section as they appear. Nakadi doesn’t support these clients; issues and pull requests should be filed with the client project.

Comparison

Comparison to Other Systems

In this section, we’ll look at how Nakadi fits in with the stream broker/processing ecosystems. Notably we’ll compare it to Apache Kafka, as that’s a common question, but also look briefly at some of the main cloud offerings in this area.

Apache Kafka (version 0.9)

Relative to Apache Kafka, Nakadi provides a number of benefits while still leveraging the raw power of Kafka as its internal broker.

  • Nakadi has some characteristics in common with Kafka, which is to be expected as the Kafka community has done an excellent job in defining the space. The logical model is basically the same - streams have partitions, messages in a partition maintain their order, and there’s no order across partitions. One producer can send an event to be read by multiple consumers and consumers have access to offset data that they can checkpoint. There are also some differences. For example, Nakadi doesn’t expose Topics as a concept in its API. Instead there are Event Types that define structure and ownership details as well as the stream. Also, consumers receive messages in batches and each batch is checkpointed rather than an individual message.

  • Nakadi uses HTTP for communications. This lets microservices maintain their boundaries and avoids forcing a shared technology dependency on producers and consumers - if you can speak HTTP you can use Nakadi and communicate with other services. This is a fairly subtle point, but Nakadi is optimised for general microservices integration and message passing, and not just handing off data to analytics subsystems. This means it needs to be available to as many different runtimes and stacks as possible, hence HTTP becomes the de-facto choice.

  • Nakadi is designed to support autonomous service teams. In Zalando, where Nakadi originated, each team has autonomy and control of their microservices stack to let them move quickly and take ownership. When running on AWS, this extends all the way down - every team has their own account structure, and to ensure a level of security and compliance, teams run standard AMIs and constrain how they interact to HTTPS using OAuth2 access controls. This means we tend to want to run any shared infrastructure as a service with a HTTP based interface. Granted, not everyone has this need - many shops on AWS won’t have per-team account structures and will tend to use a smaller number of shared environments, but we’ve found it valuable to be able to leverage the power of systems like Kafka in a way that fits in with this service architecture.

  • An event type registry with schema validation. Producers can define event types using JSON Schema. Having events validated against a published schema allows consumers to know what they will receive. There are projects in the Kafka ecosystem from Confluent that provide similar features such as the rest-proxy and the schema-registry, but they’re slightly optimised for analytics, and not quite ideal for microservices where it’s more common to use regular JSON rather than Avro. The schema registry in particular is dependent on Avro. Also the consumer connection model for the rest-proxy requires clients to be pinned to servers, which complicates clients - the hope for Nakadi is that its managed Subscription API will not require session affinity in this way.

  • Inbuilt event types. Nakadi also optionally supports events that describe business processes and data changes. These provide common primitives for event identity, timestamps, causality, operations on data and header propagation. Teams could define their own structures, but there’s value in having some basic things that consumers and producers can coordinate on, independently of the payload, and which are being checked before being propagated to multiple consumers.

  • Operations is also a factor in Nakadi’s design. Managing upgrades to systems like Kafka becomes easier when the technology sits behind an API and isn’t a shared dependency between microservices. Asynchronous event delivery can be a simpler overall option for a microservice architecture compared to synchronous and deep call paths that have to be mitigated with caches, bulkheads and circuit breakers.

In short, Nakadi is best seen as a complement to Kafka. It allows teams to use Kafka within their own boundaries but not be forced into sharing it as a global dependency.

Google Cloud Pub/Sub

Like Nakadi, Pub/Sub has a HTTP API which hides details from producers and consumers and makes it suitable for use as a microservices backplane. There are some differences worth noting:

  • Pub/Sub lets you acknowledge every message individually rather than checkpointing a position in a logical log. This approach makes its model fairly different to the other systems mentioned here. While it implies that there are no inbuilt ordering assurances, it does allow consumers to be very precise about what they have received.

  • Pub/Sub requires a subscription to be set up before messages can be consumed, which can then be used to manage delivery state for messages. In that sense it’s not unlike a traditional queuing system where the server (or “broker”) manages state for the consumer, with the slight twist that messages have a sort of random access for acknowledgements instead of competing for work at the top of the queue. Nakadi may offer a similar subscription option in the future via a managed API, but today consumers are expected to manage their own offsets.

  • Pub/Sub uses a polling model for consumers. Consumers grab a page of messages to process and acknowledge, and then make a new HTTP request to grab another page. Nakadi maintains a streaming connection to consumers, and will push events as they arrive.

  • Pub/Sub uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that.

Amazon Kinesis

Like Nakadi and Pub/Sub, AWS Kinesis has a HTTP API to hide its details. Kinesis and Nakadi are more similar to each other than Pub/Sub, but there are some differences.

  • Kinesis exposes shards (partitions) for a stream and supplies enough information to support per-message checkpointing with semantics much like Kafka and Nakadi. Nakadi only supplies checkpointing information per batch of messages. Kinesis allows setting the partition hash key directly, whereas Nakadi computes the key based on the data.

  • Kinesis uses a polling model for consumers, whereas Nakadi maintains a streaming connection. Kinesis consumers use a “shard iterator” to grab pages of messages, and then make a new HTTP request to grab another page. Kinesis limits the rate at which this can be done across all consumers (typically 5 transactions per second per open shard), which places an upper bound on consumer throughput. Kinesis has a broad range of choices for resuming from a position in the stream, whereas Nakadi allows access only from the beginning or a named offset.

  • Kinesis uses a common envelope structure for producing and consuming messages, and does not define any higher level structures beyond that. Payload data is submitted as an opaque base64 blob.

  • Amazon restricts the number of streams available to an account to quite a low starting number, and messages can be stored for a maximum of 7 days, whereas Nakadi can support a large number of event types and the expiration for events is configurable.

  • Kinesis supports resizing the number of shards in a stream, whereas partition counts in Nakadi are fixed once set for an event type.

AWS Simple Queue Service (SQS)

The basic abstraction in SQS is a queue, which is quite different from a Nakadi / Kafka stream.

  • SQS queues are durable and highly available. A queue can hold an unlimited number of messages, with a maximum message retention of 2 weeks. Each message carries an opaque text payload (max. 256KB). In addition to that, messages can have up to 10 message attributes, which can be read without inspecting the payload.

  • Each message in an SQS queue can only be consumed once. In the case of multiple consumers, each one would typically use a dedicated SQS queue, which are all hooked up to a shared Amazon SNS topic that provides the fanout. When a new consumer is later added to this setup, its queue will initially be empty. An SQS queue does not have any history, and cannot be “replayed” again like a Kafka stream.

  • SQS has “work queue” semantics. This means that delivered messages have to be removed from the queue explicitly by a separate call. If this call is not received within a configured timeframe, the message is delivered again (“automatic retry”). After a configurable number of unsuccessful deliveries, the message is moved to a dead letter queue.

  • In contrast to moving a single cursor through the data stream (as in Nakadi, Kinesis or Kafka), the SQS semantics of confirming individual messages has advantages if a single message is unprocessable (e.g., its format cannot be parsed). In SQS only the problematic message is delayed. With cursor semantics, the client has to decide: either stop all further message processing until the problem is fixed, or skip the message and move the cursor.

Allegro Hermes

Hermes, like Nakadi, is an API-based broker built on Apache Kafka. There are some differences worth noting:

  • Hermes uses webhooks to deliver messages to consumers. Consumers register a subscription with a callback url and a subscription policy that defines behaviours such as retries and delivery rates. Nakadi maintains a streaming connection to consumers, and will push events as they arrive. Whether messages are delivered in order to consumers does not appear to be a defined behaviour in the API. Similarly to Kafka, Nakadi will deliver messages to consumers in arrival order for each partition. Hermes does not appear to support partitioning in its API. Hermes has good support for tracking delivered and undelivered messages to subscribers.

  • Hermes supports JSON Schema and Avro validation in its schema registry. Nakadi’s registry currently only supports JSON Schema, but may support Avro in the future. Hermes does not provide inbuilt event types, whereas Nakadi defines optional types to support data change and business process events, with some uniform fields producers and consumers can coordinate on.

  • Hermes allows topics (event types in Nakadi) to be collated into groups that are administered by a single publisher. Consumers access data at a per topic level, the same as Nakadi currently; Nakadi may support multi-topic subscriptions in the future via a subscription API.

  • The Hermes project supports a Java client driver for publishing messages. Nakadi does not ship with a client.

  • Hermes claims resilience when it comes to issues with its internal Kafka broker, such that it will continue to accept messages when Kafka is down. It does this by buffering messages in memory with an optional means to spill to local disk; this will help with crashing brokers or Hermes nodes, but not with loss of an instance (e.g., an EC2 instance). Nakadi does not accept messages if its Kafka brokers are down or unavailable.

Recipes

This section features patterns on how to use Nakadi and event stream processing in general.

OverPartitioning

Problem

Nakadi throughput scales with the number of partitions in an event type. The number of partitions in an event type is fixed — it can only be configured on create. Scaling throughput by creating a new event type can be tricky though, because the switch-over has to be coordinated between producers and consumers.

You expect a significant increase in throughput over time. How many partitions should you create?

Solution

Create more partitions than you currently need. Each consumer initially reads from multiple partitions. Increase the number of consumers as throughput increases, until the number of consumers is equal to the number of partitions.

To distribute the workload evenly, make sure that each consumer reads from the same number of partitions. This strategy works best if the number of partitions is a product of small primes:

  • with 6 (= 2 * 3) partitions, you can use 1, 2, 3 or 6 consumers
  • with 8 (= 2 * 2 * 2) partitions, you can use 1, 2, 4 or 8 consumers
  • with 12 (= 2 * 2 * 3) partitions, you can use 1, 2, 3, 4, 6 or 12 consumers

Discussion

The total number of partitions in a Nakadi cluster is limited. Start with a single partition, and employ this pattern only once you are forced to use multiple partitions. Don’t overdo it: use the lowest sensible number of partitions that works. You can always fall back on creating a new event type with more partitions later, if necessary.

ProcessingPipeline

Problem

You want to process all events in a given event type, but you have to preserve local (per-partition) ordering of the events.

Solution

Create a processing pipeline with multiple stages. Each stage consists of a single worker thread, and an inbox (small bounded in-memory list).

Each stage reads events one by one from its inbox, processes them, and puts them in the inbox of the next stage. The first stage reads events from Nakadi instead.

If you want to publish the events to Nakadi after processing, then the last stage can collect them in an internal buffer and post them in batches.

To keep track of Nakadi cursors, you can push them as pseudo-events through the pipeline. Once the cursor has reached the last stage, all events in the batch must have been processed, so the cursor can be saved.

Discussion

Using bounded inboxes decouples the stages from each other, creates backpressure between them, and puts an upper limit on the total amount of work-in-progress in the pipeline.

Overall throughput of the pipeline is limited by the stage with the largest average processing time per event. By optimizing this bottleneck, you can optimize the overall throughput. Example: if the slowest stage needs 20ms to process each event, throughput will not exceed 50 events per second.

Each pipeline can consume events from one or more partitions. This setup can be scaled by increasing the number of pipelines running in parallel, up to the number of partitions in the event type.

More ideas

  • OrderedProducer: strong and weak ordering techniques
  • StatefulConsumer: managing offsets locally
  • UsingPartitions: when to use which partition options and selecting partition keys
  • HighThroughputEventing: approaches to high volume events
  • ConsumerLeaseStealing: redistributing partition workloads in a cluster
  • CausalEventing: events that have causal relationships (happens-before)
  • EventTypeVersioning: approaches to versioning event types
  • SendAnything: approaches to send arbitrary content through Nakadi (incl Avro and Protobufs)
  • ProcessMonitor: a microservice that coordinates/watches other event streams (cf @gregyoung)

Repartitioning

Repartitioning

The throughput of an event type is determined by its default statistic, which effectively sets the number of partitions for the event type (even though it does not state this explicitly). The number of partitions is the scaling unit for publishing to and consuming from Nakadi. To change the number of partitions, perform the following call, which is also described in the “API Reference”. At the moment the request can be performed only by Nakadi admins.

curl -v -XPOST -H "Content-Type: application/json" http://localhost:8080/event-types/order_received/partition-count -d '{
    "partition_count": 3
}'


HTTP/1.1 204 No Content

Important caveats

  • For event types with the hash partition strategy, events may be placed into different partitions than they were before the repartitioning
  • Nakadi guarantees ordering per partition per batch; repartitioning an event type from 1 partition to more will break the total order of events
  • Repartitioning can only increase the number of partitions
  • Consuming subscriptions are disconnected once repartitioning is finished

Developing

Building and Developing

Getting the Code

Nakadi is hosted on GitHub - zalando/nakadi and you can clone or fork it from there.

Building

The project is built with Gradle.

The gradlew wrapper script is available in the project’s root and will bootstrap the right Gradle version if it’s not already installed.

The gradle setup is fairly standard, the main dev tasks are:

  • ./gradlew build: run a build and test
  • ./gradlew clean: clean down the build

Pull requests and master are built using Travis CI and you can see the build history here.

Running Tests

There are a few build commands for testing -

  • ./gradlew build: will run a build along with the unit tests
  • ./gradlew startNakadiForTest: start Nakadi configured for acceptance tests
  • ./gradlew acceptance-test:test: will run the acceptance tests (only after Nakadi is started)

Running Containers

There are a few build commands for running Docker -

  • ./gradlew startNakadi: start the docker containers and download images if needed.
  • ./gradlew stopNakadi: shutdown the docker processes
  • ./gradlew startStorages: start the storage container that runs Kafka and PostgreSQL. This is handy for running Nakadi directly or in your IDE.

IDE Setup

For working with an IDE, the ./gradlew eclipse IDE task is available and you’ll be able to import the build.gradle into IntelliJ IDEA directly.


Event-Based Authorization

Event-Based Authorization

Nakadi provides per-event filtering, allowing event type publishers to specify which consumers can read an event published to an event type. This is achieved by defining the event_owner_selector in an event type definition, which specifies how to extract ownership information from each event.

The event_owner_selector defines the following values:

  • type - the way Nakadi extracts the owner from published events
  • name - the name of the authorization_parameter that will be extracted and stored with the event. This name is used as the AuthorizationAttribute data_type for security checks with the authorization plugin.
  • value - a parameter that defines how the AuthorizationAttribute value is extracted, depending on the type.

If an event_owner_selector is set on an event type, resolution of the authorization parameter value must succeed with a non-null value; otherwise publishing will be blocked.

{
  "name": "order_received",
  "owning_application": "acme-order-service",
  ...
  "event_owner_selector": {
    "type": "path",
    "name": "retailer_id",
    "value": "security.exclusive_readers"
  },
  "category": "business",
  ...
}

Events published to the event type above can only be read by consumers whose retailer_id, as provided by the authorization plugin, matches the value extracted from the event. If a consumer does not have a matching value, the events are silently omitted from the output.

Also, once an event_owner_selector is specified for an event type, it cannot be removed or updated.

The following event owner selector types are supported:

  • path - a dot-separated path within the published event (after enrichment); in this case value holds the dot-separated path to the field whose content is used as the AuthorizationAttribute value (see the example event below).
  • static - all events published to Nakadi get the same AuthorizationAttribute value, equal to the event_owner_selector value field.
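
For example, with the path selector defined on the event type above, a published event carries the ownership value in the field referenced by security.exclusive_readers. This is a hypothetical payload; all field values are illustrative:

{
  "metadata": {
    "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
    "occurred_at": "2016-03-15T23:47:15+01:00"
  },
  "order_number": "24873243241",
  "security": {
    "exclusive_readers": "retailer-123"
  }
}

Nakadi would store the authorization attribute retailer_id = "retailer-123" with this event, and only consumers that the authorization plugin resolves to that value will receive it.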

During consumption, the authorization plugin checks whether the consumer is allowed to read an Event resource with an AuthorizationParameter whose data_type equals the event_owner_selector name and whose value equals the extracted value.

Access is checked for all the events being sent. If access to some events is not allowed, those events are filtered out of the stream (not sent to the consumer).

Also, filtered-out events are automatically committed when the Subscription API is used.

F.A.Q

Frequently Asked Questions

Table of Contents


How long will events be persisted for?

The default retention time in the project is set by the retentionMs value in application.yml, which is currently 2 days.

The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.

How many partitions will an event type be given?

The default partition count in the project is set by the partitionNum value in application.yml, which is currently 1.

The service installation you’re working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.

How do I configure the number of partitions?

At the moment, the partition count can’t be defined via the API per event type. It may be added as an option in the future. The best option for now would be to configure the underlying Kafka topic directly.

If you want to change the default for a server installation, you can set the partitionNum value in application.yml to a new value.

Which partitioning strategy should I use?

See the section “Partition Strategies”, which goes into more detail on the available options and what they’re good for.

How can I keep track of a position in a stream?

Clients can track offset information sent in the Cursor on a per-partition basis - each batch of events sent to a consumer will contain such a Cursor that will detail the partition id and an offset (see “Cursors and Offsets” for more information). This allows a client to track how far in the partition they have consumed events, and also allows them to submit a cursor with an appropriate value as described in the “Cursors and Offsets” section. One approach would be to use local storage (eg a datastore like PostgreSQL or DynamoDB) to record the position to date outside the client application, making it available in the event of restarts.

Note that a managed API is being developed which will support storing offsets for consumer clients in the future.

What’s an effective schema?

The effective schema is the combination of the schema structure defined for a particular category, such as ‘business’ or ‘data’ and the custom schema submitted when creating an event type. When an event is posted to Nakadi, the effective schema is used to validate the event and not the separate category level and custom level schemas.

You can read more in the section “Effective Schema”.

Nakadi isn’t validating metadata and/or event identifiers, what’s going on?

It’s possible you are working with an ‘undefined’ event type. The ‘undefined’ category doesn’t support metadata validation or enrichment. In more technical terms, the effective schema for an undefined event is exactly the same as the schema that was submitted when the event type was created.

What clients are available?

The project doesn’t ship with a client, but there are a number of open source clients described in the “Clients” section.

If you have an open source client not listed there, we’d love to hear from you :) Let us know via GitHub and we’ll add it to the list.

How do I disable OAuth for local development?

The default behavior when running the docker containers locally will be for OAuth to be disabled.

If you are running a Nakadi server locally outside docker, you can disable token checks by setting the environment variable NAKADI_OAUTH2_MODE to OFF before starting the server.
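
For example, in the shell where the server is started:

export NAKADI_OAUTH2_MODE=OFF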

I want to send arbitrary JSON, how do I avoid defining a JSON Schema?

The standard workaround is to define an event type with the following category and schema:

  • category: undefined
  • schema: {"additionalProperties": true}

Note that sending a schema of {} means nothing will validate, not that anything will be allowed.
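
A sketch of creating such an event type, following the same payload shape as the creation example earlier in this manual (the name and owning application are illustrative):

curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
  "name": "acme.arbitrary_json_data",
  "owning_application": "acme-some-service",
  "category": "undefined",
  "partition_strategy": "random",
  "schema": {
    "type": "json_schema",
    "schema": "{\"additionalProperties\": true}"
  }
}'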

Can I post something other than JSON as an event?

It’s not a configuration the project directly supports or is designed for. But if you are willing to use JSON as a wrapper, one option is to define a JSON Schema with a property whose type is a string, and send the non-JSON content as a Base64 encoded value for the string. It’s worth pointing out this is entirely opaque to Nakadi and you won’t get the benefits of schema checking (or even verification that the submitted string is properly encoded Base64). Note that if you try this, you’ll need to be careful to encode the Base64 as being URL/file safe to avoid issues with the line-delimited stream format Nakadi uses to send messages to consumers - as mentioned, this is an option that the server doesn’t directly support.
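
A sketch of such a wrapper, assuming an event type whose schema declares a single string field (the field name payload is illustrative):

  • schema: {"properties": {"payload": {"type": "string"}}, "required": ["payload"]}

An event would then look like {"payload": "SGVsbG8gTmFrYWRpIQ=="}, where the value is the Base64 encoding of "Hello Nakadi!"; decoding it is entirely up to the consumer.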

I get the message “Is the docker daemon running on this host?” - Help!

If you get the message “Is the docker daemon running on this host?” first check that Docker and VirtualBox are running. If you know they are running, you might want to run this command -

eval "$(docker-machine env default)"

What’s the reason for newest available offset being smaller than oldest offset?

When there are no events available for an event-type because they’ve expired, then newest_available_offset will be smaller than oldest_available_offset. Because Nakadi has exclusive offset handling, it shows the offset of the last message in newest_available_offset.

Is there a way to make publishing batches of events atomic?

Not at the moment. If the events are for different event types, or the events will be distributed across different partitions for a single event type, then there’s no way to achieve atomicity in the sense of “all events or no events will be published” in the general case. If the events belong to the same partition, the server does not have compensating behavior to ensure they will all be written.

Producers that need atomicity will want to create an event type structure that allows all the needed information to be contained within an event. This is a general distributed systems observation around message queuing and stream broker systems rather than anything specific to Nakadi.

Does Nakadi support compression?

The server will accept gzip encoded events when posted. On the consumer side, if the client asks for compression, the server will honor the request.
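
For example, using standard HTTP content negotiation, a consumer can ask for a compressed stream with curl’s --compressed flag, which sends an Accept-Encoding header and transparently decompresses the response:

curl -v --compressed http://localhost:8080/event-types/order_received/events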

How do I contribute to the project?

Nakadi accepts contributions from the open-source community. Please see CONTRIBUTE.md.

Settings/Features

Settings/Features

Some features of Nakadi can be enabled or disabled using the settings/features API. The following are some of the important features you should be aware of.
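
As a sketch, assuming your installation exposes the feature toggle under settings/features and accepts a payload of this shape (both are assumptions; check the API Reference of your installation):

curl -v -XPOST http://localhost:8080/settings/features -H "Content-type: application/json" -d '{
  "feature": "disable_event_type_creation",
  "enabled": true
}'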

disable_event_type_creation

Sometimes we need to disable the creation of new event-types for operational reasons. The disable_event_type_creation feature allows you to temporarily disable creation of new event types.

disable_event_type_deletion

Sometimes we need to disable the deletion of event-types for operational reasons. The disable_event_type_deletion feature allows you to temporarily disable deletion of event types.

delete_event_type_with_subscriptions

By default, only event types without an active subscription can be deleted. When the delete_event_type_with_subscriptions feature is enabled, even event types with subscriptions can be deleted.

event_type_deletion_only_admins

Not in use anymore

disable_subscription_creation

Sometimes we need to disable the creation of new subscriptions for operational reasons. The disable_subscription_creation feature allows you to temporarily disable creation of new subscriptions.

remote_tokeninfo

Nakadi can be configured with two tokenInfo services: local and remote. By default Nakadi uses the local one, but it can be forced to use the remote one by enabling remote_tokeninfo.

kpi_collection

Nakadi publishes several of its own KPIs as special event-types in Nakadi. Publishing of these KPIs can be turned off by disabling kpi_collection if they are not used.

audit_log_collection

In addition to KPIs, Nakadi also publishes another set of events called audit logs. These can also be turned off by disabling audit_log_collection, if they are not used.

disable_db_write_operations

The disable_db_write_operations feature can be used to completely block all non-read access to the database. This can be useful when you want to do maintenance of the database.

force_event_type_authz

The authorization section of an event-type defines who (or what) has which kind of access to that event-type. When the force_event_type_authz feature is enabled, all new event-types must have an authorization section.

force_subscription_authz

Like event-types, subscriptions can also have an authorization section. The force_subscription_authz feature can be used to make sure that all new subscriptions have an authorization section.

repartitioning

Nakadi supports repartitioning to increase the number of partitions available for an event-type. This is enabled via the repartitioning feature.

Timelines

Timelines

This document covers Timelines internals. It’s meant to explain how timelines work, to help you understand the code and what each part of it contributes to the overall picture.

Timeline creation

Timeline creation is coordinated through a series of locks and barriers using ZooKeeper. Below we depict an example of what the ZooKeeper data structure looks like at each step.

Initial state

Every time a Nakadi application is launched, it tries to create the following ZK structure:

timelines:
  lock: -                 # lock for timeline versions synchronization
  version: {version}      # monotonically incremented long value (version of timelines configuration)
  locked_et: -
  nodes:                  # Nakadi nodes
    node1: {version}      # each Nakadi node exposes the version used on this node
    node2: {version}

To avoid overriding the initial structure due to concurrent startup, each instance needs to take the lock /nakadi/timelines/lock before executing.

Start timeline creation for et_1

When a new timeline creation is initiated, the first step is to acquire a lock to update timelines for et_1 by creating an ephemeral node at /timelines/locked_et/et_1.

timelines:
  lock: -
  version: 0
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0

Notify all Nakadi nodes about change: the version barrier

Next, the instance coordinating the timeline creation bumps the version node, which all Nakadi instances watch for changes, so they are notified when something changes.

timelines:
  lock: -
  version: 1       # this is incremented by 1
  locked_et:
    et_1: -
  nodes:
    node1: 0
    node2: 0

Wait for all nodes to react to the new version

Each Nakadi instance watches the value of the /nakadi/timelines/version/ node. When it changes, each instance checks all locked event types and reacts accordingly, by either releasing or blocking publishers locally.

Once each instance has updated its local list of locked event types, it bumps its own version, to let the timeline creation initiator know that it can proceed.

timelines:
  lock: -
  version: 1 
  locked_et:
     et_1: -
  nodes:
    node1: 1       # each instance updates its own version
    node2: 1

Proceed with timeline creation

Once all instances have reacted, the creation proceeds: the initiator inserts the necessary database entries into the timelines table and snapshots the latest available offset of the existing storage. It also creates a topic in the new storage. Be aware that if a timeline partition has never been used, the offset stored is -1. If it has a single event, the offset is zero, and so on.

Remove lock and notify all instances again

Following the same logic as when initiating the creation of a timeline, the lock is deleted and the version is bumped. All Nakadi instances react by removing their local locks and switching timelines if necessary.

timelines:
  lock: -
  version: 2 
  locked_et:     
  nodes:
    node1: 1
    node2: 1

After every instance has reacted, it should look like:

timelines:
  lock: -
  version: 2 
  locked_et:
  nodes:
    node1: 2       # each instance updates its own version
    node2: 2

Done

All done here. A new timeline has been created successfully. All operations are logged, so if you need to debug things, just take a look at the INFO-level logs.

Data Integration at Zalando

Introduction

Teams running platform microservices encapsulate their datastores and data models in accordance with our API Guidelines. The result is that business intelligence and analytics, which are also teams running their own services, don’t have direct access to raw data. This demands a different approach to data integration compared to traditional techniques such as extract-transform-load (ETL) from application databases.

Event Based Integration

The way we support data integration is twofold. First, teams take responsibility for providing data if there is demand for it or a platform requirement to do so, by publishing Events, defined according to an Event Type. Second, at the technical level, data integration happens by microservices publishing streams of event data to an Event Broker, which acts as managed infrastructure for the platform.

We can call this style “Event Based Integration”. It provides access to local microservice datastores in the form of logical data synchronization events written into the data lake, which can be used to reconstruct state for BI and data science analysis and reporting purposes.

Event Based Interchange

While we mostly focus on data integration in this section, we should mention that the architectural style can also be used for interchange. Here events are used for business and other purposes, for example asynchronous service-to-service communication or process monitoring. These events use the same event infrastructure, are also stored in the data lake, and are available for further analysis.

Architectural Elements

In the platform architecture, there are two foundational services used for event based integration. First, an Event Broker, which is a managed deployment of the Nakadi project. Second, the Data Lake, which captures and stores events from the broker, and is used by the data warehouse and other downstream systems.

Event Based Integration - Architecture

Data Model: Event Types and Events

Producers and consumers of events coordinate asynchronously via an event definition, called an Event Type, and its corresponding stream of Events that conform to the type. Both event types and events are handled by the event broker, which supports a HTTP API.

Event types allow an Event Producer to define a data contract based on a JSON-Schema and an Event Category. An event producer owns the definition of the events it publishes and is responsible for sending those events to the broker, registering the schema for the event type, and managing the versioning and evolution of the data. Events are part of the service’s interface to the outside world.

Event Consumers can connect to an HTTP event stream to receive events. Consumers can also use a Subscription to have the broker manage their offset positions in an event stream. One event type can have many subscribing consumers, allowing new services to receive events without the producer having to know or directly integrate with each consumer.

The Event Broker: Nakadi

Nakadi allows services to register types of event and have them published and enriched. Services can subscribe to events by type and consume them as a stream. Nakadi’s publish/subscribe model preserves ordering for each received event, and supports expiration based retention and replay of events for its consumers.

Nakadi supports HTTP APIs for event publish-subscribe and schema management. This API constrains events on the platform to be one of three categories:

  • Data Change Event: An event that represents a change to a record or other entity.

  • Business Event: An event that is part of, or drives a business process.

  • Undefined Event: Suitable for other kinds of events where schema validation still provides value.

Documentation on event types is available in the Nakadi Developer Manual - Event Types.

The Data Lake

A significant consumer of events is the Data Lake, which subscribes to multiple event types (specified by use case) sent to the broker. The data lake prepares and makes available platform data for use by the data warehouse and other analytical systems.

The Data Lake is not only capturing data for storage in the data lake, it also enables functions such as business process monitoring. For example, all EventLog events sent to the platform are captured by the Data Lake.

Nakadi Event Bus API Definition

version 0.10.1


Nakadi at its core aims at being a generic and content-agnostic event broker with a convenient
API. In doing this, Nakadi abstracts away, as much as possible, details of the backing
messaging infrastructure. The single currently supported messaging infrastructure is Kafka
(Kinesis is planned for the future).

In Nakadi every Event has an EventType, and a stream of Events is exposed for each
registered EventType.

An EventType defines properties relevant for the operation of its associated stream, namely:

  • The schema of the Event of this EventType. The schema defines the accepted format of
    Events of an EventType and will be, if so desired, enforced by Nakadi. Usually Nakadi will
    respect the schema for the EventTypes in accordance to how an owning Application defines them.

  • The expected validation and enrichment procedures upon reception of an Event.
    Validation defines conditions for the acceptance of the incoming Event and is strictly enforced
    by Nakadi. Usually the validation will enforce compliance of the payload (or part of it) with
    the defined schema of its EventType. Enrichment specifies properties that are added to the payload
    (body) of the Event before persisting it. Usually enrichment affects the metadata of an Event,
    but is not limited to it.

  • The ordering expectations of Events in this stream. Each EventType will have its Events
    stored in an underlying logical stream (the Topic) that is physically organized in disjoint
    collections of strictly ordered Events (the Partition). The EventType defines the field that
    acts as evaluator of the ordering (that is, its partition key); this ordering is guaranteed by
    making Events whose partition key resolves to the same Partition (usually a hash function on its
    value) be persisted strictly ordered in a Partition. In practice this means that all Events
    within a Partition have their relative order guaranteed: Events (of the same EventType) that are
    about the same data entity (that is, have the same value for the partition key) always reach the
    same Partition, so their relative ordering is preserved. This mechanism implies that no
    statements can be made about the relative ordering of Events that are in different partitions.

  • The authorization rules for the event type. The rules define who can produce events for
    the event type (write), who can consume events from the event type (read), and who can update
    the event type properties (admin). If the authorization rules are absent, then the event type
    is open to all authenticated users.

    Except for defined enrichment rules, Nakadi will never manipulate the content of any Event.

    Clients of Nakadi can be grouped in 2 categories: EventType owners and Clients (clients
    in turn are both Producers and Consumers of Events). Event Type owners interact with
    Nakadi via the Schema Registry API for the definition of EventTypes, while Clients via the
    streaming API for submission and reception of Events.

    In the Subscriptions API (High Level API), the consumption of Events proceeds via the establishment
    of a named Subscription to an EventType. Subscriptions are persistent relationships between an
    Application (which might have several instances) and the stream of one or more EventTypes,
    whose consumption tracking is managed by Nakadi, freeing Consumers from any responsibility for
    tracking the current position on a Stream.


    Scope and status of the API
    ———————————

    In this document, you’ll find:

  • The Schema Registry API, including configuration possibilities for the Schema, Validation,
    Enrichment and Partitioning of Events, and their effects on reception of Events.

  • The existing event format (see definition of Event, BusinessEvent and DataChangeEvent)
    (Note: in the future this is planned to be configurable and not an inherent part of this API).

  • High Level API.

    Other aspects of the Event Bus are at this moment to be defined and otherwise specified, not included
    in this version of this specification.

name: Team Aruha @ Zalando
email: team-aruha+nakadi-maintainers@zalando.de

Methods

/avro-schemas/{name}/versions

The API to return Avro schemas to work with Nakadi.

Name Located in Description
name path String

Name of the Avro schema to fetch. Example, batch.publishing, batch.consumption.

Object
Object properties:
items :
Array of AvroSchema

list of schemas

The wrapper containing the list of the available Avro schemas.

The list of the available Avro schemas.

/avro-schemas/{name}/versions/{version}

The API to return Avro schemas to work with Nakadi.

Name Located in Description
name path String

Name of the Avro schema to fetch. Example, batch.publishing, batch.consumption.

version path String

Version of the Avro schema to fetch.

AvroSchema

The available Avro schema.

/event-types

Returns a list of all registered EventTypes

Name Located in Description
X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

writer query String

Will query all event types that the given user or service can write to based on authorization section. Service or user
should be listed explicitly on authorization section.

Example: “service:stups_nakadi-ui”

owning_application query String

Will query all event types where owning_application matches.

Example: “stups_nakadi-ui”

Array of EventType

Ok

Problem

Client is not authenticated

/event-types

Creates a new EventType.

The fields enrichment-strategies and partition-resolution-strategy
both have an effect on the incoming Events of this EventType. For their impact on the reception
of events please consult the Event submission API methods.

  • Validation strategies define an array of validation strategies to be evaluated on reception
    of an Event of this EventType. Details of usage can be found in this external document

    • http://zalando.github.io/nakadi-manual/

  • Enrichment strategy. (todo: define this part of the API).

  • The schema of an EventType is defined as an EventTypeSchema. Currently only
    the value json-schema is supported, representing JSON Schema draft 04.

    Following conditions are enforced. Not meeting them will fail the request with the indicated
    status (details are provided in the Problem object):

  • EventType name on creation must be unique (or attempting to update an EventType with
    this method), otherwise the request is rejected with status 409 Conflict.

  • Using EventTypeSchema.type other than json-schema or passing an EventTypeSchema.schema
    that is invalid with respect to the schema’s type. Rejects with 422 Unprocessable entity.

  • Referring any Enrichment or Partition strategies that do not exist or
    whose parametrization is deemed invalid. Rejects with 422 Unprocessable entity.

    Nakadi MIGHT impose necessary schema, validation and enrichment minimal configurations that
    MUST be followed by all EventTypes (examples include: validation rules to match the schema;
    enriching every Event with the reception date-time; adhering to a set of schema fields that
    are mandatory for all EventTypes). The mechanism to set and inspect such rules is not
    defined at this time and might not be exposed in the API.

Name Located in Description
event-type body EventType

EventType to be created

EventType

Created

Problem

Client is not authenticated

Problem

Conflict, for example on creation of EventType with already existing name.

Problem

Unprocessable Entity

/event-types/{name}

Returns the EventType identified by its name.

Name Located in Description
name path String

Name of the EventType to load.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

Problem

Client is not authenticated

/event-types/{name}

Updates the EventType identified by its name. Behaviour is the same as creation of
EventType (See POST /event-type) except where noted below.

The name field cannot be changed. Attempting to do so will result in a 422 failure.

Modifications to the schema are constrained by the specified compatibility_mode.

Updating the EventType is only allowed for clients that satisfy the authorization admin requirements,
if it exists.

Name Located in Description
name path String

Name of the EventType to update.

event-type body EventType

EventType to be updated.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

Problem

Client is not authenticated

Problem

Unprocessable Entity

Problem

Access forbidden

/event-types/{name}

Deletes an EventType identified by its name. All events in the EventType’s stream will
also be removed. Note: deletion happens asynchronously, which has the following
consequences:

  • Creation of an equally named EventType before the underlying topic deletion is complete
    might not succeed (failure is a 409 Conflict).

  • Events in the stream may be visible for a short period of time before being removed.

    Updating the EventType is only allowed for clients that satisfy the authorization admin requirements,
    if it exists.
Name Located in Description
name path String

Name of the EventType to delete.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

-

EventType is successfully removed

-

EventType is not found in Nakadi

Problem

Client is not authenticated

Problem

Access forbidden

/event-types/{name}/cursor-distances

GET with payload.

Calculate the distance between two offsets. This is useful for performing checks for data completeness, when
a client has read some batches and wants to be sure that all delivered events have been correctly processed.

If the event type uses ‘compact’ cleanup policy - then the actual number of events for consumption can be lower
than the distance reported by this endpoint.

If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.
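
A sketch of such a query, assuming the body is a list of initial_cursor/final_cursor pairs as described below (partitions and offsets are illustrative):

curl -v -XPOST http://localhost:8080/event-types/order_received/cursor-distances -H "Content-type: application/json" -d '[
  {
    "initial_cursor": {"partition": "0", "offset": "001-0001-000000000000000001"},
    "final_cursor": {"partition": "0", "offset": "001-0001-000000000000000042"}
  }
]'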

Name Located in Description
name path String

Name of the EventType

cursors-distances-query body

List of pairs of cursors: initial_cursor and final_cursor. Used as input by Nakadi to
calculate how many events there are between two cursors. The distance doesn’t include the initial_cursor
but includes the final_cursor. So if they are equal the result is zero.

An array of CursorDistanceResults with the distance between two offsets.

OK

Problem

Unprocessable Entity

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/cursors-lag

GET with payload.

This endpoint is mostly interesting for monitoring purposes. Used when a consumer wants to know how far behind
in the stream its application is lagging.

It provides the number of unconsumed events for each cursor’s partition.

If the event type uses ‘compact’ cleanup policy - then the actual number of unconsumed events can be lower than
the one reported by this endpoint.

If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors body
Array of Cursor

Each cursor indicates the partition and offset consumed by the client. When a cursor is provided,
Nakadi calculates the consumer lag, e.g. the number of events between the provided offset and the most
recent published event. This lag will be present in the response as unconsumed_events property.

It’s not mandatory to specify cursors for every partition.

The lag calculation is non inclusive, e.g. if the provided offset is the offset of the latest published
event, the number of unconsumed_events will be zero.

All provided cursors must be valid, i.e. a non expired cursor of an event in the stream.

Array of Partition

List of partitions with the number of unconsumed events.

OK

Problem

Unprocessable Entity

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/deleted-events

Once an event is published to a log compacted event-type in nakadi, it
will stay there until another event with the same key is published.
In order to delete an event from a log compacted event-type, it can be
posted to the deleted-events endpoint. The content of the event is not
validated, but the information required to generate key and select
partition should be present. On next log compaction, which could be
hours later based on configuration, any previous events with the same
key are removed from the specified partition + event-type.
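
For example, a deletion event for a log compacted event type only needs to carry the fields used to derive the compaction key and the partition. The payload below is a hypothetical sketch using a metadata.partition_compaction_key field; adjust it to whatever your event type uses:

curl -v -XPOST http://localhost:8080/event-types/acme.compacted_data/deleted-events -H "Content-type: application/json" -d '[
  {
    "metadata": {
      "eid": "9702b2a4-29b4-4f85-9c19-2b6ee26f7a9e",
      "occurred_at": "2016-03-15T23:47:15+01:00",
      "partition_compaction_key": "item-42"
    }
  }
]'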

Name Located in Description
name path String

Name of the EventType

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

span_ctx header String

The span context, which is used to trace the spans and passed to called services. Helpful
for operational troubleshooting and will help users integrate Nakadi as a part of their trace.

event body
Array of Event

The Event being deleted.

-
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

Deletion event successfully published; any existing events with
same key will be deleted on next log compaction.

Problem
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

Client is not authenticated

Problem
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

Access is forbidden for the client or event type

/event-types/{name}/events

The Low Level API is deprecated. It will be removed in a future version.

Starts a stream delivery for the specified partitions of the given EventType.

The event stream is formatted as a sequence of EventStreamBatches separated by \n. Each
EventStreamBatch contains a chunk of Events and a Cursor pointing to the end of the
chunk (i.e. last delivered Event). The cursor might specify the offset with the symbolic
value BEGIN, which will open the stream starting from the oldest available offset in the
partition.

Currently the application/x-json-stream format is the only one supported by the system,
but in the future other media types may be supported.

If streaming for several distinct partitions, each one is an independent EventStreamBatch.

The initialization of a stream can be parameterized in terms of size of each chunk, timeout
for flushing each chunk, total amount of delivered Events and total time for the duration of
the stream.

Nakadi will keep a streaming connection open even if there are no events to be delivered. In
this case the timeout for the flushing of each chunk will still apply and the
EventStreamBatch will contain only the Cursor pointing to the same offset. This can be
treated as a keep-alive control for some load balancers.

The tracking of the current offset in the partitions and of which partitions are being read
is the responsibility of the client. No commits are needed.

The HTTP response on the wire will look something like this (the newline is shown as \n for clarity):

curl -v http://localhost:8080/event-types/order_received/events 
    

HTTP/1.1 200 OK
Content-Type: application/x-json-stream

{"cursor":{"partition":"0","offset":"6"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"5"},"events":[...]}\n
{"cursor":{"partition":"0","offset":"4"},"events":[...]}\n
Name Located in Description
name path String

EventType name to get events about

X-nakadi-cursors header
Array of String
format: #/definitions/Cursor

Cursors indicating the partitions to read from and respective starting offsets.

Assumes the offset on each cursor is not inclusive (i.e., first delivered Event is the
first one after the one pointed to in the cursor).

If the header is not present, the stream for all partitions defined for the EventType
will start from the newest event available in the system at the moment of making this
call.

Note: we are not using query parameters for passing the cursors only because of the
length limitations on the HTTP query. Another way to initiate this call would be the
POST method with cursors passed in the method body. This approach can be implemented in
future versions of this API.

batch_limit query Integer
format: int32
default: 1

Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit query Integer
format: int32
default: 0

Maximum number of Events in this stream (over all partitions being streamed in this
connection).

  • If 0 or undefined, will stream batches indefinitely.

  • Stream initialization will fail if stream_limit is lower than batch_limit.
batch_flush_timeout query Number
format: int32
default: 30

Maximum time in seconds to wait for the flushing of each chunk (per partition).

  • If the amount of buffered Events reaches batch_limit before this batch_flush_timeout
    is reached, the messages are immediately flushed to the client and batch flush timer is reset.

  • If 0 or undefined, will assume 30 seconds.

  • Value is treated as a recommendation. Nakadi may flush chunks with a smaller timeout.
stream_timeout query Number
format: int32
default: 0
minimum: 0
maximum: 4200

Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream for 1h ±10min.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed
to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.
If the stream_timeout is greater than max value (4200 seconds) - Nakadi will treat this as not
specifying stream_timeout (this is done due to backwards compatibility).

stream_keep_alive_limit query Integer
format: int32
default: 0

Maximum number of empty keep alive batches to get in a row before closing the connection.

If 0 or undefined will send keep alive messages indefinitely.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

EventStreamBatch

Starts streaming to the client.
Stream format is a continuous series of EventStreamBatches separated by \n

Problem

Not authenticated

Problem

Unprocessable entity

Problem

Too Many Requests. The client reached the maximum amount of simultaneous connections to a single partition

Problem

Access is forbidden for the client or event type

/event-types/{name}/events

Publishes a batch of Events of this EventType. All items must be of the EventType
identified by name.

Reception of Events will always respect the configuration of its EventType with respect to
validation, enrichment and partition. The steps performed on reception of incoming message
are:

  1. Every validation rule specified for the EventType will be checked in order against the
    incoming Events. Validation rules are evaluated in the order they are defined and the Event
    is rejected in the first case of failure. If the offending validation rule provides
    information about the violation it will be included in the BatchItemResponse. If the
    EventType defines schema validation it will be performed at this moment. The size of each
    Event will also be validated. The maximum size per Event is configured by the administrator.
    We use the batch input to measure the size of events, so unnecessary spaces, tabs, and
    carriage returns will count towards the event size.

  2. Once the validation succeeded, the content of the Event is updated according to the
    enrichment rules in the order the rules are defined in the EventType. No preexisting
    value might be changed (even if added by an enrichment rule). Violations on this will force
    the immediate rejection of the Event. The invalid overwrite attempt will be included in
    the item’s BatchItemResponse object.

  3. The incoming Event’s relative ordering is evaluated according to the rule on the
    EventType. Failure to evaluate the rule will reject the Event.

    Given the batched nature of this operation, any violation on validation or failures on
    enrichment or partitioning will cause the whole batch to be rejected, i.e. none of its
    elements are pushed to the underlying broker.

    Failures on writing of specific partitions to the broker might influence other
    partitions. Failures at this stage will fail only the affected partitions.
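
For example, publishing a single-element batch to the order_received event type used elsewhere in this manual (the payload fields are illustrative and must match the event type’s effective schema):

curl -v -XPOST http://localhost:8080/event-types/order_received/events -H "Content-type: application/json" -d '[
  {
    "order_number": "24873243241",
    "metadata": {
      "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
      "occurred_at": "2016-03-15T23:47:15+01:00"
    }
  }
]'


HTTP/1.1 200 OK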
Name Located in Description
name path String

Name of the EventType

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

span_ctx header String

The span context, which is used to trace the spans and passed to called services. Helpful
for operational troubleshooting and will help users integrate Nakadi as a part of their trace.

event body
Array of Event

The Event being published

-
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

All events in the batch have been successfully published.

headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

At least one event has failed to be submitted. The batch might be partially submitted.

Problem
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

Client is not authenticated

headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

At least one event failed to be validated, enriched or partitioned. None were submitted.

Problem
headers:
span_ctx:
type: string
description: Span context of the span used to trace the request in Nakadi

Access is forbidden for the client or event type

/event-types/{name}/partition-count

Repartitions the given event type to the provided number of partitions.

This endpoint is idempotent. In case of unexpected server failures the user must call this endpoint again until
it succeeds in order to avoid corrupting internal structures.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

partition_count body PartitionCount
-

Repartitioning was successful

Problem

If the requested number of partitions is more than the maximum for the event type, or the partition number is
smaller than the current number of partitions.

Problem

Client is not authenticated

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/partitions

Lists the Partitions for the given event-type.

This endpoint is mostly interesting for monitoring purposes or in cases when consumer wants
to start consuming older messages.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors query
Array of String
format: #/definitions/Cursor

Each cursor indicates the partition and offset consumed by the client. When this parameter is provided,
Nakadi calculates the consumer lag, e.g. the number of events between the provided offset and the most
recent published event. This lag will be present in the response as unconsumed_events property.

It’s not mandatory to specify cursors for every partition.

The lag calculation is non inclusive, e.g. if the provided offset is the offset of the latest published
event, the number of unconsumed_events will be zero.

The value of this parameter must be a json array URL encoded.

Array of Partition

An array of Partitions

OK

Problem

Client is not authenticated

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/partitions/{partition}

Returns the given Partition of this EventType. If per-EventType authorization is enabled, the caller must
be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

partition path String

Partition id

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

consumed_offset query String

Offset to query for unconsumed events. Depends on partition parameter. When present adds the property
unconsumed_events to the response partition object.

Problem

Client is not authenticated

Problem

Access forbidden because of missing scope or EventType authorization failure.

/event-types/{name}/schemas

List of schemas ordered from most recent to oldest.

Name Located in Description
name path String

EventType name

limit query Integer
format: int64
default: 20
minimum: 1
maximum: 1000

maximum number of schemas returned in one page

offset query Integer
format: int64
default: 0
minimum: 0

page offset

Object
Object properties:
_links : PaginationLinks
items :
Array of EventTypeSchema

list of schemas.

OK

/event-types/{name}/schemas

Adds a new schema for this event type. New schemas are automatically used for validation of events. The rules
for adding a new schema are described in detail in the
event types section of the documentation.

For API backward compatibility, adding a new schema can also be done by updating the entire event type via PUT.

In case fetch parameter is set to true, returns existing schema that matches supplied schema.
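
A sketch of adding a new schema version, assuming the json_schema type value from the EventTypeSchema definition (the schema content is illustrative):

curl -v -XPOST http://localhost:8080/event-types/order_received/schemas -H "Content-type: application/json" -d '{
  "type": "json_schema",
  "schema": "{\"properties\": {\"order_number\": {\"type\": \"string\"}}, \"additionalProperties\": true}"
}'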

Name Located in Description
name path String

EventType name

fetch query Boolean
default: false

If fetch parameter is set to true, returns existing schema that matches supplied schema.

event-type-schema body EventTypeSchema

New version of the event type’s schema to be used for validating published events.

EventTypeSchema

Existing event type schema, with version.

-

New schema registered with success.

Problem

Not authenticated

Problem

If no matching schema is found.

Problem

Unprocessable entity. For understanding the rules on what can be changed in a schema, please refer to
the schema evolution documentation.

Problem

Access is forbidden for the client or event type

/event-types/{name}/schemas/{version}

Retrieves a given schema version. A special {version} key named ‘latest’ is provided for
convenience.

Name Located in Description
name path String

EventType name

version path String

EventType schema version

EventTypeSchema

Schema object.

/event-types/{name}/shifted-cursors

Transforms a list of Cursors with shifts into a list without shifts. This is useful when there is a need
for random access to events in the stream.
If per-EventType authorization is enabled, the caller must be authorized to read from the EventType.

Name Located in Description
name path String

EventType name

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

cursors body
Array of ShiftedCursor

GET with payload.

Cursors indicating the positions to be calculated. Given an initial cursor, with partition and offset, it’s
possible to obtain another cursor that is relatively forward or backward of it. When shift is positive,
Nakadi will respond with a cursor that is shift positions forward of the initial partition and
offset. When shift is negative, Nakadi will move the cursor backward.

Note: It’s not currently possible to shift cursors based on time. It’s only possible to shift cursors
by the number of events forward or backward given by shift.

Array of Cursor

An array of Cursors with shift applied to the offset. The response should contain cursors in the same
order as the request.

OK

Problem

It’s only possible to navigate from a valid cursor to another valid cursor, i.e. partitions and
offsets must exist and not be expired. Any combination of parameters that might break this rule will
result in 422. For example:

  • if the initial partition and offset are expired.
  • if the shift provided leads to an already expired cursor.
  • if the shift provided leads to a cursor that does not exist yet, i.e. it points to
    an offset yet to be generated in the future.
Problem

Access forbidden because of missing scope or EventType authorization failure.
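
As a sketch (assuming a POST carrying the body described above, per the “GET with payload” note; partition and offset values are illustrative), a cursor 50 events behind a given offset can be computed like this:

curl -v -XPOST http://localhost:8080/event-types/order.order_cancelled/shifted-cursors -H "Content-type: application/json" -d '[
  {"partition": "0", "offset": "001-0001-000000000000000100", "shift": -50}
]'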

/event-types/{name}/timelines

List timelines for a given event type.

Name Located in Description
name path String

Name of the EventType to list timelines for.

Array of Object
Object properties:
id : String
format: uuid
event_type : String
order : Integer
storage_id : String
topic : String
created_at : String
format: RFC 3339 date-time
switched_at : String
format: RFC 3339 date-time
cleaned_up_at : String
format: RFC 3339 date-time
latest_position : Object

list of timelines.

OK

Problem

No such event type

Problem

Access forbidden

/event-types/{name}/timelines

Creates a new timeline for an event type and makes it active.

Name Located in Description
name path String

Name of the EventType

timeline_request body Object
Object properties:
storage_id : String

Storage id to be used for timeline creation

-

New timeline is created and in use

Problem

No such event type

Problem

Unprocessable entity due to a non-existent storage

Problem

Access forbidden
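
A minimal sketch (assuming a POST to a local instance; the storage id is hypothetical and must refer to an existing storage backend):

curl -v -XPOST http://localhost:8080/event-types/order.order_cancelled/timelines -H "Content-type: application/json" -d '{"storage_id": "default-storage"}'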

/metrics

Get monitoring metrics

Name Located in Description
Metrics

Ok

Problem

Client is not authenticated

/registry/enrichment-strategies

Lists all of the enrichment strategies supported by this Nakadi installation. Special or
custom strategies besides the defaults will be listed here.

Name Located in Description
Array of String

Returns a list of all enrichment strategies known to Nakadi

Problem

Client is not authenticated

/registry/partition-strategies

Lists all of the partition resolution strategies supported by this installation of Nakadi.
Special or custom strategies besides the defaults will be listed here.

Nakadi currently offers these inbuilt strategies:

  • random: Resolution of the target partition happens randomly (events are evenly
    distributed on the topic’s partitions).

  • user_defined: Target partition is defined by the client. As long as the indicated
    partition exists, Event assignment will respect this value. Correctness of the relative
    ordering of events is the responsibility of the Producer. Requires that the client
    provides the target partition in metadata.partition (see EventMetadata). Failure to do
    so will cause the publishing of the Event to be rejected.

  • hash: Resolution of the partition follows the computation of a hash from the value of
    the fields indicated in the EventType’s partition_key_fields, guaranteeing that Events
    with the same values in those fields end up in the same partition. If the event type’s category
    is DataChangeEvent, the field path is considered relative to “data”.
Name Located in Description
Array of String

Returns a list of all partitioning strategies known to Nakadi

Problem

Client is not authenticated

/settings/admins

Lists all administrator permissions. This endpoint is restricted to administrators with the ‘read’ permission.

Name Located in Description
AdminAuthorization

List all administrator permissions.

Problem

Client is not authenticated

Problem

Access forbidden

/settings/admins

Updates the list of administrators. This endpoint is restricted to administrators with the ‘admin’ permission.

Name Located in Description
authorization body AdminAuthorization

Lists of administrators

AdminAuthorization

List all administrator permissions.

Problem

Client is not authenticated

Problem

Access forbidden

Problem

Unprocessable entity due to too few administrators in the list
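
A minimal sketch of updating the administrator lists (the HTTP verb is assumed to be POST, since this rendering omits verbs; the subject attributes are hypothetical and follow the AdminAuthorization and AuthorizationAttribute definitions below):

curl -v -XPOST http://localhost:8080/settings/admins -H "Content-type: application/json" -d '{
  "admins": [{"data_type": "user", "value": "jdoe"}],
  "readers": [{"data_type": "user", "value": "jdoe"}],
  "writers": [{"data_type": "user", "value": "jdoe"}]
}'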

/settings/blacklist

Lists all blocked producers/consumers divided by app and event type.

Name Located in Description
Object
Object properties:
producers : Object
Object properties:
event_types :
Array of String

a list of all blocked event types for publishing events.

apps :
Array of String

a list of all blocked apps for publishing events.

a list of all blocked producers.

consumers : Object
Object properties:
event_types :
Array of String

a list of all blocked event types for consuming events.

apps :
Array of String

a list of all blocked apps for consuming events.

a list of all blocked consumers.

Lists all blocked producers/consumers.

/settings/blacklist/{blacklist_type}/{name}

Blocks publication/consumption for a particular app or event type.

Name Located in Description
blacklist_type path String

Type of the blacklist to put client into.

List of available types:

  • ‘CONSUMER_APP’: consumer application.
  • ‘CONSUMER_ET’: consumer event type.
  • ‘PRODUCER_APP’: producer application.
  • ‘PRODUCER_ET’: producer event type.
name path String

Name of the client to block.

-

Client or event type was successfully blocked.
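
For example, blocking a hypothetical consumer application (assuming a PUT, since this rendering omits HTTP verbs):

curl -v -XPUT http://localhost:8080/settings/blacklist/CONSUMER_APP/example-consumer-app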

/settings/blacklist/{blacklist_type}/{name}

Unblocks publication/consumption for a particular app or event type.

Name Located in Description
blacklist_type path String

Type of the blacklist to put client into.

List of available types:

  • ‘CONSUMER_APP’: consumer application.
  • ‘CONSUMER_ET’: consumer event type.
  • ‘PRODUCER_APP’: producer application.
  • ‘PRODUCER_ET’: producer event type.
name path String

Name of the client to unblock.

-

Client was successfully unblocked.

/settings/features

Lists all available features.

Name Located in Description
Object
Object properties:
items :
Array of Feature

list of features.

A list of all available features.

/settings/features

Enables or disables a feature, depending on the payload.

Name Located in Description
feature body Feature
-

Feature was successfully accepted.
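
A minimal sketch (assuming a POST; the feature name is hypothetical), using the Feature object defined below:

curl -v -XPOST http://localhost:8080/settings/features -H "Content-type: application/json" -d '{"feature": "some_feature_name", "enabled": true}'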

/storages

Lists all available storage backends.

Name Located in Description
Array of Storage

list of storage backends.

A list of all available storage backends.

Problem

Access forbidden

/storages

Creates a new storage backend.

Name Located in Description
storage body Storage

Storage description

Storage

Storage backend was successfully registered. Returns storage object that was created.

Problem

A storage with the same ID already exists

Problem

Access forbidden
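
A minimal sketch of registering a Kafka storage backend (assuming a POST; the id and Zookeeper values are hypothetical), following the Storage definition below:

curl -v -XPOST http://localhost:8080/storages -H "Content-type: application/json" -d '{
  "id": "my-kafka-storage",
  "storage_type": "kafka",
  "kafka_configuration": {
    "zk_address": "zookeeper.example.com:2181",
    "zk_path": "/kafka"
  }
}'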

/storages/default/{id}

Sets default storage to use in Nakadi.

Name Located in Description
id path String

storage backend ID

Storage

OK

Problem

Storage backend not found

Problem

Access forbidden

/storages/{id}

Retrieves a storage backend by its ID.

Name Located in Description
id path String

storage backend ID

Storage

OK

Problem

Storage backend not found

Problem

Access forbidden

/storages/{id}

Deletes a storage backend by its ID, if it is not in use.

Name Located in Description
id path String

storage backend ID

-

Storage backend was deleted

Problem

Storage backend not found

Problem

Storage backend could not be deleted

Problem

Access forbidden

/subscriptions

Lists all subscriptions that exist in the system. The list is ordered by creation date/time descending (newest
subscriptions come first).

Name Located in Description
owning_application query String

Parameter to filter subscriptions list by owning application. If not specified - the result list will
contain subscriptions of all owning applications.

event_type query
Array of String
collectionFormat: multi

Parameter to filter the subscriptions list by event types. If not specified, the result list will contain
subscriptions for all event types. It’s possible to provide multiple values like
event_type=et1&event_type=et2, in which case it will show subscriptions containing both et1 and et2

limit query Integer
format: int64
default: 20
minimum: 1
maximum: 1000

maximum number of subscriptions returned in one page

offset query Integer
format: int64
default: 0
minimum: 0

page offset; will be deprecated as it generates heavy load for the database

token query String

Replacement for offset. Used as a base for detecting from where and in which direction to iterate over
the subscription list. If specified, the offset parameter is ignored.
Used only if the feature toggle token_subscription_iteration is on.
For test purposes it is possible to set it to the special value “new” in order to force migration from
offset-based iteration even if the feature is not on.
Apart from testing scenarios, it should not be generated by the client; it is present in the next/prev links of the response

show_status query Boolean
default: false

show subscription status

reader query String

Queries all subscriptions that the given user or service can read from, based on the authorization section. The
service or user must be listed explicitly in the authorization section.

Example: “service:stups_nakadi-ui”

Object
Object properties:
_links : PaginationLinks
items :
Array of Subscription

list of subscriptions

OK

Problem

Bad Request
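
For example, a sketch of listing subscriptions filtered by a hypothetical owning application and event type:

curl -v "http://localhost:8080/subscriptions?owning_application=order-service&event_type=order.order_cancelled&limit=20"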

/subscriptions

This endpoint creates a subscription for EventTypes. The subscription is needed to be able to
consume events from EventTypes in a high-level way, where Nakadi stores the offsets and manages the
rebalancing of consuming clients.
The subscription is identified by its key properties (owning_application, event_types, consumer_group). If
this endpoint is invoked several times with the same key properties in the body (the order of event_types is
not important), the subscription will be created only once, and all other calls will just return
the subscription that was already created (ignoring the other parts of the body – the subscription will not be updated).

Name Located in Description
subscription body Subscription

Subscription to create

Subscription
headers:
Location:
description: The relative URI for this subscription resource.
type: string

Subscription for such parameters already exists. Returns subscription object that already
existed.

Subscription
headers:
Location:
description: The relative URI for the created resource.
type: string
Content-Location:
description: If the Content-Location header is present and the same as the Location header the client can assume it has an up to date representation of the Subscription and a corresponding GET request is not needed.
type: string

Subscription was successfully created. Returns the subscription object that was created.

Problem

Bad Request

Problem

Unprocessable Entity
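
A minimal sketch of creating a subscription (assuming a POST to a local instance; owning_application, event type and consumer_group are illustrative):

curl -v -XPOST http://localhost:8080/subscriptions -H "Content-type: application/json" -d '{
  "owning_application": "order-service",
  "event_types": ["order.order_cancelled"],
  "consumer_group": "read-order-updates"
}'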

/subscriptions/{subscription_id}

Returns a subscription identified by id.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

Problem

Subscription not found

/subscriptions/{subscription_id}

This endpoint only allows to update the authorization section of a subscription. All other properties are
immutable. This operation is restricted to subjects with administrative role. This call captures the timestamp
of the update request.

Name Located in Description
subscription body Subscription

Subscription with modified authorization section.

subscription_id path String

Id of subscription

-

Subscription was updated

Problem

Access forbidden

Problem

Subscription not found

/subscriptions/{subscription_id}

Deletes a subscription.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

-

Subscription was deleted

Problem

Access forbidden

Problem

Subscription not found

/subscriptions/{subscription_id}/cursors

Exposes the currently committed offsets of a subscription.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

Object
Object properties:
items :

list of cursors for subscription

Ok

Problem

Subscription not found

/subscriptions/{subscription_id}/cursors

Endpoint for committing offsets of the subscription. If there is uncommitted data, and no commits happen
for 60 seconds, then Nakadi will consider the client to be gone and will close the connection. As long
as no events are sent, the client does not need to commit.

If the connection is closed, the client has 60 seconds to commit the events it received, counted from the moment
they were sent. After that, the connection will be considered closed, and it will not be possible to
commit with that X-Nakadi-StreamId anymore.

When a batch is committed, all previous batches that were sent in the stream for this partition are
automatically committed as well.

Name Located in Description
subscription_id path String

Id of subscription

X-Nakadi-StreamId header String

Id of the stream which the client uses to read events. It is not possible to make a commit for a terminated or
non-existing stream. Also, the client can’t commit cursors that were not sent to its stream.

cursors body Object
Object properties:
items :

List of cursors that the consumer acknowledges to have successfully processed.

-

Offsets were committed

Object
Object properties:
items :

list of items which describe commit result for each cursor

At least one of the cursors submitted for commit is older than or equal to an already committed one. An array
of commit results is returned for this status code.

Problem

Access forbidden

Problem

Subscription not found

Problem

Unprocessable Entity
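
A minimal sketch of committing cursors (assuming a POST; all ids and offsets are illustrative). The items are the cursor objects echoed back exactly as they were received in the stream batches, and X-Nakadi-StreamId is the value returned in the response header when the stream was opened:

curl -v -XPOST http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors \
  -H "Content-type: application/json" \
  -H "X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f" \
  -d '{"items": [{"event_type": "order.order_cancelled", "partition": "0", "offset": "001-0001-000000000000000120", "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"}]}'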

/subscriptions/{subscription_id}/cursors

Resets subscription offsets to the specified values.
Clients connecting after this operation will get events starting from the next offset position.
During this operation the subscription’s consumers will be disconnected. The request may block for up to the
subscription commit timeout. During that time, requests to the subscription streaming endpoint
will be rejected with 409. Clients should reconnect once the request finishes with 204.
If the subscription was never streamed, and therefore does not have its cursors initialized, this call
will first initialize the starting offsets and then perform the actual patch.
In order to provide explicit cursor initialization functionality, this method supports an empty cursors list,
allowing subscription cursors to be initialized without side effects.

Name Located in Description
subscription_id path String

Id of subscription

cursors body Object
Object properties:
items :

List of cursors to reset subscription to.

-

Offsets were reset

Problem

Access forbidden

Problem

Subscription not found

Problem

Cursors reset is already in progress for provided subscription

Problem

Unprocessable Entity
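
A minimal sketch of resetting a subscription to the beginning of a partition (assuming a PATCH, since this rendering omits HTTP verbs; ids and values are illustrative; ‘begin’ is the special offset value described under Cursor below):

curl -v -XPATCH http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/cursors \
  -H "Content-type: application/json" \
  -d '{"items": [{"event_type": "order.order_cancelled", "partition": "0", "offset": "begin"}]}'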

/subscriptions/{subscription_id}/events

Starts a new stream for reading events from this subscription. The data will be automatically rebalanced
between the streams of one subscription. The minimal consumption unit is a partition, so it is possible to start as
many streams as the total number of partitions in the event-types of this subscription. The rebalance currently
only operates on the number of partitions, so the amount of data in event-types/partitions is not considered
during autorebalance.
The position of the consumption is managed by Nakadi. The client is required to commit the cursors it gets in
the stream.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

max_uncommitted_events query Integer
format: int32
default: 10
minimum: 1

The maximum number of uncommitted events that Nakadi will stream before pausing the stream. When in the
paused state, the stream will resume as soon as a commit arrives.

batch_limit query Integer
format: int32
default: 1

Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit query Integer
format: int32
default: 0

Maximum number of Events in this stream (over all partitions being streamed in this
connection).

  • If 0 or undefined, will stream batches indefinitely.

  • Stream initialization will fail if stream_limit is lower than batch_limit.
batch_flush_timeout query Number
format: int32
default: 30

Maximum time in seconds to wait for the flushing of each chunk (per partition).

  • If the amount of buffered Events reaches batch_limit before this batch_flush_timeout
    is reached, the messages are immediately flushed to the client and batch flush timer is reset.

  • If 0 or undefined, will assume 30 seconds.

  • Value is treated as a recommendation. Nakadi may flush chunks with a smaller timeout.
stream_timeout query Number
format: int32
default: 0
minimum: 0
maximum: 4200

Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream for 1h ±10min.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed
to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.
If the stream_timeout is greater than max value (4200 seconds) - Nakadi will treat this as not
specifying stream_timeout (this is done due to backwards compatibility).

stream_keep_alive_limit query Integer
format: int32
default: 0

Maximum number of empty keep alive batches to get in a row before closing the connection.

If 0 or undefined will send keep alive messages indefinitely.

commit_timeout query Number
format: int32
default: 60
maximum: 60
minimum: 0

Maximum number of seconds that Nakadi will wait for a commit after sending a batch to a client.
If the commit does not come within this timeout, Nakadi will initiate stream termination and no
new data will be sent. Partitions from this stream will be assigned to other streams.
Setting commit_timeout to 0 is equal to setting it to the maximum allowed value of 60 seconds.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

SubscriptionEventStreamBatch
headers:
X-Nakadi-StreamId:
description: the id of this stream generated by Nakadi. Must be used for committing events that were read by client from this stream.
type: string

Ok. Stream started.
Stream format is a continuous series of SubscriptionEventStreamBatch objects separated by \n

Problem

Bad Request

Problem

Access forbidden

Problem

Subscription not found.

Problem

Conflict. There are several possible reasons for receiving this status code:

  • There are no empty slots for this subscription. The number of consumers for this subscription
    already equals the maximum value, i.e. the total number of partitions in this subscription.

  • Request to reset subscription cursors is still in progress.
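
For example, a sketch of opening a stream with curl (the subscription id is illustrative; -N disables output buffering). The X-Nakadi-StreamId response header must be captured for the commits described above:

curl -v -N "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events?batch_limit=10&batch_flush_timeout=5"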

/subscriptions/{subscription_id}/events

GET with body.

Starts a new stream for reading events from this subscription. The minimal consumption unit is a partition, so
it is possible to start as many streams as the total number of partitions in event-types of this subscription.
The position of the consumption is managed by Nakadi. The client is required to commit the cursors it gets in
a stream.

If you create a stream without specifying the partitions to read from - Nakadi will automatically assign
partitions to this new stream. By default Nakadi distributes partitions among clients trying to give an equal
number of partitions to each client (the amount of data is not considered). This is the default and most common
way to use the streaming endpoint.

It is also possible to directly request specific partitions to be delivered within the stream. If these
partitions are already consumed by another stream of this subscription - Nakadi will trigger a rebalance that
will assign these partitions to the new stream. The request will fail if the user directly requests partitions that
are already requested directly by another active stream of this subscription. The overall picture is the
following: streams which directly requested specific partitions will consume from them; streams that didn’t
specify which partitions to consume will consume the partitions that are left; Nakadi will autobalance the free
partitions among these streams (balancing happens by number of partitions).

Specifying partitions to consume is not a trivial way to consume, as it requires additional coordination
effort from the client application; it should therefore only be used when specific requirements call for this
way of consumption.

Also, when using streams with directly assigned partitions, it is the user’s responsibility to detect, and react
to, changes in the number of partitions in the subscription (following the re-partitioning of an event type).
Using the GET /subscriptions/{subscription_id}/stats endpoint can be helpful.

Name Located in Description
streamParameters body Object
Object properties:
partitions :

List of partitions to read from in this stream. If absent or empty - then the partitions will be
automatically assigned by Nakadi.

max_uncommitted_events : Integer
format: int32
default: 10
minimum: 1

The maximum number of uncommitted events that Nakadi will stream before pausing the stream. When in the
paused state, the stream will resume as soon as a commit arrives.

batch_limit : Integer
format: int32
default: 1

Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit : Integer
format: int32
default: 0

Maximum number of Events in this stream (over all partitions being streamed in this
connection).

  • If 0 or undefined, will stream batches indefinitely.

  • Stream initialization will fail if stream_limit is lower than batch_limit.
batch_flush_timeout : Number
format: int32
default: 30

Maximum time in seconds to wait for the flushing of each chunk (per partition).

  • If the amount of buffered Events reaches batch_limit before this batch_flush_timeout
    is reached, the messages are immediately flushed to the client and batch flush timer is reset.

  • If 0 or undefined, will assume 30 seconds.

  • Value is treated as a recommendation. Nakadi may flush chunks with a smaller timeout.
batch_timespan : Number
format: int32
default: 0

Useful for batching events based on their received_at timestamp. For example, if batch_timespan is 5
seconds then Nakadi would flush a batch as soon as the difference in time between the first and the
last event in the batch exceeds 5 seconds. It waits for an event outside of the window to signal the
closure of a batch.

This is different from batch_flush_timeout as it takes into consideration the received_at
timestamp of the events and not the time when the stream was opened. This becomes clear when there is
an accumulated backlog of more than 5 seconds. Using batch_timespan would cause a batch to be
flushed immediately, while batch_flush_timeout would wait for 5 seconds and flush the backlog plus
the events that might have arrived since the stream was opened.

If 0, this criterion is not used to close batches.

stream_timeout : Number
format: int32
default: 0
minimum: 0
maximum: 4200

Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream for 1h ±10min.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed
to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.
If the stream_timeout is greater than max value (4200 seconds) - Nakadi will treat this as not
specifying stream_timeout (this is done due to backwards compatibility).

commit_timeout : Number
format: int32
default: 60
maximum: 60
minimum: 0

Maximum number of seconds that Nakadi will wait for a commit after sending a batch to a client.
If the commit does not come within this timeout, Nakadi will initiate stream termination and no
new data will be sent. Partitions from this stream will be assigned to other streams.
Setting commit_timeout to 0 is equal to setting it to the maximum allowed value of 60 seconds.

subscription_id path String
format: uuid

Id of subscription.

X-Flow-Id header String

The flow id of the request, which is written into the logs and passed to called services. Helpful
for operational troubleshooting and log analysis.

SubscriptionEventStreamBatch
headers:
X-Nakadi-StreamId:
description: the id of this stream generated by Nakadi. Must be used for committing events that were read by client from this stream.
type: string

Ok. Stream started.
Stream format is a continuous series of SubscriptionEventStreamBatch objects separated by \n

Problem

Bad Request

Problem

Access forbidden

Problem

Subscription not found.

Problem

Conflict. There are several possible reasons for receiving this status code:

  • There are no empty slots for this subscription. The number of consumers for this subscription
    already equals the maximum value, i.e. the total number of partitions in this subscription.

  • Request to reset subscription cursors is still in progress.

  • It is requested to read from a partition that is already assigned to another stream by direct request.
Problem

At least one of specified partitions doesn’t belong to this subscription.
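
A sketch of requesting specific partitions directly (forcing the GET method while sending a body, as curl otherwise switches to POST; the subscription id and partition values are illustrative):

curl -v -N -XGET http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/events \
  -H "Content-type: application/json" \
  -d '{"partitions": [{"event_type": "order.order_cancelled", "partition": "0"}], "batch_limit": 10}'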

/subscriptions/{subscription_id}/stats

Exposes statistics of the specified subscription.

Name Located in Description
subscription_id path String
format: uuid

Id of subscription.

show_time_lag query Boolean
default: false

Shows consumer time lag as an optional field.
Computing the time lag is a time-consuming operation and Nakadi attempts to compute it with the best possible
strategy. In case of failures that prevent Nakadi from computing it within a configurable timeout,
the field might be only partially present or not present at all (depending on the number of successful requests)
in the output.

Object
Object properties:
items :

statistics list for specified subscription

Ok

Problem

Subscription not found
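
For example (a sketch; the subscription id is illustrative), statistics including consumer time lag can be fetched with:

curl -v "http://localhost:8080/subscriptions/038fc871-1d2c-4e2e-aa29-1579e8f2e71f/stats?show_time_lag=true"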

Definitions

AdminAuthorization

Authorization section for admin operations. This section defines three access control lists: one for writing to admin endpoints and producing events (‘writers’), one for reading from admin endpoints and consuming events (‘readers’), and one for updating the list of administrators (‘admins’).

Name Description
admins
minItems: 1

An array of subject attributes that are required for updating the list of administrators. Any one of the
attributes defined in this array is sufficient to be authorized.

readers
minItems: 1

An array of subject attributes that are required for reading from admin endpoints. Any one of the
attributes defined in this array is sufficient to be authorized.

writers
minItems: 1

An array of subject attributes that are required for writing to admin endpoints. Any one of the
attributes defined in this array is sufficient to be authorized.

Annotations

Annotations of the Nakadi resource.

A Nakadi event-type or a subscription is considered as a Nakadi resource. A Nakadi resource can have a set of annotations associated with it. The syntax of annotations is adopted from Kubernetes.

From the Kubernetes documentation about annotation keys:

Annotations are key/value pairs. Valid annotation keys have two segments: an optional prefix and name, separated by a slash (/). The name segment is required and must be 63 characters or less, beginning and ending with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between. The prefix is optional. If specified, the prefix must be a DNS subdomain: a series of DNS labels separated by dots (.), not longer than 253 characters in total, followed by a slash (/).

Values of the annotations are currently limited to 1000 bytes. This can be relaxed in the future. The prefix nakadi.io/ is reserved for internal use, and should not be used by clients.

example: {"nakadi.io/internal-event-type": "true", "criticality": "low"}
additionalProperties: {"type": "string"}

AuthorizationAttribute

An attribute for authorization. This object includes a data type, which represents the type of the attribute (which data types are allowed depends on which authorization plugin is deployed, and how it is configured), and a value. A wildcard can be represented with data type ‘*’ and value ‘*’. It means that all authenticated users are allowed to perform an operation.

Name Description
data_type String

the type of attribute (e.g., ‘team’, or ‘permission’, depending on the Nakadi configuration)

value String

the value of the attribute

AvroSchema

The Avro schema with the version.

Name Description
version String

The schema version.

avro_schema Object
additionalProperties: true

Avro schema as it is.

BatchItemResponse

A status corresponding to one individual Event’s publishing attempt.

Name Description
eid String
format: uuid

eid of the corresponding item. Will be absent if missing on the incoming Event.

publishing_status String
enum: submitted, failed, aborted

Indicator of the submission of the Event within a Batch.

  • “submitted” indicates successful submission, including commit on the underlying broker.

  • “failed” indicates the message submission was not possible and can be resubmitted if so
    desired.

  • “aborted” indicates that the submission of this item was not attempted any further due
    to a failure on another item in the batch.
step String
enum: none, validating, partitioning, enriching, publishing

Indicator of the step in the publishing process this Event reached.

For Items that “failed”, this indicates the step at which the failure occurred.

  • “none” indicates that nothing was yet attempted for the publishing of this Event. Should
    be present only in the case of aborting the publishing during the validation of another
    (previous) Event.

  • “validating”, “partitioning”, “enriching” and “publishing” indicate all the
    corresponding steps of the publishing process.
detail String

Human readable information about the failure on this item. Items that are not “submitted”
should have a description.

BusinessEvent

A Business Event.

Usually represents a status transition in a Business process.

Name Description
metadata EventMetadata

Cursor

Name Description
partition String

Id of the partition pointed to by this cursor.

offset String

Offset of the event being pointed to.

Note that if you want to specify the beginning position of a stream with the first event at offset N,
you should specify offset N-1.
This applies when you create a new subscription or reset subscription offsets.
Also for stream start offsets one can use special value:

  • begin - read from the oldest available event.

CursorCommitResult

The result of single cursor commit. Holds a cursor itself and a result value.

Name Description
cursor SubscriptionCursor
result String

The result of cursor commit.

  • committed: cursor was successfully committed
  • outdated: there already was more recent (or the same) cursor committed, so the current one was not
    committed as it is outdated

CursorDistanceQuery

Name Description
initial_cursor Cursor
final_cursor Cursor

CursorDistanceResult

Mixin:
  1. CursorDistanceQuery
  2. Object
    Object properties:
    distance : Number
    format: int64

    Number of events between two offsets. Initial offset is exclusive. It’s only zero when both provided offsets
    are equal.

DataChangeEvent

A Data change Event.

Represents a change on a resource. Also contains indicators for the data type and the type of operation performed.

Name Description
data_type String
example: pennybags:order
data_op String
enum: C, U, D, S

The type of operation executed on the entity.

  • C: Creation
  • U: Update
  • D: Deletion
  • S: Snapshot
metadata EventMetadata
data Object

The payload of the type

Event

Note The Event definition will be externalized in future versions of this document.

A basic payload of an Event. The actual schema is dependent on the information configured for the EventType, as is its enforcement (see POST /event-types). Setting of metadata properties are dependent on the configured enrichment as well.

For explanation on default configurations of validation and enrichment, see documentation of EventType.category.

For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and DataChangeEvent below.

schema: {"oneOf": [{"$ref": "#/definitions/BusinessEvent"}, {"$ref": "#/definitions/DataChangeEvent"}, {"$ref": "#/definitions/UndefinedEvent"}]}

EventMetadata

Metadata for this Event.

Contains common fields for both Business and DataChange Events. Most are enriched by Nakadi upon reception, but in general they MIGHT be set by the client.

Name Description
eid String
format: uuid
example: 105a76d8-db49-4144-ace7-e683e8f4ba46

Identifier of this Event.

Clients MUST generate this value and it SHOULD be guaranteed to be unique from the
perspective of the producer. Consumers MIGHT use this value to assert uniqueness of
reception of the Event.

event_type String
example: pennybags.payment-business-event

The EventType of this Event. This is enriched by Nakadi on reception of the Event
based on the endpoint where the Producer sent the Event to.

If provided MUST match the endpoint. Failure to do so will cause rejection of the
Event.

occurred_at String
format: RFC 3339 date-time
example: 1996-12-19T16:39:57-08:00

Technical timestamp of when the event object was created during processing of the
business event by the producer application. Note, it may differ from the timestamp
when the related real-world business event happened (e.g. when the packet was handed
over to the customer), which should be passed separately via an event type specific
attribute.
Depending on the producer implementation, the timestamp is typically some milliseconds
earlier than when the event is published and received by the API event post endpoint
server – see below.

received_at String
readOnly: true
format: RFC 3339 date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of when the event was received via the API event post endpoints.
It is automatically enriched, and events will be rejected if set by the event producer.

version String
readOnly: true

Version of the schema used for validating this event. This is enriched upon reception.
This string uses semantic versioning, which is better defined in the EventTypeSchema object.

parent_eids
Array of String
format: uuid
example: 105a76d8-db49-4144-ace7-e683e8f4ba46

Event identifier of the Event that caused the generation of this Event.
Set by the producer.

published_by String
example: jane

ID of the subject that published this specific event to Nakadi.
NOTE: Enriched by Nakadi, should not be specified when publishing event.

flow_id String
example: JAh6xH4OQhCJ9PutIV_RYw

The flow-id of the producer of this Event. As this is usually an HTTP header, this is
enriched from the header into the metadata by Nakadi to avoid clients having to
explicitly copy this.

partition String
example: 0

Indicates the partition assigned to this Event.

Required to be set by the client if partition strategy of the EventType is
‘user_defined’.

partition_compaction_key String
example: 329ed3d2-8366-11e8-adc0-fa7ae01bbebc

Value used for per-partition compaction of the event type. If two events with the same
partition_compaction_key value are published to the same partition, the later one overwrites the former.
Required when ‘cleanup_policy’ of the event type is set to ‘compact’ or ‘compact_and_delete’. Must be absent
otherwise.

When using more than one partition for an event type, it makes sense to choose ‘partition_compaction_key’
and the partitioning parameters in such a way that events with the same partition_compaction_key are published
to the same partition and are therefore compacted properly (as compaction is performed per partition).

span_ctx Object
additionalProperties:
type: string

Object containing an OpenTracing http://opentracing.io span context. In case the producer of this event type
uses OpenTracing for tracing transactions across distributed services, this field is populated with an
object that allows consumers to resume the context of a span. The details of the payload are vendor
specific and consumers are expected to contact the producer in order to obtain further details. Any attempt
to inspect or effectively use the payload, apart from the tracer implementation, can lead to vendor lock-in
which should be avoided.

EventOwnerSelector

Parameters to define per-event ownership. This selector can be used with an authorization plugin to differentiate which authorized consumers for an event type can receive an event published to the event type. The authorization section for an event type controls per event-type authorization, and this selector controls per-event authorization.

The value of the selector can either be static or be selected from within the published event (this is defined by the type of the selector), and name is an informational field that defines what the selector contains. The semantics and effects of all fields of the selector should be handled by the authorization plugin.

If the selector is null or not present in the event, the event will be readable by all authorized consumers.

Once defined, the definition for EventOwnerSelector for an event-type cannot be updated or removed.

Name Description
type String
x-extensible-enum:
path
static
example: type

The type indicates how the value used to decide authorization should be accessed.
The “path” value means that the event owner value will be selected from every event using the dot path
specified in the “value” field of the selector.
“static” means that the event owner value will simply be copied from the “value” field of the EventOwnerSelector
of the event type definition.

name String
example: team_name

Information/Description defining the data (classifier) used for per-event authorization.

value String
example: data.security.team

Based on the type of selector, the value is a string, specifying either the data to be used for per-event
authorization or path to a field inside the events published to fetch the data to be used for per-event
authorization.

EventStreamBatch

One chunk of events in a stream. A batch consists of an array of Events plus a Cursor pointing to the offset of the last Event in the stream.

The size of the array of Event is limited by the parameters used to initialize a Stream.

If acting as a keep alive message (see GET /event-types/{name}/events) the events array will be omitted.

Sequential batches might present repeated cursors if no new events have arrived.

Name Description
cursor Cursor
info StreamInfo
events
Array of Event

EventType

An event type defines the schema and its runtime properties.

Name Description
name String
pattern: [a-zA-Z][-0-9a-zA-Z_]*(\.[0-9a-zA-Z][-0-9a-zA-Z_]*)*
example: order.order_cancelled

Name of this EventType. The name is constrained by a regular expression.

Note: the name can encode the owner/responsible for this EventType and ideally should
follow a common pattern that makes it easy to read and understand, but this level of
structure is not enforced.

owning_application String
example: price-service

Indicator of the (Stups) Application owning this EventType.

category String
enum: undefined, data, business

Defines the category of this EventType.

The value set will influence, if not set otherwise, the default set of
validations, enrichment-strategies, and the effective schema for validation in
the following way:

  • undefined: No predefined changes apply. The effective schema for the validation is
    exactly the same as the EventTypeSchema.

  • data: Events of this category will be DataChangeEvents. The effective schema during
    the validation contains metadata, and adds fields data_op and data_type. The
    passed EventTypeSchema defines the schema of data.

  • business: Events of this category will be BusinessEvents. The effective schema for
    validation contains metadata and any additionally defined properties passed in the
    EventTypeSchema directly on top level of the Event. If name conflicts arise, creation
    of this EventType will be rejected.
enrichment_strategies
Array of String
enum: metadata_enrichment

Determines the enrichment to be performed on an Event upon reception. Enrichment is
performed once upon reception (and after validation) of an Event and is only possible on
fields that are not defined on the incoming Event.

For event types in categories ‘business’ or ‘data’ it’s mandatory to use
metadata_enrichment strategy. For ‘undefined’ event types it’s not possible to use this
strategy, since metadata field is not required.

See documentation for the write operation for details on behaviour in case of unsuccessful
enrichment.

partition_strategy String
default: random

Determines how the assignment of the event to a partition should be handled.

For details of possible values, see GET /registry/partition-strategies.

compatibility_mode String
default: forward

Compatibility mode provides a means for event owners to evolve their schema, provided changes respect the
semantics defined by this field.

It’s designed to be flexible enough so that producers can evolve their schemas while not
inadvertently breaking existing consumers.

Once defined, the compatibility mode is fixed, since otherwise it would break a predefined contract,
declared by the producer.

List of compatibility modes:

  • ‘compatible’: Consumers can reliably parse events produced under different versions. Every event published
    since the first version is still valid based on the newest schema. When in compatible mode, it’s allowed to
    add new optional properties and definitions to an existing schema, but no other changes are allowed.
    Under this mode, the following json-schema attributes are not supported: not, patternProperties,
    additionalProperties and additionalItems. When validating events, additionalProperties is treated as false.

  • ‘forward’: Compatible schema changes are allowed. It’s possible to use the full json schema specification
    for defining schemas. Consumers of forward compatible event types can safely read events tagged with the
    latest schema version as long as they follow the robustness principle.

  • ‘none’: Any schema modification is accepted, even if it might break existing producers or consumers. When
    validating events, no additional properties are accepted unless explicitly stated in the schema.
schema EventTypeSchema
partition_key_fields
Array of String

Required when ‘partition_strategy’ is set to ‘hash’. Must be absent otherwise.
Indicates the fields used for evaluating the partition of Events of this type.

If this is set it MUST be a valid required field as defined in the schema.

cleanup_policy String
x-extensible-enum:
delete
compact
compact_and_delete
default: delete

Event type cleanup policy. Possible values:

  • ‘delete’ cleanup policy will delete old events after retention time expires. Nakadi guarantees that each
    event will be available for at least the retention time period. However Nakadi doesn’t guarantee that event
    will be deleted right after retention time expires.

  • ‘compact’ cleanup policy will keep only the latest event for each event key. The compaction is performed per
    partition, there is no compaction across partitions. The key that will be used as a compaction key should be
    specified in ‘partition_compaction_key’ field of event metadata. This cleanup policy is not available for
    ‘undefined’ category of event types.

  • ‘compact_and_delete’ cleanup policy combines both time based retention and per key compaction. It’s
    useful in situations where the entire keyset may be too large and require too much disk space to keep a
    copy of each event but at the same time allowing users to benefit from compaction in order to reduce
    duplication of events with the same key within the retention time. Its usage requires events to have a
    partition_compaction_key in the metadata in the same way as compact option.

    Compaction cannot be applied to events that were published recently and are located at the head of the
    queue, which means that the actual number of events received by consumers can differ depending on when
    the consumption happened.

    When using the ‘compact’ cleanup policy, users should consider that the Nakadi endpoints showing the number
    of events will actually show the original number of events published, not the number of events that
    are currently there.
    E.g. the subscription /stats endpoint will show the value ‘unconsumed_events’, but that may not match the
    actual number of unconsumed events in that subscription, as the ‘compact’ cleanup policy may delete older
    events in the middle of the queue if a newer event for the same key has been published.

    For more details about compaction implementation please read the documentation of Log Compaction in Kafka
    https://kafka.apache.org/documentation/#compaction, Nakadi currently relies on this implementation.

    For an existing event type, it is possible to change the value of this field only from delete to compact_and_delete.
default_statistic EventTypeStatistics
options EventTypeOptions
authorization EventTypeAuthorization
annotations Annotations
labels Labels
ordering_key_fields
Array of String
example: data.incremental_counter

Indicates a single ordering field. This is a dot separated string, which is applied
onto the whole event object, including the contained metadata and data (in
case of a data change event) objects.

The field must be present in the schema. This field can be modified at any moment, but event type owners are
expected to notify consumer in advance about the change.

This is only an informational field. The events are delivered to consumers in the order they were published.
No reordering is done by Nakadi.

This field is useful in case the producer wants to communicate the complete order across all the events
published to all partitions. This is the case when there is an incremental generator on the producer side,
for example.

It differs from partition_key_fields in the sense that it’s not used for partitioning (known as sharding in
some systems). The order indicated by ordering_key_fields can also differ from the order the events are in
each partition, in case of out-of-order submission.

In most cases, this would have just a single item (the path of the field
by which this is to be ordered), but can have multiple items, in which case
those are considered as a compound key, with lexicographic ordering (first
item is most significant).

event_owner_selector EventOwnerSelector
ordering_instance_ids
Array of String
example: data.order_number

Indicates a single key field. It is a simple path (dot separated) to the JSON leaf element of the whole
event object, including the contained metadata and data (in case of a data change event) objects, and it
must be present in the schema.

This is only an informational field.

Indicates which field represents the data instance identifier and scope in which ordering_key_fields
provides a strict order. It is typically a single field, but multiple fields for compound identifier
keys are also supported.

This is an informational-only event type attribute without specific Nakadi semantics for specification of
application level ordering. It only can be used in combination with ordering_key_fields.

This field can be modified at any moment, but event type owners are expected to notify consumer in advance
about the change.

audience String
x-extensible-enum:
component-internal
business-unit-internal
company-internal
external-partner
external-public

Intended target audience of the event type. Relevant for standards around quality of design and documentation,
reviews, discoverability, changeability, and permission granting. See the guidelines
https://opensource.zalando.com/restful-api-guidelines/#219

This attribute adds no functionality and is used only to inform users about the usage scope of the event type.

created_at String
pattern: RFC 3339 date-time

Date and time when this event type was created.

updated_at String
pattern: RFC 3339 date-time

Date and time when this event type was last updated.

EventTypeAuthorization

Authorization section for an event type. This section defines three access control lists: one for producing events (‘writers’), one for consuming events (‘readers’), and one for administering an event type (‘admins’). Regardless of the values of the authorization properties, administrator accounts will always be authorized.

Name Description
admins
minItems: 1

An array of subject attributes that are required for updating the event type. Any one of the attributes
defined in this array is sufficient to be authorized. The wildcard item takes precedence over all others,
i.e. if it is present, all users are authorized.

readers
minItems: 1

An array of subject attributes that are required for reading events from the event type. Any one of the
attributes defined in this array is sufficient to be authorized. The wildcard item takes precedence over
all others, i.e., if it is present, all users are authorized.

writers
minItems: 1

An array of subject attributes that are required for writing events to the event type. Any one of the
attributes defined in this array is sufficient to be authorized.

EventTypeOptions

Additional parameters for tuning internal behavior of Nakadi.

Name Description
retention_time Integer
format: int64
default: 172800000

Number of milliseconds that Nakadi stores events published to this event type.

EventTypePartition

represents event-type:partition pair.

Name Description
event_type String

event-type name.

partition String

partition of the event-type.

EventTypeSchema

The most recent schema for this EventType. Submitted events will be validated against it.

Name Description
version String
readOnly: true
default: 1.0.0

This field is automatically generated by Nakadi. Values are based on semantic versioning
and represent compatibility information: Changes not allowed in compatible mode are
considered MAJOR level, for example, field type changes or adding a required field (which
is e.g. compatible in forward mode). Structural changes that are allowed in compatible
mode, i.e. adding new optional fields are considered MINOR level. Non-structural changes
that are allowed in compatible mode, e.g. updating descriptions are considered PATCH level. 

created_at String
readOnly: true
format: RFC 3339 date-time
example: 1996-12-19T16:39:57-08:00

Creation timestamp of the schema. This is generated by Nakadi. It should not be
specified when updating a schema and sending it may result in a client error.

type String
enum: json_schema, avro_schema

The type of schema definition. Currently only json_schema (JSON Schema v04) is supported, but in the
future there could be others.

schema String

The schema as string in the syntax defined in the field type. Failure to respect the
syntax will fail any operation on an EventType.

EventTypeStatistics

This field defines the number of partitions in the underlying Kafka topic of an event type. The number of partitions is given by the expression max(read_parallelism, write_parallelism). The maximum number of partitions is specific to each deployment of Nakadi and should be referred to in a separate document.

For historical reasons the way that the number of partitions is defined is not as straightforward as it could be. The fields messages_per_minute and message_size could potentially influence the resulting number of partitions, so it’s recommended to set both of them to 1 (one). Providing values different than 1 could result in a higher number of partitions being created.

For those interested in why these fields exist: in the beginning of the project the developers ran a very rudimentary benchmark to understand how much data could be ingested by a single Kafka topic-partition. This benchmark data was later used by this feature to define the supposedly ideal number of partitions for the user’s needs. Over time the maintainers of the project found this benchmark to be unreliable, usually resulting in fewer partitions than needed.

Name Description
messages_per_minute Integer

This property is no longer supported. Please provide value of 1 for backward compatibility.

message_size Integer

This property is no longer supported. Please provide value of 1 for backward compatibility.

read_parallelism Integer

Amount of parallel readers (consumers) to this EventType. This property is used to define the number of
partitions of the underlying Kafka topic by the expression max(read_parallelism, write_parallelism).

write_parallelism Integer

Amount of parallel writers (producers) to this EventType. This property is used to define the number of
partitions of the underlying Kafka topic by the expression max(read_parallelism, write_parallelism).

Feature

Feature of Nakadi to be enabled or disabled

Name Description
feature String
enabled Boolean

Labels

Labels of the Nakadi resource.

A Nakadi event-type or a subscription is considered as a Nakadi resource. A Nakadi resource can have a set of labels associated with it. The syntax of labels is adopted from Kubernetes.

From the Kubernetes documentation about label keys:

Labels are key/value pairs. Valid label keys have two segments: an optional prefix and name, separated by a slash (/). The name segment is required and must be 63 characters or less, beginning and ending with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between. The prefix is optional. If specified, the prefix must be a DNS subdomain: a series of DNS labels separated by dots (.), not longer than 253 characters in total, followed by a slash (/).

From the Kubernetes documentation about Label values:

A valid value must be 63 characters or less (can be empty), unless empty, must begin and end with an alphanumeric character ([a-z0-9A-Z]),could contain dashes (-), underscores (_), dots (.), and alphanumerics between.

The prefix nakadi.io/ is reserved for internal use, and should not be used by clients.

example: {"classification": "production.tier-1", "custom-label": "custom-label-value"}
additionalProperties: {"type": "string"}

Metrics

Object containing application metrics.

PaginationLink

URI identifying another page of items

Name Description
href String
format: uri
example: /subscriptions?offset=0&limit=10&token=abracadabra

PaginationLinks

contains links to previous and next pages of items

Name Description
prev PaginationLink
next PaginationLink

Partition

Partition information. Can be helpful when trying to start a stream using an unmanaged API.

This information is not related to the state of the consumer clients.

Name Description
partition String
oldest_available_offset String

An offset of the oldest available Event in that partition. This value will be changing
upon removal of Events from the partition by the background archiving/cleanup mechanism.

newest_available_offset String

An offset of the newest available Event in that partition. This value will be changing
upon reception of new events for this partition by Nakadi.

This value can be used to construct a cursor when opening streams (see
GET /event-types/{name}/events for details).

Might assume the special name BEGIN, meaning a pointer to the offset of the oldest
available event in the partition.

unconsumed_events Number
format: int64

Approximate number of events unconsumed by the client. This is also known as consumer lag and is used for
monitoring purposes by consumers interested in keeping an eye on the number of unconsumed events.

If the event type uses ‘compact’ cleanup policy - then the actual number of unconsumed events in this
partition can be lower than the one reported in this field.

PartitionCount

Name Description
partition_count Integer

Desired number of partitions for an event type. Should always be greater than or equal to the existing number of
partitions.

Problem

Name Description
type String
format: uri
example: http://httpstatus.es/503

An absolute URI that identifies the problem type. When dereferenced, it SHOULD provide
human-readable API documentation for the problem type (e.g., using HTML). This Problem
object is the same as provided by https://github.com/zalando/problem

title String
example: Service Unavailable

A short summary of the problem type. Written in English and readable for engineers
(usually not suited for non-technical stakeholders and not localized).

status Integer
format: int32
example: 503

The HTTP status code generated by the origin server for this occurrence of the problem.

detail String
example: Connection to database timed out

A human readable explanation specific to this occurrence of the problem.

instance String
format: uri

An absolute URI that identifies the specific occurrence of the problem.
It may or may not yield further information if dereferenced.
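
Putting the example values above together, an error response carrying a Problem object could look like the
following (the instance URI is a made-up illustration):

  {
    "type": "http://httpstatus.es/503",
    "title": "Service Unavailable",
    "status": 503,
    "detail": "Connection to database timed out",
    "instance": "http://example.org/problems/6b2a4d7e"
  }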

ShiftedCursor

Mixin:
  1. Cursor
  2. Object
    Object properties:
    shift : Number
    format: int64

    This number is a modifier for the offset. It moves the cursor forward or backwards by the number of events
    provided.
    For example, suppose a user wants to read events starting 100 positions before offset
    “001-000D-0000000000000009A8”; it is possible to specify shift with -100 and Nakadi will make the
    necessary calculations to move the cursor backwards relative to the given offset.

    This feature is intended for debugging only. Clients should favor the cursors provided in
    batches when streaming from Nakadi rather than navigating the stream using shifts.
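
As a sketch, a ShiftedCursor combines the Cursor fields (partition and offset) with the shift described above; the
partition and offset values here are illustrative only:

  {
    "partition": "0",
    "offset": "001-000D-0000000000000009A8",
    "shift": -100
  }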

Storage

A storage backend.

Name Description
id String
maxLength: 36

The ID of the storage backend.

storage_type String

the type of storage. Possible values: [‘kafka’]

kafka_configuration Object
Object properties:
exhibitor_address : String

the Zookeeper address

exhibitor_port : String

the Zookeeper port

zk_address : String

the Zookeeper address

zk_path : String

the Zookeeper path

configuration settings for kafka storage. Only necessary if the storage type is ‘kafka’
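
A sketch of a Storage object with the Kafka configuration filled in; the id, host names, port and path below are
hypothetical values, not defaults:

  {
    "id": "default",
    "storage_type": "kafka",
    "kafka_configuration": {
      "exhibitor_address": "exhibitor.example.org",
      "exhibitor_port": "8181",
      "zk_address": "zookeeper.example.org:2181",
      "zk_path": "/nakadi"
    }
  }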

StreamInfo

This object contains general information about the stream. Used only for debugging purposes. We recommend logging this object in order to solve connection issues. Clients should not parse this structure.

Subscription

Subscription is a high level consumption unit. Subscriptions allow applications to easily scale the number of clients by managing consumed event offsets and distributing load between client instances. The key properties that identify a subscription are ‘owning_application’, ‘event_types’ and ‘consumer_group’. It is not possible to have two different subscriptions with the same values for these properties. Creating a subscription whose key properties match an existing subscription will neither create a new one nor change the existing one; the remaining fields (e.g. read_from, authorization) are ignored.

Name Description
id String
readOnly: true

Id of the subscription that was created. It is generated by Nakadi and should not be specified when creating a
subscription.

owning_application String
example: gizig
minLength: 1

The id of the application owning the subscription.

event_types
Array of String

EventTypes to subscribe to.
The order is not important. Subscriptions that differ only by the order of EventTypes will be
considered the same and will have the same id. The size of the event_types list is limited by the total number
of partitions within these event types. The default limit for the partition count is 100.

consumer_group String
example: read-product-updates
minLength: 1
default: default

The value describing the use case of this subscription.
In general it is an additional identifier used to distinguish subscriptions having the same
owning_application and event_types.

created_at String
readOnly: true
format: RFC 3339 date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of creation of the subscription. This is generated by Nakadi. It should not be
specified when creating a subscription; sending it may result in a client error.

updated_at String
readOnly: true
format: RFC 3339 date-time
example: 1996-12-19T16:39:57-08:00

Timestamp of the last update of the subscription. This is generated by Nakadi. It should not be
specified when creating a subscription; sending it may result in a client error. Its initial value is the same
as created_at.

read_from String
default: end

Position to start reading events from. Currently supported values:

  • begin - read from the oldest available event.
  • end - read from the most recent offset.
  • cursors - read from cursors provided in initial_cursors property.

Applied when the client starts reading from a subscription.

initial_cursors

List of cursors to start reading from. This property is required when read_from = cursors.
The initial cursors should cover all partitions of the subscription.
Clients will get events starting from the next offset positions.

status

Subscription status. This data is only available when querying the subscriptions endpoint for
status.

authorization SubscriptionAuthorization
annotations Annotations
labels Labels
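
Putting these properties together, a minimal sketch of creating a subscription by posting to the subscriptions
resource; the application name and event type below are hypothetical:

  curl -v -XPOST http://localhost:8080/subscriptions -H "Content-type: application/json" -d '{
    "owning_application": "order-service",
    "event_types": ["order.ORDER_RECEIVED"],
    "consumer_group": "read-product-updates",
    "read_from": "end"
  }'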

SubscriptionAuthorization

Authorization section of a Subscription. This section defines two access control lists: one for consuming events and committing cursors (‘readers’), and one for administering a subscription (‘admins’). Regardless of the values of the authorization properties, administrator accounts will always be authorized.

Name Description
admins
minItems: 1

An array of subject attributes that are required for updating the subscription. Any one of the attributes
defined in this array is sufficient to be authorized. The wildcard item takes precedence over all others,
i.e. if it is present, all users are authorized.

readers
minItems: 1

An array of subject attributes that are required for reading events and committing cursors to this
subscription. Any one of the attributes defined in this array is sufficient to be authorized. The wildcard
item takes precedence over all others, i.e., if it is present, all users are authorized.
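
As a sketch, an authorization section could look like the following; the attribute shape (data_type/value) and the
concrete subjects are assumptions for illustration:

  "authorization": {
    "admins": [
      {"data_type": "user", "value": "jdoe"}
    ],
    "readers": [
      {"data_type": "service", "value": "stups_order-consumer"}
    ]
  }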

SubscriptionCursor

Mixin:
  1. Cursor
  2. Object
    Object properties:
    event_type : String

    The name of the event type this partition’s events belong to.

    cursor_token : String

    An opaque value defined by the server.
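
SubscriptionCursors are what clients send back when committing their position. A sketch of such a commit, assuming
the POST /subscriptions/{subscription_id}/cursors endpoint with the cursors wrapped in an items array and the
X-Nakadi-StreamId header taken from the open stream (all identifiers below are made up):

  curl -v -XPOST http://localhost:8080/subscriptions/{subscription_id}/cursors \
    -H "Content-type: application/json" \
    -H "X-Nakadi-StreamId: 70779f46-950d-4e48-9fca-10c413845e7f" -d '{
    "items": [{
      "partition": "0",
      "offset": "001-000D-0000000000000009A8",
      "event_type": "order.ORDER_RECEIVED",
      "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
    }]
  }'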

SubscriptionCursorWithoutToken

Mixin:
  1. Cursor
  2. Object
    Object properties:
    event_type : String

    The name of the event type this partition’s events belong to.

SubscriptionEventStreamBatch

Analogous to EventStreamBatch, but used for high-level streaming. It includes the specific cursors needed for committing in the high-level API.

Name Description
cursor SubscriptionCursor
info StreamInfo
events
Array of Event
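
As a sketch, one line of a high-level stream carrying such a batch might look like this (event type, payload and
token values are illustrative); batches sent as keep-alives typically contain only the cursor:

  {"cursor": {"partition": "0", "offset": "001-000D-0000000000000009A8", "event_type": "order.ORDER_RECEIVED", "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"}, "events": [{"order_number": "24873243241"}]}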

SubscriptionEventTypeStats

statistics of one event-type within the context of a subscription

Name Description
event_type String

event-type name

partitions
Array of Object
Object properties:
partition : String

the partition id

state : String

The state of this partition in the current subscription. Currently the following values are possible:

  • unassigned: the partition is currently not assigned to any client;
  • reassigning: the partition is currently being reassigned from one client to another;
  • assigned: the partition is assigned to a client.
unconsumed_events : Number

The number of events in this partition that are not yet consumed within this subscription.
The property may be absent when no events have yet been consumed from this partition in this
subscription (in case read_from is BEGIN or END).

If the event type uses the ‘compact’ cleanup policy, the actual number of unconsumed events can be
lower than the one reported in this field.

consumer_lag_seconds : Number

Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of
this partition that is not yet consumed within this subscription.

stream_id : String

the id of the stream that consumes data from this partition

assignment_type : String
  • direct: partition can’t be transferred to another stream until the stream is closed;
  • auto: partition can be transferred to another stream in case of rebalance, or if another stream
    requests to read from this partition.

statistics of partition within a subscription context

statistics of partitions of this event-type
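
For illustration, a sketch of the statistics returned for one event type when querying subscription stats (for
example via GET /subscriptions/{subscription_id}/stats); all identifiers and numbers are made up, and the exact
response wrapper may differ:

  {
    "event_type": "order.ORDER_RECEIVED",
    "partitions": [
      {
        "partition": "0",
        "state": "assigned",
        "unconsumed_events": 42,
        "consumer_lag_seconds": 7,
        "stream_id": "70779f46-950d-4e48-9fca-10c413845e7f",
        "assignment_type": "auto"
      }
    ]
  }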

SubscriptionEventTypeStatus

Status of one event-type within the context of a subscription

Name Description
event_type String

event-type name

partitions
Array of Object
Object properties:
partition : String

The partition id

state : String

The state of this partition in the current subscription. Currently the following values are possible:

  • unassigned: the partition is currently not assigned to any client;
  • reassigning: the partition is currently being reassigned from one client to another;
  • assigned: the partition is assigned to a client.
stream_id : String

the id of the stream that consumes data from this partition

assignment_type : String
  • direct: partition can’t be transferred to another stream until the stream is closed;
  • auto: partition can be transferred to another stream in case of rebalance, or if another stream
    requests to read from this partition.

status of partition within a subscription context

status of partitions of this event-type

UndefinedEvent

An Event without any defined category.

Nakadi will not change any content of this event.