Source code for openalea.rsml.hirros

"""
Module that provide tools on 2D+t RSML-type MTG:
    * Get observation times : `times``
    * Extract primary lengths
    * Extract secondary lengths for each plant
"""
import argparse
from path import Path

import numpy as np
import pandas as pd

import openalea.rsml


[docs] def walk(dir: str, recursive=True, force=False): """ Traverse a set of directories and return rsml files. Notes : rsml files are defined by 61_graph*.rsml """ dir = Path(dir) fns = [] for rsml_file in ['61_graph_expertized.rsml', '61_graph.rsml']: if fns and force: continue rsmls = dir.glob(rsml_file) if rsmls : fns.extend(rsmls) if recursive: rsmls = dir.glob('*/%s'%rsml_file) if rsmls : fns.extend(rsmls) if not fns: print("ERROR: no rsml files found") final_fns = [] for fn in fns: if not force: if (fn.parent/'80_graph_analysis.xlsx').exists(): continue elif (fn.parent/'80_graph_expertized_analysis.xlsx').exists(): continue final_fns.append(fn) return final_fns
[docs] def read(fn): g = rsml.rsml2mtg(fn) return g
[docs] def times(g): """Return Observation dates in hours.""" obs_t = g._graph_properties['metadata']['observation-hours'] observations = [float(t) for t in obs_t] return observations
# Primary Length
[docs] def primaries(g, plant_ids, obs): """ Return length of primary roots at various observation times. """ prims = [next(g.component_roots_iter(pid)) for pid in plant_ids] seq = [sequence_length(g, obs, vid) for vid in prims] return seq
[docs] def secondary(g, pid, obs): """ Return the length of secondary roots through time of a given plant.""" res = [] for cid in g.components_iter(pid): if g.order(cid) >= 1: res.append(sequence_length(g, obs, cid)) return res
[docs] def sequence_length(g, obs, vid): """ Return the length of a root at different observation times. """ polyline = g.property('geometry')[vid] pos = np.array(polyline) time = g.property('time_hours')[vid] n = len(obs) obs_index = 0 i = -1 length_time = [] for to in obs: index = -1 for t in time: if t <= to: index+=1 else: break if index == -1: length_time.append(0.) else: vec = np.diff(pos[:index+1],axis=0)**2 sum_seg = vec.sum(axis=1)**.5 length_time.append(sum_seg.sum()) return length_time
[docs] def length_and_number(secondary): secs = np.array(secondary) total_length = secs.sum(axis=0) total_number = np.count_nonzero(secs, axis=0) return total_length.tolist(), total_number.tolist()
[docs] def dataframes(obs, primaries, secondaries): data = [obs] index=['time(h)'] for i, p in enumerate(primaries): data.append(p) index.append('P%d'%(i+1)) ps = np.array(primaries) _mean = ps.mean(axis=0) data.append(_mean) index.append('Mean') dfp = pd.DataFrame(data, index = index) #print(dfp) dfs= [] for sec in secondaries: data = [obs] index=['time(h)'] if not sec: continue #print('Secondary: ', sec) for i, p in enumerate(sec): data.append(p) index.append('R%d'%(i+1)) tot_len, tot_num = length_and_number(sec) index.append('Total Length') data.append(tot_len) index.append('Total Number') data.append(tot_num) df = pd.DataFrame(data, index = index) dfs.append(df) return dfp, dfs
[docs] def write_xls(xls_file, dfp, dfs): if xls_file: with pd.ExcelWriter(xls_file, engine="xlsxwriter") as writer: dfp.to_excel(writer, sheet_name='Prim', float_format="%.2f", header = False) workbook = writer.book bold_format = workbook.add_format({'bold': True}) worksheet = writer.sheets['Prim'] worksheet.set_row(0, None, bold_format) for i, df in enumerate(dfs): df.to_excel(writer, sheet_name='RP%d'%(i+1), float_format="%.2f", header = False) worksheet = writer.sheets['RP%d'%(i+1)] worksheet.set_row(0, None, bold_format)
[docs] def write_xls_all(xls_file, fns, primaries, secondaries): with pd.ExcelWriter(xls_file, engine="xlsxwriter") as writer: startrow = 0 for i, dfp in enumerate(primaries): dfp.to_excel(writer, sheet_name='Prim', float_format="%.2f", header = False, startrow=startrow+1) workbook = writer.book bold_format = workbook.add_format({'bold': True}) worksheet = writer.sheets['Prim'] worksheet.set_row(startrow+1, None, bold_format) worksheet.write(startrow, 0, fns[i]) startrow += len(dfp)+3 startrow = 0 for i, dfs in enumerate(secondaries): name = fns[i] for j, df in enumerate(dfs): df.to_excel(writer, sheet_name='Lateral', float_format="%.2f", header = False, startrow=startrow+1) worksheet = writer.sheets['Lateral'] worksheet.set_row(startrow+1, None, bold_format) worksheet.write(startrow, 0, name) worksheet.write(startrow, 1, 'Plant %d'%(j+1)) startrow += len(df)+3
[docs] def process(g): obs = times(g) plant_ids = g.vertices(scale=1) prims = primaries(g, plant_ids, obs) secondaries = [] for pid in plant_ids: s2 = secondary(g, pid, obs) secondaries.append(s2) df_primary, df_secondaries = dataframes(obs, prims, secondaries) return df_primary, df_secondaries
[docs] def run(fn): g = read(fn) dfp, dfs = process(g) # Write in xlsx if 'expertized' in fn: xlsx_file = fn.parent/'80_graph_expertized_analysis.xlsx' else: xlsx_file = fn.parent/'80_graph_analysis.xlsx' write_xls(xlsx_file, dfp, dfs) print('WRITE %s'%xlsx_file)
[docs] def run_all(fns): prims = [] secondaries = [] for fn in fns: g = read(fn) dfp, dfs = process(g) prims.append(dfp) secondaries.append(dfs) write_xls_all('gxe_results.xlsx', fns, prims, secondaries) print('WRITE gxe_results.xlsx')
[docs] def merge_and_compute_primaries(dataframes): """Mean of Mean or mean of total Length and total number""" pass
[docs] def merge_and_compute_secondaries(dataframes): """Mean of Mean or mean of total Length and total number""" pass
[docs] def main(): parser = argparse.ArgumentParser() parser.add_argument('-d', default='.', help='directory to process') parser.add_argument('-r', default=True, type=bool, help='traverse the directories recursively') parser.add_argument('-f', help='text file containing the list of directories to process') args = parser.parse_args() dir = args.d recursive = args.r gxe_file = args.f fns = [] if gxe_file: print('GxE file to process', gxe_file) dirs = [] with open(gxe_file, 'r') as gxe: dirs = [d.strip() for d in gxe] dirs = [d for d in dirs if Path(d).exists()] for d in dirs: rsml_files = walk(dir=d, recursive=recursive, force=True) fns.extend(rsml_files) print('Process files %s'%(' '.join(fns))) run_all(fns) else: fns = walk(dir=dir, recursive=recursive) if not fns: return if len(fns) == 1: fn = fns[0] print('Process file %s'%(fns[0])) run(fn) else: for fn in fns: print('Process file %s'%(fns[0])) run(fn)
if __name__=='__main__': fns = walk('set_de_5/230403VS004') g = read(fns[0]) obs = times(g) plant_ids = g.vertices(scale=1) prims = primaries(g, plant_ids, obs) secondaries = [] for pid in plant_ids: s2 = secondary(g, pid, obs) secondaries.append(s2) for s in secondaries: secs = np.array(s) total_length = secs.sum(axis=0) total_secs = np.count_nonzero(secs, axis=0) print(total_length) print(total_secs) print() # Write in xlsx write_xls('80_graph_analysis.xlsx', obs, primaries=prims, secondaries=secondaries)