[spark2][Source Code Study][Code] How spark2's textFile() instantiates the different fs (FileSystem) implementations
Published: 2019-05-01


Following the companion post “【spark2】【源码学习】【分区数】spark读取 本地/可分割/单个 的文件时是如何划分分区” on this blog (on how Spark derives partition counts when reading local/splittable/single files), we jump straight to the getPartitions method.

1. getPartitions(): the function that builds an RDD's partitions

// HadoopRDD.scala
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    print("\n>>>>>> pgx code <<<<<<\n")
    print("allInputSplits.size: " + allInputSplits.size)
    print("\n")
    print("minPartitions: " + minPartitions)
    print("\n>>>>>> pgx code <<<<<<\n")
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  } catch {
    case e: InvalidInputException if ignoreMissingFiles =>
      logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
        s" partitions returned from this path.", e)
      Array.empty[Partition]
  }
}
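To watch these values, you can run a tiny job and vary the second argument of textFile(). A minimal sketch, not from the original post (the local path data/words.txt and the app name are made-up examples):

// Demo.scala — hypothetical driver for observing partition counts
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("getPartitions-demo"))
    // the second argument becomes minPartitions in getSplits(jobConf, minPartitions)
    val rdd = sc.textFile("data/words.txt", 4)
    // one HadoopPartition is built per InputSplit returned by getSplits()
    println("partitions: " + rdd.getNumPartitions)
    sc.stop()
  }
}

With the debug prints above compiled into HadoopRDD, allInputSplits.size and minPartitions are printed as soon as getNumPartitions forces the partitions to be computed.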

2. getSplits(): the function that computes the size of each input split

getSplits() returns the information for every split.

listStatus() resolves the job's input paths into FileStatus objects describing each file, which the later file operations rely on.

singleThreadedListStatus() does the actual status fetching; as the code below shows, it runs on the calling thread when LIST_STATUS_NUM_THREADS is 1, and a LocatedFileStatusFetcher handles the multi-threaded case.

// FileInputFormat.java
/** Splits files returned by {@link #listStatus(JobConf)} when
 *  they're too big. */
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  Stopwatch sw = new Stopwatch().start();
  FileStatus[] files = listStatus(job);

  // Save the number of input files for metrics/loadgen
  job.setLong(NUM_INPUT_FILES, files.length);
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDirectory()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  // generate splits
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              splitHosts[0], splitHosts[1]));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
              length - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts[0], splitHosts[1]));
        }
      } else {
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits.toArray(new FileSplit[splits.size()]);
}

/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return array of FileStatus objects
 * @throws IOException if zero items.
 */
protected FileStatus[] listStatus(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

  // Whether we need to recursive look into the directory structure
  boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  FileStatus[] result;
  int numThreads = job
      .getInt(
          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

  Stopwatch sw = new Stopwatch().start();
  if (numThreads == 1) {
    List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job, dirs, recursive, inputFilter, false);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Iterables.toArray(locatedFiles, FileStatus.class);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
  }
  LOG.info("Total input paths to process : " + result.length);
  return result;
}

private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (Path p: dirs) {
    FileSystem fs = p.getFileSystem(job);
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat: matches) {
        if (globStat.isDirectory()) {
          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(globStat.getPath());
          while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
              } else {
                result.add(stat);
              }
            }
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }
  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  return result;
}
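The key arithmetic above is computeSplitSize(goalSize, minSize, blockSize), which in FileInputFormat returns Math.max(minSize, Math.min(goalSize, blockSize)), with goalSize = totalSize / numSplits. A Scala sketch of that arithmetic (the 200 MB file, numSplits = 4, and 128 MB block size are made-up numbers; SPLIT_SLOP is the 1.1 constant from the class):

// SplitMath.scala — reproduces the split-size arithmetic; not Hadoop code itself
object SplitMath {
  val SPLIT_SLOP = 1.1 // same slack factor FileInputFormat uses

  // mirrors FileInputFormat.computeSplitSize()
  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val length = 200 * mb     // one 200 MB splittable file
    val numSplits = 4         // minPartitions handed down from textFile()
    val goalSize = length / numSplits
    val splitSize = computeSplitSize(goalSize, minSize = 1L, blockSize = 128 * mb)

    // same slicing loop as getSplits(): full chunks, then the remainder
    var bytesRemaining = length
    var splits = 0
    while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
      splits += 1
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += 1
    println(s"splitSize=${splitSize / mb} MB, splits=$splits") // splitSize=50 MB, splits=4
  }
}

So a 200 MB splittable file with minPartitions = 4 yields a 50 MB split size and exactly 4 splits, which is why the debug print in getPartitions usually shows allInputSplits.size >= minPartitions.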

3. Obtaining the matching fs instance

A different FileSystem subclass is instantiated depending on which file system the path lives on. The call chain is:

getFileSystem() — called from singleThreadedListStatus()
get(URI uri, Configuration conf)
CACHE.get(URI uri, Configuration conf)
getInternal(URI uri, Configuration conf, Key key)
createFileSystem(URI uri, Configuration conf)
getFileSystemClass(String scheme, Configuration conf) — the step shown in the screenshot below, where a different fs class is resolved for each scheme

[Screenshot from the original post: getFileSystemClass() resolving the FileSystem class for each scheme]

// FileSystem.java
/** Return the FileSystem that owns this Path. */
public FileSystem getFileSystem(Configuration conf) throws IOException {
  return FileSystem.get(this.toUri(), conf);
}

/** Returns the FileSystem for this URI's scheme and authority.  The scheme
 * of the URI determines a configuration property name,
 * fs.scheme.class whose value names the FileSystem class.
 * The entire URI is passed to the FileSystem instance's initialize method.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}

static class Cache {
  ...
  FileSystem get(URI uri, Configuration conf) throws IOException {
    Key key = new Key(uri, conf);
    return getInternal(uri, conf, key);
  }

  private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException {
    FileSystem fs;
    synchronized (this) {
      fs = map.get(key);
    }
    if (fs != null) {
      return fs;
    }

    fs = createFileSystem(uri, conf);
    synchronized (this) { // refetch the lock again
      FileSystem oldfs = map.get(key);
      if (oldfs != null) { // a file system is created while lock is releasing
        fs.close(); // close the new file system
        return oldfs;  // return the old file system
      }

      // now insert the new file system into the map
      if (map.isEmpty()
              && !ShutdownHookManager.get().isShutdownInProgress()) {
        ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
      }
      fs.key = key;
      map.put(key, fs);
      if (conf.getBoolean("fs.automatic.close", true)) {
        toAutoClose.add(key);
      }
      return fs;
    }
  }
  ...
}

private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + uri.getScheme());
  }
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  fs.initialize(uri, conf);
  return fs;
}

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}

