SQL PySpark Conversion Flashcards

https://notebooklm.google.com/notebook/d7475926-0ffb-4005-b0e3-7f3f2c55278b (36 cards)

1
Q

You are migrating a SQL table to a PySpark DataFrame. The SQL table has three columns:
1. EmployeeID (BIGINT),
2. Salary (FLOAT), and
3. JoinDate (DATE).

What are the three corresponding PySpark DataType classes you would use to define the schema for this DataFrame?

A
  1. LongType()
  2. FloatType()
  3. DateType()
2
Q
  1. INT in SQL becomes _ in PySpark.
  2. VARCHAR or CHAR in SQL becomes _ in PySpark.
  3. TIMESTAMP in SQL becomes _ in PySpark.
  4. DECIMAL(10,2) in SQL becomes _ in PySpark.
A
  1. IntegerType()
  2. StringType()
  3. TimestampType()
  4. DecimalType(10,2)
3
Q
-- create SalesDB database
CREATE DATABASE SalesDB;

Convert the above SQL code into PySpark code.

Hint: You can wrap standard SQL commands in spark.sql("...")

A
spark.sql('CREATE DATABASE IF NOT EXISTS SalesDB')

Commands not directly related to DataFrames can be wrapped like this.

4
Q
-- use the recently created SalesDB
USE SalesDB;

Convert the above SQL code into PySpark code.

Hint: databases created by user are stored in spark.catalog.

A
spark.catalog.setCurrentDatabase('SalesDB')
5
Q
DROP DATABASE SalesDB;

Convert the above SQL code into PySpark code.

A
spark.sql('DROP DATABASE SalesDB')
6
Q
SHOW DATABASES;

Convert the above SQL code into PySpark code.

A
spark.sql('SHOW DATABASES').show()
7
Q

In PySpark, manual schema management is a professional necessity because it prevents the overhead of Spark having to _ the schema by reading the data twice.

A

“infer”

8
Q
CREATE TABLE customers (
  id int not null,
  name varchar(15) not null,
  country varchar(15),
  score int
);

Create schema for the customers_df DataFrame in PySpark.

A
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

cust_schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('country', StringType(), True),
    StructField('score', IntegerType(), True)
])
9
Q

Create an empty DataFrame customers_df in PySpark.
Assume you’ve already defined cust_schema as the schema for the DataFrame.

A
spark.createDataFrame([], cust_schema)
10
Q

Write PySpark code to read CSV file located at path: “path/to/customers_data.csv” with predefined schema cust_schema.

A
spark.read.format('csv') \
    .option('header', 'true') \
    .schema(cust_schema) \
    .load('path/to/customers_data.csv')
11
Q
DESCRIBE TABLE customers;

Convert the above SQL code into PySpark code.

A
customers_df.printSchema()
12
Q
ALTER TABLE customers
ALTER COLUMN score TYPE BIGINT;

Convert the above SQL code into PySpark code.

A
# Step 1: Load the table into a DataFrame
customers_df = spark.table("customers")

# Step 2: Cast the column 'score' to BIGINT
customers_casted_df = customers_df.withColumn("score", customers_df["score"].cast("bigint"))

# Step 3: Overwrite the existing table with the updated schema
customers_casted_df.write.mode("overwrite").saveAsTable("customers")
13
Q

Imagine you are working on a data pipeline. You have a DataFrame named clean_df that contains processed customer data, and you want to save it as a “permanent table” named gold_customers using the Parquet format.
What is the PySpark code you would use to save this DataFrame as a table?

Think of a DataFrame like a temporary spreadsheet open on your screen; if you close the program without saving, it's gone. A specific function from the DataFrameWriter class stores that spreadsheet in a physical cabinet (the Data Catalog).

A
clean_df.write.format('parquet').saveAsTable('gold_customers')
14
Q
ALTER TABLE customers
ADD COLUMN region VARCHAR(50);

Convert the above SQL code into PySpark code.

A
from pyspark.sql.functions import lit

customers_df.withColumn('region', lit(None).cast('string'))

This method introduces a projection internally, so calling it multiple times can cause performance issues.

15
Q

Update the table customers stored in the catalog by adding a new column region to it.

A
from pyspark.sql.functions import lit

# load the table as a DataFrame
customers_df = spark.table('customers')

# add the column and save with overwrite
customers_df.withColumn('region', lit(None).cast('string')) \
    .write.mode('overwrite').saveAsTable('customers')
16
Q
ALTER TABLE customers
RENAME COLUMN score TO sales;

Convert the above SQL code into PySpark code.

A
customers_df.withColumnRenamed('score', 'sales')
17
Q
ALTER TABLE customers 
DROP COLUMN region;

Convert the above SQL code into PySpark code.

A
customers_df.drop('region')
18
Q

Partitioning divides your data physically into sub-folders based on a specific column. This allows Spark to _ reading irrelevant data during a query (a process known as Partition Pruning).

A

skip

19
Q

Imagine you are working with a massive dataset of retail transactions. You frequently run queries that filter data by category (e.g., SELECT * FROM sales WHERE category = 'Electronics').

  1. If you want to save your DataFrame so that these ‘category’ queries run as fast as possible, what PySpark method would you chain to your .write operation?
  2. Assume you’ve sales_df DataFrame. Write PySpark code to save this in parquet format, with data partitioned by categories.
A
  1. partitionBy('category')
  2. sales_df.write.partitionBy('category').format('parquet').saveAsTable('category_sales')
20
Q

Partitioning works well for columns with low to medium "cardinality" (a small or moderate number of unique values, like "category"). However, partitioning by a _ column like user_id creates millions of tiny folders, which degrades read/write performance.

A

high-cardinality
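A toy pure-Python sketch (hypothetical rows, not Spark itself) of why cardinality drives folder count: partitioning creates one folder per distinct value of the partition column.

```python
# Toy dataset: 4 rows, 3 distinct categories, 4 distinct user_ids
rows = [
    {"user_id": 1, "category": "Electronics"},
    {"user_id": 2, "category": "Books"},
    {"user_id": 3, "category": "Electronics"},
    {"user_id": 4, "category": "Toys"},
]

# One folder per distinct partition value (the Hive-style "col=value" layout)
folders_by_category = {f"category={r['category']}" for r in rows}  # 3 folders
folders_by_user_id = {f"user_id={r['user_id']}" for r in rows}     # one folder per user

print(len(folders_by_category), len(folders_by_user_id))  # 3 4
```

Scaled up to millions of users, partitioning by user_id would create millions of folders, while partitioning by category stays at a handful.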

21
Q
  1. Each partition creates a folder in S3.
  2. Each bucket creates a folder in S3.

For each of the above statements, determine if True or False.

A

  1. True: each distinct partition value becomes a sub-folder.
  2. False: each bucket is a file (inside the partition folder), not a folder.

22
Q

To handle high-cardinality columns, we use Bucketing. Instead of creating a folder for every unique value, Bucketing distributes the data across a fixed number of files (buckets) based on a hash of the _ .
Bucketing affects physical file layout by controlling how rows are distributed among multiple files (inside each partition).

A

bucketing column
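A conceptual pure-Python sketch of bucket assignment. Spark actually uses a Murmur3-based hash, not Python's built-in hash; this only illustrates the idea bucket = hash(value) % num_buckets:

```python
# Conceptual sketch of bucketing (NOT Spark's actual hash function).
# Rows are routed to a fixed number of files: bucket = hash(column_value) % num_buckets.
NUM_BUCKETS = 4

def bucket_for(user_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    return hash(user_id) % num_buckets

# Many distinct user_ids still land in only NUM_BUCKETS files
buckets = {bucket_for(uid) for uid in range(100_000)}
print(len(buckets))  # at most NUM_BUCKETS
```

This is why bucketing suits high-cardinality columns: the number of files is fixed regardless of how many distinct values the column has.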

23
Q

Scenario: You have a massive dataset of transactions. You frequently join this dataset with a users table on the user_id column. You decide to bucket the transactions table into 20 buckets using the user_id column to optimize these future joins.
Question: What is the specific PySpark code to save your transactions_df DataFrame as a table named “final_transactions”, bucketed (by the user_id column) into 20 buckets?

A
transactions_df.write.bucketBy(20, 'user_id').sortBy('user_id').format('parquet').saveAsTable('final_transactions')

s3://my-data-bucket/warehouse/final_transactions/
├── _SUCCESS
├── part-00000-tid-12345-bucket00000.parquet
├── part-00001-tid-12345-bucket00001.parquet
├── part-00002-tid-12345-bucket00002.parquet

└── part-00019-tid-12345-bucket00019.parquet

24
Q

Consider the Hive SQL below:

CREATE TABLE purchases_bucketed_sorted (
    user_id       BIGINT,
    purchase_id   STRING,
    purchase_amt  DECIMAL(10,2),
    -- other columns here
)
PARTITIONED BY (category STRING)
CLUSTERED BY (user_id) 
SORTED BY (user_id ASC) 
INTO 128 BUCKETS
STORED AS PARQUET;

Write the equivalent PySpark code for this.

A
df.write.partitionBy('category').bucketBy(128, "user_id").sortBy("user_id").format("parquet").saveAsTable("purchases_bucketed_sorted")
25
Q

INSERT INTO sales PARTITION (category)
SELECT product_id, price, category FROM purchases;

What will be the equivalent PySpark code for the above Hive SQL query?

Assume that the target table sales is defined something like:

CREATE TABLE sales (
    product_id STRING,
    price DECIMAL(10,2)
)
PARTITIONED BY (category STRING);

A
# Read the source table
purchases_df = spark.table("purchases")

# Write into the target table with dynamic partitioning on 'category'
purchases_df.select("product_id", "price", "category") \
    .write \
    .insertInto("sales", overwrite=False)
26
Q

CREATE VIEW electronics_sales_view AS
SELECT * FROM sales WHERE category = 'Electronics';

Write equivalent PySpark code for the above SQL statement.

Hint: Assume that 'sales' is already a Spark table.

A
# Step 1: Load the sales table into a DataFrame
sales_df = spark.table("sales")

# Step 2: Apply the filter condition
electronics_sales_df = sales_df.filter(sales_df.category == "Electronics")

# Step 3: Register the filtered DataFrame as a temporary view
electronics_sales_df.createOrReplaceTempView("electronics_sales_view")
27
Q

DROP VIEW electronics_sales_view;

Write equivalent PySpark code for the above SQL statement.

A
spark.sql('DROP VIEW IF EXISTS electronics_sales_view')
28
Q

# Global view
spark.sql("CREATE GLOBAL TEMPORARY VIEW global_sales_view AS SELECT * FROM sales WHERE category = 'Electronics'")

Write PySpark code to show the data from the global_sales_view view.

Hint: a global temporary view is accessible across all Spark sessions in the same cluster, via the global_temp namespace.

A
spark.sql("SELECT * FROM global_temp.global_sales_view").show()
29
Q

Save the DataFrame df in Parquet format at path: '/path/to/file'

A
df.write.format('parquet').save('/path/to/file')
30
Q

Save the DataFrame df in CSV format, along with a header, at path: '/path/to/file'

A
df.write.format('csv').option('header', True).save('/path/to/file')
31
Q

You need to query the final_transactions table that you just created. You want to see the user_id and amount for all transactions where the amount is greater than 1000, with the results sorted by amount in descending order, showing only the top 5 rows. Write the PySpark code to perform this transformation.

Hint: SELECT user_id, amount FROM final_transactions WHERE amount > 1000 ORDER BY amount DESC;

A
from pyspark.sql.functions import col

final_transactions_df.filter(col('amount') > 1000) \
    .sort(col('amount').desc()) \
    .select('user_id', 'amount') \
    .limit(5)
32
Q

SELECT DISTINCT category FROM sales;

Convert the above SQL code into PySpark code.

A
sales_df.select("category").distinct()
33
Q

You are using the final_transactions_df. You need to calculate two things for every unique user_id:
  1. The total sum of all their transactions (name this column total_spent).
  2. The average transaction amount (name this column avg_transaction).

Write the PySpark code to group the data by user_id and perform these two aggregations in one command.

A
from pyspark.sql.functions import sum, avg, col

final_transactions_df.groupBy('user_id').agg(
    sum(col('amount')).alias('total_spent'),
    avg(col('amount')).alias('avg_transaction')
)
34
Q

You have the final_transactions_df which includes user_id, amount, and transaction_date. Your manager wants to see a report that shows every single transaction, but with one extra column: running_total. This column must show the cumulative sum of the amount for each specific user_id, ordered by the transaction_date.

Question: Write the PySpark code to add this running_total column to your DataFrame without losing any of the original transaction rows.

A
from pyspark.sql.functions import sum
from pyspark.sql.window import Window

final_transactions_df.withColumn(
    'running_total',
    sum('amount').over(Window.partitionBy('user_id').orderBy('transaction_date'))
)
35
Q

  1. Tie of 2 rows at Rank 1: Next is Rank _ .
  2. Tie of 3 rows at Rank 1: Next is Rank _ .

A

  1. 3 (= 1 + 2)
  2. 4 (= 1 + 3)
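The tie behaviour above can be sketched in plain Python (a hand-rolled illustration, not PySpark's rank()): tied rows share a rank, and the next rank skips ahead by the size of the tie.

```python
def ranks(values):
    """Assign SQL-style rank() numbers to values, highest first."""
    ordered = sorted(values, reverse=True)
    result = []
    for i, v in enumerate(ordered):
        if i > 0 and v == ordered[i - 1]:
            result.append(result[-1])  # tie: same rank as the previous row
        else:
            result.append(i + 1)       # rank = 1-based row position
    return result

print(ranks([100, 100, 90]))       # [1, 1, 3]  two-way tie, next is rank 3
print(ranks([100, 100, 100, 90]))  # [1, 1, 1, 4]  three-way tie, next is rank 4
```

dense_rank() would instead continue with 2 after either tie, which is the key difference between the two window functions.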
36
Q

You have the final_transactions_df which includes user_id, amount, and transaction_date. Your manager wants a report that finds the rank of each transaction for a user based on the amount (highest amount gets rank 1).

Question: Write the PySpark code to add this transaction_rank column to your DataFrame without losing any of the original transaction rows.

A
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# 1. Define the window specification.
#    col("amount").desc() handles the descending order.
window_spec = Window.partitionBy("user_id").orderBy(col("amount").desc())

# 2. Apply the rank function within the over() clause
result_df = final_transactions_df.withColumn(
    "transaction_rank",
    rank().over(window_spec)
)