Skip to main content

SDK for Java 2.x Kinesis

https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java_kinesis_code_examples.html

Example CreateStream

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.example.kinesis;

// snippet-start:[kinesis.java2.create.main]
// snippet-start:[kinesis.java2.create.import]
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
// snippet-end:[kinesis.java2.create.import]

/**
* Before running this Java V2 code example, set up your development
* environment, including your credentials.
*
* For more information, see the following documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class CreateDataStream {
public static void main(String[] args) {

final String usage = """

Usage:
<streamName>

Where:
streamName - The Amazon Kinesis data stream (for example, StockTradeStream).
""";

if (args.length != 1) {
System.out.println(usage);
System.exit(1);
}

String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();
createStream(kinesisClient, streamName);
System.out.println("Done");
kinesisClient.close();
}

public static void createStream(KinesisClient kinesisClient, String streamName) {
try {
CreateStreamRequest streamReq = CreateStreamRequest.builder()
.streamName(streamName)
.shardCount(1)
.build();

kinesisClient.createStream(streamReq);

} catch (KinesisException e) {
System.err.println(e.getMessage());
System.exit(1);
}
}
}
// snippet-end:[kinesis.java2.create.main]

Example PutRecord

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.example.kinesis;

// snippet-start:[kinesis.java2.putrecord.main]
// snippet-start:[kinesis.java2.putrecord.import]
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
// snippet-end:[kinesis.java2.putrecord.import]

/**
* Before running this Java V2 code example, set up your development
* environment, including your credentials.
*
* For more information, see the following documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class StockTradesWriter {
public static void main(String[] args) {
final String usage = """

Usage:
<streamName>

Where:
streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)
""";

if (args.length != 1) {
System.out.println(usage);
System.exit(1);
}

String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();

// Ensure that the Kinesis Stream is valid.
validateStream(kinesisClient, streamName);
setStockData(kinesisClient, streamName);
kinesisClient.close();
}

public static void setStockData(KinesisClient kinesisClient, String streamName) {
try {
// Repeatedly send stock trades with a 100 milliseconds wait in between.
StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();

// Put in 50 Records for this example.
int index = 50;
for (int x = 0; x < index; x++) {
StockTrade trade = stockTradeGenerator.getRandomTrade();
sendStockTrade(trade, kinesisClient, streamName);
Thread.sleep(100);
}

} catch (KinesisException | InterruptedException e) {
System.err.println(e.getMessage());
System.exit(1);
}
System.out.println("Done");
}

private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
String streamName) {
byte[] bytes = trade.toJsonAsBytes();

// The bytes could be null if there is an issue with the JSON serialization by
// the Jackson JSON library.
if (bytes == null) {
System.out.println("Could not get JSON bytes for stock trade");
return;
}

System.out.println("Putting trade: " + trade);
PutRecordRequest request = PutRecordRequest.builder()
.partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in
// the Supplemental Information section below.
.streamName(streamName)
.data(SdkBytes.fromByteArray(bytes))
.build();

try {
kinesisClient.putRecord(request);
} catch (KinesisException e) {
System.err.println(e.getMessage());
}
}

private static void validateStream(KinesisClient kinesisClient, String streamName) {
try {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();

DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
System.exit(1);
}

} catch (KinesisException e) {
System.err.println("Error found while describing the stream " + streamName);
System.err.println(e);
System.exit(1);
}
}
}
// snippet-end:[kinesis.java2.putrecord.main]

Example GetRecords

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.example.kinesis;

// snippet-start:[kinesis.java2.getrecord.main]
// snippet-start:[kinesis.java2.getrecord.import]
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import java.util.ArrayList;
import java.util.List;
// snippet-end:[kinesis.java2.getrecord.import]

/**
* Before running this Java V2 code example, set up your development
* environment, including your credentials.
*
* For more information, see the following documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class GetRecords {
public static void main(String[] args) {
final String usage = """

Usage:
<streamName>

Where:
streamName - The Amazon Kinesis data stream to read from (for example, StockTradeStream).
""";

if (args.length != 1) {
System.out.println(usage);
System.exit(1);
}

String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();

getStockTrades(kinesisClient, streamName);
kinesisClient.close();
}

public static void getStockTrades(KinesisClient kinesisClient, String streamName) {
String shardIterator;
String lastShardId = null;
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();

List<Shard> shards = new ArrayList<>();
DescribeStreamResponse streamRes;
do {
streamRes = kinesisClient.describeStream(describeStreamRequest);
shards.addAll(streamRes.streamDescription().shards());

if (shards.size() > 0) {
lastShardId = shards.get(shards.size() - 1).shardId();
}
} while (streamRes.streamDescription().hasMoreShards());

GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
.streamName(streamName)
.shardIteratorType("TRIM_HORIZON")
.shardId(lastShardId)
.build();

GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
shardIterator = shardIteratorResult.shardIterator();

// Continuously read data records from shard.
List<Record> records;

// Create new GetRecordsRequest with existing shardIterator.
// Set maximum records to return to 1000.
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(1000)
.build();

GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

// Put result into record list. Result may be empty.
records = result.records();

// Print records
for (Record record : records) {
SdkBytes byteBuffer = record.data();
System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), new String(byteBuffer.asByteArray()));
}
}
}
// snippet-end:[kinesis.java2.getrecord.main]

Example DeleteStream

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.example.kinesis;

// snippet-start:[kinesis.java2.delete.main]
// snippet-start:[kinesis.java2.delete.import]
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
// snippet-end:[kinesis.java2.delete.import]

/**
* Before running this Java V2 code example, set up your development
* environment, including your credentials.
*
* For more information, see the following documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class DeleteDataStream {

public static void main(String[] args) {
final String usage = """

Usage:
<streamName>

Where:
streamName - The Amazon Kinesis data stream (for example, StockTradeStream)
""";

if (args.length != 1) {
System.out.println(usage);
System.exit(1);
}

String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();

deleteStream(kinesisClient, streamName);
kinesisClient.close();
System.out.println("Done");
}

public static void deleteStream(KinesisClient kinesisClient, String streamName) {
try {
DeleteStreamRequest delStream = DeleteStreamRequest.builder()
.streamName(streamName)
.build();

kinesisClient.deleteStream(delStream);

} catch (KinesisException e) {
System.err.println(e.getMessage());
System.exit(1);
}
}
}
// snippet-end:[kinesis.java2.delete.main]