# Logs

The Logs scraper queries log aggregation systems to extract configuration changes from log entries. It supports multiple log backends, including Loki, GCP Cloud Logging, OpenSearch, and BigQuery. This allows you to create configuration items and track changes based on log data.
## Use Cases
- Application Configuration Changes: Track config reloads and updates from application logs
- Deployment Tracking: Monitor deployment events from CI/CD pipeline logs
- Error Analysis: Create configuration items from error patterns in logs
- Audit Trail: Track security and compliance events from audit logs
- Performance Monitoring: Extract performance metrics as configuration changes
```yaml title="logs-scraper.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: app-config-changes
  namespace: mc
spec:
  full: true
  logs:
    - id: None
      type: None
      loki:
        url: http://localhost:3100
        query: '{job="app"} |~ "Configuration reloaded:.*changed from.*to"'
        limit: '50'
        start: 24h
      transform:
        expr: |
          dyn(config.logs).map(line, {
            "changes": [
              {
                "external_change_id": line.hash,
                "change_type": "ConfigReload",
                "external_id": "fdee1b15-4579-499e-adc5-2817735ec3f6",
                "config_type": "Azure::AppRegistration",
                "created_at": line.firstObserved,
                "scraper_id": "all"
              }
            ]
          }).toJSON()

# Push sample log lines to Loki to exercise this query:
# curl -X POST http://localhost:3100/loki/api/v1/push \
#   -H "Content-Type: application/json" \
#   -d '{
#     "streams": [
#       {
#         "stream": {
#           "job": "app"
#         },
#         "values": [
#           ["'$(date +%s%N)'", "Configuration reloaded: database.max_connections changed from 100 to 200"],
#           ["'$(date +%s%N)'", "Configuration reloaded: server.timeout changed from 30s to 60s"],
#           ["'$(date +%s%N)'", "Configuration reloaded: cache.size changed from 1GB to 2GB"]
#         ]
#       }
#     ]
#   }'
```
| Field | Description | Scheme | Required |
|---|---|---|---|
| `schedule` | Specify the interval to scrape in cron format. Defaults to every 60 minutes. | Cron | |
| `retention` | Settings for retaining changes, analysis, and scraped items | Retention | |
| `logs` | Specifies the list of log configurations to scrape. | `[]Logs` | `true` |
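The top-level fields combine as in the following minimal sketch. The name and Loki backend block are illustrative; the cron expression uses the same `@every` form as the BigQuery example later on this page.

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: hourly-log-scrape # hypothetical name
spec:
  schedule: '@every 1h' # omit to use the default of every 60 minutes
  logs:
    - loki:
        url: http://loki:3100
        query: '{job="app"}'
```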
### Logs

| Field | Description | Scheme |
|---|---|---|
| `bigQuery` | BigQuery configuration for log scraping | BigQuery |
| `gcpCloudLogging` | GCP Cloud Logging configuration | GCPCloudLogging |
| `loki` | Loki configuration for log scraping | Loki |
| `openSearch` | OpenSearch configuration for log scraping | OpenSearch |
| `fieldMapping.id` | Source field names containing unique identifiers for deduplication. Common values: `trace_id`, `request_id`, `event_id` | `[]string` |
| `fieldMapping.message` | Source field names containing the main log message content. Common values: `message`, `msg`, `log` | `[]string` |
| `fieldMapping.severity` | Source field names containing log level/severity. Used to categorize log entries. Common values: `level`, `severity` | `[]string` |
| `fieldMapping.timestamp` | Source field names containing timestamp data. The scraper tries each field in order until it finds a non-empty value. Common values: `@timestamp`, `timestamp`, `time` | `[]string` |
## Field Mapping

Different log systems use different field names for the same data. For example:

- Timestamp: Elasticsearch uses `@timestamp`, Loki uses `ts`, CloudWatch uses `timestamp`
- Message: Some systems use `message`, others use `msg`, `log`, or `text`
- Severity: Could be `level`, `severity`, `log_level`, or `priority`

Field mapping normalizes these differences so your transform expressions work consistently regardless of the log source.
### How It Works

When you specify multiple field names, the scraper tries each one in order and uses the first non-empty value:

```yaml
fieldMapping:
  timestamp: ['@timestamp', 'timestamp', 'time'] # Try @timestamp first, then timestamp, then time
  message: ['message', 'msg', 'log']
  severity: ['level', 'severity']
  id: ['trace_id', 'request_id', 'event_id']
```
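Once fields are mapped, transform expressions can rely on the normalized names regardless of which backend produced the entry. A minimal sketch, assuming a hypothetical `app` job and `LogEvent` change type; `line.message` and `line.firstObserved` are the same normalized fields the other examples on this page use:

```yaml
logs:
  - loki:
      url: http://loki:3100
      query: '{job="app"}'
    # Loki's raw fields are resolved into the normalized names below
    fieldMapping:
      timestamp: ['ts', 'timestamp']
      message: ['line', 'msg']
    transform:
      expr: |
        dyn(config.logs).map(line, {
          "changes": [{
            "change_type": "LogEvent",
            "summary": line.message,
            "created_at": line.firstObserved
          }]
        }).toJSON()
```

Because the transform reads only the normalized fields, the same `expr` would work unchanged against an OpenSearch source with a different `fieldMapping`.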
### When to Use Field Mapping
Use field mapping when:
- Your logs come from multiple sources with different schemas
- You're migrating between log systems and field names changed
- Third-party applications use non-standard field names
- You want transforms to be portable across different backends
### Example: Normalizing ELK and Loki logs

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: unified-app-logs
spec:
  logs:
    # OpenSearch/Elasticsearch logs use @timestamp and message
    - openSearch:
        url: https://elasticsearch:9200
        index: app-logs-*
      fieldMapping:
        timestamp: ['@timestamp']
        message: ['message']
        severity: ['level', 'log.level']
        id: ['trace.id', 'request_id']
    # Loki logs use different field names
    - loki:
        url: http://loki:3100
        query: '{app="myservice"}'
      fieldMapping:
        timestamp: ['ts', 'timestamp']
        message: ['line', 'msg']
        severity: ['level', 'detected_level']
        id: ['traceID']
```
## BigQuery

| Field | Description | Scheme |
|---|---|---|
| `project`* | GCP project ID containing the BigQuery dataset | |
| `query`* | SQL query to execute against BigQuery | |
| `credentials` | GCP service account credentials | |
### Example

```yaml title="bigquery-github-commits.yaml"
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: github-commits-logs
  namespace: mc
spec:
  full: true
  schedule: '@every 1h'
  logs:
    - name: golang-github-commits
      id: None
      type: None
      transform:
        expr: |
          dyn(config.logs).map(line, {
            "changes": [
              {
                "external_change_id": line.id,
                "change_type": "Commit",
                "external_id": "github://golang/go",
                "config_type": "GitHub::Repository",
                "created_at": line.firstObserved,
                "summary": line.message,
                "scraper_id": "all"
              }
            ]
          }).toJSON()
      bigQuery:
        project: workload-prod-eu-02
        query: |
          SELECT
            FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%SZ', TIMESTAMP_SECONDS(committer.date.seconds)) AS timestamp,
            CASE
              WHEN REGEXP_CONTAINS(LOWER(message), r'fix|bug|error') THEN 'high'
              WHEN REGEXP_CONTAINS(LOWER(message), r'feat|add|new') THEN 'medium'
              ELSE 'info'
            END AS severity,
            message,
            commit
          FROM `bigquery-public-data.github_repos.commits`
          WHERE 'golang/go' IN UNNEST(repo_name)
          ORDER BY committer.date.seconds DESC
          LIMIT 100
      fieldMapping:
        timestamp: ['timestamp']
        severity: ['severity']
        message: ['message']
        id: ['commit']
```
## GCPCloudLogging

| Field | Description | Scheme |
|---|---|---|
| `project`* | GCP project ID | |
| `credentials` | GCP service account credentials | |
| `filter` | Cloud Logging filter query | |
| `orderBy` | Field to order results by | |
| `pageSize` | Number of entries per page | |
### Example

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: gcp-audit-logs
spec:
  logs:
    - gcpCloudLogging:
        project: my-gcp-project
        filter: |
          protoPayload.serviceName="compute.googleapis.com"
          protoPayload.methodName:"compute.instances"
        orderBy: timestamp desc
        pageSize: 100
      transform:
        expr: |
          dyn(config.logs).map(line, {
            "changes": [{
              "change_type": "GCPResourceChange",
              "external_id": line.resource.labels.instance_id,
              "config_type": "GCP::Instance",
              "created_at": line.timestamp,
              "summary": line.protoPayload.methodName
            }]
          }).toJSON()
```
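A sketch of supplying service account credentials from a Kubernetes secret, assuming the `credentials` field accepts the same secret-reference shape used for the OpenSearch `username`/`password` fields later on this page; the secret name and key are hypothetical:

```yaml
logs:
  - gcpCloudLogging:
      project: my-gcp-project
      credentials:
        valueFrom:
          secretKeyRef:
            name: gcp-sa-creds # hypothetical secret holding a service account key
            key: credentials.json
```

When `credentials` is omitted, the scraper would typically fall back to the ambient GCP credentials available to the workload.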
## Loki

| Field | Description | Scheme |
|---|---|---|
| `query`* | LogQL query to execute | |
| `url`* | Loki server URL | |
| `end` | End time for the query | |
| `limit` | Maximum number of log entries to return | |
| `password` | Basic auth password | |
| `start` | Start time for the query (e.g., '24h', '7d') | |
| `username` | Basic auth username | |
Example
loki-config-changes.yamlapiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
name: app-config-changes
namespace: mc
spec:
full: true
logs:
- id: None
type: None
loki:
url: http://localhost:3100
query: '{job="app"} |~ "Configuration reloaded:.*changed from.*to"'
limit: '50'
start: 24h
transform:
expr: |
dyn(config.logs).map(line, {
"changes": [
{
"external_change_id": line.hash,
"change_type": "ConfigReload",
"external_id": "fdee1b15-4579-499e-adc5-2817735ec3f6",
"config_type": "Azure::AppRegistration",
"created_at": line.firstObserved,
"scraper_id": "all"
}
]
}).toJSON()
# curl -X POST http://localhost:3100/loki/api/v1/push \
# -H "Content-Type: application/json" \
# -d '{
# "streams": [
# {
# "stream": {
# "job": "app"
# },
# "values": [
# ["'$(date +%s%N)'", "Configuration reloaded: database.max_connections changed from 100 to 200"],
# ["'$(date +%s%N)'", "Configuration reloaded: server.timeout changed from 30s to 60s"],
# ["'$(date +%s%N)'", "Configuration reloaded: cache.size changed from 1GB to 2GB"]
# ]
# }
# ]
# }'
## OpenSearch

| Field | Description | Scheme |
|---|---|---|
| `index`* | Index name or pattern | |
| `url`* | OpenSearch cluster URL | |
| `password` | Basic auth password | |
| `query` | OpenSearch query DSL | |
| `size` | Number of results to return | |
| `username` | Basic auth username | |
### Example

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: opensearch-security-events
spec:
  logs:
    - openSearch:
        url: https://opensearch-cluster:9200
        index: security-logs-*
        query: |
          {
            "query": {
              "bool": {
                "must": [
                  {"term": {"event_type": "authentication"}},
                  {"range": {"@timestamp": {"gte": "now-1h"}}}
                ]
              }
            }
          }
        size: 1000
        username:
          valueFrom:
            secretKeyRef:
              name: opensearch-creds
              key: username
        password:
          valueFrom:
            secretKeyRef:
              name: opensearch-creds
              key: password
      fieldMapping:
        timestamp: ['@timestamp', 'timestamp']
        message: ['message', 'event_description']
        severity: ['severity', 'log_level']
        id: ['event_id', 'transaction_id']
```