Reading and Writing Data from Kafka

As a security data lake, SonarG can consume data from Kafka (as a Kafka consumer) and can send data to Kafka (as a Kafka producer).

Producing Data and Sending it to Kafka

The data created by any job (relying on any report or query) can be sent over Kafka topics by using a job producer. Schedule your job as you would schedule any report and select the Kafka radio button. The pipeline you are using should create the data as you want it to appear on your queue. Normally you would ensure your pipeline projects key and value strings, and these will be placed onto the Kafka queue. If SonarG does not find a key it will try to use the _id, and if that is not found either it will use the current time since epoch in milliseconds as the key. If it cannot find a value attribute that is a string it will try to serialize the entire document as a JSON document.
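For example, a minimal projection stage in a MongoDB-style aggregation pipeline (a sketch; the source fields session_id and message_text are illustrative, not SonarG built-ins) that shapes each document into the key and value strings placed on the queue:

    [
        { "$project": { "key": "$session_id", "value": "$message_text", "_id": 0 } }
    ]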

If you can connect to Kafka without authentication you can enter the server, port and topic directly within the job definition (though in production you should still use aliases). If you use SSL or Kerberos authentication you need to use an alias that refers to a configuration definition in /etc/sonar/sonar_kafka_aliases.yaml. In this release the combination of SSL and Kerberos is not supported when SonarG acts as a Kafka producer (just one or the other; both together are supported for a consumer).

In the job definition you can specify an alias, or a list of aliases in an array, for the servers; a sample job snippet follows the alias definitions below. Each alias needs to already be defined in the yaml file mentioned above. Aliases can be plain, SSL, Kerberos, or SSL+Kerberos:

[
    {
        "name": "alias-plain",
        "bootstrap.servers": "192.168.1.1:9092",
        "security.protocol": "PLAINTEXT",
        "kafka.topics": ["topic1"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    },
    {
        "name": "alias-ssl",
        "bootstrap.servers": "192.168.1.1:9092",
        "group.id": "group-ID",
        "security.protocol": "SSL",
        "ssl.truststore.location": "/trust_store",
        "ssl.truststore.password": "123456",
        "ssl.keystore.location": "/key_store",
        "ssl.keystore.password": "123456",
        "ssl.key.password": "123456",
        "ssl.key.location": "/path/to/key.pem",
        "ssl.certificate.location": "/pat/to/certificate.pem",
        "ssl.ca.location": "/path/to/ca-cert",
        "kafka-topics": ["topic1"],
        "rsyslog-host": "localhost",
        "rsyslog-port": 10546
    },
    {
        "name": "alias-kerberos",
        "bootstrap.servers": "192.168.1.1:9092",
        "group.id": "group-ID",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "my-kafka-service",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required \n
        useKeyTab=true \n useCache=false \n storeKey=false \n keyTab=
        '/home/user/my_keytab/user.keytab' \n serviceName='kafka' \n principal=
        'Principle_Name';",
        "kafka.topics": ["topic1", "topic2"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    },
    {
        "name": "alias-kerberos-ssl",
        "kafka-host": "192.168.1.1",
        "kafka-port": 9095,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "my-kafka-service",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required \n
        useKeyTab=true \n useCache=false \n storeKey=false \n keyTab=
        '/home/user/my_keytab/user.keytab' \n serviceName='kafka' \n principal=
        'Principle_Name';",
        "security.protocol": "SSL",
        "ssl.truststore.location": "/trust_store",
        "ssl.truststore.password": "123456",
        "ssl.keystore.location": "/key_store",
        "ssl.keystore.password": "123456",
        "ssl.key.password": "123456",
        "ssl.key.location": "/path/to/key.pem",
        "ssl.certificate.location": "/path/to/certificate.pem",
        "ssl.ca.location": "/path/to/ca-cert",
        "kafka.topics": ["topic1", "topic2"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    }
]
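In a job definition, the servers entry can then name one alias or several. A sketch showing only the relevant field (the surrounding job fields are omitted here):

    "servers": ["alias-ssl", "alias-kerberos"]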

Parameters:

  • "bootstrap.servers": <kafka-host>:<kafka-port>
  • "ssl.truststore.location": <full path name to the trust store> - used for the Kafka producer.
  • "ssl.keystore.location": <full path name to the key store> - used for the Kafka producer.
  • "ssl.key.location": <full path to the key file> - used for the Kafka consumer. Can be generated from the key store; see "Extract the certificate and key from JKS keystores" below.
  • "ssl.key.password": password for the client key.
  • "ssl.certificate.location": <full path to the signed certificate pem file> - used for the Kafka consumer; needed only if ssl.client.auth=required in the Kafka server settings. Can be generated from the key store; see "Extract the certificate and key from JKS keystores" below.
  • "ssl.ca.location": <full path to the ca file> - used for the Kafka consumer.
  • "sasl.jaas.config": there are three parameters that require setting:
    1. keyTab=<full path name to the keytab file>
    2. serviceName=<name of the kerberos service> (same as "sasl.kerberos.service.name")
    3. principal=<the principal logged in to kerberos>
  • "kafka.topics": an array of one or more topics (used by the Kafka consumer).

Make sure the sonar user has access to the files referenced by the following parameters:

  • “ssl.keystore.location”
  • “ssl.truststore.location”
  • “ssl.key.location”
  • “ssl.certificate.location”
  • “ssl.ca.location”

This means that these files should be in the /etc/sonar directory and owned by the sonar-kafka-consumer user:

sudo chown sonar-kafka-consumer:sonar <file name>
sudo chmod 440 <file name>
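For example, assuming the stores and PEM files have been copied into /etc/sonar under the names used in the alias definitions above (substitute your actual file names, and point the alias file locations at these paths):

sudo chown sonar-kafka-consumer:sonar /etc/sonar/key_store /etc/sonar/trust_store /etc/sonar/key.pem /etc/sonar/certificate.pem /etc/sonar/ca-cert
sudo chmod 440 /etc/sonar/key_store /etc/sonar/trust_store /etc/sonar/key.pem /etc/sonar/certificate.pem /etc/sonar/ca-cert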

When using SSL you need to create a trust store and a key store using standard tools such as OpenSSL and keytool. The following is an example sequence, no different from what you will already have done within your Kafka environment:

  1. Create private keys (server and client) and certificates using OpenSSL.

  2. Create a trust chain (CA) certificate file using OpenSSL. For example, follow the instructions at https://jamielinux.com/docs/openssl-certificate-authority/introduction.html

  3. Create the trust store with the trust chain:

$ ~/dev/jdk1.8.0_112/bin/keytool -import -file ca-chain.cert.pem -alias localhost -keystore ca.truststore.jks -storepass 123456

  4. Create client/server keys and certificates, e.g.:

$ openssl genrsa -out intermediate/private/user1.key.pem 2048

$ openssl genrsa -out intermediate/private/server1.key.pem 2048

$ openssl req -new -sha256 -nodes -config intermediate/openssl.cnf -subj "/C=CA/ST=BC/O=jSonar/CN=aditya/emailAddress=user1@jsonar.com/L=Vancouver" -out intermediate/csr/user1.csr.pem -key intermediate/private/user1.key.pem

$ openssl req -new -sha256 -nodes -config intermediate/openssl.cnf -subj "/C=CA/ST=BC/O=jSonar/CN=localhost/emailAddress=user1@jsonar.com/L=Vancouver" -out intermediate/csr/server1.csr.pem -key intermediate/private/server1.key.pem

$ openssl ca -config intermediate/openssl.cnf -extensions usr_cert -days 365 -notext -md sha256 -in intermediate/csr/user1.csr.pem -out intermediate/certs/user1.cert.pem

$ openssl ca -config intermediate/openssl.cnf -extensions server_cert -days 365 -notext -md sha256 -in intermediate/csr/server1.csr.pem -out intermediate/certs/server1.cert.pem

$ openssl pkcs12 -export -in intermediate/certs/server1.cert.pem -inkey intermediate/private/server1.key.pem -out intermediate/certs/server1.pkcs12 -name localhost -noiter -nomaciter

  5. Create server keystore:

$ keytool -importkeystore -destkeystore sonar_key_store -srckeystore intermediate/certs/server1.pkcs12 -srcstoretype pkcs12 -alias localhost

$ openssl pkcs12 -export -in intermediate/certs/user1.cert.pem -inkey intermediate/private/user1.key.pem -out intermediate/certs/user1.pkcs12 -name localhost -noiter -nomaciter

  6. Create client keystore:

$ /opt/jdk1.8.0_141/bin/keytool -importkeystore -destkeystore user_key_store -srckeystore intermediate/certs/user1.pkcs12 -srcstoretype pkcs12 -alias localhost

$ openssl s_client -connect aditya:9093 -verify 10 -state -cert intermediate/certs/user1.cert.pem -key intermediate/private/user1.key.pem -CAfile intermediate/certs/ca-chain.cert.pem

$ openssl pkcs12 -export -in user1.pem -out user1.pkcs12 -name user1 -noiter -nomaciter

$ ~/dev/jdk1.8.0_112/bin/keytool -importkeystore -destkeystore sonar_key_store -srckeystore user1.pkcs12 -srcstoretype pkcs12 -alias user1 -storepass 123456 -keypass 123456
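To sanity-check a resulting store you can list its contents with keytool, e.g. (assuming the store password used above):

$ keytool -list -v -keystore sonar_key_store -storepass 123456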

Extract the certificate and key from JKS keystores

In order to connect as a Kafka consumer when the server configuration requires client authentication, everything we need is contained in the 'kafka.client.keystore.jks' file. To get an overview of its contents you can call:

keytool -list -rfc -keystore kafka.client.keystore.jks

First, we will extract the client certificate:

keytool -exportcert -alias localhost -keystore kafka.client.keystore.jks -rfc -file certificate.pem -srcstorepass test1234 -deststorepass test1234

Next we will extract the client’s key. This is not supported directly by keytool, which is why we have to convert the keystore to pkcs12 format first and then extract the private key from that:

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore cert_and_key.p12 -srcstoretype JKS -deststoretype PKCS12 -srcstorepass test1234 -deststorepass test1234 -srcalias localhost -srckeypass test1234 -destkeypass test1234 -noprompt

Now we convert it to key.pem:

openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes -passin pass:test1234 -passout pass:test1234 | sed -n '/-----BEGIN/,$p' > key.pem

In the commands above, kafka.client.keystore.jks needs to be changed to the keystore file name on your system, and test1234 needs to be changed to the password of your keystore. The password parameters can be omitted if you wish to be prompted for them.
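To confirm that the extracted certificate.pem and key.pem actually belong together, compare the public-key moduli of the two files; the two digests should be identical:

openssl x509 -noout -modulus -in certificate.pem | openssl md5
openssl rsa -noout -modulus -in key.pem | openssl md5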

SonarKafka - Consuming Data from Kafka

SonarKafka is a collection of generic Kafka consumers that allow you to get any data from Kafka into the SonarG security lake. It binds to a Kafka queue and reads messages using the configuration specified in the same configuration file used by the producer: /etc/sonar/sonar_kafka_aliases.yaml. When you run the consumer services they use the configuration sets named by the aliases listed in /etc/sonar/kafka_consumer_alias.txt. This file contains just the names of the aliases to use, one per line. For example:

alias2-kerberos-plain
alias-kerberos-ssl

When started, SonarKafka will run a service with one consumer per alias. Each consumer will live until you stop the services.

In order for the consumer to connect to rsyslog it needs to connect to the sonargateway port designated for the Kafka consumer. This port is 10546, and it should be the value of the "rsyslog.port" parameter in the yaml file /etc/sonar/sonar_kafka_aliases.yaml.

In addition, in order for rsyslog to start reading from this port you need to uncomment the line corresponding to this port in the sonargateway configuration file, /etc/rsyslog.d/sonargateway.conf:

#$IncludeConfig /etc/rsyslog.d/sonar/gateway/rulesets/kafka_consumer.conf
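For example, you can remove the comment marker with a one-liner (a sketch that assumes the line appears in the file exactly as shown above):

sudo sed -i 's|^#\$IncludeConfig /etc/rsyslog.d/sonar/gateway/rulesets/kafka_consumer.conf|$IncludeConfig /etc/rsyslog.d/sonar/gateway/rulesets/kafka_consumer.conf|' /etc/rsyslog.d/sonargateway.conf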

After you remove the leading '#' restart rsyslog:

sudo systemctl restart rsyslog
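To verify that rsyslog is now listening on the consumer port (10546 in this setup):

sudo ss -lnp | grep 10546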

Starting the consumers:

> sudo sonar-kafka-consumer-services start

Every alias in the /etc/sonar/kafka_consumer_alias.txt file creates a service called: sonar-kafka-consumer.<alias-name>.service
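For instance, with the example alias file above you can inspect one consumer directly through systemd:

> sudo systemctl status sonar-kafka-consumer.alias-kerberos-ssl.service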

Stopping the consumers:

> sudo sonar-kafka-consumer-services stop

All info, warning, and error messages related to the services are logged to /var/log/sonargd/kafka-consumer.log
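To watch the consumers come up, follow the log:

> sudo tail -f /var/log/sonargd/kafka-consumer.log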

To see the status of the services generated by SonarKafka:

> sudo sonar-kafka-consumer-services status

A heartbeat is sent to rsyslog every 5 minutes from every consumer that is running. This heartbeat gets inserted into a logging collection in sonar_log.
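A quick way to confirm heartbeats are arriving is to look at the newest documents in that collection (a sketch from a MongoDB-compatible shell; the exact heartbeat document fields are not documented here):

use sonar_log
db.logging.find().sort({_id: -1}).limit(5)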

The messages collected by SonarKafka are forwarded to SonarGateway by being posted on the rsyslog instance specified in the configuration, like so:

Sep 25 17:17:26 localhost {"foo": 1, "bar": 2}

The data can then be transformed, mapped and directed to the appropriate SonarG collections, as shown in the diagram:

[Diagram: SonarKafka data flow - _images/SonarKafka.png]

SonarGateway can do any transformation needed on the Kafka message data; see SonarGateway.