HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。
通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部
1、获取文件系统
1 /** 2 * 获取文件系统 3 * 4 * @return FileSystem 5 */ 6 public static FileSystem getFileSystem() { 7 //读取配置文件 8 Configuration conf = new Configuration(); 9 // 文件系统10 FileSystem fs = null;11 12 String hdfsUri = HDFSUri;13 if(StringUtils.isBlank(hdfsUri)){14 // 返回默认文件系统 如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统15 try {16 fs = FileSystem.get(conf);17 } catch (IOException e) {18 logger.error("", e);19 }20 }else{21 // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统22 try {23 URI uri = new URI(hdfsUri.trim());24 fs = FileSystem.get(uri,conf);25 } catch (URISyntaxException | IOException e) {26 logger.error("", e);27 }28 }29 30 return fs;31 }
2、创建文件目录
1 /** 2 * 创建文件目录 3 * 4 * @param path 5 */ 6 public static void mkdir(String path) { 7 try { 8 // 获取文件系统 9 FileSystem fs = getFileSystem();10 11 String hdfsUri = HDFSUri;12 if(StringUtils.isNotBlank(hdfsUri)){13 path = hdfsUri + path;14 }15 16 // 创建目录17 fs.mkdirs(new Path(path));18 19 //释放资源20 fs.close();21 } catch (IllegalArgumentException | IOException e) {22 logger.error("", e);23 }24 }
3、删除文件或者文件目录
1 /** 2 * 删除文件或者文件目录 3 * 4 * @param path 5 */ 6 public static void rmdir(String path) { 7 try { 8 // 返回FileSystem对象 9 FileSystem fs = getFileSystem();10 11 String hdfsUri = HDFSUri;12 if(StringUtils.isNotBlank(hdfsUri)){13 path = hdfsUri + path;14 }15 16 // 删除文件或者文件目录 delete(Path f) 此方法已经弃用17 fs.delete(new Path(path),true);18 19 // 释放资源20 fs.close();21 } catch (IllegalArgumentException | IOException e) {22 logger.error("", e);23 }24 }
3、根据filter获取目录下的文件
1 /** 2 * 根据filter获取目录下的文件 3 * 4 * @param path 5 * @param pathFilter 6 * @return String[] 7 */ 8 public static String[] ListFile(String path,PathFilter pathFilter) { 9 String[] files = new String[0];10 11 try {12 // 返回FileSystem对象13 FileSystem fs = getFileSystem();14 15 String hdfsUri = HDFSUri;16 if(StringUtils.isNotBlank(hdfsUri)){17 path = hdfsUri + path;18 }19 20 FileStatus[] status;21 if(pathFilter != null){22 // 根据filter列出目录内容23 status = fs.listStatus(new Path(path),pathFilter);24 }else{25 // 列出目录内容26 status = fs.listStatus(new Path(path));27 }28 29 // 获取目录下的所有文件路径30 Path[] listedPaths = FileUtil.stat2Paths(status);31 // 转换String[]32 if (listedPaths != null && listedPaths.length > 0){33 files = new String[listedPaths.length];34 for (int i = 0; i < files.length; i++){35 files[i] = listedPaths[i].toString();36 }37 }38 // 释放资源39 fs.close();40 } catch (IllegalArgumentException | IOException e) {41 logger.error("", e);42 }43 44 return files;45 }
4、文件上传至 HDFS
1 /** 2 * 文件上传至 HDFS 3 * 4 * @param delSrc 5 * @param overwrite 6 * @param srcFile 7 * @param destPath 8 */ 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {10 // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt11 Path srcPath = new Path(srcFile);12 13 // 目的路径14 String hdfsUri = HDFSUri;15 if(StringUtils.isNotBlank(hdfsUri)){16 destPath = hdfsUri + destPath;17 }18 Path dstPath = new Path(destPath);19 20 // 实现文件上传21 try {22 // 获取FileSystem对象23 FileSystem fs = getFileSystem();24 fs.copyFromLocalFile(srcPath, dstPath);25 fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);26 //释放资源27 fs.close();28 } catch (IOException e) {29 logger.error("", e);30 }31 }
5、从 HDFS 下载文件
1 /** 2 * 从 HDFS 下载文件 3 * 4 * @param srcFile 5 * @param destPath 6 */ 7 public static void getFile(String srcFile,String destPath) { 8 // 源文件路径 9 String hdfsUri = HDFSUri;10 if(StringUtils.isNotBlank(hdfsUri)){11 srcFile = hdfsUri + srcFile;12 }13 Path srcPath = new Path(srcFile);14 15 // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/16 Path dstPath = new Path(destPath);17 18 try {19 // 获取FileSystem对象20 FileSystem fs = getFileSystem();21 // 下载hdfs上的文件22 fs.copyToLocalFile(srcPath, dstPath);23 // 释放资源24 fs.close();25 } catch (IOException e) {26 logger.error("", e);27 }28 }
6、获取 HDFS 集群节点信息
1 /** 2 * 获取 HDFS 集群节点信息 3 * 4 * @return DatanodeInfo[] 5 */ 6 public static DatanodeInfo[] getHDFSNodes() { 7 // 获取所有节点 8 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; 9 10 try {11 // 返回FileSystem对象12 FileSystem fs = getFileSystem();13 14 // 获取分布式文件系统15 DistributedFileSystem hdfs = (DistributedFileSystem)fs;16 17 dataNodeStats = hdfs.getDataNodeStats();18 } catch (IOException e) {19 logger.error("", e);20 }21 return dataNodeStats;22 }
7、查找某个文件在 HDFS集群的位置
1 /** 2 * 查找某个文件在 HDFS集群的位置 3 * 4 * @param filePath 5 * @return BlockLocation[] 6 */ 7 public static BlockLocation[] getFileBlockLocations(String filePath) { 8 // 文件路径 9 String hdfsUri = HDFSUri;10 if(StringUtils.isNotBlank(hdfsUri)){11 filePath = hdfsUri + filePath;12 }13 Path path = new Path(filePath);14 15 // 文件块位置列表16 BlockLocation[] blkLocations = new BlockLocation[0];17 try {18 // 返回FileSystem对象19 FileSystem fs = getFileSystem();20 // 获取文件目录 21 FileStatus filestatus = fs.getFileStatus(path);22 //获取文件块位置列表23 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());24 } catch (IOException e) {25 logger.error("", e);26 }27 return blkLocations;28 }
8、文件重命名
1 /** 2 * 文件重命名 3 * 4 * @param srcPath 5 * @param dstPath 6 */ 7 public boolean rename(String srcPath, String dstPath){ 8 boolean flag = false; 9 try {10 // 返回FileSystem对象11 FileSystem fs = getFileSystem();12 13 String hdfsUri = HDFSUri;14 if(StringUtils.isNotBlank(hdfsUri)){15 srcPath = hdfsUri + srcPath;16 dstPath = hdfsUri + dstPath;17 }18 19 flag = fs.rename(new Path(srcPath), new Path(dstPath));20 } catch (IOException e) {21 logger.error("{} rename to {} error.", srcPath, dstPath);22 }23 24 return flag;25 }
9、判断目录是否存在
1 /** 2 * 判断目录是否存在 3 * 4 * @param srcPath 5 * @param dstPath 6 */ 7 public boolean existDir(String filePath, boolean create){ 8 boolean flag = false; 9 10 if (StringUtils.isEmpty(filePath)){11 return flag;12 }13 14 try{15 Path path = new Path(filePath);16 // FileSystem对象17 FileSystem fs = getFileSystem();18 19 if (create){20 if (!fs.exists(path)){21 fs.mkdirs(path);22 }23 }24 25 if (fs.isDirectory(path)){26 flag = true;27 }28 }catch (Exception e){29 logger.error("", e);30 }31 32 return flag;33 }
如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。 如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【】。本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
地址: