Cassandra
Important Capabilities
Capability | Status | Notes |
---|---|---|
Asset Containers | ✅ | Enabled by default |
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Platform Instance | ✅ | Enabled by default |
Schema Metadata | ✅ | Enabled by default |
This plugin extracts the following:
- Metadata for tables
- Column types associated with each table column
- The keyspace each table belongs to
Setup
This integration pulls metadata directly from Cassandra databases, including both DataStax Astra DB and Cassandra Enterprise Edition (EE).
You’ll need to have a Cassandra instance or an Astra DB setup with appropriate access permissions.
Steps to Get the Required Information
Set Up User Credentials:
- For Astra DB:
- Log in to your Astra DB Console.
- Navigate to Organization Settings > Token Management.
- Generate an Application Token with the required permissions for read access.
- Download the Secure Connect Bundle from the Astra DB Console.
- For Cassandra EE:
- Ensure you have a username and password with read access to the necessary keyspaces.
- For Astra DB:
Permissions:
- The user or token must have
SELECT
permissions that allow it to:- Access metadata in system keyspaces (e.g.,
system_schema
) to retrieve information about keyspaces, tables, columns, and views. - Perform
SELECT
operations on the data tables if data profiling is enabled.
- Access metadata in system keyspaces (e.g.,
- The user or token must have
Verify Database Access:
- For Astra DB: Ensure the Secure Connect Bundle is used and configured correctly.
- For Cassandra EE: Verify SSL/TLS settings if required, and ensure the contact point and port are accessible.
CLI based Ingestion
Install the Plugin
The cassandra
source works out of the box with acryl-datahub
.
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: "cassandra"
config:
# Coordinates
contact_point: "localhost"
port: 9042
# Credentials
username: "admin"
password: "password"
#datastax astra db
#datastax_astra_cloud_config:
# secure_connect_bundle: "Path to Secure Connect Bundle (.zip)"
# token: "Application Token"
# Optional
keyspace_pattern:
allow: [".*"]
# Optional
table_pattern:
allow: [".*"]
# Optional
profiling:
enabled: true
profile_table_level_only: true
sink:
# config sinks
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
contact_point string | Domain or IP address of the Cassandra instance (excluding port). Default: localhost |
password string | Password credential associated with the specified username. |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
port integer | Port number to connect to the Cassandra instance. Default: 9042 |
username string | Username credential with read access to the system_schema keyspace. |
env string | The environment that all assets produced by this connector belong to Default: PROD |
datastax_astra_cloud_config CassandraCloudConfig | Configuration for cloud-based Cassandra, such as DataStax Astra DB. |
datastax_astra_cloud_config.secure_connect_bundle ❓ string | File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB. |
datastax_astra_cloud_config.token ❓ string | The Astra DB application token used for authentication. |
datastax_astra_cloud_config.connect_timeout integer | Timeout in seconds for establishing new connections to Cassandra. Default: 600 |
datastax_astra_cloud_config.request_timeout integer | Timeout in seconds for individual Cassandra requests. Default: 600 |
keyspace_pattern AllowDenyPattern | Regex patterns to filter keyspaces for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
keyspace_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
keyspace_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
keyspace_pattern.allow.string string | |
keyspace_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
keyspace_pattern.deny.string string | |
profile_pattern AllowDenyPattern | Regex patterns for tables to profile Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
profile_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
profile_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
profile_pattern.allow.string string | |
profile_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
profile_pattern.deny.string string | |
table_pattern AllowDenyPattern | Regex patterns to filter keyspaces.tables for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
profiling ProfileConfig | Configuration for profiling Default: {'enabled': False, 'operation_config': {'lower_fre... |
profiling.catch_exceptions boolean | Default: True |
profiling.column_count boolean | Default: True |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.field_sample_values_limit integer | Upper limit for number of sample values to collect for all columns. Default: 20 |
profiling.include_field_distinct_count boolean | Whether to profile for the number of distinct values for each column. Default: True |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: False |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: False |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.limit integer | Max number of documents to profile. By default, profiles all documents. |
profiling.max_workers integer | Number of worker threads to use for profiling. Set to 1 to disable. Default: 20 |
profiling.offset integer | Offset in documents to profile. By default, uses no offset. |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. Default: False |
profiling.report_dropped_profiles boolean | Whether to report datasets or dataset columns which were not profiled. Set to True for debugging purposes. Default: False |
profiling.row_count boolean | Default: True |
profiling.turn_off_expensive_profiling_metrics boolean | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10. Default: False |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling. Default: False |
profiling.operation_config.profile_date_of_month integer | Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect. |
profiling.operation_config.profile_day_of_week integer | Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect. |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Configuration for stateful ingestion and stale metadata removal. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"title": "CassandraSourceConfig",
"description": "Configuration for connecting to a Cassandra or DataStax Astra DB source.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Configuration for stateful ingestion and stale metadata removal.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
"type": "string"
},
"contact_point": {
"title": "Contact Point",
"description": "Domain or IP address of the Cassandra instance (excluding port).",
"default": "localhost",
"type": "string"
},
"port": {
"title": "Port",
"description": "Port number to connect to the Cassandra instance.",
"default": 9042,
"type": "integer"
},
"username": {
"title": "Username",
"description": "Username credential with read access to the system_schema keyspace.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password credential associated with the specified username.",
"type": "string"
},
"datastax_astra_cloud_config": {
"title": "Datastax Astra Cloud Config",
"description": "Configuration for cloud-based Cassandra, such as DataStax Astra DB.",
"allOf": [
{
"$ref": "#/definitions/CassandraCloudConfig"
}
]
},
"keyspace_pattern": {
"title": "Keyspace Pattern",
"description": "Regex patterns to filter keyspaces for ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns to filter keyspaces.tables for ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profile_pattern": {
"title": "Profile Pattern",
"description": "Regex patterns for tables to profile",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profiling": {
"title": "Profiling",
"description": "Configuration for profiling",
"default": {
"enabled": false,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_day_of_week": null,
"profile_date_of_month": null
},
"limit": null,
"offset": null,
"report_dropped_profiles": false,
"turn_off_expensive_profiling_metrics": false,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"field_sample_values_limit": 20,
"max_number_of_fields_to_profile": null,
"profile_if_updated_since_days": null,
"profile_table_size_limit": 5,
"profile_table_row_limit": 5000000,
"profile_table_row_count_estimate_only": false,
"max_workers": 20,
"query_combiner_enabled": true,
"catch_exceptions": true,
"partition_profiling_enabled": true,
"partition_datetime": null,
"use_sampling": true,
"sample_size": 10000,
"profile_external_tables": false,
"tags_to_ignore_sampling": null,
"row_count": true,
"column_count": true
},
"allOf": [
{
"$ref": "#/definitions/ProfileConfig"
}
]
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"CassandraCloudConfig": {
"title": "CassandraCloudConfig",
"description": "Configuration for connecting to DataStax Astra DB in the cloud.",
"type": "object",
"properties": {
"token": {
"title": "Token",
"description": "The Astra DB application token used for authentication.",
"type": "string"
},
"secure_connect_bundle": {
"title": "Secure Connect Bundle",
"description": "File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.",
"type": "string"
},
"connect_timeout": {
"title": "Connect Timeout",
"description": "Timeout in seconds for establishing new connections to Cassandra.",
"default": 600,
"type": "integer"
},
"request_timeout": {
"title": "Request Timeout",
"description": "Timeout in seconds for individual Cassandra requests.",
"default": 600,
"type": "integer"
}
},
"required": [
"token",
"secure_connect_bundle"
],
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"OperationConfig": {
"title": "OperationConfig",
"type": "object",
"properties": {
"lower_freq_profile_enabled": {
"title": "Lower Freq Profile Enabled",
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"default": false,
"type": "boolean"
},
"profile_day_of_week": {
"title": "Profile Day Of Week",
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
},
"profile_date_of_month": {
"title": "Profile Date Of Month",
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
}
},
"additionalProperties": false
},
"ProfileConfig": {
"title": "ProfileConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"operation_config": {
"title": "Operation Config",
"description": "Experimental feature. To specify operation configs.",
"allOf": [
{
"$ref": "#/definitions/OperationConfig"
}
]
},
"limit": {
"title": "Limit",
"description": "Max number of documents to profile. By default, profiles all documents.",
"type": "integer"
},
"offset": {
"title": "Offset",
"description": "Offset in documents to profile. By default, uses no offset.",
"type": "integer"
},
"report_dropped_profiles": {
"title": "Report Dropped Profiles",
"description": "Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
"default": false,
"type": "boolean"
},
"turn_off_expensive_profiling_metrics": {
"title": "Turn Off Expensive Profiling Metrics",
"description": "Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
"default": false,
"type": "boolean"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_distinct_count": {
"title": "Include Field Distinct Count",
"description": "Whether to profile for the number of distinct values for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": false,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": false,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": false,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
},
"field_sample_values_limit": {
"title": "Field Sample Values Limit",
"description": "Upper limit for number of sample values to collect for all columns.",
"default": 20,
"type": "integer"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
},
"catch_exceptions": {
"title": "Catch Exceptions",
"default": true,
"type": "boolean"
},
"row_count": {
"title": "Row Count",
"default": true,
"type": "boolean"
},
"column_count": {
"title": "Column Count",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.cassandra.cassandra.CassandraSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Cassandra, feel free to ping us on our Slack.