İçeriğe geç

Celery nedir? Asenkron Görev Kuyruklarına Giriş

Merhabalar.

Pek çok zaman, bir web uygulamasının yaptığımız istek üzerine bize, çok uzun zaman cevap veremediği, uzun yükleme sürelerini beklediğimiz olmuştur. Kimi zaman ise hiç bir cevap alamadan uygulamanın önünde koşan web sunucusunun, hata sayfası ile karşılaştığımız da olmuştur.

HTTP bildiğiniz üzere, istek-cevap döngüsüne dayanan bir protokoldür. Web uygulamaları geliştirme süreçlerinde de , ideal olan, bu istek-cevap döngüsünü olabildiğince kısa tutmak, kullanıcıya mümkün olan en kısa şekilde anlamlı bir cevap dönmektir.

Örnek bir Django viewi üzerinden problemimizi konuşmaya devam edelim.

Bu view isteği yapan kullanıcının, finansal analizini yapmakla yükümlü olsun.

@login_required
def financial_analysis_view(request):
    user = request.user
    financial_analysis = analyize_financial_data(user=user)
    send_financial_analyis_email(financial_analysis=financial_analysis, user=user)
    return JsonResponse({"message": "Your financial analyis has been sent your email."})

Satır satır gidecek ve işlemlerin maliyeti üzerine konuşacak olursak;

  • Kullanıcının, finansal analizini yapan yardımcı fonksiyonun 3 ila 5 dakika arasında bir sonuç üretmesi ile ortalama 4 dakika.
  • Oluşturulan analizin, çalışılan mail altyapı sağlayıcısına göre gönderilmesi ise 3-5 saniye arasında zaman alsın.

Ortalama 4 dakikalık bu işlem süresince kullanıcı işlemin bitmesini bekleyecek, (muhtemelen defalarca yenileme isteği atacak) veya web sunucumuz uygulamadan ümidi kesecek ve 504 dönecek. 🙂

Hem sunucu üzerinde gereksiz trafik hem de kötü bir kullanıcı deneyimi oluşturmuş olacağız.

Peki bu tarz yüksek işlem süresi gerektiren işlemleri istek-cevap döngüsünden ayrıştırarak, ayrık işlesek ve kullanıcıya, cevap olarak dolaylı aralıklarla mail adresini kontrol etmesini söyleryerek, ideal istek-cevap döngüsüne yaklaşsaydık?

Kullanıcı deneyimini arttırmış olacağımız ve uygulama üstünde gereksiz trafik yaratmayacağımız aşikar. Fakat nasıl nasıl yapabiliriz?

Celery sahneye lütfen!

Celery nedir?

Celery, dağıtık mesaj geçişine dayanan açık kaynaklı, bir asenkron görev kuyruğu veya iş kuyruğudur. Genellikle gerçek zamanlı görev işleme üzerine yoğunlaşsa ve kullanılsa dahi zamanlanmış görevleri de yönetebilir.

Celery ile doğrudan istek-cevap döngüsünün bir parçası olmayan zaman maliyeti olan işlemleri taşere edebilir, ve bir veya birden fazla workera atayabiliriz. Workerlar ayrık bir şekilde, eş zamanlı işlemleri gerçekleştirecektir.

Mimarisi, dağıtık mesaj geçişinin bir uygulanması olarak 3 ana bileşene dayanır. Mesaj gönderici(producer ya da application), mesajları çift taraflı olarak ileten ve saklayan kuyruk(broker veya queue) ve alınan mesajlar ışığında belirli bir takım işlemleri gerçekleştiren tüketiciler(consumer ya da worker).

Şu an için worker, producer veya broker gibi kavramlar pek bir şey ifade etmiyor olabilir. 🙂 Celery ile yapabileceklerimizden bahsettikten sonra, Celery mimarisi konusunda bu kavramları inceleyeceğiz.

Gerçek hayat senaryoları üzerinden yapabileceklerimizden örnekler verecek olursak;

  • Kullanıcılara mail göndermek.
  • Bir görseli işlemek.
  • Kullanıcılara push notification göndermek.
  • Trafiğin az olduğu saatlerde, uygulama üzerindeki kritik dataların backup’ının alınması.
  • Birbirine bağlı fakat, 2 ayrık uygulama arasında periyodik iletişim ve işlemlerin sağlanması vs.

Yukarıda örnekler fazlasıyla arttırılabilir fakat yazının başından beri, birlikte kullanım alanları konusunda bir resim çizebildiğimizi düşünüyorum. Kısacası ağır işlem gücü gerektiren, zaman maliyeti olan ve doğrudan istek-cevap döngüsünün bir parçası olmayan işlemler.

