Apache Kudu 是一个开源的分布式数据存储引擎
,使得快速分析和更改数据变得容易
。
Kudu 提供了快速插入/更新和高效的柱状扫描的组合
,以支持跨一个存储层的多个实时分析工作负载
。Kudu 为架构师提供了灵活性,可以处理更多种类的用例,而不需要特殊的解决方案和必需的外部服务依赖。
Kudu 是专门为需要对快速(快速变化的)数据进行快速分析的用例而设计的
。Kudu 利用下一代硬件和内存处理的优势,显著降低了 Apache Impala、 Apache NiFi、 Apache Spark、 Apache Flink 等引擎的查询延迟。
Apache Kudu 由 Apache 大数据生态系统的长期贡献者建立
,是 Apache 2许可下发布的顶级 Apache软件基金会项目
,重视社区参与作为其长期成功的一个重要因素。
关注博主不迷路,获取更多干货资源
1 ETL实现方案
1.1 ETL处理流程图 1.2 为什么使用Kudu作为存储介质
数据库数据上的快速分析
目前很多业务使用事务型数据库(MySQL、Oracle)做数据分析,把数据写入数据库,然后使用 SQL 进行有效信息提取,当数据规模很小的时候,这种方式确实是立竿见影的,但是当数据量级起来以后,会发现数据库吃不消了或者成本开销太大了,此时就需要把数据从事务型数据库里拷贝出来或者说剥离出来,装入一个分析型的数据库里。发现对于实时性和变更性的需求,目前只有 Kudu 一种组件能够满足需求,所以就产生了这样的一种场景:
MySQL 数据库增、删、改的数据通过 Binlog 实时的被同步到 Kudu 里,同时在 Impala(或者其他计算引擎如 Spark、Hive、Presto、MapReduce)上可以实时的看到。
这种场景也是目前业界使用最广泛的,认可度最高。
用户行为日志的快速分析
对于用户行为日志的实时性敏感的业务,比如电商流量、AB 测试、优惠券的点击反馈、广告投放效果以及秒级导入秒级查询等需求,按 Kudu 出现以前的架构基本上都是这张图的模式:
不仅链路长而且实时性得不到有力保障,有些甚至是 T + 1 的,极大的削弱了业务的丰富度。
引入 Kudu 以后,大家看,数据的导入和查询都是在线实时的:
这种场景目前也是网易考拉和hub在使用的,其中hub甚至把 Kudu 当 HBase 来作点查使用。
2 Kudu入门 2.1 Kudu介绍 2.1.1 背景介绍
在Kudu之前,大数据主要以两种方式存储;
静态数据
:
以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。
这类存储的局限性是数据无法进行随机的读写。
动态数据:
以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。
这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。
从上面分析可知,这两种数据在存储方式上
完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。
如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但它有如下缺点:
架构复杂
。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
时效性低
。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。
难以应对后续的更新
。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。
为了解决上述架构的这些问题,Kudu应运而生。Kudu的定位是Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎
。
从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁
。
2.1.2 新的硬件设备
内存(RAM)的技术发展非常快,它变得越来越便宜,容量也越来越大。Cloudera的客户数据显示,他们的客户所部署的服务器,2012年每个节点仅有32GB RAM,现如今增长到每个节点有128GB或256GB RAM。存储设备上更新也非常快,在很多普通服务器中部署SSD也是屡见不鲜。HBase、HDFS、以及其他的Hadoop工具都在不断自我完善,从而适应硬件上的升级换代。然而,从根本上,HDFS基于03年GFS,HBase基于05年BigTable,在当时系统瓶颈主要取决于底层磁盘速度。当磁盘速度较慢时,CPU利用率不足的根本原因是磁盘速度导致的瓶颈,当磁盘速度提高了之后,CPU利用率提高,这时候CPU往往成为系统的瓶颈。HBase、HDFS由于年代久远,已经很难从基本架构上进行修改,而Kudu是基于全新的设计,因此可以更充分地利用RAM、I/O资源,并优化CPU利用率。
我们可以理解为:Kudu相比与以往的系统,CPU使用降低了,I/O的使用提高了,RAM的利用更充分了
。
2.1.3 Kudu是什么
Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。
Kudu支持水平扩展,并且与Cloudera Impala和Apache Spark
等当前流行的大数据查询和分析工具结合紧密。
2.1.4 Kudu的应用场景
Kudu的很多特性跟HBase很像,它支持索引键的查询和修改。Cloudera曾经想过基于Hbase进行修改,然而结论是对HBase的改动非常大,Kudu的数据模型和磁盘存储都与Hbase不同。HBase本身成功的适用于大量的其它场景,因此修改HBase很可能吃力不讨好。最后Cloudera决定开发一个全新的存储系统。
Strong performance for both scan and random access to help customers simplify complex hybrid architectures(适用于那些既有随机访问,也有批量数据扫描的复合场景
)
High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高计算量的场景
)
High IO efficiency in order to leverage modern persistent storage(使用了高性能的存储设备,包括使用更多的内存
)
The ability to upDATE data in place, to avoid extraneous processing and data movement(支持数据更新,避免数据反复迁移
)
The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的实时数据备份和查询
)
2.1.5 Kudu的架构
下图显示了一个具有三个 master
和多个 tablet server
的 Kudu 集群
,每个服务器都支持多个 tablet。
它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader
和 follow
。
此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。
下面是一些基本概念:
角色
作用
Master
集群中的老大,负责集群管理、元数据管理等功能
Tablet Server
集群中的小弟,负责数据存储,并提供数据读写服务 一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。 只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。
Table(表)
一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。
Tablet
一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。
2.2 Kudu的分区
为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。
kudu提供了3种分区方式。
2.2.1 Hash Partitioning(哈希分区)
哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。
优点
:可以解决数据的热点问题(数据倾斜)
缺点
:数据是无序的
2.2.2 Range Partitioning(范围分区)
范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.
优点
:可以解决数据全局有序问题
缺点
:导致数据热点问题
2.2.3 Multilevel Partitioning(多级分区)
Kudu 允许一个表在单个表上组合多级分区。
当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点 需求.
hashPartition+ranagePartiotin=混合分区
优点
:既解决了热点问题也解决了数据有序问题
2.3 Java操作Kudu 2.3.1 pom 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > cn.xiaoma</groupId > <artifactId > xiaoma_kudu</artifactId > <version > 1.0-SNAPSHOT</version > <repositories > <repository > <id > cloudera</id > <url > https://repository.cloudera.com/artifactory/cloudera-repos/</url > </repository > </repositories > <dependencies > <dependency > <groupId > org.apache.kudu</groupId > <artifactId > kudu-client</artifactId > <version > 1.9.0</version > </dependency > <dependency > <groupId > org.apache.kudu</groupId > <artifactId > kudu-client-tools</artifactId > <version > 1.9.0</version > </dependency > <dependency > <groupId > org.apache.kudu</groupId > <artifactId > kudu-spark2_2.11</artifactId > <version > 1.9.0</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.testng</groupId > <artifactId > testng</artifactId > <version > RELEASE</version > <scope > compile</scope > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.12</version > </dependency > </dependencies > </project >
2.3.2 创建表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package cn.xiaoma;import org.apache.kudu.ColumnSchema;import org.apache.kudu.Schema;import org.apache.kudu.Type;import org.apache.kudu.client.CreateTableOptions;import org.apache.kudu.client.KuduClient;import org.apache.kudu.client.KuduException;import org.junit.Before;import org.junit.Test;import java.util.ArrayList;import java.util.List;public class CreateTable { private static String masterAddress = "node2.xiaoma.cn" ; private static KuduClient kuduClient; private static String tableName = "person1" ; @Before public void init(){ System.out.println("---初始化kuduclient对象--" ); kuduClient = new KuduClient .KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(60000 ).build(); } private ColumnSchema new Column (String name, Type type, Boolean isKey){ ColumnSchema.ColumnSchemaBuilder columnSchemaBuilder = new ColumnSchema .ColumnSchemaBuilder(name, type); columnSchemaBuilder.key(isKey); return columnSchemaBuilder.build(); } @Test public void CreateTable(){ List<ColumnSchema> columnSchemaList = new ArrayList <ColumnSchema>(); columnSchemaList.add(new Column ("id" , Type.INT32, true )); columnSchemaList.add(new Column ("name" , Type.STRING, false )); columnSchemaList.add(new Column ("gender" , Type.STRING, false )); columnSchemaList.add(new Column ("phone" , Type.STRING, false )); Schema schema = new Schema (columnSchemaList); CreateTableOptions createTableOptions = new CreateTableOptions (); List<String > partitionColumnList = new ArrayList <String >(); partitionColumnList.add("id" ); createTableOptions.addHashPartitions(partitionColumnList, 3 ); createTableOptions.setNumReplicas(1 ); try { kuduClient.createTable(tableName, schema, createTableOptions); } catch (KuduException e) { e.printStackTrace(); } } }
2.3.3 数据操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 package cn.xiaoma; import org.apache.kudu.client.*; import org.junit.Before; import org.junit.Test; public class KuduDemo { private static String masterAddress = "node2.xiaoma.cn" ; private static KuduClient kuduClient; private static String tableName = "person" ; @Before public void init() { System . out.println("---初始化kuduclient对象--" ); kuduClient = new KuduClient.KuduClientBuilder(masterAddress ) .defaultSocketReadTimeoutMs(60000) .build() ; } @Test public void loadData() throws KuduException { KuduTable kuduTable = kuduClient.open Table(tableName ) ; KuduSession kuduSession = kuduClient.new Session() ; for (int i = 0 ; i <100 ; i++) { Insert insert = kuduTable.new Insert() ; insert.getRow() .addInt("id" , i +1) ; insert.getRow() .addString("name" , "zhangsan" ) ; insert.getRow() .addString("gender" , "male" ) ; insert.getRow() .addString("phone" , "12213213232" ) ; kuduSession.apply(insert); } kuduSession.close() ; kuduClient.close() ; } @Test public void queryData() throws KuduException { KuduTable kuduTable = kuduClient.open Table(tableName ) ; KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.new ScannerBuilder(kuduTable ) ; KuduScanner scanner = scannerBuilder.build() ; while (scanner.hasMoreRows() ){ RowResultIterator rowResults = scanner.nextRows() ; while (rowResults.hasNext() ){ RowResult result = rowResults.next() ; System . out.println(result.getInt("id" ) ); System . out.println(result.getString("name" ) ); System . out.println(result.getString("gender" ) ); System . out.println(result.getString("phone" ) ); } } scanner.close() ; kuduClient.close() ; } @Test public void updateData() throws KuduException { KuduTable kuduTable = kuduClient.open Table(tableName ) ; KuduSession kuduSession = kuduClient.new Session() ; Update update = kuduTable.new Update() ; PartialRow row = update.getRow() ; row.addInt("id" , 100) ; row.addString("name" , "lisi" ) ; row.addString("phone" , "1111111111" ) ; kuduSession.apply(update); kuduSession.close() ; kuduClient.close() ; } @Test public void deleteData() throws KuduException { KuduTable kuduTable = kuduClient.open Table(tableName ) ; KuduSession kuduSession = kuduClient.new Session() ; Delete delete = kuduTable.new Delete() ; PartialRow row = delete.getRow() ; row.addInt("id" , 1) ; kuduSession.apply(delete); kuduSession.close() ; kuduClient.close() ; } @Test public void upSertData() throws KuduException { KuduTable kuduTable = kuduClient.open Table(tableName ) ; KuduSession kuduSession = kuduClient.new Session() ; Upsert upsert = kuduTable.new Upsert() ; PartialRow row = upsert.getRow() ; row.addInt("id" , 100) ; row.addString("name" , "zhangsan" ) ; row.addString("gender" , "male" ) ; row.addString("phone" , "1111111111" ) ; kuduSession.apply(upsert); kuduSession.close() ; kuduClient.close() ; } }
2.3.4 表操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 package cn.xiaoma; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.List; public class AlterTable { private static String masterAddress = "node2.xiaoma.cn" ; private static KuduClient kuduClient; private static String tableName = "person" ; @Before public void init() { System . out.println("---初始化kuduclient对象--" ); kuduClient = new KuduClient.KuduClientBuilder(masterAddress ) .defaultSocketReadTimeoutMs(60000) .build() ; } @Test public void alterTableAddColumn() { AlterTableOptions alterTableOptions = new AlterTableOptions() ; alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("address" , Type.STRING) .nullable(true ).build() ); try { kuduClient.alterTable(tableName ,alterTableOptions ) ; } catch (KuduException e) { e.printStackTrace() ; } } @Test public void alterTableDeleteColumn() { AlterTableOptions alterTableOptions = new AlterTableOptions() .dropColumn("address" ) ; try { kuduClient.alterTable(tableName , alterTableOptions ) ; } catch (KuduException e) { e.printStackTrace() ; } } @Test public void alterTableAddRangePartition() { int lowerValue = 110 ; int upperValue = 120 ; try { KuduTable kuduTable = kuduClient.open Table("person_range" ) ; boolean flag = true ; List<Partition> rangePartitions = kuduTable.getRangePartitions(6000) ; for (Partition partition : rangePartitions){ int startKey = partition.getDecodedRangeKeyStart(kuduTable ) .getInt("id" ) ; int endKey = partition.getDecodedRangeKeyEnd(kuduTable ) .getInt("id" ) ; if (startKey == lowerValue){ flag = false ; } } if (flag) { PartialRow lower = kuduTable.getSchema() .new PartialRow() ; lower.addInt("id" , lowerValue ) ; PartialRow upper = kuduTable.getSchema() .new PartialRow() ; upper.addInt("id" , upperValue ) ; kuduClient.alterTable("person_range" , new AlterTableOptions() .addRangePartition(lower , upper ) ); }else { System . out.println("表分区已经创建过,无需再次创建...." ); } } catch (KuduException e) { e.printStackTrace() ; } catch (Exception exception ) { exception .printStackTrace() ; } } @Test public void dropTable() throws KuduException { kuduClient.deleteTable(tableName ) ; } }
2.3.5 分区演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 package cn.xiaoma;import org.apache.kudu.ColumnSchema;import org.apache.kudu.Schema;import org.apache.kudu.Type;import org.apache.kudu.client.CreateTableOptions;import org.apache.kudu.client.KuduClient;import org.apache.kudu.client.KuduException;import org.apache.kudu.client.PartialRow;import org.junit.Before;import org.junit.Test;import javax.print.DocFlavor;import java.util.ArrayList;import java.util.LinkedList;import java.util.List;public class KuduPartition { private static String masterAddress = "node2.xiaoma.cn" ; private static KuduClient kuduClient; private static String tableName = "person" ; @Before public void init(){ System.out.println("---初始化kuduclient对象--" ); kuduClient = new KuduClient .KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(60000 ).build(); } private ColumnSchema new Column (String name, Type type, Boolean isKey){ ColumnSchema.ColumnSchemaBuilder columnSchemaBuilder = new ColumnSchema .ColumnSchemaBuilder(name, type); columnSchemaBuilder.key(isKey); return columnSchemaBuilder.build(); } @Test public void testHashPartition(){ List<ColumnSchema> columnSchemaList = new ArrayList <ColumnSchema>(); columnSchemaList.add(new Column ("id" , Type.INT32, true )); columnSchemaList.add(new Column ("name" , Type.STRING, false )); columnSchemaList.add(new Column ("gender" , Type.STRING, false )); columnSchemaList.add(new Column ("phone" , Type.STRING, false )); Schema schema = new Schema (columnSchemaList); CreateTableOptions createTableOptions = new CreateTableOptions (); List<String > partitionColumnList = new ArrayList <String >(); partitionColumnList.add("id" ); createTableOptions.addHashPartitions(partitionColumnList, 3 ); createTableOptions.setNumReplicas(1 ); try { kuduClient.createTable(tableName, schema, createTableOptions); } catch (KuduException e) { e.printStackTrace(); } } @Test public void testRangePartition(){ List<ColumnSchema> columnSchemaList = new ArrayList <ColumnSchema>(); columnSchemaList.add(new Column ("id" , Type.INT32, true )); columnSchemaList.add(new Column ("name" , Type.STRING, false )); columnSchemaList.add(new Column ("gender" , Type.STRING, false )); columnSchemaList.add(new Column ("phone" , Type.STRING, false )); Schema schema = new Schema (columnSchemaList); CreateTableOptions createTableOptions = new CreateTableOptions (); List<String > partitionColumnList = new ArrayList <String >(); partitionColumnList.add("id" ); createTableOptions.setRangePartitionColumns(partitionColumnList); createTableOptions.setNumReplicas(1 ); try { int count = 0 ; for (int i = 0 ; i < 10 ; i++) { PartialRow lower = new PartialRow (schema); lower.addInt("id" , count); PartialRow upper = new PartialRow (schema); count+= 10 ; upper.addInt("id" , count); createTableOptions.addRangePartition(lower, upper); } kuduClient.createTable(tableName+"_range" , schema, createTableOptions); } catch (KuduException e) { e.printStackTrace(); } } @Test public void testMultilevelPartition() throws KuduException { List<ColumnSchema> columnSchemaList = new ArrayList <ColumnSchema>(); columnSchemaList.add(new Column ("id" , Type.INT32, true )); columnSchemaList.add(new Column ("name" , Type.STRING, false )); columnSchemaList.add(new Column ("gender" , Type.STRING, false )); columnSchemaList.add(new Column ("phone" , Type.STRING, false )); Schema schema = new Schema (columnSchemaList); CreateTableOptions createTableOptions = new CreateTableOptions (); List<String > columns = new LinkedList <String >(); columns.add("id" ); createTableOptions.setNumReplicas(1 ); createTableOptions.addHashPartitions(columns, 3 ); int count = 0 ; for (int i = 0 ; i < 3 ; i++) { PartialRow lower = new PartialRow (schema); lower.addInt("id" , count); PartialRow upper = new PartialRow (schema); count+= 10 ; upper.addInt("id" , count); createTableOptions.addRangePartition(lower, upper); } try { kuduClient.createTable(tableName+"_multilevel" , schema, createTableOptions); } catch (KuduException e) { e.printStackTrace(); } kuduClient.close(); } }
2.4 Spark操作Kudu 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 package cn.xiaomaimport java.utilimport org.apache.kudu.{ColumnSchema, Schema, Type}import org.apache.kudu.client.{AlterTableOptions, CreateTableOptions, KuduClient}import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}import org.apache.parquet.schema.Typesimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object SparkDemo { val tableName:String = "person_spark1" val kuduMaster = "node2.xiaoma.cn" def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]" ).setAppName(this .getClass.getSimpleName) val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() val kuduContext: KuduContext = new KuduContext("node2.xiaoma.cn:7051" , sparkSession.sparkContext) dataFrame2Kudu(sparkSession) sparkSession.stop() } def createTable(kuduContext: KuduContext) = { if (!kuduContext.tableExists(tableName)){ val schema: StructType = StructType( StructField("id" , IntegerType, false ) :: StructField("name" , StringType, true ) :: StructField("gender" , StringType, true ) :: StructField("phone" , StringType, true ) :: Nil ) val keys: List[String] = List("id" ) val createTableOptions = new CreateTableOptions createTableOptions.setNumReplicas(1 ) val partitionColumnList = new util.ArrayList[String]() partitionColumnList.add("id" ) createTableOptions.addHashPartitions(partitionColumnList, 3 ) kuduContext.createTable(tableName, schema, keys, createTableOptions) } } def loadData(sparkSession: SparkSession, kuduContext: KuduContext) = { val data : List[Person] = List(Person(1 , "zhangsan" , "male" , "2323232" ),Person(2 , "lisi" , "male" , "2323232" )) val personRDD: RDD[Person] = sparkSession.sparkContext.makeRDD(data ) import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF kuduContext.insertRows(personDF, tableName) } def deleteData(sparkSession: SparkSession, kuduContext: KuduContext) = { val data : List[Person] = List(Person(2 , "lisi" , "male" , "2323232" )) val personRDD: RDD[Person] = sparkSession.sparkContext.makeRDD(data ) import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF.select("id" ) kuduContext.deleteRows(personDF, tableName) } def updateData(sparkSession: SparkSession, kuduContext: KuduContext) = { val data : List[Person] = List(Person(1 , "zhangsan" , "male" , "11111111111111111" ),Person(2 , "lisi" , "male" , "2222" )) val personRDD: RDD[Person] = sparkSession.sparkContext.makeRDD(data ) import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF kuduContext.upsertRows(personDF, tableName) } def getTableData(sparkSession: SparkSession) = { val options: Map[String, String] = Map( "kudu.master" ->kuduMaster, "kudu.table" -> tableName ) import org.apache.kudu.spark.kudu._ sparkSession.read.options(options).kudu.show() } def dataFrame2Kudu(sparkSession: SparkSession) = { val data : List[Person] = List(Person(3 , "wangwu" , "male" , "11111111111111111" ),Person(4 , "zhaoliu" , "male" , "2222" )) val personRDD: RDD[Person] = sparkSession.sparkContext.makeRDD(data ) import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF val options: Map[String, String] = Map( "kudu.master" ->kuduMaster, "kudu.table" -> tableName ) import org.apache.kudu.spark.kudu._ personDF.write.mode(SaveMode.Append).options(options).kudu } def sparkSql2Kudu(sparkSession: SparkSession) = { val data : List[Person] = List(Person(5 , "laowang" , "male" , "11111111111111111" ),Person(6 , "laoli" , "male" , "2222" )) val personRDD: RDD[Person] = sparkSession.sparkContext.makeRDD(data ) import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF personDF.createTempView("tmp1" ) val options: Map[String, String] = Map( "kudu.master" ->kuduMaster, "kudu.table" -> tableName ) import org.apache.kudu.spark.kudu._ val kuduDataFrame: DataFrame = sparkSession.read.options(options).kudu kuduDataFrame.createTempView("tmp2" ) sparkSession.sql("insert into table tmp2 select * from tmp1" ) } def kuduNativeRdd(sparkSession: SparkSession,kuduContext: KuduContext) = { val columnList = List("id" , "name" , "gender" , "phone" ) val rowRdd: RDD[Row] = kuduContext.kuduRDD(sparkSession.sparkContext, tableName, columnList) rowRdd.foreach(println(_)) } def addColumn(kuduContext: KuduContext) = { val alterTableOptions = new AlterTableOptions alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("address" , Type.STRING).nullable(true ).build()) kuduContext.syncClient.alterTable(tableName, alterTableOptions) } case class Person (id:Int , name:String, gender:String, phone:String) }
3 Kudu原理 3.1 表与schema
Kudu设计是面向结构化存储
的,因此Kudu的表需要用户在建表时定义它的Schema
信息,这些Schema信息包含:
列定义(含类型)
Primary Key定义(用户指定的若干个列的有序组合)
数据的唯一性,依赖于用户所提供的Primary Key
中的Column组合的值的唯一性。
Kudu提供了Alter
命令来增删列,但位于Primary Key中的列是不允许删除的。
Kudu当前并不支持二级索引
。
从用户角度来看,Kudu是一种存储结构化数据表的存储系统。
在一个Kudu集群中可以定义任意数量的table
,每个table都需要预先定义好schema
。每个table的列数是确定的,每一列都需要有名字和类型,每个表中可以把其中一列或多列定义为主键。这么看来,Kudu更像关系型数据库,而不是像HBase、Cassandra和MongoDB这些NoSQL数据库。
不过Kudu目前还不能像关系型数据一样支持二级索引。
Kudu使用确定的列类型,而不是类似于NoSQL的“everything is byte”
(一切都是字节)。
这可以带来两点好处
:
确定的列类型使Kudu可以进行类型特有的编码
可以提供 SQL-like 元数据给其他上层查询工具使用,比如BI工具。
3.2 Kudu的底层数据模型
Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统
这套实现基于如下的几个设计目标
:
可提供快速的列式查询
可支持快速的随机更新
可提供更为稳定的查询性能保障
一个Table会被分成若干个tablet
,其中Tablet
的数量是根据hash
或者是range
进行设置的
一个Tablet中包含MetaData信息
和多个RowSet信息
,其中MetaData信息是block
和block在data中的位置。
一个RowSet包含一个MemRowSet
和多个DiskRowSet
,其中MemRowSet用于存储insert数据和update后的数据
,写满后会刷新到磁盘中也就是多个DiskRowSet中,默认是1G刷新一次或者是2分钟。
DiskRowSet用于老数据的mutation
(改变),比如说数据的更新操作,后台定期对DiskRowSet进行合并操作,删除历史数据和没有的数据,减少查询过程中的IO开销。
一个DiskRowSet包含1个BloomFilter
,1个Ad_hoc Index
,多个UndoFile
、RedoFile
、BaseData
、DeltaMem
BloomFile:根据一个DiskRowSet中的key生成一个bloom filter,用于快速模糊定位某个key是否在DiskRowSet
中存在。
Ad_hoc Index:是主键的索引,用于定位主键在DiskRowSet中的具体偏移位置。
BaseData:是MemRowSet flush下来的数据,按列存储,按主键有序。
UndoFile:是基于BaseData之前时间(上次flush之前)的历史数据,可以获得历史数据,类似mysql中的回滚日志。
RedoFile:是基于BaseData之后时间(上次flush之后)的变更记录,可以获得新的数据,类似mysql中的重做日志。
DeltaMem:在内存中存储DiskRowSet中数据的更新,写满后flush到磁盘,形成deltafile文件。
注意
:
这里有两个在内存中处理的数据集,区别如下:
MemRowSet:存储新增的数据,对该内存数据集中还未flush的数据的更新;
DeltaMem:对已flush到磁盘内的数据的更新;
MemRowSets可以对比理解成HBase中的MemStore
, 而DiskRowSets可理解成HBase中的HFile
。MemRowSets中的数据按照行视图进行存储
,数据结构为B-Tree
。
MemRowSets中的数据被Flush到磁盘之后,形成DiskRowSets。
DiskRowSets中的数据,按照32MB大小为单位
,按序划分为一个个的DiskRowSet。 DiskRowSet中的数据按照Column进行组织,与Parquet类似。
这是Kudu可支持一些分析性查询的基础。每一个Column的数据被存储在一个相邻的数据区域,而这个数据区域进一步被细分成一个个的小的Page单元,与HBase File中的Block类似,对每一个Column Page可采用一些Encoding算法,以及一些通用的Compression算法。 既然可对Column Page可采用Encoding以及Compression算法,那么,对单条记录的更改就会比较困难了。
前面提到了Kudu可支持单条记录级别的更新/删除,是如何做到的?
与HBase类似,也是通过增加一条新的记录来描述这次更新/删除操作的。DiskRowSet是不可修改了,那么 KUDU 要如何应对数据的更新呢?在KUDU中,把DiskRowSet分为了两部分:
base data: 负责存储基础数据
delta stores:delta stores负责存储 base data 中的变更数据.
如上图所示,数据从 MemRowSet
刷到磁盘后就形成了一份 DiskRowSet
(只包含 base data),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet 后续的数据变更(更新、删除)。DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset 对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。
3.3 Kudu的读写原理 3.3.1 工作模式
Kudu的工作模式如下图,有些在上面的内容中已经介绍了,这里简单标注一下:
每个kudu table按照hash或range分区为多个tablet;
每个tablet中包含一个MemRowSet以及多个DiskRowSet;
每个DiskRowSet包含BaseData以及DeltaStores;
DeltaStores由多个DeltaFile和一个DeltaMemStore组成;
insert请求的新增数据以及对MemRowSet中数据的update操作(新增的数据还没有来得及触发compaction操作再次进行更新操作的新数据
) 会先进入到MemRowSet;
当触发flush条件时将新增数据真正的持久化到磁盘的DiskRowSet内;
对老数据的update和delete操作是提交到内存中的DeltaMemStore;
当触发flush条件时会将更新和删除操作持久化到磁盘DIskRowSet中的DeltaFile内,此时老数据还在BaseData内(逻辑删除),新数据已在DeltaFile内;
当触发compaction条件时,将DeltaFile和BaseData进行合并,DiskRowSet进行合并,此时老数据才真正的从磁盘内消失掉(物理删除),只留下更新后的数据记录;
3.3.2 Kudu的读流程
客户端向Kudu Master请求tablet所在位置
Kudu Master返回tablet所在位置
为了优化读取和写入,客户端将元数据进行缓存
根据主键范围过滤目标tablet,请求Tablet Follower
根据主键过滤scan范围,定位DataRowSets
加载BaseData,并与DeltaStores合并,得到老数据的最新结果
拼接第6步骤得到的老数据与MemRowSet数据 得到所需数据
将数据返回给客户端
3.3.3 Kudu的写流程
客户端向Kudu Master请求tablet所在位置;
Kudu Master返回tablet所在位置;
为了优化读取和写入,客户端将元数据进行缓存;
根据分区策略,路由到对应Tablet,请求Tablet Leader;
根据RowSet记录的主键范围过滤掉不包含新增数据主键的RowSet;
根据RowSet 布隆过滤器再进行一次过滤,过滤掉不包含新数据主键的RowSet;
查询RowSet中的B树索引判断是否命中新数据主键,若命中则报错主键冲突,否则新数据写入MemRowSet;
返回响应给客户端;
3.3.4 Kude的更新流程
更新删除流程与写入流程类似,区别就是最后判断是否存在主键时候的操作,若存在才能更新,不存在才能插入新数据。
客户端向Kudu Master请求tablet所在位置
Kudu Master返回tablet所在位置
为了优化读取和写入,客户端将元数据进行缓存
根据分区策略,路由到对应Tablet,请求Tablet Leader
根据RowSet记录的主键范围过滤掉不包含修改的数据主键的RowSet
根据RowSet 布隆过滤器再进行一次过滤,过滤掉不包含修改的数据主键的RowSet
查询RowSet中的B树索引判断是否命中修改的数据主键,若命中则修改至DeltaStores,否则报错数据不存在
返回响应给客户端
4 Kudu优化 4.1 Kudu关键配置
TabletServer 在开始拒绝所有传入的写入之前可以消耗的最大内存量:memory_limit_hard_bytes=1073741824
分配给 Kudu Tablet Server 块缓存的最大内存量:block_cache_capacity_mb=512
4.2 Kudu的使用限制 4.2.1 主键
创建表后,不能更改主键。必须删除并重新创建表以选择新的主键。
创建表的时候,主键必须放在最前边。
主键不能通过 update 更新,如果要修改主键就必须先删除行,然后重新插入。这种操作不是原子性的。(kudu的删除和插入操作无法事务)
不支持自动生成主键,可以通过内置的 uuid 函数表示为主键值。
联合主键由 kudu 编码后,大小不能超过 16KB。
4.2.2 Cells
在编码或压缩之前,任何单个单元都不得大于 64KB。
在 Kudu 完成内部复合键编码之后,组成复合键的单元格总共限制为 16KB。
如果插入不符合这些限制的行时会报错误并返回给客户端。
4.2.3 字段
默认情况下,Kudu 不允许创建超过 300 列的表。官方建议使用较少列的 Schema 设计以获得最佳性能。
不支持 CHAR、VARCHAR、DATE 和数组等复杂类型。
现有列的类型和是否允许为空,一旦设置后,是不可修改的。
Decimal 类型的精度不可修改。也不允许通过更改表来更改 Decimal 列的精度和小数位数
删除列不会立即回收空间。首先必须运行压缩。
4.2.4 表
表中的副本数必须为奇数,最多为 7
复制因子(在表创建时设置)不能更改
无法手动运行压缩,但是删除表将立即回收空间
4.2.5 其他限制
不支持二级索引。
不支持多行事务。
不支持外键。
列名和表名之类的标识符仅限于有效的 UTF-8 字符串并且其最大长度为 256 个字符。
4.2.6 分区限制
表必须根据一个主键 or 联合主键被预先切成 tablet,不支持自动切。表被创建后不支持修改分区字段,支持添加和删除 range 分区(意思分区表,分区字段需提前定义好,kudu 不会自动分)。
已经存在的表不支持自动重新分区,只能创建新表时指定。
丢失副本时,必须通过手动修复方式来恢复。
4.2.7 扩展建议和限制
建议 TabletServer 最多为 100 台。
建议 Master 最多 3 台。
建议每个 TabletServer 最大数据为 8T(压缩后)。
建议每台 TabletServer 的 tablet 数为 1000,最多 2000。
创建表的时候,建议在每个 Tablet Server 上,每个表的 Tablet 数最大为 60,也就是 3 节点的话,3 副本,创表分区最大 60,这样每个单 TabletServer 上该表的 Tablets 也就为 60。
建议每个 Tablet 最大为 50GB,超出后可能导致压缩和启动有问题。
建议单 Tablet 的大小<10GB。
4.2.8 守护进程
部署至少 4G 内存,理想情况下应超过 16GB。
预写日志(WAL)只能存储在一个磁盘上。
不能直接删除数据目录,必须使用重新格式化数据目录的方式来达到删除目的。
TabletServer 不能修改 IP 和 PORT。
Kudu 对 NTP 有严格要求,如果时间不同步时,Kudu 的 Master 和 TabletServer 会崩溃。
Kudu 仅使用 NTP 进行了测试,不支持其他时间同步工具。
4.2.9 集群管理限制
不支持滚动重启。
建议 Kudu 集群中的最大点对点延迟为 20 毫秒。推荐的最小点对点带宽是 10GB。
如果要使用位置感知功能将平板服务器放置在不同的位置,官方建议先测量服务器之间的带宽和延迟,以确保它们符合上述指导原则。
首次启动群集时,必须同时启动所有 Master 服务。
4.2.10 复制和备份限制
Kudu 当前不支持任何用于备份和还原的内置功能。鼓励用户根据需要使用 Spark 或 Impala之类的工具导出或导入表。
4.2.11 Impala集成限制
创建 Kudu 表时,建表语句中的主键字段必须在最前面。
Impala 无法更新主键列中的值。
Impala 无法使用以下命令创建 Kudu 表 VARCHAR 或嵌套类型的列。
名称包含大写字母或非 ASCII 字符的 Kudu 表在 Impala 中用作外部表时,必须分配一个备用名称。
列名包含大写字母或非 ASCII 字符的 Kudu 表不能用作 Impala 中的外部表。可以在 Kudu 中重命名列以解决此问题。
!=和 like 谓词不会下推到 Kudu,而是由 Impala 扫描节点评估。相对于其他类型的谓语,这会导致降低性能。
使用 Impala 进行更新,插入和删除是非事务性的。如果查询在部分途中失败,则其部分效果不会回滚。
单个查询的最大并行度受限于 Table 中 Tablet 的数量。为了获得良好的分析性能,每位主机目标为 10 片或更多 tablets。
Impala 的关键字(PARTITIONED、LOCATION、ROWFORMAT)不适用于在创建 Kudu 表时使用。
4.2.12 Spark集成限制
必须使用 JDK8,自 Kudu-1.5.0 起,Spark 2.2 是默认的依赖项版本。
Kudu 表只能在 Spark SQL 中注册为临时表。
无法使用 HiveContext 查询 Kudu 表。
关注博主不迷路