티스토리 뷰

반응형

메시지 보내는걸 일반 소켓 대신 메시지 브로커로 깔끔하게 쓰려고 찾다 보니 redis, kafka, rabbitMQ 정도 나옴

 

kafka 쓰기는 너무 큰 것 같고, redis보다는 rabbitMQ가 나아 보여서 rabbitMQ로

- 근데 일단 내가 전달하려는 json 메시지와 메시지 브로커간의 메시지의 의미가 다르긴 한데 그냥 쓰려면 쓸 수는 있을 듯

 

pika로 rabbitMQ에 연결해서 메시지 넣고 사용하면서 좀 더 찾아보니 일반적으로 클라이언트는 rabbitMQ와 같은 브로커에 직접 연결하지 않는다고 함

 

api 등을 통해 처리를 한다고 해서 fastapi 써야겠다~ 생각하고 찾다 보니 또 fastapi와 같은 서버에서 직접 메시지를 넣거나 꺼내지 않고 celery와 같은 브로커를 써서 작업을 처리한다고 함...

 

그렇구나~ 하고 celery 추가하고 fastapi에서 rabbitMQ로, celery task에서 rabbitMQ task queue를 보고 task에서 도착지를 확인해서 도착지 queue를 만들고 해당 queue에 json 넣고...

 

celery task 모니터링하기 편하려면 flower 설치해서 확인 가능하다고 나와서 flower는 설치했는데 task queue에 추가가 돼도 아예 반영이 안 돼서 더 찾아봐야 할 듯

- worker랑 flower랑 두 개를 실행 해야하는데 flower만 켜고 왜 안되지 이러고 있었음

- celery -A celery_app worker --loglevel=debug

- celery -A celery_app flower

- 근데 브로커 queue 정보가 안 나옴... -> 안나와도 이상한건 아닌듯

근데 또 fastapi를 직접 외부로 노출시키지 않고 proxy로 nginx를 쓴다고 함... 

 

nginx 설치하고 포트 80에 fastapi로 proxy까지 추가함

 

rabbitMQ 서비스가 꺼지면 다 사라진다고 해서 json 메시지를 db에 저장하려고 mariadb 설치하고 celery task에서 저장시키도록 함

 

이쯤 되니까 이거 누가 관리하냐라는 생각이 다가오긴 하지만 그래도 일단 하는 데까지는 해보려고 시도 중 

 

다시 보니까 rabbitMQ가 fastapi 오른쪽에 오는게 맞음

mac + homebrew 기준

 

nginx proxy 설정

- conf 위치: cd /opt/homebrew/etc/nginx/nginx.conf

(base)  j0n9m1n1  /opt/homebrew/var/log/nginx   stable  cat /opt/homebrew/etc/nginx/nginx.conf

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;
    upstream fastapi_app{
        server 127.0.0.1:8000;
    }
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
	    proxy_pass http://fastapi_app;
	    proxy_set_header Host $host;
	    proxy_set_header X-Real-IP $remote_addr;
	    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            root   html;
            index  index.html index.htm;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
    include servers/*;
}

 

locust로 테스트를 많이들 하는 것  같아 돌려봤으나 봐도 사실 뭔지 모름

평균 초당 60개 정도 처리된다?

 

뭔가 이상함, dest queue를 만들고 거기 넣어줘야 하는데 1만 개 중에 2천 개만 들어감...

근데 db에는 8천 개가 들어가 있고 2천 개는 또 없음

나중에 한 번 다시 봐야 할 듯

 

얼추 원하는 대로 흘러가면 docker까지

 

fastapi에서 celery task를 호출하면 rabbitMQ에 task queue가 생기고 해당 queue에 작업이 쌓이는 것 같은데 task에서 작업을 문제없이 처리해도 해당 task queue의 메시지가 줄어들지를 않음...

- task decorator? 변경하니 되긴 되는 것 같음 

@celery_app.task(ignore_result=True)

- 근데 option이 ignore_result라서 뭔가 쓰기 찝찝함

 

ignore_result 없애기 위해서 찾아보니 result_backend를 지정해주면 된다고 하여

결과를 maria에 넣어주려고 했으나 mysql 라이브러리 충돌에 스트레스 받아서 sqlite에 저장함

task queue consume도 잘 돼서 일단 문제 없는 듯

def make_celery():
    celery = Celery(
        "worker",
        # result_backend='db+mysql://celery_user:!Q@W3e4r@localhost/celery_results',
        backend="rpc://",
        result_backend = 'db+sqlite:///results.sqlite3',
        broker="amqp://guest:guest@localhost//"
    )

    return celery

celery_app = make_celery()

 

 

댓글

티스토리 방명록

최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday