Skip to content

Latest commit

 

History

History
572 lines (355 loc) · 63.7 KB

File metadata and controls

572 lines (355 loc) · 63.7 KB

فصل 8 کارکردن به صورت ناهمزمان

  • چرا به ناهمزمانی نیاز داریم؟
  • الگوهای ناهمزمانی
  • کار کردن با Celery
  • فهم asyncio
  • ورود به channelها

در مواقع ساده‌تر، یک برنامه وب در جنگو یک فرآیند یکپارچه بزرگ بود که می‌توانست یک درخواست را مدیریت کرده و تا زمانی که پاسخی تولید نکرده است بقیه درخواست ها را مسدود کند.

در دنیای میکروسرویس ها اپلیکیشن ها مانند یک زنجیره پیچیده و اغلب بهم پیوسته تشکیل شده اند که هرکدام خدمات بخصوصی را ارائه می دهد. جنگو احتمالا مانند یک رابط در این جریان برنامه ها عمل می کند هماطور که Eliyahu Goldratt می گوید، "این زنجیره به اندازه ضعیف ترین حلقه آن قوی است.» به عبارت دیگر، ماهیت همزمانی (syncronous) جنگو به طور بالقوه می تواند آن را به یک گلوگاه عملکرد تبدیل کند (می تواند نقطه ضعفی برای جنگو باشد).

برای همین راه حل های ناهمزمان (asynchronous) مختلفی برای این موضوع در نظر گرفته شده است که این راه حل ها می توانند به شما در حفظ زمان پاسخ (response time) سریع و اجرا کردن ماهیت ناهمزمان برنامه‌های امروزی کمک کنند.

چرا ناهمزمانی؟

مانند تمامی فریمورک های وب مبتنی بر WSGI جنگو یک فریمورک همزمان (synchronous) است. وقتی که یک کاربر به یک صفحه وب درخواستی (request) را می فرستد، درخواست از طریق یک View به جنگو می رسد و از خطوط مختلف کد عبور می کند تا صفحه وب رندر شده باز گردد. به خاطر اینکه تا زمانی که این مراحل اجرا شود بقیه درخواست ها متظر می مانند یا مسدود می شوند به آن همزمان (synchronous) می گویند.

توسعه‌دهندگان تازه وارد جنگو نگران ایجاد وظایف ناهمزمان (asynchronous tasks) نیستند، من متوجه شده ام که کد آن ها شامل کارهای زمانبر و کندی مانند پردازش تصویر یا حتی درخواست‌های پیچیده پایگاه داده است که منجر به کندی بارگذاری (load) صفحه می شود. در حالت ایده آل کارهای زمانبر باید از چرخه درخواست-پاسخ خارج شوند. زمان بارگذاری صفحه برای تجربه کاربری یک موضوع حیاتی است و باید برای جلوگیری از هرگونه تاخیر بهینه سازی شود.

یکی دیگر از مشکلات اساسی این مدل همزمان (synchronous)، مدیریت رویدادهایی است که توسط درخواست های وب ایجاد نمی شوند.

حتی اگر یک وب سایت هیچ بازدید کننده ای نداشته باشد، باید به فعالیت های مختلف تعمیر و نگهداری توجه کرد. آنها را می توان در یک زمان خاص مانند ارسال یک خبرنامه در نیمه شب جمعه، یا کارهای روزانه پشت صحنه (background tasks) مانند اسکن فایل های آپلود شده برای ویروس ها، برنامه ریزی کرد. برخی از وبسایت ها ممکن است به‌روزرسانی‌های همزمان یا اعلان‌های لحظه ای را از طریق WebSockets ارائه دهند که توسط مدل WSGI قابل مدیریت و پشتیبانی نباشد.

برخی از انواع معمول کارهای ناهمزمان (asynchronous tasks) عبارتند از:

  • ارسال ایمیل/اس ام اس تک یا انبوه
  • صدا زدن وب سرویس ها
  • کوئری های SQL پیچیده و کند
  • فعالیت ها logging
  • رمزگذاری یا رمزگشایی فایل های مدیا (encoding or decoding)
  • تجزیه مجموعه بزرگی از متن
  • وب اسکرپینگ
  • ارسال خبرنامه
  • تسک های یادگیری ماشین و پردازش تصویر ( Machine learning, Image processing)

همانطور که دیدید هر پروژه بزرگ جنگو به یک زیرساخت برای مدیریت وظایف ناهمزمان نیاز دارد. ممکن است زمانی که ناهمزمانی را به کدتان بیاورید، با یک فرآیند، کدتان به مراتب سریع‌تر اجرا شود (برای مثالی چشمگیر از افزایش سرعت، به بخش درک asyncio مراجعه کنید). این به این دلیل است که مادامی که منتظر انجام یک کار I/O بودید، بهتر است آن زمان صرف اجرای کارهای دیگر استفاده شود.

مشکلات کد ناهمزمان

برنامه نویسی ناهمزمان ممکن است بسیار وسوسه کننده به نظر برسد، اما تسلط بر آن بسیار دشوار است.

چندین مشکل وجود دارد که باید از آنها آگاه باشید، مانند موارد زیر:

  • حالت رقابتی(Race condition): اگر دو یا چند نخ (thread) مقدار یک داده را تغییر دهند، ترتیب اجرای آنها می تواند بر مقدار نهایی تأثیر بگذارد. این مسابقه می تواند منجر به قرار گرفتن داده ها در وضعیت نامشخص شود.

  • گرسنگی(Starvation): انتظار نامحدود توسط یک نخ (thread) به دلیل ورود موضوعات دیگر.

  • بن بست(Deadlock): اگر نخی منتظر یک منبع باشد که نخ دیگری در حال استفاده آن منبع است و بالعکس همزمان در حال استفاده از منبع باشند، هر دو رشته در یک بن بست گیر کرده اند.

  • حفظ ترتیب(Order preservation) : ممکن است وابستگی هایی بین بخش های کد وجود داشته باشد که هنگام تغییر ترتیب اجرا مشاهده نشود.

در پایتون، ممکن است اجتناب کامل از چنین اتفاقاتی غیرممکن باشد، اما می‌توانیم یکسری از روش های بهینه(best practice) ها را برای حذف آنها برای بیشتر اهداف عملی دنبال کنیم. این موارد در بخش Celery best practices پوشش داده خواهند شد.

پترن ها و الگوهای ناهمزمانی(Asynchronous patterns)

بیایید به الگوهای کلی مختلفی که در وب اپلیکیشن ها استفاده شده است نگاه کنیم.

الگوی پاسخ به اندپوینت(Endpoint callback pattern)

در این الگو زمانی که یک سرویس صدا زده می شود یک اندپوینتی مشخص می شود که پس از اتمام عملیات صدا زده می شود. این شبیه به مشخص کردن بازخوانی(callback) در برخی از زبان های برنامه نویسی مانند جاوا اسکریپت است. زمانی که به عنوان یک بازخوانی (callback) HTTP استفاده می شود به عنوان webhook شناخته می شود.

