SELECT  s.name + '.' + t.Name AS [Table Name], part.rows AS [Total Rows In Table - Modified],

 CAST((SUMDISTINCT au.Total_pages) * 8 ) / 1024.000 / 1024.000 AS NUMERIC(183)) 

 AS [Table's Total Space In GB]

FROM 

 SYS.Tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id

 INNER JOIN SYS.Indexes idx ON t.Object_id = idx.Object_id

 INNER JOIN SYS.Partitions part ON idx.Object_id = part.Object_id 

                    AND idx.Index_id = part.Index_id

 INNER JOIN SYS.Allocation_units au ON part.Partition_id = au.Container_id

 INNER JOIN SYS.Filegroups fGrp ON idx.Data_space_id = fGrp.Data_space_id

 INNER JOIN SYS.Database_files Df ON Df.Data_space_id = fGrp.Data_space_id

WHERE t.Is_ms_shipped = 0 AND idx.Object_id > 255 

GROUP BY t.Name, s.name, part.rows

ORDER BY [Table's Total Space In GB] DESC

'Database > Query' 카테고리의 다른 글

INDEX 상세 정보 조회  (0) 2020.11.02
프로시저 파리미터  (0) 2020.10.20
DELETE 복원 프로시저  (0) 2020.10.20
EXEC와 동적(adhoc) 쿼리  (0) 2020.10.05
Find and Kill all the Blocked Process/Query  (0) 2020.09.22

기업의 가장 기본적 목표는 '오래 생존하는 것'입니다. 디지털 트랜스포메이션은 생존 도구로써 중요성이 커지고 있습니다. 앞서 고객과 시장의 변화 요구에 빠르게 대응하기 위한 도구로 클라우드 아키텍처, 애자일(Agile) 조직문화, 거버넌스 등에 대해 살펴보았습니다. 사실 실무자 입장에선 이런 거시적 관점보단 당장 자신이 하고 있는 일에 어떤 변화가 생길지가 더 중요할 것입니다.

이번 글에선 클라우드 유연성에 기반한 데이터 분석 및 활용의 장점과 각 클라우드 서비스 제공자(CSP·Cloud Service Provider)가 제공하는 Cloud Native Application(이하 CNA) 기반의 End-to End 데이터 플랫폼의 이점, 그리고 성공 사례를 살펴보겠습니다.

클라우드는 유연하다

클라우드가 유연한 것과 최적화된 빅데이터 분석 환경을 구축하는 것이 무슨 관련이 있냐고 물을 수 있습니다. 여기서 클라우드의 유연함이란 ‘내가 원하는 서비스만을 골라 쓸 수 있다’는 점과 ‘내가 쓴 만큼 비용을 지불한다’는 점을 의미합니다. 즉, 빅데이터 분석의 세 가지 요소 기술인 ‘대량의(Volume)’ ‘다양한(Variety)’ 데이터를 ‘빠르게(Velocity)’ 최적화된 비용으로 가공할 수 있는 환경을 클라우드는 제공합니다.

 

예를 들어 매달 5TB의 신규 데이터가 발생하는 A사에서 데이터 분석을 위한 일반적인 온프레미스(On-Premise) 시스템을 구축한다고 가정해 보겠습니다.

[가정1: 직접 스토리지 서버를 구축하는 경우]

A사는 6개월치 데이터 처리량에 해당하는 30TB의 라이브 스토리지 서버를 구축하고, 6개월 이후 데이터를 옮겨 저장할 아카이브용 스토리지 서버를 따로 30TB 구축했습니다. 그리고 6개월에 한 번씩 데이터 통합 분석을 실시하기 위해 CPU 및 메모리 등의 컴퓨팅 서버 또한 최적화했습니다.

막상 구축하고 나니 스토리지와 컴퓨팅 파워는 초기에 여유가 있는 상황이며, 문제없이 잘 돌아가는 것 같으니 만족하면서 사용을 합니다. 하지만 더 의미 있는 데이터 분석 결과를 도출하기 위해 정보를 추가 수집하기로 의사 결정됨에 따라 매달 유입되는 데이터 량이 10TB로 증가했습니다. 혹은, 반대로 매달 유입되는 데이터 량이 줄거나 없어질 수도 있습니다.

이런 경우에 A사는 추가적인 스토리지, 컴퓨팅 자원을 확보(혹은 감가상각을 감수하고 재판매) 할 수밖에 없습니다. 최악의 경우 데이터 분석 환경과 관련된 불확실 요소들이 제거되지 않는 경우가 발생할 수도 있습니다.

[가정2: 클라우드 환경을 통해 환경을 구축하는 경우]

이번에는 A사가 클라우드 환경을 통해서 데이터 분석 환경을 구축한 경우를 생각해 보겠습니다. 첫 달에는 5TB를 위한 스토리지와 컴퓨팅 파워만을 사용했으며 두 번째 달에는 10TB를 위한 스토리지를 사용했습니다. 5개월 차부터 위치 정보, 결제 정보를 추가 수집하기로 했지만 확장성에 있어서 아무런 문제가 되지 않습니다.

5개월 차에는 30TB(5+5+5+5+10), 6개월 차에는 40TB의 스토리지를 사용하면 됩니다. 반대로 데이터 유입이 줄거나 없어지면 다시 자원을 조정하면 됩니다. 6개월에 한 번 시행하는 통합 데이터 분석 또한 그 시점에 필요한 40TB를 분석하려면 추가로 구성하면 됩니다.

뿐만 아니라, CSP들은 다양한 서비스를 제공하기에 6개월 이후의 데이터는 더욱 값싼 가격으로 보관할 수 있고(e.g. AWS S3 Glacier), 더욱 높은 컴퓨팅 파워를 위해 GPU를 활용할 수도 있으며(e.g. AWS EC2 P3 인스턴스) 대규모 데이터 처리를 위한 스토리지의 고성능 파일 시스템을 추가 확보할 수도 있습니다.

이와 같이 원하는 서비스를 선택해 사용할 수 있는 유연한 클라우드의 특성은 최적화된 빅데이터 분석 환경 구축을 가능하게 합니다. 그러므로 급변하는 비즈니스 환경에서 데이터 분석을 통한 애자일하고 초 개인화된 맞춤형 서비스, 제품을 개발하기 위해서 클라우드는 더 이상 선택이 아닌 필수라 말할 수 있습니다.

출처 게티이미지뱅크

클라우드는 자기 완결형 분석 환경이다

데이터를 분석하고 활용하는 일련의 활동은 기존에도 수행해 온 업무입니다. 하지만 조직에서 데이터를 처리하고 분석할 수 있는 가용자원의 한계 및 기존 IT 조직과의 협업이 원활하지 못한 점이 발목을 잡는 경우가 많은 것 또한 현실입니다. IT를 통해 제공받고 싶은 데이터는 규모, 속도, 다양성 측면에서 한계가 있으며 이는 곧 비즈니스 활동의 제약으로 이어집니다.

반면 각 CSP는 이러한 불편함을 해소하기 위해 미리 준비해 둔 데이터 플랫폼을 사용자가 End-to-End(전 과정을 완결된 서비스)로 빠르고 손쉽게 사용할 수 있게 해 줌으로써, 디지털 시대 비즈니스 환경 변화에 민첩하게 대응하기 위한 기존 방식과는 비교할 수 없는 장점을 제공합니다. 이를 반증하듯 가트너(Gartner) 리서치 결과에 따르면 약 80% 이상의 기업에서 CSP가 제공하는 클라우드 환경 기반의 데이터 관리를 고려하고 있습니다. 

80%이상이 고려하고 있다는 표 Gartner Research Circle Members. (출처: ID : 465521_C)

가트너(Gartner)에서 제공하는 Data & Analytics 아키텍처에서는 데이터 플랫폼의 End-to-End 프로세스의 Core Componen를 7개 영역으로 구분하고 있습니다.

l Gartner Technical Professional Advice. (출처: ID : 451418_C)

Core Component는 데이터의 수집(① Ingestion)부터 데이터를 변환 가공(② Transformation) 및 저장, 보존하고(③ Persistence) 이를 사용해 데이터 분석, 활용(④ Analytics and BI)에 적용, 나아가 데이터 사이언스, 기계학습(⑤ Data Science and ML)을 통한 보다 고도화된 분석 가치를 이끌어내는 데이터 애플리케이션 프로세스와 이를 관리하기 위한 데이터 거버넌스(⑥ Data Governance)  데이터 워크플로우 자동화(⑦ Data Orchestration)로 구성되며, 주요 CSP 사에서는 CNA 및 ISV(Independent Software Vendor)로 구성된 데이터 플랫폼을 제공하고 있습니다.

대표적 클라우드 데이터 플랫폼

① Amazon AWS 데이터 플랫폼

 

 

Amazon AWS Data Eco-system. (출처: https://aws.amazon.com/)

Amazon AWS의 데이터 플랫폼은 실시간 처리, 대용량 데이터의 다양성을 수용할 수 있는 저장, 사용자 분석을 용이하게 수행할 수 있는 비즈니스 인텔리전스(이하 BI) 및 기계학습(Machine Learning, 이하 ML) 등 Full Stack을 보유하고 있으며 특히 수요에 따른 빅데이터 애플리케이션을 보다 쉽게 구축, 배포, 조정할 수 있으므로 사용자는 관리보다는 비즈니스 문제 자체에 집중할 수 있습니다.

Amazon Redshift는 빠르고 완벽하게 관리되는 페타바이트(Petabyte) 규모의 데이터 웨어하우스(DW) 서비스로서 간편하고 비용 효율적으로 기존 BI 도구를 사용해 모든 데이터를 효율적으로 분석할 수 있습니다.

컬럼 방식의 스토리지 기술을 사용해 여러 노드에서 쿼리를 병렬로 실행하고 분산함으로써 데이터 세트 크기에 상관없이 빠른 쿼리 및 I/O 성능을 제공하며, 데이터 웨어하우스(DW)에 대한 프로비저닝, 구성, 모니터링, 백업 및 보안과 관련된 일반적인 관리 작업을 대부분 자동화하므로 유지 및 관리가 간편하면서도 비용이 적게 듭니다.

Amazon ML을 활용하면 누구나 예측 분석 및 기계 학습 기술을 손쉽게 사용할 수 있기에 현업 사용자도 복잡한 ML 알고리즘과 기술을 배우지 않아도 시각화 도구 및 대화식의 마법사 기능을 통한 모델 프로세스 작성할 수 있습니다.

또한 야간에 다양한 데이터 소스를 통합하고 처리하는 데이터 프로세스 작업을 고려해 Amazon EC2 스팟 시장을 제공하며 이를 통해 최대 90% 할인됨 금액으로 사용할 수 있는 다양한 요금제 또한 제공하고 있습니다. 다만 테라바이트(Terabyte) 규모의 데이터 수집은 지원하지 않으며 시퀀스 예측 및 비감독 클러스터링은 지원되지 않는 등 ML 모델의 한계는 존재합니다.

② Google GCP 데이터 플랫폼

 

Gartner Technical Professional Advice. (출처: ID : 451418_C)

구글의 클라우드 데이터 플랫폼(GCP)은 퍼블릭 클라우드 서비스 기반 데이터 분석 도구와 (솔루션 또한 서버리스 데이터 플랫폼을 지향하며 클라우드 DW 제품인 빅쿼리(BigQuery)와) 다양한 AI 제품군을 장점으로 꼽을 수 있습니다.

빅쿼리(BigQuery)는 GCP의 장점을 극대화한 제품으로, 별도의 설치작업을 거치지 않는 구글 클라우드 기반의 완전 관리형 DW 제품이며 SQL을 통해 현업도 쉽게 분석할 수 있고 페타바이트 규모의 처리 성능과 인프라 운영 등을 제공해 부가적인 관리 포인트를 전혀 신경 쓸 필요가 없습니다.

Google은 20년간 ML 및 AI 분야에서 혁신을 이룬 결과 다양한 솔루션을 보유하고 있습니다. 현업이 간단한 SQL만 사용해 쉽게 구조화 또는 반 구조화된 데이터를 기반으로 ML 모델을 만들고 예측할 수 있는 BigQueryML, 간단한 그래픽 사용자 인터페이스를 사용해 데이터를 기반으로 모델을 학습, 평가, 개선, 배포할 수 있는 Cloud Auto ML 등 별도의 전문화된 기술 및 지식 없이도 예측 모형을 현업이 직접 만들 수 있는 기능이 제공됩니다.

③ Microsoft Azure 데이터 플랫폼

Microsoft Build. Azure 데이터 플랫폼 (출처: docs.microsoft.com)

Microsoft Azure 데이터 플랫폼은 PaaS(Platform as a service) 형태의 클라우드 서비스로, 사용자 친화적인 OS 및 Office365 도구 등과 밀접하게 연계가 가능합니다. 유연성 측면에서는 제한적이지만 플랫폼 구축에 대한 비용 및 유지 보수 차원에서 보다 효율적이라는 점을 장점으로 꼽을 수 있습니다.

현업 분석 활용에 있어서는 기존 온프레미스에서 사용한 Azure SQL Database 및 Power BI 솔루션을 그대로 클라우드 환경에서 활용할 수 있고, 분석 측면에서는 Azure Machine Learning에서 제공하는 별도 제공 디자이너를 통해 사용자가 쉽게 기계학습모델을 만들 수 있습니다. 또한 Deep Learning 기반의 Pre-built 된 BOT Service를 통한 지능형 대화봇, Cognitive Services를 통한 시나리오 기반의 자연어 처리봇 등을 제공합니다.

주요 3사에서 제공하는 데이터 플랫폼의 특징은, 데이터 수집부터 분석까지 기구축되어 있는 최적화된 환경을 제공한다는 점이며, 현업 사용자가 전문 프로그래밍의 지식 없이도 간단한 조작만으로 비즈니스 가치에 더 집중할 수 있도록 한다는 점입니다.

대세로서의 클라우드

클라우드의 도입과 이에 기반한 빅데이터 분석 환경의 구축은 이미 대세라고 말할 수 있습니다. 영화산업에서 블록버스터사가 넷플릭스에 자리를 내어 주고 쇠퇴한 것처럼 새로운 흐름을 읽지 못하고 낡은 것에 안주하는 기업은 경쟁력을 잃고 몰락할 수밖에 없습니다. 실제로 많은 분야의 기업과 정부 기관에서 이 점을 인지하고 각종 클라우드 관련 사업을 활발히 발주하고 있습니다.

한국정보화진흥원(NIA)의 경우 공공기관 최초로 원내 전체 시스템을 민간 클라우드로 전환하겠다고 천명하고 ‘NIA ICT 클라우드 플랫폼 운영환경 구축’ 공고를 발주했습니다. 정부 시스템인 만큼 민감한 정보와 컴플라이언스 이슈가 많아 민관협력사업(PPP) 방식으로 사업이 이루어지며 데이터 센터를 비롯한 IaaS 구축부터 PaaS-TA를 활용한 PaaS 구축까지 사업 범위가 매우 방대합니다.

또한 행정안전부에서도 ‘전자정부 클라우드 플랫폼 구축’ 프로젝트를 통해 정부부처의 클라우드 기반 AI•빅데이터 활용 능력을 향상시키기 위한 노력을 기울이고 있습니다. 플랫폼은 포털, 클라우드 어시스트, 클라우드 엔진, 플랫폼 통합 관리, 연계 채널의 5개 모듈로 구성되며 클라우드 인프라 모듈의 경우 별도의 사업으로 발주해 진행 중입니다.

출처 한국정보화진흥원

과학기술정보통신부는 민관 협력을 통해 유통•소비, 통신, 금융, 헬스케어, 교통, 환경, 문화미디어, 중소기업, 산림, 지역 경제의 10가지 분야에 10종의 빅데이터 플랫폼을 클라우드 기반으로 구축했습니다. (이 중 민감한 정보가 많은 헬스케어의 경우 프라이빗 클라우드(Private Cloud), 교통과 유통•소비는 하이브리드(Hybrid)로 구성되었으며 나머지 플랫폼은 퍼블릭 클라우드(Public Cloud)로 구성되어 있습니다.)

정부뿐만 아니라 일반 기업도 클라우드 전환 및 데이터 분석 환경 구축에 힘을 쏟고 있는데, 금융업계의 경우 데이터 3법의 시행을 앞두고 적극적으로 클라우드 기반 빅데이터 분석 환경 구축에 앞장서고 있습니다. NH농협은행이 하이브리드•멀티 클라우드 전략을 펼치면서 빅데이터 분석, 클라우드 포털 등의 업무를 PaaS에 적용한 바 있고, KB국민은행도 클라우드 플랫폼 클레이온(CLAYON)의 운영을 통해 빅데이터를 활용한 초 개인화 서비스 개발에 박차를 가하고 있습니다.

AI 헬스케어 스타트업인 뷰노(Vuno)는 코로나19 확진자의 데이터를 기반으로 X-Ray 판독 솔루션을 무료로 제공하는데 이 과정에서 고사양의 컴퓨팅 파워를 확보하기 위해 AWS의 EC2 인스턴스와 Azure의 컴퓨팅 인스턴스를 사용하고 있습니다.

이처럼 많은 주체들이 앞다투어 클라우드와 빅데이터 분석 기술을 차용하고 있습니다. 즉 더 이상 클라우드와 빅데이터는 IT 기업만의 전유물이 아니며 모든 기업의 대세이자 흐름입니다. 아직은 도입 초기인 만큼 가시적인 성과가 현저하게 나타나고 있지는 않지만 향후에 선두주자들이 어떠한 경쟁력을 가지게 될지, 후발주자들은 어떤 후폭풍을 겪게 될지 지켜보는 것도 좋을 듯합니다.

declare @SchemaName sysname=NULL

, @TableName sysname=NULL

, @IndexName sysname=NULL

, @dataspace sysname=NULL

 

set @SchemaName = 'Sales'

set @TableName = 'SalesOrderDetail'

--set @IndexName = 'AK_SalesOrderDetail_rowguid'

 

 

declare @_SchemaName varchar(100)

declare @_TableName varchar(256)

declare @_IndexName varchar(256)

declare @ColumnName varchar(256)

declare @is_unique varchar(100)

declare @IndexTypeDesc varchar(100)

declare @FileGroupName varchar(100)

declare @is_disabled varchar(100)

declare @IndexColumnId int

declare @IsDescendingKey int

declare @IsIncludedColumn int

 

 

-- getting the index sizes

SELECT schema_name(t.schema_id) [SchemaName],

OBJECT_NAME(ix.OBJECT_ID) AS TableName,

ix.name AS IndexName,

CAST( 8 * SUM(a.used_pages)/1024.0 AS DECIMAL(20,1))AS 'Indexsize(MB)'

INTO #IndexSizeTable

from sys.tables t

inner join sys.indexes ix on t.object_id=ix.object_id

inner join sys.partitions AS p ON p.OBJECT_ID = ix.OBJECT_ID AND p.index_id = ix.index_id

inner join sys.allocation_units AS a ON a.container_id = p.partition_id

WHERE ix.type>0 and t.is_ms_shipped=0

and schema_name(t.schema_id)= isnull(@SchemaName,schema_name(t.schema_id)) and t.name=isnull(@TableName,t.name) AND ix.name=isnull(@IndexName, ix.name)

GROUP BY schema_name(t.schema_id), ix.OBJECT_ID,ix.name

ORDER BY OBJECT_NAME(ix.OBJECT_ID),ix.name

 

--getting important properties of indexes

select schema_name(t.schema_id) [SchemaName], t.name TableName, ix.name IndexName,

cast( '' as varchar(max)) AS IndexKeys, casT('' as varchar(max)) AS IncludedColumns,

ix.is_unique

, ix.type_desc, ix.fill_factor as [Fill_Factor]

, ix.is_disabled , da.name as data_space,

ix.is_padded,

ix.allow_page_locks,

ix.allow_row_locks,

INDEXPROPERTY(t.object_id, ix.name, 'IsAutoStatistics') IsAutoStatistics ,

ix.ignore_dup_key

INTO #helpindex

from sys.tables t

inner join sys.indexes ix on t.object_id=ix.object_id

inner join sys.data_spaces da on da.data_space_id= ix.data_space_id

where ix.type>0 and t.is_ms_shipped=0

and schema_name(t.schema_id)= isnull(@SchemaName,schema_name(t.schema_id)) and t.name=isnull(@TableName,t.name) AND ix.name=isnull(@IndexName, ix.name)

and da.name=isnull(@dataspace,da.name)

order by schema_name(t.schema_id), t.name, ix.name

 

---getting the index keys and included columns

declare CursorIndex cursor for

select schema_name(t.schema_id) [schema_name], t.name, ix.name

from sys.tables t

inner join sys.indexes ix on t.object_id=ix.object_id

where ix.type>0 and t.is_ms_shipped=0

and schema_name(t.schema_id)= isnull(@SchemaName,schema_name(t.schema_id)) and t.name=isnull(@TableName,t.name) AND ix.name=isnull(@IndexName, ix.name)

order by schema_name(t.schema_id), t.name, ix.name

open CursorIndex

fetch next from CursorIndex into @_SchemaName, @_TableName, @_IndexName

while (@@fetch_status=0)

begin

declare @IndexColumns varchar(4000)

declare @IncludedColumns varchar(4000)

set @IndexColumns=''

set @IncludedColumns=''

declare CursorIndexColumn cursor for

select col.name, ixc.is_descending_key, ixc.is_included_column

from sys.tables tb

inner join sys.indexes ix on tb.object_id=ix.object_id

inner join sys.index_columns ixc on ix.object_id=ixc.object_id and ix.index_id= ixc.index_id

inner join sys.columns col on ixc.object_id =col.object_id and ixc.column_id=col.column_id

where ix.type>0 and tb.is_ms_shipped=0

and schema_name(tb.schema_id)=@_SchemaName and tb.name=@_TableName and ix.name=@_IndexName

order by ixc.index_column_id

 

open CursorIndexColumn

fetch next from CursorIndexColumn into @ColumnName, @IsDescendingKey, @IsIncludedColumn

while (@@fetch_status=0)

begin

if @IsIncludedColumn=0

set @IndexColumns=@IndexColumns + @ColumnName +', '

else

set @IncludedColumns=@IncludedColumns + @ColumnName +', '

 

fetch next from CursorIndexColumn into @ColumnName, @IsDescendingKey, @IsIncludedColumn

end

close CursorIndexColumn

deallocate CursorIndexColumn

 

set @IndexColumns = substring(@IndexColumns, 1, len(@IndexColumns)-1)

set @IncludedColumns = case when len(@IncludedColumns) >0 then substring(@IncludedColumns, 1, len(@IncludedColumns)-1) else '' end

 

UPDATE #helpindex

SET IndexKeys = @IndexColumns, IncludedColumns=@IncludedColumns

WHERE [SchemaName]=@_SchemaName and TableName=@_TableName and IndexName=@_IndexName

 

fetch next from CursorIndex into @_SchemaName, @_TableName, @_IndexName

 

end

close CursorIndex

deallocate CursorIndex

 

--showing the results

SELECT hi.SchemaName, hi.TableName, hi.IndexName, hi.IndexKeys, hi.IncludedColumns, ixs.[Indexsize(MB)],

hi.is_unique, hi.type_desc,hi.data_space, hi.Fill_Factor, hi.IsAutoStatistics,

hi.is_disabled, hi.is_padded, hi.allow_page_locks, hi.allow_row_locks,hi.ignore_dup_key

FROM #helpindex hi

INNER JOIN #IndexSizeTable ixs ON hi.SchemaName=ixs.SchemaName and hi.TableName=ixs.TableName and hi.IndexName=ixs.IndexName

order by hi.SchemaName, hi.TableName, hi.IndexKeys, hi.IncludedColumns

 

drop table #helpindex

drop table #IndexSizeTable

'Database > Query' 카테고리의 다른 글

테이블 사이즈  (0) 2020.11.13
프로시저 파리미터  (0) 2020.10.20
DELETE 복원 프로시저  (0) 2020.10.20
EXEC와 동적(adhoc) 쿼리  (0) 2020.10.05
Find and Kill all the Blocked Process/Query  (0) 2020.09.22

CREATE PROCEDURE  [dbo].[SP_TEMP_INSERT_ERROR_LOG](

    @pi_name varchar(50)=''

    ,@pi_age int

 

AS

 

BEGIN 

  --  DECLARE @crud_count    BIGINT ;

    

    DECLARE @sp_name    NVARCHAR(100)  -- 프로시저 이름

            ,@sp_start_date    VARCHAR(25-- 프로시저 실행날짜

            ,@sp_end_date    VARCHAR(25-- 프로시저 종료 날짜

            ,@error_line    VARCHAR(50-- 에러라인

            ,@error_msg    NVARCHAR(MAX-- 에러메세지

            ,@error_number    VARCHAR(50-- 에러행

            ,@error_severity    VARCHAR(50-- 에러심각도

            

      SET NOCOUNT ON

      -- SET XACT_ABORT ON

      

      SET @sp_name = 'SP_TEMP_INSERT_ERROR_LOG';

/**

    IF XACT_STATE() = -1

        BEGIN

            GOTO ERRORHANDLER;

        END

        **/

    

 

    BEGIN TRY  

        print 'TRY 실행';

        SET @sp_start_date = CONVERT(VARCHAR(25), GETDATE(), 121);

 

        BEGIN TRAN

            

            INSERT INTO T_TEMP (TEMP_NAME, TEMP_AGE) VALUES(@pi_name, @pi_age);

            INSERT INTO T_TEMP (TEMP_NAME, TEMP_AGE) VALUES(@pi_name, '에러발생시키기'); -- 에러발생 

            INSERT INTO T_TEMP (TEMP_NAME, TEMP_AGE) VALUES(@pi_name, @pi_age+2); 

        

        COMMIT TRAN;

        PRINT '커밋 성공'

 

    END TRY 

    BEGIN CATCH

        PRINT 'CATCH 실행';

        

        SELECT @error_line=ERROR_LINE(), @error_number=ERROR_NUMBER(), @error_msg=ERROR_MESSAGE(), @error_severity=ERROR_SEVERITY()

        

        SET @sp_end_date=CONVERT(varchar(25), GETDATE(), 121);

 

        PRINT '에러발생 SP명 ===> ' + @sp_name

        PRINT '에러라인 ===> ' + @error_line

        PRINT '에러발생시간 ===> ' + @sp_end_date 

        PRINT '에러번호 ===> ' + @error_number

        PRINT '에러 메세지 ===> ' + @error_msg 

        PRINT '에러 심각도 ===> '+ @error_severity

 

        ROLLBACK TRAN

    END CATCH

 

END 

 -- ERRORHANDLER:

--    print 'ERRORHANDLER 실행'

--    IF XACT_STATE() <> 0

--        PRINT '롤백성공';

--        ROLLBACK TRAN ;

sp_helpdb tempdb

go

 

 

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev2',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev2.ndf' , SIZE =2MB , FILEGROWTH = 2MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev3',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev3.ndf' , SIZE =3MB , FILEGROWTH = 3MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev4',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev4.ndf' , SIZE =4MB , FILEGROWTH = 4MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev5',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev5.ndf' , SIZE =5MB , FILEGROWTH = 5MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev6',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev6.ndf' , SIZE =6MB , FILEGROWTH = 6MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev7',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev7.ndf' , SIZE =7MB , FILEGROWTH = 7MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev8',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev8.ndf' , SIZE =8MB , FILEGROWTH = 8MB)

ALTER DATABASE tempdb ADD FILE ( NAME = N'tempdev9',

FILENAME = N'E:\Program Files\Microsoft SQL Server\MSSQL12.GHOST\MSSQL\DATA\tempdev9.ndf' , SIZE =9MB , FILEGROWTH = 9MB)

 

sp_helpdb tempdb

go

 

 

 

[작업내역]

1. tempdev9 파일을 제거하기

 

-- 데이터파일을 초기화 단계.

USE tempdb

GO

DBCC SHRINKFILE (tempdev9, EMPTYFILE);

GO

 

/*

DbId    FileId    CurrentSize    MinimumSize    UsedPages    EstimatedPages

2        10        1152        1152        0            0

*/

-- 데이터파일 제거 단계.

 

ALTER DATABASE tempdb

REMOVE FILE tempdev9; --to delete "tempdev12" data file

GO

 

-- 제거된 결과보기

sp_helpdb tempdb

 

2. 각기 다른 파일 사이즈 통일하기.

 

-- 8MB에서 5MB 로 줄이기

USE [tempdb]

GO

    DBCC SHRINKFILE (N'tempdev' , 5)

    GO

 

 

    DBCC SHRINKFILE (N'tempdev6' , 5)

    DBCC SHRINKFILE (N'tempdev7' , 5)

    DBCC SHRINKFILE (N'tempdev8' , 5)

 

 

-- 작은 파일은 늘려주기

USE [master]

GO

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev2', SIZE = 5MB, filegrowth=5MB )

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev3', SIZE = 5MB, filegrowth=5MB )

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev4', SIZE = 5MB, filegrowth=5MB )

 

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev6', filegrowth=5MB )

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev7', filegrowth=5MB )

    ALTER DATABASE [tempdb] MODIFY FILE ( NAME = N'tempdev8', filegrowth=5MB )

 

-- 최종 결과 확인

    select DB_NAME(mf.database_id) database_name

    , mf.name logical_name, mf.file_id

    , CONVERT (DECIMAL (20,2)

    , (CONVERT(DECIMAL, size)/128)) as [file_size_MB]

    , CASE mf.is_percent_growth

    WHEN 1 THEN 'Yes'

    ELSE 'No'

    END AS [is_percent_growth]

    , CASE mf.is_percent_growth

    WHEN 1 THEN CONVERT(VARCHAR, mf.growth) + '%'

    WHEN 0 THEN CONVERT(VARCHAR, mf.growth/128) + ' MB'

    END AS [growth_in_increment_of]

    , CASE mf.is_percent_growth

    WHEN 1 THEN CONVERT(DECIMAL(20,2)

    ,(((CONVERT(DECIMAL, size)*growth)/100)*8)/1024)

    WHEN 0 THEN CONVERT(DECIMAL(20,2)

    , (CONVERT(DECIMAL, growth)/128))

    END AS [next_auto_growth_size_MB]

    , physical_name from sys.master_files mf

    where database_id =2 and type_desc= 'rows'

 

 

 



출처: https://www.overtop.co.kr/233 [AWSin]

 

 

특정 시점에 테이블의 데이터 값을 반환하는 테이블은 첫 번째 관계형 데이터베이스 이후로 우리와 함께 해왔지만 항상 특별한 쿼리와 제약 조건이 필요했고 제대로하기가 까다로울 수 있습니다. SQL Server 2016의 새로운 기능인 시스템 버전 임시 테이블은 이러한 테이블이 다른 테이블처럼 작동하도록합니다. 테이블을 생성하거나 기존 테이블을 수정하는 방법은 무엇입니까? 메모리 내 최적화 된 OLTP 테이블을 임시로 만들려면 어떻게해야합니까? Alex Grinberg가 방법을 보여줍니다.

임시 또는 시스템 버전 테이블은 SQL Server 2016의 데이터베이스 기능으로 도입되었습니다.이를 통해 현재 데이터가 아닌 지정된 시간에 저장된 데이터에 대한 정보를 제공 할 수있는 테이블 유형이 제공됩니다. ANSI SQL 2011은 먼저 임시 테이블을 데이터베이스 기능으로 지정했으며 이제 SQL Server에서 지원됩니다.

임시 테이블의 가장 일반적인 비즈니스 용도는 다음과 같습니다.

  • 천천히 변화하는 치수. 시간 테이블은 데이터웨어 하우징 데이터베이스에서 잘 알려진 문제인 시간 분할 데이터와 같이 지정된 기간 동안 최신 데이터를 쿼리하는 더 간단한 방법을 제공합니다.
  • 데이터 감사. 임시 테이블은 "상위"테이블에서 데이터가 수정 된시기를 결정하는 감사 추적을 제공합니다. 이를 통해 규정 준수 요구 사항을 충족하고 시간에 따른 데이터 변경 사항을 추적 및 감사하여 필요할 때 데이터 포렌식을 수행 하는 데 도움이됩니다 .
  • 레코드 수준 손상 복구 또는 복구 . 레코드가 실수로 삭제되거나 업데이트 된 경우 다운 타임없이 테이블 행의 데이터 변경을 '실행 취소'하는 방법을 설정합니다. 따라서 이전 버전의 데이터를 히스토리 테이블에서 검색하여 '상위'테이블에 다시 삽입 할 수 있습니다. – 이는 누군가 (또는 일부 애플리케이션 오류로 인해) 실수로 데이터를 삭제하고 사용자가 데이터를 되돌 리거나 복구하려고 할 때 도움이됩니다.
  • 문서 발행일에 대한 정확한 데이터로 재무 보고서, 송장 및 명세서  복제 합니다. 임시 테이블을 사용하면 특정 시점의 데이터를 쿼리하여 당시의 데이터 상태를 조사 할 수 있습니다.
  • 진행중인 비즈니스 활동에서 데이터가 시간에 따라 어떻게 변하는 지 이해하여 추세  분석하고 시간에 따라 데이터가 변하는 방식의 추세를 계산합니다.

SQL Server 2016이 도입되기 전 어두운 날에는 데이터 로깅 메커니즘이 트리거에서 명시 적으로 설정되어야했습니다. 간단한 예제를 제공하려면 Department 테이블 자체부터 시작하여 다음 구조로 Department 테이블 에 대한 기록 유지 관리를 자동화해야 합니다.

 

CREATE TABLE dbo.Department

    (

    DeptID INT NOT NULL,

    DeptName VARCHAR(50) NOT NULL,

    ManagerID INT NULL,

    ParentDeptID INT NULL,

    Created DATETIME NOT NULL

      CONSTRAINT DF_Department_Created DEFAULT GETDATE(),

    CONSTRAINT PK_Department_DeptID PRIMARY KEY CLUSTERED(DeptID ASC) ON [PRIMARY]

    ) ON [PRIMARY];

  GO

다음 단계는 변경 내역을 제공하는 두 개의 추가 열이 있는 Department_Log 테이블 을 만드는 것입니다.

  • LogDate
  • LogAction

 

CREATE TABLE dbo.Department_Log

    (

    DeptID INT NOT NULL,

    DeptName VARCHAR(50) NOT NULL,

    ManagerID INT NULL,

    ParentDeptID INT NULL,

    Created DATETIME NOT NULL,

    LogDate DATETIME NOT NULL,

    LogAction VARCHAR(10) NOT NULL

    ) ON [PRIMARY];

  GO

로깅 '히스토리'테이블이 준비되면 UPDATE 및 DELETE 작업에 대한 변경 사항을 기록하는 트리거를 만들 수 있습니다.

 

CREATE TRIGGER dbo.tr_Department_Log

  ON dbo.Department

  FOR UPDATE, DELETE

  AS

    BEGIN

    SET NOCOUNT ON;

    IF

      (SELECT COUNT(1)

         FROM inserted

           JOIN deleted

             ON Inserted.DeptID = Deleted.DeptID

      ) > 0

      BEGIN

      INSERT dbo.Department_Log

        (DeptID, DeptName, ManagerID, ParentDeptID, Created, LogDate, LogAction)

        SELECT Deleted.DeptID, Deleted.DeptName, Deleted.ManagerID,

          Deleted.ParentDeptID, Deleted.Created, GETDATE(), 'UPDATED'

        FROM deleted;

      END;

    ELSE

      BEGIN

      INSERT dbo.Department_Log

        (DeptID, DeptName, ManagerID, ParentDeptID, Created, LogDate, LogAction)

        SELECT Deleted.DeptID, Deleted.DeptName, Deleted.ManagerID,

          Deleted.ParentDeptID, Deleted.Created, GETDATE(), 'DELETED'

        FROM deleted;

      END;

    SET NOCOUNT OFF;

    END;

  GO

Department_Log 테이블이 트리거와 함께 작동 하는 방식을 보여주기 위해 DeptID = 1 인 행을 세 번 업데이트 한 다음이 행을 삭제하고 마지막으로 마지막 업데이트에서 DeptName 열을 원래 값으로 설정했습니다.

 

update dbo.Department

SET DeptName = ''

where DeptID = 1

 

update dbo.Department

SET DeptName = 'Engineering IT'

where DeptID = 1

 

update dbo.Department

SET DeptName = 'Engineering WEB'

where DeptID = 1

 

DELETE dbo.Department where DeptID = 1

 

INSERT dbo.Department(DeptID,  DeptName)

SELECT  DeptID,DeptName

FROM Department_Log WHERE DeptID = 1 and LogAction = 'DELETED'

 

update dbo.Department

SET DeptName = 'Engineering'

where DeptID = 1

 

select DeptID,  DeptName,Created,LogDate,LogAction from Department_Log

다음 그림에 표시된 Department_Log 테이블의 결과 : 

SQL Server 2016의 임시 테이블 기능은 로깅 메커니즘을 크게 단순화 할 수 있습니다. 이 문서에서는 시스템 버전 테이블을 작성하는 방법에 대한 단계별 지침을 제공합니다.

테이블을 임시 테이블로 마이그레이션하기 위해 기존 테이블에 임시 테이블 옵션을 설정할 수 있습니다. 새 임시 테이블을 생성하려면 임시 테이블 옵션을 ON으로 설정하기 만하면됩니다 (예 : SYSTEM_VERSIONING = ON). 임시 테이블 옵션이 활성화되면 SQL Server 2016은 "기록"테이블을 자동으로 생성하고 내부적으로 상위 및 기록 테이블을 모두 유지합니다. 하나는 실제 데이터를 저장하고 다른 하나는 기록 데이터를 저장합니다. 시간 테이블의 SYSTEM_TIME 기간 열 (예 : SysStartTime 및 SysEndTime)을 사용하면 메커니즘이 다른 시간 조각에 대한 데이터를보다 효율적으로 쿼리 할 수 ​​있습니다. 업데이트되거나 삭제 된 데이터는 "기록"테이블로 이동하는 반면 "상위"테이블은 업데이트 된 레코드에 대한 최신 행 버전을 유지합니다.

캐치는 무엇입니까?

임시 테이블의 가장 중요한 고려 사항, 제한 사항 및 제한 사항은 다음과 같습니다.

  • 임시 테이블과 히스토리 테이블 사이에 레코드를 연결하려면 임시 테이블에 기본 키가 있어야합니다. 그러나 히스토리 테이블은 기본 키를 가질 수 없습니다.
  • DATETIME2의 데이터 형식은 (예를 들어 SYSTEM_TIME 기간 열의 설정해야 SysStartTime  SysEndTime ).
  • 히스토리 테이블을 생성 할 때 항상 히스토리 테이블에서 임시 테이블의 스키마와 테이블 이름을 모두 지정해야합니다.
  • PAGE 압축은 히스토리 테이블의 기본 설정입니다.
  • 임시 테이블은 스토리지 비용에 영향을 미치고 성능 문제가있을 수있는 blob 데이터 유형 (nvarchar (max), varchar (max), varbinary (max), ntext, text 및 image)을 지원합니다.
  • 시간 테이블과 히스토리 테이블은 모두 동일한 데이터베이스에 생성되어야합니다. 링크 된 서버를 사용하여 임시 테이블을 제공 할 수 없습니다.
  • 히스토리 테이블에는 제약 조건, 기본 키, 외래 키 또는 열 제약 조건을 사용할 수 없습니다.
  • FOR SYSTEM_TIME 절을 사용하는 쿼리가있는 인덱싱 된 뷰에서 임시 테이블을 참조 할 수 없습니다.
  • SYSTEM_TIME 기간 열은 INSERT 및 UPDATE 문에서 직접 참조 할 수 없습니다.
  • SYSTEM_VERSIONING이 ON 인 동안에는 TRUNCATE TABLE을 사용할 수 없습니다.
  • 히스토리 테이블의 데이터를 직접 수정할 수 없습니다.

전체 고려 사항 및 제한 사항 목록은 온라인 설명서를 참조 하십시오.

임시 테이블 생성

Listing 1의 하나의 DDL 스크립트에서 임시 및 히스토리 테이블을 생성하는 방법을 설명했습니다. 앞서 언급했듯이, 두 열 모두에 대해 데이터 유형이 datetime2 인 SysStartTime  SysEndTime 열은 임시 테이블에 필요합니다. SysStartTime 컬럼  GENERATED ALWAYS AS ROW START NOT NULL 스펙 이어야 하며 SysEndTime  GENERATED ALWAYS AS ROW END NOT NULL 이어야 합니다 . 해당 열에 대한 기본값을 제공 할 의무는 없지만 권장합니다. SysStartTime  SysEndTime 열은 모두 PERIOD FOR SYSTEM_TIME 열에 지정되어야합니다 (MSDN이 PERIOD를 정의한대로 다른 발행물 PERIOD calls 절).

참고 : 시스템 버전 열의 이름을 SysStartTime  SysEndTime 으로 지정할 필요는 없지만 시간 캡처 기능을 반영하도록 열 이름을 선택해야합니다. GENERATED ALWAYS AS ROW START / END 및 PERIOD FOR SYSTEM_TIME (nameFrom, nameTo) 옵션은 임시 테이블 기능을 활성화합니다.

 

CREATE TABLE Department

    (

    DeptID INT NOT NULL PRIMARY KEY CLUSTERED,

    DeptName VARCHAR(50) NOT NULL,

    ManagerID INT NULL,

    ParentDeptID INT NULL,

    SysStartTime DATETIME2 GENERATED ALWAYS AS ROW START

      CONSTRAINT DF_Department_SysStartTime DEFAULT SYSUTCDATETIME() NOT NULL,

    SysEndTime DATETIME2 GENERATED ALWAYS AS ROW END

      CONSTRAINT DF_Department_SysEndTime

   DEFAULT CONVERT( DATETIME2, '9999-12-31 23:59:59' ) NOT NULL,

    PERIOD FOR SYSTEM_TIME(SysStartTime, SysEndTime)

    )

  WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory));

목록 1 : 시간 및 히스토리 테이블 작성

임시 테이블이 생성되면 밑줄이 그어진 히스토리 테이블이 자동으로 생성되고 (그림 1), 히스토리에 대해 SysStartTime  SysEndTime (또는 시스템 버전 관리를 정의하기 위해 선택한 이름) 열이 있는 CLUSTERED INDEX 가 생성됩니다. 표, 목록 2.

 

CREATE CLUSTERED INDEX ix_DepartmentHistory

  ON dbo.DepartmentHistory

    (SysStartTime ASC,

    SysEndTime ASC

    ) ON [PRIMARY];

목록 2 : 클러스터형 인덱스 생성

임시 테이블에 새 열을 추가해야하는 경우 ALTER TABLE… ADD 열 DDL을 허용해야하며 새 열은 히스토리 테이블에서 자동으로 미러링됩니다.

그림 1 : 개체 탐색기에서 새로 생성 된 시간 및 기록 테이블 표시.

그러나 임시 테이블에는 DROP TABLE DDL을 사용할 수 없습니다. 먼저 SYSTEM_VERSIONING을 꺼야합니다.

 

ALTER TABLE Department SET (SYSTEM_VERSIONING = OFF);

목록 3 : 부서 테이블에서 SYSTEM_VERSIONING 비활성화.

SYSTEM_VERSIONING이 OFF로 설정되면 임시 테이블과 히스토리 테이블이 모두 일반 테이블이됩니다. 그런 다음 DROP TABLE 명령을 해당 테이블에 사용할 수 있습니다.

기존 테이블을 시스템 버전 테이블로 설정

SQL Server를 사용하면 기존 테이블을 임시 테이블로 변환 할 수 있습니다. 이 작업을 수행하려면 테이블에 기본 키가 있는지 확인하고 아직 존재하지 않는 경우 새로 만들어야합니다. 그런 다음 테이블은 두 개의 datetime2 데이터 유형 열로 변경되어야하며 GENERATED ALWAYS AS ROW START / END 옵션도…
PERIOD FOR SYSTEM_TIME (nameFrom, nameTo) 과 함께 적용 되어야합니다.
두 옵션 모두 ALTER 명령으로 완료해야합니다. 두 번째 ALTER 명령은 SYSTEM_VERSIONING 속성을 활성화하고 선택적으로 (명시 적으로 제공하는 것이 좋습니다) HISTORY_TABLE 속성의 이름을 제공합니다 (Listing 4).

예를 들어 기존 테이블 Department_Exist 를 임시 테이블로 설정해 보겠습니다 . 목록 4를 실행 한 다음 목록 5를 실행하십시오. 그림 2와 같이 테이블 아이콘을 새로 고쳐 결과를 확인하십시오.

 

CREATE TABLE Department_Exist (    

       DeptID int NOT NULL PRIMARY KEY CLUSTERED  

     , DeptName varchar(50) NOT NULL  

     , ManagerID INT  NULL  

     , ParentDeptID int NULL )  

목록 4 : Department_Exist 테이블 생성.

 

ALTER TABLE dbo.Department_Exist

  ADD SysStartTime datetime2 GENERATED ALWAYS AS ROW START  

  CONSTRAINT DF_Department_Exist_SysStartTime DEFAULT SYSUTCDATETIME() NOT NULL,

  SysEndTime datetime2 GENERATED ALWAYS AS ROW END

  CONSTRAINT DF_Department_Exist_SysEndTime DEFAULT CONVERT (DATETIME2, '9999-12-31 23:59:59') NOT NULL,

         PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime)

  GO

  

  ALTER TABLE dbo.Department_Exist

      SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.Department_ExistHistory))

  GO

목록 5 : 시스템 버전 열을 추가하고 Department_Exist 테이블 에서 시스템 버전을 활성화 합니다.

기존 테이블

시간 테이블로 변환

그림 2 : Temporal 테이블로 변환 한 후 Department_Exist  나란히 비교 합니다.

임시 테이블의 메타 데이터를 확인하십시오.

 

-- List temporal tables, temporal_type = 2  

  SELECT tables.object_id, temporal_type, temporal_type_desc, history_table_id,

    tables.name

    FROM sys.tables

    WHERE temporal_type = 2 -- SYSTEM_VERSIONED_TEMPORAL_TABLE

  -- List temporal tables and history tables

  SELECT h.name temporal_name, h.temporal_type_desc, h.temporal_type,

    t.name AS history_table_name, t.temporal_type, t.temporal_type_desc

    FROM sys.tables t

      JOIN sys.tables h

        ON t.object_id = h.history_table_id

메모리 내 최적화 된 OLTP 테이블을 시스템 버전 테이블로 변환

In-Memory Optimized OLTP 테이블을 시스템 버전 테이블 로 변환하는 프로세스 는 비슷하지만이 섹션에서 다루고 설명해야 할 몇 가지 차이점이 있습니다.

인 메모리 최적화 테이블을 시스템 버전 테이블로 변환 할 때 몇 가지 특정 세부 정보를 알고 있어야합니다.

  • 인 메모리 최적화 테이블은 내구성이 있어야합니다 (DURABILITY = SCHEMA_AND_DATA).
  • In-Memory 최적화 된 히스토리 테이블은 디스크 기반으로 생성됩니다.
  • 부모 테이블에만 영향을 미치는 쿼리는 기본적으로 컴파일 된 T-SQL 모듈에서 사용할 수 있습니다. 고유하게 컴파일 된 모듈에서 FOR SYSTEM TIME 절을 사용하여 임시 쿼리를 사용할 수 없지만 임시 쿼리 및 비원시 모듈에서 메모리 내 최적화 된 테이블과 함께 FOR SYSTEM TIME 절을 사용할 수 있습니다.
  • SYSTEM_VERSIONING = ON 일 때 메모리 내 최적화 된 상위 테이블에 대한 변경 사항에 대한 가장 최근 변경 사항 (INSERT, DELETE)을 수락하기 위해 내부 메모리 최적화 준비 테이블 이 자동으로 생성됩니다 .
  • 내부 메모리 내 최적화 된 준비 테이블의 데이터는 비동기 데이터 플러시 작업에 의해 정기적으로 디스크 기반 기록 테이블로 이동됩니다. 이 데이터 플러시 메커니즘은 내부 메모리 버퍼를 상위 개체의 메모리 사용량의 10 % 미만으로 유지하는 것을 목표로합니다. DMV sys.dm_db_xtp_memory_consumers 는 총 메모리 사용량을 추적하는 데 도움이됩니다.
  • 데이터 플러시는 sys.sp_xtp_flush_temporal_history @schema_name, @object_name 저장 프로 시저를 호출하여 적용 할 수 있습니다.
  • SYSTEM_VERSIONING = OFF이거나 열을 추가, 삭제 또는 변경하여 시스템 버전 테이블의 스키마를 수정하면 내부 스테이징 버퍼의 전체 내용이 디스크 기반 기록 테이블로 이동됩니다.
  • 기록 데이터 쿼리는 효과적으로 SNAPSHOT 격리 수준에서 수행되며 항상 중복없이 메모리 내 스테이징 버퍼와 디스크 기반 테이블 간의 결합을 반환합니다.
  • 테이블 스키마를 내부적으로 변경하는 ALTER TABLE 작업은 데이터 플러시를 수행해야하므로 작업 속도가 느려질 수 있습니다.

시스템 버전 테이블 옵션이 활성화 된 상태에서 새 메모리 내 최적화 된 OLTP 생성

임시 테이블 옵션을 사용하여 새로운 인 메모리 최적화 테이블을 생성하기위한 DDL은 구문이 기존 디스크 기반 테이블과 매우 유사합니다. 메모리 내 최적화 된 테이블 구문에는 처음에 MEMORY_OPTIMIZED 및 DURABILITY 속성을 설정하는 WITH 블록이 있습니다. 따라서 SYSTEM_VERSIONING 속성은 목록 7과 같이 쉼표로 구분하여 추가해야합니다.

 

CREATE TABLE dbo.InMemory

  (

   UniqueName varchar(50) NOT NULL

   PRIMARY KEY NONCLUSTERED HASH WITH (BUCKET_COUNT = 131072),

   City varchar(32) NULL,

   State_Province varchar(32) NULL,

   LastModified datetime NOT NULL

      , SysStartTime datetime2 GENERATED ALWAYS AS ROW START

   CONSTRAINT DF_InMemory_SysStartTime DEFAULT GETDATE() NOT NULL

     , SysEndTime datetime2 GENERATED ALWAYS AS ROW END

   CONSTRAINT DF_InMemory_SysEndTime

   DEFAULT CONVERT (DATETIME2, '9999-12-31 23:59:59') NOT NULL

     , PERIOD FOR SYSTEM_TIME (SysStartTime,SysEndTime)

  )

  WITH

  (

      MEMORY_OPTIMIZED = ON , DURABILITY = SCHEMA_AND_DATA,

      SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.InMemory_History)

  )

목록 7 : 시스템 버전 테이블 옵션이 활성화 된 상태에서 새로운 인 메모리 최적화 OLTP 생성

기존 메모리 내 최적화 된 OLTP 테이블에 시스템 버전 테이블 옵션 추가.

기존의 메모리 내 최적화 된 OLTP 테이블을 시스템 버전 테이블로 변환하는 것이 더 어렵습니다 (Listing 8 참조).

이 메커니즘을 보여주기 위해 테이블을 만들어 보겠습니다.

 

CREATE TABLE dbo.InMemoryExist

  (

   UniqueName varchar(50) NOT NULL

   PRIMARY KEY NONCLUSTERED HASH WITH (BUCKET_COUNT = 131072),

   City varchar(32) NULL,

   State_Province varchar(32) NULL

  )

  WITH

  (

      MEMORY_OPTIMIZED = ON , DURABILITY = SCHEMA_AND_DATA

  )

목록 8 : 새로운 인 메모리 최적화 OLTP 테이블 생성

테이블이 생성되면 Listing 9와 같이 데이터를 테이블에 추가하기 전에 임시 테이블 옵션을 추가해야합니다.

 

ALTER TABLE dbo.InMemoryExist

  ADD SysStartTime datetime2 GENERATED ALWAYS AS ROW START  

  CONSTRAINT DF_InMemoryExist_SysStartTime DEFAULT SYSUTCDATETIME() NOT NULL,

  SysEndTime datetime2 GENERATED ALWAYS AS ROW END

  CONSTRAINT DF_InMemoryExist_SysEndTime DEFAULT CONVERT (DATETIME2, '9999-12-31 23:59:59') NOT NULL,

         PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime)

  GO

  

  ALTER TABLE dbo.InMemoryExist

      SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.InMemoryExist_History))

  GO

