[SPARK] Reading and Saving MySQL Data with Spark (JDBC)

2016-01-07 21:32


The original idea for this article came from http://www.sparkexpert.com. However, the source code provided at https://github.com/sujee81/SparkApps turned out to be badly out of date: as of 1.4.0, Spark abandoned the old methods (createJDBCTable, insertIntoJDBC, and so on) and replaced them with sqlContext.read().jdbc() and sqlContext.write().jdbc().
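For orientation, here is a minimal sketch of that migration. It assumes a SQLContext, a JDBC URL string, and connection Properties like those set up in the Main classes shown later; the local names url and connectionProperties are illustrative:

    // Old API (Spark 1.3 and earlier), deprecated since 1.4.0:
    // usersDf.createJDBCTable(url, "users", false);
    // usersDf.insertIntoJDBC(url, "users", false);

    // Replacement API (Spark 1.4.0+), via DataFrameReader/DataFrameWriter:
    DataFrame df = sqlContext.read().jdbc(url, "users", connectionProperties);  // load a table
    df.write().mode(SaveMode.Append).jdbc(url, "users", connectionProperties);  // append to a table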

1. Source Code Download

    git clone https://github.com/jiekechoo/spark-jdbc-apps.git

The source tree contains the following modules; this article covers the first two:

spark-load-from-db: read from the database
spark-save-to-db: save to the database
spark-stats: covered in the next article
spark-jdbcrdd: covered in the next article

2. Source Code Analysis

Dependency analysis

The parent POM defines the shared pieces: slf4j as a common component, Spark version 1.5.1, MySQL Connector 5.1.32, and so on:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.sectong</groupId>
        <artifactId>spark-apps-parent</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <name>spark-apps-parent</name>
        <packaging>pom</packaging>
    
        <modules>
            <module>spark-jdbcrdd</module>
            <module>spark-load-from-db</module>
            <module>spark-save-to-db</module>
            <module>spark-stats</module>
        </modules>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.13</version>
            </dependency>
        </dependencies>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <spark.version>1.5.1</spark.version>
            <mysql.version>5.1.32</mysql.version>
        </properties>
    
    </project>

Saving to the database: spark-save-to-db

The dependencies are mainly spark-core and spark-sql, plus the MySQL driver:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.sectong</groupId>
            <artifactId>spark-apps-parent</artifactId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <artifactId>spark-save-to-db</artifactId>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerArgument>-Xlint:all</compilerArgument>
                        <showWarnings>true</showWarnings>
                        <showDeprecation>true</showDeprecation>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

The source code:

    package com.sectong;
    
    import java.io.Serializable;
    import java.util.Properties;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    
    public class Main implements Serializable {
    
        private static final long serialVersionUID = -8513279306224995844L;
        private static final String MYSQL_USERNAME = "demo";
        private static final String MYSQL_PWD = "demo";
        private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";
    
        private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local[*]"));
    
        private static final SQLContext sqlContext = new SQLContext(sc);
    
        public static void main(String[] args) {
            // Sample data-frame loaded from a JSON file
            DataFrame usersDf = sqlContext.read().json("users.json");
    
            // Save data-frame to MySQL (or any other JDBC supported databases)
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", MYSQL_USERNAME);
            connectionProperties.put("password", MYSQL_PWD);
    
            // write dataframe to jdbc mysql
            usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
        }
    }

To test the write path we need a JSON file like the one below. Note that Spark's json() reader expects newline-delimited JSON: one complete object per line, with no trailing commas:

    {"id":994,"name":"Betty","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ip":"9.19.204.44"},
    {"id":995,"name":"Anna","email":"alewisrm@canalblog.com","city":"Shangjing","country":"China","ip":"14.207.119.126"},
    {"id":996,"name":"David","email":"dgarrettrn@japanpost.jp","city":"Tsarychanka","country":"Ukraine","ip":"111.252.63.159"},
    {"id":997,"name":"Heather","email":"hgilbertro@skype.com","city":"Koilás","country":"Greece","ip":"29.57.181.250"},
    {"id":998,"name":"Diane","email":"ddanielsrp@statcounter.com","city":"Mapiripán","country":"Colombia","ip":"19.205.181.99"},
    {"id":999,"name":"Philip","email":"pfullerrq@reuters.com","city":"El Cairo","country":"Colombia","ip":"210.248.121.194"},
    {"id":1000,"name":"Maria","email":"mfordrr@shop-pro.jp","city":"Karabash","country":"Russia","ip":"224.21.41.52"}

When reading the file, users.json must sit in the same directory as the jar; the test runs in local mode:

    DataFrame usersDf = sqlContext.read().json("users.json");
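Spark infers the DataFrame schema from the JSON automatically (fields come back sorted alphabetically, and integral values are inferred as long), so it can be worth checking it before writing. A quick sketch, reusing usersDf from Main above; the commented output is roughly what the sample data should produce:

    usersDf.printSchema();
    // root
    //  |-- city: string (nullable = true)
    //  |-- country: string (nullable = true)
    //  |-- email: string (nullable = true)
    //  |-- id: long (nullable = true)
    //  |-- ip: string (nullable = true)
    //  |-- name: string (nullable = true)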

The mode(SaveMode.Append) call deserves special attention: it makes each write append rows to the existing table. Without it, the job keeps failing with: Exception in thread "main" java.lang.RuntimeException: Table users already exists.

    usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
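For reference, SaveMode offers three alternatives to Append; which one fits depends on how an existing table should be treated:

    // SaveMode.Append        - add rows to the existing table (used in this article)
    // SaveMode.Overwrite     - drop and recreate the table before writing
    // SaveMode.ErrorIfExists - the default, hence the exception above
    // SaveMode.Ignore        - silently skip the write if the table already exists
    usersDf.write().mode(SaveMode.Overwrite).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);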

Package the jar, upload it, and run it with Spark:

    /opt/spark/bin/spark-submit --class com.sectong.Main --driver-class-path mysql-connector-java-5.1.32.jar spark-save-to-db-1.0-SNAPSHOT.jar 

The result looks like this: [screenshot: the users table after the save]

Loading from the database: spark-load-from-db

The dependencies are essentially the same as for the save module, so they are not repeated here.

The source code:

    package com.sectong;
    
    import java.io.Serializable;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class Main implements Serializable {
    
        private static final long serialVersionUID = -8513279306224995844L;
    
        private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
    
        private static final String MYSQL_USERNAME = "demo";
        private static final String MYSQL_PWD = "demo";
        private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";
    
        private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcFromDb").setMaster("local[*]"));
    
        private static final SQLContext sqlContext = new SQLContext(sc);
    
        public static void main(String[] args) {
    
            Properties properties = new Properties();
            properties.put("user", MYSQL_USERNAME);
            properties.put("password", MYSQL_PWD);
            // Load MySQL query result as DataFrame
            DataFrame jdbcDF = sqlContext.read().jdbc(MYSQL_CONNECTION_URL, "users", properties);
    
            List<Row> employeeFullNameRows = jdbcDF.collectAsList();
    
            for (Row employeeFullNameRow : employeeFullNameRows) {
                LOGGER.info(employeeFullNameRow.toString());
            }
        }
    }

Reading the MySQL data comes down to this key line:

    DataFrame jdbcDF = sqlContext.read().jdbc(MYSQL_CONNECTION_URL, "users", properties);
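This three-argument overload pulls the whole table through a single JDBC connection. Since 1.4.0, DataFrameReader also offers a partitioned overload that splits the scan across a numeric column; the bounds and partition count below are illustrative values, not tuned settings:

    // Split the table scan into 4 concurrent partitions on the numeric "id" column.
    // lowerBound/upperBound only shape the partition ranges; rows outside them are still read.
    DataFrame partitionedDF = sqlContext.read().jdbc(
            MYSQL_CONNECTION_URL, "users",
            "id",       // partition column
            1,          // lowerBound
            1000,       // upperBound
            4,          // numPartitions
            properties);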

Then print the rows:

    List<Row> employeeFullNameRows = jdbcDF.collectAsList();
    for (Row employeeFullNameRow : employeeFullNameRows) {
        LOGGER.info(employeeFullNameRow.toString());
    }
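Note that collectAsList() pulls the entire table into driver memory, which is fine for a demo-sized table. For anything larger, a bounded alternative such as this sketch (reusing jdbcDF and LOGGER from Main) avoids that:

    jdbcDF.show(20);               // print the first 20 rows in tabular form on the driver
    long total = jdbcDF.count();   // count rows without collecting them
    LOGGER.info("users table has {} rows", total);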

Run the program with Spark. Note the --driver-class-path mysql-connector-java-5.1.32.jar argument: the MySQL connector jar has to be put on the driver classpath:

    /opt/spark/bin/spark-submit --class com.sectong.Main --driver-class-path mysql-connector-java-5.1.32.jar spark-load-from-db-1.0-SNAPSHOT.jar 

The intermediate run output is omitted; the final result:

    2016-01-06 08:14:01[main] INFO  Main:43 - [Matriz de Camaragibe,Brazil,sgarciadp@nifty.com,494,39.244.171.48,Steven]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Huarancante,Peru,njacksondq@si.edu,495,67.123.78.80,Nicholas]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Zandak,Russia,sjonesdr@nbcnews.com,496,167.69.237.11,Sarah]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Somovo,Russia,jgardnerds@nsw.gov.au,497,112.190.104.80,Judy]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Huaping,China,calexanderdt@blinklist.com,498,79.242.142.206,Christine]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Isulan,Philippines,wgomezdu@imdb.com,499,26.220.121.74,Wanda]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Wujiayao,China,wleedv@latimes.com,500,26.104.219.178,Walter]
    2016-01-06 08:14:01[main] INFO  Main:43 - [Dongtou,China,hriveradw@skype.com,501,82.13.121.35,Henry]
    2016-01-06 08:14:01[Thread-3] INFO  SparkContext:59 - Invoking stop() from shutdown hook
    2016-01-06 08:14:01[Thread-3] INFO  ContextHandler:843 - stopped o.s.j.s.ServletContextHandler{/static/sql,null}
    2016-01-06 08:14:01[Thread-3] INFO  ContextHandler:843 - stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}

WeChat public account: sectong

Original article: http://blog.sectong.com/blog/spark_jdbc_load_save.html
