An Introduction to Flink and Better Batch Processing

What is batch processing?  

Big data refers to volumes of data large enough to overwhelm companies and their operations. A constant influx of it is necessary to gain insights into markets and trends, without which developing strategies becomes difficult. The incoming data can be either finite or infinite. Because it arrives mostly from raw sources, it requires proper analysis before it is useful. The process of extracting information from a finite amount of data is called batch data processing, or batch processing.

What is Flink and why should we use it?  

Flink is a framework and distributed processing engine for batch and stream data processing. Its architecture lets it process both finite (bounded) data sets and infinite (unbounded) streams of data.

Flink has several advantages:

  • It provides a high-throughput, low-latency streaming engine
  • It supports event-time processing and state management
  • The system is fault-tolerant and guarantees exactly-once processing
  • It provides data source/sink connectors to read data from and write data to external systems such as Kafka, HDFS, and Cassandra

Overview of batch processing  

Batch processing applications generally have a straightforward structure. First, we read data from an external data source, such as a distributed file system or a database. The Flink application then processes the data, and the result is written back to an external data sink, again such as a distributed file system or a database.

DataSet API  

A DataSet is a handle for processing distributed data in the Flink cluster. It holds elements of the same type and is immutable.

Read data  

Flink provides several methods to read data into a DataSet. Here are some of the commonly used ones:

Method   Description  
readCsvFile   Read a file in CSV format
readTextFile   Read a raw text file
fromElements / fromCollection   Create a DataSet from a collection of records
readFile (inputFormat, file)   Read a file using a custom FileInputFormat type
createInput (inputFormat)   Read data from a custom data source

Transform data  

Flink has several methods to transform the elements within a DataSet. Here are some of the commonly used ones:

Method   Description
map   Convert every input record into another object
filter   Keep only the records that satisfy a condition
flatMap   Transform each record into zero or more elements
groupBy   Group records based on a key taken from the element
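To get a feel for what these four transformations do, they can be mimicked on an in-memory list. The sketch below uses plain `java.util.stream`, not the Flink API, so it is only an analogy: the class name `TransformSketch` and the sample lines are made up for illustration, and a real Flink job would run the same steps distributed across the cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogy for map / filter / flatMap / groupBy (not Flink code).
class TransformSketch {

    static List<String> words(List<String> lines) {
        return lines.stream()
                // flatMap: one line becomes zero or more words
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                // filter: keep only the records that satisfy the condition
                .filter(word -> !word.isEmpty())
                // map: exactly one output record per input record
                .map(String::toLowerCase)
                .collect(Collectors.toList());
    }

    // groupBy: bring equal keys together, then count each group.
    static Map<String, Long> countByWord(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Flink batch", "", "flink stream");
        System.out.println(countByWord(words(lines))); // {batch=1, flink=2, stream=1}
    }
}
```

The empty line in the sample shows the "zero or more" part of flatMap: it contributes no words at all after the filter step.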

Flink also provides methods that operate on multiple DataSets. Here are some of the commonly used ones:

Method   Description
join   Join two datasets based on a common key from those datasets
outerJoin   Outer join of two datasets
cross   Cross-product of two datasets

Write data  

Flink has several methods to write a DataSet result to an external sink.

Method   Description
writeAsText   Write result as text
writeAsCsv   Write result in CSV format
print/printToErr   Write data to standard output or standard error
write   Write result to a file using a common FileOutputFormat
output   Write result to a custom output using an OutputFormat
collect   Convert result into a List datatype

Processing workflow  

Imagine we want to process a songs.csv file in which each record has song id, title, and singers fields, and we want to get the list of songs sung by ‘Taylor Swift’.

songs.csv: https://gist.github.com/sagargangurde/78c02b1370fb1cdeb8b3a4ec089ff4be     

FilterSongs.java: https://gist.github.com/sagargangurde/d2ebbe257837c6909d0a824ec4d6df4e   

First, we read songs.csv using the readCsvFile() method, which returns a DataSet containing all songs. Next, we use the map() method to convert each DataSet record to a song object. Then we apply the filter() method to keep only the songs sung by ‘Taylor Swift’, and write the result in text format using the writeAsText() method.
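The actual Flink job lives in the linked FilterSongs.java. As a rough, stand-alone illustration of the same read, map, filter, write sequence, here is a plain-Java sketch. It does not use Flink at all; the `Song` class, the sample rows, and the assumption that each line is a simple `id,title,singers` triple without quoted commas are mine, not taken from the gist.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch of the read -> map -> filter -> write steps (not the Flink job itself).
class FilterSongsSketch {

    // Hypothetical record type; the fields follow the description of songs.csv.
    static final class Song {
        final int id;
        final String title;
        final String singers;

        Song(int id, String title, String singers) {
            this.id = id;
            this.title = title;
            this.singers = singers;
        }

        @Override
        public String toString() {
            return id + "," + title + "," + singers;
        }
    }

    // "map" step: assumes simple comma-separated lines without quoted commas.
    static Song parse(String csvLine) {
        String[] parts = csvLine.split(",", 3);
        return new Song(Integer.parseInt(parts[0].trim()), parts[1].trim(), parts[2].trim());
    }

    // "filter" step, followed by turning every remaining song into one text line.
    static List<String> songsBy(List<String> csvLines, String singer) {
        return csvLines.stream()
                .map(FilterSongsSketch::parse)
                .filter(song -> song.singers.contains(singer))
                .map(Song::toString)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample rows, not the contents of the linked songs.csv.
        List<String> lines = Arrays.asList(
                "1,Song A,Taylor Swift",
                "2,Song B,Some Other Singer",
                "3,Song C,Taylor Swift & Guest");
        songsBy(lines, "Taylor Swift").forEach(System.out::println);
    }
}
```

Matching with `contains` keeps songs where the singer appears alongside others; an exact comparison would be the stricter alternative.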

Conclusion

Several tools address problems related to batch processing, but the Flink DataSet API has specific strengths that lead developers to choose it often. It is easy to work with, and its wide range of applications makes it an attractive option. Its flexibility and affordability are further reasons developers opt for it.

Performance Optimization of Spark-SQL

What is Spark and Spark SQL?

Spark is an open-source, scalable, massively parallel, in-memory execution environment for running analytics applications. Think of it as an in-memory layer that sits above multiple data stores, where data can be loaded into memory and analyzed in parallel across a cluster. Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

Note: "dataset" in the following discussion does not mean the Dataset API in Spark. It simply means a set of data.

Any number of things can be responsible for poor or below-average performance. I am going to touch only on the issues I have come across and how I addressed them.

When optimizing the performance of Spark SQL, or any other distributed computing framework, it is very important to know your data (KYD) first, because a Spark job may run exceptionally well on one set of data and really badly on another. We are going to address painful joins in the following scenarios:

  1. Joins between datasets with different cardinalities
  2. Skewed data joins

How do joins work?

The following diagram shows how joins normally work in Spark SQL. This kind of join is also known as a shuffle join, because when we join two datasets, say A and B, all the data for a given key is brought to one executor. (The small boxes inside the rectangle show executors on different machines.)

In some scenarios the above operation can be problematic. In the worst case it may result in complete data movement of both datasets, which can cause network congestion and increase I/O. Also, if there is a lot of data for one key (skewed data), it can result in job failure or at least slow job execution down terribly. In the following topics, we will see how to solve these problems.

  1. a) Joins between datasets with different cardinalities:

 Joins can be of two types: map-side joins and reduce-side joins. Each performs well or badly depending on the datasets.

1) Consider two datasets, A and B. When we join A and B in Spark, a reduce-side join happens automatically. That means Spark shuffles the data so that records with the same key land on the same executor. This join does well when the data is distributed uniformly across the join keys (the keys/fields on which the join happens).

For example, in df_A.join(df_B, [key1, key2]) the distribution of records for both key1 and key2 in both datasets plays an important role. The join performs well if records are distributed uniformly across all join keys in both datasets taking part in it.

To improve the performance of a reduce-side join, we should join the smallest possible datasets. We get there by filtering out unwanted records before the join instead of after it. This avoids unnecessary shuffling, because only records for keys that are actually needed in the result are shuffled.

2) Now consider that A is a big dataset and B is a really small one. What qualifies as a small dataset? Spark broadcasts automatically only below the auto-broadcast threshold (spark.sql.autoBroadcastJoinThreshold, 10 MB by default). As far as I understand the Spark documentation, about 2 GB is the upper limit for that parameter, and I more or less agree with this limit because the data is copied to every executor. Datasets from a few MB up to 2-3 GB in size are good candidates for broadcasting.

This number depends on how big the executors in the cluster are. Also, as a rough rule, size of the small dataset * number of executors should not be bigger than the size of the bigger dataset; otherwise a simple reduce-side join will be more efficient. The following diagram shows how a broadcast join works. You can see that on each executor where a partition of A exists, the entire B dataset is made available.

Enough theory! How can we broadcast the smaller dataset? It’s really simple. We just need to give the broadcast hint to the Spark processing engine, and that’s it. The following code snippet shows how it can be done.
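In Spark itself, the hint is given by wrapping the smaller DataFrame in the `broadcast` function from the SQL functions module before joining. To show what that buys us without needing a cluster, here is a minimal plain-Java simulation of a broadcast (map-side) join. It is not Spark code: the class name, the sample rows, and the simplification that B has unique keys and fits in a `HashMap` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a broadcast (map-side) join; not Spark code.
class BroadcastJoinSketch {

    // Joins one partition of the big dataset A against a full local copy of the
    // small dataset B. Rows of A are {key, value} pairs; B maps key -> value
    // (assumes unique keys in B). Inner-join semantics.
    static List<String> joinPartition(List<String[]> partitionOfA, Map<String, String> broadcastB) {
        List<String> out = new ArrayList<>();
        for (String[] row : partitionOfA) {
            String match = broadcastB.get(row[0]); // local lookup, A is never shuffled
            if (match != null) {
                out.add(row[0] + "," + row[1] + "," + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> smallB = new HashMap<>();
        smallB.put("IN", "India");
        smallB.put("US", "United States");

        // Two simulated executors, each holding one partition of A and the whole of B.
        List<List<String[]>> partitionsOfA = Arrays.asList(
                Arrays.asList(new String[] {"IN", "order-1"}, new String[] {"US", "order-2"}),
                Arrays.asList(new String[] {"IN", "order-3"}, new String[] {"FR", "order-4"}));

        for (List<String[]> partition : partitionsOfA) {
            System.out.println(joinPartition(partition, smallB));
        }
    }
}
```

Every partition of A is joined where it already sits, so the only data that travels is one copy of B per executor. That is exactly why the size of B multiplied by the number of executors matters.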

With both of the above approaches, check the shuffle size in the Spark UI. Even if the shuffle size is smaller, that does not mean less data was transferred: with a broadcast join we might actually be moving a larger amount of data. Try out both, compare performance, and adopt whichever suits your needs best.

  1. b) Skewed data joins:

 Consider that we are joining datasets A and B. Skewed data means data with a non-uniform distribution of records across keys. More precisely, a huge percentage of the records belongs to a very small number of keys. Because of the way Spark performs joins, a large amount of data is then collected on a single executor or on very few executors. This has the following effects:

  • Tasks with large amounts of data run for a very long time, so jobs ultimately take longer to finish. Other executors sit idle during this time, which is an ineffective use of resources.
  • If the records for a key are too large, executors run out of memory and the job fails.

The problem is shown in the following diagram.

One obvious solution is to remove the keys with skewed data, but that may not always be possible. In cases where it is mandatory to process everything, we have two solutions:

  • Broadcast the smaller dataset, if one of the datasets is small enough
  • If broadcasting is not possible, we have to spread the data for the skewed keys over more executors by adding a random number from a limited range to the join key in the dataset that has the skewed records. Let’s call it A. Done on its own, this produces wrong results, because the records for the skewed keys from the other dataset (let’s call it B) may not be available on the executors to which we have distributed the records from A.

To handle this, we multiply the data in set B by cross joining it with a dataset of numbers from a limited range (say 1-30, which turns each record into 30 records). We also add a random number within the 1-30 range to each record in set A. Now we join A and B on the original join key “key1” and on “random_val”. This keeps individual executors from being flooded with a large number of records for a single key. But be careful: B grows in proportion to the size of the range (30 times for 1-30), so increasing the range too much makes B very large. This is explained in the following diagram.

Here is how it can be done –
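As a stand-in that runs without a cluster, the sketch below simulates the salting technique in plain Java. It is not Spark code and not the original snippet: the class name, the `#` separator used to build the combined key (which assumes keys never contain `#`), the sample data, and the simplification that B has unique keys are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java simulation of a salted join for skewed keys; not Spark code.
class SaltedJoinSketch {

    // Set B: every (key, value) row is replicated once per salt 1..saltRange,
    // which is what the cross join with the number dataset does.
    static Map<String, String> replicateB(Map<String, String> b, int saltRange) {
        Map<String, String> salted = new HashMap<>();
        for (Map.Entry<String, String> e : b.entrySet()) {
            for (int salt = 1; salt <= saltRange; salt++) {
                salted.put(e.getKey() + "#" + salt, e.getValue());
            }
        }
        return salted;
    }

    // Set A: each row gets a random salt in 1..saltRange appended to its join key.
    static List<String> saltA(List<String> keysOfA, int saltRange, Random random) {
        List<String> salted = new ArrayList<>();
        for (String key : keysOfA) {
            salted.add(key + "#" + (1 + random.nextInt(saltRange)));
        }
        return salted;
    }

    // Join on (key1, random_val): returns the matched B value for every matching row of A.
    static List<String> join(List<String> saltedA, Map<String, String> saltedB) {
        List<String> out = new ArrayList<>();
        for (String saltedKey : saltedA) {
            String value = saltedB.get(saltedKey);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int saltRange = 30;
        List<String> a = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            a.add("hot"); // the skewed key
        }
        a.add("cold");

        Map<String, String> b = new HashMap<>();
        b.put("hot", "H");
        b.put("cold", "C");

        List<String> saltedA = saltA(a, saltRange, new Random(42));
        Map<String, String> saltedB = replicateB(b, saltRange);

        System.out.println("joined rows: " + join(saltedA, saltedB).size());     // 1001
        System.out.println("rows in B after replication: " + saltedB.size());    // 60
    }
}
```

The join still returns one row per record of A, so the result is unchanged, while the 1000 "hot" records are now spread over up to 30 distinct join keys instead of one. The price is visible in the second line of output: B has grown from 2 rows to 60.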

Conclusion:

  • Broadcast joins are really helpful when joining one small and one big dataset. They can drastically improve performance as well as network utilization.
  • Datasets with skewed data can be handled efficiently by spreading the data over different executors, which is done by adding a random value to the join key. This can improve the performance and stability of the job run.

Setting up development environment for Google App Engine and Python

Google App Engine is a PaaS offering from Google Cloud Platform that enables you to build complex web solutions with ease, without worrying too much about scalability or infrastructure management. If you want to develop GAE applications using Python and are looking for a way to set up your development environment, then this post is for you.

Build a Custom Solr Filter to Handle Unit Conversions

Recently, I came across a use case where it was required to handle units of weight in the index. For instance, 2kg and 2000g, when searched should return the same set of results.

To achieve this, I wrote a custom Solr filter that works along with KeywordTokenizer to convert all units of weight in the incoming request to a single unit (g), so every measurement is indexed as a number. At the same time, units like kg/g/mg are kept intact in the stored value when the docs are returned.
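The conversion at the heart of the filter can be tried on its own, without Solr or Lucene on the classpath. The sketch below is a stand-alone variant, not the filter itself: the class name `WeightNormalizer` is hypothetical, and the support for decimals, optional whitespace, and upper-case units goes beyond what the filter in this post handles.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-alone sketch of the weight normalization; no Solr or Lucene classes involved.
class WeightNormalizer {

    // A number (optionally with decimals) followed by kg, g or mg.
    private static final Pattern WEIGHT = Pattern.compile("(\\d+(?:\\.\\d+)?)\\s*(kg|g|mg)");

    // Returns the weight in grams; rejects anything that is not <number><unit>.
    static double toGrams(String input) {
        Matcher m = WEIGHT.matcher(input.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported weight: " + input);
        }
        double value = Double.parseDouble(m.group(1));
        switch (m.group(2)) {
            case "kg": return value * 1000;
            case "mg": return value / 1000;
            default:   return value; // "g"
        }
    }

    public static void main(String[] args) {
        System.out.println(toGrams("2kg"));   // 2000.0
        System.out.println(toGrams("2000g")); // 2000.0
        System.out.println(toGrams("500mg")); // 0.5
    }
}
```

Because 2kg and 2000g normalize to the same number, a search for either one matches documents indexed with the other.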

First, we need to write a custom TokenFilter and TokenFilterFactory.

UnitConversionFilter.java

[code language="java"]

package com.solr.custom.filter.test;

import java.io.IOException;

import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * Rewrites a weight token such as "2kg" into its value in grams ("2000.0").
 *
 * @author SumeetS
 */
public class UnitConversionFilter extends TokenFilter {

    private final CharTermAttribute termAtt = addAttribute(CharTermAttribute.class);

    /**
     * @param input the upstream token stream (here: the KeywordTokenizer)
     */
    public UnitConversionFilter(TokenStream input) {
        super(input);
    }

    /* (non-Javadoc)
     * @see org.apache.lucene.analysis.TokenStream#incrementToken()
     */
    @Override
    public boolean incrementToken() throws IOException {
        if (!input.incrementToken()) {
            return false;
        }
        String inputWt = termAtt.toString(); // assumed format: <integer><unit>, e.g. 1kg or 500mg
        float valInGrams = convertUnit(inputWt);
        String storeFormat = String.valueOf(valInGrams);
        // Replace the token text with the normalized value.
        termAtt.setEmpty();
        termAtt.copyBuffer(storeFormat.toCharArray(), 0, storeFormat.length());
        return true;
    }

    /** Converts a weight such as "2kg" to grams. */
    private float convertUnit(String field) {
        // Everything before the unit is the number, the rest is the unit.
        String number = field.split("(k|m)?g")[0];
        String unit = field.substring(number.length());
        float weight = Integer.parseInt(number);
        switch (unit) {
            case "kg":
                return weight * 1000;
            case "mg":
                return weight / 1000;
            case "g":
                return weight;
            default:
                return 0; // unrecognized suffix, same result as the original fall-through
        }
    }
}

[/code]

UnitConversionTokenFilterFactory.java

[code language="java"]

package com.solr.custom.filter.test;

import java.util.Map;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.util.TokenFilterFactory;

/**
 * @author SumeetS
 */
public class UnitConversionTokenFilterFactory extends TokenFilterFactory {

    /**
     * @param args
     */
    public UnitConversionTokenFilterFactory(Map<String, String> args) {
        super(args);
        if (!args.isEmpty()) {
            throw new IllegalArgumentException("Unknown parameters: " + args);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.lucene.analysis.util.TokenFilterFactory#create(org.apache.lucene.analysis.TokenStream)
     */
    @Override
    public TokenStream create(TokenStream input) {
        return new UnitConversionFilter(input);
    }

}

[/code]

NOTE: When you extend TokenFilter and TokenFilterFactory, make sure to change the protected constructors to public; otherwise Solr will throw NoSuchMethodException during plugin init.

Now compile the above classes and export them into a jar, say customUnitConversionFilterFactory.jar.

Steps to Deploy Your Jar Into Solr

1. Place your jar file under /lib

2. Make an entry in the solrconfig.xml file to help Solr find your custom jar.

[code language="xml"]

<lib dir="../../../lib/" regex=".*\.jar" />

[/code]

3. Add custom fieldType and field in your schema.xml

[code language="xml"]

<field name="unitConversion" type="unitConversion" indexed="true" stored="true"/>
<fieldType name="unitConversion" class="solr.TextField" positionIncrementGap="100">
  <analyzer>
    <tokenizer class="solr.KeywordTokenizerFactory"/>
    <filter class="com.solr.custom.filter.test.UnitConversionTokenFilterFactory" />
  </analyzer>
</fieldType>
[/code]

4. Now restart Solr and browse to the Solr console//documents

5. Add documents in your index like below:

{"id":"tmp1","unitConversion":"1000g"}
{"id":"tmp2","unitConversion":"2kg"}
{"id":"tmp3","unitConversion":"1kg"}

6. Query your index.

Query1 : querying for documents with 1kg

http://localhost:8983/solr/core1/select?q=*%3A*&fq=unitConversion%3A1kg&wt=json&indent=true

Result:

{
 "responseHeader":{
 "status":0,
 "QTime":0,
 "params":{
 "q":"*:*",
 "indent":"true",
 "fq":"unitConversion:1kg",
 "wt":"json"}},
 "response":{"numFound":2,"start":0,"docs":[
 {
 "id":"tmp1",
 "unitConversion":"1000g",
 "_version_":1524411029806645248},
 {
 "id":"tmp3",
 "unitConversion":"1kg",
 "_version_":1524411081738420224}]
 }}

Query2: querying for documents with 2kg

http://localhost:8983/solr/core1/select?q=*%3A*&fq=unitConversion%3A2kg&wt=json&indent=true

Result:

{
 "responseHeader":{
 "status":0,
 "QTime":0,
 "params":{
 "q":"*:*",
 "indent":"true",
 "fq":"unitConversion:2kg",
 "wt":"json"}},
 "response":{"numFound":1,"start":0,"docs":[
 {
 "id":"tmp2",
 "unitConversion":"2kg",
 "_version_":1524411089834475520}]
 }}

Query3: let’s try faceting

http://localhost:8983/solr/core1/select?q=*%3A*&rows=0&wt=json&indent=true&facet=true&facet.field=unitConversion

{
 "responseHeader":{
 "status":0,
 "QTime":1,
 "params":{
 "q":"*:*",
 "facet.field":"unitConversion",
 "indent":"true",
 "rows":"0",
 "wt":"json",
 "facet":"true"}},
 "response":{"numFound":335,"start":0,"docs":[]
 },
 "facet_counts":{
 "facet_queries":{},
 "facet_fields":{
 "unitConversion":[
 "1000.0",2,
 "2000.0",1]},
 "facet_dates":{},
 "facet_ranges":{},
 "facet_intervals":{},
 "facet_heatmaps":{}}}

This is just a basic implementation. One can add additional fields to identify the type of unit and then based on that decide the conversion.

Further improvements include handling of range queries along with the units.


Flexible Data Extraction from Multiple Sources for Analytics

Analytics systems are in huge demand among organizations that deal with massive data on a daily basis. Of the many requirements such organizations have for analytics extraction, one in great demand is the extraction of data from the same source with some additional information (columns), or from a completely new source (table). Here’s a case study of the process of building an analytics system for one of our clients, who wanted to support analytics extraction based on the above requirement.
