Skip to main content

Spring Kinesis Producer

https://github.com/awslabs/amazon-kinesis-producer?tab=readme-ov-file

Kinesis Producer Library

Introduction

The Amazon Kinesis Producer Library (KPL) performs many tasks common to creating efficient and reliable producers for Amazon Kinesis. By using the KPL, customers do not need to develop the same logic every time they create a new application for data ingestion.

For detailed information and installation instructions, see the article "Developing Producer Applications for Amazon Kinesis Using the Amazon Kinesis Producer Library" in the Amazon Kinesis Developer Guide.

Example

Create AWS Stream

  • Go to the AWS console and search for Kinesis. Select Kinesis Data Streams and click the "Create data stream" button.
  • Add a name for the Kinesis data stream:
    • aws-data-stream

Spring Boot Project Structure

aws-kinesis-producer-proj-struct-01.png

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the AWS Kinesis Producer Library (KPL) Spring Boot example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot parent POM; the Spring Boot starters and the
     Spring Boot Maven plugin below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Coordinates of this example project. -->
<groupId>zjc.examples.kinesis</groupId>
<artifactId>aws-kinesis-kplkinesislibrary</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kplkinesislibrary</name>

<description>AWS Kinesis Example project for Spring Boot</description>

<!-- The project is built for Java 21. -->
<properties>
<java.version>21</java.version>
</properties>

<dependencies>
<!-- Core Spring Boot starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- Spring Boot test support; test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- Amazon Kinesis Producer Library (KPL), pinned to 0.15.10. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.15.10</version>
</dependency>

<!-- Lombok, declared with provided scope. -->
<!-- NOTE(review): the version is pinned explicitly; confirm whether the parent POM
     already manages this artifact and whether the pin is intentional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<scope>provided</scope>
</dependency>

<!-- Jakarta XML Binding API. -->
<!-- NOTE(review): presumably added for a runtime requirement of the KPL; confirm
     that it is actually needed. -->
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<version>4.0.2</version>
</dependency>

<!-- Jackson data binding (JSON serialization). -->
<!-- NOTE(review): the version is pinned explicitly; confirm whether the parent POM
     already manages this artifact and whether the pin is intentional. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.2</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

KplkinesislibraryApplication.java

package zjc.examples.kinesis.kplkinesislibrary;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import zjc.examples.kinesis.kplkinesislibrary.model.Car;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Spring Boot entry point that publishes a single JSON-encoded {@link Car} record to a
 * Kinesis data stream when the application starts.
 *
 * <p>The producer, JSON mapper and result callback are injected through the constructor
 * generated by Lombok's {@code @RequiredArgsConstructor}.
 */
@SpringBootApplication
@RequiredArgsConstructor
public class KplkinesislibraryApplication implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(KplkinesislibraryApplication.class);

    /** Name of the Kinesis data stream the record is written to. */
    private static final String STREAM_NAME = "aws-data-stream";

    private final KinesisProducer kinesisProducer;
    private final ObjectMapper objectMapper;
    private final FutureCallback<UserRecordResult> futureCallback;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KplkinesislibraryApplication.class, args);
    }

    /**
     * Serializes a sample {@link Car} to JSON, hands it to the Kinesis producer, registers the
     * result callback and then flushes the producer synchronously.
     *
     * @param args command-line arguments (unused)
     * @throws RuntimeException if the sample object cannot be serialized to JSON; the
     *     {@link JsonProcessingException} is kept as the cause
     */
    @Override
    public void run(String... args) throws Exception {
        String data;
        try {
            data = objectMapper.writeValueAsString(new Car(UUID.randomUUID().toString(), "Toyota"));
        } catch (JsonProcessingException e) {
            // Log at ERROR level and keep the stack trace; the original logged at INFO
            // with only the message text.
            log.error("Failed in convert object {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // A random UUID is used as the partition key for the record.
        var resultListenableFuture = kinesisProducer.addUserRecord(
                STREAM_NAME,
                UUID.randomUUID().toString(),
                ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));

        // directExecutor() runs the callback on whichever thread completes the future.
        Futures.addCallback(resultListenableFuture, futureCallback, MoreExecutors.directExecutor());
        kinesisProducer.flushSync();

        // Report completion through the logger rather than System.out so the message
        // follows the application's logging configuration.
        log.info("finish sending log to kinesis");
    }
}

KplkinesislibraryConfig.java

package zjc.examples.kinesis.kplkinesislibrary;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that wires the Kinesis producer, its configuration, the JSON mapper
 * and the callback invoked when a record put completes.
 */
@Configuration
public class KplkinesislibraryConfig {
    private static final Logger log = LoggerFactory.getLogger(KplkinesislibraryConfig.class);

    /** Region used when the {@code AWS_REGION} environment variable is not set. */
    private static final String DEFAULT_REGION = "us-east-1";

    /**
     * Creates the Kinesis producer from {@link #kinesisProducerConfiguration()}.
     *
     * @return a new {@link KinesisProducer}
     */
    @Bean
    public KinesisProducer kinesisProducer() {
        return new KinesisProducer(kinesisProducerConfiguration());
    }

    /**
     * Builds the producer configuration.
     *
     * <p>Credentials are read from the {@code AWS_ACCESS_KEY_ID} and
     * {@code AWS_SECRET_ACCESS_KEY} environment variables instead of being embedded in the
     * source. The region comes from {@code AWS_REGION} and falls back to {@code us-east-1}.
     *
     * @return the producer configuration
     * @throws IllegalStateException if a required credential environment variable is missing
     *     or blank
     */
    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration() {
        String accessKey = requireEnv("AWS_ACCESS_KEY_ID");
        String secretKey = requireEnv("AWS_SECRET_ACCESS_KEY");

        String region = System.getenv("AWS_REGION");
        if (region == null || region.isBlank()) {
            region = DEFAULT_REGION;
        }

        BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);

        return new KinesisProducerConfiguration()
                .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
                // NOTE(review): certificate verification is switched off here. This is kept
                // for compatibility with the original example but should be reviewed before
                // any non-demo use.
                .setVerifyCertificate(false)
                .setMaxConnections(1)
                .setRegion(region)
                .setRecordTtl(3000);
    }

    /**
     * Provides the JSON mapper used to serialize records.
     *
     * @return a new {@link ObjectMapper} with default settings
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper();
    }

    /**
     * Provides the callback that logs the outcome of a record put.
     *
     * @return a callback that logs failures at ERROR level and successes at INFO level
     */
    @Bean
    public FutureCallback<UserRecordResult> futureCallback() {
        return new FutureCallback<>() {
            @Override
            public void onFailure(Throwable t) {
                // Pass the throwable itself so the stack trace is kept and an exception
                // without a message does not log as "null".
                log.error("Failed to put record to Kinesis", t);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                log.info("{} sequence number {}", result.getShardId(), result.getSequenceNumber());
            }
        };
    }

    /** Returns the value of the named environment variable, failing fast when it is absent. */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required environment variable " + name);
        }
        return value;
    }
}

Car.java

package zjc.examples.kinesis.kplkinesislibrary.model;

/**
 * Immutable car payload.
 *
 * @param id   identifier of the car
 * @param name name of the car
 */
public record Car(String id, String name) {}

Source Code

https://github.com/ZbCiok/zjc-examples/tree/main/aws/aws/aws-kinesis-kplkinesislibrary