IT教程 ·

Flink系统之Table API 和 SQL

golang中使用Shutdown特性对http服务进行优雅退出使用总结

  Flink供应了像表一样处置惩罚的API和像实行SQL语句一样把效果集举行实行。如许很轻易的让人人举行数据处置惩罚了。比方实行一些查询,在无界数据和批处置惩罚的使命上,然后将这些按肯定的花样举行输出,很轻易的让人人像实行SQL一样简朴。

  本日重要写的东西分为以下几个方面,然后遵照着下边几个方面举行睁开:

  1. Flink的差别API的层级梗概。

  2. FlinkSQL的编程的步骤。

  3. Flink编程的例子。

  

一、  Flink有着差别级别的API,差别级别的API轻易差别用户举行处置惩罚。普通用户运用Datastream以及Dataset举行程序编写,我们能够在其更高的基础上运用Table API以及SQL,这也是Flink的壮大的地方,能够像运用处置惩罚表一样处置惩罚数据。假如想研讨的更高能够看更底层的东西。

SQL  High-level Language
Table API Declarative  DSL
Datastream / Dataset API Core API
Stateful Stream Processing Low-level building block

(streams, state, [event] time)

二、 Flink的Table API 和 SQL编程步骤以下:

  1) 建立一个TableEnvironment表环境用于后续运用。TableEnvironment是 SQL 和 Table API的中心观点,它用于设置实行所须要的数据属性,和ExecutionEnvironment相似,它重要担任:

    a) 注册表数据源,从内部或许外部泉源。

    b) 实行响应的SQL语句。

    c) 注册自定义集数。

    d 将效果集举行扫描和写入到目的数据源。

    e) 雷同的environment能够实行响应的join unin操纵。

  2)接下来,我们看一下怎样注册数据源,注重差别的Flink版本有差别的完成,然则中心的内容是稳定的:

    a) 能够直接从数据集里举行注册。比方 tableEnvironment.registerDataSet()。

    b) 在一个已存在的Table中直接实行scan或许select,那末会生成一个新的Table,也就是数据能够从已有的Table中再次猎取,Table t = tableEnv.scan("x").select("a, b,c")。

    c) 能够是TableSource, 也就是从差别的文件、数据库、音讯体系举行读取。 比方csv文件,TableSource csvSource = new CsvTableSource("path/to/file")。

  3)读取完数据后举行处置惩罚,处置惩罚完以后要存储起来,那末须要Sink(存储)到文件或许数据库、音讯体系等。

    a) 比方Sink到CSV文件。 TableSink csvSink = new TableCSVSink("path/to/sink", ..)。

    b) Sink为指定字段句和范例到CSV文件中。

      指定表字段: String[] fieldNames = {"fild1", "filed2", "field3"}; 

      指定字段范例: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 

      指定表名和csv文件:tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

三、接下来,看一下实在的例子。

    1)从给定的单词和单词的个数中统计一下,每一个单词涌现的数据,运用SQL语句举行完成查询统计。完全的样例以下(注重,差别的FLink版本完成上有轻微的差别):

package myflink.sql;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;


public class WordCountSQL {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);

        DataSet<WC> input = env.fromElements(
                WC.of("hello", 1),
                WC.of("hqs", 1),
                WC.of("world", 1),
                WC.of("hello", 1)
        );
        //注册数据集
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        //实行SQL,并效果集做为一个新表
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();

    }

    public static class WC {
        public String word; //hello
        public long frequency;

        //建立组织要领,让flink举行实例化
        public WC() {}

        public static WC of(String word, long frequency) {
            WC wc = new WC();
            wc.word = word;
            wc.frequency = frequency;
            return wc;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

}

  输出的效果为,和我们想的效果是一样的。

WC world 1
WC hello 2
WC hqs 1

2)接下来的例子会庞杂一些,从一个txt文件中读取数据,txt文件中包括id, 人字, 书名,价钱信息。然后将数据注册成一个表,然后将这个表的效果举行统计,按人名统计出来这个人买书所消费的钱,将效果sink到一个文件中。上代码。

package myflink.sql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;

public class SQLFromFile {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        env.setParallelism(1);
        //读取文件
        DataSource<String> input = env.readTextFile("test.txt");
        //将读取到的文件举行输出
        input.print();
        //转换为DataSet
        DataSet<Orders> inputDataSet = input.map(new MapFunction<String, Orders>() {
            @Override
            public Orders map(String s) throws Exception {
                String[] splits = s.split(" ");
                return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3]));
            }
        });
        //转换为table
        Table order = tableEnv.fromDataSet(inputDataSet);
        //注册Orders表名
        tableEnv.registerTable("Orders", order);
        Table nameResult = tableEnv.scan("Orders").select("name");
        //输出一下表
        nameResult.printSchema();

        //实行一下查询
        Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
        //查询效果转换为DataSet
        DataSet<Result> result = tableEnv.toDataSet(sqlQueryResult, Result.class);
        result.print();

        //以tuple的体式格局举行输出
        result.map(new MapFunction<Result, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(Result result) throws Exception {
                String name = result.name;
                Double total = result.total;
                return Tuple2.of(name, total);
            }
        }).print();

        TableSink sink  = new CsvTableSink("SQLText.txt", " | ");

        //设置字段名
        String[] filedNames = {"name", "total"};
        //设置字段范例
        TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()};

        tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink);

        sqlQueryResult.insertInto("SQLTEXT");

        env.execute();

    }

    public static class Orders {
        public Integer id;
        public String name;
        public String book;
        public Double price;

        public Orders() {
            super();
        }

        public static Orders of(Integer id, String name, String book, Double price) {
            Orders orders = new Orders();
            orders.id = id;
            orders.name = name;
            orders.book = book;
            orders.price = price;
            return orders;
        }
    }

    public static class Result {
        public String name;
        public Double total;

        public Result() {
            super();
        }

        public static Result of(String name, Double total) {
            Result result = new Result();
            result.name = name;
            result.total = total;
            return result;
        }
    }

}

    

Scala函数式编程(五) 函数式的错误处理

参与评论