CodeSnips

Kafka with AWS MSK and an S3 archive via Kafka Connect
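It provisions an MSK cluster in existing private subnets, downloads the Confluent S3 sink connector and registers it as an MSK Connect custom plugin, and runs a connector that archives the listed topics to an S3 bucket whose objects expire after 90 days.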

terraform {
    required_providers {
        kafka = {
            source = "Mongey/kafka"
            version = "0.5.3"
        }
        http = {
            source = "hashicorp/http"
        }
        aws = {
            source = "hashicorp/aws"
        }
        local = {
            source = "hashicorp/local"
        }
    }
}

variable "vpc-id" {  
    type = string  
}

variable "msk-cluster-name" {  
    type = string 
}  

variable "archive-bucket-name" {  
    type = string 
}  

variable "archived-topics" {  
    type = list(string)  
}
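
# Illustrative example values for the variables above (hypothetical),
# e.g. in a terraform.tfvars file:
#
#   vpc-id              = "vpc-0123456789abcdef0"
#   msk-cluster-name    = "events"
#   archive-bucket-name = "events-kafka-archive"
#   archived-topics     = ["orders", "payments"]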

data "aws_vpc" "vpc" {  
    id = var.vpc-id
}  

data "aws_subnets" "private" {
    filter {
        name = "vpc-id"
        values = [data.aws_vpc.vpc.id]
    }
    filter {
        # Assumes private subnets are tagged with private=1
        # Adjust based on the exact subnet setup
        name = "tag:private"
        values = ["1"]
    }
}

# Allows access from all traffic in VPC private subnets
# Adjust if more fine-grained security is required
module "security_group" {  
    source = "terraform-aws-modules/security-group/aws"  
    version = "~> 5.0"  

    name = "${var.msk-cluster-name}-access"
    description = "Security group for MSK ${var.msk-cluster-name}"
    vpc_id = data.aws_vpc.vpc.id

    ingress_cidr_blocks = [data.aws_vpc.vpc.cidr_block]
    ingress_rules = [
        "kafka-broker-tcp",
        "kafka-broker-tls-tcp"
    ]
}  

resource "aws_s3_bucket" "archive" {  
    bucket = var.archive-bucket-name
}  

resource "aws_s3_bucket_lifecycle_configuration" "expiration_policy" {  
    bucket = aws_s3_bucket.archive.id
    rule {
        status = "Enabled"

        expiration {
            # Adjust as desired
            days = 90
        }

        filter {
            prefix = "topics/"
        }

        id = "topics"
    }
}

data "http" "file" {
    # Note: the connector version needs updating when appropriate
    url = "https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.5.1/confluentinc-kafka-connect-s3-10.5.1.zip"  
}

# Treat the contents as sensitive so Terraform does not print the binary data
resource "local_sensitive_file" "confluentinc-kafka-connect-s3" {  
    content_base64 = data.http.file.response_body_base64
    filename = "confluentinc-kafka-connect-s3.zip"
}

resource "aws_s3_object" "s3_sink_msk_connect_custom_plugin_code" {
    bucket = aws_s3_bucket.archive.id
    key = local_sensitive_file.confluentinc-kafka-connect-s3.filename
    source = local_sensitive_file.confluentinc-kafka-connect-s3.filename
    # The etag must be the MD5 hash of the object contents
    etag = local_sensitive_file.confluentinc-kafka-connect-s3.content_md5
}  

module "msk_cluster" {  
    source = "terraform-aws-modules/msk-kafka-cluster/aws"  

    name = var.msk-cluster-name  
    kafka_version = "3.2.0"
    # Adjust as desired
    number_of_broker_nodes = 4  

    # Adjust as desired; the broker count must be a multiple of the number of subnets
    broker_node_client_subnets = [data.aws_subnets.private.ids[0], data.aws_subnets.private.ids[1]]
    # Adjust as desired
    broker_node_instance_type = "kafka.m5.large"  
    # Adjust as desired
    broker_node_storage_info = {
        ebs_storage_info = { volume_size = 100 }
    }

    broker_node_security_groups = [  
        module.security_group.security_group_id  
    ]

    configuration_name = "${var.msk-cluster-name}-config"
    # Adjust as desired
    configuration_server_properties = {  
        "auto.create.topics.enable" = true  
        "default.replication.factor" = 2
        "min.insync.replicas" = 1
        "num.io.threads" = 12
        "num.network.threads" = 10
        "num.partitions" = 24
        "num.replica.fetchers" = 2
        "replica.lag.time.max.ms" = 30000
        "unclean.leader.election.enable" = true
        "zookeeper.session.timeout.ms" = 18000
        "log.retention.hours" = 3
    }  

    connect_custom_plugins = {  
        s3_sink = {  
            name = "${var.msk-cluster-name}-s3-sink"  
            description = "Custom Plugin for S3 Sink"  
            content_type = "ZIP"

            s3_bucket_arn = aws_s3_bucket.archive.arn
            s3_file_key = aws_s3_object.s3_sink_msk_connect_custom_plugin_code.key

            timeouts = {
                create = "60m"
            }
        }
    }
}
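
# The Mongey/kafka provider declared in required_providers can be pointed at the new
# cluster, e.g. to manage the archived topics explicitly. A minimal sketch, assuming
# the TLS listener is reachable from where Terraform runs and no client auth is used:
provider "kafka" {
    bootstrap_servers = split(",", module.msk_cluster.bootstrap_brokers_tls)
    tls_enabled = true
}

resource "kafka_topic" "archived" {
    for_each = toset(var.archived-topics)

    name = each.value
    # Matches num.partitions / default.replication.factor in the cluster config above
    partitions = 24
    replication_factor = 2
}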

resource "aws_mskconnect_connector" "s3_sink" {  
    name = "${var.msk-cluster-name}-archiver"  

    kafkaconnect_version = "2.7.1"  

    capacity {
        provisioned_capacity {
            worker_count = 1
        }
    }

    # Assumes JSON data and partitioned by the hour
    # See https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html
    connector_configuration = {
        "connector.class" = "io.confluent.connect.s3.S3SinkConnector"
        # Adjust to the region the archive bucket lives in
        "s3.region" = "us-west-2"
        "format.class" = "io.confluent.connect.s3.format.bytearray.ByteArrayFormat"
        "value.converter" = "org.apache.kafka.connect.converters.ByteArrayConverter"
        "format.bytearray.extension" = ".json"
        "path.format" = "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
        # flush.size of 1 writes one S3 object per record; raise it to batch records per object
        "flush.size" = "1"
        "schema.compatibility" = "NONE"
        # The connector expects a comma-separated string of topic names
        "topics" = join(",", var.archived-topics)
        "tasks.max" = "1"
        "partitioner.class" = "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
        "locale" = "en-GB"
        "timezone" = "UTC"
        "partition.duration.ms" = "3600000"
        "storage.class" = "io.confluent.connect.s3.storage.S3Storage"
        "s3.bucket.name" = aws_s3_bucket.archive.id
    }
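
    # With the default topics.dir and the path.format above, archived objects land
    # under keys roughly like (illustrative):
    #   topics/<topic>/year=2024/month=05/day=01/hour=12/<topic>+<partition>+<start offset>.json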

    kafka_cluster {
        apache_kafka_cluster {
            bootstrap_servers = module.msk_cluster.bootstrap_brokers_tls

            vpc {  
                security_groups = [module.security_group.security_group_id]  
                subnets = [data.aws_subnets.private.ids[0], data.aws_subnets.private.ids[1]]
            }
        }
    }

    kafka_cluster_client_authentication {
        authentication_type = "NONE"
    }

    kafka_cluster_encryption_in_transit {
        encryption_type = "TLS"
    }

    plugin {  
        custom_plugin {  
            arn = module.msk_cluster.connect_custom_plugins.s3_sink.arn  
            revision = module.msk_cluster.connect_custom_plugins.s3_sink.latest_revision  
        }
    }

    log_delivery {  
        worker_log_delivery {  
            cloudwatch_logs {  
                enabled = true  
                log_group = aws_cloudwatch_log_group.kafka-archiver-logs.name  
            }  
        }  
    }  

    service_execution_role_arn = aws_iam_role.connector_role.arn  
}  

resource "aws_cloudwatch_log_group" "kafka-archiver-logs" {  
    name = "${var.msk-cluster-name}-archiver"
}  

resource "aws_iam_role" "connector_role" {
    name = "${var.msk-cluster-name}-archiver-role"
    assume_role_policy = jsonencode({
        Version = "2012-10-17"
        Statement = [
            {
                Effect = "Allow"
                Principal = {
                    Service = "kafkaconnect.amazonaws.com"
                }
                Action = "sts:AssumeRole"
            }
        ]
    })
}

resource "aws_iam_role_policy" "connector_role_policy" {  
    name = "${var.msk-cluster-name}-archiver-role-policy"  
    role = aws_iam_role.connector_role.id  
    # Adjust the resources below if access should be scoped down to the archive bucket
    policy = jsonencode({
        Version = "2012-10-17"
        Statement = [
            {
                Effect = "Allow"
                Action = [
                    "s3:ListAllMyBuckets"
                ]
                Resource = "arn:aws:s3:::*"
            },
            {
                Effect = "Allow"
                Action = [
                    "s3:ListBucket",
                    "s3:GetBucketLocation",
                    "s3:DeleteObject"
                ]
                Resource = "arn:aws:s3:::*"
            },
            {
                Effect = "Allow"
                Action = [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads"
                ]
                Resource = "*"
            }
        ]
    })
}
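
Once the connector is running, archived objects can be checked with the AWS CLI, for example:

aws s3 ls s3://<archive-bucket-name>/topics/ --recursive

(substituting the value given for archive-bucket-name).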