API-Based Custom Integrations

This section describes the RESTful API framework that Energy Logserver provides for custom integration development: webhook ingestion, database integration, message queue connectivity, and integration testing and monitoring.
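Where a push-style integration is not available, external tooling can also pull data over the REST API. A minimal sketch (the cluster-health endpoint and ApiKey header mirror the health-check script later in this section; host, port, and key are placeholders):

curl -s -k "https://localhost:9200/_cluster/health?pretty" \
     -H "Authorization: ApiKey ${ELS_API_KEY}"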

Webhook Integration Framework

Inbound Webhook Configuration

Configure Energy Logserver to receive webhooks from external systems:

# Webhook Input Configuration
input {
  http {
    port => 8080
    host => "0.0.0.0"
    
    # Authentication
    user => "${WEBHOOK_USER}"
    password => "${WEBHOOK_PASSWORD}"
    
    # SSL Configuration
    ssl => true
    ssl_certificate => "/opt/els/ssl/webhook.crt"
    ssl_key => "/opt/els/ssl/webhook.key"
    
    # Client certificate validation
    ssl_verify_mode => "force_peer"
    ssl_certificate_authorities => ["/opt/els/ssl/ca.crt"]
    
    # Codec configuration
    codec => "json"
  }
}

filter {
  if [headers] {
    # Route based on webhook source. With the default (non-ECS) field
    # layout, the http input lowercases header names and prefixes them
    # with "http_".
    if [headers][http_x_webhook_source] == "github" {
      mutate {
        add_field => { "event_source" => "github_security" }
        add_field => { "event_type" => "code_security" }
      }
    }
    
    if [headers][http_x_webhook_source] == "jira" {
      mutate {
        add_field => { "event_source" => "jira_security" }
        add_field => { "event_type" => "incident_management" }
      }
    }
    
    # Extract authentication info
    if [headers][http_authorization] {
      grok {
        match => { "[headers][http_authorization]" => "Bearer %{DATA:api_token}" }
      }
      
      # Validate the token. Environment variables are substituted only in
      # plugin settings, not in conditionals, so copy the expected value
      # into a metadata field first.
      mutate {
        add_field => { "[@metadata][expected_token]" => "${EXPECTED_TOKEN}" }
      }
      if [api_token] != [@metadata][expected_token] {
        drop { }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["${ELS_DATA_NODE_HOST}:9200"]
    index => "webhook-events-%{+YYYY.MM.dd}"
    
    # Apply the webhook-events index template to these indices
    template_name => "webhook-events"
  }
}
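With the pipeline loaded, the endpoint can be smoke-tested from the command line. For example (hostname, payload, and client certificate paths are placeholders; a client certificate is needed because ssl_verify_mode is set to force_peer above):

curl -s -X POST "https://els.company.com:8080/" \
     --cacert /opt/els/ssl/ca.crt \
     --cert /path/to/client.crt --key /path/to/client.key \
     -u "${WEBHOOK_USER}:${WEBHOOK_PASSWORD}" \
     -H "Content-Type: application/json" \
     -H "X-Webhook-Source: github" \
     -d '{"event": "test_connectivity"}'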

Database Integration

JDBC Input Configuration

Integrate with external databases for enrichment and data correlation:

# Database Integration Input
input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://db.company.com:1433;databaseName=SecurityDB"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/opt/logstash/vendor/jar/jdbc/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    
    # Scheduled query execution
    schedule => "*/15 * * * *"
    
    # Query for threat intelligence data
    statement => "
      SELECT 
        indicator_value,
        indicator_type,
        threat_level,
        source_feed,
        created_date,
        expiration_date
      FROM threat_indicators 
      WHERE updated_date > :sql_last_value
      ORDER BY updated_date
    "
    
    # Incremental updates: :sql_last_value is populated from the tracking column
    use_column_value => true
    tracking_column => "updated_date"
    tracking_column_type => "timestamp"
    
    # Performance optimization
    jdbc_page_size => 50000
    jdbc_paging_enabled => true
    
    # Tag events so the filter below can identify them
    tags => ["jdbc_input"]
  }
}

filter {
  if "jdbc_input" in [tags] {
    # Transform database fields
    mutate {
      rename => {
        "indicator_value" => "threat_indicator"
        "indicator_type" => "threat_type"
        "threat_level" => "severity"
      }
    }
    
    # Add metadata
    mutate {
      add_field => { 
        "data_source" => "threat_intelligence_db"
        "enrichment_type" => "threat_intel"
      }
    }
    
    # Date handling (some drivers return ISO8601 strings depending on column type)
    date {
      match => [ "created_date", "yyyy-MM-dd HH:mm:ss", "ISO8601" ]
      target => "created_timestamp"
    }
  }
}
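The input and filter above still need an output. A minimal sketch that writes the indicators to a dedicated enrichment index (the index name and the use of the indicator value as document ID are assumptions, not fixed conventions):

output {
  if "jdbc_input" in [tags] {
    elasticsearch {
      hosts => ["${ELS_DATA_NODE_HOST}:9200"]
      # Hypothetical enrichment index name
      index => "threat-intel"
      # Re-importing the same indicator updates the existing document
      document_id => "%{threat_indicator}"
    }
  }
}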

Message Queue Integration

Apache Kafka Integration

# Kafka Input Configuration
input {
  kafka {
    bootstrap_servers => "kafka1.company.com:9092,kafka2.company.com:9092,kafka3.company.com:9092"
    topics => ["security-events", "network-logs", "application-logs"]
    
    # Consumer Configuration
    group_id => "energy-logserver-consumers"
    client_id => "els-kafka-consumer"
    
    # Security Configuration (SASL/SSL)
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_USER}' password='${KAFKA_PASSWORD}';"
    
    # SSL Configuration
    ssl_truststore_location => "/opt/els/ssl/kafka.truststore.jks"
    ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
    ssl_keystore_location => "/opt/els/ssl/kafka.keystore.jks"
    ssl_keystore_password => "${KAFKA_KEYSTORE_PASSWORD}"
    
    # Performance tuning
    max_poll_records => 500
    session_timeout_ms => 30000
    
    # Offset management
    auto_offset_reset => "earliest"
    enable_auto_commit => false
    
    # Codec configuration
    codec => "json"
  }
}

# Kafka Output for forwarding events
output {
  kafka {
    bootstrap_servers => "kafka1.company.com:9092,kafka2.company.com:9092"
    topic_id => "energy-logserver-alerts"
    
    # Message configuration
    codec => json
    key_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    
    # Performance optimization
    batch_size => 16384
    linger_ms => 5
    compression_type => "gzip"
    
    # Reliability configuration
    acks => "all"
    retries => 2147483647
    # More than one in-flight request can reorder retried batches;
    # reduce to 1 if strict ordering matters
    max_in_flight_requests_per_connection => 5
  }
}
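To verify the output end to end, the standard Kafka CLI tools can tail the alerts topic (the Kafka installation path and broker address are assumptions):

/opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka1.company.com:9092 \
    --topic energy-logserver-alerts \
    --from-beginning --max-messages 5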

Integration Testing and Validation

Integration Health Monitoring

Automated Integration Testing

#!/bin/bash
# Integration Health Check Script
INTEGRATION_LOG="/var/log/energy-logserver/integration-health.log"

echo "$(date): Starting integration health checks" >> $INTEGRATION_LOG

# Test Splunk Integration
test_splunk_integration() {
    echo "Testing Splunk HEC connectivity..." >> $INTEGRATION_LOG
    
    # --fail makes curl exit non-zero on HTTP errors, so the status
    # check below is meaningful
    curl -k -s --fail -o /dev/null -X POST "https://splunk.company.com:8088/services/collector" \
         -H "Authorization: Splunk ${SPLUNK_HEC_TOKEN}" \
         -H "Content-Type: application/json" \
         -d '{"event": "test_connectivity", "source": "els_integration_test"}' \
         --max-time 10
    
    if [ $? -eq 0 ]; then
        echo "✓ Splunk integration: HEALTHY" >> $INTEGRATION_LOG
    else
        echo "✗ Splunk integration: FAILED" >> $INTEGRATION_LOG
    fi
}

# Test API Endpoint Connectivity
test_api_endpoints() {
    echo "Testing API endpoint connectivity..." >> $INTEGRATION_LOG
    
    # Test internal ELS API
    API_RESPONSE=$(curl -s -k "https://localhost:9200/_cluster/health" \
                        -H "Authorization: ApiKey ${ELS_API_KEY}")
    
    if echo "$API_RESPONSE" | grep -q "green\|yellow"; then
        echo "✓ ELS Data Node API: HEALTHY" >> $INTEGRATION_LOG
    else
        echo "✗ ELS Data Node API: FAILED" >> $INTEGRATION_LOG
    fi
    
    # Test external webhook endpoint
    curl -s --fail -o /dev/null -X POST "https://els.company.com:8080/webhook" \
         -H "Content-Type: application/json" \
         -H "Authorization: Bearer ${WEBHOOK_TOKEN}" \
         -d '{"test": "connectivity_check"}' \
         --max-time 5
    
    if [ $? -eq 0 ]; then
        echo "✓ Webhook endpoint: HEALTHY" >> $INTEGRATION_LOG
    else
        echo "✗ Webhook endpoint: FAILED" >> $INTEGRATION_LOG
    fi
}

# Test Database Connectivity
test_database_connections() {
    echo "Testing database connectivity..." >> $INTEGRATION_LOG
    
    # Test threat intel database
    sqlcmd -S db.company.com -U "${DB_USER}" -P "${DB_PASSWORD}" \
           -Q "SELECT TOP 1 * FROM threat_indicators" -t 10 > /dev/null 2>&1
    
    if [ $? -eq 0 ]; then
        echo "✓ Threat Intel Database: HEALTHY" >> $INTEGRATION_LOG
    else
        echo "✗ Threat Intel Database: FAILED" >> $INTEGRATION_LOG
    fi
}

# Test Message Queue Connectivity
test_message_queues() {
    echo "Testing message queue connectivity..." >> $INTEGRATION_LOG
    
    # Test Kafka connectivity
    /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kafka1.company.com:9092 > /dev/null 2>&1
    
    if [ $? -eq 0 ]; then
        echo "✓ Kafka cluster: HEALTHY" >> $INTEGRATION_LOG
    else
        echo "✗ Kafka cluster: FAILED" >> $INTEGRATION_LOG
    fi
}

# Execute all tests
test_splunk_integration
test_api_endpoints
test_database_connections
test_message_queues

echo "$(date): Integration health checks completed" >> $INTEGRATION_LOG

Performance Monitoring

Integration Performance Metrics

# Integration Performance Monitoring Pipeline
input {
  beats {
    port => 5044
    type => "integration_metrics"
  }
}
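The pipeline expects a Beats shipper on port 5044. A minimal Filebeat sketch that tails the health-check log from the previous section (the Logstash host and the integration_type value are assumptions):

# filebeat.yml (excerpt)
filebeat.inputs:
  - type: log
    paths:
      - /var/log/energy-logserver/integration-health.log
    fields:
      integration_type: "api_endpoint"
    fields_under_root: true

output.logstash:
  hosts: ["els.company.com:5044"]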

filter {
  if [type] == "integration_metrics" {
    # Parse integration performance data
    if [integration_type] == "splunk_hec" {
      grok {
        match => { 
          "message" => "Splunk HEC response time: %{NUMBER:response_time:float}ms, status: %{NUMBER:status_code:int}" 
        }
      }
      
      # Flag responses slower than 5 seconds
      if [response_time] > 5000 {
        mutate {
          add_field => { "performance_alert" => "true" }
          add_field => { "alert_reason" => "High Splunk HEC response time" }
        }
      }
    }
    
    if [integration_type] == "api_endpoint" {
      grok {
        match => { 
          "message" => "API endpoint %{WORD:endpoint} response time: %{NUMBER:response_time:float}ms" 
        }
      }
      
      # Calculate response time percentiles per endpoint
      # (the aggregate filter requires a single pipeline worker: -w 1)
      aggregate {
        task_id => "%{endpoint}"
        code => "
          map['response_times'] ||= []
          map['response_times'] << event.get('response_time')
          
          # Keep only last 100 measurements
          if map['response_times'].length > 100
            map['response_times'].shift
          end
          
          # Calculate percentiles
          sorted_times = map['response_times'].sort
          event.set('p95_response_time', sorted_times[(sorted_times.length * 0.95).to_i])
          event.set('p99_response_time', sorted_times[(sorted_times.length * 0.99).to_i])
        "
      }
    }
  }
}

output {
  if [type] == "integration_metrics" {
    elasticsearch {
      hosts => ["${ELS_DATA_NODE_HOST}:9200"]
      index => "integration-metrics-%{+YYYY.MM.dd}"
    }
    
    # Send performance alerts
    if [performance_alert] == "true" {
      http {
        url => "https://alerts.company.com/webhook"
        http_method => "post"
        headers => {
          "Content-Type" => "application/json"
          "Authorization" => "Bearer ${ALERT_WEBHOOK_TOKEN}"
        }
        format => "json"
        # mapping values must be flat key/value pairs (sprintf references)
        mapping => {
          "alert_type" => "integration_performance"
          "message" => "%{alert_reason}"
          "integration" => "%{integration_type}"
          "response_time" => "%{response_time}"
          "endpoint" => "%{endpoint}"
        }
      }
    }
  }
}
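Once metrics are flowing, the stored measurements can be checked directly against the index (host and API key are placeholders):

curl -s -k "https://localhost:9200/integration-metrics-*/_search?size=0" \
     -H "Authorization: ApiKey ${ELS_API_KEY}" \
     -H "Content-Type: application/json" \
     -d '{"aggs": {"response_time_pcts": {"percentiles": {"field": "response_time", "percents": [95, 99]}}}}'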