API-Based Custom Integrations
Topics
RESTful API framework for custom integration development.
Webhook Integration Framework
Inbound Webhook Configuration
Configure Energylogserver to receive webhooks from external systems:
# Webhook Input Configuration
input {
http {
port => 8080
host => "0.0.0.0"
# Authentication
user => "${WEBHOOK_USER}"
password => "${WEBHOOK_PASSWORD}"
# SSL Configuration
ssl => true
ssl_certificate => "/opt/els/ssl/webhook.crt"
ssl_key => "/opt/els/ssl/webhook.key"
# Request validation
verify_certificate => true
ssl_certificate_authorities => ["/opt/els/ssl/ca.crt"]
# Codec configuration
codec => "json"
}
}
filter {
if [headers] {
# Route based on webhook source
if [headers][x-webhook-source] == "github" {
mutate {
add_field => { "event_source" => "github_security" }
add_field => { "event_type" => "code_security" }
}
}
if [headers][x-webhook-source] == "jira" {
mutate {
add_field => { "event_source" => "jira_security" }
add_field => { "event_type" => "incident_management" }
}
}
# Extract authentication info
if [headers][authorization] {
grok {
match => { "[headers][authorization]" => "Bearer %{DATA:api_token}" }
}
# Validate token
if [api_token] != "${EXPECTED_TOKEN}" {
drop { }
}
}
}
}
output {
elasticsearch {
hosts => ["${ELS_DATA_NODE_HOST}:9200"]
index => "webhook-events-%{+YYYY.MM.dd}"
# Use webhook source for routing
template_name => "webhook-events"
template_pattern => "webhook-events-*"
}
}
Database Integration
JDBC Input Configuration
Integrate with external databases for enrichment and data correlation:
# Database Integration Input
input {
jdbc {
jdbc_connection_string => "jdbc:sqlserver://db.company.com:1433;databaseName=SecurityDB"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
jdbc_driver_library => "/opt/logstash/vendor/jar/jdbc/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Scheduled query execution
schedule => "*/15 * * * *"
# Query for threat intelligence data
statement => "
SELECT
indicator_value,
indicator_type,
threat_level,
source_feed,
created_date,
expiration_date
FROM threat_indicators
WHERE updated_date > ?
ORDER BY updated_date
"
# Parameters for incremental updates
use_column_value => true
tracking_column => "updated_date"
tracking_column_type => "timestamp"
# Performance optimization
jdbc_page_size => 50000
jdbc_paging_enabled => true
}
}
filter {
if [jdbc_input] {
# Transform database fields
mutate {
rename => {
"indicator_value" => "threat_indicator"
"indicator_type" => "threat_type"
"threat_level" => "severity"
}
}
# Add metadata
mutate {
add_field => {
"data_source" => "threat_intelligence_db"
"enrichment_type" => "threat_intel"
}
}
# Date handling
date {
match => [ "created_date", "yyyy-MM-dd HH:mm:ss" ]
target => "created_timestamp"
}
}
}
Message Queue Integration
Apache Kafka Integration
# Kafka Input Configuration
input {
kafka {
bootstrap_servers => "kafka1.company.com:9092,kafka2.company.com:9092,kafka3.company.com:9092"
topics => ["security-events", "network-logs", "application-logs"]
# Consumer Configuration
group_id => "energy-logserver-consumers"
client_id => "els-kafka-consumer"
# Security Configuration (SASL/SSL)
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_USER}' password='${KAFKA_PASSWORD}';"
# SSL Configuration
ssl_truststore_location => "/opt/els/ssl/kafka.truststore.jks"
ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
ssl_keystore_location => "/opt/els/ssl/kafka.keystore.jks"
ssl_keystore_password => "${KAFKA_KEYSTORE_PASSWORD}"
# Performance tuning
max_poll_records => 500
session_timeout_ms => 30000
# Offset management
auto_offset_reset => "earliest"
enable_auto_commit => false
# Codec configuration
codec => "json"
}
}
# Kafka Output for forwarding events
output {
kafka {
bootstrap_servers => "kafka1.company.com:9092,kafka2.company.com:9092"
topic_id => "energy-logserver-alerts"
# Message configuration
codec => json
key_serializer => "org.apache.kafka.common.serialization.StringSerializer"
value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
# Performance optimization
batch_size => 16384
linger_ms => 5
compression_type => "gzip"
# Reliability configuration
acks => "all"
retries => 2147483647
max_in_flight_requests_per_connection => 5
}
}
Integration Testing and Validation
Integration Health Monitoring
Automated Integration Testing
#!/bin/bash
# Integration Health Check Script
INTEGRATION_LOG="/var/log/energy-logserver/integration-health.log"
echo "$(date): Starting integration health checks" >> $INTEGRATION_LOG
# Test Splunk Integration
test_splunk_integration() {
echo "Testing Splunk HEC connectivity..." >> $INTEGRATION_LOG
curl -k -X POST "https://splunk.company.com:8088/services/collector" \
-H "Authorization: Splunk ${SPLUNK_HEC_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"event": "test_connectivity", "source": "els_integration_test"}' \
--max-time 10
if [ $? -eq 0 ]; then
echo "✓ Splunk integration: HEALTHY" >> $INTEGRATION_LOG
else
echo "✗ Splunk integration: FAILED" >> $INTEGRATION_LOG
fi
}
# Test API Endpoint Connectivity
test_api_endpoints() {
echo "Testing API endpoint connectivity..." >> $INTEGRATION_LOG
# Test internal ELS API
API_RESPONSE=$(curl -s -k "https://localhost:9200/_cluster/health" \
-H "Authorization: ApiKey ${ELS_API_KEY}")
if echo "$API_RESPONSE" | grep -q "green\|yellow"; then
echo "✓ ELS Data Node API: HEALTHY" >> $INTEGRATION_LOG
else
echo "✗ ELS Data Node API: FAILED" >> $INTEGRATION_LOG
fi
# Test external webhook endpoint
curl -s -X POST "https://els.company.com:8080/webhook" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${WEBHOOK_TOKEN}" \
-d '{"test": "connectivity_check"}' \
--max-time 5
if [ $? -eq 0 ]; then
echo "✓ Webhook endpoint: HEALTHY" >> $INTEGRATION_LOG
else
echo "✗ Webhook endpoint: FAILED" >> $INTEGRATION_LOG
fi
}
# Test Database Connectivity
test_database_connections() {
echo "Testing database connectivity..." >> $INTEGRATION_LOG
# Test threat intel database
sqlcmd -S db.company.com -U ${DB_USER} -P ${DB_PASSWORD} \
-Q "SELECT TOP 1 * FROM threat_indicators" -t 10
if [ $? -eq 0 ]; then
echo "✓ Threat Intel Database: HEALTHY" >> $INTEGRATION_LOG
else
echo "✗ Threat Intel Database: FAILED" >> $INTEGRATION_LOG
fi
}
# Test Message Queue Connectivity
test_message_queues() {
echo "Testing message queue connectivity..." >> $INTEGRATION_LOG
# Test Kafka connectivity
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kafka1.company.com:9092 > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "✓ Kafka cluster: HEALTHY" >> $INTEGRATION_LOG
else
echo "✗ Kafka cluster: FAILED" >> $INTEGRATION_LOG
fi
}
# Execute all tests
test_splunk_integration
test_api_endpoints
test_database_connections
test_message_queues
echo "$(date): Integration health checks completed" >> $INTEGRATION_LOG
Performance Monitoring
Integration Performance Metrics
# Integration Performance Monitoring Pipeline
input {
beats {
port => 5044
type => "integration_metrics"
}
}
filter {
if [type] == "integration_metrics" {
# Parse integration performance data
if [integration_type] == "splunk_hec" {
grok {
match => {
"message" => "Splunk HEC response time: %{NUMBER:response_time:float}ms, status: %{NUMBER:status_code:int}"
}
}
if [response_time] > 5000 {
mutate {
add_field => { "performance_alert" => "true" }
add_field => { "alert_reason" => "High Splunk HEC response time" }
}
}
}
if [integration_type] == "api_endpoint" {
grok {
match => {
"message" => "API endpoint %{WORD:endpoint} response time: %{NUMBER:response_time:float}ms"
}
}
# Calculate response time percentiles
aggregate {
task_id => "%{endpoint}"
code => "
map['response_times'] ||= []
map['response_times'] << event.get('response_time')
# Keep only last 100 measurements
if map['response_times'].length > 100
map['response_times'].shift
end
# Calculate percentiles
sorted_times = map['response_times'].sort
event.set('p95_response_time', sorted_times[(sorted_times.length * 0.95).to_i])
event.set('p99_response_time', sorted_times[(sorted_times.length * 0.99).to_i])
"
}
}
}
}
output {
if [type] == "integration_metrics" {
elasticsearch {
hosts => ["${ELS_DATA_NODE_HOST}:9200"]
index => "integration-metrics-%{+YYYY.MM.dd}"
}
# Send performance alerts
if [performance_alert] == "true" {
http {
url => "https://alerts.company.com/webhook"
http_method => "post"
headers => {
"Content-Type" => "application/json"
"Authorization" => "Bearer ${ALERT_WEBHOOK_TOKEN}"
}
format => "json"
mapping => {
"alert_type" => "integration_performance"
"message" => "%{alert_reason}"
"integration" => "%{integration_type}"
"metrics" => {
"response_time" => "%{response_time}"
"endpoint" => "%{endpoint}"
}
}
}
}
}
}