ClickHouse
The ClickHouse scraper executes SQL queries against a ClickHouse database or cloud storage system (AWS S3, Azure Blob Storage) and creates configuration items from the query results. This lets you treat data stored in ClickHouse or cloud storage as configuration items that can be tracked and monitored.
Use Cases
- Data Warehouse Integration: Treat analytical data as configuration items
- Business Logic Tracking: Monitor business rule changes stored in Clickhouse
- Cloud Storage Monitoring: Track files and data stored in S3 or Azure Blob Storage
- Data Pipeline Monitoring: Monitor data transformation results as configuration changes
- Historical Data Analysis: Create configuration items from time-series data
```yaml title="clickhouse-scraper.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-scraper
spec:
  clickhouse:
    - query: |
        SELECT
          concat('ORD-', toString(10000 + number)) as order_id,
          ['Electronics', 'Clothing', 'Books', 'Home', 'Sports'][rand() % 5 + 1] as category,
          ['New York', 'London', 'Tokyo', 'Paris', 'Sydney'][rand() % 5 + 1] as city,
          round((rand() % 50000) / 100, 2) as amount,
          ['completed', 'pending', 'cancelled'][rand() % 3 + 1] as status,
          toDateTime('2024-01-01 00:00:00') + toIntervalSecond(rand() % 31536000) as order_date
        FROM numbers(1000)
      type: Order
      id: $.order_id
      transform:
        # full: true
        expr: "[config].toJSON()"
```
| Field | Description | Scheme | Required |
|---|---|---|---|
| logLevel | Specify the level of logging. | string | |
| schedule | Specify the interval to scrape in cron format. Defaults to every 60 minutes. | Cron | |
| full | Set to true to extract changes from scraped configurations. Defaults to false. | bool | |
| retention | Settings for retaining changes, analysis and scraped items. | Retention | |
| clickhouse | Specifies the list of ClickHouse configurations to scrape. | []Clickhouse | true |
Clickhouse
| Field | Description | Scheme |
|---|---|---|
| query* | SQL query to execute against ClickHouse | string |
| awsS3 | AWS S3 configuration for cloud storage access | AWSS3 |
| azureBlobStorage | Azure Blob Storage configuration for cloud storage access | AzureBlobStorage |
| clickhouseURL | ClickHouse connection URL in the format: `clickhouse://user:password@host:port/database?param1=value1&param2=value2` | string |
Mapping
Custom scrapers require you to define the id and type for each scraped item. For example, when you scrape a file containing a JSON array, where each array element represents a config item, you must specify the id and type for those items.
You can achieve this by using mappings in your custom scraper configuration.
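As a minimal sketch of such a mapping applied to this scraper (the query, connection URL, and column names are illustrative):

```yaml
clickhouse:
  - clickhouseURL: "clickhouse://user:password@localhost:9000/app" # illustrative
    query: SELECT user_id, user_name FROM users
    type: User        # static value shared by all scraped rows
    id: $.user_id     # JSONPath into each result row
    name: $.user_name
```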
| Field | Description | Scheme |
|---|---|---|
| id* | A static value or JSONPath expression to use as the ID for the resource. | jsonpath |
| name* | A static value or JSONPath expression to use as the name for the resource. | jsonpath |
| type* | A static value or JSONPath expression to use as the type for the resource. | jsonpath |
| class | A static value or JSONPath expression to use as the class for the resource. | jsonpath |
| createFields | A list of JSONPath expressions used to identify the created time of the config. If multiple fields are specified, the first non-empty value is used. | []jsonpath |
| deleteFields | A list of JSONPath expressions used to identify the deleted time of the config. If multiple fields are specified, the first non-empty value is used. | []jsonpath |
| description | A static value or JSONPath expression to use as the description for the resource. | jsonpath |
| format | Format of the config item. Defaults to JSON; available options are JSON and properties. See Formats. | string |
| health | A static value or JSONPath expression to use as the health of the config item. | jsonpath |
| items | A JSONPath expression used to extract individual items from the resource. Items are extracted first, then the ID, name, type and transformations are applied to each item. | jsonpath |
| status | A static value or JSONPath expression to use as the status of the config item. | jsonpath |
| timestampFormat | A Go time format string used to parse timestamps in createFields and deleteFields. Defaults to RFC3339. | string |
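For example, a mapping that derives timestamps from hypothetical `created_at`/`deleted_at` columns stored as `2024-01-31 13:45:00` would pair the field lists with a matching Go layout (a sketch; the column names are illustrative):

```yaml
createFields:
  - $.created_at
deleteFields:
  - $.deleted_at
# Go reference-time layout matching "2024-01-31 13:45:00"
timestampFormat: "2006-01-02 15:04:05"
```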
Formats
JSON
The scraper stores config items as jsonb fields in PostgreSQL.
Resource providers typically return JSON natively, e.g. `kubectl get -o json` or `aws --output=json`.
When you display the config, the UI automatically converts the JSON data to YAML for improved readability.
XML / Properties
The scraper stores non-JSON files as JSON using:
```json
{ "format": "xml", "content": "<root>..</root>" }
```
You can still access non-JSON content in scripts using config.content.
The UI formats and renders XML appropriately.
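As a sketch of reading the raw body from a script, a transform expression could inspect `config.content` directly (assuming the expression environment exposes `toJSON()` as in the earlier example; the output field name is illustrative):

```yaml
transform:
  expr: |
    [{ "raw_length": config.content.size() }].toJSON()
```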
Extracting Changes & Access Logs
When you enable full: true, custom scrapers can ingest changes and access logs from external systems by separating the config data from change events in your source.
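The transformed output then follows the item shape used in the examples further down this page: each item carries its `config` plus optional `changes` and `access_logs` arrays (all values below are illustrative):

```yaml
id: my-item
name: My Item
config:
  key: value
changes:
  - external_change_id: my-item_1
    change_type: update
    summary: value updated
access_logs:
  - external_user_id: alice
    created_at: 2024-01-01T00:00:00Z
```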
AWSS3
| Field | Description | Scheme |
|---|---|---|
| accessKey | AWS access key for authentication | |
| bucket | S3 bucket name | string |
| endpoint | Custom S3 endpoint URL | string |
| path | Path within the S3 bucket | string |
| region | AWS region | string |
| secretKey | AWS secret key for authentication | |
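A sketch of an `awsS3` block with credentials pulled from a Kubernetes secret (the bucket, region, and secret names are illustrative, and the `valueFrom.secretKeyRef` shape is assumed to mirror the `connectionString` usage in the Azure example below):

```yaml
clickhouse:
  - awsS3:
      bucket: my-bucket # illustrative
      region: us-east-1
      path: exports/
      accessKey:
        valueFrom:
          secretKeyRef:
            name: aws-creds # hypothetical secret
            key: access-key
      secretKey:
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: secret-key
    query: |
      SELECT * FROM s3('s3://my-bucket/exports/*.parquet') LIMIT 10
    type: S3Export
    id: $.id
```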
AzureBlobStorage
| Field | Description | Scheme |
|---|---|---|
| collection* | Name of the collection in ClickHouse. See Named Collections. | string |
| account | Azure storage account name | string |
| container | Azure blob container name | string |
| endpoint | Azure endpoint suffix (default: core.windows.net) | string |
| path | Path within the container | string |
A ClickHouse server must be running for this scraper to function.
Mission Control can deploy an instance for you when you set config-db.clickhouse.enabled: true in the Helm chart.
Alternatively, you can use an external ClickHouse server via the clickhouseURL parameter.
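In Helm values form, enabling the bundled instance looks like this (a minimal sketch; the chart key is taken from the note above):

```yaml
# values.yaml for the Mission Control helm chart
config-db:
  clickhouse:
    enabled: true
```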
Configuration Examples
Direct ClickHouse Connection
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-orders
spec:
  clickhouse:
    - clickhouseURL: "clickhouse://user:password@localhost:9000/ecommerce"
      query: |
        SELECT
          order_id,
          customer_id,
          status,
          total_amount,
          created_at
        FROM orders
        WHERE status = 'completed'
        ORDER BY created_at DESC
        LIMIT 1000
      type: Order
      id: $.order_id
```
AWS S3 Integration
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-s3-data
spec:
  clickhouse:
    - awsS3:
        bucket: analytics-data
        path: exports/
        connection: connection://aws
      query: |
        SELECT *
        FROM s3('s3://analytics-data/exports/*.parquet')
        LIMIT 100
      type: AnalyticsData
      id: $.record_id
```
Azure Blob Storage Integration
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-azure-logs
spec:
  clickhouse:
    - azureBlobStorage:
        account: mystorageaccount
        container: logs
        path: application-logs/
        collectionName: ApplicationLogs
        connectionString:
          valueFrom:
            secretKeyRef:
              name: azure-storage
              key: connection-string
      query: |
        SELECT
          timestamp,
          level,
          message,
          application
        FROM azure_blob_storage_table
        WHERE level IN ('ERROR', 'WARN')
      type: LogEntry
      id: $.timestamp
```
Users and Roles
Scrape ClickHouse users, roles, and their permissions from system tables:
```yaml title="clickhouse-iam.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-iam
spec:
  full: true
  clickhouse:
    - clickhouseURL: "clickhouse://admin:password@localhost:9000/system"
      type: Clickhouse::Instance
      id: clickhouse_iam
      transform:
        full: true
        script:
          javascript: |+
            var data = config;
            var result = {
              id: 'clickhouse_iam',
              name: 'ClickHouse IAM',
              config: {
                version: data.version
              },
              external_users: (data.users || []).map(function(u) {
                return {
                  name: u.name,
                  account_id: u.id,
                  user_type: u.storage === 'local_directory' ? 'local' : 'ldap',
                  aliases: [u.id]
                };
              }),
              external_roles: (data.roles || []).map(function(r) {
                return {
                  name: r.name,
                  account_id: r.id,
                  role_type: 'clickhouse_role',
                  aliases: [r.id]
                };
              }),
              external_user_groups: (data.role_grants || []).map(function(g) {
                return {
                  external_user_id: g.user_name || g.role_name,
                  external_group_id: g.granted_role_name
                };
              })
            };
            JSON.stringify([result])
      query: |
        SELECT
          version() as version,
          (
            SELECT groupArray(tuple(id, name, storage))
            FROM system.users
          ) as users,
          (
            SELECT groupArray(tuple(id, name, storage))
            FROM system.roles
          ) as roles,
          (
            SELECT groupArray(tuple(user_name, role_name, granted_role_name))
            FROM system.role_grants
          ) as role_grants
        FORMAT JSONEachRow
```
Query Log for Access Tracking
Use the system.query_log table to track who accessed what data. See Access Logs for more details on the access log schema.
```yaml title="clickhouse-access-logs.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-access-audit
spec:
  full: true
  schedule: "*/15 * * * *"
  clickhouse:
    - clickhouseURL: "clickhouse://admin:password@localhost:9000/system"
      type: Clickhouse::Database
      id: $.database
      transform:
        full: true
        script:
          javascript: |+
            var databases = {};
            var rows = config.rows || [];
            rows.forEach(function(row) {
              var db = row.database || 'default';
              if (!databases[db]) {
                databases[db] = {
                  id: db,
                  name: db,
                  config: { database: db },
                  access_logs: []
                };
              }
              databases[db].access_logs.push({
                external_user_id: row.user,
                created_at: row.event_time,
                properties: {
                  query_type: row.type,
                  tables: row.tables,
                  read_rows: row.read_rows,
                  read_bytes: row.read_bytes,
                  client_hostname: row.client_hostname,
                  client_name: row.client_name,
                  query_duration_ms: row.query_duration_ms
                }
              });
            });
            JSON.stringify(Object.values(databases))
      query: |
        SELECT
          event_time,
          user,
          type,
          query_duration_ms,
          read_rows,
          read_bytes,
          databases as database,
          tables,
          client_hostname,
          client_name
        FROM system.query_log
        WHERE event_time > now() - INTERVAL 1 HOUR
          AND type IN ('QueryFinish', 'QueryStart')
          AND user != 'default'
        ORDER BY event_time DESC
        LIMIT 1000
        FORMAT JSONEachRow
```
Change Tracking from MergeTree Tables
Track changes in MergeTree tables using the _version column or custom audit columns:
```yaml title="clickhouse-changes.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-config-changes
spec:
  full: true
  clickhouse:
    - clickhouseURL: "clickhouse://user:password@localhost:9000/config_db"
      type: Configuration
      id: $.config_id
      transform:
        full: true
        script:
          javascript: |+
            var results = [];
            var rows = config.rows || [];
            rows.forEach(function(row) {
              results.push({
                id: row.config_id,
                name: row.config_name,
                config: {
                  value: row.config_value,
                  environment: row.environment
                },
                changes: (row.history || []).map(function(h) {
                  return {
                    external_change_id: row.config_id + '_' + h.version,
                    change_type: h.change_type,
                    summary: h.description,
                    created_by: h.changed_by,
                    created_at: h.changed_at,
                    details: {
                      old_value: h.old_value,
                      new_value: h.new_value
                    }
                  };
                })
              });
            });
            JSON.stringify(results)
      query: |
        SELECT
          config_id,
          config_name,
          config_value,
          environment,
          (
            SELECT groupArray(tuple(
              version,
              change_type,
              description,
              changed_by,
              changed_at,
              old_value,
              new_value
            ))
            FROM config_history ch
            WHERE ch.config_id = c.config_id
              AND ch.changed_at > now() - INTERVAL 24 HOUR
            ORDER BY changed_at DESC
          ) as history
        FROM configurations c
        WHERE is_active = 1
        FORMAT JSONEachRow
```
Database and Table Inventory
Scrape ClickHouse database structure and table metadata:
```yaml title="clickhouse-inventory.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-inventory
spec:
  clickhouse:
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::Database
      id: $.name
      name: $.name
      query: |
        SELECT
          name,
          engine,
          data_path,
          metadata_path,
          uuid
        FROM system.databases
        WHERE name NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::Table
      id: $.table_id
      name: $.name
      transform:
        relationship:
          - filter: config_type == 'Clickhouse::Table'
            type:
              value: Clickhouse::Database
            name:
              expr: config.database
            parent: true
      query: |
        SELECT
          concat(database, '.', name) as table_id,
          database,
          name,
          engine,
          total_rows,
          total_bytes,
          lifetime_rows,
          lifetime_bytes,
          metadata_modification_time,
          create_table_query,
          partition_key,
          sorting_key,
          primary_key
        FROM system.tables
        WHERE database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
```
Cluster Status
Monitor ClickHouse cluster health and replication:
```yaml title="clickhouse-cluster.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-cluster
spec:
  clickhouse:
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::Cluster
      id: $.cluster
      name: $.cluster
      health: |
        {{- if eq .total_replicas .active_replicas }}healthy
        {{- else }}warning{{- end }}
      query: |
        SELECT
          cluster,
          count() as total_replicas,
          countIf(is_local = 1) as local_replicas,
          countIf(is_local = 0) as remote_replicas,
          sum(errors_count) as total_errors,
          groupArray(host_name) as hosts
        FROM system.clusters
        GROUP BY cluster
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::ReplicatedTable
      id: $.table_id
      name: $.table
      status: $.is_leader
      health: |
        {{- if and (eq .is_readonly 0) (lt .absolute_delay 300) }}healthy
        {{- else if lt .absolute_delay 3600 }}warning
        {{- else }}unhealthy{{- end }}
      query: |
        SELECT
          concat(database, '.', table) as table_id,
          database,
          table,
          engine,
          is_leader,
          is_readonly,
          is_session_expired,
          future_parts,
          parts_to_check,
          zookeeper_path,
          replica_name,
          replica_path,
          columns_version,
          queue_size,
          inserts_in_queue,
          merges_in_queue,
          log_pointer,
          last_queue_update,
          absolute_delay,
          total_replicas,
          active_replicas
        FROM system.replicas
```
Performance Metrics
Track ClickHouse performance and resource usage:
```yaml title="clickhouse-metrics.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: clickhouse-metrics
spec:
  clickhouse:
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::Metrics
      id: clickhouse_metrics
      name: "ClickHouse Performance Metrics"
      query: |
        SELECT
          (SELECT value FROM system.metrics WHERE metric = 'Query') as active_queries,
          (SELECT value FROM system.metrics WHERE metric = 'Merge') as active_merges,
          (SELECT value FROM system.metrics WHERE metric = 'ReplicatedFetch') as replicated_fetches,
          (SELECT value FROM system.metrics WHERE metric = 'ReplicatedSend') as replicated_sends,
          (SELECT value FROM system.metrics WHERE metric = 'TCPConnection') as tcp_connections,
          (SELECT value FROM system.metrics WHERE metric = 'HTTPConnection') as http_connections,
          (SELECT value FROM system.metrics WHERE metric = 'MemoryTracking') as memory_usage_bytes,
          (SELECT sum(value) FROM system.asynchronous_metrics WHERE metric LIKE '%CacheUsed%') as cache_used_bytes
    - clickhouseURL: "clickhouse://user:password@localhost:9000/system"
      type: Clickhouse::DiskUsage
      id: $.name
      name: $.name
      health: |
        {{- $pct := (mulf (divf .free_space .total_space) 100) }}
        {{- if gt $pct 20.0 }}healthy
        {{- else if gt $pct 10.0 }}warning
        {{- else }}unhealthy{{- end }}
      query: |
        SELECT
          name,
          path,
          free_space,
          total_space,
          keep_free_space
        FROM system.disks
```
See Also
- SQL Scraper - Full documentation on transforms, relationships, and access tracking
- ClickHouse Documentation - Official ClickHouse docs