یک فرآیند تقریباً به شرح زیر است:

  1. کلاینت یک سرویس را از طریق کانالی مانند REST، RPC یا UDP فراخوانی می کند. همچنین اندپوینتی را برای اطلاع از آماده شدن نتیجه فراهم می کند.

  2. فراخوانی(callback) بلافاصله بر میگردد

  3. زمانی که کار تکمیل شد، سرویس اندپوینت تعریف شده را فراخوانی می کند تا به فرستنده اطلاع دهد.

به یاد داشته باشید که ارائه دهنده یا گیرنده سرویس باید بتواند به فرستنده دسترسی داشته باشد. برای داده های حساس، باید نوعی احراز هویت برای شناسایی فرستنده و رمزگذاری برای محافظت از کانال در برابر سرقت دیتا وجود داشته باشد.

این الگو کاملاً محبوب است و توسط برنامه های وب مختلف مانند GitHub، PayPal، Twilio و غیره پیاده سازی شده است. این ارائه دهندگان سرویس معمولاً یک API برای مدیریت اشتراک در این WebHok دارند، مگر اینکه شما یک واسط برای انجام چنین کاری داشته باشید.

الگوی Publish-subscribe

این الگو شکل کلی تری نسبت به الگوی قبلی دارد. در اینجا، یک broker به عنوان یک واسطه بین فرستنده و گیرنده ها عمل می کند. درست است، چندین گیرنده می‌توانند در یک موضوع مشترک شوند، یعنی یک گروه منطقی نام‌گذاری شده از کانال‌هایی که توسط هر کسی منتشر شده است.

در این الگو، روند ارتباط به شرح زیر است:

  1. یک یا چند listener به broker اطلاع می دهند که علاقه مند به اشتراک گذاری یک موضوع هستند
  2. یک publisher پیامی را به broker تحت عنوان مربوطه ارسال می کند
  3. واسط (broker) پیام را به همه مشترکین (subscribers) ارسال می کند

یک broker این امکان را دارد که فرستنده و گیرنده(sender and receiver) را به جهات مختلف جدا می کند. علاوه بر این، broker می تواند بسیاری از وظایف اضافی مانند کامل تبدیل فیلتر کردن پیام، را انجام دهد. این الگو کاملاً مقیاس پذیر است و از این رو در میان افزارهای(middleware) سازمانی محبوب است.

ابزار Celery از مکانیسم‌های انتشار/اشتراک (pub/sub) برای انتقال دیتای درونی خود استفاده می‌کند، مانند Redis در ارسال پیام.

الگوی Polling

الگوریتم polling همانطور که از نامش پیداست شامل بررسی دوره ای یک سرویس برای هر رویداد جدید توسط کلاینت است. این الگو در اغلب موارد نامناسب ترین ابزار برای ارتباط ناهمزمان (asynchronous) می باشد زیرا الگوی polling میزان استفاده از سرویس را افزایش می دهد و شرایط برای مقیاس پذیری سخت می شود. با این حال، ممکن است این تنها راه حل عملی در یک سیستم قدیمی باشد.

یک سیستم polling به شرح زیر کار می کند:

  1. کلاینت یک سرویس را صدا می‌کند
  2. این صدازدن رویدادها و وضعیت جدیدی برای task بر می گرداند
  3. کلاینت منتظر می ماند و مرحله دوم را در فواصل زمانی تکرار می کند

هنگام بازیابی وضعیت سرویس ممکن است درجاتی از تاخیر همزمان وجود داشته باشد. ممکن است کلاینت تا رسیدن ریسپانس مسدود شود. از این رو، گاهی اوقات از این الگو به عنوان busy-waiting یاد می شود.

راه حل های ناهمزمانی در جنگو

بقیه این فصل سیستم‌های asynchronous محبوب زیر را که در کنار جنگو استفاده می‌شوند، با موارد استفاده متفاوت آن ها پوشش می‌دهد. آنها به شرح زیر فهرست شده اند:

  • ابزار Celery: مدل worker مبتنی بر نخ برای انجام محاسبات در خارج از یک فرآیند جنگو
  • ماژول asyncio: ماژول داخلی پایتون برای اجرای همزمان چندین کار در یک نخ
  • فریمورک django channel: معماری مشابه صف پیام در زمان واقعی برای مدیریت رویدادهای I/O مانند WebSockets

بیایید ابتدا محبوب ترین و قوی ترین راه حل برای اجرای وظایف به صورت ناهمزمان(asynchronous) را درک کنیم: Celery

کار کردن با Celery

ابزار Celery یک مدیر صف تسک های ناهمزمان با ویژگی های غنی است. در اینجا، یک تسک به یک فراخوانی اشاره دارد که هنگام اجرا، فعالیت را به صورت ناهمزمان انجام می دهد. Celery توسط چندین سازمان معروف از جمله اینستاگرام و موزیلا برای انجام میلیون ها تسک که در طول روز تولید میشوند استفاده می شود.

هنگام نصب Celery، باید اجزای مختلفی مانند broker و جایی برای ذخیره دیتا را انتخاب کنید. اگر گیج شده اید، توصیه می کنم Redis را نصب کنید و برای شروع از مکان ذخیره دیتا صرف نظر کنید. از آنجایی که Redis به صورت in-memorty کار می کند، اگر پیام های شما بزرگتر هستند و نیاز به ماندگاری دارند، باید به جای آن از RabbitMQ استفاده کنید. برای شروع می‌توانید مراحل اول با celery و استفاده از celery با جنگو را در راهنمای کاربر celery دنبال کنید.

در جنگو، تسک های Celery معمولاً در یک فایل جداگانه به نام tasks.py در کنار فایل های یک app می‌آید.

در اینجا یک کار ساده celery می ببینید:

# tasks.py
@shared_task
def fetch_feed(feed_id):
    feed_obj = models.Feed.objects.get(id=feed_id)
    feed_obj.page = retrieve_page(feed_obj.feed_url)
    feed_obj.retrieved = timezone.now()
    feed_obj.save()
‍‍‍

این تسک محتوای یک فید RSS را بازیابی کرده و در پایگاه داده ذخیره می کند.

به نظر می رسد یک تابع عادی پایتون است (حتی اگر مربوط به یک کلاس باشد)، جز دکوریتور @shared_task. این یک تسک celery را مشخص می کند. یک تسک مشترک(shared task) می تواند توسط اپ های دیگر در همان پروژه استفاده شود. این کار با ایجاد نمونه های مستقل از کار در هر برنامه ثبت شده، قابل استفاده مجدد می کند.

برای فراخوانی این تسک می توانید از متد delay() به صورت زیر استفاده کنید:

>>> from tasks import fetch_feed
>>> fetch_feed.delay(feed_id=some_feed.id)

