How to use Data Streaming for Big Data

Streaming Data

In the previous chapter, we focused on a long-term processing job, which runs in a Hadoop cluster and leverages YARN or Hive. In this chapter, I would like to introduce you to what I call the 2014 way of processing data: streaming data. Indeed, more and more data processing infrastructures rely on streaming or logging architectures that ingest the data, apply transformations, and then transport the data to a data persistency layer.

This chapter will focus on three key technologies: Kafka, Spark, and the ELK stack from Elastic. We will work on combining them to implement different kinds of logging architecture that can be used depending on the needs.

Streaming Architecture

In this section, we'll see how the streaming part of the architecture is structured and also what technology we can use for data ingestion.

Architecture Diagram

First, before going through the architecture details, let's discuss the word streaming, which can be misunderstood in the context of this chapter. By streaming architecture, I mean here an architecture that is capable of ingesting data as it arrives. The data can be generated by different providers: application log files such as Apache HTTP server access logs, your REST API logs, a message broker technology such as Apache Kafka, or even a TCP endpoint directly.

Generally, the data is ingested and then transported to a buffer layer, most of the time based on a message broker. We call this part the shipping part of the logging architecture. The buffer part is often used to ensure that no data will be lost and to decouple the architecture between the shipping part and the processing part. The message broker should be distributed and should guarantee the persistence of data whenever data is streamed into the architecture.

After buffering, two kinds of processing architecture can be set up:

• A processing pipeline that aims to transform, format, validate, and enrich the data before persisting it. Most of the time the technology that handles the shipping also handles this part. Some technologies cover the shipping, buffering, and processing parts all at once.
• A processing pipeline based on Hadoop technology that provides extra features for the processing, such as machine learning capabilities.

Figure 4-1 gives a graphical representation of what we just discussed.

Figure 4-1. Anatomy of a typical data ingesting architecture

Technologies

Let's now put some names on those high-level blocks. For the ingestion part, the most used technologies might be Apache Flume and Logstash from Elastic. Those are the two preferred technologies for ingesting stream data from multiple varieties of data sources, processing it, and then indexing it. Flume's architecture is scalable and relies on the three-tier structure shown in Figure 4-2.

Figure 4-2. Flume architecture

The Flume source receives data from an external source and stores it in one or more Flume channels. Once in a channel, the data is stored and kept until the sink properly drains it. The sink writes the message to the target store. Most of the time Flume is used in Hadoop architectures because of the simplicity of writing to HDFS, for example. Flume can guarantee data delivery from a bunch of data sources such as Avro, Thrift, Syslog, or another Flume agent, to HDFS, log files, Avro, or directly to HBase and even Elasticsearch. Its horizontal scalability ensures fault tolerance and high throughput.
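To make the source/channel/sink triplet concrete, here is a minimal sketch of a Flume agent configuration; the agent name, file paths, and HDFS location are illustrative assumptions, not values from this book's project:

# flume-agent.properties — a minimal sketch; names and paths are illustrative
agent1.sources = access-logs
agent1.channels = mem-channel
agent1.sinks = hdfs-sink

# Source: tail the Apache access log
agent1.sources.access-logs.type = exec
agent1.sources.access-logs.command = tail -F /var/log/apache2/access.log
agent1.sources.access-logs.channels = mem-channel

# Channel: buffer events in memory until the sink drains them
agent1.channels.mem-channel.type = memory
agent1.channels.mem-channel.capacity = 10000

# Sink: write the events to HDFS
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://namenode:8020/flume/access-logs/
agent1.sinks.hdfs-sink.channel = mem-channel

An agent started with this file (for example, with flume-ng agent --name agent1 --conf-file flume-agent.properties) would continuously drain the access log into HDFS.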
On the other side, Logstash is totally different from a structural point of view. Logstash is a single-block agent that receives, forwards, and forgets the log. By forget, I don't mean that Logstash doesn't keep track of what it just consumed (a file, for example) but that it doesn't come with a buffer layer used to persist the data in order to provide resiliency out of the box. Logstash is written in JRuby and runs within a JVM. To prevent the downsides of using a JVM for transporting logs, Logstash comes with Logstash-forwarder, a specific agent used to receive and forward the logs to another program. The typical usage of Logstash relies on the architecture shown in Figure 4-3.

Figure 4-3. Logstash logging platform typical architecture

As we can see, the architecture is divided following the structure discussed earlier, with the following specificities:

• The first part is composed of Logstash-forwarders installed next to the log sources, which only receive the data and send it to a group of Logstash instances responsible for processing the logs before putting them into a message broker.
• The second part is based on a message broker; the recommended ones are RabbitMQ, Kafka, and Redis. For the sake of simplicity, and because it would take many pages to go through each and every possible combination, we'll focus on Apache Kafka, which tends to have the best traction these days.
• The third part is based on the Logstash processors group, which can validate events before indexing them into Elasticsearch.

In Logstash, a processing pipeline is a declarative, configuration-oriented file composed of inputs, filters, and outputs.

Logstash comes with a palette of inputs/outputs that lets you connect to a variety of data sources such as file, syslog, imap, tcp, xmpp, and more. The full lists of supported inputs and outputs are available on the following pages:

https://www.elastic.co/guide/en/logstash/current/input-plugins.html
https://www.elastic.co/guide/en/logstash/current/output-plugins.html

Filters help process and transform events, simply by configuring a filter depending on your need. For example, if you want to convert a field of the received event to an integer, all you have to write is what's shown in Listing 4-1.

Listing 4-1. Logstash filter example

filter {
  mutate {
    convert => { "fieldname" => "integer" }
  }
}

We will examine in deeper detail an example of a processing pipeline that reads logs and processes them later in this chapter, but for now, here is the link to the list of supported filters:

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
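To sketch how several filters compose into one pipeline, the example below chains grok, date, and mutate filters; the grok pattern used here is formally introduced later in this chapter, and the field names follow the Apache combined log format:

filter {
  grok {
    # parse the raw Apache combined log line into structured fields
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    # use the request time from the log line as the event timestamp
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  mutate {
    # store the response size as a number rather than a string
    convert => { "bytes" => "integer" }
  }
}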
You might have noticed in the diagram that the proposal here is to index the data in Elasticsearch once it has been processed. Logstash supports different types of stores and has a seamless integration with Elasticsearch. Elasticsearch will play the role of the indexation engine that we'll use later on to make searches and analytics.

The Anatomy of the Ingested Data

In this part we'll discuss the structure of the data we'll ingest in the streaming platform. As mentioned, our goal is to leverage the activity of our website visitors. We'll then focus on what we call clickstream data.

Clickstream Data

I don't think I need to remind you about the key data that you can get from a website in terms of visitor activity. What we are about to describe here, clickstream data, is part of it. When you run a website, you will likely want to know how your visitors behave and how you can enhance their experience. Among the challenges for a website to grow are the following:

• Site traffic
• Unique visitors
• Conversion rate
• Pay-per-click traffic volume
• Time on site
• ...

Having those KPIs will help you answer questions including: How are my pages accessed? When? How long do my visitors stay on this or that page? Where do people prefer to spend time? How do they exit? What is the typical page-to-page navigation?

Being aware of what is happening on your website is fundamental and, fortunately, our architecture is made to generate data in log files that will help us in this way. Typically, clickstream data is based on the web access logs generated by servers such as Apache HTTP Server or NGINX. Depending on your logging configuration, every time a user accesses your website and navigates through pages, a line is written in the access logs for each of these actions. Figure 4-4 gives an idea of the dimensions carried by a clickstream data line.

Figure 4-4. Anatomy of a Clickstream data

The clickstream data will give, directly or indirectly:

• Dates, such as the timestamp when the event occurred and the time spent by the visitor
• The user agent used to browse the website, its version, and the device
• The session attached to the visitor, which will help to correlate the different lines of data: the session start time, end time, and duration
• The page: what the visitor browsed, what type of request was sent to the server, the domain/subdomain
• Information about visitor IP addresses, which indirectly gives information about geolocation
• Depending on the type of information we get, it can be feasible to identify and map the customers to the clickstream data

The Raw Data

In this book, we'll work with Apache HTTP Server and use the Apache logging configuration file to produce our clickstream data. The configuration is quite simple and uses a specific syntax that makes the information we need appear in the log line. Listing 4-2 is a typical access log line generated by Apache HTTP Server.

Listing 4-2. Access log line example

10.10.10.10 - - [10/July/2015:11:11:11 +0100] "GET /path/to/my/resource HTTP/1.1" 200 4392 "http://www.domain.com/referring-page" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.132 Safari/537.36" 1224 6793

This line has been produced by the Apache log configuration described in Listing 4-3.

Listing 4-3. Apache logging configuration

LogLevel error
ErrorLog /home/example/logs/error.log
LogFormat "%h %l %u %t \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio
CustomLog /home/example/logs/access.log combinedio

The LogFormat line gives the structure of the generated log line:

• 10.10.10.10 (%h): IP address attached to the request
• [10/July/2015:11:11:11 +0100] (%t): reception request time. The format is [day/month/year:hour:minute:second zone]
• GET /path/to/my/resource HTTP/1.1 (\"%r\"): first line of the request, which contains useful information such as the request type, the path, and the protocol
• 200 (%s): the HTTP return code
• 4392 (%b): response size returned to the client without the HTTP headers, in bytes
• %{Referer}i: the Referer HTTP request header
• %{User-Agent}i: the User-Agent HTTP request header
• %I: bytes received, including HTTP headers
• %O: bytes sent, including headers

As you can see, the data is structured by Apache into the different corresponding parts but, at the end of the day, the application that receives the logs will consider this data unstructured, because parsing it with a third-party application is not that obvious. You then need a technology that can consume these lines as they are appended to the access log file (or just ship the file), and that is capable of transforming this data into structured documents for later analysis and search. That's where Logstash comes into play; we'll now set up our streaming architecture and start shipping and ingesting data.

The Log Generator

There might be some public datasets that we could use for testing our streaming architecture but, for the sake of simplicity, I'd rather implement a small log generator to create our access logs. I've chosen Python for implementing that generator, as shown in Listing 4-4.

Listing 4-4. Access log code generator

#!/usr/bin/python
import time
import datetime
import random
import sys

timestr = time.strftime("%Y%m%d-%H%M%S")
f = open('../source/access_log_'+timestr+'.log','w')

# ips
with open('ips.txt') as ips_file:
    ips = ips_file.read().splitlines()
# referers
with open('referers.txt') as referers_file:
    referers = referers_file.read().splitlines()
# resources
with open('resources.txt') as resources_file:
    resources = resources_file.read().splitlines()
# user agents
with open('user_agents.txt') as user_agents_file:
    useragents = user_agents_file.read().splitlines()
# codes
with open('codes.txt') as codes_file:
    codes = codes_file.read().splitlines()
# requests
with open('requests.txt') as requests_file:
    requests = requests_file.read().splitlines()

event_time = datetime.datetime(2013,10,10)

# the number of lines to generate is passed as the first script argument
for i in xrange(0,int(sys.argv[1])):
    increment = datetime.timedelta(seconds=random.randint(30,300))
    event_time += increment
    uri = random.choice(resources)
    if uri.find("Store") > 0:
        uri += str(random.randint(1000,1500))
    ip = random.choice(ips)
    useragent = random.choice(useragents)
    referer = random.choice(referers)
    code = random.choice(codes)
    request = random.choice(requests)
    f.write('%s - - [%s] "%s %s HTTP/1.0" %s %s "%s" "%s"\n'
        % (ip, event_time.strftime('%d/%b/%Y:%H:%M:%S %z'), request, uri,
           code, random.randint(2000,5000), referer, useragent))

This generator relies on different text files and generates as many lines in ../source/access_log_<timestamp>.log as specified when you launch the script, as in Listing 4-5.

Listing 4-5. Launching the generator

./generator.py 50000

The previous line will generate 50,000 lines of access logs. You can easily customize the content used to generate the file by updating the text files. Note that all assets used in Listing 4-4 can be found on the following Github repo:

https://github.com/bahaaldine/scalable-big-data-architecture/tree/master/chapter4/generator
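Before wiring Logstash in, it's worth seeing what turning an unstructured line into a structured document means in practice. The following minimal Python sketch (not part of the book's project; the regular expression and field names are illustrative) parses one combined-format line, much as the grok filter will do for us later:

#!/usr/bin/python
import re

# A simplified combined-log pattern; grok's COMBINEDAPACHELOG does this job
# inside Logstash with predefined, battle-tested patterns.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[^"]+)" '
    r'(?P<response>\d+) (?P<bytes>\S+) "(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"')

line = ('10.10.10.10 - - [10/July/2015:11:11:11 +0100] '
        '"GET /path/to/my/resource HTTP/1.1" 200 4392 '
        '"http://www.domain.com/referring-page" "Mozilla/5.0"')

match = COMBINED.match(line)
if match:
    event = match.groupdict()  # a structured document, ready to be indexed
    print(event['verb'] + ' ' + event['request'] + ' -> ' + event['response'])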
Setting Up the Streaming Architecture

To set up our streaming architecture, we'll split the work into two parts: shipping and processing.

Shipping the Logs in Apache Kafka

Shipping data consists of transporting it from the machine that generates it to the one that will ingest and process it. This is what we will cover here with Logstash-forwarder. The data will then be stored in an asynchronous, reliable, persistent layer that ensures message delivery. The message broker here is Apache Kafka.

The Forwarders

In the following section, we will first install and configure the forwarder, specifically the security part, which is ensured by Lumberjack, the secured transport protocol.

Installation

Logstash-forwarder is written in Go and can be installed either from an RPM or directly from the sources. I'll use the second way and follow these steps:

• Install Go from this URL: http://golang.org/doc/install
• Fetch the source from the git repository:
git clone git://github.com/elasticsearch/logstash-forwarder.git
• And finally compile it:
cd logstash-forwarder
go build -o logstash-forwarder

Logstash installation is straightforward. You just need to download the appropriate package from the following page:

https://www.elastic.co/downloads/logstash

Lumberjack: The Secured Protocol

The forwarder will be installed next to the data sources and will transport and send the logs to the Logstash processors. It uses the Lumberjack protocol, which is secured and requires a valid private key (.key file) and an SSL certificate (.crt file). Both the Logstash-forwarder and the Logstash server will use these assets to secure the data transport. To generate a valid KEY and CRT file, run the command described in Listing 4-6.

Listing 4-6. Key and certificate command line

openssl req -x509 -batch -nodes -newkey rsa:2048 -keyout lumberjack.key -out lumberjack.crt -subj /CN=localhost

This command will generate lumberjack.key and lumberjack.crt. An important thing to mention here concerns the certificate: the subject used to generate the certificate should be the same as the one that will be used in the Logstash-forwarder configuration file. Here we generate the certificate with CN=localhost, so in the forwarder file we'll need to use localhost as well. More information about this can be found on this page:

https://github.com/elastic/logstash-forwarder#important-tlsssl-certificate-notes

The Configuration Files

The Logstash-forwarder configuration is composed of a network and a files subconfiguration; Listing 4-7 shows the configuration that I'm using for the example project.

Listing 4-7. Logstash-forwarder configuration file, which can be found at: https://github.com/bahaaldine/scalable-big-data-architecture/blob/master/chapter4/logstash/forwarder/forwarder.json

{
  "network": {
    "servers": [ "HOST:PORT" ],
    "ssl certificate": "path/to/the/crt/file",
    "ssl key": "path/to/the/key/file",
    "ssl ca": "path/to/the/crt/file",
    "timeout": 15
  },
  "files": [
    {
      "paths": [ "path/to/the/log/files" ],
      "fields": { "type": "access_log" }
    },
    {
      "paths": [ "-" ],
      "fields": { "type": "stdin" }
    }
  ]
}

Let's explain this configuration a little bit:

• The network configuration basically uses an array of target downstream servers that will receive the log files. Each item in the servers array is the location of a server, e.g., localhost:5043.
• The ssl entries point to the .key and .crt files that we have just generated.
• The last parameter is the timeout, which defines how long a Logstash-forwarder will wait for a downstream server before assuming the connection is interrupted and trying to connect to another server in the servers array.
• The files entry is an array of the data that will be transported; in my example, I'm using:
• An absolute path to the directory that will contain the example log files used for this book. The fields value sets the event type.
• A special path containing a "-", which means that Logstash-forwarder will forward the standard input content, in other words, whatever you type in the command line prompt. This is a good way to test the transport pipeline between Logstash-forwarder and the Logstash server.

There are more options for configuring the file paths, such as using wildcards, which are really practical for rotating log files. More information on the configuration can be found on the Logstash-forwarder Github repository:

https://github.com/elastic/logstash-forwarder

The Logstash processing agent also needs a configuration file. It follows the structure mentioned earlier in this chapter. Listing 4-8 shows the one that I'm using for this book.

Listing 4-8. Logstash processing pipeline example, which can be found at: https://github.com/bahaaldine/scalable-big-data-architecture/blob/master/chapter4/logstash/processor/forwarder_to_kafka.conf

input {
  lumberjack {
    port => "5043"
    ssl_certificate => "path/to/crt/file"
    ssl_key => "path/to/key/file"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "clickstream"
  }
}

Three parts are present in this configuration file:

• Input: contains the input stream that Logstash listens to in order to receive the data. Here we listen on localhost:5043 for our Logstash-forwarder and, as mentioned, we are using the .key and .crt files we generated earlier.
• Filter: takes each line contained in the received logs and transforms it into a structured event. Here we are using the grok filter, which is particularly useful when dealing with the Apache HTTP server combined log. The grok combined log pattern is a composition of multiple grok patterns. It has the declaration described in Listing 4-9.

Listing 4-9. Grok Apache common log & combined log patterns

COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

The COMBINEDAPACHELOG pattern relies on two other patterns: the COMMONAPACHELOG and QS patterns. Indeed, grok comes with a set of predefined patterns that can be reused and composed to create multilevel patterns. You can get more information on the grok filter on this page:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

And more on the predefined patterns here:

https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

• Output: the connection to our Apache Kafka server to put the data there. We also use a stdout output with a ruby codec, which is really handy for debugging the pipeline and seeing the structure of the event.

The Message Broker

Now we will configure our Apache Kafka cluster. To do so, we'll create a cluster of two brokers that expose a clickstream topic. A topic is basically the channel on which the Logstash processor will publish each event. Another program will then subscribe to the topic and consume the messages; in the case of our architecture this will be done either by Logstash or by Apache Spark.
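To illustrate that subscription model, here is a minimal consumer sketch in Python; it assumes the kafka-python client library, which is not used in the book's project, and broker addresses matching the two-broker cluster we are about to configure:

from kafka import KafkaConsumer

# subscribe to the "clickstream" topic on our two local brokers
consumer = KafkaConsumer('clickstream',
                         bootstrap_servers=['localhost:9092', 'localhost:9093'],
                         auto_offset_reset='earliest')

for message in consumer:
    # each message value is the serialized event published by Logstash
    print(message.value)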
Downloading, installing, and configuring Apache Kafka is pretty straightforward and requires the following steps:

• Start by downloading Kafka from the following page:
http://kafka.apache.org/downloads.html
• Extract it and access the config folder under the installation directory to locate the server.properties file.
• We will duplicate that file and create two separate files for our cluster of two brokers. You should have server-1.properties and server-2.properties.
• Modify the properties, as Table 4-1 shows, within the two files so there won't be any clashes between the two instances.

Table 4-1. Broker configuration properties

Property     server-1.properties    server-2.properties
broker.id    1                      2
port         9092                   9093
log.dirs     /tmp/kafka-logs-1      /tmp/kafka-logs-2

The three properties respectively represent the broker identifier, the broker listening port, and the broker logging directory. Both files can be found on the following repo:

https://github.com/bahaaldine/scalable-big-data-architecture/tree/master/chapter4/kafka

We are now ready to connect the dots and run the first part of our architecture to validate it.

Running the Magic

We'll run each block of the shipping part from downstream to upstream; thus, we'll start with Kafka. Kafka relies on Apache Zookeeper, especially for managing the cluster of brokers, so we'll first launch it from the installation directory with the command shown in Listing 4-10.

Listing 4-10. Zookeeper start command

bin/zookeeper-server-start.sh config/zookeeper.properties

This will output a certain number of log lines, but you should see something indicating that the Zookeeper server is running, as shown in Listing 4-11.

Listing 4-11. Zookeeper starting logs

...
2015-07-11 22:51:39,787 INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
...
2015-07-11 22:51:39,865 INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
...

Now we'll start both broker instances with the commands shown in Listing 4-12.

Listing 4-12. Kafka servers start commands

bin/kafka-server-start.sh path/to/server-1.properties &
bin/kafka-server-start.sh path/to/server-2.properties &

You should get the log lines for the first broker shown in Listing 4-13.

Listing 4-13. First broker startup log lines

2015-07-11 22:59:36,934 INFO Kafka Server 1, started (kafka.server.KafkaServer)
2015-07-11 22:59:37,006 INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

Listing 4-14 shows the log lines for the second broker.

Listing 4-14. Second broker startup log lines

2015-07-11 23:00:46,592 INFO Kafka Server 2, started (kafka.server.KafkaServer)

Listing 4-15 shows how to create a topic in our cluster.

Listing 4-15. Kafka create topic command

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic clickstream

Why a replication factor of 2? Because with it our cluster will be able to tolerate up to one server failure without losing any data.

To be sure that our shipping part messages are consumable downstream, we'll run a test consumer as shown in Listing 4-16.

Listing 4-16. Kafka test consumer command

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic clickstream
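Optionally, you can also check how the topic has been laid out across the brokers with the describe command; the output should list the partition leader and both broker ids as replicas:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clickstream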
Now let's run the Logstash processing agent from the Logstash agent installation directory, as shown in Listing 4-17.

Listing 4-17. Logstash processing agent start command

bin/logstash -f /path/to/forwarder_to_kafka.conf

This should have the output shown in Listing 4-18.

Listing 4-18. Logstash start logs

Logstash startup completed

Finally, the Logstash-forwarder is started as shown in Listing 4-19.

Listing 4-19. Logstash-forwarder start command

/Applications/logstash-forwarder/logstash-forwarder -config forwarder.json

This should produce the output shown in Listing 4-20.

Listing 4-20. Logstash-forwarder start logs

2015/07/12 00:00:29.085015 Connecting to ::1:5043 (localhost)
2015/07/12 00:00:29.246207 Connected to ::1

We are ready to generate an access log file and watch the events stream through our shipping pipeline. Launch the script and generate one line, as shown in Listing 4-21.

Listing 4-21. Log generator start command

./generator.py 1

Let's check the logs of the Logstash processor, as shown in Listing 4-22.

Listing 4-22. Logstash processor logs

{
    "message" => "10.10.10.15 - - [10/Oct/2013:00:03:47 +0000] \"GET /products/product3 HTTP/1.0\" 401 3627 \"http://www.amazon.com\" \"Mozilla/5.0 (Windows; U; MSIE 9.0; WIndows NT 9.0; en-US))\"",
    "@version" => "1",
    "@timestamp" => "2015-07-11T22:19:19.879Z",
    "file" => "/Users/bahaaldine/Dropbox/apress/demo/chapter4/source/access_log_20150712-001915.log",
    "host" => "MBPdeBaaaldine2",
    "offset" => "0",
    "type" => "access_log",
    "clientip" => "10.10.10.15",
    "ident" => "-",
    "auth" => "-",
    "timestamp" => "10/Oct/2013:00:03:47 +0000",
    "verb" => "GET",
    "request" => "/products/product3",
    "httpversion" => "1.0",
    "response" => "401",
    "bytes" => "3627",
    "referrer" => "\"http://www.amazon.com\"",
    "agent" => "\"Mozilla/5.0 (Windows; U; MSIE 9.0; WIndows NT 9.0; en-US))\""
}

As said earlier, the ruby codec of the stdout output is really practical for seeing the content of the streamed event. We can see in the previous log that grok has done its job: it parsed the log line and structured the event properly into the expected format. Now we should also be able to see the same message on the Kafka consumer side, but in JSON format, as shown in Listing 4-23.

Listing 4-23. Kafka consumer log file

{
    "message": "10.10.10.15 - - [10\/Oct\/2013:00:03:47 +0000] \"GET \/products\/product3 HTTP\/1.0\" 401 3627 \"http:\/\/www.amazon.com\" \"Mozilla\/5.0 (Windows; U; MSIE 9.0; WIndows NT 9.0; en-US))\"",
    "@version": "1",
    "@timestamp": "2015-07-11T22:19:19.879Z",
    "file": "\/Users\/bahaaldine\/Dropbox\/apress\/demo\/chapter4\/source\/access_log_20150712-001915.log",
    "host": "MBPdeBaaaldine2",
    "offset": "0",
    "type": "access_log",
    "clientip": "10.10.10.15",
    "ident": "-",
    "auth": "-",
    "timestamp": "10\/Oct\/2013:00:03:47 +0000",
    "verb": "GET",
    "request": "\/products\/product3",
    "httpversion": "1.0",
    "response": "401",
    "bytes": "3627",
    "referrer": "\"http:\/\/www.amazon.com\"",
    "agent": "\"Mozilla\/5.0 (Windows; U; MSIE 9.0; WIndows NT 9.0; en-US))\""
}

So far so good; we are now sure that our shipping and processing pipeline works from end to end. Let's now move to the draining side and index our data.

Draining the Logs from Apache Kafka

Draining consists of using a consumer agent that connects to Kafka and consumes the messages in the topic. This work will be done again by Logstash, but with a different configuration for the agent, as this one will index the data in Elasticsearch.

Configuring Elasticsearch

Before draining the data, we need to configure our endpoint: the Elasticsearch cluster.

Installation

Regardless of the method that we'll use to index the data in Elasticsearch, from Logstash or from Spark, we'll need to set up an Elasticsearch cluster.
In this book we’ll simulate a real-life example and work on a single node cluster, but adding more Elasticsearch node takes a matter of seconds. Download and extract Elasticsearch from the following link: https://www.elastic.co/downloads/elasticsearch After extracting, you’ll need to install a plugin that will help us to have a better a better understanding of what’s happening in our Elasticsearch, Marvel. To do so, go in the Elasticsearch extracted folder and run the following command: bin/plugin -i elasticsearch/marvel/latest It’s worth mentioning here that the command may vary between the Marvel version used in this book and the latest release. For more information, please refer to the Marvel installation documentation here: https://www.elastic.co/guide/en/marvel/current/installing-marvel.html This will install Marvel in couple of seconds. Then run Elasticsearch: bin/elasticsearch Browse the following URL to the Marvel console and go in the Shard Allocation from the top right menu: http://localhost:9200/_plugin/marvel, as shown in Figure 4-5. 72 Chapter 4 ■ Streaming Daat Figure 4-5. Accessing the Shard Allocation dashboard As you can see, Marvel is mentioning that there are two unassigned replicas in the cluster, which turn the overall status to Yellow. This is completely normal as long as we are running a single instance cluster. Elasticsearch won’t assign a replica shard to the same node where the primary shard is hosted; that wouldn’t make any sense. Remember that the replica shards provide resiliency to your cluster; in our case we would need to launch another instance of Elasticsearch but as we are in a test environment that would be too much. Let’s then change the cluster configuration by using Elasticsearch indices settings API, switch to Sense console from the right menu by clicking on Sense, you will land on the Sense console that help running API queries easily with autocompletion. 73 Chapter 4 ■ Streaming Data What we want to do here is to update the number of replicas for every shard in indeces created in our cluster to 0, and to turn the cluster state to green. Copy and paste the query in Listing 4-24 and click the run button. Listing 4-24. Indeces API to update the number of replica to 0 PUT /_settings "index" : "number_of_replicas" : 0 Sense will return an acknowledgement response as shown in Figure 4-6. Figure 4-6. Query result As you can see, the request is pretty simple to understand: it uses the settings API and send a PUT request with a body containing an index object to specify that each shard in index should have 0 replicas. If you go back now to the shard allocation view, you should have a green cluster state. Creating the Index Template We now will create a mapping for our data, and set the correct parameters for our index, document, and fields. In sense, issue the request described in Listing 4-25. Listing 4-25. 
Creating the Index Template

We will now create a mapping for our data and set the correct parameters for our index, documents, and fields. In Sense, issue the request described in Listing 4-25.

Listing 4-25. Clickstream indices template

PUT _template/clickstream
{
  "template": "clickstream-*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_fields": {
            "mapping": {
              "index": "not_analyzed",
              "omit_norms": true,
              "type": "string"
            },
            "match_mapping_type": "string",
            "match": "*"
          }
        }
      ],
      "_all": {
        "enabled": true
      },
      "properties": {
        "response": { "type": "integer" },
        "bytes": { "type": "integer" }
      }
    }
  }
}

Here we are creating a template called clickstream that will be applied to every index created in Elasticsearch whose name matches the clickstream-* pattern; we'll see the reason for this later in this chapter. It contains the main parts:

• The settings: here we define the number of shards and replicas that we will need in our cluster for the clickstream indices. This is equivalent to what we did earlier, but now packaged in a template and applied to every new clickstream index.
• The mappings: this defines how the data should be analyzed by Elasticsearch. Elasticsearch comes with a very important notion called the analyzer, which basically defines how fields should be interpreted and structured for search purposes. You can, for example, use a tokenizer and split your field content into multiple tokens. The analyzer concept is really rich and I would recommend that you read the following documentation for a better understanding of what is happening here:
https://www.elastic.co/guide/en/elasticsearch/reference/1.6/analysis-analyzers.html
• I've chosen not to analyze every string field by default, so those will basically be searchable as they are. Then I've applied a specific mapping to the response and bytes fields to tell Elasticsearch that these will be integers. Keep in mind that the mappings should be done before indexation; otherwise, you will have to reindex your entire data.

Indexing the Data Directly with Logstash

Now that Elasticsearch is configured and running, we can launch our Logstash indexer instance and drain Kafka.

The Configuration File

To store the documents created by our clickstream logging architecture, we could create a single index and store everything in it. But as our site traffic grows, the amount of data will grow, so we should consider splitting our index based on time, which makes more sense here. Consider, for example, that the more data you get, the more old data you accumulate; that old data might not be useful anymore and will likely be archived to leave more server resources to the fresher data. The good news is that this is super easy and is handled directly at the indexation level, in other words, by Logstash. Listing 4-26 is the configuration of our Logstash indexer.

Listing 4-26. Logstash indexer configuration, which can be found at: https://github.com/bahaaldine/scalable-big-data-architecture/blob/master/chapter4/logstash/indexer/kafka_to_elasticsearch.conf

input {
  kafka {
    topic_id => "clickstream"
  }
}
filter {
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "clickstream-%{+YYYY.MM.dd}"
    manage_template => false
    host => "localhost"
    protocol => "http"
  }
}

As you can see, the configuration is quite simple compared to that of the processing agent, and comes with a new output dedicated to indexing in Elasticsearch, with the following settings:

• index: the name of the index that we want to index the data into; here the index name is a pattern that generates a day-based index name (see the example after this list)
• manage_template: whether or not Logstash should manage the index template; as we have created one earlier, this is not needed
• host and protocol: these are obvious and point to our Elasticsearch cluster
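With this pattern, an event processed on July 12, 2015 would land in an index named clickstream-2015.07.12, which can then be queried, archived, or dropped independently of the other days. For instance, a Sense query against that single day might look like the following (the index name and query are illustrative):

GET clickstream-2015.07.12/_search
{
  "query": {
    "match": { "response": 404 }
  }
}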
Run and Index the Data

Let's run our Logstash indexer and see what happens in Elasticsearch. Issue the command shown in Listing 4-27.

Listing 4-27. Logstash indexer start command

bin/logstash -f /path/to/kafka_to_elasticsearch.conf

Generate a single event log file with the Python script and, as earlier, you should see the ruby debug codec outputting the event structure. Let's focus on Elasticsearch and go into the Shard Allocation dashboard to see if our clickstream index has been created, as shown in Figure 4-7.
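You can also verify from Sense that the day-based index exists; the _cat API used below is standard Elasticsearch, though the exact columns shown may vary by version:

GET _cat/indices/clickstream-*?v

The response should list the newly created clickstream index along with its document count.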
