[spark] 将dataframe中的数据插入到mysql

news/2024/7/7 23:32:03

文章目录

  • 分区写入 `foreachPartition`
  • 直接写入 `write.jdbc()`
  • 有没有插入成功
  • 在插入时记录行数 `累加器`

分区写入 foreachPartition

在Spark中,你可以使用foreachPartitionforeach来将DataFrame中的数据插入到MySQL数据库。以下是一个基本的Scala代码示例,假设你已经创建了一个SparkSession并加载了你的DataFrame:

import org.apache.spark.sql.{Row, SparkSession}
import java.sql.{Connection, DriverManager, PreparedStatement}

object SparkToMySQLExample {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder
      .appName("SparkToMySQLExample")
      .getOrCreate()

    // 从数据源创建 DataFrame,这里假设你已经有了一个 DataFrame,用 df 表示
    val df = // ... your DataFrame creation logic ...

    // 定义 MySQL 连接信息
    val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your-database"
    val jdbcUsername = "your-username"
    val jdbcPassword = "your-password"

    // 定义 MySQL 表名
    val tableName = "your-table"

    // 定义插入数据的 SQL 语句
    val insertQuery = s"INSERT INTO $tableName (column1, column2, ...) VALUES (?, ?, ...)"

    // 将 DataFrame 的数据插入到 MySQL
    df.foreachPartition { partition =>
      // 在每个分区上建立一个数据库连接
      Class.forName("com.mysql.jdbc.Driver")
      val connection: Connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)

      // 遍历分区中的每一行数据并执行插入操作
      val preparedStatement: PreparedStatement = connection.prepareStatement(insertQuery)
      partition.foreach { row =>
        // 根据你的 DataFrame 列的顺序设置参数
        preparedStatement.setString(1, row.getString(0))
        preparedStatement.setInt(2, row.getInt(1))
        // ... 设置其他参数 ...

        // 执行插入操作
        preparedStatement.executeUpdate()
      }

      // 关闭连接和声明
      preparedStatement.close()
      connection.close()
    }

    // 停止 SparkSession
    spark.stop()
  }
}

请替换示例中的your-mysql-hostyour-databaseyour-usernameyour-passwordyour-table以及列名(column1column2等)等信息为你实际使用的值。在实际应用中,请确保数据库连接信息和表结构是正确的,并根据你的数据和表结构调整插入逻辑。

此外,确保你的 Spark 应用程序能够访问 MySQL 驱动程序。你可能需要在启动 Spark 时包含 MySQL 驱动程序的 JAR 文件。

直接写入 write.jdbc()

DataFrame.write.jdbc() 是 Spark 提供的一种更方便的方式,用于将 DataFrame 中的数据写入关系型数据库。这个方法封装了连接数据库、创建表以及插入数据的整个过程,提供了一种更简洁和易用的接口。

与使用 foreachforeachPartition 直接进行数据插入相比,使用 write.jdbc() 有以下优点:

  1. 简洁性和易用性: write.jdbc() 方法抽象了底层的数据库连接和数据插入逻辑,使得代码更加简洁易读。你只需要提供数据库连接信息、表名和DataFrame即可,而不需要手动处理连接、预处理语句等细节。

  2. 性能优化: Spark 内部会进行一些优化,例如分区数据、并行写入等,以提高插入性能。write.jdbc() 方法会在背后进行一些优化,而手动编写 foreachforeachPartition 可能需要更多的手动调整以实现最佳性能。

下面是使用 write.jdbc() 的简单示例:

import org.apache.spark.sql.{SparkSession, SaveMode}

object SparkToMySQLExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkToMySQLExample")
      .getOrCreate()

    val df = // ... your DataFrame creation logic ...

    val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your-database"
    val jdbcUsername = "your-username"
    val jdbcPassword = "your-password"
    val tableName = "your-table"

    df.write
      .mode(SaveMode.Append)  // 保存模式,可以选择 Append、Overwrite、ErrorIfExists 或 Ignore
      .jdbc(jdbcUrl, tableName, new java.util.Properties() {
        put("user", jdbcUsername)
        put("password", jdbcPassword)
      })

    spark.stop()
  }
}

在这个例子中,.mode(SaveMode.Append) 表示将数据追加到现有表中。你可以根据需要选择不同的保存模式,例如覆盖现有表,如果表不存在则创建新表等。

总体来说,如果你的目标是将 DataFrame 中的数据写入关系型数据库,推荐使用 write.jdbc() 方法,因为它更容易使用且通常会有更好的性能。

有没有插入成功

在使用df.write.mode(SaveMode.Append).jdbc()方式插入数据时,你可以利用Spark的Action操作触发数据写入,并检查写入操作是否成功。write 操作属于Spark的Transformation,它不会立即执行,而是在遇到一个触发执行的Action操作时才实际执行。

在Spark中,一些典型的Action操作包括 count()collect() 等,它们会触发 Spark 作业的执行。

以下是一个简单的示例,演示如何在写入数据后使用 count() 来验证插入是否成功:

import org.apache.spark.sql.{SparkSession, SaveMode}

object SparkToMySQLExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkToMySQLExample")
      .getOrCreate()

    val df = // ... your DataFrame creation logic ...

    val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your-database"
    val jdbcUsername = "your-username"
    val jdbcPassword = "your-password"
    val tableName = "your-table"

    // 写入数据
    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, new java.util.Properties() {
        put("user", jdbcUsername)
        put("password", jdbcPassword)
      })

    // 触发写入操作后,使用 count() 来验证插入是否成功
    val rowCount = spark.read.jdbc(jdbcUrl, tableName, new java.util.Properties() {
      put("user", jdbcUsername)
      put("password", jdbcPassword)
    }).count()

    println(s"Number of rows in the table after insertion: $rowCount")

    spark.stop()
  }
}