Celery Mimarisi ve Çalışma Prensibi.

Celery grafikize edilmiş şekilde aşağıdaki gibi bir mimariye sahiptir.

3 ana bileşenden oluşan Celery’nin, merkezindeki fikir işlenmek üzere taskler oluşturmak ve takip etmektir.

Bu taskler tanımı itibarıyla Celery’nin tanıyabilmesi ve kayıt altında tutulabilmesi bakımından bir decorator ile çevrelenir. Fonksiyon gövdesi ise normal bir Python fonksiyonundan farklı olmayarak argümanları kabul eder ve yapması gereken işlemleri gerçekleştirir. Tanımlanan task fonksiyonları Celery nesnesine bind edilir.

Bir celery taskinin çalışabilmesi için 2 gerekli bilgi bulunur.

  1. Task ismi.
  2. Task argümanları.

Application, (bir web uygulaması ya da Python scripti olabilir) çalıştıracak olan task ismini ve gerekli argümanları sağlayarak, bir Celery taskini ateşler ve yukarıdaki mimaride grafikize edilmiş döngüyü başlatır.

Application tarafından çalıştırılması istenen task ismiyle ve serialize edilen argümanları ile birlikte message broker üzerine kayıt edilir. Finans analizi örneğimizde olduğu üzere, uygulama artık yapılacak olan ağır işlemleri(finansal analizi) bir celery taskine taşere etmiş ve mail adresinizi kontrol ediniz cevabını sistem üzerinde bir yük oluşturmadan kullanıcıya hızlıca dönmüş olacaktır.

Message broker üzerine konuşacak olursak;

Broker application ve worker arasında çift taraflı olarak çalışır.

İlk görevi, uygulama tarafından çalıştırılması istenen taskleri kayıt etmektir. Gelen her yeni task, broker üzerindeki kuyruğa eklenir. Bu kuyruk yeni taskler geldikçe büyüyecek ve taskleri depolayacaktır. Her yeni kayıt edilen task için, uygulama katmanına, taskin daha sonra takibinin yapılabilmesi için benzersiz bir task numarası döner.

Daha sonraki göreviyse, işletilecek taski worker(tüketicilere) aktarmaktır ve workerlardan gelen task durumlarını takip etmektir. (Bu task koşturuldu, şu kadar saniye sürdü ve şu sonucu döndürdü vs)

Buraya kadar ki olan kısım, bir Celery taski çalışıtırılmadan önceki işlemlerden ibaret fakat artık asıl işi yapan workerlara geçebiliriz.

Celery workerler’ı ayrık processlerdir ve ayağa kaldırıldıkları andan itibaren brokerı yani kuyruğu sorgulamaya başlarlar. Kuyruğa herhangi bir iş kayıt edildiği anda, müsaitlik durumu olan worker taski alacak ve geçilen argümanlarla birlikte çalıştırmaya koyulacaktır. Birden fazla worker, birden fazla taski eş zamanlı olarak işleyebilir ve sonuç üretebilir.

Eğer task ateşlendiği anda herhangi bir worker müsait durumda değilse task kuyrukta sırası korunarak kalacak, ilk müsait olan worker tarafından üstlenilecektir.

Bir worker ayakta olduğu sürece, tabiri caizse çalışkan bir arı edasıyla sürekli aynı soruyu sorar;

Arı Filmi 2007

 

“Bana verebileceğin bir iş var mı? Ben boşa çıktım. Bana verebileceğin bir iş var mı?”

 

 

 

 

Koşturulan task tamamlandıktan sonra, worker bir takım meta bilgileri broker üzerine gönderir. İşin tamamlama süresi, task runtime sırasında oluşan beklendmedik bir hata vs. Bu meta bilgiler loglanarak, daha sonrasında sistemde yolunda gitmeyen ve taskler üstünde koşan kısımlar hakkında know-how oluşturur.

Taskin döndürdüğü değerleri, eğer bir result backend kayıt edilmemiş ise pas geçer.

Taskler çoğu zaman void type fonksiyonlar olsa da bazı durumlarda döndürülen değerin kayıt altına alınması gerekebilir. Bu işlevi de broker üzerine yükleyebileceğiniz gibi, ayrık bir veritabanı da kullanabilirsiniz.

Örneğin, result backend olarak, 3. parti bir Django uygulaması ile task sonuçlarını modele kaydetmek yaygın bir davranıştır.

Genel olarak çalışma mimarisi ve prensipleri üzerine bir tablo çizdik.


