You are migrating a SQL table to a PySpark DataFrame. The SQL table has three columns:
1. EmployeeID (BIGINT),
2. Salary (FLOAT), and
3. JoinDate (DATE).
What are the three corresponding PySpark DataType classes you would use to define the schema for this DataFrame?
LongType(), FloatType(), DateType()

INT in SQL becomes _ in PySpark.
VARCHAR or CHAR in SQL becomes _ in PySpark.
TIMESTAMP in SQL becomes _ in PySpark.
DECIMAL(10,2) in SQL becomes _ in PySpark.

IntegerType(), StringType(), TimestampType(), DecimalType(10,2)

-- create SalesDB database
CREATE DATABASE SalesDB;
Convert the above SQL code into PySpark code.
Hint: You can wrap standard SQL commands in spark.sql("...")
spark.sql('CREATE DATABASE IF NOT EXISTS SalesDB')

Commands not directly related to DataFrames can be wrapped like this.
-- use the recently created SalesDB
USE SalesDB;
Convert the above SQL code into PySpark code.
Hint: databases created by user are stored in spark.catalog.
spark.catalog.setCurrentDatabase('SalesDB')

DROP DATABASE SalesDB;
Convert the above SQL code into PySpark code.
spark.sql('DROP DATABASE SalesDB')

SHOW DATABASES;
Convert the above SQL code into PySpark code.
spark.sql('SHOW DATABASES').show()

In PySpark, manual schema management is a professional necessity because it prevents the overhead of Spark having to _ the schema by reading the data twice.
“infer”
CREATE TABLE customers ( id int not null, name varchar(15) not null, country varchar(15), score int );
Create schema for the customers_df DataFrame in PySpark.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

cust_schema = StructType([
StructField('id', IntegerType(), False),
StructField('name', StringType(), False),
StructField('country', StringType(), True),
StructField('score', IntegerType(), True)
])

Create an empty DataFrame customers_df in PySpark.
Assume you’ve already defined cust_schema as the schema for the DataFrame.
customers_df = spark.createDataFrame([], cust_schema)
Write PySpark code to read CSV file located at path: “path/to/customers_data.csv” with predefined schema cust_schema.
spark.read.format('csv') \
    .option('header', 'true') \
    .schema(cust_schema) \
    .load('path/to/customers_data.csv')

DESCRIBE TABLE customers;
Convert the above SQL code into PySpark code.
customers_df.printSchema()
ALTER TABLE customers ALTER COLUMN score TYPE BIGINT;
# Step 1: Load the table into a DataFrame
customers_df = spark.table("customers")

# Step 2: Cast the 'score' column to BIGINT
customers_casted_df = customers_df.withColumn("score", customers_df["score"].cast("bigint"))

# Step 3: Overwrite the existing table with the updated schema
customers_casted_df.write.mode("overwrite").saveAsTable("customers")

Imagine you are working on a data pipeline. You have a DataFrame named clean_df that contains processed customer data, and you want to save it as a “permanent table” named gold_customers using the Parquet format.
What is the PySpark code you would use to save this DataFrame as a table?
Think of a DataFrame like a temporary spreadsheet open on your screen; if you close the program without saving, it’s gone. A specific function from the DataFrameWriter class helps store that spreadsheet in a physical cabinet (the Data Catalog).
clean_df.write.format('parquet').saveAsTable('gold_customers')

ALTER TABLE customers ADD COLUMN region VARCHAR(50);
Convert the above SQL code into PySpark code.
from pyspark.sql.functions import lit

customers_df.withColumn('region', lit(None).cast('string'))

This method introduces a projection internally. Therefore, calling it many times (e.g. in a loop) can cause performance issues.
Update the customers table stored in the catalog by adding a new column region to it.
from pyspark.sql.functions import lit

# load the table as a DataFrame
customers_df = spark.table('customers')

# add the column, then save with overwrite
customers_df.withColumn('region', lit(None).cast('string')) \
    .write.mode('overwrite').saveAsTable('customers')

ALTER TABLE customers RENAME COLUMN score TO sales;
Convert the above SQL code into PySpark code.
customers_df.withColumnRenamed('score', 'sales')

ALTER TABLE customers DROP COLUMN region;
Convert the above SQL code into PySpark code.
customers_df.drop('region')

Partitioning divides your data physically into sub-folders based on a specific column. This allows Spark to _ reading irrelevant data during a query, a process known as Partition Pruning.
“skip”
Imagine you are working with a massive dataset of retail transactions. You frequently run queries that filter data by category (e.g., SELECT * FROM sales WHERE category = 'Electronics').
You have a sales_df DataFrame. Which method do you chain onto the .write operation? Write PySpark code to save this in Parquet format, with data partitioned by category.

partitionBy('category')

sales_df.write.partitionBy('category').format('parquet').saveAsTable('category_sales')

Partitioning works well for columns with low to medium “cardinality” (a small or moderate number of unique values, like “category”). However, partitioning by a _ column like user_id creates millions of tiny folders, which degrades read/write performance.
high-cardinality
To handle high-cardinality columns, we use Bucketing. Instead of creating a folder for every unique value, Bucketing distributes the data across a fixed number of files (buckets) based on a hash of the _ .
Bucketing affects physical file layout by controlling how rows are distributed among multiple files (inside each partition).
column value
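The bucket assignment above can be illustrated in plain Python. Note this is only a sketch of the idea: Spark actually uses a Murmur3 hash of the column value, not Python's built-in hash(), and the bucket count of 20 is taken from the scenario below.

```python
NUM_BUCKETS = 20

def bucket_for(user_id: int) -> int:
    # Hash the column value, then take it modulo the fixed bucket count.
    # (Illustrative only: Spark really uses a Murmur3 hash here.)
    return hash(user_id) % NUM_BUCKETS

# The same user_id always lands in the same bucket, so two tables
# bucketed identically on user_id can be joined without a full shuffle.
print(bucket_for(12345) == bucket_for(12345))  # True
```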
Scenario: You have a massive dataset of transactions. You frequently join this dataset with a users table on the user_id column. You decide to bucket the transactions table into 20 buckets using the user_id column to optimize these future joins.
Question: What is the specific PySpark code to save your transactions_df DataFrame as a table named “final_transactions”, bucketed (by the user_id column) into 20 buckets?
transactions_df.write.bucketBy(20, 'user_id').sortBy('user_id').format('parquet').saveAsTable('final_transactions')

s3://my-data-bucket/warehouse/final_transactions/
├── _SUCCESS
├── part-00000-tid-12345-bucket00000.parquet
├── part-00001-tid-12345-bucket00001.parquet
├── part-00002-tid-12345-bucket00002.parquet
…
└── part-00019-tid-12345-bucket00019.parquet
Consider the Hive SQL below:
CREATE TABLE purchases_bucketed_sorted (
user_id BIGINT,
purchase_id STRING,
purchase_amt DECIMAL(10,2),
-- other columns here
)
PARTITIONED BY (category STRING)
CLUSTERED BY (user_id)
SORTED BY (user_id ASC)
INTO 128 BUCKETS
STORED AS PARQUET;

Write the equivalent PySpark code for this.
df.write.partitionBy("category").bucketBy(128, "user_id").sortBy("user_id").format("parquet").saveAsTable("purchases_bucketed_sorted")