Fraud Detection in Hadoop

CecilGalot, United Kingdom, Teacher
Published: 06-08-2017
Fraud Detection

What is fraud detection? Well, yes, it's a multibillion-dollar industry that touches every company with significant amounts of money to lose, but what is fraud detection at its heart? For the purpose of our discussion, we can view fraud detection as making a decision based on whether an actor (human or machine) is behaving as it should. There are two different points here: 1) knowing the difference between normal and abnormal behavior, and 2) being able to act on that knowledge.

An easy example to illustrate the first point is parents knowing when their child is telling a lie or hiding something. If you are a parent, you know how it is: you see your child every day and you know his normal behaviors and patterns. If one day he's a little quieter than normal or trying to avoid eye contact, your instincts kick in and you start asking questions until you find out he failed a test, got into a fight with a friend, or got bullied at school. You were able to detect the deception because of your close relationship and knowledge of your child. That close relationship is key to detecting changes in well-established patterns.

Continuous Improvement

Now, our example will rarely end in your child stealing millions from a bank or getting involved in money laundering, but, like the actors in those examples, the child is trying to hide something. Just as your child becomes better at deception as he grows older, scammers and hackers become more sophisticated as time goes on.

As humans we have different levels for processing new information and making decisions. For simplicity we'll break this into two different groups of processing: background processing and quick reactions. Background processing is useful for learning and improving our process, while quick reactions are used when we're taking action based on the existing process.
In our example, background processing is learning the normal behavior of our child: you do that by observing your child over time. It can happen consciously or unconsciously while you engage in activities with your child, or afterward in a retrospective way. This is where we will be reviewing our child's behavior and essentially building a profile of our child's normal behavior. Generally, it's this deeper review of events, typically done offline, where new insights are discovered.

A more relevant example of this is how credit card scammers have gotten better through the years. First, the credit card thieves would steal one or more credit card numbers and start buying things. After a while the card company would flag the card and find the thieves if they continued to use the card. The thieves adapted to only use the card a couple of times in a single, short burst. This worked for a while until banks developed new algorithms designed to detect those outliers from the original cardholders' normal behavior. Now, the card thieves have the power of computers and the Internet to make microtransactions that can make the fraudulent behavior even harder to detect. Consider scammers firing off microtransactions across many users. This approach can affect what is perceived as normal behavior for the actor, which hinders the ability to isolate it as something abnormal.

Getting back to our parenting example, to know one child or even a couple of children is totally different (in terms of scale) from knowing billions of actors like credit card holders, shoppers, bank account owners, video game players, or fans. This is where Hadoop provides valuable services by offering enough space to store all the information we need for all those actors, and the processing power to review all of that information and act on it.

Taking Action

Based on this background processing, we are ready to act when something seems a bit odd.
The quick reaction in this scenario is the observation that your child is quieter than normal or is avoiding eye contact; this triggers your instinct that there may be something off. However, this quick reaction processing can only happen if you know the normal behavior to expect from your child. This allows us, the parents, to match the child's actions against this profile and react quickly, if and when it's necessary, to detect that something's off.

It can be interesting to consider that successful fraud detection systems are modeled after our understanding of how the brain learns and reacts: the fraud detection system has to be able to react quickly and make tiny updates to its reactions based on incoming data, but offline processes are needed to find that insight. It's these insights that may change the rules that are firing in the fast reaction stage or uncover new patterns to which to react.

We've been talking about credit card transaction fraud in this discussion, but it should be noted that fraud detection cuts across industries and domains. As we noted before, any business for which there is something to gain by cheating is at risk. In the upcoming case study discussion, we use the example of a fraud detection system that can be used by a bank or a credit card company to detect fraud on a customer's account. However, a similar architecture would apply to other fraud detection applications as well—for example, detecting fraud in trading currencies or goods in a massively multiplayer online game (MMOG).

Architectural Requirements of Fraud Detection Systems

Fraud detection systems are different from many other Hadoop use cases because they include the quick reaction component. A clickstream analysis system or a data warehouse may need to process billions of events, but it can take hours to do so.
Fraud detection systems must respond to events in a few milliseconds and be utterly reliable—all this in addition to handling millions of transactions per second and running large-scale data analysis in the background. The Hadoop ecosystem provides real-time components (such as Flume, Kafka, and HBase), near-real-time components (such as Spark Streaming), and components that can be used for background processing and analysis (such as Spark, Hive, and Impala).

Introducing Our Use Case

To help you understand the fraud detection architecture, we've created a simple example that implements fraud detection as it pertains to a bank account or a credit card.

The goal of the fraud detection in this example is to review a new transaction that comes in. If the transaction is a given multiple larger than the user's normal transaction amount per day, it will be flagged. The system will also check to see if the user location is different from the user's last 20 locations. If this is a web transaction system, this would mean checking the IP address against that of the user's last 20 logins. If it's from a physical point-of-sale system, that would mean checking the user's geographic location (say, city) against the user's last 20 locations and basing the decision on proximity. If the transaction originates from a new location (online or otherwise), the threshold of the amount over the average transaction value may be lowered further.

This is a simple example that uses a single profile table to hold information about account holders. It implements a simple local caching layer and uses HBase for the backend persistence. Although this is a simple use case, it should provide you with an understanding of how Hadoop is used to provide a solution for fraud detection as well as a concrete example of near-real-time processing on Hadoop.
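The two checks just described can be sketched in a few lines of Java. This is only an illustration of the rules as stated above; the class name, method, and threshold values are our own assumptions, not the book's actual implementation:

```java
import java.util.Set;

// Sketch of the two rules described in the use case. All names and
// threshold values here are hypothetical examples.
class FraudRules {
    static final double NORMAL_MULTIPLE = 3.0;       // flag if > 3x average
    static final double NEW_LOCATION_MULTIPLE = 1.5; // stricter from new locations

    /** Returns true if the transaction should be flagged for review. */
    static boolean shouldFlag(double amount,
                              double avgDailyAmount,
                              String location,
                              Set<String> last20Locations) {
        boolean knownLocation = last20Locations.contains(location);
        // From a location not in the last 20, lower the amount threshold.
        double multiple = knownLocation ? NORMAL_MULTIPLE : NEW_LOCATION_MULTIPLE;
        return amount > avgDailyAmount * multiple;
    }
}
```

For example, a transaction of twice the user's average amount would pass from a familiar location but be flagged from an unfamiliar one.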
Broad Relevance of Anomaly Detection Techniques

Note that we started the discussion by defining fraud detection as "making a decision based on whether an actor (human or machine) is behaving as it should." The ability to differentiate normal and abnormal events is also called anomaly detection or outlier detection and has wide implications in many industries, in addition to preventing financial fraud. The same techniques shown in this chapter can be used to detect intrusion into secure networks, early warning signs of failures in machines, customers who may abandon a service (in time to retain them), or patients who go "doctor shopping"—an early indicator of a possible drug-abuse problem. No matter what industry you are in, anomaly detection is one of the most useful data analysis techniques in your arsenal.

High-Level Design

So, how can Hadoop help us implement such a system? There are a number of ways to build this system, but we'll start with a general design and then dig into different options for the components in order to demonstrate how the overall design can be tuned for specific use cases.

Figure 9-1 provides a high-level view of our system. There's a lot going on in this diagram, so let's break it up and take it piece by piece:

• Starting from the upper-left side, we have the client. In our example, the client is a web service responsible for receiving requests for credit card transactions and responding with an approval or rejection. There are multiple boxes because the web service could be distributed across multiple servers.

• To approve a transaction, the web service may retrieve the user's profile and transaction history from HBase. It also needs to record the current transaction in HBase as part of the user's history.

• The transaction, the decision on whether it was approved or rejected, and the reasons for the decision are sent to HDFS through Flume.
• When the data is in HDFS, it can be automatically processed by a near-real-time processing framework such as Spark or Spark Streaming. Spark Streaming can also process data directly from Flume. NRT processing can detect trends and events unfolding over a few minutes that you would not be able to see by inspecting each transaction in isolation (for example, suspicious activity across multiple users in the same geographical region or same retailer). The NRT process can then automatically update the profiles in HBase accordingly.

• The data can also be explored and analyzed by humans, using tools such as Impala, Spark, and MapReduce. They can discover additional behavior patterns and update both the client logic and the information in HBase.

Figure 9-1. High-level view of fraud detection system

In this chapter, we will cover the architecture of the client, the HBase schema design, and the Flume-based ingestion framework in detail. We will give some suggestions regarding near-real-time processing and exploratory analysis, but specific machine-learning algorithms are outside the scope of this book.

Recommended Machine Learning Resources

Knowledge of machine-learning algorithms is obviously useful for anyone serious about implementing a fraud detection system. Here are some resources to help you get started:

• For an amusing and educational approach to data analysis, we recommend Analytics Made Skeezy, a blog that explains essential methods and algorithms with an entertaining narrative about shady criminals.

• For a practical approach to machine learning, we recommend Algorithms of the Intelligent Web by Douglas G. McIlwraith, et al. (Manning), a book that explains the algorithms that power popular websites with easy-to-follow Java implementations.
• If you are looking for a practical introduction and prefer Python to Java, Sarah Guido's upcoming book Introduction to Machine Learning with Python (O'Reilly) is recommended.

• For a more in-depth theoretical background that still doesn't require a PhD in mathematics to understand, we recommend Stuart Russell and Peter Norvig's Artificial Intelligence: A Modern Approach (Pearson).

Here is a list of the high-level subsections we are going to drill into:

• Client architecture:
  — How will the client make decisions regarding incoming events?
  — How does the client use HBase to store, retrieve, and update user profile information?
  — How does the client deliver the transaction status to additional systems?

• Ingest: How do we use Flume to ingest the transaction data from the client into HDFS?

• Near-real-time processing: How do we learn from incoming data with low-latency processing?

• Background processing, reviewing, and adjustments: How do we learn from all our data and improve the process?

Although there are other solutions out there, which we will discuss in more detail at the end of the chapter, the architecture in this chapter was selected because it is easy, simple, scalable, and fast. Let's now look into the subsystems in our architecture in more detail.

Client Architecture

In our architecture, the transaction approval logic is implemented in the client. The client itself can be a web server or a similar application: any system that an event hits requiring review and decision. For this discussion we'll assume this is a web server, and one of its main jobs is to review incoming events and respond with approval or rejection.

How exactly does the client do all this reviewing and alerting?
It can be broken down into three checks on each event:

Event validation
  This is validation of rules that fire off in the context of the event itself—for example, format or basic logical rules like "column A must be a positive number" and "column B says withdraw."

Global context validation
  This is validation of rules in the context of global information, such as thresholds of risk. An example would be: Is this URL safe? I might have a global list of IP address masks that have a risk rating, and the rule would say I'm only allowing IP addresses below a certain level of risk.

Profile content validation
  This version of validation raises the level of required information needed about the actors related to the event. A good example of this is bank transactions. If the user's last 100 transactions were in the states of Ohio and Michigan and then suddenly we get a transaction in Russia 22 minutes after a transaction in Ohio, that should give us reason to be alarmed.

Putting these together, each time the client application receives an event it retrieves the profile, executes fraud detection logic using the profile and the event, and returns a result. Note that it's because of the global- and profile-level validations that we require Hadoop or HBase; if all we needed was the event validation, we could remove Hadoop and HBase entirely from the architecture. It is getting and updating profile information of potentially billions of actors that makes Hadoop and HBase a great fit.

Let's focus on how we're going to populate the client with the global and user profile information when it is needed.

Profile Storage and Retrieval

There are two challenges in storing user profiles and accessing them from the client. One is the sheer amount of information we need to store—potentially a long history of transactions about billions of users. The second challenge is fast retrieval of information.
To react to fraud in a timely manner, we need to be able to look up the history of any user in a few milliseconds. In this section, we will explore the options of using NoSQL databases and caches for quick retrieval of information. We will focus on HBase and show how to store the user profiles in HBase in ways that make the data fast to both retrieve and update.

Caching

The fastest way for the client to retrieve profile information is from local memory. If the profile is found in the local cache, we are looking at sub-microsecond latency to get the information ready for the validation logic.

A complete discussion on implementing a good caching layer is out of scope for this book, but we will give a quick high-level overview to provide the basic ideas behind implementing a caching framework.

At the heart of any caching framework is memory. Memory, of course, has limitations, and it's very possible that we won't have enough memory on a given host to hold all the data we need. We recommend a multilayered approach, combining a local cache with a remote distributed cache.

Regardless of whether additional layers of caching are used, we recommend using the local cache at least for the most active users. Although this can complicate the architecture to some extent, the difference in latency between local memory (sub-microsecond) and reading a profile over the network (at least a few milliseconds) is typically worth the additional complexity. To overcome the memory size limitation of the local cache, it is possible to partition the profile information between the clients so that each client holds a subset of records, and transaction approval requests are routed to the appropriate client.

In our example, we use Google's Guava Cache library, which makes it very easy to create local caches, and we configure its get method to load the profile from HBase if it does not exist in the cache.
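The load-on-miss behavior described here can be illustrated without any external dependencies. The sketch below is a stand-in for the Guava-based cache, using a bounded LRU `LinkedHashMap` from the JDK; in the real client, the loader function would be an HBase get. All names here are our own, not the book's:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal local cache: bounded LRU map that invokes a loader on a miss.
class ProfileCache<K, V> {
    private final Map<K, V> lru;
    private final Function<K, V> loader; // e.g., a lookup against HBase

    ProfileCache(final int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true makes LinkedHashMap behave as an LRU structure
        this.lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict least recently used
            }
        };
    }

    V get(K key) {
        // only calls the loader when the key is not already cached
        return lru.computeIfAbsent(key, loader);
    }
}
```

With Guava, the same shape is expressed with `CacheBuilder` and a `CacheLoader` whose `load()` method performs the HBase read.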
As for the remote cache layer, there are two possibilities:

• A distributed memory caching solution, such as Memcached, to distribute the data across a cluster of nodes.

• Setting up HBase so that all needed records can be found in the block cache. The block cache keeps data blocks in memory, where they can be quickly accessed.

Let's explore these options a little more in the following sections.

Distributed memory caching

A distributed memory solution like Memcached or Redis simplifies the work of developing a caching layer. In terms of performance, though, it still requires a network call, which can add a small amount of latency to requests. Request times should be in the 1- to 4-millisecond range. The advantage of this solution over the partitioning solution is that we won't have downtime when nodes fail, since we can set up the caching solution with multiple replicas of the data. The only downside of the distributed memory caching solution is that you need enough memory to hold everything. If you can't hold everything, you need an additional persistent store backed by disk, which means an additional call when data is not in memory. As we'll see shortly, if we're utilizing HBase, there is little reason why you need to also use a distributed caching solution.

HBase with BlockCache

If configured correctly, HBase can give us response times of a couple of milliseconds when used as our caching layer, so long as we can keep the results we are looking for in memory. In this case, we'll be using the HBase BlockCache, which again allows us to keep recently used data blocks in memory. Note that previous versions of HBase had memory limitations that impacted the size of the BlockCache, but recent enhancements have reduced many of these limitations with advancements like the off-heap block cache, the compressed block cache, and improvements in Java's garbage collection (GC) system.
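Block cache behavior is tuned through hbase-site.xml. As a rough illustration, the property names below are standard HBase settings for on-heap block cache sizing and the off-heap bucket cache, but the values shown are arbitrary examples, not recommendations:

```xml
<!-- hbase-site.xml: illustrative values only -->
<property>
  <name>hfile.block.cache.size</name>
  <value>0.4</value> <!-- fraction of heap reserved for the block cache -->
</property>
<property>
  <name>hbase.bucketcache.ioengine</name>
  <value>offheap</value> <!-- enable the off-heap bucket cache -->
</property>
<property>
  <name>hbase.bucketcache.size</name>
  <value>4096</value> <!-- off-heap cache size in MB -->
</property>
```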
However, a core feature of HBase—strong consistency—presents a potential downside for near-real-time responses in our fraud detection system. As the CAP theorem notes, systems such as HBase with strong consistency and partition tolerance will likely not offer strong availability guarantees. And, indeed, when an HBase region server fails, there is a subsection of row keys that cannot be read or written for a period of time that could span minutes. For this reason, in a production system utilizing HBase as a core component we'll need to introduce a mechanism to provide stronger availability. A full discussion of this is outside the scope of this book, but note that work is under way in the HBase project to provide capabilities to run multiple HBase clusters and keep the data in sync between them. This will allow for failover in the case of a failure in persisting or fetching data from a primary HBase cluster.

HBase Data Definition

To further explore how we should implement a solution with HBase, let's walk through what the data model and interaction with HBase look like. In our example of credit card transaction fraud, we need a profile that will hold enough information for us to run our model against to determine if there is abnormal behavior. The following Java code represents our ProfilePojo class with the values we will be discussing.
Note that we are separating the fields into a few categories: values that rarely change, values that change often, values that count things, and values that capture historical events:

    // Fields that rarely change:
    private String username;
    private int age;
    private long firstLogIn;

    // Values that frequently change:
    private long lastLogIn;
    private String lastLogInIpAddress;

    // Counters:
    private long logInCount;
    private AtomicLong totalSells;
    private AtomicLong totalValueOfPastSells;
    private AtomicLong currentLogInSellsValue;
    private AtomicLong totalPurchases;
    private AtomicLong totalValueOfPastPurchases;
    private AtomicLong currentLogInPurchasesValue;

    // Historical fields:
    private Set<String> last20LogOnIpAddresses;

The different categories are important because each type may be stored in HBase differently. Let's see what options we have in terms of persisting and updating data fields in HBase.

Columns (combined or atomic)

The first two categories we will look at are the values that don't change a lot and the ones that do change a lot. With all these columns, we have the option to store them in a single column qualifier or to store them in their own column qualifiers. Those coming from the RDBMS world would never consider storing multiple fields in a single column, but HBase is a little different because of how it stores the data on disk. Figure 9-2 shows the difference between storing these five values as a single column in HBase and storing them as individual columns.

Figure 9-2. Single-column versus individual-column storage

As we can see from Figure 9-2, the benefit of using a single column is less space consumed on disk. Now, most of that extra information in the individual columns will be compressed away, but there is still the cost of decompressing it and sending it over the network. The downside to combining all the values is that you can no longer change them atomically.
This is why this strategy of combining values is mostly reserved for columns that don't change much or always change together. That said, because this is a performance optimization, it's often best to keep things simple in initial development. A good design principle to follow is to have a method that encapsulates the persistence and serialization of objects to and from HBase so that the effect of any schema changes is localized in your code.

Using Short Column Names

One thing to call out is the short column names in Figure 9-2. Remember that column names are also stored on disk. This wouldn't be true for a normal RDBMS, but remember that HBase doesn't have a fixed schema and can have different columns per record. Therefore, it needs to label the column names for each record. With that understanding, remember to keep your column names short.

Event counting using HBase increment or put

The HBase APIs provide an increment function that allows you to increment a numerical value stored in HBase and referenced by a row key, column family, and column qualifier. For example, if the value was originally 42 and you increment it by 2, it will be 44. This is powerful because multiple applications can increment the same value with assurance that the value is correctly incremented in a thread-safe manner. If you are used to Java, the increment is like using an AtomicLong and the addAndGet() method, as opposed to a simple long++, which is not thread-safe.

So despite increment being thread-safe, there's still a potential concern—not with HBase, but with the client. It's easy to think of a case where a client will send the increment and then fail before getting acknowledgment of the original event. In that case, the client will try to recover and increment again. The result is a double increment, which means our number is wrong until the number is reset.
This issue is even more pronounced if you try to update multiple HBase clusters: you could send the increment to the primary cluster and have the request take long enough that the client believes it has failed and then sends the increment to the failover cluster. If the first increment then completes successfully, we now have a double increment.

Given these potential issues, increment may not be the best option. Another option is to simply use an HBase put, although there are limitations to puts that are actually addressed by the increment functions:

• The put requires the initial value. So, for example, if you're incrementing by 2, you first need to get the initial value, which in our example was 42. Then, we need to do the addition on the client before sending the value of 44 back to HBase.

• Because we need the original value, there is no way we can increment a value in a multithreaded manner.

A possible way to address these limitations would be a streaming solution like Spark Streaming. As we talked about in Chapter 7, Spark Streaming can store counts in an RDD that can be incremented in a reduce process. Then, you can put() these values into HBase. Although simple, this provides a powerful model. We don't need multithreading because we are partitioning and microbatching. Also, we don't need to get from HBase on every pass because we are the only system updating the values, so as long as we don't lose our RDD we don't need to re-get the latest values.

To summarize, HBase increment is simpler to use, but can have issues with duplicate increments. put does not have this issue, but can be a bit more complex to update in a consistent manner. In our example implementation we use put.

Event history using HBase put

In addition to simply counting events, we also want to store the complete transaction history for each user.
Because an HBase record is uniquely described by its row, column, and version, we can store history for a user by simply putting new events for the user into the same row and column, and using HBase versions to keep track of the user history. Like increments, HBase versioning provides a lot of power, but also has drawbacks:

• First off, the default number of versions stored per cell is one. For this use case we'll want to set the number of versions to a much higher number—at least 20 and possibly more. The number of versions is set at the column family level, not at the column level. So, if you want 20 versions for only one column of many in a row, you need to decide if you want all the columns to have 20 versions, or if you want to pay the extra cost of having two column families.

• If we decide to keep 20 versions of all the columns, then we're adding overhead to the table. First, scan times will slow down. Second, the amount of usable information going into the block cache will decrease, and the overall size of the table on disk will increase even after a compaction.

So is there another option? The answer is yes, and it's HBase puts again. The client will read the full history from HBase, add the new transactions, and then write the full history back in a single put.

Unlike the increment-versus-put question, this one is not as easy to make a clear decision on. We may want to keep the last 100 IP addresses, in which case the put solution would involve a lot of updating. Also, the cost to retrieve this much history and store it in the local cache may be too great. You should consider your use case and the performance of the two solutions carefully before locking down your schema design.

Let's look at how we will update profiles in HBase from our web service. Note that the example is a simplified code snippet; you'll find the full application in the book's GitHub repository.
In this example, each profile has its own row, with the userId as the row key. The row has two columns: json, which stores the user profile in JSON format, and timestamp, which stores the time the profile was updated. Storing the profile in JSON is not our typical recommendation, because the Avro format has significant advantages (as discussed in previous chapters). In the case of low-latency stream processing, however, JSON takes less CPU to process and therefore increases the throughput of the web server.

The overall plan is to open a connection to HBase, instantiate a table, and then get and put profiles from the table:

    static HConnection hConnection;
    hConnection = HConnectionManager.createConnection(hbaseConfig);

    byte[] rowKey = HBaseUtils.convertKeyToRowKey("profileCacheTableName", userId);
    Get get = new Get(rowKey);
    final HTableInterface table = hConnection.getTable("profileCacheTableName");
    Result result = table.get(get);
    NavigableMap<byte[], byte[]> userProfile = result
        .getFamilyMap(Bytes.toBytes("profile"));

    Put put = new Put(rowKey);
    put.add(Bytes.toBytes("profile"), Bytes.toBytes("json"),
        Bytes.toBytes(profile.getKey().getJSONObject().toString()));
    put.add(Bytes.toBytes("profile"), Bytes.toBytes("timestamp"),
        Bytes.toBytes(Long.toString(System.currentTimeMillis())));
    long previousTimeStamp = profile.getKey().lastUpdatedTimeStamp;
    table.checkAndPut(rowKey, Bytes.toBytes("profile"), Bytes.toBytes("timestamp"),
        Bytes.toBytes(Long.toString(previousTimeStamp)), put);

The first part of this snippet (the get) retrieves the profile from HBase. In the full application, it appears in the loadProfileFromHBase() method, which executes when we are looking for the profile and it is not in the cache already.

The second part (the put) updates the profile in HBase. In the full application, it appears in HBaseFlusher, a background thread that reads updated profiles from a queue and puts them into HBase as shown here.
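The checkAndPut() call at the end of this snippet is normally wrapped in a retry loop. The optimistic-concurrency pattern it implements can be sketched with an AtomicReference standing in for the HBase row, where compareAndSet plays the role of checkAndPut (class and method names here are our own illustration):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Optimistic update loop: read the current value, apply the change, and
// write back only if no other writer got there first; otherwise reload
// and retry.
class OptimisticUpdate {
    static <T> T update(AtomicReference<T> row, UnaryOperator<T> change) {
        while (true) {
            T current = row.get();              // "get the profile"
            T updated = change.apply(current);  // apply our modification
            if (row.compareAndSet(current, updated)) {
                return updated;                 // "checkAndPut" succeeded
            }
            // another writer won the race; loop and try again
        }
    }
}
```

The loop terminates as soon as a round trip observes no concurrent modification, which is exactly the behavior of a checkAndPut() inside a while() statement.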
We are using the checkAndPut() method to avoid race conditions where we accidentally override a profile that was already updated by another web server. If the timestamp in HBase does not match the timestamp in our app, we have an outdated copy. We'll need to get a new copy from HBase, make our updates on it, and try writing it to HBase again. The checkAndPut() call usually appears in a while() statement, so if it fails, we reload the new profile, update it, and try again.

Delivering Transaction Status: Approved or Denied?

Now, let's discuss how to send notifications to other systems when we have found fraud. When our client has detected fraud, there could be actions that external systems need to take in response. With respect to the client, the number one external system is most likely the system sending the event to the client in the first place, with the requirement to know if the request or action was legitimate. In our simple architecture, having this method alert the original sender is as simple as populating the response to the sender's request.

The next group of external systems that may need to know about fraud detection outcomes are systems that are downstream and out of the real-time response window. We could send the detection events directly to these systems, but that would mean closely coupling these systems to the client. Further, if there is more than one downstream system, it would require multiple submissions of the same alert from the client, increasing the load on our NRT client solution. This is where Kafka or a message queue (MQ) system would be a perfect addition to our architecture. We could publish alerts to a topic on our Kafka or MQ system, allowing any downstream systems needing access to these alerts to subscribe to the appropriate topic.

Next, there most likely will be a need to log and learn from our fraud detection decisions.
This will require us to send every event, its corresponding fraud detection decision (fraud, no fraud, or what kind of fraud), and the reasoning for the decision to HDFS for long-term storage and batch processing. In the long term, this will allow us to do a much deeper dive into the data with engines like Spark, Impala, and Map‐ Reduce. This long-term storage area and batch processing would be the deep thought process we previously discussed. We will look into how to do this next. 294 Chapter 9: Fraud Detection Lastly, the events and the decisions could also be sent to a stream processing system to get more up to the minute real-time learning done on the data. We will touch on that later in the chapter as well. Ingest Figure 9-3 highlights the subset of the architecture we will focus on when digging into the ingestion portion of our fraud application. Figure 9-3. Ingest architecture We’ll use Flume here as the last leg before we hit HDFS or Spark Streaming, for all the reasons we talked about in Chapter 2: easy to configure, the ability to scale out, and maturity of the project. In the next subsections we will focus on different approaches to connecting the client to Flume and approaches to deploy Flume alongside HDFS. There is a lot more that goes into building a Flume ingestion architecture, much of which we cover in Chap‐ ter 2. Ingest 295Path Between the Client and Flume We have many options for connecting the client to Flume, but for this discussion we’ll focus on three: client push, logfile pull, and a message queue between the client and the final sink. Client push The first option is an easy one to start off with: putting a Flume client in the applica‐ tion that will batch up the messages and send them on their way to Flume. In this model the client will point straight to the Flume agents that will be doing the write to HDFS (see Figure 9-4). Figure 9-4. 
Client push

We could implement this easily using the NettyAvroRpcClient API in the web service, which would offer us the advantages of over-the-wire encryption, compression, and fine control over batching and threading.

To use the NettyAvroRpcClient, you first need to get an instance from RpcClientFactory. You then create Flume events using EventBuilder and append the events to the NettyAvroRpcClient to send them to Flume. You can also call the appendBatch() function to send a list of events together; this method increases latency (because you are waiting for events to accumulate before sending them), but it increases throughput, and we recommend using it for sending data to Flume.

For example, here’s how we used an Avro client to send events from our fraud detection app to Flume (this is only a code snippet; the full application is available in our GitHub repository):

```java
public static class FlumeFlusher implements Runnable {
  int flumeHost = 0;

  @Override
  public void run() {
    NettyAvroRpcClient client = null;
    while (isRunning) {
      if (client == null) {
        // Initialize the Avro client with the address of a Flume agent
        client = getClient();
      }
      List<Event> eventActionList = new ArrayList<Event>();
      List<Action> actionList = new ArrayList<Action>();
      try {
        // Drain up to MAX_BATCH_SIZE pending actions from the queue
        for (int i = 0; i < MAX_BATCH_SIZE; i++) {
          Action action = pendingFlumeSubmits.poll();
          if (action == null) {
            break;
          }
          // Convert each action to a Flume event and add it to the batch
          Event event = new SimpleEvent();
          event.setBody(Bytes.toBytes(action.getJSONObject().toString()));
          eventActionList.add(event);
          actionList.add(action);
        }
        if (eventActionList.size() > 0) {
          // Send the whole batch of events to Flume in one call
          client.appendBatch(eventActionList);
        }
      } catch (Throwable t) {
        try {
          LOG.error("Problem in HBaseFlusher", t);
          // Return the unsent actions to the queue so we can retry later
          pendingFlumeSubmits.addAll(actionList);
          actionList.clear();
          client = null;
        } catch (Throwable t2) {
          LOG.error("Problem in HBaseFlusher when trying to return puts to "
              + "queue", t2);
        }
      } finally {
        for (Action action : actionList) {
          synchronized (action) {
            action.notify();
          }
        }
      }
      try {
        Thread.sleep(HBASE_PULL_FLUSH_WAIT_TIME);
      } catch (InterruptedException e) {
        LOG.error("Problem in HBaseFlusher", e);
      }
    }
  }
}
```

We
start by initializing the Avro client with the address of the Flume agent to which we will send the data. We maintain a queue of actions (approve/deny decisions) that we want to send to Flume. We pick actions off the queue, convert them to Flume events, and add those events to a list so we can send them to Flume in one batch. We then send the list of events to Flume with appendBatch(). If there was an error, we place the actions back on the queue so we can retry later.

This client connects to a Flume agent that has an Avro source listening on the port we used when initializing the client:

```
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind =
a1.sources.r1.port = 4243
```

There are some downsides to pushing events directly from the client to Flume:

• It assumes the client is implemented in Java.
• The Flume libraries have to be included with your client application.
• It requires additional memory and CPU resources on the client.
• The client needs to be able to switch between Flume agents in case a Flume agent crashes.
• Clients will have different network latencies to the Flume agents depending on where they are located in relation to the Flume agents and the cluster. This will affect threading and batch sizing.

This is the recommended approach if you have full implementation control of the clients (which is something we assume in this use case), and if one of the client’s primary jobs is ingesting data to Flume. Unfortunately, this is not always the case, so we need to consider additional options.

Logfile pull

The second option is one that is frequently seen in the real world, mainly because it’s simple and usually easy to start with. Most applications are already using logfiles, so we can ingest events from the logfiles into Flume without modifying the client code. In this case, we’re writing logs to disk and then using a Flume source to read log records into our ingest pipeline.
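In this model the ingest side is pure Flume configuration, with no changes to the client. A minimal sketch using Flume’s spooling directory source follows; the agent, channel, and directory names are hypothetical, and the spool directory must receive only closed, immutable files:

```
# Hypothetical agent/channel names; files must be complete before
# they land in the spool directory.
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/fraud-app/spool
a1.sources.r1.channels = c1
```

In practice this means the application’s log rotation must move finished logfiles into the spool directory, which is the source of the latency discussed next.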
Figure 9-5 illustrates this approach in a high-level way.

Figure 9-5. Logfile pull

Even though this is a common implementation, we do not recommend this solution for fraud detection. Using logfiles for ingest has some drawbacks compared to using the Avro client:

Performance
  There is a performance hit in writing the logs to disk and then reading them back into Flume. Although the reads will hopefully benefit from OS-level disk caching, there is still the cost of writing, serialization, and deserialization. There have even been cases where the reader was unable to keep up with the writers, leading to even bigger problems.

Data loss
  It is not a simple task to tail logs that are changing quickly on disk, which increases the chance of losing data as files are rotated. A spooling directory source can be used to mitigate this problem, but it adds latency because it only starts processing files after they have been closed by the application writing to them.

Flume has to know the client
  Flume has to know the application’s log locations and file formats.

Because in this use case we assume that we are writing the client, we can use the preferred architecture and add the Avro client directly to it. Compare this to the clickstream use case, for which we assumed that the web server is a given and we can’t modify it; in that case, we used Flume’s spooling directory source to ingest from the logfiles.

Message queue or Kafka in the middle

This model is also pretty common. It decouples the client from Flume even further and allows third parties to consume the incoming messages. This solution can look different depending on how you set it up. Note that these are fairly complex options and were not included in our original high-level architecture, where we aimed at a more straightforward approach.

One option might be client → MQ → Flume/Spark → HDFS/Spark, as shown in Figure 9-6.

Figure 9-6.
Client → MQ → Flume/Spark → HDFS/Spark

The advantage here is that we have decoupled our client from Flume and Spark. Note the two potential paths in this diagram: Spark can read either from Flume or from the MQ system. Both would work, but the advantage of having Spark receive events from Flume is that we could filter events before sending them on to Spark, reducing the load on our microbatching system.

This architecture is simple, and integrating Kafka can bring additional advantages. So let’s look at some of these options, starting with client → Kafka → Flume/Spark, as shown in Figure 9-7.
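With Kafka in the middle, the Flume side of this path is again mostly configuration. A sketch using Flume’s Kafka source is shown below; the broker addresses and topic name are hypothetical, and the property names shown are those of Flume 1.7 and later (Flume 1.6 used zookeeperConnect and topic instead):

```
# Hypothetical broker and topic names; requires Flume's Kafka source.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = broker1:9092,broker2:9092
a1.sources.r1.kafka.topics = fraud-alerts
a1.sources.r1.channels = c1
```

Because Kafka retains messages independently of its consumers, both Flume and Spark Streaming can read the same topic without coordinating with the client.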
