如何解析SparkSQL外部数据源
这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
成都创新互联公司是创新、创意、研发型一体的综合型网站建设公司,自成立以来公司不断探索创新,始终坚持为客户提供满意周到的服务,在本地打下了良好的口碑,在过去的十多年时间我们累计服务了上千家以及全国政企客户,如成都自拌料搅拌车等企业单位,完善的项目管理流程,严格把控项目进度与质量监控加上过硬的技术实力获得客户的一致赞美。
场景介绍:
大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS MongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,
如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。
加载方式:
build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;
third-party:第三方加载外部数据如HBase、S3、OSS mongoDB
第三方JAR地址:https://spark-packages.org/
Maven工程需要导入gav
spark-shell:需要外部导入--package g:a:v
SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
优势:下载依赖包到本地
缺点:内网环境没有网络无法下载
一、外部数据源读取parquet文件:
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1536244013147).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show
提示错误:/people.txt is not a Parquet file
注意:spark.read.load()底层默认读取Parquet file
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show
18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")
users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> users.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)
scala> users.show
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
-- 查看列,常规操作
scala> users.select("name").show
+------+ | name| +------+ |Alyssa| | Ben| +------+
二、转换操作
-- 转成json格式输出
scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")
[hadoop@hadoop001 parquet]$ cat * {"name":"Alyssa"} {"name":"Ben","favorite_color":"red"}
-- 不采取压缩
.option("compression","none")
-- 转成text格式输出
scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")
[hadoop@hadoop001 parquet2]$ cat *
Alyssa
-- Save Modes
用法:.mode("")
1、default -- 目标目录存在,抛出异常
2、append -- 目标目录存在,重跑数据+1,无法保证数据幂等
3、overwrite-- 目标目录存在,覆盖原文件
4、ignore-- 忽略你的模式,目标纯在将不保存
三、spark-shell操作JDBC数据
-- 读取外部MySQL数据为DF
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()
-- 查看表信息
jdbcDF.show()
-- 获取本地数据
val deptDF = spark.table("dept")
-- join关联使用
deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))
-- DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode("overwrite")
jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()
mysql> show tables
+---------------------------+ | Tables_in_hive_data | +---------------------------+ | bucketing_cols | | cds | | city_info_bak | +---------------------------+
-- 如果想类型不发生变化指定option指定字段类型
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
四、spark-sql操作JDBC数据
-- SQL创建临时表视图,单session
CREATE TEMPORARY VIEW emp_sql USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://hadoop001:3306/ruozedata", dbtable "city_info", user 'root', password 'root' )
show tbales;
INSERT INTO TABLE emp_sql
SELECT * FROM emp_sql
五、Perdicate Push Down(PPD)
disk network CPU
外部数据外(1T)------->获取本地磁盘(1T)---------->提交到集群(1T)--------->结果(1G)
disk network CPU
外部数据外(1T)------->经过列裁剪(10G)----------->提交到集群(10G)----------->传结果(1g)
disk CPU network
外部数据外(1T)------->经过列裁剪(10G)---------->进过计算(1G)----------->传输结果
六、SparkSQL外部数据源实现机制
-- 0.有效的读取外部数据源的数据的
-- 1.buildScan扫描整张表,变成一个RDD[ROW]
trait TableScan {
def buildScan(): RDD[Row]
}
-- 2.PrunedScan获取表的裁剪列
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- 3.PrunedFilteredScan列裁剪,行过滤
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- 4.加载外部数据源的数据,定义数据的schema信息
abstract class BaseRelation{
}
-- 5.Relation产生BaseRelation使用
trait RelationProvider {
}
总归:
-- 查询类操作
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- 列裁剪,行过滤
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- 写入类操作
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
当前文章:如何解析SparkSQL外部数据源
文章出自:http://myzitong.com/article/godsgg.html