What is the format of the routingKey for an AMQP, Kinesis, or Kafka integration rule?

The routingKey (which is used as the partitionKey for a Kinesis rule, and as the routing key for an AMQP rule) is an interpolable string.

Interpolation is done using #{...}, similar to Ruby. The content of each interpolation is a variable name. The following variables are currently supported: channelName, message.name, message.id, message.clientId, and message.extras.headers['someHeaderName'].
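As a rough illustration of how variable interpolation behaves, here is a minimal sketch in JavaScript. It is not Ably's implementation: resolveVariable and interpolate are hypothetical names, filters are deliberately left out, and how a missing value is rendered is an assumption of this sketch only.

```javascript
// Illustrative only: mimics #{...} variable interpolation without filters.
// `resolveVariable` and `interpolate` are hypothetical names, not Ably APIs.
function resolveVariable(expr, channelName, message) {
  // message.extras.headers['someHeaderName'] looks up one header by name.
  const header = expr.match(/^message\.extras\.headers\['([^']+)'\]$/);
  if (header) {
    const headers = (message.extras && message.extras.headers) || {};
    return headers[header[1]];
  }
  switch (expr) {
    case "channelName": return channelName;
    case "message.name": return message.name;
    case "message.id": return message.id;
    case "message.clientId": return message.clientId;
    default: throw new Error("Unsupported variable: " + expr);
  }
}

function interpolate(template, channelName, message) {
  // Replace each #{...} with the value of the variable it names.
  // ASSUMPTION: a missing value is rendered via String(), i.e. "undefined".
  return template.replace(/#\{([^}]*)\}/g, (_, expr) =>
    String(resolveVariable(expr.trim(), channelName, message))
  );
}

const message = {
  id: "abc:0",
  name: "greeting",
  clientId: "alice",
  extras: { headers: { foo: "bar" } },
};

console.log(interpolate("the-channel-name-is-#{channelName}", "chat", message));
// prints "the-channel-name-is-chat"
console.log(interpolate("the-foo-header-is-#{ message.extras.headers['foo'] }", "chat", message));
// prints "the-foo-header-is-bar"
```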

The variable name is optionally followed by a filter. Filters use a pipe syntax, similar to AngularJS filter expressions, jq, or bash. Currently supported filters are hash and moduloHash.

hash transforms the variable into a stringified 32-bit fingerprint, and takes an optional numerical argument, the base to use when stringifying (defaults to 16).

moduloHash is similar to hash, but runs the result through a modulo function before stringifying, which is useful for bucketing. It takes one mandatory argument (the number of buckets) and one optional argument (the base to use when stringifying; defaults to 16).

If you use a filter, you can pass a tuple of two or more variables as its input. The tuple must be comma-separated and enclosed in parentheses, for example (message.name, message.clientId).
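The behaviour of the two filters can be sketched as follows. This is only an approximation under stated assumptions: the actual 32-bit fingerprint function is not specified here, so FNV-1a is swapped in as a stand-in and will not reproduce the values Ably generates. How tuple members are combined before hashing is also an assumption (a comma join). The function names are hypothetical.

```javascript
// Stand-in 32-bit hash (FNV-1a over UTF-16 code units).
// ASSUMPTION: the real fingerprint function is not documented in this article,
// so the values produced here will differ from those produced by Ably.
function fnv1a32(str) {
  let h = 0x811c9dc5; // FNV-1a 32-bit offset basis
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // multiply by the FNV prime, keep 32 bits
  }
  return h >>> 0;
}

// ASSUMPTION: tuple members are joined with a comma before hashing.
function toInput(values) {
  return Array.isArray(values) ? values.join(",") : String(values);
}

// hash: 32-bit fingerprint, stringified in the given base (default 16).
function hash(values, base = 16) {
  return fnv1a32(toInput(values)).toString(base);
}

// moduloHash: fingerprint reduced modulo `buckets`, then stringified.
function moduloHash(values, buckets, base = 16) {
  return (fnv1a32(toInput(values)) % buckets).toString(base);
}

console.log(hash("chat"));                         // hexadecimal string
console.log(hash("chat", 10));                     // same fingerprint in decimal
console.log(hash(["greeting", "alice", "chat"]));  // tuple input
console.log("shard-" + moduloHash("some-message-id", 4, 10)); // one of shard-0 .. shard-3
```

Because the modulo is applied before stringifying, moduloHash with N buckets always yields one of N distinct strings, which is what makes it suitable for bucketing.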

The message.extras.headers item is populated from the specified element of the headers object (a map of string to string) inside the message extras. For example, to publish a message with a "foo" header in ably-js, use channel.publish({ name: <message name>, data: <message data>, extras: {headers: {foo: "bar"}} }).
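The following sketch wraps that publish call in a small helper. publishWithHeaders is a hypothetical name, and the sketch assumes that channel is an ably-js channel obtained elsewhere (for example from a client's channels.get) whose publish method accepts a message object. A stub channel is used here so that the example runs without a network connection.

```javascript
// `publishWithHeaders` is a hypothetical helper, not part of ably-js.
// `channel` is assumed to be an ably-js channel obtained elsewhere.
function publishWithHeaders(channel, name, data, headers) {
  // extras.headers is a map of string to string; each entry can then be
  // referenced in a routingKey as message.extras.headers['<key>'].
  return channel.publish({ name, data, extras: { headers } });
}

// Stub standing in for a real channel, so the sketch is runnable offline.
const stubChannel = {
  published: [],
  publish(msg) {
    this.published.push(msg);
    return Promise.resolve();
  },
};

publishWithHeaders(stubChannel, "greeting", "hello", { foo: "bar" });
console.log(stubChannel.published[0].extras.headers.foo); // prints "bar"
```

With a message published this way, a routingKey of "the-foo-header-is-#{ message.extras.headers['foo'] }" would interpolate to "the-foo-header-is-bar".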

Kafka

For a Kafka rule, the routingKey includes both the topic and the message routing key, joined by a colon (e.g. "topic:key", or with interpolation "topic-#{channelName}:message-key-#{message.name}"). Either or both parts can therefore be dynamic. Note that the split into topic and key is done after any interpolation; since Kafka topics cannot contain a colon, this does not introduce any ambiguity.
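The split can be pictured with a short sketch. splitKafkaRoutingKey is a hypothetical helper, and two details are assumptions rather than documented behaviour: that the split happens at the first colon (which follows from topics not being able to contain one), and that a routingKey without a colon is rejected.

```javascript
// Hypothetical helper: split an already-interpolated routingKey into
// a Kafka topic and a message key.
// ASSUMPTION: the split is made at the first colon.
// ASSUMPTION: a routingKey with no colon is treated as an error here.
function splitKafkaRoutingKey(routingKey) {
  const i = routingKey.indexOf(":");
  if (i === -1) {
    throw new Error("Expected a routingKey of the form 'topic:key'");
  }
  return { topic: routingKey.slice(0, i), key: routingKey.slice(i + 1) };
}

// "topic-#{channelName}:message-key-#{message.name}" after interpolation
// with channelName "chat" and message.name "greeting":
const parts = splitKafkaRoutingKey("topic-chat:message-key-greeting");
console.log(parts.topic); // prints "topic-chat"
console.log(parts.key);   // prints "message-key-greeting"
```

Under this first-colon assumption, a colon arriving in the topic part through an interpolated value would end the topic early, so it may be safer to interpolate only colon-free values there.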

Examples:

  • "the-channel-name-is-#{channelName}"
  • "hashed channel name: #{channelName | hash} as hex, #{channelName | hash(10)} as decimal"
  • "the-foo-header-is-#{ message.extras.headers['foo'] }"
  • "channel name mod 256: #{channelName | moduloHash(256)}, or in octal: #{channelName | moduloHash(256, 8)}"
  • "message name: #{message.name}, clientId: #{message.clientId}"
  • "hexadecimal hash combining all message properties except id: #{(message.name, message.clientId, channelName) | hash}"
  • "#{message.id} will be different for every message, so useful for routing to Kinesis shards at random"
  • "shard-#{message.id | moduloHash(4, 10)}" will be one of "shard-0", "shard-1", "shard-2", or "shard-3"
  • For Kafka: "topic-#{channelName}:message-key-#{message.name}"