15%

Implementasi Schedule Delay Flask menggunakan Message Broker Redis

17-Jul-2021

Di era digital yang serba terhubung ini, komunikasi antar aplikasi bukan hanya keinginan, melainkan kebutuhan yang mendesak. Dalam upaya untuk menjawab tantangan ini, muncullah teknologi yang dikenal sebagai message broker.

Alat vital ini memastikan bahwa aplikasi dan sistem dapat berinteraksi satu sama lain dengan efisiensi dan keandalan yang tinggi, tanpa terhambat oleh perbedaan bahasa pemrograman, platform, atau arsitektur.

Sebelum kita menyelami lebih dalam tentang bagaimana message broker berfungsi dan mengapa itu menjadi bagian integral dari infrastruktur teknologi modern, mari kita mengerti dasar-dasar dan prinsip kerjanya.

Artikel ini akan membawa Anda melalui konsep message broker, mengungkap bagaimana ia menjadi jembatan komunikasi yang penting di antara sistem-sistem terisolasi, memfasilitasi pertukaran informasi yang mulus dan meningkatkan kemampuan sistem untuk berskala dan beradaptasi dengan perubahan.

Apa itu Message Broker ?

Message broker, atau broker pesan dalam bahasa Indonesia, adalah sebuah alat perangkat lunak yang memainkan peran penting dalam komunikasi antar aplikasi. Bayangkan sebuah dunia di mana berbagai aplikasi dan sistem dapat berbicara, bertukar data, dan bekerja bersama dengan lancar, tanpa hambatan. Di sinilah message broker berperan.

Message broker bertindak sebagai perantara yang mengatur, menerjemahkan, dan mengarahkan pesan antara sistem yang berbeda. Misalnya, dalam sebuah perusahaan, sistem penjualan mungkin perlu berkomunikasi dengan sistem inventori. Alih-alih memiliki koneksi langsung yang rumit dan mahal antara kedua sistem ini, message broker dapat mengambil pesan dari sistem penjualan, menerjemahkannya ke format yang dapat dimengerti oleh sistem inventori, dan mengirimkannya dengan cara yang efisien.

Fungsi utama message broker adalah untuk menjamin bahwa pesan dikirim dari satu titik ke titik lain dengan aman, dapat diandalkan, dan sesuai dengan urutan yang benar, bahkan dalam kondisi beban kerja yang tinggi atau ketika sistem-sistem tersebut beroperasi dalam lingkungan yang berbeda.

Menggunakan message broker juga memungkinkan pengembangan dan pemeliharaan sistem yang lebih mudah, karena perubahan pada satu sistem tidak harus secara langsung mempengaruhi sistem lain. Ini mendukung arsitektur mikroservis, di mana aplikasi dibangun sebagai kumpulan layanan kecil dan terpisah yang berkomunikasi melalui mekanisme yang ringan seperti message broker.

Dalam dunia yang semakin terhubung dan kompleks ini, message broker merupakan kunci untuk menciptakan solusi yang fleksibel, dapat diandalkan, dan skalabel, memberikan fondasi yang kuat untuk inovasi dan pertumbuhan. Mari kita ambil langkah proaktif menuju masa depan teknologi yang lebih cerdas dan terintegrasi dengan memanfaatkan kekuatan dari message broker.

Pada artikel kali ini kita akan membahas schedule update data dengan waktu dinamis. Untuk itu saya membuat relasi database sederhana untuk skenario ini.

Pada gambar atau ERD sederhana diatas, kita dapat melihat relasi pada setiap tabel. Untuk tipe data field status transaksi pada tabel transaksi diatas adalah String, Jika kita merujuk pada native SQL maka akan menjadi varchar. Perlu menjadi catatan untuk kondisi status transaksi disesuaikan dengan kondisi proyek yang Anda kerjakan, mungkin status transaksi dibuat tabel terpisah dengan transaksi dan dibuat relasi ke tabel transaksi untuk memudahkan filter dan order dan lainnya, disini hanya contoh untuk memahami proses schedule update status transaksi.


Model Flask SQLAlchemy

Untuk memulainya mari kita membuat modelnya.

model.py

from app import db
from datetime import datetime
 
class Product(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    product_name = db.Column(db.String(60), nullable=False)
    price = db.Column(db.Numeric)
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.now)
    transactions = db.relationship('Transaction', backref='product', lazy='dynamic')
 
class Transaction(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    amount = db.Column(db.Numeric)
    status_transaksi = db.Column(db.String(20), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('product.id'), nullable=False)
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.now)

 

Endpoint Order

Selanjutnya kita akan membuat endpoint order

view.py

from . import transaction as bp
from ..model import Product, Transaction
from app import db
from flask import current_app
 
@bp.route('/order/<int:product_id>', methods=['GET'])
def order(product_id):
    product = Product.query.get_or_404(product_id)
    transaction = Transaction(
        amount = 1000000,
        status_transaksi = 'pending',
        product = product
    )
    db.session.add(transaction)
    db.session.commit()
    return {'success': 'Order berhasil'}

 

Insert data pertama akan menambahkan status transaksi pending karena menunggu pembayaran dari customer (biasanya menggunakan metode pembayaran transfer atau va).

Kasus: Status Transaksi akan berubah menjadi expired jika customer tidak melakukan pembayaran dalam 1x24 jam dimulai dari waktu order.


