Getting started with apache-flink
Remarks
Overview and requirements
What is Flink
Like Apache Hadoop and Apache Spark, Apache Flink is a community-driven open source framework for distributed Big Data analytics. Written in Java, Flink has APIs for Scala, Java and Python, and supports both batch and real-time streaming analytics.
Requirements
- a UNIX-like environment, such as Linux, Mac OS X or Cygwin;
- Java 6.X or later;
- [optional] Maven 3.0.4 or later.
Stack
Execution environments
Apache Flink is a data processing system and an alternative to Hadoop’s MapReduce component. It comes with its own runtime rather than building on top of MapReduce. As such, it can work completely independently of the Hadoop ecosystem.
The ExecutionEnvironment is the context in which a program is executed. There are different environments you can use, depending on your needs.
- JVM environment: Flink can run on a single Java Virtual Machine, allowing users to test and debug Flink programs directly from their IDE. When using this environment, all you need is the correct Maven dependencies.
- Local environment: to be able to run a program on a running Flink instance (not from within your IDE), you need to install Flink on your machine. See local setup.
- Cluster environment: running Flink in a fully distributed fashion requires a standalone or a YARN cluster. See the cluster setup page or this slideshare for more information.
APIs
Flink can be used for either stream or batch processing. It offers three APIs:
- DataStream API: stream processing, i.e. transformations (filters, time-windows, aggregations) on unbounded flows of data.
- DataSet API: batch processing, i.e. transformations on data sets.
- Table API: a SQL-like expression language (like dataframes in Spark) that can be embedded in both batch and streaming applications.
Building blocks
At the most basic level, a Flink program is made up of sources, transformations and sinks:
- Data source: Incoming data that Flink processes
- Transformations: The processing step, when Flink modifies incoming data
- Data sink: Where Flink sends data after processing
Sources and sinks can be local/HDFS files, databases, message queues, etc. There are many third-party connectors already available, or you can easily create your own.
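The three building blocks can be pictured without any Flink dependency. The sketch below is a plain-Java analogy only: PipelineSketch and its run method are hypothetical names, and none of it is Flink API. An in-memory list stands in for the source, a lowercase conversion for the transformation, and a second list for the sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java analogy only; none of these names are Flink API.
final class PipelineSketch {

    // Reads every record from the source, applies the transformation,
    // and passes the result on to the sink.
    static void run(List<String> source,
                    Function<String, String> transformation,
                    Consumer<String> sink) {
        for (String record : source) {
            sink.accept(transformation.apply(record));
        }
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("To be", "Or not"); // stand-in for a file or queue
        List<String> sink = new ArrayList<>();                  // stand-in for a database or file
        run(source, String::toLowerCase, sink::add);
        System.out.println(sink);
    }
}
```

In a real Flink program the same three roles are filled by the environment's input methods, the operators chained on the data set or stream, and the output calls, as the WordCount examples in this section show.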
Local runtime setup
- Ensure you have Java 6 or above and that the JAVA_HOME environment variable is set.
- Download the latest Flink binary here:
wget flink-XXXX.tar.gz
If you don’t plan to work with Hadoop, pick the Hadoop 1 version. Also, note the Scala version you download, so you can add the correct Maven dependencies in your programs.
- Start Flink:
tar xzvf flink-XXXX.tar.gz
./flink/bin/start-local.sh
Flink is already configured to run locally. To ensure Flink is running, you can inspect the logs in flink/log/ or open the Flink JobManager’s interface running on https://localhost:8081.
- Stop Flink:
./flink/bin/stop-local.sh
Flink Environment setup
To run a Flink program from your IDE (either Eclipse or IntelliJ IDEA, the latter being preferred), you need two dependencies: flink-java / flink-scala and flink-clients (as of February 2016). These JARs can be added using Maven or SBT (if you are using Scala).
- Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.4</version>
</dependency>
- SBT
name := " "
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2.0",
  "org.apache.flink" %% "flink-clients" % "1.2.0"
)
Important: the 2.11 in the artifact name is the Scala version; be sure to match the one you have on your system.
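As a small illustration of how the suffix relates to the Scala version, the sketch below derives the artifact name from a full Scala version string. ScalaSuffixSketch is a hypothetical helper written for this page, not part of Flink, Maven or SBT; it assumes a Scala 2.x version of the form major.minor.patch. With SBT, the %% operator appends this suffix for you, whereas in Maven you write it out by hand.

```java
// Hypothetical helper, not part of Flink, Maven or SBT.
final class ScalaSuffixSketch {

    // "2.11.8" -> "2.11": Scala 2.x artifacts are suffixed with major.minor only.
    static String binaryVersion(String scalaVersion) {
        String[] parts = scalaVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected major.minor[.patch]: " + scalaVersion);
        }
        return parts[0] + "." + parts[1];
    }

    // Joins the base artifact name and the Scala binary version with an underscore.
    static String artifactId(String baseName, String scalaVersion) {
        return baseName + "_" + binaryVersion(scalaVersion);
    }

    public static void main(String[] args) {
        System.out.println(artifactId("flink-clients", "2.11.8"));
    }
}
```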
WordCount - Table API
This example is the same as WordCount, but uses the Table API. See WordCount for details about execution and results.
Maven
To use the Table API, add flink-table as a Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.1.4</version>
</dependency>
The code
// imports assume the package layout of Flink 1.1.x; the Table API classes
// live in different packages in other releases, so adjust them to your version
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.util.Collector;

public class WordCountTable{
    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );
        // get input data
        DataSource<String> source = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles"
        );
        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
            .flatMap( ( String value, Collector<String> out ) -> {
                for( String token : value.toLowerCase().split( "\\W+" ) ){
                    if( token.length() > 0 ){
                        out.collect( token );
                    }
                }
            } )
            // with lambdas, we need to tell flink what type to expect
            .returns( String.class );
        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );
        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}
Note: For a version using Java < 8, replace the lambda with an anonymous class (this also requires importing org.apache.flink.api.common.functions.FlatMapFunction):
FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
    @Override
    public void flatMap( String value, Collector<String> out ) throws Exception{
        for( String token : value.toLowerCase().split( "\\W+" ) ){
            if( token.length() > 0 ){
                out.collect( token );
            }
        }
    }
} );
WordCount
Maven
Add the dependencies flink-java and flink-clients (as explained in the Flink Environment setup example).
The code
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount{
    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );
        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap( new LineSplitter() )
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy( 0 )
                .aggregate( Aggregations.SUM, 1 );
        // emit result
        counts.print();
    }
}
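LineSplitter.java:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{
    @Override
    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );
        // emit the pairs, skipping the empty tokens that split() can produce
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}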
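The token.length() > 0 check in the splitter is easy to overlook, so here is a minimal plain-Java sketch of why it is there. TokenizeSketch is a hypothetical class using only the JDK: String.split keeps an empty leading element when the line starts with a non-word character, and it returns a single empty string for an empty line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch of the tokenization used by the splitter.
final class TokenizeSketch {

    // Same normalization as the splitter: lowercase, split on runs of
    // non-word characters, drop empty tokens.
    static List<String> tokens(String line) {
        List<String> result = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                result.add(token);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The raw split starts with an empty string because of the leading quote.
        System.out.println("'Tis nobler".toLowerCase().split("\\W+").length); // prints 3
        System.out.println(tokens("'Tis nobler"));                             // prints [tis, nobler]
        System.out.println(tokens(""));                                        // prints []
    }
}
```

None of the four sample lines starts with punctuation, so the check only matters for other inputs, such as text files containing blank lines.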
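If you use Java 8, you can replace .flatMap( new LineSplitter() ) with a lambda expression. When the code is compiled with javac, the generic return type of the lambda is erased, so a returns(...) type hint is added (it requires importing org.apache.flink.api.java.typeutils.TupleTypeInfo):
DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );
        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // due to type erasure, we need to specify the return type
    .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );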
Execution
From the IDE: simply hit run in your IDE. Flink will create an environment inside the JVM.
From the Flink command line: to run the program using a standalone local environment, do the following:
- ensure Flink is running (flink/bin/start-local.sh);
- create a jar file (mvn package);
- use the flink command-line tool (in the bin folder of your Flink installation) to launch the program:
flink run -c your.package.WordCount target/your-jar.jar
The -c option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.
Result
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
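To see how the (word,1) pairs turn into the totals above, the grouping and summing step can be mimicked in plain Java. This is only an illustration of what grouping on field 0 and summing field 1 computes, not of how Flink executes it. CountSketch is a hypothetical class, and the TreeMap is there merely to print the words alphabetically; Flink itself makes no promise about the order of printed results.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical JDK-only sketch of the group-and-sum step.
final class CountSketch {

    // Adds 1 to the running total of every word found in the given lines.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );
        // Prints one "(word,count)" line per distinct word.
        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```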
WordCount - Streaming API
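This example is the same as WordCount, but uses the Streaming API. See WordCount for details about execution; the results differ in that the streaming job prints an updated running count each time a word arrives instead of one final total per word.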
Maven
To use the Streaming API, add flink-streaming-java as a Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
</dependency>
The code
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStreaming{
    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data
        DataStreamSource<String> source = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles"
        );
        source
            // split up the lines in pairs (2-tuples) containing: (word,1)
            .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                // emit the pairs
                for( String token : value.toLowerCase().split( "\\W+" ) ){
                    if( token.length() > 0 ){
                        out.collect( new Tuple2<>( token, 1 ) );
                    }
                }
            } )
            // due to type erasure, we need to specify the return type
            .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
            // group by the tuple field "0"
            .keyBy( 0 )
            // sum up tuple on field "1"
            .sum( 1 )
            // print the result
            .print();
        // start the job
        env.execute();
    }
}
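A keyed sum on a stream emits an updated total for every incoming record, which is why the streaming output is longer than the batch result. The plain-Java sketch below mimics that behaviour for a single line of input. RollingCountSketch is a hypothetical class using only the JDK, and it ignores parallelism: a real job may interleave keys differently and may prefix each printed line with a subtask index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only sketch of a rolling, per-key sum.
final class RollingCountSketch {

    // Returns one "(word,total)" entry per incoming word, in arrival order,
    // where total is the count of that word seen so far.
    static List<String> rollingCounts(String line) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                int updated = totals.merge(token, 1, Integer::sum);
                emitted.add("(" + token + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [(to,1), (be,1), (or,1), (not,1), (to,2), (be,2)]
        System.out.println(rollingCounts("To be, or not to be"));
    }
}
```

The last value emitted for each word equals the total that the batch WordCount prints for it.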