목록 9 : 임시 테이블 옵션 추가

테이블에 이미 데이터가있는 경우 테이블을 시스템 버전 테이블로 변환하는 프로세스가 더 복잡합니다.

경우 InMemoryExist이 시스템 버전이 옵션을 사용하여 생성 된, 우리는 드롭 할 필요가 InMemoryExist  InMemoryExist_History 테이블 :

 

ALTER TABLE InMemoryExist set (SYSTEM_VERSIONING = OFF)

  GO

  DROP TABLE InMemoryExist

  GO

  DROP TABLE InMemoryExist_History

  GO

테이블을 다시 생성합니다 (이 섹션의 위에 제공된 코드 샘플, 목록 8 사용). 그런 다음 InMemoryExist 테이블에 데이터를 삽입 합니다.

 

INSERT InMemoryExist

  select NEWID(),name, type_desc from sys.objects where is_ms_shipped = 0

코드를 실행하여 임시 테이블 옵션 (목록 9)을 추가하면 다음 오류가 발생합니다.

 

Msg 13575, Level 16, State 0, Line 21

  ADD PERIOD FOR SYSTEM_TIME failed because table 'database.dbo.InMemoryExist' contains records where end of period is not equal to MAX datetime.

  Msg 13510, Level 16, State 1, Line 29

