ריבוי משימות בארדואינו

לפני מספר ימים פרסם חבר טוב סרטון ביוטיוב שבו הוא מדגים שימוש בפונקצית yield אשר מוחבאת בתוך פקודת delay כדי להריץ שתיי משימות  במקביל.

גם מיקום הקריאה, וגם המינוח yield (״תן זכות קדימה״ בתרגום חופשי) עוררו בי את החשד שמדובר בתשתית למתזמן (scheduler) עבור מערכת הפעלה. חיפוש קצר באינטרנט אישר את החשד. מדובר בספרית ריבוי משימות, אך כרגע רק לארכיטקטורות 32 ביט של ATMEL ולכן מוגבל בלוחות שנתמכים.

רציתי לכתוב גרסא גם למעבדי הAVR, ולכן התחלתי לקרוא יותר על הארכיטקטורה שלו. תוך כדי מצאתי מימוש שמישהו כתב גם למעבדים ה״רגילים״. המימוש של שנינו דומה מאוד, כך שעבודת הבדיקה נחסכה ממני. אנסה להסביר כיצד זה עובד…

נתחיל בהבנת תפקידו של המתזמן: כאשר ישנו רק מעבד אחד, הוא יכול בכל רגע נתון לבצע רק פקודת קוד אחת. אם נרצה להריץ מספר משימות במקביל, הן יצתרכו לחלק את זמן המעבד בינהן, תוך כדי שהן מתחלפות בינהן בתפקיד הקוד שרץ כרגע.  אם פעולה זו קוראת מספיק מהר, הרבה פעמים בשניה, זה נראה לצופה האנושי כאילו מספר תהליכים קורים במקביל. לספריה שקובעת איזה קוד ירוץ מתי קוראים מתזמן. לפעולת ההחלפה קוראים task switching.

ישנם שני סוגים של מתזמנים, שיתופי cooperative ו״מכת מנע״ preemptive. השיתופי כשמו כן הוא. החלפת התהליך מתבצעת באופן יזום על ידי התהליך עצמו כאשר הוא יודע שהוא ממתין לארוע עתידי (דוגמת השהיית delay, קבלת תקשורת או אות חיצוני) ולכן ״מוותר״ (yields) על השימוש במעבד עד שיצתרך לפעול שוב. במערכת preemptive ישנו שעון שכל כמה זמן יוצר פסיקה שמחליפה את התהליך הפעיל בלי קשר למצבו. במערכת הארדואינו בחרו בשיטה השיתופית, כנראה בעיקר בגלל פשטות המימוש. חסרונה של השיטה היא שהיא תלויה ב״רצון הטוב״ של הכותב וכך אם תהליך אחד בטעות נתקע, או סתם לא משחרר את המעבד לזמן ארוך, הוא למעשה חונק את התהליכים האחרים.

נקח לדוגמה את הקוד הבא (שיניתי קצת את הקוד המקורי לצורך בהירות)

#include &lt;SchedulerARMAVR.h&gt;<
int led1 = LED_BUILTIN; // more portable
int led2 = 12;
int led3 = 11;

void setup() {
  Serial.begin(9600);  pinMode(led1, OUTPUT);
  pinMode(led2, OUTPUT);
  pinMode(led3, OUTPUT);Scheduler.startLoop(loop2);
  Scheduler.startLoop(loop3);
}
void loop() {
  digitalWrite(led1, HIGH);
  Scheduler.delay(1000);
  digitalWrite(led1, LOW);
  Scheduler.delay(1000);
}
void loop2() {
  digitalWrite(led2, HIGH);
  Scheduler.delay(100);
  digitalWrite(led2, LOW);
  Scheduler.delay(100);
}
void loop3() {
  while (!Serial.available())
    yield();
  char c = Serial.read();
  if (c=='0') {
    digitalWrite(led3, LOW);
    Serial.println("Led turned off!");
  }
  if (c=='1') {
    digitalWrite(led3, HIGH);
    Serial.println("Led turned on!");
  }
}

