Chapter 3: Apache Spark Streaming

The Apache Spark Streaming module is a stream processing-based module within Apache Spark. It uses the Spark cluster to offer the ability to scale to a high degree. Being based on Spark, it is also highly fault tolerant, having the ability to rerun failed tasks by checkpointing the data stream that is being processed. The following areas will be covered in this chapter after an initial section, which will provide a practical overview of how Apache Spark processes stream-based data:

•   Error recovery and checkpointing
•   TCP-based stream processing
•   File streams
•   Flume stream source
•   Kafka stream source

For each topic, I will provide a worked example in Scala, and will show how the stream-based architecture can be set up and tested.

Overview

When giving an overview of the Apache Spark Streaming module, I would advise you to check the http://spark.apache.org/ website for up-to-date information, as well as the Spark-based user groups such as user@spark.apache.org. My reason for saying this is that these are the primary places where Spark information is available. Also, the extremely fast (and increasing) pace of change means that, by the time you read this, new Spark functionality and versions will be available. So, in the light of this, when giving an overview, I will try to generalize.

[Figure: Spark Streaming data flow. Sources such as Kafka, Flume, HDFS/S3, Kinesis, and Twitter feed the Spark Streaming module, which can call on other Spark functionality such as MLlib, Spark SQL, and GraphX; the processed data can then be written to HDFS, databases, or dashboards such as Graphite/Grafana and Lucidworks Banana.]

The previous figure shows potential data sources for Apache Spark Streaming, such as Kafka, Flume, and HDFS. These feed into the Spark Streaming module, and are processed as discrete streams. The diagram also shows that other Spark module functionality, such as machine learning, can be used to process the stream-based data. The fully processed data can then be an output for HDFS, databases, or dashboards. This diagram is based on the one at the Spark Streaming website, but I wanted to extend it to express both the Spark module functionality and the dashboarding options. The previous diagram shows a metrics system feed being fed from Spark to Graphite. It is also possible to feed Solr-based data to Lucidworks Banana (a port of Kibana). It is also worth mentioning here that Databricks (see Chapter 8, Spark Databricks and Chapter 9, Databricks Visualization) can also present the Spark stream data as a dashboard.

[Figure: An original DStream broken into batches at time 1 through time 5, with a window-based operation producing a windowed DStream; windows are shown at time 1, time 3, and time 5.]

When discussing Spark discrete streams, the previous figure, again taken from the Spark website at http://spark.apache.org/, is the diagram I like to use. The green boxes in the figure show the continuous data stream sent to Spark being broken down into a discrete stream (DStream). The size of each element in the stream is then based on a batch time, which might be two seconds. It is also possible to create a window, expressed as the red box in the figure, over the DStream. For instance, when carrying out trend analysis in real time, it might be necessary to determine the top ten Twitter-based hashtags over a ten minute window.
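The following is a short, hedged sketch of that kind of windowed hashtag count, assuming a Twitter-based DStream called stream such as the one created just below; the window and slide durations, and the variable names, are illustrative only:

    // Count hashtags over a ten minute window that slides every ten seconds,
    // and print the top ten each time the window slides.
    val hashTags = stream.flatMap( status => status.getText.split(" ").filter( _.startsWith("#") ) )

    val topCounts = hashTags.map( tag => (tag, 1) )
      .reduceByKeyAndWindow( _ + _, Minutes(10), Seconds(10) )
      .map( item => item.swap )
      .transform( rdd => rdd.sortByKey(false) )

    topCounts.foreachRDD( rdd =>
      rdd.take(10).foreach( x => println("Hashtag : " + x) ) )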
So, given that Spark can be used for stream processing, how is a stream created? The following Scala-based code shows how a Twitter stream can be created. This example is simplified because Twitter authorization has not been included, but you get the idea (the full example code is in the Checkpointing section).

The Spark streaming context, called ssc, is created using the Spark context sc. A batch time is specified when it is created; in this case, five seconds. A Twitter-based DStream, called stream, is then created from the streaming context using a window of 60 seconds:

    val ssc    = new StreamingContext(sc, Seconds(5) )
    val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )

The stream processing can be started with the stream context start method (shown next), and the awaitTermination method indicates that it should process until stopped. So, if this code is embedded in a library-based application, it will run until the session is terminated, perhaps with a Ctrl + C:

    ssc.start()
    ssc.awaitTermination()

This explains what Spark Streaming is, and what it does, but it does not explain error handling, or what to do if your stream-based application fails. The next section will examine Spark Streaming error management and recovery.

Errors and recovery

Generally, the question that needs to be asked of your application is: is it critical that you receive and process all the data? If not, then on failure you might just be able to restart the application and discard the missing or lost data. If this is not the case, then you will need to use checkpointing, which will be described in the next section.

It is also worth noting that your application's error management should be robust and self-sufficient. What I mean by this is that if an exception is non-critical, then manage the exception, perhaps log it, and continue processing. For instance, when a task reaches the maximum number of failures (specified by spark.task.maxFailures), it will terminate processing.
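That failure threshold is an ordinary Spark configuration property, so it can be tuned when the Spark context is built. Here is an illustrative sketch; the value shown is an assumption for demonstration, not a recommendation:

    import org.apache.spark._

    // Raise the per-task failure threshold before creating the Spark context.
    // The Spark default for spark.task.maxFailures is 4.
    val conf = new SparkConf()
    conf.setAppName("Stream example")
    conf.set("spark.task.maxFailures", "8")

    val sc = new SparkContext(conf)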
Checkpointing

It is possible to set up an HDFS-based checkpoint directory to store Apache Spark-based streaming information. In this Scala example, data will be stored in HDFS, under /data/spark/checkpoint. The following HDFS file system ls command shows that, before starting, the directory does not exist:

    [hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
    ls: `/data/spark/checkpoint': No such file or directory

The Twitter-based Scala code sample given next starts by defining a package name for the application, and by importing Spark, streaming, context, and Twitter-based functionality. It then defines an application object named stream1:

    package nz.co.semtechsolutions

    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.streaming.StreamingContext._

    object stream1 {

Next, a method is defined called createContext, which will be used to create both the Spark and streaming contexts. It will also checkpoint the stream to the HDFS-based directory using the streaming context checkpoint method, which takes a directory path as a parameter. The directory path is the value (cpDir) that was passed into the createContext method:

      def createContext( cpDir : String ) : StreamingContext = {

        val appName = "Stream example 1"
        val conf    = new SparkConf()

        conf.setAppName(appName)

        val sc  = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(5) )

        ssc.checkpoint( cpDir )

        ssc
      }

Now, the main method is defined, as are the HDFS directory and the Twitter access authority and parameters. The Spark streaming context ssc is either retrieved or created using the HDFS checkpoint directory via the StreamingContext getOrCreate method. If the directory doesn't exist, then the previous method called createContext is called, which will create the context and the checkpoint. Obviously, I have truncated my own Twitter auth. keys in this example for security reasons:

      def main(args: Array[String]) {

        val hdfsDir = "/data/spark/checkpoint"

        val consumerKey       = "QQpxx"
        val consumerSecret    = "0HFzxx"
        val accessToken       = "323xx"
        val accessTokenSecret = "IlQxx"

        System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
        System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
        System.setProperty("twitter4j.oauth.accessToken", accessToken)
        System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

        val ssc = StreamingContext.getOrCreate(hdfsDir,
          () => { createContext( hdfsDir ) })

        val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )

        // do some processing

        ssc.start()
        ssc.awaitTermination()

      } // end main

    } // end stream1

Having run this code, which has no actual processing, the HDFS checkpoint directory can be checked again. This time it is apparent that the checkpoint directory has been created, and the data has been stored:

    [hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
    Found 1 items
    drwxr-xr-x   - hadoop supergroup   0 2015-07-02 13:41 /data/spark/checkpoint/0fc3d94e-6f53-40fb-910d-1eef044b12e9

This example, taken from the Apache Spark website, shows how checkpoint storage can be set up and used. But how often is checkpointing carried out? The metadata is stored during each stream batch. The actual data is stored with a period which is the maximum of the batch interval and ten seconds. This might not be ideal for you, so you can reset the value using this method:

    DStream.checkpoint( newRequiredInterval )

Here, newRequiredInterval is the new checkpoint interval value that you require; generally, you should aim for a value which is five to ten times your batch interval (a short sketch follows at the end of this section).

Checkpointing saves both the stream batch and the metadata (data about the data). If the application fails, then, when it restarts, the checkpointed data is used when processing is started. The batch data that was being processed at the time of failure is reprocessed, along with the batched data received since the failure. Remember to monitor the HDFS disk space being used for checkpointing.
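To make that interval advice concrete, here is a hedged sketch; it assumes the five second batch interval and checkpoint directory used in the stream1 example, and the 25 second value is simply five times that batch interval:

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._

    // Assumes an existing Spark context, sc, as in the examples above.
    val ssc = new StreamingContext(sc, Seconds(5) )
    ssc.checkpoint( "/data/spark/checkpoint" )

    val stream = TwitterUtils.createStream(ssc, None)

    // Checkpoint the DStream data roughly every 25 seconds, rather than
    // accepting the default period.
    stream.checkpoint( Seconds(25) )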
In the next section, I will begin to examine the streaming sources, and will provide some examples of each type.

Streaming sources

I will not be able to cover all the stream types with practical examples in this section, but where this chapter is too small to include code, I will at least provide a description. In this chapter, I will cover the TCP and file streams, and the Flume, Kafka, and Twitter streams. I will start with a practical TCP-based example.

This chapter examines stream processing architecture. For instance, what happens in cases where the stream data delivery rate exceeds the potential data processing rate? Systems like Kafka provide the possibility of solving this issue by providing the ability to use multiple data topics and consumers.

TCP stream

There is a possibility of using the Spark streaming context method called socketTextStream to stream data via TCP/IP, by specifying a hostname and a port number. The Scala-based code example in this section will receive data on port 10777 that was supplied using the netcat Linux command. The code sample starts by defining the package name, and importing Spark, the context, and the streaming classes. The object class named stream2 is defined, as is the main method, which takes arguments:

    package nz.co.semtechsolutions

    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    object stream2 {

      def main(args: Array[String]) {

The number of arguments passed to the class is checked to ensure that they are the hostname and the port number. A Spark configuration object is created with an application name defined. The Spark and streaming contexts are then created, with a streaming batch time of 10 seconds:

        if ( args.length < 2 ) {
          System.err.println("Usage: stream2 <host> <port>")
          System.exit(1)
        }

        val hostname = args(0).trim
        val portnum  = args(1).toInt

        val appName = "Stream example 2"
        val conf    = new SparkConf()

        conf.setAppName(appName)

        val sc  = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10) )

A DStream called rawDstream is created by calling the socketTextStream method of the streaming context using the hostname and port number parameters:

        val rawDstream = ssc.socketTextStream( hostname, portnum )

A top-ten word count is created from the raw stream data by splitting each line into words on spaces. Then a (key, value) pair is created as (word, 1), which is reduced by the key value, this being the word. So now, there is a list of words and their associated counts. The key and value are then swapped, so the list becomes (count, word). Next, a sort is done on the key, which is now the count. Finally, the top ten items in the RDD, within the DStream, are taken and printed out:

        val wordCount = rawDstream
          .flatMap(line => line.split(" "))
          .map(word => (word,1))
          .reduceByKey(_+_)
          .map(item => item.swap)
          .transform(rdd => rdd.sortByKey(false))
          .foreachRDD( rdd =>
            { rdd.take(10).foreach(x => println("List : " + x)) })

The code closes with the Spark Streaming start and awaitTermination methods being called to start the stream processing and await process termination:

        ssc.start()
        ssc.awaitTermination()

      } // end main

    } // end stream2

The data for this application is provided, as I stated previously, by the Linux netcat (nc) command. The Linux cat command dumps the contents of a log file, which is piped to nc. The -lk options force netcat to listen for connections, and to keep on listening if the connection is lost. This example shows that the port being used is 10777:

    [root@hc2nn log]# pwd
    /var/log
    [root@hc2nn log]# cat ./anaconda.storage.log | nc -lk 10777

The output from this TCP-based stream processing is shown here. The actual output is not as important as the method demonstrated. However, the data shows, as expected, a list of 10 log file words in descending count order. Note that the top word is empty because the stream was not filtered for empty words:

    List : (17104,)
    List : (2333,=)
    List : (1656,:)
    List : (1603,;)
    List : (1557,DEBUG)
    List : (564,True)
    List : (495,False)
    List : (411,None)
    List : (356,at)
    List : (335,object)
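If those empty words are unwanted, a single filter step can be added to the pipeline. The following sketch shows the same word count with that one extra line; everything else is unchanged from the example above:

        rawDstream
          .flatMap(line => line.split(" "))
          .filter(word => word.nonEmpty)      // drop empty words before counting
          .map(word => (word,1))
          .reduceByKey(_+_)
          .map(item => item.swap)
          .transform(rdd => rdd.sortByKey(false))
          .foreachRDD( rdd =>
            { rdd.take(10).foreach(x => println("List : " + x)) })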
This is interesting if you want to stream data using Apache Spark Streaming, based upon TCP/IP, from a host and port. But what about more exotic methods? What if you wish to stream data from a messaging system, or via memory-based channels? What if you want to use some of the big data tools available today, like Flume and Kafka? The next sections will examine these options, but first I will demonstrate how streams can be based upon files.

File streams

I have modified the Scala-based code example in the last section to monitor an HDFS-based directory, by calling the Spark streaming context method called textFileStream. I will not display all of the code, given this small change. The application class is now called stream3, which takes a single parameter, the HDFS directory. The directory path could also be on NFS or AWS S3 (all the code samples will be available with this book):

    val rawDstream = ssc.textFileStream( directory )

The stream processing is the same as before. The stream is split into words, and the top-ten word list is printed. The only difference this time is that the data must be put into the HDFS directory while the application is running. This is achieved with the HDFS file system put command here:

    [root@hc2nn log]# hdfs dfs -put ./anaconda.storage.log /data/spark/stream

As you can see, the HDFS directory used is /data/spark/stream/, and the text-based source log file is anaconda.storage.log (under /var/log/). As expected, the same word list and count is printed:

    List : (17104,)
    List : (2333,=)
    ........
    List : (564,True)
    List : (495,False)
    List : (411,None)
    List : (356,at)
    List : (335,object)

These are simple streaming methods based on TCP and file system data. But what if I want to use some of the built-in streaming functionality within Spark Streaming? This will be examined next. The Spark Streaming Flume library will be used as an example.

Flume

Flume is an Apache open source project and product, which is designed to move large amounts of data at a big data scale. It is highly scalable, distributed, and reliable, working on the basis of data sources, data sinks, and data channels, as the diagram here, taken from the http://flume.apache.org/ website, shows:

[Figure: A Flume agent; a web server acts as the source, feeding a channel within the agent, which in turn feeds a sink that writes to HDFS.]

Flume uses agents to process data streams. As can be seen in the previous figure, an agent has a data source, a data processing channel, and a data sink. A clearer way to describe this is via the following figure. The channel acts as a queue for the sourced data, and the sink passes the data to the next link in the chain.

[Figure: A Flume agent in detail; incoming events pass from the source, through the channel (an event queue), to the sink, which emits the outgoing events.]

Flume agents can form Flume architectures; the output of one agent's sink can be the input to a second agent. Apache Spark allows two approaches to using Apache Flume. The first is an Avro push-based in-memory approach, whereas the second one, still based on Avro, is a pull-based system, using a custom Spark sink library.
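In code, these two approaches correspond to two factory methods on the FlumeUtils class. The following is a sketch only; the streaming context, host, and port values are assumed to be those used in the push-based example that follows, and the pull-based call additionally assumes that the spark-streaming-flume-sink library has been placed on the Flume classpath and configured as the agent's sink:

    import org.apache.spark.streaming.flume._

    // Push-based: Spark runs an Avro receiver that a Flume Avro sink writes to.
    val pushStream = FlumeUtils.createStream( ssc,
      "hc2r1m1.semtech-solutions.co.nz", 11777 )

    // Pull-based: Flume writes to the custom Spark sink
    // (org.apache.spark.streaming.flume.sink.SparkSink), and Spark polls it.
    val pullStream = FlumeUtils.createPollingStream( ssc,
      "hc2r1m1.semtech-solutions.co.nz", 11777 )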
I installed Flume via the Cloudera CDH 5.3 cluster manager, which installs a single agent. Checking the Linux command line, I can see that Flume version 1.5 is now available:

    [root@hc2nn ~]# flume-ng version
    Flume 1.5.0-cdh5.3.3
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: b88ce1fd016bc873d817343779dfff6aeea07706
    Compiled by jenkins on Wed Apr 8 14:57:43 PDT 2015
    From source with checksum 389d91c718e03341a2367bf4ef12428e

The Flume-based Spark example that I will initially implement here is the Flume-based push approach, where Spark acts as a receiver, and Flume pushes the data to Spark. The following figure represents the structure that I will implement on a single node:

[Figure: The single-node push configuration; a netcat source on port 10777 feeds the Flume agent's memory channel, whose Avro sink on port 11777 pushes events to the Spark stream4 application.]

The message data will be sent to port 10777 on a host called hc2r1m1 using the Linux netcat (nc) command. This will act as a source (source1) for the Flume agent (agent1), which will have an in-memory channel called channel1. The sink used by agent1 will be Apache Avro based, again on a host called hc2r1m1, but this time, the port number will be 11777. The Apache Spark Flume application stream4 (which I will describe shortly) will listen for Flume stream data on this port.

I start the streaming process by executing the netcat (nc) command next, against the 10777 port. Now, when I type text into this window, it will be used as a Flume source, and the data will be sent to the Spark application:

    [hadoop@hc2nn ~]$ nc hc2r1m1.semtech-solutions.co.nz 10777

In order to run my Flume agent, agent1, I have created a Flume configuration file called agent1.flume.cfg, which describes the agent's source, channel, and sink. The contents of the file are as follows. The first section defines the agent1 source, channel, and sink names:

    agent1.sources  = source1
    agent1.channels = channel1
    agent1.sinks    = sink1

The next section defines source1 to be netcat based, running on the host called hc2r1m1, and on port 10777:

    agent1.sources.source1.channels=channel1
    agent1.sources.source1.type=netcat
    agent1.sources.source1.bind=hc2r1m1.semtech-solutions.co.nz
    agent1.sources.source1.port=10777

The agent1 channel, channel1, is defined as a memory-based channel with a maximum event capacity of 1000 events:

    agent1.channels.channel1.type=memory
    agent1.channels.channel1.capacity=1000

Finally, the agent1 sink, sink1, is defined as an Apache Avro sink on the host called hc2r1m1, and on port 11777:

    agent1.sinks.sink1.type=avro
    agent1.sinks.sink1.hostname=hc2r1m1.semtech-solutions.co.nz
    agent1.sinks.sink1.port=11777
    agent1.sinks.sink1.channel=channel1

I have created a Bash script called flume.bash to run the Flume agent, agent1. It looks like this:

    [hadoop@hc2r1m1 stream]$ more flume.bash

    #!/bin/bash

    # run the bash agent

    flume-ng agent \
      --conf /etc/flume-ng/conf \
      --conf-file ./agent1.flume.cfg \
      -Dflume.root.logger=DEBUG,INFO,console \
      -name agent1

The script calls the Flume executable flume-ng, passing the agent1 configuration file. The call specifies the agent named agent1. It also specifies the Flume configuration directory to be /etc/flume-ng/conf/, the default value. Initially, I will use a netcat Flume source with a Scala-based example to show how data can be sent to an Apache Spark application. Then, I will show how an RSS-based data feed can be processed in a similar way. So, initially, the Scala code that will receive the netcat data looks like this. The class package name and the application class name are defined. The necessary classes for Spark and Flume are imported. Finally, the main method is defined:

    package nz.co.semtechsolutions

    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.flume._

    object stream4 {

      def main(args: Array[String]) {

The host and port name arguments for the data stream are checked and extracted:

        if ( args.length < 2 ) {
          System.err.println("Usage: stream4 <host> <port>")
          System.exit(1)
        }

        val hostname = args(0).trim
        val portnum  = args(1).toInt

        println("hostname : " + hostname)
        println("portnum  : " + portnum)

The Spark and streaming contexts are created. Then, the Flume-based data stream is created using the stream context hostname and port number.
The Flume-based class FlumeUtils has been used to do this by calling its createStream method:

        val appName = "Stream example 4"
        val conf    = new SparkConf()

        conf.setAppName(appName)

        val sc  = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10) )

        val rawDstream = FlumeUtils.createStream(ssc,hostname,portnum)

Finally, a stream event count is printed, and (for debug purposes while we test the stream) the stream content is dumped. After this, the stream context is started and configured to run until terminated via the application:

        rawDstream.count()
          .map(cnt => " Received events : " + cnt )
          .print()

        rawDstream.map(e => new String(e.event.getBody.array() ))
          .print

        ssc.start()
        ssc.awaitTermination()

      } // end main

    } // end stream4

Having compiled it, I will run this application using spark-submit. In the other chapters of this book, I will use a Bash-based script called run_stream.bash to execute the job. The script looks like this:

    [hadoop@hc2r1m1 stream]$ more run_stream.bash

    #!/bin/bash

    SPARK_HOME=/usr/local/spark
    SPARK_BIN=$SPARK_HOME/bin
    SPARK_SBIN=$SPARK_HOME/sbin

    JAR_PATH=/home/hadoop/spark/stream/target/scala-2.10/streaming_2.10-1.0.jar
    CLASS_VAL=$1
    CLASS_PARAMS="${*:2}"

    STREAM_JAR=/usr/local/spark/lib/spark-examples-1.3.1-hadoop2.3.0.jar

    cd $SPARK_BIN

    ./spark-submit \
      --class $CLASS_VAL \
      --master spark://hc2nn.semtech-solutions.co.nz:7077 \
      --executor-memory 100M \
      --total-executor-cores 50 \
      --jars $STREAM_JAR \
      $JAR_PATH \
      $CLASS_PARAMS

So, this script sets some Spark-based variables, and a JAR library path for this job. It takes the Spark class to run as its first parameter. It passes all the other variables, as parameters, to the Spark application class job. So, the execution of the application looks like this:

    [hadoop@hc2r1m1 stream]$ ./run_stream.bash \
      nz.co.semtechsolutions.stream4 \
      hc2r1m1.semtech-solutions.co.nz \
      11777

This means that the Spark application is ready, and is running as a Flume sink on port 11777. The Flume input is ready, running as a netcat task on port 10777. Now, the Flume agent, agent1, can be started using the Flume script called flume.bash to send the netcat source-based data to the Apache Spark Flume-based sink:

    [hadoop@hc2r1m1 stream]$ ./flume.bash

Now, when the text is passed to the netcat session, it should flow through Flume, and be processed as a stream by Spark. Let's try it:

    [hadoop@hc2nn ~]$ nc hc2r1m1.semtech-solutions.co.nz 10777
    I hope that Apache Spark will print this
    OK
    I hope that Apache Spark will print this
    OK
    I hope that Apache Spark will print this
    OK

Three simple pieces of text have been added to the netcat session, and have been acknowledged with an OK, so that they can be passed to Flume. The debug output in the Flume session shows that the events (one per line) have been received and processed:

    2015-07-06 18:13:18,699 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318) Chars read = 41
    2015-07-06 18:13:18,700 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322) Events processed = 1
    2015-07-06 18:13:18,990 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318) Chars read = 41
    2015-07-06 18:13:18,991 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322) Events processed = 1
    2015-07-06 18:13:19,270 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318) Chars read = 41
    2015-07-06 18:13:19,271 (netcat-handler-0) DEBUG - org.apache.flume.source.
    NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322) Events processed = 1

Finally, in the Spark stream4 application session, three events have been received and processed; in this case, they have been dumped to the session to prove the point that the data arrived. Of course, this is not what you would normally do, but I wanted to prove data transit through this configuration:

    -------------------------------------------
    Time: 1436163210000 ms
    -------------------------------------------
     Received events : 3

    -------------------------------------------
    Time: 1436163210000 ms
    -------------------------------------------
    I hope that Apache Spark will print this
    I hope that Apache Spark will print this
    I hope that Apache Spark will print this

This is interesting, but it is not really a production-worthy example of Spark Flume data processing. So, in order to demonstrate a potentially real data processing approach, I will change the Flume configuration file source details so that it uses a Perl script, which is executable, as follows:

    agent1.sources.source1.type=exec
    agent1.sources.source1.command=./rss.perl

The Perl script referenced previously, rss.perl, just acts as a source of Reuters science news. It receives the news as XML, and converts it into JSON format. It also cleans the data of unwanted noise. First, it imports packages like LWP and XML::XPath to enable XML processing. Then, it specifies a science-based Reuters news data source, and creates a new LWP agent to process the data, similar to this:

    #!/usr/bin/perl

    use strict;
    use LWP::UserAgent;
    use XML::XPath;

    my $urlsource="http://feeds.reuters.com/reuters/scienceNews" ;

    my $agent = LWP::UserAgent->new;

Then an infinite while loop is opened, and an HTTP GET request is carried out against the URL. The request is configured, and the agent makes the request via a call to the request method:

    while(1)
    {
      my $req = HTTP::Request->new(GET => ($urlsource));

      $req->header('content-type' => 'application/json');
      $req->header('Accept'       => 'application/json');

      my $resp = $agent->request($req);

If the request is successful, then the XML data returned is defined as the decoded content of the request. Title information is extracted from the XML via an XPath call using the path /rss/channel/item/title:

      if ( $resp->is_success )
      {
        my $xmlpage = $resp->decoded_content;

        my $xp = XML::XPath->new( xml => $xmlpage );

        my $nodeset = $xp->find( '/rss/channel/item/title' );

        my @titles = () ;
        my $index  = 0 ;

For each node in the extracted title data, the title XML string is extracted. It is cleaned of unwanted XML tags, and added to a Perl-based array called titles:

        foreach my $node ($nodeset->get_nodelist)
        {
          my $xmlstring = XML::XPath::XMLParser::as_string($node) ;

          $xmlstring =~ s/<title>//g;
          $xmlstring =~ s/<\/title>//g;
          $xmlstring =~ s/"//g;
          $xmlstring =~ s/,//g;

          $titles[$index] = $xmlstring ;

          $index = $index + 1 ;

        } # foreach find node

The same process is carried out for the description-based data in the request response XML. The XPath value used this time is /rss/channel/item/description.
There are many more tags to be cleaned from the description data, so there are many more Perl searches and line replacements that act on this data (s///g):

        my $nodeset = $xp->find( '/rss/channel/item/description' );

        my @desc = () ;
        $index = 0 ;

        foreach my $node ($nodeset->get_nodelist)
        {
          my $xmlstring = XML::XPath::XMLParser::as_string($node) ;

          $xmlstring =~ s/<img.+\/img>//g;
          $xmlstring =~ s/href=".+"//g;
          $xmlstring =~ s/src=".+"//g;
          $xmlstring =~ s/src='.+'//g;
          $xmlstring =~ s/<br.+\/>//g;
          $xmlstring =~ s/<\/div>//g;
          $xmlstring =~ s/<\/a>//g;
          $xmlstring =~ s/a \n//g;
          $xmlstring =~ s/img //g;
          $xmlstring =~ s/img \///g;
          $xmlstring =~ s/div.+//g;
          $xmlstring =~ s/<title>//g;
          $xmlstring =~ s/<\/title>//g;
          $xmlstring =~ s/<description>//g;
          $xmlstring =~ s/<\/description>//g;
          $xmlstring =~ s/<.+//g;
          $xmlstring =~ s/"//g;
          $xmlstring =~ s/,//g;
          $xmlstring =~ s/\r\n//g;

          $desc[$index] = $xmlstring ;

          $index = $index + 1 ;

        } # foreach find node

Finally, the XML-based title and description data is output in the RSS JSON format using a print command. The script then sleeps for 30 seconds, and requests more RSS news information to process:

        my $newsitems = $index ;
        $index = 0 ;

        for ($index=0; $index < $newsitems; $index++)
        {
          print "{\"category\": \"science\","
              . " \"title\": \""   . $titles[$index] . "\","
              . " \"summary\": \"" . $desc[$index]   . "\""
              . "}\n";

        } # for rss items

      } # success ?

      sleep(30) ;

    } # while

I have created a second Scala-based stream processing code example called stream5. It is similar to the stream4 example, but it now processes the rss item data from the stream. A case class is defined next to process the category, title, and summary from the XML rss information. An HDFS location is defined to store the resulting data that comes from the Flume channel:

    case class RSSItem(category : String, title : String, summary : String)
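The stream5 code is not reproduced in full here, but the following is an illustrative sketch of the general idea only, not the book's actual code. It assumes the same ssc, hostname, and portnum values as stream4; the crude string-based extract helper and the HDFS output path are hypothetical stand-ins (a real implementation would normally use a JSON library):

    import org.apache.spark.streaming.flume._

    // Pull the quoted value for a key out of the JSON-style line produced by rss.perl.
    def extract( line : String, key : String ) : String = {
      val marker = "\"" + key + "\": \""
      val start  = line.indexOf(marker)
      if (start < 0) "" else line.substring(start + marker.length).takeWhile(_ != '"')
    }

    val rawDstream = FlumeUtils.createStream(ssc, hostname, portnum)

    // Map each Flume event body into an RSSItem instance.
    val items = rawDstream
      .map( e => new String( e.event.getBody.array() ) )
      .map( line => RSSItem( extract(line, "category"),
                             extract(line, "title"),
                             extract(line, "summary") ) )

    items.count().map( cnt => "RSS items : " + cnt ).print()

    // Hypothetical output location; one directory of text files is written per batch.
    items.saveAsTextFiles( "hdfs://hc2nn:8020/data/spark/flume/rss/item" )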