博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
通过Thrift访问HDFS分布式文件系统的性能瓶颈分析
阅读量:4482 次
发布时间:2019-06-08

本文共 7388 字,大约阅读时间需要 24 分钟。

引言

  Hadoop提供的HDFS布式文件存储系统,提供了基于thrift的客户端访问支持,但是因为Thrift自身的访问特点,在高并发的访问情况下,thrift自身结构可能将会成为HDFS文件存储系统的一个性能瓶颈。我们先来看一下一不使用Thrfit方式访问HDFS文件系统的业务流程。

一、HDFS文件读取流程

  

流程说明:

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的DataNode地址;
  3. 客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
  4. 读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode;
  5. 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
  6. 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。

二、HDFS文件写入流程

流程说明:

  1. 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
  2. Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
  3. 当 客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以数据队列"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表, 列表的大小根据在Namenode中对replication的设置而定。
  4. 开始以pipeline(管道)的形式将packet写入所 有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此 pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
  5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。
  6. 如 果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除, 剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持 replicas设定的数量。

三、关键词

  HDFSClient通过文件IO操作最终实现是通过直接访问DataNode进行。

四、Thrift的访问流程:猜测版

  

流程说明:

1.ThriftClient客户端将操作命令传给ThriftServer。

2.ThriftServer调用HDFSClient接口API实现HDFS读写操作,操作流程如和三所示。

五、疑问

  与DataNode发生数据交换的到底是ThriftServer还是ThriftClient,如果是ThriftServer,那么多个ThriftClient并行访问时,ThriftServer必将成为HDFS访问的性能瓶颈;如果是ThriftClient直接访问DataNode,那么理论依据何在呢?

六、示例程序

  下面是一个基于Thrift实现的HDFS客户端程序,实现了文件的访问和创建和读取

1 // HdfsDemo.cpp : Defines the entry point for the console application.  2 //  3   4 #include "stdafx.h"  5 #include 
6 #include
7 #include
8 #include
9 #include
10 #include
11 #include "ThriftHadoopFileSystem.h" 12 13 #ifndef _WIN32_WINNT 14 #define _WIN32_WINNT 0x0500 15 #endif 16 using namespace std; 17 using namespace apache::thrift; 18 using namespace apache::thrift::protocol; 19 using namespace apache::thrift::transport; 20 21 int _tmain(int argc, _TCHAR* argv[]) 22 { 23 if (argc < 3) 24 { 25 std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl; 26 //return -1; 27 } 28 boost::shared_ptr
socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast
(argv[2]))); 29 boost::shared_ptr
transport(new TBufferedTransport(socket)); 30 boost::shared_ptr
protocol(new TBinaryProtocol(transport)); 31 ThriftHadoopFileSystemClient client(protocol); 32 try 33 { 34 transport->open(); 35 Pathname path; 36 //01_create directory 37 path.__set_pathname("/user/hadoop"); 38 if(client.exists(path) == true) 39 { 40 printf("path is exists.\r\n"); 41 } 42 else 43 { 44 printf("path is not exists."); 45 //return 0; 46 } 47 //02_put file 48 Pathname filepath; 49 filepath.__set_pathname("/user/hadoop/in/test1.txt"); 50 /* 51 FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb"); 52 if (localfile == NULL) 53 { 54 transport->close(); 55 return 0; 56 } 57 ThriftHandle hdl; 58 client.create(hdl,filepath); 59 while (true) 60 { 61 char data[1024]; 62 memset(data,0x00,sizeof(data)); 63 size_t Num = fread(data,1,1024,localfile); 64 if (Num <= 0) 65 { 66 break; 67 } 68 client.write(hdl,data); 69 } 70 fclose(localfile); 71 client.close(hdl); 72 */ 73 //03_get file 74 /* 75 ThriftHandle hd2; 76 FileStatus stat1; 77 client.open(hd2,filepath); 78 client.stat(stat1,filepath); 79 int index = 0; 80 while(true) 81 { 82 string data; 83 if (stat1.length <= index) 84 { 85 break; 86 } 87 client.read(data,hd2,index,1024); 88 89 index += data.length(); 90 printf("==%s\r\n",data.c_str()); 91 } 92 client.close(hd2); 93 */ 94 95 //04_list files 96 std::vector
vFileStatus; 97 client.listStatus(vFileStatus,path); 98 for (int i=0;i
close();103 } catch (const TException &tx) {104 std::cerr << "ERROR: " << tx.what() << std::endl;105 }106 getchar();107 return 0;108 }

 七、源码分析

  1.文件创建:

