-- =============================================================================
-- auditlog.usp_AuditProcessing
--
-- Drains auditlog.AuditQueue one message at a time. For every 'AuditMessage':
--   1. parses the header keys (ENTITY, REASON, DATETIME, ACTION, HOST, LOGIN,
--      USERID, ROWCOUNT) from the part of the JSON that precedes "ROWS";
--   2. shreds the "ROWS" array into #audit (one row per entity/property);
--   3. writes one dbo.T_ENTITYHISTORY row per entity and the property values
--      into dbo.T_ENTITYATTRIBUTEHISTORY (both values <= 2000 bytes) or
--      dbo.T_ENTITYBLOBVALUES (either value > 2000 bytes).
-- ACTION codes: 1 = INSERT, 2 = DELETE, 3 = UPDATE.
--
-- Messages that cannot be processed are copied to
-- auditlog.AuditServiceFailedItems and the error is written to
-- auditlog.ErrorLog; the message is still removed from the queue.
-- After the queue is empty, conversations that are in a closed/error state and
-- have no message left in the queue are ended WITH CLEANUP.
--
-- Parameters: none.   Result sets: none.
-- =============================================================================
CREATE OR ALTER PROCEDURE auditlog.usp_AuditProcessing
AS
SET NOCOUNT ON;

DECLARE
     @messagetype   sysname
    ,@handle        uniqueidentifier
    ,@message_body  nvarchar(max)
    ,@json_part     nvarchar(2000)
    ,@entity        varchar(100)
    ,@reason        varchar(100)
    ,@datetime      datetime
    ,@action        tinyint = 0
    ,@userid        int
    ,@login         nvarchar(1000)
    ,@hostname      nvarchar(1000)
    ,@rowcount      int
    ,@limit         int = 100          -- above this ROWCOUNT the rows are staged in #ins / #del first
    ,@savepoint_set bit = 0            -- 1 once savepoint T1 exists in the current iteration
    ,@E             nvarchar(max);     -- JSON returned by dbo.fn_get_error_info()

-- table for inserted rows ("new" values)
CREATE TABLE #ins(
    C_ENTITYID      INT           NOT NULL,
    C_PROPERTYNAME  VARCHAR(100)  NOT NULL,
    C_INTEZMENYID   INT           NOT NULL,
    C_TANEVID       INT           NOT NULL,
    C_VALUE         NVARCHAR(MAX) NULL,
    PRIMARY KEY CLUSTERED (C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME)
)

-- table for deleted rows ("old" values)
CREATE TABLE #del(
    C_ENTITYID      INT           NOT NULL,
    C_PROPERTYNAME  VARCHAR(100)  NOT NULL,
    C_INTEZMENYID   INT           NOT NULL,
    C_TANEVID       INT           NOT NULL,
    C_VALUE         NVARCHAR(MAX) NULL,
    PRIMARY KEY CLUSTERED (C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME)
)

-- table for audit rows
CREATE TABLE #audit(
    C_ENTITYID       INT           NOT NULL,
    C_INTEZMENYID    INT           NOT NULL,
    C_TANEVID        INT           NOT NULL,
    C_FELHASZNALOID  INT           NULL,
    C_PROPERTYNAME   VARCHAR(1000) NULL,
    C_ORIGINALVALUE  NVARCHAR(MAX) NULL,
    C_CURRENTVALUE   NVARCHAR(MAX) NULL
);

-- table for ids inserted into ENTITYHISTORY
CREATE TABLE #entity(
    ENTITYID         INT NOT NULL,
    INTEZMENYID      INT NOT NULL,
    TANEVID          INT NOT NULL,
    ENTITYHISTORYID  INT NOT NULL,
    PRIMARY KEY CLUSTERED (ENTITYID, INTEZMENYID, TANEVID)
);

