Performance Optimization of Spark-SQL

What is Spark and Spark SQL?

Spark is an open source, scalable, massively parallel, an in-memory execution environment for running analytics applications. Think of it as an in-memory layer that sits above multiple data stores, where data can be loaded into memory and analyzed in parallel across a cluster. Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

Note: dataset mentioned in the following discussion is not dataset API in spark. It’s a set of data.

There are N numbers of things responsible for bad or below average performance. I am going to touch base only things that I come across and how I addressed those issues.

When optimizing the performance of Spark-SQL or any other distributed computing framework it’s very very important to know your data (KYD) first. Because spark job may run exceptionally well with one set of data and really bad for other.  We are going to address painful joins in the following scenarios

  1. Joins between datasets with different cardinalities
  2. Skewed data joins

How joins works?

Following diagram works how normally joins works in spark-SQL. This join is also known as shuffle join. Because when we join two datasets, let’s say A & B, data for one key is brought on one executor. (Small boxes inside rectangle show executors on different machines).

In some of the scenarios above operation can be problematic. In worst it may result in complete data movement of both datasets. This may result in network congestions and also increase I/O. Also if there is a lot of data against one key (skewed data) it can result in job failure or at least terribly slow down job execution. In the following topics, we will see how to solve these problems.

  1. a) Joins between datasets with different cardinalities:

 Joins can be of two types. Map-side join and Reduce-side join.  Both joins perform good or bad depending on the datasets.

1) Consider, there are two datasets A and B. When we join A & B in spark automatically reduce-side join happens. That means spark shuffle the data for similar keys on the same executor.  This join does well when the distribution of the data is uniform across the join-keys (keys/fields on which joins are happening)

e.g. If I join df_A.join(df_B, [key1, key2]) distribution of records for both key1 and key2 in both sets play an important role. If records are distributed uniformly across all join keys in both datasets taking part in this join.

To improve the performance of reduce join we should join on the minimal datasets. Minimal can be obtained by filtering out unwanted records before joins instead of doing this after the join. This way we can avoid unnecessary shuffling of data, as records for only those keys will be shuffled which are actually needed in results.

2) Now consider A is big datasets and B is a really small dataset. Now, what qualifies for the small dataset? As per spark documentation, 2GB is max-threshold for the auto-broadcast parameter. I am kind of agree with this threshold because this data will be copied to each executor.  Datasets which few MB to 2-3 GB in size must be broadcasted.

This number depends on how big are the executors in the cluster. Also, roughly size of dataset * number of executors should not bigger than the bigger dataset. Otherwise using simple reduce side join will be efficient.  Following diagram shows how broadcast joins works. You can see on each executor where partition for A exists, entire B dataset is made available.

Enough theory!! How we can broadcast smaller dataset? It’s really simple. We just need to provide the broadcast hint to spark processing engine and that’s it. Following code snippet showcase how it can be done.

With both above approach check the shuffle size in spark-UI. Even if shuffle size is less, that does not mean amount of data-transferred is less. Might be in case of broadcast join we are moving the larger amount of data.  Try out both to check performance and adopt which suits best for your need.

  1. b) Skewed data joins:

 Consider we are joining dataset A and B. Skewed data means data with the un-uniform distribution of records across keys. More precisely huge %tage of data have very few numbers of keys. Now due spark processing the way joins are performed a large amount of data is collected on single or very few executors. This has following effects

  • Tasks with large data run for a very very long time. Ultimately jobs take longer to finish. Other executors sit idle during this time. So ineffective usage of resources.
  • If records for the key are too large, executors run out of memory and job fails.

Problem is explained/shown in the following diagram

So one obvious solution of that is to remove these keys with skewed data. But that may not be possible every time.  In such cases where it’s mandatory to process everything, we have two solutions

  • Broadcast smaller dataset, if one of data is smaller
  • If broadcasting is not possible, we have to split the data against skewed keys to more number of executors by adding the limited-range random number in join key in the dataset which has skewed records. Let’s call it A.  But doing this will produce wrong results as records for skewed keys from other datasets (let’s call it B) may not be available at executors where we have distributed records from A.