Ada beberapa cara untuk melakukan pengecekan ini:

 

  1. Cronjob/Crontab
    Schedule pada cronjob menjadi masalah, karena waktu expired tidak sesuai dengan schedule. Bagaimana jika ada banyak customer melakukan order pada waktu yang berbeda-beda? pilihan pertama adalah melakukan pengecekan setiap order.

    def cek_billing_expired():
            billing = Transaksi.query.filter_by(status_transaksi = 'pending').all()
            if billing is not None:
                for bil in billing:
                    if datetime.now() > bil.created_at + timedelta(days=1):
                        bil.status_transaksi = 'expired'
                db.session.commit()


    Masalahnya pada cronjon waktu bersifat statis, mungkin ada cara untuk membuatnya sesuai skenario yang kita inginkan, akan tetapi akan menjadi rumit.
    disini kita banyak menghabiskan memory untuk melakukan pengecakan yang sebenarnya disetiap cronjob berjalan belum tentu ada waktu order yang melewati (>=) 1 x 24 jam.
     

  2. Before Request
    Kita bisa melakukan pengecekan dengan algoritma cronjob diatas tetapi dijalankan oleh before request. 
    Perbedaan teknik ini dengan cronjob adalah, pengecakan akan dijalankan ketika ada request (akan menjalankan function berfore request dan setelahnya endpoint tujuan) jika ada order dengan status transaksi pending yang telah melewati 1 x24 jam tetapi tidak ada request maka hal ini tidak mengganggu sistem karena status transaksi akan dilakukan pengecakan setiap ada request. Kekurangan teknik ini, juga menguras memory karena pengecakan dilakukan setiap ada request.

     
  3. Message Broker
    Pada teknik ini kita akan memberikan task kepada message broker dengan delay time (sesuai waktu yang kita tentukan). Jadi misalkan ada order pada jam 1 siang, maka kita akan kirimkan parameter id order (id pada tabel transaksi) ke message broker untuk dilakukan pengecekan 24 jam mendatang.

Menurut saya teknik ini adalah yang terbaik dari teknik-teknik sebelumnya sesuai dengan skenario/kasus proyek ini. karena akan melakukan pengecekan sesuai dengan task dan waktu order.


Implementasi Schedule Message Broker dengan Redis


Pilihan menggunakan redis karena lebih mudah dipahami untuk pemula dan juga mudah installasinya. Untuk menghubungkan Flask dengan redis kita menggunakan package interface python redis
pip install redis
pip install rq

kemudian kita inisiasi __init__.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from redis import Redis
import rq
from .config import Config
 
db = SQLAlchemy()
 
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)
 
    db.init_app(app)
 
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('background-task', connection=app.redis)
 
    # Register blueprints
    from app.transaction.view import bp
    app.register_blueprint(bp)
 
    return app

 

Pada app.task_queue kita membuat queue di redis dengan nama background-task, jadi tugas akan diberikan ke redis untuk dijalankan sesuai waktu yang kita set. Untuk membuat tugas ke redis maka kita akan modifikasi/menambahkan pada endpoint  view.py /order/<int:product_id>

from . import transaction as bp
from ..model import Product, Transaction
from app import db
from flask import current_app
from datetime import timedelta
 
@bp.route('/order/<int:product_id>', methods=['GET'])
def order(product_id):
    product = Product.query.get_or_404(product_id)
    transaction = Transaction(
        amount = 1000000,
        status_transaksi = 'pending',
        product = product
    )
    db.session.add(transaction)
    db.session.commit()
 
    # kirim ke task/worker untuk dicek dalam 15 detik, untuk mencoba
    current_app.task_queue.enqueue_in(
            timedelta(seconds=15),
            'app.task.cek_billing',
            transaction.id
        )
    return {'success': 'Order berhasil, status akan expired dalam 15 detik'}
 

 

current_app.task_queue.enqueue_in adalah perintah tugas yang akan dikirimkan ke redis. queue.enqueue_at() juga dapat digunakan tapi parameter pertama adalah object datetime sedangkan queue.enqueue_in() dapat menerima parameter timedelta pada parameternya seperti contoh diatas.

Kemudian perhatikan parameter kedua 'app.task.cek_billing'  yaitu redis akan menuju ke fungsi tersebut sesuai waktu yang ditentukan yaitu 15 detik dan akan membawa parameter ketiga transaction.id

task.py

from app import create_app, db
from .model import Transaction
 
app = create_app()
app.app_context().push()
 
def cek_billing(id):
    transaksi = Transaction.query.get(id)
    if transaksi:
        if transaksi.status_transaksi == 'pending':
            transaksi.status_transaksi = 'expired'   
    db.session.commit()

 

app.app_context().push() perlu dilakukan karena file ini berada diluar inisiasi flask pada __init__.py sehingga kita perlu memberi tahu flask bahwa file ini termasuk dalam aplikasi.

worker wajib dijalankan agar task yang sudah dikirimkan ke redis dapat dijalankan. berikut perintahnya rq worker -u redis://redis:6379/0 background-task --with-scheduler dan buka terimal lagi jalankan flask run

 

Untuk melihat kode secara utuh Anda dapat clone pada di https://github.com/dhanipro/flask-schedule-delay-redis jalankan dengan docker docker-compose up

Topik : python sql Flask
Similar Posts

Komentar (0)

Tinggalkan Komentar