在这个例子中,我们使用 spark.read.jdbc 读取插入后的表,并使用 count() 操作来获取表中的行数。如果插入成功,你应该能够看到插入前后的行数有所增加。

请注意,这种方法有一个缺点,即每次插入后都需要读取整个表,可能会导致性能问题。在生产环境中,可以考虑使用更高效的方法,例如通过其他手段检查数据库中的行数,或者在插入数据时记录插入的行数,并在Spark中进行验证。

在插入时记录行数 累加器

在Spark中,你可以使用foreachPartitionforeach操作,结合累加器(Accumulator)来记录插入的行数。累加器是一种分布式变量,可以在任务之间共享和累加值。以下是一个简单的示例,演示如何在Spark中记录插入的行数:

import org.apache.spark.sql.{Row, SparkSession}
import java.sql.{Connection, DriverManager, PreparedStatement}

object SparkInsertWithRowCounter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkInsertWithRowCounter")
      .getOrCreate()

    val df = // ... your DataFrame creation logic ...

    // 定义累加器
    val rowCounter = spark.sparkContext.longAccumulator("rowCounter")

    // 定义 MySQL 连接信息
    val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your-database"
    val jdbcUsername = "your-username"
    val jdbcPassword = "your-password"
    val tableName = "your-table"

    // 定义插入数据的 SQL 语句
    val insertQuery = s"INSERT INTO $tableName (column1, column2, ...) VALUES (?, ?, ...)"

    // 将 DataFrame 的数据插入到 MySQL,并在插入时累加行数
    df.foreachPartition { partition =>
      // 在每个分区上建立一个数据库连接
      Class.forName("com.mysql.jdbc.Driver")
      val connection: Connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)

      // 遍历分区中的每一行数据并执行插入操作
      val preparedStatement: PreparedStatement = connection.prepareStatement(insertQuery)
      partition.foreach { row =>
        // 根据你的 DataFrame 列的顺序设置参数
        preparedStatement.setString(1, row.getString(0))
        preparedStatement.setInt(2, row.getInt(1))
        // ... 设置其他参数 ...

        // 执行插入操作
        preparedStatement.executeUpdate()

        // 累加行数
        rowCounter.add(1)
      }

      // 关闭连接和声明
      preparedStatement.close()
      connection.close()
    }

    // 打印插入的总行数
    println(s"Total rows inserted: ${rowCounter.value}")

    // 停止 SparkSession
    spark.stop()
  }
}

在这个例子中,我们创建了一个名为 rowCounter 的累加器,并在插入数据时使用 rowCounter.add(1) 来累加行数。最后,通过 rowCounter.value 获取累加的总行数,并在Spark应用程序中进行验证。

确保替换示例中的 your-mysql-hostyour-databaseyour-usernameyour-passwordyour-table 以及列名(column1column2等)等信息为你实际使用的值。


http://lihuaxi.xjx100.cn/news/1923134.html

相关文章

机器学习(三) -- 特征工程(更新中)

系列文章目录 未完待续…… 目录 系列文章目录 前言 tips:这里只是总结,不是教程哈。 “***”开头的是给好奇心重的宝宝看的,其实不太重要可以跳过。 此处以下所有内容均为暂定,因为我还没找到一个好的,让小白&…

黑客(网络安全)技术30天速成

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高; 二、则是发展相对成熟…

Docker六 | Docker Compose容器编排

目录 Docker Compose 基本概念 使用步骤 常用命令 Docker Compose Docker-Compose是Docker官方的开源项目,负责实现对Docker容器集群的快速编排。Compose可以管理多个Docker容器组成一个应用。 需要定义一个YAML格式的配置文件docker-compose.yml,…

Java EE Servlet之第一个Servlet代码

文章目录 1. Servlet 是什么2. 第一个Servlet 代码2.1 创建一个项目2.2 引入依赖2.3 创建目录2.4 编写代码2.5 打包程序2.6 部署程序2.7 验证程序2.8 更新代码 3. 访问出错了怎么办?3.1 出现 4043.2 出现 4053.3 出现 5003.4 出现空白界面3.5 出现“无法访问此网站”…

数组的声明

概要&#xff1a; 数组的声明分为三个部分 第一部分&#xff1a;数组中元素的数据类型 第二部分&#xff1a;数组名 第三部分&#xff1a;数组标识符(方括号)和数组大小 一、测试代码 #include<stdio.h> int main() {int arr_int[10];char* arr_str[10];arr_in…

Redis主从切换(单点故障)解决源码

1、使用过程&#xff1a; 发布创建channel1消息 redis-cli> PUBLISH channel1 "Hello, world!" redis-cli> SUBSCRIBE channel1 优点&#xff1a; 1、采用Reactor事件单线程去驱动发布订阅事件的&#xff0c;实时性高。 2、从redis架构去思考&#xff0c;拓展哨…

Innosetup 调用c# dll 和 c# dll的函数导出

目标需求&#xff0c;基于现在安装包脚本。需要在用户安装和卸载成功时。进行数据记录,所以需要调用c#dll 主要涉及到的知识点 需要理解脚本的文件使用机制脚本的文件dll加载&#xff0c;和dll的调用c# dll的制作&#xff0c;和工具的使用 下面具体介绍 脚本的文件dll加载&…

【Python排序算法系列】—— 冒泡排序

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 &#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 冒泡排序 过程演示&#xff1a; 做题实际遇到的图像是横向的&#xff0c;但是它位置变化和纵向是一样…