#!/usr/bin/env python # -*- coding: UTF-8 -*- # Copyright © 1985, 2004 Progiciels Bourbeau-Pinard inc. # François Pinard , 1985-01, 2004-12. u"""\ Gantt - Éditeur de diagrammes de Gantt. Usage: gantt [OPTION]... [FICHIER]... Options: -a Produire le diagramme global pour les activités. -e Produire le diagramme d'occupation des exécutants. -t Produire le tableau des attributions. -s Supprimer les titres et les explications de libellés. -i Entrelacer les explications au diagramme, à chaque jour. -d HEURES Utiliser ces heures pour diviser chaque jour en tranches. -j JOURS Limiter chaque diagramme à ce nombre de jours. -r JOURS Retarder le départ des diagrammes de ce nombre de jours. -m COLONNES Prévoir ce nombre de colonnes pour les libellés. -w COLONNES Ne pas dépasser ce nombre de colonnes par ligne. -n NOMBRE Ne conserver que ce nombre d'activités par fichier lu. -o Produire une cédule comprenant les fractions déjà complétées. -c Forcer une allocation contiguë des tranches d'une activité. [brisé] -f Implicitement utiliser les fins de semaine. -v Indiquer les étapes d'exécution du programme. En l'absence d'aucun de `-aet', `-a' est présumé. Les valeurs implicites sont `-d10,12,15,17', `-m16' et `-w79'. Si aucun FICHIER n'est fourni, le programme lit l'entrée standard. Toutes les données sont tenues en format Allout: voir le manuel. JOURS est un nombre flottant. COLONNES est un nombre entier. HEURES est une liste de nombres entiers séparés par des virgules, ou une virgule seule. """ # REVOIR: Implanter les dépendances sur des sous-arbres complets. from __future__ import division __metaclass__ = type import bisect, codecs, datetime, math, os, re, sys, time from Etc.Allout import allout # Largeur prise par l'ensemble des marges. largeur_marges = 12 # Jours de la semaine. semaine = (u"Lundi", u"Mardi", u"Mercredi", u"Jeudi", u"Vendredi", u"Samedi", u"Dimanche") # Pour toute erreur durant l'analyse ou l'exécution. class Erreur(Exception): def __str__(self): return self.args[0].encode('Latin-1', 'backslashreplace') #def __unicode__(self): # return self.args[0].encode(u'Latin-1', u'backslashreplace') class Main: option_tableau_des_attributions = False option_diagramme_activites = False option_diagramme_executants = False option_sans_description = False option_y_compris_termine = False option_nombre_maximal_de_colonnes = 79 option_largeur_libelle = 16 option_limite_en_jours = None option_allocation_contigue = False option_retarder_depart = 0 option_verbeuse = False option_intercaler_descriptions = False option_nombre_limite = None # Lignes de titre pour les diagrammes. titres = [] # Heures de séparation entre deux tranches. points_de_tranche = '10:00', '12:00', '15:00', '17:00' # Nombre de tranches dans une journée. tranches_par_jour = 5 # Tranche limite au-delà de laquelle aucune cédule ne se fera. tranche_limite = None # Options implicites à la création d'une activité ou d'un exécutant. options_activite = {'fixe': 5.} options_executant = {'jours': [True, True, True, True, True, False, False]} def main(self, *arguments): if not arguments: sys.stdout.write(__doc__) return # Décoder les options. import getopt options, arguments = getopt.getopt(arguments, 'acd:efij:m:n:or:stvw:') for option, valeur in options: if option == '-a': self.option_diagramme_activites = True elif option == '-c': self.option_allocation_contigue = True elif option == '-d': if valeur == ',': tranches = [] else: tranches = map(int, valeur.split(',')) tranches.sort() assert tranches[0] > 0, tranches assert tranches[-1] < 24, tranches self.points_de_tranche = ['%.2d:00' % tranche for tranche in tranches] self.tranches_par_jour = len(tranches) + 1 self.options_activite['fixe'] = len(tranches) elif option == '-e': self.option_diagramme_executants = True elif option == '-f': self.options_executant['jours'][5] = True self.options_executant['jours'][6] = True elif option == '-i': self.option_intercaler_descriptions = True elif option == '-j': self.option_limite_en_jours = max(1, float(valeur)) elif option == '-m': self.option_largeur_libelle = max(8, int(valeur)) elif option == '-n': self.option_nombre_limite = max(1, int(valeur)) elif option == '-o': self.option_y_compris_termine = True elif option == '-r': self.option_retarder_depart = float(valeur) elif option == '-s': self.option_sans_description = True elif option == '-t': self.option_tableau_des_attributions = True elif option == '-v': self.option_verbeuse = True elif option == '-w': self.option_nombre_maximal_de_colonnes = int(valeur) if not (self.option_diagramme_activites or self.option_diagramme_executants or self.option_tableau_des_attributions): self.option_diagramme_activites = True if self.option_limite_en_jours is not None: self.tranche_limite = int(math.ceil(self.option_limite_en_jours * self.tranches_par_jour)) # Produire les divers résultats. # `run.projet' donne accès à AUJOURDHUI et EXÉCUTANTS. self.projet = Projet(*arguments) self.projet.ordonnancer() if self.option_tableau_des_attributions: self.projet.produire_tableau_attributions(sys.stdout.write) if self.option_diagramme_executants: self.projet.produire_diagramme(sys.stdout.write, False) if self.option_diagramme_activites: self.projet.produire_diagramme(sys.stdout.write, True) gabarit_nombre = r'\.[0-9]+|[0-9]+\.?[0-9]*' class Projet: def __init__(self, *fichiers): # Activités constituant le projet. self.activites = [] # Couples [AVANT, APRÈS], où AVANT et APRÈS peuvent être des chaînes. # Utilisé uniquement durant le décodage du projet. self.preseances = [] # Exécutant pour chaque code. Ce dictionnaire est conservé # d'un fichier Allout au suivant, et garantit que tous les # exécutants partageant un code partagent aussi le diagramme. self.executant_global = {} # Exécutant pour chaque code. Ce dictionnaire est vidé au # début de la lecture de chaque fichier Allout, il sert à # résoudre les références entre activités et exécutants. self.executant_local = {} # Date de départ du calendrier (celle dont la tranche sera 0). # Le calendrier démarre toujours à l'heure de minuit. self.depart = datetime.datetime.combine( (datetime.datetime.today() + datetime.timedelta(run.option_retarder_depart)), datetime.time()) # Dates représentant les congés. self.conges = [] # Tranche correspondant au moment de l'exécution du programme. # Inutilisé pendant le décodage du projet. self.tranche_maintenant = None # Liste des tranches utilisées dans les diagrammes, afin de sauter # plus rapidement les blocs ne contenant pas d'information utile. self.tranches_utilisees = set() # Digérer la description du projet. if len(fichiers) < 2: if len(fichiers) == 0: if run.option_verbeuse: sys.stderr.write(u"Lecture de ...\n") structure = allout.read() run.titres.append(structure[0] or '') else: if run.option_verbeuse: sys.stderr.write(u"Lecture de %s...\n" % fichiers[0]) structure = allout.read(fichiers[0]) run.titres.append(structure[0] or fichiers[0]) activites = self.digerer(structure) if run.option_nombre_limite is not None: activites = activites[:run.option_nombre_limite] else: # Lorsque plusieurs projets, lister les activités avec # parcours en largeur, mais sans étirement des projets # moins fournis. paires = [] for compteur, fichier in enumerate(fichiers): if run.option_verbeuse: sys.stderr.write(u"Lecture de %s...\n" % fichier) structure = allout.read(fichier) run.titres.append(structure[0] or fichier) activites = self.digerer(structure, [fichier]) if run.option_nombre_limite is not None: activites = activites[:run.option_nombre_limite] self.executant_local = {} for ordinal_parcours, activite in enumerate(activites): paires.append(((ordinal_parcours, compteur), activite)) if run.option_verbeuse: sys.stderr.write(u"Parallélisation...\n") paires.sort() activites = [paire[1] for paire in paires] self.finaliser_projet(activites) def tranche_selon_date(self, date): return ((date - self.depart) * run.tranches_par_jour).days def date_selon_tranche(self, tranche, cache={}): if tranche not in cache: cache[tranche] = ( self.depart + datetime.timedelta(tranche / run.tranches_par_jour)) return cache[tranche] def digerer(self, structure, description=[], options_activite=None): # Cette fonction décode les activités et les exécutants, et retourne # une liste d'activités avec parcours en largeur, avec étirement # automatique des sous-arbres moins profonds. if options_activite is None: options_activite = run.options_activite if isinstance(structure, list) and len(structure) == 1: structure = structure[0] if isinstance(structure, unicode): if structure in ('', 'vim:ft=allout', 'vim: ft=allout'): return [] fichier = structure if fichier[0] == '~': fichier = os.path.expanduser(fichier) if fichier[0] == '/' and os.path.isdir(fichier): fichiers = [] for base in os.listdir(fichier): if base.endswith('~'): continue fichiers.append(os.path.join(fichier, base)) structure = [structure] + fichiers else: options = dict(options_activite) if structure.startswith(': '): options['description'] = description if '=@' in structure: executant = self.digerer_executant( dict(run.options_executant), structure) code = executant.code self.executant_local[code] = executant if code in self.executant_global: executant.diagramme = ( self.executant_global[code].diagramme) else: self.executant_global[code] = executant return [] activite = self.digerer_activite(options, structure) if activite is None: return [] elif ' :: ' in structure: gauche, droite = structure.split(' :: ', 1) options['description'] = description + [gauche.rstrip()] activite = self.digerer_activite(options, ': ' + droite.lstrip()) activite.executants = self.executant_local.values() else: options['description'] = description + [structure] activite = apply(Activite, (), options) activite.executants = self.executant_local.values() if run.option_y_compris_termine or activite.fraction < 1.0: return [activite] return [] if structure[0]: if structure[0].startswith(': '): if '=@' in structure[0]: raise Erreur(u"Options implicites `%s' invalides." % structure[0]) options_activite = dict(options_activite) # DIGÉRER_ACTIVITÉ n'est exécutée que pour son effet de bord # sur OPTIONS_ACTIVITÉ, on ne retient pas l'activité créée. self.digerer_activite(options_activite, structure[0]) else: description = description + [structure[0]] activites = [] paires = [] for compteur, structure in enumerate(structure[1:]): activites = self.digerer(structure, description, options_activite) if activites: curseur = 0. facteur = 1. / len(activites) for activite in activites: paires.append((curseur, compteur, activite)) curseur += facteur paires.sort() return [paire[2] for paire in paires] def digerer_activite(self, options, texte): texte += ' ' conge = False preseances = [] match = re.match(':\s+', texte) while match.end() < len(texte): texte = texte[match.end():] match = re.match('=(\S+)\s+', texte) if match: options['code'] = match.group(1)[:run.option_largeur_libelle] continue match = re.match('(%s)\s+' % gabarit_nombre, texte) if match: options['duree'] = (float(match.group(1)) * run.tranches_par_jour) options['fixe'] = 0 continue match = re.match(('(%s)?\+(%s)?\s+' % (gabarit_nombre, gabarit_nombre)), texte) if match: if match.group(1): options['duree'] = (float(match.group(1)) * run.tranches_par_jour) else: options['duree'] = 0 if match.group(2): options['fixe'] = (float(match.group(2)) * run.tranches_par_jour) else: options['fixe'] = 0 continue match = re.match(r'\+(%s)?N\s+' % gabarit_nombre, texte) if match: if match.group(1): options['recurrence'] = (float(match.group(1)) * run.tranches_par_jour) else: options['recurrence'] = run.tranches_par_jour continue match = re.match( ('([<>]=?)?' '([0-9]{4}-[0-9]{2}-[0-9]{2})' '(\s+[0-9]{2}:[0-9]{2})?\s+'), texte) if match: comparateur = match.group(1) try: info = time.strptime(match.group(2), '%Y-%m-%d') except ValueError: raise Erreur(u"Date `%s' invalide." % match.group(2)) date = datetime.datetime(*info[:3]) if match.group(3): heure = match.group(3).lstrip() try: time.strptime(heure, '%H:%M') except ValueError: raise Erreur(u"Heure `%s' invalide." % heure) options['description'].append(heure) date += datetime.timedelta( bisect.bisect(run.points_de_tranche, heure) / run.tranches_par_jour) else: heure = None if date < self.depart and run.option_y_compris_termine: self.depart = date if comparateur == '>=': options['minimum'] = date elif comparateur == '>': if heure: delta = datetime.timedelta(1 / run.tranches_par_jour) else: delta = datetime.timedelta(1) options['minimum'] = date + delta elif comparateur == '<=': if heure: delta = datetime.timedelta(1 / run.tranches_par_jour) else: delta = datetime.timedelta(1) options['maximum'] = date - delta elif comparateur == '<': options['maximum'] = date else: options['minimum'] = date options['maximum'] = None continue match = re.match( ('(([0-9]{4}|\.\.\.\.)-([0-9]{2}|\.\.)-([0-9]{2}|\.\.))' '(\s+[0-9]{2}:[0-9]{2})?\s+'), texte) if match: try: annee = int(match.group(2)) except ValueError: annee = None try: mois = int(match.group(3)) except ValueError: mois = None try: jour = int(match.group(4)) except ValueError: jour = None try: datetime.date(annee or 2001, mois or 1, jour or 1) except ValueError: raise Erreur(u"Gabarit de date `%s' invalide." % match.group(1)) if match.group(5): heure = match.group(5).lstrip() options['description'].append(heure) delta = datetime.timedelta( bisect.bisect(run.points_de_tranche, heure) / run.tranches_par_jour) else: delta = datetime.timedelta() options['recurrence'] = annee, mois, jour, delta continue match = re.match('([<>])(\S+)\s+', texte) if match: if match.group(1) == '<': preseances.append([None, match.group(2)]) else: preseances.append([match.group(2), None]) continue match = re.match('@(\S*)\s+', texte) if match: responsable = match.group(1) if not responsable: conge = True continue executant = self.executant_local.get(responsable) if executant is None: raise Erreur(u"Responsable `%s' non déclaré." % responsable) options['responsable'] = executant continue match = re.match(r'\*(%s)\s+' % gabarit_nombre, texte) if match: fraction = float(match.group(1)) if 0. <= fraction <= 1.: options['fraction'] = fraction continue raise Erreur(u"Clause `%s' non reconnue." % texte) # Valider la déclaration de l'activité. activite = apply(Activite, (), options) if 'maximum' in options and options['maximum'] is None: activite.maximum = (activite.minimum + datetime.timedelta(activite.fixe)) if conge: if activite.minimum is None: raise Erreur(u"Date requise pour congé %s." % activite) if (activite.duree != 0 or activite.fixe != run.tranches_par_jour): raise Erreur(u"Durée invalide pour congé %s." % activite) self.conges.append(activite.minimum) return for preseance in preseances: for compteur in 0, 1: if preseance[compteur] is None: preseance[compteur] = activite self.preseances.append(preseance) activite.executants = self.executant_local.values() return activite def digerer_executant(self, options, texte): texte += ' ' match = re.match(':\s+', texte) while match.end() < len(texte): texte = texte[match.end():] match = re.match('=@(\S+)\s+', texte) if match: options['code'] = match.group(1)[:run.option_largeur_libelle] continue match = re.match('([-L][-M][-M][-J][-V][-S][-D])\s+', texte) if match: if match.group(1) != '-------': options['jours'] = [jour != '-' for jour in match.group(1)] continue match = re.match(r'/(%s)\s+' % gabarit_nombre, texte) if match: vitesse = float(match.group(1)) if vitesse > 0.: options['vitesse'] = vitesse continue raise Erreur(u"Clause `%s' non reconnue." % texte) return apply(Executant, (), options) def finaliser_projet(self, activites): if run.option_verbeuse: sys.stderr.write(u"Finalisation...\n") # Fixer le calendrier. self.tranche_maintenant = self.tranche_selon_date( datetime.datetime.today()) # Préparer un exécutant pour les activités qui n'en ont pas. options = dict(run.options_executant) options['description'] = [u"Exécutant"] executant = apply(Executant, (), options) self.executant_global[None] = executant # Finaliser les activités. codes = set() for compteur, activite in enumerate(activites): activite.ordinal_parcours = compteur if activite.code is not None: if activite.code in codes: raise Erreur(u"Activité `%s' répétée." % activite.code) codes.add(activite.code) if activite.minimum is None: activite.minimum = 0 else: activite.minimum = self.tranche_selon_date(activite.minimum) if activite.maximum is not None: activite.maximum = self.tranche_selon_date(activite.maximum) if not activite.executants: activite.executants.append(executant) self.activites.append(activite) for preseance in self.preseances: for compteur, libelle in enumerate(preseance): if isinstance(libelle, unicode): libelle = libelle[:run.option_largeur_libelle] for curseur in activites: if libelle == curseur.code: preseance[compteur] = curseur break else: activite = None for curseur in activites: description = curseur.description[-1] if (description[:run.option_largeur_libelle] == libelle): if activite: raise Erreur(u"Activité `%s' ambiguë." % libelle) activite = curseur if not activite: raise Erreur(u"Activité `%s' inconnue." % libelle) preseance[compteur] = activite if preseance[0].recurrence is not None: raise Erreur(u"Ne peut dépendre de `%s' récurrente." % preseance[0]) preseance[1].dependances.append(preseance[0]) self.activites.sort() def ordonnancer(self): if run.option_verbeuse: sys.stderr.write(u"Ordonnancement...\n") # Ordonnancer toutes les activités non récurrentes, tout en # préparant le corps des diagrammes. for activite in self.activites: if not activite.decide and activite.recurrence is None: activite.decider_cedule([]) # La limite du diagramme peut maintenant être déterminée. if run.tranche_limite is None: run.tranche_limite = 0 for activite in self.activites: if isinstance(activite.fin, int): run.tranche_limite = max(run.tranche_limite, activite.fin) # Ordonnancer toutes les activités récurrentes, toujours en # préparant le corps des diagrammes. for activite in self.activites: if not activite.decide and activite.recurrence is not None: activite.decider_cedule([]) # Ajouter les marques de dépendances entre activités. for apres in self.activites: if apres.debut is not None and apres.fraction < 1.: for avant in apres.dependances: if avant.debut is not None and avant.fraction < 1.: if apres.debut not in avant.diagramme: avant.diagramme[apres.debut] = '+' if avant.fin - 1 not in apres.diagramme: apres.diagramme[avant.fin - 1] = '+' def produire_tableau_attributions(self, write): if run.option_verbeuse: sys.stderr.write(u"Tableau des attributions...\n") write('\n' u"+-%s-+-%s-+-------+------------+\n" u"| %s | %s | J.-p. | Date fin |\n" u"+-%s-+-%s-+-------+------------+\n" % ('-' * run.option_largeur_libelle, '-' * run.option_largeur_libelle, u'Activité'.ljust(run.option_largeur_libelle), 'Responsable'.ljust(run.option_largeur_libelle), '-' * run.option_largeur_libelle, '-' * run.option_largeur_libelle)) total = 0. paires = [(unicode(activite), activite) for activite in self.activites if not (unicode(activite).startswith('%') or unicode(activite.responsable).startswith('%'))] paires.sort() for libelle, activite in paires: write('| %*s | %*s | %5.2f | %s |\n' % (-run.option_largeur_libelle, libelle, -run.option_largeur_libelle, unicode(activite.responsable), activite.duree + activite.fixe, self.date_selon_tranche(activite.fin - 1).isoformat())) total += activite.duree + activite.fixe write('+-%s-+-%s-+-------+------------+\n' '| %s | %s | %5.2f | |\n' '+-%s-+-%s-+-------+------------+\n' % ('-' * run.option_largeur_libelle, '-' * run.option_largeur_libelle, 'Total'.ljust(run.option_largeur_libelle), ' ' * run.option_largeur_libelle, total, '-' * run.option_largeur_libelle, '-' * run.option_largeur_libelle)) def produire_diagramme(self, write, mode_activites): if run.option_verbeuse: if mode_activites: sys.stderr.write(u"Diagramme des activités...\n") else: sys.stderr.write(u"Diagramme des exécutants...\n") # Possiblement produire les titres avant le diagramme. if not (run.option_sans_description or run.option_intercaler_descriptions): write('\n') for titre in run.titres: write('%s\n' % titre) # Planifier l'ordre des lignes et le libellé pour chacune. triplets = [] if mode_activites: largeur_ordinal = len(unicode(Activite.ordinal_creation)) for activite in self.activites: code = activite.code or '%.*d' % (largeur_ordinal, activite.ordinal_creation) if code.startswith('%'): continue if activite.debut is None: if activite.maximum is not None: triplets.append(( (activite.maximum, activite.maximum, activite.ordinal_creation, activite.maximum), code, activite)) else: triplets.append(( (activite.debut, activite.fin, activite.ordinal_creation, activite.maximum), code, activite)) else: for executant in self.executant_global.itervalues(): if executant.code.startswith('%'): continue triplets.append((executant.code, executant.code, executant)) if not triplets: sys.stdout.write(u"Diagramme vide.\n") return triplets.sort() def imprimer_en_tete(jour): write('\n' '+-%s-+-%s-+\n' % ('-' * largeur_libelle, ''.join(regle))) for compteur, (chiffre, date) in enumerate(zip( unicode(self.date_selon_tranche(tranche_debut).year), dates)): if jour is not None and compteur == 1: marge = (semaine[(self.depart + datetime.timedelta(jour)) .weekday()] [:largeur_libelle-2]) else: marge = '' write('| %*s%s | %s |\n' % (-(largeur_libelle - 1), marge, chiffre, ''.join(date))) write('+-%s-+-%s-+\n' % ('-' * largeur_libelle, ''.join(regle))) def imprimer_pied(): write('+-%s-+-%s-+\n' % ('-' * largeur_libelle, ''.join(regle))) def imprimer_descriptions(libelles): if run.option_sans_description: return libelles = list(libelles) libelles.sort() for libelle in libelles: liste = description[libelle] write('\n' '%*s %s\n' % (-largeur_libelle, libelle, liste[0])) marge = largeur_libelle + 2 for texte in liste[1:]: marge += 4 write('%s%s\n' % (' ' * marge, texte)) # Planifier le découpage d'un diagramme en blocs suffisamment petits. largeur_libelle = max([len(triplet[1]) for triplet in triplets]) largeur_en_tranches = ((run.option_nombre_maximal_de_colonnes - largeur_marges - largeur_libelle) // run.tranches_par_jour * run.tranches_par_jour) if largeur_en_tranches < 10: raise Erreur( u"Option `-mN' ou `-tN' trop grande, ou `-wN' trop petite.") # Imprimer les blocs du diagramme, tour à tour, chacun allant de # sons propre TRANCHE_DÉBUT à son propre TRANCHE_FIN. description = {} libelles = set() for tranche_debut in range(0, run.tranche_limite, largeur_en_tranches): tranche_fin = min(tranche_debut + largeur_en_tranches, run.tranche_limite) # Sauter les blocs ne contenant que des tranches inutilisées. for tranche in range(tranche_debut, tranche_fin): if tranche in self.tranches_utilisees: break else: continue # Ne garder que les lignes donnant une information utile. donnees = [] for cle, libelle, objet in triplets: jour = None diagramme = objet.diagramme if mode_activites: debut, fin, ordinal_creation, maximum = cle if debut is not None: jour = debut // run.tranches_par_jour if maximum is not None: if debut is None: if tranche_fin == run.tranche_limite: description[libelle] = objet.description donnees.append((None, libelle, diagramme, 'droite')) continue elif fin > maximum: if (fin > run.tranche_limite and tranche_fin == run.tranche_limite): description[libelle] = objet.description donnees.append((jour, libelle, diagramme, 'droite')) continue if fin < 0 and tranche_debut == 0: description[libelle] = objet.description donnees.append((jour, libelle, diagramme, 'gauche')) continue for tranche in range(tranche_debut, tranche_fin): if tranche in diagramme: description[libelle] = objet.description donnees.append((jour, libelle, diagramme, None)) break # Préparer le fond du diagramme et les dates verticales. regle = [] fond1 = [] fond2 = [] dates = [], [], [], [] mois_precedent = None for tranche in range(tranche_debut, tranche_fin): if tranche % run.tranches_par_jour == 0: date = self.date_selon_tranche(tranche) if tranche == self.tranche_maintenant: regle.append(':') fond1.append(':') fond2.append(':') elif date.weekday() == 6: regle.append(u'·') fond1.append(u'·') fond2.append(' ') else: regle.append('-') fond1.append(' ') fond2.append(' ') mois = '%.2d' % date.month if mois == mois_precedent: dates[0].append(' ') dates[1].append(' ') else: mois_precedent = mois dates[0].append(mois[0]) dates[1].append(mois[1]) quantieme = '%.2d' % date.day dates[2].append(quantieme[0]) dates[3].append(quantieme[1]) else: regle.append('-') fond1.append(' ') fond2.append(' ') dates[0].append(' ') dates[1].append(' ') dates[2].append(' ') dates[3].append(' ') # Imprimer le diagramme proprement dit. jour_courant = donnees[0][0] imprimer_en_tete(jour_courant) for compteur, (jour, libelle, diagramme, defoncer) in ( enumerate(donnees)): if jour != jour_courant: if run.option_intercaler_descriptions: if libelles: imprimer_pied() imprimer_descriptions(libelles) libelles = set() imprimer_en_tete(jour) jour_courant = jour libelles.add(libelle) if compteur % 3 == 2: fond = fond1 else: fond = fond2 if defoncer == 'gauche': write('| %*s@@ ' % (-largeur_libelle, libelle)) defoncer = None else: write('| %*s | ' % (-largeur_libelle, libelle)) for tranche in range(tranche_debut, tranche_fin): symbole = diagramme.get(tranche) if symbole is None: write(fond[tranche-tranche_debut]) else: if symbole == '@': defoncer = None write(diagramme[tranche]) if defoncer == 'droite': write(' @@\n') else: write(' |\n') if libelles: imprimer_pied() if run.option_intercaler_descriptions: imprimer_descriptions(libelles) libelles = set() imprimer_descriptions(libelles) class Activite: # Compteur de structures `Activite', dans l'ordre de création. ordinal_creation = 0 # Compteur de structures `Activite', dans l'ordre idéal de parcours. ordinal_parcours = 0 # Description, sous la forme d'une liste télescopée. description = None # Court code servant de référence. code = None # Tranche avant laquelle l'activité ne peut commencer. minimum = None # Tranche avant laquelle l'activité doit finir. maximum = None # Durée en tranches temps plein. duree = 0 # Temps fixe à ajouter à la durée. fixe = 0 # Fraction réalisée de l'activité. fraction = 0 # Liste d'exécutants possibles pour cette activité. executants = None # Responsable choisi pour l'activité. responsable = None # Liste d'activités devant être complétées d'abord. dependances = None # Si flottant, période de récurrence exprimée en tranches. # Sinon, tuple (ANNÉE, MOIS, JOUR, DELTA), les trois premiers # étant un entier ou None (qui signifie que toute valeur est # acceptable), le dernier étant un timedelta représentant l'heure. recurrence = None # Si les valeurs de DÉBUT et FIN sont finales (None si impossible). decide = False # Tranche du début de l'ordonnancement, une fois décidé. debut = None # Tranche de fin + 1 de l'ordonnancement, une fois décidé. fin = None # Symbole dans le diagramme de Gantt en fonction de la tranche. diagramme = None def __init__(self, **definitions): Activite.ordinal_creation += 1 self.ordinal_creation = Activite.ordinal_creation for nom, valeur in definitions.iteritems(): setattr(self, nom, valeur) self.dependances = [] self.diagramme = {} def __unicode__(self): if self.code: return self.code return self.description[-1][:run.option_largeur_libelle] def __cmp__(self, other): # Passent d'abord: les échéances les plus proches, les dépendances les # plus nombreuses, les délais les plus petits, l'ordre de parcours # parallélisé. Une échéance absente équivaut à une échéance infinie. # Un délai absent est considéré nul. return (cmp(self.maximum or sys.maxint, other.maximum or sys.maxint) or cmp(len(self.dependances), len(other.dependances)) or cmp(self.minimum, other.minimum) or cmp(self.ordinal_parcours, other.ordinal_parcours)) def decider_cedule(self, incomplets): # S'assurer que les dépendances sont satisfaites. debut = self.minimum incomplets.append(self) try: for dependance in self.dependances: if dependance in incomplets: raise Erreur( u"Boucle de dépendances parmi: %s." % ', '.join([unicode(activite) for activite in incomplets])) if dependance.debut is None: dependance.decider_cedule(incomplets) if dependance.debut is not None: debut = max(debut, dependance.fin) finally: incomplets.pop() self.decide = True # Allouer des tranches de temps pour cette activité. if incomplets or self.maximum is not None: limite = None else: limite = run.tranche_limite if self.duree == 0 and self.fixe == 0: # Marquer l'occupation sur le diagramme d'activités seulement. for point in self.chaque_occurrence(debut, limite): self.debut = self.fin = point self.diagramme[point] = 'o' run.projet.tranches_utilisees.add(point) else: if run.option_y_compris_termine: tranches = self.ceduler(debut, limite, self.duree, self.fixe) tranches_faites = int(math.floor(len(tranches) * self.fraction)) else: # Céduler, ne serait-ce que pour assigner un exécutant. tranches = self.ceduler(max(0, debut), limite, self.duree * (1. - self.fraction), self.fixe * (1. - self.fraction)) tranches_faites = 0 if not tranches: return # Marquer l'occupation sur les deux diagrammes. for diagramme in self.diagramme, self.responsable.diagramme: if len(tranches) == 1 and self.fraction == 0.: diagramme[tranches[0]] = 'O' run.projet.tranches_utilisees.add(debut) else: for tranche in tranches: diagramme[tranche] = '-' run.projet.tranches_utilisees.add(tranche) diagramme[tranches[-1]] = ')' if self.fraction == 0.: diagramme[tranches[0]] = '(' for compteur, tranche in enumerate(tranches): if compteur < tranches_faites: diagramme[tranche] = '*' elif self.maximum is not None and tranche >= self.maximum: diagramme[tranche] = '@' def ceduler(self, debut, limite, duree, fixe): # Céduler une activité ne débutant pas avant la tranche # DÉBUT, se terminant avant la tranche LIMITE (pas de limite # si None), et qui consomment DURÉE tranches au total pour # un exécutant pleinment disponible, et FIXE tranches # indépendamment de la vitesse de l'exécutant. Si la cédule # n'est pas possible, retourne None. Autrement, fixe DÉBUT et # FIN, et retourne la liste des tranches cédulées. if self.responsable is None: executants = self.executants else: executants = [self.responsable] # Trouver un exécutant et la meilleure affectation. Une # affectation est meilleure si elle se termine plus tôt, ou # sinon, si elle débute plus tôt. possibilites = [] for executant in executants: if limite is None: tranches = executant.ceduler( self.chaque_occurrence(debut, None), limite, duree, fixe) else: tranches = executant.ceduler( self.chaque_occurrence(debut, limite - fixe), limite, duree, fixe) if tranches: possibilites.append((tranches[-1] + 1, tranches[0], tranches, executant)) if possibilites: possibilites.sort() self.fin, self.debut, tranches, self.responsable = possibilites[0] return tranches def chaque_occurrence(self, debut, limite): # Fournir chaque tranche de début d'une nouvelle occurrence # pour l'activité, comprise entre les tranches DÉBUT et # LIMITE, en fonction de la formule de récurrence prévue. def avancer_date(date): # Avancer DATE, qui respecte déjà le gabarit de date, # vers une autre valeur qui respecte elle aussi le gabarit. # Retourner cette valeur, ou None si ça n'est pas possible. if jour is None: try: return date.replace(day=date.day+1) except ValueError: date = date.replace(day=1) if mois is None: month = date.month while month < 12: month += 1 try: return date.replace(month=month) except ValueError: pass date = date.replace(month=1) if annee is None: return date.replace(year=date.year+1) if self.recurrence is None: # Pas de récurrence. yield debut elif isinstance(self.recurrence, float): # Récurrence à intervalle fixe. while debut < limite: # Céduler une occurrence. yield debut debut += self.recurrence else: # Récurrence avec gabarit de date. annee, mois, jour, delta = self.recurrence # - Trouver la date égale ou future la plus proche. date = run.projet.depart + delta if mois is None and jour is None: if annee is not None and annee < date.year: return while date < run.projet.depart: date = avancer_date(date) if date is None: return elif mois is not None and jour is not None: # REVOIR: `....-02-29' faillit sur une année non-bissextile. if mois < date.month or mois == date.month and jour < date.day: date = datetime.datetime(date.year + 1, mois, jour) + delta else: date = datetime.datetime(date.year, mois, jour) + delta else: date = (datetime.datetime( annee or date.year, mois or 1, jour or 1) + delta) while date < run.projet.depart: date = avancer_date(date) if date is None: return # - Céduler des occurrences tant que c'est possible. while date is not None: debut = run.projet.tranche_selon_date(date) if debut > limite: break yield debut date = avancer_date(date) class Executant: # Nom de l'individu ou de l'organisation. description = None # Court code servant de référence. code = None # Vitesse de cet exécutant, en unités de travail par tranche de temps. vitesse = 1. # Liste des jours de la semaine où cet exécutant travaille. jours = None # Symbole dans le diagramme de Gantt en fonction de la tranche. diagramme = None # Prochaine tranche pour laquelle on ne sait pas encore si l'exécutant est # disponible. suivante = 0 # Ensemble de tranches qui sont connues disponibles pour cet exécutant. disponibles = None def __init__(self, **definitions): for nom, valeur in definitions.iteritems(): setattr(self, nom, valeur) self.diagramme = {} self.disponibles = set() def __unicode__(self): if self.code: return self.code return self.description[-1][:run.option_largeur_libelle] def __cmp__(self, other): return cmp(self.description, other.description) def ceduler(self, occurrences, limite, duree, fixe): # Trouver du temps libre pour que cet exécutant puisse # accomplir toutes les occurrences de l'activité. Chaque # occurrence ne peut pas débuter avant la tranche fournie par # l'itérateur OCCURRENCES, elle doit se terminer avant la # tranche LIMITE (pas de limite si None), elle demande DURÉE # tranches de temps à un exécutant dont la VITESSE serait 1., # et FIXE tranches supplémentaires indépendamment de toute # vitesse. Retourne la liste des tranches proposées (chacune # étant soit un entier, soit un 1-tuple d'un entier donnant # la position d'une tranche sans longueur), ou encore None si # aucune cédule n'est possible. requis = int(math.ceil(duree / self.vitesse)) + fixe toutes_tranches = [] for debut in occurrences: tranches = [] curseur = debut while len(tranches) < requis: if limite is not None and curseur >= limite: tranches = None break while self.suivante <= curseur: date = run.projet.date_selon_tranche(self.suivante) if (date not in run.projet.conges and self.jours[date.weekday()]): self.disponibles.add(self.suivante) self.suivante += 1 if curseur in self.disponibles: if curseur in self.diagramme: if run.option_allocation_contigue: for tranche in tranches: self.disponibles.add(tranche) tranches = [] else: self.disponibles.remove(curseur) tranches.append(curseur) curseur += 1 if tranches: toutes_tranches += tranches else: for tranche in toutes_tranches: self.disponibles.add(tranche) return return toutes_tranches run = Main() main = run.main if __name__ == '__main__': if False: import profile, pstats profile.run('main(*sys.argv[1:])', 'profile-data') stats = pstats.Stats('profile-data') stats.strip_dirs().sort_stats('time', 'cumulative').print_stats(10) else: main(*sys.argv[1:])