Python Dataframes via Snowpark — an alternative to Oracle PL/SQL

David Ruthven
6 min read · Sep 29, 2023


It is difficult to find an alternative to PL/SQL for Data Engineering. PL/SQL is extremely efficient at querying and manipulating data and has convenient, high-performance features such as bulk collections, cursors and runtime type binding, and of course it runs inside the Oracle database engine, which reduces latency and preserves security.

I have been learning how to implement data transformation pipelines with Python Dataframes via Snowpark. Snowpark gives Python Dataframes the same performance and security characteristics as PL/SQL by executing all the queries and DML inside the Snowflake query engine.

I can see why many programmers prefer to implement data pipelines using Python Dataframes. Dataframes have methods for selecting, filtering, aggregating, grouping, sorting and so on, all the capabilities you expect from SQL but with a very succinct syntax, while Python supplies the necessary procedural logic. Using notebooks as your development tool makes code easy to write and debug, and there are loads of code examples thanks to the popularity of Python.

As a PL/SQL Programmer why would I want to learn Python?

Python is one of the most popular programming languages, if not the most popular. A lot of this is down to how easy it is to express logic in Python, and also to the enormous number of accessible libraries that extend its functionality. It is also platform and database agnostic, so if you want to build code that can endure changes to the underlying platform, Python is a great choice. As a PL/SQL programmer you will also find it remarkably easy to learn; that is not a reason in itself to learn it, but once you start exploring Python you will find it quite rewarding and potentially career enhancing.

Can we see some examples?

It's always easiest to learn a new programming language by comparing it to one you already know well, so a great exercise for learning Python Dataframes (Snowpark is the Snowflake version) is to start with your own PL/SQL code and work out how you would implement the same logic in Snowpark.

The examples below kick off with a simple SQL statement with JOIN, FILTER, AGGREGATE and SORT steps and then some PL/SQL specifics such as BULK COLLECT, CURSOR, LOOP, LIMIT and FORALL.

SQL Example

Find the number of employees for each department where the department name starts with the letter ‘S’ and sort the result by highest number of employees.

SELECT
    d.department_name, count(1)
FROM
    employees e,
    departments d
WHERE
    d.department_name like 'S%'
AND
    e.department_id = d.department_id
GROUP BY
    d.department_name
ORDER BY
    count(1) desc;

SQL Example: Snowpark Dataframe equivalent

Using Dataframes, methods are chained to implement each phase of the query. With Snowpark each step is purely declarative; nothing is executed until data needs to be fetched, which in this case happens in the final step.


# assumes an existing Snowpark Session named `session`
from snowflake.snowpark import functions as F

# assign Dataframes as handles or pointers to the tables/views and required columns
employees = session.table("public.employees").select("department_id")
departments = session.table("public.departments").select("department_id", "department_name")

# build the query: join, filter, aggregate and sort
result = employees.join(
    departments,
    (employees["department_id"] == departments["department_id"]) &
    (departments["department_name"].like("S%")),
    "inner"
).group_by("department_name").count().sort(F.col("count").desc())

# Show the result (this is the step that triggers execution on Snowflake)
result.show()
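
Because execution is deferred, you can inspect the SQL that Snowpark will push down before anything runs. A small sketch, assuming the result DataFrame above:

# print the SQL that Snowpark generated for this DataFrame; nothing has
# been executed against Snowflake yet
print(result.queries["queries"][0])

# ask Snowflake to describe the query plan
result.explain()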

PL/SQL BULK COLLECT

Fetch all the records for all employees in department 50. Display the count of matching employees. BULK COLLECT makes this efficient since it fetches all the qualifying rows in one step.

-- PL/SQL BULK COLLECT

DECLARE
   TYPE employee_info_t IS TABLE OF employees%ROWTYPE;
   l_employees   employee_info_t;
BEGIN
   SELECT *
     BULK COLLECT INTO l_employees
     FROM employees
    WHERE department_id = 50;

   DBMS_OUTPUT.PUT_LINE (l_employees.COUNT);
END;

Snowpark BULK COLLECT (equivalent)

Below is the same query in Snowpark. The BULK COLLECT is implicit: the query is simply assigned to a dataframe and there is no need to declare types. When the bulk_df.count() command is run, the translated SQL query is executed on Snowflake.

# Snowpark BULK COLLECT

# assign filter query to dataframe
bulk_df = session.table("employees").filter(F.col("department_id") == 50)

# print count of rows
print(bulk_df.count())
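
count() only retrieves the row count. If you actually want the rows on the client, which is closer to what BULK COLLECT INTO a collection gives you, collect() fetches them into a local list of Row objects. A minimal sketch, assuming the same bulk_df and the employee_id column from the earlier examples:

# fetch all qualifying rows into a local list of Row objects,
# the nearest analogue to BULK COLLECT INTO a collection
l_employees = bulk_df.collect()

print(len(l_employees))               # same value as bulk_df.count()
print(l_employees[0]["EMPLOYEE_ID"])  # columns are addressable by name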

PL/SQL BULK COLLECT with CURSOR, LOOP and LIMIT

Fetch employee_ids for all employees in department 50. Fetch at most 10 rows at a time and display the row count per fetch.

-- ---------------------------------------
-- PL/SQL BULK COLLECT with LOOP and LIMIT
-- ---------------------------------------

DECLARE
   c_limit   PLS_INTEGER := 10;

   CURSOR employees_cur
   IS
      SELECT employee_id
        FROM employees
       WHERE department_id = 50;

   TYPE employee_ids_t IS TABLE OF employees.employee_id%TYPE;

   l_employee_ids   employee_ids_t;
BEGIN
   OPEN employees_cur;

   LOOP
      FETCH employees_cur
         BULK COLLECT INTO l_employee_ids
         LIMIT c_limit;

      DBMS_OUTPUT.PUT_LINE (l_employee_ids.COUNT || ' fetched');

      EXIT WHEN l_employee_ids.COUNT = 0;
   END LOOP;
END;

Snowpark BULK COLLECT with CURSOR, LOOP and LIMIT (equivalent)

With Snowpark there is usually no real need to batch fetches; all operations are pushed down to Snowflake and leverage the memory of the Snowflake compute cluster.

# -----------------------------------------
# Snowpark BULK COLLECT with LOOP and LIMIT
# -----------------------------------------

c_limit = 10  # batch limit
c_emps = 0    # employee count

# Filter employees by department_id = 50
employees_cur = session.table("public.employees").filter(F.col("department_id") == 50)

# Collect employee_ids in batches
while True:
    l_employee_ids = employees_cur.limit(c_limit, offset=c_emps).select("employee_id").collect()

    c_emps = c_emps + len(l_employee_ids)

    print(len(l_employee_ids), "fetched")

    if len(l_employee_ids) == 0:
        break
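
If you do want to stream a large result to the client without managing offsets yourself, Snowpark can iterate over the result lazily. A minimal sketch using to_local_iterator(), assuming the same filtered employees_cur DataFrame:

# iterate over the result set lazily instead of paging with LIMIT/OFFSET;
# rows are fetched from Snowflake as the iterator is consumed
c_emps = 0
for row in employees_cur.select("employee_id").to_local_iterator():
    c_emps += 1

print(c_emps, "fetched in total")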

PL/SQL FORALL with DELETE

Delete all employees in departments 50 or 100. This will issue two separate DELETEs, one for each department. Of course this could be done in a single DELETE statement, but it is just being used as a simple example.

-- -------------
-- PL/SQL FORALL
-- -------------

DECLARE
   TYPE ids_t IS TABLE OF employees.department_id%TYPE;
   l_ids   ids_t := ids_t (50, 100);
BEGIN
   FORALL l_index IN 1 .. l_ids.COUNT
      DELETE FROM employees
            WHERE department_id = l_ids (l_index);

   DBMS_OUTPUT.put_line (SQL%ROWCOUNT);
   ROLLBACK;
END;

Python Snowpark FORALL (equivalent)

In the translated version we capture the count for each delete and total them up. Note that subtract() only removes the rows from the DataFrame, not from the underlying table; a sketch of an actual DELETE follows the code.

# ---------------
# Snowpark FORALL
# ---------------

# Link the employees DataFrame
employees_df = session.table("employees")

l_ids = [50, 100]  # list of department IDs to delete

deleted_rows_count = 0  # deleted row count

# Loop through the list of department IDs and perform the deletes
for department_id in l_ids:
    # Filter the DataFrame to get rows with the specified department_id
    filtered_df = employees_df.filter(F.col("department_id") == department_id)

    # Get the count of rows to be deleted
    rows_to_delete_count = filtered_df.count()

    # Remove those rows from the DataFrame (the table itself is not modified)
    employees_df = employees_df.subtract(filtered_df)

    # Update the deleted rows count
    deleted_rows_count += rows_to_delete_count

# Print the number of deleted rows
print(deleted_rows_count)
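
If the goal is a real DELETE against the table, Snowpark's Table.delete() pushes the statement down to Snowflake and reports how many rows it removed. A minimal sketch along those lines:

# session.table() returns a Table; delete() runs a DELETE on Snowflake
# and returns a DeleteResult with the number of rows removed
employees_tbl = session.table("employees")

deleted_rows_count = 0
for department_id in [50, 100]:
    result = employees_tbl.delete(F.col("department_id") == department_id)
    deleted_rows_count += result.rows_deleted

print(deleted_rows_count)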

How would migrating to Python Dataframes or Snowpark work with Oracle?

If you migrate your PL/SQL data pipelines to Snowflake via Snowpark then clearly all the data will reside in Snowflake. You would then have two options: replicate the processed data from Snowflake back to Oracle, or have Snowflake publish the data in Apache Iceberg format. Later this calendar year Oracle will support Apache Iceberg; see Building Data Lakes with Oracle: Exploring Innovations and Integrations.

If you don't want to use Snowflake you could use PySpark, which is based on Apache Spark, and connect to Oracle directly over JDBC; it's not as efficient though! A rough sketch of that connection follows.
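
As an illustration, this is roughly what reading the employees table from Oracle into a PySpark DataFrame over JDBC might look like; the host, service name, credentials and driver jar are placeholders you would substitute:

from pyspark.sql import SparkSession

# the Oracle JDBC driver jar must be on the Spark classpath;
# the URL and credentials below are placeholders
spark = SparkSession.builder.appName("oracle_example").getOrCreate()

employees = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1")
    .option("dbtable", "HR.EMPLOYEES")
    .option("user", "hr")
    .option("password", "********")
    .option("driver", "oracle.jdbc.OracleDriver")
    .load()
)

# the same filter and count as the Snowpark example, now running in Spark
print(employees.filter(employees["DEPARTMENT_ID"] == 50).count())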

How do I get started?

You already know PL/SQL but for examples and testing I used the PL/SQL examples from Oracle’s LiveSQL website, specifically the Bulk Processing with PL/SQL tutorial.

For Snowpark the Snowpark API and Snowpark API Reference (Python) documentation are handy references.

For translation assistance you can use ChatGPT. I used the free version, which is trained on data up to September 2021, so it hasn't caught up with Snowpark, but you can ask it to translate to PySpark, which is similar and easily modified.

You can fire up a free Snowflake trial account at:

I acknowledge that the examples in this article are rather simple, but hopefully they illustrate some features of PL/SQL that are hard to implement in other stored procedure languages yet straightforward using Python Dataframes, and Snowpark in particular.

Let me know how you get on!


David Ruthven

Snowflake Sales Engineer — opinions expressed are solely my own and do not express the views or opinions of my employer.