Securing your logs in Confluent Cloud with HashiCorp Vault
Challenge
Logging is an important part of managing service availability, security, and customer experience. It allows Site Reliability Engineers (SREs), developers, security teams, and infrastructure teams to gain insights to how their services are being consumed and address any issues before they result in service outages or security incidents. Often, logs contain sensitive information that needs to be protected.
Consider the scenario where the applications team and security operations team require access to the same set of logs, however, the teams must not be able to see specific fields in the log and the security requirement is that they must be masked or encrypted when presented back to the applications team. The ability to perform field-level encryption of the log data is difficult to achieve, it requires the ability to extract, transform, and load (ETL) the data before it is presented to the end user.
Now, you might be thinking, ETL? Do I need to build a data pipeline? What data formats do I need to use? What encryption libraries do I use? How do I protect the encryption keys? How do I scale the infrastructure to match increased demand in log ingestion and processing? Sounds complex, but it doesn't have to be. This tutorial walks you through how to build a secure data pipeline with Confluent Cloud and HashiCorp Vault.
Architecture
This section walks through an example architecture that can achieve the requirements covered earlier.
Exploring various log aggregation and data streaming services, Confluent Cloud, a cloud-native Apache Kafka® service, is used in this specific architecture because it allows for easy provisioning of fully managed Kafka, providing ease of access, storage, and management of data streams. It also provides many data integration options.
The following covers the components used in this architecture and how they come together. Please note that configurations here are only for demonstration, and not to be used in a production environment.
Application
The application (app-a) is a simple JSON data generator that dumps logs to a specific volume. It is written in Python.
A Fluentd sidecar is configured to ingest the application logs and ship them to Confluent Cloud via a Fluentd Kafka plugin. The Fluentd plugin must have PKI certificates generated to be able to connect successfully to the Confluent Cloud platform; the generation of the certificates is taken care of by HashiCorp Vault.
Confluent Cloud
One of the use cases supported by Confluent is log analytics and Confluent Cloud is a core component of this architecture, it accelerates the deployment without having to worry about standing up a Kafka cluster. Confluent Cloud will be set up with two topics:
- app-a-ingress: Kafka topic for ingesting and storing app-a logs.
- app-a-egress-dev: Kafka topic for the storage of the encrypted logs. The topic name has -dev here to represent the topic for transformed logs for the developer team. A managed Confluent connector will be set up to push the encrypted log data to a logging system, Elasticsearch, which is used by the developer team.
Confluent Cloud supports many different types of connectors; this blog sets up two connector sinks, Elasticsearch, and AWS S3 sinks. Check out the Confluent Hub for a comprehensive list of sinks.
HashiCorp Vault Enterprise
HashiCorp Vault Enterprise is an identity-based secrets and encryption management system. A secret is anything that you want to tightly control access to, such as API encryption keys, passwords, or certificates. Vault provides encryption services that are gated by authentication and authorization methods.
For encryption, this tutorial utilizes various encryption methods of Vault Enterprise including transit, masking, and format preserving encryption (FPE). For detailed information on the encryption methods, have a look at the How to Choose a Data Protection Method blog.
Transformer
Transformer (app-a-transformer-dev) is a service responsible for encrypting the JSON log data, by calling to HashiCorp Vault APIs (using the hvac Python SDK). It is both a Kafka consumer and producer where encrypted JSON logs are written to another topic. The transformer is written in Python and utilizes the hvac Python Vault API client.
Elasticsearch/Kibana
ELK is widely used for analysis of logs and dashboards. Confluent Cloud will push the encrypted logs to Elasticsearch.
Prerequisites
Should have the following installed:
- AWS CLI installed
- Amazon EKSCTL CLI
- Helm
- Vault CLI
- Kubernetes command-line interface (CLI)
- HashiCorp Vault Enterprise: To test out all the encryption features covered in this blog, you need an Enterprise license key. You can sign up for a free trial. For more information on installing a Vault enterprise license see the Vault documentation here.
- Vault enterprise license key should be in a file named
vault.hclic
.
- Vault enterprise license key should be in a file named
- Confluent Cloud subscription: You can sign up for a free trial.
- AWS account
- AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY for a IAM User that can create and destroy EC2 instances, VPCs, and EKS clusters.
Clone example repository
Clone the learn-vault-secure-logs-confluent repo.
$ git clone https://github.com/hashicorp-education/learn-vault-secure-logs-confluent.git
Move into working directory.
$ cd learn-vault-secure-logs-confluent
Set up Confluent Cloud
Once logged in to Confluent Cloud, you need to set up the following.
After you log in, click Environments on the initial page.
Click +Add cloud environment.
Name the environment
confl
.Choose a Stream Governance Package - for this tutorial you want the Essentials free tier package, and then choose Begin Configuration.
In the Enable Stream Governance Essentials screen, choose AWS as a cloud provider and a region that does not incur extra cost (ex. Ohio us-east-2), choose Enable.
Add a cluster into the environment through the Create Cluster button.
In the Create Cluster page choose the Basic type and then select Begin configuration.
Choose a cloud provider to deploy the cluster to, this tutorial uses AWS, Singapore (ap-southeast-1) with a single zone and choose Continue
When the Enter payment card info page opens, look to the bottom left choose Skip payment.
Choose Launch cluster
In a short while, you will have a cluster up and running.
Configure topics
To configure the topics, select your cluster and choose the Topics on the left nav as below.
Click on Create topic, update the name to app-a-egress-dev, and then click on Create with defaults to use the default settings.
The topic Overview will appear for app-a-egress-dev.
Click on the Topics link on the left navigation panel once more.
Click on Create topic and update the name to app-a-ingress and use the default setting.
Then click on Create with defaults use the default settings.
The topic Overview will appear.
API keys
To publish to or consume data from a topic, authentication is required. Confluent Cloud provides the ability to generate API keys with role-based access control (RBAC) permissions that control which topics can be consumed to or published to. This setup uses a Global Access API key. To set this up, go to Confluent Cloud management console:
Under Cluster Overview select API Keys option on the left navigation menu.
Select the Create key button.
Select Global access and choose Next button.
Download the API credentials. The
API KEY
,API SECRET
, andBOOTSTRAP SERVER
in this file will be used to configure Vault.
Bootstrap server details
You also need the bootstrap server details, this can be found in the cluster settings page.
Also under Cluster Overview choose Cluster settings on the left navigation and see the page open.
Copy the Bootstrap server field. Keep a record of this information because it will be used for the application and transformer deployment configurations.
AWS EKS cluster
Set up your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, replacing with the appropriate values below.
$ export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY> && export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
Run the eksctl command shown below to create a VPC and a managed AWS EKS cluster. Since this is a temporary environment and to keep costs down, spot instances are used.
$ eksctl create cluster --name cluster-1 --region ap-southeast-1 \ --nodegroup-name nodes --spot --instance-types=t3.medium --nodes 3 \ --nodes-min 1 --nodes-max 3 --with-oidc --managed
Note
This step can take a while (20+ minutes). The following message will be displayed when the EKS cluster is ready: `2022-11-18 11:08:52 [✔] EKS cluster "cluster-1" in "ap-southeast-1" region is ready`.Create a IAM service account. This will map an AWS IAM role to a Kubernetes service account. The AWS IAM role will use a policy that allows EBS CSI Driver access.
$ eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --cluster cluster-1 \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve \ --role-only \ --role-name AmazonEKS_EBS_CSI_DriverRole
The output should resemble this:
2023-05-16 08:05:46 [ℹ] 1 existing iamserviceaccount(s) (kube-system/aws-node) will be excluded2023-05-16 08:05:46 [ℹ] 1 iamserviceaccount (kube-system/ebs-csi-controller-sa) was included (based on the include/exclude rules)2023-05-16 08:05:46 [!] serviceaccounts in Kubernetes will not be created or modified, since the option --role-only is used2023-05-16 08:05:46 [ℹ] 1 task: { create IAM role for serviceaccount "kube-system/ebs-csi-controller-sa" }2023-05-16 08:05:46 [ℹ] building iamserviceaccount stack "eksctl-cluster-1-addon-iamserviceaccount-kube-system-ebs-csi-controller-sa"2023-05-16 08:05:46 [ℹ] deploying stack "eksctl-cluster-1-addon-iamserviceaccount-kube-system-ebs-csi-controller-sa"2023-05-16 08:05:47 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-addon-iamserviceaccount-kube-system-ebs-csi-controller-sa"2023-05-16 08:06:18 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-addon-iamserviceaccount-kube-system-ebs-csi-controller-sa"2023-05-16 08:06:52 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-addon-iamserviceaccount-kube-system-ebs-csi-controller-sa"
Retrieve and copy down your AWS Account number for use in the next step.
$ aws sts get-caller-identity --query "Account" --output text
Add the aws-ebs-csi-driver to the EKS cluster. Update the
AWS_ACCOUNT_NUMBER
with the account number for your AWS account.$ eksctl create addon --name aws-ebs-csi-driver --cluster cluster-1 --service-account-role-arn arn:aws:iam::<AWS_ACCOUNT_NUMBER>:role/AmazonEKS_EBS_CSI_DriverRole --force
Output should resemble the following:
2023-05-16 08:11:20 [ℹ] Kubernetes version "1.25" in use by cluster "cluster-1"2023-05-16 08:11:21 [ℹ] using provided ServiceAccountRoleARN "arn:aws:iam::166839932314:role/AmazonEKS_EBS_CSI_DriverRole"2023-05-16 08:11:21 [ℹ] creating addon
Vault server
Move your copy of an Vault enterprise license to the current directory. The file should be named
vault.hclic
.Start with adding the HashiCorp repo to Helm.
$ helm repo add hashicorp https://helm.releases.hashicorp.com
Copy your file with the Vault Enterprise licence to the local directory.
Now you will copy the licence key to a Kubernetes secret.
$ secret=$(cat vault.hclic) && kubectl create secret generic vault-ent-license --from-literal="license=${secret}"
Install Vault on your cluster.
$ helm install hashicorp hashicorp/vault -f vault-config.yaml
This will deploy a Vault Enterprise instance in development mode with the root token set to
root
.Verify that Vault is deployed and running:
$ kubectl get podsNAME READY STATUS RESTARTS AGEhashicorp-vault-0 1/1 Running 0 9shashicorp-vault-agent-injector-985cd6494-ftpwf 1/1 Running 0 9s
Note
Problems here are likely due to issues with the enterprise license file. Check that the Kubernetes secret vault-ent-license was successfully created.
Configure Vault
There are a few things you need to configure on Vault, including the Transit and Transform secrets engine and Kubernetes authentication methods.
Now you will connect to the Vault container and confirm you can access it.
Open a new terminal window.
Expose Vault externally to the Kubernetes cluster using port-forwarding:
$ kubectl port-forward hashicorp-vault-0 8200:8200Forwarding from 127.0.0.1:8200 -> 8200Forwarding from [::1]:8200 -> 8200...
Back in the original terminal window, set the AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID and run these:
$ export VAULT_ADDR="http://localhost:8200" && export VAULT_TOKEN="root"
You can check the status of Vault:
$ vault statusKey Value--- -----Seal Type shamirInitialized trueSealed falseTotal Shares 1Threshold 1Version 1.9.1+entStorage Type inmemCluster Name vault-cluster-a5b35278Cluster ID 50bbe23a-1648-004f-a523-6fe8b8a9bb38HA Enabled false
You should be able to see the Vault UI by navigating in your browser to http://localhost:8200.
KV secrets engine
The application and transformers will require access to the Confluent Cloud API keys and the bootstrap server details you recorded in the API keys and bootstrap server details steps above.
As part of InfoSec best practices, avoid hardcoding credentials.
Mount the KV secrets engine.
$ vault secrets enable -version=2 kv
Store Confluent Cloud API keys for the application and transformer. Update the
BOOTSTRAP_SERVER
with the bootstrap server,API_key
with the Confluent Cloud global API client ID,API_SECRET
with the Confluent Cloud global API client secret before running the command.$ vault kv put kv/confluent-cloud client_id=<API_KEY> \ client_secret=<API_SECRET> \ connection_string=<BOOTSTRAP_SERVER> \ convergent_context_id="YWJjMTIz"
The results will resemble this:
===== Secret Path =====kv/data/confluent-cloud======= Metadata =======Key Value--- -----created_time 2022-12-13T17:17:38.293549086Zcustom_metadata <nil>deletion_time n/adestroyed falseversion 1
Store configurations for json values to be encrypted and encryption method to apply. These will be fetched by the transformer.
$ vault kv put kv/app-a/config - << EOF{"keys_of_interest":[{"key": "owner.email", "method": "aes"},{"key": "owner.NRIC", "method": "transform", "transformation":"sg-nric-mask"},{"key": "owner.telephone", "method": "transform", "transformation":"sg-phone-fpe"},{"key": "choices.places_of_interest", "method": "aes-converge"}],"transform_mount":"transform","transform_role_name":"sg-transform","transit_mount":"transit","transit_key_name":"transit","convergent_key_name":"transit-convergent"}EOF
PKI secrets engine
The PKI secrets engine needs to be set up to provide X.509 certificates for the application, specifically the Fluentd sidecar. The Kafka plugin requires the certificates to make the connection to Confluent Cloud.
Enable PKI secrets engine.
$ vault secrets enable pki
Configure the CA Certificate and private key
$ vault write pki/root/generate/internal \ common_name=service.internal \ ttl=8760h
Create a new PKI role.
$ vault write pki/roles/app \ allowed_domains=service.internal \ allow_subdomains=true \ max_ttl=72h
Transit secrets engine
This section walks through the setup of the Vault Transit secrets engine. The requirements specify the need to encrypt the owner.email and choices.places_of_interest
with the AES encryption method. Below are the Vault CLI commands to set up the secrets engine:
Enable the transit secrets engine.
$ vault secrets enable transit
Create a transit AES256 encryption key.
$ vault write -f transit/keys/transit type=aes256-gcm96
Create a convergent transit encryption key.
$ vault write -f transit/keys/transit-convergent \ convergent_encryption=true derived=true type=aes256-gcm96
This will mount the Transit secrets engine and configure two AES-256 encryption keys and will be used by the transformer to encrypt the required fields in the logs.
Transform secrets engine
The Transform secrets engine is a Vault Enterprise feature that allows for more advanced encryption capabilities.
To configure the Transform secrets engine, first mount the Transform secrets engine:
$ vault secrets enable transform
NRIC transform configuration
Singaporean security requirements dictate that NRIC (National Registration Identity Card) details must be masked. This template configuration specifies the regex pattern for the NRIC, while the transformation configuration specifies the type of transform (masking or format preserving encryption) to be done.
Create a template for the NRIC pattern.
$ vault write transform/template/sg-nric \ type=regex \ pattern='[A-Z]{1}(\d{7})[A-Z]{1}' \ alphabet=builtin/numeric
Create a transformation for NRIC.
$ vault write transform/transformation/sg-nric-mask \ type=masking \ masking_character='*' \ template=sg-nric \ tweak_source=internal \ allowed_roles=sg-transform
Telephone transform configuration
Security requirements also dictate that phone numbers must be encrypted with format preserving encryption (FPE).
Create a template for the phone number pattern.
$ vault write transform/template/sg-phone \ type=regex \ pattern='[+](\d{2})-(\d{4})-(\d{4})' \ alphabet=builtin/numeric
Create a transformation for the phone number.
$ vault write transform/transformation/sg-phone-fpe \ type=fpe \ template=sg-phone \ tweak_source=internal \ allowed_roles=sg-transform
A transform role is configured to allow access to the two transformations (sg-nric-mask and sg-phone-fpe) created earlier.
$ vault write transform/role/sg-transform \ transformations=sg-nric-mask,sg-phone-fpe
Kubernetes auth method
Since the application and the transformer will be deployed on Kubernetes and require access to HashiCorp Vault, the Kubernetes authentication method is an effective way to enable this. To configure:
Set up an authentication service account on the Kubernetes cluster.
$ kubectl apply --filename kubernetes/vault-auth-service-account.yamlserviceaccount/vault-auth createdclusterrolebinding.rbac.authorization.k8s.io/role-tokenreview-binding created
Create a secret used by Kubernetes authentication.
$ kubectl apply --filename kubernetes/vault-auth-secret.yamlsecret/vault-auth-secret created
Enable the Kubernetes auth method.
$ vault auth enable kubernetesSuccess! Enabled kubernetes auth method at: kubernetes/
Need to get a few details from the Kubernetes cluster to complete the Vault configuration.
VAULT_HELM_SECRET_NAME=$(kubectl get secrets --output=json | jq -r '.items[].metadata | select(.name|startswith("vault-auth-")).name')TOKEN_REVIEW_JWT=$(kubectl get secret $VAULT_HELM_SECRET_NAME --output='go-template={{ .data.token }}' | base64 --decode)KUBE_CA_CERT=$(kubectl config view --raw --minify --flatten --output='jsonpath={.clusters[].cluster.certificate-authority-data}' | base64 --decode)KUBE_HOST=$(kubectl get services --field-selector metadata.name=kubernetes -o jsonpath='{.items[].spec.clusterIP}')
Review the values.
$ echo $VAULT_HELM_SECRET_NAME && echo $TOKEN_REVIEW_JWT && echo $KUBE_CA_CERT && echo $KUBE_HOST
Blank lines indicate a problem, so output should resemble the following:
vault-auth-secreteyJhbGciOiJSUzI1NiIsImtpZCI6Imw5MnpHMURxZG5mNDZJVlFvWjQ0M01ENHZPLW1hWk5Rd284OE11OW8tZFkifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6InZhdWx0LWF1dGgtc2VjcmV0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6InZhdWx0LWF1dGgiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiI1YzMwNDhlNC0xMjIzLTRmMjUtOGMyYi0zZGIzNDVmZWI2ZmIiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6ZGVmYXVsdDp2YXVsdC1hdXRoIn0.dJ1dgM7T7oUsu5bb0g5KwpZncItfaE48p_8bqlSSxqjin0nWsuj9KXjtp-RRw7iMJdtOKg3HYSoedr0daefo-3ohbM_ECdZ7IL7YrA4bbfSsk3X7pcaK0hMGOapWM0MYvI863GDWv0S7bU2zeeL1bO6cYpc0YwziJllOAQz52X3hOgXaS4PP_hYbCeZZ3pdPJwCsQBcXtsgjVNg5VdI4WJDSyqWPqiKpuNlLgtYD7ur-KODHZ7gViI83Iy7_0z2Y0be_VVJL_RuVJmU3sFmkagkYOrOm5CXp_gIKmEFDaCxbThJPJIAL6ESKuE-9gHcxwuYAt9SrlX64nz5N-idfZQ-----BEGIN CERTIFICATE-----MIIC/jCCAeagAwIBAgIBADANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwprdWJlcm5ldGVzMB4XDTIzMDUxNjA3NTkyOFoXDTMzMDUxMzA3NTkyOFowFTETMBEGA1UEAxMKa3ViZXJuZXRlczCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMDVWiGSbRlCFtzYCRBeX3cssXJgLj6Aox9vKyy/cAfXxz0N0E0wHiMls6/T0alzKxT6NDp/rMcE7zSwVLFljVhSLIJCGxjUHcaackQj6bFeXwoR7mmv7EshwDHYXcAXA2X3n9AmEArQeXF5DpoD5xx0NSCoZ90eOjPeidHw6J3xR3aBMpdZsvj6fS62M/CWxEQq9nd5NspYaFGrysczsC4zF1MNuj3S1LVWuEQMgP/rWFRV/L9qsMaiKlNTsEbkXC9pE9ICpFNokqI/YwbLnhLyCp1jFsq3xFWoE65OE3ve/YtdPm1vV8rbU5vEFII6pG8+3vZlQtpuzOJJ9TJ95okCAwEAAaNZMFcwDgYDVR0PAQH/BAQDAgKkMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFGrdINWtfp/c0gsrezX1KCYHiHRCMBUGA1UdEQQOMAyCCmt1YmVybmV0ZXMwDQYJKoZIhvcNAQELBQADggEBAG8X3ksv4pseskKESBEMY4wfAqr/M2pny4RCZlnfZZ8EHU75rDzl9PJzsqvgDKvRLrEdHKBJ5i4a3TEaVahcvQyCEvVneV7OqzuotUA3qCXwO9+J7VGnfZIjTT18t2xjT3lt9O7MANS6sYtM3VbQB7mNEMYFnQfsSSTqMM1AtVNOoFz81yJsDUOHmD3D0e9R4N1KCt643EHPxPEVNytG8aAWkZOITqxaHjEhh5Tlt8+KUDeevr53jef91S9+1jKdG8w+6eoGxWFW2iDJb0u7kZImtvTGeZulYvQmuaSGU9uoL6CyJUtcH4cWY8GBuOqlS9PBbOdFrR2puB3dFpGKe1E=-----END CERTIFICATE-----10.100.0.1
Configure the Kubernetes secrets engine.
$ vault write auth/kubernetes/config \ kubernetes_host="https://$KUBE_HOST" \ token_reviewer_jwt=$TOKEN_REVIEW_JWT \ kubernetes_ca_cert="$KUBE_CA_CERT" \ disable_iss_validation=true
Kubernetes auth method roles
These roles will be used by the application and transformers to authenticate to Vault.
Create the application role.
$ vault write auth/kubernetes/role/app \ bound_service_account_names=app \ bound_service_account_namespaces=default \ policies=app-a-policy \ ttl=24h
Create the transformer role.
$ vault write auth/kubernetes/role/transform \ bound_service_account_names=transform \ bound_service_account_namespaces=default \ policies=transformer-policy \ ttl=24h
Configure Vault policies
The application will require access to the secrets configured earlier in the KV secrets engine section. To allow this, Vault policies need to be configured:
$ vault policy write app-a-policy - <<EOFpath "kv/data/confluent-cloud" {capabilities = ["read"]}path "pki/issue/app" {capabilities = ["update"]}EOF
Transformer will require access to the transit and transform secrets engines for encryption.
$ vault policy write transformer-policy - <<EOFpath "/transit/encrypt/transit-convergent" { capabilities = ["update"]}path "transit/encrypt/transit" { capabilities = ["update"]}path "kv/data/confluent-cloud" { capabilities = ["read"]}path "kv/data/app-a/config" { capabilities = ["read"]}path "transform/encode/sg-transform" { capabilities = ["update"]}EOF
Transformer
The transformer will retrieve certain configurations stored in Vault as per the steps in the KV secrets engine, specifically in the
kv/app-a/config
andkv/confluent-cloud
paths. Here is a run down of the configurations:Configuration parameters description client_id string Confluent Cloud global API client ID set up in API keys client_secret string Confluent Cloud global API client secret set up in API keys connection_string string Confluent Cloud Bootstrap server found in Bootstrap server details keys_of_interest key: The JSON key path (in . notation) - method Encryption method options to use: aes, aes-converge, transform (if using transform, the transformation name also needs to be specified) - transformation Specifies the name of the transformation configuration (masking, FPE, tokenization); these transformations were created in steps NRIC transform configuration and Telephone transform configuration transform_mount string Transform secrets engine path, configured in Transform , default is transform transform_role_name string Transform role that has permissions to the transformations configured in NRIC transform configuration and Telephone transform configuration transit_mount string Transit secrets engine path, configured in Transit secrets engine transit_key_name string Name of Transit encryption key convergent_key_name string Name of Transit encryption key set with derived as true. Convergent encryption requires a context which must be provided. Encryption operations yield the same ciphertext when using this key. convergent_context_id string(base64-encoded) Context used for convergent encryption To build and deploy the transformer, run this command (from
learn-vault-secure-logs-confluent
git repo directory):$ kubectl apply -f deploy/transform-deploy.ymldeployment.apps/transform createdservice/transform createdserviceaccount/transform created
The annotations in the deployment will configure a Vault Agent sidecar (listening on port 8200) and authenticate using the Kubernetes authentication method. Since agent-cache-enable and agent-cache-use-auto-auth-token are set to true, this will allow the Transformer to request secrets using the Vault Agent on
http://localhost:8200
using the supplied token to the Vault Agent.
---apiVersion: apps/v1kind: Deploymentmetadata:name: transformspec:selector: matchLabels: app: transformtemplate: metadata: labels: app: transform annotations: vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "transform" vault.hashicorp.com/agent-cache-enable: "true" vault.hashicorp.com/agent-cache-use-auto-auth-token: "true" spec: serviceAccountName: transform containers: - name: transform env: - name: KAFKA_GROUP value: 'app-a-group' - name: INGRESS_TOPIC value: 'app-a-ingress' - name: EGRESS_TOPIC value: 'app-a-egress-dev' - name: SECRETS_PATH value: 'kv/data/confluent-cloud' - name: CONFIGS_PATH value: 'kv/data/app-a/config' - name: VAULT_ADDR value: 'http://localhost:8200' - name: VAULT_TOKEN value: '' - name: LOGLEVEL value: 'DEBUG' image: hashieducation/vault-confluentcloud-demo-transform:latest imagePullPolicy: Always resources: limits: memory: "128Mi" cpu: "500m" ports: - containerPort: 8080---kind: ServiceapiVersion: v1metadata:name: transformspec:selector: app: transformtype: ClusterIPports: - name: tcp port: 8080 targetPort: 8080---apiVersion: v1 kind: ServiceAccountmetadata: name: transform
Once the Transformer is deployed, it will subscribe to the Confluent Cloud app-a-ingress topic and monitor for incoming logs. Logs are processed and are then published to the app-a-engress-dev topic.
Elasticsearch and Kibana
The encrypted logs will be sent to Elasticsearch and viewed in Kibana. This section covers a setup with ECK (Elastic Cloud on Kubernetes) per quickstart instructions.
Some modifications were made to the deployment, including exposing Elasticsearch to the internet with a LoadBalancer.
To install, run the following:
Create the instance of Elastic Cloud.
$ kubectl create -f https://download.elastic.co/downloads/eck/1.9.1/crds.yamlcustomresourcedefinition.apiextensions.k8s.io/agents.agent.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/apmservers.apm.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/beats.beat.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/elasticmapsservers.maps.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/elasticsearches.elasticsearch.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/enterprisesearches.enterprisesearch.k8s.elastic.co createdcustomresourcedefinition.apiextensions.k8s.io/kibanas.kibana.k8s.elastic.co created
Apply the operator.
$ kubectl apply -f https://download.elastic.co/downloads/eck/1.9.1/operator.yamlnamespace/elastic-system createdserviceaccount/elastic-operator createdsecret/elastic-webhook-server-cert createdconfigmap/elastic-operator createdclusterrole.rbac.authorization.k8s.io/elastic-operator createdclusterrole.rbac.authorization.k8s.io/elastic-operator-view createdclusterrole.rbac.authorization.k8s.io/elastic-operator-edit createdclusterrolebinding.rbac.authorization.k8s.io/elastic-operator createdservice/elastic-webhook-server createdstatefulset.apps/elastic-operator createdvalidatingwebhookconfiguration.admissionregistration.k8s.io/elastic-webhook.k8s.elastic.co created
Deploy Elasticsearch and Kibana pods.
$ kubectl apply -f deploy/elk-deploy.ymlelasticsearch.elasticsearch.k8s.elastic.co/quickstart createdkibana.kibana.k8s.elastic.co/quickstart created
Once deployed and Elasticsearch is up and running, you need to capture a few configurations for the Confluent Cloud connector in the next section, such as the credentials for Elasticsearch. The default username is elastic, to get the password:
$ PASSWORD=$(kubectl get secret quickstart-es-elastic-user -o go-template='{{.data.elastic | base64decode}}')
Note down the password:
$ echo $PASSWORD
You also need to note down the load balancer details (EXTERNAL-IP):
$ kubectl get svcNAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGEkubernetes ClusterIP 10.100.0.1 <none> quickstart-es-default ClusterIP None <none> 9200/TCP 13hquickstart-es-http LoadBalancer 10.100.134.61 a5db09d337eca490f82cf7a6ea17adf8-668057098.ap-southeast-1.elb.amazonaws.com 9200:31983/TCP 13hquickstart-es-transport ClusterIP None <none> 9300/TCP 13hquickstart-kb-http ClusterIP 10.100.157.8 <none> 5601/TCP 13htransform ClusterIP 10.100.253.55 <none> 8080/TCP 50dvault ClusterIP 10.100.255.143 <none> 8200/TCP,8201/TCP 9dvault-agent-injector-svc ClusterIP 10.100.49.129 <none> 443/TCP 9dvault-internal ClusterIP None <none> 8200/TCP,8201/TCP 9d
Confluent Cloud connectors
Confluent Cloud connectors provide fully managed connectivity to multiple data sources and sinks. In this case, you will set up two connectors:
- Elasticsearch Service Sink connector
- Amazon S3 Sink connector
Elasticsearch service sink connector
This connector will subscribe to the app-a-engress-dev topic (containing the encrypted JSON logs) and publish all messages to an instance of Elasticsearch, to be viewed in Kibana.
In the Confluent Cloud portal, select your cluster created in Set up Confluent Cloud steps. To set up the connector:
Select Connectors left navigation menu.
In the filters, search for
Elasticsearch
and select Elasticsearch Service Sink.Choose the topic app-a-engress-dev and select Next.
On the Add Elasticsearch Service Sink connector 2. Kafka credentials choose Use an existing API key and put the API keys that you downloaded earlier.
On the 3. Authentication section, add the load balancer details you noted down earlier in the Connection URI field and append 9200 to the URI, the Connection user is elastic and the Connection password from the $PASSWORD you wrote down earlier.
In 4. Configuration the Input Kafka record value format is JSON.
Open Show advanced configurations.
Both Key ignore and Scheme ignore are true.
Data stream type and Data stream dataset are logs.
Everything else can be left with the default settings, and you can choose Continue.
In 5. Sizing, Tasks should be 1 then choose Continue.
For 6. Review and launch. the Connector name is ElasticsearchSink.
Review the settings below against the Connector configuration and if they match select Continue.
Setting | Value |
---|---|
topics | app-a-engress-dev |
Kafka Cluster Authentication mode | KAFKA_API_KEY |
Kafka API Key | Same key created in step API keys |
Kafka API Secret | Same secret created in step API keys |
Connection URI | <<loadbalancer_address>>:9200 |
Connection user | elastic |
Connection password | elastic password retrieved in step Elasticsearch and Kibana |
Enable SSL security | true |
Input messages | JSON |
Key ignore | true |
Scheme ignore | true |
Data Stream Type | logs |
Data Stream Dataset | logs |
Number of tasks for this connector | 1 |
Name | ElasticsearchSink |
If there are no errors with the configuration, after a few minutes of provisioning you should now have an operational connector:
Check connector status
On the page that appears make sure connector has a status of Running.
Application and Fluentd
The application deployment consists of two components:
- The application (app-a) itself which is a JSON data generator using the Mimesis data generator. It appends the generated JSON records to
/fluentd/log/user.log
. - The Fluentd sidecar has the fluent-plugin-kafka installed. It will track changes in the
/fluentd/log/user.log
and upload the JSON records to the app-a-ingress topic in Confluent Cloud.
The Fluentd sidecar requires a few configurations to work, including a few secrets:
- X.509 certificates for the fluent-plugin-kafka, the certificates are required by the plugin to connect to the Confluent Cloud cluster broker.
- Confluent Cloud API credentials for the fluent-plugin-kafka plugin to authenticate as a producer and push the logs to the app-a-ingress topic.
These secrets will be provided by Vault, and these configurations will be passed as part of the deployment file.
The deployment file is below and makes use of Vault Agent Sidecar Annotations to retrieve the required secrets and render the Fluentd configuration file.
---apiVersion: apps/v1kind: Deploymentmetadata:name: appspec:selector: matchLabels: app: apptemplate: metadata: labels: app: app annotations: vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "app" vault.hashicorp.com/agent-cache-enable: "true" vault.hashicorp.com/agent-cache-use-auto-auth-token: "true" vault.hashicorp.com/agent-inject-secret-ca.pem: "" vault.hashicorp.com/secret-volume-path-ca.pem: "/fluentd/cert" vault.hashicorp.com/agent-inject-template-ca.pem: | {{- with secret "pki/issue/app" "common_name=app-a.service.internal" -}} {{ .Data.issuing_ca }} {{- end }} vault.hashicorp.com/agent-inject-secret-key.pem: "" vault.hashicorp.com/secret-volume-path-key.pem: "/fluentd/cert" vault.hashicorp.com/agent-inject-template-key.pem: | {{- with secret "pki/issue/app" "common_name=app-a.service.internal" -}} {{ .Data.private_key }} {{- end }} vault.hashicorp.com/agent-inject-secret-cert.pem: "" vault.hashicorp.com/secret-volume-path-cert.pem: "/fluentd/cert" vault.hashicorp.com/agent-inject-template-cert.pem: | {{- with secret "pki/issue/app" "common_name=app-a.service.internal" -}} {{ .Data.certificate }} {{- end }} vault.hashicorp.com/agent-inject-secret-fluent.conf: "" vault.hashicorp.com/secret-volume-path-fluent.conf: "/fluentd/etc" vault.hashicorp.com/agent-inject-template-fluent.conf: | <system> log_level debug </system> # TCP input <source> @type forward port 24224 </source> <source> @type tail path /fluentd/log/user.log pos_file /fluentd/log/user.pos @log_level debug tag user.log <parse> @type json </parse> </source> <match user.log> @type kafka2 # list of seed brokers brokers {{- with secret "kv/data/confluent-cloud" }} {{ .Data.data.connection_string }}{{- end }} use_event_time true # buffer settings <buffer ingress> @type file path /fluentd/td/log flush_interval 1s </buffer> # data type settings <format> @type json </format> # topic settings topic_key app-a-ingress default_topic app-a-ingress # producer settings required_acks -1 compression_codec gzip ssl_ca_cert '/fluentd/cert/ca.pem' ssl_client_cert '/fluentd/cert/cert.pem' ssl_client_cert_key '/fluentd/cert/key.pem' sasl_over_ssl true ssl_ca_certs_from_system true username {{- with secret "kv/data/confluent-cloud" }} {{ .Data.data.client_id }}{{- end }} password {{- with secret "kv/data/confluent-cloud" }} {{ .Data.data.client_secret }}{{- end }} </match> spec: serviceAccountName: app containers: - name: app env: - name: NUM_OF_RUNS value: '10' - name: PATH_TO_LOG value: '/fluentd/log/user.log' image: hashieducation/vault-confluentcloud-demo-app:latest imagePullPolicy: Always resources: limits: memory: "128Mi" cpu: "500m" ports: - containerPort: 8080 volumeMounts: - name: app-log mountPath: /fluentd/log - name: fluentd image: hashieducation/vault-confluentcloud-demo-fluentd:latest imagePullPolicy: Always resources: limits: memory: "128Mi" cpu: "500m" ports: - containerPort: 24224 volumeMounts: - name: app-log mountPath: /fluentd/log volumes: - name: app-log emptyDir: {}---kind: ServiceapiVersion: v1metadata:name: appspec:selector: app: apptype: ClusterIPports:- name: tcp port: 8080 targetPort: 8080---apiVersion: v1kind: ServiceAccountmetadata:name: app
Deploy the application:
$ kubectl apply -f ./deploy/app-deploy.yml
Once the application is deployed, it will begin to generate fake JSON data and append to the
/fluentd/log/user.json
file.
View logs in Confluent Cloud
It is possible to see the messages being published in the Confluent Cloud topic.
To view them from the Confluent Cloud portal, you will select the topic name you wish to view as shown below.
In the app-a-ingress topic, choose the Messages tab. You should see a live stream of JSON logs being pushed by app-a Fluentd sidecar. Below is an example:
Click on a message and look at the details.
In the app-a-egress-dev topic you should see a live stream of encrypted JSON logs being pushed by the Transformer. Below is an example:
Click on a message and look at the details.
The highlighted fields were encrypted successfully.
The owner.telephone
field was put through a format preserving encryption transform and the owner.NRIC
field was masked.
The owner.email
and choices.places_of_interest
fields were encrypted with Vault Transit secrets engine. The secrets engine appends the ciphertext with vault:v1
indicating that it was encrypted by Vault, using version 1 of the encryption key. This is important as Vault Transit secrets engine can also perform key rotation; tracking which version of the key was used to encrypt is necessary to be able to decrypt the data.
Architecture considerations
Below are some important considerations related to this architecture:
- The Vault configuration is in development mode and should not be used in production; TLS was not enabled on the Vault API. TLS listener should be configured in Vault.
- The Transformer optimizes encryption requests to HashiCorp Vault in batches using batch_input, which improves the encryption performance significantly.
- HashiCorp Vault Enterprise can be horizontally scaled by adding more nodes, allowing for scaling of encryption/decryption operations.
- Confluent Cloud API keys should be configured to provide least privilege access to resources such as topics. Please see Confluent Cloud API best practices for more details.
- Confluent Cloud has a number of networking options including different private networking options.
Clean up
Delete the cluster.
$ eksctl delete cluster --name cluster-1 --region=ap-southeast-12022-11-17 14:15:59 [ℹ] deleting EKS cluster "cluster-1"2022-11-17 14:16:01 [ℹ] will drain 0 unmanaged nodegroup(s) in cluster "cluster-1"2022-11-17 14:16:01 [ℹ] starting parallel draining, max in-flight of 12022-11-17 14:16:02 [ℹ] deleted 0 Fargate profile(s)2022-11-17 14:16:04 [✔] kubeconfig has been updated2022-11-17 14:16:04 [ℹ] cleaning up AWS load balancers created by Kubernetes objects of Kind Service or Ingress2022-11-17 14:16:08 [ℹ] 2 sequential tasks: { delete nodegroup "ng-33a2dd27", delete cluster control plane "cluster-1" [async] }2022-11-17 14:16:08 [ℹ] will delete stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:16:08 [ℹ] waiting for stack "eksctl-cluster-1-nodegroup-ng-33a2dd27" to get deleted2022-11-17 14:16:08 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:16:39 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:17:27 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:17:59 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:18:31 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:20:11 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:21:50 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:23:33 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:24:37 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:25:58 [ℹ] waiting for CloudFormation stack "eksctl-cluster-1-nodegroup-ng-33a2dd27"2022-11-17 14:25:59 [ℹ] will delete stack "eksctl-cluster-1-cluster"2022-11-17 14:26:00 [✔] all cluster resources were deleted
Unset all the environment variables.
$ unset AWS_ACCESS_KEY \AWS_REGION \AWS_SECRET_ACCESS_KEY \VAULT_ADDR \VAULT_TOKEN \VAULT_HELM_SECRET_NAME \TOKEN_REVIEW_JWT \KUBE_CA_CERT \KUBE_HOST
Go into your AWS Account and double check the CloudFormation templates with the name of "cluster-1". To verify that they deleted successfully, there will be no CloudFormation stacks present.
If there were issues with the CloudFormation templates deletion you can manually delete the Load Balancer, InternetGateway and VPC associated with "cluster-1".
If you had to manually delete anything return to CloudFormation and rerun the delete stacks. After a few minutes the stacks should delete themselves.
Help and reference
HashiCorp Vault Enterprise and Confluent Cloud can work together to address various data protection requirements. This use case is not limited to just logs, but any data that is managed within Kafka/Confluent Cloud. Vault Enterprise can be deployed across any cloud and on premises, allowing it to stay near your data, minimizing latency and improving performance.
To learn more about Confluent Cloud and HashiCorp Vault, here are a few useful resources: