Open source scheduler

Has anyone came across an open source scheduler? Swim lanes sila2 drivers etc. Maybe academics are working on such a thing?

I have interfaces for a few different pieces of equipment in Python, I suppose organizing these into a library would count as a scheduler? Often, equipment companies are very locked down about letting people build open-source interfaces to their equipment. They will give you the documentation to create an interface, but to get it you have to go through several of the following steps:

  1. Contact a sales or customer support rep over email and await a response
  2. Prove that you own the equipment by providing a serial number
  3. Provide the name of the organization you are working for that owns the equipment
  4. Sign an NDA to not release the API

This is to obtain documentation for the interface to a piece of equipment. I believe one of the moats that scheduling companies have is the license to distribute interfaces to equipment. Just owning the equipment and building your own interface does not always give you that right.

However, many companies have no restriction on providing interfaces or documentation to their equipment. One could easily build open-source interfaces to these (some of which I have done already) and package these into a scheduler. What equipment are you interested in?

Are you aware of sila2?

They have set up standardised interfaces for device drivers and tried to encourage vendors to support it. As you describe its not so easy to align incentives in industry in the same way midi did.

That of course is drivers but what im asking about here is schedulers like green button go or cellario. Im wondering if anyone has worked on an open source equivalent of something like that. Something that can hook
Into sila2 drivers (for which there are a number of things) or homebrew drivers (there is usually a way to work around :))

Many SiLA drivers are not free or open-source, so it is not possible to distribute an open-source scheduler that relies on these drivers.

My scheduling/ equipment orchestration is all done in Python. I just write a script that calls the different interfaces as needed, with logic implemented in Python. As far as I know, a scheduler is just an implementation of different device drivers with some control logic, so once you have drivers/ interfaces then you can use a typical programming environment to tie everything together with logic.

@Dovod I’m curious what the desire for an open source scheduler is? Cost / customizability / new features that aren’t present on the two you mentioned?

A few reasons i suppose.

Might be fun (especially in 3d with godot)
Open up repurposing or purchase of second hand equipment
Lower initial barrier of entry into space as can go full DIY
Allow you to do something with sila2 drivers.

I think initial costs of things like ELNs and LIMS mean that a lot of companies build with what they have available. Normally excel and lab benches with humans behind them!

2 Likes

Green button go scheduler can run SILA drivers. We offer a driver development training, but being a commercial product far from open source. We do try to empower our customers to do their own thing if that is what they want to do. We have some distributors and other customers that develop their own drivers to control devices in GBG.

2 Likes

would something like this be useful: (https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=162089cd47bcb1cada5da4f786281af9ffc78885)? I was part of a project back in 2012 where a custom scheduler was created using Petri Nets as a backbone to interface with a Precise arm, combi, and Envision plate reader.

2 Likes

There have been many efforts in the past to write open-source schedulers/executors for the lab environment; I am trying to figure out why they never took off (e.g. Aquarium from Doug Densmore)