WHILE (1 = 1)
BEGIN
    BEGIN TRY
        -- Reset all per-message state so that a message which lacks a header key
        -- (or a RECEIVE that fails) never sees values left over from the previous message.
        SELECT
             @handle        = NULL
            ,@message_body  = NULL
            ,@messagetype   = NULL
            ,@json_part     = NULL
            ,@entity        = NULL
            ,@reason        = NULL
            ,@datetime      = NULL
            ,@action        = 0
            ,@userid        = NULL
            ,@login         = NULL
            ,@hostname      = NULL
            ,@rowcount      = NULL
            ,@savepoint_set = 0;

        TRUNCATE TABLE #ins;
        TRUNCATE TABLE #del;
        TRUNCATE TABLE #audit;
        TRUNCATE TABLE #entity;

        -- outer transaction: covers the RECEIVE, so committing it removes the message from the queue
        BEGIN TRANSACTION;

        -- take 1 message from queue (wait at most 1000 ms for one to arrive)
        WAITFOR (
            RECEIVE TOP(1)
                @handle       = conversation_handle,
                @message_body = message_body,
                @messagetype  = message_type_name
            FROM auditlog.AuditQueue
        ), TIMEOUT 1000;

        -- if there are no more rows in the queue then exit
        -- (the open transaction is committed right after the loop)
        IF @@ROWCOUNT = 0
            BREAK;

        -- anything that is not an audit payload is just consumed
        IF @messagetype <> 'AuditMessage'
            GOTO CONTINUE_PROCESS;

        -- take the beginning of the message and search for "ROWS" node
        SET @json_part = LEFT(@message_body, 2000);

        IF @json_part LIKE '%"ROWS"%'
        BEGIN
            -- cut the document before "ROWS" and close it, leaving a small header-only JSON object
            SET @json_part = LEFT(@json_part, PATINDEX('%"ROWS"%', @json_part) - 1);
            SET @json_part = @json_part + '"ROWS":null}';
        END
        ELSE
        BEGIN
            INSERT INTO auditlog.AuditServiceFailedItems([conversation_handle], [message_body], [error_message])
            VALUES (@handle, @message_body, 'No "ROWS" in first 2000 chars');

            GOTO CONTINUE_PROCESS;
        END

        -- lightweight parsing of the message for headers
        -- (each variable keeps its value unless the current row carries its key)
        SELECT
            @entity   = CASE WHEN j.[key] = 'ENTITY'   THEN j.[value] ELSE @entity END,
            @reason   = CASE WHEN j.[key] = 'REASON'   THEN j.[value] ELSE @reason END,
            @datetime = CASE WHEN j.[key] = 'DATETIME' THEN CONVERT(datetime, j.[value], 121) ELSE @datetime END,
            @action   = CASE WHEN j.[key] = 'ACTION'   THEN j.[value] ELSE @action END,
            @hostname = CASE WHEN j.[key] = 'HOST'     THEN j.[value] ELSE @hostname END,
            @login    = CASE WHEN j.[key] = 'LOGIN'    THEN j.[value] ELSE @login END,
            @userid   = CASE WHEN j.[key] = 'USERID'   THEN j.[value] ELSE @userid END,
            @rowcount = CASE WHEN j.[key] = 'ROWCOUNT' THEN j.[value] ELSE @rowcount END
        FROM OPENJSON(@json_part) j
        WHERE j.[key] in ('ENTITY', 'REASON', 'DATETIME', 'ACTION', 'HOST', 'LOGIN', 'USERID', 'ROWCOUNT');

        -- user parsing: when USERID is not a known dbo.T_FELHASZNALO row, register the
        -- login in auditlog.Users (if new) and resolve the user by login later on
        IF NOT EXISTS(SELECT * FROM dbo.T_FELHASZNALO WHERE ID = @userid)
        BEGIN
            INSERT INTO auditlog.Users(USERNAME)
            SELECT @login AS USERNAME
            WHERE NOT EXISTS(SELECT * FROM auditlog.Users WHERE USERNAME = @login);

            SET @userid = null;
        END

        -- big messages: shred "new" and "old" values into keyed temp tables first
        IF @rowcount > @limit
        BEGIN
            INSERT INTO #ins(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_VALUE)
            SELECT
                JSON_VALUE(j.value, '$.ID')  AS C_ENTITYID,
                JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
                JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
                i.[key]                      AS C_PROPERTYNAME,
                i.[value]                    AS C_VALUE
            FROM OPENJSON(@message_body, '$.ROWS') j
            CROSS APPLY OPENJSON(j.value, '$.new') i
            OPTION (MAXDOP 1);

            INSERT INTO #del(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_VALUE)
            SELECT
                JSON_VALUE(j.value, '$.ID')  AS C_ENTITYID,
                JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
                JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
                d.[key]                      AS C_PROPERTYNAME,
                d.[value]                    AS C_VALUE
            FROM OPENJSON(@message_body, '$.ROWS') j
            CROSS APPLY OPENJSON(j.value, '$.old') d
            OPTION (MAXDOP 1);
        END

        -- extract audit rows from message
        IF @action = 1 -- INSERT
        BEGIN
            IF @rowcount > @limit
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    i.C_ENTITYID,
                    i.C_INTEZMENYID,
                    i.C_TANEVID,
                    i.C_PROPERTYNAME AS C_PROPERTYNAME,
                    NULL             AS C_ORIGINALVALUE,
                    i.C_VALUE        AS C_CURRENTVALUE
                FROM #ins i
                OPTION (MAXDOP 1);
            ELSE
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    JSON_VALUE(j.value, '$.ID')  AS C_ENTITYID,
                    JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
                    JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
                    i.[key]                      AS C_PROPERTYNAME,
                    NULL                         AS C_ORIGINALVALUE,
                    i.[value]                    AS C_CURRENTVALUE
                FROM OPENJSON(@message_body, '$.ROWS') j
                OUTER APPLY OPENJSON(j.value, '$.new') i
                OPTION (MAXDOP 1);
        END

        IF @action = 2 -- DELETE
        BEGIN
            IF @rowcount > @limit
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    d.C_ENTITYID,
                    d.C_INTEZMENYID,
                    d.C_TANEVID,
                    d.C_PROPERTYNAME AS C_PROPERTYNAME,
                    d.C_VALUE        AS C_ORIGINALVALUE,
                    NULL             AS C_CURRENTVALUE
                FROM #del d
                OPTION (MAXDOP 1);
            ELSE
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    JSON_VALUE(j.value, '$.ID')  AS C_ENTITYID,
                    JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
                    JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
                    d.[key]                      AS C_PROPERTYNAME,
                    d.[value]                    AS C_ORIGINALVALUE,
                    NULL                         AS C_CURRENTVALUE
                FROM OPENJSON(@message_body, '$.ROWS') j
                OUTER APPLY OPENJSON(j.value, '$.old') d
                OPTION (MAXDOP 1);
        END

        IF @action = 3 -- UPDATE
        BEGIN
            IF @rowcount > @limit
                -- pair old and new value of the same property of the same entity
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    ISNULL(i.C_ENTITYID, d.C_ENTITYID)         AS C_ENTITYID,
                    ISNULL(i.C_INTEZMENYID, d.C_INTEZMENYID)   AS C_INTEZMENYID,
                    ISNULL(i.C_TANEVID, d.C_TANEVID)           AS C_TANEVID,
                    ISNULL(i.C_PROPERTYNAME, d.C_PROPERTYNAME) AS C_PROPERTYNAME,
                    d.C_VALUE                                  AS C_ORIGINALVALUE,
                    i.C_VALUE                                  AS C_CURRENTVALUE
                FROM #ins i
                INNER MERGE JOIN #del d
                    ON  i.C_ENTITYID     = d.C_ENTITYID
                    AND i.C_INTEZMENYID  = d.C_INTEZMENYID
                    AND i.C_TANEVID      = d.C_TANEVID
                    AND i.C_PROPERTYNAME = d.C_PROPERTYNAME
                OPTION (MAXDOP 1);
            ELSE
                -- the WHERE keeps only properties present in both "new" and "old"
                INSERT INTO #audit(C_ENTITYID, C_INTEZMENYID, C_TANEVID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
                SELECT
                    JSON_VALUE(j.value, '$.ID')  AS C_ENTITYID,
                    JSON_VALUE(j.value, '$.IID') AS C_INTEZMENYID,
                    JSON_VALUE(j.value, '$.TID') AS C_TANEVID,
                    ISNULL(i.[key], d.[key])     AS C_PROPERTYNAME,
                    d.[value]                    AS C_ORIGINALVALUE,
                    i.[value]                    AS C_CURRENTVALUE
                FROM OPENJSON(@message_body, '$.ROWS') j
                OUTER APPLY OPENJSON(j.value, '$.new') i
                OUTER APPLY OPENJSON(j.value, '$.old') d
                WHERE i.[key] = d.[key]
                OPTION (MAXDOP 1);
        END

        -- Savepoint instead of a nested BEGIN TRANSACTION: ROLLBACK TRANSACTION <name> only
        -- accepts the outermost transaction name or a savepoint name, so an inner named
        -- transaction cannot be rolled back on its own. The savepoint lets the CATCH block
        -- undo the history writes while keeping the RECEIVE.
        SAVE TRANSACTION T1;
        SET @savepoint_set = 1;

        -- fill entityhistory and save generated ids
        INSERT INTO dbo.T_ENTITYHISTORY(C_ENTITYNAME, C_ENTITYID, C_REASON, C_INTEZMENYID, C_TANEVID, C_FELHASZNALOID, C_ALTERATIONDATE, CREATED, HOSTNAME)
        OUTPUT inserted.C_ENTITYID, inserted.C_INTEZMENYID, inserted.C_TANEVID, inserted.ID
            INTO #entity(ENTITYID, INTEZMENYID, TANEVID, ENTITYHISTORYID)
        SELECT DISTINCT
            @entity   AS C_ENTITYNAME,
            C_ENTITYID,
            @reason   AS C_REASON,
            C_INTEZMENYID,
            C_TANEVID,
            COALESCE(@userid, (SELECT USERID FROM auditlog.Users WHERE USERNAME = @login)) AS C_FELHASZNALOID,
            @datetime AS C_ALTERATIONDATE,
            GETDATE() AS CREATED,
            @hostname AS HOSTNAME
        FROM #audit
        OPTION (MAXDOP 1);

        -- fill attributehistory for small values (both values at most 2000 bytes);
        -- for UPDATE only properties whose value actually changed are stored
        INSERT INTO dbo.T_ENTITYATTRIBUTEHISTORY(C_ENTITYHISTORYID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
        SELECT
            e.ENTITYHISTORYID AS C_ENTITYHISTORYID,
            a.C_PROPERTYNAME,
            a.C_ORIGINALVALUE,
            a.C_CURRENTVALUE
        FROM #audit a
        INNER JOIN #entity e
            ON  e.ENTITYID    = a.C_ENTITYID
            and e.INTEZMENYID = a.C_INTEZMENYID
            and e.TANEVID     = a.C_TANEVID
        WHERE (
                  (@action IN (1, 2))
               OR (@action = 3 AND ISNULL(a.C_ORIGINALVALUE, '') <> ISNULL(a.C_CURRENTVALUE, ''))
               OR (@action = 3 AND a.C_ORIGINALVALUE IS NULL AND a.C_CURRENTVALUE IS NOT NULL)
               OR (@action = 3 AND a.C_ORIGINALVALUE IS NOT NULL AND a.C_CURRENTVALUE IS NULL)
              )
          AND (ISNULL(DATALENGTH(a.C_ORIGINALVALUE), 0) <= 2000 AND ISNULL(DATALENGTH(a.C_CURRENTVALUE), 0) <= 2000)
        OPTION (MAXDOP 1);

        -- fill attributehistory for big values (either value larger than 2000 bytes)
        INSERT INTO dbo.T_ENTITYBLOBVALUES(C_ENTITYHISTORYID, C_PROPERTYNAME, C_ORIGINALVALUE, C_CURRENTVALUE)
        SELECT
            e.ENTITYHISTORYID AS C_ENTITYHISTORYID,
            a.C_PROPERTYNAME,
            a.C_ORIGINALVALUE,
            a.C_CURRENTVALUE
        FROM #audit a
        INNER JOIN #entity e
            ON  e.ENTITYID    = a.C_ENTITYID
            and e.INTEZMENYID = a.C_INTEZMENYID
            and e.TANEVID     = a.C_TANEVID
        WHERE (
                  (@action IN (1, 2))
               OR (@action = 3 AND ISNULL(a.C_ORIGINALVALUE, '') <> ISNULL(a.C_CURRENTVALUE, ''))
               OR (@action = 3 AND a.C_ORIGINALVALUE IS NULL AND a.C_CURRENTVALUE IS NOT NULL)
               OR (@action = 3 AND a.C_ORIGINALVALUE IS NOT NULL AND a.C_CURRENTVALUE IS NULL)
              )
          AND (ISNULL(DATALENGTH(a.C_ORIGINALVALUE), 0) > 2000 OR ISNULL(DATALENGTH(a.C_CURRENTVALUE), 0) > 2000)
        OPTION (MAXDOP 1);

        CONTINUE_PROCESS:
        IF @@TRANCOUNT > 0
            COMMIT TRANSACTION; -- commit changes in the Queue (and in ENTITYHISTORY, if any)
    END TRY
    BEGIN CATCH
        -- capture the error details first, before any other statement runs
        SET @E = dbo.fn_get_error_info();

        IF XACT_STATE() = -1
            -- Doomed transaction: no write or commit is possible until it is rolled back
            -- completely. This also undoes the RECEIVE, so the message goes back to the queue.
            ROLLBACK TRANSACTION;
        ELSE IF XACT_STATE() = 1 AND @savepoint_set = 1
            -- rollback changes in ENTITYHISTORY only; the RECEIVE stays in the open transaction
            ROLLBACK TRANSACTION T1;

        INSERT auditlog.ErrorLog(UserName, ErrorNumber, ErrorSeverity, ErrorState, ErrorProcedure, ErrorLine, ErrorMessage)
        VALUES(
            SUSER_NAME(),
            JSON_VALUE(@E, '$.ERROR_NUMBER'),
            JSON_VALUE(@E, '$.ERROR_SEVERITY'),
            JSON_VALUE(@E, '$.ERROR_STATE'),
            JSON_VALUE(@E, '$.ERROR_PROCEDURE'),
            JSON_VALUE(@E, '$.ERROR_LINE'),
            JSON_VALUE(@E, '$.ERROR_MESSAGE')
        );

        -- only record a failed item when a message was actually received in this iteration
        IF @handle IS NOT NULL
            INSERT INTO auditlog.AuditServiceFailedItems([conversation_handle], [message_body], [error_message])
            VALUES(@handle, @message_body, JSON_VALUE(@E, '$.ERROR_MESSAGE'));

        IF @@TRANCOUNT > 0
            COMMIT TRANSACTION; -- commit changes in the Queue

        -- The error was raised before any message was received (e.g. by RECEIVE itself):
        -- retrying immediately would most likely fail the same way, so stop draining the queue.
        IF @handle IS NULL
            BREAK;
    END CATCH
END -- end of while

IF @@TRANCOUNT > 0
    COMMIT TRANSACTION; -- commit any changes

-- close orphaned conversations: endpoints of AuditService in state ER / DI / DO / CD
-- that have no message left in the queue
DECLARE @conversations TABLE([conversation_handle] uniqueidentifier NOT NULL PRIMARY KEY CLUSTERED);

INSERT INTO @conversations([conversation_handle])
SELECT e.[conversation_handle]
FROM sys.conversation_endpoints e WITH (NOLOCK)
WHERE e.[state] IN ('ER', 'DI', 'DO', 'CD')
  AND e.far_service = 'AuditService'
  AND NOT EXISTS(SELECT * FROM auditlog.AuditQueue a WITH (NOLOCK) WHERE a.[conversation_handle] = e.[conversation_handle])
OPTION (HASH JOIN);

-- walk the collected handles in ascending order, starting below the smallest possible value
SET @handle = 0x0;

WHILE (1 = 1)
BEGIN
    SELECT TOP 1 @handle = [conversation_handle]
    FROM @conversations
    WHERE [conversation_handle] > @handle
    ORDER BY [conversation_handle];

    IF @@ROWCOUNT = 0
        BREAK;

    BEGIN TRY
        END CONVERSATION @handle WITH CLEANUP;
    END TRY
    BEGIN CATCH
        -- best effort: a conversation that cannot be ended is skipped
    END CATCH
END
GO