برخلاف فراخوانی تابع عادی سریعا فانکشن اجرا نمی شود و بقیه درخواست ها تا زمانی که این تابع مقداری را برگرداند مسدود نمی شوند. در عوض بالافاصله یک آبجکت از نوع AsyncResult بر میگردد. این آبجکت می تواند برای بررسی وضعیت فانکشن و گرفتن مقدار بازگشتی تابع استفاده شود.

برای اینکه بفهمیم تابع ما چگونه و چه زمانی فراخوانی می شود، بیایید به نحوه عملکرد Celery نگاهی بیندازیم.

ابزار celery چگونه کار می کند

درک celery به دلیل معماری توزیع شده آن تا حدودی دشوار است. در اینجا یک نمودار سطح بالا وجود دارد که یک کانفیگ ساده Django-Celery را نشان می دهد:

زمانی که یک درخواست می رسد، می توان یک Celery task را برای رسیدگی به آن آن فعال کرد.Celery بلافاصله بدون مسدود کردن بقیه درخواست ها یک مقداری برمی‌گردد. در واقع، اجرای کار به پایان نرسیده است، اما یک پیام task وارد یک صف task (یا یکی از از صف های متعدد موجود) شده است.

کارگران سلری(workers) فرآیندهای جداگانه ای هستند که بر صف ها نظارت می کنند که اگر یک task جدید رسید آن ها را اجرا کنند. آن ها انجام یک کار (task) را برعهده میگیرند و یک پیام برگشت (acknowledgment) به صف میفرستند تا این task از صف ما حذف شود. سپس آن task را اجرا کنند پس از انجام این task این فرآیند تکرار می شود و worker یک task جدید را برای اجرا انتخاب می کند.

یک worker می‌تواند هنگام اجرای یک کار زمانبر یا کارهایی که درگیر I/O هستند مسدود شود و دیگر task جدیدی قبول نکند، اما طراحی جنگو به گونه ایست این کار بر روی فرآیند خود جنگو تأثیر نمی‌گذارد. وقتی کار worker تکمیل شد می توان نتیجه task را در جایی ذخیره کرد در بسیاری از موارد ذخیره نتیجه پایان یک task اهمیتی ندارد و میتوان آن را نادیده گرفت.

همچنین می‌توان با استفاده از چیزی که Celery آن را فرآیند celery beat می‌نامد، یک کار را برنامه‌ریزی کرد. می‌توانید آن را طوری پیکربندی کنید که در بازه‌های زمانی معینی، مانند هر 10 ثانیه یا در شروع یک روز هفته، کاری را آغاز کند. این ویژگی برای کارهای تعمیر و نگهداری مانند تهیه نسخه پشتیبان یا بررسی سلامت یک سرویس وب عالی است.

ابزار Celery به خوبی پشتیبانی می‌شود، مقیاس‌پذیر است و به خوبی با جنگو کار می‌کند، اما ممکن است برای کارهای ناهمزمان بی‌اهمیت بیش از حد دست و پا گیر باشد. در چنین مواردی، من استفاده از کانال‌های جنگو یا RQ را توصیه می‌کنم، یک صف ساده‌تر مبتنی بر Redis. با این حال، best practiceهایی که در بخش بعدی مورد بحث قرار می‌گیرند، ممکن است در مورد آنها نیز اعمال شود.

Celery best practices

شما دیده اید که چگونه Celery می تواند بار سنگین جنگو را تحمل کند، اما کار با Celery به دلیل مجموعه ویژگی های غنی آن کاملاً با جنگو متفاوت است. هزاران best practice درداکیومنت ها ذکر شده و در چندین وبلاگ به اشتراک گذاشته شده است.

اگر قبلاً با مفاهیم آشنا هستید و می خواهید یک چک لیست سریع داشته باشید، چک لیست celery task را در http://celerytaskschecklist.com بررسی کنید. در غیر این صورت، برای درک اینکه چگونه از celery بهترین بهره را ببرید، این آموزش را ادامه دهید.

Handling failure

هر نوع خطا می تواند هنگام اجرای یک task در Celery اتفاق بیفتند. باید مکانیزمی برای مدیریت این خطاها و تلاش مجدد باشد اگر این مکانیزم نباشد این خطاها شناسایی نمی شوند، اگر خطایی در کار رخ دهد موقتی است، مانند یک API (که خارج از کنترل ما است) یا تمام شدن حافظه. در چنین مواردی، بهتر است صبر کرد و دوباره کار را با فاصله های زمانی امتحان کرد.

در Celery، می‌توانید انتخاب کنید که به صورت خودکار یا دستی در صورت خطا آن تسک را دوباره امتحان کند. Celery تنظیم دقیق مکانیسم الگوریتم امتحان مجدد خودکار خود را آسان می کند. در مثال زیر، چندین پارامتر برای تلاش مجدد را مشخص می کنیم:

@shared_task(autoretry_for=(GatewayError,),
               retry_backoff=60,
               retry_kwargs={'max_retries': 5},
               retry_jitter=True)
              def fetch_feed(feed_id):
                  ...

آرگومان autoretry_for همه مواردی که در صورت خطا Celery باید به طور خودکار برای آنها امتحان کند فهرست می کند. در این مورد، فقط یک خطای GatewayError است. همچنین می توانید کلاس پایه خطا را در اینجا به autoretry_for همه ی خطاها ذکر کنید.

آرگومان retry_backoff مدت زمان انتظار اولیه را قبل از اولین تلاش مجدد، (60 ثانیه) مشخص می کند. هر بار که تلاش مجدد با شکست مواجه می شود، دوره انتظار دو برابر می شود، بنابراین دوره انتظار به 120، 240 و 360 ثانیه تبدیل می شود تا زمانی که به حداکثر محدودیت تلاش مجدد 5 برسد.

این تکنیک انتظار طولانی‌تر و طولانی‌تر برای تلاش مجدد، exponential backoff نامیده می‌شود. این برای تعامل با یک سرور خارجی ایده آل است زیرا ما به سرور خارجی زمان کافی برای بازیابی می دهیم.

یک نوسان تصادفی(آرگومان jitter) برای جلوگیری از مشکلی موسوم به thundering herds اضافه شده است. اگر تعداد زیادی از کارها الگوی تکرار مجدد یکسانی داشته باشند و در همان زمان منبعی را درخواست کنند، ممکن است آن را غیرقابل استفاده کند.

از این رو، یک عدد تصادفی به دوره انتظار اضافه می شود تا چنین اتفاقاتی رخ ندهد.

در اینجا مثالی از تلاش مجدد دستی در صورت خطا آورده شده است:

@shared_task(bind=True)
    def fetch_feed(self, feed_id):
        ...
        try:
            ...
        except (GatewayError) as exc:
            raise self.retry(exc=exc)

به آرگومان bind در دکوریتور و یک آرگومان self جدید برای task توجه کنید که یک آبجکت از task خواهد بود. اگر خطایی رخ داد، می‌توانید به صورت دستی دوباره متد self.retry صدا کنید. آرگومان exc برای ارسال اطلاعات خطا که می تواند در لاگ ها استفاده شود آمده است.