התוכנה מתחילה כרגיל בsetup שיוצר שני תהליכים בנוסף לloop הראשי: loop2 ו loop3. הלופ הראשי מתחיל להתבצע ומדליק את לד 1. לאחר מכן מבקש התהליך השהייה של שניה. אם נתבונן במימוש של delay, נגלה כי זו למעשה לולאה שמחכה שיעבור הזמן שנתבקש. במקרה שלנו, כל עוד הזמן לא עבר, היא מוותרת על השימוש במעבד לטובת הtask-ים האחרים

void SchedulerClass::delay(uint32_t ms) {
  uint32_t end = millis() + ms;  
  while (millis() < end) 
    yield(); 
}

לכן עובר המעבד לשימוש התהליך של loop2, שמדליק גם הוא לד, ומוותר על המעבד לטובת loop3 שבודק האם הגיע תקשורת סיראלית. אם הגיע תקשורת הוא ממשיך בקריאת הפקודה שנשלחה וביצועה. אם לא, גם הוא מוותר על המעבד והתור חוזר לתהליך הראשון וחוזר חלילה… כאשר יעבור זמן ההשהיה שהגדיר תהליך 2, הוא יצא מלולאת הdelay, יכבה את הלד ויבקש delay נוסף….

במימוש הזה, תהליך בחירת הtask הבא להרצה הוא פשטני ומריץ את התהליכים אחד אחרי השני בסדר הופעתם ולכן נקרא round robin (קרוסלה). ניתן לממש אלגוריטמים מתוחכמים יותר הבוחרים את התהליכים לפי קריטריונים מסויימים, למשל לפי עדיפות.

עד כאן למדנו להשתמש במתזמן. אבל איך למעשה מתבצעת החלפת התהליך ? לצורך זה נצתרך להבין קצת כיצד פועל המעבד והקומפילר.

כאשר אנחנו עוברים מתהליך אחד לשני, אנחנו רוצים ״להקפיא״ את מצב המעבד כפי שהוא נמצא בנקודה מסויימת בביצוע, ולהחליפו במצב שהקפאנו בפעם האחרונה שתהליך הבא רץ בו. למעבד הAVR ישנם 32 רגיסטרים לשימוש כללי, מצביע למחסנית (Stack Pointer), ומצביע לפקודה הבאה לביצוע (Program Counter). למזלינו, אפילו אין צורך לשמור את כולם מכיוון שקומפילר הgcc שלנו חילק אותם לקבוצות.

  • רגיסטר R0 הוא רגיסטר שמוגדר לשימוש זמני בלבד ואין צורך לשמור אותו
  • רגיסטר R1 מוגדר תמיד כאפס
  • רגיסטרים R2-R17 ו R28-R29 מוגדרים כבטוחים לשימוש (call saved) וכל פונקציה רשאית להשתמש בהם אך נדרשת להחזירם כפי שקיבלה אותם.
  • רגיסטרים R18-R27 ו R30-R31 מוגדרים לשימוש פנימי של כל פונקציה (call used) והיא רשאית לעשות בה כרצונה ואין בטחון כי תשמר אותם.

בנוסף כדאי לשים לב למנגנון קריאה לסברוטינה של המעבד. כאשר אנו מבצעים פקודת RCALL המעבד שומר במחסנית את המיקום של הפקודה הנוכחית (PC) במחסנית. בסיום התת-שגרה פקודת RET מוציאה את ערך זה ומחזירה את הביצוע לנקודה ממנה יצאנו.

כאשר אנו קוראים לפקודה yield (בין אם באופן יזום, או בתוך delay) הפקודה שומרת את ערכי הרגיסטרים, מצביעי המחסנית ומצביע הביצוע באזור שהוקצע בזכרון לכל תהליך (נקרא לפעמים thread control block או TCB). לאחר מכן נבחר התהליך הבא, והערכים השמורים שלו מועתקים לרגיסטרים המתאימים. כאשר מתבצעת פקודת הRET המעבד חוזר לבצע את הנדרש בדיוק מאיפה שהיה בפעם האחרונה בtask החדש.

בשביל מה כל זה טוב בכלל ?

אין ספק כי אפשר לבצע את אותן פעולות בדיוק על ידי שימוש בלולאת ביצוע אחת וקצת תיכנות מתקדם. אבל החלוקה לתהליכים מאפשרת לנו לבנות את התוכנה באופן לוגי שמקל רבות על הכתיבה והתחזוק. וזה הצעד הראשון לתכנון טוב.