Cloud Computing and Hadoop Data Management

Cloud Computing: Data Management in the Cloud
Dell Zhang
Birkbeck, University of London
2016/17

Data Management in Today's Organisations

Big Data Analysis
• Petascale datasets are everywhere:
  – Facebook: 2.5 PB of user data + 15 TB/day (4/2009)
  – eBay: 6.5 PB of user data + 50 TB/day (5/2009)
  – …
• A lot of these datasets are (mostly) structured:
  – Query logs
  – Point-of-sale records
  – User data (e.g., demographics)
  – …
• How do we perform data analysis at scale?
  – Relational databases (RDBMS)
  – MapReduce (Hadoop)

RDBMS vs MapReduce
• Relational databases:
  – Multipurpose: transactions and analysis, batch and interactive
  – Data integrity via ACID transactions
  – Lots of tools in the software ecosystem (for ingesting, reporting, etc.)
  – Supports SQL (and SQL integration, e.g., JDBC)
  – Automatic SQL query optimization
• MapReduce (Hadoop):
  – Designed for large clusters; fault tolerant
  – Data is accessed in its "native format"
  – Supports many query languages
  – Programmers retain control over performance
  – Open source
Source: O'Reilly blog post by Joseph Hellerstein (11/19/2008)

Database Workloads
• Online Transaction Processing (OLTP)
  – Typical applications: e-commerce, banking, airline reservations
  – User-facing: real-time, low-latency, highly concurrent
  – Tasks: a relatively small set of "standard" transactional queries
  – Data access pattern: random reads, updates, writes (involving relatively small amounts of data)
• Online Analytical Processing (OLAP)
  – Typical applications: business intelligence, data mining
  – Back-end processing: batch workloads, less concurrency
  – Tasks: complex analytical queries, often ad hoc
  – Data access pattern: table scans, large amounts of data involved per query

One Database or Two?
• Downsides of co-existing OLTP and OLAP workloads:
  – Poor memory management
  – Conflicting data access patterns
  – Variable latency
• Solution: separate databases
  – OLTP database for user-facing transactions
  – OLAP database for data warehousing
• How do we connect the two?

OLTP/OLAP Architecture
(Figure: OLTP database → ETL (Extract, Transform, and Load) → OLAP database.)

OLTP/OLAP Integration
• Extract-Transform-Load (ETL):
  – Extract records from the OLTP database
  – Transform records: clean data, check integrity, aggregate, etc.
  – Load records into the OLAP database
• OLTP database for user-facing transactions:
  – Retain records of all activity
  – Periodic ETL (e.g., nightly)
• OLAP database for data warehousing:
  – Business intelligence: reporting, ad hoc queries, data mining, etc.
  – Feedback to improve OLTP services

Business Intelligence
• Premise: more data leads to better business decisions
  – Periodic reporting as well as ad hoc queries
  – Analysts, not programmers (hence the importance of tools and dashboards)
• Examples:
  – Slicing and dicing activity by different dimensions to better understand the marketplace
  – Analyzing log data to improve the OLTP experience
  – Analyzing log data to better optimize ad placement
  – Analyzing purchasing trends for better supply-chain management
  – Mining for correlations between otherwise unrelated activities

OLTP/OLAP Architecture: Hadoop?
(Figure: the OLTP → ETL → OLAP pipeline, asking where Hadoop fits.)

OLTP/OLAP/Hadoop Architecture
(Figure: OLTP database → Hadoop, inside the ETL (Extract, Transform, and Load) layer → OLAP database.)

ETL Bottleneck
• Reporting is often a nightly task:
  – ETL is often slow: why?
  – What happens if processing 24 hours of data takes longer than 24 hours?
• Hadoop is perfect for this:
  – Most likely, you already have some data warehousing solution
  – Ingestion is limited only by the speed of HDFS
  – Scales out with more nodes; massively parallel
  – Ability to use any processing tool
  – Much cheaper than parallel databases
  – ETL is a batch process anyway

MapReduce Algorithms for Processing Relational and Matrix Data

Working Scenario
• Two tables:
  – User demographics (gender, age, income, etc.)
  – User page visits (URL, time spent, etc.)
• Analyses we might want to perform:
  – Statistics on demographic characteristics
  – Statistics on page visits
  – Statistics on page visits by URL
  – Statistics on page visits by demographic characteristic

Relational Algebra
• Primitives:
  – Projection (π)
  – Selection (σ)
  – Cartesian product (×)
  – Set union (∪)
  – Set difference (−)
  – Rename (ρ)
  – …
• Other operations:
  – Join (⋈)
  – Group by… aggregation
  – …

Projection in MapReduce
(Figure: π maps each of the tuples R1…R5 to a tuple with a subset of its attributes.)
• Easy:
  – Map over tuples, emitting new tuples with only the appropriate attributes
  – No reducers, unless for regrouping or resorting tuples
  – Alternatively: perform the projection in the reducer, after some other processing
• Basically limited by HDFS streaming speeds:
  – Speed of encoding/decoding tuples becomes important
  – Relational databases take advantage of compression
  – Semi-structured data? No problem!

Selection in MapReduce
(Figure: σ keeps only the tuples R1 and R3 out of R1…R5.)
• Easy:
  – Map over tuples, emitting only those tuples that meet the selection criteria
  – No reducers, unless for regrouping or resorting tuples
  – Alternatively: perform the selection in the reducer, after some other processing
• Likewise basically limited by HDFS streaming speeds
• A minimal sketch of both operations follows.
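To make the pattern concrete, here is a minimal sketch in the style of a Hadoop Streaming mapper (plain Python reading standard input). The tab-separated (user, url, time_spent) record layout and the particular predicate are illustrative assumptions, not taken from the slides:

```python
# Map-only job combining selection and projection over "visits" records,
# one per line: user \t url \t time_spent. Emitting a tuple with fewer
# attributes is projection (pi); emitting only tuples that satisfy a
# predicate is selection (sigma). No reducer is needed.
import sys

def select(fields):
    # sigma: keep only visits longer than 60 seconds (illustrative predicate)
    return int(fields[2]) > 60

def project(fields):
    # pi: keep only the (url, time_spent) attributes
    return fields[1], fields[2]

for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if select(fields):
        print("\t".join(project(fields)))
```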
Group by… Aggregation
• What is the average time spent per URL?
• In SQL:
    SELECT url, AVG(time) FROM visits GROUP BY url;
• In MapReduce:
  – Map over tuples, emitting the time, keyed by url
  – The framework automatically groups values by key
  – Compute the average in the reducer
  – Optimize with combiners (see the sketch below)
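Here is a minimal local simulation of this job, with plain Python standing in for the framework and made-up sample records. Note the combiner-friendly design: the mapper emits (sum, count) pairs rather than raw times, because averages cannot be merged directly but partial sums and counts can:

```python
# Average time spent per URL, MapReduce style: map emits (sum, count)
# partial aggregates keyed by url; the same merge function can serve as
# both combiner and reducer because it is associative.
from collections import defaultdict

visits = [("Amy", "cnn.com", 480), ("Amy", "bbc.com", 120),
          ("Fred", "cnn.com", 600)]          # hypothetical sample data

def mapper(record):
    user, url, time = record
    yield url, (time, 1)                     # key by url; value = (sum, count)

def combiner_or_reducer(values):
    total = count = 0
    for s, c in values:                      # merge partial (sum, count) pairs
        total, count = total + s, count + c
    return total, count

# The framework's shuffle/sort groups values by key; we simulate it here.
groups = defaultdict(list)
for record in visits:
    for key, value in mapper(record):
        groups[key].append(value)

for url, values in groups.items():
    total, count = combiner_or_reducer(values)
    print(url, total / count)                # AVG(time) per url
```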
Relational Joins
(Figure: each tuple of R is matched with the tuples of S that share its join key, e.g., R1–S2, R2–S4, R3–S1, R4–S3.)

Natural Join: Example

  R:                        S:
  sid  bid  day             sid  sname   rating  age
  22   101  10/10/96        22   dustin   7      45.0
  58   103  11/12/96        31   lubber   8      55.5
                            58   rusty   10      35.0

  R ⋈ S:
  sid  sname   rating  age   bid  day
  22   dustin   7      45.0  101  10/10/96
  58   rusty   10      35.0  103  11/12/96

Types of Relationships
(Figure: many-to-many, one-to-many, and one-to-one relationships between the tuples of two relations.)

Join Algorithms in MapReduce
• Reduce-side join
• Map-side join
• In-memory join
  – Striped variant
  – Memcached variant

Reduce-side Join
• Basic idea: group by join key
  – Map over both sets of tuples
  – Emit each tuple as the value, with its join key as the intermediate key
  – The execution framework brings together tuples sharing the same key
  – Perform the actual join in the reducer
  – Similar to a "sort-merge join" in database terminology
• Two variants:
  – 1-to-1 joins
  – 1-to-many and many-to-many joins

Reduce-side Join: 1-to-1
(Figure: the map phase emits R1, R4, S2, S3 keyed by join key; each reducer receives at most one tuple from R and one from S per key, e.g., {R1, S2} and {S3, R4}.)
Note: there is no guarantee whether R or S will come first in the reducer's value list.

Reduce-side Join: 1-to-many
(Figure: for one join key, the reducer receives a single R tuple plus many S tuples, e.g., R1 with S2, S3, S9.)
• In the reducer, use value-to-key conversion so that the R tuple sorts first:
  – When a new join key is encountered, hold the R tuple in memory
  – Cross it with the S records that follow (e.g., R1 with S2, S3, S9; then R4 with S3, S7)

Reduce-side Join: many-to-many
• In the reducer:
  – Hold all the R tuples for the current key in memory (e.g., R1, R5, R8)
  – Cross them with each record from the other set (e.g., S2, S3, S9)
• A sketch of the 1-to-many case follows.
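Here is a minimal local sketch of a reduce-side join with value-to-key conversion, with plain Python simulating the shuffle; the composite key, the R/S tags, and the sample data are illustrative assumptions:

```python
# Reduce-side (sort-merge style) join with value-to-key conversion: the
# composite key (join_key, tag) makes each R tuple arrive at the reducer
# before the S tuples it must cross with, so only one tuple is held in
# memory per join key.
from itertools import groupby

R = [(22, "dustin"), (58, "rusty")]                  # (sid, sname)
S = [(22, 101, "10/10/96"), (58, 103, "11/12/96")]   # (sid, bid, day)

def mapper():
    for sid, sname in R:
        yield (sid, 0), ("R", sname)       # tag 0: R sorts first
    for sid, bid, day in S:
        yield (sid, 1), ("S", bid, day)    # tag 1: S follows

# Simulate shuffle: sort by composite key, then group by join key only.
pairs = sorted(mapper(), key=lambda kv: kv[0])
for sid, group in groupby(pairs, key=lambda kv: kv[0][0]):
    held = None                            # the R tuple held in memory
    for _key, value in group:
        if value[0] == "R":
            held = value
        elif held is not None:             # cross S tuple with held R tuple
            print(sid, held[1], value[1], value[2])
```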
Map-side Join: Basic Idea
• Assume the two datasets are both sorted by the join key:
(Figure: R1–R4 and S1–S4, each sorted by the join key, scanned side by side.)
• A sequential scan through both datasets suffices to join them (called a "merge join" in database terminology)

Map-side Join: Parallel Scans
• If the datasets are sorted by join key, the join can be accomplished by a scan over both datasets
• How can we accomplish this in parallel?
  – Partition and sort both datasets in the same manner
• In MapReduce:
  – Map over one dataset, reading from the corresponding partition of the other
  – No reducers necessary, unless to repartition or resort
  – Consistently partitioned datasets: realistic to expect

In-Memory Join
• Basic idea: load one dataset into memory, stream over the other dataset
  – Works if R ≪ S and R fits into memory
  – Called a "hash join" in database terminology
• MapReduce implementation:
  – Distribute R to all nodes
  – Map over S; each mapper loads R into memory, hashed by the join key
  – For every tuple in S, look up its join key in R
  – No reducers, unless for regrouping or resorting tuples
• A sketch appears after the variants below.

In-Memory Join: Variants
• Striped variant:
  – R is too big to fit into memory
  – Divide R into R_1, R_2, …, R_n such that each R_i fits into memory
  – Perform the in-memory join for each i: R_i ⋈ S
  – Take the union of all the join results
• Memcached variant:
  – Load R into memcached
  – Replace the in-memory hash lookup with a memcached lookup

Memcached
• Caching servers: 15 million requests per second, 95% handled by memcache (15 TB of RAM)
• Database layer: 800 eight-core Linux servers running MySQL (40 TB of user data)
Source: Technology Review (July/August 2008)

Memcached Join
• Capacity and scalability:
  – Memcached capacity >> RAM of an individual node
  – Memcached scales out with the cluster
• Latency:
  – Memcached is fast (basically, the speed of the network)
  – Batch requests to amortize latency costs
Source: see the tech report by Lin et al. (2009)

Which Join to Use?
• In-memory join > map-side join > reduce-side join
  – Why?
• Limitations of each:
  – In-memory join: memory
  – Map-side join: sort order and partitioning
  – Reduce-side join: none (it is general purpose)
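Here is a minimal sketch of the in-memory ("hash") join described above, in plain Python. In real Hadoop, R would typically be shipped to every node (e.g., via the distributed cache) while each mapper streams over its split of S; the field names and data here are made up:

```python
# In-memory (hash) join: the small relation R is loaded into a hash table
# keyed by the join key, and the mapper streams over the large relation S,
# probing the table for each tuple. Unmatched S tuples are dropped
# (an inner join); no reducer is required.
R = {22: "dustin", 58: "rusty"}                  # small relation, in memory

def mapper(s_tuple):
    sid, bid, day = s_tuple
    if sid in R:                                 # probe the hash table
        yield sid, R[sid], bid, day              # emit the joined tuple

S = [(22, 101, "10/10/96"), (31, 102, "11/11/96"), (58, 103, "11/12/96")]
for s in S:                                      # stream over the big side
    for joined in mapper(s):
        print(*joined)
```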
Processing Relational Data: Summary
• MapReduce algorithms for processing relational data:
  – Group by, sorting, and partitioning are handled automatically by the shuffle/sort phase of MapReduce
  – Selection, projection, and other computations (e.g., aggregation) are performed either in the mapper or in the reducer
  – Multiple strategies exist for relational joins
• Complex operations require multiple MapReduce jobs
  – Example: the top 10 URLs in terms of average time spent
  – Opportunities for automatic optimisation

Matrix-Vector Multiplication
• Suppose we have an n×n matrix M, whose element in row i and column j is denoted m_ij.
• Suppose we also have a vector v of length n, whose jth element is v_j.
• Then the matrix-vector product is the vector x of length n, whose ith element is

    x_i = Σ_j m_ij v_j

• If v can fit in main memory:
  – Each Map task operates on a chunk of the matrix M.
  – At the compute node executing a Map task, v is first read (in its entirety) into main memory, and is subsequently available to all applications of the Map function performed by that task.
  – The Map function: for each matrix element m_ij, produce the key-value pair (i, m_ij v_j).
  – The Reduce function: simply sum all the values associated with a given key i; the result will be the pair (i, x_i).
• If v cannot fit in main memory:
  – To avoid excessive disk access, divide the matrix into vertical stripes of equal width and the vector into the same number of horizontal stripes of the same height, so that the portion of the vector in one stripe can fit into main memory at a compute node.
  – The ith stripe of the matrix multiplies only components from the ith stripe of the vector.
  – Thus, we can divide the matrix into one file per stripe, and do the same for the vector.
  – Each Map task is assigned a chunk from one of the stripes of the matrix and reads the entire corresponding stripe of the vector.
• A sketch of the in-memory case follows.
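Here is a minimal local simulation of the case where v fits in memory, in plain Python; the sparse-matrix representation and the sample values are illustrative:

```python
# Matrix-vector product in MapReduce style: map emits (i, m_ij * v_j) for
# each nonzero matrix element; the shuffle groups values by row index i;
# reduce sums them to give x_i.
from collections import defaultdict

M = {(0, 0): 1.0, (0, 1): 2.0, (1, 1): 3.0}   # sparse matrix: (i, j) -> m_ij
v = [10.0, 20.0]                               # the vector, read into memory

def mapper():
    for (i, j), m_ij in M.items():
        yield i, m_ij * v[j]                   # key-value pair (i, m_ij * v_j)

x = defaultdict(float)
for i, product in mapper():                    # shuffle groups by i; reduce sums
    x[i] += product

print(dict(x))                                 # {0: 50.0, 1: 60.0}
```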
Matrix Multiplication
• If M is a matrix with element m_ij in row i and column j, and N is a matrix with element n_jk in row j and column k, then the product P = MN is the matrix with element p_ik in row i and column k, where

    p_ik = Σ_j m_ij n_jk

• A matrix is a relation with three attributes: the row number, the column number, and the value at that row and column:
  – M ↔ relation M(I, J, V), with tuples (i, j, m_ij)
  – N ↔ relation N(J, K, W), with tuples (j, k, n_jk)
• The product MN is almost a natural join (on attribute J) followed by grouping and aggregation.

Matrix Multiplication with Two MapReduce Steps
• Step 1 (the join):
  – The Map function: for each matrix element m_ij, produce the key-value pair (j, (M, i, m_ij)); likewise, for each matrix element n_jk, produce the key-value pair (j, (N, k, n_jk)).
  – The Reduce function: for each key j, examine its list of associated values; for each value (M, i, m_ij) from M and each value (N, k, n_jk) from N, produce the key-value pair ((i, k), m_ij n_jk).
• Step 2 (the aggregation):
  – The Map function: just the identity.
  – The Reduce function: for each key (i, k), produce the sum of the list of values associated with that key. The result is a pair ((i, k), v), where v is the value of the element in row i and column k of the matrix P = MN.

Matrix Multiplication with One MapReduce Step
• The Map function: for each element m_ij of M, produce all the key-value pairs ((i, k), (M, j, m_ij)) for k = 1, 2, …, up to the number of columns of N. Similarly, for each element n_jk of N, produce all the key-value pairs ((i, k), (N, j, n_jk)) for i = 1, 2, …, up to the number of rows of M.
• The Reduce function: each key (i, k) has an associated list with all the values (M, j, m_ij) and (N, j, n_jk), for all possible values of j. To connect the two values on the list that have the same value of j, sort the values beginning with M and the values beginning with N by j, in separate lists. The jth values on each list have their third components m_ij and n_jk extracted and multiplied; these products are then summed, and the sum is paired with (i, k) in the output.
• A sketch of the one-step algorithm follows.
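Here is a minimal local simulation of the one-step algorithm in plain Python; the tiny matrices and the dimension constants are illustrative assumptions:

```python
# One-step matrix multiplication: each m_ij is replicated to every key
# (i, k), and each n_jk to every key (i, k); the reducer matches the M and
# N values that share the same j, multiplies them, and sums the products.
from collections import defaultdict

M = {(0, 0): 1, (0, 1): 2}          # 1 x 2 matrix: (i, j) -> m_ij
N = {(0, 0): 3, (1, 0): 4}          # 2 x 1 matrix: (j, k) -> n_jk
rows_M, cols_N = 1, 1

groups = defaultdict(list)
for (i, j), m in M.items():          # map: replicate across columns of N
    for k in range(cols_N):
        groups[(i, k)].append(("M", j, m))
for (j, k), n in N.items():          # map: replicate across rows of M
    for i in range(rows_M):
        groups[(i, k)].append(("N", j, n))

for (i, k), values in groups.items():  # reduce: match on j, multiply, sum
    m_by_j = {j: val for tag, j, val in values if tag == "M"}
    n_by_j = {j: val for tag, j, val in values if tag == "N"}
    p_ik = sum(m_by_j[j] * n_by_j[j] for j in m_by_j if j in n_by_j)
    print((i, k), p_ik)                # ((0, 0), 11) since 1*3 + 2*4 = 11
```

The replication factor is the price of saving a job: each element of M is sent to cols_N reducers (and each element of N to rows_M), so the two-step variant can be cheaper when the matrices are large.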
Evolving Roles for Relational Databases and MapReduce

Need for High-Level Languages
• Hadoop is great for large-data processing:
  – But writing Java programs for everything is verbose and slow
  – Analysts don't want to (or can't) write Java
• Solution: develop higher-level data processing languages
  – Hive: HQL is like SQL
  – Pig: Pig Latin is a bit like Perl

Hive and Pig
• Common idea:
  – Provide a higher-level language to facilitate large-data processing
  – The higher-level language "compiles down" to Hadoop jobs

Hive
• Hive: a data warehousing application on Hadoop
  – Query language is HQL, a variant of SQL
  – Tables are stored on HDFS as flat files
  – Developed by Facebook, now open source

Hive: Example
• Hive looks similar to an SQL database
• Relational join on two tables:
  – Table of word counts from the Shakespeare collection
  – Table of word counts from the Bible

    SELECT s.word, s.freq, k.freq
    FROM shakespeare s
    JOIN bible k ON (s.word = k.word)
    WHERE s.freq >= 1 AND k.freq >= 1
    ORDER BY s.freq DESC
    LIMIT 10;

    word  s.freq  k.freq
    the   25848   62394
    I     23031    8854
    and   19671   38985
    to    18038   13526
    of    16700   34654
    a     14170    8057
    you   12702    2720
    my    11297    4135
    in    10797   12445
    is     8882    6884

Source: material drawn from the Cloudera training VM

Hive: Behind the Scenes
• The same HQL query is parsed into an abstract syntax tree, which the Hive compiler then turns into one or more MapReduce jobs.
(Slide shows the full abstract syntax tree for the query and the generated stage plan: Stage-1, a MapReduce job that scans and filters both tables and performs the inner join; Stage-2, a MapReduce job that orders the joined rows by frequency with the limit applied; and Stage-0, a fetch operator that returns the final 10 rows.)

Pig
• Pig: a large-scale data processing system
  – Scripts are written in Pig Latin, a dataflow language
  – Developed by Yahoo, now open source
  – Roughly 1/3 of all Yahoo internal jobs

Pig: Example
• Task: find the top 10 most visited pages in each category

  Visits:                      Url Info:
  User  Url         Time       Url         Category  PageRank
  Amy   cnn.com     8:00       cnn.com     News      0.9
  Amy   bbc.com     10:00      bbc.com     News      0.8
  Amy   flickr.com  10:05      flickr.com  Photos    0.7
  Fred  cnn.com     12:00      espn.com    Sports    0.9

Pig Query Plan
• Load Visits → Group by url → Foreach url, generate count
• Load Url Info → Join on url (with the counts above)
• Group by category → Foreach category, generate top10(urls)

Pig Script

    visits = load '/data/visits' as (user, url, time);
    gVisits = group visits by url;
    visitCounts = foreach gVisits generate url, count(visits);

    urlInfo = load '/data/urlInfo' as (url, category, pRank);
    visitCounts = join visitCounts by url, urlInfo by url;

    gCategories = group visitCounts by category;
    topUrls = foreach gCategories generate top(visitCounts,10);

    store topUrls into '/data/topUrls';

Pig Script in Hadoop
• The query plan compiles down to three chained MapReduce jobs:
  – Map1/Reduce1: load Visits, group by url, generate the counts
  – Map2/Reduce2: load Url Info, join on url
  – Map3/Reduce3: group by category, generate the top 10 urls per category

Pig slides adapted from Olston et al. (SIGMOD 2008)

Parallel Databases and MapReduce
• Lots of synergy between parallel databases and MapReduce
• The two communities have much to learn from each other
• Bottom line: use the right tool for the job

Take Home Messages
• Data management in today's organisations:
  – Where does MapReduce fit in?
• MapReduce algorithms for processing relational and matrix data:
  – How do I perform a join, etc.?
• Evolving roles of relational databases and MapReduce:
  – What's in store for the future?