博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink批处理中的source以及sink介绍
阅读量:5277 次
发布时间:2019-06-14

本文共 4871 字,大约阅读时间需要 16 分钟。

 

一、flink在批处理中常见的source

  flink在批处理中常见的source主要有两大类:  

    1.基于本地集合的source(Collection-based-source)   

    2.基于文件的source(File-based-source)

 

 1.基于本地集合的source

      在flink最常见的创建DataSet方式有三种。   

1.使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。   

2.使用env.fromCollection(),这种方式支持多种Collection的具体类型   

3.使用env.generateSequence()方法创建基于Sequence的DataSet

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}import scala.collection.immutable.{Queue, Stack}import scala.collection.mutableimport scala.collection.mutable.{ArrayBuffer, ListBuffer}object DataSource001 {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    //0.用element创建DataSet(fromElements)    val ds0: DataSet[String] = env.fromElements("spark", "flink")    ds0.print()    //1.用Tuple创建DataSet(fromElements)    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))    ds1.print()    //2.用Array创建DataSet    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))    ds2.print()    //3.用ArrayBuffer创建DataSet    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))    ds3.print()    //4.用List创建DataSet    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))    ds4.print()    //5.用List创建DataSet    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))    ds5.print()    //6.用Vector创建DataSet    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))    ds6.print()    //7.用Queue创建DataSet    val ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))    ds7.print()    //8.用Stack创建DataSet    val ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))    ds8.print()    //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))    ds9.print()    //10.用Seq创建DataSet    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))    ds10.print()    //11.用Set创建DataSet    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))    ds11.print()    //12.用Iterable创建DataSet    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))    ds12.print()    //13.用ArraySeq创建DataSet    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))    ds13.print()    //14.用ArrayStack创建DataSet    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))    ds14.print()    //15.用Map创建DataSet    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))    ds15.print()    //16.用Range创建DataSet    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))    ds16.print()    //17.用fromElements创建DataSet    val ds17: DataSet[Long] =  env.generateSequence(1,9)    ds17.print()  }}

2.基于文件的source(File-based-source)

flink支持多种存储设备上的文件,包括本地文件,hdfs文件,alluxio文件等。flink支持多种文件的存储格式,包括text文件,CSV文件等。
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment,_}object DataSource002 {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    //1.读取本地文本文件,本地文件以file://开头    val ds1: DataSet[String] = env.readTextFile("file:///Applications/flink-1.1.3/README.txt")    ds1.print()    //2.读取hdfs文本文件,hdfs文件以hdfs://开头,不指定master的短URL    val ds2: DataSet[String] = env.readTextFile("hdfs:///input/flink/README.txt")    ds2.print()    //3.读取hdfs CSV文件,转化为tuple    val path = "hdfs://qingcheng11:9000/input/flink/sales.csv"    val ds3 = env.readCsvFile[(String, Int, Int, Double)](      filePath = path,      lineDelimiter = "\n",      fieldDelimiter = ",",      lenient = false,      ignoreFirstLine = true,      includedFields = Array(0, 1, 2, 3))    ds3.print()    //4.读取hdfs CSV文件,转化为case class    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)    val ds4 = env.readCsvFile[Sales](      filePath = path,      lineDelimiter = "\n",      fieldDelimiter = ",",      lenient = false,      ignoreFirstLine = true,      includedFields = Array(0, 1, 2, 3),      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")    )    ds4.print()  }}

3.基于文件的source(遍历目录)

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
import org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.configuration.Configuration/**  * 递归读取hdfs目录中的所有文件,会遍历各级子目录  */object DataSource003 {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    // create a configuration object    val parameters = new Configuration    // set the recursive enumeration parameter    parameters.setBoolean("recursive.file.enumeration", true)    // pass the configuration to the data source    val ds1 = env.readTextFile("hdfs:///input/flink").withParameters(parameters)    ds1.print()  }}
 
 

转载于:https://www.cnblogs.com/linkmust/p/10896051.html

你可能感兴趣的文章
PAT甲级——1101 Quick Sort (快速排序)
查看>>
python创建进程的两种方式
查看>>
1.2 基础知识——关于猪皮(GP,Generic Practice)
查看>>
迭代器Iterator
查看>>
java易错题----静态方法的调用
查看>>
php建立MySQL数据表
查看>>
最简单的线程同步的例子
查看>>
旅途上看的电影和观后感
查看>>
Ztree异步树加载
查看>>
关于IE和火狐,谷歌,Safari对Html标签Object和Embed的支持问题
查看>>
poj3320 Jessica's Reading Problem(尺取思路+STL)
查看>>
分布式计算开源框架Hadoop介绍
查看>>
安卓平台接口剖析
查看>>
坏的事情不都会带来坏的结果
查看>>
RPC的基础:调研EOS插件http_plugin
查看>>
第二次团队冲刺第二天
查看>>
bzoj 2257 (JSOI 2009) 瓶子与燃料
查看>>
11)Java abstract class 和 interface
查看>>
使用xrdp或Xmanager 远程连接 CentOS6
查看>>
Linux误删恢复
查看>>