diff --git a/CODEOWNERS b/CODEOWNERS
deleted file mode 100644
index e8c73d0fbac922f97580849a957eb7525be56518..0000000000000000000000000000000000000000
--- a/CODEOWNERS
+++ /dev/null
@@ -1,2 +0,0 @@
-* @suchenzang @stephenroller @ngoyal2707 @punitkoura @moyapchen @klshuster
-
diff --git a/metaseq/hub_utils.py b/metaseq/hub_utils.py
index ecfbc0d46e8eb919f8d45d13c898490e544188b5..0ab10374868df0e88ab5cbe8b33c1daa9236b22b 100644
--- a/metaseq/hub_utils.py
+++ b/metaseq/hub_utils.py
@@ -210,7 +210,6 @@ class GeneratorHubInterface(nn.Module):
         # this is useful for determining the device
         self.register_buffer("_float_tensor", torch.tensor([0], dtype=torch.float))
 
-
     @property
     def device(self):
         return self._float_tensor.device
@@ -305,16 +304,30 @@ class GeneratorHubInterface(nn.Module):
         else:
             # For Sequence Generator
-            for id, i in zip(batch["id"].tolist(), range(translations["tokens"].size(0))):
+            for id, i in zip(
+                batch["id"].tolist(), range(translations["tokens"].size(0))
+            ):
                 beams = []
                 for j in range(0, translations["tokens"].size(1)):
-                    tokens, scores, distributions = GeneratorInterface._filter_special(self._pad_token_ind, self._special_token_inds, translations["tokens"][i][j], translations["scores"][i][j], distributions=None)
-                    beams.append({
-                        "id": id,
-                        "tokens": [x.item() for x in list(tokens)],
-                        "positional_scores": [s.item() for s in list(scores)],
-                    })
+                    (
+                        tokens,
+                        scores,
+                        distributions,
+                    ) = GeneratorInterface._filter_special(
+                        self._pad_token_ind,
+                        self._special_token_inds,
+                        translations["tokens"][i][j],
+                        translations["scores"][i][j],
+                        distributions=None,
+                    )
+                    beams.append(
+                        {
+                            "id": id,
+                            "tokens": [x.item() for x in list(tokens)],
+                            "positional_scores": [s.item() for s in list(scores)],
+                        }
+                    )
                 results.append((id, beams))
@@ -694,8 +707,11 @@ class GeneratorInterface:
                 prompt_len = lengths[i]
 
                 tokens, scores, distributions = self._filter_special(
-                    self._pad_token_ind, self._special_token_inds,
-                    tokens, scores, distributions
+                    self._pad_token_ind,
+                    self._special_token_inds,
+                    tokens,
+                    scores,
+                    distributions,
                 )
 
                 if echo:
diff --git a/metaseq/logging/meters.py b/metaseq/logging/meters.py
index a392fe13d9c3d67394553b4949dd6edfb2404b80..d69bb1c5a061cd4672b2ed150e17b6d0c219ba81 100644
--- a/metaseq/logging/meters.py
+++ b/metaseq/logging/meters.py
@@ -63,6 +63,45 @@ def safe_round(number, ndigits):
     return number
 
 
+class HistoryMeter(Meter):
+    """Keeps the full history of logged values"""
+
+    def __init__(self, round: Optional[int] = None):
+        self.round = round
+        self.reset()
+
+    def reset(self):
+        self.history = []  # history from all updates
+
+    def update(self, val):
+        assert type(val) is list
+        self.history.extend(val)
+
+    def state_dict(self):
+        return {
+            "history": self.history,
+            "round": self.round,
+        }
+
+    def load_state_dict(self, state_dict):
+        self.history = state_dict["history"]
+        self.round = state_dict.get("round", None)
+
+    @property
+    def avg(self):
+        if type(self.history[0]) is tuple:
+            return sum([h for i, h in self.history])
+        else:
+            return sum(self.history)
+
+    @property
+    def smoothed_value(self) -> float:
+        val = self.avg
+        if self.round is not None and val is not None:
+            val = safe_round(val, self.round)
+        return val
+
+
 class AverageMeter(Meter):
     """Computes and stores the average and current value"""
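For orientation, here is a minimal standalone sketch of how the new HistoryMeter behaves (hypothetical values; assumes the patched metaseq.logging.meters is importable). Note that despite its name, `avg` returns the sum of the logged values, ignoring the ids when entries are (id, value) tuples:

    from metaseq.logging.meters import HistoryMeter

    meter = HistoryMeter(round=3)
    # updates must be lists; entries may be raw values or (id, value) tuples
    meter.update([(0, 1.25), (1, 0.75)])
    meter.update([(2, 2.0)])

    print(meter.history)         # [(0, 1.25), (1, 0.75), (2, 2.0)]
    print(meter.avg)             # 4.0 -- sum over values; ids are ignored
    print(meter.smoothed_value)  # 4.0, rounded via safe_round(val, 3)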
diff --git a/metaseq/logging/metrics.py b/metaseq/logging/metrics.py
index 363ff4cf4e3ccf0c6307934f959c2db8d7b880b9..5b345ec71b6502bfff776f2648c265b554c41618 100644
--- a/metaseq/logging/metrics.py
+++ b/metaseq/logging/metrics.py
@@ -22,6 +22,7 @@ from .meters import (
     OrderedDict,
     MetersDict,
     AverageMeter,
+    HistoryMeter,
     TimeMeter,
     StopwatchMeter,
     Meter,
@@ -139,6 +140,26 @@ def log_scalar(
         agg[key].update(value, weight)
 
 
+def log_history(
+    key: str,
+    value: list,
+    priority: int = 10,
+    round: Optional[int] = None,
+):
+    """Log a list of values, keeping the full history.
+
+    Args:
+        key (str): name of the field to log
+        value (list): values to log
+        priority (int): smaller values are logged earlier in the output
+        round (Optional[int]): number of digits to round to when displaying
+    """
+    for agg in get_active_aggregators():
+        if key not in agg:
+            agg.add_meter(key, HistoryMeter(round=round), priority)
+        agg[key].update(value)
+
+
 def log_derived(key: str, fn: Callable[[MetersDict], float], priority: int = 20):
     """Log a scalar value derived from other meters.
diff --git a/metaseq/model_parallel/criterions/vocab_parallel_cross_entropy.py b/metaseq/model_parallel/criterions/vocab_parallel_cross_entropy.py
index 3282208cd95a2d985a95fd266b15226d141e07f9..601989ebbf344f8b65f82b008340e586e3c9886e 100644
--- a/metaseq/model_parallel/criterions/vocab_parallel_cross_entropy.py
+++ b/metaseq/model_parallel/criterions/vocab_parallel_cross_entropy.py
@@ -9,7 +9,6 @@ import torch
 from metaseq import metrics, utils
 from metaseq.criterions import BaseCriterion, register_criterion
 
-
 try:
     from megatron.mpu.cross_entropy import (
         vocab_parallel_cross_entropy,
@@ -45,6 +44,7 @@ class VocabParallelCrossEntropyCriterion(BaseCriterion):
         loss = vocab_parallel_cross_entropy(net_output[0].float(), target)
         if has_pad:
             loss = loss * (target != self.padding_idx)
+        batch_loss = loss.mean(-1)
         loss = loss.sum()
         # When using target loss only, use num tokens in target only as the sample_size
         # See StreamingSrcTgtDataset
@@ -59,6 +59,9 @@ class VocabParallelCrossEntropyCriterion(BaseCriterion):
             "nsentences": sample["target"].size(0),
             "sample_size": sample_size,
         }
+        if not model.training:
+            logging_output.update({"batch_loss": batch_loss})
+            logging_output.update({"id": sample["id"].tolist()})
         if "src_tokens" in sample["net_input"] and hasattr(self.task, "eod"):
             logging_output["ndocseps"] = (sample["target"] == self.task.eod).sum()
         if (
@@ -91,6 +94,14 @@ class VocabParallelCrossEntropyCriterion(BaseCriterion):
         loss_sum = sum(log.get("loss", 0) for log in logging_outputs)
         ntokens = sum(log.get("ntokens", 0) for log in logging_outputs)
         sample_size = sum(log.get("sample_size", 0) for log in logging_outputs)
+        if logging_outputs[0].get("batch_loss", None) is not None:
+            batch_loss = []
+            ids = []
+            for log in logging_outputs:
+                batch_loss += log.get("batch_loss", [])
+                ids += log.get("id", [])
+        else:
+            batch_loss = None
 
         for type_ in ("actv", "pos", "tok", "emb"):
             key = f"{type_}_norm"
@@ -112,6 +123,10 @@ class VocabParallelCrossEntropyCriterion(BaseCriterion):
         metrics.log_derived(
             "ppl", lambda meters: utils.get_perplexity(meters["loss"].avg)
         )
+        if batch_loss is not None:
+            metrics.log_history(
+                "batch_loss", [(id, l) for id, l in zip(ids, batch_loss)], 1, round=3
+            )
 
     @staticmethod
     def logging_outputs_can_be_summed() -> bool:
@@ -120,4 +135,4 @@ class VocabParallelCrossEntropyCriterion(BaseCriterion):
         across workers prior to calling `reduce_metrics`. Setting this
         to True will improves distributed training speed.
         """
-        return True
+        return False  # The per-example batch_loss history is not scalar, so it cannot be summed across workers.
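To make the shape logic above concrete, a standalone sketch with made-up tensors (not metaseq code): the parallel cross-entropy yields one loss per token, shape (batch, seq_len); masking pad positions and taking mean(-1) produces the one-scalar-per-example `batch_loss` that later gets paired with sample ids. Note that the mean divides by the full block length, padding included:

    import torch

    padding_idx = 1
    # hypothetical per-token losses for 2 sequences of length 4
    loss = torch.tensor([[2.0, 1.0, 3.0, 0.5],
                         [1.5, 2.5, 0.0, 0.0]])
    target = torch.tensor([[5, 6, 7, 8],
                           [5, 6, 1, 1]])  # last two positions are padding

    loss = loss * (target != padding_idx)  # zero out pad positions
    batch_loss = loss.mean(-1)             # one value per example
    print(batch_loss)  # tensor([1.6250, 1.0000])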
diff --git a/metaseq/model_parallel/modules/multihead_attention.py b/metaseq/model_parallel/modules/multihead_attention.py
index c7200766fff23f85e7d41d2fa65f05c17b693195..b008887f83a241f00122a961366392ff225941bd 100644
--- a/metaseq/model_parallel/modules/multihead_attention.py
+++ b/metaseq/model_parallel/modules/multihead_attention.py
@@ -335,7 +335,9 @@ class ModelParallelMultiheadAttention(nn.Module):
         # Megatron's fused kernel: "ScaledUpperTriangMaskedSoftmax" seems to crash with odd shape across seq_len dimension.
         # This is okay for training cause training we have all seq_len nice power of 2s but during evaluation and generation,
         # we have seq_lens not power of 2.
-        CHANGES = False  # not getattr(self, "inference", False)
+        CHANGES = (
+            self.training
+        )  # not getattr(self, "inference", False) # Don't use megatron for eval because of internal assert errors.
         if CHANGES:
             output_size = (
diff --git a/metaseq/models/base_model.py b/metaseq/models/base_model.py
index cc106c1ec4e6b6eacb7013d4207cc6d450447813..0d073be7aeec526ed2f1d92b16b68d450547e078 100644
--- a/metaseq/models/base_model.py
+++ b/metaseq/models/base_model.py
@@ -67,15 +67,8 @@ class BaseModel(nn.Module):
         """Get normalized probabilities (or log probs) from a net's output."""
         if hasattr(self, "decoder"):
             return self.decoder.get_normalized_probs(logits, log_probs)
-        elif torch.is_tensor(net_output):
-            # syntactic sugar for simple models which don't have a decoder
-            # (e.g., the classification tutorial)
-            logits = net_output.float()
-            if log_probs:
-                return F.log_softmax(logits, dim=-1)
-            else:
-                return F.softmax(logits, dim=-1)
-        raise NotImplementedError
+        else:
+            raise NotImplementedError
 
     def extract_features(self, *args, **kwargs):
         """Similar to *forward* but only return features."""
diff --git a/metaseq/sequence_generator.py b/metaseq/sequence_generator.py
index ac89555c3568d64ca10544a5cbf9f5e76732fb61..5e3b6b07ff0e8b0f433bc77c2b9f3f12ca1cb4fc 100644
--- a/metaseq/sequence_generator.py
+++ b/metaseq/sequence_generator.py
@@ -164,7 +164,9 @@ class SequenceGenerator(nn.Module):
         # need to handle normalization and prep for bookkeeping of incremental
         # decoding
         start_step = src_tokens.shape[1]
-        print(f"sample_id={sample['id'][0].item()} gpu_rank={torch.distributed.get_rank()} start_step={start_step}")
+        print(
+            f"sample_id={sample['id'][0].item()} gpu_rank={torch.distributed.get_rank()} start_step={start_step}"
+        )
         # set all the forced tokens
         tokens[:, :start_step] = src_tokens.repeat_interleave(beam_size, 0)
         # compute the model predictions
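The multihead_attention change gates the fused Megatron kernel on nn.Module's built-in training flag rather than disabling it outright. A toy illustration of the pattern (hypothetical module, not metaseq code; plain torch.softmax stands in for the fused ScaledUpperTriangMaskedSoftmax):

    import torch
    import torch.nn as nn

    class GatedSoftmax(nn.Module):
        def forward(self, scores: torch.Tensor) -> torch.Tensor:
            # self.training is toggled by .train() / .eval(); the fused
            # kernel is only safe for training's power-of-2 seq_lens
            if self.training:
                return torch.softmax(scores, dim=-1)  # fused-kernel path
            return torch.softmax(scores.float(), dim=-1).type_as(scores)

    m = GatedSoftmax()
    m.train()  # subsequent forwards take the fused path
    m.eval()   # generation-time forwards take the fallback path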
diff --git a/metaseq/tasks/streaming_finetune_language_modeling.py b/metaseq/tasks/streaming_finetune_language_modeling.py
index 9b2345e1ca18bc9acd983a8afd9f7f2d04ad2626..e948d1a4c4b0626cc0be62fec97abc457a1d7a98 100644
--- a/metaseq/tasks/streaming_finetune_language_modeling.py
+++ b/metaseq/tasks/streaming_finetune_language_modeling.py
@@ -104,7 +104,9 @@ class StreamingFinetuneLanguageModelingTask(StreamingLanguageModelingTask):
             # We generate blocks with one extra token, so that we have a target
             # for the final input token. This results in slight data loss.
             block_size=self.args.tokens_per_sample + 1,
-            break_mode=self.args.sample_break_mode,
+            break_mode=self.args.sample_break_mode
+            if split == "train"
+            else "eos_pad_8",  # eos mode for val/test to get accuracies
             # we drop the remainder block during training
             drop_last=(split == "train"),
             padding_idx=self.source_dictionary.pad(),
diff --git a/metaseq_cli/train.py b/metaseq_cli/train.py
index 676044fa1c3a6ec727a5962db61919b3ffb94a18..a363d4ccb8703e60e881b5e5fd174bcd432021ed 100644
--- a/metaseq_cli/train.py
+++ b/metaseq_cli/train.py
@@ -505,6 +505,7 @@ def validate(
         trainer.begin_valid_epoch(epoch_itr.epoch)
     valid_losses = []
+    valid_losses_per_example = {}
     with metrics.aggregate(new_root=True) as combined_agg:
         for subset in subsets:
             logger.info(
@@ -574,9 +575,32 @@ def validate(
                 trainer.valid_step(sample)
             # log validation stats
             stats = get_valid_stats(cfg, trainer, agg.get_smoothed_values())
+            if "batch_loss" in stats:
+                del stats["batch_loss"]
+            if subset in ["valid_pos", "valid_neg"]:
+                valid_losses_per_example[subset] = agg["batch_loss"].history
             progress.print(stats, tag=subset, step=trainer.get_num_updates())
             valid_losses.append(stats[cfg.checkpoint.best_checkpoint_metric])
     stats = get_valid_stats(cfg, trainer, combined_agg.get_smoothed_values())
+    if "batch_loss" in stats:
+        del stats["batch_loss"]
+    if "valid_pos" in subsets and "valid_neg" in subsets:
+        pos_valid_losses_per_example = {
+            id: loss for id, loss in valid_losses_per_example["valid_pos"]
+        }
+        neg_valid_losses_per_example = {
+            id: loss for id, loss in valid_losses_per_example["valid_neg"]
+        }
+        assert len(pos_valid_losses_per_example.keys()) == len(
+            neg_valid_losses_per_example.keys()
+        )
+        correct = 0
+        for id, loss in pos_valid_losses_per_example.items():
+            if loss < neg_valid_losses_per_example[id]:
+                correct += 1
+        accuracy = correct / len(pos_valid_losses_per_example)
+        stats["accuracy"] = accuracy
     progress.print(stats, tag="valid/combined", step=trainer.get_num_updates())
     return valid_losses
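The accuracy added above is a paired comparison: for each example id, the positive continuation should get a lower validation loss than the negative one. A standalone sketch with hypothetical losses, mirroring the (id, loss) history collected from the valid_pos/valid_neg subsets:

    # (id, loss) pairs as stored in valid_losses_per_example
    pos = dict([(0, 0.9), (1, 1.4), (2, 0.7)])  # valid_pos
    neg = dict([(0, 1.2), (1, 1.1), (2, 2.3)])  # valid_neg

    # fraction of ids where the positive completion scores a lower loss
    accuracy = sum(pos[i] < neg[i] for i in pos) / len(pos)
    print(accuracy)  # 0.666...: ids 0 and 2 are counted correct, id 1 is not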