ksqlDB
ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.
Available on the Enterprise Premier plan. Contact us for details.
See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:
In this video, the SQL API is used to connect to Power BI. Currently, it’s recommended to use the DAX API.
Prerequisites
- Hostname for the ksqlDB server
- Username and password (or an API key) to connect to ksqlDB server
Confluent Cloud
If you are using Confluent Cloud, you need to generate an API key and use the API key as your username and the API secret as your password.
You can generate an API key by installing confluent-cli and running the
following commands in the command line:
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>
Setup
Manual
Add the following to a .env file in your Cube project:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password
Environment Variables
| Environment Variable | Description | Possible Values | Required |
|---|---|---|---|
| CUBEJS_DB_URL | The host URL for ksqlDB, including the port | A valid database host URL | ✅ |
| CUBEJS_DB_USER | The username used to connect to ksqlDB; the API key for Confluent Cloud | A valid database username | ✅ |
| CUBEJS_DB_PASS | The password used to connect to ksqlDB; the API secret for Confluent Cloud | A valid database password | ✅ |
| CUBEJS_DB_KAFKA_HOST | Kafka broker host(s) for Kafka streams mode; multiple brokers can be comma-separated | A valid Kafka broker URL | ❌ |
| CUBEJS_DB_KAFKA_USER | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | ❌ |
| CUBEJS_DB_KAFKA_PASS | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | ❌ |
| CUBEJS_DB_KAFKA_USE_SSL | If true, enables SASL_SSL for the Kafka connection | true, false | ❌ |
| CUBEJS_CONCURRENCY | The number of concurrent queries to the data source | A valid number | ❌ |
Pre-Aggregations Support
ksqlDB supports only streaming pre-aggregations.
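For example, here is a minimal sketch of a streaming pre-aggregation, assuming ksqlDB is the configured data source and an existing stream named PAGE_VIEWS with VIEW_ID and TS columns (both names are illustrative placeholders). A complete example is shown in the Data modeling section below:
cube(`page_views`, {
  sql: `SELECT * FROM PAGE_VIEWS`,

  measures: {
    count: {
      type: `count`,
    },
  },

  dimensions: {
    view_id: {
      sql: `VIEW_ID`,
      type: `string`,
      primary_key: true,
    },
    ts: {
      sql: `TS`,
      type: `time`,
    },
  },

  pre_aggregations: {
    // Built as a streaming pre-aggregation because the cube's
    // data source is ksqlDB
    streaming: {
      measures: [CUBE.count],
      dimensions: [CUBE.view_id],
      time_dimension: CUBE.ts,
      granularity: `second`,
      partition_granularity: `day`,
    },
  },
});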
Kafka streams mode
By default, Cube connects to ksqlDB via its REST API and uses it both for metadata operations (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.
In this default mode, Cube may create tables and streams in ksqlDB as part
of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT
statements for non-read-only pre-aggregations).
When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.
In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.
When to use Kafka streams mode
Kafka streams mode is useful when:
- You want to prevent Cube from creating any objects in ksqlDB
- You need higher throughput for data ingestion by reading Kafka directly
- Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
- You prefer Cube Store to consume from Kafka topics without an intermediary
Enabling Kafka streams mode
Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=true
Multiple Kafka brokers can be specified as a comma-separated list:
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092
When using Confluent Cloud, the Kafka credentials are separate from the ksqlDB credentials. Generate an API key for the Kafka cluster (not the ksqlDB cluster) and use it as CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.
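For example, a key scoped to the Kafka cluster can be created with the confluent CLI, similar to the ksqlDB key above (the cluster ID is a placeholder):
confluent kafka cluster list
confluent api-key create --resource <YOUR-KAFKA-CLUSTER-ID>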
How it works
With Kafka streams mode enabled:
- Cube uses the ksqlDB REST API to discover available tables and streams and to retrieve their schemas via DESCRIBE.
- For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
- Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
- Pre-aggregation builds use the read-only refresh strategy. Cube does not issue any CREATE TABLE or CREATE STREAM statements to ksqlDB.
Data modeling
ksqlDB is typically used as an additional data source alongside a primary
data warehouse. To use Kafka streams mode, configure ksqlDB as a named
data source using decorated environment variables
and point your cubes to it with the
data_source property.
First, declare the data sources and configure the ksqlDB connection with Kafka credentials:
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
Then, create cubes that reference your data. A common pattern is to combine a batch cube (reading historical data from your warehouse) with a streaming cube (reading real-time data from ksqlDB via Kafka) using a lambda pre-aggregation.
The batch cube queries the warehouse and builds daily partitions
incrementally. The streaming cube points at an existing ksqlDB stream
with data_source: ksql and uses a read-only streaming pre-aggregation
that consumes from the backing Kafka topic directly. The lambda
pre-aggregation in the batch cube merges both, serving historical data
from the warehouse rollup and real-time data from the streaming rollup:
cube("order_events", {
data_source: "default",
sql: `
SELECT
order_id,
user_id,
status,
amount,
created_at
FROM ecommerce.order_events
WHERE ${FILTER_PARAMS.order_events.created_at.filter(
(from, to) => `created_at >= ${from} AND created_at < ${to}`
)}
`,
measures: {
count: {
type: `count`,
},
total_amount: {
sql: `amount`,
type: `sum`,
},
failed_count: {
sql: `CASE WHEN status = 'failed' THEN 1 ELSE 0 END`,
type: `sum`,
},
},
dimensions: {
order_id: {
sql: `order_id`,
type: `string`,
primary_key: true,
},
user_id: {
sql: `user_id`,
type: `string`,
},
status: {
sql: `status`,
type: `string`,
},
created_at: {
sql: `created_at`,
type: `time`,
},
},
pre_aggregations: {
lambda: {
type: `rollup_lambda`,
rollups: [
order_events.batch,
order_events_stream.stream,
],
},
batch: {
type: `rollup`,
measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
time_dimension: CUBE.created_at,
granularity: `second`,
partition_granularity: `day`,
build_range_start: {
sql: `SELECT NOW() - INTERVAL '90 days'`,
},
build_range_end: {
sql: `SELECT NOW()`,
},
refresh_key: {
every: `8 hour`,
update_window: `1 day`,
incremental: true,
},
indexes: {
user_status: {
columns: [CUBE.user_id, CUBE.status],
},
},
},
},
});
cube("order_events_stream", {
data_source: "ksql",
sql: `SELECT * FROM ORDER_EVENTS_STREAM`,
measures: {
count: {
type: `count`,
},
total_amount: {
sql: `AMOUNT`,
type: `sum`,
},
failed_count: {
sql: `CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END`,
type: `sum`,
},
},
dimensions: {
order_id: {
sql: `ORDER_ID`,
type: `string`,
primary_key: true,
},
user_id: {
sql: `USER_ID`,
type: `string`,
},
status: {
sql: `STATUS`,
type: `string`,
},
created_at: {
sql: `CREATED_AT`,
type: `time`,
},
},
pre_aggregations: {
stream: {
type: `rollup`,
read_only: true,
measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
unique_key_columns: [`order_id`],
time_dimension: CUBE.created_at,
granularity: `second`,
partition_granularity: `day`,
build_range_start: {
sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`,
},
build_range_end: {
sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`,
},
refresh_key: {
every: `1 minute`,
update_window: `1 hour`,
incremental: true,
},
indexes: {
user_status: {
columns: [CUBE.user_id, CUBE.status],
},
},
stream_offset: `latest`,
},
},
});
Key properties for the streaming pre-aggregation:
- read_only: true — Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.
- stream_offset: "latest" — start consuming from the latest offset in the Kafka topic. Use "earliest" to replay from the beginning.
- unique_key_columns — columns that uniquely identify a record, used for deduplication during ingestion into Cube Store.
The sql_table or sql value should reference an existing ksqlDB stream
or table. Cube discovers its schema automatically. With Kafka streams
mode enabled, the streaming pre-aggregation reads the backing Kafka topic
directly — no objects are created in ksqlDB.
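As a minimal sketch, a cube can also point at an existing ksqlDB table with sql_table instead of a SELECT statement (the ORDERS_TABLE name is a placeholder):
cube(`orders`, {
  data_source: `ksql`,

  // References an existing ksqlDB table; Cube discovers its schema
  // and backing Kafka topic automatically
  sql_table: `ORDERS_TABLE`,

  // ...
});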