사전 조건
- 도커가 설치된 Linux 컴퓨터, 클릭하우스.
| IP | 포트 | 지침 |
|---|---|---|
| 3413 | 메인 라이브러리 | |
| 3414 | 노예 |
지식 요구 사항
아키텍처 다이어그램
Sqlserver마스터-슬레이브 설정
데이터베이스 준비
version: '2.1'
services:
sqlserver-master:
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: sqlserver-master
ports:
- "1433:1433"
environment:
- "MSSQL_AGENT_ENABLED=true"
- "MSSQL_PID=Standard"
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=P@ssw0rd01"
volumes:
- /dataroot/tools/sqlserver/data-master:/var/opt/mssql
- /dataroot/tools/sqlserver/ReplData:/var/opt/mssql/ReplData
sqlserver-slave:
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: sqlserver-slave
ports:
- "1434:1433"
environment:
- "MSSQL_AGENT_ENABLED=true"
- "MSSQL_PID=Standard"
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=P@ssw0rd02"
volumes:
- /dataroot/tools/sqlserver/data-slave:/var/opt/mssql
- /dataroot/tools/sqlserver/ReplData:/var/opt/mssql/ReplData
- docker-compose up -d 명령을 실행하여 데이터베이스를 시작합니다.
- docker-compose down 명령을 실행하고 데이터베이스를 중지한 다음 모든 데이터를 삭제합니다. 여기서 데이터가 호스트에 매핑되어 있으므로 매핑된 디렉터리는 수동으로 삭제해야 합니다.
SSMS 다운로드
Microsoft 공식 웹사이트로 이동하여 Sqlserver 관리 도구를 다운로드하세요: learn.microsoft.com/en-us/sql/s...
데이터 준비
-- Sqlserver
CREATE DATABASE inventory;
GO
USE inventory;
CREATE TABLE orders (
id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL
);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('21-FEB-2016', 1003, 1, 107);
GO
마스터 리포지토리에서 데이터 릴리스
- 배포를 생성할 때 다음 단계에서 IP 주소가 나타나면 배포 서버를 생성하지 못할 수 있습니다.
- 다음 SQL을 사용하여 배포를 직접 만들 수 있으며, 이는 페이지를 통해 배포를 만드는 것과 동일하지만 SQL이 배포 서버가 IP 주소 대신 서버명을 사용하도록 제어한다는 점이 다릅니다.
use master
DECLARE @Server SYSNAME;
SELECT @Server = @@servername;
exec sp_adddistributor @distributor = @Server, @password = N'P@ssw0rd01'
GO
exec sp_adddistributiondb @database = N'distribution', @data_folder = N'/var/opt/mssql/ReplData', @min_distretention = 0, @max_distretention = 72, @history_retention = 48, @security_mode = 1
GO
use [distribution]
if (not exists (select * from sysobjects where name = 'UIProperties' and type = 'U '))
create table UIProperties(id int)
if (exists (select * from ::fn_listextendedproperty('SnapshotFolder', 'user', 'dbo', 'table', 'UIProperties', null, null)))
EXEC sp_updateextendedproperty N'SnapshotFolder', N'/var/opt/mssql/ReplData', 'user', dbo, 'table', 'UIProperties'
else
EXEC sp_addextendedproperty N'SnapshotFolder', N'/var/opt/mssql/ReplData', 'user', dbo, 'table', 'UIProperties'
GO
DECLARE @Server SYSNAME;
SELECT @Server = @@servername;
exec sp_adddistpublisher @publisher = @Server, @distribution_db = N'distribution', @security_mode = 0, @login = N'sa', @password = N'P@ssw0rd01', @working_directory = N'/var/opt/mssql/ReplData', @trusted = N'false', @thirdparty_flag = 0, @publisher_type = N'MSSQLSERVER'
GO
- 배포 서버가 성공적으로 생성되면 SSMS에서 복사 > 로컬 배포, 새 배포를 클릭합니다:
- 릴리스가 성공했는지 확인하고 메인 라이브러리에서 SSMS 복제를 마우스 오른쪽 버튼으로 클릭하고 복제 모니터 시작을 선택한 다음 에이전트를 클릭하여 데이터 양이 상대적으로 많은 경우 릴리스가 성공했음을 나타내는 100%를 표시하면 일반적으로 여기에서 오랜 시간을 기다립니다.
슬레이브 구독 데이터
- 복사 > 로컬 구독을 마우스 오른쪽 버튼으로 클릭하여 새 구독을 만듭니다:
- 구독 데이터베이스 선택 새로운 데이터베이스 발명
- 여기에 슬레이브 계정과 비밀번호 sa/P@ssw0rd02를 사용합니다.
- 구독이 성공했는지 확인하려면 마스터 라이브러리에서 복제 리스너를 열어 슬레이브 라이브러리 구독이 성공했는지 확인하고, 정상은 다음 그림과 같아야 합니다:
슬레이브에서 SQL을 실행하여 마스터와 슬레이브 데이터가 동일한지 확인합니다:
Clickhouse
SQL Server 마스터 데이터를 Clickhouse에 동기화하기
위에서 SQL Server 마스터-슬레이브 라이브러리 빌드를 완료했으므로 이제 슬레이브 라이브러리의 데이터는 마스터 라이브러리와 동일하므로 슬레이브 라이브러리 데이터를 Clickhouse에 동기화하기만 하면 됩니다.
- 슬레이브에 대해 CDC가 켜져 있는 상태에서 SSMS를 통해 슬레이브에 연결하고 다음 SQL을 실행합니다:
USE inventory;
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO
- 사육사를 시작합니다:
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4
- 카프카를 시작합니다:
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4
- 데베지움을 시작합니다:
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=ck-gid -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka quay.io/debezium/connect:2.4
- SQL Server와 Kafka 연결을 생성합니다:
- 다음 curl 명령을 실행하여 연결을 만듭니다:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","tasks.max":"1","database.hostname":".16","database.port":"1434","database.user":"sa","database.password":"P@ssw0rd02","database.server.id":"184054","database.names":"inventory","topic.prefix":"oa","database.server.name":"","table.include.list":"dbo.orders,dbo.products","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","database.encrypt":false}}'
- 카프카 시각화의 경우, http://서버 ID:8080 에서 SQL Server 데이터가 카프카에 동기화되었는지 확인하세요.
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
- kafka는 주문 테이블에서 4가지 데이터를 볼 수 있었습니다.
- 벡터 설치
# __ __ __
# \ \ / / / /
# \ V / / /
# \_/ \/
#
# V E C T O R
# Configuration
#
# ------------------------------------------------------------------------------
# Website: https://.dev
# Docs: https://.dev/docs
# Chat: https://..dev
# ------------------------------------------------------------------------------
# Change this to use a non-default directory for Vector data storage:
# data_dir: "/var/lib/vector"
# Random Syslog-formatted logs
sources:
dummy_logs:
type: "demo_logs"
format: "syslog"
interval: 1
oa:
type: kafka
bootstrap_servers: .16:9092
group_id: '1'
auto_offset_reset: 'beginning'
topics:
- oa.inventory.dbo.orders
# Parse Syslog logs
# See the Vector Remap Language reference for more info: https://.dev
transforms:
parse_oa_message:
type: remap
inputs:
- oa
source: |
.topic = string!(.topic)
.data = parse_json!(.message)
filter_oa_order:
type: filter
inputs:
- parse_oa_message
condition: '.topic == "oa.inventory.dbo.orders"'
# Print parsed logs to stdout
sinks:
ck_oa_orders:
type: clickhouse
inputs:
- parse_oa_orders_payload
endpoint: "http://localhost:8321"
database: oa_sqlserver
table: orders_changes
auth:
user: default
password: xc8NalcdDXBek
strategy: basic
# Vector's GraphQL API (disabled by default)
# Uncomment to try it out with the `vector top` command or
# in your browser at "http://localhost:8686"
api:
enabled: true
address: ".1:8686"
- 데이터 수신 및 변환을 위한 관련 테이블을 만들려면 클릭하우스, 여기에는 필드 전후의 데이터에 입력되며, 조금 더 최적화할 수 있으며, 벡터 처리 시간은 데이터 아래의 json에 플레이로드만 유지하도록 데이터의 각 항목이 별도의 필드로 정의되기 전과 후가 될 수 있습니다.
DROP TABLE IF EXISTS oa_sqlserver.orders;
CREATE TABLE oa_sqlserver.orders
(
`id` UInt64,
`order_date` Date,
`purchaser` UInt32,
`quantity` UInt32,
`product_id` UInt64,
`version` String,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree
PRIMARY KEY id
ORDER BY id;
DROP TABLE IF EXISTS oa_sqlserver.orders_changes;
CREATE TABLE oa_sqlserver.orders_changes
(
`before` Tuple(id Nullable(UInt64), order_date Nullable(UInt32), product_id Nullable(UInt64), purchaser Nullable(UInt32), quantity Nullable(UInt32)),
`after` Tuple(id Nullable(UInt64), order_date Nullable(UInt32), product_id Nullable(UInt64), purchaser Nullable(UInt32), quantity Nullable(UInt32)),
`source` Tuple(change_lsn Nullable(String), commit_lsn Nullable(String), db Nullable(String), connector Nullable(String), event_serial_no Nullable(UInt32), name Nullable(String), `schema` Nullable(String), `sequence` Nullable(String), snapshot Nullable(String), `table` Nullable(String), ts_ms Nullable(UInt64), version Nullable(String)),
`op` LowCardinality(String),
`ts_ms` UInt64
)
ENGINE = MergeTree
ORDER BY tuple();
DROP VIEW IF EXISTS oa_sqlserver.orders_mv;
CREATE MATERIALIZED VIEW oa_sqlserver.orders_mv TO oa_sqlserver.orders
(
`id` Nullable(UInt64),
`order_date` Nullable(Date),
`purchaser` Nullable(UInt32),
`quantity` Nullable(UInt32),
`product_id` Nullable(UInt64),
`version` String,
`deleted` UInt8
) AS
SELECT
if(op = 'd', before.1, after.1) AS id,
if(op = 'd', toDate(before.2), toDate(after.2)) AS order_date,
if(op = 'd', before.4, after.4) AS purchaser,
if(op = 'd', before.5, after.5) AS quantity,
if(op = 'd', before.3, after.3) AS product_id,
if(op = 'd', ts_ms, ts_ms) AS version,
if(op = 'd', 1, 0) AS deleted
FROM oa_sqlserver.orders_changes
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd');
클릭하우스에 연결하여 SQL을 실행하면 다음 데이터를 볼 수 있습니다:
데이터 검증
SQL Server 마스터에서 다음 SQL을 실행합니다:
INSERT INTO orders(order_date,purchaser,quantity,product_id) VALUES ('16-JAN-2024', 8888, 10, 102);
UPDATE inventory.dbo.orders SET order_date='2024-01-16', purchaser=1001, quantity=6, product_id=102 WHERE id=10001;
DELETE FROM inventory.dbo.orders WHERE id=10004;
각각 데이터를 추가, 수정, 삭제하며, 실행 후 마스터 데이터베이스 데이터는 다음과 같습니다:
Clickhouse의 데이터는 다음과 같습니다:
SQL Server 마스터 데이터 변경 사항을 Clickhouse에 정상적으로 동기화할 수 있음을 시연합니다.




