Python Spark SQL 02
Description

This Example - How to get list of databases and tables from spark catalog.
Prerequisites
- Python
- Spark
Go to spark/my-examples and create:
python-spark-sql-02.py
print ("import SparkContext")
from datetime import date
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, StructType, IntegerType, DateType, DecimalType
from decimal import Decimal
print("Connect to spark cluster on local")
sc = SparkSession.builder.config('spark.driver.host','localhost').appName('My_Pyspark_Test').getOrCreate()
print("define department dept_df schema")
dept_schema = StructType([
StructField('deptid', IntegerType(), False), StructField('deptname', StringType(), False)
])
print("define department dept_df data")
dept_data = [(10, 'Human Resources'), (20, 'Infrastructure'), (30, 'Administration'), (40, 'Governance')]
print("define employee emp_df schema")
emp_schema = StructType([
StructField('empid', IntegerType(), False), StructField('ename', StringType(), False), StructField('deptid', IntegerType(), False), StructField('doj', DateType(), False),
StructField('salary', DecimalType(), False)
])
print("define employee emp_df data")
emp_data = [
(1, 'Patrick', 10, date(2015, 1, 1), Decimal(1000.00)), (2, 'Lisbon', 10, date(2016, 1, 1), Decimal(1500.00)), (3, 'Cho', 20, date(2017, 1, 1), Decimal(800.00)),
(4, 'Rigsby', 20, date(2018, 1, 1), Decimal(200.00)), (5, 'VanPelt', 30, date(2019, 1, 2), Decimal(8000.00)), (6, 'Charlotte', 30, date(2020, 1, 1), Decimal(5000.00))
]
print("\ncreate department dataframe")
dept_df = sc.createDataFrame(data = dept_data, schema = dept_schema)
dept_df.show()
print("\ncreate employee dataframe")
emp_df = sc.createDataFrame(data = emp_data, schema = emp_schema)
emp_df.show()
print("DROP TABLE IF EXISTS department_tbl")
sc.sql("DROP TABLE IF EXISTS department_tbl")
print("\nwrite to hive table department_tbl")
dept_df.write.saveAsTable("department_tbl")
print("write to hive table employee_tbl")
emp_df.write.saveAsTable("employee_tbl")
print("\nshow all the catalog information")
catalog = sc.catalog
print(catalog.listDatabases())
print(catalog.listTables())
print("show metadata of any table")
metadata = catalog.getTable("employee_tbl")
print(metadata.name)
print(metadata.database)
for column in catalog.listColumns("employee_tbl"):
print("Column name is " + column.name + " and column type is " + column.dataType)
print("drop the tables")
sc.sql("DROP TABLE IF EXISTS department_tbl")
sc.sql("DROP TABLE IF EXISTS employee_tbl")
print("\nstop the spark session")
sc.stop()
Run
./bin/spark-submit --master local[4] ./my-examples/python-spark-sql-02.py
Output
import SparkContext
Connect to spark cluster on local
define department dept_df schema
define department dept_df data
define employee emp_df schema
define employee emp_df data
create department dataframe
+------+---------------+
|deptid| deptname|
+------+---------------+
| 10|Human Resources|
| 20| Infrastructure|
| 30| Administration|
| 40| Governance|
+------+---------------+
create employee dataframe
+-----+---------+------+----------+------+
|empid| ename|deptid| doj|salary|
+-----+---------+------+----------+------+
| 1| Patrick| 10|2015-01-01| 1000|
| 2| Lisbon| 10|2016-01-01| 1500|
| 3| Cho| 20|2017-01-01| 800|
| 4| Rigsby| 20|2018-01-01| 200|
| 5| VanPelt| 30|2019-01-02| 8000|
| 6|Charlotte| 30|2020-01-01| 5000|
+-----+---------+------+----------+------+
DROP TABLE IF EXISTS department_tbl
write to hive table department_tbl
write to hive table employee_tbl
show all the catalog information
[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/opt/spark/spark-warehouse')]
[Table(name='department_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False), Table(name='employee_tbl', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]
show metadata of any table
employee_tbl
default
Column name is empid and column type is int
Column name is ename and column type is string
Column name is deptid and column type is int
Column name is doj and column type is date
Column name is salary and column type is decimal(10,0)
drop the tables
stop the spark session
On the base:
Pyspark — How to get list of databases and tables from spark catalog