PubSub
The PubSub scraper subscribes to message queues and pub/sub systems to consume messages and create configuration items from them. This enables real-time configuration tracking based on events and messages published to various messaging systems.
pubsub-scraper.yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"
| Field | Description | Scheme | Required |
|---|---|---|---|
| schedule | Specify the interval to scrape in cron format. Defaults to every 60 minutes. | Cron | |
| retention | Settings for retaining changes, analysis and scraped items | Retention | |
| pubsub | Specifies the list of PubSub configurations to scrape. | []PubSub | true |
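As a minimal sketch of the spec-level fields above, the following config overrides the default 60-minute interval. The 15-minute cron expression and the metadata name are illustrative choices; the remaining values are reused from the example at the top of this page.

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check-frequent # hypothetical name
spec:
  # Standard 5-field cron expression: run every 15 minutes
  schedule: "*/15 * * * *"
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
```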
PubSub
Mapping
Custom scrapers require you to define the id and type for each scraped item. For example, when you scrape a file containing a JSON array, where each array element represents a config item, you must specify the id and type for those items.
You can achieve this by using mappings in your custom scraper configuration.
| Field | Description | Scheme |
|---|---|---|
| id* | A static value or JSONPath expression to use as the ID for the resource. | |
| name* | A static value or JSONPath expression to use as the name for the resource. | |
| type* | A static value or JSONPath expression to use as the type for the resource. | |
| class | A static value or JSONPath expression to use as the class for the resource. | |
| createFields | A list of JSONPath expressions used to identify the created time of the config. If multiple fields are specified, the first non-empty value will be used. | []jsonpath |
| deleteFields | A list of JSONPath expressions used to identify the deleted time of the config. If multiple fields are specified, the first non-empty value will be used. | []jsonpath |
| description | A static value or JSONPath expression to use as the description for the resource. | |
| format | Format of config item, defaults to JSON, available options are JSON, properties. See Formats | |
| health | A static value or JSONPath expression to use as the health of the config item. | |
| items | A JSONPath expression to use to extract individual items from the resource. Items are extracted first and then the ID, Name, Type and transformations are applied for each item. | |
| status | A static value or JSONPath expression to use as the status of the config item. | |
| timestampFormat | A Go time format string used to parse timestamps in createFields and deleteFields. (Default: RFC3339) | |
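To illustrate how the mapping fields combine, here is a sketch of a Pub/Sub entry that uses several of them. The project, subscription, and every payload field (`events`, `event_id`, `title`, `created_at`, `received_at`) are hypothetical; only the mapping keys themselves come from the table above.

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-mapping-example # hypothetical name
spec:
  pubsub:
    - pubsub:
        project_id: example-project # hypothetical
        subscription: example-sub # hypothetical
      # Assumes each message carries an "events" array;
      # every element becomes one config item
      items: $.events
      # JSONPath expressions, evaluated per extracted item
      id: $.event_id
      name: $.title
      # Static value shared by all items
      type: Example::Event
      # The first non-empty field is used as the created time
      createFields:
        - $.created_at
        - $.received_at
      # Go layout string; this one is equivalent to the RFC3339 default
      timestampFormat: "2006-01-02T15:04:05Z07:00"
```

Because `items` is applied first, the `id`, `name`, and `createFields` paths are relative to each array element rather than to the whole message.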
Formats
JSON
The scraper stores config items as jsonb fields in PostgreSQL.
Resource providers typically return JSON directly, for example through kubectl get -o json or aws --output=json.
When you display the config, the UI automatically converts the JSON data to YAML for improved readability.
XML / Properties
The scraper stores non-JSON files as JSON using:
{ 'format': 'xml', 'content': '<root>..</root>' }
You can still access non-JSON content in scripts using config.content.
The UI formats and renders XML appropriately.
Extracting Changes & Access Logs
When you enable full: true, custom scrapers can ingest changes and access logs from external systems by separating the config data from change events in your source.
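The sketch below shows one way this could look for a Pub/Sub source. It assumes that `full` is set at the spec level and that each message separates its data under `config` and `changes` keys, matching the keys used by the transform examples on this page; the placement of `full`, the payload fields, and all names are assumptions to verify against your version.

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-full-example # hypothetical name
spec:
  # Assumption: full is a spec-level flag
  full: true
  pubsub:
    - pubsub:
        project_id: example-project # hypothetical
        subscription: example-sub # hypothetical
      type: Example::Item
      id: $.item_id
# Assumed shape of one message (hypothetical fields):
#   item_id: A123
#   config:            # stored as the config item
#     owner: team-a
#   changes:           # recorded as change events
#     - change_type: Updated
#       summary: owner changed to team-a
```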
QueueConfig
The PubSub scraper supports various message queue systems. Currently, GCP Pub/Sub is the primary supported system.
GCP Pub/Sub Configuration
Use Cases
- Event-Driven Configuration: React to configuration changes published to message queues
- Microservices Communication: Track service state changes communicated via pub/sub
- Alert Processing: Convert alert notifications into configuration changes
- Real-time Monitoring: Process streaming configuration data from various sources
- Integration Hub: Consume configuration events from multiple systems through a unified queue
Configuration Examples
GCP Pub/Sub Integration
pubsub-gcp.yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"
Multi-Message Processing
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-deployment-events
spec:
  pubsub:
    - pubsub:
        project_id: devops-project
        subscription: deployment-events
        credentials:
          valueFrom:
            secretKeyRef:
              name: gcp-credentials
              key: service-account.json
      maxMessages: 50
      type: DeploymentEvent
      id: $.deployment_id
      transform:
        expr: |
          dyn(config).map(msg, {
            "name": msg.service_name + "-" + msg.version,
            "type": "Service::Deployment",
            "config": msg,
            "changes": [{
              "change_type": msg.event_type,
              "external_id": msg.deployment_id,
              "summary": "Deployed " + msg.service_name + " version " + msg.version,
              "severity": msg.event_type == "deployment_failed" ? "high" : "info",
              "created_at": msg.timestamp
            }]
          })
Message Filtering and Processing
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-config-changes
spec:
  pubsub:
    - pubsub:
        project_id: config-management
        subscription: config-change-notifications
      maxMessages: 200
      type: ConfigurationChange
      id: $.change_id
      transform:
        expr: |
          dyn(config).
            filter(msg, msg.event_type == "configuration_updated").
            map(msg, {
              "name": msg.component_name,
              "type": "Configuration",
              "config": {
                "component": msg.component_name,
                "environment": msg.environment,
                "old_config": msg.previous_config,
                "new_config": msg.current_config
              },
              "changes": [{
                "change_type": "ConfigurationUpdate",
                "external_id": msg.change_id,
                "summary": "Configuration updated for " + msg.component_name,
                "severity": msg.impact_level,
                "created_at": msg.timestamp,
                "diff": msg.config_diff
              }]
            })
Best Practices
- Message Acknowledgment: Messages are automatically acknowledged after successful processing
- Error Handling: Failed message processing will be retried based on the Pub/Sub subscription settings
- Batch Processing: Use maxMessages to control throughput and resource usage
- Transform Expressions: Use CEL expressions to filter and transform messages into the desired configuration format