Sharding-JDBC-5.0.0 实现按月分表、自动建表、自动刷新节点

news/2024/7/5 2:51:01

1、引入Maven 依赖

        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat</groupId>
            <artifactId>tomcat-dbcp</artifactId>
            <version>10.0.16</version>
        </dependency>

2、yml 配置文件

spring:
  sharding-sphere:
    datasource:
      names: master
      master:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: org.postgresql.Driver
        url: jdbc:postgresql://127.0.0.1:5432/production_dev?serverTimezone=UTC&characterEncoding=utf-8&stringtype=unspecified
        username: postgres
        password: 123456
    rules:
      sharding:
        sharding-algorithms:
          month-sharding-algorithm:
            props:
              strategy: standard
              algorithmClassName: com.base.shading.CreateTimeShardingAlgorithm
            type: CLASS_BASED
        tables:
          work_procedure:
            actual-data-nodes: master.work_procedure_${2023..2100}_0${1..9},master.work_procedure_${2023..2100}_${10..12}
            table-strategy:
              standard:
                sharding-column: create_time
                sharding-algorithm-name: month-sharding-algorithm
          work_result:
            actual-data-nodes: master.work_result_${2023..2100}_0${1..9},master.work_result_${2023..2100}_${10..12}
            table-strategy:
              standard:
                sharding-column: create_time
                sharding-algorithm-name: month-sharding-algorithm
        bindingTables:
          - work_procedure, work_result
    props:
      sql-show: true

3、分片算法类 CreateTimeShardingAlgorithm.java

package com.base.shading;

import com.google.common.collect.Range;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;

public class CreateTimeShardingAlgorithm implements StandardShardingAlgorithm<Date> {

        private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");


    /**
     * 精准分片
     * @param collection 对应分片库中所有分片表的集合
     * @param preciseShardingValue 分片键值,其中 logicTableName 为逻辑表,columnName 分片键,value 为从 SQL 中解析出来的分片键的值
     * @return 表名
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {
        Object value = preciseShardingValue.getValue();
        String tableSuffix = null;
        if(value instanceof Date){
            //Date columnValue = (Date) value;
            //tableSuffix = columnValue.toInstant().atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ofPattern("yyyy_MM"));
            LocalDate localDate = ((Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
            tableSuffix = localDate.format(DateTimeFormatter.ofPattern("yyyy_MM"));
        }else{
            String column = (String)value;
            tableSuffix = LocalDateTime.parse(column, formatter).format(DateTimeFormatter.ofPattern("yyyy_MM"));
        }
        String logicTableName = preciseShardingValue.getLogicTableName();
        String actualTableName = logicTableName.concat("_").concat(tableSuffix);
        return ShardingAlgorithmTool.shardingTablesCheckAndCreatAndReturn(logicTableName, actualTableName);
    }

    /**
     * 范围分片
     * @param collection 对应分片库中所有分片表的集合
     * @param rangeShardingValue 分片范围
     * @return 表名集合
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
        // 逻辑表名
        String logicTableName = rangeShardingValue.getLogicTableName();
        // 范围参数
        Range<Date> valueRange = rangeShardingValue.getValueRange();
        //起始时间  结束时间
        LocalDateTime start = null;
        LocalDateTime end = null;
        Object lowerEndpoint = (Object)valueRange.lowerEndpoint();
        Object upperEndpoint = (Object)valueRange.upperEndpoint();
        if(lowerEndpoint instanceof  String){
            String lower = (String) lowerEndpoint;
            String upper = (String) upperEndpoint;
            start = LocalDateTime.parse(lower,formatter);
            end = LocalDateTime.parse(upper,formatter);
        }else{
             start = valueRange.lowerEndpoint().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
             end = valueRange.upperEndpoint().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
        }
        if(end.isAfter(LocalDateTime.now())){
            end = LocalDateTime.now();
        }
        // 查询范围的表
        Set<String> queryRangeTables = extracted(logicTableName, start, end);
        // 数据库中的表
        HashSet<String> tableNameSet = ShardingAlgorithmTool.cacheTableNames();
        //返回的表
        ArrayList<String> tables = new ArrayList<>(collection);
        tables.retainAll(queryRangeTables);
        tables.retainAll(tableNameSet);
        return tables;
    }

    @Override
    public String getType() {
        return null;
    }

    @Override
    public Properties getProps() {
        return null;
    }

    @Override
    public void init() {

    }

    /**
     * 根据范围计算表名
     *
     * @param logicTableName 逻辑表名
     * @param lowerEndpoint 范围起点
     * @param upperEndpoint 范围终端
     * @return 物理表名集合
     */
    private Set<String> extracted(String logicTableName, LocalDateTime lowerEndpoint, LocalDateTime upperEndpoint) {
        Set<String> rangeTable = new HashSet<>();
        while (lowerEndpoint.isBefore(upperEndpoint)) {
            String str = getTableNameByDate(lowerEndpoint, logicTableName);
            rangeTable.add(str);
            lowerEndpoint = lowerEndpoint.plusMonths(1);
        }
        // 获取物理表名
        String tableName = getTableNameByDate(upperEndpoint, logicTableName);
        rangeTable.add(tableName);
        return rangeTable;
    }
    /**
     * 根据日期获取表名
     * @param dateTime 日期
     * @param logicTableName 逻辑表名
     * @return 物理表名
     */
    private String getTableNameByDate(LocalDateTime dateTime, String logicTableName) {
        String tableSuffix = dateTime.format(DateTimeFormatter.ofPattern("yyyy_MM"));
        return logicTableName.concat("_").concat(tableSuffix);
    }
}

