Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mount remote filesystem with sshfs #255

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
18 changes: 14 additions & 4 deletions damnit/backend/extract_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def innetgr(netgroup: bytes, host=None, user=None, domain=None):
libc = CDLL("libc.so.6")
return bool(libc.innetgr(netgroup, host, user, domain))


def default_slurm_partition():
username = getpass.getuser().encode()
if innetgr(b'exfel-wgs-users', user=username):
Expand All @@ -43,6 +44,7 @@ def default_slurm_partition():
return 'upex'
return 'all'


def run_in_subprocess(args, **kwargs):
env = os.environ.copy()
ctxsupport_dir = str(Path(__file__).parents[1] / 'ctxsupport')
Expand All @@ -52,6 +54,7 @@ def run_in_subprocess(args, **kwargs):

return subprocess.run(args, env=env, **kwargs)


def process_log_path(run, proposal, ctx_dir=Path('.'), create=True):
p = ctx_dir.absolute() / 'process_logs' / f"r{run}-p{proposal}.out"
if create:
Expand Down Expand Up @@ -89,13 +92,13 @@ def loop():

def extract_in_subprocess(
proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(),
python_exe=None, mock=False, tee_output=None
python_exe=None, mock=False, tee_output=None, data_location='localhost',
):
if not python_exe:
python_exe = sys.executable

args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value,
'--save', out_path]
'--save', out_path, '--data-location', data_location]
if cluster:
args.append('--cluster-job')
if mock:
Expand Down Expand Up @@ -246,7 +249,8 @@ def slurm_options(self):
return opts

def extract_and_ingest(self, proposal, run, cluster=False,
run_data=RunData.ALL, match=(), mock=False, tee_output=None):
run_data=RunData.ALL, match=(), mock=False, tee_output=None,
data_location='localhost'):
if proposal is None:
proposal = self.proposal

Expand All @@ -261,6 +265,7 @@ def extract_and_ingest(self, proposal, run, cluster=False,
reduced_data = extract_in_subprocess(
proposal, run, out_path, cluster=cluster, run_data=run_data,
match=match, python_exe=python_exe, mock=mock, tee_output=tee_output,
data_location=data_location,
)
log.info("Reduced data has %d fields", len(reduced_data))
add_to_db(reduced_data, self.db, proposal, run)
Expand All @@ -274,6 +279,9 @@ def extract_and_ingest(self, proposal, run, cluster=False,
log.info("Sent Kafka update to topic %r", self.db.kafka_topic)

# Launch a Slurm job if there are any 'cluster' variables to evaluate
if data_location != 'localhost':
log.info('Skipping cluster variables with remote data [%s].', data_location)
return
ctx = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=cluster)
ctx_slurm = self.ctx_whole.filter(run_data=run_data, name_matches=match, cluster=True)
if set(ctx_slurm.vars) > set(ctx.vars):
Expand Down Expand Up @@ -374,6 +382,7 @@ def reprocess(runs, proposal=None, match=(), mock=False):
ap.add_argument('run_data', choices=('raw', 'proc', 'all'))
ap.add_argument('--cluster-job', action="store_true")
ap.add_argument('--match', action="append", default=[])
ap.add_argument('--data-location', default='localhost', help=argparse.SUPPRESS)
args = ap.parse_args()
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
Expand All @@ -389,4 +398,5 @@ def reprocess(runs, proposal=None, match=(), mock=False):
Extractor().extract_and_ingest(args.proposal, args.run,
cluster=args.cluster_job,
run_data=RunData(args.run_data),
match=args.match)
match=args.match,
data_location=args.data_location)
77 changes: 51 additions & 26 deletions damnit/backend/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,30 @@
from socket import gethostname
from threading import Thread

from extra_data.read_machinery import find_proposal
from kafka import KafkaConsumer

from .db import DamnitDB
from .extract_data import RunData, process_log_path

