Jakiś czas temu, otrzymałem zadanie dodania do istniejącej aplikacji napisanej z użyciem Django, wyszukiwarki danych korzystającej z wielu róznych źródeł plikowych. Samo zadanie okazało się dość ciekawe, gdyż docelowo miały się znaleźć na nim dziesiątki plików tekstowych, o rozmiarach przekraczających nawet 10-15GB, a sam serwer miał mieć tylko 4GB ramu. Dodatkowym utrudnieniem miało być utworzenie serwera w zupełnie innej lokalizacji, będącego za nat-em bez publicznego adresu ip.
Poniżej zaprezentuję Ci drogi czytelniku, kroki które pozwolą nam razem stworzyć powyższą aplikację.
1. Przygotowanie środowiska
Na samym początku przygotowujemy zależności do naszej aplikacji Flaskowej, która będzie naszym api do workera Celery, definiując plik requirements.txt z wymaganymi pakietami:
celery==5.3.6
Flask==3.0.0
importlib-metadata==7.0.0
celery[redis]
python-dotenv
które następnie instalujemy przez
pip install -r requirements.txt
2. Integracja Celery z Flaskiem
Wzorując się na oficjalnej dokumentacji, tworzymy:
- funkcję celery_init_app która ma za zadanie utworzenia i zwrócenia obiektu celery z skonfigurowanym flaskiem, dzięki czemu podklasa task, ma dostęp do usług takich jak np połączenie się z bazą danych,
- funkcję create_app, która podczas użycia wzorca fabryki, pozwoli na korzystanie z aplikacji celery, otrzymanej z fabryki tworzącej nam serwer oparty na Flask,
config.py
from celery import Celery, Task
from flask import Flask
from dotenv import load_dotenv
from os import getenv
load_dotenv()
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url=getenv("BROKER_URL"),
result_backend=getenv("RESULT_BACKEND"),
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
3. Serwer cache
W celu integracji Celery, wymagane jest zastosowanie serwera cache, będącego miejscem do obsługi i zapisu danych z naszych tasków.
W studium przypadku wykorzystam dystrybucję CentOS, uruchomioną za pośrednictwem Windowsowego WSL. Na niej zainstalujemy oraz uruchomimy serwer redis po przez:
yum install redis
systemctl enable redis
systemctl start redis
Następnie w naszym projekcie uworzymy plik z danymi połączeniowymi do serwera Redis.
.env
BROKER_URL = "redis://127.0.0.1:6379"
RESULT_BACKEND = "redis://127.0.0.1:6379"
4. Tworzenie widoków z użyciem Flask
W naszej aplikacji utworzymy dwa wywołania dla api opakowane w blueprinty:
- pierwsze, pozwalające nam na dodanie zadania wyszukującego nasz zadany tekst,
- drugie, pozwalające na otrzymanie wyników naszego zapytania (zadanie zwróci wynik po zakończeniu, a w trakcie jego aktualny stan),
from flask import request, Blueprint
from celery.result import AsyncResult
from typing import Dict
from src import tasks
bp = Blueprint("tasks", __name__)
@bp.post("/find")
def find() -> Dict[str, object]:
data = request.form.get("data", type=str)
result = tasks.find.delay(data)
return {"result_id": result.id}
@bp.post("/result/<id>")
def result(id: str) -> Dict[str, object]:
result = AsyncResult(id)
ready = result.ready()
return {
"ready": ready,
"successful": result.successful() if ready else None,
"value": result.get() if ready else result.result,
}
5. Właściwa część, czyli jak wyszukać dane w wielu plikach
Utworzymy tu współdzielone zadanie które po dodaniu do kolejki, cierpliwie będzie przeszukiwało wszystkie pliki znadujące się w katalogu DATA, folderu aplikacji.
from json import dumps
from os import getcwd, path, listdir
from celery import Task
from celery import shared_task
from celery.utils.log import get_task_logger
REPLACE_CHARS = {
")": "",
"(": "",
"+": "",
"\\": "",
"/": "",
"|": ";",
":": ";",
}
logger = get_task_logger(__name__)
@shared_task(bind=True, ignore_result=False)
def find(self: Task, searched_text: str) -> str:
"""Function searches for specific text in files within a directory and returns the results
in JSON format.
:param self: Task
:param searched_text: str
:return: results string with json data or error message
:rtype: str
"""
logger.info(f"Searching for {searched_text}")
dir_path = path.join(getcwd(), "data")
if not path.exists(dir_path):
return f"[Error] Directory {dir_path} does not exist."
list_dir = listdir(dir_path)
if not list_dir:
return f"[Error] Directory {dir_path} is empty."
searched_strings = list(
map(
lambda x: "".join([REPLACE_CHARS.get(char, char) for char in x]).lower(),
searched_text.split(" "),
)
)
result = []
for index, file in enumerate(list_dir):
cur_path = path.join(dir_path, file)
self.update_state(
state="PROGRESS",
meta={"current": index + 1, "total": len(list_dir)},
)
with open(cur_path, "r", encoding="utf-8") as curr:
for l_no, line in enumerate(curr):
try:
actual = "".join(
[REPLACE_CHARS.get(char, char) for char in line]
).lower()
except UnicodeDecodeError:
result.append(
{
"file": path.basename(cur_path),
"line": "-",
"data": f"[Error] Problem reading {cur_path}",
}
)
except Exception as e:
logger.error("[Error] Can't process file, retry in 5 seconds")
raise self.retry(exc=e, countdown=5)
for searched_string in searched_strings:
if searched_string in actual:
result.append(
{
"file": path.basename(cur_path),
"line": l_no,
"data": line,
}
)
json_dump = dumps(result, indent="\t")
logger.info(f"Stopping searching for {searched_text}")
return json_dump
- Z uwagi na ograniczenia w pamięci ram, wymagane było zastosowanie podejścia wykorzystującego iterator do przejścia po kolejnych liniach pliku. Sam projekt miał ignorować wszelkie formatowania, aby np wpisując numer telefonu, można było by znaleźć go zapisanego w róznych formatach stąd zastosowanie mechanizmu eliminującego REPLACE_CHARS.
- Zalecanym jest stosowanie sprawdzania plików pod względem rozszerzenia i typu, w tym przypadku chcemy operować na wszystkich plikach jakie otrzymamy.
6. Tworzenie instancji aplikacji
W tej częsci, tworzymy naszą aplikację flaskową, rejestrujemy w niej wywołania api, a także rejestrujemy w niej celery.
app.py
from config import create_app
from src.views import bp
flask_app = create_app()
flask_app.register_blueprint(bp)
celery_app = flask_app.extensions["celery"]
7. Dane testowe
W katalogu naszej aplikacji, tworzymy folder data do którego wrzucamy nasze pliki np z logami w których chcemy wyszukać nasze dane.
8. Uruchomienie aplikacji
Celem przetestowania uruchamiamy w dwóch terminalach wywołania:
- silnika celery, pozwalającego na kolejkowanie naszych zadań
- serwera flask, dostarczającego nam api
celery -A project worker --loglevel=info
flask --app make_celery.py run --host=127.0.0.1
* W przypadku problemów możemy uruchomić celery w trybie wykonywania pojedynczo tasków:
celery -A make_celery worker --loglevel INFO --concurrency 1 -P solo
Aby przetestować naszą aplikację, polecam aplikację postman. Na ten moment wykonamy testy ręcznie wywołując w terminalu:
curl --location 'http://127.0.0.1:5000/find' --form 'data="Test-Serv"'
Po czym otrzymujemy guid naszego wywołania.
{
"result_id": "9cd376d8-55ae-4da8-b7c5-6a5a43a1729a"
}
Teraz możemy wywołać zapytanie do naszego serwera, celem otrzymania statusu naszego wyszukiwania, bądź jego wyników.
curl --location --request POST 'http://127.0.0.1:5000/result/03ca92ef-3259-46d6-8be3-712c500e0aec' --header 'Content-Type: application/json'
W aktualnym przypadku otrzymaliśmy w wyniku nazwę pliku oraz dokłądną linię w której możemy odnaleźć nasze dane.
{
"ready": true,
"successful": true,
"value": "[\n\t{\n\t\t\"file\": \"test_db.txt\",\n\t\t\"line\": 32,\n\t\t\"data\": \"Name=dev_test_serv;level=342;error=No space left\\n\"\n\t}\n]"
}
W drugiej części zajmiemy się obsługą naszego api, przez drugi serwer oparty na Frameworku Django.
Komentarze