Hızlı bir demo.

Şimdi bir task nasıl oluşturulur, koşturulur gibi konulara göz atabiliriz.

Öncelikle Celery’nin iletişim sağlayabilmesi adına bir brokera ihtiyacımız var. Celery pek çok broker ile çalışabilir fakat bu örnekte Redis üzerinden ilerleyeceğiz.

Kullanılabilecek diğer opsiyonlar ile alakalı son kısımda konuşuyor olacağız.

Kurulum için;

$ docker container run --rm -p 7055:6379 -d  --name celery_demo_broker redis:alpine

7055 üzerinden istek kabul edecek bir Redis ayağa kaldırdık.

Daha sonra Celery ve Redis ile konuşabilmesi için gerekli olan bağımlılığını tek seferde kuralım.

$ pip install "celery[redis]"
# main.py
from celery import Celery
import time
app = Celery(broker="redis://localhost:7055/0")
@app.task()
def friendly_task():
    time.sleep(3)
    return "Hello friend!"

Olabildiğince basit setup oluşturduk. Herhangi bir işlem yapmayan biraz uyuyan bir taskimiz var 🙂

Başta da bahsettiğimiz üzere, bir taskin Python fonksiyonundan tek farkı bir Celery taski olarak işaretlenmiş olması ve çeşitli yetenekler kazanmış olması.

Worker ayağa kaldıralım.

$ celery --app your_app worker --loglevel info

Tebrikler! taskleri yürütmesi için başarıyla bir worker ayağa kaldırdık.

Bir taskin yürütülmesi.

Bir celery taskinin kuyruğa yazılarak işletilebilmesi için normal fonksiyon cağrımından farklı olarak, delay keywordu kullanılması gerekmektedir. Örneğin;


friendly_task.delay()
your_heayv_task.delay(arg1, arg2, arg3)

Yürütülen tasklerin worker üzerinde görüntülenmesi;

Harika!

Yazının başında belirttiğimiz Django viewındaki, problemi giderme adına uygulamayı Celery ile çalışabilir hale getirdikten sonra, bir task yazar ve ilgili viewda ateşleyerek ayrık olarak işlenmesi için kuyruğa teslim edip, mail kontrol cevabını anında dönebiliriz.

Örn;


@login_required
def analyize_user(request):
	user = request.user
	analyize_user_data_and_send_email.delay(user=user)
	return JsonResponse({
			"message": "Your analyis have been sent your email."
		})

Daha sonrasında kullanıcı, mailini huzurla beklemeye başlayabilir.

Celery task argümanlarını serialize ederken JSON formatında bu işlemi gerçekleştirir. User nesnesi doğrudan serialize edilebilir değildir. Serialize edilebilir olsa dahi, user task çalışana kadar güncellenmiş olabilir ve state kayıpları yaşayabilirsiniz.  Önerilen kullanım, pk paslamak ve task başlangıcında nesneyi veritabanından almaktır. Bu bir örnek yazısı olduğundan bu caseleri atlıyoruz 🙂


Buraya kadar olan kısım ile Celery kullanımı, dağıtık mesaj geçisi ve asenkron işlem kuyruklarını hakkında bir fikir oluşturduk.

Fakat daha henüz, periyodik tasklerin işletilmesini ve beat’ten bahsetmedik.

Periyodik taskler? Celery Beat nedir?

Yazının başında Celery ile periyodik olarak bir takım işlemler yapılabileceğinden bahsetmiştik. Bu periyodik işlemler backup alınması, belirli raporların oluşturması vs olabilir.

Celery beat ayrık çalışan bir zamanlayıcıdır. Ayağa kaldırıldığı andan itibaren, kendine tanımlı olan konfigürasyona göre (genellikle beat_schedule ya da sql üzerinde barındırılır),  taskleri kuyruğa ekler ve müsait bir worker tarafından yürütülmesi sağlar.

Burada dikkat edilmesi gereken kısım, merkezi bir yaklaşım izleyerek sistemde bir beat processi çalıştırmaktır. Birden fazla beat sistemde aynı konfigürasyon ile bulunursa tasklerin yinelenmesine yol açacaktır.

Konfigürasyon ve zamanlayıcılar.

Örnek bir konfigürasyona göz atacak olursak;

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

task bir isim ve yürütülecek olan path ile kayıt edildikten sonra, timedelta ile zamanlanmıştır. Her 30 saniyede bir, 16 + 16 işlemini yapmak için beat tarafından kuyruğa eklenecektir. Buradaki ilk başlangıc zamanı tahmin edeceğimiz üzere, beat processinin ayağa kalkmayı başarabildikten sonra ki 30.saniyedir. 🙂

