#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright © 1991-92, 94, 96-00, 02 Progiciels Bourbeau-Pinard inc. # François Pinard , March 1992-03. # 1989-03: tried an sh script to convert between message formats. # 1991-08: Perl version to convert rn, UNIX mail and Babyl to Babyl. # 1992-03: added UNIX mail format output. # 1994-07: added many inputs to a single output, added summary listing. # 1999-03: added printing option. # 1999-09: converted from Perl to Python. # 2002-05: get rid of state automaton, save Babyl real headers. # 2003-12: allow buffering one message in memory, use generators. # 2005-09: restore @ and . from at and dot. u"""\ Append messages or Usenet articles to a Babyl or UNIX mail file. Somehow handle slightly mangled input or output files. Usage: babyl [ OPTION ] ... [INPUT] ... Options: -b OUTPUT Convert all INPUT files into Babyl format on OUTPUT. -m OUTPUT Convert all INPUT files into Unix mailbox format on OUTPUT. -p Print all messages of a mail folder through Enscript. -c Clean all \\201 bytes, as from Emacs Mule. -s Produce one summary line per input message. -n Avoid numbering summary lines, to ease later diff. -v Produce progress messages and dots on stderr. Options -b, -m, -p or -s may be repeated. If none, -s is assumed. Examples: `babyl -m MBOX-OUTPUT BABYL-INPUT' will convert a Babyl file into an mbox-format file; `babyl -b BABYL-OUTPUT BABYL-INPUT' will either merge a Babyl input file into/after a Babyl output file, or reconstruct a good Babyl file out of a damaged one. To get an inventory of some folder file, merely use `babyl -s FOLDER'. """ __metaclass__ = type import os, re, sys def decoded_header(texte): try: from email.header import decode_header except ImportError: from email.Header import decode_header try: result = '' for fragment, charset in decode_header(texte): if result: result += ' ' if charset is None: result += unicode(fragment) else: result += unicode(fragment, charset) return result except UnicodeDecodeError: return unicode(texte, 'ISO-8859-1') class Main: # Program options. clean_mule = False force = False nonumber = False verbose = False # Various patterns. from_ = 'From [^ ]+( at [^ ]+)?' remote = 'remote from [^ ]+' day = '(Mon|Tue|Wed|Thu|Fri|Sat|Sun)' date = '([ 0-2]?[0-9]|3[01])' month = '(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)' year = '((19)?[7-9][0-9]|(20)?0[0-9])' time = '([ 0-1][0-9]|2[0-3]):[0-5][0-9](:[0-5][0-9])?' zone = ('([A-Z][A-Z]T|[a-z][a-z]t|UT' '|[-+][0-1][0-9][0-5][0-9]( \\([CM]?[A-Z][A-Z]T\\))?)') # Unix mailbox envelopes. MATCH_1 = re.compile('\37?%s +%s %s %s %s( %s)? %s( %s)?$' % ( from_, day, month, date, time, zone, year, remote)).search MATCH_2 = re.compile('\37?%s +%s, %s %s %s %s( %s)?( %s)?$' % ( from_, day, date, month, year, time, zone, remote)).search MATCH_3 = re.compile('\37?%s-%s-%s %s-%s,[0-9]+;[0-9]+$' % ( date, month, year, time, zone)).search MATCH_4 = re.compile('\37?From .* %s %s %s %s %s$' % ( day, month, date, time, year)).search # Babyl separator. MATCH_5 = re.compile('\37\f?$').match # MMDF separator. MATCH_6 = re.compile('\1' * 20 + '$').match # News article envelope. MATCH_7 = re.compile('#! rnews [0-9]+$').match MATCH_8 = re.compile('\37?Article [0-9]+ of [^ ]+:$').match del from_, remote, day, date, month, year, time, zone def __init__(self): self.outputs = [] self.newline_needed = False self.file_name = None def main(self, *arguments): if not arguments: sys.stdout.write(__doc__) return outputs = [] import getopt options, arguments = getopt.getopt(arguments, 'b:cm:npsv') for option, value in options: if option == '-b': outputs.append((BabylOutput, value)) elif option == '-c': self.clean_mule = True elif option == '-m': outputs.append((MboxOutput, value)) elif option == '-n': self.nonumber = True elif option == '-p': outputs.append((PrintOutput, None)) elif option == '-s': outputs.append((SummaryOutput, None)) elif option == '-v': self.verbose = True if not outputs: outputs.append((SummaryOutput, None)) for maker, value in outputs: self.outputs.append(maker(value)) for input_name in arguments: self.process_file(input_name) def process_file(self, input_name): if self.verbose: write('Reading %s: ' % input_name) for output in self.outputs: output.start_input_file(input_name) message_counter = 0 for message in self.all_messages(input_name): for output in self.outputs: output.process_message(message) message_counter += 1 if self.verbose: self.write('.') if self.verbose: self.write('done (%d messages)\n' % message_counter) def all_messages(self, input_name): def prefix_of_delimiter(line): # Returns None if not a delimiter, or the string in the line # before the delimiter (which may be the empty string). match = (Main.MATCH_1(line) or Main.MATCH_2(line) or Main.MATCH_3(line) or Main.MATCH_4(line) or Main.MATCH_5(line) or Main.MATCH_6(line) or Main.MATCH_7(line) or Main.MATCH_8(line)) if match is not None: prefix = line[:match.start()] if prefix.startswith(""'>'): return None if prefix.rstrip().lower() in ('mail-from:',): return None return prefix def is_babyl(headers, line): if headers and headers[0] in ('BABYL OPTIONS:\n', 'BABYL OPTIONS: -*- rmail -*-\n'): if line not in (""'\37', ""'\37\f\n'): self.report(u"Invalid Babyl terminator.") return True return False def clean_message(): if which is headers: self.report(u"Message ended within headers.") if babyl_format: if body and body[0] == '*** EOOH ***\n': del body[0] if headers and headers[0] and headers[0][0] in '01': del headers[0] else: print repr(headers) self.report(u"Invalid Babyl status header.") else: try: index = headers.index('*** EOOH ***\n') except ValueError: self.report(u"Missing EOOH separator in Babyl file.") index = None if headers[0].startswith('0'): if index is None: del headers[0] else: del headers[:index+1] elif headers[0].startswith('1'): if index is not None: del headers[index:] del headers[0] else: self.report(u"Invalid Babyl status header.") for counter, line in enumerate(headers): headers[counter] = (line.replace(""' at ', ""'@') .replace(""' dot ', ""'.')) if self.clean_mule: for counter, line in enumerate(body): body[counter] = line.replace(""'\201', '') self.input_name = input_name self.line_counter = 0 message_count = 0 babyl_format = False start_line = 1 which = headers = [] body = [] for line in file(input_name, 'rb'): self.line_counter += 1 incomplete_line = prefix_of_delimiter(line) if incomplete_line is not None: if incomplete_line: self.report(u"Incomplete line.") which.append(incomplete_line + '\n') if headers or body: if is_babyl(headers, line): if message_count > 0: self.report(u"Babyl header not at beginning·") babyl_format = True else: clean_message() message_count += 1 yield start_line, headers, body start_line = self.line_counter + 1 which = headers = [] body = [] continue if which is headers and line == ""'\n': which = body continue if line in (""'\37', ""'\37\f\n'): if not babyl_format: self.report(u"Unexpected Babyl terminator.") babyl_format = True which.append(line) if headers or body: if is_babyl(headers, line): if message_count > 0: self.report(u"Babyl header not at beginning·") return clean_message() yield start_line, headers, body ## Feedback to user. def report(self, text): if self.newline_needed: sys.stderr.write('\n') if self.input_name: self.write('%s:%d: %s\n' % (self.input_name, self.line_counter, text)) else: self.write('%s\n' % text) def write(self, text): sys.stderr.write(text) self.newline_needed = text[-1] != '\n' class Output: def __init__(self, output_name): pass def start_input_file(self, input_name): pass class SummaryOutput(Output): MONTHS = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} def start_input_file(self, input_name): sys.stdout.write('\n%s\n' % input_name) self.message_count = 0 def process_message(self, (start, headers, body)): self.message_count += 1 originator = '-----' date = '---- -- --' subject = '(none)' try: from email.utils import parsedate_tz except ImportError: from email.Utils import parsedate_tz for line in headers: match = re.match('From:[ \t]*(.*)', line, re.I) if match: line = match.group(1) match = re.match('([^<]*)<(.*)>', line, re.I) if match: name, originator = match.group(1, 2) else: match = re.match(r'([^\(]*)\((.*)\)', line, re.I) if match: originator, name = match.group(1, 2) else: originator, name = line, '' match = re.match(' *"(.*)" *$', name) if match: name = match.group(1) originator = decoded_header(name or originator) originator = re.sub(' +', ' ', originator.strip()) continue match = re.match( 'Date:[ \t]*((19|20)[0-9][0-9]-[01][0-9]-[0-3][0-9])', line, re.I) if match: date = match.group(1) continue match = re.match('Date:[ \t]*(.*)', line, re.I) if match: date = parsedate_tz(match.group(1)) if date is None: date = '....-..-..' else: date = '%.4d-%.2d-%.2d' % date[:3] continue match = re.match('Subject:[ \t]*(.*)', line, re.I) if match: subject = decoded_header(match.group(1)) subject = re.sub(' +', ' ', subject.strip()) continue if run.nonumber: line = ' %s %s: %s' % (date, originator, subject) else: line = ('%2d. %s %s: %s' % (self.message_count, date, originator, subject)) sys.stdout.write('%s\n' % line[0:79].rstrip()) class BabylOutput(Output): def __init__(self, output_name): if os.path.isfile(output_name): self.write = file(output_name, 'ab').write else: self.write = file(output_name, 'wb').write if run.verbose: feedback.write('making new Babyl file: %s\n' % self.output_name) self.write('BABYL OPTIONS: -*- rmail -*-\n' 'Version: 5\n' 'Labels:\n' 'Note: This is the header of an rmail file.\n' 'Note: If you are seeing it in rmail,\n' 'Note: it means the file has no messages in it.\n' '\37') def process_message(self, (start, headers, body)): write = self.write write('\f\n' '0, unseen,,\n' '*** EOOH ***\n') for line in headers: write(line) write('\n') for line in body: write(line) if body and not body[-1].endswith('\n'): write('\n') write('\37') class MboxOutput(Output): def __init__(self, output_name): if os.path.isfile(output_name): self.write = file(output_name, 'ab').write else: self.write = file(output_name, 'wb').write def process_message(self, (start, headers, body)): write = self.write # Process headers. originator = 'unknown' import time date = None for line in headers: match = re.match('X-From-Line:[ \t]*([^ ]*)', line, re.I) if match: text = match.group(1) if '@' in text and '@' not in originator: originator = text continue match = re.match('From:[ \t]*(.*)', line, re.I) if match: text = match.group(1) match = re.search('<(.+)>', text) if match: text = match.group(1) text = re.sub(r' *\(.*\) *', '', text, 1) text = re.sub(' *".*" *', '', text, 1) if '@' in text and '@' not in originator: originator = text continue match = re.match('Date:[ \t]*(.*)', line, re.I) if match: try: from email.utils import parsedate except importError: from email.Utils import parsedate date = parsedate(match.group(1)) if date is None: try: date = time.strptime(match.group(1)[:19], '%Y-%m-%d %H:%M:%S') except ValueError: pass continue if date is None: date = time.localtime(time.time()) elif date[0] < 20: date = (date[0] + 2000,) + date[1:] elif date[0] < 200: date = (date[0] + 1900,) + date[1:] write('From %s %s\n' % (originator, time.asctime(date))) for line in headers: write(line) write('\n') # Process body. blank_lines = 0 for line in body: if re.match('_?From', line, re.I): write(""'>' + line) elif line == ""'\n': blank_lines += 1 elif blank_lines: write(""'\n' * blank_lines + line) blank_lines = 0 else: write(line) if body and not body[-1].endswith('\n'): write('\n') write('\n') class PrintOutput(Output): def __init__(self, output_name): assert output_name is None, output_name def start_input_file(self, input_name): self.input_name = input_name self.message_count = 0 def process_message(self, (start, headers, body)): self.message_count += 1 write = (os.popen(('enscript -G -Email -t"%s [%d.]"' % (self.input_name, self.message_count)), 'w') .write) # Process headers. while headers: line = headers[0] del headers[0] if line == 'X-Coding-System: nil\n': continue if re.match('X-UIDL:', line): continue if re.match('X-Face:', line): while headers and headers[0][0] == ' ': del headers[0] continue write(decoded_header(line)) write('\n') # Process body. for line in body: write(line) if body and not body[-1].endswith('\n'): write('\n') run = Main() main = run.main if __name__ == '__main__': main(*sys.argv[1:])