kreta/Kreta.DataAccess.Migrations/DBScripts/Database/auditlog/usp_AuditProcessing.sql
2024-03-13 00:33:46 +01:00

351 lines
No EOL
14 KiB
Transact-SQL

CREATE OR ALTER PROCEDURE auditlog.usp_AuditProcessing
AS
SET NOCOUNT ON;
DECLARE @messagetype sysname
,@handle uniqueidentifier
,@message_body nvarchar(max)
,@json_part nvarchar(2000)
,@entity varchar(100)
,@reason varchar(100)
,@datetime datetime
,@action tinyint = 0
,@userid int
,@login nvarchar(1000)
,@hostname nvarchar(1000)
,@rowcount int
,@limit int = 100;
-- table for inserted rows
CREATE TABLE #ins(
C_ENTITYID INT NOT NULL,
C_PROPERTYNAME VARCHAR(100) NOT NULL,
C_INTEZMENYID INT NOT NULL,
C_TANEVID INT NOT NULL,
C_VALUE NVARCHAR(MAX) NULL,
PRIMARY KEY CLUSTERED (C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME)
)
-- table for deleted rows
CREATE TABLE #del(
C_ENTITYID INT NOT NULL,
C_PROPERTYNAME VARCHAR(100) NOT NULL,
C_INTEZMENYID INT NOT NULL,
C_TANEVID INT NOT NULL,
C_VALUE NVARCHAR(MAX) NULL,
PRIMARY KEY CLUSTERED (C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME)
)
-- table for audit rows
CREATE TABLE #audit(
C_ENTITYID INT NOT NULL,
C_INTEZMENYID INT NOT NULL,
C_TANEVID INT NOT NULL,
C_FELHASZNALOID INT NULL,
C_PROPERTYNAME VARCHAR(1000) NULL,
C_ORIGINALVALUE NVARCHAR(MAX) NULL,
C_CURRENTVALUE NVARCHAR(MAX) NULL
);
-- table for ids inserted into ENTITYHISTORY
CREATE TABLE #entity(
ENTITYID INT NOT NULL,
INTEZMENYID INT NOT NULL,
TANEVID INT NOT NULL,
ENTITYHISTORYID INT NOT NULL,
PRIMARY KEY CLUSTERED (ENTITYID, INTEZMENYID, TANEVID)
);
WHILE (1 = 1)
BEGIN
BEGIN TRY
TRUNCATE TABLE #ins;
TRUNCATE TABLE #del;
TRUNCATE TABLE #audit;
TRUNCATE TABLE #entity;
BEGIN TRANSACTION;
-- take 1 message from queue
WAITFOR (
RECEIVE TOP(1)
@handle = conversation_handle,
@message_body = message_body,
@messagetype = message_type_name
FROM auditlog.AuditQueue
), TIMEOUT 1000;
-- if there are no more rows in the queue then exit
IF @@ROWCOUNT = 0 BREAK;
IF @messagetype <> 'AuditMessage'
GOTO CONTINUE_PROCESS;
-- take the beginning of the message and search for "ROWS" node
SET @json_part = LEFT(@message_body, 2000);
IF @json_part LIKE '%"ROWS"%'
BEGIN
SET @json_part = LEFT(@json_part, PATINDEX('%"ROWS"%', @json_part) - 1);
SET @json_part = @json_part + '"ROWS":null}';
END
ELSE
BEGIN
INSERT INTO auditlog.AuditServiceFailedItems([conversation_handle], [message_body], [error_message])
VALUES (@handle, @message_body, 'No "ROWS" in first 2000 chars');
GOTO CONTINUE_PROCESS;
END
-- lightweight parsing of the message for headers
SELECT
@entity = CASE WHEN j.[key] = 'ENTITY' THEN j.[value] ELSE @entity END,
@reason = CASE WHEN j.[key] = 'REASON' THEN j.[value] ELSE @reason END,
@datetime = CASE WHEN j.[key] = 'DATETIME' THEN CONVERT(datetime, j.[value], 121) ELSE @datetime END,
@action = CASE WHEN j.[key] = 'ACTION' THEN j.[value] ELSE @action END,
@hostname = CASE WHEN j.[key] = 'HOST' THEN j.[value] ELSE @hostname END,
@login = CASE WHEN j.[key] = 'LOGIN' THEN j.[value] ELSE @login END,
@userid = CASE WHEN j.[key] = 'USERID' THEN j.[value] ELSE @userid END,
@rowcount = CASE WHEN j.[key] = 'ROWCOUNT' THEN j.[value] ELSE @rowcount END
FROM OPENJSON(@json_part) j
WHERE j.[key] in ('ENTITY', 'REASON', 'DATETIME', 'ACTION', 'HOST', 'LOGIN', 'USERID', 'ROWCOUNT');
-- user parsing
IF NOT EXISTS(SELECT * FROM dbo.T_FELHASZNALO WHERE ID = @userid)
BEGIN
INSERT INTO auditlog.Users(USERNAME)
SELECT @login AS USERNAME
WHERE NOT EXISTS(SELECT * FROM auditlog.Users WHERE USERNAME = @login);
SET @userid = null;
END
IF @rowcount > @limit
INSERT INTO #ins(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_VALUE)
SELECT
JSON_VALUE(j.value, '$.ID') AS C_ENTITYID,
JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
i.[key] AS C_PROPERTYNAME,
i.[value] AS C_VALUE
FROM OPENJSON(@message_body, '$.ROWS') j
CROSS APPLY OPENJSON(j.value, '$.new') i
OPTION (MAXDOP 1);
IF @rowcount > @limit
INSERT INTO #del(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_VALUE)
SELECT
JSON_VALUE(j.value, '$.ID') AS C_ENTITYID,
JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
d.[key] AS C_PROPERTYNAME,
d.[value] AS C_VALUE
FROM OPENJSON(@message_body, '$.ROWS') j
CROSS APPLY OPENJSON(j.value, '$.old') d
OPTION (MAXDOP 1);
-- extract audit rows from message
IF @action = 1 -- INSERT
IF @rowcount > @limit
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
i.C_ENTITYID,
i.C_INTEZMENYID,
i.C_TANEVID,
i.C_PROPERTYNAME AS C_PROPERTYNAME,
NULL AS C_ORIGINALVALUE,
i.C_VALUE AS C_CURRENTVALUE
FROM #ins i
OPTION (MAXDOP 1);
ELSE
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
JSON_VALUE(j.value, '$.ID') AS C_ENTITYID,
JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
i.[key] AS C_PROPERTYNAME,
NULL AS C_ORIGINALVALUE,
i.[value] AS C_CURRENTVALUE
FROM OPENJSON(@message_body, '$.ROWS') j
OUTER APPLY OPENJSON(j.value, '$.new') i
OPTION (MAXDOP 1);
IF @action = 2 -- DELETE
IF @rowcount > @limit
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
d.C_ENTITYID,
d.C_INTEZMENYID,
d.C_TANEVID,
d.C_PROPERTYNAME AS C_PROPERTYNAME,
d.C_VALUE AS C_ORIGINALVALUE,
NULL AS C_CURRENTVALUE
FROM #del d
OPTION (MAXDOP 1);
ELSE
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
JSON_VALUE(j.value, '$.ID') AS C_ENTITYID,
JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
d.[key] AS C_PROPERTYNAME,
d.[value] AS C_ORIGINALVALUE,
NULL AS C_CURRENTVALUE
FROM OPENJSON(@message_body, '$.ROWS') j
OUTER APPLY OPENJSON(j.value, '$.old') d
OPTION (MAXDOP 1);
IF @action = 3 -- UPDATE
IF @rowcount > @limit
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
ISNULL(i.C_ENTITYID, d.C_ENTITYID) AS C_ENTITYID,
ISNULL(i.C_INTEZMENYID, d.C_INTEZMENYID) AS C_INTEZMENYID,
ISNULL(i.C_TANEVID, d.C_TANEVID) AS C_TANEVID,
ISNULL(i.C_PROPERTYNAME, d.C_PROPERTYNAME) AS C_PROPERTYNAME,
d.C_VALUE AS C_ORIGINALVALUE,
i.C_VALUE AS C_CURRENTVALUE
FROM #ins i
INNER MERGE JOIN #del d ON i.C_ENTITYID = d.C_ENTITYID
AND i.C_INTEZMENYID = d.C_INTEZMENYID
AND i.C_TANEVID = d.C_TANEVID
AND i.C_PROPERTYNAME = d.C_PROPERTYNAME
OPTION (MAXDOP 1);
ELSE
INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
JSON_VALUE(j.value, '$.ID') AS C_ENTITYID,
JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
ISNULL(i.[key], d.[key]) AS C_PROPERTYNAME,
d.[value] AS C_ORIGINALVALUE,
i.[value] AS C_CURRENTVALUE
FROM OPENJSON(@message_body, '$.ROWS') j
OUTER APPLY OPENJSON(j.value, '$.new') i
OUTER APPLY OPENJSON(j.value, '$.old') d
WHERE i.[key] = d.[key]
OPTION (MAXDOP 1);
BEGIN TRANSACTION T1;
-- fill entityhistory and save generated ids
INSERT INTO dbo.T_ENTITYHISTORY(C_ENTITYNAME, C_ENTITYID, C_REASON, C_INTEZMENYID, C_TANEVID, C_FELHASZNALOID, C_ALTERATIONDATE, CREATED, HOSTNAME)
OUTPUT inserted.C_ENTITYID, inserted.C_INTEZMENYID, inserted.C_TANEVID, inserted.ID INTO #entity(ENTITYID, INTEZMENYID, TANEVID, ENTITYHISTORYID)
SELECT DISTINCT
@entity AS C_ENTITYNAME,
C_ENTITYID,
@reason AS C_REASON,
C_INTEZMENYID,
C_TANEVID,
COALESCE(@userid, (SELECT USERID FROM auditlog.Users WHERE USERNAME = @login)) AS C_FELHASZNALOID,
@datetime AS C_ALTERATIONDATE,
GETDATE() AS CREATED,
@hostname AS HOSTNAME
FROM #audit
OPTION (MAXDOP 1);
-- fill attributehistory for small values
INSERT INTO dbo.T_ENTITYATTRIBUTEHISTORY(C_ENTITYHISTORYID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
e.ENTITYHISTORYID AS C_ENTITYHISTORYID,
a.C_PROPERTYNAME,
a.C_ORIGINALVALUE,
a.C_CURRENTVALUE
FROM #audit a
INNER JOIN #entity e ON e.ENTITYID = a.C_ENTITYID and e.INTEZMENYID = a.C_INTEZMENYID and e.TANEVID = a.C_TANEVID
WHERE (
(@action IN (1, 2))
OR
(@action = 3 AND ISNULL(a.C_ORIGINALVALUE, '') <> ISNULL(a.C_CURRENTVALUE, ''))
OR
(@action = 3 AND a.C_ORIGINALVALUE IS NULL AND a.C_CURRENTVALUE IS NOT NULL)
OR
(@action = 3 AND a.C_ORIGINALVALUE IS NOT NULL AND a.C_CURRENTVALUE IS NULL)
)
AND (ISNULL(DATALENGTH(a.C_ORIGINALVALUE), 0) <= 2000 AND ISNULL(DATALENGTH(a.C_CURRENTVALUE), 0) <= 2000)
OPTION (MAXDOP 1);
-- fill attributehistory for big values
INSERT INTO dbo.T_ENTITYBLOBVALUES(C_ENTITYHISTORYID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
SELECT
e.ENTITYHISTORYID AS C_ENTITYHISTORYID,
a.C_PROPERTYNAME,
a.C_ORIGINALVALUE,
a.C_CURRENTVALUE
FROM #audit a
INNER JOIN #entity e ON e.ENTITYID = a.C_ENTITYID and e.INTEZMENYID = a.C_INTEZMENYID and e.TANEVID = a.C_TANEVID
WHERE (
(@action IN (1, 2))
OR
(@action = 3 AND ISNULL(a.C_ORIGINALVALUE, '') <> ISNULL(a.C_CURRENTVALUE, ''))
OR
(@action = 3 AND a.C_ORIGINALVALUE IS NULL AND a.C_CURRENTVALUE IS NOT NULL)
OR
(@action = 3 AND a.C_ORIGINALVALUE IS NOT NULL AND a.C_CURRENTVALUE IS NULL)
)
AND (ISNULL(DATALENGTH(a.C_ORIGINALVALUE), 0) > 2000 OR ISNULL(DATALENGTH(a.C_CURRENTVALUE), 0) > 2000)
OPTION (MAXDOP 1);
COMMIT TRANSACTION T1; -- commit changes in ENTITYHISTORY
CONTINUE_PROCESS:
IF @@TRANCOUNT > 0 COMMIT TRANSACTION; -- commit changes in the Queue
END TRY
BEGIN CATCH
DECLARE @E NVARCHAR(MAX) = dbo.fn_get_error_info();
IF @@TRANCOUNT > 1 ROLLBACK TRANSACTION T1; -- rollback changes in ENTITYHISTORY
INSERT auditlog.ErrorLog(UserName, ErrorNumber, ErrorSeverity, ErrorState, ErrorProcedure, ErrorLine, ErrorMessage)
VALUES(SUSER_NAME(), JSON_VALUE(@E, '$.ERROR_NUMBER'), JSON_VALUE(@E, '$.ERROR_SEVERITY'), JSON_VALUE(@E, '$.ERROR_STATE'), JSON_VALUE(@E, '$.ERROR_PROCEDURE'), JSON_VALUE(@E, '$.ERROR_LINE'), JSON_VALUE(@E, '$.ERROR_MESSAGE'));
INSERT INTO auditlog.AuditServiceFailedItems([conversation_handle], [message_body], [error_message])
VALUES(@handle, @message_body, JSON_VALUE(@E, '$.ERROR_MESSAGE'));
IF @@TRANCOUNT > 0 COMMIT TRANSACTION; -- commit changes in the Queue
END CATCH
END -- end of while
IF @@TRANCOUNT > 0 COMMIT TRANSACTION; -- commit any changes
-- close orphaned conversations
DECLARE @conversations TABLE([conversation_handle] uniqueidentifier NOT NULL PRIMARY KEY CLUSTERED);
INSERT INTO @conversations([conversation_handle])
SELECT e.[conversation_handle]
FROM sys.conversation_endpoints e WITH (NOLOCK)
WHERE e.[state] IN ('ER', 'DI', 'DO', 'CD')
AND e.far_service = 'AuditService'
AND NOT EXISTS(SELECT * FROM auditlog.AuditQueue a WITH (NOLOCK) WHERE a.[conversation_handle] = e.[conversation_handle])
OPTION (HASH JOIN);
SET @handle = 0x0;
WHILE (1 = 1)
BEGIN
SELECT TOP 1 @handle = [conversation_handle]
FROM @conversations
WHERE [conversation_handle] > @handle
ORDER BY [conversation_handle];
IF @@ROWCOUNT = 0 BREAK;
BEGIN TRY
END CONVERSATION @handle WITH CLEANUP;
END TRY
BEGIN CATCH
END CATCH
END
GO