Question 21: Which function (in Scala and Python) would you use to create a PairRDD?

Answer: We can use the map() function to create a PairRDD. For example, if each record x has two values, a PairRDD can be created as below (Scala).

map(x => (x(0), x(1)))
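A roughly equivalent sketch in Python (PySpark) would be as below, assuming sc is an existing SparkContext and each record x is a two-element sequence (the sample data is illustrative only).

# records stands in for any RDD whose elements have two fields
records = sc.parallelize([["hadoop", "5000"], ["spark", "7000"]])
pair_rdd = records.map(lambda x: (x[0], x[1]))   # (key, value) tuples form a PairRDD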

 

Question 22: Please explain how combineByKey() works?

Answer: Let's first understand that combineByKey() takes three function arguments. We need to understand the purpose of each one.

  • createCombiner: It runs locally on each node the first time a key is seen in a partition, turning that key's value into the initial combined value.
  • mergeValue: It merges each subsequent value for the same key into the combined value, locally on each node.
  • mergeCombiners: It merges the combined values across the nodes to generate the final expected result.
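As a sketch, the three functions plug into combineByKey() as below (pair_rdd is assumed to be an existing RDD of (key, value) pairs; these are the lambdas used in the worked example that follows).

sum_count = pair_rdd.combineByKey(
    lambda value: (value, 1),                                   # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners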

Let’s take an example. We have the following dataset (three different keys, nine key-value pairs in total).

(hadoop, 5000), (spark, 7000), (NiFi, 6000), (hadoop, 7000), (spark, 9000), (NiFi, 4000), (hadoop, 7500), (spark, 9500), (NiFi, 4500)

Now we want to calculate the average for each key. To do that, we need to sum all the values for the same key and also count the occurrences of each key. Hence, the intermediate result should look as below: (key, (totalForEachDistinctKey, count)).

(hadoop, (19500,3))

(spark, (25500,3))

(NiFi, (14500, 3))

This is what is expected when you execute combineByKey() on the given dataset.

Keep in mind that Spark works on a cluster, so each node works independently on a different subset of the data. Hence, data first needs to be combined locally on each node. Let's assume we have a two-node cluster and the data is partitioned as below.

Node-1: (hadoop, 5000), (spark, 7000), (NiFi, 6000), (hadoop, 7000), (spark, 9000)

Node-2: (NiFi, 4000), (hadoop, 7500), (spark, 9500), (NiFi, 4500)

 

Step-1: On each node, the combiner works locally.

On each node the local combiner should generate data as below. This is the responsibility of the first argument, createCombiner (lambda x: (x, 1)); here x is the value part of the key-value pair.

Node-1:
(hadoop, (5000, 1))
(spark, (7000, 1))
(NiFi, (6000, 1))
(hadoop, (7000, 1))
(spark, (9000, 1))

Node-2:
(NiFi, (4000, 1))
(hadoop, (7500, 1))
(spark, (9500, 1))
(NiFi, (4500, 1))

 

Step-2: In the next step, merge all the values locally on each node as below. You can use the following lambda function for that:

lambda valuecountpair, val: (valuecountpair[0] + val, valuecountpair[1] + 1)

 

Node-1:
(hadoop, (12000, 2))
(spark, (16000, 2))
(NiFi, (6000, 1))

Node-2:
(NiFi, (8500, 2))
(hadoop, (7500, 1))
(spark, (9500, 1))

 

Step-3: As you can see, all the values have now been combined locally on each node. Next we need to aggregate these values across the nodes. We will use mergeCombiners for that, which combines the per-node results. The lambda function would be as below:

lambda valuecountpair, nextvaluecountpair: (valuecountpair[0] + nextvaluecountpair[0], valuecountpair[1] + nextvaluecountpair[1])

 

(hadoop, (19500,3))

(spark, (25500,3))

(NiFi, (14500,3))
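Putting the three steps together, a minimal runnable PySpark sketch of this walkthrough could look as below (the two local partitions stand in for the two nodes; variable names are illustrative).

from pyspark import SparkContext

sc = SparkContext("local[2]", "combineByKeyAverage")   # two partitions to mimic the two nodes

data = sc.parallelize(
    [("hadoop", 5000), ("spark", 7000), ("NiFi", 6000),
     ("hadoop", 7000), ("spark", 9000), ("NiFi", 4000),
     ("hadoop", 7500), ("spark", 9500), ("NiFi", 4500)],
    numSlices=2)

sum_count = data.combineByKey(
    lambda value: (value, 1),                                   # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners

averages = sum_count.mapValues(lambda pair: pair[0] / pair[1])

print(sum_count.collect())   # e.g. [('hadoop', (19500, 3)), ('spark', (25500, 3)), ('NiFi', (14500, 3))]
print(averages.collect())    # e.g. [('hadoop', 6500.0), ('spark', 8500.0), ('NiFi', 4833.33...)]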

 

Question 23: Which shared variables are provided by the Spark framework?

Answer: In Spark, shared variables are the variables that provide data sharing globally across the nodes. This can be implemented using the below two variable types, each with a different purpose.

  • Broadcast variables: Read-only variables cached on each node of the cluster. These variables cannot be updated on an individual node. They are meant for the same data that you want to share across the nodes during data processing.
  • Accumulators: These variables can be updated on each individual node. However, the final value is aggregated from the values sent by each individual node.

Question 24: Please give us a scenario in which you would use the broadcast and accumulator shared variables.

Answer:

Broadcast variables: You can use them as cached data on each node. Whenever you need a small, frequently used dataset throughout data processing, ask Spark to cache this small dataset on each node; this is done using a broadcast variable, and during calculation you can refer to this cached data.

You set the broadcast variable in the driver program, and it is retrieved by the worker nodes on the cluster. Remember, the broadcast variable is retrieved and cached only when the first read request is sent.
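A minimal sketch of broadcast usage, assuming an existing SparkContext sc and a made-up lookup table:

country_names = sc.broadcast({"IN": "India", "US": "United States"})   # small dataset cached on each node

records = sc.parallelize([("IN", 100), ("US", 200), ("IN", 300)])
named = records.map(lambda kv: (country_names.value[kv[0]], kv[1]))    # executors read the cached copy
print(named.collect())   # [('India', 100), ('United States', 200), ('India', 300)]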

Accumulators: You can consider them more as global counters. Remember that they are not read-only variables; on each worker node, the executor updates the counter independently. The driver program then accumulates the values from all worker nodes and generates the aggregated result.

So you can use them when you need to do some counting, like how many messages were not processed correctly. Using an accumulator, an individual count of unprocessed messages is generated on each node, and at the end all the counts are accumulated on the driver side, so you get to know how many messages were not processed.
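A minimal sketch of that counting scenario, again assuming an existing SparkContext sc and a made-up record format:

bad_records = sc.accumulator(0)        # global counter, aggregated on the driver

def parse(line):
    fields = line.split(",")
    if len(fields) != 2:
        bad_records.add(1)             # each executor updates its own copy independently
        return None
    return (fields[0], int(fields[1]))

messages = sc.parallelize(["hadoop,5000", "spark,7000", "broken-record", "NiFi,6000"])
parsed = messages.map(parse).filter(lambda rec: rec is not None)
parsed.count()                         # accumulators are only updated once an action runs
print(bad_records.value)               # 1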

Question 25: How do you define the ETL process?

Answer: ETL stands for extraction, transformation and loading. This is where we create data pipelines for data movement and transformation. In short, there are three stages (nowadays the order of the ETL steps can be changed, and sometimes it becomes ELT).

  • Extract: You extract data from source systems like RDBMS, flat files, social networking feeds, web log files etc. Data can be in various formats like XML, CSV, JSON, Parquet, AVRO, and the frequency of data retrieval can be defined as daily, hourly etc.
  • Transform: In this step you transform the data into what your downstream system expects. For example, from a text file you can create a JSON file; besides changing file formats, you can similarly separate valid and invalid data. In this step you would do many sub-steps to clean your data as the next step expects.
  • Load: This step refers to sending the data to the sink you have defined. In the Hadoop world it could be HDFS or Hive tables; for RDBMS it could be MySQL or Oracle, and for NoSQL it could be Cassandra or MongoDB.

However, please note that Spark is not an ETL tool; nevertheless, you can get an entire ETL job done using only the Spark framework.
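As a hedged illustration only (the file paths, the column name and the formats are assumptions), a minimal ETL-style job written with Spark could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("simple-etl").getOrCreate()

# Extract: read raw CSV data (hypothetical input path)
raw = spark.read.option("header", "true").csv("/data/raw/sales.csv")

# Transform: drop rows with a missing amount and cast the column to a numeric type
clean = (raw.filter(raw["amount"].isNotNull())
            .withColumn("amount", raw["amount"].cast("double")))

# Load: write the result to the sink, here Parquet files (hypothetical output path)
clean.write.mode("overwrite").parquet("/data/curated/sales_parquet")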