set job.name 'nmf'

REGISTER udfs-0.0.1.jar;

-- Vector/matrix helpers from the project UDF jar. Vectors are bags of single-double
-- tuples (see the W/H load schemas below).
-- NOTE(review): MULTIPLY_VECTOR is called both with a scalar second argument (v) and with
-- a vector second argument (element-wise update); presumably the UDF supports both -- confirm.
DEFINE MULTIPLY_VECTOR com.adform.pig.udfs.MultiplyVector();
DEFINE MULTIPLY_VECTOR_BY_MATRIX com.adform.pig.udfs.MultiplyVectorByMatrix();
DEFINE DIVIDE_VECTOR com.adform.pig.udfs.DivideVector();
DEFINE SUM_VECTOR com.adform.pig.udfs.SumVector();
DEFINE MULTIPLY_VECTOR_BY_TRANSPOSE com.adform.pig.udfs.MultiplyVectorByTranspose();
DEFINE SUM_MATRIX com.adform.pig.udfs.SumMatrix();

-----------------------------------------------------------------------------------------
-- Load data.
-----------------------------------------------------------------------------------------
-- A: entries of the input matrix as (row r, column c, value v) triples; entries that are
--    not listed contribute nothing to the join-based products below.
-- W: one tuple per row i of W, holding that row as a bag of doubles.
-- H: one tuple per column j of H, holding that column as a bag of doubles.
-- NOTE(review): Pig does not guarantee element order inside a bag; the vector UDFs
-- presumably rely on the order in which values were loaded -- confirm.
A = LOAD 'A.txt' AS (r:long, c:long, v:double);
W = LOAD 'W.txt' AS (i:long, row:bag{val: tuple(value: double)});
H = LOAD 'H.txt' AS (j:long, column:bag{val: tuple(value: double)});

-----------------------------------------------------------------------------------------
-- Compute update to H:  H <- H .* (W^T A) ./ (W^T W H)   (element-wise .* and ./).
-----------------------------------------------------------------------------------------
-- Compute W^T * W as the sum over rows i of the outer products row_i * row_i^T.
ww_outer = FOREACH W GENERATE i, MULTIPLY_VECTOR_BY_TRANSPOSE(row, row) AS matrix;
ww_grouped = GROUP ww_outer ALL;
WW = FOREACH ww_grouped GENERATE SUM_MATRIX(ww_outer.matrix) AS matrix;

-- Compute Y = W^T W H column by column. GROUP ALL leaves WW with a single tuple, so the
-- CROSS simply attaches the W^T W matrix to every column of H.
y_crossed = CROSS H, WW;
Y = FOREACH y_crossed GENERATE j AS j, MULTIPLY_VECTOR_BY_MATRIX(column, matrix) AS column;

-- Compute X = W^T A: column j of X is the sum, over entries A[r, j], of v * (row r of W).
aw_joined = JOIN A BY r, W BY i;
aw_multiplied = FOREACH aw_joined GENERATE r, c, MULTIPLY_VECTOR(row, v) AS column;
aw_grouped = GROUP aw_multiplied BY c;
X = FOREACH aw_grouped GENERATE group AS j, SUM_VECTOR(aw_multiplied.column) AS column;

-- Compute the new H element-wise: H_new = H .* X ./ Y.
-- NOTE(review): this is an inner join, so a column j with no entries in A has no tuple in
-- X and is dropped from H_new instead of being set to zero -- confirm that is intended.
-- NOTE(review): how DIVIDE_VECTOR handles zero entries of Y is not visible here; NMF
-- implementations commonly add a small epsilon to the denominator.
h_joined = JOIN H BY j, X BY j, Y BY j;
H_new = FOREACH h_joined GENERATE H::j AS j, MULTIPLY_VECTOR(H::column, DIVIDE_VECTOR(X::column, Y::column)) AS column;

-----------------------------------------------------------------------------------------
-- Compute update to W:  W <- W .* (A H^T) ./ (W H H^T), using the freshly updated H_new.
-----------------------------------------------------------------------------------------
-- Compute H * H^T as the sum over columns j of the outer products column_j * column_j^T.
-- Uses H_new, so the W update sees the already-updated H.
hh_outer = FOREACH H_new GENERATE j, MULTIPLY_VECTOR_BY_TRANSPOSE(column, column) AS matrix;
hh_grouped = GROUP hh_outer ALL;
HH = FOREACH hh_grouped GENERATE SUM_MATRIX(hh_outer.matrix) AS matrix;

-- Compute X for updating W, X = A H^T: row i is the sum, over entries A[i, c], of
-- v * (column c of H_new).
ah_joined = JOIN A BY c, H_new BY j;
ah_multiplied = FOREACH ah_joined GENERATE r, c, MULTIPLY_VECTOR(column, v) AS row;
ah_grouped = GROUP ah_multiplied BY r;
hX = FOREACH ah_grouped GENERATE group AS i, SUM_VECTOR(ah_multiplied.row) AS row;

-- Compute Y for updating W, Y = W H H^T, row by row. GROUP ALL leaves HH with a single
-- tuple, so the CROSS simply attaches the H H^T matrix to every row of W.
hy_crossed = CROSS W, HH;
hY = FOREACH hy_crossed GENERATE i AS i, MULTIPLY_VECTOR_BY_MATRIX(row, matrix) AS row;

-- Compute the new W element-wise: W_new = W .* hX ./ hY.
-- NOTE(review): this is an inner join, so a row i with no entries in A has no tuple in hX
-- and is dropped from W_new instead of being set to zero -- confirm that is intended.
-- NOTE(review): no STORE/DUMP of H_new or W_new is visible here; Pig only executes plans
-- that feed an output, so confirm the results are stored further on.
w_joined = JOIN W BY i, hX BY i, hY BY i;
W_new = FOREACH w_joined GENERATE W::i AS i, MULTIPLY_VECTOR(W::row, DIVIDE_VECTOR(hX::row, hY::row)) AS row;