From the recent efforts, I would reach out to these two projects:

  • The SiLA 2 Manager by Lukas Bromig (he works at my former company UniteLabs now, not sure if they plan any open source efforts
  • LabOP execution engine, they seem to have funding problems (on paper, it seems to tick all boxes, but isn’t accessible enough yet?)

Have you looked at these and other past projects yet?

1 Like

SiLA 2 Manager seems really interesting, I’d love to learn more about that

Also, welcome to the forum Max!

1 Like

No, these look exactly like what i am looking for thank you.

Although if something commercial and battle tested like unite labs open sourced their code like blender did originally did then things could really get interesting (despite being an unlikely outcome!)

I haven’t read the whole thread, but an open source SiLA2 based scheduler is what Stefan Maak, PhD student in the Kiwi biolab is working on: Team , might reach out to his mentor Mark Dörr if really interested . I am also involved in the LabOP project mentioned above, which aims to eventually involve scheduling functionality but isn’t there yet.

2 Likes

Is anybody experienced with using the Hamilton Dynamic Scheduler in this regard? Is this worth looking into? I am just starting out trying to handle parallel processes.

I believe @KyleCook_GeNovu has extensively used the scheduler. Not sure that I would qualify it as an Open Source Scheduler.

We’ve used the Hamilton Scheduler for a lot of solutions. I’m not clear on what exactly you are trying to do so I can’t speak to if it is worth looking into or not.

It can be used without having a Hamilton liquid handler in the process and I’ve found it capable for everything we’ve had to do. It’s not open source and might be priced differently based on what else you have quoted, but I believe the line item price is around a one-time $5k cost. At that price it is very attractive vs the other closed source scheduler solutions out there.

scheduler engine is not very complex. Actually we wrote one for our project, total code is about 20000 lines. We can not open its code for it is under a commercial project. But it is possible

1 Like

I just wrote a tiny 200-line scheduler with the help of GPT-4.

I recommend simply writing detailed requirements of your scheduler in plain english or pseudocode and asking chatgpt to help make a simple and working python skeleton. From there, you just need to integrate the drivers for your equipment & call pylabrobot/pyhamilton functions for your liquid handler.

Here’s my scheduler skeleton:

@dataclass
class Plate:
    #generated
    plate_id: str
    storage_loc: str 
    
    #user-defined
    num_wells: int
    storage_instrument: str #'6000', '6002', '2c'
    deepwell: bool
    max_vol: float
    min_vol: float

    samples = {} #filled with Sample objects as they are declared

    @classmethod
    def generate_plate(cls, sched, plate_info_dict):
        storage_instrument = plate_info_dict.get('storage_instrument', "6000")
        num_wells = plate_info_dict.get('num_wells', 96)
        deepwell = plate_info_dict.get('deepwell', False)
        max_vol = plate_info_dict.get('max_vol', 200.0)
        min_vol = plate_info_dict.get('min_vol', 0.0)
        plate_id = str(uuid4())
        existing_locs = sched.used_storage_locs[storage_instrument]
        storage_loc = cls.generate_unique_storage_loc(existing_locs)
        sched.used_storage_locs[storage_instrument].append(storage_loc)
        return cls(plate_id, storage_loc, num_wells, storage_instrument, deepwell, max_vol, min_vol)

    @classmethod
    def generate_unique_storage_loc(cls, existing_locs: List[str]):
        for rack in range(1, 10):
            for position in range(1, 24):
                candidate_loc = f"{rack:1d}{position:02d}"
                if candidate_loc not in existing_locs:
                    return candidate_loc
        return None
    
    def __str__(self):
        # Format plate and location information
        plate_loc_info = (
            f"plate: {self.plate_id[:8]}    "
            f"loc: {self.storage_instrument:<5}{self.storage_loc:<4}\n"
        )

        # Format sample and well information for each sample
        sample_info = [
            f"  sample: {sample.sample_id[:8]}    well: {well:<3}    vol: {sample.volume:6.2f}    attributes: {sample.attributes}"
            for well, sample in self.samples.items()
        ]

        # Join the plate and location info with sample info
        return plate_loc_info + '\n'.join(sample_info)

@dataclass
class Sample:
    """Represents a biological sample."""
    #generated
    sample_id: str
    plate: Plate
    well: int       #well index of a particular plate

    #user-defined
    volume: float
    created_at: datetime
    expires_in: timedelta
    attributes: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def generate_sample(cls, plate, well, expires_in, attributes):
        sample_id = str(uuid4())
        created_at = datetime.now()
        expires_at = created_at + expires_in
        volume = plate.min_vol                  #assures min_vol is not violated. update vol after as needed
        plate.samples[well] = cls(sample_id, plate, well, volume, expires_in, expires_at, attributes)
        return plate.samples[well]

    def update_volume(self, delta_volume: float):
        self.volume += delta_volume
        if self.volume > self.plate.max_vol:
            raise ValueError(f"Sample {self.sample_id} volume exceeds max volume of {self.plate.max_vol} uL.")
        if self.volume < self.plate.min_vol:
            raise ValueError(f"Sample {self.sample_id} volume is below min volume of {self.plate.min_vol} uL.")

    def age_sample(self):
        current_time = datetime.now()
        if current_time >= self.expires_at:
            print(f"Warning: Sample {self.sample_id} has expired.")
    
    def __str__(self):
        return (
            f"Sample ID: {self.sample_id[:8]}    "
            f"Plate: {self.plate.plate_id[:8]}    "
            f"Loc: {self.plate.storage_instrument:<5}{self.plate.storage_loc:<4}    "
            f"Well: {self.well:<4}    "
            f"Volume: {self.volume:<20}    "
            f"Attributes: {self.attributes}"
        )

@dataclass(order=True)
class Task:
    """Represents a task to be scheduled."""
    sched_time: float
    priority: int
    func: Callable[..., None]
    sample_list: List[Sample] = field(compare=False) 
    reagent_list: List[Sample] = field(compare=False)
    kwargs: Dict[str, Any] = field(default_factory=dict)
    end_time: float = field(init=False, default=None, hash=False, compare=False)
    task_id: str = field(init=False, default_factory=lambda: str(uuid4()), compare=False)
    status: str = field(init=False, default='Scheduled', compare=False)
    reagents_loaded: bool = field(default=False, compare=False)
    
    def __post_init__(self):
        self.sample_plates = self.group_samples_by_plate(samples=self.sample_list)
        self.reagent_plates = self.group_samples_by_plate(samples=self.reagent_list)

    def group_samples_by_plate(self, samples) -> Dict[str, List[Sample]]:
        plates = defaultdict(list)
        for sample in samples:
            plates[sample.plate.plate_id].append(sample)
        return plates
    
    def execute(self, ham_int, cyt_6002_int, cyt_6000_int):
        self.func(ham_int, cyt_6002_int, cyt_6000_int, self.sample_plates, self.reagent_plates, **self.kwargs)
        self.status = 'Completed'

class Scheduler:
    """Scheduler for executing tasks."""
    def __init__(self):
        #state variables of Scheduler
        self.task_queue = []      # for pending tasks
        self.completed_tasks = [] # for completed tasks
        self.failed_tasks = []    # for failed tasks
        self.used_storage_locs = {"6000": [], "6002": []} # for keeping track of active storage locations

    def add_task(self, task: Task):
        self.lock = threading.Lock()
        with self.lock:
            heapq.heappush(self.task_queue, task)
        self.get_scheduler_state()

    def execute(self, ham_int, cyt_6002_int, cyt_6000_int, tasks: List[Task]):
        for task in tasks:
            try:
                if task.reagents_loaded:
                    task.execute(ham_int, cyt_6002_int, cyt_6000_int)
                    self.completed_tasks.append(task)
                else:
                    print(f"Reagents for task {task.task_id} not loaded. Skipping.")
                    self.task_queue.append(task)
                    #self.failed_tasks.append(task)
            except Exception as e:
                print(f"Error executing task {task.task_id}: {e}")
                self.failed_tasks.append(task)

    def get_scheduler_state(self):
        summary = f"{'Task ID':<36} {'Status':<15} {'Function':<25} {'Scheduled Time':<20}\n"
        summary += "=" * 100

        for task in self.completed_tasks + self.task_queue + self.failed_tasks:
            status = 'Scheduled' if task in self.task_queue else 'Completed' if task in self.completed_tasks else 'Failed'
            sched_time_str = datetime.fromtimestamp(task.sched_time).strftime('%Y-%m-%d %H:%M:%S')
            summary += f"\n{task.task_id:<36} {status:<15} {task.func.__name__:<25} {sched_time_str:<20} "
  
        print(f"\nCurrent State Summary:\n{summary}\n")

    def run(self):
        #with HamiltonInterface(simulate=True) as ham_int, CytomatInterface('COM3') as cyt_6002_int, CytomatInterface('COM4') as cyt_6000_int:
            ham_int = "hammy"
            cyt_6002_int = "cyt_6002"
            cyt_6000_int = "cyt_6000"
            while True:
                time.sleep(1)
                if not self.task_queue:
                    continue

                current_time = time.time()
                tasks_to_execute = []

                while self.task_queue and self.task_queue[0].sched_time <= current_time:
                    task = heapq.heappop(self.task_queue)
                    tasks_to_execute.append(task)

                if tasks_to_execute:
                    self.execute(ham_int, cyt_6002_int, cyt_6000_int, tasks_to_execute)
                    self.get_scheduler_state()
                    """
                    # In the future, we may want to combine tasks with the same method and kwargs that collide in the scheduling.
                    # For now, we just execute them separately.
                    combined_tasks = {}
                    for task in tasks_to_execute:
                        key = (task.priority, task.func)
                        if key not in combined_tasks:
                            combined_tasks[key] = task
                        else:
                            combined_tasks[key].sample_list.extend(task.sample_list)

                    # Now execute the combined tasks
                    for task in combined_tasks.values():
                        self.execute(ham_int, cyt_6002_int, cyt_6000_int, [task])
                    
                    self.get_scheduler_state()
                    """
2 Likes

The scheduler here is not task scheduler like in windows, but it is called dynamic scheduler, which can relocate the actitivity and tasks based on used resources. So first there should be resource defination, and task defination. The task is a sequential list of activities (not only one function), and for every activity some resources will be required.

following link is the introduction for for Dynamic Scheduling in the Laboratory
https://journals.sagepub.com/doi/full/10.1016/j.jala.2004.10.001

4 Likes

Hi all,

Noob to the forum here! I like the direction that this is going and if we can contribute somehow we would like to do so. More thoughts below…

Open source thoughts (and in general thoughts on freedom to create and invent):- We are heading in the direction of (partial) open source for Revolution Web (our new version of Revolution that is web based for the UI and also has some other pretty major changes to software arch). The key points that will be of interest to members of this forum is that we will be offering a free base version of Revolution Web. So a user will be able to download the “Runtime” locally and build their system using our drivers, or their own, or a 3rd party drivers without needing to purchase licenses… or go through a lengthy quotation process etc etc.

Why are we doing this I hear you say! Well that’s a long story that probably should be in a new thread of it’s own but briefly it is our goal to move Revolution to a “Freemium” model with the release of Revolution Web. The base feature set will be free.

Brief list of things that could be of interest to this group:

  • Revolution Web’s C# driver interface ( and all supporting system interfaces ) are open
  • The Python scripting interface allows rich access to scheduling features so you can run the system from Python if you wish (instead of writing schedules)
  • We are “open source” on some things like our public plate library:- https://labware.ukrobotics.app/
  • I have heard that Sila v2 is much improved over v1.x so we will look to support Sila also…

We are open to feedback and suggestions and welcome beta testers in the near future. We will have a web based 3D simulation (using ThreeJs) that runs in your browser and looks pretty cool also. If there is any interest in becoming a beta tester please DM me via Linkedin or this forum (or via ukrobotics.com)!!

Mike

9 Likes