ksqlDB Quickstart

Description

This quickstart shows how to:
  • create a stream
  • run a continuous query over the stream
  • populate the stream with events

Docker

docker-compose.yml
---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
Start Kafka and ksqlDB

docker-compose up

Start ksqlDB's interactive CLI

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================

Copyright 2017-2022 Confluent Inc.

CLI v0.28.2, Server v0.28.2 located at http://ksqldb-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Create a stream


ksql> CREATE STREAM riderLocations01 (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
> WITH (kafka_topic='locations', value_format='json', partitions=1);

Message
----------------
Stream created
----------------
ksql>

CREATE STREAM parameters


kafka_topic - Name of the Kafka topic underlying the stream. In this case it will be automatically created because it doesn't exist yet, but streams may also be created over topics that already exist.
value_format - Encoding of the messages stored in the Kafka topic. For JSON encoding, each row will be stored as a JSON object whose keys/values are column names/values. For example: {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
partitions - Number of partitions to create for the locations topic. Note that this parameter is not needed for topics that already exist.
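
As a rough illustration of the JSON encoding described under value_format, the sketch below serializes the example row as a JSON object and decodes it again. It only mimics the shape of a message value; the key casing follows the example above and is an assumption about what ksqlDB itself writes to the topic.

```python
import json

# Column names and values taken from the example row in the text above.
row = {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}

# Serialize the row the way a JSON-formatted Kafka message value would look.
encoded = json.dumps(row).encode("utf-8")

# Decoding the bytes gives back the same column/value mapping.
decoded = json.loads(encoded.decode("utf-8"))

print(encoded)
print(decoded == row)
```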

Run a continuous query over the stream

This query outputs all rows from the riderLocations01 stream whose coordinates are within 5 kilometers of Mountain View. GEO_DISTANCE measures in kilometers unless a unit argument is supplied.

This may be the first thing that feels unfamiliar: the query never returns until it is terminated. It perpetually pushes output rows to the client as events are written to the riderLocations01 stream.

Leave this query running in the CLI session for now. Next, we're going to write some data into the riderLocations01 stream so that the query begins producing output.

ksql> -- Mountain View lat, long: 37.4133, -122.1162
> SELECT * FROM riderLocations01
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
>
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+

Press CTRL-C to interrupt
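
The push behavior of the query above can be pictured with a small Python generator. This is only a toy model, not how ksqlDB is implemented: `push_query`, the sample events, and the latitude predicate are all hypothetical. The point is that a matching row is emitted as soon as it arrives, and the query only ends when its source does.

```python
from typing import Callable, Dict, Iterable, Iterator

Row = Dict[str, float]


def push_query(events: Iterable[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Yield each matching row as soon as it arrives from the event source."""
    for row in events:
        if predicate(row):
            yield row


# Hypothetical events standing in for rows written to the stream.
events = [
    {"latitude": 37.40, "longitude": -122.08},
    {"latitude": 38.90, "longitude": -121.50},
    {"latitude": 37.39, "longitude": -122.06},
]

# A simplified stand-in for the WHERE clause: keep rows south of 38 degrees.
matches = list(push_query(events, lambda row: row["latitude"] < 38.0))
print(matches)
```

With an unbounded source in place of the list, the loop would keep waiting for new events, which mirrors why the CLI query has to be interrupted with CTRL-C.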

Start another CLI session

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Populate the stream with events

INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations01 (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
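
The same statements could probably be submitted without the CLI. The sketch below builds an HTTP request for the server's /ksql REST endpoint, assuming that port 8088 is published on localhost as in the compose file and that the endpoint accepts INSERT statements. `build_statement_request` is a hypothetical helper name, and the request is only constructed here, not sent.

```python
import json
import urllib.request

# Assumption: ksqldb-server's port 8088 is reachable on localhost.
KSQLDB_URL = "http://localhost:8088/ksql"


def build_statement_request(statement: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying one SQL statement."""
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        KSQLDB_URL,
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        method="POST",
    )


request = build_statement_request(
    "INSERT INTO riderLocations01 (profileId, latitude, longitude) "
    "VALUES ('c2309eec', 37.7877, -122.4205);"
)
print(request.get_method(), request.full_url)

# To send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```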

Output of the first CLI:


+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|4ab5cbad |37.3952 |-122.0813 |
|8b6eae59 |37.3944 |-122.0813 |
|4a7c7b41 |37.4049 |-122.0822 |
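
The output can be cross-checked offline. The sketch below recomputes the distances with the haversine formula, assuming GEO_DISTANCE returns a great-circle distance in kilometers by default and that an Earth radius of 6371 km is close enough. The function and constant names are illustrative and not part of ksqlDB.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points, in kilometers."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


MOUNTAIN_VIEW = (37.4133, -122.1162)

# The six rows inserted in the previous step.
RIDERS = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]


def within(radius_km):
    """Return the profile ids whose location lies within radius_km of Mountain View."""
    return [
        pid for pid, lat, lon in RIDERS
        if haversine_km(lat, lon, *MOUNTAIN_VIEW) <= radius_km
    ]


print(within(5.0))
```

Under these assumptions the 5 km filter keeps the same three riders as the CLI output. Rider 18f4ea86 sits a little over 5 km away, so it would only appear if the threshold were 5 miles.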

--
Based on: https://ksqldb.io/quickstart-enterprise.html