SYSTEM_TIME 기간이 정의되지 않은 경우 SYSTEM_VERSIONING을 ON으로 설정할 수 없습니다.

이러한 오류를 방지하려면보다 자세한 단계에서 시스템 버전 관리 옵션을 추가해야합니다.

 

--Step 1. Adding nullable the columns

  ALTER TABLE dbo.InMemoryExist ADD SysStartTime datetime2 NULL  

  GO

  ALTER TABLE dbo.InMemoryExist ADD SysEndTime datetime2 NULL  

  GO

  --Step 2 Adding the default constraints

  ALTER TABLE InMemoryExist ADD CONSTRAINT DF_InMemoryExist_SysStartTime DEFAULT GETDATE() FOR SysStartTime;

  GO

  ALTER TABLE InMemoryExist ADD CONSTRAINT DF_InMemoryExist_SysEndTime DEFAULT CAST('9999-12-31 23:59:59.9999999' AS DATETIME2) FOR SysEndTime;

  --Step 3 Updating the column

  UPDATE dbo.InMemoryExist

   SET SysStartTime = '19000101 00:00:00.0000000'

   ,SysEndTime = '99991231 23:59:59.9999999'

  GO

  --Step 4 Setting NOT NULL to the columns

  ALTER TABLE dbo.InMemoryExist ALTER COLUMN SysStartTime datetime2 NOT NULL  

  GO

  ALTER TABLE dbo.InMemoryExist ALTER COLUMN SysEndTime datetime2 NOT NULL  

  GO

  --Step 5 Adding PERIOD FOR SYSTEM_TIME option

  ALTER TABLE InMemoryExist ADD PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime)

  GO

  --Step 6 Setting SYSTEM_VERSIONING property

  ALTER TABLE InMemoryExist

      SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.InMemoryExist_History))

  GO