1     /** 2       * Create a file and open it for writing, delete file if it exists 3       */ 4     public ThriftHandle createFile(Pathname path,  5                                    short mode, 6                                    boolean  overwrite, 7                                    int bufferSize, 8                                    short replication, 9                                    long blockSize) throws ThriftIOException {10       try {11         now = now();12         HadoopThriftHandler.LOG.debug("create: " + path +13                                      " permission: " + mode +14                                      " overwrite: " + overwrite +15                                      " bufferSize: " + bufferSize +16                                      " replication: " + replication +17                                      " blockSize: " + blockSize);18         FSDataOutputStream out = fs.create(new Path(path.pathname), 19                                            new FsPermission(mode),20                                            overwrite,21                                            bufferSize,22                                            replication,23                                            blockSize,24                                            null); // progress25         long id = insert(out);26         ThriftHandle obj = new ThriftHandle(id);27         HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);28         return obj;29       } catch (IOException e) {30         throw new ThriftIOException(e.getMessage());31       }32     }

  ThriftHandle的两端到底是谁呢?是ThriftClient和DataNode?还是ThriftServer与DataNode?

  2.文件写入

1     public boolean write(ThriftHandle tout, String data) throws ThriftIOException { 2       try { 3         now = now(); 4         HadoopThriftHandler.LOG.debug("write: " + tout.id); 5         FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id); 6         byte[] tmp = data.getBytes("UTF-8"); 7         out.write(tmp, 0, tmp.length); 8         HadoopThriftHandler.LOG.debug("wrote: " + tout.id); 9         return true;10       } catch (IOException e) {11         throw new ThriftIOException(e.getMessage());12       }13     }

  写入时依赖的还是ThriftHandle?  

  3.文件读取

1     /** 2      * read from a file 3      */ 4     public String read(ThriftHandle tout, long offset, 5                        int length) throws ThriftIOException { 6       try { 7         now = now(); 8         HadoopThriftHandler.LOG.debug("read: " + tout.id + 9                                      " offset: " + offset +10                                      " length: " + length);11         FSDataInputStream in = (FSDataInputStream)lookup(tout.id);12         if (in.getPos() != offset) {13           in.seek(offset);14         }15         byte[] tmp = new byte[length];16         int numbytes = in.read(offset, tmp, 0, length);17         HadoopThriftHandler.LOG.debug("read done: " + tout.id);18         return new String(tmp, 0, numbytes, "UTF-8");19       } catch (IOException e) {20         throw new ThriftIOException(e.getMessage());21       }22     }

 八、遗留问题

  ThriftHandle可以看做是Socket连接句柄,但是他的两端到底是谁呢?如果是ThriftClient代表的客户端则一切OK,那么我该如何证明呢?存疑待考!

 

转载于:https://www.cnblogs.com/hadoopdev/p/3282386.html

你可能感兴趣的文章
借用Snippet插件美化博客中的代码
查看>>
深入研究java.lang.Runtime类
查看>>
10677 我们仍未知道那天所看见的花的名字
查看>>
ScanTailor-ScanTailor 自动矫正图像歪斜
查看>>
UVA GCD - Extreme (II)
查看>>
完成个人中心—导航标签
查看>>
【C++】C++中变量的声明与定义的区别
查看>>
前端性能优化
查看>>
static
查看>>
属性动画
查看>>
Swift 字符串
查看>>
Python 生成器 Generator 和迭代器 Iterator
查看>>
实现icon和文字垂直居中的两种方法-(vertical-align and line-height)
查看>>
[CareerCup] 3.6 Sort Stack 栈排序
查看>>
Beta版总结会议
查看>>
Cocos2d-x中使用的数据容器类
查看>>
创建ORACLE 查询用户
查看>>
jzoj3297. 【SDOI2013】逃考
查看>>
通过例子学python(2.1)
查看>>
高效率场景-内存映射
查看>>