В этом руководстве мы в основном сосредоточимся на чтении/загрузке файла паркета в PySpark DataFrame/SQL с использованием функции read.parquet(), доступной в классе pyspark.sql.DataFrameReader.
Тема содержания:
Считайте файл Parquet в PySpark DataFrame
Прочитайте файл Parquet в PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Эта функция используется для чтения файла паркета и его загрузки в PySpark DataFrame. Он принимает путь/имя файла паркета. Мы можем просто использовать функцию read.parquet(), так как это общая функция.
Синтаксис:
Давайте посмотрим на синтаксис read.parquet():
spark_app.read.parquet(имя_файла.parquet/путь)Сначала установите модуль PySpark с помощью команды pip:
pip установить pyspark
Получить файл паркета
Чтобы прочитать файл паркета, вам нужны данные, в которых файл паркета генерируется из этих данных. В этой части мы увидим, как создать файл паркета из PySpark DataFrame.
Давайте создадим PySpark DataFrame с 5 записями и запишем это в файл паркета «industry_parquet».
импортировать pysparkиз pyspark.sql импортировать SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# создаем фрейм данных, в котором хранится информация об отрасли
Industry_df = linuxhint_spark_app.createDataFrame([Строка(Тип= 'Сельское хозяйство' , Площадь = 'США' ,
Рейтинг= 'Горячий' ,Всего_сотрудников= 100 ),
Строка (Тип = 'Сельское хозяйство' , Площадь = 'Индия' ,Рейтинг= 'Горячий' ,Всего_сотрудников= 200 ),
Строка (Тип = 'Разработка' , Площадь = 'США' ,Рейтинг= 'Теплый' ,Всего_сотрудников= 100 ),
Строка (Тип = 'Образование' , Площадь = 'США' ,Рейтинг= 'Прохладный' ,Всего_сотрудников= 400 ),
Строка (Тип = 'Образование' , Площадь = 'США' ,Рейтинг= 'Теплый' ,Всего_сотрудников= двадцать )
])
# Фактический кадр данных
Industry_df.show ()
# Записываем industry_df в файл паркета
Industry_df.coalesce( 1 ).написать.паркет( 'индустрия_паркет' )
Выход:
Это DataFrame, который содержит 5 записей.
Файл паркета создается для предыдущего кадра данных. Здесь имя нашего файла с расширением «part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet». Мы используем этот файл во всем руководстве.
Считайте файл Parquet в PySpark DataFrame
У нас есть паркетная доска. Давайте прочитаем этот файл с помощью функции read.parquet() и загрузим его в PySpark DataFrame.
импортировать pysparkиз pyspark.sql импортировать SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# Прочитать файл паркета в объект dataframe_from_parquet.
dataframe_from_parquet = linuxhint_spark_app.read.parquet ( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Показать dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Выход:
Мы отображаем DataFrame, используя метод show(), который был создан из файла паркета.
SQL-запросы с файлом Parquet
После загрузки в DataFrame можно создать таблицы SQL и отобразить данные, присутствующие в DataFrame. Нам нужно создать ВРЕМЕННОЕ ПРЕДСТАВЛЕНИЕ и использовать команды SQL для возврата записей из DataFrame, созданного из файла паркета.
Пример 1:
Создайте временное представление с именем «Секторы» и используйте команду SELECT для отображения записей в DataFrame. Вы можете обратиться к этому руководство это объясняет, как создать VIEW в Spark — SQL.
импортировать pysparkиз pyspark.sql импортировать SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# Прочитать файл паркета в объект dataframe_from_parquet.
dataframe_from_parquet = linuxhint_spark_app.read.parquet ( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Создаем вид из указанного выше файла паркета с именем - 'Сектора'
dataframe_from_parquet.createOrReplaceTempView( «Сектора» )
# Запрос для отображения всех записей из секторов
linuxhint_spark_app.sql( 'выбрать * из секторов' ).показывать()
Выход:
Пример 2:
Используя предыдущий VIEW, напишите запрос SQL:
- Чтобы отобразить все записи из секторов, принадлежащих «Индии».
- Чтобы отобразить все записи из секторов с сотрудником, который больше 100.
linuxhint_spark_app.sql( 'выберите * из секторов, где Area='Индия'' ).показывать()
# Запрос для отображения всех записей из секторов с числом сотрудников более 100
linuxhint_spark_app.sql( 'выберите * из секторов, где Total_employees > 100' ).показывать()
Выход:
Есть только одна запись с областью «Индия» и две записи с количеством сотрудников больше 100.
Прочитайте файл Parquet в PySpark SQL
Во-первых, нам нужно создать ВИД с помощью команды CREATE. Используя ключевое слово «путь» в SQL-запросе, мы можем прочитать файл паркета в Spark SQL. После пути нам нужно указать имя файла/местоположение файла.
Синтаксис:
spark_app.sql( 'СОЗДАТЬ ВРЕМЕННЫЙ ВИД view_name ИСПОЛЬЗУЯ ПАРАМЕТРЫ паркета (путь ' имя_файла.parquet ')' )Пример 1:
Создайте временное представление с именем «Sector2» и прочитайте в него файл паркета. Используя функцию sql(), напишите запрос выбора для отображения всех записей, присутствующих в представлении.
импортировать pysparkиз pyspark.sql импортировать SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# Прочитать файл паркета в Spark-SQL
linuxhint_spark_app.sql( 'СОЗДАТЬ ВРЕМЕННЫЙ ВИД Сектор2, ИСПОЛЬЗУЯ ПАРАМЕТРЫ паркета (путь ' часть-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.моментальный.паркет ')' )
# Запрос для отображения всех записей из Sector2
linuxhint_spark_app.sql( 'выбрать * из Сектора2' ).показывать()
Выход:
Пример 2:
Используйте предыдущий VIEW и напишите запрос для отображения всех записей с рейтингом «Hot» или «Cool».
# Запрос для отображения всех записей из Sector2 с рейтингом Hot или Cool.linuxhint_spark_app.sql( 'выберите * из Sector2, где Rating='Hot' OR Rating='Cool'' ).показывать()
Выход:
Есть три записи с рейтингом «Горячий» или «Крутой».
Заключение
В PySpark функция write.parquet() записывает кадр данных в файл паркета. Функция read.parquet() считывает файл паркета в PySpark DataFrame или любой другой источник данных. Мы научились читать файл паркета в PySpark DataFrame и в таблицу PySpark. В рамках этого руководства мы также обсудили, как создавать таблицы из фрейма данных PySpark и фильтровать данные с помощью предложения WHERE.