이제 시스템 버전 관리 프로세스에 InMemoryExist 테이블이 활성화됩니다.

DATA_CONSISTENCY_CHECK 속성 지정

기존 데이터에 대한 데이터 일관성 검사를 시행하려면 DATA_CONSISTENCY_CHECK = ON으로 SYSTEM_VERSIONING을 설정해야합니다. 그러나 DATA_CONSISTENCY_CHECK 속성은 현재 . 임시 테이블에 대해 DATA_CONSISTENCY_CHECK를 활성화하기로 결정한 경우 인스턴스에 SQL Server 2016 용 누적 업데이트 1이 있는지 확인합니다.

다음은 기존 테이블에서 DATA_CONSISTENCY_CHECK 속성을 활성화하는 예입니다.

 

ALTER TABLE InMemoryExist

      SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.TableName_History, DATA_CONSISTENCY_CHECK = ON))

  For a new table, DATA_CONSISTENCY_CHECK property enables after HISTORY_TABLE property separated by a comma.

  WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory, DATA_CONSISTENCY_CHECK = ON));

결론

임시 테이블은 행 버전 프로세스를 자동화하는 데 매우 유용한 SQL Server 2016 기능입니다. 데이터 보관 작업을 단순화하고 데이터웨어 하우스 데이터베이스에 대해 천천히 변화하는 차원을 활용하기위한 실질적인 해결책이 될 수도 있습니다. 새 테이블과 기존 테이블을 설정하는 것이 매우 쉽기 때문에 임시 테이블 기능은 SQL Server 데이터베이스와 함께 구현하기에 좋은 선택입니다.

QL Server 2016에서 Temporal Table이라는 기능이 있어서, 소개하고자 합니다.

Temporal은 다음과 같은 뜻을 가집니다.

1. 현세적인, 속세의   2. 시간의; 시간의 제약을 받는   3. 관자놀이께의 

 

Temporal 테이블은 제가 볼때 엄청난 기능은 아니고, Table의 내용이 변경(UPDATE, DELETE)이 될 경우 그 내용을 기록하는 History (?)성 테이블이라고 생각하면 됩니다.

글로 표현하는 것보다 그림으로 보는 것이 더욱 이해가 될 것 입니다.

제가 생각 할때 이 그림이 가장 맞는거 같습니다. 크게 별다른 기능이 아니고, 단순히 유저가 DELETE , UPDATE 라는 조작을 가하면 테이블에 기록을 남기는 것 입니다.

음... 일종에 트리거 같은 기능을 종속하여, 추가한 것으로 보여집니다. 

 

바로 이 기능을 테스트 해보겠습니다.

 

 

 

Step 1 - 테이블 생성 및 데이터 입

 

우선 테이블을 생성하고, 데이터를 입력하는데 2개를 생성해 줍니다. 우리가 보기 위한 것은 기록을 검색하고, UPDATE , DELETE 시에 잘 남는지를 보는 것이므로,

생성과 동시에 데이터를 몇개 삽입 하도록 하겠습니다.

 

-- create history table CREATE TABLE dbo.PriceHistory (ID INT NOT NULL ,Product VARCHAR(50) NOT NULL ,Price NUMERIC(10,2) NOT NULL ,StartDate DATETIME2 NOT NULL ,EndDate DATETIME2 NOT NULL ) GO -- insert values for history INSERT INTO dbo.PriceHistory(ID,Product,Price,StartDate,EndDate) VALUES (1,'myProduct',1.15,'2015-07-01 00:00:00','2015-07-01 11:58:00') ,(1,'myProduct',1.16,'2015-07-01 11:58:00','2015-07-03 12:00:00') ,(1,'myProduct',1.18,'2015-07-03 12:00:00','2015-07-05 18:05:00') ,(1,'myProduct',1.21,'2015-07-05 18:05:00','2015-07-07 08:33:00') -- create current table to store prices CREATE TABLE dbo.Price (ID INT NOT NULL ,Product VARCHAR(50) NOT NULL ,Price NUMERIC(10,2) NOT NULL ,StartDate DATETIME2 NOT NULL ,EndDate DATETIME2 NOT NULL ,CONSTRAINT PK_Price PRIMARY KEY CLUSTERED (ID ASC) ) GO -- insert the current price (make sure start date is not in the future!) INSERT INTO dbo.Price(ID,Product,Price,StartDate,EndDate) VALUES (1,'myProduct',1.20,'2015-07-07 08:33:00','9999-12-31 23:59:59.9999999') GO

 

데이터가 잘 들어갔는지 확인해 보겠습니다.

첫번째가 Price 테이블 입니다. 그리고 아래가 PriceHistory 입니다.

 

 

 

 

Step 2 - Temporal Table 만들기 

 

이제 PriceHistory를 Price에 종속 시켜야합니다. 저는 종속이라는 표현을 쓰지만, Temporal 테이블을 생성한다고 하는게 맞겠네요.

아래를 보시면 StartDate 그리고 EndDate를 이용하여, Temporal 테이블의 기준 시간으로 삼습니다. 

-- enable system period columns ALTER TABLE dbo.Price ADD PERIOD FOR SYSTEM_TIME (StartDate,EndDate) GO -- turn on system versioning ALTER TABLE dbo.Price SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE=dbo.PriceHistory,DATA_CONSISTENCY_CHECK=ON) ) GO

 

만약 Temporal 테이블이 정상적으로 만들어졌다면, 아래와 같은 모습을 확인 할 수 있습니다. Price 테이블을 클릭하면, 아래에 PriceHistory 테이블이 

나타나는 것을 확인 할 수 있습니다.

 

위의 2개의 테이블은 따로따로도 조회가 가능하며, Price 테이블의 과거 데이터를 조회 할 수도 있습니다.

 

