Clickstream Analysis

Clickstream analysis commonly refers to analyzing the events (click data) that are collected as users browse a website. Such an analysis is typically done to extract insights into visitor behavior on your website in order to inform subsequent data-driven decisions. You can use clickstream analysis to figure out how users interact with your online presence, which geographic regions or time zones your traffic originates from, what devices and operating systems are most commonly used to access your site, what the common paths are that your users take before they do something that's of interest to you (usually referred to as a goal—e.g., register for the mailing list, sign up for an account, or add an item to the shopping cart), and so on. You can correlate such click data with your marketing spend to do further analysis like the return on investment (ROI) of various marketing channels (organic search, paid search, social media spending, etc.). You can also, optionally, join click data with your ODS (Operational Data Store) or CRM (Customer Relationship Management) data to do further analysis based on additional information about your users.

Although this chapter uses analysis on clickstream data to show how to put together the various pieces described in Part I for a given use case, the concepts described here are applicable to any "batch" processing system that operates on machine-generated data. Examples of such data include but are not limited to ad impressions data, performance or other machine logs, network logs, and call logs.

Defining the Use Case

Suppose you are an online retailer selling bike parts. Users visit your website to browse bike parts, read reviews, and hopefully make a purchase. Whenever users view a page or click a link, this gets logged by your website application. The logs generated by the website, in many cases, are plain old Apache web server or Nginx logs, but in some cases could be custom logs generated by web analytics tools tracking your website. Regardless, all these clicks are logged in strictly increasing order of time.

As an online retailer, you'd like to gain a better understanding of your users and streamline your sales and marketing efforts. To accomplish these goals, you'd like to answer questions like the following:

• What is the total number of pages viewed on my website last month? How does it compare to previous months?
• How many unique visitors visited my website in the past month? How does it compare to previous months?
• How much time on average do visitors spend shopping on my website?
• What is the bounce rate of my website? In other words, how many users leave the website without visiting any other page except the one they landed on?
• For each landing page, what is the probability that a new user who landed at that page will end up buying something from the website in that session?
• For each landing page, what is the probability that a new user who landed at that page will end up buying something from the website within the next seven days?
• For each user who ends up buying something, to which marketing channel (direct, organic search, paid search, etc.) can that purchase be attributed?

To answer these questions and more, you would need to scan, process, and analyze data logged by your web servers.

For the purposes of subsequent discussion, we will assume the web logs are in a commonly used web log format known as combined log format.
Each individual log entry in this log format looks like the following (note that due to space constraints, the example is shown on multiple lines; in an actual log entry, it would appear on one continuous line):

    127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
    "http://www.example.com/start.html" "Mozilla/4.08 en (Win98; I ;Nav)"

Breaking the log entry down into its various parts, we get:

127.0.0.1
This is the source IP (i.e., the IP address of the device where the click came from).

-
The second field, represented by a hyphen in this example, is intended to be the identity of the client, but in practice is considered unreliable and is simply disregarded by the server.

frank
This is the username (if applicable) as identified by HTTP authentication.

[10/Oct/2000:13:55:36 -0700]
This specifies the time at which the server finished processing the request, along with the associated time zone.

GET /apache_pb.gif HTTP/1.0
This is the type of request and the page requested by the client.

200
This indicates the status code returned by the web server to the client.

2326
This is the size of the object returned to the client, minus any response headers.

http://www.example.com/start.html
This represents the referrer (if applicable)—that is, the page that brought the user to the site.

Mozilla/4.08 en (Win98; I ;Nav)
This is the user agent string identifying the browser and operating system on the device from which the click was made.

Using Hadoop for Clickstream Analysis

A very active website can generate several gigabytes of web server logs per day. Storing and analyzing this sheer volume of data requires a robust distributed system. Also, log data arrives at a very high velocity. Typically, logs need to be rotated several times a day—often several times an hour—and analysts want to know the latest micro and macro trends as soon as possible. Moreover, log data is semistructured; it may have user agent strings that need to be parsed, or additional parameters that might be added to and later removed from log records, such as parameters added for testing. All these characteristics make Hadoop a very good candidate for doing analytics on clickstream data.

In this chapter, we will show how you can implement an application for collecting, processing, and analyzing web server clickstream data using Apache Hadoop and related ecosystem projects.

Design Overview

We can break the high-level design of our example into five main sections—storage, ingestion, processing, analyzing, and orchestration:

• Storage refers to decisions around the storage system (HDFS or HBase), data formats, compression, and data models.
• Ingestion refers to getting click data from the web servers and secondary data (e.g., CRM, ODS, marketing spend data) and loading it into Hadoop for processing.
• Processing refers to taking the raw data ingested into Hadoop and transforming it into data set(s) that can be used for analysis and querying. Processing here may refer to, but is not limited to, deduplication of data, filtering out "bad" clicks, converting the clicks into columnar format, sessionization (the process of grouping clicks made by a single user in a single visit by associating a session ID representing that visit to each click made as a part of the visit), and aggregation.
• Analyzing refers to running various analytical queries on the processed data sets to find answers and insights to the questions presented earlier in the chapter.
• Orchestration refers to automating the arrangement and coordination of the various processes that perform ingestion, processing, and analyzing.

Figure 8-1 depicts an overview of such a design.

In our design implementation, we will use HDFS to store the data, Flume for ingesting Apache logs, and Sqoop for ingesting other secondary data (e.g., CRM, ODS, marketing spend) into Hadoop. We will use Spark for processing and Impala for analyzing the processed data. By connecting a BI tool to Impala, we will be able to perform interactive queries on the processed data. We will use Oozie to orchestrate multiple actions into a single workflow.

Figure 8-1. Design overview

Next, we will dive into a detailed description of our design. This will include details of file formats and data models, how to instrument a web application to send clickstream information to Flume, and how to configure Flume to store these events in text format. We will also discuss the various processing algorithms involved—in particular, the sessionization algorithm and how it can be implemented—and how Impala (or another SQL-on-Hadoop tool) can be used to efficiently perform aggregations over large data sets. Where applicable, we will also discuss the cadence of the automated processes (ingestion, processing, etc.) being run on the data. We will also discuss why each tool was chosen over the alternatives.

Let's go through the aforementioned sections of our design one by one.

Storage

As we discussed in the previous section, our application will take raw data ingested via Flume, perform several steps of transformation, and prepare a cleaned and enriched data set for analysis. To begin this process, we need to decide how to store the raw data, the intermediate results of the transformations, and then the final data set. Because each of these data sets serves a different purpose, we want to make sure our storage decisions—data formats and models—will fit the purpose of the data set.

To start, we will store the raw data ingested via Flume in text format on HDFS. We will use HDFS because the subsequent processing calls for batch transformations across multiple records. As we discussed in previous chapters, HDFS is a better fit for batch operations that need to scan large amounts of data efficiently. We chose text as the file format because it's simple to handle and can be used with any log type without requiring extra processing in Flume.

However, after the data is processed, we will be running aggregate queries and analysis on large chunks of data at a time, mostly on a subset of columns. Analytical queries require scanning at least a day's worth, and often months' worth, of data as efficiently as possible, which means that HDFS is still a good choice. Because many analytical queries only select a subset of columns at a time, a columnar format is a very good choice of storage format for the processed data. Consequently, we use Parquet—a columnar format—for storing the processed (i.e., sessionized) data that will be analyzed later on. We will compress the processed data set with Snappy due to its improved CPU efficiency. For a complete discussion of all possible file and compression formats, and guidelines on how to choose the best file and compression format, please see Chapter 1.
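To make this storage choice concrete, here is a minimal sketch of writing a processed data set as Snappy-compressed Parquet with PySpark. This is an illustration only: the staging path, the output path, the presence of year/month/day columns (date partitions are discussed below), and the use of PySpark at this point are our own assumptions rather than part of the design as specified.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("store-processed-clicks").getOrCreate()

    # Stand-in for the processed (sessionized) click data produced later in the chapter;
    # the staging path and the year/month/day columns are assumptions for illustration.
    processed = spark.read.parquet("/tmp/processed_clicks_staging")

    (processed.write
        .mode("append")
        .partitionBy("year", "month", "day")   # date partitions, discussed below
        .option("compression", "snappy")       # Snappy-compressed Parquet files
        .parquet("/data/bikeshop/clickstream"))

Columnar Parquet files written this way can then be scanned efficiently by Impala or another SQL-on-Hadoop engine, touching only the columns an analytical query actually needs.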
We plan to store the raw data indefinitely, rather than removing it after we are done processing it. This is a common choice when designing Hadoop applications because of several benefits:

• It allows us to reprocess data easily in case of bugs or failures in ETL processing.
• It allows us to analyze the raw data for interesting features we may have neglected to extract into the processed data set. This is also useful in data discovery and exploration, in which raw data is explored while designing the ETL pipeline—in particular, when deciding which features need to be included in the processed data set.
• It is also useful for auditing purposes.

To store the data sets, we are going to use the directory structure introduced in the section "HDFS Schema Design". Moreover, because our analysis will be over chunks of at least a day in duration (or a multiple thereof), we decided to partition our click data sets by date, where one leaf-level partition corresponds to one day of click data. Here is the directory structure we will use to store raw and processed click data:

/etl/BI/casualcyclist/clicks/rawlogs/year=2014/month=10/day=10
Raw clicks as they arrive from Flume. This is the raw data set.

/data/bikeshop/clickstream/year=2014/month=10/day=10
Cleaned, sessionized data, ready for analyzing. This is the processed data set.

As you can see, we use three levels of partitions (year, month, and day) to partition both data sets by date.

Size of the Processed Data Set

The processed data set is often smaller than the raw data set. This is especially true when the processed data set is stored in a more compression-friendly format (e.g., a columnar format like Parquet). Because the processed data set is smaller, the average size of each leaf-level partition (i.e., a partition representing one day of data) may be too small for efficient use of Hadoop. As mentioned in Chapter 1, the average size of each partition should be at least a few multiples of the HDFS block size (the default HDFS block size being 64 MB) to make efficient use of Hadoop's processing capabilities. If you find that the average size of the daily partitions in your processed data set is smaller than that, we recommend you get rid of the day partitions in the processed data set and keep only year and month as the two levels of partitions. The raw data set can still stay partitioned by year, month, and day. For the rest of this chapter, we assume that the daily data in the processed data set is large enough to warrant day being a third-level partition column.

Also, oftentimes you will see a single-level partitioning scheme like dt=2014-10-10 being used instead of the three-level partitioning scheme year=2014/month=10/day=10. This alternate one-level partitioning scheme is another valid choice for partitioning the raw and processed data sets, and it has only one partition column (of type string: dt) instead of three partition columns (of type int: year, month, and day). The one-level partitioning scheme creates fewer total partitions, even though the number of leaf-level partitions remains the same. A smaller number of partitions means less logical metadata about these partitions needs to be stored in the metastore, which may lead to a small performance gain when you're querying the data set using tools that access metadata, like Hive, HCatalog, and Impala.
However, the one-level partitioning scheme offers less flexibility when querying the data sets compared to the three-level partitioning scheme. For example, if you were doing yearly analysis on a data set with the three-level partitioning scheme, you could simply use WHERE year=2014 instead of WHERE dt LIKE '2014-%'. At the end of the day, both are reasonable partitioning schemes and are used about equally often. Choosing one over the other is likely more a matter of preference and style. In this chapter, we will use the three-level partitioning scheme. Figures 8-2 and 8-3 show the three-level and one-level partitioning schemes, respectively.

Figure 8-2. Three-level partitioning scheme

Figure 8-3. One-level partitioning scheme

For a more complete discussion of schema design in HDFS and Hive, please refer to Programming Hive.

Ingestion

As we discussed in Chapter 2, we have several options for getting data into Hadoop. Let's evaluate these options and see how they fit with the requirements of our architecture:

File transfer
This option is appropriate for one-time transfers of files, but not for reliable ingestion of large volumes of clickstream data.

Sqoop
Sqoop is an excellent tool for moving data to and from external data stores such as a relational database management system, but it's clearly not a fit for ingesting log data.

Kafka
As we discussed in Chapter 2, Kafka's architecture makes it an excellent solution for reliably moving large volumes of log data from our web servers to various consumers, including HDFS. We'll consider Kafka a strong contender for the ingestion part of our architecture.

Flume
Like Kafka, Flume is also an excellent choice for reliably moving large volumes of event data such as logs into HDFS.

So our top contenders for ingesting our logs are Kafka and Flume. Both provide the reliable, scalable ingestion of log data we require. Because our application only requires ingesting the log data into HDFS, we'll select Flume: it's purpose-built for HDFS, and its built-in components tailored for HDFS mean that we won't have to do any custom development in order to build our Flume pipeline. Flume also supports interceptors—the ability to perform small transformations of the data, such as filtering fake clicks from search crawlers, while the data is being ingested and before it is written to Hadoop. If we needed to build a more general-purpose ingestion pipeline supporting multiple data sinks in addition to HDFS, then we'd likely want to consider Kafka.

Now that we've selected our tool, let's discuss our options for getting data into our Flume pipeline:

• If we're working with a web application written in Java that uses Log4j for logging, we can use the Flume Log4j appender to send data to a Flume Avro source. This is one of the simplest ways to send events to Flume, and requires just the addition of a single file to the application classpath (flume-ng-sdk-.jar) and a few modifications to the Log4j properties. Unfortunately, in this case we're working with a third-party application where we may not have access to modify the code, or it may be written in a language other than Java, so the Log4j appender won't be suitable.
• In cases such as this, in which the Log4j appender isn't suitable, there are other good options available. You can use the Avro or Thrift sources to send Avro or Thrift messages to Flume. Flume can also pull messages from a JMS queue. Another option is to send JSON messages to the HTTP source.
The choice of integration point depends on the frameworks you are using in your application. Having multiple integration options allows you to use something that works well with your existing infrastructure. For example, if you are already sending events to a JMS queue, it makes sense to integrate Flume with this queue. If you are not already using a JMS queue, there is no need to introduce one just for Flume integration, and you can choose a more convenient option. Again, none of these will be appropriate choices for ingesting logs from disk.

• Flume provides a syslog source, which can read events from a syslog stream and convert them into Flume events. Syslog is a standard tool for capturing and moving system logs. The Apache HTTP server has some support for sending log output to syslog, but currently only for error logs. We'd need to implement a workaround for the access logs, such as piping them to syslog.
• In our case, because we're working with a third-party application and don't have the flexibility to add clickstream instrumentation into the application itself, and we need to work with logfiles already written to disk, we'll look at Flume's spooling directory source. This source will read files from a directory and turn every line into a Flume event. Note that some examples found on the Web show how to use the exec source to tail a logfile. This is highly discouraged because it is not a reliable solution: tail does not track the last place it read, and if the Flume agent crashes it will either read duplicate records or miss them, both of which are, of course, undesirable. By contrast, the spooling directory source will only read complete files, so in case of failure it will retry the entire file, and it marks files that have been successfully read so they will not be ingested twice. This makes for a much more reliable solution for ingesting data from files.

After we've selected the Flume source to get events into our Flume pipeline, we need to consider the appropriate Flume channels throughout our tiers. As we discussed in Chapter 2, the memory channel is the best choice when performance is more important than reliability, and the file channel is recommended when reliability is more important. In this example, reliability is very important: if the memory channel is losing clicks, it is more likely to do so during times of peak activity, leading to potential data loss when your site is under heavy load. We'll therefore use the file channel in all of our Flume tiers.

The choice of the terminal sink is pretty obvious: we want to store the data in HDFS, and will therefore use the HDFS sink. In particular, we will store the data in text format for reasons that will become clear in the processing section. Data will be partitioned by timestamp within our HDFS directories; we'll discuss this in more detail when we dig into the ingestion architecture in a moment. This partitioning by date and time will allow us to minimize disk I/O during processing, making our data processing jobs more efficient.

Now that we've made these architectural decisions for our ingestion layer, let's dig deeper into the architecture of our Flume pipeline. We'll start with a higher-level view, and then we'll drill down into the configuration details of the individual tiers in our ingestion pipeline. Figure 8-4 shows the high-level view of our ingestion workflow. You'll note that we're using the fan-in pattern (as discussed in Chapter 2) in this flow.
Figure 8-4. High-level view of our Flume ingestion architecture

In this diagram, you'll see that we have three primary tiers in our pipeline:

Client tier
In this example, our client tier is composed of the web servers generating the logs that need to be ingested into HDFS so we can use them for the required analysis. An important thing to note here is that each of our web servers hosts a Flume agent, which is responsible for taking the log events generated by the web server running on that host and sending them to the collector tier.

Collector tier
These hosts aggregate the events coming from the client tier and pass them on to the final destination, in this case HDFS. In this example, we're assuming the collector agents are running on cluster edge nodes—these are nodes within the cluster network that have a Hadoop client configuration, so they can communicate with the Hadoop cluster in order to submit Hadoop commands, write data to HDFS, and so on.

HDFS
This is our terminal destination. The Flume agents on the collector tier will be responsible for persisting the events to files in HDFS. As part of this persistence, the Flume agents are configured to ensure the files are partitioned and named appropriately on disk.

In this configuration, our Flume agents run on the web server and collector tiers—note that we don't run a Flume agent on HDFS; instead, the agents on the collector tier leverage the Hadoop client to write the events to HDFS.

As we talked about in Chapter 2, this fan-in architecture provides several benefits:

• It allows us to control the number of Flume agents connecting to the cluster. Allowing the web server agents to connect directly to the cluster may not present an issue if we only have a handful of servers, but if we have hundreds or even thousands of servers connecting to our cluster, this could cause resource issues on the cluster.
• It potentially allows us to take advantage of greater disk space on the collector nodes to buffer events, and to preserve possibly scarce disk space on our web servers.
• Multiple collector agents allow us both to spread the load and to provide for failover in the event that one or more collector agents goes down.

We'll get into more detail on the tiers in our Flume pipeline shortly.

The Client Tier

Figure 8-5 shows the details of our client-tier Flume agents.

Figure 8-5. Client-tier detail

The following list describes the individual components in the client agents:

Source data
These are the incoming events to our Flume pipeline, in this case Apache log records. As mentioned, for the purposes of this example we're using the spooling directory source, so our source data will be pulled from a specific directory on each web server host.

Flume source
This is the spooling directory source, which will read files for ingestion from a directory on disk, convert them into events, and then rename the logs when processing is completed.

Timestamp interceptor
This interceptor simply inserts a timestamp into the header of each of our Flume events. We'll use this timestamp downstream in the HDFS sink in order to partition the resulting files in HDFS by date.

Channel
We use a file channel here for maximum durability.

Avro sinks
The Avro sinks, in combination with Avro sources, provide the serialization mechanism to transmit our events between Flume tiers.
The important thing to note here is that we're configuring multiple sinks to provide the failover functionality we spoke about earlier, using a load-balancing sink group processor to spread the load across all available sinks.

Our choice of the Flume Avro source has little to do with the format in which the data will be stored on HDFS via the HDFS sink. As mentioned in Chapter 1, Avro is a serialization format that can be used when we're transferring data from one process to another or when storing data on a filesystem, say HDFS. In this case, the Avro sinks and sources are just acting as a serialization mechanism to move our event data over the wire. The format of the click data stored in HDFS will be determined by the terminal Flume sink.

The following shows an example Flume configuration file for the client Flume agents. This configuration would be deployed on all of our web servers:

    # Define spooling directory source
    client.sources=r1
    client.sources.r1.channels=ch1
    client.sources.r1.type=spooldir
    client.sources.r1.spoolDir=/opt/weblogs

    # Use the Timestamp interceptor to add a timestamp to all event headers
    client.sources.r1.interceptors.i1.type=timestamp
    client.sources.r1.interceptors=i1

    # Define a file channel
    client.channels=ch1
    client.channels.ch1.type=FILE

    # Define two Avro sinks
    client.sinks=k1 k2
    client.sinks.k1.type=avro
    client.sinks.k1.hostname=collector1.hadoopapplicationarch.com
    # Compress data before sending across the wire
    client.sinks.k1.compression-type=deflate
    client.sinks.k1.port=4141
    client.sinks.k2.type=avro
    client.sinks.k2.hostname=collector2.hadoopapplicationarch.com
    client.sinks.k2.port=4141
    client.sinks.k2.compression-type=deflate
    client.sinks.k1.channel=ch1
    client.sinks.k2.channel=ch1

    # Define a load balancing sink group to spread load over multiple collectors
    client.sinkgroups=g1
    client.sinkgroups.g1.sinks=k1 k2
    client.sinkgroups.g1.processor.type=load_balance
    client.sinkgroups.g1.processor.selector=round_robin
    client.sinkgroups.g1.processor.backoff=true

The Collector Tier

Figure 8-6 shows the details of our collector-tier Flume agents.

Figure 8-6. Collector tier detail

Following are descriptions of the components in our collector-tier agents:

Avro source
As we noted in the discussion of the Avro sink, this provides the other end of our serialization hop from the client tier to the collector tier.

Channel
Here again, we use a file channel to ensure the reliability of event delivery. To further increase reliability, note that we take advantage of multiple disks on the edge nodes, as discussed in Chapter 2.

HDFS sink
This is the end of the line for our Flume pipeline; this is where we take the log events and persist them to HDFS. Some things to note in the configuration of the HDFS sink are the use of the hdfs.path, hdfs.filePrefix, and hdfs.fileSuffix parameters to partition the resulting files by date, with a defined filename. With the parameters shown in the upcoming configuration, our resulting files will be written to a path that looks like /weblogs/access/YEAR/MONTH/DAY/access.EPOCH_TIMESTAMP.log. Also, note that we're writing the files as plain text using the hdfs.fileType=DataStream and hdfs.writeFormat=Text parameters to the HDFS sink.

The following shows what our configuration file for the collector agents looks like.
In our architecture, this configuration will be deployed to each of the edge nodes that are part of the collector tier:

    # Define an Avro source
    collector.sources=r1
    collector.sources.r1.type=avro
    collector.sources.r1.bind=0.0.0.0
    collector.sources.r1.port=4141
    collector.sources.r1.channels=ch1
    # Decompress the incoming data
    collector.sources.r1.compression-type=deflate

    # Define a file channel using multiple disks for reliability
    collector.channels=ch1
    collector.channels.ch1.type=FILE
    collector.channels.ch1.checkpointDir=/opt/flume/ch1/cp1,/opt/flume/ch1/cp2
    collector.channels.ch1.dataDirs=/opt/flume/ch1/data1,/opt/flume/ch1/data2

    # Define HDFS sinks to persist events to disk as text.
    # Note the use of multiple sinks to spread the load.
    collector.sinks=k1 k2
    collector.sinks.k1.type=hdfs
    collector.sinks.k1.channel=ch1
    # Partition files by date
    collector.sinks.k1.hdfs.path=/weblogs/combined/%Y/%m/%d
    collector.sinks.k1.hdfs.filePrefix=combined
    collector.sinks.k1.hdfs.fileSuffix=.log
    collector.sinks.k1.hdfs.fileType=DataStream
    collector.sinks.k1.hdfs.writeFormat=Text
    collector.sinks.k1.hdfs.batchSize=10000
    # Roll HDFS files every 10000 events or 30 seconds
    collector.sinks.k1.hdfs.rollCount=10000
    collector.sinks.k1.hdfs.rollSize=0
    collector.sinks.k1.hdfs.rollInterval=30

    collector.sinks.k2.type=hdfs
    collector.sinks.k2.channel=ch1
    # Partition files by date
    collector.sinks.k2.hdfs.path=/weblogs/combined/%Y/%m/%d
    collector.sinks.k2.hdfs.filePrefix=combined
    collector.sinks.k2.hdfs.fileSuffix=.log
    collector.sinks.k2.hdfs.fileType=DataStream
    collector.sinks.k2.hdfs.writeFormat=Text
    collector.sinks.k2.hdfs.batchSize=10000
    collector.sinks.k2.hdfs.rollCount=10000
    collector.sinks.k2.hdfs.rollSize=0
    collector.sinks.k2.hdfs.rollInterval=30

    collector.sinkgroups=g1
    collector.sinkgroups.g1.sinks=k1 k2
    collector.sinkgroups.g1.processor.type=load_balance
    collector.sinkgroups.g1.processor.selector=round_robin
    collector.sinkgroups.g1.processor.backoff=true

Before wrapping up the discussion of data ingestion, let's talk about ingesting secondary data into Hadoop. If you are joining click data with data from CRM, ODS, or similar systems, you will need to ingest data from secondary data sources into Hadoop. While the exact choice of ingestion method will depend on what these data sources look like in your organization, in this chapter we will assume they are traditional relational databases. As discussed in Chapter 2, Sqoop is the best choice for transferring data from relational databases into Hadoop. CRM and ODS data sets are very small compared to the click data and don't grow (or change) as fast as click data does. This makes them ideal candidates for a batch Sqoop job, run say once or several times a day, to import the data from the CRM and ODS databases into Hadoop. If these data sets are small enough, it may make sense to simply delete and re-create them on every Sqoop job. However, if they're big enough, an incremental Sqoop import would make more sense.

Processing

Figure 8-7 shows the processing section of the architecture.

Figure 8-7. Processing design

As explained in the previous section, the click data is ingested into HDFS via Flume. However, this is raw data from the web servers that needs to be cleaned up. For example, we may need to remove incomplete and invalid log lines. Furthermore, some log lines may have been duplicated, and you will have to deduplicate them.
Then, we may also need to sessionize this data (i.e., assign unique session IDs to each of the clicks). We may also decide to do some further preaggregations or pre-analysis on the data. This may include doing daily or hourly aggregates/roll-ups on the clicks for faster querying later on, performing marketing ROI or attribution analysis by joining the data with your marketing spend data, or joining click data with your CRM or other data sources so you can analyze your website activity based on additional attributes from those data sources. Essentially, any processing that you don't want done on the fly when running your interactive BI queries falls into this category of preprocessing. Finally, we want the processed data to be in a format that can be queried in a performant manner.

Therefore, to summarize, we have the following four goals for the processing pipeline:

1. Sanitize and clean up the raw data.
2. Extract the interesting data from the raw clickstream events.
3. Transform the extracted data to create processed data set(s). In our example, we will create a sessionized data set.
4. Load (i.e., store) the results in a data format in Hadoop that supports high-performance querying.

As you might have noticed, barring the preliminary sanitization step, the other three steps represent the extract, transform, and load steps of an ETL pipeline, illustrating Hadoop's use as an ETL tool replacement.

For the sanitization step, first we need to remove incomplete and invalid log lines. This can be simple logic written either in MapReduce or with tools like Hive or Pig to ensure that each record (i.e., log line) has all the fields and, optionally, to do some quick validation on a few or all of the fields. This is also where you could add logic to ignore any clicks that you believe are from spam bots by checking for particular IP addresses or referrers. Then, we also need logic for deduplicating the log lines. This is because we are using Flume's file channel for ingesting logs. In the rare event of a Flume agent crash, Flume guarantees that all the log records being ingested will end up in Hadoop; however, there is no guarantee that each of these records will be stored exactly once. In the case of a Flume agent crash, you may end up with some of the log records duplicated, hence the need for deduplication. You will learn more about deduplication in the section "Data Deduplication" later in this chapter.

Hive, Pig, Spark, and MapReduce are all candidate tools for deduplicating data. Because both Hive and Pig are higher-level languages, they allow for a simpler and more readable implementation than MapReduce. Therefore, we recommend using Hive or Pig for implementing deduplication; both are reasonable choices. The main considerations for choosing between the two will be the developer skill set, fit with other technologies used by the team, or any other specific project requirements. Regardless of the language, we recommend writing the deduplicated data into a temporary table and then moving it to a final table, to avoid clobbering the original data set.

For the extract step, we will extract the timestamp (in seconds since epoch), the referrer address, the user agent string (containing the browser and operating system version), the IP address, the language, and the URL. We chose these fields because we think they are sufficient for the analysis presented by the questions earlier in the chapter.
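As a rough sketch of what this extraction can look like in Python (an illustration only, not the implementation used in this design; the field names, regular expression, and helper function are our own assumptions), the fields can be pulled out of a combined-log-format line like so:

    import re

    # Combined log format, e.g.:
    # 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
    #   "http://www.example.com/start.html" "Mozilla/4.08 en (Win98; I ;Nav)"
    COMBINED_LOG = re.compile(
        r'(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<ts>[^\]]+)\] '
        r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<size>\S+) '
        r'"(?P<referrer>[^"]*)" "(?P<user_agent>[^"]*)"'
    )

    def extract_fields(line):
        """Return a dict of the fields we care about, or None for malformed lines."""
        match = COMBINED_LOG.match(line)
        if match is None:
            return None  # a candidate for the "invalid log line" filter described above
        fields = match.groupdict()
        # The requested URL is the middle token of the request, e.g. "GET /foo HTTP/1.0".
        request_parts = fields["request"].split()
        fields["url"] = request_parts[1] if len(request_parts) > 1 else None
        return fields

Lines that fail to match are exactly the incomplete or invalid records the sanitization step is meant to drop; the language, if needed, can be parsed further out of the user agent string.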
You may decide to simply extract all fields without filtering any of them, if your subsequent analysis may use all of them.

For the transform step, we will perform sessionization of the clicks and create a new sessionized data set. You will learn more about sessionization in the section "Sessionization" later in this chapter.

For secondary data like ODS or CRM data, you may not need to sanitize it, since data coming from relational databases has already been cleaned before it was ingested there. Consequently, you may be able to ingest it directly into a queryable location on HDFS. The directory structure could look like:

• /data/bikeshop/ods: data from ODS
• /data/bikeshop/crm: data from CRM

Also, there is typically no need to partition the secondary data, since it's usually much smaller in size.

Now that you have an overview of all the processing steps required, let's look in detail at how we will implement two of those steps: deduplication and sessionization.

Data Deduplication

As explained, in the case of a Flume agent crash you may see some of the log records duplicated. Hive, Pig, and MapReduce are good candidates for deduplicating data. As we've mentioned, because both Hive and Pig are higher-level languages, they allow for a simpler and more readable implementation than MapReduce, and are therefore our tools of choice. Hive requires creating an external table for the source raw data and another table for storing the deduplicated data on which further processing will be done. Pig, on the other hand, doesn't require creating extra tables for writing the deduplicated data. Again, the choice between Hive and Pig will be determined by the developer skill set, fit with other technologies used by the team, or any other specific project requirements.

Deduplication in Hive

Here is some example code performing deduplication of clicks in Hive and inserting the results into a daily partition:

    INSERT INTO table deduped_log
    SELECT
      ip, ts, url, referrer, user_agent,
      YEAR(FROM_UNIXTIME(unix_ts)) year,
      MONTH(FROM_UNIXTIME(unix_ts)) month
    FROM (
      SELECT
        ip, ts, url, referrer, user_agent,
        UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss') unix_ts
      FROM raw_log
      WHERE year=YEAR AND month=MONTH AND day=DAY
      GROUP BY ip, ts, url, referrer, user_agent,
               UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss')
    ) t;

Other than extracting the month and day information for the partitioning, notice that the only operation we do here is to group the data set by all of its columns. If two rows are duplicates, they will be identical in all of their columns, and after the GROUP BY operation we will be left with a single copy.

Deduplication in Pig

Deduplicating in Pig is very straightforward and requires little explanation:

    rawlogs = LOAD 'raw_log_dir';
    dedupedlogs = DISTINCT rawlogs;
    STORE dedupedlogs INTO 'deduped_log_dir' USING PigStorage();

Sessionization

An important part of clickstream analysis is to group the clicks made by a single user in a single visit by associating a session ID that represents that visit with each click made as part of the visit. This process is known as sessionization. You can then analyze these sessions to figure out what people are doing on your website during a visit: are they buying things, and how are their visits being initiated (e.g., organic search, paid search, links on partner websites)? Some tools and marketing analysts use the terms session and visit interchangeably.
Sessionization can be done by the web server itself, in which case it assigns a session ID to each of the clicks, indicating which session a click is a part of. However, if this session ID is not reliable, or if you want to use your own custom logic to sessionize clicks, you can write your own sessionization algorithm. In order to perform sessionization, you have to figure out two things:

• Given a list of clicks, determine which clicks came from the same user. It is tough to gauge which clicks came from the same user just by looking at the information in the clicks. Some websites drop cookies on their users' browsers when they visit the website (or an affiliate site) for the first time. These cookies are logged in the web server logs, which can then be used to identify clicks from the same user. Keep in mind that cookies are not 100% reliable, since users may delete their cookies often, even in the middle of a visit. Alternatively, or if cookies are not present, IP addresses can be used to identify clicks that came from the same user. IP addresses, like cookies, are not 100% reliable either, because many companies use network address translation to share IPs between multiple workstations, and therefore multiple users may be treated as one user. Your application may take additional parameters, such as the user agent and language, into account to differentiate between such users. In our design, we simply use the IP address to identify clicks from the same user.
• Given a particular user's clicks, determine whether a given click is part of a new visit to the website or a continuation of the previous visit. If a user has three consecutive clicks within five minutes of each other, it's reasonable to say that the clicks were all part of the same browsing session. However, if the first two clicks were within five minutes of each other while the third one came an hour later, it is reasonable to assume that the third click was part of a separate visit from the user, and hence a separate session. Most marketing analytics tools consider a click to be part of a new session if more than 30 minutes have elapsed since the last click from the same user. Also, in our implementation, as is the case with many marketing analytics tools, we treat a click as the start of a new session when more than 30 minutes have elapsed since the same user's previous click.
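To make this concrete, here is a minimal PySpark sketch of IP-based sessionization using the 30-minute rule. The design does use Spark for processing, but the column names, input path, and window-based approach below are illustrative assumptions rather than the chapter's actual implementation:

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("sessionize-clicks").getOrCreate()

    # One row per deduplicated click, with at least an `ip` column and a `ts` column
    # holding the click time in seconds since epoch (names and path are assumptions).
    clicks = spark.read.parquet("/data/bikeshop/clickstream_deduped")

    w = Window.partitionBy("ip").orderBy("ts")
    session_timeout = 30 * 60  # 30 minutes, expressed in seconds

    sessionized = (
        clicks
        # seconds since the same IP's previous click (null for that IP's first click)
        .withColumn("gap", F.col("ts") - F.lag("ts").over(w))
        # a click starts a new session if it is the first one or the gap exceeds the timeout
        .withColumn("is_new_session",
                    F.when(F.col("gap").isNull() | (F.col("gap") > session_timeout), 1)
                     .otherwise(0))
        # a running count of session starts per IP yields a per-user session index...
        .withColumn("session_idx", F.sum("is_new_session").over(w))
        # ...which, combined with the IP, gives a session ID for every click
        .withColumn("session_id",
                    F.concat_ws("_", F.col("ip"), F.col("session_idx").cast("string")))
        .drop("gap", "is_new_session")
    )

Because the window is ordered by timestamp within each IP, every click after a session boundary inherits the incremented session index, so all clicks in one visit share the same session ID.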