To handle this we will multiply data in set B by cross join with limited range number dataset (lets 1-30 for distributing it to 30 records). Also, the random number within 1-30 range to each record in set A. Now join A and B on original join key “key1” and “random_val”. This will avoid executors from being flooded with a large number of keys. But be careful if we increase range 1-30 too much it will grow the data B exponentially. This is explained in the following diagram

Here is how it can be done –

Conclusion:

  • Broadcast joins are really helpful in the case of one small and one big dataset joins. It can drastically improve performance as well as network utilization.
  • Datasets with skewed data can be handled efficiently by distributing data to different executors by adding the key to it. It can improve the performance and stability of the job run.

Setting up development environment for Google App Engine and Python

Google App Engine is a PAAS offering from Google Cloud Platform, which enables you to build complex web solutions with significant ease without worrying too much about the scalability or infrastructure management. If you want to develop GAE applications using python and looking for a way to setup your development environment then this post is for you. Continue reading Setting up development environment for Google App Engine and Python

Build a Custom Solr Filter to Handle Unit Conversions

Recently, I came across a use case where it was required to handle units of weight in the index. For instance, 2kg and 2000g, when searched should return the same set of results.

So, for achieving the above, I wrote a custom Solr filter that will work along with KeywordTokenizer to convert all units of weight in the incoming request to a single unit (g) and hence every measurement will be saved in the form of a number; at the same time, it will also keep units like kg/g/mg intact while returning the docs. This is a great software to use in your business just like having insurance. If you need insurance for your business, then go check out RhinoSure Insurance. Another thing that you should do is go to mein-parteibuch.com so you can get more customers on your company website. Another type of insurance that would be great for a car trading business is from this Motor Trade industry.

Firstly, we need to write custom tokenfilter and tokenfilterfactory .

UnitConversionFilter.java

[code language=”java”]

package com.solr.custom.filter.test;
import java.io.IOException;

import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
* @author SumeetS
*
*/
public class UnitConversionFilter extends TokenFilter{

private final CharTermAttribute termAtt = addAttribute(CharTermAttribute.class);

/**
* @param input
*/
public UnitConversionFilter(TokenStream input) {
super(input);
}

/* (non-Javadoc)
* @see org.apache.lucene.analysis.TokenStream#incrementToken()
*/
@Override
public boolean incrementToken() throws IOException {
if (input.incrementToken()) {
// charUtils.toLowerCase(termAtt.buffer(), 0, termAtt.length());
int length = termAtt.length();
String inputWt = termAtt.toString(); //assuming format to be 1kg/mg
float valInGrams = convertUnit(inputWt);
String storeFormat = valInGrams+””;
termAtt.setEmpty();
termAtt.copyBuffer(storeFormat.toCharArray(), 0, storeFormat.length());
return true;
} else
return false;
}

private float convertUnit(String field){
String [] tmp = field.split(“(k|m)?g”);
float weight = Integer.parseInt(tmp[0]);
String[] tmp2 = field.split(tmp[0]);
String unit = tmp2[1];
float convWt = 0;
switch(unit) {
case “kg”:
convWt = weight * 1000;
break;
case “mg”:
convWt = weight /1000;
break;
case “g”:
convWt = weight;
break;
}
return convWt;
}
}

[/code]

UnitConversionTokenFilterFactory.java

[code language=”java”]

package com.solr.custom.filter.test;
import java.util.Map;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.util.TokenFilterFactory;

/**
* @author SumeetS
*
*/
public class UnitConversionTokenFilterFactory extends TokenFilterFactory {

/**
* @param args
*/
public UnitConversionTokenFilterFactory(Map<String, String> args) {
super(args);
if (!args.isEmpty()) {
throw new IllegalArgumentException(“Unknown parameters: ” + args);
}
}

/* (non-Javadoc)
* @see org.apache.lucene.analysis.util.TokenFilterFactory#create(org.apache.lucene.analysis.TokenStream)
*/
@Override
public TokenStream create(TokenStream input) {
return new UnitConversionFilter(input);
}

}