SELECT * FROM dbo.Price FOR SYSTEM_TIME AS OF '2015-07-04' GO SELECT * FROM dbo.Price FOR SYSTEM_TIME AS OF '2015-07-03 12:00:00' GO SELECT * FROM dbo.Price FOR SYSTEM_TIME FROM '2015-07-06' TO '2015-07-06' GO

 

검색은 여러가지 가능하며, 부분적으로도 가능하며, BETWEEN 검색도 가능 합니다. 

이렇게 검색을 할 경우 현재 Price 테이블의 데이터를 조회하는 것이 아닌 PriceHistory 테이블을 조회합니다. 

 

위의 실행계획을 보면, PriceHistory를 조회하고 있는 것이 보이실 겁니다. 이전 같은 경우 Log 테이블을 따로 만들고, Join해서 조회할 것을 줄였다? 뭐 이렇게

생각하면 좋을 듯 합니다.

 

 

 

 

Step 3 - 데이터 조작

이제 데이터를 조작해 보겠습니다. 각각 데이터를 조작 할 경우 어떻게 남는지 확인해 보겠습니다.

-- Update UPDATE dbo.Price SET Price = 1.24 -- Delete DELETE dbo.Price -- Insert INSERT INTO dbo.Price(ID,Product,Price,StartDate,EndDate) VALUES (2,'YouProduct',1.60,'2015-07-07 08:33:00','9999-12-31 23:59:59.9999999');

 

우선 UPDATE 작업을 진행 합니다. 

UPDATE를 진행 할 경우 영향을 2개의 Table이 받는 것을 볼 수 있습니다.

 

보시는 바와 같이 PriceHistory 테이블에 Insert 작업이 수행됩니다. 분명히 저는 Price 테이블에만 Update 하였는데, 자동으로 삽입됩니다.

 

결과를 확인하면 다음과 같습니다.

결과를 보면 위의 Price 테이블의 StartDate가 제가 수행 한 날짜로 Update가 되었습니다.

그리고, 아래의 PriceHistory 테이블에 원래 4개의 행이 있었는데 5개로 증가하였습니다. 기존의 Price 테이블의 데이터 입니다.

 

이번에는 Delete를 수행하겠습니다. 

역시 Delete도 2개의 행이 영향을 받습니다.  실행계획에서도 이를 확인 할 수 있습니다.

또한 PriceHistory 테이블에 1개의 행이 추가된 것을 볼 수 있습니다.

 

마지막으로 Insert를 수행 하겠습니다. Insert는 Price 테이블에 해보겠습니다.

응? 오류가 납니다.... 

 

위에서 우리는 Insert구문에 StartDate와 EndDate의 값도 삽입하도록 Insert 구문에 명시하였습니다. 하지만 이는 우리가 건들일 수 없는 부분 입니다.

이것은 자동으로 SYSTEM 시간에 의해서 찍히므로, 이것을 제외한 Insert 구문을 날려야 합니다.

INSERT INTO dbo.Price(ID,Product,Price) VALUES (2,'YouProduct',1.60);

 

아래와 같이 정상적으로 추가 된 것을 확인 할 수 있습니다.

 

감사합니다.

 

 

 

추가

 

Trigger가 걸리는지 테스트를 요청하셔서, 한번 테스트 해봤습니다.

단순히 테이블을 추가하여, DELETE 시에 Trigger가 정상으로 동작하는지 보겠습니다.

 

테스트를 위해서 몇개의 행을 집어넣고, 테스트 하였습니다. 

 

CREATE TRIGGER DBO.Trg_TEST ON DBO.Price AFTER DELETE AS BEGIN SET NOCOUNT ON INSERT INTO PriceTrigger SELECT ID,'Trigger Test' FROM deleted END GO

 

만들고, DELETE를 이용하여, 행을 한개 지웠습니다.

결과는 아래에 보시는 바와 같습니다. 트리거에도 잘 남았으며, PriceHistory에도 잘 남은 것을 볼 수 있습니다.

 

 

참고 

https://msdn.microsoft.com/en-us/library/dn935015.aspx

http://www.infoq.com/news/2015/06/SQL-Server-Temporal

https://www.mssqltips.com/sqlservertip/3680/introduction-to-sql-server-2016-temporal-tables/



출처: https://burning-dba.tistory.com/75 [데이터엔지니어 주형권]

-- borrowed from  Erland Sommarskog

-- Link : http://www.sommarskog.se/query-plan-mysteries.html#dmvgettingplans

-- Remember that you are looking at the estimated plan so the actual no. of rows and actual executions wont be there ! <-- Important why a particular plan is bad.

 

DECLARE @dbname    nvarchar(256),

        @procname  nvarchar(256)

SELECT @dbname = 'Northwind',  -- Your DB name

       @procname = 'dbo.List_orders_11' -- The SP that you want to get parameters for !

 

WITH basedata AS (

   SELECT qs.statement_start_offset/2 AS stmt_start,

          qs.statement_end_offset/2 AS stmt_end,

          est.encrypted AS isencrypted, est.text AS sqltext,

          epa.value AS set_options, qp.query_plan,

          charindex('<ParameterList>', qp.query_plan) + len('<ParameterList>')

             AS paramstart,

          charindex('</ParameterList>', qp.query_plan) AS paramend

   FROM   sys.dm_exec_query_stats qs

   CROSS  APPLY sys.dm_exec_sql_text(qs.sql_handle) est

   CROSS  APPLY sys.dm_exec_text_query_plan(qs.plan_handle,

                                            qs.statement_start_offset,

                                            qs.statement_end_offset) qp

   CROSS  APPLY sys.dm_exec_plan_attributes(qs.plan_handle) epa

   WHERE  est.objectid  = object_id (@procname)

     AND  est.dbid      = db_id(@dbname)

     AND  epa.attribute = 'set_options'

), next_level AS (

   SELECT stmt_start, set_options, query_plan,

          CASE WHEN isencrypted = 1 THEN '-- ENCRYPTED'

               WHEN stmt_start >= 0

               THEN substring(sqltext, stmt_start + 1,

                              CASE stmt_end

                                   WHEN 0 THEN datalength(sqltext)

                                   ELSE stmt_end - stmt_start + 1

                              END)

          END AS Statement,

          CASE WHEN paramend > paramstart

               THEN CAST (substring(query_plan, paramstart,

                                   paramend - paramstart) AS xml)

          END AS params

   FROM   basedata

)

SELECT set_options AS [SET], n.stmt_start AS Pos, n.Statement,

       CR.c.value('@Column''nvarchar(128)'AS Parameter,

       CR.c.value('@ParameterCompiledValue''nvarchar(128)'AS [Sniffed Value],

       CAST (query_plan AS xmlAS [Query plan]

FROM   next_level n

CROSS  APPLY   n.params.nodes('ColumnReference'AS CR(c)

ORDER  BY n.set_options, n.stmt_start, Parameter

'Database > Query' 카테고리의 다른 글

테이블 사이즈  (0) 2020.11.13
INDEX 상세 정보 조회  (0) 2020.11.02
DELETE 복원 프로시저  (0) 2020.10.20
EXEC와 동적(adhoc) 쿼리  (0) 2020.10.05
Find and Kill all the Blocked Process/Query  (0) 2020.09.22

Create PROCEDURE Recover_Deleted_Data_Proc

 

@Database_Name NVARCHAR(MAX),

 

@SchemaName_n_TableName NVARCHAR(Max),

 

@Date_From DATETIME='1900/01/01',

 

@Date_To DATETIME ='9999/12/31'

 

AS

 

 

 

DECLARE @RowLogContents VARBINARY(8000)

 

DECLARE @TransactionID NVARCHAR(Max)

 

DECLARE @AllocUnitID BIGINT

 

DECLARE @AllocUnitName NVARCHAR(Max)

 

DECLARE @SQL NVARCHAR(Max)

 

DECLARE @Compatibility_Level INT

 

 

 

 

 

SELECT @Compatibility_Level=dtb.compatibility_level

 

FROM

 

master.sys.databases AS dtb WHERE dtb.name=@Database_Name

 

/* 

 

IF ISNULL(@Compatibility_Level,0)<=80

 

BEGIN

 

    RAISERROR('The compatibility level should be equal to or greater SQL SERVER 2005 (90)',16,1)

 

    RETURN

 

END

 

 */

 

IF (SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE [TABLE_SCHEMA]+'.'+[TABLE_NAME]=@SchemaName_n_TableName)=0

 

BEGIN

 

    RAISERROR('Could not found the table in the defined database',16,1)

 

    RETURN

 

END

 

 

 

DECLARE @bitTable TABLE

 

(

 

  [ID] INT,

 

  [Bitvalue] INT

 

)

 

--Create table to set the bit position of one byte.

 

 

 

INSERT INTO @bitTable

 

SELECT 0,2 UNION ALL

 

SELECT 1,2 UNION ALL

 

SELECT 2,4 UNION ALL

 

SELECT 3,8 UNION ALL

 

SELECT 4,16 UNION ALL

 

SELECT 5,32 UNION ALL

 

SELECT 6,64 UNION ALL

 

SELECT 7,128

 

 

 

--Create table to collect the row data.

 

DECLARE @DeletedRecords TABLE

 

(

 

    [Row ID]            INT IDENTITY(1,1),

 

    [RowLogContents]    VARBINARY(8000),

 

    [AllocUnitID]       BIGINT,

 

    [Transaction ID]    NVARCHAR(Max),

 

    [FixedLengthData]   SMALLINT,

 

    [TotalNoOfCols]     SMALLINT,

 

    [NullBitMapLength]  SMALLINT,

 

    [NullBytes]         VARBINARY(8000),

 

    [TotalNoofVarCols]  SMALLINT,

 

    [ColumnOffsetArray] VARBINARY(8000),

 

    [VarColumnStart]    SMALLINT,

 

    [Slot ID]           INT,

 

    [NullBitMap]        VARCHAR(MAX)

 

     

 

)

 

--Create a common table expression to get all the row data plus how many bytes we have for each row.

 

;WITH RowData AS (

 

SELECT

 

 

 

[RowLog Contents 0] AS [RowLogContents] 

 

 

 

,[AllocUnitID] AS [AllocUnitID] 

 

 

 

,[Transaction ID] AS [Transaction ID]  

 

 

 

--[Fixed Length Data] = Substring (RowLog content 0, Status Bit A+ Status Bit B + 1,2 bytes)

 

,CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) AS [FixedLengthData]  --@FixedLengthData

 

 

 

-- [TotalnoOfCols] =  Substring (RowLog content 0, [Fixed Length Data] + 1,2 bytes)

 

,CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12)))) as  [TotalNoOfCols]

 

 

 

--[NullBitMapLength]=ceiling([Total No of Columns] /8.0)

 

,CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)) as [NullBitMapLength] 

 

 

 

--[Null Bytes] = Substring (RowLog content 0, Status Bit A+ Status Bit B + [Fixed Length Data] +1, [NullBitMapLength] )

 

,SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 3,

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0))) as [NullBytes]

 

 

 

--[TotalNoofVarCols] = Substring (RowLog content 0, Status Bit A+ Status Bit B + [Fixed Length Data] +1, [Null Bitmap length] + 2 )

 

,(CASE WHEN SUBSTRING([RowLog Contents 0], 11In (0x10,0x30,0x70) THEN

 

CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0],

 

CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 3

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)), 2))))  ELSE null  ENDAS [TotalNoofVarCols] 

 

 

 

--[ColumnOffsetArray]= Substring (RowLog content 0, Status Bit A+ Status Bit B + [Fixed Length Data] +1, [Null Bitmap length] + 2 , [TotalNoofVarCols]*2 )

 

,(CASE WHEN SUBSTRING([RowLog Contents 0], 11In (0x10,0x30,0x70) THEN

 

SUBSTRING([RowLog Contents 0]

 

CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 3

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)) + 2

 

, (CASE WHEN SUBSTRING([RowLog Contents 0], 11In (0x10,0x30,0x70) THEN

 

CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0],

 

CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 3

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)), 2))))  ELSE null  END)

 

2)  ELSE null  ENDAS [ColumnOffsetArray] 

 

 

 

--  Variable column Start = Status Bit A+ Status Bit B + [Fixed Length Data] + [Null Bitmap length] + 2+([TotalNoofVarCols]*2)

 

,CASE WHEN SUBSTRING([RowLog Contents 0], 11)In (0x10,0x30,0x70)

 

THEN  (

 

CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 4 

 

 

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)) 

 

 

 

+ ((CASE WHEN SUBSTRING([RowLog Contents 0], 11In (0x10,0x30,0x70) THEN

 

CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0],

 

CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 3

 

CONVERT(INTceiling(CONVERT(INTCONVERT(BINARY(2), REVERSE(SUBSTRING([RowLog Contents 0], CONVERT(SMALLINTCONVERT(BINARY(2)

 

,REVERSE(SUBSTRING([RowLog Contents 0], 2 + 12)))) + 12))))/8.0)), 2))))  ELSE null  END) * 2)) 

 

 

 

ELSE null End AS [VarColumnStart]

 

,[Slot ID]

 

FROM sys.fn_dblog(NULLNULL)

 

WHERE

 

AllocUnitId IN

 

(SELECT [Allocation_unit_id] FROM sys.allocation_units allocunits

 

INNER JOIN sys.partitions partitions ON (allocunits.type IN (13)  

 

AND partitions.hobt_id = allocunits.container_id) OR (allocunits.type = 2 

 

AND partitions.partition_id = allocunits.container_id)  

 

WHERE object_id=object_ID('' + @SchemaName_n_TableName + ''))

 

 

 