beat varsayılan olarak UTC timezone kullanır.

Bunun yanında crontab ve solar sistem zamanlayıcıları da kullanılabilir.

crontabler ile daha spesifik zaman dilimlerinde çalışmak üzere taskleri zamanlayabilirsiniz.

from celery.schedules import crontab
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Yukarıdaki örnekte olduğu gibi her pazartesi sabahı 7.30 da 16 + 16 işleminin yapılabilmesi için bir zamanlayıcı ayarlayabilirsiniz.

Başka bir crontab örneği;

crontab(minute=0, hour='*/3,8-17')

Sadece ofis saatleri arasında (8.00 – 17.00) , 3 e bölünebilen saatlerde çalışması için zamanlanacak bir task.

Son olarak isterseniz solar zaman kullanarak zamanlayıcılar da ayarlayabilirsiniz. Örneğin bir şehrin gün doğumu ya da batışında, ya da güneşin en tepede olduğu saat 🙂

Muğla merkezli bir şirketiniz varsa, koordinatları girerek tüm çalışanları, Muğladaki her gün doğumunda selamlayabilirsiniz 🙂

from celery.schedules import solar
app.conf.beat_schedule = {
    'greet-every-sunrise-on-mugla': {
        'task': 'tasks.good_morning_mugla',
        'schedule': solar('sunrise', '37.215118', '28.362730'),
    },
}

Buraya kadar beat ve konfigürasyonlara göz attık. Fakat beat nasıl ayağa kaldırılır?

Bir beat  ayağa kaldırmak için aşağıda ki komut şeması kullanılır.

$ celery --app your_app_name beat 

beat, ayağa kalktığı andan itibaren, konfigürasyona göre taskleri yürütülmesi için kuyruğa eklemeye başlayacaktır. Eklenen taskler müsait worker tarafından işletilecektir.Sanırım celery ve ana parçaları hakkında artık fazlasıyla bilgi sahibiyiz.

Peki neden Celery?

Yazılım dünyasının, belki de en sevdiğim yanlarından biri bir işi yapmanın birden fazla yolunun her zaman olması ve avantaj dezavantajlara göre karar verebilmek.

Celery, asenkron görev kuyrukları üzerine, piyasada bulunan tek çözüm değil. Dramatiq, Rq gibi kütüphaneler de yıldızı parlayan kullanışlı kütüphaneler.

Bu bölümde, diğer kütüphaneleri incelemeyeceğiz, en çok kullanılan çözümün neden Celery olduğu üzerine bir kaç alt başlıkta inceleme yapacağız.

Esneklik.

  • Celery, birden fazla message broker ile çalışabilir. Bu yazı da kurulumu pek dertli olmadığından Redis tercih edilmiş olsa da, RabbitMQ, AmazonSQS ve farklı bir kaç broker çözümüne tam destek sağlar.
  • Bunun yanında task sonuçlarının kayıt edilebileceği, alternatifi bol bir listeye sahiptir. Task sonuçları, Django ORM, SqlAlchemy,(tabi ki bu ormler ile koşturduğunuz veritabanları 🙂 ) MongoDb, Redis veya Memcahe gibi ortamlarda yönetilebilir ve saklanabilir.
  • Çok uzun süren tasklerinizde, custom task sınıfları oluşturarak, bir lock mekanizması kurgulayabilir ve kuyrukta işlenmekte olan taskin aynısını tekrar kuyruğa eklemenin önüne geçebilirsiniz.
  • Celery default olarak task argümanlarını JSON olarak serialize etsede, global ya da task özelinde farklı serializasyon formatları kullanabilirsiniz. Örneğin YAML, Pickle(güvenlik açıkları bulunur), msgpack.

Monitoring.

  • Canlıda koşan bir çok sistemde pekala bir şeyler yolunda gitmeyecektir. Sistem metriklerini inceleyerek çıkarımlar yapmak ve önlemler almak ise iyi bir stratejidir. Celery, Flower eklentisi ile birlikte bir monitoring paneline kavuşur. Flower kuyruklarınızın durumunu, hangi workerın ne durumda olduğunu, workerların kuyruk karşısında ezilip ezilmediği ve diğer Celery spesifiğindeki sorulara fazlasıyla bilgi sağlayan ve remote kontrole izin veren iyi bir eklenti. Göz atmak için link burada.

