配置自行管理的 SQL Server 数据库

以下步骤介绍了如何配置自行管理的 SQL Server 数据库,以便与 Datastream 配合使用:

  1. 为源数据库启用 CDC。为此,请连接到数据库,并在 SQL 提示符或终端运行以下命令:

    USE [DATABASE_NAME]
    GO
    EXEC sys.sp_cdc_enable_db
    GO
    

    DATABASE_NAME 替换为您的源数据库的名称。

  2. 在需要捕获更改的表上启用 CDC:

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_enable_table
    @source_schema = N'SCHEMA_NAME',
    @source_name = N'TABLE_NAME',
    @role_name = NULL
    GO
    

    请替换以下内容:

    • DATABASE_NAME:源数据库的名称
    • SCHEMA_NAME:表所属的架构的名称
    • TABLE_NAME:要为其启用 CDC 的表的名称
  3. 启动 SQL Server Agent 并确保其始终运行。如果 SQL Server Agent 长时间保持关闭状态,日志可能会被截断,从而导致 Datastream 未读取的更改数据永久丢失。

    如需了解如何运行 SQL Server 代理,请参阅启动、停止或重启 SQL Server 代理的实例

  4. 创建 Datastream 用户:

    1. 连接到源数据库并输入以下命令:

      USE DATABASE_NAME;
      
    2. 创建要在 Datastream 中设置连接配置文件时使用的登录信息。

      CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
      
    3. 创建一个用户,并为其分配 db_ownerdb_denydatawriter 角色:

      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      
      EXEC sp_addrolemember 'db_owner', 'USER_NAME';
      EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
      
    4. 将此用户添加到 master 数据库:

      USE master;
      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      

使用事务日志 CDC 方法所需的其他步骤