AND Context IN ('LCX_MARK_AS_GHOST''LCX_HEAP'AND Operation in ('LOP_DELETE_ROWS'

 

And SUBSTRING([RowLog Contents 0], 11)In (0x10,0x30,0x70)

 

 

 

/*Use this subquery to filter the date*/

 

AND [TRANSACTION ID] IN (SELECT DISTINCT [TRANSACTION ID] FROM    sys.fn_dblog(NULLNULL

 

WHERE Context IN ('LCX_NULL'AND Operation in ('LOP_BEGIN_XACT')  

 

And [Transaction Name] In ('DELETE','user_transaction')

 

And  CONVERT(NVARCHAR(11),[Begin Time]) BETWEEN @Date_From AND @Date_To)),

 

 

 

--Use this technique to repeate the row till the no of bytes of the row.

 

N1 (n) AS (SELECT 1 UNION ALL SELECT 1),

 

N2 (n) AS (SELECT 1 FROM N1 AS X, N1 AS Y),

 

N3 (n) AS (SELECT 1 FROM N2 AS X, N2 AS Y),

 

N4 (n) AS (SELECT ROW_NUMBER() OVER(ORDER BY X.n)

 

           FROM N3 AS X, N3 AS Y)

 

 

 

 

 

 

 

INSERT INTO @DeletedRecords

 

SELECT  RowLogContents

 

        ,[AllocUnitID]

 

        ,[Transaction ID]

 

        ,[FixedLengthData]

 

        ,[TotalNoOfCols]

 

        ,[NullBitMapLength]

 

        ,[NullBytes]

 

        ,[TotalNoofVarCols]

 

        ,[ColumnOffsetArray]

 

        ,[VarColumnStart]

 

        ,[Slot ID]

 

         ---Get the Null value against each column (1 means null zero means not null)

 

        ,[NullBitMap]=(REPLACE(STUFF((SELECT ',' +

 

        (CASE WHEN [ID]=0 THEN CONVERT(NVARCHAR(1),(SUBSTRING(NullBytes, n, 1) % 2))  ELSE CONVERT(NVARCHAR(1),((SUBSTRING(NullBytes, n, 1) / [Bitvalue]) % 2)) END--as [nullBitMap]

 

         

 

FROM

 

N4 AS Nums

 

Join RowData AS C ON n<=NullBitMapLength

 

Cross Join @bitTable WHERE C.[RowLogContents]=D.[RowLogContents] ORDER BY [RowLogContents],n ASC FOR XML PATH('')),1,1,''),',',''))

 

FROM RowData D

 

 

 

IF (SELECT COUNT(*) FROM @DeletedRecords)=0

 

BEGIN

 

    RAISERROR('There is no data in the log as per the search criteria',16,1)

 

    RETURN

 

END

 

 

 

DECLARE @ColumnNameAndData TABLE

 

(

 

 [Row ID]           int,

 

 [Rowlogcontents]   varbinary(Max),

 

 [NAME]             sysname,

 

 [nullbit]          smallint,

 

 [leaf_offset]      smallint,

 

 [length]           smallint,

 

 [system_type_id]   tinyint,

 

 [bitpos]           tinyint,

 

 [xprec]            tinyint,

 

 [xscale]           tinyint,

 

 [is_null]          int,

 

 [Column value Size]int,

 

 [Column Length]    int,

 

 [hex_Value]        varbinary(max),

 

 [Slot ID]          int,

 

 [Update]           int

 

)

 

 

 

--Create common table expression and join it with the rowdata table

 

-- to get each column details

 

/*This part is for variable data columns*/

 

--@RowLogContents, 

 

--(col.columnOffValue - col.columnLength) + 1,

 

--col.columnLength

 

--)

 

INSERT INTO @ColumnNameAndData

 

SELECT

 

[Row ID],

 

Rowlogcontents,

 

NAME ,

 

cols.leaf_null_bit AS nullbit,

 

leaf_offset,

 

ISNULL(syscolumns.length, cols.max_length) AS [length],

 

cols.system_type_id,

 

cols.leaf_bit_position AS bitpos,

 

ISNULL(syscolumns.xprec, cols.precision) AS xprec,

 

ISNULL(syscolumns.xscale, cols.scale) AS xscale,

 

SUBSTRING([nullBitMap], cols.leaf_null_bit, 1AS is_null,

 

(CASE WHEN leaf_offset<1 and SUBSTRING([nullBitMap], cols.leaf_null_bit, 1)=0 

 

THEN

 

(Case When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000

 

THEN

 

CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) - POWER(215)

 

ELSE

 

CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

END)

 

END)  AS [Column value Size],

 

 

 

(CASE WHEN leaf_offset<1 and SUBSTRING([nullBitMap], cols.leaf_null_bit, 1)=0  THEN

 

(Case

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

THEN  (Case When [System_type_id]In (35,34,99Then 16 else 24  end)

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

THEN  (Case When [System_type_id]In (35,34,99Then 16 else 24  end--24 

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

THEN (CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart]))

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

 

 

THEN POWER(215) +CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])

 

 

 

END)

 

 

 

ENDAS [Column Length]

 

 

 

,(CASE WHEN SUBSTRING([nullBitMap], cols.leaf_null_bit, 1)=1 THEN  NULL ELSE

 

 SUBSTRING

 

 (

 

 Rowlogcontents, 

 

 (

 

 

 

(Case When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000

 

THEN

 

CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) - POWER(215)

 

ELSE

 

CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

END)

 

 

 

 - 

 

(Case When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

 

 

THEN  (Case When [System_type_id]In (35,34,99Then 16 else 24  end--24 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

 

 

THEN  (Case When [System_type_id]In (35,34,99Then 16 else 24  end--24 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

 

 

THEN CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

 

 

THEN POWER(215) +CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])

 

 

 

END)

 

 

 

) + 1,

 

(Case When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

 

 

THEN  (Case When [System_type_id] In (35,34,99Then 16 else 24  end--24 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) >30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

 

 

THEN  (Case When [System_type_id] In (35,34,99Then 16 else 24  end--24 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])<30000

 

 

 

THEN ABS(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart]))

 

 

 

When CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12)))) <30000 And

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])>30000

 

 

 

THEN POWER(215) +CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * leaf_offset*-1) - 12))))

 

ISNULL(NULLIF(CONVERT(INTCONVERT(BINARY(2), REVERSE (SUBSTRING ([ColumnOffsetArray], (2 * ((leaf_offset*-1) - 1)) - 12)))), 0), [varColumnStart])

 

 

 

END)

 

)

 

 

 

ENDAS hex_Value

 

,[Slot ID]

 

,0

 

FROM @DeletedRecords A

 

Inner Join sys.allocation_units allocunits On A.[AllocUnitId]=allocunits.[Allocation_Unit_Id]

 

INNER JOIN sys.partitions partitions ON (allocunits.type IN (13)

 

AND partitions.hobt_id = allocunits.container_id) OR (allocunits.type = 2 AND partitions.partition_id = allocunits.container_id)

 

INNER JOIN sys.system_internals_partition_columns cols ON cols.partition_id = partitions.partition_id

 

LEFT OUTER JOIN syscolumns ON syscolumns.id = partitions.object_id AND syscolumns.colid = cols.partition_column_id

 

WHERE leaf_offset<0

 

UNION

 

/*This part is for fixed data columns*/

 

SELECT 

 

[Row ID],

 

Rowlogcontents,

 

NAME ,

 

cols.leaf_null_bit AS nullbit,

 

leaf_offset,

 

ISNULL(syscolumns.length, cols.max_length) AS [length],

 

cols.system_type_id,

 

cols.leaf_bit_position AS bitpos,

 

ISNULL(syscolumns.xprec, cols.precision) AS xprec,

 

ISNULL(syscolumns.xscale, cols.scale) AS xscale,

 

