基于Spark的GNSS网基线向量并行化处理
2018-10-09白帆孙宁
白帆,孙宁
(1.92941部队,辽宁 葫芦岛125000;2.辽宁工程技术大学 软件学院,辽宁 葫芦岛 125105)
0 引 言
随着CORS建立得越来越多,全球卫星导航系统(GNSS)观测的数据量也越来越大,因此,数据的存储及计算面临着巨大的挑战[1]。由于计算机和网络技术的迅猛发展,大规模海量数据的分布式存储以及计算也不断发展起来,并在现实社会中得到了广泛的应用。Spark是专为大规模数据处理而设计的快速通用计算引擎,Spark的出现改善了Hadoop反复在磁盘上进行读写操作的缺陷,提高了计算大规模海量数据的计算效率。现如今,已经有很多企业引入Spark大数据集群平台以解决计算大规模复杂数据的瓶颈。针对目前全球卫星导航系统GNSS数据量大、计算效率低等问题,本文将Spark大数据集群引入到GNSS网的基线解算中,对预处理后的观测文件进行并行化计算和存储,通过实验验证了利用子网划分思想结合Spark大数据集群平台解算GNSS网基线向量的效率。
1 相关平台
1.1 Hadoop
Hadoop是一个分布式系统基础架构,用户可以充分利用集群的计算优势进行高速运算和存储。Hadoop最核心的设计是HDFS和MapReduce。
HDFS为海量的数据提供了存储,MapReduce为海量的数据提供了计算[2]。Hadoop采用多进程模型,如图1所示。
Hadoop的每个Task运行在JVM进程中,且Task不被复用。MapReduce不构建可重用资源池,Task动态申请资源且执行结束后立即释放资源。启动MapReduce中Task进程的速度慢,造成了不必要的启动时间消耗,不适合运行低延迟作业。
1.2 Spark平台
Spark是专为大规模数据处理而设计的快速通用的计算引擎,可用来构建大型的、低延迟的数据分析应用程序。Spark可将中间输出结果保存在内存中,从而不再反复读写HDFS。实际上,Spark是对Hadoop的补充,Spark通过名为Mesos的第三方集群框架可以在Hadoop文件系统中并行运行。Scala用作Spark应用程序框架,Scala可轻松地操作分布式数据集。Spark采用多线程模型,如图2所示。
Spark的每个Executor运行在JVM进程中,Executor中可运行多个ShuffleMapTask或ReduceTask,Task则是在Executor的一个线程中运行,而且Task是可共享的,在Executor中加载一次文件或者数据后可一直被Task复用,直至程序执行结束后释放资源,避免了任务重复申请资源所造成的时间花费。Spark可以建立可重用资源池来运行全部的ShuffleMapTask和ReduceTask。Spark启动任务速度快,可大大降低运行时间。
1.3 GAMIT软件
GAMIT软件是目前高精度GPS基线解算软件中最为经济、使用最广泛的一个软件。GAMIT软件是由美国麻省理工学院(MIT)和斯克里普斯海洋研究所(SIO)联合研制的GPS数据处理软件[6]。当它采用精密星历和高精度起算点时,其处理长基线和连续时段静态定位相对精度可达10e-8~10e-9 数量级,处理短基线的精度可达1~3 mm.本文使用的GAMIT版本为GAMIT10.6.
2 基于Spark的基线解算并行化处理
2.1 利用Spark实现并行化计算
Hadoop分布式文件系统(HDFS)是一种广泛使用的文件系统,而Spark支持读写很多种文件系统,可以使用任何我们想要的文件格式,Spark能够很好地使用HDFS。HDFS被设计为可以在廉价的硬件上工作,有弹性地对应节点失败,同时提供高吞吐量。Spark和HDFS可以部署在同一批机器上,这样Spark可以利用数据分布来尽量避免一些网络开销。在Spark中使用HDFS只需要将输入输出路径指定为hdfs://master:post/path.
YARN是在Hadoop中引入的集群管理器,它可以让多种数据处理构架运行在一个共享的资源池上,并且通常安装在与Hadoop文件系统(简称HDFS)相同的物理节点上。在这样配置的YARN集群上运行Spark是很有意义的,它可以让Spark在存储数据的物理节点上运行,以快速访问HDFS中的数据。基于HDFS的观测数据分布式文件系统结构如图3所示[4]。
在Spark中使用YARN主要伪代码如下所示:
第一步,设置Hadoop配置目录的环境变量hadoop-conf-dir。
第二步,使用Spark-submit提交应用作业:
exporthadoop-conf-dir=“…”;
Spark-submit——master yarn yourapp。
spark读取HDFS中的文件和写入数据到HDFS中,主要伪代码如下所示:
def main(args: Array[String]): Unit={
valconf=new SparkConf()
conf.set("spark.master", "local")
conf.set("spark.app.name", "spark demo")
valsc=new SparkContext(conf);
//读取HDFS数据
valtextFileRdd=sc.textFile("hdfs://路径")
valfRdd=textFileRdd.flatMap {-.split(" ")}
valmrdd=fRdd.map {(-, 1)}
valrbkrdd=mrdd.reduceByKey(-+-)
//写入数据到HDFS系统
rbkrdd.saveAsTextFile("hdfs://路径")
}
2.2 基于Spark的基线解算过程
第一步,由子网划分创建索引文件,其中子网编号设置为文件名,测站点名设置为文件内容,为了便于对文件内容进行操作,各名之间用空格相隔开。
第二步,客户端上传文件至HDFS(文件包括:观测文件、精密星历文件、广播星历文件、索引文件)。
第三步,建立GAMIT软件执行所需的工程文件目录。
第四步,解析索引文件后获取相应的子网的全部测站名,从HDFS中复制相应文件到对应的文件夹中。
第五步,调用sh-setup链接外部表文件tables,再调用sh-gamit进行基线解算。
需注意的是,在此过程中,一个索引文件对应一个子网进行处理,Spark分配Shuffle Map Task执行索引文件。基于Spark进行基线解算过程如图4所示。
3 实验结果及分析
3.1 实验数据及环境
实验数据:采用中国及周边地区41个IGS连续运行跟踪站2018年年积日第58天的30 s采样间隔的观测值数据文件,如图5所示。
实验环境:所有的实验都是在实验室搭建的Spark平台上运行的。平台由10个节点组成,物理机配置为Intel Core i5处理器,主频2.30 GHz,双核,内存2G.软件配置:CentOS 6.7,JDK 1.8.0,Hadoop 2.6.0,Spark 1.6.0,Scala 2.12.1.
3.2 实验结果分析
采用划分子网的思想,其中单节点对整网进行基线解算,有几个节点就划分几个子网,实验结果如表1所示。
表1 不同数量子网个数和计算节点数下的解算时间
由表1可知,随着子网个数和计算节点数的增加,运行时间在逐步减少,这说明计算效率也在增加。Spark将大数据处理任务分为n块并行处理,其计算能力优势随着计算任务复杂程度增加而扩大。在同样处理41个IGS连续运行跟踪站数据工程中,从运行消耗加速比来看,计算节点越多时间越短,计算效率越高。Spark大数据平台会比Hadoop速度提高约20倍[4],但实际中因环境及应用的不同等原因理论速度通常无法达到,因此,需要做到尽可能合理地分配计算资源,使效率最大化。
4 结束语
本文针对传统的单机方法无法应对现今对大规模复杂的GNSS观测数据的处理需求,利用Spark大数据平台和子网划分思想实现了基于Spark的基线解算过程的分布式处理,大大提高了计算的效率。可以在实际工程中合理有效地分配计算资源,在保证解算精度的前提下,降低执行时间,提高对海量GNSS数据处理能力。