Birden fazla kuyrukla çalışabilme.

  • Celery, diğer daha basit çözümler aksine, ayrık kuyruklardan eş zamanlı task işleyebilir. Örneğin devasa miktarlarda mail gönderimi yapan taskleriniz varsa sadece mail tasklerini yönetecek bir kuyruk oluşturmak, mail işlemlerini diğer tasklerden ayrıştırarak daha anlık bir bildirim sistemi kurmanıza olanak verir.

Zaman limitleme.

  • Celery ile saniyede/dakikada/saatte kaç taskin yürütülebileceğini veya bir taskin ne kadar süre çalışmasına izin verilebileceğini kontrol edebilirsiniz ve bu, belirli bir worker için veya her bir task türü için ayrı ayrı olarak ayarlanabilir. Örneğin 10 sn üzerine çıkan tasklerin oto terminate edilmesi vs.

Subtask yönetimi.

  • Celery bir task üzerinden başka tasklerin iteratif ya da tekil olarak çağrılmasına olanak sağlar. Örneğin büyük bir listeye mail gönderimi yapan taskiniz bulunuyor, tek seferde gönderim yapmak için bir task yürütülmeye başlandığında, task yürütme zaman sınırına takılıp bir exception fırlatıp çıkış yapabilir. Çıkış yapıldığı anda, hangi kullancılara gönderimin başarılı olduğu muallak olacak veya ayrıştırmak zor olacaktır. En kötü ihtimallede task tekrar çalıştırılması için sıraya eklendiğinde duplike bildirimler oluşabilir. İteratif bir şekilde, tek kullanıcı için subtask çağrılır ise exception fırlatıp çıkan taskler loglanıp email bildirimi alamayan kullanıcılar için tekrardan bir gönderim sağlanabilir.
  • @app.task()
    def send_email_cron_task(huge_email_list):
    	
    	for email in huge_email_list:
    		send_email_to_sinle_user_task.delay(email=email)
    

Retry mekanizması.

  • Celery, bir task yürütürken, exception durumlarında tekrar deneyebilme mekanizmasına sahiptir. Örneğin, Twitter API’ya çıkıp feed yenileyen bir task başarısız olması durumunda, güncelleği sağlayabilmek için kendi kendini belirtilen limitler dahilinde yineleyebilir veya yaptığınız exception handling sonrası gerekli loglamayı yapıp taski manuel sizde yineleyebilirsiniz. Örneğin feedin yenilenemediği durumda,  10 sn aralıklarla kendini 5 defa yineleyecek bir task;
  • from twitter.exceptions import TimeLineNotRefreshed
    @app.task(autoretry_for=(TimeLineNotRefreshed,), 
    		  retry_kwargs={'max_retries': 5, "countdown": 10})
    def refresh_timeline(user):
        return twitter.refresh_timeline(user)

Uyumluluk.

Dökümantasyon, komünite ve eklentiler.

  • Son olarak Celery bir Python kütüphanesi ve arkasında büyük bir komunite bulunuyor. Bu komunite eklentiler ile Celery’e yetkinlikler kazandırıyor veya sorularınız olduğunda mutlaka birileri tarafından deneyimlendiği için çok hızlı yanıtlar bulabiliyorsunuz. Kişisel olarak, bir teknolojinin gelişimi ve sürdürülebilirliği için bu desteğin önemli olduğunu düşünüyorum.
  • İyi organize edilmiş bir dökümantasyona da sahip.

Son sözler.

Uzunca bir yazı oldu. Celery ilk bakışta anlaşılması biraz güç bir teknoloji fakat, en temelden alarak hangi problemi çözmek için var olduğunu, bu problemi çözerken nelere ihtiyacımız olduğu konusunda artık fikir sahibiyiz. Celery 4.0+ sürümüyle stabil, kullanışlı. Fakat buradaki temel soru gerçekten uygulamanızın ayrık taskler işlemesine ihtiyacı var mı? Celery basit olsa da uygulama kompleksliğini arttırmakta.

Cevabınız evet ise burada yapmış olduğumuz girişe ek olarak, şu adreten dökümantasyona gidebilir ve keşfetmeye başlayabilirsiniz. 🙂

Celery özelindeki spesifik işlemler için yakın gelecekte yeni yazılardan bahsediyor olacağım. Umarım sizin için de keyifli bir yazı olmuştur görüşmek üzere!


Yazı oluşturulurken yararlanılan bazı kaynaklar;

Celery dökümantasyonu

Understanding Celery & CeleryBeat Pycon

Task Queues: A Celery Story

 

 

Tarih:Blog

İlk Yorumu Siz Yapın

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.

Göster
Gizle