آخرین موردی که باید بررسی کنید که کم اهمیت هم نیست این است است که تمام خطاها جایی ثبت شود. برای این کار می توانید از ماژول لاگ استاندارد پایتون یا تابع چاپ (که به لاگ ها هدایت می شود) استفاده کنید. از ابزاری مانند Sentry برای ردیابی و مدیریت خودکار خطا استفاده کنید.

تسک های خودتوان

همانطور که دیدیم، کارهای Celery ممکن است چندین بار مجدداً راه اندازی شوند، به خصوص اگر acknowledgment با تاخیر را فعال کرده باشید. این امر کنترل side effect را یک کار مهم می کند. از این رو، celery توصیه می کند که همه کارها باید خودتوان باشد. خودتوان یک امر ریاضی است ویژگی یک تابع که است که اطمینان می دهد اگر با همان آرگومان ها فراخوانی شود، بدون توجه به اینکه چند بار آن را فراخوانی کنید، همان نتیجه را برمی گرداند.

ممکن است نمونه‌های ساده‌ای از عملکردهای بی‌توان را در خود مستندات Celery دیده باشید، مانند این:

@app.task def add(x, y):

    return x + y

مهم نیست که چند بار این تابع را فراخوانی کنیم، نتیجه add(2,2) همیشه 4 است.

با این حال، درک تفاوت بین یک تابع خودتوان و تابعی که عوارض جانبی(side effect) ندارد (تابع خالص یا پوچ) مهم است. توابع خودتوان همواره یک عارضه جانبی(side effect) ثابت دارند ، صرف نظر از اینکه یک بار یا چند بار صدا زده شده اند

با این حال درک تفاوت یک یک فانکشن خود توان و یک فانکشن معمولی که side effects ندارد مهم است. موضوع side effects یک فانکشن خودتوان صرف نظر از اینکه چندبار صدا زده شود یکسان خواهد بود.

برای مثال، تسکی که همیشه هنگام فراخوانی یک آبجکت از نوع order ایجاد می کند، یک فانکشن خودتوان نیست، اما تسکی که یک سفارش موجود را لغو می کند، خودتوان است. عملیات هایی که فقط حالت اشیا را می خوانند و هیچ اثر جانبی(side effect) ندارند، یک فانکشن خودتوان نیستند.

از آنجایی که معماری celery متکی بر تسک های خودتوان است، مهم است که سعی کنید تمام عوارض جانبی یک تسکی که خودتوان نیست را مطالعه کنید و آن را به یک کار غیر خودتوان تبدیل کنید. می‌توانید این کار را با بررسی اینکه آیا تسک ها قبلاً اجرا شده‌اند (اگر اجرا شده‌اند، پس تسک حذف شده است) یا با بررسی اینکه آیا نتیجه تسک در یک آدرس یونیک بر اساس آرگومان‌ها ذخیره شده است یا نه انجام دهید. در قسمت های بعدی کتاب (بخش Avoid writing to shared or global state) یک مثال ازین مورد آورده شده است.

در نتیجه، تسک خود را چندین بار صدا کنید تا بررسی کنید که آیا سیستم شما در همان حالت باقی می ماند یا خیر.

از عملیات نوشتن بر روی دیتابیس در فانکشن هایی به صورت shared یا global خودداری کنید

در یک سیستم همزمان، می توانید چندین خواننده داشته باشید. با این حال، لحظه‌ای که نویسندگان زیادی به یک وضعیت مشترک دسترسی پیدا می‌کنند، ممکن است دچار بن بست (deadlocks) یا شرایط رقابتی(race condition) آسیب‌پذیر می‌شوید. برای پرهیز از همه اینها کمی برنامه ریزی و نبوغ لازم است.

ابتدا بیایید سعی کنیم شرایط رقابتی(race condition) را درک کنیم. یک تسک celery به نام A را در نظر بگیرید که پردازش تصویر انجام می دهد (مانند تطبیق چهره شما با یک فرد مشهور). این تسک، ده تصویر قدیمی آپلود شده را انتخاب می کند و یک شمارنده global را آپدیت می کند.

ابتدا مقدار شمارنده را از یک پایگاه داده می خواند، آن را با تعداد تطابق تصویر موفق افزایش می دهد و سپس مقدار قدیمی را با مقدار جدید بازنویسی می کند. تصور کنید که یک کار یکسان دیگر B را به موازات شروع می کنیم تا سرعت تبدیل ها را افزایش دهیم.

حال اگر A و B شمارنده را دقیقاً همزمان از روی دیتابیس بخوانند، تا پایان کار مقدار یکدیگر را بازنویسی می کنند، بنابراین مقدار نهایی بر اساس اینکه چه کسی در پایان می نویسد خواهد بود. در واقع، مقدار شمارنده global بسیار به ترتیب اجرای وظایف بستگی دارد. بنابراین، این کار باعث ایجاد شرایط مسابقه منجر به داده های نامعتبر یا خراب می شود.

البته، مسئله واقعی این است که تسک ها از یکدیگر آگاه نیستند و ایجاد یک lock ساده ممکن است آن را حل کند، اما lock ها یا سایر اصول اولیه همگام سازی مشکلات خاص خود را دارند، مانند گرسنگی(starvation) یا بن بست (Deadlock).

یک راه حل عملی، درج وضعیت هر تصویر در یک جدول دیتابیس است که با شناسه منحصر به فرد یک تصویر مانند مقدار هش یا مسیر فایل آن ایندکس گذاری شده است:

Image hash Competed at Matched image path
SHA256: b4337bc45a8f... 2018-02-09T15:15:11+05:30 /celeb/7112.jpg
SHA256:550cd6e1e8702... 2018-02-09T15:17:24+05:30 /celeb/3529.jpg

شما می توانید تعداد کل عکس هایی که با hash تطابق دارند را با شمارش ردیف های این جدول بیابید. علاوه بر این، این رویکرد به شما امکان می دهد تطابق های موفق را بر اساس تاریخ یا زمان تقسیم کنید

با این کار دیگر شرایط رقابتی (race condition) نخواهیم داشت، زیرا ما یک وضعیت global را بازنویسی نمی کنیم. تنها امکان بازنویسی یک حالت مشترک زمانی است که دو یا چند کار تصویر یکسانی را برای پردازش انتخاب کنند. حتی اگر این اتفاق بیفتد، هیچ خرابی داده وجود ندارد زیرا نتیجه یکسان است و نتیجه آخرین کاری که باید تمام شود نتیجه نهایی خواهد شد.

به روز رسانی پایگاه داده بدون شرایط رقابتی(race condition)

