# Shell's HiveToHBase Practice Based on Spark


January 6, 2022

This article describes in detail how to write data from Hive into HBase quickly and stably. Because of the large data volumes involved, we use HBase's official bulkload to avoid the write-pressure drawbacks of the HBase put API. The team's early approach was to generate HFiles with MapReduce and then import the data with bulkload.

Because of structural factors, the overall performance was poor and unacceptable to some business parties. The most important factor was that the pre-partitioning planned when the HBase tables were created was unreasonable, so many later tasks ran far too long; quite a few took 4~5 hours to finish.

During the review and planning, it was natural to think of Spark, which performs better computationally. It would take over from MapReduce the core work of converting the data format and generating the HFiles.

## HiveToHBase: A Full Analysis

In actual production this job involves the interaction of two data systems. To better understand the overall process, how to optimize it, and why the ETL pipeline needs steps that do not seem necessary at first glance, we first need a basic understanding of HBase's architecture.

### 1. A brief introduction to the HBase architecture

Apache HBase is an open-source, non-relational, distributed database that runs on top of HDFS. Its ability to provide real-time query services on HDFS is determined mainly by its architecture and underlying data structures: LSM-Tree (Log-Structured Merge-Tree) + HTable (region partitioning) + cache.

- The LSM tree is one of the most popular immutable disk storage structures; it achieves sequential writes using only a cache and append-only files. Its key building block is the Sorted-String-Table (SSTable), which we will not detail here. What this underlying structure demands of bulkload is that the data must be sorted; in terms of HBase's storage format, the KeyValues must be sorted!
- HTable data needs to be evenly distributed across regions. An access to HBase first looks up the HBase system table to locate which region the record belongs to, then which RegionServer hosts that region, and finally reads the data from that region on the corresponding server.

The final bulkload process is always the same; what differs is only how the HFiles are created, and that is the focus of the sections below.

### 2. Data flow path

The data flow from Hive to HBase is roughly as shown in the figure below:

![Data flow from Hive to HBase](https://static001.geekbang.org/infoq/23/230313c5a6a543388040dcf2f88b815e.png)

The part of the whole process we really need to cover is ETL (Extract, Transform, Load). HBase's underlying file, the HFile, is a columnar file in which every column is stored in the KeyValue format.

![HFile KeyValue layout](https://static001.geekbang.org/infoq/77/772ddfa8e84484752523ce16e9a4bdc0.png)

The logic we actually have to implement is simple (for simplicity, the timestamp/version column is omitted). The logical view of one piece of HBase data can be simplified as follows:

![Logical view of one HBase row as KeyValues](https://static001.geekbang.org/infoq/0f/0fb006a6ac54d4f707be36278e70c17a.png)
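To make that logical mapping concrete, here is a minimal, hypothetical sketch (not code from the original article) that turns one Hive row into the KeyValues of one HBase row. It assumes a single column family cf1 and string-typed cell values; the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class RowToKeyValues {

    // One Hive row (rowkey plus column name/value pairs) becomes one KeyValue per column:
    // rowkey : column family : qualifier : value
    static List<KeyValue> toKeyValues(String rowKey, String[] columnNames, String[] columnValues) {
        List<KeyValue> cells = new ArrayList<>();
        byte[] row = Bytes.toBytes(rowKey);
        byte[] family = Bytes.toBytes("cf1");          // assumption: single column family "cf1"
        for (int i = 0; i < columnNames.length; i++) {
            cells.add(new KeyValue(row, family, Bytes.toBytes(columnNames[i]), Bytes.toBytes(columnValues[i])));
        }
        return cells;
    }

    public static void main(String[] args) {
        List<KeyValue> cells = toKeyValues("user_001",
                new String[]{"city", "name"}, new String[]{"shenzhen", "alice"});
        cells.forEach(System.out::println);            // two cells, one per column
    }
}
```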
If you have followed to this point, congratulations: you have basically understood the logic of this article. Next, let's walk through the code and the principles behind it.

## MapReduce Workflow

The Map/Reduce framework operates on key-value pairs: it treats the input of a job as a set of key-value pairs and produces a set of key-value pairs as the job's output. In our scenario it looks like this:

### 1. mapper: data format conversion

The purpose of the mapper is to convert one row of data into the form rowkey:column family:qualifier:value. The key ETL code turns the value received by map into an <ImmutableBytesWritable, Put> output and hands it to the reducer for processing.

```java
protected void map(LongWritable key, Text value, Mapper.Context context)
        throws IOException, InterruptedException {
    // Split the line into the individual column values
    String[] values = value.toString().split("\\x01", -1);
    String rowKeyStr = generateRowKey();
    ImmutableBytesWritable hKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyStr));

    Put hPut = new Put(Bytes.toBytes(rowKeyStr));
    for (int i = 0; i < columns.length; i++) {
        String columnStr = columns[i];
        String cfNameStr = "cf1";
        String cellValueStr = values[i].trim();

        byte[] columnByte = Bytes.toBytes(columnStr);
        byte[] cfNameByte = Bytes.toBytes(cfNameStr);
        byte[] cellValueByte = Bytes.toBytes(cellValueStr);

        hPut.addColumn(cfNameByte, columnByte, cellValueByte);
    }
    context.write(hKey, hPut);
}
```
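The generateRowKey() call above is a business-specific helper that the original article does not show. Purely as an illustration (a hypothetical sketch, not the author's implementation), a variant that takes the business key as a parameter might prefix it with part of an MD5 hash so that adjacent keys spread across regions:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class RowKeys {
    // Hypothetical rowkey builder: prefix the business id with 4 hex characters of its MD5 hash
    // so that lexicographically adjacent ids do not all land in the same region.
    static String generateRowKey(String businessId) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(businessId.getBytes(StandardCharsets.UTF_8));
        StringBuilder prefix = new StringBuilder();
        for (int i = 0; i < 2; i++) {                   // first 2 bytes -> 4 hex characters
            prefix.append(String.format("%02x", digest[i]));
        }
        return prefix + "_" + businessId;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(generateRowKey("10001"));    // prints a 4-hex-char prefix + "_10001"
    }
}
```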
With the mapper written, the data format seems to have been converted. Do we still need a reducer? There is no official reducer to refer to here, but thinking it through, it is not that simple!

Reading the job submission code, apart from the output file format setting it differs from other MR programs:

```java
job.setOutputFormatClass(HFileOutputFormat2.class);
```

There is also a call that most programs do not have, namely:

```java
HFileOutputFormat2.configureIncrementalLoad(job, htable);
```

As the name suggests, this configures the job for HFile generation. HFileOutputFormat2 is provided by the HBase toolkit, so let's see what happens inside it!

### 2. Job configuration

The core pieces of that configuration are:

- The reducer is set automatically according to the mapper's output format, which means that for this MR program we only need to write the mapper; no reducer has to be written.
- It obtains the startKey of every region of the target HBase table, sets the number of reduces according to the number of regions, and configures the partitioner so that the data produced by the mapper is dispersed to the corresponding partition (reduce).

**Automatic reducer selection**

```java
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
} else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
```

In fact, all three of these reducers ultimately turn the data into KeyValue form and then sort it.
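To get a feel for the ordering these sort reducers guarantee, here is a minimal sketch (assuming the pre-2.0 KeyValue API used throughout this article; not code from the original) that compares two cells with KeyValue.COMPARATOR, the comparator discussed next:

```java
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueOrderDemo {
    public static void main(String[] args) {
        byte[] family = Bytes.toBytes("cf1");
        // Two cells of the same row but with different qualifiers
        KeyValue a = new KeyValue(Bytes.toBytes("row1"), family, Bytes.toBytes("age"), Bytes.toBytes("18"));
        KeyValue b = new KeyValue(Bytes.toBytes("row1"), family, Bytes.toBytes("name"), Bytes.toBytes("alice"));

        // KeyValue.COMPARATOR compares the serialized key bytes (row, family, qualifier, ...),
        // which is the order HFileOutputFormat2 expects cells to arrive in.
        int cmp = KeyValue.COMPARATOR.compare(a, b);
        System.out.println(cmp < 0 ? "a sorts before b" : "b sorts before a");  // prints "a sorts before b"
    }
}
```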
Note that the KeyValues are sorted in a full order, not merely within a single rowkey. A full order means the entire object is compared!

Looking at KeyValueSortReducer, you will find that at the bottom it calls the KeyValue.COMPARATOR comparator, which uses Bytes.compareTo(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2, int length2) to compare two KeyValue objects byte by byte from the beginning. This is the full-sort form mentioned above.

The output format we use is HFileOutputFormat2, and it also checks, on every write, whether the data arrives in the order defined by KeyValue.COMPARATOR. If it is not sorted, it reports an error and exits: "Added a key not lexically larger than previous."

**Reduce count and partitioner setup**

Why set the number of reduces and the partitions according to HBase's regions? As mentioned in the section above, the region is key to HBase lookups. Every region of an htable has its own [startKey, endKey] range, and the regions are distributed across different region servers.

![Region start/end keys distributed across region servers](https://static001.geekbang.org/infoq/a7/a7f780e06bef1a1e37dfc180ade95497.png)

This key range is matched against the rowkey. Taking the table above as an example, the logic of data landing in regions looks like this:

![Rows routed to regions by rowkey range](https://static001.geekbang.org/infoq/79/79a7545c8e6737cb0b324c88bb46531c.png)

It is precisely because of this management structure that an HBase table's rowkey design and region pre-partitioning (that is, the number of regions and their [startKey, endKey] setup) are so important in daily production, and of course they also have to be considered in a mass data import scenario!
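For context, pre-partitioning is done when the table is created, by passing explicit split keys. Below is a minimal sketch using the HBase 1.x client API; the table name, column family and split points are made-up examples, not values from the original article:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("demo_table"));
            desc.addFamily(new HColumnDescriptor("cf1"));
            // 4 split points -> 5 regions: (-inf,"2"), ["2","4"), ["4","6"), ["6","8"), ["8",+inf)
            byte[][] splitKeys = {
                    Bytes.toBytes("2"), Bytes.toBytes("4"), Bytes.toBytes("6"), Bytes.toBytes("8")
            };
            admin.createTable(desc, splitKeys);
        }
    }
}
```

Each split key becomes the endKey of one region and the startKey of the next, which is exactly the [startKey, endKey] layout described above.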
HBase's data files live in HDFS under a path of the form:

```text
/HBase/data/<namespace>/<table>/<region>/<column family>/<HFile>
```

Queries are sped up by processing regions in parallel, so each region should hold a roughly balanced amount of data; otherwise a large number of requests pile up on a single region, causing hot-spot problems and heavy pressure on that region server.

How to avoid hot spots through good pre-partitioning and rowkey design is not our focus here, but it does explain why the ETL process partitions the reduces by the regions' startKeys: everything is done to fit HBase's original design, so that the subsequent bulkload is simple and convenient and the generated HFiles can be imported directly into the regions!

This is also where the later optimization comes in: letting HiveToHBase shake off, as far as possible, the interference of the upstream steps (how the htable was built) and focus on the ETL part itself. In fact, bulkload does not require that a single HFile contain records of only one region!

### 3. Running bulkload, the finishing touch

At this point the key parts we set out to cover have been implemented and their underlying principles analyzed. Adding the final job submission and the bulkload wraps up the whole process.

```java
Job job = Job.getInstance(conf, "HFile Generator ... ");
job.setJarByClass(MRMain.class);
job.setMapperClass(MRMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);

HFileOutputFormat2.configureIncrementalLoad(job, htable);
// Wait for the MR job to finish
job.waitForCompletion(true);

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(loadHBaseConf);
loader.doBulkLoad(new Path(targetHtablePath), htable);
```
### 4. Analysis of the status quo

Having gone through the details of HiveToHBase's MapReduce workflow, let's look at an example of a real task: 248,903,451 rows in total, 60 GB of compressed ORC files.

**Pain points**

Because the HBase tables of historical tasks were created with no pre-partitioning, or with unreasonable pre-partitioning, many tables ended up with only a handful of regions. The performance bottleneck of these historical tasks was therefore the reduce phase that generates the HFiles: the 747 maps finished in about 4 minutes, while the remaining 2 reduces took 2 hours 22 minutes.

![Map vs. reduce runtime of the example task](https://static001.geekbang.org/infoq/66/6651209c7acade54839ad794d72370e5.png)

The distribution of region counts across the platform's HiveToHBase tables looks like this:

![Region count distribution of HiveToHBase tables](https://static001.geekbang.org/infoq/67/6725214f7b47de4f18bd466be87808ba.png)

You can see that most of the HBase tables have only a few regions. In this situation, if the previous region-based partitioning is kept, the overall performance gain cannot be expected to be large!

Also, for historical reasons, HiveToHBase lets users write SQL to process the Hive data before importing it into HBase. MR does not support SQL, so the tez engine was first used with insert overwrite directory + SQL to produce temporary files, and those temporary files were then processed as described above.

**Solution**

After weighing everything, we decided to rewrite the HiveToHBase process with Spark.
An official toolkit is now available, with example Scala code (Apache HBase Reference Guide; Chinese version: HBase and Spark-HBase Chinese Reference Guide 3.0). With it, just as with MR, we only write the mapper and do not have to worry about partitioning and sorting.

But it does not solve our pain point, so we decided not to use the official toolkit. That is exactly the biggest reason we analyzed the MR job configuration: it lets us customize the development to our own needs.

Remember the point made earlier: bulkload does not actually require that one HFile contain records of only one region. So we do not have to split the partitions by region count, and we can shake off the influence of the htable's regions. During bulkload, HBase checks the previously generated HFiles to see which region each piece of data should land in.

If, as before, rowkeys with the same prefix are grouped together in advance, bulkload will be fast; if not, the data of different regions is mixed together in one HFile, which has some impact on bulkload's performance and load. This has to be evaluated against the actual situation.

Why use Spark:

- It supports SQL directly against Hive, which removes the temporary-file step mentioned above and makes the whole process simpler.
- Architecturally, Spark runs much faster than MR.

The expectation for the example above is illustrated in the figure below:

![Expected improvement for the example task](https://static001.geekbang.org/infoq/79/797e0548830150cabbf545d690384257.png)

## Spark Workflow

The core flow is similar to the MR one, but it uses Spark's API for saving an RDD as files on disk, and we have to sort the data ourselves.
### 1. Sorting

We build a KeyFamilyQualifier class and implement Comparable to get the ordering we need. Practice shows that sorting by rowkey:family:qualifier is enough.

```java
public class KeyFamilyQualifier implements Comparable<KeyFamilyQualifier>, Serializable {

    private byte[] rowKey;
    private byte[] family;
    private byte[] qualifier;

    public KeyFamilyQualifier(byte[] rowKey, byte[] family, byte[] qualifier) {
        this.rowKey = rowKey;
        this.family = family;
        this.qualifier = qualifier;
    }

    public byte[] getRowKey() { return rowKey; }

    public byte[] getFamily() { return family; }

    public byte[] getQualifier() { return qualifier; }

    @Override
    public int compareTo(KeyFamilyQualifier o) {
        // Compare rowkey first, then family, then qualifier, all as raw bytes
        int result = Bytes.compareTo(rowKey, o.getRowKey());
        if (result == 0) {
            result = Bytes.compareTo(family, o.getFamily());
            if (result == 0) {
                result = Bytes.compareTo(qualifier, o.getQualifier());
            }
        }
        return result;
    }
}
```

### 2. Core processing flow

Because Spark does not configure the reducer side for us automatically, we have to do more ourselves. The workflow is as follows (the code comes right after this list):

- Convert the Spark Dataset<Row> into a pair RDD keyed by KeyFamilyQualifier; this is the ETL step we really have to handle.
- Sort by KeyFamilyQualifier to satisfy HBase's underlying requirement. Since the key is the KeyFamilyQualifier above, sortByKey(true) (ascending) is enough.
- Convert the sorted data into <ImmutableBytesWritable, KeyValue>, the input format accepted by the HFile writer.
- Save the resulting RDD in the HFile file format.
```java
SparkSession spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();
Dataset<Row> rows = spark.sql(hql);

JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rows.javaRDD()
        .flatMapToPair(row -> rowToKeyFamilyQualifierPairRdd(row).iterator())
        .sortByKey(true)
        .mapToPair(combineKey -> new Tuple2<>(
                new ImmutableBytesWritable(combineKey._1().getRowKey()), combineKey._2()));

Job job = Job.getInstance(conf, HBaseConf.getName());
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// Use the job's conf rather than the job itself; this also completes the
// compression, bloomType, blockSize and DataBlockSize configuration.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
javaPairRDD.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
        HFileOutputFormat2.class, job.getConfiguration());
```

### 3. Spark: data format conversion

`row -> rowToKeyFamilyQualifierPairRdd(row).iterator()` is the part that simply turns each row into <KeyFamilyQualifier, KeyValue> pairs:

```java
// The (field name, data type) tuples of the dataset
Tuple2<String, String>[] dTypes = dataset.dtypes();
return dataset.javaRDD().flatMapToPair(row -> {
    List<Tuple2<KeyFamilyQualifier, KeyValue>> kvs = new ArrayList<>();
    byte[] rowKey = generateRowKey();
    // If the rowkey is null, skip the row
    if (rowKey != null) {
        for (Tuple2<String, String> dType : dTypes) {
            Object obj = row.getAs(dType._1);
            if (obj != null) {
                byte[] family = Bytes.toBytes("cf1");
                byte[] qualifier = Bytes.toBytes(dType._1);
                // Build the cell for this column (the original used a small helper here)
                KeyValue kv = new KeyValue(rowKey, family, qualifier, Bytes.toBytes(obj.toString()));
                kvs.add(new Tuple2<>(new KeyFamilyQualifier(rowKey, family, qualifier), kv));
            }
        }
    } else {
        LOGGER.error("row key is null, row = {}", row.toString());
    }
    return kvs.iterator();
});
```

That is roughly how the Spark version of HiveToHBase works. For partitioning we expose a separate parameter so it is easy to adjust:

```java
// If the task passes in a scheduled number of partitions, repartition accordingly
if (partitionNum > 0) {
    hiveData = hiveData.repartition(partitionNum);
}
```

We keep the repartition and sort steps separate because repartition also needs a shuffle, which costs performance, so it is off by default; simply following Spark's normal read strategy of one partition per HDFS block is fine. For tasks that need special handling, for example a higher degree of parallelism, it can be controlled through this parameter.
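As a purely illustrative sketch (an assumption, not the author's implementation; the article only mentions automatic partition calculation in passing at the end), the parameter could be derived from the input size and the HDFS block size:

```java
public class PartitionHeuristic {
    // Hypothetical heuristic: one partition per HDFS block of input, capped to protect the cluster.
    static int computePartitionNum(long inputBytes, long hdfsBlockBytes, int maxPartitions) {
        int byBlocks = (int) Math.max(1L, inputBytes / hdfsBlockBytes);
        return Math.min(byBlocks, maxPartitions);
    }

    public static void main(String[] args) {
        // 60 GB of input with 128 MB blocks -> 480 partitions (below the cap of 2000)
        System.out.println(computePartitionNum(60L * 1024 * 1024 * 1024, 128L * 1024 * 1024, 2000));
    }
}
```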
## Comparison

The example task above, run the new way, finishes in 33 minutes. Going from 146 minutes down to 33 minutes, performance improved more than 4x. Because migrating and upgrading the tasks still requires a lot of upstream work, platform-wide numbers were not available at the time of writing, so for now a single task serves as the comparative experiment. (Task runtimes depend heavily on cluster resources, so the numbers are for reference only.)

You can also see that the change of strategy makes little difference to the bulkload step itself, which shows that our strategy is feasible:

![Runtime comparison of the example task](https://static001.geekbang.org/infoq/9e/9e0fc8cf297f729369b0d712ffd29f7b.png)

Another task that took 5.29 hours the original MR way needs only 11 minutes 45 seconds after migrating to Spark and tuning it (increasing the partition count). Most importantly, this can be adjusted manually, which makes maintenance flexible. Offline task runtimes are affected by many factors, so although the experiment is not fully conclusive, it basically shows that the improvement is substantial.

Limited by space, there are many points we cannot elaborate on, such as salting the rowkey so that data is evenly distributed across regions, automatic calculation of the partition count, and OOM problems while Spark generates the HFiles. The writing is clumsy; I hope you got something out of it.
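On the salting point, here is a minimal, hypothetical sketch (not the author's implementation): bucket each rowkey by a hash-derived salt prefix so that rows spread evenly across pre-split regions whose split keys match the bucket prefixes:

```java
import org.apache.hadoop.hbase.util.Bytes;

public class SaltedRowKey {
    // Number of salt buckets; ideally equal to the number of pre-split regions,
    // with split keys "01", "02", ... so that each bucket maps to one region.
    static final int BUCKETS = 16;

    // Prefix the original rowkey with a zero-padded bucket id derived from its hash.
    static byte[] salt(String originalKey) {
        int bucket = (originalKey.hashCode() & Integer.MAX_VALUE) % BUCKETS;
        return Bytes.toBytes(String.format("%02d_%s", bucket, originalKey));
    }

    public static void main(String[] args) {
        System.out.println(Bytes.toString(salt("user_001")));  // prints "<bucket>_user_001", bucket in 00..15
    }
}
```

The usual trade-off applies: reads then have to fan out over all salt prefixes.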
Finally, I would like to thank Yu Song and Feng Liang for their help during development and testing, and my colleagues for their strong support.

**References:**

1. 20 pictures take you on a tour of the HBase world [repost], sunsky303, cnblogs: https://www.cnblogs.com/sunsky303/p/14312350.html
2. HBase principles: analysis of the data read path: http://HBasefly.com/2016/12/21/HBase-getorscan/?aixuds=6h5ds3
3. Hive and Spark SQL task parameter tuning: https://www.jianshu.com/p/2964bf816efc
4. Compiling and using the official Spark On HBase jar: https://juejin.cn/post/6844903961242124295
5. Apache HBase Reference Guide: https://hbase.apache.org/book.html#_bulk_load
6. HBase and Spark - HBase Chinese Reference Guide 3.0: https://www.cntofu.com/book/173/docs/17.md

Reprinted from: DataFunTalk (ID: dataFunTalk)

Original article: [Shell's HiveToHBase practice based on Spark](https://mp.weixin.qq.com/s/pfeg25F_E3UrZJXJRXsfug)
