APACHE AIRFLOW

Mevlüt Yıldırım
6 min readJul 21, 2022

--

Merhabalar 🤗️, uzun zamandır yazı yazmadım ve bu süre zarfında bir çok yeni teknolojilerle çalıştım. Bunlardan biri de Airflow. Bugün Airflow’dan bahsetmek istiyorum.

Ben bir yazılım mühendisiyim. Veri tabanıma sürekli yeni kayıtlar atmalıyım ve kayıtları belli zaman aralıklarında yapmam gerek. Bunlara ek olarak bir de aynı anda bir çok işlem gerçekleşmesi gerekiyor. Tüm işlemler birbiriyle ilişkili ve birbirini tetikleyerek yapılıyor. Hatta bu kadar ile de yetinmeyip, bir de çok kolay yönetmek istiyorum. Mümkünse bir arayüze sahip olayım, tüm bağlantıları görebileyim, işlemlerin başarılı/başarısız durumlarını bu arayüzden kontrol edebileyim.

Çok şey istedim diye düşünebilirsiniz. Ama aslında Airflow kullanarak çözebileceğiniz bazı problemleri yazımın girişinde belirtmiş oldum :)

Airflow iş akışlarını planlamak ve izlemek için kullanılan bir platformdur. Kullanılan dil Python’dur. Açık kaynaklı ve Apache tarafından yayınlanmıştır. İş akışlarını düzenlenmesinde çok büyük kolaylık sağlamakla birlikte basit arayüz ile yönetim zorluğunu optimum seviyeye indirgemektedir.

Örneğin herhangi bir platform’un Api’sini kullanıyorum. Belli bir zaman aralıklarla bu Api’ye istek atıp, kendi veri tabanımı beslemem gerekiyor. Airflow’da bunu yapabilmek için bir DAG (Yönlendirilmiş Asiklik Grafikler) oluşturmam ve içinde kod parçalarını yazmam yeterli. Belli bir zaman aralığı vererek bu Dag’ın otomatik tetiklenmesini sağlayabilirim. Hatta bu işlemi yaparken aynı anda farklı Api’lerde tetiklemek istediğimde devreye Task kavramı giriyor. En kaba tabirde Dag’ın görevini yerine getiren farklı fonksiyonlara Task denmekte. Birden çok oluşturabileceğim bu Task yapısı ile de Dag’ın iş akışını tasarlayıp, işlemlerin takibine sağlayabilirim.

Az önce verdiğim sadece bir örnekti. Benzer senaryolara sahip problemlerin çözümü tabii ki Airflow ile mevcut. Tamamen hayal gücünüze ve problem içeriğine kalmış.

Yazımın ilerleyen kısımlarında örnek bir Airflow uygulaması oluşturacağız ama ondan önce dökümantasyonunu inceleyelim.

Öncelikle detaylı bir dökümantasyona sahip. Postgres, Google ve benzeri bir çok paketi içeriyor ve bu paketlerle ilgili entegrasyonların nasıl sağlanacağı açıklanıyor. Docker ile nasıl ayağa kaldırılacağı, Api isteği olarak nasıl dışarıdan tetikleneceği vs detaylı bir şekilde verilmiş. Airflow geliştirirken kesinlikle kendi dokümanı her zaman en iyi kaynak olacaktır.

Linux Makineye Airflow Kurulumu

Airflow çok basit bir şekilde bulut ortamında ki bir sanal makineye veya kendi linux işletim sisteminize kurabilirsiniz. Aşağıda dokümantasyon dan alınmış kod parçalarını görebilirsiniz. Öncelikle Python ve paket yükleyicisi olan pip modülünün bilgisayarınızda kurulumu olması gerekmekte. Daha sonra bir kök dizin belirleyip, versiyonlar ile ilgili değişkenler tanımladıktan sonra pip aracılığı ile Airflow’un kurulumunu sağlayabilirsiniz.

export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.3.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

airflow standalone

Airflow standalone komutu ile kurduğunuz Airflow uygulamasını ayağa kaldırabilirsiniz. Genelde standalone komutu çoklu olarak tüm yapıyı ayağa kaldırmasına rağmen bir veri tabanına ihtiyaç duyabilir. Kendi bilgisayarınız için local üzerinden kurduğunuz bir veritabanı veya aktif kullandığınız bir veritabanının bilgilerini vermeniz gerekebilmektedir. Bu ve bunun benzeri bir çok ayarlamayı yapacağınız dosya, airflow dizininizde ki airflow.cfg dosyasıdır. Buradan veri tabanını bağlantı bilgilerinizi cfg dosyası içinde tanımlamanız gerekmektedir.

Localhost:8080 portunda aşağıdaki gibi bir arayüz sizi karşılayacaktır.

Bu arayüz sayesinde aktif&pasif Dag’larınızı kaç kere çalıştıklarını ve bu çalışmalarının hangilerinin başarılı hangilerinin başarısız tamamlandığını görebileceksiniz. Ayrıca üst kısımdaki seçenekler ile farklı veri tabanı veya bulut ortamlarının bağlantılarını, Airflow’unuza erişebilecek kullanıcıların yetki ve giriş bilgilerini ayarlamak gibi bir çok işlemi yapabilirsiniz.

Airflow Kullanarak Dag Ve Task Oluşturma

Kurulumu tamamlandıktan sonra artık Dag’larımızı oluşturabiliriz. Airflow dizinimiz de oluşan dags klasörümüzün içinde Python dosyalarının her biri aslında bir Dag’ı oluşturmaktadır. Bu Dag’ların içinde ise peşi sıra veya aynı anda tetiklenebilecek Task’lar oluşturulabilir.

Örneğin first_dag.py adında bir python dosyası oluşturduk. Aşağıdaki kod parçasında ki gibi bu python dosyasını doldurabiliriz.

