|
|
|
@ -27,6 +27,7 @@ import time |
|
|
|
|
import itertools |
|
|
|
|
import tempfile |
|
|
|
|
import socket |
|
|
|
|
import concurrent.futures |
|
|
|
|
|
|
|
|
|
import cdist |
|
|
|
|
import cdist.hostsource |
|
|
|
@ -266,13 +267,14 @@ class Config(object): |
|
|
|
|
else: |
|
|
|
|
yield cdist_object |
|
|
|
|
|
|
|
|
|
def iterate_once(self): |
|
|
|
|
def iterate_once_parallel(self): |
|
|
|
|
""" |
|
|
|
|
Iterate over the objects once - helper method for |
|
|
|
|
iterate_until_finished |
|
|
|
|
""" |
|
|
|
|
objects_changed = False |
|
|
|
|
|
|
|
|
|
cargo = [] |
|
|
|
|
for cdist_object in self.object_list(): |
|
|
|
|
if cdist_object.requirements_unfinished(cdist_object.requirements): |
|
|
|
|
"""We cannot do anything for this poor object""" |
|
|
|
@ -281,18 +283,38 @@ class Config(object): |
|
|
|
|
if cdist_object.state == core.CdistObject.STATE_UNDEF: |
|
|
|
|
"""Prepare the virgin object""" |
|
|
|
|
|
|
|
|
|
self.object_prepare(cdist_object) |
|
|
|
|
objects_changed = True |
|
|
|
|
# self.object_prepare(cdist_object) |
|
|
|
|
# objects_changed = True |
|
|
|
|
cargo.append(cdist_object) |
|
|
|
|
|
|
|
|
|
if cdist_object.requirements_unfinished(cdist_object.autorequire): |
|
|
|
|
"""The previous step created objects we depend on - |
|
|
|
|
wait for them |
|
|
|
|
""" |
|
|
|
|
if cargo: |
|
|
|
|
with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: |
|
|
|
|
for x in executor.map(self.object_prepare, cargo): |
|
|
|
|
pass # returns None |
|
|
|
|
objects_changed = True |
|
|
|
|
|
|
|
|
|
cargo.clear() |
|
|
|
|
for cdist_object in self.object_list(): |
|
|
|
|
if cdist_object.requirements_unfinished(cdist_object.requirements): |
|
|
|
|
"""We cannot do anything for this poor object""" |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
if cdist_object.state == core.CdistObject.STATE_PREPARED: |
|
|
|
|
self.object_run(cdist_object) |
|
|
|
|
objects_changed = True |
|
|
|
|
if cdist_object.requirements_unfinished(cdist_object.autorequire): |
|
|
|
|
"""The previous step created objects we depend on - |
|
|
|
|
wait for them |
|
|
|
|
""" |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
# self.object_run(cdist_object) |
|
|
|
|
# objects_changed = True |
|
|
|
|
cargo.append(cdist_object) |
|
|
|
|
|
|
|
|
|
if cargo: |
|
|
|
|
with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: |
|
|
|
|
for x in executor.map(self.object_run, cargo): |
|
|
|
|
pass # returns None |
|
|
|
|
objects_changed = True |
|
|
|
|
|
|
|
|
|
return objects_changed |
|
|
|
|
|
|
|
|
@ -305,7 +327,8 @@ class Config(object): |
|
|
|
|
objects_changed = True |
|
|
|
|
|
|
|
|
|
while objects_changed: |
|
|
|
|
objects_changed = self.iterate_once() |
|
|
|
|
if self.jobs: |
|
|
|
|
objects_changed = self.iterate_once_parallel() |
|
|
|
|
|
|
|
|
|
# Check whether all objects have been finished |
|
|
|
|
unfinished_objects = [] |
|
|
|
|