仅当您配置源 SQL Server 数据库以用于事务日志 CDC 方法时,才需要执行本部分中介绍的步骤。

  1. sys.fn_dblog 函数授予 SELECT 权限。

    USE master;
    GRANT SELECT ON sys.fn_dblog TO USER_NAME;
    
  2. 将您的用户添加到 msdb 数据库,并为其分配以下权限:

    USE msdb;
    CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
    GRANT SELECT ON dbo.sysjobs TO USER_NAME;
    
  3. master 数据库中为用户分配以下权限:

      USE master;
      GRANT VIEW SERVER STATE TO YOUR_LOGIN;
    
  4. 设置您希望更改在来源中生效的保留期限。

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_change_job @job_type = 'capture' , @pollinginterval = 86399
    EXEC sp_cdc_stop_job 'capture'
    EXEC sp_cdc_start_job 'capture'
    

    @pollinginterval 参数以秒为单位,建议值设为 86399。这意味着事务日志会将更改保留 86399 秒(一天)。执行 sp_cdc_start_job 'capture 过程会启动设置。

  5. 如果数据库上正在运行任何清理或捕获作业,请停止这些作业。 如需了解详情,请参阅管理和监控变更数据捕获

  6. 设置日志截断保护。

    为确保 CDC 读取器有足够的时间来读取日志,同时允许日志截断以避免耗尽存储空间,您可以设置日志截断保护措施:

    1. 使用 SQL Server 客户端连接到数据库。
    2. 在数据库中创建虚拟表:

      USE [DATABASE_NAME];
      CREATE TABLE dbo.gcp_datastream_truncation_safeguard (
        [id] INT IDENTITY(1,1) PRIMARY KEY,
        CreatedDate DATETIME DEFAULT GETDATE(),
        [char_column] CHAR(8)
        );
      
    3. 创建一个存储过程,以便在您指定的时间段内运行活跃事务来防止日志截断:

      CREATE PROCEDURE dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time INT
      AS
      BEGIN
      
      DECLARE @transactionLog TABLE (beginLSN BINARY(10), endLSN BINARY(10))
      INSERT @transactionLog EXEC sp_repltrans
      
      DECLARE @currentDateTime DATETIME = GETDATE()
      DECLARE @cutoffDateTime DATETIME = DATEADD(MINUTE, -@transaction_logs_retention_time, @currentDateTime)
      
      DECLARE @firstValidLSN BINARY(10) = NULL
      DECLARE @lastValidLSN BINARY(10) = NULL
      DECLARE @firstTxnTime DATETIME = NULL
      DECLARE @lastTxnTime DATETIME = NULL
      
      SELECT TOP 1
          @lastTxnTime = t.logStartTime,
          @lastValidLSN = t.beginLSN
      FROM (
        SELECT
          beginLSN AS beginLSN,
          (SELECT TOP 1 [begin time]
          FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), beginLSN, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)) AS logStartTime
        FROM @transactionLog
      ) t
      ORDER BY t.beginLSN DESC
      
      -- If all transactions are before cutoff, clear everything
      IF (@lastTxnTime < @cutoffDateTime)
      BEGIN
          EXEC sp_repldone NULL, NULL, 0, 0, 1
      END
      ELSE
      BEGIN
          -- Find the earliest transaction
          SELECT TOP 1
            @firstTxnTime = t.logStartTime,
            @firstValidLSN = ISNULL(@firstValidLSN, t.beginLSN)
          FROM (
            SELECT
              beginLSN AS beginLSN,
              (SELECT TOP 1 [begin time]
              FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), beginLSN, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)) AS logStartTime
            FROM @transactionLog
          ) t
          ORDER BY t.beginLSN ASC
      
          IF (@firstTxnTime < @cutoffDateTime)
          BEGIN
              -- Identify the earliest and latest LSNs within VLogs before cutoff
              SELECT
                @firstValidLSN = SUBSTRING(MAX(t.lsnMarkers), 1, 10),
                @lastValidLSN = SUBSTRING(MAX(t.lsnMarkers), 11, 10)
              FROM (
                SELECT MIN(beginLSN + endLSN) AS lsnMarkers
                FROM @transactionLog
                GROUP BY SUBSTRING(beginLSN, 1, 4)
              ) t
              WHERE (
                SELECT TOP 1 [begin time]
                FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), t.lsnMarkers, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)
                WHERE Operation = 'LOP_BEGIN_XACT'
              ) < @cutoffDateTime
      
              EXEC sp_repldone @firstValidLSN, @lastValidLSN, 0, 0, 0
          END
        END
      END;
      
    4. 创建另一个存储过程。这一次,您将创建一个作业,以按照指定的节奏运行上一步中创建的存储过程:

      CREATE PROCEDURE [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time INT
      AS
      BEGIN
      
      DECLARE @database_name VARCHAR(MAX)
        SET @database_name =  (SELECT DB_NAME());;
      
        DECLARE @command_str VARCHAR(MAX);
        SET @command_str = CONCAT('Use ', @database_name,'; exec dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time = ' + CAST(@transaction_logs_retention_time AS VARCHAR(10)));
      
        DECLARE @job_name VARCHAR(MAX);
      SET @job_name =
        CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1')
          DECLARE @current_time INT
        = CAST(FORMAT(GETDATE(), 'HHmmss') AS INT);
      
        -- Schedule the procedure to run after every 5 minutes.
        IF NOT EXISTS (
          SELECT * FROM msdb.dbo.sysjobs
          WHERE name = @job_name
        )
        BEGIN
          EXEC msdb.dbo.sp_add_job
          @job_name = @job_name,
          @enabled = 1,
          @description = N'Execute the procedure every 5 minutes.' ;
      
          EXEC msdb.dbo.sp_add_jobstep
          @job_name =  @job_name,
          @step_name = N'Execute_DatastreamLogTruncationSafeguard',
          @subsystem = N'TSQL',
          @command = @command_str;
      
            DECLARE @schedule_name_1 VARCHAR(MAX);
          SET @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEveryFiveMinutesSchedule')
      
          EXEC msdb.dbo.sp_add_schedule
          @schedule_name = @schedule_name_1,
          @freq_type = 4,  -- daily start
          @freq_subday_type = 4,  -- every X minutes daily
          @freq_interval = 1,
          @freq_subday_interval = 5,
          @active_start_time = @current_time;
      
          EXEC msdb.dbo.sp_attach_schedule
          @job_name = @job_name,
          @schedule_name = @schedule_name_1 ;
      
          -- Add a schedule that runs the stored procedure on the SQL Server Agent startup.
          DECLARE @schedule_name_agent_startup VARCHAR(MAX);
          SET @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule')
      
          EXEC msdb.dbo.sp_add_schedule
          @schedule_name = @schedule_name_agent_startup,
          @freq_type = 64,  -- start on SQL Server Agent startup
          @active_start_time = @current_time;
      
          EXEC msdb.dbo.sp_attach_schedule
          @job_name = @job_name,
          @schedule_name = @schedule_name_agent_startup ;
      
          EXEC msdb.dbo.sp_add_jobserver
          @job_name = @job_name,
          @server_name = @@servername ;
        END
      END;
      
    5. 执行会创建 Datastream 作业的存储过程。

      EXEC [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time = (INT)
      

      INT 替换为您要保留日志的分钟数。例如:

      • 60 会将保留时间设置为 1 小时
      • 24 * 60 会将保留时间设置为 1 天
      • 3 * 24 * 60 会将保留时间设置为 3 天