# For now, the migration & calibration events come via DESY's Kafka brokers,
# but the AMORE updates go via XFEL's test instance.
CONSUMER_ID = 'xfel-da-amore-prototype-{}'
KAFKA_CONF = {
'maxwell': {
'brokers': ['exflwgs06:9091'],
'topics': ["test.r2d2", "cal.offline-corrections"],
'events': ["migration_complete", "run_corrections_complete"],
},
'onc': {
'brokers': ['exflwgs06:9091'],
'topics': ['test.euxfel.hed.daq', 'test.euxfel.hed.cal'],
'events': ['daq_run_complete', 'online_correction_complete'],
}
# Kafka consumer-group id template; formatted with the database's unique id
# so that each DAMNIT instance consumes its topics independently.
CONSUMER_ID = "xfel-da-amore-prototype-{}"
# A single broker/topic/event set now covers both the offline (migration,
# calibration) and online (DAQ, online-calibration) event streams.
KAFKA_BROKERS = ["exflwgs06:9091"]
KAFKA_TOPICS = ["test.r2d2", "cal.offline-corrections", "test.euxfel.hed.daq", "test.euxfel.hed.cal"]
KAFKA_EVENTS = ["migration_complete", "run_corrections_complete", "daq_run_complete", "online_correction_complete"]
# Hosts from which the backend is allowed to reach the online cluster.
BACKEND_HOSTS_TO_ONLINE = ['max-exfl-display003.desy.de', 'max-exfl-display004.desy.de']
# Instrument topic -> online-cluster gateway host used to read run data
# remotely before it has been migrated offline (presumably mounted via
# sshfs — see the remote data_location handling in extract_data).
ONLINE_HOSTS = {
    'FXE': 'sa1-onc-fxe.desy.de',
    'HED': 'sa2-onc-hed.desy.de',
    'MID': 'sa2-onc-mid.desy.de',
    # 'SA1': '',
    # 'SA2': '',
    # 'SA3': '',
    'SCS': 'sa3-onc-scs.desy.de',
    'SPB': 'sa1-onc-spb.desy.de',
    'SQS': 'sa3-onc-sqs.desy.de',
    'SXP': 'sa3-onc-sxp.desy.de',
}

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,25 +73,39 @@ def watch_processes_finish(q: queue.Queue):
class EventProcessor:

def __init__(self, context_dir=Path('.')):
if gethostname().startswith('exflonc'):
log.warning('Running the DAMNIT listener on the online cluster is not allowed')
exit(1)

self.context_dir = context_dir
self.db = DamnitDB.from_dir(context_dir)
# Fail fast if read-only - https://stackoverflow.com/a/44707371/434217
self.db.conn.execute("pragma user_version=0;")
self.proposal = self.db.metameta['proposal']
log.info(f"Will watch for events from proposal {self.proposal}")

if gethostname().startswith('exflonc'):
# running on the online cluster
kafka_conf = KAFKA_CONF['onc']
else:
kafka_conf = KAFKA_CONF['maxwell']
tmichela marked this conversation as resolved.
Show resolved Hide resolved
log.info(f"Will watch for events from proposal {self.proposal}")

consumer_id = CONSUMER_ID.format(self.db.metameta['db_id'])
self.kafka_cns = KafkaConsumer(*kafka_conf['topics'],
bootstrap_servers=kafka_conf['brokers'],
self.kafka_cns = KafkaConsumer(*KAFKA_TOPICS,
bootstrap_servers=KAFKA_BROKERS,
group_id=consumer_id)
self.events = kafka_conf['events']

# check backend host and connection to online cluster
self.online_data_host = None
self.run_online = self.db.metameta.get('run_online', False) is not False
if self.run_online and gethostname() not in BACKEND_HOSTS_TO_ONLINE:
log.warning(f"Disabled online processing, the backend must run on one of: {BACKEND_HOSTS_TO_ONLINE}")
self.run_online = False
if self.run_online:
topic = Path(find_proposal(f'p{self.proposal:06}')).parts[-3]
if (remote_host := ONLINE_HOSTS.get(topic)) is None:
log.warning(f"Can't run online processing for topic '{topic}'")
self.run_online = False
else:
self.online_data_host = remote_host
log.debug("Processing online data? %s", self.run_online)

# Monitor thread for subprocesses
self.extract_procs_queue = queue.Queue()
self.extract_procs_watcher = Thread(
target=watch_processes_finish,
Expand All @@ -113,25 +132,28 @@ def run(self):
def _process_kafka_event(self, record):
    """Decode a Kafka record and dispatch it to the matching handler.

    The message payload is JSON with an 'event' key; events listed in
    KAFKA_EVENTS are routed to the ``handle_<event>`` method by name,
    anything else is logged and ignored.
    """
    msg = json.loads(record.value.decode())
    event = msg.get('event')
    # NOTE(review): the diff left both the old condition (`self.events`)
    # and the new module-level KAFKA_EVENTS check on consecutive lines,
    # which is a syntax error; only the new condition is kept here.
    if event in KAFKA_EVENTS:
        log.debug("Processing %s event from Kafka", event)
        # Handler methods follow the handle_<event> naming convention.
        getattr(self, f'handle_{event}')(record, msg)
    else:
        log.debug("Unexpected %s event from Kafka", event)

def handle_daq_run_complete(self, record, msg: dict):
    """Handle a DAQ 'run complete' event from the online cluster.

    At this point the data only exists on the online cluster, so it is
    processed remotely via self.online_data_host, and only when online
    processing is enabled; offline processing is triggered later by the
    migration_complete event instead. (The diff residue left an extra
    unconditional local call here, which would double-process the run.)
    """
    if self.run_online:
        self.handle_event(record, msg, RunData.RAW, self.online_data_host)

def handle_online_correction_complete(self, record, msg: dict):
    """Handle an online 'correction complete' event.

    Online-corrected (proc) data only exists on the online cluster, so it
    is processed remotely via self.online_data_host, and only when online
    processing is enabled; offline proc data is handled separately by the
    run_corrections_complete event. (The diff residue left an extra
    unconditional local call here, which would double-process the run.)
    """
    if self.run_online:
        self.handle_event(record, msg, RunData.PROC, self.online_data_host)

def handle_migration_complete(self, record, msg: dict):
    """Process a run's raw data once it has been migrated to offline storage."""
    self.handle_event(record, msg, run_data=RunData.RAW)

def handle_run_corrections_complete(self, record, msg: dict):
    """Process a run's corrected (proc) data once offline calibration is done."""
    self.handle_event(record, msg, run_data=RunData.PROC)

def handle_event(self, record, msg: dict, run_data: RunData):
def handle_event(self, record, msg: dict, run_data: RunData,
data_location: str = "localhost"):
proposal = int(msg['proposal'])
run = int(msg['run'])

Expand All @@ -149,10 +171,12 @@ def handle_event(self, record, msg: dict, run_data: RunData):
# Create subprocess to process the run
extract_proc = subprocess.Popen([
sys.executable, '-m', 'damnit.backend.extract_data',
str(proposal), str(run), run_data.value
str(proposal), str(run), run_data.value,
'--data-location', data_location,
], cwd=self.context_dir, stdout=logf, stderr=subprocess.STDOUT)
self.extract_procs_queue.put((proposal, run, extract_proc))


def listen():
# Set up logging to a file
file_handler = logging.FileHandler("amore.log")
Expand All @@ -177,5 +201,6 @@ def listen():
if os.stat("amore.log").st_uid == os.getuid():
os.chmod("amore.log", 0o666)


if __name__ == '__main__':
listen()
Loading