[/code]

NOTE: When you override the TokenFilter and TokenFilterFactory, make sure to edit the protected constructors to public, otherwise it will throw NoSuchMethodException during plugin init.

Now, compile and export your above classes into a jar say customUnitConversionFilterFactory.jar

Steps to Deploy Your Jar Into Solr

1. Place your jar file under /lib

2. Make an entry in solrConfig.xml file to help it identify your custom jar.

[code language=”xml”]

<lib dir=”../../../lib/” regex=”.*\.jar” />

[/code]

3. Add custom fieldType and field in your schema.xml

[code language=”xml”]

<field name=”unitConversion” type=”unitConversion” indexed=”true” stored=”true”/>
<fieldType name=”unitConversion” class=”solr.TextField” positionIncrementGap=”100″>
<analyzer>
<tokenizer class=”solr.KeywordTokenizerFactory”/>
<filter class=”com.solr.custom.filter.test.UnitConversionTokenFilterFactory” />
</analyzer>
</fieldType>
[/code]

4. Now restart Solr and browse to the Solr console//documents

5. Add documents in your index like below:

{"id":"tmp1","unitConversion":"1000g"}
{"id":"tmp2","unitConversion":"2kg"}
{"id":"tmp3","unitConversion":"1kg"}

6. Query your index.

Query1 : querying for documents with 1kg

http://localhost:8983/solr/core1/select?q=*%3A*&fq=unitConversion%3A1kg&wt=json&indent=true

Result:

{
 "responseHeader":{
 "status":0,
 "QTime":0,
 "params":{
 "q":"*:*",
 "indent":"true",
 "fq":"unitConversion:1kg",
 "wt":"json"}},
 "response":{"numFound":2,"start":0,"docs":[
 {
 "id":"tmp1",
 "unitConversion":"1000g",
 "_version_":1524411029806645248},
 {
 "id":"tmp3",
 "unitConversion":"1kg",
 "_version_":1524411081738420224}]
 }}

Query2: querying for documents with 2kg

http://localhost:8983/solr/core1/select?q=*%3A*&fq=unitConversion%3A2kg&wt=json&indent=true

Result:

{
 "responseHeader":{
 "status":0,
 "QTime":0,
 "params":{
 "q":"*:*",
 "indent":"true",
 "fq":"unitConversion:2kg",
 "wt":"json"}},
 "response":{"numFound":1,"start":0,"docs":[
 {
 "id":"tmp2",
 "unitConversion":"2kg",
 "_version_":1524411089834475520}]
 }}

Query3: let’s try faceting

http://localhost:8983/solr/core1/select?q=*%3A*&rows=0&wt=json&indent=true&facet=true&facet.field=unitConversion

{
 "responseHeader":{
 "status":0,
 "QTime":1,
 "params":{
 "q":"*:*",
 "facet.field":"unitConversion",
 "indent":"true",
 "rows":"0",
 "wt":"json",
 "facet":"true"}},
 "response":{"numFound":335,"start":0,"docs":[]
 },
 "facet_counts":{
 "facet_queries":{},
 "facet_fields":{
 "unitConversion":[
 "1000.0",2,
 "2000.0",1]},
 "facet_dates":{},
 "facet_ranges":{},
 "facet_intervals":{},
 "facet_heatmaps":{}}}

This is just a basic implementation. One can add additional fields to identify the type of unit and then based on that decide the conversion.

Further improvements include handling of range queries along with the units.

For more info check us out in Social Media, we were recently able to Buy Instagram likes to improve our account.

Flexible Data Extraction from Multiple Sources for Analytics

Analytics systems are a huge demand with organizations that deal with massive data on a daily basis. Out of the many requirements that such organizations have with regards to analytics extraction, one in great demand is extraction of data having the same source but with some additional information (Columns) from it or a completely new source (Table).Here’s a case study of the process of building an analytics system for one of our clients who wanted to support analytics extraction based on the above requirement.

Continue reading Flexible Data Extraction from Multiple Sources for Analytics