2009年9月21日月曜日

MapReduce を C# で 04

だいぶ修正。しかし、括弧は増えるばかり(泣)

/// <summary>
/// パイプライン実行を可能にするための列挙子。
/// </summary>
/// <remarks>IEnumerator は生成後のソース変更を許可しないため、このようなインターフェースを用意。</remarks>
/// <typeparam name="T"></typeparam>
public interface ParallelIterator<T>
: IEnumerator<T>, IDisposable, IEnumerator
{
bool TryNext(out T result);
new void Reset();
}

public interface ParallelIterable<T>
: IEnumerable<T>, IEnumerable
{
ParallelIterator<T> GetIterator();
}

public interface IParallelCollection<T>
: ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
int Count { get; }
bool IsReadOnly { get; }
void Add(T item);
void Clear();
bool Contains(T item);
void CopyTo(T[] array, int arrayIndex);
bool Remove(T item);

/// <summary>
/// 更新終了。
/// </summary>
bool Fixed { get; set; }
}

public interface IParallelList<T>
: IParallelCollection<T>, ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
T this[int index] { get; set; }
int IndexOf(T item);
void Insert(int index, T item);
void RemoveAt(int index);
}

public interface IParallelDictionary<TKey, TValue>
: IParallelCollection<KeyValuePair<TKey, TValue>>, ParallelIterable<KeyValuePair<TKey, TValue>>,
IEnumerable<KeyValuePair<TKey, TValue>>, IEnumerable
{
IParallelCollection<TKey> Keys { get; }
IParallelCollection<TValue> Values { get; }
TValue this[TKey key] { get; set; }
void Add(TKey key, TValue value);
bool ContainsKey(TKey key);
bool Remove(TKey key);
bool TryGetValue(TKey key, out TValue value);
}

public interface IMapper<TInKey, TInValue, TOutKey, TIntermediateValue>
{
IEnumerable<KeyValuePair<TOutKey, TIntermediateValue>> Map(TInKey inkey, TInValue inValue);
}

public interface IReducer<TOutKey, TIntermediateValue, TOutValue>
{
IEnumerable<TOutValue> Reduce(TOutKey outKey, ParallelIterable<TIntermediateValue> intermediateValues);
}

public interface IOperation
{
void Wait();
}

public interface ITaskQueue
{
IOperation AddTask(Delegate action, params object[] state);
}


public sealed class MapReduceManager<TInKey, TInValue, TOutKey, TIntermediateValue, TOutValue>
{
// 各実装に合わせた中間値保存、出力値保存用のストレージ取得処理。
public Func<IParallelCollection<TIntermediateValue>> NewIntermediateValueStorage { get; set; }
public Func<IParallelCollection<TOutValue>> NewOutValueStorage { get; set; }

// 各実装にあわせた MapReduce 処理と中間値参照用のリポジトリ。
public IMapper<TInKey, TInValue, TOutKey, TIntermediateValue> Mapper { get; set; }
public IReducer<TOutKey, TIntermediateValue, TOutValue> Reducer { get; set; }
public IParallelDictionary<TOutKey, IParallelCollection<TIntermediateValue>> Repository { get; set; }

// パイプライン実行のためのタスクキューおよび実行ハンドラ。
public ITaskQueue TaskQueue { get; set; }
public Func<IParallelCollection<IOperation>> NewIntermediateOperations { get; set; }
public Func<IParallelCollection<IOperation>> NewReducingOperations { get; set; }
public Func<IParallelCollection<IOperation>> NewOutValueOperations { get; set; }

public IEnumerable<TOutValue> DoProcess(TInKey inKey, TInValue inValue)
{
var outValues = this.NewOutValueStorage();

var intermediateOperations = this.NewIntermediateOperations();
var reducingOperations = this.NewReducingOperations();
var outValueOperations = this.NewOutValueOperations();

foreach (var intermediate in this.Mapper.Map(inKey, inValue))
{
intermediateOperations.Add(
this.TaskQueue.AddTask(
new Action(
() =>
{
if (!this.Repository.ContainsKey(intermediate.Key))
{
var intermediateValues = this.NewIntermediateValueStorage();
intermediateValues.Add(intermediate.Value);
this.Repository.Add(intermediate.Key, intermediateValues);

reducingOperations.Add(
this.TaskQueue.AddTask(
new Action(
() =>
{
foreach (var outValue in this.Reducer.Reduce(intermediate.Key,
this.Repository[intermediate.Key]))
{
outValueOperations.Add(
this.TaskQueue.AddTask(
new Action(
() =>
{
outValues.Add(outValue);
}
)
)
);
}
}
)
)
);
}
else
{
this.Repository[intermediate.Key].Add(intermediate.Value);
}
}
)
)
);
}

this.TaskQueue.AddTask(
new Action(
()=>
{
intermediateOperations.Fixed = true;
foreach (var operation in intermediateOperations)
{
operation.Wait();
}

this.Repository.Fixed = true;

this.Repository.Values.Fixed = true;
foreach (var value in this.Repository.Values)
{
value.Fixed = true;
}

reducingOperations.Fixed = true;
foreach (var operation in reducingOperations)
{
operation.Wait();
}

outValueOperations.Fixed = true;
foreach (var operation in outValueOperations)
{
operation.Wait();
}

outValues.Fixed = true;
}
)
);

return outValues;
}
}

2 件のコメント:

  1. Map 処理が列挙し終わらないと outValues が列挙開始できないのはいただけないかも。

    返信削除
  2. ParallelFor みたいなの作ったほうがすっきりするのかな。ちょっと置いとこ。

    返信削除