Python으로 Avro 파일 생성

API나 DB에서 데이터를 가져와 Nifi를 이용해 DB와 Hadoop에 적재할 때, python을 이용하여 Avro 파일을 생성하여 Nifi로 넘겨주면 NifI에서의 프로세스도 줄어들고 가독성, 관리적인 측면도 좋아질듯 하여 만들게 되었다.

Avro란?

Avro는 특정 언어에 종속되지 않은, 언어 중립적 데이터 직렬화 시스템이다. 특정 스키마를 사전에 알지 못하더라도 해당 스키마에 부합되는 데이터를 읽고 쓸 수 있다.

Avro의 스키마는 JSON으로 작성하며, 데이터는 바이너리 포맷으로 인코딩된다.

Avro의 장단점

  • 장점
    • 데이터의 타입을 알 수 있다.
    • 스키마에 설명을 포함하여 구조 이해에 도움을 줄 수 있다.
    • 데이터를 여러 언어로 액세스 가능하다.
    • 향후 스키마 변경에 유연하게 대응 가능하다.
    • Hadoop 계열과 궁합이 좋다.
  • 단점
    • 바이너리 형태로 직렬화(serialize) 되므로 JSON을 직접 쓸 때와 비교했을 때 대비 데이터를 쉽게 들여다 보기 어렵다.

Avro Data Type (Record)

Avro 파일을 생성하기 위해선 JSON형태의 스키마를 미리 만들어 주어야 한다. 여러 종류의 Avro Data Type이 있지만, 그중 Complex Data Type의 Record 타입을 이용할 것이다.

기본 형태와 설명은 아래와 같다.

  • Record
    • name : 스키마 이름
    • namespace : 패키지
    • doc : 스키마 설명/문서
    • aliases : 별칭
    • fields
      • name : 필드명
      • doc : 필드 설명
      • type : 필드의 데이터형 (data type)
      • default : 기본값

Avro 파일 생성

우선 python으로 Avro 파일을 생성하기 위해 필요한 라이브러리를 아래 명령어로 받아준다.

$ pip3 install fastavro

기본 avro 라이브러리도 있지만 fastavro가 C언어 기반의 라이브러리라 속도가 더 빠르다고 한다.


사용할 DB 테이블 스키마는 아래와 같다.

CREATE TABLE public."member" (
	"name" text NULL,
	age int4 NULL,
	score int4 NULL,
	reg_dttm text NULL
);


이제 Avro 파일 생성을 위한 JSON 형태의 스키마를 만들어 준다.

"avro_schema": {
    "name": "member",
    "type": "record",
    "fields": [
        {"name": "name", "type": ["string", "null"], "default": "null"},
        {"name": "age", "type": ["int", "null"], "default": "null"},
        {"name": "score", "type": ["int", "null"], "default": "null"},
        {"name": "dt", "type": ["string", "null"], "default": "null"}
    ]
}


가져온 데이터를 Avro 파일로 생성하는 부분의 코드는 아래와 같다.

def make_avro(data, schema, avro_path):
    parsed_schema = fastavro.parse_schema(schema)

    with open(avro_path, 'wb') as out:
        fastavro.writer(out, parsed_schema, data)
    print("Avro Path : {}".format(avro_path))

파라미터로는 넣을 데이터(dict), 위에서 작성한 JSON 형식의 스키마, 생성할 Avro 파일 이름이다.

여기서 중요한 점은 dictionary 타입의 넣을 데이터 형식이다. 데이터의 형식은 아래와 같다.

[
    {'col1': value1, 'col2': value2, 'col3': value3, 'col4': value3},
    {'col1': value1, 'col2': value2, 'col3': value3, 'col4': value3},
    {'col1': value1, 'col2': value2, 'col3': value3, 'col4': value3},
    ....
]

이와 같은 형식으로 넣어주어야 fastavro.writer에서 데이터와 스키마를 읽어 Avro 파일을 생성할 수 있다.


전체 수집 코드는 아래와 같다.

import fastavro
import jaydebeapi
import sys

try:
    con = jaydebeapi.connect('org.postgresql.Driver', 'jdbc:postgresql://127.0.0.1:5432/postgres', ['postgres', 'rootpass'], 'C:/Users/ktkim1201/Desktop/safety/lib/postgresql-42.2.20.jar')
    cursor = con.cursor()
except jaydebeapi.DatabaseError as e:
    print("DB connection Error")
    print(e)

def main():
    avro_schema = {
        "name": "member",
        "type": "record",
        "fields": [
            {"name": "name", "type": ["string", "null"], "default": "null"},
            {"name": "age", "type": ["int", "null"], "default": "null"},
            {"name": "score", "type": ["int", "null"], "default": "null"},
            {"name": "dt", "type": ["string", "null"], "default": "null"}
        ]
    }

    query = "SELECT * FROM MEMBER"
    data = select_query(query)

    # avro 파일 생성
    avro_path = "member.avro"
    make_avro(data, avro_schema, avro_path)

    # avro 파일 읽기
    read_avro(avro_path)


# 테이블 조회 후 데이터 반환
def select_query(query):
    try:
        print(query)
        cursor.execute(query)
        column_names = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()
        data_dict = list(map(lambda x: dict(zip(column_names, x)), data))
        return data_dict
    except Exception as e:
        print(e)
        sys.exit(1)


# avro 파일 생성
def make_avro(data, schema, avro_path):
    parsed_schema = fastavro.parse_schema(schema)

    with open(avro_path, 'wb') as out:
        fastavro.writer(out, parsed_schema, data)
    print("Avro Path : {}".format(avro_path))


# avro 파일 읽기
def read_avro(avro):
    with open(avro, "rb") as fo:
        read_avro = fastavro.reader(fo)
        for row in read_avro:
            print(row)


if __name__ == "__main__":
    main()

schema의 type, avro type에 대한 추가적인 정보는 Apache Avro 공식 문서를 참고하자.

Leave a comment