Clickstream data plays an important role in analyzing customer behavior and helps organizations shape future business strategies. So, let's discuss real-time clickstream analysis using one of the most prominent components of the Kafka ecosystem: ksqlDB. It not only provides capabilities for processing and analyzing real-time data, but also lets organizations connect to sources and sinks.

Let's Get Started

Let's skip the theoretical parts and come straight to the hands-on aspects of clickstream analysis using ksqlDB.

Before starting with this blog, first go through our blog on the Quickstart Guide for Installing Confluent Platform having Apache Kafka using Docker, as this post picks up where that one left off.

Once you are done setting up the Confluent Platform using Docker, proceed with this post. To verify that the services are up and running, run the following command:

$~: docker-compose ps

Now, log in to the Confluent Control Center UI: http://localhost:9021

Note: If you are deploying on the cloud, make sure you have added the necessary firewall rules for the Confluent Control Center UI (port 9021).

Click on cluster 1, i.e., controlcenter.cluster.

The DatagenConnector provides some simple schemas for generating example clickstream, orders, users, and pageviews data.

Generate Example ClickStream Records

The clickstream quickstart option produces records with fields such as userid, time, request, ip, and agent.

1. Inside the Connect tab, select the default cluster.

2. Click Add connector.

3. Search for "datagen", then select the DatagenConnector.

4. Create a Datagen connector named "clickstream_generator". The DatagenConnector will generate sample clickstream data into the clickstream topic.

5. Scroll down to the General tab and, in the Kafka topic field, create a topic named "clickstream".

6. Set max.interval to 100 and iterations to 500000, and select clickstream as the quickstart.

7. Click Continue.

8. Review your configuration, then click Launch.
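If you prefer SQL over the UI, the same connector can also be created from ksqlDB with a CREATE SOURCE CONNECTOR statement. A sketch, assuming the kafka-connect-datagen plugin is installed on the Connect worker and ksqlDB is configured to talk to it:

```sql
-- Sketch: create the same datagen connector from ksqlDB instead of the UI
-- (assumes the kafka-connect-datagen plugin is available on the Connect worker)
CREATE SOURCE CONNECTOR clickstream_generator WITH (
  'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
  'kafka.topic'     = 'clickstream',
  'quickstart'      = 'clickstream',
  'max.interval'    = 100,
  'iterations'      = 500000
);
```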

Datagen now generates data into the clickstream topic.

You can verify this by opening a new terminal and running the following commands:

docker exec -it broker bash

kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream --from-beginning

Now open a new terminal and run the following command to open the ksqlDB CLI:

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Let's start by checking whether the clickstream topic exists:

ksql> show topics;

To check the key and contents of your messages, you can use the PRINT command:

ksql> PRINT 'clickstream' FROM BEGINNING LIMIT 1;

Inspect Topics By Using KsqlDB in Control Center

In the cluster submenu, click ksqlDB and open the ksqlDB Editor on the default application.

Use the ksqlDB Editor page to create streams and tables, run SQL queries, and browse messages.

Let's create a stream using the ksqlDB Editor in the Control Center UI that includes log data fields such as userid, time, request, ip, and agent.

CREATE STREAM LOG_DATA_STREAM (time VARCHAR, ip VARCHAR, request VARCHAR, userid INT, agent VARCHAR) WITH (kafka_topic='clickstream', value_format='AVRO');
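To confirm the stream was registered with the expected schema, you can describe it in the editor or the CLI:

```sql
-- Inspect the stream's schema and the topic it is built on
DESCRIBE LOG_DATA_STREAM;

-- EXTENDED additionally shows the serialization format and runtime statistics
DESCRIBE LOG_DATA_STREAM EXTENDED;
```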

After you create the stream, the resizeable streams pane is populated with the defined streams and tables for the application, including the new LOG_DATA_STREAM:

  • Streams are indicated with a blue St expandable node.
  • Tables are indicated with a brown Tb expandable node.

Use a SELECT query to verify the output:

SELECT * FROM LOG_DATA_STREAM EMIT CHANGES;

Session Window

A session window aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or "idleness". Any records with timestamps that occur within the inactivity gap of existing sessions are merged into the existing sessions. If a record's timestamp occurs outside of the session gap, a new session is created.

To count the number of events per user for session windows with a session inactivity gap of 60 seconds, you can run the following query, which sessionizes the input data and performs the counting/aggregation step per userid:

SELECT userid, COUNT(*) AS num_of_events FROM LOG_DATA_STREAM WINDOW SESSION (60 SECONDS) GROUP BY userid EMIT CHANGES;
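The query above is transient and stops when you cancel it. To keep the sessionized counts around, the same aggregation can be materialized into a table; a sketch, where the table name user_sessions is my own choice:

```sql
-- Materialize per-user session counts into a table (table name is illustrative)
CREATE TABLE user_sessions AS
  SELECT userid, COUNT(*) AS num_of_events
  FROM LOG_DATA_STREAM
  WINDOW SESSION (60 SECONDS)
  GROUP BY userid;
```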

Basic Log Data Analytics Using the ksqlDB CLI

Now let's analyze the log stream. Go back to the ksqlDB CLI.

Step 1: Create a table using a CTAS (CREATE TABLE AS SELECT) statement on the clickstream stream to get the number of events per user per minute. A tumbling window of 60 seconds gives the per-minute buckets:

ksql> CREATE TABLE tasks_per_min AS SELECT userid, COUNT(*) AS events FROM LOG_DATA_STREAM WINDOW TUMBLING (SIZE 60 SECONDS) GROUP BY userid;

SELECT * FROM tasks_per_min EMIT CHANGES LIMIT 10;

Step 2: Now create a table that stores how many times each user agent was used to visit the various web pages.

ksql> CREATE TABLE agents_used AS SELECT userid, agent, COUNT(*) AS visits FROM LOG_DATA_STREAM GROUP BY userid, agent;

Step 3: Now create a table that shows which pages are visited and how many times.

ksql> CREATE TABLE pages_visited AS SELECT request, COUNT(*) AS hits FROM LOG_DATA_STREAM GROUP BY request;

SELECT * FROM pages_visited EMIT CHANGES LIMIT 20;
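Because pages_visited is a materialized table keyed by request, recent ksqlDB versions also support pull queries against it: point-in-time lookups that return immediately, with no EMIT CHANGES. A sketch, where the request value is only a placeholder:

```sql
-- Pull query: snapshot lookup by key (the request string below is a placeholder)
SELECT * FROM pages_visited WHERE request = 'GET /index.html HTTP/1.1';
```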

Congrats, you have just performed clickstream analysis in real time using ksqlDB on the Confluent Platform!