ممکن است با موقعیت‌هایی مواجه شوید که مجبور باشید یک آبجکت را در یک تابع مشترک (shared function) در دیتابیس به‌روزرسانی کنید. اگر دیتابیس شما از قابلیت row-level locks پشتیبانی کند میتوانید ازین مورد یا از اشیا Django F() استفاده کنید قایل ذکر است دیتابیس mysql چون از موتور MyISAM استفاده می کند این قابلیت را پشتیبانی نمی کند.

از قابلیت row-level locks در جنگو با استفاده از متد select_for_update() در کوئری ست میتوانید استفاده کنید. مثال زیر را ببینید:

with transaction.atomic():
    feed = Feed.objects.select_for_update().get(id=id)
    feed.html = sanitize(feed.html)
    feed.save()

با استفاده از select_for_update() سطر های آبجکت Feed اصطلاحا قفل (lock) میشود که تا زمانی که ترنزکشن ما تمام نشد اجازه ایجاد و تغییر در دیتابیس نداشته باشیم. اگر نخ یا فرآیند دیگری قبلاً همان ردیف را قفل کرده باشد، کوئری منتظر می ماند یا کوِئری تا زمانی که lock در دیتابیس تمام شود مسدود می شود. این رفتار را می توان با استفاده از پارامترهای کلمه کلیدی select_for_update تغییر داد تا یک خطا ایجاد کند یا در صورت قفل از آن رد شود.

اگر عملیات روی یک فیلد را بتوان در دیتابیس انجام داد بهتر است از فانکشن F() برای جلوگیری از شرایط رقابتی(race condition) استفاده شود. فانکشن F() از گرفتن دیتا از دیتابیس و انتقال به مموری پایتون برای انجامیک عملیات جلوگیری می کند و این کار را مستقیم بر روی دیتابیس انجام می دهد. به مثال زیر توجه کنید:

from django.db.models import F
feed = Feed.objects.get(id=id)
feed.subscribers = F('subscribers') + 1
feed.save()

زمانی که save( ) صدا زده می شود عملیات اضافه کردن مقدار 1 به subscribers در خوددیتابیس انجام می شود و در هیچ کجا دیتایی از آبجکت Feed از دیتابیس گرفته نمی شود و مستقیما مقدار آپدیت می شودئ و احتمال وقوع شرایط رقابتی(race condition) بسیار کم است.

از انتقال اشیا پیچیده به task خودداری کنید

به راحتی می توان فراموش کرد که هر بار که ما یک task Celery را فراخوانی می کنیم، آرگومان ها قبل از اینکه وارد صف شوند سریالایز می شوند. از این رو، ارسال یک شی ORM جنگو یا هر شی بزرگی که ممکن است صف ها را مسدود کند، توصیه نمی شود.

دلیل خوب دیگری برای اینکه از ارسال یک شی پایگاه داده به تسک ها. با توجه به ماهیت ناهمزمان تسک ها، داده‌ها می‌توانند تا زمانی که تسک ها انجام شوند قدیمی شوند. رکورد ممکن است تغییر کرده یا حتی حذف شده باشد.

بنابراین، همیشه یک کلید اصلی(primary key) یا مقدار جستجو(lookup value) را ارسال کنید و آخرین مقدار شی را از پایگاه داده بازیابی کنید. داکیومنت celery به این امر اشاره می کند که وظیفه اپلیکیشن بر عهده آن است. مطمئن شوید اپلیکیشن شما یک اپلیکیشن بروز است، نه یک اپلیکیشن قدیمی.

درک asyncio

ماژول asyncio یک کتابخانه چندوظیفه ای مشترک است که از نسخه 3.6 در پایتون موجود است. ابزار Celery برای اجرای وظایف همزمان خارج از یک فرآیند فوق‌العاده است، اما زمان‌های خاصی وجود دارد که باید چندین نخ(thread) اجرایی را در یک فرآیند اجرا کنید.

اگر با مفاهیم async/wait آشنا نیستید (مثلاً در جاوااسکریپت یا سی شارپ)، این موضوع شامل کمی منحنی یادگیری (موضوعی تقریبا سخت برای یادگیری) است. با این حال، ارزش وقت شما را دارد، زیرا می تواند سرعت کد شما را به شدت افزایش دهد (مگر اینکه کاملاً به CPU محدود شده باشد). علاوه بر این موارد celery به درک سایر کتابخانه های ساخته شده وابسته به این ابزار مثل django-channels کمک می کند.

همه برنامه‌های asyncio توسط یک حلقه رویداد هدایت می‌شوند، این حلقه تقریباً یک حلقه بی‌نهایت است که همه کوروتین‌های ثبت‌شده را به ترتیب فراخوانی می‌کند. هر کوروتین با دادن کنترل(yield) به کوروتین های همکار در مکان های تعریف شده، به صورت مشارکتی عمل می کند.به این عمل awaiting گفته می شود.

یک کوروتین مانند یک تابع است که می تواند اجرای یک برنامه را به حالت تعلیق درآورد و یا از سر بگیرد و مانند نخ (thread) های سبک کار میکند.

هر کوروتین ذاتی از کلمات کلیدی async و await به مانند کد زیر استفاده می کند:

import asyncio
async def sleeper_coroutine():
    await asyncio.sleep(5)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(sleeper_coroutine())

این یک نمونه حداقلی از یک حلقه رویداد است که یک کوروتین به نام sleeper_coroutine را اجرا می کند. هنگامی که تابع فراخوانی می شود، این coroutine تا دستور await اجرا می شود و کنترل را به حلقه رویداد باز می گرداند. این معمولاً جایی است که یک عملیات I/O رخ می دهد.

هنگامی که عملیات مورد انتظار تکمیل شد (پس از 5 ثانیه)، کنترل در همان خط کوروتین باز می گردد. سپس، کوروتین برگشت داده می شود یا تکمیل شده در نظر گرفته می شود.

asyncio دربرابر threads

اگر روی کد چند نخی کار کرده اید، ممکن است تعجب کنید که چرا نباید فقط از نخ ها استفاده کرد؟ دلایل متعددی وجود دارد که چرا نخ ها در پایتون محبوب نیستند.

در مرحله اول، نخ ها باید در مواقع دسترسی به منابع مشترک همگام(sync) شوند، در غیر این صورت شرایط رقابتی (race condition) پیش می آید. انواع مختلفی از همگام سازی اولیه مانند قفل کردن(lock) وجود دارد، اما اساسا، آنها شامل انتظار(waiting) هستند، که عملکرد را کاهش می دهد و می تواند باعث ایجاد شرایط بن بست یا گرسنگی(deadlocks, starvation) شود.

کوروتین آدرس کاملاً مشخصی دارد که در آنجا اجرا میشوند. در نتیجه، تا زمانی که کوروتین در حالت شناخته شده(known state) باشند، می توانید تغییراتی در یک حالت اشتراکی ایجاد کنید. به عنوان مثال، می‌توانید یک فیلد را از یک پایگاه داده بازیابی کنید، محاسبات را انجام دهید، و فیلد را بازنویسی کنید، بدون اینکه نگران باشید که یک برنامه دیگر در این بین ارتباط شما را قطع کرده باشد.

