Как читать и записывать данные таблицы в PySpark

Kak Citat I Zapisyvat Dannye Tablicy V Pyspark



Обработка данных в PySpark происходит быстрее, если данные загружаются в виде таблицы. При этом, используя выражения SQL, обработка будет быстрой. Таким образом, преобразование PySpark DataFrame/RDD в таблицу перед отправкой на обработку является лучшим подходом. Сегодня мы увидим, как читать данные таблицы в PySpark DataFrame, записывать PySpark DataFrame в таблицу и вставлять новый DataFrame в существующую таблицу с помощью встроенных функций. Пойдем!

Pyspark.sql.DataFrameWriter.saveAsTable()

Сначала мы увидим, как записать существующий кадр данных PySpark в таблицу с помощью функции write.saveAsTable(). Для записи DataFrame в таблицу требуется имя таблицы и другие необязательные параметры, такие как режимы, partionBy и т. д. Хранится в виде паркетного файла.

Синтаксис:







dataframe_obj.write.saveAsTable(путь/имя_таблицы,режим,partitionBy,…)
  1. Table_name — это имя таблицы, созданной из dataframe_obj.
  2. Мы можем добавить/перезаписать данные таблицы, используя параметр режима.
  3. PartitionBy использует один или несколько столбцов для создания разделов на основе значений в этих предоставленных столбцах.

Пример 1:

Создайте PySpark DataFrame с 5 строками и 4 столбцами. Запишите этот кадр данных в таблицу с именем «Agri_Table1».



импортировать pyspark

из pyspark.sql импортировать SparkSession

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# фермерские данные с 5 строками и 5 столбцами

сельское хозяйство =[{ 'Тип_почвы' : «Черный» , «Ирригация_доступность» : 'Нет' , 'Акры' : 2500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'США' },

{ 'Тип_почвы' : «Черный» , «Ирригация_доступность» : 'Да' , 'Акры' : 3500 , 'Soil_status' : 'Влажный' ,
'Страна' : 'Индия' },

{ 'Тип_почвы' : 'Красный' , «Ирригация_доступность» : 'Да' , 'Акры' : 210 , 'Soil_status' : 'Сухой' ,
'Страна' : 'ВЕЛИКОБРИТАНИЯ' },

{ 'Тип_почвы' : 'Другой' , «Ирригация_доступность» : 'Нет' , 'Акры' : 1000 , 'Soil_status' : 'Влажный' ,
'Страна' : 'США' },

{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'Индия' }]



# создаем фрейм данных из приведенных выше данных

agri_df = linuxhint_spark_app.createDataFrame (агри)

agri_df.show()

# Записываем указанный выше DataFrame в таблицу.

agri_df.coalesce( 1 ).write.saveAsTable( 'Сельское хозяйство_Таблица1' )

Выход:







Мы видим, что один файл паркета создается с предыдущими данными PySpark.



Пример 2:

Рассмотрим предыдущий DataFrame и запишите «Agri_Table2» в таблицу, разделив записи на основе значений в столбце «Страна».

# Записываем указанный выше DataFrame в таблицу с параметром partitionBy

agri_df.write.saveAsTable( 'Агро_Таблица2' ,partitionBy=[ 'Страна' ])

Выход:

В столбце «Страна» есть три уникальных значения — «Индия», «Великобритания» и «США». Итак, три раздела созданы. Каждый раздел содержит паркетные файлы.

Pyspark.sql.DataFrameReader.table()

Давайте загрузим таблицу в PySpark DataFrame, используя функцию spark.read.table(). Он принимает только один параметр, который является путем/именем таблицы. Он напрямую загружает таблицу в PySpark DataFrame, и все функции SQL, которые применяются к PySpark DataFrame, также могут применяться к этому загруженному DataFrame.

Синтаксис:

spark_app.read.table (путь/имя_таблицы)

В этом сценарии мы используем предыдущую таблицу, созданную из PySpark DataFrame. Убедитесь, что вам нужно реализовать фрагменты кода предыдущего сценария в вашей среде.

Пример:

Загрузите таблицу «Agri_Table1» в DataFrame с именем «loaded_data».

загруженные_данные = linuxhint_spark_app.read.table( 'Agri_Table1' )

загруженные_данные.show()

Выход:

Мы видим, что таблица загружается в PySpark DataFrame.

Выполнение запросов SQL

Теперь мы выполняем некоторые SQL-запросы к загруженному DataFrame, используя функцию spark.sql().

# Используйте команду SELECT, чтобы отобразить все столбцы из приведенной выше таблицы.

linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1' ).показывать()

# ГДЕ Предложение

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).показывать()

linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1, ГДЕ Акры > 2000' ).показывать()

Выход:

  1. Первый запрос отображает все столбцы и записи из DataFrame.
  2. Второй запрос отображает записи на основе столбца «Soil_status». Всего три записи с элементом «Сухой».
  3. Последний запрос возвращает две записи с «Акрами», превышающими 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Используя функцию insertInto(), мы можем добавить DataFrame в существующую таблицу. Мы можем использовать эту функцию вместе с selectExpr(), чтобы определить имена столбцов, а затем вставить их в таблицу. Эта функция также принимает имя таблицы в качестве параметра.

Синтаксис:

DataFrame_obj.write.insertInto('Table_name')

В этом сценарии мы используем предыдущую таблицу, созданную из PySpark DataFrame. Убедитесь, что вам нужно реализовать фрагменты кода предыдущего сценария в вашей среде.

Пример:

Создайте новый DataFrame с двумя записями и вставьте их в таблицу «Agri_Table1».

импортировать pyspark

из pyspark.sql импортировать SparkSession

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# фермерские данные с 2 строками

сельское хозяйство =[{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 2500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'США' },

{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 1200 , 'Soil_status' : 'Влажный' ,
'Страна' : 'Япония' }]

# создаем фрейм данных из приведенных выше данных

agri_df2 = linuxhint_spark_app.createDataFrame (агро)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Акры' , 'Страна' , 'Ирригация_доступность' , 'Тип_почвы' ,
'Состояние_почвы' ).write.insertInto( 'Сельское хозяйство_Таблица1' )

# Показать окончательный Agri_Table1

linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1' ).показывать()

Выход:

Теперь общее количество строк, присутствующих в DataFrame, равно 7.

Заключение

Теперь вы понимаете, как записать кадр данных PySpark в таблицу с помощью функции write.saveAsTable(). Он принимает имя таблицы и другие необязательные параметры. Затем мы загрузили эту таблицу в PySpark DataFrame с помощью функции spark.read.table(). Он принимает только один параметр, который является путем/именем таблицы. Если вы хотите добавить новый DataFrame в существующую таблицу, используйте функцию insertInto().