SUBSTRING([nullBitMap], cols.leaf_null_bit, 1AS is_null,

 

(SELECT TOP 1 ISNULL(SUM(CASE WHEN C.leaf_offset >1 THEN max_length ELSE 0 END),0FROM

 

sys.system_internals_partition_columns C WHERE cols.partition_id =C.partition_id And C.leaf_null_bit<cols.leaf_null_bit)+5 AS [Column value Size],

 

syscolumns.length AS [Column Length]

 

 

 

,CASE WHEN SUBSTRING([nullBitMap], cols.leaf_null_bit, 1)=1 THEN NULL ELSE

 

SUBSTRING

 

(

 

Rowlogcontents,(SELECT TOP 1 ISNULL(SUM(CASE WHEN C.leaf_offset >1 And C.leaf_bit_position=0 THEN max_length ELSE 0 END),0FROM

 

sys.system_internals_partition_columns C where cols.partition_id =C.partition_id And C.leaf_null_bit<cols.leaf_null_bit)+5

 

,syscolumns.length) END AS hex_Value

 

,[Slot ID]

 

,0

 

FROM @DeletedRecords A

 

Inner Join sys.allocation_units allocunits ON A.[AllocUnitId]=allocunits.[Allocation_Unit_Id]

 

INNER JOIN sys.partitions partitions ON (allocunits.type IN (13)

 

AND partitions.hobt_id = allocunits.container_id) OR (allocunits.type = 2 AND partitions.partition_id = allocunits.container_id)

 

INNER JOIN sys.system_internals_partition_columns cols ON cols.partition_id = partitions.partition_id

 

LEFT OUTER JOIN syscolumns ON syscolumns.id = partitions.object_id AND syscolumns.colid = cols.partition_column_id

 

WHERE leaf_offset>0

 

Order By nullbit

 

 

 

Declare @BitColumnByte as int

 

Select @BitColumnByte=CONVERT(INTceilingCount(*)/8.0)) from @ColumnNameAndData Where [System_Type_id]=104

 

 

 

;With N1 (n) AS (SELECT 1 UNION ALL SELECT 1),

 

N2 (n) AS (SELECT 1 FROM N1 AS X, N1 AS Y),

 

N3 (n) AS (SELECT 1 FROM N2 AS X, N2 AS Y),

 

N4 (n) AS (SELECT ROW_NUMBER() OVER(ORDER BY X.n)

 

           FROM N3 AS X, N3 AS Y),

 

CTE As(

 

Select RowLogContents,[nullbit]

 

        ,[BitMap]=Convert(varbinary(1),Convert(int,Substring((REPLACE(STUFF((SELECT ',' +

 

        (CASE WHEN [ID]=0 THEN CONVERT(NVARCHAR(1),(SUBSTRING(hex_Value, n, 1) % 2))  ELSE CONVERT(NVARCHAR(1),((SUBSTRING(hex_Value, n, 1) / [Bitvalue]) % 2)) END--as [nullBitMap]

 

 

 

from N4 AS Nums

 

Join @ColumnNameAndData AS C ON n<=@BitColumnByte And [System_Type_id]=104 And bitpos=0

 

Cross Join @bitTable WHERE C.[RowLogContents]=D.[RowLogContents] ORDER BY [RowLogContents],n ASC FOR XML PATH('')),1,1,''),',','')),bitpos+1,1)))

 

FROM @ColumnNameAndData D Where  [System_Type_id]=104)

 

 

 

Update A Set [hex_Value]=[BitMap]

 

from @ColumnNameAndData  A

 

Inner Join CTE B On A.[RowLogContents]=B.[RowLogContents]

 

And A.[nullbit]=B.[nullbit]

 

 

 

 

 

/**************Check for BLOB DATA TYPES******************************/

 

DECLARE @Fileid INT

 

DECLARE @Pageid INT

 

DECLARE @Slotid INT

 

DECLARE @CurrentLSN INT

 

DECLARE @LinkID INT

 

DECLARE @Context VARCHAR(50)

 

DECLARE @ConsolidatedPageID VARCHAR(MAX)

 

DECLARE @LCX_TEXT_MIX VARBINARY(MAX)

 

 

 

declare @temppagedata table

 

(

 

[ParentObject] sysname,

 

[Object] sysname,

 

[Field] sysname,

 

[Value] sysname)

 

 

 

declare @pagedata table

 

(

 

[Page ID] sysname,

 

[File IDS] int,

 

[Page IDS] int,

 

[AllocUnitId] bigint,

 

[ParentObject] sysname,

 

[Object] sysname,

 

[Field] sysname,

 

[Value] sysname)

 

 

 

DECLARE @ModifiedRawData TABLE

 

(

 

  [ID] INT IDENTITY(1,1),

 

  [PAGE ID] VARCHAR(MAX),

 

  [FILE IDS] INT,

 

  [PAGE IDS] INT,

 

  [Slot ID]  INT,

 

  [AllocUnitId] BIGINT,

 

  [RowLog Contents 0_var] VARCHAR(Max),

 

  [RowLog Length] VARCHAR(50),

 

  [RowLog Len] INT,

 

  [RowLog Contents 0] VARBINARY(Max),

 

  [Link ID] INT default (0),

 

  [Update] INT

 

)

 

 

 

            DECLARE Page_Data_Cursor CURSOR FOR

 

            /*We need to filter LOP_MODIFY_ROW,LOP_MODIFY_COLUMNS from log for deleted records of BLOB data type& Get its Slot No, Page ID & AllocUnit ID*/

 

            SELECT LTRIM(RTRIM(Replace([Description],'Deallocated',''))) AS [PAGE ID]

 

            ,[Slot ID],[AllocUnitId],NULL AS [RowLog Contents 0],NULL AS [RowLog Contents 0],Context

 

            FROM    sys.fn_dblog(NULLNULL)  

 

            WHERE   

 

            AllocUnitId IN

 

            (SELECT [Allocation_unit_id] FROM sys.allocation_units allocunits

 

            INNER JOIN sys.partitions partitions ON (allocunits.type IN (13)  

 

            AND partitions.hobt_id = allocunits.container_id) OR (allocunits.type = 2 

 

            AND partitions.partition_id = allocunits.container_id)  

 

            WHERE object_id=object_ID('' + @SchemaName_n_TableName + ''))

 

            AND Operation IN ('LOP_MODIFY_ROW'AND [Context] IN ('LCX_PFS'

 

            AND Description Like '%Deallocated%'

 

            /*Use this subquery to filter the date*/

 

            AND [TRANSACTION ID] IN (SELECT DISTINCT [TRANSACTION ID] FROM    sys.fn_dblog(NULLNULL

 

            WHERE Context IN ('LCX_NULL'AND Operation in ('LOP_BEGIN_XACT')  

 

            AND [Transaction Name]='DELETE'

 

            AND  CONVERT(NVARCHAR(11),[Begin Time]) BETWEEN @Date_From AND @Date_To)

 

            GROUP BY [Description],[Slot ID],[AllocUnitId],Context

 

 

 

            UNION

 

 

 

            SELECT [PAGE ID],[Slot ID],[AllocUnitId]

 

            ,Substring([RowLog Contents 0],15,LEN([RowLog Contents 0])) AS [RowLog Contents 0]

 

            ,CONVERT(INT,Substring([RowLog Contents 0],7,2)),Context --,CAST(RIGHT([Current LSN],4) AS INT) AS [Current LSN]

 

            FROM    sys.fn_dblog(NULLNULL)  

 

            WHERE  

 

             AllocUnitId IN

 

            (SELECT [Allocation_unit_id] FROM sys.allocation_units allocunits

 

            INNER JOIN sys.partitions partitions ON (allocunits.type IN (13)  

 

            AND partitions.hobt_id = allocunits.container_id) OR (allocunits.type = 2 

 

            AND partitions.partition_id = allocunits.container_id)  

 

            WHERE object_id=object_ID('' + @SchemaName_n_TableName + ''))

 

            AND Context IN ('LCX_TEXT_MIX'AND Operation in ('LOP_DELETE_ROWS'

 

            /*Use this subquery to filter the date*/

 

            AND [TRANSACTION ID] IN (SELECT DISTINCT [TRANSACTION ID] FROM    sys.fn_dblog(NULLNULL

 

            WHERE Context IN ('LCX_NULL'AND Operation in ('LOP_BEGIN_XACT')  

 

            And [Transaction Name]='DELETE'

 

            And  CONVERT(NVARCHAR(11),[Begin Time]) BETWEEN @Date_From AND @Date_To)

 

                         

 

            /****************************************/

 

 

 

        OPEN Page_Data_Cursor

 

 

 

        FETCH NEXT FROM Page_Data_Cursor INTO @ConsolidatedPageID, @Slotid,@AllocUnitID,@LCX_TEXT_MIX,@LinkID,@Context

 

 

 

        WHILE @@FETCH_STATUS = 0

 

        BEGIN

 

            DECLARE @hex_pageid AS VARCHAR(Max)

 

            /*Page ID contains File Number and page number It looks like 0001:00000130.

 

              In this example 0001 is file Number &  00000130 is Page Number & These numbers are in Hex format*/

 

            SET @Fileid=SUBSTRING(@ConsolidatedPageID,0,CHARINDEX(':',@ConsolidatedPageID)) -- Seperate File ID from Page ID

 

         

 

            SET @hex_pageid ='0x'SUBSTRING(@ConsolidatedPageID,CHARINDEX(':',@ConsolidatedPageID)+1,Len(@ConsolidatedPageID))  ---Seperate the page ID

 

            SELECT @Pageid=Convert(INT,cast('' AS XML).value('xs:hexBinary(substring(sql:variable("@hex_pageid"),sql:column("t.pos")) )''varbinary(max)')) -- Convert Page ID from hex to integer

 

            FROM (SELECT CASE substring(@hex_pageid, 12WHEN '0x' THEN 3 ELSE 0 ENDAS t(pos) 

 

             

 

            IF @Context='LCX_PFS'    

 

              BEGIN

 

                        DELETE @temppagedata

 

                        INSERT INTO @temppagedata EXEC'DBCC PAGE(' + @DataBase_Name + ', ' + @fileid + ', ' + @pageid + ', 1) with tableresults,no_infomsgs;'); 

 

                        INSERT INTO @pagedata SELECT @ConsolidatedPageID,@fileid,@pageid,@AllocUnitID,[ParentObject],[Object],[Field] ,[Value] FROM @temppagedata

 

              END

 

            ELSE IF @Context='LCX_TEXT_MIX'

 

              BEGIN

 

                        INSERT INTO  @ModifiedRawData SELECT @ConsolidatedPageID,@fileid,@pageid,@Slotid,@AllocUnitID,NULL,0,CONVERT(INT,CONVERT(VARBINARY,REVERSE(SUBSTRING(@LCX_TEXT_MIX,11,2)))),@LCX_TEXT_MIX,@LinkID,0

 

              END    

 

            FETCH NEXT FROM Page_Data_Cursor INTO  @ConsolidatedPageID, @Slotid,@AllocUnitID,@LCX_TEXT_MIX,@LinkID,@Context

 

        END

 

     

 

    CLOSE Page_Data_Cursor

 

    DEALLOCATE Page_Data_Cursor

 

 

 

    DECLARE @Newhexstring VARCHAR(MAX);

 

 

 

    --The data is in multiple rows in the page, so we need to convert it into one row as a single hex value.

 

    --This hex value is in string format

 

    INSERT INTO @ModifiedRawData ([PAGE ID],[FILE IDS],[PAGE IDS],[Slot ID],[AllocUnitId]

 

    ,[RowLog Contents 0_var]

 

    , [RowLog Length])

 

    SELECT [Page ID],[FILE IDS],[PAGE IDS],Substring([ParentObject],CHARINDEX('Slot', [ParentObject])+4, (CHARINDEX('Offset', [ParentObject])-(CHARINDEX('Slot', [ParentObject])+4))-2 ) as [Slot ID]

 

    ,[AllocUnitId]

 

    ,Substring((

 

    SELECT

 

    REPLACE(STUFF((SELECT REPLACE(SUBSTRING([Value],CHARINDEX(':',[Value])+1,CHARINDEX('†',[Value])-CHARINDEX(':',[Value])),'†','')

 

    FROM @pagedata C  WHERE B.[Page ID]= C.[Page ID] And Substring(B.[ParentObject],CHARINDEX('Slot', B.[ParentObject])+4, (CHARINDEX('Offset', B.[ParentObject])-(CHARINDEX('Slot', B.[ParentObject])+4)) )=Substring(C.[ParentObject],CHARINDEX('Slot', C.[ParentObject])+4, (CHARINDEX('Offset', C.[ParentObject])-(CHARINDEX('Slot', C.[ParentObject])+4)) ) And

 

    [Object] Like '%Memory Dump%'  Order By '0x'LEFT([Value],CHARINDEX(':',[Value])-1)

 

    FOR XML PATH('') ),1,1,'') ,' ','')

 

    ),1,20000AS [Value]

 

     

 

    ,

 

     Substring((

 

    SELECT '0x' +REPLACE(STUFF((SELECT REPLACE(SUBSTRING([Value],CHARINDEX(':',[Value])+1,CHARINDEX('†',[Value])-CHARINDEX(':',[Value])),'†','')

 

    FROM @pagedata C  WHERE B.[Page ID]= C.[Page ID] And Substring(B.[ParentObject],CHARINDEX('Slot', B.[ParentObject])+4, (CHARINDEX('Offset', B.[ParentObject])-(CHARINDEX('Slot', B.[ParentObject])+4)) )=Substring(C.[ParentObject],CHARINDEX('Slot', C.[ParentObject])+4, (CHARINDEX('Offset', C.[ParentObject])-(CHARINDEX('Slot', C.[ParentObject])+4)) ) And

 

    [Object] Like '%Memory Dump%'  Order By '0x'LEFT([Value],CHARINDEX(':',[Value])-1)

 

    FOR XML PATH('') ),1,1,'') ,' ','')

 

    ),7,4AS [Length]

 

     

 

    From @pagedata B

 

    Where [Object] Like '%Memory Dump%'

 

    Group By [Page ID],[FILE IDS],[PAGE IDS],[ParentObject],[AllocUnitId]--,[Current LSN]

 

    Order By [Slot ID]

 

 

 

    UPDATE @ModifiedRawData  SET [RowLog Len] = CONVERT(VARBINARY(8000),REVERSE(cast('' AS XML).value('xs:hexBinary(substring(sql:column("[RowLog Length]"),0))''varbinary(Max)')))

 

    FROM @ModifiedRawData Where [LINK ID]=0

 

 

 

    UPDATE @ModifiedRawData  SET [RowLog Contents 0] =cast('' AS XML).value('xs:hexBinary(substring(sql:column("[RowLog Contents 0_var]"),0))''varbinary(Max)')  

 

    FROM @ModifiedRawData Where [LINK ID]=0

 

 

 

    Update B Set B.[RowLog Contents 0] =

 

    (CASE WHEN A.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  A.[RowLog Contents 0]+C.[RowLog Contents 0] 

 

        WHEN A.[RowLog Contents 0] IS NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  C.[RowLog Contents 0]

 

        WHEN A.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NULL THEN  A.[RowLog Contents 0]  

 

        END)

 

    ,B.[Update]=ISNULL(B.[Update],0)+1

 

    from @ModifiedRawData B

 

    LEFT Join @ModifiedRawData A On A.[Page IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],15+14,2))))

 

    And A.[File IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],19+14,2)))) 

 

    And A.[Link ID]=B.[Link ID]

 

    LEFT Join @ModifiedRawData C On C.[Page IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],27+14,2))))

 

    And C.[File IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],31+14,2))))

 

    And C.[Link ID]=B.[Link ID]

 

    Where  (A.[RowLog Contents 0] IS NOT NULL OR C.[RowLog Contents 0] IS NOT NULL)

 

 

 

 

 

    Update B Set B.[RowLog Contents 0] =

 

    (CASE WHEN A.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  A.[RowLog Contents 0]+C.[RowLog Contents 0] 

 

        WHEN A.[RowLog Contents 0] IS NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  C.[RowLog Contents 0]

 

        WHEN A.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NULL THEN  A.[RowLog Contents 0]  

 

        END)

 

    --,B.[Update]=ISNULL(B.[Update],0)+1

 

    from @ModifiedRawData B

 

    LEFT Join @ModifiedRawData A On A.[Page IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],15+14,2))))

 

    And A.[File IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],19+14,2)))) 

 

    And A.[Link ID]<>B.[Link ID] And B.[Update]=0

 

    LEFT Join @ModifiedRawData C On C.[Page IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],27+14,2))))

 

    And C.[File IDS]=Convert(int,Convert(Varbinary(Max),Reverse(Substring(B.[RowLog Contents 0],31+14,2))))

 

    And C.[Link ID]<>B.[Link ID] And B.[Update]=0

 

    Where  (A.[RowLog Contents 0] IS NOT NULL OR C.[RowLog Contents 0] IS NOT NULL)

 

 

 

    UPDATE @ModifiedRawData  SET [RowLog Contents 0] =  

 

    (Case When [RowLog Len]>=8000 Then

 

    Substring([RowLog Contents 0] ,15,[RowLog Len]) 

 

    When [RowLog Len]<8000 Then

 

    SUBSTRING([RowLog Contents 0],15+6,Convert(int,Convert(varbinary(max),REVERSE(Substring([RowLog Contents 0],15,6)))))

 

    End)

 

    FROM @ModifiedRawData Where [LINK ID]=0

 

 

 

    UPDATE @ColumnNameAndData SET [hex_Value]=[RowLog Contents 0] 

 

    --,A.[Update]=A.[Update]+1

 

    FROM @ColumnNameAndData A

 

    INNER JOIN @ModifiedRawData B ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],17,4))))=[PAGE IDS]

 

    AND  Convert(int,Substring([hex_value],9,2)) =B.[Link ID] 

 

    Where [System_Type_Id] In (99,167,175,231,239,241,165,98And [Link ID] <>0 

 

 

 

    UPDATE @ColumnNameAndData SET [hex_Value]=

 

    (CASE WHEN B.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  B.[RowLog Contents 0]+C.[RowLog Contents 0] 

 

    WHEN B.[RowLog Contents 0] IS NULL AND C.[RowLog Contents 0] IS NOT NULL THEN  C.[RowLog Contents 0]

 

    WHEN B.[RowLog Contents 0] IS NOT NULL AND C.[RowLog Contents 0] IS NULL THEN  B.[RowLog Contents 0]  

 

    END)

 

    --,A.[Update]=A.[Update]+1

 

    FROM @ColumnNameAndData A

 

    LEFT JOIN @ModifiedRawData B ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],5,4))))=B.[PAGE IDS]  And B.[Link ID] =0 

 

    LEFT JOIN @ModifiedRawData C ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],17,4))))=C.[PAGE IDS]  And C.[Link ID] =0 

 

    Where [System_Type_Id] In (99,167,175,231,239,241,165,98)  And (B.[RowLog Contents 0] IS NOT NULL OR C.[RowLog Contents 0] IS NOT NULL)

 

 

 

    UPDATE @ColumnNameAndData SET [hex_Value]=[RowLog Contents 0] 

 

    --,A.[Update]=A.[Update]+1

 

    FROM @ColumnNameAndData A

 

    INNER JOIN @ModifiedRawData B ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],9,4))))=[PAGE IDS]

 

    And Convert(int,Substring([hex_value],3,2))=[Link ID]

 

    Where [System_Type_Id] In (35,34,99And [Link ID] <>0 

 

     

 

    UPDATE @ColumnNameAndData SET [hex_Value]=[RowLog Contents 0]

 

    --,A.[Update]=A.[Update]+10

 

    FROM @ColumnNameAndData A

 

    INNER JOIN @ModifiedRawData B ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],9,4))))=[PAGE IDS]

 

    Where [System_Type_Id] In (35,34,99And [Link ID] =0

 

 

 

    UPDATE @ColumnNameAndData SET [hex_Value]=[RowLog Contents 0] 

 

    --,A.[Update]=A.[Update]+1

 

    FROM @ColumnNameAndData A

 

    INNER JOIN @ModifiedRawData B ON

 

    Convert(int,Convert(Varbinary(Max),Reverse(Substring([hex_value],15,4))))=[PAGE IDS]

 

    Where [System_Type_Id] In (35,34,99And [Link ID] =0

 

 

 

    Update @ColumnNameAndData set [hex_value]= 0xFFFE + Substring([hex_value],9,LEN([hex_value]))

 

    --,[Update]=[Update]+1

 

    Where [system_type_id]=241

 

 

 

CREATE TABLE [#temp_Data]

 

(

 

    [FieldName]  VARCHAR(MAX),

 

    [FieldValue] NVARCHAR(MAX),

 

    [Rowlogcontents] VARBINARY(8000),

 

    [Row ID] int

 

)

 

 

 

INSERT INTO #temp_Data

 

SELECT NAME,

 

CASE

 

 WHEN system_type_id IN (231239THEN  LTRIM(RTRIM(CONVERT(NVARCHAR(max),hex_Value)))  --NVARCHAR ,NCHAR

 

 WHEN system_type_id IN (167,175THEN  LTRIM(RTRIM(CONVERT(VARCHAR(max),hex_Value)))  --VARCHAR,CHAR

 

 WHEN system_type_id IN (35THEN  LTRIM(RTRIM(CONVERT(VARCHAR(max),hex_Value))) --Text

 

 WHEN system_type_id IN (99THEN  LTRIM(RTRIM(CONVERT(NVARCHAR(max),hex_Value))) --nText 

 

 WHEN system_type_id = 48 THEN CONVERT(VARCHAR(MAX), CONVERT(TINYINTCONVERT(BINARY(1), REVERSE (hex_Value)))) --TINY INTEGER

 

 WHEN system_type_id = 52 THEN CONVERT(VARCHAR(MAX), CONVERT(SMALLINTCONVERT(BINARY(2), REVERSE (hex_Value)))) --SMALL INTEGER

 

 WHEN system_type_id = 56 THEN CONVERT(VARCHAR(MAX), CONVERT(INTCONVERT(BINARY(4), REVERSE(hex_Value)))) -- INTEGER

 

 WHEN system_type_id = 127 THEN CONVERT(VARCHAR(MAX), CONVERT(BIGINTCONVERT(BINARY(8), REVERSE(hex_Value))))-- BIG INTEGER

 

 WHEN system_type_id = 61 Then CONVERT(VARCHAR(MAX),CONVERT(DATETIME,CONVERT(VARBINARY(8000),REVERSE (hex_Value))),100--DATETIME

 

 WHEN system_type_id =58 Then CONVERT(VARCHAR(MAX),CONVERT(SMALLDATETIME,CONVERT(VARBINARY(8000),REVERSE(hex_Value))),100--SMALL DATETIME

 

 WHEN system_type_id = 108 THEN CONVERT(VARCHAR(MAX),CONVERT(NUMERIC(38,20), CONVERT(VARBINARY,CONVERT(VARBINARY(1),xprec)+CONVERT(VARBINARY(1),xscale))+CONVERT(VARBINARY(1),0) + hex_Value)) --- NUMERIC

 

 WHEN system_type_id =106 THEN CONVERT(VARCHAR(MAX), CONVERT(DECIMAL(38,20), CONVERT(VARBINARY,Convert(VARBINARY(1),xprec)+CONVERT(VARBINARY(1),xscale))+CONVERT(VARBINARY(1),0) + hex_Value)) --- DECIMAL

 

 WHEN system_type_id In(60,122THEN CONVERT(VARCHAR(MAX),Convert(MONEY,Convert(VARBINARY(8000),Reverse(hex_Value))),2--MONEY,SMALLMONEY

 

 WHEN system_type_id = 104 THEN CONVERT(VARCHAR(MAX),CONVERT (BIT,CONVERT(BINARY(1), hex_Value)%2))  -- BIT

 

 WHEN system_type_id =62 THEN  RTRIM(LTRIM(STR(CONVERT(FLOAT,SIGN(CAST(CONVERT(VARBINARY(8000),Reverse(hex_Value)) AS BIGINT)) * (1.0 + (CAST(CONVERT(VARBINARY(8000),Reverse(hex_Value)) AS BIGINT) & 0x000FFFFFFFFFFFFF) * POWER(CAST(2 AS FLOAT), -52)) * POWER(CAST(2 AS FLOAT),((CAST(CONVERT(VARBINARY(8000),Reverse(hex_Value)) AS BIGINT) & 0x7ff0000000000000) / EXP(52 * LOG(2))-1023))),53,LEN(hex_Value)))) --- FLOAT

 

 When system_type_id =59 THEN  Left(LTRIM(STR(CAST(SIGN(CAST(Convert(VARBINARY(8000),REVERSE(hex_Value)) AS BIGINT))* (1.0 + (CAST(CONVERT(VARBINARY(8000),Reverse(hex_Value)) AS BIGINT) & 0x007FFFFF) * POWER(CAST(2 AS Real), -23)) * POWER(CAST(2 AS Real),(((CAST(CONVERT(VARBINARY(8000),Reverse(hex_Value)) AS INT) )& 0x7f800000)/ EXP(23 * LOG(2))-127))AS REAL),23,23)),8--Real

 

 WHEN system_type_id In (165,173THEN (CASE WHEN CHARINDEX(0x,cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''VARBINARY(8000)')) = 0 THEN '0x' ELSE '' END) +cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''varchar(max)'-- BINARY,VARBINARY

 

 WHEN system_type_id =34 THEN (CASE WHEN CHARINDEX(0x,cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''VARBINARY(8000)')) = 0 THEN '0x' ELSE '' END) +cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''varchar(max)')  --IMAGE

 

 WHEN system_type_id =36 THEN CONVERT(VARCHAR(MAX),CONVERT(UNIQUEIDENTIFIER,hex_Value)) --UNIQUEIDENTIFIER

 

 WHEN system_type_id =231 THEN CONVERT(VARCHAR(MAX),CONVERT(sysname,hex_Value)) --SYSNAME

 

 WHEN system_type_id =241 THEN CONVERT(VARCHAR(MAX),CONVERT(xml,hex_Value)) --XML

 

 

 

 WHEN system_type_id =189 THEN (CASE WHEN CHARINDEX(0x,cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''VARBINARY(8000)')) = 0 THEN '0x' ELSE '' END) +cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''varchar(max)'--TIMESTAMP

 

 WHEN system_type_id=98 THEN (CASE

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=56 THEN CONVERT(VARCHAR(MAX), CONVERT(INTCONVERT(BINARY(4), REVERSE(Substring(hex_Value,3,Len(hex_Value))))))  -- INTEGER

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=108 THEN CONVERT(VARCHAR(MAX),CONVERT(numeric(38,20),CONVERT(VARBINARY(1),Substring(hex_Value,3,1)) +CONVERT(VARBINARY(1),Substring(hex_Value,4,1))+CONVERT(VARBINARY(1),0) + Substring(hex_Value,5,Len(hex_Value)))) --- NUMERIC

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=167 THEN LTRIM(RTRIM(CONVERT(VARCHAR(max),Substring(hex_Value,9,Len(hex_Value))))) --VARCHAR,CHAR

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=36 THEN CONVERT(VARCHAR(MAX),CONVERT(UNIQUEIDENTIFIER,Substring((hex_Value),3,20))) --UNIQUEIDENTIFIER

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=61 THEN CONVERT(VARCHAR(MAX),CONVERT(DATETIME,CONVERT(VARBINARY(8000),REVERSE (Substring(hex_Value,3,LEN(hex_Value)) ))),100--DATETIME

 

 WHEN CONVERT(INT,SUBSTRING(hex_Value,1,1))=165 THEN '0x'SUBSTRING((CASE WHEN CHARINDEX(0x,cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''VARBINARY(8000)')) = 0 THEN '0x' ELSE '' END) +cast('' AS XML).value('xs:hexBinary(sql:column("hex_Value"))''varchar(max)'),11,LEN(hex_Value)) -- BINARY,VARBINARY

 

 END)

 

  

 

END AS FieldValue

 

,[Rowlogcontents]

 

,[Row ID]

 

FROM @ColumnNameAndData ORDER BY nullbit

 

 

 

--Create the column name in the same order to do pivot table.

 

 

 

DECLARE @FieldName VARCHAR(max)

 

SET @FieldName = STUFF(

 

(

 

    SELECT ',' + CAST(QUOTENAME([Name]) AS VARCHAR(MAX)) FROM syscolumns WHERE id=object_id('' + @SchemaName_n_TableName + '')

 

    FOR XML PATH('')), 11'')

 

 

 

--Finally did pivot table and get the data back in the same format.

 

 

 

SET @sql = 'SELECT ' + @FieldName  + ' FROM #temp_Data PIVOT (Min([FieldValue]) FOR FieldName IN (' + @FieldName  + ')) AS pvt'

 

EXEC sp_executesql @sql

 

'Database > Query' 카테고리의 다른 글

INDEX 상세 정보 조회  (0) 2020.11.02
프로시저 파리미터  (0) 2020.10.20
EXEC와 동적(adhoc) 쿼리  (0) 2020.10.05
Find and Kill all the Blocked Process/Query  (0) 2020.09.22
오래된 커서  (0) 2020.09.11

개요

CDC라고도하는 변경 데이터 캡처는 추가 프로그래밍 작업없이 SQL Server 데이터베이스 테이블에서 수행되는 변경 사항을 추적하고 캡처하는 데 유용한 기능으로 SQL Server 2008 버전에 처음 도입되었습니다. SQL Server 2016 이전에는 SQL Server Enterprise 버전 에서만 SQL Server 데이터베이스에서 변경 데이터 캡처를 사용할 수 있었으며 SQL Server 2016부터는 필요하지 않았습니다.

변경 데이터 캡처는 데이터베이스 테이블에 대한 INSERT, UPDATE 및 DELETE 작업을 추적하고 원본 테이블의 동일한 열 구조와 이러한 변경 사항에 대한 설명을 기록하는 추가 열을 사용하여 미러링 된 테이블에 이러한 변경 사항에 대한 자세한 정보를 기록합니다. SQL Server는 삽입 된 값을 보여주는 각 INSERT 문에 대해 하나의 레코드, 삭제 된 데이터를 표시하는 각 DELETE 문에 대한 레코드와 각 UPDATE 문에 대해 두 개의 레코드를 기록합니다. 첫 번째는 변경 전 데이터를 표시하고 두 번째는 수행 후 데이터를 표시합니다. 변화.

추가 열에는 다음이 포함됩니다.

  • SQL Server 엔진에서 기록 된 변경 사항에 할당 한 커밋 로그 시퀀스 번호 (LSN)를 표시하는 __ $ start_lsn  __ $ end_lsn
  • 동일한 트랜잭션의 다른 변경과 관련된 변경 순서를 표시하는 __ $ seqval , 변경 작업 유형을 표시하는 __ $ operation ( 여기서 1 = 삭제, 2 = 삽입, 3 = 업데이트 (변경 전) 및 4) = 업데이트 (변경 후)
  • __ $ update_mask 는 캡처 된 각 열에 대해 정의 된 비트 마스크이며 업데이트 열을 식별합니다.

이 세부 정보를 통해 보안 또는 감사 목적으로 데이터베이스 변경 사항을 모니터링하거나 T-SQL 또는 ETL 방법을 사용하여 이러한 변경 사항을 OLTP 소스에서 대상 OLAP 데이터웨어 하우스로 점진적으로로드 할 수 있습니다.

설치 및 아키텍처

변경 데이터 캡처를 사용하려면 SQL Server 에이전트가 SQL Server 인스턴스에서 실행되고 있어야합니다. SQL Server 데이터베이스 테이블에서 기능이 활성화되면 해당 데이터베이스에 대해 두 개의 SQL Server 에이전트 작업이 생성됩니다. 첫 번째 작업은 데이터베이스 변경 테이블을 변경 정보로 채우고 두 번째 작업은 구성 가능한 보존 기간 인 3 일보다 오래된 레코드를 삭제하여 변경 테이블을 정리합니다.

변경 데이터 캡처는 데이터 변경의 소스 인 SQL Server 트랜잭션 로그에 의존합니다. 변경이 수행되면이 변경 사항이 트랜잭션 로그 파일에 기록됩니다.

해당 테이블에서 CDC 기능이 활성화 된 경우 CDC 기능에 대한 캡처 프로세스 역할을하는 트랜잭션 로그 복제 로그 판독기 에이전트는 트랜잭션 로그 파일에서 변경 로그를 읽고 이러한 변경 사항에 대한 메타 데이터 정보를 추가하고 아래와 같이 관련 CDC 변경 테이블 :

감사 솔루션으로서의 변경 데이터 캡처

변경 데이터 캡처는 비동기 SQL Server 감사 솔루션으로 사용되어 SELECT 문을 추적하는 옵션없이 INSERT, UPDATE 또는 DELETE 작업과 같은 테이블의 DML 변경을 추적하고 감사 할 수 있습니다.

Change Data Capture를 좋은 SQL Server Audit 솔루션으로 만드는 이유는 몇 가지 T-SQL 명령을 사용하여 쉽게 구성 할 수 있고 수정 프로세스 이전의 값에 대한 기록 정보를 제공하며 데이터 수정 프로세스에 대한 자세한 정보를 제공한다는 것입니다.

변경 데이터 캡처를 사용하여 SQL Server DML 변경 사항을 감사하는 방법을 살펴 보겠습니다.

CDC 활성화

감사를 위해 특정 테이블에서 변경 데이터 캡처를 사용하려면 먼저 SYSADMIN 고정 서버 역할의 구성원이 아래에 표시된대로 sys.sp_cdc_enable_db 시스템 저장 프로 시저를 사용하여 데이터베이스 수준에서 CDC를 먼저 사용하도록 설정해야 합니다. :

USE [CDCAudit]

GO

EXEC sys.sp_cdc_enable_db

GO

해당 데이터베이스에서 CDC가 활성화되었는지 확인하기 위해 다음과 같이 CDC가 활성화 된 데이터베이스 목록에 대해 sys.databases DMV를 쿼리합니다.

데이터베이스 수준에서 CDC를 활성화 한 후에는 sys.sp_cdc_enable_table 시스템 저장 프로 시저를 사용하여 db_owner 고정 데이터베이스 역할의 구성원이 데이터베이스 테이블의 DML 변경 사항을 추적하고 감사 할 수 있습니다. 아래에 표시된 것처럼 @captured_column_list 매개 변수로 지정된 열 목록의 변경 사항  @filegroup_name 매개 변수로 지정된 별도의 파일 그룹에 변경 테이블을 만듭니다 .

USE [CDCAudit]

GO  

EXEC sys.sp_cdc_enable_table  

@source_schema = N'dbo',  

@source_name   = N'Employee_Main',  

@role_name     = NULL,  

@filegroup_name = NULL,  

@supports_net_changes = 0

GO

해당 테이블에서 CDC가 활성화되었는지 확인하기 위해 아래와 같이 CDC가 활성화 된 현재 데이터베이스 아래의 모든 테이블에 대해 sys.tables DMV를 쿼리합니다.

테이블에서 CDC가 활성화되면 CDC 관련 정보를 저장하기 위해 데이터베이스의 CDC 스키마 아래에 여러 시스템 테이블이 생성됩니다. 이 테이블에는 다음이 포함됩니다.

  • 캡처 된 열 목록이 포함 된 CDC .captured_columns 테이블
  • 캡처에 사용할 수있는 테이블 목록이 포함 된 CDC .change_tables 테이블
  • 캡처 데이터가 활성화 된 이후의 모든 DDL 변경 내역을 기록하는 CDC .ddl_history 테이블
  • 변경 테이블과 연관된 모든 인덱스를 포함하는 CDC .index_columns 테이블
  • LSN 번호를 시간과 매핑하는 데 사용되는 CDC .lsn_time_mapping 테이블과 마지막으로 소스 테이블에서 DML 변경 사항을 캡처하는 데 사용되는 각 CDC 사용 테이블에 대한 하나의 변경 테이블이 아래와 같이 표시됩니다.

… 그리고 CDC 지원 테이블, 캡처 및 정리 작업과 관련된 SQL 에이전트 작업은 아래와 같이 생성됩니다.

CDC 비활성화

변경 데이터 캡처는 아래와 같이 sys.sp_cdc_disable_table 시스템 저장 프로 시저를 사용하여 특정 테이블에서 쉽게 비활성화 할 수 있습니다 .

… 또는 아래와 같이 sys.sp_cdc_disable_db 시스템 저장 프로 시저를 사용하여 CDC 활성화 테이블에서 하나씩 비활성화 할 필요없이 데이터베이스 수준에서 완전히 비활성화합니다 .

DML 변경 감사

데이터베이스 테이블에서 CDC를 활성화 한 후 해당 테이블에서 아래의 세 가지 데이터베이스 DML 변경 (INSERT, UPDATE, DELETE)을 수행하고 다음과 같이 CDC 기능을 사용하여 이러한 변경 사항을 감사하는 방법을 확인합니다.

앞서 언급했듯이 데이터 수정 사항은 CDC 사용 테이블과 관련된 변경 테이블에 기록됩니다 . 여기서는 [cdc]. [dbo_Employee_Main_CT] 테이블입니다. 소스 테이블에 최근 삽입 된 모든 레코드를보기 위해 유형 2의 모든 작업에 대한 변경 테이블을 쿼리 할 수 ​​있으며 삽입 된 값을 포함하여 INSERT 작업에 대한 전체 정보가 아래와 같이 표시됩니다.

유형 1의 모든 작업에 대한 변경 테이블을 쿼리하면 원본 테이블에서 최근에 삭제 된 모든 레코드가 반환되고 삭제 된 레코드의 값이 아래와 같이 표시됩니다.

마지막으로 유형 3 및 4의 작업에 대한 변경 테이블을 쿼리하여 UPDATE 문을 추적 할 수 있습니다. 그러면 업데이트 이전의 값이 작업 유형 3에, 변경 후 값이 작업 유형 4에 표시됩니다. 아래 그림과 같이:

변경 테이블 쿼리는 Microsoft에서 권장하지 않습니다. 대신 아래와 같이 CDC 사용 테이블과 관련된 CDC.fn_cdc_get_all_changes 시스템 함수를 쿼리 할 수 있습니다.

CDC.fn_cdc_get_all_changes 함수는 아래와 같이 모든 DML 변경 정보를 검색 하는 @from_lsn , @to_lsn , @row_filter_option 매개 변수 를 제공하여 쿼리 할 수 ​​있습니다 .

한계

변경 데이터 캡처를 사용 하면 데이터베이스 DML 변경 사항  감사 할 수 있지만 SELECT 문을 모니터링하는 옵션은 없지만 구성 작업은 무시할 수 있습니다. 반면 CDC를 SQL Server Audit 솔루션으로 고려하려면 상당한 유지 관리 및 관리 노력이 필요합니다. 여기에는 추적 데이터가 구성 가능한 일 수 동안 변경 테이블에 보관되고 동일하거나 다른 데이터 파일에 저장되기 때문에 보관 메커니즘 자동화가 포함됩니다.

또한 변경 테이블은 각 데이터베이스 아래에 저장되며 추적 된 각 테이블에 대해 함수가 생성됩니다. 이로 인해 번거롭고 동일한 데이터베이스의 모든 테이블, 동일한 인스턴스의 모든 데이터베이스 또는 여러 인스턴스의 DML 변경 정보를 읽는 통합 감사 보고서를 만드는 데 상당한 프로그래밍 노력이 필요합니다.

SQL Server Audit 솔루션으로서 CDC 기능에 대한 또 다른 제한은 CDC 사용 테이블에서 DDL 변경을 처리하는 데 필요한 어려운 프로세스입니다. 소스 테이블에서 변경 데이터 캡처를 사용하면 해당 테이블에서 DDL 변경을 수행하는 것을 방해하지 않기 때문입니다.

또한 SQL Server 에이전트 서비스가 실행되고 있지 않으면 CDC 캡처 작업이 작동하지 않으며 데이터베이스 복구 모델이 단순하더라도 CHECKPOINT가 실행 중이더라도 로그 잘림이 진행되지 않기 때문에 데이터베이스 로그 파일이 빠르게 증가합니다. 캡처를 기다리는 모든 변경 사항이 CDC 캡처 프로세스에 의해 수집 될 때까지 수행됩니다.

+ Recent posts