보아즈 학기 스터디로 Data Pipeline 스터디를 시작하게 되었습니다.
Data engineering zoomcamp 라는 깃허브&유튜브를 중점적으로 진행 예정이예요.
공부한 기록을 업로드하려고 합니다.
https://github.com/DataTalksClub/data-engineering-zoomcamp
GitHub - DataTalksClub/data-engineering-zoomcamp: Data Engineering Zoomcamp is a free nine-week course that covers the fundament
Data Engineering Zoomcamp is a free nine-week course that covers the fundamentals of data engineering. - DataTalksClub/data-engineering-zoomcamp
github.com
물론 시작한지 얼마 되지 않았지만 만족도 200%
이론으로만 배웠던 내용을 실제로 구현하는 커리여서 이해가 확 더 되고 재미도 있습니다.
양이 조금 많을뿐..ㅎㅎ
저는 윈도우 powershell + git bash 로 진행했습니다!
목차
Ingesting NY Taxi Data to Postgres
Connecting pgAdmin and Postgres
챕터1_1. docker, sql
🐳 Docker
Introduction to Docker
- https://www.youtube.com/watch?v=EYNwNlOrpr0&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=5
- 도커 = 독립 컨테이너
- 특정 서비스에 필요한 모든 것을 가질 수 있음
- Data Pipeline
- 파이썬 3.9
- 판다스
- Postgres connection library
- 호스트 컴퓨터에서 여러 데이터베이스 실행 가능
- 간섭 X
- pgAdmind
- 장점
- 재현성 Reproducibility
- docker 이미지를 가져와 다른 환경에서 실행 가능
- 로컬에서 만든 환경을 그래도 Google Cloud (쿠버네틱스) 에서도 사용 가능
- docker 이미지를 가져와 다른 환경에서 실행 가능
- CI/CD
- 클라우드에서 파이프라인으로 사용 가능함
- Spark
- Severless (AWS Lamda, Google functions)
- 재현성 Reproducibility
- 실습
- it : 대화형 모드에서 실행 (대화형 = 터미널)
- 이 때 에러가 날 수 있는데 winpty: windows 환경에서 발생하는 오류
- 격리
- docker run -it ubuntu bash 후 컨테이너 안에서 모든 파일을 삭제한 뒤 다시 컨테이너 실행 시 삭제 전(원래 컨테이너) 상태 유지
- 호스트 머신 영향 X
- entrypoint 설정하여 도커 이미지의 기본 엔트리포인트 덮어쓰기 가능 성성
- pip install pandas
- it : 대화형 모드에서 실행 (대화형 = 터미널)
# bash
docker run -it ubuntu bash
# 윈도우에서 발생하는 오류 winpty
# winpty docker run -it ubuntu bash
# bash
# python 명령을 실행하는 대신, bash 쉘이 실행
docker run -it --entrypoint=bash python:3.9
# winpty docker run -it --entrypoint=bash python:3.9
- 실습
- Dockerfile로 build 하기
# Dockerfile
FROM python:3.9
RUN pip install pandas
ENTRYPOINT [ "bash" ]
# bash
# test:pandas 라는 이미지 이름으로 빌드
# . : 현재 위치
docker build -t test:pandas .
docker run -it test:pandas
- 실습
- 데이터파이프라인 py 파일 설정 및 실행
# Dockerfile
FROM python:3.9
RUN pip install pandas
WORKDIR /app
COPY pipeline.py pipeline.py
ENTRYPOINT [ "bash" ]
# pipeline.py
import pandas as pd
print('job finished successfully')
- 실습
- 입력값 받기
# Dockerfile
FROM python:3.9
RUN pip install pandas
WORKDIR /app
COPY pipeline.py pipeline.py
ENTRYPOINT [ "python", "pipeline.py" ]
# pipeline.py
import sys
import pandas as pd
print(sys.argv)
day = sys.argv[1]
print(f'job finished successfully for day ={day}')
Ingesting NY Taxi Data to Postgres
- https://www.youtube.com/watch?v=2JM-ziJt0WI&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=5
- 실습
- postgreSQL 데이터베이스 설정
- -v PostgreSQL의 데이터 파일이 저장되는 위치, 컨테이너가 삭제되어도 로컬에 저장
docker run -it \
-e POSTGRES_USER="root" \ # 기본 사용자 root 설정
-e POSTGRES_PASSWORD="root" \ # 비밀번호 root 로 설정
-e POSTGRES_DB="ny_taxi" \ # 기본 데이터베이스 ny_taxi 설정
-v /c/Users/82105/zoomcamp/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \ # 포트
postgres:13 # 이미지
- 실습
- postgreSQL 데이터베이스에 접속
- pgcli: postgreSQL 데이터베이스에 연결하는 cli
- password 나오면 root 치면 됨
pip install pgcli # 설치
pgcli -h localhost -p 5432 -u root -d ny_taxi
- 실습
- 데이터 다운, 해제, 데이터베이스에 업로드
# 데이터 다운
curl -L -o yellow_tripdata_2021-01.csv.gz https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
# 라이브러리 설치
pip install jupyter notebook
pip install sqlalchemy
pip install psycopg2-binary
gzip -d yellow_tripdata_2021-01.csv.gz
# 주피터 노트북 환경
import pandas as pd
df_iter=pd.read_csv("yellow_tripdata_2021-01.csv", iterator=True, chunksize=100000)
df = next(df_iter)
df.tpep_pickup_datetime= pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime= pd.to_datetime(df.tpep_dropoff_datetime)
df.head(n=0).to_sql(name="yellow_taxi_data", con=engine, if_exists='replace')
%time df.to_sql(name="yellow_taxi_data", con=engine, if_exists='append')
Connecting pgAdmin and Postgres
- http://youtube.com/watch?v=hCAIVe9N0ow&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=7&feature=youtu.be
- 실습
- pgAdmin : postgreSQL 데이터베이스를 관리할 수 있는 웹 기반 도구
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
dpage/pgadmin4
# docker run -it -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" -e PGADMIN_DEFAULT_PASSWORD="root" -p 8080:80 dpage/pgadmin4
- 실습
- 다른 컨테이너 연결 필요(pgAdmin 컨테이너, postgreSQL 데이터베이스 컨테이너)
- → 네트워크 연결 pg-network
docker network create pg-network
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v /c/Users/82105/zoomcamp/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
--network=pg-network
--name pg-database
postgres:13
#docker run -it -e POSTGRES_USER="root" -e POSTGRES_PASSWORD="root" -e POSTGRES_DB="ny_taxi" -v /c/Users/82105/zoomcamp/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data -p 5432:5432 --network=pg-network --name pg-database postgres:13
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
--network=pg-network \
--name pgadmin \
dpage/pgadmin4
# docker run -it -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" -e PGADMIN_DEFAULT_PASSWORD="root" -p 8080:80 --network=pg-network --name pgadmin dpage/pgadmin4
Putting the ingestion script into Docker
- https://www.youtube.com/watch?v=B1WwATwf-vY&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=9
- 실습
- python 파일로 다운부터 데이터베이스 업로드까지
# ingest_data.py
import argparse
import pandas as pd
from sqlalchemy import create_engine
from time import time
import os
def main(params):
user = params.user
password = params.password
host = params.host
port = params.port
db = params.db
table_name = params.table_name
url = params.url
# the backup files are gzipped, and it's important to keep the correct extension
# for pandas to be able to open the file
if url.endswith('.csv.gz'):
csv_name = 'output.csv.gz'
else:
csv_name = 'output.csv'
os.system(f"wget {url} -O {csv_name}")
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
df.to_sql(name=table_name, con=engine, if_exists='append')
while True:
try:
t_start = time()
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.to_sql(name=table_name, con=engine, if_exists='append')
t_end = time()
print('inserted another chunk, took %.3f second' % (t_end - t_start))
except StopIteration:
print("Finished ingesting data into the postgres database")
break
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')
# user, password, host, port, database name, table name
# url of the csv
parser.add_argument('--user', required=True, help='user name for postgres')
parser.add_argument('--password', required=True, help='password for postgres')
parser.add_argument('--host', required=True, help='host for postgres')
parser.add_argument('--port', required=True, help='port for postgres')
parser.add_argument('--db', required=True, help='database name for postgres')
parser.add_argument('--table_name', required=True, help='name of the table where we will write the results to')
parser.add_argument('--url', required=True, help='url of the csv file')
args = parser.parse_args()
main(args)
$env:URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
python ingest_data.py --user=root --password=root --host=localhost --port=5432 --db=ny_taxi --table_name=yellow_taxi_trips --url=$env:URL
- 실습: dockerizing
FROM python:3.9
RUN apt-get install wget
RUN pip install pandas sqlalchemy psycopg2-binary
WORKDIR /app
COPY ingest_data.py pipeline.py
ENTRYPOINT [ "python", "ingest_data.py" ]
docker build -t taxi_ingest:v001 .
docker run -it \
--network=pg-network \
taxi_ingest:v001 \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=$env:URL
# docker run -it --network=pg-network taxi_ingest:v001 --user=root --password=root --host=pg-database --port=5432 --db=ny_taxi --table_name=yellow_taxi_trips --url=$env:URL
Running Postgres and pgAdmin with Docker-Compose
- https://www.youtube.com/watch?v=hKI6PkPhpa0&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=11애
- docker compose
- 여러 컨테이너의 구성을 하나의 파일에 넣을 수 있게 해줌
- 실습
- 지금까지 만든 두 개의 컨테이너 docker-compose.yaml 파일로 생성
- 궁금증: network 설정 안해도 되나?
- gpt 왈)
docker-compose.yml
파일 내에서 네트워크가 자동으로 설정pgdatabase
와pgadmin
서비스가 동일한 네트워크에 포함- Compose는 내부적으로 같은 네트워크를 사용하여 두 컨테이너 간의 통신을 가능
- gpt 왈)
# docker-compose.yaml
services:
pgdatabase:
image: postgres:13
environment:
- POSTGRES_USER=root
- POSTGRES_PASSWORD=root
- POSTGRES_DB=ny_taxi
volumes:
- "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
ports:
- "5432:5432"
pgadmin:
image: dpage/pgadmin4
environment:
- PGADMIN_DEFAULT_EMAIL=admin@admin.com
- PGADMIN_DEFAULT_PASSWORD=root
ports:
- "8080:80"
docker-compose up
docker-compose down
# 분리모드
# docker-compose up -d
SQL refresher
- SQL 코드 연습은 패스하겠습니당
'Boaz > Data Pipeline' 카테고리의 다른 글
[Data Engineering Zoomcamp #2] GCP, Terraform (0) | 2025.03.31 |
---|