博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
熟练掌握HDFS的Java API接口访问
阅读量:6003 次
发布时间:2019-06-20

本文共 7987 字,大约阅读时间需要 26 分钟。

    HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

    通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部

1、获取文件系统

1 /** 2  * 获取文件系统 3  *  4  * @return FileSystem 5  */ 6 public static FileSystem getFileSystem() { 7     //读取配置文件 8     Configuration conf = new Configuration(); 9     // 文件系统10     FileSystem fs = null;11     12     String hdfsUri = HDFSUri;13     if(StringUtils.isBlank(hdfsUri)){14         // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统15         try {16             fs = FileSystem.get(conf);17         } catch (IOException e) {18             logger.error("", e);19         }20     }else{21         // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统22         try {23             URI uri = new URI(hdfsUri.trim());24             fs = FileSystem.get(uri,conf);25         } catch (URISyntaxException | IOException e) {26             logger.error("", e);27         }28     }29         30     return fs;31 }

 2、创建文件目录

1 /** 2  * 创建文件目录 3  *  4  * @param path 5  */ 6 public static void mkdir(String path) { 7     try { 8         // 获取文件系统 9         FileSystem fs = getFileSystem();10         11         String hdfsUri = HDFSUri;12         if(StringUtils.isNotBlank(hdfsUri)){13             path = hdfsUri + path;14         }15         16         // 创建目录17         fs.mkdirs(new Path(path));18         19         //释放资源20         fs.close();21     } catch (IllegalArgumentException | IOException e) {22         logger.error("", e);23     }24 }

3、删除文件或者文件目录

1 /** 2  * 删除文件或者文件目录 3  *  4  * @param path 5  */ 6 public static void rmdir(String path) { 7     try { 8         // 返回FileSystem对象 9         FileSystem fs = getFileSystem();10         11         String hdfsUri = HDFSUri;12         if(StringUtils.isNotBlank(hdfsUri)){13             path = hdfsUri + path;14         }15         16         // 删除文件或者文件目录  delete(Path f) 此方法已经弃用17         fs.delete(new Path(path),true);18         19         // 释放资源20         fs.close();21     } catch (IllegalArgumentException | IOException e) {22         logger.error("", e);23     }24 }

3、根据filter获取目录下的文件

1 /** 2  * 根据filter获取目录下的文件 3  *  4  * @param path 5  * @param pathFilter 6  * @return String[] 7  */ 8 public static String[] ListFile(String path,PathFilter pathFilter) { 9     String[] files = new String[0];10     11     try {12         // 返回FileSystem对象13         FileSystem fs = getFileSystem();14         15         String hdfsUri = HDFSUri;16         if(StringUtils.isNotBlank(hdfsUri)){17             path = hdfsUri + path;18         }19         20         FileStatus[] status;21         if(pathFilter != null){22             // 根据filter列出目录内容23             status = fs.listStatus(new Path(path),pathFilter);24         }else{25             // 列出目录内容26             status = fs.listStatus(new Path(path));27         }28         29         // 获取目录下的所有文件路径30         Path[] listedPaths = FileUtil.stat2Paths(status);31         // 转换String[]32         if (listedPaths != null && listedPaths.length > 0){33             files = new String[listedPaths.length];34             for (int i = 0; i < files.length; i++){35                 files[i] = listedPaths[i].toString();36             }37         }38         // 释放资源39         fs.close();40     } catch (IllegalArgumentException | IOException e) {41         logger.error("", e);42     }43     44     return files;45 }

4、文件上传至 HDFS

1 /** 2  * 文件上传至 HDFS 3  *  4  * @param delSrc 5  * @param overwrite 6  * @param srcFile 7  * @param destPath 8  */ 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {10     // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt11     Path srcPath = new Path(srcFile);12     13     // 目的路径14     String hdfsUri = HDFSUri;15     if(StringUtils.isNotBlank(hdfsUri)){16         destPath = hdfsUri + destPath;17     }18     Path dstPath = new Path(destPath);19     20     // 实现文件上传21     try {22         // 获取FileSystem对象23         FileSystem fs = getFileSystem();24         fs.copyFromLocalFile(srcPath, dstPath);25         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);26         //释放资源27         fs.close();28     } catch (IOException e) {29         logger.error("", e);30     }31 }

5、从 HDFS 下载文件

1 /** 2  * 从 HDFS 下载文件 3  *  4  * @param srcFile 5  * @param destPath 6  */ 7 public static void getFile(String srcFile,String destPath) { 8     // 源文件路径 9     String hdfsUri = HDFSUri;10     if(StringUtils.isNotBlank(hdfsUri)){11         srcFile = hdfsUri + srcFile;12     }13     Path srcPath = new Path(srcFile);14     15     // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/16     Path dstPath = new Path(destPath);17     18     try {19         // 获取FileSystem对象20         FileSystem fs = getFileSystem();21         // 下载hdfs上的文件22         fs.copyToLocalFile(srcPath, dstPath);23         // 释放资源24         fs.close();25     } catch (IOException e) {26         logger.error("", e);27     }28 }

6、获取 HDFS 集群节点信息

1 /** 2  * 获取 HDFS 集群节点信息 3  *  4  * @return DatanodeInfo[] 5  */ 6 public static DatanodeInfo[] getHDFSNodes() { 7     // 获取所有节点 8     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; 9     10     try {11         // 返回FileSystem对象12         FileSystem fs = getFileSystem();13         14         // 获取分布式文件系统15         DistributedFileSystem hdfs = (DistributedFileSystem)fs;16         17         dataNodeStats = hdfs.getDataNodeStats();18     } catch (IOException e) {19         logger.error("", e);20     }21     return dataNodeStats;22 }

7、查找某个文件在 HDFS集群的位置

1 /** 2  * 查找某个文件在 HDFS集群的位置 3  *  4  * @param filePath 5  * @return BlockLocation[] 6  */ 7 public static BlockLocation[] getFileBlockLocations(String filePath) { 8     // 文件路径 9     String hdfsUri = HDFSUri;10     if(StringUtils.isNotBlank(hdfsUri)){11         filePath = hdfsUri + filePath;12     }13     Path path = new Path(filePath);14     15     // 文件块位置列表16     BlockLocation[] blkLocations = new BlockLocation[0];17     try {18         // 返回FileSystem对象19         FileSystem fs = getFileSystem();20         // 获取文件目录 21         FileStatus filestatus = fs.getFileStatus(path);22         //获取文件块位置列表23         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());24     } catch (IOException e) {25         logger.error("", e);26     }27     return blkLocations;28 }

 8、文件重命名

1 /** 2  * 文件重命名 3  *  4  * @param srcPath 5  * @param dstPath 6  */ 7 public boolean rename(String srcPath, String dstPath){ 8     boolean flag = false; 9     try    {10         // 返回FileSystem对象11         FileSystem fs = getFileSystem();12         13         String hdfsUri = HDFSUri;14         if(StringUtils.isNotBlank(hdfsUri)){15             srcPath = hdfsUri + srcPath;16             dstPath = hdfsUri + dstPath;17         }18         19         flag = fs.rename(new Path(srcPath), new Path(dstPath));20     } catch (IOException e) {21         logger.error("{} rename to {} error.", srcPath, dstPath);22     }23     24     return flag;25 }

9、判断目录是否存在

1 /** 2  * 判断目录是否存在 3  *  4  * @param srcPath 5  * @param dstPath 6  */ 7 public boolean existDir(String filePath, boolean create){ 8     boolean flag = false; 9     10     if (StringUtils.isEmpty(filePath)){11         return flag;12     }13     14     try{15         Path path = new Path(filePath);16         // FileSystem对象17         FileSystem fs = getFileSystem();18         19         if (create){20             if (!fs.exists(path)){21                 fs.mkdirs(path);22             }23         }24         25         if (fs.isDirectory(path)){26             flag = true;27         }28     }catch (Exception e){29         logger.error("", e);30     }31     32     return flag;33 }

 

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。

如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

地址:

转载于:https://www.cnblogs.com/codeOfLife/p/5396624.html

你可能感兴趣的文章
IE6 7下绝对定位引发浮动元素神秘消失
查看>>
android - ADT本地配置、sdk配置
查看>>
I.MX6 dhcpcd 需要指定网卡
查看>>
js递归原理之return
查看>>
浏览器的回流和重绘及其优化方式
查看>>
centos配置ssh免密码登录后,仍提示输入密码
查看>>
gulp+browser-sync实现前端自动化刷新
查看>>
python学习笔记 --- 来看看 random_state 这个参数
查看>>
基于angular2实现用户登录并信息持久化的一些理解(三)
查看>>
新框架 - 收藏集 - 掘金
查看>>
JQuery坑,说说哪些大家都踩过的坑
查看>>
高性能迷你React框架anu在低版本IE的实践
查看>>
windows中用cmd 删除文件夹以及文件夹里面的所有内容
查看>>
中国在两年内赶超美国AI?李开复:不一定
查看>>
2018年OpenStack用户调查报告出炉:Kubernetes仍居首
查看>>
Eclipse基金会发布Eclipse Photon IDE
查看>>
纯css实现左右横线,文字自适应居中效果
查看>>
唯品会HDFS性能挑战和优化实践
查看>>
JavaScript 设计模式
查看>>
Java EE供应商和伦敦Java用户组宣布新的MicroProfile
查看>>