Piotr Chotkowski, Cloud Application Development Consultant, AWS Professional Services
Using a Java application to process data queued in Apache Kafka is a common use case across many industries. Event-driven and microservices architectures, for example, often rely on Apache Kafka for data streaming and component decoupling. You can use it as a message queue or an event bus, as well as a way to improve the resilience and reproducibility of events occurring in the application.
In this post, I walk you through the process of creating a simple end-to-end data processing application using AWS tools and services as well as other industry-standard techniques. We start with a brief architecture overview and an infrastructure definition. Then you see how, with just a few lines of code, you can set up an Apache Kafka cluster using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and the AWS Cloud Development Kit (AWS CDK). Next, I show you how to shape your project structure and package your application for deployment. We also look at the implementation details and how we can create Kafka topics in an Amazon MSK cluster, as well as send and receive messages from Apache Kafka using services such as AWS Lambda and AWS Fargate.
I use the AWS CDK to automate infrastructure creation and application deployment. The AWS CDK is an open-source software development framework for defining your cloud application resources using familiar programming languages. For more information, see the Developer Guide, AWS CDK Intro Workshop, and the AWS CDK Examples GitHub repo.
All the code presented in this post is open sourced and available on GitHub.
Overview of solution
The following diagram illustrates our overall architecture.
Triggering the TransactionHandler Lambda function publishes messages to an Apache Kafka topic. The consumer application, which is packaged in a container and deployed to ECS Fargate, consumes messages from the Kafka topic, processes them, and stores the results in an Amazon DynamoDB table. The KafkaTopicHandler Lambda function is called once during deployment to create the Kafka topic. Both the Lambda function and the consumer application publish logs to Amazon CloudWatch.
To follow along with this post, you need the prerequisites listed in the README file of the GitHub project.
Project structure and infrastructure definition
The project consists of three main parts: the infrastructure (including the Kafka cluster and Amazon DynamoDB), a Spring Boot Java consumer application, and Lambda producer code.
Let’s start by exploring the infrastructure and deployment definition. It’s implemented using a set of AWS CDK stacks and constructs. I’ve chosen TypeScript as my language here mainly because of personal preference. However, if you prefer, you can use the CDK with other languages. At the time of writing, the AWS CDK supports Python, TypeScript, Java, .NET and Go. For more information, see Working with the AWS CDK.
Let’s look at the project directory structure. All AWS CDK stacks are located in the amazon-msk-java-app-cdk/lib directory. In amazon-msk-java-app-cdk/bin, you can find the main AWS CDK app where all of the stacks are instantiated. amazon-msk-java-app-cdk/lambda contains code for TransactionHandler, which publishes messages to a Kafka topic, as well as code for KafkaTopicHandler, which is responsible for creating the Kafka topic. The business logic for the Kafka consumer, which is a Java Maven project, is in the consumer directory. The Dockerfile necessary for Fargate container creation is located in consumer/docker/Dockerfile. Finally, doc contains architecture diagrams and scripts contains the deployment script.
Setting up your Kafka cluster
The central part of the architecture is the Kafka cluster created using Amazon MSK, which is relatively easy to define and deploy with the AWS CDK. In the following code, I use the CfnCluster construct to set up my cluster:
new msk.CfnCluster(this, "kafkaCluster", {
    brokerNodeGroupInfo: {
        securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],
        // Brokers live only in the private subnets of the VPC
        clientSubnets: [...vpcStack.vpc.selectSubnets({
            subnetType: ec2.SubnetType.PRIVATE
        }).subnetIds],
        instanceType: "kafka.t3.small",
        storageInfo: {
            ebsStorageInfo: {
                volumeSize: 5 // GB per broker
            }
        }
    },
    clusterName: "TransactionsKafkaCluster",
    kafkaVersion: "2.7.0",
    numberOfBrokerNodes: 2
});
vpcStack in the preceding code refers to the AWS CDK stack containing the VPC definition. Because we’re using this cluster for demonstration purposes only, I limit storage to 5 GB, the instance type to kafka.t3.small, and the number of broker nodes to 2, which is the minimum allowed number. We don’t want to connect to this cluster from outside the VPC, so I place the cluster in a private subnet of my VPC. For more information about the allowed settings, see interface CfnClusterProps. To learn more about Amazon MSK, check out the Amazon MSK Labs workshop.
Topic creation
At the time of writing, Amazon MSK doesn’t allow you to create a Kafka topic inside the cluster using the AWS service API. You can only do this by connecting directly to the Kafka cluster, either using Kafka tools or using a library from within the code of your application. In this project, I’m using the AWS CDK’s custom resource provider. It allows you to use a custom Lambda function to handle AWS CloudFormation’s lifecycle events. You can find the definitions of the CustomResource, Provider, and Lambda function resources in the kafka-topic-stack.ts file, and the implementation of the handler Lambda function in the kafka-topic-handler.ts file. Let’s look at the code of the function:
export const handler = async (event: any, context: any = {}): Promise<any> => {
    try {
        if (event.RequestType === 'Create' || event.RequestType === 'Update') {
            // createTopic resolves to false when the topic already exists
            let result = await createTopic(event.ResourceProperties.topicConfig);
            response.send(event, context, response.SUCCESS, {alreadyExists: !result});
        } else if (event.RequestType === 'Delete') {
            await deleteTopic(event.ResourceProperties.topicConfig.topic);
            response.send(event, context, response.SUCCESS, {deleted: true});
        }
    } catch (e) {
        // Report the failure back to AWS CloudFormation
        response.send(event, context, response.FAILED, {reason: e});
    }
}
The handler is called once when the KafkaTopicStack is deployed and once when it’s destroyed. I use the admin client from the KafkaJS open-source library to create the Kafka topic on the ‘Create’ AWS CloudFormation event and to destroy it on the ‘Delete’ event. Calling KafkaJS’s createTopics method resolves to true if the topic was created successfully or false if it already exists.
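The createTopic and deleteTopic helpers are not shown in this post. The following is a minimal sketch of how they might look, not the project's actual code. AdminLike is a hypothetical interface that mirrors only the connect, disconnect, createTopics, and deleteTopics methods of the KafkaJS admin client, and unlike the handler above, this sketch takes the admin client as an explicit first argument so that it can be exercised without a cluster.

```typescript
// Hypothetical topic settings; the real topicConfig may carry more fields.
interface TopicConfig {
  topic: string;
  numPartitions?: number;
  replicationFactor?: number;
}

// Assumption: a narrow slice of the KafkaJS admin client. In a real Lambda
// function this object would come from `new Kafka({...}).admin()`.
interface AdminLike {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  createTopics(options: { topics: TopicConfig[] }): Promise<boolean>;
  deleteTopics(options: { topics: string[] }): Promise<void>;
}

// Resolves to true when the topic was created, false when it already existed.
async function createTopic(admin: AdminLike, topicConfig: TopicConfig): Promise<boolean> {
  await admin.connect();
  try {
    return await admin.createTopics({ topics: [topicConfig] });
  } finally {
    // Release the connection even if topic creation throws.
    await admin.disconnect();
  }
}

async function deleteTopic(admin: AdminLike, topic: string): Promise<void> {
  await admin.connect();
  try {
    await admin.deleteTopics({ topics: [topic] });
  } finally {
    await admin.disconnect();
  }
}
```

Passing the client in keeps the connection handling in one place and makes the helpers easy to test with a fake admin object.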
Consumer implementation details
The main purpose of the Kafka consumer part of this project is to process and validate incoming transaction messages and store the results in the DynamoDB table. The consumer application is written in Java using the Spring Boot framework. The core functionality is implemented in the KafkaConsumer class. I use the KafkaListener annotation to define the entry point for incoming messages. Spring takes care of most of the boilerplate code for us. In particular, we don’t need to write the logic to manually pull messages from the Kafka topic or worry about deserialization. All you need to do is provide the necessary elements in the configuration class. In the following code, the Spring Boot configuration is located in the ApplicationConfiguration class:
@Bean
public ConsumerFactory<String, byte[]> consumerFactory(KafkaConsumerProperties properties) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress());
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    // Keys are plain strings; values stay as raw bytes for the message converter
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    // Amazon MSK uses TLS by default, so the client connects over SSL
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation());
    LOGGER.info(configs.toString());
    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Converts the raw JSON bytes into the listener's parameter type
    factory.setMessageConverter(new ByteArrayJsonMessageConverter());
    return factory;
}
The preceding code sets up the Kafka consumer configuration. We get the bootstrap servers address string and the Kafka consumer group ID from the environment variables that are set up during application deployment. By default, Amazon MSK uses TLS 1.2 for secure communication, so we need to set up SSL configuration in our application as well. For more information about encryption, see Amazon MSK Encryption.
For the deserialization of incoming Kafka messages, I use classes provided by the Apache Kafka library. To enable Spring to deserialize Kafka JSON messages into POJOs, I use the ByteArrayDeserializer class combined with ByteArrayJsonMessageConverter. That way, Spring simply passes bytes as is from the deserializer to the message converter, and the converter transforms bytes into Java objects using Jackson’s ObjectMapper underneath. I use this approach because it allows me to send plaintext JSON messages. We don’t need anything more sophisticated for the purpose of this post. Depending on your needs, you can use different combinations of deserializers and message converters or dedicated deserializers, such as KafkaAvroDeserializer, which uses the schema registry to figure out the target type.
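To make the two-step conversion concrete, here is a small, language-neutral illustration written in TypeScript rather than Java. It is a sketch of the idea only, not of Spring's internals: the raw bytes are handed over unchanged, then decoded and mapped onto a typed object. The Transaction shape and the validation rules are assumptions based on the sample payload used later in this post.

```typescript
// Hypothetical message shape, mirroring the sample payload in this post.
interface Transaction {
  accountId: string;
  value: number;
}

// Plays the role of the message converter: the "deserializer" step has
// already handed over the bytes unchanged, so all decoding happens here.
function convertMessage(payload: Uint8Array): Transaction {
  const text = new TextDecoder("utf-8").decode(payload);
  const parsed: unknown = JSON.parse(text);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("Expected a JSON object");
  }
  const fields = parsed as Record<string, unknown>;
  const accountId = fields["accountId"];
  const value = fields["value"];
  // Assumed validation rules; the real consumer may check different things.
  if (typeof accountId !== "string" || accountId.length === 0) {
    throw new Error("accountId must be a non-empty string");
  }
  if (typeof value !== "number" || !isFinite(value)) {
    throw new Error("value must be a finite number");
  }
  return { accountId: accountId, value: value };
}
```

Keeping the bytes untouched until the converter runs is what lets the producer send plain JSON without any schema tooling on either side.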
For more information about how to use Apache Kafka with the Spring framework, refer to the Spring documentation.
Consumer deployment
We complete three high-level steps to deploy the consumer application into Fargate.
First, we need to build and package our application into an executable JAR. I use the Apache Maven Shade plugin with the Spring Boot Maven plugin dependency. It’s configured in the consumer application’s pom.xml. The JAR is created during the package phase of the Maven project build and placed in the consumer/docker directory next to the Dockerfile.
Next, we define the image used to create the ECS task container. To do that, we create a Dockerfile, which is a text file containing all the instructions and configuration necessary to assemble a Docker image. I use Amazon Linux 2 as a base for the image, additionally installing the Java 11 Amazon Corretto distribution, awslogs, and a CloudWatch agent. For the SSL configuration, we also need to copy the truststore file. In line 9 of the Dockerfile, we copy the executable JAR built in the previous step from the local location into the image. The last line in the Dockerfile is an entry point starting the consumer application. It’s a standard Java command:
java -cp kafka-consumer-1.0-SNAPSHOT-shaded.jar amazon.aws.samples.kafka.ConsumerApplication
Finally, we reference the Dockerfile in the AWS CDK stack. We do this inside the fargate-stack.ts file, where we define the infrastructure necessary to run our containerized application in the ECS task. To use the local Dockerfile image definition inside the AWS CDK stack, you need to create a DockerImageAsset:
const image = new assets.DockerImageAsset(this, "ConsumerImage", {
    directory: '../consumer/docker'
});
Next, we reference this image asset in the definition of the ECS task using the ContainerImage.fromDockerImageAsset method:
fargateTaskDefinition.addContainer("KafkaConsumer", {
    image: ecs.ContainerImage.fromDockerImageAsset(image),
    logging: ecs.LogDrivers.awsLogs({streamPrefix: 'KafkaConsumer'}),
    // Read by the Spring Boot consumer at startup
    environment: {
        'TABLE_NAME': this.tableName,
        'GROUP_ID': this.groupId,
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'REGION': this.region,
        'TOPIC_NAME': topicName.valueAsString
    }
});
During the AWS CDK stack deployment, the image defined in the Dockerfile is created and uploaded to an Amazon Elastic Container Registry (Amazon ECR) repository. That image is used to create and start the ECS task container, thereby starting our consumer application. For more information about other ways of obtaining images, see the Amazon ECS Construct Library.
Producer implementation details
We now have our Kafka cluster and consumer application defined. Next, we need to publish messages to Kafka, and I use a Lambda function to do that. All the code of the producer is located in the transaction-handler.ts file. I use the KafkaJS open-source library to communicate with the Kafka cluster and send messages.
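The producer code itself is not reproduced in this post. As a rough sketch of what a handler along these lines might do, and not the contents of transaction-handler.ts, the following publishes the incoming event as a JSON message. ProducerLike is a hypothetical interface covering only the connect, send, and disconnect methods of a KafkaJS producer, buildHandler is an illustrative name, and using the account ID as the message key is my own assumption.

```typescript
// Assumption: a narrow slice of the KafkaJS producer. In a real Lambda
// function this object would come from `new Kafka({...}).producer()`.
interface ProducerLike {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  send(record: {
    topic: string;
    messages: { key?: string; value: string }[];
  }): Promise<unknown>;
}

// Hypothetical event shape, matching the sample payload in this post.
interface TransactionEvent {
  accountId: string;
  value: number;
}

// Returns a handler bound to one producer and one topic.
function buildHandler(producer: ProducerLike, topic: string) {
  return async (event: TransactionEvent): Promise<void> => {
    await producer.connect();
    try {
      await producer.send({
        topic: topic,
        // Assumed design choice: key by account so that messages for one
        // account are hashed to the same partition and stay ordered.
        messages: [{ key: event.accountId, value: JSON.stringify(event) }],
      });
    } finally {
      // Release the connection even if sending fails.
      await producer.disconnect();
    }
  };
}
```

The value is sent as plain JSON text, which is what the consumer's byte array deserializer and JSON message converter expect.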
Producer deployment
Now let’s deploy our Kafka producer code. The AWS CDK stack definition for that part is located in the lambda-stack.ts file.
let transactionHandler = new NodejsFunction(this, "TransactionHandler", {
    runtime: Runtime.NODEJS_14_X,
    // Path relative to the AWS CDK root directory
    entry: 'lambda/transaction-handler.ts',
    handler: 'handler',
    vpc: vpcStack.vpc,
    securityGroups: [vpcStack.lambdaSecurityGroup],
    functionName: 'TransactionHandler',
    timeout: Duration.minutes(5),
    environment: {
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'TOPIC_NAME': topicName.valueAsString
    }
});
This is a relatively short piece of code. The AWS CDK NodejsFunction construct allows us to package our business logic code and deploy it as a Node.js Lambda function to the AWS Cloud. Due to internal AWS CDK packaging and deployment logic, it makes your life easier if you place the directory containing your Lambda code in the AWS CDK root directory next to the bin and lib directories. In the properties, in the entry field, you have to point to the local file containing your code. This is the relative path from the AWS CDK root directory. You can pass environment variables in the environment field. For this post, I pass Kafka’s bootstrap address string and the topic name, which I need in order to communicate with the Kafka cluster and send messages from within the Lambda function. If esbuild is available, it’s used to bundle your code in your environment. Otherwise, bundling occurs in a Docker container. This means that if you don’t want to use esbuild, you have to start a Docker daemon before deploying your AWS CDK stack. For more information about the NodejsFunction construct, see the Amazon Lambda Node.js Library.
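Inside the function, those two environment variables have to be turned into client settings. The following is a minimal sketch of that step under stated assumptions: readKafkaSettings and KafkaSettings are hypothetical names, the bootstrap address is assumed to be a comma-separated list of host:port pairs, and ssl is set to true because the cluster uses TLS by default.

```typescript
// Hypothetical settings object; `brokers` and `ssl` match the option names
// that the KafkaJS client constructor accepts.
interface KafkaSettings {
  brokers: string[];
  topic: string;
  ssl: boolean;
}

// Takes the environment as a parameter so it can be tested without Lambda;
// in the function itself you would pass `process.env`.
function readKafkaSettings(env: Record<string, string | undefined>): KafkaSettings {
  const bootstrap = env["BOOTSTRAP_ADDRESS"];
  const topic = env["TOPIC_NAME"];
  if (!bootstrap) {
    throw new Error("BOOTSTRAP_ADDRESS is not set");
  }
  if (!topic) {
    throw new Error("TOPIC_NAME is not set");
  }
  // Assumption: brokers arrive as "host1:port,host2:port".
  const brokers = bootstrap
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (brokers.length === 0) {
    throw new Error("BOOTSTRAP_ADDRESS contains no brokers");
  }
  return { brokers: brokers, topic: topic, ssl: true };
}
```

Failing fast on a missing variable surfaces a misconfigured deployment on the first invocation instead of as a connection timeout.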
Execution walkthrough
Once we deploy the application, it’s time to test it. To trigger the Lambda function and send a message to the Kafka topic, you can use the following AWS CLI command.
aws lambda invoke --cli-binary-format raw-in-base64-out --function-name TransactionHandler --log-type Tail --payload '{ "accountId": "account_123", "value": 456}' /dev/stdout --query 'LogResult' --output text | base64 -d
Here you’re adding 456 to the balance of the account account_123. The Lambda function sends a JSON message to the Amazon MSK cluster. The consumer application pulls the message from the Kafka topic in the form of bytes and transforms it into an instance of a POJO class. Next, the consumer business logic executes and the application stores the results in the Amazon DynamoDB table. You can run the following command to see the content of the table.
aws dynamodb scan --table-name Accounts --query "Items[*].[id.S,Balance.N]" --output text
All the logs from execution are stored in Amazon CloudWatch. To view them, you can go to the AWS console or run the aws logs tail command with the CloudWatch Logs group specified.
You can experiment with the application by sending multiple messages with different values of the accountId and value fields of the JSON payload.
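To reason about what the table should contain after several messages, it can help to model the balance update in memory. The sketch below is only that, a model: applyTransactions is a hypothetical function, it skips whatever validation the real consumer performs, and it uses a plain object where the application uses the DynamoDB table.

```typescript
// Hypothetical message shape, matching the sample payload in this post.
interface Transaction {
  accountId: string;
  value: number;
}

// Returns a new balances object; the input is left untouched.
function applyTransactions(
  balances: Record<string, number>,
  transactions: Transaction[]
): Record<string, number> {
  const updated: Record<string, number> = { ...balances };
  for (const tx of transactions) {
    // Accounts that have not been seen before start from a zero balance.
    updated[tx.accountId] = (updated[tx.accountId] ?? 0) + tx.value;
  }
  return updated;
}

// Three messages for two accounts, starting from an empty table.
const balances = applyTransactions({}, [
  { accountId: "account_123", value: 456 },
  { accountId: "account_123", value: -56 },
  { accountId: "account_456", value: 10 },
]);
console.log(balances); // { account_123: 400, account_456: 10 }
```

If the scan output differs from such a model, the CloudWatch logs of the consumer are the first place to look.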
Conclusion
In this post, we discussed different techniques to implement and deploy your application using AWS CDK constructs together with Java and TypeScript application code. High-level AWS CDK constructs enable you to quickly define the cloud infrastructure of your system and let you focus more on implementing your business logic. You can use a mix of programming languages that best fits your use case and keep all your code and infrastructure definitions in one place.
To run the code presented in this post, follow the prerequisites and usage steps described in the README file of the GitHub project.
Stay tuned for more content about cloud application development. If you have any questions or suggestions, please leave a comment. I hope you have enjoyed reading this post and learned something new. If you did, please share it with your colleagues. Happy coding!