A Stream Pool is a group of Streams. If you did not assign a separate Stream Pool when creating the Stream above, it is placed in the default Stream Pool (DefaultPool). The Stream Pool details include the Kafka connection settings required to connect to it, and we will use this information to configure both Kafka and Flink.
Most of what you need can be read from the Kafka connection settings, but you still have to generate an AUTH_TOKEN for your own account. In the OCI console this is done from your user settings, under Auth Tokens > Generate Token.
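If you prefer the command line, the token can also be created with the OCI CLI. A minimal sketch, assuming the CLI is installed and configured and that {user OCID} is replaced with your user's OCID (the description text is arbitrary); note that the token value is displayed only once, at creation time, so copy it right away.
$ oci iam auth-token create --user-id {user OCID} --description "oss-kafka-flink"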
Next we will set up a Flink environment, write a consumer, and run it. Here we will use a local cluster setup.
For installation, refer to the link below.
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/try-flink/local_installation/
On macOS, it is easy to install with Homebrew:
$ brew install apache-flink
$ flink --version
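If the installation succeeded, the version command prints the installed release; the exact output varies, but it looks roughly like this:
Version: 1.16.0, Commit ID: ...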
When installed with Homebrew on macOS, Flink usually ends up under /usr/local/Cellar/apache-flink/{version}. You can check the exact path with brew info:
$ brew info apache-flink
apache-flink: stable 1.16.0, HEAD
Scalable batch and stream data processing
https://flink.apache.org/
/usr/local/Cellar/apache-flink/1.16.0 (174 files, 336MB) *
Built from source on 2022-11-04 at 14:46:29
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-flink.rb
License: Apache-2.0
==> Requirements
Required: java = 1.8 ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 713 (30 days), 2,253 (90 days), 13,079 (365 days)
install-on-request: 714 (30 days), 2,243 (90 days), 13,028 (365 days)
build-error: 0 (30 days)
Move into the installation directory and start the local cluster:
$ cd /usr/local/Cellar/apache-flink/1.16.0/libexec/bin
$ ./start-cluster.sh
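When you are finished later, the cluster can be stopped with the matching script from the same directory:
$ ./stop-cluster.sh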
Now open the management console that Flink provides; with the default configuration it is available at http://localhost:8081.
Flink programs can be written in Java or Scala; here we use a Maven project with Java.
We use Flink 1.16.0, the latest version at the time of writing. Configure the dependencies and Maven plugins as follows.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.app</groupId>
  <artifactId>oss-kafka-flink-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <flink.version>1.16.0</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.2.0</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <!-- This list contains all dependencies of flink-dist.
                       Everything else will be packaged into the fat-jar. -->
                  <exclude>org.apache.flink:flink-shaded-*</exclude>
                  <exclude>org.apache.flink:flink-core</exclude>
                  <exclude>org.apache.flink:flink-java</exclude>
                  <exclude>org.apache.flink:flink-scala</exclude>
                  <exclude>org.apache.flink:flink-runtime</exclude>
                  <exclude>org.apache.flink:flink-optimizer</exclude>
                  <exclude>org.apache.flink:flink-clients</exclude>
                  <exclude>org.apache.flink:flink-avro</exclude>
                  <exclude>org.apache.flink:flink-java-examples</exclude>
                  <exclude>org.apache.flink:flink-scala-examples</exclude>
                  <exclude>org.apache.flink:flink-streaming-examples</exclude>
                  <exclude>org.apache.flink:flink-streaming-java</exclude>
                  <!-- Also exclude very big transitive dependencies of Flink.
                       WARNING: You have to remove these excludes if your code relies on other
                       versions of these dependencies. -->
                  <exclude>org.scala-lang:scala-library</exclude>
                  <exclude>org.scala-lang:scala-compiler</exclude>
                  <exclude>org.scala-lang:scala-reflect</exclude>
                  <exclude>com.amazonaws:aws-java-sdk</exclude>
                  <exclude>com.typesafe.akka:akka-actor_*</exclude>
                  <exclude>com.typesafe.akka:akka-remote_*</exclude>
                  <exclude>com.typesafe.akka:akka-slf4j_*</exclude>
                  <exclude>io.netty:netty-all</exclude>
                  <exclude>io.netty:netty</exclude>
                  <exclude>org.eclipse.jetty:jetty-server</exclude>
                  <exclude>org.eclipse.jetty:jetty-continuation</exclude>
                  <exclude>org.eclipse.jetty:jetty-http</exclude>
                  <exclude>org.eclipse.jetty:jetty-io</exclude>
                  <exclude>org.eclipse.jetty:jetty-util</exclude>
                  <exclude>org.eclipse.jetty:jetty-security</exclude>
                  <exclude>org.eclipse.jetty:jetty-servlet</exclude>
                  <exclude>commons-fileupload:commons-fileupload</exclude>
                  <exclude>org.apache.avro:avro</exclude>
                  <exclude>commons-collections:commons-collections</exclude>
                  <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
                  <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
                  <exclude>com.thoughtworks.paranamer:paranamer</exclude>
                  <exclude>org.xerial.snappy:snappy-java</exclude>
                  <exclude>org.apache.commons:commons-compress</exclude>
                  <exclude>org.tukaani:xz</exclude>
                  <exclude>com.esotericsoftware.kryo:kryo</exclude>
                  <exclude>com.esotericsoftware.minlog:minlog</exclude>
                  <exclude>org.objenesis:objenesis</exclude>
                  <exclude>com.twitter:chill_*</exclude>
                  <exclude>com.twitter:chill-java</exclude>
                  <exclude>com.twitter:chill-avro_*</exclude>
                  <exclude>com.twitter:chill-bijection_*</exclude>
                  <exclude>com.twitter:bijection-core_*</exclude>
                  <exclude>com.twitter:bijection-avro_*</exclude>
                  <exclude>commons-lang:commons-lang</exclude>
                  <exclude>junit:junit</exclude>
                  <exclude>de.javakaffee:kryo-serializers</exclude>
                  <exclude>joda-time:joda-time</exclude>
                  <exclude>org.apache.commons:commons-lang3</exclude>
                  <exclude>org.slf4j:slf4j-api</exclude>
                  <exclude>org.slf4j:slf4j-log4j12</exclude>
                  <exclude>log4j:log4j</exclude>
                  <exclude>org.apache.commons:commons-math</exclude>
                  <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
                  <exclude>commons-logging:commons-logging</exclude>
                  <exclude>org.apache.httpcomponents:httpclient</exclude>
                  <exclude>org.apache.httpcomponents:httpcore</exclude>
                  <exclude>commons-codec:commons-codec</exclude>
                  <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
                  <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
                  <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
                  <exclude>org.codehaus.jettison:jettison</exclude>
                  <exclude>stax:stax-api</exclude>
                  <exclude>com.typesafe:config</exclude>
                  <exclude>org.uncommons.maths:uncommons-maths</exclude>
                  <exclude>com.github.scopt:scopt_*</exclude>
                  <exclude>org.mortbay.jetty:servlet-api</exclude>
                  <exclude>commons-io:commons-io</exclude>
                  <exclude>commons-cli:commons-cli</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>org.apache.flink:*</artifact>
                  <excludes>
                    <exclude>org/apache/flink/shaded/**</exclude>
                    <exclude>web-docs/**</exclude>
                  </excludes>
                </filter>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- Add Main-Class to the manifest file -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.app.FlinkTestConsumer</mainClass>
                </transformer>
              </transformers>
              <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <!-- Configure the jar plugin to add the main class as a manifest entry -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <archive>
            <manifestEntries>
              <Main-Class>com.example.app.FlinkTestConsumer</Main-Class>
            </manifestEntries>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <release>8</release>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <!-- A profile that does everything correctly:
           we set the Flink dependencies to provided -->
      <id>build-jar</id>
      <activation>
        <activeByDefault>false</activeByDefault>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>
Next, create a configuration file for connecting to the OCI Streaming Service, using the Kafka connection settings from the Stream Pool.
consumer.config
bootstrap.servers=cell-1.streaming.ap-seoul-1.oci.oraclecloud.com:9092
group.id=FlinkExampleConsumer
request.timeout.ms=60000
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{tenancy name}/{username}/{Stream Pool OCID}" password="{AUTH_TOKEN}";
The consumer is written in Java; it is a simple program that uses FlinkKafkaConsumer to receive messages from the Stream (Topic) and just print them.
package com.example.app;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

@SuppressWarnings("all")
public class FlinkTestConsumer {

    private static final String TOPIC = "flink-topic";
    private static final String FILE_PATH = "src/main/resources/consumer.config";

    public static void main(String... args) {
        try {
            // Load the OCI Streaming connection settings from the config file
            Properties properties = new Properties();
            properties.load(new FileReader(FILE_PATH));

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The version-specific FlinkKafkaConsumer011 no longer exists in the 1.16.0
            // connector; the generic FlinkKafkaConsumer is used instead
            DataStream<String> stream =
                    env.addSource(new FlinkKafkaConsumer<>(TOPIC, new SimpleStringSchema(), properties));
            stream.print();
            env.execute("Testing flink consumer");
        } catch (FileNotFoundException e) {
            System.out.println("FileNotFoundException: " + e);
        } catch (Exception e) {
            System.out.println("Failed with exception " + e);
        }
    }
}
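For reference, FlinkKafkaConsumer has been deprecated since Flink 1.14 in favor of the unified KafkaSource API, so new projects may prefer the following source definition instead. A minimal sketch, reusing the properties and TOPIC from the code above:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Build a KafkaSource reading the same topic with the same OCI SASL settings
KafkaSource<String> source = KafkaSource.<String>builder()
        .setTopics(TOPIC)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperties(properties) // bootstrap.servers, group.id and SASL settings from consumer.config
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "OCI Streaming Source");
Here setProperties passes the bootstrap.servers, group.id, and SASL settings from consumer.config straight through to the underlying Kafka client.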
Build
$ mvn package
Run the job
$ flink run -c "com.example.app.FlinkTestConsumer" target/oss-kafka-flink-consumer-1.0-SNAPSHOT.jar
Job has been submitted with JobID fe7e3abba870a537cf4f5a7961ae924e
Verify that the job is running
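You can also verify from the command line; the flink CLI lists scheduled and running jobs, and the job submitted above should appear with state RUNNING:
$ flink list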
The Kafka producer is also written in Java.
Along with the Kafka client dependency, configure the Maven plugins so that the Kafka-related libraries are placed on the classpath when the jar is executed.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.app</groupId>
  <artifactId>oss-kafka-producer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <kafka.version>3.3.1</kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.5</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>2.0.5</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.14.1</version>
    </dependency>
  </dependencies>

  <build>
    <defaultGoal>install</defaultGoal>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <release>8</release>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.2.0</version>
        <configuration>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.1.2</version>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/libs</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
              <includeScope>runtime</includeScope>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <addExtensions>true</addExtensions>
              <mainClass>com.example.app.KafkaProducerExample</mainClass>
              <classpathPrefix>libs/</classpathPrefix>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <!-- A profile that does everything correctly:
           we set the Kafka dependency to provided -->
      <id>build-jar</id>
      <activation>
        <activeByDefault>false</activeByDefault>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>${kafka.version}</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>
The code below sends five text messages to the OCI Streaming Service when it runs.
package com.example.app;

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompatibleProducer {

    public void produce() {
        String authToken = "{AUTH_TOKEN}";
        String tenancyName = "{tenancy name}";
        String username = "{username}";
        String streamPoolId = "{Stream Pool OCID}";
        String topicName = "{Stream Name/Topic Name}";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cell-1.streaming.ap-seoul-1.oci.oraclecloud.com:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5000);
        properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                        + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                        + "password=\"" + authToken + "\";");
        properties.put("retries", 5); // retry on transient errors and load-balancer disconnections
        properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topicName, UUID.randomUUID().toString(), "Test record #" + i);
            producer.send(record, (md, ex) -> {
                if (ex != null) {
                    ex.printStackTrace();
                } else {
                    System.out.println("Sent msg to " + md.partition()
                            + " with offset " + md.offset()
                            + " at " + md.timestamp());
                }
            });
        }
        producer.flush();
        producer.close();
        System.out.println("produced 5 messages");
    }
}
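The pom's manifest declares com.example.app.KafkaProducerExample as the main class, while the class above is CompatibleProducer. A minimal entry point, assuming it simply delegates to the producer (this wiring is an assumption; the original entry point is not shown here):
package com.example.app;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Hypothetical wiring: delegate to the producer shown above
        new CompatibleProducer().produce();
    }
}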
Build
$ mvn package
Run
$ java -jar target/oss-kafka-producer-1.0-SNAPSHOT.jar
00:11 INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
00:11 INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
00:11 INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
00:11 INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1601824282658
00:11 INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: OSS
Sent msg to 0 with offset 15 at 1601824286584
Sent msg to 0 with offset 16 at 1601824286584
Sent msg to 0 with offset 17 at 1601824286584
Sent msg to 0 with offset 18 at 1601824286584
Sent msg to 0 with offset 19 at 1601824286584
00:11 INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
produced 5 messages
Check the published messages in the OCI Streaming Service console.
In the Flink console, go to Task Managers > Stdout to see the consumed messages.
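Because stream.print() writes to the TaskManager's standard output, the same messages can also be tailed from the TaskManager .out file in the Flink log directory; the exact file name depends on your user name, host, and install path:
$ tail -f /usr/local/Cellar/apache-flink/1.16.0/libexec/log/flink-*-taskexecutor-*.out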
The full source is available in the following GitHub repository.
This article is based on personal knowledge and experience, so it may contain errors. The opinions expressed are my own and do not represent any particular company.