دوم اینکه کوروتین ها سبک هستند. هر کوروتین به طور قابل توجهی به حافظه کمتری نسبت به یک نخ نیاز دارد. اگر بتوانید حداکثر صدها نخ را اجرا کنید، با توجه به حافظه یکسان بین کوروتین و نخ، با این حافظه می توانید ده ها هزار کوروتین را اجرا کنید. سوئیچ کردن بین نخ ها هم مقداری زمان می برد (چند میلی ثانیه). این بدان معنی است که د این زمانی که برای سوِیچ تلف می شود میتوانید کارهای بیشتری را انجام دهید یا به کاربران همزمان بیشتری سرویس دهید.

از نکات منفی کوروتین ها می توان به این اشاره کرد که نمیتوان همزمان از شیوه blocking and non-blocking در کد استفاده کرد. بنابراین هنگامی که وارد حلقه رویداد می شوید، بقیه کدها باید به سبک ناهمزمان نوشته شوند، حتی کتابخانه هایی که استفاده می کنید. این ممکن است استفاده از برخی کتابخانه های قدیمی با کد همزمان را کمی دچار مشکل کند.

مثالی از یک نمونه قدیمی برای وب اسکرپینگ

بیایید به مثالی نگاه کنیم که چگونه می توانیم کدهای همزمان را به ناهمزمان تبدیل کنیم. ما به یک وب اسکرپر نگاه خواهیم کرد که صفحات را از چند URL بارگیری می کند و اندازه آنها را اندازه می گیرد. این یک مثال مناسب است زیرا تعداد زیادی عملیات ورودی/خروجی در این مثال داریم و هنگام استفاده کانکارنسی سرعت کار به مقدار قابل توجهی افزایش می باید.

وب اسگرپینگ به صورت همزمان (Synchronous)

اسکرپ کردن به صورت همزمان فقط از کتابخانه های استاندارد پایتون مانند urllib استفاده می کند.تکه کد پایین صفحه اصلی سه سایت محبوب دانلود می کند و یک سایت دیگر که زمان بارگذاری آن را می توان برای شبیه سازی یک اتصال کُند به تاخیر انداخت. این کد اندازه صفحات مربوطه و کل زمان اجرا را چاپ می کند.

کد در آدرس src/extras/sync.py نیز آورده شده است:

"""Synchronously download a list of webpages and time it"""

from urllib.request import Request, urlopen
from time import time
sites = [
"http://news.ycombinator.com/",
"https://www.yahoo.com/",
"http://www.aliexpress.com/",
"http://deelay.me/5000/http://deelay.me/",
]
def find_size(url):
  req = Request(url)
  with urlopen(req) as response:
    page = response.read()
    return len(page)
def main():
    for site in sites:
        size = find_size(site)
        print("Read {:8d} chars from {}".format(size, site))
if __name__ == '__main__':
    start_time = time()
    main()
    print("Ran in {:6.3f} secs".format(time() - start_time))

در یک لپ تاپ آزمایشی، اجرای این کد 17.1 ثانیه طول کشید. این زمان بارگذاری کلی هر سایت است. بیایید ببینیم کد ناهمزمان چگونه اجرا می شود.

وب اسگرپینگ به صورت ناهمزمان (Asynchronous)

این کد asyncio نیاز به نصب چند کتابخانه ناهمزمان پایتون، مانند aiohttp و aiodns دارد. آنها در مستندات ذکر شده اند.

کد زیر در آدرس src/extras/async.py نیز آورده شده است. ساختار کد به گونه ای است که تا حد امکان به نسخه sync نزدیک باشد تا مقایسه آن آسان تر باشد:

