
Extracting Data from a Source System to History Tables

This is a topic I haven't found much written about, yet nearly every system I've worked with needs exactly this functionality. The extraction must be done in a way that does not hinder the performance of the source system. In this example, the goal is to extract data from a source system into another database (or server) while consuming as few resources as possible. This is why I pull from the source system in two separate stages.

First Stage – Staging Import

The first step is a very simple SELECT statement into a staging table. This statement may do some light transformation, mostly lookups that have to be resolved against the source system. There may be several SELECT statements pulling data into several staging tables. I prefer to pull tables from the source to staging in a one-to-one relationship, so for every source table we need, there is a corresponding staging table. See the diagram below:

Diagram: Transfer from source system to historical tables (source to history database)

The reasons for pulling one-to-one are simple. First, each query is a plain SELECT. Second, it makes troubleshooting easy, because every staging table mirrors exactly one source table. After the import into staging, the next step is to move the records into the history table(s).

Second Stage – Historical Import

In the historical import, we compare what is in the history table with what is in staging. Each staging record is joined to the corresponding current record in the history table on the primary key(s). A checksum computed over the tracked columns is then compared to detect whether the row has changed. If the source system has updated a row, we record the change in the history table using effective dating: the previous version is closed out (its end date is set and it is no longer flagged as current), and the new version is inserted as the current record with an open-ended end date.

Limiting the Pulls

During production hours, it's not a good idea to pull every record from the source system just to compare it with the destination. Instead, we use an incremental refresh, which selects only the rows whose change date is later than the highest change date recorded by the last successful pull. We also run a full refresh, which can be scheduled overnight and compares the entire source table with the destination.
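The windowing rule can be expressed in a few lines. This is a hedged, pure-Python sketch; the code `FULL = 2`, the 1900-01-01 floor, and the dictionary row shape are illustrative assumptions (the procedure itself only treats `@RefreshType = 1` as incremental). The lower bound is the watermark saved by the last successful pull, or the floor for a full refresh or a first run; the upper bound is the latest change date present in the source when the pull starts.

```python
from datetime import datetime

INCREMENTAL, FULL = 1, 2        # FULL = 2 is assumed; any non-1 value means full
FLOOR = datetime(1900, 1, 1)    # assumed lower bound for a full pull


def change_window(refresh_type, last_watermark, source_rows):
    """Return the (low, high] change-date window for this pull.

    last_watermark is the max change date saved by the last successful
    pull, or None if there has never been one.
    """
    # Upper bound: the newest change currently in the source
    high = max((r["date_modified"] for r in source_rows), default=FLOOR)
    # Lower bound: floor for a full refresh or a first run, else the watermark
    if refresh_type != INCREMENTAL or last_watermark is None:
        low = FLOOR
    else:
        low = last_watermark
    return low, high


def rows_to_pull(source_rows, low, high):
    """Rows whose change date falls in (low, high]."""
    return [r for r in source_rows if low < r["date_modified"] <= high]


if __name__ == "__main__":
    rows = [
        {"employee_id": 1, "date_modified": datetime(2024, 1, 1, 8)},
        {"employee_id": 2, "date_modified": datetime(2024, 1, 2, 8)},
    ]
    low, high = change_window(INCREMENTAL, datetime(2024, 1, 1, 12), rows)
    print([r["employee_id"] for r in rows_to_pull(rows, low, high)])  # [2]
    low, high = change_window(FULL, datetime(2024, 1, 1, 12), rows)
    print([r["employee_id"] for r in rows_to_pull(rows, low, high)])  # [1, 2]
```

Fixing the upper bound before pulling, and saving it as the next watermark, means a row modified after the bound was computed is picked up by the following pull instead of being skipped.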

Code


CREATE PROCEDURE [Dba].[Refresh_Table_1]
	@RefreshType	int = 1,
	@DebugMode		int = 0
AS

SET NOCOUNT ON
BEGIN

	DECLARE @JobID				int
	DECLARE @DataFlowID			int			= 1002			-- Internal code specific to Refresh_Table_1
	DECLARE @LastMaxChangeDate	datetime					-- max DateModified recorded by the last successful pull
	DECLARE @MaxChangeDate		datetime					-- upper bound (max DateModified) for this pull
	DECLARE @RowsStaging		int			= 0
	DECLARE @RowsInserted		int			= 0
	DECLARE @RowsUpdated		int			= 0
	DECLARE @ErrorDesc			varchar(max)
	DECLARE @ErrorSeverity		int
	DECLARE @IsSuccess			bit			= 1
	DECLARE @Now				datetime	= GETDATE()
	DECLARE @EffToDate			datetime	= '2079-06-06T00:00:00.000'

	INSERT INTO Log.DataImport
	(
		DataFlowID,
		TimeBegin,
		RowsInserted,
		RowsUpdated,
		RowsDeleted,
		RefreshType
	)
	SELECT
		@DataFlowID,
		TimeBegin = @Now,
		NULL,
		NULL,
		NULL,
		@RefreshType

	SET @JobID = SCOPE_IDENTITY()	-- unlike @@IDENTITY, not affected by identities generated in triggers

	IF @RefreshType = 1
		-- Incremental Refresh
		BEGIN
			SET @LastMaxChangeDate =
			(
				SELECT LastMaxChangeDate
				FROM History.Log.DataImport
				WHERE JobID =
				(
					SELECT MAX(JobID)
					FROM History.Log.DataImport
					WHERE
						DataFlowID = @DataFlowID
					AND
						IsComplete = 1
					AND
						IsSuccess = 1
				)
			)
		END
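	ELSE
		-- Full Refresh: compare everything from the earliest date onward
		BEGIN
			SET @LastMaxChangeDate = '1900-01-01T00:00:00.000'
		END

	-- No previous successful pull for this data flow: fall back to a full pull
	IF @LastMaxChangeDate IS NULL
		SET @LastMaxChangeDate = '1900-01-01T00:00:00.000'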

	-- Upper bound for this pull: the latest change currently in the source
	SET @MaxChangeDate =
	(
		SELECT MAX(DateModified)
		FROM SourceDB.Dbo.Employees
	)

	-- Drop current index on staging for fast inserting
	IF  EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[Stg].[Table_1]') AND name = N'IDX_Tmp_Stg_Table_1')
	DROP INDEX [IDX_Tmp_Stg_Table_1] ON [Stg].[Table_1] WITH ( ONLINE = OFF )

	TRUNCATE TABLE Stg.Table_1

	INSERT INTO Stg.Table_1
	(
		EmployeeID
       ,FirstName
       ,LastName
       ,Marital_Status
       ,StartDate
       ,TerminatedDate
       ,ManagerID
       ,Dependents
       ,EmploymentType
       ,HasFelony
       ,FlagRehire
       ,DateModified
       ,JobID
       ,_Chksum
	)
	SELECT
		EmployeeID
       ,FirstName
       ,LastName
       ,Marital_Status
       ,StartDate
       ,TerminatedDate
       ,ManagerID
       ,Dependents
       ,EmploymentType
       ,HasFelony
       ,FlagRehire
       ,DateModified
       ,JobID
       ,_Chksum =		-- CHECKSUM can occasionally miss a change (collisions); HASHBYTES is less collision-prone
		   CHECKSUM
		   (
				FirstName
				,LastName
				,Marital_Status
				,StartDate
				,TerminatedDate
				,ManagerID
				,Dependents
				,EmploymentType
				,HasFelony
				,FlagRehire
			)
    FROM SourceDB.Dbo.Employees pfh
    WHERE pfh.DateModified > @LastMaxChangeDate
		AND pfh.DateModified <= @MaxChangeDate

    SET @RowsStaging = (SELECT @@ROWCOUNT)

	-- Recreate the index dropped above (same name), keyed on the column used to join to history
	CREATE INDEX IDX_Tmp_Stg_Table_1 ON Stg.Table_1 (EmployeeID) INCLUDE (_Chksum, DateModified)

	-- Must be in transaction because we cannot close out
	-- without successfully inserting a changed record
	BEGIN TRY

		BEGIN TRANSACTION
			-- Close out records that have changed
			UPDATE hpf
			SET
				_IsCurrentRecord = 0
				,_DateLastUpdated = @Now
				,_ToDate = spf.DateModified
				,_JobID = @JobID
			FROM Hst.Table_1 hpf
			JOIN Stg.Table_1 spf
				ON hpf.EmployeeID = spf.EmployeeID AND
				hpf._ChkSum != spf._ChkSum
			WHERE hpf._IsCurrentRecord = 1

			SET @RowsUpdated = (SELECT @@ROWCOUNT)

			-- Insert changed and new records
			INSERT INTO Hst.Table_1
			(
				EmployeeID
				,FirstName
				,LastName
				,Marital_Status
				,StartDate
				,TerminatedDate
				,ManagerID
				,Dependents
				,EmploymentType
				,HasFelony
				,FlagRehire
				,_FromDate
				,_ToDate
				,_ProcessInd
				,_IsCurrentRecord
				,_DateLastUpdated
				,_JobID
				,_ChkSum
			)
			SELECT DISTINCT
				EmployeeID						= spf.EmployeeID
				,FirstName						= spf.FirstName
				,LastName						= spf.LastName
				,Marital_Status					= spf.Marital_Status
				,StartDate						= spf.StartDate
				,TerminatedDate					= spf.TerminatedDate
				,ManagerID						= spf.ManagerID
				,Dependents						= spf.Dependents
				,EmploymentType					= spf.EmploymentType
				,HasFelony						= spf.HasFelony
				,FlagRehire						= spf.FlagRehire
				,_FromDate						= spf.DateModified
				,_ToDate						= @EffToDate
				,_ProcessInd					= NULL
				,_IsCurrentRecord				= 1
				,_DateLastUpdated				= @Now
				,_JobID							= @JobID
				,_ChkSum						= spf._ChkSum

			FROM Stg.Table_1 spf
			-- New employees, plus changed employees whose current record was just closed out
			WHERE NOT EXISTS(
				SELECT 1
				FROM Hst.Table_1 hpf
				WHERE
					hpf.EmployeeID = spf.EmployeeID AND
					hpf._IsCurrentRecord = 1
			)

			SET @RowsInserted = (SELECT @@ROWCOUNT)

		COMMIT TRANSACTION

	END TRY

	BEGIN CATCH

		IF @@TRANCOUNT > 0
			ROLLBACK TRAN

		-- Record the failure whether or not a transaction was open
		SELECT @ErrorDesc = ERROR_MESSAGE()
		SELECT @ErrorSeverity = ERROR_SEVERITY()

		SET @IsSuccess = 0
		SET @RowsUpdated = 0
		SET @RowsInserted = 0

	END CATCH

	UPDATE Log.DataImport
	SET
		LastMaxChangeDate	= @MaxChangeDate,
		RowsStaging			= @RowsStaging,
		RowsInserted		= @RowsInserted,
		RowsUpdated			= @RowsUpdated,
		RefreshType			= @RefreshType,
		IsComplete			= 1,
		IsSuccess			= @IsSuccess,
		ElapsedMS			= DATEDIFF(ms,@Now,GETDATE())
	WHERE JobID = @JobID

	-- Output Job Statistics
	IF @DebugMode = 1
	BEGIN
		SELECT
		JobID					= @JobID
		,TimeStart				= @Now
		,TimeEnd				= GETDATE()
		,ElapsedMS				= DATEDIFF(ms,@Now,GETDATE())
		,LastMaxChangeDate		= @MaxChangeDate
		,RowsStaging			= @RowsStaging
		,RowsInserted			= @RowsInserted
		,RowsUpdated			= @RowsUpdated
		,RefreshType			= @RefreshType
		,IsSuccess				= @IsSuccess
		,Error					= @ErrorSeverity
		,ErrorMessage			= @ErrorDesc
	END

END