Öncelikle Airflow üzerinden gerekli modülleri dosyamıza import ediyoruz. Default argümanlar kısmı gerekli olmamasına karşın tekrar deneme, tekrar deneme süresi, yazan kişi gibi çeşitli bilgileri vererek Dag çalışmasını daha kontrollü bir şekilde yapabiliriz. Dag id aslında oluşturulan Dag’ın arayüz de görünecek ismidir. Oluşturduğumuz fonksiyonlar ise Dag’ımızın tasklarını belirtmektedir. Python Operator ile bu tasklarımızı çalıştırabiliriz. Airflow’u çalıştırdığımızda Dag aşağıdaki resimde olduğu gibi görünmektedir.

Dag detayına baktığımızda tetiklenecek Task’ları gösteren bir graph yapısı, her Task’ın çalışma ve tamamlanma durumlarını gösteren ifadeler bulunmaktadır.

Ayrıca diğer seçeneklerden yazılan kodu, bir sonra ki çalışma zamanını ve log çıktılarını da görebilmekteyiz. Şimdi bu Dag’a yeni farklı örnek Task’lar ekleyelim ve bu Task’ların çalışma sırasını birbirine bağlayalım. İlk oluşturmada gösterdiğim gibi her Task için fonksiyon oluşturmamız gerekmektedir. Fonksiyonları oluşturduktan sonra aşağıda ki gibi >> işaretleri ile Task’ları birbirini tetikleyecek şekilde bağlayabiliriz.

Birden çok Task’lı yapımızın kod yapısı aşağıdaki gibidir.

Task1>>Task2>>Task3>>Task4 farklı kombinasyonlarda ayarlayabiliriz.

örneğin ;

Task1>>[Task2,Task3]>>task4 şeklinde Task1 sonrası Task2 ve Task3 ü aynı anda tetikletebiliririz.

Airflow arayüzü çalışma durum ve zamanlarını da aşağıdaki gibi sunmaktadır.

Airflow Bağlantı İşemleri

Airflow arayüzünden bir çok bağlantı sağlayabiliriz. (Cloud, Postgres, Docker, HTTP..) Örnek olması açısından burada Postgres bağlantısı kurmaktan bahsedeceğim. Öncelikle arayüzden admin seçeceğine tıklayarak bağlantılara gitmemiz gerekmektedir. Yeni bağlantı butonuna tıkladığımızda aşağıdaki gibi ekran açılacaktır.

Bu ekrandaki bağlantı id’si kod içeriğinde bağlantıyı sağlayacağımız isimdir. Gerekli bilgileri ve bağlantı tipini seçtikten sonra test edip kayıt edebiliriz.

Örneğin postgres bağlantısı için;

Kayıt ettikten sonra kod parçamızda Airflow’un sunduğu Postgres modülü ile bir cursor oluşturup sorgularımızı çalıştırabiliriz. Bu ve diğer bağlantıları da yine aynı mantık çerçevesinde kolayca Airflow modülleri ile kullanabiliriz.

Airflow Zamanlayıcı İşlemleri

Yukarıdaki örnekte sadece başlangıç zamanı vermiş olup herhangi bir zamanlayıcı kurmamıştık. Python’ın datetime modülü ile çok kolay bir şekilde istediğimiz sürede bir Dag’ı çalıştırabiliriz. Yalnızca Dag oluştururken ek olarak bir argüman daha girmemiz yeterli.

Schedule_interval argümanına verdiğimiz sürede bir Dag çalıştırılacaktır. Eş zamanlı çalışma veya biri bitmeden diğerinin başlaması durumunda kuyruğa alınacak ve çalışma aksamayacaktır. Yani kod yapısında bir problem olmadığı takdirde Dag’lar belirtilen sürede bir tetiklenecektir.

Airflow Tasklar Arası Haberleşme

Airflow da bir Dag’ın içinde birden fazla Task olabileceğini söylemiştik. Peki Task1 deki işlem sonuçları diğer Task’larda kullanılacaksa verileri nasıl iletebiliriz ? Böyle bir şey mümkün mü ? Evet mümkün. Task’lara vereceğimiz op_args parametresi sayesinde herhangi bir Task’dan döndüreceğimiz çıktıyı diğer bir Task’a iletebiliriz. Tabii ki büyük çıktılar olması önerilmeyen ve genelde 0–50 karakterlik veya True-False çıktıları döndürülmesi performans açısından ve depolama açısından daha iyi sonuçlar verecektir. Büyük verilerin Task’lar arası geçişinde daha çok bir bulut ortamına verilerin yazılıp daha sonra Url iletimi ile diğer Task’dan çekilmesi kullanılmaktadır. (Örneğin Cloud Storage kullanılması)

Op_args kullanımı aşağıdaki gibidir.

Sonuç

Airflow; karmaşık, sıralı ve otomatik tetiklenmesi gereken işlemlerinizi oluşturabilir, bunları arayüzü sayesinde kontrol edilebilir kılar. Python dilini kullanarak farklı iş akışlarınızı hem oluşturup hem de arayüz üzerinden kontrol etmenizi sağlar. Farklı bağlantıları ile diğer teknolojilere bağlantınız kolaylaşır. Hata olması durumunda size pop-up çıkarak hataların kolayca tespit edilmesini sağlar.

Bu yazımda genel itibariyle Airflow dan, nasıl kurulacağından, hangi işlemler için kullanılabilir olduğundan bahsettim. İleri ki yazılarımda Rest Api ile tetiklenmesi, Docker üzerinden kullanımı, XCOM kullanımı gibi detay konulardan bahsetmeyi düşünüyorum. Okuduğunuz için teşekkürler…

--

--