"""Asynchronously download a list of webpages and time it Dependencies: Make sure you install aiohttp

import asyncio
import aiohttp
from time import time

sites = [
    "http://news.ycombinator.com/",
    "https://www.yahoo.com/",
    "http://www.aliexpress.com/",
    "http://deelay.me/5000/http://deelay.me/",
]
async def find_size(session, url):
    async with session.get(url) as response:
        page = await response.read()
        return len(page)

async def show_size(session, url):
    size = await find_size(session, url)
    print("Read {:8d} chars from {}".format(size, url))

async def main(loop):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for site in sites:
            tasks.append(loop.create_task(show_size(session, site)))
        await asyncio.wait(tasks)

if __name__ == '__main__':
    start_time = time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    print("Ran in {:6.3f} secs".format(time() - start_time))

تابع اصلی یک کوروتین است که باعث ایجاد یک کوروتین جداگانه برای هر وب سایت می شود. این کوروتین صبر میکند تا تمام کوروتینهایی که ایجاد شده اند تکیمل شوند و به پایان برسند. به عنوان یک best practices آبجکت session ایجاد شده است و به عنوان آرگومان پاس داده می شود تا از ساخت مجدد یک session برای هر صفحه جلوگیری شود.

کل زمان اجرای این برنامه روی همان لپ تاپ آزمایشی 7.5 ثانیه است. این افزایش سرعت 2.3 برابری روی یک هسته است. همانطور که در نمودار زیر نشان داده شده است، اگر بتوانیم نحوه صرف زمان را تجسم کنیم، این نتیجه شگفت انگیز را بهتر می توان درک کرد:

اسکرپینگ به صورت synchronous به راحتی قایل درک است .اسکراپر سنکرون به راحتی قابل درک است. هر کار منتظر تکمیل کار قبلی است. هر کار به زمان CPU بسیار کمی نیاز دارد و بیشتر زمان در انتظار رسیدن داده ها از شبکه صرف می شود. در نتیجه، وظایف به طور متوالی مانند یک آبشار انجام می شوند.

از طرف دیگر، اسکرپینگ به صورت Asynchronous اولین وظیفه را شروع می کند و به محض اینکه شروع به انتظار برای I/O کرد، به کار بعدی سوئیچ می کند. CPU به ندرت کاری انجام نمی دهد زیرا به محض شروع انتظار، اجرا به حلقه رویداد برمی گردد. در نهایت، I/O در همان مدت زمان کامل می شود، اما به دلیل چندگانه شدن فعالیت، زمان کلی صرف شده به شدت کاهش می یابد.

در واقع، کد ناهمزمان را می توان بیشتر سرعت بخشید. حلقه رویداد کتابخانه استاندارد asyncio با پایتون خام نوشته شده و به عنوان پیاده سازی مرجع ارائه می شود. می‌توانید پیاده‌سازی‌های سریع‌تری مانند uvloop را برای افزایش سرعت بیشتر در نظر بگیرید.

همزمانی(Concurrency) به معنای موازی(parallelism) نیست

همزمانی توانایی انجام کارهای دیگر در زمانیکه شما منتظر اتمام کار فعلی هستید می باشد. تصور کنید که در حال پختن غذاهای زیادی برای مهمانان هستید. در حالی که منتظر برای پختن چیزی هستید، می توانید کارهای دیگری مانند پوست کندن پیاز یا بریدن سبزیجات را انجام دهید.برای ایجاد یک قیاس در دنیای ابرقهرمانان یک ابرقهرمان ممکن است در یک مکان با چند نفر بجنگد، زیرا اکثر آنها پس از یک ضربه بهبود پیدا میکنند، یا مجددا به ابرقهرمان برای رویارویی میرسند(یا منتظر نوبت خود می مانند)، که باعث می‌شود قهرمان ما ضربه‌ها را یکی پس از دیگری وارد کند.

موازی سازی زمانی است که دو یا چند موتور اجرایی در حال انجام یک کار هستند. در ادامه قیاس ما، این زمانی است که دو یا چند ابرقهرمان به عنوان یک تیم با دشمنان مبارزه می کنند. این نه تنها یک فرصت عالی برای فرانچایز سینما است، بلکه سازنده تر از یک قهرمان تک نفره است که با حداکثر کارایی کار می کند.

اشتباه گرفتن همزمانی و موازی بودن بسیار آسان است زیرا می توانند همزمان اتفاق بیفتند. شما می توانید همزمان وظایف را بدون موازی سازی یا برعکس انجام دهید، اما آنها به دو چیز متفاوت اشاره دارند. همزمانی روشی برای ساختاربندی برنامه های شما است، در حالی که موازی سازی به نحوه اجرای آن اشاره دارد.

به دلیل ویژگی global interpreter lock (GIL) پایتون ما نمی‌توانیم بیش از یک رشته از مفسر پایتون (به طور خاص، مفسر استاندارد CPython) را در یک زمان اجرا کنیم، حتی در سیستم‌های چند هسته‌ای. این ویزگی باعث می شود مقدار موازی‌سازی را که می‌توانیم با یک نمونه از فرآیند پایتون به دست آوریم، محدود کند.

استفاده بهینه از منابع محاسباتی شما به parallelism و concurrency نیاز دارد. concurrency به شما کمک می‌کند از مسدود کردن هسته پردازنده در زمان انتظار مثلاً رویدادهای I/O جلوگیری کنید، در حالی که parallelism به توزیع کار بین تمام هسته‌های موجود cpu کمک می‌کند.

بر خلاف تصور در هر دو مورد parallelism و concurrency شما به صورت همزمان اجرا نمی‌کنید، یعنی قبل از اینکه به کار دیگری بروید، منتظر پایان کار هستید.سیستم های ناهمزمان ممکن است بهینه ترین به نظر برسد. با این حال، ساختن و استدلال کردن آنها دشوارتر است.

ورود به کانال ها

ماژول django channles در ابتدا برای حل مشکل مدیریت پروتکل های ارتباطی ناهمزمان، مانند WebSockets ایجاد شد.وب اپلیکیشن ها رفته رفته قابلیت هایی مانند چت و اعلان های آنی را ارائه کردند. برای حل این مشکل روش های مختلفی برای پشتیبانی نیازمندی های این قابلیت ها در جنگو اراِه شده است مثل اجرای سرورهای سوکت جداگانه یا سرورهای پراکسی.

پروژه Channels یک پروژه رسمی جنگو است،این پروژه نه فقط برای مدیریت WebSocket ها و دیگر فرم های ارتباط دو طرفه، بلکه برای اجرای وظایف پس زمینه به صورت ناهمزمان هم کابرد دارد.

در زمانی نگارش مقاله، Django Channels 2 منتشر شد، بر روی مقاله یک بازنویسی کامل بر اساس کوروتین های async/wait Python 3 است.

در اینجا یک بلوک دیاگرام ساده از راه اندازی کانال های معمولی آمده است:

یک کلاینت، مانند یک مرورگر وب، هم HTTP/HTTPS و هم WebSocket را به یک سرور Asynchronous Server Gateway Interface (ASGI) مانند Daphene ارسال می کند. مانند وب سرور ASGI ،WSGI یک روش متداول برای تعامل برنامه های سمت سرور و دیگر اپلیکیشن ها به صورت Asynchronous هست.

مانند یک برنامه متداول جنگو، ترافیک HTTP به صورت synchronous مدیریت می‌شود، یعنی وقتی مرورگر درخواستی را ارسال می‌کند، منتظر می‌ماند تا به جنگو هدایت شود و پاسخی ارسال شود. با این حال، زمانی که ترافیک WebSocket اتفاق می افتد بسیار جالب تر می شود، زیرا می تواند از هر جهت فعال شود.

هنگامی که یک اتصال WebSocket برقرار شد، یک مرورگر می تواند پیام ارسال یا دریافت کند.یک پیام ارسال شده به یک روتر از نوع پروتکل می رسد که بر اساس پروتکل حامل، کنترلر روتر بعدی را تعیین می کند. از این رو، می توانید یک روتر برای HTTP و دیگری برای پیام های WebSocket تعریف کنید.

این روترها بسیار شبیه به URLهای جنگو هستند، اما پیام‌های دریافتی را به یک consumer (به‌جای یک view) می‌رسانند.یک consumer مانند یک کنترل کننده رویداد است که به رویدادها واکنش نشان می دهد. همچنین می‌تواند پیام‌ها را به مرورگر ارسال کند و در نتیجه یک ارتباط کاملاً دوطرفه را میتواند در خود جای دهد.

یک consumer کلاسی است که می توانید متدهای آن را به عنوان توابع معمولی پایتون (synchronous) یا به صورت انتظار (asynchronous) بنویسید.یک کد asynchronous نباید با کد synchronous ترکیب شود، بنابراین توابع تبدیلی برای تبدیل از synchronous به asynchronous و بالعکس وجود دارد. به یاد داشته باشید که قسمت های مختلف جنگو synchronous هستند. یک consumer در واقع یک برنامه معتبر ASGI است.

تاکنون از لایه Channel استفاده نکرده ایم. از قضا می توانید برنامه های Channel را بدون استفاده از Channels بنویسید! با این حال، آنها منحصرا مفید نیستند زیرا به جز نمونه برداری (polling) دیتابیس هیچ مسیر ارتباطی آسانی بین نمونه های برنامه وجود ندارد. کانال ها دقیقاً همین را ارائه می دهند، پیام رسانی سریع point-to-point و انتشار در بین نمونه های برنامه.

یک کانال مانند یک لوله است. یک فرستنده از یک طرف پیامی به این لوله می فرستد و در انتهای دیگر به شنونده می رسد. یک گروه مجموعه ای از کانال ها را تعریف می کند که همگی به یک موضوع گوش می دهند. هر consumer به کانالی که خودش تولید کرده با ویژگی(attribute) self.channel_name به آن دسترسی دارد گوش(listen) می دهد.

علاوه بر موضوع انتقال پیام، می‌توانید با ارسال پیام، یک consumer را فعال کنید که به کانال گوش می‌دهد و در نتیجه یک کار که در پس‌زمینه اجرا میشود را شروع کنید. این به عنوان یک سیستم پس زمینه بسیار سریع و ساده کار می کند.

گوش دادن به اعلان ها با WebSockets

به جای مثال چت معمولی، بیایید به مثالی بهتر نگاه کنیم که یک شبکه اجتماعی بهتر channelها را نشان می دهد . یک اپلیکیشن اطلاع رسانی. این برنامه هر زمان که نوع خاصی از یک مدل در دیتابیس ذخیره شود را شناسایی می‌کند و یک اعلان را به همه مشتریان (یعنی مرورگرهای همه کاربران متصل) در زمان واقعی ارسال می‌کند.

با فرض اینکه Channels به درستی نصب و پیکربندی شده باشد، باید تمام مسیرهای از نوع پروتکل را در فایل routing.py به صورت زیر تعریف کنیم:

from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from notifier.consumers import NotificationConsumer
application = ProtocolTypeRouter({
    "websocket": URLRouter([
        path("notifications/", NotificationConsumer),
    ]),
})

درخواست های HTTP به طور پیش فرض به جنگو ارسال می شود. این ما را به تکه کد consumer هدایت می کند که در خود برنامه اعلان به عنوان customers.py قرار دارد:

from channels.generic.websocket import AsyncJsonWebsocketConsumer
class NotificationConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.channel_layer.group_add("gossip", self.channel_name)

async def disconnect(self, close_code):
    await self.channel_layer.group_discard("gossip", self.channel_name)

async def name_gossip(self, event):
    await self.send_json(event)

برای راحتی، ما از یک کلاس consumer عمومی به نام AsyncJsonWebsocketConsumer استفاده می کنیم که ارتباطات WebSocket را با سریالایز کردن به فرمت JSON و بالعکس مدیریت می کند.

متد connect به سادگی یک اتصال را می پذیرد و کانال آن را به گروه gossip Channel اضافه می کند. اکنون، هر پیامی که به این گروه ارسال شود، با نام مناسب متدش consumer را فراخوانی می کند.

ما فقط پیام هایی را میپذریم که از نوع name.gossip باشند. از این رو، ما روشی به نام name_gossip ایجاد کرده‌ایم (نقاط به زیرخط ترجمه شده اند) این روش به سادگی شی ءای که دارای یک رویداد (event) باشد را به WebSocket ارسال می کند که توسط مرورگر دریافت می شود.

متد disconnect تضمین می کند که کانال consumer از گروهی که ایجاد کرده بودیم حذف می شود زمانی که اتصال بسته می شود. بنابراین ما فقط channel های فعال در گروه خواهیم داشت.

با آغاز رویداد(event) پازل ما تکمیل می شود. کد زیر را در فایل signals.py داریم:

from .post.models import Post
from django.db.models.signals import pre_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
@receiver(pre_save, sender=Post)
def notify_post_save(sender, **kwargs):
    if "instance" in kwargs:
    instance = kwargs["instance"]
    # check if it is a new post
    ...
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "gossip", {"type": "name.gossip",
                  "event": "New Post",
                  "sender": instance.posted_by.get_full_name(),
                  "message": instance.message})

ما یک hook اضافه می کنیم تا این hook زمانی که یک شی Post (که می تواند هر شیئی برای آن موضوع باشد) ایجاد شد صدا زده شود. پس از آنجایی که ما فقط به پست های جدید علاقه مند هستیم، ویرایش پست های موجود را بررسی کرده و نادیده می گیریم.

قبل از ارسال هر چیزی به کانال، باید channel_layer را بازیابی کنیم. سپس باید از روش group_send برای ارسال پیام به گروه gossip استفاده کنیم. با این حال، این یک متد asynchronous است و ما در دنیای جنگو هستیم، بنابراین به صورت synchronous اتفاق می‌افتد. از این رو، ما فانکشن را با استفاده از مبدل async_to_sync تبدیل می کنیم و آن را تا زمانی که تابع async مقداری را برگرداند مسدود می کنیم.

همانطور که ممکن است توجه کرده باشید، Channels از الگوی انتشار-اشتراک(sub/pub) استفاده می کند. طراحی کانال ها عمداً از انتظار برای یک رویداد اجتناب می کند و از این رو از بن بست(Deadlock) جلوگیری می کند. با استفاده از asyncio، می‌توانیم برنامه‌های asynchronous واقعی را با جنگو بسازیم.

تفاوت ها با ابزار celery

با توانایی اجرای وظایف در پشت صحنه(background) با استفاده از workerها ممکن است به طور طبیعی گیج شوید که شاید Channels بتواند جایگزین Celery شود. در درجه اول دو تفاوت عمده وجود دارد: تضمین های تحویل پیام و وضعیت وظایف.

کانال‌هایی که در حال حاضر با یک Backend Redis پیاده‌سازی می‌شوند، در بهترین حالت برای یکبار ضمانت ارسال پیام را ارائه می‌دهند، در حالی که Celery حداقل یک ضمانت را ارائه می‌دهد. این اساساً به این معنی است که Celery زمانی که تحویل ناموفق باشد تا زمانی که پیغام موفقیت آمیز دریافت کند دوباره تلاش خواهد کرد. در مورد کانال ها، تقریباً این اتفاق نمی افتد.

ثانیاً، Channels اطلاعاتی در مورد وضعیت یک کار خارج از وظایفش ارائه نمی دهد. ما باید خودمان چنین عملکردی را بسازیم به عنوان مثال با به روز رسانی پایگاه داده. وضعیت وظایف Celery را می توان پرس و جو کرد و آن ها حفظشان کرد.

برای جمع‌بندی، می‌توانید از Channels به جای Celery برای موارد استفاده کمتر مهم استفاده کنید. با این حال، برای یک راه حل قوی تر و اثبات شده، باید به Celery اعتماد کنید.

خلاصه

در این فصل، روش‌های مختلفی را برای پشتیبانی از اجرای ناهمزمان(asynchronous) در جنگو بررسی کردیم. آنها انتزاعات قدرتمندی را در جنگو برای ایجاد برنامه‌هایی ارائه می‌کنند که می‌توانند از اعلان‌های آنی پشتیبانی کنند، پیشرفت یک کار آهسته را نمایش دهند، با سایر کاربران ارتباط برقرار کنند یا کارهای پشت صحنه (background) را اجرا کنند.

از زمان گذشته، Celery ابزار انتخابی برای فعالیت های ناهمزمان(asynchronous) بوده است. با این حال، کانال ها راه حل سبک تر و محکم تری را ارائه می دهند. هر دو کاربرد خود را دارند و می توان از آنها در یک پروژه استفاده کرد. در نتیجه از ابزار مناسب برای کار استفاده کنید!

در فصل بعدی، به معنای RESTful API ها و نحوه پیاده سازی آن ها در جنگو با استفاده از بهترین شیوه های فعلی خواهیم پرداخت.