Spark from R

Working with big data for modeling purposes can sometimes be challenging due to memory limits. In this post, we will see how to prepare a dataset and build a machine learning pipeline for modeling with Spark in R.

Turgut Abdullayev https://github.com/henry090 (AccessBank Azerbaijan) https://www.accessbank.az/en/
11-18-2019


What is Spark?

Apache Spark is an open-source system that is very effective for large-scale data processing. Spark has become much faster1 and now provides users with different tools2, such as Spark Core3, Spark SQL4, Spark Streaming5, MLlib6, and GraphX7.

So, in this post we will mainly concentrate on building a model by loading a dataset directly into Spark. But before building a pipeline for modeling, the power of the dplyr library8 will also be shown.

Where is the starting point?

R provides users with the sparklyr9 package, which is straightforward to pick up because it exposes a dplyr interface to data stored in Spark.

You can install the sparklyr package from CRAN as follows:


install.packages("sparklyr")

Then install Spark itself via:


library(sparklyr)
spark_install()
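If you need a particular Spark version, sparklyr can also list the versions it knows how to install; the version below is only an example:


# list installable Spark versions and pick one explicitly
spark_available_versions()
spark_install(version = "2.4.3")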

When that is done, we can begin building the data transformations for modeling. But one more step is required first: connecting to a local Spark instance.


library(sparklyr)
sc <- spark_connect(master = "local")
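If the default settings run out of memory on your machine, the connection can also be opened with a custom configuration. This is a minimal sketch; the 4G value is an assumption you should adjust to your hardware:


# give the local driver more memory before connecting (4G is an arbitrary example)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
sc <- spark_connect(master = "local", config = config)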

The dataset for this post comes from a binary classification Kaggle competition. The purpose is to find out whether a customer will make a transaction or not, and the target variable is called target. Let's print the top rows of the dataset.
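The df table used below is the competition's training file loaded into Spark. Here is a minimal sketch of how it could be read, assuming the CSV sits in the working directory under the hypothetical name train.csv:


# read the CSV straight into Spark; df is only a reference to the Spark table "train"
df <- spark_read_csv(sc, name = "train", path = "train.csv", header = TRUE)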


df
# Source: spark<train> [?? x 202]
   ID_code target var_0   var_1 var_2 var_3 var_4   var_5 var_6 var_7  var_8 var_9 var_10  
   <chr>    <int> <dbl>   <dbl> <dbl> <dbl> <dbl>   <dbl> <dbl> <dbl>  <dbl> <dbl>  <dbl>   
 1 train_0      0  8.93 -6.79   11.9   5.09 11.5   -9.28   5.12  18.6 -4.92   5.75  2.93    
 2 train_1      0 11.5  -4.15   13.9   5.39 12.4    7.04   5.62  16.5  3.15   8.09 -0.403    
 3 train_2      0  8.61 -2.75   12.1   7.89 10.6   -9.08   6.94  14.6 -4.92   5.95 -0.325 
 4 train_3      0 11.1  -2.15    8.95  7.20 12.6   -1.84   5.84  14.9 -5.86   8.24  2.31   
 5 train_4      0  9.84 -1.48   12.9   6.64 12.3    2.45   5.94  19.3  6.27   7.68 -9.45 
 6 train_5      0 11.5  -2.32   12.6   8.63 11.0    3.56   4.53  15.2  3.59   5.98  0.801  
 7 train_6      0 11.8  -0.0832  9.35  4.29 11.1   -8.02   6.20  12.1 -4.38   7.92 -5.13  
 8 train_7      0 13.6  -7.99   13.9   7.60  8.65   0.831  5.69  22.3  5.06   7.20  1.45   
 9 train_8      0 16.1   2.44   13.9   5.63  8.80   6.16   4.45  10.2 -3.19   9.08  0.950  
10 train_9      0 12.5   1.97    8.90  5.45 13.6  -16.3    6.06  16.8  0.129  7.97  0.879  

Spark offers a lot of features, but here we focus only on the data manipulation and machine learning (ML) parts. So, let's write a few preprocessing steps for our data and then work on the modeling part.

Data preparation

sparklyr has some great feature-engineering functions, such as data bucketing (similar to base R's cut), quantile bucketing, NA imputation, and more. But one can ask: "What about grouping the data and then finding a mean?" As mentioned, dplyr can fully meet your needs.

For example:


library(rmarkdown)
library(dplyr)

# group by var_196, compute the mean of var_197, and collect the result into R
df %>% group_by(var_196) %>% summarise(mean_197 = mean(var_197, na.rm = TRUE)) %>% collect()

# correlation matrix of var_1 ... var_30, computed inside Spark
corr = sparklyr::ml_corr(df, columns = paste('var', seq(1, 30, 1), sep = '_'))

paged_table(corr, options = list(rows.print = 10))
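As mentioned above, sparklyr also offers NA imputation. A minimal sketch with ft_imputer(), purely for illustration; the output column var_1_imputed is a hypothetical name:


# fill missing values of var_1 with the column mean
df %>% 
  ft_imputer(input_cols = "var_1", output_cols = "var_1_imputed", strategy = "mean")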

You can even quickly plot a heatmap of some variables.


# cor plot
#cor_data = sparklyr::ml_corr(df, columns = paste('var',seq(1,30,1),sep = '_'))

#highcharter::hchart(round(as.matrix(cor_data),2), label = T)

We can also apply a binarizer and a quantile bucketizer. The first turns a numeric vector into a binary one based on a cutoff point, while the latter applies quantile-based cutoffs. With the binarizer, the threshold point is assigned manually.


# binarizer: var_0_binary is 1 when var_0 is above the threshold of 10, otherwise 0
ft_binarizer(df, input_col = 'var_0', output_col = 'var_0_binary', threshold = 10) %>% 
  select(var_0, var_0_binary)

# quantile bucketing: split var_0 into 10 quantile-based buckets
ft_quantile_discretizer(df, input_col = 'var_0', output_col = 'var_0_bucket',
                        num_buckets = 10) %>% 
  select(var_0, var_0_bucket)
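If you prefer to set the cutoff points entirely by hand instead of letting quantiles decide, ft_bucketizer() accepts an explicit vector of splits. A sketch with arbitrary split points:


# bucket var_0 using manually chosen boundaries
ft_bucketizer(df, input_col = 'var_0', output_col = 'var_0_bucket_manual',
              splits = c(-Inf, 5, 10, 15, Inf)) %>% 
  select(var_0, var_0_bucket_manual)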

Modeling part

Let's build a pipeline for the modeling part. At first we only describe the steps; later, when we are sure that every step is correct, the training process can be run.


# point out the source of connection "sc"
bank_pipeline <- ml_pipeline(sc) %>%
  # then select  data
  ft_dplyr_transformer(
    tbl = df %>% select(-contains('ID'))
  ) %>%
  # binarize var_0 column and assign threshold point for that
  ft_binarizer(
    input_col = "var_0",
    output_col = "var_0_binary",
    threshold = 10
  ) %>%
  # later fit an R formula
  ft_r_formula(formula = target ~ .) %>% 
  # and finally, set the parameters of a logistic regression or gradient boosting model
  #ml_logistic_regression()
  ml_gbt_classifier(max_depth = 20, max_iter = 3, max_bins = 2^10)

Let's see the output of the pipeline steps:


bank_pipeline 

It seems that everything is okay for now. Let's split the dataset into training and test sets:


# split off small training (3%) and testing (7%) fractions; the remaining 80% is set aside
partitioned_bank <- sdf_random_split(
  df,
  training = 0.03,
  testing = 0.07,
  rest = 0.8
)
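As a quick sanity check, we can count how many rows ended up in each partition; sdf_nrow() does the counting inside Spark:


# row counts of the partitions
sdf_nrow(partitioned_bank$training)
sdf_nrow(partitioned_bank$testing)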

Finally, fit our model, which actually runs the computation:


bank_pipeline2 <- ml_fit(
  bank_pipeline,
  partitioned_bank$training
)

We can print the fitted pipeline:


bank_pipeline2 
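If the fitted pipeline should be reused in another session, it can be written to disk and loaded back. A minimal sketch; the path is hypothetical:


# persist the fitted pipeline model and reload it later
ml_save(bank_pipeline2, "models/bank_pipeline_model", overwrite = TRUE)
bank_pipeline_reloaded <- ml_load(sc, "models/bank_pipeline_model")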

If you want to make predictions on the test dataset, the ml_transform function is enough for this task:


predictions <- ml_transform(
  bank_pipeline2,
  partitioned_bank$testing
)

predictions %>%
  group_by(target, prediction) %>%
  tally()
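The tally above gives a confusion-matrix-style count. For a single quality metric we could also compute the area under the ROC curve; this sketch assumes the default label and rawPrediction columns created by ft_r_formula() and the classifier:


# AUC of the model on the test partition
ml_binary_classification_evaluator(
  predictions,
  label_col = "label",
  raw_prediction_col = "rawPrediction",
  metric_name = "areaUnderROC"
)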

Conclusion

In this post, we have shown how to build a model using Spark from R. If you have questions or suggestions/corrections, please let us know.


  1. Faster than previous approaches to work with Big Data like classical MapReduce↩︎

  2. Spark↩︎

  3. Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built upon. It provides in-memory computing and the ability to reference datasets in external storage systems.↩︎

  4. Spark SQL is Apache Spark’s module for working with structured data. The interfaces offered by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.↩︎

  5. This component allows Spark to process real-time streaming data. Data can be ingested from many sources like Kafka, Flume, and HDFS (Hadoop Distributed File System). Then the data can be processed using complex algorithms and pushed out to file systems, databases, and live dashboards.↩︎

  6. Apache Spark is equipped with a rich library known as MLlib. This library contains a wide array of machine learning algorithms: classification, regression, clustering, and collaborative filtering. It also includes other tools for constructing, evaluating, and tuning ML pipelines. All these functionalities help Spark scale out across a cluster.↩︎

  7. Spark also comes with a library to manipulate graph databases and perform computations called GraphX. GraphX unifies ETL (Extract, Transform, and Load) process, exploratory analysis, and iterative graph computation within a single system.↩︎

  8. dplyr is a grammar of data manipulation, providing a consistent set of verbs that help you solve the most common data manipulation challenges, such as mutate(), select(), filter(), summarise(), and arrange().↩︎

  9. github source↩︎

Reuse

Text and figures are licensed under Creative Commons Attribution CC BY 4.0. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".

Citation

For attribution, please cite this work as

Abdullayev (2019, Nov. 18). Data Experts: Spark from R. Retrieved from http://dataexperts.tech/posts/2020-01-21-sparkr/

BibTeX citation

@misc{abdullayev2019spark,
  author = {Abdullayev, Turgut},
  title = {Data Experts: Spark from R},
  url = {http://dataexperts.tech/posts/2020-01-21-sparkr/},
  year = {2019}
}