4、 分片工具类  ShardingAlgorithmTool.java

package com.base.shading;

import com.base.utils.ApplicationContextUtil;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

@Slf4j
public class ShardingAlgorithmTool {
    private static final HashSet<String> tableNameCache = new HashSet<>();

    /**
     * 获取所有表名
     * @return 表名集合
     */
    public static List<String> getAllTableNameBySchema() {
        List<String> tableNames = new ArrayList<>();
        String sql = "SELECT tablename FROM pg_tables  WHERE schemaname = 'public' AND tablename ~ 'work_procedure_\\d{4}_\\d{2}$' OR tablename ~ 'work_result_\\d{4}_\\d{2}$';";
        DataSource dataSource = ApplicationContextUtil.getBean(DataSource.class);
        try (Connection connection = dataSource.getConnection()) {
            Statement statement = connection.createStatement();
            try (ResultSet rs = statement.executeQuery(sql)) {
                while (rs.next()) {
                    String actualTableName = rs.getString(1);
                    tableNames.add(actualTableName);

                }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return tableNames;
    }


    /**
     * 判断 分表获取的表名是否存在 不存在则自动建表
     *
     * @param logicTableName  逻辑表名(表头)
     * @param actualTableName 真实表名
     * @return 确认存在于数据库中的真实表名
     */
    public static String shardingTablesCheckAndCreatAndReturn(String logicTableName, String actualTableName) {
        synchronized (logicTableName.intern()) {
            // 缓存中有此表 返回
            if (tableNameCache.contains(actualTableName)) {
                return actualTableName;
            }
            String sql="CREATE TABLE "+ actualTableName +" (LIKE "+ logicTableName +" INCLUDING CONSTRAINTS);";
            // 缓存中无此表,则建表并添加缓存
            DataSource dataSource = ApplicationContextUtil.getBean(DataSource.class);
            try (Connection connection = dataSource.getConnection()) {
                try (Statement statement = connection.createStatement()) {
                    statement.executeUpdate(sql);
                }catch (SQLException e){
                    throw new RuntimeException(e);
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }

            // 缓存重载
            tableNameCacheReload();
            //添加缓存
            //tableNameCache.add(resultTableName);

        }
        return actualTableName;
    }

    /**
     * 缓存重载方法
     */
    public static void tableNameCacheReload() {
        // 读取数据库中所有表名
        List<String> tableNameList = getAllTableNameBySchema();
        // 删除旧的缓存(如果存在)
        ShardingAlgorithmTool.tableNameCache.clear();
        // 写入新的缓存
        ShardingAlgorithmTool.tableNameCache.addAll(tableNameList);
    }

    /**
     * 获取缓存中的表名
     * @return
     */
    public static HashSet<String> cacheTableNames() {
        return tableNameCache;
    }

}

5. 初始化缓存类 ShardingTablesLoadRunner.java

package com.base.shading;

import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Order(value = 1) // 数字越小,越先执行
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {

    @Override
    public void run(String... args) {
        // 读取已有分表,进行缓存
        ShardingAlgorithmTool.tableNameCacheReload();
    }
}

6. Spring工具类 ApplicationContextUtil .java

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if(ApplicationContextUtil.applicationContext == null) {
            ApplicationContextUtil.applicationContext = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return ApplicationContextUtil.applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }

    public static <T> T getBean(String name, Class<T> clazz) {
        return applicationContext.getBean(name, clazz);
    }

    public static String getProperty(String key) {
        return applicationContext.getBean(Environment.class).getProperty(key);
    }
    public static Object getBean(String name){
        return applicationContext.getBean(name);
    }
}

7、注意

(1)SQL中where后面的查询条件包含分表规则中配置的sharding-column字段才会进入,定义的algorithmClassName分片算法类中。

(2)自定义SQL中表名和sharding-column字段最好不要用双引号包括。

(3)IDEA中yml配置文件中有些属性显示红色,也不要紧可以识别。

(4)修改查询中没有出现sharding-column字段的查询。


http://lihuaxi.xjx100.cn/news/1717105.html

相关文章

SpringBoot 全局请求拦截

方法一 在Spring Boot中&#xff0c;可以使用拦截器&#xff08;Interceptor&#xff09;来实现全局请求拦截。示例&#xff1a; 首先&#xff0c;创建一个拦截器类&#xff0c;实现HandlerInterceptor接口&#xff1a; import javax.servlet.http.HttpServletRequest; impo…

Java并发线程池原理源码深入分析与调优实战

一&#xff0c;开篇&#xff1a; java中提供了多线程设计的Api&#xff0c;为什么还要用线程池呢&#xff1f; 下来看两个例子&#xff1a; 1. 使用多线程跑十万次 2. 使用线程池跑十万次 使用多线程跑十万次 package com.laoyang.ThreadPool.公开课;import java.util.ArrayL…

2023-10学习笔记

1.sql注入 不管是上一篇博客&#xff0c;通过java代码执行sql 还是我们常用的Mybatis的#{}和${} 都会提到sql注入的问题 1.1啥是sql注入 应该知道是说传入无关的参数&#xff0c;比如本来是想要一个where条件查询参数 但是你拼了一个drop 比如 原来的sql select * from…

统计学习方法 支持向量机(下)

文章目录 统计学习方法 支持向量机&#xff08;下&#xff09;非线性支持向量机与和核函数核技巧正定核常用核函数非线性 SVM 序列最小最优化算法两个变量二次规划的求解方法变量的选择方法SMO 算法 统计学习方法 支持向量机&#xff08;下&#xff09; 学习李航的《统计学习方…

如何将Python程序打包并保护源代码

导言: 在某些情况下,我们可能希望将Python程序打包成可执行文件,以便用户无法查看程序的源代码。这种需求通常出现在商业软件、数据分析工具或其他需要保护知识产权的场景中。本文将介绍如何使用PyInstaller工具来打包Python程序,并确保它可以执行和读取配置信息。 第一步…

纺织辅料经营小程序商城的作用是什么

数字化时代&#xff0c;各个行业都在通过线上转型实现新增长&#xff0c;纺织辅料厂商也一样&#xff0c;需要通过线上化实现与客户的面对面交流有利于打造品牌及构建私有化流量池&#xff0c;但无论入驻哪家电商平台及垂直行业平台&#xff0c;都有不少的佣金及入驻费。 可通过…

【人工智能Ⅰ】实验1:谓词表示法与产生式知识表示

实验1 谓词表示法与产生式知识表示 一、实验目的 1、熟悉谓词逻辑表示法&#xff1b; 2、理解和掌握产生式知识表示方法&#xff0c;实现产生式系统的规则库。 二、实验内容 要求通过C/C/python语言编程实现&#xff1a; 1、猴子摘香蕉问题 2、动物识别系统 &#xff08…

长城首款MPV上市,能否迎来「高山」时刻?

作者 | Amy 编辑 | 德新 去年下半年起&#xff0c;自主品牌接二连三抢占高端MPV市场&#xff0c;腾势D9、岚图梦想家、极氪009以及传祺E9等车型接连亮相。 国泰君安证券研究报告显示&#xff0c;新能源MPV 2021年、2022年年销量分别为4.4万和9.2万辆&#xff0c;今年上半年销…