Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

5/10/2023

Reading time:18 mins

• Google Dataflow - Awesome-Astra

by John Doe

Last update: 2023-05-04

Integrating Astra and Beam/Dataflow

Astra supports both bulk and real-time operations through Astra DB and Astra Streaming. Each service exposes multiple interfaces, so integration with Apache Beam and Google Dataflow is possible in several ways. The main design choices for this integration are described below.

Data Bulk Operations

The Astra service that handles massive amounts of data is Astra DB. It provides multiple ways to load data, but some methods are preferred over others.

  • Cassandra and CQL: This is the preferred option. It is the most mature interface and executes queries efficiently. With the native drivers you can run reactive queries and token range queries to distribute the load across the nodes. This is the approach taken by the original CassandraIO connector. The existing CassandraIO connector does not support Astra, but we used it as the basis for a new AstraIO connector.

  • CQL over REST: This interface can be used with any HTTP client. While the Astra SDKs provide a built-in client, this interface is not the best for bulk loading because it introduces an extra layer of serialization.

  • CQL over GraphQL: This interface can be used with any HTTP client. While the Astra SDKs provide a built-in client, this interface is not the best for bulk loading because it introduces an extra layer of serialization.

  • CQL over gRPC: This interface is stateless, with an optimized serialization component (gRPC) and reactive interfaces, so it is a viable option. Currently, only CQL operations are exposed, and the token metadata needed to perform range queries is not available.
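
The token range approach mentioned for the native drivers can be sketched in plain Java. This is an illustration only, not the AstraIO or CassandraIO implementation: it assumes the Murmur3 partitioner (signed 64-bit tokens), splits the ring evenly instead of following the cluster's real token ownership, and the names TokenRangeSketch, split and toCql are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Sketch: split the full token ring into contiguous ranges, one query per range. */
final class TokenRangeSketch {

    // Assumption: Murmur3 partitioner, tokens are signed 64-bit values.
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Returns {start, end} pairs; each range means start < token <= end. */
    static List<long[]> split(int parts) {
        if (parts < 1) {
            throw new IllegalArgumentException("parts must be >= 1");
        }
        BigInteger width = MAX.subtract(MIN); // BigInteger avoids long overflow
        List<long[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 1; i <= parts; i++) {
            BigInteger end = (i == parts)
                    ? MAX
                    : MIN.add(width.multiply(BigInteger.valueOf(i))
                                   .divide(BigInteger.valueOf(parts)));
            ranges.add(new long[] {start.longValueExact(), end.longValueExact()});
            start = end; // the next range starts where this one ends
        }
        return ranges;
    }

    /**
     * Builds the CQL text for one range. Assumption: no row hashes to
     * Long.MIN_VALUE (the Murmur3 partitioner is understood to remap it),
     * so the exclusive lower bound of the first range should not skip data.
     */
    static String toCql(String keyspace, String table, String partitionKey, long[] range) {
        return "SELECT * FROM " + keyspace + "." + table
                + " WHERE token(" + partitionKey + ") > " + range[0]
                + " AND token(" + partitionKey + ") <= " + range[1];
    }

    public static void main(String[] args) {
        // Each statement could be handed to a different worker.
        for (long[] range : split(4)) {
            System.out.println(toCql("keyspace_demo", "simpledata", "id", range));
        }
    }
}
```

Because every range is an independent query, the reads can run in parallel, which is what lets a connector spread a full table scan across workers and nodes.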

Data Streaming Operations

The Astra service that handles streaming data is Astra Streaming. It exposes multiple interfaces, such as JMS, RabbitMQ and Kafka, for which Apache Beam support is already available through standard connectors.

To leverage the split capabilities of Pulsar, a PulsarIO connector was released in 2022. To learn more about its development, you can watch this video from the Beam Summit 2022.

Apache Beam

1. Overview

Introduction to Apache Beam

Objectives

Apache Beam is an open-source, unified programming model for batch and streaming data processing pipelines that simplifies large-scale data processing dynamics. Thousands of organizations around the world choose Apache Beam due to its unique data processing features, proven scale, and powerful yet extensible capabilities.

Main Concepts

  • Pipeline: A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.

  • PCollection: A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanisms. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.

  • PTransform: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.

  • Input and output (I/O) transforms: Beam comes with a number of “IOs”, which are library PTransforms that read or write data to various external storage systems.

I/O Connectors

Apache Beam I/O connectors provide read and write transforms for the most popular data storage systems so that Beam users can benefit from natively optimised connectivity. With the available I/Os, Apache Beam pipelines can read and write data to and from an external storage type in a unified and distributed way.

Integration with DataStax Astra is inspired by the built-in CassandraIO and PulsarIO connectors. This integration leverages a new AstraIO connector.

Runners

A runner in Apache Beam is responsible for executing pipelines on a particular processing engine or framework, such as Apache Flink or Google Cloud Dataflow. The runner translates the Beam pipeline into the appropriate format for the underlying engine, manages job execution, and provides feedback on job progress and status.

2. Prerequisites

Set up your Java development environment

Install Java Development Kit (JDK) 8+

Use the Java reference documentation for your operating system to install a Java Development Kit. You can then validate your installation with the following command (on JDK 8, use java -version instead).

java --version

Install Apache Maven (3.8+)

Samples and tutorials have been designed with Apache Maven. Use the reference documentation to install Maven, then validate your installation with:

mvn -version

Set up DataStax Astra DB

Create your DataStax Astra account: Sign Up

Create an Astra Token

An Astra token acts as your credentials and holds a set of permissions. The scope of a token is the whole organization (tenant), but permissions can be edited to limit usage to a single database.

To create a token, please follow this guide.

The Token is in fact three separate strings: a Client ID, a Client Secret and the token proper. You will need some of these strings to access the database, depending on the type of access you plan. Although the Client ID, strictly speaking, is not a secret, you should regard this whole object as a secret and make sure not to share it inadvertently (e.g. committing it to a Git repository) as it grants access to your databases.

{
"ClientId":"ROkiiDZdvPOvHRSgoZtyAapp",
"ClientSecret":"fakedfaked",
"Token":"AstraCS:fake"
}

It is handy to have your token declared as an environment variable (replace with the proper value):

export ASTRA_TOKEN="AstraCS:replace_me"
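
Before launching anything, it can help to confirm that the variable is set and has the expected shape. The sketch below only relies on the AstraCS: prefix described above; the class AstraTokenCheck and its methods are hypothetical helpers, not part of any Astra SDK, and the check says nothing about whether the token is actually valid on the platform.

```java
/** Sketch: sanity-check the ASTRA_TOKEN environment variable without leaking it. */
final class AstraTokenCheck {

    static final String PREFIX = "AstraCS:";

    /** True when the value is present, starts with the prefix and has a body. */
    static boolean looksLikeAstraToken(String value) {
        return value != null
                && value.startsWith(PREFIX)
                && value.length() > PREFIX.length();
    }

    /** Hides everything after the prefix so the result is safe to log. */
    static String mask(String token) {
        return looksLikeAstraToken(token) ? PREFIX + "****" : "<invalid>";
    }

    public static void main(String[] args) {
        String token = System.getenv("ASTRA_TOKEN");
        if (!looksLikeAstraToken(token)) {
            System.err.println("ASTRA_TOKEN is missing or does not start with " + PREFIX);
            System.exit(1);
        }
        // Never print the raw token: it grants access to your databases.
        System.out.println("Token found: " + mask(token));
    }
}
```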

Create a Database and a keyspace

With your account you can run multiple databases. A database is an Apache Cassandra cluster that can live in one or multiple regions (dc). In each database you can have multiple keyspaces. On this page we will use the database name db_demo and the keyspace keyspace_demo.

You can create the database from the user interface (a tutorial is available) or with the Astra command line interface. To install and set up the CLI, run the following:

curl -Ls "https://dtsx.io/get-astra-cli" | bash
source ~/.astra/cli/astra-init.sh
astra setup --token ${ASTRA_TOKEN}

To create DB and keyspace with the CLI:

astra db create db_demo -k keyspace_demo --if-not-exists

A Secure Connect Bundle (often abbreviated SCB) contains the certificates and endpoint information needed to open an mTLS connection. Its scope is a database AND a region: if your database is deployed in multiple regions, you have to download the bundle for each one and initiate the connection accordingly. Instructions to download the Secure Connect Bundle are here.

You can download the secure connect bundle from the user interface (here is a tutorial) or with the Astra command line interface:

astra db download-scb db_demo -f /tmp/secure-connect-bundle-db-demo.zip
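
Because the bundle is a plain zip archive, its contents can be listed with the JDK alone to confirm that the download succeeded. This is a minimal sketch: ScbInspector is a hypothetical name, the default path simply reuses the one from the command above, and no assumption is made about which files the archive contains.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical helper: lists the entries of a secure connect bundle zip. */
final class ScbInspector {

    private ScbInspector() {}

    /** Returns the sorted entry names found in the zip archive at the given path. */
    static List<String> listEntries(Path bundle) {
        try (ZipFile zip = new ZipFile(bundle.toFile())) {
            return zip.stream()
                    .map(ZipEntry::getName)
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read bundle " + bundle, e);
        }
    }

    public static void main(String[] args) {
        // Default path matches the download command above; pass another path as first argument.
        Path bundle = Paths.get(args.length > 0 ? args[0] : "/tmp/secure-connect-bundle-db-demo.zip");
        listEntries(bundle).forEach(System.out::println);
    }
}
```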

3. Installation and Setup

Set up the Maven project locally

git clone https://github.com/DataStax-Examples/astra-dataflow-starter.git
cd astra-dataflow-starter
mvn clean install -Dmaven.test.skip=true
  • This page describes several pipelines; all of them are defined in this project.

4. Bulk Data Load

Description of Pipeline BulkDataLoadWithBeam.

In this pipeline, 100 records are generated randomly to populate a table simpledata in AstraDB. The simpledata table looks like the following:

CREATE TABLE simpledata (
    id   int PRIMARY KEY,
    data text
);
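
To make the shape of the generated records concrete, here is a sketch of a plain Java row type with the same two columns and a generator for test rows. SimpleRow and generate are hypothetical stand-ins: the project's real SimpleDataEntity and AstraIOTestUtils.generateTestData are not shown on this page and most likely carry mapping annotations that this sketch leaves out.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in for SimpleDataEntity: one row of the simpledata table. */
final class SimpleRow implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int id;      // column "id int PRIMARY KEY"
    private final String data; // column "data text"

    SimpleRow(int id, String data) {
        this.id = id;
        this.data = data;
    }

    int getId() {
        return id;
    }

    String getData() {
        return data;
    }

    /** Generates 'count' rows with ids 0..count-1 and a random text payload. */
    static List<SimpleRow> generate(int count) {
        List<SimpleRow> rows = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            rows.add(new SimpleRow(i, UUID.randomUUID().toString()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Print a few generated rows to show their shape.
        for (SimpleRow row : generate(3)) {
            System.out.println(row.getId() + " -> " + row.getData());
        }
    }
}
```

The ids are kept distinct on purpose: id is the primary key, so two records with the same id would overwrite each other and the table would end up with fewer rows than were generated.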

Implementation of Pipeline BulkDataLoadWithBeam
  • The pipeline requires 3 arguments:
Parameter Name | Description
token | Credentials to connect to the Astra platform; it should start with AstraCS:...
secureConnectBundle | Zip containing certificates to open a secured connection and endpoint definition to pick the proper database
keyspace | Target keyspace in Astra DB
  • Parameters are defined in a specialized interface LoadDataPipelineOptions inheriting from PipelineOptions
/**
 * Interface definition of parameters needed for this pipeline
 */
public interface LoadDataPipelineOptions extends PipelineOptions {

    @Description("The Zip file to secure the transport (secure connect bundle)")
    @Validation.Required
    String getSecureConnectBundle();
    void setSecureConnectBundle(String path);

    @Description("The token used as credentials (Astra Token)")
    @Validation.Required
    String getToken();
    void setToken(String token);

    @Description("Target Keyspace in the database")
    @Validation.Required
    String getKeyspace();
    void setKeyspace(String keyspace);
}
  • Parameters are parsed from the command-line arguments with PipelineOptionsFactory.fromArgs(args).
LoadDataPipelineOptions astraOptions = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(LoadDataPipelineOptions.class);
FileSystems.setDefaultPipelineOptions(astraOptions);
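
Beam derives each command-line option name from the getter name in the options interface, which is why getSecureConnectBundle() is set with --secureConnectBundle=... . The following is a deliberately simplified illustration of that mapping and of --name=value parsing; it is not Beam's implementation, and OptionArgsSketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical illustration of "--name=value" option parsing (not Beam code). */
final class OptionArgsSketch {

    private OptionArgsSketch() {}

    /** Derives the option name from a getter name, e.g. getKeyspace -> keyspace. */
    static String optionNameFor(String getterName) {
        if (!getterName.startsWith("get") || getterName.length() <= 3) {
            throw new IllegalArgumentException("Not a getter: " + getterName);
        }
        String property = getterName.substring(3);
        return Character.toLowerCase(property.charAt(0)) + property.substring(1);
    }

    /** Parses arguments shaped like --name=value into an ordered map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("--") || eq < 3) {
                throw new IllegalArgumentException("Expected --name=value but got: " + arg);
            }
            // Split on the first '=' only, so values may themselves contain '='.
            options.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = parse(new String[] {
            "--keyspace=keyspace_demo",
            "--secureConnectBundle=/tmp/secure-connect-bundle-db-demo.zip"
        });
        // The key matches the name derived from the getter in LoadDataPipelineOptions.
        System.out.println(options.get(optionNameFor("getSecureConnectBundle")));
    }
}
```

The same rule explains why the export pipeline, whose interface declares getAstraToken(), is launched with --astraToken while this pipeline uses --token.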
  • Run the pipeline
// Create a pipeline with the options
Pipeline pipelineWrite = Pipeline.create(astraOptions);
pipelineWrite
    // Create 100 records randomly
    .apply(Create.of(AstraIOTestUtils.generateTestData(100)))
    // Create the target table
    .apply(new CreateTableTransform<SimpleDataEntity>(astraOptions))
    // Write data in tables
    .apply(AstraIO.<SimpleDataEntity>write()
        .withToken(astraOptions.getToken())
        .withKeyspace(astraOptions.getKeyspace())
        .withSecureConnectBundle(new File(astraOptions.getSecureConnectBundle()))
        .withEntity(SimpleDataEntity.class));
// Pipeline Execution
pipelineWrite.run().waitUntilFinish();
Executing the Pipeline BulkDataLoadWithBeam.
cd samples-astra-beam-pipelines
export ASTRA_KEYSPACE=demo
export ASTRA_SCB_PATH=/tmp/scb-demo.zip
export ASTRA_TOKEN=AstraCS:uZclXTY....
mvn -Pdirect-runner compile \
  exec:java \
  -Dexec.mainClass=com.dtx.astra.pipelines.beam.BulkDataLoadWithBeam \
  -Dexec.args="\
      --keyspace=${ASTRA_KEYSPACE} \
      --secureConnectBundle=${ASTRA_SCB_PATH} \
      --token=${ASTRA_TOKEN}"

5. Bulk Data Export

Description of Pipeline BulkDataExportWithBeam

In this pipeline, the contents of an Astra table are exported as a set of CSV files. The read is split into token ranges for maximum performance (reads are distributed across the nodes). Multiple files are produced in the output directory.
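
To illustrate what splitting a read into token ranges means, the sketch below divides the token ring into equal, contiguous ranges and prints one CQL query per range. It is not the connector's actual splitting code: it assumes the Murmur3 partitioner, whose tokens span -2^63 to 2^63 - 1, and TokenRangeSplitter, split, and toCql are hypothetical names.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: splits the Murmur3 token ring into contiguous ranges. */
final class TokenRangeSplitter {

    private TokenRangeSplitter() {}

    /** Returns n ranges as {start, end} pairs, both bounds inclusive, covering the whole ring. */
    static List<long[]> split(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger total = BigInteger.ONE.shiftLeft(64); // 2^64 tokens on the ring
        BigInteger size = total.divide(BigInteger.valueOf(n));
        List<long[]> ranges = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            BigInteger start = min.add(size.multiply(BigInteger.valueOf(i)));
            // The last range absorbs the remainder so the ring is fully covered.
            BigInteger end = (i == n - 1)
                    ? BigInteger.valueOf(Long.MAX_VALUE)
                    : start.add(size).subtract(BigInteger.ONE);
            ranges.add(new long[] {start.longValueExact(), end.longValueExact()});
        }
        return ranges;
    }

    /** Builds the CQL query that reads one range of the given table. */
    static String toCql(String table, String partitionKey, long[] range) {
        return String.format(
                "SELECT * FROM %s WHERE token(%s) >= %d AND token(%s) <= %d",
                table, partitionKey, range[0], partitionKey, range[1]);
    }

    public static void main(String[] args) {
        // Each query can be executed by a different worker, independently of the others.
        for (long[] range : split(4)) {
            System.out.println(toCql("simpledata", "id", range));
        }
    }
}
```

Because the ranges do not overlap and together cover every token, each row is read exactly once even though the queries run in parallel.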

Implementation of Pipeline BulkDataExportWithBeam
  • The pipeline requires 5 arguments:
Parameter Name | Description
astraToken | Credentials to connect to the Astra platform; it should start with AstraCS:...
secureConnectBundle | Zip containing certificates to open a secured connection and endpoint definition to pick the proper database
keyspace | Target keyspace in Astra DB
table | The table name to be exported
targetFolder | Destination for the files on disk
  • Those parameters are parsed using a specialized PipelineOptions interface:
public interface ExportTablePipelineOptions extends PipelineOptions {

    @Description("AstraToken Value")
    @Validation.Required
    ValueProvider<String> getAstraToken();
    void setAstraToken(ValueProvider<String> token);

    @Description("Location of file on disk")
    @Validation.Required
    ValueProvider<String> getSecureConnectBundle();
    void setSecureConnectBundle(ValueProvider<String> path);

    @Description("Source Keyspace")
    @Validation.Required
    String getKeyspace();
    void setKeyspace(String keyspace);

    @Description("Source Table")
    String getTable();
    void setTable(String table);

    @Description("Destination folder")
    @Validation.Required
    String getTargetFolder();
    void setTargetFolder(String folder);
}
  • Items are read with AstraIO.read() as entities, then each one is serialized as a String:
@ProcessElement
public void processElement(ProcessContext c) {
    // One CSV line per entity: id and data separated by a semicolon
    String csvLine = c.element().getId() + ";" + c.element().getData();
    LOGGER.info("CSV Line: {}", csvLine);
    c.output(csvLine);
}
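
The mapping above joins the two fields with a semicolon and does no escaping, which is fine for the generated test data but would produce ambiguous lines if data contained a semicolon, a quote, or a line break. As a possible hardening, here is a sketch that quotes such values in the usual CSV way; CsvLines and its methods are hypothetical and are not part of the sample project.

```java
/** Hypothetical sketch: builds "id;data" lines and quotes the data field when needed. */
final class CsvLines {

    static final char SEPARATOR = ';';

    private CsvLines() {}

    /** Wraps the field in double quotes (doubling inner quotes) when it contains special characters. */
    static String quoteIfNeeded(String field) {
        if (field == null) {
            return "";
        }
        boolean needsQuotes = field.indexOf(SEPARATOR) >= 0
                || field.indexOf('"') >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuotes) {
            return field;
        }
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    /** Builds one CSV line from the two columns of the simpledata table. */
    static String toCsvLine(int id, String data) {
        return id + String.valueOf(SEPARATOR) + quoteIfNeeded(data);
    }

    public static void main(String[] args) {
        System.out.println(toCsvLine(1, "hello"));
        System.out.println(toCsvLine(2, "a;b"));
    }
}
```

Inside the DoFn, the string concatenation could then be replaced by a call such as CsvLines.toCsvLine(c.element().getId(), c.element().getData()).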
  • Run the pipeline
// Build Read
Pipeline exportCsvPipeline = Pipeline.create(options);
exportCsvPipeline
    .apply("Read Table", AstraIO
        .<SimpleDataEntity>read()
        .withToken(options.getAstraToken().get())
        .withSecureConnectBundle(new File(options.getSecureConnectBundle().get()))
        .withKeyspace(options.getKeyspace())
        .withTable(options.getTable())
        .withCoder(SerializableCoder.of(SimpleDataEntity.class))
        .withEntity(SimpleDataEntity.class))
    .apply("MapCsv", ParDo.of(new MapRecordAsCsvLine()))
    .apply("WriteCsvInLocally", TextIO.write().to(options.getTargetFolder()));
exportCsvPipeline
    .run()
    .waitUntilFinish(Duration.standardSeconds(30));
Executing the Pipeline BulkDataExportWithBeam
cd samples-astra-beam-pipelines
export ASTRA_KEYSPACE=demo
export ASTRA_SCB_PATH=/tmp/scb-demo.zip
export ASTRA_TABLE=simpledata
export DESTINATION=/tmp
export ASTRA_TOKEN=AstraCS:uZclXTY....
mvn -Pdirect-runner compile exec:java \
  -Dexec.mainClass=com.dtx.astra.pipelines.beam.BulkDataExportWithBeam \
  -Dexec.args="\
    --astraToken=${ASTRA_TOKEN} \
    --secureConnectBundle=${ASTRA_SCB_PATH} \
    --keyspace=${ASTRA_KEYSPACE} \
    --table=${ASTRA_TABLE} \
    --targetFolder=${DESTINATION}"

Google DataFlow

1. Overview

Introduction to Google Dataflow

Google Dataflow is a managed service on Google Cloud Platform that executes Apache Beam pipelines; in Beam terms, it is a runner. It lets users build and execute data pipelines and processes large amounts of data in a parallel, distributed manner, which makes it scalable and efficient. Dataflow supports both batch and streaming processing, allowing for real-time data analysis. Pipelines can be written in several programming languages, such as Java, Python, and SQL. Dataflow integrates natively with the main Google Cloud services, such as BigQuery and Pub/Sub.

Dataflow provides built-in integrations with the most widely used Google Cloud Platform products, such as Cloud Storage, Pub/Sub, Datastore, and BigQuery. The platform can be extended to run any Java code and I/O connectors deployed from the CLI.

Integrating with DataStax Astra requires the proper runner, along with some best practices for handling credentials.

2. Prerequisites

The Java, Maven, and DataStax Astra DB prerequisites are the same as in the Apache Beam section above: a Java Development Kit, Apache Maven, an Astra token exported as ASTRA_TOKEN, the db_demo database with its keyspace_demo keyspace, and the downloaded secure connect bundle.

Set up the GCP project

In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

Note: If you don't plan to keep the resources that you create in this guide, create a new project instead of selecting an existing one. After you finish these steps, you can delete the project, which removes all resources associated with it.

# Project identifiers used throughout this guide (replace with your own values).
# GCP_PROJECT_CODE is the project number.
export GCP_PROJECT_ID=integrations-379317
export GCP_PROJECT_CODE=747469159044
export GCP_USER=cedrick.lunven@datastax.com
export GCP_COMPUTE_ENGINE=747469159044-compute@developer.gserviceaccount.com

# Install and initialize the gcloud CLI
curl https://sdk.cloud.google.com | bash
gcloud init

# Check that the project is accessible
gcloud projects describe ${GCP_PROJECT_ID}

# Enable the required APIs
gcloud services enable dataflow compute_component \
    logging storage_component storage_api \
    bigquery pubsub datastore.googleapis.com \
    cloudresourcemanager.googleapis.com

# Grant roles to your user account and to the Compute Engine default service account
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
    --member="user:${GCP_USER}" \
    --role=roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
    --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
    --role=roles/dataflow.admin
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
    --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
    --role=roles/dataflow.worker
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
    --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
    --role=roles/storage.objectAdmin

# Create the Cloud Storage buckets for inputs and outputs, then list them
gsutil mb -c STANDARD -l US gs://astra_dataflow_inputs
gsutil mb -c STANDARD -l US gs://astra_dataflow_outputs
gsutil ls

# Store the Astra token and the secure connect bundle as secrets in Secret Manager
gcloud secrets create astra-token \
   --data-file <(echo -n "${ASTRA_TOKEN}") \
   --replication-policy="automatic"
gcloud secrets create cedrick-demo-scb \
   --data-file ${ASTRA_SCB_PATH} \
   --replication-policy="automatic"

# Allow the Compute Engine service account to read both secrets
gcloud secrets add-iam-policy-binding cedrick-demo-scb \
    --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
    --role='roles/secretmanager.secretAccessor'
gcloud secrets add-iam-policy-binding astra-token \
    --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
    --role='roles/secretmanager.secretAccessor'
gcloud secrets list

# Run the ReadSecretAndConnectDataFlow test pipeline on Dataflow
mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.dtx.astra.pipelines.test.ReadSecretAndConnectDataFlow \
  -Dexec.args="\
  --astraToken=projects/${GCP_PROJECT_CODE}/secrets/astra-token/versions/1 \
  --secureConnectBundle=projects/${GCP_PROJECT_CODE}/secrets/cedrick-demo-scb/versions/1 \
  --runner=DataflowRunner \
  --project=${GCP_PROJECT_ID} \
  --region=us-central1"
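
The --astraToken and --secureConnectBundle values passed above are Secret Manager version resource names of the form projects/<project>/secrets/<secret>/versions/<version>. The sketch below builds and checks such names so that a malformed value fails fast before the job is submitted; SecretVersionNames and its methods are hypothetical helpers, not part of the Google Cloud client library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for Secret Manager version resource names. */
final class SecretVersionNames {

    private static final Pattern NAME =
            Pattern.compile("projects/([^/]+)/secrets/([^/]+)/versions/([^/]+)");

    private SecretVersionNames() {}

    /** Builds a resource name such as projects/123/secrets/astra-token/versions/1. */
    static String of(String project, String secretId, String version) {
        return String.format("projects/%s/secrets/%s/versions/%s", project, secretId, version);
    }

    /** Returns true when the value has the expected four-part shape. */
    static boolean isValid(String name) {
        return name != null && NAME.matcher(name).matches();
    }

    /** Extracts the secret id, for example "astra-token". */
    static String secretId(String name) {
        Matcher matcher = NAME.matcher(name);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a secret version name: " + name);
        }
        return matcher.group(2);
    }

    public static void main(String[] args) {
        // The project number and secret id mirror the values used in the commands above.
        String name = of("747469159044", "astra-token", "1");
        System.out.println(name + " valid=" + isValid(name));
    }
}
```

Secret Manager also accepts the alias latest in place of a version number, which avoids hard-coding versions/1 at the cost of making runs less reproducible.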

3. Bulk Data Load

Description of Pipeline BulkDataLoadWithDataFlow

In this pipeline, 100 records are generated randomly to populate a table simpledata in Astra DB. The simpledata table looks like the following:

CREATE TABLE simpledata (
    id   int PRIMARY KEY,
    data text
);

Implementation of Pipeline BulkDataLoadWithDataFlow
  • The pipeline requires 3 arguments:
Parameter Name | Description
astraToken | Location of the secret holding the Astra token; the token itself should start with AstraCS:...
secureConnectBundle | Location of the secret holding the zip with the certificates to open a secured connection and the endpoint definition to pick the proper database
keyspace | Target keyspace in Astra DB
  • Those parameters are parsed using a specialized PipelineOptions interface:
/**
 * Flow Interface
 */
public interface LoadDataPipelineOptions extends PipelineOptions {

    @Description("Location of Astra Token secret")
    @Validation.Required
    String getAstraToken();
    void setAstraToken(String token);

    @Description("Location of secret for secure connect bundle")
    @Validation.Required
    String getSecureConnectBundle();
    void setSecureConnectBundle(String path);

    @Description("Destination Keyspace")
    @Validation.Required
    String getKeyspace();
    void setKeyspace(String keyspace);
}
  • Secrets are extracted from Secret Manager:
SecretManagerServiceClient client = SecretManagerServiceClient.create();
String astraToken = client
    .accessSecretVersion(astraOptions.getAstraToken())
    .getPayload().getData()
    .toStringUtf8();
LOGGER.info("+ Token retrieved");
byte[] astraSecureBundle = client
    .accessSecretVersion(astraOptions.getSecureConnectBundle())
    .getPayload().getData()
    .toByteArray();
LOGGER.info("+ Secure connect bundle retrieved");
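
Since the bundle comes back from Secret Manager as raw bytes, a cheap check that the payload really is a zip archive can catch a secret created from the wrong file. The sketch below tests for the standard zip local-file-header signature (the bytes P, K, 0x03, 0x04); ZipPayloadCheck is a hypothetical helper and is not part of the sample project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: checks whether a byte payload starts like a zip archive. */
final class ZipPayloadCheck {

    private ZipPayloadCheck() {}

    /** Returns true when the payload begins with the zip local file header signature. */
    static boolean looksLikeZip(byte[] payload) {
        return payload != null
                && payload.length >= 4
                && payload[0] == 0x50   // 'P'
                && payload[1] == 0x4B   // 'K'
                && payload[2] == 0x03
                && payload[3] == 0x04;
    }

    public static void main(String[] args) {
        // A token stored by mistake in the bundle secret would be rejected here.
        byte[] notAZip = "AstraCS:replace_me".getBytes(StandardCharsets.UTF_8);
        System.out.println("Token bytes look like a zip: " + looksLikeZip(notAZip));
    }
}
```

In the pipeline, such a check could run on astraSecureBundle right after it is retrieved and before it is handed to the connector.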
  • Pipeline uses the parameters:
Pipeline pipelineWrite = Pipeline.create(astraOptions);
pipelineWrite
    .apply("Create 100 random items", Create.of(AstraIOTestUtils.generateTestData(100)))
    .apply("Write into Astra", AstraIO.<SimpleDataEntity>write()
        .withToken(astraToken)
        .withKeyspace(astraOptions.getKeyspace())
        .withSecureConnectBundleData(astraSecureBundle)
        .withEntity(SimpleDataEntity.class));
pipelineWrite.run().waitUntilFinish();
Execution of Pipeline BulkDataLoadWithDataFlow
mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.dtx.astra.pipelines.beam.dataflow.BulkDataLoadWithDataflow \
  -Dexec.args="\
    --astraToken=projects/${GCP_PROJECT_CODE}/secrets/astra-token/versions/1 \
    --secureConnectBundle=projects/${GCP_PROJECT_CODE}/secrets/cedrick-demo-scb/versions/1 \
    --keyspace=${ASTRA_KEYSPACE} \
    --runner=DataflowRunner \
    --project=${GCP_PROJECT_ID} \
    --region=us-central1"

4. Bulk Data Export

Description of Pipeline BulkDataExportWithDataFlow

In this pipeline, the contents of an Astra table are exported as a set of CSV files. The read is split into token ranges for maximum performance (reads are distributed across the nodes). Multiple files are produced in the output directory. The files are created in Google Cloud Storage.

Implementation of Pipeline BulkDataExportWithDataFlow
  • The pipeline requires 5 arguments:
Parameter Name | Description
astraToken | Location of the secret holding the Astra token; the token itself should start with AstraCS:...
secureConnectBundle | Location of the secret holding the zip with the certificates to open a secured connection and the endpoint definition to pick the proper database
keyspace | Target keyspace in Astra DB
table | The table name to be exported
targetFolder | Destination for the files (a Google Cloud Storage location)
  • Those parameters are parsed using a specialized PipelineOptions interface:
public interface ExportTablePipelineOptions extends PipelineOptions {

    @Description("AstraToken Value")
    @Validation.Required
    ValueProvider<String> getAstraToken();
    void setAstraToken(ValueProvider<String> token);

    @Description("Location of file on disk")
    @Validation.Required
    ValueProvider<String> getSecureConnectBundle();
    void setSecureConnectBundle(ValueProvider<String> path);

    @Description("Source Keyspace")
    @Validation.Required
    String getKeyspace();
    void setKeyspace(String keyspace);

    @Description("Source Table")
    String getTable();
    void setTable(String table);

    @Description("Destination folder")
    @Validation.Required
    String getTargetFolder();
    void setTargetFolder(String folder);
}
  • Items are read with AstraIO.read() as entities, then each one is serialized as a String:
@ProcessElement
public void processElement(ProcessContext c) {
    // One CSV line per entity: id and data separated by a semicolon
    String csvLine = c.element().getId() + ";" + c.element().getData();
    LOGGER.info("CSV Line: {}", csvLine);
    c.output(csvLine);
}
  • Secrets are extracted from the Secret Manager and used for the READ
Execution of Pipeline BulkDataExportWithDataFlow
mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.dtx.astra.pipelines.dataflow.BulkDataExportWithDataflow \
  -Dexec.args="\
      --astraToken=projects/${GCP_PROJECT_CODE}/secrets/astra-token/versions/1 \
      --secureConnectBundle=projects/${GCP_PROJECT_CODE}/secrets/cedrick-demo-scb/versions/1 \
      --keyspace=demo \
      --table=simpledata \
      --targetFolder=gs://astra_dataflow_outputs \
      --runner=DataflowRunner \
      --project=integrations-379317 \
      --region=us-central1"

Last update: 2023-05-04
