Running Flink and K8ssandra on VMware Tanzu Kubernetes Grid 2

Executive Summary

Many organizations generate vast amounts of data in short intervals from various sources such as sensors, user activity, and transactional systems. There is a growing need to process this streaming data in real-time to gain meaningful insights and make critical business decisions. Stream computing helps organizations do this by processing data instantaneously to produce insights, storing them immutably, scaling horizontally, and ensuring high availability, enabling innovative applications and business outcomes aligned with real-time data.

Apache Flink is a stream processing framework that helps solve this problem. Flink processes enormous volumes of data in real-time, generating insights, alerts, and dashboards instantly. Meanwhile, Apache Cassandra is a distributed database that can store large amounts of data and scale elastically, providing an immutable audit trail. They scale horizontally to handle petabyte workloads and peak throughput demands, with inherent fault tolerance features.

Deploying and managing these streaming applications at scale can be challenging. Using VMware Tanzu® Kubernetes Grid™ to run and manage Apache Flink and Cassandra can provide organizations with a scalable, reliable, and manageable solution to process and store large volumes of streaming data in real-time.

This paper demonstrates how Flink and Cassandra integrate seamlessly on Tanzu Kubernetes Grid to build highly scalable real-time streaming analytics applications with robust processing, availability, scalability, and governance. Real-time insights and immutable storage meet scalability and consistency at scale, enabling advanced use cases such as IoT and log processing, recommendations, and reporting. Running both platforms on Tanzu Kubernetes Grid gives enterprises a robust, scalable foundation for realizing the potential of streaming data.

Technology Overview

The technological components in this solution are:

  •  VMware Tanzu Kubernetes Grid 2
  •  Apache Flink
  •  K8ssandra
  •  Monitoring tool

VMware Tanzu Kubernetes Grid

VMware Tanzu Kubernetes Grid provides organizations with a consistent, upstream-compatible Kubernetes substrate that is ready for end-user workloads and ecosystem integrations. It uses a new API called ClusterClass that defines a common cluster configuration for different infrastructure providers. Tanzu Kubernetes Grid 2 deploys clusters using an opinionated configuration of Kubernetes open-source software that is supported by VMware, so you do not have to build a Kubernetes environment yourself. It also provides packaged services such as networking, authentication, ingress control, and logging that are required for production Kubernetes environments.

Tanzu Kubernetes Grid 2 supports two types of deployment models: Supervisor deployment and standalone management cluster deployment. Supervisor deployment allows you to create and operate workload clusters natively in VMware vSphere® with Tanzu and leverage vSphere features. Standalone management cluster deployment allows you to create workload clusters on vSphere 6.7, 7, and 8 without Supervisor, or on AWS.

Apache Flink

Apache Flink is a framework for distributed stream processing. It can process data in real-time streams and can also run batch processing jobs. Flink uses operator-based APIs that provide reusable data processing building blocks. Some common Flink operators include:

  • Data sources: Read data from sources like Kafka, Kinesis, and files.
  • Transformations: Map, filter, aggregate, and join to transform data
  • Data sinks: Write output to sinks like Kafka, Cassandra, and ElasticSearch

Users can compose operators into data processing pipelines and Flink manages distributed processing, fault tolerance, state management, and optimization. Flink powers real-time data applications, and the operator-based APIs allow for declarative and scalable data processing.
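As an illustration of this composition model, a minimal sketch is shown below; it uses a placeholder in-memory source and a stdout sink rather than the Kafka or Cassandra connectors named above, and the class and field names are purely illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("sensor-1,42", "sensor-2,17", "sensor-1,58")    // data source (in-memory placeholder)
           .map(line -> line.split(",")[0])                              // transformation: extract the sensor id
           .filter(id -> id.startsWith("sensor"))                        // transformation: keep only sensor events
           .print();                                                     // data sink (stdout for illustration)

        env.execute("operator-pipeline-sketch");
    }
}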

The Flink Kubernetes Operator allows deploying Flink on Kubernetes. It handles provisioning and lifecycle management of Flink clusters, making it easy and efficient to run Flink on cloud-native Kubernetes infrastructure. See https://flink.apache.org for more information.

K8ssandra

K8ssandra is a cloud-native distribution of Apache Cassandra® that runs on Kubernetes. K8ssandra provides an ecosystem of tools that deliver richer data APIs and automated operations alongside Cassandra. This includes metric monitoring to promote observability, data anti-entropy services to support reliability, and backup/restore tools to support high availability and disaster recovery. As an open-source project licensed under Apache Software License version 2, K8ssandra is free to use, improve, and enjoy. K8ssandra integrates and packages these tools together.

See the k8ssandra website for more information.

Monitoring Tools

Prometheus is an open-source monitoring and alerting toolkit that is commonly used with Kubernetes but also supports other cloud-native environments. It is a highly scalable monitoring framework that provides out-of-the-box monitoring capabilities for the Kubernetes container orchestration platform.

Grafana is an open-source analytics and monitoring platform that is commonly used with Kubernetes but also supports other cloud-native environments. It provides a powerful and elegant way to create, explore, and share dashboards and data with your team and others. Grafana can use Prometheus as a data source to visualize the metrics that Prometheus collects. Grafana can also be used with other data sources such as Graphite, Elasticsearch, InfluxDB, and more.

vSAN Performance Service is used to monitor the performance of the vSAN environment, using the vSphere web client. The performance service collects and analyzes performance statistics and displays the data in a graphical format. You can use the performance charts to manage your workload and determine the root cause of problems.

Solution Configuration

Architecture

Two Tanzu workload clusters, each with multiple worker nodes, are provisioned, with persistent volumes backed by vSAN. The K8ssandra operator is installed in cluster 1, and the Flink Operator is installed in cluster 2.


Figure 1. Solution Architecture

Monitoring service Prometheus is deployed in namespace tanzu-system-monitoring and Grafana is deployed in namespace tanzu-system-dashboard.

Note: Our validation is based on a vSphere 8 cluster.

Hardware Resource

Table 1. Hardware Resource

Property             Specification
Server model name    4 x Dell PowerEdge R640
CPU                  Intel(R) Xeon(R) Gold 6132 CPU @ 2.60GHz, 28 cores each
RAM                  512GB
Network adapter      2 x Intel(R) Ethernet Controller 10G X550; 2 x Intel Corporation I350 Gigabit Network Connection
Storage adapter      1 x Dell HBA330 Mini; 2 x Express Flash PM1725a 1.6TB AIC
Disks                Cache: 2 x 1.6TB Dell Express Flash NVMe PCIe SSD PM1725a; Capacity: 8 x 1.92TB write-intensive SAS SSDs

Software Resource

Table 2. Software Resource

Software                   Version
vSphere                    8.0
Tanzu Kubernetes Release   v1.21.6+vmware.1
K8ssandra                  K8ssandra-operator v1.5.2
Flink                      Flink operator v1.3.1
MetalLB                    MetalLB v0.13.9

 

Network Configuration

Tanzu Kubernetes Grid is configured with a vSphere Distributed Switch (VDS). MetalLB (see installation instructions) provides load balancing for external services. The Cassandra pods are placed in a namespace, and the Stargate and Reaper services can also be configured as load balancers. The configuration YAML files are here.

Tanzu Kubernetes Grid Configuration

Tanzu Kubernetes Grid uses a management cluster to create and manage workload clusters and offers different deployment options based on where the management cluster runs.

We followed Install the Tanzu CLI and Other Tools for Use with Standalone Management Clusters for the installation and deployed Management Clusters with the Installer Interface.

Then we deployed Tanzu Kubernetes Grid 2.1 with a standalone management cluster on vSphere 8 without a supervisor (see the guide).

Creating Workload Clusters

In VMware Tanzu Kubernetes Grid, workload clusters are the Kubernetes clusters on which your application workloads run. We followed the instructions to define class-based workload clusters, with control planes and worker nodes configured as follows.

Table 3. Tanzu Kubernetes Cluster Definition

Workload cluster 1:

Role            Replicas                        VM Configuration                                                   Tanzu Kubernetes Release (TKR)
Control Plane   3                               best-effort-small (diskGiB: 40, memoryMiB: 8192, numCPUs: 2)       v1.24.9+vmware.1-tkg.1
Worker Nodes    3                               best-effort-2xlarge (diskGiB: 40, memoryMiB: 65536, numCPUs: 16)   v1.24.9+vmware.1-tkg.1

Workload cluster 2:

Role            Replicas                        VM Configuration                                                   Tanzu Kubernetes Release (TKR)
Control Plane   3                               best-effort-small (diskGiB: 40, memoryMiB: 8192, numCPUs: 2)       v1.24.9+vmware.1-tkg.1
Worker Nodes    8 (initially 3, later scaled)   best-effort-2xlarge (diskGiB: 40, memoryMiB: 16192, numCPUs: 4)    v1.24.9+vmware.1-tkg.1

 The YAML files used in our validation for workload cluster deployment can be found here.
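For orientation, a minimal, hypothetical excerpt of a class-based cluster definition is sketched below. The cluster class name, worker machine deployment class, network CIDRs, and variable layout are assumptions for illustration; the actual files linked above contain the full specification used in our validation.

apiVersion: cluster.x-k8s.io/v1beta1
kind: Cluster
metadata:
  name: testworkload
  namespace: default
spec:
  topology:
    class: tkg-vsphere-default-v1.0.0        # assumption: default vSphere cluster class
    version: v1.24.9+vmware.1-tkg.1
    controlPlane:
      replicas: 3
    workers:
      machineDeployments:
        - class: tkg-worker                  # assumption: default worker machine deployment class
          name: md-0
          replicas: 3
    variables:
      - name: controlPlane                   # assumption: variable names mirror the machine settings in Table 3
        value:
          machine:
            diskGiB: 40
            memoryMiB: 8192
            numCPUs: 2
      - name: worker
        value:
          machine:
            diskGiB: 40
            memoryMiB: 65536
            numCPUs: 16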

ReadWriteMany File Volume Configuration

File volumes backed by vSAN file shares can be created statically or dynamically and mounted by stateful containerized applications. See the guide to enable file volumes.
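As a reference, a minimal sketch of a StorageClass that such ReadWriteMany claims can bind to is shown below; the class name and storage policy name are examples and should match a vSAN policy in your environment.

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: vsan-file-sc                                  # example name
provisioner: csi.vsphere.vmware.com                   # vSphere CSI driver
parameters:
  storagepolicyname: "vSAN Default Storage Policy"    # example vSAN storage policy

With vSAN File Service enabled, persistent volume claims that request the ReadWriteMany access mode against this class are provisioned as vSAN file shares.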


Figure 2. File Volume Configuration

Note: The vSphere role used by Tanzu Kubernetes Grid needs to be assigned the required privilege Host.Configuration.Storage partition configuration; see vSphere Roles and Privileges for more information.

Prometheus and Grafana Deployment

Tanzu Kubernetes Grid provides cluster monitoring services by implementing the open-source Prometheus and Grafana projects.

By using Prometheus and Grafana, you can gain insights into the health and performance of your Tanzu Kubernetes Grid clusters. This information can help you identify and troubleshoot problems and ensure that your clusters are running smoothly.


Deployment procedures:

1.      Deploy Prometheus on the workload cluster: we followed this guide to deploy Prometheus on the Tanzu Kubernetes Grid workload cluster. Deploy Prometheus with the default configuration because the cluster is managed by a standalone management cluster.


Figure 3. tanzu-system-monitoring Namespace

2.      Deploy Grafana on the workload cluster: follow the instructions to deploy Grafana on workload clusters, and then verify that Prometheus and Grafana are installed.

 tanzu package installed list -A           

  NAMESPACE           NAME                               PACKAGE-NAME                           PACKAGE-VERSION                STATUS               
  contour-package     contour                            contour.tanzu.vmware.com               1.22.3+vmware.1-tkg.1          Reconcile succeeded  
  grafana-package     grafana                            grafana.tanzu.vmware.com               7.5.16+vmware.1-tkg.1          Reconcile succeeded 
  prometheus-package prometheus                          prometheus.tanzu.vmware.com            37.0+vmware.1-tkg.1          Reconcile succeeded  

Figure 4. Prometheus and Grafana Package Installed Screenshot

The Grafana package creates a Contour HTTP Proxy object with a Fully Qualified Domain Name (FQDN) of grafana.system.tanzu.

Note: Create an entry in your local /etc/hosts file that points this FQDN to the IP address of a worker node.

To access the Grafana dashboard, enter the URL https://grafana.system.tanzu.

3.      Use Grafana for monitoring.


Figure 5. Grafana Login

Use the default user admin/admin to log in and then verify the Prometheus data sources.


Figure 6. Prometheus DataSource


Figure 7. Tanzu Kubernetes Cluster Monitoring Dashboard

Solution Deployment and Validation

The Flink Kubernetes Operator and Cassandra Operator enable seamless deployment and management of Flink and Cassandra on Kubernetes respectively. By integrating them with Tanzu Kubernetes Grid, VMware's Kubernetes distribution, enterprises gain a fully validated and supported environment for running modern stream processing and storage workloads.

This solution validates the core capabilities of building and operating highly scalable real-time streaming analytics applications with strong SLAs on processing, availability, and supportability. The validation includes:

  • Flink deployment: The Flink Kubernetes Operator automates provisioning and lifecycle management of Flink clusters on Kubernetes.
  • Flink job submission validation: The Flink Kubernetes Operator simplifies submission of Flink applications as Kubernetes jobs, injecting them into the cluster as pods for automated scheduling and orchestration.
  • Flink JobManager high availability validation: The Flink JobManager, which oversees cluster resources and job scheduling, is made highly available through Kubernetes deployments to ensure no single point of failure.
  • Flink scalability with Reactive Mode: The Flink Operator supports reactive scaling to dynamically adjust cluster size based on workload metrics. It optimizes performance and costs through automated scaling.
  • Cassandra deployment: The Cassandra Operator facilitates deployment of Cassandra clusters on Kubernetes; it also includes a suite of tools to ease and automate operational tasks.
  • Stream application sample using Flink and Cassandra: An example application using Flink, Cassandra, Prometheus, and Grafana is included to demonstrate their integration in monitoring a stream processing pipeline.

Prometheus and Grafana are also deployed as part of the solution, providing metrics collection and visualization respectively and bringing all the components together on Tanzu Kubernetes Grid.

Flink Deployment

To deploy the Flink Operator, first install cert-manager on your Kubernetes cluster to enable adding the webhook component (needed only once per Kubernetes cluster):

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

Then add the Flink Operator Helm repository and install the operator:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.3.1/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Verify that the Flink Operator is installed:

helm list
NAME                     	NAMESPACE	REVISION	UPDATED                             	STATUS  	CHART                          	APP VERSION
flink-kubernetes-operator	default  	1       	2023-02-07 22:23:59.643829 +0800 CST	deployed	flink-kubernetes-operator-1.3.1	1.3.1    

Figure 8. Flink Deployment

The JobManager is the central work coordination component of Flink. It has implementations for different resource providers, which differ in high availability, resource allocation behavior, and supported job submission modes.

Job submission has two modes: Application Mode and Session Mode. We validated job submission in both modes in the following sections.


Figure 9. JobManager

Application Mode Job

In Application Mode, Flink creates a cluster for each submitted application and runs the cluster exclusively for that application. The job's main method (or client) is run on the JobManager. Check out the YAML files.
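For orientation, a hedged sketch of what an application-mode FlinkDeployment along these lines could look like is shown below. The image, jar path, and resource values are placeholders (the paper's actual image contains the packaged Beam WordCount jar), and the name matches the beam-example-rest service used in the next step.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-example
spec:
  image: flink:1.16                      # placeholder: use the image that contains the packaged application jar
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/beam-wordcount.jar   # placeholder path inside the image
    parallelism: 2
    upgradeMode: stateless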

The WordCount Beam application was first compiled with Flink runner and was packaged into a Flink application.

The Flink job can be viewed in the web UI by port-forwarding the beam-example-rest service.

tyin@tyin0MD6R flink-beam-example % kubectl port-forward service/beam-example-rest 8081:8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081.  


Figure 10. Beam Application in Application Mode

Session Mode Job

In Session Mode, an existing cluster is used to run any submitted application. One JobManager instance manages multiple jobs that share the same cluster of TaskManagers. This has the advantage of avoiding the resource overhead of spinning up a new cluster for each job, which is important in scenarios where jobs have a short running time and a high startup time would negatively impact the end-to-end user experience. For example, interactive analysis of short queries can benefit from Session Mode, as jobs can quickly perform computations using the existing resources.

The limitation of Session Mode is that TaskManager slots are allocated by the ResourceManager on job submission and released once the job is completed, which means that jobs compete for cluster resources. We first deployed a session cluster; check out the YAML file for detailed information.

kubectl apply -f session-cluster.yaml
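A hedged sketch of the two resources involved is shown below, assuming the session-cluster-1 name seen in the pod listings later and placeholder image, resource, and jar values; the linked YAML files contain the exact definitions we applied.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster-1
spec:
  image: flink:1.16                 # placeholder image
  flinkVersion: v1_16
  serviceAccount: flink
  jobManager:
    resource:
      memory: "6144m"               # 6GB, matching the starting configuration described below
      cpu: 3
  taskManager:
    resource:
      memory: "4096m"               # placeholder
      cpu: 2
  # no job section: this is a session cluster
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: wordcount
spec:
  deploymentName: session-cluster-1            # submit the job to the session cluster above
  job:
    jarURI: https://example.com/WordCount.jar  # placeholder: URI of the WordCount jar used in the test
    parallelism: 32
    upgradeMode: stateless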

We started the session cluster with a configuration of 3 vCPUs and 6GB of memory. We then tested the WordCount job with different parallelism settings; the input was the Wikipedia Dataset1 (50GB) file with suffix 2, which was over 10GB in size. From the test results, the parallelism 32 configuration had the shortest duration.

Table 4. Session Mode Job Sample

Parallelism   TaskManager   Duration
8             1             3h6m52s
16            2             1h49m14s
32            4             1h8m33s

Figure 11. WordCount Test

Flink JobManager High Availability

Apache Flink JobManager High Availability (HA) ensures that Flink clusters continue running the submitted jobs even in the event of a JobManager failure. Flink provides two HA service implementations:

  • ZooKeeper: ZooKeeper HA services can be used with each Flink cluster deployment. They require a running ZooKeeper quorum.
  • Kubernetes: Kubernetes HA services only work when running on Kubernetes.

HA Configuration

Check out the test scripts here.

Flink Kubernetes HA allows Flink clusters to be deployed to Kubernetes and to continue operating even in the event of a JobManager failure. To recover the submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled, or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted.

In the YAML file, the high-availability.storageDir property must be set to the directory where the HA state will be stored.

With vSAN file service enabled, we can create ReadWriteMany persistent storage volumes dynamically.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-example-statemachine
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi

We configured the following keys:

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
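To make these paths resolve to the shared volume, the PVC needs to be mounted into the Flink pods. A hedged sketch of the corresponding podTemplate fragment (placed under the FlinkDeployment spec, following the operator's convention that the main container is named flink-main-container) is shown below; the volume name is illustrative.

podTemplate:
  spec:
    containers:
      - name: flink-main-container
        volumeMounts:
          - name: flink-volume
            mountPath: /opt/flink/volume          # matches the HA, checkpoint, and savepoint paths above
    volumes:
      - name: flink-volume
        persistentVolumeClaim:
          claimName: flink-example-statemachine   # the ReadWriteMany PVC created earlier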

Application HA Test

The Flink Kubernetes application HA test starts by setting variables, such as CLUSTER_ID, APPLICATION_YAML, and TIMEOUT. It then applies the YAML file specified in APPLICATION_YAML to create a Kubernetes deployment. The script then waits for the JobManager to start running and submits a job. After the job is running, the script waits for the logs to show that a checkpoint has been completed. The script then kills the JobManager and waits for the new JobManager to recover from the latest successful checkpoint. Finally, the script checks the operator log for errors and prints out a message indicating that the test is successful.
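A condensed, hypothetical sketch of that flow is shown below; the resource names, label selectors, and log patterns are assumptions for illustration, not the exact scripts linked earlier.

# variables mirroring those described above; values are placeholders
CLUSTER_ID=flink-ha-example
APPLICATION_YAML=flink-ha-example.yaml
TIMEOUT=300

kubectl apply -f "$APPLICATION_YAML"

# wait for the JobManager pod to become ready (in practice, retried until the pod exists)
kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s \
  pod -l app=$CLUSTER_ID,component=jobmanager

# wait until the JobManager log reports a completed checkpoint
kubectl logs -f deploy/$CLUSTER_ID | grep -m1 "Completed checkpoint"

# kill the JobManager pod; Kubernetes recreates it
kubectl delete pod -l app=$CLUSTER_ID,component=jobmanager

# verify the new JobManager restores the job from the latest successful checkpoint
kubectl logs -f deploy/$CLUSTER_ID | grep -m1 "Restoring job"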

We also observed that the session-cluster-1 pod restarted during the JobManager failure testing.

tyin@tyin0MD6R ~ % kubectl get pod
NAME                                             READY   STATUS    RESTARTS      AGE
pod/flink-kubernetes-operator-76dfd7fcf5-8xbkw   2/2     Running   0             64d
pod/session-cluster-1-7465757b49-pxrpb           1/1     Running   1 (24s ago)  117s
pod/session-cluster-1-taskmanager-1-1            1/1     Running   0             73s
pod/session-cluster-1-taskmanager-1-2            1/1     Running   0             73s
pod/session-deployment-8698f96848-xcc6k          1/1     Running   0             47d

Session Job HA Test

Similarly, the Flink Session cluster high availability is tested by killing the JobManager pod and ensuring that the cluster recovers successfully. The script does the following:

  • Apply the YAML file for a Flink cluster and job, retrying up to 5 times.
  • Wait up to TIMEOUT for the cluster status and job status to become READY and RUNNING.
  • Get the job ID from the JobManager logs.
  • Kill the JobManager pod.
  • Wait for recovering logs and status until another checkpoint is completed.
  • Check the operator log for errors.
  • Print out a message indicating that the test was successful.

Flink Scalability with Reactive Mode

Flink offers reactive scaling, which automatically scales the size of a cluster up or down based on metrics. This automates and optimizes the provisioning and release of cluster resources in line with application workload demands. It adapts capacity seamlessly based on metrics to maximize performance, minimize costs, and ensure SLO/SLA compliance. This simplifies the management of stream processing at scale.

The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism by having an external service monitor metrics such as aggregate CPU utilization, throughput, or latency. The Flink JobManager interacts with this external service. As soon as metrics surpass or fall below thresholds, additional TaskManagers can be added or removed from the Flink cluster.

In Reactive Mode, Flink manages the job parallelism, always setting it to the highest value that the available resources allow. Reactive scaling adjusts resources in line with workload demands, avoiding over- or under-provisioning. It keeps the cluster size optimized for performance and cost, scaling out during load spikes and scaling in during lulls.


Figure 12. Flink Scalability with Reactive Mode

We used basic-reactive.yaml to verify scalability.

Reactive Mode is only supported with standalone application deployment of Flink on Kubernetes. To enable elastic scaling, set the mode to standalone in the YAML file and enable the reactive scheduler mode:

flinkConfiguration:
  scheduler-mode: REACTIVE
mode: standalone

Run kubectl apply -f basic-reactive.yaml to validate the scalability function.

Manual Scaling

Scale the Tanzu Kubernetes Grid workload clusters and standalone management clusters using the following methods:

  • Scale horizontally: For workload or standalone management clusters, you can manually scale the number of control plane and worker nodes. See Scale a Cluster Horizontally.
  • Scale vertically: For workload clusters, you can manually change the size of the control plane and worker nodes. See Scale a Cluster Vertically.

In our validation, the workload cluster testworkload initially had 3 worker nodes. We used the Tanzu command line as shown below to scale the cluster to 6 worker nodes.
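A sketch of that command, assuming the cluster lives in the default namespace of the standalone management cluster:

tanzu cluster scale testworkload --worker-machine-count 6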

We then used the command below to scale the TaskManager deployment to 3 replicas.

tyin@tyin0MD6R examples % kubectl scale --replicas=3 deployments/basic-reactive-example-taskmanager
deployment.apps/basic-reactive-example-taskmanager scaled

We can see that the TaskManager scaled to 3 pods.

Automatic Scaling

Cluster Autoscaler can automatically scale the number of worker nodes in workload clusters deployed by a standalone management cluster. For more information, see Scale Worker Nodes with Cluster Autoscaler.

We can use the following command to scale the TaskManager automatically based on the CPU utilization metric.

kubectl autoscale deployment basic-reactive-example-taskmanager --min=1 --max=15 --cpu-percent=20
horizontalpodautoscaler.autoscaling/basic-reactive-example-taskmanager autoscaled

Then run kubectl get all:

 …
NAME                                                                     REFERENCE                                       TARGETS   MINPODS   MAXPODS   REPLICAS   AGE
horizontalpodautoscaler.autoscaling/basic-reactive-example-taskmanager   Deployment/basic-reactive-example-taskmanager   4%/20%    1         15        3          2m21s

K8ssandra Deployment

Deploying K8ssandra-Operator

We performed the following steps to deploy K8ssandra-operator on a single workload cluster:

1.      Install k8ssandra-operator:

helm install k8ssandra-operator k8ssandra/k8ssandra-operator -n k8ssandra-operator --create-namespace

helm list -n k8ssandra-operator
NAME                NAMESPACE           REVISION  UPDATED                              STATUS    CHART                     APP VERSION
k8ssandra-operator  k8ssandra-operator  1         2023-02-28 10:52:22.13637 +0800 CST  deployed  k8ssandra-operator-1.5.2  1.5.2

2.    Check that there are two deployments in the k8ssandra-operator namespace:

kubectl -n k8ssandra-operator get deployment
NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
cass-operator-controller-manager   1/1     1            1           2d
k8ssandra-operator                 1/1     1            1           2d 

For more information, refer to Install K8ssandra Operator.

Cassandra Cluster

We chose Cassandra v4.0.3 for the workload testing. Check out the test cluster YAML file here.
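For orientation, a minimal sketch of what a K8ssandraCluster resource along these lines could look like is shown below, assuming the demo cluster name, the dc1 datacenter, and three Cassandra nodes seen in the service listings; the storage class, volume size, and Stargate heap size are placeholders to be tuned for your environment.

apiVersion: k8ssandra.io/v1alpha1
kind: K8ssandraCluster
metadata:
  name: demo
  namespace: k8ssandra-operator
spec:
  cassandra:
    serverVersion: "4.0.3"
    datacenters:
      - metadata:
          name: dc1
        size: 3                              # number of Cassandra nodes
        storageConfig:
          cassandraDataVolumeClaimSpec:
            storageClassName: default        # placeholder: vSAN-backed storage class
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 100Gi               # placeholder capacity
        stargate:
          size: 1
          heapSize: 512M                     # placeholder; tune per the Best Practices section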

For detailed benchmark and configuration information, refer to running-k8ssandra-vmware-tanzu-kubernetes-grid-vmware-cloud-aws.

kubectl get svc -n k8ssandra-operator
NAME                                               TYPE           CLUSTER-IP       EXTERNAL-IP      PORT(S)                                                                                                    AGE
demo-dc1-additional-seed-service                   ClusterIP      None             <none>           <none>                                                                                                     43d
demo-dc1-all-pods-service                          ClusterIP      None             <none>           9042/TCP,8080/TCP,9103/TCP,9000/TCP                                                                        13d
demo-dc1-service                                   ClusterIP      None             <none>           9042/TCP,9142/TCP,8080/TCP,9103/TCP,9000/TCP                                                               13d
demo-dc1-stargate-service                          LoadBalancer   100.67.100.149    <none>  8080:30294/TCP,8081:31590/TCP,8082:31398/TCP,8084:30105/TCP,8085:30287/TCP,8090:31838/TCP,9042:30062/TCP   12d
demo-seed-service                                  ClusterIP      None             <none>           <none>                                                                                                     43d
k8ssandra-operator-cass-operator-webhook-service   ClusterIP      100.64.223.255   <none>           443/TCP                                                                                                    14d
k8ssandra-operator-webhook-service                 ClusterIP      100.64.241.64    <none>           443/TCP                                                                                                    14d

We changed the type of the demo-dc1-stargate-service service to LoadBalancer:

kubectl -n k8ssandra-operator patch service demo-dc1-stargate-service -p '{"spec": {"type":"LoadBalancer"}}'
service/demo-dc1-stargate-service patched

We used 100.67.100.149 for internal pod access and 10.156.159.64 for external Stargate service access.

kubectl get svc -n k8ssandra-operator
NAME                                               TYPE           CLUSTER-IP       EXTERNAL-IP     PORT(S)                                                                                                    AGE
demo-dc1-additional-seed-service                   ClusterIP      None             <none>          <none>                                                                                                     23d
demo-dc1-all-pods-service                          ClusterIP      None             <none>          9042/TCP,8080/TCP,9103/TCP,9000/TCP                                                                        23d
demo-dc1-service                                   ClusterIP      None             <none>          9042/TCP,9142/TCP,8080/TCP,9103/TCP,9000/TCP                                                               23d
demo-dc1-stargate-service                          LoadBalancer   100.67.100.149   10.156.159.64   8080:30828/TCP,8081:31581/TCP,8082:31336/TCP,8084:30232/TCP,8085:31656/TCP,8090:32022/TCP,9042:30204/TCP   22d
demo-seed-service                                  ClusterIP      None             <none>          <none>                                                                                                     23d
k8ssandra-operator-cass-operator-webhook-service   ClusterIP      100.64.223.255   <none>          443/TCP                                                                                                    24d
k8ssandra-operator-webhook-service                 ClusterIP      100.64.241.64    <none>          443/TCP             

K8ssandra enables authentication and authorization by default. When the authentication is enabled, K8ssandra configures a new and default superuser. The username defaults to {metadata.name}-superuser. The credentials for the superuser are stored in a secret named {metadata.name}-superuser.

To retrieve the superuser credentials for future connections, extract and decode the username secret:

kubectl get secret demo-superuser -o jsonpath="{.data.username}" -n k8ssandra-operator | base64 --decode

In our example, we deployed a cluster named demo (replace demo with the name configured for your running cluster).

To extract and decode the password secret, use the following command:

kubectl get secret demo-superuser -o jsonpath="{.data.password}" -n k8ssandra-operator | base64 --decode

Stargate has no specific credentials. It uses the same superuser as defined for Cassandra.

Stream Application Sample Using Flink and Cassandra

We validated an example streaming pipeline. The workflow is as follows:


Figure 13. Stream Application Sample Running on Flink and Cassandra

The TopSpeedWindowing application is an example implementation of a streaming data processing application that uses Flink's windowing functionality to find the top speed of cars over a sliding time window; the code has been modified to persist the data to a Cassandra keyspace table.

Overall, this example demonstrates how Flink's windowing functionality can be used to process streaming data in a flexible and scalable way, allowing developers to perform complex computations on time-based data with ease.

Check out the example code and YAML files.
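A hedged sketch (not the exact TopSpeedWindowing source) of how a sliding time window plus maxBy can produce the (carId, speed, distance, timestamp) tuples that the Cassandra sink below persists is shown here; the sample records, window sizes, and field positions are assumptions for illustration.

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TopSpeedSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (carId, speed, distance, timestamp) sample records standing in for the car event source
        DataStream<Tuple4<Integer, Integer, Double, Long>> carData = env.fromElements(
                Tuple4.of(0, 55, 12.5, System.currentTimeMillis()),
                Tuple4.of(1, 80, 30.0, System.currentTimeMillis()),
                Tuple4.of(0, 95, 14.0, System.currentTimeMillis()));

        DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
                .keyBy(value -> value.f0)                                 // one windowed stream per car
                .window(SlidingProcessingTimeWindows.of(
                        Time.seconds(10), Time.seconds(5)))               // sliding time window
                .maxBy(1);                                                // record with the top speed per window

        topSpeeds.print();      // replaced by the CassandraSink shown below in the paper's job
        env.execute("top-speed-sketch");
    }
}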

Schema of Destination Cassandra Keyspace Table

The output data stream topSpeeds contains tuples of the form (carId, speed, distance, timestamp) representing the highest speed recorded for each car during the window.

create keyspace example WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true; 
CREATE TABLE example.topSpeeds (carid int, speed int, distance double, time bigint, PRIMARY KEY (carid,time))  WITH CLUSTERING ORDER BY (time DESC); 

describe table example.topspeeds;

CREATE TABLE example.topspeeds (
    carid int PRIMARY KEY,
    distance double,
    speed int,
    time bigint
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

Apache Flink Code to Aggregate and Persist Data in Cassandra Keyspace Table

The Apache Cassandra Connector provides sinks that allow you to write data from a Flink data stream to a Cassandra database. The connector supports both batch and streaming data, and it can be used to write data to any Cassandra table.

Perform the following steps to use the connector:

  1. Create a CassandraSink object. This object takes a Flink data stream as an input and a Cassandra table as an output. You can then configure the sink with options such as the Cassandra host and port, the Cassandra keyspace, and the Cassandra table name.
// Superuser credentials retrieved from the demo-superuser secret, and the Cassandra contact point
final String username = "demo-superuser";
final String password = "dJLCx7Y0opoIfEarDJTV";
final String contactpoint = params.getContactPoint().orElse("127.0.0.1");
try {
    // Sink the topSpeeds stream into the example.topspeeds table via the native CQL port
    CassandraSink.addSink(topSpeeds)
            .setQuery(
                    "INSERT INTO example.topspeeds(carid, speed,distance, time) values (?, ?, ?, ?);")
            .setClusterBuilder(
                    new ClusterBuilder() {
                        private static final long serialVersionUID = 2793938419775311824L;

                        public Cluster buildCluster(Cluster.Builder builder) {
                            return builder.addContactPoints(contactpoint)
                                    .withPort(9042)
                                    .withCredentials(username, password)
                                    .withoutJMXReporting()
                                    .build();
                        }
                    })
            .build();
} catch (Exception e) {
    System.out.println("Error connecting to cluster" + e.getMessage());
}
  2. To use this connector, add the following dependency to the pom.xml in your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.12</artifactId>
    <version>1.16.1</version>
</dependency>
  3. Build an executable jar. Since we are running in a containerized environment, we need to Create Executable Fat Jar with Maven Shade Plugin.
  4. Build an image that includes the jar; a hedged sketch of this step follows.
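A sketch of building and pushing such an image, assuming a hypothetical shaded jar name and registry tag:

# hypothetical Dockerfile content and build commands
cat > Dockerfile <<'EOF'
FROM flink:1.16
COPY target/top-speed-with-cassandra.jar /opt/flink/usrlib/
EOF
docker build -t my-registry.example.com/flink-topspeed-cassandra:1.0 .
docker push my-registry.example.com/flink-topspeed-cassandra:1.0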

Job Running

We used the following YAML file to submit the carTopSpeed job and sink to Cassandra.

kubectl apply -f topcarwithcassparam.yaml


Figure 14. Job Running Sample

Result Verification

In our example, we deployed a cluster with name demo (replace the demo with the name configured for your running cluster).

After the Flink job starts and persists results to the database, we can use ‘cqlsh’ to connect to the Cassandra keyspace and then query and review the aggregated data as illustrated below:

tyin@tyin0MD6R bin % /cqlsh 10.156.159.164 -u demo-superuser --request-timeout=6000 
Password: 
Connected to demo at 10.156.159.164:9042.
[cqlsh 6.8.0 | Cassandra 4.0.4 | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
demo-superuser@cqlsh> select count(*) from example.topspeeds ;

 count
-------
  3546

However, as the number of inserted records grows quickly, using ‘cqlsh’ to count them will eventually generate a timeout error. In this case, we can use nodetool tablestats to check the number of records written to the topspeeds table.

tyin@tyin0MD6R examples % kubectl exec -it pod/demo-dc1-default-sts-0 -n k8ssandra-operator -c cassandra -- nodetool -u demo-superuser -pw dJLCx7Y0opoIfEarDJTV tablestats example.topspeeds 
Total number of tables: 42
----------------
Keyspace: example
	Read Count: 0
	Read Latency: NaN ms
	Write Count: 35040986
	Write Latency: 0.009253590552503289 ms
	Pending Flushes: 0
		Table: topspeeds
		SSTable count: 3
		Old SSTable count: 0
		Space used (live): 150225664
		Space used (total): 150225664
		Space used by snapshots (total): 0
		Off heap memory used (total): 118112
		SSTable Compression Ratio: 0.6663820948690086
		Number of partitions (estimate): 4000
		Memtable cell count: 4552412
		Memtable data size: 77782389
		Memtable off heap memory used: 0
		Memtable switch count: 6
		Local read count: 0
		Local read latency: NaN ms
		Local write count: 35040986
		Local write latency: NaN ms
		Pending flushes: 0
		Percent repaired: 0.0
		Bytes repaired: 0.000KiB
		Bytes unrepaired: 214.478MiB
		Bytes pending repair: 0.000KiB
		Bloom filter false positives: 0
		Bloom filter false ratio: 0.00000
		Bloom filter space used: 7536
		Bloom filter off heap memory used: 7512
		Index summary off heap memory used: 768
		Compression metadata off heap memory used: 109832
		Compacted partition minimum bytes: 11865
		Compacted partition maximum bytes: 379022
		Compacted partition mean bytes: 41255
		Average live cells per slice (last five minutes): NaN
		Maximum live cells per slice (last five minutes): 0
		Average tombstones per slice (last five minutes): NaN
		Maximum tombstones per slice (last five minutes): 0
		Dropped Mutations: 0
		Droppable tombstone ratio: 0.00000

Best Practices

  •  It is recommended to deploy multiple workload clusters in production for better resource isolation and monitoring.
  •  For workload cluster deployment, start with a small number of nodes to tune the parameters and then scale up gradually.
  •  For K8ssandra cluster deployment, the Stargate heap size should match that of the Cassandra pods; the proper size can be tuned through throughput or latency workload testing.
  •  For Flink cluster deployment, start with a small number of nodes and a small worker node size to tune the parameters and then scale up gradually.

Conclusion

Flink and Cassandra are two popular open-source tools that complement each other well in modern applications. By combining Flink and Cassandra, enterprises can build integrated real-time streaming analytics pipelines. The Flink Kubernetes Operator, Cassandra Operator, Prometheus, and Grafana integrate seamlessly on Tanzu Kubernetes Grid. This solution streamlines building and running real-time workloads on Kubernetes, allowing IT administrators to enable fast application deployment, achieve better scalability and performance, ensure high availability and governance, and lower TCO.

Reference

For more information, you can explore the following resources:

About the Author

Ting Yin, Senior Technical Marketing Architect in the Workload Technical Marketing Team of the Cloud Infrastructure Big Group, wrote the original version of this paper. The following reviewers also contributed to the paper contents: 

  •  Chen Wei, Director of the Workload Technical Marketing Team in VMware
  •  Catherine Xu, Senior Manager of the Workload Technical Marketing Team in VMware
