实时数据相关

flink 读取mysql并使用flink sql

2019-01-02  本文已影响0人  岳过山丘

1.mysql连接


// Table t1 has a single INT column named "a", so the row type is one INT field.
RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO
});

// Build a JDBC input format that reads every row of t1 from the local MySQL
// instance. NOTE(review): credentials are hard-coded — fine for a demo, but
// move them to configuration for anything real.
JDBCInputFormat jdbcInputFormat =
        JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost/test1")
                .setUsername("root")
                .setPassword("123456")
                .setQuery("select *  from t1")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

2.flink sql


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Parameterize with Row instead of using raw types: createInput with a
// RowTypeInfo-configured JDBCInputFormat yields Row records.
DataSource<Row> s = env.createInput(jdbcInputFormat); //datasource

// Use the documented factory method rather than constructing the
// environment directly — this is the supported API in Flink 1.7.
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Register the DataSet as table "t2" with a single column "a".
tableEnv.registerDataSet("t2", s, "a");

tableEnv.sqlQuery("select * from t2").printSchema();

// SQL query plus a Table-API filter; keeps rows where a > 1.
Table query = tableEnv.sqlQuery("select * from t2").where("a > 1");

DataSet<Row> result = tableEnv.toDataSet(query, Row.class);

result.print();

// NOTE(review): count() triggers a second execution of the source
// (re-reads MySQL); acceptable in a demo, wasteful in production.
System.out.println(s.count());

3. Maven dependencies

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-java</artifactId>

    <version>${flink.version}</version>

</dependency>

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-scala_${scala.binary.version}</artifactId>

    <version>${flink.version}</version>

</dependency>

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>

    <version>${flink.version}</version>

</dependency>

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

    <version>${flink.version}</version>

    <scope>provided</scope>

</dependency>

<!-- <dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-table-common</artifactId>

    <version>${flink.version}</version>

</dependency>-->

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-table_${scala.binary.version}</artifactId>

    <!-- Use the shared property instead of a hard-coded 1.7.0 so all
         Flink artifacts stay on the same version when it is bumped. -->
    <version>${flink.version}</version>

</dependency>

<!-- Add connector dependencies here. They must be in the default scope (compile). -->

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>

    <version>${flink.version}</version>

</dependency>

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>5.1.46</version>

</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->

<!-- These dependencies are excluded from the application JAR by default. -->

<dependency>

    <groupId>org.slf4j</groupId>

    <artifactId>slf4j-log4j12</artifactId>

    <version>1.7.7</version>

    <scope>runtime</scope>

</dependency>

<dependency>

    <groupId>log4j</groupId>

    <artifactId>log4j</artifactId>

    <version>1.2.17</version>

    <scope>runtime</scope>

</dependency>

上一篇 下一篇

猜你喜欢

热点阅读